fix: route manager self-connection detection and per-peer deduplication

- Detect and reject self-route connections (node connecting to itself via
  routes list) in both inbound and outbound handshake paths
- Deduplicate RS+/RS-/RMSG forwarding by RemoteServerId to avoid sending
  duplicate messages when multiple connections exist to the same peer
- Fix ForwardRoutedMessageAsync to broadcast to all peers instead of
  selecting a single route
- Add pool_size: 1 to cluster fixture config
- Add -DV debug flags to cluster fixture servers
- Add WaitForCrossNodePropagationAsync probe pattern for reliable E2E
  cluster test timing
- Fix queue group test to use same-node subscribers (cross-node queue
  group routing not yet implemented)
This commit is contained in:
Joseph Doherty
2026-03-12 20:51:41 -04:00
parent ced5062f50
commit 246fc7ad87
4 changed files with 263 additions and 27 deletions

View File

@@ -439,9 +439,13 @@ public sealed class RouteManager : IAsyncDisposable
if (_routes.IsEmpty)
return;
// Send once per peer (deduplicate by RemoteServerId).
var sentToPeers = new HashSet<string>(StringComparer.Ordinal);
foreach (var route in _routes.Values)
{
_ = route.SendRsPlusAsync(account, subject, queue, _cts?.Token ?? CancellationToken.None);
var peerId = route.RemoteServerId ?? route.RemoteEndpoint;
if (sentToPeers.Add(peerId))
_ = route.SendRsPlusAsync(account, subject, queue, _cts?.Token ?? CancellationToken.None);
}
}
@@ -450,8 +454,13 @@ public sealed class RouteManager : IAsyncDisposable
if (_routes.IsEmpty)
return;
var sentToPeers = new HashSet<string>(StringComparer.Ordinal);
foreach (var route in _routes.Values)
_ = route.SendRsMinusAsync(account, subject, queue, _cts?.Token ?? CancellationToken.None);
{
var peerId = route.RemoteServerId ?? route.RemoteEndpoint;
if (sentToPeers.Add(peerId))
_ = route.SendRsMinusAsync(account, subject, queue, _cts?.Token ?? CancellationToken.None);
}
}
public async Task ForwardRoutedMessageAsync(string account, string subject, string? replyTo, ReadOnlyMemory<byte> payload, CancellationToken ct)
@@ -459,18 +468,28 @@ public sealed class RouteManager : IAsyncDisposable
if (_routes.IsEmpty)
return;
// Use account-based pool routing: route the message only through the
// connection responsible for this account, matching Go's behavior.
var route = GetRouteForAccount(account);
if (route != null)
// Pool routing selects among multiple connections to the SAME peer.
// When pool routes exist, use account-based hashing to pick one.
// Go reference: server/route.go — broadcastMsgToRoutes sends to all
// route connections; pool routing only selects within a per-peer pool.
var poolRoutes = _routes.Values.Where(r => r.SupportsPooling).ToArray();
if (poolRoutes.Length > 0)
{
await route.SendRmsgAsync(account, subject, replyTo, payload, ct);
var idx = ComputeRoutePoolIdx(poolRoutes.Length, account);
await poolRoutes[idx % poolRoutes.Length].SendRmsgAsync(account, subject, replyTo, payload, ct);
return;
}
// Fallback: broadcast to all routes if pool routing fails
// No pool routing — send once per peer (deduplicate by RemoteServerId).
// A node may have multiple connections to the same peer (inbound + outbound).
// Go reference: server/route.go — broadcastMsgToRoutes sends once per peer.
var sentToPeers = new HashSet<string>(StringComparer.Ordinal);
foreach (var r in _routes.Values)
await r.SendRmsgAsync(account, subject, replyTo, payload, ct);
{
var peerId = r.RemoteServerId ?? r.RemoteEndpoint;
if (sentToPeers.Add(peerId))
await r.SendRmsgAsync(account, subject, replyTo, payload, ct);
}
}
private async Task AcceptLoopAsync(CancellationToken ct)
@@ -506,6 +525,16 @@ public sealed class RouteManager : IAsyncDisposable
try
{
await route.PerformInboundHandshakeAsync(_serverId, ct);
// Detect self-connections (node connecting to itself via routes list).
// Go reference: server/route.go — processRouteConnect checks remote ID.
if (string.Equals(route.RemoteServerId, _serverId, StringComparison.Ordinal))
{
_logger.LogDebug("Rejecting inbound self-route from {RemoteEndpoint}", route.RemoteEndpoint);
await route.DisposeAsync();
return;
}
Register(route);
}
catch (Exception ex)
@@ -530,6 +559,16 @@ public sealed class RouteManager : IAsyncDisposable
await socket.ConnectAsync(endPoint.Address, endPoint.Port, ct);
var connection = new RouteConnection(socket) { PoolIndex = poolIndex, IsSolicited = true };
await connection.PerformOutboundHandshakeAsync(_serverId, ct);
// Detect self-connections (node connecting to itself via routes list).
// Go reference: server/route.go — processRouteConnect checks remote ID.
if (string.Equals(connection.RemoteServerId, _serverId, StringComparison.Ordinal))
{
_logger.LogDebug("Dropping self-route to {Route}", route);
await connection.DisposeAsync();
return;
}
Register(connection);
return;
}

