feat(batch25): implement gateway URL and registry bookkeeping

This commit is contained in:
Joseph Doherty
2026-03-01 01:53:51 -05:00
parent 1763304e28
commit 0fece7f2f3
3 changed files with 133 additions and 0 deletions

View File

@@ -15,6 +15,7 @@
using System.Text;
using System.Threading;
using System.Linq;
using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
@@ -256,6 +257,11 @@ internal sealed class SrvGateway
public void ReleaseReadLock() => _lock.ExitReadLock();
public void AcquireWriteLock() => _lock.EnterWriteLock();
public void ReleaseWriteLock() => _lock.ExitWriteLock();
internal void OrderOutboundConnectionsLocked()
{
Outo = [.. Out.Values.OrderBy(c => c.GetRttValue()).ThenBy(c => c.Cid)];
}
}
/// <summary>

View File

@@ -144,6 +144,133 @@ public sealed partial class NatsServer
}
}
internal void AddGatewayURL(string gatewayName, string gatewayUrl)
{
if (string.IsNullOrWhiteSpace(gatewayUrl))
return;
_gateway.AcquireWriteLock();
try
{
_gateway.Urls.AddUrl(gatewayUrl);
if (_gateway.Remotes.TryGetValue(gatewayName, out var cfg) && Uri.TryCreate(gatewayUrl, UriKind.Absolute, out var url))
cfg.AddUrls([url]);
}
finally
{
_gateway.ReleaseWriteLock();
}
}
internal void RemoveGatewayURL(string gatewayName, string gatewayUrl)
{
if (string.IsNullOrWhiteSpace(gatewayUrl))
return;
_gateway.AcquireWriteLock();
try
{
_gateway.Urls.RemoveUrl(gatewayUrl);
if (_gateway.Remotes.TryGetValue(gatewayName, out var cfg))
{
cfg.AcquireWriteLock();
try
{
cfg.Urls.Remove(gatewayUrl);
cfg.VarzUpdateUrls = true;
}
finally
{
cfg.ReleaseWriteLock();
}
}
}
finally
{
_gateway.ReleaseWriteLock();
}
}
internal void SendAsyncGatewayInfo()
{
byte[] infoProto;
_gateway.AcquireReadLock();
try
{
infoProto = _gateway.InfoJson is { Length: > 0 } proto
? [.. proto]
: _gateway.GenerateInfoJSON();
}
finally
{
_gateway.ReleaseReadLock();
}
_mu.EnterReadLock();
try
{
foreach (var route in _routes.Values.SelectMany(v => v))
route.EnqueueProto(infoProto);
foreach (var inbound in _gateway.In.Values)
inbound.EnqueueProto(infoProto);
}
finally
{
_mu.ExitReadLock();
}
}
internal string GetGatewayURL()
{
_gateway.AcquireReadLock();
try { return _gateway.Url; }
finally { _gateway.ReleaseReadLock(); }
}
internal string GetGatewayName()
{
_gateway.AcquireReadLock();
try { return _gateway.Name; }
finally { _gateway.ReleaseReadLock(); }
}
internal void RegisterInboundGatewayConnection(ClientConnection connection)
{
_gateway.AcquireWriteLock();
try { _gateway.In[connection.Cid] = connection; }
finally { _gateway.ReleaseWriteLock(); }
}
internal void RegisterOutboundGatewayConnection(string gatewayName, ClientConnection connection)
{
_gateway.AcquireWriteLock();
try
{
_gateway.Out[gatewayName] = connection;
_gateway.OrderOutboundConnectionsLocked();
}
finally
{
_gateway.ReleaseWriteLock();
}
}
internal ClientConnection? GetOutboundGatewayConnection(string gatewayName)
{
_gateway.AcquireReadLock();
try { return _gateway.Out.GetValueOrDefault(gatewayName); }
finally { _gateway.ReleaseReadLock(); }
}
internal IReadOnlyList<ClientConnection> GetOutboundGatewayConnections()
{
_gateway.AcquireReadLock();
try { return [.. _gateway.Outo]; }
finally { _gateway.ReleaseReadLock(); }
}
public int NumOutboundGateways() => NumOutboundGatewaysInternal();
internal int NumOutboundGatewaysInternal()