diff --git a/tests/NATS.E2E.Cluster.Tests/Infrastructure/GatewayPairFixture.cs b/tests/NATS.E2E.Cluster.Tests/Infrastructure/GatewayPairFixture.cs new file mode 100644 index 0000000..652089d --- /dev/null +++ b/tests/NATS.E2E.Cluster.Tests/Infrastructure/GatewayPairFixture.cs @@ -0,0 +1,167 @@ +using System.Net.Http.Json; +using System.Text.Json.Serialization; +using NATS.Client.Core; + +namespace NATS.E2E.Cluster.Tests.Infrastructure; + +public sealed class GatewayPairFixture : IAsyncLifetime +{ + private readonly NatsServerProcess?[] _servers = new NatsServerProcess?[2]; + private readonly int[] _clientPorts = new int[2]; + private readonly int[] _gatewayPorts = new int[2]; + private readonly int[] _monitorPorts = new int[2]; + private readonly string[] _configs = new string[2]; + + public async Task InitializeAsync() + { + // Pre-allocate all ports upfront so they can be reused across kill/restart + for (var i = 0; i < 2; i++) + { + _clientPorts[i] = NatsServerProcess.AllocateFreePort(); + _gatewayPorts[i] = NatsServerProcess.AllocateFreePort(); + _monitorPorts[i] = NatsServerProcess.AllocateFreePort(); + } + + _configs[0] = $$""" + server_name: gw-a + gateway { + name: cluster-a + listen: 127.0.0.1:{{_gatewayPorts[0]}} + gateways: [ + { name: cluster-b, url: nats://127.0.0.1:{{_gatewayPorts[1]}} } + ] + } + """; + + _configs[1] = $$""" + server_name: gw-b + gateway { + name: cluster-b + listen: 127.0.0.1:{{_gatewayPorts[1]}} + gateways: [ + { name: cluster-a, url: nats://127.0.0.1:{{_gatewayPorts[0]}} } + ] + } + """; + + for (var i = 0; i < 2; i++) + { + _servers[i] = new NatsServerProcess( + _clientPorts[i], + configContent: _configs[i], + enableMonitoring: true, + monitorPort: _monitorPorts[i]); + } + + await Task.WhenAll(_servers.Select(s => s!.StartAsync())); + + await WaitForGatewayConnectionAsync(); + } + + /// + /// Kills the server at the given index (0=A, 1=B) by disposing it. + /// + public async Task KillNode(int index) + { + if (_servers[index] is { } server) + { + await server.DisposeAsync(); + _servers[index] = null; + } + } + + /// + /// Restarts the server at the given index on its originally allocated ports. + /// + public async Task RestartNode(int index) + { + // Dispose any existing process first (idempotent) + if (_servers[index] is not null) + await KillNode(index); + + _servers[index] = new NatsServerProcess( + _clientPorts[index], + configContent: _configs[index], + enableMonitoring: true, + monitorPort: _monitorPorts[index]); + + await _servers[index]!.StartAsync(); + } + + /// + /// Polls /gatewayz on alive nodes until NumGateways >= 1 for each. + /// Times out after 30 seconds. + /// + public async Task WaitForGatewayConnectionAsync() + { + using var http = new HttpClient(); + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(200)); + + while (await timer.WaitForNextTickAsync(timeout.Token).ConfigureAwait(false)) + { + var allConnected = true; + + for (var i = 0; i < 2; i++) + { + // Skip killed nodes + if (_servers[i] is null) + continue; + + try + { + var gatewayz = await http.GetFromJsonAsync( + $"http://127.0.0.1:{_monitorPorts[i]}/gatewayz", + timeout.Token); + + if (gatewayz?.NumGateways < 1) + { + allConnected = false; + break; + } + } + catch (HttpRequestException) + { + // Monitor not yet ready — retry on next tick + allConnected = false; + break; + } + } + + if (allConnected) + return; + } + + throw new TimeoutException("Gateways did not connect to each other within 30s."); + } + + /// + /// Creates a NatsConnection pointed at server A (index 0). + /// + public NatsConnection CreateClientA() + => new(new NatsOpts { Url = $"nats://127.0.0.1:{_clientPorts[0]}" }); + + /// + /// Creates a NatsConnection pointed at server B (index 1). + /// + public NatsConnection CreateClientB() + => new(new NatsOpts { Url = $"nats://127.0.0.1:{_clientPorts[1]}" }); + + public async Task DisposeAsync() + { + var disposeTasks = _servers + .Where(s => s is not null) + .Select(s => s!.DisposeAsync().AsTask()); + + await Task.WhenAll(disposeTasks); + } + + private sealed class Gatewayz + { + [JsonPropertyName("num_gateways")] + public int NumGateways { get; init; } + } +} + +[CollectionDefinition("E2E-GatewayPair")] +public class GatewayPairCollection : ICollectionFixture;