From 5bae9cc289f0cc2fc91665b3cf4b8b198c374f4d Mon Sep 17 00:00:00 2001
From: Joseph Doherty
Date: Mon, 23 Feb 2026 05:48:32 -0500
Subject: [PATCH] feat: add system request-reply monitoring services ($SYS.REQ.SERVER.*)

Register VARZ, HEALTHZ, SUBSZ, STATSZ, and IDZ request-reply handlers on
$SYS.REQ.SERVER.{id}.* subjects and $SYS.REQ.SERVER.PING.* wildcard subjects
via InitEventTracking. Also excludes the $SYS system account from the /subz
monitoring endpoint by default since its subscriptions are internal
infrastructure.
---
 src/NATS.Server/Events/InternalEventSystem.cs |  39 ++++
 src/NATS.Server/Monitoring/SubszHandler.cs    |  10 +-
 src/NATS.Server/NatsServer.cs                 |  93 ++++++++++
 .../SystemRequestReplyTests.cs                | 170 ++++++++++++++++++
 4 files changed, 309 insertions(+), 3 deletions(-)
 create mode 100644 tests/NATS.Server.Tests/SystemRequestReplyTests.cs

diff --git a/src/NATS.Server/Events/InternalEventSystem.cs b/src/NATS.Server/Events/InternalEventSystem.cs
index f8f49f1..caac5dd 100644
--- a/src/NATS.Server/Events/InternalEventSystem.cs
+++ b/src/NATS.Server/Events/InternalEventSystem.cs
@@ -97,6 +97,45 @@ public sealed class InternalEventSystem : IAsyncDisposable
         }, ct);
     }
 
+    /// <summary>
+    /// Registers system request-reply monitoring services for this server.
+    /// Maps to Go's initEventTracking in events.go.
+    /// Sets up handlers for $SYS.REQ.SERVER.{id}.VARZ, HEALTHZ, SUBSZ, STATSZ, IDZ
+    /// and wildcard $SYS.REQ.SERVER.PING.* subjects.
+    /// </summary>
+    public void InitEventTracking(NatsServer server)
+    {
+        _server = server;
+        var serverId = server.ServerId;
+
+        // Server-specific monitoring services
+        RegisterService(serverId, "VARZ", server.HandleVarzRequest);
+        RegisterService(serverId, "HEALTHZ", server.HandleHealthzRequest);
+        RegisterService(serverId, "SUBSZ", server.HandleSubszRequest);
+        RegisterService(serverId, "STATSZ", server.HandleStatszRequest);
+        RegisterService(serverId, "IDZ", server.HandleIdzRequest);
+
+        // Wildcard ping services (all servers respond)
+        SysSubscribe(string.Format(EventSubjects.ServerPing, "VARZ"), WrapRequestHandler(server.HandleVarzRequest));
+        SysSubscribe(string.Format(EventSubjects.ServerPing, "HEALTHZ"), WrapRequestHandler(server.HandleHealthzRequest));
+        SysSubscribe(string.Format(EventSubjects.ServerPing, "IDZ"), WrapRequestHandler(server.HandleIdzRequest));
+        SysSubscribe(string.Format(EventSubjects.ServerPing, "STATSZ"), WrapRequestHandler(server.HandleStatszRequest));
+    }
+
+    private void RegisterService(string serverId, string name, Action<string, string?> handler)
+    {
+        var subject = string.Format(EventSubjects.ServerReq, serverId, name);
+        SysSubscribe(subject, WrapRequestHandler(handler));
+    }
+
+    private SystemMessageHandler WrapRequestHandler(Action<string, string?> handler)
+    {
+        return (sub, client, acc, subject, reply, hdr, msg) =>
+        {
+            handler(subject, reply); // only the subject and reply are forwarded; headers and payload are ignored
+        };
+    }
+
     /// <summary>
     /// Publishes a $SYS.SERVER.{id}.STATSZ message with current server statistics.
     /// Maps to Go's sendStatsz in events.go.
