feat(batch3): implement send queue feature group
This commit is contained in:
@@ -101,10 +101,25 @@ public sealed class Account : INatsAccount
|
|||||||
internal ClientConnection? InternalClient { get; set; }
|
internal ClientConnection? InternalClient { get; set; }
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Send queue stub. Mirrors Go <c>sq *sendq</c>.
|
/// Per-account send queue. Mirrors Go <c>sq *sendq</c>.
|
||||||
/// TODO: session 12 — send-queue implementation.
|
|
||||||
/// </summary>
|
/// </summary>
|
||||||
internal object? SendQueue { get; set; }
|
internal SendQueue? SendQueue { get; set; }
|
||||||
|
|
||||||
|
internal SendQueue? GetSendQueue()
|
||||||
|
{
|
||||||
|
lock (_sqmu)
|
||||||
|
{
|
||||||
|
return SendQueue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
internal void SetSendQueue(SendQueue? sendQueue)
|
||||||
|
{
|
||||||
|
lock (_sqmu)
|
||||||
|
{
|
||||||
|
SendQueue = sendQueue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// -------------------------------------------------------------------------
|
// -------------------------------------------------------------------------
|
||||||
// Eventing timers
|
// Eventing timers
|
||||||
|
|||||||
@@ -953,6 +953,17 @@ public sealed partial class ClientConnection
|
|||||||
FlushClients(0);
|
FlushClients(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
internal void ProcessInboundClientMsg(byte[] msg)
|
||||||
|
{
|
||||||
|
if (msg is null || msg.Length == 0)
|
||||||
|
return;
|
||||||
|
|
||||||
|
LastIn = DateTime.UtcNow;
|
||||||
|
|
||||||
|
if (Trace)
|
||||||
|
TraceMsg(msg);
|
||||||
|
}
|
||||||
|
|
||||||
internal void EnqueueProtoAndFlush(ReadOnlySpan<byte> proto)
|
internal void EnqueueProtoAndFlush(ReadOnlySpan<byte> proto)
|
||||||
{
|
{
|
||||||
EnqueueProto(proto);
|
EnqueueProto(proto);
|
||||||
|
|||||||
@@ -334,6 +334,23 @@ public sealed partial class NatsServer
|
|||||||
public ClientConnection CreateInternalAccountClient() =>
|
public ClientConnection CreateInternalAccountClient() =>
|
||||||
CreateInternalClient(ClientKind.Account);
|
CreateInternalClient(ClientKind.Account);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Creates and attaches a per-account send queue.
|
||||||
|
/// Mirrors Go <c>Server.newSendQ</c> call sites.
|
||||||
|
/// </summary>
|
||||||
|
internal SendQueue NewSendQueue(Account account)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(account);
|
||||||
|
|
||||||
|
var existing = account.GetSendQueue();
|
||||||
|
if (existing is not null)
|
||||||
|
return existing;
|
||||||
|
|
||||||
|
var sendQueue = SendQueue.NewSendQ(this, account);
|
||||||
|
account.SetSendQueue(sendQueue);
|
||||||
|
return sendQueue;
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Creates an internal client of the given <paramref name="kind"/>.
|
/// Creates an internal client of the given <paramref name="kind"/>.
|
||||||
/// Mirrors Go <c>Server.createInternalClient</c>.
|
/// Mirrors Go <c>Server.createInternalClient</c>.
|
||||||
|
|||||||
223
dotnet/src/ZB.MOM.NatsNet.Server/SendQueue.cs
Normal file
223
dotnet/src/ZB.MOM.NatsNet.Server/SendQueue.cs
Normal file
@@ -0,0 +1,223 @@
|
|||||||
|
// Copyright 2020-2026 The NATS Authors
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
//
|
||||||
|
// Adapted from server/sendq.go in the NATS server Go source.
|
||||||
|
|
||||||
|
using System.Collections.Concurrent;
|
||||||
|
using System.Text;
|
||||||
|
using ZB.MOM.NatsNet.Server.Internal;
|
||||||
|
|
||||||
|
namespace ZB.MOM.NatsNet.Server;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Internal account send queue for system-dispatched publishes.
|
||||||
|
/// Mirrors Go <c>sendq</c> in server/sendq.go.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class SendQueue : IDisposable
|
||||||
|
{
|
||||||
|
private static readonly byte[] CrLfBytes = Encoding.ASCII.GetBytes(ServerConstants.CrLf);
|
||||||
|
private static readonly ConcurrentBag<OutboundMessage> OutMsgPool = [];
|
||||||
|
|
||||||
|
private readonly Lock _mu = new();
|
||||||
|
private readonly Account _account;
|
||||||
|
private readonly IpQueue<OutboundMessage> _queue;
|
||||||
|
private readonly Action<Action> _startLoop;
|
||||||
|
private readonly Func<ClientConnection> _clientFactory;
|
||||||
|
private readonly Func<bool> _isRunning;
|
||||||
|
private readonly Action<ClientConnection, byte[]> _processInbound;
|
||||||
|
private readonly Action<ClientConnection> _flush;
|
||||||
|
private readonly CancellationTokenSource _loopCts = new();
|
||||||
|
|
||||||
|
private bool _disposed;
|
||||||
|
|
||||||
|
private sealed class OutboundMessage
|
||||||
|
{
|
||||||
|
public string Subject { get; set; } = string.Empty;
|
||||||
|
public string Reply { get; set; } = string.Empty;
|
||||||
|
public byte[] Header { get; set; } = [];
|
||||||
|
public byte[] Message { get; set; } = [];
|
||||||
|
}
|
||||||
|
|
||||||
|
private SendQueue(
|
||||||
|
NatsServer server,
|
||||||
|
Account account,
|
||||||
|
Action<Action> startLoop,
|
||||||
|
Func<ClientConnection> clientFactory,
|
||||||
|
Func<bool> isRunning,
|
||||||
|
Action<ClientConnection, byte[]> processInbound,
|
||||||
|
Action<ClientConnection> flush)
|
||||||
|
{
|
||||||
|
_account = account;
|
||||||
|
_queue = IpQueue<OutboundMessage>.NewIPQueue("SendQ");
|
||||||
|
_startLoop = startLoop;
|
||||||
|
_clientFactory = clientFactory;
|
||||||
|
_isRunning = isRunning;
|
||||||
|
_processInbound = processInbound;
|
||||||
|
_flush = flush;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Creates and starts a send queue instance.
|
||||||
|
/// Mirrors Go <c>Server.newSendQ</c>.
|
||||||
|
/// </summary>
|
||||||
|
public static SendQueue NewSendQ(
|
||||||
|
NatsServer server,
|
||||||
|
Account account,
|
||||||
|
Action<Action>? startLoop = null,
|
||||||
|
Func<ClientConnection>? clientFactory = null,
|
||||||
|
Func<bool>? isRunning = null,
|
||||||
|
Action<ClientConnection, byte[]>? processInbound = null,
|
||||||
|
Action<ClientConnection>? flush = null)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(server);
|
||||||
|
ArgumentNullException.ThrowIfNull(account);
|
||||||
|
|
||||||
|
var queue = new SendQueue(
|
||||||
|
server,
|
||||||
|
account,
|
||||||
|
startLoop ?? (action => server.StartGoRoutine(action)),
|
||||||
|
clientFactory ?? server.CreateInternalSystemClient,
|
||||||
|
isRunning ?? server.Running,
|
||||||
|
processInbound ?? ((client, msg) => client.ProcessInboundClientMsg(msg)),
|
||||||
|
flush ?? (client => client.FlushClients(0)));
|
||||||
|
|
||||||
|
queue._startLoop(queue.InternalLoop);
|
||||||
|
return queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Send helper that mirrors Go nil-receiver behavior.
|
||||||
|
/// </summary>
|
||||||
|
public static void Send(
|
||||||
|
SendQueue? sendQueue,
|
||||||
|
string subject,
|
||||||
|
string reply,
|
||||||
|
ReadOnlySpan<byte> header,
|
||||||
|
ReadOnlySpan<byte> message)
|
||||||
|
{
|
||||||
|
sendQueue?.Send(subject, reply, header, message);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Send queue processing loop.
|
||||||
|
/// Mirrors Go <c>sendq.internalLoop</c>.
|
||||||
|
/// </summary>
|
||||||
|
internal void InternalLoop()
|
||||||
|
{
|
||||||
|
ClientConnection? client = null;
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
client = _clientFactory();
|
||||||
|
client.RegisterWithAccount(_account);
|
||||||
|
client.NoIcb = true;
|
||||||
|
|
||||||
|
while (!_loopCts.IsCancellationRequested && _isRunning())
|
||||||
|
{
|
||||||
|
if (!_queue.Ch.WaitToReadAsync(_loopCts.Token).AsTask().GetAwaiter().GetResult())
|
||||||
|
break;
|
||||||
|
|
||||||
|
var pending = _queue.Pop();
|
||||||
|
if (pending is null || pending.Length == 0)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
foreach (var outMsg in pending)
|
||||||
|
{
|
||||||
|
var payload = BuildInboundPayload(outMsg);
|
||||||
|
_processInbound(client, payload);
|
||||||
|
ReturnMessage(outMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
_flush(client);
|
||||||
|
_queue.Recycle(pending);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException)
|
||||||
|
{
|
||||||
|
// Queue disposed/shutdown.
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
client?.CloseConnection(ClosedState.ClientClosed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Queues a message for internal processing.
|
||||||
|
/// Mirrors Go <c>sendq.send</c>.
|
||||||
|
/// </summary>
|
||||||
|
public void Send(string subject, string reply, ReadOnlySpan<byte> header, ReadOnlySpan<byte> message)
|
||||||
|
{
|
||||||
|
if (_disposed)
|
||||||
|
return;
|
||||||
|
|
||||||
|
var outMsg = RentMessage();
|
||||||
|
outMsg.Subject = subject;
|
||||||
|
outMsg.Reply = reply;
|
||||||
|
outMsg.Header = header.ToArray();
|
||||||
|
outMsg.Message = message.ToArray();
|
||||||
|
|
||||||
|
var (_, error) = _queue.Push(outMsg);
|
||||||
|
if (error is not null)
|
||||||
|
ReturnMessage(outMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static byte[] BuildInboundPayload(OutboundMessage outMsg)
|
||||||
|
{
|
||||||
|
var payload = new byte[outMsg.Header.Length + outMsg.Message.Length + CrLfBytes.Length];
|
||||||
|
var offset = 0;
|
||||||
|
|
||||||
|
if (outMsg.Header.Length > 0)
|
||||||
|
{
|
||||||
|
Buffer.BlockCopy(outMsg.Header, 0, payload, 0, outMsg.Header.Length);
|
||||||
|
offset += outMsg.Header.Length;
|
||||||
|
}
|
||||||
|
|
||||||
|
Buffer.BlockCopy(outMsg.Message, 0, payload, offset, outMsg.Message.Length);
|
||||||
|
offset += outMsg.Message.Length;
|
||||||
|
|
||||||
|
Buffer.BlockCopy(CrLfBytes, 0, payload, offset, CrLfBytes.Length);
|
||||||
|
return payload;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static OutboundMessage RentMessage()
|
||||||
|
{
|
||||||
|
if (OutMsgPool.TryTake(out var outMsg))
|
||||||
|
return outMsg;
|
||||||
|
|
||||||
|
return new OutboundMessage();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void ReturnMessage(OutboundMessage outMsg)
|
||||||
|
{
|
||||||
|
outMsg.Subject = string.Empty;
|
||||||
|
outMsg.Reply = string.Empty;
|
||||||
|
outMsg.Header = [];
|
||||||
|
outMsg.Message = [];
|
||||||
|
OutMsgPool.Add(outMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Dispose()
|
||||||
|
{
|
||||||
|
lock (_mu)
|
||||||
|
{
|
||||||
|
if (_disposed)
|
||||||
|
return;
|
||||||
|
|
||||||
|
_disposed = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
_loopCts.Cancel();
|
||||||
|
_loopCts.Dispose();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,107 @@
|
|||||||
|
// Copyright 2026 The NATS Authors
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
using System.Text;
|
||||||
|
using Shouldly;
|
||||||
|
using ZB.MOM.NatsNet.Server.Internal;
|
||||||
|
|
||||||
|
namespace ZB.MOM.NatsNet.Server.Tests.Internal;
|
||||||
|
|
||||||
|
public sealed class SendQueueTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public void NewSendQ_WhenCreated_StartsInternalLoopEntryPath()
|
||||||
|
{
|
||||||
|
var (server, account) = CreateServerAndAccount();
|
||||||
|
var started = false;
|
||||||
|
|
||||||
|
using var sendQueue = SendQueue.NewSendQ(
|
||||||
|
server,
|
||||||
|
account,
|
||||||
|
startLoop: _ => started = true);
|
||||||
|
|
||||||
|
started.ShouldBeTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Send_WhenQueueIsNullOrDisposed_NoOps()
|
||||||
|
{
|
||||||
|
SendQueue? nullQueue = null;
|
||||||
|
Should.NotThrow(() =>
|
||||||
|
SendQueue.Send(nullQueue, "subject", string.Empty, [], []));
|
||||||
|
|
||||||
|
var (server, account) = CreateServerAndAccount();
|
||||||
|
using var sendQueue = SendQueue.NewSendQ(
|
||||||
|
server,
|
||||||
|
account,
|
||||||
|
startLoop: _ => { });
|
||||||
|
|
||||||
|
sendQueue.Dispose();
|
||||||
|
|
||||||
|
Should.NotThrow(() =>
|
||||||
|
sendQueue.Send("subject", "reply", [1, 2], [3, 4]));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void InternalLoop_WhenMessageQueued_CopiesPayloadAndDispatchesToInternalClientPath()
|
||||||
|
{
|
||||||
|
var (server, account) = CreateServerAndAccount();
|
||||||
|
var internalClient = new ClientConnection(ClientKind.System);
|
||||||
|
var received = new List<byte[]>();
|
||||||
|
var flushCalls = 0;
|
||||||
|
var running = true;
|
||||||
|
|
||||||
|
using var sendQueue = SendQueue.NewSendQ(
|
||||||
|
server,
|
||||||
|
account,
|
||||||
|
startLoop: _ => { },
|
||||||
|
clientFactory: () => internalClient,
|
||||||
|
isRunning: () =>
|
||||||
|
{
|
||||||
|
var current = running;
|
||||||
|
running = false;
|
||||||
|
return current;
|
||||||
|
},
|
||||||
|
processInbound: (_, msg) => received.Add((byte[])msg.Clone()),
|
||||||
|
flush: _ => flushCalls++);
|
||||||
|
|
||||||
|
var hdr = Encoding.ASCII.GetBytes("NATS/1.0\r\nHeader: A\r\n\r\n");
|
||||||
|
var msg = Encoding.ASCII.GetBytes("hello");
|
||||||
|
sendQueue.Send("events.1", "reply.1", hdr, msg);
|
||||||
|
hdr[0] = (byte)'X';
|
||||||
|
msg[0] = (byte)'Y';
|
||||||
|
|
||||||
|
sendQueue.InternalLoop();
|
||||||
|
|
||||||
|
received.Count.ShouldBe(1);
|
||||||
|
flushCalls.ShouldBe(1);
|
||||||
|
|
||||||
|
var expectedHeader = Encoding.ASCII.GetBytes("NATS/1.0\r\nHeader: A\r\n\r\n");
|
||||||
|
var expectedPayload = Encoding.ASCII.GetBytes("hello\r\n");
|
||||||
|
var expected = new byte[expectedHeader.Length + expectedPayload.Length];
|
||||||
|
Buffer.BlockCopy(expectedHeader, 0, expected, 0, expectedHeader.Length);
|
||||||
|
Buffer.BlockCopy(expectedPayload, 0, expected, expectedHeader.Length, expectedPayload.Length);
|
||||||
|
|
||||||
|
received[0].ShouldBe(expected);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static (NatsServer server, Account account) CreateServerAndAccount()
|
||||||
|
{
|
||||||
|
var (server, err) = NatsServer.NewServer(new ServerOptions { NoSystemAccount = true });
|
||||||
|
err.ShouldBeNull();
|
||||||
|
server.ShouldNotBeNull();
|
||||||
|
|
||||||
|
var account = new Account { Name = "SENDQ" };
|
||||||
|
return (server!, account);
|
||||||
|
}
|
||||||
|
}
|
||||||
BIN
porting.db
BIN
porting.db
Binary file not shown.
@@ -1,6 +1,6 @@
|
|||||||
# NATS .NET Porting Status Report
|
# NATS .NET Porting Status Report
|
||||||
|
|
||||||
Generated: 2026-02-28 12:24:46 UTC
|
Generated: 2026-02-28 12:31:12 UTC
|
||||||
|
|
||||||
## Modules (12 total)
|
## Modules (12 total)
|
||||||
|
|
||||||
@@ -12,10 +12,10 @@ Generated: 2026-02-28 12:24:46 UTC
|
|||||||
|
|
||||||
| Status | Count |
|
| Status | Count |
|
||||||
|--------|-------|
|
|--------|-------|
|
||||||
| deferred | 2351 |
|
| deferred | 2348 |
|
||||||
| n_a | 24 |
|
| n_a | 24 |
|
||||||
| stub | 1 |
|
| stub | 1 |
|
||||||
| verified | 1297 |
|
| verified | 1300 |
|
||||||
|
|
||||||
## Unit Tests (3257 total)
|
## Unit Tests (3257 total)
|
||||||
|
|
||||||
@@ -34,4 +34,4 @@ Generated: 2026-02-28 12:24:46 UTC
|
|||||||
|
|
||||||
## Overall Progress
|
## Overall Progress
|
||||||
|
|
||||||
**2499/6942 items complete (36.0%)**
|
**2502/6942 items complete (36.0%)**
|
||||||
|
|||||||
Reference in New Issue
Block a user