From f8d384711d36bf05892a2b628c906b8a1c53e337 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 28 Feb 2026 19:17:53 -0500 Subject: [PATCH] feat(batch18): implement group-a server core helpers --- .../ZB.MOM.NatsNet.Server/NatsServer.Init.cs | 33 +++++- .../NatsServer.Lifecycle.cs | 38 +++++- .../NatsServer.Listeners.cs | 64 ++++++++++- .../src/ZB.MOM.NatsNet.Server/NatsServer.cs | 1 + .../ImplBacklog/MonitoringHandlerTests.cs | 108 ++++++++++++++++++ .../ServerTests.cs | 74 ++++++++++++ porting.db | Bin 6660096 -> 6660096 bytes 7 files changed, 308 insertions(+), 10 deletions(-) diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs index e7c0d95..3a740da 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs @@ -265,6 +265,22 @@ public sealed partial class NatsServer return true; } + /// + /// Returns S2 writer options for the selected route compression mode. + /// Mirrors Go s2WriterOptions. + /// + internal static string[]? S2WriterOptions(string cm) + { + var opts = new List { "writer_concurrency=1" }; + return cm switch + { + CompressionMode.S2Uncompressed => [.. opts, "writer_uncompressed"], + CompressionMode.S2Best => [.. opts, "writer_best_compression"], + CompressionMode.S2Better => [.. opts, "writer_better_compression"], + _ => null, + }; + } + // ========================================================================= // Factory methods (features 2983–2985) // ========================================================================= @@ -509,20 +525,27 @@ public sealed partial class NatsServer /// Background loop that logs TLS rate-limited connection rejections every second. /// Mirrors Go Server.logRejectedTLSConns. /// - internal async Task LogRejectedTlsConnsAsync(CancellationToken ct) + internal Task LogRejectedTlsConnsAsync(CancellationToken ct) => + LogRejectedTLSConns(ct); + + /// + /// Background loop that logs TLS rate-limited connection rejections every second. + /// Mirrors Go Server.logRejectedTLSConns. + /// + internal async Task LogRejectedTLSConns(CancellationToken ct, TimeSpan? interval = null) { - using var timer = new PeriodicTimer(TimeSpan.FromSeconds(1)); + using var timer = new PeriodicTimer(interval ?? TimeSpan.FromSeconds(1)); while (!ct.IsCancellationRequested) { - try { await timer.WaitForNextTickAsync(ct); } - catch (OperationCanceledException) { break; } - if (_connRateCounter is not null) { var blocked = _connRateCounter.CountBlocked(); if (blocked > 0) Warnf("Rejected {0} connections due to TLS rate limiting", blocked); } + + try { await timer.WaitForNextTickAsync(ct); } + catch (OperationCanceledException) { break; } } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Lifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Lifecycle.cs index 92e403e..5619d02 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Lifecycle.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Lifecycle.cs @@ -178,7 +178,13 @@ public sealed partial class NatsServer /// Returns true if the goroutine was started, false if the server is already stopped. /// Mirrors Go Server.startGoRoutine(f). /// - internal bool StartGoRoutine(Action f) + internal bool StartGoRoutine(Action f) => StartGoRoutine(f, []); + + /// + /// Starts a background Task with goroutine labels. + /// Mirrors Go Server.startGoRoutine(f, tags...). + /// + internal bool StartGoRoutine(Action f, params IReadOnlyDictionary[] tags) { lock (_grMu) { @@ -186,13 +192,41 @@ public sealed partial class NatsServer _grWg.Add(1); Task.Run(() => { - try { f(); } + try + { + SetGoRoutineLabels(tags); + f(); + } finally { _grWg.Done(); } }); return true; } } + /// + /// Optional test-only sink used to observe goroutine labels during unit tests. + /// + internal static Action>>? SetGoRoutineLabelsHookForTest { get; set; } + + /// + /// Sets goroutine labels for diagnostics when tags are present. + /// Mirrors Go setGoRoutineLabels. + /// + internal static void SetGoRoutineLabels(params IReadOnlyDictionary[] tags) + { + var labels = new List>(); + foreach (var tag in tags) + { + foreach (var pair in tag) + { + labels.Add(new KeyValuePair(pair.Key, pair.Value)); + } + } + + if (labels.Count > 0) + SetGoRoutineLabelsHookForTest?.Invoke(labels); + } + // ========================================================================= // Client / connection management (features 3081–3084) // ========================================================================= diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs index 6056e61..7646a1e 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs @@ -15,6 +15,7 @@ // Session 10: accept loops, client creation, connect URL management, port helpers. using System.Net; +using System.Net.Security; using System.Net.Sockets; using System.Security.Cryptography; using System.Text.Json; @@ -461,6 +462,41 @@ public sealed partial class NatsServer private string BasePath(string p) => string.IsNullOrEmpty(_httpBasePath) ? p : System.IO.Path.Combine(_httpBasePath, p.TrimStart('/')); + /// + /// Returns a monitoring TLS config cloned from server TLS config, with client certs disabled. + /// Mirrors Go Server.getMonitoringTLSConfig(). + /// + internal SslServerAuthenticationOptions? GetMonitoringTLSConfig() + { + var opts = GetOpts(); + if (opts.TlsConfig == null) + return null; + + var clone = new SslServerAuthenticationOptions + { + EnabledSslProtocols = opts.TlsConfig.EnabledSslProtocols, + AllowRenegotiation = opts.TlsConfig.AllowRenegotiation, + CertificateRevocationCheckMode = opts.TlsConfig.CertificateRevocationCheckMode, + CipherSuitesPolicy = opts.TlsConfig.CipherSuitesPolicy, + ServerCertificate = opts.TlsConfig.ServerCertificate, + ClientCertificateRequired = false, + EncryptionPolicy = opts.TlsConfig.EncryptionPolicy, + CertificateChainPolicy = opts.TlsConfig.CertificateChainPolicy, + }; + return clone; + } + + /// + /// Returns the monitoring handler object while monitoring is active, otherwise null. + /// Mirrors Go Server.HTTPHandler(). + /// + public object? HTTPHandler() + { + _mu.EnterReadLock(); + try { return _httpHandler; } + finally { _mu.ExitReadLock(); } + } + /// /// Starts the HTTP/HTTPS monitoring listener. /// Stub — full monitoring handler registration deferred to session 17. @@ -504,10 +540,12 @@ public sealed partial class NatsServer _mu.EnterWriteLock(); _http = httpListener; + _httpHandler = new object(); _mu.ExitWriteLock(); // Full HTTP handler registration in session 17; for now just drain the listener. - _ = Task.Run(() => + // Use a long-running task so teardown does not depend on thread-pool availability. + _ = Task.Factory.StartNew(() => { // Accept and immediately close connections until shutdown. while (!IsShuttingDown()) @@ -524,11 +562,11 @@ public sealed partial class NatsServer } _mu.EnterWriteLock(); - // Don't null _http — ProfilerAddr etc. still read it. + _httpHandler = null; _mu.ExitWriteLock(); _done.Writer.TryWrite(default); - }); + }, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default); return null; } @@ -799,6 +837,26 @@ public sealed partial class NatsServer _ => (0, new ArgumentException($"unknown version: {ver}")), }; + /// + /// Handles TLS handshake timeout by closing the connection if auth is incomplete. + /// Mirrors Go tlsTimeout. + /// + internal static void TlsTimeout(ClientConnection c, SslStream conn) + { + lock (c) + { + if (c.IsClosed()) + return; + } + + if (!conn.IsAuthenticated) + { + c.Errorf("TLS handshake timeout"); + c.SendErr("Secure Connection - TLS Required"); + c.CloseConnection(ClosedState.TlsHandshakeError); + } + } + // ========================================================================= // Connect URL helpers (features 3074–3076) // ========================================================================= diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs index 975b2e7..ae0aee9 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs @@ -210,6 +210,7 @@ public sealed partial class NatsServer : INatsServer private string _httpBasePath = string.Empty; private readonly Dictionary _httpReqStats = []; + private object? _httpHandler; // ========================================================================= // Client connect URLs diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MonitoringHandlerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MonitoringHandlerTests.cs index 4c7e259..5fe4fe4 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MonitoringHandlerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MonitoringHandlerTests.cs @@ -1,5 +1,8 @@ using System.Security.Cryptography; using System.Security.Cryptography.X509Certificates; +using System.Net.Security; +using System.Net.Sockets; +using System.Reflection; using Shouldly; using ZB.MOM.NatsNet.Server; using ZB.MOM.NatsNet.Server.Internal; @@ -10,6 +13,99 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed class MonitoringHandlerTests { + [Fact] + public void GetMonitoringTLSConfig_WithServerTlsConfig_DisablesClientCertificateRequirementOnClone() + { + var (certFile, keyFile, tempDir, _) = CreatePemCertificate(DateTimeOffset.UtcNow.AddMinutes(10)); + var (tlsOpts, parseErr) = ServerOptions.ParseTLS( + new Dictionary + { + ["cert_file"] = certFile, + ["key_file"] = keyFile, + ["verify"] = true, + }, + isClientCtx: false); + parseErr.ShouldBeNull(); + tlsOpts.ShouldNotBeNull(); + + var (tlsConfig, tlsErr) = ServerOptions.GenTLSConfig(tlsOpts!); + tlsErr.ShouldBeNull(); + tlsConfig.ShouldNotBeNull(); + + var opts = new ServerOptions { TlsConfig = tlsConfig }; + + var (server, error) = NatsServer.NewServer(opts); + + error.ShouldBeNull(); + server.ShouldNotBeNull(); + + var monitoringTls = server!.GetMonitoringTLSConfig(); + monitoringTls.ShouldNotBeNull(); + monitoringTls!.ClientCertificateRequired.ShouldBeFalse(); + monitoringTls.CertificateRevocationCheckMode.ShouldBe(opts.TlsConfig!.CertificateRevocationCheckMode); + opts.TlsConfig!.ClientCertificateRequired.ShouldBeTrue(); + + Directory.Delete(tempDir, recursive: true); + } + + [Fact] + public void HTTPHandler_WhenMonitoringListenerStops_TransitionsToNull() + { + var opts = new ServerOptions + { + HttpHost = "127.0.0.1", + HttpPort = -1, + }; + var (server, error) = NatsServer.NewServer(opts); + error.ShouldBeNull(); + server.ShouldNotBeNull(); + + server!.HTTPHandler().ShouldBeNull(); + + var startError = server.StartMonitoring(); + startError.ShouldBeNull(); + server.HTTPHandler().ShouldNotBeNull(); + + var listenerField = typeof(NatsServer).GetField("_http", BindingFlags.Instance | BindingFlags.NonPublic); + listenerField.ShouldNotBeNull(); + var listener = listenerField!.GetValue(server).ShouldBeOfType(); + listener.Stop(); + + var transitioned = SpinWait.SpinUntil(() => server.HTTPHandler() == null, TimeSpan.FromSeconds(5)); + transitioned.ShouldBeTrue(); + server.HTTPHandler().ShouldBeNull(); + } + + [Fact] + public async Task LogRejectedTLSConns_WhenRateCounterHasBlockedConnections_EmitsWarning() + { + var opts = new ServerOptions { TlsRateLimit = 1 }; + var (server, error) = NatsServer.NewServer(opts); + error.ShouldBeNull(); + server.ShouldNotBeNull(); + + var logger = new MonitoringCaptureLogger(); + server!.SetLogger(logger, debugFlag: false, traceFlag: false); + + var rateCounterField = typeof(NatsServer).GetField("_connRateCounter", BindingFlags.Instance | BindingFlags.NonPublic); + rateCounterField.ShouldNotBeNull(); + var rateCounter = rateCounterField!.GetValue(server).ShouldBeOfType(); + + rateCounter.Allow(); + rateCounter.Allow(); + + using var cts = new CancellationTokenSource(); + var loop = server.LogRejectedTLSConns(cts.Token, TimeSpan.FromMilliseconds(10)); + + var warned = SpinWait.SpinUntil( + () => logger.WarningEntries.Any(w => w.Contains("Rejected", StringComparison.OrdinalIgnoreCase)), + TimeSpan.FromSeconds(2)); + cts.Cancel(); + await loop; + + warned.ShouldBeTrue(); + } + [Fact] // T:2108 public void MonitorConnzClosedConnsBadTLSClient_ShouldSucceed() { @@ -228,6 +324,18 @@ public sealed class MonitoringHandlerTests return (certFile, keyFile, tempDir, notAfter); } + private sealed class MonitoringCaptureLogger : INatsLogger + { + public List WarningEntries { get; } = []; + + public void Noticef(string format, params object[] args) { } + public void Warnf(string format, params object[] args) => WarningEntries.Add(string.Format(format, args)); + public void Fatalf(string format, params object[] args) { } + public void Errorf(string format, params object[] args) { } + public void Debugf(string format, params object[] args) { } + public void Tracef(string format, params object[] args) { } + } + [Fact] // T:2065 public void MonitorNoPort_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ServerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ServerTests.cs index 649e6f1..6bcd24b 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ServerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ServerTests.cs @@ -17,6 +17,8 @@ using System.Text; using System.Text.Json; using System.Text.RegularExpressions; +using System.Net.Security; +using System.Threading; using NSubstitute; using NSubstitute.ExceptionExtensions; using Shouldly; @@ -280,6 +282,27 @@ public sealed class ServerTests [Fact] public void NeedsCompression_S2Fast_ReturnsTrue() => NatsServer.NeedsCompression(CompressionMode.S2Fast).ShouldBeTrue(); + + [Theory] + [InlineData(CompressionMode.S2Uncompressed, "writer_concurrency=1", "writer_uncompressed")] + [InlineData(CompressionMode.S2Best, "writer_concurrency=1", "writer_best_compression")] + [InlineData(CompressionMode.S2Better, "writer_concurrency=1", "writer_better_compression")] + public void S2WriterOptions_KnownModes_ReturnExpectedOptions( + string mode, + string expectedFirst, + string expectedSecond) + { + var options = NatsServer.S2WriterOptions(mode); + + options.ShouldNotBeNull(); + options!.ShouldBe([expectedFirst, expectedSecond]); + } + + [Fact] + public void S2WriterOptions_UnsupportedMode_ReturnsNull() + { + NatsServer.S2WriterOptions(CompressionMode.S2Fast).ShouldBeNull(); + } } // ============================================================================= @@ -292,6 +315,57 @@ public sealed class ServerTests /// public sealed class ServerListenersTests { + [Fact] + public void TlsTimeout_IncompleteHandshake_ClosesConnection() + { + var c = new ClientConnection(ClientKind.Client, nc: new MemoryStream()); + using var tls = new SslStream(new MemoryStream(), leaveInnerStreamOpen: false); + + c.IsClosed().ShouldBeFalse(); + + NatsServer.TlsTimeout(c, tls); + + c.IsClosed().ShouldBeTrue(); + } + + [Fact] + public void StartGoRoutine_WithLabels_InvokesSetGoRoutineLabels() + { + var (s, err) = NatsServer.NewServer(new ServerOptions()); + err.ShouldBeNull(); + s.ShouldNotBeNull(); + s!.Start(); + + var signal = new ManualResetEventSlim(false); + IReadOnlyList>? observed = null; + + NatsServer.SetGoRoutineLabelsHookForTest = labels => + { + observed = labels; + signal.Set(); + }; + + try + { + var started = s.StartGoRoutine( + () => { }, + new Dictionary { ["component"] = "server", ["loop"] = "tls" }); + + started.ShouldBeTrue(); + signal.Wait(TimeSpan.FromSeconds(2)).ShouldBeTrue(); + + observed.ShouldNotBeNull(); + observed!.ShouldContain(kv => kv.Key == "component" && kv.Value == "server"); + observed.ShouldContain(kv => kv.Key == "loop" && kv.Value == "tls"); + } + finally + { + NatsServer.SetGoRoutineLabelsHookForTest = null; + s.Shutdown(); + s.WaitForShutdown(); + } + } + // ========================================================================= // GenerateInfoJson (feature 3069) — Test ID 2906 // Mirrors Go TestServerJsonMarshalNestedStructsPanic (guards against diff --git a/porting.db b/porting.db index d5700d7b48e87eb94a7434c167673ab5a6161031..028355a528248bc115fe82f4a4a753736467d400 100644 GIT binary patch delta 1031 zcmXZZUu;u#6bJC%y}$O}-rm3K%4Dl6n@%<oubAV z65EVWU*r-I&LR*hF}@&VQInrftRy5P=mR`J_8`U?vlmA)$U+RlF5fmhe3I|O$vMAs ze!0>qo+~>d{UQg7(i8=Y@?DxQ%5!4+JNd4@y8F<_pA1Tq8ebrnlhJAou(nE4kXp(~ ze}ojTq=!h+O1gj)7S1L4-c6Y?;yK>L``H9FHb^JMM2?@d?0m~uYZa4HX3|W_QaufM zyo36-@_HKiEo9JjkEar;ww3M6O~#nb(#+~HhmDK67ydxYho(czq#xCtEUg&9ErCmX zhEh(XS{tFm5%e5QP-c=x>7z4zo(8{)H`9r;e5q4ucVrcPj+&mMQj)G5k5`Mo&+-Y0 zzTSq(#kxf%T|39ygOyT|KF*`YsT*7oN|FEHTZ@-yti;S+#_4cv=$MiXHc7**WF0Io z_Jb(R@{6W3YSUs2NQ@pxnh9(s^7Fh|qVv7sdUCIFoqqRuYoI)!;QJf`5CjEO2!RG+ z(7^x`EaCfh&<%gDhM0Zd{zt8$@djtTtcGZ$%T9}9uJfrvv$tH64&HGsNue`;xa$hT zLk?6y1gaql3!ny`frU^jhKK6L{;RGW`!cf*n*;i|_8_#+ULvAytj2coePastYRub7 zv$6bR9oblxK0L&m=|(n|c)C$GR`9dWLcQ>_aoMRt!wCC=SwrTpMoPErwIY&>x7mSw zniBMQjMvgWFWyNJ9Eq-Z@%E>Eym-O)7C{3v!gKIEG{Fn77@DC4UW6r(fTfUxWzY)C zAq8!)0&sQjC3qQLfi%1dE8#U*1*@S0I-v{J!0Ye^tc7*Z4L$HCyanswZFmRXg$>XP z8(|al!Di@(EwB~dgKe-KcEC;;fL*W~_JAvVFB9A-&57V&nH7?u&uRTyceoV(NuQ>! zsm!+#eNr9QudC9(E7>3WiXJ$G-_uPBiX_28iOl2rP!_ZD!bwK%3l&VliDNwZF9gs8=e>%pW z1tL!A%?m5~L4&92W^kb~MneMjvLMn02?_>8gIqn?9~j##Tk&dH20t+Ak5_ zr2EQfn^mFzRqXDpx-+Z3_LXOUtN;0vZuLrD;c4E-&dQdQcFa3l;m6E4V=OR~9&4*- zT25p|PS={`?h(FPUOE~~$npng>OA`gkQX8t^qi zHgYl3>h>Bc$qs9#_AZ%GwXYM|M5iCQc_g{%zR2?<$(b{QlA!sZ6sQICFlYfN4O$3l z1uX(C1}y-SplzT&(00(v zpjSZspdFx{pjSc8I28j0ZORK?Eq1EjD|$c{96cqS$>M%lKB1doBWc)%9=qjDO%`QD i+b<9OUhI)?m269XQnHsEG_MyGyA-=U^ZIMkC;tQ4)=GB(