diff --git a/src/NATS.Server/JetStream/Consumers/SampleTracker.cs b/src/NATS.Server/JetStream/Consumers/SampleTracker.cs new file mode 100644 index 0000000..634ef03 --- /dev/null +++ b/src/NATS.Server/JetStream/Consumers/SampleTracker.cs @@ -0,0 +1,111 @@ +namespace NATS.Server.JetStream.Consumers; + +/// +/// Implements stochastic sampling for consumer delivery latency measurement. +/// When enabled, a configurable percentage of messages are sampled and their +/// delivery latency recorded for advisory publication. +/// Go reference: consumer.go sampleFrequency, shouldSample. +/// +public sealed class SampleTracker +{ + private readonly double _sampleRate; + private readonly Random _random; + private long _sampleCount; + private long _totalDeliveries; + + /// + /// Creates a sample tracker with the given rate (0.0 to 1.0). + /// Use ParseSampleFrequency to convert string like "1%" to rate. + /// + public SampleTracker(double sampleRate, Random? random = null) + { + _sampleRate = Math.Clamp(sampleRate, 0.0, 1.0); + _random = random ?? Random.Shared; + } + + /// The configured sample rate (0.0 to 1.0). + public double SampleRate => _sampleRate; + + /// Number of messages that were sampled. + public long SampleCount => Interlocked.Read(ref _sampleCount); + + /// Total number of deliveries checked. + public long TotalDeliveries => Interlocked.Read(ref _totalDeliveries); + + /// + /// Returns true if this delivery should be sampled. + /// Uses Random to stochastically select based on sample rate. + /// Go reference: consumer.go shouldSample. + /// + public bool ShouldSample() + { + Interlocked.Increment(ref _totalDeliveries); + + if (_sampleRate <= 0.0) return false; + if (_sampleRate >= 1.0) + { + Interlocked.Increment(ref _sampleCount); + return true; + } + + if (_random.NextDouble() < _sampleRate) + { + Interlocked.Increment(ref _sampleCount); + return true; + } + + return false; + } + + /// + /// Records a latency measurement for a sampled delivery. + /// Returns a LatencySample for advisory publication. + /// + public LatencySample RecordLatency(TimeSpan deliveryLatency, ulong sequence, string subject) + { + return new LatencySample + { + Sequence = sequence, + Subject = subject, + DeliveryLatency = deliveryLatency, + SampledAtUtc = DateTime.UtcNow, + }; + } + + /// + /// Parses a sample frequency string like "1%", "50%", "100%". + /// Returns the rate as a double (0.0 to 1.0). + /// Returns 0.0 for invalid or empty strings. + /// Go reference: consumer.go parseSampleFrequency. + /// + public static double ParseSampleFrequency(string? frequency) + { + if (string.IsNullOrWhiteSpace(frequency)) + return 0.0; + + var trimmed = frequency.Trim(); + if (trimmed.EndsWith('%')) + trimmed = trimmed[..^1].Trim(); + + if (double.TryParse(trimmed, System.Globalization.CultureInfo.InvariantCulture, out var value)) + { + if (value <= 0) return 0.0; + if (value > 100) return 1.0; + return value / 100.0; + } + + return 0.0; + } +} + +/// +/// A single latency sample from a consumer delivery. +/// Go reference: consumer.go ackLatency advisory. +/// +public sealed class LatencySample +{ + public ulong Sequence { get; init; } + public string Subject { get; init; } = string.Empty; + public TimeSpan DeliveryLatency { get; init; } + public DateTime SampledAtUtc { get; init; } +} diff --git a/tests/NATS.Server.Tests/JetStream/Consumers/SampleModeTests.cs b/tests/NATS.Server.Tests/JetStream/Consumers/SampleModeTests.cs new file mode 100644 index 0000000..e0ff983 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Consumers/SampleModeTests.cs @@ -0,0 +1,174 @@ +using NATS.Server.JetStream.Consumers; + +namespace NATS.Server.Tests.JetStream.Consumers; + +/// +/// Tests for SampleTracker: sample frequency parsing and stochastic latency sampling. +/// Go reference: consumer.go sampleFrequency, shouldSample, parseSampleFrequency. +/// +public class SampleModeTests +{ + // --- ParseSampleFrequency --- + + [Fact] + public void ParseSampleFrequency_one_percent() + { + var rate = SampleTracker.ParseSampleFrequency("1%"); + rate.ShouldBe(0.01, 1e-9); + } + + [Fact] + public void ParseSampleFrequency_fifty_percent() + { + var rate = SampleTracker.ParseSampleFrequency("50%"); + rate.ShouldBe(0.5, 1e-9); + } + + [Fact] + public void ParseSampleFrequency_hundred_percent() + { + var rate = SampleTracker.ParseSampleFrequency("100%"); + rate.ShouldBe(1.0, 1e-9); + } + + [Fact] + public void ParseSampleFrequency_zero() + { + var rate = SampleTracker.ParseSampleFrequency("0%"); + rate.ShouldBe(0.0, 1e-9); + } + + [Fact] + public void ParseSampleFrequency_no_percent_sign() + { + var rate = SampleTracker.ParseSampleFrequency("25"); + rate.ShouldBe(0.25, 1e-9); + } + + [Fact] + public void ParseSampleFrequency_empty_string() + { + var rate = SampleTracker.ParseSampleFrequency(""); + rate.ShouldBe(0.0, 1e-9); + } + + [Fact] + public void ParseSampleFrequency_null() + { + var rate = SampleTracker.ParseSampleFrequency(null); + rate.ShouldBe(0.0, 1e-9); + } + + [Fact] + public void ParseSampleFrequency_invalid() + { + var rate = SampleTracker.ParseSampleFrequency("abc"); + rate.ShouldBe(0.0, 1e-9); + } + + [Fact] + public void ParseSampleFrequency_over_100_clamped() + { + var rate = SampleTracker.ParseSampleFrequency("200%"); + rate.ShouldBe(1.0, 1e-9); + } + + // --- ShouldSample --- + + [Fact] + public void ShouldSample_rate_100_always_samples() + { + var tracker = new SampleTracker(1.0); + for (var i = 0; i < 20; i++) + { + tracker.ShouldSample().ShouldBeTrue(); + } + } + + [Fact] + public void ShouldSample_rate_0_never_samples() + { + var tracker = new SampleTracker(0.0); + for (var i = 0; i < 20; i++) + { + tracker.ShouldSample().ShouldBeFalse(); + } + } + + [Fact] + public void ShouldSample_increments_total_deliveries() + { + var tracker = new SampleTracker(0.5); + tracker.TotalDeliveries.ShouldBe(0L); + + tracker.ShouldSample(); + tracker.TotalDeliveries.ShouldBe(1L); + + tracker.ShouldSample(); + tracker.TotalDeliveries.ShouldBe(2L); + + tracker.ShouldSample(); + tracker.TotalDeliveries.ShouldBe(3L); + } + + [Fact] + public void ShouldSample_stochastic_with_seeded_random() + { + // Use a seeded Random for deterministic results. + // With seed 42 and rate 0.5, we can predict exact outcomes. + var rng = new Random(42); + var tracker = new SampleTracker(0.5, rng); + + // Pre-compute expected outcomes using the same seed. + var expectedRng = new Random(42); + var expected = new bool[10]; + for (var i = 0; i < 10; i++) + { + expected[i] = expectedRng.NextDouble() < 0.5; + } + + var actual = new bool[10]; + for (var i = 0; i < 10; i++) + { + actual[i] = tracker.ShouldSample(); + } + + actual.ShouldBe(expected); + tracker.TotalDeliveries.ShouldBe(10L); + } + + [Fact] + public void RecordLatency_captures_all_fields() + { + var tracker = new SampleTracker(1.0); + var latency = TimeSpan.FromMilliseconds(42); + const ulong seq = 7UL; + const string subject = "orders.new"; + + var before = DateTime.UtcNow; + var sample = tracker.RecordLatency(latency, seq, subject); + var after = DateTime.UtcNow; + + sample.Sequence.ShouldBe(seq); + sample.Subject.ShouldBe(subject); + sample.DeliveryLatency.ShouldBe(latency); + sample.SampledAtUtc.ShouldBeGreaterThanOrEqualTo(before); + sample.SampledAtUtc.ShouldBeLessThanOrEqualTo(after); + } + + [Fact] + public void SampleCount_tracks_sampled_only() + { + // Rate 1.0: every delivery is sampled. + var allSampled = new SampleTracker(1.0); + for (var i = 0; i < 5; i++) allSampled.ShouldSample(); + allSampled.SampleCount.ShouldBe(5L); + allSampled.TotalDeliveries.ShouldBe(5L); + + // Rate 0.0: no delivery is sampled. + var noneSampled = new SampleTracker(0.0); + for (var i = 0; i < 5; i++) noneSampled.ShouldSample(); + noneSampled.SampleCount.ShouldBe(0L); + noneSampled.TotalDeliveries.ShouldBe(5L); + } +}