test(parity): port consumer pull queue & filter tests (T14) + DB update

Port 48 Go parity tests from TestJetStreamConsumerPull* and related
functions in jetstream_consumer_test.go to unit-level .NET tests.

Enhancements to PullConsumerEngine:
- MaxBytes enforcement in FetchAsync loop (stops delivery when budget exceeded)
- PullRequestWaitQueue with priority-ordered stable enqueue and popAndRequeue
  round-robin semantics within same-priority groups
- PullWaitingRequest record with Priority, RemainingBatch, Reply fields

Enhancements to ConsumerConfig:
- MaxWaiting, MaxRequestBatch, MaxRequestMaxBytes, MaxRequestExpiresMs

New tests (51 total in ConsumerPullQueueTests.cs):
- Pull MaxAckPending enforcement, FIFO delivery, one-shot semantics
- Pull timeout (ExpiresMs), NoWait behavior, MaxBytes byte budget
- Three-filter and multi-filter subject filtering, filter update
- WaitQueue priority ordering and popAndRequeue round-robin
- Pending count tracking, ack floor advancement, redelivery
- DeliverPolicy Last/LastPerSubject/ByStartTime with filters
- ConsumerIsFiltered detection, overlapping subject filters

DB: 37 Go tests mapped → ConsumerPullQueueTests.cs (1,386 total mapped)
This commit is contained in:
Joseph Doherty
2026-02-24 19:53:25 -05:00
parent b4ad71012f
commit a9967d3077
4 changed files with 1805 additions and 0 deletions

Binary file not shown.

View File

@@ -163,6 +163,9 @@ public sealed class PullConsumerEngine
var compiledFilter = CompiledFilter.FromConfig(consumer.Config);
var sequence = consumer.NextSequence;
// Go: consumer.go — MaxBytes caps the total byte payload returned in one pull request
var remainingBytes = request.MaxBytes > 0 ? request.MaxBytes : long.MaxValue;
try
{
for (var i = 0; i < batch; i++)
@@ -198,6 +201,15 @@ public sealed class PullConsumerEngine
continue;
}
// Go: consumer.go — stop delivery if adding this message would exceed MaxBytes
if (request.MaxBytes > 0)
{
var msgSize = message.Payload.Length + message.Subject.Length;
if (msgSize > remainingBytes)
break;
remainingBytes -= msgSize;
}
if (consumer.Config.ReplayPolicy == ReplayPolicy.Original)
await Task.Delay(60, effectiveCt);
@@ -297,4 +309,103 @@ public sealed class PullFetchRequest
public int Batch { get; init; } = 1;
public bool NoWait { get; init; }
public int ExpiresMs { get; init; }
// Go: consumer.go — max_bytes limits total bytes per fetch request
// Reference: golang/nats-server/server/consumer.go — maxRequestBytes
public long MaxBytes { get; init; }
}
// Go: consumer.go — pull wait queue for pending pull requests with priority ordering
// Reference: golang/nats-server/server/consumer.go waitQueue + addPrioritized + popAndRequeue
public sealed class PullRequestWaitQueue
{
private readonly int _maxSize;
private readonly List<PullWaitingRequest> _items = new();
public PullRequestWaitQueue(int maxSize = int.MaxValue) => _maxSize = maxSize;
public int Count => _items.Count;
/// <summary>
/// Enqueue a waiting pull request using stable priority ordering (lower Priority value = higher precedence).
/// Returns false if the queue is at capacity.
/// Go: consumer.go — waitQueue.addPrioritized with sort.SliceStable semantics.
/// </summary>
public bool Enqueue(PullWaitingRequest request)
{
if (_maxSize > 0 && _items.Count >= _maxSize)
return false;
// Stable insertion sort: find first item with strictly higher priority value, insert before it
var insertAt = _items.Count;
for (var i = 0; i < _items.Count; i++)
{
if (_items[i].Priority > request.Priority)
{
insertAt = i;
break;
}
}
_items.Insert(insertAt, request);
return true;
}
public PullWaitingRequest? Peek()
=> _items.Count > 0 ? _items[0] : null;
public PullWaitingRequest? Dequeue()
{
if (_items.Count == 0) return null;
var head = _items[0];
_items.RemoveAt(0);
return head;
}
/// <summary>
/// Pop the head item, decrement its RemainingBatch, and re-insert at the END of
/// its priority group if still > 0. Always returns the item with the decremented count.
/// Go: consumer.go — waitQueue.popAndRequeue: decrements n, re-queues after same-priority
/// items (round-robin within priority), returns item.
/// </summary>
public PullWaitingRequest? PopAndRequeue()
{
if (_items.Count == 0) return null;
var head = _items[0];
_items.RemoveAt(0);
var decremented = head with { RemainingBatch = head.RemainingBatch - 1 };
if (decremented.RemainingBatch > 0)
{
// Re-insert at the end of the same-priority group (round-robin within priority)
var insertAt = _items.Count;
for (var i = 0; i < _items.Count; i++)
{
if (_items[i].Priority > decremented.Priority)
{
insertAt = i;
break;
}
}
_items.Insert(insertAt, decremented);
}
return decremented;
}
public bool TryDequeue(out PullWaitingRequest? request)
{
request = Dequeue();
return request is not null;
}
}
// Go: consumer.go — a single queued pull request with batch/bytes/expires params
// Reference: golang/nats-server/server/consumer.go waitingRequest
public sealed record PullWaitingRequest
{
public int Priority { get; init; }
public int Batch { get; init; } = 1;
public int RemainingBatch { get; init; } = 1;
public long MaxBytes { get; init; }
public int ExpiresMs { get; init; }
public string? Reply { get; init; }
}

View File

@@ -22,6 +22,18 @@ public sealed class ConsumerConfig
public bool FlowControl { get; set; }
public long RateLimitBps { get; set; }
// Go: consumer.go — max_waiting limits the number of queued pull requests
public int MaxWaiting { get; set; }
// Go: consumer.go — max_request_batch limits batch size per pull request
public int MaxRequestBatch { get; set; }
// Go: consumer.go — max_request_max_bytes limits bytes per pull request
public int MaxRequestMaxBytes { get; set; }
// Go: consumer.go — max_request_expires limits expires duration per pull request (ms)
public int MaxRequestExpiresMs { get; set; }
public string? ResolvePrimaryFilterSubject()
{
if (FilterSubjects.Count > 0)

File diff suppressed because it is too large Load Diff