feat(batch23): implement route lifecycle, solicitation, dedupe, and iteration

This commit is contained in:
Joseph Doherty
2026-02-28 21:19:30 -05:00
parent acf51bf480
commit 51c899b651
5 changed files with 294 additions and 0 deletions

View File

@@ -449,4 +449,38 @@ public sealed partial class ClientConnection
if (buf.Length > 0)
EnqueueProto(buf);
}
internal bool ImportFilter(string subject) => CanImport(subject);
internal bool IsSolicitedRoute() => Route?.DidSolicit == true;
internal Exception? ProcessRouteConnect(byte[] arg)
{
if (arg is not { Length: > 0 })
return new FormatException("processRouteConnect parse error");
ConnectInfo? info;
try
{
info = JsonSerializer.Deserialize<ConnectInfo>(arg);
}
catch (Exception ex)
{
return ex;
}
if (info is null)
return new FormatException("processRouteConnect missing CONNECT payload");
lock (_mu)
{
Opts.Name = info.Name;
Opts.Headers = info.Headers;
Route ??= new Route();
Route.Lnoc = info.Lnoc;
Route.Lnocu = info.Lnocu;
}
return null;
}
}

View File

@@ -1,8 +1,234 @@
// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
using System.Security.Cryptography;
using System.Text;
using System.Linq;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class NatsServer
{
internal byte[] GenerateRouteInitialInfoJSON(string accName, string compression, int poolIdx, byte gossipMode)
{
var info = _routeInfo.ShallowClone();
Span<byte> nonce = stackalloc byte[16];
RandomNumberGenerator.Fill(nonce);
info.Nonce = Convert.ToBase64String(nonce);
info.RouteAccount = string.IsNullOrEmpty(accName) ? null : accName;
info.RoutePoolIdx = poolIdx;
info.GossipMode = gossipMode;
info.Compression = CompressionModeForInfoProtocol(GetOpts().Cluster.Compression, compression);
return GenerateInfoJson(info);
}
internal bool AddRoute(ClientConnection route, bool didSolicit, bool sendDelayedInfo, byte gossipMode, ServerInfo info, string accName)
{
_mu.EnterWriteLock();
try
{
route.Route ??= new Route();
route.Route.RemoteId = info.Id;
route.Route.DidSolicit = didSolicit;
route.Route.RemoteName = info.Name;
if (!string.IsNullOrEmpty(accName))
route.Route.AccName = Encoding.ASCII.GetBytes(accName);
if (!_routes.TryGetValue(info.Id, out var pool))
{
pool = [];
_routes[info.Id] = pool;
}
else
{
foreach (var existing in pool.ToArray())
{
if (!RouteHandler.HandleDuplicateRoute(existing, route))
return false;
pool.Remove(existing);
}
}
route.Route.PoolIdx = pool.Count;
pool.Add(route);
if (sendDelayedInfo)
route.EnqueueProto(GenerateRouteInitialInfoJSON(accName, info.Compression ?? string.Empty, route.Route.PoolIdx, gossipMode));
ForwardNewRouteInfoToKnownServers(info, didSolicit ? RouteType.Explicit : RouteType.Implicit, didSolicit, gossipMode);
return true;
}
finally
{
_mu.ExitWriteLock();
}
}
internal Exception? StartRouteAcceptLoop()
{
if (_routeListener == null)
return null;
if (!StartGoRoutine(() => Noticef("Route accept loop started")))
return new InvalidOperationException("unable to start route accept loop");
return null;
}
internal Exception? SetRouteInfoHostPortAndIP()
{
var opts = GetOpts();
var host = opts.Cluster.Host;
if (string.IsNullOrWhiteSpace(host))
host = opts.Host;
_mu.EnterWriteLock();
try
{
_routeInfo.Host = host;
_routeInfo.Port = opts.Cluster.Port;
_routeInfo.Ip = $"nats-route://{host}:{opts.Cluster.Port}/";
return null;
}
finally
{
_mu.ExitWriteLock();
}
}
public Exception? StartRouting()
{
var err = SetRouteInfoHostPortAndIP();
if (err != null)
return err;
SolicitRoutes();
return StartRouteAcceptLoop();
}
internal void ReConnectToRoute(Uri routeUrl, string accName = "")
{
StartGoRoutine(() =>
{
_ = ConnectToRoute(routeUrl, RouteType.Explicit, false, GossipMode.Default, accName);
});
}
internal bool RouteStillValid(Uri routeUrl)
{
var opts = GetOpts();
return opts.Routes.Any(r => string.Equals(r.Host, routeUrl.Host, StringComparison.OrdinalIgnoreCase) && r.Port == routeUrl.Port);
}
internal Exception? ConnectToRoute(Uri routeUrl, RouteType routeType, bool firstConnect, byte gossipMode, string accName)
{
_ = firstConnect;
if (!RouteStillValid(routeUrl))
return new InvalidOperationException($"route is no longer configured: {routeUrl}");
SaveRouteTLSName(routeUrl);
var route = CreateRoute(null, routeUrl, routeType, gossipMode, accName);
if (route is null)
return new InvalidOperationException("failed to create route");
return null;
}
internal bool SaveRouteTLSName(Uri routeUrl)
{
if (routeUrl is null || string.IsNullOrWhiteSpace(routeUrl.Host))
return false;
_mu.EnterWriteLock();
try
{
_routeTlsName = routeUrl.Host;
return true;
}
finally
{
_mu.ExitWriteLock();
}
}
internal void SolicitRoutes()
{
foreach (var route in GetOpts().Routes)
_ = ConnectToRoute(route, RouteType.Explicit, true, GossipMode.Default, string.Empty);
}
internal void RemoveAllRoutesExcept(string remoteId)
{
_mu.EnterWriteLock();
try
{
foreach (var (rid, routes) in _routes.ToArray())
{
if (rid == remoteId)
continue;
foreach (var route in routes)
route.CloseConnection(ClosedState.RouteRemoved);
_routes.Remove(rid);
}
}
finally
{
_mu.ExitWriteLock();
}
}
internal bool IsDuplicateServerName(string serverName)
{
var duplicate = false;
_mu.EnterReadLock();
try
{
ForEachRoute(route =>
{
if (route.Route?.RemoteName == serverName)
duplicate = true;
});
}
finally
{
_mu.ExitReadLock();
}
return duplicate;
}
internal void ForEachNonPerAccountRoute(Func<ClientConnection, bool> fn)
{
_mu.EnterReadLock();
try
{
foreach (var route in _routes.Values.SelectMany(v => v))
{
if (route.Route?.AccName is { Length: > 0 })
continue;
if (!fn(route))
break;
}
}
finally
{
_mu.ExitReadLock();
}
}
internal void ForEachRouteIdx(int idx, Func<ClientConnection, bool> fn)
{
_mu.EnterReadLock();
try
{
foreach (var pool in _routes.Values)
{
if (idx < 0 || idx >= pool.Count)
continue;
if (!fn(pool[idx]))
break;
}
}
finally
{
_mu.ExitReadLock();
}
}
}

