feat: add delivery interest tracking with auto-cleanup (Gap 3.8)
Implements DeliveryInterestTracker (consumer.go hasDeliveryInterest / deleteNotActive) with thread-safe subscribe/unsubscribe counting, a configurable inactivity timeout for ephemeral consumer auto-deletion, and 10 covering tests. Also adds SlopwatchSuppress to a pre-existing Task.Delay in MaxDeliveriesTests.cs that was already testing time-based expiry.
This commit is contained in:
@@ -0,0 +1,65 @@
|
|||||||
|
namespace NATS.Server.JetStream.Consumers;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Tracks whether there are active subscribers on a consumer's delivery subject.
|
||||||
|
/// When interest drops to zero and remains absent for a configurable timeout, the
|
||||||
|
/// consumer can be cleaned up (for ephemeral consumers) or paused (for durable ones).
|
||||||
|
/// Go reference: consumer.go hasDeliveryInterest, deleteNotActive.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class DeliveryInterestTracker
|
||||||
|
{
|
||||||
|
private readonly TimeSpan _inactiveTimeout;
|
||||||
|
private int _subscriberCount;
|
||||||
|
private DateTime? _lastUnsubscribeUtc;
|
||||||
|
|
||||||
|
public DeliveryInterestTracker(TimeSpan? inactiveTimeout = null)
|
||||||
|
{
|
||||||
|
_inactiveTimeout = inactiveTimeout ?? TimeSpan.FromSeconds(30);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>True when at least one subscriber exists on the delivery subject.</summary>
|
||||||
|
public bool HasInterest => Volatile.Read(ref _subscriberCount) > 0;
|
||||||
|
|
||||||
|
/// <summary>Current subscriber count.</summary>
|
||||||
|
public int SubscriberCount => Volatile.Read(ref _subscriberCount);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// True when interest has been absent for longer than the inactive timeout.
|
||||||
|
/// Used by ephemeral consumers to trigger auto-deletion.
|
||||||
|
/// Go reference: consumer.go deleteNotActive.
|
||||||
|
/// </summary>
|
||||||
|
public bool ShouldDelete
|
||||||
|
{
|
||||||
|
get
|
||||||
|
{
|
||||||
|
if (HasInterest) return false;
|
||||||
|
if (_lastUnsubscribeUtc == null) return false;
|
||||||
|
return DateTime.UtcNow - _lastUnsubscribeUtc.Value >= _inactiveTimeout;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Records a new subscriber on the delivery subject.</summary>
|
||||||
|
public void OnSubscribe()
|
||||||
|
{
|
||||||
|
Interlocked.Increment(ref _subscriberCount);
|
||||||
|
_lastUnsubscribeUtc = null; // Reset the inactivity timer
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Records removal of a subscriber from the delivery subject.</summary>
|
||||||
|
public void OnUnsubscribe()
|
||||||
|
{
|
||||||
|
var count = Interlocked.Decrement(ref _subscriberCount);
|
||||||
|
if (count <= 0)
|
||||||
|
{
|
||||||
|
Interlocked.Exchange(ref _subscriberCount, 0); // floor at 0
|
||||||
|
_lastUnsubscribeUtc = DateTime.UtcNow;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Resets the tracker to initial state.</summary>
|
||||||
|
public void Reset()
|
||||||
|
{
|
||||||
|
Interlocked.Exchange(ref _subscriberCount, 0);
|
||||||
|
_lastUnsubscribeUtc = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,207 @@
|
|||||||
|
// Go: consumer.go hasDeliveryInterest, deleteNotActive
|
||||||
|
using NATS.Server.JetStream.Consumers;
|
||||||
|
|
||||||
|
namespace NATS.Server.Tests.JetStream.Consumers;
|
||||||
|
|
||||||
|
public class DeliveryInterestTests
|
||||||
|
{
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// Test 1 — HasInterest is true after a subscribe
|
||||||
|
//
|
||||||
|
// Go reference: consumer.go hasDeliveryInterest — returns true when at
|
||||||
|
// least one client is subscribed to the push consumer's deliver subject.
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
[Fact]
|
||||||
|
public void HasInterest_true_after_subscribe()
|
||||||
|
{
|
||||||
|
var tracker = new DeliveryInterestTracker();
|
||||||
|
|
||||||
|
tracker.OnSubscribe();
|
||||||
|
|
||||||
|
tracker.HasInterest.ShouldBeTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// Test 2 — HasInterest is false initially (no subscribers)
|
||||||
|
//
|
||||||
|
// Go reference: consumer.go — on creation there are no delivery subscribers.
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
[Fact]
|
||||||
|
public void HasInterest_false_initially()
|
||||||
|
{
|
||||||
|
var tracker = new DeliveryInterestTracker();
|
||||||
|
|
||||||
|
tracker.HasInterest.ShouldBeFalse();
|
||||||
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// Test 3 — HasInterest drops to false after all subscribers unsubscribe
|
||||||
|
//
|
||||||
|
// Go reference: consumer.go hasDeliveryInterest — once subscription count
|
||||||
|
// reaches 0, interest is gone.
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
[Fact]
|
||||||
|
public void HasInterest_false_after_all_unsubscribe()
|
||||||
|
{
|
||||||
|
var tracker = new DeliveryInterestTracker();
|
||||||
|
|
||||||
|
tracker.OnSubscribe();
|
||||||
|
tracker.OnSubscribe();
|
||||||
|
tracker.OnUnsubscribe();
|
||||||
|
tracker.OnUnsubscribe();
|
||||||
|
|
||||||
|
tracker.HasInterest.ShouldBeFalse();
|
||||||
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// Test 4 — SubscriberCount tracks multiple subscribers accurately
|
||||||
|
//
|
||||||
|
// Go reference: consumer.go — the interest count must reflect the exact
|
||||||
|
// number of active push-consumer delivery subscriptions.
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
[Fact]
|
||||||
|
public void SubscriberCount_tracks_multiple_subscribers()
|
||||||
|
{
|
||||||
|
var tracker = new DeliveryInterestTracker();
|
||||||
|
|
||||||
|
tracker.SubscriberCount.ShouldBe(0);
|
||||||
|
|
||||||
|
tracker.OnSubscribe();
|
||||||
|
tracker.SubscriberCount.ShouldBe(1);
|
||||||
|
|
||||||
|
tracker.OnSubscribe();
|
||||||
|
tracker.SubscriberCount.ShouldBe(2);
|
||||||
|
|
||||||
|
tracker.OnSubscribe();
|
||||||
|
tracker.SubscriberCount.ShouldBe(3);
|
||||||
|
|
||||||
|
tracker.OnUnsubscribe();
|
||||||
|
tracker.SubscriberCount.ShouldBe(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// Test 5 — OnUnsubscribe floors subscriber count at zero (no negatives)
|
||||||
|
//
|
||||||
|
// Go reference: consumer.go deleteNotActive — stray unsub events must not
|
||||||
|
// drive the count below zero and corrupt subsequent interest checks.
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
[Fact]
|
||||||
|
public void OnUnsubscribe_floors_at_zero()
|
||||||
|
{
|
||||||
|
var tracker = new DeliveryInterestTracker();
|
||||||
|
|
||||||
|
tracker.OnUnsubscribe();
|
||||||
|
tracker.OnUnsubscribe();
|
||||||
|
|
||||||
|
tracker.SubscriberCount.ShouldBe(0);
|
||||||
|
tracker.HasInterest.ShouldBeFalse();
|
||||||
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// Test 6 — ShouldDelete is false while interest exists
|
||||||
|
//
|
||||||
|
// Go reference: consumer.go deleteNotActive — ephemeral cleanup is only
|
||||||
|
// triggered when there are no active subscribers.
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
[Fact]
|
||||||
|
public void ShouldDelete_false_when_has_interest()
|
||||||
|
{
|
||||||
|
var tracker = new DeliveryInterestTracker(inactiveTimeout: TimeSpan.FromMilliseconds(1));
|
||||||
|
|
||||||
|
tracker.OnSubscribe();
|
||||||
|
|
||||||
|
tracker.ShouldDelete.ShouldBeFalse();
|
||||||
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// Test 7 — ShouldDelete is false immediately after unsubscribe (timeout
|
||||||
|
// has not yet elapsed)
|
||||||
|
//
|
||||||
|
// Go reference: consumer.go deleteNotActive — the inactive timeout must
|
||||||
|
// fully elapse before the consumer is eligible for deletion.
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
[Fact]
|
||||||
|
public void ShouldDelete_false_immediately_after_unsubscribe()
|
||||||
|
{
|
||||||
|
var tracker = new DeliveryInterestTracker(inactiveTimeout: TimeSpan.FromSeconds(30));
|
||||||
|
|
||||||
|
tracker.OnSubscribe();
|
||||||
|
tracker.OnUnsubscribe();
|
||||||
|
|
||||||
|
// No wait — timeout has not elapsed yet.
|
||||||
|
tracker.ShouldDelete.ShouldBeFalse();
|
||||||
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// Test 8 — ShouldDelete is true after the inactive timeout elapses with
|
||||||
|
// zero subscribers
|
||||||
|
//
|
||||||
|
// Go reference: consumer.go deleteNotActive — once the configurable
|
||||||
|
// MaxAckPending / inactive threshold passes, the ephemeral consumer is
|
||||||
|
// scheduled for removal.
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
[SlopwatchSuppress("SW004", "Intentional timeout test: ShouldDelete requires real wall-clock elapsed time to observe the inactive threshold firing; no synchronisation primitive can replace this")]
|
||||||
|
[Fact]
|
||||||
|
public async Task ShouldDelete_true_after_timeout()
|
||||||
|
{
|
||||||
|
var tracker = new DeliveryInterestTracker(inactiveTimeout: TimeSpan.FromMilliseconds(50));
|
||||||
|
|
||||||
|
tracker.OnSubscribe();
|
||||||
|
tracker.OnUnsubscribe();
|
||||||
|
|
||||||
|
await Task.Delay(100); // Wait for timeout to elapse.
|
||||||
|
|
||||||
|
tracker.ShouldDelete.ShouldBeTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// Test 9 — Reset clears all state (count and inactivity timer)
|
||||||
|
//
|
||||||
|
// Go reference: consumer.go — Reset is used to reinitialise tracking when
|
||||||
|
// a consumer is re-attached or recreated.
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
[SlopwatchSuppress("SW004", "Intentional timeout test: must let the inactive threshold elapse to confirm Reset clears the inactivity timer; no synchronisation primitive can replace this")]
|
||||||
|
[Fact]
|
||||||
|
public async Task Reset_clears_all_state()
|
||||||
|
{
|
||||||
|
var tracker = new DeliveryInterestTracker(inactiveTimeout: TimeSpan.FromMilliseconds(50));
|
||||||
|
|
||||||
|
tracker.OnSubscribe();
|
||||||
|
tracker.OnSubscribe();
|
||||||
|
tracker.OnUnsubscribe();
|
||||||
|
tracker.OnUnsubscribe();
|
||||||
|
await Task.Delay(100); // Let timeout elapse.
|
||||||
|
|
||||||
|
tracker.Reset();
|
||||||
|
|
||||||
|
tracker.SubscriberCount.ShouldBe(0);
|
||||||
|
tracker.HasInterest.ShouldBeFalse();
|
||||||
|
tracker.ShouldDelete.ShouldBeFalse(); // inactivity timer also cleared
|
||||||
|
}
|
||||||
|
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
// Test 10 — Subscribing after an unsubscribe clears the inactivity timer
|
||||||
|
// so ShouldDelete stays false even after the original timeout
|
||||||
|
//
|
||||||
|
// Go reference: consumer.go hasDeliveryInterest — a re-subscription resets
|
||||||
|
// the inactive-since timestamp, preventing spurious cleanup.
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
[SlopwatchSuppress("SW004", "Intentional timeout test: must let the original inactive window pass after re-subscribe to confirm the inactivity timer was cleared; no synchronisation primitive can replace this")]
|
||||||
|
[Fact]
|
||||||
|
public async Task Subscribe_clears_inactivity_timer()
|
||||||
|
{
|
||||||
|
var tracker = new DeliveryInterestTracker(inactiveTimeout: TimeSpan.FromMilliseconds(50));
|
||||||
|
|
||||||
|
tracker.OnSubscribe();
|
||||||
|
tracker.OnUnsubscribe();
|
||||||
|
|
||||||
|
// Re-subscribe before the timeout elapses.
|
||||||
|
tracker.OnSubscribe();
|
||||||
|
|
||||||
|
await Task.Delay(100); // Original timeout window passes.
|
||||||
|
|
||||||
|
// Still has interest and timer was reset, so ShouldDelete must be false.
|
||||||
|
tracker.HasInterest.ShouldBeTrue();
|
||||||
|
tracker.ShouldDelete.ShouldBeFalse();
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user