using System.Net; using System.Net.Http.Json; using System.Net.Sockets; using Microsoft.Extensions.Logging.Abstractions; using NATS.Server.Monitoring; namespace NATS.Server.Tests; public class MonitorTests : IAsyncLifetime { private readonly NatsServer _server; private readonly int _natsPort; private readonly int _monitorPort; private readonly CancellationTokenSource _cts = new(); private readonly HttpClient _http = new(); public MonitorTests() { _natsPort = GetFreePort(); _monitorPort = GetFreePort(); _server = new NatsServer( new NatsOptions { Port = _natsPort, MonitorPort = _monitorPort }, NullLoggerFactory.Instance); } public async Task InitializeAsync() { _ = _server.StartAsync(_cts.Token); await _server.WaitForReadyAsync(); // Give monitoring server time to start await Task.Delay(200); } public async Task DisposeAsync() { _http.Dispose(); await _cts.CancelAsync(); _server.Dispose(); } [Fact] public async Task Healthz_returns_ok() { var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/healthz"); response.StatusCode.ShouldBe(HttpStatusCode.OK); } [Fact] public async Task Varz_returns_server_identity() { var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/varz"); response.StatusCode.ShouldBe(HttpStatusCode.OK); var varz = await response.Content.ReadFromJsonAsync(); varz.ShouldNotBeNull(); varz.Id.ShouldNotBeNullOrEmpty(); varz.Name.ShouldNotBeNullOrEmpty(); varz.Version.ShouldBe("0.1.0"); varz.Host.ShouldBe("0.0.0.0"); varz.Port.ShouldBe(_natsPort); varz.MaxPayload.ShouldBe(1024 * 1024); varz.Uptime.ShouldNotBeNullOrEmpty(); varz.Now.ShouldBeGreaterThan(DateTime.MinValue); varz.Mem.ShouldBeGreaterThan(0); varz.Cores.ShouldBeGreaterThan(0); } [Fact] public async Task Varz_tracks_connections_and_messages() { // Connect a client and send a message using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort)); var buf = new byte[4096]; _ = await sock.ReceiveAsync(buf, SocketFlags.None); // Read INFO var cmd = "CONNECT {}\r\nSUB test 1\r\nPUB test 5\r\nhello\r\n"u8.ToArray(); await sock.SendAsync(cmd, SocketFlags.None); await Task.Delay(200); var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/varz"); var varz = await response.Content.ReadFromJsonAsync(); varz.ShouldNotBeNull(); varz.Connections.ShouldBeGreaterThanOrEqualTo(1); varz.TotalConnections.ShouldBeGreaterThanOrEqualTo(1UL); varz.InMsgs.ShouldBeGreaterThanOrEqualTo(1L); varz.InBytes.ShouldBeGreaterThanOrEqualTo(5L); } private static int GetFreePort() { using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); sock.Bind(new IPEndPoint(IPAddress.Loopback, 0)); return ((IPEndPoint)sock.LocalEndPoint!).Port; } }