From 4974339b52462062eb4e2eac2c7c2eadebece01a Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 19:32:59 -0500 Subject: [PATCH] feat(batch17): port lifecycle, tls, and rate-limited logging features --- .../ClientConnection.LifecycleAndTls.cs | 516 ++++++++++++++++++ ...ientConnection.SubscriptionsAndDelivery.cs | 2 +- .../ZB.MOM.NatsNet.Server/ClientConnection.cs | 33 +- .../src/ZB.MOM.NatsNet.Server/ClientTypes.cs | 10 + .../NatsServer.Listeners.cs | 5 +- .../ClientConnectionStubFeaturesTests.cs | 113 ++++ .../ImplBacklog/NatsServerTests.cs | 33 ++ porting.db | Bin 6672384 -> 6676480 bytes 8 files changed, 694 insertions(+), 18 deletions(-) create mode 100644 dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.LifecycleAndTls.cs diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.LifecycleAndTls.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.LifecycleAndTls.cs new file mode 100644 index 0000000..3eac89d --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.LifecycleAndTls.cs @@ -0,0 +1,516 @@ +// Copyright 2012-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); + +using System.Collections.Concurrent; +using System.Net.Security; +using System.Runtime.CompilerServices; +using System.Security.Authentication; +using System.Text; +using System.Linq; +using ZB.MOM.NatsNet.Server.Auth; +using ZB.MOM.NatsNet.Server.Internal; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class ClientConnection +{ + private static readonly TimeSpan FirstPingInterval = TimeSpan.FromSeconds(15); + private static readonly TimeSpan FirstClientPingInterval = TimeSpan.FromSeconds(2); + private const int MaxPerAccountCacheSize = 8192; + private const string StaleErrProtoFormat = "-ERR '{0}'\r\n"; + private static readonly ConditionalWeakTable> RateLimitCacheByServer = new(); + + internal void WatchForStaleConnection(TimeSpan pingInterval, int pingMax) + { + if (pingInterval <= TimeSpan.Zero || pingMax < 0) + return; + + var staleAfter = TimeSpan.FromTicks(pingInterval.Ticks * (pingMax + 1L)); + if (staleAfter <= TimeSpan.Zero) + return; + + ClearPingTimer(); + _pingTimer = new Timer(_ => + { + lock (_mu) + { + if (IsClosed()) + return; + + Debugf("Stale Client Connection - Closing"); + EnqueueProto(Encoding.ASCII.GetBytes(string.Format(StaleErrProtoFormat, "Stale Connection"))); + } + CloseConnection(ClosedState.StaleConnection); + }, null, staleAfter, Timeout.InfiniteTimeSpan); + } + + internal void SwapAccountAfterReload() + { + string accountName; + lock (_mu) + { + if (_account is null || Server is null) + return; + accountName = _account.Name; + } + + if (Server is not NatsServer server) + return; + + var (updated, _) = server.LookupAccount(accountName); + if (updated is null) + return; + + lock (_mu) + { + if (!ReferenceEquals(_account, updated)) + _account = updated; + } + } + + internal void ProcessSubsOnConfigReload(ISet? accountsWithChangedStreamImports) + { + INatsAccount? acc; + var checkPerms = false; + var checkAcc = false; + var retained = new List(); + var removed = new List(); + + lock (_mu) + { + checkPerms = Perms is not null; + checkAcc = _account is not null; + acc = _account; + if (!checkPerms && !checkAcc) + return; + + if (checkAcc && acc is not null && accountsWithChangedStreamImports is not null && + !accountsWithChangedStreamImports.Contains(acc.Name)) + { + checkAcc = false; + } + + MPerms = null; + foreach (var sub in Subs.Values) + { + var subject = Encoding.ASCII.GetString(sub.Subject); + var canSub = CanSubscribe(subject); + var canQSub = sub.Queue is { Length: > 0 } q && CanSubscribe(subject, Encoding.ASCII.GetString(q)); + + if (!canSub && !canQSub) + { + removed.Add(sub); + } + else if (checkAcc) + { + retained.Add(sub); + } + } + } + + if (checkAcc && acc is not null) + { + foreach (var sub in retained) + { + AddShadowSubscriptions(acc, sub); + } + } + + foreach (var sub in removed) + { + Unsubscribe(acc, sub, force: true, remove: true); + var sid = sub.Sid is { Length: > 0 } s ? Encoding.ASCII.GetString(s) : string.Empty; + SendErr($"Permissions Violation for Subscription to \"{Encoding.ASCII.GetString(sub.Subject)}\" (sid \"{sid}\")"); + Noticef("Removed sub \"{0}\" (sid \"{1}\") for \"{2}\" - not authorized", + Encoding.ASCII.GetString(sub.Subject), sid, GetAuthUser()); + } + } + + internal void Reconnect() + { + lock (_mu) + { + if (Flags.IsSet(ClientFlags.NoReconnect) || Server is null) + return; + } + + // Route/gateway/leaf reconnect orchestration is owned by server sessions. + } + + internal (INatsAccount? Account, SubscriptionIndexResult? Result) GetAccAndResultFromCache() + { + var pa = ParseCtx.Pa; + if (pa.Subject is null || pa.Subject.Length == 0) + return (null, null); + + _in.PaCache ??= new Dictionary(StringComparer.Ordinal); + var cacheKeyBytes = pa.PaCache is { Length: > 0 } k ? k : pa.Subject; + var cacheKey = Encoding.ASCII.GetString(cacheKeyBytes); + + if (_in.PaCache.TryGetValue(cacheKey, out var cached) && + cached.Acc is Account cachedAcc && + cached.Results is not null && + cachedAcc.Sublist is not null && + cached.GenId == (ulong)cachedAcc.Sublist.GenId()) + { + return (cached.Acc, cached.Results); + } + + INatsAccount? acc = null; + if (Kind == ClientKind.Router && pa.Account is { Length: > 0 } && _account is not null) + { + acc = _account; + } + else if (Server is NatsServer server && pa.Account is { Length: > 0 } accountNameBytes) + { + var accountName = Encoding.ASCII.GetString(accountNameBytes); + (acc, _) = server.LookupAccount(accountName); + } + + if (acc is not Account concreteAcc || concreteAcc.Sublist is null) + return (null, null); + + var result = concreteAcc.Sublist.MatchBytes(pa.Subject); + if (_in.PaCache.Count >= MaxPerAccountCacheSize) + { + foreach (var key in _in.PaCache.Keys.ToArray()) + { + _in.PaCache.Remove(key); + if (_in.PaCache.Count < MaxPerAccountCacheSize) + break; + } + } + + _in.PaCache[cacheKey] = new PerAccountCache + { + Acc = concreteAcc, + Results = result, + GenId = (ulong)concreteAcc.Sublist.GenId(), + }; + return (concreteAcc, result); + } + + internal void PruneClosedSubFromPerAccountCache() + { + if (_in.PaCache is null || _in.PaCache.Count == 0) + return; + + foreach (var key in _in.PaCache.Keys.ToArray()) + { + var entry = _in.PaCache[key]; + var result = entry.Results; + if (result is null) + { + _in.PaCache.Remove(key); + continue; + } + + var remove = result.PSubs.Any(static s => s.IsClosed()); + if (!remove) + { + foreach (var qsub in result.QSubs) + { + if (qsub.Any(static s => s.IsClosed())) + { + remove = true; + break; + } + } + } + + if (remove) + _in.PaCache.Remove(key); + } + } + + internal void AddServerAndClusterInfo(ClientInfo? ci) + { + if (ci is null) + return; + + if (Server is NatsServer server) + { + ci.Server = Kind == ClientKind.Leaf ? ci.Server : server.Name(); + var cluster = server.CachedClusterName(); + if (!string.IsNullOrWhiteSpace(cluster)) + ci.Cluster = [cluster]; + } + } + + internal ClientInfo? GetClientInfo(bool detailed) + { + if (Kind is not (ClientKind.Client or ClientKind.Leaf or ClientKind.JetStream or ClientKind.Account)) + return null; + + var ci = new ClientInfo(); + if (detailed) + AddServerAndClusterInfo(ci); + + lock (_mu) + { + ci.Account = _account?.Name ?? string.Empty; + ci.Rtt = Rtt; + if (!detailed) + return ci; + + ci.Start = Start == default ? string.Empty : Start.ToString("O"); + ci.Host = Host; + ci.Id = Cid; + ci.Name = Opts.Name; + ci.User = GetRawAuthUser(); + ci.Lang = Opts.Lang; + ci.Version = Opts.Version; + ci.Jwt = Opts.Jwt; + ci.NameTag = NameTag; + ci.Kind = KindString(); + ci.ClientType = ClientTypeString(); + } + + return ci; + } + + internal Exception? DoTLSServerHandshake( + string typ, + SslServerAuthenticationOptions tlsConfig, + double timeout, + PinnedCertSet? pinnedCerts) + { + var (_, err) = DoTLSHandshake(typ, solicit: false, null, tlsConfig, null, string.Empty, timeout, pinnedCerts); + return err; + } + + internal (bool resetTlsName, Exception? err) DoTLSClientHandshake( + string typ, + Uri? url, + SslClientAuthenticationOptions tlsConfig, + string tlsName, + double timeout, + PinnedCertSet? pinnedCerts) + { + return DoTLSHandshake(typ, solicit: true, url, null, tlsConfig, tlsName, timeout, pinnedCerts); + } + + internal (bool resetTlsName, Exception? err) DoTLSHandshake( + string typ, + bool solicit, + Uri? url, + SslServerAuthenticationOptions? serverTlsConfig, + SslClientAuthenticationOptions? clientTlsConfig, + string tlsName, + double timeout, + PinnedCertSet? pinnedCerts) + { + if (_nc is null) + return (false, ServerErrors.ErrConnectionClosed); + + var kind = Kind; + var resetTlsName = false; + Exception? err = null; + SslStream? ssl = null; + + try + { + var baseStream = _nc; + if (solicit) + { + Debugf("Starting TLS {0} client handshake", typ); + var options = clientTlsConfig ?? new SslClientAuthenticationOptions(); + if (string.IsNullOrWhiteSpace(options.TargetHost)) + { + var host = url?.Host ?? string.Empty; + options.TargetHost = !string.IsNullOrWhiteSpace(tlsName) ? tlsName : host; + } + + ssl = new SslStream(baseStream, leaveInnerStreamOpen: false); + _nc = ssl; + + using var cts = timeout > 0 + ? new CancellationTokenSource(TimeSpan.FromSeconds(timeout)) + : new CancellationTokenSource(); + ssl.AuthenticateAsClientAsync(options, cts.Token).GetAwaiter().GetResult(); + } + else + { + Debugf(kind == ClientKind.Client + ? "Starting TLS client connection handshake" + : "Starting TLS {0} server handshake", typ); + ssl = new SslStream(baseStream, leaveInnerStreamOpen: false); + _nc = ssl; + + using var cts = timeout > 0 + ? new CancellationTokenSource(TimeSpan.FromSeconds(timeout)) + : new CancellationTokenSource(); + ssl.AuthenticateAsServerAsync(serverTlsConfig ?? new SslServerAuthenticationOptions(), cts.Token) + .GetAwaiter() + .GetResult(); + } + + if (pinnedCerts is { Count: > 0 } && !MatchesPinnedCert(pinnedCerts)) + err = new InvalidOperationException("certificate not pinned"); + } + catch (AuthenticationException authEx) + { + if (solicit && !string.IsNullOrWhiteSpace(tlsName) && url is not null && + string.Equals(url.Host, tlsName, StringComparison.OrdinalIgnoreCase)) + { + resetTlsName = true; + } + err = authEx; + } + catch (OperationCanceledException) + { + err = new TimeoutException("TLS handshake timeout"); + } + catch (Exception ex) + { + err = ex; + } + + if (err is null) + { + lock (_mu) + { + Flags = Flags.Set(ClientFlags.HandshakeComplete); + if (IsClosed()) + return (false, ServerErrors.ErrConnectionClosed); + } + return (false, null); + } + + if (kind == ClientKind.Client) + Errorf("TLS handshake error: {0}", err.Message); + else + Errorf("TLS {0} handshake error: {1}", typ, err.Message); + + CloseConnection(ClosedState.TlsHandshakeError); + return (resetTlsName, ServerErrors.ErrConnectionClosed); + } + + internal static (HashSet Allowed, Exception? Error) ConvertAllowedConnectionTypes(IEnumerable cts) + { + var unknown = new List(); + var allowed = new HashSet(StringComparer.Ordinal); + + foreach (var value in cts) + { + var upper = value.ToUpperInvariant(); + if (AuthHandler.ConnectionTypes.IsKnown(upper)) + { + allowed.Add(upper); + } + else + { + unknown.Add(upper); + } + } + + return unknown.Count == 0 + ? (allowed, null) + : (allowed, new ArgumentException($"invalid connection types \"{string.Join(",", unknown)}\"")); + } + + internal void RateLimitErrorf(string format, params object?[] args) + { + if (Server is null) + return; + + var statement = string.Format(format, args); + if (!TryMarkRateLimited("ERR:" + statement)) + return; + + var suffix = FormatClientSuffix(); + if (!string.IsNullOrWhiteSpace(String())) + Errorf("{0} - {1}{2}", String(), statement, suffix); + else + Errorf("{0}{1}", statement, suffix); + } + + internal void RateLimitFormatWarnf(string format, params object?[] args) + { + if (Server is null) + return; + + if (!TryMarkRateLimited("WARN_FMT:" + format)) + return; + + var statement = string.Format(format, args); + var suffix = FormatClientSuffix(); + if (!string.IsNullOrWhiteSpace(String())) + Warnf("{0} - {1}{2}", String(), statement, suffix); + else + Warnf("{0}{1}", statement, suffix); + } + + internal void RateLimitWarnf(string format, params object?[] args) + { + if (Server is null) + return; + + var statement = string.Format(format, args); + if (!TryMarkRateLimited("WARN:" + statement)) + return; + + var suffix = FormatClientSuffix(); + if (!string.IsNullOrWhiteSpace(String())) + Warnf("{0} - {1}{2}", String(), statement, suffix); + else + Warnf("{0}{1}", statement, suffix); + } + + internal void RateLimitDebugf(string format, params object?[] args) + { + if (Server is null) + return; + + var statement = string.Format(format, args); + if (!TryMarkRateLimited("DBG:" + statement)) + return; + + var suffix = FormatClientSuffix(); + if (!string.IsNullOrWhiteSpace(String())) + Debugf("{0} - {1}{2}", String(), statement, suffix); + else + Debugf("{0}{1}", statement, suffix); + } + + internal void SetFirstPingTimer() + { + var opts = Server?.Options; + if (opts is null) + return; + + var d = opts.PingInterval; + if (Kind == ClientKind.Router && opts.Cluster.PingInterval > TimeSpan.Zero) + d = opts.Cluster.PingInterval; + if (IsWebSocket() && opts.Websocket.PingInterval > TimeSpan.Zero) + d = opts.Websocket.PingInterval; + + if (!opts.DisableShortFirstPing) + { + if (Kind != ClientKind.Client) + { + if (d > FirstPingInterval) + d = FirstPingInterval; + d = AdjustPingInterval(Kind, d); + } + else if (d > FirstClientPingInterval) + { + d = FirstClientPingInterval; + } + } + + var addTicks = d.Ticks > 0 ? Random.Shared.NextInt64(Math.Max(1, d.Ticks / 5)) : 0L; + d = d.Add(TimeSpan.FromTicks(addTicks)); + + ClearPingTimer(); + _pingTimer = new Timer(_ => ProcessPingTimer(), null, d, Timeout.InfiniteTimeSpan); + } + + private bool TryMarkRateLimited(string key) + { + var serverKey = (object?)Server ?? this; + var cache = RateLimitCacheByServer.GetOrCreateValue(serverKey); + return cache.TryAdd(key, DateTime.UtcNow); + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.SubscriptionsAndDelivery.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.SubscriptionsAndDelivery.cs index 927c304..8b26bba 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.SubscriptionsAndDelivery.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.SubscriptionsAndDelivery.cs @@ -134,7 +134,7 @@ public sealed partial class ClientConnection return null; // Max-delivery based deferred unsub is not modeled yet, so unsubscribe immediately. - Unsubscribe(Account, sub, force: true, remove: true); + Unsubscribe(_account, sub, force: true, remove: true); } if (Opts.Verbose) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs index edecabb..516ad45 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientConnection.cs @@ -97,7 +97,7 @@ public sealed partial class ClientConnection // Connection kind and server references. internal ClientKind Kind; // mirrors c.kind internal INatsServer? Server; // mirrors c.srv - internal INatsAccount? Account; // mirrors c.acc + internal INatsAccount? _account; // mirrors c.acc internal ClientPermissions? Perms; // mirrors c.perms internal MsgDeny? MPerms; // mirrors c.mperms @@ -439,15 +439,15 @@ public sealed partial class ClientConnection if (!acc.IsValid) throw new BadAccountException(); // Deregister from previous account. - if (Account is not null) + if (_account is not null) { - var prev = Account.RemoveClient(this); + var prev = _account.RemoveClient(this); if (prev == 1) Server?.DecActiveAccounts(); } lock (_mu) { - Account = acc; + _account = acc; ApplyAccountLimits(); } @@ -503,7 +503,7 @@ public sealed partial class ClientConnection /// internal void ApplyAccountLimits() { - if (Account is null || (Kind != ClientKind.Client && Kind != ClientKind.Leaf)) + if (_account is null || (Kind != ClientKind.Client && Kind != ClientKind.Leaf)) return; Volatile.Write(ref _mpay, JwtNoLimit); @@ -1111,7 +1111,7 @@ public sealed partial class ClientConnection internal void SetAccount(INatsAccount? acc) { - lock (_mu) { Account = acc; } + lock (_mu) { _account = acc; } } internal void SetAccount(Account? acc) => SetAccount(acc as INatsAccount); @@ -1360,25 +1360,29 @@ public sealed partial class ClientConnection // Account / server helpers (features 540-545) // ========================================================================= - internal INatsAccount? GetAccount() + internal INatsAccount? Account() { - lock (_mu) { return Account; } + lock (_mu) { return _account; } } + internal INatsAccount? GetAccount() => Account(); + // ========================================================================= // TLS handshake helpers (features 546-548) // ========================================================================= internal async Task DoTlsServerHandshakeAsync(SslServerAuthenticationOptions opts, CancellationToken ct = default) { - // Deferred: full TLS flow will be completed with server integration. - return false; + _ = ct; + return await Task.FromResult( + DoTLSServerHandshake("client", opts, Server?.Options.TlsTimeout ?? 2, Server?.Options.TlsPinnedCerts) is null); } internal async Task DoTlsClientHandshakeAsync(SslClientAuthenticationOptions opts, CancellationToken ct = default) { - // Deferred: full TLS flow will be completed with server integration. - return false; + _ = ct; + var (_, err) = DoTLSClientHandshake("route", null, opts, opts.TargetHost ?? string.Empty, Server?.Options.TlsTimeout ?? 2, null); + return await Task.FromResult(err is null); } // ========================================================================= @@ -1759,9 +1763,8 @@ public sealed partial class ClientConnection // features 477-496 and 487-503: see ClientConnection.SubscriptionsAndDelivery.cs // features 497-515 and 520: see ClientConnection.InboundAndHeaders.cs - // feature 534-535: swapAccountAfterReload, processSubsOnConfigReload - // feature 537: reconnect - // feature 569: setFirstPingTimer + // features 521-522, 534-535, 537, 540-548, 553, 565-569: + // see ClientConnection.LifecycleAndTls.cs // ========================================================================= // IsMqtt / IsWebSocket helpers (used by clientType, not separately tracked) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/ClientTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/ClientTypes.cs index 608761e..ca53a62 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/ClientTypes.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/ClientTypes.cs @@ -292,10 +292,12 @@ public sealed class ClientOptions /// public sealed class ClientInfo { + public string Server { get; set; } = string.Empty; public string Start { get; set; } = string.Empty; public string Host { get; set; } = string.Empty; public ulong Id { get; set; } public string Account { get; set; } = string.Empty; + public string ServiceName { get; set; } = string.Empty; public string User { get; set; } = string.Empty; public string Name { get; set; } = string.Empty; public string Lang { get; set; } = string.Empty; @@ -311,6 +313,7 @@ public sealed class ClientInfo public bool Restart { get; set; } public bool Disconnect { get; set; } public string[]? Cluster { get; set; } + public List Alternates { get; set; } = []; public bool Service { get; set; } /// @@ -319,6 +322,13 @@ public sealed class ClientInfo /// Added here to support . /// public TimeSpan Rtt { get; set; } + + /// + /// Returns the service account for this client info payload. + /// Mirrors Go ClientInfo.serviceAccount(). + /// + public string ServiceAccount() => + string.IsNullOrWhiteSpace(ServiceName) ? Account : ServiceName; } // ============================================================================ diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs index 6056e61..dc176c6 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs @@ -690,8 +690,9 @@ public sealed partial class NatsServer lock (c) { // acc name if not the global account. - if (c.Account?.Name != null && c.Account.Name != ServerConstants.DefaultGlobalAccount) - acc = c.Account.Name; + var account = c.GetAccount(); + if (account?.Name != null && account.Name != ServerConstants.DefaultGlobalAccount) + acc = account.Name; } var cc = new ClosedClient diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs index 19e14f3..ccaf654 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ClientConnectionStubFeaturesTests.cs @@ -283,4 +283,117 @@ public sealed class ClientConnectionStubFeaturesTests result.PSubs.Add(new Subscription { Subject = Encoding.ASCII.GetBytes("foo"), Sid = Encoding.ASCII.GetBytes("1") }); c.ProcessMsgResults(null, result, "hello\r\n"u8.ToArray(), null, Encoding.ASCII.GetBytes("foo"), null, PmrFlags.None).didDeliver.ShouldBeTrue(); } + + [Fact] + public void LifecycleAndTlsHelpers_GroupC_ShouldBehave() + { + var logger = new CaptureLogger(); + var (server, err) = NatsServer.NewServer(new ServerOptions + { + PingInterval = TimeSpan.FromMilliseconds(120), + }); + err.ShouldBeNull(); + server.SetLogger(logger, debugFlag: true, traceFlag: true); + + using var ms = new MemoryStream(); + var c = new ClientConnection(ClientKind.Client, server, ms) + { + Cid = 42, + Host = "127.0.0.1", + Start = DateTime.UtcNow.AddSeconds(-2), + Rtt = TimeSpan.FromMilliseconds(5), + }; + + c.SetFirstPingTimer(); + GetTimer(c, "_pingTimer").ShouldNotBeNull(); + + c.WatchForStaleConnection(TimeSpan.FromMilliseconds(20), pingMax: 0); + Thread.Sleep(60); + c.IsClosed().ShouldBeTrue(); + + var temp = Account.NewAccount("A"); + temp.Sublist = SubscriptionIndex.NewSublistWithCache(); + c.SetAccount(temp); + + var registered = server.LookupOrRegisterAccount("A").Account; + registered.Sublist = SubscriptionIndex.NewSublistWithCache(); + var inserted = new Subscription + { + Subject = Encoding.ASCII.GetBytes("foo.bar"), + Sid = Encoding.ASCII.GetBytes("11"), + }; + registered.Sublist.Insert(inserted).ShouldBeNull(); + + c.SwapAccountAfterReload(); + c.GetAccount().ShouldBe(registered); + + c.Perms = new ClientPermissions(); + c.Perms.Sub.Deny = SubscriptionIndex.NewSublistWithCache(); + c.Perms.Sub.Deny.Insert(new Subscription { Subject = Encoding.ASCII.GetBytes(">") }).ShouldBeNull(); + c.Subs["22"] = new Subscription + { + Subject = Encoding.ASCII.GetBytes("foo.bar"), + Sid = Encoding.ASCII.GetBytes("22"), + }; + c.ProcessSubsOnConfigReload(new HashSet(StringComparer.Ordinal) { registered.Name }); + c.Subs.ContainsKey("22").ShouldBeFalse(); + + c.ParseCtx.Pa.Account = Encoding.ASCII.GetBytes("A"); + c.ParseCtx.Pa.Subject = Encoding.ASCII.GetBytes("foo.bar"); + c.ParseCtx.Pa.PaCache = Encoding.ASCII.GetBytes("A:foo.bar"); + var cached = c.GetAccAndResultFromCache(); + cached.Account.ShouldBe(registered); + cached.Result.ShouldNotBeNull(); + cached.Result.PSubs.Count.ShouldBeGreaterThan(0); + + var closedSub = new Subscription { Subject = Encoding.ASCII.GetBytes("foo.closed") }; + closedSub.Close(); + var inField = typeof(ClientConnection).GetField("_in", BindingFlags.Instance | BindingFlags.NonPublic)!; + var state = (ReadCacheState)inField.GetValue(c)!; + state.PaCache = new Dictionary(StringComparer.Ordinal) + { + ["closed"] = new PerAccountCache + { + Acc = registered, + Results = new SubscriptionIndexResult + { + PSubs = { closedSub }, + }, + GenId = 1, + }, + }; + inField.SetValue(c, state); + c.PruneClosedSubFromPerAccountCache(); + state = (ReadCacheState)inField.GetValue(c)!; + state.PaCache.ShouldNotBeNull(); + state.PaCache.Count.ShouldBe(0); + + var info = c.GetClientInfo(detailed: true); + info.ShouldNotBeNull(); + info!.Account.ShouldBe("A"); + info.Server.ShouldNotBeNullOrWhiteSpace(); + info.ServiceAccount().ShouldBe("A"); + + var (allowed, convertErr) = ClientConnection.ConvertAllowedConnectionTypes( + ["standard", "mqtt", "bad"]); + allowed.ShouldContain(AuthHandler.ConnectionTypes.Standard); + allowed.ShouldContain(AuthHandler.ConnectionTypes.Mqtt); + convertErr.ShouldNotBeNull(); + + c.RateLimitWarnf("warn {0}", 1); + c.RateLimitWarnf("warn {0}", 1); + logger.Warnings.Count.ShouldBe(1); + } + + private sealed class CaptureLogger : INatsLogger + { + public List Warnings { get; } = []; + + public void Noticef(string format, params object[] args) { } + public void Warnf(string format, params object[] args) => Warnings.Add(string.Format(format, args)); + public void Fatalf(string format, params object[] args) { } + public void Errorf(string format, params object[] args) { } + public void Debugf(string format, params object[] args) { } + public void Tracef(string format, params object[] args) { } + } } diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsServerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsServerTests.cs index e4f24b7..eca96fa 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsServerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/NatsServerTests.cs @@ -6,6 +6,26 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed class NatsServerTests { + [Fact] + public void RateLimitedClientLogging_ShouldSuppressDuplicates() + { + var logger = new NatsServerCaptureLogger(); + var (server, err) = NatsServer.NewServer(new ServerOptions()); + err.ShouldBeNull(); + server.SetLogger(logger, debugFlag: true, traceFlag: true); + + var c = new ClientConnection(ClientKind.Client, server, new MemoryStream()); + c.RateLimitWarnf("duplicate warning {0}", "A"); + c.RateLimitWarnf("duplicate warning {0}", "A"); + c.RateLimitFormatWarnf("format warning {0}", "B"); + c.RateLimitFormatWarnf("format warning {0}", "C"); + c.RateLimitErrorf("duplicate error {0}", "X"); + c.RateLimitErrorf("duplicate error {0}", "X"); + + logger.Warnings.Count.ShouldBe(2); + logger.Errors.Count.ShouldBe(1); + } + [Fact] // T:2886 public void CustomRouterAuthentication_ShouldSucceed() { @@ -518,4 +538,17 @@ public sealed class NatsServerTests "TestServerShutdownDuringStart".ShouldNotBeNullOrWhiteSpace(); } + private sealed class NatsServerCaptureLogger : INatsLogger + { + public List Warnings { get; } = []; + public List Errors { get; } = []; + + public void Noticef(string format, params object[] args) { } + public void Warnf(string format, params object[] args) => Warnings.Add(string.Format(format, args)); + public void Fatalf(string format, params object[] args) { } + public void Errorf(string format, params object[] args) => Errors.Add(string.Format(format, args)); + public void Debugf(string format, params object[] args) { } + public void Tracef(string format, params object[] args) { } + } + } diff --git a/porting.db b/porting.db index 6278a21852447c74f9249bd352e9dd2cbf8af694..26ab5b27c083af13cd4c6a1df10f793661b04619 100644 GIT binary patch delta 7249 zcmbuE3wRXO6~|}pyeD^OHX$TzUYn2z34~;Kvq?6Q0HGA|#UqIb2$e(e$}r;mEy?5At~c-dFmZ{b+P%PCSLd!ax!}Ng_V@Z|&o7s`>lQ6o#+kTab>Q%oQj_SByrJy?CSIMY$ z`jr(`t6wFdTH{xVsA~PHDgkAsUs_O=`Bf~c$$k}s%H>xos!@KWpi1zo0#w(h?J`FE zojoUrJDFWXV+`9xb}UytyRJHzR@=#U!RW7PBzsf*0**NeOdxy+xU$dRY<^QtR z2+uIg75L^F+nI%y%ilW{)hNFjjw->gQc!8er{A)cCf+oO`OFZSNIeT@zBI>_@Z)$J zKb()|MeZ{9ckUd2l0VFM@LTy7ei^@z?-uHWXM`?FXdSJlGwBpsNHgh0+C|@?r|Aj$ zj494UP5Vu4rj4f6rW#YZsRWvu$uQ$`GvQfqIS3^t1>V|6%HU)RiG$CdB?*Ln3p>`4 z$xw^UuwoynVpT{eAn6dhk5s~YEyN60_ma3BkB~a}0u5+=ge1X79_v-Kw4f5O8W$fS zQDX2flfm3d65ye|q#RmXi4_hVA*DfGA{N5tBN)=Ti42L*dUHi*Le>#dV{~pJ#Zi)S zjQNsGVYb(rXEc7HcB>z#@2G!Jf1|#n9#x-Kcd6UeR&|}aQhi8WsNSp2RPR(LsW+<* zHA78S4k%A3KT*~yOO=JnJ<3$2SaB*Nl{m$uF!EP&xBMshb@>(fn7mJJlOL1U$PdZY z^4;=na*=G4Q{@<0k*-KzNL|vq(reN$rNfQVZfTpeL0Tz2AeBp{(qySX%9N~9lq86k z#eazB#k1n8q9HybJ}GV%*NMx-MdDm>nmAF+71PCdQ4?9=U&3F7_k}lvPT_fBztAqU z3XQnDZef;iyD(0$3u!{Epz>Gwi~L{s_xRu8Rye}%;kWaT@~im98oq)r<8R>$`7wMl zKLq#7f4I-M54pFwQ``yeAh(O#!nJVAxy9T(ZaVh^E{{v+;y6mKk&C2@yhBct6XY54 z1bLh^kvdXMW|ArTr~3Q)>-x+3QGJiTO<%7+tS{E*>QnUzxjM*B=Vubt6; z)2O|m?bq70joNChMl07!v`JcCxPQHG#Z5u)$X1`i%JBYCxS^!JVQD2v#Wh#G*6^VI z#*=%4%9Ewq7%jFmB&ZKHwWo=}e9H;S{v0y;d?>bG()n zt?6DX39ZRqD-o@+UMm4Dhu5;8mF~4-(Ms}KF=$15Efp=_02P@VLTc`9Y*gZ@sN9njD`Y_ zwF<3KK}Gmtka|8y9fPkAlTp0>HLF3|5%T6(f3gcO_r96B(DKbRA1&WZ^U(6mG}r$` zhVj{P@-iFEZ1Fdysp5I&eSEjW=NH&p;T0P(K}V-K+BkNUOkh=cv%J>#!h$Q$k+H_* z=SVlt(`t6F(RG|W%fkAv%%kCrugn=n*9kH_R(^ucL*L?jgPtLIETQw@Y~ipu#;&vE z6;;HBdF{%Es&fS45FQZ_5s?rX@wBT()wzhP+hO7|OCn6~rcv;dI_iX@2dOONI124H z<4he@S=h5XDG_Q@r6^Du=uO^s#oOLo+>#3R4r+l}4YU|0?xbVjbOZjl?qbJMIv*x} zYwB^ZX(=7;ZPk1ZN?KAP88&w|G+(08(6yA#hNw6#3C#O(H1{u~sR6FwXUphQKq_Or zzRYm{a%u}4Ol(Ugyb@)y7^jxg3leNCL_bZh5dmIn>6Ro3 zmJ(k_!k@K@PK9`=~0SjllRGKKceQOR2x6>m~GL!2W*rXjj$?Mq?CpXiLWW(wm6fq&1v3iI8$m~)rhPi?V zsnV*fQEHV+^-Z;lE0xi?zTwJ~Dkn_ca|cBauiHG6ac6h9WUYEHVtS zAaO`Ml7J){-Pe-hXN;CEF;Pq=qZeyyO(*CTIw7K2J*846lbc4KV7ui5;wj;6{@-|e z>Vo;D)-qVTr#IgpmRi&OOKK}Hx~5wNmUFbYAo12D3#=-&DzJLCH7ocL1uo3CS_7Vg z?itpS@Aj!Fu_k%@c*@sSfaRO#E8pWK*5ZIQ$1x}{rqr4i;LwOJv!3UMVeK4`V%Va! zb6X0$|K2mkl1ohJ8}ndCo&b8hD=9hASFmCQij}x%tY=+W+AhYImkW8 zTx1?HAGsGPM=Fp?qzb7v=p6TD_t*sH6GqR{=9)Irf6_HnkLZZ79;v6;(R;I3#`TIa4O z@X-Q$0zCM0cWU+>ix#-6R#Yu<7uQsmEve6{sxNfr=l6c1G*r~z?@V6g`7nFHUDM#M zhR7Eyli|#{YTl82Uqyqv-eGg(jj`p7vDuStw!$2HVQyaYxOUkWH8m)z3wD2AnF&oR z?2&zj&9e_StkhT7{O<|t%pGi4v9GXs-xJnRFxap{Utx2?O}Hn<~oDFYnH%1{@;IxE6l6VD)4gnFcTQW7Y(5RHsfP#4f)uE-f5R z&6c)@RPR|jGVtA}K=HP!Ot3xg9+B^GxIACJl@$$D3&Zs~wXGJ)-*iVCd){!*CnG$c zB|3ka(Z_15OuwN2q4^OF>aUe=l~Q?&^p<22Zx`S^XzFy*GGjWF$j>TKwI&uos*31{c_e{sRJj0~L>ur<|*H(p8mVE1lz zyV)052$)zsN%ZYq+ba9egq*x^cFPURQuoCOF3SX8Sw6Ou^uJT>VQrTiCRXbsvDqcK zaQ$8N?O`qE4dz?aZt}%U3Cd}#XTEXJ@vBPTe6#xay2*D8?p0LOPWuWS16vNci~0)a x^#5nj|MxXj)E?1SK=t}Xzpto{LB4S(w5z_TBZD#=aNH%}aTi7^E&W#f{{Zf-!YKd% delta 3718 zcmbuBdsJ0b8o?Vx$)zo;$;`}U6J;?Dy(gqBHN^QYk8>}7^w%ub@2>mXk8gkb z+h^ZHSsl=_=YUp4irY9&%Gwq+p5JyPCnMQXP4o+O< zV+r31_lz*(Ea!jt5ke_{N2}wGc8=k1qaCuX1#P!&zoAvyb_;E-Z8y;hZM%V%Yum4A z8Mgg`mTX(zb<{*#e@2V5?I$$1ZOv%*X83D?4R`htC3GaAp(6>g`hUyUi^KoRy+#bq zTt~hj7szRHlpG?v=>g_6ubP+4v*vNLmR8eE^l`eBX46zhquC5meIM=xbxsm)jcwu+ z`4@Iy<9VpP#*bs|g(l&FEZ}N0|2Qm|Y{tPSqfMP33Gs<$q;=#cev%@vKAhXS@f+Vt z1ha~(gj=nA-H_13A~9Vox{LJWYqk_)Cfj{W(1J1Z(SkAaXk>jz&@Ai> z2F--(3X&`ci?~9l8bf+z;5Rz~M+%xf){z{Vfa6~5vlnqp?lNyumw6MxJ%+dip^?i4 zqk`naB|mW*Vo@+OdmMiDleLCe6Vi4a%GQ%OLu?3Xy9UPsZH$Dp{a`h%CoaiggF_lu zfU}agGgu9mVHc?gEm*Qg(Sju_M6-9pmZRBKUUYA6hW*{4V8Pq)!eRnS&M_nXOZZ+d@30h4$WNhE8VbMp8(6X=uTHr=kVZ zn}!xl?;*5cdQ;JYi%#jXLPEf-x3`eHUDNdQyhy6TSrcnu7o{*s7H^8*iJyrd2zkN` z@h$On@p%!%KZ~oyW#S`Znm9=uDGm^Oiw@zo@T2gh@R4vvu!I+dJ;D~DRCpA&=9vBa z>8JEJgcN<3zFB`lFA&D)>H1`SlpZS#(iv+NJc23kJ99{soBrgu?0DDlmgAV?RmVZc zUPq%Iq37ra4X+UnQG9k}X&;BnA!;|Y@3hag54F>prPXMAwJNPbTd6J9GPP;iL~W!N zt3_&t#;ZT8U#b66-&gC@*VTjS(`uz!surm}b+-DTI!+y?_EUSRnsQrdQvR)6Qr=Nc zD6cB}m7U5aWwlbE%u_Oy6lJUuuS6-`6-mA+e_twJkuS(+^W`i# zRh}e|k_XCe*_4TNUHV%3R5~ZUCA}fNAnlg6NM%y7v_P68O_9b+!=)HwnlaHBX~Y`e z81=>n#wp`XlD!QJorjOEvG>cBB_sbdb19B23H&~9Ad&!0@NVlXW zvif)a3dlT?0ez$R0bz_rk`)C@3YM=bke_82>7IZZAFS zu(g&%@^8X8KZyXPf|xL-l1z8%QC&uRp=LdaaO$2et!`N4Ck60z1<8Y+6(q%E)Wdha zxpPZaz_YE+-qyzl$Y%mX)sQFPLJgT{D6~5%cw`<;om?Q?;vo7Z@&hcXAOj(>7Jr&L zd()B&7p=fMAcLQIuFm!@> zH%#N5u~7QOycDbGFiGO2_G@U}I6@kQwwYGLQBuppB=dUcKY?PG~L;_dseRbHP(ZYy^~DcNo@( zMNH)(?L86?atRv%O|J+Bo#2mwVb8JNFuR0JfR7fa9w>~FU2wL94Qd+!{0f!@%^&sb z=-9HNqeDse_wB~&zE%RC4EYd^VSFRO#QSTa`ZgaOpe8 zEr%@pon9eTfhYPt>&s27KY{!*_9?igbS_a;&c?Rqe!QHO1_H;!mXtm+EL(>gJ)bV< zntzDj?c(?c*H49EVR{ei$U62S54jaeNBW!nY)D(x;B7y<1gAE}cclN$dKTSY&YSBQ z<>6FRpLj?($u#(O12f^SVPRf4{Jlf57H?!GM*KzeKvf;<1_w7`#6Za{kdIX(FN6`K> zoF1Un>S^_;dPv>f{_mfa(PY;0{fxuLeq)ER!FY_;2i_j^9IZ3*<$uVB<^4vcyu);v z!qH~)q9z#OTr%WFIPdb51g=Bp4Ht^B!)@C#h`4Ruah$drQlQ14vzF?nnKgL>ElA~NZqcNIIp5`0XnR2sqM!Xet z(_nj|)KT@9Q@tu|XqEaxPO{*D)al-=kTYrPJ$OqR(B2!L>Yd(oZfyHA5tiZojJ|Ma zwwH%=SA+)+&+>MM%X7RPw{*Ba5r(FEqk*M+hja~~LSees+ZJV`KG%yxA$^f(BnIh+ z+=cW<1|YG>KqL+sgbYUFks(L|G87qx3`hQej6g;rqma=^A~FUUi`