feat(batch25): merge gateways

This commit is contained in:
Joseph Doherty
2026-03-01 02:31:54 -05:00
23 changed files with 2288 additions and 26 deletions

View File

@@ -358,6 +358,12 @@ public sealed partial class Account : INatsAccount
/// </summary>
internal byte[]? ServiceImportReply { get; set; }
/// <summary>
/// Gateway reply mapping table used for routed reply restoration.
/// Mirrors Go <c>gwReplyMapping</c>.
/// </summary>
internal GwReplyMapping GwReplyMapping { get; } = new();
/// <summary>
/// Subscription ID counter for internal use.
/// Mirrors Go <c>isid uint64</c>.

View File

@@ -0,0 +1,224 @@
// Copyright 2018-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0
using System.Text;
using System.Text.Json;
using ZB.MOM.NatsNet.Server.Internal;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class ClientConnection
{
internal void SendMsgToGateways(Account? account, byte[] subject, byte[]? reply, byte[] payload)
{
if (Server is not NatsServer server || account == null || subject.Length == 0)
return;
var outboundGateways = server.GetOutboundGatewayConnections();
if (outboundGateways.Count == 0)
return;
foreach (var gateway in outboundGateways)
{
if (!GatewayInterest(account, Encoding.ASCII.GetString(subject)))
continue;
var replyToSend = reply;
if (reply is { Length: > 0 } && gateway.Gateway != null)
{
var shouldMap = server.ShouldMapReplyForGatewaySend(reply, gateway.Gateway.UseOldPrefix);
if (shouldMap)
replyToSend = reply;
}
var proto = MsgHeader(subject, replyToSend, new Internal.Subscription { Sid = Encoding.ASCII.GetBytes("1") });
gateway.EnqueueProto(proto);
gateway.EnqueueProto(payload);
gateway.EnqueueProto(Encoding.ASCII.GetBytes("\r\n"));
}
}
internal void SendAccountUnsubToGateway(byte[] accountName)
{
if (accountName.Length == 0)
return;
lock (_mu)
{
Gateway ??= new Gateway();
Gateway.InSim ??= new Dictionary<string, InSide>(StringComparer.Ordinal);
var key = Encoding.ASCII.GetString(accountName);
if (!Gateway.InSim.TryGetValue(key, out var entry) || entry != null)
{
Gateway.InSim[key] = null!;
var proto = Encoding.ASCII.GetBytes($"A- {key}\r\n");
EnqueueProto(proto);
if (Trace)
TraceOutOp(string.Empty, proto.AsSpan(0, proto.Length - 2).ToArray());
}
}
}
internal bool HandleGatewayReply(byte[] msg)
{
if (Server is not NatsServer server || ParseCtx.Pa.Subject is not { Length: > 0 } originalSubject)
return false;
var (isRoutedReply, isOldPrefix) = GatewayHandler.IsGWRoutedSubjectAndIsOldPrefix(originalSubject);
if (!isRoutedReply)
return false;
ParseCtx.Pa.Subject = GatewayHandler.GetSubjectFromGWRoutedReply(originalSubject, isOldPrefix);
ParseCtx.Pa.PaCache = [.. ParseCtx.Pa.Account ?? [], (byte)' ', .. ParseCtx.Pa.Subject];
var (account, result) = GetAccAndResultFromCache();
if (account is not Account concreteAccount)
{
Debugf("Unknown account {0} for gateway message on subject: {1}",
ParseCtx.Pa.Account is { Length: > 0 } accountName ? Encoding.ASCII.GetString(accountName) : string.Empty,
Encoding.ASCII.GetString(ParseCtx.Pa.Subject));
if (ParseCtx.Pa.Account is { Length: > 0 } gatewayAccount)
server.GatewayHandleAccountNoInterest(this, gatewayAccount);
return true;
}
if (result != null && (result.PSubs.Count + result.QSubs.Count) > 0)
ProcessMsgResults(concreteAccount, result, msg, null, ParseCtx.Pa.Subject, ParseCtx.Pa.Reply, PmrFlags.None);
if (!IsServiceReply(ParseCtx.Pa.Subject))
SendMsgToGateways(concreteAccount, ParseCtx.Pa.Subject, ParseCtx.Pa.Reply, msg);
return true;
}
internal void ProcessInboundGatewayMsg(byte[] msg)
{
_in.Msgs++;
_in.Bytes += Math.Max(0, msg.Length - 2);
if (Opts.Verbose)
SendOK();
if (Server is not NatsServer server || ParseCtx.Pa.Subject is not { Length: > 0 })
return;
if (HandleGatewayReply(msg))
return;
var (account, result) = GetAccAndResultFromCache();
if (account is not Account concreteAccount)
{
Debugf("Unknown account {0} for gateway message on subject: {1}",
ParseCtx.Pa.Account is { Length: > 0 } accountName ? Encoding.ASCII.GetString(accountName) : string.Empty,
Encoding.ASCII.GetString(ParseCtx.Pa.Subject));
if (ParseCtx.Pa.Account is { Length: > 0 } gatewayAccount)
server.GatewayHandleAccountNoInterest(this, gatewayAccount);
return;
}
var noInterest = result == null || result.PSubs.Count == 0;
if (noInterest)
{
server.GatewayHandleSubjectNoInterest(this, concreteAccount, ParseCtx.Pa.Account ?? Encoding.ASCII.GetBytes(concreteAccount.Name), ParseCtx.Pa.Subject);
if (ParseCtx.Pa.Queues is null || ParseCtx.Pa.Queues.Count == 0)
return;
}
ProcessMsgResults(concreteAccount, result, msg, null, ParseCtx.Pa.Subject, ParseCtx.Pa.Reply, PmrFlags.None);
}
internal void GatewayAllSubsReceiveStart(ServerInfo info)
{
var account = GatewayHandler.GetAccountFromGatewayCommand(this, info, "start");
if (string.IsNullOrWhiteSpace(account))
return;
Debugf("Gateway {0}: switching account {1} to {2} mode",
info.Gateway ?? string.Empty, account, GatewayInterestMode.InterestOnly.String());
Gateway ??= new Gateway();
Gateway.OutSim ??= new System.Collections.Concurrent.ConcurrentDictionary<string, OutSide>(StringComparer.Ordinal);
var outSide = Gateway.OutSim.GetOrAdd(account, _ => new OutSide
{
Sl = Internal.DataStructures.SubscriptionIndex.NewSublistWithCache(),
});
outSide.AcquireWriteLock();
try { outSide.Mode = GatewayInterestMode.Transitioning; }
finally { outSide.ReleaseWriteLock(); }
}
internal void GatewayAllSubsReceiveComplete(ServerInfo info)
{
var account = GatewayHandler.GetAccountFromGatewayCommand(this, info, "complete");
if (string.IsNullOrWhiteSpace(account))
return;
if (Gateway?.OutSim == null || !Gateway.OutSim.TryGetValue(account, out var outSide))
return;
outSide.AcquireWriteLock();
try
{
outSide.Ni = null;
outSide.Mode = GatewayInterestMode.InterestOnly;
}
finally
{
outSide.ReleaseWriteLock();
}
Debugf("Gateway {0}: switching account {1} to {2} mode complete",
info.Gateway ?? string.Empty, account, GatewayInterestMode.InterestOnly.String());
}
internal void GatewaySwitchAccountToSendAllSubs(InSide inSide, string accountName)
{
if (Server is not NatsServer server || string.IsNullOrWhiteSpace(accountName))
return;
inSide.Ni = null;
inSide.Mode = GatewayInterestMode.Transitioning;
var remoteGatewayName = Gateway?.Name ?? string.Empty;
Debugf("Gateway {0}: switching account {1} to {2} mode",
remoteGatewayName, accountName, GatewayInterestMode.InterestOnly.String());
void SendCommand(byte command, bool withLock)
{
var info = new ServerInfo
{
Gateway = server.GetGatewayName(),
GatewayCmd = command,
GatewayCmdPayload = Encoding.ASCII.GetBytes(accountName),
};
var infoProto = NatsServer.GenerateInfoJson(info);
if (withLock)
{
lock (_mu)
{
EnqueueProto(infoProto);
}
return;
}
EnqueueProto(infoProto);
}
SendCommand(GatewayHandler.GatewayCmdAllSubsStart, withLock: false);
_ = server.StartGoRoutine(() =>
{
server.SendAccountSubsToGateway(this, accountName);
SendCommand(GatewayHandler.GatewayCmdAllSubsComplete, withLock: true);
Debugf("Gateway {0}: switching account {1} to {2} mode complete",
remoteGatewayName, accountName, GatewayInterestMode.InterestOnly.String());
});
}
}

