using NATS.Server.JetStream.MirrorSource;
using NATS.Server.JetStream.Storage;

namespace NATS.Server.Tests.JetStream.MirrorSource;

// Go reference: server/stream.go:2788-2854 (processMirrorMsgs)
// Go reference: server/stream.go:2863-3014 (processInboundMirrorMsg)
// Go reference: server/stream.go:3125-3400 (setupMirrorConsumer)

/// <summary>
/// Exercises <see cref="MirrorCoordinator"/> against an in-memory target store:
/// direct application through <c>OnOriginAppendAsync</c>, health reporting,
/// the channel-based and pull-based background sync loops, and disposal.
/// </summary>
public class MirrorSyncTests
{
    // -------------------------------------------------------------------------
    // Direct in-process synchronization tests
    // -------------------------------------------------------------------------

    // Go reference: server/stream.go:2915 — sseq == mset.mirror.sseq+1 (normal in-order)
    [Fact]
    public async Task Mirror_applies_single_message_and_tracks_sequence()
    {
        var target = new MemStore();
        var mirror = new MirrorCoordinator(target);

        var msg = MakeMessage(seq: 1, subject: "orders.created", payload: "order-1");
        await mirror.OnOriginAppendAsync(msg, default);

        mirror.LastOriginSequence.ShouldBe(1UL);
        mirror.LastSyncUtc.ShouldNotBe(default(DateTime));
        mirror.Lag.ShouldBe(0UL);

        var stored = await target.LoadAsync(1, default);
        stored.ShouldNotBeNull();
        stored.Subject.ShouldBe("orders.created");
    }

    // Go reference: server/stream.go:2915-2917 — sequential messages increment sseq/dseq
    [Fact]
    public async Task Mirror_applies_sequential_messages_in_order()
    {
        var target = new MemStore();
        var mirror = new MirrorCoordinator(target);

        for (ulong i = 1; i <= 5; i++)
        {
            await mirror.OnOriginAppendAsync(
                MakeMessage(seq: i, subject: $"orders.{i}", payload: $"payload-{i}"),
                default);
        }

        mirror.LastOriginSequence.ShouldBe(5UL);

        var state = await target.GetStateAsync(default);
        state.Messages.ShouldBe(5UL);
    }

    // Go reference: server/stream.go:2918-2921 — sseq <= mset.mirror.sseq (ignore older)
    [Fact]
    public async Task Mirror_ignores_older_duplicate_messages()
    {
        var target = new MemStore();
        var mirror = new MirrorCoordinator(target);

        await mirror.OnOriginAppendAsync(MakeMessage(seq: 5, subject: "a", payload: "1"), default);
        await mirror.OnOriginAppendAsync(MakeMessage(seq: 3, subject: "b", payload: "2"), default); // older
        await mirror.OnOriginAppendAsync(MakeMessage(seq: 5, subject: "c", payload: "3"), default); // same

        mirror.LastOriginSequence.ShouldBe(5UL);

        var state = await target.GetStateAsync(default);
        state.Messages.ShouldBe(1UL); // only seq 5 stored
    }

    // Go reference: server/stream.go:2927-2936 — gap handling (sseq > mirror.sseq+1)
    [Fact]
    public async Task Mirror_handles_sequence_gaps_from_origin()
    {
        var target = new MemStore();
        var mirror = new MirrorCoordinator(target);

        await mirror.OnOriginAppendAsync(MakeMessage(seq: 1, subject: "a", payload: "1"), default);
        // Gap: origin deleted seq 2-4
        await mirror.OnOriginAppendAsync(MakeMessage(seq: 5, subject: "b", payload: "2"), default);

        mirror.LastOriginSequence.ShouldBe(5UL);

        var state = await target.GetStateAsync(default);
        state.Messages.ShouldBe(2UL);
    }

    [Fact]
    public async Task Mirror_first_message_at_arbitrary_sequence()
    {
        var target = new MemStore();
        var mirror = new MirrorCoordinator(target);

        // First message arrives at seq 100 (origin has prior history)
        await mirror.OnOriginAppendAsync(MakeMessage(seq: 100, subject: "a", payload: "1"), default);

        mirror.LastOriginSequence.ShouldBe(100UL);

        var stored = await target.LoadAsync(1, default);
        stored.ShouldNotBeNull();
    }

    // -------------------------------------------------------------------------
    // Health reporting tests
    // -------------------------------------------------------------------------

