diff --git a/src/NATS.Server/Auth/Account.cs b/src/NATS.Server/Auth/Account.cs index 5ffa852..5fed016 100644 --- a/src/NATS.Server/Auth/Account.cs +++ b/src/NATS.Server/Auth/Account.cs @@ -152,6 +152,16 @@ public sealed class Account : IDisposable return true; } + // Slow consumer tracking + // Go reference: server/client.go — handleSlowConsumer, markConnAsSlow, server/accounts.go slowConsumerCount + private long _slowConsumerCount; + + public long SlowConsumerCount => Interlocked.Read(ref _slowConsumerCount); + + public void IncrementSlowConsumers() => Interlocked.Increment(ref _slowConsumerCount); + + public void ResetSlowConsumerCount() => Interlocked.Exchange(ref _slowConsumerCount, 0L); + // Per-account message/byte stats private long _inMsgs; private long _outMsgs; diff --git a/src/NATS.Server/SlowConsumerTracker.cs b/src/NATS.Server/SlowConsumerTracker.cs new file mode 100644 index 0000000..47bc54b --- /dev/null +++ b/src/NATS.Server/SlowConsumerTracker.cs @@ -0,0 +1,59 @@ +using System.Collections.Concurrent; + +namespace NATS.Server; + +/// +/// Tracks slow consumer events per with optional threshold-based callbacks. +/// Go reference: server/client.go — handleSlowConsumer, markConnAsSlow. +/// +public sealed class SlowConsumerTracker +{ + private readonly ConcurrentDictionary _countsByKind = new(); + private long _totalCount; + private readonly int _threshold; + private Action? _onThresholdExceeded; + + /// + /// When the total count reaches this value the registered callback is fired. + /// 0 means no threshold (callback never fires automatically). + /// + public SlowConsumerTracker(int threshold = 0) + { + _threshold = threshold; + } + + /// Total slow-consumer events recorded across all s. + public long TotalCount => Interlocked.Read(ref _totalCount); + + /// + /// Records one slow-consumer event for the given . + /// Increments both the per-kind counter and the total counter. + /// Fires the threshold callback if reaches the configured threshold. + /// + public void RecordSlowConsumer(ClientKind kind) + { + _countsByKind.AddOrUpdate(kind, 1L, static (_, existing) => existing + 1L); + var total = Interlocked.Increment(ref _totalCount); + + if (_threshold > 0 && total >= _threshold) + _onThresholdExceeded?.Invoke(kind); + } + + /// Returns the number of slow-consumer events recorded for . + public long GetCount(ClientKind kind) => + _countsByKind.TryGetValue(kind, out var count) ? count : 0L; + + /// + /// Registers a callback that is invoked (with the triggering ) + /// each time the total count reaches or exceeds the configured threshold. + /// + public void OnThresholdExceeded(Action callback) => + _onThresholdExceeded = callback; + + /// Resets all per-kind and total counters to zero. + public void Reset() + { + _countsByKind.Clear(); + Interlocked.Exchange(ref _totalCount, 0L); + } +} diff --git a/tests/NATS.Server.Tests/SlowConsumerStallGateTests.cs b/tests/NATS.Server.Tests/SlowConsumerStallGateTests.cs new file mode 100644 index 0000000..0dc966c --- /dev/null +++ b/tests/NATS.Server.Tests/SlowConsumerStallGateTests.cs @@ -0,0 +1,153 @@ +using NATS.Server; +using NATS.Server.Auth; +using Shouldly; + +namespace NATS.Server.Tests; + +/// +/// Tests for and slow-consumer counters. +/// Go reference: server/client.go — handleSlowConsumer, markConnAsSlow. +/// +public class SlowConsumerStallGateTests +{ + // ── SlowConsumerTracker ──────────────────────────────────────────────────── + + [Fact] + public void RecordSlowConsumer_increments_total_count() + { + var tracker = new SlowConsumerTracker(); + + tracker.RecordSlowConsumer(ClientKind.Client); + tracker.RecordSlowConsumer(ClientKind.Client); + tracker.RecordSlowConsumer(ClientKind.Router); + + tracker.TotalCount.ShouldBe(3L); + } + + [Fact] + public void RecordSlowConsumer_increments_per_kind_count() + { + var tracker = new SlowConsumerTracker(); + + tracker.RecordSlowConsumer(ClientKind.Client); + tracker.RecordSlowConsumer(ClientKind.Client); + tracker.RecordSlowConsumer(ClientKind.Gateway); + + tracker.GetCount(ClientKind.Client).ShouldBe(2L); + tracker.GetCount(ClientKind.Gateway).ShouldBe(1L); + } + + [Fact] + public void GetCount_returns_zero_for_unrecorded_kind() + { + var tracker = new SlowConsumerTracker(); + + tracker.GetCount(ClientKind.Leaf).ShouldBe(0L); + } + + [Fact] + public void Multiple_kinds_tracked_independently() + { + var tracker = new SlowConsumerTracker(); + + tracker.RecordSlowConsumer(ClientKind.Client); + tracker.RecordSlowConsumer(ClientKind.Router); + tracker.RecordSlowConsumer(ClientKind.Gateway); + tracker.RecordSlowConsumer(ClientKind.Leaf); + + tracker.GetCount(ClientKind.Client).ShouldBe(1L); + tracker.GetCount(ClientKind.Router).ShouldBe(1L); + tracker.GetCount(ClientKind.Gateway).ShouldBe(1L); + tracker.GetCount(ClientKind.Leaf).ShouldBe(1L); + tracker.GetCount(ClientKind.System).ShouldBe(0L); + tracker.TotalCount.ShouldBe(4L); + } + + [Fact] + public void OnThresholdExceeded_fires_when_threshold_reached() + { + var tracker = new SlowConsumerTracker(threshold: 3); + ClientKind? firedKind = null; + tracker.OnThresholdExceeded(k => firedKind = k); + + tracker.RecordSlowConsumer(ClientKind.Client); + firedKind.ShouldBeNull(); + + tracker.RecordSlowConsumer(ClientKind.Client); + firedKind.ShouldBeNull(); + + tracker.RecordSlowConsumer(ClientKind.Router); + firedKind.ShouldBe(ClientKind.Router); + } + + [Fact] + public void OnThresholdExceeded_not_fired_below_threshold() + { + var tracker = new SlowConsumerTracker(threshold: 5); + var fired = false; + tracker.OnThresholdExceeded(_ => fired = true); + + tracker.RecordSlowConsumer(ClientKind.Client); + tracker.RecordSlowConsumer(ClientKind.Client); + tracker.RecordSlowConsumer(ClientKind.Client); + + fired.ShouldBeFalse(); + } + + [Fact] + public void Reset_clears_all_counters() + { + var tracker = new SlowConsumerTracker(); + + tracker.RecordSlowConsumer(ClientKind.Client); + tracker.RecordSlowConsumer(ClientKind.Router); + tracker.TotalCount.ShouldBe(2L); + + tracker.Reset(); + + tracker.TotalCount.ShouldBe(0L); + tracker.GetCount(ClientKind.Client).ShouldBe(0L); + tracker.GetCount(ClientKind.Router).ShouldBe(0L); + } + + // ── Account slow-consumer counters ──────────────────────────────────────── + + [Fact] + public void Account_IncrementSlowConsumers_tracks_count() + { + var account = new Account("test-account"); + + account.SlowConsumerCount.ShouldBe(0L); + + account.IncrementSlowConsumers(); + account.IncrementSlowConsumers(); + account.IncrementSlowConsumers(); + + account.SlowConsumerCount.ShouldBe(3L); + } + + [Fact] + public void Account_ResetSlowConsumerCount_clears() + { + var account = new Account("test-account"); + + account.IncrementSlowConsumers(); + account.IncrementSlowConsumers(); + account.SlowConsumerCount.ShouldBe(2L); + + account.ResetSlowConsumerCount(); + + account.SlowConsumerCount.ShouldBe(0L); + } + + [Fact] + public void Thread_safety_concurrent_increments() + { + var tracker = new SlowConsumerTracker(); + + Parallel.For(0, 1000, _ => tracker.RecordSlowConsumer(ClientKind.Client)); + + tracker.TotalCount.ShouldBe(1000L); + tracker.GetCount(ClientKind.Client).ShouldBe(1000L); + } +}