Files
natsdotnet/tests/NATS.E2E.Cluster.Tests/Infrastructure/JetStreamClusterFixture.cs

197 lines
6.3 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using System.Net.Http.Json;
using System.Text.Json.Serialization;
using NATS.Client.Core;
namespace NATS.E2E.Cluster.Tests.Infrastructure;
/// <summary>
/// xUnit async fixture that starts a three-node NATS cluster with JetStream enabled,
/// exposes helpers to kill and restart individual nodes, and removes the per-node
/// store directories when the fixture is disposed.
/// </summary>
public sealed class JetStreamClusterFixture : IAsyncLifetime
{
    /// <summary>Number of server nodes managed by this fixture.</summary>
    private const int NodeCount = 3;

    /// <summary>Total time <see cref="WaitForFullMeshAsync"/> is allowed to poll.</summary>
    private static readonly TimeSpan MeshTimeout = TimeSpan.FromSeconds(30);

    /// <summary>Delay between two consecutive /routez polling rounds.</summary>
    private static readonly TimeSpan MeshPollInterval = TimeSpan.FromMilliseconds(200);

    // A null slot marks a node that is currently killed (see KillNode).
    private readonly NatsServerProcess?[] _servers = new NatsServerProcess?[NodeCount];
    private readonly int[] _clientPorts = new int[NodeCount];
    private readonly int[] _clusterPorts = new int[NodeCount];
    private readonly int[] _monitorPorts = new int[NodeCount];
    private readonly string[] _configs = new string[NodeCount];
    private readonly string[] _storeDirs = new string[NodeCount];

    /// <summary>
    /// Allocates ports, creates the store directories, builds one config per node,
    /// starts all nodes in parallel and waits until the route mesh is established.
    /// </summary>
    public async Task InitializeAsync()
    {
        // Pre-allocate all ports upfront so they can be reused across kill/restart
        for (var i = 0; i < NodeCount; i++)
        {
            _clientPorts[i] = NatsServerProcess.AllocateFreePort();
            _clusterPorts[i] = NatsServerProcess.AllocateFreePort();
            _monitorPorts[i] = NatsServerProcess.AllocateFreePort();
        }

        // Create unique store directories for this fixture instance
        var guid = Guid.NewGuid().ToString("N");
        for (var i = 0; i < NodeCount; i++)
        {
            _storeDirs[i] = $"/tmp/nats-e2e-js-cluster-{guid}-node{i}";
            Directory.CreateDirectory(_storeDirs[i]);
        }

        // One seed route per node; this list must be kept in sync with NodeCount.
        var routes = $"""
            nats-route://127.0.0.1:{_clusterPorts[0]}
            nats-route://127.0.0.1:{_clusterPorts[1]}
            nats-route://127.0.0.1:{_clusterPorts[2]}
            """;

        for (var i = 0; i < NodeCount; i++)
        {
            // The config is stored so RestartNode can bring the node back with identical settings.
            _configs[i] = $$"""
                server_name: node{{i + 1}}
                cluster {
                name: e2e-js-cluster
                listen: 127.0.0.1:{{_clusterPorts[i]}}
                pool_size: 1
                routes: [
                {{routes}}
                ]
                }
                jetstream {
                store_dir: "{{_storeDirs[i]}}"
                max_mem_store: 64mb
                max_file_store: 256mb
                }
                """;
            _servers[i] = CreateServer(i);
        }

        await Task.WhenAll(_servers.Select(s => s!.StartAsync()));
        await WaitForFullMeshAsync();
    }

    /// <summary>
    /// Kills the server at the given index (0-2) by disposing it.
    /// Does nothing when the node is already killed.
    /// </summary>
    /// <param name="index">Zero-based node index.</param>
    public async Task KillNode(int index)
    {
        if (_servers[index] is { } server)
        {
            await server.DisposeAsync();
            _servers[index] = null;
        }
    }

    /// <summary>
    /// Restarts the server at the given index on its originally allocated ports.
    /// Store dir is preserved to maintain JetStream state for catchup tests.
    /// </summary>
    /// <param name="index">Zero-based node index.</param>
    public async Task RestartNode(int index)
    {
        // KillNode is a no-op for an already-killed node, so no extra null check is needed.
        await KillNode(index);

        // Store dir is intentionally NOT deleted — preserves JetStream state for catchup tests
        var server = CreateServer(index);
        _servers[index] = server;
        await server.StartAsync();
    }

