using Microsoft.Extensions.Logging.Abstractions; using NATS.Server.Configuration; namespace NATS.Server.Tests.LeafNodes; /// /// Shared fixture for leaf node tests that creates a hub and a spoke server /// connected via leaf node protocol. /// internal sealed class LeafFixture : IAsyncDisposable { private readonly CancellationTokenSource _hubCts; private readonly CancellationTokenSource _spokeCts; private LeafFixture(NatsServer hub, NatsServer spoke, CancellationTokenSource hubCts, CancellationTokenSource spokeCts) { Hub = hub; Spoke = spoke; _hubCts = hubCts; _spokeCts = spokeCts; } public NatsServer Hub { get; } public NatsServer Spoke { get; } public static async Task StartAsync() { var hubOptions = new NatsOptions { Host = "127.0.0.1", Port = 0, LeafNode = new LeafNodeOptions { Host = "127.0.0.1", Port = 0, }, }; var hub = new NatsServer(hubOptions, NullLoggerFactory.Instance); var hubCts = new CancellationTokenSource(); _ = hub.StartAsync(hubCts.Token); await hub.WaitForReadyAsync(); var spokeOptions = new NatsOptions { Host = "127.0.0.1", Port = 0, LeafNode = new LeafNodeOptions { Host = "127.0.0.1", Port = 0, Remotes = [hub.LeafListen!], }, }; var spoke = new NatsServer(spokeOptions, NullLoggerFactory.Instance); var spokeCts = new CancellationTokenSource(); _ = spoke.StartAsync(spokeCts.Token); await spoke.WaitForReadyAsync(); using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); while (!timeout.IsCancellationRequested && (hub.Stats.Leafs == 0 || spoke.Stats.Leafs == 0)) await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default); return new LeafFixture(hub, spoke, hubCts, spokeCts); } public async Task WaitForRemoteInterestOnHubAsync(string subject) { using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); while (!timeout.IsCancellationRequested) { if (Hub.HasRemoteInterest(subject)) return; await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default); } throw new TimeoutException($"Timed out waiting for remote interest on hub for '{subject}'."); } public async Task WaitForRemoteInterestOnSpokeAsync(string subject) { using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5)); while (!timeout.IsCancellationRequested) { if (Spoke.HasRemoteInterest(subject)) return; await Task.Delay(50, timeout.Token).ContinueWith(_ => { }, TaskScheduler.Default); } throw new TimeoutException($"Timed out waiting for remote interest on spoke for '{subject}'."); } public async ValueTask DisposeAsync() { await _spokeCts.CancelAsync(); await _hubCts.CancelAsync(); Spoke.Dispose(); Hub.Dispose(); _spokeCts.Dispose(); _hubCts.Dispose(); } }