diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs new file mode 100644 index 0000000..2c8942f --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.Consumers.cs @@ -0,0 +1,121 @@ +using System.Threading.Channels; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + private readonly object _consumersSync = new(); + private readonly Dictionary _consumers = new(StringComparer.Ordinal); + private List _consumerList = []; + private readonly IpQueue _sigQueue = new("js-signal"); + private readonly Channel _signalWake = Channel.CreateBounded(1); + private readonly Channel _internalWake = Channel.CreateBounded(1); + private readonly JsOutQ _outq = new(); + + internal static CMsg NewCMsg(string subject, ulong seq) + { + var msg = CMsg.Rent(); + msg.Subject = subject; + msg.Seq = seq; + return msg; + } + + internal void SignalConsumersLoop(CancellationToken cancellationToken = default) + { + while (!cancellationToken.IsCancellationRequested) + { + if (!_signalWake.Reader.TryRead(out _)) + { + Thread.Sleep(1); + continue; + } + + var messages = _sigQueue.Pop(); + if (messages == null) + continue; + + foreach (var msg in messages) + { + SignalConsumers(msg.Subject, msg.Seq); + msg.ReturnToPool(); + } + } + } + + internal void SignalConsumers(string subject, ulong seq) + { + _ = subject; + _ = seq; + + lock (_consumersSync) + { + _ = _consumerList.Count; + } + } + + internal static JsPubMsg NewJSPubMsg(string destinationSubject, string subject, string? reply, byte[]? hdr, byte[]? msg, NatsConsumer? consumer, ulong seq) + { + var pub = GetJSPubMsgFromPool(); + pub.Subject = destinationSubject; + pub.Reply = reply; + pub.Hdr = hdr; + pub.Msg = msg; + pub.Pa = new StoreMsg + { + Subject = subject, + Seq = seq, + Hdr = hdr ?? [], + Msg = msg ?? [], + Buf = [], + }; + pub.Sync = consumer; + return pub; + } + + internal static JsPubMsg GetJSPubMsgFromPool() => JsPubMsg.Rent(); + + internal void SetupSendCapabilities() + { + _ = _outq; + _signalWake.Writer.TryWrite(true); + } + + internal string AccName() => Account.Name; + + internal string NameLocked() => Name; + + internal void InternalLoop(CancellationToken cancellationToken = default) + { + while (!cancellationToken.IsCancellationRequested) + { + if (!_internalWake.Reader.TryRead(out _)) + { + Thread.Sleep(1); + continue; + } + + var messages = _sigQueue.Pop(); + if (messages == null) + continue; + + foreach (var msg in messages) + { + SignalConsumers(msg.Subject, msg.Seq); + msg.ReturnToPool(); + } + } + } + + internal void ResetAndWaitOnConsumers() + { + List snapshot; + lock (_consumersSync) + { + snapshot = [.. _consumerList]; + } + + foreach (var consumer in snapshot) + consumer.Stop(); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.MessageCarriers.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.MessageCarriers.cs index 112ea30..21ce5f3 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.MessageCarriers.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/StreamTypes.MessageCarriers.cs @@ -1,4 +1,5 @@ using System.Collections.Concurrent; +using ZB.MOM.NatsNet.Server.Internal; namespace ZB.MOM.NatsNet.Server; @@ -18,3 +19,75 @@ public sealed partial class InMsg Pool.Add(this); } } + +public sealed partial class CMsg +{ + private static readonly ConcurrentBag Pool = new(); + + internal static CMsg Rent() => Pool.TryTake(out var msg) ? msg : new CMsg(); + + internal void ReturnToPool() + { + Subject = string.Empty; + Msg = null; + Seq = 0; + Pool.Add(this); + } +} + +public sealed partial class JsPubMsg +{ + private static readonly ConcurrentBag Pool = new(); + + internal static JsPubMsg Rent() => Pool.TryTake(out var msg) ? msg : new JsPubMsg(); + + internal void ReturnToPool() + { + Subject = string.Empty; + Reply = null; + Hdr = null; + Msg = null; + Pa = null; + Sync = null; + Pool.Add(this); + } + + internal int Size() => + (Subject?.Length ?? 0) + + (Reply?.Length ?? 0) + + (Hdr?.Length ?? 0) + + (Msg?.Length ?? 0); +} + +public sealed class JsOutQ +{ + private readonly IpQueue _queue = new("js-outq"); + private bool _registered = true; + + public (int Len, Exception? Error) SendMsg(string reply, byte[] payload) + { + if (string.IsNullOrWhiteSpace(reply)) + return (0, new ArgumentException("reply is required", nameof(reply))); + + var msg = JsPubMsg.Rent(); + msg.Subject = reply; + msg.Msg = payload; + return Send(msg); + } + + public (int Len, Exception? Error) Send(JsPubMsg msg) + { + if (!_registered) + return (0, new InvalidOperationException("queue is unregistered")); + + return _queue.Push(msg); + } + + public void Unregister() + { + _registered = false; + _queue.Unregister(); + } + + internal JsPubMsg[]? Pop() => _queue.Pop(); +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamConsumersTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamConsumersTests.cs new file mode 100644 index 0000000..6b0f4e0 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamConsumersTests.cs @@ -0,0 +1,71 @@ +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class NatsStreamConsumersTests +{ + [Fact] + public void NewCMsg_ReturnToPool_ClearsValues() + { + var msg = NatsStream.NewCMsg("orders.created", 22); + msg.Subject.ShouldBe("orders.created"); + msg.Seq.ShouldBe(22UL); + + msg.ReturnToPool(); + + msg.Subject.ShouldBe(string.Empty); + msg.Seq.ShouldBe(0UL); + } + + [Fact] + public void NewJSPubMsg_WithHeaderAndData_ReturnsSizedMessage() + { + var pub = NatsStream.NewJSPubMsg("inbox.x", "orders", "reply", [1, 2], [3, 4, 5], null, 12); + + pub.Subject.ShouldBe("inbox.x"); + pub.Size().ShouldBeGreaterThan(0); + } + + [Fact] + public void JsOutQ_SendThenUnregister_RejectsFutureSends() + { + var outq = new JsOutQ(); + var first = outq.SendMsg("inbox.1", [1, 2, 3]); + first.Error.ShouldBeNull(); + first.Len.ShouldBeGreaterThan(0); + + outq.Unregister(); + var second = outq.SendMsg("inbox.2", [4]); + second.Error.ShouldNotBeNull(); + } + + [Fact] + public void AccName_AndNameLocked_ReturnConfiguredValues() + { + var stream = CreateStream(); + + stream.AccName().ShouldBe("A"); + stream.NameLocked().ShouldBe("S"); + } + + [Fact] + public void SetupSendCapabilities_AndResetConsumers_DoNotThrow() + { + var stream = CreateStream(); + + Should.NotThrow(stream.SetupSendCapabilities); + Should.NotThrow(stream.ResetAndWaitOnConsumers); + } + + private static NatsStream CreateStream() + { + var account = new Account { Name = "A" }; + var config = new StreamConfig + { + Name = "S", + Storage = StorageType.MemoryStorage, + }; + return new NatsStream(account, config, DateTime.UtcNow); + } +} diff --git a/porting.db b/porting.db index 24ba797..c81d811 100644 Binary files a/porting.db and b/porting.db differ