From d8eadeb624c5c3c58868489cf015eb694d36466f Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 12 Mar 2026 23:32:10 -0400 Subject: [PATCH] feat: add HubLeafFixture for leaf node failover tests --- .../Infrastructure/HubLeafFixture.cs | 166 ++++++++++++++++++ 1 file changed, 166 insertions(+) create mode 100644 tests/NATS.E2E.Cluster.Tests/Infrastructure/HubLeafFixture.cs diff --git a/tests/NATS.E2E.Cluster.Tests/Infrastructure/HubLeafFixture.cs b/tests/NATS.E2E.Cluster.Tests/Infrastructure/HubLeafFixture.cs new file mode 100644 index 0000000..252d280 --- /dev/null +++ b/tests/NATS.E2E.Cluster.Tests/Infrastructure/HubLeafFixture.cs @@ -0,0 +1,166 @@ +using System.Net.Http.Json; +using System.Text.Json.Serialization; +using NATS.Client.Core; + +namespace NATS.E2E.Cluster.Tests.Infrastructure; + +public sealed class HubLeafFixture : IAsyncLifetime +{ + private NatsServerProcess? _hub; + private NatsServerProcess? _leaf; + + // Pre-allocated ports stored for restart reuse + private readonly int _hubPort; + private readonly int _leafPort; + private readonly int _leafListenPort; + private readonly int _hubMonitorPort; + + // Configs stored for restart reuse + private readonly string _hubConfig; + private readonly string _leafConfig; + + public int HubPort => _hubPort; + public int LeafPort => _leafPort; + + public HubLeafFixture() + { + _hubPort = NatsServerProcess.AllocateFreePort(); + _leafPort = NatsServerProcess.AllocateFreePort(); + _leafListenPort = NatsServerProcess.AllocateFreePort(); + _hubMonitorPort = NatsServerProcess.AllocateFreePort(); + + _hubConfig = $$""" + server_name: hub + leafnodes { + listen: 127.0.0.1:{{_leafListenPort}} + } + """; + + _leafConfig = $$""" + server_name: leaf + leafnodes { + remotes [ + { url: "nats-leaf://127.0.0.1:{{_leafListenPort}}" } + ] + } + """; + } + + public async Task InitializeAsync() + { + _hub = new NatsServerProcess(_hubPort, configContent: _hubConfig, enableMonitoring: true, monitorPort: _hubMonitorPort); + await _hub.StartAsync(); + + _leaf = new NatsServerProcess(_leafPort, configContent: _leafConfig); + await _leaf.StartAsync(); + + await WaitForLeafConnectionAsync(); + } + + /// + /// Kills the server at the given index (0 = hub, 1 = leaf). + /// + public async Task KillNode(int index) + { + if (index == 0 && _hub is not null) + { + await _hub.DisposeAsync(); + _hub = null; + } + else if (index == 1 && _leaf is not null) + { + await _leaf.DisposeAsync(); + _leaf = null; + } + else + { + throw new ArgumentOutOfRangeException(nameof(index), "Index must be 0 (hub) or 1 (leaf)."); + } + } + + /// + /// Restarts the server at the given index on its original ports. + /// For hub (0): restarts hub and waits for TCP ready. + /// For leaf (1): restarts leaf and waits for TCP ready. + /// After either restart, call WaitForLeafConnectionAsync separately. + /// + public async Task RestartNode(int index) + { + if (index == 0) + { + if (_hub is not null) + await _hub.DisposeAsync(); + + _hub = new NatsServerProcess(_hubPort, configContent: _hubConfig, enableMonitoring: true, monitorPort: _hubMonitorPort); + await _hub.StartAsync(); + } + else if (index == 1) + { + if (_leaf is not null) + await _leaf.DisposeAsync(); + + _leaf = new NatsServerProcess(_leafPort, configContent: _leafConfig); + await _leaf.StartAsync(); + } + else + { + throw new ArgumentOutOfRangeException(nameof(index), "Index must be 0 (hub) or 1 (leaf)."); + } + } + + /// + /// Polls hub's /leafz until NumLeafs >= 1. 30s timeout, 200ms polling. + /// Handles hub being down gracefully by retrying on network errors. + /// + public async Task WaitForLeafConnectionAsync() + { + using var http = new HttpClient { Timeout = TimeSpan.FromSeconds(2) }; + using var deadline = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(200)); + + while (await timer.WaitForNextTickAsync(deadline.Token).ConfigureAwait(false)) + { + try + { + var leafz = await http.GetFromJsonAsync( + $"http://127.0.0.1:{_hubMonitorPort}/leafz", + deadline.Token); + + if (leafz?.NumLeafs >= 1) + return; + } + catch (Exception ex) when (ex is HttpRequestException or TaskCanceledException + && !deadline.IsCancellationRequested) + { + // Hub not yet ready or per-request timeout — retry on next tick + _ = ex; + } + } + + throw new TimeoutException("Leaf node did not connect to hub within 30s."); + } + + public NatsConnection CreateHubClient() + => new(new NatsOpts { Url = $"nats://127.0.0.1:{_hubPort}" }); + + public NatsConnection CreateLeafClient() + => new(new NatsOpts { Url = $"nats://127.0.0.1:{_leafPort}" }); + + public async Task DisposeAsync() + { + if (_leaf is not null) + await _leaf.DisposeAsync(); + + if (_hub is not null) + await _hub.DisposeAsync(); + } + + private sealed class Leafz + { + [JsonPropertyName("num_leafs")] + public int NumLeafs { get; init; } + } +} + +[CollectionDefinition("E2E-HubLeaf")] +public class HubLeafCollection : ICollectionFixture;