using System.Net;
using System.Net.Sockets;
using System.Text;
using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Configuration;

namespace NATS.Server.Tests;

/// <summary>
/// Integration test: a subscription created by a client on server B should show up
/// as remote interest on server A once the two servers are routed together.
/// </summary>
public class RouteSubscriptionPropagationTests
{
    [Fact]
    public async Task Subscriptions_propagate_between_routed_servers()
    {
        await using var fixture = await RouteFixture.StartTwoNodeClusterAsync();

        await fixture.SubscribeOnServerBAsync("foo.*");

        var hasInterest = await fixture.ServerAHasRemoteInterestAsync("foo.bar");
        hasInterest.ShouldBeTrue();
    }
}

/// <summary>
/// Owns two in-process <see cref="NatsServer"/> instances (A and B, where B is configured
/// with a route to A's cluster listener) plus an optional raw TCP client connected to B.
/// Disposing the fixture cancels and disposes both servers and closes the client socket.
/// </summary>
internal sealed class RouteFixture : IAsyncDisposable
{
    /// <summary>Upper bound for every wait performed by the fixture (route setup, interest polling, socket reads).</summary>
    private static readonly TimeSpan OperationTimeout = TimeSpan.FromSeconds(5);

    /// <summary>Delay between two evaluations of a polled condition.</summary>
    private static readonly TimeSpan PollInterval = TimeSpan.FromMilliseconds(50);

    private readonly NatsServer _serverA;
    private readonly NatsServer _serverB;
    private readonly CancellationTokenSource _ctsA;
    private readonly CancellationTokenSource _ctsB;
    private Socket? _subscriberOnB;

    private RouteFixture(
        NatsServer serverA,
        NatsServer serverB,
        CancellationTokenSource ctsA,
        CancellationTokenSource ctsB)
    {
        _serverA = serverA;
        _serverB = serverB;
        _ctsA = ctsA;
        _ctsB = ctsB;
    }

    /// <summary>
    /// Starts server A, then server B with A's <c>ClusterListen</c> address as its only route,
    /// and waits (up to <see cref="OperationTimeout"/>) until both servers report at least one route.
    /// </summary>
    /// <returns>
    /// A fixture wrapping both servers. The fixture is returned even when the route wait
    /// times out; callers observe that through their own assertions.
    /// </returns>
    public static async Task<RouteFixture> StartTwoNodeClusterAsync()
    {
        var optsA = new NatsOptions
        {
            Host = "127.0.0.1",
            Port = 0,
            Cluster = new ClusterOptions
            {
                Name = Guid.NewGuid().ToString("N"),
                Host = "127.0.0.1",
                Port = 0,
            },
        };

        var serverA = new NatsServer(optsA, NullLoggerFactory.Instance);
        var ctsA = new CancellationTokenSource();
        // The run loop is intentionally not awaited; it ends when ctsA is cancelled in DisposeAsync.
        _ = serverA.StartAsync(ctsA.Token);
        await serverA.WaitForReadyAsync();

        var optsB = new NatsOptions
        {
            Host = "127.0.0.1",
            Port = 0,
            Cluster = new ClusterOptions
            {
                Name = Guid.NewGuid().ToString("N"),
                Host = "127.0.0.1",
                Port = 0,
                // Only available after server A is ready, hence the sequential start-up.
                Routes = [serverA.ClusterListen!],
            },
        };

        var serverB = new NatsServer(optsB, NullLoggerFactory.Instance);
        var ctsB = new CancellationTokenSource();
        _ = serverB.StartAsync(ctsB.Token);
        await serverB.WaitForReadyAsync();

        // Best effort: the result is deliberately ignored so a slow route does not throw here.
        await PollUntilAsync(() => serverA.Stats.Routes != 0 && serverB.Stats.Routes != 0);

        return new RouteFixture(serverA, serverB, ctsA, ctsB);
    }

