feat: add system request-reply monitoring services ($SYS.REQ.SERVER.*)

Register VARZ, HEALTHZ, SUBSZ, STATSZ, and IDZ request-reply handlers
on $SYS.REQ.SERVER.{id}.* subjects and $SYS.REQ.SERVER.PING.* wildcard
subjects via InitEventTracking. Also excludes the $SYS system account
from the /subz monitoring endpoint by default since its subscriptions
are internal infrastructure.
This commit is contained in:
Joseph Doherty
2026-02-23 05:48:32 -05:00
parent 0b34f8cec4
commit 5bae9cc289
4 changed files with 309 additions and 3 deletions

View File

@@ -97,6 +97,45 @@ public sealed class InternalEventSystem : IAsyncDisposable
}, ct);
}
/// <summary>
/// Registers system request-reply monitoring services for this server.
/// Maps to Go's initEventTracking in events.go.
/// Sets up handlers for $SYS.REQ.SERVER.{id}.VARZ, HEALTHZ, SUBSZ, STATSZ, IDZ
/// and wildcard $SYS.REQ.SERVER.PING.* subjects.
/// </summary>
public void InitEventTracking(NatsServer server)
{
_server = server;
var serverId = server.ServerId;
// Server-specific monitoring services
RegisterService(serverId, "VARZ", server.HandleVarzRequest);
RegisterService(serverId, "HEALTHZ", server.HandleHealthzRequest);
RegisterService(serverId, "SUBSZ", server.HandleSubszRequest);
RegisterService(serverId, "STATSZ", server.HandleStatszRequest);
RegisterService(serverId, "IDZ", server.HandleIdzRequest);
// Wildcard ping services (all servers respond)
SysSubscribe(string.Format(EventSubjects.ServerPing, "VARZ"), WrapRequestHandler(server.HandleVarzRequest));
SysSubscribe(string.Format(EventSubjects.ServerPing, "HEALTHZ"), WrapRequestHandler(server.HandleHealthzRequest));
SysSubscribe(string.Format(EventSubjects.ServerPing, "IDZ"), WrapRequestHandler(server.HandleIdzRequest));
SysSubscribe(string.Format(EventSubjects.ServerPing, "STATSZ"), WrapRequestHandler(server.HandleStatszRequest));
}
private void RegisterService(string serverId, string name, Action<string, string?> handler)
{
var subject = string.Format(EventSubjects.ServerReq, serverId, name);
SysSubscribe(subject, WrapRequestHandler(handler));
}
private SystemMessageHandler WrapRequestHandler(Action<string, string?> handler)
{
return (sub, client, acc, subject, reply, hdr, msg) =>
{
handler(subject, reply);
};
}
/// <summary>
/// Publishes a $SYS.SERVER.{id}.STATSZ message with current server statistics.
/// Maps to Go's sendStatsz in events.go.

View File

@@ -14,12 +14,16 @@ public sealed class SubszHandler(NatsServer server)
var opts = ParseQueryParams(ctx);
var now = DateTime.UtcNow;
// Collect subscriptions from all accounts (or filtered)
// Collect subscriptions from all accounts (or filtered).
// Exclude the $SYS system account unless explicitly requested — its internal
// subscriptions are infrastructure and not user-facing.
var allSubs = new List<Subscription>();
foreach (var account in server.GetAccounts())
{
if (!string.IsNullOrEmpty(opts.Account) && account.Name != opts.Account)
continue;
if (string.IsNullOrEmpty(opts.Account) && account.Name == "$SYS")
continue;
allSubs.AddRange(account.SubList.GetAllSubscriptions());
}
@@ -31,10 +35,10 @@ public sealed class SubszHandler(NatsServer server)
var total = allSubs.Count;
var numSubs = server.GetAccounts()
.Where(a => string.IsNullOrEmpty(opts.Account) || a.Name == opts.Account)
.Where(a => (string.IsNullOrEmpty(opts.Account) && a.Name != "$SYS") || a.Name == opts.Account)
.Aggregate(0u, (sum, a) => sum + a.SubList.Count);
var numCache = server.GetAccounts()
.Where(a => string.IsNullOrEmpty(opts.Account) || a.Name == opts.Account)
.Where(a => (string.IsNullOrEmpty(opts.Account) && a.Name != "$SYS") || a.Name == opts.Account)
.Sum(a => a.SubList.CacheCount);
SubDetail[] details = [];

