diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs index efbaa61..ec6e7d0 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs @@ -358,6 +358,12 @@ public sealed partial class Account : INatsAccount /// internal byte[]? ServiceImportReply { get; set; } + /// + /// Gateway reply mapping table used for routed reply restoration. + /// Mirrors Go gwReplyMapping. + /// + internal GwReplyMapping GwReplyMapping { get; } = new(); + /// /// Subscription ID counter for internal use. /// Mirrors Go isid uint64. diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Gateways.Messages.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Gateways.Messages.cs index 28e310b..99370a1 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Gateways.Messages.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Gateways.Messages.cs @@ -2,6 +2,8 @@ // Licensed under the Apache License, Version 2.0 using System.Text; +using System.Text.Json; +using ZB.MOM.NatsNet.Server.Internal; namespace ZB.MOM.NatsNet.Server; @@ -35,4 +37,188 @@ public sealed partial class ClientConnection gateway.EnqueueProto(Encoding.ASCII.GetBytes("\r\n")); } } + + internal void SendAccountUnsubToGateway(byte[] accountName) + { + if (accountName.Length == 0) + return; + + lock (_mu) + { + Gateway ??= new Gateway(); + Gateway.InSim ??= new Dictionary(StringComparer.Ordinal); + + var key = Encoding.ASCII.GetString(accountName); + if (!Gateway.InSim.TryGetValue(key, out var entry) || entry != null) + { + Gateway.InSim[key] = null!; + var proto = Encoding.ASCII.GetBytes($"A- {key}\r\n"); + EnqueueProto(proto); + if (Trace) + TraceOutOp(string.Empty, proto.AsSpan(0, proto.Length - 2).ToArray()); + } + } + } + + internal bool HandleGatewayReply(byte[] msg) + { + if (Server is not NatsServer server || ParseCtx.Pa.Subject is not { Length: > 0 } originalSubject) + return false; + + var (isRoutedReply, isOldPrefix) = GatewayHandler.IsGWRoutedSubjectAndIsOldPrefix(originalSubject); + if (!isRoutedReply) + return false; + + ParseCtx.Pa.Subject = GatewayHandler.GetSubjectFromGWRoutedReply(originalSubject, isOldPrefix); + ParseCtx.Pa.PaCache = [.. ParseCtx.Pa.Account ?? [], (byte)' ', .. ParseCtx.Pa.Subject]; + + var (account, result) = GetAccAndResultFromCache(); + if (account is not Account concreteAccount) + { + Debugf("Unknown account {0} for gateway message on subject: {1}", + ParseCtx.Pa.Account is { Length: > 0 } accountName ? Encoding.ASCII.GetString(accountName) : string.Empty, + Encoding.ASCII.GetString(ParseCtx.Pa.Subject)); + + if (ParseCtx.Pa.Account is { Length: > 0 } gatewayAccount) + server.GatewayHandleAccountNoInterest(this, gatewayAccount); + return true; + } + + if (result != null && (result.PSubs.Count + result.QSubs.Count) > 0) + ProcessMsgResults(concreteAccount, result, msg, null, ParseCtx.Pa.Subject, ParseCtx.Pa.Reply, PmrFlags.None); + + if (!IsServiceReply(ParseCtx.Pa.Subject)) + SendMsgToGateways(concreteAccount, ParseCtx.Pa.Subject, ParseCtx.Pa.Reply, msg); + + return true; + } + + internal void ProcessInboundGatewayMsg(byte[] msg) + { + _in.Msgs++; + _in.Bytes += Math.Max(0, msg.Length - 2); + + if (Opts.Verbose) + SendOK(); + + if (Server is not NatsServer server || ParseCtx.Pa.Subject is not { Length: > 0 }) + return; + + if (HandleGatewayReply(msg)) + return; + + var (account, result) = GetAccAndResultFromCache(); + if (account is not Account concreteAccount) + { + Debugf("Unknown account {0} for gateway message on subject: {1}", + ParseCtx.Pa.Account is { Length: > 0 } accountName ? Encoding.ASCII.GetString(accountName) : string.Empty, + Encoding.ASCII.GetString(ParseCtx.Pa.Subject)); + + if (ParseCtx.Pa.Account is { Length: > 0 } gatewayAccount) + server.GatewayHandleAccountNoInterest(this, gatewayAccount); + return; + } + + var noInterest = result == null || result.PSubs.Count == 0; + if (noInterest) + { + server.GatewayHandleSubjectNoInterest(this, concreteAccount, ParseCtx.Pa.Account ?? Encoding.ASCII.GetBytes(concreteAccount.Name), ParseCtx.Pa.Subject); + if (ParseCtx.Pa.Queues is null || ParseCtx.Pa.Queues.Count == 0) + return; + } + + ProcessMsgResults(concreteAccount, result, msg, null, ParseCtx.Pa.Subject, ParseCtx.Pa.Reply, PmrFlags.None); + } + + internal void GatewayAllSubsReceiveStart(ServerInfo info) + { + var account = GatewayHandler.GetAccountFromGatewayCommand(this, info, "start"); + if (string.IsNullOrWhiteSpace(account)) + return; + + Debugf("Gateway {0}: switching account {1} to {2} mode", + info.Gateway ?? string.Empty, account, GatewayInterestMode.InterestOnly.String()); + + Gateway ??= new Gateway(); + Gateway.OutSim ??= new System.Collections.Concurrent.ConcurrentDictionary(StringComparer.Ordinal); + + var outSide = Gateway.OutSim.GetOrAdd(account, _ => new OutSide + { + Sl = Internal.DataStructures.SubscriptionIndex.NewSublistWithCache(), + }); + + outSide.AcquireWriteLock(); + try { outSide.Mode = GatewayInterestMode.Transitioning; } + finally { outSide.ReleaseWriteLock(); } + } + + internal void GatewayAllSubsReceiveComplete(ServerInfo info) + { + var account = GatewayHandler.GetAccountFromGatewayCommand(this, info, "complete"); + if (string.IsNullOrWhiteSpace(account)) + return; + + if (Gateway?.OutSim == null || !Gateway.OutSim.TryGetValue(account, out var outSide)) + return; + + outSide.AcquireWriteLock(); + try + { + outSide.Ni = null; + outSide.Mode = GatewayInterestMode.InterestOnly; + } + finally + { + outSide.ReleaseWriteLock(); + } + + Debugf("Gateway {0}: switching account {1} to {2} mode complete", + info.Gateway ?? string.Empty, account, GatewayInterestMode.InterestOnly.String()); + } + + internal void GatewaySwitchAccountToSendAllSubs(InSide inSide, string accountName) + { + if (Server is not NatsServer server || string.IsNullOrWhiteSpace(accountName)) + return; + + inSide.Ni = null; + inSide.Mode = GatewayInterestMode.Transitioning; + + var remoteGatewayName = Gateway?.Name ?? string.Empty; + Debugf("Gateway {0}: switching account {1} to {2} mode", + remoteGatewayName, accountName, GatewayInterestMode.InterestOnly.String()); + + void SendCommand(byte command, bool withLock) + { + var info = new ServerInfo + { + Gateway = server.GetGatewayName(), + GatewayCmd = command, + GatewayCmdPayload = Encoding.ASCII.GetBytes(accountName), + }; + + var infoProto = NatsServer.GenerateInfoJson(info); + if (withLock) + { + lock (_mu) + { + EnqueueProto(infoProto); + } + return; + } + + EnqueueProto(infoProto); + } + + SendCommand(GatewayHandler.GatewayCmdAllSubsStart, withLock: false); + + _ = server.StartGoRoutine(() => + { + server.SendAccountSubsToGateway(this, accountName); + SendCommand(GatewayHandler.GatewayCmdAllSubsComplete, withLock: true); + + Debugf("Gateway {0}: switching account {1} to {2} mode complete", + remoteGatewayName, accountName, GatewayInterestMode.InterestOnly.String()); + }); + } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs index 1e940ba..321524d 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs @@ -135,6 +135,7 @@ public sealed partial class ClientConnection internal Dictionary? Replies; internal Dictionary? Pcd; // pending clients with data to flush internal Dictionary? DArray; // denied subscribe patterns + internal GwReplyMapping GwReplyMapping = new(); // Outbound state (simplified — full write loop ported when Server is available). internal long OutPb; // pending bytes diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayHandler.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayHandler.cs index 60281b3..5326925 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayHandler.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayHandler.cs @@ -11,6 +11,15 @@ internal static class GatewayHandler { internal static readonly byte[] GwReplyPrefix = Encoding.ASCII.GetBytes("_GR_."); internal static readonly byte[] OldGwReplyPrefix = Encoding.ASCII.GetBytes("$GR."); + internal const int GwHashLen = 6; + internal const int GwClusterOffset = 5; + internal const int GwServerOffset = GwClusterOffset + GwHashLen + 1; + internal const int GwSubjectOffset = GwServerOffset + GwHashLen + 1; + internal const int OldGwReplyPrefixLen = 4; + internal const int OldGwReplyStart = OldGwReplyPrefixLen + 5; + internal const int GatewayCmdAllSubsStart = 2; + internal const int GatewayCmdAllSubsComplete = 3; + internal const int GatewayMaxRUnsubBeforeSwitch = 1000; private static readonly TimeSpan DefaultSolicitGatewaysDelay = TimeSpan.FromSeconds(1); private static long _gatewaySolicitDelayTicks = DefaultSolicitGatewaysDelay.Ticks; @@ -99,6 +108,26 @@ internal static class GatewayHandler return IsGWRoutedReply(subject); } + internal static byte[] GetSubjectFromGWRoutedReply(byte[] reply, bool isOldPrefix) + { + if (isOldPrefix) + return reply.Length > OldGwReplyStart ? reply[OldGwReplyStart..] : []; + + return reply.Length > GwSubjectOffset ? reply[GwSubjectOffset..] : []; + } + + internal static string GetAccountFromGatewayCommand(ClientConnection connection, ServerInfo info, string command) + { + if (info.GatewayCmdPayload == null || info.GatewayCmdPayload.Length == 0) + { + connection.SendErrAndErr($"Account absent from receive-all-subscriptions-{command} command"); + connection.CloseConnection(ClosedState.ProtocolViolation); + return string.Empty; + } + + return Encoding.ASCII.GetString(info.GatewayCmdPayload); + } + private static byte[] GetHash(string value, int len) { var bytes = Encoding.UTF8.GetBytes(value); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayTypes.cs index e567081..1b936d2 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayTypes.cs @@ -281,6 +281,22 @@ internal sealed class SrvGateway return !GatewayHandler.IsGWRoutedReply(reply); } + internal byte[] GetClusterHash() + { + _lock.EnterReadLock(); + try + { + if (ReplyPfx.Length < GatewayHandler.GwClusterOffset + GatewayHandler.GwHashLen) + return []; + + return ReplyPfx[GatewayHandler.GwClusterOffset..(GatewayHandler.GwClusterOffset + GatewayHandler.GwHashLen)]; + } + finally + { + _lock.ExitReadLock(); + } + } + } /// diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.Interest.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.Interest.cs index 523d260..9770b3d 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.Interest.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.Interest.cs @@ -74,4 +74,72 @@ public sealed partial class NatsServer MaybeSendSubOrUnsubToGateways(account, sub, isUnsub); } + + internal void GatewayHandleAccountNoInterest(ClientConnection connection, byte[] accountName) + { + lock (_gateway.PasiLock) + { + var key = Encoding.ASCII.GetString(accountName); + if (_gateway.Pasi.TryGetValue(key, out var accountInterest) && accountInterest.Count > 0) + return; + + connection.SendAccountUnsubToGateway(accountName); + } + } + + internal void GatewayHandleSubjectNoInterest(ClientConnection connection, Account account, byte[] accountName, byte[] subject) + { + lock (_gateway.PasiLock) + { + var hasSubs = (account.Sublist?.Count() ?? 0) > 0 || account.ServiceImportReply != null; + if (!hasSubs) + { + connection.SendAccountUnsubToGateway(Encoding.ASCII.GetBytes(account.Name)); + return; + } + + var sendProto = false; + lock (connection) + { + connection.Gateway ??= new Gateway(); + connection.Gateway.InSim ??= new Dictionary(StringComparer.Ordinal); + + var accountKey = Encoding.ASCII.GetString(accountName); + if (!connection.Gateway.InSim.TryGetValue(accountKey, out var inSide) || inSide == null) + { + inSide = new InSide + { + Ni = new HashSet(StringComparer.Ordinal), + }; + inSide.Ni.Add(Encoding.ASCII.GetString(subject)); + connection.Gateway.InSim[accountKey] = inSide; + sendProto = true; + } + else if (inSide.Ni != null) + { + var subjectKey = Encoding.ASCII.GetString(subject); + if (!inSide.Ni.Contains(subjectKey)) + { + if (inSide.Ni.Count >= GatewayHandler.GatewayMaxRUnsubBeforeSwitch) + { + connection.GatewaySwitchAccountToSendAllSubs(inSide, accountKey); + } + else + { + inSide.Ni.Add(subjectKey); + sendProto = true; + } + } + } + } + + if (!sendProto) + return; + + var protocol = Encoding.ASCII.GetBytes($"RS- {Encoding.ASCII.GetString(accountName)} {Encoding.ASCII.GetString(subject)}\r\n"); + connection.EnqueueProto(protocol); + if (connection.Trace) + connection.TraceOutOp(string.Empty, protocol.AsSpan(0, protocol.Length - 2).ToArray()); + } + } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.ReplyMap.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.ReplyMap.cs new file mode 100644 index 0000000..ccdd894 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.ReplyMap.cs @@ -0,0 +1,177 @@ +// Copyright 2018-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using System.Collections.Concurrent; +using System.Text; +using System.Threading.Channels; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + private readonly ConcurrentDictionary _gwReplyMappings = new(); + private readonly Channel _gwReplyMapTtlUpdates = Channel.CreateBounded(1); + private int _gwReplyMapWorkerRunning; + + internal void StoreRouteByHash(string serverIdHash, ClientConnection route) + { + if (!_gateway.Enabled || string.IsNullOrWhiteSpace(serverIdHash)) + return; + + _gateway.RoutesIdByHash[serverIdHash] = route; + } + + internal void RemoveRouteByHash(string serverIdHash) + { + if (!_gateway.Enabled || string.IsNullOrWhiteSpace(serverIdHash)) + return; + + _gateway.RoutesIdByHash.TryRemove(serverIdHash, out _); + } + + internal (ClientConnection? Route, bool PerAccount) GetRouteByHash(byte[] hash, byte[] accountName) + { + if (hash.Length == 0) + return (null, false); + + var id = Encoding.ASCII.GetString(hash); + var perAccount = false; + var accountKey = Encoding.ASCII.GetString(accountName); + if (_accRouteByHash.TryGetValue(accountKey, out var accountRouteEntry)) + { + if (accountRouteEntry == null) + { + id += accountKey; + perAccount = true; + } + else if (accountRouteEntry is int routeIndex) + { + id += routeIndex.ToString(); + } + } + + if (_gateway.RoutesIdByHash.TryGetValue(id, out var route)) + return (route, perAccount); + + if (!perAccount && _gateway.RoutesIdByHash.TryGetValue($"{Encoding.ASCII.GetString(hash)}0", out var noPoolRoute)) + { + lock (noPoolRoute) + { + if (noPoolRoute.Route?.NoPool == true) + return (noPoolRoute, false); + } + } + + return (null, perAccount); + } + + internal void TrackGWReply(ClientConnection? client, Account? account, byte[] reply, byte[] routedReply) + { + GwReplyMapping? mapping = null; + object? locker = null; + + if (account != null) + { + mapping = account.GwReplyMapping; + locker = account; + } + else if (client != null) + { + mapping = client.GwReplyMapping; + locker = client; + } + + if (mapping == null || locker == null || reply.Length == 0 || routedReply.Length == 0) + return; + + var ttl = _gateway.RecSubExp <= TimeSpan.Zero ? TimeSpan.FromSeconds(2) : _gateway.RecSubExp; + lock (locker) + { + var wasEmpty = mapping.Mapping.Count == 0; + var maxMappedLen = Math.Min(routedReply.Length, GatewayHandler.GwSubjectOffset + reply.Length); + var mappedSubject = Encoding.ASCII.GetString(routedReply, 0, maxMappedLen); + var key = mappedSubject.Length > GatewayHandler.GwSubjectOffset + ? mappedSubject[GatewayHandler.GwSubjectOffset..] + : mappedSubject; + + mapping.Mapping[key] = new GwReplyMap + { + Ms = mappedSubject, + Exp = DateTime.UtcNow.Add(ttl).Ticks, + }; + + if (wasEmpty) + { + Interlocked.Exchange(ref mapping.Check, 1); + _gwReplyMappings[mapping] = locker; + if (Interlocked.CompareExchange(ref _gwReplyMapWorkerRunning, 1, 0) == 0) + { + if (!_gwReplyMapTtlUpdates.Writer.TryWrite(ttl)) + { + while (_gwReplyMapTtlUpdates.Reader.TryRead(out _)) { } + _gwReplyMapTtlUpdates.Writer.TryWrite(ttl); + } + StartGWReplyMapExpiration(); + } + } + } + } + + internal void StartGWReplyMapExpiration() + { + _ = StartGoRoutine(() => + { + var ttl = TimeSpan.Zero; + var token = _quitCts.Token; + + while (!token.IsCancellationRequested) + { + try + { + if (ttl == TimeSpan.Zero) + { + ttl = _gwReplyMapTtlUpdates.Reader.ReadAsync(token).AsTask().GetAwaiter().GetResult(); + } + + Task.Delay(ttl, token).GetAwaiter().GetResult(); + } + catch (OperationCanceledException) + { + break; + } + + while (_gwReplyMapTtlUpdates.Reader.TryRead(out var nextTtl)) + ttl = nextTtl; + + var nowTicks = DateTime.UtcNow.Ticks; + var hasMappings = false; + foreach (var entry in _gwReplyMappings.ToArray()) + { + var mapping = entry.Key; + var mapLocker = entry.Value; + lock (mapLocker) + { + foreach (var key in mapping.Mapping.Keys.ToArray()) + { + if (mapping.Mapping[key].Exp <= nowTicks) + mapping.Mapping.Remove(key); + } + + if (mapping.Mapping.Count == 0) + { + Interlocked.Exchange(ref mapping.Check, 0); + _gwReplyMappings.TryRemove(mapping, out _); + } + else + { + hasMappings = true; + } + } + } + + if (!hasMappings && Interlocked.CompareExchange(ref _gwReplyMapWorkerRunning, 0, 1) == 1) + ttl = TimeSpan.Zero; + } + }); + } +} diff --git a/porting.db b/porting.db index 1a95955..f94140b 100644 Binary files a/porting.db and b/porting.db differ