View File

@@ -0,0 +1,217 @@
// Copyright 2018-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0
using System.Text;
using System.Text.Json;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class ClientConnection
{
internal Exception? SendGatewayConnect()
{
if (Server is not NatsServer server)
return new InvalidOperationException("gateway server unavailable");
var opts = server.GetOpts();
var connect = new ConnectInfo
{
Echo = true,
Verbose = false,
Pedantic = false,
User = opts.Gateway.Username,
Pass = opts.Gateway.Password,
Tls = opts.Gateway.TlsConfig != null,
Name = server.ID(),
Headers = server.SupportsHeaders(),
Gateway = opts.Gateway.Name,
Dynamic = server.IsClusterNameDynamic(),
};
var payload = JsonSerializer.Serialize(connect);
EnqueueProto(Encoding.ASCII.GetBytes($"CONNECT {payload}\r\n"));
return null;
}
internal Exception? ProcessGatewayConnect(byte[] arg)
{
ConnectInfo? connect;
try
{
connect = JsonSerializer.Deserialize<ConnectInfo>(arg);
}
catch (Exception ex)
{
return ex;
}
if (connect == null)
return new InvalidOperationException("invalid gateway connect payload");
Gateway ??= new Gateway();
Gateway.Connected = true;
if (!string.IsNullOrWhiteSpace(connect.Gateway))
Gateway.Name = connect.Gateway;
if (!string.IsNullOrWhiteSpace(connect.Name))
Gateway.RemoteName = connect.Name;
Headers = connect.Headers;
return null;
}
internal Exception? ProcessGatewayInfo(byte[] arg)
{
ServerInfo? info;
try
{
info = JsonSerializer.Deserialize<ServerInfo>(arg);
}
catch (Exception ex)
{
return ex;
}
if (info == null)
return new InvalidOperationException("invalid gateway info payload");
return ProcessGatewayInfo(info);
}
internal Exception? ProcessGatewayInfo(ServerInfo info)
{
if (Server is not NatsServer server)
return new InvalidOperationException("gateway server unavailable");
Gateway ??= new Gateway();
if (!string.IsNullOrWhiteSpace(info.Gateway))
Gateway.Name = info.Gateway;
if (!string.IsNullOrWhiteSpace(info.GatewayUrl) && Uri.TryCreate(info.GatewayUrl, UriKind.Absolute, out var url))
Gateway.ConnectUrl = url;
Gateway.UseOldPrefix = info.GatewayNrp;
Gateway.InterestOnlyMode = info.GatewayIom;
if (info.GatewayUrls is { Length: > 0 })
server.ProcessImplicitGateway(Gateway.Name, info.GatewayUrls);
return null;
}
internal Exception? ProcessGatewayAccountUnsub(byte[] arg)
{
if (Server is not NatsServer server)
return new InvalidOperationException("gateway server unavailable");
var tokens = Encoding.ASCII.GetString(arg).Split(' ', StringSplitOptions.RemoveEmptyEntries);
if (tokens.Length < 1)
return null;
if (tokens.Length == 1)
return null;
var accountName = tokens[0];
var subject = Encoding.ASCII.GetBytes(tokens[1]);
var queue = tokens.Length > 2 ? Encoding.ASCII.GetBytes(tokens[2]) : null;
var (account, _) = server.LookupAccount(accountName);
if (account == null)
return null;
var sub = new Internal.Subscription { Subject = subject, Queue = queue };
server.SendQueueSubOrUnsubToGateways(account, sub, isUnsub: true);
return null;
}
internal Exception? ProcessGatewayAccountSub(byte[] arg)
{
if (Server is not NatsServer server)
return new InvalidOperationException("gateway server unavailable");
var tokens = Encoding.ASCII.GetString(arg).Split(' ', StringSplitOptions.RemoveEmptyEntries);
if (tokens.Length < 2)
return null;
var accountName = tokens[0];
var subject = Encoding.ASCII.GetBytes(tokens[1]);
var queue = tokens.Length > 2 ? Encoding.ASCII.GetBytes(tokens[2]) : null;
var (account, _) = server.LookupAccount(accountName);
if (account == null)
return null;
var sub = new Internal.Subscription { Subject = subject, Queue = queue };
if (queue is { Length: > 0 })
server.SendQueueSubOrUnsubToGateways(account, sub, isUnsub: false);
else
server.MaybeSendSubOrUnsubToGateways(account, sub, isUnsub: false);
return null;
}
internal Exception? ProcessGatewayRUnsub(byte[] arg)
{
var tokens = Encoding.ASCII.GetString(arg).Split(' ', StringSplitOptions.RemoveEmptyEntries);
if (tokens.Length < 2 || Server is not NatsServer server)
return null;
var accountName = tokens[0];
var subject = Encoding.ASCII.GetBytes(tokens[1]);
var queue = tokens.Length > 2 ? Encoding.ASCII.GetBytes(tokens[2]) : null;
var (account, _) = server.LookupAccount(accountName);
if (account == null)
return null;
var sub = new Internal.Subscription { Subject = subject, Queue = queue };
server.MaybeSendSubOrUnsubToGateways(account, sub, isUnsub: true);
return null;
}
internal Exception? ProcessGatewayRSub(byte[] arg)
{
var tokens = Encoding.ASCII.GetString(arg).Split(' ', StringSplitOptions.RemoveEmptyEntries);
if (tokens.Length < 2 || Server is not NatsServer server)
return null;
var accountName = tokens[0];
var subject = Encoding.ASCII.GetBytes(tokens[1]);
var queue = tokens.Length > 2 ? Encoding.ASCII.GetBytes(tokens[2]) : null;
var (account, _) = server.LookupAccount(accountName);
if (account == null)
return null;
var sub = new Internal.Subscription { Subject = subject, Queue = queue };
server.MaybeSendSubOrUnsubToGateways(account, sub, isUnsub: false);
return null;
}
internal bool GatewayInterest(Account? account, string subject)
{
if (account == null || string.IsNullOrWhiteSpace(subject))
return false;
if (Gateway?.OutSim == null)
return true;
if (!Gateway.OutSim.TryGetValue(account.Name, out var outSi))
return true;
outSi.AcquireReadLock();
try
{
if (outSi.Mode == GatewayInterestMode.InterestOnly && outSi.Sl != null)
{
var match = outSi.Sl.Match(subject);
return match.PSubs.Count > 0 || match.QSubs.Count > 0;
}
return outSi.Ni == null || !outSi.Ni.Contains(subject);
}
finally
{
outSi.ReleaseReadLock();
}
}
}

