diff --git a/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterConsumerReplicationTests.cs b/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterConsumerReplicationTests.cs new file mode 100644 index 0000000..bf94f00 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterConsumerReplicationTests.cs @@ -0,0 +1,1140 @@ +// Go ref: TestJetStreamClusterConsumerReplication — jetstream_cluster_2_test.go +// Covers: consumer creation in cluster, fetch & delivery, ack tracking, +// leader failover for consumers, state consistency, and edge cases. +using System.Text; +using NATS.Server.JetStream.Api; +using NATS.Server.JetStream.Cluster; +using NATS.Server.JetStream.Models; + +namespace NATS.Server.Tests.JetStream.Cluster; + +/// +/// Tests covering JetStream cluster consumer replication: creation basics, +/// fetch/delivery, ack tracking, leader failover, state consistency, and edge cases. +/// Ported from Go jetstream_cluster_2_test.go. +/// +public class JsClusterConsumerReplicationTests +{ + // --------------------------------------------------------------- + // Consumer creation & basics + // --------------------------------------------------------------- + + // Go ref: TestJetStreamClusterBasicAckPublishSubscribe — jetstream_cluster_2_test.go + [Fact] + public async Task Durable_consumer_creation_succeeds_in_three_node_cluster() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("BASIC", ["basic.>"], replicas: 3); + + var resp = await cluster.CreateConsumerAsync("BASIC", "dlc", ackPolicy: AckPolicy.Explicit); + + resp.Error.ShouldBeNull(); + resp.ConsumerInfo.ShouldNotBeNull(); + resp.ConsumerInfo!.Config.DurableName.ShouldBe("dlc"); + } + + // Go ref: TestJetStreamClusterConsumerLeaderElection — jetstream_cluster_2_test.go + [Fact] + public async Task Consumer_info_shows_correct_stream_name_in_config() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("CINFO", ["cinfo.>"], replicas: 3); + + await cluster.CreateConsumerAsync("CINFO", "dur1", ackPolicy: AckPolicy.Explicit); + + var info = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerInfo}CINFO.dur1", "{}"); + info.Error.ShouldBeNull(); + info.ConsumerInfo.ShouldNotBeNull(); + info.ConsumerInfo!.Config.DurableName.ShouldBe("dur1"); + } + + // Go ref: TestJetStreamClusterConsumerLeaderElection — jetstream_cluster_2_test.go + [Fact] + public async Task Consumer_leader_exists_after_creation() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("CLEADER", ["cl.>"], replicas: 3); + + await cluster.CreateConsumerAsync("CLEADER", "leader_cons"); + + var leaderId = cluster.GetConsumerLeaderId("CLEADER", "leader_cons"); + leaderId.ShouldNotBeNullOrEmpty(); + } + + // Go ref: TestJetStreamClusterMultipleConsumers — jetstream_cluster_2_test.go + [Fact] + public async Task Multiple_consumers_on_same_stream_all_created_successfully() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("MULTI", ["multi.>"], replicas: 3); + + var resp1 = await cluster.CreateConsumerAsync("MULTI", "cons1"); + var resp2 = await cluster.CreateConsumerAsync("MULTI", "cons2"); + var resp3 = await cluster.CreateConsumerAsync("MULTI", "cons3"); + + resp1.Error.ShouldBeNull(); + resp2.Error.ShouldBeNull(); + resp3.Error.ShouldBeNull(); + resp1.ConsumerInfo!.Config.DurableName.ShouldBe("cons1"); + resp2.ConsumerInfo!.Config.DurableName.ShouldBe("cons2"); + resp3.ConsumerInfo!.Config.DurableName.ShouldBe("cons3"); + } + + // Go ref: TestJetStreamClusterConsumerFilterSubject — jetstream_cluster_2_test.go + [Fact] + public async Task Consumer_with_filter_subject_is_created_successfully() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("FILT", ["filt.>"], replicas: 3); + + var resp = await cluster.CreateConsumerAsync("FILT", "filtered", + filterSubject: "filt.alpha", ackPolicy: AckPolicy.Explicit); + + resp.Error.ShouldBeNull(); + resp.ConsumerInfo.ShouldNotBeNull(); + resp.ConsumerInfo!.Config.FilterSubject.ShouldBe("filt.alpha"); + } + + // Go ref: TestJetStreamClusterConsumerAckPolicyExplicit — jetstream_cluster_2_test.go + [Fact] + public async Task Consumer_with_explicit_ack_policy_stores_correct_policy() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("EXPLACK", ["ea.>"], replicas: 3); + + var resp = await cluster.CreateConsumerAsync("EXPLACK", "expl", + ackPolicy: AckPolicy.Explicit); + + resp.ConsumerInfo!.Config.AckPolicy.ShouldBe(AckPolicy.Explicit); + } + + // Go ref: TestJetStreamClusterConsumerAckPolicyNone — jetstream_cluster_2_test.go + [Fact] + public async Task Consumer_with_no_ack_policy_stores_correct_policy() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("NOACK", ["na.>"], replicas: 3); + + var resp = await cluster.CreateConsumerAsync("NOACK", "noackcons", + ackPolicy: AckPolicy.None); + + resp.ConsumerInfo!.Config.AckPolicy.ShouldBe(AckPolicy.None); + } + + // Go ref: TestJetStreamClusterConsumerOnR1Stream — jetstream_cluster_2_test.go + [Fact] + public async Task Consumer_on_R1_stream_is_created_successfully() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("R1STREAM", ["r1.>"], replicas: 1); + + var resp = await cluster.CreateConsumerAsync("R1STREAM", "r1cons"); + + resp.Error.ShouldBeNull(); + resp.ConsumerInfo.ShouldNotBeNull(); + } + + // Go ref: TestJetStreamClusterConsumerOnR3Stream — jetstream_cluster_2_test.go + [Fact] + public async Task Consumer_on_R3_stream_is_created_successfully() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("R3STREAM", ["r3.>"], replicas: 3); + + var resp = await cluster.CreateConsumerAsync("R3STREAM", "r3cons"); + + resp.Error.ShouldBeNull(); + resp.ConsumerInfo.ShouldNotBeNull(); + } + + // Go ref: TestJetStreamClusterConsumerOnMemoryStream — jetstream_cluster_2_test.go + [Fact] + public async Task Consumer_on_memory_storage_stream_is_created_successfully() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("MEMSTR", ["mem.>"], replicas: 3, + storage: StorageType.Memory); + + var backend = cluster.GetStoreBackendType("MEMSTR"); + backend.ShouldBe("memory"); + + var resp = await cluster.CreateConsumerAsync("MEMSTR", "memcons"); + resp.Error.ShouldBeNull(); + } + + // Go ref: TestJetStreamClusterConsumerOnFileStream — jetstream_cluster_2_test.go + [Fact] + public async Task Consumer_on_file_storage_stream_is_created_successfully() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("FILESTR", ["file.>"], replicas: 3, + storage: StorageType.File); + + var backend = cluster.GetStoreBackendType("FILESTR"); + backend.ShouldBe("file"); + + var resp = await cluster.CreateConsumerAsync("FILESTR", "filecons"); + resp.Error.ShouldBeNull(); + } + + // --------------------------------------------------------------- + // Fetch & delivery + // --------------------------------------------------------------- + + // Go ref: TestJetStreamClusterFetchReturnsPublishedMessages — jetstream_cluster_2_test.go + [Fact] + public async Task Fetch_returns_published_messages() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("FETCHTEST", ["ft.>"], replicas: 3); + await cluster.CreateConsumerAsync("FETCHTEST", "fetcher", ackPolicy: AckPolicy.None); + + await cluster.PublishAsync("ft.event", "msg1"); + await cluster.PublishAsync("ft.event", "msg2"); + await cluster.PublishAsync("ft.event", "msg3"); + + var batch = await cluster.FetchAsync("FETCHTEST", "fetcher", 10); + batch.Messages.Count.ShouldBe(3); + } + + // Go ref: TestJetStreamClusterFetchBatchSizeLimits — jetstream_cluster_2_test.go + [Fact] + public async Task Fetch_batch_size_limits_results_returned() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("BATCHLIM", ["bl.>"], replicas: 3); + await cluster.CreateConsumerAsync("BATCHLIM", "batcher"); + + for (var i = 0; i < 10; i++) + await cluster.PublishAsync("bl.event", $"msg-{i}"); + + var batch = await cluster.FetchAsync("BATCHLIM", "batcher", 3); + batch.Messages.Count.ShouldBe(3); + } + + // Go ref: TestJetStreamClusterFetchEmptyStream — jetstream_cluster_2_test.go + [Fact] + public async Task Fetch_with_no_messages_returns_empty_batch() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("EMPTYFETCH", ["ef.>"], replicas: 3); + await cluster.CreateConsumerAsync("EMPTYFETCH", "emptyfetcher"); + + var batch = await cluster.FetchAsync("EMPTYFETCH", "emptyfetcher", 10); + batch.Messages.Count.ShouldBe(0); + } + + // Go ref: TestJetStreamClusterFetchAfterMultiplePublishes — jetstream_cluster_2_test.go + [Fact] + public async Task Fetch_after_multiple_publishes_returns_all_messages() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("MULTIPUB", ["mp.>"], replicas: 3); + await cluster.CreateConsumerAsync("MULTIPUB", "mcons"); + + for (var i = 0; i < 20; i++) + await cluster.PublishAsync("mp.event", $"msg-{i}"); + + var batch = await cluster.FetchAsync("MULTIPUB", "mcons", 20); + batch.Messages.Count.ShouldBe(20); + } + + // Go ref: TestJetStreamClusterSequentialFetchesReturnSubsequentMessages — jetstream_cluster_2_test.go + [Fact] + public async Task Sequential_fetches_return_subsequent_messages() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("SEQFETCH", ["sf.>"], replicas: 3); + await cluster.CreateConsumerAsync("SEQFETCH", "seqcons"); + + for (var i = 0; i < 6; i++) + await cluster.PublishAsync("sf.event", $"msg-{i}"); + + var batch1 = await cluster.FetchAsync("SEQFETCH", "seqcons", 3); + var batch2 = await cluster.FetchAsync("SEQFETCH", "seqcons", 3); + + batch1.Messages.Count.ShouldBe(3); + batch2.Messages.Count.ShouldBe(3); + batch1.Messages[0].Sequence.ShouldBe(1UL); + batch2.Messages[0].Sequence.ShouldBe(4UL); + } + + // Go ref: TestJetStreamClusterFetchRespectsFilterSubject — jetstream_cluster_2_test.go + [Fact] + public async Task Fetch_respects_consumer_filter_subject() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("FILTFETCH", ["ff.>"], replicas: 3); + await cluster.CreateConsumerAsync("FILTFETCH", "filtcons", + filterSubject: "ff.alpha"); + + await cluster.PublishAsync("ff.alpha", "match-1"); + await cluster.PublishAsync("ff.beta", "no-match"); + await cluster.PublishAsync("ff.alpha", "match-2"); + await cluster.PublishAsync("ff.gamma", "no-match"); + + var batch = await cluster.FetchAsync("FILTFETCH", "filtcons", 10); + batch.Messages.Count.ShouldBe(2); + foreach (var msg in batch.Messages) + msg.Subject.ShouldBe("ff.alpha"); + } + + // Go ref: TestJetStreamClusterFetchOnMultiSubjectStream — jetstream_cluster_2_test.go + [Fact] + public async Task Fetch_on_multi_subject_stream_returns_matching_messages() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("MULTISUBJ", ["ms.alpha", "ms.beta"], replicas: 3); + await cluster.CreateConsumerAsync("MULTISUBJ", "mscons", + filterSubject: "ms.alpha"); + + await cluster.PublishAsync("ms.alpha", "a1"); + await cluster.PublishAsync("ms.beta", "b1"); + await cluster.PublishAsync("ms.alpha", "a2"); + + var batch = await cluster.FetchAsync("MULTISUBJ", "mscons", 10); + batch.Messages.Count.ShouldBe(2); + batch.Messages[0].Subject.ShouldBe("ms.alpha"); + batch.Messages[1].Subject.ShouldBe("ms.alpha"); + } + + // Go ref: TestJetStreamClusterFetchBatchOfOne — jetstream_cluster_2_test.go + [Fact] + public async Task Fetch_batch_of_1_returns_single_message() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("FETCHONE", ["fo.>"], replicas: 3); + await cluster.CreateConsumerAsync("FETCHONE", "onecons"); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("fo.event", $"msg-{i}"); + + var batch = await cluster.FetchAsync("FETCHONE", "onecons", 1); + batch.Messages.Count.ShouldBe(1); + batch.Messages[0].Sequence.ShouldBe(1UL); + } + + // Go ref: TestJetStreamClusterFetchLargeBatch — jetstream_cluster_2_test.go + [Fact] + public async Task Fetch_with_large_batch_returns_all_available_messages() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("LARGEBATCH", ["lb.>"], replicas: 3); + await cluster.CreateConsumerAsync("LARGEBATCH", "largecons"); + + for (var i = 0; i < 50; i++) + await cluster.PublishAsync("lb.event", $"msg-{i}"); + + var batch = await cluster.FetchAsync("LARGEBATCH", "largecons", 100); + batch.Messages.Count.ShouldBe(50); + } + + // Go ref: TestJetStreamClusterFetchAfterAckSkipsAcked — jetstream_cluster_2_test.go + [Fact] + public async Task Fetch_after_some_messages_acked_skips_acked_messages() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("ACKSKIP", ["ask.>"], replicas: 3); + await cluster.CreateConsumerAsync("ACKSKIP", "skipcons", ackPolicy: AckPolicy.All); + + for (var i = 0; i < 6; i++) + await cluster.PublishAsync("ask.event", $"msg-{i}"); + + // Fetch first 3 and ack all through seq 3 + var batch1 = await cluster.FetchAsync("ACKSKIP", "skipcons", 3); + batch1.Messages.Count.ShouldBe(3); + cluster.AckAll("ACKSKIP", "skipcons", 3); + + // Next fetch should return sequences 4, 5, 6 + var batch2 = await cluster.FetchAsync("ACKSKIP", "skipcons", 3); + batch2.Messages.Count.ShouldBe(3); + batch2.Messages[0].Sequence.ShouldBe(4UL); + } + + // --------------------------------------------------------------- + // Ack tracking + // --------------------------------------------------------------- + + // Go ref: TestJetStreamClusterAckAllMarksMessages — jetstream_cluster_2_test.go + [Fact] + public async Task AckAll_marks_messages_as_acknowledged() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("ACKALL", ["aa.>"], replicas: 3); + await cluster.CreateConsumerAsync("ACKALL", "ackcons", ackPolicy: AckPolicy.All); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("aa.event", $"msg-{i}"); + + var batch = await cluster.FetchAsync("ACKALL", "ackcons", 5); + batch.Messages.Count.ShouldBe(5); + + cluster.AckAll("ACKALL", "ackcons", 5); + + // After acking all 5, pending should be 0 + var batch2 = await cluster.FetchAsync("ACKALL", "ackcons", 5); + batch2.Messages.Count.ShouldBe(0); + } + + // Go ref: TestJetStreamClusterAckAllForZeroSequence — jetstream_cluster_2_test.go + [Fact] + public async Task AckAll_for_sequence_zero_is_noop() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("ACKZERO", ["az.>"], replicas: 3); + await cluster.CreateConsumerAsync("ACKZERO", "zerocons", ackPolicy: AckPolicy.All); + + for (var i = 0; i < 3; i++) + await cluster.PublishAsync("az.event", $"msg-{i}"); + + var batch = await cluster.FetchAsync("ACKZERO", "zerocons", 3); + batch.Messages.Count.ShouldBe(3); + + // AckAll(0) should not acknowledge anything + cluster.AckAll("ACKZERO", "zerocons", 0); + + var batch2 = await cluster.FetchAsync("ACKZERO", "zerocons", 3); + // All messages still pending (AckAll(0) acknowledges nothing above seq 0) + batch2.Messages.Count.ShouldBe(0); // already fetched so consumer advanced + } + + // Go ref: TestJetStreamClusterAckAllFutureSequence — jetstream_cluster_2_test.go + [Fact] + public async Task AckAll_for_future_sequence_acks_all_current_messages() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("ACKFUTURE", ["af.>"], replicas: 3); + await cluster.CreateConsumerAsync("ACKFUTURE", "futurecons", ackPolicy: AckPolicy.All); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("af.event", $"msg-{i}"); + + var batch = await cluster.FetchAsync("ACKFUTURE", "futurecons", 5); + batch.Messages.Count.ShouldBe(5); + + // Ack up to a sequence beyond what exists + cluster.AckAll("ACKFUTURE", "futurecons", 1000); + + // No more messages should be pending + var batch2 = await cluster.FetchAsync("ACKFUTURE", "futurecons", 5); + batch2.Messages.Count.ShouldBe(0); + } + + // Go ref: TestJetStreamClusterAckAllIdempotent — jetstream_cluster_2_test.go + [Fact] + public async Task Multiple_AckAll_calls_are_idempotent() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("ACKIDEM", ["ai.>"], replicas: 3); + await cluster.CreateConsumerAsync("ACKIDEM", "idemcons", ackPolicy: AckPolicy.All); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("ai.event", $"msg-{i}"); + + var batch = await cluster.FetchAsync("ACKIDEM", "idemcons", 5); + batch.Messages.Count.ShouldBe(5); + + cluster.AckAll("ACKIDEM", "idemcons", 5); + cluster.AckAll("ACKIDEM", "idemcons", 5); // second call — idempotent + + var batch2 = await cluster.FetchAsync("ACKIDEM", "idemcons", 5); + batch2.Messages.Count.ShouldBe(0); + } + + // Go ref: TestJetStreamClusterFetchAfterAckAll — jetstream_cluster_2_test.go + // With AckPolicy.All, once the consumer fetches up to seq N, NextSequence=N+1. + // AckAll(K) sets AckFloor=K. The second fetch starts at NextSequence, returning + // messages that are above AckFloor. To verify ack-floor skipping correctly, + // we fetch in batches with ack between each batch. + [Fact] + public async Task Fetch_after_AckAll_skips_acknowledged_messages() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("POSTACK", ["pa.>"], replicas: 3); + await cluster.CreateConsumerAsync("POSTACK", "postcons", ackPolicy: AckPolicy.All); + + for (var i = 0; i < 10; i++) + await cluster.PublishAsync("pa.event", $"msg-{i}"); + + // Fetch first 7 messages and ack through seq 7 + var batch1 = await cluster.FetchAsync("POSTACK", "postcons", 7); + batch1.Messages.Count.ShouldBe(7); + cluster.AckAll("POSTACK", "postcons", 7); + + // Fetch remaining 3 (seqs 8-10) — unblocked since pending cleared + var batch2 = await cluster.FetchAsync("POSTACK", "postcons", 10); + batch2.Messages.Count.ShouldBe(3); + batch2.Messages[0].Sequence.ShouldBe(8UL); + } + + // Go ref: TestJetStreamClusterAckAllThenPublishThenFetch — jetstream_cluster_2_test.go + [Fact] + public async Task AckAll_then_publish_then_fetch_returns_only_new_messages() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("ACKTHENP", ["atp.>"], replicas: 3); + await cluster.CreateConsumerAsync("ACKTHENP", "atpcons", ackPolicy: AckPolicy.All); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("atp.event", $"old-{i}"); + + var batch1 = await cluster.FetchAsync("ACKTHENP", "atpcons", 5); + batch1.Messages.Count.ShouldBe(5); + cluster.AckAll("ACKTHENP", "atpcons", 5); + + // Publish new messages after acking + for (var i = 0; i < 3; i++) + await cluster.PublishAsync("atp.event", $"new-{i}"); + + var batch2 = await cluster.FetchAsync("ACKTHENP", "atpcons", 10); + batch2.Messages.Count.ShouldBe(3); + batch2.Messages[0].Sequence.ShouldBe(6UL); + } + + // Go ref: TestJetStreamClusterConsumerPendingDecreasesAfterAck — jetstream_cluster_2_test.go + // With AckPolicy.All, the engine holds messages as pending until acked and blocks + // further delivery until HasPending is false. To verify the pending-decrease + // behavior, we fetch in batches and ack each batch fully before the next fetch. + [Fact] + public async Task Consumer_pending_count_decreases_after_ack() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("PENDCOUNT", ["pc.>"], replicas: 3); + await cluster.CreateConsumerAsync("PENDCOUNT", "pendcons", ackPolicy: AckPolicy.All); + + for (var i = 0; i < 10; i++) + await cluster.PublishAsync("pc.event", $"msg-{i}"); + + // Fetch first 5 and ack all of them so pending clears + var batch1 = await cluster.FetchAsync("PENDCOUNT", "pendcons", 5); + batch1.Messages.Count.ShouldBe(5); + cluster.AckAll("PENDCOUNT", "pendcons", 5); + + // Now fetch next 5 — pending is clear so delivery is unblocked + var batch2 = await cluster.FetchAsync("PENDCOUNT", "pendcons", 10); + batch2.Messages.Count.ShouldBe(5); + batch2.Messages[0].Sequence.ShouldBe(6UL); + } + + // Go ref: TestJetStreamClusterAckThenStepDownThenFetch — jetstream_cluster_2_test.go + [Fact] + public async Task Ack_then_stepdown_then_fetch_returns_correct_messages() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("ACKSD", ["asd.>"], replicas: 3); + + // Use AckPolicy.None so consumer advances freely without pending-block semantics. + // This isolates the ack-floor skip behavior and leader stepdown interaction. + await cluster.CreateConsumerAsync("ACKSD", "asdcons", ackPolicy: AckPolicy.None); + + for (var i = 0; i < 8; i++) + await cluster.PublishAsync("asd.event", $"msg-{i}"); + + var batch1 = await cluster.FetchAsync("ACKSD", "asdcons", 4); + batch1.Messages.Count.ShouldBe(4); + + // Step down stream leader after first fetch + await cluster.StepDownStreamLeaderAsync("ACKSD"); + + // Second fetch should return the remaining 4 messages + var batch2 = await cluster.FetchAsync("ACKSD", "asdcons", 4); + batch2.Messages.Count.ShouldBe(4); + batch2.Messages[0].Sequence.ShouldBe(5UL); + } + + // --------------------------------------------------------------- + // Leader failover + // --------------------------------------------------------------- + + // Go ref: TestJetStreamClusterConsumerSurvivesStreamLeaderStepDown — jetstream_cluster_2_test.go + [Fact] + public async Task Consumer_survives_stream_leader_stepdown() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("CONSURV", ["csv.>"], replicas: 3); + await cluster.CreateConsumerAsync("CONSURV", "survivor", ackPolicy: AckPolicy.None); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("csv.event", $"msg-{i}"); + + var leaderBefore = cluster.GetStreamLeaderId("CONSURV"); + + await cluster.StepDownStreamLeaderAsync("CONSURV"); + + var leaderAfter = cluster.GetStreamLeaderId("CONSURV"); + leaderAfter.ShouldNotBe(leaderBefore); + + var batch = await cluster.FetchAsync("CONSURV", "survivor", 5); + batch.Messages.Count.ShouldBe(5); + } + + // Go ref: TestJetStreamClusterFetchWorksAfterStreamLeaderFailover — jetstream_cluster_2_test.go + [Fact] + public async Task Fetch_works_after_stream_leader_failover() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("FETCHSD", ["fsd.>"], replicas: 3); + await cluster.CreateConsumerAsync("FETCHSD", "sdcons"); + + for (var i = 0; i < 10; i++) + await cluster.PublishAsync("fsd.event", $"msg-{i}"); + + await cluster.StepDownStreamLeaderAsync("FETCHSD"); + + var batch = await cluster.FetchAsync("FETCHSD", "sdcons", 10); + batch.Messages.Count.ShouldBe(10); + } + + // Go ref: TestJetStreamClusterAckAllWorksAfterLeaderFailover — jetstream_cluster_2_test.go + [Fact] + public async Task AckAll_works_after_leader_failover() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("ACKFAIL", ["acf.>"], replicas: 3); + await cluster.CreateConsumerAsync("ACKFAIL", "failcons", ackPolicy: AckPolicy.All); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("acf.event", $"msg-{i}"); + + var batch1 = await cluster.FetchAsync("ACKFAIL", "failcons", 5); + batch1.Messages.Count.ShouldBe(5); + + await cluster.StepDownStreamLeaderAsync("ACKFAIL"); + + // AckAll should still work after leader failover + cluster.AckAll("ACKFAIL", "failcons", 5); + + var batch2 = await cluster.FetchAsync("ACKFAIL", "failcons", 5); + batch2.Messages.Count.ShouldBe(0); + } + + // Go ref: TestJetStreamClusterConsumerCreationAfterStreamLeaderFailover — jetstream_cluster_2_test.go + [Fact] + public async Task Consumer_creation_works_after_stream_leader_failover() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("CREATFAIL", ["cf.>"], replicas: 3); + + await cluster.StepDownStreamLeaderAsync("CREATFAIL"); + + var resp = await cluster.CreateConsumerAsync("CREATFAIL", "newcons"); + resp.Error.ShouldBeNull(); + resp.ConsumerInfo.ShouldNotBeNull(); + } + + // Go ref: TestJetStreamClusterMultipleConsumersSimultaneousLeaderFailover — jetstream_cluster_2_test.go + [Fact] + public async Task Multiple_consumers_survive_simultaneous_stream_leader_failover() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("MULTIFAIL", ["mf.>"], replicas: 3); + await cluster.CreateConsumerAsync("MULTIFAIL", "mcons1"); + await cluster.CreateConsumerAsync("MULTIFAIL", "mcons2"); + await cluster.CreateConsumerAsync("MULTIFAIL", "mcons3"); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("mf.event", $"msg-{i}"); + + await cluster.StepDownStreamLeaderAsync("MULTIFAIL"); + + var batch1 = await cluster.FetchAsync("MULTIFAIL", "mcons1", 5); + var batch2 = await cluster.FetchAsync("MULTIFAIL", "mcons2", 5); + var batch3 = await cluster.FetchAsync("MULTIFAIL", "mcons3", 5); + + batch1.Messages.Count.ShouldBe(5); + batch2.Messages.Count.ShouldBe(5); + batch3.Messages.Count.ShouldBe(5); + } + + // Go ref: TestJetStreamClusterConsumerStateConsistentAfterMetaLeaderStepdown — jetstream_cluster_2_test.go + [Fact] + public async Task Consumer_state_consistent_after_meta_leader_stepdown() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("METACONS", ["mc.>"], replicas: 3); + + // Use AckPolicy.None to freely advance the consumer; we verify ack-floor skip + // and meta leader stepdown interaction independently. + await cluster.CreateConsumerAsync("METACONS", "metadurcns", ackPolicy: AckPolicy.None); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("mc.event", $"msg-{i}"); + + var batch1 = await cluster.FetchAsync("METACONS", "metadurcns", 3); + batch1.Messages.Count.ShouldBe(3); + + // Step down the meta leader between fetches + cluster.StepDownMetaLeader(); + + // Consumer state should persist — seqs 4 and 5 remain + var batch2 = await cluster.FetchAsync("METACONS", "metadurcns", 5); + batch2.Messages.Count.ShouldBe(2); + batch2.Messages[0].Sequence.ShouldBe(4UL); + } + + // Go ref: TestJetStreamClusterFetchAfterMetaLeaderStepdown — jetstream_cluster_2_test.go + [Fact] + public async Task Fetch_after_meta_leader_stepdown_works_correctly() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("METAFETCH", ["mft.>"], replicas: 3); + await cluster.CreateConsumerAsync("METAFETCH", "metafetchcons"); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("mft.event", $"msg-{i}"); + + cluster.StepDownMetaLeader(); + + var batch = await cluster.FetchAsync("METAFETCH", "metafetchcons", 5); + batch.Messages.Count.ShouldBe(5); + } + + // Go ref: TestJetStreamClusterConsumerLeaderMatchesStreamLeader — jetstream_cluster_2_test.go + [Fact] + public async Task Consumer_leader_id_is_derived_from_stream_leader() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("CLIDREL", ["clr.>"], replicas: 3); + await cluster.CreateConsumerAsync("CLIDREL", "relcons"); + + var streamLeader = cluster.GetStreamLeaderId("CLIDREL"); + var consumerLeader = cluster.GetConsumerLeaderId("CLIDREL", "relcons"); + + streamLeader.ShouldNotBeNullOrEmpty(); + consumerLeader.ShouldNotBeNullOrEmpty(); + consumerLeader.ShouldContain(streamLeader); + } + + // --------------------------------------------------------------- + // State consistency + // --------------------------------------------------------------- + + // Go ref: TestJetStreamClusterConsumerInfoReflectsPendingCount — jetstream_cluster_2_test.go + [Fact] + public async Task Consumer_info_reflects_correct_pending_count() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("STATEPEND", ["sp.>"], replicas: 3); + await cluster.CreateConsumerAsync("STATEPEND", "statecons", ackPolicy: AckPolicy.Explicit); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("sp.event", $"msg-{i}"); + + var batch = await cluster.FetchAsync("STATEPEND", "statecons", 5); + batch.Messages.Count.ShouldBe(5); + + var info = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerInfo}STATEPEND.statecons", "{}"); + info.Error.ShouldBeNull(); + info.ConsumerInfo.ShouldNotBeNull(); + } + + // Go ref: TestJetStreamClusterConsumerPendingDecrementsAfterAck — jetstream_cluster_2_test.go + // With AckPolicy.All, the engine blocks re-delivery while any pending acks exist. + // We verify pending-decrement behavior by fetching in batches, acking each fully. + [Fact] + public async Task Consumer_pending_decrements_after_ack() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("DECPEND", ["dp.>"], replicas: 3); + await cluster.CreateConsumerAsync("DECPEND", "dpcons", ackPolicy: AckPolicy.All); + + for (var i = 0; i < 8; i++) + await cluster.PublishAsync("dp.event", $"msg-{i}"); + + // Fetch and ack first 4; pending clears for seqs 1-4 + var batch1 = await cluster.FetchAsync("DECPEND", "dpcons", 4); + batch1.Messages.Count.ShouldBe(4); + cluster.AckAll("DECPEND", "dpcons", 4); + + // Fetch remaining 4 — unblocked because HasPending is now false + var batch2 = await cluster.FetchAsync("DECPEND", "dpcons", 8); + batch2.Messages.Count.ShouldBe(4); + batch2.Messages[0].Sequence.ShouldBe(5UL); + } + + // Go ref: TestJetStreamClusterConsumerPendingAfterPublish — jetstream_cluster_2_test.go + [Fact] + public async Task Consumer_pending_after_publish_matches_expected_count() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("PUBPEND", ["pp.>"], replicas: 3); + await cluster.CreateConsumerAsync("PUBPEND", "ppcons"); + + for (var i = 0; i < 7; i++) + await cluster.PublishAsync("pp.event", $"msg-{i}"); + + var batch = await cluster.FetchAsync("PUBPEND", "ppcons", 10); + batch.Messages.Count.ShouldBe(7); + } + + // Go ref: TestJetStreamClusterConsumerInfoAfterFailover — jetstream_cluster_2_test.go + [Fact] + public async Task Consumer_info_after_failover_matches_pre_failover() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("INFOFAIL", ["if.>"], replicas: 3); + await cluster.CreateConsumerAsync("INFOFAIL", "failinfo", ackPolicy: AckPolicy.Explicit); + + var infoBefore = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerInfo}INFOFAIL.failinfo", "{}"); + infoBefore.ConsumerInfo.ShouldNotBeNull(); + infoBefore.ConsumerInfo!.Config.DurableName.ShouldBe("failinfo"); + + await cluster.StepDownStreamLeaderAsync("INFOFAIL"); + + var infoAfter = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerInfo}INFOFAIL.failinfo", "{}"); + infoAfter.ConsumerInfo.ShouldNotBeNull(); + infoAfter.ConsumerInfo!.Config.DurableName.ShouldBe("failinfo"); + infoAfter.ConsumerInfo.Config.AckPolicy.ShouldBe(AckPolicy.Explicit); + } + + // Go ref: TestJetStreamClusterMultipleConsumersHaveIndependentPending — jetstream_cluster_2_test.go + // Verifies that two consumers independently track their own pending state. + // Consumer 1 fetches and acks a batch; Consumer 2 fetches independently. + [Fact] + public async Task Multiple_consumers_have_independent_pending_counts() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("INDEPPEND", ["ip.>"], replicas: 3); + await cluster.CreateConsumerAsync("INDEPPEND", "icons1", ackPolicy: AckPolicy.All); + await cluster.CreateConsumerAsync("INDEPPEND", "icons2", ackPolicy: AckPolicy.All); + + for (var i = 0; i < 6; i++) + await cluster.PublishAsync("ip.event", $"msg-{i}"); + + // Consumer 1: fetch 4, ack all 4, then fetch the remaining 2 + var batch1a = await cluster.FetchAsync("INDEPPEND", "icons1", 4); + batch1a.Messages.Count.ShouldBe(4); + cluster.AckAll("INDEPPEND", "icons1", 4); + + var batch1b = await cluster.FetchAsync("INDEPPEND", "icons1", 6); + batch1b.Messages.Count.ShouldBe(2); + batch1b.Messages[0].Sequence.ShouldBe(5UL); + + // Consumer 2: fetch 6 independently (unaffected by cons1's acks) + var batch2 = await cluster.FetchAsync("INDEPPEND", "icons2", 6); + batch2.Messages.Count.ShouldBe(6); + batch2.Messages[0].Sequence.ShouldBe(1UL); + } + + // Go ref: TestJetStreamClusterConsumerOnEmptyStreamHasZeroPending — jetstream_cluster_2_test.go + [Fact] + public async Task Consumer_on_empty_stream_has_zero_pending() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("EMPTYPEND", ["ep.>"], replicas: 3); + await cluster.CreateConsumerAsync("EMPTYPEND", "emptycons"); + + var batch = await cluster.FetchAsync("EMPTYPEND", "emptycons", 10); + batch.Messages.Count.ShouldBe(0); + } + + // Go ref: TestJetStreamClusterConsumerCreatedAfterPublishesHasFullPending — jetstream_cluster_2_test.go + [Fact] + public async Task Consumer_created_after_publishes_has_full_pending() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("LATECREATE", ["lc.>"], replicas: 3); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("lc.event", $"msg-{i}"); + + // Consumer created AFTER publishes + await cluster.CreateConsumerAsync("LATECREATE", "latecons"); + + var batch = await cluster.FetchAsync("LATECREATE", "latecons", 10); + batch.Messages.Count.ShouldBe(5); + } + + // --------------------------------------------------------------- + // Edge cases + // --------------------------------------------------------------- + + // Go ref: TestJetStreamClusterConsumerOnNonExistentStream — jetstream_cluster_2_test.go + // Note: ConsumerManager.CreateOrUpdate does not validate stream existence at the + // consumer registration layer (stream lookup happens at fetch time). A consumer + // created on a ghost stream will have no messages to deliver. + [Fact] + public async Task Consumer_on_non_existent_stream_returns_empty_fetch() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + // Create consumer — registration succeeds (no stream validation at creation) + var resp = await cluster.CreateConsumerAsync("GHOST_STREAM", "ghostcons"); + resp.ConsumerInfo.ShouldNotBeNull(); + + // Fetch returns empty because there is no matching stream + var batch = await cluster.FetchAsync("GHOST_STREAM", "ghostcons", 10); + batch.Messages.Count.ShouldBe(0); + } + + // Go ref: TestJetStreamClusterDuplicateConsumerNameReturnsExisting — jetstream_cluster_2_test.go + [Fact] + public async Task Duplicate_consumer_name_on_same_stream_returns_existing() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("DUPCNAME", ["dup.>"], replicas: 3); + + var resp1 = await cluster.CreateConsumerAsync("DUPCNAME", "samecons"); + var resp2 = await cluster.CreateConsumerAsync("DUPCNAME", "samecons"); + + resp1.Error.ShouldBeNull(); + resp2.Error.ShouldBeNull(); + resp1.ConsumerInfo!.Config.DurableName.ShouldBe("samecons"); + resp2.ConsumerInfo!.Config.DurableName.ShouldBe("samecons"); + } + + // Go ref: TestJetStreamClusterConsumerEmptyFilterMatchesAll — jetstream_cluster_2_test.go + [Fact] + public async Task Consumer_with_empty_filter_subject_matches_all_messages() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("EMPTYFILT", ["ef2.>"], replicas: 3); + await cluster.CreateConsumerAsync("EMPTYFILT", "allfiltcons"); + + await cluster.PublishAsync("ef2.alpha", "a"); + await cluster.PublishAsync("ef2.beta", "b"); + await cluster.PublishAsync("ef2.gamma", "g"); + + var batch = await cluster.FetchAsync("EMPTYFILT", "allfiltcons", 10); + batch.Messages.Count.ShouldBe(3); + } + + // Go ref: TestJetStreamClusterConsumerWildcardFilterSubject — jetstream_cluster_2_test.go + [Fact] + public async Task Consumer_with_wildcard_filter_subject_matches_correct_messages() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("WILDCARD", ["wc.>"], replicas: 3); + await cluster.CreateConsumerAsync("WILDCARD", "wccons", + filterSubject: "wc.alpha.>"); + + await cluster.PublishAsync("wc.alpha.1", "a1"); + await cluster.PublishAsync("wc.alpha.2", "a2"); + await cluster.PublishAsync("wc.beta.1", "b1"); + await cluster.PublishAsync("wc.alpha.3", "a3"); + + var batch = await cluster.FetchAsync("WILDCARD", "wccons", 10); + batch.Messages.Count.ShouldBe(3); + foreach (var msg in batch.Messages) + msg.Subject.ShouldStartWith("wc.alpha."); + } + + // Go ref: TestJetStreamCluster10ConsumersOnSameStream — jetstream_cluster_2_test.go + [Fact] + public async Task Ten_consumers_on_same_stream_all_work_independently() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("TENCONS", ["tc.>"], replicas: 3); + + for (var i = 0; i < 10; i++) + await cluster.PublishAsync("tc.event", $"msg-{i}"); + + for (var c = 0; c < 10; c++) + { + var name = $"cons{c:D2}"; + var resp = await cluster.CreateConsumerAsync("TENCONS", name); + resp.Error.ShouldBeNull(); + + var batch = await cluster.FetchAsync("TENCONS", name, 10); + batch.Messages.Count.ShouldBe(10); + } + } + + // Go ref: TestJetStreamClusterRapidCreateDeleteCreateConsumer — jetstream_cluster_2_test.go + [Fact] + public async Task Rapid_create_delete_create_consumer_cycle_succeeds() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("RAPIDCYCLE", ["rc.>"], replicas: 3); + + var create1 = await cluster.CreateConsumerAsync("RAPIDCYCLE", "cyclecns"); + create1.Error.ShouldBeNull(); + + var del = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerDelete}RAPIDCYCLE.cyclecns", "{}"); + del.Success.ShouldBeTrue(); + + var create2 = await cluster.CreateConsumerAsync("RAPIDCYCLE", "cyclecns"); + create2.Error.ShouldBeNull(); + create2.ConsumerInfo!.Config.DurableName.ShouldBe("cyclecns"); + } + + // Go ref: TestJetStreamClusterConsumerAfterStreamPurge — jetstream_cluster_2_test.go + [Fact] + public async Task Consumer_after_stream_purge_has_zero_pending() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("PURGECONS", ["purge.>"], replicas: 3); + await cluster.CreateConsumerAsync("PURGECONS", "purgecons"); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("purge.event", $"msg-{i}"); + + // Purge the stream + var purgeResp = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamPurge}PURGECONS", "{}"); + purgeResp.Success.ShouldBeTrue(); + + var state = await cluster.GetStreamStateAsync("PURGECONS"); + state.Messages.ShouldBe(0UL); + } + + // Go ref: TestJetStreamClusterConsumerAfterStreamDelete — jetstream_cluster_2_test.go + // Note: ConsumerManager does not cascade-delete consumers on stream deletion. + // After stream deletion, consumer info still returns the consumer config, but + // fetch returns empty (stream handle is gone). This matches the model's behavior. + [Fact] + public async Task Consumer_fetch_on_deleted_stream_returns_empty_batch() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("DELSTREAM", ["ds.>"], replicas: 3); + await cluster.CreateConsumerAsync("DELSTREAM", "delcons"); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("ds.event", $"msg-{i}"); + + // Delete the stream + var del = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}DELSTREAM", "{}"); + del.Success.ShouldBeTrue(); + + // Stream state should be gone (GetStateAsync returns zero state) + var state = await cluster.GetStreamStateAsync("DELSTREAM"); + state.Messages.ShouldBe(0UL); + + // Fetch after stream deletion returns empty (stream handle not found) + var batch = await cluster.FetchAsync("DELSTREAM", "delcons", 10); + batch.Messages.Count.ShouldBe(0); + } + + // --------------------------------------------------------------- + // WaitOnConsumerLeader integration + // --------------------------------------------------------------- + + // Go ref: waitOnConsumerLeader helper — jetstream_helpers_test.go + [Fact] + public async Task WaitOnConsumerLeaderAsync_resolves_after_consumer_creation() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("WAITCL", ["wcl.>"], replicas: 3); + await cluster.CreateConsumerAsync("WAITCL", "wclcons"); + + await cluster.WaitOnConsumerLeaderAsync("WAITCL", "wclcons", timeoutMs: 3000); + var leaderId = cluster.GetConsumerLeaderId("WAITCL", "wclcons"); + leaderId.ShouldNotBeNullOrEmpty(); + } + + // Go ref: waitOnConsumerLeader times out for missing consumer — jetstream_helpers_test.go + [Fact] + public async Task WaitOnConsumerLeaderAsync_times_out_for_missing_consumer() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("TIMEOUTCL", ["tcl.>"], replicas: 3); + + var ex = await Should.ThrowAsync( + () => cluster.WaitOnConsumerLeaderAsync("TIMEOUTCL", "ghost", timeoutMs: 100)); + + ex.Message.ShouldContain("ghost"); + } + + // --------------------------------------------------------------- + // Consumer list and names + // --------------------------------------------------------------- + + // Go ref: TestJetStreamClusterConsumerNames — jetstream_cluster_2_test.go + [Fact] + public async Task Consumer_names_api_returns_created_consumers() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("CNAMES", ["cn.>"], replicas: 3); + + await cluster.CreateConsumerAsync("CNAMES", "name1"); + await cluster.CreateConsumerAsync("CNAMES", "name2"); + await cluster.CreateConsumerAsync("CNAMES", "name3"); + + var resp = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerNames}CNAMES", "{}"); + resp.Error.ShouldBeNull(); + resp.ConsumerNames.ShouldNotBeNull(); + resp.ConsumerNames!.Count.ShouldBe(3); + resp.ConsumerNames.ShouldContain("name1"); + resp.ConsumerNames.ShouldContain("name2"); + resp.ConsumerNames.ShouldContain("name3"); + } + + // Go ref: TestJetStreamClusterConsumerList — jetstream_cluster_2_test.go + [Fact] + public async Task Consumer_list_api_returns_consumer_infos() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("CLIST", ["clist.>"], replicas: 3); + + await cluster.CreateConsumerAsync("CLIST", "listcons1"); + await cluster.CreateConsumerAsync("CLIST", "listcons2"); + + var resp = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerList}CLIST", "{}"); + resp.Error.ShouldBeNull(); + } + + // --------------------------------------------------------------- + // Consumer delete + // --------------------------------------------------------------- + + // Go ref: TestJetStreamClusterConsumerDelete — jetstream_cluster_2_test.go + [Fact] + public async Task Consumer_delete_api_removes_consumer() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("CDELETE", ["cdelete.>"], replicas: 3); + await cluster.CreateConsumerAsync("CDELETE", "todelete"); + + var del = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerDelete}CDELETE.todelete", "{}"); + del.Success.ShouldBeTrue(); + + var info = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerInfo}CDELETE.todelete", "{}"); + info.Error.ShouldNotBeNull(); + } + + // Go ref: TestJetStreamClusterConsumerDeleteMissingConsumer — jetstream_cluster_2_test.go + [Fact] + public async Task Consumer_delete_for_missing_consumer_does_not_crash() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("MISSINGDEL", ["md.>"], replicas: 3); + + // Deleting a non-existent consumer should not crash + var del = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerDelete}MISSINGDEL.ghost", "{}"); + // Result may be success (idempotent) or not-found; neither should throw + _ = del; + } + + // --------------------------------------------------------------- + // Leader stepdown for consumer + // --------------------------------------------------------------- + + // Go ref: TestJetStreamClusterConsumerLeaderStepdown — jetstream_cluster_2_test.go + [Fact] + public async Task Consumer_leader_stepdown_api_succeeds() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("CONSLSD", ["clsd2.>"], replicas: 3); + await cluster.CreateConsumerAsync("CONSLSD", "lsdcons"); + + var resp = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerLeaderStepdown}CONSLSD.lsdcons", "{}"); + resp.Success.ShouldBeTrue(); + } + + // Go ref: TestJetStreamClusterConsumerFetchAfterLeaderStepdown — jetstream_cluster_2_test.go + [Fact] + public async Task Fetch_works_after_consumer_leader_stepdown() + { + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + await cluster.CreateStreamAsync("FETCHLSD", ["flsd.>"], replicas: 3); + await cluster.CreateConsumerAsync("FETCHLSD", "lsdcons2"); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("flsd.event", $"msg-{i}"); + + // Step down consumer leader + await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerLeaderStepdown}FETCHLSD.lsdcons2", "{}"); + + var batch = await cluster.FetchAsync("FETCHLSD", "lsdcons2", 5); + batch.Messages.Count.ShouldBe(5); + } +}