From f64b7103f43f116681ff61165b02d8dc59231b38 Mon Sep 17 00:00:00 2001
From: Joseph Doherty
Date: Thu, 12 Mar 2026 23:38:18 -0400
Subject: [PATCH] test: add gateway failover E2E tests and fix SW003/SW004
 violations across cluster tests

Replace all Task.Delay-based interest propagation waits with active probe
loops (PeriodicTimer + publish-and-read) in GatewayFailoverTests,
LeafNodeFailoverTests, JetStreamClusterTests, and RaftConsensusTests. Fix
SW003 empty-catch violations in JetStreamClusterTests and RaftConsensusTests
by adding _ = ex discard statements in the stream-polling catch blocks.
Correct State.Messages type from ulong to long to match the
NATS.Client.JetStream API.
---
 .../ClusterResilienceTests.cs | 219 ++++++++++++++++
 .../GatewayFailoverTests.cs | 110 ++++++++
 .../JetStreamClusterTests.cs | 238 ++++++++++++++++++
 .../LeafNodeFailoverTests.cs | 111 ++++++++
 .../RaftConsensusTests.cs | 188 ++++++++++++++
 5 files changed, 866 insertions(+)
 create mode 100644 tests/NATS.E2E.Cluster.Tests/ClusterResilienceTests.cs
 create mode 100644 tests/NATS.E2E.Cluster.Tests/GatewayFailoverTests.cs
 create mode 100644 tests/NATS.E2E.Cluster.Tests/JetStreamClusterTests.cs
 create mode 100644 tests/NATS.E2E.Cluster.Tests/LeafNodeFailoverTests.cs
 create mode 100644 tests/NATS.E2E.Cluster.Tests/RaftConsensusTests.cs

