Files
natsdotnet/src/NATS.Server/JetStream/ConsumerManager.cs
Joseph Doherty 778687cf6f feat: add consumer reset to specific sequence (Gap 3.12)
Add ResetToSequence to ConsumerManager that updates NextSequence,
clears AckProcessor state via new ClearAll(), and zeroes PendingBytes.
Add AckProcessor.SetAckFloor() that prunes pending entries below the
new floor. Go reference: consumer.go:4241 processResetReq.
2026-02-25 11:15:33 -05:00

353 lines
12 KiB
C#

using System.Collections.Concurrent;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Cluster;
using NATS.Server.JetStream.Consumers;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Storage;
using NATS.Server.Subscriptions;
namespace NATS.Server.JetStream;
/// <summary>
/// Registry of JetStream consumers keyed by (stream, consumer name). Offers create/update,
/// lookup, delete, pause/resume with optional deadline, reset, pull fetch, push enqueue and
/// ack-floor bookkeeping on top of the registered <see cref="ConsumerHandle"/> instances.
/// </summary>
public sealed class ConsumerManager : IDisposable
{
    /// <summary>
    /// Longest due time handed to a single <see cref="Timer"/>. The Timer constructor throws
    /// <see cref="ArgumentOutOfRangeException"/> for due times beyond its supported range, so
    /// longer pauses are armed in slices of at most this length.
    /// </summary>
    private static readonly TimeSpan MaxResumeTimerDelay = TimeSpan.FromMilliseconds(int.MaxValue);

    private readonly JetStreamMetaGroup? _metaGroup;
    private readonly ConcurrentDictionary<(string Stream, string Name), ConsumerHandle> _consumers = new();
    private readonly ConcurrentDictionary<string, ulong> _ackFloors = new(StringComparer.Ordinal);
    private readonly ConcurrentDictionary<(string Stream, string Name), Timer> _resumeTimers = new();
    private readonly PullConsumerEngine _pullConsumerEngine = new();
    private readonly PushConsumerEngine _pushConsumerEngine = new();

    /// <summary>
    /// Raised when a consumer is automatically resumed by the deadline timer.
    /// Arguments are (stream, durableName).
    /// </summary>
    public event EventHandler<(string Stream, string Name)>? OnAutoResumed;

    /// <summary>Creates a manager, optionally associated with a meta group.</summary>
    /// <param name="metaGroup">Meta group to remember; may be null.</param>
    public ConsumerManager(JetStreamMetaGroup? metaGroup = null)
    {
        _metaGroup = metaGroup;
    }

    /// <summary>Number of consumers currently registered across all streams.</summary>
    public int ConsumerCount => _consumers.Count;

    /// <summary>
    /// Registers a consumer on <paramref name="stream"/> or replaces the config of an existing one.
    /// </summary>
    /// <param name="stream">Name of the stream the consumer belongs to.</param>
    /// <param name="config">
    /// Consumer configuration. It is mutated in place: an unnamed ephemeral consumer receives a
    /// generated name, and a lone <c>FilterSubject</c> is copied into <c>FilterSubjects</c>.
    /// </param>
    /// <returns>
    /// A response carrying the stored config, or a 400 error response when a non-ephemeral
    /// consumer has no durable name or when <c>LastPerSubject</c> is requested without a filter subject.
    /// </returns>
    public JetStreamApiResponse CreateOrUpdate(string stream, ConsumerConfig config)
    {
        if (string.IsNullOrWhiteSpace(config.DurableName))
        {
            if (config.Ephemeral)
                // "ephemeral-" plus the first 14 hex digits of a fresh GUID (24 characters total).
                config.DurableName = $"ephemeral-{Guid.NewGuid():N}"[..24];
            else
                return JetStreamApiResponse.ErrorResponse(400, "durable name required");
        }

        // Normalise the single-subject form into the list form so later checks see one shape.
        if (config.FilterSubjects.Count == 0 && !string.IsNullOrWhiteSpace(config.FilterSubject))
            config.FilterSubjects.Add(config.FilterSubject);

        if (config.DeliverPolicy == DeliverPolicy.LastPerSubject
            && string.IsNullOrWhiteSpace(config.ResolvePrimaryFilterSubject()))
        {
            return JetStreamApiResponse.ErrorResponse(400, "last per subject requires filter subject");
        }

        var key = (stream, config.DurableName);
        // On update, `with` copies the existing handle and swaps only the config.
        var handle = _consumers.AddOrUpdate(key,
            _ => new ConsumerHandle(stream, config),
            (_, existing) => existing with { Config = config });

        return new JetStreamApiResponse
        {
            ConsumerInfo = new JetStreamConsumerInfo
            {
                Config = handle.Config,
            },
        };
    }

