From 856cd1755489e44f315e9481d5a601282c21b76d Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 23:02:51 -0500 Subject: [PATCH] batch36 task4 stream mirror lifecycle helpers --- .../JetStream/NatsStream.Lifecycle.cs | 14 +- .../JetStream/NatsStream.Mirror.cs | 270 ++++++++++++++++++ .../JetStream/NatsStream.cs | 7 +- .../JetStream/StreamLifecycleGroupBTests.cs | 38 +++ porting.db | Bin 6754304 -> 6754304 bytes 5 files changed, 315 insertions(+), 14 deletions(-) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Mirror.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs index cda43de..bd444d3 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Lifecycle.cs @@ -185,20 +185,10 @@ internal sealed partial class NatsStream }; internal Exception? Update(StreamConfig config) - { - if (config == null) - return new ArgumentNullException(nameof(config)); - - UpdateConfig(config); - Exception? error = null; - return error; - } + => UpdateWithAdvisory(config, sendAdvisory: true, pedantic: false); internal Exception? UpdatePedantic(StreamConfig config, bool pedantic) - { - _ = pedantic; - return Update(config); - } + => UpdateWithAdvisory(config, sendAdvisory: true, pedantic); internal StreamAssignment? StreamAssignment() { diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Mirror.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Mirror.cs new file mode 100644 index 0000000..2bb4736 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Mirror.cs @@ -0,0 +1,270 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + private static readonly TimeSpan RetryBackoff = TimeSpan.FromSeconds(5); + private static readonly TimeSpan RetryMaximum = TimeSpan.FromMinutes(2); + private static readonly TimeSpan SourceConsumerRetryThreshold = TimeSpan.FromSeconds(10); + + internal Exception? UpdateWithAdvisory(StreamConfig config, bool sendAdvisory, bool pedantic) + { + _ = pedantic; + + if (config == null) + return new ArgumentNullException(nameof(config)); + + UpdateConfig(config); + Exception? error = null; + if (sendAdvisory) + SendUpdateAdvisoryLocked(); + + return error; + } + + internal string GetCfgName() + { + _mu.EnterReadLock(); + try + { + return Config.Name ?? string.Empty; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal (ulong Purged, Exception? Error) PurgeLocked(StreamPurgeRequest? request, bool needLock) + { + if (needLock) + _mu.EnterWriteLock(); + + try + { + if (_closed) + return (0UL, new InvalidOperationException("stream closed")); + + if (_sealed) + return (0UL, new InvalidOperationException("sealed stream")); + + if (Store == null) + return (0UL, new InvalidOperationException("stream store unavailable")); + + var result = request == null + ? Store.Purge() + : Store.PurgeEx(request.Filter ?? string.Empty, request.Sequence, request.Keep); + SyncCountersFromState(Store.State()); + return result; + } + finally + { + if (needLock) + _mu.ExitWriteLock(); + } + } + + internal (bool Removed, Exception? Error) RemoveMsg(ulong sequence) + => DeleteMsg(sequence); + + internal (bool Removed, Exception? Error) DeleteMsg(ulong sequence) + { + _mu.EnterWriteLock(); + try + { + if (_closed) + return (false, new InvalidOperationException("stream closed")); + + if (Store == null) + return (false, new InvalidOperationException("stream store unavailable")); + + var result = Store.RemoveMsg(sequence); + if (result.Error == null) + SyncCountersFromState(Store.State()); + return result; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal (bool Removed, Exception? Error) EraseMsg(ulong sequence) + { + _mu.EnterWriteLock(); + try + { + if (_closed) + return (false, new InvalidOperationException("stream closed")); + + if (Store == null) + return (false, new InvalidOperationException("stream store unavailable")); + + var result = Store.EraseMsg(sequence); + if (result.Error == null) + SyncCountersFromState(Store.State()); + return result; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool IsMirror() + { + _mu.EnterReadLock(); + try + { + return _isMirror || Config.Mirror != null; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal StreamSourceInfo[] SourcesInfo() + { + _mu.EnterReadLock(); + try + { + return _sources.Values.Select(SourceInfo).Where(static info => info != null).Cast().ToArray(); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal StreamSourceInfo? SourceInfo(StreamSourceInfo? sourceInfo) + { + if (sourceInfo == null) + return null; + + return new StreamSourceInfo + { + Name = sourceInfo.Name, + Lag = sourceInfo.Lag, + FilterSubject = sourceInfo.FilterSubject, + Active = sourceInfo.Active, + Error = sourceInfo.Error, + External = sourceInfo.External == null + ? null + : new StreamSource + { + Name = sourceInfo.External.Name, + FilterSubject = sourceInfo.External.FilterSubject, + SubjectTransforms = sourceInfo.External.SubjectTransforms, + External = sourceInfo.External.External, + }, + }; + } + + internal StreamSourceInfo? MirrorInfo() + { + _mu.EnterReadLock(); + try + { + return SourceInfo(_mirrorInfo); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SetMirrorErr(JsApiError? error) + { + _mu.EnterWriteLock(); + try + { + if (_mirrorInfo != null) + _mirrorInfo.Error = error?.Description; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void CancelMirrorConsumer() + { + _mu.EnterWriteLock(); + try + { + _mirrorConsumerSetupTimer?.Dispose(); + _mirrorConsumerSetupTimer = null; + if (_mirrorInfo != null) + { + _mirrorInfo.Active = null; + _mirrorInfo.Error = null; + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal Exception? RetryMirrorConsumer() + { + CancelMirrorConsumer(); + Exception? error = null; + return error; + } + + internal void SkipMsgs(ulong start, ulong end) + { + _mu.EnterWriteLock(); + try + { + if (Store == null || start > end) + return; + + var count = (end - start) + 1; + Store.SkipMsgs(start, count); + SetLastSeq(end); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal static TimeSpan CalculateRetryBackoff(int failures) + { + var backoff = TimeSpan.FromTicks(RetryBackoff.Ticks * Math.Max(1, failures * 2)); + return backoff > RetryMaximum ? RetryMaximum : backoff; + } + + internal void ScheduleSetupMirrorConsumerRetry() + { + _mu.EnterWriteLock(); + try + { + var lastAttempt = _mirrorInfo?.Active ?? DateTime.UtcNow - SourceConsumerRetryThreshold; + var next = SourceConsumerRetryThreshold - (DateTime.UtcNow - lastAttempt); + if (next < TimeSpan.Zero) + next = TimeSpan.Zero; + + var failures = _mirrorInfo == null ? 1 : (int)Math.Min(_mirrorInfo.Lag, int.MaxValue); + next += CalculateRetryBackoff(failures); + next += TimeSpan.FromMilliseconds(Random.Shared.Next(100, 201)); + + _mirrorConsumerSetupTimer?.Dispose(); + _mirrorConsumerSetupTimer = new Timer( + static state => + { + if (state is NatsStream stream) + stream.RetryMirrorConsumer(); + }, + this, + next, + Timeout.InfiniteTimeSpan); + } + finally + { + _mu.ExitWriteLock(); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs index 74d1853..ff8b82d 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.cs @@ -37,7 +37,7 @@ internal sealed partial class NatsStream : IDisposable internal long FirstSeq; internal long LastSeq; - internal bool IsMirror; + private bool _isMirror; private bool _closed; private bool _isLeader; @@ -50,6 +50,9 @@ internal sealed partial class NatsStream : IDisposable private bool _clusterSubsActive; private ulong _clseq; private ulong _clfs; + private readonly Dictionary _sources = new(StringComparer.Ordinal); + private StreamSourceInfo? _mirrorInfo; + private Timer? _mirrorConsumerSetupTimer; /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; @@ -86,7 +89,7 @@ internal sealed partial class NatsStream : IDisposable var stream = new NatsStream(acc, cfg.Clone(), DateTime.UtcNow) { Store = store, - IsMirror = cfg.Mirror != null, + _isMirror = cfg.Mirror != null, }; return stream; } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs index 4d7cd08..f223741 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/StreamLifecycleGroupBTests.cs @@ -123,6 +123,44 @@ public sealed class StreamLifecycleGroupBTests jsa.SubjectsOverlap(["orders.created"], ownAssignment: null).ShouldBeTrue(); } + [Fact] + public void GroupCHelpers_GetCfgNameAndIsMirror_ReturnExpectedValues() + { + var mirrorCfg = new StreamConfig + { + Name = "MIRROR", + Storage = StorageType.MemoryStorage, + Mirror = new StreamSource { Name = "ORIGIN" }, + }; + + var stream = CreateStream(mirrorCfg); + + stream.GetCfgName().ShouldBe("MIRROR"); + stream.IsMirror().ShouldBeTrue(); + stream.MirrorInfo().ShouldBeNull(); + } + + [Fact] + public void GroupCHelpers_SourceInfoAndBackoff_Behave() + { + var stream = CreateStream(); + var info = new StreamSourceInfo + { + Name = "SRC", + FilterSubject = "orders.*", + Lag = 10, + Error = "x", + }; + + var cloned = stream.SourceInfo(info); + cloned.ShouldNotBeNull(); + cloned!.Name.ShouldBe("SRC"); + cloned.FilterSubject.ShouldBe("orders.*"); + + NatsStream.CalculateRetryBackoff(1).ShouldBeGreaterThan(TimeSpan.Zero); + NatsStream.CalculateRetryBackoff(1000).ShouldBe(TimeSpan.FromMinutes(2)); + } + private static NatsStream CreateStream(StreamConfig? cfg = null) { cfg ??= new StreamConfig { Name = "ORDERS", Subjects = ["orders.*"], Storage = StorageType.MemoryStorage }; diff --git a/porting.db b/porting.db index 6d4b756427b758429191a76c2a2836fdc7b94621..3d5bfc76315348cd89da489886131b00922fc6d2 100644 GIT binary patch delta 1962 zcmZ9NYitx%6vub&&fJ+hyF0rrD;C&c%9cL1m9n(eR$8?x@~l`Y)Pg9xwcCoI;8u&E zrl&>^YSdsgJyLvBEl9AED(ZxnF?`UdiI_-yqLf52BvpYA#`wOrw0523m)!ro|2g;E zb8cp!?^!QMwoMD{@p$;19*?I)qC8ntsqE=Wyp>S;j}~WHJzOp>398mPYsmW8I%Mq$ zE)G^%J;8$D2SGQuI@lH58T>K$wOjg$a@tZhP|gV@Pbs9D6Uy`{{-ZJ=4|r|XkxH*^ zU7KnVgUR62o@}@Mgz`ka${+Q7>S}RzyE5I|@9p8g@nQZg|AZgmZ}MHDSVV*+JjV0J zHcFgS!iMj`z$JmAz*K@cEbP8Jhkd3{Xf^YP{JsC~(UNLSrPHYve~N~7k|H0 z79DS4S6P1MvBq8d-1(QXeuZ*3uxIGcQz47_Kcwzz)~=W?uVL8=ow`G&Q-2wIJ*vz4 zpH@pelo(Q)sJ@RCP<$wf>*%tdCcS!tG<`DVnNWSXWTeic89 zQ{o$OOnfAci2Y(v>=M1=aRH)Tw2GCYPFx|%M83!oKI1Rrtnr=kx$&WK*cdckHhPRl zjQflYMzgWPSZGumrN#^+Z0P!L`j7gM{;B?+zF!~c)L+oM^@sF(^mY2p`Z9fG{1<+jALmD9+uq{4`40XB@8WmyHol5q%WHW#FW^(SU;A4-r+u$|p?##iqrI;6 zYotA@-LGxbZqXXGMOuwETZ?KJXomW``jdK6J*K{|9#CIX`_!jYR5z=E_39e6LA^$; zQcFyehU!CM_e_#iGupkEEv2Hpva#KNg$mpyyVy!?ws(c$oy6`~lk_eh8(cp&Xc!wT z8ynQqbFZ>l?k{_psnDJM>|1x^dUh+L>vpooskxs;==e^yfQ}8Yl8lS?_pXgBZpf~U zJX?)EChxb{SdIJ{>o`S^5@%mnTDEt`z&?OunQ@-*HWpez;-eX4;I+_DB2W0pI@-Z z=G0PX=a0BT=acM;Z7<65D3t+9O!kIpU!i^RXhyl~EJ5;}!V!Dch`l1-wRU@9wnr)K zs;1;YD@@5E`@T#+Co^VGnUG0Ed}};rOQyT=m@{b+9hs~~DR+UF)4H_tzeDXvJB_1< z%4v5p%jwQDh>oPDMXB#)i&O0;XVyeR8Ex6* ztfKU7vLquB{@CQ?W*TDmIK!h6LfKx8YBoC;Wg70@>^w%@O|t2Ihb1To$+H^t)fOkS`>wdx8TO7IHq!b3LezCSH&Js%o=u6g6Qa*L9ofbS zmkYr`$VHfe5JkvCn2C^&P=HW~P=pXeC`KqjxCmhuLMg&*ggC++gffJ3gbIX<5iUWf zM5scz6k#qxH9`%-WeAreT!B!Fa3#W32v;M_L%0TEKEeWog$Rof79%V{s6)6GVJX6O z2=xfd5E>A!M_7)q0-+J%281Srl?ba4ZbVp(a1+AK2x|}$Z$nZ|sK=P)p?4Fh(riEY z)BYCg9bbp^p*2X`>O&E?b|_h=xPvE?37xT3ZuaS9Gov>}n_Y5v;(yJ5C_^30 tPzT5oZL#3KjLA2sShOW+yJ(A$Fxz5Vdxw*fX2~s*7D>wO9d2#S`v-PZV_pCN delta 4019 zcmc)Ne{dA#9mnzgaoJq<_V&&*MhRIq+$Fio8PkLx$?;2D8c{(iiGYCoxP-e%Dk4oZ zNTBXTBe7_ih?pmdkw%J?P(=*rsstt?wUsi;bXtebbcT$7bf!pW(oWmyA1%|*-NE;{ z{iA;y=KaU(p1s{?H~Vas)X1=$LVeS{qk^FPNVxIyN~B&K?T)`67Y~0K4e0#}2ABC< z`rq|Q{R@3uKjvHNd(s!v`+XI@&waLUy|3H%mhZ22^(FByx_E%yaYYP?cd~_7#2M2H zdIdpLys{y6CfnOu+meZb6U{hI*zUL@?pxuK`-M|>UXApG$o!4cQdZh1x$KEr=_|2B z*(*d7$<}0hdorQ+Ym%UoUbDw$OMjHCKH*vVYy?|uC;bd+AsvKPlHP!pkcOcc>2-)7 zbqFepwBfq7+Ht(W$QmU>nAfHry;8yzJbhlWV+KrfTthWPY<3GwNVLVWs1AwKq&!vYX~f1ES^W$(hba80FLX5aQFZAwHcCAU>V< zAwHc`5WkuyAwHWih|lIdO-@hetlMAYFKECJzqB8-{R^ZHaj~r|lmcRvde+;Nvzs#z z-_1D0zj@kjSR|bZ-kLWhY;au=lmR8KG$?gSL@8#a@nn?6<5CriPl{`;g!Zlev%by4 ztExI3{l6bJxx!y)Pc=&mq}$Sq@1i=9bH#tFPOyo5f3by6`7QDrh~FZ=hWIV=8N_dq z^ANv9K85%#at`9Z+fN|=yPZfsMQ*Mh?IW)YvVX;-#g=2>q(fs4z30#ghu(GQxI^za z^tMCCyk_paMjeAk(_+JQ(wwN0a}R5WAUaR&O^D7@8)28~q?n~_%lYhe$6kk&BmA0a zgAm`>0L1r&hg8lr>lcK)vvdVtgHDhRK!-@L=35tYcJqp3H+}gRh4ZxOK8PNF+D{>R z{An*i^!U^ELiG64ege_+Mtc#Wn@Q`!Ed|b~=eR`G9vQ+CJ zT9xWth*qUK2clJ}&W31Js>6tDnz%T^geG#cc5vg7)`g`BPH$6vA`C^U{i&%%H9kbGugyuMQ4|S#?v_iTSG=L zJ1`tFju*_J-8n4fYFs^8aJcE@Mfom!ZGdu7? zg)zngWonT519BeoMGS={!bW-K81Bl!$Ss0c?`82KxsaU<8*_7xh;74}%?jEE!vR4I zcQ0Vgr}aYC95Hq~w^Nu}X-vy(+*?~|pc!^;r5SjDy&5etSiIe=W~o28ee8{Pb7gj( z{>yHim2{X!=3DlC9p)8@1(#0`v*n%Uyv&E}K&RQmq|