From a0f02d6641a6ceba463efb5b33bb06c7d9ea049a Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 00:28:32 -0500 Subject: [PATCH 01/13] chore: add Serilog.Sinks.File and SyslogMessages packages --- Directory.Packages.props | 2 ++ src/NATS.Server.Host/NATS.Server.Host.csproj | 2 ++ 2 files changed, 4 insertions(+) diff --git a/Directory.Packages.props b/Directory.Packages.props index 60cb5b9..d235438 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -8,6 +8,8 @@ + + diff --git a/src/NATS.Server.Host/NATS.Server.Host.csproj b/src/NATS.Server.Host/NATS.Server.Host.csproj index 0d9510d..fedb2dd 100644 --- a/src/NATS.Server.Host/NATS.Server.Host.csproj +++ b/src/NATS.Server.Host/NATS.Server.Host.csproj @@ -11,6 +11,8 @@ + + From 573cd06bb1fc0458ad7d2f7a478afd13e5f56507 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 00:29:45 -0500 Subject: [PATCH 02/13] feat: add logging and timestamp options to NatsOptions --- src/NATS.Server/NatsOptions.cs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/NATS.Server/NatsOptions.cs b/src/NATS.Server/NatsOptions.cs index 0aee913..f4a8c1a 100644 --- a/src/NATS.Server/NatsOptions.cs +++ b/src/NATS.Server/NatsOptions.cs @@ -47,6 +47,17 @@ public sealed class NatsOptions public string? PortsFileDir { get; set; } public string? ConfigFile { get; set; } + // Logging + public string? LogFile { get; set; } + public long LogSizeLimit { get; set; } + public int LogMaxFiles { get; set; } + public bool Debug { get; set; } + public bool Trace { get; set; } + public bool Logtime { get; set; } = true; + public bool LogtimeUTC { get; set; } + public bool Syslog { get; set; } + public string? 
RemoteSyslog { get; set; } + // Profiling (0 = disabled) public int ProfPort { get; set; } From eb25d52ed5ec000573ce8929279ea7b06503721c Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 00:34:30 -0500 Subject: [PATCH 03/13] feat: add RTT tracking and first-PING delay to NatsClient --- src/NATS.Server/Monitoring/Connz.cs | 1 + src/NATS.Server/Monitoring/ConnzHandler.cs | 3 + src/NATS.Server/NatsClient.cs | 21 ++++ tests/NATS.Server.Tests/RttTests.cs | 123 +++++++++++++++++++++ 4 files changed, 148 insertions(+) create mode 100644 tests/NATS.Server.Tests/RttTests.cs diff --git a/src/NATS.Server/Monitoring/Connz.cs b/src/NATS.Server/Monitoring/Connz.cs index d2a6f49..2147715 100644 --- a/src/NATS.Server/Monitoring/Connz.cs +++ b/src/NATS.Server/Monitoring/Connz.cs @@ -168,6 +168,7 @@ public enum SortOpt ByLast, ByIdle, ByUptime, + ByRtt, } /// diff --git a/src/NATS.Server/Monitoring/ConnzHandler.cs b/src/NATS.Server/Monitoring/ConnzHandler.cs index 8ff0a3e..f532e7c 100644 --- a/src/NATS.Server/Monitoring/ConnzHandler.cs +++ b/src/NATS.Server/Monitoring/ConnzHandler.cs @@ -30,6 +30,7 @@ public sealed class ConnzHandler(NatsServer server) SortOpt.ByLast => connInfos.OrderByDescending(c => c.LastActivity).ToList(), SortOpt.ByIdle => connInfos.OrderByDescending(c => now - c.LastActivity).ToList(), SortOpt.ByUptime => connInfos.OrderByDescending(c => now - c.Start).ToList(), + SortOpt.ByRtt => connInfos.OrderBy(c => c.Rtt).ToList(), _ => connInfos.OrderBy(c => c.Cid).ToList(), }; @@ -73,6 +74,7 @@ public sealed class ConnzHandler(NatsServer server) Reason = client.CloseReason.ToReasonString(), TlsVersion = client.TlsState?.TlsVersion ?? "", TlsCipherSuite = client.TlsState?.CipherSuite ?? "", + Rtt = client.Rtt == TimeSpan.Zero ? 
"" : $"{client.Rtt.TotalMilliseconds:F3}ms", }; if (opts.Subscriptions) @@ -116,6 +118,7 @@ public sealed class ConnzHandler(NatsServer server) "last" => SortOpt.ByLast, "idle" => SortOpt.ByIdle, "uptime" => SortOpt.ByUptime, + "rtt" => SortOpt.ByRtt, _ => SortOpt.ByCid, }; } diff --git a/src/NATS.Server/NatsClient.cs b/src/NATS.Server/NatsClient.cs index b380560..a2bb19e 100644 --- a/src/NATS.Server/NatsClient.cs +++ b/src/NATS.Server/NatsClient.cs @@ -73,6 +73,11 @@ public sealed class NatsClient : IDisposable private int _pingsOut; private long _lastIn; + // RTT tracking + private long _rttStartTicks; + private long _rtt; + public TimeSpan Rtt => new(Interlocked.Read(ref _rtt)); + public TlsConnectionState? TlsState { get; set; } public bool InfoAlreadySent { get; set; } @@ -321,6 +326,14 @@ public sealed class NatsClient : IDisposable case CommandType.Pong: Interlocked.Exchange(ref _pingsOut, 0); + var rttStart = Interlocked.Read(ref _rttStartTicks); + if (rttStart > 0) + { + var elapsed = DateTime.UtcNow.Ticks - rttStart; + if (elapsed <= 0) elapsed = 1; // min 1 tick for Windows granularity + Interlocked.Exchange(ref _rtt, elapsed); + } + _flags.SetFlag(ClientFlags.FirstPongSent); break; case CommandType.Sub: @@ -611,6 +624,13 @@ public sealed class NatsClient : IDisposable { while (await timer.WaitForNextTickAsync(ct)) { + // Delay first PING until client has responded with PONG or 2 seconds elapsed + if (!_flags.HasFlag(ClientFlags.FirstPongSent) + && (DateTime.UtcNow - StartTime).TotalSeconds < 2) + { + continue; + } + var elapsed = Environment.TickCount64 - Interlocked.Read(ref _lastIn); if (elapsed < (long)_options.PingInterval.TotalMilliseconds) { @@ -629,6 +649,7 @@ public sealed class NatsClient : IDisposable var currentPingsOut = Interlocked.Increment(ref _pingsOut); _logger.LogDebug("Client {ClientId} sending PING ({PingsOut}/{MaxPingsOut})", Id, currentPingsOut, _options.MaxPingsOut); + Interlocked.Exchange(ref _rttStartTicks, 
DateTime.UtcNow.Ticks); WriteProtocol(NatsProtocol.PingBytes); } } diff --git a/tests/NATS.Server.Tests/RttTests.cs b/tests/NATS.Server.Tests/RttTests.cs new file mode 100644 index 0000000..9632cf5 --- /dev/null +++ b/tests/NATS.Server.Tests/RttTests.cs @@ -0,0 +1,123 @@ +using System.Net; +using System.Net.Http.Json; +using System.Net.Sockets; +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Server.Monitoring; + +namespace NATS.Server.Tests; + +public class RttTests : IAsyncLifetime +{ + private readonly NatsServer _server; + private readonly int _natsPort; + private readonly int _monitorPort; + private readonly CancellationTokenSource _cts = new(); + private readonly HttpClient _http = new(); + + public RttTests() + { + _natsPort = GetFreePort(); + _monitorPort = GetFreePort(); + _server = new NatsServer( + new NatsOptions + { + Port = _natsPort, + MonitorPort = _monitorPort, + PingInterval = TimeSpan.FromMilliseconds(200), + MaxPingsOut = 4, + }, + NullLoggerFactory.Instance); + } + + public async Task InitializeAsync() + { + _ = _server.StartAsync(_cts.Token); + await _server.WaitForReadyAsync(); + for (int i = 0; i < 50; i++) + { + try + { + var resp = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/healthz"); + if (resp.IsSuccessStatusCode) break; + } + catch (HttpRequestException) { } + await Task.Delay(50); + } + } + + public async Task DisposeAsync() + { + _http.Dispose(); + await _cts.CancelAsync(); + _server.Dispose(); + } + + [Fact] + public async Task Rtt_populated_after_ping_pong_cycle() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort)); + using var stream = new NetworkStream(sock); + var buf = new byte[4096]; + _ = await stream.ReadAsync(buf); // INFO + + // Send CONNECT + PING (triggers firstPongSent) + await stream.WriteAsync("CONNECT {}\r\nPING\r\n"u8.ToArray()); + await stream.FlushAsync(); + _ = await 
stream.ReadAsync(buf); // PONG + + // Wait for server's PING cycle + await Task.Delay(500); + + // Read server PING and respond with PONG + var received = new byte[4096]; + int totalRead = 0; + bool gotPing = false; + using var readCts = new CancellationTokenSource(2000); + while (!gotPing && !readCts.IsCancellationRequested) + { + var n = await stream.ReadAsync(received.AsMemory(totalRead), readCts.Token); + totalRead += n; + var text = System.Text.Encoding.ASCII.GetString(received, 0, totalRead); + if (text.Contains("PING")) + { + gotPing = true; + await stream.WriteAsync("PONG\r\n"u8.ToArray()); + await stream.FlushAsync(); + } + } + + gotPing.ShouldBeTrue("Server should have sent PING"); + + // Wait for RTT to be computed + await Task.Delay(200); + + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/connz"); + var connz = await response.Content.ReadFromJsonAsync(); + connz.ShouldNotBeNull(); + var conn = connz.Conns.FirstOrDefault(c => c.Rtt != ""); + conn.ShouldNotBeNull("At least one connection should have RTT populated"); + } + + [Fact] + public async Task Connz_sort_by_rtt() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort)); + using var stream = new NetworkStream(sock); + var buf = new byte[4096]; + _ = await stream.ReadAsync(buf); + await stream.WriteAsync("CONNECT {}\r\n"u8.ToArray()); + await Task.Delay(200); + + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/connz?sort=rtt"); + response.StatusCode.ShouldBe(System.Net.HttpStatusCode.OK); + } + + private static int GetFreePort() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + sock.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + return ((IPEndPoint)sock.LocalEndPoint!).Port; + } +} From cd4ae3cce6b2ea6954b57c041a6b604266435a04 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 
2026 00:38:43 -0500 Subject: [PATCH 04/13] feat: add stale connection stats tracking and varz exposure --- src/NATS.Server/Monitoring/Varz.cs | 25 +++++++++++++++++++++ src/NATS.Server/Monitoring/VarzHandler.cs | 8 +++++++ src/NATS.Server/NatsClient.cs | 6 +++++ src/NATS.Server/ServerStats.cs | 4 ++++ tests/NATS.Server.Tests/ServerStatsTests.cs | 21 +++++++++++++++++ 5 files changed, 64 insertions(+) diff --git a/src/NATS.Server/Monitoring/Varz.cs b/src/NATS.Server/Monitoring/Varz.cs index 847bdc2..3e85374 100644 --- a/src/NATS.Server/Monitoring/Varz.cs +++ b/src/NATS.Server/Monitoring/Varz.cs @@ -157,6 +157,12 @@ public sealed class Varz [JsonPropertyName("slow_consumer_stats")] public SlowConsumersStats SlowConsumerStats { get; set; } = new(); + [JsonPropertyName("stale_connections")] + public long StaleConnections { get; set; } + + [JsonPropertyName("stale_connection_stats")] + public StaleConnectionStats StaleConnectionStatsDetail { get; set; } = new(); + [JsonPropertyName("subscriptions")] public uint Subscriptions { get; set; } @@ -219,6 +225,25 @@ public sealed class SlowConsumersStats public ulong Leafs { get; set; } } +/// +/// Statistics about stale connections by connection type. +/// Corresponds to Go server/monitor.go StaleConnectionStats struct. +/// +public sealed class StaleConnectionStats +{ + [JsonPropertyName("clients")] + public ulong Clients { get; set; } + + [JsonPropertyName("routes")] + public ulong Routes { get; set; } + + [JsonPropertyName("gateways")] + public ulong Gateways { get; set; } + + [JsonPropertyName("leafs")] + public ulong Leafs { get; set; } +} + /// /// Cluster configuration monitoring information. /// Corresponds to Go server/monitor.go ClusterOptsVarz struct. 
diff --git a/src/NATS.Server/Monitoring/VarzHandler.cs b/src/NATS.Server/Monitoring/VarzHandler.cs index c872b28..433aa84 100644 --- a/src/NATS.Server/Monitoring/VarzHandler.cs +++ b/src/NATS.Server/Monitoring/VarzHandler.cs @@ -91,6 +91,14 @@ public sealed class VarzHandler : IDisposable Gateways = (ulong)Interlocked.Read(ref stats.SlowConsumerGateways), Leafs = (ulong)Interlocked.Read(ref stats.SlowConsumerLeafs), }, + StaleConnections = Interlocked.Read(ref stats.StaleConnections), + StaleConnectionStatsDetail = new StaleConnectionStats + { + Clients = (ulong)Interlocked.Read(ref stats.StaleConnectionClients), + Routes = (ulong)Interlocked.Read(ref stats.StaleConnectionRoutes), + Gateways = (ulong)Interlocked.Read(ref stats.StaleConnectionGateways), + Leafs = (ulong)Interlocked.Read(ref stats.StaleConnectionLeafs), + }, Subscriptions = _server.SubList.Count, ConfigLoadTime = _server.StartTime, HttpReqStats = stats.HttpReqStats.ToDictionary(kv => kv.Key, kv => (ulong)kv.Value), diff --git a/src/NATS.Server/NatsClient.cs b/src/NATS.Server/NatsClient.cs index a2bb19e..b568536 100644 --- a/src/NATS.Server/NatsClient.cs +++ b/src/NATS.Server/NatsClient.cs @@ -682,6 +682,12 @@ public sealed class NatsClient : IDisposable break; } + if (reason == ClientClosedReason.StaleConnection) + { + Interlocked.Increment(ref _serverStats.StaleConnections); + Interlocked.Increment(ref _serverStats.StaleConnectionClients); + } + _logger.LogDebug("Client {ClientId} connection closed: {CloseReason}", Id, reason); } diff --git a/src/NATS.Server/ServerStats.cs b/src/NATS.Server/ServerStats.cs index b737dee..21ebd7d 100644 --- a/src/NATS.Server/ServerStats.cs +++ b/src/NATS.Server/ServerStats.cs @@ -16,5 +16,9 @@ public sealed class ServerStats public long SlowConsumerRoutes; public long SlowConsumerLeafs; public long SlowConsumerGateways; + public long StaleConnectionClients; + public long StaleConnectionRoutes; + public long StaleConnectionLeafs; + public long StaleConnectionGateways; 
public readonly ConcurrentDictionary HttpReqStats = new(); } diff --git a/tests/NATS.Server.Tests/ServerStatsTests.cs b/tests/NATS.Server.Tests/ServerStatsTests.cs index 6b45bd3..7baf061 100644 --- a/tests/NATS.Server.Tests/ServerStatsTests.cs +++ b/tests/NATS.Server.Tests/ServerStatsTests.cs @@ -75,6 +75,27 @@ public class ServerStatsTests : IAsyncLifetime client.StartTime.ShouldNotBe(default); } + [Fact] + public void StaleConnection_stats_incremented_on_mark_closed() + { + var stats = new ServerStats(); + stats.StaleConnectionClients.ShouldBe(0); + + Interlocked.Increment(ref stats.StaleConnectionClients); + stats.StaleConnectionClients.ShouldBe(1); + } + + [Fact] + public void StaleConnection_stats_all_fields_default_to_zero() + { + var stats = new ServerStats(); + stats.StaleConnections.ShouldBe(0); + stats.StaleConnectionClients.ShouldBe(0); + stats.StaleConnectionRoutes.ShouldBe(0); + stats.StaleConnectionLeafs.ShouldBe(0); + stats.StaleConnectionGateways.ShouldBe(0); + } + private static int GetFreePort() { using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); From b7c0e321d9acc970095c8a87745b8a872a69b750 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 00:41:12 -0500 Subject: [PATCH 05/13] fix: move stale connection stat increments to detection site in RunPingTimerAsync --- src/NATS.Server/NatsClient.cs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/NATS.Server/NatsClient.cs b/src/NATS.Server/NatsClient.cs index b568536..daace96 100644 --- a/src/NATS.Server/NatsClient.cs +++ b/src/NATS.Server/NatsClient.cs @@ -642,6 +642,8 @@ public sealed class NatsClient : IDisposable if (Volatile.Read(ref _pingsOut) + 1 > _options.MaxPingsOut) { _logger.LogDebug("Client {ClientId} stale connection -- closing", Id); + Interlocked.Increment(ref _serverStats.StaleConnections); + Interlocked.Increment(ref _serverStats.StaleConnectionClients); await 
SendErrAndCloseAsync(NatsProtocol.ErrStaleConnection, ClientClosedReason.StaleConnection); return; } @@ -682,12 +684,6 @@ public sealed class NatsClient : IDisposable break; } - if (reason == ClientClosedReason.StaleConnection) - { - Interlocked.Increment(ref _serverStats.StaleConnections); - Interlocked.Increment(ref _serverStats.StaleConnectionClients); - } - _logger.LogDebug("Client {ClientId} connection closed: {CloseReason}", Id, reason); } From cf75077bc41ac3f6112edf1c9182913c7a67d61b Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 00:43:27 -0500 Subject: [PATCH 06/13] feat: add CLI flags for debug/trace modes, file logging, syslog, color, timestamps --- src/NATS.Server.Host/Program.cs | 89 ++++++++++++++++++++++++++++++--- 1 file changed, 82 insertions(+), 7 deletions(-) diff --git a/src/NATS.Server.Host/Program.cs b/src/NATS.Server.Host/Program.cs index 4dde566..f0d614c 100644 --- a/src/NATS.Server.Host/Program.cs +++ b/src/NATS.Server.Host/Program.cs @@ -1,15 +1,10 @@ using NATS.Server; using Serilog; - -Log.Logger = new LoggerConfiguration() - .MinimumLevel.Debug() - .Enrich.FromLogContext() - .WriteTo.Console(outputTemplate: "[{Timestamp:HH:mm:ss} {Level:u3}] {Message:lj}{NewLine}{Exception}") - .CreateLogger(); +using Serilog.Sinks.SystemConsole.Themes; var options = new NatsOptions(); -// Simple CLI argument parsing +// Parse ALL CLI flags into NatsOptions first for (int i = 0; i < args.Length; i++) { switch (args[i]) @@ -55,9 +50,89 @@ for (int i = 0; i < args.Length; i++) case "--tlsverify": options.TlsVerify = true; break; + case "-D": + options.Debug = true; + break; + case "-V" or "-T": + options.Trace = true; + break; + case "-DV": + options.Debug = true; + options.Trace = true; + break; + case "--log_file" when i + 1 < args.Length: + options.LogFile = args[++i]; + break; + case "--log_size_limit" when i + 1 < args.Length: + options.LogSizeLimit = long.Parse(args[++i]); + break; + case "--log_max_files" when i + 1 < 
args.Length: + options.LogMaxFiles = int.Parse(args[++i]); + break; + case "--logtime" when i + 1 < args.Length: + options.Logtime = bool.Parse(args[++i]); + break; + case "--logtime_utc": + options.LogtimeUTC = true; + break; + case "--syslog": + options.Syslog = true; + break; + case "--remote_syslog" when i + 1 < args.Length: + options.RemoteSyslog = args[++i]; + break; } } +// Build Serilog configuration from options +var logConfig = new LoggerConfiguration() + .Enrich.FromLogContext(); + +// Set minimum level based on flags +if (options.Trace) + logConfig.MinimumLevel.Verbose(); +else if (options.Debug) + logConfig.MinimumLevel.Debug(); +else + logConfig.MinimumLevel.Information(); + +// Build output template +var timestampFormat = options.LogtimeUTC + ? "{Timestamp:yyyy/MM/dd HH:mm:ss.ffffff} " + : "{Timestamp:HH:mm:ss} "; +var template = options.Logtime + ? $"[{timestampFormat}{{Level:u3}}] {{Message:lj}}{{NewLine}}{{Exception}}" + : "[{Level:u3}] {Message:lj}{NewLine}{Exception}"; + +// Console sink with color auto-detection +if (!Console.IsOutputRedirected) + logConfig.WriteTo.Console(outputTemplate: template, theme: AnsiConsoleTheme.Code); +else + logConfig.WriteTo.Console(outputTemplate: template); + +// File sink with rotation +if (!string.IsNullOrEmpty(options.LogFile)) +{ + logConfig.WriteTo.File( + options.LogFile, + fileSizeLimitBytes: options.LogSizeLimit > 0 ? options.LogSizeLimit : null, + retainedFileCountLimit: options.LogMaxFiles > 0 ? 
options.LogMaxFiles : null, + rollOnFileSizeLimit: options.LogSizeLimit > 0, + outputTemplate: template); +} + +// Syslog sink +if (!string.IsNullOrEmpty(options.RemoteSyslog)) +{ + logConfig.WriteTo.UdpSyslog(options.RemoteSyslog); +} +else if (options.Syslog) +{ + logConfig.WriteTo.LocalSyslog("nats-server"); +} + +Log.Logger = logConfig.CreateLogger(); + using var loggerFactory = new Serilog.Extensions.Logging.SerilogLoggerFactory(Log.Logger); using var server = new NatsServer(options, loggerFactory); From 345e7ca15cbef87ed61537b92032f93743ba8ad7 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 00:46:09 -0500 Subject: [PATCH 07/13] feat: implement log reopening on SIGUSR1 signal --- src/NATS.Server.Host/Program.cs | 8 ++++++++ src/NATS.Server/NatsServer.cs | 6 ++++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/NATS.Server.Host/Program.cs b/src/NATS.Server.Host/Program.cs index f0d614c..3febf15 100644 --- a/src/NATS.Server.Host/Program.cs +++ b/src/NATS.Server.Host/Program.cs @@ -139,6 +139,14 @@ using var server = new NatsServer(options, loggerFactory); // Register signal handlers server.HandleSignals(); +server.ReOpenLogFile = () => +{ + Log.Information("Reopening log file"); + Log.CloseAndFlush(); + Log.Logger = logConfig.CreateLogger(); + Log.Information("File log re-opened"); +}; + // Ctrl+C triggers graceful shutdown Console.CancelKeyPress += (_, e) => { diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 7bd3e76..c80fa74 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -61,6 +61,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable public string ServerNKey { get; } public bool IsShuttingDown => Volatile.Read(ref _shutdown) != 0; public bool IsLameDuckMode => Volatile.Read(ref _lameDuck) != 0; + public Action? 
ReOpenLogFile { get; set; } public IEnumerable GetClients() => _clients.Values; public Task WaitForReadyAsync() => _listeningStarted.Task; @@ -192,7 +193,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable /// /// Registers Unix signal handlers. - /// SIGTERM → shutdown, SIGUSR2 → lame duck, SIGUSR1 → log reopen (stub), SIGHUP → reload (stub). + /// SIGTERM → shutdown, SIGUSR2 → lame duck, SIGUSR1 → log reopen, SIGHUP → reload (stub). /// public void HandleSignals() { @@ -222,7 +223,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _signalRegistrations.Add(PosixSignalRegistration.Create((PosixSignal)10, ctx => { ctx.Cancel = true; - _logger.LogWarning("Trapped SIGUSR1 signal — log reopen not yet supported"); + _logger.LogInformation("Trapped SIGUSR1 signal — reopening log file"); + ReOpenLogFile?.Invoke(); })); _signalRegistrations.Add(PosixSignalRegistration.Create((PosixSignal)12, ctx => From 1269ae82752a6410323088a164db4cd2d17cd776 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 00:50:26 -0500 Subject: [PATCH 08/13] feat: implement /subz endpoint with account filter, test subject, and pagination --- src/NATS.Server/Monitoring/MonitorServer.cs | 10 +- src/NATS.Server/Monitoring/Subsz.cs | 45 +++++++ src/NATS.Server/Monitoring/SubszHandler.cs | 93 +++++++++++++ src/NATS.Server/NatsServer.cs | 2 + src/NATS.Server/Subscriptions/SubList.cs | 56 ++++++++ tests/NATS.Server.Tests/SubszTests.cs | 137 ++++++++++++++++++++ 6 files changed, 339 insertions(+), 4 deletions(-) create mode 100644 src/NATS.Server/Monitoring/Subsz.cs create mode 100644 src/NATS.Server/Monitoring/SubszHandler.cs create mode 100644 tests/NATS.Server.Tests/SubszTests.cs diff --git a/src/NATS.Server/Monitoring/MonitorServer.cs b/src/NATS.Server/Monitoring/MonitorServer.cs index af23506..ad06f91 100644 --- a/src/NATS.Server/Monitoring/MonitorServer.cs +++ b/src/NATS.Server/Monitoring/MonitorServer.cs @@ -15,6 +15,7 @@ 
public sealed class MonitorServer : IAsyncDisposable private readonly ILogger _logger; private readonly VarzHandler _varzHandler; private readonly ConnzHandler _connzHandler; + private readonly SubszHandler _subszHandler; public MonitorServer(NatsServer server, NatsOptions options, ServerStats stats, ILoggerFactory loggerFactory) { @@ -29,6 +30,7 @@ public sealed class MonitorServer : IAsyncDisposable _varzHandler = new VarzHandler(server, options); _connzHandler = new ConnzHandler(server); + _subszHandler = new SubszHandler(server); _app.MapGet(basePath + "/", () => { @@ -75,15 +77,15 @@ public sealed class MonitorServer : IAsyncDisposable stats.HttpReqStats.AddOrUpdate("/leafz", 1, (_, v) => v + 1); return Results.Ok(new { }); }); - _app.MapGet(basePath + "/subz", () => + _app.MapGet(basePath + "/subz", (HttpContext ctx) => { stats.HttpReqStats.AddOrUpdate("/subz", 1, (_, v) => v + 1); - return Results.Ok(new { }); + return Results.Ok(_subszHandler.HandleSubsz(ctx)); }); - _app.MapGet(basePath + "/subscriptionsz", () => + _app.MapGet(basePath + "/subscriptionsz", (HttpContext ctx) => { stats.HttpReqStats.AddOrUpdate("/subscriptionsz", 1, (_, v) => v + 1); - return Results.Ok(new { }); + return Results.Ok(_subszHandler.HandleSubsz(ctx)); }); _app.MapGet(basePath + "/accountz", () => { diff --git a/src/NATS.Server/Monitoring/Subsz.cs b/src/NATS.Server/Monitoring/Subsz.cs new file mode 100644 index 0000000..472b728 --- /dev/null +++ b/src/NATS.Server/Monitoring/Subsz.cs @@ -0,0 +1,45 @@ +using System.Text.Json.Serialization; + +namespace NATS.Server.Monitoring; + +/// +/// Subscription information response. Corresponds to Go server/monitor.go Subsz struct. 
+/// +public sealed class Subsz +{ + [JsonPropertyName("server_id")] + public string Id { get; set; } = ""; + + [JsonPropertyName("now")] + public DateTime Now { get; set; } + + [JsonPropertyName("num_subscriptions")] + public uint NumSubs { get; set; } + + [JsonPropertyName("num_cache")] + public int NumCache { get; set; } + + [JsonPropertyName("total")] + public int Total { get; set; } + + [JsonPropertyName("offset")] + public int Offset { get; set; } + + [JsonPropertyName("limit")] + public int Limit { get; set; } + + [JsonPropertyName("subscriptions")] + public SubDetail[] Subs { get; set; } = []; +} + +/// +/// Options passed to Subsz() for filtering. +/// +public sealed class SubszOptions +{ + public int Offset { get; set; } + public int Limit { get; set; } = 1024; + public bool Subscriptions { get; set; } + public string Account { get; set; } = ""; + public string Test { get; set; } = ""; +} diff --git a/src/NATS.Server/Monitoring/SubszHandler.cs b/src/NATS.Server/Monitoring/SubszHandler.cs new file mode 100644 index 0000000..1de4f97 --- /dev/null +++ b/src/NATS.Server/Monitoring/SubszHandler.cs @@ -0,0 +1,93 @@ +using Microsoft.AspNetCore.Http; +using NATS.Server.Subscriptions; + +namespace NATS.Server.Monitoring; + +/// +/// Handles /subz endpoint requests, returning subscription information. +/// Corresponds to Go server/monitor.go handleSubsz. 
+/// +public sealed class SubszHandler(NatsServer server) +{ + public Subsz HandleSubsz(HttpContext ctx) + { + var opts = ParseQueryParams(ctx); + var now = DateTime.UtcNow; + + // Collect subscriptions from all accounts (or filtered) + var allSubs = new List(); + foreach (var account in server.GetAccounts()) + { + if (!string.IsNullOrEmpty(opts.Account) && account.Name != opts.Account) + continue; + allSubs.AddRange(account.SubList.GetAllSubscriptions()); + } + + // Filter by test subject if provided + if (!string.IsNullOrEmpty(opts.Test)) + { + allSubs = allSubs.Where(s => SubjectMatch.MatchLiteral(opts.Test, s.Subject)).ToList(); + } + + var total = allSubs.Count; + var numSubs = server.GetAccounts() + .Where(a => string.IsNullOrEmpty(opts.Account) || a.Name == opts.Account) + .Aggregate(0u, (sum, a) => sum + a.SubList.Count); + var numCache = server.GetAccounts() + .Where(a => string.IsNullOrEmpty(opts.Account) || a.Name == opts.Account) + .Sum(a => a.SubList.CacheCount); + + SubDetail[] details = []; + if (opts.Subscriptions) + { + details = allSubs + .Skip(opts.Offset) + .Take(opts.Limit) + .Select(s => new SubDetail + { + Subject = s.Subject, + Queue = s.Queue ?? "", + Sid = s.Sid, + Msgs = Interlocked.Read(ref s.MessageCount), + Max = s.MaxMessages, + Cid = s.Client?.Id ?? 
0, + }) + .ToArray(); + } + + return new Subsz + { + Id = server.ServerId, + Now = now, + NumSubs = numSubs, + NumCache = numCache, + Total = total, + Offset = opts.Offset, + Limit = opts.Limit, + Subs = details, + }; + } + + private static SubszOptions ParseQueryParams(HttpContext ctx) + { + var q = ctx.Request.Query; + var opts = new SubszOptions(); + + if (q.TryGetValue("subs", out var subs)) + opts.Subscriptions = subs == "true" || subs == "1" || subs == "detail"; + + if (q.TryGetValue("offset", out var offset) && int.TryParse(offset, out var o)) + opts.Offset = o; + + if (q.TryGetValue("limit", out var limit) && int.TryParse(limit, out var l)) + opts.Limit = l; + + if (q.TryGetValue("acc", out var acc)) + opts.Account = acc.ToString(); + + if (q.TryGetValue("test", out var test)) + opts.Test = test.ToString(); + + return opts; + } +} diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index c80fa74..2eff8de 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -64,6 +64,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable public Action? ReOpenLogFile { get; set; } public IEnumerable GetClients() => _clients.Values; + public IEnumerable GetAccounts() => _accounts.Values; + public Task WaitForReadyAsync() => _listeningStarted.Task; public void WaitForShutdown() => _shutdownComplete.Task.GetAwaiter().GetResult(); diff --git a/src/NATS.Server/Subscriptions/SubList.cs b/src/NATS.Server/Subscriptions/SubList.cs index 047bef2..b999403 100644 --- a/src/NATS.Server/Subscriptions/SubList.cs +++ b/src/NATS.Server/Subscriptions/SubList.cs @@ -33,6 +33,62 @@ public sealed class SubList : IDisposable } } + /// + /// Returns all subscriptions in the trie. For monitoring only. 
+ /// + public List GetAllSubscriptions() + { + _lock.EnterReadLock(); + try + { + var result = new List(); + CollectAll(_root, result); + return result; + } + finally + { + _lock.ExitReadLock(); + } + } + + private static void CollectAll(TrieLevel level, List result) + { + foreach (var (_, node) in level.Nodes) + { + foreach (var sub in node.PlainSubs) result.Add(sub); + foreach (var (_, qset) in node.QueueSubs) + foreach (var sub in qset) result.Add(sub); + if (node.Next != null) CollectAll(node.Next, result); + } + if (level.Pwc != null) + { + foreach (var sub in level.Pwc.PlainSubs) result.Add(sub); + foreach (var (_, qset) in level.Pwc.QueueSubs) + foreach (var sub in qset) result.Add(sub); + if (level.Pwc.Next != null) CollectAll(level.Pwc.Next, result); + } + if (level.Fwc != null) + { + foreach (var sub in level.Fwc.PlainSubs) result.Add(sub); + foreach (var (_, qset) in level.Fwc.QueueSubs) + foreach (var sub in qset) result.Add(sub); + if (level.Fwc.Next != null) CollectAll(level.Fwc.Next, result); + } + } + + /// + /// Returns the current number of entries in the cache. + /// + public int CacheCount + { + get + { + _lock.EnterReadLock(); + try { return _cache?.Count ?? 
0; } + finally { _lock.ExitReadLock(); } + } + } + public void Insert(Subscription sub) { var subject = sub.Subject; diff --git a/tests/NATS.Server.Tests/SubszTests.cs b/tests/NATS.Server.Tests/SubszTests.cs new file mode 100644 index 0000000..b049181 --- /dev/null +++ b/tests/NATS.Server.Tests/SubszTests.cs @@ -0,0 +1,137 @@ +using System.Net; +using System.Net.Http.Json; +using System.Net.Sockets; +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Server.Monitoring; + +namespace NATS.Server.Tests; + +public class SubszTests : IAsyncLifetime +{ + private readonly NatsServer _server; + private readonly int _natsPort; + private readonly int _monitorPort; + private readonly CancellationTokenSource _cts = new(); + private readonly HttpClient _http = new(); + + public SubszTests() + { + _natsPort = GetFreePort(); + _monitorPort = GetFreePort(); + _server = new NatsServer( + new NatsOptions { Port = _natsPort, MonitorPort = _monitorPort }, + NullLoggerFactory.Instance); + } + + public async Task InitializeAsync() + { + _ = _server.StartAsync(_cts.Token); + await _server.WaitForReadyAsync(); + for (int i = 0; i < 50; i++) + { + try + { + var resp = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/healthz"); + if (resp.IsSuccessStatusCode) break; + } + catch (HttpRequestException) { } + await Task.Delay(50); + } + } + + public async Task DisposeAsync() + { + _http.Dispose(); + await _cts.CancelAsync(); + _server.Dispose(); + } + + [Fact] + public async Task Subz_returns_empty_when_no_subscriptions() + { + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/subz"); + response.StatusCode.ShouldBe(HttpStatusCode.OK); + + var subz = await response.Content.ReadFromJsonAsync(); + subz.ShouldNotBeNull(); + subz.NumSubs.ShouldBe(0u); + } + + [Fact] + public async Task Subz_returns_count_with_subscriptions() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(new 
IPEndPoint(IPAddress.Loopback, _natsPort)); + using var stream = new NetworkStream(sock); + var buf = new byte[4096]; + _ = await stream.ReadAsync(buf); + await stream.WriteAsync("CONNECT {}\r\nSUB foo 1\r\nSUB bar 2\r\nSUB baz.* 3\r\n"u8.ToArray()); + await Task.Delay(200); + + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/subz"); + var subz = await response.Content.ReadFromJsonAsync(); + subz.ShouldNotBeNull(); + subz.NumSubs.ShouldBeGreaterThanOrEqualTo(3u); + } + + [Fact] + public async Task Subz_subs_true_returns_subscription_details() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort)); + using var stream = new NetworkStream(sock); + var buf = new byte[4096]; + _ = await stream.ReadAsync(buf); + await stream.WriteAsync("CONNECT {}\r\nSUB foo 1\r\n"u8.ToArray()); + await Task.Delay(200); + + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/subz?subs=true"); + var subz = await response.Content.ReadFromJsonAsync(); + subz.ShouldNotBeNull(); + subz.Subs.ShouldNotBeEmpty(); + subz.Subs.ShouldContain(s => s.Subject == "foo"); + } + + [Fact] + public async Task Subz_test_subject_filters_matching_subs() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort)); + using var stream = new NetworkStream(sock); + var buf = new byte[4096]; + _ = await stream.ReadAsync(buf); + await stream.WriteAsync("CONNECT {}\r\nSUB foo.* 1\r\nSUB bar 2\r\n"u8.ToArray()); + await Task.Delay(200); + + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/subz?subs=true&test=foo.hello"); + var subz = await response.Content.ReadFromJsonAsync(); + subz.ShouldNotBeNull(); + subz.Subs.ShouldContain(s => s.Subject == "foo.*"); + subz.Subs.ShouldNotContain(s => s.Subject == "bar"); + } + + [Fact] + 
public async Task Subz_pagination() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort)); + using var stream = new NetworkStream(sock); + var buf = new byte[4096]; + _ = await stream.ReadAsync(buf); + await stream.WriteAsync("CONNECT {}\r\nSUB a 1\r\nSUB b 2\r\nSUB c 3\r\n"u8.ToArray()); + await Task.Delay(200); + + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/subz?subs=true&offset=0&limit=2"); + var subz = await response.Content.ReadFromJsonAsync(); + subz.ShouldNotBeNull(); + subz.Subs.Length.ShouldBe(2); + subz.Total.ShouldBeGreaterThanOrEqualTo(3); + } + + private static int GetFreePort() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + sock.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + return ((IPEndPoint)sock.LocalEndPoint!).Port; + } +} From 1f132694479f50c0cebe355702c1aa1269b94afe Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 00:55:29 -0500 Subject: [PATCH 09/13] feat: implement TLS cert-to-user mapping via X500 DN matching --- src/NATS.Server/Auth/AuthService.cs | 7 + src/NATS.Server/Auth/IAuthenticator.cs | 2 + src/NATS.Server/Auth/TlsMapAuthenticator.cs | 67 +++++++++ src/NATS.Server/NatsClient.cs | 1 + .../TlsMapAuthenticatorTests.cs | 134 ++++++++++++++++++ 5 files changed, 211 insertions(+) create mode 100644 src/NATS.Server/Auth/TlsMapAuthenticator.cs create mode 100644 tests/NATS.Server.Tests/TlsMapAuthenticatorTests.cs diff --git a/src/NATS.Server/Auth/AuthService.cs b/src/NATS.Server/Auth/AuthService.cs index ba348d5..c17b8aa 100644 --- a/src/NATS.Server/Auth/AuthService.cs +++ b/src/NATS.Server/Auth/AuthService.cs @@ -34,6 +34,13 @@ public sealed class AuthService var nonceRequired = false; Dictionary? 
usersMap = null; + // TLS certificate mapping (highest priority when enabled) + if (options.TlsMap && options.TlsVerify && options.Users is { Count: > 0 }) + { + authenticators.Add(new TlsMapAuthenticator(options.Users)); + authRequired = true; + } + // Priority order (matching Go): NKeys > Users > Token > SimpleUserPassword if (options.NKeys is { Count: > 0 }) diff --git a/src/NATS.Server/Auth/IAuthenticator.cs b/src/NATS.Server/Auth/IAuthenticator.cs index 32a5788..3783c88 100644 --- a/src/NATS.Server/Auth/IAuthenticator.cs +++ b/src/NATS.Server/Auth/IAuthenticator.cs @@ -1,3 +1,4 @@ +using System.Security.Cryptography.X509Certificates; using NATS.Server.Protocol; namespace NATS.Server.Auth; @@ -11,4 +12,5 @@ public sealed class ClientAuthContext { public required ClientOptions Opts { get; init; } public required byte[] Nonce { get; init; } + public X509Certificate2? ClientCertificate { get; init; } } diff --git a/src/NATS.Server/Auth/TlsMapAuthenticator.cs b/src/NATS.Server/Auth/TlsMapAuthenticator.cs new file mode 100644 index 0000000..b213e94 --- /dev/null +++ b/src/NATS.Server/Auth/TlsMapAuthenticator.cs @@ -0,0 +1,67 @@ +using System.Security.Cryptography.X509Certificates; + +namespace NATS.Server.Auth; + +/// +/// Authenticates clients by mapping TLS certificate subject DN to configured users. +/// Corresponds to Go server/auth.go checkClientTLSCertSubject. +/// +public sealed class TlsMapAuthenticator : IAuthenticator +{ + private readonly Dictionary _usersByDn; + private readonly Dictionary _usersByCn; + + public TlsMapAuthenticator(IReadOnlyList users) + { + _usersByDn = new Dictionary(StringComparer.OrdinalIgnoreCase); + _usersByCn = new Dictionary(StringComparer.OrdinalIgnoreCase); + foreach (var user in users) + { + _usersByDn[user.Username] = user; + _usersByCn[user.Username] = user; + } + } + + public AuthResult? 
Authenticate(ClientAuthContext context) + { + var cert = context.ClientCertificate; + if (cert == null) + return null; + + var dn = cert.SubjectName; + var dnString = dn.Name; // comma-delimited display form, e.g. "CN=alice, O=TestOrg" + + // Try exact DN match first + if (_usersByDn.TryGetValue(dnString, out var user)) + return BuildResult(user); + + // Try CN extraction + var cn = ExtractCn(dn); + if (cn != null && _usersByCn.TryGetValue(cn, out user)) + return BuildResult(user); + + return null; + } + + // Returns the value of the first single-valued CN (OID 2.5.4.3) RDN of the subject, or null when there is none. + // Walks the decoded RDNs instead of splitting dn.Name on ',' so that a CN whose value contains a + // quoted or escaped comma (for example CN="Doe, John") is returned intact rather than truncated. + private static string? ExtractCn(X500DistinguishedName dn) + { + foreach (var rdn in dn.EnumerateRelativeDistinguishedNames()) + { + // A multi-valued RDN (e.g. CN=a+OU=b) has no single type/value; GetSingleElementType would throw, so skip it. + if (rdn.HasMultipleElements) + continue; + + if (rdn.GetSingleElementType().Value == "2.5.4.3") + return rdn.GetSingleElementValue(); + } + return null; + } + + private static AuthResult BuildResult(User user) + { + return new AuthResult + { + Identity = user.Username, + AccountName = user.Account, + Permissions = user.Permissions, + Expiry = user.ConnectionDeadline, + }; + } +} diff --git a/src/NATS.Server/NatsClient.cs b/src/NATS.Server/NatsClient.cs index daace96..6c2f325 100644 --- a/src/NATS.Server/NatsClient.cs +++ b/src/NATS.Server/NatsClient.cs @@ -367,6 +367,7 @@ public sealed class NatsClient : IDisposable { Opts = ClientOpts, Nonce = _nonce ?? 
[], + ClientCertificate = TlsState?.PeerCert, }; var result = _authService.Authenticate(context); diff --git a/tests/NATS.Server.Tests/TlsMapAuthenticatorTests.cs b/tests/NATS.Server.Tests/TlsMapAuthenticatorTests.cs new file mode 100644 index 0000000..e3e27ed --- /dev/null +++ b/tests/NATS.Server.Tests/TlsMapAuthenticatorTests.cs @@ -0,0 +1,134 @@ +using System.Security.Cryptography; +using System.Security.Cryptography.X509Certificates; +using NATS.Server.Auth; + +namespace NATS.Server.Tests; + +public class TlsMapAuthenticatorTests +{ + private static X509Certificate2 CreateSelfSignedCert(string cn) + { + using var rsa = RSA.Create(2048); + var req = new CertificateRequest($"CN={cn}", rsa, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1); + return req.CreateSelfSigned(DateTimeOffset.UtcNow, DateTimeOffset.UtcNow.AddYears(1)); + } + + private static X509Certificate2 CreateCertWithDn(string dn) + { + using var rsa = RSA.Create(2048); + var req = new CertificateRequest(dn, rsa, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1); + return req.CreateSelfSigned(DateTimeOffset.UtcNow, DateTimeOffset.UtcNow.AddYears(1)); + } + + [Fact] + public void Matches_user_by_cn() + { + var users = new List + { + new() { Username = "alice", Password = "" }, + }; + var auth = new TlsMapAuthenticator(users); + var cert = CreateSelfSignedCert("alice"); + + var ctx = new ClientAuthContext + { + Opts = new Protocol.ClientOptions(), + Nonce = [], + ClientCertificate = cert, + }; + + var result = auth.Authenticate(ctx); + result.ShouldNotBeNull(); + result.Identity.ShouldBe("alice"); + } + + [Fact] + public void Returns_null_when_no_cert() + { + var users = new List + { + new() { Username = "alice", Password = "" }, + }; + var auth = new TlsMapAuthenticator(users); + + var ctx = new ClientAuthContext + { + Opts = new Protocol.ClientOptions(), + Nonce = [], + ClientCertificate = null, + }; + + var result = auth.Authenticate(ctx); + result.ShouldBeNull(); + } + + [Fact] + public void 
Returns_null_when_cn_doesnt_match() + { + var users = new List + { + new() { Username = "alice", Password = "" }, + }; + var auth = new TlsMapAuthenticator(users); + var cert = CreateSelfSignedCert("bob"); + + var ctx = new ClientAuthContext + { + Opts = new Protocol.ClientOptions(), + Nonce = [], + ClientCertificate = cert, + }; + + var result = auth.Authenticate(ctx); + result.ShouldBeNull(); + } + + [Fact] + public void Matches_by_full_dn_string() + { + var users = new List + { + new() { Username = "CN=alice, O=TestOrg", Password = "" }, + }; + var auth = new TlsMapAuthenticator(users); + var cert = CreateCertWithDn("CN=alice, O=TestOrg"); + + var ctx = new ClientAuthContext + { + Opts = new Protocol.ClientOptions(), + Nonce = [], + ClientCertificate = cert, + }; + + var result = auth.Authenticate(ctx); + result.ShouldNotBeNull(); + result.Identity.ShouldBe("CN=alice, O=TestOrg"); + } + + [Fact] + public void Returns_permissions_from_matched_user() + { + var perms = new Permissions + { + Publish = new SubjectPermission { Allow = ["foo.>"] }, + }; + var users = new List + { + new() { Username = "alice", Password = "", Permissions = perms }, + }; + var auth = new TlsMapAuthenticator(users); + var cert = CreateSelfSignedCert("alice"); + + var ctx = new ClientAuthContext + { + Opts = new Protocol.ClientOptions(), + Nonce = [], + ClientCertificate = cert, + }; + + var result = auth.Authenticate(ctx); + result.ShouldNotBeNull(); + result.Permissions.ShouldNotBeNull(); + result.Permissions.Publish!.Allow!.ShouldContain("foo.>"); + } +} From 1806ae607e47690e739d57fd2f455c276363c5aa Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 00:57:14 -0500 Subject: [PATCH 10/13] test: add TLS rate limiter unit tests --- .../NATS.Server.Tests/TlsRateLimiterTests.cs | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100644 tests/NATS.Server.Tests/TlsRateLimiterTests.cs diff --git a/tests/NATS.Server.Tests/TlsRateLimiterTests.cs 
b/tests/NATS.Server.Tests/TlsRateLimiterTests.cs new file mode 100644 index 0000000..2e7e210 --- /dev/null +++ b/tests/NATS.Server.Tests/TlsRateLimiterTests.cs @@ -0,0 +1,49 @@ +using NATS.Server.Tls; + +namespace NATS.Server.Tests; + +public class TlsRateLimiterTests +{ + [Fact] + public async Task Rate_limiter_allows_configured_tokens_per_second() + { + using var limiter = new TlsRateLimiter(5); + + // Should allow 5 tokens immediately + for (int i = 0; i < 5; i++) + { + using var cts = new CancellationTokenSource(100); + await limiter.WaitAsync(cts.Token); // Should not throw + } + + // 6th token should block (no refill yet) + using var blockCts = new CancellationTokenSource(200); + var blocked = false; + try + { + await limiter.WaitAsync(blockCts.Token); + } + catch (OperationCanceledException) + { + blocked = true; + } + blocked.ShouldBeTrue("6th token should be blocked before refill"); + } + + [Fact] + public async Task Rate_limiter_refills_after_one_second() + { + using var limiter = new TlsRateLimiter(2); + + // Consume all tokens + await limiter.WaitAsync(CancellationToken.None); + await limiter.WaitAsync(CancellationToken.None); + + // Wait for refill + await Task.Delay(1200); + + // Should have tokens again + using var cts = new CancellationTokenSource(200); + await limiter.WaitAsync(cts.Token); // Should not throw + } +} From e31ba04fdb70030d2549b96dc38486b21bd3b979 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 01:01:56 -0500 Subject: [PATCH 11/13] feat: add closed connection tracking, state filtering, ByStop/ByReason sorting --- src/NATS.Server/Monitoring/ClosedClient.cs | 25 +++++++ src/NATS.Server/Monitoring/Connz.cs | 2 + src/NATS.Server/Monitoring/ConnzHandler.cs | 76 +++++++++++++++++++++- src/NATS.Server/NatsServer.cs | 31 +++++++++ tests/NATS.Server.Tests/MonitorTests.cs | 47 +++++++++++++ 5 files changed, 178 insertions(+), 3 deletions(-) create mode 100644 src/NATS.Server/Monitoring/ClosedClient.cs diff --git 
a/src/NATS.Server/Monitoring/ClosedClient.cs b/src/NATS.Server/Monitoring/ClosedClient.cs new file mode 100644 index 0000000..0710d19 --- /dev/null +++ b/src/NATS.Server/Monitoring/ClosedClient.cs @@ -0,0 +1,25 @@ +namespace NATS.Server.Monitoring; + +/// +/// Snapshot of a closed client connection for /connz reporting. +/// +public sealed record ClosedClient +{ + public required ulong Cid { get; init; } + public string Ip { get; init; } = ""; + public int Port { get; init; } + public DateTime Start { get; init; } + public DateTime Stop { get; init; } + public string Reason { get; init; } = ""; + public string Name { get; init; } = ""; + public string Lang { get; init; } = ""; + public string Version { get; init; } = ""; + public long InMsgs { get; init; } + public long OutMsgs { get; init; } + public long InBytes { get; init; } + public long OutBytes { get; init; } + public uint NumSubs { get; init; } + public TimeSpan Rtt { get; init; } + public string TlsVersion { get; init; } = ""; + public string TlsCipherSuite { get; init; } = ""; +} diff --git a/src/NATS.Server/Monitoring/Connz.cs b/src/NATS.Server/Monitoring/Connz.cs index 2147715..aae62ed 100644 --- a/src/NATS.Server/Monitoring/Connz.cs +++ b/src/NATS.Server/Monitoring/Connz.cs @@ -169,6 +169,8 @@ public enum SortOpt ByIdle, ByUptime, ByRtt, + ByStop, + ByReason, } /// diff --git a/src/NATS.Server/Monitoring/ConnzHandler.cs b/src/NATS.Server/Monitoring/ConnzHandler.cs index f532e7c..8ecf512 100644 --- a/src/NATS.Server/Monitoring/ConnzHandler.cs +++ b/src/NATS.Server/Monitoring/ConnzHandler.cs @@ -12,9 +12,25 @@ public sealed class ConnzHandler(NatsServer server) { var opts = ParseQueryParams(ctx); var now = DateTime.UtcNow; - var clients = server.GetClients().ToArray(); - var connInfos = clients.Select(c => BuildConnInfo(c, now, opts)).ToList(); + var connInfos = new List(); + + // Collect open connections + if (opts.State is ConnState.Open or ConnState.All) + { + var clients = 
server.GetClients().ToArray(); + connInfos.AddRange(clients.Select(c => BuildConnInfo(c, now, opts))); + } + + // Collect closed connections + if (opts.State is ConnState.Closed or ConnState.All) + { + connInfos.AddRange(server.GetClosedClients().Select(c => BuildClosedConnInfo(c, now, opts))); + } + + // Validate sort options that require closed state + if (opts.Sort is SortOpt.ByStop or SortOpt.ByReason && opts.State == ConnState.Open) + opts.Sort = SortOpt.ByCid; // Fallback // Sort connInfos = opts.Sort switch @@ -30,6 +46,8 @@ public sealed class ConnzHandler(NatsServer server) SortOpt.ByLast => connInfos.OrderByDescending(c => c.LastActivity).ToList(), SortOpt.ByIdle => connInfos.OrderByDescending(c => now - c.LastActivity).ToList(), SortOpt.ByUptime => connInfos.OrderByDescending(c => now - c.Start).ToList(), + SortOpt.ByStop => connInfos.OrderByDescending(c => c.Stop ?? DateTime.MinValue).ToList(), + SortOpt.ByReason => connInfos.OrderBy(c => c.Reason).ToList(), SortOpt.ByRtt => connInfos.OrderBy(c => c.Rtt).ToList(), _ => connInfos.OrderBy(c => c.Cid).ToList(), }; @@ -74,7 +92,7 @@ public sealed class ConnzHandler(NatsServer server) Reason = client.CloseReason.ToReasonString(), TlsVersion = client.TlsState?.TlsVersion ?? "", TlsCipherSuite = client.TlsState?.CipherSuite ?? "", - Rtt = client.Rtt == TimeSpan.Zero ? 
"" : $"{client.Rtt.TotalMilliseconds:F3}ms", + Rtt = FormatRtt(client.Rtt), }; if (opts.Subscriptions) @@ -98,6 +116,35 @@ public sealed class ConnzHandler(NatsServer server) return info; } + private static ConnInfo BuildClosedConnInfo(ClosedClient closed, DateTime now, ConnzOptions opts) + { + return new ConnInfo + { + Cid = closed.Cid, + Kind = "Client", + Type = "Client", + Ip = closed.Ip, + Port = closed.Port, + Start = closed.Start, + Stop = closed.Stop, + LastActivity = closed.Stop, + Uptime = FormatDuration(closed.Stop - closed.Start), + Idle = FormatDuration(now - closed.Stop), + InMsgs = closed.InMsgs, + OutMsgs = closed.OutMsgs, + InBytes = closed.InBytes, + OutBytes = closed.OutBytes, + NumSubs = closed.NumSubs, + Name = closed.Name, + Lang = closed.Lang, + Version = closed.Version, + Reason = closed.Reason, + Rtt = FormatRtt(closed.Rtt), + TlsVersion = closed.TlsVersion, + TlsCipherSuite = closed.TlsCipherSuite, + }; + } + private static ConnzOptions ParseQueryParams(HttpContext ctx) { var q = ctx.Request.Query; @@ -119,6 +166,8 @@ public sealed class ConnzHandler(NatsServer server) "idle" => SortOpt.ByIdle, "uptime" => SortOpt.ByUptime, "rtt" => SortOpt.ByRtt, + "stop" => SortOpt.ByStop, + "reason" => SortOpt.ByReason, _ => SortOpt.ByCid, }; } @@ -131,6 +180,17 @@ public sealed class ConnzHandler(NatsServer server) opts.Subscriptions = true; } + if (q.TryGetValue("state", out var state)) + { + opts.State = state.ToString().ToLowerInvariant() switch + { + "open" => ConnState.Open, + "closed" => ConnState.Closed, + "all" => ConnState.All, + _ => ConnState.Open, + }; + } + if (q.TryGetValue("offset", out var offset) && int.TryParse(offset, out var o)) opts.Offset = o; @@ -140,6 +200,16 @@ public sealed class ConnzHandler(NatsServer server) return opts; } + private static string FormatRtt(TimeSpan rtt) + { + if (rtt == TimeSpan.Zero) return ""; + if (rtt.TotalMilliseconds < 1) + return $"{rtt.TotalMicroseconds:F3}\u00b5s"; + if (rtt.TotalSeconds < 1) + 
return $"{rtt.TotalMilliseconds:F3}ms"; + return $"{rtt.TotalSeconds:F3}s"; + } + private static string FormatDuration(TimeSpan ts) { if (ts.TotalDays >= 1) diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 2eff8de..64b1cf5 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -19,6 +19,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable { private readonly NatsOptions _options; private readonly ConcurrentDictionary _clients = new(); + private readonly ConcurrentQueue _closedClients = new(); + private const int MaxClosedClients = 10_000; private readonly ServerInfo _serverInfo; private readonly ILogger _logger; private readonly ILoggerFactory _loggerFactory; @@ -64,6 +66,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable public Action? ReOpenLogFile { get; set; } public IEnumerable GetClients() => _clients.Values; + public IEnumerable GetClosedClients() => _closedClients; + public IEnumerable GetAccounts() => _accounts.Values; public Task WaitForReadyAsync() => _listeningStarted.Task; @@ -580,6 +584,33 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable { _clients.TryRemove(client.Id, out _); _logger.LogDebug("Removed client {ClientId}", client.Id); + + // Snapshot for closed-connections tracking + _closedClients.Enqueue(new ClosedClient + { + Cid = client.Id, + Ip = client.RemoteIp ?? "", + Port = client.RemotePort, + Start = client.StartTime, + Stop = DateTime.UtcNow, + Reason = client.CloseReason.ToReasonString(), + Name = client.ClientOpts?.Name ?? "", + Lang = client.ClientOpts?.Lang ?? "", + Version = client.ClientOpts?.Version ?? 
"", + InMsgs = Interlocked.Read(ref client.InMsgs), + OutMsgs = Interlocked.Read(ref client.OutMsgs), + InBytes = Interlocked.Read(ref client.InBytes), + OutBytes = Interlocked.Read(ref client.OutBytes), + NumSubs = (uint)client.Subscriptions.Count, + Rtt = client.Rtt, + TlsVersion = client.TlsState?.TlsVersion ?? "", + TlsCipherSuite = client.TlsState?.CipherSuite ?? "", + }); + + // Cap closed clients list + while (_closedClients.Count > MaxClosedClients) + _closedClients.TryDequeue(out _); + var subList = client.Account?.SubList ?? _globalAccount.SubList; client.RemoveAllSubscriptions(subList); client.Account?.RemoveClient(client.Id); diff --git a/tests/NATS.Server.Tests/MonitorTests.cs b/tests/NATS.Server.Tests/MonitorTests.cs index 65a1399..e89a0db 100644 --- a/tests/NATS.Server.Tests/MonitorTests.cs +++ b/tests/NATS.Server.Tests/MonitorTests.cs @@ -179,6 +179,53 @@ public class MonitorTests : IAsyncLifetime conn.Subs.ShouldContain("bar"); } + [Fact] + public async Task Connz_state_closed_returns_disconnected_clients() + { + // Connect then disconnect a client + var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort)); + using var stream = new NetworkStream(sock); + var buf = new byte[4096]; + _ = await stream.ReadAsync(buf); + await stream.WriteAsync("CONNECT {\"name\":\"closing-client\"}\r\n"u8.ToArray()); + await Task.Delay(200); + sock.Shutdown(SocketShutdown.Both); + sock.Dispose(); + await Task.Delay(500); + + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/connz?state=closed"); + var connz = await response.Content.ReadFromJsonAsync(); + connz.ShouldNotBeNull(); + connz.Conns.ShouldContain(c => c.Name == "closing-client"); + var closed = connz.Conns.First(c => c.Name == "closing-client"); + closed.Stop.ShouldNotBeNull(); + closed.Reason.ShouldNotBeNullOrEmpty(); + } + + [Fact] + public async Task 
Connz_sort_by_stop_requires_closed_state() + { + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/connz?sort=stop&state=open"); + var connz = await response.Content.ReadFromJsonAsync(); + connz.ShouldNotBeNull(); + } + + [Fact] + public async Task Connz_sort_by_reason() + { + var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort)); + var buf = new byte[4096]; + _ = await sock.ReceiveAsync(buf); + sock.Shutdown(SocketShutdown.Both); + sock.Dispose(); + await Task.Delay(500); + + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/connz?sort=reason&state=closed"); + response.StatusCode.ShouldBe(HttpStatusCode.OK); + } + private static int GetFreePort() { using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); From 8878301c7f1736039efc59a1dff96a1f882a68f5 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 01:05:10 -0500 Subject: [PATCH 12/13] test: add file logging and rotation tests --- tests/NATS.Server.Tests/LoggingTests.cs | 60 +++++++++++++++++++ .../NATS.Server.Tests.csproj | 1 + 2 files changed, 61 insertions(+) create mode 100644 tests/NATS.Server.Tests/LoggingTests.cs diff --git a/tests/NATS.Server.Tests/LoggingTests.cs b/tests/NATS.Server.Tests/LoggingTests.cs new file mode 100644 index 0000000..d1daf6d --- /dev/null +++ b/tests/NATS.Server.Tests/LoggingTests.cs @@ -0,0 +1,60 @@ +using Serilog; + +namespace NATS.Server.Tests; + +public class LoggingTests : IDisposable +{ + private readonly string _logDir; + + public LoggingTests() + { + _logDir = Path.Combine(Path.GetTempPath(), $"nats-log-test-{Guid.NewGuid():N}"); + Directory.CreateDirectory(_logDir); + } + + public void Dispose() + { + try { Directory.Delete(_logDir, true); } catch { } + } + + [Fact] + public void File_sink_creates_log_file() + { + var logPath = Path.Combine(_logDir, "test.log"); + + using 
var logger = new LoggerConfiguration() + .WriteTo.File(logPath) + .CreateLogger(); + + logger.Information("Hello from test"); + logger.Dispose(); + + File.Exists(logPath).ShouldBeTrue(); + var content = File.ReadAllText(logPath); + content.ShouldContain("Hello from test"); + } + + [Fact] + public void File_sink_rotates_on_size_limit() + { + var logPath = Path.Combine(_logDir, "rotate.log"); + + using var logger = new LoggerConfiguration() + .WriteTo.File( + logPath, + fileSizeLimitBytes: 200, + rollOnFileSizeLimit: true, + retainedFileCountLimit: 3) + .CreateLogger(); + + // Write enough to trigger rotation + for (int i = 0; i < 50; i++) + logger.Information("Log message number {Number} with some padding text", i); + + logger.Dispose(); + + // Should have created rotated files + var logFiles = Directory.GetFiles(_logDir, "rotate*.log"); + logFiles.Length.ShouldBeGreaterThan(1); + } +} diff --git a/tests/NATS.Server.Tests/NATS.Server.Tests.csproj b/tests/NATS.Server.Tests/NATS.Server.Tests.csproj index 611a86f..67f3fe4 100644 --- a/tests/NATS.Server.Tests/NATS.Server.Tests.csproj +++ b/tests/NATS.Server.Tests/NATS.Server.Tests.csproj @@ -13,6 +13,7 @@ + From 56de543713062d6ccbba1d8c271e8d172f92303b Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 01:08:34 -0500 Subject: [PATCH 13/13] docs: update differences.md sections 7-10 to reflect implemented features --- differences.md | 84 ++++++++++++++++++++++++++------------------------ 1 file changed, 44 insertions(+), 40 deletions(-) diff --git a/differences.md b/differences.md index 8f77056..5792ae7 100644 --- a/differences.md +++ b/differences.md @@ -40,7 +40,7 @@ |--------|:--:|:----:|-------| | SIGINT (Ctrl+C) | Y | Y | Both handle graceful shutdown | | SIGTERM | Y | Y | `PosixSignalRegistration` triggers `ShutdownAsync()` | -| SIGUSR1 (reopen logs) | Y | Stub | Signal registered, handler logs "not yet implemented" | +| SIGUSR1 (reopen logs) | Y | Y | SIGUSR1 handler calls ReOpenLogFile | | 
SIGUSR2 (lame duck mode) | Y | Y | Triggers `LameDuckShutdownAsync()` | | SIGHUP (config reload) | Y | Stub | Signal registered, handler logs "not yet implemented" | | Windows Service integration | Y | N | | @@ -78,7 +78,7 @@ | No-responders validation | Y | Y | CONNECT rejects `no_responders` without `headers`; 503 HMSG on no match | | Slow consumer detection | Y | Y | Pending bytes threshold (64MB) + write deadline timeout (10s) | | Write deadline / timeout policies | Y | Y | `WriteDeadline` option with `CancellationTokenSource.CancelAfter` on flush | -| RTT measurement | Y | N | Go tracks round-trip time per client | +| RTT measurement | Y | Y | `_rttStartTicks`/`Rtt` property, computed on PONG receipt | | Per-client trace mode | Y | N | | | Detailed close reason tracking | Y | Y | 37-value `ClosedState` enum with CAS-based `MarkClosed()` | | Connection state flags (16 flags) | Y | Y | 7-flag `ClientFlagHolder` with `Interlocked.Or`/`And` | @@ -206,7 +206,7 @@ Go implements a sophisticated slow consumer detection system: | NKeys (Ed25519) | Y | Y | .NET has framework but integration is basic | | JWT validation | Y | N | | | Bcrypt password hashing | Y | Y | .NET supports bcrypt (`$2*` prefix) with constant-time fallback | -| TLS certificate mapping | Y | N | Property exists but no implementation | +| TLS certificate mapping | Y | Y | X500DistinguishedName with full DN match and CN fallback | | Custom auth interface | Y | N | | | External auth callout | Y | N | | | Proxy authentication | Y | N | | @@ -247,7 +247,7 @@ Go implements a sophisticated slow consumer detection system: | `-n/--name` (ServerName) | Y | Y | | | `-m/--http_port` (monitoring) | Y | Y | | | `-c` (config file) | Y | Stub | Flag parsed, stored in `ConfigFile`, no config parser | -| `-D/-V/-DV` (debug/trace) | Y | N | | +| `-D/-V/-DV` (debug/trace) | Y | Y | `-D` for debug, `-V`/`-T` for trace, `-DV` for both | | `--tlscert/--tlskey/--tlscacert` | Y | Y | | | `--tlsverify` | Y | Y | | | 
`--http_base_path` | Y | Y | | @@ -262,7 +262,7 @@ Go implements a sophisticated slow consumer detection system: | ~450 option fields | Y | ~54 | .NET covers core options only | ### Missing Options Categories -- Logging options (file, rotation, syslog, trace levels) +- Logging options: only per-subsystem log control is missing (file, rotation, syslog, and trace/debug flags are implemented) - Advanced limits (MaxSubs, MaxSubTokens, MaxPending, WriteDeadline) - Tags/metadata - OCSP configuration @@ -283,7 +283,7 @@ Go implements a sophisticated slow consumer detection system: | `/routez` | Y | Stub | Returns empty response | | `/gatewayz` | Y | Stub | Returns empty response | | `/leafz` | Y | Stub | Returns empty response | -| `/subz` / `/subscriptionsz` | Y | Stub | Returns empty response | +| `/subz` / `/subscriptionsz` | Y | Y | Account filtering, test subject filtering, pagination, and subscription details | | `/accountz` | Y | Stub | Returns empty response | | `/accstatz` | Y | Stub | Returns empty response | | `/jsz` | Y | Stub | Returns empty response | @@ -308,7 +308,9 @@ Go implements a sophisticated slow consumer detection system: | Feature | Go | .NET | Notes | |---------|:--:|:----:|-------| | Filtering by CID, user, account | Y | Partial | | -| Sorting (11 options) | Y | Y | .NET missing ByStop, ByReason | +| Sorting (11 options) | Y | Y | All options including ByStop, ByReason, ByRtt | +| State filtering (open/closed/all) | Y | Y | `state=open\|closed\|all` query parameter | +| Closed connection tracking | Y | Y | `ConcurrentQueue` capped at 10,000 entries | | Pagination (offset, limit) | Y | Y | | | Subscription detail mode | Y | N | | | TLS peer certificate info | Y | N | | @@ -337,9 +339,9 @@ Go implements a sophisticated slow consumer detection system: | Mutual TLS (client certs) | Y | Y | | | Certificate pinning (SHA256 SPKI) | Y | Y | | | TLS handshake timeout | Y | Y | | -| TLS rate limiting | Y | Property only | .NET has the option but enforcement is partial | +| TLS 
rate limiting | Y | Y | Rate enforcement with refill; unit tests cover rate limiting and refill | | First-byte peeking (0x16 detection) | Y | Y | | -| Cert subject→user mapping | Y | N | `TlsMap` property exists, no implementation | +| Cert subject→user mapping | Y | Y | X500DistinguishedName with full DN match and CN fallback | | OCSP stapling | Y | N | | | Min TLS version control | Y | Y | | @@ -350,14 +352,14 @@ Go implements a sophisticated slow consumer detection system: | Feature | Go | .NET | Notes | |---------|:--:|:----:|-------| | Structured logging | Partial | Y | .NET uses Serilog with ILogger | -| File logging with rotation | Y | N | | -| Syslog (local and remote) | Y | N | | -| Log reopening (SIGUSR1) | Y | N | | -| Trace mode (protocol-level) | Y | N | | -| Debug mode | Y | N | | +| File logging with rotation | Y | Y | Serilog.Sinks.File with rolling file support | +| Syslog (local and remote) | Y | Y | `--syslog` and `--remote_syslog` flags via Serilog.Sinks.SyslogMessages | +| Log reopening (SIGUSR1) | Y | Y | SIGUSR1 handler calls ReOpenLogFile | +| Trace mode (protocol-level) | Y | Y | `-V` or `-T` flag enables trace-level logging | +| Debug mode | Y | Y | `-D` flag enables debug-level logging | | Per-subsystem log control | Y | N | | -| Color output on TTY | Y | N | | -| Timestamp format control | Y | N | | +| Color output on TTY | Y | Y | Auto-detected via `Console.IsOutputRedirected`, uses `AnsiConsoleTheme.Code` | +| Timestamp format control | Y | Y | `--logtime` and `--logtime_utc` flags | --- @@ -369,34 +371,36 @@ Go implements a sophisticated slow consumer detection system: | Configurable interval | Y | Y | PingInterval option | | Max pings out | Y | Y | MaxPingsOut option | | Stale connection close | Y | Y | | -| RTT-based first PING delay | Y | N | Go delays first PING based on RTT | -| RTT tracking | Y | N | | -| Stale connection watcher | Y | N | Go has dedicated watcher goroutine | +| RTT-based first PING delay | Y | Y | Skips PING 
until FirstPongSent or 2s elapsed | +| RTT tracking | Y | Y | `_rttStartTicks`/`Rtt` property, computed on PONG receipt | +| Stale connection stats | Y | Y | `StaleConnectionStats` model, exposed in `/varz` | --- ## Summary: Critical Gaps for Production Use -### High Priority -1. **Slow consumer detection** — unbounded writes can exhaust memory (stat fields exist but no detection logic) -2. **Write coalescing / batch flush** — performance gap for high-throughput scenarios +### Resolved Since Initial Audit +The following items from the original gap list have been implemented: +- **Slow consumer detection** — pending bytes threshold (64MB) with write deadline enforcement +- **Write coalescing / batch flush** — channel-based write loop drains all items before single flush +- **Verbose mode** — `+OK` responses for CONNECT, SUB, UNSUB, PUB when `verbose:true` +- **No-responders validation** — CONNECT rejects `no_responders` without `headers`; 503 HMSG on no match +- **File logging with rotation** — Serilog.Sinks.File with rolling file support +- **TLS certificate mapping** — X500DistinguishedName with full DN match and CN fallback +- **Protocol tracing** — `-V`/`-T` flag enables trace-level logging; `-D` for debug -### Medium Priority -3. **Verbose mode** — clients expect `+OK` when `verbose: true` -4. **Permission deny enforcement at delivery** — deny lists checked at SUB/PUB time but not during message delivery -5. **Config file parsing** — needed for production deployment (CLI stub exists) -6. **Hot reload** — needed for zero-downtime config changes (SIGHUP stub exists) -7. **File logging with rotation** — needed for production logging -8. **No-responders validation** — flag parsed but not enforced +### Remaining High Priority +1. **Permission deny enforcement at delivery** — deny lists checked at SUB/PUB time but not during message delivery +2. **Config file parsing** — needed for production deployment (CLI stub exists) +3. 
**Hot reload** — needed for zero-downtime config changes (SIGHUP stub exists) -### Lower Priority -9. **Dynamic buffer sizing** — delegated to Pipe, less optimized for long-lived connections -10. **JWT authentication** — needed for operator mode -11. **TLS certificate mapping** — property exists, not implemented -12. **OCSP support** — certificate revocation checking -13. **Subject mapping** — input→output subject transformation -14. **Protocol tracing** — no trace-level logging -15. **Subscription statistics** — SubList has no stats collection -16. **Per-account limits** — connections, subscriptions per account -17. **Reply subject tracking** — dynamic response permissions -18. **Windows Service integration** — needed for Windows deployment +### Remaining Lower Priority +4. **Dynamic buffer sizing** — delegated to Pipe, less optimized for long-lived connections +5. **JWT authentication** — needed for operator mode +6. **OCSP support** — certificate revocation checking +7. **Subject mapping** — input→output subject transformation +8. **Subscription statistics** — SubList has no stats collection +9. **Per-account limits** — connections, subscriptions per account +10. **Reply subject tracking** — dynamic response permissions +11. **Windows Service integration** — needed for Windows deployment +12. **Per-subsystem log control** — granular log levels per component