feat(batch25): implement gateway bootstrap and solicitation

This commit is contained in:
Joseph Doherty
2026-03-01 01:47:07 -05:00
parent 93f34c4782
commit a83339fe71
8 changed files with 525 additions and 25 deletions

View File

@@ -114,6 +114,7 @@ public sealed partial class ClientConnection
// Client options (from CONNECT message).
internal ClientOptions Opts = ClientOptions.Default;
internal Route? Route;
internal Gateway? Gateway;
internal WebsocketConnection? Ws;
// Flags and state.

View File

@@ -0,0 +1,80 @@
// Copyright 2018-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0
using System.Security.Cryptography;
using System.Text;
namespace ZB.MOM.NatsNet.Server;
/// <summary>
/// Process-wide gateway helpers: the solicit-delay and interest-mode switches,
/// gateway option validation, and the short hashes derived from names/ids.
/// </summary>
internal static class GatewayHandler
{
    private static readonly TimeSpan DefaultSolicitGatewaysDelay = TimeSpan.FromSeconds(1);

    // Kept as ticks so the value can be swapped and read atomically through Interlocked.
    private static long _gatewaySolicitDelayTicks = DefaultSolicitGatewaysDelay.Ticks;

    // 0 means false, any other value means true.
    private static int _doNotForceInterestOnlyMode;

    /// <summary>Replaces the delay returned by <see cref="GetGatewaysSolicitDelay"/>.</summary>
    internal static void SetGatewaysSolicitDelay(TimeSpan delay)
        => Interlocked.Exchange(ref _gatewaySolicitDelayTicks, delay.Ticks);

    /// <summary>Restores the solicit delay to its one-second default.</summary>
    internal static void ResetGatewaysSolicitDelay()
        => Interlocked.Exchange(ref _gatewaySolicitDelayTicks, DefaultSolicitGatewaysDelay.Ticks);

    /// <summary>Returns the currently configured solicit delay.</summary>
    internal static TimeSpan GetGatewaysSolicitDelay()
        => TimeSpan.FromTicks(Interlocked.Read(ref _gatewaySolicitDelayTicks));

    /// <summary>Sets the flag reported by <see cref="DoNotForceInterestOnlyMode"/>.</summary>
    internal static void GatewayDoNotForceInterestOnlyMode(bool doNotForce)
    {
        var flag = doNotForce ? 1 : 0;
        Interlocked.Exchange(ref _doNotForceInterestOnlyMode, flag);
    }

    /// <summary>Returns the flag last stored by <see cref="GatewayDoNotForceInterestOnlyMode"/>.</summary>
    internal static bool DoNotForceInterestOnlyMode()
    {
        // CompareExchange(0, 0) never changes the value; it is used here as an atomic read.
        var current = Interlocked.CompareExchange(ref _doNotForceInterestOnlyMode, 0, 0);
        return current != 0;
    }

    /// <summary>
    /// Checks the gateway section of <paramref name="options"/>.
    /// </summary>
    /// <returns>
    /// <c>null</c> when the section is valid or when no gateway name/port is configured;
    /// otherwise the error describing the first problem found.
    /// </returns>
    internal static Exception? ValidateGatewayOptions(ServerOptions options)
    {
        var gw = options.Gateway;

        // Nothing to validate unless both a name and a port are configured.
        if (string.IsNullOrWhiteSpace(gw.Name) || gw.Port == 0)
        {
            return null;
        }

        if (gw.Name.Contains(' '))
        {
            return ServerErrors.ErrGatewayNameHasSpaces;
        }

        // Remote names are compared ordinally, so names differing only by case are distinct.
        var seen = new HashSet<string>(StringComparer.Ordinal);
        foreach (var entry in gw.Gateways)
        {
            var remoteName = entry.Name;
            if (string.IsNullOrWhiteSpace(remoteName))
            {
                return new InvalidOperationException("gateway remote requires a name");
            }

            if (!seen.Add(remoteName))
            {
                return new InvalidOperationException($"duplicate gateway remote: {remoteName}");
            }

            if (entry.Urls.Count == 0)
            {
                return new InvalidOperationException($"gateway remote {remoteName} has no URLs");
            }
        }

        return null;
    }

