diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs index e7c0d95..3a740da 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Init.cs @@ -265,6 +265,22 @@ public sealed partial class NatsServer return true; } + /// + /// Returns S2 writer options for the selected route compression mode. + /// Mirrors Go s2WriterOptions. + /// + internal static string[]? S2WriterOptions(string cm) + { + var opts = new List { "writer_concurrency=1" }; + return cm switch + { + CompressionMode.S2Uncompressed => [.. opts, "writer_uncompressed"], + CompressionMode.S2Best => [.. opts, "writer_best_compression"], + CompressionMode.S2Better => [.. opts, "writer_better_compression"], + _ => null, + }; + } + // ========================================================================= // Factory methods (features 2983–2985) // ========================================================================= @@ -509,20 +525,27 @@ public sealed partial class NatsServer /// Background loop that logs TLS rate-limited connection rejections every second. /// Mirrors Go Server.logRejectedTLSConns. /// - internal async Task LogRejectedTlsConnsAsync(CancellationToken ct) + internal Task LogRejectedTlsConnsAsync(CancellationToken ct) => + LogRejectedTLSConns(ct); + + /// + /// Background loop that logs TLS rate-limited connection rejections every second. + /// Mirrors Go Server.logRejectedTLSConns. + /// + internal async Task LogRejectedTLSConns(CancellationToken ct, TimeSpan? interval = null) { - using var timer = new PeriodicTimer(TimeSpan.FromSeconds(1)); + using var timer = new PeriodicTimer(interval ?? TimeSpan.FromSeconds(1)); while (!ct.IsCancellationRequested) { - try { await timer.WaitForNextTickAsync(ct); } - catch (OperationCanceledException) { break; } - if (_connRateCounter is not null) { var blocked = _connRateCounter.CountBlocked(); if (blocked > 0) Warnf("Rejected {0} connections due to TLS rate limiting", blocked); } + + try { await timer.WaitForNextTickAsync(ct); } + catch (OperationCanceledException) { break; } } } diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Lifecycle.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Lifecycle.cs index 92e403e..5619d02 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Lifecycle.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Lifecycle.cs @@ -178,7 +178,13 @@ public sealed partial class NatsServer /// Returns true if the goroutine was started, false if the server is already stopped. /// Mirrors Go Server.startGoRoutine(f). /// - internal bool StartGoRoutine(Action f) + internal bool StartGoRoutine(Action f) => StartGoRoutine(f, []); + + /// + /// Starts a background Task with goroutine labels. + /// Mirrors Go Server.startGoRoutine(f, tags...). + /// + internal bool StartGoRoutine(Action f, params IReadOnlyDictionary[] tags) { lock (_grMu) { @@ -186,13 +192,41 @@ public sealed partial class NatsServer _grWg.Add(1); Task.Run(() => { - try { f(); } + try + { + SetGoRoutineLabels(tags); + f(); + } finally { _grWg.Done(); } }); return true; } } + /// + /// Optional test-only sink used to observe goroutine labels during unit tests. + /// + internal static Action>>? SetGoRoutineLabelsHookForTest { get; set; } + + /// + /// Sets goroutine labels for diagnostics when tags are present. + /// Mirrors Go setGoRoutineLabels. + /// + internal static void SetGoRoutineLabels(params IReadOnlyDictionary[] tags) + { + var labels = new List>(); + foreach (var tag in tags) + { + foreach (var pair in tag) + { + labels.Add(new KeyValuePair(pair.Key, pair.Value)); + } + } + + if (labels.Count > 0) + SetGoRoutineLabelsHookForTest?.Invoke(labels); + } + // ========================================================================= // Client / connection management (features 3081–3084) // ========================================================================= diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs index 6056e61..7646a1e 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.Listeners.cs @@ -15,6 +15,7 @@ // Session 10: accept loops, client creation, connect URL management, port helpers. using System.Net; +using System.Net.Security; using System.Net.Sockets; using System.Security.Cryptography; using System.Text.Json; @@ -461,6 +462,41 @@ public sealed partial class NatsServer private string BasePath(string p) => string.IsNullOrEmpty(_httpBasePath) ? p : System.IO.Path.Combine(_httpBasePath, p.TrimStart('/')); + /// + /// Returns a monitoring TLS config cloned from server TLS config, with client certs disabled. + /// Mirrors Go Server.getMonitoringTLSConfig(). + /// + internal SslServerAuthenticationOptions? GetMonitoringTLSConfig() + { + var opts = GetOpts(); + if (opts.TlsConfig == null) + return null; + + var clone = new SslServerAuthenticationOptions + { + EnabledSslProtocols = opts.TlsConfig.EnabledSslProtocols, + AllowRenegotiation = opts.TlsConfig.AllowRenegotiation, + CertificateRevocationCheckMode = opts.TlsConfig.CertificateRevocationCheckMode, + CipherSuitesPolicy = opts.TlsConfig.CipherSuitesPolicy, + ServerCertificate = opts.TlsConfig.ServerCertificate, + ClientCertificateRequired = false, + EncryptionPolicy = opts.TlsConfig.EncryptionPolicy, + CertificateChainPolicy = opts.TlsConfig.CertificateChainPolicy, + }; + return clone; + } + + /// + /// Returns the monitoring handler object while monitoring is active, otherwise null. + /// Mirrors Go Server.HTTPHandler(). + /// + public object? HTTPHandler() + { + _mu.EnterReadLock(); + try { return _httpHandler; } + finally { _mu.ExitReadLock(); } + } + /// /// Starts the HTTP/HTTPS monitoring listener. /// Stub — full monitoring handler registration deferred to session 17. @@ -504,10 +540,12 @@ public sealed partial class NatsServer _mu.EnterWriteLock(); _http = httpListener; + _httpHandler = new object(); _mu.ExitWriteLock(); // Full HTTP handler registration in session 17; for now just drain the listener. - _ = Task.Run(() => + // Use a long-running task so teardown does not depend on thread-pool availability. + _ = Task.Factory.StartNew(() => { // Accept and immediately close connections until shutdown. while (!IsShuttingDown()) @@ -524,11 +562,11 @@ public sealed partial class NatsServer } _mu.EnterWriteLock(); - // Don't null _http — ProfilerAddr etc. still read it. + _httpHandler = null; _mu.ExitWriteLock(); _done.Writer.TryWrite(default); - }); + }, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default); return null; } @@ -799,6 +837,26 @@ public sealed partial class NatsServer _ => (0, new ArgumentException($"unknown version: {ver}")), }; + /// + /// Handles TLS handshake timeout by closing the connection if auth is incomplete. + /// Mirrors Go tlsTimeout. + /// + internal static void TlsTimeout(ClientConnection c, SslStream conn) + { + lock (c) + { + if (c.IsClosed()) + return; + } + + if (!conn.IsAuthenticated) + { + c.Errorf("TLS handshake timeout"); + c.SendErr("Secure Connection - TLS Required"); + c.CloseConnection(ClosedState.TlsHandshakeError); + } + } + // ========================================================================= // Connect URL helpers (features 3074–3076) // ========================================================================= diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs index 975b2e7..ae0aee9 100644 --- a/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs +++ b/dotnet/src/ZB.MOM.NatsNet.Server/NatsServer.cs @@ -210,6 +210,7 @@ public sealed partial class NatsServer : INatsServer private string _httpBasePath = string.Empty; private readonly Dictionary _httpReqStats = []; + private object? _httpHandler; // ========================================================================= // Client connect URLs diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MonitoringHandlerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MonitoringHandlerTests.cs index 4c7e259..5fe4fe4 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MonitoringHandlerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ImplBacklog/MonitoringHandlerTests.cs @@ -1,5 +1,8 @@ using System.Security.Cryptography; using System.Security.Cryptography.X509Certificates; +using System.Net.Security; +using System.Net.Sockets; +using System.Reflection; using Shouldly; using ZB.MOM.NatsNet.Server; using ZB.MOM.NatsNet.Server.Internal; @@ -10,6 +13,99 @@ namespace ZB.MOM.NatsNet.Server.Tests.ImplBacklog; public sealed class MonitoringHandlerTests { + [Fact] + public void GetMonitoringTLSConfig_WithServerTlsConfig_DisablesClientCertificateRequirementOnClone() + { + var (certFile, keyFile, tempDir, _) = CreatePemCertificate(DateTimeOffset.UtcNow.AddMinutes(10)); + var (tlsOpts, parseErr) = ServerOptions.ParseTLS( + new Dictionary + { + ["cert_file"] = certFile, + ["key_file"] = keyFile, + ["verify"] = true, + }, + isClientCtx: false); + parseErr.ShouldBeNull(); + tlsOpts.ShouldNotBeNull(); + + var (tlsConfig, tlsErr) = ServerOptions.GenTLSConfig(tlsOpts!); + tlsErr.ShouldBeNull(); + tlsConfig.ShouldNotBeNull(); + + var opts = new ServerOptions { TlsConfig = tlsConfig }; + + var (server, error) = NatsServer.NewServer(opts); + + error.ShouldBeNull(); + server.ShouldNotBeNull(); + + var monitoringTls = server!.GetMonitoringTLSConfig(); + monitoringTls.ShouldNotBeNull(); + monitoringTls!.ClientCertificateRequired.ShouldBeFalse(); + monitoringTls.CertificateRevocationCheckMode.ShouldBe(opts.TlsConfig!.CertificateRevocationCheckMode); + opts.TlsConfig!.ClientCertificateRequired.ShouldBeTrue(); + + Directory.Delete(tempDir, recursive: true); + } + + [Fact] + public void HTTPHandler_WhenMonitoringListenerStops_TransitionsToNull() + { + var opts = new ServerOptions + { + HttpHost = "127.0.0.1", + HttpPort = -1, + }; + var (server, error) = NatsServer.NewServer(opts); + error.ShouldBeNull(); + server.ShouldNotBeNull(); + + server!.HTTPHandler().ShouldBeNull(); + + var startError = server.StartMonitoring(); + startError.ShouldBeNull(); + server.HTTPHandler().ShouldNotBeNull(); + + var listenerField = typeof(NatsServer).GetField("_http", BindingFlags.Instance | BindingFlags.NonPublic); + listenerField.ShouldNotBeNull(); + var listener = listenerField!.GetValue(server).ShouldBeOfType(); + listener.Stop(); + + var transitioned = SpinWait.SpinUntil(() => server.HTTPHandler() == null, TimeSpan.FromSeconds(5)); + transitioned.ShouldBeTrue(); + server.HTTPHandler().ShouldBeNull(); + } + + [Fact] + public async Task LogRejectedTLSConns_WhenRateCounterHasBlockedConnections_EmitsWarning() + { + var opts = new ServerOptions { TlsRateLimit = 1 }; + var (server, error) = NatsServer.NewServer(opts); + error.ShouldBeNull(); + server.ShouldNotBeNull(); + + var logger = new MonitoringCaptureLogger(); + server!.SetLogger(logger, debugFlag: false, traceFlag: false); + + var rateCounterField = typeof(NatsServer).GetField("_connRateCounter", BindingFlags.Instance | BindingFlags.NonPublic); + rateCounterField.ShouldNotBeNull(); + var rateCounter = rateCounterField!.GetValue(server).ShouldBeOfType(); + + rateCounter.Allow(); + rateCounter.Allow(); + + using var cts = new CancellationTokenSource(); + var loop = server.LogRejectedTLSConns(cts.Token, TimeSpan.FromMilliseconds(10)); + + var warned = SpinWait.SpinUntil( + () => logger.WarningEntries.Any(w => w.Contains("Rejected", StringComparison.OrdinalIgnoreCase)), + TimeSpan.FromSeconds(2)); + cts.Cancel(); + await loop; + + warned.ShouldBeTrue(); + } + [Fact] // T:2108 public void MonitorConnzClosedConnsBadTLSClient_ShouldSucceed() { @@ -228,6 +324,18 @@ public sealed class MonitoringHandlerTests return (certFile, keyFile, tempDir, notAfter); } + private sealed class MonitoringCaptureLogger : INatsLogger + { + public List WarningEntries { get; } = []; + + public void Noticef(string format, params object[] args) { } + public void Warnf(string format, params object[] args) => WarningEntries.Add(string.Format(format, args)); + public void Fatalf(string format, params object[] args) { } + public void Errorf(string format, params object[] args) { } + public void Debugf(string format, params object[] args) { } + public void Tracef(string format, params object[] args) { } + } + [Fact] // T:2065 public void MonitorNoPort_ShouldSucceed() { diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ServerTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ServerTests.cs index 649e6f1..6bcd24b 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ServerTests.cs +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/ServerTests.cs @@ -17,6 +17,8 @@ using System.Text; using System.Text.Json; using System.Text.RegularExpressions; +using System.Net.Security; +using System.Threading; using NSubstitute; using NSubstitute.ExceptionExtensions; using Shouldly; @@ -280,6 +282,27 @@ public sealed class ServerTests [Fact] public void NeedsCompression_S2Fast_ReturnsTrue() => NatsServer.NeedsCompression(CompressionMode.S2Fast).ShouldBeTrue(); + + [Theory] + [InlineData(CompressionMode.S2Uncompressed, "writer_concurrency=1", "writer_uncompressed")] + [InlineData(CompressionMode.S2Best, "writer_concurrency=1", "writer_best_compression")] + [InlineData(CompressionMode.S2Better, "writer_concurrency=1", "writer_better_compression")] + public void S2WriterOptions_KnownModes_ReturnExpectedOptions( + string mode, + string expectedFirst, + string expectedSecond) + { + var options = NatsServer.S2WriterOptions(mode); + + options.ShouldNotBeNull(); + options!.ShouldBe([expectedFirst, expectedSecond]); + } + + [Fact] + public void S2WriterOptions_UnsupportedMode_ReturnsNull() + { + NatsServer.S2WriterOptions(CompressionMode.S2Fast).ShouldBeNull(); + } } // ============================================================================= @@ -292,6 +315,57 @@ public sealed class ServerTests /// public sealed class ServerListenersTests { + [Fact] + public void TlsTimeout_IncompleteHandshake_ClosesConnection() + { + var c = new ClientConnection(ClientKind.Client, nc: new MemoryStream()); + using var tls = new SslStream(new MemoryStream(), leaveInnerStreamOpen: false); + + c.IsClosed().ShouldBeFalse(); + + NatsServer.TlsTimeout(c, tls); + + c.IsClosed().ShouldBeTrue(); + } + + [Fact] + public void StartGoRoutine_WithLabels_InvokesSetGoRoutineLabels() + { + var (s, err) = NatsServer.NewServer(new ServerOptions()); + err.ShouldBeNull(); + s.ShouldNotBeNull(); + s!.Start(); + + var signal = new ManualResetEventSlim(false); + IReadOnlyList>? observed = null; + + NatsServer.SetGoRoutineLabelsHookForTest = labels => + { + observed = labels; + signal.Set(); + }; + + try + { + var started = s.StartGoRoutine( + () => { }, + new Dictionary { ["component"] = "server", ["loop"] = "tls" }); + + started.ShouldBeTrue(); + signal.Wait(TimeSpan.FromSeconds(2)).ShouldBeTrue(); + + observed.ShouldNotBeNull(); + observed!.ShouldContain(kv => kv.Key == "component" && kv.Value == "server"); + observed.ShouldContain(kv => kv.Key == "loop" && kv.Value == "tls"); + } + finally + { + NatsServer.SetGoRoutineLabelsHookForTest = null; + s.Shutdown(); + s.WaitForShutdown(); + } + } + // ========================================================================= // GenerateInfoJson (feature 3069) — Test ID 2906 // Mirrors Go TestServerJsonMarshalNestedStructsPanic (guards against diff --git a/porting.db b/porting.db index d5700d7..028355a 100644 Binary files a/porting.db and b/porting.db differ