diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Gateways.Messages.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Gateways.Messages.cs new file mode 100644 index 0000000..28e310b --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Gateways.Messages.cs @@ -0,0 +1,38 @@ +// Copyright 2018-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using System.Text; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class ClientConnection +{ + internal void SendMsgToGateways(Account? account, byte[] subject, byte[]? reply, byte[] payload) + { + if (Server is not NatsServer server || account == null || subject.Length == 0) + return; + + var outboundGateways = server.GetOutboundGatewayConnections(); + if (outboundGateways.Count == 0) + return; + + foreach (var gateway in outboundGateways) + { + if (!GatewayInterest(account, Encoding.ASCII.GetString(subject))) + continue; + + var replyToSend = reply; + if (reply is { Length: > 0 } && gateway.Gateway != null) + { + var shouldMap = server.ShouldMapReplyForGatewaySend(reply, gateway.Gateway.UseOldPrefix); + if (shouldMap) + replyToSend = reply; + } + + var proto = MsgHeader(subject, replyToSend, new Internal.Subscription { Sid = Encoding.ASCII.GetBytes("1") }); + gateway.EnqueueProto(proto); + gateway.EnqueueProto(payload); + gateway.EnqueueProto(Encoding.ASCII.GetBytes("\r\n")); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Gateways.Protocol.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Gateways.Protocol.cs index 895874a..b20ffa5 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Gateways.Protocol.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Gateways.Protocol.cs @@ -99,4 +99,119 @@ public sealed partial class ClientConnection return null; } + + internal Exception? ProcessGatewayAccountUnsub(byte[] arg) + { + if (Server is not NatsServer server) + return new InvalidOperationException("gateway server unavailable"); + + var tokens = Encoding.ASCII.GetString(arg).Split(' ', StringSplitOptions.RemoveEmptyEntries); + if (tokens.Length < 1) + return null; + + if (tokens.Length == 1) + return null; + + var accountName = tokens[0]; + var subject = Encoding.ASCII.GetBytes(tokens[1]); + var queue = tokens.Length > 2 ? Encoding.ASCII.GetBytes(tokens[2]) : null; + + var (account, _) = server.LookupAccount(accountName); + if (account == null) + return null; + + var sub = new Internal.Subscription { Subject = subject, Queue = queue }; + server.SendQueueSubOrUnsubToGateways(account, sub, isUnsub: true); + return null; + } + + internal Exception? ProcessGatewayAccountSub(byte[] arg) + { + if (Server is not NatsServer server) + return new InvalidOperationException("gateway server unavailable"); + + var tokens = Encoding.ASCII.GetString(arg).Split(' ', StringSplitOptions.RemoveEmptyEntries); + if (tokens.Length < 2) + return null; + + var accountName = tokens[0]; + var subject = Encoding.ASCII.GetBytes(tokens[1]); + var queue = tokens.Length > 2 ? Encoding.ASCII.GetBytes(tokens[2]) : null; + + var (account, _) = server.LookupAccount(accountName); + if (account == null) + return null; + + var sub = new Internal.Subscription { Subject = subject, Queue = queue }; + if (queue is { Length: > 0 }) + server.SendQueueSubOrUnsubToGateways(account, sub, isUnsub: false); + else + server.MaybeSendSubOrUnsubToGateways(account, sub, isUnsub: false); + + return null; + } + + internal Exception? ProcessGatewayRUnsub(byte[] arg) + { + var tokens = Encoding.ASCII.GetString(arg).Split(' ', StringSplitOptions.RemoveEmptyEntries); + if (tokens.Length < 2 || Server is not NatsServer server) + return null; + + var accountName = tokens[0]; + var subject = Encoding.ASCII.GetBytes(tokens[1]); + var queue = tokens.Length > 2 ? Encoding.ASCII.GetBytes(tokens[2]) : null; + var (account, _) = server.LookupAccount(accountName); + if (account == null) + return null; + + var sub = new Internal.Subscription { Subject = subject, Queue = queue }; + server.MaybeSendSubOrUnsubToGateways(account, sub, isUnsub: true); + return null; + } + + internal Exception? ProcessGatewayRSub(byte[] arg) + { + var tokens = Encoding.ASCII.GetString(arg).Split(' ', StringSplitOptions.RemoveEmptyEntries); + if (tokens.Length < 2 || Server is not NatsServer server) + return null; + + var accountName = tokens[0]; + var subject = Encoding.ASCII.GetBytes(tokens[1]); + var queue = tokens.Length > 2 ? Encoding.ASCII.GetBytes(tokens[2]) : null; + var (account, _) = server.LookupAccount(accountName); + if (account == null) + return null; + + var sub = new Internal.Subscription { Subject = subject, Queue = queue }; + server.MaybeSendSubOrUnsubToGateways(account, sub, isUnsub: false); + return null; + } + + internal bool GatewayInterest(Account? account, string subject) + { + if (account == null || string.IsNullOrWhiteSpace(subject)) + return false; + + if (Gateway?.OutSim == null) + return true; + + if (!Gateway.OutSim.TryGetValue(account.Name, out var outSi)) + return true; + + outSi.AcquireReadLock(); + try + { + if (outSi.Mode == GatewayInterestMode.InterestOnly && outSi.Sl != null) + { + var match = outSi.Sl.Match(subject); + return match.PSubs.Count > 0 || match.QSubs.Count > 0; + } + + return outSi.Ni == null || !outSi.Ni.Contains(subject); + } + finally + { + outSi.ReleaseReadLock(); + } + } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayHandler.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayHandler.cs index 8b1df33..60281b3 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayHandler.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayHandler.cs @@ -9,6 +9,9 @@ namespace ZB.MOM.NatsNet.Server; internal static class GatewayHandler { + internal static readonly byte[] GwReplyPrefix = Encoding.ASCII.GetBytes("_GR_."); + internal static readonly byte[] OldGwReplyPrefix = Encoding.ASCII.GetBytes("$GR."); + private static readonly TimeSpan DefaultSolicitGatewaysDelay = TimeSpan.FromSeconds(1); private static long _gatewaySolicitDelayTicks = DefaultSolicitGatewaysDelay.Ticks; private static int _doNotForceInterestOnlyMode; @@ -77,6 +80,25 @@ internal static class GatewayHandler return Encoding.ASCII.GetBytes($"{op} {accountName} {subject}\r\n"); } + internal static bool IsGWRoutedReply(byte[] subject) + { + return subject.AsSpan().StartsWith(GwReplyPrefix) || subject.AsSpan().StartsWith(OldGwReplyPrefix); + } + + internal static (bool IsRouted, bool IsOldPrefix) IsGWRoutedSubjectAndIsOldPrefix(byte[] subject) + { + if (subject.AsSpan().StartsWith(OldGwReplyPrefix)) + return (true, true); + if (subject.AsSpan().StartsWith(GwReplyPrefix)) + return (true, false); + return (false, false); + } + + internal static bool HasGWRoutedReplyPrefix(byte[] subject) + { + return IsGWRoutedReply(subject); + } + private static byte[] GetHash(string value, int len) { var bytes = Encoding.UTF8.GetBytes(value); diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayTypes.cs index ddeb7b4..e567081 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayTypes.cs @@ -262,6 +262,25 @@ internal sealed class SrvGateway { Outo = [.. Out.Values.OrderBy(c => c.GetRttValue()).ThenBy(c => c.Cid)]; } + + internal void OrderOutboundConnections() + { + _lock.EnterWriteLock(); + try { OrderOutboundConnectionsLocked(); } + finally { _lock.ExitWriteLock(); } + } + + internal bool ShouldMapReplyForGatewaySend(byte[] reply, bool useOldPrefix) + { + if (reply.Length == 0) + return false; + + if (useOldPrefix) + return !GatewayHandler.IsGWRoutedSubjectAndIsOldPrefix(reply).IsRouted; + + return !GatewayHandler.IsGWRoutedReply(reply); + } + } /// diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.Interest.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.Interest.cs new file mode 100644 index 0000000..523d260 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.Interest.cs @@ -0,0 +1,77 @@ +// Copyright 2018-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using System.Net; +using System.Text; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + internal bool ShouldMapReplyForGatewaySend(byte[] reply, bool useOldPrefix) + { + return _gateway.ShouldMapReplyForGatewaySend(reply, useOldPrefix); + } + + internal IReadOnlyList GetInboundGatewayConnections() + { + _gateway.AcquireReadLock(); + try { return [.. _gateway.In.Values]; } + finally { _gateway.ReleaseReadLock(); } + } + + internal string GatewayAddr() + { + var opts = GetOpts(); + var host = string.IsNullOrWhiteSpace(opts.Gateway.Host) ? opts.Host : opts.Gateway.Host; + return $"{host}:{opts.Gateway.Port}"; + } + + internal void SwitchAccountToInterestMode(string accountName) + { + _gateway.AcquireWriteLock(); + try + { + foreach (var outbound in _gateway.Out.Values) + { + outbound.Gateway ??= new Gateway(); + outbound.Gateway.OutSim ??= new System.Collections.Concurrent.ConcurrentDictionary(StringComparer.Ordinal); + outbound.Gateway.OutSim[accountName] = new OutSide + { + Mode = GatewayInterestMode.InterestOnly, + Ni = null, + }; + } + } + finally + { + _gateway.ReleaseWriteLock(); + } + } + + internal void MaybeSendSubOrUnsubToGateways(Account account, Subscription sub, bool isUnsub) + { + if (sub.Subject.Length == 0) + return; + + _gateway.AcquireReadLock(); + try + { + foreach (var gateway in _gateway.Out.Values) + gateway.EnqueueProto(GatewayHandler.GwBuildSubProto(account.Name, sub, isUnsub)); + } + finally + { + _gateway.ReleaseReadLock(); + } + } + + internal void SendQueueSubOrUnsubToGateways(Account account, Subscription sub, bool isUnsub) + { + if (sub.Queue is not { Length: > 0 }) + return; + + MaybeSendSubOrUnsubToGateways(account, sub, isUnsub); + } +} diff --git a/porting.db b/porting.db index 07133d6..1a95955 100644 Binary files a/porting.db and b/porting.db differ