From 51c899b65119be6b8c031f8005c4a9e37536ce0f Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 21:19:30 -0500 Subject: [PATCH] feat(batch23): implement route lifecycle, solicitation, dedupe, and iteration --- .../ClientConnection.Routes.cs | 34 +++ .../NatsServer.Routes.Connections.cs | 226 ++++++++++++++++++ .../src/ZB.MOM.NatsNet.Server/NatsServer.cs | 1 + .../Routes/RouteHandler.cs | 33 +++ porting.db | Bin 6733824 -> 6737920 bytes 5 files changed, 294 insertions(+) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Routes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Routes.cs index 9eee451..530c2b1 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Routes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Routes.cs @@ -449,4 +449,38 @@ public sealed partial class ClientConnection if (buf.Length > 0) EnqueueProto(buf); } + + internal bool ImportFilter(string subject) => CanImport(subject); + + internal bool IsSolicitedRoute() => Route?.DidSolicit == true; + + internal Exception? ProcessRouteConnect(byte[] arg) + { + if (arg is not { Length: > 0 }) + return new FormatException("processRouteConnect parse error"); + + ConnectInfo? info; + try + { + info = JsonSerializer.Deserialize(arg); + } + catch (Exception ex) + { + return ex; + } + + if (info is null) + return new FormatException("processRouteConnect missing CONNECT payload"); + + lock (_mu) + { + Opts.Name = info.Name; + Opts.Headers = info.Headers; + Route ??= new Route(); + Route.Lnoc = info.Lnoc; + Route.Lnocu = info.Lnocu; + } + + return null; + } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Routes.Connections.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Routes.Connections.cs index e5654ae..d72caec 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Routes.Connections.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Routes.Connections.cs @@ -1,8 +1,234 @@ // Copyright 2012-2026 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); +using System.Security.Cryptography; +using System.Text; +using System.Linq; + namespace ZB.MOM.NatsNet.Server; public sealed partial class NatsServer { + internal byte[] GenerateRouteInitialInfoJSON(string accName, string compression, int poolIdx, byte gossipMode) + { + var info = _routeInfo.ShallowClone(); + Span nonce = stackalloc byte[16]; + RandomNumberGenerator.Fill(nonce); + info.Nonce = Convert.ToBase64String(nonce); + info.RouteAccount = string.IsNullOrEmpty(accName) ? null : accName; + info.RoutePoolIdx = poolIdx; + info.GossipMode = gossipMode; + info.Compression = CompressionModeForInfoProtocol(GetOpts().Cluster.Compression, compression); + return GenerateInfoJson(info); + } + + internal bool AddRoute(ClientConnection route, bool didSolicit, bool sendDelayedInfo, byte gossipMode, ServerInfo info, string accName) + { + _mu.EnterWriteLock(); + try + { + route.Route ??= new Route(); + route.Route.RemoteId = info.Id; + route.Route.DidSolicit = didSolicit; + route.Route.RemoteName = info.Name; + if (!string.IsNullOrEmpty(accName)) + route.Route.AccName = Encoding.ASCII.GetBytes(accName); + + if (!_routes.TryGetValue(info.Id, out var pool)) + { + pool = []; + _routes[info.Id] = pool; + } + else + { + foreach (var existing in pool.ToArray()) + { + if (!RouteHandler.HandleDuplicateRoute(existing, route)) + return false; + pool.Remove(existing); + } + } + + route.Route.PoolIdx = pool.Count; + pool.Add(route); + + if (sendDelayedInfo) + route.EnqueueProto(GenerateRouteInitialInfoJSON(accName, info.Compression ?? string.Empty, route.Route.PoolIdx, gossipMode)); + + ForwardNewRouteInfoToKnownServers(info, didSolicit ? RouteType.Explicit : RouteType.Implicit, didSolicit, gossipMode); + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal Exception? StartRouteAcceptLoop() + { + if (_routeListener == null) + return null; + + if (!StartGoRoutine(() => Noticef("Route accept loop started"))) + return new InvalidOperationException("unable to start route accept loop"); + return null; + } + + internal Exception? SetRouteInfoHostPortAndIP() + { + var opts = GetOpts(); + var host = opts.Cluster.Host; + if (string.IsNullOrWhiteSpace(host)) + host = opts.Host; + + _mu.EnterWriteLock(); + try + { + _routeInfo.Host = host; + _routeInfo.Port = opts.Cluster.Port; + _routeInfo.Ip = $"nats-route://{host}:{opts.Cluster.Port}/"; + return null; + } + finally + { + _mu.ExitWriteLock(); + } + } + + public Exception? StartRouting() + { + var err = SetRouteInfoHostPortAndIP(); + if (err != null) + return err; + SolicitRoutes(); + return StartRouteAcceptLoop(); + } + + internal void ReConnectToRoute(Uri routeUrl, string accName = "") + { + StartGoRoutine(() => + { + _ = ConnectToRoute(routeUrl, RouteType.Explicit, false, GossipMode.Default, accName); + }); + } + + internal bool RouteStillValid(Uri routeUrl) + { + var opts = GetOpts(); + return opts.Routes.Any(r => string.Equals(r.Host, routeUrl.Host, StringComparison.OrdinalIgnoreCase) && r.Port == routeUrl.Port); + } + + internal Exception? ConnectToRoute(Uri routeUrl, RouteType routeType, bool firstConnect, byte gossipMode, string accName) + { + _ = firstConnect; + if (!RouteStillValid(routeUrl)) + return new InvalidOperationException($"route is no longer configured: {routeUrl}"); + + SaveRouteTLSName(routeUrl); + var route = CreateRoute(null, routeUrl, routeType, gossipMode, accName); + if (route is null) + return new InvalidOperationException("failed to create route"); + return null; + } + + internal bool SaveRouteTLSName(Uri routeUrl) + { + if (routeUrl is null || string.IsNullOrWhiteSpace(routeUrl.Host)) + return false; + + _mu.EnterWriteLock(); + try + { + _routeTlsName = routeUrl.Host; + return true; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void SolicitRoutes() + { + foreach (var route in GetOpts().Routes) + _ = ConnectToRoute(route, RouteType.Explicit, true, GossipMode.Default, string.Empty); + } + + internal void RemoveAllRoutesExcept(string remoteId) + { + _mu.EnterWriteLock(); + try + { + foreach (var (rid, routes) in _routes.ToArray()) + { + if (rid == remoteId) + continue; + + foreach (var route in routes) + route.CloseConnection(ClosedState.RouteRemoved); + _routes.Remove(rid); + } + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool IsDuplicateServerName(string serverName) + { + var duplicate = false; + _mu.EnterReadLock(); + try + { + ForEachRoute(route => + { + if (route.Route?.RemoteName == serverName) + duplicate = true; + }); + } + finally + { + _mu.ExitReadLock(); + } + return duplicate; + } + + internal void ForEachNonPerAccountRoute(Func fn) + { + _mu.EnterReadLock(); + try + { + foreach (var route in _routes.Values.SelectMany(v => v)) + { + if (route.Route?.AccName is { Length: > 0 }) + continue; + if (!fn(route)) + break; + } + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void ForEachRouteIdx(int idx, Func fn) + { + _mu.EnterReadLock(); + try + { + foreach (var pool in _routes.Values) + { + if (idx < 0 || idx >= pool.Count) + continue; + if (!fn(pool[idx])) + break; + } + } + finally + { + _mu.ExitReadLock(); + } + } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs index ae0aee9..fc6abad 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs @@ -254,6 +254,7 @@ public sealed partial class NatsServer : INatsServer private readonly ConcurrentDictionary _nodeToInfo = new(StringComparer.Ordinal); private readonly ConcurrentDictionary _raftNodes = new(StringComparer.Ordinal); private readonly Dictionary _routesToSelf = []; + private string _routeTlsName = string.Empty; private INetResolver? _routeResolver; private readonly ConcurrentDictionary _rateLimitLogging = new(); private readonly Channel _rateLimitLoggingCh; diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Routes/RouteHandler.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Routes/RouteHandler.cs index fd6aa43..6327326 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Routes/RouteHandler.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Routes/RouteHandler.cs @@ -24,4 +24,37 @@ internal static class RouteHandler internal static bool RouteShouldDelayInfo(string accName, ServerOptions opts) => string.IsNullOrEmpty(accName) && opts.Cluster.PoolSize >= 1; + + internal static bool HasSolicitedRoute(IReadOnlyList routes, string accName) + { + foreach (var route in routes) + { + if (route.Route?.DidSolicit != true) + continue; + + var routeAcc = route.Route?.AccName is { Length: > 0 } an + ? System.Text.Encoding.ASCII.GetString(an) + : string.Empty; + if (routeAcc == accName) + return true; + } + return false; + } + + internal static void UpgradeRouteToSolicited(ClientConnection route) + { + if (route.Route is null) + return; + route.Route.DidSolicit = true; + route.Route.Retry = true; + } + + internal static bool HandleDuplicateRoute(ClientConnection existing, ClientConnection incoming) + { + if (existing.IsSolicitedRoute() && !incoming.IsSolicitedRoute()) + return false; + if (!existing.IsSolicitedRoute() && incoming.IsSolicitedRoute()) + return true; + return incoming.Cid > existing.Cid; + } } diff --git a/porting.db b/porting.db index d4115548f06d8800ba7aa7a545deb238578f57a9..3d585863166bba8a79e3efba141fd62886763103 100644 GIT binary patch delta 4602 zcmai%drVu`9mntE_<3zG&j7Xwq)8wclOOMdCMA!y>myx~cA*J*VZct?g#<7(JcHe^ zU!|(5LZ$JWgck*obQy}GM4IAunzBZ%`p2G)wrTCwY4ovd+00z z{yATM-}|}W^E>C<1A)Mt95_EGpS9~21VP_7Q_(IiTFM7~;!TV4&hihzHK~}`2jUk~R@o`_ob2mA?j7ScW}_#s~}es2{p~t ziG(`CRaZg+<@aP=Q-xe1-VkPT+YNVeLWXMHtJXd8 zI~KQs=6{$UGyTx`8C6z@Pt$;t{e~(%Iwy6#DJyimoUNlhmCVF8wKX<|&y_QssFq(} z%^IVXoJb|uG&`9mi1%7u6#P_IKrcuRg?H!*qJNpu}0s#m1O?r2dp&UV1@LCo$XKqR{UDFP@RL-F7xP^ zgI!XqXW!LZXp=L!8g>-1HaR?FV|}t3`{*wv1GTp>9kmv+iYTWx@=hynyG>XWa>}gF zTHY||_20IXDu>OBroS6!btd_1=_9E@><|_sEq#vuyN>OR_3Q?3Y2S8(i6+-GXHu>o ztY-n$^7dZ6Lc6P2RkW)1wr+MT-rHUJ0yDVD|IdvUa@H_dju!5$VPbp`p%3A#F2!#km2*n7i5K0hA5y}wk2o3}%LOH@}gbIX8gf$5F zAgo1Lhp--@3ZWWd140dg3!xUljZlZM5djeD5gHI05t${zD8rq50Ljqe&e4SzHo(*H?+ME7T17ki&|%OA?U(qAN>_*d~M;g0Z3WdGL` zPlP0s;FO%Zi;gz>yq=-qL9bsqL*hkW8-3p0yN0@gKBK+9r42SughZ`1m}V`DSp}_? z=1tx@f(t&YhCDZ=wNIsyVE&X#vvYadS|$McS9 ztz&7{c!*K0m2T>KvxOMZ+DEdGPid`m^FB`_U46x8)Go)fTKjMsIX=B%t#v5P8lT>f z)=Ia2k#%~0t$i?!91k(5wGL#l4rs0YX;xiy4~O@^=DVbCid-kf;>$vAFjvaiZ@pys z!t#*vJ@XxNo9X)|!PsMX&0y9a)Lmv@vR!gW`c&E>z8vO9>xj52SQ{!aM-Ere={&Z4 zy6BURZaW1=4;0f^ynmgtuqt>60h?bv^#J`sVuh-_yqiPY(At()J(q zx~YHOXDW?ethJ$48Z^zwhn=$*z2%vajZN8*#wc=qsF;HDz5;QQw*R=(P20Zfv)h`R zvpr;wGE-88yoRt#@yOE0nJi?I|nmdGnB+&YtT|3cEUOr8jR{iJX^-e64^t*mlLY zHl5d+HK{sxSICxucZ$RsPb>ZOvd@qXY{?3YZ<}0o+YBdfo5s7(sx*`nJ%^*o?c(2! z8lRia-sx1RV!_93%?&G2tXhgcozj$b->dtqG-T0I2B~t~Q%g%TKAWv!#U)ZQw3LF- z?Oxvg>1}OTVUM~(<_yq&dcUfd$BG}SbZY$y9jgtQRBGxMX?t$x3TLXJz9rjQ#Ltv5 z19>{u#&}-M!P=a4cf~&fh74G${LX>&gQY>YW?i;;Vtod5aHYh}Srf<4gHAmU{GWyi nsZ9L}+JZiZE&G#YkNWO%b;q)R delta 1924 zcmaiyYitx%6vyXrcc(kMv)k_WWxETzEq!huhf)fKLR+9{vD%^(Di*qZYyoMdw4gkc zZh;(QBnFK37)c0^2Czf~ql_e`R^*c=4b&JR5`$>akVpt>V#J8IJ2*0*%zXL(&i~y1 zz4zRS9X=?>Vh82@nYN>XVDC8)tQL=+jZ~LP+MPW|O7$b9V&@t8DoZBiy4XwO*CS2z zUBpvP!SBTjxtH&2?hkkdOR&EmArq%uysOq$85 z&m`c~VNxll6(*H%T53|GGLa$7HE|g?3r(umlYbO{*{LMl6<%jw2yfk%`f04zqv>rC zX^yCmwM%~c;M9b=qTSK{{%DsIEwtA{@mZRkLl!S)A#3I?i}$jHUeX31-hQXW++m^Z z7TRW^7cCV31euANhi<=l%G7^eFMTtwTz%Ccr$_@^wsxo^|0-v@c1f5v zRdYtTN{*{GuGF|%tN(gTy6IT3-rXux#VhN$DvPUbuJYq5%9WAP#g&oK$(51OL8+&u z<^`Js;X%anTp1C)Tp1BPTp1A?xiTW2jn7i^O@KZSzg2EEwN zdS$x%aaw?4Klr^A`%XVUGVCh3=%XFpYQG-Bggq*3#X2zT|WoD~Wg0 ztu%Imq>b(@ifqhu(pWmnH;gwD*(6DIT7V)KWhd=RVmU_sr%8;7bo5(C9^LY?f~lf5 zJTBX_Mg^84t`SDvecIPr*tN;|iQFuS>S{-0!fa)a{eWcAw2hQZ_qAAE>E(r#7dDE@wX&EZxs$Bjmm3;KmGA6<>Tp&Ed1i`pN#F zxgwX3FFxy2g{+aJw-x@vMa)e(Vg3kxZ@N9y7Uu67pX;SCPl-2Y6tQ~pHWoQ7 zP>Zk#VKKrIggS(!2#+E>hER{N4B>Hv281UNo`+g+7UVsIuW`MqUWHS#nM$ri{NQ?k81a|X4i=GPiK>Q-0{1k QMdyELl|yei-+j{l4;Ug?t^fc4