diff --git a/src/NATS.Server/JetStream/MirrorSource/MirrorCoordinator.cs b/src/NATS.Server/JetStream/MirrorSource/MirrorCoordinator.cs
index 795a797..0a61004 100644
--- a/src/NATS.Server/JetStream/MirrorSource/MirrorCoordinator.cs
+++ b/src/NATS.Server/JetStream/MirrorSource/MirrorCoordinator.cs
@@ -58,12 +58,44 @@ public sealed class MirrorCoordinator : IAsyncDisposable
///
public ulong Lag { get; private set; }
+ // -------------------------------------------------------------------------
+ // Gap detection properties
+ // Go reference: server/stream.go:2863-3014 (processInboundMirrorMsg gap handling)
+ // -------------------------------------------------------------------------
+
+ /// Whether a sequence gap has been detected in the origin stream.
+ public bool HasGap => _hasGap;
+
+ /// First missing sequence number when a gap is detected.
+ public ulong GapStart => _gapStart;
+
+ /// Last missing sequence number when a gap is detected.
+ public ulong GapEnd => _gapEnd;
+
+ // -------------------------------------------------------------------------
+ // Error state properties
+ // -------------------------------------------------------------------------
+
+ /// Whether the coordinator is in an error state.
+ public bool HasError => _errorMessage is not null;
+
+ /// Current error message, or null if no error.
+ public string? ErrorMessage => _errorMessage;
+
// Go: mirror.sseq — stream sequence tracking for gap detection
private ulong _expectedOriginSeq;
// Go: mirror.dseq — delivery sequence tracking
private ulong _deliverySeq;
+ // Gap detection state
+ private ulong _gapStart;
+ private ulong _gapEnd;
+ private bool _hasGap;
+
+ // Error state tracking
+ private string? _errorMessage;
+
public MirrorCoordinator(IStreamStore targetStore)
{
_targetStore = targetStore;
@@ -175,6 +207,74 @@ public sealed class MirrorCoordinator : IAsyncDisposable
}
}
+ // -------------------------------------------------------------------------
+ // Public retry / failure tracking API
+ // Go reference: server/stream.go:3478-3505 (calculateRetryBackoff),
+ // server/stream.go:3125-3400 (setupMirrorConsumer retry logic)
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Increments the consecutive failure counter.
+ /// Call this when a sync attempt fails.
+ ///
+ public void RecordFailure()
+ {
+ lock (_gate) _consecutiveFailures++;
+ }
+
+ ///
+ /// Resets the consecutive failure counter to zero.
+ /// Call this when a sync attempt succeeds.
+ ///
+ public void RecordSuccess()
+ {
+ lock (_gate) _consecutiveFailures = 0;
+ }
+
+ ///
+ /// Returns the exponential backoff delay for the current failure count,
+ /// without jitter so that tests get deterministic results.
+ /// Go reference: server/stream.go:3478-3505 (calculateRetryBackoff)
+ ///
+ public TimeSpan GetRetryDelay()
+ {
+ var failures = ConsecutiveFailures;
+ if (failures == 0) return InitialRetryDelay;
+ var baseDelay = InitialRetryDelay.TotalMilliseconds * Math.Pow(2, Math.Min(failures - 1, 10));
+ var capped = Math.Min(baseDelay, MaxRetryDelay.TotalMilliseconds);
+ return TimeSpan.FromMilliseconds(capped);
+ }
+
+ // -------------------------------------------------------------------------
+ // Sequence gap detection
+ // Go reference: server/stream.go:2863-3014 (processInboundMirrorMsg)
+ // -------------------------------------------------------------------------
+
+ ///
+ /// Records the next received sequence number from the origin stream.
+ /// Sets gap state when a gap (skipped sequences) is detected.
+ ///
+ public void RecordSourceSeq(ulong seq)
+ {
+ if (_expectedOriginSeq > 0 && seq > _expectedOriginSeq + 1)
+ {
+ _hasGap = true;
+ _gapStart = _expectedOriginSeq + 1;
+ _gapEnd = seq - 1;
+ }
+ _expectedOriginSeq = seq;
+ }
+
+ // -------------------------------------------------------------------------
+ // Error state management
+ // -------------------------------------------------------------------------
+
+ /// Sets the coordinator into an error state with the given message.
+ public void SetError(string message) => _errorMessage = message;
+
+ /// Clears the error state.
+ public void ClearError() => _errorMessage = null;
+
///
/// Reports current health state for monitoring.
/// Go reference: server/stream.go:2739-2743 (mirrorInfo), 2698-2736 (sourceInfo)
diff --git a/src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs b/src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs
index 669c72e..57d30b5 100644
--- a/src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs
+++ b/src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs
@@ -411,17 +411,41 @@ public sealed class SourceCoordinator : IAsyncDisposable
// Deduplication helpers
// -------------------------------------------------------------------------
- private bool IsDuplicate(string msgId)
+ ///
+ /// Returns true if the given message ID is already present in the dedup window.
+ /// Go reference: server/stream.go duplicate window check
+ ///
+ public bool IsDuplicate(string msgId)
{
PruneDedupWindowIfNeeded();
return _dedupWindow.ContainsKey(msgId);
}
- private void RecordMsgId(string msgId)
+ ///
+ /// Records a message ID in the dedup window with the current timestamp.
+ /// Go reference: server/stream.go duplicate window tracking
+ ///
+ public void RecordMsgId(string msgId)
{
_dedupWindow[msgId] = DateTime.UtcNow;
}
+ ///
+ /// Removes all dedup window entries whose timestamps are earlier than the given cutoff.
+ /// This is the explicit-cutoff variant intended for testing, as opposed to the internal
+ /// time-based pruning done by .
+ /// Go reference: server/stream.go duplicate window pruning
+ ///
+ public void PruneDedupWindow(DateTimeOffset cutoff)
+ {
+ var cutoffDt = cutoff.UtcDateTime;
+ foreach (var kvp in _dedupWindow)
+ {
+ if (kvp.Value < cutoffDt)
+ _dedupWindow.TryRemove(kvp.Key, out _);
+ }
+ }
+
private void PruneDedupWindowIfNeeded()
{
if (_sourceConfig.DuplicateWindowMs <= 0)
diff --git a/tests/NATS.Server.Tests/MirrorSourceRetryTests.cs b/tests/NATS.Server.Tests/MirrorSourceRetryTests.cs
new file mode 100644
index 0000000..54efd06
--- /dev/null
+++ b/tests/NATS.Server.Tests/MirrorSourceRetryTests.cs
@@ -0,0 +1,110 @@
+using NSubstitute;
+using NATS.Server.JetStream.MirrorSource;
+using NATS.Server.JetStream.Models;
+using NATS.Server.JetStream.Storage;
+
+namespace NATS.Server.Tests;
+
+// Go reference: server/stream.go:3478-3505 (calculateRetryBackoff),
+// server/stream.go:3125-3400 (setupMirrorConsumer retry logic)
+
+public class MirrorSourceRetryTests
+{
+ [Fact]
+ public void Mirror_retry_uses_exponential_backoff()
+ {
+ // Go reference: server/stream.go:3478-3505 calculateRetryBackoff
+ var mirror = MirrorCoordinatorTestHelper.Create();
+
+ mirror.RecordFailure();
+ var delay1 = mirror.GetRetryDelay();
+ delay1.ShouldBeGreaterThanOrEqualTo(TimeSpan.FromMilliseconds(250));
+
+ mirror.RecordFailure();
+ var delay2 = mirror.GetRetryDelay();
+ delay2.ShouldBeGreaterThan(delay1);
+
+ // Cap at max
+ for (int i = 0; i < 20; i++) mirror.RecordFailure();
+ var delayMax = mirror.GetRetryDelay();
+ delayMax.ShouldBeLessThanOrEqualTo(TimeSpan.FromSeconds(30));
+ }
+
+ [Fact]
+ public void Mirror_success_resets_backoff()
+ {
+ // Go reference: server/stream.go setupMirrorConsumer — success resets retry
+ var mirror = MirrorCoordinatorTestHelper.Create();
+
+ for (int i = 0; i < 5; i++) mirror.RecordFailure();
+ mirror.RecordSuccess();
+
+ var delay = mirror.GetRetryDelay();
+ delay.ShouldBe(TimeSpan.FromMilliseconds(250));
+ }
+
+ [Fact]
+ public void Mirror_tracks_sequence_gap()
+ {
+ // Go reference: server/stream.go:2863-3014 processInboundMirrorMsg — gap detection
+ var mirror = MirrorCoordinatorTestHelper.Create();
+
+ mirror.RecordSourceSeq(1);
+ mirror.RecordSourceSeq(2);
+ mirror.RecordSourceSeq(5); // gap: 3, 4 missing
+
+ mirror.HasGap.ShouldBeTrue();
+ mirror.GapStart.ShouldBe(3UL);
+ mirror.GapEnd.ShouldBe(4UL);
+ }
+
+ [Fact]
+ public void Mirror_tracks_error_state()
+ {
+ // Go reference: server/stream.go mirror error state tracking
+ var mirror = MirrorCoordinatorTestHelper.Create();
+
+ mirror.SetError("connection refused");
+ mirror.HasError.ShouldBeTrue();
+ mirror.ErrorMessage.ShouldBe("connection refused");
+
+ mirror.ClearError();
+ mirror.HasError.ShouldBeFalse();
+ }
+
+ [Fact]
+ public void Source_dedup_window_prunes_expired()
+ {
+ // Go reference: server/stream.go duplicate window pruning
+ var source = SourceCoordinatorTestHelper.Create();
+
+ source.RecordMsgId("msg-1");
+ source.RecordMsgId("msg-2");
+
+ source.IsDuplicate("msg-1").ShouldBeTrue();
+ source.IsDuplicate("msg-3").ShouldBeFalse();
+
+ // Simulate time passing beyond dedup window
+ source.PruneDedupWindow(DateTimeOffset.UtcNow.AddMinutes(5));
+ source.IsDuplicate("msg-1").ShouldBeFalse();
+ }
+}
+
+public static class MirrorCoordinatorTestHelper
+{
+ public static MirrorCoordinator Create()
+ {
+ var store = Substitute.For();
+ return new MirrorCoordinator(store);
+ }
+}
+
+public static class SourceCoordinatorTestHelper
+{
+ public static SourceCoordinator Create()
+ {
+ var store = Substitute.For();
+ var config = new StreamSourceConfig { Name = "test-source", DuplicateWindowMs = 60_000 };
+ return new SourceCoordinator(store, config);
+ }
+}