using System.Net;
using System.Net.Http.Json;
using System.Net.Security;
using System.Net.Sockets;
using System.Text;
using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Monitoring;

namespace NATS.Server.Tests;

/// <summary>
/// Integration tests for the HTTP monitoring endpoints (<c>/healthz</c>, <c>/varz</c>, <c>/connz</c>)
/// of a <see cref="NatsServer"/> started without TLS.
/// </summary>
/// <remarks>
/// Each test instance starts its own server on two ports obtained from <see cref="GetFreePort"/>.
/// NOTE(review): the JSON payloads are deserialized as <c>Varz</c> and <c>Connz</c>; these type names are
/// assumed to be the monitoring DTOs in <c>NATS.Server.Monitoring</c> - confirm against the project.
/// </remarks>
public class MonitorTests : IAsyncLifetime
{
    private readonly NatsServer _server;
    private readonly int _natsPort;
    private readonly int _monitorPort;
    private readonly CancellationTokenSource _cts = new();
    private readonly HttpClient _http = new();

    /// <summary>
    /// Picks a client port and a monitoring port and constructs (but does not start) the server.
    /// </summary>
    public MonitorTests()
    {
        _natsPort = GetFreePort();
        _monitorPort = GetFreePort();
        _server = new NatsServer(
            new NatsOptions { Port = _natsPort, MonitorPort = _monitorPort },
            NullLoggerFactory.Instance);
    }

    /// <summary>
    /// Starts the server in the background, waits for it to report ready, then polls
    /// <c>/healthz</c> up to 50 times (50 ms apart) until it answers with a success status.
    /// </summary>
    /// <remarks>
    /// The poll is best-effort: if the endpoint never answers, initialization still completes
    /// and the individual tests fail on their own requests.
    /// </remarks>
    public async Task InitializeAsync()
    {
        // Fire-and-forget: the server loop runs until _cts is cancelled in DisposeAsync.
        _ = _server.StartAsync(_cts.Token);
        await _server.WaitForReadyAsync();

        // Wait for monitoring HTTP server to be ready
        for (int i = 0; i < 50; i++)
        {
            try
            {
                using var response = await _http.GetAsync(MonitorUrl("/healthz"));
                if (response.IsSuccessStatusCode)
                {
                    break;
                }
            }
            catch (HttpRequestException)
            {
                // Listener not accepting connections yet; retry after the delay below.
            }

            await Task.Delay(50);
        }
    }

    /// <summary>
    /// Disposes the HTTP client, cancels the server's token, and disposes the server and the token source.
    /// </summary>
    public async Task DisposeAsync()
    {
        _http.Dispose();
        await _cts.CancelAsync();
        _server.Dispose();
        _cts.Dispose();
    }

    /// <summary>Verifies that <c>/healthz</c> answers with HTTP 200.</summary>
    [Fact]
    public async Task Healthz_returns_ok()
    {
        using var response = await _http.GetAsync(MonitorUrl("/healthz"));

        response.StatusCode.ShouldBe(HttpStatusCode.OK);
    }

    /// <summary>
    /// Verifies that <c>/varz</c> answers with HTTP 200 and reports identity, version, listen address,
    /// payload limit, uptime, current time, memory and core count.
    /// </summary>
    [Fact]
    public async Task Varz_returns_server_identity()
    {
        using var response = await _http.GetAsync(MonitorUrl("/varz"));
        response.StatusCode.ShouldBe(HttpStatusCode.OK);

        var varz = await response.Content.ReadFromJsonAsync<Varz>();
        varz.ShouldNotBeNull();
        varz.Id.ShouldNotBeNullOrEmpty();
        varz.Name.ShouldNotBeNullOrEmpty();
        varz.Version.ShouldBe("0.1.0");
        varz.Host.ShouldBe("0.0.0.0");
        varz.Port.ShouldBe(_natsPort);
        varz.MaxPayload.ShouldBe(1024 * 1024);
        varz.Uptime.ShouldNotBeNullOrEmpty();
        varz.Now.ShouldBeGreaterThan(DateTime.MinValue);
        varz.Mem.ShouldBeGreaterThan(0);
        varz.Cores.ShouldBeGreaterThan(0);
    }

