feat: add MonitorServer with /healthz and /varz endpoints

This commit is contained in:
Joseph Doherty
2026-02-22 22:20:44 -05:00
parent f6b38df291
commit f2badc3488
4 changed files with 289 additions and 0 deletions

View File

@@ -0,0 +1,62 @@
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
namespace NATS.Server.Monitoring;
/// <summary>
/// HTTP monitoring server providing /healthz, /varz, and other monitoring endpoints.
/// Corresponds to Go server/monitor.go HTTP server setup.
/// </summary>
public sealed class MonitorServer : IAsyncDisposable
{
private readonly WebApplication _app;
private readonly ILogger<MonitorServer> _logger;
public MonitorServer(NatsServer server, NatsOptions options, ILoggerFactory loggerFactory)
{
_logger = loggerFactory.CreateLogger<MonitorServer>();
var builder = WebApplication.CreateSlimBuilder();
builder.WebHost.UseUrls($"http://{options.MonitorHost}:{options.MonitorPort}");
builder.Logging.ClearProviders();
_app = builder.Build();
var basePath = options.MonitorBasePath ?? "";
var varzHandler = new VarzHandler(server, options);
_app.MapGet(basePath + "/", () => Results.Ok(new
{
endpoints = new[]
{
"/varz", "/connz", "/healthz", "/routez",
"/gatewayz", "/leafz", "/subz", "/accountz", "/jsz",
},
}));
_app.MapGet(basePath + "/healthz", () => Results.Ok("ok"));
_app.MapGet(basePath + "/varz", async () => Results.Ok(await varzHandler.HandleVarzAsync()));
// Stubs for unimplemented endpoints
_app.MapGet(basePath + "/routez", () => Results.Ok(new { }));
_app.MapGet(basePath + "/gatewayz", () => Results.Ok(new { }));
_app.MapGet(basePath + "/leafz", () => Results.Ok(new { }));
_app.MapGet(basePath + "/subz", () => Results.Ok(new { }));
_app.MapGet(basePath + "/subscriptionsz", () => Results.Ok(new { }));
_app.MapGet(basePath + "/accountz", () => Results.Ok(new { }));
_app.MapGet(basePath + "/accstatz", () => Results.Ok(new { }));
_app.MapGet(basePath + "/jsz", () => Results.Ok(new { }));
}
public async Task StartAsync(CancellationToken ct)
{
await _app.StartAsync(ct);
_logger.LogInformation("Monitoring listening on {Urls}", string.Join(", ", _app.Urls));
}
public async ValueTask DisposeAsync()
{
await _app.DisposeAsync();
}
}

View File

