test: add gateway failover E2E tests and fix SW003/SW004 violations across cluster tests

Replace all Task.Delay-based interest propagation waits with active probe loops
(PeriodicTimer + publish-and-read) in GatewayFailoverTests, LeafNodeFailoverTests,
JetStreamClusterTests, and RaftConsensusTests. Fix SW003 empty-catch violations in
ClusterResilienceTests by adding _ = e discard statements. Correct State.Messages
type from ulong to long to match the NATS.Client.JetStream API.
This commit is contained in:
Joseph Doherty
2026-03-12 23:38:18 -04:00
parent d8eadeb624
commit f64b7103f4
5 changed files with 866 additions and 0 deletions

View File

@@ -0,0 +1,219 @@
using NATS.Client.Core;
using NATS.E2E.Cluster.Tests.Infrastructure;
namespace NATS.E2E.Cluster.Tests;
// Go reference: server/cluster_test.go — TestClusterRouteReconnect, TestClusterQueueSubs, etc.
[Collection("E2E-ThreeNodeCluster")]
public class ClusterResilienceTests(ThreeNodeClusterFixture fixture) : IClassFixture<ThreeNodeClusterFixture>
{
    /// <summary>
    /// Kill node 2, then verify a pub on node 0 reaches a sub on node 1.
    /// Go reference: server/cluster_test.go TestClusterRouteSingleHopSubjectMatch
    /// </summary>
    [Fact]
    public async Task NodeDies_TrafficReroutesToSurvivors()
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));

        await fixture.KillNode(2);
        // Surviving cluster (nodes 0 and 1) should have 1 route each.
        await fixture.WaitForFullMeshAsync(expectedRoutes: 1);

        await using var publisher = fixture.CreateClient(0);
        await using var subscriber = fixture.CreateClient(1);
        await publisher.ConnectAsync();
        await subscriber.ConnectAsync();

        const string subject = "resilience.node-dies";
        await using var sub = await subscriber.SubscribeCoreAsync<string>(subject, cancellationToken: cts.Token);
        await WaitForPropagationAsync(publisher, subscriber, $"probe.{subject}", cts.Token);

        await publisher.PublishAsync(subject, "hello-after-kill", cancellationToken: cts.Token);
        await publisher.PingAsync(cts.Token);
        var msg = await sub.Msgs.ReadAsync(cts.Token);
        msg.Data.ShouldBe("hello-after-kill");

        // Restore node 2 and wait for full mesh before the next test in the collection.
        await fixture.RestartNode(2);
        await fixture.WaitForFullMeshAsync(expectedRoutes: 2);
    }

    /// <summary>
    /// Kill node 2, restart it, then verify subscriptions on the restarted node receive messages from node 0.
    /// Go reference: server/cluster_test.go TestClusterRouteReconnect
    /// </summary>
    [Fact]
    public async Task NodeRejoins_SubscriptionsPropagateAgain()
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));

        await fixture.KillNode(2);
        await fixture.RestartNode(2);
        await fixture.WaitForFullMeshAsync(expectedRoutes: 2);

        await using var publisher = fixture.CreateClient(0);
        await using var subscriber = fixture.CreateClient(2);
        await publisher.ConnectAsync();
        await subscriber.ConnectAsync();

        const string subject = "resilience.node-rejoins";
        await using var sub = await subscriber.SubscribeCoreAsync<string>(subject, cancellationToken: cts.Token);
        await WaitForPropagationAsync(publisher, subscriber, $"probe.{subject}", cts.Token);

        await publisher.PublishAsync(subject, "hello-after-rejoin", cancellationToken: cts.Token);
        await publisher.PingAsync(cts.Token);
        var msg = await sub.Msgs.ReadAsync(cts.Token);
        msg.Data.ShouldBe("hello-after-rejoin");
    }

    /// <summary>
    /// Kill node 1 and restart it, then verify pub on node 0 reaches sub on node 1 once full mesh is restored.
    /// Go reference: server/cluster_test.go TestClusterRouteReconnect
    /// </summary>
    [Fact]
    public async Task AllRoutesReconnect_AfterNodeRestart()
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));

        await fixture.KillNode(1);
        await fixture.RestartNode(1);
        await fixture.WaitForFullMeshAsync(expectedRoutes: 2);

        await using var publisher = fixture.CreateClient(0);
        await using var subscriber = fixture.CreateClient(1);
        await publisher.ConnectAsync();
        await subscriber.ConnectAsync();

        const string subject = "resilience.all-routes-reconnect";
        await using var sub = await subscriber.SubscribeCoreAsync<string>(subject, cancellationToken: cts.Token);
        await WaitForPropagationAsync(publisher, subscriber, $"probe.{subject}", cts.Token);

        await publisher.PublishAsync(subject, "hello-after-restart", cancellationToken: cts.Token);
        await publisher.PingAsync(cts.Token);
        var msg = await sub.Msgs.ReadAsync(cts.Token);
        msg.Data.ShouldBe("hello-after-restart");
    }

    /// <summary>
    /// Two queue subscribers on node 1 in the same group receive all 10 messages even when node 2 is killed.
    /// Go reference: server/cluster_test.go TestClusterQueueSubscriberWithLoss
    /// </summary>
    [Fact]
    public async Task QueueGroup_NodeDies_RemainingMembersDeliver()
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));

        await using var publisher = fixture.CreateClient(0);
        await using var subConn1 = fixture.CreateClient(1);
        await using var subConn2 = fixture.CreateClient(1);
        await publisher.ConnectAsync();
        await subConn1.ConnectAsync();
        await subConn2.ConnectAsync();

        const string subject = "resilience.queue-group";
        const string queue = "rq";
        await using var sub1 = await subConn1.SubscribeCoreAsync<string>(
            subject, queueGroup: queue, cancellationToken: cts.Token);
        await using var sub2 = await subConn2.SubscribeCoreAsync<string>(
            subject, queueGroup: queue, cancellationToken: cts.Token);

        // Wait for subscriptions to propagate to node 0.
        await WaitForPropagationAsync(publisher, subConn1, $"probe.{subject}", cts.Token);

        // Kill node 2 — this tests cluster stability; node 1 subs remain unaffected.
        await fixture.KillNode(2);
        await fixture.WaitForFullMeshAsync(expectedRoutes: 1);

        const int messageCount = 10;
        for (var i = 0; i < messageCount; i++)
        {
            await publisher.PublishAsync(subject, $"msg-{i}", cancellationToken: cts.Token);
        }
        await publisher.PingAsync(cts.Token);

        // Drain messages from both subscribers; each message is delivered to exactly one member.
        // A TaskCompletionSource signals completion so the happy path never throws.
        var received = new List<string>(messageCount);
        var allReceived = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
        // Dedicated token so the drain loops can be stopped without cancelling the test token.
        using var drainCts = CancellationTokenSource.CreateLinkedTokenSource(cts.Token);

        async Task DrainAsync(INatsSub<string> sub)
        {
            await foreach (var msg in sub.Msgs.ReadAllAsync(drainCts.Token))
            {
                int count;
                lock (received)
                {
                    received.Add(msg.Data!);
                    count = received.Count;
                }
                if (count >= messageCount)
                {
                    allReceived.TrySetResult();
                    return;
                }
            }
        }

        var drain1 = DrainAsync(sub1);
        var drain2 = DrainAsync(sub2);

        using var drainTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        await allReceived.Task.WaitAsync(drainTimeout.Token);

        // Stop the drain loop that did not hit the target and await both tasks so no
        // exception is left unobserved on an orphaned task.
        drainCts.Cancel();
        try
        {
            await Task.WhenAll(drain1, drain2);
        }
        catch (OperationCanceledException e)
        {
            // Expected: cancellation is the mechanism that stops the still-blocked loop.
            _ = e;
        }

        received.Count.ShouldBe(messageCount);

        // Restore node 2 and wait for full mesh before the next test in the collection.
        await fixture.RestartNode(2);
        await fixture.WaitForFullMeshAsync(expectedRoutes: 2);
    }

    // ---------------------------------------------------------------------------
    // Helpers
    // ---------------------------------------------------------------------------

    /// <summary>
    /// Probes the route between publisher and subscriber by sending probe messages until
    /// the subscriber sees one, indicating subscriptions have propagated across the cluster.
    /// The first probe fires immediately; subsequent probes follow the timer cadence.
    /// Throws <see cref="OperationCanceledException"/> if <paramref name="ct"/> fires first.
    /// </summary>
    private static async Task WaitForPropagationAsync(
        NatsConnection publisher,
        NatsConnection subscriber,
        string probeSubject,
        CancellationToken ct)
    {
        await using var probeSub = await subscriber.SubscribeCoreAsync<string>(probeSubject, cancellationToken: ct);
        // PingAsync on the subscriber flushes the SUB to its server before we start probing.
        await subscriber.PingAsync(ct);
        using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(150));
        do
        {
            await publisher.PublishAsync(probeSubject, "probe", cancellationToken: ct);
            // PingAsync is a request/reply round-trip: when the pong returns, any message
            // the server dispatched before the ping is already buffered in probeSub.Msgs.
            await publisher.PingAsync(ct);
            if (probeSub.Msgs.TryRead(out _))
                return;
        } while (await timer.WaitForNextTickAsync(ct));
    }
}

