feat(batch25): implement gateway reply map and inbound message pipeline

This commit is contained in:
Joseph Doherty
2026-03-01 02:07:25 -05:00
parent 59fa600b3c
commit e9be0751ec
8 changed files with 483 additions and 0 deletions

View File

@@ -358,6 +358,12 @@ public sealed partial class Account : INatsAccount
/// </summary>
internal byte[]? ServiceImportReply { get; set; }
/// <summary>
/// Gateway reply mapping table used for routed reply restoration.
/// Mirrors Go <c>gwReplyMapping</c>.
/// </summary>
internal GwReplyMapping GwReplyMapping { get; } = new();
/// <summary>
/// Subscription ID counter for internal use.
/// Mirrors Go <c>isid uint64</c>.

View File

@@ -2,6 +2,8 @@
// Licensed under the Apache License, Version 2.0
using System.Text;
using System.Text.Json;
using ZB.MOM.NatsNet.Server.Internal;
namespace ZB.MOM.NatsNet.Server;
@@ -35,4 +37,188 @@ public sealed partial class ClientConnection
gateway.EnqueueProto(Encoding.ASCII.GetBytes("\r\n"));
}
}
internal void SendAccountUnsubToGateway(byte[] accountName)
{
if (accountName.Length == 0)
return;
lock (_mu)
{
Gateway ??= new Gateway();
Gateway.InSim ??= new Dictionary<string, InSide>(StringComparer.Ordinal);
var key = Encoding.ASCII.GetString(accountName);
if (!Gateway.InSim.TryGetValue(key, out var entry) || entry != null)
{
Gateway.InSim[key] = null!;
var proto = Encoding.ASCII.GetBytes($"A- {key}\r\n");
EnqueueProto(proto);
if (Trace)
TraceOutOp(string.Empty, proto.AsSpan(0, proto.Length - 2).ToArray());
}
}
}
internal bool HandleGatewayReply(byte[] msg)
{
if (Server is not NatsServer server || ParseCtx.Pa.Subject is not { Length: > 0 } originalSubject)
return false;
var (isRoutedReply, isOldPrefix) = GatewayHandler.IsGWRoutedSubjectAndIsOldPrefix(originalSubject);
if (!isRoutedReply)
return false;
ParseCtx.Pa.Subject = GatewayHandler.GetSubjectFromGWRoutedReply(originalSubject, isOldPrefix);
ParseCtx.Pa.PaCache = [.. ParseCtx.Pa.Account ?? [], (byte)' ', .. ParseCtx.Pa.Subject];
var (account, result) = GetAccAndResultFromCache();
if (account is not Account concreteAccount)
{
Debugf("Unknown account {0} for gateway message on subject: {1}",
ParseCtx.Pa.Account is { Length: > 0 } accountName ? Encoding.ASCII.GetString(accountName) : string.Empty,
Encoding.ASCII.GetString(ParseCtx.Pa.Subject));
if (ParseCtx.Pa.Account is { Length: > 0 } gatewayAccount)
server.GatewayHandleAccountNoInterest(this, gatewayAccount);
return true;
}
if (result != null && (result.PSubs.Count + result.QSubs.Count) > 0)
ProcessMsgResults(concreteAccount, result, msg, null, ParseCtx.Pa.Subject, ParseCtx.Pa.Reply, PmrFlags.None);
if (!IsServiceReply(ParseCtx.Pa.Subject))
SendMsgToGateways(concreteAccount, ParseCtx.Pa.Subject, ParseCtx.Pa.Reply, msg);
return true;
}
internal void ProcessInboundGatewayMsg(byte[] msg)
{
_in.Msgs++;
_in.Bytes += Math.Max(0, msg.Length - 2);
if (Opts.Verbose)
SendOK();
if (Server is not NatsServer server || ParseCtx.Pa.Subject is not { Length: > 0 })
return;
if (HandleGatewayReply(msg))
return;
var (account, result) = GetAccAndResultFromCache();
if (account is not Account concreteAccount)
{
Debugf("Unknown account {0} for gateway message on subject: {1}",
ParseCtx.Pa.Account is { Length: > 0 } accountName ? Encoding.ASCII.GetString(accountName) : string.Empty,
Encoding.ASCII.GetString(ParseCtx.Pa.Subject));
if (ParseCtx.Pa.Account is { Length: > 0 } gatewayAccount)
server.GatewayHandleAccountNoInterest(this, gatewayAccount);
return;
}
var noInterest = result == null || result.PSubs.Count == 0;
if (noInterest)
{
server.GatewayHandleSubjectNoInterest(this, concreteAccount, ParseCtx.Pa.Account ?? Encoding.ASCII.GetBytes(concreteAccount.Name), ParseCtx.Pa.Subject);
if (ParseCtx.Pa.Queues is null || ParseCtx.Pa.Queues.Count == 0)
return;
}
ProcessMsgResults(concreteAccount, result, msg, null, ParseCtx.Pa.Subject, ParseCtx.Pa.Reply, PmrFlags.None);
}
internal void GatewayAllSubsReceiveStart(ServerInfo info)
{
var account = GatewayHandler.GetAccountFromGatewayCommand(this, info, "start");
if (string.IsNullOrWhiteSpace(account))
return;
Debugf("Gateway {0}: switching account {1} to {2} mode",
info.Gateway ?? string.Empty, account, GatewayInterestMode.InterestOnly.String());
Gateway ??= new Gateway();
Gateway.OutSim ??= new System.Collections.Concurrent.ConcurrentDictionary<string, OutSide>(StringComparer.Ordinal);
var outSide = Gateway.OutSim.GetOrAdd(account, _ => new OutSide
{
Sl = Internal.DataStructures.SubscriptionIndex.NewSublistWithCache(),
});
outSide.AcquireWriteLock();
try { outSide.Mode = GatewayInterestMode.Transitioning; }
finally { outSide.ReleaseWriteLock(); }
}
internal void GatewayAllSubsReceiveComplete(ServerInfo info)
{
var account = GatewayHandler.GetAccountFromGatewayCommand(this, info, "complete");
if (string.IsNullOrWhiteSpace(account))
return;
if (Gateway?.OutSim == null || !Gateway.OutSim.TryGetValue(account, out var outSide))
return;
outSide.AcquireWriteLock();
try
{
outSide.Ni = null;
outSide.Mode = GatewayInterestMode.InterestOnly;
}
finally
{
outSide.ReleaseWriteLock();
}
Debugf("Gateway {0}: switching account {1} to {2} mode complete",
info.Gateway ?? string.Empty, account, GatewayInterestMode.InterestOnly.String());
}
internal void GatewaySwitchAccountToSendAllSubs(InSide inSide, string accountName)
{
if (Server is not NatsServer server || string.IsNullOrWhiteSpace(accountName))
return;
inSide.Ni = null;
inSide.Mode = GatewayInterestMode.Transitioning;
var remoteGatewayName = Gateway?.Name ?? string.Empty;
Debugf("Gateway {0}: switching account {1} to {2} mode",
remoteGatewayName, accountName, GatewayInterestMode.InterestOnly.String());
void SendCommand(byte command, bool withLock)
{
var info = new ServerInfo
{
Gateway = server.GetGatewayName(),
GatewayCmd = command,
GatewayCmdPayload = Encoding.ASCII.GetBytes(accountName),
};
var infoProto = NatsServer.GenerateInfoJson(info);
if (withLock)
{
lock (_mu)
{
EnqueueProto(infoProto);
}
return;
}
EnqueueProto(infoProto);
}
SendCommand(GatewayHandler.GatewayCmdAllSubsStart, withLock: false);
_ = server.StartGoRoutine(() =>
{
server.SendAccountSubsToGateway(this, accountName);
SendCommand(GatewayHandler.GatewayCmdAllSubsComplete, withLock: true);
Debugf("Gateway {0}: switching account {1} to {2} mode complete",
remoteGatewayName, accountName, GatewayInterestMode.InterestOnly.String());
});
}
}

