diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs index cda43de..bd444d3 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs @@ -185,20 +185,10 @@ internal sealed partial class NatsStream }; internal Exception? Update(StreamConfig config) - { - if (config == null) - return new ArgumentNullException(nameof(config)); - - UpdateConfig(config); - Exception? error = null; - return error; - } + => UpdateWithAdvisory(config, sendAdvisory: true, pedantic: false); internal Exception? UpdatePedantic(StreamConfig config, bool pedantic) - { - _ = pedantic; - return Update(config); - } + => UpdateWithAdvisory(config, sendAdvisory: true, pedantic); internal StreamAssignment? StreamAssignment() { diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Mirror.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Mirror.cs new file mode 100644 index 0000000..2bb4736 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Mirror.cs @@ -0,0 +1,270 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + private static readonly TimeSpan RetryBackoff = TimeSpan.FromSeconds(5); + private static readonly TimeSpan RetryMaximum = TimeSpan.FromMinutes(2); + private static readonly TimeSpan SourceConsumerRetryThreshold = TimeSpan.FromSeconds(10); + + internal Exception? UpdateWithAdvisory(StreamConfig config, bool sendAdvisory, bool pedantic) + { + _ = pedantic; + + if (config == null) + return new ArgumentNullException(nameof(config)); + + UpdateConfig(config); + Exception? error = null; + if (sendAdvisory) + SendUpdateAdvisoryLocked(); + + return error; + } + + internal string GetCfgName() + { + _mu.EnterReadLock(); + try + { + return Config.Name ?? string.Empty; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal (ulong Purged, Exception? Error) PurgeLocked(StreamPurgeRequest? request, bool needLock) + { + if (needLock) + _mu.EnterWriteLock(); + + try + { + if (_closed) + return (0UL, new InvalidOperationException("stream closed")); + + if (_sealed) + return (0UL, new InvalidOperationException("sealed stream")); + + if (Store == null) + return (0UL, new InvalidOperationException("stream store unavailable")); + + var result = request == null + ? Store.Purge() + : Store.PurgeEx(request.Filter ?? string.Empty, request.Sequence, request.Keep); + SyncCountersFromState(Store.State()); + return result; + } + finally + { + if (needLock) + _mu.ExitWriteLock(); + } + } + + internal (bool Removed, Exception? Error) RemoveMsg(ulong sequence) + => DeleteMsg(sequence); + + internal (bool Removed, Exception? Error) DeleteMsg(ulong sequence) + { + _mu.EnterWriteLock(); + try + { + if (_closed) + return (false, new InvalidOperationException("stream closed")); + + if (Store == null) + return (false, new InvalidOperationException("stream store unavailable")); + + var result = Store.RemoveMsg(sequence); + if (result.Error == null) + SyncCountersFromState(Store.State()); + return result; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal (bool Removed, Exception? Error) EraseMsg(ulong sequence) + { + _mu.EnterWriteLock(); + try + { + if (_closed) + return (false, new InvalidOperationException("stream closed")); + + if (Store == null) + return (false, new InvalidOperationException("stream store unavailable")); + + var result = Store.EraseMsg(sequence); + if (result.Error == null) + SyncCountersFromState(Store.State()); + return result; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool IsMirror() + { + _mu.EnterReadLock(); + try + { + return _isMirror || Config.Mirror != null; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal StreamSourceInfo[] SourcesInfo() + { + _mu.EnterReadLock(); + try + { + return _sources.Values.Select(SourceInfo).Where(static info => info != null).Cast().ToArray(); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal StreamSourceInfo? SourceInfo(StreamSourceInfo? sourceInfo) + { + if (sourceInfo == null) + return null; + + return new StreamSourceInfo + { + Name = sourceInfo.Name, + Lag = sourceInfo.Lag, + FilterSubject = sourceInfo.FilterSubject, + Active = sourceInfo.Active, + Error = sourceInfo.Error, + External = sourceInfo.External == null + ? null + : new StreamSource + { + Name = sourceInfo.External.Name, + FilterSubject = sourceInfo.External.FilterSubject, + SubjectTransforms = sourceInfo.External.SubjectTransforms, + External = sourceInfo.External.External, + }, + }; + } + + internal StreamSourceInfo? MirrorInfo() + { + _mu.EnterReadLock(); + try + { + return SourceInfo(_mirrorInfo); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetMirrorErr(JsApiError? error) + { + _mu.EnterWriteLock(); + try + { + if (_mirrorInfo != null) + _mirrorInfo.Error = error?.Description; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void CancelMirrorConsumer() + { + _mu.EnterWriteLock(); + try + { + _mirrorConsumerSetupTimer?.Dispose(); + _mirrorConsumerSetupTimer = null; + if (_mirrorInfo != null) + { + _mirrorInfo.Active = null; + _mirrorInfo.Error = null; + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal Exception? RetryMirrorConsumer() + { + CancelMirrorConsumer(); + Exception? error = null; + return error; + } + + internal void SkipMsgs(ulong start, ulong end) + { + _mu.EnterWriteLock(); + try + { + if (Store == null || start > end) + return; + + var count = (end - start) + 1; + Store.SkipMsgs(start, count); + SetLastSeq(end); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal static TimeSpan CalculateRetryBackoff(int failures) + { + var backoff = TimeSpan.FromTicks(RetryBackoff.Ticks * Math.Max(1, failures * 2)); + return backoff > RetryMaximum ? RetryMaximum : backoff; + } + + internal void ScheduleSetupMirrorConsumerRetry() + { + _mu.EnterWriteLock(); + try + { + var lastAttempt = _mirrorInfo?.Active ?? DateTime.UtcNow - SourceConsumerRetryThreshold; + var next = SourceConsumerRetryThreshold - (DateTime.UtcNow - lastAttempt); + if (next < TimeSpan.Zero) + next = TimeSpan.Zero; + + var failures = _mirrorInfo == null ? 1 : (int)Math.Min(_mirrorInfo.Lag, int.MaxValue); + next += CalculateRetryBackoff(failures); + next += TimeSpan.FromMilliseconds(Random.Shared.Next(100, 201)); + + _mirrorConsumerSetupTimer?.Dispose(); + _mirrorConsumerSetupTimer = new Timer( + static state => + { + if (state is NatsStream stream) + stream.RetryMirrorConsumer(); + }, + this, + next, + Timeout.InfiniteTimeSpan); + } + finally + { + _mu.ExitWriteLock(); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs index 74d1853..ff8b82d 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs @@ -37,7 +37,7 @@ internal sealed partial class NatsStream : IDisposable internal long FirstSeq; internal long LastSeq; - internal bool IsMirror; + private bool _isMirror; private bool _closed; private bool _isLeader; @@ -50,6 +50,9 @@ internal sealed partial class NatsStream : IDisposable private bool _clusterSubsActive; private ulong _clseq; private ulong _clfs; + private readonly Dictionary _sources = new(StringComparer.Ordinal); + private StreamSourceInfo? _mirrorInfo; + private Timer? _mirrorConsumerSetupTimer; /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; @@ -86,7 +89,7 @@ internal sealed partial class NatsStream : IDisposable var stream = new NatsStream(acc, cfg.Clone(), DateTime.UtcNow) { Store = store, - IsMirror = cfg.Mirror != null, + _isMirror = cfg.Mirror != null, }; return stream; } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs index 4d7cd08..f223741 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs @@ -123,6 +123,44 @@ public sealed class StreamLifecycleGroupBTests jsa.SubjectsOverlap(["orders.created"], ownAssignment: null).ShouldBeTrue(); } + [Fact] + public void GroupCHelpers_GetCfgNameAndIsMirror_ReturnExpectedValues() + { + var mirrorCfg = new StreamConfig + { + Name = "MIRROR", + Storage = StorageType.MemoryStorage, + Mirror = new StreamSource { Name = "ORIGIN" }, + }; + + var stream = CreateStream(mirrorCfg); + + stream.GetCfgName().ShouldBe("MIRROR"); + stream.IsMirror().ShouldBeTrue(); + stream.MirrorInfo().ShouldBeNull(); + } + + [Fact] + public void GroupCHelpers_SourceInfoAndBackoff_Behave() + { + var stream = CreateStream(); + var info = new StreamSourceInfo + { + Name = "SRC", + FilterSubject = "orders.*", + Lag = 10, + Error = "x", + }; + + var cloned = stream.SourceInfo(info); + cloned.ShouldNotBeNull(); + cloned!.Name.ShouldBe("SRC"); + cloned.FilterSubject.ShouldBe("orders.*"); + + NatsStream.CalculateRetryBackoff(1).ShouldBeGreaterThan(TimeSpan.Zero); + NatsStream.CalculateRetryBackoff(1000).ShouldBe(TimeSpan.FromMinutes(2)); + } + private static NatsStream CreateStream(StreamConfig? cfg = null) { cfg ??= new StreamConfig { Name = "ORDERS", Subjects = ["orders.*"], Storage = StorageType.MemoryStorage }; diff --git a/porting.db b/porting.db index 6d4b756..3d5bfc7 100644 Binary files a/porting.db and b/porting.db differ