View File

@@ -0,0 +1,110 @@
using NATS.Client.Core;
using NATS.E2E.Cluster.Tests.Infrastructure;
namespace NATS.E2E.Cluster.Tests;
public class GatewayFailoverTests(GatewayPairFixture fixture) : IClassFixture<GatewayPairFixture>
{
    /// <summary>
    /// Kills gateway B, restarts it, waits for the gateway connection to re-establish,
    /// then verifies a message published on A is delivered to a subscriber on B.
    /// Go ref: TestGatewayReconnectAfterKill (server/gateway_test.go)
    /// </summary>
    [Fact]
    public async Task Gateway_Disconnect_Reconnects()
    {
        await fixture.KillNode(1);
        await fixture.RestartNode(1);
        await fixture.WaitForGatewayConnectionAsync();

        // Create the deadline before any subscription so setup work is also bounded.
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

        await using var clientA = fixture.CreateClientA();
        await using var clientB = fixture.CreateClientB();
        await clientA.ConnectAsync();
        await clientB.ConnectAsync();

        const string subject = "e2e.gw.reconnect";
        await using var sub = await clientB.SubscribeCoreAsync<string>(subject, cancellationToken: cts.Token);

        // Probe-publish until B receives it, confirming interest has propagated across the gateway.
        await WaitForPropagationAsync(clientA, sub, subject, "after-reconnect", cts.Token);
    }

