using System.Net; using System.Net.Sockets; using System.Text; using Microsoft.Extensions.Logging.Abstractions; using NATS.Server.Configuration; using NATS.Server.Subscriptions; namespace NATS.Server.Clustering.Tests.Routes; public class RouteRemoteSubCleanupParityBatch2Tests { [Fact] public void Routed_sub_key_helpers_parse_account_and_queue_fields() { var key = SubList.BuildRoutedSubKey("R1", "A", "orders.*", "q1"); SubList.GetAccNameFromRoutedSubKey(key).ShouldBe("A"); var info = SubList.GetRoutedSubKeyInfo(key); info.ShouldNotBeNull(); info.Value.RouteId.ShouldBe("R1"); info.Value.Account.ShouldBe("A"); info.Value.Subject.ShouldBe("orders.*"); info.Value.Queue.ShouldBe("q1"); SubList.GetRoutedSubKeyInfo("invalid").ShouldBeNull(); SubList.GetAccNameFromRoutedSubKey("invalid").ShouldBeNull(); } [Fact] public void Remove_remote_subs_methods_only_remove_matching_route_or_account() { using var sl = new SubList(); sl.ApplyRemoteSub(new RemoteSubscription("orders.*", null, "r1", "A")); sl.ApplyRemoteSub(new RemoteSubscription("orders.*", null, "r1", "B")); sl.ApplyRemoteSub(new RemoteSubscription("orders.*", null, "r2", "A")); sl.HasRemoteInterest("A", "orders.created").ShouldBeTrue(); sl.HasRemoteInterest("B", "orders.created").ShouldBeTrue(); sl.RemoveRemoteSubsForAccount("r1", "A").ShouldBe(1); sl.HasRemoteInterest("A", "orders.created").ShouldBeTrue(); // r2 still present sl.HasRemoteInterest("B", "orders.created").ShouldBeTrue(); sl.RemoveRemoteSubs("r2").ShouldBe(1); sl.HasRemoteInterest("A", "orders.created").ShouldBeFalse(); sl.HasRemoteInterest("B", "orders.created").ShouldBeTrue(); } [Fact] public async Task Route_disconnect_cleans_remote_interest_without_explicit_rs_minus() { var opts = new NatsOptions { Host = "127.0.0.1", Port = 0, Cluster = new ClusterOptions { Name = Guid.NewGuid().ToString("N"), Host = "127.0.0.1", Port = 0, PoolSize = 1, }, }; var server = new NatsServer(opts, NullLoggerFactory.Instance); using var serverCts = new CancellationTokenSource(); _ = server.StartAsync(serverCts.Token); await server.WaitForReadyAsync(); try { var cluster = server.ClusterListen!; var sep = cluster.LastIndexOf(':'); var host = cluster[..sep]; var port = int.Parse(cluster[(sep + 1)..]); using var remote = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(8)); await remote.ConnectAsync(IPAddress.Parse(host), port, timeout.Token); await WriteLineAsync(remote, "ROUTE REMOTE1", timeout.Token); var response = await ReadLineAsync(remote, timeout.Token); response.ShouldStartWith("ROUTE "); await WriteLineAsync(remote, "RS+ $G route.cleanup.test", timeout.Token); await WaitForCondition(() => server.HasRemoteInterest("route.cleanup.test"), 5000); remote.Dispose(); await WaitForCondition(() => !server.HasRemoteInterest("route.cleanup.test"), 10000); server.HasRemoteInterest("route.cleanup.test").ShouldBeFalse(); } finally { await serverCts.CancelAsync(); server.Dispose(); } } private static async Task WaitForCondition(Func predicate, int timeoutMs) { using var timeout = new CancellationTokenSource(timeoutMs); while (!timeout.IsCancellationRequested) { if (predicate()) return; await Task.Yield(); } throw new TimeoutException("Condition not met."); } private static async Task WriteLineAsync(Socket socket, string line, CancellationToken ct) { var data = Encoding.ASCII.GetBytes($"{line}\r\n"); await socket.SendAsync(data, ct); } private static async Task ReadLineAsync(Socket socket, CancellationToken ct) { var bytes = new List(64); var one = new byte[1]; while (true) { var read = await socket.ReceiveAsync(one, SocketFlags.None, ct); if (read == 0) throw new IOException("Socket closed while reading line"); if (one[0] == (byte)'\n') break; if (one[0] != (byte)'\r') bytes.Add(one[0]); } return Encoding.ASCII.GetString([.. bytes]); } }