diff --git a/src/NATS.Server/Gateways/GatewayInfo.cs b/src/NATS.Server/Gateways/GatewayInfo.cs new file mode 100644 index 0000000..c5a33a5 --- /dev/null +++ b/src/NATS.Server/Gateways/GatewayInfo.cs @@ -0,0 +1,14 @@ +namespace NATS.Server.Gateways; + +/// +/// Information about a remote gateway cluster received during implicit discovery. +/// Go reference: server/gateway.go — implicit gateway discovery via INFO gossip. +/// +public sealed record GatewayInfo +{ + /// Name of the remote gateway cluster. + public required string Name { get; init; } + + /// URLs for connecting to the remote gateway cluster. + public required string[] Urls { get; init; } +} diff --git a/src/NATS.Server/Gateways/GatewayManager.cs b/src/NATS.Server/Gateways/GatewayManager.cs index 32a4c3c..a6632fd 100644 --- a/src/NATS.Server/Gateways/GatewayManager.cs +++ b/src/NATS.Server/Gateways/GatewayManager.cs @@ -16,6 +16,7 @@ public sealed class GatewayManager : IAsyncDisposable private readonly Action _messageSink; private readonly ILogger _logger; private readonly ConcurrentDictionary _connections = new(StringComparer.Ordinal); + private readonly HashSet _discoveredGateways = new(StringComparer.OrdinalIgnoreCase); private long _forwardedJetStreamClusterMessages; private CancellationTokenSource? _cts; @@ -44,6 +45,29 @@ public sealed class GatewayManager : IAsyncDisposable _logger = logger; } + /// + /// Gateway clusters auto-discovered via INFO gossip. + /// Go reference: server/gateway.go processImplicitGateway. + /// + public IReadOnlyCollection DiscoveredGateways + { + get { lock (_discoveredGateways) return _discoveredGateways.ToList(); } + } + + /// + /// Processes a gateway info message from a peer, discovering new gateway clusters. + /// Go reference: server/gateway.go:800-850 (processImplicitGateway). + /// + public void ProcessImplicitGateway(GatewayInfo gwInfo) + { + ArgumentNullException.ThrowIfNull(gwInfo); + + lock (_discoveredGateways) + { + _discoveredGateways.Add(gwInfo.Name); + } + } + public Task StartAsync(CancellationToken ct) { _cts = CancellationTokenSource.CreateLinkedTokenSource(ct); diff --git a/src/NATS.Server/Protocol/NatsProtocol.cs b/src/NATS.Server/Protocol/NatsProtocol.cs index ab4ef7a..ca952da 100644 --- a/src/NATS.Server/Protocol/NatsProtocol.cs +++ b/src/NATS.Server/Protocol/NatsProtocol.cs @@ -89,6 +89,10 @@ public sealed class ServerInfo [JsonPropertyName("tls_available")] [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)] public bool TlsAvailable { get; set; } + + [JsonPropertyName("connect_urls")] + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + public string[]? ConnectUrls { get; set; } } public sealed class ClientOptions diff --git a/src/NATS.Server/Routes/RouteManager.cs b/src/NATS.Server/Routes/RouteManager.cs index 4437116..f41353b 100644 --- a/src/NATS.Server/Routes/RouteManager.cs +++ b/src/NATS.Server/Routes/RouteManager.cs @@ -3,6 +3,7 @@ using System.Net; using System.Net.Sockets; using Microsoft.Extensions.Logging; using NATS.Server.Configuration; +using NATS.Server.Protocol; using NATS.Server.Subscriptions; namespace NATS.Server.Routes; @@ -18,6 +19,8 @@ public sealed class RouteManager : IAsyncDisposable private readonly Action _routedMessageSink; private readonly ConcurrentDictionary _routes = new(StringComparer.Ordinal); private readonly ConcurrentDictionary _connectedServerIds = new(StringComparer.Ordinal); + private readonly HashSet _discoveredRoutes = new(StringComparer.OrdinalIgnoreCase); + private readonly HashSet _knownRouteUrls = new(StringComparer.OrdinalIgnoreCase); private CancellationTokenSource? _cts; private Socket? _listener; @@ -49,6 +52,62 @@ public sealed class RouteManager : IAsyncDisposable _logger = logger; } + /// + /// Routes auto-discovered via INFO gossip from peers. + /// Go reference: server/route.go processImplicitRoute. + /// + public IReadOnlyCollection DiscoveredRoutes + { + get { lock (_discoveredRoutes) return _discoveredRoutes.ToList(); } + } + + /// + /// Event raised when new route info should be forwarded to known peers. + /// Go reference: server/route.go forwardNewRouteInfoToKnownServers. + /// + public event Action>? OnForwardInfo; + + /// + /// Processes connect_urls from a peer's INFO message. Any URLs not already + /// known are added to DiscoveredRoutes for solicited connection. + /// Go reference: server/route.go:1500-1550 (processImplicitRoute). + /// + public void ProcessImplicitRoute(ServerInfo serverInfo) + { + if (serverInfo.ConnectUrls is null || serverInfo.ConnectUrls.Length == 0) + return; + + lock (_discoveredRoutes) + { + foreach (var url in serverInfo.ConnectUrls) + { + if (!_knownRouteUrls.Contains(url)) + { + _discoveredRoutes.Add(url); + } + } + } + } + + /// + /// Forwards new peer URL information to all known route connections. + /// Go reference: server/route.go forwardNewRouteInfoToKnownServers. + /// + public void ForwardNewRouteInfoToKnownServers(string newPeerUrl) + { + OnForwardInfo?.Invoke([newPeerUrl]); + } + + /// + /// Adds a URL to the known route set. Used during initialization and testing. + /// + public void AddKnownRoute(string url) + { + lock (_discoveredRoutes) + { + _knownRouteUrls.Add(url); + } + } /// /// Returns a route pool index for the given account name, matching Go's diff --git a/tests/NATS.Server.Tests/ImplicitDiscoveryTests.cs b/tests/NATS.Server.Tests/ImplicitDiscoveryTests.cs new file mode 100644 index 0000000..36ca02e --- /dev/null +++ b/tests/NATS.Server.Tests/ImplicitDiscoveryTests.cs @@ -0,0 +1,236 @@ +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Server.Configuration; +using NATS.Server; +using NATS.Server.Gateways; +using NATS.Server.Protocol; +using NATS.Server.Routes; + +namespace NATS.Server.Tests; + +// Go reference: server/route.go processImplicitRoute, server/gateway.go processImplicitGateway + +public class ImplicitDiscoveryTests +{ + [Fact] + public void ProcessImplicitRoute_discovers_new_peer() + { + // Go reference: server/route.go processImplicitRoute + var mgr = RouteManagerTestHelper.Create(); + var serverInfo = new ServerInfo + { + ServerId = "server-2", + ServerName = "server-2", + Version = "0.1.0", + Host = "0.0.0.0", + Port = 4222, + ConnectUrls = ["nats://10.0.0.2:6222", "nats://10.0.0.3:6222"], + }; + + mgr.ProcessImplicitRoute(serverInfo); + + mgr.DiscoveredRoutes.ShouldContain("nats://10.0.0.2:6222"); + mgr.DiscoveredRoutes.ShouldContain("nats://10.0.0.3:6222"); + } + + [Fact] + public void ProcessImplicitRoute_skips_known_peers() + { + // Go reference: server/route.go processImplicitRoute — skip already-known + var mgr = RouteManagerTestHelper.Create(); + mgr.AddKnownRoute("nats://10.0.0.2:6222"); + + var serverInfo = new ServerInfo + { + ServerId = "server-2", + ServerName = "server-2", + Version = "0.1.0", + Host = "0.0.0.0", + Port = 4222, + ConnectUrls = ["nats://10.0.0.2:6222", "nats://10.0.0.3:6222"], + }; + + mgr.ProcessImplicitRoute(serverInfo); + + mgr.DiscoveredRoutes.Count.ShouldBe(1); // only 10.0.0.3 is new + mgr.DiscoveredRoutes.ShouldContain("nats://10.0.0.3:6222"); + } + + [Fact] + public void ProcessImplicitRoute_with_null_urls_is_noop() + { + // Go reference: server/route.go processImplicitRoute — nil ConnectUrls guard + var mgr = RouteManagerTestHelper.Create(); + var serverInfo = new ServerInfo + { + ServerId = "server-2", + ServerName = "server-2", + Version = "0.1.0", + Host = "0.0.0.0", + Port = 4222, + ConnectUrls = null, + }; + + mgr.ProcessImplicitRoute(serverInfo); + + mgr.DiscoveredRoutes.Count.ShouldBe(0); + } + + [Fact] + public void ProcessImplicitRoute_with_empty_urls_is_noop() + { + // Go reference: server/route.go processImplicitRoute — empty ConnectUrls guard + var mgr = RouteManagerTestHelper.Create(); + var serverInfo = new ServerInfo + { + ServerId = "server-2", + ServerName = "server-2", + Version = "0.1.0", + Host = "0.0.0.0", + Port = 4222, + ConnectUrls = [], + }; + + mgr.ProcessImplicitRoute(serverInfo); + + mgr.DiscoveredRoutes.Count.ShouldBe(0); + } + + [Fact] + public void ProcessImplicitGateway_discovers_new_gateway() + { + // Go reference: server/gateway.go processImplicitGateway + var mgr = GatewayManagerTestHelper.Create(); + var gwInfo = new GatewayInfo + { + Name = "cluster-B", + Urls = ["nats://10.0.1.1:7222"], + }; + + mgr.ProcessImplicitGateway(gwInfo); + + mgr.DiscoveredGateways.ShouldContain("cluster-B"); + } + + [Fact] + public void ProcessImplicitGateway_with_null_throws() + { + // Go reference: server/gateway.go processImplicitGateway — null guard + var mgr = GatewayManagerTestHelper.Create(); + + Should.Throw(() => mgr.ProcessImplicitGateway(null!)); + } + + [Fact] + public void ProcessImplicitGateway_deduplicates_same_cluster() + { + // Go reference: server/gateway.go processImplicitGateway — idempotent discovery + var mgr = GatewayManagerTestHelper.Create(); + var gwInfo = new GatewayInfo { Name = "cluster-B", Urls = ["nats://10.0.1.1:7222"] }; + + mgr.ProcessImplicitGateway(gwInfo); + mgr.ProcessImplicitGateway(gwInfo); + + mgr.DiscoveredGateways.Count.ShouldBe(1); + } + + [Fact] + public void ForwardNewRouteInfo_invokes_event() + { + // Go reference: server/route.go forwardNewRouteInfoToKnownServers + var mgr = RouteManagerTestHelper.Create(); + var forwarded = new List(); + mgr.OnForwardInfo += urls => forwarded.AddRange(urls); + + mgr.ForwardNewRouteInfoToKnownServers("nats://10.0.0.5:6222"); + + forwarded.ShouldContain("nats://10.0.0.5:6222"); + } + + [Fact] + public void ForwardNewRouteInfo_with_no_handler_does_not_throw() + { + // Go reference: server/route.go forwardNewRouteInfoToKnownServers — no subscribers + var mgr = RouteManagerTestHelper.Create(); + + Should.NotThrow(() => mgr.ForwardNewRouteInfoToKnownServers("nats://10.0.0.5:6222")); + } + + [Fact] + public void AddKnownRoute_prevents_later_discovery() + { + // Go reference: server/route.go processImplicitRoute — pre-seeded known routes + var mgr = RouteManagerTestHelper.Create(); + mgr.AddKnownRoute("nats://10.0.0.9:6222"); + + var serverInfo = new ServerInfo + { + ServerId = "server-3", + ServerName = "server-3", + Version = "0.1.0", + Host = "0.0.0.0", + Port = 4222, + ConnectUrls = ["nats://10.0.0.9:6222"], + }; + + mgr.ProcessImplicitRoute(serverInfo); + + mgr.DiscoveredRoutes.Count.ShouldBe(0); + } + + [Fact] + public void ConnectUrls_is_serialized_when_set() + { + // Go reference: server/route.go INFO message includes connect_urls + var info = new ServerInfo + { + ServerId = "s1", + ServerName = "s1", + Version = "0.1.0", + Host = "0.0.0.0", + Port = 4222, + ConnectUrls = ["nats://10.0.0.1:4222"], + }; + + var json = System.Text.Json.JsonSerializer.Serialize(info); + json.ShouldContain("connect_urls"); + json.ShouldContain("nats://10.0.0.1:4222"); + } + + [Fact] + public void ConnectUrls_is_omitted_when_null() + { + // Go reference: server/route.go INFO omits connect_urls when empty + var info = new ServerInfo + { + ServerId = "s1", + ServerName = "s1", + Version = "0.1.0", + Host = "0.0.0.0", + Port = 4222, + ConnectUrls = null, + }; + + var json = System.Text.Json.JsonSerializer.Serialize(info); + json.ShouldNotContain("connect_urls"); + } +} + +public static class RouteManagerTestHelper +{ + public static RouteManager Create() + { + var options = new ClusterOptions { Name = "test-cluster", Host = "127.0.0.1", Port = 0 }; + var stats = new ServerStats(); + return new RouteManager(options, stats, "server-1", _ => { }, _ => { }, NullLogger.Instance); + } +} + +public static class GatewayManagerTestHelper +{ + public static GatewayManager Create() + { + var options = new GatewayOptions { Name = "cluster-A", Host = "127.0.0.1", Port = 0 }; + var stats = new ServerStats(); + return new GatewayManager(options, stats, "server-1", _ => { }, _ => { }, NullLogger.Instance); + } +}