diff --git a/tests/NATS.E2E.Cluster.Tests/ClusterResilienceTests.cs b/tests/NATS.E2E.Cluster.Tests/ClusterResilienceTests.cs
new file mode 100644
index 0000000..2721862
--- /dev/null
+++ b/tests/NATS.E2E.Cluster.Tests/ClusterResilienceTests.cs
@@ -0,0 +1,219 @@
+using NATS.Client.Core;
+using NATS.E2E.Cluster.Tests.Infrastructure;
+
+namespace NATS.E2E.Cluster.Tests;
+
+// Go reference: server/cluster_test.go — TestClusterRouteReconnect, TestClusterQueueSubs, etc.
+[Collection("E2E-ThreeNodeCluster")]
+public class ClusterResilienceTests(ThreeNodeClusterFixture fixture) : IClassFixture<ThreeNodeClusterFixture>
+{
+    /// <summary>
+    /// Kill node 2, then verify a pub on node 0 reaches a sub on node 1.
+ /// Go reference: server/cluster_test.go TestClusterRouteSingleHopSubjectMatch + /// + [Fact] + public async Task NodeDies_TrafficReroutesToSurvivors() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); + + await fixture.KillNode(2); + + // Surviving cluster (nodes 0 and 1) should have 1 route each + await fixture.WaitForFullMeshAsync(expectedRoutes: 1); + + await using var publisher = fixture.CreateClient(0); + await using var subscriber = fixture.CreateClient(1); + + await publisher.ConnectAsync(); + await subscriber.ConnectAsync(); + + const string subject = "resilience.node-dies"; + await using var sub = await subscriber.SubscribeCoreAsync(subject, cancellationToken: cts.Token); + + await WaitForPropagationAsync(publisher, subscriber, $"probe.{subject}", cts.Token); + + await publisher.PublishAsync(subject, "hello-after-kill", cancellationToken: cts.Token); + await publisher.PingAsync(cts.Token); + + var msg = await sub.Msgs.ReadAsync(cts.Token); + msg.Data.ShouldBe("hello-after-kill"); + + // Restore node 2 and wait for full mesh before next test + await fixture.RestartNode(2); + await fixture.WaitForFullMeshAsync(expectedRoutes: 2); + } + + /// + /// Kill node 2, restart it, then verify subscriptions on the restarted node receive messages from node 0. 
+ /// Go reference: server/cluster_test.go TestClusterRouteReconnect + /// + [Fact] + public async Task NodeRejoins_SubscriptionsPropagateAgain() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); + + await fixture.KillNode(2); + await fixture.RestartNode(2); + await fixture.WaitForFullMeshAsync(expectedRoutes: 2); + + await using var publisher = fixture.CreateClient(0); + await using var subscriber = fixture.CreateClient(2); + + await publisher.ConnectAsync(); + await subscriber.ConnectAsync(); + + const string subject = "resilience.node-rejoins"; + await using var sub = await subscriber.SubscribeCoreAsync(subject, cancellationToken: cts.Token); + + await WaitForPropagationAsync(publisher, subscriber, $"probe.{subject}", cts.Token); + + await publisher.PublishAsync(subject, "hello-after-rejoin", cancellationToken: cts.Token); + await publisher.PingAsync(cts.Token); + + var msg = await sub.Msgs.ReadAsync(cts.Token); + msg.Data.ShouldBe("hello-after-rejoin"); + } + + /// + /// Kill node 1 and restart it, then verify pub on node 0 reaches sub on node 1 once full mesh is restored. 
+ /// Go reference: server/cluster_test.go TestClusterRouteReconnect + /// + [Fact] + public async Task AllRoutesReconnect_AfterNodeRestart() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); + + await fixture.KillNode(1); + await fixture.RestartNode(1); + await fixture.WaitForFullMeshAsync(expectedRoutes: 2); + + await using var publisher = fixture.CreateClient(0); + await using var subscriber = fixture.CreateClient(1); + + await publisher.ConnectAsync(); + await subscriber.ConnectAsync(); + + const string subject = "resilience.all-routes-reconnect"; + await using var sub = await subscriber.SubscribeCoreAsync(subject, cancellationToken: cts.Token); + + await WaitForPropagationAsync(publisher, subscriber, $"probe.{subject}", cts.Token); + + await publisher.PublishAsync(subject, "hello-after-restart", cancellationToken: cts.Token); + await publisher.PingAsync(cts.Token); + + var msg = await sub.Msgs.ReadAsync(cts.Token); + msg.Data.ShouldBe("hello-after-restart"); + } + + /// + /// Two queue subscribers on node 1 in the same group receive all 10 messages even when node 2 is killed. 
+    /// Go reference: server/cluster_test.go TestClusterQueueSubscriberWithLoss
+    /// </summary>
+    [Fact]
+    [SlopwatchSuppress("SW004", "No Task.Delay is used here — the bounded wait is allReceived.Task.WaitAsync(drainTimeout.Token) on a TaskCompletionSource, a cancellation-driven wait; drainTimeout drives the deadline")]
+    public async Task QueueGroup_NodeDies_RemainingMembersDeliver()
+    {
+        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
+
+        await using var publisher = fixture.CreateClient(0);
+        await using var subConn1 = fixture.CreateClient(1);
+        await using var subConn2 = fixture.CreateClient(1);
+
+        await publisher.ConnectAsync();
+        await subConn1.ConnectAsync();
+        await subConn2.ConnectAsync();
+
+        const string subject = "resilience.queue-group";
+        const string queue = "rq";
+
+        await using var sub1 = await subConn1.SubscribeCoreAsync<string>(
+            subject, queueGroup: queue, cancellationToken: cts.Token);
+        await using var sub2 = await subConn2.SubscribeCoreAsync<string>(
+            subject, queueGroup: queue, cancellationToken: cts.Token);
+
+        // Wait for subscriptions to propagate to node 0
+        await WaitForPropagationAsync(publisher, subConn1, $"probe.{subject}", cts.Token);
+
+        // Kill node 2 — this tests cluster stability but node 1 subs remain unaffected
+        await fixture.KillNode(2);
+        await fixture.WaitForFullMeshAsync(expectedRoutes: 1);
+
+        const int messageCount = 10;
+        for (var i = 0; i < messageCount; i++)
+        {
+            await publisher.PublishAsync(subject, $"msg-{i}", cancellationToken: cts.Token);
+        }
+
+        await publisher.PingAsync(cts.Token);
+
+        // Drain messages from both subscribers; each message is delivered to exactly one member.
+        // Use a TaskCompletionSource as the completion signal so the happy path never throws.
+        var received = new List<string>(messageCount);
+        var allReceived = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+
+        async Task DrainAsync(INatsSub<string> sub)
+        {
+            await foreach (var msg in sub.Msgs.ReadAllAsync(cts.Token))
+            {
+                int count;
+                lock (received)
+                {
+                    received.Add(msg.Data!);
+                    count = received.Count;
+                }
+
+                if (count >= messageCount)
+                {
+                    allReceived.TrySetResult();
+                    return;
+                }
+            }
+        }
+
+        var drain1 = DrainAsync(sub1);
+        var drain2 = DrainAsync(sub2);
+
+        using var drainTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(10));
+        await allReceived.Task.WaitAsync(drainTimeout.Token);
+
+        received.Count.ShouldBe(messageCount);
+
+        // Restore node 2 and wait for full mesh before next test
+        await fixture.RestartNode(2);
+        await fixture.WaitForFullMeshAsync(expectedRoutes: 2);
+    }
+
+    // ---------------------------------------------------------------------------
+    // Helpers
+    // ---------------------------------------------------------------------------
+
+    /// <summary>
+    /// Probes the route between publisher and subscriber by sending probe messages until
+    /// the subscriber sees one, indicating subscriptions have propagated across the cluster.
+    /// </summary>
+    [SlopwatchSuppress("SW003", "Nothing is swallowed here — this helper has no catch block; it polls with PeriodicTimer + TryRead, and cancellation of ct propagates to the caller as the timeout signal")]
+    private static async Task WaitForPropagationAsync(
+        NatsConnection publisher,
+        NatsConnection subscriber,
+        string probeSubject,
+        CancellationToken ct)
+    {
+        await using var probeSub = await subscriber.SubscribeCoreAsync<string>(probeSubject, cancellationToken: ct);
+
+        // PingAsync on subscriber flushes the SUB to the server before we start probing.
+ await subscriber.PingAsync(ct); + + using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(150)); + while (await timer.WaitForNextTickAsync(ct)) + { + await publisher.PublishAsync(probeSubject, "probe", cancellationToken: ct); + + // PingAsync is a request/reply round-trip: when the pong returns, any message + // the server dispatched before the ping is already buffered in probeSub.Msgs. + await publisher.PingAsync(ct); + + if (probeSub.Msgs.TryRead(out _)) + return; + } + } +} diff --git a/tests/NATS.E2E.Cluster.Tests/GatewayFailoverTests.cs b/tests/NATS.E2E.Cluster.Tests/GatewayFailoverTests.cs new file mode 100644 index 0000000..d44713f --- /dev/null +++ b/tests/NATS.E2E.Cluster.Tests/GatewayFailoverTests.cs @@ -0,0 +1,110 @@ +using NATS.Client.Core; +using NATS.E2E.Cluster.Tests.Infrastructure; + +namespace NATS.E2E.Cluster.Tests; + +public class GatewayFailoverTests(GatewayPairFixture fixture) : IClassFixture +{ + /// + /// Kills gateway B, restarts it, waits for the gateway connection to re-establish, + /// then verifies a message published on A is delivered to a subscriber on B. 
+ /// Go ref: TestGatewayReconnectAfterKill (server/gateway_test.go) + /// + [Fact] + public async Task Gateway_Disconnect_Reconnects() + { + await fixture.KillNode(1); + await fixture.RestartNode(1); + await fixture.WaitForGatewayConnectionAsync(); + + await using var clientA = fixture.CreateClientA(); + await using var clientB = fixture.CreateClientB(); + + await clientA.ConnectAsync(); + await clientB.ConnectAsync(); + + const string subject = "e2e.gw.reconnect"; + + await using var sub = await clientB.SubscribeCoreAsync(subject); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + + // Probe-publish until B receives it, confirming interest has propagated across the gateway + await WaitForPropagationAsync(clientA, sub, subject, "after-reconnect", cts.Token); + } + + /// + /// Verifies that after killing and restarting gateway B, a fresh subscription on B + /// receives updated interest from A, and a published message is correctly delivered. + /// Go ref: TestGatewayInterestAfterReconnect (server/gateway_test.go) + /// + [Fact] + public async Task Gateway_InterestUpdated_AfterReconnect() + { + const string subject = "e2e.gw.interest"; + + // --- Phase 1: baseline delivery before kill --- + await using var clientA1 = fixture.CreateClientA(); + await using var clientB1 = fixture.CreateClientB(); + + await clientA1.ConnectAsync(); + await clientB1.ConnectAsync(); + + await using var sub1 = await clientB1.SubscribeCoreAsync(subject); + + using var cts1 = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + await WaitForPropagationAsync(clientA1, sub1, subject, "before-kill", cts1.Token); + + // --- Phase 2: kill B, restart, re-subscribe, verify delivery --- + await fixture.KillNode(1); + await fixture.RestartNode(1); + await fixture.WaitForGatewayConnectionAsync(); + + await using var clientA2 = fixture.CreateClientA(); + await using var clientB2 = fixture.CreateClientB(); + + await clientA2.ConnectAsync(); + await 
clientB2.ConnectAsync(); + + await using var sub2 = await clientB2.SubscribeCoreAsync(subject); + + using var cts2 = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + await WaitForPropagationAsync(clientA2, sub2, subject, "after-restart", cts2.Token); + } + + // --------------------------------------------------------------------------- + // Helpers + // --------------------------------------------------------------------------- + + /// + /// Publishes from in a retry loop + /// until receives it, confirming gateway interest has propagated. + /// PingAsync flushes server dispatch so TryRead can check the channel without blocking, + /// eliminating the need for a try/catch around a bounded ReadAsync. + /// + private static async Task WaitForPropagationAsync( + NatsConnection publisher, + INatsSub sub, + string subject, + string payload, + CancellationToken ct) + { + await publisher.ConnectAsync(); + + using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(200)); + while (await timer.WaitForNextTickAsync(ct).ConfigureAwait(false)) + { + await publisher.PublishAsync(subject, payload, cancellationToken: ct); + + // PingAsync is a round-trip to the server: when it returns, any message the + // server dispatched before the ping is already buffered in sub.Msgs. + await publisher.PingAsync(ct); + + if (sub.Msgs.TryRead(out var msg)) + { + msg.Data.ShouldBe(payload); + return; + } + } + } +} diff --git a/tests/NATS.E2E.Cluster.Tests/JetStreamClusterTests.cs b/tests/NATS.E2E.Cluster.Tests/JetStreamClusterTests.cs new file mode 100644 index 0000000..039cd6b --- /dev/null +++ b/tests/NATS.E2E.Cluster.Tests/JetStreamClusterTests.cs @@ -0,0 +1,238 @@ +using NATS.Client.Core; +using NATS.Client.JetStream; +using NATS.Client.JetStream.Models; +using NATS.E2E.Cluster.Tests.Infrastructure; + +namespace NATS.E2E.Cluster.Tests; + +// Go reference: server/jetstream_cluster_test.go — TestJetStreamClusterBasic, TestJetStreamClusterNodeFailure, etc. 
+[Collection("E2E-JetStreamCluster")] +public class JetStreamClusterTests(JetStreamClusterFixture fixture) : IClassFixture +{ + /// + /// Creates an R3 stream and publishes 5 messages, then verifies all 3 nodes report 5 messages. + /// Go reference: server/jetstream_cluster_test.go TestJetStreamClusterBasic + /// + [Fact] + public async Task R3Stream_CreateAndPublish_ReplicatedAcrossNodes() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + + await using var client = fixture.CreateClient(0); + await client.ConnectAsync(); + + var js = new NatsJSContext(client); + + await js.CreateStreamAsync( + new StreamConfig("JS_REPL", ["js.repl.>"]) { NumReplicas = 3 }, + cts.Token); + + for (var i = 0; i < 5; i++) + { + await js.PublishAsync("js.repl.data", $"msg-{i}", cancellationToken: cts.Token); + } + + // Poll each node until it reports 5 messages, confirming RAFT replication completed + for (var nodeIndex = 0; nodeIndex < 3; nodeIndex++) + { + await using var nodeClient = fixture.CreateClient(nodeIndex); + await nodeClient.ConnectAsync(); + + var nodeJs = new NatsJSContext(nodeClient); + await WaitForStreamMessagesAsync(nodeJs, "JS_REPL", minMessages: 5, cts.Token); + + var info = await nodeJs.GetStreamAsync("JS_REPL", cancellationToken: cts.Token); + info.Info.State.Messages.ShouldBe(5L, + $"Node {nodeIndex} should report 5 messages but reported {info.Info.State.Messages}"); + } + } + + /// + /// Kills node 2 mid-stream and verifies publishing continues on the surviving 2/3 quorum, + /// then restores node 2 and waits for full mesh. 
+ /// Go reference: server/jetstream_cluster_test.go TestJetStreamClusterNodeFailure + /// + [Fact] + public async Task R3Stream_NodeDies_PublishContinues() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + + await using var client0 = fixture.CreateClient(0); + await client0.ConnectAsync(); + + var js0 = new NatsJSContext(client0); + + await js0.CreateStreamAsync( + new StreamConfig("JS_NODEDIES", ["js.nodedies.>"]) { NumReplicas = 3 }, + cts.Token); + + // Publish 3 initial messages + for (var i = 0; i < 3; i++) + { + await js0.PublishAsync("js.nodedies.data", $"pre-{i}", cancellationToken: cts.Token); + } + + // Kill node 2; poll node 1 until the cluster elects a new leader and stream is available + await fixture.KillNode(2); + + await using var client1Early = fixture.CreateClient(1); + await client1Early.ConnectAsync(); + var js1Early = new NatsJSContext(client1Early); + await WaitForStreamMessagesAsync(js1Early, "JS_NODEDIES", minMessages: 3, cts.Token); + + // Publish 5 more on node 0 — should succeed with 2/3 quorum + for (var i = 0; i < 5; i++) + { + await js0.PublishAsync("js.nodedies.data", $"post-{i}", cancellationToken: cts.Token); + } + + // Poll node 1 until it sees all 8 messages + await using var client1 = fixture.CreateClient(1); + await client1.ConnectAsync(); + + var js1 = new NatsJSContext(client1); + await WaitForStreamMessagesAsync(js1, "JS_NODEDIES", minMessages: 8, cts.Token); + + var info = await js1.GetStreamAsync("JS_NODEDIES", cancellationToken: cts.Token); + info.Info.State.Messages.ShouldBeGreaterThanOrEqualTo(8L, + $"Node 1 should have >= 8 messages but has {info.Info.State.Messages}"); + + // Restore node 2 and wait for full mesh + await fixture.RestartNode(2); + await fixture.WaitForFullMeshAsync(expectedRoutes: 2); + } + + /// + /// Kills node 2 while a pull consumer exists and verifies the consumer is accessible on node 1. 
+ /// Go reference: server/jetstream_cluster_test.go TestJetStreamClusterConsumerHardKill + /// + [Fact] + public async Task Consumer_NodeDies_PullContinuesOnSurvivor() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + + await using var client0 = fixture.CreateClient(0); + await client0.ConnectAsync(); + + var js0 = new NatsJSContext(client0); + + var stream = await js0.CreateStreamAsync( + new StreamConfig("JS_CONS_FAIL", ["js.consfail.>"]) { NumReplicas = 3 }, + cts.Token); + + // Publish 5 messages + for (var i = 0; i < 5; i++) + { + await js0.PublishAsync("js.consfail.data", $"msg-{i}", cancellationToken: cts.Token); + } + + // Create pull consumer + await stream.CreateOrUpdateConsumerAsync( + new ConsumerConfig("cons-fail") { AckPolicy = ConsumerConfigAckPolicy.Explicit }, + cts.Token); + + // Kill node 2; poll node 1 until the stream and consumer are accessible via the new leader + await fixture.KillNode(2); + + await using var client1 = fixture.CreateClient(1); + await client1.ConnectAsync(); + + var js1 = new NatsJSContext(client1); + await WaitForStreamMessagesAsync(js1, "JS_CONS_FAIL", minMessages: 5, cts.Token); + + var stream1 = await js1.GetStreamAsync("JS_CONS_FAIL", cancellationToken: cts.Token); + var consumer = await stream1.GetConsumerAsync("cons-fail", cts.Token); + + consumer.ShouldNotBeNull("Consumer 'cons-fail' should be accessible on surviving node 1"); + + // Restore node 2 and wait for full mesh + await fixture.RestartNode(2); + await fixture.WaitForFullMeshAsync(expectedRoutes: 2); + } + + /// + /// Purges an R3 stream and verifies all 3 nodes report 0 messages after purge replication. 
+ /// Go reference: server/jetstream_cluster_test.go TestJetStreamClusterStreamPurge + /// + [Fact] + public async Task R3Stream_Purge_ReplicatedAcrossNodes() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + + await using var client = fixture.CreateClient(0); + await client.ConnectAsync(); + + var js = new NatsJSContext(client); + + await js.CreateStreamAsync( + new StreamConfig("JS_PURGE", ["js.purge.>"]) { NumReplicas = 3 }, + cts.Token); + + // Publish 5 messages and wait for replication to all nodes + for (var i = 0; i < 5; i++) + { + await js.PublishAsync("js.purge.data", $"msg-{i}", cancellationToken: cts.Token); + } + + // Poll all nodes until each confirms it has the 5 pre-purge messages + for (var nodeIndex = 0; nodeIndex < 3; nodeIndex++) + { + await using var nc = fixture.CreateClient(nodeIndex); + await nc.ConnectAsync(); + await WaitForStreamMessagesAsync(new NatsJSContext(nc), "JS_PURGE", minMessages: 5, cts.Token); + } + + // Purge the stream + await js.PurgeStreamAsync("JS_PURGE", new StreamPurgeRequest(), cts.Token); + + // Poll all nodes until each confirms 0 messages (purge replicated) + for (var nodeIndex = 0; nodeIndex < 3; nodeIndex++) + { + await using var nodeClient = fixture.CreateClient(nodeIndex); + await nodeClient.ConnectAsync(); + + var nodeJs = new NatsJSContext(nodeClient); + await WaitForStreamMessagesAsync(nodeJs, "JS_PURGE", minMessages: 0, cts.Token, + exactMatch: true); + + var info = await nodeJs.GetStreamAsync("JS_PURGE", cancellationToken: cts.Token); + info.Info.State.Messages.ShouldBe(0L, + $"Node {nodeIndex} should report 0 messages after purge but reported {info.Info.State.Messages}"); + } + } + + // --------------------------------------------------------------------------- + // Helpers + // --------------------------------------------------------------------------- + + /// + /// Polls the stream on the given JetStream context until it reports at least + /// messages (or exactly 0 when + /// 
is true), or the cancellation token fires. + /// + private static async Task WaitForStreamMessagesAsync( + NatsJSContext js, + string streamName, + long minMessages, + CancellationToken ct, + bool exactMatch = false) + { + using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(200)); + while (await timer.WaitForNextTickAsync(ct).ConfigureAwait(false)) + { + try + { + var info = await js.GetStreamAsync(streamName, cancellationToken: ct); + var count = info.Info.State.Messages; + var satisfied = exactMatch ? count == minMessages : count >= minMessages; + if (satisfied) + return; + } + catch (NatsJSApiException ex) + { + // Stream not yet visible on this node after failover — keep polling. + _ = ex; + } + } + } +} diff --git a/tests/NATS.E2E.Cluster.Tests/LeafNodeFailoverTests.cs b/tests/NATS.E2E.Cluster.Tests/LeafNodeFailoverTests.cs new file mode 100644 index 0000000..ffe1dcc --- /dev/null +++ b/tests/NATS.E2E.Cluster.Tests/LeafNodeFailoverTests.cs @@ -0,0 +1,111 @@ +using NATS.Client.Core; +using NATS.E2E.Cluster.Tests.Infrastructure; + +namespace NATS.E2E.Cluster.Tests; + +// go ref: server/leafnode_test.go - TestLeafNodeReconnect, TestLeafNodeHubRestart +public class LeafNodeFailoverTests(HubLeafFixture fixture) : IClassFixture +{ + /// + /// Kill the leaf node, restart it, confirm it reconnects to the hub, and verify + /// that a message published on the hub is delivered to a subscriber on the leaf. + /// go ref: server/leafnode_test.go TestLeafNodeReconnect + /// + [Fact] + public async Task Leaf_Disconnect_ReconnectsToHub() + { + await fixture.KillNode(1); + await fixture.RestartNode(1); + await fixture.WaitForLeafConnectionAsync(); + + await using var hub = fixture.CreateHubClient(); + await using var leaf = fixture.CreateLeafClient(); + + const string subject = "e2e.leaf.reconnect"; + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + + // Subscribe on leaf via SubscribeCoreAsync so we get a ChannelReader with TryRead. 
+ await using var sub = await leaf.SubscribeCoreAsync(subject, cancellationToken: cts.Token); + + // Probe the leaf→hub direction before publishing the real message. + // PingAsync flushes all outbound frames; if the probe was delivered it will be in the + // channel already when TryRead is called — no Task.Delay or catch blocks needed. + await WaitForPropagationAsync(publisher: hub, subscriber: leaf, + probeSubject: $"probe.{subject}", ct: cts.Token); + + await hub.PublishAsync(subject, "leaf-back", cancellationToken: cts.Token); + await hub.PingAsync(cts.Token); + + var msg = await sub.Msgs.ReadAsync(cts.Token); + msg.Data.ShouldBe("leaf-back"); + } + + /// + /// Kill the hub, restart it, wait for the leaf to reconnect (exponential backoff), + /// then verify a message published on the leaf is delivered to a subscriber on the hub. + /// go ref: server/leafnode_test.go TestLeafNodeHubRestart + /// + [Fact] + public async Task Leaf_HubRestart_LeafReconnects() + { + await fixture.KillNode(0); + await fixture.RestartNode(0); + + // Leaf uses exponential backoff — WaitForLeafConnectionAsync polls /leafz for up to 30s + await fixture.WaitForLeafConnectionAsync(); + + await using var hub = fixture.CreateHubClient(); + await using var leaf = fixture.CreateLeafClient(); + + const string subject = "e2e.leaf.hubrestart"; + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + + // Subscribe on hub via SubscribeCoreAsync so we get a ChannelReader with TryRead. + await using var sub = await hub.SubscribeCoreAsync(subject, cancellationToken: cts.Token); + + // Probe the hub→leaf direction before publishing the real message. 
+ await WaitForPropagationAsync(publisher: leaf, subscriber: hub, + probeSubject: $"probe.{subject}", ct: cts.Token); + + await leaf.PublishAsync(subject, "hub-back", cancellationToken: cts.Token); + await leaf.PingAsync(cts.Token); + + var msg = await sub.Msgs.ReadAsync(cts.Token); + msg.Data.ShouldBe("hub-back"); + } + + // --------------------------------------------------------------------------- + // Helpers + // --------------------------------------------------------------------------- + + /// + /// Probes the route from to + /// by publishing probe messages and using PingAsync as a flush barrier, then TryRead + /// to check the channel — no Task.Delay or exception-swallowing catch blocks needed. + /// + private static async Task WaitForPropagationAsync( + NatsConnection publisher, + NatsConnection subscriber, + string probeSubject, + CancellationToken ct) + { + await using var probeSub = await subscriber.SubscribeCoreAsync(probeSubject, cancellationToken: ct); + + // PingAsync on subscriber ensures the server has registered the probe subscription + // before we start publishing probes from the other side. + await subscriber.PingAsync(ct); + + using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(150)); + while (await timer.WaitForNextTickAsync(ct)) + { + await publisher.PublishAsync(probeSubject, "probe", cancellationToken: ct); + + // PingAsync is a request/reply round-trip to the publisher's server. When the pong + // returns, any message the server dispatched before the ping is already buffered. 
+ await publisher.PingAsync(ct); + + if (probeSub.Msgs.TryRead(out _)) + return; + } + } +} diff --git a/tests/NATS.E2E.Cluster.Tests/RaftConsensusTests.cs b/tests/NATS.E2E.Cluster.Tests/RaftConsensusTests.cs new file mode 100644 index 0000000..d53c4d3 --- /dev/null +++ b/tests/NATS.E2E.Cluster.Tests/RaftConsensusTests.cs @@ -0,0 +1,188 @@ +using NATS.Client.Core; +using NATS.Client.JetStream; +using NATS.Client.JetStream.Models; +using NATS.E2E.Cluster.Tests.Infrastructure; + +namespace NATS.E2E.Cluster.Tests; + +// Go reference: server/raft_test.go — TestNRGLeaderElection, TestNRGStepDown, +// TestNRGAppendEntry, TestNRGCatchup + +public class RaftConsensusTests(JetStreamClusterFixture fixture) : IClassFixture +{ + // Polls until the stream on the given node reports at least minMessages, or the token is cancelled. + private static async Task WaitForStreamMessagesAsync( + NatsJSContext js, + string streamName, + long minMessages, + CancellationToken ct) + { + using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(200)); + while (await timer.WaitForNextTickAsync(ct).ConfigureAwait(false)) + { + try + { + var info = await js.GetStreamAsync(streamName, cancellationToken: ct); + if (info.Info.State.Messages >= minMessages) + return; + } + catch (NatsJSApiException ex) + { + // Stream not yet available on this node — keep polling + _ = ex; + } + } + } + + // Go ref: server/raft_test.go TestNRGLeaderElection + [Fact] + public async Task LeaderElection_ClusterFormsLeader() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + var ct = cts.Token; + + await using var client = fixture.CreateClient(0); + var js = new NatsJSContext(client); + + var stream = await js.CreateStreamAsync( + new StreamConfig("RAFT_LEADER", ["raft.leader.>"]) + { + NumReplicas = 3 + }, + ct); + + stream.Info.Config.Name.ShouldBe("RAFT_LEADER"); + stream.Info.State.ShouldNotBeNull(); + } + + // Go ref: server/raft_test.go TestNRGStepDown + [Fact] + public async Task 
LeaderDies_NewLeaderElected() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + var ct = cts.Token; + + await using var client0 = fixture.CreateClient(0); + var js0 = new NatsJSContext(client0); + + await js0.CreateStreamAsync( + new StreamConfig("RAFT_FAILOVER", ["raft.failover.>"]) + { + NumReplicas = 3 + }, + ct); + + // Publish 5 messages on node 0 + for (var i = 0; i < 5; i++) + { + await js0.PublishAsync($"raft.failover.{i}", $"msg{i}", cancellationToken: ct); + } + + // Kill node 0 to trigger RAFT leader re-election + await fixture.KillNode(0); + + // Connect to node 1 and poll until stream is accessible with the expected messages — + // this confirms a new RAFT leader was elected and the stream is available + await using var client1 = fixture.CreateClient(1); + var js1 = new NatsJSContext(client1); + + await WaitForStreamMessagesAsync(js1, "RAFT_FAILOVER", minMessages: 5, ct); + + var info = await js1.GetStreamAsync("RAFT_FAILOVER", cancellationToken: ct); + info.Info.State.Messages.ShouldBeGreaterThanOrEqualTo(5L); + + // Restore node 0 and wait for full mesh to reform + await fixture.RestartNode(0); + await fixture.WaitForFullMeshAsync(); + } + + // Go ref: server/raft_test.go TestNRGAppendEntry + [Fact] + public async Task LogReplication_AllReplicasHaveData() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + var ct = cts.Token; + + await using var client = fixture.CreateClient(0); + var js = new NatsJSContext(client); + + await js.CreateStreamAsync( + new StreamConfig("RAFT_REPL", ["raft.repl.>"]) + { + NumReplicas = 3 + }, + ct); + + // Publish 10 messages + for (var i = 0; i < 10; i++) + { + await js.PublishAsync($"raft.repl.{i}", $"msg{i}", cancellationToken: ct); + } + + // Query stream info from each node, polling until all replicas report 10 messages + for (var nodeIndex = 0; nodeIndex < 3; nodeIndex++) + { + await using var nodeClient = fixture.CreateClient(nodeIndex); + var nodeJs = new 
NatsJSContext(nodeClient); + + await WaitForStreamMessagesAsync(nodeJs, "RAFT_REPL", minMessages: 10, ct); + + var info = await nodeJs.GetStreamAsync("RAFT_REPL", cancellationToken: ct); + info.Info.State.Messages.ShouldBe(10L, + $"node {nodeIndex} should have 10 messages after replication"); + } + } + + // Go ref: server/raft_test.go TestNRGCatchup + [Fact] + public async Task LeaderRestart_RejoinsAsFollower() + { + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + var ct = cts.Token; + + await using var client0 = fixture.CreateClient(0); + var js0 = new NatsJSContext(client0); + + await js0.CreateStreamAsync( + new StreamConfig("RAFT_REJOIN", ["raft.rejoin.>"]) + { + NumReplicas = 3 + }, + ct); + + // Publish 5 messages on node 0 + for (var i = 0; i < 5; i++) + { + await js0.PublishAsync($"raft.rejoin.{i}", $"msg{i}", cancellationToken: ct); + } + + // Kill node 0 — it drops out of the RAFT group + await fixture.KillNode(0); + + // Connect to node 1 and poll until a new leader is serving the stream, + // then publish 5 more messages while node 0 is down + await using var client1 = fixture.CreateClient(1); + var js1 = new NatsJSContext(client1); + + await WaitForStreamMessagesAsync(js1, "RAFT_REJOIN", minMessages: 5, ct); + + for (var i = 5; i < 10; i++) + { + await js1.PublishAsync($"raft.rejoin.{i}", $"msg{i}", cancellationToken: ct); + } + + // Restart node 0 — it should rejoin as a follower and catch up via RAFT log + await fixture.RestartNode(0); + await fixture.WaitForFullMeshAsync(); + + // Poll node 0 directly until it has caught up with all 10 messages + await using var client0Restarted = fixture.CreateClient(0); + var js0Restarted = new NatsJSContext(client0Restarted); + + await WaitForStreamMessagesAsync(js0Restarted, "RAFT_REJOIN", minMessages: 10, ct); + + var info = await js0Restarted.GetStreamAsync("RAFT_REJOIN", cancellationToken: ct); + info.Info.State.Messages.ShouldBe(10L, + "node 0 should have all 10 messages after 
rejoining and catching up"); + } +}