@@ -0,0 +1,119 @@
using System.Diagnostics;
using System.Runtime.InteropServices;
using NATS.Server.Protocol;
namespace NATS.Server.Monitoring;
/// <summary>
/// Handles building the Varz response from server state and process metrics.
/// Corresponds to Go server/monitor.go handleVarz function.
/// </summary>
public sealed class VarzHandler
{
private readonly NatsServer _server;
private readonly NatsOptions _options;
private readonly SemaphoreSlim _varzMu = new(1, 1);
private DateTime _lastCpuSampleTime;
private TimeSpan _lastCpuUsage;
private double _cachedCpuPercent;
public VarzHandler(NatsServer server, NatsOptions options)
{
_server = server;
_options = options;
var proc = Process.GetCurrentProcess();
_lastCpuSampleTime = DateTime.UtcNow;
_lastCpuUsage = proc.TotalProcessorTime;
}
public async Task<Varz> HandleVarzAsync()
{
await _varzMu.WaitAsync();
try
{
var proc = Process.GetCurrentProcess();
var now = DateTime.UtcNow;
var uptime = now - _server.StartTime;
var stats = _server.Stats;
// CPU sampling with 1-second cache to avoid excessive sampling
if ((now - _lastCpuSampleTime).TotalSeconds >= 1.0)
{
var currentCpu = proc.TotalProcessorTime;
var elapsed = now - _lastCpuSampleTime;
_cachedCpuPercent = (currentCpu - _lastCpuUsage).TotalMilliseconds
/ elapsed.TotalMilliseconds / Environment.ProcessorCount * 100.0;
_lastCpuSampleTime = now;
_lastCpuUsage = currentCpu;
}
// Track HTTP request count for /varz
stats.HttpReqStats.AddOrUpdate("/varz", 1, (_, v) => v + 1);
return new Varz
{
Id = _server.ServerId,
Name = _server.ServerName,
Version = NatsProtocol.Version,
Proto = NatsProtocol.ProtoVersion,
GoVersion = $"dotnet {RuntimeInformation.FrameworkDescription}",
Host = _options.Host,
Port = _options.Port,
HttpHost = _options.MonitorHost,
HttpPort = _options.MonitorPort,
HttpBasePath = _options.MonitorBasePath ?? "",
HttpsPort = _options.MonitorHttpsPort,
TlsRequired = _options.HasTls && !_options.AllowNonTls,
TlsVerify = _options.HasTls && _options.TlsVerify,
TlsTimeout = _options.HasTls ? _options.TlsTimeout.TotalSeconds : 0,
MaxConnections = _options.MaxConnections,
MaxPayload = _options.MaxPayload,
MaxControlLine = _options.MaxControlLine,
MaxPingsOut = _options.MaxPingsOut,
PingInterval = (long)_options.PingInterval.TotalNanoseconds,
Start = _server.StartTime,
Now = now,
Uptime = FormatUptime(uptime),
Mem = proc.WorkingSet64,
Cpu = Math.Round(_cachedCpuPercent, 2),
Cores = Environment.ProcessorCount,
MaxProcs = ThreadPool.ThreadCount,
Connections = _server.ClientCount,
TotalConnections = (ulong)Interlocked.Read(ref stats.TotalConnections),
InMsgs = Interlocked.Read(ref stats.InMsgs),
OutMsgs = Interlocked.Read(ref stats.OutMsgs),
InBytes = Interlocked.Read(ref stats.InBytes),
OutBytes = Interlocked.Read(ref stats.OutBytes),
SlowConsumers = Interlocked.Read(ref stats.SlowConsumers),
SlowConsumerStats = new SlowConsumersStats
{
Clients = (ulong)Interlocked.Read(ref stats.SlowConsumerClients),
Routes = (ulong)Interlocked.Read(ref stats.SlowConsumerRoutes),
Gateways = (ulong)Interlocked.Read(ref stats.SlowConsumerGateways),
Leafs = (ulong)Interlocked.Read(ref stats.SlowConsumerLeafs),
},
Subscriptions = _server.SubList.Count,
ConfigLoadTime = _server.StartTime,
HttpReqStats = stats.HttpReqStats.ToDictionary(kv => kv.Key, kv => (ulong)kv.Value),
};
}
finally
{
_varzMu.Release();
}
}
/// <summary>
/// Formats a TimeSpan as a human-readable uptime string matching Go server format.
/// </summary>
private static string FormatUptime(TimeSpan ts)
{
if (ts.TotalDays >= 1)
return $"{(int)ts.TotalDays}d{ts.Hours}h{ts.Minutes}m{ts.Seconds}s";
if (ts.TotalHours >= 1)
return $"{(int)ts.TotalHours}h{ts.Minutes}m{ts.Seconds}s";
if (ts.TotalMinutes >= 1)
return $"{(int)ts.TotalMinutes}m{ts.Seconds}s";
return $"{(int)ts.TotalSeconds}s";
}
}

View File

