// Ported from golang/nats-server/server/jetstream_test.go // Consumer delivery edge cases: ack wait timeout tracking, max deliver attempts, // backoff lists, idle heartbeat config, deliver policies, push vs pull. using NATS.Server.JetStream.Consumers; using NATS.Server.JetStream.Models; namespace NATS.Server.Tests.JetStream; public class JetStreamConsumerDeliveryEdgeTests { // Go: TestJetStreamWorkQueueAckWaitRedelivery server/jetstream_test.go:2213 // AckWait is stored in consumer config and used by ack processor. [Fact] public async Task Ack_wait_ms_stored_in_consumer_config() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("ACKWAIT", "ackwait.>"); var resp = await fx.CreateConsumerAsync("ACKWAIT", "C1", "ackwait.>", ackPolicy: AckPolicy.Explicit, ackWaitMs: 250); resp.Error.ShouldBeNull(); resp.ConsumerInfo!.Config.AckWaitMs.ShouldBe(250); } // Go: TestJetStreamWorkQueueAckWaitRedelivery — registers pending on fetch [Fact] public async Task Fetch_with_ack_explicit_registers_pending_messages() { await using var fx = await JetStreamApiFixture.StartWithAckExplicitConsumerAsync(500); _ = await fx.PublishAndGetAckAsync("orders.created", "msg1"); _ = await fx.PublishAndGetAckAsync("orders.created", "msg2"); _ = await fx.PublishAndGetAckAsync("orders.created", "msg3"); var batch = await fx.FetchAsync("ORDERS", "PULL", 3); batch.Messages.Count.ShouldBe(3); var pending = await fx.GetPendingCountAsync("ORDERS", "PULL"); pending.ShouldBe(3); } // Go: TestJetStreamWorkQueueNakRedelivery server/jetstream_test.go:2311 // After ack all, pending count drops to zero. [Fact] public async Task Ack_all_on_explicit_consumer_clears_all_pending() { await using var fx = await JetStreamApiFixture.StartWithAckExplicitConsumerAsync(30_000); for (var i = 0; i < 5; i++) _ = await fx.PublishAndGetAckAsync("orders.created", $"m{i}"); var batch = await fx.FetchAsync("ORDERS", "PULL", 5); batch.Messages.Count.ShouldBe(5); await fx.AckAllAsync("ORDERS", "PULL", batch.Messages[^1].Sequence); var pending = await fx.GetPendingCountAsync("ORDERS", "PULL"); pending.ShouldBe(0); } // Go: TestJetStreamAckAllRedelivery server/jetstream_test.go:1921 // Ack all up to sequence N leaves messages above N still pending. [Fact] public async Task Ack_all_up_to_mid_sequence_leaves_tail_pending() { await using var fx = await JetStreamApiFixture.StartWithAckAllConsumerAsync(); for (var i = 0; i < 6; i++) _ = await fx.PublishAndGetAckAsync("orders.created", $"m{i}"); var batch = await fx.FetchAsync("ORDERS", "ACKALL", 6); batch.Messages.Count.ShouldBe(6); // Ack messages 1-3 only await fx.AckAllAsync("ORDERS", "ACKALL", batch.Messages[2].Sequence); var pending = await fx.GetPendingCountAsync("ORDERS", "ACKALL"); // Messages 4, 5, 6 should still be pending pending.ShouldBeGreaterThan(0); pending.ShouldBeLessThanOrEqualTo(3); } // Go: TestJetStreamPushConsumerIdleHeartbeats server/jetstream_test.go:5804 // Push consumer with heartbeats configured is created without error. [Fact] public async Task Push_consumer_with_heartbeats_is_created_successfully() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("HBT", "hbt.>"); var resp = await fx.CreateConsumerAsync("HBT", "PUSHH", "hbt.>", push: true, heartbeatMs: 100); resp.Error.ShouldBeNull(); resp.ConsumerInfo!.Config.HeartbeatMs.ShouldBe(100); resp.ConsumerInfo.Config.Push.ShouldBeTrue(); } // Go: TestJetStreamFlowControlRequiresHeartbeats server/jetstream_test.go:5784 // Flow control can be configured on push consumer alongside heartbeats. [Fact] public async Task Push_consumer_with_flow_control_config_is_accepted() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("FCHB", "fchb.>"); var resp = await fx.RequestLocalAsync( "$JS.API.CONSUMER.CREATE.FCHB.FC1", """{"durable_name":"FC1","filter_subject":"fchb.>","push":true,"heartbeat_ms":50,"flow_control":true}"""); resp.Error.ShouldBeNull(); resp.ConsumerInfo!.Config.Push.ShouldBeTrue(); } // Go: TestJetStreamActiveDelivery server/jetstream_test.go:3726 // Push consumer receives messages published after creation. [Fact] public async Task Push_consumer_receives_published_message() { await using var fx = await JetStreamApiFixture.StartWithPushConsumerAsync(); _ = await fx.PublishAndGetAckAsync("orders.created", "order-data"); var frame = await fx.ReadPushFrameAsync("ORDERS", "PUSH"); frame.IsData.ShouldBeTrue(); frame.Subject.ShouldBe("orders.created"); } // Go: TestJetStreamBasicDeliverSubject server/jetstream_test.go:844 // Push consumer heartbeat frame is emitted after data frame. [Fact] public async Task Push_consumer_emits_heartbeat_frame_after_data() { await using var fx = await JetStreamApiFixture.StartWithPushConsumerAsync(); _ = await fx.PublishAndGetAckAsync("orders.created", "first"); var dataFrame = await fx.ReadPushFrameAsync("ORDERS", "PUSH"); dataFrame.IsData.ShouldBeTrue(); var hbFrame = await fx.ReadPushFrameAsync("ORDERS", "PUSH"); hbFrame.IsHeartbeat.ShouldBeTrue(); } // Go: TestJetStreamPushConsumerFlowControl server/jetstream_test.go:5690 // Flow control frame follows data frame when enabled. [Fact] public async Task Push_consumer_with_fc_emits_fc_frame_after_data() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("PUSHFC", "pushfc.>"); _ = await fx.RequestLocalAsync( "$JS.API.CONSUMER.CREATE.PUSHFC.FCTEST", """{"durable_name":"FCTEST","filter_subject":"pushfc.>","push":true,"heartbeat_ms":10,"flow_control":true}"""); _ = await fx.PublishAndGetAckAsync("pushfc.msg", "data"); var dataFrame = await fx.ReadPushFrameAsync("PUSHFC", "FCTEST"); dataFrame.IsData.ShouldBeTrue(); var fcFrame = await fx.ReadPushFrameAsync("PUSHFC", "FCTEST"); fcFrame.IsFlowControl.ShouldBeTrue(); } // Go: TestJetStreamEphemeralConsumers server/jetstream_test.go:3781 // Ephemeral consumer is created with generated durable name. [Fact] public async Task Ephemeral_consumer_gets_generated_name() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("EPH", "eph.>"); var resp = await fx.CreateConsumerAsync("EPH", "EPHNAME", "eph.>", ephemeral: true); resp.Error.ShouldBeNull(); resp.ConsumerInfo.ShouldNotBeNull(); } // Go: TestJetStreamWorkQueueMaxWaiting server/jetstream_test.go:1094 // Pull consumer fetch with no_wait returns immediately with available messages. [Fact] public async Task Fetch_no_wait_returns_available_messages_immediately() { await using var fx = await JetStreamApiFixture.StartWithPullConsumerAsync(); _ = await fx.PublishAndGetAckAsync("orders.created", "msg1"); _ = await fx.PublishAndGetAckAsync("orders.created", "msg2"); var batch = await fx.FetchWithNoWaitAsync("ORDERS", "PULL", 10); batch.Messages.Count.ShouldBe(2); } // Go: TestJetStreamWorkQueueMaxWaiting — fetch when empty returns zero [Fact] public async Task Fetch_no_wait_returns_empty_when_no_messages() { await using var fx = await JetStreamApiFixture.StartWithPullConsumerAsync(); var batch = await fx.FetchWithNoWaitAsync("ORDERS", "PULL", 10); batch.Messages.Count.ShouldBe(0); } // Go: TestJetStreamWorkQueueAckAndNext server/jetstream_test.go:1634 // Fetching after acking gives next available messages. [Fact] public async Task Fetch_after_ack_all_returns_next_messages() { await using var fx = await JetStreamApiFixture.StartWithAckAllConsumerAsync(); _ = await fx.PublishAndGetAckAsync("orders.created", "msg1"); _ = await fx.PublishAndGetAckAsync("orders.created", "msg2"); var batch1 = await fx.FetchAsync("ORDERS", "ACKALL", 1); batch1.Messages.Count.ShouldBe(1); await fx.AckAllAsync("ORDERS", "ACKALL", batch1.Messages[0].Sequence); var batch2 = await fx.FetchAsync("ORDERS", "ACKALL", 1); batch2.Messages.Count.ShouldBe(1); batch2.Messages[0].Sequence.ShouldBeGreaterThan(batch1.Messages[0].Sequence); } // Go: TestJetStreamRedeliverCount server/jetstream_test.go:3959 // AckProcessor tracks pending count correctly per delivery. [Fact] public void Ack_processor_registers_and_clears_pending_entries() { var proc = new AckProcessor(); proc.Register(1, 30_000); proc.Register(2, 30_000); proc.Register(3, 30_000); proc.PendingCount.ShouldBe(3); proc.AckAll(2); proc.PendingCount.ShouldBe(1); // only seq 3 remains proc.AckAll(3); proc.PendingCount.ShouldBe(0); } // Go: TestJetStreamRedeliverCount — ack floor advances monotonically [Fact] public void Ack_processor_ack_floor_advances_after_ack_all() { var proc = new AckProcessor(); proc.Register(1, 30_000); proc.Register(2, 30_000); proc.Register(3, 30_000); proc.AckFloor.ShouldBe(0UL); proc.AckAll(2); proc.AckFloor.ShouldBe(2UL); proc.AckAll(3); proc.AckFloor.ShouldBe(3UL); } // Go: TestJetStreamWorkQueueAckWaitRedelivery — expired entry detected [Fact] public async Task Ack_processor_detects_expired_pending_entry() { var proc = new AckProcessor(); proc.Register(1, 20); // 20ms ack wait await Task.Delay(50); proc.TryGetExpired(out var seq, out _).ShouldBeTrue(); seq.ShouldBe(1UL); } // Go: TestJetStreamWorkQueueTerminateDelivery server/jetstream_test.go:2465 // Drop removes a pending entry from the processor. [Fact] public void Ack_processor_drop_removes_pending_entry() { var proc = new AckProcessor(); proc.Register(1, 30_000); proc.Register(2, 30_000); proc.Drop(1); proc.PendingCount.ShouldBe(1); } // Go: TestJetStreamPushConsumerIdleHeartbeatsWithFilterSubject server/jetstream_test.go:5864 // Push consumer with heartbeats and filter subject is created without error. [Fact] public async Task Push_consumer_with_heartbeats_and_filter_subject() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("HBFILT", "hbfilt.>"); var resp = await fx.CreateConsumerAsync( "HBFILT", "HBCONS", "hbfilt.orders", push: true, heartbeatMs: 100); resp.Error.ShouldBeNull(); resp.ConsumerInfo!.Config.FilterSubject.ShouldBe("hbfilt.orders"); resp.ConsumerInfo.Config.HeartbeatMs.ShouldBe(100); } // Go: TestJetStreamAckNext server/jetstream_test.go:2565 // Consumer advances sequence correctly after each fetch. [Fact] public async Task Consumer_sequence_advances_with_each_fetch() { await using var fx = await JetStreamApiFixture.StartWithPullConsumerAsync(); for (var i = 0; i < 5; i++) _ = await fx.PublishAndGetAckAsync("orders.created", $"msg-{i}"); var seqs = new List(); for (var i = 0; i < 5; i++) { var batch = await fx.FetchAsync("ORDERS", "PULL", 1); batch.Messages.Count.ShouldBe(1); seqs.Add(batch.Messages[0].Sequence); } seqs.ShouldBeInOrder(); seqs.Distinct().Count().ShouldBe(5); // all unique sequences } // Go: TestJetStreamWorkQueueAckWaitRedelivery — schedule redelivery increases delivery count [Fact] public void Ack_processor_schedule_redelivery_increments_delivery_count() { var proc = new AckProcessor(); proc.Register(1, 30_000); proc.ScheduleRedelivery(1, 30_000); // After rescheduling, pending is still 1 proc.PendingCount.ShouldBe(1); } // Go: TestJetStreamWorkQueueRequest server/jetstream_test.go:1267 // Fetch batch respects count limit. [Fact] public async Task Fetch_batch_respects_count_limit() { await using var fx = await JetStreamApiFixture.StartWithPullConsumerAsync(); for (var i = 0; i < 10; i++) _ = await fx.PublishAndGetAckAsync("orders.created", $"data-{i}"); var batch = await fx.FetchAsync("ORDERS", "PULL", 3); batch.Messages.Count.ShouldBe(3); } // Go: TestJetStreamSubjectFiltering server/jetstream_test.go:1385 // Consumer with filter only delivers matching messages. [Fact] public async Task Consumer_filter_delivers_only_matching_messages() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("FILTDEL", "filtdel.>"); _ = await fx.CreateConsumerAsync("FILTDEL", "FILTCONS", "filtdel.orders"); _ = await fx.PublishAndGetAckAsync("filtdel.orders", "order-1"); _ = await fx.PublishAndGetAckAsync("filtdel.events", "event-1"); _ = await fx.PublishAndGetAckAsync("filtdel.orders", "order-2"); var batch = await fx.FetchAsync("FILTDEL", "FILTCONS", 10); batch.Messages.Count.ShouldBe(2); batch.Messages.All(m => m.Subject == "filtdel.orders").ShouldBeTrue(); } // Go: TestJetStreamWildcardSubjectFiltering server/jetstream_test.go:1522 // Consumer with wildcard filter delivers only matching messages. [Fact] public async Task Consumer_wildcard_filter_delivers_matching_messages() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("WCFILT", "wcfilt.>"); _ = await fx.CreateConsumerAsync("WCFILT", "WCC", "wcfilt.orders.*"); _ = await fx.PublishAndGetAckAsync("wcfilt.orders.created", "1"); _ = await fx.PublishAndGetAckAsync("wcfilt.events.logged", "2"); _ = await fx.PublishAndGetAckAsync("wcfilt.orders.shipped", "3"); var batch = await fx.FetchAsync("WCFILT", "WCC", 10); batch.Messages.Count.ShouldBe(2); } // Go: TestJetStreamWorkQueueRequestBatch server/jetstream_test.go:1703 // Batch fetch returns all available up to limit. [Fact] public async Task Batch_fetch_returns_all_available_messages_up_to_limit() { await using var fx = await JetStreamApiFixture.StartWithStreamAsync("BATCHFULL", "batchfull.>"); _ = await fx.CreateConsumerAsync("BATCHFULL", "BC", "batchfull.>"); for (var i = 0; i < 7; i++) _ = await fx.PublishAndGetAckAsync("batchfull.x", $"msg-{i}"); var batch = await fx.FetchAsync("BATCHFULL", "BC", 10); batch.Messages.Count.ShouldBe(7); } // Go: TestJetStreamWorkQueueRetentionStream server/jetstream_test.go:1788 // Pull consumer on work queue stream receives messages. [Fact] public async Task Work_queue_pull_consumer_receives_messages() { await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig { Name = "WQR", Subjects = ["wqr.>"], Retention = RetentionPolicy.WorkQueue, }); _ = await fx.CreateConsumerAsync("WQR", "WQC", "wqr.>"); _ = await fx.PublishAndGetAckAsync("wqr.task", "task1"); _ = await fx.PublishAndGetAckAsync("wqr.task", "task2"); var batch = await fx.FetchAsync("WQR", "WQC", 5); batch.Messages.Count.ShouldBe(2); } }