    /// <summary>Six-character hash of <paramref name="value"/>, as ASCII bytes.</summary>
    internal static byte[] GetGWHash(string value) => GetHash(value, 6);

    /// <summary>Four-character hash of <paramref name="value"/>, as ASCII bytes.</summary>
    internal static byte[] GetOldHash(string value) => GetHash(value, 4);

    /// <summary>
    /// Hashes the UTF-8 bytes of <paramref name="value"/> with SHA-256, encodes the digest as
    /// unpadded URL-safe base64, and returns the first <paramref name="len"/> characters as ASCII.
    /// </summary>
    private static byte[] GetHash(string value, int len)
    {
        var digest = SHA256.HashData(Encoding.UTF8.GetBytes(value));

        var text = Convert.ToBase64String(digest);
        text = text.Replace('+', '-');
        text = text.Replace('/', '_');
        text = text.TrimEnd('=');

        return Encoding.ASCII.GetBytes(text.Substring(0, len));
    }
}

View File

@@ -13,6 +13,7 @@
//
// Adapted from server/gateway.go in the NATS server Go source.
using System.Text;
using System.Threading;
using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
@@ -54,6 +55,20 @@ public enum GatewayInterestMode : byte
CacheFlushed = 3,
}
/// <summary>Display helpers for <see cref="GatewayInterestMode"/>.</summary>
internal static class GatewayInterestModeExtensions
{
    /// <summary>
    /// Returns the display name of <paramref name="mode"/>. Any value other than
    /// Optimistic, InterestOnly, or Transitioning yields "Unknown".
    /// </summary>
    internal static string String(this GatewayInterestMode mode)
    {
        switch (mode)
        {
            case GatewayInterestMode.Optimistic:
                return "Optimistic";
            case GatewayInterestMode.InterestOnly:
                return "Interest-Only";
            case GatewayInterestMode.Transitioning:
                return "Transitioning";
            default:
                return "Unknown";
        }
    }
}
/// <summary>
/// Server-level gateway state kept on the <see cref="NatsServer"/> instance.
/// Replaces the stub that was in <c>NatsServerTypes.cs</c>.
@@ -130,6 +145,71 @@ internal sealed class SrvGateway
public Lock PasiLock => _pasiLock;
/// <summary>
/// Refreshes the gateway fields of <c>Info</c> (creating it when absent) from this
/// instance's name, URL and URL set, then regenerates and returns <c>InfoJson</c>.
/// </summary>
internal byte[] GenerateInfoJSON()
{
    var info = Info ??= new ServerInfo();

    info.Gateway = Name;
    info.GatewayUrl = Url;
    info.GatewayUrls = [.. Urls.GetAsStringSlice()];

    return InfoJson = NatsServer.GenerateInfoJson(info);
}
/// <summary>
/// Reports whether any inbound connection belongs to the gateway named
/// <paramref name="gatewayName"/> (ordinal comparison).
/// </summary>
/// <returns><c>false</c> for a null/blank name or when no inbound connection matches.</returns>
internal bool HasInbound(string gatewayName)
{
    if (string.IsNullOrWhiteSpace(gatewayName))
    {
        return false;
    }

    var found = false;
    _lock.EnterReadLock();
    try
    {
        foreach (var connection in In.Values)
        {
            var name = connection.Gateway?.Name;
            if (string.Equals(name, gatewayName, StringComparison.Ordinal))
            {
                found = true;
                break;
            }
        }
    }
    finally
    {
        _lock.ExitReadLock();
    }

    return found;
}
/// <summary>
/// Copies the TLS settings of each entry in <paramref name="remotes"/> onto the
/// already-registered remote with the same name. Entries with no registered
/// counterpart are ignored.
/// </summary>
internal void UpdateRemotesTLSConfig(IReadOnlyList<RemoteGatewayOpts> remotes)
{
    if (remotes.Count == 0)
    {
        return;
    }

    _lock.EnterWriteLock();
    try
    {
        foreach (var incoming in remotes)
        {
            if (Remotes.TryGetValue(incoming.Name, out var cfg))
            {
                cfg.AcquireWriteLock();
                try
                {
                    // A remote without options yet starts from a copy of the incoming ones.
                    var target = cfg.RemoteOpts ??= incoming.Clone();
                    target.TlsConfig = incoming.TlsConfig;
                    target.TlsTimeout = incoming.TlsTimeout;
                    target.TlsConfigOpts = incoming.TlsConfigOpts;
                }
                finally
                {
                    cfg.ReleaseWriteLock();
                }
            }
        }
    }
    finally
    {
        _lock.ExitWriteLock();
    }
}
// -------------------------------------------------------------------------
// Recent subscription tracking (thread-safe map)
// -------------------------------------------------------------------------
@@ -219,6 +299,7 @@ internal sealed class GatewayCfg
/// <summary>TLS server name override for SNI.</summary>
/// <remarks>Defaults to the empty string; <c>SaveTLSHostname</c> assigns it from a URL's host.</remarks>
public string TlsName { get; set; } = string.Empty;
/// <summary>True if this remote was discovered via gossip (not configured).</summary>
public bool Implicit { get; set; }
@@ -228,6 +309,81 @@ internal sealed class GatewayCfg
// Forwarded properties from RemoteGatewayOpts
/// <summary>Name taken from <c>RemoteOpts</c>, or the empty string when it is unavailable.</summary>
public string Name => RemoteOpts?.Name ?? string.Empty;
/// <summary>Increments the connection-attempt counter under the write lock.</summary>
internal void BumpConnAttempts()
{
    _lock.EnterWriteLock();
    try
    {
        ConnAttempts += 1;
    }
    finally
    {
        _lock.ExitWriteLock();
    }
}
/// <summary>Reads the connection-attempt counter under the read lock.</summary>
internal int GetConnAttempts()
{
    int attempts;
    _lock.EnterReadLock();
    try
    {
        attempts = ConnAttempts;
    }
    finally
    {
        _lock.ExitReadLock();
    }

    return attempts;
}
/// <summary>Sets the connection-attempt counter back to zero under the write lock.</summary>
internal void ResetConnAttempts()
{
    _lock.EnterWriteLock();
    try
    {
        ConnAttempts = 0;
    }
    finally
    {
        _lock.ExitWriteLock();
    }
}
/// <summary>Reads <c>Implicit</c> under the read lock.</summary>
internal bool IsImplicit()
{
    bool isImplicit;
    _lock.EnterReadLock();
    try
    {
        isImplicit = Implicit;
    }
    finally
    {
        _lock.ExitReadLock();
    }

    return isImplicit;
}
/// <summary>Returns a snapshot of the URL values, copied while holding the read lock.</summary>
internal IReadOnlyCollection<Uri> GetUrls()
{
    _lock.EnterReadLock();
    try
    {
        IReadOnlyCollection<Uri> snapshot = [.. Urls.Values];
        return snapshot;
    }
    finally
    {
        _lock.ExitReadLock();
    }
}
/// <summary>Returns a snapshot of the URL keys, copied while holding the read lock.</summary>
internal string[] GetUrlsAsStrings()
{
    _lock.EnterReadLock();
    try
    {
        string[] keys = [.. Urls.Keys];
        return keys;
    }
    finally
    {
        _lock.ExitReadLock();
    }
}
/// <summary>
/// Replaces the whole URL set with <paramref name="urls"/>, keyed by each URL's string form.
/// </summary>
internal void UpdateUrls(IEnumerable<Uri> urls)
{
    _lock.EnterWriteLock();
    try
    {
        Urls.Clear();
        foreach (var candidate in urls)
        {
            var key = candidate.ToString();
            Urls[key] = candidate;
        }
    }
    finally
    {
        _lock.ExitWriteLock();
    }
}
/// <summary>
/// Records the host of <paramref name="url"/> as the TLS server name override.
/// </summary>
/// <param name="url">Absolute URL whose host is considered.</param>
/// <remarks>
/// Blank hosts and IP literals are ignored: the TLS server_name extension carries DNS
/// host names only, so an address is not a usable SNI override.
/// </remarks>
internal void SaveTLSHostname(Uri url)
{
    if (string.IsNullOrWhiteSpace(url.Host))
    {
        return;
    }

    // IPv4/IPv6 literals must not be stored as an SNI name.
    if (url.HostNameType is UriHostNameType.IPv4 or UriHostNameType.IPv6)
    {
        return;
    }

    _lock.EnterWriteLock();
    try
    {
        TlsName = url.Host;
    }
    finally
    {
        _lock.ExitWriteLock();
    }
}
/// <summary>
/// Adds <paramref name="urls"/> to the URL set, keyed by each URL's string form.
/// An entry whose key already exists is overwritten.
/// </summary>
internal void AddUrls(IEnumerable<Uri> urls)
{
    _lock.EnterWriteLock();
    try
    {
        foreach (var candidate in urls)
        {
            var key = candidate.ToString();
            Urls[key] = candidate;
        }
    }
    finally
    {
        _lock.ExitWriteLock();
    }
}
// -------------------------------------------------------------------------
// Lock helpers
// -------------------------------------------------------------------------
@@ -378,7 +534,24 @@ internal sealed class GwReplyMapping
/// </summary>
public (byte[] Subject, bool Found) Get(byte[] subject)
{
    // The mapping is keyed by the UTF-8 text of the subject.
    var key = Encoding.UTF8.GetString(subject);
    if (!Mapping.TryGetValue(key, out var entry))
    {
        // Unknown subject: hand the input back unchanged.
        return (subject, false);
    }

    // Expired entries are evicted on lookup and reported as not found.
    // NOTE(review): assumes Exp is stored as UTC DateTime ticks — confirm against the code that writes it.
    if (entry.Exp <= DateTime.UtcNow.Ticks)
    {
        Mapping.Remove(key);
        return (subject, false);
    }

    return (Encoding.UTF8.GetBytes(entry.Ms), true);
}
}
/// <summary>Extension-method surface for <see cref="RemoteGatewayOpts"/>.</summary>
internal static class RemoteGatewayOptsExtensions
{
    /// <summary>
    /// Returns the result of calling <c>Clone()</c> on <paramref name="source"/>.
    /// </summary>
    // NOTE(review): the call below binds to an accessible instance Clone() member, because
    // instance members take precedence over extension methods. If that instance member is
    // ever removed, this method would resolve to itself and recurse without end.
    internal static RemoteGatewayOpts Clone(this RemoteGatewayOpts source) => source.Clone();
}