    // Go reference: server/stream.go:2739-2743 (mirrorInfo)
    [Fact]
    public async Task Health_report_reflects_current_state()
    {
        var target = new MemStore();
        var mirror = new MirrorCoordinator(target);

        var report = mirror.GetHealthReport(originLastSeq: 10);
        report.LastOriginSequence.ShouldBe(0UL);
        report.Lag.ShouldBe(10UL);
        report.IsRunning.ShouldBeFalse();

        await mirror.OnOriginAppendAsync(MakeMessage(seq: 7, subject: "a", payload: "1"), default);

        report = mirror.GetHealthReport(originLastSeq: 10);
        report.LastOriginSequence.ShouldBe(7UL);
        report.Lag.ShouldBe(3UL);
    }

    [Fact]
    public async Task Health_report_shows_zero_lag_when_caught_up()
    {
        var target = new MemStore();
        var mirror = new MirrorCoordinator(target);

        await mirror.OnOriginAppendAsync(MakeMessage(seq: 10, subject: "a", payload: "1"), default);

        var report = mirror.GetHealthReport(originLastSeq: 10);
        report.Lag.ShouldBe(0UL);
    }

    // -------------------------------------------------------------------------
    // Background sync loop: channel-based
    // Go reference: server/stream.go:2788-2854 (processMirrorMsgs goroutine)
    // -------------------------------------------------------------------------

    [Fact]
    public async Task Channel_sync_loop_processes_enqueued_messages()
    {
        var target = new MemStore();
        await using var mirror = new MirrorCoordinator(target);

        mirror.StartSyncLoop();
        mirror.IsRunning.ShouldBeTrue();

        mirror.TryEnqueue(MakeMessage(seq: 1, subject: "a", payload: "1"));
        mirror.TryEnqueue(MakeMessage(seq: 2, subject: "b", payload: "2"));

        await WaitForConditionAsync(() => mirror.LastOriginSequence >= 2, TimeSpan.FromSeconds(5));

        mirror.LastOriginSequence.ShouldBe(2UL);

        var state = await target.GetStateAsync(default);
        state.Messages.ShouldBe(2UL);
    }

    [Fact]
    public async Task Channel_sync_loop_can_be_stopped()
    {
        var target = new MemStore();
        await using var mirror = new MirrorCoordinator(target);

        mirror.StartSyncLoop();
        mirror.IsRunning.ShouldBeTrue();

        await mirror.StopAsync();
        mirror.IsRunning.ShouldBeFalse();
    }

    [Fact]
    public async Task Channel_sync_loop_ignores_duplicates()
    {
        var target = new MemStore();
        await using var mirror = new MirrorCoordinator(target);

        mirror.StartSyncLoop();

        mirror.TryEnqueue(MakeMessage(seq: 1, subject: "a", payload: "1"));
        mirror.TryEnqueue(MakeMessage(seq: 1, subject: "a", payload: "1")); // duplicate
        mirror.TryEnqueue(MakeMessage(seq: 2, subject: "b", payload: "2"));

        await WaitForConditionAsync(() => mirror.LastOriginSequence >= 2, TimeSpan.FromSeconds(5));

        var state = await target.GetStateAsync(default);
        state.Messages.ShouldBe(2UL);
    }

    // -------------------------------------------------------------------------
    // Background sync loop: pull-based
    // Go reference: server/stream.go:3125-3400 (setupMirrorConsumer)
    // -------------------------------------------------------------------------

    [Fact]
    public async Task Pull_sync_loop_fetches_from_origin_store()
    {
        var origin = new MemStore();
        var target = new MemStore();
        await using var mirror = new MirrorCoordinator(target);

        // Pre-populate origin
        await origin.AppendAsync("a", "1"u8.ToArray(), default);
        await origin.AppendAsync("b", "2"u8.ToArray(), default);
        await origin.AppendAsync("c", "3"u8.ToArray(), default);

        mirror.StartPullSyncLoop(origin);

        await WaitForConditionAsync(() => mirror.LastOriginSequence >= 3, TimeSpan.FromSeconds(5));

        mirror.LastOriginSequence.ShouldBe(3UL);

        var state = await target.GetStateAsync(default);
        state.Messages.ShouldBe(3UL);
    }