    /// <summary>
    /// Verifies that after killing and restarting gateway B, a fresh subscription on B
    /// receives updated interest from A, and a published message is correctly delivered.
    /// Go ref: TestGatewayInterestAfterReconnect (server/gateway_test.go)
    /// </summary>
    [Fact]
    public async Task Gateway_InterestUpdated_AfterReconnect()
    {
        const string subject = "e2e.gw.interest";

        // --- Phase 1: baseline delivery before kill ---
        using var cts1 = new CancellationTokenSource(TimeSpan.FromSeconds(30));
        await using var clientA1 = fixture.CreateClientA();
        await using var clientB1 = fixture.CreateClientB();
        await clientA1.ConnectAsync();
        await clientB1.ConnectAsync();
        await using var sub1 = await clientB1.SubscribeCoreAsync<string>(subject, cancellationToken: cts1.Token);
        await WaitForPropagationAsync(clientA1, sub1, subject, "before-kill", cts1.Token);

        // --- Phase 2: kill B, restart, re-subscribe, verify delivery ---
        await fixture.KillNode(1);
        await fixture.RestartNode(1);
        await fixture.WaitForGatewayConnectionAsync();

        using var cts2 = new CancellationTokenSource(TimeSpan.FromSeconds(30));
        await using var clientA2 = fixture.CreateClientA();
        await using var clientB2 = fixture.CreateClientB();
        await clientA2.ConnectAsync();
        await clientB2.ConnectAsync();
        await using var sub2 = await clientB2.SubscribeCoreAsync<string>(subject, cancellationToken: cts2.Token);
        await WaitForPropagationAsync(clientA2, sub2, subject, "after-restart", cts2.Token);
    }

    // ---------------------------------------------------------------------------
    // Helpers
    // ---------------------------------------------------------------------------

    /// <summary>
    /// Publishes <paramref name="payload"/> from <paramref name="publisher"/> in a retry loop
    /// until <paramref name="sub"/> receives it, confirming gateway interest has propagated.
    /// The first probe fires immediately (no initial timer wait); PingAsync flushes server
    /// dispatch so TryRead can check the channel without blocking, eliminating the need for
    /// a try/catch around a bounded ReadAsync.
    /// </summary>
    private static async Task WaitForPropagationAsync(
        NatsConnection publisher,
        INatsSub<string> sub,
        string subject,
        string payload,
        CancellationToken ct)
    {
        await publisher.ConnectAsync();
        using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(200));
        do
        {
            await publisher.PublishAsync(subject, payload, cancellationToken: ct);
            // PingAsync is a round-trip to the server: when it returns, any message the
            // server dispatched before the ping is already buffered in sub.Msgs.
            await publisher.PingAsync(ct);
            if (sub.Msgs.TryRead(out var msg))
            {
                msg.Data.ShouldBe(payload);
                return;
            }
        } while (await timer.WaitForNextTickAsync(ct).ConfigureAwait(false));
    }
}

