feat(cluster): add implicit route and gateway discovery via INFO gossip
Implements ProcessImplicitRoute and ForwardNewRouteInfoToKnownServers on RouteManager, and ProcessImplicitGateway on GatewayManager, mirroring Go server/route.go and server/gateway.go INFO gossip-based peer discovery. Adds ConnectUrls to ServerInfo and introduces GatewayInfo model. 12 new unit tests in ImplicitDiscoveryTests.
This commit is contained in:
14
src/NATS.Server/Gateways/GatewayInfo.cs
Normal file
14
src/NATS.Server/Gateways/GatewayInfo.cs
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
namespace NATS.Server.Gateways;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Information about a remote gateway cluster received during implicit discovery.
|
||||||
|
/// Go reference: server/gateway.go — implicit gateway discovery via INFO gossip.
|
||||||
|
/// </summary>
|
||||||
|
public sealed record GatewayInfo
|
||||||
|
{
|
||||||
|
/// <summary>Name of the remote gateway cluster.</summary>
|
||||||
|
public required string Name { get; init; }
|
||||||
|
|
||||||
|
/// <summary>URLs for connecting to the remote gateway cluster.</summary>
|
||||||
|
public required string[] Urls { get; init; }
|
||||||
|
}
|
||||||
@@ -16,6 +16,7 @@ public sealed class GatewayManager : IAsyncDisposable
|
|||||||
private readonly Action<GatewayMessage> _messageSink;
|
private readonly Action<GatewayMessage> _messageSink;
|
||||||
private readonly ILogger<GatewayManager> _logger;
|
private readonly ILogger<GatewayManager> _logger;
|
||||||
private readonly ConcurrentDictionary<string, GatewayConnection> _connections = new(StringComparer.Ordinal);
|
private readonly ConcurrentDictionary<string, GatewayConnection> _connections = new(StringComparer.Ordinal);
|
||||||
|
private readonly HashSet<string> _discoveredGateways = new(StringComparer.OrdinalIgnoreCase);
|
||||||
private long _forwardedJetStreamClusterMessages;
|
private long _forwardedJetStreamClusterMessages;
|
||||||
|
|
||||||
private CancellationTokenSource? _cts;
|
private CancellationTokenSource? _cts;
|
||||||
@@ -44,6 +45,29 @@ public sealed class GatewayManager : IAsyncDisposable
|
|||||||
_logger = logger;
|
_logger = logger;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Gateway clusters auto-discovered via INFO gossip.
|
||||||
|
/// Go reference: server/gateway.go processImplicitGateway.
|
||||||
|
/// </summary>
|
||||||
|
public IReadOnlyCollection<string> DiscoveredGateways
|
||||||
|
{
|
||||||
|
get { lock (_discoveredGateways) return _discoveredGateways.ToList(); }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Processes a gateway info message from a peer, discovering new gateway clusters.
|
||||||
|
/// Go reference: server/gateway.go:800-850 (processImplicitGateway).
|
||||||
|
/// </summary>
|
||||||
|
public void ProcessImplicitGateway(GatewayInfo gwInfo)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(gwInfo);
|
||||||
|
|
||||||
|
lock (_discoveredGateways)
|
||||||
|
{
|
||||||
|
_discoveredGateways.Add(gwInfo.Name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public Task StartAsync(CancellationToken ct)
|
public Task StartAsync(CancellationToken ct)
|
||||||
{
|
{
|
||||||
_cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
|
_cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
|
||||||
|
|||||||
@@ -89,6 +89,10 @@ public sealed class ServerInfo
|
|||||||
[JsonPropertyName("tls_available")]
|
[JsonPropertyName("tls_available")]
|
||||||
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
|
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingDefault)]
|
||||||
public bool TlsAvailable { get; set; }
|
public bool TlsAvailable { get; set; }
|
||||||
|
|
||||||
|
[JsonPropertyName("connect_urls")]
|
||||||
|
[JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
|
||||||
|
public string[]? ConnectUrls { get; set; }
|
||||||
}
|
}
|
||||||
|
|
||||||
public sealed class ClientOptions
|
public sealed class ClientOptions
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ using System.Net;
|
|||||||
using System.Net.Sockets;
|
using System.Net.Sockets;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
using NATS.Server.Configuration;
|
using NATS.Server.Configuration;
|
||||||
|
using NATS.Server.Protocol;
|
||||||
using NATS.Server.Subscriptions;
|
using NATS.Server.Subscriptions;
|
||||||
|
|
||||||
namespace NATS.Server.Routes;
|
namespace NATS.Server.Routes;
|
||||||
@@ -18,6 +19,8 @@ public sealed class RouteManager : IAsyncDisposable
|
|||||||
private readonly Action<RouteMessage> _routedMessageSink;
|
private readonly Action<RouteMessage> _routedMessageSink;
|
||||||
private readonly ConcurrentDictionary<string, RouteConnection> _routes = new(StringComparer.Ordinal);
|
private readonly ConcurrentDictionary<string, RouteConnection> _routes = new(StringComparer.Ordinal);
|
||||||
private readonly ConcurrentDictionary<string, byte> _connectedServerIds = new(StringComparer.Ordinal);
|
private readonly ConcurrentDictionary<string, byte> _connectedServerIds = new(StringComparer.Ordinal);
|
||||||
|
private readonly HashSet<string> _discoveredRoutes = new(StringComparer.OrdinalIgnoreCase);
|
||||||
|
private readonly HashSet<string> _knownRouteUrls = new(StringComparer.OrdinalIgnoreCase);
|
||||||
|
|
||||||
private CancellationTokenSource? _cts;
|
private CancellationTokenSource? _cts;
|
||||||
private Socket? _listener;
|
private Socket? _listener;
|
||||||
@@ -49,6 +52,62 @@ public sealed class RouteManager : IAsyncDisposable
|
|||||||
_logger = logger;
|
_logger = logger;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Routes auto-discovered via INFO gossip from peers.
|
||||||
|
/// Go reference: server/route.go processImplicitRoute.
|
||||||
|
/// </summary>
|
||||||
|
public IReadOnlyCollection<string> DiscoveredRoutes
|
||||||
|
{
|
||||||
|
get { lock (_discoveredRoutes) return _discoveredRoutes.ToList(); }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Event raised when new route info should be forwarded to known peers.
|
||||||
|
/// Go reference: server/route.go forwardNewRouteInfoToKnownServers.
|
||||||
|
/// </summary>
|
||||||
|
public event Action<List<string>>? OnForwardInfo;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Processes connect_urls from a peer's INFO message. Any URLs not already
|
||||||
|
/// known are added to DiscoveredRoutes for solicited connection.
|
||||||
|
/// Go reference: server/route.go:1500-1550 (processImplicitRoute).
|
||||||
|
/// </summary>
|
||||||
|
public void ProcessImplicitRoute(ServerInfo serverInfo)
|
||||||
|
{
|
||||||
|
if (serverInfo.ConnectUrls is null || serverInfo.ConnectUrls.Length == 0)
|
||||||
|
return;
|
||||||
|
|
||||||
|
lock (_discoveredRoutes)
|
||||||
|
{
|
||||||
|
foreach (var url in serverInfo.ConnectUrls)
|
||||||
|
{
|
||||||
|
if (!_knownRouteUrls.Contains(url))
|
||||||
|
{
|
||||||
|
_discoveredRoutes.Add(url);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Forwards new peer URL information to all known route connections.
|
||||||
|
/// Go reference: server/route.go forwardNewRouteInfoToKnownServers.
|
||||||
|
/// </summary>
|
||||||
|
public void ForwardNewRouteInfoToKnownServers(string newPeerUrl)
|
||||||
|
{
|
||||||
|
OnForwardInfo?.Invoke([newPeerUrl]);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Adds a URL to the known route set. Used during initialization and testing.
|
||||||
|
/// </summary>
|
||||||
|
public void AddKnownRoute(string url)
|
||||||
|
{
|
||||||
|
lock (_discoveredRoutes)
|
||||||
|
{
|
||||||
|
_knownRouteUrls.Add(url);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Returns a route pool index for the given account name, matching Go's
|
/// Returns a route pool index for the given account name, matching Go's
|
||||||
|
|||||||
236
tests/NATS.Server.Tests/ImplicitDiscoveryTests.cs
Normal file
236
tests/NATS.Server.Tests/ImplicitDiscoveryTests.cs
Normal file
@@ -0,0 +1,236 @@
|
|||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
using NATS.Server.Configuration;
|
||||||
|
using NATS.Server;
|
||||||
|
using NATS.Server.Gateways;
|
||||||
|
using NATS.Server.Protocol;
|
||||||
|
using NATS.Server.Routes;
|
||||||
|
|
||||||
|
namespace NATS.Server.Tests;
|
||||||
|
|
||||||
|
// Go reference: server/route.go processImplicitRoute, server/gateway.go processImplicitGateway
|
||||||
|
|
||||||
|
public class ImplicitDiscoveryTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public void ProcessImplicitRoute_discovers_new_peer()
|
||||||
|
{
|
||||||
|
// Go reference: server/route.go processImplicitRoute
|
||||||
|
var mgr = RouteManagerTestHelper.Create();
|
||||||
|
var serverInfo = new ServerInfo
|
||||||
|
{
|
||||||
|
ServerId = "server-2",
|
||||||
|
ServerName = "server-2",
|
||||||
|
Version = "0.1.0",
|
||||||
|
Host = "0.0.0.0",
|
||||||
|
Port = 4222,
|
||||||
|
ConnectUrls = ["nats://10.0.0.2:6222", "nats://10.0.0.3:6222"],
|
||||||
|
};
|
||||||
|
|
||||||
|
mgr.ProcessImplicitRoute(serverInfo);
|
||||||
|
|
||||||
|
mgr.DiscoveredRoutes.ShouldContain("nats://10.0.0.2:6222");
|
||||||
|
mgr.DiscoveredRoutes.ShouldContain("nats://10.0.0.3:6222");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void ProcessImplicitRoute_skips_known_peers()
|
||||||
|
{
|
||||||
|
// Go reference: server/route.go processImplicitRoute — skip already-known
|
||||||
|
var mgr = RouteManagerTestHelper.Create();
|
||||||
|
mgr.AddKnownRoute("nats://10.0.0.2:6222");
|
||||||
|
|
||||||
|
var serverInfo = new ServerInfo
|
||||||
|
{
|
||||||
|
ServerId = "server-2",
|
||||||
|
ServerName = "server-2",
|
||||||
|
Version = "0.1.0",
|
||||||
|
Host = "0.0.0.0",
|
||||||
|
Port = 4222,
|
||||||
|
ConnectUrls = ["nats://10.0.0.2:6222", "nats://10.0.0.3:6222"],
|
||||||
|
};
|
||||||
|
|
||||||
|
mgr.ProcessImplicitRoute(serverInfo);
|
||||||
|
|
||||||
|
mgr.DiscoveredRoutes.Count.ShouldBe(1); // only 10.0.0.3 is new
|
||||||
|
mgr.DiscoveredRoutes.ShouldContain("nats://10.0.0.3:6222");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void ProcessImplicitRoute_with_null_urls_is_noop()
|
||||||
|
{
|
||||||
|
// Go reference: server/route.go processImplicitRoute — nil ConnectUrls guard
|
||||||
|
var mgr = RouteManagerTestHelper.Create();
|
||||||
|
var serverInfo = new ServerInfo
|
||||||
|
{
|
||||||
|
ServerId = "server-2",
|
||||||
|
ServerName = "server-2",
|
||||||
|
Version = "0.1.0",
|
||||||
|
Host = "0.0.0.0",
|
||||||
|
Port = 4222,
|
||||||
|
ConnectUrls = null,
|
||||||
|
};
|
||||||
|
|
||||||
|
mgr.ProcessImplicitRoute(serverInfo);
|
||||||
|
|
||||||
|
mgr.DiscoveredRoutes.Count.ShouldBe(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void ProcessImplicitRoute_with_empty_urls_is_noop()
|
||||||
|
{
|
||||||
|
// Go reference: server/route.go processImplicitRoute — empty ConnectUrls guard
|
||||||
|
var mgr = RouteManagerTestHelper.Create();
|
||||||
|
var serverInfo = new ServerInfo
|
||||||
|
{
|
||||||
|
ServerId = "server-2",
|
||||||
|
ServerName = "server-2",
|
||||||
|
Version = "0.1.0",
|
||||||
|
Host = "0.0.0.0",
|
||||||
|
Port = 4222,
|
||||||
|
ConnectUrls = [],
|
||||||
|
};
|
||||||
|
|
||||||
|
mgr.ProcessImplicitRoute(serverInfo);
|
||||||
|
|
||||||
|
mgr.DiscoveredRoutes.Count.ShouldBe(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void ProcessImplicitGateway_discovers_new_gateway()
|
||||||
|
{
|
||||||
|
// Go reference: server/gateway.go processImplicitGateway
|
||||||
|
var mgr = GatewayManagerTestHelper.Create();
|
||||||
|
var gwInfo = new GatewayInfo
|
||||||
|
{
|
||||||
|
Name = "cluster-B",
|
||||||
|
Urls = ["nats://10.0.1.1:7222"],
|
||||||
|
};
|
||||||
|
|
||||||
|
mgr.ProcessImplicitGateway(gwInfo);
|
||||||
|
|
||||||
|
mgr.DiscoveredGateways.ShouldContain("cluster-B");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void ProcessImplicitGateway_with_null_throws()
|
||||||
|
{
|
||||||
|
// Go reference: server/gateway.go processImplicitGateway — null guard
|
||||||
|
var mgr = GatewayManagerTestHelper.Create();
|
||||||
|
|
||||||
|
Should.Throw<ArgumentNullException>(() => mgr.ProcessImplicitGateway(null!));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void ProcessImplicitGateway_deduplicates_same_cluster()
|
||||||
|
{
|
||||||
|
// Go reference: server/gateway.go processImplicitGateway — idempotent discovery
|
||||||
|
var mgr = GatewayManagerTestHelper.Create();
|
||||||
|
var gwInfo = new GatewayInfo { Name = "cluster-B", Urls = ["nats://10.0.1.1:7222"] };
|
||||||
|
|
||||||
|
mgr.ProcessImplicitGateway(gwInfo);
|
||||||
|
mgr.ProcessImplicitGateway(gwInfo);
|
||||||
|
|
||||||
|
mgr.DiscoveredGateways.Count.ShouldBe(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void ForwardNewRouteInfo_invokes_event()
|
||||||
|
{
|
||||||
|
// Go reference: server/route.go forwardNewRouteInfoToKnownServers
|
||||||
|
var mgr = RouteManagerTestHelper.Create();
|
||||||
|
var forwarded = new List<string>();
|
||||||
|
mgr.OnForwardInfo += urls => forwarded.AddRange(urls);
|
||||||
|
|
||||||
|
mgr.ForwardNewRouteInfoToKnownServers("nats://10.0.0.5:6222");
|
||||||
|
|
||||||
|
forwarded.ShouldContain("nats://10.0.0.5:6222");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void ForwardNewRouteInfo_with_no_handler_does_not_throw()
|
||||||
|
{
|
||||||
|
// Go reference: server/route.go forwardNewRouteInfoToKnownServers — no subscribers
|
||||||
|
var mgr = RouteManagerTestHelper.Create();
|
||||||
|
|
||||||
|
Should.NotThrow(() => mgr.ForwardNewRouteInfoToKnownServers("nats://10.0.0.5:6222"));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void AddKnownRoute_prevents_later_discovery()
|
||||||
|
{
|
||||||
|
// Go reference: server/route.go processImplicitRoute — pre-seeded known routes
|
||||||
|
var mgr = RouteManagerTestHelper.Create();
|
||||||
|
mgr.AddKnownRoute("nats://10.0.0.9:6222");
|
||||||
|
|
||||||
|
var serverInfo = new ServerInfo
|
||||||
|
{
|
||||||
|
ServerId = "server-3",
|
||||||
|
ServerName = "server-3",
|
||||||
|
Version = "0.1.0",
|
||||||
|
Host = "0.0.0.0",
|
||||||
|
Port = 4222,
|
||||||
|
ConnectUrls = ["nats://10.0.0.9:6222"],
|
||||||
|
};
|
||||||
|
|
||||||
|
mgr.ProcessImplicitRoute(serverInfo);
|
||||||
|
|
||||||
|
mgr.DiscoveredRoutes.Count.ShouldBe(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void ConnectUrls_is_serialized_when_set()
|
||||||
|
{
|
||||||
|
// Go reference: server/route.go INFO message includes connect_urls
|
||||||
|
var info = new ServerInfo
|
||||||
|
{
|
||||||
|
ServerId = "s1",
|
||||||
|
ServerName = "s1",
|
||||||
|
Version = "0.1.0",
|
||||||
|
Host = "0.0.0.0",
|
||||||
|
Port = 4222,
|
||||||
|
ConnectUrls = ["nats://10.0.0.1:4222"],
|
||||||
|
};
|
||||||
|
|
||||||
|
var json = System.Text.Json.JsonSerializer.Serialize(info);
|
||||||
|
json.ShouldContain("connect_urls");
|
||||||
|
json.ShouldContain("nats://10.0.0.1:4222");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void ConnectUrls_is_omitted_when_null()
|
||||||
|
{
|
||||||
|
// Go reference: server/route.go INFO omits connect_urls when empty
|
||||||
|
var info = new ServerInfo
|
||||||
|
{
|
||||||
|
ServerId = "s1",
|
||||||
|
ServerName = "s1",
|
||||||
|
Version = "0.1.0",
|
||||||
|
Host = "0.0.0.0",
|
||||||
|
Port = 4222,
|
||||||
|
ConnectUrls = null,
|
||||||
|
};
|
||||||
|
|
||||||
|
var json = System.Text.Json.JsonSerializer.Serialize(info);
|
||||||
|
json.ShouldNotContain("connect_urls");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class RouteManagerTestHelper
|
||||||
|
{
|
||||||
|
public static RouteManager Create()
|
||||||
|
{
|
||||||
|
var options = new ClusterOptions { Name = "test-cluster", Host = "127.0.0.1", Port = 0 };
|
||||||
|
var stats = new ServerStats();
|
||||||
|
return new RouteManager(options, stats, "server-1", _ => { }, _ => { }, NullLogger<RouteManager>.Instance);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class GatewayManagerTestHelper
|
||||||
|
{
|
||||||
|
public static GatewayManager Create()
|
||||||
|
{
|
||||||
|
var options = new GatewayOptions { Name = "cluster-A", Host = "127.0.0.1", Port = 0 };
|
||||||
|
var stats = new ServerStats();
|
||||||
|
return new GatewayManager(options, stats, "server-1", _ => { }, _ => { }, NullLogger<GatewayManager>.Instance);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user