    /// <summary>
    /// Polls /routez on all alive nodes until each reports NumRoutes &gt;= expectedRoutes.
    /// </summary>
    /// <param name="expectedRoutes">Minimum number of routes every alive node must report.</param>
    /// <exception cref="TimeoutException">
    /// The mesh did not form within 30 seconds. The last probe failure, if any,
    /// is attached as the inner exception.
    /// </exception>
    public async Task WaitForFullMeshAsync(int expectedRoutes = 2)
    {
        using var http = new HttpClient();
        using var timeout = new CancellationTokenSource(MeshTimeout);
        using var timer = new PeriodicTimer(MeshPollInterval);

        Exception? lastError = null;
        try
        {
            while (await timer.WaitForNextTickAsync(timeout.Token).ConfigureAwait(false))
            {
                var allConnected = true;
                for (var i = 0; i < NodeCount; i++)
                {
                    // Skip killed nodes
                    if (_servers[i] is null)
                    {
                        continue;
                    }

                    try
                    {
                        var routez = await http.GetFromJsonAsync<Routez>(
                            $"http://127.0.0.1:{_monitorPorts[i]}/routez",
                            timeout.Token).ConfigureAwait(false);

                        // A null payload must count as "not ready": the lifted comparison
                        // `routez?.NumRoutes < expectedRoutes` is false for null and would
                        // wrongly treat the node as connected.
                        if (routez is null || routez.NumRoutes < expectedRoutes)
                        {
                            allConnected = false;
                            break;
                        }
                    }
                    catch (Exception ex) when (ex is not OperationCanceledException)
                    {
                        // Best-effort probe: the monitor endpoint may not be listening yet.
                        // Remember the failure so a timeout can report it, then retry next tick.
                        lastError = ex;
                        allConnected = false;
                        break;
                    }
                }

                if (allConnected)
                {
                    return;
                }
            }
        }
        catch (OperationCanceledException) when (timeout.IsCancellationRequested)
        {
            // The time budget expired while waiting for a tick or a response.
            // Fall through so callers observe the documented TimeoutException.
        }

        throw new TimeoutException(
            $"Cluster did not reach {expectedRoutes} routes per node within {MeshTimeout.TotalSeconds:0}s.",
            lastError);
    }

    /// <summary>
    /// Creates a NatsConnection pointed at the given node index (0-2).
    /// </summary>
    /// <param name="index">Zero-based node index; defaults to the first node.</param>
    /// <returns>A new connection object targeting that node's client port.</returns>
    public NatsConnection CreateClient(int index = 0)
        => new(new NatsOpts { Url = $"nats://127.0.0.1:{_clientPorts[index]}" });

    /// <summary>
    /// Returns the captured stdout/stderr output from the server at the given index.
    /// </summary>
    /// <param name="index">Zero-based node index.</param>
    /// <returns>The server output, or an empty string when the node is killed or has no output.</returns>
    public string GetServerOutput(int index)
        => _servers[index]?.Output ?? string.Empty;

    /// <summary>
    /// Disposes every node that is still running, then deletes the store directories.
    /// Directory deletion failures are written to stderr and do not throw.
    /// </summary>
    public async Task DisposeAsync()
    {
        var disposeTasks = _servers
            .OfType<NatsServerProcess>()
            .Select(s => s.DisposeAsync().AsTask());
        await Task.WhenAll(disposeTasks);

        // Delete store directories after servers are stopped
        foreach (var dir in _storeDirs)
        {
            // Entries stay null/empty when InitializeAsync never reached directory creation.
            if (dir is not { Length: > 0 } || !Directory.Exists(dir))
            {
                continue;
            }

            try
            {
                Directory.Delete(dir, recursive: true);
            }
            catch (Exception ex)
            {
                Console.Error.WriteLine($"[JetStreamClusterFixture] Failed to delete store dir '{dir}': {ex.Message}");
            }
        }
    }

    /// <summary>
    /// Builds (but does not start) the server process for a node from its stored
    /// client port, config and monitor port. Used for both first start and restart.
    /// </summary>
    private NatsServerProcess CreateServer(int index)
        => new(
            _clientPorts[index],
            extraArgs: ["-DV"],
            configContent: _configs[index],
            enableMonitoring: true,
            monitorPort: _monitorPorts[index]);

    /// <summary>Subset of the /routez monitoring response that this fixture reads.</summary>
    private sealed class Routez
    {
        [JsonPropertyName("num_routes")]
        public int NumRoutes { get; init; }
    }
}
/// <summary>
/// Declares the "E2E-JetStreamCluster" test collection and attaches
/// <see cref="JetStreamClusterFixture"/> to it as the collection fixture.
/// </summary>
[CollectionDefinition("E2E-JetStreamCluster")]
public class JetStreamClusterCollection : ICollectionFixture<JetStreamClusterFixture>
{
}