558 lines
22 KiB
C#
558 lines
22 KiB
C#
using System.Collections.Concurrent;
|
|
using NATS.Server.JetStream.Api;
|
|
using NATS.Server.JetStream.Cluster;
|
|
using NATS.Server.JetStream.Consumers;
|
|
using NATS.Server.JetStream.Models;
|
|
using NATS.Server.JetStream.Storage;
|
|
using NATS.Server.JetStream.Validation;
|
|
using NATS.Server.Subscriptions;
|
|
|
|
namespace NATS.Server.JetStream;
|
|
|
|
public sealed class ConsumerManager : IDisposable
|
|
{
|
|
private readonly JetStreamMetaGroup? _metaGroup;
|
|
private readonly ConcurrentDictionary<(string Stream, string Name), ConsumerHandle> _consumers = new();
|
|
private readonly ConcurrentDictionary<string, ulong> _ackFloors = new(StringComparer.Ordinal);
|
|
private readonly ConcurrentDictionary<(string Stream, string Name), Timer> _resumeTimers = new();
|
|
private readonly PullConsumerEngine _pullConsumerEngine = new();
|
|
private readonly PushConsumerEngine _pushConsumerEngine = new();
|
|
|
|
/// <summary>
|
|
/// Raised when a consumer is automatically resumed by the deadline timer.
|
|
/// Arguments are (stream, durableName).
|
|
/// </summary>
|
|
public event EventHandler<(string Stream, string Name)>? OnAutoResumed;
|
|
|
|
/// <summary>
|
|
/// Optional reference to the stream manager, used to resolve DeliverPolicy.New
|
|
/// start sequences at consumer creation time.
|
|
/// </summary>
|
|
public StreamManager? StreamManager { get; set; }
|
|
|
|
/// <summary>
/// Initializes a consumer manager with an empty consumer registry.
/// </summary>
/// <param name="metaGroup">JetStream meta group to keep a reference to, or <see langword="null"/> when none is available.</param>
public ConsumerManager(JetStreamMetaGroup? metaGroup = null)
    => _metaGroup = metaGroup;
|
|
|
|
/// <summary>
/// Total number of consumer handles currently registered, regardless of owning stream.
/// </summary>
public int ConsumerCount
{
    get { return _consumers.Count; }
}
|
|
|
|
/// <summary>
/// Registers a consumer on <paramref name="stream"/>, or swaps in a new configuration for the
/// consumer already registered under the same name.
/// </summary>
/// <remarks>
/// <paramref name="config"/> is normalised in place before being stored: a nameless ephemeral
/// consumer receives a generated name, <c>FilterSubject</c> is copied into an empty
/// <c>FilterSubjects</c> list, and <c>OptStartSeq</c> is filled in for <c>DeliverPolicy.New</c>
/// when the owning stream can be resolved through <see cref="StreamManager"/>.
/// </remarks>
/// <param name="stream">Name of the stream that owns the consumer.</param>
/// <param name="config">Consumer configuration taken from the API request; mutated as described above.</param>
/// <returns>
/// A 400 error response when validation fails; otherwise a response whose
/// <c>ConsumerInfo</c> describes the registered consumer.
/// </returns>
public JetStreamApiResponse CreateOrUpdate(string stream, ConsumerConfig config)
{
    if (string.IsNullOrWhiteSpace(config.DurableName))
    {
        if (!config.Ephemeral)
            return JetStreamApiResponse.ErrorResponse(400, "durable name required");

        // Generated name: "ephemeral-" followed by hex digits of a fresh GUID, cut to 24 characters.
        var generatedName = $"ephemeral-{Guid.NewGuid():N}";
        config.DurableName = generatedName[..24];
    }

    var nameIsValid = JetStreamConfigValidator.IsValidName(config.DurableName);
    if (!nameIsValid)
        return JetStreamApiResponse.ErrorResponse(400, "invalid durable name");

    var metadataFits = JetStreamConfigValidator.IsMetadataWithinLimit(config.Metadata);
    if (!metadataFits)
        return JetStreamApiResponse.ErrorResponse(400, "consumer metadata exceeds maximum size");

    // DeliverPolicy.New: capture the stream's last sequence now so only messages published
    // afterwards are delivered (Go reference: server/consumer.go, setStartingSequenceForDeliverNew).
    // The policy itself stays DeliverPolicy.New in the stored config; only OptStartSeq is set,
    // which the fetch engine honours whenever it is present.
    if (config.DeliverPolicy == DeliverPolicy.New
        && StreamManager != null
        && StreamManager.TryGet(stream, out var sourceStream))
    {
        var state = sourceStream.Store.GetStateAsync(default).GetAwaiter().GetResult();
        config.OptStartSeq = state.LastSeq + 1;
    }

    // Fold the single-filter form into the list form when the list was left empty.
    if (!string.IsNullOrWhiteSpace(config.FilterSubject) && config.FilterSubjects.Count == 0)
        config.FilterSubjects.Add(config.FilterSubject);

    if (config.DeliverPolicy == DeliverPolicy.LastPerSubject)
    {
        var primaryFilter = config.ResolvePrimaryFilterSubject();
        if (string.IsNullOrWhiteSpace(primaryFilter))
            return JetStreamApiResponse.ErrorResponse(400, "last per subject requires filter subject");
    }

    // New key: create a handle. Existing key: copy the handle with the new config attached.
    var registered = _consumers.AddOrUpdate(
        (stream, config.DurableName),
        _ => new ConsumerHandle(stream, config),
        (_, current) => current with { Config = config });

    var info = new JetStreamConsumerInfo
    {
        Name = registered.Config.DurableName,
        StreamName = stream,
        Config = registered.Config,
    };

    return new JetStreamApiResponse { ConsumerInfo = info };
}
|
|
|
|
/// <summary>
/// Builds the consumer-info API response for one consumer of a stream.
/// </summary>
/// <param name="stream">Name of the stream that owns the consumer.</param>
/// <param name="durableName">Name the consumer is registered under.</param>
/// <returns>
/// A response carrying the consumer's name, stream and configuration, or a not-found
/// response when no such consumer is registered.
/// </returns>
public JetStreamApiResponse GetInfo(string stream, string durableName)
{
    if (!_consumers.TryGetValue((stream, durableName), out var consumer))
        return JetStreamApiResponse.NotFound($"$JS.API.CONSUMER.INFO.{stream}.{durableName}");

    var info = new JetStreamConsumerInfo
    {
        Name = consumer.Config.DurableName,
        StreamName = stream,
        Config = consumer.Config,
    };

    return new JetStreamApiResponse { ConsumerInfo = info };
}
|
|
|
|
/// <summary>
/// Looks up the in-memory handle of a consumer.
/// </summary>
/// <param name="stream">Name of the stream that owns the consumer.</param>
/// <param name="durableName">Name the consumer is registered under.</param>
/// <param name="handle">Receives the handle when the lookup succeeds; not meaningful otherwise.</param>
/// <returns><see langword="true"/> when the consumer is registered.</returns>
public bool TryGet(string stream, string durableName, out ConsumerHandle handle)
{
    var key = (stream, durableName);
    return _consumers.TryGetValue(key, out handle!);
}
|
|
|
|
/// <summary>
/// Removes a consumer from the registry after cancelling its pending auto-resume timer, if any.
/// </summary>
/// <param name="stream">Name of the stream that owns the consumer.</param>
/// <param name="durableName">Name the consumer is registered under.</param>
/// <returns><see langword="true"/> when a consumer was removed; <see langword="false"/> when none was registered.</returns>
public bool Delete(string stream, string durableName)
{
    var key = (stream, durableName);

    // The timer is cancelled first, whether or not the consumer turns out to exist.
    CancelResumeTimer(key);

    var removed = _consumers.TryRemove(key, out _);
    return removed;
}
|
|
|
|
/// <summary>
/// Returns the names of all consumers registered on a stream, sorted ordinally.
/// </summary>
/// <param name="stream">Stream whose consumer names are requested (ordinal comparison).</param>
/// <returns>A sorted array of consumer names; empty when the stream has no consumers.</returns>
public IReadOnlyList<string> ListNames(string stream)
{
    var names = _consumers.Keys
        .Where(key => string.Equals(key.Stream, stream, StringComparison.Ordinal))
        .Select(key => key.Name)
        .ToArray();

    Array.Sort(names, StringComparer.Ordinal);
    return names;
}
|
|
|
|
/// <summary>
/// Returns API info objects for all consumers registered on a stream, ordered ordinally by name.
/// </summary>
/// <param name="stream">Stream whose consumers are requested (ordinal comparison).</param>
/// <returns>A list with one info object per consumer; empty when the stream has no consumers.</returns>
public IReadOnlyList<JetStreamConsumerInfo> ListConsumerInfos(string stream)
{
    var ordered = _consumers
        .Where(entry => string.Equals(entry.Key.Stream, stream, StringComparison.Ordinal))
        .OrderBy(entry => entry.Key.Name, StringComparer.Ordinal);

    var infos = new List<JetStreamConsumerInfo>();
    foreach (var entry in ordered)
    {
        var consumerConfig = entry.Value.Config;
        infos.Add(new JetStreamConsumerInfo
        {
            Name = consumerConfig.DurableName,
            StreamName = stream,
            Config = consumerConfig,
        });
    }

    return infos;
}
|
|
|
|
/// <summary>
/// Sets or clears the paused flag of a consumer with immediate effect.
/// </summary>
/// <param name="stream">Name of the stream that owns the consumer.</param>
/// <param name="durableName">Name the consumer is registered under.</param>
/// <param name="paused">
/// <see langword="true"/> to pause; <see langword="false"/> to unpause, which also clears the
/// pause deadline and cancels any pending auto-resume timer.
/// </param>
/// <returns><see langword="false"/> when the consumer is not registered; otherwise <see langword="true"/>.</returns>
public bool Pause(string stream, string durableName, bool paused)
{
    var key = (stream, durableName);
    if (!_consumers.TryGetValue(key, out var consumer))
        return false;

    consumer.Paused = paused;
    if (paused)
        return true;

    // Unpausing: drop the deadline and the timer that would have enforced it.
    consumer.PauseUntilUtc = null;
    CancelResumeTimer(key);
    return true;
}
|
|
|
|
/// <summary>
/// Pauses a consumer until <paramref name="pauseUntilUtc"/>.
/// A background timer auto-resumes the consumer once the deadline passes.
/// Go reference: consumer.go (pauseConsumer).
/// </summary>
/// <remarks>
/// Any timer left over from an earlier pause of the same consumer is cancelled first.
/// A deadline that is already in the past resumes the consumer immediately (raising
/// <see cref="OnAutoResumed"/>). Deadlines further away than a single
/// <see cref="Timer"/> can represent are waited for in slices: the timer wakes up early,
/// confirms this pause is still the active one, and schedules the remainder.
/// </remarks>
/// <param name="stream">Stream name containing the consumer.</param>
/// <param name="durableName">Consumer durable name.</param>
/// <param name="pauseUntilUtc">Deadline for automatic resume; compared against <see cref="DateTime.UtcNow"/>.</param>
/// <returns><see langword="false"/> when the consumer is not registered; otherwise <see langword="true"/>.</returns>
public bool Pause(string stream, string durableName, DateTime pauseUntilUtc)
{
    var key = (Stream: stream, Name: durableName);
    if (!_consumers.TryGetValue(key, out var handle))
        return false;

    handle.Paused = true;
    handle.PauseUntilUtc = pauseUntilUtc;

    // Cancel any existing timer for this consumer before scheduling a new one.
    CancelResumeTimer(key);

    var delay = pauseUntilUtc - DateTime.UtcNow;
    if (delay <= TimeSpan.Zero)
    {
        // Deadline already passed — resume immediately.
        AutoResume(stream, durableName);
        return true;
    }

    // System.Threading.Timer throws ArgumentOutOfRangeException for due times beyond its
    // supported maximum, so a far-away deadline is clamped and re-armed on wake-up
    // instead of failing after the consumer has already been marked paused.
    var maxSlice = TimeSpan.FromMilliseconds(int.MaxValue);
    var needsRearm = delay > maxSlice;
    var dueTime = needsRearm ? maxSlice : delay;

    var timer = new Timer(
        _ =>
        {
            if (!needsRearm)
            {
                AutoResume(stream, durableName);
                return;
            }

            // Intermediate wake-up: keep waiting only if this pause is still in force.
            // If the consumer was resumed, re-paused or deleted meanwhile, that caller
            // owns the scheduling and there is nothing to do here.
            if (_consumers.TryGetValue(key, out var current)
                && current.Paused
                && current.PauseUntilUtc == pauseUntilUtc)
            {
                Pause(stream, durableName, pauseUntilUtc);
            }
        },
        state: null, dueTime: dueTime, period: Timeout.InfiniteTimeSpan);

    // Register the timer; if another caller registered one in the meantime, dispose it
    // rather than silently dropping the reference.
    _resumeTimers.AddOrUpdate(key, timer, (_, stale) =>
    {
        stale.Dispose();
        return timer;
    });

    return true;
}
|
|
|
|
/// <summary>
/// Resumes a consumer on request: clears the paused flag and deadline and cancels any
/// pending auto-resume timer. Go reference: consumer.go (resumeConsumer).
/// </summary>
/// <param name="stream">Stream name containing the consumer.</param>
/// <param name="durableName">Consumer durable name.</param>
/// <returns><see langword="false"/> when the consumer is not registered; otherwise <see langword="true"/>.</returns>
public bool Resume(string stream, string durableName)
{
    var key = (stream, durableName);
    if (_consumers.TryGetValue(key, out var consumer))
    {
        consumer.Paused = false;
        consumer.PauseUntilUtc = null;
        CancelResumeTimer(key);
        return true;
    }

    return false;
}
|
|
|
|
/// <summary>
/// Reports whether a consumer is currently paused. A paused consumer whose deadline has
/// already passed is auto-resumed as a side effect and reported as not paused.
/// Go reference: consumer.go (isPaused).
/// </summary>
/// <param name="stream">Stream name containing the consumer.</param>
/// <param name="durableName">Consumer durable name.</param>
/// <returns>
/// <see langword="true"/> only when the consumer exists, is paused, and has either no
/// deadline or a deadline still in the future.
/// </returns>
public bool IsPaused(string stream, string durableName)
{
    if (!_consumers.TryGetValue((stream, durableName), out var consumer) || !consumer.Paused)
        return false;

    var deadlineReached = consumer.PauseUntilUtc is { } deadline && deadline <= DateTime.UtcNow;
    if (!deadlineReached)
        return true;

    // The deadline elapsed before the timer acted (or without one): resume now.
    AutoResume(stream, durableName);
    return false;
}
|
|
|
|
/// <summary>
/// Gets the deadline stored for a consumer's pause. Go reference: consumer.go (pauseUntil).
/// </summary>
/// <param name="stream">Stream name containing the consumer.</param>
/// <param name="durableName">Consumer durable name.</param>
/// <returns>The stored deadline, or <see langword="null"/> when none is set or the consumer is not registered.</returns>
public DateTime? GetPauseUntil(string stream, string durableName)
    => _consumers.TryGetValue((stream, durableName), out var consumer)
        ? consumer.PauseUntilUtc
        : null;
|
|
|
|
/// <summary>
/// Clears the pause state of a consumer, cancels its resume timer and raises
/// <see cref="OnAutoResumed"/>. Does nothing when the consumer is not registered.
/// </summary>
/// <param name="stream">Stream name containing the consumer.</param>
/// <param name="durableName">Consumer durable name.</param>
private void AutoResume(string stream, string durableName)
{
    var key = (stream, durableName);
    if (_consumers.TryGetValue(key, out var consumer))
    {
        consumer.Paused = false;
        consumer.PauseUntilUtc = null;
        CancelResumeTimer(key);
        OnAutoResumed?.Invoke(this, key);
    }
}
|
|
|
|
/// <summary>
/// Removes the resume timer registered for <paramref name="key"/>, if any, and disposes it.
/// </summary>
/// <param name="key">Stream and consumer name identifying the timer.</param>
private void CancelResumeTimer((string Stream, string Name) key)
{
    if (!_resumeTimers.TryRemove(key, out var pending))
        return;

    pending.Dispose();
}
|
|
|
|
/// <summary>
/// Disposes every registered auto-resume timer and empties the timer registry.
/// </summary>
public void Dispose()
{
    var pendingTimers = _resumeTimers.Values;
    foreach (var pending in pendingTimers)
    {
        pending.Dispose();
    }

    _resumeTimers.Clear();
}
|
|
|
|
/// <summary>
/// Rewinds a consumer: the next sequence goes back to 1 and the pending message queue is emptied.
/// </summary>
/// <param name="stream">Name of the stream that owns the consumer.</param>
/// <param name="durableName">Name the consumer is registered under.</param>
/// <returns><see langword="false"/> when the consumer is not registered; otherwise <see langword="true"/>.</returns>
public bool Reset(string stream, string durableName)
{
    if (_consumers.TryGetValue((stream, durableName), out var consumer))
    {
        consumer.NextSequence = 1;
        consumer.Pending.Clear();
        return true;
    }

    return false;
}
|
|
|
|
/// <summary>
/// Moves a consumer's delivery position to <paramref name="sequence"/>, discarding all
/// ack-processor state and zeroing the pending byte count.
/// Go reference: consumer.go:4241 processResetReq.
/// </summary>
/// <param name="stream">Stream name containing the consumer.</param>
/// <param name="durableName">Consumer durable name.</param>
/// <param name="sequence">Sequence delivery continues from.</param>
/// <returns><see langword="false"/> when the consumer is not registered; otherwise <see langword="true"/>.</returns>
public bool ResetToSequence(string stream, string durableName, ulong sequence)
{
    if (_consumers.TryGetValue((stream, durableName), out var consumer))
    {
        // New delivery position.
        consumer.NextSequence = sequence;

        // Outstanding acks refer to the old position and no longer apply.
        consumer.AckProcessor.ClearAll();

        // Nothing is considered in flight after the reset.
        consumer.PendingBytes = 0;
        return true;
    }

    return false;
}
|
|
|
|
/// <summary>
/// Handles an unpin request; currently this only reports whether the consumer is registered.
/// </summary>
/// <param name="stream">Name of the stream that owns the consumer.</param>
/// <param name="durableName">Name the consumer is registered under.</param>
/// <returns><see langword="true"/> when the consumer is registered.</returns>
public bool Unpin(string stream, string durableName)
    => _consumers.ContainsKey((stream, durableName));
|
|
|
|
/// <summary>
/// Convenience overload: fetches a pull batch given only the desired batch size.
/// </summary>
/// <param name="stream">Name of the stream that owns the consumer.</param>
/// <param name="durableName">Name the consumer is registered under.</param>
/// <param name="batch">Batch size placed into the pull request.</param>
/// <param name="streamManager">Stream registry used to resolve <paramref name="stream"/>.</param>
/// <param name="ct">Cancellation token forwarded to the fetch.</param>
/// <returns>The batch produced by the request-based overload.</returns>
public async ValueTask<PullFetchBatch> FetchAsync(string stream, string durableName, int batch, StreamManager streamManager, CancellationToken ct)
{
    var request = new PullFetchRequest { Batch = batch };
    return await FetchAsync(stream, durableName, request, streamManager, ct);
}
|
|
|
|
/// <summary>
/// Fetches a pull batch for a consumer by delegating to the pull consumer engine.
/// </summary>
/// <param name="stream">Name of the stream that owns the consumer.</param>
/// <param name="durableName">Name the consumer is registered under.</param>
/// <param name="request">Pull request passed through to the engine.</param>
/// <param name="streamManager">Stream registry used to resolve <paramref name="stream"/>.</param>
/// <param name="ct">Cancellation token forwarded to the engine.</param>
/// <returns>The engine's batch, or an empty batch when the consumer or the stream cannot be resolved.</returns>
public async ValueTask<PullFetchBatch> FetchAsync(string stream, string durableName, PullFetchRequest request, StreamManager streamManager, CancellationToken ct)
{
    // The stream is only looked up once the consumer is known to exist.
    if (_consumers.TryGetValue((stream, durableName), out var consumer)
        && streamManager.TryGet(stream, out var source))
    {
        return await _pullConsumerEngine.FetchAsync(source, consumer, request, ct);
    }

    return new PullFetchBatch([]);
}
|
|
|
|
/// <summary>
/// Forwards an ack-all for <paramref name="sequence"/> to the consumer's ack processor and
/// raises the stream's recorded ack floor to the processor's resulting floor when that is higher.
/// </summary>
/// <param name="stream">Name of the stream that owns the consumer.</param>
/// <param name="durableName">Name the consumer is registered under.</param>
/// <param name="sequence">Sequence passed to the ack processor's ack-all operation.</param>
/// <returns><see langword="false"/> when the consumer is not registered; otherwise <see langword="true"/>.</returns>
public bool AckAll(string stream, string durableName, ulong sequence)
{
    if (!_consumers.TryGetValue((stream, durableName), out var consumer))
        return false;

    var acks = consumer.AckProcessor;
    acks.AckAll(sequence);

    // The per-stream floor only ever moves forward.
    _ackFloors.AddOrUpdate(
        stream,
        _ => acks.AckFloor,
        (_, recorded) => Math.Max(recorded, acks.AckFloor));

    return true;
}
|
|
|
|
/// <summary>
/// Gets the pending count reported by a consumer's ack processor.
/// </summary>
/// <param name="stream">Name of the stream that owns the consumer.</param>
/// <param name="durableName">Name the consumer is registered under.</param>
/// <returns>The ack processor's pending count, or 0 when the consumer is not registered.</returns>
public int GetPendingCount(string stream, string durableName)
    => _consumers.TryGetValue((stream, durableName), out var consumer)
        ? consumer.AckProcessor.PendingCount
        : 0;
|
|
|
|
/// <summary>
/// Returns true if there are any consumers registered for the given stream.
/// Used to short-circuit the LoadAsync call on the publish hot path.
/// </summary>
/// <remarks>
/// Walks the dictionary with its own enumerator and stops at the first match.
/// <c>ConcurrentDictionary.Keys</c> is deliberately avoided here because it locks the
/// whole dictionary and copies every key into a new collection on each call.
/// </remarks>
/// <param name="stream">Stream name to check (ordinal comparison).</param>
/// <returns><see langword="true"/> when at least one registered consumer belongs to <paramref name="stream"/>.</returns>
public bool HasConsumersForStream(string stream)
{
    foreach (var entry in _consumers)
    {
        if (string.Equals(entry.Key.Stream, stream, StringComparison.Ordinal))
            return true;
    }

    return false;
}
|
|
|
|
/// <summary>
/// Handles a newly stored stream message for push-consumer fan-out.
/// </summary>
/// <remarks>
/// Every push consumer of <paramref name="stream"/> whose filter accepts the message subject
/// and whose pending-ack count is below <c>MaxAckPending</c> (when that limit is positive)
/// has the message enqueued through the push consumer engine. The dictionary is enumerated
/// directly rather than through <c>ConcurrentDictionary.Values</c>, which would lock the
/// whole dictionary and copy all handles for every published message.
/// </remarks>
/// <param name="stream">Owning stream name for the published message.</param>
/// <param name="message">Stored message to fan out.</param>
public void OnPublished(string stream, StoredMessage message)
{
    foreach (var entry in _consumers)
    {
        var handle = entry.Value;
        if (handle.Stream != stream || !handle.Config.Push)
            continue;

        if (!MatchesFilter(handle.Config, message.Subject))
            continue;

        // A non-positive MaxAckPending means no limit is enforced here.
        if (handle.Config.MaxAckPending > 0 && handle.AckProcessor.PendingCount >= handle.Config.MaxAckPending)
            continue;

        _pushConsumerEngine.Enqueue(handle, message);
    }
}
|
|
|
|
/// <summary>
/// Dequeues the frame at the head of a consumer's push queue, provided its release time has arrived.
/// </summary>
/// <param name="stream">Name of the stream that owns the consumer.</param>
/// <param name="durableName">Name the consumer is registered under.</param>
/// <returns>
/// The dequeued frame, or <see langword="null"/> when the consumer is not registered, the
/// queue is empty, or the head frame's <c>AvailableAtUtc</c> is still in the future.
/// </returns>
public PushFrame? ReadPushFrame(string stream, string durableName)
{
    if (!_consumers.TryGetValue((stream, durableName), out var consumer))
        return null;

    var frames = consumer.PushFrames;

    // Only the head is inspected; a not-yet-due head holds back everything queued behind it.
    if (!frames.TryPeek(out var head) || head.AvailableAtUtc > DateTime.UtcNow)
        return null;

    return frames.Dequeue();
}
|
|
|
|
/// <summary>
/// Decides whether a message subject passes a consumer's subject filters.
/// </summary>
/// <remarks>
/// A non-empty <c>FilterSubjects</c> list takes precedence: the subject passes when
/// <c>SubjectMatch.MatchLiteral</c> accepts it for at least one entry. Otherwise the single
/// <c>FilterSubject</c> is consulted, and a consumer without any filter accepts every subject.
/// </remarks>
/// <param name="config">Consumer configuration supplying the filters.</param>
/// <param name="subject">Subject of the message being tested.</param>
/// <returns><see langword="true"/> when the subject passes.</returns>
private static bool MatchesFilter(ConsumerConfig config, string subject)
{
    var filters = config.FilterSubjects;
    if (filters.Count > 0)
    {
        foreach (var filter in filters)
        {
            if (SubjectMatch.MatchLiteral(subject, filter))
                return true;
        }

        return false;
    }

    return string.IsNullOrWhiteSpace(config.FilterSubject)
        || SubjectMatch.MatchLiteral(subject, config.FilterSubject);
}
|
|
|
|
/// <summary>
/// Gets the ack floor recorded for a stream by earlier <c>AckAll</c> calls.
/// </summary>
/// <param name="stream">Stream whose recorded ack floor is requested.</param>
/// <returns>The recorded floor, or 0 when nothing has been recorded for the stream.</returns>
internal ulong GetAckFloor(string stream)
{
    // On a miss the out value is default(ulong), i.e. 0, which is the desired fallback.
    _ackFloors.TryGetValue(stream, out var recordedFloor);
    return recordedFloor;
}
|
|
}
|
|
|
|
/// <summary>
/// In-memory runtime state of one consumer: the owning stream name, the consumer
/// configuration, and mutable delivery, pause, queue and ack-tracking state.
/// </summary>
/// <remarks>
/// Being a record, a <c>with</c> expression produces a shallow copy: the new handle shares
/// the <see cref="Pending"/>, <see cref="PushFrames"/> and <see cref="AckProcessor"/>
/// instances with the handle it was copied from.
/// </remarks>
/// <param name="Stream">Name of the stream the consumer belongs to.</param>
/// <param name="Config">Consumer configuration this handle operates under.</param>
public sealed record ConsumerHandle(string Stream, ConsumerConfig Config)
{
    /// <summary>
    /// Compiled filter derived from Config. Cached per-instance; invalidated when Config
    /// is a different object than the one the cache was built from (e.g. after
    /// <c>with { Config = newConfig }</c>, which copies these cache fields along with
    /// everything else), or when FilterSubject / the number of FilterSubjects changed
    /// through in-place mutation.
    /// Go reference: consumer.go — filter subjects resolved once at consumer creation.
    /// </summary>
    private Consumers.CompiledFilter? _compiledFilter;

