diff --git a/tests/NATS.E2E.Tests/ClusterTests.cs b/tests/NATS.E2E.Tests/ClusterTests.cs new file mode 100644 index 0000000..0bccc32 --- /dev/null +++ b/tests/NATS.E2E.Tests/ClusterTests.cs @@ -0,0 +1,91 @@ +using NATS.Client.Core; +using NATS.E2E.Tests.Infrastructure; + +namespace NATS.E2E.Tests; + +[Collection("E2E-Cluster")] +public class ClusterTests(ClusterFixture fixture) +{ + [Fact] + public async Task Cluster_MessagePropagatesAcrossNodes() + { + await using var pub = fixture.CreateClient(0); + await using var sub = fixture.CreateClient(1); + await pub.ConnectAsync(); + await sub.ConnectAsync(); + + await using var subscription = await sub.SubscribeCoreAsync("e2e.cluster.cross"); + await sub.PingAsync(); + await pub.PingAsync(); + + await pub.PublishAsync("e2e.cluster.cross", "across-nodes"); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var msg = await subscription.Msgs.ReadAsync(cts.Token); + msg.Data.ShouldBe("across-nodes"); + } + + [Fact] + public async Task Cluster_LateSubscriberReceivesMessages() + { + await using var pub = fixture.CreateClient(0); + await pub.ConnectAsync(); + + await using var sub = fixture.CreateClient(2); + await sub.ConnectAsync(); + + await using var subscription = await sub.SubscribeCoreAsync("e2e.cluster.late"); + await sub.PingAsync(); + await pub.PingAsync(); + + await pub.PublishAsync("e2e.cluster.late", "late-join"); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var msg = await subscription.Msgs.ReadAsync(cts.Token); + msg.Data.ShouldBe("late-join"); + } + + [Fact] + public async Task Cluster_QueueGroupAcrossNodes_DeliversOnce() + { + await using var pub = fixture.CreateClient(0); + await using var sub1 = fixture.CreateClient(1); + await using var sub2 = fixture.CreateClient(2); + await pub.ConnectAsync(); + await sub1.ConnectAsync(); + await sub2.ConnectAsync(); + + await using var s1 = await sub1.SubscribeCoreAsync("e2e.cluster.qg", queueGroup: "cq"); + await using var s2 = await sub2.SubscribeCoreAsync("e2e.cluster.qg", queueGroup: "cq"); + await sub1.PingAsync(); + await sub2.PingAsync(); + await pub.PingAsync(); + + using var collectionCts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); + var count1 = 0; + var count2 = 0; + var allReceived = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + async Task Collect(INatsSub sub, Action inc, CancellationToken ct) + { + await foreach (var _ in sub.Msgs.ReadAllAsync(ct)) + { + inc(); + if (Volatile.Read(ref count1) + Volatile.Read(ref count2) >= 20) + allReceived.TrySetResult(); + } + } + + _ = Collect(s1, () => Interlocked.Increment(ref count1), collectionCts.Token); + _ = Collect(s2, () => Interlocked.Increment(ref count2), collectionCts.Token); + + for (var i = 0; i < 20; i++) + await pub.PublishAsync("e2e.cluster.qg", i); + await pub.PingAsync(); + + // Wait for all 20 messages to arrive via event-driven signaling + await allReceived.Task.WaitAsync(collectionCts.Token); + + (count1 + count2).ShouldBe(20); + } +} diff --git a/tests/NATS.E2E.Tests/GatewayTests.cs b/tests/NATS.E2E.Tests/GatewayTests.cs index 7bfeeca..35a6944 100644 --- a/tests/NATS.E2E.Tests/GatewayTests.cs +++ b/tests/NATS.E2E.Tests/GatewayTests.cs @@ -1,4 +1,3 @@ -using NATS.Client.Core; using NATS.E2E.Tests.Infrastructure; namespace NATS.E2E.Tests; diff --git a/tests/NATS.E2E.Tests/LeafNodeTests.cs b/tests/NATS.E2E.Tests/LeafNodeTests.cs index 7157aa4..94bd92a 100644 --- a/tests/NATS.E2E.Tests/LeafNodeTests.cs +++ b/tests/NATS.E2E.Tests/LeafNodeTests.cs @@ -1,4 +1,3 @@ -using NATS.Client.Core; using NATS.E2E.Tests.Infrastructure; namespace NATS.E2E.Tests;