View File

@@ -114,6 +114,7 @@ public sealed partial class ClientConnection
// Client options (from CONNECT message).
internal ClientOptions Opts = ClientOptions.Default;
internal Route? Route;
internal Gateway? Gateway;
internal WebsocketConnection? Ws;
// Flags and state.
@@ -134,6 +135,7 @@ public sealed partial class ClientConnection
internal Dictionary<string, RespEntry>? Replies;
internal Dictionary<ClientConnection, bool>? Pcd; // pending clients with data to flush
internal Dictionary<string, bool>? DArray; // denied subscribe patterns
internal GwReplyMapping GwReplyMapping = new();
// Outbound state (simplified — full write loop ported when Server is available).
internal long OutPb; // pending bytes

View File

@@ -0,0 +1,142 @@
// Copyright 2018-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0
using System.Security.Cryptography;
using System.Text;
using ZB.MOM.NatsNet.Server.Internal;
namespace ZB.MOM.NatsNet.Server;
internal static class GatewayHandler
{
internal static readonly byte[] GwReplyPrefix = Encoding.ASCII.GetBytes("_GR_.");
internal static readonly byte[] OldGwReplyPrefix = Encoding.ASCII.GetBytes("$GR.");
internal const int GwHashLen = 6;
internal const int GwClusterOffset = 5;
internal const int GwServerOffset = GwClusterOffset + GwHashLen + 1;
internal const int GwSubjectOffset = GwServerOffset + GwHashLen + 1;
internal const int OldGwReplyPrefixLen = 4;
internal const int OldGwReplyStart = OldGwReplyPrefixLen + 5;
internal const int GatewayCmdAllSubsStart = 2;
internal const int GatewayCmdAllSubsComplete = 3;
internal const int GatewayMaxRUnsubBeforeSwitch = 1000;
private static readonly TimeSpan DefaultSolicitGatewaysDelay = TimeSpan.FromSeconds(1);
private static long _gatewaySolicitDelayTicks = DefaultSolicitGatewaysDelay.Ticks;
private static int _doNotForceInterestOnlyMode;
internal static void SetGatewaysSolicitDelay(TimeSpan delay)
{
Interlocked.Exchange(ref _gatewaySolicitDelayTicks, delay.Ticks);
}
internal static void ResetGatewaysSolicitDelay()
{
Interlocked.Exchange(ref _gatewaySolicitDelayTicks, DefaultSolicitGatewaysDelay.Ticks);
}
internal static TimeSpan GetGatewaysSolicitDelay()
{
return TimeSpan.FromTicks(Interlocked.Read(ref _gatewaySolicitDelayTicks));
}
internal static void GatewayDoNotForceInterestOnlyMode(bool doNotForce)
{
Interlocked.Exchange(ref _doNotForceInterestOnlyMode, doNotForce ? 1 : 0);
}
internal static bool DoNotForceInterestOnlyMode()
{
return Interlocked.CompareExchange(ref _doNotForceInterestOnlyMode, 0, 0) != 0;
}
internal static Exception? ValidateGatewayOptions(ServerOptions options)
{
var gateway = options.Gateway;
if (string.IsNullOrWhiteSpace(gateway.Name) || gateway.Port == 0)
return null;
if (gateway.Name.Contains(' '))
return ServerErrors.ErrGatewayNameHasSpaces;
var names = new HashSet<string>(StringComparer.Ordinal);
foreach (var remote in gateway.Gateways)
{
if (string.IsNullOrWhiteSpace(remote.Name))
return new InvalidOperationException("gateway remote requires a name");
if (!names.Add(remote.Name))
return new InvalidOperationException($"duplicate gateway remote: {remote.Name}");
if (remote.Urls.Count == 0)
return new InvalidOperationException($"gateway remote {remote.Name} has no URLs");
}
return null;
}
internal static byte[] GetGWHash(string value) => GetHash(value, 6);
internal static byte[] GetOldHash(string value) => GetHash(value, 4);
internal static byte[] GwBuildSubProto(string accountName, Subscription subscription, bool isUnsub = false)
{
var op = isUnsub ? "RS-" : "RS+";
var subject = Encoding.ASCII.GetString(subscription.Subject);
if (subscription.Queue is { Length: > 0 } queue)
return Encoding.ASCII.GetBytes($"{op} {accountName} {subject} {Encoding.ASCII.GetString(queue)}\r\n");
return Encoding.ASCII.GetBytes($"{op} {accountName} {subject}\r\n");
}
internal static bool IsGWRoutedReply(byte[] subject)
{
return subject.AsSpan().StartsWith(GwReplyPrefix) || subject.AsSpan().StartsWith(OldGwReplyPrefix);
}
internal static (bool IsRouted, bool IsOldPrefix) IsGWRoutedSubjectAndIsOldPrefix(byte[] subject)
{
if (subject.AsSpan().StartsWith(OldGwReplyPrefix))
return (true, true);
if (subject.AsSpan().StartsWith(GwReplyPrefix))
return (true, false);
return (false, false);
}
internal static bool HasGWRoutedReplyPrefix(byte[] subject)
{
return IsGWRoutedReply(subject);
}
internal static byte[] GetSubjectFromGWRoutedReply(byte[] reply, bool isOldPrefix)
{
if (isOldPrefix)
return reply.Length > OldGwReplyStart ? reply[OldGwReplyStart..] : [];
return reply.Length > GwSubjectOffset ? reply[GwSubjectOffset..] : [];
}
internal static string GetAccountFromGatewayCommand(ClientConnection connection, ServerInfo info, string command)
{
if (info.GatewayCmdPayload == null || info.GatewayCmdPayload.Length == 0)
{
connection.SendErrAndErr($"Account absent from receive-all-subscriptions-{command} command");
connection.CloseConnection(ClosedState.ProtocolViolation);
return string.Empty;
}
return Encoding.ASCII.GetString(info.GatewayCmdPayload);
}
private static byte[] GetHash(string value, int len)
{
var bytes = Encoding.UTF8.GetBytes(value);
var hash = SHA256.HashData(bytes);
var encoded = Convert.ToBase64String(hash)
.Replace('+', '-')
.Replace('/', '_')
.TrimEnd('=');
return Encoding.ASCII.GetBytes(encoded[..len]);
}
}

