diff --git a/src/NATS.Server/JetStream/Consumers/WaitingRequestQueue.cs b/src/NATS.Server/JetStream/Consumers/WaitingRequestQueue.cs new file mode 100644 index 0000000..81c9a51 --- /dev/null +++ b/src/NATS.Server/JetStream/Consumers/WaitingRequestQueue.cs @@ -0,0 +1,65 @@ +namespace NATS.Server.JetStream.Consumers; + +/// +/// A pull request with mutable batch and byte tracking for delivery fulfillment. +/// Go reference: consumer.go waitingRequest / processNextMsgRequest. +/// +public sealed record PullRequest( + string ReplyTo, + int Batch, + long MaxBytes, + DateTimeOffset Expires, + bool NoWait, + string? PinId = null) +{ + /// Remaining messages allowed for this request. + public int RemainingBatch { get; private set; } = Batch; + + /// Remaining bytes allowed for this request (only meaningful when MaxBytes > 0). + public long RemainingBytes { get; private set; } = MaxBytes; + + /// True when batch or bytes (if set) are exhausted. + public bool IsExhausted => RemainingBatch <= 0 || (MaxBytes > 0 && RemainingBytes <= 0); + + /// Decrement remaining batch count by one. + public void ConsumeBatch() => RemainingBatch--; + + /// Subtract delivered bytes from remaining byte budget. + public void ConsumeBytes(long bytes) => RemainingBytes -= bytes; +} + +/// +/// FIFO queue of pull requests with expiry support. +/// Unlike PullRequestWaitQueue (priority-based), this is a simple FIFO with +/// RemoveExpired cleanup and mutable request tracking. +/// Go reference: consumer.go waitQueue / processNextMsgRequest. +/// +public sealed class WaitingRequestQueue +{ + private readonly LinkedList _queue = new(); + + public int Count => _queue.Count; + public bool IsEmpty => _queue.Count == 0; + + public void Enqueue(PullRequest request) => _queue.AddLast(request); + + public PullRequest? TryDequeue() + { + if (_queue.Count == 0) return null; + var first = _queue.First!.Value; + _queue.RemoveFirst(); + return first; + } + + public void RemoveExpired(DateTimeOffset now) + { + var node = _queue.First; + while (node != null) + { + var next = node.Next; + if (node.Value.Expires <= now) + _queue.Remove(node); + node = next; + } + } +} diff --git a/tests/NATS.Server.Tests/JetStream/Consumers/WaitingRequestQueueTests.cs b/tests/NATS.Server.Tests/JetStream/Consumers/WaitingRequestQueueTests.cs new file mode 100644 index 0000000..f93e238 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Consumers/WaitingRequestQueueTests.cs @@ -0,0 +1,116 @@ +using NATS.Server.JetStream.Consumers; + +namespace NATS.Server.Tests.JetStream.Consumers; + +/// +/// Tests for WaitingRequestQueue FIFO queue with expiry and batch/byte tracking. +/// Go reference: consumer.go processNextMsgRequest. +/// +public class WaitingRequestQueueTests +{ + [Fact] + public void Enqueue_and_dequeue_fifo() + { + var queue = new WaitingRequestQueue(); + queue.Enqueue(new PullRequest("reply.1", Batch: 10, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMinutes(1), NoWait: false)); + queue.Enqueue(new PullRequest("reply.2", Batch: 5, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMinutes(1), NoWait: false)); + + queue.Count.ShouldBe(2); + + var first = queue.TryDequeue(); + first.ShouldNotBeNull(); + first.ReplyTo.ShouldBe("reply.1"); + } + + [Fact] + public void TryDequeue_returns_null_when_empty() + { + var queue = new WaitingRequestQueue(); + queue.TryDequeue().ShouldBeNull(); + queue.IsEmpty.ShouldBeTrue(); + } + + [Fact] + public void Expired_requests_are_removed() + { + var queue = new WaitingRequestQueue(); + queue.Enqueue(new PullRequest("expired", Batch: 10, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMilliseconds(-100), NoWait: false)); + queue.Enqueue(new PullRequest("valid", Batch: 10, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMinutes(1), NoWait: false)); + + queue.RemoveExpired(DateTimeOffset.UtcNow); + queue.Count.ShouldBe(1); + + var next = queue.TryDequeue(); + next!.ReplyTo.ShouldBe("valid"); + } + + [Fact] + public void NoWait_request_returns_immediately_when_empty() + { + var queue = new WaitingRequestQueue(); + queue.Enqueue(new PullRequest("nowait", Batch: 10, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMinutes(1), NoWait: true)); + + var req = queue.TryDequeue(); + req.ShouldNotBeNull(); + req.NoWait.ShouldBeTrue(); + } + + [Fact] + public void MaxBytes_tracks_accumulation() + { + var queue = new WaitingRequestQueue(); + var req = new PullRequest("mb", Batch: 100, MaxBytes: 1024, Expires: DateTimeOffset.UtcNow.AddMinutes(1), NoWait: false); + queue.Enqueue(req); + + var dequeued = queue.TryDequeue()!; + dequeued.MaxBytes.ShouldBe(1024L); + dequeued.RemainingBytes.ShouldBe(1024L); + + dequeued.ConsumeBytes(256); + dequeued.RemainingBytes.ShouldBe(768L); + dequeued.IsExhausted.ShouldBeFalse(); + + dequeued.ConsumeBytes(800); + dequeued.IsExhausted.ShouldBeTrue(); + } + + [Fact] + public void Batch_decrements_on_delivery() + { + var queue = new WaitingRequestQueue(); + var req = new PullRequest("batch", Batch: 3, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMinutes(1), NoWait: false); + queue.Enqueue(req); + + var dequeued = queue.TryDequeue()!; + dequeued.RemainingBatch.ShouldBe(3); + + dequeued.ConsumeBatch(); + dequeued.RemainingBatch.ShouldBe(2); + + dequeued.ConsumeBatch(); + dequeued.ConsumeBatch(); + dequeued.IsExhausted.ShouldBeTrue(); + } + + [Fact] + public void RemoveExpired_handles_all_expired() + { + var queue = new WaitingRequestQueue(); + queue.Enqueue(new PullRequest("a", Batch: 1, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMilliseconds(-100), NoWait: false)); + queue.Enqueue(new PullRequest("b", Batch: 1, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMilliseconds(-50), NoWait: false)); + + queue.RemoveExpired(DateTimeOffset.UtcNow); + queue.Count.ShouldBe(0); + queue.IsEmpty.ShouldBeTrue(); + } + + [Fact] + public void PinId_is_stored() + { + var queue = new WaitingRequestQueue(); + queue.Enqueue(new PullRequest("pin", Batch: 1, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMinutes(1), NoWait: false, PinId: "pin-123")); + + var dequeued = queue.TryDequeue()!; + dequeued.PinId.ShouldBe("pin-123"); + } +}