diff --git a/src/NATS.Server/Monitoring/ClosedClient.cs b/src/NATS.Server/Monitoring/ClosedClient.cs index 0710d19..277fba8 100644 --- a/src/NATS.Server/Monitoring/ClosedClient.cs +++ b/src/NATS.Server/Monitoring/ClosedClient.cs @@ -22,4 +22,5 @@ public sealed record ClosedClient public TimeSpan Rtt { get; init; } public string TlsVersion { get; init; } = ""; public string TlsCipherSuite { get; init; } = ""; + public string MqttClient { get; init; } = ""; } diff --git a/src/NATS.Server/Monitoring/Connz.cs b/src/NATS.Server/Monitoring/Connz.cs index aae62ed..cea93ca 100644 --- a/src/NATS.Server/Monitoring/Connz.cs +++ b/src/NATS.Server/Monitoring/Connz.cs @@ -204,6 +204,8 @@ public sealed class ConnzOptions public string FilterSubject { get; set; } = ""; + public string MqttClient { get; set; } = ""; + public int Offset { get; set; } public int Limit { get; set; } = 1024; diff --git a/src/NATS.Server/Monitoring/ConnzHandler.cs b/src/NATS.Server/Monitoring/ConnzHandler.cs index 8ecf512..96c96de 100644 --- a/src/NATS.Server/Monitoring/ConnzHandler.cs +++ b/src/NATS.Server/Monitoring/ConnzHandler.cs @@ -28,6 +28,10 @@ public sealed class ConnzHandler(NatsServer server) connInfos.AddRange(server.GetClosedClients().Select(c => BuildClosedConnInfo(c, now, opts))); } + // Filter by MQTT client ID + if (!string.IsNullOrEmpty(opts.MqttClient)) + connInfos = connInfos.Where(c => c.MqttClient == opts.MqttClient).ToList(); + // Validate sort options that require closed state if (opts.Sort is SortOpt.ByStop or SortOpt.ByReason && opts.State == ConnState.Open) opts.Sort = SortOpt.ByCid; // Fallback @@ -142,6 +146,7 @@ public sealed class ConnzHandler(NatsServer server) Rtt = FormatRtt(closed.Rtt), TlsVersion = closed.TlsVersion, TlsCipherSuite = closed.TlsCipherSuite, + MqttClient = closed.MqttClient, }; } @@ -197,6 +202,9 @@ public sealed class ConnzHandler(NatsServer server) if (q.TryGetValue("limit", out var limit) && int.TryParse(limit, out var l)) opts.Limit = l; + if (q.TryGetValue("mqtt_client", out var mqttClient)) + opts.MqttClient = mqttClient.ToString(); + return opts; } diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 9a1f717..f49a6c4 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -837,6 +837,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable Rtt = client.Rtt, TlsVersion = client.TlsState?.TlsVersion ?? "", TlsCipherSuite = client.TlsState?.CipherSuite ?? "", + MqttClient = "", // populated when MQTT transport is implemented }); // Cap closed clients list diff --git a/tests/NATS.Server.Tests/MonitorTests.cs b/tests/NATS.Server.Tests/MonitorTests.cs index e89a0db..d4dad6c 100644 --- a/tests/NATS.Server.Tests/MonitorTests.cs +++ b/tests/NATS.Server.Tests/MonitorTests.cs @@ -203,6 +203,51 @@ public class MonitorTests : IAsyncLifetime closed.Reason.ShouldNotBeNullOrEmpty(); } + [Fact] + public async Task Connz_filters_by_mqtt_client_for_open_connections() + { + // Connect a regular NATS client (no MQTT ID) + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort)); + using var stream = new NetworkStream(sock); + var buf = new byte[4096]; + _ = await stream.ReadAsync(buf); + await stream.WriteAsync("CONNECT {}\r\n"u8.ToArray()); + await Task.Delay(200); + + // Query for an MQTT client ID that no connection has + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/connz?mqtt_client=some-id"); + response.StatusCode.ShouldBe(HttpStatusCode.OK); + + var connz = await response.Content.ReadFromJsonAsync(); + connz.ShouldNotBeNull(); + connz.NumConns.ShouldBe(0); + } + + [Fact] + public async Task Connz_filters_by_mqtt_client_for_closed_connections() + { + // Connect then disconnect a client so it appears in closed list + var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort)); + using var stream = new NetworkStream(sock); + var buf = new byte[4096]; + _ = await stream.ReadAsync(buf); + await stream.WriteAsync("CONNECT {}\r\n"u8.ToArray()); + await Task.Delay(200); + sock.Shutdown(SocketShutdown.Both); + sock.Dispose(); + await Task.Delay(500); + + // Query closed connections with an MQTT client ID that no connection has + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/connz?state=closed&mqtt_client=missing-id"); + response.StatusCode.ShouldBe(HttpStatusCode.OK); + + var connz = await response.Content.ReadFromJsonAsync(); + connz.ShouldNotBeNull(); + connz.NumConns.ShouldBe(0); + } + [Fact] public async Task Connz_sort_by_stop_requires_closed_state() {