feat(consumer): enhance AckProcessor with NAK delay, TERM, WPI, and maxAckPending
Add RedeliveryTracker constructor overload, Register(ulong, string) for tracker-based ack-wait, ProcessAck(ulong) payload-free overload, GetDeadline, CanRegister for maxAckPending enforcement, ParseAckType static parser, and AckType enum. All existing API signatures are preserved; 9 new tests added in AckProcessorEnhancedTests.cs with no regressions to existing 10 tests.
This commit is contained in:
@@ -1,6 +1,9 @@
|
||||
// Go: consumer.go (processAckMsg, processNak, processTerm, processAckProgress)
|
||||
namespace NATS.Server.JetStream.Consumers;
|
||||
|
||||
// Go: consumer.go:2550 — ack type prefix constants (+ACK, -NAK, +TERM, +WPI)
|
||||
public enum AckType { Ack, Nak, Term, Progress, Unknown }
|
||||
|
||||
public sealed class AckProcessor
|
||||
{
|
||||
// Go: consumer.go — ackTerminatedFlag marks sequences that must not be redelivered
|
||||
@@ -9,6 +12,13 @@ public sealed class AckProcessor
|
||||
private readonly int[]? _backoffMs;
|
||||
private int _ackWaitMs;
|
||||
|
||||
// Fields for the RedeliveryTracker constructor overload
|
||||
private readonly RedeliveryTracker? _tracker;
|
||||
private readonly int _maxAckPending;
|
||||
|
||||
// Stores deliver subjects keyed by sequence for tracker-based registrations
|
||||
private readonly Dictionary<ulong, string>? _deliverSubjects;
|
||||
|
||||
public ulong AckFloor { get; private set; }
|
||||
public int TerminatedCount { get; private set; }
|
||||
|
||||
@@ -17,6 +27,15 @@ public sealed class AckProcessor
|
||||
_backoffMs = backoffMs;
|
||||
}
|
||||
|
||||
// Go: consumer.go — ConsumerConfig maxAckPending + RedeliveryTracker integration
|
||||
public AckProcessor(RedeliveryTracker tracker, int maxAckPending = 0)
|
||||
{
|
||||
_tracker = tracker;
|
||||
_maxAckPending = maxAckPending;
|
||||
_deliverSubjects = new Dictionary<ulong, string>();
|
||||
_backoffMs = null;
|
||||
}
|
||||
|
||||
public void Register(ulong sequence, int ackWaitMs)
|
||||
{
|
||||
if (sequence <= AckFloor)
|
||||
@@ -34,6 +53,52 @@ public sealed class AckProcessor
|
||||
};
|
||||
}
|
||||
|
||||
// Go: consumer.go — register with deliver subject; ackWait comes from the tracker
|
||||
public void Register(ulong sequence, string deliverSubject)
|
||||
{
|
||||
if (_tracker is null)
|
||||
throw new InvalidOperationException("Register(ulong, string) requires a RedeliveryTracker constructor.");
|
||||
|
||||
var ackWaitMs = (int)Math.Max(_tracker.GetBackoffDelay(1), 1);
|
||||
Register(sequence, ackWaitMs);
|
||||
|
||||
if (_deliverSubjects is not null)
|
||||
_deliverSubjects[sequence] = deliverSubject;
|
||||
}
|
||||
|
||||
// Go: consumer.go — processAck without payload: plain +ACK, also notifies tracker
|
||||
public void ProcessAck(ulong seq)
|
||||
{
|
||||
AckSequence(seq);
|
||||
_tracker?.Acknowledge(seq);
|
||||
}
|
||||
|
||||
// Go: consumer.go — returns ack deadline for a pending sequence; MinValue if not tracked
|
||||
public DateTimeOffset GetDeadline(ulong seq)
|
||||
{
|
||||
if (_pending.TryGetValue(seq, out var state))
|
||||
return new DateTimeOffset(state.DeadlineUtc, TimeSpan.Zero);
|
||||
|
||||
return DateTimeOffset.MinValue;
|
||||
}
|
||||
|
||||
// Go: consumer.go — maxAckPending=0 means unlimited; otherwise cap pending registrations
|
||||
public bool CanRegister() => _maxAckPending <= 0 || _pending.Count < _maxAckPending;
|
||||
|
||||
// Go: consumer.go:2550 — parse ack type prefix from raw payload bytes
|
||||
public static AckType ParseAckType(ReadOnlySpan<byte> data)
|
||||
{
|
||||
if (data.StartsWith("+ACK"u8))
|
||||
return AckType.Ack;
|
||||
if (data.StartsWith("-NAK"u8))
|
||||
return AckType.Nak;
|
||||
if (data.StartsWith("+TERM"u8))
|
||||
return AckType.Term;
|
||||
if (data.StartsWith("+WPI"u8))
|
||||
return AckType.Progress;
|
||||
return AckType.Unknown;
|
||||
}
|
||||
|
||||
public bool TryGetExpired(out ulong sequence, out int deliveries)
|
||||
{
|
||||
foreach (var (seq, state) in _pending)
|
||||
|
||||
@@ -0,0 +1,118 @@
|
||||
using NATS.Server.JetStream.Consumers;
|
||||
|
||||
namespace NATS.Server.Tests.JetStream.Consumers;
|
||||
|
||||
/// <summary>
|
||||
/// Tests for enhanced AckProcessor with RedeliveryTracker integration.
|
||||
/// Go reference: consumer.go:4854 (processInboundAcks).
|
||||
/// </summary>
|
||||
public class AckProcessorEnhancedTests
|
||||
{
|
||||
[Fact]
|
||||
public void ProcessAck_removes_from_pending()
|
||||
{
|
||||
var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 30000);
|
||||
var processor = new AckProcessor(tracker);
|
||||
|
||||
processor.Register(1, "deliver.subj");
|
||||
processor.PendingCount.ShouldBe(1);
|
||||
|
||||
processor.ProcessAck(1);
|
||||
processor.PendingCount.ShouldBe(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ProcessNak_schedules_redelivery()
|
||||
{
|
||||
var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 30000);
|
||||
var processor = new AckProcessor(tracker);
|
||||
|
||||
processor.Register(1, "deliver.subj");
|
||||
processor.ProcessNak(1, delayMs: 500);
|
||||
|
||||
processor.PendingCount.ShouldBe(1); // still pending until redelivered
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ProcessTerm_removes_permanently()
|
||||
{
|
||||
var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 30000);
|
||||
var processor = new AckProcessor(tracker);
|
||||
|
||||
processor.Register(1, "deliver.subj");
|
||||
processor.ProcessTerm(1);
|
||||
|
||||
processor.PendingCount.ShouldBe(0);
|
||||
processor.TerminatedCount.ShouldBe(1);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ProcessProgress_resets_deadline_to_full_ack_wait()
|
||||
{
|
||||
// Go: consumer.go — processAckProgress (+WPI): resets deadline to UtcNow + ackWait
|
||||
// Verify the invariant: after ProcessProgress, the deadline is strictly in the future
|
||||
// by at least (ackWait - epsilon) milliseconds, without relying on wall-clock delays.
|
||||
var ackWaitMs = 1000;
|
||||
var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: ackWaitMs);
|
||||
var processor = new AckProcessor(tracker);
|
||||
|
||||
processor.Register(1, "deliver.subj");
|
||||
|
||||
var before = DateTimeOffset.UtcNow;
|
||||
processor.ProcessProgress(1);
|
||||
var after = DateTimeOffset.UtcNow;
|
||||
|
||||
var deadline = processor.GetDeadline(1);
|
||||
|
||||
// Deadline must be at least (before + ackWait) and at most (after + ackWait + epsilon)
|
||||
deadline.ShouldBeGreaterThanOrEqualTo(before.AddMilliseconds(ackWaitMs));
|
||||
deadline.ShouldBeLessThanOrEqualTo(after.AddMilliseconds(ackWaitMs + 50));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void MaxAckPending_blocks_new_registrations()
|
||||
{
|
||||
var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 30000);
|
||||
var processor = new AckProcessor(tracker, maxAckPending: 2);
|
||||
|
||||
processor.Register(1, "d.1");
|
||||
processor.Register(2, "d.2");
|
||||
processor.CanRegister().ShouldBeFalse();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void CanRegister_true_when_unlimited()
|
||||
{
|
||||
var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 30000);
|
||||
var processor = new AckProcessor(tracker); // maxAckPending=0 means unlimited
|
||||
|
||||
processor.Register(1, "d.1");
|
||||
processor.CanRegister().ShouldBeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ParseAckType_identifies_all_types()
|
||||
{
|
||||
AckProcessor.ParseAckType("+ACK"u8).ShouldBe(AckType.Ack);
|
||||
AckProcessor.ParseAckType("-NAK"u8).ShouldBe(AckType.Nak);
|
||||
AckProcessor.ParseAckType("+TERM"u8).ShouldBe(AckType.Term);
|
||||
AckProcessor.ParseAckType("+WPI"u8).ShouldBe(AckType.Progress);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ParseAckType_returns_unknown_for_invalid()
|
||||
{
|
||||
AckProcessor.ParseAckType("GARBAGE"u8).ShouldBe(AckType.Unknown);
|
||||
AckProcessor.ParseAckType(""u8).ShouldBe(AckType.Unknown);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void GetDeadline_returns_min_for_unknown_sequence()
|
||||
{
|
||||
var tracker = new RedeliveryTracker(maxDeliveries: 5, ackWaitMs: 1000);
|
||||
var processor = new AckProcessor(tracker);
|
||||
|
||||
// Unknown sequence should return DateTimeOffset.MinValue
|
||||
processor.GetDeadline(999).ShouldBe(DateTimeOffset.MinValue);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user