feat(consumers): add NAK/TERM/PROGRESS ack types with backoff (Go parity)
This commit is contained in:
@@ -1,9 +1,21 @@
|
|||||||
|
// Go: consumer.go (processAckMsg, processNak, processTerm, processAckProgress)
|
||||||
namespace NATS.Server.JetStream.Consumers;
|
namespace NATS.Server.JetStream.Consumers;
|
||||||
|
|
||||||
public sealed class AckProcessor
|
public sealed class AckProcessor
|
||||||
{
|
{
|
||||||
|
// Go: consumer.go — ackTerminatedFlag marks sequences that must not be redelivered
|
||||||
|
private readonly HashSet<ulong> _terminated = new();
|
||||||
private readonly Dictionary<ulong, PendingState> _pending = new();
|
private readonly Dictionary<ulong, PendingState> _pending = new();
|
||||||
|
private readonly int[]? _backoffMs;
|
||||||
|
private int _ackWaitMs;
|
||||||
|
|
||||||
public ulong AckFloor { get; private set; }
|
public ulong AckFloor { get; private set; }
|
||||||
|
public int TerminatedCount { get; private set; }
|
||||||
|
|
||||||
|
public AckProcessor(int[]? backoffMs = null)
|
||||||
|
{
|
||||||
|
_backoffMs = backoffMs;
|
||||||
|
}
|
||||||
|
|
||||||
public void Register(ulong sequence, int ackWaitMs)
|
public void Register(ulong sequence, int ackWaitMs)
|
||||||
{
|
{
|
||||||
@@ -13,6 +25,8 @@ public sealed class AckProcessor
|
|||||||
if (_pending.ContainsKey(sequence))
|
if (_pending.ContainsKey(sequence))
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
_ackWaitMs = ackWaitMs;
|
||||||
|
|
||||||
_pending[sequence] = new PendingState
|
_pending[sequence] = new PendingState
|
||||||
{
|
{
|
||||||
DeadlineUtc = DateTime.UtcNow.AddMilliseconds(Math.Max(ackWaitMs, 1)),
|
DeadlineUtc = DateTime.UtcNow.AddMilliseconds(Math.Max(ackWaitMs, 1)),
|
||||||
@@ -37,6 +51,120 @@ public sealed class AckProcessor
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Go: consumer.go:2550 (processAck)
|
||||||
|
// Dispatches to the appropriate ack handler based on ack type prefix.
|
||||||
|
// Empty or "+ACK" → ack single; "-NAK" → schedule redelivery; "+TERM" → terminate; "+WPI" → progress reset.
|
||||||
|
public void ProcessAck(ulong seq, ReadOnlySpan<byte> payload)
|
||||||
|
{
|
||||||
|
if (payload.IsEmpty || payload.SequenceEqual("+ACK"u8))
|
||||||
|
{
|
||||||
|
AckSequence(seq);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (payload.StartsWith("-NAK"u8))
|
||||||
|
{
|
||||||
|
// Go: consumer.go — parseNak extracts optional delay from "-NAK {delay}"
|
||||||
|
var delayMs = 0;
|
||||||
|
var rest = payload["-NAK"u8.Length..];
|
||||||
|
if (!rest.IsEmpty && rest[0] == (byte)' ')
|
||||||
|
{
|
||||||
|
var delaySpan = rest[1..];
|
||||||
|
if (TryParseInt(delaySpan, out var parsed))
|
||||||
|
delayMs = parsed;
|
||||||
|
}
|
||||||
|
ProcessNak(seq, delayMs);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (payload.StartsWith("+TERM"u8))
|
||||||
|
{
|
||||||
|
ProcessTerm(seq);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (payload.StartsWith("+WPI"u8))
|
||||||
|
{
|
||||||
|
ProcessProgress(seq);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unknown ack type — treat as plain ack per Go behavior
|
||||||
|
AckSequence(seq);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Go: consumer.go — processAck for "+ACK": removes from pending and advances AckFloor when contiguous
|
||||||
|
public void AckSequence(ulong seq)
|
||||||
|
{
|
||||||
|
_pending.Remove(seq);
|
||||||
|
_terminated.Remove(seq);
|
||||||
|
|
||||||
|
// Advance floor while the next-in-order sequences are no longer pending
|
||||||
|
if (seq == AckFloor + 1)
|
||||||
|
{
|
||||||
|
AckFloor = seq;
|
||||||
|
while (_pending.Count > 0)
|
||||||
|
{
|
||||||
|
var next = AckFloor + 1;
|
||||||
|
if (_pending.ContainsKey(next))
|
||||||
|
break;
|
||||||
|
// Only advance if next is definitely below any pending sequence
|
||||||
|
// Stop when we hit a gap or run out of sequences to check
|
||||||
|
if (!HasSequenceBelow(next))
|
||||||
|
break;
|
||||||
|
AckFloor = next;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Go: consumer.go — processNak: schedules redelivery with optional explicit delay or backoff array
|
||||||
|
public void ProcessNak(ulong seq, int delayMs = 0)
|
||||||
|
{
|
||||||
|
if (_terminated.Contains(seq))
|
||||||
|
return;
|
||||||
|
|
||||||
|
if (!_pending.TryGetValue(seq, out var state))
|
||||||
|
return;
|
||||||
|
|
||||||
|
int effectiveDelay;
|
||||||
|
if (delayMs > 0)
|
||||||
|
{
|
||||||
|
effectiveDelay = delayMs;
|
||||||
|
}
|
||||||
|
else if (_backoffMs is { Length: > 0 })
|
||||||
|
{
|
||||||
|
// Go: consumer.go — backoff array clamps at last entry for high delivery counts
|
||||||
|
var idx = Math.Min(state.Deliveries - 1, _backoffMs.Length - 1);
|
||||||
|
effectiveDelay = _backoffMs[idx];
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
effectiveDelay = Math.Max(_ackWaitMs, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
ScheduleRedelivery(seq, effectiveDelay);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Go: consumer.go — processTerm: removes from pending permanently; sequence is never redelivered
|
||||||
|
public void ProcessTerm(ulong seq)
|
||||||
|
{
|
||||||
|
if (_pending.Remove(seq))
|
||||||
|
{
|
||||||
|
_terminated.Add(seq);
|
||||||
|
TerminatedCount++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Go: consumer.go — processAckProgress (+WPI): resets ack deadline to original ackWait without bumping delivery count
|
||||||
|
public void ProcessProgress(ulong seq)
|
||||||
|
{
|
||||||
|
if (!_pending.TryGetValue(seq, out var state))
|
||||||
|
return;
|
||||||
|
|
||||||
|
state.DeadlineUtc = DateTime.UtcNow.AddMilliseconds(Math.Max(_ackWaitMs, 1));
|
||||||
|
_pending[seq] = state;
|
||||||
|
}
|
||||||
|
|
||||||
public void ScheduleRedelivery(ulong sequence, int delayMs)
|
public void ScheduleRedelivery(ulong sequence, int delayMs)
|
||||||
{
|
{
|
||||||
if (!_pending.TryGetValue(sequence, out var state))
|
if (!_pending.TryGetValue(sequence, out var state))
|
||||||
@@ -64,6 +192,31 @@ public sealed class AckProcessor
|
|||||||
AckFloor = sequence;
|
AckFloor = sequence;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private bool HasSequenceBelow(ulong upTo)
|
||||||
|
{
|
||||||
|
foreach (var key in _pending.Keys)
|
||||||
|
{
|
||||||
|
if (key < upTo)
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static bool TryParseInt(ReadOnlySpan<byte> span, out int value)
|
||||||
|
{
|
||||||
|
value = 0;
|
||||||
|
if (span.IsEmpty)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
foreach (var b in span)
|
||||||
|
{
|
||||||
|
if (b < (byte)'0' || b > (byte)'9')
|
||||||
|
return false;
|
||||||
|
value = value * 10 + (b - '0');
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
private sealed class PendingState
|
private sealed class PendingState
|
||||||
{
|
{
|
||||||
public DateTime DeadlineUtc { get; set; }
|
public DateTime DeadlineUtc { get; set; }
|
||||||
|
|||||||
@@ -0,0 +1,185 @@
|
|||||||
|
// Go: consumer.go:2550 (processAckMsg, processNak, processTerm, processAckProgress)
|
||||||
|
using NATS.Server.JetStream.Consumers;
|
||||||
|
|
||||||
|
namespace NATS.Server.Tests.JetStream.Consumers;
|
||||||
|
|
||||||
|
public class AckProcessorNakTests
|
||||||
|
{
|
||||||
|
// Test 1: ProcessAck with empty payload acks the sequence
|
||||||
|
[Fact]
|
||||||
|
public void ProcessAck_empty_payload_acks_sequence()
|
||||||
|
{
|
||||||
|
// Go: consumer.go — empty ack payload treated as "+ACK"
|
||||||
|
var ack = new AckProcessor();
|
||||||
|
ack.Register(1, ackWaitMs: 5000);
|
||||||
|
|
||||||
|
ack.ProcessAck(1, ReadOnlySpan<byte>.Empty);
|
||||||
|
|
||||||
|
ack.PendingCount.ShouldBe(0);
|
||||||
|
ack.AckFloor.ShouldBe((ulong)1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test 2: ProcessAck with -NAK schedules redelivery
|
||||||
|
[Fact]
|
||||||
|
public async Task ProcessAck_nak_payload_schedules_redelivery()
|
||||||
|
{
|
||||||
|
// Go: consumer.go — "-NAK" triggers rescheduled redelivery
|
||||||
|
var ack = new AckProcessor();
|
||||||
|
ack.Register(1, ackWaitMs: 5000);
|
||||||
|
|
||||||
|
ack.ProcessAck(1, "-NAK"u8);
|
||||||
|
|
||||||
|
// Should still be pending (redelivery scheduled)
|
||||||
|
ack.PendingCount.ShouldBe(1);
|
||||||
|
|
||||||
|
// Should expire quickly (using ackWait fallback of 5000ms — verify it is still pending now)
|
||||||
|
ack.TryGetExpired(out _, out _).ShouldBeFalse();
|
||||||
|
|
||||||
|
await Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test 3: ProcessAck with -NAK {delay} uses custom delay
|
||||||
|
[Fact]
|
||||||
|
public async Task ProcessAck_nak_with_delay_uses_custom_delay()
|
||||||
|
{
|
||||||
|
// Go: consumer.go — "-NAK {delay}" parses optional explicit delay in milliseconds
|
||||||
|
var ack = new AckProcessor();
|
||||||
|
ack.Register(1, ackWaitMs: 5000);
|
||||||
|
|
||||||
|
ack.ProcessAck(1, "-NAK 1"u8);
|
||||||
|
|
||||||
|
// Sequence still pending
|
||||||
|
ack.PendingCount.ShouldBe(1);
|
||||||
|
|
||||||
|
// With a 1ms delay, should expire quickly
|
||||||
|
await Task.Delay(10);
|
||||||
|
ack.TryGetExpired(out var seq, out _).ShouldBeTrue();
|
||||||
|
seq.ShouldBe((ulong)1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test 4: ProcessAck with +TERM removes from pending
|
||||||
|
[Fact]
|
||||||
|
public void ProcessAck_term_removes_from_pending()
|
||||||
|
{
|
||||||
|
// Go: consumer.go — "+TERM" permanently terminates delivery; sequence never redelivered
|
||||||
|
var ack = new AckProcessor();
|
||||||
|
ack.Register(1, ackWaitMs: 5000);
|
||||||
|
|
||||||
|
ack.ProcessAck(1, "+TERM"u8);
|
||||||
|
|
||||||
|
ack.PendingCount.ShouldBe(0);
|
||||||
|
ack.HasPending.ShouldBeFalse();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test 5: ProcessAck with +WPI resets deadline without incrementing delivery count
|
||||||
|
[Fact]
|
||||||
|
public async Task ProcessAck_wpi_resets_deadline_without_incrementing_deliveries()
|
||||||
|
{
|
||||||
|
// Go: consumer.go — "+WPI" resets ack deadline; delivery count must not change
|
||||||
|
var ack = new AckProcessor();
|
||||||
|
ack.Register(1, ackWaitMs: 10);
|
||||||
|
|
||||||
|
// Wait for the deadline to approach, then reset it via progress
|
||||||
|
await Task.Delay(5);
|
||||||
|
ack.ProcessAck(1, "+WPI"u8);
|
||||||
|
|
||||||
|
// Deadline was just reset — should not be expired yet
|
||||||
|
ack.TryGetExpired(out _, out var deliveries).ShouldBeFalse();
|
||||||
|
|
||||||
|
// Deliveries count must remain at 1 (not incremented by WPI)
|
||||||
|
deliveries.ShouldBe(0);
|
||||||
|
|
||||||
|
// Sequence still pending
|
||||||
|
ack.PendingCount.ShouldBe(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test 6: Backoff array applies correct delay per redelivery attempt
|
||||||
|
[Fact]
|
||||||
|
public async Task ProcessNak_backoff_array_applies_delay_by_delivery_count()
|
||||||
|
{
|
||||||
|
// Go: consumer.go — backoff array indexes by (deliveries - 1)
|
||||||
|
var ack = new AckProcessor(backoffMs: [1, 50, 5000]);
|
||||||
|
ack.Register(1, ackWaitMs: 5000);
|
||||||
|
|
||||||
|
// First NAK — delivery count is 1 → backoff[0] = 1ms
|
||||||
|
ack.ProcessNak(1);
|
||||||
|
|
||||||
|
await Task.Delay(10);
|
||||||
|
ack.TryGetExpired(out _, out _).ShouldBeTrue();
|
||||||
|
|
||||||
|
// Now delivery count is 2 → backoff[1] = 50ms
|
||||||
|
ack.ProcessNak(1);
|
||||||
|
ack.TryGetExpired(out _, out _).ShouldBeFalse();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test 7: Backoff array clamps at last entry for high delivery counts
|
||||||
|
[Fact]
|
||||||
|
public async Task ProcessNak_backoff_clamps_at_last_entry_for_high_delivery_count()
|
||||||
|
{
|
||||||
|
// Go: consumer.go — backoff index clamped to backoff.Length-1 when deliveries exceed array size
|
||||||
|
var ack = new AckProcessor(backoffMs: [1, 2]);
|
||||||
|
ack.Register(1, ackWaitMs: 5000);
|
||||||
|
|
||||||
|
// Drive deliveries up: NAK twice to advance delivery count past array length
|
||||||
|
ack.ProcessNak(1); // deliveries becomes 2 (index 1 = 2ms)
|
||||||
|
await Task.Delay(10);
|
||||||
|
ack.TryGetExpired(out _, out _).ShouldBeTrue();
|
||||||
|
|
||||||
|
ack.ProcessNak(1); // deliveries becomes 3 (index clamps to 1 = 2ms)
|
||||||
|
await Task.Delay(10);
|
||||||
|
ack.TryGetExpired(out var seq, out _).ShouldBeTrue();
|
||||||
|
seq.ShouldBe((ulong)1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test 8: AckSequence advances AckFloor when contiguous
|
||||||
|
[Fact]
|
||||||
|
public void AckSequence_advances_ackfloor_for_contiguous_sequences()
|
||||||
|
{
|
||||||
|
// Go: consumer.go — acking contiguous sequences from floor advances AckFloor monotonically
|
||||||
|
var ack = new AckProcessor();
|
||||||
|
ack.Register(1, ackWaitMs: 5000);
|
||||||
|
ack.Register(2, ackWaitMs: 5000);
|
||||||
|
ack.Register(3, ackWaitMs: 5000);
|
||||||
|
|
||||||
|
ack.AckSequence(1);
|
||||||
|
ack.AckFloor.ShouldBe((ulong)1);
|
||||||
|
|
||||||
|
ack.AckSequence(2);
|
||||||
|
ack.AckFloor.ShouldBe((ulong)2);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test 9: ProcessTerm increments TerminatedCount
|
||||||
|
[Fact]
|
||||||
|
public void ProcessTerm_increments_terminated_count()
|
||||||
|
{
|
||||||
|
// Go: consumer.go — terminated sequences tracked separately from acked sequences
|
||||||
|
var ack = new AckProcessor();
|
||||||
|
ack.Register(1, ackWaitMs: 5000);
|
||||||
|
ack.Register(2, ackWaitMs: 5000);
|
||||||
|
|
||||||
|
ack.TerminatedCount.ShouldBe(0);
|
||||||
|
|
||||||
|
ack.ProcessTerm(1);
|
||||||
|
ack.TerminatedCount.ShouldBe(1);
|
||||||
|
|
||||||
|
ack.ProcessTerm(2);
|
||||||
|
ack.TerminatedCount.ShouldBe(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test 10: NAK after TERM is ignored (sequence already terminated)
|
||||||
|
[Fact]
|
||||||
|
public void ProcessNak_after_term_is_ignored()
|
||||||
|
{
|
||||||
|
// Go: consumer.go — once terminated, a sequence cannot be rescheduled via NAK
|
||||||
|
var ack = new AckProcessor(backoffMs: [1]);
|
||||||
|
ack.Register(1, ackWaitMs: 5000);
|
||||||
|
|
||||||
|
ack.ProcessTerm(1);
|
||||||
|
ack.PendingCount.ShouldBe(0);
|
||||||
|
|
||||||
|
// Attempting to NAK a terminated sequence has no effect
|
||||||
|
ack.ProcessNak(1);
|
||||||
|
ack.PendingCount.ShouldBe(0);
|
||||||
|
ack.TerminatedCount.ShouldBe(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user