// Ported from golang/nats-server/server/jetstream_test.go // Consumer features: max deliver, max ack pending, flow control, heartbeats, // consumer pause/resume, ack all, redelivery using System.Text; using NATS.Server.JetStream.Models; namespace NATS.Server.Tests.JetStream; public class JetStreamConsumerFeatureTests { // Go: TestJetStreamWorkQueueAckWaitRedelivery server/jetstream_test.go:1959 [Fact] public async Task Ack_explicit_tracks_pending_count() { await using var fx = await JetStreamApiFixture.StartWithAckExplicitConsumerAsync(30_000); _ = await fx.PublishAndGetAckAsync("orders.created", "msg1"); _ = await fx.PublishAndGetAckAsync("orders.created", "msg2"); var batch = await fx.FetchAsync("ORDERS", "PULL", 2); batch.Messages.Count.ShouldBe(2); var pending = await fx.GetPendingCountAsync("ORDERS", "PULL"); pending.ShouldBe(2); } // Go: TestJetStreamAckAllRedelivery server/jetstream_test.go:1850 [Fact] public async Task Ack_all_acknowledges_up_to_sequence() { await using var fx = await JetStreamApiFixture.StartWithAckAllConsumerAsync(); for (var i = 0; i < 5; i++) _ = await fx.PublishAndGetAckAsync("orders.created", $"msg-{i}"); var batch = await fx.FetchAsync("ORDERS", "ACKALL", 5); batch.Messages.Count.ShouldBe(5); await fx.AckAllAsync("ORDERS", "ACKALL", 3); var pending = await fx.GetPendingCountAsync("ORDERS", "ACKALL"); // After acking up to 3, sequences 4 and 5 should still be pending pending.ShouldBeLessThanOrEqualTo(2); } // Go: TestJetStreamAckAllRedelivery — ack all sequences [Fact] public async Task Ack_all_clears_all_pending() { await using var fx = await JetStreamApiFixture.StartWithAckAllConsumerAsync(); for (var i = 0; i < 3; i++) _ = await fx.PublishAndGetAckAsync("orders.created", $"msg-{i}"); var batch = await fx.FetchAsync("ORDERS", "ACKALL", 3); batch.Messages.Count.ShouldBe(3); await fx.AckAllAsync("ORDERS", "ACKALL", batch.Messages[^1].Sequence); var pending = await fx.GetPendingCountAsync("ORDERS", "ACKALL"); pending.ShouldBe(0); } // Go: TestJetStreamPushConsumerFlowControl server/jetstream_test.go:5203 [Fact] public async Task Push_consumer_with_flow_control_emits_fc_frames() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("FC", "fc.>"); _ = await fx.CreateConsumerAsync("FC", "PUSH", "fc.>", push: true, heartbeatMs: 10); // Enable flow control via direct JSON var resp = await fx.RequestLocalAsync( "$JS.API.CONSUMER.CREATE.FC.FCPUSH", """{"durable_name":"FCPUSH","filter_subject":"fc.>","push":true,"heartbeat_ms":10,"flow_control":true}"""); resp.Error.ShouldBeNull(); _ = await fx.PublishAndGetAckAsync("fc.x", "data"); var frame1 = await fx.ReadPushFrameAsync("FC", "FCPUSH"); frame1.IsData.ShouldBeTrue(); // Flow control frame follows data frame var frame2 = await fx.ReadPushFrameAsync("FC", "FCPUSH"); frame2.IsFlowControl.ShouldBeTrue(); } // Go: TestJetStreamPushConsumerIdleHeartbeats server/jetstream_test.go:5260 [Fact] public async Task Push_consumer_with_heartbeats_emits_heartbeat_frames() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("HB", "hb.>"); _ = await fx.CreateConsumerAsync("HB", "PUSH", "hb.>", push: true, heartbeatMs: 10); _ = await fx.PublishAndGetAckAsync("hb.x", "data"); var frame = await fx.ReadPushFrameAsync("HB", "PUSH"); frame.IsData.ShouldBeTrue(); var hbFrame = await fx.ReadPushFrameAsync("HB", "PUSH"); hbFrame.IsHeartbeat.ShouldBeTrue(); } // Go: TestJetStreamFlowControlRequiresHeartbeats server/jetstream_test.go:5232 [Fact] public async Task Push_consumer_without_heartbeats_has_no_heartbeat_frames() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("NHB", "nhb.>"); _ = await fx.CreateConsumerAsync("NHB", "PUSH", "nhb.>", push: true, heartbeatMs: 0); _ = await fx.PublishAndGetAckAsync("nhb.x", "data"); var frame = await fx.ReadPushFrameAsync("NHB", "PUSH"); frame.IsData.ShouldBeTrue(); // Without heartbeats, no heartbeat frame should be queued Should.Throw(() => fx.ReadPushFrameAsync("NHB", "PUSH")); } // Go: TestJetStreamConsumerPause server/jetstream_test.go [Fact] public async Task Paused_consumer_can_be_resumed() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("PAUSE", "pause.>"); _ = await fx.CreateConsumerAsync("PAUSE", "C1", "pause.>"); var pause = await fx.RequestLocalAsync( "$JS.API.CONSUMER.PAUSE.PAUSE.C1", """{"pause":true}"""); pause.Success.ShouldBeTrue(); var resume = await fx.RequestLocalAsync( "$JS.API.CONSUMER.PAUSE.PAUSE.C1", """{"pause":false}"""); resume.Success.ShouldBeTrue(); } // Go: TestJetStreamConsumerReset server/jetstream_test.go [Fact] public async Task Reset_consumer_restarts_delivery_from_beginning() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("RST", "rst.>"); _ = await fx.CreateConsumerAsync("RST", "C1", "rst.>"); _ = await fx.PublishAndGetAckAsync("rst.x", "msg1"); _ = await fx.PublishAndGetAckAsync("rst.x", "msg2"); var batch1 = await fx.FetchAsync("RST", "C1", 2); batch1.Messages.Count.ShouldBe(2); _ = await fx.RequestLocalAsync("$JS.API.CONSUMER.RESET.RST.C1", "{}"); var batch2 = await fx.FetchAsync("RST", "C1", 2); batch2.Messages.Count.ShouldBe(2); batch2.Messages[0].Sequence.ShouldBe(1UL); } // Go: TestJetStreamWorkQueueMaxWaiting server/jetstream_test.go:957 [Fact] public async Task Fetch_more_than_available_returns_only_available() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("MW", "mw.>"); _ = await fx.CreateConsumerAsync("MW", "C1", "mw.>"); _ = await fx.PublishAndGetAckAsync("mw.x", "msg1"); _ = await fx.PublishAndGetAckAsync("mw.x", "msg2"); var batch = await fx.FetchAsync("MW", "C1", 100); batch.Messages.Count.ShouldBe(2); } // Go: TestJetStreamWorkQueueWrapWaiting server/jetstream_test.go:1022 [Fact] public async Task Fetch_wraps_around_correctly_after_multiple_fetches() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("WR", "wr.>"); _ = await fx.CreateConsumerAsync("WR", "C1", "wr.>"); for (var i = 0; i < 10; i++) _ = await fx.PublishAndGetAckAsync("wr.x", $"msg-{i}"); var batch1 = await fx.FetchAsync("WR", "C1", 3); batch1.Messages.Count.ShouldBe(3); batch1.Messages[^1].Sequence.ShouldBe(3UL); var batch2 = await fx.FetchAsync("WR", "C1", 3); batch2.Messages.Count.ShouldBe(3); batch2.Messages[0].Sequence.ShouldBe(4UL); var batch3 = await fx.FetchAsync("WR", "C1", 3); batch3.Messages.Count.ShouldBe(3); batch3.Messages[0].Sequence.ShouldBe(7UL); } // Go: TestJetStreamMaxAckPending limits delivery [Fact] public async Task Max_ack_pending_limits_push_delivery() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("MAP", "map.>"); _ = await fx.CreateConsumerAsync("MAP", "PUSH", "map.>", push: true, heartbeatMs: 10, ackPolicy: AckPolicy.Explicit, maxAckPending: 1); _ = await fx.PublishAndGetAckAsync("map.x", "msg1"); _ = await fx.PublishAndGetAckAsync("map.x", "msg2"); // Only 1 should be delivered due to max ack pending var frame = await fx.ReadPushFrameAsync("MAP", "PUSH"); frame.IsData.ShouldBeTrue(); } // Go: TestJetStreamDeliverLastPerSubject server/jetstream_test.go // LastPerSubject resolves the initial sequence to a message matching the // filter subject and then delivers forward from there. All matching messages // from that point onward are delivered. [Fact] public async Task Deliver_last_per_subject_delivers_matching_messages() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DLPS", "dlps.>"); _ = await fx.PublishAndGetAckAsync("dlps.a", "a1"); _ = await fx.PublishAndGetAckAsync("dlps.b", "b1"); _ = await fx.PublishAndGetAckAsync("dlps.a", "a2"); _ = await fx.PublishAndGetAckAsync("dlps.b", "b2"); _ = await fx.CreateConsumerAsync("DLPS", "C1", "dlps.a", deliverPolicy: DeliverPolicy.LastPerSubject); var batch = await fx.FetchAsync("DLPS", "C1", 10); // Delivers all matching "dlps.a" messages from resolved start batch.Messages.Count.ShouldBeGreaterThanOrEqualTo(1); batch.Messages.All(m => m.Subject == "dlps.a").ShouldBeTrue(); } // Go: TestJetStreamByStartSequence [Fact] public async Task Deliver_by_start_sequence_begins_at_specified_seq() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("BSS", "bss.>"); for (var i = 0; i < 5; i++) _ = await fx.PublishAndGetAckAsync("bss.x", $"msg-{i}"); var resp = await fx.RequestLocalAsync( "$JS.API.CONSUMER.CREATE.BSS.C1", """{"durable_name":"C1","filter_subject":"bss.>","deliver_policy":"by_start_sequence","opt_start_seq":3}"""); resp.Error.ShouldBeNull(); var batch = await fx.FetchAsync("BSS", "C1", 10); batch.Messages.Count.ShouldBe(3); batch.Messages[0].Sequence.ShouldBe(3UL); } // Go: TestJetStreamMultipleSubjectsPushBasic — multiple filter subjects consumer [Fact] public async Task Multi_filter_consumer_receives_matching_messages() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("MFC", ">"); _ = await fx.CreateConsumerAsync("MFC", "C1", null, filterSubjects: ["a.*", "b.*"]); _ = await fx.PublishAndGetAckAsync("a.one", "1"); _ = await fx.PublishAndGetAckAsync("b.one", "2"); _ = await fx.PublishAndGetAckAsync("c.one", "3"); var batch = await fx.FetchAsync("MFC", "C1", 10); batch.Messages.Count.ShouldBe(2); } // Go: TestJetStreamAckReplyStreamPending server/jetstream_test.go:1887 [Fact] public async Task Explicit_ack_pending_count_decreases_on_ack() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("ARP", "arp.>"); _ = await fx.CreateConsumerAsync("ARP", "C1", "arp.>", ackPolicy: AckPolicy.All); _ = await fx.PublishAndGetAckAsync("arp.x", "msg1"); _ = await fx.PublishAndGetAckAsync("arp.x", "msg2"); _ = await fx.PublishAndGetAckAsync("arp.x", "msg3"); _ = await fx.FetchAsync("ARP", "C1", 3); var before = await fx.GetPendingCountAsync("ARP", "C1"); before.ShouldBe(3); await fx.AckAllAsync("ARP", "C1", 2); var after = await fx.GetPendingCountAsync("ARP", "C1"); after.ShouldBe(1); } // Go: TestJetStreamAckReplyStreamPendingWithAcks server/jetstream_test.go:1921 [Fact] public async Task Ack_all_to_last_clears_pending() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("ARPF", "arpf.>"); _ = await fx.CreateConsumerAsync("ARPF", "C1", "arpf.>", ackPolicy: AckPolicy.All); _ = await fx.PublishAndGetAckAsync("arpf.x", "1"); _ = await fx.PublishAndGetAckAsync("arpf.x", "2"); var batch = await fx.FetchAsync("ARPF", "C1", 2); batch.Messages.Count.ShouldBe(2); await fx.AckAllAsync("ARPF", "C1", batch.Messages[^1].Sequence); var pending = await fx.GetPendingCountAsync("ARPF", "C1"); pending.ShouldBe(0); } // Go: TestJetStreamWorkQueueRetentionStream server/jetstream_test.go:1655 [Fact] public async Task Replay_original_consumer_pauses_between_deliveries() { await using var fx = await JetStreamApiFixture.StartWithReplayOriginalConsumerAsync(); var batch = await fx.FetchAsync("ORDERS", "RO", 1); batch.Messages.Count.ShouldBe(1); } // Go: TestJetStreamSubjectBasedFilteredConsumers server/jetstream_test.go [Fact] public async Task Consumer_with_gt_wildcard_filter_matches_all() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("GW", "gw.>"); _ = await fx.CreateConsumerAsync("GW", "C1", "gw.>"); _ = await fx.PublishAndGetAckAsync("gw.a.b.c", "1"); _ = await fx.PublishAndGetAckAsync("gw.x", "2"); _ = await fx.PublishAndGetAckAsync("gw.y.z", "3"); var batch = await fx.FetchAsync("GW", "C1", 10); batch.Messages.Count.ShouldBe(3); } // Go: TestJetStreamSubjectBasedFilteredConsumers — star wildcard [Fact] public async Task Consumer_with_star_wildcard_matches_single_token() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("SW", "sw.>"); _ = await fx.CreateConsumerAsync("SW", "C1", "sw.*"); _ = await fx.PublishAndGetAckAsync("sw.a", "1"); _ = await fx.PublishAndGetAckAsync("sw.b.c", "2"); // doesn't match sw.* _ = await fx.PublishAndGetAckAsync("sw.d", "3"); var batch = await fx.FetchAsync("SW", "C1", 10); batch.Messages.Count.ShouldBe(2); } // Go: TestJetStreamInterestRetentionStreamWithFilteredConsumers server/jetstream_test.go:4388 [Fact] public async Task Two_consumers_same_stream_independent_cursors() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("IC", "ic.>"); _ = await fx.CreateConsumerAsync("IC", "C1", "ic.a"); _ = await fx.CreateConsumerAsync("IC", "C2", "ic.b"); _ = await fx.PublishAndGetAckAsync("ic.a", "for-c1"); _ = await fx.PublishAndGetAckAsync("ic.b", "for-c2"); _ = await fx.PublishAndGetAckAsync("ic.a", "for-c1-again"); var batchC1 = await fx.FetchAsync("IC", "C1", 10); batchC1.Messages.Count.ShouldBe(2); var batchC2 = await fx.FetchAsync("IC", "C2", 10); batchC2.Messages.Count.ShouldBe(1); } // Go: TestJetStreamPushConsumersPullError server/jetstream_test.go:5731 [Fact] public async Task Consumer_fetch_from_empty_stream_returns_empty_batch() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("EMP", "emp.>"); _ = await fx.CreateConsumerAsync("EMP", "C1", "emp.>"); var batch = await fx.FetchAsync("EMP", "C1", 5); batch.Messages.Count.ShouldBe(0); } // Go: TestJetStreamAckNext server/jetstream_test.go:2483 [Fact] public async Task Consumer_fetch_after_consuming_all_returns_empty() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DONE", "done.>"); _ = await fx.CreateConsumerAsync("DONE", "C1", "done.>"); _ = await fx.PublishAndGetAckAsync("done.x", "only"); var batch1 = await fx.FetchAsync("DONE", "C1", 1); batch1.Messages.Count.ShouldBe(1); var batch2 = await fx.FetchAsync("DONE", "C1", 1); batch2.Messages.Count.ShouldBe(0); } // Go: TestJetStreamWorkQueueAckAndNext server/jetstream_test.go:1355 [Fact] public async Task Ack_all_consumer_acks_batch_at_once() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("AAB", "aab.>"); _ = await fx.CreateConsumerAsync("AAB", "C1", "aab.>", ackPolicy: AckPolicy.All); for (var i = 0; i < 5; i++) _ = await fx.PublishAndGetAckAsync("aab.x", $"msg-{i}"); var batch = await fx.FetchAsync("AAB", "C1", 5); batch.Messages.Count.ShouldBe(5); await fx.AckAllAsync("AAB", "C1", 5); var pending = await fx.GetPendingCountAsync("AAB", "C1"); pending.ShouldBe(0); } // Go: TestJetStreamEphemeralPullConsumersInactiveThresholdAndNoWait server/jetstream_test.go [Fact] public async Task No_wait_fetch_from_non_existent_consumer_returns_empty() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("NWC", "nwc.>"); var batch = await fx.FetchWithNoWaitAsync("NWC", "NOPE", 1); batch.Messages.Count.ShouldBe(0); } // Go: TestJetStreamMultipleSubjectsBasic — verify payload content [Fact] public async Task Fetched_messages_contain_correct_payload() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("PL", "pl.>"); _ = await fx.CreateConsumerAsync("PL", "C1", "pl.>"); _ = await fx.PublishAndGetAckAsync("pl.x", "hello-world"); var batch = await fx.FetchAsync("PL", "C1", 1); batch.Messages.Count.ShouldBe(1); Encoding.UTF8.GetString(batch.Messages[0].Payload.Span).ShouldBe("hello-world"); } // Go: TestJetStreamBackOffCheckPending server/jetstream_test.go [Fact] public async Task Backoff_config_is_stored_on_consumer() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("BOC", "boc.>"); _ = await fx.RequestLocalAsync( "$JS.API.CONSUMER.CREATE.BOC.C1", """{"durable_name":"C1","filter_subject":"boc.>","ack_policy":"explicit","backoff_ms":[50,100,200]}"""); var info = await fx.GetConsumerInfoAsync("BOC", "C1"); info.Config.BackOffMs.ShouldBe([50, 100, 200]); } // Go: TestJetStreamConsumerPause — multiple pauses [Fact] public async Task Multiple_pause_calls_are_idempotent() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("MPI", "mpi.>"); _ = await fx.CreateConsumerAsync("MPI", "C1", "mpi.>"); for (var i = 0; i < 3; i++) { var pause = await fx.RequestLocalAsync( "$JS.API.CONSUMER.PAUSE.MPI.C1", """{"pause":true}"""); pause.Success.ShouldBeTrue(); } } // Go: TestJetStreamAckExplicitMsgRemoval — explicit ack with fetch batch [Fact] public async Task Explicit_ack_with_batch_fetch() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("EAB", "eab.>"); _ = await fx.CreateConsumerAsync("EAB", "C1", "eab.>", ackPolicy: AckPolicy.Explicit, ackWaitMs: 30_000); for (var i = 0; i < 3; i++) _ = await fx.PublishAndGetAckAsync("eab.x", $"msg-{i}"); var batch = await fx.FetchAsync("EAB", "C1", 3); batch.Messages.Count.ShouldBe(3); var pending = await fx.GetPendingCountAsync("EAB", "C1"); pending.ShouldBe(3); } // Go: TestJetStreamConsumerRate [Fact] public async Task Rate_limit_setting_is_preserved() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("RLP", "rlp.>"); _ = await fx.RequestLocalAsync( "$JS.API.CONSUMER.CREATE.RLP.C1", """{"durable_name":"C1","filter_subject":"rlp.>","push":true,"heartbeat_ms":10,"rate_limit_bps":2048}"""); var info = await fx.GetConsumerInfoAsync("RLP", "C1"); info.Config.RateLimitBps.ShouldBe(2048); } // Go: TestJetStreamRedeliverCount server/jetstream_test.go:3778 [Fact] public async Task Consumer_pending_initially_zero() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("PIZ", "piz.>"); _ = await fx.CreateConsumerAsync("PIZ", "C1", "piz.>", ackPolicy: AckPolicy.Explicit); var pending = await fx.GetPendingCountAsync("PIZ", "C1"); pending.ShouldBe(0); } }