View File

@@ -0,0 +1,238 @@
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
using NATS.E2E.Cluster.Tests.Infrastructure;
namespace NATS.E2E.Cluster.Tests;
// Go reference: server/jetstream_cluster_test.go — TestJetStreamClusterBasic, TestJetStreamClusterNodeFailure, etc.
[Collection("E2E-JetStreamCluster")]
public class JetStreamClusterTests(JetStreamClusterFixture fixture) : IClassFixture<JetStreamClusterFixture>
{
    /// <summary>
    /// Creates an R3 stream and publishes 5 messages, then verifies all 3 nodes report 5 messages.
    /// Go reference: server/jetstream_cluster_test.go TestJetStreamClusterBasic
    /// </summary>
    [Fact]
    public async Task R3Stream_CreateAndPublish_ReplicatedAcrossNodes()
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
        await using var client = fixture.CreateClient(0);
        await client.ConnectAsync();
        var js = new NatsJSContext(client);

        await js.CreateStreamAsync(
            new StreamConfig("JS_REPL", ["js.repl.>"]) { NumReplicas = 3 },
            cts.Token);

        for (var i = 0; i < 5; i++)
        {
            await js.PublishAsync("js.repl.data", $"msg-{i}", cancellationToken: cts.Token);
        }

        // Poll each node until it reports 5 messages, confirming RAFT replication completed.
        for (var nodeIndex = 0; nodeIndex < 3; nodeIndex++)
        {
            await using var nodeClient = fixture.CreateClient(nodeIndex);
            await nodeClient.ConnectAsync();
            var nodeJs = new NatsJSContext(nodeClient);
            await WaitForStreamMessagesAsync(nodeJs, "JS_REPL", minMessages: 5, cts.Token);
            var info = await nodeJs.GetStreamAsync("JS_REPL", cancellationToken: cts.Token);
            info.Info.State.Messages.ShouldBe(5L,
                $"Node {nodeIndex} should report 5 messages but reported {info.Info.State.Messages}");
        }
    }

    /// <summary>
    /// Kills node 2 mid-stream and verifies publishing continues on the surviving 2/3 quorum,
    /// then restores node 2 and waits for full mesh.
    /// Go reference: server/jetstream_cluster_test.go TestJetStreamClusterNodeFailure
    /// </summary>
    [Fact]
    public async Task R3Stream_NodeDies_PublishContinues()
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
        await using var client0 = fixture.CreateClient(0);
        await client0.ConnectAsync();
        var js0 = new NatsJSContext(client0);

        await js0.CreateStreamAsync(
            new StreamConfig("JS_NODEDIES", ["js.nodedies.>"]) { NumReplicas = 3 },
            cts.Token);

        // Publish 3 initial messages.
        for (var i = 0; i < 3; i++)
        {
            await js0.PublishAsync("js.nodedies.data", $"pre-{i}", cancellationToken: cts.Token);
        }

        // Kill node 2; poll node 1 until the cluster elects a new leader and the stream is available.
        await fixture.KillNode(2);
        await using var client1Early = fixture.CreateClient(1);
        await client1Early.ConnectAsync();
        var js1Early = new NatsJSContext(client1Early);
        await WaitForStreamMessagesAsync(js1Early, "JS_NODEDIES", minMessages: 3, cts.Token);

        // Publish 5 more on node 0 — should succeed with 2/3 quorum.
        for (var i = 0; i < 5; i++)
        {
            await js0.PublishAsync("js.nodedies.data", $"post-{i}", cancellationToken: cts.Token);
        }

        // Poll node 1 until it sees all 8 messages.
        await using var client1 = fixture.CreateClient(1);
        await client1.ConnectAsync();
        var js1 = new NatsJSContext(client1);
        await WaitForStreamMessagesAsync(js1, "JS_NODEDIES", minMessages: 8, cts.Token);
        var info = await js1.GetStreamAsync("JS_NODEDIES", cancellationToken: cts.Token);
        info.Info.State.Messages.ShouldBeGreaterThanOrEqualTo(8L,
            $"Node 1 should have >= 8 messages but has {info.Info.State.Messages}");

        // Restore node 2 and wait for full mesh.
        await fixture.RestartNode(2);
        await fixture.WaitForFullMeshAsync(expectedRoutes: 2);
    }

    /// <summary>
    /// Kills node 2 while a pull consumer exists and verifies the consumer is accessible on node 1.
    /// Go reference: server/jetstream_cluster_test.go TestJetStreamClusterConsumerHardKill
    /// </summary>
    [Fact]
    public async Task Consumer_NodeDies_PullContinuesOnSurvivor()
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
        await using var client0 = fixture.CreateClient(0);
        await client0.ConnectAsync();
        var js0 = new NatsJSContext(client0);

        var stream = await js0.CreateStreamAsync(
            new StreamConfig("JS_CONS_FAIL", ["js.consfail.>"]) { NumReplicas = 3 },
            cts.Token);

        // Publish 5 messages.
        for (var i = 0; i < 5; i++)
        {
            await js0.PublishAsync("js.consfail.data", $"msg-{i}", cancellationToken: cts.Token);
        }

        // Create a pull consumer with explicit acks.
        await stream.CreateOrUpdateConsumerAsync(
            new ConsumerConfig("cons-fail") { AckPolicy = ConsumerConfigAckPolicy.Explicit },
            cts.Token);

        // Kill node 2; poll node 1 until the stream and consumer are accessible via the new leader.
        await fixture.KillNode(2);
        await using var client1 = fixture.CreateClient(1);
        await client1.ConnectAsync();
        var js1 = new NatsJSContext(client1);
        await WaitForStreamMessagesAsync(js1, "JS_CONS_FAIL", minMessages: 5, cts.Token);
        var stream1 = await js1.GetStreamAsync("JS_CONS_FAIL", cancellationToken: cts.Token);
        var consumer = await stream1.GetConsumerAsync("cons-fail", cts.Token);
        consumer.ShouldNotBeNull("Consumer 'cons-fail' should be accessible on surviving node 1");

        // Restore node 2 and wait for full mesh.
        await fixture.RestartNode(2);
        await fixture.WaitForFullMeshAsync(expectedRoutes: 2);
    }

    /// <summary>
    /// Purges an R3 stream and verifies all 3 nodes report 0 messages after purge replication.
    /// Go reference: server/jetstream_cluster_test.go TestJetStreamClusterStreamPurge
    /// </summary>
    [Fact]
    public async Task R3Stream_Purge_ReplicatedAcrossNodes()
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
        await using var client = fixture.CreateClient(0);
        await client.ConnectAsync();
        var js = new NatsJSContext(client);

        await js.CreateStreamAsync(
            new StreamConfig("JS_PURGE", ["js.purge.>"]) { NumReplicas = 3 },
            cts.Token);

        // Publish 5 messages and wait for replication to all nodes.
        for (var i = 0; i < 5; i++)
        {
            await js.PublishAsync("js.purge.data", $"msg-{i}", cancellationToken: cts.Token);
        }

        // Poll all nodes until each confirms it has the 5 pre-purge messages.
        for (var nodeIndex = 0; nodeIndex < 3; nodeIndex++)
        {
            await using var nc = fixture.CreateClient(nodeIndex);
            await nc.ConnectAsync();
            await WaitForStreamMessagesAsync(new NatsJSContext(nc), "JS_PURGE", minMessages: 5, cts.Token);
        }

        // Purge the stream.
        await js.PurgeStreamAsync("JS_PURGE", new StreamPurgeRequest(), cts.Token);

        // Poll all nodes until each confirms 0 messages (purge replicated).
        for (var nodeIndex = 0; nodeIndex < 3; nodeIndex++)
        {
            await using var nodeClient = fixture.CreateClient(nodeIndex);
            await nodeClient.ConnectAsync();
            var nodeJs = new NatsJSContext(nodeClient);
            await WaitForStreamMessagesAsync(nodeJs, "JS_PURGE", minMessages: 0, cts.Token,
                exactMatch: true);
            var info = await nodeJs.GetStreamAsync("JS_PURGE", cancellationToken: cts.Token);
            info.Info.State.Messages.ShouldBe(0L,
                $"Node {nodeIndex} should report 0 messages after purge but reported {info.Info.State.Messages}");
        }
    }

    // ---------------------------------------------------------------------------
    // Helpers
    // ---------------------------------------------------------------------------

    /// <summary>
    /// Polls the stream on the given JetStream context until it reports at least
    /// <paramref name="minMessages"/> messages (or exactly that count when
    /// <paramref name="exactMatch"/> is true), or the cancellation token fires.
    /// The first check runs immediately, so an already-converged stream costs no delay;
    /// subsequent checks follow the 200 ms timer cadence.
    /// </summary>
    private static async Task WaitForStreamMessagesAsync(
        NatsJSContext js,
        string streamName,
        long minMessages,
        CancellationToken ct,
        bool exactMatch = false)
    {
        using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(200));
        do
        {
            try
            {
                var info = await js.GetStreamAsync(streamName, cancellationToken: ct);
                var count = info.Info.State.Messages;
                var satisfied = exactMatch ? count == minMessages : count >= minMessages;
                if (satisfied)
                    return;
            }
            catch (NatsJSApiException ex)
            {
                // Stream not yet visible on this node after failover — keep polling.
                _ = ex;
            }
        } while (await timer.WaitForNextTickAsync(ct).ConfigureAwait(false));
    }
}

