diff --git a/src/NATS.Server/Monitoring/ClosedConnectionRingBuffer.cs b/src/NATS.Server/Monitoring/ClosedConnectionRingBuffer.cs new file mode 100644 index 0000000..a10f028 --- /dev/null +++ b/src/NATS.Server/Monitoring/ClosedConnectionRingBuffer.cs @@ -0,0 +1,104 @@ +namespace NATS.Server.Monitoring; + +/// +/// Fixed-size ring buffer for tracking recently closed client connections. +/// When full, oldest entries are overwritten. Thread-safe via Lock. +/// Go reference: server.go closedClients / ClosedState tracking. +/// +public sealed class ClosedConnectionRingBuffer +{ + private readonly ClosedClient[] _buffer; + private readonly Lock _lock = new(); + private int _head; // Next write position + private int _count; // Current count (up to capacity) + private long _totalClosed; // Running total of all closed connections ever + + public ClosedConnectionRingBuffer(int capacity = 1024) + { + if (capacity <= 0) throw new ArgumentOutOfRangeException(nameof(capacity), "Capacity must be greater than zero."); + _buffer = new ClosedClient[capacity]; + } + + public int Capacity => _buffer.Length; + + public int Count + { + get { lock (_lock) return _count; } + } + + public long TotalClosed + { + get { lock (_lock) return _totalClosed; } + } + + /// + /// Adds a closed connection snapshot. If the buffer is full the oldest entry is overwritten. + /// + public void Add(ClosedClient info) + { + lock (_lock) + { + _buffer[_head] = info; + _head = (_head + 1) % _buffer.Length; + if (_count < _buffer.Length) + _count++; + _totalClosed++; + } + } + + /// + /// Returns a snapshot of all entries ordered newest-first. + /// + public IReadOnlyList GetAll() + { + lock (_lock) + { + return BuildSnapshot(_count); + } + } + + /// + /// Returns up to most recent entries, ordered newest-first. + /// + public IReadOnlyList GetRecent(int count) + { + lock (_lock) + { + var take = Math.Min(count, _count); + return BuildSnapshot(take); + } + } + + /// + /// Clears all entries and resets counters. + /// + public void Clear() + { + lock (_lock) + { + Array.Clear(_buffer, 0, _buffer.Length); + _head = 0; + _count = 0; + // TotalClosed is intentionally not reset — it is a running lifetime total. + } + } + + // Must be called while _lock is held. + // Returns the 'take' most recent entries, newest-first. + private ClosedClient[] BuildSnapshot(int take) + { + if (take == 0) return []; + + var result = new ClosedClient[take]; + var capacity = _buffer.Length; + + // _head points to the next write slot, so (_head - 1) is the most recent entry. + for (var i = 0; i < take; i++) + { + var idx = ((_head - 1 - i) % capacity + capacity) % capacity; + result[i] = _buffer[idx]; + } + + return result; + } +} diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 90a27df..5ecedfd 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -32,7 +32,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable { private readonly NatsOptions _options; private readonly ConcurrentDictionary _clients = new(); - private readonly ConcurrentQueue _closedClients = new(); + private ClosedConnectionRingBuffer _closedClients; private readonly ServerInfo _serverInfo; private readonly ILogger _logger; private readonly ILoggerFactory _loggerFactory; @@ -118,7 +118,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable public Action? ReOpenLogFile { get; set; } public IEnumerable GetClients() => _clients.Values; - public IEnumerable GetClosedClients() => _closedClients; + public IReadOnlyList GetClosedClients() => _closedClients.GetAll(); public IEnumerable GetAccounts() => _accounts.Values; public bool HasRemoteInterest(string subject) => _globalAccount.SubList.HasRemoteInterest(subject); @@ -364,6 +364,7 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _options = options; _loggerFactory = loggerFactory; _logger = loggerFactory.CreateLogger(); + _closedClients = new ClosedConnectionRingBuffer(options.MaxClosedClients); _authService = AuthService.Build(options); _globalAccount = new Account(Account.GlobalAccountName); _accounts[Account.GlobalAccountName] = _globalAccount; @@ -1509,8 +1510,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _clients.TryRemove(client.Id, out _); _logger.LogDebug("Removed client {ClientId}", client.Id); - // Snapshot for closed-connections tracking - _closedClients.Enqueue(new ClosedClient + // Snapshot for closed-connections tracking (ring buffer auto-overwrites oldest when full) + _closedClients.Add(new ClosedClient { Cid = client.Id, Ip = client.RemoteIp ?? "", @@ -1538,10 +1539,6 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable Proxy = client.ClientOpts?.Username?.StartsWith("proxy:", StringComparison.Ordinal) == true ? "true" : "", }); - // Cap closed clients list - while (_closedClients.Count > _options.MaxClosedClients) - _closedClients.TryDequeue(out _); - var subList = client.Account?.SubList ?? _globalAccount.SubList; client.RemoveAllSubscriptions(subList); client.Account?.RemoveClient(client.Id); @@ -1830,7 +1827,11 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable _options.MaxSubs = newOpts.MaxSubs; _options.MaxSubTokens = newOpts.MaxSubTokens; _options.MaxTracedMsgLen = newOpts.MaxTracedMsgLen; - _options.MaxClosedClients = newOpts.MaxClosedClients; + if (newOpts.MaxClosedClients != _options.MaxClosedClients) + { + _options.MaxClosedClients = newOpts.MaxClosedClients; + _closedClients = new ClosedConnectionRingBuffer(newOpts.MaxClosedClients); + } // TLS _options.TlsCert = newOpts.TlsCert; diff --git a/tests/NATS.Server.Tests/Monitoring/ClosedConnectionRingBufferTests.cs b/tests/NATS.Server.Tests/Monitoring/ClosedConnectionRingBufferTests.cs new file mode 100644 index 0000000..aaeda9d --- /dev/null +++ b/tests/NATS.Server.Tests/Monitoring/ClosedConnectionRingBufferTests.cs @@ -0,0 +1,240 @@ +// Go reference: server/monitor.go — closedClients ring buffer, ClosedState tracking. +// These tests verify the fixed-size ring buffer used to track recently closed connections +// for the /connz?state=closed monitoring endpoint. + +using NATS.Server.Monitoring; + +namespace NATS.Server.Tests.Monitoring; + +public class ClosedConnectionRingBufferTests +{ + // ----------------------------------------------------------------------- + // Helpers + // ----------------------------------------------------------------------- + + private static ClosedClient MakeEntry(ulong cid, string reason = "normal") => new() + { + Cid = cid, + Ip = "127.0.0.1", + Port = 4222, + Start = DateTime.UtcNow.AddSeconds(-10), + Stop = DateTime.UtcNow, + Reason = reason, + }; + + // ----------------------------------------------------------------------- + // 1. Add_IncreasesCount + // ----------------------------------------------------------------------- + + /// + /// Adding entries should increase Count up to capacity. + /// + [Fact] + public void Add_IncreasesCount() + { + var buf = new ClosedConnectionRingBuffer(capacity: 10); + + buf.Count.ShouldBe(0); + + buf.Add(MakeEntry(1)); + buf.Count.ShouldBe(1); + + buf.Add(MakeEntry(2)); + buf.Count.ShouldBe(2); + + buf.Add(MakeEntry(3)); + buf.Count.ShouldBe(3); + } + + // ----------------------------------------------------------------------- + // 2. Add_RingOverwrite_CapacityNotExceeded + // ----------------------------------------------------------------------- + + /// + /// When capacity is exceeded the count stays at capacity (oldest entry is overwritten). + /// + [Fact] + public void Add_RingOverwrite_CapacityNotExceeded() + { + const int capacity = 5; + var buf = new ClosedConnectionRingBuffer(capacity); + + for (var i = 1; i <= capacity + 3; i++) + buf.Add(MakeEntry((ulong)i)); + + buf.Count.ShouldBe(capacity); + } + + // ----------------------------------------------------------------------- + // 3. GetAll_ReturnsNewestFirst + // ----------------------------------------------------------------------- + + /// + /// GetAll should return entries ordered newest-first. + /// + [Fact] + public void GetAll_ReturnsNewestFirst() + { + var buf = new ClosedConnectionRingBuffer(capacity: 10); + + buf.Add(MakeEntry(1)); + buf.Add(MakeEntry(2)); + buf.Add(MakeEntry(3)); + + var all = buf.GetAll(); + + all.Count.ShouldBe(3); + all[0].Cid.ShouldBe(3UL); // newest + all[1].Cid.ShouldBe(2UL); + all[2].Cid.ShouldBe(1UL); // oldest + } + + // ----------------------------------------------------------------------- + // 4. GetAll_EmptyBuffer_ReturnsEmpty + // ----------------------------------------------------------------------- + + /// + /// GetAll on an empty buffer should return an empty list. + /// + [Fact] + public void GetAll_EmptyBuffer_ReturnsEmpty() + { + var buf = new ClosedConnectionRingBuffer(capacity: 10); + + var all = buf.GetAll(); + + all.ShouldBeEmpty(); + } + + // ----------------------------------------------------------------------- + // 5. GetRecent_ReturnsRequestedCount + // ----------------------------------------------------------------------- + + /// + /// GetRecent(n) where n <= Count should return exactly n entries. + /// + [Fact] + public void GetRecent_ReturnsRequestedCount() + { + var buf = new ClosedConnectionRingBuffer(capacity: 10); + for (var i = 1; i <= 8; i++) + buf.Add(MakeEntry((ulong)i)); + + var recent = buf.GetRecent(3); + + recent.Count.ShouldBe(3); + recent[0].Cid.ShouldBe(8UL); // newest + recent[1].Cid.ShouldBe(7UL); + recent[2].Cid.ShouldBe(6UL); + } + + // ----------------------------------------------------------------------- + // 6. GetRecent_LessThanAvailable_ReturnsAll + // ----------------------------------------------------------------------- + + /// + /// GetRecent(n) where n > Count should return all available entries. + /// + [Fact] + public void GetRecent_LessThanAvailable_ReturnsAll() + { + var buf = new ClosedConnectionRingBuffer(capacity: 10); + + buf.Add(MakeEntry(1)); + buf.Add(MakeEntry(2)); + + var recent = buf.GetRecent(100); + + recent.Count.ShouldBe(2); + } + + // ----------------------------------------------------------------------- + // 7. TotalClosed_TracksAllAdditions + // ----------------------------------------------------------------------- + + /// + /// TotalClosed should increment for every Add, even after the ring wraps around. + /// + [Fact] + public void TotalClosed_TracksAllAdditions() + { + const int capacity = 4; + var buf = new ClosedConnectionRingBuffer(capacity); + + buf.TotalClosed.ShouldBe(0L); + + for (var i = 1; i <= 10; i++) + buf.Add(MakeEntry((ulong)i)); + + buf.TotalClosed.ShouldBe(10L); + buf.Count.ShouldBe(capacity); // buffer is full but total reflects all 10 + } + + // ----------------------------------------------------------------------- + // 8. Clear_ResetsCountAndBuffer + // ----------------------------------------------------------------------- + + /// + /// Clear should reset Count to zero and GetAll should return an empty list. + /// TotalClosed is intentionally not reset because it is a running lifetime counter. + /// + [Fact] + public void Clear_ResetsCountAndBuffer() + { + var buf = new ClosedConnectionRingBuffer(capacity: 10); + + buf.Add(MakeEntry(1)); + buf.Add(MakeEntry(2)); + buf.Add(MakeEntry(3)); + buf.TotalClosed.ShouldBe(3L); + + buf.Clear(); + + buf.Count.ShouldBe(0); + buf.GetAll().ShouldBeEmpty(); + // TotalClosed is a lifetime counter; it is not reset by Clear. + buf.TotalClosed.ShouldBe(3L); + } + + // ----------------------------------------------------------------------- + // 9. Capacity_ReturnsConfiguredSize + // ----------------------------------------------------------------------- + + /// + /// Capacity should reflect the value passed to the constructor. + /// + [Fact] + public void Capacity_ReturnsConfiguredSize() + { + var buf = new ClosedConnectionRingBuffer(capacity: 42); + buf.Capacity.ShouldBe(42); + } + + // ----------------------------------------------------------------------- + // 10. Add_WrapsCorrectly + // ----------------------------------------------------------------------- + + /// + /// After adding capacity+1 items the oldest entry (cid=1) should no longer be present, + /// and the buffer should contain the most recent 'capacity' items. + /// + [Fact] + public void Add_WrapsCorrectly() + { + const int capacity = 5; + var buf = new ClosedConnectionRingBuffer(capacity); + + for (var i = 1; i <= capacity + 1; i++) + buf.Add(MakeEntry((ulong)i)); + + var all = buf.GetAll(); + + all.Count.ShouldBe(capacity); + + // cid=1 (the oldest) should have been overwritten + all.Any(e => e.Cid == 1UL).ShouldBeFalse(); + + // The newest entry (cid=capacity+1) should be first + all[0].Cid.ShouldBe((ulong)(capacity + 1)); + } +}