diff --git a/src/NATS.Server/Monitoring/MonitorServer.cs b/src/NATS.Server/Monitoring/MonitorServer.cs new file mode 100644 index 0000000..0abdf56 --- /dev/null +++ b/src/NATS.Server/Monitoring/MonitorServer.cs @@ -0,0 +1,62 @@ +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Hosting; +using Microsoft.AspNetCore.Http; +using Microsoft.Extensions.Logging; + +namespace NATS.Server.Monitoring; + +/// +/// HTTP monitoring server providing /healthz, /varz, and other monitoring endpoints. +/// Corresponds to Go server/monitor.go HTTP server setup. +/// +public sealed class MonitorServer : IAsyncDisposable +{ + private readonly WebApplication _app; + private readonly ILogger _logger; + + public MonitorServer(NatsServer server, NatsOptions options, ILoggerFactory loggerFactory) + { + _logger = loggerFactory.CreateLogger(); + + var builder = WebApplication.CreateSlimBuilder(); + builder.WebHost.UseUrls($"http://{options.MonitorHost}:{options.MonitorPort}"); + builder.Logging.ClearProviders(); + + _app = builder.Build(); + var basePath = options.MonitorBasePath ?? ""; + + var varzHandler = new VarzHandler(server, options); + + _app.MapGet(basePath + "/", () => Results.Ok(new + { + endpoints = new[] + { + "/varz", "/connz", "/healthz", "/routez", + "/gatewayz", "/leafz", "/subz", "/accountz", "/jsz", + }, + })); + _app.MapGet(basePath + "/healthz", () => Results.Ok("ok")); + _app.MapGet(basePath + "/varz", async () => Results.Ok(await varzHandler.HandleVarzAsync())); + + // Stubs for unimplemented endpoints + _app.MapGet(basePath + "/routez", () => Results.Ok(new { })); + _app.MapGet(basePath + "/gatewayz", () => Results.Ok(new { })); + _app.MapGet(basePath + "/leafz", () => Results.Ok(new { })); + _app.MapGet(basePath + "/subz", () => Results.Ok(new { })); + _app.MapGet(basePath + "/subscriptionsz", () => Results.Ok(new { })); + _app.MapGet(basePath + "/accountz", () => Results.Ok(new { })); + _app.MapGet(basePath + "/accstatz", () => Results.Ok(new { })); + _app.MapGet(basePath + "/jsz", () => Results.Ok(new { })); + } + + public async Task StartAsync(CancellationToken ct) + { + await _app.StartAsync(ct); + _logger.LogInformation("Monitoring listening on {Urls}", string.Join(", ", _app.Urls)); + } + + public async ValueTask DisposeAsync() + { + await _app.DisposeAsync(); + } +} diff --git a/src/NATS.Server/Monitoring/VarzHandler.cs b/src/NATS.Server/Monitoring/VarzHandler.cs new file mode 100644 index 0000000..c24b910 --- /dev/null +++ b/src/NATS.Server/Monitoring/VarzHandler.cs @@ -0,0 +1,119 @@ +using System.Diagnostics; +using System.Runtime.InteropServices; +using NATS.Server.Protocol; + +namespace NATS.Server.Monitoring; + +/// +/// Handles building the Varz response from server state and process metrics. +/// Corresponds to Go server/monitor.go handleVarz function. +/// +public sealed class VarzHandler +{ + private readonly NatsServer _server; + private readonly NatsOptions _options; + private readonly SemaphoreSlim _varzMu = new(1, 1); + private DateTime _lastCpuSampleTime; + private TimeSpan _lastCpuUsage; + private double _cachedCpuPercent; + + public VarzHandler(NatsServer server, NatsOptions options) + { + _server = server; + _options = options; + var proc = Process.GetCurrentProcess(); + _lastCpuSampleTime = DateTime.UtcNow; + _lastCpuUsage = proc.TotalProcessorTime; + } + + public async Task HandleVarzAsync() + { + await _varzMu.WaitAsync(); + try + { + var proc = Process.GetCurrentProcess(); + var now = DateTime.UtcNow; + var uptime = now - _server.StartTime; + var stats = _server.Stats; + + // CPU sampling with 1-second cache to avoid excessive sampling + if ((now - _lastCpuSampleTime).TotalSeconds >= 1.0) + { + var currentCpu = proc.TotalProcessorTime; + var elapsed = now - _lastCpuSampleTime; + _cachedCpuPercent = (currentCpu - _lastCpuUsage).TotalMilliseconds + / elapsed.TotalMilliseconds / Environment.ProcessorCount * 100.0; + _lastCpuSampleTime = now; + _lastCpuUsage = currentCpu; + } + + // Track HTTP request count for /varz + stats.HttpReqStats.AddOrUpdate("/varz", 1, (_, v) => v + 1); + + return new Varz + { + Id = _server.ServerId, + Name = _server.ServerName, + Version = NatsProtocol.Version, + Proto = NatsProtocol.ProtoVersion, + GoVersion = $"dotnet {RuntimeInformation.FrameworkDescription}", + Host = _options.Host, + Port = _options.Port, + HttpHost = _options.MonitorHost, + HttpPort = _options.MonitorPort, + HttpBasePath = _options.MonitorBasePath ?? "", + HttpsPort = _options.MonitorHttpsPort, + TlsRequired = _options.HasTls && !_options.AllowNonTls, + TlsVerify = _options.HasTls && _options.TlsVerify, + TlsTimeout = _options.HasTls ? _options.TlsTimeout.TotalSeconds : 0, + MaxConnections = _options.MaxConnections, + MaxPayload = _options.MaxPayload, + MaxControlLine = _options.MaxControlLine, + MaxPingsOut = _options.MaxPingsOut, + PingInterval = (long)_options.PingInterval.TotalNanoseconds, + Start = _server.StartTime, + Now = now, + Uptime = FormatUptime(uptime), + Mem = proc.WorkingSet64, + Cpu = Math.Round(_cachedCpuPercent, 2), + Cores = Environment.ProcessorCount, + MaxProcs = ThreadPool.ThreadCount, + Connections = _server.ClientCount, + TotalConnections = (ulong)Interlocked.Read(ref stats.TotalConnections), + InMsgs = Interlocked.Read(ref stats.InMsgs), + OutMsgs = Interlocked.Read(ref stats.OutMsgs), + InBytes = Interlocked.Read(ref stats.InBytes), + OutBytes = Interlocked.Read(ref stats.OutBytes), + SlowConsumers = Interlocked.Read(ref stats.SlowConsumers), + SlowConsumerStats = new SlowConsumersStats + { + Clients = (ulong)Interlocked.Read(ref stats.SlowConsumerClients), + Routes = (ulong)Interlocked.Read(ref stats.SlowConsumerRoutes), + Gateways = (ulong)Interlocked.Read(ref stats.SlowConsumerGateways), + Leafs = (ulong)Interlocked.Read(ref stats.SlowConsumerLeafs), + }, + Subscriptions = _server.SubList.Count, + ConfigLoadTime = _server.StartTime, + HttpReqStats = stats.HttpReqStats.ToDictionary(kv => kv.Key, kv => (ulong)kv.Value), + }; + } + finally + { + _varzMu.Release(); + } + } + + /// + /// Formats a TimeSpan as a human-readable uptime string matching Go server format. + /// + private static string FormatUptime(TimeSpan ts) + { + if (ts.TotalDays >= 1) + return $"{(int)ts.TotalDays}d{ts.Hours}h{ts.Minutes}m{ts.Seconds}s"; + if (ts.TotalHours >= 1) + return $"{(int)ts.TotalHours}h{ts.Minutes}m{ts.Seconds}s"; + if (ts.TotalMinutes >= 1) + return $"{(int)ts.TotalMinutes}m{ts.Seconds}s"; + return $"{(int)ts.TotalSeconds}s"; + } +} diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index cdf9626..0f9e5e0 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -3,6 +3,7 @@ using System.Net; using System.Net.Sockets; using System.Text; using Microsoft.Extensions.Logging; +using NATS.Server.Monitoring; using NATS.Server.Protocol; using NATS.Server.Subscriptions; @@ -19,6 +20,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable private readonly ServerStats _stats = new(); private readonly TaskCompletionSource _listeningStarted = new(TaskCreationOptions.RunContinuationsAsynchronously); private Socket? _listener; + private MonitorServer? _monitorServer; private ulong _nextClientId; private long _startTimeTicks; @@ -61,6 +63,12 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _logger.LogInformation("Listening on {Host}:{Port}", _options.Host, _options.Port); + if (_options.MonitorPort > 0) + { + _monitorServer = new MonitorServer(this, _options, _loggerFactory); + await _monitorServer.StartAsync(ct); + } + try { while (!ct.IsCancellationRequested) @@ -189,6 +197,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable public void Dispose() { + if (_monitorServer != null) + _monitorServer.DisposeAsync().AsTask().GetAwaiter().GetResult(); _listener?.Dispose(); foreach (var client in _clients.Values) client.Dispose(); diff --git a/tests/NATS.Server.Tests/MonitorTests.cs b/tests/NATS.Server.Tests/MonitorTests.cs new file mode 100644 index 0000000..73caef5 --- /dev/null +++ b/tests/NATS.Server.Tests/MonitorTests.cs @@ -0,0 +1,98 @@ +using System.Net; +using System.Net.Http.Json; +using System.Net.Sockets; +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Server.Monitoring; + +namespace NATS.Server.Tests; + +public class MonitorTests : IAsyncLifetime +{ + private readonly NatsServer _server; + private readonly int _natsPort; + private readonly int _monitorPort; + private readonly CancellationTokenSource _cts = new(); + private readonly HttpClient _http = new(); + + public MonitorTests() + { + _natsPort = GetFreePort(); + _monitorPort = GetFreePort(); + _server = new NatsServer( + new NatsOptions { Port = _natsPort, MonitorPort = _monitorPort }, + NullLoggerFactory.Instance); + } + + public async Task InitializeAsync() + { + _ = _server.StartAsync(_cts.Token); + await _server.WaitForReadyAsync(); + // Give monitoring server time to start + await Task.Delay(200); + } + + public async Task DisposeAsync() + { + _http.Dispose(); + await _cts.CancelAsync(); + _server.Dispose(); + } + + [Fact] + public async Task Healthz_returns_ok() + { + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/healthz"); + response.StatusCode.ShouldBe(HttpStatusCode.OK); + } + + [Fact] + public async Task Varz_returns_server_identity() + { + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/varz"); + response.StatusCode.ShouldBe(HttpStatusCode.OK); + + var varz = await response.Content.ReadFromJsonAsync(); + varz.ShouldNotBeNull(); + varz.Id.ShouldNotBeNullOrEmpty(); + varz.Name.ShouldNotBeNullOrEmpty(); + varz.Version.ShouldBe("0.1.0"); + varz.Host.ShouldBe("0.0.0.0"); + varz.Port.ShouldBe(_natsPort); + varz.MaxPayload.ShouldBe(1024 * 1024); + varz.Uptime.ShouldNotBeNullOrEmpty(); + varz.Now.ShouldBeGreaterThan(DateTime.MinValue); + varz.Mem.ShouldBeGreaterThan(0); + varz.Cores.ShouldBeGreaterThan(0); + } + + [Fact] + public async Task Varz_tracks_connections_and_messages() + { + // Connect a client and send a message + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort)); + + var buf = new byte[4096]; + _ = await sock.ReceiveAsync(buf, SocketFlags.None); // Read INFO + + var cmd = "CONNECT {}\r\nSUB test 1\r\nPUB test 5\r\nhello\r\n"u8.ToArray(); + await sock.SendAsync(cmd, SocketFlags.None); + await Task.Delay(200); + + var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/varz"); + var varz = await response.Content.ReadFromJsonAsync(); + + varz.ShouldNotBeNull(); + varz.Connections.ShouldBeGreaterThanOrEqualTo(1); + varz.TotalConnections.ShouldBeGreaterThanOrEqualTo(1UL); + varz.InMsgs.ShouldBeGreaterThanOrEqualTo(1L); + varz.InBytes.ShouldBeGreaterThanOrEqualTo(5L); + } + + private static int GetFreePort() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + sock.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + return ((IPEndPoint)sock.LocalEndPoint!).Port; + } +}