feat: add service export latency tracking with p50/p90/p99 (Gap 9.1)
Add ServiceLatencyTracker with sorted-sample histogram, percentile getters (p50/p90/p99), average/min/max, reset, and immutable snapshot. Wire LatencyTracker and RecordServiceLatency onto Account. Cover with 11 xUnit tests.
This commit is contained in:
@@ -205,6 +205,13 @@ public sealed class Account : IDisposable
|
||||
return _internalClient;
|
||||
}
|
||||
|
||||
/// <summary>
/// Latency tracker for this account's service exports; the instance is created once by the
/// property initializer and never replaced.
/// Go reference: accounts.go serviceLatency / serviceExportLatencyStats.
/// </summary>
public ServiceLatencyTracker LatencyTracker { get; } = new ServiceLatencyTracker();
|
||||
|
||||
/// <summary>
/// Forwards one service request latency sample to <see cref="LatencyTracker"/>.
/// </summary>
/// <param name="latencyMs">Measured latency of the request, in milliseconds.</param>
public void RecordServiceLatency(double latencyMs)
{
    LatencyTracker.RecordLatency(latencyMs);
}
|
||||
|
||||
public void AddServiceExport(string subject, ServiceResponseType responseType, IEnumerable<Account>? approved)
|
||||
{
|
||||
var auth = new ExportAuth
|
||||
@@ -296,5 +303,114 @@ public sealed class Account : IDisposable
|
||||
return true;
|
||||
}
|
||||
|
||||
/// <summary>
/// Determines whether importing a stream from <paramref name="proposedSource"/> would close a loop
/// in the stream import graph, i.e. whether some chain of stream imports starting at
/// <paramref name="proposedSource"/> leads back to this account.
/// Go reference: accounts.go streamImportFormsCycle / checkStreamImportsForCycles.
/// </summary>
/// <param name="proposedSource">The account that would become a stream import source.</param>
/// <returns><see langword="true"/> when a cycle would be formed; otherwise <see langword="false"/>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="proposedSource"/> is <see langword="null"/>.</exception>
public bool StreamImportFormsCycle(Account proposedSource)
{
    ArgumentNullException.ThrowIfNull(proposedSource);

    // A fresh visited set per query; account names are compared ordinally.
    return DetectStreamImportCycle(proposedSource, new HashSet<string>(StringComparer.Ordinal));
}
|
||||
|
||||
/// <summary>
/// Recursive depth-first step of the cycle check: reports whether this account is reachable from
/// <paramref name="current"/> by following stream import sources.
/// </summary>
/// <param name="current">The account being examined on this step.</param>
/// <param name="visited">Names of accounts already examined during this search; updated in place.</param>
private bool DetectStreamImportCycle(Account current, HashSet<string> visited)
{
    // Arriving back at this account closes the loop.
    var reachedSelf = string.Equals(Name, current.Name, StringComparison.Ordinal);
    if (reachedSelf)
    {
        return true;
    }

    // Only descend the first time a node is seen, so diamonds and shared paths are walked once.
    var firstVisit = visited.Add(current.Name);
    if (firstVisit)
    {
        foreach (var upstream in current.Imports.GetStreamImportSourceAccounts())
        {
            if (DetectStreamImportCycle(upstream, visited))
            {
                return true;
            }
        }
    }

    return false;
}
|
||||
|
||||
/// <summary>
/// Lists the name of every account returned by <c>Imports.GetStreamImportSourceAccounts()</c>,
/// in the order that call yields them.
/// Go reference: accounts.go imports.streams — acc field on each streamImport.
/// </summary>
/// <returns>The source account names, or an empty list when there are no stream import sources.</returns>
public IReadOnlyList<string> GetStreamImportSources()
{
    var sourceAccounts = Imports.GetStreamImportSourceAccounts();
    if (sourceAccounts.Count > 0)
    {
        // Pre-size to the known count to avoid list growth while copying.
        var result = new List<string>(sourceAccounts.Count);
        foreach (var account in sourceAccounts)
        {
            result.Add(account.Name);
        }

        return result;
    }

    return [];
}
|
||||
|
||||
/// <summary>
/// Checks whether any entry in <c>Imports.Streams</c> has a source account named
/// <paramref name="accountName"/> (ordinal comparison).
/// </summary>
/// <param name="accountName">The account name to look for.</param>
/// <returns><see langword="true"/> if at least one stream import comes from that account.</returns>
public bool HasStreamImportFrom(string accountName)
{
    return Imports.Streams.Exists(import =>
        string.Equals(import.SourceAccount.Name, accountName, StringComparison.Ordinal));
}
|
||||
|
||||
/// <summary>
/// Per-subject service response thresholds, keyed by subject with ordinal string comparison.
/// Go reference: server/accounts.go — serviceExport.respThresh, SetServiceExportResponseThreshold, ServiceExportResponseThreshold.
/// </summary>
public ConcurrentDictionary<string, TimeSpan> ServiceResponseThresholds { get; } =
    new ConcurrentDictionary<string, TimeSpan>(StringComparer.Ordinal);