    /// <summary>
    /// Verifies that <c>/varz</c> counters reflect a client that connected, subscribed and
    /// published one 5-byte message.
    /// </summary>
    [Fact]
    public async Task Varz_tracks_connections_and_messages()
    {
        // Connect a client and send a message
        using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort));

        var buf = new byte[4096];
        _ = await sock.ReceiveAsync(buf, SocketFlags.None); // Read INFO

        var cmd = "CONNECT {}\r\nSUB test 1\r\nPUB test 5\r\nhello\r\n"u8.ToArray();
        await sock.SendAsync(cmd, SocketFlags.None);

        // Fixed settle time before sampling the counters.
        await Task.Delay(200);

        using var response = await _http.GetAsync(MonitorUrl("/varz"));
        var varz = await response.Content.ReadFromJsonAsync<Varz>();
        varz.ShouldNotBeNull();
        varz.Connections.ShouldBeGreaterThanOrEqualTo(1);
        varz.TotalConnections.ShouldBeGreaterThanOrEqualTo(1UL);
        varz.InMsgs.ShouldBeGreaterThanOrEqualTo(1L);
        varz.InBytes.ShouldBeGreaterThanOrEqualTo(5L);
    }

    /// <summary>
    /// Verifies that <c>/connz</c> lists a connected client together with the name, language and
    /// version it sent in its CONNECT options.
    /// </summary>
    [Fact]
    public async Task Connz_returns_connections()
    {
        using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort));
        using var stream = new NetworkStream(sock);

        var buf = new byte[4096];
        _ = await stream.ReadAsync(buf); // Read INFO
        await stream.WriteAsync("CONNECT {\"name\":\"test-client\",\"lang\":\"csharp\",\"version\":\"1.0\"}\r\n"u8.ToArray());
        await Task.Delay(200);

        using var response = await _http.GetAsync(MonitorUrl("/connz"));
        response.StatusCode.ShouldBe(HttpStatusCode.OK);

        var connz = await response.Content.ReadFromJsonAsync<Connz>();
        connz.ShouldNotBeNull();
        connz.NumConns.ShouldBeGreaterThanOrEqualTo(1);
        connz.Conns.Length.ShouldBeGreaterThanOrEqualTo(1);

        var conn = connz.Conns.First(c => c.Name == "test-client");
        conn.Ip.ShouldNotBeNullOrEmpty();
        conn.Port.ShouldBeGreaterThan(0);
        conn.Lang.ShouldBe("csharp");
        conn.Version.ShouldBe("1.0");
        conn.Uptime.ShouldNotBeNullOrEmpty();
    }

    /// <summary>
    /// Verifies that <c>/connz?limit=2&amp;offset=0</c> returns exactly two of three open connections
    /// and echoes the total, limit and offset.
    /// </summary>
    [Fact]
    public async Task Connz_pagination()
    {
        var sockets = new List<Socket>();
        try
        {
            for (int i = 0; i < 3; i++)
            {
                var s = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

                // Track the socket before any awaited call so the finally block disposes it
                // even when connecting or the handshake throws.
                sockets.Add(s);
                await s.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort));

                // The stream does not own the socket, so disposing it leaves the connection open.
                using var ns = new NetworkStream(s);
                var buf = new byte[4096];
                _ = await ns.ReadAsync(buf); // Read INFO
                await ns.WriteAsync("CONNECT {}\r\n"u8.ToArray());
            }

            await Task.Delay(200);

            using var response = await _http.GetAsync(MonitorUrl("/connz?limit=2&offset=0"));
            var connz = await response.Content.ReadFromJsonAsync<Connz>();
            connz!.Conns.Length.ShouldBe(2);
            connz.Total.ShouldBeGreaterThanOrEqualTo(3);
            connz.Limit.ShouldBe(2);
            connz.Offset.ShouldBe(0);
        }
        finally
        {
            foreach (var s in sockets)
            {
                s.Dispose();
            }
        }
    }

    /// <summary>
    /// Verifies that <c>/connz?subs=true</c> includes the subjects a client subscribed to.
    /// </summary>
    [Fact]
    public async Task Connz_with_subscriptions()
    {
        using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort));
        using var stream = new NetworkStream(sock);

        var buf = new byte[4096];
        _ = await stream.ReadAsync(buf); // Read INFO
        await stream.WriteAsync("CONNECT {}\r\nSUB foo 1\r\nSUB bar 2\r\n"u8.ToArray());
        await Task.Delay(200);

        using var response = await _http.GetAsync(MonitorUrl("/connz?subs=true"));
        var connz = await response.Content.ReadFromJsonAsync<Connz>();

        var conn = connz!.Conns.First(c => c.NumSubs >= 2);
        conn.Subs.ShouldNotBeNull();
        conn.Subs.ShouldContain("foo");
        conn.Subs.ShouldContain("bar");
    }

    /// <summary>
    /// Verifies that <c>/connz?state=closed</c> lists a client that connected and then disconnected,
    /// with a stop time and a close reason.
    /// </summary>
    [Fact]
    public async Task Connz_state_closed_returns_disconnected_clients()
    {
        // Connect then disconnect a client
        using (var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
        {
            await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort));
            using var stream = new NetworkStream(sock);

            var buf = new byte[4096];
            _ = await stream.ReadAsync(buf); // Read INFO
            await stream.WriteAsync("CONNECT {\"name\":\"closing-client\"}\r\n"u8.ToArray());
            await Task.Delay(200);

            sock.Shutdown(SocketShutdown.Both);
        }

        // Fixed settle time for the server to register the disconnect.
        await Task.Delay(500);

        using var response = await _http.GetAsync(MonitorUrl("/connz?state=closed"));
        var connz = await response.Content.ReadFromJsonAsync<Connz>();
        connz.ShouldNotBeNull();
        connz.Conns.ShouldContain(c => c.Name == "closing-client");

        var closed = connz.Conns.First(c => c.Name == "closing-client");
        closed.Stop.ShouldNotBeNull();
        closed.Reason.ShouldNotBeNullOrEmpty();
    }

    /// <summary>
    /// Requests <c>/connz?sort=stop&amp;state=open</c> and asserts only that the body deserializes to a
    /// non-null value.
    /// </summary>
    [Fact]
    public async Task Connz_sort_by_stop_requires_closed_state()
    {
        using var response = await _http.GetAsync(MonitorUrl("/connz?sort=stop&state=open"));
        var connz = await response.Content.ReadFromJsonAsync<Connz>();

        connz.ShouldNotBeNull();
    }

    /// <summary>
    /// Verifies that <c>/connz?sort=reason&amp;state=closed</c> answers with HTTP 200 after a client
    /// has connected and disconnected.
    /// </summary>
    [Fact]
    public async Task Connz_sort_by_reason()
    {
        using (var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
        {
            await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort));

            var buf = new byte[4096];
            _ = await sock.ReceiveAsync(buf); // Read INFO

            sock.Shutdown(SocketShutdown.Both);
        }

        await Task.Delay(500);

        using var response = await _http.GetAsync(MonitorUrl("/connz?sort=reason&state=closed"));
        response.StatusCode.ShouldBe(HttpStatusCode.OK);
    }

    /// <summary>Builds an absolute monitoring URL on 127.0.0.1 for the given path and query.</summary>
    /// <param name="pathAndQuery">Path beginning with <c>/</c>, optionally followed by a query string.</param>
    /// <returns>The URL targeting this instance's monitoring port.</returns>
    private string MonitorUrl(string pathAndQuery) => $"http://127.0.0.1:{_monitorPort}{pathAndQuery}";

    /// <summary>
    /// Binds a temporary socket to loopback port 0 and returns the port that was assigned.
    /// </summary>
    /// <remarks>
    /// The socket is closed before returning, so the port is only likely - not guaranteed - to
    /// still be free when the server binds it.
    /// </remarks>
    /// <returns>The port number assigned to the temporary socket.</returns>
    private static int GetFreePort()
    {
        using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        sock.Bind(new IPEndPoint(IPAddress.Loopback, 0));
        return ((IPEndPoint)sock.LocalEndPoint!).Port;
    }
}

