using System.Collections.Concurrent;

namespace NATS.Server.JetStream.Api;

/// <summary>
/// Tracks pending clustered JetStream API requests, correlates RAFT apply callbacks with
/// waiting callers, and enforces per-request timeouts.
/// Go reference: jetstream_cluster.go:7620-7701 — jsClusteredStreamRequest proposes an entry
/// to the meta RAFT group and waits for the leader to apply it; the result is delivered via
/// a per-request channel. This class models that channel-per-request pattern using
/// <see cref="TaskCompletionSource{TResult}"/>.
/// </summary>
/// <remarks>
/// Entry lifecycle: <see cref="RegisterPending"/> adds an entry; <see cref="DeliverResult"/>,
/// a timeout, or caller cancellation completes it; <see cref="WaitForResultAsync"/> removes it
/// once the waiter has observed the outcome. Because completed entries are kept until a waiter
/// claims them, a result delivered before the caller starts waiting is not lost (like a buffered
/// Go channel). <see cref="CancelAll"/> removes every entry, including results that were
/// delivered but not yet claimed. An entry that is registered but never awaited remains in the
/// table until <see cref="CancelAll"/> runs.
/// </remarks>
public sealed class ClusteredRequestProcessor
{
    private static readonly TimeSpan DefaultTimeout = TimeSpan.FromSeconds(5);

    private readonly ConcurrentDictionary<string, PendingRequest> _pending = new();
    private readonly TimeSpan _timeout;

    // Number of registered requests that have not been completed yet. Each request is
    // decremented exactly once, by whichever path wins PendingRequest.TryClaim, so the
    // value cannot drift when a delivery races a timeout.
    private int _pendingCount;

    /// <summary>Creates a processor with the given per-request wait timeout.</summary>
    /// <param name="timeout">How long <see cref="WaitForResultAsync"/> waits; 5 seconds when null.</param>
    public ClusteredRequestProcessor(TimeSpan? timeout = null)
    {
        _timeout = timeout ?? DefaultTimeout;
    }

    /// <summary>Current number of in-flight pending requests (registered and not yet completed).</summary>
    public int PendingCount => Volatile.Read(ref _pendingCount);

    /// <summary>
    /// Registers a new pending request and returns a unique correlation ID.
    /// Go reference: jetstream_cluster.go:7620 — each clustered request gets a unique ID
    /// used to correlate the RAFT apply callback with the waiting caller.
    /// </summary>
    /// <returns>A 32-character hex correlation ID.</returns>
    public string RegisterPending()
    {
        var id = Guid.NewGuid().ToString("N");

        // Count before publishing the entry so no completion can decrement ahead of this increment.
        Interlocked.Increment(ref _pendingCount);
        _pending[id] = new PendingRequest();
        return id;
    }

    /// <summary>
    /// Waits for a result to be delivered for the given request ID.
    /// Returns a 408 error if no result is delivered within the configured timeout (or
    /// <paramref name="ct"/> is cancelled), or a 500 error if the ID is not registered.
    /// A result delivered before this method is called is returned immediately.
    /// Go reference: jetstream_cluster.go:7620 — the goroutine waits on a per-request channel
    /// with a context deadline derived from the cluster's JSApiTimeout option.
    /// </summary>
    /// <param name="requestId">ID previously returned by <see cref="RegisterPending"/>.</param>
    /// <param name="ct">Optional caller cancellation; treated the same as a timeout.</param>
    public async Task<JetStreamApiResponse> WaitForResultAsync(string requestId, CancellationToken ct = default)
    {
        if (!_pending.TryGetValue(requestId, out var request))
        {
            return JetStreamApiResponse.ErrorResponse(500, "request id not found");
        }

        using var timeoutCts = new CancellationTokenSource(_timeout);
        using var linked = CancellationTokenSource.CreateLinkedTokenSource(timeoutCts.Token, ct);
        var token = linked.Token;

        try
        {
            // If a result already arrived, the claim has been taken and this callback is a no-op.
            await using var reg = token.Register(() => TryCancel(request, token));
            return await request.Completion.Task.ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            return JetStreamApiResponse.ErrorResponse(408, "timeout waiting for cluster response");
        }
        finally
        {
            // The waiter owns removal: the outcome has now been observed, so the entry is no longer needed.
            _pending.TryRemove(requestId, out _);
        }
    }

    /// <summary>
    /// Delivers a result for a pending request. Returns true if the request was found and
    /// the result was accepted; false if the ID is unknown or already completed (delivered,
    /// timed out, or cancelled).
    /// Go reference: jetstream_cluster.go:7620 — the RAFT apply callback resolves the pending
    /// request channel so the waiting goroutine can return the response to the caller.
    /// </summary>
    /// <remarks>
    /// The entry is left in place so a caller that has not yet started waiting can still
    /// collect the result; <see cref="WaitForResultAsync"/> removes it.
    /// </remarks>
    public bool DeliverResult(string requestId, JetStreamApiResponse response)
    {
        return _pending.TryGetValue(requestId, out var request) && TryComplete(request, response);
    }

    /// <summary>
    /// Cancels all pending requests with a 503 error, typically called when this node loses
    /// RAFT leadership so callers do not hang indefinitely. Every entry is removed from the table,
    /// including results that were delivered but not yet claimed by a waiter.
    /// Go reference: jetstream_cluster.go — when RAFT leadership changes, all in-flight
    /// proposals must be failed with a "not leader" or "cancelled" error.
    /// </summary>
    /// <param name="reason">Error description placed in the 503 response.</param>
    public void CancelAll(string reason = "leadership changed")
    {
        foreach (var entry in _pending)
        {
            if (_pending.TryRemove(entry.Key, out var request))
            {
                // No-op (returns false) when the request already completed.
                TryComplete(request, JetStreamApiResponse.ErrorResponse(503, reason));
            }
        }
    }

    // Completes the request with a result if nobody has completed it yet.
    private bool TryComplete(PendingRequest request, JetStreamApiResponse response)
    {
        if (!request.TryClaim())
        {
            return false;
        }

        // Decrement before completing so a resumed waiter already observes the updated count.
        Interlocked.Decrement(ref _pendingCount);
        request.Completion.TrySetResult(response);
        return true;
    }

    // Cancels the request if nobody has completed it yet.
    private void TryCancel(PendingRequest request, CancellationToken token)
    {
        if (!request.TryClaim())
        {
            return;
        }

        Interlocked.Decrement(ref _pendingCount);
        request.Completion.TrySetCanceled(token);
    }

    /// <summary>A pending request: its completion source plus a single-winner claim flag.</summary>
    private sealed class PendingRequest
    {
        private int _claimed;

        public TaskCompletionSource<JetStreamApiResponse> Completion { get; } =
            new(TaskCreationOptions.RunContinuationsAsynchronously);

        /// <summary>Returns true for exactly one caller; later callers get false.</summary>
        public bool TryClaim() => Interlocked.Exchange(ref _claimed, 1) == 0;
    }
}