View File

@@ -13,7 +13,9 @@
//
// Adapted from server/gateway.go in the NATS server Go source.
using System.Text;
using System.Threading;
using System.Linq;
using ZB.MOM.NatsNet.Server.Internal;
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
@@ -54,6 +56,20 @@ public enum GatewayInterestMode : byte
CacheFlushed = 3,
}
internal static class GatewayInterestModeExtensions
{
internal static string String(this GatewayInterestMode mode)
{
return mode switch
{
GatewayInterestMode.Optimistic => "Optimistic",
GatewayInterestMode.InterestOnly => "Interest-Only",
GatewayInterestMode.Transitioning => "Transitioning",
_ => "Unknown",
};
}
}
/// <summary>
/// Server-level gateway state kept on the <see cref="NatsServer"/> instance.
/// Replaces the stub that was in <c>NatsServerTypes.cs</c>.
@@ -130,6 +146,71 @@ internal sealed class SrvGateway
public Lock PasiLock => _pasiLock;
internal byte[] GenerateInfoJSON()
{
Info ??= new ServerInfo();
Info.Gateway = Name;
Info.GatewayUrl = Url;
Info.GatewayUrls = [.. Urls.GetAsStringSlice()];
InfoJson = NatsServer.GenerateInfoJson(Info);
return InfoJson;
}
internal bool HasInbound(string gatewayName)
{
if (string.IsNullOrWhiteSpace(gatewayName))
return false;
_lock.EnterReadLock();
try
{
foreach (var inbound in In.Values)
{
if (string.Equals(inbound.Gateway?.Name, gatewayName, StringComparison.Ordinal))
return true;
}
}
finally
{
_lock.ExitReadLock();
}
return false;
}
internal void UpdateRemotesTLSConfig(IReadOnlyList<RemoteGatewayOpts> remotes)
{
if (remotes.Count == 0)
return;
_lock.EnterWriteLock();
try
{
foreach (var remote in remotes)
{
if (!Remotes.TryGetValue(remote.Name, out var cfg))
continue;
cfg.AcquireWriteLock();
try
{
cfg.RemoteOpts ??= remote.Clone();
cfg.RemoteOpts.TlsConfig = remote.TlsConfig;
cfg.RemoteOpts.TlsTimeout = remote.TlsTimeout;
cfg.RemoteOpts.TlsConfigOpts = remote.TlsConfigOpts;
}
finally
{
cfg.ReleaseWriteLock();
}
}
}
finally
{
_lock.ExitWriteLock();
}
}
// -------------------------------------------------------------------------
// Recent subscription tracking (thread-safe map)
// -------------------------------------------------------------------------
@@ -176,6 +257,46 @@ internal sealed class SrvGateway
public void ReleaseReadLock() => _lock.ExitReadLock();
public void AcquireWriteLock() => _lock.EnterWriteLock();
public void ReleaseWriteLock() => _lock.ExitWriteLock();
internal void OrderOutboundConnectionsLocked()
{
Outo = [.. Out.Values.OrderBy(c => c.GetRttValue()).ThenBy(c => c.Cid)];
}
internal void OrderOutboundConnections()
{
_lock.EnterWriteLock();
try { OrderOutboundConnectionsLocked(); }
finally { _lock.ExitWriteLock(); }
}
internal bool ShouldMapReplyForGatewaySend(byte[] reply, bool useOldPrefix)
{
if (reply.Length == 0)
return false;
if (useOldPrefix)
return !GatewayHandler.IsGWRoutedSubjectAndIsOldPrefix(reply).IsRouted;
return !GatewayHandler.IsGWRoutedReply(reply);
}
internal byte[] GetClusterHash()
{
_lock.EnterReadLock();
try
{
if (ReplyPfx.Length < GatewayHandler.GwClusterOffset + GatewayHandler.GwHashLen)
return [];
return ReplyPfx[GatewayHandler.GwClusterOffset..(GatewayHandler.GwClusterOffset + GatewayHandler.GwHashLen)];
}
finally
{
_lock.ExitReadLock();
}
}
}
/// <summary>
@@ -219,6 +340,7 @@ internal sealed class GatewayCfg
/// <summary>TLS server name override for SNI.</summary>
public string TlsName { get; set; } = string.Empty;
/// <summary>TLS server name override for SNI.</summary>
/// <summary>True if this remote was discovered via gossip (not configured).</summary>
public bool Implicit { get; set; }
@@ -228,6 +350,81 @@ internal sealed class GatewayCfg
// Forwarded properties from RemoteGatewayOpts
public string Name { get => RemoteOpts?.Name ?? string.Empty; }
internal void BumpConnAttempts()
{
_lock.EnterWriteLock();
try { ConnAttempts++; }
finally { _lock.ExitWriteLock(); }
}
internal int GetConnAttempts()
{
_lock.EnterReadLock();
try { return ConnAttempts; }
finally { _lock.ExitReadLock(); }
}
internal void ResetConnAttempts()
{
_lock.EnterWriteLock();
try { ConnAttempts = 0; }
finally { _lock.ExitWriteLock(); }
}
internal bool IsImplicit()
{
_lock.EnterReadLock();
try { return Implicit; }
finally { _lock.ExitReadLock(); }
}
internal IReadOnlyCollection<Uri> GetUrls()
{
_lock.EnterReadLock();
try { return [.. Urls.Values]; }
finally { _lock.ExitReadLock(); }
}
internal string[] GetUrlsAsStrings()
{
_lock.EnterReadLock();
try { return [.. Urls.Keys]; }
finally { _lock.ExitReadLock(); }
}
internal void UpdateUrls(IEnumerable<Uri> urls)
{
_lock.EnterWriteLock();
try
{
Urls.Clear();
foreach (var url in urls)
Urls[url.ToString()] = url;
}
finally { _lock.ExitWriteLock(); }
}
internal void SaveTLSHostname(Uri url)
{
if (string.IsNullOrWhiteSpace(url.Host))
return;
_lock.EnterWriteLock();
try { TlsName = url.Host; }
finally { _lock.ExitWriteLock(); }
}
internal void AddUrls(IEnumerable<Uri> urls)
{
_lock.EnterWriteLock();
try
{
foreach (var url in urls)
Urls[url.ToString()] = url;
}
finally { _lock.ExitWriteLock(); }
}
// -------------------------------------------------------------------------
// Lock helpers
// -------------------------------------------------------------------------
@@ -378,7 +575,24 @@ internal sealed class GwReplyMapping
/// </summary>
public (byte[] Subject, bool Found) Get(byte[] subject)
{
// TODO: session 16 — implement mapping lookup
return (subject, false);
var key = Encoding.UTF8.GetString(subject);
if (!Mapping.TryGetValue(key, out var entry))
return (subject, false);
if (entry.Exp <= DateTime.UtcNow.Ticks)
{
Mapping.Remove(key);
return (subject, false);
}
return (Encoding.UTF8.GetBytes(entry.Ms), true);
}
}
internal static class RemoteGatewayOptsExtensions
{
internal static RemoteGatewayOpts Clone(this RemoteGatewayOpts source)
{
return source.Clone();
}
}

