diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Routes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Routes.cs index 9eee451..530c2b1 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Routes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Routes.cs @@ -449,4 +449,38 @@ public sealed partial class ClientConnection if (buf.Length > 0) EnqueueProto(buf); } + + internal bool ImportFilter(string subject) => CanImport(subject); + + internal bool IsSolicitedRoute() => Route?.DidSolicit == true; + + internal Exception? ProcessRouteConnect(byte[] arg) + { + if (arg is not { Length: > 0 }) + return new FormatException("processRouteConnect parse error"); + + ConnectInfo? info; + try + { + info = JsonSerializer.Deserialize(arg); + } + catch (Exception ex) + { + return ex; + } + + if (info is null) + return new FormatException("processRouteConnect missing CONNECT payload"); + + lock (_mu) + { + Opts.Name = info.Name; + Opts.Headers = info.Headers; + Route ??= new Route(); + Route.Lnoc = info.Lnoc; + Route.Lnocu = info.Lnocu; + } + + return null; + } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Routes.Connections.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Routes.Connections.cs index e5654ae..d72caec 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Routes.Connections.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Routes.Connections.cs @@ -1,8 +1,234 @@ // Copyright 2012-2026 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); +using System.Security.Cryptography; +using System.Text; +using System.Linq; + namespace ZB.MOM.NatsNet.Server; public sealed partial class NatsServer { + internal byte[] GenerateRouteInitialInfoJSON(string accName, string compression, int poolIdx, byte gossipMode) + { + var info = _routeInfo.ShallowClone(); + Span nonce = stackalloc byte[16]; + RandomNumberGenerator.Fill(nonce); + info.Nonce = Convert.ToBase64String(nonce); + info.RouteAccount = string.IsNullOrEmpty(accName) ? null : accName; + info.RoutePoolIdx = poolIdx; + info.GossipMode = gossipMode; + info.Compression = CompressionModeForInfoProtocol(GetOpts().Cluster.Compression, compression); + return GenerateInfoJson(info); + } + + internal bool AddRoute(ClientConnection route, bool didSolicit, bool sendDelayedInfo, byte gossipMode, ServerInfo info, string accName) + { + _mu.EnterWriteLock(); + try + { + route.Route ??= new Route(); + route.Route.RemoteId = info.Id; + route.Route.DidSolicit = didSolicit; + route.Route.RemoteName = info.Name; + if (!string.IsNullOrEmpty(accName)) + route.Route.AccName = Encoding.ASCII.GetBytes(accName); + + if (!_routes.TryGetValue(info.Id, out var pool)) + { + pool = []; + _routes[info.Id] = pool; + } + else + { + foreach (var existing in pool.ToArray()) + { + if (!RouteHandler.HandleDuplicateRoute(existing, route)) + return false; + pool.Remove(existing); + } + } + + route.Route.PoolIdx = pool.Count; + pool.Add(route); + + if (sendDelayedInfo) + route.EnqueueProto(GenerateRouteInitialInfoJSON(accName, info.Compression ?? string.Empty, route.Route.PoolIdx, gossipMode)); + + ForwardNewRouteInfoToKnownServers(info, didSolicit ? RouteType.Explicit : RouteType.Implicit, didSolicit, gossipMode); + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal Exception? StartRouteAcceptLoop() + { + if (_routeListener == null) + return null; + + if (!StartGoRoutine(() => Noticef("Route accept loop started"))) + return new InvalidOperationException("unable to start route accept loop"); + return null; + } + + internal Exception? SetRouteInfoHostPortAndIP() + { + var opts = GetOpts(); + var host = opts.Cluster.Host; + if (string.IsNullOrWhiteSpace(host)) + host = opts.Host; + + _mu.EnterWriteLock(); + try + { + _routeInfo.Host = host; + _routeInfo.Port = opts.Cluster.Port; + _routeInfo.Ip = $"nats-route://{host}:{opts.Cluster.Port}/"; + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + public Exception? StartRouting() + { + var err = SetRouteInfoHostPortAndIP(); + if (err != null) + return err; + SolicitRoutes(); + return StartRouteAcceptLoop(); + } + + internal void ReConnectToRoute(Uri routeUrl, string accName = "") + { + StartGoRoutine(() => + { + _ = ConnectToRoute(routeUrl, RouteType.Explicit, false, GossipMode.Default, accName); + }); + } + + internal bool RouteStillValid(Uri routeUrl) + { + var opts = GetOpts(); + return opts.Routes.Any(r => string.Equals(r.Host, routeUrl.Host, StringComparison.OrdinalIgnoreCase) && r.Port == routeUrl.Port); + } + + internal Exception? ConnectToRoute(Uri routeUrl, RouteType routeType, bool firstConnect, byte gossipMode, string accName) + { + _ = firstConnect; + if (!RouteStillValid(routeUrl)) + return new InvalidOperationException($"route is no longer configured: {routeUrl}"); + + SaveRouteTLSName(routeUrl); + var route = CreateRoute(null, routeUrl, routeType, gossipMode, accName); + if (route is null) + return new InvalidOperationException("failed to create route"); + return null; + } + + internal bool SaveRouteTLSName(Uri routeUrl) + { + if (routeUrl is null || string.IsNullOrWhiteSpace(routeUrl.Host)) + return false; + + _mu.EnterWriteLock(); + try + { + _routeTlsName = routeUrl.Host; + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void SolicitRoutes() + { + foreach (var route in GetOpts().Routes) + _ = ConnectToRoute(route, RouteType.Explicit, true, GossipMode.Default, string.Empty); + } + + internal void RemoveAllRoutesExcept(string remoteId) + { + _mu.EnterWriteLock(); + try + { + foreach (var (rid, routes) in _routes.ToArray()) + { + if (rid == remoteId) + continue; + + foreach (var route in routes) + route.CloseConnection(ClosedState.RouteRemoved); + _routes.Remove(rid); + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool IsDuplicateServerName(string serverName) + { + var duplicate = false; + _mu.EnterReadLock(); + try + { + ForEachRoute(route => + { + if (route.Route?.RemoteName == serverName) + duplicate = true; + }); + } + finally + { + _mu.ExitReadLock(); + } + return duplicate; + } + + internal void ForEachNonPerAccountRoute(Func fn) + { + _mu.EnterReadLock(); + try + { + foreach (var route in _routes.Values.SelectMany(v => v)) + { + if (route.Route?.AccName is { Length: > 0 }) + continue; + if (!fn(route)) + break; + } + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void ForEachRouteIdx(int idx, Func fn) + { + _mu.EnterReadLock(); + try + { + foreach (var pool in _routes.Values) + { + if (idx < 0 || idx >= pool.Count) + continue; + if (!fn(pool[idx])) + break; + } + } + finally + { + _mu.ExitReadLock(); + } + } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs index ae0aee9..fc6abad 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs @@ -254,6 +254,7 @@ public sealed partial class NatsServer : INatsServer private readonly ConcurrentDictionary _nodeToInfo = new(StringComparer.Ordinal); private readonly ConcurrentDictionary _raftNodes = new(StringComparer.Ordinal); private readonly Dictionary _routesToSelf = []; + private string _routeTlsName = string.Empty; private INetResolver? _routeResolver; private readonly ConcurrentDictionary _rateLimitLogging = new(); private readonly Channel _rateLimitLoggingCh; diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Routes/RouteHandler.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Routes/RouteHandler.cs index fd6aa43..6327326 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Routes/RouteHandler.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Routes/RouteHandler.cs @@ -24,4 +24,37 @@ internal static class RouteHandler internal static bool RouteShouldDelayInfo(string accName, ServerOptions opts) => string.IsNullOrEmpty(accName) && opts.Cluster.PoolSize >= 1; + + internal static bool HasSolicitedRoute(IReadOnlyList routes, string accName) + { + foreach (var route in routes) + { + if (route.Route?.DidSolicit != true) + continue; + + var routeAcc = route.Route?.AccName is { Length: > 0 } an + ? System.Text.Encoding.ASCII.GetString(an) + : string.Empty; + if (routeAcc == accName) + return true; + } + return false; + } + + internal static void UpgradeRouteToSolicited(ClientConnection route) + { + if (route.Route is null) + return; + route.Route.DidSolicit = true; + route.Route.Retry = true; + } + + internal static bool HandleDuplicateRoute(ClientConnection existing, ClientConnection incoming) + { + if (existing.IsSolicitedRoute() && !incoming.IsSolicitedRoute()) + return false; + if (!existing.IsSolicitedRoute() && incoming.IsSolicitedRoute()) + return true; + return incoming.Cid > existing.Cid; + } } diff --git a/porting.db b/porting.db index d411554..3d58586 100644 Binary files a/porting.db and b/porting.db differ