    /// <summary>Returns the stored config of a consumer.</summary>
    /// <param name="stream">Stream name.</param>
    /// <param name="durableName">Consumer name.</param>
    /// <returns>
    /// A response with the consumer's config, or a not-found response built from the
    /// consumer-info API subject when the consumer is not registered.
    /// </returns>
    public JetStreamApiResponse GetInfo(string stream, string durableName)
    {
        if (_consumers.TryGetValue((stream, durableName), out var handle))
        {
            return new JetStreamApiResponse
            {
                ConsumerInfo = new JetStreamConsumerInfo
                {
                    Config = handle.Config,
                },
            };
        }

        return JetStreamApiResponse.NotFound($"$JS.API.CONSUMER.INFO.{stream}.{durableName}");
    }

    /// <summary>Looks up a consumer handle.</summary>
    /// <param name="stream">Stream name.</param>
    /// <param name="durableName">Consumer name.</param>
    /// <param name="handle">The handle when found; null (despite the annotation) when not found.</param>
    /// <returns>True when the consumer is registered.</returns>
    public bool TryGet(string stream, string durableName, out ConsumerHandle handle)
        => _consumers.TryGetValue((stream, durableName), out handle!);

    /// <summary>Removes a consumer and disposes its pending auto-resume timer, if any.</summary>
    /// <param name="stream">Stream name.</param>
    /// <param name="durableName">Consumer name.</param>
    /// <returns>True when a consumer was removed.</returns>
    public bool Delete(string stream, string durableName)
    {
        CancelResumeTimer((stream, durableName));
        return _consumers.TryRemove((stream, durableName), out _);
    }

    /// <summary>Lists the names of all consumers on a stream in ordinal order.</summary>
    /// <param name="stream">Stream name (compared ordinally).</param>
    /// <returns>A sorted snapshot of consumer names; empty when the stream has none.</returns>
    public IReadOnlyList<string> ListNames(string stream)
        => _consumers.Keys
            .Where(k => string.Equals(k.Stream, stream, StringComparison.Ordinal))
            .Select(k => k.Name)
            .OrderBy(x => x, StringComparer.Ordinal)
            .ToArray();

    /// <summary>
    /// Sets the paused flag of a consumer. Unpausing also clears the deadline and cancels the
    /// auto-resume timer; pausing leaves any previously set deadline and timer in place.
    /// </summary>
    /// <param name="stream">Stream name.</param>
    /// <param name="durableName">Consumer name.</param>
    /// <param name="paused">New value of the paused flag.</param>
    /// <returns>False when the consumer is not registered.</returns>
    public bool Pause(string stream, string durableName, bool paused)
    {
        if (!_consumers.TryGetValue((stream, durableName), out var handle))
            return false;

        handle.Paused = paused;
        if (!paused)
        {
            handle.PauseUntilUtc = null;
            CancelResumeTimer((stream, durableName));
        }

        return true;
    }

    /// <summary>
    /// Pause a consumer until <paramref name="pauseUntilUtc"/>.
    /// A background timer will auto-resume the consumer when the deadline passes.
    /// Deadlines further away than a single timer can represent are honoured by re-arming
    /// the timer in slices instead of throwing.
    /// Go reference: consumer.go (pauseConsumer).
    /// </summary>
    /// <param name="stream">Stream name.</param>
    /// <param name="durableName">Consumer name.</param>
    /// <param name="pauseUntilUtc">Deadline, compared against <see cref="DateTime.UtcNow"/>.</param>
    /// <returns>False when the consumer is not registered.</returns>
    public bool Pause(string stream, string durableName, DateTime pauseUntilUtc)
    {
        if (!_consumers.TryGetValue((stream, durableName), out var handle))
            return false;

        handle.Paused = true;
        handle.PauseUntilUtc = pauseUntilUtc;

        // Cancel any existing timer for this consumer before scheduling a new one.
        CancelResumeTimer((stream, durableName));

        var delay = pauseUntilUtc - DateTime.UtcNow;
        if (delay <= TimeSpan.Zero)
        {
            // Deadline already passed — resume immediately.
            AutoResume(stream, durableName);
            return true;
        }

        TimerCallback onElapsed;
        if (delay > MaxResumeTimerDelay)
        {
            // Too far away for one timer: wait the maximum slice, then re-arm for the remainder.
            delay = MaxResumeTimerDelay;
            onElapsed = _ => RearmResumeTimer(stream, durableName, pauseUntilUtc);
        }
        else
        {
            onElapsed = _ => AutoResume(stream, durableName);
        }

        _resumeTimers[(stream, durableName)] = new Timer(onElapsed,
            state: null, dueTime: delay, period: Timeout.InfiniteTimeSpan);
        return true;
    }

