diff --git a/src/NATS.Server/LeafNodes/LeafNodeManager.cs b/src/NATS.Server/LeafNodes/LeafNodeManager.cs index 8ae9784..9fe5947 100644 --- a/src/NATS.Server/LeafNodes/LeafNodeManager.cs +++ b/src/NATS.Server/LeafNodes/LeafNodeManager.cs @@ -239,6 +239,16 @@ public sealed class LeafNodeManager : IAsyncDisposable foreach (var remote in _options.Remotes.Distinct(StringComparer.OrdinalIgnoreCase)) _ = Task.Run(() => ConnectSolicitedWithRetryAsync(remote, _options.JetStreamDomain, _cts.Token)); + // Also start solicited connections for remotes parsed from the config file (RemoteLeaves). + // RemoteLeaves are populated by the config parser from leafnodes.remotes[] blocks; + // _options.Remotes is the simple programmatic list only. + // Go reference: leafnode.go — createLeafNode starts solicited connections via connectToRemoteLeaf. + foreach (var remoteLeaf in _options.RemoteLeaves) + { + foreach (var url in remoteLeaf.Urls.Distinct(StringComparer.OrdinalIgnoreCase)) + _ = Task.Run(() => ConnectSolicitedWithRetryAsync(url, _options.JetStreamDomain, _cts.Token)); + } + _logger.LogDebug("Leaf manager started (listen={Host}:{Port})", _options.Host, _options.Port); return Task.CompletedTask; } @@ -819,6 +829,12 @@ public sealed class LeafNodeManager : IAsyncDisposable private static IPEndPoint ParseEndpoint(string endpoint) { + // Handle full URLs with a scheme (e.g. "nats-leaf://127.0.0.1:5222"). + // Uri.TryCreate handles both schemed URLs and bare "host:port" strings. + if (Uri.TryCreate(endpoint, UriKind.Absolute, out var uri)) + return new IPEndPoint(IPAddress.Parse(uri.Host), uri.Port); + + // Fall back to bare "host:port" splitting for plain strings without a scheme. var parts = endpoint.Split(':', 2, StringSplitOptions.TrimEntries | StringSplitOptions.RemoveEmptyEntries); if (parts.Length != 2) throw new FormatException($"Invalid endpoint: {endpoint}"); diff --git a/tests/NATS.E2E.Tests/Infrastructure/LeafNodeFixture.cs b/tests/NATS.E2E.Tests/Infrastructure/LeafNodeFixture.cs new file mode 100644 index 0000000..12798ab --- /dev/null +++ b/tests/NATS.E2E.Tests/Infrastructure/LeafNodeFixture.cs @@ -0,0 +1,92 @@ +using System.Net.Http.Json; +using NATS.Client.Core; + +namespace NATS.E2E.Tests.Infrastructure; + +public sealed class LeafNodeFixture : IAsyncLifetime +{ + private NatsServerProcess _hub = null!; + private NatsServerProcess _leaf = null!; + + public int HubPort => _hub.Port; + public int LeafPort => _leaf.Port; + + public async Task InitializeAsync() + { + var leafListenPort = NatsServerProcess.AllocateFreePort(); + + var hubConfig = $$""" + server_name: hub + leafnodes { + listen: 127.0.0.1:{{leafListenPort}} + } + """; + + _hub = NatsServerProcess.WithConfig(hubConfig, enableMonitoring: true); + await _hub.StartAsync(); + + var leafConfig = $$""" + server_name: leaf + leafnodes { + remotes [ + { url: "nats-leaf://127.0.0.1:{{leafListenPort}}" } + ] + } + """; + + _leaf = NatsServerProcess.WithConfig(leafConfig); + await _leaf.StartAsync(); + + // Poll hub's /leafz until it reports 1 connected leaf node + await WaitForLeafConnectionAsync(); + } + + private async Task WaitForLeafConnectionAsync() + { + using var http = new HttpClient { Timeout = TimeSpan.FromSeconds(2) }; + using var deadline = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(200)); + + while (await timer.WaitForNextTickAsync(deadline.Token).ConfigureAwait(false)) + { + try + { + var leafz = await http.GetFromJsonAsync( + $"http://127.0.0.1:{_hub.MonitorPort!.Value}/leafz", + deadline.Token); + + if (leafz?.NumLeafs >= 1) + return; + } + catch (Exception ex) when (ex is HttpRequestException or TaskCanceledException + && !deadline.IsCancellationRequested) + { + // Monitor not yet ready or per-request timeout — retry on next tick + _ = ex; + } + } + + throw new TimeoutException("Leaf node did not connect to hub within 30s."); + } + + public async Task DisposeAsync() + { + await _leaf.DisposeAsync(); + await _hub.DisposeAsync(); + } + + public NatsConnection CreateHubClient() + => new(new NatsOpts { Url = $"nats://127.0.0.1:{HubPort}" }); + + public NatsConnection CreateLeafClient() + => new(new NatsOpts { Url = $"nats://127.0.0.1:{LeafPort}" }); + + private sealed class Leafz + { + [System.Text.Json.Serialization.JsonPropertyName("num_leafs")] + public int NumLeafs { get; init; } + } +} + +[CollectionDefinition("E2E-LeafNode")] +public class LeafNodeCollection : ICollectionFixture;