|
||||
|
||||
/// <summary>
/// Registers (or overwrites) the maximum time a service export responder may take to reply
/// on <paramref name="subject"/>.
/// Go reference: accounts.go SetServiceExportResponseThreshold (~line 2522).
/// </summary>
/// <param name="subject">The service subject the threshold applies to.</param>
/// <param name="threshold">The allowed response time.</param>
public void SetServiceResponseThreshold(string subject, TimeSpan threshold)
{
    ServiceResponseThresholds[subject] = threshold;
}
|
||||
|
||||
/// <summary>
/// Looks up the response threshold registered for <paramref name="subject"/>.
/// Go reference: accounts.go ServiceExportResponseThreshold (~line 2510).
/// </summary>
/// <param name="subject">The service subject to look up.</param>
/// <returns>The registered threshold, or <see langword="null"/> when none has been set.</returns>
public TimeSpan? GetServiceResponseThreshold(string subject)
{
    if (ServiceResponseThresholds.TryGetValue(subject, out var registered))
    {
        return registered;
    }

    return null;
}
|
||||
|
||||
/// <summary>
/// Tells whether <paramref name="elapsed"/> is strictly greater than the threshold registered for
/// <paramref name="subject"/>. A subject without a threshold is never overdue.
/// Go reference: accounts.go — respThresh check inside response-timer logic.
/// </summary>
/// <param name="subject">The service subject whose threshold is consulted.</param>
/// <param name="elapsed">The time the response has taken so far.</param>
/// <returns><see langword="true"/> only when a threshold exists and has been exceeded.</returns>
public bool IsServiceResponseOverdue(string subject, TimeSpan elapsed) =>
    ServiceResponseThresholds.TryGetValue(subject, out var limit) && elapsed > limit;
