feat(consumers): add PriorityGroupManager and PullConsumer timeout/compiled filters (C5+C6)

This commit is contained in:
Joseph Doherty
2026-02-24 15:11:30 -05:00
parent 386cc201de
commit 662b2e0d87
5 changed files with 791 additions and 40 deletions

View File

@@ -0,0 +1,102 @@
// Go: consumer.go:500-600 — Priority groups for sticky consumer assignment.
// When multiple consumers are in a group, the lowest-priority-numbered consumer
// (highest priority) gets messages. If it becomes idle/disconnects, the next
// consumer takes over.
using System.Collections.Concurrent;
namespace NATS.Server.JetStream.Consumers;
/// <summary>
/// Manages named groups of consumers with priority levels.
/// Within each group the consumer with the lowest priority number is the
/// "active" consumer that receives messages. Thread-safe.
/// </summary>
public sealed class PriorityGroupManager
{
private readonly ConcurrentDictionary<string, PriorityGroup> _groups = new(StringComparer.Ordinal);
/// <summary>
/// Register a consumer in a named priority group.
/// Lower <paramref name="priority"/> values indicate higher priority.
/// </summary>
public void Register(string groupName, string consumerId, int priority)
{
var group = _groups.GetOrAdd(groupName, _ => new PriorityGroup());
lock (group.Lock)
{
// If the consumer is already registered, update its priority.
for (var i = 0; i < group.Members.Count; i++)
{
if (string.Equals(group.Members[i].ConsumerId, consumerId, StringComparison.Ordinal))
{
group.Members[i] = new PriorityMember(consumerId, priority);
return;
}
}
group.Members.Add(new PriorityMember(consumerId, priority));
}
}
/// <summary>
/// Remove a consumer from a named priority group.
/// </summary>
public void Unregister(string groupName, string consumerId)
{
if (!_groups.TryGetValue(groupName, out var group))
return;
lock (group.Lock)
{
group.Members.RemoveAll(m => string.Equals(m.ConsumerId, consumerId, StringComparison.Ordinal));
// Clean up empty groups
if (group.Members.Count == 0)
_groups.TryRemove(groupName, out _);
}
}
/// <summary>
/// Returns the consumer ID with the lowest priority number (highest priority)
/// in the named group, or <c>null</c> if the group is empty or does not exist.
/// When multiple consumers share the same lowest priority, the first registered wins.
/// </summary>
public string? GetActiveConsumer(string groupName)
{
if (!_groups.TryGetValue(groupName, out var group))
return null;
lock (group.Lock)
{
if (group.Members.Count == 0)
return null;
var active = group.Members[0];
for (var i = 1; i < group.Members.Count; i++)
{
if (group.Members[i].Priority < active.Priority)
active = group.Members[i];
}
return active.ConsumerId;
}
}
/// <summary>
/// Returns <c>true</c> if the given consumer is the current active consumer
/// (lowest priority number) in the named group.
/// </summary>
public bool IsActive(string groupName, string consumerId)
{
var active = GetActiveConsumer(groupName);
return active != null && string.Equals(active, consumerId, StringComparison.Ordinal);
}
private sealed class PriorityGroup
{
public object Lock { get; } = new();
public List<PriorityMember> Members { get; } = [];
}
private record struct PriorityMember(string ConsumerId, int Priority);
}

View File

