diff --git a/src/NATS.Server/JetStream/Cluster/AssetPlacementPlanner.cs b/src/NATS.Server/JetStream/Cluster/AssetPlacementPlanner.cs new file mode 100644 index 0000000..6722fda --- /dev/null +++ b/src/NATS.Server/JetStream/Cluster/AssetPlacementPlanner.cs @@ -0,0 +1,17 @@ +namespace NATS.Server.JetStream.Cluster; + +public sealed class AssetPlacementPlanner +{ + private readonly int _nodes; + + public AssetPlacementPlanner(int nodes) + { + _nodes = Math.Max(nodes, 1); + } + + public IReadOnlyList PlanReplicas(int replicas) + { + var count = Math.Min(Math.Max(replicas, 1), _nodes); + return Enumerable.Range(1, count).ToArray(); + } +} diff --git a/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs b/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs new file mode 100644 index 0000000..75943d1 --- /dev/null +++ b/src/NATS.Server/JetStream/Cluster/JetStreamMetaGroup.cs @@ -0,0 +1,36 @@ +using System.Collections.Concurrent; +using NATS.Server.JetStream.Models; + +namespace NATS.Server.JetStream.Cluster; + +public sealed class JetStreamMetaGroup +{ + private readonly int _nodes; + private readonly ConcurrentDictionary _streams = new(StringComparer.Ordinal); + + public JetStreamMetaGroup(int nodes) + { + _nodes = nodes; + } + + public Task ProposeCreateStreamAsync(StreamConfig config, CancellationToken ct) + { + _streams[config.Name] = 0; + return Task.CompletedTask; + } + + public MetaGroupState GetState() + { + return new MetaGroupState + { + Streams = _streams.Keys.OrderBy(x => x, StringComparer.Ordinal).ToArray(), + ClusterSize = _nodes, + }; + } +} + +public sealed class MetaGroupState +{ + public IReadOnlyList Streams { get; init; } = []; + public int ClusterSize { get; init; } +} diff --git a/src/NATS.Server/JetStream/ConsumerManager.cs b/src/NATS.Server/JetStream/ConsumerManager.cs index 36d778e..68ba351 100644 --- a/src/NATS.Server/JetStream/ConsumerManager.cs +++ b/src/NATS.Server/JetStream/ConsumerManager.cs @@ -1,5 +1,6 @@ using System.Collections.Concurrent; using NATS.Server.JetStream.Api; +using NATS.Server.JetStream.Cluster; using NATS.Server.JetStream.Consumers; using NATS.Server.JetStream.Models; using NATS.Server.JetStream.Storage; @@ -8,10 +9,16 @@ namespace NATS.Server.JetStream; public sealed class ConsumerManager { + private readonly JetStreamMetaGroup? _metaGroup; private readonly ConcurrentDictionary<(string Stream, string Name), ConsumerHandle> _consumers = new(); private readonly PullConsumerEngine _pullConsumerEngine = new(); private readonly PushConsumerEngine _pushConsumerEngine = new(); + public ConsumerManager(JetStreamMetaGroup? metaGroup = null) + { + _metaGroup = metaGroup; + } + public int ConsumerCount => _consumers.Count; public JetStreamApiResponse CreateOrUpdate(string stream, ConsumerConfig config) diff --git a/src/NATS.Server/JetStream/StreamManager.cs b/src/NATS.Server/JetStream/StreamManager.cs index f33061f..b294cff 100644 --- a/src/NATS.Server/JetStream/StreamManager.cs +++ b/src/NATS.Server/JetStream/StreamManager.cs @@ -1,5 +1,6 @@ using System.Collections.Concurrent; using NATS.Server.JetStream.Api; +using NATS.Server.JetStream.Cluster; using NATS.Server.JetStream.MirrorSource; using NATS.Server.JetStream.Models; using NATS.Server.JetStream.Publish; @@ -10,6 +11,7 @@ namespace NATS.Server.JetStream; public sealed class StreamManager { + private readonly JetStreamMetaGroup? _metaGroup; private readonly ConcurrentDictionary _streams = new(StringComparer.Ordinal); private readonly ConcurrentDictionary> _mirrorsByOrigin = @@ -17,6 +19,11 @@ public sealed class StreamManager private readonly ConcurrentDictionary> _sourcesByOrigin = new(StringComparer.Ordinal); + public StreamManager(JetStreamMetaGroup? metaGroup = null) + { + _metaGroup = metaGroup; + } + public IReadOnlyCollection StreamNames => _streams.Keys.ToArray(); public JetStreamApiResponse CreateOrUpdate(StreamConfig config) @@ -30,6 +37,7 @@ public sealed class StreamManager _ => new StreamHandle(normalized, new MemStore()), (_, existing) => existing with { Config = normalized }); RebuildReplicationCoordinators(); + _metaGroup?.ProposeCreateStreamAsync(normalized, default).GetAwaiter().GetResult(); return BuildStreamInfoResponse(handle); } diff --git a/tests/NATS.Server.Tests/JetStreamMetaGroupTests.cs b/tests/NATS.Server.Tests/JetStreamMetaGroupTests.cs new file mode 100644 index 0000000..4f669f6 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamMetaGroupTests.cs @@ -0,0 +1,54 @@ +using NATS.Server.JetStream; +using NATS.Server.JetStream.Cluster; +using NATS.Server.JetStream.Models; + +namespace NATS.Server.Tests; + +public class JetStreamMetaGroupTests +{ + [Fact] + public async Task Stream_create_requires_meta_group_commit() + { + await using var fixture = await JetStreamClusterFixture.StartAsync(nodes: 3); + + var result = await fixture.CreateStreamAsync("ORDERS", replicas: 3); + result.Error.ShouldBeNull(); + + var meta = await fixture.GetMetaStateAsync(); + meta.Streams.ShouldContain("ORDERS"); + } +} + +internal sealed class JetStreamClusterFixture : IAsyncDisposable +{ + private readonly JetStreamMetaGroup _metaGroup; + private readonly StreamManager _streamManager; + + private JetStreamClusterFixture(JetStreamMetaGroup metaGroup, StreamManager streamManager) + { + _metaGroup = metaGroup; + _streamManager = streamManager; + } + + public static Task StartAsync(int nodes) + { + var meta = new JetStreamMetaGroup(nodes); + var streamManager = new StreamManager(meta); + return Task.FromResult(new JetStreamClusterFixture(meta, streamManager)); + } + + public Task CreateStreamAsync(string name, int replicas) + { + var response = _streamManager.CreateOrUpdate(new StreamConfig + { + Name = name, + Subjects = [name.ToLowerInvariant() + ".*"], + Replicas = replicas, + }); + return Task.FromResult(response); + } + + public Task GetMetaStateAsync() => Task.FromResult(_metaGroup.GetState()); + + public ValueTask DisposeAsync() => ValueTask.CompletedTask; +}