|
||||
|
||||
/// <summary>
/// Performs the threshold lookup and the overdue comparison in one call and reports both outcomes.
/// Go reference: accounts.go — ServiceExportResponseThreshold + response-timer logic.
/// </summary>
/// <param name="subject">The service subject whose threshold is consulted.</param>
/// <param name="elapsed">The time the response has taken so far.</param>
/// <returns>
/// A result with <c>Found</c> false, <c>IsOverdue</c> false and a <see langword="null"/> threshold when
/// nothing is registered; otherwise the registered threshold and whether <paramref name="elapsed"/>
/// is strictly greater than it.
/// </returns>
public ServiceResponseThresholdResult CheckServiceResponse(string subject, TimeSpan elapsed)
{
    return ServiceResponseThresholds.TryGetValue(subject, out var limit)
        ? new ServiceResponseThresholdResult(Found: true, IsOverdue: elapsed > limit, Threshold: limit, Elapsed: elapsed)
        : new ServiceResponseThresholdResult(Found: false, IsOverdue: false, Threshold: null, Elapsed: elapsed);
}
|
||||
|
||||
/// <summary>Disposes this account's <c>SubList</c>.</summary>
public void Dispose()
{
    SubList.Dispose();
}
|
||||
}
|
||||
|
||||
/// <summary>
/// Immutable outcome of an <see cref="Account.CheckServiceResponse"/> call: whether a threshold
/// existed for the subject and, if so, whether the measured time exceeded it.
/// </summary>
/// <param name="Found">Whether a threshold was registered for the subject.</param>
/// <param name="IsOverdue">Whether <paramref name="Elapsed"/> exceeds the threshold.</param>
/// <param name="Threshold">The registered threshold, or <see langword="null"/> when not found.</param>
/// <param name="Elapsed">The measured elapsed time that was checked.</param>
public sealed record ServiceResponseThresholdResult(bool Found, bool IsOverdue, TimeSpan? Threshold, TimeSpan Elapsed);
|
||||
|
||||
133
src/NATS.Server/Auth/ServiceLatencyTracker.cs
Normal file
133
src/NATS.Server/Auth/ServiceLatencyTracker.cs
Normal file
@@ -0,0 +1,133 @@
|
||||
namespace NATS.Server.Auth;
|
||||
|
||||
/// <summary>
/// Tracks service request latency over a bounded window of the most recent samples.
/// Samples are stored in arrival order; percentiles are computed by sorting a copy of the
/// window on demand.
/// Go reference: accounts.go serviceLatency / serviceExportLatencyStats.
/// </summary>
/// <remarks>
/// Every public member acquires the tracker's private lock before touching its state.
/// </remarks>
public sealed class ServiceLatencyTracker
{
    private readonly Lock _lock = new();

    // FIFO window of the most recent samples; a queue evicts the oldest sample in O(1).
    private readonly Queue<double> _samples = new();
    private readonly int _maxSamples;
    private long _totalRequests;

