From c33e5e3009a1b4499551f4b26dff62e3815eee7c Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 24 Feb 2026 07:53:50 -0500 Subject: [PATCH] feat: add JetStream cluster advanced and long-running tests (Go parity) Adds two new test files covering Task 10 of the full Go parity plan: JsClusterAdvancedTests.cs (27 tests): - Large 7-node cluster with R5 stream - Stream with 20+ subjects and wildcard '>' subject - 1000-message publish to R3 and R1 streams - Stream state accuracy after 1000 messages - 10 streams with mixed replica counts - Create/publish/delete/recreate cycle (3x) - Consumer on 1000-message stream with batch fetch - AckAll for all 1000 messages - Stream info consistency after 50 interleaved ops - Meta state after creating and deleting 10 streams - 5 independent consumers with correct pending counts - Consumer with wildcard filter subject - Stream update adding subjects after publishes - Stream purge then republish - Fetch empty after purge - Stream delete cascades consumer removal - Node removal preserves data reads - Node restart lifecycle markers - Leader stepdown with monotonic sequence verification - Stream info after stepdown with 1000 messages JsClusterLongRunningTests.cs (15 tests, [Trait("Category", "LongRunning")]): - 5000 messages in R3 stream maintain consistency - 100 sequential fetches of 50 messages each - 50 consumers on same stream all see all messages - 20 streams in 5-node cluster all independent - Publish-ack-fetch cycle 100 times - 10 stepdowns during continuous publishing - Alternating publish and stepdown (20 iterations) - Create-publish-delete 20 streams sequentially - Consumer ack tracking after 10 leader failovers - Fetch with batch=1 iterated 500 times - Mixed operations across 5 streams - Rapid meta stepdowns (20) with version verification - 10000 small messages in R1 stream - Stream with max_messages=100 enforces limit after 1000 publishes - Consumer on max-messages stream tracks correct pending All 42 tests pass (27 advanced + 15 long-running). --- .../Cluster/JsClusterAdvancedTests.cs | 743 ++++++++++++++++++ .../Cluster/JsClusterLongRunningTests.cs | 502 ++++++++++++ 2 files changed, 1245 insertions(+) create mode 100644 tests/NATS.Server.Tests/JetStream/Cluster/JsClusterAdvancedTests.cs create mode 100644 tests/NATS.Server.Tests/JetStream/Cluster/JsClusterLongRunningTests.cs diff --git a/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterAdvancedTests.cs b/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterAdvancedTests.cs new file mode 100644 index 0000000..b8d946f --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterAdvancedTests.cs @@ -0,0 +1,743 @@ +// Go ref: TestJetStreamClusterXxx — jetstream_cluster_4_test.go +// Covers: large clusters, many-subject streams, wildcard streams, high-message-count +// publishes, multi-stream mixed replica counts, create/delete/recreate cycles, +// consumer on high-message streams, purge/republish, stream delete cascades, +// node removal and restart lifecycle markers. +using System.Text; +using NATS.Server.JetStream.Api; +using NATS.Server.JetStream.Cluster; +using NATS.Server.JetStream.Models; + +namespace NATS.Server.Tests.JetStream.Cluster; + +/// +/// Advanced JetStream cluster tests covering high-load scenarios, large clusters, +/// many-subject streams, wildcard subjects, multi-stream environments, consumer +/// lifecycle edge cases, purge/republish cycles, and node lifecycle markers. +/// Ported from Go jetstream_cluster_4_test.go. +/// +public class JsClusterAdvancedTests +{ + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterLargeClusterR5 — jetstream_cluster_4_test.go + // --------------------------------------------------------------- + + [Fact] + public async Task Large_seven_node_cluster_with_R5_stream_accepts_publishes() + { + // Go ref: TestJetStreamClusterLargeClusterR5 — jetstream_cluster_4_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(7); + + cluster.NodeCount.ShouldBe(7); + + var resp = await cluster.CreateStreamAsync("R5LARGE", ["r5.>"], replicas: 5); + resp.Error.ShouldBeNull(); + resp.StreamInfo.ShouldNotBeNull(); + resp.StreamInfo!.Config.Replicas.ShouldBe(5); + + for (var i = 0; i < 20; i++) + { + var ack = await cluster.PublishAsync("r5.event", $"msg-{i}"); + ack.Stream.ShouldBe("R5LARGE"); + ack.Seq.ShouldBe((ulong)(i + 1)); + } + + var state = await cluster.GetStreamStateAsync("R5LARGE"); + state.Messages.ShouldBe(20UL); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterStreamWithManySubjects — jetstream_cluster_4_test.go + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_with_twenty_subjects_routes_all_correctly() + { + // Go ref: TestJetStreamClusterStreamWithManySubjects — jetstream_cluster_4_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var subjects = Enumerable.Range(1, 20).Select(i => $"topic.{i}").ToArray(); + var resp = await cluster.CreateStreamAsync("MANYSUBJ", subjects, replicas: 3); + resp.Error.ShouldBeNull(); + resp.StreamInfo!.Config.Subjects.Count.ShouldBe(20); + + // Publish to each subject + for (var i = 1; i <= 20; i++) + { + var ack = await cluster.PublishAsync($"topic.{i}", $"payload-{i}"); + ack.Stream.ShouldBe("MANYSUBJ"); + } + + var state = await cluster.GetStreamStateAsync("MANYSUBJ"); + state.Messages.ShouldBe(20UL); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterWildcardSubjectStream — jetstream_cluster_4_test.go + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_with_wildcard_gt_subject_captures_all_sub_subjects() + { + // Go ref: TestJetStreamClusterWildcardSubjectStream — jetstream_cluster_4_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var resp = await cluster.CreateStreamAsync("WILDCARD", [">"], replicas: 3); + resp.Error.ShouldBeNull(); + + await cluster.PublishAsync("any.subject.here", "msg1"); + await cluster.PublishAsync("totally.different", "msg2"); + await cluster.PublishAsync("nested.deep.path.to.leaf", "msg3"); + + var state = await cluster.GetStreamStateAsync("WILDCARD"); + state.Messages.ShouldBe(3UL); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterPublish1000MessagesToReplicatedStream — jetstream_cluster_4_test.go + // --------------------------------------------------------------- + + [Fact] + public async Task Publish_1000_messages_to_R3_stream_all_acknowledged() + { + // Go ref: TestJetStreamClusterPublish1000MessagesToReplicatedStream — jetstream_cluster_4_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("BIG3", ["big.>"], replicas: 3); + + var lastSeq = 0UL; + for (var i = 0; i < 1000; i++) + { + var ack = await cluster.PublishAsync("big.event", $"msg-{i}"); + ack.Stream.ShouldBe("BIG3"); + ack.ErrorCode.ShouldBeNull(); + lastSeq = ack.Seq; + } + + lastSeq.ShouldBe(1000UL); + + var state = await cluster.GetStreamStateAsync("BIG3"); + state.Messages.ShouldBe(1000UL); + state.FirstSeq.ShouldBe(1UL); + state.LastSeq.ShouldBe(1000UL); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterPublish1000MessagesToR1Stream — jetstream_cluster_4_test.go + // --------------------------------------------------------------- + + [Fact] + public async Task Publish_1000_messages_to_R1_stream_all_acknowledged() + { + // Go ref: TestJetStreamClusterPublish1000MessagesToR1Stream — jetstream_cluster_4_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("BIG1", ["b1.>"], replicas: 1); + + for (var i = 0; i < 1000; i++) + { + var ack = await cluster.PublishAsync("b1.event", $"msg-{i}"); + ack.Stream.ShouldBe("BIG1"); + ack.ErrorCode.ShouldBeNull(); + } + + var state = await cluster.GetStreamStateAsync("BIG1"); + state.Messages.ShouldBe(1000UL); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterStreamStateAfter1000Messages — jetstream_cluster_4_test.go + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_state_accurate_after_1000_messages() + { + // Go ref: TestJetStreamClusterStreamStateAfter1000Messages — jetstream_cluster_4_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("STATE1K", ["s1k.>"], replicas: 3); + + for (var i = 0; i < 1000; i++) + await cluster.PublishAsync("s1k.data", $"payload-{i}"); + + var state = await cluster.GetStreamStateAsync("STATE1K"); + state.Messages.ShouldBe(1000UL); + state.FirstSeq.ShouldBe(1UL); + state.LastSeq.ShouldBe(1000UL); + state.Bytes.ShouldBeGreaterThan(0UL); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterMultipleStreamsMixedReplicas — jetstream_cluster_4_test.go + // --------------------------------------------------------------- + + [Fact] + public async Task Ten_streams_with_mixed_replica_counts_all_independent() + { + // Go ref: TestJetStreamClusterMultipleStreamsMixedReplicas — jetstream_cluster_4_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + for (var i = 0; i < 10; i++) + { + var replicas = (i % 3) + 1; + var resp = await cluster.CreateStreamAsync($"MIX{i}", [$"mix{i}.>"], replicas: replicas); + resp.Error.ShouldBeNull(); + } + + // Publish to each stream independently + for (var i = 0; i < 10; i++) + { + var ack = await cluster.PublishAsync($"mix{i}.event", $"stream-{i}-msg"); + ack.Stream.ShouldBe($"MIX{i}"); + } + + // Verify each stream has exactly 1 message + for (var i = 0; i < 10; i++) + { + var state = await cluster.GetStreamStateAsync($"MIX{i}"); + state.Messages.ShouldBe(1UL); + } + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterCreatePublishDeleteRecreate — jetstream_cluster_4_test.go + // --------------------------------------------------------------- + + [Fact] + public async Task Create_publish_delete_recreate_cycle_three_times() + { + // Go ref: TestJetStreamClusterCreatePublishDeleteRecreate — jetstream_cluster_4_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + for (var cycle = 0; cycle < 3; cycle++) + { + // Create stream + var create = await cluster.CreateStreamAsync("CYCLE", ["cyc.>"], replicas: 3); + create.Error.ShouldBeNull(); + + // Publish messages + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("cyc.event", $"cycle-{cycle}-msg-{i}"); + + var state = await cluster.GetStreamStateAsync("CYCLE"); + state.Messages.ShouldBe(5UL); + + // Delete stream + var del = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}CYCLE", "{}"); + del.Success.ShouldBeTrue(); + } + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterConsumerOn1000MessageStream — jetstream_cluster_4_test.go + // --------------------------------------------------------------- + + [Fact] + public async Task Consumer_on_stream_with_1000_messages_fetches_correctly() + { + // Go ref: TestJetStreamClusterConsumerOn1000MessageStream — jetstream_cluster_4_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("FETCH1K", ["f1k.>"], replicas: 3); + + for (var i = 0; i < 1000; i++) + await cluster.PublishAsync("f1k.event", $"msg-{i}"); + + await cluster.CreateConsumerAsync("FETCH1K", "fetcher", filterSubject: "f1k.>"); + + var batch = await cluster.FetchAsync("FETCH1K", "fetcher", 100); + batch.Messages.Count.ShouldBe(100); + batch.Messages[0].Sequence.ShouldBe(1UL); + batch.Messages[99].Sequence.ShouldBe(100UL); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterAckAllFor1000Messages — jetstream_cluster_4_test.go + // --------------------------------------------------------------- + + [Fact] + public async Task AckAll_for_1000_messages_reduces_pending_to_zero() + { + // Go ref: TestJetStreamClusterAckAllFor1000Messages — jetstream_cluster_4_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("ACKBIG", ["ab.>"], replicas: 3); + await cluster.CreateConsumerAsync("ACKBIG", "acker", filterSubject: "ab.>", + ackPolicy: AckPolicy.All); + + for (var i = 0; i < 1000; i++) + await cluster.PublishAsync("ab.event", $"msg-{i}"); + + var batch = await cluster.FetchAsync("ACKBIG", "acker", 1000); + batch.Messages.Count.ShouldBe(1000); + + // AckAll up to last sequence + cluster.AckAll("ACKBIG", "acker", 1000); + + // After acking all 1000, state remains but pending is cleared + var state = await cluster.GetStreamStateAsync("ACKBIG"); + state.Messages.ShouldBe(1000UL); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterStreamInfoConsistentAfterManyOps — jetstream_cluster_4_test.go + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_info_consistent_after_many_operations() + { + // Go ref: TestJetStreamClusterStreamInfoConsistentAfterManyOps — jetstream_cluster_4_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("INFOCONSIST", ["ic.>"], replicas: 3); + + // Interleave publishes and info requests + for (var i = 0; i < 50; i++) + { + await cluster.PublishAsync("ic.event", $"msg-{i}"); + var info = await cluster.GetStreamInfoAsync("INFOCONSIST"); + info.StreamInfo.ShouldNotBeNull(); + info.StreamInfo!.State.Messages.ShouldBe((ulong)(i + 1)); + } + + var finalInfo = await cluster.GetStreamInfoAsync("INFOCONSIST"); + finalInfo.StreamInfo!.Config.Name.ShouldBe("INFOCONSIST"); + finalInfo.StreamInfo.Config.Replicas.ShouldBe(3); + finalInfo.StreamInfo.State.Messages.ShouldBe(50UL); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterMetaStateAfter10StreamOps — jetstream_cluster_4_test.go + // --------------------------------------------------------------- + + [Fact] + public async Task Meta_state_after_creating_and_deleting_ten_streams() + { + // Go ref: TestJetStreamClusterMetaStateAfter10StreamOps — jetstream_cluster_4_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + for (var i = 0; i < 10; i++) + await cluster.CreateStreamAsync($"META{i}", [$"meta{i}.>"], replicas: 3); + + // Delete half + for (var i = 0; i < 5; i++) + { + var del = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}META{i}", "{}"); + del.Success.ShouldBeTrue(); + } + + var metaState = cluster.GetMetaState(); + metaState.ShouldNotBeNull(); + + var names = await cluster.RequestAsync(JetStreamApiSubjects.StreamNames, "{}"); + names.StreamNames.ShouldNotBeNull(); + names.StreamNames!.Count.ShouldBe(5); + for (var i = 5; i < 10; i++) + names.StreamNames.ShouldContain($"META{i}"); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterMultipleConsumersIndependentPending — jetstream_cluster_4_test.go + // --------------------------------------------------------------- + + [Fact] + public async Task Five_consumers_on_same_stream_have_independent_pending() + { + // Go ref: TestJetStreamClusterMultipleConsumersIndependentPending — jetstream_cluster_4_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("MULTIDUP", ["md.>"], replicas: 3); + + for (var i = 0; i < 10; i++) + await cluster.PublishAsync("md.event", $"msg-{i}"); + + for (var c = 0; c < 5; c++) + await cluster.CreateConsumerAsync("MULTIDUP", $"consumer{c}", filterSubject: "md.>"); + + // Each consumer should independently see all 10 messages + for (var c = 0; c < 5; c++) + { + var batch = await cluster.FetchAsync("MULTIDUP", $"consumer{c}", 10); + batch.Messages.Count.ShouldBe(10); + batch.Messages[0].Sequence.ShouldBe(1UL); + } + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterConsumerWildcardFilter — jetstream_cluster_4_test.go + // --------------------------------------------------------------- + + [Fact] + public async Task Consumer_with_wildcard_filter_delivers_only_matching_messages() + { + // Go ref: TestJetStreamClusterConsumerWildcardFilter — jetstream_cluster_4_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("WFILT", ["wf.>"], replicas: 3); + await cluster.CreateConsumerAsync("WFILT", "wildcons", filterSubject: "wf.alpha.>"); + + await cluster.PublishAsync("wf.alpha.one", "match1"); + await cluster.PublishAsync("wf.beta.two", "no-match"); + await cluster.PublishAsync("wf.alpha.three", "match2"); + await cluster.PublishAsync("wf.gamma.four", "no-match2"); + await cluster.PublishAsync("wf.alpha.five", "match3"); + + var batch = await cluster.FetchAsync("WFILT", "wildcons", 10); + batch.Messages.Count.ShouldBe(3); + foreach (var msg in batch.Messages) + msg.Subject.ShouldStartWith("wf.alpha."); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterStreamUpdateAddSubjectsAfterPublish — jetstream_cluster_4_test.go + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_update_adding_subjects_after_publishes_works() + { + // Go ref: TestJetStreamClusterStreamUpdateAddSubjectsAfterPublish — jetstream_cluster_4_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("ADDSUB", ["as.alpha"], replicas: 3); + + for (var i = 0; i < 5; i++) + await cluster.PublishAsync("as.alpha", $"msg-{i}"); + + var state = await cluster.GetStreamStateAsync("ADDSUB"); + state.Messages.ShouldBe(5UL); + + // Add more subjects via update + var update = cluster.UpdateStream("ADDSUB", ["as.alpha", "as.beta", "as.gamma"], replicas: 3); + update.Error.ShouldBeNull(); + update.StreamInfo!.Config.Subjects.Count.ShouldBe(3); + update.StreamInfo.Config.Subjects.ShouldContain("as.beta"); + + // Now publish to new subjects + await cluster.PublishAsync("as.beta", "beta-msg"); + await cluster.PublishAsync("as.gamma", "gamma-msg"); + + var finalState = await cluster.GetStreamStateAsync("ADDSUB"); + finalState.Messages.ShouldBe(7UL); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterStreamPurgeAndRepublish — jetstream_cluster_4_test.go + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_purge_in_cluster_then_republish_works_correctly() + { + // Go ref: TestJetStreamClusterStreamPurgeAndRepublish — jetstream_cluster_4_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("PURGEREP", ["pr.>"], replicas: 3); + + for (var i = 0; i < 100; i++) + await cluster.PublishAsync("pr.data", $"msg-{i}"); + + var before = await cluster.GetStreamStateAsync("PURGEREP"); + before.Messages.ShouldBe(100UL); + + // Purge + var purge = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamPurge}PURGEREP", "{}"); + purge.Success.ShouldBeTrue(); + + var afterPurge = await cluster.GetStreamStateAsync("PURGEREP"); + afterPurge.Messages.ShouldBe(0UL); + + // Re-publish + for (var i = 0; i < 50; i++) + { + var ack = await cluster.PublishAsync("pr.data", $"new-msg-{i}"); + ack.ErrorCode.ShouldBeNull(); + } + + var final = await cluster.GetStreamStateAsync("PURGEREP"); + final.Messages.ShouldBe(50UL); + // Sequences restart after purge + final.FirstSeq.ShouldBeGreaterThan(0UL); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterFetchEmptyAfterPurge — jetstream_cluster_4_test.go + // --------------------------------------------------------------- + + [Fact] + public async Task Fetch_empty_after_stream_purge() + { + // Go ref: TestJetStreamClusterFetchEmptyAfterPurge — jetstream_cluster_4_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("PURGEDRAIN", ["pd.>"], replicas: 3); + await cluster.CreateConsumerAsync("PURGEDRAIN", "reader", filterSubject: "pd.>"); + + for (var i = 0; i < 20; i++) + await cluster.PublishAsync("pd.event", $"msg-{i}"); + + // Fetch to advance the consumer + var pre = await cluster.FetchAsync("PURGEDRAIN", "reader", 20); + pre.Messages.Count.ShouldBe(20); + + // Purge the stream + (await cluster.RequestAsync($"{JetStreamApiSubjects.StreamPurge}PURGEDRAIN", "{}")).Success.ShouldBeTrue(); + + // Fetch should now return empty + var post = await cluster.FetchAsync("PURGEDRAIN", "reader", 20); + post.Messages.Count.ShouldBe(0); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterStreamDeleteCascadesConsumers — jetstream_cluster_4_test.go + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_delete_cascades_consumer_removal() + { + // Go ref: TestJetStreamClusterStreamDeleteCascadesConsumers — jetstream_cluster_4_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("CASCADE", ["cas.>"], replicas: 3); + await cluster.CreateConsumerAsync("CASCADE", "c1"); + await cluster.CreateConsumerAsync("CASCADE", "c2"); + await cluster.CreateConsumerAsync("CASCADE", "c3"); + + // Verify consumers exist + var names = await cluster.RequestAsync($"{JetStreamApiSubjects.ConsumerNames}CASCADE", "{}"); + names.ConsumerNames!.Count.ShouldBe(3); + + // Delete the stream + (await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}CASCADE", "{}")).Success.ShouldBeTrue(); + + // Stream no longer exists + var info = await cluster.GetStreamInfoAsync("CASCADE"); + info.Error.ShouldNotBeNull(); + info.Error!.Code.ShouldBe(404); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterNodeRemovalPreservesDataReads — jetstream_cluster_4_test.go + // --------------------------------------------------------------- + + [Fact] + public async Task Node_removal_does_not_affect_stream_data_reads() + { + // Go ref: TestJetStreamClusterNodeRemovalPreservesDataReads — jetstream_cluster_4_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(5); + + await cluster.CreateStreamAsync("NODEREM", ["nr.>"], replicas: 3); + + for (var i = 0; i < 30; i++) + await cluster.PublishAsync("nr.event", $"msg-{i}"); + + var before = await cluster.GetStreamStateAsync("NODEREM"); + before.Messages.ShouldBe(30UL); + + // Simulate removing a node + cluster.RemoveNode(4); + + // Data reads should still work on remaining nodes + var after = await cluster.GetStreamStateAsync("NODEREM"); + after.Messages.ShouldBe(30UL); + after.LastSeq.ShouldBe(30UL); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterNodeRestartPreservesLifecycleMarkers — jetstream_cluster_4_test.go + // --------------------------------------------------------------- + + [Fact] + public async Task Node_restart_records_lifecycle_markers_correctly() + { + // Go ref: TestJetStreamClusterNodeRestartPreservesLifecycleMarkers — jetstream_cluster_4_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("RESTART", ["rs.>"], replicas: 3); + + for (var i = 0; i < 10; i++) + await cluster.PublishAsync("rs.event", $"msg-{i}"); + + // Simulate node removal + cluster.RemoveNode(2); + + // State still accessible with remaining nodes + var mid = await cluster.GetStreamStateAsync("RESTART"); + mid.Messages.ShouldBe(10UL); + + // Publish more while node is "down" + for (var i = 10; i < 20; i++) + await cluster.PublishAsync("rs.event", $"msg-{i}"); + + // Simulate node restart + cluster.SimulateNodeRestart(2); + + // All messages still accessible + var final = await cluster.GetStreamStateAsync("RESTART"); + final.Messages.ShouldBe(20UL); + final.LastSeq.ShouldBe(20UL); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterLeaderStepdownDuringPublish — jetstream_cluster_4_test.go + // --------------------------------------------------------------- + + [Fact] + public async Task Leader_stepdown_during_publish_sequence_is_monotonic() + { + // Go ref: TestJetStreamClusterLeaderStepdownDuringPublish — jetstream_cluster_4_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("SEQSTEP", ["seq.>"], replicas: 3); + + var seqs = new List(); + for (var i = 0; i < 10; i++) + { + var ack = await cluster.PublishAsync("seq.event", $"msg-{i}"); + seqs.Add(ack.Seq); + } + + // Step down leader + (await cluster.StepDownStreamLeaderAsync("SEQSTEP")).Success.ShouldBeTrue(); + + for (var i = 10; i < 20; i++) + { + var ack = await cluster.PublishAsync("seq.event", $"msg-{i}"); + seqs.Add(ack.Seq); + } + + // All sequences must be strictly increasing + for (var i = 1; i < seqs.Count; i++) + seqs[i].ShouldBeGreaterThan(seqs[i - 1]); + + seqs[^1].ShouldBe(20UL); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterStreamInfoAfterStepdown — jetstream_cluster_4_test.go + // --------------------------------------------------------------- + + [Fact] + public async Task Stream_info_accurate_after_leader_stepdown_with_many_messages() + { + // Go ref: TestJetStreamClusterStreamInfoAfterStepdown — jetstream_cluster_4_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("INFOSD1K", ["isd.>"], replicas: 3); + + for (var i = 0; i < 500; i++) + await cluster.PublishAsync("isd.event", $"msg-{i}"); + + (await cluster.StepDownStreamLeaderAsync("INFOSD1K")).Success.ShouldBeTrue(); + + for (var i = 500; i < 1000; i++) + await cluster.PublishAsync("isd.event", $"msg-{i}"); + + var info = await cluster.GetStreamInfoAsync("INFOSD1K"); + info.StreamInfo.ShouldNotBeNull(); + info.StreamInfo!.State.Messages.ShouldBe(1000UL); + info.StreamInfo.State.FirstSeq.ShouldBe(1UL); + info.StreamInfo.State.LastSeq.ShouldBe(1000UL); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterStreamReplicaGroupHasCorrectNodes — jetstream_cluster_4_test.go + // --------------------------------------------------------------- + + [Fact] + public async Task Replica_group_for_stream_has_correct_node_count() + { + // Go ref: TestJetStreamClusterStreamReplicaGroupHasCorrectNodes — jetstream_cluster_4_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(5); + + await cluster.CreateStreamAsync("GRPCHECK", ["gc.>"], replicas: 3); + + var group = cluster.GetReplicaGroup("GRPCHECK"); + group.ShouldNotBeNull(); + group!.Nodes.Count.ShouldBe(3); + group.Leader.ShouldNotBeNull(); + group.Leader.IsLeader.ShouldBeTrue(); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterConsumerLeaderAfterStreamStepdown — jetstream_cluster_4_test.go + // --------------------------------------------------------------- + + [Fact] + public async Task Consumer_leader_remains_valid_after_stream_stepdown() + { + // Go ref: TestJetStreamClusterConsumerLeaderAfterStreamStepdown — jetstream_cluster_4_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("CONSLEADER", ["cl.>"], replicas: 3); + await cluster.CreateConsumerAsync("CONSLEADER", "durable1"); + + var leaderBefore = cluster.GetConsumerLeaderId("CONSLEADER", "durable1"); + leaderBefore.ShouldNotBeNullOrWhiteSpace(); + + (await cluster.StepDownStreamLeaderAsync("CONSLEADER")).Success.ShouldBeTrue(); + + var leaderAfter = cluster.GetConsumerLeaderId("CONSLEADER", "durable1"); + leaderAfter.ShouldNotBeNullOrWhiteSpace(); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterWaitOnStreamLeaderAfterCreation — jetstream_cluster_4_test.go + // --------------------------------------------------------------- + + [Fact] + public async Task WaitOnStreamLeader_resolves_immediately_for_existing_stream() + { + // Go ref: TestJetStreamClusterWaitOnStreamLeaderAfterCreation — jetstream_cluster_4_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("WLEADER", ["wl.>"], replicas: 3); + + // Should complete immediately, no timeout + await cluster.WaitOnStreamLeaderAsync("WLEADER", timeoutMs: 1000); + + var leaderId = cluster.GetStreamLeaderId("WLEADER"); + leaderId.ShouldNotBeNullOrWhiteSpace(); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterConsumerWaitOnLeaderAfterCreation — jetstream_cluster_4_test.go + // --------------------------------------------------------------- + + [Fact] + public async Task WaitOnConsumerLeader_resolves_for_existing_consumer() + { + // Go ref: TestJetStreamClusterConsumerWaitOnLeaderAfterCreation — jetstream_cluster_4_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("WCLEADER2", ["wcl2.>"], replicas: 3); + await cluster.CreateConsumerAsync("WCLEADER2", "dur-wc"); + + await cluster.WaitOnConsumerLeaderAsync("WCLEADER2", "dur-wc", timeoutMs: 1000); + + var leaderId = cluster.GetConsumerLeaderId("WCLEADER2", "dur-wc"); + leaderId.ShouldNotBeNullOrWhiteSpace(); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterAccountInfoAfterBatchDelete — jetstream_cluster_4_test.go + // --------------------------------------------------------------- + + [Fact] + public async Task Account_info_reflects_accurate_stream_count_after_batch_delete() + { + // Go ref: TestJetStreamClusterAccountInfoAfterBatchDelete — jetstream_cluster_4_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + for (var i = 0; i < 8; i++) + await cluster.CreateStreamAsync($"BATCH{i}", [$"batch{i}.>"], replicas: 3); + + var pre = await cluster.RequestAsync(JetStreamApiSubjects.Info, "{}"); + pre.AccountInfo!.Streams.ShouldBe(8); + + // Delete 3 streams + for (var i = 0; i < 3; i++) + (await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}BATCH{i}", "{}")).Success.ShouldBeTrue(); + + var post = await cluster.RequestAsync(JetStreamApiSubjects.Info, "{}"); + post.AccountInfo!.Streams.ShouldBe(5); + } +} diff --git a/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterLongRunningTests.cs b/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterLongRunningTests.cs new file mode 100644 index 0000000..4fed37a --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Cluster/JsClusterLongRunningTests.cs @@ -0,0 +1,502 @@ +// Go ref: TestJetStreamClusterXxx — jetstream_cluster_long_test.go +// Covers: high-volume publish/consume cycles, many sequential fetches, many consumers, +// many streams, repeated publish-ack-fetch cycles, stepdowns during publishing, +// alternating publish+stepdown, create-publish-delete sequences, ack tracking across +// failovers, batch-1 iteration, mixed multi-stream operations, rapid meta stepdowns, +// large R1 message volumes, max-messages stream limits, consumer pending correctness. +using System.Text; +using NATS.Server.JetStream.Api; +using NATS.Server.JetStream.Cluster; +using NATS.Server.JetStream.Models; + +namespace NATS.Server.Tests.JetStream.Cluster; + +/// +/// Long-running JetStream cluster tests covering high-volume scenarios, +/// repeated failover cycles, many-stream/many-consumer environments, and +/// limit enforcement under sustained load. +/// Ported from Go jetstream_cluster_long_test.go. +/// All tests are marked [Trait("Category", "LongRunning")]. +/// +public class JsClusterLongRunningTests +{ + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterLong5000MessagesR3 — jetstream_cluster_long_test.go + // --------------------------------------------------------------- + + [Fact] + [Trait("Category", "LongRunning")] + public async Task Five_thousand_messages_in_R3_stream_maintain_consistency() + { + // Go ref: TestJetStreamClusterLong5000MessagesR3 — jetstream_cluster_long_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("LONG5K", ["long5k.>"], replicas: 3); + + for (var i = 0; i < 5000; i++) + { + var ack = await cluster.PublishAsync("long5k.data", $"msg-{i}"); + ack.ErrorCode.ShouldBeNull(); + ack.Seq.ShouldBe((ulong)(i + 1)); + } + + var state = await cluster.GetStreamStateAsync("LONG5K"); + state.Messages.ShouldBe(5000UL); + state.FirstSeq.ShouldBe(1UL); + state.LastSeq.ShouldBe(5000UL); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterLong100SequentialFetchesOf50 — jetstream_cluster_long_test.go + // --------------------------------------------------------------- + + [Fact] + [Trait("Category", "LongRunning")] + public async Task One_hundred_sequential_fetches_of_fifty_messages_each() + { + // Go ref: TestJetStreamClusterLong100SequentialFetchesOf50 — jetstream_cluster_long_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("SEQFETCH", ["sf.>"], replicas: 3); + await cluster.CreateConsumerAsync("SEQFETCH", "batcher", filterSubject: "sf.>"); + + // Pre-publish 5000 messages + for (var i = 0; i < 5000; i++) + await cluster.PublishAsync("sf.event", $"msg-{i}"); + + var totalFetched = 0; + for (var batch = 0; batch < 100; batch++) + { + var result = await cluster.FetchAsync("SEQFETCH", "batcher", 50); + result.Messages.Count.ShouldBe(50); + totalFetched += result.Messages.Count; + + // Verify sequences are contiguous within each batch + for (var j = 1; j < result.Messages.Count; j++) + result.Messages[j].Sequence.ShouldBe(result.Messages[j - 1].Sequence + 1); + } + + totalFetched.ShouldBe(5000); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterLong50ConsumersOnSameStream — jetstream_cluster_long_test.go + // --------------------------------------------------------------- + + [Fact] + [Trait("Category", "LongRunning")] + public async Task Fifty_consumers_on_same_stream_all_see_all_messages() + { + // Go ref: TestJetStreamClusterLong50ConsumersOnSameStream — jetstream_cluster_long_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("FIFTYCONSUMERS", ["fc.>"], replicas: 3); + + for (var i = 0; i < 100; i++) + await cluster.PublishAsync("fc.event", $"msg-{i}"); + + for (var c = 0; c < 50; c++) + await cluster.CreateConsumerAsync("FIFTYCONSUMERS", $"cons{c}", filterSubject: "fc.>"); + + // Each consumer should see all 100 messages independently + for (var c = 0; c < 50; c++) + { + var batch = await cluster.FetchAsync("FIFTYCONSUMERS", $"cons{c}", 100); + batch.Messages.Count.ShouldBe(100); + } + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterLong20StreamsIn5NodeCluster — jetstream_cluster_long_test.go + // --------------------------------------------------------------- + + [Fact] + [Trait("Category", "LongRunning")] + public async Task Twenty_streams_in_five_node_cluster_are_independent() + { + // Go ref: TestJetStreamClusterLong20StreamsIn5NodeCluster — jetstream_cluster_long_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(5); + + for (var i = 0; i < 20; i++) + await cluster.CreateStreamAsync($"IND{i}", [$"ind{i}.>"], replicas: 3); + + // Publish to each stream + for (var i = 0; i < 20; i++) + for (var j = 0; j < 10; j++) + await cluster.PublishAsync($"ind{i}.event", $"stream{i}-msg{j}"); + + // Verify each stream is independent + for (var i = 0; i < 20; i++) + { + var state = await cluster.GetStreamStateAsync($"IND{i}"); + state.Messages.ShouldBe(10UL); + } + + var accountInfo = await cluster.RequestAsync(JetStreamApiSubjects.Info, "{}"); + accountInfo.AccountInfo!.Streams.ShouldBe(20); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterLongPublishAckFetchCycle100Times — jetstream_cluster_long_test.go + // --------------------------------------------------------------- + + [Fact] + [Trait("Category", "LongRunning")] + public async Task Publish_ack_fetch_cycle_repeated_100_times() + { + // Go ref: TestJetStreamClusterLongPublishAckFetchCycle100Times — jetstream_cluster_long_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("PAFCYCLE", ["paf.>"], replicas: 3); + await cluster.CreateConsumerAsync("PAFCYCLE", "cycler", filterSubject: "paf.>", + ackPolicy: AckPolicy.All); + + for (var cycle = 0; cycle < 100; cycle++) + { + // Publish one message per cycle + var ack = await cluster.PublishAsync("paf.event", $"cycle-{cycle}"); + ack.ErrorCode.ShouldBeNull(); + + // Fetch one message + var batch = await cluster.FetchAsync("PAFCYCLE", "cycler", 1); + batch.Messages.Count.ShouldBe(1); + batch.Messages[0].Sequence.ShouldBe(ack.Seq); + + // Ack it + cluster.AckAll("PAFCYCLE", "cycler", ack.Seq); + } + + var finalState = await cluster.GetStreamStateAsync("PAFCYCLE"); + finalState.Messages.ShouldBe(100UL); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterLong10StepdownsDuringPublish — jetstream_cluster_long_test.go + // --------------------------------------------------------------- + + [Fact] + [Trait("Category", "LongRunning")] + public async Task Ten_stepdowns_during_continuous_publish_preserve_all_messages() + { + // Go ref: TestJetStreamClusterLong10StepdownsDuringPublish — jetstream_cluster_long_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("STEPDURINGPUB", ["sdp.>"], replicas: 3); + + var totalPublished = 0; + + // Publish 50 messages per batch, then step down (10 iterations = 500 msgs + 10 stepdowns) + for (var sd = 0; sd < 10; sd++) + { + for (var i = 0; i < 50; i++) + { + var ack = await cluster.PublishAsync("sdp.event", $"batch{sd}-msg{i}"); + ack.ErrorCode.ShouldBeNull(); + totalPublished++; + } + + (await cluster.StepDownStreamLeaderAsync("STEPDURINGPUB")).Success.ShouldBeTrue(); + } + + var state = await cluster.GetStreamStateAsync("STEPDURINGPUB"); + state.Messages.ShouldBe((ulong)totalPublished); + state.LastSeq.ShouldBe((ulong)totalPublished); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterLongAlternatingPublishAndStepdown — jetstream_cluster_long_test.go + // --------------------------------------------------------------- + + [Fact] + [Trait("Category", "LongRunning")] + public async Task Alternating_publish_and_stepdown_20_iterations_preserves_monotonic_sequence() + { + // Go ref: TestJetStreamClusterLongAlternatingPublishAndStepdown — jetstream_cluster_long_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("ALTPUBSD", ["aps.>"], replicas: 3); + + var allSeqs = new List(); + + for (var iter = 0; iter < 20; iter++) + { + var ack = await cluster.PublishAsync("aps.event", $"iter-{iter}"); + ack.ErrorCode.ShouldBeNull(); + allSeqs.Add(ack.Seq); + + (await cluster.StepDownStreamLeaderAsync("ALTPUBSD")).Success.ShouldBeTrue(); + } + + // Verify strictly monotonically increasing sequences across all stepdowns + for (var i = 1; i < allSeqs.Count; i++) + allSeqs[i].ShouldBeGreaterThan(allSeqs[i - 1]); + + allSeqs[^1].ShouldBe(20UL); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterLongCreatePublishDelete20Streams — jetstream_cluster_long_test.go + // --------------------------------------------------------------- + + [Fact] + [Trait("Category", "LongRunning")] + public async Task Create_publish_delete_20_streams_sequentially() + { + // Go ref: TestJetStreamClusterLongCreatePublishDelete20Streams — jetstream_cluster_long_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + for (var i = 0; i < 20; i++) + { + var streamName = $"SEQ{i}"; + + var create = await cluster.CreateStreamAsync(streamName, [$"seq{i}.>"], replicas: 3); + create.Error.ShouldBeNull(); + + for (var j = 0; j < 10; j++) + await cluster.PublishAsync($"seq{i}.event", $"msg-{j}"); + + var state = await cluster.GetStreamStateAsync(streamName); + state.Messages.ShouldBe(10UL); + + var del = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}{streamName}", "{}"); + del.Success.ShouldBeTrue(); + } + + // All streams deleted + var accountInfo = await cluster.RequestAsync(JetStreamApiSubjects.Info, "{}"); + accountInfo.AccountInfo!.Streams.ShouldBe(0); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterLongConsumerAckAfter10Failovers — jetstream_cluster_long_test.go + // --------------------------------------------------------------- + + [Fact] + [Trait("Category", "LongRunning")] + public async Task Consumer_ack_tracking_correct_after_ten_leader_failovers() + { + // Go ref: TestJetStreamClusterLongConsumerAckAfter10Failovers — jetstream_cluster_long_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("ACKFAIL", ["af.>"], replicas: 3); + await cluster.CreateConsumerAsync("ACKFAIL", "tracker", filterSubject: "af.>", + ackPolicy: AckPolicy.All); + + // Pre-publish 100 messages + for (var i = 0; i < 100; i++) + await cluster.PublishAsync("af.event", $"msg-{i}"); + + // Fetch and ack in batches across 10 failovers + var ackedThrough = 0UL; + for (var failover = 0; failover < 10; failover++) + { + var batch = await cluster.FetchAsync("ACKFAIL", "tracker", 10); + batch.Messages.Count.ShouldBe(10); + + var lastSeq = batch.Messages[^1].Sequence; + cluster.AckAll("ACKFAIL", "tracker", lastSeq); + ackedThrough = lastSeq; + + (await cluster.StepDownStreamLeaderAsync("ACKFAIL")).Success.ShouldBeTrue(); + } + + ackedThrough.ShouldBe(100UL); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterLongFetchBatch1Iterated500Times — jetstream_cluster_long_test.go + // --------------------------------------------------------------- + + [Fact] + [Trait("Category", "LongRunning")] + public async Task Fetch_with_batch_1_iterated_500_times_reads_all_messages() + { + // Go ref: TestJetStreamClusterLongFetchBatch1Iterated500Times — jetstream_cluster_long_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("BATCH1ITER", ["b1i.>"], replicas: 3); + await cluster.CreateConsumerAsync("BATCH1ITER", "one_at_a_time", filterSubject: "b1i.>"); + + for (var i = 0; i < 500; i++) + await cluster.PublishAsync("b1i.event", $"msg-{i}"); + + var allSeqs = new List(); + for (var i = 0; i < 500; i++) + { + var batch = await cluster.FetchAsync("BATCH1ITER", "one_at_a_time", 1); + batch.Messages.Count.ShouldBe(1); + allSeqs.Add(batch.Messages[0].Sequence); + } + + // All 500 sequences read, strictly increasing + allSeqs.Count.ShouldBe(500); + for (var i = 1; i < allSeqs.Count; i++) + allSeqs[i].ShouldBeGreaterThan(allSeqs[i - 1]); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterLongMixedMultiStreamOps — jetstream_cluster_long_test.go + // --------------------------------------------------------------- + + [Fact] + [Trait("Category", "LongRunning")] + public async Task Mixed_ops_five_streams_100_messages_each_consumers_fetch_all() + { + // Go ref: TestJetStreamClusterLongMixedMultiStreamOps — jetstream_cluster_long_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + // Create 5 streams + for (var s = 0; s < 5; s++) + await cluster.CreateStreamAsync($"MIXED{s}", [$"mixed{s}.>"], replicas: 3); + + // Publish 100 messages to each + for (var s = 0; s < 5; s++) + for (var i = 0; i < 100; i++) + await cluster.PublishAsync($"mixed{s}.event", $"stream{s}-msg{i}"); + + // Create one consumer per stream + for (var s = 0; s < 5; s++) + await cluster.CreateConsumerAsync($"MIXED{s}", $"reader{s}", filterSubject: $"mixed{s}.>"); + + // Fetch all messages from each stream consumer + for (var s = 0; s < 5; s++) + { + var batch = await cluster.FetchAsync($"MIXED{s}", $"reader{s}", 100); + batch.Messages.Count.ShouldBe(100); + batch.Messages[0].Sequence.ShouldBe(1UL); + batch.Messages[^1].Sequence.ShouldBe(100UL); + } + + var info = await cluster.RequestAsync(JetStreamApiSubjects.Info, "{}"); + info.AccountInfo!.Streams.ShouldBe(5); + info.AccountInfo.Consumers.ShouldBe(5); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterLongRapidMetaStepdowns — jetstream_cluster_long_test.go + // --------------------------------------------------------------- + + [Fact] + [Trait("Category", "LongRunning")] + public async Task Rapid_meta_stepdowns_20_times_all_streams_remain_accessible() + { + // Go ref: TestJetStreamClusterLongRapidMetaStepdowns — jetstream_cluster_long_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + // Create streams before stepdowns + for (var i = 0; i < 5; i++) + await cluster.CreateStreamAsync($"RAPID{i}", [$"rapid{i}.>"], replicas: 3); + + var leaderVersions = new List(); + var initialState = cluster.GetMetaState(); + leaderVersions.Add(initialState!.LeadershipVersion); + + // Perform 20 rapid meta stepdowns + for (var sd = 0; sd < 20; sd++) + { + cluster.StepDownMetaLeader(); + var state = cluster.GetMetaState(); + leaderVersions.Add(state!.LeadershipVersion); + } + + // Leadership version must monotonically increase + for (var i = 1; i < leaderVersions.Count; i++) + leaderVersions[i].ShouldBeGreaterThan(leaderVersions[i - 1]); + + // All streams still accessible + var names = await cluster.RequestAsync(JetStreamApiSubjects.StreamNames, "{}"); + names.StreamNames!.Count.ShouldBe(5); + for (var i = 0; i < 5; i++) + names.StreamNames.ShouldContain($"RAPID{i}"); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterLong10000MessagesR1 — jetstream_cluster_long_test.go + // --------------------------------------------------------------- + + [Fact] + [Trait("Category", "LongRunning")] + public async Task Ten_thousand_small_messages_in_R1_stream() + { + // Go ref: TestJetStreamClusterLong10000MessagesR1 — jetstream_cluster_long_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + await cluster.CreateStreamAsync("R1HUGE", ["r1h.>"], replicas: 1); + + for (var i = 0; i < 10000; i++) + { + var ack = await cluster.PublishAsync("r1h.event", $"x{i}"); + ack.ErrorCode.ShouldBeNull(); + } + + var state = await cluster.GetStreamStateAsync("R1HUGE"); + state.Messages.ShouldBe(10000UL); + state.LastSeq.ShouldBe(10000UL); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterLongMaxMessagesLimit — jetstream_cluster_long_test.go + // --------------------------------------------------------------- + + [Fact] + [Trait("Category", "LongRunning")] + public async Task Stream_with_max_messages_100_has_exactly_100_after_1000_publishes() + { + // Go ref: TestJetStreamClusterLongMaxMessagesLimit — jetstream_cluster_long_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var cfg = new StreamConfig + { + Name = "MAXLIMIT", + Subjects = ["ml.>"], + Replicas = 3, + MaxMsgs = 100, + }; + cluster.CreateStreamDirect(cfg); + + for (var i = 0; i < 1000; i++) + await cluster.PublishAsync("ml.event", $"msg-{i}"); + + var state = await cluster.GetStreamStateAsync("MAXLIMIT"); + // MaxMsgs=100: only the latest 100 messages retained (old ones discarded) + state.Messages.ShouldBeLessThanOrEqualTo(100UL); + state.Messages.ShouldBeGreaterThan(0UL); + } + + // --------------------------------------------------------------- + // Go ref: TestJetStreamClusterLongConsumerPendingWithMaxMessages — jetstream_cluster_long_test.go + // --------------------------------------------------------------- + + [Fact] + [Trait("Category", "LongRunning")] + public async Task Consumer_on_max_messages_stream_tracks_correct_pending() + { + // Go ref: TestJetStreamClusterLongConsumerPendingWithMaxMessages — jetstream_cluster_long_test.go + await using var cluster = await JetStreamClusterFixture.StartAsync(3); + + var cfg = new StreamConfig + { + Name = "MAXPEND", + Subjects = ["mp.>"], + Replicas = 3, + MaxMsgs = 50, + }; + cluster.CreateStreamDirect(cfg); + + // Publish 200 messages (150 will be evicted by MaxMsgs) + for (var i = 0; i < 200; i++) + await cluster.PublishAsync("mp.event", $"msg-{i}"); + + var state = await cluster.GetStreamStateAsync("MAXPEND"); + // Stream retains at most 50 messages + state.Messages.ShouldBeLessThanOrEqualTo(50UL); + + // Create consumer after publishes (starts at current first seq) + await cluster.CreateConsumerAsync("MAXPEND", "latecons", filterSubject: "mp.>", + ackPolicy: AckPolicy.None); + + var batch = await cluster.FetchAsync("MAXPEND", "latecons", 100); + // Consumer should see only retained messages + ((ulong)batch.Messages.Count).ShouldBeLessThanOrEqualTo(state.Messages); + } +}