From df3b44789b12e03524e2cc4d9cb65684b86a2689 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 1 Mar 2026 00:39:08 -0500 Subject: [PATCH] task6: implement batch38 group E state snapshot and wait queue --- .../JetStream/NatsConsumer.Acks.cs | 8 + .../JetStream/NatsConsumer.State.cs | 156 ++++++++++++++++++ .../JetStream/NatsConsumer.WaitQueue.cs | 6 + .../JetStream/StreamTypes.cs | 38 +++++ .../JetStream/ConsumerStateTests.cs | 48 ++++++ .../JetStream/WaitQueueTests.cs | 35 ++++ porting.db | Bin 6766592 -> 6766592 bytes 7 files changed, 291 insertions(+) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.WaitQueue.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Acks.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Acks.cs index 43f1817..b671048 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Acks.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Acks.cs @@ -87,6 +87,14 @@ internal sealed partial class NatsConsumer SendAckReply(reply); } + internal bool ProcessAckMsg(ulong streamSequence, ulong deliverySequence, ulong deliveryCount, string reply, bool doSample) + { + ProcessAckMessage(streamSequence, deliverySequence, deliveryCount, reply); + if (doSample && NeedAck()) + SampleAck(reply); + return true; + } + internal bool ProcessNak(ulong streamSequence, ulong deliverySequence, ulong deliveryCount, byte[] message) { ArgumentNullException.ThrowIfNull(message); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs index afae093..86e5c99 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.State.cs @@ -1,10 +1,12 @@ using System.Text.Json; +using System.Text; namespace ZB.MOM.NatsNet.Server; internal sealed partial class NatsConsumer { private static readonly TimeSpan DefaultGatewayInterestInterval = TimeSpan.FromSeconds(1); + private ConsumerInfo? _initialInfo; internal bool UpdateDeliveryInterest(bool localInterest) { @@ -582,4 +584,158 @@ internal sealed partial class NatsConsumer _mu.ExitReadLock(); } } + + internal void ApplyState(ConsumerState state) + { + ArgumentNullException.ThrowIfNull(state); + _mu.EnterWriteLock(); + try + { + _state = new ConsumerState + { + Delivered = new SequencePair + { + Consumer = state.Delivered.Consumer, + Stream = state.Delivered.Stream, + }, + AckFloor = new SequencePair + { + Consumer = state.AckFloor.Consumer, + Stream = state.AckFloor.Stream, + }, + Pending = state.Pending is null ? null : new Dictionary(state.Pending), + Redelivered = state.Redelivered is null ? null : new Dictionary(state.Redelivered), + }; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void SetStoreState(ConsumerState state) => ApplyState(state); + + internal ConsumerState WriteStoreState() + { + _mu.EnterWriteLock(); + try + { + return WriteStoreStateUnlocked(); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ConsumerState WriteStoreStateUnlocked() => GetConsumerState(); + + internal ConsumerInfo InitialInfo() + { + _mu.EnterWriteLock(); + try + { + _initialInfo ??= GetInfo(); + return _initialInfo; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void ClearInitialInfo() + { + _mu.EnterWriteLock(); + try + { + _initialInfo = null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal ConsumerInfo Info() => GetInfo(); + + internal ConsumerInfo InfoWithSnap(ConsumerState? snapshot = null) + { + if (snapshot is null) + return GetInfo(); + + var info = GetInfo(); + info.Delivered = new SequenceInfo { Consumer = snapshot.Delivered.Consumer, Stream = snapshot.Delivered.Stream }; + info.AckFloor = new SequenceInfo { Consumer = snapshot.AckFloor.Consumer, Stream = snapshot.AckFloor.Stream }; + return info; + } + + internal (ConsumerInfo Info, string ReplySubject) InfoWithSnapAndReply(string replySubject, ConsumerState? snapshot = null) => + (InfoWithSnap(snapshot), replySubject); + + internal void SignalNewMessages() => _updateChannel.Writer.TryWrite(true); + + internal bool ShouldSample() + { + if (string.IsNullOrWhiteSpace(Config.SampleFrequency)) + return false; + + var token = Config.SampleFrequency!.Trim().TrimEnd('%'); + if (!int.TryParse(token, out var percent)) + return false; + + return percent > 0; + } + + internal bool SampleAck(string ackReply) + { + if (!ShouldSample()) + return false; + + if (string.IsNullOrWhiteSpace(ackReply)) + return false; + + AddAckReply((ulong)ackReply.Length, ackReply); + return true; + } + + internal bool IsFiltered(string subject) + { + if (string.IsNullOrWhiteSpace(subject)) + return false; + + if (!string.IsNullOrWhiteSpace(Config.FilterSubject)) + return Internal.DataStructures.SubscriptionIndex.SubjectIsSubsetMatch(subject, Config.FilterSubject!); + + if (Config.FilterSubjects is not { Length: > 0 }) + return false; + + return Config.FilterSubjects.Any(filter => Internal.DataStructures.SubscriptionIndex.SubjectIsSubsetMatch(subject, filter)); + } + + internal bool NeedAck() => Config.AckPolicy != AckPolicy.AckNone; + + internal static (JsApiConsumerGetNextRequest? Request, Exception? Error) NextReqFromMsg(ReadOnlySpan message) + { + if (message.Length == 0) + return (new JsApiConsumerGetNextRequest { Batch = 1 }, null); + + try + { + var text = Encoding.UTF8.GetString(message); + if (int.TryParse(text, out var batch)) + return (new JsApiConsumerGetNextRequest { Batch = Math.Max(1, batch) }, null); + + var req = JsonSerializer.Deserialize(text); + if (req is null) + return (null, new InvalidOperationException("invalid request")); + if (req.Batch <= 0) + req.Batch = 1; + return (req, null); + } + catch (Exception ex) + { + return (null, ex); + } + } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.WaitQueue.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.WaitQueue.cs new file mode 100644 index 0000000..e9b9335 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.WaitQueue.cs @@ -0,0 +1,6 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + internal static WaitQueue NewWaitQueue(int max = 0) => WaitQueue.NewWaitQueue(max); +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs index 8512e52..b49deb0 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.cs @@ -456,6 +456,42 @@ public sealed class WaitingRequest /// Optional pull request priority group metadata. public PriorityGroup? PriorityGroup { get; set; } + + public bool RecycleIfDone() + { + if (N > 0 || MaxBytes > 0 && B < MaxBytes) + return false; + + Recycle(); + return true; + } + + public void Recycle() + { + Subject = string.Empty; + Reply = null; + N = 0; + D = 0; + NoWait = 0; + Expires = null; + MaxBytes = 0; + B = 0; + PriorityGroup = null; + } +} + +public sealed class WaitingDelivery +{ + public string Reply { get; set; } = string.Empty; + public ulong Sequence { get; set; } + public DateTime Created { get; set; } = DateTime.UtcNow; + + public void Recycle() + { + Reply = string.Empty; + Sequence = 0; + Created = DateTime.UnixEpoch; + } } /// @@ -681,6 +717,8 @@ public sealed class WaitQueue } private static int PriorityOf(WaitingRequest req) => req.PriorityGroup?.Priority ?? int.MaxValue; + + public static WaitQueue NewWaitQueue(int max = 0) => new(max); } /// diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/ConsumerStateTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/ConsumerStateTests.cs index 3f51cf8..13e06cc 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/ConsumerStateTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/ConsumerStateTests.cs @@ -78,4 +78,52 @@ public sealed class ConsumerStateTests state.Delivered.Stream.ShouldBe(100UL); state.AckFloor.Stream.ShouldBe(99UL); } + + [Fact] + public void StoreStateAndInfoSamplingAndFiltering_ShouldBehave() + { + var consumer = CreateConsumer(); + var state = new ConsumerState + { + Delivered = new SequencePair { Consumer = 11, Stream = 22 }, + AckFloor = new SequencePair { Consumer = 10, Stream = 21 }, + Pending = new Dictionary { [22] = new Pending { Sequence = 11, Timestamp = 1 } }, + }; + + consumer.ApplyState(state); + consumer.SetStoreState(state); + consumer.WriteStoreState().Delivered.Stream.ShouldBe(22UL); + consumer.WriteStoreStateUnlocked().Delivered.Stream.ShouldBe(22UL); + consumer.ReadStoredState().Delivered.Stream.ShouldBe(22UL); + + consumer.InitialInfo().Stream.ShouldBe("S"); + consumer.ClearInitialInfo(); + consumer.Info().Name.ShouldBe("D"); + consumer.InfoWithSnap(state).Delivered.Stream.ShouldBe(22UL); + var (info, reply) = consumer.InfoWithSnapAndReply("r", state); + info.Stream.ShouldBe("S"); + reply.ShouldBe("r"); + + consumer.SignalNewMessages(); + consumer.UpdateConfig(new ConsumerConfig { Durable = "D", SampleFrequency = "100%", FilterSubject = "foo.*", AckPolicy = AckPolicy.AckExplicit }); + consumer.ShouldSample().ShouldBeTrue(); + consumer.SampleAck("reply").ShouldBeTrue(); + consumer.ProcessAckMsg(22, 11, 2, "reply", doSample: true).ShouldBeTrue(); + consumer.IsFiltered("foo.bar").ShouldBeTrue(); + consumer.NeedAck().ShouldBeTrue(); + } + + [Fact] + public void NextReqFromMsg_ShouldParseBatchAndJson() + { + var (simple, simpleErr) = NatsConsumer.NextReqFromMsg("5"u8); + simpleErr.ShouldBeNull(); + simple.ShouldNotBeNull(); + simple!.Batch.ShouldBe(5); + + var (jsonReq, jsonErr) = NatsConsumer.NextReqFromMsg("{\"batch\":2,\"expires\":\"00:00:01\"}"u8); + jsonErr.ShouldBeNull(); + jsonReq.ShouldNotBeNull(); + jsonReq!.Batch.ShouldBe(2); + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/WaitQueueTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/WaitQueueTests.cs index 215360b..dd6e937 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/WaitQueueTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/WaitQueueTests.cs @@ -48,4 +48,39 @@ public sealed class WaitQueueTests q.Cycle(); q.Peek()!.Reply.ShouldBe("1b"); } + + [Fact] + public void WaitingRequestRecycle_AndWaitQueueFactory_ShouldBehave() + { + var request = new WaitingRequest + { + Subject = "s", + Reply = "r", + N = 0, + D = 1, + MaxBytes = 10, + B = 10, + PriorityGroup = new PriorityGroup { Group = "g", Priority = 1 }, + }; + + request.RecycleIfDone().ShouldBeTrue(); + request.Subject.ShouldBe(string.Empty); + request.Reply.ShouldBeNull(); + + var q = WaitQueue.NewWaitQueue(max: 3); + q.ShouldNotBeNull(); + q.IsFull(3).ShouldBeFalse(); + } + + [Fact] + public void WaitingDeliveryRecycle_ShouldClearState() + { + var wd = new WaitingDelivery { Reply = "r", Sequence = 42, Created = DateTime.UtcNow }; + + wd.Recycle(); + + wd.Reply.ShouldBe(string.Empty); + wd.Sequence.ShouldBe(0UL); + wd.Created.ShouldBe(DateTime.UnixEpoch); + } } diff --git a/porting.db b/porting.db index 479ad4bd56eacca8872d394b13742e8b4a23af32..b8fa0aaf452f521c4c9f8be965c0001b7c41d4ce 100644 GIT binary patch delta 3030 zcmZwI4QvzV9R~1w-#go9`}JoMlf+*!g(gtQ2O)ukl%$k=(GO^vW(86PG)aq=MN--+?5MKm~*@2a`aoY}$rkYpbnlv#HRjVjYz#L#H*Z(6lF)*4R2n z`bp3K-sjGDzB{M0vrq1X(}Q{J(knxG+E5kPu<=5E(Hl_XPI$PnEpXS|UiLxS9 z$ydzsw0!IJA3V2|2BYk2ibh!ub>C+FS^RTpVN-KUb9;Mp6CcMEh)rXdoYYvu(-T{x z39S*mb+E`ahtA(-1=Obrwd8hp*H^+{7)P9pq$EG8gZpEe{O#g^Y zgP8stn+7mlicS5PCSp?`rs3Gsi>WI%_1HZ(*{iX!8;dV#r>9x{9QJR`b|=1#hp$z8P3 z;&HOSX}tmd{1fDC(+2 zr!gd(o{no*+j!AK7RRXc6&IY+PxD(j^19ha$n-x`)bvktoq4sn*z&1mN?oItIle~i z4ZcrSh358cE!%Kh{N0=Jm>s9g3H~Fh3V2*>j7BGT8|Np;ca~SV z`Qe1-NoqaI_fc2S<8t*RtaNLsZ}BM>;sXhr`xB-<5|g|*#E&Fw9ZHx6wat_KXUxrq z6E+{E{Q;gy?sK@=Qz;%N=Ofe^;C`*+94|B}xTJa-zrY(gKekuB$Uz1LOkf5Js9*)# zG4-OoE@BO;n=GfypPH^Kca%r@+pJNlar{dB+RFNhY(O{*pDkY`)`PvWDgoDwY->k;U<$#mU_(UXb`MOW!GCDY+ zE}(bHg^eoGMd02tZA%xAQ{6NE2yLEJt@Lqv{LbXc5QoTHCl=6^3^6}$x4Ri)jNWek zcDKJ|qB3(zj+s;1_3)mEM`G0uD$f@_`pho`<@-em&HZTR^e+b-v?d_z)bCIDu&aJi zPpjqyBNRQOs^ks|g&qqeR9+5<*2GFM?xaeC;xZkI-~fY-!b;g8krTIC;}#up`+nk1 zxFDZLJWpO1K5f7w7RK%NzDKlB;eHXM9*@Y4TX@YY#_7;sl^|_Q5%@{{_J&Yz3QqF9 zj_^0vQIIOK@2&1_3<+?6Q)>*lM&7lkC#83|Q=$VOZ}ADWOwCoZRIjR7{;R1`*H4(D zTrrhWf7Zry?RU3bZ6?|`!Togh0NU_01`l{41yUgmd@u*nApw&au=Bv-EEJ#otZ zi0w72S1q^Qy3m{$3rCRXC% z`K%^0Kp8v;E1?`J z;2~HA55sDxges_p8mNUe@CZB#kHO=x7S_QNPzO&!Jv;>s@Ev#>*24za2#xRzY=ZB? zX4nE-;aO;cW-WJZ%lkJ<{PN3^tHXKIx!!TrQ7$G#w*8#F);4APp7pNvm)4MaO0BTG zYuRf4!u(TnnrYOuO!=d-UcNQ+U^q5t2W}{za9?X*g;l~o*0pKng)Plnw(n?f?%*eB z%ago>71O~H?$0bP&0o^mS8lAGNK#vAtc@qBJ!q_rC8?DeYsZt+Rv2r?lGI9#wbA%m z#_W}r7;EGidbW^m)N@PXm$KYg9gaWZH{abdW9?{?TCuTqBuTBvSQ|=GTWYKgCaDz~ zYXeDYON_PtB((x#tuIL}-&pHSQp+>edXm&~jkWG1wZ+ET;rN;|bNjXIvgbaO{|~h- Bqs0IK delta 1860 zcmY+^Yfw~W9LMo}o^$pF&pBroxh}hVSTW@y3uq`I2BIQrDk4(e4Mk!S#%rTFgHRFP z%1gxGam>(hwA3a-hNI17g<6wCUd9+0T->Y%kHot=2~V7hTIoi%^Ru26Ol+r{o1U-htYE@>rCA$uRoqQzI1Mn~`+yq2%x z<-C*^@@$?W?=f=w*ce9p?y>{)`xs47Q4ya@`?55ZPTysj#=yN&%`{=)3t#bKMBQWRgiZhbj# zvREwbX1#?vX4W25i&?u-&1UUF?J#R6>J_uzL~StZ4OER;J5bBb+K!rU);3hJSzA$s zsJwE%1(|COHljwE^*Soati33|S%iu-s~Ht$*4su`4bPeogMr7~!zG8bSSprn(tvcw z+GqWN9IgBgc|PL3iOOHr$M)OGul5^!uX0nltaK{v$`QWJG0l-{E3r+ojd7AY#WjSw zAK-@>-o{tCq~HE~XwuJgq>b0m%Ra8s(33oenmpV~8&2|Cs`c^|BcYvNWX)E$moD^U ziP#R@3)g^_YE*adjf_jTsI_175I@BWDQ7_Q8p}`d0xRYBYjfz~CtTsujYh{A4id;< z1smAG0Zs@z>Npc#5>Rr)rtq$CtMidlb5z-StQ+MDX+Ljb-7MNtq`6cI)CF|P^N6)n zzS8NVT~?vecOH>QuGO+iYP_hVJ!@4T6?;W6GkMwxEm|Mv;SSm+iCqt3qrKFUAqRjDaL z1VZ0*Mu;Gz)fd8(>2e;PNsJWU&@M9~#SFSP)gGXw9S$d*iwr%YI;$EIBP?uey`5%v zd1L5OyTe9tQFx^_KVfG~)~G#daA)d?67SQYZ$n==sV!P0hnD{(S}dd855rR_7>j-D zV+NO}))+A-bZ!Z;A~Q63Ni3E(&k|!U>W>w1=IfI$PMoFk&#gY{ovAAn<`GHe6OD`f z0u)e##2=g1K0aCpX@kcDW&e@%7u+(0~)x%4G}N|A|VQ* zAqHY04m=PKUho-Z1ODSv64gx>)lzNIg3@umg$=V*i-*D&)`vUhcQAXZEy-%9%nUZw zSwFfV(rsa*lPUjxcr>Nw3A^#}G<}^+cPHv!(xLN;k8nsT{WwV<8allpllA?fpZMBj zJ;NMxlr+R+mGM(4^EOslpRdoOt8u!o`HOHtn5EH;o!+5zxlFcF+64Wx(7JV3y&A<8 z=z|w|T7mvr=!PWNg_8yf1~+uBYKX-R8DB4%s~cH``Vv{g|HMi2#WQ^Y&Z0=q6ETE0 z&!)?Jta0YX8IQsplsH`y!E&fDg0oh1_eR*o2#dB>y`|1mx|BT8 SCPo