/// <summary>
/// Integration tests for <c>/connz</c> when the <see cref="NatsServer"/> is configured with a TLS
/// certificate and key.
/// </summary>
/// <remarks>
/// The certificate and key paths come from <c>TlsHelperTests.GenerateTestCertFiles()</c> and both
/// files are deleted in <see cref="DisposeAsync"/>.
/// NOTE(review): the JSON payload is deserialized as <c>Connz</c>; this type name is assumed to be
/// the monitoring DTO in <c>NATS.Server.Monitoring</c> - confirm against the project.
/// </remarks>
public class MonitorTlsTests : IAsyncLifetime
{
    private readonly NatsServer _server;
    private readonly int _natsPort;
    private readonly int _monitorPort;
    private readonly CancellationTokenSource _cts = new();
    private readonly HttpClient _http = new();
    private readonly string _certPath;
    private readonly string _keyPath;

    /// <summary>
    /// Picks a client port and a monitoring port, obtains test certificate files, and constructs
    /// (but does not start) the server with those TLS options.
    /// </summary>
    public MonitorTlsTests()
    {
        _natsPort = GetFreePort();
        _monitorPort = GetFreePort();
        (_certPath, _keyPath) = TlsHelperTests.GenerateTestCertFiles();
        _server = new NatsServer(
            new NatsOptions
            {
                Port = _natsPort,
                MonitorPort = _monitorPort,
                TlsCert = _certPath,
                TlsKey = _keyPath,
            },
            NullLoggerFactory.Instance);
    }

