using System.Collections.Concurrent; using NATS.Server.JetStream.Api; using NATS.Server.JetStream.Cluster; using NATS.Server.JetStream.Consumers; using NATS.Server.JetStream.Models; using NATS.Server.JetStream.Storage; using NATS.Server.Subscriptions; namespace NATS.Server.JetStream; public sealed class ConsumerManager : IDisposable { private readonly JetStreamMetaGroup? _metaGroup; private readonly ConcurrentDictionary<(string Stream, string Name), ConsumerHandle> _consumers = new(); private readonly ConcurrentDictionary _ackFloors = new(StringComparer.Ordinal); private readonly ConcurrentDictionary<(string Stream, string Name), Timer> _resumeTimers = new(); private readonly PullConsumerEngine _pullConsumerEngine = new(); private readonly PushConsumerEngine _pushConsumerEngine = new(); /// /// Raised when a consumer is automatically resumed by the deadline timer. /// Arguments are (stream, durableName). /// public event EventHandler<(string Stream, string Name)>? OnAutoResumed; public ConsumerManager(JetStreamMetaGroup? metaGroup = null) { _metaGroup = metaGroup; } public int ConsumerCount => _consumers.Count; public JetStreamApiResponse CreateOrUpdate(string stream, ConsumerConfig config) { if (string.IsNullOrWhiteSpace(config.DurableName)) { if (config.Ephemeral) config.DurableName = $"ephemeral-{Guid.NewGuid():N}"[..24]; else return JetStreamApiResponse.ErrorResponse(400, "durable name required"); } if (config.FilterSubjects.Count == 0 && !string.IsNullOrWhiteSpace(config.FilterSubject)) config.FilterSubjects.Add(config.FilterSubject); if (config.DeliverPolicy == DeliverPolicy.LastPerSubject && string.IsNullOrWhiteSpace(config.ResolvePrimaryFilterSubject())) { return JetStreamApiResponse.ErrorResponse(400, "last per subject requires filter subject"); } var key = (stream, config.DurableName); var handle = _consumers.AddOrUpdate(key, _ => new ConsumerHandle(stream, config), (_, existing) => existing with { Config = config }); return new JetStreamApiResponse { ConsumerInfo = new JetStreamConsumerInfo { Config = handle.Config, }, }; } public JetStreamApiResponse GetInfo(string stream, string durableName) { if (_consumers.TryGetValue((stream, durableName), out var handle)) { return new JetStreamApiResponse { ConsumerInfo = new JetStreamConsumerInfo { Config = handle.Config, }, }; } return JetStreamApiResponse.NotFound($"$JS.API.CONSUMER.INFO.{stream}.{durableName}"); } public bool TryGet(string stream, string durableName, out ConsumerHandle handle) => _consumers.TryGetValue((stream, durableName), out handle!); public bool Delete(string stream, string durableName) { CancelResumeTimer((stream, durableName)); return _consumers.TryRemove((stream, durableName), out _); } public IReadOnlyList ListNames(string stream) => _consumers.Keys .Where(k => string.Equals(k.Stream, stream, StringComparison.Ordinal)) .Select(k => k.Name) .OrderBy(x => x, StringComparer.Ordinal) .ToArray(); public bool Pause(string stream, string durableName, bool paused) { if (!_consumers.TryGetValue((stream, durableName), out var handle)) return false; handle.Paused = paused; if (!paused) { handle.PauseUntilUtc = null; CancelResumeTimer((stream, durableName)); } return true; } /// /// Pause a consumer until . /// A background timer will auto-resume the consumer when the deadline passes. /// Go reference: consumer.go (pauseConsumer). /// public bool Pause(string stream, string durableName, DateTime pauseUntilUtc) { if (!_consumers.TryGetValue((stream, durableName), out var handle)) return false; handle.Paused = true; handle.PauseUntilUtc = pauseUntilUtc; // Cancel any existing timer for this consumer before scheduling a new one. CancelResumeTimer((stream, durableName)); var delay = pauseUntilUtc - DateTime.UtcNow; if (delay <= TimeSpan.Zero) { // Deadline already passed — resume immediately. AutoResume(stream, durableName); } else { var key = (stream, durableName); var timer = new Timer(_ => AutoResume(key.stream, key.durableName), state: null, dueTime: delay, period: Timeout.InfiniteTimeSpan); _resumeTimers[key] = timer; } return true; } /// /// Explicitly resume a paused consumer, cancelling any pending auto-resume timer. /// Go reference: consumer.go (resumeConsumer). /// public bool Resume(string stream, string durableName) { if (!_consumers.TryGetValue((stream, durableName), out var handle)) return false; handle.Paused = false; handle.PauseUntilUtc = null; CancelResumeTimer((stream, durableName)); return true; } /// /// Returns true when the consumer is paused and the deadline (if set) has not yet passed. /// If the deadline has passed, auto-resumes the consumer and returns false. /// Go reference: consumer.go (isPaused). /// public bool IsPaused(string stream, string durableName) { if (!_consumers.TryGetValue((stream, durableName), out var handle)) return false; if (!handle.Paused) return false; if (handle.PauseUntilUtc.HasValue && handle.PauseUntilUtc.Value <= DateTime.UtcNow) { AutoResume(stream, durableName); return false; } return true; } /// /// Returns the UTC deadline until which the consumer is paused, or null. /// Go reference: consumer.go (pauseUntil). /// public DateTime? GetPauseUntil(string stream, string durableName) { if (!_consumers.TryGetValue((stream, durableName), out var handle)) return null; return handle.PauseUntilUtc; } private void AutoResume(string stream, string durableName) { if (!_consumers.TryGetValue((stream, durableName), out var handle)) return; handle.Paused = false; handle.PauseUntilUtc = null; CancelResumeTimer((stream, durableName)); OnAutoResumed?.Invoke(this, (stream, durableName)); } private void CancelResumeTimer((string Stream, string Name) key) { if (_resumeTimers.TryRemove(key, out var timer)) timer.Dispose(); } public void Dispose() { foreach (var timer in _resumeTimers.Values) timer.Dispose(); _resumeTimers.Clear(); } public bool Reset(string stream, string durableName) { if (!_consumers.TryGetValue((stream, durableName), out var handle)) return false; handle.NextSequence = 1; handle.Pending.Clear(); return true; } public bool Unpin(string stream, string durableName) { return _consumers.ContainsKey((stream, durableName)); } public async ValueTask FetchAsync(string stream, string durableName, int batch, StreamManager streamManager, CancellationToken ct) => await FetchAsync(stream, durableName, new PullFetchRequest { Batch = batch }, streamManager, ct); public async ValueTask FetchAsync(string stream, string durableName, PullFetchRequest request, StreamManager streamManager, CancellationToken ct) { if (!_consumers.TryGetValue((stream, durableName), out var consumer)) return new PullFetchBatch([]); if (!streamManager.TryGet(stream, out var streamHandle)) return new PullFetchBatch([]); return await _pullConsumerEngine.FetchAsync(streamHandle, consumer, request, ct); } public bool AckAll(string stream, string durableName, ulong sequence) { if (!_consumers.TryGetValue((stream, durableName), out var handle)) return false; handle.AckProcessor.AckAll(sequence); _ackFloors.AddOrUpdate(stream, _ => handle.AckProcessor.AckFloor, (_, existing) => Math.Max(existing, handle.AckProcessor.AckFloor)); return true; } public int GetPendingCount(string stream, string durableName) { if (!_consumers.TryGetValue((stream, durableName), out var handle)) return 0; return handle.AckProcessor.PendingCount; } public void OnPublished(string stream, StoredMessage message) { foreach (var handle in _consumers.Values.Where(c => c.Stream == stream && c.Config.Push)) { if (!MatchesFilter(handle.Config, message.Subject)) continue; if (handle.Config.MaxAckPending > 0 && handle.AckProcessor.PendingCount >= handle.Config.MaxAckPending) continue; _pushConsumerEngine.Enqueue(handle, message); } } public PushFrame? ReadPushFrame(string stream, string durableName) { if (!_consumers.TryGetValue((stream, durableName), out var consumer)) return null; if (consumer.PushFrames.Count == 0) return null; var frame = consumer.PushFrames.Peek(); if (frame.AvailableAtUtc > DateTime.UtcNow) return null; return consumer.PushFrames.Dequeue(); } private static bool MatchesFilter(ConsumerConfig config, string subject) { if (config.FilterSubjects.Count > 0) return config.FilterSubjects.Any(f => SubjectMatch.MatchLiteral(subject, f)); if (!string.IsNullOrWhiteSpace(config.FilterSubject)) return SubjectMatch.MatchLiteral(subject, config.FilterSubject); return true; } internal ulong GetAckFloor(string stream) => _ackFloors.TryGetValue(stream, out var ackFloor) ? ackFloor : 0; } public sealed record ConsumerHandle(string Stream, ConsumerConfig Config) { public ulong NextSequence { get; set; } = 1; public bool Paused { get; set; } /// /// UTC deadline until which this consumer is paused. Null means pause indefinitely /// (until explicitly resumed). Go reference: consumer.go pauseUntil field. /// public DateTime? PauseUntilUtc { get; set; } public Queue Pending { get; } = new(); public Queue PushFrames { get; } = new(); public AckProcessor AckProcessor { get; } = new(); public DateTime NextPushDataAvailableAtUtc { get; set; } /// /// Total pending bytes across all unacknowledged messages. /// Included in idle heartbeat headers as Nats-Pending-Bytes. /// Go reference: consumer.go sendIdleHeartbeat. /// public long PendingBytes { get; set; } }