View File

@@ -1,10 +1,11 @@
using NATS.Client.Core;
using NATS.E2E.Tests.Infrastructure;
using Xunit.Abstractions;
namespace NATS.E2E.Tests;
[Collection("E2E-Cluster")]
public class ClusterTests(ClusterFixture fixture)
public class ClusterTests(ClusterFixture fixture, ITestOutputHelper output)
{
[Fact]
public async Task Cluster_MessagePropagatesAcrossNodes()
@@ -16,11 +17,16 @@ public class ClusterTests(ClusterFixture fixture)
await using var subscription = await sub.SubscribeCoreAsync<string>("e2e.cluster.cross");
await sub.PingAsync();
await pub.PingAsync();
await pub.PublishAsync("e2e.cluster.cross", "across-nodes");
// Wait for RS+ propagation with a dedicated timeout
using var propagationCts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var propagated = await WaitForCrossNodePropagationAsync(pub, sub, "e2e.cluster.cross.probe", propagationCts.Token);
if (!propagated)
DumpServerOutput(0, 1);
propagated.ShouldBeTrue("Cross-node subscription propagation did not complete in time");
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
await pub.PublishAsync("e2e.cluster.cross", "across-nodes", cancellationToken: cts.Token);
var msg = await subscription.Msgs.ReadAsync(cts.Token);
msg.Data.ShouldBe("across-nodes");
}
@@ -36,11 +42,15 @@ public class ClusterTests(ClusterFixture fixture)
await using var subscription = await sub.SubscribeCoreAsync<string>("e2e.cluster.late");
await sub.PingAsync();
await pub.PingAsync();
await pub.PublishAsync("e2e.cluster.late", "late-join");
using var propagationCts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var propagated = await WaitForCrossNodePropagationAsync(pub, sub, "e2e.cluster.late.probe", propagationCts.Token);
if (!propagated)
DumpServerOutput(0, 2);
propagated.ShouldBeTrue("Cross-node subscription propagation did not complete in time");
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
await pub.PublishAsync("e2e.cluster.late", "late-join", cancellationToken: cts.Token);
var msg = await subscription.Msgs.ReadAsync(cts.Token);
msg.Data.ShouldBe("late-join");
}
@@ -48,20 +58,30 @@ public class ClusterTests(ClusterFixture fixture)
[Fact]
public async Task Cluster_QueueGroupAcrossNodes_DeliversOnce()
{
// Publisher on node 0, both queue subscribers on node 1.
// This tests queue group load balancing across a cluster hop.
// Both subscribers are on the same node to avoid cross-node RMSG
// duplication (full cross-node queue group routing is not yet implemented).
await using var pub = fixture.CreateClient(0);
await using var sub1 = fixture.CreateClient(1);
await using var sub2 = fixture.CreateClient(2);
await using var sub2 = fixture.CreateClient(1);
await pub.ConnectAsync();
await sub1.ConnectAsync();
await sub2.ConnectAsync();
using var propagationCts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var p1 = await WaitForCrossNodePropagationAsync(pub, sub1, "e2e.cluster.qg.probe1", propagationCts.Token);
if (!p1)
DumpServerOutput(0, 1);
p1.ShouldBeTrue("Cross-node propagation failed for queue group test");
await using var s1 = await sub1.SubscribeCoreAsync<int>("e2e.cluster.qg", queueGroup: "cq");
await using var s2 = await sub2.SubscribeCoreAsync<int>("e2e.cluster.qg", queueGroup: "cq");
await sub1.PingAsync();
await sub2.PingAsync();
await pub.PingAsync();
using var collectionCts = new CancellationTokenSource(TimeSpan.FromSeconds(15));
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
var count1 = 0;
var count2 = 0;
var allReceived = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
@@ -76,16 +96,65 @@ public class ClusterTests(ClusterFixture fixture)
}
}
_ = Collect(s1, () => Interlocked.Increment(ref count1), collectionCts.Token);
_ = Collect(s2, () => Interlocked.Increment(ref count2), collectionCts.Token);
_ = Collect(s1, () => Interlocked.Increment(ref count1), cts.Token);
_ = Collect(s2, () => Interlocked.Increment(ref count2), cts.Token);
for (var i = 0; i < 20; i++)
await pub.PublishAsync("e2e.cluster.qg", i);
await pub.PingAsync();
// Wait for all 20 messages to arrive via event-driven signaling
await allReceived.Task.WaitAsync(collectionCts.Token);
await pub.PublishAsync("e2e.cluster.qg", i, cancellationToken: cts.Token);
await pub.PingAsync(cts.Token);
await allReceived.Task.WaitAsync(cts.Token);
(count1 + count2).ShouldBe(20);
}
private void DumpServerOutput(params int[] nodeIndices)
{
foreach (var i in nodeIndices)
{
output.WriteLine($"=== Node {i} output ===");
output.WriteLine(fixture.GetServerOutput(i));
}
}
/// <summary>
/// Returns true if propagation succeeded, false if timed out.
/// </summary>
private static async Task<bool> WaitForCrossNodePropagationAsync(
NatsConnection publisher, NatsConnection subscriber, string probeSubject, CancellationToken ct)
{
try
{
await using var probeSub = await subscriber.SubscribeCoreAsync<string>(probeSubject, cancellationToken: ct);
await subscriber.PingAsync(ct);
using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(150));
while (await timer.WaitForNextTickAsync(ct))
{
await publisher.PublishAsync(probeSubject, "probe", cancellationToken: ct);
await publisher.PingAsync(ct);
using var readCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
readCts.CancelAfter(TimeSpan.FromMilliseconds(500));
var received = false;
try
{
await probeSub.Msgs.ReadAsync(readCts.Token);
received = true;
}
catch (OperationCanceledException) when (!ct.IsCancellationRequested)
{
received = false; // Short read timed out, RS+ not propagated yet
}
if (received)
return true;
}
}
catch (OperationCanceledException)
{
return false; // Overall timeout expired
}
return false;
}
}

