197 lines
6.3 KiB
C#
197 lines
6.3 KiB
C#
using System.Net.Http.Json;
|
||
using System.Text.Json.Serialization;
|
||
using NATS.Client.Core;
|
||
|
||
namespace NATS.E2E.Cluster.Tests.Infrastructure;
|
||
|
||
/// <summary>
/// Test fixture that starts a three-node NATS cluster with JetStream enabled,
/// waits for the route mesh to form, and tears everything down afterwards.
/// Individual nodes can be killed and restarted on their original ports.
/// </summary>
public sealed class JetStreamClusterFixture : IAsyncLifetime
{
    /// <summary>Number of server nodes managed by this fixture.</summary>
    private const int NodeCount = 3;

    /// <summary>Upper bound on how long <see cref="WaitForFullMeshAsync"/> polls.</summary>
    private static readonly TimeSpan MeshTimeout = TimeSpan.FromSeconds(30);

    /// <summary>Delay between two consecutive /routez polling rounds.</summary>
    private static readonly TimeSpan MeshPollInterval = TimeSpan.FromMilliseconds(200);

    // A null slot means the node at that index is currently killed.
    private readonly NatsServerProcess?[] _servers = new NatsServerProcess?[NodeCount];
    private readonly int[] _clientPorts = new int[NodeCount];
    private readonly int[] _clusterPorts = new int[NodeCount];
    private readonly int[] _monitorPorts = new int[NodeCount];
    private readonly string[] _configs = new string[NodeCount];
    private readonly string[] _storeDirs = new string[NodeCount];

    /// <summary>
    /// Allocates ports and store directories, builds one config per node,
    /// starts all nodes concurrently and waits until the route mesh is complete.
    /// </summary>
    /// <exception cref="TimeoutException">The mesh did not form in time.</exception>
    public async Task InitializeAsync()
    {
        // Pre-allocate all ports upfront so they can be reused across kill/restart
        for (var i = 0; i < NodeCount; i++)
        {
            _clientPorts[i] = NatsServerProcess.AllocateFreePort();
            _clusterPorts[i] = NatsServerProcess.AllocateFreePort();
            _monitorPorts[i] = NatsServerProcess.AllocateFreePort();
        }

        // Create unique store directories for this fixture instance
        var guid = Guid.NewGuid().ToString("N");
        for (var i = 0; i < NodeCount; i++)
        {
            _storeDirs[i] = $"/tmp/nats-e2e-js-cluster-{guid}-node{i}";
            Directory.CreateDirectory(_storeDirs[i]);
        }

        // Every node lists the cluster port of every node (itself included) as a route.
        var routes = string.Join(
            "\n",
            _clusterPorts.Select(port => $"nats-route://127.0.0.1:{port}"));

        for (var i = 0; i < NodeCount; i++)
        {
            _configs[i] = $$"""
                server_name: node{{i + 1}}
                cluster {
                name: e2e-js-cluster
                listen: 127.0.0.1:{{_clusterPorts[i]}}
                pool_size: 1
                routes: [
                {{routes}}
                ]
                }
                jetstream {
                store_dir: "{{_storeDirs[i]}}"
                max_mem_store: 64mb
                max_file_store: 256mb
                }
                """;

            _servers[i] = CreateServer(i);
        }

        await Task.WhenAll(_servers.Select(s => s!.StartAsync()));

        await WaitForFullMeshAsync();
    }

    /// <summary>
    /// Kills the server at the given index (0–2) by disposing it.
    /// Does nothing when the node is already killed.
    /// </summary>
    /// <param name="index">Zero-based node index.</param>
    public async Task KillNode(int index)
    {
        if (_servers[index] is { } server)
        {
            await server.DisposeAsync();
            _servers[index] = null;
        }
    }

    /// <summary>
    /// Restarts the server at the given index on its originally allocated ports.
    /// Store dir is preserved to maintain JetStream state for catchup tests.
    /// </summary>
    /// <param name="index">Zero-based node index.</param>
    public async Task RestartNode(int index)
    {
        // Dispose any existing process first (KillNode is a no-op for a killed node)
        await KillNode(index);

        // Store dir is intentionally NOT deleted — preserves JetStream state for catchup tests
        var server = CreateServer(index);
        _servers[index] = server;

        await server.StartAsync();
    }

