feat(consumer): add PriorityQueue-based RedeliveryTracker overloads

Add new constructor, Schedule(DateTimeOffset), GetDue(DateTimeOffset),
IncrementDeliveryCount, IsMaxDeliveries(), and GetBackoffDelay() to
RedeliveryTracker without breaking existing API. Uses PriorityQueue<ulong,
DateTimeOffset> for deadline-ordered dispatch mirroring Go consumer.go rdq.
This commit is contained in:
Joseph Doherty
2026-02-25 02:12:37 -05:00
parent 7e4a23a0b7
commit 55de052009
2 changed files with 219 additions and 1 deletions

View File

@@ -7,14 +7,37 @@ namespace NATS.Server.JetStream.Consumers;
public sealed class RedeliveryTracker public sealed class RedeliveryTracker
{ {
private readonly int[] _backoffMs; private readonly int[] _backoffMs;
private readonly long[]? _backoffMsLong;
// Go: consumer.go — pending maps sseq → (deadline, deliveries) // Go: consumer.go — pending maps sseq → (deadline, deliveries)
private readonly Dictionary<ulong, RedeliveryEntry> _entries = new(); private readonly Dictionary<ulong, RedeliveryEntry> _entries = new();
// Go: consumer.go — rdc map tracks per-sequence delivery counts
private readonly Dictionary<ulong, int> _deliveryCounts = new();
// Go: consumer.go — rdq priority queue ordered by deadline for efficient dispatch
private readonly PriorityQueue<ulong, DateTimeOffset> _priorityQueue = new();
// Stored config for the new constructor overload
private readonly int _maxDeliveries;
private readonly long _ackWaitMs;
// Go: consumer.go:100 — BackOff []time.Duration in ConsumerConfig; empty falls back to ackWait // Go: consumer.go:100 — BackOff []time.Duration in ConsumerConfig; empty falls back to ackWait
public RedeliveryTracker(int[] backoffMs) public RedeliveryTracker(int[] backoffMs)
{ {
_backoffMs = backoffMs; _backoffMs = backoffMs;
_backoffMsLong = null;
_maxDeliveries = 0;
_ackWaitMs = 0;
}
// Go: consumer.go — ConsumerConfig maxDeliver + ackWait + backoff, new overload storing config fields
public RedeliveryTracker(int maxDeliveries, long ackWaitMs, long[]? backoffMs = null)
{
_backoffMs = [];
_backoffMsLong = backoffMs;
_maxDeliveries = maxDeliveries;
_ackWaitMs = ackWaitMs;
} }
// Go: consumer.go:5540 — trackPending records delivery count and schedules deadline // Go: consumer.go:5540 — trackPending records delivery count and schedules deadline
@@ -34,6 +57,13 @@ public sealed class RedeliveryTracker
return deadline; return deadline;
} }
// Go: consumer.go — schedule with an explicit deadline into the priority queue
public void Schedule(ulong seq, DateTimeOffset deadline)
{
_deliveryCounts.TryAdd(seq, 0);
_priorityQueue.Enqueue(seq, deadline);
}
// Go: consumer.go — rdq entries are dispatched once their deadline has passed // Go: consumer.go — rdq entries are dispatched once their deadline has passed
public IReadOnlyList<ulong> GetDue() public IReadOnlyList<ulong> GetDue()
{ {
@@ -52,8 +82,53 @@ public sealed class RedeliveryTracker
return due ?? (IReadOnlyList<ulong>)[]; return due ?? (IReadOnlyList<ulong>)[];
} }
// Go: consumer.go — drain the rdq priority queue of all entries whose deadline <= now,
// returning them in deadline order (earliest first).
public IEnumerable<ulong> GetDue(DateTimeOffset now)
{
List<(ulong seq, DateTimeOffset deadline)>? dequeued = null;
List<(ulong seq, DateTimeOffset deadline)>? future = null;
// Drain the entire queue, separating due from future
while (_priorityQueue.TryDequeue(out var seq, out var deadline))
{
// Skip sequences that were acknowledged
if (!_deliveryCounts.ContainsKey(seq))
continue;
if (deadline <= now)
{
dequeued ??= [];
dequeued.Add((seq, deadline));
}
else
{
future ??= [];
future.Add((seq, deadline));
}
}
// Re-enqueue future items
if (future is not null)
{
foreach (var (seq, deadline) in future)
_priorityQueue.Enqueue(seq, deadline);
}
if (dequeued is null)
return [];
// Already extracted in priority order since PriorityQueue dequeues min first
return dequeued.Select(x => x.seq);
}
// Go: consumer.go — acking a sequence removes it from the pending redelivery set // Go: consumer.go — acking a sequence removes it from the pending redelivery set
public void Acknowledge(ulong seq) => _entries.Remove(seq); public void Acknowledge(ulong seq)
{
_entries.Remove(seq);
_deliveryCounts.Remove(seq);
// Priority queue entries are lazily skipped in GetDue when seq not in _deliveryCounts
}
// Go: consumer.go — maxdeliver check: drop sequence once delivery count exceeds max // Go: consumer.go — maxdeliver check: drop sequence once delivery count exceeds max
public bool IsMaxDeliveries(ulong seq, int maxDeliver) public bool IsMaxDeliveries(ulong seq, int maxDeliver)
@@ -67,6 +142,36 @@ public sealed class RedeliveryTracker
return entry.DeliveryCount >= maxDeliver; return entry.DeliveryCount >= maxDeliver;
} }
// Go: consumer.go — maxdeliver check using the stored _maxDeliveries from new constructor
public bool IsMaxDeliveries(ulong seq)
{
if (_maxDeliveries <= 0)
return false;
_deliveryCounts.TryGetValue(seq, out var count);
return count >= _maxDeliveries;
}
// Go: consumer.go — rdc map increment: track how many times a sequence has been delivered
public void IncrementDeliveryCount(ulong seq)
{
_deliveryCounts[seq] = _deliveryCounts.TryGetValue(seq, out var count) ? count + 1 : 1;
}
// Go: consumer.go — backoff delay lookup: index by deliveryCount, clamp to last entry,
// fall back to ackWait when no backoff array is configured.
public long GetBackoffDelay(int deliveryCount)
{
if (_backoffMsLong is { Length: > 0 })
{
var idx = Math.Min(deliveryCount - 1, _backoffMsLong.Length - 1);
if (idx < 0) idx = 0;
return _backoffMsLong[idx];
}
return _ackWaitMs;
}
public bool IsTracking(ulong seq) => _entries.ContainsKey(seq); public bool IsTracking(ulong seq) => _entries.ContainsKey(seq);
public int TrackedCount => _entries.Count; public int TrackedCount => _entries.Count;