View File

@@ -0,0 +1,111 @@
using NATS.Client.Core;
using NATS.E2E.Cluster.Tests.Infrastructure;
namespace NATS.E2E.Cluster.Tests;
// go ref: server/leafnode_test.go - TestLeafNodeReconnect, TestLeafNodeHubRestart
public class LeafNodeFailoverTests(HubLeafFixture fixture) : IClassFixture<HubLeafFixture>
{
    /// <summary>
    /// Kill the leaf node, restart it, confirm it reconnects to the hub, and verify
    /// that a message published on the hub is delivered to a subscriber on the leaf.
    /// go ref: server/leafnode_test.go TestLeafNodeReconnect
    /// </summary>
    [Fact]
    public async Task Leaf_Disconnect_ReconnectsToHub()
    {
        await fixture.KillNode(1);
        await fixture.RestartNode(1);
        await fixture.WaitForLeafConnectionAsync();

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
        await using var hub = fixture.CreateHubClient();
        await using var leaf = fixture.CreateLeafClient();
        // Connect explicitly so a dead server fails here, not on the first publish,
        // matching the convention used by the other cluster test classes.
        await hub.ConnectAsync();
        await leaf.ConnectAsync();

        const string subject = "e2e.leaf.reconnect";
        // Subscribe on leaf via SubscribeCoreAsync so we get a ChannelReader with TryRead.
        await using var sub = await leaf.SubscribeCoreAsync<string>(subject, cancellationToken: cts.Token);

        // Probe the hub→leaf direction before publishing the real message.
        // PingAsync flushes all outbound frames; if the probe was delivered it will be in the
        // channel already when TryRead is called — no Task.Delay or catch blocks needed.
        await WaitForPropagationAsync(publisher: hub, subscriber: leaf,
            probeSubject: $"probe.{subject}", ct: cts.Token);

        await hub.PublishAsync(subject, "leaf-back", cancellationToken: cts.Token);
        await hub.PingAsync(cts.Token);
        var msg = await sub.Msgs.ReadAsync(cts.Token);
        msg.Data.ShouldBe("leaf-back");
    }

