// Copyright 2020-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Adapted from server/sendq.go in the NATS server Go source.

using System.Collections.Concurrent;
using System.Text;
using ZB.MOM.NatsNet.Server.Internal;

namespace ZB.MOM.NatsNet.Server;

/// <summary>
/// Internal account send queue for system-dispatched publishes.
/// Mirrors Go <c>sendq</c> in server/sendq.go.
/// </summary>
/// <remarks>
/// Producers call <see cref="Send(string, string, ReadOnlySpan{byte}, ReadOnlySpan{byte})"/>,
/// which enqueues a copy of the message. <see cref="InternalLoop"/> pops queued
/// batches and hands each message to the inbound-processing callback using an
/// internal client obtained from the client factory.
/// </remarks>
public sealed class SendQueue : IDisposable
{
    private static readonly byte[] CrLfBytes = Encoding.ASCII.GetBytes(ServerConstants.CrLf);

    // Shared pool of message holders so each Send does not allocate a new one.
    private static readonly ConcurrentBag<OutboundMessage> OutMsgPool = [];

    private readonly Lock _mu = new();
    private readonly Account _account;
    private readonly IpQueue<OutboundMessage> _queue;
    private readonly Action<Action> _startLoop;
    private readonly Func<ClientConnection> _clientFactory;
    private readonly Func<bool> _isRunning;
    private readonly Action<ClientConnection, byte[]> _processInbound;
    private readonly Action<ClientConnection> _flush;
    private readonly CancellationTokenSource _loopCts = new();

    // Captured once at construction. CancellationTokenSource.Token throws
    // ObjectDisposedException after Dispose(), so the loop must never read it
    // from _loopCts directly: Dispose() can run concurrently with the loop.
    private readonly CancellationToken _loopToken;

    private bool _disposed;

    /// <summary>
    /// Pooled holder for one queued publish.
    /// </summary>
    private sealed class OutboundMessage
    {
        public string Subject { get; set; } = string.Empty;

        public string Reply { get; set; } = string.Empty;

        public byte[] Header { get; set; } = [];

        public byte[] Message { get; set; } = [];
    }

    private SendQueue(
        NatsServer server,
        Account account,
        Action<Action> startLoop,
        Func<ClientConnection> clientFactory,
        Func<bool> isRunning,
        Action<ClientConnection, byte[]> processInbound,
        Action<ClientConnection> flush)
    {
        _account = account;
        _queue = IpQueue<OutboundMessage>.NewIPQueue("SendQ");
        _startLoop = startLoop;
        _clientFactory = clientFactory;
        _isRunning = isRunning;
        _processInbound = processInbound;
        _flush = flush;
        _loopToken = _loopCts.Token;
    }

    /// <summary>
    /// Creates and starts a send queue instance.
    /// Mirrors Go <c>Server.newSendQ</c>.
    /// </summary>
    /// <param name="server">Server supplying the default callbacks; must not be null.</param>
    /// <param name="account">Account the internal client is registered with; must not be null.</param>
    /// <param name="startLoop">Starts the processing loop; defaults to <c>server.StartGoRoutine</c>.</param>
    /// <param name="clientFactory">Creates the internal client; defaults to <c>server.CreateInternalSystemClient</c>.</param>
    /// <param name="isRunning">Reports whether the loop should keep going; defaults to <c>server.Running</c>.</param>
    /// <param name="processInbound">Handles one assembled payload; defaults to <c>client.ProcessInboundClientMsg</c>.</param>
    /// <param name="flush">Invoked after each processed batch; defaults to <c>client.FlushClients(0)</c>.</param>
    /// <returns>The new queue, with its loop already handed to <paramref name="startLoop"/>.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="server"/> or <paramref name="account"/> is null.
    /// </exception>
    public static SendQueue NewSendQ(
        NatsServer server,
        Account account,
        Action<Action>? startLoop = null,
        Func<ClientConnection>? clientFactory = null,
        Func<bool>? isRunning = null,
        Action<ClientConnection, byte[]>? processInbound = null,
        Action<ClientConnection>? flush = null)
    {
        ArgumentNullException.ThrowIfNull(server);
        ArgumentNullException.ThrowIfNull(account);

        var queue = new SendQueue(
            server,
            account,
            startLoop ?? (action => server.StartGoRoutine(action)),
            clientFactory ?? server.CreateInternalSystemClient,
            isRunning ?? server.Running,
            processInbound ?? ((client, msg) => client.ProcessInboundClientMsg(msg)),
            flush ?? (client => client.FlushClients(0)));

        queue._startLoop(queue.InternalLoop);
        return queue;
    }

