feat: add connz mqtt_client filtering for open and closed connections

This commit is contained in:
Joseph Doherty
2026-02-23 05:53:24 -05:00
parent 4a242f614f
commit 3f48d1c5ee
5 changed files with 57 additions and 0 deletions

View File

@@ -22,4 +22,5 @@ public sealed record ClosedClient
public TimeSpan Rtt { get; init; }
public string TlsVersion { get; init; } = "";
public string TlsCipherSuite { get; init; } = "";
public string MqttClient { get; init; } = "";
}

View File

@@ -204,6 +204,8 @@ public sealed class ConnzOptions
public string FilterSubject { get; set; } = "";
public string MqttClient { get; set; } = "";
public int Offset { get; set; }
public int Limit { get; set; } = 1024;

View File

@@ -28,6 +28,10 @@ public sealed class ConnzHandler(NatsServer server)
connInfos.AddRange(server.GetClosedClients().Select(c => BuildClosedConnInfo(c, now, opts)));
}
// Filter by MQTT client ID
if (!string.IsNullOrEmpty(opts.MqttClient))
connInfos = connInfos.Where(c => c.MqttClient == opts.MqttClient).ToList();
// Validate sort options that require closed state
if (opts.Sort is SortOpt.ByStop or SortOpt.ByReason && opts.State == ConnState.Open)
opts.Sort = SortOpt.ByCid; // Fallback
@@ -142,6 +146,7 @@ public sealed class ConnzHandler(NatsServer server)
Rtt = FormatRtt(closed.Rtt),
TlsVersion = closed.TlsVersion,
TlsCipherSuite = closed.TlsCipherSuite,
MqttClient = closed.MqttClient,
};
}
@@ -197,6 +202,9 @@ public sealed class ConnzHandler(NatsServer server)
if (q.TryGetValue("limit", out var limit) && int.TryParse(limit, out var l))
opts.Limit = l;
if (q.TryGetValue("mqtt_client", out var mqttClient))
opts.MqttClient = mqttClient.ToString();
return opts;
}

View File

@@ -837,6 +837,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
Rtt = client.Rtt,
TlsVersion = client.TlsState?.TlsVersion ?? "",
TlsCipherSuite = client.TlsState?.CipherSuite ?? "",
MqttClient = "", // populated when MQTT transport is implemented
});
// Cap closed clients list

View File

@@ -203,6 +203,51 @@ public class MonitorTests : IAsyncLifetime
closed.Reason.ShouldNotBeNullOrEmpty();
}
[Fact]
public async Task Connz_filters_by_mqtt_client_for_open_connections()
{
// Connect a regular NATS client (no MQTT ID)
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort));
using var stream = new NetworkStream(sock);
var buf = new byte[4096];
_ = await stream.ReadAsync(buf);
await stream.WriteAsync("CONNECT {}\r\n"u8.ToArray());
await Task.Delay(200);
// Query for an MQTT client ID that no connection has
var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/connz?mqtt_client=some-id");
response.StatusCode.ShouldBe(HttpStatusCode.OK);
var connz = await response.Content.ReadFromJsonAsync<Connz>();
connz.ShouldNotBeNull();
connz.NumConns.ShouldBe(0);
}
[Fact]
public async Task Connz_filters_by_mqtt_client_for_closed_connections()
{
// Connect then disconnect a client so it appears in closed list
var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort));
using var stream = new NetworkStream(sock);
var buf = new byte[4096];
_ = await stream.ReadAsync(buf);
await stream.WriteAsync("CONNECT {}\r\n"u8.ToArray());
await Task.Delay(200);
sock.Shutdown(SocketShutdown.Both);
sock.Dispose();
await Task.Delay(500);
// Query closed connections with an MQTT client ID that no connection has
var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/connz?state=closed&mqtt_client=missing-id");
response.StatusCode.ShouldBe(HttpStatusCode.OK);
var connz = await response.Content.ReadFromJsonAsync<Connz>();
connz.ShouldNotBeNull();
connz.NumConns.ShouldBe(0);
}
[Fact]
public async Task Connz_sort_by_stop_requires_closed_state()
{