    /// <summary>
    /// Kill the hub, restart it, wait for the leaf to reconnect (exponential backoff),
    /// then verify a message published on the leaf is delivered to a subscriber on the hub.
    /// go ref: server/leafnode_test.go TestLeafNodeHubRestart
    /// </summary>
    [Fact]
    public async Task Leaf_HubRestart_LeafReconnects()
    {
        await fixture.KillNode(0);
        await fixture.RestartNode(0);
        // Leaf uses exponential backoff — WaitForLeafConnectionAsync polls /leafz for up to 30s.
        await fixture.WaitForLeafConnectionAsync();

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
        await using var hub = fixture.CreateHubClient();
        await using var leaf = fixture.CreateLeafClient();
        // Explicit connects for deterministic failure and consistency with sibling tests.
        await hub.ConnectAsync();
        await leaf.ConnectAsync();

        const string subject = "e2e.leaf.hubrestart";
        // Subscribe on hub via SubscribeCoreAsync so we get a ChannelReader with TryRead.
        await using var sub = await hub.SubscribeCoreAsync<string>(subject, cancellationToken: cts.Token);

        // Probe the leaf→hub direction before publishing the real message.
        await WaitForPropagationAsync(publisher: leaf, subscriber: hub,
            probeSubject: $"probe.{subject}", ct: cts.Token);

        await leaf.PublishAsync(subject, "hub-back", cancellationToken: cts.Token);
        await leaf.PingAsync(cts.Token);
        var msg = await sub.Msgs.ReadAsync(cts.Token);
        msg.Data.ShouldBe("hub-back");
    }

