diff --git a/src/NATS.Server/JetStream/InterestRetentionPolicy.cs b/src/NATS.Server/JetStream/InterestRetentionPolicy.cs new file mode 100644 index 0000000..dabad26 --- /dev/null +++ b/src/NATS.Server/JetStream/InterestRetentionPolicy.cs @@ -0,0 +1,75 @@ +using NATS.Server.Subscriptions; + +namespace NATS.Server.JetStream; + +/// +/// Tracks per-consumer interest and determines when messages can be removed +/// under Interest retention policy. A message should be retained until all +/// interested consumers have acknowledged it. +/// Go reference: stream.go checkInterestState/noInterest. +/// +public sealed class InterestRetentionPolicy +{ + // consumer → filter subject pattern + private readonly Dictionary _interests = new(StringComparer.Ordinal); + // seq → set of consumers that have acked this sequence + private readonly Dictionary> _acks = new(); + + /// + /// Register a consumer's interest in a subject pattern. + /// + public void RegisterInterest(string consumer, string filterSubject) + { + _interests[consumer] = filterSubject; + } + + /// + /// Remove a consumer's interest (e.g., on deletion). + /// + public void UnregisterInterest(string consumer) + { + _interests.Remove(consumer); + } + + /// + /// Record that a consumer has acknowledged delivery of a sequence. + /// + public void AcknowledgeDelivery(string consumer, ulong seq) + { + if (!_acks.TryGetValue(seq, out var ackedBy)) + { + ackedBy = new HashSet(StringComparer.Ordinal); + _acks[seq] = ackedBy; + } + ackedBy.Add(consumer); + } + + /// + /// Returns true if the message should be retained (i.e., at least one + /// interested consumer has NOT yet acknowledged it). + /// A consumer is "interested" if its filter subject matches the message subject. + /// + public bool ShouldRetain(ulong seq, string msgSubject) + { + _acks.TryGetValue(seq, out var ackedBy); + + foreach (var (consumer, filterSubject) in _interests) + { + // Check if this consumer is interested in this message's subject + if (!SubjectMatch.MatchLiteral(msgSubject, filterSubject)) + continue; + + // Consumer is interested — has it acked? + if (ackedBy == null || !ackedBy.Contains(consumer)) + return true; // Not yet acked — must retain + } + + // All interested consumers have acked (or no one is interested) + // Clean up ack tracking for this sequence + _acks.Remove(seq); + return false; + } + + /// Number of registered consumers. + public int ConsumerCount => _interests.Count; +} diff --git a/tests/NATS.Server.Tests/JetStream/InterestRetentionTests.cs b/tests/NATS.Server.Tests/JetStream/InterestRetentionTests.cs new file mode 100644 index 0000000..3b39596 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/InterestRetentionTests.cs @@ -0,0 +1,91 @@ +using NATS.Server.JetStream; + +namespace NATS.Server.Tests.JetStream; + +/// +/// Tests for InterestRetentionPolicy per-consumer ack tracking. +/// Go reference: stream.go checkInterestState/noInterest. +/// +public class InterestRetentionTests +{ + [Fact] + public void ShouldRetain_true_when_consumers_have_not_acked() + { + var policy = new InterestRetentionPolicy(); + policy.RegisterInterest("consumer-A", "orders.>"); + policy.RegisterInterest("consumer-B", "orders.>"); + + policy.ShouldRetain(1, "orders.new").ShouldBeTrue(); + } + + [Fact] + public void ShouldRetain_false_when_all_consumers_acked() + { + var policy = new InterestRetentionPolicy(); + policy.RegisterInterest("consumer-A", "orders.>"); + policy.RegisterInterest("consumer-B", "orders.>"); + + policy.AcknowledgeDelivery("consumer-A", 1); + policy.ShouldRetain(1, "orders.new").ShouldBeTrue(); // B hasn't acked + + policy.AcknowledgeDelivery("consumer-B", 1); + policy.ShouldRetain(1, "orders.new").ShouldBeFalse(); // both acked + } + + [Fact] + public void ShouldRetain_ignores_consumers_without_interest() + { + var policy = new InterestRetentionPolicy(); + policy.RegisterInterest("consumer-A", "orders.>"); + policy.RegisterInterest("consumer-B", "billing.>"); // no interest in orders + + policy.AcknowledgeDelivery("consumer-A", 1); + policy.ShouldRetain(1, "orders.new").ShouldBeFalse(); // B has no interest + } + + [Fact] + public void UnregisterInterest_removes_consumer() + { + var policy = new InterestRetentionPolicy(); + policy.RegisterInterest("consumer-A", "x.>"); + policy.RegisterInterest("consumer-B", "x.>"); + + policy.UnregisterInterest("consumer-B"); + + // Only A needs to ack + policy.AcknowledgeDelivery("consumer-A", 1); + policy.ShouldRetain(1, "x.y").ShouldBeFalse(); + } + + [Fact] + public void ShouldRetain_false_when_no_consumers_registered() + { + var policy = new InterestRetentionPolicy(); + policy.ShouldRetain(1, "any.subject").ShouldBeFalse(); + } + + [Fact] + public void Multiple_sequences_tracked_independently() + { + var policy = new InterestRetentionPolicy(); + policy.RegisterInterest("c1", "x.>"); + + policy.AcknowledgeDelivery("c1", 1); + policy.ShouldRetain(1, "x.y").ShouldBeFalse(); + policy.ShouldRetain(2, "x.y").ShouldBeTrue(); // seq 2 not acked + } + + [Fact] + public void ConsumerCount_tracks_registrations() + { + var policy = new InterestRetentionPolicy(); + policy.ConsumerCount.ShouldBe(0); + + policy.RegisterInterest("c1", "x.>"); + policy.RegisterInterest("c2", "y.>"); + policy.ConsumerCount.ShouldBe(2); + + policy.UnregisterInterest("c1"); + policy.ConsumerCount.ShouldBe(1); + } +}