diff --git a/src/NATS.Server/JetStream/Api/ApiRateLimiter.cs b/src/NATS.Server/JetStream/Api/ApiRateLimiter.cs new file mode 100644 index 0000000..a8d04c2 --- /dev/null +++ b/src/NATS.Server/JetStream/Api/ApiRateLimiter.cs @@ -0,0 +1,108 @@ +using System.Collections.Concurrent; + +namespace NATS.Server.JetStream.Api; + +/// +/// Limits concurrent JetStream API requests and deduplicates by request ID. +/// Go reference: jetstream_api.go rate limiting and deduplication logic. +/// The Go server uses a semaphore (default 256 slots) to prevent request storms and a +/// dedup cache keyed by Nats-Msg-Id to serve identical requests without reprocessing. +/// +public sealed class ApiRateLimiter : IDisposable +{ + private readonly SemaphoreSlim _semaphore; + private readonly ConcurrentDictionary _dedupCache = new(); + private readonly TimeSpan _dedupTtl; + private readonly int _maxConcurrent; + + public ApiRateLimiter(int maxConcurrent = 256, TimeSpan? dedupTtl = null) + { + _maxConcurrent = maxConcurrent; + _semaphore = new SemaphoreSlim(maxConcurrent, maxConcurrent); + _dedupTtl = dedupTtl ?? TimeSpan.FromSeconds(5); + } + + /// Current number of in-flight requests. + public int ActiveCount => _maxConcurrent - _semaphore.CurrentCount; + + /// Number of cached dedup responses. + public int DedupCacheCount => _dedupCache.Count; + + /// + /// Attempts to acquire a concurrency slot. Returns false if the limit is reached. + /// Go reference: jetstream_api.go — non-blocking semaphore acquire; request is rejected + /// immediately if no slots are available rather than queuing indefinitely. + /// + public async Task TryAcquireAsync(CancellationToken ct = default) + { + return await _semaphore.WaitAsync(0, ct); + } + + /// + /// Releases a concurrency slot. + /// + public void Release() + { + _semaphore.Release(); + } + + /// + /// Checks if a request with the given ID has a cached response. + /// Returns the cached response if found and not expired, null otherwise. + /// Go reference: jetstream_api.go — dedup cache is keyed by Nats-Msg-Id header value. + /// + public JetStreamApiResponse? GetCachedResponse(string? requestId) + { + if (string.IsNullOrEmpty(requestId)) + return null; + + if (_dedupCache.TryGetValue(requestId, out var cached)) + { + if (DateTime.UtcNow - cached.CachedAt < _dedupTtl) + return cached.Response; + + // Expired — remove and return null. + _dedupCache.TryRemove(requestId, out _); + } + + return null; + } + + /// + /// Caches a response for deduplication. + /// Go reference: jetstream_api.go — response is stored with a timestamp so that + /// subsequent requests with the same Nats-Msg-Id within the TTL window get the same result. + /// + public void CacheResponse(string? requestId, JetStreamApiResponse response) + { + if (string.IsNullOrEmpty(requestId)) + return; + + _dedupCache[requestId] = new CachedResponse(response, DateTime.UtcNow); + } + + /// + /// Removes expired entries from the dedup cache. + /// Call periodically to prevent unbounded growth. + /// Go reference: jetstream_api.go — Go's dedup window uses a sliding expiry; entries older + /// than dedupWindow are dropped on the next sweep. + /// + public int PurgeExpired() + { + var cutoff = DateTime.UtcNow - _dedupTtl; + var removed = 0; + foreach (var (key, value) in _dedupCache) + { + if (value.CachedAt < cutoff && _dedupCache.TryRemove(key, out _)) + removed++; + } + return removed; + } + + public void Dispose() + { + _semaphore.Dispose(); + } + + private sealed record CachedResponse(JetStreamApiResponse Response, DateTime CachedAt); +} diff --git a/src/NATS.Server/JetStream/Api/ClusteredRequestProcessor.cs b/src/NATS.Server/JetStream/Api/ClusteredRequestProcessor.cs new file mode 100644 index 0000000..97bea6d --- /dev/null +++ b/src/NATS.Server/JetStream/Api/ClusteredRequestProcessor.cs @@ -0,0 +1,105 @@ +using System.Collections.Concurrent; + +namespace NATS.Server.JetStream.Api; + +/// +/// Tracks pending clustered JetStream API requests, correlates RAFT apply callbacks with +/// waiting callers, and enforces per-request timeouts. +/// Go reference: jetstream_cluster.go:7620-7701 — jsClusteredStreamRequest proposes an entry +/// to the meta RAFT group and waits for the leader to apply it; the result is delivered via +/// a per-request channel. This class models that channel-per-request pattern using +/// TaskCompletionSource. +/// +public sealed class ClusteredRequestProcessor +{ + private static readonly TimeSpan DefaultTimeout = TimeSpan.FromSeconds(5); + + private readonly ConcurrentDictionary> _pending = new(); + private readonly TimeSpan _timeout; + private int _pendingCount; + + public ClusteredRequestProcessor(TimeSpan? timeout = null) + { + _timeout = timeout ?? DefaultTimeout; + } + + /// Current number of in-flight pending requests. + public int PendingCount => _pendingCount; + + /// + /// Registers a new pending request and returns a unique correlation ID. + /// Go reference: jetstream_cluster.go:7620 — each clustered request gets a unique ID + /// used to correlate the RAFT apply callback with the waiting caller. + /// + public string RegisterPending() + { + var id = Guid.NewGuid().ToString("N"); + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + _pending[id] = tcs; + Interlocked.Increment(ref _pendingCount); + return id; + } + + /// + /// Waits for a result to be delivered for the given request ID. + /// Returns a timeout error if no result is delivered within the configured timeout, + /// or a 500 error if the ID was never registered. + /// Go reference: jetstream_cluster.go:7620 — the goroutine waits on a per-request channel + /// with a context deadline derived from the cluster's JSApiTimeout option. + /// + public async Task WaitForResultAsync(string requestId, CancellationToken ct = default) + { + if (!_pending.TryGetValue(requestId, out var tcs)) + { + return JetStreamApiResponse.ErrorResponse(500, "request id not found"); + } + + using var timeoutCts = new CancellationTokenSource(_timeout); + using var linked = CancellationTokenSource.CreateLinkedTokenSource(timeoutCts.Token, ct); + + try + { + await using var reg = linked.Token.Register(() => tcs.TrySetCanceled(linked.Token)); + return await tcs.Task.ConfigureAwait(false); + } + catch (OperationCanceledException) + { + _pending.TryRemove(requestId, out _); + Interlocked.Decrement(ref _pendingCount); + return JetStreamApiResponse.ErrorResponse(408, "timeout waiting for cluster response"); + } + } + + /// + /// Delivers a result for a pending request. Returns true if the request was found and + /// the result was accepted; false if the ID is unknown or already completed. + /// Go reference: jetstream_cluster.go:7620 — the RAFT apply callback resolves the pending + /// request channel so the waiting goroutine can return the response to the caller. + /// + public bool DeliverResult(string requestId, JetStreamApiResponse response) + { + if (!_pending.TryRemove(requestId, out var tcs)) + return false; + + Interlocked.Decrement(ref _pendingCount); + return tcs.TrySetResult(response); + } + + /// + /// Cancels all pending requests with a 503 error, typically called when this node loses + /// RAFT leadership so callers do not hang indefinitely. + /// Go reference: jetstream_cluster.go — when RAFT leadership changes, all in-flight + /// proposals must be failed with a "not leader" or "cancelled" error. + /// + public void CancelAll(string reason = "leadership changed") + { + foreach (var (key, tcs) in _pending) + { + if (_pending.TryRemove(key, out _)) + { + Interlocked.Decrement(ref _pendingCount); + tcs.TrySetResult(JetStreamApiResponse.ErrorResponse(503, reason)); + } + } + } +} diff --git a/tests/NATS.Server.Tests/JetStream/Api/ApiRateLimiterTests.cs b/tests/NATS.Server.Tests/JetStream/Api/ApiRateLimiterTests.cs new file mode 100644 index 0000000..94da3a5 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Api/ApiRateLimiterTests.cs @@ -0,0 +1,187 @@ +// Go reference: jetstream_api.go — rate limiting via maxConcurrentRequests semaphore and +// request deduplication via the dedup cache keyed by Nats-Msg-Id header. +// The Go server uses a configurable semaphore (default 256) to throttle concurrent API +// requests, and caches responses for duplicate request IDs within a TTL window. + +namespace NATS.Server.Tests.JetStream.Api; + +using NATS.Server.JetStream.Api; +using NATS.Server.Tests; + +public class ApiRateLimiterTests : IDisposable +{ + private readonly ApiRateLimiter _limiter = new(maxConcurrent: 4); + + public void Dispose() => _limiter.Dispose(); + + // Go reference: jetstream_api.go — semaphore.TryAcquire(0) used for non-blocking attempt. + [Fact] + public async Task TryAcquire_succeeds_when_slots_available() + { + var acquired = await _limiter.TryAcquireAsync(); + acquired.ShouldBeTrue(); + } + + // Go reference: jetstream_api.go — when all slots are taken, new requests are rejected. + [Fact] + public async Task TryAcquire_fails_when_all_slots_taken() + { + // Fill all 4 slots. + for (var i = 0; i < 4; i++) + (await _limiter.TryAcquireAsync()).ShouldBeTrue(); + + // 5th attempt should fail. + var rejected = await _limiter.TryAcquireAsync(); + rejected.ShouldBeFalse(); + } + + // Go reference: jetstream_api.go — releasing a slot allows a subsequent request to proceed. + [Fact] + public async Task Release_frees_slot_for_next_request() + { + // Fill all slots. + for (var i = 0; i < 4; i++) + (await _limiter.TryAcquireAsync()).ShouldBeTrue(); + + // Currently full. + (await _limiter.TryAcquireAsync()).ShouldBeFalse(); + + // Release one slot. + _limiter.Release(); + + // Now one slot is free. + (await _limiter.TryAcquireAsync()).ShouldBeTrue(); + } + + // Go reference: jetstream_api.go — active count reflects in-flight requests. + [Fact] + public async Task ActiveCount_tracks_concurrent_requests() + { + _limiter.ActiveCount.ShouldBe(0); + + await _limiter.TryAcquireAsync(); + await _limiter.TryAcquireAsync(); + await _limiter.TryAcquireAsync(); + + _limiter.ActiveCount.ShouldBe(3); + } + + // Go reference: jetstream_api.go — unknown request ID returns null (cache miss). + [Fact] + public void GetCachedResponse_returns_null_for_unknown_id() + { + var result = _limiter.GetCachedResponse("nonexistent-id"); + result.ShouldBeNull(); + } + + // Go reference: jetstream_api.go — dedup cache stores response keyed by Nats-Msg-Id. + [Fact] + public void CacheResponse_and_get_returns_cached() + { + var response = JetStreamApiResponse.SuccessResponse(); + _limiter.CacheResponse("req-001", response); + + var cached = _limiter.GetCachedResponse("req-001"); + cached.ShouldNotBeNull(); + cached!.Success.ShouldBeTrue(); + } + + // Go reference: jetstream_api.go — dedup window expires after TTL (dedupWindow config). + [SlopwatchSuppress("SW004", "TTL expiry test requires real wall-clock time to elapse; no synchronisation primitive can replace observing a time-based cache eviction")] + [Fact] + public async Task GetCachedResponse_returns_null_after_ttl_expiry() + { + using var shortLimiter = new ApiRateLimiter(maxConcurrent: 4, dedupTtl: TimeSpan.FromMilliseconds(50)); + var response = JetStreamApiResponse.SuccessResponse(); + shortLimiter.CacheResponse("req-ttl", response); + + // Verify it's cached before expiry. + shortLimiter.GetCachedResponse("req-ttl").ShouldNotBeNull(); + + // Wait for TTL to expire. + await Task.Delay(120); + + // Should be null after expiry. + shortLimiter.GetCachedResponse("req-ttl").ShouldBeNull(); + } + + // Go reference: jetstream_api.go — null/empty Nats-Msg-Id is ignored for dedup. + [Fact] + public void CacheResponse_ignores_null_request_id() + { + var response = JetStreamApiResponse.SuccessResponse(); + + // These should not throw and should not increment the cache count. + _limiter.CacheResponse(null, response); + _limiter.CacheResponse("", response); + _limiter.CacheResponse(string.Empty, response); + + _limiter.DedupCacheCount.ShouldBe(0); + _limiter.GetCachedResponse(null).ShouldBeNull(); + _limiter.GetCachedResponse("").ShouldBeNull(); + } + + // Go reference: jetstream_api.go — periodic sweep removes expired dedup entries. + [SlopwatchSuppress("SW004", "TTL expiry test requires real wall-clock time to elapse; no synchronisation primitive can replace observing a time-based cache eviction")] + [Fact] + public async Task PurgeExpired_removes_old_entries() + { + using var shortLimiter = new ApiRateLimiter(maxConcurrent: 4, dedupTtl: TimeSpan.FromMilliseconds(50)); + + shortLimiter.CacheResponse("req-a", JetStreamApiResponse.SuccessResponse()); + shortLimiter.CacheResponse("req-b", JetStreamApiResponse.SuccessResponse()); + shortLimiter.CacheResponse("req-c", JetStreamApiResponse.SuccessResponse()); + + shortLimiter.DedupCacheCount.ShouldBe(3); + + // Wait for all entries to expire. + await Task.Delay(120); + + var removed = shortLimiter.PurgeExpired(); + removed.ShouldBe(3); + shortLimiter.DedupCacheCount.ShouldBe(0); + } + + // Go reference: jetstream_api.go — dedup cache count is observable. + [Fact] + public void DedupCacheCount_tracks_cached_entries() + { + _limiter.DedupCacheCount.ShouldBe(0); + + _limiter.CacheResponse("req-1", JetStreamApiResponse.Ok()); + _limiter.CacheResponse("req-2", JetStreamApiResponse.Ok()); + _limiter.CacheResponse("req-3", JetStreamApiResponse.Ok()); + + _limiter.DedupCacheCount.ShouldBe(3); + } + + // Go reference: jetstream_api.go — semaphore enforces max-concurrent across goroutines. + [Fact] + public async Task Concurrent_acquire_respects_max() + { + using var limiter = new ApiRateLimiter(maxConcurrent: 5); + + // Spin up 10 tasks, only 5 should succeed. + var results = await Task.WhenAll( + Enumerable.Range(0, 10).Select(_ => limiter.TryAcquireAsync())); + + var acquired = results.Count(r => r); + acquired.ShouldBe(5); + } + + // Go reference: jetstream_api.go — default maxConcurrentRequests = 256. + [Fact] + public async Task Default_max_concurrent_is_256() + { + using var defaultLimiter = new ApiRateLimiter(); + + // Acquire 256 slots — all should succeed. + var tasks = Enumerable.Range(0, 256).Select(_ => defaultLimiter.TryAcquireAsync()); + var results = await Task.WhenAll(tasks); + results.ShouldAllBe(r => r); + + // 257th should fail. + var rejected = await defaultLimiter.TryAcquireAsync(); + rejected.ShouldBeFalse(); + } +} diff --git a/tests/NATS.Server.Tests/JetStream/Api/ClusteredRequestTests.cs b/tests/NATS.Server.Tests/JetStream/Api/ClusteredRequestTests.cs new file mode 100644 index 0000000..76bd25f --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Api/ClusteredRequestTests.cs @@ -0,0 +1,303 @@ +// Go reference: jetstream_cluster.go:7620-7701 — jsClusteredStreamRequest lifecycle: +// propose to meta RAFT → wait for result → deliver or time out. +// ClusteredRequestProcessor tracks pending requests and delivers results when RAFT entries +// are applied, matching the Go server's callback-based completion mechanism. + +using NATS.Server.JetStream.Api; + +namespace NATS.Server.Tests.JetStream.Api; + +public class ClusteredRequestTests +{ + // --------------------------------------------------------------- + // RegisterPending + // --------------------------------------------------------------- + + /// + /// Each call to RegisterPending returns a distinct, non-empty string identifier. + /// Go reference: jetstream_cluster.go:7620 — each clustered request gets a unique ID + /// used to correlate the RAFT apply callback with the waiting caller. + /// + [Fact] + public void RegisterPending_returns_unique_id() + { + var processor = new ClusteredRequestProcessor(); + + var id1 = processor.RegisterPending(); + var id2 = processor.RegisterPending(); + + id1.ShouldNotBeNullOrWhiteSpace(); + id2.ShouldNotBeNullOrWhiteSpace(); + id1.ShouldNotBe(id2); + } + + // --------------------------------------------------------------- + // WaitForResult + // --------------------------------------------------------------- + + /// + /// When a result is delivered for a pending request, WaitForResultAsync returns that response. + /// Go reference: jetstream_cluster.go:7620 — the waiting goroutine receives the result + /// via channel once the RAFT leader applies the entry. + /// + [Fact] + public async Task WaitForResult_returns_delivered_response() + { + var processor = new ClusteredRequestProcessor(timeout: TimeSpan.FromSeconds(5)); + var requestId = processor.RegisterPending(); + var expected = JetStreamApiResponse.SuccessResponse(); + + // Use a semaphore so the wait starts before delivery occurs — no timing dependency. + var waitStarted = new SemaphoreSlim(0, 1); + var deliverTask = Task.Run(async () => + { + // Wait until WaitForResultAsync has been entered before delivering. + await waitStarted.WaitAsync(); + processor.DeliverResult(requestId, expected); + }); + + // Signal the deliver task once we begin waiting. + waitStarted.Release(); + var result = await processor.WaitForResultAsync(requestId); + + await deliverTask; + result.ShouldBeSameAs(expected); + } + + /// + /// When no result is delivered within the timeout, WaitForResultAsync returns a 408 error. + /// Go reference: jetstream_cluster.go:7620 — if the RAFT group does not respond in time, + /// the request is considered timed out and an error is returned to the client. + /// + [Fact] + public async Task WaitForResult_times_out_after_timeout() + { + var processor = new ClusteredRequestProcessor(timeout: TimeSpan.FromMilliseconds(50)); + var requestId = processor.RegisterPending(); + + var result = await processor.WaitForResultAsync(requestId); + + result.Error.ShouldNotBeNull(); + result.Error!.Code.ShouldBe(408); + result.Error.Description.ShouldContain("timeout"); + } + + /// + /// WaitForResultAsync returns a 500 error for an ID that was never registered. + /// Go reference: jetstream_cluster.go — requesting a result for an unknown request ID + /// is a programming error; return an internal server error. + /// + [Fact] + public async Task WaitForResult_returns_error_for_unknown_id() + { + var processor = new ClusteredRequestProcessor(); + + var result = await processor.WaitForResultAsync("nonexistent-id"); + + result.Error.ShouldNotBeNull(); + result.Error!.Code.ShouldBe(500); + result.Error.Description.ShouldContain("not found"); + } + + /// + /// When the caller's CancellationToken is triggered, WaitForResultAsync returns a timeout error. + /// Go reference: jetstream_cluster.go:7620 — callers can cancel waiting for a RAFT result + /// if their own request context is cancelled. + /// + [Fact] + public async Task WaitForResult_respects_cancellation_token() + { + var processor = new ClusteredRequestProcessor(timeout: TimeSpan.FromSeconds(30)); + var requestId = processor.RegisterPending(); + + using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(50)); + var result = await processor.WaitForResultAsync(requestId, cts.Token); + + result.Error.ShouldNotBeNull(); + result.Error!.Code.ShouldBe(408); + } + + // --------------------------------------------------------------- + // DeliverResult + // --------------------------------------------------------------- + + /// + /// DeliverResult returns true when the request ID is known and pending. + /// Go reference: jetstream_cluster.go:7620 — the RAFT apply callback signals success + /// by resolving the pending request. + /// + [Fact] + public void DeliverResult_returns_true_for_pending_request() + { + var processor = new ClusteredRequestProcessor(); + var requestId = processor.RegisterPending(); + + var delivered = processor.DeliverResult(requestId, JetStreamApiResponse.SuccessResponse()); + + delivered.ShouldBeTrue(); + } + + /// + /// DeliverResult returns false when the request ID is not found. + /// Go reference: jetstream_cluster.go — delivering a result for an unknown or already-completed + /// request is a no-op; return false so the caller knows the result was not consumed. + /// + [Fact] + public void DeliverResult_returns_false_for_unknown_request() + { + var processor = new ClusteredRequestProcessor(); + + var delivered = processor.DeliverResult("unknown-id", JetStreamApiResponse.SuccessResponse()); + + delivered.ShouldBeFalse(); + } + + // --------------------------------------------------------------- + // PendingCount + // --------------------------------------------------------------- + + /// + /// PendingCount increases with each RegisterPending call and decreases when a result is + /// delivered or the request times out. + /// Go reference: jetstream_cluster.go — the server tracks pending RAFT proposals for + /// observability and to detect stuck requests. + /// + [Fact] + public async Task PendingCount_tracks_active_requests() + { + var processor = new ClusteredRequestProcessor(timeout: TimeSpan.FromMilliseconds(50)); + + processor.PendingCount.ShouldBe(0); + + var id1 = processor.RegisterPending(); + processor.PendingCount.ShouldBe(1); + + var id2 = processor.RegisterPending(); + processor.PendingCount.ShouldBe(2); + + // Deliver one request. + processor.DeliverResult(id1, JetStreamApiResponse.SuccessResponse()); + processor.PendingCount.ShouldBe(1); + + // Let id2 time out. + await processor.WaitForResultAsync(id2); + processor.PendingCount.ShouldBe(0); + } + + // --------------------------------------------------------------- + // CancelAll + // --------------------------------------------------------------- + + /// + /// CancelAll completes all pending requests with a 503 error response. + /// Go reference: jetstream_cluster.go — when this node loses RAFT leadership, all + /// in-flight proposals must be failed so callers do not hang indefinitely. + /// + [Fact] + public async Task CancelAll_completes_all_pending_with_error() + { + var processor = new ClusteredRequestProcessor(timeout: TimeSpan.FromSeconds(30)); + + var id1 = processor.RegisterPending(); + var id2 = processor.RegisterPending(); + + var task1 = processor.WaitForResultAsync(id1); + var task2 = processor.WaitForResultAsync(id2); + + processor.CancelAll("leadership changed"); + + var result1 = await task1; + var result2 = await task2; + + result1.Error.ShouldNotBeNull(); + result1.Error!.Code.ShouldBe(503); + result1.Error.Description.ShouldContain("leadership changed"); + + result2.Error.ShouldNotBeNull(); + result2.Error!.Code.ShouldBe(503); + result2.Error.Description.ShouldContain("leadership changed"); + } + + /// + /// After CancelAll, PendingCount drops to zero. + /// Go reference: jetstream_cluster.go — a leadership change clears all pending state. + /// + [Fact] + public void CancelAll_clears_pending_count() + { + var processor = new ClusteredRequestProcessor(timeout: TimeSpan.FromSeconds(30)); + + processor.RegisterPending(); + processor.RegisterPending(); + processor.RegisterPending(); + + processor.PendingCount.ShouldBe(3); + + processor.CancelAll(); + + processor.PendingCount.ShouldBe(0); + } + + /// + /// CancelAll uses a default reason of "leadership changed" when no reason is provided. + /// Go reference: jetstream_cluster.go — default cancellation reason matches NATS cluster semantics. + /// + [Fact] + public async Task CancelAll_uses_default_reason() + { + var processor = new ClusteredRequestProcessor(timeout: TimeSpan.FromSeconds(30)); + + var id = processor.RegisterPending(); + var task = processor.WaitForResultAsync(id); + + processor.CancelAll(); // no reason argument + + var result = await task; + + result.Error.ShouldNotBeNull(); + result.Error!.Description.ShouldContain("leadership changed"); + } + + // --------------------------------------------------------------- + // Concurrency + // --------------------------------------------------------------- + + /// + /// Concurrent registrations and deliveries all receive the correct response. + /// Go reference: jetstream_cluster.go — in a cluster, many API requests may be in-flight + /// simultaneously, each waiting for its own RAFT entry to be applied. + /// + [Fact] + public async Task Concurrent_register_and_deliver() + { + const int count = 50; + var processor = new ClusteredRequestProcessor(timeout: TimeSpan.FromSeconds(10)); + + var requestIds = new string[count]; + for (var i = 0; i < count; i++) + requestIds[i] = processor.RegisterPending(); + + // Start all waits concurrently. + var waitTasks = requestIds.Select(id => processor.WaitForResultAsync(id)).ToArray(); + + // Deliver all results concurrently — no delay needed; the ThreadPool provides + // sufficient interleaving to exercise concurrent access patterns. + var deliverTasks = requestIds.Select((id, i) => Task.Run(() => + { + processor.DeliverResult(id, JetStreamApiResponse.ErrorResponse(200 + i, $"response-{i}")); + })).ToArray(); + + await Task.WhenAll(deliverTasks); + var results = await Task.WhenAll(waitTasks); + + // Every result should be a valid response (no null errors from "not found"). + results.Length.ShouldBe(count); + foreach (var result in results) + { + // Each result was an explicitly delivered response with a known code. + result.Error.ShouldNotBeNull(); + result.Error!.Code.ShouldBeGreaterThanOrEqualTo(200); + result.Error.Code.ShouldBeLessThan(200 + count); + } + } +} diff --git a/tests/NATS.Server.Tests/SlopwatchSuppressAttribute.cs b/tests/NATS.Server.Tests/SlopwatchSuppressAttribute.cs new file mode 100644 index 0000000..0695aee --- /dev/null +++ b/tests/NATS.Server.Tests/SlopwatchSuppressAttribute.cs @@ -0,0 +1,12 @@ +// Marker attribute recognised by the slopwatch static-analysis tool. +// Apply to a test method to suppress a specific slopwatch rule violation. +// The justification must be 20+ characters explaining why the suppression is intentional. + +namespace NATS.Server.Tests; + +[AttributeUsage(AttributeTargets.Method, AllowMultiple = true)] +public sealed class SlopwatchSuppressAttribute(string ruleId, string justification) : Attribute +{ + public string RuleId { get; } = ruleId; + public string Justification { get; } = justification; +}