From 1a777e09c90061029fa316aee474e832d1890053 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 22 Feb 2026 22:04:43 -0500 Subject: [PATCH] feat: add ServerStats counters and NatsClient metadata for monitoring --- src/NATS.Server/NatsClient.cs | 23 +++++- src/NATS.Server/NatsServer.cs | 12 ++- src/NATS.Server/ServerStats.cs | 20 +++++ tests/NATS.Server.Tests/ClientTests.cs | 2 +- tests/NATS.Server.Tests/ServerStatsTests.cs | 84 +++++++++++++++++++++ 5 files changed, 138 insertions(+), 3 deletions(-) create mode 100644 src/NATS.Server/ServerStats.cs create mode 100644 tests/NATS.Server.Tests/ServerStatsTests.cs diff --git a/src/NATS.Server/NatsClient.cs b/src/NATS.Server/NatsClient.cs index df04363..31e69a0 100644 --- a/src/NATS.Server/NatsClient.cs +++ b/src/NATS.Server/NatsClient.cs @@ -1,5 +1,6 @@ using System.Buffers; using System.IO.Pipelines; +using System.Net; using System.Net.Sockets; using System.Text; using System.Text.Json; @@ -32,11 +33,17 @@ public sealed class NatsClient : IDisposable private CancellationTokenSource? _clientCts; private readonly Dictionary _subs = new(); private readonly ILogger _logger; + private readonly ServerStats _serverStats; public ulong Id { get; } public ClientOptions? ClientOpts { get; private set; } public IMessageRouter? Router { get; set; } public bool ConnectReceived { get; private set; } + public DateTime StartTime { get; } + private long _lastActivityTicks; + public DateTime LastActivity => new(Interlocked.Read(ref _lastActivityTicks), DateTimeKind.Utc); + public string? RemoteIp { get; } + public int RemotePort { get; } // Stats public long InMsgs; @@ -50,7 +57,8 @@ public sealed class NatsClient : IDisposable public IReadOnlyDictionary Subscriptions => _subs; - public NatsClient(ulong id, Socket socket, NatsOptions options, ServerInfo serverInfo, ILogger logger) + public NatsClient(ulong id, Socket socket, NatsOptions options, ServerInfo serverInfo, + ILogger logger, ServerStats serverStats) { Id = id; _socket = socket; @@ -58,7 +66,15 @@ public sealed class NatsClient : IDisposable _options = options; _serverInfo = serverInfo; _logger = logger; + _serverStats = serverStats; _parser = new NatsParser(options.MaxPayload); + StartTime = DateTime.UtcNow; + _lastActivityTicks = StartTime.Ticks; + if (socket.RemoteEndPoint is IPEndPoint ep) + { + RemoteIp = ep.Address.ToString(); + RemotePort = ep.Port; + } } public async Task RunAsync(CancellationToken ct) @@ -147,6 +163,7 @@ public sealed class NatsClient : IDisposable private async ValueTask DispatchCommandAsync(ParsedCommand cmd, CancellationToken ct) { + Interlocked.Exchange(ref _lastActivityTicks, DateTime.UtcNow.Ticks); switch (cmd.Type) { case CommandType.Connect: @@ -226,6 +243,8 @@ public sealed class NatsClient : IDisposable { Interlocked.Increment(ref InMsgs); Interlocked.Add(ref InBytes, cmd.Payload.Length); + Interlocked.Increment(ref _serverStats.InMsgs); + Interlocked.Add(ref _serverStats.InBytes, cmd.Payload.Length); // Max payload validation (always, hard close) if (cmd.Payload.Length > _options.MaxPayload) @@ -268,6 +287,8 @@ public sealed class NatsClient : IDisposable { Interlocked.Increment(ref OutMsgs); Interlocked.Add(ref OutBytes, payload.Length + headers.Length); + Interlocked.Increment(ref _serverStats.OutMsgs); + Interlocked.Add(ref _serverStats.OutBytes, payload.Length + headers.Length); byte[] line; if (headers.Length > 0) diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 5900aef..b49c9d5 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -16,11 +16,19 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable private readonly ServerInfo _serverInfo; private readonly ILogger _logger; private readonly ILoggerFactory _loggerFactory; + private readonly ServerStats _stats = new(); private readonly TaskCompletionSource _listeningStarted = new(TaskCreationOptions.RunContinuationsAsynchronously); private Socket? _listener; private ulong _nextClientId; + private long _startTimeTicks; public SubList SubList => _subList; + public ServerStats Stats => _stats; + public DateTime StartTime => new(Interlocked.Read(ref _startTimeTicks), DateTimeKind.Utc); + public string ServerId => _serverInfo.ServerId; + public string ServerName => _serverInfo.ServerName; + public int ClientCount => _clients.Count; + public IEnumerable GetClients() => _clients.Values; public Task WaitForReadyAsync() => _listeningStarted.Task; @@ -47,6 +55,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _listener.Bind(new IPEndPoint( _options.Host == "0.0.0.0" ? IPAddress.Any : IPAddress.Parse(_options.Host), _options.Port)); + Interlocked.Exchange(ref _startTimeTicks, DateTime.UtcNow.Ticks); _listener.Listen(128); _listeningStarted.TrySetResult(); @@ -84,11 +93,12 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable } var clientId = Interlocked.Increment(ref _nextClientId); + Interlocked.Increment(ref _stats.TotalConnections); _logger.LogDebug("Client {ClientId} connected from {RemoteEndpoint}", clientId, socket.RemoteEndPoint); var clientLogger = _loggerFactory.CreateLogger($"NATS.Server.NatsClient[{clientId}]"); - var client = new NatsClient(clientId, socket, _options, _serverInfo, clientLogger); + var client = new NatsClient(clientId, socket, _options, _serverInfo, clientLogger, _stats); client.Router = this; _clients[clientId] = client; diff --git a/src/NATS.Server/ServerStats.cs b/src/NATS.Server/ServerStats.cs new file mode 100644 index 0000000..b737dee --- /dev/null +++ b/src/NATS.Server/ServerStats.cs @@ -0,0 +1,20 @@ +using System.Collections.Concurrent; + +namespace NATS.Server; + +public sealed class ServerStats +{ + public long InMsgs; + public long OutMsgs; + public long InBytes; + public long OutBytes; + public long TotalConnections; + public long SlowConsumers; + public long StaleConnections; + public long Stalls; + public long SlowConsumerClients; + public long SlowConsumerRoutes; + public long SlowConsumerLeafs; + public long SlowConsumerGateways; + public readonly ConcurrentDictionary HttpReqStats = new(); +} diff --git a/tests/NATS.Server.Tests/ClientTests.cs b/tests/NATS.Server.Tests/ClientTests.cs index 9e6b9a8..ce65f18 100644 --- a/tests/NATS.Server.Tests/ClientTests.cs +++ b/tests/NATS.Server.Tests/ClientTests.cs @@ -39,7 +39,7 @@ public class ClientTests : IAsyncDisposable Port = 4222, }; - _natsClient = new NatsClient(1, _serverSocket, new NatsOptions(), serverInfo, NullLogger.Instance); + _natsClient = new NatsClient(1, _serverSocket, new NatsOptions(), serverInfo, NullLogger.Instance, new ServerStats()); } public async ValueTask DisposeAsync() diff --git a/tests/NATS.Server.Tests/ServerStatsTests.cs b/tests/NATS.Server.Tests/ServerStatsTests.cs new file mode 100644 index 0000000..6b45bd3 --- /dev/null +++ b/tests/NATS.Server.Tests/ServerStatsTests.cs @@ -0,0 +1,84 @@ +using System.Net; +using System.Net.Sockets; +using Microsoft.Extensions.Logging.Abstractions; +using NATS.Server; + +namespace NATS.Server.Tests; + +public class ServerStatsTests : IAsyncLifetime +{ + private readonly NatsServer _server; + private readonly int _port; + private readonly CancellationTokenSource _cts = new(); + + public ServerStatsTests() + { + _port = GetFreePort(); + _server = new NatsServer(new NatsOptions { Port = _port }, NullLoggerFactory.Instance); + } + + public async Task InitializeAsync() + { + _ = _server.StartAsync(_cts.Token); + await _server.WaitForReadyAsync(); + } + + public Task DisposeAsync() + { + _cts.Cancel(); + _server.Dispose(); + return Task.CompletedTask; + } + + [Fact] + public void Server_has_start_time() + { + _server.StartTime.ShouldNotBe(default); + _server.StartTime.ShouldBeLessThanOrEqualTo(DateTime.UtcNow); + } + + [Fact] + public async Task Server_tracks_total_connections() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _port)); + await Task.Delay(100); + _server.Stats.TotalConnections.ShouldBeGreaterThanOrEqualTo(1); + } + + [Fact] + public async Task Server_stats_track_messages() + { + using var pub = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await pub.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _port)); + + var buf = new byte[4096]; + await pub.ReceiveAsync(buf, SocketFlags.None); // INFO + + await pub.SendAsync("CONNECT {}\r\nSUB test 1\r\nPUB test 5\r\nhello\r\n"u8.ToArray()); + await Task.Delay(200); + + _server.Stats.InMsgs.ShouldBeGreaterThanOrEqualTo(1); + _server.Stats.InBytes.ShouldBeGreaterThanOrEqualTo(5); + } + + [Fact] + public async Task Client_has_metadata() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + await sock.ConnectAsync(new IPEndPoint(IPAddress.Loopback, _port)); + await Task.Delay(100); + + var client = _server.GetClients().First(); + client.RemoteIp.ShouldNotBeNullOrEmpty(); + client.RemotePort.ShouldBeGreaterThan(0); + client.StartTime.ShouldNotBe(default); + } + + private static int GetFreePort() + { + using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); + sock.Bind(new IPEndPoint(IPAddress.Loopback, 0)); + return ((IPEndPoint)sock.LocalEndPoint!).Port; + } +}