From 6290b17a828a6065286722d7e5abfa4f57bb2ba7 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 23:45:07 -0500 Subject: [PATCH] batch37 task4 implement group C message carriers and consumer signaling --- .../JetStream/NatsStream.Consumers.cs | 121 ++++++++++++++++++ .../JetStream/StreamTypes.MessageCarriers.cs | 73 +++++++++++ .../JetStream/NatsStreamConsumersTests.cs | 71 ++++++++++ porting.db | Bin 6758400 -> 6758400 bytes 4 files changed, 265 insertions(+) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs create mode 100644 dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamConsumersTests.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs new file mode 100644 index 0000000..2c8942f --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs @@ -0,0 +1,121 @@ +using System.Threading.Channels; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + private readonly object _consumersSync = new(); + private readonly Dictionary _consumers = new(StringComparer.Ordinal); + private List _consumerList = []; + private readonly IpQueue _sigQueue = new("js-signal"); + private readonly Channel _signalWake = Channel.CreateBounded(1); + private readonly Channel _internalWake = Channel.CreateBounded(1); + private readonly JsOutQ _outq = new(); + + internal static CMsg NewCMsg(string subject, ulong seq) + { + var msg = CMsg.Rent(); + msg.Subject = subject; + msg.Seq = seq; + return msg; + } + + internal void SignalConsumersLoop(CancellationToken cancellationToken = default) + { + while (!cancellationToken.IsCancellationRequested) + { + if (!_signalWake.Reader.TryRead(out _)) + { + Thread.Sleep(1); + continue; + } + + var messages = _sigQueue.Pop(); + if (messages == null) + continue; + + foreach (var msg in messages) + { + SignalConsumers(msg.Subject, msg.Seq); + msg.ReturnToPool(); + } + } + } + + internal void SignalConsumers(string subject, ulong seq) + { + _ = subject; + _ = seq; + + lock (_consumersSync) + { + _ = _consumerList.Count; + } + } + + internal static JsPubMsg NewJSPubMsg(string destinationSubject, string subject, string? reply, byte[]? hdr, byte[]? msg, NatsConsumer? consumer, ulong seq) + { + var pub = GetJSPubMsgFromPool(); + pub.Subject = destinationSubject; + pub.Reply = reply; + pub.Hdr = hdr; + pub.Msg = msg; + pub.Pa = new StoreMsg + { + Subject = subject, + Seq = seq, + Hdr = hdr ?? [], + Msg = msg ?? [], + Buf = [], + }; + pub.Sync = consumer; + return pub; + } + + internal static JsPubMsg GetJSPubMsgFromPool() => JsPubMsg.Rent(); + + internal void SetupSendCapabilities() + { + _ = _outq; + _signalWake.Writer.TryWrite(true); + } + + internal string AccName() => Account.Name; + + internal string NameLocked() => Name; + + internal void InternalLoop(CancellationToken cancellationToken = default) + { + while (!cancellationToken.IsCancellationRequested) + { + if (!_internalWake.Reader.TryRead(out _)) + { + Thread.Sleep(1); + continue; + } + + var messages = _sigQueue.Pop(); + if (messages == null) + continue; + + foreach (var msg in messages) + { + SignalConsumers(msg.Subject, msg.Seq); + msg.ReturnToPool(); + } + } + } + + internal void ResetAndWaitOnConsumers() + { + List snapshot; + lock (_consumersSync) + { + snapshot = [.. _consumerList]; + } + + foreach (var consumer in snapshot) + consumer.Stop(); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.MessageCarriers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.MessageCarriers.cs index 112ea30..21ce5f3 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.MessageCarriers.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.MessageCarriers.cs @@ -1,4 +1,5 @@ using System.Collections.Concurrent; +using ZB.MOM.NatsNet.Server.Internal; namespace ZB.MOM.NatsNet.Server; @@ -18,3 +19,75 @@ public sealed partial class InMsg Pool.Add(this); } } + +public sealed partial class CMsg +{ + private static readonly ConcurrentBag Pool = new(); + + internal static CMsg Rent() => Pool.TryTake(out var msg) ? msg : new CMsg(); + + internal void ReturnToPool() + { + Subject = string.Empty; + Msg = null; + Seq = 0; + Pool.Add(this); + } +} + +public sealed partial class JsPubMsg +{ + private static readonly ConcurrentBag Pool = new(); + + internal static JsPubMsg Rent() => Pool.TryTake(out var msg) ? msg : new JsPubMsg(); + + internal void ReturnToPool() + { + Subject = string.Empty; + Reply = null; + Hdr = null; + Msg = null; + Pa = null; + Sync = null; + Pool.Add(this); + } + + internal int Size() => + (Subject?.Length ?? 0) + + (Reply?.Length ?? 0) + + (Hdr?.Length ?? 0) + + (Msg?.Length ?? 0); +} + +public sealed class JsOutQ +{ + private readonly IpQueue _queue = new("js-outq"); + private bool _registered = true; + + public (int Len, Exception? Error) SendMsg(string reply, byte[] payload) + { + if (string.IsNullOrWhiteSpace(reply)) + return (0, new ArgumentException("reply is required", nameof(reply))); + + var msg = JsPubMsg.Rent(); + msg.Subject = reply; + msg.Msg = payload; + return Send(msg); + } + + public (int Len, Exception? Error) Send(JsPubMsg msg) + { + if (!_registered) + return (0, new InvalidOperationException("queue is unregistered")); + + return _queue.Push(msg); + } + + public void Unregister() + { + _registered = false; + _queue.Unregister(); + } + + internal JsPubMsg[]? Pop() => _queue.Pop(); +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamConsumersTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamConsumersTests.cs new file mode 100644 index 0000000..6b0f4e0 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamConsumersTests.cs @@ -0,0 +1,71 @@ +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class NatsStreamConsumersTests +{ + [Fact] + public void NewCMsg_ReturnToPool_ClearsValues() + { + var msg = NatsStream.NewCMsg("orders.created", 22); + msg.Subject.ShouldBe("orders.created"); + msg.Seq.ShouldBe(22UL); + + msg.ReturnToPool(); + + msg.Subject.ShouldBe(string.Empty); + msg.Seq.ShouldBe(0UL); + } + + [Fact] + public void NewJSPubMsg_WithHeaderAndData_ReturnsSizedMessage() + { + var pub = NatsStream.NewJSPubMsg("inbox.x", "orders", "reply", [1, 2], [3, 4, 5], null, 12); + + pub.Subject.ShouldBe("inbox.x"); + pub.Size().ShouldBeGreaterThan(0); + } + + [Fact] + public void JsOutQ_SendThenUnregister_RejectsFutureSends() + { + var outq = new JsOutQ(); + var first = outq.SendMsg("inbox.1", [1, 2, 3]); + first.Error.ShouldBeNull(); + first.Len.ShouldBeGreaterThan(0); + + outq.Unregister(); + var second = outq.SendMsg("inbox.2", [4]); + second.Error.ShouldNotBeNull(); + } + + [Fact] + public void AccName_AndNameLocked_ReturnConfiguredValues() + { + var stream = CreateStream(); + + stream.AccName().ShouldBe("A"); + stream.NameLocked().ShouldBe("S"); + } + + [Fact] + public void SetupSendCapabilities_AndResetConsumers_DoNotThrow() + { + var stream = CreateStream(); + + Should.NotThrow(stream.SetupSendCapabilities); + Should.NotThrow(stream.ResetAndWaitOnConsumers); + } + + private static NatsStream CreateStream() + { + var account = new Account { Name = "A" }; + var config = new StreamConfig + { + Name = "S", + Storage = StorageType.MemoryStorage, + }; + return new NatsStream(account, config, DateTime.UtcNow); + } +} diff --git a/porting.db b/porting.db index 24ba7979e8661213cb2b00ee322fb5cacc288b37..c81d811e256e9081530efde3caa74a93a881eaa2 100644 GIT binary patch delta 1549 zcmY+^T}&KR6bJB`yEDV?!d_TdfeOo4q2N;Lf|S-(DPT(xYrAccDk|=_vwZl`!cr&& zQ=w@OX@ka^&|~Sg9x0GYG$FAvJ~fRt#)m#M#HR)Yd1wqqL)8Z#`rw5o)18O^!*Bj` z=bpKf%+RfvZD=hwr3pDrrAm_Y(3D%~FH^}vOQyKcyeXH^ZBt1? zbEXo7uA6d7ha_p-q(hXWrtCteOeF|)nbL#~n^J|EOesS3BR7vR5u1o2qKarD2_kk8 zhlo=oQ6x#kW!yZ*XBbvo&~osXZeDmX&1P3CD+61r(1`NJN5yOpSFx~%Z;Wf3dU&CwyR!T=;n4D?Bzvq5Ba*6 zzmIi=-eRon|ked42EU zQ{!r?B-ajSVD}O3K~FzFgY(51k5`n&ZS-Hic>CoFXAXu=@{+h>>?D5&PfyqiFz*uA zuy}yyK5mVJ13ZwAe_s=Gta@xtym6WP&3YR?xXkzAgJ^CEcEl{>Uy6zC){f)i6<%g8 zs4@aqxJSnC)0G13x@tM1iL3nKqXQH#?82Wu=W*i$aiCB)jXbaHOKz^Wd zpbDU8fGUBu0#yNR1KJMsEKmUGIiTl(UI3~F+5uDpR0~uG^ditspj|-qK)Zn&fL;P> z1bP{056~+>dx7==1%dVh9RNBA)CBY@P&3dWpx1z22WkO&1L#elw}4uK-UbTIQCO*w zmko6({EdvEP_k!i3kw+9a@d)`q$SK>RTnUr#**->#P-n#5UnTq~9q}b+uV-(9UZg*n+k?TcvVF=~FtC7G)3bSE?1i%CP2cs2*Ft z(2tj>tLn1)v-+d@ojRj3ZO2khm^(IA)N!hqEsTavE7-se<=`-u&r(|O9lOuVzn`aVV#iK4 zM(Aj1k{cy6H;&R~&iyPCpBSZm|1TO!v%)AHYR>F7Ne8Y}^ZAqHVb26z?IoFkw2^r?EjQhZk{9`2Wu>39G|4FdUp3)G+y_}HeR|(9i^DRZqhLh zKgI{f=ZkHMe`RhS(Wt4--l8^MxvN$fdvB3j;`?27%{-POKNl|G?skeEZQK=$IC%Lp z3KSQP*}SaIsA+-{Q!EV?Cdto1R!+r?D0qB8)&>G3O|7}l^p7Bu z9FlwFPUV_1uDq)ZDE&&e(!n1)x>_B7DZ>9N=?)m*-noyiY0C3YBUZ JlU4C)>%TC|m`?xz