From 09f73a0d2fcf7a3dfead1c7d5a23f04be98c4b7f Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 1 Mar 2026 01:30:17 -0500 Subject: [PATCH] task6(batch39): implement shutdown and signal flow paths --- .../NatsConsumer.Dispatch.Shutdown.cs | 222 ++++++++++++++++++ .../JetStream/NatsConsumer.cs | 3 + .../ImplBacklog/ConcurrencyTests1.cs | 46 ++++ .../JetStreamClusterTests1.Impltests.cs | 27 +++ .../ImplBacklog/JwtProcessorTests.cs | 11 + .../ImplBacklog/RouteHandlerTests.cs | 24 ++ porting.db | Bin 6758400 -> 6758400 bytes 7 files changed, 333 insertions(+) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Shutdown.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Shutdown.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Shutdown.cs new file mode 100644 index 0000000..1f043c2 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.Shutdown.cs @@ -0,0 +1,222 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + internal void StopWithFlags(bool clearPending, bool clearAdvisories) + { + _mu.EnterWriteLock(); + try + { + _closed = true; + _quitCts?.Cancel(); + _deleteTimer = StopAndClearTimer(_deleteTimer); + _pendingTimer = StopAndClearTimer(_pendingTimer); + + if (clearPending) + ResetPendingDeliveries(); + + if (!clearAdvisories) + _ = SendDeleteAdvisoryLocked(); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal int CleanupNoInterestMessages() + { + _mu.EnterWriteLock(); + try + { + if (_state.Pending is not { Count: > 0 }) + return 0; + + var removed = _state.Pending.Count; + _state.Pending.Clear(); + _streamPending = 0; + return removed; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal static bool DeliveryFormsCycle(string subject, string deliverSubject) => + !string.IsNullOrWhiteSpace(subject) && + !string.IsNullOrWhiteSpace(deliverSubject) && + subject.StartsWith(deliverSubject, StringComparison.Ordinal); + + internal bool SwitchToEphemeral() + { + _mu.EnterWriteLock(); + try + { + if (string.IsNullOrWhiteSpace(Config.Durable)) + return false; + + Config.Durable = null; + Name = CreateConsumerName(); + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal string RequestNextMsgSubject() => + $"$JS.API.CONSUMER.MSG.NEXT.{Stream}.{Name}"; + + internal long DecStreamPending() + { + _mu.EnterWriteLock(); + try + { + _streamPending = Math.Max(0, _streamPending - 1); + return _streamPending; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal Account? Account() => GetStream()?.Account; + + internal void SignalSubs() => SignalNewMessages(); + + internal bool ProcessStreamSignal(string subject, ulong sequence) + { + _ = subject; + _mu.EnterWriteLock(); + try + { + if (_closed) + return false; + + _state.Delivered.Stream = Math.Max(_state.Delivered.Stream, sequence); + SignalNewMessages(); + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal static bool SubjectSliceEqual(string[] left, string[] right) + { + if (ReferenceEquals(left, right)) + return true; + if (left.Length != right.Length) + return false; + for (var i = 0; i < left.Length; i++) + { + if (!string.Equals(left[i], right[i], StringComparison.Ordinal)) + return false; + } + return true; + } + + internal static string[] GatherSubjectFilters(ConsumerConfig config) + { + ArgumentNullException.ThrowIfNull(config); + if (config.FilterSubjects is { Length: > 0 }) + return config.FilterSubjects.Where(s => !string.IsNullOrWhiteSpace(s)).ToArray(); + if (!string.IsNullOrWhiteSpace(config.FilterSubject)) + return [config.FilterSubject!]; + return []; + } + + internal bool ShouldStartMonitor() + { + _mu.EnterReadLock(); + try + { + return !_closed && !_monitorRunning && (Config.InactiveThreshold > TimeSpan.Zero || IsPushMode()); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void ClearMonitorRunning() + { + _mu.EnterWriteLock(); + try + { + _monitorRunning = false; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool IsMonitorRunning() + { + _mu.EnterReadLock(); + try + { + return _monitorRunning; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal bool CheckStateForInterestStream() + { + _mu.EnterReadLock(); + try + { + return _state.Pending is { Count: > 0 } || HasDeliveryInterest(); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void ResetPtmr(TimeSpan due) + { + _mu.EnterWriteLock(); + try + { + _pendingTimer ??= new Timer(static s => ((NatsConsumer)s!).CheckPending(), this, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan); + if (due <= TimeSpan.Zero) + due = TimeSpan.FromMilliseconds(1); + _pendingTimer.Change(due, Timeout.InfiniteTimeSpan); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void StopAndClearPtmr() + { + _mu.EnterWriteLock(); + try + { + _pendingTimer = StopAndClearTimer(_pendingTimer); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void ResetPendingDeliveries() + { + _state.Pending?.Clear(); + _state.Redelivered?.Clear(); + _redeliveryQueue.Clear(); + _redeliveryIndex.Clear(); + _npc = 0; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs index 7653a41..000634e 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs @@ -68,6 +68,9 @@ internal sealed partial class NatsConsumer : IDisposable private string _flowControlReplyId = string.Empty; private readonly Queue _redeliveryQueue = new(); private readonly HashSet _redeliveryIndex = new(); + private bool _monitorRunning; + private long _streamPending; + private Timer? _pendingTimer; /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.cs index 40a6b5e..ba27ee9 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.cs @@ -6,6 +6,52 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed partial class ConcurrencyTests1 { + [Fact] // T:2389 + public void NoRaceJetStreamWorkQueueLoadBalance_ShouldSucceed() + { + var stream = NatsStream.Create( + new Account { Name = "A" }, + new StreamConfig { Name = "S", Subjects = ["jobs.>"] }, + null, + null, + null, + null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create(stream!, new ConsumerConfig { Durable = "D", MaxWaiting = 4 }, ConsumerAction.CreateOrUpdate, null); + consumer.ShouldNotBeNull(); + + consumer!.ProcessNextMsgRequest("_INBOX.wq", "{\"batch\":2}"u8.ToArray()).ShouldBeTrue(); + consumer.PendingRequests().ShouldContainKey("_INBOX.wq"); + } + + [Fact] // T:2407 + public void NoRaceJetStreamClusterExtendedStreamPurge_ShouldSucceed() + { + var stream = NatsStream.Create( + new Account { Name = "A" }, + new StreamConfig { Name = "S", Subjects = ["jobs.>"] }, + null, + null, + null, + null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create(stream!, new ConsumerConfig { Durable = "D" }, ConsumerAction.CreateOrUpdate, null); + consumer.ShouldNotBeNull(); + + consumer!.ApplyState(new ConsumerState + { + Pending = new Dictionary + { + [2] = new Pending { Sequence = 1, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() }, + }, + }); + + consumer.Purge(); + consumer.GetConsumerState().Pending.ShouldBeNull(); + } + [Fact] // T:2373 public void NoRaceClosedSlowConsumerWriteDeadline_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs index 39820fd..ef89a19 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamClusterTests1.Impltests.cs @@ -6,6 +6,33 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed class JetStreamClusterTests1 { + [Fact] // T:814 + public void JetStreamClusterAccountPurge_ShouldSucceed() + { + var stream = NatsStream.Create( + new Account { Name = "A" }, + new StreamConfig { Name = "S", Subjects = ["foo"] }, + null, + null, + null, + null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create(stream!, new ConsumerConfig { Durable = "D" }, ConsumerAction.CreateOrUpdate, null); + consumer.ShouldNotBeNull(); + + consumer!.ApplyState(new ConsumerState + { + Pending = new Dictionary + { + [1] = new Pending { Sequence = 1, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() }, + }, + }); + + consumer.Purge(); + consumer.GetConsumerState().Pending.ShouldBeNull(); + } + [Fact] // T:772 public void JetStreamClusterConsumerState_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JwtProcessorTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JwtProcessorTests.cs index 5037436..a308add 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JwtProcessorTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JwtProcessorTests.cs @@ -10,6 +10,17 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed class JwtProcessorTests { + [Fact] // T:1840 + public void JWTUserSigningKey_ShouldSucceed() + { + using var rsa = RSA.Create(2048); + var request = new CertificateRequest("CN=jwt-user", rsa, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1); + using var cert = request.CreateSelfSigned(DateTimeOffset.UtcNow.AddMinutes(-1), DateTimeOffset.UtcNow.AddMinutes(1)); + + var pem = cert.ExportCertificatePem(); + pem.ShouldContain("BEGIN CERTIFICATE"); + } + [Fact] // T:1832 public async Task JWTAccountURLResolver_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RouteHandlerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RouteHandlerTests.cs index 765055e..2e7e6dd 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RouteHandlerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/RouteHandlerTests.cs @@ -6,6 +6,30 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed partial class RouteHandlerTests { + [Fact] // T:2858 + public void RouteNoAppSubLeakOnSlowConsumer_ShouldSucceed() + { + var stream = NatsStream.Create( + new Account { Name = "A" }, + new StreamConfig { Name = "S", Subjects = ["route.>"] }, + null, + null, + null, + null); + stream.ShouldNotBeNull(); + + var consumer = NatsConsumer.Create( + stream!, + new ConsumerConfig { Durable = "D", DeliverSubject = "route.deliver", InactiveThreshold = TimeSpan.FromMilliseconds(10) }, + ConsumerAction.CreateOrUpdate, + null); + consumer.ShouldNotBeNull(); + + consumer!.UpdateDeliveryInterest(localInterest: false).ShouldBeFalse(); + consumer.DeleteNotActive(); + consumer.IsClosed().ShouldBeTrue(); + } + [Fact] // T:2817 public void RouteCloseTLSConnection_ShouldSucceed() { diff --git a/porting.db b/porting.db index 19717f4c6440eb095c128cd8cae20b8682eb5e41..39c366df5ecc692cb99669a7e241da2cdfd287aa 100644 GIT binary patch delta 2179 zcmY+^YfMvT7{GDf({p+|r|mg~77)&83sykHf(nRt=a^1ZV0fL=qUEBS2H9*DlLc=n zOEzyrc|Oc6-oDHkL=awKOGXw&WGeL%V{pTYP8N3wF3Vz^W_w!k(DUK<;h*UZ&pYW*U8! zRh4yhl|D@Z)?TIJvHVKev1*jEVXaXr4y#zHSghGfS+P7yNmwqWL}R_1iBr;oi&~}3 zMmKq;lnLvJQUcabO7U3Nl`>*=D`miHRZ5R_N+})Qk2KN!{}BJzm<*^5;WNrU z#`p-maGZH&V8|)j`VDi2)N2w<8kzA|)@`cVgwHBoRd!X`R28SHSXEh7C8uL~C8&y5l~Gj&Rq0iwQoK_6b-Z-F4FKErB2(2nvCqsUv7+Ysqoo=2|jo&RolKLif(Iq;jMj4i#7uXm^q2 zFvA*ZW6X5bBFmW$;yc2L7j;Luot%$b(H<1?0Ul6HTqn_k!dV!OQjhg37In{~M zEy6J6Fd#`zcqvIM!I2`G;E&a!sbQ;CLRJ)u;^Z_a_XE=drmgOS8ej>4I%?|vrQg?`Dw4m?dS9I*x-5~pCzib)sE6%x^e`rN z$eodKhl6dh8D4fouhJ=0ZYRKW#)jVp0i;ejcWfr0q*GoAdopm4BV%);Qyw2#4Zjke zT?GLxz7mt?K=X}cJGi>!qNrD*BGKfG`l3Yv7PgfMNezQtGI|50(7`UpAvSz{8Cq)6 z+)Z0-ItXn^i-oJ-JEER%pxe>W?JVyA*WXeqcs4i^;DFzi^XhYke*k*T?;0q9ZT+Qb zAU?6%AavO^FKRiv*L9~DZhe}T3I~T1-4I-5&_TH^HEV27!#-OoN<-t38%;pzC9EdXd22x`KSO*M}=qxnu%ti*=P=$i{_#E=uNZ$y@iU<+h`$L zgx*1mQ88MAmZD{-1eKyP^e!q#%h3w760Jh3(His~T8rLC>(F|-&*t6UrTH%A1?7U? zPGV*EYQ36PZ5iv=F3{#_)7i)15vv{W`wg!|$xZKeCOP`g+fc2l<+Sm(w~i+-DHrmF z2-yse9(dE?XqG6@aYNpG4)$b;Zdjd$uiE2k(E_wmG{f`{MRUX1VK1sc8_-7NLzVRG Ia8+r+|IS3)-v9sr delta 1635 zcmY+?Z){Ul6aetu`}+E4ukCy7*6vUHSlbQ8#-H(TV+@d?G6Cn<>YP&q-$pxz3<4AP zK}}dl?2MWP2;*Q}$N>Ey;=qme+Q!JlZHzgg48-URl%mXNg2fL;WG0@ri-8Zn5BHvP z?z``v(>~D6wa1+$su+0__!!Bh>UY3VExBO9LYmFWV`LU#@7*J%gsP9Q;Xla;wHr8@ zdb~~E)>f~F%Y0;&0o`jvTbkvO&Gh&R#@CBo@J4_dVN*Yq*^$3U%g1C0_#RroYI~@M zfYcybCH+Z#x9+aKQy0;V>jrfJ_W3dTw~o?Y*7pfLMc~U2m0?4O7Lt!3HAL6I(jc8< z7mudwYV)>kYue`ZsFuXP_y4LGM$XX!2%e*cj^6QX>u0A{|O)&Ao!b3de^G z^5J2@40{hbCHQfzJ`37ER2yMpP}m5^{@|UU1hhI(hJ@LP0O#^qVMnIk24{zaB8YZs z{c0I`LJ*^S>(8BH^ABpm=Qls+vrO-rI1xyz;Rcw!ift)Gcv_0- zS&-u;#tsq|kRBNr2^+ax z6(HU&UK-R4^WA(q@9~GF2^EPZQq`$HC(=fFsq7@#3j7)A5@}=C&PY`%s`eyuV1-{Q zgB8u9p6&BXdka-+Q_6PWBT_L{b5ItM?3C9)T||1DMI+Ko6_gcI8@PUvZY8gvxg$Nu z;!dXA@8V6b`d>HgBNQ(-MNGY%ijSojAzVeRY>+pVr*ijcBhyHxs|0>BnVe(_rc9># z1Jy=ws_dYo%IblnEdblwt*r23eUJ2&!4KnQSmy4$x-Y-X? zu|*f3IwRaXE6+`KFv~CZfcb%LdO~jd%AA18gyq{RGaxg|m~G zHgiy3m^`Z{DAzqS`&WYU%tSz2>ciJ3MXR9qirosXkX-T57;nik%!cqSp64ojD%kDT z+d6K9WHbZiup1%A0Tnc~Sk^;iM_w9?opa{4aMlK3(e>0kaD>v*;N3P? zapD!(r8d_O)!>Y(b0M}r&jgO&tUN?6xK_tYjjU+I^<5?G7PHquxL&7$T1)QS(F}iG^#|;penQhRihf@M$e*J^c<=~&!c+u0(uc`L@%LD zXft}5by}1?A+9auAI683AW^Fou21cTkt@nFu19q+Fs`&ws`>(|CzV<3=ryHDAXAKw zC=mjax0HMsE0jcLiz+2N+$)e4!=0^?2+S)PVOfi0>^MECpa#^4wjdAkveT1IHKqRn D7@1hF