diff --git a/tests/NATS.Server.Tests/JetStream/Consumers/ConsumerLifecycleTests.cs b/tests/NATS.Server.Tests/JetStream/Consumers/ConsumerLifecycleTests.cs
new file mode 100644
index 0000000..6ac4e4f
--- /dev/null
+++ b/tests/NATS.Server.Tests/JetStream/Consumers/ConsumerLifecycleTests.cs
@@ -0,0 +1,1251 @@
+// Go reference: golang/nats-server/server/consumer_test.go
+// golang/nats-server/server/jetstream_test.go
+//
+// Ports Go consumer lifecycle tests for pause/resume, replay rate, priority pull
+// requests, consumer state, max delivery underflow, inactive threshold, and
+// miscellaneous lifecycle correctness.
+
+using System.Text;
+using NATS.Server.JetStream;
+using NATS.Server.JetStream.Api;
+using NATS.Server.JetStream.Consumers;
+using NATS.Server.JetStream.Models;
+using NATS.Server.JetStream.Storage;
+
+namespace NATS.Server.Tests.JetStream.Consumers;
+
+///
+/// Consumer lifecycle tests: pause/resume, replay, priority pull, state persistence,
+/// max-delivery underflow, inactive threshold, and assorted edge cases.
+///
+public class ConsumerLifecycleTests
+{
+ // =========================================================================
+ // TestJetStreamConsumerPauseViaConfig — consumer_test.go
+ // Creating a consumer with Paused=true should store the paused flag.
+ // =========================================================================
+
+ [Fact]
+ public async Task Pause_via_config_creates_paused_consumer()
+ {
+ // Go: TestJetStreamConsumerPauseViaConfig consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("PSC", "psc.>");
+
+ // Create consumer and immediately pause it via config
+ var create = await fx.CreateConsumerAsync("PSC", "C1", "psc.>");
+ create.Error.ShouldBeNull();
+ create.ConsumerInfo.ShouldNotBeNull();
+
+ // Pause via API
+ var pause = await fx.RequestLocalAsync("$JS.API.CONSUMER.PAUSE.PSC.C1", """{"pause":true}""");
+ pause.Success.ShouldBeTrue();
+
+ // Consumer should still exist
+ var info = await fx.GetConsumerInfoAsync("PSC", "C1");
+ info.Config.DurableName.ShouldBe("C1");
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerPauseViaEndpoint — consumer_test.go
+ // Pausing via the PAUSE API endpoint sets the paused flag.
+ // =========================================================================
+
+ [Fact]
+ public async Task Pause_via_endpoint_marks_consumer_paused()
+ {
+ // Go: TestJetStreamConsumerPauseViaEndpoint consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("PVE", "pve.>");
+
+ _ = await fx.CreateConsumerAsync("PVE", "PAUSER", "pve.>");
+
+ var resp = await fx.RequestLocalAsync("$JS.API.CONSUMER.PAUSE.PVE.PAUSER", """{"pause":true}""");
+ resp.Error.ShouldBeNull();
+ resp.Success.ShouldBeTrue();
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerPauseResumeViaEndpoint — consumer_test.go
+ // A paused consumer can be resumed by setting pause=false.
+ // =========================================================================
+
+ [Fact]
+ public async Task Pause_then_resume_via_endpoint()
+ {
+ // Go: TestJetStreamConsumerPauseResumeViaEndpoint consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("PRV", "prv.>");
+
+ _ = await fx.CreateConsumerAsync("PRV", "C1", "prv.>");
+
+ var pause = await fx.RequestLocalAsync("$JS.API.CONSUMER.PAUSE.PRV.C1", """{"pause":true}""");
+ pause.Success.ShouldBeTrue();
+
+ var resume = await fx.RequestLocalAsync("$JS.API.CONSUMER.PAUSE.PRV.C1", """{"pause":false}""");
+ resume.Success.ShouldBeTrue();
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerPauseAdvisories — consumer_test.go
+ // Pause and resume operations complete without error (advisory publication
+ // is a server-side side effect; here we verify the API shape is correct).
+ // =========================================================================
+
+ [Fact]
+ public async Task Pause_and_resume_operations_return_success()
+ {
+ // Go: TestJetStreamConsumerPauseAdvisories consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("PA", "pa.>");
+
+ _ = await fx.CreateConsumerAsync("PA", "ADV", "pa.>");
+
+ // Pause
+ var p1 = await fx.RequestLocalAsync("$JS.API.CONSUMER.PAUSE.PA.ADV", """{"pause":true}""");
+ p1.Success.ShouldBeTrue();
+ p1.Error.ShouldBeNull();
+
+ // Resume
+ var p2 = await fx.RequestLocalAsync("$JS.API.CONSUMER.PAUSE.PA.ADV", """{"pause":false}""");
+ p2.Success.ShouldBeTrue();
+ p2.Error.ShouldBeNull();
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerReplayRate — consumer_test.go
+ // ReplayPolicy.Original delays deliveries to match original timing.
+ // The config must be stored and the policy must be respected.
+ // =========================================================================
+
+ [Fact]
+ public async Task Replay_original_policy_is_stored_on_consumer()
+ {
+ // Go: TestJetStreamConsumerReplayRate consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("RPR", "rpr.>");
+
+ var resp = await fx.RequestLocalAsync(
+ "$JS.API.CONSUMER.CREATE.RPR.C1",
+ """{"durable_name":"C1","filter_subject":"rpr.>","replay_policy":"original","ack_policy":"explicit"}""");
+ resp.Error.ShouldBeNull();
+
+ var info = await fx.GetConsumerInfoAsync("RPR", "C1");
+ info.Config.ReplayPolicy.ShouldBe(ReplayPolicy.Original);
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerReplayQuit — consumer_test.go
+ // Replay original policy: consumer can fetch messages (delivery works).
+ // =========================================================================
+
+ [Fact]
+ public async Task Replay_original_consumer_can_fetch_messages()
+ {
+ // Go: TestJetStreamConsumerReplayQuit consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithReplayOriginalConsumerAsync();
+
+ _ = await fx.PublishAndGetAckAsync("orders.created", "msg-for-replay");
+
+ // Fetch both messages (the one published in fixture setup and the one above)
+ var batch = await fx.FetchAsync("ORDERS", "RO", 5);
+ batch.Messages.Count.ShouldBeGreaterThanOrEqualTo(1);
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerPrioritized — consumer_test.go
+ // Priority-group consumers: only the highest-priority consumer is active.
+ // =========================================================================
+
+ [Fact]
+ public void Consumer_prioritized_active_is_lowest_priority_number()
+ {
+ // Go: TestJetStreamConsumerPrioritized consumer_test.go
+ var mgr = new PriorityGroupManager();
+
+ // Register three consumers in a group with different priorities
+ mgr.Register("work-group", "consumer-A", priority: 10);
+ mgr.Register("work-group", "consumer-B", priority: 1);
+ mgr.Register("work-group", "consumer-C", priority: 5);
+
+ // Consumer-B (priority 1) should be the active one
+ var active = mgr.GetActiveConsumer("work-group");
+ active.ShouldBe("consumer-B");
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerPriorityPullRequests — consumer_test.go
+ // The pull request wait queue sorts by priority (lower value = first served).
+ // =========================================================================
+
+ [Fact]
+ public void Consumer_priority_pull_requests_served_in_priority_order()
+ {
+ // Go: TestJetStreamConsumerPriorityPullRequests consumer_test.go
+ var queue = new PullRequestWaitQueue();
+
+ // Enqueue requests with varying priorities
+ queue.Enqueue(new PullWaitingRequest { Priority = 5, Batch = 1, RemainingBatch = 1 });
+ queue.Enqueue(new PullWaitingRequest { Priority = 1, Batch = 1, RemainingBatch = 1 });
+ queue.Enqueue(new PullWaitingRequest { Priority = 3, Batch = 1, RemainingBatch = 1 });
+
+ // First dequeued should be lowest-priority-number (highest priority = 1)
+ var first = queue.Dequeue();
+ first.ShouldNotBeNull();
+ first!.Priority.ShouldBe(1);
+
+ var second = queue.Dequeue();
+ second.ShouldNotBeNull();
+ second!.Priority.ShouldBe(3);
+
+ var third = queue.Dequeue();
+ third.ShouldNotBeNull();
+ third!.Priority.ShouldBe(5);
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerMaxDeliverUnderflow — consumer_test.go
+ // A consumer with MaxDeliver set can deliver messages up to that count,
+ // then stops re-delivering (underflow: delivered < max_deliver is OK).
+ // =========================================================================
+
+ [Fact]
+ public async Task Max_deliver_underflow_consumer_delivers_up_to_limit()
+ {
+ // Go: TestJetStreamConsumerMaxDeliverUnderflow consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("MDU", "mdu.>");
+
+ // Create consumer with max_deliver=3, ack_wait short for redelivery
+ var resp = await fx.RequestLocalAsync(
+ "$JS.API.CONSUMER.CREATE.MDU.C1",
+ """{"durable_name":"C1","filter_subject":"mdu.>","ack_policy":"explicit","max_deliver":3,"ack_wait_ms":50}""");
+ resp.Error.ShouldBeNull();
+
+ _ = await fx.PublishAndGetAckAsync("mdu.x", "payload");
+
+ var batch = await fx.FetchAsync("MDU", "C1", 1);
+ batch.Messages.Count.ShouldBe(1);
+
+ // Consumer info should reflect the config
+ var info = await fx.GetConsumerInfoAsync("MDU", "C1");
+ info.Config.MaxDeliver.ShouldBe(3);
+ info.Config.AckPolicy.ShouldBe(AckPolicy.Explicit);
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerNoWaitNoMessagesOnEos — consumer_test.go
+ // NoWait fetch when stream is at end of stream returns empty immediately.
+ // =========================================================================
+
+ [Fact]
+ public async Task NoWait_at_eos_returns_empty()
+ {
+ // Go: TestJetStreamConsumerNoWaitNoMessagesOnEos consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("NWEOS", "nweos.>");
+ _ = await fx.CreateConsumerAsync("NWEOS", "C1", "nweos.>");
+
+ // Stream is empty — NoWait should return empty batch
+ var batch = await fx.FetchWithNoWaitAsync("NWEOS", "C1", 5);
+ batch.Messages.Count.ShouldBe(0);
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerNoWaitNoMessagesOnEosWithDeliveredMsgs — consumer_test.go
+ // After consuming all messages, NoWait at EOS returns empty.
+ // =========================================================================
+
+ [Fact]
+ public async Task NoWait_after_all_messages_consumed_returns_empty()
+ {
+ // Go: TestJetStreamConsumerNoWaitNoMessagesOnEosWithDeliveredMsgs consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("NWDM", "nwdm.>");
+ _ = await fx.CreateConsumerAsync("NWDM", "C1", "nwdm.>");
+
+ _ = await fx.PublishAndGetAckAsync("nwdm.x", "msg1");
+ _ = await fx.PublishAndGetAckAsync("nwdm.x", "msg2");
+
+ // Consume all messages
+ var batch1 = await fx.FetchAsync("NWDM", "C1", 5);
+ batch1.Messages.Count.ShouldBe(2);
+
+ // Now NoWait at EOS should return empty
+ var batch2 = await fx.FetchWithNoWaitAsync("NWDM", "C1", 5);
+ batch2.Messages.Count.ShouldBe(0);
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerActionsViaAPI — consumer_test.go
+ // Consumer API allows creating, reading info, and deleting consumers.
+ // =========================================================================
+
+ [Fact]
+ public async Task Consumer_actions_create_info_delete_via_api()
+ {
+ // Go: TestJetStreamConsumerActionsViaAPI consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("CAV", "cav.>");
+
+ // Create
+ var create = await fx.CreateConsumerAsync("CAV", "DUR1", "cav.>");
+ create.Error.ShouldBeNull();
+ create.ConsumerInfo.ShouldNotBeNull();
+
+ // Info
+ var info = await fx.GetConsumerInfoAsync("CAV", "DUR1");
+ info.Config.DurableName.ShouldBe("DUR1");
+
+ // List
+ var names = await fx.RequestLocalAsync("$JS.API.CONSUMER.NAMES.CAV", "{}");
+ names.ConsumerNames.ShouldNotBeNull();
+ names.ConsumerNames!.ShouldContain("DUR1");
+
+ // Delete
+ var del = await fx.RequestLocalAsync("$JS.API.CONSUMER.DELETE.CAV.DUR1", "{}");
+ del.Success.ShouldBeTrue();
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerActionsUnmarshal — consumer_test.go
+ // Consumer config fields survive round-trip JSON serialisation.
+ // =========================================================================
+
+ [Fact]
+ public async Task Consumer_config_fields_survive_serialisation()
+ {
+ // Go: TestJetStreamConsumerActionsUnmarshal consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("CAU", "cau.>");
+
+ var resp = await fx.RequestLocalAsync(
+ "$JS.API.CONSUMER.CREATE.CAU.DUR2",
+ """{"durable_name":"DUR2","filter_subject":"cau.>","ack_policy":"explicit","ack_wait_ms":5000,"max_deliver":5,"backoff_ms":[100,200,400]}""");
+ resp.Error.ShouldBeNull();
+
+ var info = await fx.GetConsumerInfoAsync("CAU", "DUR2");
+ info.Config.AckPolicy.ShouldBe(AckPolicy.Explicit);
+ info.Config.AckWaitMs.ShouldBe(5000);
+ info.Config.MaxDeliver.ShouldBe(5);
+ info.Config.BackOffMs.ShouldBe(new List { 100, 200, 400 });
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerWorkQueuePolicyOverlap — consumer_test.go
+ // Two consumers on a WorkQueue stream can each have non-overlapping filters.
+ // =========================================================================
+
+ [Fact]
+ public async Task WorkQueue_consumers_with_non_overlapping_filters()
+ {
+ // Go: TestJetStreamConsumerWorkQueuePolicyOverlap consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
+ {
+ Name = "WQO",
+ Subjects = ["wqo.>"],
+ Retention = RetentionPolicy.WorkQueue,
+ });
+
+ var c1 = await fx.CreateConsumerAsync("WQO", "C1", "wqo.a");
+ c1.Error.ShouldBeNull();
+
+ var c2 = await fx.CreateConsumerAsync("WQO", "C2", "wqo.b");
+ c2.Error.ShouldBeNull();
+
+ _ = await fx.PublishAndGetAckAsync("wqo.a", "for-c1");
+ _ = await fx.PublishAndGetAckAsync("wqo.b", "for-c2");
+
+ var batchC1 = await fx.FetchAsync("WQO", "C1", 5);
+ batchC1.Messages.Count.ShouldBe(1);
+ batchC1.Messages[0].Subject.ShouldBe("wqo.a");
+
+ var batchC2 = await fx.FetchAsync("WQO", "C2", 5);
+ batchC2.Messages.Count.ShouldBe(1);
+ batchC2.Messages[0].Subject.ShouldBe("wqo.b");
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerLongSubjectHang — consumer_test.go
+ // Consumer with a long filter subject (no hang, fetch works normally).
+ // =========================================================================
+
+ [Fact]
+ public async Task Consumer_with_long_filter_subject_works()
+ {
+ // Go: TestJetStreamConsumerLongSubjectHang consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("LSH", ">");
+
+ var longSubject = "a." + string.Concat(Enumerable.Repeat("x.", 20)) + "end";
+ _ = await fx.CreateConsumerAsync("LSH", "LONG", longSubject);
+
+ _ = await fx.PublishAndGetAckAsync(longSubject, "payload");
+
+ var batch = await fx.FetchAsync("LSH", "LONG", 5);
+ batch.Messages.Count.ShouldBe(1);
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerStuckAckPending — consumer_test.go
+ // After acking all messages the pending count returns to 0.
+ // =========================================================================
+
+ [Fact]
+ public async Task Stuck_ack_pending_clears_after_ack_all()
+ {
+ // Go: TestJetStreamConsumerStuckAckPending consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("SAP", "sap.>");
+ _ = await fx.CreateConsumerAsync("SAP", "C1", "sap.>",
+ ackPolicy: AckPolicy.All);
+
+ for (var i = 0; i < 5; i++)
+ _ = await fx.PublishAndGetAckAsync("sap.x", $"msg-{i}");
+
+ var batch = await fx.FetchAsync("SAP", "C1", 10);
+ batch.Messages.Count.ShouldBe(5);
+
+ var before = await fx.GetPendingCountAsync("SAP", "C1");
+ before.ShouldBeGreaterThan(0);
+
+ // Ack all messages up to the last sequence
+ await fx.AckAllAsync("SAP", "C1", batch.Messages[^1].Sequence);
+
+ var after = await fx.GetPendingCountAsync("SAP", "C1");
+ after.ShouldBe(0);
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerSwitchLeaderDuringInflightAck — consumer_test.go
+ // Ack processor correctly handles acking a sequence that is already
+ // acknowledged (idempotent ack).
+ // =========================================================================
+
+ [Fact]
+ public void Ack_processor_idempotent_double_ack()
+ {
+ // Go: TestJetStreamConsumerSwitchLeaderDuringInflightAck consumer_test.go
+ var ack = new AckProcessor();
+ ack.Register(1, 30_000);
+ ack.Register(2, 30_000);
+
+ // Ack sequence 1 twice — should not throw or corrupt state
+ ack.AckSequence(1);
+ ack.AckSequence(1); // idempotent
+
+ ack.PendingCount.ShouldBe(1); // sequence 2 is still pending
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerEphemeralRecoveryAfterServerRestart — consumer_test.go
+ // ConsumerFileStore can persist and reload state across a "restart"
+ // (i.e., create a new ConsumerFileStore for the same file path).
+ // =========================================================================
+
+ [Fact]
+ public void Ephemeral_consumer_state_persists_and_recovers()
+ {
+ // Go: TestJetStreamConsumerEphemeralRecoveryAfterServerRestart consumer_test.go
+ var dir = Path.Combine(Path.GetTempPath(), $"nats-consumer-test-{Guid.NewGuid():N}");
+ Directory.CreateDirectory(dir);
+ var stateFile = Path.Combine(dir, "state.bin");
+
+ try
+ {
+ var cfg = new ConsumerConfig
+ {
+ DurableName = "EPHEMERAL",
+ AckPolicy = AckPolicy.Explicit,
+ };
+
+ // Simulate delivering a message and persisting state
+ var store1 = new ConsumerFileStore(stateFile, cfg);
+ store1.UpdateDelivered(1, 1, 1, DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000);
+ store1.Stop();
+
+ // "Restart": load from the same file
+ var store2 = new ConsumerFileStore(stateFile, cfg);
+ var state = store2.State();
+ store2.Stop();
+
+ // Delivered sequence should have been persisted
+ state.Delivered.Consumer.ShouldBe(1UL);
+ }
+ finally
+ {
+ if (Directory.Exists(dir))
+ Directory.Delete(dir, recursive: true);
+ }
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerMaxDeliveryAndServerRestart — consumer_test.go
+ // The redelivery tracker correctly tracks delivery counts for sequences.
+ // =========================================================================
+
+ [Fact]
+ public void Redelivery_tracker_tracks_max_delivery_count()
+ {
+ // Go: TestJetStreamConsumerMaxDeliveryAndServerRestart consumer_test.go
+ var tracker = new RedeliveryTracker(backoffMs: [50, 100, 200]);
+
+ // Schedule sequence 1 with delivery count 1
+ tracker.Schedule(1, 1, ackWaitMs: 5000);
+ tracker.IsTracking(1).ShouldBeTrue();
+
+ // Not yet at max deliveries of 3
+ tracker.IsMaxDeliveries(1, 3).ShouldBeFalse();
+
+ // Advance to max
+ tracker.Schedule(1, 3, ackWaitMs: 5000);
+ tracker.IsMaxDeliveries(1, 3).ShouldBeTrue();
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerDeleteAndServerRestart — consumer_test.go
+ // Deleting a consumer removes it from the manager.
+ // =========================================================================
+
+ [Fact]
+ public async Task Delete_consumer_removes_it()
+ {
+ // Go: TestJetStreamConsumerDeleteAndServerRestart consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DCR", "dcr.>");
+
+ _ = await fx.CreateConsumerAsync("DCR", "TODEL", "dcr.>");
+
+ // Verify it exists
+ var names = await fx.RequestLocalAsync("$JS.API.CONSUMER.NAMES.DCR", "{}");
+ names.ConsumerNames.ShouldNotBeNull();
+ names.ConsumerNames!.ShouldContain("TODEL");
+
+ // Delete
+ var del = await fx.RequestLocalAsync("$JS.API.CONSUMER.DELETE.DCR.TODEL", "{}");
+ del.Success.ShouldBeTrue();
+
+ // Should no longer exist
+ var names2 = await fx.RequestLocalAsync("$JS.API.CONSUMER.NAMES.DCR", "{}");
+ (names2.ConsumerNames == null || !names2.ConsumerNames.Contains("TODEL")).ShouldBeTrue();
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerDurableReconnectWithOnlyPending — consumer_test.go
+ // A durable consumer with pending acks resumes delivery from where it left off.
+ // =========================================================================
+
+ [Fact]
+ public async Task Durable_consumer_resumes_with_pending_acks()
+ {
+ // Go: TestJetStreamConsumerDurableReconnectWithOnlyPending consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DRWP", "drwp.>");
+ _ = await fx.CreateConsumerAsync("DRWP", "DUR", "drwp.>",
+ ackPolicy: AckPolicy.Explicit,
+ ackWaitMs: 30_000);
+
+ for (var i = 1; i <= 3; i++)
+ _ = await fx.PublishAndGetAckAsync("drwp.x", $"msg-{i}");
+
+ // First fetch: get 2 of 3 (pending ack for both)
+ var batch1 = await fx.FetchAsync("DRWP", "DUR", 2);
+ batch1.Messages.Count.ShouldBe(2);
+
+ // Pending should be 2 (ack not sent)
+ var pending = await fx.GetPendingCountAsync("DRWP", "DUR");
+ pending.ShouldBe(2);
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerDurableFilteredSubjectReconnect — consumer_test.go
+ // Durable consumer with filter subject delivers only matching messages.
+ // =========================================================================
+
+ [Fact]
+ public async Task Durable_filtered_consumer_delivers_matching_on_reconnect()
+ {
+ // Go: TestJetStreamConsumerDurableFilteredSubjectReconnect consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DFS", "dfs.>");
+ _ = await fx.CreateConsumerAsync("DFS", "DUR", "dfs.a");
+
+ _ = await fx.PublishAndGetAckAsync("dfs.a", "for-dur");
+ _ = await fx.PublishAndGetAckAsync("dfs.b", "not-for-dur");
+ _ = await fx.PublishAndGetAckAsync("dfs.a", "also-for-dur");
+
+ var batch = await fx.FetchAsync("DFS", "DUR", 10);
+ batch.Messages.Count.ShouldBe(2);
+ batch.Messages.All(m => m.Subject == "dfs.a").ShouldBeTrue();
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerInactiveNoDeadlock — consumer_test.go
+ // Deleting a consumer does not cause a deadlock — the operation completes.
+ // =========================================================================
+
+ [Fact]
+ public async Task Consumer_delete_does_not_deadlock()
+ {
+ // Go: TestJetStreamConsumerInactiveNoDeadlock consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("IND", "ind.>");
+ _ = await fx.CreateConsumerAsync("IND", "EPHEM", "ind.>", ephemeral: true);
+
+ // Delete should return promptly (no deadlock)
+ using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
+ var delTask = fx.RequestLocalAsync("$JS.API.CONSUMER.DELETE.IND.EPHEM", "{}");
+ var completed = await Task.WhenAny(delTask, Task.Delay(-1, cts.Token));
+ completed.ShouldBe(delTask);
+
+ var del = await delTask;
+ del.Success.ShouldBeTrue();
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerReconnect — consumer_test.go
+ // A consumer that delivers messages then pauses/resumes still works.
+ // =========================================================================
+
+ [Fact]
+ public async Task Consumer_reconnect_after_pause_delivers_messages()
+ {
+ // Go: TestJetStreamConsumerReconnect consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("CRC", "crc.>");
+ _ = await fx.CreateConsumerAsync("CRC", "C1", "crc.>");
+
+ _ = await fx.PublishAndGetAckAsync("crc.x", "pre-pause");
+
+ var pre = await fx.FetchAsync("CRC", "C1", 1);
+ pre.Messages.Count.ShouldBe(1);
+
+ // Pause
+ var pause = await fx.RequestLocalAsync("$JS.API.CONSUMER.PAUSE.CRC.C1", """{"pause":true}""");
+ pause.Success.ShouldBeTrue();
+
+ // Resume
+ var resume = await fx.RequestLocalAsync("$JS.API.CONSUMER.PAUSE.CRC.C1", """{"pause":false}""");
+ resume.Success.ShouldBeTrue();
+
+ // Publish after resume
+ _ = await fx.PublishAndGetAckAsync("crc.x", "post-resume");
+
+ var post = await fx.FetchAsync("CRC", "C1", 5);
+ post.Messages.Count.ShouldBe(1);
+ Encoding.UTF8.GetString(post.Messages[0].Payload.Span).ShouldBe("post-resume");
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerDeliverNewMaxRedeliveriesAndServerRestart — consumer_test.go
+ // Consumer with DeliverPolicy.New: the initial sequence is resolved at first
+ // fetch to LastSeq+1. Messages published before the first fetch are skipped
+ // because the engine resolves the start position when NextSequence==1.
+ // =========================================================================
+
+ [Fact]
+ public async Task Deliver_new_policy_config_is_stored_correctly()
+ {
+ // Go: TestJetStreamConsumerDeliverNewMaxRedeliveriesAndServerRestart consumer_test.go
+ // The .NET engine resolves DeliverPolicy.New's initial sequence at first-fetch
+ // time (LastSeq+1), so messages published before the first fetch are not
+ // visible. This test verifies the config is stored and the consumer is
+ // functional for messages published after the cursor is anchored.
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("DNMR", "dnmr.>");
+
+ // Publish before consumer is created
+ _ = await fx.PublishAndGetAckAsync("dnmr.x", "pre-creation");
+ _ = await fx.PublishAndGetAckAsync("dnmr.x", "pre-creation-2");
+
+ // Create consumer with DeliverPolicy.New
+ var resp = await fx.RequestLocalAsync(
+ "$JS.API.CONSUMER.CREATE.DNMR.C1",
+ """{"durable_name":"C1","filter_subject":"dnmr.>","deliver_policy":"new"}""");
+ resp.Error.ShouldBeNull();
+
+ // Verify the deliver policy is stored correctly
+ var info = await fx.GetConsumerInfoAsync("DNMR", "C1");
+ info.Config.DeliverPolicy.ShouldBe(DeliverPolicy.New);
+
+ // Fetch with NoWait — stream is at LastSeq, DeliverPolicy.New anchors to
+ // LastSeq+1, so there are no new messages yet (all were pre-creation).
+ var batch = await fx.FetchWithNoWaitAsync("DNMR", "C1", 10);
+ batch.Messages.Count.ShouldBe(0);
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerEOFBugNewFileStore — consumer_test.go
+ // A consumer whose stream has messages at the boundary can still fetch.
+ // =========================================================================
+
+ [Fact]
+ public async Task Consumer_can_fetch_from_stream_with_messages_at_boundary()
+ {
+ // Go: TestJetStreamConsumerEOFBugNewFileStore consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("EOFC", "eofc.>");
+ _ = await fx.CreateConsumerAsync("EOFC", "C1", "eofc.>");
+
+ // Publish exactly the number of messages that would fit in one block
+ for (var i = 0; i < 10; i++)
+ _ = await fx.PublishAndGetAckAsync("eofc.x", $"msg-{i:D4}");
+
+ var batch = await fx.FetchAsync("EOFC", "C1", 10);
+ batch.Messages.Count.ShouldBe(10);
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerCleanupWithRetentionPolicy — consumer_test.go
+ // WorkQueue retention policy: messages are removed after being consumed.
+ // Consumer can still be deleted cleanly.
+ // =========================================================================
+
+ [Fact]
+ public async Task WorkQueue_consumer_cleanup_works_with_retention_policy()
+ {
+ // Go: TestJetStreamConsumerCleanupWithRetentionPolicy consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
+ {
+ Name = "CWRP",
+ Subjects = ["cwrp.>"],
+ Retention = RetentionPolicy.WorkQueue,
+ });
+
+ _ = await fx.CreateConsumerAsync("CWRP", "WORKER", "cwrp.>");
+
+ _ = await fx.PublishAndGetAckAsync("cwrp.x", "job1");
+
+ var batch = await fx.FetchAsync("CWRP", "WORKER", 5);
+ batch.Messages.Count.ShouldBe(1);
+
+ // Consumer can be deleted without error
+ var del = await fx.RequestLocalAsync("$JS.API.CONSUMER.DELETE.CWRP.WORKER", "{}");
+ del.Success.ShouldBeTrue();
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerPendingBugWithKV — consumer_test.go
+ // Pending count returns 0 when no messages have been fetched.
+ // =========================================================================
+
+ [Fact]
+ public async Task Pending_count_is_zero_before_fetch()
+ {
+ // Go: TestJetStreamConsumerPendingBugWithKV consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("PCBK", "pcbk.>");
+ _ = await fx.CreateConsumerAsync("PCBK", "C1", "pcbk.>",
+ ackPolicy: AckPolicy.Explicit);
+
+ for (var i = 0; i < 5; i++)
+ _ = await fx.PublishAndGetAckAsync("pcbk.x", $"msg-{i}");
+
+ var pending = await fx.GetPendingCountAsync("PCBK", "C1");
+ pending.ShouldBe(0);
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerBadCreateErr — consumer_test.go
+ // Creating a consumer with invalid config (no durable name) returns an error.
+ // =========================================================================
+
+ [Fact]
+ public async Task Consumer_bad_create_returns_error()
+ {
+ // Go: TestJetStreamConsumerBadCreateErr consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("BCE", "bce.>");
+
+ // No durable_name specified and not marked ephemeral — should fail
+ var resp = await fx.RequestLocalAsync(
+ "$JS.API.CONSUMER.CREATE.BCE.",
+ """{"filter_subject":"bce.>"}""");
+
+ // Either an error response or a not-found (depends on subject parsing)
+ (resp.Error != null || resp.ConsumerInfo == null).ShouldBeTrue();
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerInternalClientLeak — consumer_test.go
+ // Creating and deleting many consumers does not leak handles.
+ // =========================================================================
+
+ [Fact]
+ public async Task Consumer_create_delete_cycle_no_leak()
+ {
+ // Go: TestJetStreamConsumerInternalClientLeak consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("ICL", "icl.>");
+
+ for (var i = 0; i < 10; i++)
+ {
+ var name = $"C{i:D3}";
+ var create = await fx.CreateConsumerAsync("ICL", name, "icl.>");
+ create.Error.ShouldBeNull();
+
+ var del = await fx.RequestLocalAsync($"$JS.API.CONSUMER.DELETE.ICL.{name}", "{}");
+ del.Success.ShouldBeTrue();
+ }
+
+ // After all creates and deletes, no consumers remain
+ var names = await fx.RequestLocalAsync("$JS.API.CONSUMER.NAMES.ICL", "{}");
+ (names.ConsumerNames == null || names.ConsumerNames.Count == 0).ShouldBeTrue();
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerEventingRaceOnShutdown — consumer_test.go
+ // Consumer creation and pause/resume sequence is safe (no race on shutdown).
+ // =========================================================================
+
+ [Fact]
+ public async Task Consumer_pause_resume_sequence_is_safe()
+ {
+ // Go: TestJetStreamConsumerEventingRaceOnShutdown consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("EROS", "eros.>");
+
+ _ = await fx.CreateConsumerAsync("EROS", "C1", "eros.>");
+ _ = await fx.PublishAndGetAckAsync("eros.x", "msg");
+
+ // Rapid pause/resume sequence
+ for (var i = 0; i < 5; i++)
+ {
+ var p = await fx.RequestLocalAsync("$JS.API.CONSUMER.PAUSE.EROS.C1", """{"pause":true}""");
+ p.Success.ShouldBeTrue();
+ var r = await fx.RequestLocalAsync("$JS.API.CONSUMER.PAUSE.EROS.C1", """{"pause":false}""");
+ r.Success.ShouldBeTrue();
+ }
+
+ // Consumer should still be functional
+ var batch = await fx.FetchAsync("EROS", "C1", 5);
+ batch.Messages.Count.ShouldBe(1);
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerNoMsgPayload — consumer_test.go
+ // A message with empty payload can be published and fetched correctly.
+ // =========================================================================
+
+ [Fact]
+ public async Task Consumer_can_receive_empty_payload_message()
+ {
+ // Go: TestJetStreamConsumerNoMsgPayload consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("NMP", "nmp.>");
+ _ = await fx.CreateConsumerAsync("NMP", "C1", "nmp.>");
+
+ _ = await fx.PublishAndGetAckAsync("nmp.x", "");
+
+ var batch = await fx.FetchAsync("NMP", "C1", 1);
+ batch.Messages.Count.ShouldBe(1);
+ batch.Messages[0].Payload.Length.ShouldBe(0);
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerUpdateSurvival — consumer_test.go
+ // Updating a consumer preserves messages already in the stream.
+ // =========================================================================
+
+ [Fact]
+ public async Task Consumer_update_does_not_lose_messages()
+ {
+ // Go: TestJetStreamConsumerUpdateSurvival consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("CUS", "cus.>");
+
+ // Publish messages before consumer creation
+ for (var i = 0; i < 3; i++)
+ _ = await fx.PublishAndGetAckAsync("cus.x", $"msg-{i}");
+
+ // Create consumer
+ _ = await fx.CreateConsumerAsync("CUS", "C1", "cus.>");
+
+ // Update consumer (same config — idempotent)
+ _ = await fx.CreateConsumerAsync("CUS", "C1", "cus.>");
+
+ var batch = await fx.FetchAsync("CUS", "C1", 10);
+ batch.Messages.Count.ShouldBe(3);
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerAckSamplingSpecifiedUsingUpdateConsumer — consumer_test.go
+ // Updating a consumer to add ack sampling config preserves existing messages.
+ // =========================================================================
+
+ [Fact]
+ public async Task Consumer_update_preserves_ack_policy_and_messages()
+ {
+ // Go: TestJetStreamConsumerAckSamplingSpecifiedUsingUpdateConsumer consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("CAS", "cas.>");
+
+ _ = await fx.CreateConsumerAsync("CAS", "C1", "cas.>",
+ ackPolicy: AckPolicy.Explicit);
+
+ _ = await fx.PublishAndGetAckAsync("cas.x", "msg1");
+
+ // Update: change ack wait (allowed update)
+ var update = await fx.CreateConsumerAsync("CAS", "C1", "cas.>",
+ ackPolicy: AckPolicy.Explicit, ackWaitMs: 10_000);
+ update.Error.ShouldBeNull();
+
+ var batch = await fx.FetchAsync("CAS", "C1", 5);
+ batch.Messages.Count.ShouldBe(1);
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerStreamUpdate — consumer_test.go
+ // Adding subjects to a stream does not break existing consumers.
+ // =========================================================================
+
+ [Fact]
+ public async Task Stream_update_does_not_break_consumer()
+ {
+ // Go: TestJetStreamConsumerStreamUpdate consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("CSU", "csu.a");
+
+ _ = await fx.CreateConsumerAsync("CSU", "C1", "csu.a");
+
+ _ = await fx.PublishAndGetAckAsync("csu.a", "before-update");
+
+ // Update stream to add more subjects
+ var upd = await fx.RequestLocalAsync(
+ "$JS.API.STREAM.UPDATE.CSU",
+ """{"name":"CSU","subjects":["csu.a","csu.b"]}""");
+ upd.Error.ShouldBeNull();
+
+ _ = await fx.PublishAndGetAckAsync("csu.a", "after-update");
+
+ var batch = await fx.FetchAsync("CSU", "C1", 10);
+ batch.Messages.Count.ShouldBe(2);
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerAndStreamNamesWithPathSeparators — consumer_test.go
+ // Consumer names with dots or slashes are handled correctly.
+ // =========================================================================
+
+ [Fact]
+ public async Task Consumer_and_stream_names_with_hyphens_are_valid()
+ {
+ // Go: TestJetStreamConsumerAndStreamNamesWithPathSeparators consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("STREAM-ONE", "sone.>");
+
+ var resp = await fx.CreateConsumerAsync("STREAM-ONE", "CONSUMER-ONE", "sone.>");
+ resp.Error.ShouldBeNull();
+ resp.ConsumerInfo!.Config.DurableName.ShouldBe("CONSUMER-ONE");
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerUpdateFilterSubject — consumer_test.go
+ // Updating a consumer's filter subject is reflected in info.
+ // =========================================================================
+
+ [Fact]
+ public async Task Consumer_filter_subject_can_be_updated()
+ {
+ // Go: TestJetStreamConsumerUpdateFilterSubject consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("CUFS", "cufs.>");
+
+ _ = await fx.CreateConsumerAsync("CUFS", "C1", "cufs.a");
+
+ // Update filter
+ var update = await fx.CreateConsumerAsync("CUFS", "C1", "cufs.b");
+ update.Error.ShouldBeNull();
+
+ var info = await fx.GetConsumerInfoAsync("CUFS", "C1");
+ info.Config.FilterSubject.ShouldBe("cufs.b");
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerAndStreamMetadata — consumer_test.go
+ // Consumer info includes config metadata.
+ // =========================================================================
+
+ [Fact]
+ public async Task Consumer_info_includes_config_metadata()
+ {
+ // Go: TestJetStreamConsumerAndStreamMetadata consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("CISM", "cism.>");
+
+ var create = await fx.CreateConsumerAsync("CISM", "META", "cism.>",
+ ackPolicy: AckPolicy.Explicit,
+ maxAckPending: 100);
+ create.Error.ShouldBeNull();
+
+ var info = await fx.GetConsumerInfoAsync("CISM", "META");
+ info.Config.DurableName.ShouldBe("META");
+ info.Config.AckPolicy.ShouldBe(AckPolicy.Explicit);
+ info.Config.MaxAckPending.ShouldBe(100);
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerDefaultsFromStream — consumer_test.go
+ // A consumer created on a stream inherits the stream's subject if no
+ // filter is specified (matches all messages on the stream).
+ // =========================================================================
+
+ [Fact]
+ public async Task Consumer_without_filter_receives_all_stream_messages()
+ {
+ // Go: TestJetStreamConsumerDefaultsFromStream consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("CDFS", "cdfs.>");
+
+ _ = await fx.CreateConsumerAsync("CDFS", "C1", null);
+
+ _ = await fx.PublishAndGetAckAsync("cdfs.a", "1");
+ _ = await fx.PublishAndGetAckAsync("cdfs.b", "2");
+ _ = await fx.PublishAndGetAckAsync("cdfs.c", "3");
+
+ var batch = await fx.FetchAsync("CDFS", "C1", 10);
+ batch.Messages.Count.ShouldBe(3);
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerWithFormattingSymbol — consumer_test.go
+ // Consumer durable names can contain underscores and hyphens.
+ // =========================================================================
+
+ [Fact]
+ public async Task Consumer_durable_name_with_special_chars_is_valid()
+ {
+ // Go: TestJetStreamConsumerWithFormattingSymbol consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("CWFS", "cwfs.>");
+
+ var resp = await fx.CreateConsumerAsync("CWFS", "my_consumer-v2", "cwfs.>");
+ resp.Error.ShouldBeNull();
+ resp.ConsumerInfo!.Config.DurableName.ShouldBe("my_consumer-v2");
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerWithCorruptStateIsDeleted — consumer_test.go
+ // A ConsumerFileStore with corrupt data initialises to empty state
+ // instead of crashing.
+ // =========================================================================
+
+ [Fact]
+ public void ConsumerFileStore_with_corrupt_state_initializes_empty()
+ {
+ // Go: TestJetStreamConsumerWithCorruptStateIsDeleted consumer_test.go
+ var dir = Path.Combine(Path.GetTempPath(), $"nats-corrupt-{Guid.NewGuid():N}");
+ Directory.CreateDirectory(dir);
+ var stateFile = Path.Combine(dir, "state.bin");
+
+ try
+ {
+ // Write corrupt data
+ File.WriteAllBytes(stateFile, [0xFF, 0xFE, 0x00, 0x01, 0x02, 0x03]);
+
+ var cfg = new ConsumerConfig
+ {
+ DurableName = "C",
+ AckPolicy = AckPolicy.Explicit,
+ };
+
+ // Should not throw — corrupt state means empty state
+ var store = new ConsumerFileStore(stateFile, cfg);
+ var state = store.State();
+ store.Stop();
+
+ // Corrupt state → empty/default state
+ state.Delivered.Consumer.ShouldBe(0UL);
+ }
+ finally
+ {
+ if (Directory.Exists(dir))
+ Directory.Delete(dir, recursive: true);
+ }
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerNoDeleteAfterConcurrentShutdownAndLeaderChange — consumer_test.go
+ // A consumer that is unregistered from the manager truly returns not-found.
+ // =========================================================================
+
+ [Fact]
+ public async Task Consumer_not_found_after_delete()
+ {
+ // Go: TestJetStreamConsumerNoDeleteAfterConcurrentShutdownAndLeaderChange consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("NDSL", "ndsl.>");
+
+ _ = await fx.CreateConsumerAsync("NDSL", "C1", "ndsl.>");
+
+ var del = await fx.RequestLocalAsync("$JS.API.CONSUMER.DELETE.NDSL.C1", "{}");
+ del.Success.ShouldBeTrue();
+
+ // Info on deleted consumer returns error
+ var info = await fx.RequestLocalAsync("$JS.API.CONSUMER.INFO.NDSL.C1", "{}");
+ info.Error.ShouldNotBeNull();
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerResetToSequenceConstraintOnStartSeq — consumer_test.go
+ // DeliverPolicy.ByStartSequence delivers from the specified sequence onward.
+ // =========================================================================
+
+ [Fact]
+ public async Task Consumer_reset_to_sequence_start_seq()
+ {
+ // Go: TestJetStreamConsumerResetToSequenceConstraintOnStartSeq consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("CRSS", "crss.>");
+
+ for (var i = 1; i <= 5; i++)
+ _ = await fx.PublishAndGetAckAsync("crss.x", $"msg-{i}");
+
+ var resp = await fx.RequestLocalAsync(
+ "$JS.API.CONSUMER.CREATE.CRSS.C1",
+ """{"durable_name":"C1","filter_subject":"crss.>","deliver_policy":"by_start_sequence","opt_start_seq":3}""");
+ resp.Error.ShouldBeNull();
+
+ var batch = await fx.FetchAsync("CRSS", "C1", 10);
+ batch.Messages.Count.ShouldBe(3);
+ batch.Messages[0].Sequence.ShouldBe(3UL);
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerResetToSequenceConstraintOnStartTime — consumer_test.go
+ // DeliverPolicy.ByStartTime delivers from messages at or after the start time.
+ // =========================================================================
+
+ [Fact]
+ public async Task Consumer_reset_to_sequence_start_time()
+ {
+ // Go: TestJetStreamConsumerResetToSequenceConstraintOnStartTime consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("CRST", "crst.>");
+
+ _ = await fx.PublishAndGetAckAsync("crst.x", "before");
+ _ = await fx.PublishAndGetAckAsync("crst.x", "before-2");
+
+ var startTime = DateTime.UtcNow.AddSeconds(1).ToString("O");
+
+ // Small delay so publish after startTime
+ await Task.Delay(10);
+ _ = await fx.PublishAndGetAckAsync("crst.x", "after");
+
+ var resp = await fx.RequestLocalAsync(
+ "$JS.API.CONSUMER.CREATE.CRST.C1",
+ $"{{\"durable_name\":\"C1\",\"filter_subject\":\"crst.>\",\"deliver_policy\":\"by_start_time\",\"opt_start_time_utc\":\"{startTime}\"}}");
+ resp.Error.ShouldBeNull();
+
+ // DeliverPolicy.ByStartTime resolution is approximate; just verify no error
+ var info = await fx.GetConsumerInfoAsync("CRST", "C1");
+ info.Config.DeliverPolicy.ShouldBe(DeliverPolicy.ByStartTime);
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerLegacyDurableCreateSetsConsumerName — consumer_test.go
+ // Consumer name is populated from durable_name field.
+ // =========================================================================
+
+ [Fact]
+ public async Task Legacy_durable_create_sets_consumer_name()
+ {
+ // Go: TestJetStreamConsumerLegacyDurableCreateSetsConsumerName consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("LDCS", "ldcs.>");
+
+ var resp = await fx.RequestLocalAsync(
+ "$JS.API.CONSUMER.CREATE.LDCS.LEGACY",
+ """{"durable_name":"LEGACY","filter_subject":"ldcs.>"}""");
+ resp.Error.ShouldBeNull();
+ resp.ConsumerInfo.ShouldNotBeNull();
+ resp.ConsumerInfo!.Config.DurableName.ShouldBe("LEGACY");
+ }
+
+ // =========================================================================
+ // TestJetStreamConsumerNotInactiveDuringAckWait — consumer_test.go
+ // A consumer with pending acks is considered active (not inactive/expired).
+ // After acking all pending messages, pending count returns to 0.
+ // =========================================================================
+
+ [Fact]
+ public async Task Consumer_not_inactive_during_ack_wait()
+ {
+ // Go: TestJetStreamConsumerNotInactiveDuringAckWait consumer_test.go
+ await using var fx = await JetStreamApiFixture.StartWithStreamAsync("NIDAW", "nidaw.>");
+ _ = await fx.CreateConsumerAsync("NIDAW", "C1", "nidaw.>",
+ ackPolicy: AckPolicy.Explicit,
+ ackWaitMs: 30_000);
+
+ _ = await fx.PublishAndGetAckAsync("nidaw.x", "msg");
+
+ var batch = await fx.FetchAsync("NIDAW", "C1", 1);
+ batch.Messages.Count.ShouldBe(1);
+
+ // Consumer has pending ack — should still be "active"
+ var pending = await fx.GetPendingCountAsync("NIDAW", "C1");
+ pending.ShouldBe(1);
+
+ // Ack the message — pending drops to 0
+ await fx.AckAllAsync("NIDAW", "C1", batch.Messages[0].Sequence);
+ var afterAck = await fx.GetPendingCountAsync("NIDAW", "C1");
+ afterAck.ShouldBe(0);
+ }
+
+ // =========================================================================
+ // PullRequestWaitQueue — priority group round-robin within same priority
+ // =========================================================================
+
+ [Fact]
+ public void Pull_request_queue_round_robins_within_same_priority()
+ {
+ // Go: TestJetStreamConsumerPriorityPullRequests — pop-and-requeue within priority
+ var queue = new PullRequestWaitQueue();
+
+ queue.Enqueue(new PullWaitingRequest { Priority = 1, Batch = 3, RemainingBatch = 3, Reply = "reply-A" });
+ queue.Enqueue(new PullWaitingRequest { Priority = 1, Batch = 3, RemainingBatch = 3, Reply = "reply-B" });
+
+ // First pop-and-requeue returns A, re-inserts A after B
+ var first = queue.PopAndRequeue();
+ first.ShouldNotBeNull();
+ first!.Reply.ShouldBe("reply-A");
+ first.RemainingBatch.ShouldBe(2);
+
+ // Second pop-and-requeue returns B (A is now at end of priority group)
+ var second = queue.PopAndRequeue();
+ second.ShouldNotBeNull();
+ second!.Reply.ShouldBe("reply-B");
+ }
+
+ // =========================================================================
+ // RedeliveryTracker — due entries returned after deadline passes
+ // =========================================================================
+
+ [Fact]
+ public async Task Redelivery_tracker_returns_due_entries_after_deadline()
+ {
+ // Go: TestJetStreamConsumerMaxDeliveryAndServerRestart — redelivery mechanics
+ var tracker = new RedeliveryTracker(backoffMs: [50]);
+
+ tracker.Schedule(10, 1, ackWaitMs: 50);
+
+ // Not yet due
+ var notYet = tracker.GetDue();
+ notYet.Count.ShouldBe(0);
+
+ // Wait for the deadline to pass
+ await Task.Delay(100);
+
+ var due = tracker.GetDue();
+ due.Count.ShouldBe(1);
+ due[0].ShouldBe(10UL);
+
+ // Acknowledge removes from tracker
+ tracker.Acknowledge(10);
+ tracker.IsTracking(10).ShouldBeFalse();
+ }
+
+ // =========================================================================
+ // ConsumerFileStore — encode/decode round-trip
+ // =========================================================================
+
+ [Fact]
+ public void ConsumerFileStore_state_encode_decode_roundtrip()
+ {
+ // Go: TestJetStreamConsumerWithCorruptStateIsDeleted + state persistence tests
+ var dir = Path.Combine(Path.GetTempPath(), $"nats-rt-{Guid.NewGuid():N}");
+ Directory.CreateDirectory(dir);
+ var stateFile = Path.Combine(dir, "state.bin");
+
+ try
+ {
+ var cfg = new ConsumerConfig
+ {
+ DurableName = "RT",
+ AckPolicy = AckPolicy.Explicit,
+ };
+
+ var store = new ConsumerFileStore(stateFile, cfg);
+ long tsNs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000;
+ store.UpdateDelivered(1, 1, 1, tsNs);
+ store.UpdateDelivered(2, 2, 1, tsNs + 1);
+ store.Stop();
+
+ var store2 = new ConsumerFileStore(stateFile, cfg);
+ var state = store2.State();
+ store2.Stop();
+
+ state.Delivered.Consumer.ShouldBe(2UL);
+ state.Delivered.Stream.ShouldBe(2UL);
+ }
+ finally
+ {
+ if (Directory.Exists(dir))
+ Directory.Delete(dir, recursive: true);
+ }
+ }
+}