View File

@@ -0,0 +1,230 @@
// Copyright 2018-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using ZB.MOM.NatsNet.Server.Internal;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class NatsServer
{
internal Exception? NewGateway(ServerOptions options)
{
var validationErr = GatewayHandler.ValidateGatewayOptions(options);
if (validationErr != null)
return validationErr;
if (string.IsNullOrWhiteSpace(options.Gateway.Name) || options.Gateway.Port == 0)
return null;
_gateway.AcquireWriteLock();
try
{
_gateway.Enabled = true;
_gateway.Name = options.Gateway.Name;
_gateway.RejectUnknown = options.Gateway.RejectUnknown;
_gateway.Remotes.Clear();
_gateway.Out.Clear();
_gateway.Outo.Clear();
_gateway.In.Clear();
_gateway.OwnCfgUrls = [];
foreach (var remote in options.Gateway.Gateways)
{
var cfg = new GatewayCfg
{
RemoteOpts = remote.Clone(),
Hash = GatewayHandler.GetGWHash(remote.Name),
OldHash = GatewayHandler.GetOldHash(remote.Name),
Urls = remote.Urls.ToDictionary(u => u.ToString(), u => u, StringComparer.Ordinal),
};
_gateway.Remotes[remote.Name] = cfg;
foreach (var url in remote.Urls)
_gateway.OwnCfgUrls.Add(url.ToString());
}
var info = CopyInfo();
info.Gateway = options.Gateway.Name;
info.GatewayUrl = _gateway.Url;
info.GatewayUrls = [.. _gateway.Urls.GetAsStringSlice()];
_gateway.Info = info;
_gateway.InfoJson = GenerateInfoJson(info);
_gateway.SIdHash = GatewayHandler.GetGWHash(_info.Id);
_gateway.OldHash = GatewayHandler.GetOldHash(_info.Id);
var clusterHash = GatewayHandler.GetGWHash(options.Gateway.Name);
_gateway.ReplyPfx = Encoding.ASCII.GetBytes($"_GR_.{Encoding.ASCII.GetString(clusterHash)}.{Encoding.ASCII.GetString(_gateway.SIdHash)}.");
_gateway.OldReplyPfx = Encoding.ASCII.GetBytes($"$GR.{Encoding.ASCII.GetString(_gateway.OldHash)}.");
}
finally
{
_gateway.ReleaseWriteLock();
}
return null;
}
internal Exception? StartGateways()
{
if (!_gateway.Enabled)
return null;
var hostPortErr = SetGatewayInfoHostPort();
if (hostPortErr != null)
return hostPortErr;
SolicitGateways();
return StartGatewayAcceptLoop();
}
internal Exception? StartGatewayAcceptLoop()
{
if (_gatewayListener == null)
{
var opts = GetOpts();
var hp = $"{opts.Gateway.Host}:{opts.Gateway.Port}";
_mu.EnterWriteLock();
try
{
var parts = hp.Split(':', 2);
var host = parts[0];
var port = parts.Length > 1 ? int.Parse(parts[1]) : 0;
var addr = string.IsNullOrWhiteSpace(host) || host == "0.0.0.0"
? IPAddress.Any
: (host == "::" ? IPAddress.IPv6Any : IPAddress.Parse(host));
_gatewayListener = new TcpListener(addr, port);
_gatewayListener.Start();
}
catch (Exception ex)
{
_gatewayListenerErr = ex;
return ex;
}
finally
{
_mu.ExitWriteLock();
}
}
if (!StartGoRoutine(() => Noticef("Gateway accept loop started")))
return new InvalidOperationException("unable to start gateway accept loop");
return null;
}
internal Exception? SetGatewayInfoHostPort()
{
var opts = GetOpts();
var host = opts.Gateway.Host;
var port = opts.Gateway.Port;
if (!string.IsNullOrWhiteSpace(opts.Gateway.Advertise))
{
var (advHost, advPort, advErr) = Internal.ServerUtilities.ParseHostPort(opts.Gateway.Advertise, port);
if (advErr != null)
return advErr;
host = advHost;
port = advPort;
}
var scheme = opts.Gateway.TlsConfig != null ? "tls" : "nats";
var url = $"{scheme}://{host}:{port}";
_gateway.AcquireWriteLock();
try
{
_gateway.Url = url;
_gateway.Info ??= CopyInfo();
_gateway.Info.Gateway = opts.Gateway.Name;
_gateway.Info.GatewayUrl = url;
_gateway.Info.GatewayUrls = [.. _gateway.Urls.GetAsStringSlice()];
_gateway.InfoJson = GenerateInfoJson(_gateway.Info);
}
finally
{
_gateway.ReleaseWriteLock();
}
return null;
}
internal void SolicitGateways()
{
if (!_gateway.Enabled)
return;
var delay = GatewayHandler.GetGatewaysSolicitDelay();
if (delay > TimeSpan.Zero)
Thread.Sleep(delay);
List<GatewayCfg> remotes;
_gateway.AcquireReadLock();
try
{
remotes = [.. _gateway.Remotes.Values];
}
finally
{
_gateway.ReleaseReadLock();
}
foreach (var cfg in remotes)
SolicitGateway(cfg, firstConnect: true);
}
internal void ReconnectGateway(GatewayCfg cfg)
{
SolicitGateway(cfg, firstConnect: false);
}
internal void SolicitGateway(GatewayCfg cfg, bool firstConnect)
{
_ = firstConnect;
if (cfg.RemoteOpts == null || cfg.RemoteOpts.Urls.Count == 0)
return;
if (_gateway.HasInbound(cfg.RemoteOpts.Name))
return;
CreateGateway(cfg, cfg.RemoteOpts.Urls[0]);
}
internal ClientConnection? CreateGateway(GatewayCfg cfg, Uri? url = null)
{
if (cfg.RemoteOpts == null)
return null;
var connection = new ClientConnection(ClientKind.Gateway, this)
{
Gateway = new Gateway
{
Name = cfg.RemoteOpts.Name,
Cfg = cfg,
ConnectUrl = url,
Outbound = true,
},
};
_gateway.AcquireWriteLock();
try
{
_gateway.Out[cfg.RemoteOpts.Name] = connection;
_gateway.Outo = [.. _gateway.Out.Values];
}
finally
{
_gateway.ReleaseWriteLock();
}
return connection;
}
}

