using System.Collections.Concurrent;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Cluster;
using NATS.Server.JetStream.Consumers;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Storage;
using NATS.Server.Subscriptions;

namespace NATS.Server.JetStream;

/// <summary>
/// In-memory registry of JetStream consumers, keyed by (stream, consumer name).
/// Exposes create/update/delete/lookup operations plus the pull (fetch), push
/// (enqueue/read frame) and acknowledgement entry points for registered consumers.
/// </summary>
public sealed class ConsumerManager
{
    // Stored at construction; not read by any member of this class.
    private readonly JetStreamMetaGroup? _metaGroup;

    private readonly ConcurrentDictionary<(string Stream, string Name), ConsumerHandle> _consumers = new();

    // Highest ack floor recorded per stream by AckAll (stream names compared ordinally).
    private readonly ConcurrentDictionary<string, ulong> _ackFloors = new(StringComparer.Ordinal);

    private readonly PullConsumerEngine _pullConsumerEngine = new();
    private readonly PushConsumerEngine _pushConsumerEngine = new();

    /// <summary>Creates a manager, optionally associated with a meta group.</summary>
    /// <param name="metaGroup">Meta group to retain, or <c>null</c>.</param>
    public ConsumerManager(JetStreamMetaGroup? metaGroup = null)
    {
        _metaGroup = metaGroup;
    }

    /// <summary>Number of consumers currently registered across all streams.</summary>
    public int ConsumerCount => _consumers.Count;

    /// <summary>
    /// Registers a consumer on <paramref name="stream"/>, or replaces the configuration of the
    /// consumer already registered under the same name.
    /// </summary>
    /// <param name="stream">Name of the stream the consumer belongs to.</param>
    /// <param name="config">
    /// Consumer configuration. It is normalized in place: an ephemeral consumer without a name
    /// is assigned a generated one, and a lone <c>FilterSubject</c> is copied into
    /// <c>FilterSubjects</c> when that list is empty.
    /// </param>
    /// <returns>
    /// A response carrying the stored configuration, or a 400 error response when a durable name
    /// is missing for a non-ephemeral consumer or when <c>LastPerSubject</c> delivery is requested
    /// without a primary filter subject.
    /// </returns>
    public JetStreamApiResponse CreateOrUpdate(string stream, ConsumerConfig config)
    {
        if (string.IsNullOrWhiteSpace(config.DurableName))
        {
            if (!config.Ephemeral)
            {
                return JetStreamApiResponse.ErrorResponse(400, "durable name required");
            }

            // 24 characters total: the 10-character "ephemeral-" prefix plus the first
            // 14 hex digits of a fresh GUID.
            config.DurableName = $"ephemeral-{Guid.NewGuid():N}"[..24];
        }

        if (config.FilterSubjects.Count == 0 && !string.IsNullOrWhiteSpace(config.FilterSubject))
        {
            config.FilterSubjects.Add(config.FilterSubject);
        }

        if (config.DeliverPolicy == DeliverPolicy.LastPerSubject
            && string.IsNullOrWhiteSpace(config.ResolvePrimaryFilterSubject()))
        {
            return JetStreamApiResponse.ErrorResponse(400, "last per subject requires filter subject");
        }

        var key = (stream, config.DurableName);

        // On update the existing handle is copied with `with`, so its other state
        // (sequence, queues, ack processor) carries over to the replacement handle.
        var handle = _consumers.AddOrUpdate(
            key,
            _ => new ConsumerHandle(stream, config),
            (_, existing) => existing with { Config = config });

        return ToInfoResponse(handle);
    }

    /// <summary>Looks up a consumer and returns its configuration.</summary>
    /// <param name="stream">Name of the owning stream.</param>
    /// <param name="durableName">Name of the consumer.</param>
    /// <returns>
    /// A response carrying the consumer's configuration, or a not-found response when no such
    /// consumer is registered.
    /// </returns>
    public JetStreamApiResponse GetInfo(string stream, string durableName)
    {
        if (_consumers.TryGetValue((stream, durableName), out var handle))
        {
            return ToInfoResponse(handle);
        }

        return JetStreamApiResponse.NotFound($"$JS.API.CONSUMER.INFO.{stream}.{durableName}");
    }

    /// <summary>Attempts to retrieve the handle of a registered consumer.</summary>
    /// <param name="stream">Name of the owning stream.</param>
    /// <param name="durableName">Name of the consumer.</param>
    /// <param name="handle">The handle when found; only meaningful when the method returns <c>true</c>.</param>
    /// <returns><c>true</c> when the consumer exists.</returns>
    public bool TryGet(string stream, string durableName, out ConsumerHandle handle)
        => _consumers.TryGetValue((stream, durableName), out handle!);

