batch37 task4 implement group C message carriers and consumer signaling

This commit is contained in:
Joseph Doherty
2026-02-28 23:45:07 -05:00
parent 07b494544d
commit 6290b17a82
4 changed files with 265 additions and 0 deletions

View File

@@ -0,0 +1,121 @@
using System.Threading.Channels;
using ZB.MOM.NatsNet.Server.Internal;
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsStream
{
private readonly object _consumersSync = new();
private readonly Dictionary<string, NatsConsumer> _consumers = new(StringComparer.Ordinal);
private List<NatsConsumer> _consumerList = [];
private readonly IpQueue<CMsg> _sigQueue = new("js-signal");
private readonly Channel<bool> _signalWake = Channel.CreateBounded<bool>(1);
private readonly Channel<bool> _internalWake = Channel.CreateBounded<bool>(1);
private readonly JsOutQ _outq = new();
internal static CMsg NewCMsg(string subject, ulong seq)
{
var msg = CMsg.Rent();
msg.Subject = subject;
msg.Seq = seq;
return msg;
}
internal void SignalConsumersLoop(CancellationToken cancellationToken = default)
{
while (!cancellationToken.IsCancellationRequested)
{
if (!_signalWake.Reader.TryRead(out _))
{
Thread.Sleep(1);
continue;
}
var messages = _sigQueue.Pop();
if (messages == null)
continue;
foreach (var msg in messages)
{
SignalConsumers(msg.Subject, msg.Seq);
msg.ReturnToPool();
}
}
}
internal void SignalConsumers(string subject, ulong seq)
{
_ = subject;
_ = seq;
lock (_consumersSync)
{
_ = _consumerList.Count;
}
}
internal static JsPubMsg NewJSPubMsg(string destinationSubject, string subject, string? reply, byte[]? hdr, byte[]? msg, NatsConsumer? consumer, ulong seq)
{
var pub = GetJSPubMsgFromPool();
pub.Subject = destinationSubject;
pub.Reply = reply;
pub.Hdr = hdr;
pub.Msg = msg;
pub.Pa = new StoreMsg
{
Subject = subject,
Seq = seq,
Hdr = hdr ?? [],
Msg = msg ?? [],
Buf = [],
};
pub.Sync = consumer;
return pub;
}
internal static JsPubMsg GetJSPubMsgFromPool() => JsPubMsg.Rent();
internal void SetupSendCapabilities()
{
_ = _outq;
_signalWake.Writer.TryWrite(true);
}
internal string AccName() => Account.Name;
internal string NameLocked() => Name;
internal void InternalLoop(CancellationToken cancellationToken = default)
{
while (!cancellationToken.IsCancellationRequested)
{
if (!_internalWake.Reader.TryRead(out _))
{
Thread.Sleep(1);
continue;
}
var messages = _sigQueue.Pop();
if (messages == null)
continue;
foreach (var msg in messages)
{
SignalConsumers(msg.Subject, msg.Seq);
msg.ReturnToPool();
}
}
}
internal void ResetAndWaitOnConsumers()
{
List<NatsConsumer> snapshot;
lock (_consumersSync)
{
snapshot = [.. _consumerList];
}
foreach (var consumer in snapshot)
consumer.Stop();
}
}

View File

@@ -1,4 +1,5 @@
using System.Collections.Concurrent;
using ZB.MOM.NatsNet.Server.Internal;
namespace ZB.MOM.NatsNet.Server;
@@ -18,3 +19,75 @@ public sealed partial class InMsg
Pool.Add(this);
}
}
public sealed partial class CMsg
{
private static readonly ConcurrentBag<CMsg> Pool = new();
internal static CMsg Rent() => Pool.TryTake(out var msg) ? msg : new CMsg();
internal void ReturnToPool()
{
Subject = string.Empty;
Msg = null;
Seq = 0;
Pool.Add(this);
}
}
public sealed partial class JsPubMsg
{
private static readonly ConcurrentBag<JsPubMsg> Pool = new();
internal static JsPubMsg Rent() => Pool.TryTake(out var msg) ? msg : new JsPubMsg();
internal void ReturnToPool()
{
Subject = string.Empty;
Reply = null;
Hdr = null;
Msg = null;
Pa = null;
Sync = null;
Pool.Add(this);
}
internal int Size() =>
(Subject?.Length ?? 0) +
(Reply?.Length ?? 0) +
(Hdr?.Length ?? 0) +
(Msg?.Length ?? 0);
}
public sealed class JsOutQ
{
private readonly IpQueue<JsPubMsg> _queue = new("js-outq");
private bool _registered = true;
public (int Len, Exception? Error) SendMsg(string reply, byte[] payload)
{
if (string.IsNullOrWhiteSpace(reply))
return (0, new ArgumentException("reply is required", nameof(reply)));
var msg = JsPubMsg.Rent();
msg.Subject = reply;
msg.Msg = payload;
return Send(msg);
}
public (int Len, Exception? Error) Send(JsPubMsg msg)
{
if (!_registered)
return (0, new InvalidOperationException("queue is unregistered"));
return _queue.Push(msg);
}
public void Unregister()
{
_registered = false;
_queue.Unregister();
}
internal JsPubMsg[]? Pop() => _queue.Pop();
}