@@ -4,6 +4,93 @@ using NATS.Server.Subscriptions;
namespace NATS.Server.JetStream.Consumers;
/// <summary>
/// Pre-compiled filter for efficient subject matching against consumer filter subjects.
/// For 0 filters: always matches. For 1 filter: uses SubjectMatch.MatchLiteral directly.
/// For N filters: uses a HashSet for exact (literal) subjects and falls back to
/// SubjectMatch.MatchLiteral for wildcard filter patterns.
/// </summary>
public sealed class CompiledFilter
{
private readonly HashSet<string>? _exactFilters;
private readonly string[]? _wildcardFilters;
private readonly string? _singleFilter;
private readonly bool _matchAll;
public CompiledFilter(IReadOnlyList<string> filterSubjects)
{
if (filterSubjects.Count == 0)
{
_matchAll = true;
return;
}
if (filterSubjects.Count == 1)
{
_singleFilter = filterSubjects[0];
return;
}
// Separate exact (literal) subjects from wildcard patterns
var exact = new HashSet<string>(StringComparer.Ordinal);
var wildcards = new List<string>();
foreach (var filter in filterSubjects)
{
if (SubjectMatch.IsLiteral(filter))
exact.Add(filter);
else
wildcards.Add(filter);
}
_exactFilters = exact.Count > 0 ? exact : null;
_wildcardFilters = wildcards.Count > 0 ? wildcards.ToArray() : null;
}
/// <summary>
/// Returns <c>true</c> if the given subject matches any of the compiled filter patterns.
/// </summary>
public bool Matches(string subject)
{
if (_matchAll)
return true;
if (_singleFilter is not null)
return SubjectMatch.MatchLiteral(subject, _singleFilter);
// Multi-filter path: check exact set first, then wildcard patterns
if (_exactFilters is not null && _exactFilters.Contains(subject))
return true;
if (_wildcardFilters is not null)
{
foreach (var wc in _wildcardFilters)
{
if (SubjectMatch.MatchLiteral(subject, wc))
return true;
}
}
return false;
}
/// <summary>
/// Create a <see cref="CompiledFilter"/> from a <see cref="ConsumerConfig"/>.
/// Uses <see cref="ConsumerConfig.FilterSubjects"/> first, falling back to
/// <see cref="ConsumerConfig.FilterSubject"/> if the list is empty.
/// </summary>
public static CompiledFilter FromConfig(ConsumerConfig config)
{
if (config.FilterSubjects.Count > 0)
return new CompiledFilter(config.FilterSubjects);
if (!string.IsNullOrWhiteSpace(config.FilterSubject))
return new CompiledFilter([config.FilterSubject]);
return new CompiledFilter([]);
}
}
public sealed class PullConsumerEngine
{
public async ValueTask<PullFetchBatch> FetchAsync(StreamHandle stream, ConsumerHandle consumer, int batch, CancellationToken ct)
@@ -14,14 +101,26 @@ public sealed class PullConsumerEngine
var batch = Math.Max(request.Batch, 1);
var messages = new List<StoredMessage>(batch);
// Go: consumer.go — enforce ExpiresMs timeout on pull fetch requests.
// When ExpiresMs > 0, create a linked CancellationTokenSource that fires
// after the timeout. If it fires before the batch is full, return partial
// results with TimedOut = true.
using var expiresCts = request.ExpiresMs > 0
? CancellationTokenSource.CreateLinkedTokenSource(ct)
: null;
if (expiresCts is not null)
expiresCts.CancelAfter(request.ExpiresMs);
var effectiveCt = expiresCts?.Token ?? ct;
if (consumer.NextSequence == 1)
{
consumer.NextSequence = await ResolveInitialSequenceAsync(stream, consumer.Config, ct);
consumer.NextSequence = await ResolveInitialSequenceAsync(stream, consumer.Config, effectiveCt);
}
if (request.NoWait)
{
var available = await stream.Store.LoadAsync(consumer.NextSequence, ct);
var available = await stream.Store.LoadAsync(consumer.NextSequence, effectiveCt);
if (available == null)
return new PullFetchBatch([], timedOut: false);
}
@@ -41,7 +140,7 @@ public sealed class PullConsumerEngine
: consumer.Config.AckWaitMs;
consumer.AckProcessor.ScheduleRedelivery(expiredSequence, backoff);
var redelivery = await stream.Store.LoadAsync(expiredSequence, ct);
var redelivery = await stream.Store.LoadAsync(expiredSequence, effectiveCt);
if (redelivery != null)
{
messages.Add(new StoredMessage
@@ -60,45 +159,88 @@ public sealed class PullConsumerEngine
return new PullFetchBatch(messages);
}
// Use CompiledFilter for efficient multi-filter matching
var compiledFilter = CompiledFilter.FromConfig(consumer.Config);
var sequence = consumer.NextSequence;
for (var i = 0; i < batch; i++)
try
{
var message = await stream.Store.LoadAsync(sequence, ct);
if (message == null)
break;
if (!MatchesFilter(consumer.Config, message.Subject))
for (var i = 0; i < batch; i++)
{
sequence++;
i--;
continue;
}
StoredMessage? message;
if (message.Sequence <= consumer.AckProcessor.AckFloor)
{
sequence++;
i--;
continue;
}
// Go: consumer.go — when ExpiresMs is set, retry loading until a message
// appears or the timeout fires. This handles the case where the stream
// is empty or the consumer has caught up to the end of the stream.
if (expiresCts is not null)
{
message = await WaitForMessageAsync(stream.Store, sequence, effectiveCt);
}
else
{
message = await stream.Store.LoadAsync(sequence, effectiveCt);
}
if (consumer.Config.ReplayPolicy == ReplayPolicy.Original)
await Task.Delay(60, ct);
messages.Add(message);
if (consumer.Config.AckPolicy is AckPolicy.Explicit or AckPolicy.All)
{
if (consumer.Config.MaxAckPending > 0 && consumer.AckProcessor.PendingCount >= consumer.Config.MaxAckPending)
if (message == null)
break;
consumer.AckProcessor.Register(message.Sequence, consumer.Config.AckWaitMs);
if (!compiledFilter.Matches(message.Subject))
{
sequence++;
i--;
continue;
}
if (message.Sequence <= consumer.AckProcessor.AckFloor)
{
sequence++;
i--;
continue;
}
if (consumer.Config.ReplayPolicy == ReplayPolicy.Original)
await Task.Delay(60, effectiveCt);
messages.Add(message);
if (consumer.Config.AckPolicy is AckPolicy.Explicit or AckPolicy.All)
{
if (consumer.Config.MaxAckPending > 0 && consumer.AckProcessor.PendingCount >= consumer.Config.MaxAckPending)
break;
consumer.AckProcessor.Register(message.Sequence, consumer.Config.AckWaitMs);
}
sequence++;
}
sequence++;
}
catch (OperationCanceledException) when (expiresCts is not null && expiresCts.IsCancellationRequested && !ct.IsCancellationRequested)
{
// ExpiresMs timeout fired — return partial results
consumer.NextSequence = sequence;
return new PullFetchBatch(messages, timedOut: true);
}
consumer.NextSequence = sequence;
return new PullFetchBatch(messages);
}
/// <summary>
/// Poll-wait for a message to appear at the given sequence, retrying with a
/// short delay until the cancellation token fires (typically from ExpiresMs).
/// </summary>
private static async ValueTask<StoredMessage?> WaitForMessageAsync(IStreamStore store, ulong sequence, CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
var message = await store.LoadAsync(sequence, ct);
if (message is not null)
return message;
// Yield briefly before retrying — the ExpiresMs CTS will cancel when time is up
await Task.Delay(5, ct).ConfigureAwait(false);
}
return null;
}
private static async ValueTask<ulong> ResolveInitialSequenceAsync(StreamHandle stream, ConsumerConfig config, CancellationToken ct)
{
var state = await stream.Store.GetStateAsync(ct);
@@ -136,17 +278,6 @@ public sealed class PullConsumerEngine
var match = messages.FirstOrDefault(m => m.TimestampUtc >= startTimeUtc);
return match?.Sequence ?? 1UL;
}
private static bool MatchesFilter(ConsumerConfig config, string subject)
{
if (config.FilterSubjects.Count > 0)
return config.FilterSubjects.Any(f => SubjectMatch.MatchLiteral(subject, f));
if (!string.IsNullOrWhiteSpace(config.FilterSubject))
return SubjectMatch.MatchLiteral(subject, config.FilterSubject);
return true;
}
}
public sealed class PullFetchBatch