    /// <summary>
    /// Send helper that mirrors Go nil-receiver behavior: a null
    /// <paramref name="sendQueue"/> makes the call a no-op.
    /// </summary>
    public static void Send(
        SendQueue? sendQueue,
        string subject,
        string reply,
        ReadOnlySpan<byte> header,
        ReadOnlySpan<byte> message)
    {
        sendQueue?.Send(subject, reply, header, message);
    }

    /// <summary>
    /// Send queue processing loop.
    /// Mirrors Go <c>sendq.internalLoop</c>.
    /// </summary>
    /// <remarks>
    /// Runs until the queue is disposed, the running callback returns false, or
    /// the queue's channel reports that no more items will arrive. The internal
    /// client is always closed on exit.
    /// </remarks>
    internal void InternalLoop()
    {
        ClientConnection? client = null;
        try
        {
            client = _clientFactory();
            client.RegisterWithAccount(_account);
            client.NoIcb = true;

            while (!_loopToken.IsCancellationRequested && _isRunning())
            {
                // Blocks the calling thread until the queue signals data or the
                // token is cancelled. Uses the token captured at construction so
                // a concurrent Dispose() cannot make this throw
                // ObjectDisposedException.
                if (!_queue.Ch.WaitToReadAsync(_loopToken).AsTask().GetAwaiter().GetResult())
                {
                    break;
                }

                var pending = _queue.Pop();
                if (pending is null || pending.Length == 0)
                {
                    continue;
                }

                foreach (var outMsg in pending)
                {
                    var payload = BuildInboundPayload(outMsg);
                    _processInbound(client, payload);
                    ReturnMessage(outMsg);
                }

                _flush(client);
                _queue.Recycle(pending);
            }
        }
        catch (OperationCanceledException)
        {
            // Queue disposed/shutdown.
        }
        finally
        {
            client?.CloseConnection(ClosedState.ClientClosed);
        }
    }

    /// <summary>
    /// Queues a message for internal processing.
    /// Mirrors Go <c>sendq.send</c>.
    /// </summary>
    /// <remarks>
    /// <paramref name="header"/> and <paramref name="message"/> are copied, so the
    /// caller may reuse its buffers as soon as this returns. The call is a no-op
    /// once the queue has been disposed.
    /// </remarks>
    public void Send(string subject, string reply, ReadOnlySpan<byte> header, ReadOnlySpan<byte> message)
    {
        if (_disposed)
        {
            return;
        }

        var outMsg = RentMessage();
        outMsg.Subject = subject;
        outMsg.Reply = reply;
        outMsg.Header = header.ToArray();
        outMsg.Message = message.ToArray();

        var (_, error) = _queue.Push(outMsg);
        if (error is not null)
        {
            // The queue rejected the message; hand the holder back to the pool.
            ReturnMessage(outMsg);
        }
    }

    /// <summary>
    /// Builds the payload passed to the inbound processor:
    /// header bytes, then message bytes, then a trailing CRLF.
    /// </summary>
    private static byte[] BuildInboundPayload(OutboundMessage outMsg) =>
        [.. outMsg.Header, .. outMsg.Message, .. CrLfBytes];

    /// <summary>
    /// Takes a holder from the pool, or allocates one when the pool is empty.
    /// </summary>
    private static OutboundMessage RentMessage() =>
        OutMsgPool.TryTake(out var outMsg) ? outMsg : new OutboundMessage();

    /// <summary>
    /// Clears a holder and returns it to the pool.
    /// </summary>
    private static void ReturnMessage(OutboundMessage outMsg)
    {
        // Drop references so a pooled holder does not keep payloads alive.
        outMsg.Subject = string.Empty;
        outMsg.Reply = string.Empty;
        outMsg.Header = [];
        outMsg.Message = [];
        OutMsgPool.Add(outMsg);
    }

    /// <summary>
    /// Stops the processing loop. Safe to call more than once; only the first
    /// call has any effect.
    /// </summary>
    public void Dispose()
    {
        lock (_mu)
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;
        }

        _loopCts.Cancel();

        // Safe while the loop may still be running: the loop only touches
        // _loopToken, never _loopCts itself.
        _loopCts.Dispose();
    }
}