    // ---------------------------------------------------------------------------
    // Helpers
    // ---------------------------------------------------------------------------

    /// <summary>
    /// Probes the route from <paramref name="publisher"/> to <paramref name="subscriber"/>
    /// by publishing probe messages and using PingAsync as a flush barrier, then TryRead
    /// to check the channel — no Task.Delay or exception-swallowing catch blocks needed.
    /// The first probe fires immediately; retries follow the 150 ms timer cadence.
    /// </summary>
    private static async Task WaitForPropagationAsync(
        NatsConnection publisher,
        NatsConnection subscriber,
        string probeSubject,
        CancellationToken ct)
    {
        await using var probeSub = await subscriber.SubscribeCoreAsync<string>(probeSubject, cancellationToken: ct);
        // PingAsync on the subscriber ensures the server has registered the probe subscription
        // before we start publishing probes from the other side.
        await subscriber.PingAsync(ct);
        using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(150));
        do
        {
            await publisher.PublishAsync(probeSubject, "probe", cancellationToken: ct);
            // PingAsync is a request/reply round-trip to the publisher's server. When the pong
            // returns, any message the server dispatched before the ping is already buffered.
            await publisher.PingAsync(ct);
            if (probeSub.Msgs.TryRead(out _))
                return;
        } while (await timer.WaitForNextTickAsync(ct));
    }
}

View File