View File

@@ -135,6 +135,7 @@ public sealed partial class ClientConnection
internal Dictionary<string, RespEntry>? Replies;
internal Dictionary<ClientConnection, bool>? Pcd; // pending clients with data to flush
internal Dictionary<string, bool>? DArray; // denied subscribe patterns
internal GwReplyMapping GwReplyMapping = new();
// Outbound state (simplified — full write loop ported when Server is available).
internal long OutPb; // pending bytes

View File

@@ -11,6 +11,15 @@ internal static class GatewayHandler
{
internal static readonly byte[] GwReplyPrefix = Encoding.ASCII.GetBytes("_GR_.");
internal static readonly byte[] OldGwReplyPrefix = Encoding.ASCII.GetBytes("$GR.");
internal const int GwHashLen = 6;
internal const int GwClusterOffset = 5;
internal const int GwServerOffset = GwClusterOffset + GwHashLen + 1;
internal const int GwSubjectOffset = GwServerOffset + GwHashLen + 1;
internal const int OldGwReplyPrefixLen = 4;
internal const int OldGwReplyStart = OldGwReplyPrefixLen + 5;
internal const int GatewayCmdAllSubsStart = 2;
internal const int GatewayCmdAllSubsComplete = 3;
internal const int GatewayMaxRUnsubBeforeSwitch = 1000;
private static readonly TimeSpan DefaultSolicitGatewaysDelay = TimeSpan.FromSeconds(1);
private static long _gatewaySolicitDelayTicks = DefaultSolicitGatewaysDelay.Ticks;
@@ -99,6 +108,26 @@ internal static class GatewayHandler
return IsGWRoutedReply(subject);
}
internal static byte[] GetSubjectFromGWRoutedReply(byte[] reply, bool isOldPrefix)
{
if (isOldPrefix)
return reply.Length > OldGwReplyStart ? reply[OldGwReplyStart..] : [];
return reply.Length > GwSubjectOffset ? reply[GwSubjectOffset..] : [];
}
internal static string GetAccountFromGatewayCommand(ClientConnection connection, ServerInfo info, string command)
{
if (info.GatewayCmdPayload == null || info.GatewayCmdPayload.Length == 0)
{
connection.SendErrAndErr($"Account absent from receive-all-subscriptions-{command} command");
connection.CloseConnection(ClosedState.ProtocolViolation);
return string.Empty;
}
return Encoding.ASCII.GetString(info.GatewayCmdPayload);
}
private static byte[] GetHash(string value, int len)
{
var bytes = Encoding.UTF8.GetBytes(value);

View File

@@ -281,6 +281,22 @@ internal sealed class SrvGateway
return !GatewayHandler.IsGWRoutedReply(reply);
}
internal byte[] GetClusterHash()
{
_lock.EnterReadLock();
try
{
if (ReplyPfx.Length < GatewayHandler.GwClusterOffset + GatewayHandler.GwHashLen)
return [];
return ReplyPfx[GatewayHandler.GwClusterOffset..(GatewayHandler.GwClusterOffset + GatewayHandler.GwHashLen)];
}
finally
{
_lock.ExitReadLock();
}
}
}
/// <summary>

