Files
natsdotnet/tests/NATS.E2E.Cluster.Tests/Infrastructure/ThreeNodeClusterFixture.cs

165 lines
5.1 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using System.Net.Http.Json;
using System.Text.Json.Serialization;
using NATS.Client.Core;
namespace NATS.E2E.Cluster.Tests.Infrastructure;
/// <summary>
/// Test fixture that starts a three-node NATS cluster on loopback, with every node listing all
/// cluster listeners as routes, and exposes helpers to kill/restart individual nodes, wait for
/// the route mesh to form, and create clients against a chosen node.
/// </summary>
public sealed class ThreeNodeClusterFixture : IAsyncLifetime
{
    /// <summary>Number of nodes in the cluster; valid node indexes are 0 to NodeCount - 1.</summary>
    private const int NodeCount = 3;

    /// <summary>Upper bound on how long <see cref="WaitForFullMeshAsync"/> keeps polling.</summary>
    private static readonly TimeSpan MeshTimeout = TimeSpan.FromSeconds(30);

    /// <summary>Delay between two consecutive /routez polling rounds.</summary>
    private static readonly TimeSpan MeshPollInterval = TimeSpan.FromMilliseconds(200);

    // A null slot means the node at that index is currently killed (or was never started).
    private readonly NatsServerProcess?[] _servers = new NatsServerProcess?[NodeCount];
    private readonly int[] _clientPorts = new int[NodeCount];
    private readonly int[] _clusterPorts = new int[NodeCount];
    private readonly int[] _monitorPorts = new int[NodeCount];
    private readonly string[] _configs = new string[NodeCount];

    /// <summary>
    /// Allocates ports, builds one config per node, starts all nodes concurrently and then
    /// waits until the route mesh is fully formed.
    /// </summary>
    public async Task InitializeAsync()
    {
        // Pre-allocate all ports upfront so they can be reused across kill/restart
        for (var i = 0; i < NodeCount; i++)
        {
            _clientPorts[i] = NatsServerProcess.AllocateFreePort();
            _clusterPorts[i] = NatsServerProcess.AllocateFreePort();
            _monitorPorts[i] = NatsServerProcess.AllocateFreePort();
        }

        // Every node lists the cluster listener of every node (itself included), one per line.
        var routes = string.Join(
            "\n",
            _clusterPorts.Select(port => $"nats-route://127.0.0.1:{port}"));

        for (var i = 0; i < NodeCount; i++)
        {
            _configs[i] = $$"""
                server_name: node{{i + 1}}
                cluster {
                name: e2e-cluster
                listen: 127.0.0.1:{{_clusterPorts[i]}}
                pool_size: 1
                routes: [
                {{routes}}
                ]
                }
                """;
            _servers[i] = CreateServer(i);
        }

        await Task.WhenAll(_servers.Select(s => s!.StartAsync()));
        await WaitForFullMeshAsync();
    }

    /// <summary>
    /// Kills the server at the given index (0-2) by disposing it. Does nothing when the node
    /// is already killed.
    /// </summary>
    /// <param name="index">Zero-based node index.</param>
    public async Task KillNode(int index)
    {
        if (_servers[index] is { } server)
        {
            await server.DisposeAsync();
            _servers[index] = null;
        }
    }

    /// <summary>
    /// Restarts the server at the given index (0-2) on its originally allocated ports and with
    /// its original config. A still-running node is disposed first.
    /// </summary>
    /// <param name="index">Zero-based node index.</param>
    public async Task RestartNode(int index)
    {
        // KillNode is a no-op for an already-killed node, so no extra null check is needed.
        await KillNode(index);

        var server = CreateServer(index);
        _servers[index] = server;
        await server.StartAsync();
    }