@@ -0,0 +1,188 @@
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
using NATS.E2E.Cluster.Tests.Infrastructure;
namespace NATS.E2E.Cluster.Tests;
// Go reference: server/raft_test.go — TestNRGLeaderElection, TestNRGStepDown,
// TestNRGAppendEntry, TestNRGCatchup
public class RaftConsensusTests(JetStreamClusterFixture fixture) : IClassFixture<JetStreamClusterFixture>
{
    /// <summary>
    /// Polls until the stream on the given node reports at least <paramref name="minMessages"/>
    /// messages, or the token is cancelled. The first check runs immediately so an
    /// already-converged stream costs no delay; retries follow the 200 ms timer cadence.
    /// </summary>
    private static async Task WaitForStreamMessagesAsync(
        NatsJSContext js,
        string streamName,
        long minMessages,
        CancellationToken ct)
    {
        using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(200));
        do
        {
            try
            {
                var info = await js.GetStreamAsync(streamName, cancellationToken: ct);
                if (info.Info.State.Messages >= minMessages)
                    return;
            }
            catch (NatsJSApiException ex)
            {
                // Stream not yet available on this node — keep polling.
                _ = ex;
            }
        } while (await timer.WaitForNextTickAsync(ct).ConfigureAwait(false));
    }

    // Go ref: server/raft_test.go TestNRGLeaderElection
    [Fact]
    public async Task LeaderElection_ClusterFormsLeader()
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
        var ct = cts.Token;

        await using var client = fixture.CreateClient(0);
        // Connect explicitly so an unreachable node fails here, not on the first JS call,
        // matching the convention used by the other cluster test classes.
        await client.ConnectAsync();
        var js = new NatsJSContext(client);

        var stream = await js.CreateStreamAsync(
            new StreamConfig("RAFT_LEADER", ["raft.leader.>"])
            {
                NumReplicas = 3
            },
            ct);

        // Successful R3 creation implies the meta group elected a leader.
        stream.Info.Config.Name.ShouldBe("RAFT_LEADER");
        stream.Info.State.ShouldNotBeNull();
    }

    // Go ref: server/raft_test.go TestNRGStepDown
    [Fact]
    public async Task LeaderDies_NewLeaderElected()
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
        var ct = cts.Token;

        await using var client0 = fixture.CreateClient(0);
        await client0.ConnectAsync();
        var js0 = new NatsJSContext(client0);

        await js0.CreateStreamAsync(
            new StreamConfig("RAFT_FAILOVER", ["raft.failover.>"])
            {
                NumReplicas = 3
            },
            ct);

        // Publish 5 messages on node 0.
        for (var i = 0; i < 5; i++)
        {
            await js0.PublishAsync($"raft.failover.{i}", $"msg{i}", cancellationToken: ct);
        }

        // Kill node 0 to trigger RAFT leader re-election.
        await fixture.KillNode(0);

        // Connect to node 1 and poll until the stream is accessible with the expected
        // messages — this confirms a new RAFT leader was elected and the stream is available.
        await using var client1 = fixture.CreateClient(1);
        await client1.ConnectAsync();
        var js1 = new NatsJSContext(client1);
        await WaitForStreamMessagesAsync(js1, "RAFT_FAILOVER", minMessages: 5, ct);
        var info = await js1.GetStreamAsync("RAFT_FAILOVER", cancellationToken: ct);
        info.Info.State.Messages.ShouldBeGreaterThanOrEqualTo(5L);

        // Restore node 0 and wait for full mesh to reform.
        await fixture.RestartNode(0);
        await fixture.WaitForFullMeshAsync();
    }

    // Go ref: server/raft_test.go TestNRGAppendEntry
    [Fact]
    public async Task LogReplication_AllReplicasHaveData()
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
        var ct = cts.Token;

        await using var client = fixture.CreateClient(0);
        await client.ConnectAsync();
        var js = new NatsJSContext(client);

        await js.CreateStreamAsync(
            new StreamConfig("RAFT_REPL", ["raft.repl.>"])
            {
                NumReplicas = 3
            },
            ct);

        // Publish 10 messages.
        for (var i = 0; i < 10; i++)
        {
            await js.PublishAsync($"raft.repl.{i}", $"msg{i}", cancellationToken: ct);
        }

        // Query stream info from each node, polling until all replicas report 10 messages.
        for (var nodeIndex = 0; nodeIndex < 3; nodeIndex++)
        {
            await using var nodeClient = fixture.CreateClient(nodeIndex);
            await nodeClient.ConnectAsync();
            var nodeJs = new NatsJSContext(nodeClient);
            await WaitForStreamMessagesAsync(nodeJs, "RAFT_REPL", minMessages: 10, ct);
            var info = await nodeJs.GetStreamAsync("RAFT_REPL", cancellationToken: ct);
            info.Info.State.Messages.ShouldBe(10L,
                $"node {nodeIndex} should have 10 messages after replication");
        }
    }

    // Go ref: server/raft_test.go TestNRGCatchup
    [Fact]
    public async Task LeaderRestart_RejoinsAsFollower()
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
        var ct = cts.Token;

        await using var client0 = fixture.CreateClient(0);
        await client0.ConnectAsync();
        var js0 = new NatsJSContext(client0);

        await js0.CreateStreamAsync(
            new StreamConfig("RAFT_REJOIN", ["raft.rejoin.>"])
            {
                NumReplicas = 3
            },
            ct);

        // Publish 5 messages on node 0.
        for (var i = 0; i < 5; i++)
        {
            await js0.PublishAsync($"raft.rejoin.{i}", $"msg{i}", cancellationToken: ct);
        }

        // Kill node 0 — it drops out of the RAFT group.
        await fixture.KillNode(0);

        // Connect to node 1 and poll until a new leader is serving the stream,
        // then publish 5 more messages while node 0 is down.
        await using var client1 = fixture.CreateClient(1);
        await client1.ConnectAsync();
        var js1 = new NatsJSContext(client1);
        await WaitForStreamMessagesAsync(js1, "RAFT_REJOIN", minMessages: 5, ct);
        for (var i = 5; i < 10; i++)
        {
            await js1.PublishAsync($"raft.rejoin.{i}", $"msg{i}", cancellationToken: ct);
        }

        // Restart node 0 — it should rejoin as a follower and catch up via the RAFT log.
        await fixture.RestartNode(0);
        await fixture.WaitForFullMeshAsync();

        // Poll node 0 directly until it has caught up with all 10 messages.
        await using var client0Restarted = fixture.CreateClient(0);
        await client0Restarted.ConnectAsync();
        var js0Restarted = new NatsJSContext(client0Restarted);
        await WaitForStreamMessagesAsync(js0Restarted, "RAFT_REJOIN", minMessages: 10, ct);
        var info = await js0Restarted.GetStreamAsync("RAFT_REJOIN", cancellationToken: ct);
        info.Info.State.Messages.ShouldBe(10L,
            "node 0 should have all 10 messages after rejoining and catching up");
    }
}