using System.Net; using System.Net.Sockets; using System.Text; using Microsoft.Extensions.Logging.Abstractions; using NATS.Client.Core; using NATS.Server.Auth; using NATS.Server.Configuration; using NATS.Server.Routes; using NATS.Server.Subscriptions; namespace NATS.Server.Tests.Routes; /// /// Tests for route message forwarding (RMSG), reply propagation, payload delivery, /// and cross-cluster message routing. /// Ported from Go: server/routes_test.go. /// public class RouteForwardingTests { // -- Helpers -- private static async Task<(NatsServer Server, CancellationTokenSource Cts)> StartServerAsync( NatsOptions opts) { var server = new NatsServer(opts, NullLoggerFactory.Instance); var cts = new CancellationTokenSource(); _ = server.StartAsync(cts.Token); await server.WaitForReadyAsync(); return (server, cts); } private static NatsOptions MakeClusterOpts(string? clusterName = null, string? seed = null) { return new NatsOptions { Host = "127.0.0.1", Port = 0, Cluster = new ClusterOptions { Name = clusterName ?? Guid.NewGuid().ToString("N"), Host = "127.0.0.1", Port = 0, Routes = seed is null ? [] : [seed], }, }; } private static async Task WaitForRouteFormation(NatsServer a, NatsServer b, int timeoutMs = 5000) { using var timeout = new CancellationTokenSource(timeoutMs); while (!timeout.IsCancellationRequested && (Interlocked.Read(ref a.Stats.Routes) == 0 || Interlocked.Read(ref b.Stats.Routes) == 0)) { await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default); } } private static async Task WaitForCondition(Func predicate, int timeoutMs = 5000) { using var cts = new CancellationTokenSource(timeoutMs); while (!cts.IsCancellationRequested) { if (predicate()) return; await Task.Delay(20, cts.Token).ContinueWith(_ => { }, TaskScheduler.Default); } throw new TimeoutException("Condition not met."); } private static async Task DisposeServers(params (NatsServer Server, CancellationTokenSource Cts)[] servers) { foreach (var (server, cts) in servers) { await cts.CancelAsync(); server.Dispose(); cts.Dispose(); } } // -- Tests: RMSG forwarding -- // Go: TestSeedSolicitWorks server/routes_test.go:365 (message forwarding) [Fact] public async Task RMSG_forwards_published_message_to_remote_subscriber() { var cluster = Guid.NewGuid().ToString("N"); var a = await StartServerAsync(MakeClusterOpts(cluster)); var b = await StartServerAsync(MakeClusterOpts(cluster, a.Server.ClusterListen!)); try { await WaitForRouteFormation(a.Server, b.Server); await using var subscriber = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{a.Server.Port}", }); await subscriber.ConnectAsync(); await using var sub = await subscriber.SubscribeCoreAsync("rmsg.test"); await subscriber.PingAsync(); await WaitForCondition(() => b.Server.HasRemoteInterest("rmsg.test")); await using var publisher = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{b.Server.Port}", }); await publisher.ConnectAsync(); await publisher.PublishAsync("rmsg.test", "routed-payload"); using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); var msg = await sub.Msgs.ReadAsync(timeout.Token); msg.Data.ShouldBe("routed-payload"); } finally { await DisposeServers(a, b); } } // Go: Request-Reply across routes via raw socket with reply-to [Fact] public async Task Request_reply_works_across_routed_servers() { var cluster = Guid.NewGuid().ToString("N"); var optsA = MakeClusterOpts(cluster); optsA.Cluster!.PoolSize = 1; var a = await StartServerAsync(optsA); var optsB = MakeClusterOpts(cluster, a.Server.ClusterListen!); optsB.Cluster!.PoolSize = 1; var b = await StartServerAsync(optsB); try { await WaitForRouteFormation(a.Server, b.Server); // Responder on server A: subscribe via raw socket to get exact wire control using var responderSock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); await responderSock.ConnectAsync(IPAddress.Loopback, a.Server.Port); var buf = new byte[4096]; _ = await responderSock.ReceiveAsync(buf); // INFO await responderSock.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nSUB service.echo 1\r\nPING\r\n")); await ReadUntilAsync(responderSock, "PONG"); await WaitForCondition(() => b.Server.HasRemoteInterest("service.echo")); // Requester on server B: subscribe to reply inbox via raw socket using var requesterSock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); await requesterSock.ConnectAsync(IPAddress.Loopback, b.Server.Port); _ = await requesterSock.ReceiveAsync(buf); // INFO var replyInbox = $"_INBOX.{Guid.NewGuid():N}"; await requesterSock.SendAsync(Encoding.ASCII.GetBytes( $"CONNECT {{}}\r\nSUB {replyInbox} 2\r\nPING\r\n")); await ReadUntilAsync(requesterSock, "PONG"); await WaitForCondition(() => a.Server.HasRemoteInterest(replyInbox)); // Publish request with reply-to from B await requesterSock.SendAsync(Encoding.ASCII.GetBytes( $"PUB service.echo {replyInbox} 4\r\nping\r\nPING\r\n")); await ReadUntilAsync(requesterSock, "PONG"); // Read the request on A, verify reply-to var requestData = await ReadUntilAsync(responderSock, "ping"); requestData.ShouldContain($"MSG service.echo 1 {replyInbox} 4"); requestData.ShouldContain("ping"); // Publish reply from A to the reply-to subject await responderSock.SendAsync(Encoding.ASCII.GetBytes( $"PUB {replyInbox} 4\r\npong\r\nPING\r\n")); await ReadUntilAsync(responderSock, "PONG"); // Read the reply on B var replyData = await ReadUntilAsync(requesterSock, "pong"); replyData.ShouldContain($"MSG {replyInbox} 2 4"); replyData.ShouldContain("pong"); } finally { await DisposeServers(a, b); } } // Go: RMSG wire-level parsing [Fact] public async Task RMSG_wire_frame_delivers_payload_to_handler() { using var listener = new TcpListener(IPAddress.Loopback, 0); listener.Start(); var port = ((IPEndPoint)listener.LocalEndpoint).Port; using var remote = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); await remote.ConnectAsync(IPAddress.Loopback, port); using var routeSock = await listener.AcceptSocketAsync(); await using var route = new RouteConnection(routeSock); using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); var handshakeTask = route.PerformOutboundHandshakeAsync("LOCAL", timeout.Token); _ = await ReadLineAsync(remote, timeout.Token); await WriteLineAsync(remote, "ROUTE REMOTE", timeout.Token); await handshakeTask; RouteMessage? receivedMsg = null; route.RoutedMessageReceived = msg => { receivedMsg = msg; return Task.CompletedTask; }; route.StartFrameLoop(timeout.Token); var payload = "hello-world"; var frame = $"RMSG $G test.subject - {payload.Length}\r\n{payload}\r\n"; await remote.SendAsync(Encoding.ASCII.GetBytes(frame), SocketFlags.None, timeout.Token); await WaitForCondition(() => receivedMsg != null); receivedMsg.ShouldNotBeNull(); receivedMsg!.Subject.ShouldBe("test.subject"); receivedMsg.ReplyTo.ShouldBeNull(); Encoding.UTF8.GetString(receivedMsg.Payload.Span).ShouldBe("hello-world"); } // Go: RMSG with reply subject [Fact] public async Task RMSG_wire_frame_includes_reply_to() { using var listener = new TcpListener(IPAddress.Loopback, 0); listener.Start(); var port = ((IPEndPoint)listener.LocalEndpoint).Port; using var remote = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); await remote.ConnectAsync(IPAddress.Loopback, port); using var routeSock = await listener.AcceptSocketAsync(); await using var route = new RouteConnection(routeSock); using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); var handshakeTask = route.PerformOutboundHandshakeAsync("LOCAL", timeout.Token); _ = await ReadLineAsync(remote, timeout.Token); await WriteLineAsync(remote, "ROUTE REMOTE", timeout.Token); await handshakeTask; RouteMessage? receivedMsg = null; route.RoutedMessageReceived = msg => { receivedMsg = msg; return Task.CompletedTask; }; route.StartFrameLoop(timeout.Token); var payload = "data"; var frame = $"RMSG $G test.subject _INBOX.abc123 {payload.Length}\r\n{payload}\r\n"; await remote.SendAsync(Encoding.ASCII.GetBytes(frame), SocketFlags.None, timeout.Token); await WaitForCondition(() => receivedMsg != null); receivedMsg.ShouldNotBeNull(); receivedMsg!.Subject.ShouldBe("test.subject"); receivedMsg.ReplyTo.ShouldBe("_INBOX.abc123"); } // Go: RMSG with account [Fact] public async Task RMSG_wire_frame_with_account_scope() { using var listener = new TcpListener(IPAddress.Loopback, 0); listener.Start(); var port = ((IPEndPoint)listener.LocalEndpoint).Port; using var remote = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); await remote.ConnectAsync(IPAddress.Loopback, port); using var routeSock = await listener.AcceptSocketAsync(); await using var route = new RouteConnection(routeSock); using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); var handshakeTask = route.PerformOutboundHandshakeAsync("LOCAL", timeout.Token); _ = await ReadLineAsync(remote, timeout.Token); await WriteLineAsync(remote, "ROUTE REMOTE", timeout.Token); await handshakeTask; RouteMessage? receivedMsg = null; route.RoutedMessageReceived = msg => { receivedMsg = msg; return Task.CompletedTask; }; route.StartFrameLoop(timeout.Token); var payload = "acct-data"; var frame = $"RMSG MYACCOUNT test.sub - {payload.Length}\r\n{payload}\r\n"; await remote.SendAsync(Encoding.ASCII.GetBytes(frame), SocketFlags.None, timeout.Token); await WaitForCondition(() => receivedMsg != null); receivedMsg.ShouldNotBeNull(); receivedMsg!.Account.ShouldBe("MYACCOUNT"); receivedMsg.Subject.ShouldBe("test.sub"); } // Go: RMSG with zero-length payload [Fact] public async Task RMSG_wire_frame_with_empty_payload() { using var listener = new TcpListener(IPAddress.Loopback, 0); listener.Start(); var port = ((IPEndPoint)listener.LocalEndpoint).Port; using var remote = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); await remote.ConnectAsync(IPAddress.Loopback, port); using var routeSock = await listener.AcceptSocketAsync(); await using var route = new RouteConnection(routeSock); using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); var handshakeTask = route.PerformOutboundHandshakeAsync("LOCAL", timeout.Token); _ = await ReadLineAsync(remote, timeout.Token); await WriteLineAsync(remote, "ROUTE REMOTE", timeout.Token); await handshakeTask; RouteMessage? receivedMsg = null; route.RoutedMessageReceived = msg => { receivedMsg = msg; return Task.CompletedTask; }; route.StartFrameLoop(timeout.Token); var frame = "RMSG $G empty.test - 0\r\n\r\n"; await remote.SendAsync(Encoding.ASCII.GetBytes(frame), SocketFlags.None, timeout.Token); await WaitForCondition(() => receivedMsg != null); receivedMsg.ShouldNotBeNull(); receivedMsg!.Subject.ShouldBe("empty.test"); receivedMsg.Payload.Length.ShouldBe(0); } // Go: TestServerRoutesWithClients server/routes_test.go:216 (large payload) [Fact] public async Task Large_payload_forwarded_across_route() { var cluster = Guid.NewGuid().ToString("N"); var a = await StartServerAsync(MakeClusterOpts(cluster)); var b = await StartServerAsync(MakeClusterOpts(cluster, a.Server.ClusterListen!)); try { await WaitForRouteFormation(a.Server, b.Server); await using var subscriber = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{a.Server.Port}", }); await subscriber.ConnectAsync(); await using var sub = await subscriber.SubscribeCoreAsync("large.payload"); await subscriber.PingAsync(); await WaitForCondition(() => b.Server.HasRemoteInterest("large.payload")); await using var publisher = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{b.Server.Port}", }); await publisher.ConnectAsync(); var data = new byte[8192]; Random.Shared.NextBytes(data); await publisher.PublishAsync("large.payload", data); using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); var msg = await sub.Msgs.ReadAsync(timeout.Token); msg.Data.ShouldBe(data); } finally { await DisposeServers(a, b); } } // Go: TestRoutePool server/routes_test.go:1966 (message sent and received across pool) [Fact] public async Task Messages_flow_across_route_with_pool_size() { var cluster = Guid.NewGuid().ToString("N"); var optsA = new NatsOptions { Host = "127.0.0.1", Port = 0, Cluster = new ClusterOptions { Name = cluster, Host = "127.0.0.1", Port = 0, PoolSize = 2, }, }; var a = await StartServerAsync(optsA); var optsB = new NatsOptions { Host = "127.0.0.1", Port = 0, Cluster = new ClusterOptions { Name = cluster, Host = "127.0.0.1", Port = 0, PoolSize = 2, Routes = [a.Server.ClusterListen!], }, }; var b = await StartServerAsync(optsB); try { await WaitForRouteFormation(a.Server, b.Server); await using var subscriber = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{b.Server.Port}", }); await subscriber.ConnectAsync(); await using var sub = await subscriber.SubscribeCoreAsync("pool.forward"); await subscriber.PingAsync(); await WaitForCondition(() => a.Server.HasRemoteInterest("pool.forward")); await using var publisher = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{a.Server.Port}", }); await publisher.ConnectAsync(); const int messageCount = 10; for (var i = 0; i < messageCount; i++) await publisher.PublishAsync("pool.forward", $"msg-{i}"); // With PoolSize=2, each message may be forwarded on multiple route connections. // Collect all received messages and verify each expected one arrived at least once. var received = new HashSet(); using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(10)); while (received.Count < messageCount) { var msg = await sub.Msgs.ReadAsync(timeout.Token); msg.Data.ShouldNotBeNull(); received.Add(msg.Data!); } for (var i = 0; i < messageCount; i++) received.ShouldContain($"msg-{i}"); } finally { await DisposeServers(a, b); } } // Go: TestRoutePerAccount server/routes_test.go:2539 (account-scoped delivery) [Fact] public async Task Account_scoped_RMSG_delivers_to_correct_account() { var users = new User[] { new() { Username = "ua", Password = "p", Account = "A" }, new() { Username = "ub", Password = "p", Account = "B" }, }; var cluster = Guid.NewGuid().ToString("N"); var optsA = new NatsOptions { Host = "127.0.0.1", Port = 0, Users = users, Cluster = new ClusterOptions { Name = cluster, Host = "127.0.0.1", Port = 0, }, }; var a = await StartServerAsync(optsA); var optsB = new NatsOptions { Host = "127.0.0.1", Port = 0, Users = users, Cluster = new ClusterOptions { Name = cluster, Host = "127.0.0.1", Port = 0, Routes = [a.Server.ClusterListen!], }, }; var b = await StartServerAsync(optsB); try { await WaitForRouteFormation(a.Server, b.Server); // Account A subscriber on server B await using var subConn = new NatsConnection(new NatsOpts { Url = $"nats://ua:p@127.0.0.1:{b.Server.Port}", }); await subConn.ConnectAsync(); await using var sub = await subConn.SubscribeCoreAsync("acct.fwd"); await subConn.PingAsync(); await WaitForCondition(() => a.Server.HasRemoteInterest("A", "acct.fwd")); // Publish from account A on server A await using var pubConn = new NatsConnection(new NatsOpts { Url = $"nats://ua:p@127.0.0.1:{a.Server.Port}", }); await pubConn.ConnectAsync(); await pubConn.PublishAsync("acct.fwd", "from-a"); using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); var msg = await sub.Msgs.ReadAsync(timeout.Token); msg.Data.ShouldBe("from-a"); } finally { await DisposeServers(a, b); } } // Go: bidirectional forwarding [Fact] public async Task Bidirectional_message_forwarding_across_route() { var cluster = Guid.NewGuid().ToString("N"); var a = await StartServerAsync(MakeClusterOpts(cluster)); var b = await StartServerAsync(MakeClusterOpts(cluster, a.Server.ClusterListen!)); try { await WaitForRouteFormation(a.Server, b.Server); await using var ncA = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{a.Server.Port}", }); await ncA.ConnectAsync(); await using var ncB = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{b.Server.Port}", }); await ncB.ConnectAsync(); // Sub on A, pub from B await using var subOnA = await ncA.SubscribeCoreAsync("bidir.a"); // Sub on B, pub from A await using var subOnB = await ncB.SubscribeCoreAsync("bidir.b"); await ncA.PingAsync(); await ncB.PingAsync(); await WaitForCondition(() => b.Server.HasRemoteInterest("bidir.a") && a.Server.HasRemoteInterest("bidir.b")); await ncB.PublishAsync("bidir.a", "from-b"); await ncA.PublishAsync("bidir.b", "from-a"); using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); var msgA = await subOnA.Msgs.ReadAsync(timeout.Token); var msgB = await subOnB.Msgs.ReadAsync(timeout.Token); msgA.Data.ShouldBe("from-b"); msgB.Data.ShouldBe("from-a"); } finally { await DisposeServers(a, b); } } // Go: Route forwarding with reply (non-request-reply, just reply subject) [Fact] public async Task Message_with_reply_subject_forwarded_across_route() { var cluster = Guid.NewGuid().ToString("N"); var a = await StartServerAsync(MakeClusterOpts(cluster)); var b = await StartServerAsync(MakeClusterOpts(cluster, a.Server.ClusterListen!)); try { await WaitForRouteFormation(a.Server, b.Server); await using var subscriber = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{a.Server.Port}", }); await subscriber.ConnectAsync(); await using var sub = await subscriber.SubscribeCoreAsync("reply.subject.test"); await subscriber.PingAsync(); await WaitForCondition(() => b.Server.HasRemoteInterest("reply.subject.test")); // Use raw socket to publish with reply-to using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); await sock.ConnectAsync(IPAddress.Loopback, b.Server.Port); var buf = new byte[4096]; _ = await sock.ReceiveAsync(buf); // INFO await sock.SendAsync(Encoding.ASCII.GetBytes( "CONNECT {}\r\nPUB reply.subject.test _INBOX.reply123 5\r\nHello\r\nPING\r\n")); await ReadUntilAsync(sock, "PONG"); using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); var msg = await sub.Msgs.ReadAsync(timeout.Token); msg.Data.ShouldBe("Hello"); msg.ReplyTo.ShouldBe("_INBOX.reply123"); sock.Dispose(); } finally { await DisposeServers(a, b); } } // Go: Multiple messages with varying payloads [Fact] public async Task Multiple_different_subjects_forwarded_simultaneously() { var cluster = Guid.NewGuid().ToString("N"); var a = await StartServerAsync(MakeClusterOpts(cluster)); var b = await StartServerAsync(MakeClusterOpts(cluster, a.Server.ClusterListen!)); try { await WaitForRouteFormation(a.Server, b.Server); await using var ncA = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{a.Server.Port}", }); await ncA.ConnectAsync(); await using var sub1 = await ncA.SubscribeCoreAsync("multi.a"); await using var sub2 = await ncA.SubscribeCoreAsync("multi.b"); await using var sub3 = await ncA.SubscribeCoreAsync("multi.c"); await ncA.PingAsync(); await WaitForCondition(() => b.Server.HasRemoteInterest("multi.a") && b.Server.HasRemoteInterest("multi.b") && b.Server.HasRemoteInterest("multi.c")); await using var pub = new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{b.Server.Port}", }); await pub.ConnectAsync(); await pub.PublishAsync("multi.a", "alpha"); await pub.PublishAsync("multi.b", "beta"); await pub.PublishAsync("multi.c", "gamma"); using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); var msgA = await sub1.Msgs.ReadAsync(timeout.Token); var msgB = await sub2.Msgs.ReadAsync(timeout.Token); var msgC = await sub3.Msgs.ReadAsync(timeout.Token); msgA.Data.ShouldBe("alpha"); msgB.Data.ShouldBe("beta"); msgC.Data.ShouldBe("gamma"); } finally { await DisposeServers(a, b); } } // Go: SendRmsgAsync (send RMSG on RouteConnection) [Fact] public async Task RouteConnection_SendRmsgAsync_sends_valid_wire_frame() { using var listener = new TcpListener(IPAddress.Loopback, 0); listener.Start(); var port = ((IPEndPoint)listener.LocalEndpoint).Port; using var remote = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); await remote.ConnectAsync(IPAddress.Loopback, port); using var routeSock = await listener.AcceptSocketAsync(); await using var route = new RouteConnection(routeSock); using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); var handshakeTask = route.PerformOutboundHandshakeAsync("LOCAL", timeout.Token); _ = await ReadLineAsync(remote, timeout.Token); await WriteLineAsync(remote, "ROUTE REMOTE", timeout.Token); await handshakeTask; var payload = Encoding.UTF8.GetBytes("test-payload"); await route.SendRmsgAsync("$G", "subject.test", "_INBOX.reply", payload, timeout.Token); // Read the RMSG frame from the remote side, waiting until expected content arrives var data = await ReadUntilAsync(remote, "test-payload"); data.ShouldContain("RMSG $G subject.test _INBOX.reply 12"); data.ShouldContain("test-payload"); } // Go: SendRsPlusAsync (send RS+ on RouteConnection) [Fact] public async Task RouteConnection_SendRsPlusAsync_sends_valid_wire_frame() { using var listener = new TcpListener(IPAddress.Loopback, 0); listener.Start(); var port = ((IPEndPoint)listener.LocalEndpoint).Port; using var remote = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); await remote.ConnectAsync(IPAddress.Loopback, port); using var routeSock = await listener.AcceptSocketAsync(); await using var route = new RouteConnection(routeSock); using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); var handshakeTask = route.PerformOutboundHandshakeAsync("LOCAL", timeout.Token); _ = await ReadLineAsync(remote, timeout.Token); await WriteLineAsync(remote, "ROUTE REMOTE", timeout.Token); await handshakeTask; await route.SendRsPlusAsync("$G", "foo.bar", null, timeout.Token); var data = await ReadAllAvailableAsync(remote, timeout.Token); data.ShouldContain("RS+ $G foo.bar"); } // Go: SendRsMinusAsync (send RS- on RouteConnection) [Fact] public async Task RouteConnection_SendRsMinusAsync_sends_valid_wire_frame() { using var listener = new TcpListener(IPAddress.Loopback, 0); listener.Start(); var port = ((IPEndPoint)listener.LocalEndpoint).Port; using var remote = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); await remote.ConnectAsync(IPAddress.Loopback, port); using var routeSock = await listener.AcceptSocketAsync(); await using var route = new RouteConnection(routeSock); using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); var handshakeTask = route.PerformOutboundHandshakeAsync("LOCAL", timeout.Token); _ = await ReadLineAsync(remote, timeout.Token); await WriteLineAsync(remote, "ROUTE REMOTE", timeout.Token); await handshakeTask; await route.SendRsMinusAsync("$G", "foo.bar", null, timeout.Token); var data = await ReadAllAvailableAsync(remote, timeout.Token); data.ShouldContain("RS- $G foo.bar"); } // Go: SendRsPlusAsync with queue [Fact] public async Task RouteConnection_SendRsPlusAsync_with_queue_sends_valid_frame() { using var listener = new TcpListener(IPAddress.Loopback, 0); listener.Start(); var port = ((IPEndPoint)listener.LocalEndpoint).Port; using var remote = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); await remote.ConnectAsync(IPAddress.Loopback, port); using var routeSock = await listener.AcceptSocketAsync(); await using var route = new RouteConnection(routeSock); using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); var handshakeTask = route.PerformOutboundHandshakeAsync("LOCAL", timeout.Token); _ = await ReadLineAsync(remote, timeout.Token); await WriteLineAsync(remote, "ROUTE REMOTE", timeout.Token); await handshakeTask; await route.SendRsPlusAsync("ACCT_A", "foo.bar", "myqueue", timeout.Token); var data = await ReadAllAvailableAsync(remote, timeout.Token); data.ShouldContain("RS+ ACCT_A foo.bar myqueue"); } // -- Wire-level helpers -- private static async Task ReadLineAsync(Socket socket, CancellationToken ct) { var bytes = new List(64); var single = new byte[1]; while (true) { var read = await socket.ReceiveAsync(single, SocketFlags.None, ct); if (read == 0) break; if (single[0] == (byte)'\n') break; if (single[0] != (byte)'\r') bytes.Add(single[0]); } return Encoding.ASCII.GetString([.. bytes]); } private static Task WriteLineAsync(Socket socket, string line, CancellationToken ct) => socket.SendAsync(Encoding.ASCII.GetBytes($"{line}\r\n"), SocketFlags.None, ct).AsTask(); private static async Task ReadAllAvailableAsync(Socket socket, CancellationToken ct) { var sb = new StringBuilder(); var buf = new byte[4096]; // First read blocks until at least some data arrives var n = await socket.ReceiveAsync(buf, SocketFlags.None, ct); if (n > 0) sb.Append(Encoding.ASCII.GetString(buf, 0, n)); // Drain any additional data that's already buffered while (n == buf.Length && socket.Available > 0) { n = await socket.ReceiveAsync(buf, SocketFlags.None, ct); if (n == 0) break; sb.Append(Encoding.ASCII.GetString(buf, 0, n)); } return sb.ToString(); } private static async Task ReadUntilAsync(Socket sock, string expected) { var sb = new StringBuilder(); var buf = new byte[4096]; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); while (!sb.ToString().Contains(expected, StringComparison.Ordinal)) { var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token); if (n == 0) break; sb.Append(Encoding.ASCII.GetString(buf, 0, n)); } return sb.ToString(); } }