    /// <summary>
    /// Explicitly resume a paused consumer, cancelling any pending auto-resume timer.
    /// Go reference: consumer.go (resumeConsumer).
    /// </summary>
    /// <param name="stream">Stream name.</param>
    /// <param name="durableName">Consumer name.</param>
    /// <returns>False when the consumer is not registered.</returns>
    public bool Resume(string stream, string durableName)
    {
        if (!_consumers.TryGetValue((stream, durableName), out var handle))
            return false;

        handle.Paused = false;
        handle.PauseUntilUtc = null;
        CancelResumeTimer((stream, durableName));
        return true;
    }

    /// <summary>
    /// Returns true when the consumer is paused and the deadline (if set) has not yet passed.
    /// If the deadline has passed, auto-resumes the consumer and returns false.
    /// Go reference: consumer.go (isPaused).
    /// </summary>
    /// <param name="stream">Stream name.</param>
    /// <param name="durableName">Consumer name.</param>
    /// <returns>False for unknown or unpaused consumers.</returns>
    public bool IsPaused(string stream, string durableName)
    {
        if (!_consumers.TryGetValue((stream, durableName), out var handle))
            return false;

        if (!handle.Paused)
            return false;

        if (handle.PauseUntilUtc.HasValue && handle.PauseUntilUtc.Value <= DateTime.UtcNow)
        {
            // The deadline elapsed without the timer having resumed the consumer yet.
            AutoResume(stream, durableName);
            return false;
        }

        return true;
    }

    /// <summary>
    /// Returns the UTC deadline until which the consumer is paused, or null.
    /// Go reference: consumer.go (pauseUntil).
    /// </summary>
    /// <param name="stream">Stream name.</param>
    /// <param name="durableName">Consumer name.</param>
    /// <returns>The stored deadline; null when none is set or the consumer is unknown.</returns>
    public DateTime? GetPauseUntil(string stream, string durableName)
    {
        if (!_consumers.TryGetValue((stream, durableName), out var handle))
            return null;

        return handle.PauseUntilUtc;
    }

    /// <summary>
    /// Clears the pause state of a consumer, disposes its timer and raises
    /// <see cref="OnAutoResumed"/>. Does nothing when the consumer no longer exists.
    /// </summary>
    private void AutoResume(string stream, string durableName)
    {
        if (!_consumers.TryGetValue((stream, durableName), out var handle))
            return;

        handle.Paused = false;
        handle.PauseUntilUtc = null;
        CancelResumeTimer((stream, durableName));
        OnAutoResumed?.Invoke(this, (stream, durableName));
    }

    /// <summary>
    /// Timer callback for a clamped slice of a long pause: schedules the next slice, but only
    /// while the consumer is still paused with the same deadline this timer was armed for.
    /// </summary>
    private void RearmResumeTimer(string stream, string durableName, DateTime pauseUntilUtc)
    {
        if (_consumers.TryGetValue((stream, durableName), out var handle)
            && handle.Paused
            && handle.PauseUntilUtc == pauseUntilUtc)
        {
            // Pause recomputes the remaining delay and replaces the elapsed one-shot timer.
            Pause(stream, durableName, pauseUntilUtc);
        }
    }

    /// <summary>Removes and disposes the auto-resume timer registered under <paramref name="key"/>, if any.</summary>
    private void CancelResumeTimer((string Stream, string Name) key)
    {
        if (_resumeTimers.TryRemove(key, out var timer))
            timer.Dispose();
    }

    /// <summary>Disposes every outstanding auto-resume timer.</summary>
    public void Dispose()
    {
        foreach (var timer in _resumeTimers.Values)
            timer.Dispose();

        _resumeTimers.Clear();
    }

    /// <summary>
    /// Rewinds a consumer to sequence 1, discarding its queued pending messages together with
    /// outstanding ack state and the pending-byte counter (the same state
    /// <see cref="ResetToSequence"/> clears), so stale acks no longer count against the consumer.
    /// </summary>
    /// <param name="stream">Stream name.</param>
    /// <param name="durableName">Consumer name.</param>
    /// <returns>False when the consumer is not registered.</returns>
    public bool Reset(string stream, string durableName)
    {
        if (!_consumers.TryGetValue((stream, durableName), out var handle))
            return false;

        handle.Pending.Clear();
        RestartAt(handle, 1);
        return true;
    }