View File

@@ -0,0 +1,296 @@
// Copyright 2018-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0
using System.Text;
using System.Linq;
using ZB.MOM.NatsNet.Server.Internal;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class NatsServer
{
internal void GossipGatewaysToInboundGateway(ClientConnection inboundGateway)
{
if (_gateway.Info == null)
return;
var info = _gateway.Info.ShallowClone();
info.GatewayCmd = 1;
info.GatewayUrls = [.. _gateway.Urls.GetAsStringSlice()];
inboundGateway.EnqueueProto(GenerateInfoJson(info));
}
internal void ForwardNewGatewayToLocalCluster(string gatewayName, IReadOnlyCollection<string> gatewayUrls)
{
if (gatewayUrls.Count == 0)
return;
var info = _routeInfo.ShallowClone();
info.Gateway = gatewayName;
info.GatewayUrls = [.. gatewayUrls];
var proto = GenerateInfoJson(info);
_mu.EnterReadLock();
try
{
ForEachRoute(route => route.EnqueueProto(proto));
foreach (var inbound in _gateway.In.Values)
inbound.EnqueueProto(proto);
}
finally
{
_mu.ExitReadLock();
}
}
internal void SendQueueSubsToGateway(ClientConnection gateway, string accountName)
{
SendAccountSubsToGateway(gateway, accountName, sendQueueOnly: true);
}
internal void SendAccountSubsToGateway(ClientConnection gateway, string accountName, bool sendQueueOnly = false)
{
var (account, _) = LookupAccount(accountName);
if (account == null)
return;
SendSubsToGateway(gateway, account, sendQueueOnly);
}
internal static byte[] GwBuildSubProto(string accountName, Subscription subscription, bool isUnsub = false)
{
var op = isUnsub ? "RS-" : "RS+";
var subject = Encoding.ASCII.GetString(subscription.Subject);
if (subscription.Queue is { Length: > 0 } queue)
return Encoding.ASCII.GetBytes($"{op} {accountName} {subject} {Encoding.ASCII.GetString(queue)}\r\n");
return Encoding.ASCII.GetBytes($"{op} {accountName} {subject}\r\n");
}
internal void SendSubsToGateway(ClientConnection gateway, Account account, bool sendQueueOnly)
{
if (account.Sublist == null)
return;
var subs = new List<Subscription>();
account.Sublist.All(subs);
foreach (var sub in subs)
{
if (sendQueueOnly && sub.Queue is not { Length: > 0 })
continue;
gateway.EnqueueProto(GwBuildSubProto(account.Name, sub));
}
}
internal void ProcessGatewayInfoFromRoute(string gatewayName, IReadOnlyCollection<string> gatewayUrls)
{
ProcessImplicitGateway(gatewayName, gatewayUrls);
}
internal void SendGatewayConfigsToRoute(ClientConnection route)
{
_gateway.AcquireReadLock();
try
{
foreach (var (name, cfg) in _gateway.Remotes)
{
var info = _routeInfo.ShallowClone();
info.Gateway = name;
info.GatewayUrls = cfg.GetUrlsAsStrings();
route.EnqueueProto(GenerateInfoJson(info));
}
}
finally
{
_gateway.ReleaseReadLock();
}
}
internal void ProcessImplicitGateway(string gatewayName, IReadOnlyCollection<string> gatewayUrls)
{
if (string.IsNullOrWhiteSpace(gatewayName) || gatewayUrls.Count == 0)
return;
_gateway.AcquireWriteLock();
try
{
if (!_gateway.Remotes.TryGetValue(gatewayName, out var cfg))
{
cfg = new GatewayCfg
{
RemoteOpts = new RemoteGatewayOpts { Name = gatewayName },
Hash = GatewayHandler.GetGWHash(gatewayName),
OldHash = GatewayHandler.GetOldHash(gatewayName),
Implicit = true,
Urls = new Dictionary<string, Uri>(StringComparer.Ordinal),
};
_gateway.Remotes[gatewayName] = cfg;
}
var urls = gatewayUrls
.Select(url => Uri.TryCreate(url, UriKind.Absolute, out var uri) ? uri : null)
.Where(uri => uri != null)
.Cast<Uri>()
.ToList();
cfg.AddUrls(urls);
cfg.VarzUpdateUrls = true;
}
finally
{
_gateway.ReleaseWriteLock();
}
}
internal void AddGatewayURL(string gatewayName, string gatewayUrl)
{
if (string.IsNullOrWhiteSpace(gatewayUrl))
return;
_gateway.AcquireWriteLock();
try
{
_gateway.Urls.AddUrl(gatewayUrl);
if (_gateway.Remotes.TryGetValue(gatewayName, out var cfg) && Uri.TryCreate(gatewayUrl, UriKind.Absolute, out var url))
cfg.AddUrls([url]);
}
finally
{
_gateway.ReleaseWriteLock();
}
}
internal void RemoveGatewayURL(string gatewayName, string gatewayUrl)
{
if (string.IsNullOrWhiteSpace(gatewayUrl))
return;
_gateway.AcquireWriteLock();
try
{
_gateway.Urls.RemoveUrl(gatewayUrl);
if (_gateway.Remotes.TryGetValue(gatewayName, out var cfg))
{
cfg.AcquireWriteLock();
try
{
cfg.Urls.Remove(gatewayUrl);
cfg.VarzUpdateUrls = true;
}
finally
{
cfg.ReleaseWriteLock();
}
}
}
finally
{
_gateway.ReleaseWriteLock();
}
}
internal void SendAsyncGatewayInfo()
{
byte[] infoProto;
_gateway.AcquireReadLock();
try
{
infoProto = _gateway.InfoJson is { Length: > 0 } proto
? [.. proto]
: _gateway.GenerateInfoJSON();
}
finally
{
_gateway.ReleaseReadLock();
}
_mu.EnterReadLock();
try
{
foreach (var route in _routes.Values.SelectMany(v => v))
route.EnqueueProto(infoProto);
foreach (var inbound in _gateway.In.Values)
inbound.EnqueueProto(infoProto);
}
finally
{
_mu.ExitReadLock();
}
}
internal string GetGatewayURL()
{
_gateway.AcquireReadLock();
try { return _gateway.Url; }
finally { _gateway.ReleaseReadLock(); }
}
internal string GetGatewayName()
{
_gateway.AcquireReadLock();
try { return _gateway.Name; }
finally { _gateway.ReleaseReadLock(); }
}
internal void RegisterInboundGatewayConnection(ClientConnection connection)
{
_gateway.AcquireWriteLock();
try { _gateway.In[connection.Cid] = connection; }
finally { _gateway.ReleaseWriteLock(); }
}
internal void RegisterOutboundGatewayConnection(string gatewayName, ClientConnection connection)
{
_gateway.AcquireWriteLock();
try
{
_gateway.Out[gatewayName] = connection;
_gateway.OrderOutboundConnectionsLocked();
}
finally
{
_gateway.ReleaseWriteLock();
}
}
internal ClientConnection? GetOutboundGatewayConnection(string gatewayName)
{
_gateway.AcquireReadLock();
try { return _gateway.Out.GetValueOrDefault(gatewayName); }
finally { _gateway.ReleaseReadLock(); }
}
internal IReadOnlyList<ClientConnection> GetOutboundGatewayConnections()
{
_gateway.AcquireReadLock();
try { return [.. _gateway.Outo]; }
finally { _gateway.ReleaseReadLock(); }
}
public int NumOutboundGateways() => NumOutboundGatewaysInternal();
internal int NumOutboundGatewaysInternal()
{
_gateway.AcquireReadLock();
try { return _gateway.Out.Count; }
finally { _gateway.ReleaseReadLock(); }
}
internal int NumInboundGateways()
{
_gateway.AcquireReadLock();
try { return _gateway.In.Count; }
finally { _gateway.ReleaseReadLock(); }
}
internal GatewayCfg? GetRemoteGateway(string gatewayName)
{
_gateway.AcquireReadLock();
try { return _gateway.Remotes.GetValueOrDefault(gatewayName); }
finally { _gateway.ReleaseReadLock(); }
}
}