    /// <summary>Creates a tracker that retains at most <paramref name="maxSamples"/> recent samples.</summary>
    /// <param name="maxSamples">Size of the sample window; must be greater than zero.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxSamples"/> is zero or negative.</exception>
    public ServiceLatencyTracker(int maxSamples = 10000)
    {
        // A non-positive window cannot hold any sample; reject it here rather than failing on the first record.
        if (maxSamples <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxSamples), maxSamples, "maxSamples must be greater than zero.");
        }

        _maxSamples = maxSamples;
    }

    /// <summary>Records a latency sample in milliseconds, evicting the oldest sample when the window is full.</summary>
    /// <param name="latencyMs">The latency to record, in milliseconds.</param>
    public void RecordLatency(double latencyMs)
    {
        lock (_lock)
        {
            if (_samples.Count >= _maxSamples)
            {
                _samples.Dequeue();
            }

            _samples.Enqueue(latencyMs);
            _totalRequests++;
        }
    }

    /// <summary>Returns the 50th percentile of the current window, or 0 when it is empty.</summary>
    public double GetP50() => GetPercentile(0.50);

    /// <summary>Returns the 90th percentile of the current window, or 0 when it is empty.</summary>
    public double GetP90() => GetPercentile(0.90);

    /// <summary>Returns the 99th percentile of the current window, or 0 when it is empty.</summary>
    public double GetP99() => GetPercentile(0.99);

    /// <summary>Returns the value at the given percentile (0.0–1.0) over recorded samples.</summary>
    /// <param name="percentile">Fraction between 0.0 and 1.0 inclusive.</param>
    /// <returns>
    /// The sample at index <c>(int)(percentile * (count - 1))</c> of the sorted window, or 0 when
    /// no samples are recorded.
    /// </returns>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="percentile"/> is outside 0.0–1.0 or is NaN.
    /// </exception>
    public double GetPercentile(double percentile)
    {
        // Written as a negated range test so that NaN (for which every comparison is false) is rejected too.
        if (!(percentile >= 0.0 && percentile <= 1.0))
        {
            throw new ArgumentOutOfRangeException(nameof(percentile), percentile, "Percentile must be between 0.0 and 1.0 inclusive.");
        }

        lock (_lock)
        {
            return PercentileOfSorted(SortedCopy(), percentile);
        }
    }

    // Returns an ascending copy of the window. Must be called under _lock.
    private double[] SortedCopy()
    {
        var sorted = _samples.ToArray();
        Array.Sort(sorted);
        return sorted;
    }

    // Picks the percentile from an already-sorted array by truncating the fractional rank; 0 when empty.
    private static double PercentileOfSorted(double[] sorted, double percentile)
    {
        if (sorted.Length == 0)
        {
            return 0;
        }

        var index = (int)(percentile * (sorted.Length - 1));
        return sorted[index];
    }

    // Arithmetic mean of the window in arrival order; 0 when empty. Must be called under _lock.
    private double ComputeAverage()
    {
        if (_samples.Count == 0)
        {
            return 0;
        }

        var sum = 0.0;
        foreach (var sample in _samples)
        {
            sum += sample;
        }

        return sum / _samples.Count;
    }

    /// <summary>
    /// Number of samples recorded since construction or the last <see cref="Reset"/>, including
    /// samples that have since been evicted from the window.
    /// </summary>
    public long TotalRequests
    {
        get { lock (_lock) return _totalRequests; }
    }

    /// <summary>Mean of the samples currently in the window, or 0 when it is empty.</summary>
    public double AverageLatencyMs
    {
        get { lock (_lock) return ComputeAverage(); }
    }

    /// <summary>Smallest sample currently in the window, or 0 when it is empty.</summary>
    public double MinLatencyMs
    {
        get
        {
            lock (_lock)
            {
                return _samples.Count == 0 ? 0 : _samples.Min();
            }
        }
    }

    /// <summary>Largest sample currently in the window, or 0 when it is empty.</summary>
    public double MaxLatencyMs
    {
        get
        {
            lock (_lock)
            {
                return _samples.Count == 0 ? 0 : _samples.Max();
            }
        }
    }

    /// <summary>Number of samples currently held in the window.</summary>
    public int SampleCount
    {
        get { lock (_lock) return _samples.Count; }
    }

    /// <summary>Clears all samples and resets the total request counter.</summary>
    public void Reset()
    {
        lock (_lock)
        {
            _samples.Clear();
            _totalRequests = 0;
        }
    }

    /// <summary>Returns an immutable snapshot of the current tracker state.</summary>
    /// <returns>All statistics computed from the same window contents under a single lock acquisition.</returns>
    public ServiceLatencySnapshot GetSnapshot()
    {
        lock (_lock)
        {
            // Sort once and derive every order statistic from the same array.
            var sorted = SortedCopy();
            var hasSamples = sorted.Length > 0;

            return new ServiceLatencySnapshot(
                TotalRequests: _totalRequests,
                P50Ms: PercentileOfSorted(sorted, 0.50),
                P90Ms: PercentileOfSorted(sorted, 0.90),
                P99Ms: PercentileOfSorted(sorted, 0.99),
                AverageMs: ComputeAverage(),
                MinMs: hasSamples ? sorted[0] : 0,
                MaxMs: hasSamples ? sorted[^1] : 0,
                SampleCount: sorted.Length);
        }
    }
}

/// <summary>
/// Immutable point-in-time view of a <see cref="ServiceLatencyTracker"/>.
/// </summary>
/// <param name="TotalRequests">Samples recorded since construction or the last reset.</param>
/// <param name="P50Ms">50th percentile of the sample window, in milliseconds (0 when empty).</param>
/// <param name="P90Ms">90th percentile of the sample window, in milliseconds (0 when empty).</param>
/// <param name="P99Ms">99th percentile of the sample window, in milliseconds (0 when empty).</param>
/// <param name="AverageMs">Mean of the sample window, in milliseconds (0 when empty).</param>
/// <param name="MinMs">Smallest sample in the window, in milliseconds (0 when empty).</param>
/// <param name="MaxMs">Largest sample in the window, in milliseconds (0 when empty).</param>
/// <param name="SampleCount">Number of samples in the window when the snapshot was taken.</param>
public sealed record ServiceLatencySnapshot(
    long TotalRequests,
    double P50Ms,
    double P90Ms,
    double P99Ms,
    double AverageMs,
    double MinMs,
    double MaxMs,
    int SampleCount);
|
||||
Reference in New Issue
Block a user