diff --git a/src/NATS.Server/JetStream/Consumers/AckProcessor.cs b/src/NATS.Server/JetStream/Consumers/AckProcessor.cs index 7c26e02..8f63e9b 100644 --- a/src/NATS.Server/JetStream/Consumers/AckProcessor.cs +++ b/src/NATS.Server/JetStream/Consumers/AckProcessor.cs @@ -4,6 +4,18 @@ namespace NATS.Server.JetStream.Consumers; // Go: consumer.go:2550 — ack type prefix constants (+ACK, -NAK, +TERM, +WPI) public enum AckType { Ack, Nak, Term, Progress, Unknown } +/// +/// Policy applied when a sequence exceeds its max delivery count. +/// Go reference: consumer.go maxDeliver config. +/// +public enum DeliveryExceededPolicy +{ + /// Drop the message — no further action. + Drop, + /// Move to a dead-letter queue subject. + DeadLetter, +} + public sealed class AckProcessor { // Go: consumer.go — ackTerminatedFlag marks sequences that must not be redelivered @@ -19,9 +31,35 @@ public sealed class AckProcessor // Stores deliver subjects keyed by sequence for tracker-based registrations private readonly Dictionary? _deliverSubjects; + // Go: consumer.go — maxDeliver config; sequences that exceeded the limit + private int _maxDeliver; + private readonly List _exceededSequences = new(); + public ulong AckFloor { get; private set; } public int TerminatedCount { get; private set; } + /// + /// Sets the max delivery limit. 0 or negative means unlimited. + /// Go reference: consumer.go maxDeliver config. + /// + public int MaxDeliver + { + get => _maxDeliver; + set => _maxDeliver = Math.Max(value, 0); + } + + /// Number of sequences that exceeded max deliveries. + public int ExceededCount => _exceededSequences.Count; + + /// Returns sequences that exceeded max deliveries since last drain. + public IReadOnlyList GetExceededSequences() => _exceededSequences.AsReadOnly(); + + /// Clears the exceeded sequences list (after advisories are sent). + public void DrainExceeded() => _exceededSequences.Clear(); + + /// Policy applied when a sequence exceeds its max delivery count. + public DeliveryExceededPolicy ExceededPolicy { get; set; } = DeliveryExceededPolicy.Drop; + public AckProcessor(int[]? backoffMs = null) { _backoffMs = backoffMs; @@ -236,6 +274,17 @@ public sealed class AckProcessor return; state.Deliveries++; + + // Go: consumer.go — check maxDeliver; Deliveries > maxDeliver means the limit is exceeded + if (_maxDeliver > 0 && state.Deliveries > _maxDeliver) + { + _exceededSequences.Add(sequence); + _pending.Remove(sequence); + _terminated.Add(sequence); + TerminatedCount++; + return; + } + state.DeadlineUtc = DateTime.UtcNow.AddMilliseconds(Math.Max(delayMs, 1)); _pending[sequence] = state; } diff --git a/tests/NATS.Server.Tests/JetStream/Consumers/MaxDeliveriesTests.cs b/tests/NATS.Server.Tests/JetStream/Consumers/MaxDeliveriesTests.cs new file mode 100644 index 0000000..d134ff2 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Consumers/MaxDeliveriesTests.cs @@ -0,0 +1,184 @@ +// Go: consumer.go maxDeliver config — max delivery enforcement and advisory generation +using NATS.Server.JetStream.Consumers; + +namespace NATS.Server.Tests.JetStream.Consumers; + +public class MaxDeliveriesTests +{ + // Test 1: MaxDeliver=0 means unlimited; many redeliveries do not terminate + [Fact] + public void MaxDeliver_zero_means_unlimited() + { + // Go: consumer.go — maxDeliver=0 disables the limit entirely + var ack = new AckProcessor(); + ack.MaxDeliver = 0; + ack.Register(1, ackWaitMs: 5000); + + for (var i = 0; i < 100; i++) + ack.ScheduleRedelivery(1, delayMs: 1); + + ack.PendingCount.ShouldBe(1); + ack.ExceededCount.ShouldBe(0); + ack.TerminatedCount.ShouldBe(0); + } + + // Test 2: MaxDeliver=3 terminates on the 4th redelivery attempt + [Fact] + public void MaxDeliver_terminates_when_exceeded() + { + // Go: consumer.go — maxDeliver enforced in ScheduleRedelivery; Deliveries > maxDeliver → terminate + var ack = new AckProcessor(); + ack.MaxDeliver = 3; + ack.Register(1, ackWaitMs: 5000); + + // Deliveries starts at 1 after Register; three more bumps → Deliveries reaches 4 + ack.ScheduleRedelivery(1, delayMs: 1); // Deliveries = 2 (ok) + ack.ScheduleRedelivery(1, delayMs: 1); // Deliveries = 3 (ok, at limit) + ack.ScheduleRedelivery(1, delayMs: 1); // Deliveries = 4 (exceeded) + + ack.PendingCount.ShouldBe(0); + ack.TerminatedCount.ShouldBe(1); + } + + // Test 3: Exceeded sequence is added to the exceeded list + [Fact] + public void Exceeded_sequence_added_to_list() + { + // Go: consumer.go — exceeded sequences collected for advisory events + var ack = new AckProcessor(); + ack.MaxDeliver = 1; + ack.Register(1, ackWaitMs: 5000); + + // Deliveries starts at 1; next call makes it 2 which exceeds maxDeliver=1 + ack.ScheduleRedelivery(1, delayMs: 1); + + ack.ExceededCount.ShouldBe(1); + ack.GetExceededSequences().ShouldContain((ulong)1); + } + + // Test 4: DrainExceeded clears the list after returning sequences + [Fact] + public void DrainExceeded_clears_list() + { + // Go: consumer.go — drain after sending advisories to avoid duplicate events + var ack = new AckProcessor(); + ack.MaxDeliver = 1; + ack.Register(1, ackWaitMs: 5000); + ack.ScheduleRedelivery(1, delayMs: 1); + + ack.ExceededCount.ShouldBe(1); + + ack.DrainExceeded(); + + ack.ExceededCount.ShouldBe(0); + ack.GetExceededSequences().ShouldBeEmpty(); + } + + // Test 5: Exceeded sequence is moved to the terminated set + [Fact] + public void Exceeded_message_is_terminated() + { + // Go: consumer.go — exceeded sequence enters terminated set; cannot be redelivered + var ack = new AckProcessor(); + ack.MaxDeliver = 2; + ack.Register(1, ackWaitMs: 5000); + + ack.ScheduleRedelivery(1, delayMs: 1); // Deliveries = 2 (at limit) + ack.ScheduleRedelivery(1, delayMs: 1); // Deliveries = 3 (exceeded) + + // Sequence removed from pending and not redeliverable + ack.PendingCount.ShouldBe(0); + ack.TerminatedCount.ShouldBe(1); + } + + // Test 6: ProcessNak triggers redelivery check through ScheduleRedelivery + [Fact] + public void ProcessNak_triggers_redelivery_check() + { + // Go: consumer.go — processNak calls ScheduleRedelivery which enforces maxDeliver + var ack = new AckProcessor(); + ack.MaxDeliver = 2; + ack.Register(1, ackWaitMs: 5000); + + ack.ProcessNak(1); // Deliveries = 2 (at limit) + ack.PendingCount.ShouldBe(1); + ack.ExceededCount.ShouldBe(0); + + ack.ProcessNak(1); // Deliveries = 3 (exceeded) + ack.PendingCount.ShouldBe(0); + ack.ExceededCount.ShouldBe(1); + } + + // Test 7: ScheduleRedelivery enforces max deliver regardless of whether it was called from expiry path + [Fact] + public void TryGetExpired_respects_max_deliver() + { + // Go: consumer.go — ScheduleRedelivery checks maxDeliver regardless of call site; + // the expiry redelivery loop calls ScheduleRedelivery so maxDeliver is enforced there too. + // We verify this by directly calling ScheduleRedelivery after the deadline would have passed, + // without relying on wall-clock time. + var ack = new AckProcessor(); + ack.MaxDeliver = 1; + ack.Register(1, ackWaitMs: 5000); + + // Deliveries starts at 1; calling ScheduleRedelivery makes it 2 which exceeds maxDeliver=1 + // This is exactly what the expiry dispatch loop does: read expired seq, call ScheduleRedelivery + ack.ScheduleRedelivery(1, delayMs: 1); + + ack.PendingCount.ShouldBe(0); + ack.ExceededCount.ShouldBe(1); + } + + // Test 8: MaxDeliver=-1 is treated as unlimited (negative values clamped to 0) + [Fact] + public void MaxDeliver_negative_treated_as_unlimited() + { + // Go: consumer.go — any non-positive maxDeliver is treated as unlimited + var ack = new AckProcessor(); + ack.MaxDeliver = -1; + ack.Register(1, ackWaitMs: 5000); + + for (var i = 0; i < 50; i++) + ack.ScheduleRedelivery(1, delayMs: 1); + + ack.PendingCount.ShouldBe(1); + ack.ExceededCount.ShouldBe(0); + ack.MaxDeliver.ShouldBe(0); // -1 clamped to 0 + } + + // Test 9: ExceededPolicy defaults to Drop + [Fact] + public void DeliveryExceededPolicy_defaults_to_drop() + { + // Go: consumer.go — default behavior for exceeded messages is to drop them + var ack = new AckProcessor(); + + ack.ExceededPolicy.ShouldBe(DeliveryExceededPolicy.Drop); + } + + // Test 10: Multiple independent sequences can each exceed max deliveries + [Fact] + public void Multiple_sequences_can_exceed() + { + // Go: consumer.go — each sequence tracked independently; multiple can exceed in same window + var ack = new AckProcessor(); + ack.MaxDeliver = 1; + ack.Register(1, ackWaitMs: 5000); + ack.Register(2, ackWaitMs: 5000); + ack.Register(3, ackWaitMs: 5000); + + // Each sequence starts at Deliveries=1; one call makes each exceed maxDeliver=1 + ack.ScheduleRedelivery(1, delayMs: 1); + ack.ScheduleRedelivery(2, delayMs: 1); + ack.ScheduleRedelivery(3, delayMs: 1); + + ack.ExceededCount.ShouldBe(3); + ack.PendingCount.ShouldBe(0); + ack.TerminatedCount.ShouldBe(3); + + var exceeded = ack.GetExceededSequences(); + exceeded.ShouldContain((ulong)1); + exceeded.ShouldContain((ulong)2); + exceeded.ShouldContain((ulong)3); + } +}