From 75ad411d836f852c0b7311942e1803807093f3b9 Mon Sep 17 00:00:00 2001
From: Joseph Doherty
Date: Thu, 12 Mar 2026 23:31:49 -0400
Subject: [PATCH] feat: add JetStreamClusterFixture for R3 replication tests

---
 .../Infrastructure/JetStreamClusterFixture.cs | 230 ++++++++++++++++++
 1 file changed, 230 insertions(+)
 create mode 100644 tests/NATS.E2E.Cluster.Tests/Infrastructure/JetStreamClusterFixture.cs

diff --git a/tests/NATS.E2E.Cluster.Tests/Infrastructure/JetStreamClusterFixture.cs b/tests/NATS.E2E.Cluster.Tests/Infrastructure/JetStreamClusterFixture.cs
new file mode 100644
index 0000000..c0b9e11
--- /dev/null
+++ b/tests/NATS.E2E.Cluster.Tests/Infrastructure/JetStreamClusterFixture.cs
@@ -0,0 +1,230 @@
+using System.Net.Http.Json;
+using System.Text.Json.Serialization;
+using NATS.Client.Core;
+
+namespace NATS.E2E.Cluster.Tests.Infrastructure;
+
+/// <summary>
+/// Test fixture that runs three clustered NATS server processes on loopback, each with its
+/// own JetStream store directory, and lets tests kill and restart individual nodes.
+/// </summary>
+public sealed class JetStreamClusterFixture : IAsyncLifetime
+{
+    private readonly NatsServerProcess?[] _servers = new NatsServerProcess?[3];
+    private readonly int[] _clientPorts = new int[3];
+    private readonly int[] _clusterPorts = new int[3];
+    private readonly int[] _monitorPorts = new int[3];
+    private readonly string[] _configs = new string[3];
+    private readonly string[] _storeDirs = new string[3];
+
+    /// <summary>
+    /// Allocates ports, creates the store directories, builds each node's config,
+    /// starts all nodes concurrently, and waits for the route mesh to form.
+    /// </summary>
+    public async Task InitializeAsync()
+    {
+        // Pre-allocate all ports upfront so they can be reused across kill/restart
+        for (var i = 0; i < 3; i++)
+        {
+            _clientPorts[i] = NatsServerProcess.AllocateFreePort();
+            _clusterPorts[i] = NatsServerProcess.AllocateFreePort();
+            _monitorPorts[i] = NatsServerProcess.AllocateFreePort();
+        }
+
+        // Create unique store directories for this fixture instance
+        var guid = Guid.NewGuid().ToString("N");
+        for (var i = 0; i < 3; i++)
+        {
+            _storeDirs[i] = $"/tmp/nats-e2e-js-cluster-{guid}-node{i}";
+            Directory.CreateDirectory(_storeDirs[i]);
+        }
+
+        var routes = $"""
+            nats-route://127.0.0.1:{_clusterPorts[0]}
+            nats-route://127.0.0.1:{_clusterPorts[1]}
+            nats-route://127.0.0.1:{_clusterPorts[2]}
+            """;
+
+        for (var i = 0; i < 3; i++)
+        {
+            _configs[i] = $$"""
+                server_name: node{{i + 1}}
+                cluster {
+                  name: e2e-js-cluster
+                  listen: 127.0.0.1:{{_clusterPorts[i]}}
+                  pool_size: 1
+                  routes: [
+                    {{routes}}
+                  ]
+                }
+                jetstream {
+                  store_dir: "{{_storeDirs[i]}}"
+                  max_mem_store: 64mb
+                  max_file_store: 256mb
+                }
+                """;
+
+            _servers[i] = CreateServer(i);
+        }
+
+        await Task.WhenAll(_servers.Select(s => s!.StartAsync()));
+
+        await WaitForFullMeshAsync();
+    }
+
+    /// <summary>
+    /// Kills the server at the given index (0–2) by disposing it.
+    /// </summary>
+    public async Task KillNode(int index)
+    {
+        if (_servers[index] is { } server)
+        {
+            await server.DisposeAsync();
+            _servers[index] = null;
+        }
+    }
+
+    /// <summary>
+    /// Restarts the server at the given index on its originally allocated ports.
+    /// Store dir is preserved to maintain JetStream state for catchup tests.
+    /// </summary>
+    public async Task RestartNode(int index)
+    {
+        // Dispose any existing process first; KillNode is a no-op for an already-killed node
+        await KillNode(index);
+
+        // Store dir is intentionally NOT deleted — preserves JetStream state for catchup tests
+        var server = CreateServer(index);
+        _servers[index] = server;
+
+        await server.StartAsync();
+    }
+
+    /// <summary>
+    /// Polls /routez on all alive nodes until each reports NumRoutes >= expectedRoutes.
+    /// </summary>
+    /// <param name="expectedRoutes">Minimum route count every alive node must report.</param>
+    /// <exception cref="TimeoutException">The condition was not met within 30 seconds.</exception>
+    public async Task WaitForFullMeshAsync(int expectedRoutes = 2)
+    {
+        using var http = new HttpClient();
+        using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(30));
+        using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(200));
+
+        try
+        {
+            while (await timer.WaitForNextTickAsync(timeout.Token).ConfigureAwait(false))
+            {
+                if (await AllAliveNodesMeshedAsync(http, expectedRoutes, timeout.Token).ConfigureAwait(false))
+                    return;
+            }
+        }
+        catch (OperationCanceledException) when (timeout.IsCancellationRequested)
+        {
+            // The 30s budget expired while waiting for a tick or a response; surface it as
+            // the TimeoutException below rather than as a cancellation.
+        }
+
+        throw new TimeoutException($"Cluster did not reach {expectedRoutes} routes per node within 30s.");
+    }
+
+    /// <summary>
+    /// Creates a NatsConnection pointed at the given node index (0–2).
+    /// </summary>
+    public NatsConnection CreateClient(int index = 0)
+        => new(new NatsOpts { Url = $"nats://127.0.0.1:{_clientPorts[index]}" });
+
+    /// <summary>
+    /// Returns the captured stdout/stderr output from the server at the given index.
+    /// </summary>
+    public string GetServerOutput(int index)
+        => _servers[index]?.Output ?? string.Empty;
+
+    /// <summary>
+    /// Disposes every running node, then deletes the per-node store directories.
+    /// </summary>
+    public async Task DisposeAsync()
+    {
+        var disposeTasks = _servers
+            .Where(s => s is not null)
+            .Select(s => s!.DisposeAsync().AsTask());
+
+        try
+        {
+            await Task.WhenAll(disposeTasks);
+        }
+        finally
+        {
+            // Delete store directories once disposal has finished, even if a node failed
+            // to dispose, so a bad shutdown does not leave store data behind
+            for (var i = 0; i < 3; i++)
+            {
+                if (_storeDirs[i] is { Length: > 0 } dir && Directory.Exists(dir))
+                {
+                    try
+                    {
+                        Directory.Delete(dir, recursive: true);
+                    }
+                    catch (Exception ex)
+                    {
+                        Console.Error.WriteLine($"[JetStreamClusterFixture] Failed to delete store dir '{dir}': {ex.Message}");
+                    }
+                }
+            }
+        }
+    }
+
+    /// <summary>
+    /// Constructs the server process for the given node from its allocated ports and
+    /// stored config. The caller is responsible for starting it.
+    /// </summary>
+    private NatsServerProcess CreateServer(int index)
+        => new NatsServerProcess(
+            _clientPorts[index],
+            extraArgs: ["-DV"],
+            configContent: _configs[index],
+            enableMonitoring: true,
+            monitorPort: _monitorPorts[index]);
+
+    /// <summary>
+    /// Queries /routez once on each alive node. Returns false as soon as a node cannot be
+    /// queried, returns no payload, or reports fewer than expectedRoutes routes.
+    /// </summary>
+    private async Task<bool> AllAliveNodesMeshedAsync(HttpClient http, int expectedRoutes, CancellationToken cancellationToken)
+    {
+        for (var i = 0; i < 3; i++)
+        {
+            // Skip killed nodes
+            if (_servers[i] is null)
+                continue;
+
+            try
+            {
+                var routez = await http.GetFromJsonAsync<Routez>(
+                    $"http://127.0.0.1:{_monitorPorts[i]}/routez",
+                    cancellationToken).ConfigureAwait(false);
+
+                // A null payload carries no route count, so it must not count as meshed
+                if (routez is null || routez.NumRoutes < expectedRoutes)
+                    return false;
+            }
+            catch (Exception ex) when (ex is not OperationCanceledException || !cancellationToken.IsCancellationRequested)
+            {
+                // Node not answering yet (or unreadable payload): not meshed, retry next tick.
+                // Cancellation caused by the overall timeout is left to propagate.
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    private sealed class Routez
+    {
+        [JsonPropertyName("num_routes")]
+        public int NumRoutes { get; init; }
+    }
+}
+
+[CollectionDefinition("E2E-JetStreamCluster")]
+public class JetStreamClusterCollection : ICollectionFixture<JetStreamClusterFixture>;