# E2E Cluster & RAFT Tests Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers-extended-cc:executing-plans to implement this plan task-by-task.

**Goal:** Create `NATS.E2E.Cluster.Tests` with 16 E2E tests covering cluster resilience, RAFT consensus, JetStream cluster replication, gateway failover, and leaf node failover.

**Architecture:** Single xUnit 3 project, 4 fixtures (ThreeNodeCluster, JetStreamCluster, GatewayPair, HubLeaf), all running real multi-process servers via `NatsServerProcess`.

**Tech Stack:** .NET 10, xUnit 3, Shouldly, NATS.Client.Core, NATS.Client.JetStream, NATS.NKeys

---

### Task 0: Create project scaffold and add to solution

**Files:**

- Create: `tests/NATS.E2E.Cluster.Tests/NATS.E2E.Cluster.Tests.csproj`
- Modify: `NatsDotNet.slnx`

**Step 1: Create the csproj file**

```xml
<Project Sdk="Microsoft.NET.Sdk">

  <PropertyGroup>
    <TargetFramework>net10.0</TargetFramework>
    <IsPackable>false</IsPackable>
  </PropertyGroup>

  <!-- Package and project references: mirror tests/NATS.E2E.Tests
       (xunit.v3, Shouldly, NATS.Client.Core, NATS.Client.JetStream, NATS.NKeys) -->

</Project>
```

**Step 2: Add to solution file**

Add `<Project Path="tests/NATS.E2E.Cluster.Tests/NATS.E2E.Cluster.Tests.csproj" />` to the `/tests/` folder in `NatsDotNet.slnx`.

**Step 3: Verify build**

Run: `dotnet build tests/NATS.E2E.Cluster.Tests`
Expected: Build succeeded, 0 errors

**Step 4: Commit**

```bash
git add tests/NATS.E2E.Cluster.Tests/NATS.E2E.Cluster.Tests.csproj NatsDotNet.slnx
git commit -m "feat: scaffold NATS.E2E.Cluster.Tests project"
```

---

### Task 1: Copy NatsServerProcess and create base infrastructure

**Files:**

- Create: `tests/NATS.E2E.Cluster.Tests/Infrastructure/NatsServerProcess.cs`

**Step 1: Copy NatsServerProcess from NATS.E2E.Tests**

Copy `tests/NATS.E2E.Tests/Infrastructure/NatsServerProcess.cs` to `tests/NATS.E2E.Cluster.Tests/Infrastructure/NatsServerProcess.cs`. Change the namespace from `NATS.E2E.Tests.Infrastructure` to `NATS.E2E.Cluster.Tests.Infrastructure`.
**Step 2: Verify build**

Run: `dotnet build tests/NATS.E2E.Cluster.Tests`
Expected: Build succeeded

**Step 3: Commit**

```bash
git add tests/NATS.E2E.Cluster.Tests/Infrastructure/NatsServerProcess.cs
git commit -m "feat: add NatsServerProcess to cluster E2E infrastructure"
```

---

### Task 2: Create ThreeNodeClusterFixture with KillNode/RestartNode

**Files:**

- Create: `tests/NATS.E2E.Cluster.Tests/Infrastructure/ThreeNodeClusterFixture.cs`

**Step 1: Write the fixture**

Model after existing `ClusterFixture` from `NATS.E2E.Tests` but with these additions:

- Store `NatsServerProcess[]` array (3 servers) and parallel `string[]` for config content and `int[]` for cluster ports
- Pre-allocate all ports (client, cluster, monitoring) in constructor or `InitializeAsync`
- Full-mesh routes config with `pool_size: 1`, monitoring enabled, `-DV` flags
- `WaitForFullMeshAsync(int expectedRoutes = 2)` — polls `/routez` on all alive nodes until each reports `NumRoutes >= expectedRoutes`; parameterized so that after a kill we can wait for `NumRoutes >= 1` on the survivors
- `KillNode(int index)` — calls `DisposeAsync()` on that server process, nulls it out
- `RestartNode(int index)` — creates a new `NatsServerProcess` using the saved config content and ports, calls `StartAsync()`
- `CreateClient(int index)` — returns `NatsConnection` to the specified node
- `GetServerOutput(int index)` — returns output from the specified node
- Private JSON model `Routez` with `num_routes` property

The key difference from the existing `ClusterFixture`: the fixture must support restarting servers on the same ports. The existing `NatsServerProcess` allocates its own port in its constructor, but for the kill/restart pattern the port must be stable, so the server process needs a new constructor or factory method that takes a pre-assigned port.
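For orientation, one node's config along the lines above might look like this sketch (ports and the cluster name are illustrative placeholders, not values from the existing fixtures; the syntax assumes standard nats-server-style config files):

```
# node 0 (sketch): client port, monitoring, full-mesh routes
port: 14222
http: 18222        # monitoring endpoint, serves /routez

cluster {
  name: e2e-cluster
  port: 16220      # this node's cluster port
  pool_size: 1
  routes: [
    nats://127.0.0.1:16221   # node 1
    nats://127.0.0.1:16222   # node 2
  ]
}
```

The fixture generates one such file per node from its pre-allocated ports, which is what makes kill/restart on stable ports possible.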
The simplest approach: add a new `NatsServerProcess` constructor overload that accepts a pre-assigned port. Copy this into our local version.

Add to `NatsServerProcess`:

```csharp
/// <summary>
/// Constructor with pre-assigned port for kill/restart scenarios.
/// </summary>
public NatsServerProcess(int port, string[]? extraArgs = null, string? configContent = null, bool enableMonitoring = false, int? monitorPort = null)
{
    Port = port;
    _extraArgs = extraArgs;
    _configContent = configContent;
    _enableMonitoring = enableMonitoring;
    if (monitorPort.HasValue)
        MonitorPort = monitorPort.Value;
    else if (_enableMonitoring)
        MonitorPort = AllocateFreePort();
}
```

Fixture pattern:

```csharp
namespace NATS.E2E.Cluster.Tests.Infrastructure;

public sealed class ThreeNodeClusterFixture : IAsyncLifetime
{
    private NatsServerProcess?[] _servers = new NatsServerProcess?[3];
    private readonly int[] _clientPorts = new int[3];
    private readonly int[] _clusterPorts = new int[3];
    private readonly int[] _monitorPorts = new int[3];
    private readonly string[] _configs = new string[3];

    public int GetPort(int index) => _clientPorts[index];

    public int GetMonitorPort(int index) => _monitorPorts[index];

    public async Task InitializeAsync() { /* allocate ports, build configs, start 3, wait mesh */ }

    public async Task KillNode(int index) { /* dispose server, null it */ }

    public async Task RestartNode(int index) { /* recreate server on same ports, start */ }

    public async Task WaitForFullMeshAsync(int expectedRoutes = 2) { /* poll /routez */ }

    public NatsConnection CreateClient(int index) { /* ... */ }

    public string GetServerOutput(int index) { /* ... */ }

    public async Task DisposeAsync() { /* dispose all */ }
}
```

**Step 2: Write a smoke test to verify the fixture**

Create a temporary test (can be removed later or kept):

```csharp
// In a temp file or ClusterResilienceTests.cs
[Fact]
public async Task Fixture_Smoke_ThreeNodeClusterStarts()
{
    // Just verifies the fixture starts without errors — implicit via IAsyncLifetime
    await Task.CompletedTask; // fixture.InitializeAsync already ran
}
```

**Step 3: Run the smoke test**

Run: `dotnet test tests/NATS.E2E.Cluster.Tests --filter "FullyQualifiedName~Fixture_Smoke" -v normal`
Expected: PASS

**Step 4: Commit**

```bash
git add tests/NATS.E2E.Cluster.Tests/Infrastructure/
git commit -m "feat: add ThreeNodeClusterFixture with KillNode/RestartNode"
```

---

### Task 3: Create JetStreamClusterFixture

**Files:**

- Create: `tests/NATS.E2E.Cluster.Tests/Infrastructure/JetStreamClusterFixture.cs`

**Step 1: Write the fixture**

Same as `ThreeNodeClusterFixture` but with `jetstream { store_dir, max_mem_store, max_file_store }` in each node's config. Store dirs are created in `/tmp/nats-e2e-js-cluster-{guid}-node{i}` and deleted on dispose.
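Concretely, each node's config here is the Task 2 cluster config with a jetstream block appended, along these lines (a sketch; sizes are the plan's, and the store path follows the `/tmp/nats-e2e-js-cluster-{guid}-node{i}` template with the node index substituted):

```
# appended to the per-node cluster config
jetstream {
  store_dir: "/tmp/nats-e2e-js-cluster-{guid}-node0"
  max_mem_store: 64mb
  max_file_store: 256mb
}
```

Because `RestartNode` reuses the same config and leaves the directory in place, a restarted node comes back with its on-disk stream state intact, which is what the catchup tests rely on.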
Key differences from `ThreeNodeClusterFixture`:

- Config includes `jetstream { store_dir: "{storeDir}" max_mem_store: 64mb max_file_store: 256mb }`
- `DisposeAsync` also deletes store directories
- `RestartNode` does NOT delete the store dir (preserves state for catchup tests)

Pattern:

```csharp
public sealed class JetStreamClusterFixture : IAsyncLifetime
{
    // Same fields as ThreeNodeClusterFixture plus:
    private readonly string[] _storeDirs = new string[3];

    // Config template adds jetstream block
    // Dispose cleans up store dirs
    // RestartNode preserves store dirs
}
```

**Step 2: Verify build**

Run: `dotnet build tests/NATS.E2E.Cluster.Tests`
Expected: Build succeeded

**Step 3: Commit**

```bash
git add tests/NATS.E2E.Cluster.Tests/Infrastructure/JetStreamClusterFixture.cs
git commit -m "feat: add JetStreamClusterFixture for R3 replication tests"
```

---

### Task 4: Create GatewayPairFixture with KillNode/RestartNode

**Files:**

- Create: `tests/NATS.E2E.Cluster.Tests/Infrastructure/GatewayPairFixture.cs`

**Step 1: Write the fixture**

Model after existing `GatewayFixture` from `NATS.E2E.Tests` but with:

- Pre-allocated ports (client, gateway, monitoring) for both servers
- `KillNode(int index)` / `RestartNode(int index)` (index 0 = server A, 1 = server B)
- `WaitForGatewayConnectionAsync()` — polls `/gatewayz` on alive servers until `NumGateways >= 1`
- `CreateClientA()` / `CreateClientB()`

```csharp
public sealed class GatewayPairFixture : IAsyncLifetime
{
    private NatsServerProcess?[] _servers = new NatsServerProcess?[2];
    private readonly int[] _clientPorts = new int[2];
    private readonly int[] _gwPorts = new int[2];
    private readonly int[] _monitorPorts = new int[2];
    private readonly string[] _configs = new string[2];

    public async Task KillNode(int index) { /* ... */ }

    public async Task RestartNode(int index) { /* ... */ }

    public async Task WaitForGatewayConnectionAsync() { /* ... */ }

    // ...
}
```

**Step 2: Verify build**

Run: `dotnet build tests/NATS.E2E.Cluster.Tests`
Expected: Build succeeded

**Step 3: Commit**

```bash
git add tests/NATS.E2E.Cluster.Tests/Infrastructure/GatewayPairFixture.cs
git commit -m "feat: add GatewayPairFixture for failover tests"
```

---

### Task 5: Create HubLeafFixture with KillNode/RestartNode

**Files:**

- Create: `tests/NATS.E2E.Cluster.Tests/Infrastructure/HubLeafFixture.cs`

**Step 1: Write the fixture**

Model after existing `LeafNodeFixture` from `NATS.E2E.Tests` but with:

- Pre-allocated ports (client, leaf-listen, monitoring) for hub and leaf
- `KillNode(int index)` / `RestartNode(int index)` (index 0 = hub, 1 = leaf)
- `WaitForLeafConnectionAsync()` — polls hub's `/leafz` until `NumLeafs >= 1`
- Hub starts first in `InitializeAsync()`, leaf starts after hub is ready
- `RestartNode(0)` for hub: restart hub, then wait for leaf to reconnect
- `RestartNode(1)` for leaf: restart leaf, then wait for hub to detect connection

```csharp
public sealed class HubLeafFixture : IAsyncLifetime
{
    private NatsServerProcess?[] _servers = new NatsServerProcess?[2]; // 0=hub, 1=leaf
    private readonly int[] _clientPorts = new int[2];
    private readonly int _leafListenPort;
    private readonly int _hubMonitorPort;
    private readonly string[] _configs = new string[2];

    public int HubPort => _clientPorts[0];
    public int LeafPort => _clientPorts[1];

    public async Task KillNode(int index) { /* ... */ }

    public async Task RestartNode(int index) { /* ... */ }

    public async Task WaitForLeafConnectionAsync() { /* ... */ }

    public NatsConnection CreateHubClient() { /* ... */ }

    public NatsConnection CreateLeafClient() { /* ... */ }
}
```

**Step 2: Verify build**

Run: `dotnet build tests/NATS.E2E.Cluster.Tests`
Expected: Build succeeded

**Step 3: Commit**

```bash
git add tests/NATS.E2E.Cluster.Tests/Infrastructure/HubLeafFixture.cs
git commit -m "feat: add HubLeafFixture for leaf node failover tests"
```

---

### Task 6: Write ClusterResilienceTests (4 tests)

**Files:**

- Create: `tests/NATS.E2E.Cluster.Tests/ClusterResilienceTests.cs`

**Step 1: Write the failing tests**

```csharp
using NATS.Client.Core;
using NATS.E2E.Cluster.Tests.Infrastructure;
using Shouldly;
using Xunit;

namespace NATS.E2E.Cluster.Tests;

public class ClusterResilienceTests(ThreeNodeClusterFixture fixture) : IClassFixture<ThreeNodeClusterFixture>
{
    [Fact]
    public async Task NodeDies_TrafficReroutesToSurvivors()
    {
        // Kill node 2
        await fixture.KillNode(2);

        // Pub on node 0, sub on node 1 — should still work
        await using var pub = fixture.CreateClient(0);
        await using var sub = fixture.CreateClient(1);
        await pub.ConnectAsync();
        await sub.ConnectAsync();

        await using var subscription = await sub.SubscribeCoreAsync<string>("e2e.resilience.reroute");
        await sub.PingAsync();

        // Wait for route between node 0 and node 1 to stabilize
        await fixture.WaitForFullMeshAsync(expectedRoutes: 1);

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        await WaitForPropagationAsync(pub, sub, "e2e.resilience.reroute.probe", cts.Token);

        await pub.PublishAsync("e2e.resilience.reroute", "still-alive", cancellationToken: cts.Token);
        var msg = await subscription.Msgs.ReadAsync(cts.Token);
        msg.Data.ShouldBe("still-alive");

        // Restart node 2 for fixture cleanup
        await fixture.RestartNode(2);
        await fixture.WaitForFullMeshAsync();
    }

    [Fact]
    public async Task NodeRejoins_SubscriptionsPropagateAgain()
    {
        // Kill node 2
        await fixture.KillNode(2);

        // Restart node 2
        await fixture.RestartNode(2);
        await fixture.WaitForFullMeshAsync();

        // Subscribe on restarted node 2, publish on node 0
        await using var pub = fixture.CreateClient(0);
        await using var sub = fixture.CreateClient(2);
        await pub.ConnectAsync();
        await sub.ConnectAsync();

        await using var subscription = await sub.SubscribeCoreAsync<string>("e2e.resilience.rejoin");
        await sub.PingAsync();

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        await WaitForPropagationAsync(pub, sub, "e2e.resilience.rejoin.probe", cts.Token);

        await pub.PublishAsync("e2e.resilience.rejoin", "welcome-back", cancellationToken: cts.Token);
        var msg = await subscription.Msgs.ReadAsync(cts.Token);
        msg.Data.ShouldBe("welcome-back");
    }

    [Fact]
    public async Task AllRoutesReconnect_AfterNodeRestart()
    {
        await fixture.KillNode(1);
        await fixture.RestartNode(1);
        await fixture.WaitForFullMeshAsync(); // All 3 nodes report NumRoutes >= 2

        // Verify messaging works across all nodes
        await using var pub = fixture.CreateClient(0);
        await using var sub = fixture.CreateClient(1);
        await pub.ConnectAsync();
        await sub.ConnectAsync();

        await using var subscription = await sub.SubscribeCoreAsync<string>("e2e.resilience.reconnect");
        await sub.PingAsync();

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        await WaitForPropagationAsync(pub, sub, "e2e.resilience.reconnect.probe", cts.Token);

        await pub.PublishAsync("e2e.resilience.reconnect", "reconnected", cancellationToken: cts.Token);
        var msg = await subscription.Msgs.ReadAsync(cts.Token);
        msg.Data.ShouldBe("reconnected");
    }

    [Fact]
    public async Task QueueGroup_NodeDies_RemainingMembersDeliver()
    {
        // Both queue subs on node 1 (same node), publisher on node 0
        await using var pub = fixture.CreateClient(0);
        await using var sub1 = fixture.CreateClient(1);
        await using var sub2 = fixture.CreateClient(1);
        await pub.ConnectAsync();
        await sub1.ConnectAsync();
        await sub2.ConnectAsync();

        await using var s1 = await sub1.SubscribeCoreAsync<int>("e2e.resilience.qg", queueGroup: "rq");
        await using var s2 = await sub2.SubscribeCoreAsync<int>("e2e.resilience.qg", queueGroup: "rq");
        await sub1.PingAsync();
        await sub2.PingAsync();

        using var propCts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        await WaitForPropagationAsync(pub, sub1, "e2e.resilience.qg.probe", propCts.Token);

        // Kill node 2 (doesn't affect subs on node 1, but tests cluster stability)
        await fixture.KillNode(2);

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        var count = 0;
        var allReceived = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

        async Task Collect(INatsSub<int> sub, CancellationToken ct)
        {
            await foreach (var _ in sub.Msgs.ReadAllAsync(ct))
            {
                if (Interlocked.Increment(ref count) >= 10)
                    allReceived.TrySetResult();
            }
        }

        _ = Collect(s1, cts.Token);
        _ = Collect(s2, cts.Token);

        for (var i = 0; i < 10; i++)
            await pub.PublishAsync("e2e.resilience.qg", i, cancellationToken: cts.Token);
        await pub.PingAsync(cts.Token);

        await allReceived.Task.WaitAsync(cts.Token);
        count.ShouldBe(10);

        // Restore node 2
        await fixture.RestartNode(2);
        await fixture.WaitForFullMeshAsync();
    }

    /// <summary>
    /// Probe-based propagation wait — same pattern as NATS.E2E.Tests.ClusterTests.
    /// </summary>
    private static async Task WaitForPropagationAsync(
        NatsConnection publisher,
        NatsConnection subscriber,
        string probeSubject,
        CancellationToken ct)
    {
        await using var probeSub = await subscriber.SubscribeCoreAsync<string>(probeSubject, cancellationToken: ct);
        await subscriber.PingAsync(ct);

        using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(150));
        while (await timer.WaitForNextTickAsync(ct))
        {
            await publisher.PublishAsync(probeSubject, "probe", cancellationToken: ct);
            await publisher.PingAsync(ct);

            using var readCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
            readCts.CancelAfter(TimeSpan.FromMilliseconds(500));
            try
            {
                await probeSub.Msgs.ReadAsync(readCts.Token);
                return; // Propagated
            }
            catch (OperationCanceledException) when (!ct.IsCancellationRequested)
            {
                // Not yet propagated — retry
            }
        }
    }
}
```

**Step 2: Run tests**

Run: `dotnet test tests/NATS.E2E.Cluster.Tests --filter "FullyQualifiedName~ClusterResilience" -v normal`
Expected: All 4 PASS

**Step 3: Commit**

```bash
git add tests/NATS.E2E.Cluster.Tests/ClusterResilienceTests.cs
git commit -m "test: add cluster resilience E2E tests (node failure, rejoin, reconnect, queue groups)"
```

---

### Task 7: Write RaftConsensusTests (4 tests)

**Files:**

- Create: `tests/NATS.E2E.Cluster.Tests/RaftConsensusTests.cs`

**Step 1: Write the tests**

These tests use `JetStreamClusterFixture`. RAFT runs underneath JetStream — creating an R3 stream triggers RAFT group formation. We verify leader election, log replication, and leader failover by querying stream info via `$JS.API`.

```csharp
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
using NATS.E2E.Cluster.Tests.Infrastructure;
using Shouldly;
using Xunit;

namespace NATS.E2E.Cluster.Tests;

public class RaftConsensusTests(JetStreamClusterFixture fixture) : IClassFixture<JetStreamClusterFixture>
{
    [Fact]
    public async Task LeaderElection_ClusterFormsLeader()
    {
        // Create R3 stream on node 0
        await using var client = fixture.CreateClient(0);
        await client.ConnectAsync();
        var js = new NatsJSContext(client);

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
        var stream = await js.CreateStreamAsync(
            new StreamConfig("RAFT_LEADER", ["raft.leader.>"]) { NumReplicas = 3 },
            cts.Token);

        // Verify stream was created and has cluster info
        stream.Info.Config.Name.ShouldBe("RAFT_LEADER");

        // At least one node should report as having the stream
        stream.Info.State.ShouldNotBeNull();
    }

    [Fact]
    public async Task LeaderDies_NewLeaderElected()
    {
        await using var client0 = fixture.CreateClient(0);
        await client0.ConnectAsync();
        var js0 = new NatsJSContext(client0);

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
        await js0.CreateStreamAsync(
            new StreamConfig("RAFT_FAILOVER", ["raft.failover.>"]) { NumReplicas = 3 },
            cts.Token);

        // Publish some data
        for (int i = 0; i < 5; i++)
            await js0.PublishAsync("raft.failover.data", $"msg-{i}", cancellationToken: cts.Token);

        // Kill node 0 (may or may not be leader)
        await fixture.KillNode(0);

        // Try accessing stream from node 1
        await using var client1 = fixture.CreateClient(1);
        await client1.ConnectAsync();
        var js1 = new NatsJSContext(client1);

        // Give RAFT time to elect new leader
        await Task.Delay(2000, cts.Token);

        // Stream should still be accessible — RAFT elected a new leader
        var info = await js1.GetStreamAsync("RAFT_FAILOVER", cancellationToken: cts.Token);
        info.Info.State.Messages.ShouldBeGreaterThanOrEqualTo(5);

        // Restore
        await fixture.RestartNode(0);
        await fixture.WaitForFullMeshAsync();
    }

    [Fact]
    public async Task LogReplication_AllReplicasHaveData()
    {
        await using var client = fixture.CreateClient(0);
        await client.ConnectAsync();
        var js = new NatsJSContext(client);

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
        await js.CreateStreamAsync(
            new StreamConfig("RAFT_REPL", ["raft.repl.>"]) { NumReplicas = 3 },
            cts.Token);

        // Publish 10 messages
        for (int i = 0; i < 10; i++)
            await js.PublishAsync("raft.repl.data", $"msg-{i}", cancellationToken: cts.Token);

        // Give replication time to propagate
        await Task.Delay(1000, cts.Token);

        // Query stream info from each node — all should report 10 messages
        for (int node = 0; node < 3; node++)
        {
            await using var c = fixture.CreateClient(node);
            await c.ConnectAsync();
            var nodeJs = new NatsJSContext(c);
            var info = await nodeJs.GetStreamAsync("RAFT_REPL", cancellationToken: cts.Token);
            info.Info.State.Messages.ShouldBe(10, $"Node {node} should have 10 messages");
        }
    }

    [Fact]
    public async Task LeaderRestart_RejoinsAsFollower()
    {
        await using var client0 = fixture.CreateClient(0);
        await client0.ConnectAsync();
        var js0 = new NatsJSContext(client0);

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
        await js0.CreateStreamAsync(
            new StreamConfig("RAFT_REJOIN", ["raft.rejoin.>"]) { NumReplicas = 3 },
            cts.Token);

        for (int i = 0; i < 5; i++)
            await js0.PublishAsync("raft.rejoin.data", $"msg-{i}", cancellationToken: cts.Token);

        // Kill node 0
        await fixture.KillNode(0);

        // Publish more on node 1 while node 0 is down
        await using var client1 = fixture.CreateClient(1);
        await client1.ConnectAsync();
        var js1 = new NatsJSContext(client1);

        await Task.Delay(2000, cts.Token); // Wait for RAFT re-election

        for (int i = 5; i < 10; i++)
            await js1.PublishAsync("raft.rejoin.data", $"msg-{i}", cancellationToken: cts.Token);

        // Restart node 0 — it should rejoin and catch up
        await fixture.RestartNode(0);
        await fixture.WaitForFullMeshAsync();

        // Give time for RAFT catchup
        await Task.Delay(2000, cts.Token);

        // Verify node 0 has all 10 messages
        await using var client0b = fixture.CreateClient(0);
        await client0b.ConnectAsync();
        var js0b = new NatsJSContext(client0b);
        var info = await js0b.GetStreamAsync("RAFT_REJOIN", cancellationToken: cts.Token);
        info.Info.State.Messages.ShouldBe(10);
    }
}
```

**Step 2: Run tests**

Run: `dotnet test tests/NATS.E2E.Cluster.Tests --filter "FullyQualifiedName~RaftConsensus" -v normal`
Expected: All 4 PASS

**Step 3: Commit**

```bash
git add tests/NATS.E2E.Cluster.Tests/RaftConsensusTests.cs
git commit -m "test: add RAFT consensus E2E tests (leader election, failover, replication, rejoin)"
```

---

### Task 8: Write JetStreamClusterTests (4 tests)

**Files:**

- Create: `tests/NATS.E2E.Cluster.Tests/JetStreamClusterTests.cs`

**Step 1: Write the tests**

```csharp
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
using NATS.E2E.Cluster.Tests.Infrastructure;
using Shouldly;
using Xunit;

namespace NATS.E2E.Cluster.Tests;

public class JetStreamClusterTests(JetStreamClusterFixture fixture) : IClassFixture<JetStreamClusterFixture>
{
    [Fact]
    public async Task R3Stream_CreateAndPublish_ReplicatedAcrossNodes()
    {
        await using var client = fixture.CreateClient(0);
        await client.ConnectAsync();
        var js = new NatsJSContext(client);

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
        await js.CreateStreamAsync(
            new StreamConfig("JS_REPL", ["js.repl.>"]) { NumReplicas = 3 },
            cts.Token);

        for (int i = 0; i < 5; i++)
            await js.PublishAsync("js.repl.data", $"msg-{i}", cancellationToken: cts.Token);

        await Task.Delay(1000, cts.Token);

        // All nodes should report 5 messages
        for (int node = 0; node < 3; node++)
        {
            await using var c = fixture.CreateClient(node);
            await c.ConnectAsync();
            var nodeJs = new NatsJSContext(c);
            var info = await nodeJs.GetStreamAsync("JS_REPL", cancellationToken: cts.Token);
            info.Info.State.Messages.ShouldBe(5, $"Node {node} should have 5 messages");
        }
    }

    [Fact]
    public async Task R3Stream_NodeDies_PublishContinues()
    {
        await using var client = fixture.CreateClient(0);
        await client.ConnectAsync();
        var js = new NatsJSContext(client);

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
        await js.CreateStreamAsync(
            new StreamConfig("JS_NODEDIES", ["js.nodedies.>"]) { NumReplicas = 3 },
            cts.Token);

        for (int i = 0; i < 3; i++)
            await js.PublishAsync("js.nodedies.data", $"msg-{i}", cancellationToken: cts.Token);

        // Kill node 2
        await fixture.KillNode(2);
        await Task.Delay(2000, cts.Token);

        // Continue publishing on node 0 — should still work with 2/3 nodes
        for (int i = 3; i < 8; i++)
            await js.PublishAsync("js.nodedies.data", $"msg-{i}", cancellationToken: cts.Token);

        // Verify on surviving node 1
        await using var client1 = fixture.CreateClient(1);
        await client1.ConnectAsync();
        var js1 = new NatsJSContext(client1);
        await Task.Delay(1000, cts.Token);
        var info = await js1.GetStreamAsync("JS_NODEDIES", cancellationToken: cts.Token);
        info.Info.State.Messages.ShouldBeGreaterThanOrEqualTo(8);

        await fixture.RestartNode(2);
        await fixture.WaitForFullMeshAsync();
    }

    [Fact]
    public async Task Consumer_NodeDies_PullContinuesOnSurvivor()
    {
        await using var client = fixture.CreateClient(0);
        await client.ConnectAsync();
        var js = new NatsJSContext(client);

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
        var stream = await js.CreateStreamAsync(
            new StreamConfig("JS_CONS_FAIL", ["js.consfail.>"]) { NumReplicas = 3 },
            cts.Token);

        for (int i = 0; i < 5; i++)
            await js.PublishAsync("js.consfail.data", $"msg-{i}", cancellationToken: cts.Token);

        // Create a pull consumer
        var consumer = await stream.CreateOrUpdateConsumerAsync(
            new ConsumerConfig("cons-fail") { AckPolicy = ConsumerConfigAckPolicy.Explicit },
            cts.Token);

        // Kill node 2
        await fixture.KillNode(2);
        await Task.Delay(2000, cts.Token);

        // Consumer should still be reachable from surviving node 1
        await using var client1 = fixture.CreateClient(1);
        await client1.ConnectAsync();
        var js1 = new NatsJSContext(client1);
        var stream1 = await js1.GetStreamAsync("JS_CONS_FAIL", cancellationToken: cts.Token);
        var cons1 = await stream1.GetConsumerAsync("cons-fail", cts.Token);
        cons1.ShouldNotBeNull();

        await fixture.RestartNode(2);
        await fixture.WaitForFullMeshAsync();
    }

    [Fact]
    public async Task R3Stream_Purge_ReplicatedAcrossNodes()
    {
        await using var client = fixture.CreateClient(0);
        await client.ConnectAsync();
        var js = new NatsJSContext(client);

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
        await js.CreateStreamAsync(
            new StreamConfig("JS_PURGE", ["js.purge.>"]) { NumReplicas = 3 },
            cts.Token);

        for (int i = 0; i < 5; i++)
            await js.PublishAsync("js.purge.data", $"msg-{i}", cancellationToken: cts.Token);

        await Task.Delay(1000, cts.Token);

        // Purge on node 0
        await js.PurgeStreamAsync("JS_PURGE", new StreamPurgeRequest(), cts.Token);
        await Task.Delay(1000, cts.Token);

        // All nodes should report 0 messages
        for (int node = 0; node < 3; node++)
        {
            await using var c = fixture.CreateClient(node);
            await c.ConnectAsync();
            var nodeJs = new NatsJSContext(c);
            var info = await nodeJs.GetStreamAsync("JS_PURGE", cancellationToken: cts.Token);
            info.Info.State.Messages.ShouldBe(0, $"Node {node} should have 0 messages after purge");
        }
    }
}
```

**Step 2: Run tests**

Run: `dotnet test tests/NATS.E2E.Cluster.Tests --filter "FullyQualifiedName~JetStreamCluster" -v normal`
Expected: All 4 PASS

**Step 3: Commit**

```bash
git add tests/NATS.E2E.Cluster.Tests/JetStreamClusterTests.cs
git commit -m "test: add JetStream cluster E2E tests (R3 replication, node failure, consumer failover, purge)"
```

---

### Task 9: Write GatewayFailoverTests (2 tests)

**Files:**

- Create: `tests/NATS.E2E.Cluster.Tests/GatewayFailoverTests.cs`

**Step 1: Write the tests**

```csharp
using NATS.Client.Core;
using NATS.E2E.Cluster.Tests.Infrastructure;
using Shouldly;
using Xunit;

namespace NATS.E2E.Cluster.Tests;

public class GatewayFailoverTests(GatewayPairFixture fixture) : IClassFixture<GatewayPairFixture>
{
    [Fact]
    public async Task Gateway_Disconnect_Reconnects()
    {
        // Kill gateway B
        await fixture.KillNode(1);

        // Restart gateway B
        await fixture.RestartNode(1);
        await fixture.WaitForGatewayConnectionAsync();

        // Verify messaging resumes: pub on A, sub on B
        await using var clientA = fixture.CreateClientA();
        await using var clientB = fixture.CreateClientB();
        await clientA.ConnectAsync();
        await clientB.ConnectAsync();

        await using var sub = await clientB.SubscribeCoreAsync<string>("e2e.gw.reconnect");
        await clientB.PingAsync();

        // Allow interest propagation across gateway
        await Task.Delay(1000);

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        await clientA.PublishAsync("e2e.gw.reconnect", "after-reconnect", cancellationToken: cts.Token);
        await clientA.PingAsync(cts.Token);

        var msg = await sub.Msgs.ReadAsync(cts.Token);
        msg.Data.ShouldBe("after-reconnect");
    }

    [Fact]
    public async Task Gateway_InterestUpdated_AfterReconnect()
    {
        // Subscribe on B, verify delivery from A
        await using var clientA1 = fixture.CreateClientA();
        await using var clientB1 = fixture.CreateClientB();
        await clientA1.ConnectAsync();
        await clientB1.ConnectAsync();

        await using var sub1 = await clientB1.SubscribeCoreAsync<string>("e2e.gw.interest");
        await clientB1.PingAsync();
        await Task.Delay(1000);

        using var cts1 = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        await clientA1.PublishAsync("e2e.gw.interest", "before-kill", cancellationToken: cts1.Token);
        var msg1 = await sub1.Msgs.ReadAsync(cts1.Token);
        msg1.Data.ShouldBe("before-kill");

        // Kill B, restart, resubscribe
        await fixture.KillNode(1);
        await fixture.RestartNode(1);
        await fixture.WaitForGatewayConnectionAsync();

        await using var clientB2 = fixture.CreateClientB();
        await clientB2.ConnectAsync();
        await using var sub2 = await clientB2.SubscribeCoreAsync<string>("e2e.gw.interest");
        await clientB2.PingAsync();
        await Task.Delay(1000);

        using var cts2 = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        await clientA1.PublishAsync("e2e.gw.interest", "after-restart", cancellationToken: cts2.Token);
        var msg2 = await sub2.Msgs.ReadAsync(cts2.Token);
        msg2.Data.ShouldBe("after-restart");
    }
}
```

**Step 2: Run tests**

Run: `dotnet test tests/NATS.E2E.Cluster.Tests --filter "FullyQualifiedName~GatewayFailover" -v normal`
Expected: All 2 PASS

**Step 3: Commit**

```bash
git add tests/NATS.E2E.Cluster.Tests/GatewayFailoverTests.cs
git commit -m "test: add gateway failover E2E tests (disconnect/reconnect, interest update)"
```

---

### Task 10: Write LeafNodeFailoverTests (2 tests)

**Files:**

- Create: `tests/NATS.E2E.Cluster.Tests/LeafNodeFailoverTests.cs`

**Step 1: Write the tests**

```csharp
using NATS.Client.Core;
using NATS.E2E.Cluster.Tests.Infrastructure;
using Shouldly;
using Xunit;

namespace NATS.E2E.Cluster.Tests;

public class LeafNodeFailoverTests(HubLeafFixture fixture) : IClassFixture<HubLeafFixture>
{
    [Fact]
    public async Task Leaf_Disconnect_ReconnectsToHub()
    {
        // Kill leaf
        await fixture.KillNode(1);

        // Restart leaf
        await fixture.RestartNode(1);
        await fixture.WaitForLeafConnectionAsync();

        // Verify messaging: pub on hub, sub on leaf
        await using var hub = fixture.CreateHubClient();
        await using var leaf = fixture.CreateLeafClient();
        await hub.ConnectAsync();
        await leaf.ConnectAsync();

        await using var sub = await leaf.SubscribeCoreAsync<string>("e2e.leaf.reconnect");
        await leaf.PingAsync();
        await Task.Delay(1000);

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        await hub.PublishAsync("e2e.leaf.reconnect", "leaf-back", cancellationToken: cts.Token);
        await hub.PingAsync(cts.Token);

        var msg = await sub.Msgs.ReadAsync(cts.Token);
        msg.Data.ShouldBe("leaf-back");
    }

    [Fact]
    public async Task Leaf_HubRestart_LeafReconnects()
    {
        // Kill hub
        await fixture.KillNode(0);

        // Restart hub
        await fixture.RestartNode(0);

        // Wait for leaf to reconnect (leaf has exponential backoff, may take a few seconds)
        await fixture.WaitForLeafConnectionAsync();

        // Verify messaging: pub on leaf, sub on hub
        await using var hub = fixture.CreateHubClient();
        await using var leaf = fixture.CreateLeafClient();
        await hub.ConnectAsync();
        await leaf.ConnectAsync();

        await using var sub = await hub.SubscribeCoreAsync<string>("e2e.leaf.hubrestart");
        await hub.PingAsync();
        await Task.Delay(1000);

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        await leaf.PublishAsync("e2e.leaf.hubrestart", "hub-back", cancellationToken: cts.Token);
        await leaf.PingAsync(cts.Token);

        var msg = await sub.Msgs.ReadAsync(cts.Token);
        msg.Data.ShouldBe("hub-back");
    }
}
```

**Step 2: Run tests**

Run: `dotnet test tests/NATS.E2E.Cluster.Tests --filter "FullyQualifiedName~LeafNodeFailover" -v normal`
Expected: All 2 PASS

**Step 3: Commit**

```bash
git add tests/NATS.E2E.Cluster.Tests/LeafNodeFailoverTests.cs
git commit -m "test: add leaf node failover E2E tests (leaf restart, hub restart)"
```

---

### Task 11: Run full suite and final commit

**Step 1: Build the server host (E2E tests run against the compiled binary)**

Run: `dotnet build src/NATS.Server.Host`
Expected: Build succeeded

**Step 2: Run the full E2E cluster test suite**

Run: `dotnet test tests/NATS.E2E.Cluster.Tests -v normal`
Expected: 16 tests, all PASS

**Step 3: Run the original E2E suite to verify no regressions**

Run: `dotnet test tests/NATS.E2E.Tests -v normal`
Expected: 78 tests, all PASS

**Step 4: Final commit if any cleanup needed**

```bash
git add -A
git commit -m "test: finalize NATS.E2E.Cluster.Tests — 16 tests all passing"
```