From 9ece600ebc2109fb1154f6ceecf78b356bf9e6e1 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 25 Feb 2026 13:07:16 -0500 Subject: [PATCH] feat: add closed connection ring buffer for /connz?state=closed (Gap 10.1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace ConcurrentQueue with ClosedConnectionRingBuffer — a fixed-size ring buffer that overwrites oldest entries when full, eliminating the manual dequeue-to-cap loop. Adds TotalClosed lifetime counter, GetRecent(), and Clear(). Wires the ring buffer into NatsServer including capacity resize on config reload. Adds 10 unit tests covering capacity, ordering, wrapping, TotalClosed tracking, and Clear behavior. --- .../Monitoring/ClosedConnectionRingBuffer.cs | 104 ++++++++ src/NATS.Server/NatsServer.cs | 19 +- .../ClosedConnectionRingBufferTests.cs | 240 ++++++++++++++++++ 3 files changed, 354 insertions(+), 9 deletions(-) create mode 100644 src/NATS.Server/Monitoring/ClosedConnectionRingBuffer.cs create mode 100644 tests/NATS.Server.Tests/Monitoring/ClosedConnectionRingBufferTests.cs diff --git a/src/NATS.Server/Monitoring/ClosedConnectionRingBuffer.cs b/src/NATS.Server/Monitoring/ClosedConnectionRingBuffer.cs new file mode 100644 index 0000000..a10f028 --- /dev/null +++ b/src/NATS.Server/Monitoring/ClosedConnectionRingBuffer.cs @@ -0,0 +1,104 @@ +namespace NATS.Server.Monitoring; + +/// +/// Fixed-size ring buffer for tracking recently closed client connections. +/// When full, oldest entries are overwritten. Thread-safe via Lock. +/// Go reference: server.go closedClients / ClosedState tracking. +/// +public sealed class ClosedConnectionRingBuffer +{ + private readonly ClosedClient[] _buffer; + private readonly Lock _lock = new(); + private int _head; // Next write position + private int _count; // Current count (up to capacity) + private long _totalClosed; // Running total of all closed connections ever + + public ClosedConnectionRingBuffer(int capacity = 1024) + { + if (capacity <= 0) throw new ArgumentOutOfRangeException(nameof(capacity), "Capacity must be greater than zero."); + _buffer = new ClosedClient[capacity]; + } + + public int Capacity => _buffer.Length; + + public int Count + { + get { lock (_lock) return _count; } + } + + public long TotalClosed + { + get { lock (_lock) return _totalClosed; } + } + + /// + /// Adds a closed connection snapshot. If the buffer is full the oldest entry is overwritten. + /// + public void Add(ClosedClient info) + { + lock (_lock) + { + _buffer[_head] = info; + _head = (_head + 1) % _buffer.Length; + if (_count < _buffer.Length) + _count++; + _totalClosed++; + } + } + + /// + /// Returns a snapshot of all entries ordered newest-first. + /// + public IReadOnlyList GetAll() + { + lock (_lock) + { + return BuildSnapshot(_count); + } + } + + /// + /// Returns up to most recent entries, ordered newest-first. + /// + public IReadOnlyList GetRecent(int count) + { + lock (_lock) + { + var take = Math.Min(count, _count); + return BuildSnapshot(take); + } + } + + /// + /// Clears all entries and resets counters. + /// + public void Clear() + { + lock (_lock) + { + Array.Clear(_buffer, 0, _buffer.Length); + _head = 0; + _count = 0; + // TotalClosed is intentionally not reset — it is a running lifetime total. + } + } + + // Must be called while _lock is held. + // Returns the 'take' most recent entries, newest-first. + private ClosedClient[] BuildSnapshot(int take) + { + if (take == 0) return []; + + var result = new ClosedClient[take]; + var capacity = _buffer.Length; + + // _head points to the next write slot, so (_head - 1) is the most recent entry. + for (var i = 0; i < take; i++) + { + var idx = ((_head - 1 - i) % capacity + capacity) % capacity; + result[i] = _buffer[idx]; + } + + return result; + } +} diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 90a27df..5ecedfd 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -32,7 +32,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable { private readonly NatsOptions _options; private readonly ConcurrentDictionary _clients = new(); - private readonly ConcurrentQueue _closedClients = new(); + private ClosedConnectionRingBuffer _closedClients; private readonly ServerInfo _serverInfo; private readonly ILogger _logger; private readonly ILoggerFactory _loggerFactory; @@ -118,7 +118,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable public Action? ReOpenLogFile { get; set; } public IEnumerable GetClients() => _clients.Values; - public IEnumerable GetClosedClients() => _closedClients; + public IReadOnlyList GetClosedClients() => _closedClients.GetAll(); public IEnumerable GetAccounts() => _accounts.Values; public bool HasRemoteInterest(string subject) => _globalAccount.SubList.HasRemoteInterest(subject); @@ -364,6 +364,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _options = options; _loggerFactory = loggerFactory; _logger = loggerFactory.CreateLogger(); + _closedClients = new ClosedConnectionRingBuffer(options.MaxClosedClients); _authService = AuthService.Build(options); _globalAccount = new Account(Account.GlobalAccountName); _accounts[Account.GlobalAccountName] = _globalAccount; @@ -1509,8 +1510,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _clients.TryRemove(client.Id, out _); _logger.LogDebug("Removed client {ClientId}", client.Id); - // Snapshot for closed-connections tracking - _closedClients.Enqueue(new ClosedClient + // Snapshot for closed-connections tracking (ring buffer auto-overwrites oldest when full) + _closedClients.Add(new ClosedClient { Cid = client.Id, Ip = client.RemoteIp ?? "", @@ -1538,10 +1539,6 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable Proxy = client.ClientOpts?.Username?.StartsWith("proxy:", StringComparison.Ordinal) == true ? "true" : "", }); - // Cap closed clients list - while (_closedClients.Count > _options.MaxClosedClients) - _closedClients.TryDequeue(out _); - var subList = client.Account?.SubList ?? _globalAccount.SubList; client.RemoveAllSubscriptions(subList); client.Account?.RemoveClient(client.Id); @@ -1830,7 +1827,11 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _options.MaxSubs = newOpts.MaxSubs; _options.MaxSubTokens = newOpts.MaxSubTokens; _options.MaxTracedMsgLen = newOpts.MaxTracedMsgLen; - _options.MaxClosedClients = newOpts.MaxClosedClients; + if (newOpts.MaxClosedClients != _options.MaxClosedClients) + { + _options.MaxClosedClients = newOpts.MaxClosedClients; + _closedClients = new ClosedConnectionRingBuffer(newOpts.MaxClosedClients); + } // TLS _options.TlsCert = newOpts.TlsCert; diff --git a/tests/NATS.Server.Tests/Monitoring/ClosedConnectionRingBufferTests.cs b/tests/NATS.Server.Tests/Monitoring/ClosedConnectionRingBufferTests.cs new file mode 100644 index 0000000..aaeda9d --- /dev/null +++ b/tests/NATS.Server.Tests/Monitoring/ClosedConnectionRingBufferTests.cs @@ -0,0 +1,240 @@ +// Go reference: server/monitor.go — closedClients ring buffer, ClosedState tracking. +// These tests verify the fixed-size ring buffer used to track recently closed connections +// for the /connz?state=closed monitoring endpoint. + +using NATS.Server.Monitoring; + +namespace NATS.Server.Tests.Monitoring; + +public class ClosedConnectionRingBufferTests +{ + // ----------------------------------------------------------------------- + // Helpers + // ----------------------------------------------------------------------- + + private static ClosedClient MakeEntry(ulong cid, string reason = "normal") => new() + { + Cid = cid, + Ip = "127.0.0.1", + Port = 4222, + Start = DateTime.UtcNow.AddSeconds(-10), + Stop = DateTime.UtcNow, + Reason = reason, + }; + + // ----------------------------------------------------------------------- + // 1. Add_IncreasesCount + // ----------------------------------------------------------------------- + + /// + /// Adding entries should increase Count up to capacity. + /// + [Fact] + public void Add_IncreasesCount() + { + var buf = new ClosedConnectionRingBuffer(capacity: 10); + + buf.Count.ShouldBe(0); + + buf.Add(MakeEntry(1)); + buf.Count.ShouldBe(1); + + buf.Add(MakeEntry(2)); + buf.Count.ShouldBe(2); + + buf.Add(MakeEntry(3)); + buf.Count.ShouldBe(3); + } + + // ----------------------------------------------------------------------- + // 2. Add_RingOverwrite_CapacityNotExceeded + // ----------------------------------------------------------------------- + + /// + /// When capacity is exceeded the count stays at capacity (oldest entry is overwritten). + /// + [Fact] + public void Add_RingOverwrite_CapacityNotExceeded() + { + const int capacity = 5; + var buf = new ClosedConnectionRingBuffer(capacity); + + for (var i = 1; i <= capacity + 3; i++) + buf.Add(MakeEntry((ulong)i)); + + buf.Count.ShouldBe(capacity); + } + + // ----------------------------------------------------------------------- + // 3. GetAll_ReturnsNewestFirst + // ----------------------------------------------------------------------- + + /// + /// GetAll should return entries ordered newest-first. + /// + [Fact] + public void GetAll_ReturnsNewestFirst() + { + var buf = new ClosedConnectionRingBuffer(capacity: 10); + + buf.Add(MakeEntry(1)); + buf.Add(MakeEntry(2)); + buf.Add(MakeEntry(3)); + + var all = buf.GetAll(); + + all.Count.ShouldBe(3); + all[0].Cid.ShouldBe(3UL); // newest + all[1].Cid.ShouldBe(2UL); + all[2].Cid.ShouldBe(1UL); // oldest + } + + // ----------------------------------------------------------------------- + // 4. GetAll_EmptyBuffer_ReturnsEmpty + // ----------------------------------------------------------------------- + + /// + /// GetAll on an empty buffer should return an empty list. + /// + [Fact] + public void GetAll_EmptyBuffer_ReturnsEmpty() + { + var buf = new ClosedConnectionRingBuffer(capacity: 10); + + var all = buf.GetAll(); + + all.ShouldBeEmpty(); + } + + // ----------------------------------------------------------------------- + // 5. GetRecent_ReturnsRequestedCount + // ----------------------------------------------------------------------- + + /// + /// GetRecent(n) where n <= Count should return exactly n entries. + /// + [Fact] + public void GetRecent_ReturnsRequestedCount() + { + var buf = new ClosedConnectionRingBuffer(capacity: 10); + for (var i = 1; i <= 8; i++) + buf.Add(MakeEntry((ulong)i)); + + var recent = buf.GetRecent(3); + + recent.Count.ShouldBe(3); + recent[0].Cid.ShouldBe(8UL); // newest + recent[1].Cid.ShouldBe(7UL); + recent[2].Cid.ShouldBe(6UL); + } + + // ----------------------------------------------------------------------- + // 6. GetRecent_LessThanAvailable_ReturnsAll + // ----------------------------------------------------------------------- + + /// + /// GetRecent(n) where n > Count should return all available entries. + /// + [Fact] + public void GetRecent_LessThanAvailable_ReturnsAll() + { + var buf = new ClosedConnectionRingBuffer(capacity: 10); + + buf.Add(MakeEntry(1)); + buf.Add(MakeEntry(2)); + + var recent = buf.GetRecent(100); + + recent.Count.ShouldBe(2); + } + + // ----------------------------------------------------------------------- + // 7. TotalClosed_TracksAllAdditions + // ----------------------------------------------------------------------- + + /// + /// TotalClosed should increment for every Add, even after the ring wraps around. + /// + [Fact] + public void TotalClosed_TracksAllAdditions() + { + const int capacity = 4; + var buf = new ClosedConnectionRingBuffer(capacity); + + buf.TotalClosed.ShouldBe(0L); + + for (var i = 1; i <= 10; i++) + buf.Add(MakeEntry((ulong)i)); + + buf.TotalClosed.ShouldBe(10L); + buf.Count.ShouldBe(capacity); // buffer is full but total reflects all 10 + } + + // ----------------------------------------------------------------------- + // 8. Clear_ResetsCountAndBuffer + // ----------------------------------------------------------------------- + + /// + /// Clear should reset Count to zero and GetAll should return an empty list. + /// TotalClosed is intentionally not reset because it is a running lifetime counter. + /// + [Fact] + public void Clear_ResetsCountAndBuffer() + { + var buf = new ClosedConnectionRingBuffer(capacity: 10); + + buf.Add(MakeEntry(1)); + buf.Add(MakeEntry(2)); + buf.Add(MakeEntry(3)); + buf.TotalClosed.ShouldBe(3L); + + buf.Clear(); + + buf.Count.ShouldBe(0); + buf.GetAll().ShouldBeEmpty(); + // TotalClosed is a lifetime counter; it is not reset by Clear. + buf.TotalClosed.ShouldBe(3L); + } + + // ----------------------------------------------------------------------- + // 9. Capacity_ReturnsConfiguredSize + // ----------------------------------------------------------------------- + + /// + /// Capacity should reflect the value passed to the constructor. + /// + [Fact] + public void Capacity_ReturnsConfiguredSize() + { + var buf = new ClosedConnectionRingBuffer(capacity: 42); + buf.Capacity.ShouldBe(42); + } + + // ----------------------------------------------------------------------- + // 10. Add_WrapsCorrectly + // ----------------------------------------------------------------------- + + /// + /// After adding capacity+1 items the oldest entry (cid=1) should no longer be present, + /// and the buffer should contain the most recent 'capacity' items. + /// + [Fact] + public void Add_WrapsCorrectly() + { + const int capacity = 5; + var buf = new ClosedConnectionRingBuffer(capacity); + + for (var i = 1; i <= capacity + 1; i++) + buf.Add(MakeEntry((ulong)i)); + + var all = buf.GetAll(); + + all.Count.ShouldBe(capacity); + + // cid=1 (the oldest) should have been overwritten + all.Any(e => e.Cid == 1UL).ShouldBeFalse(); + + // The newest entry (cid=capacity+1) should be first + all[0].Cid.ShouldBe((ulong)(capacity + 1)); + } +}