Adds SlowConsumerTracker class for per-ClientKind slow consumer counting with configurable threshold callbacks, and extends Account with atomic IncrementSlowConsumers/SlowConsumerCount/ResetSlowConsumerCount members. Includes 10 unit tests covering concurrency, threshold firing, and reset.
60 lines
2.3 KiB
C#
60 lines
2.3 KiB
C#
using System.Collections.Concurrent;
|
|
|
|
namespace NATS.Server;
|
|
|
|
/// <summary>
|
|
/// Tracks slow consumer events per <see cref="ClientKind"/> with optional threshold-based callbacks.
|
|
/// Go reference: server/client.go — handleSlowConsumer, markConnAsSlow.
|
|
/// </summary>
|
|
public sealed class SlowConsumerTracker
|
|
{
|
|
private readonly ConcurrentDictionary<ClientKind, long> _countsByKind = new();
|
|
private long _totalCount;
|
|
private readonly int _threshold;
|
|
private Action<ClientKind>? _onThresholdExceeded;
|
|
|
|
/// <param name="threshold">
|
|
/// When the total count reaches this value the registered callback is fired.
|
|
/// 0 means no threshold (callback never fires automatically).
|
|
/// </param>
|
|
public SlowConsumerTracker(int threshold = 0)
|
|
{
|
|
_threshold = threshold;
|
|
}
|
|
|
|
/// <summary>Total slow-consumer events recorded across all <see cref="ClientKind"/>s.</summary>
|
|
public long TotalCount => Interlocked.Read(ref _totalCount);
|
|
|
|
/// <summary>
|
|
/// Records one slow-consumer event for the given <paramref name="kind"/>.
|
|
/// Increments both the per-kind counter and the total counter.
|
|
/// Fires the threshold callback if <see cref="TotalCount"/> reaches the configured threshold.
|
|
/// </summary>
|
|
public void RecordSlowConsumer(ClientKind kind)
|
|
{
|
|
_countsByKind.AddOrUpdate(kind, 1L, static (_, existing) => existing + 1L);
|
|
var total = Interlocked.Increment(ref _totalCount);
|
|
|
|
if (_threshold > 0 && total >= _threshold)
|
|
_onThresholdExceeded?.Invoke(kind);
|
|
}
|
|
|
|
/// <summary>Returns the number of slow-consumer events recorded for <paramref name="kind"/>.</summary>
|
|
public long GetCount(ClientKind kind) =>
|
|
_countsByKind.TryGetValue(kind, out var count) ? count : 0L;
|
|
|
|
/// <summary>
|
|
/// Registers a callback that is invoked (with the triggering <see cref="ClientKind"/>)
|
|
/// each time the total count reaches or exceeds the configured threshold.
|
|
/// </summary>
|
|
public void OnThresholdExceeded(Action<ClientKind> callback) =>
|
|
_onThresholdExceeded = callback;
|
|
|
|
/// <summary>Resets all per-kind and total counters to zero.</summary>
|
|
public void Reset()
|
|
{
|
|
_countsByKind.Clear();
|
|
Interlocked.Exchange(ref _totalCount, 0L);
|
|
}
|
|
}
|