diff --git a/src/NATS.Server/Events/InternalEventSystem.cs b/src/NATS.Server/Events/InternalEventSystem.cs index d0c0964..f8f49f1 100644 --- a/src/NATS.Server/Events/InternalEventSystem.cs +++ b/src/NATS.Server/Events/InternalEventSystem.cs @@ -85,6 +85,50 @@ public sealed class InternalEventSystem : IAsyncDisposable _sendLoop = Task.Run(() => InternalSendLoopAsync(ct), ct); _receiveLoop = Task.Run(() => InternalReceiveLoopAsync(_receiveQueue, ct), ct); _receiveLoopPings = Task.Run(() => InternalReceiveLoopAsync(_receiveQueuePings, ct), ct); + + // Periodic stats publish every 10 seconds + _ = Task.Run(async () => + { + using var timer = new PeriodicTimer(TimeSpan.FromSeconds(10)); + while (await timer.WaitForNextTickAsync(ct)) + { + PublishServerStats(); + } + }, ct); + } + + /// + /// Publishes a $SYS.SERVER.{id}.STATSZ message with current server statistics. + /// Maps to Go's sendStatsz in events.go. + /// Can be called manually for testing or is invoked periodically by the stats timer. + /// + public void PublishServerStats() + { + if (_server == null) return; + + var subject = string.Format(EventSubjects.ServerStats, _server.ServerId); + var process = System.Diagnostics.Process.GetCurrentProcess(); + + var statsMsg = new ServerStatsMsg + { + Server = _server.BuildEventServerInfo(), + Stats = new ServerStatsData + { + Start = _server.StartTime, + Mem = process.WorkingSet64, + Cores = Environment.ProcessorCount, + Connections = _server.ClientCount, + TotalConnections = Interlocked.Read(ref _server.Stats.TotalConnections), + Subscriptions = SystemAccount.SubList.Count, + InMsgs = Interlocked.Read(ref _server.Stats.InMsgs), + OutMsgs = Interlocked.Read(ref _server.Stats.OutMsgs), + InBytes = Interlocked.Read(ref _server.Stats.InBytes), + OutBytes = Interlocked.Read(ref _server.Stats.OutBytes), + SlowConsumers = Interlocked.Read(ref _server.Stats.SlowConsumers), + }, + }; + + Enqueue(new PublishMessage { Subject = subject, Body = statsMsg }); } /// diff --git a/tests/NATS.Server.Tests/SystemEventsTests.cs b/tests/NATS.Server.Tests/SystemEventsTests.cs index 7a11bdd..efd6d11 100644 --- a/tests/NATS.Server.Tests/SystemEventsTests.cs +++ b/tests/NATS.Server.Tests/SystemEventsTests.cs @@ -74,6 +74,28 @@ public class SystemEventsTests await server.ShutdownAsync(); } + [Fact] + public async Task Server_publishes_statsz_periodically() + { + using var server = CreateTestServer(); + _ = server.StartAsync(CancellationToken.None); + await server.WaitForReadyAsync(); + + var received = new TaskCompletionSource(); + server.EventSystem!.SysSubscribe("$SYS.SERVER.*.STATSZ", (sub, client, acc, subject, reply, hdr, msg) => + { + received.TrySetResult(subject); + }); + + // Trigger a manual stats publish (don't wait 10s) + server.EventSystem!.PublishServerStats(); + + var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5)); + result.ShouldContain(".STATSZ"); + + await server.ShutdownAsync(); + } + [Fact] public async Task Server_publishes_shutdown_event() {