diff --git a/Directory.Packages.props b/Directory.Packages.props index 6854a7a..d235438 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -9,6 +9,7 @@ + diff --git a/differences.md b/differences.md index f86d047..829df24 100644 --- a/differences.md +++ b/differences.md @@ -40,7 +40,7 @@ |--------|:--:|:----:|-------| | SIGINT (Ctrl+C) | Y | Y | Both handle graceful shutdown | | SIGTERM | Y | Y | `PosixSignalRegistration` triggers `ShutdownAsync()` | -| SIGUSR1 (reopen logs) | Y | Stub | Signal registered, handler logs "not yet implemented" | +| SIGUSR1 (reopen logs) | Y | Y | SIGUSR1 handler calls ReOpenLogFile | | SIGUSR2 (lame duck mode) | Y | Y | Triggers `LameDuckShutdownAsync()` | | SIGHUP (config reload) | Y | Stub | Signal registered, handler logs "not yet implemented" | | Windows Service integration | Y | N | | @@ -78,7 +78,7 @@ | No-responders validation | Y | Y | CONNECT rejects `no_responders` without `headers`; 503 HMSG on no match | | Slow consumer detection | Y | Y | Pending bytes threshold (64MB) + write deadline timeout (10s) | | Write deadline / timeout policies | Y | Y | `WriteDeadline` option with `CancellationTokenSource.CancelAfter` on flush | -| RTT measurement | Y | N | Go tracks round-trip time per client | +| RTT measurement | Y | Y | `_rttStartTicks`/`Rtt` property, computed on PONG receipt | | Per-client trace mode | Y | N | | | Detailed close reason tracking | Y | Y | 37-value `ClosedState` enum with CAS-based `MarkClosed()` | | Connection state flags (16 flags) | Y | Y | 7-flag `ClientFlagHolder` with `Interlocked.Or`/`And` | @@ -206,7 +206,7 @@ Go implements a sophisticated slow consumer detection system: | NKeys (Ed25519) | Y | Y | .NET has framework but integration is basic | | JWT validation | Y | N | | | Bcrypt password hashing | Y | Y | .NET supports bcrypt (`$2*` prefix) with constant-time fallback | -| TLS certificate mapping | Y | N | Property exists but no implementation | +| TLS certificate mapping | Y | Y | X500DistinguishedName with full DN match and CN fallback | | Custom auth interface | Y | N | | | External auth callout | Y | N | | | Proxy authentication | Y | N | | @@ -248,7 +248,7 @@ Go implements a sophisticated slow consumer detection system: | `-n/--name` (ServerName) | Y | Y | | | `-m/--http_port` (monitoring) | Y | Y | | | `-c` (config file) | Y | Stub | Flag parsed, stored in `ConfigFile`, no config parser | -| `-D/-V/-DV` (debug/trace) | Y | Y | Sets `Debug`/`Trace` on `NatsOptions`, adjusts Serilog minimum level | +| `-D/-V/-DV` (debug/trace) | Y | Y | `-D`/`--debug` for debug, `-V`/`-T`/`--trace` for trace, `-DV` for both | | `--tlscert/--tlskey/--tlscacert` | Y | Y | | | `--tlsverify` | Y | Y | | | `--http_base_path` | Y | Y | | @@ -263,7 +263,7 @@ Go implements a sophisticated slow consumer detection system: | ~450 option fields | Y | ~62 | .NET covers core + debug/trace/logging/limits/tags options | ### Missing Options Categories -- ~~Logging options (file, rotation, syslog, trace levels)~~ — File logging (`-l`), `LogSizeLimit`, Debug/Trace implemented; syslog/color/timestamp not yet +- ~~Logging options~~ — file logging, rotation, syslog, debug/trace, color, timestamps all implemented; only per-subsystem log control remains - ~~Advanced limits (MaxSubs, MaxSubTokens, MaxPending, WriteDeadline)~~ — `MaxSubs`, `MaxSubTokens` implemented; MaxPending/WriteDeadline already existed - ~~Tags/metadata~~ — `Tags` dictionary implemented in `NatsOptions` - OCSP configuration @@ -284,7 +284,7 @@ Go implements a sophisticated slow consumer detection system: | `/routez` | Y | Stub | Returns empty response | | `/gatewayz` | Y | Stub | Returns empty response | | `/leafz` | Y | Stub | Returns empty response | -| `/subz` / `/subscriptionsz` | Y | Stub | Returns empty response | +| `/subz` / `/subscriptionsz` | Y | Y | Account filtering, test subject filtering, pagination, and subscription details | | `/accountz` | Y | Stub | Returns empty response | | `/accstatz` | Y | Stub | Returns empty response | | `/jsz` | Y | Stub | Returns empty response | @@ -309,7 +309,9 @@ Go implements a sophisticated slow consumer detection system: | Feature | Go | .NET | Notes | |---------|:--:|:----:|-------| | Filtering by CID, user, account | Y | Partial | | -| Sorting (11 options) | Y | Y | .NET missing ByStop, ByReason | +| Sorting (11 options) | Y | Y | All options including ByStop, ByReason, ByRtt | +| State filtering (open/closed/all) | Y | Y | `state=open|closed|all` query parameter | +| Closed connection tracking | Y | Y | `ConcurrentQueue` capped at 10,000 entries | | Pagination (offset, limit) | Y | Y | | | Subscription detail mode | Y | N | | | TLS peer certificate info | Y | N | | @@ -338,9 +340,9 @@ Go implements a sophisticated slow consumer detection system: | Mutual TLS (client certs) | Y | Y | | | Certificate pinning (SHA256 SPKI) | Y | Y | | | TLS handshake timeout | Y | Y | | -| TLS rate limiting | Y | Property only | .NET has the option but enforcement is partial | +| TLS rate limiting | Y | Y | Rate enforcement with refill; unit tests cover rate limiting and refill | | First-byte peeking (0x16 detection) | Y | Y | | -| Cert subject→user mapping | Y | N | `TlsMap` property exists, no implementation | +| Cert subject→user mapping | Y | Y | X500DistinguishedName with full DN match and CN fallback | | OCSP stapling | Y | N | | | Min TLS version control | Y | Y | | @@ -351,14 +353,14 @@ Go implements a sophisticated slow consumer detection system: | Feature | Go | .NET | Notes | |---------|:--:|:----:|-------| | Structured logging | Partial | Y | .NET uses Serilog with ILogger | -| File logging with rotation | Y | Y | `-l` flag + `LogSizeLimit` option via Serilog.Sinks.File with `fileSizeLimitBytes` | -| Syslog (local and remote) | Y | N | | -| Log reopening (SIGUSR1) | Y | N | | -| Trace mode (protocol-level) | Y | Y | `-V`/`-DV` flags; parser `TraceInOp()` logs `<<- OP arg` at Trace level | -| Debug mode | Y | Y | `-D`/`-DV` flags lower Serilog minimum to Debug/Verbose | +| File logging with rotation | Y | Y | `-l`/`--log_file` flag + `LogSizeLimit`/`LogMaxFiles` via Serilog.Sinks.File | +| Syslog (local and remote) | Y | Y | `--syslog` and `--remote_syslog` flags via Serilog.Sinks.SyslogMessages | +| Log reopening (SIGUSR1) | Y | Y | SIGUSR1 handler calls ReOpenLogFile callback | +| Trace mode (protocol-level) | Y | Y | `-V`/`-T`/`--trace` flags; parser `TraceInOp()` logs at Trace level | +| Debug mode | Y | Y | `-D`/`--debug` flag lowers Serilog minimum to Debug | | Per-subsystem log control | Y | N | | -| Color output on TTY | Y | N | | -| Timestamp format control | Y | N | | +| Color output on TTY | Y | Y | Auto-detected via `Console.IsOutputRedirected`, uses `AnsiConsoleTheme.Code` | +| Timestamp format control | Y | Y | `--logtime` and `--logtime_utc` flags | --- @@ -370,34 +372,36 @@ Go implements a sophisticated slow consumer detection system: | Configurable interval | Y | Y | PingInterval option | | Max pings out | Y | Y | MaxPingsOut option | | Stale connection close | Y | Y | | -| RTT-based first PING delay | Y | N | Go delays first PING based on RTT | -| RTT tracking | Y | N | | -| Stale connection watcher | Y | N | Go has dedicated watcher goroutine | +| RTT-based first PING delay | Y | Y | Skips PING until FirstPongSent or 2s elapsed | +| RTT tracking | Y | Y | `_rttStartTicks`/`Rtt` property, computed on PONG receipt | +| Stale connection stats | Y | Y | `StaleConnectionStats` model, exposed in `/varz` | --- ## Summary: Critical Gaps for Production Use -### High Priority -1. ~~**Slow consumer detection**~~ — implemented (pending bytes threshold + write deadline) -2. ~~**Write coalescing / batch flush**~~ — implemented (channel-based write loop) +### Resolved Since Initial Audit +The following items from the original gap list have been implemented: +- **Slow consumer detection** — pending bytes threshold (64MB) with write deadline enforcement +- **Write coalescing / batch flush** — channel-based write loop drains all items before single flush +- **Verbose mode** — `+OK` responses for CONNECT, SUB, UNSUB, PUB when `verbose:true` +- **Permission deny enforcement at delivery** — `IsDeliveryAllowed` + auto-unsub cleanup +- **No-responders validation** — CONNECT rejects `no_responders` without `headers`; 503 HMSG on no match +- **File logging with rotation** — Serilog.Sinks.File with rolling file support +- **TLS certificate mapping** — X500DistinguishedName with full DN match and CN fallback +- **Protocol tracing** — `-V`/`-T` flag enables trace-level logging; `-D` for debug +- **Subscription statistics** — `Stats()`, `HasInterest()`, `NumInterest()`, etc. +- **Per-account limits** — connection + subscription limits via `AccountConfig` +- **Reply subject tracking** — `ResponseTracker` with TTL + max messages -### Medium Priority -3. ~~**Verbose mode**~~ — implemented (`+OK` on CONNECT/SUB/UNSUB/PUB) -4. ~~**Permission deny enforcement at delivery**~~ — implemented (`IsDeliveryAllowed` + auto-unsub cleanup) -5. **Config file parsing** — needed for production deployment (CLI stub exists) -6. **Hot reload** — needed for zero-downtime config changes (SIGHUP stub exists) -7. ~~**File logging with rotation**~~ — implemented (Serilog.Sinks.File with `-l` flag) -8. ~~**No-responders validation**~~ — implemented (CONNECT validation + 503 HMSG) +### Remaining High Priority +1. **Config file parsing** — needed for production deployment (CLI stub exists) +2. **Hot reload** — needed for zero-downtime config changes (SIGHUP stub exists) -### Lower Priority -9. **Dynamic buffer sizing** — delegated to Pipe, less optimized for long-lived connections -10. **JWT authentication** — needed for operator mode -11. **TLS certificate mapping** — property exists, not implemented -12. **OCSP support** — certificate revocation checking -13. **Subject mapping** — input→output subject transformation -14. ~~**Protocol tracing**~~ — implemented (`TraceInOp` at `LogLevel.Trace`) -15. ~~**Subscription statistics**~~ — implemented (`Stats()`, `HasInterest()`, `NumInterest()`, etc.) -16. ~~**Per-account limits**~~ — implemented (connection + subscription limits via `AccountConfig`) -17. ~~**Reply subject tracking**~~ — implemented (`ResponseTracker` with TTL + max messages) -18. **Windows Service integration** — needed for Windows deployment +### Remaining Lower Priority +3. **Dynamic buffer sizing** — delegated to Pipe, less optimized for long-lived connections +4. **JWT authentication** — needed for operator mode +5. **OCSP support** — certificate revocation checking +6. **Subject mapping** — input→output subject transformation +7. **Windows Service integration** — needed for Windows deployment +8. **Per-subsystem log control** — granular log levels per component diff --git a/src/NATS.Server.Host/NATS.Server.Host.csproj b/src/NATS.Server.Host/NATS.Server.Host.csproj index 6ab1019..fedb2dd 100644 --- a/src/NATS.Server.Host/NATS.Server.Host.csproj +++ b/src/NATS.Server.Host/NATS.Server.Host.csproj @@ -12,6 +12,7 @@ + diff --git a/src/NATS.Server.Host/Program.cs b/src/NATS.Server.Host/Program.cs index b148eba..152e5f3 100644 --- a/src/NATS.Server.Host/Program.cs +++ b/src/NATS.Server.Host/Program.cs @@ -1,9 +1,10 @@ using NATS.Server; using Serilog; +using Serilog.Sinks.SystemConsole.Themes; var options = new NatsOptions(); -// Simple CLI argument parsing +// Parse ALL CLI flags into NatsOptions first for (int i = 0; i < args.Length; i++) { switch (args[i]) @@ -52,27 +53,42 @@ for (int i = 0; i < args.Length; i++) case "-D" or "--debug": options.Debug = true; break; - case "-V" or "--trace": + case "-V" or "-T" or "--trace": options.Trace = true; break; case "-DV": options.Debug = true; options.Trace = true; break; - case "-l" or "--log" when i + 1 < args.Length: + case "-l" or "--log" or "--log_file" when i + 1 < args.Length: options.LogFile = args[++i]; break; case "--log_size_limit" when i + 1 < args.Length: options.LogSizeLimit = long.Parse(args[++i]); break; + case "--log_max_files" when i + 1 < args.Length: + options.LogMaxFiles = int.Parse(args[++i]); + break; + case "--logtime" when i + 1 < args.Length: + options.Logtime = bool.Parse(args[++i]); + break; + case "--logtime_utc": + options.LogtimeUTC = true; + break; + case "--syslog": + options.Syslog = true; + break; + case "--remote_syslog" when i + 1 < args.Length: + options.RemoteSyslog = args[++i]; + break; } } -// Configure Serilog based on options +// Build Serilog configuration from options var logConfig = new LoggerConfiguration() - .Enrich.FromLogContext() - .WriteTo.Console(outputTemplate: "[{Timestamp:HH:mm:ss} {Level:u3}] {Message:lj}{NewLine}{Exception}"); + .Enrich.FromLogContext(); +// Set minimum level based on flags if (options.Trace) logConfig.MinimumLevel.Verbose(); else if (options.Debug) @@ -80,12 +96,39 @@ else if (options.Debug) else logConfig.MinimumLevel.Information(); -if (options.LogFile != null) +// Build output template +var timestampFormat = options.LogtimeUTC + ? "{Timestamp:yyyy/MM/dd HH:mm:ss.ffffff} " + : "{Timestamp:HH:mm:ss} "; +var template = options.Logtime + ? $"[{timestampFormat}{{Level:u3}}] {{Message:lj}}{{NewLine}}{{Exception}}" + : "[{Level:u3}] {Message:lj}{NewLine}{Exception}"; + +// Console sink with color auto-detection +if (!Console.IsOutputRedirected) + logConfig.WriteTo.Console(outputTemplate: template, theme: AnsiConsoleTheme.Code); +else + logConfig.WriteTo.Console(outputTemplate: template); + +// File sink with rotation +if (!string.IsNullOrEmpty(options.LogFile)) { logConfig.WriteTo.File( options.LogFile, fileSizeLimitBytes: options.LogSizeLimit > 0 ? options.LogSizeLimit : null, - rollOnFileSizeLimit: options.LogSizeLimit > 0); + retainedFileCountLimit: options.LogMaxFiles > 0 ? options.LogMaxFiles : null, + rollOnFileSizeLimit: options.LogSizeLimit > 0, + outputTemplate: template); +} + +// Syslog sink +if (!string.IsNullOrEmpty(options.RemoteSyslog)) +{ + logConfig.WriteTo.UdpSyslog(options.RemoteSyslog); +} +else if (options.Syslog) +{ + logConfig.WriteTo.LocalSyslog("nats-server"); } Log.Logger = logConfig.CreateLogger(); @@ -96,6 +139,14 @@ using var server = new NatsServer(options, loggerFactory); // Register signal handlers server.HandleSignals(); +server.ReOpenLogFile = () => +{ + Log.Information("Reopening log file"); + Log.CloseAndFlush(); + Log.Logger = logConfig.CreateLogger(); + Log.Information("File log re-opened"); +}; + // Ctrl+C triggers graceful shutdown Console.CancelKeyPress += (_, e) => { diff --git a/src/NATS.Server/Auth/AuthService.cs b/src/NATS.Server/Auth/AuthService.cs index ba348d5..c17b8aa 100644 --- a/src/NATS.Server/Auth/AuthService.cs +++ b/src/NATS.Server/Auth/AuthService.cs @@ -34,6 +34,13 @@ public sealed class AuthService var nonceRequired = false; Dictionary? usersMap = null; + // TLS certificate mapping (highest priority when enabled) + if (options.TlsMap && options.TlsVerify && options.Users is { Count: > 0 }) + { + authenticators.Add(new TlsMapAuthenticator(options.Users)); + authRequired = true; + } + // Priority order (matching Go): NKeys > Users > Token > SimpleUserPassword if (options.NKeys is { Count: > 0 }) diff --git a/src/NATS.Server/Auth/IAuthenticator.cs b/src/NATS.Server/Auth/IAuthenticator.cs index 32a5788..3783c88 100644 --- a/src/NATS.Server/Auth/IAuthenticator.cs +++ b/src/NATS.Server/Auth/IAuthenticator.cs @@ -1,3 +1,4 @@ +using System.Security.Cryptography.X509Certificates; using NATS.Server.Protocol; namespace NATS.Server.Auth; @@ -11,4 +12,5 @@ public sealed class ClientAuthContext { public required ClientOptions Opts { get; init; } public required byte[] Nonce { get; init; } + public X509Certificate2? ClientCertificate { get; init; } } diff --git a/src/NATS.Server/Auth/TlsMapAuthenticator.cs b/src/NATS.Server/Auth/TlsMapAuthenticator.cs new file mode 100644 index 0000000..b213e94 --- /dev/null +++ b/src/NATS.Server/Auth/TlsMapAuthenticator.cs @@ -0,0 +1,67 @@ +using System.Security.Cryptography.X509Certificates; + +namespace NATS.Server.Auth; + +/// +/// Authenticates clients by mapping TLS certificate subject DN to configured users. +/// Corresponds to Go server/auth.go checkClientTLSCertSubject. +/// +public sealed class TlsMapAuthenticator : IAuthenticator +{ + private readonly Dictionary _usersByDn; + private readonly Dictionary _usersByCn; + + public TlsMapAuthenticator(IReadOnlyList users) + { + _usersByDn = new Dictionary(StringComparer.OrdinalIgnoreCase); + _usersByCn = new Dictionary(StringComparer.OrdinalIgnoreCase); + foreach (var user in users) + { + _usersByDn[user.Username] = user; + _usersByCn[user.Username] = user; + } + } + + public AuthResult? Authenticate(ClientAuthContext context) + { + var cert = context.ClientCertificate; + if (cert == null) + return null; + + var dn = cert.SubjectName; + var dnString = dn.Name; // RFC 2253 format + + // Try exact DN match first + if (_usersByDn.TryGetValue(dnString, out var user)) + return BuildResult(user); + + // Try CN extraction + var cn = ExtractCn(dn); + if (cn != null && _usersByCn.TryGetValue(cn, out user)) + return BuildResult(user); + + return null; + } + + private static string? ExtractCn(X500DistinguishedName dn) + { + var dnString = dn.Name; + foreach (var rdn in dnString.Split(',', StringSplitOptions.TrimEntries)) + { + if (rdn.StartsWith("CN=", StringComparison.OrdinalIgnoreCase)) + return rdn[3..]; + } + return null; + } + + private static AuthResult BuildResult(User user) + { + return new AuthResult + { + Identity = user.Username, + AccountName = user.Account, + Permissions = user.Permissions, + Expiry = user.ConnectionDeadline, + }; + } +} diff --git a/src/NATS.Server/Monitoring/ClosedClient.cs b/src/NATS.Server/Monitoring/ClosedClient.cs new file mode 100644 index 0000000..0710d19 --- /dev/null +++ b/src/NATS.Server/Monitoring/ClosedClient.cs @@ -0,0 +1,25 @@ +namespace NATS.Server.Monitoring; + +/// +/// Snapshot of a closed client connection for /connz reporting. +/// +public sealed record ClosedClient +{ + public required ulong Cid { get; init; } + public string Ip { get; init; } = ""; + public int Port { get; init; } + public DateTime Start { get; init; } + public DateTime Stop { get; init; } + public string Reason { get; init; } = ""; + public string Name { get; init; } = ""; + public string Lang { get; init; } = ""; + public string Version { get; init; } = ""; + public long InMsgs { get; init; } + public long OutMsgs { get; init; } + public long InBytes { get; init; } + public long OutBytes { get; init; } + public uint NumSubs { get; init; } + public TimeSpan Rtt { get; init; } + public string TlsVersion { get; init; } = ""; + public string TlsCipherSuite { get; init; } = ""; +} diff --git a/src/NATS.Server/Monitoring/Connz.cs b/src/NATS.Server/Monitoring/Connz.cs index d2a6f49..aae62ed 100644 --- a/src/NATS.Server/Monitoring/Connz.cs +++ b/src/NATS.Server/Monitoring/Connz.cs @@ -168,6 +168,9 @@ public enum SortOpt ByLast, ByIdle, ByUptime, + ByRtt, + ByStop, + ByReason, } /// diff --git a/src/NATS.Server/Monitoring/ConnzHandler.cs b/src/NATS.Server/Monitoring/ConnzHandler.cs index 8ff0a3e..8ecf512 100644 --- a/src/NATS.Server/Monitoring/ConnzHandler.cs +++ b/src/NATS.Server/Monitoring/ConnzHandler.cs @@ -12,9 +12,25 @@ public sealed class ConnzHandler(NatsServer server) { var opts = ParseQueryParams(ctx); var now = DateTime.UtcNow; - var clients = server.GetClients().ToArray(); - var connInfos = clients.Select(c => BuildConnInfo(c, now, opts)).ToList(); + var connInfos = new List(); + + // Collect open connections + if (opts.State is ConnState.Open or ConnState.All) + { + var clients = server.GetClients().ToArray(); + connInfos.AddRange(clients.Select(c => BuildConnInfo(c, now, opts))); + } + + // Collect closed connections + if (opts.State is ConnState.Closed or ConnState.All) + { + connInfos.AddRange(server.GetClosedClients().Select(c => BuildClosedConnInfo(c, now, opts))); + } + + // Validate sort options that require closed state + if (opts.Sort is SortOpt.ByStop or SortOpt.ByReason && opts.State == ConnState.Open) + opts.Sort = SortOpt.ByCid; // Fallback // Sort connInfos = opts.Sort switch @@ -30,6 +46,9 @@ public sealed class ConnzHandler(NatsServer server) SortOpt.ByLast => connInfos.OrderByDescending(c => c.LastActivity).ToList(), SortOpt.ByIdle => connInfos.OrderByDescending(c => now - c.LastActivity).ToList(), SortOpt.ByUptime => connInfos.OrderByDescending(c => now - c.Start).ToList(), + SortOpt.ByStop => connInfos.OrderByDescending(c => c.Stop ?? DateTime.MinValue).ToList(), + SortOpt.ByReason => connInfos.OrderBy(c => c.Reason).ToList(), + SortOpt.ByRtt => connInfos.OrderBy(c => c.Rtt).ToList(), _ => connInfos.OrderBy(c => c.Cid).ToList(), }; @@ -73,6 +92,7 @@ public sealed class ConnzHandler(NatsServer server) Reason = client.CloseReason.ToReasonString(), TlsVersion = client.TlsState?.TlsVersion ?? "", TlsCipherSuite = client.TlsState?.CipherSuite ?? "", + Rtt = FormatRtt(client.Rtt), }; if (opts.Subscriptions) @@ -96,6 +116,35 @@ public sealed class ConnzHandler(NatsServer server) return info; } + private static ConnInfo BuildClosedConnInfo(ClosedClient closed, DateTime now, ConnzOptions opts) + { + return new ConnInfo + { + Cid = closed.Cid, + Kind = "Client", + Type = "Client", + Ip = closed.Ip, + Port = closed.Port, + Start = closed.Start, + Stop = closed.Stop, + LastActivity = closed.Stop, + Uptime = FormatDuration(closed.Stop - closed.Start), + Idle = FormatDuration(now - closed.Stop), + InMsgs = closed.InMsgs, + OutMsgs = closed.OutMsgs, + InBytes = closed.InBytes, + OutBytes = closed.OutBytes, + NumSubs = closed.NumSubs, + Name = closed.Name, + Lang = closed.Lang, + Version = closed.Version, + Reason = closed.Reason, + Rtt = FormatRtt(closed.Rtt), + TlsVersion = closed.TlsVersion, + TlsCipherSuite = closed.TlsCipherSuite, + }; + } + private static ConnzOptions ParseQueryParams(HttpContext ctx) { var q = ctx.Request.Query; @@ -116,6 +165,9 @@ public sealed class ConnzHandler(NatsServer server) "last" => SortOpt.ByLast, "idle" => SortOpt.ByIdle, "uptime" => SortOpt.ByUptime, + "rtt" => SortOpt.ByRtt, + "stop" => SortOpt.ByStop, + "reason" => SortOpt.ByReason, _ => SortOpt.ByCid, }; } @@ -128,6 +180,17 @@ public sealed class ConnzHandler(NatsServer server) opts.Subscriptions = true; } + if (q.TryGetValue("state", out var state)) + { + opts.State = state.ToString().ToLowerInvariant() switch + { + "open" => ConnState.Open, + "closed" => ConnState.Closed, + "all" => ConnState.All, + _ => ConnState.Open, + }; + } + if (q.TryGetValue("offset", out var offset) && int.TryParse(offset, out var o)) opts.Offset = o; @@ -137,6 +200,16 @@ public sealed class ConnzHandler(NatsServer server) return opts; } + private static string FormatRtt(TimeSpan rtt) + { + if (rtt == TimeSpan.Zero) return ""; + if (rtt.TotalMilliseconds < 1) + return $"{rtt.TotalMicroseconds:F3}\u00b5s"; + if (rtt.TotalSeconds < 1) + return $"{rtt.TotalMilliseconds:F3}ms"; + return $"{rtt.TotalSeconds:F3}s"; + } + private static string FormatDuration(TimeSpan ts) { if (ts.TotalDays >= 1) diff --git a/src/NATS.Server/Monitoring/MonitorServer.cs b/src/NATS.Server/Monitoring/MonitorServer.cs index af23506..ad06f91 100644 --- a/src/NATS.Server/Monitoring/MonitorServer.cs +++ b/src/NATS.Server/Monitoring/MonitorServer.cs @@ -15,6 +15,7 @@ public sealed class MonitorServer : IAsyncDisposable private readonly ILogger _logger; private readonly VarzHandler _varzHandler; private readonly ConnzHandler _connzHandler; + private readonly SubszHandler _subszHandler; public MonitorServer(NatsServer server, NatsOptions options, ServerStats stats, ILoggerFactory loggerFactory) { @@ -29,6 +30,7 @@ public sealed class MonitorServer : IAsyncDisposable _varzHandler = new VarzHandler(server, options); _connzHandler = new ConnzHandler(server); + _subszHandler = new SubszHandler(server); _app.MapGet(basePath + "/", () => { @@ -75,15 +77,15 @@ public sealed class MonitorServer : IAsyncDisposable stats.HttpReqStats.AddOrUpdate("/leafz", 1, (_, v) => v + 1); return Results.Ok(new { }); }); - _app.MapGet(basePath + "/subz", () => + _app.MapGet(basePath + "/subz", (HttpContext ctx) => { stats.HttpReqStats.AddOrUpdate("/subz", 1, (_, v) => v + 1); - return Results.Ok(new { }); + return Results.Ok(_subszHandler.HandleSubsz(ctx)); }); - _app.MapGet(basePath + "/subscriptionsz", () => + _app.MapGet(basePath + "/subscriptionsz", (HttpContext ctx) => { stats.HttpReqStats.AddOrUpdate("/subscriptionsz", 1, (_, v) => v + 1); - return Results.Ok(new { }); + return Results.Ok(_subszHandler.HandleSubsz(ctx)); }); _app.MapGet(basePath + "/accountz", () => { diff --git a/src/NATS.Server/Monitoring/Subsz.cs b/src/NATS.Server/Monitoring/Subsz.cs new file mode 100644 index 0000000..472b728 --- /dev/null +++ b/src/NATS.Server/Monitoring/Subsz.cs @@ -0,0 +1,45 @@ +using System.Text.Json.Serialization; + +namespace NATS.Server.Monitoring; + +/// +/// Subscription information response. Corresponds to Go server/monitor.go Subsz struct. +/// +public sealed class Subsz +{ + [JsonPropertyName("server_id")] + public string Id { get; set; } = ""; + + [JsonPropertyName("now")] + public DateTime Now { get; set; } + + [JsonPropertyName("num_subscriptions")] + public uint NumSubs { get; set; } + + [JsonPropertyName("num_cache")] + public int NumCache { get; set; } + + [JsonPropertyName("total")] + public int Total { get; set; } + + [JsonPropertyName("offset")] + public int Offset { get; set; } + + [JsonPropertyName("limit")] + public int Limit { get; set; } + + [JsonPropertyName("subscriptions")] + public SubDetail[] Subs { get; set; } = []; +} + +/// +/// Options passed to Subsz() for filtering. +/// +public sealed class SubszOptions +{ + public int Offset { get; set; } + public int Limit { get; set; } = 1024; + public bool Subscriptions { get; set; } + public string Account { get; set; } = ""; + public string Test { get; set; } = ""; +} diff --git a/src/NATS.Server/Monitoring/SubszHandler.cs b/src/NATS.Server/Monitoring/SubszHandler.cs new file mode 100644 index 0000000..1de4f97 --- /dev/null +++ b/src/NATS.Server/Monitoring/SubszHandler.cs @@ -0,0 +1,93 @@ +using Microsoft.AspNetCore.Http; +using NATS.Server.Subscriptions; + +namespace NATS.Server.Monitoring; + +/// +/// Handles /subz endpoint requests, returning subscription information. +/// Corresponds to Go server/monitor.go handleSubsz. +/// +public sealed class SubszHandler(NatsServer server) +{ + public Subsz HandleSubsz(HttpContext ctx) + { + var opts = ParseQueryParams(ctx); + var now = DateTime.UtcNow; + + // Collect subscriptions from all accounts (or filtered) + var allSubs = new List(); + foreach (var account in server.GetAccounts()) + { + if (!string.IsNullOrEmpty(opts.Account) && account.Name != opts.Account) + continue; + allSubs.AddRange(account.SubList.GetAllSubscriptions()); + } + + // Filter by test subject if provided + if (!string.IsNullOrEmpty(opts.Test)) + { + allSubs = allSubs.Where(s => SubjectMatch.MatchLiteral(opts.Test, s.Subject)).ToList(); + } + + var total = allSubs.Count; + var numSubs = server.GetAccounts() + .Where(a => string.IsNullOrEmpty(opts.Account) || a.Name == opts.Account) + .Aggregate(0u, (sum, a) => sum + a.SubList.Count); + var numCache = server.GetAccounts() + .Where(a => string.IsNullOrEmpty(opts.Account) || a.Name == opts.Account) + .Sum(a => a.SubList.CacheCount); + + SubDetail[] details = []; + if (opts.Subscriptions) + { + details = allSubs + .Skip(opts.Offset) + .Take(opts.Limit) + .Select(s => new SubDetail + { + Subject = s.Subject, + Queue = s.Queue ?? "", + Sid = s.Sid, + Msgs = Interlocked.Read(ref s.MessageCount), + Max = s.MaxMessages, + Cid = s.Client?.Id ?? 0, + }) + .ToArray(); + } + + return new Subsz + { + Id = server.ServerId, + Now = now, + NumSubs = numSubs, + NumCache = numCache, + Total = total, + Offset = opts.Offset, + Limit = opts.Limit, + Subs = details, + }; + } + + private static SubszOptions ParseQueryParams(HttpContext ctx) + { + var q = ctx.Request.Query; + var opts = new SubszOptions(); + + if (q.TryGetValue("subs", out var subs)) + opts.Subscriptions = subs == "true" || subs == "1" || subs == "detail"; + + if (q.TryGetValue("offset", out var offset) && int.TryParse(offset, out var o)) + opts.Offset = o; + + if (q.TryGetValue("limit", out var limit) && int.TryParse(limit, out var l)) + opts.Limit = l; + + if (q.TryGetValue("acc", out var acc)) + opts.Account = acc.ToString(); + + if (q.TryGetValue("test", out var test)) + opts.Test = test.ToString(); + + return opts; + } +} diff --git a/src/NATS.Server/Monitoring/Varz.cs b/src/NATS.Server/Monitoring/Varz.cs index 847bdc2..3e85374 100644 --- a/src/NATS.Server/Monitoring/Varz.cs +++ b/src/NATS.Server/Monitoring/Varz.cs @@ -157,6 +157,12 @@ public sealed class Varz [JsonPropertyName("slow_consumer_stats")] public SlowConsumersStats SlowConsumerStats { get; set; } = new(); + [JsonPropertyName("stale_connections")] + public long StaleConnections { get; set; } + + [JsonPropertyName("stale_connection_stats")] + public StaleConnectionStats StaleConnectionStatsDetail { get; set; } = new(); + [JsonPropertyName("subscriptions")] public uint Subscriptions { get; set; } @@ -219,6 +225,25 @@ public sealed class SlowConsumersStats public ulong Leafs { get; set; } } +/// +/// Statistics about stale connections by connection type. +/// Corresponds to Go server/monitor.go StaleConnectionStats struct. +/// +public sealed class StaleConnectionStats +{ + [JsonPropertyName("clients")] + public ulong Clients { get; set; } + + [JsonPropertyName("routes")] + public ulong Routes { get; set; } + + [JsonPropertyName("gateways")] + public ulong Gateways { get; set; } + + [JsonPropertyName("leafs")] + public ulong Leafs { get; set; } +} + /// /// Cluster configuration monitoring information. /// Corresponds to Go server/monitor.go ClusterOptsVarz struct. diff --git a/src/NATS.Server/Monitoring/VarzHandler.cs b/src/NATS.Server/Monitoring/VarzHandler.cs index c872b28..433aa84 100644 --- a/src/NATS.Server/Monitoring/VarzHandler.cs +++ b/src/NATS.Server/Monitoring/VarzHandler.cs @@ -91,6 +91,14 @@ public sealed class VarzHandler : IDisposable Gateways = (ulong)Interlocked.Read(ref stats.SlowConsumerGateways), Leafs = (ulong)Interlocked.Read(ref stats.SlowConsumerLeafs), }, + StaleConnections = Interlocked.Read(ref stats.StaleConnections), + StaleConnectionStatsDetail = new StaleConnectionStats + { + Clients = (ulong)Interlocked.Read(ref stats.StaleConnectionClients), + Routes = (ulong)Interlocked.Read(ref stats.StaleConnectionRoutes), + Gateways = (ulong)Interlocked.Read(ref stats.StaleConnectionGateways), + Leafs = (ulong)Interlocked.Read(ref stats.StaleConnectionLeafs), + }, Subscriptions = _server.SubList.Count, ConfigLoadTime = _server.StartTime, HttpReqStats = stats.HttpReqStats.ToDictionary(kv => kv.Key, kv => (ulong)kv.Value), diff --git a/src/NATS.Server/NatsClient.cs b/src/NATS.Server/NatsClient.cs index 709d565..1ccfc71 100644 --- a/src/NATS.Server/NatsClient.cs +++ b/src/NATS.Server/NatsClient.cs @@ -74,6 +74,11 @@ public sealed class NatsClient : IDisposable private int _pingsOut; private long _lastIn; + // RTT tracking + private long _rttStartTicks; + private long _rtt; + public TimeSpan Rtt => new(Interlocked.Read(ref _rtt)); + public TlsConnectionState? TlsState { get; set; } public bool InfoAlreadySent { get; set; } @@ -322,6 +327,14 @@ public sealed class NatsClient : IDisposable case CommandType.Pong: Interlocked.Exchange(ref _pingsOut, 0); + var rttStart = Interlocked.Read(ref _rttStartTicks); + if (rttStart > 0) + { + var elapsed = DateTime.UtcNow.Ticks - rttStart; + if (elapsed <= 0) elapsed = 1; // min 1 tick for Windows granularity + Interlocked.Exchange(ref _rtt, elapsed); + } + _flags.SetFlag(ClientFlags.FirstPongSent); break; case CommandType.Sub: @@ -356,6 +369,7 @@ public sealed class NatsClient : IDisposable { Opts = ClientOpts, Nonce = _nonce ?? [], + ClientCertificate = TlsState?.PeerCert, }; authResult = _authService.Authenticate(context); @@ -733,6 +747,13 @@ public sealed class NatsClient : IDisposable { while (await timer.WaitForNextTickAsync(ct)) { + // Delay first PING until client has responded with PONG or 2 seconds elapsed + if (!_flags.HasFlag(ClientFlags.FirstPongSent) + && (DateTime.UtcNow - StartTime).TotalSeconds < 2) + { + continue; + } + var elapsed = Environment.TickCount64 - Interlocked.Read(ref _lastIn); if (elapsed < (long)_options.PingInterval.TotalMilliseconds) { @@ -744,6 +765,8 @@ public sealed class NatsClient : IDisposable if (Volatile.Read(ref _pingsOut) + 1 > _options.MaxPingsOut) { _logger.LogDebug("Client {ClientId} stale connection -- closing", Id); + Interlocked.Increment(ref _serverStats.StaleConnections); + Interlocked.Increment(ref _serverStats.StaleConnectionClients); await SendErrAndCloseAsync(NatsProtocol.ErrStaleConnection, ClientClosedReason.StaleConnection); return; } @@ -751,6 +774,7 @@ public sealed class NatsClient : IDisposable var currentPingsOut = Interlocked.Increment(ref _pingsOut); _logger.LogDebug("Client {ClientId} sending PING ({PingsOut}/{MaxPingsOut})", Id, currentPingsOut, _options.MaxPingsOut); + Interlocked.Exchange(ref _rttStartTicks, DateTime.UtcNow.Ticks); WriteProtocol(NatsProtocol.PingBytes); } } diff --git a/src/NATS.Server/NatsOptions.cs b/src/NATS.Server/NatsOptions.cs index 346e7f4..8a9b56d 100644 --- a/src/NATS.Server/NatsOptions.cs +++ b/src/NATS.Server/NatsOptions.cs @@ -20,12 +20,6 @@ public sealed class NatsOptions public int MaxSubs { get; set; } // 0 = unlimited (per-connection) public int MaxSubTokens { get; set; } // 0 = unlimited - // Logging / diagnostics - public bool Debug { get; set; } - public bool Trace { get; set; } - public string? LogFile { get; set; } - public long LogSizeLimit { get; set; } - // Server tags (exposed via /varz) public Dictionary? Tags { get; set; } @@ -63,6 +57,17 @@ public sealed class NatsOptions public string? PortsFileDir { get; set; } public string? ConfigFile { get; set; } + // Logging + public string? LogFile { get; set; } + public long LogSizeLimit { get; set; } + public int LogMaxFiles { get; set; } + public bool Debug { get; set; } + public bool Trace { get; set; } + public bool Logtime { get; set; } = true; + public bool LogtimeUTC { get; set; } + public bool Syslog { get; set; } + public string? RemoteSyslog { get; set; } + // Profiling (0 = disabled) public int ProfPort { get; set; } diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index b731448..02a0734 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -19,6 +19,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable { private readonly NatsOptions _options; private readonly ConcurrentDictionary _clients = new(); + private readonly ConcurrentQueue _closedClients = new(); + private const int MaxClosedClients = 10_000; private readonly ServerInfo _serverInfo; private readonly ILogger _logger; private readonly ILoggerFactory _loggerFactory; @@ -64,8 +66,13 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable public string ServerNKey { get; } public bool IsShuttingDown => Volatile.Read(ref _shutdown) != 0; public bool IsLameDuckMode => Volatile.Read(ref _lameDuck) != 0; + public Action? ReOpenLogFile { get; set; } public IEnumerable GetClients() => _clients.Values; + public IEnumerable GetClosedClients() => _closedClients; + + public IEnumerable GetAccounts() => _accounts.Values; + public Task WaitForReadyAsync() => _listeningStarted.Task; public void WaitForShutdown() => _shutdownComplete.Task.GetAwaiter().GetResult(); @@ -195,7 +202,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable /// /// Registers Unix signal handlers. - /// SIGTERM → shutdown, SIGUSR2 → lame duck, SIGUSR1 → log reopen (stub), SIGHUP → reload (stub). + /// SIGTERM → shutdown, SIGUSR2 → lame duck, SIGUSR1 → log reopen, SIGHUP → reload (stub). /// public void HandleSignals() { @@ -225,7 +232,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _signalRegistrations.Add(PosixSignalRegistration.Create((PosixSignal)10, ctx => { ctx.Cancel = true; - _logger.LogWarning("Trapped SIGUSR1 signal — log reopen not yet supported"); + _logger.LogInformation("Trapped SIGUSR1 signal — reopening log file"); + ReOpenLogFile?.Invoke(); })); _signalRegistrations.Add(PosixSignalRegistration.Create((PosixSignal)12, ctx => @@ -615,6 +623,33 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable { _clients.TryRemove(client.Id, out _); _logger.LogDebug("Removed client {ClientId}", client.Id); + + // Snapshot for closed-connections tracking + _closedClients.Enqueue(new ClosedClient + { + Cid = client.Id, + Ip = client.RemoteIp ?? "", + Port = client.RemotePort, + Start = client.StartTime, + Stop = DateTime.UtcNow, + Reason = client.CloseReason.ToReasonString(), + Name = client.ClientOpts?.Name ?? "", + Lang = client.ClientOpts?.Lang ?? "", + Version = client.ClientOpts?.Version ?? "", + InMsgs = Interlocked.Read(ref client.InMsgs), + OutMsgs = Interlocked.Read(ref client.OutMsgs), + InBytes = Interlocked.Read(ref client.InBytes), + OutBytes = Interlocked.Read(ref client.OutBytes), + NumSubs = (uint)client.Subscriptions.Count, + Rtt = client.Rtt, + TlsVersion = client.TlsState?.TlsVersion ?? "", + TlsCipherSuite = client.TlsState?.CipherSuite ?? "", + }); + + // Cap closed clients list + while (_closedClients.Count > MaxClosedClients) + _closedClients.TryDequeue(out _); + var subList = client.Account?.SubList ?? _globalAccount.SubList; client.RemoveAllSubscriptions(subList); client.Account?.RemoveClient(client.Id); diff --git a/src/NATS.Server/ServerStats.cs b/src/NATS.Server/ServerStats.cs index b737dee..21ebd7d 100644 --- a/src/NATS.Server/ServerStats.cs +++ b/src/NATS.Server/ServerStats.cs @@ -16,5 +16,9 @@ public sealed class ServerStats public long SlowConsumerRoutes; public long SlowConsumerLeafs; public long SlowConsumerGateways; + public long StaleConnectionClients; + public long StaleConnectionRoutes; + public long StaleConnectionLeafs; + public long StaleConnectionGateways; public readonly ConcurrentDictionary HttpReqStats = new(); } diff --git a/src/NATS.Server/Subscriptions/SubList.cs b/src/NATS.Server/Subscriptions/SubList.cs index b7ca3c7..dc998a0 100644 --- a/src/NATS.Server/Subscriptions/SubList.cs +++ b/src/NATS.Server/Subscriptions/SubList.cs @@ -40,6 +40,62 @@ public sealed class SubList : IDisposable } } + /// + /// Returns all subscriptions in the trie. For monitoring only. + /// + public List GetAllSubscriptions() + { + _lock.EnterReadLock(); + try + { + var result = new List(); + CollectAll(_root, result); + return result; + } + finally + { + _lock.ExitReadLock(); + } + } + + private static void CollectAll(TrieLevel level, List result) + { + foreach (var (_, node) in level.Nodes) + { + foreach (var sub in node.PlainSubs) result.Add(sub); + foreach (var (_, qset) in node.QueueSubs) + foreach (var sub in qset) result.Add(sub); + if (node.Next != null) CollectAll(node.Next, result); + } + if (level.Pwc != null) + { + foreach (var sub in level.Pwc.PlainSubs) result.Add(sub); + foreach (var (_, qset) in level.Pwc.QueueSubs) + foreach (var sub in qset) result.Add(sub); + if (level.Pwc.Next != null) CollectAll(level.Pwc.Next, result); + } + if (level.Fwc != null) + { + foreach (var sub in level.Fwc.PlainSubs) result.Add(sub); + foreach (var (_, qset) in level.Fwc.QueueSubs) + foreach (var sub in qset) result.Add(sub); + if (level.Fwc.Next != null) CollectAll(level.Fwc.Next, result); + } + } + + /// + /// Returns the current number of entries in the cache. + /// + public int CacheCount + { + get + { + _lock.EnterReadLock(); + try { return _cache?.Count ?? 0; } + finally { _lock.ExitReadLock(); } + } + } + public void Insert(Subscription sub) { var subject = sub.Subject; diff --git a/tests/NATS.Server.Tests/LoggingTests.cs b/tests/NATS.Server.Tests/LoggingTests.cs new file mode 100644 index 0000000..d1daf6d --- /dev/null +++ b/tests/NATS.Server.Tests/LoggingTests.cs @@ -0,0 +1,60 @@ +using Serilog; + +namespace NATS.Server.Tests; + +public class LoggingTests : IDisposable +{ + private readonly string _logDir; + + public LoggingTests() + { + _logDir = Path.Combine(Path.GetTempPath(), $"nats-log-test-{Guid.NewGuid():N}"); + Directory.CreateDirectory(_logDir); + } + + public void Dispose() + { + try { Directory.Delete(_logDir, true); } catch { } + } + + [Fact] + public void File_sink_creates_log_file() + { + var logPath = Path.Combine(_logDir, "test.log"); + + using var logger = new LoggerConfiguration() + .WriteTo.File(logPath) + .CreateLogger(); + + logger.Information("Hello from test"); + logger.Dispose(); + + File.Exists(logPath).ShouldBeTrue(); + var content = File.ReadAllText(logPath); + content.ShouldContain("Hello from test"); + } + + [Fact] + public void File_sink_rotates_on_size_limit() + { + var logPath = Path.Combine(_logDir, "rotate.log"); + + using var logger = new LoggerConfiguration() + .WriteTo.File( + logPath, + fileSizeLimitBytes: 200, + rollOnFileSizeLimit: true, + retainedFileCountLimit: 3) + .CreateLogger(); + + // Write enough to trigger rotation + for (int i = 0; i < 50; i++) + logger.Information("Log message number {Number} with some padding text", i); + + logger.Dispose(); + + // Should have created rotated files + var logFiles = Directory.GetFiles(_logDir, "rotate*.log"); + logFiles.Length.ShouldBeGreaterThan(1); + } +} diff --git a/tests/NATS.Server.Tests/MonitorTests.cs b/tests/NATS.Server.Tests/MonitorTests.cs index 65a1399..e89a0db 100644 --- a/tests/NATS.Server.Tests/MonitorTests.cs +++ b/tests/NATS.Server.Tests/MonitorTests.cs @@ -179,6 +179,53 @@ public class MonitorTests : IAsyncLifetime conn.Subs.ShouldContain("bar"); } + [Fact] + public async Task Connz_state_closed_returns_disconnected_clients() + { + // Connect then disconnect a client + var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort)); + using var stream = new NetworkStream(sock); + var buf = new byte[4096]; + _ = await stream.ReadAsync(buf); + await stream.WriteAsync("CONNECT {\"name\":\"closing-client\"}\r\n"u8.ToArray()); + await Task.Delay(200); + sock.Shutdown(SocketShutdown.Both); + sock.Dispose(); + await Task.Delay(500); + + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/connz?state=closed"); + var connz = await response.Content.ReadFromJsonAsync(); + connz.ShouldNotBeNull(); + connz.Conns.ShouldContain(c => c.Name == "closing-client"); + var closed = connz.Conns.First(c => c.Name == "closing-client"); + closed.Stop.ShouldNotBeNull(); + closed.Reason.ShouldNotBeNullOrEmpty(); + } + + [Fact] + public async Task Connz_sort_by_stop_requires_closed_state() + { + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/connz?sort=stop&state=open"); + var connz = await response.Content.ReadFromJsonAsync(); + connz.ShouldNotBeNull(); + } + + [Fact] + public async Task Connz_sort_by_reason() + { + var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort)); + var buf = new byte[4096]; + _ = await sock.ReceiveAsync(buf); + sock.Shutdown(SocketShutdown.Both); + sock.Dispose(); + await Task.Delay(500); + + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/connz?sort=reason&state=closed"); + response.StatusCode.ShouldBe(HttpStatusCode.OK); + } + private static int GetFreePort() { using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); diff --git a/tests/NATS.Server.Tests/NATS.Server.Tests.csproj b/tests/NATS.Server.Tests/NATS.Server.Tests.csproj index 611a86f..67f3fe4 100644 --- a/tests/NATS.Server.Tests/NATS.Server.Tests.csproj +++ b/tests/NATS.Server.Tests/NATS.Server.Tests.csproj @@ -13,6 +13,7 @@ + diff --git a/tests/NATS.Server.Tests/RttTests.cs b/tests/NATS.Server.Tests/RttTests.cs new file mode 100644 index 0000000..9632cf5 --- /dev/null +++ b/tests/NATS.Server.Tests/RttTests.cs @@ -0,0 +1,123 @@ +using System.Net; +using System.Net.Http.Json; +using System.Net.Sockets; +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Server.Monitoring; + +namespace NATS.Server.Tests; + +public class RttTests : IAsyncLifetime +{ + private readonly NatsServer _server; + private readonly int _natsPort; + private readonly int _monitorPort; + private readonly CancellationTokenSource _cts = new(); + private readonly HttpClient _http = new(); + + public RttTests() + { + _natsPort = GetFreePort(); + _monitorPort = GetFreePort(); + _server = new NatsServer( + new NatsOptions + { + Port = _natsPort, + MonitorPort = _monitorPort, + PingInterval = TimeSpan.FromMilliseconds(200), + MaxPingsOut = 4, + }, + NullLoggerFactory.Instance); + } + + public async Task InitializeAsync() + { + _ = _server.StartAsync(_cts.Token); + await _server.WaitForReadyAsync(); + for (int i = 0; i < 50; i++) + { + try + { + var resp = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/healthz"); + if (resp.IsSuccessStatusCode) break; + } + catch (HttpRequestException) { } + await Task.Delay(50); + } + } + + public async Task DisposeAsync() + { + _http.Dispose(); + await _cts.CancelAsync(); + _server.Dispose(); + } + + [Fact] + public async Task Rtt_populated_after_ping_pong_cycle() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort)); + using var stream = new NetworkStream(sock); + var buf = new byte[4096]; + _ = await stream.ReadAsync(buf); // INFO + + // Send CONNECT + PING (triggers firstPongSent) + await stream.WriteAsync("CONNECT {}\r\nPING\r\n"u8.ToArray()); + await stream.FlushAsync(); + _ = await stream.ReadAsync(buf); // PONG + + // Wait for server's PING cycle + await Task.Delay(500); + + // Read server PING and respond with PONG + var received = new byte[4096]; + int totalRead = 0; + bool gotPing = false; + using var readCts = new CancellationTokenSource(2000); + while (!gotPing && !readCts.IsCancellationRequested) + { + var n = await stream.ReadAsync(received.AsMemory(totalRead), readCts.Token); + totalRead += n; + var text = System.Text.Encoding.ASCII.GetString(received, 0, totalRead); + if (text.Contains("PING")) + { + gotPing = true; + await stream.WriteAsync("PONG\r\n"u8.ToArray()); + await stream.FlushAsync(); + } + } + + gotPing.ShouldBeTrue("Server should have sent PING"); + + // Wait for RTT to be computed + await Task.Delay(200); + + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/connz"); + var connz = await response.Content.ReadFromJsonAsync(); + connz.ShouldNotBeNull(); + var conn = connz.Conns.FirstOrDefault(c => c.Rtt != ""); + conn.ShouldNotBeNull("At least one connection should have RTT populated"); + } + + [Fact] + public async Task Connz_sort_by_rtt() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort)); + using var stream = new NetworkStream(sock); + var buf = new byte[4096]; + _ = await stream.ReadAsync(buf); + await stream.WriteAsync("CONNECT {}\r\n"u8.ToArray()); + await Task.Delay(200); + + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/connz?sort=rtt"); + response.StatusCode.ShouldBe(System.Net.HttpStatusCode.OK); + } + + private static int GetFreePort() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + sock.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + return ((IPEndPoint)sock.LocalEndPoint!).Port; + } +} diff --git a/tests/NATS.Server.Tests/ServerStatsTests.cs b/tests/NATS.Server.Tests/ServerStatsTests.cs index 6b45bd3..7baf061 100644 --- a/tests/NATS.Server.Tests/ServerStatsTests.cs +++ b/tests/NATS.Server.Tests/ServerStatsTests.cs @@ -75,6 +75,27 @@ public class ServerStatsTests : IAsyncLifetime client.StartTime.ShouldNotBe(default); } + [Fact] + public void StaleConnection_stats_incremented_on_mark_closed() + { + var stats = new ServerStats(); + stats.StaleConnectionClients.ShouldBe(0); + + Interlocked.Increment(ref stats.StaleConnectionClients); + stats.StaleConnectionClients.ShouldBe(1); + } + + [Fact] + public void StaleConnection_stats_all_fields_default_to_zero() + { + var stats = new ServerStats(); + stats.StaleConnections.ShouldBe(0); + stats.StaleConnectionClients.ShouldBe(0); + stats.StaleConnectionRoutes.ShouldBe(0); + stats.StaleConnectionLeafs.ShouldBe(0); + stats.StaleConnectionGateways.ShouldBe(0); + } + private static int GetFreePort() { using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); diff --git a/tests/NATS.Server.Tests/SubszTests.cs b/tests/NATS.Server.Tests/SubszTests.cs new file mode 100644 index 0000000..b049181 --- /dev/null +++ b/tests/NATS.Server.Tests/SubszTests.cs @@ -0,0 +1,137 @@ +using System.Net; +using System.Net.Http.Json; +using System.Net.Sockets; +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Server.Monitoring; + +namespace NATS.Server.Tests; + +public class SubszTests : IAsyncLifetime +{ + private readonly NatsServer _server; + private readonly int _natsPort; + private readonly int _monitorPort; + private readonly CancellationTokenSource _cts = new(); + private readonly HttpClient _http = new(); + + public SubszTests() + { + _natsPort = GetFreePort(); + _monitorPort = GetFreePort(); + _server = new NatsServer( + new NatsOptions { Port = _natsPort, MonitorPort = _monitorPort }, + NullLoggerFactory.Instance); + } + + public async Task InitializeAsync() + { + _ = _server.StartAsync(_cts.Token); + await _server.WaitForReadyAsync(); + for (int i = 0; i < 50; i++) + { + try + { + var resp = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/healthz"); + if (resp.IsSuccessStatusCode) break; + } + catch (HttpRequestException) { } + await Task.Delay(50); + } + } + + public async Task DisposeAsync() + { + _http.Dispose(); + await _cts.CancelAsync(); + _server.Dispose(); + } + + [Fact] + public async Task Subz_returns_empty_when_no_subscriptions() + { + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/subz"); + response.StatusCode.ShouldBe(HttpStatusCode.OK); + + var subz = await response.Content.ReadFromJsonAsync(); + subz.ShouldNotBeNull(); + subz.NumSubs.ShouldBe(0u); + } + + [Fact] + public async Task Subz_returns_count_with_subscriptions() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort)); + using var stream = new NetworkStream(sock); + var buf = new byte[4096]; + _ = await stream.ReadAsync(buf); + await stream.WriteAsync("CONNECT {}\r\nSUB foo 1\r\nSUB bar 2\r\nSUB baz.* 3\r\n"u8.ToArray()); + await Task.Delay(200); + + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/subz"); + var subz = await response.Content.ReadFromJsonAsync(); + subz.ShouldNotBeNull(); + subz.NumSubs.ShouldBeGreaterThanOrEqualTo(3u); + } + + [Fact] + public async Task Subz_subs_true_returns_subscription_details() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort)); + using var stream = new NetworkStream(sock); + var buf = new byte[4096]; + _ = await stream.ReadAsync(buf); + await stream.WriteAsync("CONNECT {}\r\nSUB foo 1\r\n"u8.ToArray()); + await Task.Delay(200); + + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/subz?subs=true"); + var subz = await response.Content.ReadFromJsonAsync(); + subz.ShouldNotBeNull(); + subz.Subs.ShouldNotBeEmpty(); + subz.Subs.ShouldContain(s => s.Subject == "foo"); + } + + [Fact] + public async Task Subz_test_subject_filters_matching_subs() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort)); + using var stream = new NetworkStream(sock); + var buf = new byte[4096]; + _ = await stream.ReadAsync(buf); + await stream.WriteAsync("CONNECT {}\r\nSUB foo.* 1\r\nSUB bar 2\r\n"u8.ToArray()); + await Task.Delay(200); + + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/subz?subs=true&test=foo.hello"); + var subz = await response.Content.ReadFromJsonAsync(); + subz.ShouldNotBeNull(); + subz.Subs.ShouldContain(s => s.Subject == "foo.*"); + subz.Subs.ShouldNotContain(s => s.Subject == "bar"); + } + + [Fact] + public async Task Subz_pagination() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort)); + using var stream = new NetworkStream(sock); + var buf = new byte[4096]; + _ = await stream.ReadAsync(buf); + await stream.WriteAsync("CONNECT {}\r\nSUB a 1\r\nSUB b 2\r\nSUB c 3\r\n"u8.ToArray()); + await Task.Delay(200); + + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/subz?subs=true&offset=0&limit=2"); + var subz = await response.Content.ReadFromJsonAsync(); + subz.ShouldNotBeNull(); + subz.Subs.Length.ShouldBe(2); + subz.Total.ShouldBeGreaterThanOrEqualTo(3); + } + + private static int GetFreePort() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + sock.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + return ((IPEndPoint)sock.LocalEndPoint!).Port; + } +} diff --git a/tests/NATS.Server.Tests/TlsMapAuthenticatorTests.cs b/tests/NATS.Server.Tests/TlsMapAuthenticatorTests.cs new file mode 100644 index 0000000..e3e27ed --- /dev/null +++ b/tests/NATS.Server.Tests/TlsMapAuthenticatorTests.cs @@ -0,0 +1,134 @@ +using System.Security.Cryptography; +using System.Security.Cryptography.X509Certificates; +using NATS.Server.Auth; + +namespace NATS.Server.Tests; + +public class TlsMapAuthenticatorTests +{ + private static X509Certificate2 CreateSelfSignedCert(string cn) + { + using var rsa = RSA.Create(2048); + var req = new CertificateRequest($"CN={cn}", rsa, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1); + return req.CreateSelfSigned(DateTimeOffset.UtcNow, DateTimeOffset.UtcNow.AddYears(1)); + } + + private static X509Certificate2 CreateCertWithDn(string dn) + { + using var rsa = RSA.Create(2048); + var req = new CertificateRequest(dn, rsa, HashAlgorithmName.SHA256, RSASignaturePadding.Pkcs1); + return req.CreateSelfSigned(DateTimeOffset.UtcNow, DateTimeOffset.UtcNow.AddYears(1)); + } + + [Fact] + public void Matches_user_by_cn() + { + var users = new List + { + new() { Username = "alice", Password = "" }, + }; + var auth = new TlsMapAuthenticator(users); + var cert = CreateSelfSignedCert("alice"); + + var ctx = new ClientAuthContext + { + Opts = new Protocol.ClientOptions(), + Nonce = [], + ClientCertificate = cert, + }; + + var result = auth.Authenticate(ctx); + result.ShouldNotBeNull(); + result.Identity.ShouldBe("alice"); + } + + [Fact] + public void Returns_null_when_no_cert() + { + var users = new List + { + new() { Username = "alice", Password = "" }, + }; + var auth = new TlsMapAuthenticator(users); + + var ctx = new ClientAuthContext + { + Opts = new Protocol.ClientOptions(), + Nonce = [], + ClientCertificate = null, + }; + + var result = auth.Authenticate(ctx); + result.ShouldBeNull(); + } + + [Fact] + public void Returns_null_when_cn_doesnt_match() + { + var users = new List + { + new() { Username = "alice", Password = "" }, + }; + var auth = new TlsMapAuthenticator(users); + var cert = CreateSelfSignedCert("bob"); + + var ctx = new ClientAuthContext + { + Opts = new Protocol.ClientOptions(), + Nonce = [], + ClientCertificate = cert, + }; + + var result = auth.Authenticate(ctx); + result.ShouldBeNull(); + } + + [Fact] + public void Matches_by_full_dn_string() + { + var users = new List + { + new() { Username = "CN=alice, O=TestOrg", Password = "" }, + }; + var auth = new TlsMapAuthenticator(users); + var cert = CreateCertWithDn("CN=alice, O=TestOrg"); + + var ctx = new ClientAuthContext + { + Opts = new Protocol.ClientOptions(), + Nonce = [], + ClientCertificate = cert, + }; + + var result = auth.Authenticate(ctx); + result.ShouldNotBeNull(); + result.Identity.ShouldBe("CN=alice, O=TestOrg"); + } + + [Fact] + public void Returns_permissions_from_matched_user() + { + var perms = new Permissions + { + Publish = new SubjectPermission { Allow = ["foo.>"] }, + }; + var users = new List + { + new() { Username = "alice", Password = "", Permissions = perms }, + }; + var auth = new TlsMapAuthenticator(users); + var cert = CreateSelfSignedCert("alice"); + + var ctx = new ClientAuthContext + { + Opts = new Protocol.ClientOptions(), + Nonce = [], + ClientCertificate = cert, + }; + + var result = auth.Authenticate(ctx); + result.ShouldNotBeNull(); + result.Permissions.ShouldNotBeNull(); + result.Permissions.Publish!.Allow!.ShouldContain("foo.>"); + } +} diff --git a/tests/NATS.Server.Tests/TlsRateLimiterTests.cs b/tests/NATS.Server.Tests/TlsRateLimiterTests.cs new file mode 100644 index 0000000..2e7e210 --- /dev/null +++ b/tests/NATS.Server.Tests/TlsRateLimiterTests.cs @@ -0,0 +1,49 @@ +using NATS.Server.Tls; + +namespace NATS.Server.Tests; + +public class TlsRateLimiterTests +{ + [Fact] + public async Task Rate_limiter_allows_configured_tokens_per_second() + { + using var limiter = new TlsRateLimiter(5); + + // Should allow 5 tokens immediately + for (int i = 0; i < 5; i++) + { + using var cts = new CancellationTokenSource(100); + await limiter.WaitAsync(cts.Token); // Should not throw + } + + // 6th token should block (no refill yet) + using var blockCts = new CancellationTokenSource(200); + var blocked = false; + try + { + await limiter.WaitAsync(blockCts.Token); + } + catch (OperationCanceledException) + { + blocked = true; + } + blocked.ShouldBeTrue("6th token should be blocked before refill"); + } + + [Fact] + public async Task Rate_limiter_refills_after_one_second() + { + using var limiter = new TlsRateLimiter(2); + + // Consume all tokens + await limiter.WaitAsync(CancellationToken.None); + await limiter.WaitAsync(CancellationToken.None); + + // Wait for refill + await Task.Delay(1200); + + // Should have tokens again + using var cts = new CancellationTokenSource(200); + await limiter.WaitAsync(cts.Token); // Should not throw + } +}