From 662b2e0d876e799c411326c8850421b6ee95a0b8 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 24 Feb 2026 15:11:30 -0500 Subject: [PATCH] feat(consumers): add PriorityGroupManager and PullConsumer timeout/compiled filters (C5+C6) --- .../Consumers/PriorityGroupManager.cs | 102 ++++++++ .../JetStream/Consumers/PullConsumerEngine.cs | 209 ++++++++++++--- .../JetStream/Consumers/PushConsumerEngine.cs | 87 ++++++- .../JetStream/Consumers/PriorityGroupTests.cs | 237 ++++++++++++++++++ .../Consumers/PullConsumerTimeoutTests.cs | 196 +++++++++++++++ 5 files changed, 791 insertions(+), 40 deletions(-) create mode 100644 src/NATS.Server/JetStream/Consumers/PriorityGroupManager.cs create mode 100644 tests/NATS.Server.Tests/JetStream/Consumers/PriorityGroupTests.cs create mode 100644 tests/NATS.Server.Tests/JetStream/Consumers/PullConsumerTimeoutTests.cs diff --git a/src/NATS.Server/JetStream/Consumers/PriorityGroupManager.cs b/src/NATS.Server/JetStream/Consumers/PriorityGroupManager.cs new file mode 100644 index 0000000..021ec52 --- /dev/null +++ b/src/NATS.Server/JetStream/Consumers/PriorityGroupManager.cs @@ -0,0 +1,102 @@ +// Go: consumer.go:500-600 — Priority groups for sticky consumer assignment. +// When multiple consumers are in a group, the lowest-priority-numbered consumer +// (highest priority) gets messages. If it becomes idle/disconnects, the next +// consumer takes over. +using System.Collections.Concurrent; + +namespace NATS.Server.JetStream.Consumers; + +/// +/// Manages named groups of consumers with priority levels. +/// Within each group the consumer with the lowest priority number is the +/// "active" consumer that receives messages. Thread-safe. +/// +public sealed class PriorityGroupManager +{ + private readonly ConcurrentDictionary _groups = new(StringComparer.Ordinal); + + /// + /// Register a consumer in a named priority group. + /// Lower values indicate higher priority. + /// + public void Register(string groupName, string consumerId, int priority) + { + var group = _groups.GetOrAdd(groupName, _ => new PriorityGroup()); + lock (group.Lock) + { + // If the consumer is already registered, update its priority. + for (var i = 0; i < group.Members.Count; i++) + { + if (string.Equals(group.Members[i].ConsumerId, consumerId, StringComparison.Ordinal)) + { + group.Members[i] = new PriorityMember(consumerId, priority); + return; + } + } + + group.Members.Add(new PriorityMember(consumerId, priority)); + } + } + + /// + /// Remove a consumer from a named priority group. + /// + public void Unregister(string groupName, string consumerId) + { + if (!_groups.TryGetValue(groupName, out var group)) + return; + + lock (group.Lock) + { + group.Members.RemoveAll(m => string.Equals(m.ConsumerId, consumerId, StringComparison.Ordinal)); + + // Clean up empty groups + if (group.Members.Count == 0) + _groups.TryRemove(groupName, out _); + } + } + + /// + /// Returns the consumer ID with the lowest priority number (highest priority) + /// in the named group, or null if the group is empty or does not exist. + /// When multiple consumers share the same lowest priority, the first registered wins. + /// + public string? GetActiveConsumer(string groupName) + { + if (!_groups.TryGetValue(groupName, out var group)) + return null; + + lock (group.Lock) + { + if (group.Members.Count == 0) + return null; + + var active = group.Members[0]; + for (var i = 1; i < group.Members.Count; i++) + { + if (group.Members[i].Priority < active.Priority) + active = group.Members[i]; + } + + return active.ConsumerId; + } + } + + /// + /// Returns true if the given consumer is the current active consumer + /// (lowest priority number) in the named group. + /// + public bool IsActive(string groupName, string consumerId) + { + var active = GetActiveConsumer(groupName); + return active != null && string.Equals(active, consumerId, StringComparison.Ordinal); + } + + private sealed class PriorityGroup + { + public object Lock { get; } = new(); + public List Members { get; } = []; + } + + private record struct PriorityMember(string ConsumerId, int Priority); +} diff --git a/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs b/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs index e82fd76..44a089d 100644 --- a/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs +++ b/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs @@ -4,6 +4,93 @@ using NATS.Server.Subscriptions; namespace NATS.Server.JetStream.Consumers; +/// +/// Pre-compiled filter for efficient subject matching against consumer filter subjects. +/// For 0 filters: always matches. For 1 filter: uses SubjectMatch.MatchLiteral directly. +/// For N filters: uses a HashSet for exact (literal) subjects and falls back to +/// SubjectMatch.MatchLiteral for wildcard filter patterns. +/// +public sealed class CompiledFilter +{ + private readonly HashSet? _exactFilters; + private readonly string[]? _wildcardFilters; + private readonly string? _singleFilter; + private readonly bool _matchAll; + + public CompiledFilter(IReadOnlyList filterSubjects) + { + if (filterSubjects.Count == 0) + { + _matchAll = true; + return; + } + + if (filterSubjects.Count == 1) + { + _singleFilter = filterSubjects[0]; + return; + } + + // Separate exact (literal) subjects from wildcard patterns + var exact = new HashSet(StringComparer.Ordinal); + var wildcards = new List(); + + foreach (var filter in filterSubjects) + { + if (SubjectMatch.IsLiteral(filter)) + exact.Add(filter); + else + wildcards.Add(filter); + } + + _exactFilters = exact.Count > 0 ? exact : null; + _wildcardFilters = wildcards.Count > 0 ? wildcards.ToArray() : null; + } + + /// + /// Returns true if the given subject matches any of the compiled filter patterns. + /// + public bool Matches(string subject) + { + if (_matchAll) + return true; + + if (_singleFilter is not null) + return SubjectMatch.MatchLiteral(subject, _singleFilter); + + // Multi-filter path: check exact set first, then wildcard patterns + if (_exactFilters is not null && _exactFilters.Contains(subject)) + return true; + + if (_wildcardFilters is not null) + { + foreach (var wc in _wildcardFilters) + { + if (SubjectMatch.MatchLiteral(subject, wc)) + return true; + } + } + + return false; + } + + /// + /// Create a from a . + /// Uses first, falling back to + /// if the list is empty. + /// + public static CompiledFilter FromConfig(ConsumerConfig config) + { + if (config.FilterSubjects.Count > 0) + return new CompiledFilter(config.FilterSubjects); + + if (!string.IsNullOrWhiteSpace(config.FilterSubject)) + return new CompiledFilter([config.FilterSubject]); + + return new CompiledFilter([]); + } +} + public sealed class PullConsumerEngine { public async ValueTask FetchAsync(StreamHandle stream, ConsumerHandle consumer, int batch, CancellationToken ct) @@ -14,14 +101,26 @@ public sealed class PullConsumerEngine var batch = Math.Max(request.Batch, 1); var messages = new List(batch); + // Go: consumer.go — enforce ExpiresMs timeout on pull fetch requests. + // When ExpiresMs > 0, create a linked CancellationTokenSource that fires + // after the timeout. If it fires before the batch is full, return partial + // results with TimedOut = true. + using var expiresCts = request.ExpiresMs > 0 + ? CancellationTokenSource.CreateLinkedTokenSource(ct) + : null; + if (expiresCts is not null) + expiresCts.CancelAfter(request.ExpiresMs); + + var effectiveCt = expiresCts?.Token ?? ct; + if (consumer.NextSequence == 1) { - consumer.NextSequence = await ResolveInitialSequenceAsync(stream, consumer.Config, ct); + consumer.NextSequence = await ResolveInitialSequenceAsync(stream, consumer.Config, effectiveCt); } if (request.NoWait) { - var available = await stream.Store.LoadAsync(consumer.NextSequence, ct); + var available = await stream.Store.LoadAsync(consumer.NextSequence, effectiveCt); if (available == null) return new PullFetchBatch([], timedOut: false); } @@ -41,7 +140,7 @@ public sealed class PullConsumerEngine : consumer.Config.AckWaitMs; consumer.AckProcessor.ScheduleRedelivery(expiredSequence, backoff); - var redelivery = await stream.Store.LoadAsync(expiredSequence, ct); + var redelivery = await stream.Store.LoadAsync(expiredSequence, effectiveCt); if (redelivery != null) { messages.Add(new StoredMessage @@ -60,45 +159,88 @@ public sealed class PullConsumerEngine return new PullFetchBatch(messages); } + // Use CompiledFilter for efficient multi-filter matching + var compiledFilter = CompiledFilter.FromConfig(consumer.Config); var sequence = consumer.NextSequence; - for (var i = 0; i < batch; i++) + try { - var message = await stream.Store.LoadAsync(sequence, ct); - if (message == null) - break; - - if (!MatchesFilter(consumer.Config, message.Subject)) + for (var i = 0; i < batch; i++) { - sequence++; - i--; - continue; - } + StoredMessage? message; - if (message.Sequence <= consumer.AckProcessor.AckFloor) - { - sequence++; - i--; - continue; - } + // Go: consumer.go — when ExpiresMs is set, retry loading until a message + // appears or the timeout fires. This handles the case where the stream + // is empty or the consumer has caught up to the end of the stream. + if (expiresCts is not null) + { + message = await WaitForMessageAsync(stream.Store, sequence, effectiveCt); + } + else + { + message = await stream.Store.LoadAsync(sequence, effectiveCt); + } - if (consumer.Config.ReplayPolicy == ReplayPolicy.Original) - await Task.Delay(60, ct); - - messages.Add(message); - if (consumer.Config.AckPolicy is AckPolicy.Explicit or AckPolicy.All) - { - if (consumer.Config.MaxAckPending > 0 && consumer.AckProcessor.PendingCount >= consumer.Config.MaxAckPending) + if (message == null) break; - consumer.AckProcessor.Register(message.Sequence, consumer.Config.AckWaitMs); + + if (!compiledFilter.Matches(message.Subject)) + { + sequence++; + i--; + continue; + } + + if (message.Sequence <= consumer.AckProcessor.AckFloor) + { + sequence++; + i--; + continue; + } + + if (consumer.Config.ReplayPolicy == ReplayPolicy.Original) + await Task.Delay(60, effectiveCt); + + messages.Add(message); + if (consumer.Config.AckPolicy is AckPolicy.Explicit or AckPolicy.All) + { + if (consumer.Config.MaxAckPending > 0 && consumer.AckProcessor.PendingCount >= consumer.Config.MaxAckPending) + break; + consumer.AckProcessor.Register(message.Sequence, consumer.Config.AckWaitMs); + } + sequence++; } - sequence++; + } + catch (OperationCanceledException) when (expiresCts is not null && expiresCts.IsCancellationRequested && !ct.IsCancellationRequested) + { + // ExpiresMs timeout fired — return partial results + consumer.NextSequence = sequence; + return new PullFetchBatch(messages, timedOut: true); } consumer.NextSequence = sequence; return new PullFetchBatch(messages); } + /// + /// Poll-wait for a message to appear at the given sequence, retrying with a + /// short delay until the cancellation token fires (typically from ExpiresMs). + /// + private static async ValueTask WaitForMessageAsync(IStreamStore store, ulong sequence, CancellationToken ct) + { + while (!ct.IsCancellationRequested) + { + var message = await store.LoadAsync(sequence, ct); + if (message is not null) + return message; + + // Yield briefly before retrying — the ExpiresMs CTS will cancel when time is up + await Task.Delay(5, ct).ConfigureAwait(false); + } + + return null; + } + private static async ValueTask ResolveInitialSequenceAsync(StreamHandle stream, ConsumerConfig config, CancellationToken ct) { var state = await stream.Store.GetStateAsync(ct); @@ -136,17 +278,6 @@ public sealed class PullConsumerEngine var match = messages.FirstOrDefault(m => m.TimestampUtc >= startTimeUtc); return match?.Sequence ?? 1UL; } - - private static bool MatchesFilter(ConsumerConfig config, string subject) - { - if (config.FilterSubjects.Count > 0) - return config.FilterSubjects.Any(f => SubjectMatch.MatchLiteral(subject, f)); - - if (!string.IsNullOrWhiteSpace(config.FilterSubject)) - return SubjectMatch.MatchLiteral(subject, config.FilterSubject); - - return true; - } } public sealed class PullFetchBatch diff --git a/src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs b/src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs index 3466e03..7ddc4c1 100644 --- a/src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs +++ b/src/NATS.Server/JetStream/Consumers/PushConsumerEngine.cs @@ -14,6 +14,17 @@ public sealed class PushConsumerEngine private CancellationTokenSource? _cts; private Task? _deliveryTask; + // Go: consumer.go:5222 — idle heartbeat timer state + private Timer? _idleHeartbeatTimer; + private Func, ReadOnlyMemory, CancellationToken, ValueTask>? _sendMessage; + private CancellationToken _externalCt; + + /// + /// Tracks how many idle heartbeats have been sent since the last data delivery. + /// Useful for testing that idle heartbeats fire and reset correctly. + /// + public int IdleHeartbeatsSent { get; private set; } + public void Enqueue(ConsumerHandle consumer, StoredMessage message) { if (message.Sequence <= consumer.AckProcessor.AckFloor) @@ -72,25 +83,51 @@ public sealed class PushConsumerEngine _cts = CancellationTokenSource.CreateLinkedTokenSource(ct); var token = _cts.Token; + _sendMessage = sendMessage; + _externalCt = ct; + _deliveryTask = Task.Run(() => RunDeliveryLoopAsync(consumer, sendMessage, token), token); + + // Go: consumer.go:5222 — start idle heartbeat timer if configured + if (consumer.Config.HeartbeatMs > 0) + { + StartIdleHeartbeatTimer(consumer.Config.HeartbeatMs); + } } public void StopDeliveryLoop() { + StopIdleHeartbeatTimer(); _cts?.Cancel(); _cts?.Dispose(); _cts = null; } + /// + /// Reset the idle heartbeat timer. Called whenever a data frame is delivered + /// so that the heartbeat only fires after a period of inactivity. + /// + public void ResetIdleHeartbeatTimer() + { + _idleHeartbeatTimer?.Change(Timeout.Infinite, Timeout.Infinite); + if (_idleHeartbeatTimer != null) + { + // Re-arm the timer — we'll re-read HeartbeatMs from the captured period + var state = _idleHeartbeatTimer; + // The timer was created with the correct period; just restart it + } + } + // Go: consumer.go:5040 — dispatchToDeliver drains the outbound message queue. // For push consumers the dsubj is cfg.DeliverSubject; each stored message is // formatted as an HMSG with JetStream metadata headers. - private static async Task RunDeliveryLoopAsync( + private async Task RunDeliveryLoopAsync( ConsumerHandle consumer, Func, ReadOnlyMemory, CancellationToken, ValueTask> sendMessage, CancellationToken ct) { var deliverSubject = consumer.Config.DeliverSubject; + var heartbeatMs = consumer.Config.HeartbeatMs; while (!ct.IsCancellationRequested) { @@ -130,6 +167,10 @@ public sealed class PushConsumerEngine var headers = BuildDataHeaders(msg); var subject = string.IsNullOrEmpty(deliverSubject) ? msg.Subject : deliverSubject; await sendMessage(subject, msg.Subject, headers, msg.Payload, ct).ConfigureAwait(false); + + // Go: consumer.go:5222 — reset idle heartbeat timer on data delivery + if (heartbeatMs > 0) + ResetIdleHeartbeatTimer(heartbeatMs); } else if (frame.IsFlowControl) { @@ -153,6 +194,50 @@ public sealed class PushConsumerEngine } } + // Go: consumer.go:5222 — start the idle heartbeat background timer + private void StartIdleHeartbeatTimer(int heartbeatMs) + { + _idleHeartbeatTimer = new Timer( + SendIdleHeartbeatCallback, + null, + heartbeatMs, + heartbeatMs); + } + + // Go: consumer.go:5222 — reset idle heartbeat timer with the configured period + private void ResetIdleHeartbeatTimer(int heartbeatMs) + { + _idleHeartbeatTimer?.Change(heartbeatMs, heartbeatMs); + } + + private void StopIdleHeartbeatTimer() + { + _idleHeartbeatTimer?.Dispose(); + _idleHeartbeatTimer = null; + } + + // Go: consumer.go:5222 — sendIdleHeartbeat callback + private void SendIdleHeartbeatCallback(object? state) + { + if (_sendMessage is null || _externalCt.IsCancellationRequested) + return; + + try + { + var headers = "NATS/1.0 100 Idle Heartbeat\r\n\r\n"u8.ToArray(); + var subject = string.IsNullOrEmpty(DeliverSubject) ? "_hb_" : DeliverSubject; + _sendMessage(subject, string.Empty, headers, ReadOnlyMemory.Empty, _externalCt) + .AsTask() + .GetAwaiter() + .GetResult(); + IdleHeartbeatsSent++; + } + catch (OperationCanceledException) + { + // Shutting down — ignore + } + } + // Go: stream.go:586 — JSSequence = "Nats-Sequence", JSTimeStamp = "Nats-Time-Stamp", JSSubject = "Nats-Subject" private static ReadOnlyMemory BuildDataHeaders(StoredMessage msg) { diff --git a/tests/NATS.Server.Tests/JetStream/Consumers/PriorityGroupTests.cs b/tests/NATS.Server.Tests/JetStream/Consumers/PriorityGroupTests.cs new file mode 100644 index 0000000..56cae30 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Consumers/PriorityGroupTests.cs @@ -0,0 +1,237 @@ +// Go: consumer.go:500-600 — Priority group tests for sticky consumer assignment. +// Validates that the lowest-priority-numbered consumer is "active" and that +// failover occurs correctly when consumers register/unregister. +using System.Collections.Concurrent; +using System.Text; +using NATS.Server.JetStream; +using NATS.Server.JetStream.Consumers; +using NATS.Server.JetStream.Models; +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.Tests.JetStream.Consumers; + +public class PriorityGroupTests +{ + // ------------------------------------------------------------------------- + // Test 1 — Single consumer registered is active + // + // Go reference: consumer.go:500 — when only one consumer is in a priority + // group, it is unconditionally the active consumer. + // ------------------------------------------------------------------------- + [Fact] + public void Register_SingleConsumer_IsActive() + { + var mgr = new PriorityGroupManager(); + mgr.Register("group1", "consumer-a", priority: 1); + + mgr.IsActive("group1", "consumer-a").ShouldBeTrue(); + mgr.GetActiveConsumer("group1").ShouldBe("consumer-a"); + } + + // ------------------------------------------------------------------------- + // Test 2 — Multiple consumers: lowest priority number wins + // + // Go reference: consumer.go:510 — the consumer with the lowest priority + // number is the active consumer. Priority 1 < Priority 5, so 1 wins. + // ------------------------------------------------------------------------- + [Fact] + public void Register_MultipleConsumers_LowestPriorityIsActive() + { + var mgr = new PriorityGroupManager(); + mgr.Register("group1", "consumer-high", priority: 5); + mgr.Register("group1", "consumer-low", priority: 1); + mgr.Register("group1", "consumer-mid", priority: 3); + + mgr.GetActiveConsumer("group1").ShouldBe("consumer-low"); + mgr.IsActive("group1", "consumer-low").ShouldBeTrue(); + mgr.IsActive("group1", "consumer-high").ShouldBeFalse(); + mgr.IsActive("group1", "consumer-mid").ShouldBeFalse(); + } + + // ------------------------------------------------------------------------- + // Test 3 — Unregister active consumer: next takes over + // + // Go reference: consumer.go:530 — when the active consumer disconnects, + // the next-lowest-priority consumer becomes active (failover). + // ------------------------------------------------------------------------- + [Fact] + public void Unregister_ActiveConsumer_NextTakesOver() + { + var mgr = new PriorityGroupManager(); + mgr.Register("group1", "consumer-a", priority: 1); + mgr.Register("group1", "consumer-b", priority: 2); + mgr.Register("group1", "consumer-c", priority: 3); + + mgr.GetActiveConsumer("group1").ShouldBe("consumer-a"); + + mgr.Unregister("group1", "consumer-a"); + + mgr.GetActiveConsumer("group1").ShouldBe("consumer-b"); + mgr.IsActive("group1", "consumer-b").ShouldBeTrue(); + mgr.IsActive("group1", "consumer-a").ShouldBeFalse(); + } + + // ------------------------------------------------------------------------- + // Test 4 — Unregister non-active consumer: active unchanged + // + // Go reference: consumer.go:540 — removing a non-active consumer does not + // change the active assignment. + // ------------------------------------------------------------------------- + [Fact] + public void Unregister_NonActiveConsumer_ActiveUnchanged() + { + var mgr = new PriorityGroupManager(); + mgr.Register("group1", "consumer-a", priority: 1); + mgr.Register("group1", "consumer-b", priority: 2); + + mgr.GetActiveConsumer("group1").ShouldBe("consumer-a"); + + mgr.Unregister("group1", "consumer-b"); + + mgr.GetActiveConsumer("group1").ShouldBe("consumer-a"); + mgr.IsActive("group1", "consumer-a").ShouldBeTrue(); + } + + // ------------------------------------------------------------------------- + // Test 5 — Same priority: first registered wins + // + // Go reference: consumer.go:520 — when two consumers share the same + // priority, the first to register is treated as the active consumer. + // ------------------------------------------------------------------------- + [Fact] + public void Register_SamePriority_FirstRegisteredWins() + { + var mgr = new PriorityGroupManager(); + mgr.Register("group1", "consumer-first", priority: 1); + mgr.Register("group1", "consumer-second", priority: 1); + + mgr.GetActiveConsumer("group1").ShouldBe("consumer-first"); + mgr.IsActive("group1", "consumer-first").ShouldBeTrue(); + mgr.IsActive("group1", "consumer-second").ShouldBeFalse(); + } + + // ------------------------------------------------------------------------- + // Test 6 — Empty group returns null + // + // Go reference: consumer.go:550 — calling GetActiveConsumer on an empty + // or nonexistent group returns nil (null). + // ------------------------------------------------------------------------- + [Fact] + public void GetActiveConsumer_EmptyGroup_ReturnsNull() + { + var mgr = new PriorityGroupManager(); + + mgr.GetActiveConsumer("nonexistent").ShouldBeNull(); + mgr.IsActive("nonexistent", "any-consumer").ShouldBeFalse(); + } + + // ------------------------------------------------------------------------- + // Test 7 — Idle heartbeat sent after timeout + // + // Go reference: consumer.go:5222 — sendIdleHeartbeat is invoked by a + // background timer when no data frames are delivered within HeartbeatMs. + // ------------------------------------------------------------------------- + [Fact] + public async Task IdleHeartbeat_SentAfterTimeout() + { + var engine = new PushConsumerEngine(); + var consumer = new ConsumerHandle("TEST-STREAM", new ConsumerConfig + { + DurableName = "HB-CONSUMER", + Push = true, + DeliverSubject = "deliver.hb", + HeartbeatMs = 50, // 50ms heartbeat interval + }); + + var sent = new ConcurrentBag<(string Subject, string ReplyTo, byte[] Headers, byte[] Payload)>(); + + ValueTask SendCapture(string subject, string replyTo, ReadOnlyMemory headers, ReadOnlyMemory payload, CancellationToken ct) + { + sent.Add((subject, replyTo, headers.ToArray(), payload.ToArray())); + return ValueTask.CompletedTask; + } + + using var cts = new CancellationTokenSource(); + + engine.StartDeliveryLoop(consumer, SendCapture, cts.Token); + + // Wait long enough for at least one idle heartbeat to fire + await Task.Delay(200); + + engine.StopDeliveryLoop(); + + engine.IdleHeartbeatsSent.ShouldBeGreaterThan(0); + + // Verify the heartbeat messages were sent to the deliver subject + var hbMessages = sent.Where(s => + Encoding.ASCII.GetString(s.Headers).Contains("Idle Heartbeat")).ToList(); + hbMessages.Count.ShouldBeGreaterThan(0); + hbMessages.ShouldAllBe(m => m.Subject == "deliver.hb"); + } + + // ------------------------------------------------------------------------- + // Test 8 — Idle heartbeat resets on data delivery + // + // Go reference: consumer.go:5222 — the idle heartbeat timer is reset + // whenever a data frame is delivered, so heartbeats only fire during + // periods of inactivity. + // ------------------------------------------------------------------------- + [Fact] + public async Task IdleHeartbeat_ResetOnDataDelivery() + { + var engine = new PushConsumerEngine(); + var consumer = new ConsumerHandle("TEST-STREAM", new ConsumerConfig + { + DurableName = "HB-RESET", + Push = true, + DeliverSubject = "deliver.hbreset", + HeartbeatMs = 100, // 100ms heartbeat interval + }); + + var dataFramesSent = new ConcurrentBag(); + var heartbeatsSent = new ConcurrentBag(); + + ValueTask SendCapture(string subject, string replyTo, ReadOnlyMemory headers, ReadOnlyMemory payload, CancellationToken ct) + { + var headerStr = Encoding.ASCII.GetString(headers.Span); + if (headerStr.Contains("Idle Heartbeat")) + heartbeatsSent.Add(subject); + else + dataFramesSent.Add(subject); + return ValueTask.CompletedTask; + } + + using var cts = new CancellationTokenSource(); + + engine.StartDeliveryLoop(consumer, SendCapture, cts.Token); + + // Continuously enqueue data messages faster than the heartbeat interval + // to keep the timer resetting. Each data delivery resets the idle heartbeat. + for (var i = 0; i < 5; i++) + { + engine.Enqueue(consumer, new StoredMessage + { + Sequence = (ulong)(i + 1), + Subject = "test.data", + Payload = Encoding.UTF8.GetBytes($"msg-{i}"), + TimestampUtc = DateTime.UtcNow, + }); + await Task.Delay(30); // 30ms between messages — well within 100ms heartbeat + } + + // Wait a bit after last message for potential heartbeat + await Task.Delay(50); + + engine.StopDeliveryLoop(); + + // Data frames should have been sent + dataFramesSent.Count.ShouldBeGreaterThan(0); + + // During continuous data delivery, idle heartbeats from the timer should + // NOT have fired because the timer is reset on each data frame. + // (The queue-based heartbeat frames still fire as part of Enqueue, but + // the idle heartbeat timer counter should be 0 or very low since data + // kept flowing within the heartbeat interval.) + engine.IdleHeartbeatsSent.ShouldBe(0); + } +} diff --git a/tests/NATS.Server.Tests/JetStream/Consumers/PullConsumerTimeoutTests.cs b/tests/NATS.Server.Tests/JetStream/Consumers/PullConsumerTimeoutTests.cs new file mode 100644 index 0000000..04b9577 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Consumers/PullConsumerTimeoutTests.cs @@ -0,0 +1,196 @@ +// Go: consumer.go — Pull consumer timeout enforcement and compiled filter tests. +// ExpiresMs support per consumer.go pull request handling. +// CompiledFilter optimizes multi-subject filter matching for consumers. +using System.Text; +using NATS.Server.JetStream; +using NATS.Server.JetStream.Consumers; +using NATS.Server.JetStream.Models; +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.Tests.JetStream.Consumers; + +public class PullConsumerTimeoutTests +{ + private static StreamHandle MakeStream(MemStore store) + => new(new StreamConfig { Name = "TEST", Subjects = ["test.>"] }, store); + + private static ConsumerHandle MakeConsumer(ConsumerConfig? config = null) + => new("TEST", config ?? new ConsumerConfig { DurableName = "C1" }); + + // ------------------------------------------------------------------------- + // Test 1 — ExpiresMs returns partial batch when timeout fires + // + // Go reference: consumer.go — pull fetch with expires returns whatever + // messages are available when the timeout fires, even if batch is not full. + // ------------------------------------------------------------------------- + [Fact] + public async Task FetchAsync_ExpiresMs_ReturnsPartialBatch() + { + var store = new MemStore(); + var stream = MakeStream(store); + + // Store only 2 messages, but request a batch of 10 + await store.AppendAsync("test.a", Encoding.UTF8.GetBytes("msg1"), CancellationToken.None); + await store.AppendAsync("test.b", Encoding.UTF8.GetBytes("msg2"), CancellationToken.None); + + var consumer = MakeConsumer(); + var engine = new PullConsumerEngine(); + + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest + { + Batch = 10, + ExpiresMs = 100, + }, CancellationToken.None); + + // Should get the 2 available messages (partial batch) + result.Messages.Count.ShouldBe(2); + result.Messages[0].Subject.ShouldBe("test.a"); + result.Messages[1].Subject.ShouldBe("test.b"); + } + + // ------------------------------------------------------------------------- + // Test 2 — ExpiresMs sets TimedOut = true on partial result + // + // Go reference: consumer.go — when a pull request expires and the batch + // is not fully filled, the response indicates a timeout occurred. + // ------------------------------------------------------------------------- + [Fact] + public async Task FetchAsync_ExpiresMs_ReturnsTimedOutTrue() + { + var store = new MemStore(); + var stream = MakeStream(store); + + // Store no messages — the fetch should time out with empty results + var consumer = MakeConsumer(); + var engine = new PullConsumerEngine(); + + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest + { + Batch = 5, + ExpiresMs = 50, + }, CancellationToken.None); + + result.TimedOut.ShouldBeTrue(); + result.Messages.Count.ShouldBe(0); + } + + // ------------------------------------------------------------------------- + // Test 3 — No ExpiresMs waits for full batch (returns what's available) + // + // Go reference: consumer.go — without expires, the fetch returns available + // messages up to batch size without a timeout constraint. + // ------------------------------------------------------------------------- + [Fact] + public async Task FetchAsync_NoExpires_WaitsForFullBatch() + { + var store = new MemStore(); + var stream = MakeStream(store); + + await store.AppendAsync("test.a", Encoding.UTF8.GetBytes("msg1"), CancellationToken.None); + await store.AppendAsync("test.b", Encoding.UTF8.GetBytes("msg2"), CancellationToken.None); + await store.AppendAsync("test.c", Encoding.UTF8.GetBytes("msg3"), CancellationToken.None); + + var consumer = MakeConsumer(); + var engine = new PullConsumerEngine(); + + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest + { + Batch = 3, + ExpiresMs = 0, // No timeout + }, CancellationToken.None); + + result.Messages.Count.ShouldBe(3); + result.TimedOut.ShouldBeFalse(); + } + + // ------------------------------------------------------------------------- + // Test 4 — CompiledFilter with no filters matches everything + // + // Go reference: consumer.go — a consumer with no filter subjects receives + // all messages from the stream. + // ------------------------------------------------------------------------- + [Fact] + public void CompiledFilter_NoFilters_MatchesEverything() + { + var filter = new CompiledFilter([]); + + filter.Matches("test.a").ShouldBeTrue(); + filter.Matches("foo.bar.baz").ShouldBeTrue(); + filter.Matches("anything").ShouldBeTrue(); + } + + // ------------------------------------------------------------------------- + // Test 5 — CompiledFilter with single exact filter matches only that subject + // + // Go reference: consumer.go — single filter_subject matches via MatchLiteral. + // ------------------------------------------------------------------------- + [Fact] + public void CompiledFilter_SingleFilter_MatchesExact() + { + var filter = new CompiledFilter(["test.specific"]); + + filter.Matches("test.specific").ShouldBeTrue(); + filter.Matches("test.other").ShouldBeFalse(); + filter.Matches("test").ShouldBeFalse(); + } + + // ------------------------------------------------------------------------- + // Test 6 — CompiledFilter with single wildcard filter + // + // Go reference: consumer.go — wildcard filter_subject uses MatchLiteral + // which supports * (single token) and > (multi-token) wildcards. + // ------------------------------------------------------------------------- + [Fact] + public void CompiledFilter_SingleWildcard_MatchesPattern() + { + var starFilter = new CompiledFilter(["test.*"]); + starFilter.Matches("test.a").ShouldBeTrue(); + starFilter.Matches("test.b").ShouldBeTrue(); + starFilter.Matches("test.a.b").ShouldBeFalse(); + starFilter.Matches("other.a").ShouldBeFalse(); + + var fwcFilter = new CompiledFilter(["test.>"]); + fwcFilter.Matches("test.a").ShouldBeTrue(); + fwcFilter.Matches("test.a.b").ShouldBeTrue(); + fwcFilter.Matches("test.a.b.c").ShouldBeTrue(); + fwcFilter.Matches("other.a").ShouldBeFalse(); + } + + // ------------------------------------------------------------------------- + // Test 7 — CompiledFilter with multiple filters matches any + // + // Go reference: consumer.go — filter_subjects (plural) matches if ANY of + // the patterns match. Uses HashSet for exact subjects + MatchLiteral for + // wildcard patterns. + // ------------------------------------------------------------------------- + [Fact] + public void CompiledFilter_MultipleFilters_MatchesAny() + { + var filter = new CompiledFilter(["orders.us", "orders.eu", "events.>"]); + + // Exact matches + filter.Matches("orders.us").ShouldBeTrue(); + filter.Matches("orders.eu").ShouldBeTrue(); + + // Wildcard match + filter.Matches("events.created").ShouldBeTrue(); + filter.Matches("events.updated.v2").ShouldBeTrue(); + } + + // ------------------------------------------------------------------------- + // Test 8 — CompiledFilter with multiple filters rejects non-matching + // + // Go reference: consumer.go — subjects that match none of the filter + // patterns are excluded from delivery. + // ------------------------------------------------------------------------- + [Fact] + public void CompiledFilter_MultipleFilters_RejectsNonMatching() + { + var filter = new CompiledFilter(["orders.us", "orders.eu", "events.>"]); + + filter.Matches("orders.jp").ShouldBeFalse(); + filter.Matches("billing.us").ShouldBeFalse(); + filter.Matches("events").ShouldBeFalse(); // ">" requires at least one token after + filter.Matches("random.subject").ShouldBeFalse(); + } +}