@@ -3,6 +3,7 @@ using System.Net;
using System.Net.Sockets;
using System.Text;
using Microsoft.Extensions.Logging;
using NATS.Server.Monitoring;
using NATS.Server.Protocol;
using NATS.Server.Subscriptions;
@@ -19,6 +20,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
private readonly ServerStats _stats = new();
private readonly TaskCompletionSource _listeningStarted = new(TaskCreationOptions.RunContinuationsAsynchronously);
private Socket? _listener;
private MonitorServer? _monitorServer;
private ulong _nextClientId;
private long _startTimeTicks;
@@ -61,6 +63,12 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
_logger.LogInformation("Listening on {Host}:{Port}", _options.Host, _options.Port);
if (_options.MonitorPort > 0)
{
_monitorServer = new MonitorServer(this, _options, _loggerFactory);
await _monitorServer.StartAsync(ct);
}
try
{
while (!ct.IsCancellationRequested)
@@ -189,6 +197,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
public void Dispose()
{
if (_monitorServer != null)
_monitorServer.DisposeAsync().AsTask().GetAwaiter().GetResult();
_listener?.Dispose();
foreach (var client in _clients.Values)
client.Dispose();

View File

@@ -0,0 +1,98 @@
using System.Net;
using System.Net.Http.Json;
using System.Net.Sockets;
using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server.Monitoring;
namespace NATS.Server.Tests;
public class MonitorTests : IAsyncLifetime
{
private readonly NatsServer _server;
private readonly int _natsPort;
private readonly int _monitorPort;
private readonly CancellationTokenSource _cts = new();
private readonly HttpClient _http = new();
public MonitorTests()
{
_natsPort = GetFreePort();
_monitorPort = GetFreePort();
_server = new NatsServer(
new NatsOptions { Port = _natsPort, MonitorPort = _monitorPort },
NullLoggerFactory.Instance);
}
public async Task InitializeAsync()
{
_ = _server.StartAsync(_cts.Token);
await _server.WaitForReadyAsync();
// Give monitoring server time to start
await Task.Delay(200);
}
public async Task DisposeAsync()
{
_http.Dispose();
await _cts.CancelAsync();
_server.Dispose();
}
[Fact]
public async Task Healthz_returns_ok()
{
var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/healthz");
response.StatusCode.ShouldBe(HttpStatusCode.OK);
}
[Fact]
public async Task Varz_returns_server_identity()
{
var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/varz");
response.StatusCode.ShouldBe(HttpStatusCode.OK);
var varz = await response.Content.ReadFromJsonAsync<Varz>();
varz.ShouldNotBeNull();
varz.Id.ShouldNotBeNullOrEmpty();
varz.Name.ShouldNotBeNullOrEmpty();
varz.Version.ShouldBe("0.1.0");
varz.Host.ShouldBe("0.0.0.0");
varz.Port.ShouldBe(_natsPort);
varz.MaxPayload.ShouldBe(1024 * 1024);
varz.Uptime.ShouldNotBeNullOrEmpty();
varz.Now.ShouldBeGreaterThan(DateTime.MinValue);
varz.Mem.ShouldBeGreaterThan(0);
varz.Cores.ShouldBeGreaterThan(0);
}
[Fact]
public async Task Varz_tracks_connections_and_messages()
{
// Connect a client and send a message
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _natsPort));
var buf = new byte[4096];
_ = await sock.ReceiveAsync(buf, SocketFlags.None); // Read INFO
var cmd = "CONNECT {}\r\nSUB test 1\r\nPUB test 5\r\nhello\r\n"u8.ToArray();
await sock.SendAsync(cmd, SocketFlags.None);
await Task.Delay(200);
var response = await _http.GetAsync($"http://127.0.0.1:{_monitorPort}/varz");
var varz = await response.Content.ReadFromJsonAsync<Varz>();
varz.ShouldNotBeNull();
varz.Connections.ShouldBeGreaterThanOrEqualTo(1);
varz.TotalConnections.ShouldBeGreaterThanOrEqualTo(1UL);
varz.InMsgs.ShouldBeGreaterThanOrEqualTo(1L);
varz.InBytes.ShouldBeGreaterThanOrEqualTo(5L);
}
private static int GetFreePort()
{
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
sock.Bind(new IPEndPoint(IPAddress.Loopback, 0));
return ((IPEndPoint)sock.LocalEndPoint!).Port;
}
}