diff --git a/src/NATS.Server/Monitoring/SubszHandler.cs b/src/NATS.Server/Monitoring/SubszHandler.cs
index 1de4f97..4a64796 100644
--- a/src/NATS.Server/Monitoring/SubszHandler.cs
+++ b/src/NATS.Server/Monitoring/SubszHandler.cs
@@ -14,12 +14,16 @@ public sealed class SubszHandler(NatsServer server)
         var opts = ParseQueryParams(ctx);
         var now = DateTime.UtcNow;
 
-        // Collect subscriptions from all accounts (or filtered)
+        // Collect subscriptions from all accounts (or filtered).
+        // Exclude the $SYS system account unless explicitly requested — its internal
+        // subscriptions are infrastructure and not user-facing.
         var allSubs = new List();
         foreach (var account in server.GetAccounts())
         {
             if (!string.IsNullOrEmpty(opts.Account) && account.Name != opts.Account)
                 continue;
+            if (string.IsNullOrEmpty(opts.Account) && account.Name == "$SYS")
+                continue;
             allSubs.AddRange(account.SubList.GetAllSubscriptions());
         }
 
@@ -31,10 +35,10 @@ public sealed class SubszHandler(NatsServer server)
 
         var total = allSubs.Count;
         var numSubs = server.GetAccounts()
-            .Where(a => string.IsNullOrEmpty(opts.Account) || a.Name == opts.Account)
+            .Where(a => (string.IsNullOrEmpty(opts.Account) && a.Name != "$SYS") || a.Name == opts.Account)
             .Aggregate(0u, (sum, a) => sum + a.SubList.Count);
         var numCache = server.GetAccounts()
-            .Where(a => string.IsNullOrEmpty(opts.Account) || a.Name == opts.Account)
+            .Where(a => (string.IsNullOrEmpty(opts.Account) && a.Name != "$SYS") || a.Name == opts.Account)
             .Sum(a => a.SubList.CacheCount);
 
         SubDetail[] details = [];
diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs
index 9239f36..dbae0ba 100644
--- a/src/NATS.Server/NatsServer.cs
+++ b/src/NATS.Server/NatsServer.cs
@@ -398,6 +398,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
         _listeningStarted.TrySetResult();
 
         _eventSystem?.Start(this);
+        _eventSystem?.InitEventTracking(this);
 
         _logger.LogInformation("Listening for client connections on {Host}:{Port}", _options.Host, _options.Port);
 
@@ -724,6 +725,98 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
         _eventSystem?.Enqueue(new PublishMessage { Subject = subject, Body = msg });
     }
 