    /// <summary>
    /// Resets a consumer's position to the specified sequence.
    /// Clears pending acks and redelivery state.
    /// Go reference: consumer.go:4241 processResetReq.
    /// </summary>
    /// <param name="stream">Stream name.</param>
    /// <param name="durableName">Consumer name.</param>
    /// <param name="sequence">Value assigned to the consumer's next sequence.</param>
    /// <returns>False when the consumer is not registered.</returns>
    public bool ResetToSequence(string stream, string durableName, ulong sequence)
    {
        if (!_consumers.TryGetValue((stream, durableName), out var handle))
            return false;

        RestartAt(handle, sequence);
        return true;
    }

    /// <summary>
    /// Moves the handle's next sequence to <paramref name="sequence"/> and drops ack tracking
    /// and the pending-byte total, since outstanding acks are invalid after a reset.
    /// </summary>
    private static void RestartAt(ConsumerHandle handle, ulong sequence)
    {
        handle.NextSequence = sequence;
        handle.AckProcessor.ClearAll();
        handle.PendingBytes = 0;
    }

    /// <summary>Reports whether the consumer exists; no consumer state is modified.</summary>
    /// <param name="stream">Stream name.</param>
    /// <param name="durableName">Consumer name.</param>
    /// <returns>True when the consumer is registered.</returns>
    public bool Unpin(string stream, string durableName)
    {
        return _consumers.ContainsKey((stream, durableName));
    }

    /// <summary>Fetches a batch using a request that only specifies the batch size.</summary>
    /// <param name="stream">Stream name.</param>
    /// <param name="durableName">Consumer name.</param>
    /// <param name="batch">Value placed in <c>PullFetchRequest.Batch</c>.</param>
    /// <param name="streamManager">Manager used to resolve the stream handle.</param>
    /// <param name="ct">Cancellation token forwarded to the pull engine.</param>
    /// <returns>The fetched batch; an empty batch when the consumer or stream is unknown.</returns>
    public async ValueTask<PullFetchBatch> FetchAsync(string stream, string durableName, int batch, StreamManager streamManager, CancellationToken ct)
        => await FetchAsync(stream, durableName, new PullFetchRequest { Batch = batch }, streamManager, ct);

    /// <summary>Fetches a batch for a consumer through the pull consumer engine.</summary>
    /// <param name="stream">Stream name.</param>
    /// <param name="durableName">Consumer name.</param>
    /// <param name="request">Pull request passed to the engine unchanged.</param>
    /// <param name="streamManager">Manager used to resolve the stream handle.</param>
    /// <param name="ct">Cancellation token forwarded to the pull engine.</param>
    /// <returns>The fetched batch; an empty batch when the consumer or stream is unknown.</returns>
    public async ValueTask<PullFetchBatch> FetchAsync(string stream, string durableName, PullFetchRequest request, StreamManager streamManager, CancellationToken ct)
    {
        if (!_consumers.TryGetValue((stream, durableName), out var consumer))
            return new PullFetchBatch([]);

        if (!streamManager.TryGet(stream, out var streamHandle))
            return new PullFetchBatch([]);

        return await _pullConsumerEngine.FetchAsync(streamHandle, consumer, request, ct);
    }

    /// <summary>
    /// Forwards an ack-all for <paramref name="sequence"/> to the consumer's ack processor and
    /// raises the ack floor recorded for the stream if the processor's floor is higher.
    /// </summary>
    /// <param name="stream">Stream name.</param>
    /// <param name="durableName">Consumer name.</param>
    /// <param name="sequence">Sequence passed to <c>AckProcessor.AckAll</c>.</param>
    /// <returns>False when the consumer is not registered.</returns>
    public bool AckAll(string stream, string durableName, ulong sequence)
    {
        if (!_consumers.TryGetValue((stream, durableName), out var handle))
            return false;

        handle.AckProcessor.AckAll(sequence);
        // The per-stream floor only ever moves forward.
        _ackFloors.AddOrUpdate(stream, _ => handle.AckProcessor.AckFloor, (_, existing) => Math.Max(existing, handle.AckProcessor.AckFloor));
        return true;
    }

    /// <summary>Returns the ack processor's pending count for a consumer, or 0 when it is unknown.</summary>
    /// <param name="stream">Stream name.</param>
    /// <param name="durableName">Consumer name.</param>
    public int GetPendingCount(string stream, string durableName)
    {
        if (!_consumers.TryGetValue((stream, durableName), out var handle))
            return 0;

        return handle.AckProcessor.PendingCount;
    }

