feat: implement strict retention runtime parity for jetstream
This commit is contained in:
@@ -12,6 +12,7 @@ public sealed class ConsumerManager
|
|||||||
{
|
{
|
||||||
private readonly JetStreamMetaGroup? _metaGroup;
|
private readonly JetStreamMetaGroup? _metaGroup;
|
||||||
private readonly ConcurrentDictionary<(string Stream, string Name), ConsumerHandle> _consumers = new();
|
private readonly ConcurrentDictionary<(string Stream, string Name), ConsumerHandle> _consumers = new();
|
||||||
|
private readonly ConcurrentDictionary<string, ulong> _ackFloors = new(StringComparer.Ordinal);
|
||||||
private readonly PullConsumerEngine _pullConsumerEngine = new();
|
private readonly PullConsumerEngine _pullConsumerEngine = new();
|
||||||
private readonly PushConsumerEngine _pushConsumerEngine = new();
|
private readonly PushConsumerEngine _pushConsumerEngine = new();
|
||||||
|
|
||||||
@@ -130,6 +131,7 @@ public sealed class ConsumerManager
|
|||||||
return false;
|
return false;
|
||||||
|
|
||||||
handle.AckProcessor.AckAll(sequence);
|
handle.AckProcessor.AckAll(sequence);
|
||||||
|
_ackFloors.AddOrUpdate(stream, _ => sequence, (_, existing) => Math.Max(existing, sequence));
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -180,6 +182,9 @@ public sealed class ConsumerManager
|
|||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
internal ulong GetAckFloor(string stream)
|
||||||
|
=> _ackFloors.TryGetValue(stream, out var ackFloor) ? ackFloor : 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
public sealed record ConsumerHandle(string Stream, ConsumerConfig Config)
|
public sealed record ConsumerHandle(string Stream, ConsumerConfig Config)
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ namespace NATS.Server.JetStream;
|
|||||||
public sealed class StreamManager
|
public sealed class StreamManager
|
||||||
{
|
{
|
||||||
private readonly Account? _account;
|
private readonly Account? _account;
|
||||||
|
private readonly ConsumerManager? _consumerManager;
|
||||||
private readonly JetStreamMetaGroup? _metaGroup;
|
private readonly JetStreamMetaGroup? _metaGroup;
|
||||||
private readonly ConcurrentDictionary<string, StreamHandle> _streams =
|
private readonly ConcurrentDictionary<string, StreamHandle> _streams =
|
||||||
new(StringComparer.Ordinal);
|
new(StringComparer.Ordinal);
|
||||||
@@ -25,10 +26,11 @@ public sealed class StreamManager
|
|||||||
new(StringComparer.Ordinal);
|
new(StringComparer.Ordinal);
|
||||||
private readonly StreamSnapshotService _snapshotService = new();
|
private readonly StreamSnapshotService _snapshotService = new();
|
||||||
|
|
||||||
public StreamManager(JetStreamMetaGroup? metaGroup = null, Account? account = null)
|
public StreamManager(JetStreamMetaGroup? metaGroup = null, Account? account = null, ConsumerManager? consumerManager = null)
|
||||||
{
|
{
|
||||||
_metaGroup = metaGroup;
|
_metaGroup = metaGroup;
|
||||||
_account = account;
|
_account = account;
|
||||||
|
_consumerManager = consumerManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
public IReadOnlyCollection<string> StreamNames => _streams.Keys.ToArray();
|
public IReadOnlyCollection<string> StreamNames => _streams.Keys.ToArray();
|
||||||
@@ -261,7 +263,7 @@ public sealed class StreamManager
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void EnforceRuntimePolicies(StreamHandle stream, DateTime nowUtc)
|
private void EnforceRuntimePolicies(StreamHandle stream, DateTime nowUtc)
|
||||||
{
|
{
|
||||||
switch (stream.Config.Retention)
|
switch (stream.Config.Retention)
|
||||||
{
|
{
|
||||||
@@ -284,11 +286,23 @@ public sealed class StreamManager
|
|||||||
PruneExpiredMessages(stream, nowUtc);
|
PruneExpiredMessages(stream, nowUtc);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void ApplyWorkQueueRetention(StreamHandle stream, DateTime nowUtc)
|
private void ApplyWorkQueueRetention(StreamHandle stream, DateTime nowUtc)
|
||||||
{
|
{
|
||||||
// WorkQueue keeps one-consumer processing semantics; current parity baseline
|
|
||||||
// applies the same bounded retention guards used by limits retention.
|
|
||||||
ApplyLimitsRetention(stream, nowUtc);
|
ApplyLimitsRetention(stream, nowUtc);
|
||||||
|
|
||||||
|
if (_consumerManager == null)
|
||||||
|
return;
|
||||||
|
|
||||||
|
var ackFloor = _consumerManager.GetAckFloor(stream.Config.Name);
|
||||||
|
if (ackFloor == 0)
|
||||||
|
return;
|
||||||
|
|
||||||
|
var messages = stream.Store.ListAsync(default).GetAwaiter().GetResult();
|
||||||
|
foreach (var message in messages)
|
||||||
|
{
|
||||||
|
if (message.Sequence <= ackFloor)
|
||||||
|
stream.Store.RemoveAsync(message.Sequence, default).GetAwaiter().GetResult();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void ApplyInterestRetention(StreamHandle stream, DateTime nowUtc)
|
private static void ApplyInterestRetention(StreamHandle stream, DateTime nowUtc)
|
||||||
|
|||||||
@@ -408,8 +408,8 @@ public sealed class NatsServer : IMessageRouter, ISubListAccess, IDisposable
|
|||||||
|
|
||||||
if (options.JetStream != null)
|
if (options.JetStream != null)
|
||||||
{
|
{
|
||||||
_jetStreamStreamManager = new StreamManager();
|
|
||||||
_jetStreamConsumerManager = new ConsumerManager();
|
_jetStreamConsumerManager = new ConsumerManager();
|
||||||
|
_jetStreamStreamManager = new StreamManager(consumerManager: _jetStreamConsumerManager);
|
||||||
var jsClientId = Interlocked.Increment(ref _nextClientId);
|
var jsClientId = Interlocked.Increment(ref _nextClientId);
|
||||||
_jetStreamInternalClient = new InternalClient(jsClientId, ClientKind.JetStream, _systemAccount);
|
_jetStreamInternalClient = new InternalClient(jsClientId, ClientKind.JetStream, _systemAccount);
|
||||||
_jetStreamService = new JetStreamService(options.JetStream, _jetStreamInternalClient);
|
_jetStreamService = new JetStreamService(options.JetStream, _jetStreamInternalClient);
|
||||||
|
|||||||
@@ -0,0 +1,53 @@
|
|||||||
|
using NATS.Server.JetStream;
|
||||||
|
using NATS.Server.JetStream.Models;
|
||||||
|
|
||||||
|
namespace NATS.Server.Tests.JetStream;
|
||||||
|
|
||||||
|
public class JetStreamRetentionRuntimeStrictParityTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public async Task Limits_interest_and_workqueue_retention_diverge_by_runtime_contract()
|
||||||
|
{
|
||||||
|
var consumers = new ConsumerManager();
|
||||||
|
var streams = new StreamManager(consumerManager: consumers);
|
||||||
|
|
||||||
|
streams.CreateOrUpdate(new StreamConfig
|
||||||
|
{
|
||||||
|
Name = "WQ_STRICT",
|
||||||
|
Subjects = ["wq.strict"],
|
||||||
|
Retention = RetentionPolicy.WorkQueue,
|
||||||
|
MaxMsgs = 32,
|
||||||
|
}).Error.ShouldBeNull();
|
||||||
|
streams.CreateOrUpdate(new StreamConfig
|
||||||
|
{
|
||||||
|
Name = "INT_STRICT",
|
||||||
|
Subjects = ["int.strict"],
|
||||||
|
Retention = RetentionPolicy.Interest,
|
||||||
|
MaxMsgs = 32,
|
||||||
|
}).Error.ShouldBeNull();
|
||||||
|
|
||||||
|
consumers.CreateOrUpdate("WQ_STRICT", new ConsumerConfig { DurableName = "WQ", AckPolicy = AckPolicy.All }).Error.ShouldBeNull();
|
||||||
|
consumers.CreateOrUpdate("INT_STRICT", new ConsumerConfig { DurableName = "INT", AckPolicy = AckPolicy.All }).Error.ShouldBeNull();
|
||||||
|
|
||||||
|
streams.Capture("wq.strict", "one"u8.ToArray());
|
||||||
|
streams.Capture("wq.strict", "two"u8.ToArray());
|
||||||
|
streams.Capture("int.strict", "one"u8.ToArray());
|
||||||
|
streams.Capture("int.strict", "two"u8.ToArray());
|
||||||
|
|
||||||
|
consumers.AckAll("WQ_STRICT", "WQ", 1).ShouldBeTrue();
|
||||||
|
consumers.AckAll("INT_STRICT", "INT", 1).ShouldBeTrue();
|
||||||
|
|
||||||
|
// Trigger another retention pass after ack-floor updates.
|
||||||
|
streams.Capture("wq.strict", "three"u8.ToArray());
|
||||||
|
streams.Capture("int.strict", "three"u8.ToArray());
|
||||||
|
|
||||||
|
streams.TryGet("WQ_STRICT", out var wq).ShouldBeTrue();
|
||||||
|
streams.TryGet("INT_STRICT", out var interest).ShouldBeTrue();
|
||||||
|
|
||||||
|
var wqState = await wq.Store.GetStateAsync(default);
|
||||||
|
var interestState = await interest.Store.GetStateAsync(default);
|
||||||
|
|
||||||
|
wqState.Messages.ShouldBe((ulong)2); // seq=1 pruned by workqueue ack floor
|
||||||
|
interestState.Messages.ShouldBe((ulong)3); // interest retention does not use ack-floor pruning
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user