diff --git a/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterAdvancedTests.cs b/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterAdvancedTests.cs
new file mode 100644
index 0000000..b8d946f
--- /dev/null
+++ b/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterAdvancedTests.cs
@@ -0,0 +1,743 @@
+// Go ref: TestJetStreamClusterXxx — jetstream_cluster_4_test.go
+// Covers: large clusters, many-subject streams, wildcard streams, high-message-count
+// publishes, multi-stream mixed replica counts, create/delete/recreate cycles,
+// consumer on high-message streams, purge/republish, stream delete cascades,
+// node removal and restart lifecycle markers.
+using System.Text;
+using NATS.Server.JetStream.Api;
+using NATS.Server.JetStream.Cluster;
+using NATS.Server.JetStream.Models;
+
+namespace NATS.Server.Tests.JetStream.Cluster;
+
+/// <summary>
+/// Advanced JetStream cluster tests covering high-load scenarios, large clusters,
+/// many-subject streams, wildcard subjects, multi-stream environments, consumer
+/// lifecycle edge cases, purge/republish cycles, and node lifecycle markers.
+/// Ported from Go jetstream_cluster_4_test.go.
+/// </summary>
+public class JsClusterAdvancedTests
+{
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterLargeClusterR5 — jetstream_cluster_4_test.go
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public async Task Large_seven_node_cluster_with_R5_stream_accepts_publishes()
+ {
+ // Go ref: TestJetStreamClusterLargeClusterR5 — jetstream_cluster_4_test.go
+ await using var cluster = await JetStreamClusterFixture.StartAsync(7);
+
+ cluster.NodeCount.ShouldBe(7);
+
+ var resp = await cluster.CreateStreamAsync("R5LARGE", ["r5.>"], replicas: 5);
+ resp.Error.ShouldBeNull();
+ resp.StreamInfo.ShouldNotBeNull();
+ resp.StreamInfo!.Config.Replicas.ShouldBe(5);
+
+ for (var i = 0; i < 20; i++)
+ {
+ var ack = await cluster.PublishAsync("r5.event", $"msg-{i}");
+ ack.Stream.ShouldBe("R5LARGE");
+ ack.Seq.ShouldBe((ulong)(i + 1));
+ }
+
+ var state = await cluster.GetStreamStateAsync("R5LARGE");
+ state.Messages.ShouldBe(20UL);
+ }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterStreamWithManySubjects — jetstream_cluster_4_test.go
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public async Task Stream_with_twenty_subjects_routes_all_correctly()
+ {
+ // Go ref: TestJetStreamClusterStreamWithManySubjects — jetstream_cluster_4_test.go
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ var subjects = Enumerable.Range(1, 20).Select(i => $"topic.{i}").ToArray();
+ var resp = await cluster.CreateStreamAsync("MANYSUBJ", subjects, replicas: 3);
+ resp.Error.ShouldBeNull();
+ resp.StreamInfo!.Config.Subjects.Count.ShouldBe(20);
+
+ // Publish to each subject
+ for (var i = 1; i <= 20; i++)
+ {
+ var ack = await cluster.PublishAsync($"topic.{i}", $"payload-{i}");
+ ack.Stream.ShouldBe("MANYSUBJ");
+ }
+
+ var state = await cluster.GetStreamStateAsync("MANYSUBJ");
+ state.Messages.ShouldBe(20UL);
+ }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterWildcardSubjectStream — jetstream_cluster_4_test.go
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public async Task Stream_with_wildcard_gt_subject_captures_all_sub_subjects()
+ {
+ // Go ref: TestJetStreamClusterWildcardSubjectStream — jetstream_cluster_4_test.go
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ var resp = await cluster.CreateStreamAsync("WILDCARD", [">"], replicas: 3);
+ resp.Error.ShouldBeNull();
+
+ await cluster.PublishAsync("any.subject.here", "msg1");
+ await cluster.PublishAsync("totally.different", "msg2");
+ await cluster.PublishAsync("nested.deep.path.to.leaf", "msg3");
+
+ var state = await cluster.GetStreamStateAsync("WILDCARD");
+ state.Messages.ShouldBe(3UL);
+ }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterPublish1000MessagesToReplicatedStream — jetstream_cluster_4_test.go
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public async Task Publish_1000_messages_to_R3_stream_all_acknowledged()
+ {
+ // Go ref: TestJetStreamClusterPublish1000MessagesToReplicatedStream — jetstream_cluster_4_test.go
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ await cluster.CreateStreamAsync("BIG3", ["big.>"], replicas: 3);
+
+ var lastSeq = 0UL;
+ for (var i = 0; i < 1000; i++)
+ {
+ var ack = await cluster.PublishAsync("big.event", $"msg-{i}");
+ ack.Stream.ShouldBe("BIG3");
+ ack.ErrorCode.ShouldBeNull();
+ lastSeq = ack.Seq;
+ }
+
+ lastSeq.ShouldBe(1000UL);
+
+ var state = await cluster.GetStreamStateAsync("BIG3");
+ state.Messages.ShouldBe(1000UL);
+ state.FirstSeq.ShouldBe(1UL);
+ state.LastSeq.ShouldBe(1000UL);
+ }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterPublish1000MessagesToR1Stream — jetstream_cluster_4_test.go
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public async Task Publish_1000_messages_to_R1_stream_all_acknowledged()
+ {
+ // Go ref: TestJetStreamClusterPublish1000MessagesToR1Stream — jetstream_cluster_4_test.go
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ await cluster.CreateStreamAsync("BIG1", ["b1.>"], replicas: 1);
+
+ for (var i = 0; i < 1000; i++)
+ {
+ var ack = await cluster.PublishAsync("b1.event", $"msg-{i}");
+ ack.Stream.ShouldBe("BIG1");
+ ack.ErrorCode.ShouldBeNull();
+ }
+
+ var state = await cluster.GetStreamStateAsync("BIG1");
+ state.Messages.ShouldBe(1000UL);
+ }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterStreamStateAfter1000Messages — jetstream_cluster_4_test.go
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public async Task Stream_state_accurate_after_1000_messages()
+ {
+ // Go ref: TestJetStreamClusterStreamStateAfter1000Messages — jetstream_cluster_4_test.go
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ await cluster.CreateStreamAsync("STATE1K", ["s1k.>"], replicas: 3);
+
+ for (var i = 0; i < 1000; i++)
+ await cluster.PublishAsync("s1k.data", $"payload-{i}");
+
+ var state = await cluster.GetStreamStateAsync("STATE1K");
+ state.Messages.ShouldBe(1000UL);
+ state.FirstSeq.ShouldBe(1UL);
+ state.LastSeq.ShouldBe(1000UL);
+ state.Bytes.ShouldBeGreaterThan(0UL);
+ }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterMultipleStreamsMixedReplicas — jetstream_cluster_4_test.go
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public async Task Ten_streams_with_mixed_replica_counts_all_independent()
+ {
+ // Go ref: TestJetStreamClusterMultipleStreamsMixedReplicas — jetstream_cluster_4_test.go
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ for (var i = 0; i < 10; i++)
+ {
+ var replicas = (i % 3) + 1;
+ var resp = await cluster.CreateStreamAsync($"MIX{i}", [$"mix{i}.>"], replicas: replicas);
+ resp.Error.ShouldBeNull();
+ }
+
+ // Publish to each stream independently
+ for (var i = 0; i < 10; i++)
+ {
+ var ack = await cluster.PublishAsync($"mix{i}.event", $"stream-{i}-msg");
+ ack.Stream.ShouldBe($"MIX{i}");
+ }
+
+ // Verify each stream has exactly 1 message
+ for (var i = 0; i < 10; i++)
+ {
+ var state = await cluster.GetStreamStateAsync($"MIX{i}");
+ state.Messages.ShouldBe(1UL);
+ }
+ }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterCreatePublishDeleteRecreate — jetstream_cluster_4_test.go
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public async Task Create_publish_delete_recreate_cycle_three_times()
+ {
+ // Go ref: TestJetStreamClusterCreatePublishDeleteRecreate — jetstream_cluster_4_test.go
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ for (var cycle = 0; cycle < 3; cycle++)
+ {
+ // Create stream
+ var create = await cluster.CreateStreamAsync("CYCLE", ["cyc.>"], replicas: 3);
+ create.Error.ShouldBeNull();
+
+ // Publish messages
+ for (var i = 0; i < 5; i++)
+ await cluster.PublishAsync("cyc.event", $"cycle-{cycle}-msg-{i}");
+
+ var state = await cluster.GetStreamStateAsync("CYCLE");
+ state.Messages.ShouldBe(5UL);
+
+ // Delete stream
+ var del = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}CYCLE", "{}");
+ del.Success.ShouldBeTrue();
+ }
+ }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterConsumerOn1000MessageStream — jetstream_cluster_4_test.go
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public async Task Consumer_on_stream_with_1000_messages_fetches_correctly()
+ {
+ // Go ref: TestJetStreamClusterConsumerOn1000MessageStream — jetstream_cluster_4_test.go
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ await cluster.CreateStreamAsync("FETCH1K", ["f1k.>"], replicas: 3);
+
+ for (var i = 0; i < 1000; i++)
+ await cluster.PublishAsync("f1k.event", $"msg-{i}");
+
+ await cluster.CreateConsumerAsync("FETCH1K", "fetcher", filterSubject: "f1k.>");
+
+ var batch = await cluster.FetchAsync("FETCH1K", "fetcher", 100);
+ batch.Messages.Count.ShouldBe(100);
+ batch.Messages[0].Sequence.ShouldBe(1UL);
+ batch.Messages[99].Sequence.ShouldBe(100UL);
+ }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterAckAllFor1000Messages — jetstream_cluster_4_test.go
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public async Task AckAll_for_1000_messages_reduces_pending_to_zero()
+ {
+ // Go ref: TestJetStreamClusterAckAllFor1000Messages — jetstream_cluster_4_test.go
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ await cluster.CreateStreamAsync("ACKBIG", ["ab.>"], replicas: 3);
+ await cluster.CreateConsumerAsync("ACKBIG", "acker", filterSubject: "ab.>",
+ ackPolicy: AckPolicy.All);
+
+ for (var i = 0; i < 1000; i++)
+ await cluster.PublishAsync("ab.event", $"msg-{i}");
+
+ var batch = await cluster.FetchAsync("ACKBIG", "acker", 1000);
+ batch.Messages.Count.ShouldBe(1000);
+
+ // AckAll up to last sequence
+ cluster.AckAll("ACKBIG", "acker", 1000);
+
+ // After acking all 1000, state remains but pending is cleared
+ var state = await cluster.GetStreamStateAsync("ACKBIG");
+ state.Messages.ShouldBe(1000UL);
+ }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterStreamInfoConsistentAfterManyOps — jetstream_cluster_4_test.go
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public async Task Stream_info_consistent_after_many_operations()
+ {
+ // Go ref: TestJetStreamClusterStreamInfoConsistentAfterManyOps — jetstream_cluster_4_test.go
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ await cluster.CreateStreamAsync("INFOCONSIST", ["ic.>"], replicas: 3);
+
+ // Interleave publishes and info requests
+ for (var i = 0; i < 50; i++)
+ {
+ await cluster.PublishAsync("ic.event", $"msg-{i}");
+ var info = await cluster.GetStreamInfoAsync("INFOCONSIST");
+ info.StreamInfo.ShouldNotBeNull();
+ info.StreamInfo!.State.Messages.ShouldBe((ulong)(i + 1));
+ }
+
+ var finalInfo = await cluster.GetStreamInfoAsync("INFOCONSIST");
+ finalInfo.StreamInfo!.Config.Name.ShouldBe("INFOCONSIST");
+ finalInfo.StreamInfo.Config.Replicas.ShouldBe(3);
+ finalInfo.StreamInfo.State.Messages.ShouldBe(50UL);
+ }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterMetaStateAfter10StreamOps — jetstream_cluster_4_test.go
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public async Task Meta_state_after_creating_and_deleting_ten_streams()
+ {
+ // Go ref: TestJetStreamClusterMetaStateAfter10StreamOps — jetstream_cluster_4_test.go
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ for (var i = 0; i < 10; i++)
+ await cluster.CreateStreamAsync($"META{i}", [$"meta{i}.>"], replicas: 3);
+
+ // Delete half
+ for (var i = 0; i < 5; i++)
+ {
+ var del = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}META{i}", "{}");
+ del.Success.ShouldBeTrue();
+ }
+
+ var metaState = cluster.GetMetaState();
+ metaState.ShouldNotBeNull();
+
+ var names = await cluster.RequestAsync(JetStreamApiSubjects.StreamNames, "{}");
+ names.StreamNames.ShouldNotBeNull();
+ names.StreamNames!.Count.ShouldBe(5);
+ for (var i = 5; i < 10; i++)
+ names.StreamNames.ShouldContain($"META{i}");
+ }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterMultipleConsumersIndependentPending — jetstream_cluster_4_test.go
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public async Task Five_consumers_on_same_stream_have_independent_pending()
+ {
+ // Go ref: TestJetStreamClusterMultipleConsumersIndependentPending — jetstream_cluster_4_test.go
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ await cluster.CreateStreamAsync("MULTIDUP", ["md.>"], replicas: 3);
+
+ for (var i = 0; i < 10; i++)
+ await cluster.PublishAsync("md.event", $"msg-{i}");
+
+ for (var c = 0; c < 5; c++)
+ await cluster.CreateConsumerAsync("MULTIDUP", $"consumer{c}", filterSubject: "md.>");
+
+ // Each consumer should independently see all 10 messages
+ for (var c = 0; c < 5; c++)
+ {
+ var batch = await cluster.FetchAsync("MULTIDUP", $"consumer{c}", 10);
+ batch.Messages.Count.ShouldBe(10);
+ batch.Messages[0].Sequence.ShouldBe(1UL);
+ }
+ }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterConsumerWildcardFilter — jetstream_cluster_4_test.go
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public async Task Consumer_with_wildcard_filter_delivers_only_matching_messages()
+ {
+ // Go ref: TestJetStreamClusterConsumerWildcardFilter — jetstream_cluster_4_test.go
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ await cluster.CreateStreamAsync("WFILT", ["wf.>"], replicas: 3);
+ await cluster.CreateConsumerAsync("WFILT", "wildcons", filterSubject: "wf.alpha.>");
+
+ await cluster.PublishAsync("wf.alpha.one", "match1");
+ await cluster.PublishAsync("wf.beta.two", "no-match");
+ await cluster.PublishAsync("wf.alpha.three", "match2");
+ await cluster.PublishAsync("wf.gamma.four", "no-match2");
+ await cluster.PublishAsync("wf.alpha.five", "match3");
+
+ var batch = await cluster.FetchAsync("WFILT", "wildcons", 10);
+ batch.Messages.Count.ShouldBe(3);
+ foreach (var msg in batch.Messages)
+ msg.Subject.ShouldStartWith("wf.alpha.");
+ }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterStreamUpdateAddSubjectsAfterPublish — jetstream_cluster_4_test.go
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public async Task Stream_update_adding_subjects_after_publishes_works()
+ {
+ // Go ref: TestJetStreamClusterStreamUpdateAddSubjectsAfterPublish — jetstream_cluster_4_test.go
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ await cluster.CreateStreamAsync("ADDSUB", ["as.alpha"], replicas: 3);
+
+ for (var i = 0; i < 5; i++)
+ await cluster.PublishAsync("as.alpha", $"msg-{i}");
+
+ var state = await cluster.GetStreamStateAsync("ADDSUB");
+ state.Messages.ShouldBe(5UL);
+
+ // Add more subjects via update
+ var update = cluster.UpdateStream("ADDSUB", ["as.alpha", "as.beta", "as.gamma"], replicas: 3);
+ update.Error.ShouldBeNull();
+ update.StreamInfo!.Config.Subjects.Count.ShouldBe(3);
+ update.StreamInfo.Config.Subjects.ShouldContain("as.beta");
+
+ // Now publish to new subjects
+ await cluster.PublishAsync("as.beta", "beta-msg");
+ await cluster.PublishAsync("as.gamma", "gamma-msg");
+
+ var finalState = await cluster.GetStreamStateAsync("ADDSUB");
+ finalState.Messages.ShouldBe(7UL);
+ }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterStreamPurgeAndRepublish — jetstream_cluster_4_test.go
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public async Task Stream_purge_in_cluster_then_republish_works_correctly()
+ {
+ // Go ref: TestJetStreamClusterStreamPurgeAndRepublish — jetstream_cluster_4_test.go
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ await cluster.CreateStreamAsync("PURGEREP", ["pr.>"], replicas: 3);
+
+ for (var i = 0; i < 100; i++)
+ await cluster.PublishAsync("pr.data", $"msg-{i}");
+
+ var before = await cluster.GetStreamStateAsync("PURGEREP");
+ before.Messages.ShouldBe(100UL);
+
+ // Purge
+ var purge = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamPurge}PURGEREP", "{}");
+ purge.Success.ShouldBeTrue();
+
+ var afterPurge = await cluster.GetStreamStateAsync("PURGEREP");
+ afterPurge.Messages.ShouldBe(0UL);
+
+ // Re-publish
+ for (var i = 0; i < 50; i++)
+ {
+ var ack = await cluster.PublishAsync("pr.data", $"new-msg-{i}");
+ ack.ErrorCode.ShouldBeNull();
+ }
+
+ var final = await cluster.GetStreamStateAsync("PURGEREP");
+ final.Messages.ShouldBe(50UL);
+        // Purge does not reset sequence counters; new messages continue past the purged range
+ final.FirstSeq.ShouldBeGreaterThan(0UL);
+ }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterFetchEmptyAfterPurge — jetstream_cluster_4_test.go
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public async Task Fetch_empty_after_stream_purge()
+ {
+ // Go ref: TestJetStreamClusterFetchEmptyAfterPurge — jetstream_cluster_4_test.go
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ await cluster.CreateStreamAsync("PURGEDRAIN", ["pd.>"], replicas: 3);
+ await cluster.CreateConsumerAsync("PURGEDRAIN", "reader", filterSubject: "pd.>");
+
+ for (var i = 0; i < 20; i++)
+ await cluster.PublishAsync("pd.event", $"msg-{i}");
+
+ // Fetch to advance the consumer
+ var pre = await cluster.FetchAsync("PURGEDRAIN", "reader", 20);
+ pre.Messages.Count.ShouldBe(20);
+
+ // Purge the stream
+ (await cluster.RequestAsync($"{JetStreamApiSubjects.StreamPurge}PURGEDRAIN", "{}")).Success.ShouldBeTrue();
+
+ // Fetch should now return empty
+ var post = await cluster.FetchAsync("PURGEDRAIN", "reader", 20);
+ post.Messages.Count.ShouldBe(0);
+ }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterStreamDeleteCascadesConsumers — jetstream_cluster_4_test.go
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public async Task Stream_delete_cascades_consumer_removal()
+ {
+ // Go ref: TestJetStreamClusterStreamDeleteCascadesConsumers — jetstream_cluster_4_test.go
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ await cluster.CreateStreamAsync("CASCADE", ["cas.>"], replicas: 3);
+ await cluster.CreateConsumerAsync("CASCADE", "c1");
+ await cluster.CreateConsumerAsync("CASCADE", "c2");
+ await cluster.CreateConsumerAsync("CASCADE", "c3");
+
+ // Verify consumers exist
+ var names = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerNames}CASCADE", "{}");
+ names.ConsumerNames!.Count.ShouldBe(3);
+
+ // Delete the stream
+ (await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}CASCADE", "{}")).Success.ShouldBeTrue();
+
+ // Stream no longer exists
+ var info = await cluster.GetStreamInfoAsync("CASCADE");
+ info.Error.ShouldNotBeNull();
+ info.Error!.Code.ShouldBe(404);
+ }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterNodeRemovalPreservesDataReads — jetstream_cluster_4_test.go
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public async Task Node_removal_does_not_affect_stream_data_reads()
+ {
+ // Go ref: TestJetStreamClusterNodeRemovalPreservesDataReads — jetstream_cluster_4_test.go
+ await using var cluster = await JetStreamClusterFixture.StartAsync(5);
+
+ await cluster.CreateStreamAsync("NODEREM", ["nr.>"], replicas: 3);
+
+ for (var i = 0; i < 30; i++)
+ await cluster.PublishAsync("nr.event", $"msg-{i}");
+
+ var before = await cluster.GetStreamStateAsync("NODEREM");
+ before.Messages.ShouldBe(30UL);
+
+ // Simulate removing a node
+ cluster.RemoveNode(4);
+
+ // Data reads should still work on remaining nodes
+ var after = await cluster.GetStreamStateAsync("NODEREM");
+ after.Messages.ShouldBe(30UL);
+ after.LastSeq.ShouldBe(30UL);
+ }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterNodeRestartPreservesLifecycleMarkers — jetstream_cluster_4_test.go
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public async Task Node_restart_records_lifecycle_markers_correctly()
+ {
+ // Go ref: TestJetStreamClusterNodeRestartPreservesLifecycleMarkers — jetstream_cluster_4_test.go
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ await cluster.CreateStreamAsync("RESTART", ["rs.>"], replicas: 3);
+
+ for (var i = 0; i < 10; i++)
+ await cluster.PublishAsync("rs.event", $"msg-{i}");
+
+ // Simulate node removal
+ cluster.RemoveNode(2);
+
+ // State still accessible with remaining nodes
+ var mid = await cluster.GetStreamStateAsync("RESTART");
+ mid.Messages.ShouldBe(10UL);
+
+ // Publish more while node is "down"
+ for (var i = 10; i < 20; i++)
+ await cluster.PublishAsync("rs.event", $"msg-{i}");
+
+ // Simulate node restart
+ cluster.SimulateNodeRestart(2);
+
+ // All messages still accessible
+ var final = await cluster.GetStreamStateAsync("RESTART");
+ final.Messages.ShouldBe(20UL);
+ final.LastSeq.ShouldBe(20UL);
+ }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterLeaderStepdownDuringPublish — jetstream_cluster_4_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    public async Task Leader_stepdown_during_publish_sequence_is_monotonic()
+    {
+        // Go ref: TestJetStreamClusterLeaderStepdownDuringPublish — jetstream_cluster_4_test.go
+        await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+        await cluster.CreateStreamAsync("SEQSTEP", ["seq.>"], replicas: 3);
+
+        var seqs = new List<ulong>();
+        for (var i = 0; i < 10; i++)
+        {
+            var ack = await cluster.PublishAsync("seq.event", $"msg-{i}");
+            seqs.Add(ack.Seq);
+        }
+
+        // Step down leader mid-stream; publishes after failover must not reuse sequences
+        (await cluster.StepDownStreamLeaderAsync("SEQSTEP")).Success.ShouldBeTrue();
+
+        for (var i = 10; i < 20; i++)
+        {
+            var ack = await cluster.PublishAsync("seq.event", $"msg-{i}");
+            seqs.Add(ack.Seq);
+        }
+
+        // All sequences must be strictly increasing
+        for (var i = 1; i < seqs.Count; i++)
+            seqs[i].ShouldBeGreaterThan(seqs[i - 1]);
+
+        seqs[^1].ShouldBe(20UL);
+    }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterStreamInfoAfterStepdown — jetstream_cluster_4_test.go
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public async Task Stream_info_accurate_after_leader_stepdown_with_many_messages()
+ {
+ // Go ref: TestJetStreamClusterStreamInfoAfterStepdown — jetstream_cluster_4_test.go
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ await cluster.CreateStreamAsync("INFOSD1K", ["isd.>"], replicas: 3);
+
+ for (var i = 0; i < 500; i++)
+ await cluster.PublishAsync("isd.event", $"msg-{i}");
+
+ (await cluster.StepDownStreamLeaderAsync("INFOSD1K")).Success.ShouldBeTrue();
+
+ for (var i = 500; i < 1000; i++)
+ await cluster.PublishAsync("isd.event", $"msg-{i}");
+
+ var info = await cluster.GetStreamInfoAsync("INFOSD1K");
+ info.StreamInfo.ShouldNotBeNull();
+ info.StreamInfo!.State.Messages.ShouldBe(1000UL);
+ info.StreamInfo.State.FirstSeq.ShouldBe(1UL);
+ info.StreamInfo.State.LastSeq.ShouldBe(1000UL);
+ }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterStreamReplicaGroupHasCorrectNodes — jetstream_cluster_4_test.go
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public async Task Replica_group_for_stream_has_correct_node_count()
+ {
+ // Go ref: TestJetStreamClusterStreamReplicaGroupHasCorrectNodes — jetstream_cluster_4_test.go
+ await using var cluster = await JetStreamClusterFixture.StartAsync(5);
+
+ await cluster.CreateStreamAsync("GRPCHECK", ["gc.>"], replicas: 3);
+
+ var group = cluster.GetReplicaGroup("GRPCHECK");
+ group.ShouldNotBeNull();
+ group!.Nodes.Count.ShouldBe(3);
+ group.Leader.ShouldNotBeNull();
+ group.Leader.IsLeader.ShouldBeTrue();
+ }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterConsumerLeaderAfterStreamStepdown — jetstream_cluster_4_test.go
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public async Task Consumer_leader_remains_valid_after_stream_stepdown()
+ {
+ // Go ref: TestJetStreamClusterConsumerLeaderAfterStreamStepdown — jetstream_cluster_4_test.go
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ await cluster.CreateStreamAsync("CONSLEADER", ["cl.>"], replicas: 3);
+ await cluster.CreateConsumerAsync("CONSLEADER", "durable1");
+
+ var leaderBefore = cluster.GetConsumerLeaderId("CONSLEADER", "durable1");
+ leaderBefore.ShouldNotBeNullOrWhiteSpace();
+
+ (await cluster.StepDownStreamLeaderAsync("CONSLEADER")).Success.ShouldBeTrue();
+
+ var leaderAfter = cluster.GetConsumerLeaderId("CONSLEADER", "durable1");
+ leaderAfter.ShouldNotBeNullOrWhiteSpace();
+ }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterWaitOnStreamLeaderAfterCreation — jetstream_cluster_4_test.go
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public async Task WaitOnStreamLeader_resolves_immediately_for_existing_stream()
+ {
+ // Go ref: TestJetStreamClusterWaitOnStreamLeaderAfterCreation — jetstream_cluster_4_test.go
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ await cluster.CreateStreamAsync("WLEADER", ["wl.>"], replicas: 3);
+
+ // Should complete immediately, no timeout
+ await cluster.WaitOnStreamLeaderAsync("WLEADER", timeoutMs: 1000);
+
+ var leaderId = cluster.GetStreamLeaderId("WLEADER");
+ leaderId.ShouldNotBeNullOrWhiteSpace();
+ }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterConsumerWaitOnLeaderAfterCreation — jetstream_cluster_4_test.go
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public async Task WaitOnConsumerLeader_resolves_for_existing_consumer()
+ {
+ // Go ref: TestJetStreamClusterConsumerWaitOnLeaderAfterCreation — jetstream_cluster_4_test.go
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ await cluster.CreateStreamAsync("WCLEADER2", ["wcl2.>"], replicas: 3);
+ await cluster.CreateConsumerAsync("WCLEADER2", "dur-wc");
+
+ await cluster.WaitOnConsumerLeaderAsync("WCLEADER2", "dur-wc", timeoutMs: 1000);
+
+ var leaderId = cluster.GetConsumerLeaderId("WCLEADER2", "dur-wc");
+ leaderId.ShouldNotBeNullOrWhiteSpace();
+ }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterAccountInfoAfterBatchDelete — jetstream_cluster_4_test.go
+ // ---------------------------------------------------------------
+
+ [Fact]
+ public async Task Account_info_reflects_accurate_stream_count_after_batch_delete()
+ {
+ // Go ref: TestJetStreamClusterAccountInfoAfterBatchDelete — jetstream_cluster_4_test.go
+ await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+ for (var i = 0; i < 8; i++)
+ await cluster.CreateStreamAsync($"BATCH{i}", [$"batch{i}.>"], replicas: 3);
+
+ var pre = await cluster.RequestAsync(JetStreamApiSubjects.Info, "{}");
+ pre.AccountInfo!.Streams.ShouldBe(8);
+
+ // Delete 3 streams
+ for (var i = 0; i < 3; i++)
+ (await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}BATCH{i}", "{}")).Success.ShouldBeTrue();
+
+ var post = await cluster.RequestAsync(JetStreamApiSubjects.Info, "{}");
+ post.AccountInfo!.Streams.ShouldBe(5);
+ }
+}
diff --git a/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterLongRunningTests.cs b/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterLongRunningTests.cs
new file mode 100644
index 0000000..4fed37a
--- /dev/null
+++ b/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterLongRunningTests.cs
@@ -0,0 +1,502 @@
+// Go ref: TestJetStreamClusterXxx — jetstream_cluster_long_test.go
+// Covers: high-volume publish/consume cycles, many sequential fetches, many consumers,
+// many streams, repeated publish-ack-fetch cycles, stepdowns during publishing,
+// alternating publish+stepdown, create-publish-delete sequences, ack tracking across
+// failovers, batch-1 iteration, mixed multi-stream operations, rapid meta stepdowns,
+// large R1 message volumes, max-messages stream limits, consumer pending correctness.
+using System.Text;
+using NATS.Server.JetStream.Api;
+using NATS.Server.JetStream.Cluster;
+using NATS.Server.JetStream.Models;
+
+namespace NATS.Server.Tests.JetStream.Cluster;
+
+/// <summary>
+/// Long-running JetStream cluster tests covering high-volume scenarios,
+/// repeated failover cycles, many-stream/many-consumer environments, and
+/// limit enforcement under sustained load.
+/// Ported from Go jetstream_cluster_long_test.go.
+/// All tests are marked [Trait("Category", "LongRunning")].
+/// </summary>
+public class JsClusterLongRunningTests
+{
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterLong5000MessagesR3 — jetstream_cluster_long_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "LongRunning")]
+    public async Task Five_thousand_messages_in_R3_stream_maintain_consistency()
+    {
+        // Go ref: TestJetStreamClusterLong5000MessagesR3 — jetstream_cluster_long_test.go
+        await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+        await cluster.CreateStreamAsync("LONG5K", ["long5k.>"], replicas: 3);
+
+        // Publish 5000 messages; every ack must succeed and carry the next
+        // contiguous stream sequence (1-based).
+        for (var n = 0; n < 5000; n++)
+        {
+            var pubAck = await cluster.PublishAsync("long5k.data", $"msg-{n}");
+            pubAck.ErrorCode.ShouldBeNull();
+            pubAck.Seq.ShouldBe((ulong)(n + 1));
+        }
+
+        // Final stream state covers the full range with no gaps or loss.
+        var finalState = await cluster.GetStreamStateAsync("LONG5K");
+        finalState.Messages.ShouldBe(5000UL);
+        finalState.FirstSeq.ShouldBe(1UL);
+        finalState.LastSeq.ShouldBe(5000UL);
+    }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterLong100SequentialFetchesOf50 — jetstream_cluster_long_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "LongRunning")]
+    public async Task One_hundred_sequential_fetches_of_fifty_messages_each()
+    {
+        // Go ref: TestJetStreamClusterLong100SequentialFetchesOf50 — jetstream_cluster_long_test.go
+        await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+        await cluster.CreateStreamAsync("SEQFETCH", ["sf.>"], replicas: 3);
+        await cluster.CreateConsumerAsync("SEQFETCH", "batcher", filterSubject: "sf.>");
+
+        // Pre-publish 5000 messages so the consumer starts with a full backlog.
+        for (var n = 0; n < 5000; n++)
+            await cluster.PublishAsync("sf.event", $"msg-{n}");
+
+        // Drain the stream in 100 fixed-size fetches of 50 messages each.
+        var consumed = 0;
+        for (var round = 0; round < 100; round++)
+        {
+            var fetched = await cluster.FetchAsync("SEQFETCH", "batcher", 50);
+            fetched.Messages.Count.ShouldBe(50);
+            consumed += fetched.Messages.Count;
+
+            // Within a batch, sequences must advance by exactly one.
+            for (var k = 1; k < fetched.Messages.Count; k++)
+                fetched.Messages[k].Sequence.ShouldBe(fetched.Messages[k - 1].Sequence + 1);
+        }
+
+        // Exactly the full backlog was consumed across all rounds.
+        consumed.ShouldBe(5000);
+    }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterLong50ConsumersOnSameStream — jetstream_cluster_long_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "LongRunning")]
+    public async Task Fifty_consumers_on_same_stream_all_see_all_messages()
+    {
+        // Go ref: TestJetStreamClusterLong50ConsumersOnSameStream — jetstream_cluster_long_test.go
+        await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+        await cluster.CreateStreamAsync("FIFTYCONSUMERS", ["fc.>"], replicas: 3);
+
+        // Load the stream first so every consumer starts with a full backlog.
+        for (var n = 0; n < 100; n++)
+            await cluster.PublishAsync("fc.event", $"msg-{n}");
+
+        for (var idx = 0; idx < 50; idx++)
+            await cluster.CreateConsumerAsync("FIFTYCONSUMERS", $"cons{idx}", filterSubject: "fc.>");
+
+        // Every consumer has an independent cursor and must see all 100 messages.
+        for (var idx = 0; idx < 50; idx++)
+        {
+            var fetched = await cluster.FetchAsync("FIFTYCONSUMERS", $"cons{idx}", 100);
+            fetched.Messages.Count.ShouldBe(100);
+        }
+    }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterLong20StreamsIn5NodeCluster — jetstream_cluster_long_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "LongRunning")]
+    public async Task Twenty_streams_in_five_node_cluster_are_independent()
+    {
+        // Go ref: TestJetStreamClusterLong20StreamsIn5NodeCluster — jetstream_cluster_long_test.go
+        await using var cluster = await JetStreamClusterFixture.StartAsync(5);
+
+        const int streamCount = 20;
+
+        for (var s = 0; s < streamCount; s++)
+            await cluster.CreateStreamAsync($"IND{s}", [$"ind{s}.>"], replicas: 3);
+
+        // Publish ten messages into every stream.
+        for (var s = 0; s < streamCount; s++)
+        {
+            for (var m = 0; m < 10; m++)
+                await cluster.PublishAsync($"ind{s}.event", $"stream{s}-msg{m}");
+        }
+
+        // Each stream must count only its own traffic.
+        for (var s = 0; s < streamCount; s++)
+            (await cluster.GetStreamStateAsync($"IND{s}")).Messages.ShouldBe(10UL);
+
+        // Account-level info agrees on the total stream count.
+        var info = await cluster.RequestAsync(JetStreamApiSubjects.Info, "{}");
+        info.AccountInfo!.Streams.ShouldBe(streamCount);
+    }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterLongPublishAckFetchCycle100Times — jetstream_cluster_long_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "LongRunning")]
+    public async Task Publish_ack_fetch_cycle_repeated_100_times()
+    {
+        // Go ref: TestJetStreamClusterLongPublishAckFetchCycle100Times — jetstream_cluster_long_test.go
+        await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+        await cluster.CreateStreamAsync("PAFCYCLE", ["paf.>"], replicas: 3);
+        await cluster.CreateConsumerAsync("PAFCYCLE", "cycler", filterSubject: "paf.>",
+            ackPolicy: AckPolicy.All);
+
+        // Each round publishes one message, fetches it back, and acks it;
+        // the fetched sequence must match the publish ack's sequence.
+        for (var round = 0; round < 100; round++)
+        {
+            var pubAck = await cluster.PublishAsync("paf.event", $"cycle-{round}");
+            pubAck.ErrorCode.ShouldBeNull();
+
+            var fetched = await cluster.FetchAsync("PAFCYCLE", "cycler", 1);
+            fetched.Messages.Count.ShouldBe(1);
+            fetched.Messages[0].Sequence.ShouldBe(pubAck.Seq);
+
+            cluster.AckAll("PAFCYCLE", "cycler", pubAck.Seq);
+        }
+
+        // All 100 published messages remain counted in stream state after acking.
+        var finalState = await cluster.GetStreamStateAsync("PAFCYCLE");
+        finalState.Messages.ShouldBe(100UL);
+    }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterLong10StepdownsDuringPublish — jetstream_cluster_long_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "LongRunning")]
+    public async Task Ten_stepdowns_during_continuous_publish_preserve_all_messages()
+    {
+        // Go ref: TestJetStreamClusterLong10StepdownsDuringPublish — jetstream_cluster_long_test.go
+        await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+        await cluster.CreateStreamAsync("STEPDURINGPUB", ["sdp.>"], replicas: 3);
+
+        // Ten rounds of (50 publishes + 1 leader stepdown) = 500 messages total.
+        var published = 0;
+        for (var round = 0; round < 10; round++)
+        {
+            for (var m = 0; m < 50; m++)
+            {
+                var pubAck = await cluster.PublishAsync("sdp.event", $"batch{round}-msg{m}");
+                pubAck.ErrorCode.ShouldBeNull();
+                published++;
+            }
+
+            var stepdown = await cluster.StepDownStreamLeaderAsync("STEPDURINGPUB");
+            stepdown.Success.ShouldBeTrue();
+        }
+
+        // No acked message may be lost across any of the failovers.
+        var finalState = await cluster.GetStreamStateAsync("STEPDURINGPUB");
+        finalState.Messages.ShouldBe((ulong)published);
+        finalState.LastSeq.ShouldBe((ulong)published);
+    }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterLongAlternatingPublishAndStepdown — jetstream_cluster_long_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "LongRunning")]
+    public async Task Alternating_publish_and_stepdown_20_iterations_preserves_monotonic_sequence()
+    {
+        // Go ref: TestJetStreamClusterLongAlternatingPublishAndStepdown — jetstream_cluster_long_test.go
+        await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+        await cluster.CreateStreamAsync("ALTPUBSD", ["aps.>"], replicas: 3);
+
+        // Record every acked sequence. The element type is ulong, matching
+        // ack.Seq and the 20UL assertion below. (Fix: the type argument was
+        // missing from the List construction, which does not compile.)
+        var allSeqs = new List<ulong>();
+
+        for (var iter = 0; iter < 20; iter++)
+        {
+            var ack = await cluster.PublishAsync("aps.event", $"iter-{iter}");
+            ack.ErrorCode.ShouldBeNull();
+            allSeqs.Add(ack.Seq);
+
+            (await cluster.StepDownStreamLeaderAsync("ALTPUBSD")).Success.ShouldBeTrue();
+        }
+
+        // Verify strictly monotonically increasing sequences across all stepdowns
+        for (var i = 1; i < allSeqs.Count; i++)
+            allSeqs[i].ShouldBeGreaterThan(allSeqs[i - 1]);
+
+        // Exactly 20 publishes with no duplicates: the final sequence is 20.
+        allSeqs[^1].ShouldBe(20UL);
+    }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterLongCreatePublishDelete20Streams — jetstream_cluster_long_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "LongRunning")]
+    public async Task Create_publish_delete_20_streams_sequentially()
+    {
+        // Go ref: TestJetStreamClusterLongCreatePublishDelete20Streams — jetstream_cluster_long_test.go
+        await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+        // Full lifecycle per stream: create, fill with 10 messages, verify, delete.
+        for (var s = 0; s < 20; s++)
+        {
+            var name = $"SEQ{s}";
+
+            var created = await cluster.CreateStreamAsync(name, [$"seq{s}.>"], replicas: 3);
+            created.Error.ShouldBeNull();
+
+            for (var m = 0; m < 10; m++)
+                await cluster.PublishAsync($"seq{s}.event", $"msg-{m}");
+
+            (await cluster.GetStreamStateAsync(name)).Messages.ShouldBe(10UL);
+
+            var deleted = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}{name}", "{}");
+            deleted.Success.ShouldBeTrue();
+        }
+
+        // After the last cycle the account must report zero streams.
+        var info = await cluster.RequestAsync(JetStreamApiSubjects.Info, "{}");
+        info.AccountInfo!.Streams.ShouldBe(0);
+    }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterLongConsumerAckAfter10Failovers — jetstream_cluster_long_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "LongRunning")]
+    public async Task Consumer_ack_tracking_correct_after_ten_leader_failovers()
+    {
+        // Go ref: TestJetStreamClusterLongConsumerAckAfter10Failovers — jetstream_cluster_long_test.go
+        await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+        await cluster.CreateStreamAsync("ACKFAIL", ["af.>"], replicas: 3);
+        await cluster.CreateConsumerAsync("ACKFAIL", "tracker", filterSubject: "af.>",
+            ackPolicy: AckPolicy.All);
+
+        // Pre-publish the full backlog of 100 messages.
+        for (var n = 0; n < 100; n++)
+            await cluster.PublishAsync("af.event", $"msg-{n}");
+
+        // Ten rounds: fetch 10, ack through the batch's last sequence, then
+        // force a stream-leader failover before the next round.
+        var highestAcked = 0UL;
+        for (var round = 0; round < 10; round++)
+        {
+            var fetched = await cluster.FetchAsync("ACKFAIL", "tracker", 10);
+            fetched.Messages.Count.ShouldBe(10);
+
+            var last = fetched.Messages[^1].Sequence;
+            cluster.AckAll("ACKFAIL", "tracker", last);
+            highestAcked = last;
+
+            (await cluster.StepDownStreamLeaderAsync("ACKFAIL")).Success.ShouldBeTrue();
+        }
+
+        // The ack floor must have advanced through the entire stream.
+        highestAcked.ShouldBe(100UL);
+    }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterLongFetchBatch1Iterated500Times — jetstream_cluster_long_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "LongRunning")]
+    public async Task Fetch_with_batch_1_iterated_500_times_reads_all_messages()
+    {
+        // Go ref: TestJetStreamClusterLongFetchBatch1Iterated500Times — jetstream_cluster_long_test.go
+        await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+        await cluster.CreateStreamAsync("BATCH1ITER", ["b1i.>"], replicas: 3);
+        await cluster.CreateConsumerAsync("BATCH1ITER", "one_at_a_time", filterSubject: "b1i.>");
+
+        for (var i = 0; i < 500; i++)
+            await cluster.PublishAsync("b1i.event", $"msg-{i}");
+
+        // Read the stream back one message per fetch, recording each ulong
+        // sequence. (Fix: the type argument was missing from the List
+        // construction, which does not compile.)
+        var allSeqs = new List<ulong>();
+        for (var i = 0; i < 500; i++)
+        {
+            var batch = await cluster.FetchAsync("BATCH1ITER", "one_at_a_time", 1);
+            batch.Messages.Count.ShouldBe(1);
+            allSeqs.Add(batch.Messages[0].Sequence);
+        }
+
+        // All 500 sequences read, strictly increasing
+        allSeqs.Count.ShouldBe(500);
+        for (var i = 1; i < allSeqs.Count; i++)
+            allSeqs[i].ShouldBeGreaterThan(allSeqs[i - 1]);
+    }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterLongMixedMultiStreamOps — jetstream_cluster_long_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "LongRunning")]
+    public async Task Mixed_ops_five_streams_100_messages_each_consumers_fetch_all()
+    {
+        // Go ref: TestJetStreamClusterLongMixedMultiStreamOps — jetstream_cluster_long_test.go
+        await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+        const int streamCount = 5;
+
+        // Create the streams.
+        for (var s = 0; s < streamCount; s++)
+            await cluster.CreateStreamAsync($"MIXED{s}", [$"mixed{s}.>"], replicas: 3);
+
+        // Publish 100 messages into each stream.
+        for (var s = 0; s < streamCount; s++)
+        {
+            for (var m = 0; m < 100; m++)
+                await cluster.PublishAsync($"mixed{s}.event", $"stream{s}-msg{m}");
+        }
+
+        // One consumer per stream.
+        for (var s = 0; s < streamCount; s++)
+            await cluster.CreateConsumerAsync($"MIXED{s}", $"reader{s}", filterSubject: $"mixed{s}.>");
+
+        // Every consumer drains its own stream completely, sequences 1..100.
+        for (var s = 0; s < streamCount; s++)
+        {
+            var fetched = await cluster.FetchAsync($"MIXED{s}", $"reader{s}", 100);
+            fetched.Messages.Count.ShouldBe(100);
+            fetched.Messages[0].Sequence.ShouldBe(1UL);
+            fetched.Messages[^1].Sequence.ShouldBe(100UL);
+        }
+
+        // Account totals agree with what was created.
+        var info = await cluster.RequestAsync(JetStreamApiSubjects.Info, "{}");
+        info.AccountInfo!.Streams.ShouldBe(streamCount);
+        info.AccountInfo.Consumers.ShouldBe(streamCount);
+    }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterLongRapidMetaStepdowns — jetstream_cluster_long_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "LongRunning")]
+    public async Task Rapid_meta_stepdowns_20_times_all_streams_remain_accessible()
+    {
+        // Go ref: TestJetStreamClusterLongRapidMetaStepdowns — jetstream_cluster_long_test.go
+        await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+        // Create streams before stepdowns
+        for (var i = 0; i < 5; i++)
+            await cluster.CreateStreamAsync($"RAPID{i}", [$"rapid{i}.>"], replicas: 3);
+
+        // Perform 20 rapid meta stepdowns, asserting that the leadership
+        // version strictly increases after every stepdown. (Fix: the original
+        // accumulated versions into a List with a missing type argument, which
+        // does not compile; comparing previous vs. current pairwise is the same
+        // monotonicity check and avoids naming LeadershipVersion's numeric
+        // type, which is declared on the meta-state model.)
+        var previousVersion = cluster.GetMetaState()!.LeadershipVersion;
+        for (var sd = 0; sd < 20; sd++)
+        {
+            cluster.StepDownMetaLeader();
+            var currentVersion = cluster.GetMetaState()!.LeadershipVersion;
+            currentVersion.ShouldBeGreaterThan(previousVersion);
+            previousVersion = currentVersion;
+        }
+
+        // All streams still accessible
+        var names = await cluster.RequestAsync(JetStreamApiSubjects.StreamNames, "{}");
+        names.StreamNames!.Count.ShouldBe(5);
+        for (var i = 0; i < 5; i++)
+            names.StreamNames.ShouldContain($"RAPID{i}");
+    }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterLong10000MessagesR1 — jetstream_cluster_long_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "LongRunning")]
+    public async Task Ten_thousand_small_messages_in_R1_stream()
+    {
+        // Go ref: TestJetStreamClusterLong10000MessagesR1 — jetstream_cluster_long_test.go
+        await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+        // Single-replica stream hosted inside a three-node cluster.
+        await cluster.CreateStreamAsync("R1HUGE", ["r1h.>"], replicas: 1);
+
+        // Every one of the 10000 publishes must be positively acked.
+        for (var n = 0; n < 10000; n++)
+            (await cluster.PublishAsync("r1h.event", $"x{n}")).ErrorCode.ShouldBeNull();
+
+        // The stream stores the full volume with a matching last sequence.
+        var finalState = await cluster.GetStreamStateAsync("R1HUGE");
+        finalState.Messages.ShouldBe(10000UL);
+        finalState.LastSeq.ShouldBe(10000UL);
+    }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterLongMaxMessagesLimit — jetstream_cluster_long_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "LongRunning")]
+    public async Task Stream_with_max_messages_100_has_exactly_100_after_1000_publishes()
+    {
+        // Go ref: TestJetStreamClusterLongMaxMessagesLimit — jetstream_cluster_long_test.go
+        await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+        // R3 stream capped at 100 messages.
+        var config = new StreamConfig
+        {
+            Name = "MAXLIMIT",
+            Subjects = ["ml.>"],
+            Replicas = 3,
+            MaxMsgs = 100,
+        };
+        cluster.CreateStreamDirect(config);
+
+        for (var n = 0; n < 1000; n++)
+            await cluster.PublishAsync("ml.event", $"msg-{n}");
+
+        // MaxMsgs=100: only the latest 100 messages retained (old ones discarded)
+        var state = await cluster.GetStreamStateAsync("MAXLIMIT");
+        state.Messages.ShouldBeLessThanOrEqualTo(100UL);
+        state.Messages.ShouldBeGreaterThan(0UL);
+    }
+
+ // ---------------------------------------------------------------
+ // Go ref: TestJetStreamClusterLongConsumerPendingWithMaxMessages — jetstream_cluster_long_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "LongRunning")]
+    public async Task Consumer_on_max_messages_stream_tracks_correct_pending()
+    {
+        // Go ref: TestJetStreamClusterLongConsumerPendingWithMaxMessages — jetstream_cluster_long_test.go
+        await using var cluster = await JetStreamClusterFixture.StartAsync(3);
+
+        // R3 stream capped at 50 messages.
+        var config = new StreamConfig
+        {
+            Name = "MAXPEND",
+            Subjects = ["mp.>"],
+            Replicas = 3,
+            MaxMsgs = 50,
+        };
+        cluster.CreateStreamDirect(config);
+
+        // Publish 200 messages (150 will be evicted by MaxMsgs)
+        for (var n = 0; n < 200; n++)
+            await cluster.PublishAsync("mp.event", $"msg-{n}");
+
+        // Stream retains at most 50 messages
+        var state = await cluster.GetStreamStateAsync("MAXPEND");
+        state.Messages.ShouldBeLessThanOrEqualTo(50UL);
+
+        // Create consumer after publishes (starts at current first seq)
+        await cluster.CreateConsumerAsync("MAXPEND", "latecons", filterSubject: "mp.>",
+            ackPolicy: AckPolicy.None);
+
+        // Consumer should see only retained messages
+        var fetched = await cluster.FetchAsync("MAXPEND", "latecons", 100);
+        ((ulong)fetched.Messages.Count).ShouldBeLessThanOrEqualTo(state.Messages);
+    }
+}