    /// <summary>
    /// Opens a raw TCP connection to server B's client port, sends CONNECT, a SUB for
    /// <paramref name="subject"/> (sid 1) and a PING, then waits until "PONG" is received.
    /// </summary>
    /// <param name="subject">Subject (wildcards allowed by the protocol) to subscribe to.</param>
    public async Task SubscribeOnServerBAsync(string subject)
    {
        var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        // Track the socket before connecting so DisposeAsync releases it even if a later step throws.
        _subscriberOnB = sock;

        await sock.ConnectAsync(IPAddress.Loopback, _serverB.Port);

        await ReadLineAsync(sock); // INFO
        await sock.SendAsync(Encoding.ASCII.GetBytes($"CONNECT {{}}\r\nSUB {subject} 1\r\nPING\r\n"));
        await ReadUntilAsync(sock, "PONG");
    }

    /// <summary>
    /// Polls <c>_serverA.HasRemoteInterest(subject)</c> every <see cref="PollInterval"/> for up to
    /// <see cref="OperationTimeout"/>.
    /// </summary>
    /// <param name="subject">Concrete subject to test for remote interest.</param>
    /// <returns><c>true</c> as soon as server A reports interest; <c>false</c> if the timeout elapses first.</returns>
    public Task<bool> ServerAHasRemoteInterestAsync(string subject)
        => PollUntilAsync(() => _serverA.HasRemoteInterest(subject));

    /// <summary>
    /// Closes the client socket (if any), cancels both server tokens, then disposes the servers
    /// and their cancellation sources.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        _subscriberOnB?.Dispose();

        await _ctsA.CancelAsync();
        await _ctsB.CancelAsync();

        _serverA.Dispose();
        _serverB.Dispose();

        _ctsA.Dispose();
        _ctsB.Dispose();
    }

    /// <summary>
    /// Re-evaluates <paramref name="condition"/> every <see cref="PollInterval"/> until it returns
    /// <c>true</c> or <see cref="OperationTimeout"/> elapses.
    /// </summary>
    /// <returns><c>true</c> if the condition was met, <c>false</c> on timeout.</returns>
    private static async Task<bool> PollUntilAsync(Func<bool> condition)
    {
        using var timeout = new CancellationTokenSource(OperationTimeout);

        while (!timeout.IsCancellationRequested)
        {
            if (condition())
            {
                return true;
            }

            try
            {
                await Task.Delay(PollInterval, timeout.Token);
            }
            catch (OperationCanceledException)
            {
                // Timeout hit mid-delay; the loop condition ends the polling.
            }
        }

        return false;
    }

    /// <summary>
    /// Performs a single receive on <paramref name="sock"/> and returns the bytes decoded as ASCII.
    /// Despite the name, no line splitting is done: the result is whatever one receive call yields
    /// (up to 4096 bytes). The read is cancelled after <see cref="OperationTimeout"/>.
    /// </summary>
    private static async Task<string> ReadLineAsync(Socket sock)
    {
        var buf = new byte[4096];
        // Bounded like ReadUntilAsync so a silent server cannot hang the test run.
        using var cts = new CancellationTokenSource(OperationTimeout);
        var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token);
        return Encoding.ASCII.GetString(buf, 0, n);
    }

    /// <summary>
    /// Keeps receiving from <paramref name="sock"/> until the accumulated ASCII text contains
    /// <paramref name="expected"/> (ordinal match), the peer closes the connection, or
    /// <see cref="OperationTimeout"/> cancels the pending receive.
    /// </summary>
    /// <returns>Everything received so far, which may lack <paramref name="expected"/> if the peer closed early.</returns>
    private static async Task<string> ReadUntilAsync(Socket sock, string expected)
    {
        var received = new StringBuilder();
        var buf = new byte[4096];
        using var cts = new CancellationTokenSource(OperationTimeout);

        while (!received.ToString().Contains(expected, StringComparison.Ordinal))
        {
            var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token);
            if (n == 0)
            {
                // Zero bytes means the remote side closed the connection.
                break;
            }

            received.Append(Encoding.ASCII.GetString(buf, 0, n));
        }

        return received.ToString();
    }
}