feat: add closed connection ring buffer for /connz?state=closed (Gap 10.1)

Replace ConcurrentQueue<ClosedClient> with ClosedConnectionRingBuffer — a
fixed-size ring buffer that overwrites oldest entries when full, eliminating
the manual dequeue-to-cap loop. Adds TotalClosed lifetime counter, GetRecent(),
and Clear(). Wires the ring buffer into NatsServer including capacity resize
on config reload. Adds 10 unit tests covering capacity, ordering, wrapping,
TotalClosed tracking, and Clear behavior.
This commit is contained in:
Joseph Doherty
2026-02-25 13:07:16 -05:00
parent 9fb2ae205c
commit 9ece600ebc
3 changed files with 354 additions and 9 deletions

View File

@@ -0,0 +1,104 @@
namespace NATS.Server.Monitoring;
/// <summary>
/// Fixed-size ring buffer for tracking recently closed client connections.
/// When full, oldest entries are overwritten. Thread-safe via Lock.
/// Go reference: server.go closedClients / ClosedState tracking.
/// </summary>
public sealed class ClosedConnectionRingBuffer
{
private readonly ClosedClient[] _buffer;
private readonly Lock _lock = new();
private int _head; // Next write position
private int _count; // Current count (up to capacity)
private long _totalClosed; // Running total of all closed connections ever
public ClosedConnectionRingBuffer(int capacity = 1024)
{
if (capacity <= 0) throw new ArgumentOutOfRangeException(nameof(capacity), "Capacity must be greater than zero.");
_buffer = new ClosedClient[capacity];
}
public int Capacity => _buffer.Length;
public int Count
{
get { lock (_lock) return _count; }
}
public long TotalClosed
{
get { lock (_lock) return _totalClosed; }
}
/// <summary>
/// Adds a closed connection snapshot. If the buffer is full the oldest entry is overwritten.
/// </summary>
public void Add(ClosedClient info)
{
lock (_lock)
{
_buffer[_head] = info;
_head = (_head + 1) % _buffer.Length;
if (_count < _buffer.Length)
_count++;
_totalClosed++;
}
}
/// <summary>
/// Returns a snapshot of all entries ordered newest-first.
/// </summary>
public IReadOnlyList<ClosedClient> GetAll()
{
lock (_lock)
{
return BuildSnapshot(_count);
}
}
/// <summary>
/// Returns up to <paramref name="count"/> most recent entries, ordered newest-first.
/// </summary>
public IReadOnlyList<ClosedClient> GetRecent(int count)
{
lock (_lock)
{
var take = Math.Min(count, _count);
return BuildSnapshot(take);
}
}
/// <summary>
/// Clears all entries and resets counters.
/// </summary>
public void Clear()
{
lock (_lock)
{
Array.Clear(_buffer, 0, _buffer.Length);
_head = 0;
_count = 0;
// TotalClosed is intentionally not reset — it is a running lifetime total.
}
}
// Must be called while _lock is held.
// Returns the 'take' most recent entries, newest-first.
private ClosedClient[] BuildSnapshot(int take)
{
if (take == 0) return [];
var result = new ClosedClient[take];
var capacity = _buffer.Length;
// _head points to the next write slot, so (_head - 1) is the most recent entry.
for (var i = 0; i < take; i++)
{
var idx = ((_head - 1 - i) % capacity + capacity) % capacity;
result[i] = _buffer[idx];
}
return result;
}
}

View File

@@ -32,7 +32,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
{
private readonly NatsOptions _options;
private readonly ConcurrentDictionary<ulong, NatsClient> _clients = new();
private readonly ConcurrentQueue<ClosedClient> _closedClients = new();
private ClosedConnectionRingBuffer _closedClients;
private readonly ServerInfo _serverInfo;
private readonly ILogger<NatsServer> _logger;
private readonly ILoggerFactory _loggerFactory;
@@ -118,7 +118,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
public Action? ReOpenLogFile { get; set; }
public IEnumerable<NatsClient> GetClients() => _clients.Values;
public IEnumerable<ClosedClient> GetClosedClients() => _closedClients;
public IReadOnlyList<ClosedClient> GetClosedClients() => _closedClients.GetAll();
public IEnumerable<Auth.Account> GetAccounts() => _accounts.Values;
public bool HasRemoteInterest(string subject) => _globalAccount.SubList.HasRemoteInterest(subject);
@@ -364,6 +364,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
_options = options;
_loggerFactory = loggerFactory;
_logger = loggerFactory.CreateLogger<NatsServer>();
_closedClients = new ClosedConnectionRingBuffer(options.MaxClosedClients);
_authService = AuthService.Build(options);
_globalAccount = new Account(Account.GlobalAccountName);
_accounts[Account.GlobalAccountName] = _globalAccount;
@@ -1509,8 +1510,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
_clients.TryRemove(client.Id, out _);
_logger.LogDebug("Removed client {ClientId}", client.Id);
// Snapshot for closed-connections tracking
_closedClients.Enqueue(new ClosedClient
// Snapshot for closed-connections tracking (ring buffer auto-overwrites oldest when full)
_closedClients.Add(new ClosedClient
{
Cid = client.Id,
Ip = client.RemoteIp ?? "",
@@ -1538,10 +1539,6 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
Proxy = client.ClientOpts?.Username?.StartsWith("proxy:", StringComparison.Ordinal) == true ? "true" : "",
});
// Cap closed clients list
while (_closedClients.Count > _options.MaxClosedClients)
_closedClients.TryDequeue(out _);
var subList = client.Account?.SubList ?? _globalAccount.SubList;
client.RemoveAllSubscriptions(subList);
client.Account?.RemoveClient(client.Id);
@@ -1830,7 +1827,11 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
_options.MaxSubs = newOpts.MaxSubs;
_options.MaxSubTokens = newOpts.MaxSubTokens;
_options.MaxTracedMsgLen = newOpts.MaxTracedMsgLen;
_options.MaxClosedClients = newOpts.MaxClosedClients;
if (newOpts.MaxClosedClients != _options.MaxClosedClients)
{
_options.MaxClosedClients = newOpts.MaxClosedClients;
_closedClients = new ClosedConnectionRingBuffer(newOpts.MaxClosedClients);
}
// TLS
_options.TlsCert = newOpts.TlsCert;