From 55de0520096de68fa8a9cb025afe8f4f400036ee Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 25 Feb 2026 02:12:37 -0500 Subject: [PATCH] feat(consumer): add PriorityQueue-based RedeliveryTracker overloads Add new constructor, Schedule(DateTimeOffset), GetDue(DateTimeOffset), IncrementDeliveryCount, IsMaxDeliveries(), and GetBackoffDelay() to RedeliveryTracker without breaking existing API. Uses PriorityQueue for deadline-ordered dispatch mirroring Go consumer.go rdq. --- .../JetStream/Consumers/RedeliveryTracker.cs | 107 ++++++++++++++++- .../RedeliveryTrackerPriorityQueueTests.cs | 113 ++++++++++++++++++ 2 files changed, 219 insertions(+), 1 deletion(-) create mode 100644 tests/NATS.Server.Tests/JetStream/Consumers/RedeliveryTrackerPriorityQueueTests.cs diff --git a/src/NATS.Server/JetStream/Consumers/RedeliveryTracker.cs b/src/NATS.Server/JetStream/Consumers/RedeliveryTracker.cs index eb003ed..1213fb2 100644 --- a/src/NATS.Server/JetStream/Consumers/RedeliveryTracker.cs +++ b/src/NATS.Server/JetStream/Consumers/RedeliveryTracker.cs @@ -7,14 +7,37 @@ namespace NATS.Server.JetStream.Consumers; public sealed class RedeliveryTracker { private readonly int[] _backoffMs; + private readonly long[]? _backoffMsLong; // Go: consumer.go — pending maps sseq → (deadline, deliveries) private readonly Dictionary _entries = new(); + // Go: consumer.go — rdc map tracks per-sequence delivery counts + private readonly Dictionary _deliveryCounts = new(); + + // Go: consumer.go — rdq priority queue ordered by deadline for efficient dispatch + private readonly PriorityQueue _priorityQueue = new(); + + // Stored config for the new constructor overload + private readonly int _maxDeliveries; + private readonly long _ackWaitMs; + // Go: consumer.go:100 — BackOff []time.Duration in ConsumerConfig; empty falls back to ackWait public RedeliveryTracker(int[] backoffMs) { _backoffMs = backoffMs; + _backoffMsLong = null; + _maxDeliveries = 0; + _ackWaitMs = 0; + } + + // Go: consumer.go — ConsumerConfig maxDeliver + ackWait + backoff, new overload storing config fields + public RedeliveryTracker(int maxDeliveries, long ackWaitMs, long[]? backoffMs = null) + { + _backoffMs = []; + _backoffMsLong = backoffMs; + _maxDeliveries = maxDeliveries; + _ackWaitMs = ackWaitMs; } // Go: consumer.go:5540 — trackPending records delivery count and schedules deadline @@ -34,6 +57,13 @@ public sealed class RedeliveryTracker return deadline; } + // Go: consumer.go — schedule with an explicit deadline into the priority queue + public void Schedule(ulong seq, DateTimeOffset deadline) + { + _deliveryCounts.TryAdd(seq, 0); + _priorityQueue.Enqueue(seq, deadline); + } + // Go: consumer.go — rdq entries are dispatched once their deadline has passed public IReadOnlyList GetDue() { @@ -52,8 +82,53 @@ public sealed class RedeliveryTracker return due ?? (IReadOnlyList)[]; } + // Go: consumer.go — drain the rdq priority queue of all entries whose deadline <= now, + // returning them in deadline order (earliest first). + public IEnumerable GetDue(DateTimeOffset now) + { + List<(ulong seq, DateTimeOffset deadline)>? dequeued = null; + List<(ulong seq, DateTimeOffset deadline)>? future = null; + + // Drain the entire queue, separating due from future + while (_priorityQueue.TryDequeue(out var seq, out var deadline)) + { + // Skip sequences that were acknowledged + if (!_deliveryCounts.ContainsKey(seq)) + continue; + + if (deadline <= now) + { + dequeued ??= []; + dequeued.Add((seq, deadline)); + } + else + { + future ??= []; + future.Add((seq, deadline)); + } + } + + // Re-enqueue future items + if (future is not null) + { + foreach (var (seq, deadline) in future) + _priorityQueue.Enqueue(seq, deadline); + } + + if (dequeued is null) + return []; + + // Already extracted in priority order since PriorityQueue dequeues min first + return dequeued.Select(x => x.seq); + } + // Go: consumer.go — acking a sequence removes it from the pending redelivery set - public void Acknowledge(ulong seq) => _entries.Remove(seq); + public void Acknowledge(ulong seq) + { + _entries.Remove(seq); + _deliveryCounts.Remove(seq); + // Priority queue entries are lazily skipped in GetDue when seq not in _deliveryCounts + } // Go: consumer.go — maxdeliver check: drop sequence once delivery count exceeds max public bool IsMaxDeliveries(ulong seq, int maxDeliver) @@ -67,6 +142,36 @@ public sealed class RedeliveryTracker return entry.DeliveryCount >= maxDeliver; } + // Go: consumer.go — maxdeliver check using the stored _maxDeliveries from new constructor + public bool IsMaxDeliveries(ulong seq) + { + if (_maxDeliveries <= 0) + return false; + + _deliveryCounts.TryGetValue(seq, out var count); + return count >= _maxDeliveries; + } + + // Go: consumer.go — rdc map increment: track how many times a sequence has been delivered + public void IncrementDeliveryCount(ulong seq) + { + _deliveryCounts[seq] = _deliveryCounts.TryGetValue(seq, out var count) ? count + 1 : 1; + } + + // Go: consumer.go — backoff delay lookup: index by deliveryCount, clamp to last entry, + // fall back to ackWait when no backoff array is configured. + public long GetBackoffDelay(int deliveryCount) + { + if (_backoffMsLong is { Length: > 0 }) + { + var idx = Math.Min(deliveryCount - 1, _backoffMsLong.Length - 1); + if (idx < 0) idx = 0; + return _backoffMsLong[idx]; + } + + return _ackWaitMs; + } + public bool IsTracking(ulong seq) => _entries.ContainsKey(seq); public int TrackedCount => _entries.Count; diff --git a/tests/NATS.Server.Tests/JetStream/Consumers/RedeliveryTrackerPriorityQueueTests.cs b/tests/NATS.Server.Tests/JetStream/Consumers/RedeliveryTrackerPriorityQueueTests.cs new file mode 100644 index 0000000..755ab30 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Consumers/RedeliveryTrackerPriorityQueueTests.cs @@ -0,0 +1,113 @@ +using NATS.Server.JetStream.Consumers; + +namespace NATS.Server.Tests.JetStream.Consumers; + +/// +/// Tests for the new PriorityQueue-based RedeliveryTracker features. +/// Go reference: consumer.go (rdq redelivery queue). +/// +public class RedeliveryTrackerPriorityQueueTests +{ + [Fact] + public void Schedule_and_get_due_returns_expired() + { + var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 1000); + var past = DateTimeOffset.UtcNow.AddMilliseconds(-100); + + tracker.Schedule(1, past); + tracker.Schedule(2, DateTimeOffset.UtcNow.AddSeconds(60)); // future + + var due = tracker.GetDue(DateTimeOffset.UtcNow).ToList(); + due.Count.ShouldBe(1); + due[0].ShouldBe(1UL); + } + + [Fact] + public void Acknowledge_removes_from_queue() + { + var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 1000); + tracker.Schedule(1, DateTimeOffset.UtcNow.AddMilliseconds(-100)); + + tracker.Acknowledge(1); + + var due = tracker.GetDue(DateTimeOffset.UtcNow).ToList(); + due.ShouldBeEmpty(); + } + + [Fact] + public void IsMaxDeliveries_returns_true_at_threshold() + { + var tracker = new RedeliveryTracker(maxDeliveries: 3, ackWaitMs: 1000); + + tracker.IncrementDeliveryCount(1); + tracker.IncrementDeliveryCount(1); + tracker.IsMaxDeliveries(1).ShouldBeFalse(); + + tracker.IncrementDeliveryCount(1); + tracker.IsMaxDeliveries(1).ShouldBeTrue(); + } + + [Fact] + public void Backoff_schedule_uses_delivery_count() + { + var backoff = new long[] { 100, 500, 2000 }; + var tracker = new RedeliveryTracker(maxDeliveries: 10, ackWaitMs: 1000, backoffMs: backoff); + + // First redeliver: 100ms + var delay1 = tracker.GetBackoffDelay(deliveryCount: 1); + delay1.ShouldBe(100L); + + // Second: 500ms + var delay2 = tracker.GetBackoffDelay(deliveryCount: 2); + delay2.ShouldBe(500L); + + // Beyond schedule: use last value + var delay4 = tracker.GetBackoffDelay(deliveryCount: 4); + delay4.ShouldBe(2000L); + } + + [Fact] + public void GetDue_returns_in_deadline_order() + { + var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 1000); + var now = DateTimeOffset.UtcNow; + + tracker.Schedule(3, now.AddMilliseconds(-300)); + tracker.Schedule(1, now.AddMilliseconds(-100)); + tracker.Schedule(2, now.AddMilliseconds(-200)); + + var due = tracker.GetDue(now).ToList(); + due.Count.ShouldBe(3); + due[0].ShouldBe(3UL); // earliest deadline first + due[1].ShouldBe(2UL); + due[2].ShouldBe(1UL); + } + + [Fact] + public void GetBackoffDelay_with_no_backoff_returns_ackWait() + { + var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 2000); + tracker.GetBackoffDelay(1).ShouldBe(2000L); + tracker.GetBackoffDelay(5).ShouldBe(2000L); + } + + [Fact] + public void IncrementDeliveryCount_for_untracked_seq_starts_at_one() + { + var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 1000); + tracker.IncrementDeliveryCount(42); + // First increment should make count = 1, so maxDeliveries=5 means not max yet + tracker.IsMaxDeliveries(42).ShouldBeFalse(); + } + + [Fact] + public void Acknowledge_also_clears_delivery_count() + { + var tracker = new RedeliveryTracker(maxDeliveries: 3, ackWaitMs: 1000); + tracker.IncrementDeliveryCount(1); + tracker.IncrementDeliveryCount(1); + tracker.Acknowledge(1); + // After ack, delivery count should be cleared + tracker.IsMaxDeliveries(1).ShouldBeFalse(); + } +}