    // Config instance the cached filter was compiled from; compared by reference.
    private ConsumerConfig? _compiledFilterConfig;

    // FilterSubject value and FilterSubjects count observed when the cache was built.
    private string? _compiledFilterSubject;
    private int _compiledFilterSubjectsCount;

    /// <summary>
    /// Gets the compiled subject filter for the current <see cref="Config"/>, rebuilding
    /// the cached instance when the configuration it was built from has changed.
    /// </summary>
    /// <remarks>
    /// In-place replacement of an entry inside <c>FilterSubjects</c> that leaves the count
    /// unchanged is not detected; assign a new config (or change the count) to force a rebuild.
    /// </remarks>
    public Consumers.CompiledFilter CompiledFilter
    {
        get
        {
            var config = Config;

            // Rebuild on first use, on a swapped Config instance, or on a detectable
            // in-place change of the filter settings.
            if (_compiledFilter is null
                || !ReferenceEquals(_compiledFilterConfig, config)
                || _compiledFilterSubject != config.FilterSubject
                || _compiledFilterSubjectsCount != config.FilterSubjects.Count)
            {
                _compiledFilter = Consumers.CompiledFilter.FromConfig(config);
                _compiledFilterConfig = config;
                _compiledFilterSubject = config.FilterSubject;
                _compiledFilterSubjectsCount = config.FilterSubjects.Count;
            }

            return _compiledFilter;
        }
    }

