feat(batch25): implement gateway interest and outbound send logic

This commit is contained in:
Joseph Doherty
2026-03-01 02:00:08 -05:00
parent 0fece7f2f3
commit 59fa600b3c
6 changed files with 271 additions and 0 deletions

View File

@@ -0,0 +1,38 @@
// Copyright 2018-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0
using System.Text;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class ClientConnection
{
internal void SendMsgToGateways(Account? account, byte[] subject, byte[]? reply, byte[] payload)
{
if (Server is not NatsServer server || account == null || subject.Length == 0)
return;
var outboundGateways = server.GetOutboundGatewayConnections();
if (outboundGateways.Count == 0)
return;
foreach (var gateway in outboundGateways)
{
if (!GatewayInterest(account, Encoding.ASCII.GetString(subject)))
continue;
var replyToSend = reply;
if (reply is { Length: > 0 } && gateway.Gateway != null)
{
var shouldMap = server.ShouldMapReplyForGatewaySend(reply, gateway.Gateway.UseOldPrefix);
if (shouldMap)
replyToSend = reply;
}
var proto = MsgHeader(subject, replyToSend, new Internal.Subscription { Sid = Encoding.ASCII.GetBytes("1") });
gateway.EnqueueProto(proto);
gateway.EnqueueProto(payload);
gateway.EnqueueProto(Encoding.ASCII.GetBytes("\r\n"));
}
}
}

View File

@@ -99,4 +99,119 @@ public sealed partial class ClientConnection
return null;
}
internal Exception? ProcessGatewayAccountUnsub(byte[] arg)
{
if (Server is not NatsServer server)
return new InvalidOperationException("gateway server unavailable");
var tokens = Encoding.ASCII.GetString(arg).Split(' ', StringSplitOptions.RemoveEmptyEntries);
if (tokens.Length < 1)
return null;
if (tokens.Length == 1)
return null;
var accountName = tokens[0];
var subject = Encoding.ASCII.GetBytes(tokens[1]);
var queue = tokens.Length > 2 ? Encoding.ASCII.GetBytes(tokens[2]) : null;
var (account, _) = server.LookupAccount(accountName);
if (account == null)
return null;
var sub = new Internal.Subscription { Subject = subject, Queue = queue };
server.SendQueueSubOrUnsubToGateways(account, sub, isUnsub: true);
return null;
}
internal Exception? ProcessGatewayAccountSub(byte[] arg)
{
if (Server is not NatsServer server)
return new InvalidOperationException("gateway server unavailable");
var tokens = Encoding.ASCII.GetString(arg).Split(' ', StringSplitOptions.RemoveEmptyEntries);
if (tokens.Length < 2)
return null;
var accountName = tokens[0];
var subject = Encoding.ASCII.GetBytes(tokens[1]);
var queue = tokens.Length > 2 ? Encoding.ASCII.GetBytes(tokens[2]) : null;
var (account, _) = server.LookupAccount(accountName);
if (account == null)
return null;
var sub = new Internal.Subscription { Subject = subject, Queue = queue };
if (queue is { Length: > 0 })
server.SendQueueSubOrUnsubToGateways(account, sub, isUnsub: false);
else
server.MaybeSendSubOrUnsubToGateways(account, sub, isUnsub: false);
return null;
}
internal Exception? ProcessGatewayRUnsub(byte[] arg)
{
var tokens = Encoding.ASCII.GetString(arg).Split(' ', StringSplitOptions.RemoveEmptyEntries);
if (tokens.Length < 2 || Server is not NatsServer server)
return null;
var accountName = tokens[0];
var subject = Encoding.ASCII.GetBytes(tokens[1]);
var queue = tokens.Length > 2 ? Encoding.ASCII.GetBytes(tokens[2]) : null;
var (account, _) = server.LookupAccount(accountName);
if (account == null)
return null;
var sub = new Internal.Subscription { Subject = subject, Queue = queue };
server.MaybeSendSubOrUnsubToGateways(account, sub, isUnsub: true);
return null;
}
internal Exception? ProcessGatewayRSub(byte[] arg)
{
var tokens = Encoding.ASCII.GetString(arg).Split(' ', StringSplitOptions.RemoveEmptyEntries);
if (tokens.Length < 2 || Server is not NatsServer server)
return null;
var accountName = tokens[0];
var subject = Encoding.ASCII.GetBytes(tokens[1]);
var queue = tokens.Length > 2 ? Encoding.ASCII.GetBytes(tokens[2]) : null;
var (account, _) = server.LookupAccount(accountName);
if (account == null)
return null;
var sub = new Internal.Subscription { Subject = subject, Queue = queue };
server.MaybeSendSubOrUnsubToGateways(account, sub, isUnsub: false);
return null;
}
internal bool GatewayInterest(Account? account, string subject)
{
if (account == null || string.IsNullOrWhiteSpace(subject))
return false;
if (Gateway?.OutSim == null)
return true;
if (!Gateway.OutSim.TryGetValue(account.Name, out var outSi))
return true;
outSi.AcquireReadLock();
try
{
if (outSi.Mode == GatewayInterestMode.InterestOnly && outSi.Sl != null)
{
var match = outSi.Sl.Match(subject);
return match.PSubs.Count > 0 || match.QSubs.Count > 0;
}
return outSi.Ni == null || !outSi.Ni.Contains(subject);
}
finally
{
outSi.ReleaseReadLock();
}
}
}

