feat(batch24): complete leaf nodes implementation and verification
This commit is contained in:
540
dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.LeafNodes.cs
Normal file
540
dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.LeafNodes.cs
Normal file
@@ -0,0 +1,540 @@
|
||||
// Copyright 2019-2026 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
|
||||
using System.Net.Security;
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using System.Linq;
|
||||
using ZB.MOM.NatsNet.Server.Internal;
|
||||
using ZB.MOM.NatsNet.Server.Protocol;
|
||||
|
||||
namespace ZB.MOM.NatsNet.Server;
|
||||
|
||||
public sealed partial class ClientConnection
|
||||
{
|
||||
internal Leaf? Leaf;
|
||||
|
||||
internal bool IsSolicitedLeafNode() => Kind == ClientKind.Leaf && Leaf?.Remote != null;
|
||||
|
||||
internal bool IsSpokeLeafNode() => Kind == ClientKind.Leaf && Leaf?.IsSpoke == true;
|
||||
|
||||
internal bool IsIsolatedLeafNode() => Kind == ClientKind.Leaf && Leaf?.Isolated == true;
|
||||
|
||||
internal Exception? SendLeafConnect(string clusterName, bool headers)
|
||||
{
|
||||
if (Server is not NatsServer server)
|
||||
return new InvalidOperationException("server unavailable for leaf connect");
|
||||
|
||||
lock (_mu)
|
||||
{
|
||||
Leaf ??= new Leaf();
|
||||
Leaf.Remote ??= new LeafNodeCfg();
|
||||
|
||||
var remote = Leaf.Remote;
|
||||
var connectInfo = new LeafConnectInfo
|
||||
{
|
||||
Version = ServerConstants.Version,
|
||||
Id = server.ID(),
|
||||
Domain = server.GetOpts().JetStreamDomain,
|
||||
Name = server.Name(),
|
||||
Hub = remote.RemoteOpts?.Hub == true,
|
||||
Cluster = clusterName,
|
||||
Headers = headers,
|
||||
JetStream = false,
|
||||
DenyPub = remote.RemoteOpts?.DenyImports?.ToArray() ?? [],
|
||||
Compression = string.IsNullOrWhiteSpace(Leaf.Compression) ? CompressionMode.NotSupported : Leaf.Compression,
|
||||
RemoteAccount = _account?.Name ?? string.Empty,
|
||||
Proto = server.GetServerProto(),
|
||||
Isolate = remote.RemoteOpts?.RequestIsolation == true,
|
||||
};
|
||||
|
||||
if (remote.CurUrl?.UserInfo is { Length: > 0 } userInfo)
|
||||
{
|
||||
var userInfoParts = userInfo.Split(':', 2, StringSplitOptions.None);
|
||||
connectInfo.User = userInfoParts[0];
|
||||
connectInfo.Pass = userInfoParts.Length > 1 ? userInfoParts[1] : string.Empty;
|
||||
if (string.IsNullOrEmpty(connectInfo.Pass))
|
||||
connectInfo.Token = connectInfo.User;
|
||||
}
|
||||
else if (!string.IsNullOrWhiteSpace(remote.Username))
|
||||
{
|
||||
connectInfo.User = remote.Username;
|
||||
connectInfo.Pass = remote.Password;
|
||||
if (string.IsNullOrEmpty(connectInfo.Pass))
|
||||
connectInfo.Token = connectInfo.User;
|
||||
}
|
||||
|
||||
var payload = JsonSerializer.Serialize(connectInfo);
|
||||
EnqueueProto(Encoding.ASCII.GetBytes($"CONNECT {payload}\r\n"));
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
internal Exception? LeafClientHandshakeIfNeeded(ServerInfo info)
|
||||
{
|
||||
if (!info.TlsRequired)
|
||||
return null;
|
||||
|
||||
Opts.TlsRequired = true;
|
||||
return null;
|
||||
}
|
||||
|
||||
internal void ProcessLeafnodeInfo(ServerInfo info)
|
||||
{
|
||||
if (Server is not NatsServer server)
|
||||
return;
|
||||
|
||||
lock (_mu)
|
||||
{
|
||||
Leaf ??= new Leaf();
|
||||
if (string.IsNullOrWhiteSpace(Leaf.Compression))
|
||||
Leaf.Compression = string.IsNullOrWhiteSpace(info.Compression) ? CompressionMode.NotSupported : info.Compression!;
|
||||
|
||||
Headers = server.SupportsHeaders() && info.Headers;
|
||||
Flags |= ClientFlags.InfoReceived;
|
||||
}
|
||||
|
||||
if (IsSolicitedLeafNode())
|
||||
UpdateLeafNodeURLs(info);
|
||||
}
|
||||
|
||||
internal void UpdateLeafNodeURLs(ServerInfo info)
|
||||
{
|
||||
var cfg = Leaf?.Remote;
|
||||
if (cfg is null)
|
||||
return;
|
||||
|
||||
cfg.AcquireWriteLock();
|
||||
try
|
||||
{
|
||||
var useWebSocket = cfg.Urls.Count > 0 && string.Equals(cfg.Urls[0].Scheme, "ws", StringComparison.OrdinalIgnoreCase);
|
||||
if (useWebSocket)
|
||||
{
|
||||
var scheme = cfg.RemoteOpts?.Tls == true ? "wss" : "ws";
|
||||
DoUpdateLNURLs(cfg, scheme, info.WsConnectUrls ?? []);
|
||||
}
|
||||
else
|
||||
{
|
||||
DoUpdateLNURLs(cfg, "nats-leaf", info.LeafNodeUrls ?? []);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
cfg.ReleaseWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal void DoUpdateLNURLs(LeafNodeCfg cfg, string scheme, string[] urls)
|
||||
{
|
||||
var dynamicUrls = new List<Uri>(urls.Length + cfg.Urls.Count);
|
||||
|
||||
foreach (var url in urls)
|
||||
{
|
||||
if (!Uri.TryCreate($"{scheme}://{url}", UriKind.Absolute, out var parsed))
|
||||
{
|
||||
Errorf("Error parsing url {0}", url);
|
||||
continue;
|
||||
}
|
||||
|
||||
var isDuplicate = false;
|
||||
foreach (var configured in cfg.Urls)
|
||||
{
|
||||
if (string.Equals(parsed.Host, configured.Host, StringComparison.OrdinalIgnoreCase)
|
||||
&& parsed.Port == configured.Port)
|
||||
{
|
||||
isDuplicate = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (isDuplicate)
|
||||
continue;
|
||||
|
||||
dynamicUrls.Add(parsed);
|
||||
cfg.SaveTLSHostname(parsed);
|
||||
}
|
||||
|
||||
dynamicUrls.AddRange(cfg.Urls);
|
||||
cfg.Urls = dynamicUrls;
|
||||
cfg.CurUrl ??= cfg.Urls.FirstOrDefault();
|
||||
}
|
||||
|
||||
internal Exception? ProcessLeafNodeConnect(NatsServer server, byte[] arg, string lang)
|
||||
{
|
||||
if (!string.IsNullOrWhiteSpace(lang))
|
||||
return ServerErrors.ErrClientConnectedToLeafNodePort;
|
||||
|
||||
LeafConnectInfo? protocol;
|
||||
try
|
||||
{
|
||||
protocol = JsonSerializer.Deserialize<LeafConnectInfo>(arg);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
return ex;
|
||||
}
|
||||
|
||||
if (protocol is null)
|
||||
return new InvalidOperationException("invalid leaf connect payload");
|
||||
|
||||
if (!string.IsNullOrWhiteSpace(protocol.Cluster) && protocol.Cluster.Contains(' '))
|
||||
return ServerErrors.ErrClusterNameHasSpaces;
|
||||
|
||||
var cachedClusterName = server.CachedClusterName();
|
||||
if (!string.IsNullOrWhiteSpace(cachedClusterName)
|
||||
&& !string.IsNullOrWhiteSpace(protocol.Cluster)
|
||||
&& string.Equals(cachedClusterName, protocol.Cluster, StringComparison.Ordinal))
|
||||
return ServerErrors.ErrLeafNodeHasSameClusterName;
|
||||
|
||||
lock (_mu)
|
||||
{
|
||||
Leaf ??= new Leaf();
|
||||
Opts.Verbose = false;
|
||||
Opts.Echo = false;
|
||||
Opts.Pedantic = false;
|
||||
Headers = server.SupportsHeaders() && protocol.Headers;
|
||||
|
||||
if (string.IsNullOrWhiteSpace(Leaf.Compression))
|
||||
Leaf.Compression = string.IsNullOrWhiteSpace(protocol.Compression) ? CompressionMode.NotSupported : protocol.Compression;
|
||||
|
||||
Leaf.RemoteServer = protocol.Name;
|
||||
Leaf.RemoteAccName = protocol.RemoteAccount;
|
||||
Leaf.Isolated = Leaf.Isolated || protocol.Isolate;
|
||||
Leaf.IsSpoke = protocol.Hub;
|
||||
Leaf.RemoteCluster = protocol.Cluster;
|
||||
Leaf.RemoteDomain = protocol.Domain;
|
||||
|
||||
if (protocol.DenyPub is { Length: > 0 })
|
||||
MergeDenyPermissions(DenyType.Pub, protocol.DenyPub);
|
||||
}
|
||||
|
||||
server.AddLeafNodeConnection(this, protocol.Name, protocol.Cluster, checkForDup: true);
|
||||
server.SendPermsAndAccountInfo(this);
|
||||
server.InitLeafNodeSmapAndSendSubs(this);
|
||||
server.CheckInternalSyncConsumers(_account as Account);
|
||||
return null;
|
||||
}
|
||||
|
||||
internal void UpdateSmap(Internal.Subscription sub, int delta, bool isLds)
|
||||
{
|
||||
_ = isLds;
|
||||
|
||||
lock (_mu)
|
||||
{
|
||||
if (Leaf?.Smap is null)
|
||||
return;
|
||||
|
||||
var key = LeafNodeHandler.KeyFromSub(sub);
|
||||
Leaf.Smap.TryGetValue(key, out var current);
|
||||
var next = current + delta;
|
||||
|
||||
var isQueue = sub.Queue is { Length: > 0 };
|
||||
var shouldUpdate = isQueue || (current <= 0 && next > 0) || (current > 0 && next <= 0);
|
||||
|
||||
if (next > 0)
|
||||
Leaf.Smap[key] = next;
|
||||
else
|
||||
Leaf.Smap.Remove(key);
|
||||
|
||||
if (shouldUpdate)
|
||||
SendLeafNodeSubUpdate(key, next);
|
||||
}
|
||||
}
|
||||
|
||||
internal void ForceAddToSmap(string subject)
|
||||
{
|
||||
lock (_mu)
|
||||
{
|
||||
if (Leaf?.Smap is null)
|
||||
return;
|
||||
|
||||
if (Leaf.Smap.TryGetValue(subject, out var value) && value != 0)
|
||||
return;
|
||||
|
||||
Leaf.Smap[subject] = 1;
|
||||
SendLeafNodeSubUpdate(subject, 1);
|
||||
}
|
||||
}
|
||||
|
||||
internal void ForceRemoveFromSmap(string subject)
|
||||
{
|
||||
lock (_mu)
|
||||
{
|
||||
if (Leaf?.Smap is null)
|
||||
return;
|
||||
|
||||
if (!Leaf.Smap.TryGetValue(subject, out var value) || value == 0)
|
||||
return;
|
||||
|
||||
value--;
|
||||
if (value <= 0)
|
||||
{
|
||||
Leaf.Smap.Remove(subject);
|
||||
SendLeafNodeSubUpdate(subject, 0);
|
||||
}
|
||||
else
|
||||
{
|
||||
Leaf.Smap[subject] = value;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
internal void SendLeafNodeSubUpdate(string key, int n)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(key))
|
||||
return;
|
||||
|
||||
if (IsSpokeLeafNode())
|
||||
{
|
||||
var subject = key;
|
||||
var separator = key.IndexOf(' ');
|
||||
if (separator > 0)
|
||||
subject = key[..separator];
|
||||
|
||||
var checkPermissions = !subject.StartsWith(LeafNodeHandler.LeafNodeLoopDetectionSubjectPrefix, StringComparison.Ordinal)
|
||||
&& !subject.StartsWith("$GR.", StringComparison.Ordinal)
|
||||
&& !subject.StartsWith("$GNR.", StringComparison.Ordinal);
|
||||
if (checkPermissions && !CanSubscribe(subject))
|
||||
return;
|
||||
}
|
||||
|
||||
var buffer = new StringBuilder();
|
||||
WriteLeafSub(buffer, key, n);
|
||||
EnqueueProto(Encoding.ASCII.GetBytes(buffer.ToString()));
|
||||
}
|
||||
|
||||
internal void WriteLeafSub(StringBuilder writer, string key, int n)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(key))
|
||||
return;
|
||||
|
||||
if (n > 0)
|
||||
{
|
||||
writer.Append("LS+ ").Append(key);
|
||||
if (key.Contains(' '))
|
||||
writer.Append(' ').Append(n);
|
||||
}
|
||||
else
|
||||
{
|
||||
writer.Append("LS- ").Append(key);
|
||||
}
|
||||
|
||||
writer.Append("\r\n");
|
||||
}
|
||||
|
||||
internal Exception? ProcessLeafSub(byte[] protoArg)
|
||||
{
|
||||
_in.Subs++;
|
||||
if (Server is not NatsServer server)
|
||||
return null;
|
||||
|
||||
var args = SplitArg(protoArg);
|
||||
if (args.Count is not 1 and not 3)
|
||||
return new FormatException($"processLeafSub Parse Error: '{Encoding.ASCII.GetString(protoArg)}'");
|
||||
|
||||
var key = Encoding.ASCII.GetString(args[0]);
|
||||
var keyParts = key.Split(' ', StringSplitOptions.RemoveEmptyEntries);
|
||||
if (keyParts.Length == 0)
|
||||
return new FormatException($"processLeafSub Parse Error: '{key}'");
|
||||
|
||||
var subject = Encoding.ASCII.GetBytes(keyParts[0]);
|
||||
byte[]? queue = null;
|
||||
if (keyParts.Length > 1)
|
||||
queue = Encoding.ASCII.GetBytes(keyParts[1]);
|
||||
|
||||
var qw = 1;
|
||||
if (args.Count == 3)
|
||||
{
|
||||
if (!int.TryParse(Encoding.ASCII.GetString(args[2]), out qw) || qw <= 0)
|
||||
qw = 1;
|
||||
}
|
||||
|
||||
if (_account is not Account account)
|
||||
return null;
|
||||
|
||||
lock (_mu)
|
||||
{
|
||||
if (IsClosed())
|
||||
return null;
|
||||
|
||||
var subjectText = Encoding.ASCII.GetString(subject);
|
||||
if (Perms is not null && !CanExport(subjectText))
|
||||
{
|
||||
LeafSubPermViolation(subject);
|
||||
return null;
|
||||
}
|
||||
|
||||
if (SubsAtLimit())
|
||||
{
|
||||
MaxSubsExceeded();
|
||||
return null;
|
||||
}
|
||||
|
||||
if (Subs.ContainsKey(key))
|
||||
return null;
|
||||
|
||||
var sub = new Internal.Subscription
|
||||
{
|
||||
Subject = subject,
|
||||
Queue = queue,
|
||||
Sid = Encoding.ASCII.GetBytes(key),
|
||||
Qw = qw,
|
||||
};
|
||||
Subs[key] = sub;
|
||||
account.Sublist?.Insert(sub);
|
||||
}
|
||||
|
||||
var subValue = Subs[key];
|
||||
var delta = queue is { Length: > 0 } ? qw : 1;
|
||||
if (!IsSpokeLeafNode())
|
||||
{
|
||||
server.UpdateRemoteSubscription(account, subValue, delta);
|
||||
if (server.GetOpts().Gateway.Name.Length > 0)
|
||||
server.UpdateInterestForAccountOnGateway(account.GetName(), subValue, delta);
|
||||
}
|
||||
|
||||
account.UpdateLeafNodes(subValue, delta);
|
||||
|
||||
if (Opts.Verbose)
|
||||
SendOK();
|
||||
return null;
|
||||
}
|
||||
|
||||
internal Exception? HandleLeafNodeLoop(bool detectedLocally)
|
||||
{
|
||||
_ = detectedLocally;
|
||||
var (accountName, delay) = SetLeafConnectDelayIfSoliciting(LeafNodeHandler.LeafNodeReconnectDelayAfterLoopDetected);
|
||||
if (string.IsNullOrWhiteSpace(accountName))
|
||||
return null;
|
||||
|
||||
Warnf("Detected loop in leafnode setup for account {0}, reconnect delayed by {1}", accountName, delay);
|
||||
return null;
|
||||
}
|
||||
|
||||
internal Exception? ProcessLeafUnsub(byte[] arg)
|
||||
{
|
||||
_in.Subs++;
|
||||
|
||||
if (Server is not NatsServer server)
|
||||
return null;
|
||||
|
||||
Internal.Subscription? sub;
|
||||
var key = Encoding.ASCII.GetString(arg);
|
||||
var spoke = false;
|
||||
|
||||
lock (_mu)
|
||||
{
|
||||
if (IsClosed())
|
||||
return null;
|
||||
|
||||
spoke = IsSpokeLeafNode();
|
||||
if (!Subs.TryGetValue(key, out sub))
|
||||
return null;
|
||||
|
||||
Subs.Remove(key);
|
||||
}
|
||||
|
||||
var account = _account as Account;
|
||||
if (account is null || sub is null)
|
||||
return null;
|
||||
|
||||
account.Sublist?.Remove(sub);
|
||||
|
||||
var delta = sub.Queue is { Length: > 0 } ? Math.Max(sub.Qw, 1) : 1;
|
||||
if (!spoke)
|
||||
{
|
||||
server.UpdateRemoteSubscription(account, sub, -delta);
|
||||
if (server.GetOpts().Gateway.Name.Length > 0)
|
||||
server.UpdateInterestForAccountOnGateway(account.GetName(), sub, -delta);
|
||||
}
|
||||
|
||||
account.UpdateLeafNodes(sub, -delta);
|
||||
return null;
|
||||
}
|
||||
|
||||
internal Exception? ProcessLeafHeaderMsgArgs(byte[] arg) =>
|
||||
ProtocolParser.ProcessLeafHeaderMsgArgs(ParseCtx, arg);
|
||||
|
||||
internal Exception? ProcessLeafMsgArgs(byte[] arg) =>
|
||||
ProtocolParser.ProcessLeafMsgArgs(ParseCtx, arg);
|
||||
|
||||
internal void ProcessInboundLeafMsg(byte[] msg)
|
||||
{
|
||||
ProcessInboundRoutedMsg(msg);
|
||||
}
|
||||
|
||||
internal void LeafSubPermViolation(byte[] subject) => LeafPermViolation(pub: false, subject);
|
||||
|
||||
internal void LeafPermViolation(bool pub, byte[] subject)
|
||||
{
|
||||
if (IsSpokeLeafNode())
|
||||
return;
|
||||
|
||||
SetLeafConnectDelayIfSoliciting(LeafNodeHandler.LeafNodeReconnectAfterPermViolation);
|
||||
var subjectText = Encoding.ASCII.GetString(subject);
|
||||
if (pub)
|
||||
{
|
||||
SendErr($"Permissions Violation for Publish to '{subjectText}'");
|
||||
Errorf("Publish Violation on '{0}' - Check other side configuration", subjectText);
|
||||
}
|
||||
else
|
||||
{
|
||||
SendErr($"Permissions Violation for Subscription to '{subjectText}'");
|
||||
Errorf("Subscription Violation on '{0}' - Check other side configuration", subjectText);
|
||||
}
|
||||
|
||||
CloseConnection(ClosedState.ProtocolViolation);
|
||||
}
|
||||
|
||||
internal void LeafProcessErr(string errorText)
|
||||
{
|
||||
if (errorText.Contains(ServerErrors.ErrLeafNodeHasSameClusterName.Message, StringComparison.Ordinal))
|
||||
{
|
||||
var (_, delay) = SetLeafConnectDelayIfSoliciting(LeafNodeHandler.LeafNodeReconnectDelayAfterClusterNameSame);
|
||||
Errorf("Leafnode connection dropped with same cluster name error. Delaying reconnect for {0}", delay);
|
||||
return;
|
||||
}
|
||||
|
||||
if (errorText.Contains("Loop detected", StringComparison.OrdinalIgnoreCase))
|
||||
_ = HandleLeafNodeLoop(detectedLocally: false);
|
||||
}
|
||||
|
||||
internal (string AccountName, TimeSpan Delay) SetLeafConnectDelayIfSoliciting(TimeSpan delay)
|
||||
{
|
||||
lock (_mu)
|
||||
{
|
||||
if (IsSolicitedLeafNode())
|
||||
Leaf?.Remote?.SetConnectDelay(delay);
|
||||
|
||||
return (_account?.Name ?? string.Empty, delay);
|
||||
}
|
||||
}
|
||||
|
||||
internal (bool TlsRequired, SslServerAuthenticationOptions? TlsConfig, string TlsName, double TlsTimeout) LeafNodeGetTLSConfigForSolicit(LeafNodeCfg remote)
|
||||
{
|
||||
remote.AcquireReadLock();
|
||||
try
|
||||
{
|
||||
var opts = remote.RemoteOpts;
|
||||
var tlsRequired = opts?.Tls == true || opts?.TlsConfig is not null;
|
||||
return (tlsRequired, opts?.TlsConfig, remote.TlsName, opts?.TlsTimeout ?? ServerConstants.TlsTimeout.TotalSeconds);
|
||||
}
|
||||
finally
|
||||
{
|
||||
remote.ReleaseReadLock();
|
||||
}
|
||||
}
|
||||
|
||||
internal (byte[]? PreBuffer, ClosedState CloseReason, Exception? Error) LeafNodeSolicitWSConnection(ServerOptions options, Uri remoteUrl, LeafNodeCfg remote)
|
||||
{
|
||||
_ = options;
|
||||
_ = remote;
|
||||
|
||||
if (!string.Equals(remoteUrl.Scheme, "ws", StringComparison.OrdinalIgnoreCase)
|
||||
&& !string.Equals(remoteUrl.Scheme, "wss", StringComparison.OrdinalIgnoreCase))
|
||||
return (null, ClosedState.ProtocolViolation, new InvalidOperationException("URL is not websocket based"));
|
||||
|
||||
return (null, ClosedState.ClientClosed, null);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user