diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Gateways.Protocol.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Gateways.Protocol.cs new file mode 100644 index 0000000..895874a --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Gateways.Protocol.cs @@ -0,0 +1,102 @@ +// Copyright 2018-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using System.Text; +using System.Text.Json; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class ClientConnection +{ + internal Exception? SendGatewayConnect() + { + if (Server is not NatsServer server) + return new InvalidOperationException("gateway server unavailable"); + + var opts = server.GetOpts(); + var connect = new ConnectInfo + { + Echo = true, + Verbose = false, + Pedantic = false, + User = opts.Gateway.Username, + Pass = opts.Gateway.Password, + Tls = opts.Gateway.TlsConfig != null, + Name = server.ID(), + Headers = server.SupportsHeaders(), + Gateway = opts.Gateway.Name, + Dynamic = server.IsClusterNameDynamic(), + }; + + var payload = JsonSerializer.Serialize(connect); + EnqueueProto(Encoding.ASCII.GetBytes($"CONNECT {payload}\r\n")); + return null; + } + + internal Exception? ProcessGatewayConnect(byte[] arg) + { + ConnectInfo? connect; + try + { + connect = JsonSerializer.Deserialize(arg); + } + catch (Exception ex) + { + return ex; + } + + if (connect == null) + return new InvalidOperationException("invalid gateway connect payload"); + + Gateway ??= new Gateway(); + Gateway.Connected = true; + + if (!string.IsNullOrWhiteSpace(connect.Gateway)) + Gateway.Name = connect.Gateway; + if (!string.IsNullOrWhiteSpace(connect.Name)) + Gateway.RemoteName = connect.Name; + + Headers = connect.Headers; + return null; + } + + internal Exception? ProcessGatewayInfo(byte[] arg) + { + ServerInfo? info; + try + { + info = JsonSerializer.Deserialize(arg); + } + catch (Exception ex) + { + return ex; + } + + if (info == null) + return new InvalidOperationException("invalid gateway info payload"); + + return ProcessGatewayInfo(info); + } + + internal Exception? ProcessGatewayInfo(ServerInfo info) + { + if (Server is not NatsServer server) + return new InvalidOperationException("gateway server unavailable"); + + Gateway ??= new Gateway(); + + if (!string.IsNullOrWhiteSpace(info.Gateway)) + Gateway.Name = info.Gateway; + + if (!string.IsNullOrWhiteSpace(info.GatewayUrl) && Uri.TryCreate(info.GatewayUrl, UriKind.Absolute, out var url)) + Gateway.ConnectUrl = url; + + Gateway.UseOldPrefix = info.GatewayNrp; + Gateway.InterestOnlyMode = info.GatewayIom; + + if (info.GatewayUrls is { Length: > 0 }) + server.ProcessImplicitGateway(Gateway.Name, info.GatewayUrls); + + return null; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayHandler.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayHandler.cs index b51ded3..8b1df33 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayHandler.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayHandler.cs @@ -3,6 +3,7 @@ using System.Security.Cryptography; using System.Text; +using ZB.MOM.NatsNet.Server.Internal; namespace ZB.MOM.NatsNet.Server; @@ -66,6 +67,16 @@ internal static class GatewayHandler internal static byte[] GetOldHash(string value) => GetHash(value, 4); + internal static byte[] GwBuildSubProto(string accountName, Subscription subscription, bool isUnsub = false) + { + var op = isUnsub ? "RS-" : "RS+"; + var subject = Encoding.ASCII.GetString(subscription.Subject); + if (subscription.Queue is { Length: > 0 } queue) + return Encoding.ASCII.GetBytes($"{op} {accountName} {subject} {Encoding.ASCII.GetString(queue)}\r\n"); + + return Encoding.ASCII.GetBytes($"{op} {accountName} {subject}\r\n"); + } + private static byte[] GetHash(string value, int len) { var bytes = Encoding.UTF8.GetBytes(value); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.ConnectionsAndGossip.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.ConnectionsAndGossip.cs new file mode 100644 index 0000000..cd5d823 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.ConnectionsAndGossip.cs @@ -0,0 +1,169 @@ +// Copyright 2018-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using System.Text; +using System.Linq; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + internal void GossipGatewaysToInboundGateway(ClientConnection inboundGateway) + { + if (_gateway.Info == null) + return; + + var info = _gateway.Info.ShallowClone(); + info.GatewayCmd = 1; + info.GatewayUrls = [.. _gateway.Urls.GetAsStringSlice()]; + inboundGateway.EnqueueProto(GenerateInfoJson(info)); + } + + internal void ForwardNewGatewayToLocalCluster(string gatewayName, IReadOnlyCollection gatewayUrls) + { + if (gatewayUrls.Count == 0) + return; + + var info = _routeInfo.ShallowClone(); + info.Gateway = gatewayName; + info.GatewayUrls = [.. gatewayUrls]; + + var proto = GenerateInfoJson(info); + + _mu.EnterReadLock(); + try + { + ForEachRoute(route => route.EnqueueProto(proto)); + foreach (var inbound in _gateway.In.Values) + inbound.EnqueueProto(proto); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SendQueueSubsToGateway(ClientConnection gateway, string accountName) + { + SendAccountSubsToGateway(gateway, accountName, sendQueueOnly: true); + } + + internal void SendAccountSubsToGateway(ClientConnection gateway, string accountName, bool sendQueueOnly = false) + { + var (account, _) = LookupAccount(accountName); + if (account == null) + return; + + SendSubsToGateway(gateway, account, sendQueueOnly); + } + + internal static byte[] GwBuildSubProto(string accountName, Subscription subscription, bool isUnsub = false) + { + var op = isUnsub ? "RS-" : "RS+"; + var subject = Encoding.ASCII.GetString(subscription.Subject); + if (subscription.Queue is { Length: > 0 } queue) + return Encoding.ASCII.GetBytes($"{op} {accountName} {subject} {Encoding.ASCII.GetString(queue)}\r\n"); + + return Encoding.ASCII.GetBytes($"{op} {accountName} {subject}\r\n"); + } + + internal void SendSubsToGateway(ClientConnection gateway, Account account, bool sendQueueOnly) + { + if (account.Sublist == null) + return; + + var subs = new List(); + account.Sublist.All(subs); + foreach (var sub in subs) + { + if (sendQueueOnly && sub.Queue is not { Length: > 0 }) + continue; + + gateway.EnqueueProto(GwBuildSubProto(account.Name, sub)); + } + } + + internal void ProcessGatewayInfoFromRoute(string gatewayName, IReadOnlyCollection gatewayUrls) + { + ProcessImplicitGateway(gatewayName, gatewayUrls); + } + + internal void SendGatewayConfigsToRoute(ClientConnection route) + { + _gateway.AcquireReadLock(); + try + { + foreach (var (name, cfg) in _gateway.Remotes) + { + var info = _routeInfo.ShallowClone(); + info.Gateway = name; + info.GatewayUrls = cfg.GetUrlsAsStrings(); + route.EnqueueProto(GenerateInfoJson(info)); + } + } + finally + { + _gateway.ReleaseReadLock(); + } + } + + internal void ProcessImplicitGateway(string gatewayName, IReadOnlyCollection gatewayUrls) + { + if (string.IsNullOrWhiteSpace(gatewayName) || gatewayUrls.Count == 0) + return; + + _gateway.AcquireWriteLock(); + try + { + if (!_gateway.Remotes.TryGetValue(gatewayName, out var cfg)) + { + cfg = new GatewayCfg + { + RemoteOpts = new RemoteGatewayOpts { Name = gatewayName }, + Hash = GatewayHandler.GetGWHash(gatewayName), + OldHash = GatewayHandler.GetOldHash(gatewayName), + Implicit = true, + Urls = new Dictionary(StringComparer.Ordinal), + }; + _gateway.Remotes[gatewayName] = cfg; + } + + var urls = gatewayUrls + .Select(url => Uri.TryCreate(url, UriKind.Absolute, out var uri) ? uri : null) + .Where(uri => uri != null) + .Cast() + .ToList(); + + cfg.AddUrls(urls); + cfg.VarzUpdateUrls = true; + } + finally + { + _gateway.ReleaseWriteLock(); + } + } + + public int NumOutboundGateways() => NumOutboundGatewaysInternal(); + + internal int NumOutboundGatewaysInternal() + { + _gateway.AcquireReadLock(); + try { return _gateway.Out.Count; } + finally { _gateway.ReleaseReadLock(); } + } + + internal int NumInboundGateways() + { + _gateway.AcquireReadLock(); + try { return _gateway.In.Count; } + finally { _gateway.ReleaseReadLock(); } + } + + internal GatewayCfg? GetRemoteGateway(string gatewayName) + { + _gateway.AcquireReadLock(); + try { return _gateway.Remotes.GetValueOrDefault(gatewayName); } + finally { _gateway.ReleaseReadLock(); } + } +} diff --git a/porting.db b/porting.db index 8d456e9..5bf2033 100644 Binary files a/porting.db and b/porting.db differ