- Detect and reject self-route connections (a node connecting to itself via its routes list) in both the inbound and outbound handshake paths
- Deduplicate RS+/RS-/RMSG forwarding by RemoteServerId to avoid sending duplicate messages when multiple connections exist to the same peer
- Fix ForwardRoutedMessageAsync to broadcast to all peers instead of selecting a single route
- Add pool_size: 1 to the cluster fixture config
- Add -DV debug flags to the cluster fixture servers
- Add the WaitForCrossNodePropagationAsync probe pattern for reliable E2E cluster test timing
- Fix the queue group test to use same-node subscribers (cross-node queue group routing is not yet implemented)
161 lines · 6.3 KiB · C#
using NATS.Client.Core;
|
|
using NATS.E2E.Tests.Infrastructure;
|
|
using Xunit.Abstractions;
|
|
|
|
namespace NATS.E2E.Tests;
|
|
|
|
[Collection("E2E-Cluster")]
public class ClusterTests(ClusterFixture fixture, ITestOutputHelper output)
{
    /// <summary>
    /// A message published by a client on node 0 reaches a subscriber connected to node 1.
    /// </summary>
    [Fact]
    public async Task Cluster_MessagePropagatesAcrossNodes()
    {
        await using var pub = fixture.CreateClient(0);
        await using var sub = fixture.CreateClient(1);
        await pub.ConnectAsync();
        await sub.ConnectAsync();

        await using var subscription = await sub.SubscribeCoreAsync<string>("e2e.cluster.cross");
        await sub.PingAsync();

        // Wait for RS+ propagation with a dedicated timeout. The probe subscription is created
        // after the real one, so once a probe round-trips, node 0 has presumably also seen the
        // interest in "e2e.cluster.cross".
        using var propagationCts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        var propagated = await WaitForCrossNodePropagationAsync(pub, sub, "e2e.cluster.cross.probe", propagationCts.Token);
        if (!propagated)
            DumpServerOutput(0, 1);
        propagated.ShouldBeTrue("Cross-node subscription propagation did not complete in time");

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        await pub.PublishAsync("e2e.cluster.cross", "across-nodes", cancellationToken: cts.Token);
        var msg = await subscription.Msgs.ReadAsync(cts.Token);
        msg.Data.ShouldBe("across-nodes");
    }

    /// <summary>
    /// A subscriber on node 2 that connects after the publisher on node 0 still receives messages
    /// once its interest has propagated.
    /// </summary>
    [Fact]
    public async Task Cluster_LateSubscriberReceivesMessages()
    {
        await using var pub = fixture.CreateClient(0);
        await pub.ConnectAsync();

        await using var sub = fixture.CreateClient(2);
        await sub.ConnectAsync();

        await using var subscription = await sub.SubscribeCoreAsync<string>("e2e.cluster.late");
        await sub.PingAsync();

        // Probe after subscribing, for the same reason as in Cluster_MessagePropagatesAcrossNodes.
        using var propagationCts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        var propagated = await WaitForCrossNodePropagationAsync(pub, sub, "e2e.cluster.late.probe", propagationCts.Token);
        if (!propagated)
            DumpServerOutput(0, 2);
        propagated.ShouldBeTrue("Cross-node subscription propagation did not complete in time");

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        await pub.PublishAsync("e2e.cluster.late", "late-join", cancellationToken: cts.Token);
        var msg = await subscription.Msgs.ReadAsync(cts.Token);
        msg.Data.ShouldBe("late-join");
    }

