From 9eb108b1df6e720710bedc2041bdfca3b9ff2dd4 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 22 Feb 2026 22:36:28 -0500 Subject: [PATCH] feat: add /connz endpoint with pagination, sorting, and subscription details --- src/NATS.Server/Monitoring/ConnzHandler.cs | 148 ++++++++++++++++++++ src/NATS.Server/Monitoring/MonitorServer.cs | 9 +- tests/NATS.Server.Tests/MonitorTests.cs | 79 +++++++++++ 3 files changed, 233 insertions(+), 3 deletions(-) create mode 100644 src/NATS.Server/Monitoring/ConnzHandler.cs diff --git a/src/NATS.Server/Monitoring/ConnzHandler.cs b/src/NATS.Server/Monitoring/ConnzHandler.cs new file mode 100644 index 0000000..e5cd7b0 --- /dev/null +++ b/src/NATS.Server/Monitoring/ConnzHandler.cs @@ -0,0 +1,148 @@ +using Microsoft.AspNetCore.Http; + +namespace NATS.Server.Monitoring; + +/// +/// Handles /connz endpoint requests, returning detailed connection information. +/// Corresponds to Go server/monitor.go handleConnz function. +/// +public sealed class ConnzHandler(NatsServer server) +{ + public Connz HandleConnz(HttpContext ctx) + { + var opts = ParseQueryParams(ctx); + var now = DateTime.UtcNow; + var clients = server.GetClients().ToArray(); + + var connInfos = clients.Select(c => BuildConnInfo(c, now, opts)).ToList(); + + // Sort + connInfos = opts.Sort switch + { + SortOpt.ByCid => connInfos.OrderBy(c => c.Cid).ToList(), + SortOpt.ByStart => connInfos.OrderBy(c => c.Start).ToList(), + SortOpt.BySubs => connInfos.OrderByDescending(c => c.NumSubs).ToList(), + SortOpt.ByPending => connInfos.OrderByDescending(c => c.Pending).ToList(), + SortOpt.ByMsgsTo => connInfos.OrderByDescending(c => c.OutMsgs).ToList(), + SortOpt.ByMsgsFrom => connInfos.OrderByDescending(c => c.InMsgs).ToList(), + SortOpt.ByBytesTo => connInfos.OrderByDescending(c => c.OutBytes).ToList(), + SortOpt.ByBytesFrom => connInfos.OrderByDescending(c => c.InBytes).ToList(), + SortOpt.ByLast => connInfos.OrderByDescending(c => c.LastActivity).ToList(), + SortOpt.ByIdle => connInfos.OrderByDescending(c => now - c.LastActivity).ToList(), + SortOpt.ByUptime => connInfos.OrderByDescending(c => now - c.Start).ToList(), + _ => connInfos.OrderBy(c => c.Cid).ToList(), + }; + + var total = connInfos.Count; + var paged = connInfos.Skip(opts.Offset).Take(opts.Limit).ToArray(); + + return new Connz + { + Id = server.ServerId, + Now = now, + NumConns = paged.Length, + Total = total, + Offset = opts.Offset, + Limit = opts.Limit, + Conns = paged, + }; + } + + private static ConnInfo BuildConnInfo(NatsClient client, DateTime now, ConnzOptions opts) + { + var info = new ConnInfo + { + Cid = client.Id, + Kind = "Client", + Type = "Client", + Ip = client.RemoteIp ?? "", + Port = client.RemotePort, + Start = client.StartTime, + LastActivity = client.LastActivity, + Uptime = FormatDuration(now - client.StartTime), + Idle = FormatDuration(now - client.LastActivity), + InMsgs = Interlocked.Read(ref client.InMsgs), + OutMsgs = Interlocked.Read(ref client.OutMsgs), + InBytes = Interlocked.Read(ref client.InBytes), + OutBytes = Interlocked.Read(ref client.OutBytes), + NumSubs = (uint)client.Subscriptions.Count, + Name = client.ClientOpts?.Name ?? "", + Lang = client.ClientOpts?.Lang ?? "", + Version = client.ClientOpts?.Version ?? "", + TlsVersion = client.TlsState?.TlsVersion ?? "", + TlsCipherSuite = client.TlsState?.CipherSuite ?? "", + }; + + if (opts.Subscriptions) + { + info.Subs = client.Subscriptions.Values.Select(s => s.Subject).ToArray(); + } + + if (opts.SubscriptionsDetail) + { + info.SubsDetail = client.Subscriptions.Values.Select(s => new SubDetail + { + Subject = s.Subject, + Queue = s.Queue ?? "", + Sid = s.Sid, + Msgs = Interlocked.Read(ref s.MessageCount), + Max = s.MaxMessages, + Cid = client.Id, + }).ToArray(); + } + + return info; + } + + private static ConnzOptions ParseQueryParams(HttpContext ctx) + { + var q = ctx.Request.Query; + var opts = new ConnzOptions(); + + if (q.TryGetValue("sort", out var sort)) + { + opts.Sort = sort.ToString().ToLowerInvariant() switch + { + "cid" => SortOpt.ByCid, + "start" => SortOpt.ByStart, + "subs" => SortOpt.BySubs, + "pending" => SortOpt.ByPending, + "msgs_to" => SortOpt.ByMsgsTo, + "msgs_from" => SortOpt.ByMsgsFrom, + "bytes_to" => SortOpt.ByBytesTo, + "bytes_from" => SortOpt.ByBytesFrom, + "last" => SortOpt.ByLast, + "idle" => SortOpt.ByIdle, + "uptime" => SortOpt.ByUptime, + _ => SortOpt.ByCid, + }; + } + + if (q.TryGetValue("subs", out var subs)) + { + if (subs == "detail") + opts.SubscriptionsDetail = true; + else + opts.Subscriptions = true; + } + + if (q.TryGetValue("offset", out var offset) && int.TryParse(offset, out var o)) + opts.Offset = o; + + if (q.TryGetValue("limit", out var limit) && int.TryParse(limit, out var l)) + opts.Limit = l; + + return opts; + } + + private static string FormatDuration(TimeSpan ts) + { + if (ts.TotalDays >= 1) + return $"{(int)ts.TotalDays}d{ts.Hours}h{ts.Minutes}m{ts.Seconds}s"; + if (ts.TotalHours >= 1) + return $"{(int)ts.TotalHours}h{ts.Minutes}m{ts.Seconds}s"; + if (ts.TotalMinutes >= 1) + return $"{(int)ts.TotalMinutes}m{ts.Seconds}s"; + return $"{(int)ts.TotalSeconds}s"; + } +} diff --git a/src/NATS.Server/Monitoring/MonitorServer.cs b/src/NATS.Server/Monitoring/MonitorServer.cs index f33dbff..af23506 100644 --- a/src/NATS.Server/Monitoring/MonitorServer.cs +++ b/src/NATS.Server/Monitoring/MonitorServer.cs @@ -14,6 +14,7 @@ public sealed class MonitorServer : IAsyncDisposable private readonly WebApplication _app; private readonly ILogger _logger; private readonly VarzHandler _varzHandler; + private readonly ConnzHandler _connzHandler; public MonitorServer(NatsServer server, NatsOptions options, ServerStats stats, ILoggerFactory loggerFactory) { @@ -27,6 +28,7 @@ public sealed class MonitorServer : IAsyncDisposable var basePath = options.MonitorBasePath ?? ""; _varzHandler = new VarzHandler(server, options); + _connzHandler = new ConnzHandler(server); _app.MapGet(basePath + "/", () => { @@ -51,12 +53,13 @@ public sealed class MonitorServer : IAsyncDisposable return Results.Ok(await _varzHandler.HandleVarzAsync(ctx.RequestAborted)); }); - // Stubs for unimplemented endpoints - _app.MapGet(basePath + "/connz", () => + _app.MapGet(basePath + "/connz", (HttpContext ctx) => { stats.HttpReqStats.AddOrUpdate("/connz", 1, (_, v) => v + 1); - return Results.Ok(new { }); + return Results.Ok(_connzHandler.HandleConnz(ctx)); }); + + // Stubs for unimplemented endpoints _app.MapGet(basePath + "/routez", () => { stats.HttpReqStats.AddOrUpdate("/routez", 1, (_, v) => v + 1); diff --git a/tests/NATS.Server.Tests/MonitorTests.cs b/tests/NATS.Server.Tests/MonitorTests.cs index 0d4c52e..898557a 100644 --- a/tests/NATS.Server.Tests/MonitorTests.cs +++ b/tests/NATS.Server.Tests/MonitorTests.cs @@ -98,6 +98,85 @@ public class MonitorTests : IAsyncLifetime varz.InBytes.ShouldBeGreaterThanOrEqualTo(5L); } + [Fact] + public async Task Connz_returns_connections() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort)); + using var stream = new NetworkStream(sock); + var buf = new byte[4096]; + _ = await stream.ReadAsync(buf); + await stream.WriteAsync("CONNECT {\"name\":\"test-client\",\"lang\":\"csharp\",\"version\":\"1.0\"}\r\n"u8.ToArray()); + await Task.Delay(200); + + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/connz"); + response.StatusCode.ShouldBe(HttpStatusCode.OK); + + var connz = await response.Content.ReadFromJsonAsync(); + connz.ShouldNotBeNull(); + connz.NumConns.ShouldBeGreaterThanOrEqualTo(1); + connz.Conns.Length.ShouldBeGreaterThanOrEqualTo(1); + + var conn = connz.Conns.First(c => c.Name == "test-client"); + conn.Ip.ShouldNotBeNullOrEmpty(); + conn.Port.ShouldBeGreaterThan(0); + conn.Lang.ShouldBe("csharp"); + conn.Version.ShouldBe("1.0"); + conn.Uptime.ShouldNotBeNullOrEmpty(); + } + + [Fact] + public async Task Connz_pagination() + { + var sockets = new List(); + try + { + for (int i = 0; i < 3; i++) + { + var s = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await s.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort)); + var ns = new NetworkStream(s); + var buf = new byte[4096]; + _ = await ns.ReadAsync(buf); + await ns.WriteAsync("CONNECT {}\r\n"u8.ToArray()); + sockets.Add(s); + } + await Task.Delay(200); + + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/connz?limit=2&offset=0"); + var connz = await response.Content.ReadFromJsonAsync(); + + connz!.Conns.Length.ShouldBe(2); + connz.Total.ShouldBeGreaterThanOrEqualTo(3); + connz.Limit.ShouldBe(2); + connz.Offset.ShouldBe(0); + } + finally + { + foreach (var s in sockets) s.Dispose(); + } + } + + [Fact] + public async Task Connz_with_subscriptions() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort)); + using var stream = new NetworkStream(sock); + var buf = new byte[4096]; + _ = await stream.ReadAsync(buf); + await stream.WriteAsync("CONNECT {}\r\nSUB foo 1\r\nSUB bar 2\r\n"u8.ToArray()); + await Task.Delay(200); + + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/connz?subs=true"); + var connz = await response.Content.ReadFromJsonAsync(); + + var conn = connz!.Conns.First(c => c.NumSubs >= 2); + conn.Subs.ShouldNotBeNull(); + conn.Subs.ShouldContain("foo"); + conn.Subs.ShouldContain("bar"); + } + private static int GetFreePort() { using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);