feat: add ServerStats counters and NatsClient metadata for monitoring

This commit is contained in:
Joseph Doherty
2026-02-22 22:04:43 -05:00
parent ceaafc48d4
commit 1a777e09c9
5 changed files with 138 additions and 3 deletions

View File

@@ -1,5 +1,6 @@
using System.Buffers;
using System.IO.Pipelines;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Text.Json;
@@ -32,11 +33,17 @@ public sealed class NatsClient : IDisposable
private CancellationTokenSource? _clientCts;
private readonly Dictionary<string, Subscription> _subs = new();
private readonly ILogger _logger;
private readonly ServerStats _serverStats;
public ulong Id { get; }
public ClientOptions? ClientOpts { get; private set; }
public IMessageRouter? Router { get; set; }
public bool ConnectReceived { get; private set; }
public DateTime StartTime { get; }
private long _lastActivityTicks;
public DateTime LastActivity => new(Interlocked.Read(ref _lastActivityTicks), DateTimeKind.Utc);
public string? RemoteIp { get; }
public int RemotePort { get; }
// Stats
public long InMsgs;
@@ -50,7 +57,8 @@ public sealed class NatsClient : IDisposable
public IReadOnlyDictionary<string, Subscription> Subscriptions => _subs;
public NatsClient(ulong id, Socket socket, NatsOptions options, ServerInfo serverInfo, ILogger logger)
public NatsClient(ulong id, Socket socket, NatsOptions options, ServerInfo serverInfo,
ILogger logger, ServerStats serverStats)
{
Id = id;
_socket = socket;
@@ -58,7 +66,15 @@ public sealed class NatsClient : IDisposable
_options = options;
_serverInfo = serverInfo;
_logger = logger;
_serverStats = serverStats;
_parser = new NatsParser(options.MaxPayload);
StartTime = DateTime.UtcNow;
_lastActivityTicks = StartTime.Ticks;
if (socket.RemoteEndPoint is IPEndPoint ep)
{
RemoteIp = ep.Address.ToString();
RemotePort = ep.Port;
}
}
public async Task RunAsync(CancellationToken ct)
@@ -147,6 +163,7 @@ public sealed class NatsClient : IDisposable
private async ValueTask DispatchCommandAsync(ParsedCommand cmd, CancellationToken ct)
{
Interlocked.Exchange(ref _lastActivityTicks, DateTime.UtcNow.Ticks);
switch (cmd.Type)
{
case CommandType.Connect:
@@ -226,6 +243,8 @@ public sealed class NatsClient : IDisposable
{
Interlocked.Increment(ref InMsgs);
Interlocked.Add(ref InBytes, cmd.Payload.Length);
Interlocked.Increment(ref _serverStats.InMsgs);
Interlocked.Add(ref _serverStats.InBytes, cmd.Payload.Length);
// Max payload validation (always, hard close)
if (cmd.Payload.Length > _options.MaxPayload)
@@ -268,6 +287,8 @@ public sealed class NatsClient : IDisposable
{
Interlocked.Increment(ref OutMsgs);
Interlocked.Add(ref OutBytes, payload.Length + headers.Length);
Interlocked.Increment(ref _serverStats.OutMsgs);
Interlocked.Add(ref _serverStats.OutBytes, payload.Length + headers.Length);
byte[] line;
if (headers.Length > 0)

View File

@@ -16,11 +16,19 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
private readonly ServerInfo _serverInfo;
private readonly ILogger<NatsServer> _logger;
private readonly ILoggerFactory _loggerFactory;
private readonly ServerStats _stats = new();
private readonly TaskCompletionSource _listeningStarted = new(TaskCreationOptions.RunContinuationsAsynchronously);
private Socket? _listener;
private ulong _nextClientId;
private long _startTimeTicks;
public SubList SubList => _subList;
public ServerStats Stats => _stats;
public DateTime StartTime => new(Interlocked.Read(ref _startTimeTicks), DateTimeKind.Utc);
public string ServerId => _serverInfo.ServerId;
public string ServerName => _serverInfo.ServerName;
public int ClientCount => _clients.Count;
public IEnumerable<NatsClient> GetClients() => _clients.Values;
public Task WaitForReadyAsync() => _listeningStarted.Task;
@@ -47,6 +55,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
_listener.Bind(new IPEndPoint(
_options.Host == "0.0.0.0" ? IPAddress.Any : IPAddress.Parse(_options.Host),
_options.Port));
Interlocked.Exchange(ref _startTimeTicks, DateTime.UtcNow.Ticks);
_listener.Listen(128);
_listeningStarted.TrySetResult();
@@ -84,11 +93,12 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
}
var clientId = Interlocked.Increment(ref _nextClientId);
Interlocked.Increment(ref _stats.TotalConnections);
_logger.LogDebug("Client {ClientId} connected from {RemoteEndpoint}", clientId, socket.RemoteEndPoint);
var clientLogger = _loggerFactory.CreateLogger($"NATS.Server.NatsClient[{clientId}]");
var client = new NatsClient(clientId, socket, _options, _serverInfo, clientLogger);
var client = new NatsClient(clientId, socket, _options, _serverInfo, clientLogger, _stats);
client.Router = this;
_clients[clientId] = client;

View File

@@ -0,0 +1,20 @@
using System.Collections.Concurrent;
namespace NATS.Server;
public sealed class ServerStats
{
public long InMsgs;
public long OutMsgs;
public long InBytes;
public long OutBytes;
public long TotalConnections;
public long SlowConsumers;
public long StaleConnections;
public long Stalls;
public long SlowConsumerClients;
public long SlowConsumerRoutes;
public long SlowConsumerLeafs;
public long SlowConsumerGateways;
public readonly ConcurrentDictionary<string, long> HttpReqStats = new();
}

View File

@@ -39,7 +39,7 @@ public class ClientTests : IAsyncDisposable
Port = 4222,
};
_natsClient = new NatsClient(1, _serverSocket, new NatsOptions(), serverInfo, NullLogger.Instance);
_natsClient = new NatsClient(1, _serverSocket, new NatsOptions(), serverInfo, NullLogger.Instance, new ServerStats());
}
public async ValueTask DisposeAsync()

View File

@@ -0,0 +1,84 @@
using System.Net;
using System.Net.Sockets;
using Microsoft.Extensions.Logging.Abstractions;
using NATS.Server;
namespace NATS.Server.Tests;
public class ServerStatsTests : IAsyncLifetime
{
private readonly NatsServer _server;
private readonly int _port;
private readonly CancellationTokenSource _cts = new();
public ServerStatsTests()
{
_port = GetFreePort();
_server = new NatsServer(new NatsOptions { Port = _port }, NullLoggerFactory.Instance);
}
public async Task InitializeAsync()
{
_ = _server.StartAsync(_cts.Token);
await _server.WaitForReadyAsync();
}
public Task DisposeAsync()
{
_cts.Cancel();
_server.Dispose();
return Task.CompletedTask;
}
[Fact]
public void Server_has_start_time()
{
_server.StartTime.ShouldNotBe(default);
_server.StartTime.ShouldBeLessThanOrEqualTo(DateTime.UtcNow);
}
[Fact]
public async Task Server_tracks_total_connections()
{
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _port));
await Task.Delay(100);
_server.Stats.TotalConnections.ShouldBeGreaterThanOrEqualTo(1);
}
[Fact]
public async Task Server_stats_track_messages()
{
using var pub = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await pub.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _port));
var buf = new byte[4096];
await pub.ReceiveAsync(buf, SocketFlags.None); // INFO
await pub.SendAsync("CONNECT {}\r\nSUB test 1\r\nPUB test 5\r\nhello\r\n"u8.ToArray());
await Task.Delay(200);
_server.Stats.InMsgs.ShouldBeGreaterThanOrEqualTo(1);
_server.Stats.InBytes.ShouldBeGreaterThanOrEqualTo(5);
}
[Fact]
public async Task Client_has_metadata()
{
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _port));
await Task.Delay(100);
var client = _server.GetClients().First();
client.RemoteIp.ShouldNotBeNullOrEmpty();
client.RemotePort.ShouldBeGreaterThan(0);
client.StartTime.ShouldNotBe(default);
}
private static int GetFreePort()
{
using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
sock.Bind(new IPEndPoint(IPAddress.Loopback, 0));
return ((IPEndPoint)sock.LocalEndPoint!).Port;
}
}