feat(consumer): add WaitingRequestQueue with expiry and batch/maxBytes tracking
Implements a FIFO pull-request queue (WaitingRequestQueue) with per-request mutable batch countdown and byte budget tracking, plus RemoveExpired cleanup. Complements the existing priority-based PullRequestWaitQueue for pull consumer delivery. Go reference: consumer.go waitQueue / processNextMsgRequest (lines 4276-4450).
This commit is contained in:
65
src/NATS.Server/JetStream/Consumers/WaitingRequestQueue.cs
Normal file
65
src/NATS.Server/JetStream/Consumers/WaitingRequestQueue.cs
Normal file
@@ -0,0 +1,65 @@
|
||||
namespace NATS.Server.JetStream.Consumers;
|
||||
|
||||
/// <summary>
|
||||
/// A pull request with mutable batch and byte tracking for delivery fulfillment.
|
||||
/// Go reference: consumer.go waitingRequest / processNextMsgRequest.
|
||||
/// </summary>
|
||||
public sealed record PullRequest(
|
||||
string ReplyTo,
|
||||
int Batch,
|
||||
long MaxBytes,
|
||||
DateTimeOffset Expires,
|
||||
bool NoWait,
|
||||
string? PinId = null)
|
||||
{
|
||||
/// <summary>Remaining messages allowed for this request.</summary>
|
||||
public int RemainingBatch { get; private set; } = Batch;
|
||||
|
||||
/// <summary>Remaining bytes allowed for this request (only meaningful when MaxBytes > 0).</summary>
|
||||
public long RemainingBytes { get; private set; } = MaxBytes;
|
||||
|
||||
/// <summary>True when batch or bytes (if set) are exhausted.</summary>
|
||||
public bool IsExhausted => RemainingBatch <= 0 || (MaxBytes > 0 && RemainingBytes <= 0);
|
||||
|
||||
/// <summary>Decrement remaining batch count by one.</summary>
|
||||
public void ConsumeBatch() => RemainingBatch--;
|
||||
|
||||
/// <summary>Subtract delivered bytes from remaining byte budget.</summary>
|
||||
public void ConsumeBytes(long bytes) => RemainingBytes -= bytes;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// FIFO queue of pull requests with expiry support.
|
||||
/// Unlike PullRequestWaitQueue (priority-based), this is a simple FIFO with
|
||||
/// RemoveExpired cleanup and mutable request tracking.
|
||||
/// Go reference: consumer.go waitQueue / processNextMsgRequest.
|
||||
/// </summary>
|
||||
public sealed class WaitingRequestQueue
|
||||
{
|
||||
private readonly LinkedList<PullRequest> _queue = new();
|
||||
|
||||
public int Count => _queue.Count;
|
||||
public bool IsEmpty => _queue.Count == 0;
|
||||
|
||||
public void Enqueue(PullRequest request) => _queue.AddLast(request);
|
||||
|
||||
public PullRequest? TryDequeue()
|
||||
{
|
||||
if (_queue.Count == 0) return null;
|
||||
var first = _queue.First!.Value;
|
||||
_queue.RemoveFirst();
|
||||
return first;
|
||||
}
|
||||
|
||||
public void RemoveExpired(DateTimeOffset now)
|
||||
{
|
||||
var node = _queue.First;
|
||||
while (node != null)
|
||||
{
|
||||
var next = node.Next;
|
||||
if (node.Value.Expires <= now)
|
||||
_queue.Remove(node);
|
||||
node = next;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,116 @@
|
||||
using NATS.Server.JetStream.Consumers;
|
||||
|
||||
namespace NATS.Server.Tests.JetStream.Consumers;
|
||||
|
||||
/// <summary>
|
||||
/// Tests for WaitingRequestQueue FIFO queue with expiry and batch/byte tracking.
|
||||
/// Go reference: consumer.go processNextMsgRequest.
|
||||
/// </summary>
|
||||
public class WaitingRequestQueueTests
|
||||
{
|
||||
[Fact]
|
||||
public void Enqueue_and_dequeue_fifo()
|
||||
{
|
||||
var queue = new WaitingRequestQueue();
|
||||
queue.Enqueue(new PullRequest("reply.1", Batch: 10, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMinutes(1), NoWait: false));
|
||||
queue.Enqueue(new PullRequest("reply.2", Batch: 5, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMinutes(1), NoWait: false));
|
||||
|
||||
queue.Count.ShouldBe(2);
|
||||
|
||||
var first = queue.TryDequeue();
|
||||
first.ShouldNotBeNull();
|
||||
first.ReplyTo.ShouldBe("reply.1");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void TryDequeue_returns_null_when_empty()
|
||||
{
|
||||
var queue = new WaitingRequestQueue();
|
||||
queue.TryDequeue().ShouldBeNull();
|
||||
queue.IsEmpty.ShouldBeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Expired_requests_are_removed()
|
||||
{
|
||||
var queue = new WaitingRequestQueue();
|
||||
queue.Enqueue(new PullRequest("expired", Batch: 10, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMilliseconds(-100), NoWait: false));
|
||||
queue.Enqueue(new PullRequest("valid", Batch: 10, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMinutes(1), NoWait: false));
|
||||
|
||||
queue.RemoveExpired(DateTimeOffset.UtcNow);
|
||||
queue.Count.ShouldBe(1);
|
||||
|
||||
var next = queue.TryDequeue();
|
||||
next!.ReplyTo.ShouldBe("valid");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void NoWait_request_returns_immediately_when_empty()
|
||||
{
|
||||
var queue = new WaitingRequestQueue();
|
||||
queue.Enqueue(new PullRequest("nowait", Batch: 10, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMinutes(1), NoWait: true));
|
||||
|
||||
var req = queue.TryDequeue();
|
||||
req.ShouldNotBeNull();
|
||||
req.NoWait.ShouldBeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void MaxBytes_tracks_accumulation()
|
||||
{
|
||||
var queue = new WaitingRequestQueue();
|
||||
var req = new PullRequest("mb", Batch: 100, MaxBytes: 1024, Expires: DateTimeOffset.UtcNow.AddMinutes(1), NoWait: false);
|
||||
queue.Enqueue(req);
|
||||
|
||||
var dequeued = queue.TryDequeue()!;
|
||||
dequeued.MaxBytes.ShouldBe(1024L);
|
||||
dequeued.RemainingBytes.ShouldBe(1024L);
|
||||
|
||||
dequeued.ConsumeBytes(256);
|
||||
dequeued.RemainingBytes.ShouldBe(768L);
|
||||
dequeued.IsExhausted.ShouldBeFalse();
|
||||
|
||||
dequeued.ConsumeBytes(800);
|
||||
dequeued.IsExhausted.ShouldBeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Batch_decrements_on_delivery()
|
||||
{
|
||||
var queue = new WaitingRequestQueue();
|
||||
var req = new PullRequest("batch", Batch: 3, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMinutes(1), NoWait: false);
|
||||
queue.Enqueue(req);
|
||||
|
||||
var dequeued = queue.TryDequeue()!;
|
||||
dequeued.RemainingBatch.ShouldBe(3);
|
||||
|
||||
dequeued.ConsumeBatch();
|
||||
dequeued.RemainingBatch.ShouldBe(2);
|
||||
|
||||
dequeued.ConsumeBatch();
|
||||
dequeued.ConsumeBatch();
|
||||
dequeued.IsExhausted.ShouldBeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void RemoveExpired_handles_all_expired()
|
||||
{
|
||||
var queue = new WaitingRequestQueue();
|
||||
queue.Enqueue(new PullRequest("a", Batch: 1, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMilliseconds(-100), NoWait: false));
|
||||
queue.Enqueue(new PullRequest("b", Batch: 1, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMilliseconds(-50), NoWait: false));
|
||||
|
||||
queue.RemoveExpired(DateTimeOffset.UtcNow);
|
||||
queue.Count.ShouldBe(0);
|
||||
queue.IsEmpty.ShouldBeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void PinId_is_stored()
|
||||
{
|
||||
var queue = new WaitingRequestQueue();
|
||||
queue.Enqueue(new PullRequest("pin", Batch: 1, MaxBytes: 0, Expires: DateTimeOffset.UtcNow.AddMinutes(1), NoWait: false, PinId: "pin-123"));
|
||||
|
||||
var dequeued = queue.TryDequeue()!;
|
||||
dequeued.PinId.ShouldBe("pin-123");
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user