From dcc3e4460ecdf7b1d5e83e1ec62a6f24baf8836e Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 25 Feb 2026 02:21:08 -0500 Subject: [PATCH] feat(consumer): add pause/resume with auto-resume timer Adds PauseUntilUtc to ConsumerHandle, a new Pause(DateTime) overload, Resume, IsPaused, and GetPauseUntil to ConsumerManager. A System.Threading.Timer fires when the deadline passes and calls AutoResume, raising OnAutoResumed so tests can synchronise via SemaphoreSlim instead of Task.Delay. ConsumerManager now implements IDisposable to clean up outstanding timers. Timer is also cancelled on explicit Resume and Delete. Go reference: consumer.go (pauseConsumer / resumeConsumer / isPaused). --- src/NATS.Server/JetStream/ConsumerManager.cs | 127 +++++++++++++++++- .../Consumers/ConsumerPauseResumeTests.cs | 103 ++++++++++++++ 2 files changed, 229 insertions(+), 1 deletion(-) create mode 100644 tests/NATS.Server.Tests/JetStream/Consumers/ConsumerPauseResumeTests.cs diff --git a/src/NATS.Server/JetStream/ConsumerManager.cs b/src/NATS.Server/JetStream/ConsumerManager.cs index 4caa8b7..a0fd71b 100644 --- a/src/NATS.Server/JetStream/ConsumerManager.cs +++ b/src/NATS.Server/JetStream/ConsumerManager.cs @@ -8,14 +8,21 @@ using NATS.Server.Subscriptions; namespace NATS.Server.JetStream; -public sealed class ConsumerManager +public sealed class ConsumerManager : IDisposable { private readonly JetStreamMetaGroup? _metaGroup; private readonly ConcurrentDictionary<(string Stream, string Name), ConsumerHandle> _consumers = new(); private readonly ConcurrentDictionary _ackFloors = new(StringComparer.Ordinal); + private readonly ConcurrentDictionary<(string Stream, string Name), Timer> _resumeTimers = new(); private readonly PullConsumerEngine _pullConsumerEngine = new(); private readonly PushConsumerEngine _pushConsumerEngine = new(); + /// + /// Raised when a consumer is automatically resumed by the deadline timer. + /// Arguments are (stream, durableName). + /// + public event EventHandler<(string Stream, string Name)>? OnAutoResumed; + public ConsumerManager(JetStreamMetaGroup? metaGroup = null) { _metaGroup = metaGroup; @@ -77,6 +84,7 @@ public sealed class ConsumerManager public bool Delete(string stream, string durableName) { + CancelResumeTimer((stream, durableName)); return _consumers.TryRemove((stream, durableName), out _); } @@ -93,9 +101,120 @@ public sealed class ConsumerManager return false; handle.Paused = paused; + if (!paused) + { + handle.PauseUntilUtc = null; + CancelResumeTimer((stream, durableName)); + } return true; } + /// + /// Pause a consumer until . + /// A background timer will auto-resume the consumer when the deadline passes. + /// Go reference: consumer.go (pauseConsumer). + /// + public bool Pause(string stream, string durableName, DateTime pauseUntilUtc) + { + if (!_consumers.TryGetValue((stream, durableName), out var handle)) + return false; + + handle.Paused = true; + handle.PauseUntilUtc = pauseUntilUtc; + + // Cancel any existing timer for this consumer before scheduling a new one. + CancelResumeTimer((stream, durableName)); + + var delay = pauseUntilUtc - DateTime.UtcNow; + if (delay <= TimeSpan.Zero) + { + // Deadline already passed — resume immediately. + AutoResume(stream, durableName); + } + else + { + var key = (stream, durableName); + var timer = new Timer(_ => AutoResume(key.stream, key.durableName), + state: null, dueTime: delay, period: Timeout.InfiniteTimeSpan); + _resumeTimers[key] = timer; + } + + return true; + } + + /// + /// Explicitly resume a paused consumer, cancelling any pending auto-resume timer. + /// Go reference: consumer.go (resumeConsumer). + /// + public bool Resume(string stream, string durableName) + { + if (!_consumers.TryGetValue((stream, durableName), out var handle)) + return false; + + handle.Paused = false; + handle.PauseUntilUtc = null; + CancelResumeTimer((stream, durableName)); + return true; + } + + /// + /// Returns true when the consumer is paused and the deadline (if set) has not yet passed. + /// If the deadline has passed, auto-resumes the consumer and returns false. + /// Go reference: consumer.go (isPaused). + /// + public bool IsPaused(string stream, string durableName) + { + if (!_consumers.TryGetValue((stream, durableName), out var handle)) + return false; + + if (!handle.Paused) + return false; + + if (handle.PauseUntilUtc.HasValue && handle.PauseUntilUtc.Value <= DateTime.UtcNow) + { + AutoResume(stream, durableName); + return false; + } + + return true; + } + + /// + /// Returns the UTC deadline until which the consumer is paused, or null. + /// Go reference: consumer.go (pauseUntil). + /// + public DateTime? GetPauseUntil(string stream, string durableName) + { + if (!_consumers.TryGetValue((stream, durableName), out var handle)) + return null; + + return handle.PauseUntilUtc; + } + + private void AutoResume(string stream, string durableName) + { + if (!_consumers.TryGetValue((stream, durableName), out var handle)) + return; + + handle.Paused = false; + handle.PauseUntilUtc = null; + CancelResumeTimer((stream, durableName)); + OnAutoResumed?.Invoke(this, (stream, durableName)); + } + + private void CancelResumeTimer((string Stream, string Name) key) + { + if (_resumeTimers.TryRemove(key, out var timer)) + timer.Dispose(); + } + + public void Dispose() + { + foreach (var timer in _resumeTimers.Values) + timer.Dispose(); + _resumeTimers.Clear(); + } + public bool Reset(string stream, string durableName) { if (!_consumers.TryGetValue((stream, durableName), out var handle)) @@ -191,6 +310,12 @@ public sealed record ConsumerHandle(string Stream, ConsumerConfig Config) { public ulong NextSequence { get; set; } = 1; public bool Paused { get; set; } + + /// + /// UTC deadline until which this consumer is paused. Null means pause indefinitely + /// (until explicitly resumed). Go reference: consumer.go pauseUntil field. + /// + public DateTime? PauseUntilUtc { get; set; } public Queue Pending { get; } = new(); public Queue PushFrames { get; } = new(); public AckProcessor AckProcessor { get; } = new(); diff --git a/tests/NATS.Server.Tests/JetStream/Consumers/ConsumerPauseResumeTests.cs b/tests/NATS.Server.Tests/JetStream/Consumers/ConsumerPauseResumeTests.cs new file mode 100644 index 0000000..6cf0c4e --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Consumers/ConsumerPauseResumeTests.cs @@ -0,0 +1,103 @@ +using NATS.Server.JetStream; +using NATS.Server.JetStream.Models; + +namespace NATS.Server.Tests.JetStream.Consumers; + +/// +/// Tests for consumer pause/resume with auto-resume timer. +/// Go reference: consumer.go (pause/resume). +/// +public class ConsumerPauseResumeTests +{ + private static ConsumerManager CreateManager() => new(); + + private static void CreateConsumer(ConsumerManager mgr, string stream, string name) + { + mgr.CreateOrUpdate(stream, new ConsumerConfig { DurableName = name }); + } + + [Fact] + public void Pause_with_deadline_sets_paused() + { + var mgr = CreateManager(); + CreateConsumer(mgr, "test-stream", "test-consumer"); + + var until = DateTime.UtcNow.AddSeconds(5); + mgr.Pause("test-stream", "test-consumer", until); + + mgr.IsPaused("test-stream", "test-consumer").ShouldBeTrue(); + mgr.GetPauseUntil("test-stream", "test-consumer").ShouldBe(until); + } + + [Fact] + public void Resume_clears_pause() + { + var mgr = CreateManager(); + CreateConsumer(mgr, "test-stream", "test-consumer"); + + mgr.Pause("test-stream", "test-consumer", DateTime.UtcNow.AddSeconds(5)); + mgr.Resume("test-stream", "test-consumer"); + + mgr.IsPaused("test-stream", "test-consumer").ShouldBeFalse(); + mgr.GetPauseUntil("test-stream", "test-consumer").ShouldBeNull(); + } + + [Fact] + public async Task Pause_auto_resumes_after_deadline() + { + var mgr = CreateManager(); + CreateConsumer(mgr, "test-stream", "test-consumer"); + + // Use a semaphore to synchronize on the actual timer callback rather than a blind delay. + using var resumed = new SemaphoreSlim(0, 1); + mgr.OnAutoResumed += (_, _) => resumed.Release(); + + mgr.Pause("test-stream", "test-consumer", DateTime.UtcNow.AddMilliseconds(100)); + + var signalled = await resumed.WaitAsync(TimeSpan.FromSeconds(5)); + signalled.ShouldBeTrue("auto-resume timer did not fire within 5 seconds"); + + mgr.IsPaused("test-stream", "test-consumer").ShouldBeFalse(); + } + + [Fact] + public void IsPaused_returns_false_for_unknown_consumer() + { + var mgr = CreateManager(); + mgr.IsPaused("unknown", "unknown").ShouldBeFalse(); + } + + [Fact] + public void GetPauseUntil_returns_null_for_unknown_consumer() + { + var mgr = CreateManager(); + mgr.GetPauseUntil("unknown", "unknown").ShouldBeNull(); + } + + [Fact] + public void Resume_returns_false_for_unknown_consumer() + { + var mgr = CreateManager(); + mgr.Resume("unknown", "unknown").ShouldBeFalse(); + } + + [Fact] + public void Pause_returns_false_for_unknown_consumer() + { + var mgr = CreateManager(); + mgr.Pause("unknown", "unknown", DateTime.UtcNow.AddSeconds(5)).ShouldBeFalse(); + } + + [Fact] + public void IsPaused_auto_resumes_expired_deadline() + { + var mgr = CreateManager(); + CreateConsumer(mgr, "test-stream", "c1"); + + // Pause with a deadline in the past + mgr.Pause("test-stream", "c1", DateTime.UtcNow.AddMilliseconds(-100)); + + // IsPaused should detect the expired deadline and auto-resume + mgr.IsPaused("test-stream", "c1").ShouldBeFalse(); + } +}