    /// <summary>
    /// Each message published on node 0 to queue group "cq" is delivered to exactly one of the
    /// two queue members connected to node 1.
    /// </summary>
    [Fact]
    public async Task Cluster_QueueGroupAcrossNodes_DeliversOnce()
    {
        // Publisher on node 0, both queue subscribers on node 1.
        // This tests queue group load balancing across a cluster hop.
        // Both subscribers are on the same node to avoid cross-node RMSG
        // duplication (full cross-node queue group routing is not yet implemented).
        const int messageCount = 20;

        await using var pub = fixture.CreateClient(0);
        await using var sub1 = fixture.CreateClient(1);
        await using var sub2 = fixture.CreateClient(1);
        await pub.ConnectAsync();
        await sub1.ConnectAsync();
        await sub2.ConnectAsync();

        // Register the queue subscriptions BEFORE probing, as the other tests in this class do.
        // The probe subscription is created afterwards, so a successful probe round-trip implies
        // node 0 has also learned about the queue group. This assumes node 1 forwards interest
        // over the route in the order it processes subscriptions (TODO confirm for the route
        // implementation). Probing first, as before, left a window in which node 0 could drop
        // the published messages for lack of interest.
        await using var s1 = await sub1.SubscribeCoreAsync<int>("e2e.cluster.qg", queueGroup: "cq");
        await using var s2 = await sub2.SubscribeCoreAsync<int>("e2e.cluster.qg", queueGroup: "cq");
        await sub1.PingAsync();
        await sub2.PingAsync();

        using var propagationCts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        var p1 = await WaitForCrossNodePropagationAsync(pub, sub1, "e2e.cluster.qg.probe1", propagationCts.Token);
        if (!p1)
            DumpServerOutput(0, 1);
        p1.ShouldBeTrue("Cross-node propagation failed for queue group test");

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        var count1 = 0;
        var count2 = 0;
        var total = 0;
        // deliveries[i] counts how many times payload i arrived across both subscribers.
        // "Delivers once" means every entry ends at exactly 1.
        var deliveries = new int[messageCount];
        var allReceived = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

        async Task Collect(INatsSub<int> sub, Action inc, CancellationToken ct)
        {
            await foreach (var msg in sub.Msgs.ReadAllAsync(ct))
            {
                inc();
                if (msg.Data is >= 0 and < messageCount)
                    Interlocked.Increment(ref deliveries[msg.Data]);

                // Signal once as many deliveries as publishes have been seen. If the server
                // duplicates messages, this fires early and the per-payload check below fails.
                if (Interlocked.Increment(ref total) >= messageCount)
                    allReceived.TrySetResult();
            }
        }

        _ = Collect(s1, () => Interlocked.Increment(ref count1), cts.Token);
        _ = Collect(s2, () => Interlocked.Increment(ref count2), cts.Token);

        for (var i = 0; i < messageCount; i++)
            await pub.PublishAsync("e2e.cluster.qg", i, cancellationToken: cts.Token);
        await pub.PingAsync(cts.Token);

        await allReceived.Task.WaitAsync(cts.Token);

        (Volatile.Read(ref count1) + Volatile.Read(ref count2)).ShouldBe(messageCount);
        deliveries.ShouldAllBe(n => n == 1, "Each queue-group message must be delivered to exactly one subscriber");
    }

    /// <summary>
    /// Writes the captured server output of each listed node to the xUnit test output.
    /// Used to diagnose propagation failures.
    /// </summary>
    /// <param name="nodeIndices">Fixture node indices whose output should be dumped.</param>
    private void DumpServerOutput(params int[] nodeIndices)
    {
        foreach (var i in nodeIndices)
        {
            output.WriteLine($"=== Node {i} output ===");
            output.WriteLine(fixture.GetServerOutput(i));
        }
    }

    /// <summary>
    /// Subscribes to <paramref name="probeSubject"/> on <paramref name="subscriber"/>, then
    /// repeatedly publishes probes from <paramref name="publisher"/> until one is received.
    /// </summary>
    /// <remarks>
    /// A probe is published on each 150 ms timer tick. Each attempt waits up to 500 ms for a
    /// delivery. A probe that arrives after its own read timed out is picked up by a later read,
    /// which also counts as success. Exceptions other than cancellation propagate to the caller.
    /// </remarks>
    /// <returns>
    /// <c>true</c> if a probe was received; <c>false</c> if <paramref name="ct"/> was cancelled first.
    /// </returns>
    private static async Task<bool> WaitForCrossNodePropagationAsync(
        NatsConnection publisher, NatsConnection subscriber, string probeSubject, CancellationToken ct)
    {
        try
        {
            await using var probeSub = await subscriber.SubscribeCoreAsync<string>(probeSubject, cancellationToken: ct);
            await subscriber.PingAsync(ct);

            using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(150));
            while (await timer.WaitForNextTickAsync(ct))
            {
                await publisher.PublishAsync(probeSubject, "probe", cancellationToken: ct);
                await publisher.PingAsync(ct);

                using var readCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
                readCts.CancelAfter(TimeSpan.FromMilliseconds(500));
                var received = false;
                try
                {
                    await probeSub.Msgs.ReadAsync(readCts.Token);
                    received = true;
                }
                catch (OperationCanceledException) when (!ct.IsCancellationRequested)
                {
                    received = false; // Short read timed out, RS+ not propagated yet
                }

                if (received)
                    return true;
            }
        }
        catch (OperationCanceledException)
        {
            return false; // Overall timeout expired
        }

        // Only reachable if the timer is disposed while waiting, which does not happen here.
        return false;
    }
}
|