feat(mirror): add exponential backoff retry, gap detection, and error tracking
Exposes public RecordFailure/RecordSuccess/GetRetryDelay (no-jitter, deterministic) on MirrorCoordinator, plus RecordSourceSeq with HasGap/GapStart/GapEnd properties and SetError/ClearError/HasError/ErrorMessage for error state. Makes IsDuplicate and RecordMsgId public on SourceCoordinator and adds PruneDedupWindow(DateTimeOffset) for explicit-cutoff dedup window pruning. Adds 5 unit tests in MirrorSourceRetryTests.
This commit is contained in:
@@ -58,12 +58,44 @@ public sealed class MirrorCoordinator : IAsyncDisposable
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public ulong Lag { get; private set; }
|
public ulong Lag { get; private set; }
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// Gap detection properties
|
||||||
|
// Go reference: server/stream.go:2863-3014 (processInboundMirrorMsg gap handling)
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
/// <summary>True once <c>RecordSourceSeq</c> has observed a skipped origin-stream sequence.</summary>
|
||||||
|
public bool HasGap => _hasGap;
|
||||||
|
|
||||||
|
/// <summary>First missing sequence number (inclusive) of the most recently detected gap; only meaningful while <see cref="HasGap"/> is true.</summary>
|
||||||
|
public ulong GapStart => _gapStart;
|
||||||
|
|
||||||
|
/// <summary>Last missing sequence number (inclusive) of the most recently detected gap; only meaningful while <see cref="HasGap"/> is true.</summary>
|
||||||
|
public ulong GapEnd => _gapEnd;
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// Error state properties
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
/// <summary>Whether the coordinator is in an error state, i.e. an error message is currently stored.</summary>
|
||||||
|
public bool HasError => _errorMessage is not null;
|
||||||
|
|
||||||
|
/// <summary>Current error message as last passed to <c>SetError</c>, or null if no error is set.</summary>
|
||||||
|
public string? ErrorMessage => _errorMessage;
|
||||||
|
|
||||||
// Go: mirror.sseq — stream sequence tracking for gap detection
|
// Go: mirror.sseq — stream sequence tracking for gap detection
|
||||||
private ulong _expectedOriginSeq;
|
private ulong _expectedOriginSeq;
|
||||||
|
|
||||||
// Go: mirror.dseq — delivery sequence tracking
|
// Go: mirror.dseq — delivery sequence tracking
|
||||||
private ulong _deliverySeq;
|
private ulong _deliverySeq;
|
||||||
|
|
||||||
|
// Gap detection state
|
||||||
|
private ulong _gapStart;
|
||||||
|
private ulong _gapEnd;
|
||||||
|
private bool _hasGap;
|
||||||
|
|
||||||
|
// Error state tracking
|
||||||
|
private string? _errorMessage;
|
||||||
|
|
||||||
public MirrorCoordinator(IStreamStore targetStore)
|
public MirrorCoordinator(IStreamStore targetStore)
|
||||||
{
|
{
|
||||||
_targetStore = targetStore;
|
_targetStore = targetStore;
|
||||||
@@ -175,6 +207,74 @@ public sealed class MirrorCoordinator : IAsyncDisposable
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// Public retry / failure tracking API
|
||||||
|
// Go reference: server/stream.go:3478-3505 (calculateRetryBackoff),
|
||||||
|
// server/stream.go:3125-3400 (setupMirrorConsumer retry logic)
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
/// <summary>
/// Adds one to the consecutive-failure counter while holding the coordinator gate.
/// Intended to be invoked after a sync attempt has failed.
/// </summary>
public void RecordFailure()
{
    lock (_gate)
    {
        _consecutiveFailures += 1;
    }
}
|
||||||
|
|
||||||
|
/// <summary>
/// Sets the consecutive-failure counter back to zero while holding the coordinator gate.
/// Intended to be invoked after a sync attempt has succeeded.
/// </summary>
public void RecordSuccess()
{
    lock (_gate)
    {
        _consecutiveFailures = 0;
    }
}
|
||||||
|
|
||||||
|
/// <summary>
/// Computes the jitter-free exponential backoff delay for the current
/// consecutive-failure count, so repeated calls with the same count give the same result.
/// Go reference: server/stream.go:3478-3505 (calculateRetryBackoff)
/// </summary>
/// <returns>
/// <c>InitialRetryDelay</c> for zero or one failure; otherwise the initial delay doubled once
/// per additional failure (exponent clamped to 10) and capped at <c>MaxRetryDelay</c>.
/// </returns>
public TimeSpan GetRetryDelay()
{
    var failureCount = ConsecutiveFailures;
    if (failureCount == 0)
    {
        return InitialRetryDelay;
    }

    // The exponent is clamped so the multiplier never grows beyond 2^10.
    var exponent = Math.Min(failureCount - 1, 10);
    var uncappedMs = InitialRetryDelay.TotalMilliseconds * Math.Pow(2, exponent);
    var delayMs = Math.Min(uncappedMs, MaxRetryDelay.TotalMilliseconds);
    return TimeSpan.FromMilliseconds(delayMs);
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// Sequence gap detection
|
||||||
|
// Go reference: server/stream.go:2863-3014 (processInboundMirrorMsg)
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
/// <summary>
/// Records a sequence number received from the origin stream and flags a gap
/// when one or more sequences were skipped since the previously recorded one.
/// </summary>
/// <param name="seq">Origin-stream sequence of the message that was just received.</param>
/// <remarks>
/// When a gap is found, <c>GapStart</c>/<c>GapEnd</c> are set to the inclusive range of
/// missing sequences. A sequence that is not newer than the last recorded one is ignored,
/// so a stale or redelivered message cannot rewind the tracker and cause a false gap later.
/// This method never clears a previously detected gap.
/// </remarks>
public void RecordSourceSeq(ulong seq)
{
    var lastSeq = _expectedOriginSeq;

    // lastSeq == 0 means nothing has been recorded yet, so any starting sequence is accepted.
    if (lastSeq > 0)
    {
        // Stale or duplicate delivery: keep the tracker where it is. Rewinding it would
        // make the next in-order message look like it skipped sequences.
        if (seq <= lastSeq)
        {
            return;
        }

        // seq > lastSeq here, so the subtraction cannot underflow; this form also avoids
        // the wrap-around that lastSeq + 1 would hit at ulong.MaxValue.
        if (seq - lastSeq > 1)
        {
            _hasGap = true;
            _gapStart = lastSeq + 1;
            _gapEnd = seq - 1;
        }
    }

    _expectedOriginSeq = seq;
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// Error state management
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
/// <summary>
/// Puts the coordinator into an error state by storing <paramref name="message"/>
/// as the current error message.
/// </summary>
/// <param name="message">Description of the error.</param>
public void SetError(string message)
{
    _errorMessage = message;
}
|
||||||
|
|
||||||
|
/// <summary>
/// Leaves the error state by discarding the stored error message.
/// </summary>
public void ClearError()
{
    _errorMessage = null;
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Reports current health state for monitoring.
|
/// Reports current health state for monitoring.
|
||||||
/// Go reference: server/stream.go:2739-2743 (mirrorInfo), 2698-2736 (sourceInfo)
|
/// Go reference: server/stream.go:2739-2743 (mirrorInfo), 2698-2736 (sourceInfo)
|
||||||
|
|||||||
@@ -411,17 +411,41 @@ public sealed class SourceCoordinator : IAsyncDisposable
|
|||||||
// Deduplication helpers
|
// Deduplication helpers
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
|
|
||||||
private bool IsDuplicate(string msgId)
|
/// <summary>
|
||||||
|
/// Returns true if the given message ID is present in the dedup window.
/// Calls <see cref="PruneDedupWindowIfNeeded"/> before performing the lookup.
|
||||||
|
/// Go reference: server/stream.go duplicate window check
|
||||||
|
/// </summary>
|
||||||
|
public bool IsDuplicate(string msgId)
|
||||||
{
|
{
|
||||||
PruneDedupWindowIfNeeded();
|
PruneDedupWindowIfNeeded();
|
||||||
return _dedupWindow.ContainsKey(msgId);
|
return _dedupWindow.ContainsKey(msgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void RecordMsgId(string msgId)
|
/// <summary>
|
||||||
|
/// Stores the current UTC time under the given message ID in the dedup window,
/// overwriting any timestamp previously stored for the same ID.
|
||||||
|
/// Go reference: server/stream.go duplicate window tracking
|
||||||
|
/// </summary>
|
||||||
|
public void RecordMsgId(string msgId)
|
||||||
{
|
{
|
||||||
_dedupWindow[msgId] = DateTime.UtcNow;
|
_dedupWindow[msgId] = DateTime.UtcNow;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
/// Removes all dedup window entries whose timestamps are earlier than the given cutoff.
/// This is the explicit-cutoff variant intended for testing, as opposed to the internal
/// time-based pruning done by <see cref="PruneDedupWindowIfNeeded"/>.
/// Go reference: server/stream.go duplicate window pruning
/// </summary>
/// <param name="cutoff">
/// Entries stamped strictly before this instant are removed. It is converted to UTC
/// before comparison, matching the UTC timestamps written by <see cref="RecordMsgId"/>.
/// </param>
public void PruneDedupWindow(DateTimeOffset cutoff)
{
    var cutoffUtc = cutoff.UtcDateTime;

    // NOTE(review): assumes _dedupWindow is a ConcurrentDictionary<string, DateTime>
    // (its declaration is outside this view) — confirm.
    foreach (var entry in _dedupWindow)
    {
        if (entry.Value >= cutoffUtc)
        {
            continue;
        }

        // Remove only if the stored timestamp is still the stale one we inspected.
        // Removing by key alone could drop an ID that was re-recorded with a fresh
        // timestamp between the check above and the removal.
        _dedupWindow.TryRemove(entry);
    }
}
|
||||||
|
|
||||||
private void PruneDedupWindowIfNeeded()
|
private void PruneDedupWindowIfNeeded()
|
||||||
{
|
{
|
||||||
if (_sourceConfig.DuplicateWindowMs <= 0)
|
if (_sourceConfig.DuplicateWindowMs <= 0)
|
||||||
|
|||||||
110
tests/NATS.Server.Tests/MirrorSourceRetryTests.cs
Normal file
110
tests/NATS.Server.Tests/MirrorSourceRetryTests.cs
Normal file
@@ -0,0 +1,110 @@
|
|||||||
|
using NSubstitute;
|
||||||
|
using NATS.Server.JetStream.MirrorSource;
|
||||||
|
using NATS.Server.JetStream.Models;
|
||||||
|
using NATS.Server.JetStream.Storage;
|
||||||
|
|
||||||
|
namespace NATS.Server.Tests;
|
||||||
|
|
||||||
|
// Go reference: server/stream.go:3478-3505 (calculateRetryBackoff),
|
||||||
|
// server/stream.go:3125-3400 (setupMirrorConsumer retry logic)
|
||||||
|
|
||||||
|
/// <summary>
/// Unit tests for the retry backoff, sequence-gap detection and error-state APIs of
/// <see cref="MirrorCoordinator"/>, and for dedup-window pruning on <see cref="SourceCoordinator"/>.
/// </summary>
public class MirrorSourceRetryTests
{
    [Fact]
    public void Mirror_retry_uses_exponential_backoff()
    {
        // Go reference: server/stream.go:3478-3505 calculateRetryBackoff
        var mirror = MirrorCoordinatorTestHelper.Create();

        mirror.RecordFailure();
        var delay1 = mirror.GetRetryDelay();
        delay1.ShouldBeGreaterThanOrEqualTo(TimeSpan.FromMilliseconds(250));

        mirror.RecordFailure();
        var delay2 = mirror.GetRetryDelay();
        delay2.ShouldBeGreaterThan(delay1);
        // GetRetryDelay is jitter-free, so the second failure should exactly double the
        // first delay (assumes the cap is not already reached after two failures).
        delay2.ShouldBe(delay1 + delay1);

        // Cap at max
        for (int i = 0; i < 20; i++)
        {
            mirror.RecordFailure();
        }

        var delayMax = mirror.GetRetryDelay();
        delayMax.ShouldBeLessThanOrEqualTo(TimeSpan.FromSeconds(30));
        // Backoff must never shrink as failures accumulate.
        delayMax.ShouldBeGreaterThanOrEqualTo(delay2);
    }

    [Fact]
    public void Mirror_success_resets_backoff()
    {
        // Go reference: server/stream.go setupMirrorConsumer — success resets retry
        var mirror = MirrorCoordinatorTestHelper.Create();

        for (int i = 0; i < 5; i++)
        {
            mirror.RecordFailure();
        }

        mirror.RecordSuccess();

        var delay = mirror.GetRetryDelay();
        delay.ShouldBe(TimeSpan.FromMilliseconds(250));
    }

    [Fact]
    public void Mirror_tracks_sequence_gap()
    {
        // Go reference: server/stream.go:2863-3014 processInboundMirrorMsg — gap detection
        var mirror = MirrorCoordinatorTestHelper.Create();

        mirror.RecordSourceSeq(1);
        mirror.RecordSourceSeq(2);
        // Contiguous sequences must not be reported as a gap.
        mirror.HasGap.ShouldBeFalse();

        mirror.RecordSourceSeq(5); // gap: 3, 4 missing

        mirror.HasGap.ShouldBeTrue();
        mirror.GapStart.ShouldBe(3UL);
        mirror.GapEnd.ShouldBe(4UL);
    }

    [Fact]
    public void Mirror_tracks_error_state()
    {
        // Go reference: server/stream.go mirror error state tracking
        var mirror = MirrorCoordinatorTestHelper.Create();

        mirror.SetError("connection refused");
        mirror.HasError.ShouldBeTrue();
        mirror.ErrorMessage.ShouldBe("connection refused");

        mirror.ClearError();
        mirror.HasError.ShouldBeFalse();
        // Clearing must also drop the stored message, not just flip the flag.
        mirror.ErrorMessage.ShouldBeNull();
    }

    [Fact]
    public void Source_dedup_window_prunes_expired()
    {
        // Go reference: server/stream.go duplicate window pruning
        var source = SourceCoordinatorTestHelper.Create();

        source.RecordMsgId("msg-1");
        source.RecordMsgId("msg-2");

        source.IsDuplicate("msg-1").ShouldBeTrue();
        source.IsDuplicate("msg-3").ShouldBeFalse();

        // Simulate time passing beyond dedup window
        source.PruneDedupWindow(DateTimeOffset.UtcNow.AddMinutes(5));
        source.IsDuplicate("msg-1").ShouldBeFalse();
        // Every entry older than the cutoff must be gone, not only the first one.
        source.IsDuplicate("msg-2").ShouldBeFalse();
    }
}
|
||||||
|
|
||||||
|
/// <summary>
/// Test factory for <see cref="MirrorCoordinator"/> instances.
/// </summary>
public static class MirrorCoordinatorTestHelper
{
    /// <summary>
    /// Builds a coordinator whose target store is an NSubstitute stand-in for <see cref="IStreamStore"/>.
    /// </summary>
    public static MirrorCoordinator Create()
        => new MirrorCoordinator(Substitute.For<IStreamStore>());
}
|
||||||
|
|
||||||
|
/// <summary>
/// Test factory for <see cref="SourceCoordinator"/> instances.
/// </summary>
public static class SourceCoordinatorTestHelper
{
    /// <summary>
    /// Builds a coordinator over a substitute <see cref="IStreamStore"/> using a source
    /// named "test-source" with a 60,000 ms duplicate window.
    /// </summary>
    public static SourceCoordinator Create()
    {
        var streamStore = Substitute.For<IStreamStore>();
        var sourceConfig = new StreamSourceConfig
        {
            Name = "test-source",
            DuplicateWindowMs = 60_000,
        };

        return new SourceCoordinator(streamStore, sourceConfig);
    }
}
|
||||||
Reference in New Issue
Block a user