diff --git a/src/NATS.Server/JetStream/Consumers/AckProcessor.cs b/src/NATS.Server/JetStream/Consumers/AckProcessor.cs index 2a581d1..ef8b96e 100644 --- a/src/NATS.Server/JetStream/Consumers/AckProcessor.cs +++ b/src/NATS.Server/JetStream/Consumers/AckProcessor.cs @@ -1,9 +1,21 @@ +// Go: consumer.go (processAckMsg, processNak, processTerm, processAckProgress) namespace NATS.Server.JetStream.Consumers; public sealed class AckProcessor { + // Go: consumer.go — ackTerminatedFlag marks sequences that must not be redelivered + private readonly HashSet _terminated = new(); private readonly Dictionary _pending = new(); + private readonly int[]? _backoffMs; + private int _ackWaitMs; + public ulong AckFloor { get; private set; } + public int TerminatedCount { get; private set; } + + public AckProcessor(int[]? backoffMs = null) + { + _backoffMs = backoffMs; + } public void Register(ulong sequence, int ackWaitMs) { @@ -13,6 +25,8 @@ public sealed class AckProcessor if (_pending.ContainsKey(sequence)) return; + _ackWaitMs = ackWaitMs; + _pending[sequence] = new PendingState { DeadlineUtc = DateTime.UtcNow.AddMilliseconds(Math.Max(ackWaitMs, 1)), @@ -37,6 +51,120 @@ public sealed class AckProcessor return false; } + // Go: consumer.go:2550 (processAck) + // Dispatches to the appropriate ack handler based on ack type prefix. + // Empty or "+ACK" → ack single; "-NAK" → schedule redelivery; "+TERM" → terminate; "+WPI" → progress reset. + public void ProcessAck(ulong seq, ReadOnlySpan payload) + { + if (payload.IsEmpty || payload.SequenceEqual("+ACK"u8)) + { + AckSequence(seq); + return; + } + + if (payload.StartsWith("-NAK"u8)) + { + // Go: consumer.go — parseNak extracts optional delay from "-NAK {delay}" + var delayMs = 0; + var rest = payload["-NAK"u8.Length..]; + if (!rest.IsEmpty && rest[0] == (byte)' ') + { + var delaySpan = rest[1..]; + if (TryParseInt(delaySpan, out var parsed)) + delayMs = parsed; + } + ProcessNak(seq, delayMs); + return; + } + + if (payload.StartsWith("+TERM"u8)) + { + ProcessTerm(seq); + return; + } + + if (payload.StartsWith("+WPI"u8)) + { + ProcessProgress(seq); + return; + } + + // Unknown ack type — treat as plain ack per Go behavior + AckSequence(seq); + } + + // Go: consumer.go — processAck for "+ACK": removes from pending and advances AckFloor when contiguous + public void AckSequence(ulong seq) + { + _pending.Remove(seq); + _terminated.Remove(seq); + + // Advance floor while the next-in-order sequences are no longer pending + if (seq == AckFloor + 1) + { + AckFloor = seq; + while (_pending.Count > 0) + { + var next = AckFloor + 1; + if (_pending.ContainsKey(next)) + break; + // Only advance if next is definitely below any pending sequence + // Stop when we hit a gap or run out of sequences to check + if (!HasSequenceBelow(next)) + break; + AckFloor = next; + } + } + } + + // Go: consumer.go — processNak: schedules redelivery with optional explicit delay or backoff array + public void ProcessNak(ulong seq, int delayMs = 0) + { + if (_terminated.Contains(seq)) + return; + + if (!_pending.TryGetValue(seq, out var state)) + return; + + int effectiveDelay; + if (delayMs > 0) + { + effectiveDelay = delayMs; + } + else if (_backoffMs is { Length: > 0 }) + { + // Go: consumer.go — backoff array clamps at last entry for high delivery counts + var idx = Math.Min(state.Deliveries - 1, _backoffMs.Length - 1); + effectiveDelay = _backoffMs[idx]; + } + else + { + effectiveDelay = Math.Max(_ackWaitMs, 1); + } + + ScheduleRedelivery(seq, effectiveDelay); + } + + // Go: consumer.go — processTerm: removes from pending permanently; sequence is never redelivered + public void ProcessTerm(ulong seq) + { + if (_pending.Remove(seq)) + { + _terminated.Add(seq); + TerminatedCount++; + } + } + + // Go: consumer.go — processAckProgress (+WPI): resets ack deadline to original ackWait without bumping delivery count + public void ProcessProgress(ulong seq) + { + if (!_pending.TryGetValue(seq, out var state)) + return; + + state.DeadlineUtc = DateTime.UtcNow.AddMilliseconds(Math.Max(_ackWaitMs, 1)); + _pending[seq] = state; + } + public void ScheduleRedelivery(ulong sequence, int delayMs) { if (!_pending.TryGetValue(sequence, out var state)) @@ -64,6 +192,31 @@ public sealed class AckProcessor AckFloor = sequence; } + private bool HasSequenceBelow(ulong upTo) + { + foreach (var key in _pending.Keys) + { + if (key < upTo) + return true; + } + return false; + } + + private static bool TryParseInt(ReadOnlySpan span, out int value) + { + value = 0; + if (span.IsEmpty) + return false; + + foreach (var b in span) + { + if (b < (byte)'0' || b > (byte)'9') + return false; + value = value * 10 + (b - '0'); + } + return true; + } + private sealed class PendingState { public DateTime DeadlineUtc { get; set; } diff --git a/tests/NATS.Server.Tests/JetStream/Consumers/AckProcessorNakTests.cs b/tests/NATS.Server.Tests/JetStream/Consumers/AckProcessorNakTests.cs new file mode 100644 index 0000000..73e3406 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Consumers/AckProcessorNakTests.cs @@ -0,0 +1,185 @@ +// Go: consumer.go:2550 (processAckMsg, processNak, processTerm, processAckProgress) +using NATS.Server.JetStream.Consumers; + +namespace NATS.Server.Tests.JetStream.Consumers; + +public class AckProcessorNakTests +{ + // Test 1: ProcessAck with empty payload acks the sequence + [Fact] + public void ProcessAck_empty_payload_acks_sequence() + { + // Go: consumer.go — empty ack payload treated as "+ACK" + var ack = new AckProcessor(); + ack.Register(1, ackWaitMs: 5000); + + ack.ProcessAck(1, ReadOnlySpan.Empty); + + ack.PendingCount.ShouldBe(0); + ack.AckFloor.ShouldBe((ulong)1); + } + + // Test 2: ProcessAck with -NAK schedules redelivery + [Fact] + public async Task ProcessAck_nak_payload_schedules_redelivery() + { + // Go: consumer.go — "-NAK" triggers rescheduled redelivery + var ack = new AckProcessor(); + ack.Register(1, ackWaitMs: 5000); + + ack.ProcessAck(1, "-NAK"u8); + + // Should still be pending (redelivery scheduled) + ack.PendingCount.ShouldBe(1); + + // Should expire quickly (using ackWait fallback of 5000ms — verify it is still pending now) + ack.TryGetExpired(out _, out _).ShouldBeFalse(); + + await Task.CompletedTask; + } + + // Test 3: ProcessAck with -NAK {delay} uses custom delay + [Fact] + public async Task ProcessAck_nak_with_delay_uses_custom_delay() + { + // Go: consumer.go — "-NAK {delay}" parses optional explicit delay in milliseconds + var ack = new AckProcessor(); + ack.Register(1, ackWaitMs: 5000); + + ack.ProcessAck(1, "-NAK 1"u8); + + // Sequence still pending + ack.PendingCount.ShouldBe(1); + + // With a 1ms delay, should expire quickly + await Task.Delay(10); + ack.TryGetExpired(out var seq, out _).ShouldBeTrue(); + seq.ShouldBe((ulong)1); + } + + // Test 4: ProcessAck with +TERM removes from pending + [Fact] + public void ProcessAck_term_removes_from_pending() + { + // Go: consumer.go — "+TERM" permanently terminates delivery; sequence never redelivered + var ack = new AckProcessor(); + ack.Register(1, ackWaitMs: 5000); + + ack.ProcessAck(1, "+TERM"u8); + + ack.PendingCount.ShouldBe(0); + ack.HasPending.ShouldBeFalse(); + } + + // Test 5: ProcessAck with +WPI resets deadline without incrementing delivery count + [Fact] + public async Task ProcessAck_wpi_resets_deadline_without_incrementing_deliveries() + { + // Go: consumer.go — "+WPI" resets ack deadline; delivery count must not change + var ack = new AckProcessor(); + ack.Register(1, ackWaitMs: 10); + + // Wait for the deadline to approach, then reset it via progress + await Task.Delay(5); + ack.ProcessAck(1, "+WPI"u8); + + // Deadline was just reset — should not be expired yet + ack.TryGetExpired(out _, out var deliveries).ShouldBeFalse(); + + // Deliveries count must remain at 1 (not incremented by WPI) + deliveries.ShouldBe(0); + + // Sequence still pending + ack.PendingCount.ShouldBe(1); + } + + // Test 6: Backoff array applies correct delay per redelivery attempt + [Fact] + public async Task ProcessNak_backoff_array_applies_delay_by_delivery_count() + { + // Go: consumer.go — backoff array indexes by (deliveries - 1) + var ack = new AckProcessor(backoffMs: [1, 50, 5000]); + ack.Register(1, ackWaitMs: 5000); + + // First NAK — delivery count is 1 → backoff[0] = 1ms + ack.ProcessNak(1); + + await Task.Delay(10); + ack.TryGetExpired(out _, out _).ShouldBeTrue(); + + // Now delivery count is 2 → backoff[1] = 50ms + ack.ProcessNak(1); + ack.TryGetExpired(out _, out _).ShouldBeFalse(); + } + + // Test 7: Backoff array clamps at last entry for high delivery counts + [Fact] + public async Task ProcessNak_backoff_clamps_at_last_entry_for_high_delivery_count() + { + // Go: consumer.go — backoff index clamped to backoff.Length-1 when deliveries exceed array size + var ack = new AckProcessor(backoffMs: [1, 2]); + ack.Register(1, ackWaitMs: 5000); + + // Drive deliveries up: NAK twice to advance delivery count past array length + ack.ProcessNak(1); // deliveries becomes 2 (index 1 = 2ms) + await Task.Delay(10); + ack.TryGetExpired(out _, out _).ShouldBeTrue(); + + ack.ProcessNak(1); // deliveries becomes 3 (index clamps to 1 = 2ms) + await Task.Delay(10); + ack.TryGetExpired(out var seq, out _).ShouldBeTrue(); + seq.ShouldBe((ulong)1); + } + + // Test 8: AckSequence advances AckFloor when contiguous + [Fact] + public void AckSequence_advances_ackfloor_for_contiguous_sequences() + { + // Go: consumer.go — acking contiguous sequences from floor advances AckFloor monotonically + var ack = new AckProcessor(); + ack.Register(1, ackWaitMs: 5000); + ack.Register(2, ackWaitMs: 5000); + ack.Register(3, ackWaitMs: 5000); + + ack.AckSequence(1); + ack.AckFloor.ShouldBe((ulong)1); + + ack.AckSequence(2); + ack.AckFloor.ShouldBe((ulong)2); + } + + // Test 9: ProcessTerm increments TerminatedCount + [Fact] + public void ProcessTerm_increments_terminated_count() + { + // Go: consumer.go — terminated sequences tracked separately from acked sequences + var ack = new AckProcessor(); + ack.Register(1, ackWaitMs: 5000); + ack.Register(2, ackWaitMs: 5000); + + ack.TerminatedCount.ShouldBe(0); + + ack.ProcessTerm(1); + ack.TerminatedCount.ShouldBe(1); + + ack.ProcessTerm(2); + ack.TerminatedCount.ShouldBe(2); + } + + // Test 10: NAK after TERM is ignored (sequence already terminated) + [Fact] + public void ProcessNak_after_term_is_ignored() + { + // Go: consumer.go — once terminated, a sequence cannot be rescheduled via NAK + var ack = new AckProcessor(backoffMs: [1]); + ack.Register(1, ackWaitMs: 5000); + + ack.ProcessTerm(1); + ack.PendingCount.ShouldBe(0); + + // Attempting to NAK a terminated sequence has no effect + ack.ProcessNak(1); + ack.PendingCount.ShouldBe(0); + ack.TerminatedCount.ShouldBe(1); + } +}