    [Fact]
    public async Task Pull_sync_loop_catches_up_after_restart()
    {
        var origin = new MemStore();
        var target = new MemStore();

        // Phase 1: sync first 2 messages
        {
            await using var mirror = new MirrorCoordinator(target);

            await origin.AppendAsync("a", "1"u8.ToArray(), default);
            await origin.AppendAsync("b", "2"u8.ToArray(), default);

            mirror.StartPullSyncLoop(origin);
            await WaitForConditionAsync(() => mirror.LastOriginSequence >= 2, TimeSpan.FromSeconds(5));
            await mirror.StopAsync();
        }

        // Phase 2: add more messages and restart with new coordinator
        await origin.AppendAsync("c", "3"u8.ToArray(), default);
        await origin.AppendAsync("d", "4"u8.ToArray(), default);

        {
            // Simulate restart: new coordinator, same target store
            await using var mirror2 = new MirrorCoordinator(target);

            // Manually sync to simulate catchup from seq 2
            await mirror2.OnOriginAppendAsync(
                new StoredMessage { Sequence = 3, Subject = "c", Payload = "3"u8.ToArray() },
                default);
            await mirror2.OnOriginAppendAsync(
                new StoredMessage { Sequence = 4, Subject = "d", Payload = "4"u8.ToArray() },
                default);

            mirror2.LastOriginSequence.ShouldBe(4UL);
        }

        var state = await target.GetStateAsync(default);
        state.Messages.ShouldBe(4UL);
    }

    [Fact]
    public async Task Pull_sync_loop_updates_lag()
    {
        var origin = new MemStore();
        var target = new MemStore();
        await using var mirror = new MirrorCoordinator(target);

        // Pre-populate origin with 10 messages
        for (var i = 0; i < 10; i++)
        {
            await origin.AppendAsync($"subj.{i}", System.Text.Encoding.UTF8.GetBytes($"payload-{i}"), default);
        }

        mirror.StartPullSyncLoop(origin, batchSize: 3);

        // Wait for some progress
        await WaitForConditionAsync(() => mirror.LastOriginSequence >= 3, TimeSpan.FromSeconds(5));

        // Eventually should catch up to all 10
        await WaitForConditionAsync(() => mirror.LastOriginSequence >= 10, TimeSpan.FromSeconds(10));

        var report = mirror.GetHealthReport(originLastSeq: 10);
        report.Lag.ShouldBe(0UL);
    }

    [Fact]
    public async Task Pull_sync_loop_handles_empty_origin()
    {
        var origin = new MemStore();
        var target = new MemStore();
        await using var mirror = new MirrorCoordinator(target);

        mirror.StartPullSyncLoop(origin);

        // Wait a bit to ensure it doesn't crash
        await Task.Delay(200);

        mirror.IsRunning.ShouldBeTrue();
        mirror.LastOriginSequence.ShouldBe(0UL);
    }

    // -------------------------------------------------------------------------
    // Dispose / lifecycle tests
    // -------------------------------------------------------------------------

    [Fact]
    public async Task Dispose_stops_running_sync_loop()
    {
        var target = new MemStore();
        var mirror = new MirrorCoordinator(target);

        mirror.StartSyncLoop();
        mirror.IsRunning.ShouldBeTrue();

        await mirror.DisposeAsync();
        mirror.IsRunning.ShouldBeFalse();
    }

    [Fact]
    public async Task Multiple_start_calls_are_idempotent()
    {
        var target = new MemStore();
        await using var mirror = new MirrorCoordinator(target);

        mirror.StartSyncLoop();
        mirror.StartSyncLoop(); // second call should be no-op

        mirror.IsRunning.ShouldBeTrue();
    }

    // -------------------------------------------------------------------------
    // Helpers
    // -------------------------------------------------------------------------

    /// <summary>
    /// Builds a <see cref="StoredMessage"/> with the given origin sequence and subject,
    /// a UTF-8 encoded payload, and the current UTC time as its timestamp.
    /// </summary>
    /// <param name="seq">Value assigned to <c>Sequence</c>.</param>
    /// <param name="subject">Value assigned to <c>Subject</c>.</param>
    /// <param name="payload">Text that is UTF-8 encoded into <c>Payload</c>.</param>
    private static StoredMessage MakeMessage(ulong seq, string subject, string payload) => new()
    {
        Sequence = seq,
        Subject = subject,
        Payload = System.Text.Encoding.UTF8.GetBytes(payload),
        TimestampUtc = DateTime.UtcNow,
    };

    /// <summary>
    /// Polls <paramref name="condition"/> every 25 ms until it returns <c>true</c>.
    /// </summary>
    /// <param name="condition">Predicate that is re-evaluated on each poll.</param>
    /// <param name="timeout">
    /// Maximum time to wait. When it elapses, the token passed to <c>Task.Delay</c> is
    /// cancelled and the resulting cancellation exception propagates to the caller,
    /// which fails the calling test.
    /// </param>
    private static async Task WaitForConditionAsync(Func<bool> condition, TimeSpan timeout)
    {
        using var cts = new CancellationTokenSource(timeout);
        while (!condition())
        {
            await Task.Delay(25, cts.Token);
        }
    }
}