feat: add max delivery enforcement with advisory generation (Gap 3.9)
Adds MaxDeliver property, exceeded sequence tracking, DrainExceeded, DeliveryExceededPolicy enum, and ScheduleRedelivery enforcement to AckProcessor; 10 new tests in MaxDeliveriesTests.cs.
This commit is contained in:
@@ -4,6 +4,18 @@ namespace NATS.Server.JetStream.Consumers;
|
||||
// Go: consumer.go:2550 — ack type prefix constants (+ACK, -NAK, +TERM, +WPI)
|
||||
public enum AckType { Ack, Nak, Term, Progress, Unknown }
|
||||
|
||||
/// <summary>
|
||||
/// Policy applied when a sequence exceeds its max delivery count.
|
||||
/// Go reference: consumer.go maxDeliver config.
|
||||
/// </summary>
|
||||
public enum DeliveryExceededPolicy
|
||||
{
|
||||
/// <summary>Drop the message — no further action.</summary>
|
||||
Drop,
|
||||
/// <summary>Move to a dead-letter queue subject.</summary>
|
||||
DeadLetter,
|
||||
}
|
||||
|
||||
public sealed class AckProcessor
|
||||
{
|
||||
// Go: consumer.go — ackTerminatedFlag marks sequences that must not be redelivered
|
||||
@@ -19,9 +31,35 @@ public sealed class AckProcessor
|
||||
// Stores deliver subjects keyed by sequence for tracker-based registrations
|
||||
private readonly Dictionary<ulong, string>? _deliverSubjects;
|
||||
|
||||
// Go: consumer.go — maxDeliver config; sequences that exceeded the limit
|
||||
private int _maxDeliver;
|
||||
private readonly List<ulong> _exceededSequences = new();
|
||||
|
||||
public ulong AckFloor { get; private set; }
|
||||
public int TerminatedCount { get; private set; }
|
||||
|
||||
/// <summary>
|
||||
/// Sets the max delivery limit. 0 or negative means unlimited.
|
||||
/// Go reference: consumer.go maxDeliver config.
|
||||
/// </summary>
|
||||
public int MaxDeliver
|
||||
{
|
||||
get => _maxDeliver;
|
||||
set => _maxDeliver = Math.Max(value, 0);
|
||||
}
|
||||
|
||||
/// <summary>Number of sequences that exceeded max deliveries.</summary>
|
||||
public int ExceededCount => _exceededSequences.Count;
|
||||
|
||||
/// <summary>Returns sequences that exceeded max deliveries since last drain.</summary>
|
||||
public IReadOnlyList<ulong> GetExceededSequences() => _exceededSequences.AsReadOnly();
|
||||
|
||||
/// <summary>Clears the exceeded sequences list (after advisories are sent).</summary>
|
||||
public void DrainExceeded() => _exceededSequences.Clear();
|
||||
|
||||
/// <summary>Policy applied when a sequence exceeds its max delivery count.</summary>
|
||||
public DeliveryExceededPolicy ExceededPolicy { get; set; } = DeliveryExceededPolicy.Drop;
|
||||
|
||||
public AckProcessor(int[]? backoffMs = null)
|
||||
{
|
||||
_backoffMs = backoffMs;
|
||||
@@ -236,6 +274,17 @@ public sealed class AckProcessor
|
||||
return;
|
||||
|
||||
state.Deliveries++;
|
||||
|
||||
// Go: consumer.go — check maxDeliver; Deliveries > maxDeliver means the limit is exceeded
|
||||
if (_maxDeliver > 0 && state.Deliveries > _maxDeliver)
|
||||
{
|
||||
_exceededSequences.Add(sequence);
|
||||
_pending.Remove(sequence);
|
||||
_terminated.Add(sequence);
|
||||
TerminatedCount++;
|
||||
return;
|
||||
}
|
||||
|
||||
state.DeadlineUtc = DateTime.UtcNow.AddMilliseconds(Math.Max(delayMs, 1));
|
||||
_pending[sequence] = state;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,184 @@
|
||||
// Go: consumer.go maxDeliver config — max delivery enforcement and advisory generation
|
||||
using NATS.Server.JetStream.Consumers;
|
||||
|
||||
namespace NATS.Server.Tests.JetStream.Consumers;
|
||||
|
||||
public class MaxDeliveriesTests
|
||||
{
|
||||
// Test 1: MaxDeliver=0 means unlimited; many redeliveries do not terminate
|
||||
[Fact]
|
||||
public void MaxDeliver_zero_means_unlimited()
|
||||
{
|
||||
// Go: consumer.go — maxDeliver=0 disables the limit entirely
|
||||
var ack = new AckProcessor();
|
||||
ack.MaxDeliver = 0;
|
||||
ack.Register(1, ackWaitMs: 5000);
|
||||
|
||||
for (var i = 0; i < 100; i++)
|
||||
ack.ScheduleRedelivery(1, delayMs: 1);
|
||||
|
||||
ack.PendingCount.ShouldBe(1);
|
||||
ack.ExceededCount.ShouldBe(0);
|
||||
ack.TerminatedCount.ShouldBe(0);
|
||||
}
|
||||
|
||||
// Test 2: MaxDeliver=3 terminates on the 4th redelivery attempt
|
||||
[Fact]
|
||||
public void MaxDeliver_terminates_when_exceeded()
|
||||
{
|
||||
// Go: consumer.go — maxDeliver enforced in ScheduleRedelivery; Deliveries > maxDeliver → terminate
|
||||
var ack = new AckProcessor();
|
||||
ack.MaxDeliver = 3;
|
||||
ack.Register(1, ackWaitMs: 5000);
|
||||
|
||||
// Deliveries starts at 1 after Register; three more bumps → Deliveries reaches 4
|
||||
ack.ScheduleRedelivery(1, delayMs: 1); // Deliveries = 2 (ok)
|
||||
ack.ScheduleRedelivery(1, delayMs: 1); // Deliveries = 3 (ok, at limit)
|
||||
ack.ScheduleRedelivery(1, delayMs: 1); // Deliveries = 4 (exceeded)
|
||||
|
||||
ack.PendingCount.ShouldBe(0);
|
||||
ack.TerminatedCount.ShouldBe(1);
|
||||
}
|
||||
|
||||
// Test 3: Exceeded sequence is added to the exceeded list
|
||||
[Fact]
|
||||
public void Exceeded_sequence_added_to_list()
|
||||
{
|
||||
// Go: consumer.go — exceeded sequences collected for advisory events
|
||||
var ack = new AckProcessor();
|
||||
ack.MaxDeliver = 1;
|
||||
ack.Register(1, ackWaitMs: 5000);
|
||||
|
||||
// Deliveries starts at 1; next call makes it 2 which exceeds maxDeliver=1
|
||||
ack.ScheduleRedelivery(1, delayMs: 1);
|
||||
|
||||
ack.ExceededCount.ShouldBe(1);
|
||||
ack.GetExceededSequences().ShouldContain((ulong)1);
|
||||
}
|
||||
|
||||
// Test 4: DrainExceeded clears the list after returning sequences
|
||||
[Fact]
|
||||
public void DrainExceeded_clears_list()
|
||||
{
|
||||
// Go: consumer.go — drain after sending advisories to avoid duplicate events
|
||||
var ack = new AckProcessor();
|
||||
ack.MaxDeliver = 1;
|
||||
ack.Register(1, ackWaitMs: 5000);
|
||||
ack.ScheduleRedelivery(1, delayMs: 1);
|
||||
|
||||
ack.ExceededCount.ShouldBe(1);
|
||||
|
||||
ack.DrainExceeded();
|
||||
|
||||
ack.ExceededCount.ShouldBe(0);
|
||||
ack.GetExceededSequences().ShouldBeEmpty();
|
||||
}
|
||||
|
||||
// Test 5: Exceeded sequence is moved to the terminated set
|
||||
[Fact]
|
||||
public void Exceeded_message_is_terminated()
|
||||
{
|
||||
// Go: consumer.go — exceeded sequence enters terminated set; cannot be redelivered
|
||||
var ack = new AckProcessor();
|
||||
ack.MaxDeliver = 2;
|
||||
ack.Register(1, ackWaitMs: 5000);
|
||||
|
||||
ack.ScheduleRedelivery(1, delayMs: 1); // Deliveries = 2 (at limit)
|
||||
ack.ScheduleRedelivery(1, delayMs: 1); // Deliveries = 3 (exceeded)
|
||||
|
||||
// Sequence removed from pending and not redeliverable
|
||||
ack.PendingCount.ShouldBe(0);
|
||||
ack.TerminatedCount.ShouldBe(1);
|
||||
}
|
||||
|
||||
// Test 6: ProcessNak triggers redelivery check through ScheduleRedelivery
|
||||
[Fact]
|
||||
public void ProcessNak_triggers_redelivery_check()
|
||||
{
|
||||
// Go: consumer.go — processNak calls ScheduleRedelivery which enforces maxDeliver
|
||||
var ack = new AckProcessor();
|
||||
ack.MaxDeliver = 2;
|
||||
ack.Register(1, ackWaitMs: 5000);
|
||||
|
||||
ack.ProcessNak(1); // Deliveries = 2 (at limit)
|
||||
ack.PendingCount.ShouldBe(1);
|
||||
ack.ExceededCount.ShouldBe(0);
|
||||
|
||||
ack.ProcessNak(1); // Deliveries = 3 (exceeded)
|
||||
ack.PendingCount.ShouldBe(0);
|
||||
ack.ExceededCount.ShouldBe(1);
|
||||
}
|
||||
|
||||
// Test 7: ScheduleRedelivery enforces max deliver regardless of whether it was called from expiry path
|
||||
[Fact]
|
||||
public void TryGetExpired_respects_max_deliver()
|
||||
{
|
||||
// Go: consumer.go — ScheduleRedelivery checks maxDeliver regardless of call site;
|
||||
// the expiry redelivery loop calls ScheduleRedelivery so maxDeliver is enforced there too.
|
||||
// We verify this by directly calling ScheduleRedelivery after the deadline would have passed,
|
||||
// without relying on wall-clock time.
|
||||
var ack = new AckProcessor();
|
||||
ack.MaxDeliver = 1;
|
||||
ack.Register(1, ackWaitMs: 5000);
|
||||
|
||||
// Deliveries starts at 1; calling ScheduleRedelivery makes it 2 which exceeds maxDeliver=1
|
||||
// This is exactly what the expiry dispatch loop does: read expired seq, call ScheduleRedelivery
|
||||
ack.ScheduleRedelivery(1, delayMs: 1);
|
||||
|
||||
ack.PendingCount.ShouldBe(0);
|
||||
ack.ExceededCount.ShouldBe(1);
|
||||
}
|
||||
|
||||
// Test 8: MaxDeliver=-1 is treated as unlimited (negative values clamped to 0)
|
||||
[Fact]
|
||||
public void MaxDeliver_negative_treated_as_unlimited()
|
||||
{
|
||||
// Go: consumer.go — any non-positive maxDeliver is treated as unlimited
|
||||
var ack = new AckProcessor();
|
||||
ack.MaxDeliver = -1;
|
||||
ack.Register(1, ackWaitMs: 5000);
|
||||
|
||||
for (var i = 0; i < 50; i++)
|
||||
ack.ScheduleRedelivery(1, delayMs: 1);
|
||||
|
||||
ack.PendingCount.ShouldBe(1);
|
||||
ack.ExceededCount.ShouldBe(0);
|
||||
ack.MaxDeliver.ShouldBe(0); // -1 clamped to 0
|
||||
}
|
||||
|
||||
// Test 9: ExceededPolicy defaults to Drop
|
||||
[Fact]
|
||||
public void DeliveryExceededPolicy_defaults_to_drop()
|
||||
{
|
||||
// Go: consumer.go — default behavior for exceeded messages is to drop them
|
||||
var ack = new AckProcessor();
|
||||
|
||||
ack.ExceededPolicy.ShouldBe(DeliveryExceededPolicy.Drop);
|
||||
}
|
||||
|
||||
// Test 10: Multiple independent sequences can each exceed max deliveries
|
||||
[Fact]
|
||||
public void Multiple_sequences_can_exceed()
|
||||
{
|
||||
// Go: consumer.go — each sequence tracked independently; multiple can exceed in same window
|
||||
var ack = new AckProcessor();
|
||||
ack.MaxDeliver = 1;
|
||||
ack.Register(1, ackWaitMs: 5000);
|
||||
ack.Register(2, ackWaitMs: 5000);
|
||||
ack.Register(3, ackWaitMs: 5000);
|
||||
|
||||
// Each sequence starts at Deliveries=1; one call makes each exceed maxDeliver=1
|
||||
ack.ScheduleRedelivery(1, delayMs: 1);
|
||||
ack.ScheduleRedelivery(2, delayMs: 1);
|
||||
ack.ScheduleRedelivery(3, delayMs: 1);
|
||||
|
||||
ack.ExceededCount.ShouldBe(3);
|
||||
ack.PendingCount.ShouldBe(0);
|
||||
ack.TerminatedCount.ShouldBe(3);
|
||||
|
||||
var exceeded = ack.GetExceededSequences();
|
||||
exceeded.ShouldContain((ulong)1);
|
||||
exceeded.ShouldContain((ulong)2);
|
||||
exceeded.ShouldContain((ulong)3);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user