From e51cdd64f4f6176927e48b0a2cdb2c7b55e515bc Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 21:09:00 -0500 Subject: [PATCH] feat(batch23): implement route protocol and info/perms foundation --- .../ClientConnection.Routes.cs | 171 ++++++++++++++++ .../ZB.MOM.NatsNet.Server/ClientConnection.cs | 1 + .../NatsServer.Routes.Connections.cs | 8 + .../NatsServer.Routes.InfoAndPerms.cs | 183 ++++++++++++++++++ .../NatsServer.Routes.Subscriptions.cs | 8 + .../Routes/RouteHandler.cs | 10 + porting.db | Bin 6721536 -> 6729728 bytes 7 files changed, 381 insertions(+) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Routes.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Routes.Connections.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Routes.InfoAndPerms.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Routes.Subscriptions.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/Routes/RouteHandler.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Routes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Routes.cs new file mode 100644 index 0000000..8329317 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Routes.cs @@ -0,0 +1,171 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); + +using System.Text; +using System.Text.Json; +using ZB.MOM.NatsNet.Server.Auth; +using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Protocol; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class ClientConnection +{ + internal void RemoveReplySub(Subscription? sub) + { + if (sub?.Sid is not { Length: > 0 } sid || Server is not NatsServer server) + return; + + var sidText = Encoding.ASCII.GetString(sid); + var sep = sidText.IndexOf(' '); + if (sep <= 0) + return; + + var accountName = sidText[..sep]; + var (account, _) = server.LookupAccount(accountName); + account?.Sublist?.Remove(sub); + + lock (_mu) + { + Subs.Remove(sidText); + } + } + + internal Exception? ProcessAccountSub(byte[] arg) + { + _ = arg; + // Gateway account-sub propagation is owned by gateway sessions. + return null; + } + + internal void ProcessAccountUnsub(byte[] arg) + { + _ = arg; + // Gateway account-unsub propagation is owned by gateway sessions. + } + + internal Exception? ProcessRoutedOriginClusterMsgArgs(byte[] arg) => + ProtocolParser.ProcessRoutedOriginClusterMsgArgs(ParseCtx, arg); + + internal Exception? ProcessRoutedHeaderMsgArgs(byte[] arg) => + ProtocolParser.ProcessRoutedHeaderMsgArgs(ParseCtx, arg); + + internal Exception? ProcessRoutedMsgArgs(byte[] arg) => + ProtocolParser.ProcessRoutedMsgArgs(ParseCtx, arg); + + internal void ProcessInboundRoutedMsg(byte[] msg) + { + _in.Msgs++; + _in.Bytes += Math.Max(0, msg.Length - 2); + + if (Opts.Verbose) + SendOK(); + + var pa = ParseCtx.Pa; + if (pa.Subject is null) + return; + + var (acc, result) = GetAccAndResultFromCache(); + if (acc is null) + return; + + if ((result?.PSubs.Count ?? 0) + (result?.QSubs.Count ?? 0) > 0) + ProcessMsgResults(acc, result, msg, null, pa.Subject, pa.Reply, PmrFlags.None); + } + + internal Exception? SendRouteConnect(string clusterName, bool tlsRequired) + { + var user = string.Empty; + var pass = string.Empty; + var routeUrl = Route?.Url; + if (routeUrl is not null && !string.IsNullOrEmpty(routeUrl.UserInfo)) + { + var userInfo = routeUrl.UserInfo.Split(':', 2); + user = userInfo[0]; + if (userInfo.Length > 1) + pass = userInfo[1]; + } + + if (Server is not NatsServer server) + return new InvalidOperationException("route server unavailable"); + + var connect = new ConnectInfo + { + Echo = true, + Verbose = false, + Pedantic = false, + User = user, + Pass = pass, + Tls = tlsRequired, + Name = server.ID(), + Headers = server.SupportsHeaders(), + Cluster = clusterName, + Dynamic = server.IsClusterNameDynamic(), + Lnoc = true, + }; + + var payload = JsonSerializer.Serialize(connect); + EnqueueProto(Encoding.ASCII.GetBytes($"CONNECT {payload}\r\n")); + return null; + } + + internal void ProcessRouteInfo(ServerInfo info) + { + if (Server is not NatsServer server) + return; + + lock (_mu) + { + Route ??= new Route(); + + if (Flags.IsSet(ClientFlags.InfoReceived)) + { + Opts.Import = info.Import; + Opts.Export = info.Export; + } + + Route.RemoteId = info.Id; + Route.RemoteName = info.Name; + Route.AuthRequired = info.AuthRequired; + Route.TlsRequired = info.TlsRequired; + Route.GatewayUrl = info.GatewayUrl ?? string.Empty; + Route.Lnoc = info.Lnoc; + Route.Lnocu = info.Lnocu; + Route.JetStream = info.JetStream; + Route.ConnectUrls = info.ClientConnectUrls?.ToList() ?? []; + Route.WsConnUrls = info.WsConnectUrls?.ToList() ?? []; + Route.LeafnodeUrl = info.LeafNodeUrls is { Length: 1 } leaf ? leaf[0] : string.Empty; + Route.Hash = NatsServer.GetHash(info.Name); + Route.IdHash = NatsServer.GetHash(info.Id); + + Opts.Protocol = info.Proto; + Headers = server.SupportsHeaders() && info.Headers; + Flags |= ClientFlags.InfoReceived; + } + + if (NatsServer.NeedsCompression(server.GetOpts().Cluster.Compression.Mode)) + _ = server.NegotiateRouteCompression(this, Route?.DidSolicit == true, Route?.AccName is { Length: > 0 } an ? Encoding.ASCII.GetString(an) : string.Empty, info.Compression ?? string.Empty, server.GetOpts()); + + server.UpdateRemoteRoutePerms(this, info); + } + + internal bool CanImport(string subject) => PubAllowedFullCheck(subject, fullCheck: false, hasLock: true); + + internal bool CanExport(string subject) => CanSubscribe(subject); + + internal void SetRoutePermissions(RoutePermissions? perms) + { + if (perms is null) + { + Perms = null; + MPerms = null; + return; + } + + SetPermissions(new Permissions + { + Publish = perms.Import?.Clone(), + Subscribe = perms.Export?.Clone(), + }); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs index 9066d2e..030d937 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs @@ -112,6 +112,7 @@ public sealed partial class ClientConnection // Client options (from CONNECT message). internal ClientOptions Opts = ClientOptions.Default; + internal Route? Route; // Flags and state. internal ClientFlags Flags; // mirrors c.flags clientFlag diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Routes.Connections.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Routes.Connections.cs new file mode 100644 index 0000000..e5654ae --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Routes.Connections.cs @@ -0,0 +1,8 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Routes.InfoAndPerms.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Routes.InfoAndPerms.cs new file mode 100644 index 0000000..5253380 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Routes.InfoAndPerms.cs @@ -0,0 +1,183 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); + +using System.Text.Json; +using ZB.MOM.NatsNet.Server.Auth; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + internal bool NegotiateRouteCompression( + ClientConnection c, + bool didSolicit, + string accName, + string infoCompression, + ServerOptions opts) + { + var mode = SelectCompressionMode(opts.Cluster.Compression.Mode, infoCompression); + + lock (c) + { + c.Route ??= new Route(); + if (mode == CompressionMode.S2Auto) + { + if (c.Rtt == TimeSpan.Zero) + c.Rtt = ClientConnection.ComputeRtt(c.Start); + mode = SelectS2AutoModeBasedOnRtt(c.Rtt, opts.Cluster.Compression.RttThresholds); + } + c.Route.Compression = mode; + } + + if (!NeedsCompression(mode)) + return false; + + var info = CopyInfo(); + info.Compression = CompressionModeForInfoProtocol(opts.Cluster.Compression, mode); + if (!string.IsNullOrEmpty(accName)) + info.RouteAccount = accName; + var proto = GenerateInfoJson(info); + + lock (c) + { + if (didSolicit) + c.EnqueueProto(proto); + else + c.EnqueueProto(proto); + c.SetFirstPingTimer(); + } + + return true; + } + + internal void UpdateRemoteRoutePerms(ClientConnection c, ServerInfo info) + { + SubjectPermission? oldExport; + lock (c) + { + oldExport = c.Opts.Export?.Clone(); + c.Opts.Import = info.Import?.Clone(); + c.Opts.Export = info.Export?.Clone(); + } + + // Build old/new export checkers to preserve route permission semantics. + var oldTester = new ClientConnection(ClientKind.Router) { Route = new Route() }; + oldTester.SetRoutePermissions(new RoutePermissions { Export = oldExport }); + var newTester = new ClientConnection(ClientKind.Router) { Route = new Route() }; + newTester.SetRoutePermissions(new RoutePermissions { Export = info.Export?.Clone() }); + + if (oldExport is null && info.Export is null) + return; + + // Subscription fanout wiring for route permission delta is completed in group 2. + _ = oldTester; + _ = newTester; + } + + internal void ProcessImplicitRoute(ServerInfo info, bool routeNoPool) + { + var remoteId = info.Id; + _mu.EnterWriteLock(); + try + { + if (remoteId == _info.Id) + return; + + var opts = GetOpts(); + if (!string.IsNullOrEmpty(info.RouteAccount)) + { + if (opts.Cluster.PoolSize <= 0) + return; + if (_accRoutes != null + && _accRoutes.TryGetValue(info.RouteAccount, out var remotes) + && remotes.TryGetValue(remoteId, out var existing) + && existing != null) + { + return; + } + } + else if (_routes.ContainsKey(remoteId)) + { + return; + } + + if (HasThisRouteConfigured(info)) + return; + } + finally + { + _mu.ExitWriteLock(); + } + + if (!Uri.TryCreate(info.Ip, UriKind.Absolute, out var _)) + return; + + if (routeNoPool && string.IsNullOrEmpty(info.RouteAccount)) + Debugf("Implicit route from non-pooling remote {0} processed", info.Id); + } + + internal bool HasThisRouteConfigured(ServerInfo info) + { + var routes = GetOpts().Routes; + if (routes.Count == 0) + return false; + + var infoPort = info.Port <= 0 ? 6222 : info.Port; + var primary = $"{info.Host}:{infoPort}".ToLowerInvariant(); + + var secondary = string.Empty; + if (!string.IsNullOrWhiteSpace(info.Ip) && Uri.TryCreate(info.Ip, UriKind.Absolute, out var infoUri)) + { + var host = infoUri.Host; + if (!string.IsNullOrWhiteSpace(host)) + secondary = $"{host}:{infoPort}".ToLowerInvariant(); + } + + foreach (var route in routes) + { + var routePort = route.IsDefaultPort ? infoPort : route.Port; + var hostPort = $"{route.Host}:{routePort}".ToLowerInvariant(); + if (hostPort == primary || (!string.IsNullOrEmpty(secondary) && hostPort == secondary)) + return true; + } + + return false; + } + + internal void ForwardNewRouteInfoToKnownServers(ServerInfo info, RouteType routeType, bool didSolicit, byte localGossipMode) + { + var fromGossip = didSolicit && routeType == RouteType.Implicit; + if ((fromGossip && localGossipMode != GossipMode.Override) || info.GossipMode == GossipMode.Disabled) + return; + + info.Nonce = string.Empty; + + byte[] BuildInfo(byte gossipMode) + { + info.GossipMode = gossipMode; + return GenerateInfoJson(info); + } + + byte[]? infoDefault = null; + byte[]? infoDisabled = null; + byte[]? infoOverride = null; + + byte[] SelectInfo(ClientConnection route) + { + var rType = route.Route?.RouteType ?? RouteType.Implicit; + if ((!didSolicit && rType == RouteType.Explicit) || (didSolicit && routeType == RouteType.Explicit)) + return infoOverride ??= BuildInfo(GossipMode.Override); + if (!didSolicit) + return infoDisabled ??= BuildInfo(GossipMode.Disabled); + return infoDefault ??= BuildInfo(GossipMode.Default); + } + + ForEachRemote(route => + { + if (route.Route?.RemoteId == info.Id) + return; + route.EnqueueProto(SelectInfo(route)); + }); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Routes.Subscriptions.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Routes.Subscriptions.cs new file mode 100644 index 0000000..e5654ae --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Routes.Subscriptions.cs @@ -0,0 +1,8 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Routes/RouteHandler.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Routes/RouteHandler.cs new file mode 100644 index 0000000..ea98da3 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Routes/RouteHandler.cs @@ -0,0 +1,10 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); + +namespace ZB.MOM.NatsNet.Server; + +internal static class RouteHandler +{ + internal static int ComputeRoutePoolIdx(int poolSize, string accountName) => + NatsServer.ComputeRoutePoolIdx(poolSize, accountName); +} diff --git a/porting.db b/porting.db index dc90e94bc0fcdf2ea8fb99c53b9c56fbbe3d31cf..5609464497ff31c6d531fda02cdd512783da8e4e 100644 GIT binary patch delta 5562 zcmbVQYitzP6`q+Ld!M#<4aN|AXV<|zf+617hu05cLmmW(iP!H3#@@AeO$gypKWbHq zgq_Dll`16=;~OzJJtS%rPKmR72e_|e|oS;qHn z#x`q9$GV^A==<(D=iYNJOu$Lg#Oz7a+a)=tIWBkqF>eEZ`jhCEO8%?YPgkm^EBWX- z(R`Wv9nP8MD@E~)2-xQoym&S$ys3UV%k#W?<0k)wP?~3Q3xkIa9DXG+ny^P0)TI96 z7Jthma3A6K|IVK~VvjMOqYUb2P#=T#FsPS7Jq+q*P#1&RI~mx)pmqkeF{qV6EevW_ z*M7_YWz1~7z?~9*%i%3f!H>Po&exKex`8KRLb-Y|Cfqk4;W*(;JTa6Q9!|vVF@iux zQG$-^sGp!Wbks-C_jR;~pqF&iOHhxFdI)ONQ8z*LI_lz%aDq>Vo#f-=I_e;3xsKWi z%GFUD$MN@d)Jo8|I%*;4x{jI&x~!uJL6>yY#Bu6RUlu$yUUCMma?;4?*nmAsR?AbX zezLNqR((u7x>5!&kQ zBw1+=lBhHXNm81FBq+_%MOGRI$xE7}L;dKC&|7Bt*kL7q%iXU!t_l%ebQCzO*!87Q zufBIp*l%-?PcJKt-EQ@n&T<8uEC@6F%>tJ#VLhK8%R6O1$lc~- z%RTd(xx+d8M3O!I2Mg6=CqDK@iG(A65LV!;VbP4Is>M?5-6`gp{k6Vutv`%EuNJ2? zui2vx7oMmQ*Dwz1S3dDF?d^RLzaQ!hP z-p_P3hs8}S;nuLYoAM%AVdFh|zD6XLLoAn=nV5x`m6(l~otT4I9nj^cu~_Fvn+v8}LtllNzHd(OYaZKjp{ zEe`X731ic7={MS5o!y>7{NT9s1QkrckL7oyKWPqd%HhP}DXENgNK8pzQVxj|84hYQ zmNxUaESBTOmoA!Y__P06ZTRB5l8@r?ht&t~N&@Xxep0GrJ)$S2O=*v4&_fQ=f2|h$ zD@BqRH+)x-u2OP0>O2m7@07Hf^(vc@_NKh1m*?2=xfw}LeRrl0?PPSi%f<0^gShY2 zl2ZJ$6H+f#QgVoL@UCge&2XxBXQXrK4Sx1xsha(G;caObkH@E_aXsHI?8eeM#tngl zz=ObxpbWv|2v#E~N3aILS_JD5tVi$!f(-~N5Nt%S3Bi*HDiQb)$O!xh0tkW#LI|o5 zgb`FDs6kMRpbo)i1WzHTN6>&^3xcf(o<{Htf@cvthhQ6m=Mij2umizP1TP?HM6e6N ziwK$!L=ZG1XhG15pbbGgf(`_o2)Yn-Bj`b}8$mCEJqY>`^dpEOh#?q2Fo+<20TSY@ zX&Lu>Y@Qf$+e{JRtdP$iAh^uSzgIlzRzfz;Ex0EOiuwaXiP-4aaAL%ckNtah33gs{ zu9jL3jg2Noo*W#h4b~((=|F6B@Fjo1yLb4|*kKueZ$rEcPn=KWN~C(h+E5i?3>q>1 zvUuRTj8zLUdgd4d28_+P>{VeSp7??6qBrS#L`uH_WdvUvk9~(S5O_pN*?_Vsxy#2e zd_GW^xm9^V-ee0;=2N}*`cQ$RDvz}2V(t!C(4T)X-<|hE$KM-z)Ko(b(`_@?eX5N!>>inE!cP!u?-sbx)3(NlAIZnmkG4v__{^BWwml{|KX~vB$9_LwfzcZhk-|5PTTM&&mMXE$NJ78nlteI`l>}9i=L{sMdrxb~OVxPRK!a)oltv)9 zRE=j0G%`B_B_wAjq?cV%p4Nn1Ofz@IdrPK&BxnuDGj~6WjhWI9ol*@9!U%z zNDVeQ7+#D&HQClE^#=TVs1~u1h`iwV)Q4TU@{}gxri19!M6Q&?3R976Fpea=U?BR4 zE>|`iXy7sz8PT&RRi)1>>cJ9T^bV#1b}L~6#?0|V@h*P1)4Wv% zywo(JDO3Nbp6>zp@B}AcDi&LmE7+Mom3Jd=o#U|mlHF;Gs^l%Q-BwHx@F|(X$ zQuKRdNpm*fCpxVLI#kzUM6&C++K5Vtv>1p`ZHy7g?q#Km1e9h?#6xF{>T8>gyMY|c z?zyE2lBZlii5LiGHoEy#{n@=JbXC97q^Y{;Ff%(}opG$;>_XK?reAr{Ks2Kj&Z8Mx Z%Aq0Y^((s!G^yS=k6xB9^N0Tl^nX77B$EID delta 1820 zcmZ9~Yfw~W7zglkdG~hM3%3=J-B%U_R8%f|9#H;ob$fB zjpJ={V^^Ep6l>@d1mn)bacjiRFMRo_V&B2;RJA)*^z|{;C7dL~xR^>`kNY;#b)O@% zrCwq*J}%Bs--NhOy*?ohN-oo|;Hr83)%sWbyZp*EB)gujssoeaq%4VVQ_pR&Z=dqB zO?FFqdrA<5AwAtgI-#e#NXPVa2dPa@w~^k}({D(7_4F%Jy`F9%RqLr_5^;+jZz2`y z=@+D2Jxw4j(bG86<1^ksdVIz)HR-N+ZMRA37kb$Rp`};qr_*a4HtGlqjiCXbbX`4N zB|S9ln>~81zs}##;I9p-3QE49R2h_Z1f}gksUj$q2c@#0w9OVf_w`$Ynx$l~ma5hM zmnBbLoO0SWD)2XU?XFf%p^6Mt0~XmS1**&HUmr`iEUT4o?G?hRz|s(^j6ii3)uKQ( zh)NrBM&CMxH6_r!NAr56EvuDL+@v;R1eG>p7?n2T5-M%Rcc`=vIgd)4aZWwfD^+;R z$E;4Q8F#AIQE8({Raw#N)Q`N5J<1{Hn4~(#LE{@*sD&YvtcphKtyXvxF8noUywrHW@`l1XR`Mw`w_(atQ^Lz{ZS-PCx~6;6#8B?GO=W=S-(FytemRP7?x4xCCNmMc`Q+DWTdfD%E-3Pos6!D z&ZaS!_Q>yPteK{rMmNcL_j1s zVID+5G{nGshy@q8)#G`*q%_*N$9CV)Z1aTN7fvWX^COd&`Q>y;rt~H!saHFAu}CMH z_<0I{WN}me0o-=)VZK$6Y>vFo-$rlA) zrWPLIt3>Ml!;nm0{cBO^r4}AV7g~4>H4Vvj^+5|4MQW%uCQ}UYRI2|+u`5bz1^zq* z2UAkWGayG$*54M#X9tLjG`%FkO}@+pVbph&Cj~~(gwQ21D)1G2FoE8(jXl&U