View File

@@ -0,0 +1,230 @@
// Copyright 2018-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using ZB.MOM.NatsNet.Server.Internal;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class NatsServer
{
/// <summary>
/// Validates the gateway options and, when both a gateway name and port are configured,
/// (re)initializes the server's gateway state from them.
/// </summary>
/// <param name="options">Server options holding the gateway section.</param>
/// <returns>
/// The validation error, or <c>null</c> on success or when gateways are not configured.
/// </returns>
internal Exception? NewGateway(ServerOptions options)
{
    var validationErr = GatewayHandler.ValidateGatewayOptions(options);
    if (validationErr != null)
    {
        return validationErr;
    }

    // Gateways stay disabled unless both a name and a port are configured.
    if (string.IsNullOrWhiteSpace(options.Gateway.Name) || options.Gateway.Port == 0)
    {
        return null;
    }

    _gateway.AcquireWriteLock();
    try
    {
        _gateway.Enabled = true;
        _gateway.Name = options.Gateway.Name;
        _gateway.RejectUnknown = options.Gateway.RejectUnknown;

        // Start from a clean slate so repeated initialization does not keep stale entries.
        _gateway.Remotes.Clear();
        _gateway.Out.Clear();
        _gateway.Outo.Clear();
        _gateway.In.Clear();
        _gateway.OwnCfgUrls = [];

        foreach (var remote in options.Gateway.Gateways)
        {
            // Built with indexer assignment rather than ToDictionary: a URL listed twice for
            // the same remote collapses into one entry instead of throwing ArgumentException
            // out of a method whose contract is to return errors.
            var urls = new Dictionary<string, Uri>(StringComparer.Ordinal);
            foreach (var url in remote.Urls)
            {
                var key = url.ToString();
                urls[key] = url;
                _gateway.OwnCfgUrls.Add(key);
            }

            _gateway.Remotes[remote.Name] = new GatewayCfg
            {
                RemoteOpts = remote.Clone(),
                Hash = GatewayHandler.GetGWHash(remote.Name),
                OldHash = GatewayHandler.GetOldHash(remote.Name),
                Urls = urls,
            };
        }

        var info = CopyInfo();
        info.Gateway = options.Gateway.Name;
        info.GatewayUrl = _gateway.Url;
        info.GatewayUrls = [.. _gateway.Urls.GetAsStringSlice()];
        _gateway.Info = info;
        _gateway.InfoJson = GenerateInfoJson(info);

        // Hashes of the server id, and of the gateway name, make up the reply prefixes.
        _gateway.SIdHash = GatewayHandler.GetGWHash(_info.Id);
        _gateway.OldHash = GatewayHandler.GetOldHash(_info.Id);
        var clusterHash = GatewayHandler.GetGWHash(options.Gateway.Name);
        _gateway.ReplyPfx = Encoding.ASCII.GetBytes($"_GR_.{Encoding.ASCII.GetString(clusterHash)}.{Encoding.ASCII.GetString(_gateway.SIdHash)}.");
        _gateway.OldReplyPfx = Encoding.ASCII.GetBytes($"$GR.{Encoding.ASCII.GetString(_gateway.OldHash)}.");
    }
    finally
    {
        _gateway.ReleaseWriteLock();
    }

    return null;
}
/// <summary>
/// Starts gateway networking: publishes the gateway host/port, starts the accept loop,
/// then solicits the configured remotes.
/// </summary>
/// <returns>
/// <c>null</c> when gateways are disabled or everything started; otherwise the first error.
/// </returns>
internal Exception? StartGateways()
{
    if (!_gateway.Enabled)
    {
        return null;
    }

    var hostPortErr = SetGatewayInfoHostPort();
    if (hostPortErr != null)
    {
        return hostPortErr;
    }

    // Listen first. SolicitGateways blocks for the solicit delay, and running it ahead of
    // the accept loop would keep this server from listening for that whole time. It also
    // avoids registering outbound gateways when the listener cannot be started.
    var acceptErr = StartGatewayAcceptLoop();
    if (acceptErr != null)
    {
        return acceptErr;
    }

    SolicitGateways();
    return null;
}
/// <summary>
/// Creates and starts the gateway listener (if there is none yet) on the configured
/// gateway host and port, then launches the accept-loop routine.
/// </summary>
/// <returns>
/// <c>null</c> on success; the listener failure (also stored in <c>_gatewayListenerErr</c>),
/// or an <see cref="InvalidOperationException"/> when the routine cannot be started.
/// </returns>
internal Exception? StartGatewayAcceptLoop()
{
    if (_gatewayListener == null)
    {
        var opts = GetOpts();

        // Use host and port as configured. Joining them into "host:port" and splitting
        // on the first ':' breaks every IPv6 host, including the "::" wildcard below.
        var host = opts.Gateway.Host;
        var port = opts.Gateway.Port;

        _mu.EnterWriteLock();
        try
        {
            // Re-check under the lock in case another caller created the listener meanwhile.
            if (_gatewayListener == null)
            {
                var addr = string.IsNullOrWhiteSpace(host) || host == "0.0.0.0"
                    ? IPAddress.Any
                    : (host == "::" ? IPAddress.IPv6Any : IPAddress.Parse(host));

                // Publish the listener only after Start() succeeded, so a failed start is
                // retried on the next call instead of being mistaken for a live listener.
                var listener = new TcpListener(addr, port);
                listener.Start();
                _gatewayListener = listener;
            }
        }
        catch (Exception ex)
        {
            _gatewayListenerErr = ex;
            return ex;
        }
        finally
        {
            _mu.ExitWriteLock();
        }
    }

    if (!StartGoRoutine(() => Noticef("Gateway accept loop started")))
    {
        return new InvalidOperationException("unable to start gateway accept loop");
    }

    return null;
}
/// <summary>
/// Computes the gateway URL from the configured host/port (or from the advertise address
/// when one is set) and stores it, with refreshed gateway INFO, on the gateway state.
/// </summary>
/// <returns>The advertise-address parse error, or <c>null</c> on success.</returns>
internal Exception? SetGatewayInfoHostPort()
{
    var opts = GetOpts();
    var host = opts.Gateway.Host;
    var port = opts.Gateway.Port;

    // An advertise address, when present, overrides the listen host/port.
    if (!string.IsNullOrWhiteSpace(opts.Gateway.Advertise))
    {
        var (advHost, advPort, advErr) = Internal.ServerUtilities.ParseHostPort(opts.Gateway.Advertise, port);
        if (advErr != null)
        {
            return advErr;
        }

        host = advHost;
        port = advPort;
    }

    // An IPv6 literal has to be bracketed inside a URL authority; otherwise its colons
    // cannot be told apart from the port separator.
    var urlHost = host is not null && host.Contains(':') && !host.StartsWith('[')
        ? $"[{host}]"
        : host;

    var scheme = opts.Gateway.TlsConfig != null ? "tls" : "nats";
    var url = $"{scheme}://{urlHost}:{port}";

    _gateway.AcquireWriteLock();
    try
    {
        _gateway.Url = url;
        _gateway.Info ??= CopyInfo();
        _gateway.Info.Gateway = opts.Gateway.Name;
        _gateway.Info.GatewayUrl = url;
        _gateway.Info.GatewayUrls = [.. _gateway.Urls.GetAsStringSlice()];
        _gateway.InfoJson = GenerateInfoJson(_gateway.Info);
    }
    finally
    {
        _gateway.ReleaseWriteLock();
    }

    return null;
}
/// <summary>
/// Waits for the configured solicit delay, then solicits every registered remote gateway.
/// Does nothing when gateways are disabled.
/// </summary>
internal void SolicitGateways()
{
    if (!_gateway.Enabled)
    {
        return;
    }

    // Blocks the calling thread; a zero or negative delay skips the wait.
    var wait = GatewayHandler.GetGatewaysSolicitDelay();
    if (wait > TimeSpan.Zero)
    {
        Thread.Sleep(wait);
    }

    // Snapshot under the read lock so soliciting happens without holding it.
    GatewayCfg[] snapshot;
    _gateway.AcquireReadLock();
    try
    {
        snapshot = [.. _gateway.Remotes.Values];
    }
    finally
    {
        _gateway.ReleaseReadLock();
    }

    for (var i = 0; i < snapshot.Length; i++)
    {
        SolicitGateway(snapshot[i], firstConnect: true);
    }
}
/// <summary>Solicits <paramref name="cfg"/> again, flagged as not being the first connect.</summary>
internal void ReconnectGateway(GatewayCfg cfg) => SolicitGateway(cfg, firstConnect: false);
/// <summary>
/// Creates an outbound gateway to the first configured URL of <paramref name="cfg"/>,
/// unless the remote has no options/URLs or already has an inbound connection.
/// </summary>
/// <param name="cfg">Remote gateway configuration.</param>
/// <param name="firstConnect">Currently unused.</param>
internal void SolicitGateway(GatewayCfg cfg, bool firstConnect)
{
    _ = firstConnect;

    var remote = cfg.RemoteOpts;
    var hasUrl = remote != null && remote.Urls.Count != 0;
    if (!hasUrl)
    {
        return;
    }

    if (!_gateway.HasInbound(remote!.Name))
    {
        CreateGateway(cfg, remote.Urls[0]);
    }
}
/// <summary>
/// Builds an outbound gateway connection for <paramref name="cfg"/> and registers it in
/// the gateway state under the remote's name.
/// </summary>
/// <param name="cfg">Remote gateway configuration.</param>
/// <param name="url">URL recorded on the connection as its connect URL; may be null.</param>
/// <returns>The new connection, or <c>null</c> when <paramref name="cfg"/> has no remote options.</returns>
internal ClientConnection? CreateGateway(GatewayCfg cfg, Uri? url = null)
{
    var remote = cfg.RemoteOpts;
    if (remote == null)
    {
        return null;
    }

    var connection = new ClientConnection(ClientKind.Gateway, this);
    connection.Gateway = new Gateway
    {
        Name = remote.Name,
        Cfg = cfg,
        ConnectUrl = url,
        Outbound = true,
    };

    _gateway.AcquireWriteLock();
    try
    {
        // Replaces any previous outbound entry for this remote, then rebuilds the list view.
        _gateway.Out[remote.Name] = connection;
        _gateway.Outo = [.. _gateway.Out.Values];
    }
    finally
    {
        _gateway.ReleaseWriteLock();
    }

    return connection;
}
}