View File

@@ -74,4 +74,72 @@ public sealed partial class NatsServer
MaybeSendSubOrUnsubToGateways(account, sub, isUnsub);
}
internal void GatewayHandleAccountNoInterest(ClientConnection connection, byte[] accountName)
{
lock (_gateway.PasiLock)
{
var key = Encoding.ASCII.GetString(accountName);
if (_gateway.Pasi.TryGetValue(key, out var accountInterest) && accountInterest.Count > 0)
return;
connection.SendAccountUnsubToGateway(accountName);
}
}
internal void GatewayHandleSubjectNoInterest(ClientConnection connection, Account account, byte[] accountName, byte[] subject)
{
lock (_gateway.PasiLock)
{
var hasSubs = (account.Sublist?.Count() ?? 0) > 0 || account.ServiceImportReply != null;
if (!hasSubs)
{
connection.SendAccountUnsubToGateway(Encoding.ASCII.GetBytes(account.Name));
return;
}
var sendProto = false;
lock (connection)
{
connection.Gateway ??= new Gateway();
connection.Gateway.InSim ??= new Dictionary<string, InSide>(StringComparer.Ordinal);
var accountKey = Encoding.ASCII.GetString(accountName);
if (!connection.Gateway.InSim.TryGetValue(accountKey, out var inSide) || inSide == null)
{
inSide = new InSide
{
Ni = new HashSet<string>(StringComparer.Ordinal),
};
inSide.Ni.Add(Encoding.ASCII.GetString(subject));
connection.Gateway.InSim[accountKey] = inSide;
sendProto = true;
}
else if (inSide.Ni != null)
{
var subjectKey = Encoding.ASCII.GetString(subject);
if (!inSide.Ni.Contains(subjectKey))
{
if (inSide.Ni.Count >= GatewayHandler.GatewayMaxRUnsubBeforeSwitch)
{
connection.GatewaySwitchAccountToSendAllSubs(inSide, accountKey);
}
else
{
inSide.Ni.Add(subjectKey);
sendProto = true;
}
}
}
}
if (!sendProto)
return;
var protocol = Encoding.ASCII.GetBytes($"RS- {Encoding.ASCII.GetString(accountName)} {Encoding.ASCII.GetString(subject)}\r\n");
connection.EnqueueProto(protocol);
if (connection.Trace)
connection.TraceOutOp(string.Empty, protocol.AsSpan(0, protocol.Length - 2).ToArray());
}
}
}

View File