    /// <summary>
    /// Offers a newly published message to every push consumer of the stream whose filter
    /// matches and whose pending count is below its <c>MaxAckPending</c> limit (when set).
    /// </summary>
    /// <param name="stream">Stream the message was published to.</param>
    /// <param name="message">The stored message.</param>
    public void OnPublished(string stream, StoredMessage message)
    {
        foreach (var handle in _consumers.Values.Where(c => c.Stream == stream && c.Config.Push))
        {
            if (!MatchesFilter(handle.Config, message.Subject))
                continue;

            // A MaxAckPending of zero or less means no limit is applied here.
            if (handle.Config.MaxAckPending > 0 && handle.AckProcessor.PendingCount >= handle.Config.MaxAckPending)
                continue;

            _pushConsumerEngine.Enqueue(handle, message);
        }
    }

    /// <summary>
    /// Dequeues the next push frame of a consumer if one is queued and already available.
    /// </summary>
    /// <param name="stream">Stream name.</param>
    /// <param name="durableName">Consumer name.</param>
    /// <returns>
    /// The frame, or null when the consumer is unknown, its queue is empty, or the head
    /// frame's <c>AvailableAtUtc</c> is still in the future.
    /// </returns>
    public PushFrame? ReadPushFrame(string stream, string durableName)
    {
        if (!_consumers.TryGetValue((stream, durableName), out var consumer))
            return null;

        if (consumer.PushFrames.Count == 0)
            return null;

        var frame = consumer.PushFrames.Peek();
        if (frame.AvailableAtUtc > DateTime.UtcNow)
            return null;

        return consumer.PushFrames.Dequeue();
    }

    /// <summary>
    /// Tests a subject against the consumer's filters: any entry of <c>FilterSubjects</c> when
    /// present, otherwise <c>FilterSubject</c>; a consumer without filters matches everything.
    /// </summary>
    private static bool MatchesFilter(ConsumerConfig config, string subject)
    {
        if (config.FilterSubjects.Count > 0)
            return config.FilterSubjects.Any(f => SubjectMatch.MatchLiteral(subject, f));

        if (!string.IsNullOrWhiteSpace(config.FilterSubject))
            return SubjectMatch.MatchLiteral(subject, config.FilterSubject);

        return true;
    }

    /// <summary>Returns the ack floor recorded for a stream by <see cref="AckAll"/>, or 0 when none is recorded.</summary>
    internal ulong GetAckFloor(string stream)
        => _ackFloors.TryGetValue(stream, out var ackFloor) ? ackFloor : 0;
}
/// <summary>
/// Per-consumer state: the owning stream name and config supplied at construction, plus the
/// mutable position, pause, queue and ack-tracking members declared below.
/// </summary>
/// <param name="Stream">Name of the stream the consumer is attached to.</param>
/// <param name="Config">Configuration of the consumer.</param>
public sealed record ConsumerHandle(string Stream, ConsumerConfig Config)
{
    /// <summary>
    /// Sequence position of the consumer; starts at 1.
    /// NOTE(review): presumably the next stream sequence to deliver — confirm against the engines.
    /// </summary>
    public ulong NextSequence { get; set; } = 1UL;

    /// <summary>Whether the consumer is currently marked as paused.</summary>
    public bool Paused { get; set; }

    /// <summary>
    /// UTC deadline at which a pause ends. When null the pause has no deadline and lasts
    /// until the consumer is explicitly resumed. Go reference: consumer.go pauseUntil field.
    /// </summary>
    public DateTime? PauseUntilUtc { get; set; }

    /// <summary>Queue of stored messages held for this consumer.</summary>
    public Queue<StoredMessage> Pending { get; } = new Queue<StoredMessage>();

    /// <summary>Queue of push frames waiting to be read for this consumer.</summary>
    public Queue<PushFrame> PushFrames { get; } = new Queue<PushFrame>();

    /// <summary>Ack tracking state owned by this consumer.</summary>
    public AckProcessor AckProcessor { get; } = new AckProcessor();

    /// <summary>
    /// UTC timestamp related to push data availability.
    /// NOTE(review): exact semantics are set by code outside this type — confirm with the push engine.
    /// </summary>
    public DateTime NextPushDataAvailableAtUtc { get; set; }

    /// <summary>
    /// Sum of the bytes of all messages that have not been acknowledged yet.
    /// Reported in idle heartbeat headers as Nats-Pending-Bytes.
    /// Go reference: consumer.go sendIdleHeartbeat.
    /// </summary>
    public long PendingBytes { get; set; }
}