using System.Net; using System.Net.Sockets; using System.Text; using Microsoft.Extensions.Logging.Abstractions; using NATS.Server.Configuration; namespace NATS.Server.Tests; public class RouteSubscriptionPropagationTests { [Fact] public async Task Subscriptions_propagate_between_routed_servers() { await using var fixture = await RouteFixture.StartTwoNodeClusterAsync(); await fixture.SubscribeOnServerBAsync("foo.*"); var hasInterest = await fixture.ServerAHasRemoteInterestAsync("foo.bar"); hasInterest.ShouldBeTrue(); } } internal sealed class RouteFixture : IAsyncDisposable { private readonly NatsServer _serverA; private readonly NatsServer _serverB; private readonly CancellationTokenSource _ctsA; private readonly CancellationTokenSource _ctsB; private Socket? _subscriberOnB; private Socket? _publisherOnA; private Socket? _manualRouteToA; private RouteFixture(NatsServer serverA, NatsServer serverB, CancellationTokenSource ctsA, CancellationTokenSource ctsB) { _serverA = serverA; _serverB = serverB; _ctsA = ctsA; _ctsB = ctsB; } public static async Task StartTwoNodeClusterAsync() { var optsA = new NatsOptions { Host = "127.0.0.1", Port = 0, Cluster = new ClusterOptions { Name = Guid.NewGuid().ToString("N"), Host = "127.0.0.1", Port = 0, }, }; var serverA = new NatsServer(optsA, NullLoggerFactory.Instance); var ctsA = new CancellationTokenSource(); _ = serverA.StartAsync(ctsA.Token); await serverA.WaitForReadyAsync(); var optsB = new NatsOptions { Host = "127.0.0.1", Port = 0, Cluster = new ClusterOptions { Name = Guid.NewGuid().ToString("N"), Host = "127.0.0.1", Port = 0, Routes = [serverA.ClusterListen!], }, }; var serverB = new NatsServer(optsB, NullLoggerFactory.Instance); var ctsB = new CancellationTokenSource(); _ = serverB.StartAsync(ctsB.Token); await serverB.WaitForReadyAsync(); using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); while (!timeout.IsCancellationRequested && (serverA.Stats.Routes == 0 || serverB.Stats.Routes == 0)) await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default); return new RouteFixture(serverA, serverB, ctsA, ctsB); } public async Task SubscribeOnServerBAsync(string subject) { var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); await sock.ConnectAsync(IPAddress.Loopback, _serverB.Port); _subscriberOnB = sock; await ReadLineAsync(sock); // INFO await sock.SendAsync(Encoding.ASCII.GetBytes($"CONNECT {{}}\r\nSUB {subject} 1\r\nPING\r\n")); await ReadUntilAsync(sock, "PONG"); } public async Task SendRouteSubFrameAsync(string subject) { var (host, port) = ParseHostPort(_serverA.ClusterListen!); var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); await sock.ConnectAsync(IPAddress.Parse(host), port); _manualRouteToA = sock; await sock.SendAsync(Encoding.ASCII.GetBytes("ROUTE test-remote\r\n")); _ = await ReadLineAsync(sock); // ROUTE await sock.SendAsync(Encoding.ASCII.GetBytes($"RS+ {subject}\r\n")); } public async Task SendRouteUnsubFrameAsync(string subject) { if (_manualRouteToA == null) throw new InvalidOperationException("Route frame socket not established."); await _manualRouteToA.SendAsync(Encoding.ASCII.GetBytes($"RS- {subject}\r\n")); } public async Task PublishFromServerAAsync(string subject, string payload) { var sock = _publisherOnA; if (sock == null) { sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); await sock.ConnectAsync(IPAddress.Loopback, _serverA.Port); _publisherOnA = sock; _ = await ReadLineAsync(sock); // INFO await sock.SendAsync(Encoding.ASCII.GetBytes("CONNECT {}\r\nPING\r\n")); await ReadUntilAsync(sock, "PONG"); } await sock.SendAsync(Encoding.ASCII.GetBytes($"PUB {subject} {payload.Length}\r\n{payload}\r\nPING\r\n")); await ReadUntilAsync(sock, "PONG"); } public async Task ReadServerBMessageAsync() { if (_subscriberOnB == null) throw new InvalidOperationException("No subscriber socket on server B."); return await ReadUntilAsync(_subscriberOnB, "MSG "); } public async Task ServerAHasRemoteInterestAsync(string subject, bool expected = true) { using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); while (!timeout.IsCancellationRequested) { if (_serverA.HasRemoteInterest(subject) == expected) return expected; await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default); } return !expected; } public async Task ServerARouteLinkCountToServerBAsync() { using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); while (!timeout.IsCancellationRequested) { if (_serverA.Stats.Routes >= 3) return (int)_serverA.Stats.Routes; await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default); } return (int)_serverA.Stats.Routes; } public async ValueTask DisposeAsync() { _subscriberOnB?.Dispose(); _publisherOnA?.Dispose(); _manualRouteToA?.Dispose(); await _ctsA.CancelAsync(); await _ctsB.CancelAsync(); _serverA.Dispose(); _serverB.Dispose(); _ctsA.Dispose(); _ctsB.Dispose(); } private static async Task ReadLineAsync(Socket sock) { var buf = new byte[4096]; var n = await sock.ReceiveAsync(buf, SocketFlags.None); return Encoding.ASCII.GetString(buf, 0, n); } private static async Task ReadUntilAsync(Socket sock, string expected) { var sb = new StringBuilder(); var buf = new byte[4096]; using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); while (!sb.ToString().Contains(expected, StringComparison.Ordinal)) { var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token); if (n == 0) break; sb.Append(Encoding.ASCII.GetString(buf, 0, n)); } return sb.ToString(); } private static (string Host, int Port) ParseHostPort(string endpoint) { var parts = endpoint.Split(':', 2, StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries); return (parts[0], int.Parse(parts[1])); } }