feat(consumer): add pause/resume with auto-resume timer
Adds PauseUntilUtc to ConsumerHandle, a new Pause(DateTime) overload, Resume, IsPaused, and GetPauseUntil to ConsumerManager. A System.Threading.Timer fires when the deadline passes and calls AutoResume, raising OnAutoResumed so tests can synchronise via SemaphoreSlim instead of Task.Delay. ConsumerManager now implements IDisposable to clean up outstanding timers. Timer is also cancelled on explicit Resume and Delete. Go reference: consumer.go (pauseConsumer / resumeConsumer / isPaused).
This commit is contained in:
@@ -8,14 +8,21 @@ using NATS.Server.Subscriptions;
|
||||
|
||||
namespace NATS.Server.JetStream;
|
||||
|
||||
public sealed class ConsumerManager
|
||||
public sealed class ConsumerManager : IDisposable
|
||||
{
|
||||
private readonly JetStreamMetaGroup? _metaGroup;
|
||||
private readonly ConcurrentDictionary<(string Stream, string Name), ConsumerHandle> _consumers = new();
|
||||
private readonly ConcurrentDictionary<string, ulong> _ackFloors = new(StringComparer.Ordinal);
|
||||
private readonly ConcurrentDictionary<(string Stream, string Name), Timer> _resumeTimers = new();
|
||||
private readonly PullConsumerEngine _pullConsumerEngine = new();
|
||||
private readonly PushConsumerEngine _pushConsumerEngine = new();
|
||||
|
||||
/// <summary>
|
||||
/// Raised when a consumer is automatically resumed by the deadline timer.
|
||||
/// Arguments are (stream, durableName).
|
||||
/// </summary>
|
||||
public event EventHandler<(string Stream, string Name)>? OnAutoResumed;
|
||||
|
||||
public ConsumerManager(JetStreamMetaGroup? metaGroup = null)
|
||||
{
|
||||
_metaGroup = metaGroup;
|
||||
@@ -77,6 +84,7 @@ public sealed class ConsumerManager
|
||||
|
||||
public bool Delete(string stream, string durableName)
|
||||
{
|
||||
CancelResumeTimer((stream, durableName));
|
||||
return _consumers.TryRemove((stream, durableName), out _);
|
||||
}
|
||||
|
||||
@@ -93,9 +101,120 @@ public sealed class ConsumerManager
|
||||
return false;
|
||||
|
||||
handle.Paused = paused;
|
||||
if (!paused)
|
||||
{
|
||||
handle.PauseUntilUtc = null;
|
||||
CancelResumeTimer((stream, durableName));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Pause a consumer until <paramref name="pauseUntilUtc"/>.
|
||||
/// A background timer will auto-resume the consumer when the deadline passes.
|
||||
/// Go reference: consumer.go (pauseConsumer).
|
||||
/// </summary>
|
||||
public bool Pause(string stream, string durableName, DateTime pauseUntilUtc)
|
||||
{
|
||||
if (!_consumers.TryGetValue((stream, durableName), out var handle))
|
||||
return false;
|
||||
|
||||
handle.Paused = true;
|
||||
handle.PauseUntilUtc = pauseUntilUtc;
|
||||
|
||||
// Cancel any existing timer for this consumer before scheduling a new one.
|
||||
CancelResumeTimer((stream, durableName));
|
||||
|
||||
var delay = pauseUntilUtc - DateTime.UtcNow;
|
||||
if (delay <= TimeSpan.Zero)
|
||||
{
|
||||
// Deadline already passed — resume immediately.
|
||||
AutoResume(stream, durableName);
|
||||
}
|
||||
else
|
||||
{
|
||||
var key = (stream, durableName);
|
||||
var timer = new Timer(_ => AutoResume(key.stream, key.durableName),
|
||||
state: null, dueTime: delay, period: Timeout.InfiniteTimeSpan);
|
||||
_resumeTimers[key] = timer;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Explicitly resume a paused consumer, cancelling any pending auto-resume timer.
|
||||
/// Go reference: consumer.go (resumeConsumer).
|
||||
/// </summary>
|
||||
public bool Resume(string stream, string durableName)
|
||||
{
|
||||
if (!_consumers.TryGetValue((stream, durableName), out var handle))
|
||||
return false;
|
||||
|
||||
handle.Paused = false;
|
||||
handle.PauseUntilUtc = null;
|
||||
CancelResumeTimer((stream, durableName));
|
||||
return true;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns true when the consumer is paused and the deadline (if set) has not yet passed.
|
||||
/// If the deadline has passed, auto-resumes the consumer and returns false.
|
||||
/// Go reference: consumer.go (isPaused).
|
||||
/// </summary>
|
||||
public bool IsPaused(string stream, string durableName)
|
||||
{
|
||||
if (!_consumers.TryGetValue((stream, durableName), out var handle))
|
||||
return false;
|
||||
|
||||
if (!handle.Paused)
|
||||
return false;
|
||||
|
||||
if (handle.PauseUntilUtc.HasValue && handle.PauseUntilUtc.Value <= DateTime.UtcNow)
|
||||
{
|
||||
AutoResume(stream, durableName);
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns the UTC deadline until which the consumer is paused, or null.
|
||||
/// Go reference: consumer.go (pauseUntil).
|
||||
/// </summary>
|
||||
public DateTime? GetPauseUntil(string stream, string durableName)
|
||||
{
|
||||
if (!_consumers.TryGetValue((stream, durableName), out var handle))
|
||||
return null;
|
||||
|
||||
return handle.PauseUntilUtc;
|
||||
}
|
||||
|
||||
private void AutoResume(string stream, string durableName)
|
||||
{
|
||||
if (!_consumers.TryGetValue((stream, durableName), out var handle))
|
||||
return;
|
||||
|
||||
handle.Paused = false;
|
||||
handle.PauseUntilUtc = null;
|
||||
CancelResumeTimer((stream, durableName));
|
||||
OnAutoResumed?.Invoke(this, (stream, durableName));
|
||||
}
|
||||
|
||||
private void CancelResumeTimer((string Stream, string Name) key)
|
||||
{
|
||||
if (_resumeTimers.TryRemove(key, out var timer))
|
||||
timer.Dispose();
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
foreach (var timer in _resumeTimers.Values)
|
||||
timer.Dispose();
|
||||
_resumeTimers.Clear();
|
||||
}
|
||||
|
||||
public bool Reset(string stream, string durableName)
|
||||
{
|
||||
if (!_consumers.TryGetValue((stream, durableName), out var handle))
|
||||
@@ -191,6 +310,12 @@ public sealed record ConsumerHandle(string Stream, ConsumerConfig Config)
|
||||
{
|
||||
public ulong NextSequence { get; set; } = 1;
|
||||
public bool Paused { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// UTC deadline until which this consumer is paused. Null means pause indefinitely
|
||||
/// (until explicitly resumed). Go reference: consumer.go pauseUntil field.
|
||||
/// </summary>
|
||||
public DateTime? PauseUntilUtc { get; set; }
|
||||
public Queue<StoredMessage> Pending { get; } = new();
|
||||
public Queue<PushFrame> PushFrames { get; } = new();
|
||||
public AckProcessor AckProcessor { get; } = new();
|
||||
|
||||
Reference in New Issue
Block a user