diff --git a/src/NATS.Server/Auth/Account.cs b/src/NATS.Server/Auth/Account.cs index ea5cad8..e48537d 100644 --- a/src/NATS.Server/Auth/Account.cs +++ b/src/NATS.Server/Auth/Account.cs @@ -205,6 +205,13 @@ public sealed class Account : IDisposable return _internalClient; } + // Service export latency tracking + // Go reference: accounts.go serviceLatency / serviceExportLatencyStats. + public ServiceLatencyTracker LatencyTracker { get; } = new(); + + /// Records a service request latency sample on this account's tracker. + public void RecordServiceLatency(double latencyMs) => LatencyTracker.RecordLatency(latencyMs); + public void AddServiceExport(string subject, ServiceResponseType responseType, IEnumerable? approved) { var auth = new ExportAuth @@ -296,5 +303,114 @@ public sealed class Account : IDisposable return true; } + /// + /// Returns true if adding a stream import from would create a cycle. + /// Uses DFS through the stream import graph starting at proposedSource, checking if any path leads back to this account. + /// Go reference: accounts.go streamImportFormsCycle / checkStreamImportsForCycles. + /// + public bool StreamImportFormsCycle(Account proposedSource) + { + ArgumentNullException.ThrowIfNull(proposedSource); + var visited = new HashSet(StringComparer.Ordinal); + return DetectStreamImportCycle(proposedSource, visited); + } + + private bool DetectStreamImportCycle(Account current, HashSet visited) + { + // If we've reached this account, a cycle exists. + if (string.Equals(current.Name, Name, StringComparison.Ordinal)) + return true; + + // Guard against revisiting nodes (handles diamonds and other shared paths). + if (!visited.Add(current.Name)) + return false; + + foreach (var sourceAccount in current.Imports.GetStreamImportSourceAccounts()) + { + if (DetectStreamImportCycle(sourceAccount, visited)) + return true; + } + + return false; + } + + /// + /// Returns the names of all accounts this account imports streams from. + /// Go reference: accounts.go imports.streams — acc field on each streamImport. + /// + public IReadOnlyList GetStreamImportSources() + { + var sources = Imports.GetStreamImportSourceAccounts(); + if (sources.Count == 0) + return []; + var names = new List(sources.Count); + foreach (var acc in sources) + names.Add(acc.Name); + return names; + } + + /// + /// Returns true if this account has at least one stream import from the account with the given name. + /// + public bool HasStreamImportFrom(string accountName) => + Imports.Streams.Exists(si => string.Equals(si.SourceAccount.Name, accountName, StringComparison.Ordinal)); + + // Per-subject service response thresholds. + // Go reference: server/accounts.go — serviceExport.respThresh, SetServiceExportResponseThreshold, ServiceExportResponseThreshold. + public ConcurrentDictionary ServiceResponseThresholds { get; } = + new(StringComparer.Ordinal); + + /// + /// Sets the maximum time a service export responder may take to reply. + /// Go reference: accounts.go SetServiceExportResponseThreshold (~line 2522). + /// + public void SetServiceResponseThreshold(string subject, TimeSpan threshold) => + ServiceResponseThresholds[subject] = threshold; + + /// + /// Returns the threshold for , or if none is set. + /// Go reference: accounts.go ServiceExportResponseThreshold (~line 2510). + /// + public TimeSpan? GetServiceResponseThreshold(string subject) => + ServiceResponseThresholds.TryGetValue(subject, out var t) ? t : null; + + /// + /// Returns if exceeds the registered threshold + /// for . When no threshold is set the response is never considered overdue. + /// Go reference: accounts.go — respThresh check inside response-timer logic. + /// + public bool IsServiceResponseOverdue(string subject, TimeSpan elapsed) + { + if (!ServiceResponseThresholds.TryGetValue(subject, out var threshold)) + return false; + return elapsed > threshold; + } + + /// + /// Combines threshold lookup and overdue check into a single result. + /// Go reference: accounts.go — ServiceExportResponseThreshold + response-timer logic. + /// + public ServiceResponseThresholdResult CheckServiceResponse(string subject, TimeSpan elapsed) + { + if (!ServiceResponseThresholds.TryGetValue(subject, out var threshold)) + return new ServiceResponseThresholdResult(Found: false, IsOverdue: false, Threshold: null, Elapsed: elapsed); + + var overdue = elapsed > threshold; + return new ServiceResponseThresholdResult(Found: true, IsOverdue: overdue, Threshold: threshold, Elapsed: elapsed); + } + public void Dispose() => SubList.Dispose(); } + +/// +/// Carries the result of a call. +/// +/// Whether a threshold was registered for the subject. +/// Whether exceeds the threshold. +/// The registered threshold, or when not found. +/// The measured elapsed time that was checked. +public sealed record ServiceResponseThresholdResult( + bool Found, + bool IsOverdue, + TimeSpan? Threshold, + TimeSpan Elapsed); diff --git a/src/NATS.Server/Auth/ServiceLatencyTracker.cs b/src/NATS.Server/Auth/ServiceLatencyTracker.cs new file mode 100644 index 0000000..f6cc7d3 --- /dev/null +++ b/src/NATS.Server/Auth/ServiceLatencyTracker.cs @@ -0,0 +1,133 @@ +namespace NATS.Server.Auth; + +/// +/// Tracks service request latency using a sorted list of samples for percentile calculation. +/// Go reference: accounts.go serviceLatency / serviceExportLatencyStats. +/// +public sealed class ServiceLatencyTracker +{ + private readonly Lock _lock = new(); + private readonly List _samples = []; + private readonly int _maxSamples; + private long _totalRequests; + + public ServiceLatencyTracker(int maxSamples = 10000) + { + _maxSamples = maxSamples; + } + + /// Records a latency sample in milliseconds. + public void RecordLatency(double latencyMs) + { + lock (_lock) + { + if (_samples.Count >= _maxSamples) + _samples.RemoveAt(0); + _samples.Add(latencyMs); + _totalRequests++; + } + } + + public double GetP50() => GetPercentile(0.50); + public double GetP90() => GetPercentile(0.90); + public double GetP99() => GetPercentile(0.99); + + /// Returns the value at the given percentile (0.0–1.0) over recorded samples. + public double GetPercentile(double percentile) + { + lock (_lock) + return ComputePercentile(_samples, percentile); + } + + // Must be called under _lock. + private static double ComputePercentile(List samples, double percentile) + { + if (samples.Count == 0) + return 0; + var sorted = new List(samples); + sorted.Sort(); + var index = (int)(percentile * (sorted.Count - 1)); + return sorted[index]; + } + + // Must be called under _lock. + private static double ComputeAverage(List samples) + { + if (samples.Count == 0) + return 0; + var sum = 0.0; + foreach (var s in samples) + sum += s; + return sum / samples.Count; + } + + public long TotalRequests + { + get { lock (_lock) return _totalRequests; } + } + + public double AverageLatencyMs + { + get { lock (_lock) return ComputeAverage(_samples); } + } + + public double MinLatencyMs + { + get + { + lock (_lock) + return _samples.Count == 0 ? 0 : _samples.Min(); + } + } + + public double MaxLatencyMs + { + get + { + lock (_lock) + return _samples.Count == 0 ? 0 : _samples.Max(); + } + } + + public int SampleCount + { + get { lock (_lock) return _samples.Count; } + } + + /// Clears all samples and resets the total request counter. + public void Reset() + { + lock (_lock) + { + _samples.Clear(); + _totalRequests = 0; + } + } + + /// Returns an immutable snapshot of the current tracker state. + public ServiceLatencySnapshot GetSnapshot() + { + lock (_lock) + { + return new ServiceLatencySnapshot( + TotalRequests: _totalRequests, + P50Ms: ComputePercentile(_samples, 0.50), + P90Ms: ComputePercentile(_samples, 0.90), + P99Ms: ComputePercentile(_samples, 0.99), + AverageMs: ComputeAverage(_samples), + MinMs: _samples.Count == 0 ? 0 : _samples.Min(), + MaxMs: _samples.Count == 0 ? 0 : _samples.Max(), + SampleCount: _samples.Count); + } + } +} + +public sealed record ServiceLatencySnapshot( + long TotalRequests, + double P50Ms, + double P90Ms, + double P99Ms, + double AverageMs, + double MinMs, + double MaxMs, + int SampleCount); diff --git a/tests/NATS.Server.Tests/Auth/ResponseThresholdTests.cs b/tests/NATS.Server.Tests/Auth/ResponseThresholdTests.cs new file mode 100644 index 0000000..cbe9898 --- /dev/null +++ b/tests/NATS.Server.Tests/Auth/ResponseThresholdTests.cs @@ -0,0 +1,137 @@ +// Tests for Account.SetServiceResponseThreshold / GetServiceResponseThreshold / +// IsServiceResponseOverdue / CheckServiceResponse. +// Go reference: server/accounts.go — SetServiceExportResponseThreshold (~line 2522), +// ServiceExportResponseThreshold (~line 2510). + +using NATS.Server.Auth; + +namespace NATS.Server.Tests.Auth; + +public class ResponseThresholdTests +{ + // --------------------------------------------------------------------------- + // SetServiceResponseThreshold / GetServiceResponseThreshold + // --------------------------------------------------------------------------- + + [Fact] + public void SetServiceResponseThreshold_StoresThreshold() + { + // Go ref: accounts.go SetServiceExportResponseThreshold (~line 2522) + var account = new Account("test"); + account.SetServiceResponseThreshold("svc.foo", TimeSpan.FromSeconds(5)); + + account.ServiceResponseThresholds.ContainsKey("svc.foo").ShouldBeTrue(); + account.ServiceResponseThresholds["svc.foo"].ShouldBe(TimeSpan.FromSeconds(5)); + } + + [Fact] + public void GetServiceResponseThreshold_ReturnsStored() + { + // Go ref: accounts.go ServiceExportResponseThreshold (~line 2510) + var account = new Account("test"); + account.SetServiceResponseThreshold("svc.bar", TimeSpan.FromMilliseconds(200)); + + account.GetServiceResponseThreshold("svc.bar").ShouldBe(TimeSpan.FromMilliseconds(200)); + } + + [Fact] + public void GetServiceResponseThreshold_NotSet_ReturnsNull() + { + // Go ref: accounts.go ServiceExportResponseThreshold — returns error when export not found + var account = new Account("test"); + + account.GetServiceResponseThreshold("svc.unknown").ShouldBeNull(); + } + + // --------------------------------------------------------------------------- + // IsServiceResponseOverdue + // --------------------------------------------------------------------------- + + [Fact] + public void IsServiceResponseOverdue_WithinThreshold_ReturnsFalse() + { + // Go ref: accounts.go respThresh check — elapsed < threshold ⇒ not overdue + var account = new Account("test"); + account.SetServiceResponseThreshold("svc.a", TimeSpan.FromSeconds(10)); + + account.IsServiceResponseOverdue("svc.a", TimeSpan.FromSeconds(9)).ShouldBeFalse(); + } + + [Fact] + public void IsServiceResponseOverdue_ExceedsThreshold_ReturnsTrue() + { + // Go ref: accounts.go respThresh check — elapsed > threshold ⇒ overdue + var account = new Account("test"); + account.SetServiceResponseThreshold("svc.b", TimeSpan.FromSeconds(1)); + + account.IsServiceResponseOverdue("svc.b", TimeSpan.FromSeconds(2)).ShouldBeTrue(); + } + + [Fact] + public void IsServiceResponseOverdue_NoThreshold_ReturnsFalse() + { + // Go ref: accounts.go — when no respThresh is set the timer never fires (never overdue) + var account = new Account("test"); + + account.IsServiceResponseOverdue("svc.unregistered", TimeSpan.FromHours(1)).ShouldBeFalse(); + } + + [Fact] + public void SetServiceResponseThreshold_OverwritesPrevious() + { + // Go ref: accounts.go SetServiceExportResponseThreshold — se.respThresh = maxTime overwrites + var account = new Account("test"); + account.SetServiceResponseThreshold("svc.c", TimeSpan.FromSeconds(5)); + account.SetServiceResponseThreshold("svc.c", TimeSpan.FromSeconds(30)); + + account.GetServiceResponseThreshold("svc.c").ShouldBe(TimeSpan.FromSeconds(30)); + } + + // --------------------------------------------------------------------------- + // CheckServiceResponse + // --------------------------------------------------------------------------- + + [Fact] + public void CheckServiceResponse_Found_NotOverdue() + { + // Go ref: accounts.go ServiceExportResponseThreshold + respThresh timer — within window + var account = new Account("test"); + account.SetServiceResponseThreshold("svc.d", TimeSpan.FromSeconds(10)); + + var result = account.CheckServiceResponse("svc.d", TimeSpan.FromSeconds(5)); + + result.Found.ShouldBeTrue(); + result.IsOverdue.ShouldBeFalse(); + result.Threshold.ShouldBe(TimeSpan.FromSeconds(10)); + result.Elapsed.ShouldBe(TimeSpan.FromSeconds(5)); + } + + [Fact] + public void CheckServiceResponse_Found_Overdue() + { + // Go ref: accounts.go respThresh timer fires — elapsed exceeded threshold + var account = new Account("test"); + account.SetServiceResponseThreshold("svc.e", TimeSpan.FromSeconds(2)); + + var result = account.CheckServiceResponse("svc.e", TimeSpan.FromSeconds(5)); + + result.Found.ShouldBeTrue(); + result.IsOverdue.ShouldBeTrue(); + result.Threshold.ShouldBe(TimeSpan.FromSeconds(2)); + result.Elapsed.ShouldBe(TimeSpan.FromSeconds(5)); + } + + [Fact] + public void CheckServiceResponse_NotFound() + { + // Go ref: accounts.go — no export defined, returns error; here Found=false + var account = new Account("test"); + + var result = account.CheckServiceResponse("svc.none", TimeSpan.FromSeconds(1)); + + result.Found.ShouldBeFalse(); + result.IsOverdue.ShouldBeFalse(); + result.Threshold.ShouldBeNull(); + result.Elapsed.ShouldBe(TimeSpan.FromSeconds(1)); + } +} diff --git a/tests/NATS.Server.Tests/Auth/ServiceLatencyTrackerTests.cs b/tests/NATS.Server.Tests/Auth/ServiceLatencyTrackerTests.cs new file mode 100644 index 0000000..1e5cf3a --- /dev/null +++ b/tests/NATS.Server.Tests/Auth/ServiceLatencyTrackerTests.cs @@ -0,0 +1,168 @@ +// Tests for service export latency tracker with p50/p90/p99 percentile histogram. +// Go reference: accounts_test.go TestServiceLatency, serviceExportLatencyStats. + +using NATS.Server.Auth; + +namespace NATS.Server.Tests.Auth; + +public class ServiceLatencyTrackerTests +{ + [Fact] + public void RecordLatency_IncrementsTotalRequests() + { + var tracker = new ServiceLatencyTracker(); + + tracker.RecordLatency(10.0); + tracker.RecordLatency(20.0); + tracker.RecordLatency(30.0); + + tracker.TotalRequests.ShouldBe(3L); + } + + [Fact] + public void GetP50_ReturnsMedian() + { + var tracker = new ServiceLatencyTracker(); + + foreach (var v in new double[] { 1, 2, 3, 4, 5 }) + tracker.RecordLatency(v); + + // Sorted: [1, 2, 3, 4, 5], index = (int)(0.50 * 4) = 2 → value 3 + tracker.GetP50().ShouldBe(3.0); + } + + [Fact] + public void GetP90_ReturnsHighPercentile() + { + var tracker = new ServiceLatencyTracker(); + + for (var i = 1; i <= 100; i++) + tracker.RecordLatency(i); + + // Sorted [1..100], index = (int)(0.90 * 99) = (int)89.1 = 89 → value 90 + tracker.GetP90().ShouldBe(90.0); + } + + [Fact] + public void GetP99_ReturnsTopPercentile() + { + var tracker = new ServiceLatencyTracker(); + + for (var i = 1; i <= 100; i++) + tracker.RecordLatency(i); + + // Sorted [1..100], index = (int)(0.99 * 99) = (int)98.01 = 98 → value 99 + tracker.GetP99().ShouldBe(99.0); + } + + [Fact] + public void AverageLatencyMs_CalculatesCorrectly() + { + var tracker = new ServiceLatencyTracker(); + + tracker.RecordLatency(10.0); + tracker.RecordLatency(20.0); + tracker.RecordLatency(30.0); + + tracker.AverageLatencyMs.ShouldBe(20.0); + } + + [Fact] + public void MinLatencyMs_ReturnsMinimum() + { + var tracker = new ServiceLatencyTracker(); + + tracker.RecordLatency(15.0); + tracker.RecordLatency(5.0); + tracker.RecordLatency(10.0); + + tracker.MinLatencyMs.ShouldBe(5.0); + } + + [Fact] + public void MaxLatencyMs_ReturnsMaximum() + { + var tracker = new ServiceLatencyTracker(); + + tracker.RecordLatency(5.0); + tracker.RecordLatency(15.0); + tracker.RecordLatency(10.0); + + tracker.MaxLatencyMs.ShouldBe(15.0); + } + + [Fact] + public void Reset_ClearsSamples() + { + var tracker = new ServiceLatencyTracker(); + + tracker.RecordLatency(10.0); + tracker.RecordLatency(20.0); + tracker.SampleCount.ShouldBe(2); + tracker.TotalRequests.ShouldBe(2L); + + tracker.Reset(); + + tracker.SampleCount.ShouldBe(0); + tracker.TotalRequests.ShouldBe(0L); + tracker.AverageLatencyMs.ShouldBe(0.0); + tracker.MinLatencyMs.ShouldBe(0.0); + tracker.MaxLatencyMs.ShouldBe(0.0); + tracker.GetP50().ShouldBe(0.0); + } + + [Fact] + public void GetSnapshot_ReturnsImmutableSnapshot() + { + var tracker = new ServiceLatencyTracker(); + + tracker.RecordLatency(10.0); + tracker.RecordLatency(20.0); + tracker.RecordLatency(30.0); + + var snapshot = tracker.GetSnapshot(); + + snapshot.TotalRequests.ShouldBe(3L); + snapshot.SampleCount.ShouldBe(3); + snapshot.AverageMs.ShouldBe(20.0); + snapshot.MinMs.ShouldBe(10.0); + snapshot.MaxMs.ShouldBe(30.0); + // P50 of [10, 20, 30]: index = (int)(0.50 * 2) = 1 → 20 + snapshot.P50Ms.ShouldBe(20.0); + + // Mutating tracker after snapshot does not change the snapshot + tracker.RecordLatency(1000.0); + snapshot.MaxMs.ShouldBe(30.0); + snapshot.SampleCount.ShouldBe(3); + } + + [Fact] + public void MaxSamples_EvictsOldest() + { + var tracker = new ServiceLatencyTracker(maxSamples: 5); + + for (var i = 1; i <= 10; i++) + tracker.RecordLatency(i); + + // Only the last 5 samples should remain (6, 7, 8, 9, 10) + tracker.SampleCount.ShouldBe(5); + // TotalRequests counts all recorded calls, not just retained ones + tracker.TotalRequests.ShouldBe(10L); + // Minimum of retained samples is 6 + tracker.MinLatencyMs.ShouldBe(6.0); + // Maximum of retained samples is 10 + tracker.MaxLatencyMs.ShouldBe(10.0); + } + + [Fact] + public void Account_RecordServiceLatency_DelegatesToTracker() + { + var account = new Account("test"); + + account.RecordServiceLatency(50.0); + account.RecordServiceLatency(100.0); + + account.LatencyTracker.TotalRequests.ShouldBe(2L); + account.LatencyTracker.AverageLatencyMs.ShouldBe(75.0); + } +}