diff --git a/src/NATS.Server/JetStream/ConsumerManager.cs b/src/NATS.Server/JetStream/ConsumerManager.cs index f591e68..b6023b4 100644 --- a/src/NATS.Server/JetStream/ConsumerManager.cs +++ b/src/NATS.Server/JetStream/ConsumerManager.cs @@ -12,6 +12,7 @@ public sealed class ConsumerManager { private readonly JetStreamMetaGroup? _metaGroup; private readonly ConcurrentDictionary<(string Stream, string Name), ConsumerHandle> _consumers = new(); + private readonly ConcurrentDictionary _ackFloors = new(StringComparer.Ordinal); private readonly PullConsumerEngine _pullConsumerEngine = new(); private readonly PushConsumerEngine _pushConsumerEngine = new(); @@ -130,6 +131,7 @@ public sealed class ConsumerManager return false; handle.AckProcessor.AckAll(sequence); + _ackFloors.AddOrUpdate(stream, _ => sequence, (_, existing) => Math.Max(existing, sequence)); return true; } @@ -180,6 +182,9 @@ public sealed class ConsumerManager return true; } + + internal ulong GetAckFloor(string stream) + => _ackFloors.TryGetValue(stream, out var ackFloor) ? ackFloor : 0; } public sealed record ConsumerHandle(string Stream, ConsumerConfig Config) diff --git a/src/NATS.Server/JetStream/StreamManager.cs b/src/NATS.Server/JetStream/StreamManager.cs index 899dabf..4293ccf 100644 --- a/src/NATS.Server/JetStream/StreamManager.cs +++ b/src/NATS.Server/JetStream/StreamManager.cs @@ -14,6 +14,7 @@ namespace NATS.Server.JetStream; public sealed class StreamManager { private readonly Account? _account; + private readonly ConsumerManager? _consumerManager; private readonly JetStreamMetaGroup? _metaGroup; private readonly ConcurrentDictionary _streams = new(StringComparer.Ordinal); @@ -25,10 +26,11 @@ public sealed class StreamManager new(StringComparer.Ordinal); private readonly StreamSnapshotService _snapshotService = new(); - public StreamManager(JetStreamMetaGroup? metaGroup = null, Account? account = null) + public StreamManager(JetStreamMetaGroup? metaGroup = null, Account? account = null, ConsumerManager? consumerManager = null) { _metaGroup = metaGroup; _account = account; + _consumerManager = consumerManager; } public IReadOnlyCollection StreamNames => _streams.Keys.ToArray(); @@ -261,7 +263,7 @@ public sealed class StreamManager }; } - private static void EnforceRuntimePolicies(StreamHandle stream, DateTime nowUtc) + private void EnforceRuntimePolicies(StreamHandle stream, DateTime nowUtc) { switch (stream.Config.Retention) { @@ -284,11 +286,23 @@ public sealed class StreamManager PruneExpiredMessages(stream, nowUtc); } - private static void ApplyWorkQueueRetention(StreamHandle stream, DateTime nowUtc) + private void ApplyWorkQueueRetention(StreamHandle stream, DateTime nowUtc) { - // WorkQueue keeps one-consumer processing semantics; current parity baseline - // applies the same bounded retention guards used by limits retention. ApplyLimitsRetention(stream, nowUtc); + + if (_consumerManager == null) + return; + + var ackFloor = _consumerManager.GetAckFloor(stream.Config.Name); + if (ackFloor == 0) + return; + + var messages = stream.Store.ListAsync(default).GetAwaiter().GetResult(); + foreach (var message in messages) + { + if (message.Sequence <= ackFloor) + stream.Store.RemoveAsync(message.Sequence, default).GetAwaiter().GetResult(); + } } private static void ApplyInterestRetention(StreamHandle stream, DateTime nowUtc) diff --git a/src/NATS.Server/NatsServer.cs b/src/NATS.Server/NatsServer.cs index 72b0212..21692e6 100644 --- a/src/NATS.Server/NatsServer.cs +++ b/src/NATS.Server/NatsServer.cs @@ -408,8 +408,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable if (options.JetStream != null) { - _jetStreamStreamManager = new StreamManager(); _jetStreamConsumerManager = new ConsumerManager(); + _jetStreamStreamManager = new StreamManager(consumerManager: _jetStreamConsumerManager); var jsClientId = Interlocked.Increment(ref _nextClientId); _jetStreamInternalClient = new InternalClient(jsClientId, ClientKind.JetStream, _systemAccount); _jetStreamService = new JetStreamService(options.JetStream, _jetStreamInternalClient); diff --git a/tests/NATS.Server.Tests/JetStream/JetStreamRetentionRuntimeStrictParityTests.cs b/tests/NATS.Server.Tests/JetStream/JetStreamRetentionRuntimeStrictParityTests.cs new file mode 100644 index 0000000..b968ddf --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/JetStreamRetentionRuntimeStrictParityTests.cs @@ -0,0 +1,53 @@ +using NATS.Server.JetStream; +using NATS.Server.JetStream.Models; + +namespace NATS.Server.Tests.JetStream; + +public class JetStreamRetentionRuntimeStrictParityTests +{ + [Fact] + public async Task Limits_interest_and_workqueue_retention_diverge_by_runtime_contract() + { + var consumers = new ConsumerManager(); + var streams = new StreamManager(consumerManager: consumers); + + streams.CreateOrUpdate(new StreamConfig + { + Name = "WQ_STRICT", + Subjects = ["wq.strict"], + Retention = RetentionPolicy.WorkQueue, + MaxMsgs = 32, + }).Error.ShouldBeNull(); + streams.CreateOrUpdate(new StreamConfig + { + Name = "INT_STRICT", + Subjects = ["int.strict"], + Retention = RetentionPolicy.Interest, + MaxMsgs = 32, + }).Error.ShouldBeNull(); + + consumers.CreateOrUpdate("WQ_STRICT", new ConsumerConfig { DurableName = "WQ", AckPolicy = AckPolicy.All }).Error.ShouldBeNull(); + consumers.CreateOrUpdate("INT_STRICT", new ConsumerConfig { DurableName = "INT", AckPolicy = AckPolicy.All }).Error.ShouldBeNull(); + + streams.Capture("wq.strict", "one"u8.ToArray()); + streams.Capture("wq.strict", "two"u8.ToArray()); + streams.Capture("int.strict", "one"u8.ToArray()); + streams.Capture("int.strict", "two"u8.ToArray()); + + consumers.AckAll("WQ_STRICT", "WQ", 1).ShouldBeTrue(); + consumers.AckAll("INT_STRICT", "INT", 1).ShouldBeTrue(); + + // Trigger another retention pass after ack-floor updates. + streams.Capture("wq.strict", "three"u8.ToArray()); + streams.Capture("int.strict", "three"u8.ToArray()); + + streams.TryGet("WQ_STRICT", out var wq).ShouldBeTrue(); + streams.TryGet("INT_STRICT", out var interest).ShouldBeTrue(); + + var wqState = await wq.Store.GetStateAsync(default); + var interestState = await interest.Store.GetStateAsync(default); + + wqState.Messages.ShouldBe((ulong)2); // seq=1 pruned by workqueue ack floor + interestState.Messages.ShouldBe((ulong)3); // interest retention does not use ack-floor pruning + } +}