View File

@@ -9,6 +9,9 @@ namespace ZB.MOM.NatsNet.Server;
internal static class GatewayHandler
{
internal static readonly byte[] GwReplyPrefix = Encoding.ASCII.GetBytes("_GR_.");
internal static readonly byte[] OldGwReplyPrefix = Encoding.ASCII.GetBytes("$GR.");
private static readonly TimeSpan DefaultSolicitGatewaysDelay = TimeSpan.FromSeconds(1);
private static long _gatewaySolicitDelayTicks = DefaultSolicitGatewaysDelay.Ticks;
private static int _doNotForceInterestOnlyMode;
@@ -77,6 +80,25 @@ internal static class GatewayHandler
return Encoding.ASCII.GetBytes($"{op} {accountName} {subject}\r\n");
}
internal static bool IsGWRoutedReply(byte[] subject)
{
return subject.AsSpan().StartsWith(GwReplyPrefix) || subject.AsSpan().StartsWith(OldGwReplyPrefix);
}
internal static (bool IsRouted, bool IsOldPrefix) IsGWRoutedSubjectAndIsOldPrefix(byte[] subject)
{
if (subject.AsSpan().StartsWith(OldGwReplyPrefix))
return (true, true);
if (subject.AsSpan().StartsWith(GwReplyPrefix))
return (true, false);
return (false, false);
}
internal static bool HasGWRoutedReplyPrefix(byte[] subject)
{
return IsGWRoutedReply(subject);
}
private static byte[] GetHash(string value, int len)
{
var bytes = Encoding.UTF8.GetBytes(value);

View File

@@ -262,6 +262,25 @@ internal sealed class SrvGateway
{
Outo = [.. Out.Values.OrderBy(c => c.GetRttValue()).ThenBy(c => c.Cid)];
}
internal void OrderOutboundConnections()
{
_lock.EnterWriteLock();
try { OrderOutboundConnectionsLocked(); }
finally { _lock.ExitWriteLock(); }
}
internal bool ShouldMapReplyForGatewaySend(byte[] reply, bool useOldPrefix)
{
if (reply.Length == 0)
return false;
if (useOldPrefix)
return !GatewayHandler.IsGWRoutedSubjectAndIsOldPrefix(reply).IsRouted;
return !GatewayHandler.IsGWRoutedReply(reply);
}
}
/// <summary>

View File

@@ -0,0 +1,77 @@
// Copyright 2018-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0
using System.Net;
using System.Text;
using ZB.MOM.NatsNet.Server.Internal;
namespace ZB.MOM.NatsNet.Server;
public sealed partial class NatsServer
{
internal bool ShouldMapReplyForGatewaySend(byte[] reply, bool useOldPrefix)
{
return _gateway.ShouldMapReplyForGatewaySend(reply, useOldPrefix);
}
internal IReadOnlyList<ClientConnection> GetInboundGatewayConnections()
{
_gateway.AcquireReadLock();
try { return [.. _gateway.In.Values]; }
finally { _gateway.ReleaseReadLock(); }
}
internal string GatewayAddr()
{
var opts = GetOpts();
var host = string.IsNullOrWhiteSpace(opts.Gateway.Host) ? opts.Host : opts.Gateway.Host;
return $"{host}:{opts.Gateway.Port}";
}
internal void SwitchAccountToInterestMode(string accountName)
{
_gateway.AcquireWriteLock();
try
{
foreach (var outbound in _gateway.Out.Values)
{
outbound.Gateway ??= new Gateway();
outbound.Gateway.OutSim ??= new System.Collections.Concurrent.ConcurrentDictionary<string, OutSide>(StringComparer.Ordinal);
outbound.Gateway.OutSim[accountName] = new OutSide
{
Mode = GatewayInterestMode.InterestOnly,
Ni = null,
};
}
}
finally
{
_gateway.ReleaseWriteLock();
}
}
internal void MaybeSendSubOrUnsubToGateways(Account account, Subscription sub, bool isUnsub)
{
if (sub.Subject.Length == 0)
return;
_gateway.AcquireReadLock();
try
{
foreach (var gateway in _gateway.Out.Values)
gateway.EnqueueProto(GatewayHandler.GwBuildSubProto(account.Name, sub, isUnsub));
}
finally
{
_gateway.ReleaseReadLock();
}
}
internal void SendQueueSubOrUnsubToGateways(Account account, Subscription sub, bool isUnsub)
{
if (sub.Queue is not { Length: > 0 })
return;
MaybeSendSubOrUnsubToGateways(account, sub, isUnsub);
}
}