+    /// <summary>
+    /// Handles $SYS.REQ.SERVER.{id}.VARZ requests.
+    /// Returns core server information including stats counters.
+    /// </summary>
+    public void HandleVarzRequest(string subject, string? reply)
+    {
+        if (reply == null) return;
+        var varz = new
+        {
+            server_id = _serverInfo.ServerId,
+            server_name = _serverInfo.ServerName,
+            version = NatsProtocol.Version,
+            host = _options.Host,
+            port = _options.Port,
+            max_payload = _options.MaxPayload,
+            connections = ClientCount,
+            total_connections = Interlocked.Read(ref _stats.TotalConnections),
+            in_msgs = Interlocked.Read(ref _stats.InMsgs),
+            out_msgs = Interlocked.Read(ref _stats.OutMsgs),
+            in_bytes = Interlocked.Read(ref _stats.InBytes),
+            out_bytes = Interlocked.Read(ref _stats.OutBytes),
+        };
+        SendInternalMsg(reply, null, varz);
+    }
+
+    /// <summary>
+    /// Handles $SYS.REQ.SERVER.{id}.HEALTHZ requests.
+    /// Returns a simple health status response.
+    /// </summary>
+    public void HandleHealthzRequest(string subject, string? reply)
+    {
+        if (reply == null) return;
+        SendInternalMsg(reply, null, new { status = "ok" });
+    }
+
+    /// <summary>
+    /// Handles $SYS.REQ.SERVER.{id}.SUBSZ requests.
+    /// Returns the current subscription count.
+    /// </summary>
+    public void HandleSubszRequest(string subject, string? reply)
+    {
+        if (reply == null) return;
+        SendInternalMsg(reply, null, new { num_subscriptions = SubList.Count });
+    }
+
+    /// <summary>
+    /// Handles $SYS.REQ.SERVER.{id}.STATSZ requests.
+    /// Publishes current server statistics through the event system.
+    /// </summary>
+    public void HandleStatszRequest(string subject, string? reply)
+    {
+        if (reply == null) return;
+        using var process = System.Diagnostics.Process.GetCurrentProcess(); // Process is IDisposable; release it when the handler returns
+        var statsMsg = new Events.ServerStatsMsg
+        {
+            Server = BuildEventServerInfo(),
+            Stats = new Events.ServerStatsData
+            {
+                Start = StartTime,
+                Mem = process.WorkingSet64,
+                Cores = Environment.ProcessorCount,
+                Connections = ClientCount,
+                TotalConnections = Interlocked.Read(ref _stats.TotalConnections),
+                Subscriptions = SubList.Count,
+                InMsgs = Interlocked.Read(ref _stats.InMsgs),
+                OutMsgs = Interlocked.Read(ref _stats.OutMsgs),
+                InBytes = Interlocked.Read(ref _stats.InBytes),
+                OutBytes = Interlocked.Read(ref _stats.OutBytes),
+                SlowConsumers = Interlocked.Read(ref _stats.SlowConsumers),
+            },
+        };
+        SendInternalMsg(reply, null, statsMsg);
+    }
+
+    /// <summary>
+    /// Handles $SYS.REQ.SERVER.{id}.IDZ requests.
+    /// Returns basic server identity information.
+    /// </summary>
+    public void HandleIdzRequest(string subject, string? reply)
+    {
+        if (reply == null) return;
+        var idz = new
+        {
+            server_id = _serverInfo.ServerId,
+            server_name = _serverInfo.ServerName,
+            version = NatsProtocol.Version,
+            host = _options.Host,
+            port = _options.Port,
+        };
+        SendInternalMsg(reply, null, idz);
+    }
+
     /// <summary>
     /// Builds an EventServerInfo block for embedding in system event messages.
     /// Maps to Go's serverInfo() helper used in events.go advisory publishing.