    /// <summary>
    /// Gets or sets next stream sequence to deliver. Starts at 1.
    /// </summary>
    public ulong NextSequence { get; set; } = 1;

    /// <summary>
    /// Gets or sets whether delivery is currently paused.
    /// </summary>
    public bool Paused { get; set; }

    /// <summary>
    /// UTC deadline until which this consumer is paused. Null means pause indefinitely
    /// (until explicitly resumed). Go reference: consumer.go pauseUntil field.
    /// </summary>
    public DateTime? PauseUntilUtc { get; set; }

    /// <summary>
    /// Gets pending stored messages queued for this consumer.
    /// </summary>
    public Queue<StoredMessage> Pending { get; } = new();

    /// <summary>
    /// Gets queued push frames waiting for delivery window release.
    /// </summary>
    public Queue<PushFrame> PushFrames { get; } = new();

    /// <summary>
    /// Gets ack processor state for pending and ack-floor tracking.
    /// </summary>
    public AckProcessor AckProcessor { get; } = new();

    /// <summary>
    /// Gets or sets next UTC time when push data can be delivered.
    /// </summary>
    public DateTime NextPushDataAvailableAtUtc { get; set; }

    /// <summary>
    /// Total pending bytes across all unacknowledged messages.
    /// Included in idle heartbeat headers as Nats-Pending-Bytes.
    /// Go reference: consumer.go sendIdleHeartbeat.
    /// </summary>
    public long PendingBytes { get; set; }
}
|