View File

@@ -0,0 +1,145 @@
// Copyright 2018-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0
using System.Net;
using System.Text;
using ZB.MOM.NatsNet.Server.Internal;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class NatsServer
{
internal bool ShouldMapReplyForGatewaySend(byte[] reply, bool useOldPrefix)
{
return _gateway.ShouldMapReplyForGatewaySend(reply, useOldPrefix);
}
internal IReadOnlyList<ClientConnection> GetInboundGatewayConnections()
{
_gateway.AcquireReadLock();
try { return [.. _gateway.In.Values]; }
finally { _gateway.ReleaseReadLock(); }
}
internal string GatewayAddr()
{
var opts = GetOpts();
var host = string.IsNullOrWhiteSpace(opts.Gateway.Host) ? opts.Host : opts.Gateway.Host;
return $"{host}:{opts.Gateway.Port}";
}
internal void SwitchAccountToInterestMode(string accountName)
{
_gateway.AcquireWriteLock();
try
{
foreach (var outbound in _gateway.Out.Values)
{
outbound.Gateway ??= new Gateway();
outbound.Gateway.OutSim ??= new System.Collections.Concurrent.ConcurrentDictionary<string, OutSide>(StringComparer.Ordinal);
outbound.Gateway.OutSim[accountName] = new OutSide
{
Mode = GatewayInterestMode.InterestOnly,
Ni = null,
};
}
}
finally
{
_gateway.ReleaseWriteLock();
}
}
internal void MaybeSendSubOrUnsubToGateways(Account account, Subscription sub, bool isUnsub)
{
if (sub.Subject.Length == 0)
return;
_gateway.AcquireReadLock();
try
{
foreach (var gateway in _gateway.Out.Values)
gateway.EnqueueProto(GatewayHandler.GwBuildSubProto(account.Name, sub, isUnsub));
}
finally
{
_gateway.ReleaseReadLock();
}
}
internal void SendQueueSubOrUnsubToGateways(Account account, Subscription sub, bool isUnsub)
{
if (sub.Queue is not { Length: > 0 })
return;
MaybeSendSubOrUnsubToGateways(account, sub, isUnsub);
}
internal void GatewayHandleAccountNoInterest(ClientConnection connection, byte[] accountName)
{
lock (_gateway.PasiLock)
{
var key = Encoding.ASCII.GetString(accountName);
if (_gateway.Pasi.TryGetValue(key, out var accountInterest) && accountInterest.Count > 0)
return;
connection.SendAccountUnsubToGateway(accountName);
}
}
internal void GatewayHandleSubjectNoInterest(ClientConnection connection, Account account, byte[] accountName, byte[] subject)
{
lock (_gateway.PasiLock)
{
var hasSubs = (account.Sublist?.Count() ?? 0) > 0 || account.ServiceImportReply != null;
if (!hasSubs)
{
connection.SendAccountUnsubToGateway(Encoding.ASCII.GetBytes(account.Name));
return;
}
var sendProto = false;
lock (connection)
{
connection.Gateway ??= new Gateway();
connection.Gateway.InSim ??= new Dictionary<string, InSide>(StringComparer.Ordinal);
var accountKey = Encoding.ASCII.GetString(accountName);
if (!connection.Gateway.InSim.TryGetValue(accountKey, out var inSide) || inSide == null)
{
inSide = new InSide
{
Ni = new HashSet<string>(StringComparer.Ordinal),
};
inSide.Ni.Add(Encoding.ASCII.GetString(subject));
connection.Gateway.InSim[accountKey] = inSide;
sendProto = true;
}
else if (inSide.Ni != null)
{
var subjectKey = Encoding.ASCII.GetString(subject);
if (!inSide.Ni.Contains(subjectKey))
{
if (inSide.Ni.Count >= GatewayHandler.GatewayMaxRUnsubBeforeSwitch)
{
connection.GatewaySwitchAccountToSendAllSubs(inSide, accountKey);
}
else
{
inSide.Ni.Add(subjectKey);
sendProto = true;
}
}
}
}
if (!sendProto)
return;
var protocol = Encoding.ASCII.GetBytes($"RS- {Encoding.ASCII.GetString(accountName)} {Encoding.ASCII.GetString(subject)}\r\n");
connection.EnqueueProto(protocol);
if (connection.Trace)
connection.TraceOutOp(string.Empty, protocol.AsSpan(0, protocol.Length - 2).ToArray());
}
}
}

