From 8fa16d59d2011ea3404f9516874cb649a69433ff Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 25 Feb 2026 02:30:55 -0500 Subject: [PATCH] feat(mirror): add exponential backoff retry, gap detection, and error tracking Exposes public RecordFailure/RecordSuccess/GetRetryDelay (no-jitter, deterministic) on MirrorCoordinator, plus RecordSourceSeq with HasGap/GapStart/GapEnd properties and SetError/ClearError/HasError/ErrorMessage for error state. Makes IsDuplicate and RecordMsgId public on SourceCoordinator and adds PruneDedupWindow(DateTimeOffset) for explicit-cutoff dedup window pruning. Adds 5 unit tests in MirrorSourceRetryTests. --- .../MirrorSource/MirrorCoordinator.cs | 100 ++++++++++++++++ .../MirrorSource/SourceCoordinator.cs | 28 ++++- .../MirrorSourceRetryTests.cs | 110 ++++++++++++++++++ 3 files changed, 236 insertions(+), 2 deletions(-) create mode 100644 tests/NATS.Server.Tests/MirrorSourceRetryTests.cs diff --git a/src/NATS.Server/JetStream/MirrorSource/MirrorCoordinator.cs b/src/NATS.Server/JetStream/MirrorSource/MirrorCoordinator.cs index 795a797..0a61004 100644 --- a/src/NATS.Server/JetStream/MirrorSource/MirrorCoordinator.cs +++ b/src/NATS.Server/JetStream/MirrorSource/MirrorCoordinator.cs @@ -58,12 +58,44 @@ public sealed class MirrorCoordinator : IAsyncDisposable /// public ulong Lag { get; private set; } + // ------------------------------------------------------------------------- + // Gap detection properties + // Go reference: server/stream.go:2863-3014 (processInboundMirrorMsg gap handling) + // ------------------------------------------------------------------------- + + /// <summary>Whether a sequence gap has been detected in the origin stream.</summary> + public bool HasGap => _hasGap; + + /// <summary>First missing sequence number when a gap is detected.</summary> + public ulong GapStart => _gapStart; + + /// <summary>Last missing sequence number when a gap is detected.</summary> 
+ public ulong GapEnd => _gapEnd; + + // ------------------------------------------------------------------------- + // Error state properties + // ------------------------------------------------------------------------- + + /// <summary>Whether the coordinator is in an error state (true while the stored error message is non-null).</summary> + public bool HasError => _errorMessage is not null; + + /// <summary>Current error message, or null if no error.</summary> + public string? ErrorMessage => _errorMessage; + // Go: mirror.sseq — stream sequence tracking for gap detection private ulong _expectedOriginSeq; // Go: mirror.dseq — delivery sequence tracking private ulong _deliverySeq; + // Gap detection state: written by RecordSourceSeq, exposed through HasGap / GapStart / GapEnd + private ulong _gapStart; + private ulong _gapEnd; + private bool _hasGap; + + // Error state tracking: null means "no error"; assigned by SetError, reset by ClearError + private string? _errorMessage; + public MirrorCoordinator(IStreamStore targetStore) { _targetStore = targetStore; @@ -175,6 +207,74 @@ public sealed class MirrorCoordinator : IAsyncDisposable } } + // ------------------------------------------------------------------------- + // Public retry / failure tracking API + // Go reference: server/stream.go:3478-3505 (calculateRetryBackoff), + // server/stream.go:3125-3400 (setupMirrorConsumer retry logic) + // ------------------------------------------------------------------------- + + /// <summary> + /// Increments the consecutive failure counter while holding the _gate lock. + /// Call this when a sync attempt fails. + /// </summary> + public void RecordFailure() + { + lock (_gate) _consecutiveFailures++; + } + + /// <summary> + /// Resets the consecutive failure counter to zero while holding the _gate lock. + /// Call this when a sync attempt succeeds. + /// </summary> + public void RecordSuccess() + { + lock (_gate) _consecutiveFailures = 0; + } + + /// + /// Returns the exponential backoff delay for the current failure count, + /// without jitter so that tests get deterministic results. 
+    /// Go reference: server/stream.go:3478-3505 (calculateRetryBackoff)
+    ///
+    public TimeSpan GetRetryDelay()
+    {
+        var failures = ConsecutiveFailures;
+        if (failures == 0) return InitialRetryDelay;
+        var baseDelay = InitialRetryDelay.TotalMilliseconds * Math.Pow(2, Math.Min(failures - 1, 10)); // exponent capped at 10 before the MaxRetryDelay clamp
+        var capped = Math.Min(baseDelay, MaxRetryDelay.TotalMilliseconds);
+        return TimeSpan.FromMilliseconds(capped);
+    }
+
+    // -------------------------------------------------------------------------
+    // Sequence gap detection
+    // Go reference: server/stream.go:2863-3014 (processInboundMirrorMsg)
+    // -------------------------------------------------------------------------
+
+    /// <summary>
+    /// Records a sequence number received from the origin stream and sets HasGap/GapStart/GapEnd when it skips ahead.
+    /// A stale or replayed sequence (at or below the last recorded one) never rewinds the cursor, so it cannot fake a later gap.
+    /// </summary>
+    public void RecordSourceSeq(ulong seq)
+    {
+        if (_expectedOriginSeq > 0 && seq > _expectedOriginSeq && seq - _expectedOriginSeq > 1) // subtraction form cannot wrap at ulong.MaxValue
+        {
+            _hasGap = true;
+            _gapStart = _expectedOriginSeq + 1;
+            _gapEnd = seq - 1;
+        }
+        _expectedOriginSeq = Math.Max(_expectedOriginSeq, seq); // monotonic: e.g. 1,2,3,2,4 must not report a 3..3 gap
+    }
+
+    // -------------------------------------------------------------------------
+    // Error state management
+    // -------------------------------------------------------------------------
+
+    /// <summary>Sets the coordinator into an error state with the given message; null throws <see cref="ArgumentNullException"/> because null is the "no error" marker.</summary>
+    public void SetError(string message) => _errorMessage = message ?? throw new ArgumentNullException(nameof(message));
+
+    /// <summary>Clears the error state.</summary>
+    public void ClearError() => _errorMessage = null;
+
     ///
     /// Reports current health state for monitoring.
