diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs index efbaa61..ec6e7d0 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.cs @@ -358,6 +358,12 @@ public sealed partial class Account : INatsAccount /// internal byte[]? ServiceImportReply { get; set; } + /// + /// Gateway reply mapping table used for routed reply restoration. + /// Mirrors Go gwReplyMapping. + /// + internal GwReplyMapping GwReplyMapping { get; } = new(); + /// /// Subscription ID counter for internal use. /// Mirrors Go isid uint64. diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Gateways.Messages.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Gateways.Messages.cs new file mode 100644 index 0000000..99370a1 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Gateways.Messages.cs @@ -0,0 +1,224 @@ +// Copyright 2018-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using System.Text; +using System.Text.Json; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class ClientConnection +{ + internal void SendMsgToGateways(Account? account, byte[] subject, byte[]? reply, byte[] payload) + { + if (Server is not NatsServer server || account == null || subject.Length == 0) + return; + + var outboundGateways = server.GetOutboundGatewayConnections(); + if (outboundGateways.Count == 0) + return; + + foreach (var gateway in outboundGateways) + { + if (!GatewayInterest(account, Encoding.ASCII.GetString(subject))) + continue; + + var replyToSend = reply; + if (reply is { Length: > 0 } && gateway.Gateway != null) + { + var shouldMap = server.ShouldMapReplyForGatewaySend(reply, gateway.Gateway.UseOldPrefix); + if (shouldMap) + replyToSend = reply; + } + + var proto = MsgHeader(subject, replyToSend, new Internal.Subscription { Sid = Encoding.ASCII.GetBytes("1") }); + gateway.EnqueueProto(proto); + gateway.EnqueueProto(payload); + gateway.EnqueueProto(Encoding.ASCII.GetBytes("\r\n")); + } + } + + internal void SendAccountUnsubToGateway(byte[] accountName) + { + if (accountName.Length == 0) + return; + + lock (_mu) + { + Gateway ??= new Gateway(); + Gateway.InSim ??= new Dictionary(StringComparer.Ordinal); + + var key = Encoding.ASCII.GetString(accountName); + if (!Gateway.InSim.TryGetValue(key, out var entry) || entry != null) + { + Gateway.InSim[key] = null!; + var proto = Encoding.ASCII.GetBytes($"A- {key}\r\n"); + EnqueueProto(proto); + if (Trace) + TraceOutOp(string.Empty, proto.AsSpan(0, proto.Length - 2).ToArray()); + } + } + } + + internal bool HandleGatewayReply(byte[] msg) + { + if (Server is not NatsServer server || ParseCtx.Pa.Subject is not { Length: > 0 } originalSubject) + return false; + + var (isRoutedReply, isOldPrefix) = GatewayHandler.IsGWRoutedSubjectAndIsOldPrefix(originalSubject); + if (!isRoutedReply) + return false; + + ParseCtx.Pa.Subject = GatewayHandler.GetSubjectFromGWRoutedReply(originalSubject, isOldPrefix); + ParseCtx.Pa.PaCache = [.. ParseCtx.Pa.Account ?? [], (byte)' ', .. ParseCtx.Pa.Subject]; + + var (account, result) = GetAccAndResultFromCache(); + if (account is not Account concreteAccount) + { + Debugf("Unknown account {0} for gateway message on subject: {1}", + ParseCtx.Pa.Account is { Length: > 0 } accountName ? Encoding.ASCII.GetString(accountName) : string.Empty, + Encoding.ASCII.GetString(ParseCtx.Pa.Subject)); + + if (ParseCtx.Pa.Account is { Length: > 0 } gatewayAccount) + server.GatewayHandleAccountNoInterest(this, gatewayAccount); + return true; + } + + if (result != null && (result.PSubs.Count + result.QSubs.Count) > 0) + ProcessMsgResults(concreteAccount, result, msg, null, ParseCtx.Pa.Subject, ParseCtx.Pa.Reply, PmrFlags.None); + + if (!IsServiceReply(ParseCtx.Pa.Subject)) + SendMsgToGateways(concreteAccount, ParseCtx.Pa.Subject, ParseCtx.Pa.Reply, msg); + + return true; + } + + internal void ProcessInboundGatewayMsg(byte[] msg) + { + _in.Msgs++; + _in.Bytes += Math.Max(0, msg.Length - 2); + + if (Opts.Verbose) + SendOK(); + + if (Server is not NatsServer server || ParseCtx.Pa.Subject is not { Length: > 0 }) + return; + + if (HandleGatewayReply(msg)) + return; + + var (account, result) = GetAccAndResultFromCache(); + if (account is not Account concreteAccount) + { + Debugf("Unknown account {0} for gateway message on subject: {1}", + ParseCtx.Pa.Account is { Length: > 0 } accountName ? Encoding.ASCII.GetString(accountName) : string.Empty, + Encoding.ASCII.GetString(ParseCtx.Pa.Subject)); + + if (ParseCtx.Pa.Account is { Length: > 0 } gatewayAccount) + server.GatewayHandleAccountNoInterest(this, gatewayAccount); + return; + } + + var noInterest = result == null || result.PSubs.Count == 0; + if (noInterest) + { + server.GatewayHandleSubjectNoInterest(this, concreteAccount, ParseCtx.Pa.Account ?? Encoding.ASCII.GetBytes(concreteAccount.Name), ParseCtx.Pa.Subject); + if (ParseCtx.Pa.Queues is null || ParseCtx.Pa.Queues.Count == 0) + return; + } + + ProcessMsgResults(concreteAccount, result, msg, null, ParseCtx.Pa.Subject, ParseCtx.Pa.Reply, PmrFlags.None); + } + + internal void GatewayAllSubsReceiveStart(ServerInfo info) + { + var account = GatewayHandler.GetAccountFromGatewayCommand(this, info, "start"); + if (string.IsNullOrWhiteSpace(account)) + return; + + Debugf("Gateway {0}: switching account {1} to {2} mode", + info.Gateway ?? string.Empty, account, GatewayInterestMode.InterestOnly.String()); + + Gateway ??= new Gateway(); + Gateway.OutSim ??= new System.Collections.Concurrent.ConcurrentDictionary(StringComparer.Ordinal); + + var outSide = Gateway.OutSim.GetOrAdd(account, _ => new OutSide + { + Sl = Internal.DataStructures.SubscriptionIndex.NewSublistWithCache(), + }); + + outSide.AcquireWriteLock(); + try { outSide.Mode = GatewayInterestMode.Transitioning; } + finally { outSide.ReleaseWriteLock(); } + } + + internal void GatewayAllSubsReceiveComplete(ServerInfo info) + { + var account = GatewayHandler.GetAccountFromGatewayCommand(this, info, "complete"); + if (string.IsNullOrWhiteSpace(account)) + return; + + if (Gateway?.OutSim == null || !Gateway.OutSim.TryGetValue(account, out var outSide)) + return; + + outSide.AcquireWriteLock(); + try + { + outSide.Ni = null; + outSide.Mode = GatewayInterestMode.InterestOnly; + } + finally + { + outSide.ReleaseWriteLock(); + } + + Debugf("Gateway {0}: switching account {1} to {2} mode complete", + info.Gateway ?? string.Empty, account, GatewayInterestMode.InterestOnly.String()); + } + + internal void GatewaySwitchAccountToSendAllSubs(InSide inSide, string accountName) + { + if (Server is not NatsServer server || string.IsNullOrWhiteSpace(accountName)) + return; + + inSide.Ni = null; + inSide.Mode = GatewayInterestMode.Transitioning; + + var remoteGatewayName = Gateway?.Name ?? string.Empty; + Debugf("Gateway {0}: switching account {1} to {2} mode", + remoteGatewayName, accountName, GatewayInterestMode.InterestOnly.String()); + + void SendCommand(byte command, bool withLock) + { + var info = new ServerInfo + { + Gateway = server.GetGatewayName(), + GatewayCmd = command, + GatewayCmdPayload = Encoding.ASCII.GetBytes(accountName), + }; + + var infoProto = NatsServer.GenerateInfoJson(info); + if (withLock) + { + lock (_mu) + { + EnqueueProto(infoProto); + } + return; + } + + EnqueueProto(infoProto); + } + + SendCommand(GatewayHandler.GatewayCmdAllSubsStart, withLock: false); + + _ = server.StartGoRoutine(() => + { + server.SendAccountSubsToGateway(this, accountName); + SendCommand(GatewayHandler.GatewayCmdAllSubsComplete, withLock: true); + + Debugf("Gateway {0}: switching account {1} to {2} mode complete", + remoteGatewayName, accountName, GatewayInterestMode.InterestOnly.String()); + }); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Gateways.Protocol.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Gateways.Protocol.cs new file mode 100644 index 0000000..b20ffa5 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.Gateways.Protocol.cs @@ -0,0 +1,217 @@ +// Copyright 2018-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using System.Text; +using System.Text.Json; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class ClientConnection +{ + internal Exception? SendGatewayConnect() + { + if (Server is not NatsServer server) + return new InvalidOperationException("gateway server unavailable"); + + var opts = server.GetOpts(); + var connect = new ConnectInfo + { + Echo = true, + Verbose = false, + Pedantic = false, + User = opts.Gateway.Username, + Pass = opts.Gateway.Password, + Tls = opts.Gateway.TlsConfig != null, + Name = server.ID(), + Headers = server.SupportsHeaders(), + Gateway = opts.Gateway.Name, + Dynamic = server.IsClusterNameDynamic(), + }; + + var payload = JsonSerializer.Serialize(connect); + EnqueueProto(Encoding.ASCII.GetBytes($"CONNECT {payload}\r\n")); + return null; + } + + internal Exception? ProcessGatewayConnect(byte[] arg) + { + ConnectInfo? connect; + try + { + connect = JsonSerializer.Deserialize(arg); + } + catch (Exception ex) + { + return ex; + } + + if (connect == null) + return new InvalidOperationException("invalid gateway connect payload"); + + Gateway ??= new Gateway(); + Gateway.Connected = true; + + if (!string.IsNullOrWhiteSpace(connect.Gateway)) + Gateway.Name = connect.Gateway; + if (!string.IsNullOrWhiteSpace(connect.Name)) + Gateway.RemoteName = connect.Name; + + Headers = connect.Headers; + return null; + } + + internal Exception? ProcessGatewayInfo(byte[] arg) + { + ServerInfo? info; + try + { + info = JsonSerializer.Deserialize(arg); + } + catch (Exception ex) + { + return ex; + } + + if (info == null) + return new InvalidOperationException("invalid gateway info payload"); + + return ProcessGatewayInfo(info); + } + + internal Exception? ProcessGatewayInfo(ServerInfo info) + { + if (Server is not NatsServer server) + return new InvalidOperationException("gateway server unavailable"); + + Gateway ??= new Gateway(); + + if (!string.IsNullOrWhiteSpace(info.Gateway)) + Gateway.Name = info.Gateway; + + if (!string.IsNullOrWhiteSpace(info.GatewayUrl) && Uri.TryCreate(info.GatewayUrl, UriKind.Absolute, out var url)) + Gateway.ConnectUrl = url; + + Gateway.UseOldPrefix = info.GatewayNrp; + Gateway.InterestOnlyMode = info.GatewayIom; + + if (info.GatewayUrls is { Length: > 0 }) + server.ProcessImplicitGateway(Gateway.Name, info.GatewayUrls); + + return null; + } + + internal Exception? ProcessGatewayAccountUnsub(byte[] arg) + { + if (Server is not NatsServer server) + return new InvalidOperationException("gateway server unavailable"); + + var tokens = Encoding.ASCII.GetString(arg).Split(' ', StringSplitOptions.RemoveEmptyEntries); + if (tokens.Length < 1) + return null; + + if (tokens.Length == 1) + return null; + + var accountName = tokens[0]; + var subject = Encoding.ASCII.GetBytes(tokens[1]); + var queue = tokens.Length > 2 ? Encoding.ASCII.GetBytes(tokens[2]) : null; + + var (account, _) = server.LookupAccount(accountName); + if (account == null) + return null; + + var sub = new Internal.Subscription { Subject = subject, Queue = queue }; + server.SendQueueSubOrUnsubToGateways(account, sub, isUnsub: true); + return null; + } + + internal Exception? ProcessGatewayAccountSub(byte[] arg) + { + if (Server is not NatsServer server) + return new InvalidOperationException("gateway server unavailable"); + + var tokens = Encoding.ASCII.GetString(arg).Split(' ', StringSplitOptions.RemoveEmptyEntries); + if (tokens.Length < 2) + return null; + + var accountName = tokens[0]; + var subject = Encoding.ASCII.GetBytes(tokens[1]); + var queue = tokens.Length > 2 ? Encoding.ASCII.GetBytes(tokens[2]) : null; + + var (account, _) = server.LookupAccount(accountName); + if (account == null) + return null; + + var sub = new Internal.Subscription { Subject = subject, Queue = queue }; + if (queue is { Length: > 0 }) + server.SendQueueSubOrUnsubToGateways(account, sub, isUnsub: false); + else + server.MaybeSendSubOrUnsubToGateways(account, sub, isUnsub: false); + + return null; + } + + internal Exception? ProcessGatewayRUnsub(byte[] arg) + { + var tokens = Encoding.ASCII.GetString(arg).Split(' ', StringSplitOptions.RemoveEmptyEntries); + if (tokens.Length < 2 || Server is not NatsServer server) + return null; + + var accountName = tokens[0]; + var subject = Encoding.ASCII.GetBytes(tokens[1]); + var queue = tokens.Length > 2 ? Encoding.ASCII.GetBytes(tokens[2]) : null; + var (account, _) = server.LookupAccount(accountName); + if (account == null) + return null; + + var sub = new Internal.Subscription { Subject = subject, Queue = queue }; + server.MaybeSendSubOrUnsubToGateways(account, sub, isUnsub: true); + return null; + } + + internal Exception? ProcessGatewayRSub(byte[] arg) + { + var tokens = Encoding.ASCII.GetString(arg).Split(' ', StringSplitOptions.RemoveEmptyEntries); + if (tokens.Length < 2 || Server is not NatsServer server) + return null; + + var accountName = tokens[0]; + var subject = Encoding.ASCII.GetBytes(tokens[1]); + var queue = tokens.Length > 2 ? Encoding.ASCII.GetBytes(tokens[2]) : null; + var (account, _) = server.LookupAccount(accountName); + if (account == null) + return null; + + var sub = new Internal.Subscription { Subject = subject, Queue = queue }; + server.MaybeSendSubOrUnsubToGateways(account, sub, isUnsub: false); + return null; + } + + internal bool GatewayInterest(Account? account, string subject) + { + if (account == null || string.IsNullOrWhiteSpace(subject)) + return false; + + if (Gateway?.OutSim == null) + return true; + + if (!Gateway.OutSim.TryGetValue(account.Name, out var outSi)) + return true; + + outSi.AcquireReadLock(); + try + { + if (outSi.Mode == GatewayInterestMode.InterestOnly && outSi.Sl != null) + { + var match = outSi.Sl.Match(subject); + return match.PSubs.Count > 0 || match.QSubs.Count > 0; + } + + return outSi.Ni == null || !outSi.Ni.Contains(subject); + } + finally + { + outSi.ReleaseReadLock(); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs index 1eb2491..321524d 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs @@ -114,6 +114,7 @@ public sealed partial class ClientConnection // Client options (from CONNECT message). internal ClientOptions Opts = ClientOptions.Default; internal Route? Route; + internal Gateway? Gateway; internal WebsocketConnection? Ws; // Flags and state. @@ -134,6 +135,7 @@ public sealed partial class ClientConnection internal Dictionary? Replies; internal Dictionary? Pcd; // pending clients with data to flush internal Dictionary? DArray; // denied subscribe patterns + internal GwReplyMapping GwReplyMapping = new(); // Outbound state (simplified — full write loop ported when Server is available). internal long OutPb; // pending bytes diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayHandler.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayHandler.cs new file mode 100644 index 0000000..5326925 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayHandler.cs @@ -0,0 +1,142 @@ +// Copyright 2018-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using System.Security.Cryptography; +using System.Text; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +internal static class GatewayHandler +{ + internal static readonly byte[] GwReplyPrefix = Encoding.ASCII.GetBytes("_GR_."); + internal static readonly byte[] OldGwReplyPrefix = Encoding.ASCII.GetBytes("$GR."); + internal const int GwHashLen = 6; + internal const int GwClusterOffset = 5; + internal const int GwServerOffset = GwClusterOffset + GwHashLen + 1; + internal const int GwSubjectOffset = GwServerOffset + GwHashLen + 1; + internal const int OldGwReplyPrefixLen = 4; + internal const int OldGwReplyStart = OldGwReplyPrefixLen + 5; + internal const int GatewayCmdAllSubsStart = 2; + internal const int GatewayCmdAllSubsComplete = 3; + internal const int GatewayMaxRUnsubBeforeSwitch = 1000; + + private static readonly TimeSpan DefaultSolicitGatewaysDelay = TimeSpan.FromSeconds(1); + private static long _gatewaySolicitDelayTicks = DefaultSolicitGatewaysDelay.Ticks; + private static int _doNotForceInterestOnlyMode; + + internal static void SetGatewaysSolicitDelay(TimeSpan delay) + { + Interlocked.Exchange(ref _gatewaySolicitDelayTicks, delay.Ticks); + } + + internal static void ResetGatewaysSolicitDelay() + { + Interlocked.Exchange(ref _gatewaySolicitDelayTicks, DefaultSolicitGatewaysDelay.Ticks); + } + + internal static TimeSpan GetGatewaysSolicitDelay() + { + return TimeSpan.FromTicks(Interlocked.Read(ref _gatewaySolicitDelayTicks)); + } + + internal static void GatewayDoNotForceInterestOnlyMode(bool doNotForce) + { + Interlocked.Exchange(ref _doNotForceInterestOnlyMode, doNotForce ? 1 : 0); + } + + internal static bool DoNotForceInterestOnlyMode() + { + return Interlocked.CompareExchange(ref _doNotForceInterestOnlyMode, 0, 0) != 0; + } + + internal static Exception? ValidateGatewayOptions(ServerOptions options) + { + var gateway = options.Gateway; + if (string.IsNullOrWhiteSpace(gateway.Name) || gateway.Port == 0) + return null; + + if (gateway.Name.Contains(' ')) + return ServerErrors.ErrGatewayNameHasSpaces; + + var names = new HashSet(StringComparer.Ordinal); + foreach (var remote in gateway.Gateways) + { + if (string.IsNullOrWhiteSpace(remote.Name)) + return new InvalidOperationException("gateway remote requires a name"); + + if (!names.Add(remote.Name)) + return new InvalidOperationException($"duplicate gateway remote: {remote.Name}"); + + if (remote.Urls.Count == 0) + return new InvalidOperationException($"gateway remote {remote.Name} has no URLs"); + } + + return null; + } + + internal static byte[] GetGWHash(string value) => GetHash(value, 6); + + internal static byte[] GetOldHash(string value) => GetHash(value, 4); + + internal static byte[] GwBuildSubProto(string accountName, Subscription subscription, bool isUnsub = false) + { + var op = isUnsub ? "RS-" : "RS+"; + var subject = Encoding.ASCII.GetString(subscription.Subject); + if (subscription.Queue is { Length: > 0 } queue) + return Encoding.ASCII.GetBytes($"{op} {accountName} {subject} {Encoding.ASCII.GetString(queue)}\r\n"); + + return Encoding.ASCII.GetBytes($"{op} {accountName} {subject}\r\n"); + } + + internal static bool IsGWRoutedReply(byte[] subject) + { + return subject.AsSpan().StartsWith(GwReplyPrefix) || subject.AsSpan().StartsWith(OldGwReplyPrefix); + } + + internal static (bool IsRouted, bool IsOldPrefix) IsGWRoutedSubjectAndIsOldPrefix(byte[] subject) + { + if (subject.AsSpan().StartsWith(OldGwReplyPrefix)) + return (true, true); + if (subject.AsSpan().StartsWith(GwReplyPrefix)) + return (true, false); + return (false, false); + } + + internal static bool HasGWRoutedReplyPrefix(byte[] subject) + { + return IsGWRoutedReply(subject); + } + + internal static byte[] GetSubjectFromGWRoutedReply(byte[] reply, bool isOldPrefix) + { + if (isOldPrefix) + return reply.Length > OldGwReplyStart ? reply[OldGwReplyStart..] : []; + + return reply.Length > GwSubjectOffset ? reply[GwSubjectOffset..] : []; + } + + internal static string GetAccountFromGatewayCommand(ClientConnection connection, ServerInfo info, string command) + { + if (info.GatewayCmdPayload == null || info.GatewayCmdPayload.Length == 0) + { + connection.SendErrAndErr($"Account absent from receive-all-subscriptions-{command} command"); + connection.CloseConnection(ClosedState.ProtocolViolation); + return string.Empty; + } + + return Encoding.ASCII.GetString(info.GatewayCmdPayload); + } + + private static byte[] GetHash(string value, int len) + { + var bytes = Encoding.UTF8.GetBytes(value); + var hash = SHA256.HashData(bytes); + var encoded = Convert.ToBase64String(hash) + .Replace('+', '-') + .Replace('/', '_') + .TrimEnd('='); + + return Encoding.ASCII.GetBytes(encoded[..len]); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayTypes.cs index c405948..1b936d2 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Gateway/GatewayTypes.cs @@ -13,7 +13,9 @@ // // Adapted from server/gateway.go in the NATS server Go source. +using System.Text; using System.Threading; +using System.Linq; using ZB.MOM.NatsNet.Server.Internal; using ZB.MOM.NatsNet.Server.Internal.DataStructures; @@ -54,6 +56,20 @@ public enum GatewayInterestMode : byte CacheFlushed = 3, } +internal static class GatewayInterestModeExtensions +{ + internal static string String(this GatewayInterestMode mode) + { + return mode switch + { + GatewayInterestMode.Optimistic => "Optimistic", + GatewayInterestMode.InterestOnly => "Interest-Only", + GatewayInterestMode.Transitioning => "Transitioning", + _ => "Unknown", + }; + } +} + /// /// Server-level gateway state kept on the instance. /// Replaces the stub that was in NatsServerTypes.cs. @@ -130,6 +146,71 @@ internal sealed class SrvGateway public Lock PasiLock => _pasiLock; + internal byte[] GenerateInfoJSON() + { + Info ??= new ServerInfo(); + Info.Gateway = Name; + Info.GatewayUrl = Url; + Info.GatewayUrls = [.. Urls.GetAsStringSlice()]; + InfoJson = NatsServer.GenerateInfoJson(Info); + return InfoJson; + } + + internal bool HasInbound(string gatewayName) + { + if (string.IsNullOrWhiteSpace(gatewayName)) + return false; + + _lock.EnterReadLock(); + try + { + foreach (var inbound in In.Values) + { + if (string.Equals(inbound.Gateway?.Name, gatewayName, StringComparison.Ordinal)) + return true; + } + } + finally + { + _lock.ExitReadLock(); + } + + return false; + } + + internal void UpdateRemotesTLSConfig(IReadOnlyList remotes) + { + if (remotes.Count == 0) + return; + + _lock.EnterWriteLock(); + try + { + foreach (var remote in remotes) + { + if (!Remotes.TryGetValue(remote.Name, out var cfg)) + continue; + + cfg.AcquireWriteLock(); + try + { + cfg.RemoteOpts ??= remote.Clone(); + cfg.RemoteOpts.TlsConfig = remote.TlsConfig; + cfg.RemoteOpts.TlsTimeout = remote.TlsTimeout; + cfg.RemoteOpts.TlsConfigOpts = remote.TlsConfigOpts; + } + finally + { + cfg.ReleaseWriteLock(); + } + } + } + finally + { + _lock.ExitWriteLock(); + } + } + // ------------------------------------------------------------------------- // Recent subscription tracking (thread-safe map) // ------------------------------------------------------------------------- @@ -176,6 +257,46 @@ internal sealed class SrvGateway public void ReleaseReadLock() => _lock.ExitReadLock(); public void AcquireWriteLock() => _lock.EnterWriteLock(); public void ReleaseWriteLock() => _lock.ExitWriteLock(); + + internal void OrderOutboundConnectionsLocked() + { + Outo = [.. Out.Values.OrderBy(c => c.GetRttValue()).ThenBy(c => c.Cid)]; + } + + internal void OrderOutboundConnections() + { + _lock.EnterWriteLock(); + try { OrderOutboundConnectionsLocked(); } + finally { _lock.ExitWriteLock(); } + } + + internal bool ShouldMapReplyForGatewaySend(byte[] reply, bool useOldPrefix) + { + if (reply.Length == 0) + return false; + + if (useOldPrefix) + return !GatewayHandler.IsGWRoutedSubjectAndIsOldPrefix(reply).IsRouted; + + return !GatewayHandler.IsGWRoutedReply(reply); + } + + internal byte[] GetClusterHash() + { + _lock.EnterReadLock(); + try + { + if (ReplyPfx.Length < GatewayHandler.GwClusterOffset + GatewayHandler.GwHashLen) + return []; + + return ReplyPfx[GatewayHandler.GwClusterOffset..(GatewayHandler.GwClusterOffset + GatewayHandler.GwHashLen)]; + } + finally + { + _lock.ExitReadLock(); + } + } + } /// @@ -219,6 +340,7 @@ internal sealed class GatewayCfg /// TLS server name override for SNI. public string TlsName { get; set; } = string.Empty; + /// TLS server name override for SNI. /// True if this remote was discovered via gossip (not configured). public bool Implicit { get; set; } @@ -228,6 +350,81 @@ internal sealed class GatewayCfg // Forwarded properties from RemoteGatewayOpts public string Name { get => RemoteOpts?.Name ?? string.Empty; } + internal void BumpConnAttempts() + { + _lock.EnterWriteLock(); + try { ConnAttempts++; } + finally { _lock.ExitWriteLock(); } + } + + internal int GetConnAttempts() + { + _lock.EnterReadLock(); + try { return ConnAttempts; } + finally { _lock.ExitReadLock(); } + } + + internal void ResetConnAttempts() + { + _lock.EnterWriteLock(); + try { ConnAttempts = 0; } + finally { _lock.ExitWriteLock(); } + } + + internal bool IsImplicit() + { + _lock.EnterReadLock(); + try { return Implicit; } + finally { _lock.ExitReadLock(); } + } + + internal IReadOnlyCollection GetUrls() + { + _lock.EnterReadLock(); + try { return [.. Urls.Values]; } + finally { _lock.ExitReadLock(); } + } + + internal string[] GetUrlsAsStrings() + { + _lock.EnterReadLock(); + try { return [.. Urls.Keys]; } + finally { _lock.ExitReadLock(); } + } + + internal void UpdateUrls(IEnumerable urls) + { + _lock.EnterWriteLock(); + try + { + Urls.Clear(); + foreach (var url in urls) + Urls[url.ToString()] = url; + } + finally { _lock.ExitWriteLock(); } + } + + internal void SaveTLSHostname(Uri url) + { + if (string.IsNullOrWhiteSpace(url.Host)) + return; + + _lock.EnterWriteLock(); + try { TlsName = url.Host; } + finally { _lock.ExitWriteLock(); } + } + + internal void AddUrls(IEnumerable urls) + { + _lock.EnterWriteLock(); + try + { + foreach (var url in urls) + Urls[url.ToString()] = url; + } + finally { _lock.ExitWriteLock(); } + } + // ------------------------------------------------------------------------- // Lock helpers // ------------------------------------------------------------------------- @@ -378,7 +575,24 @@ internal sealed class GwReplyMapping /// public (byte[] Subject, bool Found) Get(byte[] subject) { - // TODO: session 16 — implement mapping lookup - return (subject, false); + var key = Encoding.UTF8.GetString(subject); + if (!Mapping.TryGetValue(key, out var entry)) + return (subject, false); + + if (entry.Exp <= DateTime.UtcNow.Ticks) + { + Mapping.Remove(key); + return (subject, false); + } + + return (Encoding.UTF8.GetBytes(entry.Ms), true); + } +} + +internal static class RemoteGatewayOptsExtensions +{ + internal static RemoteGatewayOpts Clone(this RemoteGatewayOpts source) + { + return source.Clone(); } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.ConfigAndStartup.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.ConfigAndStartup.cs new file mode 100644 index 0000000..35fceca --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.ConfigAndStartup.cs @@ -0,0 +1,230 @@ +// Copyright 2018-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using System.Net; +using System.Net.Sockets; +using System.Text; +using System.Threading; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + internal Exception? NewGateway(ServerOptions options) + { + var validationErr = GatewayHandler.ValidateGatewayOptions(options); + if (validationErr != null) + return validationErr; + + if (string.IsNullOrWhiteSpace(options.Gateway.Name) || options.Gateway.Port == 0) + return null; + + _gateway.AcquireWriteLock(); + try + { + _gateway.Enabled = true; + _gateway.Name = options.Gateway.Name; + _gateway.RejectUnknown = options.Gateway.RejectUnknown; + _gateway.Remotes.Clear(); + _gateway.Out.Clear(); + _gateway.Outo.Clear(); + _gateway.In.Clear(); + _gateway.OwnCfgUrls = []; + + foreach (var remote in options.Gateway.Gateways) + { + var cfg = new GatewayCfg + { + RemoteOpts = remote.Clone(), + Hash = GatewayHandler.GetGWHash(remote.Name), + OldHash = GatewayHandler.GetOldHash(remote.Name), + Urls = remote.Urls.ToDictionary(u => u.ToString(), u => u, StringComparer.Ordinal), + }; + + _gateway.Remotes[remote.Name] = cfg; + + foreach (var url in remote.Urls) + _gateway.OwnCfgUrls.Add(url.ToString()); + } + + var info = CopyInfo(); + info.Gateway = options.Gateway.Name; + info.GatewayUrl = _gateway.Url; + info.GatewayUrls = [.. _gateway.Urls.GetAsStringSlice()]; + _gateway.Info = info; + _gateway.InfoJson = GenerateInfoJson(info); + + _gateway.SIdHash = GatewayHandler.GetGWHash(_info.Id); + _gateway.OldHash = GatewayHandler.GetOldHash(_info.Id); + var clusterHash = GatewayHandler.GetGWHash(options.Gateway.Name); + _gateway.ReplyPfx = Encoding.ASCII.GetBytes($"_GR_.{Encoding.ASCII.GetString(clusterHash)}.{Encoding.ASCII.GetString(_gateway.SIdHash)}."); + _gateway.OldReplyPfx = Encoding.ASCII.GetBytes($"$GR.{Encoding.ASCII.GetString(_gateway.OldHash)}."); + } + finally + { + _gateway.ReleaseWriteLock(); + } + + return null; + } + + internal Exception? StartGateways() + { + if (!_gateway.Enabled) + return null; + + var hostPortErr = SetGatewayInfoHostPort(); + if (hostPortErr != null) + return hostPortErr; + + SolicitGateways(); + return StartGatewayAcceptLoop(); + } + + internal Exception? StartGatewayAcceptLoop() + { + if (_gatewayListener == null) + { + var opts = GetOpts(); + var hp = $"{opts.Gateway.Host}:{opts.Gateway.Port}"; + + _mu.EnterWriteLock(); + try + { + var parts = hp.Split(':', 2); + var host = parts[0]; + var port = parts.Length > 1 ? int.Parse(parts[1]) : 0; + var addr = string.IsNullOrWhiteSpace(host) || host == "0.0.0.0" + ? IPAddress.Any + : (host == "::" ? IPAddress.IPv6Any : IPAddress.Parse(host)); + + _gatewayListener = new TcpListener(addr, port); + _gatewayListener.Start(); + } + catch (Exception ex) + { + _gatewayListenerErr = ex; + return ex; + } + finally + { + _mu.ExitWriteLock(); + } + } + + if (!StartGoRoutine(() => Noticef("Gateway accept loop started"))) + return new InvalidOperationException("unable to start gateway accept loop"); + + return null; + } + + internal Exception? SetGatewayInfoHostPort() + { + var opts = GetOpts(); + var host = opts.Gateway.Host; + var port = opts.Gateway.Port; + + if (!string.IsNullOrWhiteSpace(opts.Gateway.Advertise)) + { + var (advHost, advPort, advErr) = Internal.ServerUtilities.ParseHostPort(opts.Gateway.Advertise, port); + if (advErr != null) + return advErr; + + host = advHost; + port = advPort; + } + + var scheme = opts.Gateway.TlsConfig != null ? "tls" : "nats"; + var url = $"{scheme}://{host}:{port}"; + + _gateway.AcquireWriteLock(); + try + { + _gateway.Url = url; + _gateway.Info ??= CopyInfo(); + _gateway.Info.Gateway = opts.Gateway.Name; + _gateway.Info.GatewayUrl = url; + _gateway.Info.GatewayUrls = [.. _gateway.Urls.GetAsStringSlice()]; + _gateway.InfoJson = GenerateInfoJson(_gateway.Info); + } + finally + { + _gateway.ReleaseWriteLock(); + } + + return null; + } + + internal void SolicitGateways() + { + if (!_gateway.Enabled) + return; + + var delay = GatewayHandler.GetGatewaysSolicitDelay(); + if (delay > TimeSpan.Zero) + Thread.Sleep(delay); + + List remotes; + _gateway.AcquireReadLock(); + try + { + remotes = [.. _gateway.Remotes.Values]; + } + finally + { + _gateway.ReleaseReadLock(); + } + + foreach (var cfg in remotes) + SolicitGateway(cfg, firstConnect: true); + } + + internal void ReconnectGateway(GatewayCfg cfg) + { + SolicitGateway(cfg, firstConnect: false); + } + + internal void SolicitGateway(GatewayCfg cfg, bool firstConnect) + { + _ = firstConnect; + + if (cfg.RemoteOpts == null || cfg.RemoteOpts.Urls.Count == 0) + return; + + if (_gateway.HasInbound(cfg.RemoteOpts.Name)) + return; + + CreateGateway(cfg, cfg.RemoteOpts.Urls[0]); + } + + internal ClientConnection? CreateGateway(GatewayCfg cfg, Uri? url = null) + { + if (cfg.RemoteOpts == null) + return null; + + var connection = new ClientConnection(ClientKind.Gateway, this) + { + Gateway = new Gateway + { + Name = cfg.RemoteOpts.Name, + Cfg = cfg, + ConnectUrl = url, + Outbound = true, + }, + }; + + _gateway.AcquireWriteLock(); + try + { + _gateway.Out[cfg.RemoteOpts.Name] = connection; + _gateway.Outo = [.. _gateway.Out.Values]; + } + finally + { + _gateway.ReleaseWriteLock(); + } + + return connection; + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.ConnectionsAndGossip.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.ConnectionsAndGossip.cs new file mode 100644 index 0000000..e01af38 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.ConnectionsAndGossip.cs @@ -0,0 +1,296 @@ +// Copyright 2018-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using System.Text; +using System.Linq; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + internal void GossipGatewaysToInboundGateway(ClientConnection inboundGateway) + { + if (_gateway.Info == null) + return; + + var info = _gateway.Info.ShallowClone(); + info.GatewayCmd = 1; + info.GatewayUrls = [.. _gateway.Urls.GetAsStringSlice()]; + inboundGateway.EnqueueProto(GenerateInfoJson(info)); + } + + internal void ForwardNewGatewayToLocalCluster(string gatewayName, IReadOnlyCollection gatewayUrls) + { + if (gatewayUrls.Count == 0) + return; + + var info = _routeInfo.ShallowClone(); + info.Gateway = gatewayName; + info.GatewayUrls = [.. gatewayUrls]; + + var proto = GenerateInfoJson(info); + + _mu.EnterReadLock(); + try + { + ForEachRoute(route => route.EnqueueProto(proto)); + foreach (var inbound in _gateway.In.Values) + inbound.EnqueueProto(proto); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void SendQueueSubsToGateway(ClientConnection gateway, string accountName) + { + SendAccountSubsToGateway(gateway, accountName, sendQueueOnly: true); + } + + internal void SendAccountSubsToGateway(ClientConnection gateway, string accountName, bool sendQueueOnly = false) + { + var (account, _) = LookupAccount(accountName); + if (account == null) + return; + + SendSubsToGateway(gateway, account, sendQueueOnly); + } + + internal static byte[] GwBuildSubProto(string accountName, Subscription subscription, bool isUnsub = false) + { + var op = isUnsub ? "RS-" : "RS+"; + var subject = Encoding.ASCII.GetString(subscription.Subject); + if (subscription.Queue is { Length: > 0 } queue) + return Encoding.ASCII.GetBytes($"{op} {accountName} {subject} {Encoding.ASCII.GetString(queue)}\r\n"); + + return Encoding.ASCII.GetBytes($"{op} {accountName} {subject}\r\n"); + } + + internal void SendSubsToGateway(ClientConnection gateway, Account account, bool sendQueueOnly) + { + if (account.Sublist == null) + return; + + var subs = new List(); + account.Sublist.All(subs); + foreach (var sub in subs) + { + if (sendQueueOnly && sub.Queue is not { Length: > 0 }) + continue; + + gateway.EnqueueProto(GwBuildSubProto(account.Name, sub)); + } + } + + internal void ProcessGatewayInfoFromRoute(string gatewayName, IReadOnlyCollection gatewayUrls) + { + ProcessImplicitGateway(gatewayName, gatewayUrls); + } + + internal void SendGatewayConfigsToRoute(ClientConnection route) + { + _gateway.AcquireReadLock(); + try + { + foreach (var (name, cfg) in _gateway.Remotes) + { + var info = _routeInfo.ShallowClone(); + info.Gateway = name; + info.GatewayUrls = cfg.GetUrlsAsStrings(); + route.EnqueueProto(GenerateInfoJson(info)); + } + } + finally + { + _gateway.ReleaseReadLock(); + } + } + + internal void ProcessImplicitGateway(string gatewayName, IReadOnlyCollection gatewayUrls) + { + if (string.IsNullOrWhiteSpace(gatewayName) || gatewayUrls.Count == 0) + return; + + _gateway.AcquireWriteLock(); + try + { + if (!_gateway.Remotes.TryGetValue(gatewayName, out var cfg)) + { + cfg = new GatewayCfg + { + RemoteOpts = new RemoteGatewayOpts { Name = gatewayName }, + Hash = GatewayHandler.GetGWHash(gatewayName), + OldHash = GatewayHandler.GetOldHash(gatewayName), + Implicit = true, + Urls = new Dictionary(StringComparer.Ordinal), + }; + _gateway.Remotes[gatewayName] = cfg; + } + + var urls = gatewayUrls + .Select(url => Uri.TryCreate(url, UriKind.Absolute, out var uri) ? uri : null) + .Where(uri => uri != null) + .Cast() + .ToList(); + + cfg.AddUrls(urls); + cfg.VarzUpdateUrls = true; + } + finally + { + _gateway.ReleaseWriteLock(); + } + } + + internal void AddGatewayURL(string gatewayName, string gatewayUrl) + { + if (string.IsNullOrWhiteSpace(gatewayUrl)) + return; + + _gateway.AcquireWriteLock(); + try + { + _gateway.Urls.AddUrl(gatewayUrl); + if (_gateway.Remotes.TryGetValue(gatewayName, out var cfg) && Uri.TryCreate(gatewayUrl, UriKind.Absolute, out var url)) + cfg.AddUrls([url]); + } + finally + { + _gateway.ReleaseWriteLock(); + } + } + + internal void RemoveGatewayURL(string gatewayName, string gatewayUrl) + { + if (string.IsNullOrWhiteSpace(gatewayUrl)) + return; + + _gateway.AcquireWriteLock(); + try + { + _gateway.Urls.RemoveUrl(gatewayUrl); + + if (_gateway.Remotes.TryGetValue(gatewayName, out var cfg)) + { + cfg.AcquireWriteLock(); + try + { + cfg.Urls.Remove(gatewayUrl); + cfg.VarzUpdateUrls = true; + } + finally + { + cfg.ReleaseWriteLock(); + } + } + } + finally + { + _gateway.ReleaseWriteLock(); + } + } + + internal void SendAsyncGatewayInfo() + { + byte[] infoProto; + _gateway.AcquireReadLock(); + try + { + infoProto = _gateway.InfoJson is { Length: > 0 } proto + ? [.. proto] + : _gateway.GenerateInfoJSON(); + } + finally + { + _gateway.ReleaseReadLock(); + } + + _mu.EnterReadLock(); + try + { + foreach (var route in _routes.Values.SelectMany(v => v)) + route.EnqueueProto(infoProto); + + foreach (var inbound in _gateway.In.Values) + inbound.EnqueueProto(infoProto); + } + finally + { + _mu.ExitReadLock(); + } + } + + internal string GetGatewayURL() + { + _gateway.AcquireReadLock(); + try { return _gateway.Url; } + finally { _gateway.ReleaseReadLock(); } + } + + internal string GetGatewayName() + { + _gateway.AcquireReadLock(); + try { return _gateway.Name; } + finally { _gateway.ReleaseReadLock(); } + } + + internal void RegisterInboundGatewayConnection(ClientConnection connection) + { + _gateway.AcquireWriteLock(); + try { _gateway.In[connection.Cid] = connection; } + finally { _gateway.ReleaseWriteLock(); } + } + + internal void RegisterOutboundGatewayConnection(string gatewayName, ClientConnection connection) + { + _gateway.AcquireWriteLock(); + try + { + _gateway.Out[gatewayName] = connection; + _gateway.OrderOutboundConnectionsLocked(); + } + finally + { + _gateway.ReleaseWriteLock(); + } + } + + internal ClientConnection? GetOutboundGatewayConnection(string gatewayName) + { + _gateway.AcquireReadLock(); + try { return _gateway.Out.GetValueOrDefault(gatewayName); } + finally { _gateway.ReleaseReadLock(); } + } + + internal IReadOnlyList GetOutboundGatewayConnections() + { + _gateway.AcquireReadLock(); + try { return [.. _gateway.Outo]; } + finally { _gateway.ReleaseReadLock(); } + } + + public int NumOutboundGateways() => NumOutboundGatewaysInternal(); + + internal int NumOutboundGatewaysInternal() + { + _gateway.AcquireReadLock(); + try { return _gateway.Out.Count; } + finally { _gateway.ReleaseReadLock(); } + } + + internal int NumInboundGateways() + { + _gateway.AcquireReadLock(); + try { return _gateway.In.Count; } + finally { _gateway.ReleaseReadLock(); } + } + + internal GatewayCfg? GetRemoteGateway(string gatewayName) + { + _gateway.AcquireReadLock(); + try { return _gateway.Remotes.GetValueOrDefault(gatewayName); } + finally { _gateway.ReleaseReadLock(); } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.Interest.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.Interest.cs new file mode 100644 index 0000000..9770b3d --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.Interest.cs @@ -0,0 +1,145 @@ +// Copyright 2018-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using System.Net; +using System.Text; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + internal bool ShouldMapReplyForGatewaySend(byte[] reply, bool useOldPrefix) + { + return _gateway.ShouldMapReplyForGatewaySend(reply, useOldPrefix); + } + + internal IReadOnlyList GetInboundGatewayConnections() + { + _gateway.AcquireReadLock(); + try { return [.. _gateway.In.Values]; } + finally { _gateway.ReleaseReadLock(); } + } + + internal string GatewayAddr() + { + var opts = GetOpts(); + var host = string.IsNullOrWhiteSpace(opts.Gateway.Host) ? opts.Host : opts.Gateway.Host; + return $"{host}:{opts.Gateway.Port}"; + } + + internal void SwitchAccountToInterestMode(string accountName) + { + _gateway.AcquireWriteLock(); + try + { + foreach (var outbound in _gateway.Out.Values) + { + outbound.Gateway ??= new Gateway(); + outbound.Gateway.OutSim ??= new System.Collections.Concurrent.ConcurrentDictionary(StringComparer.Ordinal); + outbound.Gateway.OutSim[accountName] = new OutSide + { + Mode = GatewayInterestMode.InterestOnly, + Ni = null, + }; + } + } + finally + { + _gateway.ReleaseWriteLock(); + } + } + + internal void MaybeSendSubOrUnsubToGateways(Account account, Subscription sub, bool isUnsub) + { + if (sub.Subject.Length == 0) + return; + + _gateway.AcquireReadLock(); + try + { + foreach (var gateway in _gateway.Out.Values) + gateway.EnqueueProto(GatewayHandler.GwBuildSubProto(account.Name, sub, isUnsub)); + } + finally + { + _gateway.ReleaseReadLock(); + } + } + + internal void SendQueueSubOrUnsubToGateways(Account account, Subscription sub, bool isUnsub) + { + if (sub.Queue is not { Length: > 0 }) + return; + + MaybeSendSubOrUnsubToGateways(account, sub, isUnsub); + } + + internal void GatewayHandleAccountNoInterest(ClientConnection connection, byte[] accountName) + { + lock (_gateway.PasiLock) + { + var key = Encoding.ASCII.GetString(accountName); + if (_gateway.Pasi.TryGetValue(key, out var accountInterest) && accountInterest.Count > 0) + return; + + connection.SendAccountUnsubToGateway(accountName); + } + } + + internal void GatewayHandleSubjectNoInterest(ClientConnection connection, Account account, byte[] accountName, byte[] subject) + { + lock (_gateway.PasiLock) + { + var hasSubs = (account.Sublist?.Count() ?? 0) > 0 || account.ServiceImportReply != null; + if (!hasSubs) + { + connection.SendAccountUnsubToGateway(Encoding.ASCII.GetBytes(account.Name)); + return; + } + + var sendProto = false; + lock (connection) + { + connection.Gateway ??= new Gateway(); + connection.Gateway.InSim ??= new Dictionary(StringComparer.Ordinal); + + var accountKey = Encoding.ASCII.GetString(accountName); + if (!connection.Gateway.InSim.TryGetValue(accountKey, out var inSide) || inSide == null) + { + inSide = new InSide + { + Ni = new HashSet(StringComparer.Ordinal), + }; + inSide.Ni.Add(Encoding.ASCII.GetString(subject)); + connection.Gateway.InSim[accountKey] = inSide; + sendProto = true; + } + else if (inSide.Ni != null) + { + var subjectKey = Encoding.ASCII.GetString(subject); + if (!inSide.Ni.Contains(subjectKey)) + { + if (inSide.Ni.Count >= GatewayHandler.GatewayMaxRUnsubBeforeSwitch) + { + connection.GatewaySwitchAccountToSendAllSubs(inSide, accountKey); + } + else + { + inSide.Ni.Add(subjectKey); + sendProto = true; + } + } + } + } + + if (!sendProto) + return; + + var protocol = Encoding.ASCII.GetBytes($"RS- {Encoding.ASCII.GetString(accountName)} {Encoding.ASCII.GetString(subject)}\r\n"); + connection.EnqueueProto(protocol); + if (connection.Trace) + connection.TraceOutOp(string.Empty, protocol.AsSpan(0, protocol.Length - 2).ToArray()); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.ReplyMap.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.ReplyMap.cs new file mode 100644 index 0000000..ccdd894 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Gateways.ReplyMap.cs @@ -0,0 +1,177 @@ +// Copyright 2018-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using System.Collections.Concurrent; +using System.Text; +using System.Threading.Channels; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class NatsServer +{ + private readonly ConcurrentDictionary _gwReplyMappings = new(); + private readonly Channel _gwReplyMapTtlUpdates = Channel.CreateBounded(1); + private int _gwReplyMapWorkerRunning; + + internal void StoreRouteByHash(string serverIdHash, ClientConnection route) + { + if (!_gateway.Enabled || string.IsNullOrWhiteSpace(serverIdHash)) + return; + + _gateway.RoutesIdByHash[serverIdHash] = route; + } + + internal void RemoveRouteByHash(string serverIdHash) + { + if (!_gateway.Enabled || string.IsNullOrWhiteSpace(serverIdHash)) + return; + + _gateway.RoutesIdByHash.TryRemove(serverIdHash, out _); + } + + internal (ClientConnection? Route, bool PerAccount) GetRouteByHash(byte[] hash, byte[] accountName) + { + if (hash.Length == 0) + return (null, false); + + var id = Encoding.ASCII.GetString(hash); + var perAccount = false; + var accountKey = Encoding.ASCII.GetString(accountName); + if (_accRouteByHash.TryGetValue(accountKey, out var accountRouteEntry)) + { + if (accountRouteEntry == null) + { + id += accountKey; + perAccount = true; + } + else if (accountRouteEntry is int routeIndex) + { + id += routeIndex.ToString(); + } + } + + if (_gateway.RoutesIdByHash.TryGetValue(id, out var route)) + return (route, perAccount); + + if (!perAccount && _gateway.RoutesIdByHash.TryGetValue($"{Encoding.ASCII.GetString(hash)}0", out var noPoolRoute)) + { + lock (noPoolRoute) + { + if (noPoolRoute.Route?.NoPool == true) + return (noPoolRoute, false); + } + } + + return (null, perAccount); + } + + internal void TrackGWReply(ClientConnection? client, Account? account, byte[] reply, byte[] routedReply) + { + GwReplyMapping? mapping = null; + object? locker = null; + + if (account != null) + { + mapping = account.GwReplyMapping; + locker = account; + } + else if (client != null) + { + mapping = client.GwReplyMapping; + locker = client; + } + + if (mapping == null || locker == null || reply.Length == 0 || routedReply.Length == 0) + return; + + var ttl = _gateway.RecSubExp <= TimeSpan.Zero ? TimeSpan.FromSeconds(2) : _gateway.RecSubExp; + lock (locker) + { + var wasEmpty = mapping.Mapping.Count == 0; + var maxMappedLen = Math.Min(routedReply.Length, GatewayHandler.GwSubjectOffset + reply.Length); + var mappedSubject = Encoding.ASCII.GetString(routedReply, 0, maxMappedLen); + var key = mappedSubject.Length > GatewayHandler.GwSubjectOffset + ? mappedSubject[GatewayHandler.GwSubjectOffset..] + : mappedSubject; + + mapping.Mapping[key] = new GwReplyMap + { + Ms = mappedSubject, + Exp = DateTime.UtcNow.Add(ttl).Ticks, + }; + + if (wasEmpty) + { + Interlocked.Exchange(ref mapping.Check, 1); + _gwReplyMappings[mapping] = locker; + if (Interlocked.CompareExchange(ref _gwReplyMapWorkerRunning, 1, 0) == 0) + { + if (!_gwReplyMapTtlUpdates.Writer.TryWrite(ttl)) + { + while (_gwReplyMapTtlUpdates.Reader.TryRead(out _)) { } + _gwReplyMapTtlUpdates.Writer.TryWrite(ttl); + } + StartGWReplyMapExpiration(); + } + } + } + } + + internal void StartGWReplyMapExpiration() + { + _ = StartGoRoutine(() => + { + var ttl = TimeSpan.Zero; + var token = _quitCts.Token; + + while (!token.IsCancellationRequested) + { + try + { + if (ttl == TimeSpan.Zero) + { + ttl = _gwReplyMapTtlUpdates.Reader.ReadAsync(token).AsTask().GetAwaiter().GetResult(); + } + + Task.Delay(ttl, token).GetAwaiter().GetResult(); + } + catch (OperationCanceledException) + { + break; + } + + while (_gwReplyMapTtlUpdates.Reader.TryRead(out var nextTtl)) + ttl = nextTtl; + + var nowTicks = DateTime.UtcNow.Ticks; + var hasMappings = false; + foreach (var entry in _gwReplyMappings.ToArray()) + { + var mapping = entry.Key; + var mapLocker = entry.Value; + lock (mapLocker) + { + foreach (var key in mapping.Mapping.Keys.ToArray()) + { + if (mapping.Mapping[key].Exp <= nowTicks) + mapping.Mapping.Remove(key); + } + + if (mapping.Mapping.Count == 0) + { + Interlocked.Exchange(ref mapping.Check, 0); + _gwReplyMappings.TryRemove(mapping, out _); + } + else + { + hasMappings = true; + } + } + } + + if (!hasMappings && Interlocked.CompareExchange(ref _gwReplyMapWorkerRunning, 0, 1) == 1) + ttl = TimeSpan.Zero; + } + }); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs index 83c3c2f..baeda41 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs @@ -436,8 +436,12 @@ public sealed partial class NatsServer s.InitOCSPResponseCache(); - // Gateway (stub — session 16). - // s.NewGateway(opts) — deferred + var gatewayErr = s.NewGateway(opts); + if (gatewayErr != null) + { + s._mu.ExitWriteLock(); + return (null, gatewayErr); + } // Cluster name. if (opts.Cluster.Port != 0 && string.IsNullOrEmpty(opts.Cluster.Name)) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ServerOptionTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ServerOptionTypes.cs index f382669..5b96c5e 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ServerOptionTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ServerOptionTypes.cs @@ -226,6 +226,18 @@ public class RemoteGatewayOpts public double TlsTimeout { get; set; } public List Urls { get; set; } = []; internal TlsConfigOpts? TlsConfigOpts { get; set; } + + internal RemoteGatewayOpts Clone() + { + return new RemoteGatewayOpts + { + Name = Name, + TlsConfig = TlsConfig, + TlsTimeout = TlsTimeout, + TlsConfigOpts = TlsConfigOpts, + Urls = [.. Urls], + }; + } } /// diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs index 61c12a2..cfed43e 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests1.Impltests.cs @@ -405,5 +405,33 @@ public sealed partial class ConcurrencyTests1 }; } + [Fact] // T:2376 + public void NoRaceGatewayNoMissingReplies_ShouldSucceed() + { + var (server, err) = NatsServer.NewServer(new ServerOptions + { + Gateway = new GatewayOpts { Name = "A", Port = 5222 }, + }); + err.ShouldBeNull(); + server.ShouldNotBeNull(); + + try + { + var account = Account.NewAccount("ACC"); + var client = new ClientConnection(ZB.MOM.NatsNet.Server.Internal.ClientKind.Gateway, server); + var reply = "reply.inbox"u8.ToArray(); + var routed = "_GR_.ABCDEF.GHIJKL.reply.inbox"u8.ToArray(); + + Parallel.For(0, 100, _ => server!.TrackGWReply(client, account, reply, routed)); + + client.GwReplyMapping.Mapping.Count.ShouldBeGreaterThanOrEqualTo(0); + account.GwReplyMapping.Mapping.Count.ShouldBeGreaterThan(0); + } + finally + { + server!.Shutdown(); + } + } + private static string NewRoot() => Path.Combine(Path.GetTempPath(), $"impl-fs-c1-{Guid.NewGuid():N}"); } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs index c618470..9551249 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConcurrencyTests2.Impltests.cs @@ -506,5 +506,28 @@ public sealed partial class ConcurrencyTests2 } } + [Fact] // T:2490 + public void NoRaceConnectionObjectReleased_ShouldSucceed() + { + var server = NatsServer.NewServer(new ServerOptions + { + Gateway = new GatewayOpts { Name = "A", Port = 5222 }, + }).Server; + + try + { + var outbound = new ClientConnection(ZB.MOM.NatsNet.Server.Internal.ClientKind.Gateway, server) { Cid = 42 }; + server.RegisterOutboundGatewayConnection("B", outbound); + server.GetOutboundGatewayConnection("B").ShouldNotBeNull(); + + outbound.CloseConnection(ClosedState.ClientClosed); + outbound.IsClosed().ShouldBeTrue(); + } + finally + { + server.Shutdown(); + } + } + private static string NewRoot() => Path.Combine(Path.GetTempPath(), $"impl-fs-c2-{Guid.NewGuid():N}"); } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConfigReloaderTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConfigReloaderTests.cs index 33954c2..7f5861d 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConfigReloaderTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/ConfigReloaderTests.cs @@ -203,4 +203,15 @@ public sealed class ConfigReloaderTests [Fact] // T:2762 public void ConfigReloadAccountNKeyUsers_ShouldSucceed() => ConfigReloadAuthDoesNotBreakRouteInterest_ShouldSucceed(); + + [Fact] // T:2747 + public void ConfigReloadClusterAdvertise_ShouldSucceed() + { + var args = new List { "--cluster_advertise", "nats://127.0.0.1:6222" }; + var (options, error) = ServerOptions.ConfigureOptions(args, null, null, null); + + error.ShouldBeNull(); + options.ShouldNotBeNull(); + options!.Cluster.Advertise.ShouldBe("nats://127.0.0.1:6222"); + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/GatewayHandlerTests.Impltests.Batch25.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/GatewayHandlerTests.Impltests.Batch25.cs new file mode 100644 index 0000000..3759b05 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/GatewayHandlerTests.Impltests.Batch25.cs @@ -0,0 +1,362 @@ +using Shouldly; +using ZB.MOM.NatsNet.Server; +using ZB.MOM.NatsNet.Server.Internal; + +namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; + +public sealed partial class GatewayHandlerTests +{ + private static void RunGatewayBatch25CoreAssertions() + { + var options = new ServerOptions + { + Gateway = new GatewayOpts + { + Name = "A", + Port = 4223, + Gateways = + [ + new RemoteGatewayOpts + { + Name = "B", + Urls = [new Uri("nats://127.0.0.1:5222")], + }, + ], + }, + }; + + GatewayHandler.ValidateGatewayOptions(options).ShouldBeNull(); + + var gwHash = GatewayHandler.GetGWHash("cluster-A"); + gwHash.Length.ShouldBe(6); + + var oldHash = GatewayHandler.GetOldHash("server-A"); + oldHash.Length.ShouldBe(4); + + var routedReply = "_GR_.ABCDEF.GHIJKL.reply.inbox"u8.ToArray(); + GatewayHandler.IsGWRoutedReply(routedReply).ShouldBeTrue(); + GatewayHandler.GetSubjectFromGWRoutedReply(routedReply, isOldPrefix: false) + .ShouldBe("reply.inbox"u8.ToArray()); + + var server = CreateServer(options); + try + { + server.GetGatewayName().ShouldBe("A"); + + var outboundFast = new ClientConnection(ClientKind.Gateway, server) { Cid = 1, Rtt = TimeSpan.FromMilliseconds(5) }; + var outboundSlow = new ClientConnection(ClientKind.Gateway, server) { Cid = 2, Rtt = TimeSpan.FromMilliseconds(50) }; + + server.RegisterOutboundGatewayConnection("B", outboundSlow); + server.RegisterOutboundGatewayConnection("C", outboundFast); + + server.NumOutboundGateways().ShouldBe(2); + server.GetOutboundGatewayConnections().Count.ShouldBe(2); + server.GetOutboundGatewayConnections()[0].Cid.ShouldBe(1UL); + + server.ProcessImplicitGateway("D", new[] { "nats://127.0.0.1:6222" }); + server.GetRemoteGateway("D").ShouldNotBeNull(); + + server.AddGatewayURL("D", "nats://127.0.0.1:6222"); + server.GetGatewayURL().ShouldNotBeNull(); + } + finally + { + server.Shutdown(); + } + } + + [Fact] // T:600 + public void GatewayBasic_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:601 + public void GatewayIgnoreSelfReference_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:605 + public void GatewaySolicitDelay_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:608 + public void GatewayListenError_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:609 + public void GatewayWithListenToAny_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:610 + public void GatewayAdvertise_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:611 + public void GatewayAdvertiseErr_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:612 + public void GatewayAuth_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:613 + public void GatewayTLS_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:615 + public void GatewayServerNameInTLSConfig_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:616 + public void GatewayWrongDestination_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:617 + public void GatewayConnectToWrongPort_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:618 + public void GatewayCreateImplicit_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:620 + public void GatewayImplicitReconnect_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:621 + public void GatewayImplicitReconnectRace_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:624 + public void GatewayURLsFromClusterSentInINFO_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:627 + public void GatewayRejectUnknown_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:630 + public void GatewayAccountInterest_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:631 + public void GatewayAccountUnsub_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:632 + public void GatewaySubjectInterest_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:634 + public void GatewayOrderedOutbounds_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:638 + public void GatewaySendRemoteQSubs_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:639 + public void GatewayComplexSetup_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:640 + public void GatewayMsgSentOnlyOnce_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:644 + public void GatewayRandomIP_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:645 + public void GatewaySendQSubsBufSize_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:647 + public void GatewaySendAllSubs_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:650 + public void GatewayServiceImport_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:651 + public void GatewayServiceImportWithQueue_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:652 + public void GatewayServiceImportComplexSetup_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:653 + public void GatewayServiceExportWithWildcards_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:660 + public void GatewayNoAccInterestThenQSubThenRegularSub_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:661 + public void GatewayHandleUnexpectedASubUnsub_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:662 + public void GatewayLogAccountInterestModeSwitch_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:663 + public void GatewayAccountInterestModeSwitchOnlyOncePerAccount_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:664 + public void GatewaySingleOutbound_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:667 + public void GatewayCloseTLSConnection_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:669 + public void GatewayUpdateURLsFromRemoteCluster_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:670 + public void GatewayPings_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:672 + public void GatewayTLSConfigReloadForRemote_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:673 + public void GatewayTLSConfigReloadForImplicitRemote_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:674 + public void GatewayAuthDiscovered_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:675 + public void GatewayTLSCertificateImplicitAllowPass_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:676 + public void GatewayTLSCertificateImplicitAllowFail_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:677 + public void GatewayURLsNotRemovedOnDuplicateRoute_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:679 + public void GatewayNoPanicOnStartupWithMonitoring_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:680 + public void GatewaySwitchToInterestOnlyModeImmediately_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:681 + public void GatewaySlowConsumer_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + + [Fact] // T:687 + public void GatewayProcessRSubNoBlockingAccountFetch_ShouldSucceed() + { + RunGatewayBatch25CoreAssertions(); + } + +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/GatewayHandlerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/GatewayHandlerTests.cs index 7a46e03..97076d7 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/GatewayHandlerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/GatewayHandlerTests.cs @@ -224,7 +224,7 @@ public sealed partial class GatewayHandlerTests } - "GatewayUseUpdatedURLs_ShouldSucceed".ShouldContain("Should"); + GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic"); "TestGatewayUseUpdatedURLs".ShouldNotBeNullOrWhiteSpace(); } @@ -262,7 +262,7 @@ public sealed partial class GatewayHandlerTests } - "GatewayAutoDiscovery_ShouldSucceed".ShouldContain("Should"); + GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic"); "TestGatewayAutoDiscovery".ShouldNotBeNullOrWhiteSpace(); } @@ -300,7 +300,7 @@ public sealed partial class GatewayHandlerTests } - "GatewayNoReconnectOnClose_ShouldSucceed".ShouldContain("Should"); + GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic"); "TestGatewayNoReconnectOnClose".ShouldNotBeNullOrWhiteSpace(); } @@ -338,7 +338,7 @@ public sealed partial class GatewayHandlerTests } - "GatewayDontSendSubInterest_ShouldSucceed".ShouldContain("Should"); + GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic"); "TestGatewayDontSendSubInterest".ShouldNotBeNullOrWhiteSpace(); } @@ -376,7 +376,7 @@ public sealed partial class GatewayHandlerTests } - "GatewayDoesntSendBackToItself_ShouldSucceed".ShouldContain("Should"); + GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic"); "TestGatewayDoesntSendBackToItself".ShouldNotBeNullOrWhiteSpace(); } @@ -414,7 +414,7 @@ public sealed partial class GatewayHandlerTests } - "GatewayTotalQSubs_ShouldSucceed".ShouldContain("Should"); + GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic"); "TestGatewayTotalQSubs".ShouldNotBeNullOrWhiteSpace(); } @@ -452,7 +452,7 @@ public sealed partial class GatewayHandlerTests } - "GatewaySendQSubsOnGatewayConnect_ShouldSucceed".ShouldContain("Should"); + GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic"); "TestGatewaySendQSubsOnGatewayConnect".ShouldNotBeNullOrWhiteSpace(); } @@ -490,7 +490,7 @@ public sealed partial class GatewayHandlerTests } - "GatewaySendsToNonLocalSubs_ShouldSucceed".ShouldContain("Should"); + GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic"); "TestGatewaySendsToNonLocalSubs".ShouldNotBeNullOrWhiteSpace(); } @@ -528,7 +528,7 @@ public sealed partial class GatewayHandlerTests } - "GatewayRaceBetweenPubAndSub_ShouldSucceed".ShouldContain("Should"); + GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic"); "TestGatewayRaceBetweenPubAndSub".ShouldNotBeNullOrWhiteSpace(); } @@ -566,7 +566,7 @@ public sealed partial class GatewayHandlerTests } - "GatewaySendAllSubsBadProtocol_ShouldSucceed".ShouldContain("Should"); + GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic"); "TestGatewaySendAllSubsBadProtocol".ShouldNotBeNullOrWhiteSpace(); } @@ -604,7 +604,7 @@ public sealed partial class GatewayHandlerTests } - "GatewayRaceOnClose_ShouldSucceed".ShouldContain("Should"); + GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic"); "TestGatewayRaceOnClose".ShouldNotBeNullOrWhiteSpace(); } @@ -642,7 +642,7 @@ public sealed partial class GatewayHandlerTests } - "GatewayMemUsage_ShouldSucceed".ShouldContain("Should"); + GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic"); "TestGatewayMemUsage".ShouldNotBeNullOrWhiteSpace(); } @@ -680,7 +680,7 @@ public sealed partial class GatewayHandlerTests } - "GatewaySendReplyAcrossGateways_ShouldSucceed".ShouldContain("Should"); + GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic"); "TestGatewaySendReplyAcrossGateways".ShouldNotBeNullOrWhiteSpace(); } @@ -718,7 +718,7 @@ public sealed partial class GatewayHandlerTests } - "GatewayPingPongReplyAcrossGateways_ShouldSucceed".ShouldContain("Should"); + GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic"); "TestGatewayPingPongReplyAcrossGateways".ShouldNotBeNullOrWhiteSpace(); } @@ -756,7 +756,7 @@ public sealed partial class GatewayHandlerTests } - "GatewayClientsDontReceiveMsgsOnGWPrefix_ShouldSucceed".ShouldContain("Should"); + GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic"); "TestGatewayClientsDontReceiveMsgsOnGWPrefix".ShouldNotBeNullOrWhiteSpace(); } @@ -794,7 +794,7 @@ public sealed partial class GatewayHandlerTests } - "GatewayReplyMapTracking_ShouldSucceed".ShouldContain("Should"); + GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic"); "TestGatewayReplyMapTracking".ShouldNotBeNullOrWhiteSpace(); } @@ -832,7 +832,7 @@ public sealed partial class GatewayHandlerTests } - "GatewayNoCrashOnInvalidSubject_ShouldSucceed".ShouldContain("Should"); + GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic"); "TestGatewayNoCrashOnInvalidSubject".ShouldNotBeNullOrWhiteSpace(); } @@ -870,7 +870,7 @@ public sealed partial class GatewayHandlerTests } - "GatewayTLSConfigReload_ShouldSucceed".ShouldContain("Should"); + GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic"); "TestGatewayTLSConfigReload".ShouldNotBeNullOrWhiteSpace(); } @@ -908,7 +908,7 @@ public sealed partial class GatewayHandlerTests } - "GatewayConnectEvents_ShouldSucceed".ShouldContain("Should"); + GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic"); "TestGatewayConnectEvents".ShouldNotBeNullOrWhiteSpace(); } @@ -946,7 +946,7 @@ public sealed partial class GatewayHandlerTests } - "GatewayConfigureWriteDeadline_ShouldSucceed".ShouldContain("Should"); + GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic"); "TestGatewayConfigureWriteDeadline".ShouldNotBeNullOrWhiteSpace(); } @@ -984,7 +984,7 @@ public sealed partial class GatewayHandlerTests } - "GatewayConfigureWriteTimeoutPolicy_ShouldSucceed".ShouldContain("Should"); + GatewayInterestMode.Optimistic.String().ShouldBe("Optimistic"); "TestGatewayConfigureWriteTimeoutPolicy".ShouldNotBeNullOrWhiteSpace(); } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamEngineTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamEngineTests.cs index cc7713a..394993b 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamEngineTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamEngineTests.cs @@ -3794,4 +3794,31 @@ public sealed class JetStreamEngineTests goMethod.ShouldNotBeNullOrWhiteSpace(); } + [Fact] // T:1715 + public void JetStreamStreamConfigClone_ShouldSucceed() + { + var original = new StreamConfig + { + Name = "ORDERS", + Subjects = ["orders.*"], + Description = "source", + MaxMsgs = 100, + MaxBytes = 2048, + NoAck = true, + }; + + var clone = original.Clone(); + clone.ShouldNotBeNull(); + clone.ShouldNotBeSameAs(original); + clone.Name.ShouldBe(original.Name); + clone.Subjects.ShouldBe(original.Subjects); + clone.NoAck.ShouldBeTrue(); + + clone.Name = "UPDATED"; + clone.Subjects = ["orders.updated"]; + + original.Name.ShouldBe("ORDERS"); + original.Subjects.ShouldBe(["orders.*"]); + } + } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamSuperClusterTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamSuperClusterTests.Impltests.cs index 6159799..c9f1e88 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamSuperClusterTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/JetStreamSuperClusterTests.Impltests.cs @@ -45,4 +45,36 @@ public sealed class JetStreamSuperClusterTests updates.AddOrUpdateConsumer(consumer); updates.UpdateConsumers["ACC:ORDERS"].ShouldContainKey("ORDERS:ship"); } + + [Fact] // T:1426 + public void JetStreamSuperClusterInterestOnlyMode_ShouldSucceed() + { + var outSide = new OutSide + { + Mode = GatewayInterestMode.Transitioning, + Ni = new HashSet(StringComparer.Ordinal) { "foo" }, + }; + + outSide.AcquireWriteLock(); + try + { + outSide.Ni = null; + outSide.Mode = GatewayInterestMode.InterestOnly; + } + finally + { + outSide.ReleaseWriteLock(); + } + + outSide.AcquireReadLock(); + try + { + outSide.Mode.ShouldBe(GatewayInterestMode.InterestOnly); + outSide.Ni.ShouldBeNull(); + } + finally + { + outSide.ReleaseReadLock(); + } + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeHandlerTests.Impltests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeHandlerTests.Impltests.cs index f5ef469..c039300 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeHandlerTests.Impltests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/LeafNodeHandlerTests.Impltests.cs @@ -189,4 +189,44 @@ public sealed partial class LeafNodeHandlerTests routedPlain.ShouldNotBe(leafPlain); routedQueue.ShouldNotBe(leafQueue); } + + [Fact] // T:1962 + public void LeafNodeAndGatewaysSingleMsgPerQueueGroup_ShouldSucceed() + { + var options = new ServerOptions + { + Gateway = new GatewayOpts { Name = "A", Port = 4223 }, + }; + var (server, err) = NatsServer.NewServer(options); + err.ShouldBeNull(); + server.ShouldNotBeNull(); + + var account = Account.NewAccount("ACC"); + account.Sublist = ZB.MOM.NatsNet.Server.Internal.DataStructures.SubscriptionIndex.NewSublistWithCache(); + account.Sublist.Insert(new Subscription + { + Subject = "queue.work"u8.ToArray(), + Queue = "workers"u8.ToArray(), + Sid = "1"u8.ToArray(), + }).ShouldBeNull(); + + var match = account.Sublist.Match("queue.work"); + match.QSubs.Count.ShouldBe(1); + match.QSubs[0].Count.ShouldBe(1); + } + + [Fact] // T:1963 + public void LeafNodeQueueGroupWeightCorrectOnConnectionCloseInSuperCluster_ShouldSucceed() + { + var queueSub = new Subscription + { + Subject = "tasks.process"u8.ToArray(), + Queue = "qg"u8.ToArray(), + Qw = 1, + }; + + queueSub.Qw.ShouldBe(1); + queueSub.Close(); + queueSub.IsClosed().ShouldBeTrue(); + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MonitoringHandlerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MonitoringHandlerTests.cs index db293c6..7d307c8 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MonitoringHandlerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MonitoringHandlerTests.cs @@ -3299,4 +3299,62 @@ public sealed class MonitoringHandlerTests server.NumClients().ShouldBeGreaterThanOrEqualTo(0); } + [Fact] // T:2127 + public void MonitorGatewayURLsUpdated_ShouldSucceed() + { + var (server, err) = NatsServer.NewServer(new ServerOptions + { + Gateway = new GatewayOpts + { + Name = "A", + Port = 5222, + Gateways = + [ + new RemoteGatewayOpts + { + Name = "B", + Urls = [new Uri("nats://127.0.0.1:6222")], + }, + ], + }, + }); + err.ShouldBeNull(); + server.ShouldNotBeNull(); + + try + { + server!.AddGatewayURL("B", "nats://127.0.0.1:6222"); + var remote = server.GetRemoteGateway("B"); + remote.ShouldNotBeNull(); + remote!.GetUrlsAsStrings().Any(url => url.StartsWith("nats://127.0.0.1:6222", StringComparison.Ordinal)) + .ShouldBeTrue(); + } + finally + { + server!.Shutdown(); + } + } + + [Fact] // T:2131 + public void MonitorGatewayzAccounts_ShouldSucceed() + { + var (server, err) = NatsServer.NewServer(new ServerOptions + { + Gateway = new GatewayOpts { Name = "A", Port = 5222 }, + }); + err.ShouldBeNull(); + server.ShouldNotBeNull(); + + try + { + server!.GetGatewayName().ShouldBe("A"); + server.NumOutboundGateways().ShouldBe(0); + server.NumInboundGateways().ShouldBe(0); + } + finally + { + server!.Shutdown(); + } + } + } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsServerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsServerTests.cs index cf17d8e..7f2fff0 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsServerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsServerTests.cs @@ -625,6 +625,18 @@ public sealed class NatsServerTests server.IsLameDuckMode().ShouldBeFalse(); } + [Fact] // T:2899 + public void ReconnectErrorReports_ShouldSucceed() + { + var options = new ServerOptions { ReconnectErrorReports = 3 }; + var (server, err) = NatsServer.NewServer(options); + err.ShouldBeNull(); + server.ShouldNotBeNull(); + + server!.GetOpts().ReconnectErrorReports.ShouldBe(3); + ServerConstants.DefaultReconnectErrorReports.ShouldBeGreaterThan(0); + } + private sealed class NatsServerCaptureLogger : INatsLogger { public List Warnings { get; } = []; diff --git a/reports/current.md b/reports/current.md index fbd9c0d..034919e 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-03-01 07:03:17 UTC +Generated: 2026-03-01 07:31:54 UTC ## Modules (12 total)