    /// <summary>Removes a consumer from the registry.</summary>
    /// <returns><c>true</c> when a consumer was removed; <c>false</c> when none was registered.</returns>
    public bool Delete(string stream, string durableName)
    {
        return _consumers.TryRemove((stream, durableName), out _);
    }

    /// <summary>Lists the names of all consumers registered on <paramref name="stream"/>.</summary>
    /// <returns>Consumer names in ordinal order; empty when the stream has no consumers.</returns>
    public IReadOnlyList<string> ListNames(string stream)
        => _consumers.Keys
            .Where(k => string.Equals(k.Stream, stream, StringComparison.Ordinal))
            .Select(k => k.Name)
            .OrderBy(x => x, StringComparer.Ordinal)
            .ToArray();

    /// <summary>Sets the paused flag of a consumer.</summary>
    /// <returns><c>false</c> when the consumer does not exist.</returns>
    public bool Pause(string stream, string durableName, bool paused)
    {
        if (!_consumers.TryGetValue((stream, durableName), out var handle))
        {
            return false;
        }

        handle.Paused = paused;
        return true;
    }

    /// <summary>Rewinds a consumer: sets its next sequence back to 1 and clears its pending queue.</summary>
    /// <returns><c>false</c> when the consumer does not exist.</returns>
    public bool Reset(string stream, string durableName)
    {
        if (!_consumers.TryGetValue((stream, durableName), out var handle))
        {
            return false;
        }

        handle.NextSequence = 1;
        handle.Pending.Clear();
        return true;
    }

    /// <summary>
    /// Reports whether the consumer exists. No consumer state is modified by this method.
    /// </summary>
    public bool Unpin(string stream, string durableName)
    {
        return _consumers.ContainsKey((stream, durableName));
    }

    /// <summary>Fetches up to <paramref name="batch"/> messages for a pull consumer.</summary>
    /// <remarks>Shorthand for the overload taking a <see cref="PullFetchRequest"/> with only <c>Batch</c> set.</remarks>
    public async ValueTask<PullFetchBatch> FetchAsync(
        string stream,
        string durableName,
        int batch,
        StreamManager streamManager,
        CancellationToken ct)
        => await FetchAsync(stream, durableName, new PullFetchRequest { Batch = batch }, streamManager, ct);

    /// <summary>Fetches messages for a pull consumer by delegating to the pull consumer engine.</summary>
    /// <param name="stream">Name of the owning stream.</param>
    /// <param name="durableName">Name of the consumer.</param>
    /// <param name="request">Fetch parameters forwarded to the engine.</param>
    /// <param name="streamManager">Manager used to resolve the stream handle.</param>
    /// <param name="ct">Cancellation token forwarded to the engine.</param>
    /// <returns>
    /// The engine's batch, or an empty batch when either the consumer or the stream cannot be found.
    /// </returns>
    public async ValueTask<PullFetchBatch> FetchAsync(
        string stream,
        string durableName,
        PullFetchRequest request,
        StreamManager streamManager,
        CancellationToken ct)
    {
        if (!_consumers.TryGetValue((stream, durableName), out var consumer))
        {
            return new PullFetchBatch([]);
        }

        if (!streamManager.TryGet(stream, out var streamHandle))
        {
            return new PullFetchBatch([]);
        }

        return await _pullConsumerEngine.FetchAsync(streamHandle, consumer, request, ct);
    }

    /// <summary>
    /// Passes <paramref name="sequence"/> to the consumer's ack processor via <c>AckAll</c>,
    /// then records the processor's resulting ack floor for the stream.
    /// </summary>
    /// <returns><c>false</c> when the consumer does not exist.</returns>
    public bool AckAll(string stream, string durableName, ulong sequence)
    {
        if (!_consumers.TryGetValue((stream, durableName), out var handle))
        {
            return false;
        }

        handle.AckProcessor.AckAll(sequence);

        // The per-stream floor only ever moves forward: keep the larger of the stored
        // value and this consumer's current floor.
        _ackFloors.AddOrUpdate(
            stream,
            _ => handle.AckProcessor.AckFloor,
            (_, existing) => Math.Max(existing, handle.AckProcessor.AckFloor));

        return true;
    }

    /// <summary>Returns the ack processor's pending count for a consumer, or 0 when it does not exist.</summary>
    public int GetPendingCount(string stream, string durableName)
    {
        if (!_consumers.TryGetValue((stream, durableName), out var handle))
        {
            return 0;
        }

        return handle.AckProcessor.PendingCount;
    }

