diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs index be00bb1..7735173 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs @@ -101,10 +101,25 @@ public sealed class Account : INatsAccount internal ClientConnection? InternalClient { get; set; } /// - /// Send queue stub. Mirrors Go sq *sendq. - /// TODO: session 12 — send-queue implementation. + /// Per-account send queue. Mirrors Go sq *sendq. /// - internal object? SendQueue { get; set; } + internal SendQueue? SendQueue { get; set; } + + internal SendQueue? GetSendQueue() + { + lock (_sqmu) + { + return SendQueue; + } + } + + internal void SetSendQueue(SendQueue? sendQueue) + { + lock (_sqmu) + { + SendQueue = sendQueue; + } + } // ------------------------------------------------------------------------- // Eventing timers diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs index 8f677f6..7f526d9 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs @@ -953,6 +953,17 @@ public sealed partial class ClientConnection FlushClients(0); } + internal void ProcessInboundClientMsg(byte[] msg) + { + if (msg is null || msg.Length == 0) + return; + + LastIn = DateTime.UtcNow; + + if (Trace) + TraceMsg(msg); + } + internal void EnqueueProtoAndFlush(ReadOnlySpan proto) { EnqueueProto(proto); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Accounts.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Accounts.cs index 6130fcc..64e95f1 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Accounts.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Accounts.cs @@ -334,6 +334,23 @@ public sealed partial class NatsServer public ClientConnection CreateInternalAccountClient() => CreateInternalClient(ClientKind.Account); + /// + /// Creates and attaches a per-account send queue. + /// Mirrors Go Server.newSendQ call sites. + /// + internal SendQueue NewSendQueue(Account account) + { + ArgumentNullException.ThrowIfNull(account); + + var existing = account.GetSendQueue(); + if (existing is not null) + return existing; + + var sendQueue = SendQueue.NewSendQ(this, account); + account.SetSendQueue(sendQueue); + return sendQueue; + } + /// /// Creates an internal client of the given . /// Mirrors Go Server.createInternalClient. diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/SendQueue.cs b/dotnet/src/ZB.MOM.NatsNet.Server/SendQueue.cs new file mode 100644 index 0000000..860d296 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/SendQueue.cs @@ -0,0 +1,223 @@ +// Copyright 2020-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/sendq.go in the NATS server Go source. + +using System.Collections.Concurrent; +using System.Text; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +/// +/// Internal account send queue for system-dispatched publishes. +/// Mirrors Go sendq in server/sendq.go. +/// +public sealed class SendQueue : IDisposable +{ + private static readonly byte[] CrLfBytes = Encoding.ASCII.GetBytes(ServerConstants.CrLf); + private static readonly ConcurrentBag OutMsgPool = []; + + private readonly Lock _mu = new(); + private readonly Account _account; + private readonly IpQueue _queue; + private readonly Action _startLoop; + private readonly Func _clientFactory; + private readonly Func _isRunning; + private readonly Action _processInbound; + private readonly Action _flush; + private readonly CancellationTokenSource _loopCts = new(); + + private bool _disposed; + + private sealed class OutboundMessage + { + public string Subject { get; set; } = string.Empty; + public string Reply { get; set; } = string.Empty; + public byte[] Header { get; set; } = []; + public byte[] Message { get; set; } = []; + } + + private SendQueue( + NatsServer server, + Account account, + Action startLoop, + Func clientFactory, + Func isRunning, + Action processInbound, + Action flush) + { + _account = account; + _queue = IpQueue.NewIPQueue("SendQ"); + _startLoop = startLoop; + _clientFactory = clientFactory; + _isRunning = isRunning; + _processInbound = processInbound; + _flush = flush; + } + + /// + /// Creates and starts a send queue instance. + /// Mirrors Go Server.newSendQ. + /// + public static SendQueue NewSendQ( + NatsServer server, + Account account, + Action? startLoop = null, + Func? clientFactory = null, + Func? isRunning = null, + Action? processInbound = null, + Action? flush = null) + { + ArgumentNullException.ThrowIfNull(server); + ArgumentNullException.ThrowIfNull(account); + + var queue = new SendQueue( + server, + account, + startLoop ?? (action => server.StartGoRoutine(action)), + clientFactory ?? server.CreateInternalSystemClient, + isRunning ?? server.Running, + processInbound ?? ((client, msg) => client.ProcessInboundClientMsg(msg)), + flush ?? (client => client.FlushClients(0))); + + queue._startLoop(queue.InternalLoop); + return queue; + } + + /// + /// Send helper that mirrors Go nil-receiver behavior. + /// + public static void Send( + SendQueue? sendQueue, + string subject, + string reply, + ReadOnlySpan header, + ReadOnlySpan message) + { + sendQueue?.Send(subject, reply, header, message); + } + + /// + /// Send queue processing loop. + /// Mirrors Go sendq.internalLoop. + /// + internal void InternalLoop() + { + ClientConnection? client = null; + + try + { + client = _clientFactory(); + client.RegisterWithAccount(_account); + client.NoIcb = true; + + while (!_loopCts.IsCancellationRequested && _isRunning()) + { + if (!_queue.Ch.WaitToReadAsync(_loopCts.Token).AsTask().GetAwaiter().GetResult()) + break; + + var pending = _queue.Pop(); + if (pending is null || pending.Length == 0) + continue; + + foreach (var outMsg in pending) + { + var payload = BuildInboundPayload(outMsg); + _processInbound(client, payload); + ReturnMessage(outMsg); + } + + _flush(client); + _queue.Recycle(pending); + } + } + catch (OperationCanceledException) + { + // Queue disposed/shutdown. + } + finally + { + client?.CloseConnection(ClosedState.ClientClosed); + } + } + + /// + /// Queues a message for internal processing. + /// Mirrors Go sendq.send. + /// + public void Send(string subject, string reply, ReadOnlySpan header, ReadOnlySpan message) + { + if (_disposed) + return; + + var outMsg = RentMessage(); + outMsg.Subject = subject; + outMsg.Reply = reply; + outMsg.Header = header.ToArray(); + outMsg.Message = message.ToArray(); + + var (_, error) = _queue.Push(outMsg); + if (error is not null) + ReturnMessage(outMsg); + } + + private static byte[] BuildInboundPayload(OutboundMessage outMsg) + { + var payload = new byte[outMsg.Header.Length + outMsg.Message.Length + CrLfBytes.Length]; + var offset = 0; + + if (outMsg.Header.Length > 0) + { + Buffer.BlockCopy(outMsg.Header, 0, payload, 0, outMsg.Header.Length); + offset += outMsg.Header.Length; + } + + Buffer.BlockCopy(outMsg.Message, 0, payload, offset, outMsg.Message.Length); + offset += outMsg.Message.Length; + + Buffer.BlockCopy(CrLfBytes, 0, payload, offset, CrLfBytes.Length); + return payload; + } + + private static OutboundMessage RentMessage() + { + if (OutMsgPool.TryTake(out var outMsg)) + return outMsg; + + return new OutboundMessage(); + } + + private static void ReturnMessage(OutboundMessage outMsg) + { + outMsg.Subject = string.Empty; + outMsg.Reply = string.Empty; + outMsg.Header = []; + outMsg.Message = []; + OutMsgPool.Add(outMsg); + } + + public void Dispose() + { + lock (_mu) + { + if (_disposed) + return; + + _disposed = true; + } + + _loopCts.Cancel(); + _loopCts.Dispose(); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/SendQueueTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/SendQueueTests.cs new file mode 100644 index 0000000..f5513d5 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Internal/SendQueueTests.cs @@ -0,0 +1,107 @@ +// Copyright 2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System.Text; +using Shouldly; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server.Tests.Internal; + +public sealed class SendQueueTests +{ + [Fact] + public void NewSendQ_WhenCreated_StartsInternalLoopEntryPath() + { + var (server, account) = CreateServerAndAccount(); + var started = false; + + using var sendQueue = SendQueue.NewSendQ( + server, + account, + startLoop: _ => started = true); + + started.ShouldBeTrue(); + } + + [Fact] + public void Send_WhenQueueIsNullOrDisposed_NoOps() + { + SendQueue? nullQueue = null; + Should.NotThrow(() => + SendQueue.Send(nullQueue, "subject", string.Empty, [], [])); + + var (server, account) = CreateServerAndAccount(); + using var sendQueue = SendQueue.NewSendQ( + server, + account, + startLoop: _ => { }); + + sendQueue.Dispose(); + + Should.NotThrow(() => + sendQueue.Send("subject", "reply", [1, 2], [3, 4])); + } + + [Fact] + public void InternalLoop_WhenMessageQueued_CopiesPayloadAndDispatchesToInternalClientPath() + { + var (server, account) = CreateServerAndAccount(); + var internalClient = new ClientConnection(ClientKind.System); + var received = new List(); + var flushCalls = 0; + var running = true; + + using var sendQueue = SendQueue.NewSendQ( + server, + account, + startLoop: _ => { }, + clientFactory: () => internalClient, + isRunning: () => + { + var current = running; + running = false; + return current; + }, + processInbound: (_, msg) => received.Add((byte[])msg.Clone()), + flush: _ => flushCalls++); + + var hdr = Encoding.ASCII.GetBytes("NATS/1.0\r\nHeader: A\r\n\r\n"); + var msg = Encoding.ASCII.GetBytes("hello"); + sendQueue.Send("events.1", "reply.1", hdr, msg); + hdr[0] = (byte)'X'; + msg[0] = (byte)'Y'; + + sendQueue.InternalLoop(); + + received.Count.ShouldBe(1); + flushCalls.ShouldBe(1); + + var expectedHeader = Encoding.ASCII.GetBytes("NATS/1.0\r\nHeader: A\r\n\r\n"); + var expectedPayload = Encoding.ASCII.GetBytes("hello\r\n"); + var expected = new byte[expectedHeader.Length + expectedPayload.Length]; + Buffer.BlockCopy(expectedHeader, 0, expected, 0, expectedHeader.Length); + Buffer.BlockCopy(expectedPayload, 0, expected, expectedHeader.Length, expectedPayload.Length); + + received[0].ShouldBe(expected); + } + + private static (NatsServer server, Account account) CreateServerAndAccount() + { + var (server, err) = NatsServer.NewServer(new ServerOptions { NoSystemAccount = true }); + err.ShouldBeNull(); + server.ShouldNotBeNull(); + + var account = new Account { Name = "SENDQ" }; + return (server!, account); + } +} diff --git a/porting.db b/porting.db index 6364f8d..3da9f5a 100644 Binary files a/porting.db and b/porting.db differ diff --git a/reports/current.md b/reports/current.md index 92f8af9..e860f29 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-02-28 12:24:46 UTC +Generated: 2026-02-28 12:31:12 UTC ## Modules (12 total) @@ -12,10 +12,10 @@ Generated: 2026-02-28 12:24:46 UTC | Status | Count | |--------|-------| -| deferred | 2351 | +| deferred | 2348 | | n_a | 24 | | stub | 1 | -| verified | 1297 | +| verified | 1300 | ## Unit Tests (3257 total) @@ -34,4 +34,4 @@ Generated: 2026-02-28 12:24:46 UTC ## Overall Progress -**2499/6942 items complete (36.0%)** +**2502/6942 items complete (36.0%)**