feat: add sample/observe mode with latency measurement (Gap 3.11)

Implements SampleTracker with stochastic delivery sampling (ParseSampleFrequency,
ShouldSample, RecordLatency) and LatencySample for consumer observability advisories.
Ports consumer.go sampleFrequency / shouldSample / parseSampleFrequency logic.
15 new tests covering parsing, rate extremes, deterministic seeded sampling, and field capture.
This commit is contained in:
Joseph Doherty
2026-02-25 11:14:58 -05:00
parent 8b4b236968
commit b9aa62ae99
2 changed files with 285 additions and 0 deletions

View File

@@ -0,0 +1,111 @@
namespace NATS.Server.JetStream.Consumers;
/// <summary>
/// Implements stochastic sampling for consumer delivery latency measurement.
/// When enabled, a configurable percentage of messages are sampled and their
/// delivery latency recorded for advisory publication.
/// Go reference: consumer.go sampleFrequency, shouldSample.
/// </summary>
public sealed class SampleTracker
{
private readonly double _sampleRate;
private readonly Random _random;
private long _sampleCount;
private long _totalDeliveries;
/// <summary>
/// Creates a sample tracker with the given rate (0.0 to 1.0).
/// Use ParseSampleFrequency to convert string like "1%" to rate.
/// </summary>
public SampleTracker(double sampleRate, Random? random = null)
{
_sampleRate = Math.Clamp(sampleRate, 0.0, 1.0);
_random = random ?? Random.Shared;
}
/// <summary>The configured sample rate (0.0 to 1.0).</summary>
public double SampleRate => _sampleRate;
/// <summary>Number of messages that were sampled.</summary>
public long SampleCount => Interlocked.Read(ref _sampleCount);
/// <summary>Total number of deliveries checked.</summary>
public long TotalDeliveries => Interlocked.Read(ref _totalDeliveries);
/// <summary>
/// Returns true if this delivery should be sampled.
/// Uses Random to stochastically select based on sample rate.
/// Go reference: consumer.go shouldSample.
/// </summary>
public bool ShouldSample()
{
Interlocked.Increment(ref _totalDeliveries);
if (_sampleRate <= 0.0) return false;
if (_sampleRate >= 1.0)
{
Interlocked.Increment(ref _sampleCount);
return true;
}
if (_random.NextDouble() < _sampleRate)
{
Interlocked.Increment(ref _sampleCount);
return true;
}
return false;
}
/// <summary>
/// Records a latency measurement for a sampled delivery.
/// Returns a LatencySample for advisory publication.
/// </summary>
public LatencySample RecordLatency(TimeSpan deliveryLatency, ulong sequence, string subject)
{
return new LatencySample
{
Sequence = sequence,
Subject = subject,
DeliveryLatency = deliveryLatency,
SampledAtUtc = DateTime.UtcNow,
};
}
/// <summary>
/// Parses a sample frequency string like "1%", "50%", "100%".
/// Returns the rate as a double (0.0 to 1.0).
/// Returns 0.0 for invalid or empty strings.
/// Go reference: consumer.go parseSampleFrequency.
/// </summary>
public static double ParseSampleFrequency(string? frequency)
{
if (string.IsNullOrWhiteSpace(frequency))
return 0.0;
var trimmed = frequency.Trim();
if (trimmed.EndsWith('%'))
trimmed = trimmed[..^1].Trim();
if (double.TryParse(trimmed, System.Globalization.CultureInfo.InvariantCulture, out var value))
{
if (value <= 0) return 0.0;
if (value > 100) return 1.0;
return value / 100.0;
}
return 0.0;
}
}
/// <summary>
/// A single latency sample from a consumer delivery.
/// Go reference: consumer.go ackLatency advisory.
/// </summary>
public sealed class LatencySample
{
public ulong Sequence { get; init; }
public string Subject { get; init; } = string.Empty;
public TimeSpan DeliveryLatency { get; init; }
public DateTime SampledAtUtc { get; init; }
}