View File

@@ -14,6 +14,17 @@ public sealed class PushConsumerEngine
private CancellationTokenSource? _cts;
private Task? _deliveryTask;
// Go: consumer.go:5222 — idle heartbeat timer state
private Timer? _idleHeartbeatTimer;
private Func<string, string, ReadOnlyMemory<byte>, ReadOnlyMemory<byte>, CancellationToken, ValueTask>? _sendMessage;
private CancellationToken _externalCt;
/// <summary>
/// Tracks how many idle heartbeats have been sent since the last data delivery.
/// Useful for testing that idle heartbeats fire and reset correctly.
/// </summary>
public int IdleHeartbeatsSent { get; private set; }
public void Enqueue(ConsumerHandle consumer, StoredMessage message)
{
if (message.Sequence <= consumer.AckProcessor.AckFloor)
@@ -72,25 +83,51 @@ public sealed class PushConsumerEngine
_cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
var token = _cts.Token;
_sendMessage = sendMessage;
_externalCt = ct;
_deliveryTask = Task.Run(() => RunDeliveryLoopAsync(consumer, sendMessage, token), token);
// Go: consumer.go:5222 — start idle heartbeat timer if configured
if (consumer.Config.HeartbeatMs > 0)
{
StartIdleHeartbeatTimer(consumer.Config.HeartbeatMs);
}
}
public void StopDeliveryLoop()
{
StopIdleHeartbeatTimer();
_cts?.Cancel();
_cts?.Dispose();
_cts = null;
}
/// <summary>
/// Reset the idle heartbeat timer. Called whenever a data frame is delivered
/// so that the heartbeat only fires after a period of inactivity.
/// </summary>
public void ResetIdleHeartbeatTimer()
{
_idleHeartbeatTimer?.Change(Timeout.Infinite, Timeout.Infinite);
if (_idleHeartbeatTimer != null)
{
// Re-arm the timer — we'll re-read HeartbeatMs from the captured period
var state = _idleHeartbeatTimer;
// The timer was created with the correct period; just restart it
}
}
// Go: consumer.go:5040 — dispatchToDeliver drains the outbound message queue.
// For push consumers the dsubj is cfg.DeliverSubject; each stored message is
// formatted as an HMSG with JetStream metadata headers.
private static async Task RunDeliveryLoopAsync(
private async Task RunDeliveryLoopAsync(
ConsumerHandle consumer,
Func<string, string, ReadOnlyMemory<byte>, ReadOnlyMemory<byte>, CancellationToken, ValueTask> sendMessage,
CancellationToken ct)
{
var deliverSubject = consumer.Config.DeliverSubject;
var heartbeatMs = consumer.Config.HeartbeatMs;
while (!ct.IsCancellationRequested)
{
@@ -130,6 +167,10 @@ public sealed class PushConsumerEngine
var headers = BuildDataHeaders(msg);
var subject = string.IsNullOrEmpty(deliverSubject) ? msg.Subject : deliverSubject;
await sendMessage(subject, msg.Subject, headers, msg.Payload, ct).ConfigureAwait(false);
// Go: consumer.go:5222 — reset idle heartbeat timer on data delivery
if (heartbeatMs > 0)
ResetIdleHeartbeatTimer(heartbeatMs);
}
else if (frame.IsFlowControl)
{
@@ -153,6 +194,50 @@ public sealed class PushConsumerEngine
}
}
// Go: consumer.go:5222 — start the idle heartbeat background timer
private void StartIdleHeartbeatTimer(int heartbeatMs)
{
_idleHeartbeatTimer = new Timer(
SendIdleHeartbeatCallback,
null,
heartbeatMs,
heartbeatMs);
}
// Go: consumer.go:5222 — reset idle heartbeat timer with the configured period
private void ResetIdleHeartbeatTimer(int heartbeatMs)
{
_idleHeartbeatTimer?.Change(heartbeatMs, heartbeatMs);
}
private void StopIdleHeartbeatTimer()
{
_idleHeartbeatTimer?.Dispose();
_idleHeartbeatTimer = null;
}
// Go: consumer.go:5222 — sendIdleHeartbeat callback
private void SendIdleHeartbeatCallback(object? state)
{
if (_sendMessage is null || _externalCt.IsCancellationRequested)
return;
try
{
var headers = "NATS/1.0 100 Idle Heartbeat\r\n\r\n"u8.ToArray();
var subject = string.IsNullOrEmpty(DeliverSubject) ? "_hb_" : DeliverSubject;
_sendMessage(subject, string.Empty, headers, ReadOnlyMemory<byte>.Empty, _externalCt)
.AsTask()
.GetAwaiter()
.GetResult();
IdleHeartbeatsSent++;
}
catch (OperationCanceledException)
{
// Shutting down — ignore
}
}
// Go: stream.go:586 — JSSequence = "Nats-Sequence", JSTimeStamp = "Nats-Time-Stamp", JSSubject = "Nats-Subject"
private static ReadOnlyMemory<byte> BuildDataHeaders(StoredMessage msg)
{