From 0fece7f2f3bacb150ed63a26e8ec4223b69861b6 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 1 Mar 2026 01:53:51 -0500 Subject: [PATCH] feat(batch25): implement gateway URL and registry bookkeeping --- .../Gateway/GatewayTypes.cs | 6 + ...atsServer.Gateways.ConnectionsAndGossip.cs | 127 ++++++++++++++++++ porting.db | Bin 6799360 -> 6803456 bytes 3 files changed, 133 insertions(+) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayTypes.cs index f921ed1..ddeb7b4 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayTypes.cs @@ -15,6 +15,7 @@ using System.Text; using System.Threading; +using System.Linq; using ZB.MOM.NatsNet.Server.Internal; using ZB.MOM.NatsNet.Server.Internal.DataStructures; @@ -256,6 +257,11 @@ internal sealed class SrvGateway public void ReleaseReadLock() => _lock.ExitReadLock(); public void AcquireWriteLock() => _lock.EnterWriteLock(); public void ReleaseWriteLock() => _lock.ExitWriteLock(); + + internal void OrderOutboundConnectionsLocked() + { + Outo = [.. Out.Values.OrderBy(c => c.GetRttValue()).ThenBy(c => c.Cid)]; + } } /// diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.ConnectionsAndGossip.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.ConnectionsAndGossip.cs index cd5d823..e01af38 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.ConnectionsAndGossip.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.ConnectionsAndGossip.cs @@ -144,6 +144,133 @@ public sealed partial class NatsServer } } + internal void AddGatewayURL(string gatewayName, string gatewayUrl) + { + if (string.IsNullOrWhiteSpace(gatewayUrl)) + return; + + _gateway.AcquireWriteLock(); + try + { + _gateway.Urls.AddUrl(gatewayUrl); + if (_gateway.Remotes.TryGetValue(gatewayName, out var cfg) && Uri.TryCreate(gatewayUrl, UriKind.Absolute, out var url)) + cfg.AddUrls([url]); + } + finally + { + _gateway.ReleaseWriteLock(); + } + } + + internal void RemoveGatewayURL(string gatewayName, string gatewayUrl) + { + if (string.IsNullOrWhiteSpace(gatewayUrl)) + return; + + _gateway.AcquireWriteLock(); + try + { + _gateway.Urls.RemoveUrl(gatewayUrl); + + if (_gateway.Remotes.TryGetValue(gatewayName, out var cfg)) + { + cfg.AcquireWriteLock(); + try + { + cfg.Urls.Remove(gatewayUrl); + cfg.VarzUpdateUrls = true; + } + finally + { + cfg.ReleaseWriteLock(); + } + } + } + finally + { + _gateway.ReleaseWriteLock(); + } + } + + internal void SendAsyncGatewayInfo() + { + byte[] infoProto; + _gateway.AcquireReadLock(); + try + { + infoProto = _gateway.InfoJson is { Length: > 0 } proto + ? [.. proto] + : _gateway.GenerateInfoJSON(); + } + finally + { + _gateway.ReleaseReadLock(); + } + + _mu.EnterReadLock(); + try + { + foreach (var route in _routes.Values.SelectMany(v => v)) + route.EnqueueProto(infoProto); + + foreach (var inbound in _gateway.In.Values) + inbound.EnqueueProto(infoProto); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal string GetGatewayURL() + { + _gateway.AcquireReadLock(); + try { return _gateway.Url; } + finally { _gateway.ReleaseReadLock(); } + } + + internal string GetGatewayName() + { + _gateway.AcquireReadLock(); + try { return _gateway.Name; } + finally { _gateway.ReleaseReadLock(); } + } + + internal void RegisterInboundGatewayConnection(ClientConnection connection) + { + _gateway.AcquireWriteLock(); + try { _gateway.In[connection.Cid] = connection; } + finally { _gateway.ReleaseWriteLock(); } + } + + internal void RegisterOutboundGatewayConnection(string gatewayName, ClientConnection connection) + { + _gateway.AcquireWriteLock(); + try + { + _gateway.Out[gatewayName] = connection; + _gateway.OrderOutboundConnectionsLocked(); + } + finally + { + _gateway.ReleaseWriteLock(); + } + } + + internal ClientConnection? GetOutboundGatewayConnection(string gatewayName) + { + _gateway.AcquireReadLock(); + try { return _gateway.Out.GetValueOrDefault(gatewayName); } + finally { _gateway.ReleaseReadLock(); } + } + + internal IReadOnlyList GetOutboundGatewayConnections() + { + _gateway.AcquireReadLock(); + try { return [.. _gateway.Outo]; } + finally { _gateway.ReleaseReadLock(); } + } + public int NumOutboundGateways() => NumOutboundGatewaysInternal(); internal int NumOutboundGatewaysInternal() diff --git a/porting.db b/porting.db index 5bf203332e6b143ced4e227bf2974b4eb30e785c..07133d61e94225679ffad29c3fef9ff5c4cf83ff 100644 GIT binary patch delta 3816 zcmai$du&uy9>>qUcV1^+t#vxn$IP_T$Bfd)JZ@+D;L(->iok-fxKczrWv0U)J5Xxb zCCiewuDr9D7R++lTqo(D7W=b z!d1S=tVM1!+dfp@z?)97(c5xf=@&&V!#(~nSDbJO0eY;ovPQVI-Lr-CbZKBbV;94@ywS;8+-WdO;Ts{Bw5s;Y(ZsHz6aqN-{_xPPgt3hHxJ z`JiSczn$;NPEetee|UV?uo-J12jvLsAp7J9YaqMj2>p}oa#W8fBS+|)49Zau(KCbIxtIrZMjr0FL)L+AT&(Tj=`WqTy6Hf(gEPIYtv;2Adn|qVDnUjg0 zMB?e+o~N9~v-?9@^+S&|`4;UISo93ND5YlTDo)z+KCR>==^Yy4%CQH_DhT(`R6ZJ7 z$#a$L(rG%&W)7RH+4O1Jz=Sh2z}`DeFSAhE9Av?EJ7r(IOFQKz%H$^Ydk=(BV&i_w z`evv{n*M-R%s2JjEcLThAJ7_2&&)YX|E~0mbW+YNkPEr+M9WA5iD(MZE zs4|7JC+yKn^a>08hc>Y%KGThn-uR5#I3@M<=hvoGH`UjqUw>D^f%rr-V z9XNm!xPTj!fpTygr~tPE52yrQ-~&~l8q|PV;0FN^1a+Vugg^sm1WlkBgh2${0qz6~ zKnrLEZJ-@QK@7w}0*IgkBtZ&vf-cYvdcZ=k2rLFmKriS6{a`5=0L#E~umao#?glHt zJzy1B4c34kfwf>AxEHJk8^DjjMsOea3Ai6@0)t>PcmQkx4}yol5J-bzuoYxL_G#1H z$X}KO$N^IJfcu2o<2vU2uXD3Qa#Yy!^bOi*d)YQ%J!MT=p0`BIN6poy!=?t|C1EN5 zHb2aL#qHxN$N|=0o?huS6&6zx-~)RC3&oLadVJSd_F=P^h4!Rd*<>MCE{I}+?L3yN zPu0JR& zByKFh;_EZDk00bK zxn&P2+wGonZ*vu0_d92tEsjU+AK4dCX8X#v#`=s^u&~gaYfy?!Z+OMbiVL~rz9~~- zjj~kj%@JWc2R4Z;_(+ZliBw#Bb5Qx_!1Sx-=~=pKes_-kIoHOgI!k(2$zHBm*#yNI zE9tyPaW-k39jTJem5Q@bb+)t7V>$2q1*f)ox6)?3Zi{k=FnK$ty9-t8T}s7zwPG8e zNS2&?g;Mn1e5RI_KR*&+eZS8+*^bw9l|sC`^pNFBaoir)-b{4SD#TNzt;-ZEE{^L~ zcJ5@ZYJO#YWdlrw0pq@Dt`0%$(ydY*w$wOGb6sGP_@SKdhjaBC2Wc({K}_j(Q~7qm t(dsh}(_adLm=v{7SRL70Vx;aO5JdgCP}jG_I8yWS^P;FdCdyB^{{eMx#Ek#| delta 1842 zcmY+>eN0tl90&08e(&$x7cQ3%CH27?G{EEI^ws zW=?eLw>76Y|ETp3YRi*a)0`{Y%AswFQH&Q+6fsO!TUz$zx$~?$_SwGQ=XcL@p7YS$ zzF%!_-mmV;(k)3cHn#=KW$SQlMOZH0dLV2a2+QjasXgjBcJ#{CiuN@(B-f=|dwb=m zY?!CU{`g9te7{L+mn+x5Uf=LqY*Wl}7j3@X?x0Px+aG8lyWK|1u-hn_X}90CcIm#| zZlT?>+Xz~(-G;TMcIkp$Z(w-FZr9O{+HDB!GrL_y`^0X8XglmSfcC!K`q3Kf*5^?J z%EPAnJ*w3+AcwNGokp3&qkZerd2e!^pDTQd@5>xzifnC^m0n#jLsq3j8IqgE9*@R0 z#2Oo8QN1Xkg%Vm}LYt7#3KCjgLJKCe+=Q0n;WK-D({uFk3Co!YErS>B_GMXxrcxSf zk|Zhh%}sAb^i(8Mq!d!q6p{Q`+aXdi(iV|?NbiW0gtSp4FH%$_4^ov#Zlp?)Tu2K< zGLhzrL`YMSiX*xcaiUmsAmxZ;AjOZRBgK!U9c@swTIVrW2bPS@mb+7Fl)3F^C1~wi zo|J5E@90)Q1r2mCz+rWCJ6GHdIMXGUR3>@%c*@;wvr+HTELtO#Il{_4xw&m#hBfXI zEs?FiFHo6ky?vSTWd60AZt{We^#E7?Liv2Mn*v;0qa|_4MY@K2kQewRc~#4+;mn1K zoW@0$sFOQ)>zTakvf<`n7tQ8FKdMeE(nV>qxi!5tH1_+g#Cew}m4|yMo9ln0Kl${# zMu3}psDgVpsa`(ULqG5lSFTtK@tiKo75nh4P8#IhGx6h+d)abZ< zIH(w$xIm?21JB2YbR!s5N8AA%V+05K>na7sz$&~(4mo}YVzP3GCdME6aELC(|BGUpJb@{L{EK3_RR)11}MSFK*E1?0*IKEC$<$@zr8*WSBmPf3?`E=H|n#cHX4= z|GhkageHx>+{6Ez%yw+oMhQ%Cfg3#Fg(UDnGWa0{QXvh}VH{*YCX9zH$c6yqKrRF! z5AvY^CO{#Cpa>?yBQObyVKO`lQy>f_PzqDwF_;F^p$ulgOqd0;VGhiN$KeSmhYFYn z^WjN&3Z8}qun?YsMX(r_z*1NS&%$$13Cm#xtc2%b6|9CA;6-=|s-POyKn<*gm!THc zK?K%A6k?xhqZ>|*cTs_qw8s0DH`Q~{{lH!AvRr<%p3YK%m0VJzlh%^AubAIBUYBCc JK34ag@gElyl&}B*