diff --git a/docs/plans/2026-03-12-e2e-cluster-tests-plan.md b/docs/plans/2026-03-12-e2e-cluster-tests-plan.md
new file mode 100644
index 0000000..34b063b
--- /dev/null
+++ b/docs/plans/2026-03-12-e2e-cluster-tests-plan.md
@@ -0,0 +1,1048 @@
+# E2E Cluster & RAFT Tests Implementation Plan
+
+> **For Claude:** REQUIRED SUB-SKILL: Use superpowers-extended-cc:executing-plans to implement this plan task-by-task.
+
+**Goal:** Create `NATS.E2E.Cluster.Tests` with 16 E2E tests covering cluster resilience, RAFT consensus, JetStream cluster replication, gateway failover, and leaf node failover.
+
+**Architecture:** Single xUnit 3 project, 4 fixtures (ThreeNodeCluster, JetStreamCluster, GatewayPair, HubLeaf), all running real multi-process servers via `NatsServerProcess`.
+
+**Tech Stack:** .NET 10, xUnit 3, Shouldly, NATS.Client.Core, NATS.Client.JetStream, NATS.NKeys
+
+---
+
+### Task 0: Create project scaffold and add to solution
+
+**Files:**
+- Create: `tests/NATS.E2E.Cluster.Tests/NATS.E2E.Cluster.Tests.csproj`
+- Modify: `NatsDotNet.slnx`
+
+**Step 1: Create the csproj file**
+
+Mirror the layout of the existing `tests/NATS.E2E.Tests/NATS.E2E.Tests.csproj`. A minimal sketch (take exact package names, versions, and any extra properties from `NATS.E2E.Tests` and central package management):
+
+```xml
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <TargetFramework>net10.0</TargetFramework>
+    <IsPackable>false</IsPackable>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <PackageReference Include="xunit.v3" />
+    <PackageReference Include="xunit.runner.visualstudio" />
+    <PackageReference Include="Microsoft.NET.Test.Sdk" />
+    <PackageReference Include="Shouldly" />
+    <PackageReference Include="NATS.Client.Core" />
+    <PackageReference Include="NATS.Client.JetStream" />
+    <PackageReference Include="NATS.NKeys" />
+  </ItemGroup>
+
+</Project>
+```
+
+**Step 2: Add to solution file**
+
+Add a `<Project Path="tests/NATS.E2E.Cluster.Tests/NATS.E2E.Cluster.Tests.csproj" />` entry to the `/tests/` folder in `NatsDotNet.slnx`.
+
+**Step 3: Verify build**
+
+Run: `dotnet build tests/NATS.E2E.Cluster.Tests`
+Expected: Build succeeded, 0 errors
+
+**Step 4: Commit**
+
+```bash
+git add tests/NATS.E2E.Cluster.Tests/NATS.E2E.Cluster.Tests.csproj NatsDotNet.slnx
+git commit -m "feat: scaffold NATS.E2E.Cluster.Tests project"
+```
+
+---
+
+### Task 1: Copy NatsServerProcess and create base infrastructure
+
+**Files:**
+- Create: `tests/NATS.E2E.Cluster.Tests/Infrastructure/NatsServerProcess.cs`
+
+**Step 1: Copy NatsServerProcess from NATS.E2E.Tests**
+
+Copy `tests/NATS.E2E.Tests/Infrastructure/NatsServerProcess.cs` to `tests/NATS.E2E.Cluster.Tests/Infrastructure/NatsServerProcess.cs`.
Change the namespace from `NATS.E2E.Tests.Infrastructure` to `NATS.E2E.Cluster.Tests.Infrastructure`.
+
+**Step 2: Verify build**
+
+Run: `dotnet build tests/NATS.E2E.Cluster.Tests`
+Expected: Build succeeded
+
+**Step 3: Commit**
+
+```bash
+git add tests/NATS.E2E.Cluster.Tests/Infrastructure/NatsServerProcess.cs
+git commit -m "feat: add NatsServerProcess to cluster E2E infrastructure"
+```
+
+---
+
+### Task 2: Create ThreeNodeClusterFixture with KillNode/RestartNode
+
+**Files:**
+- Create: `tests/NATS.E2E.Cluster.Tests/Infrastructure/ThreeNodeClusterFixture.cs`
+
+**Step 1: Write the fixture**
+
+Model after the existing `ClusterFixture` from `NATS.E2E.Tests` but with these additions:
+
+- Store a `NatsServerProcess[]` array (3 servers) plus parallel `string[]` for config content and `int[]` for cluster ports
+- Pre-allocate all ports (client, cluster, monitoring) in the constructor or `InitializeAsync`
+- Full-mesh routes config with `pool_size: 1`, monitoring enabled, `-DV` flags
+- `WaitForFullMeshAsync(int expectedRoutes = 2)` — polls `/routez` on all alive nodes until each reports `NumRoutes >= expectedRoutes`; parameterized so after a kill we can wait for `NumRoutes >= 1` on the survivors
+- `KillNode(int index)` — calls `DisposeAsync()` on that server process and nulls it out
+- `RestartNode(int index)` — creates a new `NatsServerProcess` from the saved config content and ports, calls `StartAsync()`
+- `CreateClient(int index)` — returns a `NatsConnection` to the specified node
+- `GetServerOutput(int index)` — returns output from the specified node
+- Private JSON model `Routez` with a `num_routes` property
+
+The key difference from the existing `ClusterFixture`: this fixture must support restarting servers on the same ports, so the port has to be stable across restarts. The existing `NatsServerProcess` allocates its own port in its constructor, which makes restart-on-the-same-port impossible. The simplest approach: add a `NatsServerProcess` constructor overload that accepts a pre-assigned port, and copy it into our local version.
+
+Add to `NatsServerProcess`:
+```csharp
+/// <summary>
+/// Constructor with pre-assigned port for kill/restart scenarios.
+/// </summary>
+public NatsServerProcess(int port, string[]? extraArgs = null, string? configContent = null, bool enableMonitoring = false, int? monitorPort = null)
+{
+    Port = port;
+    _extraArgs = extraArgs;
+    _configContent = configContent;
+    _enableMonitoring = enableMonitoring;
+    if (monitorPort.HasValue)
+        MonitorPort = monitorPort.Value;
+    else if (_enableMonitoring)
+        MonitorPort = AllocateFreePort();
+}
+```
+
+Fixture pattern:
+```csharp
+namespace NATS.E2E.Cluster.Tests.Infrastructure;
+
+public sealed class ThreeNodeClusterFixture : IAsyncLifetime
+{
+    private NatsServerProcess?[] _servers = new NatsServerProcess?[3];
+    private readonly int[] _clientPorts = new int[3];
+    private readonly int[] _clusterPorts = new int[3];
+    private readonly int[] _monitorPorts = new int[3];
+    private readonly string[] _configs = new string[3];
+
+    public int GetPort(int index) => _clientPorts[index];
+    public int GetMonitorPort(int index) => _monitorPorts[index];
+
+    public async Task InitializeAsync() { /* allocate ports, build configs, start 3, wait mesh */ }
+    public async Task KillNode(int index) { /* dispose server, null it */ }
+    public async Task RestartNode(int index) { /* recreate server on same ports, start */ }
+    public async Task WaitForFullMeshAsync(int expectedRoutes = 2) { /* poll /routez */ }
+    public NatsConnection CreateClient(int index) { /* ... */ }
+    public string GetServerOutput(int index) { /* ...
*/ } + public async Task DisposeAsync() { /* dispose all */ } +} +``` + +**Step 2: Write a smoke test to verify the fixture** + +Create a temporary test (can be removed later or kept): +```csharp +// In a temp file or ClusterResilienceTests.cs +[Fact] +public async Task Fixture_Smoke_ThreeNodeClusterStarts() +{ + // Just verifies the fixture starts without errors — implicit via IAsyncLifetime + await Task.CompletedTask; // fixture.InitializeAsync already ran +} +``` + +**Step 3: Run the smoke test** + +Run: `dotnet test tests/NATS.E2E.Cluster.Tests --filter "FullyQualifiedName~Fixture_Smoke" -v normal` +Expected: PASS + +**Step 4: Commit** + +```bash +git add tests/NATS.E2E.Cluster.Tests/Infrastructure/ +git commit -m "feat: add ThreeNodeClusterFixture with KillNode/RestartNode" +``` + +--- + +### Task 3: Create JetStreamClusterFixture + +**Files:** +- Create: `tests/NATS.E2E.Cluster.Tests/Infrastructure/JetStreamClusterFixture.cs` + +**Step 1: Write the fixture** + +Same as `ThreeNodeClusterFixture` but with `jetstream { store_dir, max_mem_store, max_file_store }` in each node's config. Store dirs are created in `/tmp/nats-e2e-js-cluster-{guid}-node{i}` and deleted on dispose. 
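+For reference, a rendered per-node config might look like the sketch below (ports, the `server_name` and cluster-name values, and the store path are illustrative; the fixture generates the real values). Note that clustered JetStream requires a unique `server_name` on each node:
+
+```
+port: 14222
+http: 18222
+server_name: js-node-0
+cluster {
+  name: e2e-js
+  port: 16222
+  pool_size: 1
+  routes = [
+    nats://127.0.0.1:16223
+    nats://127.0.0.1:16224
+  ]
+}
+jetstream {
+  store_dir: "/tmp/nats-e2e-js-cluster-<guid>-node0"
+  max_mem_store: 64mb
+  max_file_store: 256mb
+}
+```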
+ +Key differences from `ThreeNodeClusterFixture`: +- Config includes `jetstream { store_dir: "{storeDir}" max_mem_store: 64mb max_file_store: 256mb }` +- `DisposeAsync` also deletes store directories +- `RestartNode` does NOT delete the store dir (preserves state for catchup tests) + +Pattern: +```csharp +public sealed class JetStreamClusterFixture : IAsyncLifetime +{ + // Same fields as ThreeNodeClusterFixture plus: + private readonly string[] _storeDirs = new string[3]; + + // Config template adds jetstream block + // Dispose cleans up store dirs + // RestartNode preserves store dirs +} +``` + +**Step 2: Verify build** + +Run: `dotnet build tests/NATS.E2E.Cluster.Tests` +Expected: Build succeeded + +**Step 3: Commit** + +```bash +git add tests/NATS.E2E.Cluster.Tests/Infrastructure/JetStreamClusterFixture.cs +git commit -m "feat: add JetStreamClusterFixture for R3 replication tests" +``` + +--- + +### Task 4: Create GatewayPairFixture with KillNode/RestartNode + +**Files:** +- Create: `tests/NATS.E2E.Cluster.Tests/Infrastructure/GatewayPairFixture.cs` + +**Step 1: Write the fixture** + +Model after existing `GatewayFixture` from `NATS.E2E.Tests` but with: +- Pre-allocated ports (client, gateway, monitoring) for both servers +- `KillNode(int index)` / `RestartNode(int index)` (index 0 = server A, 1 = server B) +- `WaitForGatewayConnectionAsync()` — polls `/gatewayz` on alive servers until `NumGateways >= 1` +- `CreateClientA()` / `CreateClientB()` + +```csharp +public sealed class GatewayPairFixture : IAsyncLifetime +{ + private NatsServerProcess?[] _servers = new NatsServerProcess?[2]; + private readonly int[] _clientPorts = new int[2]; + private readonly int[] _gwPorts = new int[2]; + private readonly int[] _monitorPorts = new int[2]; + private readonly string[] _configs = new string[2]; + + public async Task KillNode(int index) { /* ... */ } + public async Task RestartNode(int index) { /* ... */ } + public async Task WaitForGatewayConnectionAsync() { /* ... 
*/ } + // ... +} +``` + +**Step 2: Verify build** + +Run: `dotnet build tests/NATS.E2E.Cluster.Tests` +Expected: Build succeeded + +**Step 3: Commit** + +```bash +git add tests/NATS.E2E.Cluster.Tests/Infrastructure/GatewayPairFixture.cs +git commit -m "feat: add GatewayPairFixture for failover tests" +``` + +--- + +### Task 5: Create HubLeafFixture with KillNode/RestartNode + +**Files:** +- Create: `tests/NATS.E2E.Cluster.Tests/Infrastructure/HubLeafFixture.cs` + +**Step 1: Write the fixture** + +Model after existing `LeafNodeFixture` from `NATS.E2E.Tests` but with: +- Pre-allocated ports (client, leaf-listen, monitoring) for hub and leaf +- `KillNode(int index)` / `RestartNode(int index)` (index 0 = hub, 1 = leaf) +- `WaitForLeafConnectionAsync()` — polls hub's `/leafz` until `NumLeafs >= 1` +- Hub starts first in `InitializeAsync()`, leaf starts after hub is ready +- `RestartNode(0)` for hub: restart hub, then wait for leaf to reconnect +- `RestartNode(1)` for leaf: restart leaf, then wait for hub to detect connection + +```csharp +public sealed class HubLeafFixture : IAsyncLifetime +{ + private NatsServerProcess?[] _servers = new NatsServerProcess?[2]; // 0=hub, 1=leaf + private readonly int[] _clientPorts = new int[2]; + private readonly int _leafListenPort; + private readonly int _hubMonitorPort; + private readonly string[] _configs = new string[2]; + + public int HubPort => _clientPorts[0]; + public int LeafPort => _clientPorts[1]; + + public async Task KillNode(int index) { /* ... */ } + public async Task RestartNode(int index) { /* ... */ } + public async Task WaitForLeafConnectionAsync() { /* ... */ } + public NatsConnection CreateHubClient() { /* ... */ } + public NatsConnection CreateLeafClient() { /* ... 
*/ }
+}
+```
+
+**Step 2: Verify build**
+
+Run: `dotnet build tests/NATS.E2E.Cluster.Tests`
+Expected: Build succeeded
+
+**Step 3: Commit**
+
+```bash
+git add tests/NATS.E2E.Cluster.Tests/Infrastructure/HubLeafFixture.cs
+git commit -m "feat: add HubLeafFixture for leaf node failover tests"
+```
+
+---
+
+### Task 6: Write ClusterResilienceTests (4 tests)
+
+**Files:**
+- Create: `tests/NATS.E2E.Cluster.Tests/ClusterResilienceTests.cs`
+
+**Step 1: Write the failing tests**
+
+```csharp
+using NATS.Client.Core;
+using NATS.E2E.Cluster.Tests.Infrastructure;
+
+namespace NATS.E2E.Cluster.Tests;
+
+public class ClusterResilienceTests(ThreeNodeClusterFixture fixture) : IClassFixture<ThreeNodeClusterFixture>
+{
+    [Fact]
+    public async Task NodeDies_TrafficReroutesToSurvivors()
+    {
+        // Kill node 2
+        await fixture.KillNode(2);
+
+        // Pub on node 0, sub on node 1 — should still work
+        await using var pub = fixture.CreateClient(0);
+        await using var sub = fixture.CreateClient(1);
+        await pub.ConnectAsync();
+        await sub.ConnectAsync();
+
+        await using var subscription = await sub.SubscribeCoreAsync<string>("e2e.resilience.reroute");
+        await sub.PingAsync();
+
+        // Wait for route between node 0 and node 1 to stabilize
+        await fixture.WaitForFullMeshAsync(expectedRoutes: 1);
+
+        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
+        await WaitForPropagationAsync(pub, sub, "e2e.resilience.reroute.probe", cts.Token);
+
+        await pub.PublishAsync("e2e.resilience.reroute", "still-alive", cancellationToken: cts.Token);
+        var msg = await subscription.Msgs.ReadAsync(cts.Token);
+        msg.Data.ShouldBe("still-alive");
+
+        // Restart node 2 for fixture cleanup
+        await fixture.RestartNode(2);
+        await fixture.WaitForFullMeshAsync();
+    }
+
+    [Fact]
+    public async Task NodeRejoins_SubscriptionsPropagateAgain()
+    {
+        // Kill node 2
+        await fixture.KillNode(2);
+
+        // Restart node 2
+        await fixture.RestartNode(2);
+        await fixture.WaitForFullMeshAsync();
+
+        // Subscribe on restarted node 2, publish on node 0
+        await using var pub = fixture.CreateClient(0);
+        await using var sub = fixture.CreateClient(2);
+        await pub.ConnectAsync();
+        await sub.ConnectAsync();
+
+        await using var subscription = await sub.SubscribeCoreAsync<string>("e2e.resilience.rejoin");
+        await sub.PingAsync();
+
+        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
+        await WaitForPropagationAsync(pub, sub, "e2e.resilience.rejoin.probe", cts.Token);
+
+        await pub.PublishAsync("e2e.resilience.rejoin", "welcome-back", cancellationToken: cts.Token);
+        var msg = await subscription.Msgs.ReadAsync(cts.Token);
+        msg.Data.ShouldBe("welcome-back");
+    }
+
+    [Fact]
+    public async Task AllRoutesReconnect_AfterNodeRestart()
+    {
+        await fixture.KillNode(1);
+        await fixture.RestartNode(1);
+        await fixture.WaitForFullMeshAsync(); // All 3 nodes report NumRoutes >= 2
+
+        // Verify messaging works across all nodes
+        await using var pub = fixture.CreateClient(0);
+        await using var sub = fixture.CreateClient(1);
+        await pub.ConnectAsync();
+        await sub.ConnectAsync();
+
+        await using var subscription = await sub.SubscribeCoreAsync<string>("e2e.resilience.reconnect");
+        await sub.PingAsync();
+
+        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
+        await WaitForPropagationAsync(pub, sub, "e2e.resilience.reconnect.probe", cts.Token);
+
+        await pub.PublishAsync("e2e.resilience.reconnect", "reconnected", cancellationToken: cts.Token);
+        var msg = await subscription.Msgs.ReadAsync(cts.Token);
+        msg.Data.ShouldBe("reconnected");
+    }
+
+    [Fact]
+    public async Task QueueGroup_NodeDies_RemainingMembersDeliver()
+    {
+        // Both queue subs on node 1 (same node), publisher on node 0
+        await using var pub = fixture.CreateClient(0);
+        await using var sub1 = fixture.CreateClient(1);
+        await using var sub2 = fixture.CreateClient(1);
+        await pub.ConnectAsync();
+        await sub1.ConnectAsync();
+        await sub2.ConnectAsync();
+
+        await using var s1 = await sub1.SubscribeCoreAsync<int>("e2e.resilience.qg", queueGroup: "rq");
+        await using var s2 = await sub2.SubscribeCoreAsync<int>("e2e.resilience.qg", queueGroup: "rq");
+        await sub1.PingAsync();
+        await sub2.PingAsync();
+
+        using var propCts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
+        await WaitForPropagationAsync(pub, sub1, "e2e.resilience.qg.probe", propCts.Token);
+
+        // Kill node 2 (doesn't affect subs on node 1, but tests cluster stability)
+        await fixture.KillNode(2);
+
+        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
+        var count = 0;
+        var allReceived = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+
+        async Task Collect(INatsSub<int> sub, CancellationToken ct)
+        {
+            await foreach (var _ in sub.Msgs.ReadAllAsync(ct))
+            {
+                if (Interlocked.Increment(ref count) >= 10)
+                    allReceived.TrySetResult();
+            }
+        }
+
+        _ = Collect(s1, cts.Token);
+        _ = Collect(s2, cts.Token);
+
+        for (var i = 0; i < 10; i++)
+            await pub.PublishAsync("e2e.resilience.qg", i, cancellationToken: cts.Token);
+        await pub.PingAsync(cts.Token);
+
+        await allReceived.Task.WaitAsync(cts.Token);
+        count.ShouldBe(10);
+
+        // Restore node 2
+        await fixture.RestartNode(2);
+        await fixture.WaitForFullMeshAsync();
+    }
+
+    /// <summary>
+    /// Probe-based propagation wait — same pattern as NATS.E2E.Tests.ClusterTests.
+    /// </summary>
+    private static async Task WaitForPropagationAsync(
+        NatsConnection publisher, NatsConnection subscriber, string probeSubject, CancellationToken ct)
+    {
+        await using var probeSub = await subscriber.SubscribeCoreAsync<string>(probeSubject, cancellationToken: ct);
+        await subscriber.PingAsync(ct);
+
+        using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(150));
+        while (await timer.WaitForNextTickAsync(ct))
+        {
+            await publisher.PublishAsync(probeSubject, "probe", cancellationToken: ct);
+            await publisher.PingAsync(ct);
+
+            using var readCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
+            readCts.CancelAfter(TimeSpan.FromMilliseconds(500));
+            try
+            {
+                await probeSub.Msgs.ReadAsync(readCts.Token);
+                return; // Propagated
+            }
+            catch (OperationCanceledException) when (!ct.IsCancellationRequested)
+            {
+                // Not yet propagated — retry
+            }
+        }
+    }
+}
+```
+
+**Step 2: Run tests**
+
+Run: `dotnet test tests/NATS.E2E.Cluster.Tests --filter "FullyQualifiedName~ClusterResilience" -v normal`
+Expected: All 4 PASS
+
+**Step 3: Commit**
+
+```bash
+git add tests/NATS.E2E.Cluster.Tests/ClusterResilienceTests.cs
+git commit -m "test: add cluster resilience E2E tests (node failure, rejoin, reconnect, queue groups)"
+```
+
+---
+
+### Task 7: Write RaftConsensusTests (4 tests)
+
+**Files:**
+- Create: `tests/NATS.E2E.Cluster.Tests/RaftConsensusTests.cs`
+
+**Step 1: Write the tests**
+
+These tests use `JetStreamClusterFixture`. RAFT runs underneath JetStream — creating an R3 stream triggers RAFT group formation. We verify leader election, log replication, and leader failover by querying stream info via `$JS.API`.
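+The stream-info response also carries a cluster block with the RAFT leader and peer state, which allows direct leader assertions. A sketch (assuming the NATS.Net `StreamInfo.Cluster`/`ClusterInfo` model; the helper name and polling interval are ours, not part of the plan's fixtures):
+
+```csharp
+using NATS.Client.JetStream;
+
+// Hypothetical helper: poll stream info until a RAFT leader is reported.
+static async Task<string> WaitForStreamLeaderAsync(NatsJSContext js, string stream, CancellationToken ct)
+{
+    while (true)
+    {
+        var info = await js.GetStreamAsync(stream, cancellationToken: ct);
+        // Cluster is populated for replicated streams; Leader is the leader's server name
+        if (info.Info.Cluster?.Leader is { Length: > 0 } leader)
+            return leader;
+        await Task.Delay(250, ct); // election may still be in progress
+    }
+}
+```
+
+A test could call this after `CreateStreamAsync` to assert a leader exists, or compare the leader before and after `KillNode` to prove re-election, instead of relying on fixed delays.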
+
+```csharp
+using NATS.Client.Core;
+using NATS.Client.JetStream;
+using NATS.Client.JetStream.Models;
+using NATS.E2E.Cluster.Tests.Infrastructure;
+
+namespace NATS.E2E.Cluster.Tests;
+
+public class RaftConsensusTests(JetStreamClusterFixture fixture) : IClassFixture<JetStreamClusterFixture>
+{
+    [Fact]
+    public async Task LeaderElection_ClusterFormsLeader()
+    {
+        // Create R3 stream on node 0
+        await using var client = fixture.CreateClient(0);
+        await client.ConnectAsync();
+        var js = new NatsJSContext(client);
+
+        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
+
+        var stream = await js.CreateStreamAsync(
+            new StreamConfig("RAFT_LEADER", ["raft.leader.>"]) { NumReplicas = 3 },
+            cts.Token);
+
+        // Verify stream was created and has cluster info
+        stream.Info.Config.Name.ShouldBe("RAFT_LEADER");
+        // At least one node should report as having the stream
+        stream.Info.State.ShouldNotBeNull();
+    }
+
+    [Fact]
+    public async Task LeaderDies_NewLeaderElected()
+    {
+        await using var client0 = fixture.CreateClient(0);
+        await client0.ConnectAsync();
+        var js0 = new NatsJSContext(client0);
+
+        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
+
+        await js0.CreateStreamAsync(
+            new StreamConfig("RAFT_FAILOVER", ["raft.failover.>"]) { NumReplicas = 3 },
+            cts.Token);
+
+        // Publish some data
+        for (int i = 0; i < 5; i++)
+            await js0.PublishAsync("raft.failover.data", $"msg-{i}", cancellationToken: cts.Token);
+
+        // Kill node 0 (may or may not be leader)
+        await fixture.KillNode(0);
+
+        // Try accessing stream from node 1
+        await using var client1 = fixture.CreateClient(1);
+        await client1.ConnectAsync();
+        var js1 = new NatsJSContext(client1);
+
+        // Give RAFT time to elect new leader
+        await Task.Delay(2000, cts.Token);
+
+        // Stream should still be accessible — RAFT elected a new leader
+        var info = await js1.GetStreamAsync("RAFT_FAILOVER", cancellationToken: cts.Token);
+        info.Info.State.Messages.ShouldBeGreaterThanOrEqualTo(5);
+
+        // Restore
+        await fixture.RestartNode(0);
+        await fixture.WaitForFullMeshAsync();
+    }
+
+    [Fact]
+    public async Task LogReplication_AllReplicasHaveData()
+    {
+        await using var client = fixture.CreateClient(0);
+        await client.ConnectAsync();
+        var js = new NatsJSContext(client);
+
+        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
+
+        await js.CreateStreamAsync(
+            new StreamConfig("RAFT_REPL", ["raft.repl.>"]) { NumReplicas = 3 },
+            cts.Token);
+
+        // Publish 10 messages
+        for (int i = 0; i < 10; i++)
+            await js.PublishAsync("raft.repl.data", $"msg-{i}", cancellationToken: cts.Token);
+
+        // Give replication time to propagate
+        await Task.Delay(1000, cts.Token);
+
+        // Query stream info from each node — all should report 10 messages
+        for (int node = 0; node < 3; node++)
+        {
+            await using var c = fixture.CreateClient(node);
+            await c.ConnectAsync();
+            var nodeJs = new NatsJSContext(c);
+            var info = await nodeJs.GetStreamAsync("RAFT_REPL", cancellationToken: cts.Token);
+            info.Info.State.Messages.ShouldBe(10, $"Node {node} should have 10 messages");
+        }
+    }
+
+    [Fact]
+    public async Task LeaderRestart_RejoinsAsFollower()
+    {
+        await using var client0 = fixture.CreateClient(0);
+        await client0.ConnectAsync();
+        var js0 = new NatsJSContext(client0);
+
+        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
+
+        await js0.CreateStreamAsync(
+            new StreamConfig("RAFT_REJOIN", ["raft.rejoin.>"]) { NumReplicas = 3 },
+            cts.Token);
+
+        for (int i = 0; i < 5; i++)
+            await js0.PublishAsync("raft.rejoin.data", $"msg-{i}", cancellationToken: cts.Token);
+
+        // Kill node 0
+        await fixture.KillNode(0);
+
+        // Publish more on node 1 while node 0 is down
+        await using var client1 = fixture.CreateClient(1);
+        await client1.ConnectAsync();
+        var js1 = new NatsJSContext(client1);
+
+        await Task.Delay(2000, cts.Token); // Wait for RAFT re-election
+
+        for (int i = 5; i < 10; i++)
+            await js1.PublishAsync("raft.rejoin.data", $"msg-{i}", cancellationToken: cts.Token);
+
+        // Restart node 0 — it should rejoin and catch up
+        await fixture.RestartNode(0);
+        await fixture.WaitForFullMeshAsync();
+
+        // Give time for RAFT catchup
+        await Task.Delay(2000, cts.Token);
+
+        // Verify node 0 has all 10 messages
+        await using var client0b = fixture.CreateClient(0);
+        await client0b.ConnectAsync();
+        var js0b = new NatsJSContext(client0b);
+        var info = await js0b.GetStreamAsync("RAFT_REJOIN", cancellationToken: cts.Token);
+        info.Info.State.Messages.ShouldBe(10);
+    }
+}
+```
+
+**Step 2: Run tests**
+
+Run: `dotnet test tests/NATS.E2E.Cluster.Tests --filter "FullyQualifiedName~RaftConsensus" -v normal`
+Expected: All 4 PASS
+
+**Step 3: Commit**
+
+```bash
+git add tests/NATS.E2E.Cluster.Tests/RaftConsensusTests.cs
+git commit -m "test: add RAFT consensus E2E tests (leader election, failover, replication, rejoin)"
+```
+
+---
+
+### Task 8: Write JetStreamClusterTests (4 tests)
+
+**Files:**
+- Create: `tests/NATS.E2E.Cluster.Tests/JetStreamClusterTests.cs`
+
+**Step 1: Write the tests**
+
+```csharp
+using NATS.Client.Core;
+using NATS.Client.JetStream;
+using NATS.Client.JetStream.Models;
+using NATS.E2E.Cluster.Tests.Infrastructure;
+
+namespace NATS.E2E.Cluster.Tests;
+
+public class JetStreamClusterTests(JetStreamClusterFixture fixture) : IClassFixture<JetStreamClusterFixture>
+{
+    [Fact]
+    public async Task R3Stream_CreateAndPublish_ReplicatedAcrossNodes()
+    {
+        await using var client = fixture.CreateClient(0);
+        await client.ConnectAsync();
+        var js = new NatsJSContext(client);
+
+        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
+
+        await js.CreateStreamAsync(
+            new StreamConfig("JS_REPL", ["js.repl.>"]) { NumReplicas = 3 },
+            cts.Token);
+
+        for (int i = 0; i < 5; i++)
+            await js.PublishAsync("js.repl.data", $"msg-{i}", cancellationToken: cts.Token);
+
+        await Task.Delay(1000, cts.Token);
+
+        // All nodes should report 5 messages
+        for (int node = 0; node < 3; node++)
+        {
+            await using var c = fixture.CreateClient(node);
+            await c.ConnectAsync();
+            var nodeJs = new NatsJSContext(c);
+            var info = await nodeJs.GetStreamAsync("JS_REPL", cancellationToken: cts.Token);
+            info.Info.State.Messages.ShouldBe(5, $"Node {node} should have 5 messages");
+        }
+    }
+
+    [Fact]
+    public async Task R3Stream_NodeDies_PublishContinues()
+    {
+        await using var client = fixture.CreateClient(0);
+        await client.ConnectAsync();
+        var js = new NatsJSContext(client);
+
+        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
+
+        await js.CreateStreamAsync(
+            new StreamConfig("JS_NODEDIES", ["js.nodedies.>"]) { NumReplicas = 3 },
+            cts.Token);
+
+        for (int i = 0; i < 3; i++)
+            await js.PublishAsync("js.nodedies.data", $"msg-{i}", cancellationToken: cts.Token);
+
+        // Kill node 2
+        await fixture.KillNode(2);
+        await Task.Delay(2000, cts.Token);
+
+        // Continue publishing on node 0 — should still work with 2/3 nodes
+        for (int i = 3; i < 8; i++)
+            await js.PublishAsync("js.nodedies.data", $"msg-{i}", cancellationToken: cts.Token);
+
+        // Verify on surviving node 1
+        await using var client1 = fixture.CreateClient(1);
+        await client1.ConnectAsync();
+        var js1 = new NatsJSContext(client1);
+        await Task.Delay(1000, cts.Token);
+        var info = await js1.GetStreamAsync("JS_NODEDIES", cancellationToken: cts.Token);
+        info.Info.State.Messages.ShouldBeGreaterThanOrEqualTo(8);
+
+        await fixture.RestartNode(2);
+        await fixture.WaitForFullMeshAsync();
+    }
+
+    [Fact]
+    public async Task Consumer_NodeDies_PullContinuesOnSurvivor()
+    {
+        await using var client = fixture.CreateClient(0);
+        await client.ConnectAsync();
+        var js = new NatsJSContext(client);
+
+        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
+
+        var stream = await js.CreateStreamAsync(
+            new StreamConfig("JS_CONS_FAIL", ["js.consfail.>"]) { NumReplicas = 3 },
+            cts.Token);
+
+        for (int i = 0; i < 5; i++)
+            await js.PublishAsync("js.consfail.data", $"msg-{i}", cancellationToken: cts.Token);
+
+        // Create a pull consumer
+        var consumer = await stream.CreateOrUpdateConsumerAsync(
+            new ConsumerConfig("cons-fail") { AckPolicy = ConsumerConfigAckPolicy.Explicit },
+            cts.Token);
+
+        // Kill node 2
+        await fixture.KillNode(2);
+        await Task.Delay(2000, cts.Token);
+
+        // Consumer should still be reachable via a surviving node (here node 1)
+        await using var client1 = fixture.CreateClient(1);
+        await client1.ConnectAsync();
+        var js1 = new NatsJSContext(client1);
+        var stream1 = await js1.GetStreamAsync("JS_CONS_FAIL", cancellationToken: cts.Token);
+        var cons1 = await stream1.GetConsumerAsync("cons-fail", cts.Token);
+        cons1.ShouldNotBeNull();
+
+        await fixture.RestartNode(2);
+        await fixture.WaitForFullMeshAsync();
+    }
+
+    [Fact]
+    public async Task R3Stream_Purge_ReplicatedAcrossNodes()
+    {
+        await using var client = fixture.CreateClient(0);
+        await client.ConnectAsync();
+        var js = new NatsJSContext(client);
+
+        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
+
+        await js.CreateStreamAsync(
+            new StreamConfig("JS_PURGE", ["js.purge.>"]) { NumReplicas = 3 },
+            cts.Token);
+
+        for (int i = 0; i < 5; i++)
+            await js.PublishAsync("js.purge.data", $"msg-{i}", cancellationToken: cts.Token);
+
+        await Task.Delay(1000, cts.Token);
+
+        // Purge on node 0
+        await js.PurgeStreamAsync("JS_PURGE", new StreamPurgeRequest(), cts.Token);
+
+        await Task.Delay(1000, cts.Token);
+
+        // All nodes should report 0 messages
+        for (int node = 0; node < 3; node++)
+        {
+            await using var c = fixture.CreateClient(node);
+            await c.ConnectAsync();
+            var nodeJs = new NatsJSContext(c);
+            var info = await nodeJs.GetStreamAsync("JS_PURGE", cancellationToken: cts.Token);
+            info.Info.State.Messages.ShouldBe(0, $"Node {node} should have 0 messages after purge");
+        }
+    }
+}
+```
+
+**Step 2: Run tests**
+
+Run: `dotnet test tests/NATS.E2E.Cluster.Tests --filter "FullyQualifiedName~JetStreamCluster" -v normal`
+Expected: All 4 PASS
+
+**Step 3: Commit**
+
+```bash
+git add tests/NATS.E2E.Cluster.Tests/JetStreamClusterTests.cs
+git commit -m "test: add JetStream cluster E2E tests (R3 replication, node failure, consumer failover, purge)"
+```
+
+---
+
+### Task 9: Write GatewayFailoverTests (2 tests)
+
+**Files:**
+- Create: `tests/NATS.E2E.Cluster.Tests/GatewayFailoverTests.cs`
+
+**Step 1: Write the tests**
+
+```csharp
+using NATS.Client.Core;
+using NATS.E2E.Cluster.Tests.Infrastructure;
+
+namespace NATS.E2E.Cluster.Tests;
+
+public class GatewayFailoverTests(GatewayPairFixture fixture) : IClassFixture<GatewayPairFixture>
+{
+    [Fact]
+    public async Task Gateway_Disconnect_Reconnects()
+    {
+        // Kill gateway B
+        await fixture.KillNode(1);
+
+        // Restart gateway B
+        await fixture.RestartNode(1);
+        await fixture.WaitForGatewayConnectionAsync();
+
+        // Verify messaging resumes: pub on A, sub on B
+        await using var clientA = fixture.CreateClientA();
+        await using var clientB = fixture.CreateClientB();
+        await clientA.ConnectAsync();
+        await clientB.ConnectAsync();
+
+        await using var sub = await clientB.SubscribeCoreAsync<string>("e2e.gw.reconnect");
+        await clientB.PingAsync();
+
+        // Allow interest propagation across gateway
+        await Task.Delay(1000);
+
+        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
+        await clientA.PublishAsync("e2e.gw.reconnect", "after-reconnect", cancellationToken: cts.Token);
+        await clientA.PingAsync(cts.Token);
+
+        var msg = await sub.Msgs.ReadAsync(cts.Token);
+        msg.Data.ShouldBe("after-reconnect");
+    }
+
+    [Fact]
+    public async Task Gateway_InterestUpdated_AfterReconnect()
+    {
+        // Subscribe on B, verify delivery from A
+        await using var clientA1 = fixture.CreateClientA();
+        await using var clientB1 = fixture.CreateClientB();
+        await clientA1.ConnectAsync();
+        await clientB1.ConnectAsync();
+
+        await using var sub1 = await clientB1.SubscribeCoreAsync<string>("e2e.gw.interest");
+        await clientB1.PingAsync();
+        await Task.Delay(1000);
+
+        using var cts1 = new CancellationTokenSource(TimeSpan.FromSeconds(10));
+        await clientA1.PublishAsync("e2e.gw.interest", "before-kill", cancellationToken: cts1.Token);
+        var msg1 = await sub1.Msgs.ReadAsync(cts1.Token);
+        msg1.Data.ShouldBe("before-kill");
+
+        // Kill B, restart, resubscribe
+        await fixture.KillNode(1);
+        await fixture.RestartNode(1);
+        await fixture.WaitForGatewayConnectionAsync();
+
+        await using var clientB2 = fixture.CreateClientB();
+        await clientB2.ConnectAsync();
+        await using var sub2 = await clientB2.SubscribeCoreAsync<string>("e2e.gw.interest");
+        await clientB2.PingAsync();
+        await Task.Delay(1000);
+
+        using var cts2 = new CancellationTokenSource(TimeSpan.FromSeconds(10));
+        await clientA1.PublishAsync("e2e.gw.interest", "after-restart", cancellationToken: cts2.Token);
+        var msg2 = await sub2.Msgs.ReadAsync(cts2.Token);
+        msg2.Data.ShouldBe("after-restart");
+    }
+}
+```
+
+**Step 2: Run tests**
+
+Run: `dotnet test tests/NATS.E2E.Cluster.Tests --filter "FullyQualifiedName~GatewayFailover" -v normal`
+Expected: All 2 PASS
+
+**Step 3: Commit**
+
+```bash
+git add tests/NATS.E2E.Cluster.Tests/GatewayFailoverTests.cs
+git commit -m "test: add gateway failover E2E tests (disconnect/reconnect, interest update)"
+```
+
+---
+
+### Task 10: Write LeafNodeFailoverTests (2 tests)
+
+**Files:**
+- Create: `tests/NATS.E2E.Cluster.Tests/LeafNodeFailoverTests.cs`
+
+**Step 1: Write the tests**
+
+```csharp
+using NATS.Client.Core;
+using NATS.E2E.Cluster.Tests.Infrastructure;
+
+namespace NATS.E2E.Cluster.Tests;
+
+public class LeafNodeFailoverTests(HubLeafFixture fixture) : IClassFixture<HubLeafFixture>
+{
+    [Fact]
+    public async Task Leaf_Disconnect_ReconnectsToHub()
+    {
+        // Kill leaf
+        await fixture.KillNode(1);
+
+        // Restart leaf
+        await fixture.RestartNode(1);
+        await fixture.WaitForLeafConnectionAsync();
+
+        // Verify messaging: pub on hub, sub on leaf
+        await using var hub = fixture.CreateHubClient();
+        await using var leaf = fixture.CreateLeafClient();
+        await hub.ConnectAsync();
+        await leaf.ConnectAsync();
+
+        await using var sub = await leaf.SubscribeCoreAsync<string>("e2e.leaf.reconnect");
+        await leaf.PingAsync();
+        await Task.Delay(1000);
+
+        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
+        await hub.PublishAsync("e2e.leaf.reconnect", "leaf-back", cancellationToken: cts.Token);
+        await hub.PingAsync(cts.Token);
+
+        var msg = await sub.Msgs.ReadAsync(cts.Token);
+        msg.Data.ShouldBe("leaf-back");
+    }
+
+    [Fact]
+    public async Task Leaf_HubRestart_LeafReconnects()
+    {
+        // Kill hub
+        await fixture.KillNode(0);
+
+        // Restart hub
+        await fixture.RestartNode(0);
+
+        // Wait for leaf to reconnect (leaf has exponential backoff, may take a few seconds)
+        await fixture.WaitForLeafConnectionAsync();
+
+        // Verify messaging: pub on leaf, sub on hub
+        await using var hub = fixture.CreateHubClient();
+        await using var leaf = fixture.CreateLeafClient();
+        await hub.ConnectAsync();
+        await leaf.ConnectAsync();
+
+        await using var sub = await hub.SubscribeCoreAsync<string>("e2e.leaf.hubrestart");
+        await hub.PingAsync();
+        await Task.Delay(1000);
+
+        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
+        await leaf.PublishAsync("e2e.leaf.hubrestart", "hub-back", cancellationToken: cts.Token);
+        await leaf.PingAsync(cts.Token);
+
+        var msg = await sub.Msgs.ReadAsync(cts.Token);
+        msg.Data.ShouldBe("hub-back");
+    }
+}
+```
+
+**Step 2: Run tests**
+
+Run: `dotnet test tests/NATS.E2E.Cluster.Tests --filter "FullyQualifiedName~LeafNodeFailover" -v normal`
+Expected: All 2 PASS
+
+**Step 3: Commit**
+
+```bash
+git add tests/NATS.E2E.Cluster.Tests/LeafNodeFailoverTests.cs
+git commit -m "test: add leaf node failover E2E tests (leaf restart, hub restart)"
+```
+
+---
+
+### Task 11: Run full suite and final commit
+
+**Step 1: Build the server host (E2E tests run against the compiled binary)**
+
+Run: `dotnet build src/NATS.Server.Host`
+Expected: Build succeeded
+
+**Step 2: Run the full E2E cluster test suite**
+
+Run: `dotnet test tests/NATS.E2E.Cluster.Tests -v normal`
+Expected: 16 tests, all PASS
+
+**Step 3: Run the original E2E suite to verify no regressions**
+
+Run: `dotnet test tests/NATS.E2E.Tests -v normal`
+Expected: 78 tests, all PASS
+
+**Step 4: Final commit if any cleanup needed**
+
+```bash
+git add -A
+git commit -m "test: finalize NATS.E2E.Cluster.Tests — 16 tests all passing"
+```
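+
+One optional hardening step (an assumption, not required by the plan): each fixture boots two or three real server processes, so running the four test collections in parallel can overload CI machines and amplify timing flakiness. xUnit can serialize collections with an `xunit.runner.json` copied to the output directory:
+
+```json
+{
+  "$schema": "https://xunit.net/schema/current/xunit.runner.schema.json",
+  "parallelizeTestCollections": false
+}
+```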