test: add E2E cluster tests and fix leaf/gateway using warnings
This commit is contained in:
91
tests/NATS.E2E.Tests/ClusterTests.cs
Normal file
91
tests/NATS.E2E.Tests/ClusterTests.cs
Normal file
@@ -0,0 +1,91 @@
|
||||
using NATS.Client.Core;
|
||||
using NATS.E2E.Tests.Infrastructure;
|
||||
|
||||
namespace NATS.E2E.Tests;
|
||||
|
||||
[Collection("E2E-Cluster")]
|
||||
public class ClusterTests(ClusterFixture fixture)
|
||||
{
|
||||
[Fact]
|
||||
public async Task Cluster_MessagePropagatesAcrossNodes()
|
||||
{
|
||||
await using var pub = fixture.CreateClient(0);
|
||||
await using var sub = fixture.CreateClient(1);
|
||||
await pub.ConnectAsync();
|
||||
await sub.ConnectAsync();
|
||||
|
||||
await using var subscription = await sub.SubscribeCoreAsync<string>("e2e.cluster.cross");
|
||||
await sub.PingAsync();
|
||||
await pub.PingAsync();
|
||||
|
||||
await pub.PublishAsync("e2e.cluster.cross", "across-nodes");
|
||||
|
||||
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
|
||||
var msg = await subscription.Msgs.ReadAsync(cts.Token);
|
||||
msg.Data.ShouldBe("across-nodes");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Cluster_LateSubscriberReceivesMessages()
|
||||
{
|
||||
await using var pub = fixture.CreateClient(0);
|
||||
await pub.ConnectAsync();
|
||||
|
||||
await using var sub = fixture.CreateClient(2);
|
||||
await sub.ConnectAsync();
|
||||
|
||||
await using var subscription = await sub.SubscribeCoreAsync<string>("e2e.cluster.late");
|
||||
await sub.PingAsync();
|
||||
await pub.PingAsync();
|
||||
|
||||
await pub.PublishAsync("e2e.cluster.late", "late-join");
|
||||
|
||||
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
|
||||
var msg = await subscription.Msgs.ReadAsync(cts.Token);
|
||||
msg.Data.ShouldBe("late-join");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Cluster_QueueGroupAcrossNodes_DeliversOnce()
|
||||
{
|
||||
await using var pub = fixture.CreateClient(0);
|
||||
await using var sub1 = fixture.CreateClient(1);
|
||||
await using var sub2 = fixture.CreateClient(2);
|
||||
await pub.ConnectAsync();
|
||||
await sub1.ConnectAsync();
|
||||
await sub2.ConnectAsync();
|
||||
|
||||
await using var s1 = await sub1.SubscribeCoreAsync<int>("e2e.cluster.qg", queueGroup: "cq");
|
||||
await using var s2 = await sub2.SubscribeCoreAsync<int>("e2e.cluster.qg", queueGroup: "cq");
|
||||
await sub1.PingAsync();
|
||||
await sub2.PingAsync();
|
||||
await pub.PingAsync();
|
||||
|
||||
using var collectionCts = new CancellationTokenSource(TimeSpan.FromSeconds(15));
|
||||
var count1 = 0;
|
||||
var count2 = 0;
|
||||
var allReceived = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
|
||||
async Task Collect(INatsSub<int> sub, Action inc, CancellationToken ct)
|
||||
{
|
||||
await foreach (var _ in sub.Msgs.ReadAllAsync(ct))
|
||||
{
|
||||
inc();
|
||||
if (Volatile.Read(ref count1) + Volatile.Read(ref count2) >= 20)
|
||||
allReceived.TrySetResult();
|
||||
}
|
||||
}
|
||||
|
||||
_ = Collect(s1, () => Interlocked.Increment(ref count1), collectionCts.Token);
|
||||
_ = Collect(s2, () => Interlocked.Increment(ref count2), collectionCts.Token);
|
||||
|
||||
for (var i = 0; i < 20; i++)
|
||||
await pub.PublishAsync("e2e.cluster.qg", i);
|
||||
await pub.PingAsync();
|
||||
|
||||
// Wait for all 20 messages to arrive via event-driven signaling
|
||||
await allReceived.Task.WaitAsync(collectionCts.Token);
|
||||
|
||||
(count1 + count2).ShouldBe(20);
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,3 @@
|
||||
using NATS.Client.Core;
|
||||
using NATS.E2E.Tests.Infrastructure;
|
||||
|
||||
namespace NATS.E2E.Tests;
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
using NATS.Client.Core;
|
||||
using NATS.E2E.Tests.Infrastructure;
|
||||
|
||||
namespace NATS.E2E.Tests;
|
||||
|
||||
Reference in New Issue
Block a user