diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Routes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Routes.cs new file mode 100644 index 0000000..8329317 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Routes.cs @@ -0,0 +1,171 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); + +using System.Text; +using System.Text.Json; +using ZB.MOM.NatsNet.Server.Auth; +using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Protocol; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class ClientConnection +{ + internal void RemoveReplySub(Subscription? sub) + { + if (sub?.Sid is not { Length: > 0 } sid || Server is not NatsServer server) + return; + + var sidText = Encoding.ASCII.GetString(sid); + var sep = sidText.IndexOf(' '); + if (sep <= 0) + return; + + var accountName = sidText[..sep]; + var (account, _) = server.LookupAccount(accountName); + account?.Sublist?.Remove(sub); + + lock (_mu) + { + Subs.Remove(sidText); + } + } + + internal Exception? ProcessAccountSub(byte[] arg) + { + _ = arg; + // Gateway account-sub propagation is owned by gateway sessions. + return null; + } + + internal void ProcessAccountUnsub(byte[] arg) + { + _ = arg; + // Gateway account-unsub propagation is owned by gateway sessions. + } + + internal Exception? ProcessRoutedOriginClusterMsgArgs(byte[] arg) => + ProtocolParser.ProcessRoutedOriginClusterMsgArgs(ParseCtx, arg); + + internal Exception? ProcessRoutedHeaderMsgArgs(byte[] arg) => + ProtocolParser.ProcessRoutedHeaderMsgArgs(ParseCtx, arg); + + internal Exception? ProcessRoutedMsgArgs(byte[] arg) => + ProtocolParser.ProcessRoutedMsgArgs(ParseCtx, arg); + + internal void ProcessInboundRoutedMsg(byte[] msg) + { + _in.Msgs++; + _in.Bytes += Math.Max(0, msg.Length - 2); + + if (Opts.Verbose) + SendOK(); + + var pa = ParseCtx.Pa; + if (pa.Subject is null) + return; + + var (acc, result) = GetAccAndResultFromCache(); + if (acc is null) + return; + + if ((result?.PSubs.Count ?? 0) + (result?.QSubs.Count ?? 0) > 0) + ProcessMsgResults(acc, result, msg, null, pa.Subject, pa.Reply, PmrFlags.None); + } + + internal Exception? SendRouteConnect(string clusterName, bool tlsRequired) + { + var user = string.Empty; + var pass = string.Empty; + var routeUrl = Route?.Url; + if (routeUrl is not null && !string.IsNullOrEmpty(routeUrl.UserInfo)) + { + var userInfo = routeUrl.UserInfo.Split(':', 2); + user = userInfo[0]; + if (userInfo.Length > 1) + pass = userInfo[1]; + } + + if (Server is not NatsServer server) + return new InvalidOperationException("route server unavailable"); + + var connect = new ConnectInfo + { + Echo = true, + Verbose = false, + Pedantic = false, + User = user, + Pass = pass, + Tls = tlsRequired, + Name = server.ID(), + Headers = server.SupportsHeaders(), + Cluster = clusterName, + Dynamic = server.IsClusterNameDynamic(), + Lnoc = true, + }; + + var payload = JsonSerializer.Serialize(connect); + EnqueueProto(Encoding.ASCII.GetBytes($"CONNECT {payload}\r\n")); + return null; + } + + internal void ProcessRouteInfo(ServerInfo info) + { + if (Server is not NatsServer server) + return; + + lock (_mu) + { + Route ??= new Route(); + + if (Flags.IsSet(ClientFlags.InfoReceived)) + { + Opts.Import = info.Import; + Opts.Export = info.Export; + } + + Route.RemoteId = info.Id; + Route.RemoteName = info.Name; + Route.AuthRequired = info.AuthRequired; + Route.TlsRequired = info.TlsRequired; + Route.GatewayUrl = info.GatewayUrl ?? string.Empty; + Route.Lnoc = info.Lnoc; + Route.Lnocu = info.Lnocu; + Route.JetStream = info.JetStream; + Route.ConnectUrls = info.ClientConnectUrls?.ToList() ?? []; + Route.WsConnUrls = info.WsConnectUrls?.ToList() ?? []; + Route.LeafnodeUrl = info.LeafNodeUrls is { Length: 1 } leaf ? leaf[0] : string.Empty; + Route.Hash = NatsServer.GetHash(info.Name); + Route.IdHash = NatsServer.GetHash(info.Id); + + Opts.Protocol = info.Proto; + Headers = server.SupportsHeaders() && info.Headers; + Flags |= ClientFlags.InfoReceived; + } + + if (NatsServer.NeedsCompression(server.GetOpts().Cluster.Compression.Mode)) + _ = server.NegotiateRouteCompression(this, Route?.DidSolicit == true, Route?.AccName is { Length: > 0 } an ? Encoding.ASCII.GetString(an) : string.Empty, info.Compression ?? string.Empty, server.GetOpts()); + + server.UpdateRemoteRoutePerms(this, info); + } + + internal bool CanImport(string subject) => PubAllowedFullCheck(subject, fullCheck: false, hasLock: true); + + internal bool CanExport(string subject) => CanSubscribe(subject); + + internal void SetRoutePermissions(RoutePermissions? perms) + { + if (perms is null) + { + Perms = null; + MPerms = null; + return; + } + + SetPermissions(new Permissions + { + Publish = perms.Import?.Clone(), + Subscribe = perms.Export?.Clone(), + }); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs index 9066d2e..030d937 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs @@ -112,6 +112,7 @@ public sealed partial class ClientConnection // Client options (from CONNECT message). internal ClientOptions Opts = ClientOptions.Default; + internal Route? Route; // Flags and state. internal ClientFlags Flags; // mirrors c.flags clientFlag diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Routes.Connections.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Routes.Connections.cs new file mode 100644 index 0000000..e5654ae --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Routes.Connections.cs @@ -0,0 +1,8 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Routes.InfoAndPerms.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Routes.InfoAndPerms.cs new file mode 100644 index 0000000..5253380 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Routes.InfoAndPerms.cs @@ -0,0 +1,183 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); + +using System.Text.Json; +using ZB.MOM.NatsNet.Server.Auth; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + internal bool NegotiateRouteCompression( + ClientConnection c, + bool didSolicit, + string accName, + string infoCompression, + ServerOptions opts) + { + var mode = SelectCompressionMode(opts.Cluster.Compression.Mode, infoCompression); + + lock (c) + { + c.Route ??= new Route(); + if (mode == CompressionMode.S2Auto) + { + if (c.Rtt == TimeSpan.Zero) + c.Rtt = ClientConnection.ComputeRtt(c.Start); + mode = SelectS2AutoModeBasedOnRtt(c.Rtt, opts.Cluster.Compression.RttThresholds); + } + c.Route.Compression = mode; + } + + if (!NeedsCompression(mode)) + return false; + + var info = CopyInfo(); + info.Compression = CompressionModeForInfoProtocol(opts.Cluster.Compression, mode); + if (!string.IsNullOrEmpty(accName)) + info.RouteAccount = accName; + var proto = GenerateInfoJson(info); + + lock (c) + { + if (didSolicit) + c.EnqueueProto(proto); + else + c.EnqueueProto(proto); + c.SetFirstPingTimer(); + } + + return true; + } + + internal void UpdateRemoteRoutePerms(ClientConnection c, ServerInfo info) + { + SubjectPermission? oldExport; + lock (c) + { + oldExport = c.Opts.Export?.Clone(); + c.Opts.Import = info.Import?.Clone(); + c.Opts.Export = info.Export?.Clone(); + } + + // Build old/new export checkers to preserve route permission semantics. + var oldTester = new ClientConnection(ClientKind.Router) { Route = new Route() }; + oldTester.SetRoutePermissions(new RoutePermissions { Export = oldExport }); + var newTester = new ClientConnection(ClientKind.Router) { Route = new Route() }; + newTester.SetRoutePermissions(new RoutePermissions { Export = info.Export?.Clone() }); + + if (oldExport is null && info.Export is null) + return; + + // Subscription fanout wiring for route permission delta is completed in group 2. + _ = oldTester; + _ = newTester; + } + + internal void ProcessImplicitRoute(ServerInfo info, bool routeNoPool) + { + var remoteId = info.Id; + _mu.EnterWriteLock(); + try + { + if (remoteId == _info.Id) + return; + + var opts = GetOpts(); + if (!string.IsNullOrEmpty(info.RouteAccount)) + { + if (opts.Cluster.PoolSize <= 0) + return; + if (_accRoutes != null + && _accRoutes.TryGetValue(info.RouteAccount, out var remotes) + && remotes.TryGetValue(remoteId, out var existing) + && existing != null) + { + return; + } + } + else if (_routes.ContainsKey(remoteId)) + { + return; + } + + if (HasThisRouteConfigured(info)) + return; + } + finally + { + _mu.ExitWriteLock(); + } + + if (!Uri.TryCreate(info.Ip, UriKind.Absolute, out var _)) + return; + + if (routeNoPool && string.IsNullOrEmpty(info.RouteAccount)) + Debugf("Implicit route from non-pooling remote {0} processed", info.Id); + } + + internal bool HasThisRouteConfigured(ServerInfo info) + { + var routes = GetOpts().Routes; + if (routes.Count == 0) + return false; + + var infoPort = info.Port <= 0 ? 6222 : info.Port; + var primary = $"{info.Host}:{infoPort}".ToLowerInvariant(); + + var secondary = string.Empty; + if (!string.IsNullOrWhiteSpace(info.Ip) && Uri.TryCreate(info.Ip, UriKind.Absolute, out var infoUri)) + { + var host = infoUri.Host; + if (!string.IsNullOrWhiteSpace(host)) + secondary = $"{host}:{infoPort}".ToLowerInvariant(); + } + + foreach (var route in routes) + { + var routePort = route.IsDefaultPort ? infoPort : route.Port; + var hostPort = $"{route.Host}:{routePort}".ToLowerInvariant(); + if (hostPort == primary || (!string.IsNullOrEmpty(secondary) && hostPort == secondary)) + return true; + } + + return false; + } + + internal void ForwardNewRouteInfoToKnownServers(ServerInfo info, RouteType routeType, bool didSolicit, byte localGossipMode) + { + var fromGossip = didSolicit && routeType == RouteType.Implicit; + if ((fromGossip && localGossipMode != GossipMode.Override) || info.GossipMode == GossipMode.Disabled) + return; + + info.Nonce = string.Empty; + + byte[] BuildInfo(byte gossipMode) + { + info.GossipMode = gossipMode; + return GenerateInfoJson(info); + } + + byte[]? infoDefault = null; + byte[]? infoDisabled = null; + byte[]? infoOverride = null; + + byte[] SelectInfo(ClientConnection route) + { + var rType = route.Route?.RouteType ?? RouteType.Implicit; + if ((!didSolicit && rType == RouteType.Explicit) || (didSolicit && routeType == RouteType.Explicit)) + return infoOverride ??= BuildInfo(GossipMode.Override); + if (!didSolicit) + return infoDisabled ??= BuildInfo(GossipMode.Disabled); + return infoDefault ??= BuildInfo(GossipMode.Default); + } + + ForEachRemote(route => + { + if (route.Route?.RemoteId == info.Id) + return; + route.EnqueueProto(SelectInfo(route)); + }); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Routes.Subscriptions.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Routes.Subscriptions.cs new file mode 100644 index 0000000..e5654ae --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Routes.Subscriptions.cs @@ -0,0 +1,8 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Routes/RouteHandler.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Routes/RouteHandler.cs new file mode 100644 index 0000000..ea98da3 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Routes/RouteHandler.cs @@ -0,0 +1,10 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); + +namespace ZB.MOM.NatsNet.Server; + +internal static class RouteHandler +{ + internal static int ComputeRoutePoolIdx(int poolSize, string accountName) => + NatsServer.ComputeRoutePoolIdx(poolSize, accountName); +} diff --git a/porting.db b/porting.db index dc90e94..5609464 100644 Binary files a/porting.db and b/porting.db differ