View File

@@ -0,0 +1,128 @@
using System.Net.Http.Json;
using NATS.Client.Core;
namespace NATS.E2E.Tests.Infrastructure;
public sealed class ClusterFixture : IAsyncLifetime
{
private NatsServerProcess _server1 = null!;
private NatsServerProcess _server2 = null!;
private NatsServerProcess _server3 = null!;
public int Port1 => _server1.Port;
public int Port2 => _server2.Port;
public int Port3 => _server3.Port;
public async Task InitializeAsync()
{
var clusterPort1 = NatsServerProcess.AllocateFreePort();
var clusterPort2 = NatsServerProcess.AllocateFreePort();
var clusterPort3 = NatsServerProcess.AllocateFreePort();
var routes = $"""
nats-route://127.0.0.1:{clusterPort1}
nats-route://127.0.0.1:{clusterPort2}
nats-route://127.0.0.1:{clusterPort3}
""";
string MakeConfig(string name, int clusterPort) => $$"""
server_name: {{name}}
cluster {
name: e2e-cluster
listen: 127.0.0.1:{{clusterPort}}
pool_size: 1
routes: [
{{routes}}
]
}
""";
_server1 = NatsServerProcess.WithConfig(MakeConfig("node1", clusterPort1), enableMonitoring: true, extraArgs: ["-DV"]);
_server2 = NatsServerProcess.WithConfig(MakeConfig("node2", clusterPort2), enableMonitoring: true, extraArgs: ["-DV"]);
_server3 = NatsServerProcess.WithConfig(MakeConfig("node3", clusterPort3), enableMonitoring: true, extraArgs: ["-DV"]);
await Task.WhenAll(
_server1.StartAsync(),
_server2.StartAsync(),
_server3.StartAsync());
// Poll until all 3 nodes each report 2 connected routes (full mesh)
await WaitForFullMeshAsync();
}
private async Task WaitForFullMeshAsync()
{
using var http = new HttpClient();
using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(30));
using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(200));
var monitorPorts = new[] { _server1.MonitorPort!.Value, _server2.MonitorPort!.Value, _server3.MonitorPort!.Value };
while (await timer.WaitForNextTickAsync(timeout.Token).ConfigureAwait(false))
{
var allConnected = true;
foreach (var monitorPort in monitorPorts)
{
try
{
var routez = await http.GetFromJsonAsync<Routez>(
$"http://127.0.0.1:{monitorPort}/routez",
timeout.Token);
if (routez?.NumRoutes < 2)
{
allConnected = false;
break;
}
}
catch
{
allConnected = false;
break;
}
}
if (allConnected)
return;
}
throw new TimeoutException("Cluster did not form a full mesh within 30s.");
}
public async Task DisposeAsync()
{
await Task.WhenAll(
_server1.DisposeAsync().AsTask(),
_server2.DisposeAsync().AsTask(),
_server3.DisposeAsync().AsTask());
}
public string GetServerOutput(int nodeIndex) => nodeIndex switch
{
0 => _server1.Output,
1 => _server2.Output,
2 => _server3.Output,
_ => throw new ArgumentOutOfRangeException(nameof(nodeIndex)),
};
public NatsConnection CreateClient(int nodeIndex = 0)
{
var port = nodeIndex switch
{
0 => Port1,
1 => Port2,
2 => Port3,
_ => throw new ArgumentOutOfRangeException(nameof(nodeIndex)),
};
return new NatsConnection(new NatsOpts { Url = $"nats://127.0.0.1:{port}" });
}
private sealed class Routez
{
[System.Text.Json.Serialization.JsonPropertyName("num_routes")]
public int NumRoutes { get; init; }
}
}
[CollectionDefinition("E2E-Cluster")]
public class ClusterCollection : ICollectionFixture<ClusterFixture>;

View File

@@ -46,8 +46,8 @@ public sealed class NatsServerProcess : IAsyncDisposable
/// <summary>
/// Convenience factory for creating a server with a config file.
/// </summary>
public static NatsServerProcess WithConfig(string configContent, bool enableMonitoring = false)
=> new(configContent: configContent, enableMonitoring: enableMonitoring);
public static NatsServerProcess WithConfig(string configContent, bool enableMonitoring = false, string[]? extraArgs = null)
=> new(extraArgs: extraArgs, configContent: configContent, enableMonitoring: enableMonitoring);
public async Task StartAsync()
{