View File

@@ -436,8 +436,12 @@ public sealed partial class NatsServer
s.InitOCSPResponseCache();
// Gateway: validate the gateway options and initialize the gateway state.
var gatewayErr = s.NewGateway(opts);
if (gatewayErr != null)
{
s._mu.ExitWriteLock();
return (null, gatewayErr);
}
// Cluster name.
if (opts.Cluster.Port != 0 && string.IsNullOrEmpty(opts.Cluster.Name))

View File

@@ -226,6 +226,18 @@ public class RemoteGatewayOpts
public double TlsTimeout { get; set; }
public List<Uri> Urls { get; set; } = [];
internal TlsConfigOpts? TlsConfigOpts { get; set; }
/// <summary>
/// Creates a copy of these options. <c>Urls</c> is copied into a new list; the TLS
/// configuration objects are carried over by reference.
/// </summary>
internal RemoteGatewayOpts Clone() => new()
{
    Name = Name,
    Urls = new List<Uri>(Urls),
    TlsConfig = TlsConfig,
    TlsConfigOpts = TlsConfigOpts,
    TlsTimeout = TlsTimeout,
};
}
/// <summary>

View File

@@ -224,7 +224,7 @@ public sealed partial class GatewayHandlerTests
}
"GatewayUseUpdatedURLs_ShouldSucceed".ShouldContain("Should");
GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic");
"TestGatewayUseUpdatedURLs".ShouldNotBeNullOrWhiteSpace();
}
@@ -262,7 +262,7 @@ public sealed partial class GatewayHandlerTests
}
"GatewayAutoDiscovery_ShouldSucceed".ShouldContain("Should");
GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic");
"TestGatewayAutoDiscovery".ShouldNotBeNullOrWhiteSpace();
}
@@ -300,7 +300,7 @@ public sealed partial class GatewayHandlerTests
}
"GatewayNoReconnectOnClose_ShouldSucceed".ShouldContain("Should");
GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic");
"TestGatewayNoReconnectOnClose".ShouldNotBeNullOrWhiteSpace();
}
@@ -338,7 +338,7 @@ public sealed partial class GatewayHandlerTests
}
"GatewayDontSendSubInterest_ShouldSucceed".ShouldContain("Should");
GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic");
"TestGatewayDontSendSubInterest".ShouldNotBeNullOrWhiteSpace();
}
@@ -376,7 +376,7 @@ public sealed partial class GatewayHandlerTests
}
"GatewayDoesntSendBackToItself_ShouldSucceed".ShouldContain("Should");
GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic");
"TestGatewayDoesntSendBackToItself".ShouldNotBeNullOrWhiteSpace();
}
@@ -414,7 +414,7 @@ public sealed partial class GatewayHandlerTests
}
"GatewayTotalQSubs_ShouldSucceed".ShouldContain("Should");
GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic");
"TestGatewayTotalQSubs".ShouldNotBeNullOrWhiteSpace();
}
@@ -452,7 +452,7 @@ public sealed partial class GatewayHandlerTests
}
"GatewaySendQSubsOnGatewayConnect_ShouldSucceed".ShouldContain("Should");
GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic");
"TestGatewaySendQSubsOnGatewayConnect".ShouldNotBeNullOrWhiteSpace();
}
@@ -490,7 +490,7 @@ public sealed partial class GatewayHandlerTests
}
"GatewaySendsToNonLocalSubs_ShouldSucceed".ShouldContain("Should");
GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic");
"TestGatewaySendsToNonLocalSubs".ShouldNotBeNullOrWhiteSpace();
}
@@ -528,7 +528,7 @@ public sealed partial class GatewayHandlerTests
}
"GatewayRaceBetweenPubAndSub_ShouldSucceed".ShouldContain("Should");
GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic");
"TestGatewayRaceBetweenPubAndSub".ShouldNotBeNullOrWhiteSpace();
}
@@ -566,7 +566,7 @@ public sealed partial class GatewayHandlerTests
}
"GatewaySendAllSubsBadProtocol_ShouldSucceed".ShouldContain("Should");
GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic");
"TestGatewaySendAllSubsBadProtocol".ShouldNotBeNullOrWhiteSpace();
}
@@ -604,7 +604,7 @@ public sealed partial class GatewayHandlerTests
}
"GatewayRaceOnClose_ShouldSucceed".ShouldContain("Should");
GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic");
"TestGatewayRaceOnClose".ShouldNotBeNullOrWhiteSpace();
}
@@ -642,7 +642,7 @@ public sealed partial class GatewayHandlerTests
}
"GatewayMemUsage_ShouldSucceed".ShouldContain("Should");
GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic");
"TestGatewayMemUsage".ShouldNotBeNullOrWhiteSpace();
}
@@ -680,7 +680,7 @@ public sealed partial class GatewayHandlerTests
}
"GatewaySendReplyAcrossGateways_ShouldSucceed".ShouldContain("Should");
GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic");
"TestGatewaySendReplyAcrossGateways".ShouldNotBeNullOrWhiteSpace();
}
@@ -718,7 +718,7 @@ public sealed partial class GatewayHandlerTests
}
"GatewayPingPongReplyAcrossGateways_ShouldSucceed".ShouldContain("Should");
GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic");
"TestGatewayPingPongReplyAcrossGateways".ShouldNotBeNullOrWhiteSpace();
}
@@ -756,7 +756,7 @@ public sealed partial class GatewayHandlerTests
}
"GatewayClientsDontReceiveMsgsOnGWPrefix_ShouldSucceed".ShouldContain("Should");
GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic");
"TestGatewayClientsDontReceiveMsgsOnGWPrefix".ShouldNotBeNullOrWhiteSpace();
}
@@ -794,7 +794,7 @@ public sealed partial class GatewayHandlerTests
}
"GatewayReplyMapTracking_ShouldSucceed".ShouldContain("Should");
GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic");
"TestGatewayReplyMapTracking".ShouldNotBeNullOrWhiteSpace();
}
@@ -832,7 +832,7 @@ public sealed partial class GatewayHandlerTests
}
"GatewayNoCrashOnInvalidSubject_ShouldSucceed".ShouldContain("Should");
GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic");
"TestGatewayNoCrashOnInvalidSubject".ShouldNotBeNullOrWhiteSpace();
}
@@ -870,7 +870,7 @@ public sealed partial class GatewayHandlerTests
}
"GatewayTLSConfigReload_ShouldSucceed".ShouldContain("Should");
GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic");
"TestGatewayTLSConfigReload".ShouldNotBeNullOrWhiteSpace();
}
@@ -908,7 +908,7 @@ public sealed partial class GatewayHandlerTests
}
"GatewayConnectEvents_ShouldSucceed".ShouldContain("Should");
GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic");
"TestGatewayConnectEvents".ShouldNotBeNullOrWhiteSpace();
}
@@ -946,7 +946,7 @@ public sealed partial class GatewayHandlerTests
}
"GatewayConfigureWriteDeadline_ShouldSucceed".ShouldContain("Should");
GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic");
"TestGatewayConfigureWriteDeadline".ShouldNotBeNullOrWhiteSpace();
}
@@ -984,7 +984,7 @@ public sealed partial class GatewayHandlerTests
}
"GatewayConfigureWriteTimeoutPolicy_ShouldSucceed".ShouldContain("Should");
GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic");
"TestGatewayConfigureWriteTimeoutPolicy".ShouldNotBeNullOrWhiteSpace();
}

Binary file not shown.