fix: stabilize pull consumer expires timeout fetch

This commit is contained in:
Joseph Doherty
2026-03-13 10:29:02 -04:00
parent b2707a7493
commit 655ca30e0b

View File

@@ -95,9 +95,6 @@ public sealed class CompiledFilter
public sealed class PullConsumerEngine public sealed class PullConsumerEngine
{ {
// Reusable fetch buffer to avoid per-fetch List allocation.
// Go reference: consumer.go — reuses slice backing array across fetch calls.
[ThreadStatic] private static List<StoredMessage>? t_fetchBuf;
// Go: consumer.go — cluster-wide pending pull request tracking keyed by reply subject. // Go: consumer.go — cluster-wide pending pull request tracking keyed by reply subject.
// Reference: golang/nats-server/server/consumer.go waitingRequestsPending / proposeWaitingRequest // Reference: golang/nats-server/server/consumer.go waitingRequestsPending / proposeWaitingRequest
private readonly ConcurrentDictionary<string, PullWaitingRequest> _clusterPending = private readonly ConcurrentDictionary<string, PullWaitingRequest> _clusterPending =
@@ -158,11 +155,7 @@ public sealed class PullConsumerEngine
public async ValueTask<PullFetchBatch> FetchAsync(StreamHandle stream, ConsumerHandle consumer, PullFetchRequest request, CancellationToken ct) public async ValueTask<PullFetchBatch> FetchAsync(StreamHandle stream, ConsumerHandle consumer, PullFetchRequest request, CancellationToken ct)
{ {
var batch = Math.Max(request.Batch, 1); var batch = Math.Max(request.Batch, 1);
// Use thread-static buffer to avoid per-fetch List allocation. var messages = new List<StoredMessage>(batch);
// Results are snapshot'd into PullFetchBatch before the buffer is reused.
var messages = t_fetchBuf ??= new List<StoredMessage>();
messages.Clear();
if (messages.Capacity < batch) messages.Capacity = batch;
// Go: consumer.go — enforce ExpiresMs timeout on pull fetch requests. // Go: consumer.go — enforce ExpiresMs timeout on pull fetch requests.
// When ExpiresMs > 0, create a linked CancellationTokenSource that fires // When ExpiresMs > 0, create a linked CancellationTokenSource that fires
@@ -313,6 +306,7 @@ public sealed class PullConsumerEngine
await Task.Delay(5, ct).ConfigureAwait(false); await Task.Delay(5, ct).ConfigureAwait(false);
} }
ct.ThrowIfCancellationRequested();
return null; return null;
} }