From 59fa600b3cdb4b1a828ce1aacfb856c6f0efa932 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 1 Mar 2026 02:00:08 -0500 Subject: [PATCH] feat(batch25): implement gateway interest and outbound send logic --- .../ClientConnection.Gateways.Messages.cs | 38 ++++++ .../ClientConnection.Gateways.Protocol.cs | 115 ++++++++++++++++++ .../Gateway/GatewayHandler.cs | 22 ++++ .../Gateway/GatewayTypes.cs | 19 +++ .../NatsServer.Gateways.Interest.cs | 77 ++++++++++++ porting.db | Bin 6803456 -> 6807552 bytes 6 files changed, 271 insertions(+) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Gateways.Messages.cs create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.Interest.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Gateways.Messages.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Gateways.Messages.cs new file mode 100644 index 0000000..28e310b --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Gateways.Messages.cs @@ -0,0 +1,38 @@ +// Copyright 2018-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using System.Text; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class ClientConnection +{ + internal void SendMsgToGateways(Account? account, byte[] subject, byte[]? reply, byte[] payload) + { + if (Server is not NatsServer server || account == null || subject.Length == 0) + return; + + var outboundGateways = server.GetOutboundGatewayConnections(); + if (outboundGateways.Count == 0) + return; + + foreach (var gateway in outboundGateways) + { + if (!GatewayInterest(account, Encoding.ASCII.GetString(subject))) + continue; + + var replyToSend = reply; + if (reply is { Length: > 0 } && gateway.Gateway != null) + { + var shouldMap = server.ShouldMapReplyForGatewaySend(reply, gateway.Gateway.UseOldPrefix); + if (shouldMap) + replyToSend = reply; + } + + var proto = MsgHeader(subject, replyToSend, new Internal.Subscription { Sid = Encoding.ASCII.GetBytes("1") }); + gateway.EnqueueProto(proto); + gateway.EnqueueProto(payload); + gateway.EnqueueProto(Encoding.ASCII.GetBytes("\r\n")); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Gateways.Protocol.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Gateways.Protocol.cs index 895874a..b20ffa5 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Gateways.Protocol.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Gateways.Protocol.cs @@ -99,4 +99,119 @@ public sealed partial class ClientConnection return null; } + + internal Exception? ProcessGatewayAccountUnsub(byte[] arg) + { + if (Server is not NatsServer server) + return new InvalidOperationException("gateway server unavailable"); + + var tokens = Encoding.ASCII.GetString(arg).Split(' ', StringSplitOptions.RemoveEmptyEntries); + if (tokens.Length < 1) + return null; + + if (tokens.Length == 1) + return null; + + var accountName = tokens[0]; + var subject = Encoding.ASCII.GetBytes(tokens[1]); + var queue = tokens.Length > 2 ? Encoding.ASCII.GetBytes(tokens[2]) : null; + + var (account, _) = server.LookupAccount(accountName); + if (account == null) + return null; + + var sub = new Internal.Subscription { Subject = subject, Queue = queue }; + server.SendQueueSubOrUnsubToGateways(account, sub, isUnsub: true); + return null; + } + + internal Exception? ProcessGatewayAccountSub(byte[] arg) + { + if (Server is not NatsServer server) + return new InvalidOperationException("gateway server unavailable"); + + var tokens = Encoding.ASCII.GetString(arg).Split(' ', StringSplitOptions.RemoveEmptyEntries); + if (tokens.Length < 2) + return null; + + var accountName = tokens[0]; + var subject = Encoding.ASCII.GetBytes(tokens[1]); + var queue = tokens.Length > 2 ? Encoding.ASCII.GetBytes(tokens[2]) : null; + + var (account, _) = server.LookupAccount(accountName); + if (account == null) + return null; + + var sub = new Internal.Subscription { Subject = subject, Queue = queue }; + if (queue is { Length: > 0 }) + server.SendQueueSubOrUnsubToGateways(account, sub, isUnsub: false); + else + server.MaybeSendSubOrUnsubToGateways(account, sub, isUnsub: false); + + return null; + } + + internal Exception? ProcessGatewayRUnsub(byte[] arg) + { + var tokens = Encoding.ASCII.GetString(arg).Split(' ', StringSplitOptions.RemoveEmptyEntries); + if (tokens.Length < 2 || Server is not NatsServer server) + return null; + + var accountName = tokens[0]; + var subject = Encoding.ASCII.GetBytes(tokens[1]); + var queue = tokens.Length > 2 ? Encoding.ASCII.GetBytes(tokens[2]) : null; + var (account, _) = server.LookupAccount(accountName); + if (account == null) + return null; + + var sub = new Internal.Subscription { Subject = subject, Queue = queue }; + server.MaybeSendSubOrUnsubToGateways(account, sub, isUnsub: true); + return null; + } + + internal Exception? ProcessGatewayRSub(byte[] arg) + { + var tokens = Encoding.ASCII.GetString(arg).Split(' ', StringSplitOptions.RemoveEmptyEntries); + if (tokens.Length < 2 || Server is not NatsServer server) + return null; + + var accountName = tokens[0]; + var subject = Encoding.ASCII.GetBytes(tokens[1]); + var queue = tokens.Length > 2 ? Encoding.ASCII.GetBytes(tokens[2]) : null; + var (account, _) = server.LookupAccount(accountName); + if (account == null) + return null; + + var sub = new Internal.Subscription { Subject = subject, Queue = queue }; + server.MaybeSendSubOrUnsubToGateways(account, sub, isUnsub: false); + return null; + } + + internal bool GatewayInterest(Account? account, string subject) + { + if (account == null || string.IsNullOrWhiteSpace(subject)) + return false; + + if (Gateway?.OutSim == null) + return true; + + if (!Gateway.OutSim.TryGetValue(account.Name, out var outSi)) + return true; + + outSi.AcquireReadLock(); + try + { + if (outSi.Mode == GatewayInterestMode.InterestOnly && outSi.Sl != null) + { + var match = outSi.Sl.Match(subject); + return match.PSubs.Count > 0 || match.QSubs.Count > 0; + } + + return outSi.Ni == null || !outSi.Ni.Contains(subject); + } + finally + { + outSi.ReleaseReadLock(); + } + } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayHandler.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayHandler.cs index 8b1df33..60281b3 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayHandler.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayHandler.cs @@ -9,6 +9,9 @@ namespace ZB.MOM.NatsNet.Server; internal static class GatewayHandler { + internal static readonly byte[] GwReplyPrefix = Encoding.ASCII.GetBytes("_GR_."); + internal static readonly byte[] OldGwReplyPrefix = Encoding.ASCII.GetBytes("$GR."); + private static readonly TimeSpan DefaultSolicitGatewaysDelay = TimeSpan.FromSeconds(1); private static long _gatewaySolicitDelayTicks = DefaultSolicitGatewaysDelay.Ticks; private static int _doNotForceInterestOnlyMode; @@ -77,6 +80,25 @@ internal static class GatewayHandler return Encoding.ASCII.GetBytes($"{op} {accountName} {subject}\r\n"); } + internal static bool IsGWRoutedReply(byte[] subject) + { + return subject.AsSpan().StartsWith(GwReplyPrefix) || subject.AsSpan().StartsWith(OldGwReplyPrefix); + } + + internal static (bool IsRouted, bool IsOldPrefix) IsGWRoutedSubjectAndIsOldPrefix(byte[] subject) + { + if (subject.AsSpan().StartsWith(OldGwReplyPrefix)) + return (true, true); + if (subject.AsSpan().StartsWith(GwReplyPrefix)) + return (true, false); + return (false, false); + } + + internal static bool HasGWRoutedReplyPrefix(byte[] subject) + { + return IsGWRoutedReply(subject); + } + private static byte[] GetHash(string value, int len) { var bytes = Encoding.UTF8.GetBytes(value); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayTypes.cs index ddeb7b4..e567081 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayTypes.cs @@ -262,6 +262,25 @@ internal sealed class SrvGateway { Outo = [.. Out.Values.OrderBy(c => c.GetRttValue()).ThenBy(c => c.Cid)]; } + + internal void OrderOutboundConnections() + { + _lock.EnterWriteLock(); + try { OrderOutboundConnectionsLocked(); } + finally { _lock.ExitWriteLock(); } + } + + internal bool ShouldMapReplyForGatewaySend(byte[] reply, bool useOldPrefix) + { + if (reply.Length == 0) + return false; + + if (useOldPrefix) + return !GatewayHandler.IsGWRoutedSubjectAndIsOldPrefix(reply).IsRouted; + + return !GatewayHandler.IsGWRoutedReply(reply); + } + } /// diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.Interest.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.Interest.cs new file mode 100644 index 0000000..523d260 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.Interest.cs @@ -0,0 +1,77 @@ +// Copyright 2018-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using System.Net; +using System.Text; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + internal bool ShouldMapReplyForGatewaySend(byte[] reply, bool useOldPrefix) + { + return _gateway.ShouldMapReplyForGatewaySend(reply, useOldPrefix); + } + + internal IReadOnlyList GetInboundGatewayConnections() + { + _gateway.AcquireReadLock(); + try { return [.. _gateway.In.Values]; } + finally { _gateway.ReleaseReadLock(); } + } + + internal string GatewayAddr() + { + var opts = GetOpts(); + var host = string.IsNullOrWhiteSpace(opts.Gateway.Host) ? opts.Host : opts.Gateway.Host; + return $"{host}:{opts.Gateway.Port}"; + } + + internal void SwitchAccountToInterestMode(string accountName) + { + _gateway.AcquireWriteLock(); + try + { + foreach (var outbound in _gateway.Out.Values) + { + outbound.Gateway ??= new Gateway(); + outbound.Gateway.OutSim ??= new System.Collections.Concurrent.ConcurrentDictionary(StringComparer.Ordinal); + outbound.Gateway.OutSim[accountName] = new OutSide + { + Mode = GatewayInterestMode.InterestOnly, + Ni = null, + }; + } + } + finally + { + _gateway.ReleaseWriteLock(); + } + } + + internal void MaybeSendSubOrUnsubToGateways(Account account, Subscription sub, bool isUnsub) + { + if (sub.Subject.Length == 0) + return; + + _gateway.AcquireReadLock(); + try + { + foreach (var gateway in _gateway.Out.Values) + gateway.EnqueueProto(GatewayHandler.GwBuildSubProto(account.Name, sub, isUnsub)); + } + finally + { + _gateway.ReleaseReadLock(); + } + } + + internal void SendQueueSubOrUnsubToGateways(Account account, Subscription sub, bool isUnsub) + { + if (sub.Queue is not { Length: > 0 }) + return; + + MaybeSendSubOrUnsubToGateways(account, sub, isUnsub); + } +} diff --git a/porting.db b/porting.db index 07133d61e94225679ffad29c3fef9ff5c4cf83ff..1a95955f1953e57580a0412aeb94caae2c6d0c6c 100644 GIT binary patch delta 5148 zcmcJRdvH@#9>?!J_my*#rYUWqrD;fzM_(jq(xhp5NadkW9_1;eK%2HVQE1B}K~Nrj zDq7iDy5sgZ>Rx3*Mt5NcUDrD(vzA?#tyB;MD_X`aD}y_`j3cb$Agk{Fa@(f8oA6)h z%xC8J?fISadz^DmPSYzdGfiDDGY2xb6BNbo?XccYpLnBcwu7#3J?KylI_Rq7<0y*y zFESvu*N|th7))${UBfP9XOO0wCQBpx4trEg6GcqvZt1S+dUf5pcXUT}GHYk8>=>41 zzGMEue2#Ol85@}onLjeenO`%{F;6i&nT^akW(hN!aWRvav5XGgK{wDB=sfxpdJ7#v z&!cv<8#SORv>eSvUX+E#BLmwlz9}9SJH@BPC&W5&gGd^af&zJ|pZ-~+%pPD{*suZV zztvyYUlz^?Cxzb$uL!>o+Jwi1M})P)B4L)`6ebEO0?*&(|H)tB&+%RSoBUzElYg3j zg0JH@@MZi1d;vd&Pvb@I2kvXGpZl15pL?5ojeCLH&+X;5aMj#OZa(MZayT<*6EohQVI~lZk3J*Ketl!V*`cQ9g|-)F`h;c{IwcQ7(gSM7R>}mlgl;q-W_7I;khX1;YB@JWF%LzR3siAb<%7zLDkmsQR5?JIqRI{mMb$Kl zLf=MJ9;oY4l}ja2=yFtM!{8akj?pE=8j2|+;SH#OP3)zN$}9tVp0UFInoh^^ASt3L$) z;`b_s^C(Os{ULIpEYYmU7Z4}%FtA6i*!OxL0vLb=IDiKP<@G*Y<+YTAFAcPag#<3h z+}6!S3cZP%OD;^OvpTzR7iswsXOiRZVI%ohH+GPw8yK@i#jYYU^#go`*zVw5(%6GV z;yZ)wBSu4K@YR92)-yPbJkx_M>Zma?w`EhidkjUdYH<_Wr#OXF{D2MQiyquZY+u59 z$9}{*a^pjsGd!bpV!JCcqt&L)P)g2X3mx&EN}f80y(0pDa1MVI8OS1CA7QsTkQj;C zTGCzsE0nhGagY+1DOsuR!ZR7t(UxH$rXvOu+53CUlZ$`Gw&BGT`hC2I9`GqUi^h@- zkC_E>;xEvVNFUU7+tS|545(py69(}wJs zh)yN=BJNSX{}`KMg8lRf7D(wOY#vcl_!3?k&vEhypK+h!g|r?YqjDk8ie|wdo65Y9I86$&SzPNn-PJ79#gS&-xB!rmG92M9P8_8QHGL%=CGY6t`Z+6N$4I z|9oK2uJ__e!{LbkGCnpi;Q0)Dh6nat>BE2s=m7(e02qNpzyu@#V}N8J1xN+P0^@)* zARQR5TGfz(n9aU=lDHm;y`%vVd$L2gn8TfN6jo zZ~#sqA8-L~paAdyUZ4>00YyMDFddiy%miiu_XD$m65s(~4loy(2h0Z^1Reqw01JUq zU=gqwSOP2s%7A6Sa$p6p5?BSS1|A010Oi11paNJ2tOqs#l|U8X2daS@pcasTfYKWX z+RGUWRY8QVpv}zdxfi)y_FcA=>0>sb|Db142K`%lI@Lo}bd-!st(Pk>m5DN!rk1z^ zl0UqCn-t>68=Pb(&cgv-=dlL2ZQj16fHeOhm@(d6>?x|3*T&B968hDETkD-Z;9VZ; zO)j+6xUnnV+G}FH-Er0)%;e!%Z&w`e!Aw@idJ}rz-7;Af>+OuQ_FyI}W4#@5yazK` z5$jFp+ci1F`kR2L?c(72%VTFui?jZq_p(^;yt{js#d_!7-Fs=QcXk}_!Ah6JdXr^D z`v2W$BWFd~bvpPY?(D~G));i0N<5HgH3k!8!zTt-?-r|ZhwhYcTX66J?kKz+eM}=d zuAJ&IwV|};Qn_9qImXSCGn+F;9OSEk{0w566TGhyZdGZA|M8H0u@nmVgHp&|6DrPk z4!tv1`a_$na3I0Uy2T#~Nwvk+$V0#ys_{33tTlC#zah_Bz5UU;TKFO)2}IbxE%IQj zwT7ion9$Fn3f=c6v{&AwhTfwIT}Zah zDK92ftwDBSQQ^IcyfIE>??BO}W#z_!@IK$DhBpuO+$%pC6P_gPDtC|(u}8P*)}h$* zA`X(@jG(Dc5G?+x8Yg8LBB8i%~N!%=mlZ2<>UL{c-H;Im0>oQ2=YcKlD54^^WSX9J(v(<@K>IlcG59nDGV0PH(-u NF4o%@$6I~J{11?etx^C0 delta 1839 zcmZ|Odu&rx90%}wPw#E->Fwjf#-44rZnziQ+6{)o$zw7>-UFY2u+qJV8H2zSBN{WJ18B)AtHr+K{Hw~Ejcpi6g2Q>z? z3??0>Wkc!{=x{{b>}-?PN$*IDZA|@L{YAYI*naXlxO)GYKOab5Z1qn6<_l z`oEVtPW7u9a+2NTW;L~g@6}6}ENiH3s>KGj3yxU(wj**g)JVr`FibwJWV`EtdmoX`z98&jS$lR>k z`dO1mMW>lZ-!a>%T3h#o8JIu<8O&hO_k@+IYjN^bNo3sdgFRC{AXHda@N9)nm5I6P z+VYK~h6>`ewaq>%FV^DpU=JT_qE8)ZZVENqhS2J-xtY%NaxZBoRJ$&o;LRenACn8H z>Ljz!Pv^OXzB$R=#U(~R4xBE&-!Y`|*!zsEn&*U5bpX)-yep_7Lxqf7p8VtE%YjZSoQ@yKI$ zba(R{V?w{z%_Wh%yImt_-vh3Y{|L{cz-qfgUwnj@iF9j@Ih&3j<)fqPIF9i?nlm_N z>fxgX$Lt=v^%FrAN(FI~Q0h2oAE^%TmC7PZqS-Gsv3uLfQ}oh)-pTaH8Lo+Rdqu)i zPv)R*zvPOZ+x&BU)>G$}KRDx{$R+*}oj&OHQu-B~<@U=wGx`Kc7x)yBj$Y>bsB4wk zOR5G!!_RqrqHd8aUO82uLJ1JvIpCr7Wl zF~Cb&2f`ezpn?tT-~cCRFa(A|9K^#gNPt90f@Db12g0fO>*D23w%gt+rdsa{K7K&y zwe*_LZ!3?}t7=>;neGd?bQb-w^v7EyO00HeMZf5&)vmo1yz}S@+%>Ke<1|_?)VaV7 z9!P_9@InS;!f<#7vLG9BAQ$pr1mr^j6oL=@Pz1#=5=KD@jD}~S6vn`FPzKM#SQrQ6 zVFHxH3osEX;6<1Oli?+p0#jicOotgT6J|jr%!WDeGQ0w>!fWt4ya97z9?XX~VF4_J uDhR+L2tqZ~KrPhi7wYN@=9)C~Dzb#?y_9j&K37loFSsx3tB%&M=l=psYpOy3