From 955d5684236641a6c5e0a462ddbcaadb9c832037 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 25 Feb 2026 02:25:39 -0500 Subject: [PATCH] feat(stream): add InterestRetentionPolicy for per-consumer ack tracking Implements Interest retention policy logic that tracks which consumers are interested in each message subject and whether they have acknowledged delivery, retaining messages until all interested consumers have acked. Go reference: stream.go checkInterestState/noInterest. --- .../JetStream/InterestRetentionPolicy.cs | 75 +++++++++++++++ .../JetStream/InterestRetentionTests.cs | 91 +++++++++++++++++++ 2 files changed, 166 insertions(+) create mode 100644 src/NATS.Server/JetStream/InterestRetentionPolicy.cs create mode 100644 tests/NATS.Server.Tests/JetStream/InterestRetentionTests.cs diff --git a/src/NATS.Server/JetStream/InterestRetentionPolicy.cs b/src/NATS.Server/JetStream/InterestRetentionPolicy.cs new file mode 100644 index 0000000..dabad26 --- /dev/null +++ b/src/NATS.Server/JetStream/InterestRetentionPolicy.cs @@ -0,0 +1,75 @@ +using NATS.Server.Subscriptions; + +namespace NATS.Server.JetStream; + +/// +/// Tracks per-consumer interest and determines when messages can be removed +/// under Interest retention policy. A message should be retained until all +/// interested consumers have acknowledged it. +/// Go reference: stream.go checkInterestState/noInterest. +/// +public sealed class InterestRetentionPolicy +{ + // consumer → filter subject pattern + private readonly Dictionary _interests = new(StringComparer.Ordinal); + // seq → set of consumers that have acked this sequence + private readonly Dictionary> _acks = new(); + + /// + /// Register a consumer's interest in a subject pattern. + /// + public void RegisterInterest(string consumer, string filterSubject) + { + _interests[consumer] = filterSubject; + } + + /// + /// Remove a consumer's interest (e.g., on deletion). + /// + public void UnregisterInterest(string consumer) + { + _interests.Remove(consumer); + } + + /// + /// Record that a consumer has acknowledged delivery of a sequence. + /// + public void AcknowledgeDelivery(string consumer, ulong seq) + { + if (!_acks.TryGetValue(seq, out var ackedBy)) + { + ackedBy = new HashSet(StringComparer.Ordinal); + _acks[seq] = ackedBy; + } + ackedBy.Add(consumer); + } + + /// + /// Returns true if the message should be retained (i.e., at least one + /// interested consumer has NOT yet acknowledged it). + /// A consumer is "interested" if its filter subject matches the message subject. + /// + public bool ShouldRetain(ulong seq, string msgSubject) + { + _acks.TryGetValue(seq, out var ackedBy); + + foreach (var (consumer, filterSubject) in _interests) + { + // Check if this consumer is interested in this message's subject + if (!SubjectMatch.MatchLiteral(msgSubject, filterSubject)) + continue; + + // Consumer is interested — has it acked? + if (ackedBy == null || !ackedBy.Contains(consumer)) + return true; // Not yet acked — must retain + } + + // All interested consumers have acked (or no one is interested) + // Clean up ack tracking for this sequence + _acks.Remove(seq); + return false; + } + + /// Number of registered consumers. + public int ConsumerCount => _interests.Count; +} diff --git a/tests/NATS.Server.Tests/JetStream/InterestRetentionTests.cs b/tests/NATS.Server.Tests/JetStream/InterestRetentionTests.cs new file mode 100644 index 0000000..3b39596 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/InterestRetentionTests.cs @@ -0,0 +1,91 @@ +using NATS.Server.JetStream; + +namespace NATS.Server.Tests.JetStream; + +/// +/// Tests for InterestRetentionPolicy per-consumer ack tracking. +/// Go reference: stream.go checkInterestState/noInterest. +/// +public class InterestRetentionTests +{ + [Fact] + public void ShouldRetain_true_when_consumers_have_not_acked() + { + var policy = new InterestRetentionPolicy(); + policy.RegisterInterest("consumer-A", "orders.>"); + policy.RegisterInterest("consumer-B", "orders.>"); + + policy.ShouldRetain(1, "orders.new").ShouldBeTrue(); + } + + [Fact] + public void ShouldRetain_false_when_all_consumers_acked() + { + var policy = new InterestRetentionPolicy(); + policy.RegisterInterest("consumer-A", "orders.>"); + policy.RegisterInterest("consumer-B", "orders.>"); + + policy.AcknowledgeDelivery("consumer-A", 1); + policy.ShouldRetain(1, "orders.new").ShouldBeTrue(); // B hasn't acked + + policy.AcknowledgeDelivery("consumer-B", 1); + policy.ShouldRetain(1, "orders.new").ShouldBeFalse(); // both acked + } + + [Fact] + public void ShouldRetain_ignores_consumers_without_interest() + { + var policy = new InterestRetentionPolicy(); + policy.RegisterInterest("consumer-A", "orders.>"); + policy.RegisterInterest("consumer-B", "billing.>"); // no interest in orders + + policy.AcknowledgeDelivery("consumer-A", 1); + policy.ShouldRetain(1, "orders.new").ShouldBeFalse(); // B has no interest + } + + [Fact] + public void UnregisterInterest_removes_consumer() + { + var policy = new InterestRetentionPolicy(); + policy.RegisterInterest("consumer-A", "x.>"); + policy.RegisterInterest("consumer-B", "x.>"); + + policy.UnregisterInterest("consumer-B"); + + // Only A needs to ack + policy.AcknowledgeDelivery("consumer-A", 1); + policy.ShouldRetain(1, "x.y").ShouldBeFalse(); + } + + [Fact] + public void ShouldRetain_false_when_no_consumers_registered() + { + var policy = new InterestRetentionPolicy(); + policy.ShouldRetain(1, "any.subject").ShouldBeFalse(); + } + + [Fact] + public void Multiple_sequences_tracked_independently() + { + var policy = new InterestRetentionPolicy(); + policy.RegisterInterest("c1", "x.>"); + + policy.AcknowledgeDelivery("c1", 1); + policy.ShouldRetain(1, "x.y").ShouldBeFalse(); + policy.ShouldRetain(2, "x.y").ShouldBeTrue(); // seq 2 not acked + } + + [Fact] + public void ConsumerCount_tracks_registrations() + { + var policy = new InterestRetentionPolicy(); + policy.ConsumerCount.ShouldBe(0); + + policy.RegisterInterest("c1", "x.>"); + policy.RegisterInterest("c2", "y.>"); + policy.ConsumerCount.ShouldBe(2); + + policy.UnregisterInterest("c1"); + policy.ConsumerCount.ShouldBe(1); + } +}