diff --git a/tests/NATS.Server.Tests/SystemRequestReplyTests.cs b/tests/NATS.Server.Tests/SystemRequestReplyTests.cs
new file mode 100644
index 0000000..c5d0c97
--- /dev/null
+++ b/tests/NATS.Server.Tests/SystemRequestReplyTests.cs
@@ -0,0 +1,170 @@
+using System.Text;
+using System.Text.Json;
+using NATS.Server;
+using NATS.Server.Events;
+using Microsoft.Extensions.Logging.Abstractions;
+
+namespace NATS.Server.Tests;
+
+public class SystemRequestReplyTests
+{
+    [Fact]
+    public async Task Varz_request_reply_returns_server_info()
+    {
+        using var server = CreateTestServer();
+        _ = server.StartAsync(CancellationToken.None);
+        await server.WaitForReadyAsync();
+
+        var received = new TaskCompletionSource<byte[]>(TaskCreationOptions.RunContinuationsAsynchronously); // never resume the awaiting test inline inside the subscription callback
+        var replySubject = $"_INBOX.test.{Guid.NewGuid():N}";
+        server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) =>
+        {
+            received.TrySetResult(msg.ToArray());
+        });
+
+        var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "VARZ");
+        server.SendInternalMsg(reqSubject, replySubject, null);
+
+        var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
+        var json = Encoding.UTF8.GetString(result);
+        json.ShouldContain("\"server_id\"");
+        json.ShouldContain("\"version\"");
+        json.ShouldContain("\"host\"");
+        json.ShouldContain("\"port\"");
+
+        await server.ShutdownAsync();
+    }
+
+    [Fact]
+    public async Task Healthz_request_reply_returns_ok()
+    {
+        using var server = CreateTestServer();
+        _ = server.StartAsync(CancellationToken.None);
+        await server.WaitForReadyAsync();
+
+        var received = new TaskCompletionSource<byte[]>(TaskCreationOptions.RunContinuationsAsynchronously);
+        var replySubject = $"_INBOX.test.{Guid.NewGuid():N}";
+        server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) =>
+        {
+            received.TrySetResult(msg.ToArray());
+        });
+
+        var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "HEALTHZ");
+        server.SendInternalMsg(reqSubject, replySubject, null);
+
+        var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
+        var json = Encoding.UTF8.GetString(result);
+        json.ShouldContain("ok");
+
+        await server.ShutdownAsync();
+    }
+
+    [Fact]
+    public async Task Subsz_request_reply_returns_subscription_count()
+    {
+        using var server = CreateTestServer();
+        _ = server.StartAsync(CancellationToken.None);
+        await server.WaitForReadyAsync();
+
+        var received = new TaskCompletionSource<byte[]>(TaskCreationOptions.RunContinuationsAsynchronously);
+        var replySubject = $"_INBOX.test.{Guid.NewGuid():N}";
+        server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) =>
+        {
+            received.TrySetResult(msg.ToArray());
+        });
+
+        var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "SUBSZ");
+        server.SendInternalMsg(reqSubject, replySubject, null);
+
+        var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
+        var json = Encoding.UTF8.GetString(result);
+        json.ShouldContain("\"num_subscriptions\"");
+
+        await server.ShutdownAsync();
+    }
+
+    [Fact]
+    public async Task Idz_request_reply_returns_server_identity()
+    {
+        using var server = CreateTestServer();
+        _ = server.StartAsync(CancellationToken.None);
+        await server.WaitForReadyAsync();
+
+        var received = new TaskCompletionSource<byte[]>(TaskCreationOptions.RunContinuationsAsynchronously);
+        var replySubject = $"_INBOX.test.{Guid.NewGuid():N}";
+        server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) =>
+        {
+            received.TrySetResult(msg.ToArray());
+        });
+
+        var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "IDZ");
+        server.SendInternalMsg(reqSubject, replySubject, null);
+
+        var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
+        var json = Encoding.UTF8.GetString(result);
+        json.ShouldContain("\"server_id\"");
+        json.ShouldContain("\"server_name\"");
+
+        await server.ShutdownAsync();
+    }
+
+    [Fact]
+    public async Task Ping_varz_responds_via_wildcard_subject()
+    {
+        using var server = CreateTestServer();
+        _ = server.StartAsync(CancellationToken.None);
+        await server.WaitForReadyAsync();
+
+        var received = new TaskCompletionSource<byte[]>(TaskCreationOptions.RunContinuationsAsynchronously);
+        var replySubject = $"_INBOX.test.{Guid.NewGuid():N}";
+        server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) =>
+        {
+            received.TrySetResult(msg.ToArray());
+        });
+
+        var pingSubject = string.Format(EventSubjects.ServerPing, "VARZ");
+        server.SendInternalMsg(pingSubject, replySubject, null);
+
+        var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
+        var json = Encoding.UTF8.GetString(result);
+        json.ShouldContain("\"server_id\"");
+
+        await server.ShutdownAsync();
+    }
+
+    [Fact]
+    public async Task Request_without_reply_is_ignored()
+    {
+        using var server = CreateTestServer();
+        _ = server.StartAsync(CancellationToken.None);
+        await server.WaitForReadyAsync();
+
+        // Send a request with no reply subject -- should not crash
+        var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "VARZ");
+        server.SendInternalMsg(reqSubject, null, null);
+
+        // Give it a moment to process without error
+        await Task.Delay(200);
+
+        // Server should still be running
+        server.IsShuttingDown.ShouldBeFalse();
+
+        await server.ShutdownAsync();
+    }
+
+    private static NatsServer CreateTestServer()
+    {
+        var port = GetFreePort();
+        return new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance);
+    }
+
+    private static int GetFreePort()
+    {
+        using var sock = new System.Net.Sockets.Socket(
+            System.Net.Sockets.AddressFamily.InterNetwork,
+            System.Net.Sockets.SocketType.Stream,
+            System.Net.Sockets.ProtocolType.Tcp);
+        sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0)); // port 0 lets the OS choose a free port; it is released again when sock is disposed
+        return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port;
+    }
+}