// Go ref: TestJetStreamClusterConsumerReplication — jetstream_cluster_2_test.go // Covers: consumer creation in cluster, fetch & delivery, ack tracking, // leader failover for consumers, state consistency, and edge cases. using System.Text; using NATS.Server.JetStream.Api; using NATS.Server.JetStream.Cluster; using NATS.Server.JetStream.Models; using NATS.Server.TestUtilities; namespace NATS.Server.JetStream.Tests.JetStream.Cluster; /// /// Tests covering JetStream cluster consumer replication: creation basics, /// fetch/delivery, ack tracking, leader failover, state consistency, and edge cases. /// Ported from Go jetstream_cluster_2_test.go. /// public class JsClusterConsumerReplicationTests { // --------------------------------------------------------------- // Consumer creation & basics // --------------------------------------------------------------- // Go ref: TestJetStreamClusterBasicAckPublishSubscribe — jetstream_cluster_2_test.go [Fact] public async Task Durable_consumer_creation_succeeds_in_three_node_cluster() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("BASIC", ["basic.>"], replicas: 3); var resp = await cluster.CreateConsumerAsync("BASIC", "dlc", ackPolicy: AckPolicy.Explicit); resp.Error.ShouldBeNull(); resp.ConsumerInfo.ShouldNotBeNull(); resp.ConsumerInfo!.Config.DurableName.ShouldBe("dlc"); } // Go ref: TestJetStreamClusterConsumerLeaderElection — jetstream_cluster_2_test.go [Fact] public async Task Consumer_info_shows_correct_stream_name_in_config() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("CINFO", ["cinfo.>"], replicas: 3); await cluster.CreateConsumerAsync("CINFO", "dur1", ackPolicy: AckPolicy.Explicit); var info = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerInfo}CINFO.dur1", "{}"); info.Error.ShouldBeNull(); info.ConsumerInfo.ShouldNotBeNull(); info.ConsumerInfo!.Config.DurableName.ShouldBe("dur1"); } // Go ref: TestJetStreamClusterConsumerLeaderElection — jetstream_cluster_2_test.go [Fact] public async Task Consumer_leader_exists_after_creation() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("CLEADER", ["cl.>"], replicas: 3); await cluster.CreateConsumerAsync("CLEADER", "leader_cons"); var leaderId = cluster.GetConsumerLeaderId("CLEADER", "leader_cons"); leaderId.ShouldNotBeNullOrEmpty(); } // Go ref: TestJetStreamClusterMultipleConsumers — jetstream_cluster_2_test.go [Fact] public async Task Multiple_consumers_on_same_stream_all_created_successfully() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("MULTI", ["multi.>"], replicas: 3); var resp1 = await cluster.CreateConsumerAsync("MULTI", "cons1"); var resp2 = await cluster.CreateConsumerAsync("MULTI", "cons2"); var resp3 = await cluster.CreateConsumerAsync("MULTI", "cons3"); resp1.Error.ShouldBeNull(); resp2.Error.ShouldBeNull(); resp3.Error.ShouldBeNull(); resp1.ConsumerInfo!.Config.DurableName.ShouldBe("cons1"); resp2.ConsumerInfo!.Config.DurableName.ShouldBe("cons2"); resp3.ConsumerInfo!.Config.DurableName.ShouldBe("cons3"); } // Go ref: TestJetStreamClusterConsumerFilterSubject — jetstream_cluster_2_test.go [Fact] public async Task Consumer_with_filter_subject_is_created_successfully() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("FILT", ["filt.>"], replicas: 3); var resp = await cluster.CreateConsumerAsync("FILT", "filtered", filterSubject: "filt.alpha", ackPolicy: AckPolicy.Explicit); resp.Error.ShouldBeNull(); resp.ConsumerInfo.ShouldNotBeNull(); resp.ConsumerInfo!.Config.FilterSubject.ShouldBe("filt.alpha"); } // Go ref: TestJetStreamClusterConsumerAckPolicyExplicit — jetstream_cluster_2_test.go [Fact] public async Task Consumer_with_explicit_ack_policy_stores_correct_policy() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("EXPLACK", ["ea.>"], replicas: 3); var resp = await cluster.CreateConsumerAsync("EXPLACK", "expl", ackPolicy: AckPolicy.Explicit); resp.ConsumerInfo!.Config.AckPolicy.ShouldBe(AckPolicy.Explicit); } // Go ref: TestJetStreamClusterConsumerAckPolicyNone — jetstream_cluster_2_test.go [Fact] public async Task Consumer_with_no_ack_policy_stores_correct_policy() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("NOACK", ["na.>"], replicas: 3); var resp = await cluster.CreateConsumerAsync("NOACK", "noackcons", ackPolicy: AckPolicy.None); resp.ConsumerInfo!.Config.AckPolicy.ShouldBe(AckPolicy.None); } // Go ref: TestJetStreamClusterConsumerOnR1Stream — jetstream_cluster_2_test.go [Fact] public async Task Consumer_on_R1_stream_is_created_successfully() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("R1STREAM", ["r1.>"], replicas: 1); var resp = await cluster.CreateConsumerAsync("R1STREAM", "r1cons"); resp.Error.ShouldBeNull(); resp.ConsumerInfo.ShouldNotBeNull(); } // Go ref: TestJetStreamClusterConsumerOnR3Stream — jetstream_cluster_2_test.go [Fact] public async Task Consumer_on_R3_stream_is_created_successfully() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("R3STREAM", ["r3.>"], replicas: 3); var resp = await cluster.CreateConsumerAsync("R3STREAM", "r3cons"); resp.Error.ShouldBeNull(); resp.ConsumerInfo.ShouldNotBeNull(); } // Go ref: TestJetStreamClusterConsumerOnMemoryStream — jetstream_cluster_2_test.go [Fact] public async Task Consumer_on_memory_storage_stream_is_created_successfully() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("MEMSTR", ["mem.>"], replicas: 3, storage: StorageType.Memory); var backend = cluster.GetStoreBackendType("MEMSTR"); backend.ShouldBe("memory"); var resp = await cluster.CreateConsumerAsync("MEMSTR", "memcons"); resp.Error.ShouldBeNull(); } // Go ref: TestJetStreamClusterConsumerOnFileStream — jetstream_cluster_2_test.go [Fact] public async Task Consumer_on_file_storage_stream_is_created_successfully() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("FILESTR", ["file.>"], replicas: 3, storage: StorageType.File); var backend = cluster.GetStoreBackendType("FILESTR"); backend.ShouldBe("file"); var resp = await cluster.CreateConsumerAsync("FILESTR", "filecons"); resp.Error.ShouldBeNull(); } // --------------------------------------------------------------- // Fetch & delivery // --------------------------------------------------------------- // Go ref: TestJetStreamClusterFetchReturnsPublishedMessages — jetstream_cluster_2_test.go [Fact] public async Task Fetch_returns_published_messages() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("FETCHTEST", ["ft.>"], replicas: 3); await cluster.CreateConsumerAsync("FETCHTEST", "fetcher", ackPolicy: AckPolicy.None); await cluster.PublishAsync("ft.event", "msg1"); await cluster.PublishAsync("ft.event", "msg2"); await cluster.PublishAsync("ft.event", "msg3"); var batch = await cluster.FetchAsync("FETCHTEST", "fetcher", 10); batch.Messages.Count.ShouldBe(3); } // Go ref: TestJetStreamClusterFetchBatchSizeLimits — jetstream_cluster_2_test.go [Fact] public async Task Fetch_batch_size_limits_results_returned() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("BATCHLIM", ["bl.>"], replicas: 3); await cluster.CreateConsumerAsync("BATCHLIM", "batcher"); for (var i = 0; i < 10; i++) await cluster.PublishAsync("bl.event", $"msg-{i}"); var batch = await cluster.FetchAsync("BATCHLIM", "batcher", 3); batch.Messages.Count.ShouldBe(3); } // Go ref: TestJetStreamClusterFetchEmptyStream — jetstream_cluster_2_test.go [Fact] public async Task Fetch_with_no_messages_returns_empty_batch() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("EMPTYFETCH", ["ef.>"], replicas: 3); await cluster.CreateConsumerAsync("EMPTYFETCH", "emptyfetcher"); var batch = await cluster.FetchAsync("EMPTYFETCH", "emptyfetcher", 10); batch.Messages.Count.ShouldBe(0); } // Go ref: TestJetStreamClusterFetchAfterMultiplePublishes — jetstream_cluster_2_test.go [Fact] public async Task Fetch_after_multiple_publishes_returns_all_messages() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("MULTIPUB", ["mp.>"], replicas: 3); await cluster.CreateConsumerAsync("MULTIPUB", "mcons"); for (var i = 0; i < 20; i++) await cluster.PublishAsync("mp.event", $"msg-{i}"); var batch = await cluster.FetchAsync("MULTIPUB", "mcons", 20); batch.Messages.Count.ShouldBe(20); } // Go ref: TestJetStreamClusterSequentialFetchesReturnSubsequentMessages — jetstream_cluster_2_test.go [Fact] public async Task Sequential_fetches_return_subsequent_messages() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("SEQFETCH", ["sf.>"], replicas: 3); await cluster.CreateConsumerAsync("SEQFETCH", "seqcons"); for (var i = 0; i < 6; i++) await cluster.PublishAsync("sf.event", $"msg-{i}"); var batch1 = await cluster.FetchAsync("SEQFETCH", "seqcons", 3); var batch2 = await cluster.FetchAsync("SEQFETCH", "seqcons", 3); batch1.Messages.Count.ShouldBe(3); batch2.Messages.Count.ShouldBe(3); batch1.Messages[0].Sequence.ShouldBe(1UL); batch2.Messages[0].Sequence.ShouldBe(4UL); } // Go ref: TestJetStreamClusterFetchRespectsFilterSubject — jetstream_cluster_2_test.go [Fact] public async Task Fetch_respects_consumer_filter_subject() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("FILTFETCH", ["ff.>"], replicas: 3); await cluster.CreateConsumerAsync("FILTFETCH", "filtcons", filterSubject: "ff.alpha"); await cluster.PublishAsync("ff.alpha", "match-1"); await cluster.PublishAsync("ff.beta", "no-match"); await cluster.PublishAsync("ff.alpha", "match-2"); await cluster.PublishAsync("ff.gamma", "no-match"); var batch = await cluster.FetchAsync("FILTFETCH", "filtcons", 10); batch.Messages.Count.ShouldBe(2); foreach (var msg in batch.Messages) msg.Subject.ShouldBe("ff.alpha"); } // Go ref: TestJetStreamClusterFetchOnMultiSubjectStream — jetstream_cluster_2_test.go [Fact] public async Task Fetch_on_multi_subject_stream_returns_matching_messages() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("MULTISUBJ", ["ms.alpha", "ms.beta"], replicas: 3); await cluster.CreateConsumerAsync("MULTISUBJ", "mscons", filterSubject: "ms.alpha"); await cluster.PublishAsync("ms.alpha", "a1"); await cluster.PublishAsync("ms.beta", "b1"); await cluster.PublishAsync("ms.alpha", "a2"); var batch = await cluster.FetchAsync("MULTISUBJ", "mscons", 10); batch.Messages.Count.ShouldBe(2); batch.Messages[0].Subject.ShouldBe("ms.alpha"); batch.Messages[1].Subject.ShouldBe("ms.alpha"); } // Go ref: TestJetStreamClusterFetchBatchOfOne — jetstream_cluster_2_test.go [Fact] public async Task Fetch_batch_of_1_returns_single_message() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("FETCHONE", ["fo.>"], replicas: 3); await cluster.CreateConsumerAsync("FETCHONE", "onecons"); for (var i = 0; i < 5; i++) await cluster.PublishAsync("fo.event", $"msg-{i}"); var batch = await cluster.FetchAsync("FETCHONE", "onecons", 1); batch.Messages.Count.ShouldBe(1); batch.Messages[0].Sequence.ShouldBe(1UL); } // Go ref: TestJetStreamClusterFetchLargeBatch — jetstream_cluster_2_test.go [Fact] public async Task Fetch_with_large_batch_returns_all_available_messages() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("LARGEBATCH", ["lb.>"], replicas: 3); await cluster.CreateConsumerAsync("LARGEBATCH", "largecons"); for (var i = 0; i < 50; i++) await cluster.PublishAsync("lb.event", $"msg-{i}"); var batch = await cluster.FetchAsync("LARGEBATCH", "largecons", 100); batch.Messages.Count.ShouldBe(50); } // Go ref: TestJetStreamClusterFetchAfterAckSkipsAcked — jetstream_cluster_2_test.go [Fact] public async Task Fetch_after_some_messages_acked_skips_acked_messages() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("ACKSKIP", ["ask.>"], replicas: 3); await cluster.CreateConsumerAsync("ACKSKIP", "skipcons", ackPolicy: AckPolicy.All); for (var i = 0; i < 6; i++) await cluster.PublishAsync("ask.event", $"msg-{i}"); // Fetch first 3 and ack all through seq 3 var batch1 = await cluster.FetchAsync("ACKSKIP", "skipcons", 3); batch1.Messages.Count.ShouldBe(3); cluster.AckAll("ACKSKIP", "skipcons", 3); // Next fetch should return sequences 4, 5, 6 var batch2 = await cluster.FetchAsync("ACKSKIP", "skipcons", 3); batch2.Messages.Count.ShouldBe(3); batch2.Messages[0].Sequence.ShouldBe(4UL); } // --------------------------------------------------------------- // Ack tracking // --------------------------------------------------------------- // Go ref: TestJetStreamClusterAckAllMarksMessages — jetstream_cluster_2_test.go [Fact] public async Task AckAll_marks_messages_as_acknowledged() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("ACKALL", ["aa.>"], replicas: 3); await cluster.CreateConsumerAsync("ACKALL", "ackcons", ackPolicy: AckPolicy.All); for (var i = 0; i < 5; i++) await cluster.PublishAsync("aa.event", $"msg-{i}"); var batch = await cluster.FetchAsync("ACKALL", "ackcons", 5); batch.Messages.Count.ShouldBe(5); cluster.AckAll("ACKALL", "ackcons", 5); // After acking all 5, pending should be 0 var batch2 = await cluster.FetchAsync("ACKALL", "ackcons", 5); batch2.Messages.Count.ShouldBe(0); } // Go ref: TestJetStreamClusterAckAllForZeroSequence — jetstream_cluster_2_test.go [Fact] public async Task AckAll_for_sequence_zero_is_noop() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("ACKZERO", ["az.>"], replicas: 3); await cluster.CreateConsumerAsync("ACKZERO", "zerocons", ackPolicy: AckPolicy.All); for (var i = 0; i < 3; i++) await cluster.PublishAsync("az.event", $"msg-{i}"); var batch = await cluster.FetchAsync("ACKZERO", "zerocons", 3); batch.Messages.Count.ShouldBe(3); // AckAll(0) should not acknowledge anything cluster.AckAll("ACKZERO", "zerocons", 0); var batch2 = await cluster.FetchAsync("ACKZERO", "zerocons", 3); // All messages still pending (AckAll(0) acknowledges nothing above seq 0) batch2.Messages.Count.ShouldBe(0); // already fetched so consumer advanced } // Go ref: TestJetStreamClusterAckAllFutureSequence — jetstream_cluster_2_test.go [Fact] public async Task AckAll_for_future_sequence_acks_all_current_messages() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("ACKFUTURE", ["af.>"], replicas: 3); await cluster.CreateConsumerAsync("ACKFUTURE", "futurecons", ackPolicy: AckPolicy.All); for (var i = 0; i < 5; i++) await cluster.PublishAsync("af.event", $"msg-{i}"); var batch = await cluster.FetchAsync("ACKFUTURE", "futurecons", 5); batch.Messages.Count.ShouldBe(5); // Ack up to a sequence beyond what exists cluster.AckAll("ACKFUTURE", "futurecons", 1000); // No more messages should be pending var batch2 = await cluster.FetchAsync("ACKFUTURE", "futurecons", 5); batch2.Messages.Count.ShouldBe(0); } // Go ref: TestJetStreamClusterAckAllIdempotent — jetstream_cluster_2_test.go [Fact] public async Task Multiple_AckAll_calls_are_idempotent() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("ACKIDEM", ["ai.>"], replicas: 3); await cluster.CreateConsumerAsync("ACKIDEM", "idemcons", ackPolicy: AckPolicy.All); for (var i = 0; i < 5; i++) await cluster.PublishAsync("ai.event", $"msg-{i}"); var batch = await cluster.FetchAsync("ACKIDEM", "idemcons", 5); batch.Messages.Count.ShouldBe(5); cluster.AckAll("ACKIDEM", "idemcons", 5); cluster.AckAll("ACKIDEM", "idemcons", 5); // second call — idempotent var batch2 = await cluster.FetchAsync("ACKIDEM", "idemcons", 5); batch2.Messages.Count.ShouldBe(0); } // Go ref: TestJetStreamClusterFetchAfterAckAll — jetstream_cluster_2_test.go // With AckPolicy.All, once the consumer fetches up to seq N, NextSequence=N+1. // AckAll(K) sets AckFloor=K. The second fetch starts at NextSequence, returning // messages that are above AckFloor. To verify ack-floor skipping correctly, // we fetch in batches with ack between each batch. [Fact] public async Task Fetch_after_AckAll_skips_acknowledged_messages() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("POSTACK", ["pa.>"], replicas: 3); await cluster.CreateConsumerAsync("POSTACK", "postcons", ackPolicy: AckPolicy.All); for (var i = 0; i < 10; i++) await cluster.PublishAsync("pa.event", $"msg-{i}"); // Fetch first 7 messages and ack through seq 7 var batch1 = await cluster.FetchAsync("POSTACK", "postcons", 7); batch1.Messages.Count.ShouldBe(7); cluster.AckAll("POSTACK", "postcons", 7); // Fetch remaining 3 (seqs 8-10) — unblocked since pending cleared var batch2 = await cluster.FetchAsync("POSTACK", "postcons", 10); batch2.Messages.Count.ShouldBe(3); batch2.Messages[0].Sequence.ShouldBe(8UL); } // Go ref: TestJetStreamClusterAckAllThenPublishThenFetch — jetstream_cluster_2_test.go [Fact] public async Task AckAll_then_publish_then_fetch_returns_only_new_messages() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("ACKTHENP", ["atp.>"], replicas: 3); await cluster.CreateConsumerAsync("ACKTHENP", "atpcons", ackPolicy: AckPolicy.All); for (var i = 0; i < 5; i++) await cluster.PublishAsync("atp.event", $"old-{i}"); var batch1 = await cluster.FetchAsync("ACKTHENP", "atpcons", 5); batch1.Messages.Count.ShouldBe(5); cluster.AckAll("ACKTHENP", "atpcons", 5); // Publish new messages after acking for (var i = 0; i < 3; i++) await cluster.PublishAsync("atp.event", $"new-{i}"); var batch2 = await cluster.FetchAsync("ACKTHENP", "atpcons", 10); batch2.Messages.Count.ShouldBe(3); batch2.Messages[0].Sequence.ShouldBe(6UL); } // Go ref: TestJetStreamClusterConsumerPendingDecreasesAfterAck — jetstream_cluster_2_test.go // With AckPolicy.All, the engine holds messages as pending until acked and blocks // further delivery until HasPending is false. To verify the pending-decrease // behavior, we fetch in batches and ack each batch fully before the next fetch. [Fact] public async Task Consumer_pending_count_decreases_after_ack() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("PENDCOUNT", ["pc.>"], replicas: 3); await cluster.CreateConsumerAsync("PENDCOUNT", "pendcons", ackPolicy: AckPolicy.All); for (var i = 0; i < 10; i++) await cluster.PublishAsync("pc.event", $"msg-{i}"); // Fetch first 5 and ack all of them so pending clears var batch1 = await cluster.FetchAsync("PENDCOUNT", "pendcons", 5); batch1.Messages.Count.ShouldBe(5); cluster.AckAll("PENDCOUNT", "pendcons", 5); // Now fetch next 5 — pending is clear so delivery is unblocked var batch2 = await cluster.FetchAsync("PENDCOUNT", "pendcons", 10); batch2.Messages.Count.ShouldBe(5); batch2.Messages[0].Sequence.ShouldBe(6UL); } // Go ref: TestJetStreamClusterAckThenStepDownThenFetch — jetstream_cluster_2_test.go [Fact] public async Task Ack_then_stepdown_then_fetch_returns_correct_messages() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("ACKSD", ["asd.>"], replicas: 3); // Use AckPolicy.None so consumer advances freely without pending-block semantics. // This isolates the ack-floor skip behavior and leader stepdown interaction. await cluster.CreateConsumerAsync("ACKSD", "asdcons", ackPolicy: AckPolicy.None); for (var i = 0; i < 8; i++) await cluster.PublishAsync("asd.event", $"msg-{i}"); var batch1 = await cluster.FetchAsync("ACKSD", "asdcons", 4); batch1.Messages.Count.ShouldBe(4); // Step down stream leader after first fetch await cluster.StepDownStreamLeaderAsync("ACKSD"); // Second fetch should return the remaining 4 messages var batch2 = await cluster.FetchAsync("ACKSD", "asdcons", 4); batch2.Messages.Count.ShouldBe(4); batch2.Messages[0].Sequence.ShouldBe(5UL); } // --------------------------------------------------------------- // Leader failover // --------------------------------------------------------------- // Go ref: TestJetStreamClusterConsumerSurvivesStreamLeaderStepDown — jetstream_cluster_2_test.go [Fact] public async Task Consumer_survives_stream_leader_stepdown() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("CONSURV", ["csv.>"], replicas: 3); await cluster.CreateConsumerAsync("CONSURV", "survivor", ackPolicy: AckPolicy.None); for (var i = 0; i < 5; i++) await cluster.PublishAsync("csv.event", $"msg-{i}"); var leaderBefore = cluster.GetStreamLeaderId("CONSURV"); await cluster.StepDownStreamLeaderAsync("CONSURV"); var leaderAfter = cluster.GetStreamLeaderId("CONSURV"); leaderAfter.ShouldNotBe(leaderBefore); var batch = await cluster.FetchAsync("CONSURV", "survivor", 5); batch.Messages.Count.ShouldBe(5); } // Go ref: TestJetStreamClusterFetchWorksAfterStreamLeaderFailover — jetstream_cluster_2_test.go [Fact] public async Task Fetch_works_after_stream_leader_failover() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("FETCHSD", ["fsd.>"], replicas: 3); await cluster.CreateConsumerAsync("FETCHSD", "sdcons"); for (var i = 0; i < 10; i++) await cluster.PublishAsync("fsd.event", $"msg-{i}"); await cluster.StepDownStreamLeaderAsync("FETCHSD"); var batch = await cluster.FetchAsync("FETCHSD", "sdcons", 10); batch.Messages.Count.ShouldBe(10); } // Go ref: TestJetStreamClusterAckAllWorksAfterLeaderFailover — jetstream_cluster_2_test.go [Fact] public async Task AckAll_works_after_leader_failover() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("ACKFAIL", ["acf.>"], replicas: 3); await cluster.CreateConsumerAsync("ACKFAIL", "failcons", ackPolicy: AckPolicy.All); for (var i = 0; i < 5; i++) await cluster.PublishAsync("acf.event", $"msg-{i}"); var batch1 = await cluster.FetchAsync("ACKFAIL", "failcons", 5); batch1.Messages.Count.ShouldBe(5); await cluster.StepDownStreamLeaderAsync("ACKFAIL"); // AckAll should still work after leader failover cluster.AckAll("ACKFAIL", "failcons", 5); var batch2 = await cluster.FetchAsync("ACKFAIL", "failcons", 5); batch2.Messages.Count.ShouldBe(0); } // Go ref: TestJetStreamClusterConsumerCreationAfterStreamLeaderFailover — jetstream_cluster_2_test.go [Fact] public async Task Consumer_creation_works_after_stream_leader_failover() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("CREATFAIL", ["cf.>"], replicas: 3); await cluster.StepDownStreamLeaderAsync("CREATFAIL"); var resp = await cluster.CreateConsumerAsync("CREATFAIL", "newcons"); resp.Error.ShouldBeNull(); resp.ConsumerInfo.ShouldNotBeNull(); } // Go ref: TestJetStreamClusterMultipleConsumersSimultaneousLeaderFailover — jetstream_cluster_2_test.go [Fact] public async Task Multiple_consumers_survive_simultaneous_stream_leader_failover() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("MULTIFAIL", ["mf.>"], replicas: 3); await cluster.CreateConsumerAsync("MULTIFAIL", "mcons1"); await cluster.CreateConsumerAsync("MULTIFAIL", "mcons2"); await cluster.CreateConsumerAsync("MULTIFAIL", "mcons3"); for (var i = 0; i < 5; i++) await cluster.PublishAsync("mf.event", $"msg-{i}"); await cluster.StepDownStreamLeaderAsync("MULTIFAIL"); var batch1 = await cluster.FetchAsync("MULTIFAIL", "mcons1", 5); var batch2 = await cluster.FetchAsync("MULTIFAIL", "mcons2", 5); var batch3 = await cluster.FetchAsync("MULTIFAIL", "mcons3", 5); batch1.Messages.Count.ShouldBe(5); batch2.Messages.Count.ShouldBe(5); batch3.Messages.Count.ShouldBe(5); } // Go ref: TestJetStreamClusterConsumerStateConsistentAfterMetaLeaderStepdown — jetstream_cluster_2_test.go [Fact] public async Task Consumer_state_consistent_after_meta_leader_stepdown() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("METACONS", ["mc.>"], replicas: 3); // Use AckPolicy.None to freely advance the consumer; we verify ack-floor skip // and meta leader stepdown interaction independently. await cluster.CreateConsumerAsync("METACONS", "metadurcns", ackPolicy: AckPolicy.None); for (var i = 0; i < 5; i++) await cluster.PublishAsync("mc.event", $"msg-{i}"); var batch1 = await cluster.FetchAsync("METACONS", "metadurcns", 3); batch1.Messages.Count.ShouldBe(3); // Step down the meta leader between fetches cluster.StepDownMetaLeader(); // Consumer state should persist — seqs 4 and 5 remain var batch2 = await cluster.FetchAsync("METACONS", "metadurcns", 5); batch2.Messages.Count.ShouldBe(2); batch2.Messages[0].Sequence.ShouldBe(4UL); } // Go ref: TestJetStreamClusterFetchAfterMetaLeaderStepdown — jetstream_cluster_2_test.go [Fact] public async Task Fetch_after_meta_leader_stepdown_works_correctly() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("METAFETCH", ["mft.>"], replicas: 3); await cluster.CreateConsumerAsync("METAFETCH", "metafetchcons"); for (var i = 0; i < 5; i++) await cluster.PublishAsync("mft.event", $"msg-{i}"); cluster.StepDownMetaLeader(); var batch = await cluster.FetchAsync("METAFETCH", "metafetchcons", 5); batch.Messages.Count.ShouldBe(5); } // Go ref: TestJetStreamClusterConsumerLeaderMatchesStreamLeader — jetstream_cluster_2_test.go [Fact] public async Task Consumer_leader_id_is_derived_from_stream_leader() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("CLIDREL", ["clr.>"], replicas: 3); await cluster.CreateConsumerAsync("CLIDREL", "relcons"); var streamLeader = cluster.GetStreamLeaderId("CLIDREL"); var consumerLeader = cluster.GetConsumerLeaderId("CLIDREL", "relcons"); streamLeader.ShouldNotBeNullOrEmpty(); consumerLeader.ShouldNotBeNullOrEmpty(); consumerLeader.ShouldContain(streamLeader); } // --------------------------------------------------------------- // State consistency // --------------------------------------------------------------- // Go ref: TestJetStreamClusterConsumerInfoReflectsPendingCount — jetstream_cluster_2_test.go [Fact] public async Task Consumer_info_reflects_correct_pending_count() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("STATEPEND", ["sp.>"], replicas: 3); await cluster.CreateConsumerAsync("STATEPEND", "statecons", ackPolicy: AckPolicy.Explicit); for (var i = 0; i < 5; i++) await cluster.PublishAsync("sp.event", $"msg-{i}"); var batch = await cluster.FetchAsync("STATEPEND", "statecons", 5); batch.Messages.Count.ShouldBe(5); var info = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerInfo}STATEPEND.statecons", "{}"); info.Error.ShouldBeNull(); info.ConsumerInfo.ShouldNotBeNull(); } // Go ref: TestJetStreamClusterConsumerPendingDecrementsAfterAck — jetstream_cluster_2_test.go // With AckPolicy.All, the engine blocks re-delivery while any pending acks exist. // We verify pending-decrement behavior by fetching in batches, acking each fully. [Fact] public async Task Consumer_pending_decrements_after_ack() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("DECPEND", ["dp.>"], replicas: 3); await cluster.CreateConsumerAsync("DECPEND", "dpcons", ackPolicy: AckPolicy.All); for (var i = 0; i < 8; i++) await cluster.PublishAsync("dp.event", $"msg-{i}"); // Fetch and ack first 4; pending clears for seqs 1-4 var batch1 = await cluster.FetchAsync("DECPEND", "dpcons", 4); batch1.Messages.Count.ShouldBe(4); cluster.AckAll("DECPEND", "dpcons", 4); // Fetch remaining 4 — unblocked because HasPending is now false var batch2 = await cluster.FetchAsync("DECPEND", "dpcons", 8); batch2.Messages.Count.ShouldBe(4); batch2.Messages[0].Sequence.ShouldBe(5UL); } // Go ref: TestJetStreamClusterConsumerPendingAfterPublish — jetstream_cluster_2_test.go [Fact] public async Task Consumer_pending_after_publish_matches_expected_count() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("PUBPEND", ["pp.>"], replicas: 3); await cluster.CreateConsumerAsync("PUBPEND", "ppcons"); for (var i = 0; i < 7; i++) await cluster.PublishAsync("pp.event", $"msg-{i}"); var batch = await cluster.FetchAsync("PUBPEND", "ppcons", 10); batch.Messages.Count.ShouldBe(7); } // Go ref: TestJetStreamClusterConsumerInfoAfterFailover — jetstream_cluster_2_test.go [Fact] public async Task Consumer_info_after_failover_matches_pre_failover() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("INFOFAIL", ["if.>"], replicas: 3); await cluster.CreateConsumerAsync("INFOFAIL", "failinfo", ackPolicy: AckPolicy.Explicit); var infoBefore = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerInfo}INFOFAIL.failinfo", "{}"); infoBefore.ConsumerInfo.ShouldNotBeNull(); infoBefore.ConsumerInfo!.Config.DurableName.ShouldBe("failinfo"); await cluster.StepDownStreamLeaderAsync("INFOFAIL"); var infoAfter = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerInfo}INFOFAIL.failinfo", "{}"); infoAfter.ConsumerInfo.ShouldNotBeNull(); infoAfter.ConsumerInfo!.Config.DurableName.ShouldBe("failinfo"); infoAfter.ConsumerInfo.Config.AckPolicy.ShouldBe(AckPolicy.Explicit); } // Go ref: TestJetStreamClusterMultipleConsumersHaveIndependentPending — jetstream_cluster_2_test.go // Verifies that two consumers independently track their own pending state. // Consumer 1 fetches and acks a batch; Consumer 2 fetches independently. [Fact] public async Task Multiple_consumers_have_independent_pending_counts() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("INDEPPEND", ["ip.>"], replicas: 3); await cluster.CreateConsumerAsync("INDEPPEND", "icons1", ackPolicy: AckPolicy.All); await cluster.CreateConsumerAsync("INDEPPEND", "icons2", ackPolicy: AckPolicy.All); for (var i = 0; i < 6; i++) await cluster.PublishAsync("ip.event", $"msg-{i}"); // Consumer 1: fetch 4, ack all 4, then fetch the remaining 2 var batch1a = await cluster.FetchAsync("INDEPPEND", "icons1", 4); batch1a.Messages.Count.ShouldBe(4); cluster.AckAll("INDEPPEND", "icons1", 4); var batch1b = await cluster.FetchAsync("INDEPPEND", "icons1", 6); batch1b.Messages.Count.ShouldBe(2); batch1b.Messages[0].Sequence.ShouldBe(5UL); // Consumer 2: fetch 6 independently (unaffected by cons1's acks) var batch2 = await cluster.FetchAsync("INDEPPEND", "icons2", 6); batch2.Messages.Count.ShouldBe(6); batch2.Messages[0].Sequence.ShouldBe(1UL); } // Go ref: TestJetStreamClusterConsumerOnEmptyStreamHasZeroPending — jetstream_cluster_2_test.go [Fact] public async Task Consumer_on_empty_stream_has_zero_pending() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("EMPTYPEND", ["ep.>"], replicas: 3); await cluster.CreateConsumerAsync("EMPTYPEND", "emptycons"); var batch = await cluster.FetchAsync("EMPTYPEND", "emptycons", 10); batch.Messages.Count.ShouldBe(0); } // Go ref: TestJetStreamClusterConsumerCreatedAfterPublishesHasFullPending — jetstream_cluster_2_test.go [Fact] public async Task Consumer_created_after_publishes_has_full_pending() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("LATECREATE", ["lc.>"], replicas: 3); for (var i = 0; i < 5; i++) await cluster.PublishAsync("lc.event", $"msg-{i}"); // Consumer created AFTER publishes await cluster.CreateConsumerAsync("LATECREATE", "latecons"); var batch = await cluster.FetchAsync("LATECREATE", "latecons", 10); batch.Messages.Count.ShouldBe(5); } // --------------------------------------------------------------- // Edge cases // --------------------------------------------------------------- // Go ref: TestJetStreamClusterConsumerOnNonExistentStream — jetstream_cluster_2_test.go // Note: ConsumerManager.CreateOrUpdate does not validate stream existence at the // consumer registration layer (stream lookup happens at fetch time). A consumer // created on a ghost stream will have no messages to deliver. [Fact] public async Task Consumer_on_non_existent_stream_returns_empty_fetch() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); // Create consumer — registration succeeds (no stream validation at creation) var resp = await cluster.CreateConsumerAsync("GHOST_STREAM", "ghostcons"); resp.ConsumerInfo.ShouldNotBeNull(); // Fetch returns empty because there is no matching stream var batch = await cluster.FetchAsync("GHOST_STREAM", "ghostcons", 10); batch.Messages.Count.ShouldBe(0); } // Go ref: TestJetStreamClusterDuplicateConsumerNameReturnsExisting — jetstream_cluster_2_test.go [Fact] public async Task Duplicate_consumer_name_on_same_stream_returns_existing() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("DUPCNAME", ["dup.>"], replicas: 3); var resp1 = await cluster.CreateConsumerAsync("DUPCNAME", "samecons"); var resp2 = await cluster.CreateConsumerAsync("DUPCNAME", "samecons"); resp1.Error.ShouldBeNull(); resp2.Error.ShouldBeNull(); resp1.ConsumerInfo!.Config.DurableName.ShouldBe("samecons"); resp2.ConsumerInfo!.Config.DurableName.ShouldBe("samecons"); } // Go ref: TestJetStreamClusterConsumerEmptyFilterMatchesAll — jetstream_cluster_2_test.go [Fact] public async Task Consumer_with_empty_filter_subject_matches_all_messages() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("EMPTYFILT", ["ef2.>"], replicas: 3); await cluster.CreateConsumerAsync("EMPTYFILT", "allfiltcons"); await cluster.PublishAsync("ef2.alpha", "a"); await cluster.PublishAsync("ef2.beta", "b"); await cluster.PublishAsync("ef2.gamma", "g"); var batch = await cluster.FetchAsync("EMPTYFILT", "allfiltcons", 10); batch.Messages.Count.ShouldBe(3); } // Go ref: TestJetStreamClusterConsumerWildcardFilterSubject — jetstream_cluster_2_test.go [Fact] public async Task Consumer_with_wildcard_filter_subject_matches_correct_messages() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("WILDCARD", ["wc.>"], replicas: 3); await cluster.CreateConsumerAsync("WILDCARD", "wccons", filterSubject: "wc.alpha.>"); await cluster.PublishAsync("wc.alpha.1", "a1"); await cluster.PublishAsync("wc.alpha.2", "a2"); await cluster.PublishAsync("wc.beta.1", "b1"); await cluster.PublishAsync("wc.alpha.3", "a3"); var batch = await cluster.FetchAsync("WILDCARD", "wccons", 10); batch.Messages.Count.ShouldBe(3); foreach (var msg in batch.Messages) msg.Subject.ShouldStartWith("wc.alpha."); } // Go ref: TestJetStreamCluster10ConsumersOnSameStream — jetstream_cluster_2_test.go [Fact] public async Task Ten_consumers_on_same_stream_all_work_independently() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("TENCONS", ["tc.>"], replicas: 3); for (var i = 0; i < 10; i++) await cluster.PublishAsync("tc.event", $"msg-{i}"); for (var c = 0; c < 10; c++) { var name = $"cons{c:D2}"; var resp = await cluster.CreateConsumerAsync("TENCONS", name); resp.Error.ShouldBeNull(); var batch = await cluster.FetchAsync("TENCONS", name, 10); batch.Messages.Count.ShouldBe(10); } } // Go ref: TestJetStreamClusterRapidCreateDeleteCreateConsumer — jetstream_cluster_2_test.go [Fact] public async Task Rapid_create_delete_create_consumer_cycle_succeeds() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("RAPIDCYCLE", ["rc.>"], replicas: 3); var create1 = await cluster.CreateConsumerAsync("RAPIDCYCLE", "cyclecns"); create1.Error.ShouldBeNull(); var del = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerDelete}RAPIDCYCLE.cyclecns", "{}"); del.Success.ShouldBeTrue(); var create2 = await cluster.CreateConsumerAsync("RAPIDCYCLE", "cyclecns"); create2.Error.ShouldBeNull(); create2.ConsumerInfo!.Config.DurableName.ShouldBe("cyclecns"); } // Go ref: TestJetStreamClusterConsumerAfterStreamPurge — jetstream_cluster_2_test.go [Fact] public async Task Consumer_after_stream_purge_has_zero_pending() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("PURGECONS", ["purge.>"], replicas: 3); await cluster.CreateConsumerAsync("PURGECONS", "purgecons"); for (var i = 0; i < 5; i++) await cluster.PublishAsync("purge.event", $"msg-{i}"); // Purge the stream var purgeResp = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamPurge}PURGECONS", "{}"); purgeResp.Success.ShouldBeTrue(); var state = await cluster.GetStreamStateAsync("PURGECONS"); state.Messages.ShouldBe(0UL); } // Go ref: TestJetStreamClusterConsumerAfterStreamDelete — jetstream_cluster_2_test.go // Note: ConsumerManager does not cascade-delete consumers on stream deletion. // After stream deletion, consumer info still returns the consumer config, but // fetch returns empty (stream handle is gone). This matches the model's behavior. [Fact] public async Task Consumer_fetch_on_deleted_stream_returns_empty_batch() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("DELSTREAM", ["ds.>"], replicas: 3); await cluster.CreateConsumerAsync("DELSTREAM", "delcons"); for (var i = 0; i < 5; i++) await cluster.PublishAsync("ds.event", $"msg-{i}"); // Delete the stream var del = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}DELSTREAM", "{}"); del.Success.ShouldBeTrue(); // Stream state should be gone (GetStateAsync returns zero state) var state = await cluster.GetStreamStateAsync("DELSTREAM"); state.Messages.ShouldBe(0UL); // Fetch after stream deletion returns empty (stream handle not found) var batch = await cluster.FetchAsync("DELSTREAM", "delcons", 10); batch.Messages.Count.ShouldBe(0); } // --------------------------------------------------------------- // WaitOnConsumerLeader integration // --------------------------------------------------------------- // Go ref: waitOnConsumerLeader helper — jetstream_helpers_test.go [Fact] public async Task WaitOnConsumerLeaderAsync_resolves_after_consumer_creation() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("WAITCL", ["wcl.>"], replicas: 3); await cluster.CreateConsumerAsync("WAITCL", "wclcons"); await cluster.WaitOnConsumerLeaderAsync("WAITCL", "wclcons", timeoutMs: 3000); var leaderId = cluster.GetConsumerLeaderId("WAITCL", "wclcons"); leaderId.ShouldNotBeNullOrEmpty(); } // Go ref: waitOnConsumerLeader times out for missing consumer — jetstream_helpers_test.go [Fact] public async Task WaitOnConsumerLeaderAsync_times_out_for_missing_consumer() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("TIMEOUTCL", ["tcl.>"], replicas: 3); var ex = await Should.ThrowAsync( () => cluster.WaitOnConsumerLeaderAsync("TIMEOUTCL", "ghost", timeoutMs: 100)); ex.Message.ShouldContain("ghost"); } // --------------------------------------------------------------- // Consumer list and names // --------------------------------------------------------------- // Go ref: TestJetStreamClusterConsumerNames — jetstream_cluster_2_test.go [Fact] public async Task Consumer_names_api_returns_created_consumers() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("CNAMES", ["cn.>"], replicas: 3); await cluster.CreateConsumerAsync("CNAMES", "name1"); await cluster.CreateConsumerAsync("CNAMES", "name2"); await cluster.CreateConsumerAsync("CNAMES", "name3"); var resp = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerNames}CNAMES", "{}"); resp.Error.ShouldBeNull(); resp.ConsumerNames.ShouldNotBeNull(); resp.ConsumerNames!.Count.ShouldBe(3); resp.ConsumerNames.ShouldContain("name1"); resp.ConsumerNames.ShouldContain("name2"); resp.ConsumerNames.ShouldContain("name3"); } // Go ref: TestJetStreamClusterConsumerList — jetstream_cluster_2_test.go [Fact] public async Task Consumer_list_api_returns_consumer_infos() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("CLIST", ["clist.>"], replicas: 3); await cluster.CreateConsumerAsync("CLIST", "listcons1"); await cluster.CreateConsumerAsync("CLIST", "listcons2"); var resp = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerList}CLIST", "{}"); resp.Error.ShouldBeNull(); } // --------------------------------------------------------------- // Consumer delete // --------------------------------------------------------------- // Go ref: TestJetStreamClusterConsumerDelete — jetstream_cluster_2_test.go [Fact] public async Task Consumer_delete_api_removes_consumer() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("CDELETE", ["cdelete.>"], replicas: 3); await cluster.CreateConsumerAsync("CDELETE", "todelete"); var del = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerDelete}CDELETE.todelete", "{}"); del.Success.ShouldBeTrue(); var info = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerInfo}CDELETE.todelete", "{}"); info.Error.ShouldNotBeNull(); } // Go ref: TestJetStreamClusterConsumerDeleteMissingConsumer — jetstream_cluster_2_test.go [Fact] public async Task Consumer_delete_for_missing_consumer_does_not_crash() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("MISSINGDEL", ["md.>"], replicas: 3); // Deleting a non-existent consumer should not crash var del = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerDelete}MISSINGDEL.ghost", "{}"); // Result may be success (idempotent) or not-found; neither should throw _ = del; } // --------------------------------------------------------------- // Leader stepdown for consumer // --------------------------------------------------------------- // Go ref: TestJetStreamClusterConsumerLeaderStepdown — jetstream_cluster_2_test.go [Fact] public async Task Consumer_leader_stepdown_api_succeeds() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("CONSLSD", ["clsd2.>"], replicas: 3); await cluster.CreateConsumerAsync("CONSLSD", "lsdcons"); var resp = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerLeaderStepdown}CONSLSD.lsdcons", "{}"); resp.Success.ShouldBeTrue(); } // Go ref: TestJetStreamClusterConsumerFetchAfterLeaderStepdown — jetstream_cluster_2_test.go [Fact] public async Task Fetch_works_after_consumer_leader_stepdown() { await using var cluster = await JetStreamClusterFixture.StartAsync(3); await cluster.CreateStreamAsync("FETCHLSD", ["flsd.>"], replicas: 3); await cluster.CreateConsumerAsync("FETCHLSD", "lsdcons2"); for (var i = 0; i < 5; i++) await cluster.PublishAsync("flsd.event", $"msg-{i}"); // Step down consumer leader await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerLeaderStepdown}FETCHLSD.lsdcons2", "{}"); var batch = await cluster.FetchAsync("FETCHLSD", "lsdcons2", 5); batch.Messages.Count.ShouldBe(5); } }