feat(batch25): implement gateway handshake and gossip

This commit is contained in:
Joseph Doherty
2026-03-01 01:50:40 -05:00
parent a83339fe71
commit 1763304e28
4 changed files with 282 additions and 0 deletions

View File

@@ -0,0 +1,102 @@
// Copyright 2018-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0
using System.Text;
using System.Text.Json;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class ClientConnection
{
internal Exception? SendGatewayConnect()
{
if (Server is not NatsServer server)
return new InvalidOperationException("gateway server unavailable");
var opts = server.GetOpts();
var connect = new ConnectInfo
{
Echo = true,
Verbose = false,
Pedantic = false,
User = opts.Gateway.Username,
Pass = opts.Gateway.Password,
Tls = opts.Gateway.TlsConfig != null,
Name = server.ID(),
Headers = server.SupportsHeaders(),
Gateway = opts.Gateway.Name,
Dynamic = server.IsClusterNameDynamic(),
};
var payload = JsonSerializer.Serialize(connect);
EnqueueProto(Encoding.ASCII.GetBytes($"CONNECT {payload}\r\n"));
return null;
}
internal Exception? ProcessGatewayConnect(byte[] arg)
{
ConnectInfo? connect;
try
{
connect = JsonSerializer.Deserialize<ConnectInfo>(arg);
}
catch (Exception ex)
{
return ex;
}
if (connect == null)
return new InvalidOperationException("invalid gateway connect payload");
Gateway ??= new Gateway();
Gateway.Connected = true;
if (!string.IsNullOrWhiteSpace(connect.Gateway))
Gateway.Name = connect.Gateway;
if (!string.IsNullOrWhiteSpace(connect.Name))
Gateway.RemoteName = connect.Name;
Headers = connect.Headers;
return null;
}
internal Exception? ProcessGatewayInfo(byte[] arg)
{
ServerInfo? info;
try
{
info = JsonSerializer.Deserialize<ServerInfo>(arg);
}
catch (Exception ex)
{
return ex;
}
if (info == null)
return new InvalidOperationException("invalid gateway info payload");
return ProcessGatewayInfo(info);
}
internal Exception? ProcessGatewayInfo(ServerInfo info)
{
if (Server is not NatsServer server)
return new InvalidOperationException("gateway server unavailable");
Gateway ??= new Gateway();
if (!string.IsNullOrWhiteSpace(info.Gateway))
Gateway.Name = info.Gateway;
if (!string.IsNullOrWhiteSpace(info.GatewayUrl) && Uri.TryCreate(info.GatewayUrl, UriKind.Absolute, out var url))
Gateway.ConnectUrl = url;
Gateway.UseOldPrefix = info.GatewayNrp;
Gateway.InterestOnlyMode = info.GatewayIom;
if (info.GatewayUrls is { Length: > 0 })
server.ProcessImplicitGateway(Gateway.Name, info.GatewayUrls);
return null;
}
}

View File