/// Go reference: server/stream.go:2739-2743 (mirrorInfo), 2698-2736 (sourceInfo) diff --git a/src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs b/src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs index 669c72e..57d30b5 100644 --- a/src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs +++ b/src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs @@ -411,17 +411,41 @@ public sealed class SourceCoordinator : IAsyncDisposable // Deduplication helpers // ------------------------------------------------------------------------- - private bool IsDuplicate(string msgId) + /// <summary> + /// Returns true if the given message ID is already present in the dedup window (PruneDedupWindowIfNeeded runs before the lookup). + /// Go reference: server/stream.go duplicate window check + /// </summary> + public bool IsDuplicate(string msgId) { PruneDedupWindowIfNeeded(); return _dedupWindow.ContainsKey(msgId); } - private void RecordMsgId(string msgId) + /// <summary> + /// Records a message ID in the dedup window with the current UTC timestamp, overwriting any earlier entry for the same ID. + /// Go reference: server/stream.go duplicate window tracking + /// </summary> + public void RecordMsgId(string msgId) { _dedupWindow[msgId] = DateTime.UtcNow; } + /// + /// Removes all dedup window entries whose timestamps are earlier than the given cutoff. + /// This is the explicit-cutoff variant intended for testing, as opposed to the internal + /// time-based pruning done by <see cref="PruneDedupWindowIfNeeded"/>. 
+    /// Go reference: server/stream.go duplicate window pruning
+    ///
+    public void PruneDedupWindow(DateTimeOffset cutoff)
+    {
+        var cutoffDt = cutoff.UtcDateTime; // stored timestamps come from DateTime.UtcNow, so compare in UTC
+        foreach (var kvp in _dedupWindow)
+        {
+            if (kvp.Value < cutoffDt)
+                _dedupWindow.TryRemove(kvp); // key+value removal: an ID re-recorded since this snapshot keeps its fresh entry
+        }
+    }
+
     private void PruneDedupWindowIfNeeded()
     {
         if (_sourceConfig.DuplicateWindowMs <= 0)
diff --git a/tests/NATS.Server.Tests/MirrorSourceRetryTests.cs b/tests/NATS.Server.Tests/MirrorSourceRetryTests.cs
new file mode 100644
index 0000000..54efd06
--- /dev/null
+++ b/tests/NATS.Server.Tests/MirrorSourceRetryTests.cs
@@ -0,0 +1,110 @@
+using NSubstitute;
+using NATS.Server.JetStream.MirrorSource;
+using NATS.Server.JetStream.Models;
+using NATS.Server.JetStream.Storage;
+
+namespace NATS.Server.Tests;
+
+// Go reference: server/stream.go:3478-3505 (calculateRetryBackoff),
+//               server/stream.go:3125-3400 (setupMirrorConsumer retry logic)
+
+public class MirrorSourceRetryTests
+{
+    [Fact]
+    public void Mirror_retry_uses_exponential_backoff()
+    {
+        // Go reference: server/stream.go:3478-3505 calculateRetryBackoff
+        var mirror = MirrorCoordinatorTestHelper.Create();
+
+        mirror.RecordFailure();
+        var delay1 = mirror.GetRetryDelay();
+        delay1.ShouldBeGreaterThanOrEqualTo(TimeSpan.FromMilliseconds(250));
+
+        mirror.RecordFailure();
+        var delay2 = mirror.GetRetryDelay();
+        delay2.ShouldBeGreaterThan(delay1);
+
+        // Cap at max
+        for (int i = 0; i < 20; i++) mirror.RecordFailure();
+        var delayMax = mirror.GetRetryDelay();
+        delayMax.ShouldBeLessThanOrEqualTo(TimeSpan.FromSeconds(30));
+    }
+
+    [Fact]
+    public void Mirror_success_resets_backoff()
+    {
+        // Go reference: server/stream.go setupMirrorConsumer — success resets retry
+        var mirror = MirrorCoordinatorTestHelper.Create();
+
+        for (int i = 0; i < 5; i++) mirror.RecordFailure();
+        mirror.RecordSuccess();
+
+        var delay = mirror.GetRetryDelay();
+        delay.ShouldBe(TimeSpan.FromMilliseconds(250));
+    }
+
+    [Fact]
+    public void Mirror_tracks_sequence_gap()
+    {
+        // Go reference: server/stream.go:2863-3014 processInboundMirrorMsg — gap detection
+        var mirror = MirrorCoordinatorTestHelper.Create();
+
+        mirror.RecordSourceSeq(1);
+        mirror.RecordSourceSeq(2);
+        mirror.RecordSourceSeq(5); // gap: 3, 4 missing
+
+        mirror.HasGap.ShouldBeTrue();
+        mirror.GapStart.ShouldBe(3UL);
+        mirror.GapEnd.ShouldBe(4UL);
+    }
+
+    [Fact]
+    public void Mirror_tracks_error_state()
+    {
+        // Go reference: server/stream.go mirror error state tracking
+        var mirror = MirrorCoordinatorTestHelper.Create();
+
+        mirror.SetError("connection refused");
+        mirror.HasError.ShouldBeTrue();
+        mirror.ErrorMessage.ShouldBe("connection refused");
+
+        mirror.ClearError();
+        mirror.HasError.ShouldBeFalse();
+    }
+
+    [Fact]
+    public void Source_dedup_window_prunes_expired()
+    {
+        // Go reference: server/stream.go duplicate window pruning
+        var source = SourceCoordinatorTestHelper.Create();
+
+        source.RecordMsgId("msg-1");
+        source.RecordMsgId("msg-2");
+
+        source.IsDuplicate("msg-1").ShouldBeTrue();
+        source.IsDuplicate("msg-3").ShouldBeFalse();
+
+        // Simulate time passing beyond dedup window
+        source.PruneDedupWindow(DateTimeOffset.UtcNow.AddMinutes(5));
+        source.IsDuplicate("msg-1").ShouldBeFalse();
+    }
+}
+
+public static class MirrorCoordinatorTestHelper
+{
+    public static MirrorCoordinator Create()
+    {
+        var store = Substitute.For<IStreamStore>(); // For<T> cannot infer T; MirrorCoordinator's constructor takes IStreamStore
+        return new MirrorCoordinator(store);
+    }
+}
+
+public static class SourceCoordinatorTestHelper
+{
+    public static SourceCoordinator Create()
+    {
+        var store = Substitute.For<IStreamStore>(); // NOTE(review): assumes SourceCoordinator also takes IStreamStore — confirm
+        var config = new StreamSourceConfig { Name = "test-source", DuplicateWindowMs = 60_000 };
+        return new SourceCoordinator(store, config);
+    }
+}