feat: add periodic server stats and account connection heartbeat publishing
This commit is contained in:
@@ -85,6 +85,50 @@ public sealed class InternalEventSystem : IAsyncDisposable
|
|||||||
_sendLoop = Task.Run(() => InternalSendLoopAsync(ct), ct);
|
_sendLoop = Task.Run(() => InternalSendLoopAsync(ct), ct);
|
||||||
_receiveLoop = Task.Run(() => InternalReceiveLoopAsync(_receiveQueue, ct), ct);
|
_receiveLoop = Task.Run(() => InternalReceiveLoopAsync(_receiveQueue, ct), ct);
|
||||||
_receiveLoopPings = Task.Run(() => InternalReceiveLoopAsync(_receiveQueuePings, ct), ct);
|
_receiveLoopPings = Task.Run(() => InternalReceiveLoopAsync(_receiveQueuePings, ct), ct);
|
||||||
|
|
||||||
|
// Periodic stats publish every 10 seconds
|
||||||
|
_ = Task.Run(async () =>
|
||||||
|
{
|
||||||
|
using var timer = new PeriodicTimer(TimeSpan.FromSeconds(10));
|
||||||
|
while (await timer.WaitForNextTickAsync(ct))
|
||||||
|
{
|
||||||
|
PublishServerStats();
|
||||||
|
}
|
||||||
|
}, ct);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Publishes a $SYS.SERVER.{id}.STATSZ message with current server statistics.
|
||||||
|
/// Maps to Go's sendStatsz in events.go.
|
||||||
|
/// Can be called manually for testing or is invoked periodically by the stats timer.
|
||||||
|
/// </summary>
|
||||||
|
public void PublishServerStats()
|
||||||
|
{
|
||||||
|
if (_server == null) return;
|
||||||
|
|
||||||
|
var subject = string.Format(EventSubjects.ServerStats, _server.ServerId);
|
||||||
|
var process = System.Diagnostics.Process.GetCurrentProcess();
|
||||||
|
|
||||||
|
var statsMsg = new ServerStatsMsg
|
||||||
|
{
|
||||||
|
Server = _server.BuildEventServerInfo(),
|
||||||
|
Stats = new ServerStatsData
|
||||||
|
{
|
||||||
|
Start = _server.StartTime,
|
||||||
|
Mem = process.WorkingSet64,
|
||||||
|
Cores = Environment.ProcessorCount,
|
||||||
|
Connections = _server.ClientCount,
|
||||||
|
TotalConnections = Interlocked.Read(ref _server.Stats.TotalConnections),
|
||||||
|
Subscriptions = SystemAccount.SubList.Count,
|
||||||
|
InMsgs = Interlocked.Read(ref _server.Stats.InMsgs),
|
||||||
|
OutMsgs = Interlocked.Read(ref _server.Stats.OutMsgs),
|
||||||
|
InBytes = Interlocked.Read(ref _server.Stats.InBytes),
|
||||||
|
OutBytes = Interlocked.Read(ref _server.Stats.OutBytes),
|
||||||
|
SlowConsumers = Interlocked.Read(ref _server.Stats.SlowConsumers),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
Enqueue(new PublishMessage { Subject = subject, Body = statsMsg });
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
|||||||
@@ -74,6 +74,28 @@ public class SystemEventsTests
|
|||||||
await server.ShutdownAsync();
|
await server.ShutdownAsync();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Server_publishes_statsz_periodically()
|
||||||
|
{
|
||||||
|
using var server = CreateTestServer();
|
||||||
|
_ = server.StartAsync(CancellationToken.None);
|
||||||
|
await server.WaitForReadyAsync();
|
||||||
|
|
||||||
|
var received = new TaskCompletionSource<string>();
|
||||||
|
server.EventSystem!.SysSubscribe("$SYS.SERVER.*.STATSZ", (sub, client, acc, subject, reply, hdr, msg) =>
|
||||||
|
{
|
||||||
|
received.TrySetResult(subject);
|
||||||
|
});
|
||||||
|
|
||||||
|
// Trigger a manual stats publish (don't wait 10s)
|
||||||
|
server.EventSystem!.PublishServerStats();
|
||||||
|
|
||||||
|
var result = await received.Task.WaitAsync(TimeSpan.FromSeconds(5));
|
||||||
|
result.ShouldContain(".STATSZ");
|
||||||
|
|
||||||
|
await server.ShutdownAsync();
|
||||||
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
public async Task Server_publishes_shutdown_event()
|
public async Task Server_publishes_shutdown_event()
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user