View File

@@ -398,6 +398,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
_listeningStarted.TrySetResult();
_eventSystem?.Start(this);
_eventSystem?.InitEventTracking(this);
_logger.LogInformation("Listening for client connections on {Host}:{Port}", _options.Host, _options.Port);
@@ -724,6 +725,98 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
_eventSystem?.Enqueue(new PublishMessage { Subject = subject, Body = msg });
}
/// <summary>
/// Handles $SYS.REQ.SERVER.{id}.VARZ requests.
/// Returns core server information including stats counters.
/// </summary>
public void HandleVarzRequest(string subject, string? reply)
{
if (reply == null) return;
var varz = new
{
server_id = _serverInfo.ServerId,
server_name = _serverInfo.ServerName,
version = NatsProtocol.Version,
host = _options.Host,
port = _options.Port,
max_payload = _options.MaxPayload,
connections = ClientCount,
total_connections = Interlocked.Read(ref _stats.TotalConnections),
in_msgs = Interlocked.Read(ref _stats.InMsgs),
out_msgs = Interlocked.Read(ref _stats.OutMsgs),
in_bytes = Interlocked.Read(ref _stats.InBytes),
out_bytes = Interlocked.Read(ref _stats.OutBytes),
};
SendInternalMsg(reply, null, varz);
}
/// <summary>
/// Handles $SYS.REQ.SERVER.{id}.HEALTHZ requests.
/// Returns a simple health status response.
/// </summary>
public void HandleHealthzRequest(string subject, string? reply)
{
if (reply == null) return;
SendInternalMsg(reply, null, new { status = "ok" });
}
/// <summary>
/// Handles $SYS.REQ.SERVER.{id}.SUBSZ requests.
/// Returns the current subscription count.
/// </summary>
public void HandleSubszRequest(string subject, string? reply)
{
if (reply == null) return;
SendInternalMsg(reply, null, new { num_subscriptions = SubList.Count });
}
/// <summary>
/// Handles $SYS.REQ.SERVER.{id}.STATSZ requests.
/// Publishes current server statistics through the event system.
/// </summary>
public void HandleStatszRequest(string subject, string? reply)
{
if (reply == null) return;
var process = System.Diagnostics.Process.GetCurrentProcess();
var statsMsg = new Events.ServerStatsMsg
{
Server = BuildEventServerInfo(),
Stats = new Events.ServerStatsData
{
Start = StartTime,
Mem = process.WorkingSet64,
Cores = Environment.ProcessorCount,
Connections = ClientCount,
TotalConnections = Interlocked.Read(ref _stats.TotalConnections),
Subscriptions = SubList.Count,
InMsgs = Interlocked.Read(ref _stats.InMsgs),
OutMsgs = Interlocked.Read(ref _stats.OutMsgs),
InBytes = Interlocked.Read(ref _stats.InBytes),
OutBytes = Interlocked.Read(ref _stats.OutBytes),
SlowConsumers = Interlocked.Read(ref _stats.SlowConsumers),
},
};
SendInternalMsg(reply, null, statsMsg);
}
/// <summary>
/// Handles $SYS.REQ.SERVER.{id}.IDZ requests.
/// Returns basic server identity information.
/// </summary>
public void HandleIdzRequest(string subject, string? reply)
{
if (reply == null) return;
var idz = new
{
server_id = _serverInfo.ServerId,
server_name = _serverInfo.ServerName,
version = NatsProtocol.Version,
host = _options.Host,
port = _options.Port,
};
SendInternalMsg(reply, null, idz);
}
/// <summary>
/// Builds an EventServerInfo block for embedding in system event messages.
/// Maps to Go's serverInfo() helper used in events.go advisory publishing.

View File

