test: add E2E leaf node tests (hub-to-leaf, leaf-to-hub, subject propagation)
Fix LeafNodeManager.StartAsync to launch solicited connections for RemoteLeaves (config-file-parsed remotes) in addition to the programmatic Remotes list, and update ParseEndpoint to handle nats-leaf:// scheme URLs. Add LeafNodeFixture that polls /leafz for connection readiness and three E2E tests covering hub→leaf delivery, leaf→hub delivery, and subject-scoped propagation.
This commit is contained in:
@@ -239,6 +239,16 @@ public sealed class LeafNodeManager : IAsyncDisposable
|
||||
foreach (var remote in _options.Remotes.Distinct(StringComparer.OrdinalIgnoreCase))
|
||||
_ = Task.Run(() => ConnectSolicitedWithRetryAsync(remote, _options.JetStreamDomain, _cts.Token));
|
||||
|
||||
// Also start solicited connections for remotes parsed from the config file (RemoteLeaves).
|
||||
// RemoteLeaves are populated by the config parser from leafnodes.remotes[] blocks;
|
||||
// _options.Remotes is the simple programmatic list only.
|
||||
// Go reference: leafnode.go — createLeafNode starts solicited connections via connectToRemoteLeaf.
|
||||
foreach (var remoteLeaf in _options.RemoteLeaves)
|
||||
{
|
||||
foreach (var url in remoteLeaf.Urls.Distinct(StringComparer.OrdinalIgnoreCase))
|
||||
_ = Task.Run(() => ConnectSolicitedWithRetryAsync(url, _options.JetStreamDomain, _cts.Token));
|
||||
}
|
||||
|
||||
_logger.LogDebug("Leaf manager started (listen={Host}:{Port})", _options.Host, _options.Port);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
@@ -819,6 +829,12 @@ public sealed class LeafNodeManager : IAsyncDisposable
|
||||
|
||||
private static IPEndPoint ParseEndpoint(string endpoint)
|
||||
{
|
||||
// Handle full URLs with a scheme (e.g. "nats-leaf://127.0.0.1:5222").
|
||||
// Uri.TryCreate handles both schemed URLs and bare "host:port" strings.
|
||||
if (Uri.TryCreate(endpoint, UriKind.Absolute, out var uri))
|
||||
return new IPEndPoint(IPAddress.Parse(uri.Host), uri.Port);
|
||||
|
||||
// Fall back to bare "host:port" splitting for plain strings without a scheme.
|
||||
var parts = endpoint.Split(':', 2, StringSplitOptions.TrimEntries | StringSplitOptions.RemoveEmptyEntries);
|
||||
if (parts.Length != 2)
|
||||
throw new FormatException($"Invalid endpoint: {endpoint}");
|
||||
|
||||
92
tests/NATS.E2E.Tests/Infrastructure/LeafNodeFixture.cs
Normal file
92
tests/NATS.E2E.Tests/Infrastructure/LeafNodeFixture.cs
Normal file
@@ -0,0 +1,92 @@
|
||||
using System.Net.Http.Json;
|
||||
using NATS.Client.Core;
|
||||
|
||||
namespace NATS.E2E.Tests.Infrastructure;
|
||||
|
||||
public sealed class LeafNodeFixture : IAsyncLifetime
|
||||
{
|
||||
private NatsServerProcess _hub = null!;
|
||||
private NatsServerProcess _leaf = null!;
|
||||
|
||||
public int HubPort => _hub.Port;
|
||||
public int LeafPort => _leaf.Port;
|
||||
|
||||
public async Task InitializeAsync()
|
||||
{
|
||||
var leafListenPort = NatsServerProcess.AllocateFreePort();
|
||||
|
||||
var hubConfig = $$"""
|
||||
server_name: hub
|
||||
leafnodes {
|
||||
listen: 127.0.0.1:{{leafListenPort}}
|
||||
}
|
||||
""";
|
||||
|
||||
_hub = NatsServerProcess.WithConfig(hubConfig, enableMonitoring: true);
|
||||
await _hub.StartAsync();
|
||||
|
||||
var leafConfig = $$"""
|
||||
server_name: leaf
|
||||
leafnodes {
|
||||
remotes [
|
||||
{ url: "nats-leaf://127.0.0.1:{{leafListenPort}}" }
|
||||
]
|
||||
}
|
||||
""";
|
||||
|
||||
_leaf = NatsServerProcess.WithConfig(leafConfig);
|
||||
await _leaf.StartAsync();
|
||||
|
||||
// Poll hub's /leafz until it reports 1 connected leaf node
|
||||
await WaitForLeafConnectionAsync();
|
||||
}
|
||||
|
||||
private async Task WaitForLeafConnectionAsync()
|
||||
{
|
||||
using var http = new HttpClient { Timeout = TimeSpan.FromSeconds(2) };
|
||||
using var deadline = new CancellationTokenSource(TimeSpan.FromSeconds(30));
|
||||
using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(200));
|
||||
|
||||
while (await timer.WaitForNextTickAsync(deadline.Token).ConfigureAwait(false))
|
||||
{
|
||||
try
|
||||
{
|
||||
var leafz = await http.GetFromJsonAsync<Leafz>(
|
||||
$"http://127.0.0.1:{_hub.MonitorPort!.Value}/leafz",
|
||||
deadline.Token);
|
||||
|
||||
if (leafz?.NumLeafs >= 1)
|
||||
return;
|
||||
}
|
||||
catch (Exception ex) when (ex is HttpRequestException or TaskCanceledException
|
||||
&& !deadline.IsCancellationRequested)
|
||||
{
|
||||
// Monitor not yet ready or per-request timeout — retry on next tick
|
||||
_ = ex;
|
||||
}
|
||||
}
|
||||
|
||||
throw new TimeoutException("Leaf node did not connect to hub within 30s.");
|
||||
}
|
||||
|
||||
public async Task DisposeAsync()
|
||||
{
|
||||
await _leaf.DisposeAsync();
|
||||
await _hub.DisposeAsync();
|
||||
}
|
||||
|
||||
public NatsConnection CreateHubClient()
|
||||
=> new(new NatsOpts { Url = $"nats://127.0.0.1:{HubPort}" });
|
||||
|
||||
public NatsConnection CreateLeafClient()
|
||||
=> new(new NatsOpts { Url = $"nats://127.0.0.1:{LeafPort}" });
|
||||
|
||||
private sealed class Leafz
|
||||
{
|
||||
[System.Text.Json.Serialization.JsonPropertyName("num_leafs")]
|
||||
public int NumLeafs { get; init; }
|
||||
}
|
||||
}
|
||||
|
||||
[CollectionDefinition("E2E-LeafNode")]
|
||||
public class LeafNodeCollection : ICollectionFixture<LeafNodeFixture>;
|
||||
Reference in New Issue
Block a user