Files
natsdotnet/tests/NATS.Server.Clustering.Tests/Routes/RouteRemoteSubCleanupParityBatch2Tests.cs
2026-03-13 09:49:54 -04:00

157 lines
5.4 KiB
C#

using System.Net;
using System.Net.Sockets;
using System.Text;
using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Configuration;
using NATS.Server.Subscriptions;
namespace NATS.Server.Clustering.Tests.Routes;
public class RouteRemoteSubCleanupParityBatch2Tests
{
[Fact]
public void Routed_sub_key_helpers_parse_account_and_queue_fields()
{
var key = SubList.BuildRoutedSubKey("R1", "A", "orders.*", "q1");
SubList.GetAccNameFromRoutedSubKey(key).ShouldBe("A");
var info = SubList.GetRoutedSubKeyInfo(key);
info.ShouldNotBeNull();
info.Value.RouteId.ShouldBe("R1");
info.Value.Account.ShouldBe("A");
info.Value.Subject.ShouldBe("orders.*");
info.Value.Queue.ShouldBe("q1");
SubList.GetRoutedSubKeyInfo("invalid").ShouldBeNull();
SubList.GetAccNameFromRoutedSubKey("invalid").ShouldBeNull();
}
[Fact]
public void Remove_remote_subs_methods_only_remove_matching_route_or_account()
{
using var sl = new SubList();
sl.ApplyRemoteSub(new RemoteSubscription("orders.*", null, "r1", "A"));
sl.ApplyRemoteSub(new RemoteSubscription("orders.*", null, "r1", "B"));
sl.ApplyRemoteSub(new RemoteSubscription("orders.*", null, "r2", "A"));
sl.HasRemoteInterest("A", "orders.created").ShouldBeTrue();
sl.HasRemoteInterest("B", "orders.created").ShouldBeTrue();
sl.RemoveRemoteSubsForAccount("r1", "A").ShouldBe(1);
sl.HasRemoteInterest("A", "orders.created").ShouldBeTrue(); // r2 still present
sl.HasRemoteInterest("B", "orders.created").ShouldBeTrue();
sl.RemoveRemoteSubs("r2").ShouldBe(1);
sl.HasRemoteInterest("A", "orders.created").ShouldBeFalse();
sl.HasRemoteInterest("B", "orders.created").ShouldBeTrue();
}
[Fact]
public void Applying_same_remote_subscription_twice_is_idempotent_for_interest_tracking()
{
using var sl = new SubList();
var changes = new List<InterestChange>();
sl.InterestChanged += changes.Add;
var sub = new RemoteSubscription("orders.*", "workers", "r1", "A");
sl.ApplyRemoteSub(sub);
sl.ApplyRemoteSub(sub);
sl.HasRemoteInterest("A", "orders.created").ShouldBeTrue();
sl.MatchRemote("A", "orders.created").Count.ShouldBe(1);
changes.Count(change => change.Kind == InterestChangeKind.RemoteAdded).ShouldBe(1);
}
[Fact]
public async Task Route_disconnect_cleans_remote_interest_without_explicit_rs_minus()
{
var opts = new NatsOptions
{
Host = "127.0.0.1",
Port = 0,
Cluster = new ClusterOptions
{
Name = Guid.NewGuid().ToString("N"),
Host = "127.0.0.1",
Port = 0,
PoolSize = 1,
},
};
var server = new NatsServer(opts, NullLoggerFactory.Instance);
using var serverCts = new CancellationTokenSource();
_ = server.StartAsync(serverCts.Token);
await server.WaitForReadyAsync();
try
{
var cluster = server.ClusterListen!;
var sep = cluster.LastIndexOf(':');
var host = cluster[..sep];
var port = int.Parse(cluster[(sep + 1)..]);
using var remote = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(8));
await remote.ConnectAsync(IPAddress.Parse(host), port, timeout.Token);
await WriteLineAsync(remote, "ROUTE REMOTE1", timeout.Token);
var response = await ReadLineAsync(remote, timeout.Token);
response.ShouldStartWith("ROUTE ");
await WriteLineAsync(remote, "RS+ $G route.cleanup.test", timeout.Token);
await WaitForCondition(() => server.HasRemoteInterest("route.cleanup.test"), 5000);
remote.Dispose();
await WaitForCondition(() => !server.HasRemoteInterest("route.cleanup.test"), 10000);
server.HasRemoteInterest("route.cleanup.test").ShouldBeFalse();
}
finally
{
await serverCts.CancelAsync();
server.Dispose();
}
}
private static async Task WaitForCondition(Func<bool> predicate, int timeoutMs)
{
using var timeout = new CancellationTokenSource(timeoutMs);
while (!timeout.IsCancellationRequested)
{
if (predicate())
return;
await Task.Yield();
}
throw new TimeoutException("Condition not met.");
}
private static async Task WriteLineAsync(Socket socket, string line, CancellationToken ct)
{
var data = Encoding.ASCII.GetBytes($"{line}\r\n");
await socket.SendAsync(data, ct);
}
private static async Task<string> ReadLineAsync(Socket socket, CancellationToken ct)
{
var bytes = new List<byte>(64);
var one = new byte[1];
while (true)
{
var read = await socket.ReceiveAsync(one, SocketFlags.None, ct);
if (read == 0)
throw new IOException("Socket closed while reading line");
if (one[0] == (byte)'\n')
break;
if (one[0] != (byte)'\r')
bytes.Add(one[0]);
}
return Encoding.ASCII.GetString([.. bytes]);
}
}