diff --git a/src/NATS.Server/Gateways/GatewayManager.cs b/src/NATS.Server/Gateways/GatewayManager.cs index 76fa93b..4fd44d4 100644 --- a/src/NATS.Server/Gateways/GatewayManager.cs +++ b/src/NATS.Server/Gateways/GatewayManager.cs @@ -210,7 +210,17 @@ public sealed class GatewayManager : IAsyncDisposable _options.Port = ((IPEndPoint)_listener.LocalEndPoint!).Port; _acceptLoopTask = Task.Run(() => AcceptLoopAsync(_cts.Token)); - foreach (var remote in _options.Remotes.Distinct(StringComparer.OrdinalIgnoreCase)) + + // Collect outbound endpoints from both the legacy Remotes list and the + // config-parsed RemoteGateways list (populated by ConfigProcessor). + var endpoints = new HashSet(_options.Remotes, StringComparer.OrdinalIgnoreCase); + foreach (var rgw in _options.RemoteGateways) + { + foreach (var uri in rgw.GetUrls()) + endpoints.Add($"{uri.Host}:{uri.Port}"); + } + + foreach (var remote in endpoints) _ = Task.Run(() => ConnectWithRetryAsync(remote, _cts.Token)); _logger.LogDebug("Gateway manager started (name={Name}, listen={Host}:{Port})", diff --git a/tests/NATS.E2E.Tests/GatewayTests.cs b/tests/NATS.E2E.Tests/GatewayTests.cs new file mode 100644 index 0000000..7bfeeca --- /dev/null +++ b/tests/NATS.E2E.Tests/GatewayTests.cs @@ -0,0 +1,50 @@ +using NATS.Client.Core; +using NATS.E2E.Tests.Infrastructure; + +namespace NATS.E2E.Tests; + +[Collection("E2E-Gateway")] +public class GatewayTests(GatewayFixture fixture) +{ + [Fact] + public async Task Gateway_MessageCrossesGateway() + { + await using var pub = fixture.CreateClientA(); + await using var sub = fixture.CreateClientB(); + await pub.ConnectAsync(); + await sub.ConnectAsync(); + + await using var subscription = await sub.SubscribeCoreAsync("e2e.gw.cross"); + // Ping both sides: subscriber's ping flushes the SUB to server B, + // publisher's ping ensures server A has received the propagated interest. + await sub.PingAsync(); + await pub.PingAsync(); + + await pub.PublishAsync("e2e.gw.cross", "gateway-msg"); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var msg = await subscription.Msgs.ReadAsync(cts.Token); + msg.Data.ShouldBe("gateway-msg"); + } + + [Fact] + public async Task Gateway_NoInterest_NoDelivery() + { + await using var pub = fixture.CreateClientA(); + await using var sub = fixture.CreateClientB(); + await pub.ConnectAsync(); + await sub.ConnectAsync(); + + await using var subscription = await sub.SubscribeCoreAsync("e2e.gw.listen"); + // Ping both sides to ensure subscription interest has propagated before publishing. + await sub.PingAsync(); + await pub.PingAsync(); + + await pub.PublishAsync("e2e.gw.nolisten", "should-not-arrive"); + await pub.PublishAsync("e2e.gw.listen", "should-arrive"); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var msg = await subscription.Msgs.ReadAsync(cts.Token); + msg.Data.ShouldBe("should-arrive"); + } +} diff --git a/tests/NATS.E2E.Tests/Infrastructure/GatewayFixture.cs b/tests/NATS.E2E.Tests/Infrastructure/GatewayFixture.cs new file mode 100644 index 0000000..012477b --- /dev/null +++ b/tests/NATS.E2E.Tests/Infrastructure/GatewayFixture.cs @@ -0,0 +1,111 @@ +using System.Net.Http.Json; +using NATS.Client.Core; + +namespace NATS.E2E.Tests.Infrastructure; + +public sealed class GatewayFixture : IAsyncLifetime +{ + private NatsServerProcess _serverA = null!; + private NatsServerProcess _serverB = null!; + + public int PortA => _serverA.Port; + public int PortB => _serverB.Port; + + public async Task InitializeAsync() + { + var gwPortA = NatsServerProcess.AllocateFreePort(); + var gwPortB = NatsServerProcess.AllocateFreePort(); + + var configA = $$""" + server_name: gw-a + gateway { + name: cluster-a + listen: 127.0.0.1:{{gwPortA}} + gateways: [ + { name: cluster-b, url: nats://127.0.0.1:{{gwPortB}} } + ] + } + """; + + var configB = $$""" + server_name: gw-b + gateway { + name: cluster-b + listen: 127.0.0.1:{{gwPortB}} + gateways: [ + { name: cluster-a, url: nats://127.0.0.1:{{gwPortA}} } + ] + } + """; + + _serverA = NatsServerProcess.WithConfig(configA, enableMonitoring: true); + _serverB = NatsServerProcess.WithConfig(configB, enableMonitoring: true); + + await Task.WhenAll(_serverA.StartAsync(), _serverB.StartAsync()); + + // Poll until both gateways report a connected outbound gateway + await WaitForGatewayConnectionAsync(); + } + + private async Task WaitForGatewayConnectionAsync() + { + using var http = new HttpClient(); + using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(200)); + + var monitorPorts = new[] { _serverA.MonitorPort!.Value, _serverB.MonitorPort!.Value }; + + while (await timer.WaitForNextTickAsync(timeout.Token).ConfigureAwait(false)) + { + var allConnected = true; + foreach (var monitorPort in monitorPorts) + { + try + { + var gatewayz = await http.GetFromJsonAsync( + $"http://127.0.0.1:{monitorPort}/gatewayz", + timeout.Token); + + if (gatewayz?.NumGateways < 1) + { + allConnected = false; + break; + } + } + catch (HttpRequestException) + { + // monitor not yet ready — retry on next tick + allConnected = false; + break; + } + } + + if (allConnected) + return; + } + + throw new TimeoutException("Gateways did not connect to each other within 30s."); + } + + public async Task DisposeAsync() + { + await Task.WhenAll( + _serverA.DisposeAsync().AsTask(), + _serverB.DisposeAsync().AsTask()); + } + + public NatsConnection CreateClientA() + => new(new NatsOpts { Url = $"nats://127.0.0.1:{PortA}" }); + + public NatsConnection CreateClientB() + => new(new NatsOpts { Url = $"nats://127.0.0.1:{PortB}" }); + + private sealed class Gatewayz + { + [System.Text.Json.Serialization.JsonPropertyName("num_gateways")] + public int NumGateways { get; init; } + } +} + +[CollectionDefinition("E2E-Gateway")] +public class GatewayCollection : ICollectionFixture; diff --git a/tests/NATS.E2E.Tests/LeafNodeTests.cs b/tests/NATS.E2E.Tests/LeafNodeTests.cs new file mode 100644 index 0000000..7157aa4 --- /dev/null +++ b/tests/NATS.E2E.Tests/LeafNodeTests.cs @@ -0,0 +1,71 @@ +using NATS.Client.Core; +using NATS.E2E.Tests.Infrastructure; + +namespace NATS.E2E.Tests; + +[Collection("E2E-LeafNode")] +public class LeafNodeTests(LeafNodeFixture fixture) +{ + [Fact] + public async Task LeafNode_HubToLeaf_MessageDelivered() + { + await using var pub = fixture.CreateHubClient(); + await using var sub = fixture.CreateLeafClient(); + await pub.ConnectAsync(); + await sub.ConnectAsync(); + + await using var subscription = await sub.SubscribeCoreAsync("e2e.leaf.h2l"); + // Ping both sides: subscriber's ping flushes the SUB to the leaf server, + // publisher's ping ensures the hub has received the propagated interest. + await sub.PingAsync(); + await pub.PingAsync(); + + await pub.PublishAsync("e2e.leaf.h2l", "hub-to-leaf"); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var msg = await subscription.Msgs.ReadAsync(cts.Token); + msg.Data.ShouldBe("hub-to-leaf"); + } + + [Fact] + public async Task LeafNode_LeafToHub_MessageDelivered() + { + await using var pub = fixture.CreateLeafClient(); + await using var sub = fixture.CreateHubClient(); + await pub.ConnectAsync(); + await sub.ConnectAsync(); + + await using var subscription = await sub.SubscribeCoreAsync("e2e.leaf.l2h"); + // Ping both sides: subscriber's ping flushes the SUB to the hub server, + // publisher's ping ensures the leaf has received the propagated interest. + await sub.PingAsync(); + await pub.PingAsync(); + + await pub.PublishAsync("e2e.leaf.l2h", "leaf-to-hub"); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var msg = await subscription.Msgs.ReadAsync(cts.Token); + msg.Data.ShouldBe("leaf-to-hub"); + } + + [Fact] + public async Task LeafNode_OnlySubscribedSubjectsPropagate() + { + await using var pub = fixture.CreateHubClient(); + await using var sub = fixture.CreateLeafClient(); + await pub.ConnectAsync(); + await sub.ConnectAsync(); + + await using var subscription = await sub.SubscribeCoreAsync("e2e.leaf.specific"); + // Ping both sides to ensure subscription interest has propagated before publishing. + await sub.PingAsync(); + await pub.PingAsync(); + + await pub.PublishAsync("e2e.leaf.other", "wrong-subject"); + await pub.PublishAsync("e2e.leaf.specific", "right-subject"); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var msg = await subscription.Msgs.ReadAsync(cts.Token); + msg.Data.ShouldBe("right-subject"); + } +}