From a805af1beaf1794a05f091883838516b56851877 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 23:47:48 -0500 Subject: [PATCH] batch37 task5 implement group D consumer registry and interest state --- .../JetStream/NatsStream.Consumers.cs | 192 ++++++++++++++++++ .../JetStream/NatsStreamConsumersTests.cs | 73 ++++++- porting.db | Bin 6758400 -> 6758400 bytes 3 files changed, 264 insertions(+), 1 deletion(-) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs index 2c8942f..13761e9 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs @@ -1,5 +1,6 @@ using System.Threading.Channels; using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; namespace ZB.MOM.NatsNet.Server; @@ -118,4 +119,195 @@ internal sealed partial class NatsStream foreach (var consumer in snapshot) consumer.Stop(); } + + internal StoredMsg? GetMsg(ulong seq) + { + StoredMsg? result = null; + if (Store == null) + return result; + + var loaded = Store.LoadMsg(seq, new StoreMsg()); + if (loaded == null) + return result; + + result = new StoredMsg + { + Subject = loaded.Subject, + Sequence = loaded.Seq, + Header = loaded.Hdr, + Data = loaded.Msg, + Time = DateTimeOffset.FromUnixTimeMilliseconds(loaded.Ts / 1_000_000L).UtcDateTime, + }; + return result; + } + + internal List GetConsumers() + { + lock (_consumersSync) + { + return [.. _consumerList]; + } + } + + internal int NumPublicConsumers() + { + lock (_consumersSync) + { + return _consumerList.Count(c => !c.Config.Direct); + } + } + + internal List GetPublicConsumers() + { + lock (_consumersSync) + { + return [.. _consumerList.Where(c => !c.Config.Direct)]; + } + } + + internal List GetDirectConsumers() + { + lock (_consumersSync) + { + return [.. _consumerList.Where(c => c.Config.Direct)]; + } + } + + internal void CheckInterestState() + { + if (!IsInterestRetention() || Store == null) + return; + + var consumers = GetConsumers(); + if (consumers.Count == 0) + return; + + ulong floor = ulong.MaxValue; + foreach (var consumer in consumers) + { + var ack = Interlocked.Read(ref consumer.AckFloor); + if (ack > 0 && (ulong)ack < floor) + floor = (ulong)ack; + } + + if (floor != ulong.MaxValue) + Store.Compact(floor); + } + + internal bool IsInterestRetention() => Config.Retention != RetentionPolicy.LimitsPolicy; + + internal int NumConsumers() + { + lock (_consumersSync) + { + return _consumerList.Count; + } + } + + internal void SetConsumer(NatsConsumer consumer) + { + ArgumentNullException.ThrowIfNull(consumer); + + lock (_consumersSync) + { + _consumers[consumer.Name] = consumer; + if (_consumerList.All(c => !ReferenceEquals(c, consumer))) + _consumerList.Add(consumer); + } + } + + internal void RemoveConsumer(NatsConsumer consumer) + { + ArgumentNullException.ThrowIfNull(consumer); + + lock (_consumersSync) + { + _consumers.Remove(consumer.Name); + _consumerList.RemoveAll(c => ReferenceEquals(c, consumer)); + } + } + + internal void SwapSigSubs(NatsConsumer consumer, string[]? newFilters) + { + _ = consumer; + _ = newFilters; + } + + internal NatsConsumer? LookupConsumer(string name) + { + if (string.IsNullOrWhiteSpace(name)) + return _consumers.GetValueOrDefault(string.Empty); + + lock (_consumersSync) + { + return _consumers.GetValueOrDefault(name); + } + } + + internal int NumDirectConsumers() + { + lock (_consumersSync) + { + return _consumerList.Count(c => c.Config.Direct); + } + } + + internal StreamState StateWithDetail(bool details) + { + _ = details; + return State(); + } + + internal bool PartitionUnique(string name, string[] partitions) + { + lock (_consumersSync) + { + foreach (var partition in partitions) + { + foreach (var existing in _consumerList) + { + if (existing.Name == name) + continue; + + var filters = existing.Config.FilterSubjects ?? + (string.IsNullOrWhiteSpace(existing.Config.FilterSubject) ? [] : [existing.Config.FilterSubject!]); + + foreach (var filter in filters) + { + if (SubscriptionIndex.SubjectsCollide(partition, filter)) + return false; + } + } + } + } + + return true; + } + + internal bool PotentialFilteredConsumers() + { + var subjects = Config.Subjects ?? []; + if (subjects.Length == 0) + return false; + + lock (_consumersSync) + { + if (_consumerList.Count == 0) + return false; + } + + if (subjects.Length > 1) + return true; + + return SubscriptionIndex.SubjectHasWildcard(subjects[0]); + } + + internal bool NoInterest(ulong seq, NatsConsumer? observingConsumer) + { + _ = seq; + lock (_consumersSync) + { + return _consumerList.All(c => ReferenceEquals(c, observingConsumer)); + } + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamConsumersTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamConsumersTests.cs index 6b0f4e0..1090410 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamConsumersTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamConsumersTests.cs @@ -58,13 +58,84 @@ public sealed class NatsStreamConsumersTests Should.NotThrow(stream.ResetAndWaitOnConsumers); } - private static NatsStream CreateStream() + [Fact] + public void SetConsumer_LookupAndCounts_ReturnExpectedValues() + { + var stream = CreateStream(); + var standard = new NatsConsumer("S", new ConsumerConfig { Name = "c1", Direct = false }, DateTime.UtcNow); + var direct = new NatsConsumer("S", new ConsumerConfig { Name = "c2", Direct = true }, DateTime.UtcNow); + + stream.SetConsumer(standard); + stream.SetConsumer(direct); + + stream.NumConsumers().ShouldBe(2); + stream.NumPublicConsumers().ShouldBe(1); + stream.NumDirectConsumers().ShouldBe(1); + stream.LookupConsumer("c2").ShouldBe(direct); + } + + [Fact] + public void GetMsg_WhenStored_ReturnsStoredMessage() + { + var stream = CreateStream(); + stream.SetupStore(null).ShouldBeNull(); + stream.Store!.StoreMsg("events", null, [1, 2], ttl: 0); + + var message = stream.GetMsg(1); + + message.ShouldNotBeNull(); + message!.Subject.ShouldBe("events"); + message.Sequence.ShouldBe(1UL); + } + + [Fact] + public void PartitionUnique_WithCollidingFilters_ReturnsFalse() + { + var stream = CreateStream(); + var existing = new NatsConsumer("S", new ConsumerConfig { Name = "existing", FilterSubject = "orders.*" }, DateTime.UtcNow); + stream.SetConsumer(existing); + + var unique = stream.PartitionUnique("new", ["orders.created"]); + + unique.ShouldBeFalse(); + } + + [Fact] + public void PotentialFilteredConsumers_WithWildcardSubjectAndConsumer_ReturnsTrue() + { + var stream = CreateStream(["orders.>"]); + stream.SetConsumer(new NatsConsumer("S", new ConsumerConfig { Name = "c1" }, DateTime.UtcNow)); + + stream.PotentialFilteredConsumers().ShouldBeTrue(); + } + + [Fact] + public void NoInterest_WithOnlyObservingConsumer_ReturnsTrue() + { + var stream = CreateStream(); + var observer = new NatsConsumer("S", new ConsumerConfig { Name = "observer" }, DateTime.UtcNow); + stream.SetConsumer(observer); + + stream.NoInterest(1, observer).ShouldBeTrue(); + } + + [Fact] + public void IsInterestRetention_WhenPolicyIsInterest_ReturnsTrue() + { + var stream = CreateStream(retention: RetentionPolicy.InterestPolicy); + + stream.IsInterestRetention().ShouldBeTrue(); + } + + private static NatsStream CreateStream(string[]? subjects = null, RetentionPolicy retention = RetentionPolicy.LimitsPolicy) { var account = new Account { Name = "A" }; var config = new StreamConfig { Name = "S", Storage = StorageType.MemoryStorage, + Subjects = subjects ?? ["events.>"], + Retention = retention, }; return new NatsStream(account, config, DateTime.UtcNow); } diff --git a/porting.db b/porting.db index c81d811e256e9081530efde3caa74a93a881eaa2..66904ce90565d7fc2d30984e741804bce92eb3ba 100644 GIT binary patch delta 1545 zcmY+^NlX(_9LMqLw8&EWR~Dh$lu}BIf`B3*?%=+G;J$;lpoUaK(8PnXL_<6nO||{Q z$f025fGi%2V`7ZPgEx&ao;;Z7L9awjjENe5o#N0rd=B6K-%MWLOQ!e5u(3Dps^O7n zXiYVTg+WuP5OIlqrMXqSZs7B6qDT4ACO%2;5d>2x5Nr>2bp->KlFWcWTKO$TNz}@R zl^eD4qH47Aph~nVMA@}+qcXKB5Cr32tz5zZ!8oI(gyCncoT&F&IZ&^(vZJ18WkZc? zWkn4ur#rvoDLr2v?G(u%_zbkbSDll-1(<*W78JW2^N{i>ltVHP=_m3Jq z(H*acqc& z6B~BRvl~u4pC#wvdB26*p5pm)@nQ33M=3v-$ual5@>Qn3`H45*9H*kW2|pjFE1cbt zf^(~Pm^t`>mZ?+q-1^P359e0bw~aq_;{7ilP=w$4^Jf?LOw8UzVlQQ~7kjC*B-Xy2 zpjsYnN?eGUD<*aKRxNutE5~T#-bq@+EhA13yN4)^zfR5`PvSnDQ`FDjR~M_7Z{}B1 zDf2IlM!ve^HY{-FJSO|HL4_2^&9UR~U30zPBZYTr~ z6oD6t!3TaQfkm(wN}&vvKsi)EB`k$yPzB4O8fu^xRzMxBgjKK_*1%d=2kW688eju# zgiWv+w!l`{2HRl=?1Wv=2u-jXnxO^uz+Tt~`{4i_ghOx`j=)hk2FKw9v_c!4RQ^n# zGR>sU8x($vB=D^T9pzjT3w-0)L(!f|jVa@_K4sYS!}KNcMqJBhM*`iCM+09(XZ+)J zG1zsc{Y?DBrYk>-_XBc|?l+&Vyt?w}s!&&MT@~ocr7KBSPRpL;4LWo~yRK}yvg#^d RS9t@Wu>b^>&{%u;(m%PFNBsZ* delta 1083 zcmYMzSx8i26bJB`@7}B4ra9`klsR|QQOD9sOIvI+OIyu$+f8jI6~t(epr9HdvY^5; z=h)4_q@>*PAsith>!FtlM0)bUEYO0WB%-JY`p*nQ55Jf5eP{X3<$8wOC6C_)<6~j0 zFwJxt9?Dq;8l%^ovoosLmt7$)8+~JyVo2qa_r$17pG1wXrcStpY?d_9G+yr~feZ?S zK{!N!3Nye2Gr{Z|ucy1I+}UfjbCg9=xa$UG@W3mHq&(fmIXPMa|GYu#*-{d&@s0%= zv8#oW12@!aU9%XBxs5U0QJ5IZ11;3RRnHa8dxuP1a+A^m*ZAB`I>asSR69>Rrf?Rm zl*tnf68ToNlFi8Oq&XUQJVlDfixYou9`e!!c2BBy?|CAwW3mV1?M#k2*C*{;%2G9hzvko6d6Q^$0AdFww-W9$FlDoNCe?SWbT9 zW`19XqAz-=l{<@)Qg}E?3gdIVq0#5-Y3QAA4qhASrDbgSqXu?u;_SZA*I1(4IXXs4 zU{4>dW>IVoK5nwlzsz51B-{GQ6MWostYqSs{p1J?!oT~ehj;zJ`ja=J^P5MM%G1xR zu}$&-K_o;$G*}=8{AY0x53^u4B)}X<1S@D@gCt0X6tF`oq(M43AOq&YJjjGBm=6nJ zA!I`iEP}<53wf{vmclYv4*9SGR>CS+4QrqP)#X7!n*V$lKrj-xMTjixPq;xAn{w_}`?P5?o5W2YH|ASAt20NWoLRQ)hR<4Pw pPUpz`&Z`vGW*9x?a@QWObvexs`C44{NYj}vC%AlPx{hBw{SSjui=zMl