    /// <summary>
    /// Offers a newly stored message to every push consumer registered on <paramref name="stream"/>.
    /// </summary>
    /// <remarks>
    /// A consumer is skipped when the message subject does not match its filter, or when
    /// <c>MaxAckPending</c> is positive and its pending count has already reached that limit.
    /// </remarks>
    public void OnPublished(string stream, StoredMessage message)
    {
        foreach (var handle in _consumers.Values)
        {
            if (!handle.Config.Push || !string.Equals(handle.Stream, stream, StringComparison.Ordinal))
            {
                continue;
            }

            if (!MatchesFilter(handle.Config, message.Subject))
            {
                continue;
            }

            var maxAckPending = handle.Config.MaxAckPending;
            if (maxAckPending > 0 && handle.AckProcessor.PendingCount >= maxAckPending)
            {
                continue;
            }

            _pushConsumerEngine.Enqueue(handle, message);
        }
    }

    /// <summary>Dequeues the next push frame of a consumer if one is due.</summary>
    /// <returns>
    /// The frame at the head of the queue, or <c>null</c> when the consumer does not exist, the
    /// queue is empty, or the head frame's <c>AvailableAtUtc</c> is still in the future.
    /// </returns>
    public PushFrame? ReadPushFrame(string stream, string durableName)
    {
        if (!_consumers.TryGetValue((stream, durableName), out var consumer))
        {
            return null;
        }

        var frames = consumer.PushFrames;
        if (frames.Count == 0)
        {
            return null;
        }

        // Only the head is inspected: a frame that is not yet due blocks the frames behind it.
        if (frames.Peek().AvailableAtUtc > DateTime.UtcNow)
        {
            return null;
        }

        return frames.Dequeue();
    }

    /// <summary>
    /// Decides whether <paramref name="subject"/> passes the consumer's subject filter.
    /// <c>FilterSubjects</c> takes precedence when non-empty; otherwise <c>FilterSubject</c>
    /// is used; a consumer with no filter at all accepts every subject.
    /// </summary>
    private static bool MatchesFilter(ConsumerConfig config, string subject)
    {
        if (config.FilterSubjects.Count > 0)
        {
            return config.FilterSubjects.Any(f => SubjectMatch.MatchLiteral(subject, f));
        }

        if (!string.IsNullOrWhiteSpace(config.FilterSubject))
        {
            return SubjectMatch.MatchLiteral(subject, config.FilterSubject);
        }

        return true;
    }

    /// <summary>Wraps a handle's configuration in a consumer-info API response.</summary>
    private static JetStreamApiResponse ToInfoResponse(ConsumerHandle handle)
        => new()
        {
            ConsumerInfo = new JetStreamConsumerInfo
            {
                Config = handle.Config,
            },
        };

    /// <summary>Returns the highest ack floor recorded for <paramref name="stream"/>, or 0 when none was recorded.</summary>
    internal ulong GetAckFloor(string stream)
        => _ackFloors.TryGetValue(stream, out var ackFloor) ? ackFloor : 0;
}

/// <summary>
/// Runtime state of a single consumer: its owning stream and configuration plus the mutable
/// delivery state (next sequence, paused flag, queues and ack processor).
/// </summary>
/// <param name="Stream">Name of the stream the consumer is registered on.</param>
/// <param name="Config">Configuration the consumer was created or last updated with.</param>
public sealed record ConsumerHandle(string Stream, ConsumerConfig Config)
{
    /// <summary>Next sequence for this consumer; starts at 1 and is set back to 1 by a reset.</summary>
    public ulong NextSequence { get; set; } = 1;

    /// <summary>Whether the consumer is currently marked as paused.</summary>
    public bool Paused { get; set; }

    // NOTE(review): the element type argument of this queue is not recoverable from this file
    // (only Clear() is called on it here) — restore it from the pull consumer engine's usage.
    /// <summary>Pending queue; emptied when the consumer is reset.</summary>
    public Queue Pending { get; } = new();

    /// <summary>Frames queued for push delivery, read in FIFO order.</summary>
    public Queue<PushFrame> PushFrames { get; } = new();

    /// <summary>Acknowledgement tracker for this consumer.</summary>
    public AckProcessor AckProcessor { get; } = new();

    /// <summary>UTC timestamp slot for push delivery scheduling; not read or written in this file.</summary>
    public DateTime NextPushDataAvailableAtUtc { get; set; }
}