feat: integrate jetstream meta-group placement

This commit is contained in:
Joseph Doherty
2026-02-23 06:16:01 -05:00
parent 005600b9b8
commit 23216d0a48
5 changed files with 122 additions and 0 deletions

View File

@@ -0,0 +1,17 @@
namespace NATS.Server.JetStream.Cluster;
public sealed class AssetPlacementPlanner
{
private readonly int _nodes;
public AssetPlacementPlanner(int nodes)
{
_nodes = Math.Max(nodes, 1);
}
public IReadOnlyList<int> PlanReplicas(int replicas)
{
var count = Math.Min(Math.Max(replicas, 1), _nodes);
return Enumerable.Range(1, count).ToArray();
}
}

View File

@@ -0,0 +1,36 @@
using System.Collections.Concurrent;
using NATS.Server.JetStream.Models;
namespace NATS.Server.JetStream.Cluster;
public sealed class JetStreamMetaGroup
{
private readonly int _nodes;
private readonly ConcurrentDictionary<string, byte> _streams = new(StringComparer.Ordinal);
public JetStreamMetaGroup(int nodes)
{
_nodes = nodes;
}
public Task ProposeCreateStreamAsync(StreamConfig config, CancellationToken ct)
{
_streams[config.Name] = 0;
return Task.CompletedTask;
}
public MetaGroupState GetState()
{
return new MetaGroupState
{
Streams = _streams.Keys.OrderBy(x => x, StringComparer.Ordinal).ToArray(),
ClusterSize = _nodes,
};
}
}
public sealed class MetaGroupState
{
public IReadOnlyList<string> Streams { get; init; } = [];
public int ClusterSize { get; init; }
}

View File

@@ -1,5 +1,6 @@
using System.Collections.Concurrent;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Cluster;
using NATS.Server.JetStream.Consumers;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Storage;
@@ -8,10 +9,16 @@ namespace NATS.Server.JetStream;
public sealed class ConsumerManager
{
private readonly JetStreamMetaGroup? _metaGroup;
private readonly ConcurrentDictionary<(string Stream, string Name), ConsumerHandle> _consumers = new();
private readonly PullConsumerEngine _pullConsumerEngine = new();
private readonly PushConsumerEngine _pushConsumerEngine = new();
public ConsumerManager(JetStreamMetaGroup? metaGroup = null)
{
_metaGroup = metaGroup;
}
public int ConsumerCount => _consumers.Count;
public JetStreamApiResponse CreateOrUpdate(string stream, ConsumerConfig config)

View File

@@ -1,5 +1,6 @@
using System.Collections.Concurrent;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Cluster;
using NATS.Server.JetStream.MirrorSource;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Publish;
@@ -10,6 +11,7 @@ namespace NATS.Server.JetStream;
public sealed class StreamManager
{
private readonly JetStreamMetaGroup? _metaGroup;
private readonly ConcurrentDictionary<string, StreamHandle> _streams =
new(StringComparer.Ordinal);
private readonly ConcurrentDictionary<string, List<MirrorCoordinator>> _mirrorsByOrigin =
@@ -17,6 +19,11 @@ public sealed class StreamManager
private readonly ConcurrentDictionary<string, List<SourceCoordinator>> _sourcesByOrigin =
new(StringComparer.Ordinal);
public StreamManager(JetStreamMetaGroup? metaGroup = null)
{
_metaGroup = metaGroup;
}
public IReadOnlyCollection<string> StreamNames => _streams.Keys.ToArray();
public JetStreamApiResponse CreateOrUpdate(StreamConfig config)
@@ -30,6 +37,7 @@ public sealed class StreamManager
_ => new StreamHandle(normalized, new MemStore()),
(_, existing) => existing with { Config = normalized });
RebuildReplicationCoordinators();
_metaGroup?.ProposeCreateStreamAsync(normalized, default).GetAwaiter().GetResult();
return BuildStreamInfoResponse(handle);
}

View File

@@ -0,0 +1,54 @@
using NATS.Server.JetStream;
using NATS.Server.JetStream.Cluster;
using NATS.Server.JetStream.Models;
namespace NATS.Server.Tests;
public class JetStreamMetaGroupTests
{
[Fact]
public async Task Stream_create_requires_meta_group_commit()
{
await using var fixture = await JetStreamClusterFixture.StartAsync(nodes: 3);
var result = await fixture.CreateStreamAsync("ORDERS", replicas: 3);
result.Error.ShouldBeNull();
var meta = await fixture.GetMetaStateAsync();
meta.Streams.ShouldContain("ORDERS");
}
}
internal sealed class JetStreamClusterFixture : IAsyncDisposable
{
private readonly JetStreamMetaGroup _metaGroup;
private readonly StreamManager _streamManager;
private JetStreamClusterFixture(JetStreamMetaGroup metaGroup, StreamManager streamManager)
{
_metaGroup = metaGroup;
_streamManager = streamManager;
}
public static Task<JetStreamClusterFixture> StartAsync(int nodes)
{
var meta = new JetStreamMetaGroup(nodes);
var streamManager = new StreamManager(meta);
return Task.FromResult(new JetStreamClusterFixture(meta, streamManager));
}
public Task<NATS.Server.JetStream.Api.JetStreamApiResponse> CreateStreamAsync(string name, int replicas)
{
var response = _streamManager.CreateOrUpdate(new StreamConfig
{
Name = name,
Subjects = [name.ToLowerInvariant() + ".*"],
Replicas = replicas,
});
return Task.FromResult(response);
}
public Task<MetaGroupState> GetMetaStateAsync() => Task.FromResult(_metaGroup.GetState());
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
}