From 655ca30e0bbd0ca790872c606b9a58ae77235c74 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 13 Mar 2026 10:29:02 -0400 Subject: [PATCH] fix: stabilize pull consumer expires timeout fetch --- .../JetStream/Consumers/PullConsumerEngine.cs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs b/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs index 5d1bbbc..eb84e6d 100644 --- a/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs +++ b/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs @@ -95,9 +95,6 @@ public sealed class CompiledFilter public sealed class PullConsumerEngine { - // Reusable fetch buffer to avoid per-fetch List allocation. - // Go reference: consumer.go — reuses slice backing array across fetch calls. - [ThreadStatic] private static List? t_fetchBuf; // Go: consumer.go — cluster-wide pending pull request tracking keyed by reply subject. // Reference: golang/nats-server/server/consumer.go waitingRequestsPending / proposeWaitingRequest private readonly ConcurrentDictionary _clusterPending = @@ -158,11 +155,7 @@ public sealed class PullConsumerEngine public async ValueTask FetchAsync(StreamHandle stream, ConsumerHandle consumer, PullFetchRequest request, CancellationToken ct) { var batch = Math.Max(request.Batch, 1); - // Use thread-static buffer to avoid per-fetch List allocation. - // Results are snapshot'd into PullFetchBatch before the buffer is reused. - var messages = t_fetchBuf ??= new List(); - messages.Clear(); - if (messages.Capacity < batch) messages.Capacity = batch; + var messages = new List(batch); // Go: consumer.go — enforce ExpiresMs timeout on pull fetch requests. // When ExpiresMs > 0, create a linked CancellationTokenSource that fires @@ -313,6 +306,7 @@ public sealed class PullConsumerEngine await Task.Delay(5, ct).ConfigureAwait(false); } + ct.ThrowIfCancellationRequested(); return null; }