diff --git a/tests/NATS.E2E.Cluster.Tests/Infrastructure/JetStreamClusterFixture.cs b/tests/NATS.E2E.Cluster.Tests/Infrastructure/JetStreamClusterFixture.cs
new file mode 100644
index 0000000..c0b9e11
--- /dev/null
+++ b/tests/NATS.E2E.Cluster.Tests/Infrastructure/JetStreamClusterFixture.cs
@@ -0,0 +1,196 @@
+using System.Net.Http.Json;
+using System.Text.Json.Serialization;
+using NATS.Client.Core;
+
+namespace NATS.E2E.Cluster.Tests.Infrastructure;
+
+public sealed class JetStreamClusterFixture : IAsyncLifetime
+{
+ private readonly NatsServerProcess?[] _servers = new NatsServerProcess?[3];
+ private readonly int[] _clientPorts = new int[3];
+ private readonly int[] _clusterPorts = new int[3];
+ private readonly int[] _monitorPorts = new int[3];
+ private readonly string[] _configs = new string[3];
+ private readonly string[] _storeDirs = new string[3];
+
+ public async Task InitializeAsync()
+ {
+ // Pre-allocate all ports upfront so they can be reused across kill/restart
+ for (var i = 0; i < 3; i++)
+ {
+ _clientPorts[i] = NatsServerProcess.AllocateFreePort();
+ _clusterPorts[i] = NatsServerProcess.AllocateFreePort();
+ _monitorPorts[i] = NatsServerProcess.AllocateFreePort();
+ }
+
+ // Create unique store directories for this fixture instance
+ var guid = Guid.NewGuid().ToString("N");
+ for (var i = 0; i < 3; i++)
+ {
+ _storeDirs[i] = $"/tmp/nats-e2e-js-cluster-{guid}-node{i}";
+ Directory.CreateDirectory(_storeDirs[i]);
+ }
+
+ var routes = $"""
+ nats-route://127.0.0.1:{_clusterPorts[0]}
+ nats-route://127.0.0.1:{_clusterPorts[1]}
+ nats-route://127.0.0.1:{_clusterPorts[2]}
+ """;
+
+ for (var i = 0; i < 3; i++)
+ {
+ _configs[i] = $$"""
+ server_name: node{{i + 1}}
+ cluster {
+ name: e2e-js-cluster
+ listen: 127.0.0.1:{{_clusterPorts[i]}}
+ pool_size: 1
+ routes: [
+ {{routes}}
+ ]
+ }
+ jetstream {
+ store_dir: "{{_storeDirs[i]}}"
+ max_mem_store: 64mb
+ max_file_store: 256mb
+ }
+ """;
+
+ _servers[i] = new NatsServerProcess(
+ _clientPorts[i],
+ extraArgs: ["-DV"],
+ configContent: _configs[i],
+ enableMonitoring: true,
+ monitorPort: _monitorPorts[i]);
+ }
+
+ await Task.WhenAll(_servers.Select(s => s!.StartAsync()));
+
+ await WaitForFullMeshAsync();
+ }
+
+ ///
+ /// Kills the server at the given index (0–2) by disposing it.
+ ///
+ public async Task KillNode(int index)
+ {
+ if (_servers[index] is { } server)
+ {
+ await server.DisposeAsync();
+ _servers[index] = null;
+ }
+ }
+
+ ///
+ /// Restarts the server at the given index on its originally allocated ports.
+ /// Store dir is preserved to maintain JetStream state for catchup tests.
+ ///
+ public async Task RestartNode(int index)
+ {
+ // Dispose any existing process first (idempotent)
+ if (_servers[index] is not null)
+ await KillNode(index);
+
+ // Store dir is intentionally NOT deleted — preserves JetStream state for catchup tests
+ _servers[index] = new NatsServerProcess(
+ _clientPorts[index],
+ extraArgs: ["-DV"],
+ configContent: _configs[index],
+ enableMonitoring: true,
+ monitorPort: _monitorPorts[index]);
+
+ await _servers[index]!.StartAsync();
+ }
+
+ ///
+ /// Polls /routez on all alive nodes until each reports NumRoutes >= expectedRoutes.
+ /// Times out after 30 seconds.
+ ///
+ public async Task WaitForFullMeshAsync(int expectedRoutes = 2)
+ {
+ using var http = new HttpClient();
+ using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(30));
+ using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(200));
+
+ while (await timer.WaitForNextTickAsync(timeout.Token).ConfigureAwait(false))
+ {
+ var allConnected = true;
+
+ for (var i = 0; i < 3; i++)
+ {
+ // Skip killed nodes
+ if (_servers[i] is null)
+ continue;
+
+ try
+ {
+ var routez = await http.GetFromJsonAsync(
+ $"http://127.0.0.1:{_monitorPorts[i]}/routez",
+ timeout.Token);
+
+ if (routez?.NumRoutes < expectedRoutes)
+ {
+ allConnected = false;
+ break;
+ }
+ }
+ catch
+ {
+ allConnected = false;
+ break;
+ }
+ }
+
+ if (allConnected)
+ return;
+ }
+
+ throw new TimeoutException($"Cluster did not reach {expectedRoutes} routes per node within 30s.");
+ }
+
+ ///
+ /// Creates a NatsConnection pointed at the given node index (0–2).
+ ///
+ public NatsConnection CreateClient(int index = 0)
+ => new(new NatsOpts { Url = $"nats://127.0.0.1:{_clientPorts[index]}" });
+
+ ///
+ /// Returns the captured stdout/stderr output from the server at the given index.
+ ///
+ public string GetServerOutput(int index)
+ => _servers[index]?.Output ?? string.Empty;
+
+ public async Task DisposeAsync()
+ {
+ var disposeTasks = _servers
+ .Where(s => s is not null)
+ .Select(s => s!.DisposeAsync().AsTask());
+
+ await Task.WhenAll(disposeTasks);
+
+ // Delete store directories after servers are stopped
+ for (var i = 0; i < 3; i++)
+ {
+ if (_storeDirs[i] is { Length: > 0 } dir && Directory.Exists(dir))
+ {
+ try
+ {
+ Directory.Delete(dir, recursive: true);
+ }
+ catch (Exception ex)
+ {
+ Console.Error.WriteLine($"[JetStreamClusterFixture] Failed to delete store dir '{dir}': {ex.Message}");
+ }
+ }
+ }
+ }
+
+ private sealed class Routez
+ {
+ [JsonPropertyName("num_routes")]
+ public int NumRoutes { get; init; }
+ }
+}
+
+[CollectionDefinition("E2E-JetStreamCluster")]
+public class JetStreamClusterCollection : ICollectionFixture;