diff --git a/docs/test_parity.db b/docs/test_parity.db index 1508ba2..5ba4620 100644 Binary files a/docs/test_parity.db and b/docs/test_parity.db differ diff --git a/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs b/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs index 44a089d..a36b513 100644 --- a/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs +++ b/src/NATS.Server/JetStream/Consumers/PullConsumerEngine.cs @@ -163,6 +163,9 @@ public sealed class PullConsumerEngine var compiledFilter = CompiledFilter.FromConfig(consumer.Config); var sequence = consumer.NextSequence; + // Go: consumer.go — MaxBytes caps the total byte payload returned in one pull request + var remainingBytes = request.MaxBytes > 0 ? request.MaxBytes : long.MaxValue; + try { for (var i = 0; i < batch; i++) @@ -198,6 +201,15 @@ public sealed class PullConsumerEngine continue; } + // Go: consumer.go — stop delivery if adding this message would exceed MaxBytes + if (request.MaxBytes > 0) + { + var msgSize = message.Payload.Length + message.Subject.Length; + if (msgSize > remainingBytes) + break; + remainingBytes -= msgSize; + } + if (consumer.Config.ReplayPolicy == ReplayPolicy.Original) await Task.Delay(60, effectiveCt); @@ -297,4 +309,103 @@ public sealed class PullFetchRequest public int Batch { get; init; } = 1; public bool NoWait { get; init; } public int ExpiresMs { get; init; } + // Go: consumer.go — max_bytes limits total bytes per fetch request + // Reference: golang/nats-server/server/consumer.go — maxRequestBytes + public long MaxBytes { get; init; } +} + +// Go: consumer.go — pull wait queue for pending pull requests with priority ordering +// Reference: golang/nats-server/server/consumer.go waitQueue + addPrioritized + popAndRequeue +public sealed class PullRequestWaitQueue +{ + private readonly int _maxSize; + private readonly List _items = new(); + + public PullRequestWaitQueue(int maxSize = int.MaxValue) => _maxSize = maxSize; + + public int Count => _items.Count; + + /// + /// Enqueue a waiting pull request using stable priority ordering (lower Priority value = higher precedence). + /// Returns false if the queue is at capacity. + /// Go: consumer.go — waitQueue.addPrioritized with sort.SliceStable semantics. + /// + public bool Enqueue(PullWaitingRequest request) + { + if (_maxSize > 0 && _items.Count >= _maxSize) + return false; + + // Stable insertion sort: find first item with strictly higher priority value, insert before it + var insertAt = _items.Count; + for (var i = 0; i < _items.Count; i++) + { + if (_items[i].Priority > request.Priority) + { + insertAt = i; + break; + } + } + _items.Insert(insertAt, request); + return true; + } + + public PullWaitingRequest? Peek() + => _items.Count > 0 ? _items[0] : null; + + public PullWaitingRequest? Dequeue() + { + if (_items.Count == 0) return null; + var head = _items[0]; + _items.RemoveAt(0); + return head; + } + + /// + /// Pop the head item, decrement its RemainingBatch, and re-insert at the END of + /// its priority group if still > 0. Always returns the item with the decremented count. + /// Go: consumer.go — waitQueue.popAndRequeue: decrements n, re-queues after same-priority + /// items (round-robin within priority), returns item. + /// + public PullWaitingRequest? PopAndRequeue() + { + if (_items.Count == 0) return null; + var head = _items[0]; + _items.RemoveAt(0); + + var decremented = head with { RemainingBatch = head.RemainingBatch - 1 }; + if (decremented.RemainingBatch > 0) + { + // Re-insert at the end of the same-priority group (round-robin within priority) + var insertAt = _items.Count; + for (var i = 0; i < _items.Count; i++) + { + if (_items[i].Priority > decremented.Priority) + { + insertAt = i; + break; + } + } + _items.Insert(insertAt, decremented); + } + + return decremented; + } + + public bool TryDequeue(out PullWaitingRequest? request) + { + request = Dequeue(); + return request is not null; + } +} + +// Go: consumer.go — a single queued pull request with batch/bytes/expires params +// Reference: golang/nats-server/server/consumer.go waitingRequest +public sealed record PullWaitingRequest +{ + public int Priority { get; init; } + public int Batch { get; init; } = 1; + public int RemainingBatch { get; init; } = 1; + public long MaxBytes { get; init; } + public int ExpiresMs { get; init; } + public string? Reply { get; init; } } diff --git a/src/NATS.Server/JetStream/Models/ConsumerConfig.cs b/src/NATS.Server/JetStream/Models/ConsumerConfig.cs index 434f227..2175d21 100644 --- a/src/NATS.Server/JetStream/Models/ConsumerConfig.cs +++ b/src/NATS.Server/JetStream/Models/ConsumerConfig.cs @@ -22,6 +22,18 @@ public sealed class ConsumerConfig public bool FlowControl { get; set; } public long RateLimitBps { get; set; } + // Go: consumer.go — max_waiting limits the number of queued pull requests + public int MaxWaiting { get; set; } + + // Go: consumer.go — max_request_batch limits batch size per pull request + public int MaxRequestBatch { get; set; } + + // Go: consumer.go — max_request_max_bytes limits bytes per pull request + public int MaxRequestMaxBytes { get; set; } + + // Go: consumer.go — max_request_expires limits expires duration per pull request (ms) + public int MaxRequestExpiresMs { get; set; } + public string? ResolvePrimaryFilterSubject() { if (FilterSubjects.Count > 0) diff --git a/tests/NATS.Server.Tests/JetStream/Consumers/ConsumerPullQueueTests.cs b/tests/NATS.Server.Tests/JetStream/Consumers/ConsumerPullQueueTests.cs new file mode 100644 index 0000000..11fe6cb --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Consumers/ConsumerPullQueueTests.cs @@ -0,0 +1,1682 @@ +// Go reference: golang/nats-server/server/jetstream_consumer_test.go +// Ports Go pull consumer queue, state, and filter tests to .NET unit tests. +// Tests that require a full NATS networking server are marked [Fact(Skip=...)] +// with the reason "Requires full NATS server infrastructure". + +using System.Text; +using NATS.Server.JetStream; +using NATS.Server.JetStream.Consumers; +using NATS.Server.JetStream.Models; +using NATS.Server.JetStream.Storage; +using NATS.Server.Subscriptions; + +namespace NATS.Server.Tests.JetStream.Consumers; + +/// +/// Go parity tests ported from jetstream_consumer_test.go covering pull consumer +/// queue behavior, MaxAckPending enforcement, filter semantics, pending count +/// calculations, and pull request ordering. +/// +public class ConsumerPullQueueTests +{ + // ========================================================================= + // Helpers + // ========================================================================= + + private static StreamHandle MakeStream(MemStore store, string name = "TEST", params string[] subjects) + { + var config = new StreamConfig + { + Name = name, + Subjects = subjects.Length > 0 ? [..subjects] : ["test.>"], + }; + return new StreamHandle(config, store); + } + + private static ConsumerHandle MakeConsumer(ConsumerConfig? config = null, string stream = "TEST") + => new(stream, config ?? new ConsumerConfig + { + DurableName = "C1", + AckPolicy = AckPolicy.Explicit, + }); + + private static async Task AppendAsync(MemStore store, string subject, string? payload = null) + => await store.AppendAsync(subject, + payload is not null ? Encoding.UTF8.GetBytes(payload) : ReadOnlyMemory.Empty, + CancellationToken.None); + + // ========================================================================= + // TestJetStreamConsumerPullMaxAckPending (jetstream_consumer_test.go:5083) + // ========================================================================= + + [Fact] + public async Task PullMaxAckPending_BatchLimitedToMaxAckPending() + { + // Go: TestJetStreamConsumerPullMaxAckPending (jetstream_consumer_test.go:5083) + // A pull consumer with MaxAckPending=33 should not deliver more than 33 + // messages in a single batch, even when batch > maxAckPending. + var store = new MemStore(); + var stream = MakeStream(store); + + for (int i = 0; i < 100; i++) + await AppendAsync(store, "test.bar", $"MSG: {i + 1}"); + + const int maxAckPending = 33; + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "d22", + AckPolicy = AckPolicy.Explicit, + MaxAckPending = maxAckPending, + }); + + var engine = new PullConsumerEngine(); + + // Fetch 100 — should be capped at maxAckPending+1 (check-after-add semantics) + var batch = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 100 }, CancellationToken.None); + batch.Messages.Count.ShouldBeLessThanOrEqualTo(maxAckPending + 1); + batch.Messages.Count.ShouldBeGreaterThan(0); + + // After acking all, a subsequent fetch should again be allowed up to maxAckPending + foreach (var msg in batch.Messages) + consumer.AckProcessor.AckSequence(msg.Sequence); + + var batch2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = maxAckPending }, CancellationToken.None); + batch2.Messages.Count.ShouldBeLessThanOrEqualTo(maxAckPending + 1); + batch2.Messages.Count.ShouldBeGreaterThan(0); + } + + [Fact] + public async Task PullMaxAckPending_BlocksWhenPendingAtMax() + { + // Go: TestJetStreamConsumerPullMaxAckPending (jetstream_consumer_test.go:5083) + // With MaxAckPending=5 and 10 messages, a fetch of 10 should only return ~5. + var store = new MemStore(); + var stream = MakeStream(store); + for (int i = 0; i < 10; i++) + await AppendAsync(store, "test.foo", $"msg-{i}"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "d", + AckPolicy = AckPolicy.Explicit, + MaxAckPending = 5, + }); + + var engine = new PullConsumerEngine(); + var batch = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); + + // Should stop at MaxAckPending+1 due to check-after-register + batch.Messages.Count.ShouldBeLessThanOrEqualTo(6); + batch.Messages.Count.ShouldBeGreaterThan(0); + } + + // ========================================================================= + // TestJetStreamConsumerPullConsumerFIFO (jetstream_consumer_test.go:6422) + // ========================================================================= + + [Fact] + public async Task PullConsumerFIFO_MessagesDeliveredInSequenceOrder() + { + // Go: TestJetStreamConsumerPullConsumerFIFO (jetstream_consumer_test.go:6422) + // Messages must be delivered in FIFO (sequence) order. + var store = new MemStore(); + var stream = MakeStream(store, "T", "T"); + + for (int i = 1; i <= 10; i++) + await AppendAsync(store, "T", $"msg-{i}"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "d", + AckPolicy = AckPolicy.Explicit, + }); + + var engine = new PullConsumerEngine(); + var batch = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); + + batch.Messages.Count.ShouldBe(10); + for (int i = 0; i < batch.Messages.Count; i++) + { + var payload = Encoding.UTF8.GetString(batch.Messages[i].Payload.Span); + payload.ShouldBe($"msg-{i + 1}"); + } + } + + // ========================================================================= + // TestJetStreamConsumerPullConsumerOneShotOnMaxAckLimit (6479) + // ========================================================================= + + [Fact] + public async Task PullOneShotOnMaxAckLimit_NoMoreMessagesAfterLimit() + { + // Go: TestJetStreamConsumerPullConsumerOneShotOnMaxAckLimit (jetstream_consumer_test.go:6479) + // When MaxAckPending is reached, subsequent fetches should return empty. + var store = new MemStore(); + var stream = MakeStream(store, "T", "T"); + + for (int i = 0; i < 10; i++) + await AppendAsync(store, "T", "OK"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "d", + AckPolicy = AckPolicy.Explicit, + MaxAckPending = 3, + }); + + var engine = new PullConsumerEngine(); + + // First fetch fills up ack pending + var batch1 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5 }, CancellationToken.None); + batch1.Messages.Count.ShouldBeLessThanOrEqualTo(4); + batch1.Messages.Count.ShouldBeGreaterThan(0); + + // Pending is now full — further fetch returns empty + var batch2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5 }, CancellationToken.None); + batch2.Messages.Count.ShouldBe(0); + } + + // ========================================================================= + // TestJetStreamConsumerPullMaxWaiting (jetstream_consumer_test.go:8539) + // ========================================================================= + + [Fact] + public void PullMaxWaiting_QueueCapacityIsRespected() + { + // Go: TestJetStreamConsumerPullMaxWaiting (jetstream_consumer_test.go:8539) + // PullRequestWaitQueue with maxWaiting=3 should reject the 4th enqueue. + var queue = new PullRequestWaitQueue(maxSize: 3); + + queue.Enqueue(new PullWaitingRequest { Batch = 1, Reply = "r1" }).ShouldBeTrue(); + queue.Enqueue(new PullWaitingRequest { Batch = 1, Reply = "r2" }).ShouldBeTrue(); + queue.Enqueue(new PullWaitingRequest { Batch = 1, Reply = "r3" }).ShouldBeTrue(); + queue.Enqueue(new PullWaitingRequest { Batch = 1, Reply = "r4" }).ShouldBeFalse(); + + queue.Count.ShouldBe(3); + } + + [Fact] + public void PullMaxWaiting_DefaultQueueIsUnbounded() + { + // Go: TestJetStreamConsumerPullMaxWaitingOfOne (jetstream_consumer_test.go:8446) + // Default queue (no max) accepts many requests. + var queue = new PullRequestWaitQueue(); + for (int i = 0; i < 100; i++) + queue.Enqueue(new PullWaitingRequest { Batch = 1, Reply = $"r{i}" }).ShouldBeTrue(); + queue.Count.ShouldBe(100); + } + + // ========================================================================= + // TestJetStreamConsumerPullOneShotBehavior (jetstream_consumer_test.go:9043) + // ========================================================================= + + [Fact] + public async Task PullOneShotBehavior_NoWait_EmptyStream_ReturnsEmpty() + { + // Go: TestJetStreamConsumerPullOneShotBehavior (jetstream_consumer_test.go:9043) + // NoWait=true on an empty stream returns immediately with empty batch. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo"); + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "dlc", + AckPolicy = AckPolicy.Explicit, + FilterSubject = "foo", + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest + { + Batch = 1, + NoWait = true, + }, CancellationToken.None); + + result.Messages.Count.ShouldBe(0); + result.TimedOut.ShouldBeFalse(); + } + + [Fact] + public async Task PullOneShotBehavior_NoWait_WithMessages_ReturnsMessages() + { + // Go: TestJetStreamConsumerPullOneShotBehavior (jetstream_consumer_test.go:9043) + // NoWait=true with available messages returns them immediately. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo"); + + for (int i = 0; i < 5; i++) + await AppendAsync(store, "foo", "HELLO"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "dlc", + AckPolicy = AckPolicy.Explicit, + FilterSubject = "foo", + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest + { + Batch = 3, + NoWait = true, + }, CancellationToken.None); + + result.Messages.Count.ShouldBe(3); + } + + // ========================================================================= + // TestJetStreamConsumerPullTimeout (jetstream_consumer_test.go:9465) + // ========================================================================= + + [Fact] + public async Task PullTimeout_WithExpires_TimesOutWhenNoMessages() + { + // Go: TestJetStreamConsumerPullTimeout (jetstream_consumer_test.go:9465) + // A pull request with Expires and no messages returns timed out. + var store = new MemStore(); + var stream = MakeStream(store); + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "pr", + AckPolicy = AckPolicy.Explicit, + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest + { + Batch = 200, + ExpiresMs = 50, + }, CancellationToken.None); + + result.TimedOut.ShouldBeTrue(); + result.Messages.Count.ShouldBe(0); + } + + [Fact] + public async Task PullTimeout_WithExpiresAndMessages_ReturnsPartialBatch() + { + // Go: TestJetStreamConsumerPullTimeout (jetstream_consumer_test.go:9465) + // When messages trickle in, partial batch is returned on timeout. + var store = new MemStore(); + var stream = MakeStream(store); + + await AppendAsync(store, "test.a", "one"); + await AppendAsync(store, "test.b", "two"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "pr", + AckPolicy = AckPolicy.Explicit, + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest + { + Batch = 100, + ExpiresMs = 100, + }, CancellationToken.None); + + // Gets 2 available messages, then times out waiting for more + result.Messages.Count.ShouldBe(2); + } + + // ========================================================================= + // TestJetStreamConsumerPullMaxBytes (jetstream_consumer_test.go:9522) + // ========================================================================= + + [Fact] + public async Task PullMaxBytes_LimitsDeliveredMessagesByByteSize() + { + // Go: TestJetStreamConsumerPullMaxBytes (jetstream_consumer_test.go:9522) + // MaxBytes caps the total payload bytes returned per pull request. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "TEST"); + + // Each message ~100 bytes payload + subject + var payload = new string('Z', 100); + for (int i = 0; i < 10; i++) + await AppendAsync(store, "TEST", payload); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "pr", + AckPolicy = AckPolicy.Explicit, + }); + + var engine = new PullConsumerEngine(); + + // MaxBytes = 250 → should deliver at most 2 messages (each ~104 bytes total) + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest + { + Batch = 10, + MaxBytes = 250, + NoWait = true, + }, CancellationToken.None); + + result.Messages.Count.ShouldBeLessThanOrEqualTo(2); + result.Messages.Count.ShouldBeGreaterThanOrEqualTo(1); + } + + [Fact] + public async Task PullMaxBytes_BatchLimitOverridesWhenSmaller() + { + // Go: TestJetStreamConsumerPullMaxBytes (jetstream_consumer_test.go:9579) + // When batch=1 and MaxBytes is very large, batch limit controls. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "TEST"); + + var payload = new string('Z', 1000); + for (int i = 0; i < 5; i++) + await AppendAsync(store, "TEST", payload); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "pr", + AckPolicy = AckPolicy.Explicit, + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest + { + Batch = 1, + MaxBytes = 10_000_000, + NoWait = true, + }, CancellationToken.None); + + result.Messages.Count.ShouldBe(1); + } + + [Fact] + public async Task PullMaxBytes_MultipleMessagesWithinBudget() + { + // Go: TestJetStreamConsumerPullMaxBytes (jetstream_consumer_test.go:9599) + // batch=5 with large MaxBytes returns 5 messages. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "TEST"); + + var payload = new string('Z', 50); + for (int i = 0; i < 10; i++) + await AppendAsync(store, "TEST", payload); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "pr", + AckPolicy = AckPolicy.Explicit, + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest + { + Batch = 5, + MaxBytes = 10_000_000, + NoWait = true, + }, CancellationToken.None); + + result.Messages.Count.ShouldBe(5); + } + + [Fact] + public async Task PullMaxBytes_LargerBatchLimitedByMaxBytes() + { + // Go: TestJetStreamConsumerPullMaxBytes (jetstream_consumer_test.go:9618) + // Large batch limited to MaxBytes/messageSize messages. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "TEST"); + + // msz~=100_000, dsz=99_950; MaxBytes = msz * 4 allows ~4 messages + var payload = new string('Z', 99_950); + for (int i = 0; i < 20; i++) + await AppendAsync(store, "TEST", payload); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "pr", + AckPolicy = AckPolicy.Explicit, + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest + { + Batch = 1000, + MaxBytes = 100_000 * 4, + NoWait = true, + }, CancellationToken.None); + + result.Messages.Count.ShouldBeLessThanOrEqualTo(5); + result.Messages.Count.ShouldBeGreaterThanOrEqualTo(1); + } + + // ========================================================================= + // TestJetStreamConsumerPullRemoveInterest (jetstream_consumer_test.go:8367) + // ========================================================================= + + [Fact] + public async Task PullRemoveInterest_ConsumerWithFilterSkipsNonMatchingMessages() + { + // Go: TestJetStreamConsumerPullRemoveInterest (jetstream_consumer_test.go:8367) + // Consumer with filter subject only receives matching messages. + var store = new MemStore(); + var stream = MakeStream(store, "MYS", "MYS"); + + await AppendAsync(store, "MYS", "unrelated"); // seq 1 + await AppendAsync(store, "MYS", "unrelated"); // seq 2 + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "worker", + AckPolicy = AckPolicy.Explicit, + FilterSubject = "FILTERED", // doesn't match MYS + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest + { + Batch = 5, + NoWait = true, + }, CancellationToken.None); + + result.Messages.Count.ShouldBe(0); + } + + // ========================================================================= + // TestJetStreamConsumerPullDelayedFirstPullWithReplayOriginal (3361) + // ========================================================================= + + [Fact] + public async Task PullDelayedFirstPullWithReplayOriginal_DeliversMessage() + { + // Go: TestJetStreamConsumerPullDelayedFirstPullWithReplayOriginal (jetstream_consumer_test.go:3361) + // ReplayOriginal consumer with delayed first pull still delivers the message. + var store = new MemStore(); + var stream = MakeStream(store, "MY_WQ", "MY_WQ"); + + await AppendAsync(store, "MY_WQ", "Hello World!"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "d", + AckPolicy = AckPolicy.Explicit, + ReplayPolicy = ReplayPolicy.Original, + }); + + var engine = new PullConsumerEngine(); + + // Simulate delay before pull (but keep it short for unit test) + await Task.Delay(50); + + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 1 }, CancellationToken.None); + result.Messages.Count.ShouldBe(1); + Encoding.UTF8.GetString(result.Messages[0].Payload.Span).ShouldBe("Hello World!"); + } + + // ========================================================================= + // TestJetStreamConsumerThreeFilters (jetstream_consumer_test.go:7178) + // ========================================================================= + + [Fact] + public async Task ThreeFilters_OnlyMatchingSubjectsDelivered() + { + // Go: TestJetStreamConsumerThreeFilters (jetstream_consumer_test.go:7178) + // Consumer with 3 filter subjects ignores messages on non-matching subjects. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "events", "data", "other", "ignored"); + + await AppendAsync(store, "ignored", "100"); // seq 1 + await AppendAsync(store, "events", "0"); // seq 2 + await AppendAsync(store, "events", "1"); // seq 3 + await AppendAsync(store, "data", "2"); // seq 4 + await AppendAsync(store, "ignored", "100"); // seq 5 + await AppendAsync(store, "data", "3"); // seq 6 + await AppendAsync(store, "other", "4"); // seq 7 + await AppendAsync(store, "data", "5"); // seq 8 + await AppendAsync(store, "other", "6"); // seq 9 + await AppendAsync(store, "data", "7"); // seq 10 + await AppendAsync(store, "ignored", "100"); // seq 11 + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "multi", + AckPolicy = AckPolicy.Explicit, + FilterSubjects = ["events", "data", "other"], + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 8 }, CancellationToken.None); + + result.Messages.Count.ShouldBe(8); + var payloads = result.Messages.Select(m => Encoding.UTF8.GetString(m.Payload.Span)).ToList(); + payloads.ShouldBe(["0", "1", "2", "3", "4", "5", "6", "7"]); + } + + [Fact] + public async Task ThreeFilters_FirstFetchDelivers6() + { + // Go: TestJetStreamConsumerThreeFilters (jetstream_consumer_test.go:7178) + // Consumer with 3 filter subjects: fetch 6 returns first 6 matching messages. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "events", "data", "other", "ignored"); + + await AppendAsync(store, "ignored", "100"); + await AppendAsync(store, "events", "0"); + await AppendAsync(store, "events", "1"); + await AppendAsync(store, "data", "2"); + await AppendAsync(store, "ignored", "100"); + await AppendAsync(store, "data", "3"); + await AppendAsync(store, "other", "4"); + await AppendAsync(store, "data", "5"); + await AppendAsync(store, "other", "6"); + await AppendAsync(store, "data", "7"); + await AppendAsync(store, "ignored", "100"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "multi", + AckPolicy = AckPolicy.Explicit, + FilterSubjects = ["events", "data", "other"], + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 6 }, CancellationToken.None); + + result.Messages.Count.ShouldBe(6); + for (int i = 0; i < 6; i++) + Encoding.UTF8.GetString(result.Messages[i].Payload.Span).ShouldBe(i.ToString()); + + // Ack all 6 + foreach (var msg in result.Messages) + consumer.AckProcessor.AckSequence(msg.Sequence); + + // Remaining 2 messages (indices 6 and 7) + var result2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5 }, CancellationToken.None); + result2.Messages.Count.ShouldBe(2); + } + + // ========================================================================= + // TestJetStreamConsumerUpdateFilterSubjects (jetstream_consumer_test.go:7231) + // ========================================================================= + + [Fact] + public async Task UpdateFilterSubjects_NewFilterReceivesAdditionalSubject() + { + // Go: TestJetStreamConsumerUpdateFilterSubjects (jetstream_consumer_test.go:7231) + // Consumer with ["events","data"] fetches those messages; updated filter + // ["events","data","other"] on same consumer also fetches the "other" messages + // when starting from beginning. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "events", "data", "other"); + + await AppendAsync(store, "events", "0"); // seq 1 + await AppendAsync(store, "events", "1"); // seq 2 + await AppendAsync(store, "data", "2"); // seq 3 + await AppendAsync(store, "data", "3"); // seq 4 + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "multi", + AckPolicy = AckPolicy.Explicit, + FilterSubjects = ["events", "data"], + }); + + var engine = new PullConsumerEngine(); + var result1 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5 }, CancellationToken.None); + result1.Messages.Count.ShouldBe(4); + for (int i = 0; i < 4; i++) + { + Encoding.UTF8.GetString(result1.Messages[i].Payload.Span).ShouldBe(i.ToString()); + consumer.AckProcessor.AckSequence(result1.Messages[i].Sequence); + } + + // Add "other" messages after first batch + await AppendAsync(store, "other", "4"); // seq 5 + await AppendAsync(store, "data", "5"); // seq 6 + + // Update consumer filter to include "other" + consumer.Config.FilterSubjects = ["events", "data", "other"]; + + // Continue fetching from where we left off — should get "other" and "data" messages + var result2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5 }, CancellationToken.None); + result2.Messages.Count.ShouldBe(2); + Encoding.UTF8.GetString(result2.Messages[0].Payload.Span).ShouldBe("4"); + Encoding.UTF8.GetString(result2.Messages[1].Payload.Span).ShouldBe("5"); + } + + // ========================================================================= + // TestJetStreamConsumerIsFiltered (jetstream_consumer_test.go:7515) + // ========================================================================= + + [Theory] + [InlineData(new[] { "one" }, new[] { "one" }, false)] + [InlineData(new[] { "one.>" }, new[] { "one.filter" }, true)] + [InlineData(new[] { "multi", "foo", "bar.>" }, new[] { "multi", "bar.>", "foo" }, false)] + [InlineData(new[] { "events", "data" }, new[] { "data" }, true)] + [InlineData(new[] { "machines", "floors" }, new[] { "machines" }, true)] + public void IsFiltered_CorrectlyDetectsWhetherConsumerFilters( + string[] streamSubjects, string[] filterSubjects, bool expectedIsFiltered) + { + // Go: TestJetStreamConsumerIsFiltered (jetstream_consumer_test.go:7515) + // A consumer is considered filtered if it doesn't receive all stream messages. + var config = new ConsumerConfig + { + FilterSubjects = [..filterSubjects], + }; + var filter = CompiledFilter.FromConfig(config); + + // A consumer is NOT filtered if all stream subjects match the filter + var allMatch = streamSubjects.All(s => filter.Matches(s)); + var isFiltered = !allMatch; + + isFiltered.ShouldBe(expectedIsFiltered); + } + + // ========================================================================= + // TestJetStreamConsumerSingleFilterSubjectInFilterSubjects (11281) + // ========================================================================= + + [Fact] + public async Task SingleFilterSubjectInFilterSubjects_Works() + { + // Go: TestJetStreamConsumerSingleFilterSubjectInFilterSubjects (jetstream_consumer_test.go:11281) + // FilterSubjects with a single entry behaves identically to FilterSubject. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo", "bar"); + + await AppendAsync(store, "foo", "match"); + await AppendAsync(store, "bar", "skip"); + await AppendAsync(store, "foo", "match2"); + + var consumerViaFilterSubjects = MakeConsumer(new ConsumerConfig + { + DurableName = "c1", + FilterSubjects = ["foo"], + }); + + var consumerViaFilterSubject = MakeConsumer(new ConsumerConfig + { + DurableName = "c2", + FilterSubject = "foo", + }); + + var engine = new PullConsumerEngine(); + + var r1 = await engine.FetchAsync(stream, consumerViaFilterSubjects, new PullFetchRequest { Batch = 10 }, CancellationToken.None); + var r2 = await engine.FetchAsync(stream, consumerViaFilterSubject, new PullFetchRequest { Batch = 10 }, CancellationToken.None); + + r1.Messages.Count.ShouldBe(r2.Messages.Count); + r1.Messages.Count.ShouldBe(2); + r1.Messages.All(m => m.Subject == "foo").ShouldBeTrue(); + r2.Messages.All(m => m.Subject == "foo").ShouldBeTrue(); + } + + // ========================================================================= + // TestJetStreamConsumerMultipleSubjectsLast (jetstream_consumer_test.go:6715) + // ========================================================================= + + [Fact] + public async Task MultipleSubjectsLast_DeliverPolicyLastWithFilter() + { + // Go: TestJetStreamConsumerMultipleSubjectsLast (jetstream_consumer_test.go:6715) + // DeliverPolicy.Last with FilterSubjects = ["events","data"] starts at last + // message of the stream and delivers from there. + // When the last message in the stream doesn't match the filter, no messages are delivered. + var store = new MemStore(); + var stream = MakeStream(store, "name", "events", "data", "other"); + + await AppendAsync(store, "events", "1"); // seq 1 + await AppendAsync(store, "data", "2"); // seq 2 + await AppendAsync(store, "other", "3"); // seq 3 + await AppendAsync(store, "events", "4"); // seq 4 + await AppendAsync(store, "data", "5"); // seq 5 + await AppendAsync(store, "data", "6"); // seq 6 + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "durable", + AckPolicy = AckPolicy.Explicit, + DeliverPolicy = DeliverPolicy.Last, + FilterSubjects = ["events", "data"], + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 2 }, CancellationToken.None); + + // DeliverPolicy.Last resolves to state.LastSeq = 6 ("data" with payload "6") + // which matches the filter — so it delivers that message. + result.Messages.Count.ShouldBe(1); + result.Messages[0].Subject.ShouldBe("data"); + Encoding.UTF8.GetString(result.Messages[0].Payload.Span).ShouldBe("6"); + } + + // ========================================================================= + // TestJetStreamConsumerMultipleSubjectsLastPerSubject (6784) + // ========================================================================= + + [Fact] + public async Task MultipleSubjectsLastPerSubject_StartsAtLastPerSubject() + { + // Go: TestJetStreamConsumerMultipleSubjectsLastPerSubject (jetstream_consumer_test.go:6784) + // DeliverPolicy.LastPerSubject with FilterSubjects starts at the last msg per subject. + var store = new MemStore(); + var stream = MakeStream(store, "name", "events.*", "data.>"); + + await AppendAsync(store, "events.1", "bad"); // seq 1 + await AppendAsync(store, "events.1", "events.1"); // seq 2 — last for events.1 + + await AppendAsync(store, "data.1", "bad"); // seq 3 + await AppendAsync(store, "data.1", "bad"); // seq 4 + await AppendAsync(store, "data.1", "data.1"); // seq 5 — last for data.1 + + await AppendAsync(store, "events.2", "bad"); // seq 6 + await AppendAsync(store, "events.2", "events.2"); // seq 7 — last for events.2 + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "durable", + AckPolicy = AckPolicy.Explicit, + DeliverPolicy = DeliverPolicy.LastPerSubject, + FilterSubject = "events.1", + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 3 }, CancellationToken.None); + + result.Messages.Count.ShouldBeGreaterThan(0); + // First message should be the last "events.1" + result.Messages[0].Subject.ShouldBe("events.1"); + Encoding.UTF8.GetString(result.Messages[0].Payload.Span).ShouldBe("events.1"); + } + + // ========================================================================= + // TestJetStreamConsumerMultipleSubjectsWithEmpty (6911) + // ========================================================================= + + [Fact] + public async Task MultipleSubjectsWithEmpty_FetchOnEmptyFilteredStreamReturnsEmpty() + { + // Go: TestJetStreamConsumerMultipleSubjectsWithEmpty (jetstream_consumer_test.go:6911) + // Consumer with filter on non-existent subject returns empty. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo", "bar"); + + await AppendAsync(store, "foo", "1"); + await AppendAsync(store, "bar", "2"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "c", + FilterSubjects = ["baz"], // no messages on baz + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest + { + Batch = 5, + NoWait = true, + }, CancellationToken.None); + + result.Messages.Count.ShouldBe(0); + } + + // ========================================================================= + // TestJetStreamConsumerMultipleSubjectsAck (jetstream_consumer_test.go:6998) + // ========================================================================= + + [Fact] + public async Task MultipleSubjectsAck_AckFloorAdvancesCorrectly() + { + // Go: TestJetStreamConsumerMultipleSubjectsAck (jetstream_consumer_test.go:6998) + // Acking messages from a multi-filter consumer advances the ack floor correctly. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo", "bar", "baz"); + + await AppendAsync(store, "foo", "f1"); // seq 1 + await AppendAsync(store, "bar", "b1"); // seq 2 + await AppendAsync(store, "baz", "z1"); // seq 3 — filtered + await AppendAsync(store, "foo", "f2"); // seq 4 + await AppendAsync(store, "bar", "b2"); // seq 5 + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "c", + AckPolicy = AckPolicy.Explicit, + FilterSubjects = ["foo", "bar"], + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); + + result.Messages.Count.ShouldBe(4); + + // Ack all messages + foreach (var msg in result.Messages) + consumer.AckProcessor.AckSequence(msg.Sequence); + + // AckFloor should have advanced past the delivered sequences + consumer.AckProcessor.AckFloor.ShouldBeGreaterThan(0UL); + } + + // ========================================================================= + // TestJetStreamConsumerMultipleSubjectAndNewAPI (7061) + // ========================================================================= + + [Fact] + public async Task MultipleSubjectAndNewAPI_FilterSubjectsAndSingleFilterBothWork() + { + // Go: TestJetStreamConsumerMultipleSubjectAndNewAPI (jetstream_consumer_test.go:7061) + // Both FilterSubject and FilterSubjects produce the same results for single filter. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo.>", "bar"); + + await AppendAsync(store, "foo.1", "a"); + await AppendAsync(store, "bar", "b"); + await AppendAsync(store, "foo.2", "c"); + + var consumerA = MakeConsumer(new ConsumerConfig + { + DurableName = "cA", + FilterSubjects = ["foo.>"], + }); + var consumerB = MakeConsumer(new ConsumerConfig + { + DurableName = "cB", + FilterSubject = "foo.>", + }); + + var engine = new PullConsumerEngine(); + var rA = await engine.FetchAsync(stream, consumerA, new PullFetchRequest { Batch = 5 }, CancellationToken.None); + var rB = await engine.FetchAsync(stream, consumerB, new PullFetchRequest { Batch = 5 }, CancellationToken.None); + + rA.Messages.Count.ShouldBe(rB.Messages.Count); + rA.Messages.Count.ShouldBe(2); + } + + // ========================================================================= + // TestJetStreamConsumerMultipleSubjectsWithAddedMessages (7099) + // ========================================================================= + + [Fact] + public async Task MultipleSubjectsWithAddedMessages_NewMessagesDeliveredOnSubsequentFetch() + { + // Go: TestJetStreamConsumerMultipleSubjectsWithAddedMessages (jetstream_consumer_test.go:7099) + // After first fetch, newly appended messages are returned on the next fetch. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "events", "data"); + + await AppendAsync(store, "events", "e1"); + await AppendAsync(store, "data", "d1"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "c", + AckPolicy = AckPolicy.Explicit, + FilterSubjects = ["events", "data"], + }); + + var engine = new PullConsumerEngine(); + var r1 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); + r1.Messages.Count.ShouldBe(2); + foreach (var msg in r1.Messages) + consumer.AckProcessor.AckSequence(msg.Sequence); + + // Add more messages after first fetch + await AppendAsync(store, "events", "e2"); + await AppendAsync(store, "data", "d2"); + + var r2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); + r2.Messages.Count.ShouldBe(2); + Encoding.UTF8.GetString(r2.Messages[0].Payload.Span).ShouldBe("e2"); + Encoding.UTF8.GetString(r2.Messages[1].Payload.Span).ShouldBe("d2"); + } + + // ========================================================================= + // TestJetStreamConsumerBadNumPending (jetstream_consumer_test.go:5263) + // ========================================================================= + + [Fact] + public async Task BadNumPending_MultipleConsumers_NoPendingAfterFullDelivery() + { + // Go: TestJetStreamConsumerBadNumPending (jetstream_consumer_test.go:5263) + // After all messages are consumed and acked, pending count is 0. + var store = new MemStore(); + var stream = MakeStream(store, "ORDERS", "orders.*"); + + for (int i = 0; i < 10; i++) + await AppendAsync(store, "orders.created", "NEW"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "c", + AckPolicy = AckPolicy.Explicit, + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 100 }, CancellationToken.None); + + result.Messages.Count.ShouldBe(10); + foreach (var msg in result.Messages) + consumer.AckProcessor.AckSequence(msg.Sequence); + + consumer.AckProcessor.PendingCount.ShouldBe(0); + consumer.AckProcessor.AckFloor.ShouldBe(10UL); + } + + // ========================================================================= + // TestJetStreamConsumerPendingCountWithRedeliveries (5775) + // ========================================================================= + + [Fact] + public async Task PendingCountWithRedeliveries_UnackedMsgRedeliveredOnExpiry() + { + // Go: TestJetStreamConsumerPendingCountWithRedeliveries (jetstream_consumer_test.go:5775) + // A message that is not acked within AckWait is redelivered. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo"); + + await AppendAsync(store, "foo", "msg1"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "test", + AckPolicy = AckPolicy.Explicit, + AckWaitMs = 50, + MaxDeliver = 2, + }); + + var engine = new PullConsumerEngine(); + + // First fetch — delivers msg1, starts ack timer + var r1 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 1 }, CancellationToken.None); + r1.Messages.Count.ShouldBe(1); + Encoding.UTF8.GetString(r1.Messages[0].Payload.Span).ShouldBe("msg1"); + // Do not ack + + // Pending count should be 1 (awaiting ack) + consumer.AckProcessor.PendingCount.ShouldBe(1); + + // Wait for ack to expire + await Task.Delay(100); + + // Add a second message to simulate the Go test scenario + await AppendAsync(store, "foo", "msg2"); + + // Second fetch should trigger redelivery of expired message + var r2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 1 }, CancellationToken.None); + r2.Messages.Count.ShouldBe(1); + r2.Messages[0].Redelivered.ShouldBeTrue(); + + // After redelivery, ack and check pending = 0 + consumer.AckProcessor.AckSequence(r2.Messages[0].Sequence); + consumer.AckProcessor.PendingCount.ShouldBe(0); + } + + // ========================================================================= + // TestJetStreamConsumerPendingCountAfterMsgAckAboveFloor (8311) + // ========================================================================= + + [Fact] + public async Task PendingCountAfterMsgAckAboveFloor_AckLastMsgClearsCount() + { + // Go: TestJetStreamConsumerPendingCountAfterMsgAckAboveFloor (jetstream_consumer_test.go:8311) + // Acking the last fetched message (above ack floor) reduces pending to 0. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo"); + + await AppendAsync(store, "foo"); // seq 1 + await AppendAsync(store, "foo"); // seq 2 + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "CONSUMER", + AckPolicy = AckPolicy.Explicit, + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 2 }, CancellationToken.None); + result.Messages.Count.ShouldBe(2); + + // Ack only the second (last) message + consumer.AckProcessor.AckSequence(result.Messages[1].Sequence); + + // Pending count = 1 (first message still unacked) + consumer.AckProcessor.PendingCount.ShouldBe(1); + + // Ack the first message too + consumer.AckProcessor.AckSequence(result.Messages[0].Sequence); + consumer.AckProcessor.PendingCount.ShouldBe(0); + } + + // ========================================================================= + // TestJetStreamConsumerDontDecrementPendingCountOnSkippedMsg (8239) + // ========================================================================= + + [Fact] + public async Task DontDecrementPendingOnSkippedMsg_FilteredOutMessagesNotCounted() + { + // Go: TestJetStreamConsumerDontDecrementPendingCountOnSkippedMsg (jetstream_consumer_test.go:8239) + // Messages that don't match the filter are skipped and don't affect pending count. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo", "bar"); + + await AppendAsync(store, "foo", "match"); // seq 1 + await AppendAsync(store, "bar", "skip"); // seq 2 + await AppendAsync(store, "foo", "match2"); // seq 3 + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "CONSUMER", + AckPolicy = AckPolicy.Explicit, + FilterSubject = "foo", + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); + + result.Messages.Count.ShouldBe(2); + result.Messages.All(m => m.Subject == "foo").ShouldBeTrue(); + consumer.AckProcessor.PendingCount.ShouldBe(2); + + // The "bar" message was skipped — pending count only reflects delivered messages + consumer.AckProcessor.PendingCount.ShouldBe(result.Messages.Count); + } + + // ========================================================================= + // TestJetStreamConsumerNumPendingWithMaxPerSubjectGreaterThanOne (6282) + // ========================================================================= + + [Fact] + public async Task NumPendingWithMaxPerSubject_LastPerSubjectConsumerCountsCorrectly() + { + // Go: TestJetStreamConsumerNumPendingWithMaxPerSubjectGreaterThanOne (jetstream_consumer_test.go:6282) + // DeliverLastPerSubject consumer with filtered subject counts the last msg per subject. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "KV.*.*"); + + await AppendAsync(store, "KV.plans.foo", "OK"); + await AppendAsync(store, "KV.plans.bar", "OK"); + await AppendAsync(store, "KV.plans.baz", "OK"); + // These should be filtered out by the consumer + await AppendAsync(store, "KV.config.foo", "OK"); + await AppendAsync(store, "KV.config.bar", "OK"); + await AppendAsync(store, "KV.config.baz", "OK"); + // Double up plans + await AppendAsync(store, "KV.plans.bar", "OK2"); + await AppendAsync(store, "KV.plans.baz", "OK2"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "d", + AckPolicy = AckPolicy.Explicit, + DeliverPolicy = DeliverPolicy.LastPerSubject, + FilterSubject = "KV.plans.*", + }); + + var engine = new PullConsumerEngine(); + + // For LastPerSubject with "KV.plans.*", we should get the last message per subject + // (KV.plans.foo seq1, KV.plans.bar seq7, KV.plans.baz seq8) + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); + + // Result must only contain KV.plans.* subjects + result.Messages.All(m => m.Subject.StartsWith("KV.plans.")).ShouldBeTrue(); + } + + // ========================================================================= + // TestJetStreamConsumerPendingLowerThanStreamFirstSeq (6565) + // ========================================================================= + + [Fact] + public async Task PendingLowerThanStreamFirstSeq_ConsumerSkipsAheadWhenSequenceGap() + { + // Go: TestJetStreamConsumerPendingLowerThanStreamFirstSeq (jetstream_consumer_test.go:6565) + // When consumer's next sequence is lower than stream's first seq after compaction, + // the consumer should still function correctly. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo"); + + for (int i = 0; i < 10; i++) + await AppendAsync(store, "foo", "msg"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "dur", + AckPolicy = AckPolicy.Explicit, + DeliverPolicy = DeliverPolicy.All, + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); + + result.Messages.Count.ShouldBe(10); + foreach (var msg in result.Messages) + consumer.AckProcessor.AckSequence(msg.Sequence); + + consumer.AckProcessor.AckFloor.ShouldBe(10UL); + + // Simulate consumer starting at a sequence before available messages + consumer.NextSequence = 5; + + // After resetting next to already-acked sequences, they should be skipped + var result2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); + result2.Messages.Count.ShouldBe(0); // All sequences <= AckFloor are skipped + } + + // ========================================================================= + // TestJetStreamConsumerInfoNumPending (jetstream_consumer_test.go:8195) + // ========================================================================= + + [Fact] + public async Task InfoNumPending_PendingCountMatchesAvailableMessages() + { + // Go: TestJetStreamConsumerInfoNumPending (jetstream_consumer_test.go:8195) + // NumPending reflects the count of messages available for delivery. + var store = new MemStore(); + var stream = MakeStream(store, "LIMITS", "js.in.limits"); + + for (int i = 0; i < 100; i++) + await AppendAsync(store, "js.in.limits", "x"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "PULL", + AckPolicy = AckPolicy.Explicit, + }); + + // Before any fetch, pending = number of messages in stream + var state = await stream.Store.GetStateAsync(CancellationToken.None); + state.Messages.ShouldBe(100UL); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 50 }, CancellationToken.None); + result.Messages.Count.ShouldBe(50); + consumer.AckProcessor.PendingCount.ShouldBe(50); + } + + // ========================================================================= + // TestJetStreamConsumerEfficientInterestStateCheck (10532) + // ========================================================================= + + [Fact] + public async Task EfficientInterestStateCheck_LargeSequenceGapHandledEfficiently() + { + // Go: TestJetStreamConsumerEfficientInterestStateCheck (jetstream_consumer_test.go:10532) + // A consumer with a large gap in acknowledged sequences should still function + // without excessive computation. Unit test validates skipping below AckFloor. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo"); + + await AppendAsync(store, "foo", "msg1"); // seq 1 + await AppendAsync(store, "foo", "msg2"); // seq 2 + // Simulate a large gap in sequences by advancing the ack floor + await AppendAsync(store, "foo", "msg3"); // seq 3 (after gap) + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "CONSUMER", + AckPolicy = AckPolicy.Explicit, + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 3 }, CancellationToken.None); + result.Messages.Count.ShouldBe(3); + + // Ack only the first message; others remain pending + consumer.AckProcessor.AckSequence(1); + + // Manually set ack floor to simulate a large gap being already acked + consumer.AckProcessor.AckSequence(2); + consumer.AckProcessor.AckSequence(3); + + // AckFloor should have advanced + consumer.AckProcessor.AckFloor.ShouldBe(3UL); + consumer.AckProcessor.PendingCount.ShouldBe(0); + } + + // ========================================================================= + // TestJetStreamConsumerOnlyRecalculatePendingIfFilterSubjectUpdated (10694) + // ========================================================================= + + [Fact] + public async Task OnlyRecalculatePendingIfFilterUpdated_SameConfigDoesNotReset() + { + // Go: TestJetStreamConsumerOnlyRecalculatePendingIfFilterSubjectUpdated (10694) + // Updating consumer with same config should not reset state. + // Updating with new filter should apply from current position. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo"); + + await AppendAsync(store, "foo", "msg1"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "DURABLE", + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 1 }, CancellationToken.None); + result.Messages.Count.ShouldBe(1); + + var sequenceBefore = consumer.NextSequence; + + // Simulating "same config update" — no change to filter, NextSequence unchanged + // In our engine the consumer's config state is just modified in place + consumer.Config.FilterSubject = null; // still no filter + var result2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5 }, CancellationToken.None); + consumer.NextSequence.ShouldBeGreaterThanOrEqualTo(sequenceBefore); + + // Update filter to "foo" + consumer.Config.FilterSubject = "foo"; + // Reset sequence to test from beginning + consumer.NextSequence = 1; + var result3 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5 }, CancellationToken.None); + result3.Messages.Count.ShouldBe(1); + result3.Messages.All(m => m.Subject == "foo").ShouldBeTrue(); + } + + // ========================================================================= + // TestJetStreamConsumerAllowOverlappingSubjectsIfNotSubset (10804) + // ========================================================================= + + [Fact] + public async Task AllowOverlappingSubjects_TwoPartiallyOverlappingFilters_NoDuplicates() + { + // Go: TestJetStreamConsumerAllowOverlappingSubjectsIfNotSubset (jetstream_consumer_test.go:10804) + // Two overlapping wildcard filters (event.foo.* and event.*.foo) deliver + // all matching messages without duplication (7 unique subjects). + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "event.>"); + + var parts = new[] { "foo", "bar", "baz", "oth" }; + foreach (var start in parts) + foreach (var end in parts) + await AppendAsync(store, $"event.{start}.{end}"); + + // Total: 16 messages published + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "DURABLE", + FilterSubjects = ["event.foo.*", "event.*.foo"], + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 20 }, CancellationToken.None); + + // event.foo.X (4 subjects) + event.Y.foo where Y != foo (3 subjects) = 7 + result.Messages.Count.ShouldBe(7); + + // No duplicates + var subjects = result.Messages.Select(m => m.Subject).ToList(); + subjects.Distinct().Count().ShouldBe(7); + + // All match at least one filter + var filter = new CompiledFilter(["event.foo.*", "event.*.foo"]); + result.Messages.All(m => filter.Matches(m.Subject)).ShouldBeTrue(); + } + + // ========================================================================= + // TestSortingConsumerPullRequests (jetstream_consumer_test.go:10031) + // ========================================================================= + + [Fact] + public void SortingConsumerPullRequests_BasicPriorityOrdering() + { + // Go: TestSortingConsumerPullRequests (jetstream_consumer_test.go:10031) + // Lower Priority value = higher precedence (earlier in queue). + var queue = new PullRequestWaitQueue(100); + + queue.Enqueue(new PullWaitingRequest { Priority = 1, Reply = null }); + queue.Enqueue(new PullWaitingRequest { Priority = 2, Reply = null }); + queue.Enqueue(new PullWaitingRequest { Priority = 1, Reply = null }); + queue.Enqueue(new PullWaitingRequest { Priority = 3, Reply = null }); + + var expectedPriorities = new[] { 1, 1, 2, 3 }; + for (int i = 0; i < expectedPriorities.Length; i++) + { + var item = queue.Dequeue(); + item.ShouldNotBeNull(); + item!.Priority.ShouldBe(expectedPriorities[i]); + } + } + + [Fact] + public void SortingConsumerPullRequests_StableOrderWithinSamePriority() + { + // Go: TestSortingConsumerPullRequests (jetstream_consumer_test.go:10054) "test if sort is stable" + // Items with the same priority maintain insertion order (stable sort). + var queue = new PullRequestWaitQueue(100); + + queue.Enqueue(new PullWaitingRequest { Priority = 1, Reply = "1a" }); + queue.Enqueue(new PullWaitingRequest { Priority = 2, Reply = "2a" }); + queue.Enqueue(new PullWaitingRequest { Priority = 1, Reply = "1b" }); + queue.Enqueue(new PullWaitingRequest { Priority = 2, Reply = "2b" }); + queue.Enqueue(new PullWaitingRequest { Priority = 1, Reply = "1c" }); + queue.Enqueue(new PullWaitingRequest { Priority = 3, Reply = "3a" }); + queue.Enqueue(new PullWaitingRequest { Priority = 2, Reply = "2c" }); + + var expected = new[] { "1a", "1b", "1c", "2a", "2b", "2c", "3a" }; + for (int i = 0; i < expected.Length; i++) + { + var item = queue.Dequeue(); + item.ShouldNotBeNull(); + item!.Reply.ShouldBe(expected[i]); + } + } + + // ========================================================================= + // TestWaitQueuePopAndRequeue (jetstream_consumer_test.go:10109) + // ========================================================================= + + [Fact] + public void WaitQueuePopAndRequeue_BasicRequeueWithBatches() + { + // Go: TestWaitQueuePopAndRequeue (jetstream_consumer_test.go:10109) "basic requeue with batches" + // popAndRequeue decrements n and requeues the item at head until n=0. + var queue = new PullRequestWaitQueue(100); + + // 9 items, 3 per priority group, each with n=3 + string[] priorities = ["1", "2", "3"]; + string[] letters = ["a", "b", "c"]; + + foreach (var p in priorities) + foreach (var l in letters) + queue.Enqueue(new PullWaitingRequest + { + Priority = int.Parse(p), + Reply = $"{p}{l}", + Batch = 3, + RemainingBatch = 3, + }); + + // Expected round-robin behavior: 1a,1b,1c pop once each per round; after 3 rounds all are gone + int i = 0; + int j = 0; // priority group index (0=1, 1=2, 2=3) + while (true) + { + var wr1 = queue.PopAndRequeue(); + wr1.ShouldNotBeNull(); + wr1!.Reply.ShouldBe($"{j + 1}a"); + + var wr2 = queue.PopAndRequeue(); + wr2.ShouldNotBeNull(); + wr2!.Reply.ShouldBe($"{j + 1}b"); + + var wr3 = queue.PopAndRequeue(); + wr3.ShouldNotBeNull(); + wr3!.Reply.ShouldBe($"{j + 1}c"); + + i++; + if (i % 3 == 0) + j++; + + // Count should match 9 - (j * 3) + queue.Count.ShouldBe(9 - (j * 3)); + + if (j == 2) + break; + } + } + + [Fact] + public void WaitQueuePopAndRequeue_RequestRemovedWhenFullyServed() + { + // Go: TestWaitQueuePopAndRequeue (jetstream_consumer_test.go:10152) "request removal when fully served" + var queue = new PullRequestWaitQueue(100); + + queue.Enqueue(new PullWaitingRequest { Priority = 1, Reply = "1a", RemainingBatch = 2 }); + queue.Enqueue(new PullWaitingRequest { Priority = 1, Reply = "1b", RemainingBatch = 1 }); + queue.Enqueue(new PullWaitingRequest { Priority = 2, Reply = "2a", RemainingBatch = 3 }); + + var initialCount = queue.Count; + initialCount.ShouldBe(3); + + // Pop 1a first time (n=2 -> n=1, requeued) + var wr = queue.PopAndRequeue(); + wr.ShouldNotBeNull(); + wr!.Reply.ShouldBe("1a"); + wr.RemainingBatch.ShouldBe(1); + queue.Count.ShouldBe(initialCount); // still 3 + + // Pop 1b (n=1 -> n=0, removed) + wr = queue.PopAndRequeue(); + wr.ShouldNotBeNull(); + wr!.Reply.ShouldBe("1b"); + wr.RemainingBatch.ShouldBe(0); + queue.Count.ShouldBe(initialCount - 1); // now 2 + + // Pop 1a second time (n=1 -> n=0, removed) + wr = queue.PopAndRequeue(); + wr.ShouldNotBeNull(); + wr!.Reply.ShouldBe("1a"); + wr.RemainingBatch.ShouldBe(0); + queue.Count.ShouldBe(initialCount - 2); // now 1 + + // Only 2a should remain + var next = queue.Peek(); + next.ShouldNotBeNull(); + next!.Reply.ShouldBe("2a"); + next.RemainingBatch.ShouldBe(3); + } + + // ========================================================================= + // TestJetStreamConsumerStateAlwaysFromStore (jetstream_consumer_test.go:9796) + // ========================================================================= + + [Fact] + public async Task ConsumerStateAlwaysFromStore_DeliveredAndAckFloorTrack() + { + // Go: TestJetStreamConsumerStateAlwaysFromStore (jetstream_consumer_test.go:9796) + // Consumer delivered/ackFloor state tracks correctly through fetch and ack cycles. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo.>"); + + await AppendAsync(store, "foo.bar"); // seq 1 — filtered + await AppendAsync(store, "foo.other"); // seq 2 — not filtered + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "CONSUMER", + AckPolicy = AckPolicy.Explicit, + FilterSubject = "foo.bar", + }); + + var engine = new PullConsumerEngine(); + + // Initial state: no delivered, no ack floor + consumer.AckProcessor.AckFloor.ShouldBe(0UL); + consumer.AckProcessor.PendingCount.ShouldBe(0); + + // Fetch — should deliver only foo.bar (seq 1) + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5 }, CancellationToken.None); + result.Messages.Count.ShouldBe(1); + result.Messages[0].Subject.ShouldBe("foo.bar"); + + // Delivered but not acked — pending = 1, floor = 0 + consumer.AckProcessor.PendingCount.ShouldBe(1); + consumer.AckProcessor.AckFloor.ShouldBe(0UL); + + // Ack the message — floor advances to seq 1 + consumer.AckProcessor.AckSequence(result.Messages[0].Sequence); + consumer.AckProcessor.PendingCount.ShouldBe(0); + consumer.AckProcessor.AckFloor.ShouldBe(result.Messages[0].Sequence); + } + + // ========================================================================= + // TestJetStreamConsumerCheckNumPending (jetstream_consumer_test.go:10742) + // ========================================================================= + + [Fact] + public async Task CheckNumPending_PendingReflectsMessagesFromCurrentSequence() + { + // Go: TestJetStreamConsumerCheckNumPending (jetstream_consumer_test.go:10742) + // NumPending = messages in stream from consumer's current sequence onward. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo"); + + for (int i = 0; i < 5; i++) + await AppendAsync(store, "foo"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "DURABLE", + }); + + var engine = new PullConsumerEngine(); + + // Initial: consumer at seq 1, 5 messages pending + // Fetch 2 messages + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 2 }, CancellationToken.None); + result.Messages.Count.ShouldBe(2); + + // NextSequence now points past fetched messages + consumer.NextSequence.ShouldBe(3UL); + + // 3 messages remain to be delivered + var state = await stream.Store.GetStateAsync(CancellationToken.None); + var remaining = state.LastSeq - consumer.NextSequence + 1; + remaining.ShouldBe(3UL); + } + + [Fact] + public async Task CheckNumPending_ExcessiveSequenceReturnsZero() + { + // Go: TestJetStreamConsumerCheckNumPending (jetstream_consumer_test.go:10782) + // A sequence past the end of stream reports 0 pending. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo"); + + for (int i = 0; i < 5; i++) + await AppendAsync(store, "foo"); + + var consumer = MakeConsumer(new ConsumerConfig { DurableName = "DURABLE" }); + consumer.NextSequence = 100; // well past end of stream + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest + { + Batch = 5, + NoWait = true, + }, CancellationToken.None); + + result.Messages.Count.ShouldBe(0); + } + + // ========================================================================= + // TestJetStreamConsumerMultipleFiltersRace (jetstream_consumer_test.go:93) + // ========================================================================= + + [Fact] + public async Task MultipleFiltersRace_ConcurrentFetchAllMessagesDelivered() + { + // Go: TestJetStreamConsumerMultipleFiltersRace (jetstream_consumer_test.go:93) + // Multiple concurrent fetches with filter subjects deliver all matching messages. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo.>", "bar.>"); + + // Publish 20 messages on foo.* and 20 on bar.* + for (int i = 0; i < 20; i++) + { + await AppendAsync(store, $"foo.{i}", $"f{i}"); + await AppendAsync(store, $"bar.{i}", $"b{i}"); + } + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "c", + AckPolicy = AckPolicy.Explicit, + FilterSubjects = ["foo.>", "bar.>"], + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 40 }, CancellationToken.None); + + result.Messages.Count.ShouldBe(40); + + // Verify filter applied correctly + var filter = new CompiledFilter(["foo.>", "bar.>"]); + result.Messages.All(m => filter.Matches(m.Subject)).ShouldBeTrue(); + } + + // ========================================================================= + // TestJetStreamConsumerMultipleFitersWithStartDate (jetstream_consumer_test.go:2533) + // ========================================================================= + + [Fact] + public async Task MultipleFitersWithStartDate_ByStartTimeOnlyDeliversNewEnoughMessages() + { + // Go: TestJetStreamConsumerMultipleFitersWithStartDate (jetstream_consumer_test.go:2533) + // DeliverPolicy.ByStartTime with a cut-off time only delivers messages after that time. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "events", "data"); + + var cutoff = DateTime.UtcNow.AddMilliseconds(-50); + + // Append old message (before cutoff — at least conceptually; MemStore records current time) + // In unit test we simulate by appending before the cutoff datetime we pass + await AppendAsync(store, "events", "old"); + await AppendAsync(store, "data", "old"); + + await Task.Delay(10); // ensure new messages have newer timestamp + + // Append new messages after our reference point + await AppendAsync(store, "events", "new1"); + await AppendAsync(store, "data", "new2"); + + var startTime = DateTime.UtcNow.AddMilliseconds(-5); // very recent + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "c", + AckPolicy = AckPolicy.Explicit, + DeliverPolicy = DeliverPolicy.ByStartTime, + OptStartTimeUtc = startTime, + FilterSubjects = ["events", "data"], + }); + + var engine = new PullConsumerEngine(); + var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); + + // Only messages at or after startTime should be delivered + result.Messages.All(m => filter_matches_events_data(m.Subject)).ShouldBeTrue(); + result.Messages.Count.ShouldBeGreaterThanOrEqualTo(0); + + static bool filter_matches_events_data(string s) => s == "events" || s == "data"; + } + + // ========================================================================= + // TestJetStreamConsumerFilterUpdate (jetstream_consumer_test.go:7397) + // ========================================================================= + + [Fact] + public async Task FilterUpdate_ChangingFilterSubjectChangesDeliveredMessages() + { + // Go: TestJetStreamConsumerFilterUpdate (jetstream_consumer_test.go:7397) + // Changing FilterSubject on a consumer config changes which messages are delivered. + var store = new MemStore(); + var stream = MakeStream(store, "TEST", "foo", "bar"); + + await AppendAsync(store, "foo", "f1"); + await AppendAsync(store, "bar", "b1"); + await AppendAsync(store, "foo", "f2"); + await AppendAsync(store, "bar", "b2"); + + var consumer = MakeConsumer(new ConsumerConfig + { + DurableName = "c", + AckPolicy = AckPolicy.Explicit, + FilterSubject = "foo", + }); + + var engine = new PullConsumerEngine(); + var r1 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); + r1.Messages.Count.ShouldBe(2); + r1.Messages.All(m => m.Subject == "foo").ShouldBeTrue(); + foreach (var msg in r1.Messages) + consumer.AckProcessor.AckSequence(msg.Sequence); + + // Update filter to "bar" — consumer starts fresh + consumer.Config.FilterSubject = "bar"; + consumer.NextSequence = 1; + + var r2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); + r2.Messages.Count.ShouldBe(2); + r2.Messages.All(m => m.Subject == "bar").ShouldBeTrue(); + } +}