diff --git a/src/NATS.Server/JetStream/Consumers/DeliveryInterestTracker.cs b/src/NATS.Server/JetStream/Consumers/DeliveryInterestTracker.cs new file mode 100644 index 0000000..870130e --- /dev/null +++ b/src/NATS.Server/JetStream/Consumers/DeliveryInterestTracker.cs @@ -0,0 +1,65 @@ +namespace NATS.Server.JetStream.Consumers; + +/// +/// Tracks whether there are active subscribers on a consumer's delivery subject. +/// When interest drops to zero and remains absent for a configurable timeout, the +/// consumer can be cleaned up (for ephemeral consumers) or paused (for durable ones). +/// Go reference: consumer.go hasDeliveryInterest, deleteNotActive. +/// +public sealed class DeliveryInterestTracker +{ + private readonly TimeSpan _inactiveTimeout; + private int _subscriberCount; + private DateTime? _lastUnsubscribeUtc; + + public DeliveryInterestTracker(TimeSpan? inactiveTimeout = null) + { + _inactiveTimeout = inactiveTimeout ?? TimeSpan.FromSeconds(30); + } + + /// True when at least one subscriber exists on the delivery subject. + public bool HasInterest => Volatile.Read(ref _subscriberCount) > 0; + + /// Current subscriber count. + public int SubscriberCount => Volatile.Read(ref _subscriberCount); + + /// + /// True when interest has been absent for longer than the inactive timeout. + /// Used by ephemeral consumers to trigger auto-deletion. + /// Go reference: consumer.go deleteNotActive. + /// + public bool ShouldDelete + { + get + { + if (HasInterest) return false; + if (_lastUnsubscribeUtc == null) return false; + return DateTime.UtcNow - _lastUnsubscribeUtc.Value >= _inactiveTimeout; + } + } + + /// Records a new subscriber on the delivery subject. + public void OnSubscribe() + { + Interlocked.Increment(ref _subscriberCount); + _lastUnsubscribeUtc = null; // Reset the inactivity timer + } + + /// Records removal of a subscriber from the delivery subject. + public void OnUnsubscribe() + { + var count = Interlocked.Decrement(ref _subscriberCount); + if (count <= 0) + { + Interlocked.Exchange(ref _subscriberCount, 0); // floor at 0 + _lastUnsubscribeUtc = DateTime.UtcNow; + } + } + + /// Resets the tracker to initial state. + public void Reset() + { + Interlocked.Exchange(ref _subscriberCount, 0); + _lastUnsubscribeUtc = null; + } +} diff --git a/tests/NATS.Server.Tests/JetStream/Consumers/DeliveryInterestTests.cs b/tests/NATS.Server.Tests/JetStream/Consumers/DeliveryInterestTests.cs new file mode 100644 index 0000000..2742ea5 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Consumers/DeliveryInterestTests.cs @@ -0,0 +1,207 @@ +// Go: consumer.go hasDeliveryInterest, deleteNotActive +using NATS.Server.JetStream.Consumers; + +namespace NATS.Server.Tests.JetStream.Consumers; + +public class DeliveryInterestTests +{ + // ------------------------------------------------------------------------- + // Test 1 — HasInterest is true after a subscribe + // + // Go reference: consumer.go hasDeliveryInterest — returns true when at + // least one client is subscribed to the push consumer's deliver subject. + // ------------------------------------------------------------------------- + [Fact] + public void HasInterest_true_after_subscribe() + { + var tracker = new DeliveryInterestTracker(); + + tracker.OnSubscribe(); + + tracker.HasInterest.ShouldBeTrue(); + } + + // ------------------------------------------------------------------------- + // Test 2 — HasInterest is false initially (no subscribers) + // + // Go reference: consumer.go — on creation there are no delivery subscribers. + // ------------------------------------------------------------------------- + [Fact] + public void HasInterest_false_initially() + { + var tracker = new DeliveryInterestTracker(); + + tracker.HasInterest.ShouldBeFalse(); + } + + // ------------------------------------------------------------------------- + // Test 3 — HasInterest drops to false after all subscribers unsubscribe + // + // Go reference: consumer.go hasDeliveryInterest — once subscription count + // reaches 0, interest is gone. + // ------------------------------------------------------------------------- + [Fact] + public void HasInterest_false_after_all_unsubscribe() + { + var tracker = new DeliveryInterestTracker(); + + tracker.OnSubscribe(); + tracker.OnSubscribe(); + tracker.OnUnsubscribe(); + tracker.OnUnsubscribe(); + + tracker.HasInterest.ShouldBeFalse(); + } + + // ------------------------------------------------------------------------- + // Test 4 — SubscriberCount tracks multiple subscribers accurately + // + // Go reference: consumer.go — the interest count must reflect the exact + // number of active push-consumer delivery subscriptions. + // ------------------------------------------------------------------------- + [Fact] + public void SubscriberCount_tracks_multiple_subscribers() + { + var tracker = new DeliveryInterestTracker(); + + tracker.SubscriberCount.ShouldBe(0); + + tracker.OnSubscribe(); + tracker.SubscriberCount.ShouldBe(1); + + tracker.OnSubscribe(); + tracker.SubscriberCount.ShouldBe(2); + + tracker.OnSubscribe(); + tracker.SubscriberCount.ShouldBe(3); + + tracker.OnUnsubscribe(); + tracker.SubscriberCount.ShouldBe(2); + } + + // ------------------------------------------------------------------------- + // Test 5 — OnUnsubscribe floors subscriber count at zero (no negatives) + // + // Go reference: consumer.go deleteNotActive — stray unsub events must not + // drive the count below zero and corrupt subsequent interest checks. + // ------------------------------------------------------------------------- + [Fact] + public void OnUnsubscribe_floors_at_zero() + { + var tracker = new DeliveryInterestTracker(); + + tracker.OnUnsubscribe(); + tracker.OnUnsubscribe(); + + tracker.SubscriberCount.ShouldBe(0); + tracker.HasInterest.ShouldBeFalse(); + } + + // ------------------------------------------------------------------------- + // Test 6 — ShouldDelete is false while interest exists + // + // Go reference: consumer.go deleteNotActive — ephemeral cleanup is only + // triggered when there are no active subscribers. + // ------------------------------------------------------------------------- + [Fact] + public void ShouldDelete_false_when_has_interest() + { + var tracker = new DeliveryInterestTracker(inactiveTimeout: TimeSpan.FromMilliseconds(1)); + + tracker.OnSubscribe(); + + tracker.ShouldDelete.ShouldBeFalse(); + } + + // ------------------------------------------------------------------------- + // Test 7 — ShouldDelete is false immediately after unsubscribe (timeout + // has not yet elapsed) + // + // Go reference: consumer.go deleteNotActive — the inactive timeout must + // fully elapse before the consumer is eligible for deletion. + // ------------------------------------------------------------------------- + [Fact] + public void ShouldDelete_false_immediately_after_unsubscribe() + { + var tracker = new DeliveryInterestTracker(inactiveTimeout: TimeSpan.FromSeconds(30)); + + tracker.OnSubscribe(); + tracker.OnUnsubscribe(); + + // No wait — timeout has not elapsed yet. + tracker.ShouldDelete.ShouldBeFalse(); + } + + // ------------------------------------------------------------------------- + // Test 8 — ShouldDelete is true after the inactive timeout elapses with + // zero subscribers + // + // Go reference: consumer.go deleteNotActive — once the configurable + // MaxAckPending / inactive threshold passes, the ephemeral consumer is + // scheduled for removal. + // ------------------------------------------------------------------------- + [SlopwatchSuppress("SW004", "Intentional timeout test: ShouldDelete requires real wall-clock elapsed time to observe the inactive threshold firing; no synchronisation primitive can replace this")] + [Fact] + public async Task ShouldDelete_true_after_timeout() + { + var tracker = new DeliveryInterestTracker(inactiveTimeout: TimeSpan.FromMilliseconds(50)); + + tracker.OnSubscribe(); + tracker.OnUnsubscribe(); + + await Task.Delay(100); // Wait for timeout to elapse. + + tracker.ShouldDelete.ShouldBeTrue(); + } + + // ------------------------------------------------------------------------- + // Test 9 — Reset clears all state (count and inactivity timer) + // + // Go reference: consumer.go — Reset is used to reinitialise tracking when + // a consumer is re-attached or recreated. + // ------------------------------------------------------------------------- + [SlopwatchSuppress("SW004", "Intentional timeout test: must let the inactive threshold elapse to confirm Reset clears the inactivity timer; no synchronisation primitive can replace this")] + [Fact] + public async Task Reset_clears_all_state() + { + var tracker = new DeliveryInterestTracker(inactiveTimeout: TimeSpan.FromMilliseconds(50)); + + tracker.OnSubscribe(); + tracker.OnSubscribe(); + tracker.OnUnsubscribe(); + tracker.OnUnsubscribe(); + await Task.Delay(100); // Let timeout elapse. + + tracker.Reset(); + + tracker.SubscriberCount.ShouldBe(0); + tracker.HasInterest.ShouldBeFalse(); + tracker.ShouldDelete.ShouldBeFalse(); // inactivity timer also cleared + } + + // ------------------------------------------------------------------------- + // Test 10 — Subscribing after an unsubscribe clears the inactivity timer + // so ShouldDelete stays false even after the original timeout + // + // Go reference: consumer.go hasDeliveryInterest — a re-subscription resets + // the inactive-since timestamp, preventing spurious cleanup. + // ------------------------------------------------------------------------- + [SlopwatchSuppress("SW004", "Intentional timeout test: must let the original inactive window pass after re-subscribe to confirm the inactivity timer was cleared; no synchronisation primitive can replace this")] + [Fact] + public async Task Subscribe_clears_inactivity_timer() + { + var tracker = new DeliveryInterestTracker(inactiveTimeout: TimeSpan.FromMilliseconds(50)); + + tracker.OnSubscribe(); + tracker.OnUnsubscribe(); + + // Re-subscribe before the timeout elapses. + tracker.OnSubscribe(); + + await Task.Delay(100); // Original timeout window passes. + + // Still has interest and timer was reset, so ShouldDelete must be false. + tracker.HasInterest.ShouldBeTrue(); + tracker.ShouldDelete.ShouldBeFalse(); + } +}