From f1d3c19594b0c2155fddad3547ff4214c302e1e9 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 06:10:41 -0500 Subject: [PATCH] feat: add jetstream mirror and source orchestration --- .../MirrorSource/MirrorCoordinator.cs | 16 ++++++ .../MirrorSource/SourceCoordinator.cs | 16 ++++++ .../JetStream/Models/StreamConfig.cs | 2 + src/NATS.Server/JetStream/StreamManager.cs | 49 +++++++++++++++++++ .../NATS.Server.Tests/JetStreamApiFixture.cs | 29 +++++++++++ .../JetStreamMirrorSourceTests.cs | 16 ++++++ 6 files changed, 128 insertions(+) create mode 100644 src/NATS.Server/JetStream/MirrorSource/MirrorCoordinator.cs create mode 100644 src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs create mode 100644 tests/NATS.Server.Tests/JetStreamMirrorSourceTests.cs diff --git a/src/NATS.Server/JetStream/MirrorSource/MirrorCoordinator.cs b/src/NATS.Server/JetStream/MirrorSource/MirrorCoordinator.cs new file mode 100644 index 0000000..d66ab04 --- /dev/null +++ b/src/NATS.Server/JetStream/MirrorSource/MirrorCoordinator.cs @@ -0,0 +1,16 @@ +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.JetStream.MirrorSource; + +public sealed class MirrorCoordinator +{ + private readonly IStreamStore _targetStore; + + public MirrorCoordinator(IStreamStore targetStore) + { + _targetStore = targetStore; + } + + public Task OnOriginAppendAsync(StoredMessage message, CancellationToken ct) + => _targetStore.AppendAsync(message.Subject, message.Payload, ct).AsTask(); +} diff --git a/src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs b/src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs new file mode 100644 index 0000000..c011ac5 --- /dev/null +++ b/src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs @@ -0,0 +1,16 @@ +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.JetStream.MirrorSource; + +public sealed class SourceCoordinator +{ + private readonly IStreamStore _targetStore; + + public SourceCoordinator(IStreamStore targetStore) + { + _targetStore = targetStore; + } + + public Task OnOriginAppendAsync(StoredMessage message, CancellationToken ct) + => _targetStore.AppendAsync(message.Subject, message.Payload, ct).AsTask(); +} diff --git a/src/NATS.Server/JetStream/Models/StreamConfig.cs b/src/NATS.Server/JetStream/Models/StreamConfig.cs index eccc10f..480702b 100644 --- a/src/NATS.Server/JetStream/Models/StreamConfig.cs +++ b/src/NATS.Server/JetStream/Models/StreamConfig.cs @@ -6,4 +6,6 @@ public sealed class StreamConfig public List Subjects { get; set; } = []; public int MaxMsgs { get; set; } public int Replicas { get; set; } = 1; + public string? Mirror { get; set; } + public string? Source { get; set; } } diff --git a/src/NATS.Server/JetStream/StreamManager.cs b/src/NATS.Server/JetStream/StreamManager.cs index f7dc151..f33061f 100644 --- a/src/NATS.Server/JetStream/StreamManager.cs +++ b/src/NATS.Server/JetStream/StreamManager.cs @@ -1,5 +1,6 @@ using System.Collections.Concurrent; using NATS.Server.JetStream.Api; +using NATS.Server.JetStream.MirrorSource; using NATS.Server.JetStream.Models; using NATS.Server.JetStream.Publish; using NATS.Server.JetStream.Storage; @@ -11,6 +12,10 @@ public sealed class StreamManager { private readonly ConcurrentDictionary _streams = new(StringComparer.Ordinal); + private readonly ConcurrentDictionary> _mirrorsByOrigin = + new(StringComparer.Ordinal); + private readonly ConcurrentDictionary> _sourcesByOrigin = + new(StringComparer.Ordinal); public IReadOnlyCollection StreamNames => _streams.Keys.ToArray(); @@ -24,6 +29,7 @@ public sealed class StreamManager normalized.Name, _ => new StreamHandle(normalized, new MemStore()), (_, existing) => existing with { Config = normalized }); + RebuildReplicationCoordinators(); return BuildStreamInfoResponse(handle); } @@ -65,6 +71,9 @@ public sealed class StreamManager var seq = stream.Store.AppendAsync(subject, payload, default).GetAwaiter().GetResult(); EnforceLimits(stream); + var stored = stream.Store.LoadAsync(seq, default).GetAwaiter().GetResult(); + if (stored != null) + ReplicateIfConfigured(stream.Config.Name, stored); return new PubAck { @@ -81,6 +90,8 @@ public sealed class StreamManager Subjects = config.Subjects.Count == 0 ? [] : [.. config.Subjects], MaxMsgs = config.MaxMsgs, Replicas = config.Replicas, + Mirror = config.Mirror, + Source = config.Source, }; return copy; @@ -114,6 +125,44 @@ public sealed class StreamManager if (stream.Store is FileStore fileStore) fileStore.TrimToMaxMessages(maxMessages); } + + private void RebuildReplicationCoordinators() + { + _mirrorsByOrigin.Clear(); + _sourcesByOrigin.Clear(); + + foreach (var stream in _streams.Values) + { + if (!string.IsNullOrWhiteSpace(stream.Config.Mirror) + && _streams.TryGetValue(stream.Config.Mirror, out _)) + { + var list = _mirrorsByOrigin.GetOrAdd(stream.Config.Mirror, _ => []); + list.Add(new MirrorCoordinator(stream.Store)); + } + + if (!string.IsNullOrWhiteSpace(stream.Config.Source) + && _streams.TryGetValue(stream.Config.Source, out _)) + { + var list = _sourcesByOrigin.GetOrAdd(stream.Config.Source, _ => []); + list.Add(new SourceCoordinator(stream.Store)); + } + } + } + + private void ReplicateIfConfigured(string originStream, StoredMessage stored) + { + if (_mirrorsByOrigin.TryGetValue(originStream, out var mirrors)) + { + foreach (var mirror in mirrors) + mirror.OnOriginAppendAsync(stored, default).GetAwaiter().GetResult(); + } + + if (_sourcesByOrigin.TryGetValue(originStream, out var sources)) + { + foreach (var source in sources) + source.OnOriginAppendAsync(stored, default).GetAwaiter().GetResult(); + } + } } public sealed record StreamHandle(StreamConfig Config, IStreamStore Store); diff --git a/tests/NATS.Server.Tests/JetStreamApiFixture.cs b/tests/NATS.Server.Tests/JetStreamApiFixture.cs index 2561048..39d9f93 100644 --- a/tests/NATS.Server.Tests/JetStreamApiFixture.cs +++ b/tests/NATS.Server.Tests/JetStreamApiFixture.cs @@ -61,6 +61,18 @@ internal sealed class JetStreamApiFixture : IAsyncDisposable return fixture; } + public static async Task StartWithMirrorSetupAsync() + { + var fixture = await StartWithStreamAsync("ORDERS", "orders.*"); + _ = fixture._streamManager.CreateOrUpdate(new StreamConfig + { + Name = "ORDERS_MIRROR", + Subjects = ["orders.mirror.*"], + Mirror = "ORDERS", + }); + return fixture; + } + public Task PublishAndGetAckAsync(string subject, string payload, string? msgId = null, bool expectError = false) { if (_publisher.TryCapture(subject, Encoding.UTF8.GetBytes(payload), msgId, out var ack)) @@ -81,6 +93,11 @@ internal sealed class JetStreamApiFixture : IAsyncDisposable throw new InvalidOperationException($"No stream matched subject '{subject}'."); } + public Task PublishAndGetAckAsync(string streamName, string subject, string payload) + { + return PublishAndGetAckAsync(subject, payload); + } + public Task RequestLocalAsync(string subject, string payload) { return Task.FromResult(_router.Route(subject, Encoding.UTF8.GetBytes(payload))); @@ -122,5 +139,17 @@ internal sealed class JetStreamApiFixture : IAsyncDisposable return Task.FromResult(frame); } + public async Task WaitForMirrorSyncAsync(string streamName) + { + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(2)); + while (!timeout.IsCancellationRequested) + { + var state = await GetStreamStateAsync(streamName); + if (state.Messages > 0) + return; + await Task.Delay(25, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default); + } + } + public ValueTask DisposeAsync() => ValueTask.CompletedTask; } diff --git a/tests/NATS.Server.Tests/JetStreamMirrorSourceTests.cs b/tests/NATS.Server.Tests/JetStreamMirrorSourceTests.cs new file mode 100644 index 0000000..0d87416 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamMirrorSourceTests.cs @@ -0,0 +1,16 @@ +namespace NATS.Server.Tests; + +public class JetStreamMirrorSourceTests +{ + [Fact] + public async Task Mirror_stream_replays_origin_messages() + { + await using var fixture = await JetStreamApiFixture.StartWithMirrorSetupAsync(); + + await fixture.PublishAndGetAckAsync("ORDERS", "orders.created", "1"); + await fixture.WaitForMirrorSyncAsync("ORDERS_MIRROR"); + + var state = await fixture.GetStreamStateAsync("ORDERS_MIRROR"); + state.Messages.ShouldBe((ulong)1); + } +}