feat(stream): add InterestRetentionPolicy for per-consumer ack tracking

Implements Interest retention policy logic that tracks which consumers
are interested in each message subject and whether they have acknowledged
delivery, retaining messages until all interested consumers have acked.
Go reference: stream.go checkInterestState/noInterest.
This commit is contained in:
Joseph Doherty
2026-02-25 02:25:39 -05:00
parent 2eaa736b21
commit 955d568423
2 changed files with 166 additions and 0 deletions

View File

@@ -0,0 +1,75 @@
using NATS.Server.Subscriptions;
namespace NATS.Server.JetStream;
/// <summary>
/// Tracks per-consumer interest and determines when messages can be removed
/// under Interest retention policy. A message should be retained until all
/// interested consumers have acknowledged it.
/// Go reference: stream.go checkInterestState/noInterest.
/// </summary>
public sealed class InterestRetentionPolicy
{
// consumer → filter subject pattern
private readonly Dictionary<string, string> _interests = new(StringComparer.Ordinal);
// seq → set of consumers that have acked this sequence
private readonly Dictionary<ulong, HashSet<string>> _acks = new();
/// <summary>
/// Register a consumer's interest in a subject pattern.
/// </summary>
public void RegisterInterest(string consumer, string filterSubject)
{
_interests[consumer] = filterSubject;
}
/// <summary>
/// Remove a consumer's interest (e.g., on deletion).
/// </summary>
public void UnregisterInterest(string consumer)
{
_interests.Remove(consumer);
}
/// <summary>
/// Record that a consumer has acknowledged delivery of a sequence.
/// </summary>
public void AcknowledgeDelivery(string consumer, ulong seq)
{
if (!_acks.TryGetValue(seq, out var ackedBy))
{
ackedBy = new HashSet<string>(StringComparer.Ordinal);
_acks[seq] = ackedBy;
}
ackedBy.Add(consumer);
}
/// <summary>
/// Returns true if the message should be retained (i.e., at least one
/// interested consumer has NOT yet acknowledged it).
/// A consumer is "interested" if its filter subject matches the message subject.
/// </summary>
public bool ShouldRetain(ulong seq, string msgSubject)
{
_acks.TryGetValue(seq, out var ackedBy);
foreach (var (consumer, filterSubject) in _interests)
{
// Check if this consumer is interested in this message's subject
if (!SubjectMatch.MatchLiteral(msgSubject, filterSubject))
continue;
// Consumer is interested — has it acked?
if (ackedBy == null || !ackedBy.Contains(consumer))
return true; // Not yet acked — must retain
}
// All interested consumers have acked (or no one is interested)
// Clean up ack tracking for this sequence
_acks.Remove(seq);
return false;
}
/// <summary>Number of registered consumers.</summary>
public int ConsumerCount => _interests.Count;
}

View File

@@ -0,0 +1,91 @@
using NATS.Server.JetStream;
namespace NATS.Server.Tests.JetStream;
/// <summary>
/// Tests for InterestRetentionPolicy per-consumer ack tracking.
/// Go reference: stream.go checkInterestState/noInterest.
/// </summary>
public class InterestRetentionTests
{
[Fact]
public void ShouldRetain_true_when_consumers_have_not_acked()
{
var policy = new InterestRetentionPolicy();
policy.RegisterInterest("consumer-A", "orders.>");
policy.RegisterInterest("consumer-B", "orders.>");
policy.ShouldRetain(1, "orders.new").ShouldBeTrue();
}
[Fact]
public void ShouldRetain_false_when_all_consumers_acked()
{
var policy = new InterestRetentionPolicy();
policy.RegisterInterest("consumer-A", "orders.>");
policy.RegisterInterest("consumer-B", "orders.>");
policy.AcknowledgeDelivery("consumer-A", 1);
policy.ShouldRetain(1, "orders.new").ShouldBeTrue(); // B hasn't acked
policy.AcknowledgeDelivery("consumer-B", 1);
policy.ShouldRetain(1, "orders.new").ShouldBeFalse(); // both acked
}
[Fact]
public void ShouldRetain_ignores_consumers_without_interest()
{
var policy = new InterestRetentionPolicy();
policy.RegisterInterest("consumer-A", "orders.>");
policy.RegisterInterest("consumer-B", "billing.>"); // no interest in orders
policy.AcknowledgeDelivery("consumer-A", 1);
policy.ShouldRetain(1, "orders.new").ShouldBeFalse(); // B has no interest
}
[Fact]
public void UnregisterInterest_removes_consumer()
{
var policy = new InterestRetentionPolicy();
policy.RegisterInterest("consumer-A", "x.>");
policy.RegisterInterest("consumer-B", "x.>");
policy.UnregisterInterest("consumer-B");
// Only A needs to ack
policy.AcknowledgeDelivery("consumer-A", 1);
policy.ShouldRetain(1, "x.y").ShouldBeFalse();
}
[Fact]
public void ShouldRetain_false_when_no_consumers_registered()
{
var policy = new InterestRetentionPolicy();
policy.ShouldRetain(1, "any.subject").ShouldBeFalse();
}
[Fact]
public void Multiple_sequences_tracked_independently()
{
var policy = new InterestRetentionPolicy();
policy.RegisterInterest("c1", "x.>");
policy.AcknowledgeDelivery("c1", 1);
policy.ShouldRetain(1, "x.y").ShouldBeFalse();
policy.ShouldRetain(2, "x.y").ShouldBeTrue(); // seq 2 not acked
}
[Fact]
public void ConsumerCount_tracks_registrations()
{
var policy = new InterestRetentionPolicy();
policy.ConsumerCount.ShouldBe(0);
policy.RegisterInterest("c1", "x.>");
policy.RegisterInterest("c2", "y.>");
policy.ConsumerCount.ShouldBe(2);
policy.UnregisterInterest("c1");
policy.ConsumerCount.ShouldBe(1);
}
}