using NATS.Client.Core; using NATS.E2E.Tests.Infrastructure; namespace NATS.E2E.Tests; [Collection("E2E-Cluster")] public class ClusterTests(ClusterFixture fixture) { [Fact] public async Task Cluster_MessagePropagatesAcrossNodes() { await using var pub = fixture.CreateClient(0); await using var sub = fixture.CreateClient(1); await pub.ConnectAsync(); await sub.ConnectAsync(); await using var subscription = await sub.SubscribeCoreAsync("e2e.cluster.cross"); await sub.PingAsync(); await pub.PingAsync(); await pub.PublishAsync("e2e.cluster.cross", "across-nodes"); using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var msg = await subscription.Msgs.ReadAsync(cts.Token); msg.Data.ShouldBe("across-nodes"); } [Fact] public async Task Cluster_LateSubscriberReceivesMessages() { await using var pub = fixture.CreateClient(0); await pub.ConnectAsync(); await using var sub = fixture.CreateClient(2); await sub.ConnectAsync(); await using var subscription = await sub.SubscribeCoreAsync("e2e.cluster.late"); await sub.PingAsync(); await pub.PingAsync(); await pub.PublishAsync("e2e.cluster.late", "late-join"); using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var msg = await subscription.Msgs.ReadAsync(cts.Token); msg.Data.ShouldBe("late-join"); } [Fact] public async Task Cluster_QueueGroupAcrossNodes_DeliversOnce() { await using var pub = fixture.CreateClient(0); await using var sub1 = fixture.CreateClient(1); await using var sub2 = fixture.CreateClient(2); await pub.ConnectAsync(); await sub1.ConnectAsync(); await sub2.ConnectAsync(); await using var s1 = await sub1.SubscribeCoreAsync("e2e.cluster.qg", queueGroup: "cq"); await using var s2 = await sub2.SubscribeCoreAsync("e2e.cluster.qg", queueGroup: "cq"); await sub1.PingAsync(); await sub2.PingAsync(); await pub.PingAsync(); using var collectionCts = new CancellationTokenSource(TimeSpan.FromSeconds(15)); var count1 = 0; var count2 = 0; var allReceived = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); async Task Collect(INatsSub sub, Action inc, CancellationToken ct) { await foreach (var _ in sub.Msgs.ReadAllAsync(ct)) { inc(); if (Volatile.Read(ref count1) + Volatile.Read(ref count2) >= 20) allReceived.TrySetResult(); } } _ = Collect(s1, () => Interlocked.Increment(ref count1), collectionCts.Token); _ = Collect(s2, () => Interlocked.Increment(ref count2), collectionCts.Token); for (var i = 0; i < 20; i++) await pub.PublishAsync("e2e.cluster.qg", i); await pub.PingAsync(); // Wait for all 20 messages to arrive via event-driven signaling await allReceived.Task.WaitAsync(collectionCts.Token); (count1 + count2).ShouldBe(20); } }