    /// <summary>
    /// Polls /routez on all alive nodes until each reports NumRoutes &gt;= expectedRoutes.
    /// Times out after 30 seconds.
    /// </summary>
    /// <param name="expectedRoutes">Minimum number of routes every alive node must report.</param>
    /// <exception cref="TimeoutException">
    /// Not every alive node reported enough routes before the deadline.
    /// </exception>
    public async Task WaitForFullMeshAsync(int expectedRoutes = NodeCount - 1)
    {
        using var http = new HttpClient();
        using var timeout = new CancellationTokenSource(MeshTimeout);
        using var timer = new PeriodicTimer(MeshPollInterval);

        try
        {
            while (await timer.WaitForNextTickAsync(timeout.Token).ConfigureAwait(false))
            {
                if (await AllAliveNodesMeshedAsync(http, expectedRoutes, timeout.Token).ConfigureAwait(false))
                {
                    return;
                }
            }
        }
        catch (OperationCanceledException) when (timeout.IsCancellationRequested)
        {
            // WaitForNextTickAsync signals an expired deadline by throwing rather than
            // returning false; fall through so callers get the descriptive TimeoutException.
        }

        throw new TimeoutException(
            $"Cluster did not reach {expectedRoutes} routes per node within {MeshTimeout.TotalSeconds:0}s.");
    }

    /// <summary>
    /// Creates a NatsConnection pointed at the given node index (0–2).
    /// </summary>
    /// <param name="index">Zero-based node index; defaults to the first node.</param>
    public NatsConnection CreateClient(int index = 0)
        => new(new NatsOpts { Url = $"nats://127.0.0.1:{_clientPorts[index]}" });

    /// <summary>
    /// Returns the captured stdout/stderr output from the server at the given index,
    /// or an empty string when that node is killed or has no output.
    /// </summary>
    /// <param name="index">Zero-based node index.</param>
    public string GetServerOutput(int index)
        => _servers[index]?.Output ?? string.Empty;

    /// <summary>
    /// Disposes every alive server, then deletes the store directories.
    /// Directory cleanup runs even when disposing a server throws.
    /// </summary>
    public async Task DisposeAsync()
    {
        try
        {
            var disposeTasks = _servers
                .Where(s => s is not null)
                .Select(s => s!.DisposeAsync().AsTask());

            await Task.WhenAll(disposeTasks);
        }
        finally
        {
            // Delete store directories after servers are stopped
            DeleteStoreDirectories();
        }
    }

    /// <summary>
    /// Builds (without starting) the server process for a node from its
    /// pre-allocated ports and previously generated config.
    /// </summary>
    private NatsServerProcess CreateServer(int index)
        => new(
            _clientPorts[index],
            extraArgs: ["-DV"],
            configContent: _configs[index],
            enableMonitoring: true,
            monitorPort: _monitorPorts[index]);

    /// <summary>
    /// Runs one polling round: true only when every alive node answers /routez
    /// with at least <paramref name="expectedRoutes"/> routes.
    /// </summary>
    private async Task<bool> AllAliveNodesMeshedAsync(
        HttpClient http,
        int expectedRoutes,
        CancellationToken cancellationToken)
    {
        for (var i = 0; i < NodeCount; i++)
        {
            // Skip killed nodes
            if (_servers[i] is null)
            {
                continue;
            }

            try
            {
                var routez = await http.GetFromJsonAsync<Routez>(
                    $"http://127.0.0.1:{_monitorPorts[i]}/routez",
                    cancellationToken);

                // A null payload carries no route count, so it must not count as meshed.
                if (routez is null || routez.NumRoutes < expectedRoutes)
                {
                    return false;
                }
            }
            catch (Exception)
            {
                // Best-effort poll: any request or parse failure just means "not ready yet".
                // An expired deadline is surfaced by the caller's next timer wait.
                return false;
            }
        }

        return true;
    }

    /// <summary>
    /// Deletes each store directory that exists; failures are logged to stderr, not thrown.
    /// </summary>
    private void DeleteStoreDirectories()
    {
        for (var i = 0; i < NodeCount; i++)
        {
            if (_storeDirs[i] is { Length: > 0 } dir && Directory.Exists(dir))
            {
                try
                {
                    Directory.Delete(dir, recursive: true);
                }
                catch (Exception ex)
                {
                    Console.Error.WriteLine($"[JetStreamClusterFixture] Failed to delete store dir '{dir}': {ex.Message}");
                }
            }
        }
    }

    /// <summary>Subset of the /routez monitoring response read by this fixture.</summary>
    private sealed class Routez
    {
        [JsonPropertyName("num_routes")]
        public int NumRoutes { get; init; }
    }
}
|
||
|
||
/// <summary>
/// Declares the "E2E-JetStreamCluster" test collection and attaches
/// <see cref="JetStreamClusterFixture"/> to it as the collection fixture.
/// </summary>
[CollectionDefinition("E2E-JetStreamCluster")]
public class JetStreamClusterCollection : ICollectionFixture<JetStreamClusterFixture>
{
}
|