View File

@@ -0,0 +1,113 @@
using NATS.Server.JetStream.Consumers;
namespace NATS.Server.Tests.JetStream.Consumers;
/// <summary>
/// Tests for the new PriorityQueue-based RedeliveryTracker features.
/// Go reference: consumer.go (rdq redelivery queue).
/// </summary>
public class RedeliveryTrackerPriorityQueueTests
{
[Fact]
public void Schedule_and_get_due_returns_expired()
{
var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 1000);
var past = DateTimeOffset.UtcNow.AddMilliseconds(-100);
tracker.Schedule(1, past);
tracker.Schedule(2, DateTimeOffset.UtcNow.AddSeconds(60)); // future
var due = tracker.GetDue(DateTimeOffset.UtcNow).ToList();
due.Count.ShouldBe(1);
due[0].ShouldBe(1UL);
}
[Fact]
public void Acknowledge_removes_from_queue()
{
var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 1000);
tracker.Schedule(1, DateTimeOffset.UtcNow.AddMilliseconds(-100));
tracker.Acknowledge(1);
var due = tracker.GetDue(DateTimeOffset.UtcNow).ToList();
due.ShouldBeEmpty();
}
[Fact]
public void IsMaxDeliveries_returns_true_at_threshold()
{
var tracker = new RedeliveryTracker(maxDeliveries: 3, ackWaitMs: 1000);
tracker.IncrementDeliveryCount(1);
tracker.IncrementDeliveryCount(1);
tracker.IsMaxDeliveries(1).ShouldBeFalse();
tracker.IncrementDeliveryCount(1);
tracker.IsMaxDeliveries(1).ShouldBeTrue();
}
[Fact]
public void Backoff_schedule_uses_delivery_count()
{
var backoff = new long[] { 100, 500, 2000 };
var tracker = new RedeliveryTracker(maxDeliveries: 10, ackWaitMs: 1000, backoffMs: backoff);
// First redeliver: 100ms
var delay1 = tracker.GetBackoffDelay(deliveryCount: 1);
delay1.ShouldBe(100L);
// Second: 500ms
var delay2 = tracker.GetBackoffDelay(deliveryCount: 2);
delay2.ShouldBe(500L);
// Beyond schedule: use last value
var delay4 = tracker.GetBackoffDelay(deliveryCount: 4);
delay4.ShouldBe(2000L);
}
[Fact]
public void GetDue_returns_in_deadline_order()
{
var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 1000);
var now = DateTimeOffset.UtcNow;
tracker.Schedule(3, now.AddMilliseconds(-300));
tracker.Schedule(1, now.AddMilliseconds(-100));
tracker.Schedule(2, now.AddMilliseconds(-200));
var due = tracker.GetDue(now).ToList();
due.Count.ShouldBe(3);
due[0].ShouldBe(3UL); // earliest deadline first
due[1].ShouldBe(2UL);
due[2].ShouldBe(1UL);
}
[Fact]
public void GetBackoffDelay_with_no_backoff_returns_ackWait()
{
var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 2000);
tracker.GetBackoffDelay(1).ShouldBe(2000L);
tracker.GetBackoffDelay(5).ShouldBe(2000L);
}
[Fact]
public void IncrementDeliveryCount_for_untracked_seq_starts_at_one()
{
var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 1000);
tracker.IncrementDeliveryCount(42);
// First increment should make count = 1, so maxDeliveries=5 means not max yet
tracker.IsMaxDeliveries(42).ShouldBeFalse();
}
[Fact]
public void Acknowledge_also_clears_delivery_count()
{
var tracker = new RedeliveryTracker(maxDeliveries: 3, ackWaitMs: 1000);
tracker.IncrementDeliveryCount(1);
tracker.IncrementDeliveryCount(1);
tracker.Acknowledge(1);
// After ack, delivery count should be cleared
tracker.IsMaxDeliveries(1).ShouldBeFalse();
}
}