    /// <summary>
    /// Starts the server in the background, waits for it to report ready, then polls
    /// <c>/healthz</c> over plain HTTP up to 50 times (50 ms apart) until it answers with a success status.
    /// </summary>
    /// <remarks>
    /// The poll is best-effort: if the endpoint never answers, initialization still completes
    /// and the test fails on its own request.
    /// </remarks>
    public async Task InitializeAsync()
    {
        // Fire-and-forget: the server loop runs until _cts is cancelled in DisposeAsync.
        _ = _server.StartAsync(_cts.Token);
        await _server.WaitForReadyAsync();

        // Wait for monitoring HTTP server to be ready
        for (int i = 0; i < 50; i++)
        {
            try
            {
                using var response = await _http.GetAsync(MonitorUrl("/healthz"));
                if (response.IsSuccessStatusCode)
                {
                    break;
                }
            }
            catch (HttpRequestException)
            {
                // Listener not accepting connections yet; retry after the delay below.
            }

            await Task.Delay(50);
        }
    }

    /// <summary>
    /// Disposes the HTTP client, cancels the server's token, disposes the server and the token
    /// source, and deletes the certificate and key files.
    /// </summary>
    public async Task DisposeAsync()
    {
        _http.Dispose();
        await _cts.CancelAsync();
        _server.Dispose();
        _cts.Dispose();
        File.Delete(_certPath);
        File.Delete(_keyPath);
    }

    /// <summary>
    /// Verifies that <c>/connz</c> reports a TLS version and cipher suite for a client that
    /// completed a TLS handshake.
    /// </summary>
    [Fact]
    public async Task Connz_shows_tls_info_for_tls_client()
    {
        // Connect and upgrade to TLS
        using var tcp = new TcpClient();
        await tcp.ConnectAsync(IPAddress.Loopback, _natsPort);
        using var netStream = tcp.GetStream();

        var buf = new byte[4096];
        _ = await netStream.ReadAsync(buf); // Read INFO

        // The validation callback accepts any server certificate.
        using var ssl = new SslStream(netStream, false, (_, _, _, _) => true);
        await ssl.AuthenticateAsClientAsync("localhost");
        await ssl.WriteAsync("CONNECT {}\r\n"u8.ToArray());
        await ssl.FlushAsync();
        await Task.Delay(200);

        using var response = await _http.GetAsync(MonitorUrl("/connz"));
        var connz = await response.Content.ReadFromJsonAsync<Connz>();
        connz!.Conns.Length.ShouldBeGreaterThanOrEqualTo(1);

        var conn = connz.Conns[0];
        conn.TlsVersion.ShouldNotBeNullOrEmpty();
        conn.TlsCipherSuite.ShouldNotBeNullOrEmpty();
    }

    /// <summary>Builds an absolute monitoring URL on 127.0.0.1 for the given path and query.</summary>
    /// <param name="pathAndQuery">Path beginning with <c>/</c>, optionally followed by a query string.</param>
    /// <returns>The URL targeting this instance's monitoring port.</returns>
    private string MonitorUrl(string pathAndQuery) => $"http://127.0.0.1:{_monitorPort}{pathAndQuery}";

    /// <summary>
    /// Binds a temporary socket to loopback port 0 and returns the port that was assigned.
    /// </summary>
    /// <remarks>
    /// The socket is closed before returning, so the port is only likely - not guaranteed - to
    /// still be free when the server binds it.
    /// </remarks>
    /// <returns>The port number assigned to the temporary socket.</returns>
    private static int GetFreePort()
    {
        using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        sock.Bind(new IPEndPoint(IPAddress.Loopback, 0));
        return ((IPEndPoint)sock.LocalEndPoint!).Port;
    }
}