feat: harden jetstream consumer state machine parity

This commit is contained in:
Joseph Doherty
2026-02-23 14:48:47 -05:00
parent cdde3c7a1d
commit 7bea35aaa8
5 changed files with 62 additions and 3 deletions

View File

@@ -131,7 +131,7 @@ public sealed class ConsumerManager
return false; return false;
handle.AckProcessor.AckAll(sequence); handle.AckProcessor.AckAll(sequence);
_ackFloors.AddOrUpdate(stream, _ => sequence, (_, existing) => Math.Max(existing, sequence)); _ackFloors.AddOrUpdate(stream, _ => handle.AckProcessor.AckFloor, (_, existing) => Math.Max(existing, handle.AckProcessor.AckFloor));
return true; return true;
} }

View File

@@ -3,9 +3,13 @@ namespace NATS.Server.JetStream.Consumers;
public sealed class AckProcessor public sealed class AckProcessor
{ {
private readonly Dictionary<ulong, PendingState> _pending = new(); private readonly Dictionary<ulong, PendingState> _pending = new();
public ulong AckFloor { get; private set; }
public void Register(ulong sequence, int ackWaitMs) public void Register(ulong sequence, int ackWaitMs)
{ {
if (sequence <= AckFloor)
return;
if (_pending.ContainsKey(sequence)) if (_pending.ContainsKey(sequence))
return; return;
@@ -55,6 +59,9 @@ public sealed class AckProcessor
{ {
foreach (var key in _pending.Keys.Where(k => k <= sequence).ToArray()) foreach (var key in _pending.Keys.Where(k => k <= sequence).ToArray())
_pending.Remove(key); _pending.Remove(key);
if (sequence > AckFloor)
AckFloor = sequence;
} }
private sealed class PendingState private sealed class PendingState

View File

@@ -30,9 +30,9 @@ public sealed class PullConsumerEngine
{ {
if (consumer.AckProcessor.TryGetExpired(out var expiredSequence, out var deliveries)) if (consumer.AckProcessor.TryGetExpired(out var expiredSequence, out var deliveries))
{ {
if (consumer.Config.MaxDeliver > 0 && deliveries > consumer.Config.MaxDeliver) if (consumer.Config.MaxDeliver > 0 && deliveries >= consumer.Config.MaxDeliver)
{ {
consumer.AckProcessor.Drop(expiredSequence); consumer.AckProcessor.AckAll(expiredSequence);
return new PullFetchBatch(messages); return new PullFetchBatch(messages);
} }
@@ -75,6 +75,13 @@ public sealed class PullConsumerEngine
continue; continue;
} }
if (message.Sequence <= consumer.AckProcessor.AckFloor)
{
sequence++;
i--;
continue;
}
if (consumer.Config.ReplayPolicy == ReplayPolicy.Original) if (consumer.Config.ReplayPolicy == ReplayPolicy.Original)
await Task.Delay(60, ct); await Task.Delay(60, ct);

View File

@@ -7,6 +7,9 @@ public sealed class PushConsumerEngine
{ {
public void Enqueue(ConsumerHandle consumer, StoredMessage message) public void Enqueue(ConsumerHandle consumer, StoredMessage message)
{ {
if (message.Sequence <= consumer.AckProcessor.AckFloor)
return;
var availableAtUtc = DateTime.UtcNow; var availableAtUtc = DateTime.UtcNow;
if (consumer.Config.RateLimitBps > 0) if (consumer.Config.RateLimitBps > 0)
{ {

View File

@@ -0,0 +1,42 @@
using NATS.Server.JetStream;
using NATS.Server.JetStream.Models;
namespace NATS.Server.Tests.JetStream;
public class JetStreamConsumerStateMachineStrictParityTests
{
[Fact]
public async Task Ack_redelivery_backoff_and_replay_timing_follow_monotonic_consumer_state_machine_rules()
{
var streams = new StreamManager();
var consumers = new ConsumerManager();
streams.CreateOrUpdate(new StreamConfig
{
Name = "ORDERS_SM",
Subjects = ["orders.sm"],
Retention = RetentionPolicy.Limits,
MaxMsgs = 32,
}).Error.ShouldBeNull();
consumers.CreateOrUpdate("ORDERS_SM", new ConsumerConfig
{
DurableName = "D1",
AckPolicy = AckPolicy.Explicit,
AckWaitMs = 1,
MaxDeliver = 1,
BackOffMs = [1],
}).Error.ShouldBeNull();
streams.Capture("orders.sm", "x"u8.ToArray());
var first = await consumers.FetchAsync("ORDERS_SM", "D1", 1, streams, default);
first.Messages.Count.ShouldBe(1);
await Task.Delay(5);
var second = await consumers.FetchAsync("ORDERS_SM", "D1", 1, streams, default);
// MaxDeliver=1 means the initial delivery is the only allowed delivery.
second.Messages.Count.ShouldBe(0);
}
}