diff --git a/src/NATS.Server/Events/InternalEventSystem.cs b/src/NATS.Server/Events/InternalEventSystem.cs
index f8f49f1..caac5dd 100644
--- a/src/NATS.Server/Events/InternalEventSystem.cs
+++ b/src/NATS.Server/Events/InternalEventSystem.cs
@@ -97,6 +97,45 @@ public sealed class InternalEventSystem : IAsyncDisposable
}, ct);
}
+ ///
+ /// Registers system request-reply monitoring services for this server.
+ /// Maps to Go's initEventTracking in events.go.
+ /// Sets up handlers for $SYS.REQ.SERVER.{id}.VARZ, HEALTHZ, SUBSZ, STATSZ, IDZ
+ /// and wildcard $SYS.REQ.SERVER.PING.* subjects.
+ ///
+ public void InitEventTracking(NatsServer server)
+ {
+ _server = server;
+ var serverId = server.ServerId;
+
+ // Server-specific monitoring services
+ RegisterService(serverId, "VARZ", server.HandleVarzRequest);
+ RegisterService(serverId, "HEALTHZ", server.HandleHealthzRequest);
+ RegisterService(serverId, "SUBSZ", server.HandleSubszRequest);
+ RegisterService(serverId, "STATSZ", server.HandleStatszRequest);
+ RegisterService(serverId, "IDZ", server.HandleIdzRequest);
+
+ // Wildcard ping services (all servers respond)
+ SysSubscribe(string.Format(EventSubjects.ServerPing, "VARZ"), WrapRequestHandler(server.HandleVarzRequest));
+ SysSubscribe(string.Format(EventSubjects.ServerPing, "HEALTHZ"), WrapRequestHandler(server.HandleHealthzRequest));
+ SysSubscribe(string.Format(EventSubjects.ServerPing, "IDZ"), WrapRequestHandler(server.HandleIdzRequest));
+ SysSubscribe(string.Format(EventSubjects.ServerPing, "STATSZ"), WrapRequestHandler(server.HandleStatszRequest));
+ }
+
+ private void RegisterService(string serverId, string name, Action handler)
+ {
+ var subject = string.Format(EventSubjects.ServerReq, serverId, name);
+ SysSubscribe(subject, WrapRequestHandler(handler));
+ }
+
+ private SystemMessageHandler WrapRequestHandler(Action handler)
+ {
+ return (sub, client, acc, subject, reply, hdr, msg) =>
+ {
+ handler(subject, reply);
+ };
+ }
+
///
/// Publishes a $SYS.SERVER.{id}.STATSZ message with current server statistics.
/// Maps to Go's sendStatsz in events.go.
diff --git a/src/NATS.Server/Monitoring/SubszHandler.cs b/src/NATS.Server/Monitoring/SubszHandler.cs
index 1de4f97..4a64796 100644
--- a/src/NATS.Server/Monitoring/SubszHandler.cs
+++ b/src/NATS.Server/Monitoring/SubszHandler.cs
@@ -14,12 +14,16 @@ public sealed class SubszHandler(NatsServer server)
var opts = ParseQueryParams(ctx);
var now = DateTime.UtcNow;
- // Collect subscriptions from all accounts (or filtered)
+ // Collect subscriptions from all accounts (or filtered).
+ // Exclude the $SYS system account unless explicitly requested — its internal
+ // subscriptions are infrastructure and not user-facing.
var allSubs = new List();
foreach (var account in server.GetAccounts())
{
if (!string.IsNullOrEmpty(opts.Account) && account.Name != opts.Account)
continue;
+ if (string.IsNullOrEmpty(opts.Account) && account.Name == "$SYS")
+ continue;
allSubs.AddRange(account.SubList.GetAllSubscriptions());
}
@@ -31,10 +35,10 @@ public sealed class SubszHandler(NatsServer server)
var total = allSubs.Count;
var numSubs = server.GetAccounts()
- .Where(a => string.IsNullOrEmpty(opts.Account) || a.Name == opts.Account)
+ .Where(a => (string.IsNullOrEmpty(opts.Account) && a.Name != "$SYS") || a.Name == opts.Account)
.Aggregate(0u, (sum, a) => sum + a.SubList.Count);
var numCache = server.GetAccounts()
- .Where(a => string.IsNullOrEmpty(opts.Account) || a.Name == opts.Account)
+ .Where(a => (string.IsNullOrEmpty(opts.Account) && a.Name != "$SYS") || a.Name == opts.Account)
.Sum(a => a.SubList.CacheCount);
SubDetail[] details = [];
diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs
index 9239f36..dbae0ba 100644
--- a/src/NATS.Server/NatsServer.cs
+++ b/src/NATS.Server/NatsServer.cs
@@ -398,6 +398,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
_listeningStarted.TrySetResult();
_eventSystem?.Start(this);
+ _eventSystem?.InitEventTracking(this);
_logger.LogInformation("Listening for client connections on {Host}:{Port}", _options.Host, _options.Port);
@@ -724,6 +725,98 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
_eventSystem?.Enqueue(new PublishMessage { Subject = subject, Body = msg });
}
+ ///
+ /// Handles $SYS.REQ.SERVER.{id}.VARZ requests.
+ /// Returns core server information including stats counters.
+ ///
+ public void HandleVarzRequest(string subject, string? reply)
+ {
+ if (reply == null) return;
+ var varz = new
+ {
+ server_id = _serverInfo.ServerId,
+ server_name = _serverInfo.ServerName,
+ version = NatsProtocol.Version,
+ host = _options.Host,
+ port = _options.Port,
+ max_payload = _options.MaxPayload,
+ connections = ClientCount,
+ total_connections = Interlocked.Read(ref _stats.TotalConnections),
+ in_msgs = Interlocked.Read(ref _stats.InMsgs),
+ out_msgs = Interlocked.Read(ref _stats.OutMsgs),
+ in_bytes = Interlocked.Read(ref _stats.InBytes),
+ out_bytes = Interlocked.Read(ref _stats.OutBytes),
+ };
+ SendInternalMsg(reply, null, varz);
+ }
+
+ ///
+ /// Handles $SYS.REQ.SERVER.{id}.HEALTHZ requests.
+ /// Returns a simple health status response.
+ ///
+ public void HandleHealthzRequest(string subject, string? reply)
+ {
+ if (reply == null) return;
+ SendInternalMsg(reply, null, new { status = "ok" });
+ }
+
+ ///
+ /// Handles $SYS.REQ.SERVER.{id}.SUBSZ requests.
+ /// Returns the current subscription count.
+ ///
+ public void HandleSubszRequest(string subject, string? reply)
+ {
+ if (reply == null) return;
+ SendInternalMsg(reply, null, new { num_subscriptions = SubList.Count });
+ }
+
+ ///
+ /// Handles $SYS.REQ.SERVER.{id}.STATSZ requests.
+ /// Publishes current server statistics through the event system.
+ ///
+ public void HandleStatszRequest(string subject, string? reply)
+ {
+ if (reply == null) return;
+ var process = System.Diagnostics.Process.GetCurrentProcess();
+ var statsMsg = new Events.ServerStatsMsg
+ {
+ Server = BuildEventServerInfo(),
+ Stats = new Events.ServerStatsData
+ {
+ Start = StartTime,
+ Mem = process.WorkingSet64,
+ Cores = Environment.ProcessorCount,
+ Connections = ClientCount,
+ TotalConnections = Interlocked.Read(ref _stats.TotalConnections),
+ Subscriptions = SubList.Count,
+ InMsgs = Interlocked.Read(ref _stats.InMsgs),
+ OutMsgs = Interlocked.Read(ref _stats.OutMsgs),
+ InBytes = Interlocked.Read(ref _stats.InBytes),
+ OutBytes = Interlocked.Read(ref _stats.OutBytes),
+ SlowConsumers = Interlocked.Read(ref _stats.SlowConsumers),
+ },
+ };
+ SendInternalMsg(reply, null, statsMsg);
+ }
+
+ ///
+ /// Handles $SYS.REQ.SERVER.{id}.IDZ requests.
+ /// Returns basic server identity information.
+ ///
+ public void HandleIdzRequest(string subject, string? reply)
+ {
+ if (reply == null) return;
+ var idz = new
+ {
+ server_id = _serverInfo.ServerId,
+ server_name = _serverInfo.ServerName,
+ version = NatsProtocol.Version,
+ host = _options.Host,
+ port = _options.Port,
+ };
+ SendInternalMsg(reply, null, idz);
+ }
+
///
/// Builds an EventServerInfo block for embedding in system event messages.
/// Maps to Go's serverInfo() helper used in events.go advisory publishing.
diff --git a/tests/NATS.Server.Tests/SystemRequestReplyTests.cs b/tests/NATS.Server.Tests/SystemRequestReplyTests.cs
new file mode 100644
index 0000000..c5d0c97
--- /dev/null
+++ b/tests/NATS.Server.Tests/SystemRequestReplyTests.cs
@@ -0,0 +1,170 @@
+using System.Text;
+using System.Text.Json;
+using NATS.Server;
+using NATS.Server.Events;
+using Microsoft.Extensions.Logging.Abstractions;
+
+namespace NATS.Server.Tests;
+
+public class SystemRequestReplyTests
+{
+ [Fact]
+ public async Task Varz_request_reply_returns_server_info()
+ {
+ using var server = CreateTestServer();
+ _ = server.StartAsync(CancellationToken.None);
+ await server.WaitForReadyAsync();
+
+ var received = new TaskCompletionSource();
+ var replySubject = $"_INBOX.test.{Guid.NewGuid():N}";
+ server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) =>
+ {
+ received.TrySetResult(msg.ToArray());
+ });
+
+ var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "VARZ");
+ server.SendInternalMsg(reqSubject, replySubject, null);
+
+ var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
+ var json = Encoding.UTF8.GetString(result);
+ json.ShouldContain("\"server_id\"");
+ json.ShouldContain("\"version\"");
+ json.ShouldContain("\"host\"");
+ json.ShouldContain("\"port\"");
+
+ await server.ShutdownAsync();
+ }
+
+ [Fact]
+ public async Task Healthz_request_reply_returns_ok()
+ {
+ using var server = CreateTestServer();
+ _ = server.StartAsync(CancellationToken.None);
+ await server.WaitForReadyAsync();
+
+ var received = new TaskCompletionSource();
+ var replySubject = $"_INBOX.test.{Guid.NewGuid():N}";
+ server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) =>
+ {
+ received.TrySetResult(msg.ToArray());
+ });
+
+ var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "HEALTHZ");
+ server.SendInternalMsg(reqSubject, replySubject, null);
+
+ var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
+ var json = Encoding.UTF8.GetString(result);
+ json.ShouldContain("ok");
+
+ await server.ShutdownAsync();
+ }
+
+ [Fact]
+ public async Task Subsz_request_reply_returns_subscription_count()
+ {
+ using var server = CreateTestServer();
+ _ = server.StartAsync(CancellationToken.None);
+ await server.WaitForReadyAsync();
+
+ var received = new TaskCompletionSource();
+ var replySubject = $"_INBOX.test.{Guid.NewGuid():N}";
+ server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) =>
+ {
+ received.TrySetResult(msg.ToArray());
+ });
+
+ var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "SUBSZ");
+ server.SendInternalMsg(reqSubject, replySubject, null);
+
+ var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
+ var json = Encoding.UTF8.GetString(result);
+ json.ShouldContain("\"num_subscriptions\"");
+
+ await server.ShutdownAsync();
+ }
+
+ [Fact]
+ public async Task Idz_request_reply_returns_server_identity()
+ {
+ using var server = CreateTestServer();
+ _ = server.StartAsync(CancellationToken.None);
+ await server.WaitForReadyAsync();
+
+ var received = new TaskCompletionSource();
+ var replySubject = $"_INBOX.test.{Guid.NewGuid():N}";
+ server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) =>
+ {
+ received.TrySetResult(msg.ToArray());
+ });
+
+ var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "IDZ");
+ server.SendInternalMsg(reqSubject, replySubject, null);
+
+ var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
+ var json = Encoding.UTF8.GetString(result);
+ json.ShouldContain("\"server_id\"");
+ json.ShouldContain("\"server_name\"");
+
+ await server.ShutdownAsync();
+ }
+
+ [Fact]
+ public async Task Ping_varz_responds_via_wildcard_subject()
+ {
+ using var server = CreateTestServer();
+ _ = server.StartAsync(CancellationToken.None);
+ await server.WaitForReadyAsync();
+
+ var received = new TaskCompletionSource();
+ var replySubject = $"_INBOX.test.{Guid.NewGuid():N}";
+ server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) =>
+ {
+ received.TrySetResult(msg.ToArray());
+ });
+
+ var pingSubject = string.Format(EventSubjects.ServerPing, "VARZ");
+ server.SendInternalMsg(pingSubject, replySubject, null);
+
+ var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
+ var json = Encoding.UTF8.GetString(result);
+ json.ShouldContain("\"server_id\"");
+
+ await server.ShutdownAsync();
+ }
+
+ [Fact]
+ public async Task Request_without_reply_is_ignored()
+ {
+ using var server = CreateTestServer();
+ _ = server.StartAsync(CancellationToken.None);
+ await server.WaitForReadyAsync();
+
+ // Send a request with no reply subject -- should not crash
+ var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "VARZ");
+ server.SendInternalMsg(reqSubject, null, null);
+
+ // Give it a moment to process without error
+ await Task.Delay(200);
+
+ // Server should still be running
+ server.IsShuttingDown.ShouldBeFalse();
+
+ await server.ShutdownAsync();
+ }
+
+ private static NatsServer CreateTestServer()
+ {
+ var port = GetFreePort();
+ return new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance);
+ }
+
+ private static int GetFreePort()
+ {
+ using var sock = new System.Net.Sockets.Socket(
+ System.Net.Sockets.AddressFamily.InterNetwork,
+ System.Net.Sockets.SocketType.Stream,
+ System.Net.Sockets.ProtocolType.Tcp);
+ sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0));
+ return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port;
+ }
+}