View File

@@ -254,6 +254,7 @@ public sealed partial class NatsServer : INatsServer
private readonly ConcurrentDictionary<string, object?> _nodeToInfo = new(StringComparer.Ordinal);
private readonly ConcurrentDictionary<string, object?> _raftNodes = new(StringComparer.Ordinal);
private readonly Dictionary<string, string> _routesToSelf = [];
private string _routeTlsName = string.Empty;
private INetResolver? _routeResolver;
private readonly ConcurrentDictionary<string, object?> _rateLimitLogging = new();
private readonly Channel<TimeSpan> _rateLimitLoggingCh;

View File

@@ -24,4 +24,37 @@ internal static class RouteHandler
internal static bool RouteShouldDelayInfo(string accName, ServerOptions opts) =>
string.IsNullOrEmpty(accName) && opts.Cluster.PoolSize >= 1;
internal static bool HasSolicitedRoute(IReadOnlyList<ClientConnection> routes, string accName)
{
foreach (var route in routes)
{
if (route.Route?.DidSolicit != true)
continue;
var routeAcc = route.Route?.AccName is { Length: > 0 } an
? System.Text.Encoding.ASCII.GetString(an)
: string.Empty;
if (routeAcc == accName)
return true;
}
return false;
}
internal static void UpgradeRouteToSolicited(ClientConnection route)
{
if (route.Route is null)
return;
route.Route.DidSolicit = true;
route.Route.Retry = true;
}
internal static bool HandleDuplicateRoute(ClientConnection existing, ClientConnection incoming)
{
if (existing.IsSolicitedRoute() && !incoming.IsSolicitedRoute())
return false;
if (!existing.IsSolicitedRoute() && incoming.IsSolicitedRoute())
return true;
return incoming.Cid > existing.Cid;
}
}