diff --git a/src/NATS.Server/Monitoring/ClosedClient.cs b/src/NATS.Server/Monitoring/ClosedClient.cs new file mode 100644 index 0000000..0710d19 --- /dev/null +++ b/src/NATS.Server/Monitoring/ClosedClient.cs @@ -0,0 +1,25 @@ +namespace NATS.Server.Monitoring; + +/// +/// Snapshot of a closed client connection for /connz reporting. +/// +public sealed record ClosedClient +{ + public required ulong Cid { get; init; } + public string Ip { get; init; } = ""; + public int Port { get; init; } + public DateTime Start { get; init; } + public DateTime Stop { get; init; } + public string Reason { get; init; } = ""; + public string Name { get; init; } = ""; + public string Lang { get; init; } = ""; + public string Version { get; init; } = ""; + public long InMsgs { get; init; } + public long OutMsgs { get; init; } + public long InBytes { get; init; } + public long OutBytes { get; init; } + public uint NumSubs { get; init; } + public TimeSpan Rtt { get; init; } + public string TlsVersion { get; init; } = ""; + public string TlsCipherSuite { get; init; } = ""; +} diff --git a/src/NATS.Server/Monitoring/Connz.cs b/src/NATS.Server/Monitoring/Connz.cs index 2147715..aae62ed 100644 --- a/src/NATS.Server/Monitoring/Connz.cs +++ b/src/NATS.Server/Monitoring/Connz.cs @@ -169,6 +169,8 @@ public enum SortOpt ByIdle, ByUptime, ByRtt, + ByStop, + ByReason, } /// diff --git a/src/NATS.Server/Monitoring/ConnzHandler.cs b/src/NATS.Server/Monitoring/ConnzHandler.cs index f532e7c..8ecf512 100644 --- a/src/NATS.Server/Monitoring/ConnzHandler.cs +++ b/src/NATS.Server/Monitoring/ConnzHandler.cs @@ -12,9 +12,25 @@ public sealed class ConnzHandler(NatsServer server) { var opts = ParseQueryParams(ctx); var now = DateTime.UtcNow; - var clients = server.GetClients().ToArray(); - var connInfos = clients.Select(c => BuildConnInfo(c, now, opts)).ToList(); + var connInfos = new List(); + + // Collect open connections + if (opts.State is ConnState.Open or ConnState.All) + { + var clients = server.GetClients().ToArray(); + connInfos.AddRange(clients.Select(c => BuildConnInfo(c, now, opts))); + } + + // Collect closed connections + if (opts.State is ConnState.Closed or ConnState.All) + { + connInfos.AddRange(server.GetClosedClients().Select(c => BuildClosedConnInfo(c, now, opts))); + } + + // Validate sort options that require closed state + if (opts.Sort is SortOpt.ByStop or SortOpt.ByReason && opts.State == ConnState.Open) + opts.Sort = SortOpt.ByCid; // Fallback // Sort connInfos = opts.Sort switch @@ -30,6 +46,8 @@ public sealed class ConnzHandler(NatsServer server) SortOpt.ByLast => connInfos.OrderByDescending(c => c.LastActivity).ToList(), SortOpt.ByIdle => connInfos.OrderByDescending(c => now - c.LastActivity).ToList(), SortOpt.ByUptime => connInfos.OrderByDescending(c => now - c.Start).ToList(), + SortOpt.ByStop => connInfos.OrderByDescending(c => c.Stop ?? DateTime.MinValue).ToList(), + SortOpt.ByReason => connInfos.OrderBy(c => c.Reason).ToList(), SortOpt.ByRtt => connInfos.OrderBy(c => c.Rtt).ToList(), _ => connInfos.OrderBy(c => c.Cid).ToList(), }; @@ -74,7 +92,7 @@ public sealed class ConnzHandler(NatsServer server) Reason = client.CloseReason.ToReasonString(), TlsVersion = client.TlsState?.TlsVersion ?? "", TlsCipherSuite = client.TlsState?.CipherSuite ?? "", - Rtt = client.Rtt == TimeSpan.Zero ? "" : $"{client.Rtt.TotalMilliseconds:F3}ms", + Rtt = FormatRtt(client.Rtt), }; if (opts.Subscriptions) @@ -98,6 +116,35 @@ public sealed class ConnzHandler(NatsServer server) return info; } + private static ConnInfo BuildClosedConnInfo(ClosedClient closed, DateTime now, ConnzOptions opts) + { + return new ConnInfo + { + Cid = closed.Cid, + Kind = "Client", + Type = "Client", + Ip = closed.Ip, + Port = closed.Port, + Start = closed.Start, + Stop = closed.Stop, + LastActivity = closed.Stop, + Uptime = FormatDuration(closed.Stop - closed.Start), + Idle = FormatDuration(now - closed.Stop), + InMsgs = closed.InMsgs, + OutMsgs = closed.OutMsgs, + InBytes = closed.InBytes, + OutBytes = closed.OutBytes, + NumSubs = closed.NumSubs, + Name = closed.Name, + Lang = closed.Lang, + Version = closed.Version, + Reason = closed.Reason, + Rtt = FormatRtt(closed.Rtt), + TlsVersion = closed.TlsVersion, + TlsCipherSuite = closed.TlsCipherSuite, + }; + } + private static ConnzOptions ParseQueryParams(HttpContext ctx) { var q = ctx.Request.Query; @@ -119,6 +166,8 @@ public sealed class ConnzHandler(NatsServer server) "idle" => SortOpt.ByIdle, "uptime" => SortOpt.ByUptime, "rtt" => SortOpt.ByRtt, + "stop" => SortOpt.ByStop, + "reason" => SortOpt.ByReason, _ => SortOpt.ByCid, }; } @@ -131,6 +180,17 @@ public sealed class ConnzHandler(NatsServer server) opts.Subscriptions = true; } + if (q.TryGetValue("state", out var state)) + { + opts.State = state.ToString().ToLowerInvariant() switch + { + "open" => ConnState.Open, + "closed" => ConnState.Closed, + "all" => ConnState.All, + _ => ConnState.Open, + }; + } + if (q.TryGetValue("offset", out var offset) && int.TryParse(offset, out var o)) opts.Offset = o; @@ -140,6 +200,16 @@ public sealed class ConnzHandler(NatsServer server) return opts; } + private static string FormatRtt(TimeSpan rtt) + { + if (rtt == TimeSpan.Zero) return ""; + if (rtt.TotalMilliseconds < 1) + return $"{rtt.TotalMicroseconds:F3}\u00b5s"; + if (rtt.TotalSeconds < 1) + return $"{rtt.TotalMilliseconds:F3}ms"; + return $"{rtt.TotalSeconds:F3}s"; + } + private static string FormatDuration(TimeSpan ts) { if (ts.TotalDays >= 1) diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 2eff8de..64b1cf5 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -19,6 +19,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable { private readonly NatsOptions _options; private readonly ConcurrentDictionary _clients = new(); + private readonly ConcurrentQueue _closedClients = new(); + private const int MaxClosedClients = 10_000; private readonly ServerInfo _serverInfo; private readonly ILogger _logger; private readonly ILoggerFactory _loggerFactory; @@ -64,6 +66,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable public Action? ReOpenLogFile { get; set; } public IEnumerable GetClients() => _clients.Values; + public IEnumerable GetClosedClients() => _closedClients; + public IEnumerable GetAccounts() => _accounts.Values; public Task WaitForReadyAsync() => _listeningStarted.Task; @@ -580,6 +584,33 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable { _clients.TryRemove(client.Id, out _); _logger.LogDebug("Removed client {ClientId}", client.Id); + + // Snapshot for closed-connections tracking + _closedClients.Enqueue(new ClosedClient + { + Cid = client.Id, + Ip = client.RemoteIp ?? "", + Port = client.RemotePort, + Start = client.StartTime, + Stop = DateTime.UtcNow, + Reason = client.CloseReason.ToReasonString(), + Name = client.ClientOpts?.Name ?? "", + Lang = client.ClientOpts?.Lang ?? "", + Version = client.ClientOpts?.Version ?? "", + InMsgs = Interlocked.Read(ref client.InMsgs), + OutMsgs = Interlocked.Read(ref client.OutMsgs), + InBytes = Interlocked.Read(ref client.InBytes), + OutBytes = Interlocked.Read(ref client.OutBytes), + NumSubs = (uint)client.Subscriptions.Count, + Rtt = client.Rtt, + TlsVersion = client.TlsState?.TlsVersion ?? "", + TlsCipherSuite = client.TlsState?.CipherSuite ?? "", + }); + + // Cap closed clients list + while (_closedClients.Count > MaxClosedClients) + _closedClients.TryDequeue(out _); + var subList = client.Account?.SubList ?? _globalAccount.SubList; client.RemoveAllSubscriptions(subList); client.Account?.RemoveClient(client.Id); diff --git a/tests/NATS.Server.Tests/MonitorTests.cs b/tests/NATS.Server.Tests/MonitorTests.cs index 65a1399..e89a0db 100644 --- a/tests/NATS.Server.Tests/MonitorTests.cs +++ b/tests/NATS.Server.Tests/MonitorTests.cs @@ -179,6 +179,53 @@ public class MonitorTests : IAsyncLifetime conn.Subs.ShouldContain("bar"); } + [Fact] + public async Task Connz_state_closed_returns_disconnected_clients() + { + // Connect then disconnect a client + var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort)); + using var stream = new NetworkStream(sock); + var buf = new byte[4096]; + _ = await stream.ReadAsync(buf); + await stream.WriteAsync("CONNECT {\"name\":\"closing-client\"}\r\n"u8.ToArray()); + await Task.Delay(200); + sock.Shutdown(SocketShutdown.Both); + sock.Dispose(); + await Task.Delay(500); + + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/connz?state=closed"); + var connz = await response.Content.ReadFromJsonAsync(); + connz.ShouldNotBeNull(); + connz.Conns.ShouldContain(c => c.Name == "closing-client"); + var closed = connz.Conns.First(c => c.Name == "closing-client"); + closed.Stop.ShouldNotBeNull(); + closed.Reason.ShouldNotBeNullOrEmpty(); + } + + [Fact] + public async Task Connz_sort_by_stop_requires_closed_state() + { + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/connz?sort=stop&state=open"); + var connz = await response.Content.ReadFromJsonAsync(); + connz.ShouldNotBeNull(); + } + + [Fact] + public async Task Connz_sort_by_reason() + { + var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort)); + var buf = new byte[4096]; + _ = await sock.ReceiveAsync(buf); + sock.Shutdown(SocketShutdown.Both); + sock.Dispose(); + await Task.Delay(500); + + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/connz?sort=reason&state=closed"); + response.StatusCode.ShouldBe(HttpStatusCode.OK); + } + private static int GetFreePort() { using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);