// Go reference: golang/nats-server/server/jetstream_consumer_test.go // Ports Go pull consumer queue, state, and filter tests to .NET unit tests. // Tests that require a full NATS networking server are marked [Fact(Skip=...)] // with the reason "Requires full NATS server infrastructure". using System.Text; using NATS.Server.JetStream; using NATS.Server.JetStream.Consumers; using NATS.Server.JetStream.Models; using NATS.Server.JetStream.Storage; using NATS.Server.Subscriptions; namespace NATS.Server.Tests.JetStream.Consumers; /// /// Go parity tests ported from jetstream_consumer_test.go covering pull consumer /// queue behavior, MaxAckPending enforcement, filter semantics, pending count /// calculations, and pull request ordering. /// public class ConsumerPullQueueTests { // ========================================================================= // Helpers // ========================================================================= private static StreamHandle MakeStream(MemStore store, string name = "TEST", params string[] subjects) { var config = new StreamConfig { Name = name, Subjects = subjects.Length > 0 ? [..subjects] : ["test.>"], }; return new StreamHandle(config, store); } private static ConsumerHandle MakeConsumer(ConsumerConfig? config = null, string stream = "TEST") => new(stream, config ?? new ConsumerConfig { DurableName = "C1", AckPolicy = AckPolicy.Explicit, }); private static async Task AppendAsync(MemStore store, string subject, string? payload = null) => await store.AppendAsync(subject, payload is not null ? Encoding.UTF8.GetBytes(payload) : ReadOnlyMemory.Empty, CancellationToken.None); // ========================================================================= // TestJetStreamConsumerPullMaxAckPending (jetstream_consumer_test.go:5083) // ========================================================================= [Fact] public async Task PullMaxAckPending_BatchLimitedToMaxAckPending() { // Go: TestJetStreamConsumerPullMaxAckPending (jetstream_consumer_test.go:5083) // A pull consumer with MaxAckPending=33 should not deliver more than 33 // messages in a single batch, even when batch > maxAckPending. var store = new MemStore(); var stream = MakeStream(store); for (int i = 0; i < 100; i++) await AppendAsync(store, "test.bar", $"MSG: {i + 1}"); const int maxAckPending = 33; var consumer = MakeConsumer(new ConsumerConfig { DurableName = "d22", AckPolicy = AckPolicy.Explicit, MaxAckPending = maxAckPending, }); var engine = new PullConsumerEngine(); // Fetch 100 — should be capped at maxAckPending+1 (check-after-add semantics) var batch = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 100 }, CancellationToken.None); batch.Messages.Count.ShouldBeLessThanOrEqualTo(maxAckPending + 1); batch.Messages.Count.ShouldBeGreaterThan(0); // After acking all, a subsequent fetch should again be allowed up to maxAckPending foreach (var msg in batch.Messages) consumer.AckProcessor.AckSequence(msg.Sequence); var batch2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = maxAckPending }, CancellationToken.None); batch2.Messages.Count.ShouldBeLessThanOrEqualTo(maxAckPending + 1); batch2.Messages.Count.ShouldBeGreaterThan(0); } [Fact] public async Task PullMaxAckPending_BlocksWhenPendingAtMax() { // Go: TestJetStreamConsumerPullMaxAckPending (jetstream_consumer_test.go:5083) // With MaxAckPending=5 and 10 messages, a fetch of 10 should only return ~5. var store = new MemStore(); var stream = MakeStream(store); for (int i = 0; i < 10; i++) await AppendAsync(store, "test.foo", $"msg-{i}"); var consumer = MakeConsumer(new ConsumerConfig { DurableName = "d", AckPolicy = AckPolicy.Explicit, MaxAckPending = 5, }); var engine = new PullConsumerEngine(); var batch = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); // Should stop at MaxAckPending+1 due to check-after-register batch.Messages.Count.ShouldBeLessThanOrEqualTo(6); batch.Messages.Count.ShouldBeGreaterThan(0); } // ========================================================================= // TestJetStreamConsumerPullConsumerFIFO (jetstream_consumer_test.go:6422) // ========================================================================= [Fact] public async Task PullConsumerFIFO_MessagesDeliveredInSequenceOrder() { // Go: TestJetStreamConsumerPullConsumerFIFO (jetstream_consumer_test.go:6422) // Messages must be delivered in FIFO (sequence) order. var store = new MemStore(); var stream = MakeStream(store, "T", "T"); for (int i = 1; i <= 10; i++) await AppendAsync(store, "T", $"msg-{i}"); var consumer = MakeConsumer(new ConsumerConfig { DurableName = "d", AckPolicy = AckPolicy.Explicit, }); var engine = new PullConsumerEngine(); var batch = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); batch.Messages.Count.ShouldBe(10); for (int i = 0; i < batch.Messages.Count; i++) { var payload = Encoding.UTF8.GetString(batch.Messages[i].Payload.Span); payload.ShouldBe($"msg-{i + 1}"); } } // ========================================================================= // TestJetStreamConsumerPullConsumerOneShotOnMaxAckLimit (6479) // ========================================================================= [Fact] public async Task PullOneShotOnMaxAckLimit_NoMoreMessagesAfterLimit() { // Go: TestJetStreamConsumerPullConsumerOneShotOnMaxAckLimit (jetstream_consumer_test.go:6479) // When MaxAckPending is reached, subsequent fetches should return empty. var store = new MemStore(); var stream = MakeStream(store, "T", "T"); for (int i = 0; i < 10; i++) await AppendAsync(store, "T", "OK"); var consumer = MakeConsumer(new ConsumerConfig { DurableName = "d", AckPolicy = AckPolicy.Explicit, MaxAckPending = 3, }); var engine = new PullConsumerEngine(); // First fetch fills up ack pending var batch1 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5 }, CancellationToken.None); batch1.Messages.Count.ShouldBeLessThanOrEqualTo(4); batch1.Messages.Count.ShouldBeGreaterThan(0); // Pending is now full — further fetch returns empty var batch2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5 }, CancellationToken.None); batch2.Messages.Count.ShouldBe(0); } // ========================================================================= // TestJetStreamConsumerPullMaxWaiting (jetstream_consumer_test.go:8539) // ========================================================================= [Fact] public void PullMaxWaiting_QueueCapacityIsRespected() { // Go: TestJetStreamConsumerPullMaxWaiting (jetstream_consumer_test.go:8539) // PullRequestWaitQueue with maxWaiting=3 should reject the 4th enqueue. var queue = new PullRequestWaitQueue(maxSize: 3); queue.Enqueue(new PullWaitingRequest { Batch = 1, Reply = "r1" }).ShouldBeTrue(); queue.Enqueue(new PullWaitingRequest { Batch = 1, Reply = "r2" }).ShouldBeTrue(); queue.Enqueue(new PullWaitingRequest { Batch = 1, Reply = "r3" }).ShouldBeTrue(); queue.Enqueue(new PullWaitingRequest { Batch = 1, Reply = "r4" }).ShouldBeFalse(); queue.Count.ShouldBe(3); } [Fact] public void PullMaxWaiting_DefaultQueueIsUnbounded() { // Go: TestJetStreamConsumerPullMaxWaitingOfOne (jetstream_consumer_test.go:8446) // Default queue (no max) accepts many requests. var queue = new PullRequestWaitQueue(); for (int i = 0; i < 100; i++) queue.Enqueue(new PullWaitingRequest { Batch = 1, Reply = $"r{i}" }).ShouldBeTrue(); queue.Count.ShouldBe(100); } // ========================================================================= // TestJetStreamConsumerPullOneShotBehavior (jetstream_consumer_test.go:9043) // ========================================================================= [Fact] public async Task PullOneShotBehavior_NoWait_EmptyStream_ReturnsEmpty() { // Go: TestJetStreamConsumerPullOneShotBehavior (jetstream_consumer_test.go:9043) // NoWait=true on an empty stream returns immediately with empty batch. var store = new MemStore(); var stream = MakeStream(store, "TEST", "foo"); var consumer = MakeConsumer(new ConsumerConfig { DurableName = "dlc", AckPolicy = AckPolicy.Explicit, FilterSubject = "foo", }); var engine = new PullConsumerEngine(); var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 1, NoWait = true, }, CancellationToken.None); result.Messages.Count.ShouldBe(0); result.TimedOut.ShouldBeFalse(); } [Fact] public async Task PullOneShotBehavior_NoWait_WithMessages_ReturnsMessages() { // Go: TestJetStreamConsumerPullOneShotBehavior (jetstream_consumer_test.go:9043) // NoWait=true with available messages returns them immediately. var store = new MemStore(); var stream = MakeStream(store, "TEST", "foo"); for (int i = 0; i < 5; i++) await AppendAsync(store, "foo", "HELLO"); var consumer = MakeConsumer(new ConsumerConfig { DurableName = "dlc", AckPolicy = AckPolicy.Explicit, FilterSubject = "foo", }); var engine = new PullConsumerEngine(); var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 3, NoWait = true, }, CancellationToken.None); result.Messages.Count.ShouldBe(3); } // ========================================================================= // TestJetStreamConsumerPullTimeout (jetstream_consumer_test.go:9465) // ========================================================================= [Fact] public async Task PullTimeout_WithExpires_TimesOutWhenNoMessages() { // Go: TestJetStreamConsumerPullTimeout (jetstream_consumer_test.go:9465) // A pull request with Expires and no messages returns timed out. var store = new MemStore(); var stream = MakeStream(store); var consumer = MakeConsumer(new ConsumerConfig { DurableName = "pr", AckPolicy = AckPolicy.Explicit, }); var engine = new PullConsumerEngine(); var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 200, ExpiresMs = 50, }, CancellationToken.None); result.TimedOut.ShouldBeTrue(); result.Messages.Count.ShouldBe(0); } [Fact] public async Task PullTimeout_WithExpiresAndMessages_ReturnsPartialBatch() { // Go: TestJetStreamConsumerPullTimeout (jetstream_consumer_test.go:9465) // When messages trickle in, partial batch is returned on timeout. var store = new MemStore(); var stream = MakeStream(store); await AppendAsync(store, "test.a", "one"); await AppendAsync(store, "test.b", "two"); var consumer = MakeConsumer(new ConsumerConfig { DurableName = "pr", AckPolicy = AckPolicy.Explicit, }); var engine = new PullConsumerEngine(); var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 100, ExpiresMs = 100, }, CancellationToken.None); // Gets 2 available messages, then times out waiting for more result.Messages.Count.ShouldBe(2); } // ========================================================================= // TestJetStreamConsumerPullMaxBytes (jetstream_consumer_test.go:9522) // ========================================================================= [Fact] public async Task PullMaxBytes_LimitsDeliveredMessagesByByteSize() { // Go: TestJetStreamConsumerPullMaxBytes (jetstream_consumer_test.go:9522) // MaxBytes caps the total payload bytes returned per pull request. var store = new MemStore(); var stream = MakeStream(store, "TEST", "TEST"); // Each message ~100 bytes payload + subject var payload = new string('Z', 100); for (int i = 0; i < 10; i++) await AppendAsync(store, "TEST", payload); var consumer = MakeConsumer(new ConsumerConfig { DurableName = "pr", AckPolicy = AckPolicy.Explicit, }); var engine = new PullConsumerEngine(); // MaxBytes = 250 → should deliver at most 2 messages (each ~104 bytes total) var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10, MaxBytes = 250, NoWait = true, }, CancellationToken.None); result.Messages.Count.ShouldBeLessThanOrEqualTo(2); result.Messages.Count.ShouldBeGreaterThanOrEqualTo(1); } [Fact] public async Task PullMaxBytes_BatchLimitOverridesWhenSmaller() { // Go: TestJetStreamConsumerPullMaxBytes (jetstream_consumer_test.go:9579) // When batch=1 and MaxBytes is very large, batch limit controls. var store = new MemStore(); var stream = MakeStream(store, "TEST", "TEST"); var payload = new string('Z', 1000); for (int i = 0; i < 5; i++) await AppendAsync(store, "TEST", payload); var consumer = MakeConsumer(new ConsumerConfig { DurableName = "pr", AckPolicy = AckPolicy.Explicit, }); var engine = new PullConsumerEngine(); var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 1, MaxBytes = 10_000_000, NoWait = true, }, CancellationToken.None); result.Messages.Count.ShouldBe(1); } [Fact] public async Task PullMaxBytes_MultipleMessagesWithinBudget() { // Go: TestJetStreamConsumerPullMaxBytes (jetstream_consumer_test.go:9599) // batch=5 with large MaxBytes returns 5 messages. var store = new MemStore(); var stream = MakeStream(store, "TEST", "TEST"); var payload = new string('Z', 50); for (int i = 0; i < 10; i++) await AppendAsync(store, "TEST", payload); var consumer = MakeConsumer(new ConsumerConfig { DurableName = "pr", AckPolicy = AckPolicy.Explicit, }); var engine = new PullConsumerEngine(); var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5, MaxBytes = 10_000_000, NoWait = true, }, CancellationToken.None); result.Messages.Count.ShouldBe(5); } [Fact] public async Task PullMaxBytes_LargerBatchLimitedByMaxBytes() { // Go: TestJetStreamConsumerPullMaxBytes (jetstream_consumer_test.go:9618) // Large batch limited to MaxBytes/messageSize messages. var store = new MemStore(); var stream = MakeStream(store, "TEST", "TEST"); // msz~=100_000, dsz=99_950; MaxBytes = msz * 4 allows ~4 messages var payload = new string('Z', 99_950); for (int i = 0; i < 20; i++) await AppendAsync(store, "TEST", payload); var consumer = MakeConsumer(new ConsumerConfig { DurableName = "pr", AckPolicy = AckPolicy.Explicit, }); var engine = new PullConsumerEngine(); var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 1000, MaxBytes = 100_000 * 4, NoWait = true, }, CancellationToken.None); result.Messages.Count.ShouldBeLessThanOrEqualTo(5); result.Messages.Count.ShouldBeGreaterThanOrEqualTo(1); } // ========================================================================= // TestJetStreamConsumerPullRemoveInterest (jetstream_consumer_test.go:8367) // ========================================================================= [Fact] public async Task PullRemoveInterest_ConsumerWithFilterSkipsNonMatchingMessages() { // Go: TestJetStreamConsumerPullRemoveInterest (jetstream_consumer_test.go:8367) // Consumer with filter subject only receives matching messages. var store = new MemStore(); var stream = MakeStream(store, "MYS", "MYS"); await AppendAsync(store, "MYS", "unrelated"); // seq 1 await AppendAsync(store, "MYS", "unrelated"); // seq 2 var consumer = MakeConsumer(new ConsumerConfig { DurableName = "worker", AckPolicy = AckPolicy.Explicit, FilterSubject = "FILTERED", // doesn't match MYS }); var engine = new PullConsumerEngine(); var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5, NoWait = true, }, CancellationToken.None); result.Messages.Count.ShouldBe(0); } // ========================================================================= // TestJetStreamConsumerPullDelayedFirstPullWithReplayOriginal (3361) // ========================================================================= [Fact] public async Task PullDelayedFirstPullWithReplayOriginal_DeliversMessage() { // Go: TestJetStreamConsumerPullDelayedFirstPullWithReplayOriginal (jetstream_consumer_test.go:3361) // ReplayOriginal consumer with delayed first pull still delivers the message. var store = new MemStore(); var stream = MakeStream(store, "MY_WQ", "MY_WQ"); await AppendAsync(store, "MY_WQ", "Hello World!"); var consumer = MakeConsumer(new ConsumerConfig { DurableName = "d", AckPolicy = AckPolicy.Explicit, ReplayPolicy = ReplayPolicy.Original, }); var engine = new PullConsumerEngine(); // Simulate delay before pull (but keep it short for unit test) await Task.Delay(50); var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 1 }, CancellationToken.None); result.Messages.Count.ShouldBe(1); Encoding.UTF8.GetString(result.Messages[0].Payload.Span).ShouldBe("Hello World!"); } // ========================================================================= // TestJetStreamConsumerThreeFilters (jetstream_consumer_test.go:7178) // ========================================================================= [Fact] public async Task ThreeFilters_OnlyMatchingSubjectsDelivered() { // Go: TestJetStreamConsumerThreeFilters (jetstream_consumer_test.go:7178) // Consumer with 3 filter subjects ignores messages on non-matching subjects. var store = new MemStore(); var stream = MakeStream(store, "TEST", "events", "data", "other", "ignored"); await AppendAsync(store, "ignored", "100"); // seq 1 await AppendAsync(store, "events", "0"); // seq 2 await AppendAsync(store, "events", "1"); // seq 3 await AppendAsync(store, "data", "2"); // seq 4 await AppendAsync(store, "ignored", "100"); // seq 5 await AppendAsync(store, "data", "3"); // seq 6 await AppendAsync(store, "other", "4"); // seq 7 await AppendAsync(store, "data", "5"); // seq 8 await AppendAsync(store, "other", "6"); // seq 9 await AppendAsync(store, "data", "7"); // seq 10 await AppendAsync(store, "ignored", "100"); // seq 11 var consumer = MakeConsumer(new ConsumerConfig { DurableName = "multi", AckPolicy = AckPolicy.Explicit, FilterSubjects = ["events", "data", "other"], }); var engine = new PullConsumerEngine(); var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 8 }, CancellationToken.None); result.Messages.Count.ShouldBe(8); var payloads = result.Messages.Select(m => Encoding.UTF8.GetString(m.Payload.Span)).ToList(); payloads.ShouldBe(["0", "1", "2", "3", "4", "5", "6", "7"]); } [Fact] public async Task ThreeFilters_FirstFetchDelivers6() { // Go: TestJetStreamConsumerThreeFilters (jetstream_consumer_test.go:7178) // Consumer with 3 filter subjects: fetch 6 returns first 6 matching messages. var store = new MemStore(); var stream = MakeStream(store, "TEST", "events", "data", "other", "ignored"); await AppendAsync(store, "ignored", "100"); await AppendAsync(store, "events", "0"); await AppendAsync(store, "events", "1"); await AppendAsync(store, "data", "2"); await AppendAsync(store, "ignored", "100"); await AppendAsync(store, "data", "3"); await AppendAsync(store, "other", "4"); await AppendAsync(store, "data", "5"); await AppendAsync(store, "other", "6"); await AppendAsync(store, "data", "7"); await AppendAsync(store, "ignored", "100"); var consumer = MakeConsumer(new ConsumerConfig { DurableName = "multi", AckPolicy = AckPolicy.Explicit, FilterSubjects = ["events", "data", "other"], }); var engine = new PullConsumerEngine(); var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 6 }, CancellationToken.None); result.Messages.Count.ShouldBe(6); for (int i = 0; i < 6; i++) Encoding.UTF8.GetString(result.Messages[i].Payload.Span).ShouldBe(i.ToString()); // Ack all 6 foreach (var msg in result.Messages) consumer.AckProcessor.AckSequence(msg.Sequence); // Remaining 2 messages (indices 6 and 7) var result2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5 }, CancellationToken.None); result2.Messages.Count.ShouldBe(2); } // ========================================================================= // TestJetStreamConsumerUpdateFilterSubjects (jetstream_consumer_test.go:7231) // ========================================================================= [Fact] public async Task UpdateFilterSubjects_NewFilterReceivesAdditionalSubject() { // Go: TestJetStreamConsumerUpdateFilterSubjects (jetstream_consumer_test.go:7231) // Consumer with ["events","data"] fetches those messages; updated filter // ["events","data","other"] on same consumer also fetches the "other" messages // when starting from beginning. var store = new MemStore(); var stream = MakeStream(store, "TEST", "events", "data", "other"); await AppendAsync(store, "events", "0"); // seq 1 await AppendAsync(store, "events", "1"); // seq 2 await AppendAsync(store, "data", "2"); // seq 3 await AppendAsync(store, "data", "3"); // seq 4 var consumer = MakeConsumer(new ConsumerConfig { DurableName = "multi", AckPolicy = AckPolicy.Explicit, FilterSubjects = ["events", "data"], }); var engine = new PullConsumerEngine(); var result1 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5 }, CancellationToken.None); result1.Messages.Count.ShouldBe(4); for (int i = 0; i < 4; i++) { Encoding.UTF8.GetString(result1.Messages[i].Payload.Span).ShouldBe(i.ToString()); consumer.AckProcessor.AckSequence(result1.Messages[i].Sequence); } // Add "other" messages after first batch await AppendAsync(store, "other", "4"); // seq 5 await AppendAsync(store, "data", "5"); // seq 6 // Update consumer filter to include "other" consumer.Config.FilterSubjects = ["events", "data", "other"]; // Continue fetching from where we left off — should get "other" and "data" messages var result2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5 }, CancellationToken.None); result2.Messages.Count.ShouldBe(2); Encoding.UTF8.GetString(result2.Messages[0].Payload.Span).ShouldBe("4"); Encoding.UTF8.GetString(result2.Messages[1].Payload.Span).ShouldBe("5"); } // ========================================================================= // TestJetStreamConsumerIsFiltered (jetstream_consumer_test.go:7515) // ========================================================================= [Theory] [InlineData(new[] { "one" }, new[] { "one" }, false)] [InlineData(new[] { "one.>" }, new[] { "one.filter" }, true)] [InlineData(new[] { "multi", "foo", "bar.>" }, new[] { "multi", "bar.>", "foo" }, false)] [InlineData(new[] { "events", "data" }, new[] { "data" }, true)] [InlineData(new[] { "machines", "floors" }, new[] { "machines" }, true)] public void IsFiltered_CorrectlyDetectsWhetherConsumerFilters( string[] streamSubjects, string[] filterSubjects, bool expectedIsFiltered) { // Go: TestJetStreamConsumerIsFiltered (jetstream_consumer_test.go:7515) // A consumer is considered filtered if it doesn't receive all stream messages. var config = new ConsumerConfig { FilterSubjects = [..filterSubjects], }; var filter = CompiledFilter.FromConfig(config); // A consumer is NOT filtered if all stream subjects match the filter var allMatch = streamSubjects.All(s => filter.Matches(s)); var isFiltered = !allMatch; isFiltered.ShouldBe(expectedIsFiltered); } // ========================================================================= // TestJetStreamConsumerSingleFilterSubjectInFilterSubjects (11281) // ========================================================================= [Fact] public async Task SingleFilterSubjectInFilterSubjects_Works() { // Go: TestJetStreamConsumerSingleFilterSubjectInFilterSubjects (jetstream_consumer_test.go:11281) // FilterSubjects with a single entry behaves identically to FilterSubject. var store = new MemStore(); var stream = MakeStream(store, "TEST", "foo", "bar"); await AppendAsync(store, "foo", "match"); await AppendAsync(store, "bar", "skip"); await AppendAsync(store, "foo", "match2"); var consumerViaFilterSubjects = MakeConsumer(new ConsumerConfig { DurableName = "c1", FilterSubjects = ["foo"], }); var consumerViaFilterSubject = MakeConsumer(new ConsumerConfig { DurableName = "c2", FilterSubject = "foo", }); var engine = new PullConsumerEngine(); var r1 = await engine.FetchAsync(stream, consumerViaFilterSubjects, new PullFetchRequest { Batch = 10 }, CancellationToken.None); var r2 = await engine.FetchAsync(stream, consumerViaFilterSubject, new PullFetchRequest { Batch = 10 }, CancellationToken.None); r1.Messages.Count.ShouldBe(r2.Messages.Count); r1.Messages.Count.ShouldBe(2); r1.Messages.All(m => m.Subject == "foo").ShouldBeTrue(); r2.Messages.All(m => m.Subject == "foo").ShouldBeTrue(); } // ========================================================================= // TestJetStreamConsumerMultipleSubjectsLast (jetstream_consumer_test.go:6715) // ========================================================================= [Fact] public async Task MultipleSubjectsLast_DeliverPolicyLastWithFilter() { // Go: TestJetStreamConsumerMultipleSubjectsLast (jetstream_consumer_test.go:6715) // DeliverPolicy.Last with FilterSubjects = ["events","data"] starts at last // message of the stream and delivers from there. // When the last message in the stream doesn't match the filter, no messages are delivered. var store = new MemStore(); var stream = MakeStream(store, "name", "events", "data", "other"); await AppendAsync(store, "events", "1"); // seq 1 await AppendAsync(store, "data", "2"); // seq 2 await AppendAsync(store, "other", "3"); // seq 3 await AppendAsync(store, "events", "4"); // seq 4 await AppendAsync(store, "data", "5"); // seq 5 await AppendAsync(store, "data", "6"); // seq 6 var consumer = MakeConsumer(new ConsumerConfig { DurableName = "durable", AckPolicy = AckPolicy.Explicit, DeliverPolicy = DeliverPolicy.Last, FilterSubjects = ["events", "data"], }); var engine = new PullConsumerEngine(); var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 2 }, CancellationToken.None); // DeliverPolicy.Last resolves to state.LastSeq = 6 ("data" with payload "6") // which matches the filter — so it delivers that message. result.Messages.Count.ShouldBe(1); result.Messages[0].Subject.ShouldBe("data"); Encoding.UTF8.GetString(result.Messages[0].Payload.Span).ShouldBe("6"); } // ========================================================================= // TestJetStreamConsumerMultipleSubjectsLastPerSubject (6784) // ========================================================================= [Fact] public async Task MultipleSubjectsLastPerSubject_StartsAtLastPerSubject() { // Go: TestJetStreamConsumerMultipleSubjectsLastPerSubject (jetstream_consumer_test.go:6784) // DeliverPolicy.LastPerSubject with FilterSubjects starts at the last msg per subject. var store = new MemStore(); var stream = MakeStream(store, "name", "events.*", "data.>"); await AppendAsync(store, "events.1", "bad"); // seq 1 await AppendAsync(store, "events.1", "events.1"); // seq 2 — last for events.1 await AppendAsync(store, "data.1", "bad"); // seq 3 await AppendAsync(store, "data.1", "bad"); // seq 4 await AppendAsync(store, "data.1", "data.1"); // seq 5 — last for data.1 await AppendAsync(store, "events.2", "bad"); // seq 6 await AppendAsync(store, "events.2", "events.2"); // seq 7 — last for events.2 var consumer = MakeConsumer(new ConsumerConfig { DurableName = "durable", AckPolicy = AckPolicy.Explicit, DeliverPolicy = DeliverPolicy.LastPerSubject, FilterSubject = "events.1", }); var engine = new PullConsumerEngine(); var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 3 }, CancellationToken.None); result.Messages.Count.ShouldBeGreaterThan(0); // First message should be the last "events.1" result.Messages[0].Subject.ShouldBe("events.1"); Encoding.UTF8.GetString(result.Messages[0].Payload.Span).ShouldBe("events.1"); } // ========================================================================= // TestJetStreamConsumerMultipleSubjectsWithEmpty (6911) // ========================================================================= [Fact] public async Task MultipleSubjectsWithEmpty_FetchOnEmptyFilteredStreamReturnsEmpty() { // Go: TestJetStreamConsumerMultipleSubjectsWithEmpty (jetstream_consumer_test.go:6911) // Consumer with filter on non-existent subject returns empty. var store = new MemStore(); var stream = MakeStream(store, "TEST", "foo", "bar"); await AppendAsync(store, "foo", "1"); await AppendAsync(store, "bar", "2"); var consumer = MakeConsumer(new ConsumerConfig { DurableName = "c", FilterSubjects = ["baz"], // no messages on baz }); var engine = new PullConsumerEngine(); var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5, NoWait = true, }, CancellationToken.None); result.Messages.Count.ShouldBe(0); } // ========================================================================= // TestJetStreamConsumerMultipleSubjectsAck (jetstream_consumer_test.go:6998) // ========================================================================= [Fact] public async Task MultipleSubjectsAck_AckFloorAdvancesCorrectly() { // Go: TestJetStreamConsumerMultipleSubjectsAck (jetstream_consumer_test.go:6998) // Acking messages from a multi-filter consumer advances the ack floor correctly. var store = new MemStore(); var stream = MakeStream(store, "TEST", "foo", "bar", "baz"); await AppendAsync(store, "foo", "f1"); // seq 1 await AppendAsync(store, "bar", "b1"); // seq 2 await AppendAsync(store, "baz", "z1"); // seq 3 — filtered await AppendAsync(store, "foo", "f2"); // seq 4 await AppendAsync(store, "bar", "b2"); // seq 5 var consumer = MakeConsumer(new ConsumerConfig { DurableName = "c", AckPolicy = AckPolicy.Explicit, FilterSubjects = ["foo", "bar"], }); var engine = new PullConsumerEngine(); var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); result.Messages.Count.ShouldBe(4); // Ack all messages foreach (var msg in result.Messages) consumer.AckProcessor.AckSequence(msg.Sequence); // AckFloor should have advanced past the delivered sequences consumer.AckProcessor.AckFloor.ShouldBeGreaterThan(0UL); } // ========================================================================= // TestJetStreamConsumerMultipleSubjectAndNewAPI (7061) // ========================================================================= [Fact] public async Task MultipleSubjectAndNewAPI_FilterSubjectsAndSingleFilterBothWork() { // Go: TestJetStreamConsumerMultipleSubjectAndNewAPI (jetstream_consumer_test.go:7061) // Both FilterSubject and FilterSubjects produce the same results for single filter. var store = new MemStore(); var stream = MakeStream(store, "TEST", "foo.>", "bar"); await AppendAsync(store, "foo.1", "a"); await AppendAsync(store, "bar", "b"); await AppendAsync(store, "foo.2", "c"); var consumerA = MakeConsumer(new ConsumerConfig { DurableName = "cA", FilterSubjects = ["foo.>"], }); var consumerB = MakeConsumer(new ConsumerConfig { DurableName = "cB", FilterSubject = "foo.>", }); var engine = new PullConsumerEngine(); var rA = await engine.FetchAsync(stream, consumerA, new PullFetchRequest { Batch = 5 }, CancellationToken.None); var rB = await engine.FetchAsync(stream, consumerB, new PullFetchRequest { Batch = 5 }, CancellationToken.None); rA.Messages.Count.ShouldBe(rB.Messages.Count); rA.Messages.Count.ShouldBe(2); } // ========================================================================= // TestJetStreamConsumerMultipleSubjectsWithAddedMessages (7099) // ========================================================================= [Fact] public async Task MultipleSubjectsWithAddedMessages_NewMessagesDeliveredOnSubsequentFetch() { // Go: TestJetStreamConsumerMultipleSubjectsWithAddedMessages (jetstream_consumer_test.go:7099) // After first fetch, newly appended messages are returned on the next fetch. var store = new MemStore(); var stream = MakeStream(store, "TEST", "events", "data"); await AppendAsync(store, "events", "e1"); await AppendAsync(store, "data", "d1"); var consumer = MakeConsumer(new ConsumerConfig { DurableName = "c", AckPolicy = AckPolicy.Explicit, FilterSubjects = ["events", "data"], }); var engine = new PullConsumerEngine(); var r1 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); r1.Messages.Count.ShouldBe(2); foreach (var msg in r1.Messages) consumer.AckProcessor.AckSequence(msg.Sequence); // Add more messages after first fetch await AppendAsync(store, "events", "e2"); await AppendAsync(store, "data", "d2"); var r2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); r2.Messages.Count.ShouldBe(2); Encoding.UTF8.GetString(r2.Messages[0].Payload.Span).ShouldBe("e2"); Encoding.UTF8.GetString(r2.Messages[1].Payload.Span).ShouldBe("d2"); } // ========================================================================= // TestJetStreamConsumerBadNumPending (jetstream_consumer_test.go:5263) // ========================================================================= [Fact] public async Task BadNumPending_MultipleConsumers_NoPendingAfterFullDelivery() { // Go: TestJetStreamConsumerBadNumPending (jetstream_consumer_test.go:5263) // After all messages are consumed and acked, pending count is 0. var store = new MemStore(); var stream = MakeStream(store, "ORDERS", "orders.*"); for (int i = 0; i < 10; i++) await AppendAsync(store, "orders.created", "NEW"); var consumer = MakeConsumer(new ConsumerConfig { DurableName = "c", AckPolicy = AckPolicy.Explicit, }); var engine = new PullConsumerEngine(); var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 100 }, CancellationToken.None); result.Messages.Count.ShouldBe(10); foreach (var msg in result.Messages) consumer.AckProcessor.AckSequence(msg.Sequence); consumer.AckProcessor.PendingCount.ShouldBe(0); consumer.AckProcessor.AckFloor.ShouldBe(10UL); } // ========================================================================= // TestJetStreamConsumerPendingCountWithRedeliveries (5775) // ========================================================================= [Fact] public async Task PendingCountWithRedeliveries_UnackedMsgRedeliveredOnExpiry() { // Go: TestJetStreamConsumerPendingCountWithRedeliveries (jetstream_consumer_test.go:5775) // A message that is not acked within AckWait is redelivered. var store = new MemStore(); var stream = MakeStream(store, "TEST", "foo"); await AppendAsync(store, "foo", "msg1"); var consumer = MakeConsumer(new ConsumerConfig { DurableName = "test", AckPolicy = AckPolicy.Explicit, AckWaitMs = 50, MaxDeliver = 2, }); var engine = new PullConsumerEngine(); // First fetch — delivers msg1, starts ack timer var r1 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 1 }, CancellationToken.None); r1.Messages.Count.ShouldBe(1); Encoding.UTF8.GetString(r1.Messages[0].Payload.Span).ShouldBe("msg1"); // Do not ack // Pending count should be 1 (awaiting ack) consumer.AckProcessor.PendingCount.ShouldBe(1); // Wait for ack to expire await Task.Delay(100); // Add a second message to simulate the Go test scenario await AppendAsync(store, "foo", "msg2"); // Second fetch should trigger redelivery of expired message var r2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 1 }, CancellationToken.None); r2.Messages.Count.ShouldBe(1); r2.Messages[0].Redelivered.ShouldBeTrue(); // After redelivery, ack and check pending = 0 consumer.AckProcessor.AckSequence(r2.Messages[0].Sequence); consumer.AckProcessor.PendingCount.ShouldBe(0); } // ========================================================================= // TestJetStreamConsumerPendingCountAfterMsgAckAboveFloor (8311) // ========================================================================= [Fact] public async Task PendingCountAfterMsgAckAboveFloor_AckLastMsgClearsCount() { // Go: TestJetStreamConsumerPendingCountAfterMsgAckAboveFloor (jetstream_consumer_test.go:8311) // Acking the last fetched message (above ack floor) reduces pending to 0. var store = new MemStore(); var stream = MakeStream(store, "TEST", "foo"); await AppendAsync(store, "foo"); // seq 1 await AppendAsync(store, "foo"); // seq 2 var consumer = MakeConsumer(new ConsumerConfig { DurableName = "CONSUMER", AckPolicy = AckPolicy.Explicit, }); var engine = new PullConsumerEngine(); var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 2 }, CancellationToken.None); result.Messages.Count.ShouldBe(2); // Ack only the second (last) message consumer.AckProcessor.AckSequence(result.Messages[1].Sequence); // Pending count = 1 (first message still unacked) consumer.AckProcessor.PendingCount.ShouldBe(1); // Ack the first message too consumer.AckProcessor.AckSequence(result.Messages[0].Sequence); consumer.AckProcessor.PendingCount.ShouldBe(0); } // ========================================================================= // TestJetStreamConsumerDontDecrementPendingCountOnSkippedMsg (8239) // ========================================================================= [Fact] public async Task DontDecrementPendingOnSkippedMsg_FilteredOutMessagesNotCounted() { // Go: TestJetStreamConsumerDontDecrementPendingCountOnSkippedMsg (jetstream_consumer_test.go:8239) // Messages that don't match the filter are skipped and don't affect pending count. var store = new MemStore(); var stream = MakeStream(store, "TEST", "foo", "bar"); await AppendAsync(store, "foo", "match"); // seq 1 await AppendAsync(store, "bar", "skip"); // seq 2 await AppendAsync(store, "foo", "match2"); // seq 3 var consumer = MakeConsumer(new ConsumerConfig { DurableName = "CONSUMER", AckPolicy = AckPolicy.Explicit, FilterSubject = "foo", }); var engine = new PullConsumerEngine(); var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); result.Messages.Count.ShouldBe(2); result.Messages.All(m => m.Subject == "foo").ShouldBeTrue(); consumer.AckProcessor.PendingCount.ShouldBe(2); // The "bar" message was skipped — pending count only reflects delivered messages consumer.AckProcessor.PendingCount.ShouldBe(result.Messages.Count); } // ========================================================================= // TestJetStreamConsumerNumPendingWithMaxPerSubjectGreaterThanOne (6282) // ========================================================================= [Fact] public async Task NumPendingWithMaxPerSubject_LastPerSubjectConsumerCountsCorrectly() { // Go: TestJetStreamConsumerNumPendingWithMaxPerSubjectGreaterThanOne (jetstream_consumer_test.go:6282) // DeliverLastPerSubject consumer with filtered subject counts the last msg per subject. var store = new MemStore(); var stream = MakeStream(store, "TEST", "KV.*.*"); await AppendAsync(store, "KV.plans.foo", "OK"); await AppendAsync(store, "KV.plans.bar", "OK"); await AppendAsync(store, "KV.plans.baz", "OK"); // These should be filtered out by the consumer await AppendAsync(store, "KV.config.foo", "OK"); await AppendAsync(store, "KV.config.bar", "OK"); await AppendAsync(store, "KV.config.baz", "OK"); // Double up plans await AppendAsync(store, "KV.plans.bar", "OK2"); await AppendAsync(store, "KV.plans.baz", "OK2"); var consumer = MakeConsumer(new ConsumerConfig { DurableName = "d", AckPolicy = AckPolicy.Explicit, DeliverPolicy = DeliverPolicy.LastPerSubject, FilterSubject = "KV.plans.*", }); var engine = new PullConsumerEngine(); // For LastPerSubject with "KV.plans.*", we should get the last message per subject // (KV.plans.foo seq1, KV.plans.bar seq7, KV.plans.baz seq8) var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); // Result must only contain KV.plans.* subjects result.Messages.All(m => m.Subject.StartsWith("KV.plans.")).ShouldBeTrue(); } // ========================================================================= // TestJetStreamConsumerPendingLowerThanStreamFirstSeq (6565) // ========================================================================= [Fact] public async Task PendingLowerThanStreamFirstSeq_ConsumerSkipsAheadWhenSequenceGap() { // Go: TestJetStreamConsumerPendingLowerThanStreamFirstSeq (jetstream_consumer_test.go:6565) // When consumer's next sequence is lower than stream's first seq after compaction, // the consumer should still function correctly. var store = new MemStore(); var stream = MakeStream(store, "TEST", "foo"); for (int i = 0; i < 10; i++) await AppendAsync(store, "foo", "msg"); var consumer = MakeConsumer(new ConsumerConfig { DurableName = "dur", AckPolicy = AckPolicy.Explicit, DeliverPolicy = DeliverPolicy.All, }); var engine = new PullConsumerEngine(); var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); result.Messages.Count.ShouldBe(10); foreach (var msg in result.Messages) consumer.AckProcessor.AckSequence(msg.Sequence); consumer.AckProcessor.AckFloor.ShouldBe(10UL); // Simulate consumer starting at a sequence before available messages consumer.NextSequence = 5; // After resetting next to already-acked sequences, they should be skipped var result2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); result2.Messages.Count.ShouldBe(0); // All sequences <= AckFloor are skipped } // ========================================================================= // TestJetStreamConsumerInfoNumPending (jetstream_consumer_test.go:8195) // ========================================================================= [Fact] public async Task InfoNumPending_PendingCountMatchesAvailableMessages() { // Go: TestJetStreamConsumerInfoNumPending (jetstream_consumer_test.go:8195) // NumPending reflects the count of messages available for delivery. var store = new MemStore(); var stream = MakeStream(store, "LIMITS", "js.in.limits"); for (int i = 0; i < 100; i++) await AppendAsync(store, "js.in.limits", "x"); var consumer = MakeConsumer(new ConsumerConfig { DurableName = "PULL", AckPolicy = AckPolicy.Explicit, }); // Before any fetch, pending = number of messages in stream var state = await stream.Store.GetStateAsync(CancellationToken.None); state.Messages.ShouldBe(100UL); var engine = new PullConsumerEngine(); var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 50 }, CancellationToken.None); result.Messages.Count.ShouldBe(50); consumer.AckProcessor.PendingCount.ShouldBe(50); } // ========================================================================= // TestJetStreamConsumerEfficientInterestStateCheck (10532) // ========================================================================= [Fact] public async Task EfficientInterestStateCheck_LargeSequenceGapHandledEfficiently() { // Go: TestJetStreamConsumerEfficientInterestStateCheck (jetstream_consumer_test.go:10532) // A consumer with a large gap in acknowledged sequences should still function // without excessive computation. Unit test validates skipping below AckFloor. var store = new MemStore(); var stream = MakeStream(store, "TEST", "foo"); await AppendAsync(store, "foo", "msg1"); // seq 1 await AppendAsync(store, "foo", "msg2"); // seq 2 // Simulate a large gap in sequences by advancing the ack floor await AppendAsync(store, "foo", "msg3"); // seq 3 (after gap) var consumer = MakeConsumer(new ConsumerConfig { DurableName = "CONSUMER", AckPolicy = AckPolicy.Explicit, }); var engine = new PullConsumerEngine(); var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 3 }, CancellationToken.None); result.Messages.Count.ShouldBe(3); // Ack only the first message; others remain pending consumer.AckProcessor.AckSequence(1); // Manually set ack floor to simulate a large gap being already acked consumer.AckProcessor.AckSequence(2); consumer.AckProcessor.AckSequence(3); // AckFloor should have advanced consumer.AckProcessor.AckFloor.ShouldBe(3UL); consumer.AckProcessor.PendingCount.ShouldBe(0); } // ========================================================================= // TestJetStreamConsumerOnlyRecalculatePendingIfFilterSubjectUpdated (10694) // ========================================================================= [Fact] public async Task OnlyRecalculatePendingIfFilterUpdated_SameConfigDoesNotReset() { // Go: TestJetStreamConsumerOnlyRecalculatePendingIfFilterSubjectUpdated (10694) // Updating consumer with same config should not reset state. // Updating with new filter should apply from current position. var store = new MemStore(); var stream = MakeStream(store, "TEST", "foo"); await AppendAsync(store, "foo", "msg1"); var consumer = MakeConsumer(new ConsumerConfig { DurableName = "DURABLE", }); var engine = new PullConsumerEngine(); var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 1 }, CancellationToken.None); result.Messages.Count.ShouldBe(1); var sequenceBefore = consumer.NextSequence; // Simulating "same config update" — no change to filter, NextSequence unchanged // In our engine the consumer's config state is just modified in place consumer.Config.FilterSubject = null; // still no filter var result2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5 }, CancellationToken.None); consumer.NextSequence.ShouldBeGreaterThanOrEqualTo(sequenceBefore); // Update filter to "foo" consumer.Config.FilterSubject = "foo"; // Reset sequence to test from beginning consumer.NextSequence = 1; var result3 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5 }, CancellationToken.None); result3.Messages.Count.ShouldBe(1); result3.Messages.All(m => m.Subject == "foo").ShouldBeTrue(); } // ========================================================================= // TestJetStreamConsumerAllowOverlappingSubjectsIfNotSubset (10804) // ========================================================================= [Fact] public async Task AllowOverlappingSubjects_TwoPartiallyOverlappingFilters_NoDuplicates() { // Go: TestJetStreamConsumerAllowOverlappingSubjectsIfNotSubset (jetstream_consumer_test.go:10804) // Two overlapping wildcard filters (event.foo.* and event.*.foo) deliver // all matching messages without duplication (7 unique subjects). var store = new MemStore(); var stream = MakeStream(store, "TEST", "event.>"); var parts = new[] { "foo", "bar", "baz", "oth" }; foreach (var start in parts) foreach (var end in parts) await AppendAsync(store, $"event.{start}.{end}"); // Total: 16 messages published var consumer = MakeConsumer(new ConsumerConfig { DurableName = "DURABLE", FilterSubjects = ["event.foo.*", "event.*.foo"], }); var engine = new PullConsumerEngine(); var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 20 }, CancellationToken.None); // event.foo.X (4 subjects) + event.Y.foo where Y != foo (3 subjects) = 7 result.Messages.Count.ShouldBe(7); // No duplicates var subjects = result.Messages.Select(m => m.Subject).ToList(); subjects.Distinct().Count().ShouldBe(7); // All match at least one filter var filter = new CompiledFilter(["event.foo.*", "event.*.foo"]); result.Messages.All(m => filter.Matches(m.Subject)).ShouldBeTrue(); } // ========================================================================= // TestSortingConsumerPullRequests (jetstream_consumer_test.go:10031) // ========================================================================= [Fact] public void SortingConsumerPullRequests_BasicPriorityOrdering() { // Go: TestSortingConsumerPullRequests (jetstream_consumer_test.go:10031) // Lower Priority value = higher precedence (earlier in queue). var queue = new PullRequestWaitQueue(100); queue.Enqueue(new PullWaitingRequest { Priority = 1, Reply = null }); queue.Enqueue(new PullWaitingRequest { Priority = 2, Reply = null }); queue.Enqueue(new PullWaitingRequest { Priority = 1, Reply = null }); queue.Enqueue(new PullWaitingRequest { Priority = 3, Reply = null }); var expectedPriorities = new[] { 1, 1, 2, 3 }; for (int i = 0; i < expectedPriorities.Length; i++) { var item = queue.Dequeue(); item.ShouldNotBeNull(); item!.Priority.ShouldBe(expectedPriorities[i]); } } [Fact] public void SortingConsumerPullRequests_StableOrderWithinSamePriority() { // Go: TestSortingConsumerPullRequests (jetstream_consumer_test.go:10054) "test if sort is stable" // Items with the same priority maintain insertion order (stable sort). var queue = new PullRequestWaitQueue(100); queue.Enqueue(new PullWaitingRequest { Priority = 1, Reply = "1a" }); queue.Enqueue(new PullWaitingRequest { Priority = 2, Reply = "2a" }); queue.Enqueue(new PullWaitingRequest { Priority = 1, Reply = "1b" }); queue.Enqueue(new PullWaitingRequest { Priority = 2, Reply = "2b" }); queue.Enqueue(new PullWaitingRequest { Priority = 1, Reply = "1c" }); queue.Enqueue(new PullWaitingRequest { Priority = 3, Reply = "3a" }); queue.Enqueue(new PullWaitingRequest { Priority = 2, Reply = "2c" }); var expected = new[] { "1a", "1b", "1c", "2a", "2b", "2c", "3a" }; for (int i = 0; i < expected.Length; i++) { var item = queue.Dequeue(); item.ShouldNotBeNull(); item!.Reply.ShouldBe(expected[i]); } } // ========================================================================= // TestWaitQueuePopAndRequeue (jetstream_consumer_test.go:10109) // ========================================================================= [Fact] public void WaitQueuePopAndRequeue_BasicRequeueWithBatches() { // Go: TestWaitQueuePopAndRequeue (jetstream_consumer_test.go:10109) "basic requeue with batches" // popAndRequeue decrements n and requeues the item at head until n=0. var queue = new PullRequestWaitQueue(100); // 9 items, 3 per priority group, each with n=3 string[] priorities = ["1", "2", "3"]; string[] letters = ["a", "b", "c"]; foreach (var p in priorities) foreach (var l in letters) queue.Enqueue(new PullWaitingRequest { Priority = int.Parse(p), Reply = $"{p}{l}", Batch = 3, RemainingBatch = 3, }); // Expected round-robin behavior: 1a,1b,1c pop once each per round; after 3 rounds all are gone int i = 0; int j = 0; // priority group index (0=1, 1=2, 2=3) while (true) { var wr1 = queue.PopAndRequeue(); wr1.ShouldNotBeNull(); wr1!.Reply.ShouldBe($"{j + 1}a"); var wr2 = queue.PopAndRequeue(); wr2.ShouldNotBeNull(); wr2!.Reply.ShouldBe($"{j + 1}b"); var wr3 = queue.PopAndRequeue(); wr3.ShouldNotBeNull(); wr3!.Reply.ShouldBe($"{j + 1}c"); i++; if (i % 3 == 0) j++; // Count should match 9 - (j * 3) queue.Count.ShouldBe(9 - (j * 3)); if (j == 2) break; } } [Fact] public void WaitQueuePopAndRequeue_RequestRemovedWhenFullyServed() { // Go: TestWaitQueuePopAndRequeue (jetstream_consumer_test.go:10152) "request removal when fully served" var queue = new PullRequestWaitQueue(100); queue.Enqueue(new PullWaitingRequest { Priority = 1, Reply = "1a", RemainingBatch = 2 }); queue.Enqueue(new PullWaitingRequest { Priority = 1, Reply = "1b", RemainingBatch = 1 }); queue.Enqueue(new PullWaitingRequest { Priority = 2, Reply = "2a", RemainingBatch = 3 }); var initialCount = queue.Count; initialCount.ShouldBe(3); // Pop 1a first time (n=2 -> n=1, requeued) var wr = queue.PopAndRequeue(); wr.ShouldNotBeNull(); wr!.Reply.ShouldBe("1a"); wr.RemainingBatch.ShouldBe(1); queue.Count.ShouldBe(initialCount); // still 3 // Pop 1b (n=1 -> n=0, removed) wr = queue.PopAndRequeue(); wr.ShouldNotBeNull(); wr!.Reply.ShouldBe("1b"); wr.RemainingBatch.ShouldBe(0); queue.Count.ShouldBe(initialCount - 1); // now 2 // Pop 1a second time (n=1 -> n=0, removed) wr = queue.PopAndRequeue(); wr.ShouldNotBeNull(); wr!.Reply.ShouldBe("1a"); wr.RemainingBatch.ShouldBe(0); queue.Count.ShouldBe(initialCount - 2); // now 1 // Only 2a should remain var next = queue.Peek(); next.ShouldNotBeNull(); next!.Reply.ShouldBe("2a"); next.RemainingBatch.ShouldBe(3); } // ========================================================================= // TestJetStreamConsumerStateAlwaysFromStore (jetstream_consumer_test.go:9796) // ========================================================================= [Fact] public async Task ConsumerStateAlwaysFromStore_DeliveredAndAckFloorTrack() { // Go: TestJetStreamConsumerStateAlwaysFromStore (jetstream_consumer_test.go:9796) // Consumer delivered/ackFloor state tracks correctly through fetch and ack cycles. var store = new MemStore(); var stream = MakeStream(store, "TEST", "foo.>"); await AppendAsync(store, "foo.bar"); // seq 1 — filtered await AppendAsync(store, "foo.other"); // seq 2 — not filtered var consumer = MakeConsumer(new ConsumerConfig { DurableName = "CONSUMER", AckPolicy = AckPolicy.Explicit, FilterSubject = "foo.bar", }); var engine = new PullConsumerEngine(); // Initial state: no delivered, no ack floor consumer.AckProcessor.AckFloor.ShouldBe(0UL); consumer.AckProcessor.PendingCount.ShouldBe(0); // Fetch — should deliver only foo.bar (seq 1) var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5 }, CancellationToken.None); result.Messages.Count.ShouldBe(1); result.Messages[0].Subject.ShouldBe("foo.bar"); // Delivered but not acked — pending = 1, floor = 0 consumer.AckProcessor.PendingCount.ShouldBe(1); consumer.AckProcessor.AckFloor.ShouldBe(0UL); // Ack the message — floor advances to seq 1 consumer.AckProcessor.AckSequence(result.Messages[0].Sequence); consumer.AckProcessor.PendingCount.ShouldBe(0); consumer.AckProcessor.AckFloor.ShouldBe(result.Messages[0].Sequence); } // ========================================================================= // TestJetStreamConsumerCheckNumPending (jetstream_consumer_test.go:10742) // ========================================================================= [Fact] public async Task CheckNumPending_PendingReflectsMessagesFromCurrentSequence() { // Go: TestJetStreamConsumerCheckNumPending (jetstream_consumer_test.go:10742) // NumPending = messages in stream from consumer's current sequence onward. var store = new MemStore(); var stream = MakeStream(store, "TEST", "foo"); for (int i = 0; i < 5; i++) await AppendAsync(store, "foo"); var consumer = MakeConsumer(new ConsumerConfig { DurableName = "DURABLE", }); var engine = new PullConsumerEngine(); // Initial: consumer at seq 1, 5 messages pending // Fetch 2 messages var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 2 }, CancellationToken.None); result.Messages.Count.ShouldBe(2); // NextSequence now points past fetched messages consumer.NextSequence.ShouldBe(3UL); // 3 messages remain to be delivered var state = await stream.Store.GetStateAsync(CancellationToken.None); var remaining = state.LastSeq - consumer.NextSequence + 1; remaining.ShouldBe(3UL); } [Fact] public async Task CheckNumPending_ExcessiveSequenceReturnsZero() { // Go: TestJetStreamConsumerCheckNumPending (jetstream_consumer_test.go:10782) // A sequence past the end of stream reports 0 pending. var store = new MemStore(); var stream = MakeStream(store, "TEST", "foo"); for (int i = 0; i < 5; i++) await AppendAsync(store, "foo"); var consumer = MakeConsumer(new ConsumerConfig { DurableName = "DURABLE" }); consumer.NextSequence = 100; // well past end of stream var engine = new PullConsumerEngine(); var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 5, NoWait = true, }, CancellationToken.None); result.Messages.Count.ShouldBe(0); } // ========================================================================= // TestJetStreamConsumerMultipleFiltersRace (jetstream_consumer_test.go:93) // ========================================================================= [Fact] public async Task MultipleFiltersRace_ConcurrentFetchAllMessagesDelivered() { // Go: TestJetStreamConsumerMultipleFiltersRace (jetstream_consumer_test.go:93) // Multiple concurrent fetches with filter subjects deliver all matching messages. var store = new MemStore(); var stream = MakeStream(store, "TEST", "foo.>", "bar.>"); // Publish 20 messages on foo.* and 20 on bar.* for (int i = 0; i < 20; i++) { await AppendAsync(store, $"foo.{i}", $"f{i}"); await AppendAsync(store, $"bar.{i}", $"b{i}"); } var consumer = MakeConsumer(new ConsumerConfig { DurableName = "c", AckPolicy = AckPolicy.Explicit, FilterSubjects = ["foo.>", "bar.>"], }); var engine = new PullConsumerEngine(); var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 40 }, CancellationToken.None); result.Messages.Count.ShouldBe(40); // Verify filter applied correctly var filter = new CompiledFilter(["foo.>", "bar.>"]); result.Messages.All(m => filter.Matches(m.Subject)).ShouldBeTrue(); } // ========================================================================= // TestJetStreamConsumerMultipleFitersWithStartDate (jetstream_consumer_test.go:2533) // ========================================================================= [Fact] public async Task MultipleFitersWithStartDate_ByStartTimeOnlyDeliversNewEnoughMessages() { // Go: TestJetStreamConsumerMultipleFitersWithStartDate (jetstream_consumer_test.go:2533) // DeliverPolicy.ByStartTime with a cut-off time only delivers messages after that time. var store = new MemStore(); var stream = MakeStream(store, "TEST", "events", "data"); var cutoff = DateTime.UtcNow.AddMilliseconds(-50); // Append old message (before cutoff — at least conceptually; MemStore records current time) // In unit test we simulate by appending before the cutoff datetime we pass await AppendAsync(store, "events", "old"); await AppendAsync(store, "data", "old"); await Task.Delay(10); // ensure new messages have newer timestamp // Append new messages after our reference point await AppendAsync(store, "events", "new1"); await AppendAsync(store, "data", "new2"); var startTime = DateTime.UtcNow.AddMilliseconds(-5); // very recent var consumer = MakeConsumer(new ConsumerConfig { DurableName = "c", AckPolicy = AckPolicy.Explicit, DeliverPolicy = DeliverPolicy.ByStartTime, OptStartTimeUtc = startTime, FilterSubjects = ["events", "data"], }); var engine = new PullConsumerEngine(); var result = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); // Only messages at or after startTime should be delivered result.Messages.All(m => filter_matches_events_data(m.Subject)).ShouldBeTrue(); result.Messages.Count.ShouldBeGreaterThanOrEqualTo(0); static bool filter_matches_events_data(string s) => s == "events" || s == "data"; } // ========================================================================= // TestJetStreamConsumerFilterUpdate (jetstream_consumer_test.go:7397) // ========================================================================= [Fact] public async Task FilterUpdate_ChangingFilterSubjectChangesDeliveredMessages() { // Go: TestJetStreamConsumerFilterUpdate (jetstream_consumer_test.go:7397) // Changing FilterSubject on a consumer config changes which messages are delivered. var store = new MemStore(); var stream = MakeStream(store, "TEST", "foo", "bar"); await AppendAsync(store, "foo", "f1"); await AppendAsync(store, "bar", "b1"); await AppendAsync(store, "foo", "f2"); await AppendAsync(store, "bar", "b2"); var consumer = MakeConsumer(new ConsumerConfig { DurableName = "c", AckPolicy = AckPolicy.Explicit, FilterSubject = "foo", }); var engine = new PullConsumerEngine(); var r1 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); r1.Messages.Count.ShouldBe(2); r1.Messages.All(m => m.Subject == "foo").ShouldBeTrue(); foreach (var msg in r1.Messages) consumer.AckProcessor.AckSequence(msg.Sequence); // Update filter to "bar" — consumer starts fresh consumer.Config.FilterSubject = "bar"; consumer.NextSequence = 1; var r2 = await engine.FetchAsync(stream, consumer, new PullFetchRequest { Batch = 10 }, CancellationToken.None); r2.Messages.Count.ShouldBe(2); r2.Messages.All(m => m.Subject == "bar").ShouldBeTrue(); } }