@@ -3,6 +3,7 @@
using System.Security.Cryptography;
using System.Text;
using ZB.MOM.NatsNet.Server.Internal;
namespace ZB.MOM.NatsNet.Server;
@@ -66,6 +67,16 @@ internal static class GatewayHandler
internal static byte[] GetOldHash(string value) => GetHash(value, 4);
internal static byte[] GwBuildSubProto(string accountName, Subscription subscription, bool isUnsub = false)
{
var op = isUnsub ? "RS-" : "RS+";
var subject = Encoding.ASCII.GetString(subscription.Subject);
if (subscription.Queue is { Length: > 0 } queue)
return Encoding.ASCII.GetBytes($"{op} {accountName} {subject} {Encoding.ASCII.GetString(queue)}\r\n");
return Encoding.ASCII.GetBytes($"{op} {accountName} {subject}\r\n");
}
private static byte[] GetHash(string value, int len)
{
var bytes = Encoding.UTF8.GetBytes(value);

View File

@@ -0,0 +1,169 @@
// Copyright 2018-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0
using System.Text;
using System.Linq;
using ZB.MOM.NatsNet.Server.Internal;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class NatsServer
{
internal void GossipGatewaysToInboundGateway(ClientConnection inboundGateway)
{
if (_gateway.Info == null)
return;
var info = _gateway.Info.ShallowClone();
info.GatewayCmd = 1;
info.GatewayUrls = [.. _gateway.Urls.GetAsStringSlice()];
inboundGateway.EnqueueProto(GenerateInfoJson(info));
}
internal void ForwardNewGatewayToLocalCluster(string gatewayName, IReadOnlyCollection<string> gatewayUrls)
{
if (gatewayUrls.Count == 0)
return;
var info = _routeInfo.ShallowClone();
info.Gateway = gatewayName;
info.GatewayUrls = [.. gatewayUrls];
var proto = GenerateInfoJson(info);
_mu.EnterReadLock();
try
{
ForEachRoute(route => route.EnqueueProto(proto));
foreach (var inbound in _gateway.In.Values)
inbound.EnqueueProto(proto);
}
finally
{
_mu.ExitReadLock();
}
}
internal void SendQueueSubsToGateway(ClientConnection gateway, string accountName)
{
SendAccountSubsToGateway(gateway, accountName, sendQueueOnly: true);
}
internal void SendAccountSubsToGateway(ClientConnection gateway, string accountName, bool sendQueueOnly = false)
{
var (account, _) = LookupAccount(accountName);
if (account == null)
return;
SendSubsToGateway(gateway, account, sendQueueOnly);
}
internal static byte[] GwBuildSubProto(string accountName, Subscription subscription, bool isUnsub = false)
{
var op = isUnsub ? "RS-" : "RS+";
var subject = Encoding.ASCII.GetString(subscription.Subject);
if (subscription.Queue is { Length: > 0 } queue)
return Encoding.ASCII.GetBytes($"{op} {accountName} {subject} {Encoding.ASCII.GetString(queue)}\r\n");
return Encoding.ASCII.GetBytes($"{op} {accountName} {subject}\r\n");
}
internal void SendSubsToGateway(ClientConnection gateway, Account account, bool sendQueueOnly)
{
if (account.Sublist == null)
return;
var subs = new List<Subscription>();
account.Sublist.All(subs);
foreach (var sub in subs)
{
if (sendQueueOnly && sub.Queue is not { Length: > 0 })
continue;
gateway.EnqueueProto(GwBuildSubProto(account.Name, sub));
}
}
internal void ProcessGatewayInfoFromRoute(string gatewayName, IReadOnlyCollection<string> gatewayUrls)
{
ProcessImplicitGateway(gatewayName, gatewayUrls);
}
internal void SendGatewayConfigsToRoute(ClientConnection route)
{
_gateway.AcquireReadLock();
try
{
foreach (var (name, cfg) in _gateway.Remotes)
{
var info = _routeInfo.ShallowClone();
info.Gateway = name;
info.GatewayUrls = cfg.GetUrlsAsStrings();
route.EnqueueProto(GenerateInfoJson(info));
}
}
finally
{
_gateway.ReleaseReadLock();
}
}
internal void ProcessImplicitGateway(string gatewayName, IReadOnlyCollection<string> gatewayUrls)
{
if (string.IsNullOrWhiteSpace(gatewayName) || gatewayUrls.Count == 0)
return;
_gateway.AcquireWriteLock();
try
{
if (!_gateway.Remotes.TryGetValue(gatewayName, out var cfg))
{
cfg = new GatewayCfg
{
RemoteOpts = new RemoteGatewayOpts { Name = gatewayName },
Hash = GatewayHandler.GetGWHash(gatewayName),
OldHash = GatewayHandler.GetOldHash(gatewayName),
Implicit = true,
Urls = new Dictionary<string, Uri>(StringComparer.Ordinal),
};
_gateway.Remotes[gatewayName] = cfg;
}
var urls = gatewayUrls
.Select(url => Uri.TryCreate(url, UriKind.Absolute, out var uri) ? uri : null)
.Where(uri => uri != null)
.Cast<Uri>()
.ToList();
cfg.AddUrls(urls);
cfg.VarzUpdateUrls = true;
}
finally
{
_gateway.ReleaseWriteLock();
}
}
public int NumOutboundGateways() => NumOutboundGatewaysInternal();
internal int NumOutboundGatewaysInternal()
{
_gateway.AcquireReadLock();
try { return _gateway.Out.Count; }
finally { _gateway.ReleaseReadLock(); }
}
internal int NumInboundGateways()
{
_gateway.AcquireReadLock();
try { return _gateway.In.Count; }
finally { _gateway.ReleaseReadLock(); }
}
internal GatewayCfg? GetRemoteGateway(string gatewayName)
{
_gateway.AcquireReadLock();
try { return _gateway.Remotes.GetValueOrDefault(gatewayName); }
finally { _gateway.ReleaseReadLock(); }
}
}

Binary file not shown.