@@ -0,0 +1,170 @@
using System.Text;
using System.Text.Json;
using NATS.Server;
using NATS.Server.Events;
using Microsoft.Extensions.Logging.Abstractions;
namespace NATS.Server.Tests;
public class SystemRequestReplyTests
{
[Fact]
public async Task Varz_request_reply_returns_server_info()
{
using var server = CreateTestServer();
_ = server.StartAsync(CancellationToken.None);
await server.WaitForReadyAsync();
var received = new TaskCompletionSource<byte[]>();
var replySubject = $"_INBOX.test.{Guid.NewGuid():N}";
server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) =>
{
received.TrySetResult(msg.ToArray());
});
var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "VARZ");
server.SendInternalMsg(reqSubject, replySubject, null);
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
var json = Encoding.UTF8.GetString(result);
json.ShouldContain("\"server_id\"");
json.ShouldContain("\"version\"");
json.ShouldContain("\"host\"");
json.ShouldContain("\"port\"");
await server.ShutdownAsync();
}
[Fact]
public async Task Healthz_request_reply_returns_ok()
{
using var server = CreateTestServer();
_ = server.StartAsync(CancellationToken.None);
await server.WaitForReadyAsync();
var received = new TaskCompletionSource<byte[]>();
var replySubject = $"_INBOX.test.{Guid.NewGuid():N}";
server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) =>
{
received.TrySetResult(msg.ToArray());
});
var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "HEALTHZ");
server.SendInternalMsg(reqSubject, replySubject, null);
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
var json = Encoding.UTF8.GetString(result);
json.ShouldContain("ok");
await server.ShutdownAsync();
}
[Fact]
public async Task Subsz_request_reply_returns_subscription_count()
{
using var server = CreateTestServer();
_ = server.StartAsync(CancellationToken.None);
await server.WaitForReadyAsync();
var received = new TaskCompletionSource<byte[]>();
var replySubject = $"_INBOX.test.{Guid.NewGuid():N}";
server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) =>
{
received.TrySetResult(msg.ToArray());
});
var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "SUBSZ");
server.SendInternalMsg(reqSubject, replySubject, null);
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
var json = Encoding.UTF8.GetString(result);
json.ShouldContain("\"num_subscriptions\"");
await server.ShutdownAsync();
}
[Fact]
public async Task Idz_request_reply_returns_server_identity()
{
using var server = CreateTestServer();
_ = server.StartAsync(CancellationToken.None);
await server.WaitForReadyAsync();
var received = new TaskCompletionSource<byte[]>();
var replySubject = $"_INBOX.test.{Guid.NewGuid():N}";
server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) =>
{
received.TrySetResult(msg.ToArray());
});
var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "IDZ");
server.SendInternalMsg(reqSubject, replySubject, null);
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
var json = Encoding.UTF8.GetString(result);
json.ShouldContain("\"server_id\"");
json.ShouldContain("\"server_name\"");
await server.ShutdownAsync();
}
[Fact]
public async Task Ping_varz_responds_via_wildcard_subject()
{
using var server = CreateTestServer();
_ = server.StartAsync(CancellationToken.None);
await server.WaitForReadyAsync();
var received = new TaskCompletionSource<byte[]>();
var replySubject = $"_INBOX.test.{Guid.NewGuid():N}";
server.EventSystem!.SysSubscribe(replySubject, (sub, client, acc, subject, reply, hdr, msg) =>
{
received.TrySetResult(msg.ToArray());
});
var pingSubject = string.Format(EventSubjects.ServerPing, "VARZ");
server.SendInternalMsg(pingSubject, replySubject, null);
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
var json = Encoding.UTF8.GetString(result);
json.ShouldContain("\"server_id\"");
await server.ShutdownAsync();
}
[Fact]
public async Task Request_without_reply_is_ignored()
{
using var server = CreateTestServer();
_ = server.StartAsync(CancellationToken.None);
await server.WaitForReadyAsync();
// Send a request with no reply subject -- should not crash
var reqSubject = string.Format(EventSubjects.ServerReq, server.ServerId, "VARZ");
server.SendInternalMsg(reqSubject, null, null);
// Give it a moment to process without error
await Task.Delay(200);
// Server should still be running
server.IsShuttingDown.ShouldBeFalse();
await server.ShutdownAsync();
}
private static NatsServer CreateTestServer()
{
var port = GetFreePort();
return new NatsServer(new NatsOptions { Port = port }, NullLoggerFactory.Instance);
}
private static int GetFreePort()
{
using var sock = new System.Net.Sockets.Socket(
System.Net.Sockets.AddressFamily.InterNetwork,
System.Net.Sockets.SocketType.Stream,
System.Net.Sockets.ProtocolType.Tcp);
sock.Bind(new System.Net.IPEndPoint(System.Net.IPAddress.Loopback, 0));
return ((System.Net.IPEndPoint)sock.LocalEndPoint!).Port;
}
}