From 9fbcafb2a3206c7e49223d1b1d8e574536de0ec8 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 1 Mar 2026 00:35:46 -0500 Subject: [PATCH] task5: implement batch38 group D proposal and ack flow --- .../JetStream/NatsConsumer.Acks.cs | 63 +++++ .../JetStream/NatsConsumer.State.cs | 243 ++++++++++++++++++ .../JetStream/NatsConsumer.cs | 4 + .../JetStream/ConsumerStateTests.cs | 81 ++++++ porting.db | Bin 6762496 -> 6766592 bytes 5 files changed, 391 insertions(+) create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/ConsumerStateTests.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Acks.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Acks.cs index f7cea2b..43f1817 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Acks.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Acks.cs @@ -87,6 +87,69 @@ internal sealed partial class NatsConsumer SendAckReply(reply); } + internal bool ProcessNak(ulong streamSequence, ulong deliverySequence, ulong deliveryCount, byte[] message) + { + ArgumentNullException.ThrowIfNull(message); + _mu.EnterWriteLock(); + try + { + _state.Redelivered ??= []; + _state.Redelivered[streamSequence] = Math.Max(deliveryCount + 1, _state.Redelivered.GetValueOrDefault(streamSequence)); + _state.Pending ??= []; + _state.Pending[streamSequence] = new Pending + { + Sequence = deliverySequence, + Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + }; + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool ProcessTerm(ulong streamSequence, ulong deliverySequence, ulong deliveryCount, string reason, string reply) + { + _mu.EnterWriteLock(); + try + { + _state.Pending?.Remove(streamSequence); + _state.Redelivered ??= []; + _state.Redelivered[streamSequence] = Math.Max(deliveryCount, _state.Redelivered.GetValueOrDefault(streamSequence)); + if (!string.IsNullOrWhiteSpace(reply)) + _lastAckReplySubject = reply; + _ = reason; + _ = deliverySequence; + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal TimeSpan AckWait(TimeSpan backOff) + { + if (backOff > TimeSpan.Zero) + return backOff; + + return Config.AckWait > TimeSpan.Zero ? Config.AckWait : DefaultAckWait; + } + + internal bool CheckRedelivered(ulong streamSequence) + { + _mu.EnterReadLock(); + try + { + return _state.Redelivered?.TryGetValue(streamSequence, out var count) == true && count > 1; + } + finally + { + _mu.ExitReadLock(); + } + } + internal void ProgressUpdate(ulong sequence) { _mu.EnterWriteLock(); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs index 6ff6351..afae093 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs @@ -339,4 +339,247 @@ internal sealed partial class NatsConsumer PriorityPolicy = cfg.PriorityPolicy, PinnedTTL = cfg.PinnedTTL, }; + + internal void ResetLocalStartingSeq(ulong sequence) + { + _mu.EnterWriteLock(); + try + { + _state.Delivered.Stream = sequence; + _state.Delivered.Consumer = sequence; + _state.AckFloor.Stream = sequence > 0 ? sequence - 1 : 0; + _state.AckFloor.Consumer = sequence > 0 ? sequence - 1 : 0; + _state.Pending = []; + _state.Redelivered = []; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal int LoopAndForwardProposals() + { + _mu.EnterWriteLock(); + try + { + var count = 0; + while (_proposalQueue.TryDequeue(out _)) + { + count++; + } + + return count; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void Propose(byte[] proposal) + { + ArgumentNullException.ThrowIfNull(proposal); + _mu.EnterWriteLock(); + try + { + _proposalQueue.Enqueue((byte[])proposal.Clone()); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void UpdateDelivered(ulong consumerSequence, ulong streamSequence, ulong deliveryCount, long timestamp) + { + _mu.EnterWriteLock(); + try + { + _state.Delivered.Consumer = Math.Max(_state.Delivered.Consumer, consumerSequence); + _state.Delivered.Stream = Math.Max(_state.Delivered.Stream, streamSequence); + _state.AckFloor.Consumer = Math.Max(_state.AckFloor.Consumer, consumerSequence > 0 ? consumerSequence - 1 : 0); + _state.AckFloor.Stream = Math.Max(_state.AckFloor.Stream, streamSequence > 0 ? streamSequence - 1 : 0); + _state.Pending ??= []; + _state.Pending[streamSequence] = new Pending + { + Sequence = consumerSequence, + Timestamp = timestamp, + }; + + _state.Redelivered ??= []; + if (deliveryCount > 1) + _state.Redelivered[streamSequence] = deliveryCount; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void AddAckReply(ulong streamSequence, string reply) + { + if (string.IsNullOrWhiteSpace(reply)) + return; + + _mu.EnterWriteLock(); + try + { + _ackReplies[streamSequence] = reply; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void AddReplicatedQueuedMsg(ulong sequence, JsPubMsg message) + { + ArgumentNullException.ThrowIfNull(message); + _mu.EnterWriteLock(); + try + { + _state.Pending ??= []; + _state.Pending[sequence] = new Pending + { + Sequence = sequence, + Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + }; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal int UpdateAcks() + { + _mu.EnterWriteLock(); + try + { + var count = _ackReplies.Count; + _ackReplies.Clear(); + return count; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void AddClusterPendingRequest(string requestId) + { + if (string.IsNullOrWhiteSpace(requestId)) + return; + + _mu.EnterWriteLock(); + try + { + _clusterPendingRequests[requestId] = DateTime.UtcNow; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void RemoveClusterPendingRequest(string requestId) + { + if (string.IsNullOrWhiteSpace(requestId)) + return; + + _mu.EnterWriteLock(); + try + { + _clusterPendingRequests.Remove(requestId); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void SetPendingRequestsOk(bool ok) + { + _mu.EnterWriteLock(); + try + { + _pendingRequestsOk = ok; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool PendingRequestsOk() + { + _mu.EnterReadLock(); + try + { + return _pendingRequestsOk; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal bool CheckAndSetPendingRequestsOk(bool ok) + { + _mu.EnterWriteLock(); + try + { + var previous = _pendingRequestsOk; + _pendingRequestsOk = ok; + return previous; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal int CheckPendingRequests(TimeSpan maxAge) + { + _mu.EnterReadLock(); + try + { + var cutoff = DateTime.UtcNow - maxAge; + return _clusterPendingRequests.Values.Count(timestamp => timestamp >= cutoff); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal int ReleaseAnyPendingRequests() + { + _mu.EnterWriteLock(); + try + { + var released = _clusterPendingRequests.Count; + _clusterPendingRequests.Clear(); + _pendingRequestsOk = true; + return released; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ConsumerState ReadStoredState() + { + _mu.EnterReadLock(); + try + { + return GetConsumerState(); + } + finally + { + _mu.ExitReadLock(); + } + } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs index 57b2b68..ba76a8a 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.cs @@ -48,6 +48,10 @@ internal sealed partial class NatsConsumer : IDisposable private bool _hasLocalDeliveryInterest; private long _rateLimitBitsPerSecond; private int _rateLimitBurstBytes; + private readonly Queue _proposalQueue = new(); + private readonly Dictionary _ackReplies = new(); + private readonly Dictionary _clusterPendingRequests = new(StringComparer.Ordinal); + private bool _pendingRequestsOk; /// IRaftNode — stored as object to avoid cross-dependency on Raft session. private object? _node; diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/ConsumerStateTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/ConsumerStateTests.cs new file mode 100644 index 0000000..3f51cf8 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/ConsumerStateTests.cs @@ -0,0 +1,81 @@ +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class ConsumerStateTests +{ + private static NatsConsumer CreateConsumer() + { + var account = new Account { Name = "A" }; + var stream = NatsStream.Create(account, new StreamConfig { Name = "S", Subjects = ["foo"] }, null, null, null, null)!; + return NatsConsumer.Create(stream, new ConsumerConfig { Durable = "D", AckPolicy = AckPolicy.AckExplicit }, ConsumerAction.Create, null)!; + } + + [Fact] + public void ProposalAndPendingRequestFlow_ShouldBehave() + { + var consumer = CreateConsumer(); + + consumer.Propose([1, 2, 3]); + consumer.Propose([4]); + consumer.LoopAndForwardProposals().ShouldBe(2); + + consumer.AddClusterPendingRequest("r1"); + consumer.AddClusterPendingRequest("r2"); + consumer.CheckPendingRequests(TimeSpan.FromMinutes(1)).ShouldBe(2); + consumer.RemoveClusterPendingRequest("r2"); + consumer.CheckPendingRequests(TimeSpan.FromMinutes(1)).ShouldBe(1); + + consumer.SetPendingRequestsOk(false); + consumer.PendingRequestsOk().ShouldBeFalse(); + consumer.CheckAndSetPendingRequestsOk(true).ShouldBeFalse(); + consumer.PendingRequestsOk().ShouldBeTrue(); + consumer.ReleaseAnyPendingRequests().ShouldBe(1); + } + + [Fact] + public void DeliveredAckReplyAndAcks_ShouldBehave() + { + var consumer = CreateConsumer(); + + consumer.UpdateDelivered(10, 20, 2, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()); + consumer.AddAckReply(20, "reply"); + consumer.UpdateAcks().ShouldBe(1); + + var state = consumer.ReadStoredState(); + state.Delivered.Consumer.ShouldBeGreaterThanOrEqualTo(10UL); + state.Delivered.Stream.ShouldBeGreaterThanOrEqualTo(20UL); + state.Redelivered.ShouldNotBeNull(); + state.Redelivered!.ShouldContainKey(20UL); + } + + [Fact] + public void ReplicatedQueueAndNakTermFlow_ShouldBehave() + { + var consumer = CreateConsumer(); + + consumer.AddReplicatedQueuedMsg(33, new JsPubMsg { Subject = "foo" }); + consumer.ProcessNak(33, 2, 1, "-NAK"u8.ToArray()).ShouldBeTrue(); + consumer.CheckRedelivered(33).ShouldBeTrue(); + consumer.ProcessNak(33, 2, 2, "-NAK"u8.ToArray()).ShouldBeTrue(); + consumer.CheckRedelivered(33).ShouldBeTrue(); + + consumer.ProcessTerm(33, 2, 2, "done", "reply").ShouldBeTrue(); + consumer.AckWait(TimeSpan.Zero).ShouldBe(TimeSpan.FromSeconds(30)); + consumer.AckWait(TimeSpan.FromSeconds(5)).ShouldBe(TimeSpan.FromSeconds(5)); + } + + [Fact] + public void ResetLocalStartingSeq_ShouldResetState() + { + var consumer = CreateConsumer(); + + consumer.UpdateDelivered(1, 1, 1, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()); + consumer.ResetLocalStartingSeq(100); + + var state = consumer.GetConsumerState(); + state.Delivered.Stream.ShouldBe(100UL); + state.AckFloor.Stream.ShouldBe(99UL); + } +} diff --git a/porting.db b/porting.db index 2b19db4c9c3f1122949f011fddc0a8b8cfcfdf9b..479ad4bd56eacca8872d394b13742e8b4a23af32 100644 GIT binary patch delta 3441 zcmaKueN0=|8OD8&zwKiilLQF1xi}$wgaqsugZW7L2<>P-3ZX26v`hFhTGW=Um~_>e zb&!fefF^__ezR(HB+IllTUSaeU5ROHw`^0VwGwp8A8n&F>ZG)PtQ)PmN!8M9kFT*Y zyLTY{r02OW_qp#m_gtSdbn6G&p`jV=luQ4zMq?N}n_DZq{7PqCsqph>E|p4`N`;;` zghxrgEL6~urOpiL$z@@SK)Ig^`O@>B3I?stw7@pfmCuEiUeki`MsK(;JTMUMWs`98 zNjCv^CF#cDqDglOZY=4>;J%Y|qi~NV-Ls|zsr{cqx3ZLiNgO0_w1=#-UndbqeY}S&c!} zLDo0iXwNX}4D2uEvNN@GCkYAgt`6U@=Sy7hZO+$@-PQO(b z)tR*?*gsgea9wCZwP}_`GKbAG+bkLn^9&m|)du>)U4zF5!VyMiy}diB?x_yGx#YpY zuEB6gPo%1>!WVl)ch_LgHv*wt>iTQgBL=GiWmP^ua`IGiCTZ#0T{{#(!Jr~2XEkW6 zN)UDR9Ck&29L^x$;~l2drUK=Ptdyp({rMhcQ@fO-bFVh#RRq!Ep$<=UlBNIe z@B)=(>$Q}tNQ$sA)s_MwWlJez6IFsnRVO@9?pJh_GPWE=5Dm5epRtt%6iF$s&Z9`e zTJq*QU25|btXL<@G~DR2wgU7q$>oTjPzUPrhY+3rt)|n;Kj7udZx;jfKIRW`g zQ(xYunQWDu3#vJOWsY%WAxUjNVK12s_*(C>Y%-n}uTe+>!{xAE-_V*kM>;&s) zovf8LvT|0)vTY}9{kBe9t8`qb%9>~P9CqHO^{^+x1IPM~;RpHL8aGX^ zd7MRbEXQf18y=@acupFhXD?Xjs@s{(*eLxv)9EU{xaJ;O+0XIJT`=}ykrr1a+NtR~GgZ+#adHVjI95x+jn`Yw6jVI|TFORU-wn~UO^ zZKEH&&wv)_fF2lt5tx7(Sfmf$w-&YSwEo3fX*q3Pa5(JSY)APO(+v|hwi_n(le#AD z1nU#3G(m*M|40n*%QWrvcq#3=+d_W}@M4;4q)UC^)3L%Q4Hj1-y0(gUQP+~Yi2hi`bLFe?=J$^nsrI_tMVqSm zKKg9V<%#~m%IVf8RwGSS^P)t+wQBxdIyqsCm7UaIvr>t-)-ocXwVlVwziiEr+I;*^ zOgCIHqNkg`v${sF)o@?~cHjVMzzNd92Cxxq0vR9^WP!~f8^rE+0}sfNuGNV5Xt&y* z(%3bRX&fcmpRq9^PxBc6f_p5><~!yblVB`16zYxS_X+n?q)t3S-IuH$TJ&)n-F-kb zkWeqW6Qe4q7ym<-n=P?X+38?|7=N!=4HFIGHA-*AFnu3c^>kv7SeUr!7kk9d>79bi z_)N*=#m1F8b+K0^E!-%8^{L*pb!*+ z?VuR!06RemCf(Jnx_$oLAz6RPs2RIBK0uO_)gCpQ5=mcG$8}xu)5SG4d z?$h4Nuwm+ryg=iK*l*iAY;W0W`8+R>y#AwliykfxM7Q0;Ld=@M4JMfT#&2Ask5*ZI33G z1f8Il00Fe1gx357<;9mmt3?fBw~cjpQ)X(_qLB^*Em1qx+S+N%1ZJRU`^#SL)S3R} z-F#-g-+T7iy?uK+Iu7t1A3nwJ@rW&FD2Rhx zfm>WE&J{DoWYH}WVOaPZxa-VJd*vWEm9zJM!AZOz-sxHl&kb`y=*}|>%!hT}N9r3I z8=9LN>d8)cEzg`RknKvPU3HZU@u20KlR*!dbD*Ty^rm!wYx6zdYLAUHyX-pmPD`}Y z<#L^fw6umu2d3wp6vcGdNe^M_anggBb~$M?rY%lt$8^7w9>5fK(*2m~owNzlYA3Z} z3OcD3(_$w@FwKj!o!4`fAOAnJs^nkiY-Kw^!8`L#wg<7lz)O6LO8Y-s@ zE1xQtly{U<$}7qdWuLN3i7L%XgHoj|Q|2o2K%@^dRk~+vp~GZW_k~;Y-^V}3ck%6f6Mr{f!>{6Rqm{Ib z7U5}T(y4SJHK-unkOoP+m$d)iZr(?je7{>gYbh~?3E zIe?`zUY^3z5-(3;xjSC=V_6+9`>@;|FHc}8iWk5#J657$+JkL+eCTm3lj7xPSXjLL zlzl+{8!vmY48=<~mQO~D!rwwh9#r+-L~2Z=hD555r>Gi=M|E8y)g{u}M5;}s)rnLc zPug~4P3-;AwqbJKA8jip=l#(-F*)y#){&SIbry!2(Pc%yz|Ofc;){&_hJN1VO|-Mv zj6^&0%@svIiSxeRvJczWTfPca8;Ktd-ACSoi(#@GIyaIfCgXivTSN1P#tjYip_W!S z^p$&(-Cs`>&aOO4j>(dq?)oQr1nA4;f=zk};b!QidOm$yPdEGIS@^~!$pXEmKcWrm zoAt0>$Jg>f=?S??fiW5Gu`uyCG2qIO>4lX;xF#MQGCg+Bak5u6j7zRi{K&#o=(uY7 z!MaTzw4q$~R_$ec>d@a?8Ei0RH5< z)n?~Oh=b04l5HPcVJ=j63|uCNM*e9pvgWUxNh-J#oZXwu)J;}vYbY?;4Melgnp34r#Hk?A40?TK!?2!!(*fpE2hoQ)L z9}Hgd>aekziIAMfe6js6&0_Qbq^z*^i*< zEzJ+%Y9_%$#f(CB0rNPce0G-%4_?$fu&ID8fW5!?X&Z&`FyIVy9`&vWzOPz$)j9`E2963o!6dKUsB$aPtYsU zWw8$y4%P)BTx8Wi^L`=rr?YQrGTbb(oO9M8ZJ~7&lK!KN3>vVk*mADC4m*pjQ_%d0 zk^+a?>k9}m-{{^x)kJu!baa**AB$snV*{P7IlffFd0@gK>wrxc zSw9gW=^p2@8Sr|UH9fYk!7}Sz2wfAuv9HgUS!uD$nOtsl#ctxMa?81~x_!OeO6TDC z!>P`Uo{#YQuUKw5kAneO6;>03e(0PEgBvn4VW4#MJbQjDNIRsUg{GhsG!>9}5@}mrNE6PMO(JYjOvQZAoMYB;J3ZQ&cfC|wZG#7mj%|qWu^U(rSgchP=RDw#; zBD5GSK}*qXs0=Mb<>+>_995toT7g!gRj3l(f$l_ip(<33R-+%FHK+#FqP3_FtwSNS O9@V1;TM9Pb82BGdax4@8