From 3183fd2dc7883086c53e59916fe5f3ffffb1ff15 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 25 Feb 2026 02:15:46 -0500 Subject: [PATCH] feat(consumer): enhance AckProcessor with NAK delay, TERM, WPI, and maxAckPending Add RedeliveryTracker constructor overload, Register(ulong, string) for tracker-based ack-wait, ProcessAck(ulong) payload-free overload, GetDeadline, CanRegister for maxAckPending enforcement, ParseAckType static parser, and AckType enum. All existing API signatures are preserved; 9 new tests added in AckProcessorEnhancedTests.cs with no regressions to existing 10 tests. --- .../JetStream/Consumers/AckProcessor.cs | 65 ++++++++++ .../Consumers/AckProcessorEnhancedTests.cs | 118 ++++++++++++++++++ 2 files changed, 183 insertions(+) create mode 100644 tests/NATS.Server.Tests/JetStream/Consumers/AckProcessorEnhancedTests.cs diff --git a/src/NATS.Server/JetStream/Consumers/AckProcessor.cs b/src/NATS.Server/JetStream/Consumers/AckProcessor.cs index ef8b96e..7c26e02 100644 --- a/src/NATS.Server/JetStream/Consumers/AckProcessor.cs +++ b/src/NATS.Server/JetStream/Consumers/AckProcessor.cs @@ -1,6 +1,9 @@ // Go: consumer.go (processAckMsg, processNak, processTerm, processAckProgress) namespace NATS.Server.JetStream.Consumers; +// Go: consumer.go:2550 — ack type prefix constants (+ACK, -NAK, +TERM, +WPI) +public enum AckType { Ack, Nak, Term, Progress, Unknown } + public sealed class AckProcessor { // Go: consumer.go — ackTerminatedFlag marks sequences that must not be redelivered @@ -9,6 +12,13 @@ public sealed class AckProcessor private readonly int[]? _backoffMs; private int _ackWaitMs; + // Fields for the RedeliveryTracker constructor overload + private readonly RedeliveryTracker? _tracker; + private readonly int _maxAckPending; + + // Stores deliver subjects keyed by sequence for tracker-based registrations + private readonly Dictionary? _deliverSubjects; + public ulong AckFloor { get; private set; } public int TerminatedCount { get; private set; } @@ -17,6 +27,15 @@ public sealed class AckProcessor _backoffMs = backoffMs; } + // Go: consumer.go — ConsumerConfig maxAckPending + RedeliveryTracker integration + public AckProcessor(RedeliveryTracker tracker, int maxAckPending = 0) + { + _tracker = tracker; + _maxAckPending = maxAckPending; + _deliverSubjects = new Dictionary(); + _backoffMs = null; + } + public void Register(ulong sequence, int ackWaitMs) { if (sequence <= AckFloor) @@ -34,6 +53,52 @@ public sealed class AckProcessor }; } + // Go: consumer.go — register with deliver subject; ackWait comes from the tracker + public void Register(ulong sequence, string deliverSubject) + { + if (_tracker is null) + throw new InvalidOperationException("Register(ulong, string) requires a RedeliveryTracker constructor."); + + var ackWaitMs = (int)Math.Max(_tracker.GetBackoffDelay(1), 1); + Register(sequence, ackWaitMs); + + if (_deliverSubjects is not null) + _deliverSubjects[sequence] = deliverSubject; + } + + // Go: consumer.go — processAck without payload: plain +ACK, also notifies tracker + public void ProcessAck(ulong seq) + { + AckSequence(seq); + _tracker?.Acknowledge(seq); + } + + // Go: consumer.go — returns ack deadline for a pending sequence; MinValue if not tracked + public DateTimeOffset GetDeadline(ulong seq) + { + if (_pending.TryGetValue(seq, out var state)) + return new DateTimeOffset(state.DeadlineUtc, TimeSpan.Zero); + + return DateTimeOffset.MinValue; + } + + // Go: consumer.go — maxAckPending=0 means unlimited; otherwise cap pending registrations + public bool CanRegister() => _maxAckPending <= 0 || _pending.Count < _maxAckPending; + + // Go: consumer.go:2550 — parse ack type prefix from raw payload bytes + public static AckType ParseAckType(ReadOnlySpan data) + { + if (data.StartsWith("+ACK"u8)) + return AckType.Ack; + if (data.StartsWith("-NAK"u8)) + return AckType.Nak; + if (data.StartsWith("+TERM"u8)) + return AckType.Term; + if (data.StartsWith("+WPI"u8)) + return AckType.Progress; + return AckType.Unknown; + } + public bool TryGetExpired(out ulong sequence, out int deliveries) { foreach (var (seq, state) in _pending) diff --git a/tests/NATS.Server.Tests/JetStream/Consumers/AckProcessorEnhancedTests.cs b/tests/NATS.Server.Tests/JetStream/Consumers/AckProcessorEnhancedTests.cs new file mode 100644 index 0000000..de50bb2 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Consumers/AckProcessorEnhancedTests.cs @@ -0,0 +1,118 @@ +using NATS.Server.JetStream.Consumers; + +namespace NATS.Server.Tests.JetStream.Consumers; + +/// +/// Tests for enhanced AckProcessor with RedeliveryTracker integration. +/// Go reference: consumer.go:4854 (processInboundAcks). +/// +public class AckProcessorEnhancedTests +{ + [Fact] + public void ProcessAck_removes_from_pending() + { + var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 30000); + var processor = new AckProcessor(tracker); + + processor.Register(1, "deliver.subj"); + processor.PendingCount.ShouldBe(1); + + processor.ProcessAck(1); + processor.PendingCount.ShouldBe(0); + } + + [Fact] + public void ProcessNak_schedules_redelivery() + { + var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 30000); + var processor = new AckProcessor(tracker); + + processor.Register(1, "deliver.subj"); + processor.ProcessNak(1, delayMs: 500); + + processor.PendingCount.ShouldBe(1); // still pending until redelivered + } + + [Fact] + public void ProcessTerm_removes_permanently() + { + var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 30000); + var processor = new AckProcessor(tracker); + + processor.Register(1, "deliver.subj"); + processor.ProcessTerm(1); + + processor.PendingCount.ShouldBe(0); + processor.TerminatedCount.ShouldBe(1); + } + + [Fact] + public void ProcessProgress_resets_deadline_to_full_ack_wait() + { + // Go: consumer.go — processAckProgress (+WPI): resets deadline to UtcNow + ackWait + // Verify the invariant: after ProcessProgress, the deadline is strictly in the future + // by at least (ackWait - epsilon) milliseconds, without relying on wall-clock delays. + var ackWaitMs = 1000; + var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: ackWaitMs); + var processor = new AckProcessor(tracker); + + processor.Register(1, "deliver.subj"); + + var before = DateTimeOffset.UtcNow; + processor.ProcessProgress(1); + var after = DateTimeOffset.UtcNow; + + var deadline = processor.GetDeadline(1); + + // Deadline must be at least (before + ackWait) and at most (after + ackWait + epsilon) + deadline.ShouldBeGreaterThanOrEqualTo(before.AddMilliseconds(ackWaitMs)); + deadline.ShouldBeLessThanOrEqualTo(after.AddMilliseconds(ackWaitMs + 50)); + } + + [Fact] + public void MaxAckPending_blocks_new_registrations() + { + var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 30000); + var processor = new AckProcessor(tracker, maxAckPending: 2); + + processor.Register(1, "d.1"); + processor.Register(2, "d.2"); + processor.CanRegister().ShouldBeFalse(); + } + + [Fact] + public void CanRegister_true_when_unlimited() + { + var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 30000); + var processor = new AckProcessor(tracker); // maxAckPending=0 means unlimited + + processor.Register(1, "d.1"); + processor.CanRegister().ShouldBeTrue(); + } + + [Fact] + public void ParseAckType_identifies_all_types() + { + AckProcessor.ParseAckType("+ACK"u8).ShouldBe(AckType.Ack); + AckProcessor.ParseAckType("-NAK"u8).ShouldBe(AckType.Nak); + AckProcessor.ParseAckType("+TERM"u8).ShouldBe(AckType.Term); + AckProcessor.ParseAckType("+WPI"u8).ShouldBe(AckType.Progress); + } + + [Fact] + public void ParseAckType_returns_unknown_for_invalid() + { + AckProcessor.ParseAckType("GARBAGE"u8).ShouldBe(AckType.Unknown); + AckProcessor.ParseAckType(""u8).ShouldBe(AckType.Unknown); + } + + [Fact] + public void GetDeadline_returns_min_for_unknown_sequence() + { + var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 1000); + var processor = new AckProcessor(tracker); + + // Unknown sequence should return DateTimeOffset.MinValue + processor.GetDeadline(999).ShouldBe(DateTimeOffset.MinValue); + } +}