@@ -0,0 +1,177 @@
// Copyright 2018-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0
using System.Collections.Concurrent;
using System.Text;
using System.Threading.Channels;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class NatsServer
{
private readonly ConcurrentDictionary<GwReplyMapping, object> _gwReplyMappings = new();
private readonly Channel<TimeSpan> _gwReplyMapTtlUpdates = Channel.CreateBounded<TimeSpan>(1);
private int _gwReplyMapWorkerRunning;
internal void StoreRouteByHash(string serverIdHash, ClientConnection route)
{
if (!_gateway.Enabled || string.IsNullOrWhiteSpace(serverIdHash))
return;
_gateway.RoutesIdByHash[serverIdHash] = route;
}
internal void RemoveRouteByHash(string serverIdHash)
{
if (!_gateway.Enabled || string.IsNullOrWhiteSpace(serverIdHash))
return;
_gateway.RoutesIdByHash.TryRemove(serverIdHash, out _);
}
internal (ClientConnection? Route, bool PerAccount) GetRouteByHash(byte[] hash, byte[] accountName)
{
if (hash.Length == 0)
return (null, false);
var id = Encoding.ASCII.GetString(hash);
var perAccount = false;
var accountKey = Encoding.ASCII.GetString(accountName);
if (_accRouteByHash.TryGetValue(accountKey, out var accountRouteEntry))
{
if (accountRouteEntry == null)
{
id += accountKey;
perAccount = true;
}
else if (accountRouteEntry is int routeIndex)
{
id += routeIndex.ToString();
}
}
if (_gateway.RoutesIdByHash.TryGetValue(id, out var route))
return (route, perAccount);
if (!perAccount && _gateway.RoutesIdByHash.TryGetValue($"{Encoding.ASCII.GetString(hash)}0", out var noPoolRoute))
{
lock (noPoolRoute)
{
if (noPoolRoute.Route?.NoPool == true)
return (noPoolRoute, false);
}
}
return (null, perAccount);
}
internal void TrackGWReply(ClientConnection? client, Account? account, byte[] reply, byte[] routedReply)
{
GwReplyMapping? mapping = null;
object? locker = null;
if (account != null)
{
mapping = account.GwReplyMapping;
locker = account;
}
else if (client != null)
{
mapping = client.GwReplyMapping;
locker = client;
}
if (mapping == null || locker == null || reply.Length == 0 || routedReply.Length == 0)
return;
var ttl = _gateway.RecSubExp <= TimeSpan.Zero ? TimeSpan.FromSeconds(2) : _gateway.RecSubExp;
lock (locker)
{
var wasEmpty = mapping.Mapping.Count == 0;
var maxMappedLen = Math.Min(routedReply.Length, GatewayHandler.GwSubjectOffset + reply.Length);
var mappedSubject = Encoding.ASCII.GetString(routedReply, 0, maxMappedLen);
var key = mappedSubject.Length > GatewayHandler.GwSubjectOffset
? mappedSubject[GatewayHandler.GwSubjectOffset..]
: mappedSubject;
mapping.Mapping[key] = new GwReplyMap
{
Ms = mappedSubject,
Exp = DateTime.UtcNow.Add(ttl).Ticks,
};
if (wasEmpty)
{
Interlocked.Exchange(ref mapping.Check, 1);
_gwReplyMappings[mapping] = locker;
if (Interlocked.CompareExchange(ref _gwReplyMapWorkerRunning, 1, 0) == 0)
{
if (!_gwReplyMapTtlUpdates.Writer.TryWrite(ttl))
{
while (_gwReplyMapTtlUpdates.Reader.TryRead(out _)) { }
_gwReplyMapTtlUpdates.Writer.TryWrite(ttl);
}
StartGWReplyMapExpiration();
}
}
}
}
internal void StartGWReplyMapExpiration()
{
_ = StartGoRoutine(() =>
{
var ttl = TimeSpan.Zero;
var token = _quitCts.Token;
while (!token.IsCancellationRequested)
{
try
{
if (ttl == TimeSpan.Zero)
{
ttl = _gwReplyMapTtlUpdates.Reader.ReadAsync(token).AsTask().GetAwaiter().GetResult();
}
Task.Delay(ttl, token).GetAwaiter().GetResult();
}
catch (OperationCanceledException)
{
break;
}
while (_gwReplyMapTtlUpdates.Reader.TryRead(out var nextTtl))
ttl = nextTtl;
var nowTicks = DateTime.UtcNow.Ticks;
var hasMappings = false;
foreach (var entry in _gwReplyMappings.ToArray())
{
var mapping = entry.Key;
var mapLocker = entry.Value;
lock (mapLocker)
{
foreach (var key in mapping.Mapping.Keys.ToArray())
{
if (mapping.Mapping[key].Exp <= nowTicks)
mapping.Mapping.Remove(key);
}
if (mapping.Mapping.Count == 0)
{
Interlocked.Exchange(ref mapping.Check, 0);
_gwReplyMappings.TryRemove(mapping, out _);
}
else
{
hasMappings = true;
}
}
}
if (!hasMappings && Interlocked.CompareExchange(ref _gwReplyMapWorkerRunning, 0, 1) == 1)
ttl = TimeSpan.Zero;
}
});
}
}

Binary file not shown.