View File

@@ -0,0 +1,177 @@
// Copyright 2018-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0
using System.Collections.Concurrent;
using System.Text;
using System.Threading.Channels;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class NatsServer
{
private readonly ConcurrentDictionary<GwReplyMapping, object> _gwReplyMappings = new();
private readonly Channel<TimeSpan> _gwReplyMapTtlUpdates = Channel.CreateBounded<TimeSpan>(1);
private int _gwReplyMapWorkerRunning;
internal void StoreRouteByHash(string serverIdHash, ClientConnection route)
{
if (!_gateway.Enabled || string.IsNullOrWhiteSpace(serverIdHash))
return;
_gateway.RoutesIdByHash[serverIdHash] = route;
}
internal void RemoveRouteByHash(string serverIdHash)
{
if (!_gateway.Enabled || string.IsNullOrWhiteSpace(serverIdHash))
return;
_gateway.RoutesIdByHash.TryRemove(serverIdHash, out _);
}
internal (ClientConnection? Route, bool PerAccount) GetRouteByHash(byte[] hash, byte[] accountName)
{
if (hash.Length == 0)
return (null, false);
var id = Encoding.ASCII.GetString(hash);
var perAccount = false;
var accountKey = Encoding.ASCII.GetString(accountName);
if (_accRouteByHash.TryGetValue(accountKey, out var accountRouteEntry))
{
if (accountRouteEntry == null)
{
id += accountKey;
perAccount = true;
}
else if (accountRouteEntry is int routeIndex)
{
id += routeIndex.ToString();
}
}
if (_gateway.RoutesIdByHash.TryGetValue(id, out var route))
return (route, perAccount);
if (!perAccount && _gateway.RoutesIdByHash.TryGetValue($"{Encoding.ASCII.GetString(hash)}0", out var noPoolRoute))
{
lock (noPoolRoute)
{
if (noPoolRoute.Route?.NoPool == true)
return (noPoolRoute, false);
}
}
return (null, perAccount);
}
internal void TrackGWReply(ClientConnection? client, Account? account, byte[] reply, byte[] routedReply)
{
GwReplyMapping? mapping = null;
object? locker = null;
if (account != null)
{
mapping = account.GwReplyMapping;
locker = account;
}
else if (client != null)
{
mapping = client.GwReplyMapping;
locker = client;
}
if (mapping == null || locker == null || reply.Length == 0 || routedReply.Length == 0)
return;
var ttl = _gateway.RecSubExp <= TimeSpan.Zero ? TimeSpan.FromSeconds(2) : _gateway.RecSubExp;
lock (locker)
{
var wasEmpty = mapping.Mapping.Count == 0;
var maxMappedLen = Math.Min(routedReply.Length, GatewayHandler.GwSubjectOffset + reply.Length);
var mappedSubject = Encoding.ASCII.GetString(routedReply, 0, maxMappedLen);
var key = mappedSubject.Length > GatewayHandler.GwSubjectOffset
? mappedSubject[GatewayHandler.GwSubjectOffset..]
: mappedSubject;
mapping.Mapping[key] = new GwReplyMap
{
Ms = mappedSubject,
Exp = DateTime.UtcNow.Add(ttl).Ticks,
};
if (wasEmpty)
{
Interlocked.Exchange(ref mapping.Check, 1);
_gwReplyMappings[mapping] = locker;
if (Interlocked.CompareExchange(ref _gwReplyMapWorkerRunning, 1, 0) == 0)
{
if (!_gwReplyMapTtlUpdates.Writer.TryWrite(ttl))
{
while (_gwReplyMapTtlUpdates.Reader.TryRead(out _)) { }
_gwReplyMapTtlUpdates.Writer.TryWrite(ttl);
}
StartGWReplyMapExpiration();
}
}
}
}
internal void StartGWReplyMapExpiration()
{
_ = StartGoRoutine(() =>
{
var ttl = TimeSpan.Zero;
var token = _quitCts.Token;
while (!token.IsCancellationRequested)
{
try
{
if (ttl == TimeSpan.Zero)
{
ttl = _gwReplyMapTtlUpdates.Reader.ReadAsync(token).AsTask().GetAwaiter().GetResult();
}
Task.Delay(ttl, token).GetAwaiter().GetResult();
}
catch (OperationCanceledException)
{
break;
}
while (_gwReplyMapTtlUpdates.Reader.TryRead(out var nextTtl))
ttl = nextTtl;
var nowTicks = DateTime.UtcNow.Ticks;
var hasMappings = false;
foreach (var entry in _gwReplyMappings.ToArray())
{
var mapping = entry.Key;
var mapLocker = entry.Value;
lock (mapLocker)
{
foreach (var key in mapping.Mapping.Keys.ToArray())
{
if (mapping.Mapping[key].Exp <= nowTicks)
mapping.Mapping.Remove(key);
}
if (mapping.Mapping.Count == 0)
{
Interlocked.Exchange(ref mapping.Check, 0);
_gwReplyMappings.TryRemove(mapping, out _);
}
else
{
hasMappings = true;
}
}
}
if (!hasMappings && Interlocked.CompareExchange(ref _gwReplyMapWorkerRunning, 0, 1) == 1)
ttl = TimeSpan.Zero;
}
});
}
}

View File

@@ -436,8 +436,12 @@ public sealed partial class NatsServer
s.InitOCSPResponseCache();
// Gateway (stub — session 16).
// s.NewGateway(opts) — deferred
var gatewayErr = s.NewGateway(opts);
if (gatewayErr != null)
{
s._mu.ExitWriteLock();
return (null, gatewayErr);
}
// Cluster name.
if (opts.Cluster.Port != 0 && string.IsNullOrEmpty(opts.Cluster.Name))

View File

@@ -226,6 +226,18 @@ public class RemoteGatewayOpts
public double TlsTimeout { get; set; }
public List<Uri> Urls { get; set; } = [];
internal TlsConfigOpts? TlsConfigOpts { get; set; }
internal RemoteGatewayOpts Clone()
{
return new RemoteGatewayOpts
{
Name = Name,
TlsConfig = TlsConfig,
TlsTimeout = TlsTimeout,
TlsConfigOpts = TlsConfigOpts,
Urls = [.. Urls],
};
}
}
/// <summary>