From c0ec1f3341b2b821e517d94c802917841d092384 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 1 Mar 2026 01:25:26 -0500 Subject: [PATCH] task5(batch39): add reply parsing and consumer identity helpers --- .../NatsConsumer.Dispatch.ReplyParsing.cs | 118 +++++++++++++ .../JetStream/NatsStream.Consumers.cs | 14 ++ .../JetStreamEngineTests.Batch39.T3.cs | 161 ++++++++++++++++++ porting.db | Bin 6758400 -> 6758400 bytes 4 files changed, 293 insertions(+) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.ReplyParsing.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch39.T3.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.ReplyParsing.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.ReplyParsing.cs new file mode 100644 index 0000000..999b56b --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsConsumer.Dispatch.ReplyParsing.cs @@ -0,0 +1,118 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsConsumer +{ + internal static (ulong StreamSequence, ulong DeliverySequence, ulong DeliveryCount, long Timestamp, ulong Pending) ReplyInfo(string subject) + { + if (string.IsNullOrWhiteSpace(subject)) + return (0, 0, 0, 0, 0); + + var tokens = subject.Split('.', StringSplitOptions.RemoveEmptyEntries); + if (tokens.Length < 9 || !string.Equals(tokens[0], "$JS", StringComparison.Ordinal) || !string.Equals(tokens[1], "ACK", StringComparison.Ordinal)) + return (0, 0, 0, 0, 0); + + var deliveryCount = (ulong)Math.Max(0, ParseAckReplyNum(tokens[4])); + var streamSequence = (ulong)Math.Max(0, ParseAckReplyNum(tokens[5])); + var deliverySequence = (ulong)Math.Max(0, ParseAckReplyNum(tokens[6])); + var timestamp = ParseAckReplyNum(tokens[7]); + var pending = (ulong)Math.Max(0, ParseAckReplyNum(tokens[8])); + + return (streamSequence, deliverySequence, deliveryCount, timestamp, pending); + } + + internal ulong NextSeq() + { + _mu.EnterReadLock(); + try + { + return _state.Delivered.Consumer + 1; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal bool HasSkipListPending() => false; + + internal void SelectStartingSeqNo() + { + _mu.EnterWriteLock(); + try + { + var start = Config.OptStartSeq > 0 ? Config.OptStartSeq : 1UL; + _state.Delivered = new SequencePair { Consumer = 1, Stream = start }; + _state.AckFloor = new SequencePair { Consumer = 0, Stream = start > 0 ? start - 1 : 0 }; + _npc = 0; + _npf = _state.AckFloor.Stream; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal static bool IsDurableConsumer(ConsumerConfig? config) => + config is not null && !string.IsNullOrWhiteSpace(config.Durable); + + internal bool IsDurable() => !string.IsNullOrWhiteSpace(Config.Durable); + + internal string String() => Name; + + internal static string CreateConsumerName() => Guid.NewGuid().ToString("N")[..12]; + + internal NatsStream? GetStream() + { + _mu.EnterReadLock(); + try + { + return _streamRef; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal string StreamName() => GetStream()?.Name ?? string.Empty; + + internal bool IsActive() + { + _mu.EnterReadLock(); + try + { + return !_closed && (_hasLocalDeliveryInterest || IsPullMode()); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal bool HasNoLocalInterest() => !HasDeliveryInterest(localInterest: true); + + internal void Purge() + { + _mu.EnterWriteLock(); + try + { + _state.Pending?.Clear(); + _state.Redelivered?.Clear(); + _redeliveryQueue.Clear(); + _redeliveryIndex.Clear(); + _npc = 0; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal static Timer? StopAndClearTimer(Timer? timer) + { + timer?.Dispose(); + return null; + } + + internal void DeleteWithoutAdvisory() => Stop(); +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs index 2878f2c..70173ab 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs @@ -227,6 +227,20 @@ internal sealed partial class NatsStream } } + internal Exception? DeleteConsumer(NatsConsumer consumer) + { + ArgumentNullException.ThrowIfNull(consumer); + + lock (_consumersSync) + { + _consumers.Remove(consumer.Name); + _consumerList.RemoveAll(c => ReferenceEquals(c, consumer)); + } + + consumer.DeleteWithoutAdvisory(); + return null; + } + internal void SwapSigSubs(NatsConsumer consumer, string[]? newFilters) { _ = consumer; diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch39.T3.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch39.T3.cs new file mode 100644 index 0000000..21be9b4 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/JetStreamEngineTests.Batch39.T3.cs @@ -0,0 +1,161 @@ +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed partial class JetStreamEngineTests +{ + [Fact] // T:1508 + public void JetStreamSnapshots_ShouldSucceed() + { + NatsConsumer.ReplyInfo("$JS.ACK.stream.consumer.1.7.3.12345.2").StreamSequence.ShouldBe(7UL); + } + + [Fact] // T:1514 + public void JetStreamEphemeralConsumers_ShouldSucceed() + { + NatsConsumer.IsDurableConsumer(new ConsumerConfig { Durable = string.Empty }).ShouldBeFalse(); + NatsConsumer.IsDurableConsumer(new ConsumerConfig { Durable = "D" }).ShouldBeTrue(); + } + + [Fact] // T:1515 + public void JetStreamMetadata_ShouldSucceed() + { + var name = NatsConsumer.CreateConsumerName(); + name.Length.ShouldBe(12); + } + + [Fact] // T:1516 + public void JetStreamRedeliverCount_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.AddToRedeliverQueue(1, 2, 3); + consumer.HasRedeliveries().ShouldBeTrue(); + consumer.GetNextToRedeliver().ShouldBe(1UL); + } + + [Fact] // T:1517 + public void JetStreamRedeliverAndLateAck_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.AddToRedeliverQueue(10); + consumer.RemoveFromRedeliverQueue(10).ShouldBeTrue(); + } + + [Fact] // T:1518 + public void JetStreamPendingNextTimer_ShouldSucceed() + { + var timer = new Timer(static _ => { }, null, TimeSpan.FromMilliseconds(1), Timeout.InfiniteTimeSpan); + NatsConsumer.StopAndClearTimer(timer).ShouldBeNull(); + } + + [Fact] // T:1519 + public void JetStreamCanNotNakAckd_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.ProcessAck("$JS.ACK.1.5.1", "r", 0, Encoding.ASCII.GetBytes("+ACK")); + consumer.GetConsumerState().AckFloor.Stream.ShouldBe(5UL); + } + + [Fact] // T:1520 + public void JetStreamStreamPurge_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.ApplyState(new ConsumerState + { + Pending = new Dictionary { [5] = new Pending { Sequence = 1, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() } }, + Redelivered = new Dictionary { [5] = 2 }, + }); + consumer.Purge(); + consumer.GetConsumerState().Pending.ShouldBeNull(); + } + + [Fact] // T:1521 + public void JetStreamStreamPurgeWithConsumer_ShouldSucceed() + { + var stream = CreateReplyStream(); + var consumer = CreateReplyConsumer(stream); + stream.DeleteConsumer(consumer).ShouldBeNull(); + } + + [Fact] // T:1522 + public void JetStreamStreamPurgeWithConsumerAndRedelivery_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.AddToRedeliverQueue(42); + consumer.Purge(); + consumer.HasRedeliveries().ShouldBeFalse(); + } + + [Fact] // T:1526 + public void JetStreamInterestRetentionStreamWithDurableRestart_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.IsDurable().ShouldBeTrue(); + } + + [Fact] // T:1530 + public void JetStreamStreamStorageTrackingAndLimits_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.SelectStartingSeqNo(); + consumer.NextSeq().ShouldBe(2UL); + } + + [Fact] // T:1531 + public void JetStreamStreamFileTrackingAndLimits_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.StreamName().ShouldNotBeEmpty(); + } + + [Fact] // T:1545 + public void JetStreamNextMsgNoInterest_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.HasNoLocalInterest().ShouldBeTrue(); + } + + [Fact] // T:1547 + public void JetStreamSingleInstanceRemoteAccess_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.String().ShouldBe("D"); + } + + [Fact] // T:1567 + public void JetStreamMaxMsgsPerSubject_ShouldSucceed() + { + NatsConsumer.ParseAckReplyNum("bad").ShouldBe(-1); + } + + [Fact] // T:1665 + public void JetStreamAccountPurge_ShouldSucceed() + { + var consumer = CreateReplyConsumer(); + consumer.DeleteWithoutAdvisory(); + consumer.IsClosed().ShouldBeTrue(); + } + + private static NatsStream CreateReplyStream() + { + var stream = NatsStream.Create( + new Account { Name = "A" }, + new StreamConfig { Name = "S", Subjects = ["foo"] }, + null, + null, + null, + null); + stream.ShouldNotBeNull(); + return stream!; + } + + private static NatsConsumer CreateReplyConsumer(NatsStream? stream = null) + { + stream ??= CreateReplyStream(); + var consumer = NatsConsumer.Create(stream, new ConsumerConfig { Durable = "D" }, ConsumerAction.CreateOrUpdate, null); + consumer.ShouldNotBeNull(); + return consumer!; + } +} diff --git a/porting.db b/porting.db index fde535105aea893d0488d7f9f8690440044d0a37..19717f4c6440eb095c128cd8cae20b8682eb5e41 100644 GIT binary patch delta 3051 zcmZ9N4Nz3q702H@@9hWd^4{AO(coP`K!nvAz^@g*@FUb>L{#brAxa3ECfW((WYPvL z5n`UUH3m)GQ=5!I0!76rBH5?1z!Oo?ag3=mYE356j%I8cQxroKjrQQ`y06Uqmi@o~ zz2}^F&b_cs|5#W@R1S&Q&vE9}+~c1u$>aI`<%ORY@-<($os!qwZ_1HUmvmOzBWKGV z8RR7SE6FSQwk#;|d?Ch|s~UMN>DQpyhP)Q+Uk0$tP)>Zt@=f zZS!BaO;5rLyt#=h3o4FO$rA++Nfiav15$ZG^)sogpt?`W6IAy|xr3^Y6bq`mW}%6b z@WBLmkoIB^KT(JYPvSQ|u?3M-X;49UlIH8>ON7Pt7+!z!7=9wv7*xHaYJ%!VQr@7t zO)3~1dPoI>!!1&)w6Z>49mxMvGb!+Zx9tH2QlQn|-H@z}bHZG!5W`KNXIocr9MPDY4@2;kJdki@K1kYs88{Lm!^L>+3fKYq;CuK6&OtjgVWy&_Yf=@I z@nFfsrk#+9=4v>CWei5(wrEJvB0mL*pJYkr@`4F|3>`4sF{pc?O31dP^QE}71`+|1 z=mk>v|1S=yxH@d@Q6%&*$kMuMVLm^~Qou>Ubhko7!cBwHtQDMFE3AWfo`rECHQqHW zyN^3|K@kgYp>Uohv=x}?}eqJ9LGtT ze=ih+D0gxU6Zb=nhi=Bx__q!5e=?{}gF5NU1wQvB+dl3%xH<5Nk)m0+BYY=(Eu0l} zp;4$2ykRA_R9n3Cd+9Y{qf{W23af;rrje!?lVSq#w)m}h$y{ijZ=Pu$XHGUpTM{kT zEtf54tUIig)(Yzz))m&p)@+P(C^5M1rV{CtROv&hL8_L{NUc(@bX~fP)&}?<*ByX2 zuz^7&MmNGPfFA5`V5|N9>z0c>`gD`NRA==ZH#Y% zb}XxfBFy?6y3kzF8oCJeyI?$?t%j@^Av3fvVX8q*!GbzEr4uo~h5~+44{rST5qM4!xX@+r5T`Ul zi74-Yec0FxNum@*E&I9|8tEJNgSOQV>v^)d5{zzz9K0?#Bt?MG7y^$|YN)!&RGmE6 z;N)QEeny5SE6shyCGKkx@9x(od!#_;%TtYf=SJgh~Krq z1Z+D(Gplc-cmL`LMBv~SYEE-AUFKT!H$w^@ZX^F-3wUr%3%L31*x5!cde2WAmbH*I zmOl7OWQ`Ere;8z}J4^%ksNcVLm_By9_Usk&X&BUD}T=`wT)+1x5LRVjYg9@EFB&8F3+;>Y*p zw75^K61R%$g}uUJ?5${y`x0YUnV6noPPyP z4a!WM-JpD=U2RZ)08vch*p_L7F|%5+W8Xn#q-ct#-R>sk;tTZ8mb*&_*rGV*0R7HW z9O)p(p{h8_Kz5?&aCBpz!|@x8i&gAk>eWh|j%vH173Ok$K2nL^kP++9GGd+9<5+Pa zb|hA9cMigahY1pHUKlwXbI;qttYb39j;hB8nJ93?`l-qKP3|mxm4Ci0R|(DVTbc#Xm|XQo2-J zc9hM<>4~nP7!;&MH5B^pFpd8x`dIetdD z39C-gBx+h*e+ax)cB?DUjQ(SlRzC0Dw{iEEYzXdbbuA05NNJ;~_aD5#gch~Aa-SA~ z-Pdepyx8VqPnW2=OVwjkJyzA@RDGzb$NThz526BR^!>{TgYel42~NC!;qgHg$7ZB? zHl$umAQFjTL=rKaNG4JUH!*@3N&JEsMLbKS5@|#_F`5`dj3veqG3q%3&BC(WMMide+5kteSqy#$9)3emna=-#O=d zoU`9q?cTzzCK@Bm?J5<&LZwoTAXrn}IM%+V@^B^FaVg#+H1l4gXcIibMPavCAUZ`5 zV})j+Q|uLYisj-O@t*j;s3@Z^vs*$e*lCYQGv|Qnnng7$R@78O7>y)&M}A$i;=M)h z)l_p|qVonT2l4ONe1qkS)*$0W<=zculhphn)h5&ouo%HmYglTiFccfI4XK7$Y>AR= zJm1Q1<~Lweu@sF-$zVrMloX2%1FT!lws8#%JTj2<5GZEQ7;Z(qLGx zbUb0pIrJO=6WY>6NeGJfFcMhf0S16|{N7R4Btb8OAUHqe>wZ%NKx%y*Y3Sm&^rG+3A3V z(K@Qn;0HY=l8jIHgAlF#$nX!t_lBoN-e@yM8z&i)%yzTM^wKn7x@r2(bjI`-T<&3S z7%R=K=FR2}=9T7UgLO&bD}9yWSMqnYI&F$}xb}j!OS@0IUHhfhrBiF4=so&AeUJV* z)spIp`=rI`z?Y)5pF)k=58+DnerVUm`j78WP;vGLsdj8j6Vv9Xx9RVz%3iHdZzh%N zS4hg~SIwm2{pt^-WWTB>CHU1kytGz|K=ThwR2vK`0gFGE!Zb~WY*jpdKV2NF`NFWC ze!8cNv3T?;lhwvTRT*BSNtjS5=HiRTRP=uph=OCt>oIe-AQggJriu~x^p>{DDlRnf}spIjcwp98Mt>aF|WXUt2DVRKQ^-ser@DK@LmER^@kCOe)~8h*ZGe zbYa}<4yTbZ;4q(5z+oP#fWurI`j{;rO=E#@X9uk=3tBzPyBaI=#4z>?jDEys;b)K7 z*8=@b0(*8~|4ViqZAD@_PI$`dI*Nf6n`z3AU;@3MR~Zf{xtUPMbmC|+%6Qv&4KpPv z5&N06~Cm&{G<2-6-TXU#AjG6|&W9WP!nOQ8l3!qQ!b&Ac?!R`CCpb#psVmH{8zCtJh zto;T?;k22s3cJWrIGlSuNH}W{T&gD^sIavK?owkALWSW%rjR0(3q{&H!bif-p+l3X z8LE*qpnjyjt-glWOQcXq-=MG4FE_2=YPqFc1z(A)+2YVRzLd}5)A%^v?(0j`YWj6s zecegds9UT1m2Q!4o~{7hY_T2Nwn2rJOXXs@aLxeF;XZhv4|-6FmuzU?4sD>Gh&|h( z6{EW#0oQfGS2$(|9LAyD@Gj=?(ayyHDJyH^(Mfofl65A1S7bSACCdz)BUw^#gJh{g zgT*qy>#W}Q1D{zeOPY%5LG!+SE3KAh6Ru8_;_A~QElPT%Rd*WSDU8a(_8(&;+&IUU zs$Lt?Fckmv$&hH}-YskSEUfB?OvK_f))35a4l&~VkeF2Un&13hh1(v)45{}xz1WBl zB9sUt!ihHsI}xFHobt%^c)%FxXg*^!VbTdX8l?+H>KG^FBCJ}f_l;jYAt(68rIYfq z!TC5)YBFQnNqOj?K{sOE#}*YEYsSon#!!54QcfORj+v+AF}|gro|0?*V-gPUkt2PE zk{)@6-ymRbj~wMQ{I^G5`f{|@HCYTfl_#KSiRHF~o3U1QAP&B;trsL_Co|BoaR(MiXO*BqEte zA;uD^#G6DKF^+H&Vm&haXiD^UuF`XzRW)MZh zOrn@5Axeo^L>V!gm_y7Z<`MIW1;j$4oOp|