diff --git a/src/NATS.Server/JetStream/ConsumerManager.cs b/src/NATS.Server/JetStream/ConsumerManager.cs index b6023b4..4caa8b7 100644 --- a/src/NATS.Server/JetStream/ConsumerManager.cs +++ b/src/NATS.Server/JetStream/ConsumerManager.cs @@ -131,7 +131,7 @@ public sealed class ConsumerManager return false; handle.AckProcessor.AckAll(sequence); - _ackFloors.AddOrUpdate(stream, _ => sequence, (_, existing) => Math.Max(existing, sequence)); + _ackFloors.AddOrUpdate(stream, _ => handle.AckProcessor.AckFloor, (_, existing) => Math.Max(existing, handle.AckProcessor.AckFloor)); return true; } diff --git a/src/NATS.Server/JetStream/Consumers/AckProcessor.cs b/src/NATS.Server/JetStream/Consumers/AckProcessor.cs index 88a4289..2a581d1 100644 --- a/src/NATS.Server/JetStream/Consumers/AckProcessor.cs +++ b/src/NATS.Server/JetStream/Consumers/AckProcessor.cs @@ -3,9 +3,13 @@ namespace NATS.Server.JetStream.Consumers; public sealed class AckProcessor { private readonly Dictionary _pending = new(); + public ulong AckFloor { get; private set; } public void Register(ulong sequence, int ackWaitMs) { + if (sequence <= AckFloor) + return; + if (_pending.ContainsKey(sequence)) return; @@ -55,6 +59,9 @@ public sealed class AckProcessor { foreach (var key in _pending.Keys.Where(k => k <= sequence).ToArray()) _pending.Remove(key); + + if (sequence > AckFloor) + AckFloor = sequence; } private sealed class PendingState diff --git a/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs b/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs index a957dd0..7953844 100644 --- a/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs +++ b/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs @@ -30,9 +30,9 @@ public sealed class PullConsumerEngine { if (consumer.AckProcessor.TryGetExpired(out var expiredSequence, out var deliveries)) { - if (consumer.Config.MaxDeliver > 0 && deliveries > consumer.Config.MaxDeliver) + if (consumer.Config.MaxDeliver > 0 && deliveries >= consumer.Config.MaxDeliver) { - consumer.AckProcessor.Drop(expiredSequence); + consumer.AckProcessor.AckAll(expiredSequence); return new PullFetchBatch(messages); } @@ -75,6 +75,13 @@ public sealed class PullConsumerEngine continue; } + if (message.Sequence <= consumer.AckProcessor.AckFloor) + { + sequence++; + i--; + continue; + } + if (consumer.Config.ReplayPolicy == ReplayPolicy.Original) await Task.Delay(60, ct); diff --git a/src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs b/src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs index d937974..735a59b 100644 --- a/src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs +++ b/src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs @@ -7,6 +7,9 @@ public sealed class PushConsumerEngine { public void Enqueue(ConsumerHandle consumer, StoredMessage message) { + if (message.Sequence <= consumer.AckProcessor.AckFloor) + return; + var availableAtUtc = DateTime.UtcNow; if (consumer.Config.RateLimitBps > 0) { diff --git a/tests/NATS.Server.Tests/JetStream/JetStreamConsumerStateMachineStrictParityTests.cs b/tests/NATS.Server.Tests/JetStream/JetStreamConsumerStateMachineStrictParityTests.cs new file mode 100644 index 0000000..ab12a8c --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/JetStreamConsumerStateMachineStrictParityTests.cs @@ -0,0 +1,42 @@ +using NATS.Server.JetStream; +using NATS.Server.JetStream.Models; + +namespace NATS.Server.Tests.JetStream; + +public class JetStreamConsumerStateMachineStrictParityTests +{ + [Fact] + public async Task Ack_redelivery_backoff_and_replay_timing_follow_monotonic_consumer_state_machine_rules() + { + var streams = new StreamManager(); + var consumers = new ConsumerManager(); + + streams.CreateOrUpdate(new StreamConfig + { + Name = "ORDERS_SM", + Subjects = ["orders.sm"], + Retention = RetentionPolicy.Limits, + MaxMsgs = 32, + }).Error.ShouldBeNull(); + + consumers.CreateOrUpdate("ORDERS_SM", new ConsumerConfig + { + DurableName = "D1", + AckPolicy = AckPolicy.Explicit, + AckWaitMs = 1, + MaxDeliver = 1, + BackOffMs = [1], + }).Error.ShouldBeNull(); + + streams.Capture("orders.sm", "x"u8.ToArray()); + + var first = await consumers.FetchAsync("ORDERS_SM", "D1", 1, streams, default); + first.Messages.Count.ShouldBe(1); + + await Task.Delay(5); + var second = await consumers.FetchAsync("ORDERS_SM", "D1", 1, streams, default); + + // MaxDeliver=1 means the initial delivery is the only allowed delivery. + second.Messages.Count.ShouldBe(0); + } +}