feat: add slow consumer per-kind tracking with account counters (Gap 5.5)
Adds SlowConsumerTracker class for per-ClientKind slow consumer counting with configurable threshold callbacks, and extends Account with atomic IncrementSlowConsumers/SlowConsumerCount/ResetSlowConsumerCount members. Includes 10 unit tests covering concurrency, threshold firing, and reset.
This commit is contained in:
@@ -152,6 +152,16 @@ public sealed class Account : IDisposable
|
||||
return true;
|
||||
}
|
||||
|
||||
// Slow consumer tracking
|
||||
// Go reference: server/client.go — handleSlowConsumer, markConnAsSlow, server/accounts.go slowConsumerCount
|
||||
private long _slowConsumerCount;
|
||||
|
||||
public long SlowConsumerCount => Interlocked.Read(ref _slowConsumerCount);
|
||||
|
||||
public void IncrementSlowConsumers() => Interlocked.Increment(ref _slowConsumerCount);
|
||||
|
||||
public void ResetSlowConsumerCount() => Interlocked.Exchange(ref _slowConsumerCount, 0L);
|
||||
|
||||
// Per-account message/byte stats
|
||||
private long _inMsgs;
|
||||
private long _outMsgs;
|
||||
|
||||
59
src/NATS.Server/SlowConsumerTracker.cs
Normal file
59
src/NATS.Server/SlowConsumerTracker.cs
Normal file
@@ -0,0 +1,59 @@
|
||||
using System.Collections.Concurrent;
|
||||
|
||||
namespace NATS.Server;
|
||||
|
||||
/// <summary>
|
||||
/// Tracks slow consumer events per <see cref="ClientKind"/> with optional threshold-based callbacks.
|
||||
/// Go reference: server/client.go — handleSlowConsumer, markConnAsSlow.
|
||||
/// </summary>
|
||||
public sealed class SlowConsumerTracker
|
||||
{
|
||||
private readonly ConcurrentDictionary<ClientKind, long> _countsByKind = new();
|
||||
private long _totalCount;
|
||||
private readonly int _threshold;
|
||||
private Action<ClientKind>? _onThresholdExceeded;
|
||||
|
||||
/// <param name="threshold">
|
||||
/// When the total count reaches this value the registered callback is fired.
|
||||
/// 0 means no threshold (callback never fires automatically).
|
||||
/// </param>
|
||||
public SlowConsumerTracker(int threshold = 0)
|
||||
{
|
||||
_threshold = threshold;
|
||||
}
|
||||
|
||||
/// <summary>Total slow-consumer events recorded across all <see cref="ClientKind"/>s.</summary>
|
||||
public long TotalCount => Interlocked.Read(ref _totalCount);
|
||||
|
||||
/// <summary>
|
||||
/// Records one slow-consumer event for the given <paramref name="kind"/>.
|
||||
/// Increments both the per-kind counter and the total counter.
|
||||
/// Fires the threshold callback if <see cref="TotalCount"/> reaches the configured threshold.
|
||||
/// </summary>
|
||||
public void RecordSlowConsumer(ClientKind kind)
|
||||
{
|
||||
_countsByKind.AddOrUpdate(kind, 1L, static (_, existing) => existing + 1L);
|
||||
var total = Interlocked.Increment(ref _totalCount);
|
||||
|
||||
if (_threshold > 0 && total >= _threshold)
|
||||
_onThresholdExceeded?.Invoke(kind);
|
||||
}
|
||||
|
||||
/// <summary>Returns the number of slow-consumer events recorded for <paramref name="kind"/>.</summary>
|
||||
public long GetCount(ClientKind kind) =>
|
||||
_countsByKind.TryGetValue(kind, out var count) ? count : 0L;
|
||||
|
||||
/// <summary>
|
||||
/// Registers a callback that is invoked (with the triggering <see cref="ClientKind"/>)
|
||||
/// each time the total count reaches or exceeds the configured threshold.
|
||||
/// </summary>
|
||||
public void OnThresholdExceeded(Action<ClientKind> callback) =>
|
||||
_onThresholdExceeded = callback;
|
||||
|
||||
/// <summary>Resets all per-kind and total counters to zero.</summary>
|
||||
public void Reset()
|
||||
{
|
||||
_countsByKind.Clear();
|
||||
Interlocked.Exchange(ref _totalCount, 0L);
|
||||
}
|
||||
}
|
||||
153
tests/NATS.Server.Tests/SlowConsumerStallGateTests.cs
Normal file
153
tests/NATS.Server.Tests/SlowConsumerStallGateTests.cs
Normal file
@@ -0,0 +1,153 @@
|
||||
using NATS.Server;
|
||||
using NATS.Server.Auth;
|
||||
using Shouldly;
|
||||
|
||||
namespace NATS.Server.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// Tests for <see cref="SlowConsumerTracker"/> and <see cref="Account"/> slow-consumer counters.
|
||||
/// Go reference: server/client.go — handleSlowConsumer, markConnAsSlow.
|
||||
/// </summary>
|
||||
public class SlowConsumerStallGateTests
|
||||
{
|
||||
// ── SlowConsumerTracker ────────────────────────────────────────────────────
|
||||
|
||||
[Fact]
|
||||
public void RecordSlowConsumer_increments_total_count()
|
||||
{
|
||||
var tracker = new SlowConsumerTracker();
|
||||
|
||||
tracker.RecordSlowConsumer(ClientKind.Client);
|
||||
tracker.RecordSlowConsumer(ClientKind.Client);
|
||||
tracker.RecordSlowConsumer(ClientKind.Router);
|
||||
|
||||
tracker.TotalCount.ShouldBe(3L);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void RecordSlowConsumer_increments_per_kind_count()
|
||||
{
|
||||
var tracker = new SlowConsumerTracker();
|
||||
|
||||
tracker.RecordSlowConsumer(ClientKind.Client);
|
||||
tracker.RecordSlowConsumer(ClientKind.Client);
|
||||
tracker.RecordSlowConsumer(ClientKind.Gateway);
|
||||
|
||||
tracker.GetCount(ClientKind.Client).ShouldBe(2L);
|
||||
tracker.GetCount(ClientKind.Gateway).ShouldBe(1L);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void GetCount_returns_zero_for_unrecorded_kind()
|
||||
{
|
||||
var tracker = new SlowConsumerTracker();
|
||||
|
||||
tracker.GetCount(ClientKind.Leaf).ShouldBe(0L);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Multiple_kinds_tracked_independently()
|
||||
{
|
||||
var tracker = new SlowConsumerTracker();
|
||||
|
||||
tracker.RecordSlowConsumer(ClientKind.Client);
|
||||
tracker.RecordSlowConsumer(ClientKind.Router);
|
||||
tracker.RecordSlowConsumer(ClientKind.Gateway);
|
||||
tracker.RecordSlowConsumer(ClientKind.Leaf);
|
||||
|
||||
tracker.GetCount(ClientKind.Client).ShouldBe(1L);
|
||||
tracker.GetCount(ClientKind.Router).ShouldBe(1L);
|
||||
tracker.GetCount(ClientKind.Gateway).ShouldBe(1L);
|
||||
tracker.GetCount(ClientKind.Leaf).ShouldBe(1L);
|
||||
tracker.GetCount(ClientKind.System).ShouldBe(0L);
|
||||
tracker.TotalCount.ShouldBe(4L);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void OnThresholdExceeded_fires_when_threshold_reached()
|
||||
{
|
||||
var tracker = new SlowConsumerTracker(threshold: 3);
|
||||
ClientKind? firedKind = null;
|
||||
tracker.OnThresholdExceeded(k => firedKind = k);
|
||||
|
||||
tracker.RecordSlowConsumer(ClientKind.Client);
|
||||
firedKind.ShouldBeNull();
|
||||
|
||||
tracker.RecordSlowConsumer(ClientKind.Client);
|
||||
firedKind.ShouldBeNull();
|
||||
|
||||
tracker.RecordSlowConsumer(ClientKind.Router);
|
||||
firedKind.ShouldBe(ClientKind.Router);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void OnThresholdExceeded_not_fired_below_threshold()
|
||||
{
|
||||
var tracker = new SlowConsumerTracker(threshold: 5);
|
||||
var fired = false;
|
||||
tracker.OnThresholdExceeded(_ => fired = true);
|
||||
|
||||
tracker.RecordSlowConsumer(ClientKind.Client);
|
||||
tracker.RecordSlowConsumer(ClientKind.Client);
|
||||
tracker.RecordSlowConsumer(ClientKind.Client);
|
||||
|
||||
fired.ShouldBeFalse();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Reset_clears_all_counters()
|
||||
{
|
||||
var tracker = new SlowConsumerTracker();
|
||||
|
||||
tracker.RecordSlowConsumer(ClientKind.Client);
|
||||
tracker.RecordSlowConsumer(ClientKind.Router);
|
||||
tracker.TotalCount.ShouldBe(2L);
|
||||
|
||||
tracker.Reset();
|
||||
|
||||
tracker.TotalCount.ShouldBe(0L);
|
||||
tracker.GetCount(ClientKind.Client).ShouldBe(0L);
|
||||
tracker.GetCount(ClientKind.Router).ShouldBe(0L);
|
||||
}
|
||||
|
||||
// ── Account slow-consumer counters ────────────────────────────────────────
|
||||
|
||||
[Fact]
|
||||
public void Account_IncrementSlowConsumers_tracks_count()
|
||||
{
|
||||
var account = new Account("test-account");
|
||||
|
||||
account.SlowConsumerCount.ShouldBe(0L);
|
||||
|
||||
account.IncrementSlowConsumers();
|
||||
account.IncrementSlowConsumers();
|
||||
account.IncrementSlowConsumers();
|
||||
|
||||
account.SlowConsumerCount.ShouldBe(3L);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Account_ResetSlowConsumerCount_clears()
|
||||
{
|
||||
var account = new Account("test-account");
|
||||
|
||||
account.IncrementSlowConsumers();
|
||||
account.IncrementSlowConsumers();
|
||||
account.SlowConsumerCount.ShouldBe(2L);
|
||||
|
||||
account.ResetSlowConsumerCount();
|
||||
|
||||
account.SlowConsumerCount.ShouldBe(0L);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Thread_safety_concurrent_increments()
|
||||
{
|
||||
var tracker = new SlowConsumerTracker();
|
||||
|
||||
Parallel.For(0, 1000, _ => tracker.RecordSlowConsumer(ClientKind.Client));
|
||||
|
||||
tracker.TotalCount.ShouldBe(1000L);
|
||||
tracker.GetCount(ClientKind.Client).ShouldBe(1000L);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user