feat(batch23): implement route protocol and info/perms foundation

This commit is contained in:
Joseph Doherty
2026-02-28 21:09:00 -05:00
parent f2bc957229
commit e51cdd64f4
7 changed files with 381 additions and 0 deletions

View File

@@ -0,0 +1,171 @@
// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
using System.Text;
using System.Text.Json;
using ZB.MOM.NatsNet.Server.Auth;
using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.Protocol;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class ClientConnection
{
internal void RemoveReplySub(Subscription? sub)
{
if (sub?.Sid is not { Length: > 0 } sid || Server is not NatsServer server)
return;
var sidText = Encoding.ASCII.GetString(sid);
var sep = sidText.IndexOf(' ');
if (sep <= 0)
return;
var accountName = sidText[..sep];
var (account, _) = server.LookupAccount(accountName);
account?.Sublist?.Remove(sub);
lock (_mu)
{
Subs.Remove(sidText);
}
}
internal Exception? ProcessAccountSub(byte[] arg)
{
_ = arg;
// Gateway account-sub propagation is owned by gateway sessions.
return null;
}
internal void ProcessAccountUnsub(byte[] arg)
{
_ = arg;
// Gateway account-unsub propagation is owned by gateway sessions.
}
internal Exception? ProcessRoutedOriginClusterMsgArgs(byte[] arg) =>
ProtocolParser.ProcessRoutedOriginClusterMsgArgs(ParseCtx, arg);
internal Exception? ProcessRoutedHeaderMsgArgs(byte[] arg) =>
ProtocolParser.ProcessRoutedHeaderMsgArgs(ParseCtx, arg);
internal Exception? ProcessRoutedMsgArgs(byte[] arg) =>
ProtocolParser.ProcessRoutedMsgArgs(ParseCtx, arg);
internal void ProcessInboundRoutedMsg(byte[] msg)
{
_in.Msgs++;
_in.Bytes += Math.Max(0, msg.Length - 2);
if (Opts.Verbose)
SendOK();
var pa = ParseCtx.Pa;
if (pa.Subject is null)
return;
var (acc, result) = GetAccAndResultFromCache();
if (acc is null)
return;
if ((result?.PSubs.Count ?? 0) + (result?.QSubs.Count ?? 0) > 0)
ProcessMsgResults(acc, result, msg, null, pa.Subject, pa.Reply, PmrFlags.None);
}
internal Exception? SendRouteConnect(string clusterName, bool tlsRequired)
{
var user = string.Empty;
var pass = string.Empty;
var routeUrl = Route?.Url;
if (routeUrl is not null && !string.IsNullOrEmpty(routeUrl.UserInfo))
{
var userInfo = routeUrl.UserInfo.Split(':', 2);
user = userInfo[0];
if (userInfo.Length > 1)
pass = userInfo[1];
}
if (Server is not NatsServer server)
return new InvalidOperationException("route server unavailable");
var connect = new ConnectInfo
{
Echo = true,
Verbose = false,
Pedantic = false,
User = user,
Pass = pass,
Tls = tlsRequired,
Name = server.ID(),
Headers = server.SupportsHeaders(),
Cluster = clusterName,
Dynamic = server.IsClusterNameDynamic(),
Lnoc = true,
};
var payload = JsonSerializer.Serialize(connect);
EnqueueProto(Encoding.ASCII.GetBytes($"CONNECT {payload}\r\n"));
return null;
}
internal void ProcessRouteInfo(ServerInfo info)
{
if (Server is not NatsServer server)
return;
lock (_mu)
{
Route ??= new Route();
if (Flags.IsSet(ClientFlags.InfoReceived))
{
Opts.Import = info.Import;
Opts.Export = info.Export;
}
Route.RemoteId = info.Id;
Route.RemoteName = info.Name;
Route.AuthRequired = info.AuthRequired;
Route.TlsRequired = info.TlsRequired;
Route.GatewayUrl = info.GatewayUrl ?? string.Empty;
Route.Lnoc = info.Lnoc;
Route.Lnocu = info.Lnocu;
Route.JetStream = info.JetStream;
Route.ConnectUrls = info.ClientConnectUrls?.ToList() ?? [];
Route.WsConnUrls = info.WsConnectUrls?.ToList() ?? [];
Route.LeafnodeUrl = info.LeafNodeUrls is { Length: 1 } leaf ? leaf[0] : string.Empty;
Route.Hash = NatsServer.GetHash(info.Name);
Route.IdHash = NatsServer.GetHash(info.Id);
Opts.Protocol = info.Proto;
Headers = server.SupportsHeaders() && info.Headers;
Flags |= ClientFlags.InfoReceived;
}
if (NatsServer.NeedsCompression(server.GetOpts().Cluster.Compression.Mode))
_ = server.NegotiateRouteCompression(this, Route?.DidSolicit == true, Route?.AccName is { Length: > 0 } an ? Encoding.ASCII.GetString(an) : string.Empty, info.Compression ?? string.Empty, server.GetOpts());
server.UpdateRemoteRoutePerms(this, info);
}
internal bool CanImport(string subject) => PubAllowedFullCheck(subject, fullCheck: false, hasLock: true);
internal bool CanExport(string subject) => CanSubscribe(subject);
internal void SetRoutePermissions(RoutePermissions? perms)
{
if (perms is null)
{
Perms = null;
MPerms = null;
return;
}
SetPermissions(new Permissions
{
Publish = perms.Import?.Clone(),
Subscribe = perms.Export?.Clone(),
});
}
}

