// Copyright 2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Text;
using Shouldly;
using ZB.MOM.NatsNet.Server.Internal;

namespace ZB.MOM.NatsNet.Server.Tests.Internal;

/// <summary>
/// Unit tests for <see cref="SendQueue"/>: construction, sending on a null or
/// disposed queue, and dispatch of queued messages by the internal loop.
/// </summary>
public sealed class SendQueueTests
{
    /// <summary>
    /// Checks that <c>SendQueue.NewSendQ</c> has invoked the supplied
    /// <c>startLoop</c> callback by the time it returns.
    /// </summary>
    [Fact]
    public void NewSendQ_WhenCreated_StartsInternalLoopEntryPath()
    {
        var (server, account) = CreateServerAndAccount();
        var started = false;

        using var sendQueue = SendQueue.NewSendQ(
            server,
            account,
            startLoop: _ => started = true);

        started.ShouldBeTrue();
    }

    /// <summary>
    /// Checks that sending does not throw when the queue reference is null
    /// (static overload) or when the queue has already been disposed.
    /// </summary>
    [Fact]
    public void Send_WhenQueueIsNullOrDisposed_NoOps()
    {
        SendQueue? nullQueue = null;

        Should.NotThrow(() => SendQueue.Send(nullQueue, "subject", string.Empty, [], []));

        var (server, account) = CreateServerAndAccount();
        using var sendQueue = SendQueue.NewSendQ(
            server,
            account,
            startLoop: _ => { });

        // Dispose up front so the Send below targets an already-disposed queue.
        // The using declaration disposes the queue again when the method exits.
        sendQueue.Dispose();

        Should.NotThrow(() => sendQueue.Send("subject", "reply", [1, 2], [3, 4]));
    }

    /// <summary>
    /// Queues one message, mutates the caller's buffers, runs the internal loop
    /// once, and checks that the inbound callback received the original header
    /// followed by the original payload and a trailing CRLF, with one flush.
    /// </summary>
    [Fact]
    public void InternalLoop_WhenMessageQueued_CopiesPayloadAndDispatchesToInternalClientPath()
    {
        const string headerText = "NATS/1.0\r\nHeader: A\r\n\r\n";
        const string payloadText = "hello";

        var (server, account) = CreateServerAndAccount();
        var internalClient = new ClientConnection(ClientKind.System);
        var received = new List<byte[]>();
        var flushCalls = 0;
        var running = true;

        using var sendQueue = SendQueue.NewSendQ(
            server,
            account,
            startLoop: _ => { },
            clientFactory: () => internalClient,
            // Report "running" exactly once: true on the first call, false afterwards.
            isRunning: () =>
            {
                var current = running;
                running = false;
                return current;
            },
            // Snapshot the bytes handed to the callback so later changes to that
            // buffer cannot affect the assertion.
            processInbound: (_, inbound) => received.Add((byte[])inbound.Clone()),
            flush: _ => flushCalls++);

        var hdr = Encoding.ASCII.GetBytes(headerText);
        var msg = Encoding.ASCII.GetBytes(payloadText);

        sendQueue.Send("events.1", "reply.1", hdr, msg);

        // Corrupt the caller-owned buffers after Send; the dispatched bytes must
        // still match the original content, so the queue cannot be holding
        // references to these arrays.
        hdr[0] = (byte)'X';
        msg[0] = (byte)'Y';

        sendQueue.InternalLoop();

        received.Count.ShouldBe(1);
        flushCalls.ShouldBe(1);

        var expectedHeader = Encoding.ASCII.GetBytes(headerText);
        var expectedPayload = Encoding.ASCII.GetBytes(payloadText + "\r\n");
        byte[] expected = [.. expectedHeader, .. expectedPayload];

        received[0].ShouldBe(expected);
    }

    /// <summary>
    /// Creates a server with <c>NoSystemAccount</c> set and an account named
    /// "SENDQ", asserting that server creation returned no error.
    /// </summary>
    /// <returns>The created server and the new account.</returns>
    private static (NatsServer server, Account account) CreateServerAndAccount()
    {
        var (server, err) = NatsServer.NewServer(new ServerOptions { NoSystemAccount = true });
        err.ShouldBeNull();
        server.ShouldNotBeNull();

        var account = new Account { Name = "SENDQ" };
        return (server!, account);
    }
}