using System.Net; using System.Net.Sockets; using System.Text; using Microsoft.Extensions.Logging.Abstractions; using NATS.Client.Core; using NATS.Server.Auth; using NATS.Server.Configuration; using NATS.Server.LeafNodes; using NATS.Server.Subscriptions; namespace NATS.Server.Tests.LeafNodes; /// /// Tests for leaf node connection establishment, authentication, and lifecycle. /// Reference: golang/nats-server/server/leafnode_test.go /// public class LeafNodeConnectionTests { // Go: TestLeafNodeBasicAuthSingleton server/leafnode_test.go:602 [Fact] public async Task Leaf_node_connects_with_basic_hub_spoke_setup() { await using var fixture = await LeafFixture.StartAsync(); fixture.Hub.Stats.Leafs.ShouldBeGreaterThan(0); fixture.Spoke.Stats.Leafs.ShouldBeGreaterThan(0); } // Go: TestLeafNodesBasicTokenAuth server/leafnode_test.go:10862 [Fact] public async Task Leaf_node_connects_with_token_auth_on_hub() { var hubOptions = new NatsOptions { Host = "127.0.0.1", Port = 0, Authorization = "secret-token", LeafNode = new LeafNodeOptions { Host = "127.0.0.1", Port = 0 }, }; var hub = new NatsServer(hubOptions, NullLoggerFactory.Instance); var hubCts = new CancellationTokenSource(); _ = hub.StartAsync(hubCts.Token); await hub.WaitForReadyAsync(); var spokeOptions = new NatsOptions { Host = "127.0.0.1", Port = 0, LeafNode = new LeafNodeOptions { Host = "127.0.0.1", Port = 0, Remotes = [hub.LeafListen!], }, }; var spoke = new NatsServer(spokeOptions, NullLoggerFactory.Instance); var spokeCts = new CancellationTokenSource(); _ = spoke.StartAsync(spokeCts.Token); await spoke.WaitForReadyAsync(); using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); while (!timeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)) await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default); hub.Stats.Leafs.ShouldBeGreaterThan(0); spoke.Stats.Leafs.ShouldBeGreaterThan(0); await spokeCts.CancelAsync(); await hubCts.CancelAsync(); spoke.Dispose(); hub.Dispose(); spokeCts.Dispose(); hubCts.Dispose(); } // Go: TestLeafNodeBasicAuthSingleton server/leafnode_test.go:602 [Fact] public async Task Leaf_node_connects_with_user_password_auth() { var users = new User[] { new() { Username = "leafuser", Password = "leafpass" } }; var hubOptions = new NatsOptions { Host = "127.0.0.1", Port = 0, Users = users, LeafNode = new LeafNodeOptions { Host = "127.0.0.1", Port = 0 }, }; var hub = new NatsServer(hubOptions, NullLoggerFactory.Instance); var hubCts = new CancellationTokenSource(); _ = hub.StartAsync(hubCts.Token); await hub.WaitForReadyAsync(); var spokeOptions = new NatsOptions { Host = "127.0.0.1", Port = 0, LeafNode = new LeafNodeOptions { Host = "127.0.0.1", Port = 0, Remotes = [hub.LeafListen!] }, }; var spoke = new NatsServer(spokeOptions, NullLoggerFactory.Instance); var spokeCts = new CancellationTokenSource(); _ = spoke.StartAsync(spokeCts.Token); await spoke.WaitForReadyAsync(); using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); while (!timeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)) await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default); hub.Stats.Leafs.ShouldBeGreaterThan(0); await spokeCts.CancelAsync(); await hubCts.CancelAsync(); spoke.Dispose(); hub.Dispose(); spokeCts.Dispose(); hubCts.Dispose(); } // Go: TestLeafNodeRTT server/leafnode_test.go:488 [Fact] public async Task Hub_and_spoke_both_report_leaf_connection_count() { await using var fixture = await LeafFixture.StartAsync(); Interlocked.Read(ref fixture.Hub.Stats.Leafs).ShouldBe(1); Interlocked.Read(ref fixture.Spoke.Stats.Leafs).ShouldBe(1); } // Go: TestLeafNodeTwoRemotesToSameHubAccount server/leafnode_test.go:8758 [Fact] public async Task Two_spoke_servers_can_connect_to_same_hub() { await using var fixture = await TwoSpokeFixture.StartAsync(); Interlocked.Read(ref fixture.Hub.Stats.Leafs).ShouldBeGreaterThanOrEqualTo(2); Interlocked.Read(ref fixture.Spoke1.Stats.Leafs).ShouldBeGreaterThan(0); Interlocked.Read(ref fixture.Spoke2.Stats.Leafs).ShouldBeGreaterThan(0); } // Go: TestLeafNodeRemoteWrongPort server/leafnode_test.go:1095 [Fact] public async Task Outbound_handshake_completes_between_raw_sockets() { using var listener = new TcpListener(IPAddress.Loopback, 0); listener.Start(); var port = ((IPEndPoint)listener.LocalEndpoint).Port; using var clientSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); await clientSocket.ConnectAsync(IPAddress.Loopback, port); using var acceptedSocket = await listener.AcceptSocketAsync(); await using var leaf = new LeafConnection(acceptedSocket); using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); var handshakeTask = leaf.PerformOutboundHandshakeAsync("LOCAL", timeout.Token); (await ReadLineAsync(clientSocket, timeout.Token)).ShouldBe("LEAF LOCAL"); await WriteLineAsync(clientSocket, "LEAF REMOTE", timeout.Token); await handshakeTask; leaf.RemoteId.ShouldBe("REMOTE"); } // Go: TestLeafNodeCloseTLSConnection server/leafnode_test.go:968 [Fact] public async Task Inbound_handshake_completes_between_raw_sockets() { using var listener = new TcpListener(IPAddress.Loopback, 0); listener.Start(); var port = ((IPEndPoint)listener.LocalEndpoint).Port; using var clientSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); await clientSocket.ConnectAsync(IPAddress.Loopback, port); using var acceptedSocket = await listener.AcceptSocketAsync(); await using var leaf = new LeafConnection(acceptedSocket); using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); var handshakeTask = leaf.PerformInboundHandshakeAsync("SERVER", timeout.Token); await WriteLineAsync(clientSocket, "LEAF REMOTE_CLIENT", timeout.Token); (await ReadLineAsync(clientSocket, timeout.Token)).ShouldBe("LEAF SERVER"); await handshakeTask; leaf.RemoteId.ShouldBe("REMOTE_CLIENT"); } // Go: TestLeafNodeNoPingBeforeConnect server/leafnode_test.go:3713 [Fact] public async Task Leaf_connection_disposes_cleanly_without_starting_loop() { using var listener = new TcpListener(IPAddress.Loopback, 0); listener.Start(); var port = ((IPEndPoint)listener.LocalEndpoint).Port; using var clientSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); await clientSocket.ConnectAsync(IPAddress.Loopback, port); using var acceptedSocket = await listener.AcceptSocketAsync(); var leaf = new LeafConnection(acceptedSocket); await leaf.DisposeAsync(); var buffer = new byte[1]; var read = await clientSocket.ReceiveAsync(buffer, SocketFlags.None); read.ShouldBe(0); } // Go: TestLeafNodeBannerNoClusterNameIfNoCluster server/leafnode_test.go:9803 [Fact] public async Task Leaf_connection_sends_LS_plus_and_LS_minus() { using var listener = new TcpListener(IPAddress.Loopback, 0); listener.Start(); var port = ((IPEndPoint)listener.LocalEndpoint).Port; using var remoteSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); await remoteSocket.ConnectAsync(IPAddress.Loopback, port); using var leafSocket = await listener.AcceptSocketAsync(); await using var leaf = new LeafConnection(leafSocket); using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); var handshakeTask = leaf.PerformOutboundHandshakeAsync("LOCAL", timeout.Token); (await ReadLineAsync(remoteSocket, timeout.Token)).ShouldBe("LEAF LOCAL"); await WriteLineAsync(remoteSocket, "LEAF REMOTE", timeout.Token); await handshakeTask; await leaf.SendLsPlusAsync("$G", "foo.bar", null, timeout.Token); (await ReadLineAsync(remoteSocket, timeout.Token)).ShouldBe("LS+ $G foo.bar"); await leaf.SendLsPlusAsync("$G", "foo.baz", "queue1", timeout.Token); (await ReadLineAsync(remoteSocket, timeout.Token)).ShouldBe("LS+ $G foo.baz queue1"); await leaf.SendLsMinusAsync("$G", "foo.bar", null, timeout.Token); (await ReadLineAsync(remoteSocket, timeout.Token)).ShouldBe("LS- $G foo.bar"); } // Go: TestLeafNodeLMsgSplit server/leafnode_test.go:2387 [Fact] public async Task Leaf_connection_sends_LMSG() { using var listener = new TcpListener(IPAddress.Loopback, 0); listener.Start(); var port = ((IPEndPoint)listener.LocalEndpoint).Port; using var remoteSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); await remoteSocket.ConnectAsync(IPAddress.Loopback, port); using var leafSocket = await listener.AcceptSocketAsync(); await using var leaf = new LeafConnection(leafSocket); using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); var handshakeTask = leaf.PerformOutboundHandshakeAsync("LOCAL", timeout.Token); (await ReadLineAsync(remoteSocket, timeout.Token)).ShouldBe("LEAF LOCAL"); await WriteLineAsync(remoteSocket, "LEAF REMOTE", timeout.Token); await handshakeTask; var payload = "hello world"u8.ToArray(); await leaf.SendMessageAsync("$G", "test.subject", "reply-to", payload, timeout.Token); var controlLine = await ReadLineAsync(remoteSocket, timeout.Token); controlLine.ShouldBe($"LMSG $G test.subject reply-to {payload.Length}"); } // Go: TestLeafNodeLMsgSplit server/leafnode_test.go:2387 [Fact] public async Task Leaf_connection_sends_LMSG_with_no_reply() { using var listener = new TcpListener(IPAddress.Loopback, 0); listener.Start(); var port = ((IPEndPoint)listener.LocalEndpoint).Port; using var remoteSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); await remoteSocket.ConnectAsync(IPAddress.Loopback, port); using var leafSocket = await listener.AcceptSocketAsync(); await using var leaf = new LeafConnection(leafSocket); using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); var handshakeTask = leaf.PerformOutboundHandshakeAsync("LOCAL", timeout.Token); (await ReadLineAsync(remoteSocket, timeout.Token)).ShouldBe("LEAF LOCAL"); await WriteLineAsync(remoteSocket, "LEAF REMOTE", timeout.Token); await handshakeTask; var payload = "test"u8.ToArray(); await leaf.SendMessageAsync("ACCT", "subject", null, payload, timeout.Token); var controlLine = await ReadLineAsync(remoteSocket, timeout.Token); controlLine.ShouldBe($"LMSG ACCT subject - {payload.Length}"); } // Go: TestLeafNodeLMsgSplit server/leafnode_test.go:2387 [Fact] public async Task Leaf_connection_sends_LMSG_with_empty_payload() { using var listener = new TcpListener(IPAddress.Loopback, 0); listener.Start(); var port = ((IPEndPoint)listener.LocalEndpoint).Port; using var remoteSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); await remoteSocket.ConnectAsync(IPAddress.Loopback, port); using var leafSocket = await listener.AcceptSocketAsync(); await using var leaf = new LeafConnection(leafSocket); using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); var handshakeTask = leaf.PerformOutboundHandshakeAsync("LOCAL", timeout.Token); (await ReadLineAsync(remoteSocket, timeout.Token)).ShouldBe("LEAF LOCAL"); await WriteLineAsync(remoteSocket, "LEAF REMOTE", timeout.Token); await handshakeTask; await leaf.SendMessageAsync("$G", "empty.msg", null, ReadOnlyMemory.Empty, timeout.Token); var controlLine = await ReadLineAsync(remoteSocket, timeout.Token); controlLine.ShouldBe("LMSG $G empty.msg - 0"); } // Go: TestLeafNodeTmpClients server/leafnode_test.go:1663 [Fact] public async Task Leaf_connection_receives_LS_plus_and_triggers_callback() { using var listener = new TcpListener(IPAddress.Loopback, 0); listener.Start(); var port = ((IPEndPoint)listener.LocalEndpoint).Port; using var remoteSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); await remoteSocket.ConnectAsync(IPAddress.Loopback, port); using var leafSocket = await listener.AcceptSocketAsync(); await using var leaf = new LeafConnection(leafSocket); using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); var handshakeTask = leaf.PerformOutboundHandshakeAsync("LOCAL", timeout.Token); (await ReadLineAsync(remoteSocket, timeout.Token)).ShouldBe("LEAF LOCAL"); await WriteLineAsync(remoteSocket, "LEAF REMOTE", timeout.Token); await handshakeTask; var received = new List(); leaf.RemoteSubscriptionReceived = sub => { received.Add(sub); return Task.CompletedTask; }; leaf.StartLoop(timeout.Token); await WriteLineAsync(remoteSocket, "LS+ $G orders.>", timeout.Token); await WaitForAsync(() => received.Count >= 1, timeout.Token); received[0].Subject.ShouldBe("orders.>"); received[0].Account.ShouldBe("$G"); received[0].IsRemoval.ShouldBeFalse(); } // Go: TestLeafNodeRouteParseLSUnsub server/leafnode_test.go:2486 [Fact] public async Task Leaf_connection_receives_LS_minus_and_triggers_removal() { using var listener = new TcpListener(IPAddress.Loopback, 0); listener.Start(); var port = ((IPEndPoint)listener.LocalEndpoint).Port; using var remoteSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); await remoteSocket.ConnectAsync(IPAddress.Loopback, port); using var leafSocket = await listener.AcceptSocketAsync(); await using var leaf = new LeafConnection(leafSocket); using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); var handshakeTask = leaf.PerformOutboundHandshakeAsync("LOCAL", timeout.Token); (await ReadLineAsync(remoteSocket, timeout.Token)).ShouldBe("LEAF LOCAL"); await WriteLineAsync(remoteSocket, "LEAF REMOTE", timeout.Token); await handshakeTask; var received = new List(); leaf.RemoteSubscriptionReceived = sub => { received.Add(sub); return Task.CompletedTask; }; leaf.StartLoop(timeout.Token); await WriteLineAsync(remoteSocket, "LS+ $G foo.bar", timeout.Token); await WaitForAsync(() => received.Count >= 1, timeout.Token); await WriteLineAsync(remoteSocket, "LS- $G foo.bar", timeout.Token); await WaitForAsync(() => received.Count >= 2, timeout.Token); received[1].Subject.ShouldBe("foo.bar"); received[1].IsRemoval.ShouldBeTrue(); } // Go: TestLeafNodeLMsgSplit server/leafnode_test.go:2387 [Fact] public async Task Leaf_connection_receives_LMSG_and_triggers_message_callback() { using var listener = new TcpListener(IPAddress.Loopback, 0); listener.Start(); var port = ((IPEndPoint)listener.LocalEndpoint).Port; using var remoteSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); await remoteSocket.ConnectAsync(IPAddress.Loopback, port); using var leafSocket = await listener.AcceptSocketAsync(); await using var leaf = new LeafConnection(leafSocket); using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); var handshakeTask = leaf.PerformOutboundHandshakeAsync("LOCAL", timeout.Token); (await ReadLineAsync(remoteSocket, timeout.Token)).ShouldBe("LEAF LOCAL"); await WriteLineAsync(remoteSocket, "LEAF REMOTE", timeout.Token); await handshakeTask; var messages = new List(); leaf.MessageReceived = msg => { messages.Add(msg); return Task.CompletedTask; }; leaf.StartLoop(timeout.Token); var payload = "hello from remote"u8.ToArray(); await WriteLineAsync(remoteSocket, $"LMSG $G test.subject reply-to {payload.Length}", timeout.Token); await remoteSocket.SendAsync(payload, SocketFlags.None, timeout.Token); await remoteSocket.SendAsync("\r\n"u8.ToArray(), SocketFlags.None, timeout.Token); await WaitForAsync(() => messages.Count >= 1, timeout.Token); messages[0].Subject.ShouldBe("test.subject"); messages[0].ReplyTo.ShouldBe("reply-to"); messages[0].Account.ShouldBe("$G"); Encoding.ASCII.GetString(messages[0].Payload.Span).ShouldBe("hello from remote"); } // Go: TestLeafNodeLMsgSplit server/leafnode_test.go:2387 [Fact] public async Task Leaf_connection_receives_LMSG_with_account_scoped_format() { using var listener = new TcpListener(IPAddress.Loopback, 0); listener.Start(); var port = ((IPEndPoint)listener.LocalEndpoint).Port; using var remoteSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); await remoteSocket.ConnectAsync(IPAddress.Loopback, port); using var leafSocket = await listener.AcceptSocketAsync(); await using var leaf = new LeafConnection(leafSocket); using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); var handshakeTask = leaf.PerformOutboundHandshakeAsync("LOCAL", timeout.Token); (await ReadLineAsync(remoteSocket, timeout.Token)).ShouldBe("LEAF LOCAL"); await WriteLineAsync(remoteSocket, "LEAF REMOTE", timeout.Token); await handshakeTask; var messages = new List(); leaf.MessageReceived = msg => { messages.Add(msg); return Task.CompletedTask; }; leaf.StartLoop(timeout.Token); var payload = "acct"u8.ToArray(); await WriteLineAsync(remoteSocket, $"LMSG MYACCT test.subject - {payload.Length}", timeout.Token); await remoteSocket.SendAsync(payload, SocketFlags.None, timeout.Token); await remoteSocket.SendAsync("\r\n"u8.ToArray(), SocketFlags.None, timeout.Token); await WaitForAsync(() => messages.Count >= 1, timeout.Token); messages[0].Account.ShouldBe("MYACCT"); messages[0].Subject.ShouldBe("test.subject"); messages[0].ReplyTo.ShouldBeNull(); } // Go: TestLeafNodeTwoRemotesToSameHubAccount server/leafnode_test.go:2210 [Fact] public async Task Leaf_connection_receives_LS_plus_with_queue() { using var listener = new TcpListener(IPAddress.Loopback, 0); listener.Start(); var port = ((IPEndPoint)listener.LocalEndpoint).Port; using var remoteSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); await remoteSocket.ConnectAsync(IPAddress.Loopback, port); using var leafSocket = await listener.AcceptSocketAsync(); await using var leaf = new LeafConnection(leafSocket); using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); var handshakeTask = leaf.PerformOutboundHandshakeAsync("LOCAL", timeout.Token); (await ReadLineAsync(remoteSocket, timeout.Token)).ShouldBe("LEAF LOCAL"); await WriteLineAsync(remoteSocket, "LEAF REMOTE", timeout.Token); await handshakeTask; var received = new List(); leaf.RemoteSubscriptionReceived = sub => { received.Add(sub); return Task.CompletedTask; }; leaf.StartLoop(timeout.Token); await WriteLineAsync(remoteSocket, "LS+ $G work.> workers", timeout.Token); await WaitForAsync(() => received.Count >= 1, timeout.Token); received[0].Subject.ShouldBe("work.>"); received[0].Queue.ShouldBe("workers"); received[0].Account.ShouldBe("$G"); } // Go: TestLeafNodeSlowConsumer server/leafnode_test.go:9103 [Fact] public async Task Leaf_connection_handles_multiple_rapid_LMSG_messages() { using var listener = new TcpListener(IPAddress.Loopback, 0); listener.Start(); var port = ((IPEndPoint)listener.LocalEndpoint).Port; using var remoteSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); await remoteSocket.ConnectAsync(IPAddress.Loopback, port); using var leafSocket = await listener.AcceptSocketAsync(); await using var leaf = new LeafConnection(leafSocket); using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); var handshakeTask = leaf.PerformOutboundHandshakeAsync("LOCAL", timeout.Token); (await ReadLineAsync(remoteSocket, timeout.Token)).ShouldBe("LEAF LOCAL"); await WriteLineAsync(remoteSocket, "LEAF REMOTE", timeout.Token); await handshakeTask; var messageCount = 0; leaf.MessageReceived = _ => { Interlocked.Increment(ref messageCount); return Task.CompletedTask; }; leaf.StartLoop(timeout.Token); const int numMessages = 20; for (var i = 0; i < numMessages; i++) { var payload = Encoding.ASCII.GetBytes($"msg-{i}"); var line = $"LMSG $G test.multi - {payload.Length}\r\n"; await remoteSocket.SendAsync(Encoding.ASCII.GetBytes(line), SocketFlags.None, timeout.Token); await remoteSocket.SendAsync(payload, SocketFlags.None, timeout.Token); await remoteSocket.SendAsync("\r\n"u8.ToArray(), SocketFlags.None, timeout.Token); } await WaitForAsync(() => Volatile.Read(ref messageCount) >= numMessages, timeout.Token); Volatile.Read(ref messageCount).ShouldBe(numMessages); } private static async Task ReadLineAsync(Socket socket, CancellationToken ct) { var bytes = new List(64); var single = new byte[1]; while (true) { var read = await socket.ReceiveAsync(single, SocketFlags.None, ct); if (read == 0) break; if (single[0] == (byte)'\n') break; if (single[0] != (byte)'\r') bytes.Add(single[0]); } return Encoding.ASCII.GetString([.. bytes]); } private static Task WriteLineAsync(Socket socket, string line, CancellationToken ct) => socket.SendAsync(Encoding.ASCII.GetBytes($"{line}\r\n"), SocketFlags.None, ct).AsTask(); private static async Task WaitForAsync(Func predicate, CancellationToken ct) { while (!ct.IsCancellationRequested) { if (predicate()) return; await Task.Delay(20, ct); } throw new TimeoutException("Timed out waiting for condition."); } }