diff --git a/src/NATS.Server/JetStream/ConsumerManager.cs b/src/NATS.Server/JetStream/ConsumerManager.cs index 4caa8b7..a0fd71b 100644 --- a/src/NATS.Server/JetStream/ConsumerManager.cs +++ b/src/NATS.Server/JetStream/ConsumerManager.cs @@ -8,14 +8,21 @@ using NATS.Server.Subscriptions; namespace NATS.Server.JetStream; -public sealed class ConsumerManager +public sealed class ConsumerManager : IDisposable { private readonly JetStreamMetaGroup? _metaGroup; private readonly ConcurrentDictionary<(string Stream, string Name), ConsumerHandle> _consumers = new(); private readonly ConcurrentDictionary _ackFloors = new(StringComparer.Ordinal); + private readonly ConcurrentDictionary<(string Stream, string Name), Timer> _resumeTimers = new(); private readonly PullConsumerEngine _pullConsumerEngine = new(); private readonly PushConsumerEngine _pushConsumerEngine = new(); + /// + /// Raised when a consumer is automatically resumed by the deadline timer. + /// Arguments are (stream, durableName). + /// + public event EventHandler<(string Stream, string Name)>? OnAutoResumed; + public ConsumerManager(JetStreamMetaGroup? metaGroup = null) { _metaGroup = metaGroup; @@ -77,6 +84,7 @@ public sealed class ConsumerManager public bool Delete(string stream, string durableName) { + CancelResumeTimer((stream, durableName)); return _consumers.TryRemove((stream, durableName), out _); } @@ -93,9 +101,120 @@ public sealed class ConsumerManager return false; handle.Paused = paused; + if (!paused) + { + handle.PauseUntilUtc = null; + CancelResumeTimer((stream, durableName)); + } return true; } + /// + /// Pause a consumer until . + /// A background timer will auto-resume the consumer when the deadline passes. + /// Go reference: consumer.go (pauseConsumer). + /// + public bool Pause(string stream, string durableName, DateTime pauseUntilUtc) + { + if (!_consumers.TryGetValue((stream, durableName), out var handle)) + return false; + + handle.Paused = true; + handle.PauseUntilUtc = pauseUntilUtc; + + // Cancel any existing timer for this consumer before scheduling a new one. + CancelResumeTimer((stream, durableName)); + + var delay = pauseUntilUtc - DateTime.UtcNow; + if (delay <= TimeSpan.Zero) + { + // Deadline already passed — resume immediately. + AutoResume(stream, durableName); + } + else + { + var key = (stream, durableName); + var timer = new Timer(_ => AutoResume(key.stream, key.durableName), + state: null, dueTime: delay, period: Timeout.InfiniteTimeSpan); + _resumeTimers[key] = timer; + } + + return true; + } + + /// + /// Explicitly resume a paused consumer, cancelling any pending auto-resume timer. + /// Go reference: consumer.go (resumeConsumer). + /// + public bool Resume(string stream, string durableName) + { + if (!_consumers.TryGetValue((stream, durableName), out var handle)) + return false; + + handle.Paused = false; + handle.PauseUntilUtc = null; + CancelResumeTimer((stream, durableName)); + return true; + } + + /// + /// Returns true when the consumer is paused and the deadline (if set) has not yet passed. + /// If the deadline has passed, auto-resumes the consumer and returns false. + /// Go reference: consumer.go (isPaused). + /// + public bool IsPaused(string stream, string durableName) + { + if (!_consumers.TryGetValue((stream, durableName), out var handle)) + return false; + + if (!handle.Paused) + return false; + + if (handle.PauseUntilUtc.HasValue && handle.PauseUntilUtc.Value <= DateTime.UtcNow) + { + AutoResume(stream, durableName); + return false; + } + + return true; + } + + /// + /// Returns the UTC deadline until which the consumer is paused, or null. + /// Go reference: consumer.go (pauseUntil). + /// + public DateTime? GetPauseUntil(string stream, string durableName) + { + if (!_consumers.TryGetValue((stream, durableName), out var handle)) + return null; + + return handle.PauseUntilUtc; + } + + private void AutoResume(string stream, string durableName) + { + if (!_consumers.TryGetValue((stream, durableName), out var handle)) + return; + + handle.Paused = false; + handle.PauseUntilUtc = null; + CancelResumeTimer((stream, durableName)); + OnAutoResumed?.Invoke(this, (stream, durableName)); + } + + private void CancelResumeTimer((string Stream, string Name) key) + { + if (_resumeTimers.TryRemove(key, out var timer)) + timer.Dispose(); + } + + public void Dispose() + { + foreach (var timer in _resumeTimers.Values) + timer.Dispose(); + _resumeTimers.Clear(); + } + public bool Reset(string stream, string durableName) { if (!_consumers.TryGetValue((stream, durableName), out var handle)) @@ -191,6 +310,12 @@ public sealed record ConsumerHandle(string Stream, ConsumerConfig Config) { public ulong NextSequence { get; set; } = 1; public bool Paused { get; set; } + + /// + /// UTC deadline until which this consumer is paused. Null means pause indefinitely + /// (until explicitly resumed). Go reference: consumer.go pauseUntil field. + /// + public DateTime? PauseUntilUtc { get; set; } public Queue Pending { get; } = new(); public Queue PushFrames { get; } = new(); public AckProcessor AckProcessor { get; } = new(); diff --git a/tests/NATS.Server.Tests/JetStream/Consumers/ConsumerPauseResumeTests.cs b/tests/NATS.Server.Tests/JetStream/Consumers/ConsumerPauseResumeTests.cs new file mode 100644 index 0000000..6cf0c4e --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Consumers/ConsumerPauseResumeTests.cs @@ -0,0 +1,103 @@ +using NATS.Server.JetStream; +using NATS.Server.JetStream.Models; + +namespace NATS.Server.Tests.JetStream.Consumers; + +/// +/// Tests for consumer pause/resume with auto-resume timer. +/// Go reference: consumer.go (pause/resume). +/// +public class ConsumerPauseResumeTests +{ + private static ConsumerManager CreateManager() => new(); + + private static void CreateConsumer(ConsumerManager mgr, string stream, string name) + { + mgr.CreateOrUpdate(stream, new ConsumerConfig { DurableName = name }); + } + + [Fact] + public void Pause_with_deadline_sets_paused() + { + var mgr = CreateManager(); + CreateConsumer(mgr, "test-stream", "test-consumer"); + + var until = DateTime.UtcNow.AddSeconds(5); + mgr.Pause("test-stream", "test-consumer", until); + + mgr.IsPaused("test-stream", "test-consumer").ShouldBeTrue(); + mgr.GetPauseUntil("test-stream", "test-consumer").ShouldBe(until); + } + + [Fact] + public void Resume_clears_pause() + { + var mgr = CreateManager(); + CreateConsumer(mgr, "test-stream", "test-consumer"); + + mgr.Pause("test-stream", "test-consumer", DateTime.UtcNow.AddSeconds(5)); + mgr.Resume("test-stream", "test-consumer"); + + mgr.IsPaused("test-stream", "test-consumer").ShouldBeFalse(); + mgr.GetPauseUntil("test-stream", "test-consumer").ShouldBeNull(); + } + + [Fact] + public async Task Pause_auto_resumes_after_deadline() + { + var mgr = CreateManager(); + CreateConsumer(mgr, "test-stream", "test-consumer"); + + // Use a semaphore to synchronize on the actual timer callback rather than a blind delay. + using var resumed = new SemaphoreSlim(0, 1); + mgr.OnAutoResumed += (_, _) => resumed.Release(); + + mgr.Pause("test-stream", "test-consumer", DateTime.UtcNow.AddMilliseconds(100)); + + var signalled = await resumed.WaitAsync(TimeSpan.FromSeconds(5)); + signalled.ShouldBeTrue("auto-resume timer did not fire within 5 seconds"); + + mgr.IsPaused("test-stream", "test-consumer").ShouldBeFalse(); + } + + [Fact] + public void IsPaused_returns_false_for_unknown_consumer() + { + var mgr = CreateManager(); + mgr.IsPaused("unknown", "unknown").ShouldBeFalse(); + } + + [Fact] + public void GetPauseUntil_returns_null_for_unknown_consumer() + { + var mgr = CreateManager(); + mgr.GetPauseUntil("unknown", "unknown").ShouldBeNull(); + } + + [Fact] + public void Resume_returns_false_for_unknown_consumer() + { + var mgr = CreateManager(); + mgr.Resume("unknown", "unknown").ShouldBeFalse(); + } + + [Fact] + public void Pause_returns_false_for_unknown_consumer() + { + var mgr = CreateManager(); + mgr.Pause("unknown", "unknown", DateTime.UtcNow.AddSeconds(5)).ShouldBeFalse(); + } + + [Fact] + public void IsPaused_auto_resumes_expired_deadline() + { + var mgr = CreateManager(); + CreateConsumer(mgr, "test-stream", "c1"); + + // Pause with a deadline in the past + mgr.Pause("test-stream", "c1", DateTime.UtcNow.AddMilliseconds(-100)); + + // IsPaused should detect the expired deadline and auto-resume + mgr.IsPaused("test-stream", "c1").ShouldBeFalse(); + } +}