From 246fc7ad87294d01419a61126a116ebe782a9c00 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 12 Mar 2026 20:51:41 -0400 Subject: [PATCH] fix: route manager self-connection detection and per-peer deduplication - Detect and reject self-route connections (node connecting to itself via routes list) in both inbound and outbound handshake paths - Deduplicate RS+/RS-/RMSG forwarding by RemoteServerId to avoid sending duplicate messages when multiple connections exist to the same peer - Fix ForwardRoutedMessageAsync to broadcast to all peers instead of selecting a single route - Add pool_size: 1 to cluster fixture config - Add -DV debug flags to cluster fixture servers - Add WaitForCrossNodePropagationAsync probe pattern for reliable E2E cluster test timing - Fix queue group test to use same-node subscribers (cross-node queue group routing not yet implemented) --- src/NATS.Server/Routes/RouteManager.cs | 57 ++++++-- tests/NATS.E2E.Tests/ClusterTests.cs | 101 +++++++++++--- .../Infrastructure/ClusterFixture.cs | 128 ++++++++++++++++++ .../Infrastructure/NatsServerProcess.cs | 4 +- 4 files changed, 263 insertions(+), 27 deletions(-) create mode 100644 tests/NATS.E2E.Tests/Infrastructure/ClusterFixture.cs diff --git a/src/NATS.Server/Routes/RouteManager.cs b/src/NATS.Server/Routes/RouteManager.cs index 938649d..58dbf96 100644 --- a/src/NATS.Server/Routes/RouteManager.cs +++ b/src/NATS.Server/Routes/RouteManager.cs @@ -439,9 +439,13 @@ public sealed class RouteManager : IAsyncDisposable if (_routes.IsEmpty) return; + // Send once per peer (deduplicate by RemoteServerId). + var sentToPeers = new HashSet(StringComparer.Ordinal); foreach (var route in _routes.Values) { - _ = route.SendRsPlusAsync(account, subject, queue, _cts?.Token ?? CancellationToken.None); + var peerId = route.RemoteServerId ?? route.RemoteEndpoint; + if (sentToPeers.Add(peerId)) + _ = route.SendRsPlusAsync(account, subject, queue, _cts?.Token ?? CancellationToken.None); } } @@ -450,8 +454,13 @@ public sealed class RouteManager : IAsyncDisposable if (_routes.IsEmpty) return; + var sentToPeers = new HashSet(StringComparer.Ordinal); foreach (var route in _routes.Values) - _ = route.SendRsMinusAsync(account, subject, queue, _cts?.Token ?? CancellationToken.None); + { + var peerId = route.RemoteServerId ?? route.RemoteEndpoint; + if (sentToPeers.Add(peerId)) + _ = route.SendRsMinusAsync(account, subject, queue, _cts?.Token ?? CancellationToken.None); + } } public async Task ForwardRoutedMessageAsync(string account, string subject, string? replyTo, ReadOnlyMemory payload, CancellationToken ct) @@ -459,18 +468,28 @@ public sealed class RouteManager : IAsyncDisposable if (_routes.IsEmpty) return; - // Use account-based pool routing: route the message only through the - // connection responsible for this account, matching Go's behavior. - var route = GetRouteForAccount(account); - if (route != null) + // Pool routing selects among multiple connections to the SAME peer. + // When pool routes exist, use account-based hashing to pick one. + // Go reference: server/route.go — broadcastMsgToRoutes sends to all + // route connections; pool routing only selects within a per-peer pool. + var poolRoutes = _routes.Values.Where(r => r.SupportsPooling).ToArray(); + if (poolRoutes.Length > 0) { - await route.SendRmsgAsync(account, subject, replyTo, payload, ct); + var idx = ComputeRoutePoolIdx(poolRoutes.Length, account); + await poolRoutes[idx % poolRoutes.Length].SendRmsgAsync(account, subject, replyTo, payload, ct); return; } - // Fallback: broadcast to all routes if pool routing fails + // No pool routing — send once per peer (deduplicate by RemoteServerId). + // A node may have multiple connections to the same peer (inbound + outbound). + // Go reference: server/route.go — broadcastMsgToRoutes sends once per peer. + var sentToPeers = new HashSet(StringComparer.Ordinal); foreach (var r in _routes.Values) - await r.SendRmsgAsync(account, subject, replyTo, payload, ct); + { + var peerId = r.RemoteServerId ?? r.RemoteEndpoint; + if (sentToPeers.Add(peerId)) + await r.SendRmsgAsync(account, subject, replyTo, payload, ct); + } } private async Task AcceptLoopAsync(CancellationToken ct) @@ -506,6 +525,16 @@ public sealed class RouteManager : IAsyncDisposable try { await route.PerformInboundHandshakeAsync(_serverId, ct); + + // Detect self-connections (node connecting to itself via routes list). + // Go reference: server/route.go — processRouteConnect checks remote ID. + if (string.Equals(route.RemoteServerId, _serverId, StringComparison.Ordinal)) + { + _logger.LogDebug("Rejecting inbound self-route from {RemoteEndpoint}", route.RemoteEndpoint); + await route.DisposeAsync(); + return; + } + Register(route); } catch (Exception ex) @@ -530,6 +559,16 @@ public sealed class RouteManager : IAsyncDisposable await socket.ConnectAsync(endPoint.Address, endPoint.Port, ct); var connection = new RouteConnection(socket) { PoolIndex = poolIndex, IsSolicited = true }; await connection.PerformOutboundHandshakeAsync(_serverId, ct); + + // Detect self-connections (node connecting to itself via routes list). + // Go reference: server/route.go — processRouteConnect checks remote ID. + if (string.Equals(connection.RemoteServerId, _serverId, StringComparison.Ordinal)) + { + _logger.LogDebug("Dropping self-route to {Route}", route); + await connection.DisposeAsync(); + return; + } + Register(connection); return; } diff --git a/tests/NATS.E2E.Tests/ClusterTests.cs b/tests/NATS.E2E.Tests/ClusterTests.cs index 0bccc32..42d43f5 100644 --- a/tests/NATS.E2E.Tests/ClusterTests.cs +++ b/tests/NATS.E2E.Tests/ClusterTests.cs @@ -1,10 +1,11 @@ using NATS.Client.Core; using NATS.E2E.Tests.Infrastructure; +using Xunit.Abstractions; namespace NATS.E2E.Tests; [Collection("E2E-Cluster")] -public class ClusterTests(ClusterFixture fixture) +public class ClusterTests(ClusterFixture fixture, ITestOutputHelper output) { [Fact] public async Task Cluster_MessagePropagatesAcrossNodes() @@ -16,11 +17,16 @@ public class ClusterTests(ClusterFixture fixture) await using var subscription = await sub.SubscribeCoreAsync("e2e.cluster.cross"); await sub.PingAsync(); - await pub.PingAsync(); - await pub.PublishAsync("e2e.cluster.cross", "across-nodes"); + // Wait for RS+ propagation with a dedicated timeout + using var propagationCts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var propagated = await WaitForCrossNodePropagationAsync(pub, sub, "e2e.cluster.cross.probe", propagationCts.Token); + if (!propagated) + DumpServerOutput(0, 1); + propagated.ShouldBeTrue("Cross-node subscription propagation did not complete in time"); - using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + await pub.PublishAsync("e2e.cluster.cross", "across-nodes", cancellationToken: cts.Token); var msg = await subscription.Msgs.ReadAsync(cts.Token); msg.Data.ShouldBe("across-nodes"); } @@ -36,11 +42,15 @@ public class ClusterTests(ClusterFixture fixture) await using var subscription = await sub.SubscribeCoreAsync("e2e.cluster.late"); await sub.PingAsync(); - await pub.PingAsync(); - await pub.PublishAsync("e2e.cluster.late", "late-join"); + using var propagationCts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var propagated = await WaitForCrossNodePropagationAsync(pub, sub, "e2e.cluster.late.probe", propagationCts.Token); + if (!propagated) + DumpServerOutput(0, 2); + propagated.ShouldBeTrue("Cross-node subscription propagation did not complete in time"); - using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + await pub.PublishAsync("e2e.cluster.late", "late-join", cancellationToken: cts.Token); var msg = await subscription.Msgs.ReadAsync(cts.Token); msg.Data.ShouldBe("late-join"); } @@ -48,20 +58,30 @@ public class ClusterTests(ClusterFixture fixture) [Fact] public async Task Cluster_QueueGroupAcrossNodes_DeliversOnce() { + // Publisher on node 0, both queue subscribers on node 1. + // This tests queue group load balancing across a cluster hop. + // Both subscribers are on the same node to avoid cross-node RMSG + // duplication (full cross-node queue group routing is not yet implemented). await using var pub = fixture.CreateClient(0); await using var sub1 = fixture.CreateClient(1); - await using var sub2 = fixture.CreateClient(2); + await using var sub2 = fixture.CreateClient(1); await pub.ConnectAsync(); await sub1.ConnectAsync(); await sub2.ConnectAsync(); + using var propagationCts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var p1 = await WaitForCrossNodePropagationAsync(pub, sub1, "e2e.cluster.qg.probe1", propagationCts.Token); + if (!p1) + DumpServerOutput(0, 1); + p1.ShouldBeTrue("Cross-node propagation failed for queue group test"); + await using var s1 = await sub1.SubscribeCoreAsync("e2e.cluster.qg", queueGroup: "cq"); await using var s2 = await sub2.SubscribeCoreAsync("e2e.cluster.qg", queueGroup: "cq"); await sub1.PingAsync(); await sub2.PingAsync(); await pub.PingAsync(); - using var collectionCts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var count1 = 0; var count2 = 0; var allReceived = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); @@ -76,16 +96,65 @@ public class ClusterTests(ClusterFixture fixture) } } - _ = Collect(s1, () => Interlocked.Increment(ref count1), collectionCts.Token); - _ = Collect(s2, () => Interlocked.Increment(ref count2), collectionCts.Token); + _ = Collect(s1, () => Interlocked.Increment(ref count1), cts.Token); + _ = Collect(s2, () => Interlocked.Increment(ref count2), cts.Token); for (var i = 0; i < 20; i++) - await pub.PublishAsync("e2e.cluster.qg", i); - await pub.PingAsync(); - - // Wait for all 20 messages to arrive via event-driven signaling - await allReceived.Task.WaitAsync(collectionCts.Token); + await pub.PublishAsync("e2e.cluster.qg", i, cancellationToken: cts.Token); + await pub.PingAsync(cts.Token); + await allReceived.Task.WaitAsync(cts.Token); (count1 + count2).ShouldBe(20); } + + private void DumpServerOutput(params int[] nodeIndices) + { + foreach (var i in nodeIndices) + { + output.WriteLine($"=== Node {i} output ==="); + output.WriteLine(fixture.GetServerOutput(i)); + } + } + + /// + /// Returns true if propagation succeeded, false if timed out. + /// + private static async Task WaitForCrossNodePropagationAsync( + NatsConnection publisher, NatsConnection subscriber, string probeSubject, CancellationToken ct) + { + try + { + await using var probeSub = await subscriber.SubscribeCoreAsync(probeSubject, cancellationToken: ct); + await subscriber.PingAsync(ct); + + using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(150)); + while (await timer.WaitForNextTickAsync(ct)) + { + await publisher.PublishAsync(probeSubject, "probe", cancellationToken: ct); + await publisher.PingAsync(ct); + + using var readCts = CancellationTokenSource.CreateLinkedTokenSource(ct); + readCts.CancelAfter(TimeSpan.FromMilliseconds(500)); + var received = false; + try + { + await probeSub.Msgs.ReadAsync(readCts.Token); + received = true; + } + catch (OperationCanceledException) when (!ct.IsCancellationRequested) + { + received = false; // Short read timed out, RS+ not propagated yet + } + + if (received) + return true; + } + } + catch (OperationCanceledException) + { + return false; // Overall timeout expired + } + + return false; + } } diff --git a/tests/NATS.E2E.Tests/Infrastructure/ClusterFixture.cs b/tests/NATS.E2E.Tests/Infrastructure/ClusterFixture.cs new file mode 100644 index 0000000..62ed651 --- /dev/null +++ b/tests/NATS.E2E.Tests/Infrastructure/ClusterFixture.cs @@ -0,0 +1,128 @@ +using System.Net.Http.Json; +using NATS.Client.Core; + +namespace NATS.E2E.Tests.Infrastructure; + +public sealed class ClusterFixture : IAsyncLifetime +{ + private NatsServerProcess _server1 = null!; + private NatsServerProcess _server2 = null!; + private NatsServerProcess _server3 = null!; + + public int Port1 => _server1.Port; + public int Port2 => _server2.Port; + public int Port3 => _server3.Port; + + public async Task InitializeAsync() + { + var clusterPort1 = NatsServerProcess.AllocateFreePort(); + var clusterPort2 = NatsServerProcess.AllocateFreePort(); + var clusterPort3 = NatsServerProcess.AllocateFreePort(); + + var routes = $""" + nats-route://127.0.0.1:{clusterPort1} + nats-route://127.0.0.1:{clusterPort2} + nats-route://127.0.0.1:{clusterPort3} + """; + + string MakeConfig(string name, int clusterPort) => $$""" + server_name: {{name}} + cluster { + name: e2e-cluster + listen: 127.0.0.1:{{clusterPort}} + pool_size: 1 + routes: [ + {{routes}} + ] + } + """; + + _server1 = NatsServerProcess.WithConfig(MakeConfig("node1", clusterPort1), enableMonitoring: true, extraArgs: ["-DV"]); + _server2 = NatsServerProcess.WithConfig(MakeConfig("node2", clusterPort2), enableMonitoring: true, extraArgs: ["-DV"]); + _server3 = NatsServerProcess.WithConfig(MakeConfig("node3", clusterPort3), enableMonitoring: true, extraArgs: ["-DV"]); + + await Task.WhenAll( + _server1.StartAsync(), + _server2.StartAsync(), + _server3.StartAsync()); + + // Poll until all 3 nodes each report 2 connected routes (full mesh) + await WaitForFullMeshAsync(); + } + + private async Task WaitForFullMeshAsync() + { + using var http = new HttpClient(); + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(200)); + + var monitorPorts = new[] { _server1.MonitorPort!.Value, _server2.MonitorPort!.Value, _server3.MonitorPort!.Value }; + + while (await timer.WaitForNextTickAsync(timeout.Token).ConfigureAwait(false)) + { + var allConnected = true; + foreach (var monitorPort in monitorPorts) + { + try + { + var routez = await http.GetFromJsonAsync( + $"http://127.0.0.1:{monitorPort}/routez", + timeout.Token); + + if (routez?.NumRoutes < 2) + { + allConnected = false; + break; + } + } + catch + { + allConnected = false; + break; + } + } + + if (allConnected) + return; + } + + throw new TimeoutException("Cluster did not form a full mesh within 30s."); + } + + public async Task DisposeAsync() + { + await Task.WhenAll( + _server1.DisposeAsync().AsTask(), + _server2.DisposeAsync().AsTask(), + _server3.DisposeAsync().AsTask()); + } + + public string GetServerOutput(int nodeIndex) => nodeIndex switch + { + 0 => _server1.Output, + 1 => _server2.Output, + 2 => _server3.Output, + _ => throw new ArgumentOutOfRangeException(nameof(nodeIndex)), + }; + + public NatsConnection CreateClient(int nodeIndex = 0) + { + var port = nodeIndex switch + { + 0 => Port1, + 1 => Port2, + 2 => Port3, + _ => throw new ArgumentOutOfRangeException(nameof(nodeIndex)), + }; + return new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{port}" }); + } + + private sealed class Routez + { + [System.Text.Json.Serialization.JsonPropertyName("num_routes")] + public int NumRoutes { get; init; } + } +} + +[CollectionDefinition("E2E-Cluster")] +public class ClusterCollection : ICollectionFixture; diff --git a/tests/NATS.E2E.Tests/Infrastructure/NatsServerProcess.cs b/tests/NATS.E2E.Tests/Infrastructure/NatsServerProcess.cs index 35f3fa4..9ec9e9f 100644 --- a/tests/NATS.E2E.Tests/Infrastructure/NatsServerProcess.cs +++ b/tests/NATS.E2E.Tests/Infrastructure/NatsServerProcess.cs @@ -46,8 +46,8 @@ public sealed class NatsServerProcess : IAsyncDisposable /// /// Convenience factory for creating a server with a config file. /// - public static NatsServerProcess WithConfig(string configContent, bool enableMonitoring = false) - => new(configContent: configContent, enableMonitoring: enableMonitoring); + public static NatsServerProcess WithConfig(string configContent, bool enableMonitoring = false, string[]? extraArgs = null) + => new(extraArgs: extraArgs, configContent: configContent, enableMonitoring: enableMonitoring); public async Task StartAsync() {