From 1763304e28169794613437ad9f9ccf6e54381fde Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 1 Mar 2026 01:50:40 -0500 Subject: [PATCH] feat(batch25): implement gateway handshake and gossip --- .../ClientConnection.Gateways.Protocol.cs | 102 +++++++++++ .../Gateway/GatewayHandler.cs | 11 ++ ...atsServer.Gateways.ConnectionsAndGossip.cs | 169 ++++++++++++++++++ porting.db | Bin 6795264 -> 6799360 bytes 4 files changed, 282 insertions(+) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Gateways.Protocol.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.ConnectionsAndGossip.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Gateways.Protocol.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Gateways.Protocol.cs new file mode 100644 index 0000000..895874a --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Gateways.Protocol.cs @@ -0,0 +1,102 @@ +// Copyright 2018-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using System.Text; +using System.Text.Json; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class ClientConnection +{ + internal Exception? SendGatewayConnect() + { + if (Server is not NatsServer server) + return new InvalidOperationException("gateway server unavailable"); + + var opts = server.GetOpts(); + var connect = new ConnectInfo + { + Echo = true, + Verbose = false, + Pedantic = false, + User = opts.Gateway.Username, + Pass = opts.Gateway.Password, + Tls = opts.Gateway.TlsConfig != null, + Name = server.ID(), + Headers = server.SupportsHeaders(), + Gateway = opts.Gateway.Name, + Dynamic = server.IsClusterNameDynamic(), + }; + + var payload = JsonSerializer.Serialize(connect); + EnqueueProto(Encoding.ASCII.GetBytes($"CONNECT {payload}\r\n")); + return null; + } + + internal Exception? ProcessGatewayConnect(byte[] arg) + { + ConnectInfo? connect; + try + { + connect = JsonSerializer.Deserialize(arg); + } + catch (Exception ex) + { + return ex; + } + + if (connect == null) + return new InvalidOperationException("invalid gateway connect payload"); + + Gateway ??= new Gateway(); + Gateway.Connected = true; + + if (!string.IsNullOrWhiteSpace(connect.Gateway)) + Gateway.Name = connect.Gateway; + if (!string.IsNullOrWhiteSpace(connect.Name)) + Gateway.RemoteName = connect.Name; + + Headers = connect.Headers; + return null; + } + + internal Exception? ProcessGatewayInfo(byte[] arg) + { + ServerInfo? info; + try + { + info = JsonSerializer.Deserialize(arg); + } + catch (Exception ex) + { + return ex; + } + + if (info == null) + return new InvalidOperationException("invalid gateway info payload"); + + return ProcessGatewayInfo(info); + } + + internal Exception? ProcessGatewayInfo(ServerInfo info) + { + if (Server is not NatsServer server) + return new InvalidOperationException("gateway server unavailable"); + + Gateway ??= new Gateway(); + + if (!string.IsNullOrWhiteSpace(info.Gateway)) + Gateway.Name = info.Gateway; + + if (!string.IsNullOrWhiteSpace(info.GatewayUrl) && Uri.TryCreate(info.GatewayUrl, UriKind.Absolute, out var url)) + Gateway.ConnectUrl = url; + + Gateway.UseOldPrefix = info.GatewayNrp; + Gateway.InterestOnlyMode = info.GatewayIom; + + if (info.GatewayUrls is { Length: > 0 }) + server.ProcessImplicitGateway(Gateway.Name, info.GatewayUrls); + + return null; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayHandler.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayHandler.cs index b51ded3..8b1df33 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayHandler.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayHandler.cs @@ -3,6 +3,7 @@ using System.Security.Cryptography; using System.Text; +using ZB.MOM.NatsNet.Server.Internal; namespace ZB.MOM.NatsNet.Server; @@ -66,6 +67,16 @@ internal static class GatewayHandler internal static byte[] GetOldHash(string value) => GetHash(value, 4); + internal static byte[] GwBuildSubProto(string accountName, Subscription subscription, bool isUnsub = false) + { + var op = isUnsub ? "RS-" : "RS+"; + var subject = Encoding.ASCII.GetString(subscription.Subject); + if (subscription.Queue is { Length: > 0 } queue) + return Encoding.ASCII.GetBytes($"{op} {accountName} {subject} {Encoding.ASCII.GetString(queue)}\r\n"); + + return Encoding.ASCII.GetBytes($"{op} {accountName} {subject}\r\n"); + } + private static byte[] GetHash(string value, int len) { var bytes = Encoding.UTF8.GetBytes(value); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.ConnectionsAndGossip.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.ConnectionsAndGossip.cs new file mode 100644 index 0000000..cd5d823 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.ConnectionsAndGossip.cs @@ -0,0 +1,169 @@ +// Copyright 2018-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using System.Text; +using System.Linq; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + internal void GossipGatewaysToInboundGateway(ClientConnection inboundGateway) + { + if (_gateway.Info == null) + return; + + var info = _gateway.Info.ShallowClone(); + info.GatewayCmd = 1; + info.GatewayUrls = [.. _gateway.Urls.GetAsStringSlice()]; + inboundGateway.EnqueueProto(GenerateInfoJson(info)); + } + + internal void ForwardNewGatewayToLocalCluster(string gatewayName, IReadOnlyCollection gatewayUrls) + { + if (gatewayUrls.Count == 0) + return; + + var info = _routeInfo.ShallowClone(); + info.Gateway = gatewayName; + info.GatewayUrls = [.. gatewayUrls]; + + var proto = GenerateInfoJson(info); + + _mu.EnterReadLock(); + try + { + ForEachRoute(route => route.EnqueueProto(proto)); + foreach (var inbound in _gateway.In.Values) + inbound.EnqueueProto(proto); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SendQueueSubsToGateway(ClientConnection gateway, string accountName) + { + SendAccountSubsToGateway(gateway, accountName, sendQueueOnly: true); + } + + internal void SendAccountSubsToGateway(ClientConnection gateway, string accountName, bool sendQueueOnly = false) + { + var (account, _) = LookupAccount(accountName); + if (account == null) + return; + + SendSubsToGateway(gateway, account, sendQueueOnly); + } + + internal static byte[] GwBuildSubProto(string accountName, Subscription subscription, bool isUnsub = false) + { + var op = isUnsub ? "RS-" : "RS+"; + var subject = Encoding.ASCII.GetString(subscription.Subject); + if (subscription.Queue is { Length: > 0 } queue) + return Encoding.ASCII.GetBytes($"{op} {accountName} {subject} {Encoding.ASCII.GetString(queue)}\r\n"); + + return Encoding.ASCII.GetBytes($"{op} {accountName} {subject}\r\n"); + } + + internal void SendSubsToGateway(ClientConnection gateway, Account account, bool sendQueueOnly) + { + if (account.Sublist == null) + return; + + var subs = new List(); + account.Sublist.All(subs); + foreach (var sub in subs) + { + if (sendQueueOnly && sub.Queue is not { Length: > 0 }) + continue; + + gateway.EnqueueProto(GwBuildSubProto(account.Name, sub)); + } + } + + internal void ProcessGatewayInfoFromRoute(string gatewayName, IReadOnlyCollection gatewayUrls) + { + ProcessImplicitGateway(gatewayName, gatewayUrls); + } + + internal void SendGatewayConfigsToRoute(ClientConnection route) + { + _gateway.AcquireReadLock(); + try + { + foreach (var (name, cfg) in _gateway.Remotes) + { + var info = _routeInfo.ShallowClone(); + info.Gateway = name; + info.GatewayUrls = cfg.GetUrlsAsStrings(); + route.EnqueueProto(GenerateInfoJson(info)); + } + } + finally + { + _gateway.ReleaseReadLock(); + } + } + + internal void ProcessImplicitGateway(string gatewayName, IReadOnlyCollection gatewayUrls) + { + if (string.IsNullOrWhiteSpace(gatewayName) || gatewayUrls.Count == 0) + return; + + _gateway.AcquireWriteLock(); + try + { + if (!_gateway.Remotes.TryGetValue(gatewayName, out var cfg)) + { + cfg = new GatewayCfg + { + RemoteOpts = new RemoteGatewayOpts { Name = gatewayName }, + Hash = GatewayHandler.GetGWHash(gatewayName), + OldHash = GatewayHandler.GetOldHash(gatewayName), + Implicit = true, + Urls = new Dictionary(StringComparer.Ordinal), + }; + _gateway.Remotes[gatewayName] = cfg; + } + + var urls = gatewayUrls + .Select(url => Uri.TryCreate(url, UriKind.Absolute, out var uri) ? uri : null) + .Where(uri => uri != null) + .Cast() + .ToList(); + + cfg.AddUrls(urls); + cfg.VarzUpdateUrls = true; + } + finally + { + _gateway.ReleaseWriteLock(); + } + } + + public int NumOutboundGateways() => NumOutboundGatewaysInternal(); + + internal int NumOutboundGatewaysInternal() + { + _gateway.AcquireReadLock(); + try { return _gateway.Out.Count; } + finally { _gateway.ReleaseReadLock(); } + } + + internal int NumInboundGateways() + { + _gateway.AcquireReadLock(); + try { return _gateway.In.Count; } + finally { _gateway.ReleaseReadLock(); } + } + + internal GatewayCfg? GetRemoteGateway(string gatewayName) + { + _gateway.AcquireReadLock(); + try { return _gateway.Remotes.GetValueOrDefault(gatewayName); } + finally { _gateway.ReleaseReadLock(); } + } +} diff --git a/porting.db b/porting.db index 8d456e96be048dee80f1179cb6ea00adbc661063..5bf203332e6b143ced4e227bf2974b4eb30e785c 100644 GIT binary patch delta 4713 zcma)=du&_v702(>zCZi=9Vel-Q#)ziPuqE?Bn_+-#-QmsC~a7&6FUahJQ^os5Sqm8 z6_3E$MjQQ2V!2ZVt2B+#vQh5XBsR4EQ8pH}s#I-a3Slam_MmN=t>e*_9bYFm*SdGE z|M*0|uaCdych0$2zn_)qMUkGK{t#yWI*=M1B zRR8ec$llmkjCE!xM~1RzC|icIW++RB;-!X*q)1Z)fzSSpoS(`FnlqFsL!k`CWhi5Y zGGr(=Lopf3ZSko4&C3Y7q}IQa@8>gz49x_-@=+uo|IiV4;!SqSekw|tNW6!ntjZrw z8X~DhmNJz|Jx1yRHAN8Qd^9!?8y$^BSvRQPDk=}u&lTkY^{S$rppGia0qQwL*+D&} zC>yA~in4;*ttbmIMUXv;pj+ZQ@(Ma7hU67= zNkrw91y;M{6*Ncm$SY`!xJzC^Q^ZDCHTAL35fPFomd6nvjRbt`CaB3bcTDzxZ6<1y6BrUM=s-Y9b_jSx`ZCbfe7ovCx3&C_|QL* z0aKSzDZVmKacpB-dpL|6FQdJx@0`AjYUHq84j*vL6gvs>-lVS{cZcjQ>A)iTmj&Or zjLZd=kNMk8N%Uv#x^WA8nl7Wlpxy2p1y`E)bUf- zkdI8qPrl7D_{M)xojTt7DY|bZF8hD+8LCyq@h6|5DV2R;(qq7LpCiBeJHP)N-K|W| zhC9DN<#ISZy@RbYrquKfersiVN4`L9D_dTSvzMqcpOQ=#65ln(v;023d6>NfCa0pJpCFBWiAzvsE3WeK*b%N(j zFE`>ubJp_~kw0#Ji2JVLFdU&Tq~4!>(!1kcIF95AU;VbXMU`4=e!^SD;90>_gnRx- z<>SNq;R(Dv;q}R1T512O85Y{#ER5IVq{HIEH{PR+*nb`wrKJh)4gz*3yUge2yk{J_v2$1QU0^blU~3F z6ani2KM(+lff8T?Pzsa*03HAy z1a<%q0XumZ_#+p*_@o@x@KBKhE+vP(kxlUZ4&R)8fJ;54q@_4KUPd^!S6l{r%#h!>f znW*a-9UdF*AKsfv-WrsXH{L3Fkxp{_?vAJjFTNCXX4xRzoLluioedbg^t--n)dzE` zo)Nt$HTVhjWZXC~5rN!d+)jssZE}&}>{DPwk2VkfXeL;@nkJ_w6%f~@dkw4Wai!}S zc{yHCsQ(%?hjV#Uw0K*(NO$+@OO{|&&AOr4)NR}s8S8&E)Z~wk4v&w7@V+-=g+6$f zTG~U+Nm?w_=2&YKQplHlmK1|xft+Ehx(XL2e>@Q8GWPVY>^UgrYYlkz`A8jZcy+*# zX<99GP;_fe4xL@r40&3EUB|FimP>2E`+g7&BmLP8icYP;nso<<)_~8|MgIQ|cCCTe z*eaAXaOR3>=*5^&NeYa^fs!MpI6wZ$PZlq))JpTb@Xb>0|R2mq$RX)X9z` zmv+`>E4{HLmy07HR_G+RVX|tVCR_I0ZVta#p3bhivJ)&$PEsCa^7@x{TTWMjUo6wf zZ^40Q;&Gu3 z9RA6PxaAfh0|pffqC zl{w1WG$U#KS2{zEaBL`Z&kd$DZ<`l z6uHU*LoagMHFk=HwPD5kGrVnh)gLA}%}UKFWj zw>LFOiFHYSv8sAybxloml@c?o`RMR&Z@#FwsB5=()KtYWY-%6Y_^@Uh)~qzU$(ybR z9b$Ub&|^FFO6{v5`5w|IMjAxgYor0BCL{fWwB1N|kQ$BDkF>!^eMoDJ^fywKk$RDq z8tE^jQX};sl^CfTX&Ta`klclsYb@SEN*zjygp_0?5h>0{49RXJp)D-R6}H2URxDXI z>&^}_%&1{fLQrom^Ng@>X}uzX1TrXK0V~+_)+@YR_IvW&Yg`{WzjYk9pWu6JQ!IVT zX_+KVT+22Jlsv$S9F4xM!5D$fYw%~&tXyxb?z_fsx~M9`U!;SM(S8ct{NMprJRxFg!nL;*!Joovo z=zi_@`NZf}3%mF>W34F;-?p{;zgtZ2=I{K!-qpjsr1bFQ=tX%wd?$rgbH8yBhG`V3 zm0bF`+q_Vu&<&ipB5$;hTHa5x5%qHXTWck^eyo?PjJg+EQmEx~n@r9=o<$XPF1J3n zkI!dxw#}bRGaDoe74@oC`l_F2M(;P+&+BOCA7lK~QI{A;`|n^!<3U`$!{<=V#{oaJ ze91ku;4V*zZrFI2FJZe6KQtMp*b~|=GHu#SMzBxTC5hQJ*35JBeVo87~)h0&43nTqh|En59^(RDy>dtd3RcufL#(U`P4eF=Si#y`1GQF@-O)&=c z#3*woeF+Pg3%CwWC+3-7z+=>I7oq1NP~39fbozCS&$7mFadI5 zBILm&$cM>L08?Np6v8x^4l|$#X2L8ehDTsFJPIW+2j;?K@Hota`LF<