View File

@@ -112,6 +112,7 @@ public sealed partial class ClientConnection
// Client options (from CONNECT message).
internal ClientOptions Opts = ClientOptions.Default;
internal Route? Route;
// Flags and state.
internal ClientFlags Flags; // mirrors c.flags clientFlag

View File

@@ -0,0 +1,8 @@
// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
namespace ZB.MOM.NatsNet.Server;
public sealed partial class NatsServer
{
}

View File

@@ -0,0 +1,183 @@
// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
using System.Text.Json;
using ZB.MOM.NatsNet.Server.Auth;
using ZB.MOM.NatsNet.Server.Internal;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class NatsServer
{
internal bool NegotiateRouteCompression(
ClientConnection c,
bool didSolicit,
string accName,
string infoCompression,
ServerOptions opts)
{
var mode = SelectCompressionMode(opts.Cluster.Compression.Mode, infoCompression);
lock (c)
{
c.Route ??= new Route();
if (mode == CompressionMode.S2Auto)
{
if (c.Rtt == TimeSpan.Zero)
c.Rtt = ClientConnection.ComputeRtt(c.Start);
mode = SelectS2AutoModeBasedOnRtt(c.Rtt, opts.Cluster.Compression.RttThresholds);
}
c.Route.Compression = mode;
}
if (!NeedsCompression(mode))
return false;
var info = CopyInfo();
info.Compression = CompressionModeForInfoProtocol(opts.Cluster.Compression, mode);
if (!string.IsNullOrEmpty(accName))
info.RouteAccount = accName;
var proto = GenerateInfoJson(info);
lock (c)
{
if (didSolicit)
c.EnqueueProto(proto);
else
c.EnqueueProto(proto);
c.SetFirstPingTimer();
}
return true;
}
internal void UpdateRemoteRoutePerms(ClientConnection c, ServerInfo info)
{
SubjectPermission? oldExport;
lock (c)
{
oldExport = c.Opts.Export?.Clone();
c.Opts.Import = info.Import?.Clone();
c.Opts.Export = info.Export?.Clone();
}
// Build old/new export checkers to preserve route permission semantics.
var oldTester = new ClientConnection(ClientKind.Router) { Route = new Route() };
oldTester.SetRoutePermissions(new RoutePermissions { Export = oldExport });
var newTester = new ClientConnection(ClientKind.Router) { Route = new Route() };
newTester.SetRoutePermissions(new RoutePermissions { Export = info.Export?.Clone() });
if (oldExport is null && info.Export is null)
return;
// Subscription fanout wiring for route permission delta is completed in group 2.
_ = oldTester;
_ = newTester;
}
internal void ProcessImplicitRoute(ServerInfo info, bool routeNoPool)
{
var remoteId = info.Id;
_mu.EnterWriteLock();
try
{
if (remoteId == _info.Id)
return;
var opts = GetOpts();
if (!string.IsNullOrEmpty(info.RouteAccount))
{
if (opts.Cluster.PoolSize <= 0)
return;
if (_accRoutes != null
&& _accRoutes.TryGetValue(info.RouteAccount, out var remotes)
&& remotes.TryGetValue(remoteId, out var existing)
&& existing != null)
{
return;
}
}
else if (_routes.ContainsKey(remoteId))
{
return;
}
if (HasThisRouteConfigured(info))
return;
}
finally
{
_mu.ExitWriteLock();
}
if (!Uri.TryCreate(info.Ip, UriKind.Absolute, out var _))
return;
if (routeNoPool && string.IsNullOrEmpty(info.RouteAccount))
Debugf("Implicit route from non-pooling remote {0} processed", info.Id);
}
internal bool HasThisRouteConfigured(ServerInfo info)
{
var routes = GetOpts().Routes;
if (routes.Count == 0)
return false;
var infoPort = info.Port <= 0 ? 6222 : info.Port;
var primary = $"{info.Host}:{infoPort}".ToLowerInvariant();
var secondary = string.Empty;
if (!string.IsNullOrWhiteSpace(info.Ip) && Uri.TryCreate(info.Ip, UriKind.Absolute, out var infoUri))
{
var host = infoUri.Host;
if (!string.IsNullOrWhiteSpace(host))
secondary = $"{host}:{infoPort}".ToLowerInvariant();
}
foreach (var route in routes)
{
var routePort = route.IsDefaultPort ? infoPort : route.Port;
var hostPort = $"{route.Host}:{routePort}".ToLowerInvariant();
if (hostPort == primary || (!string.IsNullOrEmpty(secondary) && hostPort == secondary))
return true;
}
return false;
}
internal void ForwardNewRouteInfoToKnownServers(ServerInfo info, RouteType routeType, bool didSolicit, byte localGossipMode)
{
var fromGossip = didSolicit && routeType == RouteType.Implicit;
if ((fromGossip && localGossipMode != GossipMode.Override) || info.GossipMode == GossipMode.Disabled)
return;
info.Nonce = string.Empty;
byte[] BuildInfo(byte gossipMode)
{
info.GossipMode = gossipMode;
return GenerateInfoJson(info);
}
byte[]? infoDefault = null;
byte[]? infoDisabled = null;
byte[]? infoOverride = null;
byte[] SelectInfo(ClientConnection route)
{
var rType = route.Route?.RouteType ?? RouteType.Implicit;
if ((!didSolicit && rType == RouteType.Explicit) || (didSolicit && routeType == RouteType.Explicit))
return infoOverride ??= BuildInfo(GossipMode.Override);
if (!didSolicit)
return infoDisabled ??= BuildInfo(GossipMode.Disabled);
return infoDefault ??= BuildInfo(GossipMode.Default);
}
ForEachRemote(route =>
{
if (route.Route?.RemoteId == info.Id)
return;
route.EnqueueProto(SelectInfo(route));
});
}
}

View File

@@ -0,0 +1,8 @@
// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
namespace ZB.MOM.NatsNet.Server;
public sealed partial class NatsServer
{
}

View File

@@ -0,0 +1,10 @@
// Copyright 2012-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
namespace ZB.MOM.NatsNet.Server;
internal static class RouteHandler
{
internal static int ComputeRoutePoolIdx(int poolSize, string accountName) =>
NatsServer.ComputeRoutePoolIdx(poolSize, accountName);
}