    /// <summary>
    /// Polls /routez on all alive nodes until each reports at least
    /// <paramref name="expectedRoutes"/> routes. Times out after 30 seconds.
    /// </summary>
    /// <param name="expectedRoutes">Minimum route count every alive node must report.</param>
    /// <exception cref="TimeoutException">The mesh did not form within the timeout.</exception>
    public async Task WaitForFullMeshAsync(int expectedRoutes = 2)
    {
        using var http = new HttpClient();
        using var timeout = new CancellationTokenSource(MeshTimeout);
        using var timer = new PeriodicTimer(MeshPollInterval);

        var timeoutMessage =
            $"Cluster did not reach {expectedRoutes} routes per node within {MeshTimeout.TotalSeconds}s.";

        try
        {
            while (await timer.WaitForNextTickAsync(timeout.Token).ConfigureAwait(false))
            {
                if (await AllAliveNodesMeshedAsync(http, expectedRoutes, timeout.Token).ConfigureAwait(false))
                {
                    return;
                }
            }
        }
        catch (OperationCanceledException ex) when (timeout.IsCancellationRequested)
        {
            // Cancellation of the timeout token is how the deadline is observed (both the timer
            // wait and the HTTP call throw on it); translate it into the documented exception.
            throw new TimeoutException(timeoutMessage, ex);
        }

        // Reached only if the timer stops ticking without the token being cancelled.
        throw new TimeoutException(timeoutMessage);
    }

    /// <summary>
    /// Creates a NatsConnection pointed at the given node index (0-2).
    /// </summary>
    /// <param name="index">Zero-based node index; defaults to the first node.</param>
    public NatsConnection CreateClient(int index = 0)
        => new(new NatsOpts { Url = $"nats://127.0.0.1:{_clientPorts[index]}" });

    /// <summary>
    /// Returns the captured stdout/stderr output from the server at the given index, or an
    /// empty string when that node is currently killed or has no output.
    /// </summary>
    /// <param name="index">Zero-based node index.</param>
    public string GetServerOutput(int index)
        => _servers[index]?.Output ?? string.Empty;

    /// <summary>
    /// Disposes every node that is still running, all concurrently.
    /// </summary>
    public async Task DisposeAsync()
    {
        var disposeTasks = _servers
            .Where(s => s is not null)
            .Select(s => s!.DisposeAsync().AsTask());
        await Task.WhenAll(disposeTasks);
    }

    /// <summary>
    /// Builds (without starting) the server process for a node from its pre-allocated ports
    /// and stored config, so initial start and restart use identical settings.
    /// </summary>
    private NatsServerProcess CreateServer(int index)
        => new(
            _clientPorts[index],
            extraArgs: ["-DV"],
            configContent: _configs[index],
            enableMonitoring: true,
            monitorPort: _monitorPorts[index]);

    /// <summary>
    /// Runs one polling round: returns true only when every alive node answered /routez with at
    /// least <paramref name="expectedRoutes"/> routes. Cancellation is allowed to propagate.
    /// </summary>
    private async Task<bool> AllAliveNodesMeshedAsync(
        HttpClient http,
        int expectedRoutes,
        CancellationToken cancellationToken)
    {
        for (var i = 0; i < NodeCount; i++)
        {
            // Skip killed nodes
            if (_servers[i] is null)
            {
                continue;
            }

            try
            {
                var routez = await http.GetFromJsonAsync<Routez>(
                    $"http://127.0.0.1:{_monitorPorts[i]}/routez",
                    cancellationToken).ConfigureAwait(false);

                // A null payload carries no route count, so it must not count as "connected".
                if (routez is null || routez.NumRoutes < expectedRoutes)
                {
                    return false;
                }
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                // Node not reachable yet or returned an unreadable body: retry on the next tick.
                return false;
            }
        }

        return true;
    }

    /// <summary>Subset of the /routez monitoring response that this fixture reads.</summary>
    private sealed class Routez
    {
        [JsonPropertyName("num_routes")]
        public int NumRoutes { get; init; }
    }
}
/// <summary>
/// Marker type that defines the "E2E-ThreeNodeCluster" test collection and attaches
/// <see cref="ThreeNodeClusterFixture"/> to it as the collection fixture. It has no members.
/// </summary>
[CollectionDefinition("E2E-ThreeNodeCluster")]
public class ThreeNodeClusterCollection : ICollectionFixture<ThreeNodeClusterFixture>
{
}