// Copyright 2020-2025 The NATS Authors // Licensed under the Apache License, Version 2.0 // // Ported from golang/nats-server/server/jetstream_cluster_1_test.go // These tests require a running JetStream cluster. They are skipped unless // NATS_INTEGRATION_TESTS=true is set in the environment. using Xunit.Sdk; namespace ZB.MOM.NatsNet.Server.IntegrationTests.JetStream; /// /// Integration tests for JetStream cluster functionality: cluster formation, /// stream replication, consumer state, leader election, and catchup. /// Ported from Go's TestJetStreamCluster* tests (first 118). /// [Trait("Category", "Integration")] public class JetStreamCluster1Tests : IntegrationTestBase { public JetStreamCluster1Tests(ITestOutputHelper output) : base(output) { } // ----------------------------------------------------------------------- // 1. TestJetStreamClusterConfig // ----------------------------------------------------------------------- [SkippableFact] public void ClusterConfig_ShouldRequireServerNameAndClusterName() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); // Verifies that a JetStream cluster node requires server_name and cluster.name. // Corresponds to Go TestJetStreamClusterConfig. using var c = TestCluster.CreateJetStreamCluster(1, "JSC"); c.WaitOnClusterReady(); } // ----------------------------------------------------------------------- // 2. TestJetStreamClusterLeader // ----------------------------------------------------------------------- [SkippableFact] public void ClusterLeader_ShouldElectNewLeaderAfterShutdown() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "JSC"); c.WaitOnLeader(); var leader = c.Leader(); // Kill leader — new leader should be elected. // Kill again — no leader (loss of quorum). c.WaitOnLeader(); } // ----------------------------------------------------------------------- // 3. TestJetStreamClusterExpand // ----------------------------------------------------------------------- [SkippableFact] public void ClusterExpand_ShouldAllowAddingNewServer() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(2, "JSC"); c.WaitOnPeerCount(2); // Add a new server and wait for 3-peer cluster. c.WaitOnPeerCount(3); } // ----------------------------------------------------------------------- // 4. TestJetStreamClusterAccountInfo // ----------------------------------------------------------------------- [SkippableFact] public void ClusterAccountInfo_ShouldReturnSingleResponse() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "JSC"); c.WaitOnLeader(); // Connect and send $JS.API.INFO, expect exactly one response. using var nc = NatsTestClient.ConnectToServer(c.RandomServer()); } // ----------------------------------------------------------------------- // 5. TestJetStreamClusterStreamLimitWithAccountDefaults // ----------------------------------------------------------------------- [SkippableFact] public void ClusterStreamLimitWithAccountDefaults_ShouldEnforceStorageLimits() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); // 2MB memory, 8MB disk limits template. using var c = TestCluster.CreateJetStreamClusterWithTemplate(ConfigHelper.JsClusterTemplate, 3, "R3L"); c.WaitOnLeader(); } // ----------------------------------------------------------------------- // 6. TestJetStreamClusterInfoRaftGroup // ----------------------------------------------------------------------- [SkippableFact] public void ClusterInfoRaftGroup_ShouldIncludeRaftGroupInStreamAndConsumerInfo() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R1S"); c.WaitOnLeader(); // Verify stream info and consumer info include cluster.raft_group field. } // ----------------------------------------------------------------------- // 7. TestJetStreamClusterSingleReplicaStreams // ----------------------------------------------------------------------- [SkippableFact] public void ClusterSingleReplicaStreams_ShouldSurviveLeaderRestart() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R1S"); c.WaitOnLeader(); // Create R=1 stream, publish 10 msgs, kill stream leader, restart, verify stream and consumer still exist. c.WaitOnStreamLeader("$G", "TEST"); } // ----------------------------------------------------------------------- // 8. TestJetStreamClusterMultiReplicaStreams // ----------------------------------------------------------------------- [SkippableFact] public void ClusterMultiReplicaStreams_ShouldReplicateAcrossCluster() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(5, "RNS"); c.WaitOnLeader(); // Create R=3 stream in 5-node cluster, publish 10 msgs, verify state. } // ----------------------------------------------------------------------- // 9. TestJetStreamClusterMultiReplicaStreamsDefaultFileMem // ----------------------------------------------------------------------- [SkippableFact] public void ClusterMultiReplicaStreamsDefaultFileMem_ShouldUseFileStorageByDefault() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); const string testConfig = @" listen: 127.0.0.1:-1 server_name: %s jetstream: {store_dir: '%s'} cluster { name: %s listen: 127.0.0.1:%d routes = [%s] } "; using var c = TestCluster.CreateJetStreamClusterWithTemplate(testConfig, 3, "RNS"); c.WaitOnLeader(); } // ----------------------------------------------------------------------- // 10. TestJetStreamClusterMemoryStore // ----------------------------------------------------------------------- [SkippableFact] public void ClusterMemoryStore_ShouldReplicateMemoryStoredMessages() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3M"); c.WaitOnLeader(); // Create R=3 memory stream, publish 100 msgs, verify cluster info has 2 replicas. } // ----------------------------------------------------------------------- // 11. TestJetStreamClusterDelete // ----------------------------------------------------------------------- [SkippableFact] public void ClusterDelete_ShouldRemoveConsumerAndStream() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "RNS"); c.WaitOnLeader(); // Create stream C22 R=2, add consumer, delete consumer, delete stream, verify account info shows 0 streams. } // ----------------------------------------------------------------------- // 12. TestJetStreamClusterStreamPurge // ----------------------------------------------------------------------- [SkippableFact] public void ClusterStreamPurge_ShouldClearAllMessages() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(5, "R5S"); c.WaitOnLeader(); // Create R=3 stream in 5-node cluster, publish 100, purge, verify state shows 0 msgs. } // ----------------------------------------------------------------------- // 13. TestJetStreamClusterStreamUpdateSubjects // ----------------------------------------------------------------------- [SkippableFact] public void ClusterStreamUpdateSubjects_ShouldUpdateSubjectsSuccessfully() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create stream on {foo, bar}, update to {bar, baz}, verify foo publish fails, baz succeeds. } // ----------------------------------------------------------------------- // 14. TestJetStreamClusterBadStreamUpdate // ----------------------------------------------------------------------- [SkippableFact] public void ClusterBadStreamUpdate_ShouldNotDeleteStreamOnBadConfig() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Attempt to update stream with invalid subject "foo..bar", verify original stream preserved. } // ----------------------------------------------------------------------- // 15. TestJetStreamClusterConsumerRedeliveredInfo // ----------------------------------------------------------------------- [SkippableFact] public void ClusterConsumerRedeliveredInfo_ShouldTrackRedeliveredCount() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Publish 1 msg, subscribe with AckWait=100ms, auto-unsubscribe after 2, // verify NumRedelivered == 1 in ConsumerInfo. } // ----------------------------------------------------------------------- // 16. TestJetStreamClusterConsumerState // ----------------------------------------------------------------------- [SkippableFact] public void ClusterConsumerState_ShouldPreserveStateAfterLeaderChange() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(5, "R3S"); c.WaitOnLeader(); // Create R=3 stream, publish 10, pull-subscribe with "dlc", fetch 5 + ack, // kill consumer leader, wait for new leader, verify AckFloor matches. } // ----------------------------------------------------------------------- // 17. TestJetStreamClusterFullConsumerState // ----------------------------------------------------------------------- [SkippableFact] public void ClusterFullConsumerState_ShouldHandlePurgeWithActiveConsumer() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create R=3 stream, publish 10, pull-subscribe, fetch 1, then purge stream. } // ----------------------------------------------------------------------- // 18. TestJetStreamClusterMetaSnapshotsAndCatchup // ----------------------------------------------------------------------- [SkippableFact] public void ClusterMetaSnapshotsAndCatchup_ShouldCatchupAfterRestart() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Shut one server, create 4 streams, snapshot meta, restart server, // wait for current, delete streams, restart again, verify catchup. } // ----------------------------------------------------------------------- // 19. TestJetStreamClusterMetaSnapshotsMultiChange // ----------------------------------------------------------------------- [SkippableFact] public void ClusterMetaSnapshotsMultiChange_ShouldHandleComplexDeltasOnRestart() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(2, "R3S"); c.WaitOnLeader(); // Add streams/consumers, add new server, shut it, make changes (add S3, delete S2, // delete S1C1, add S1C2), snapshot, restart, verify all current. } // ----------------------------------------------------------------------- // 20. TestJetStreamClusterStreamSynchedTimeStamps // ----------------------------------------------------------------------- [SkippableFact] public void ClusterStreamSynchedTimeStamps_ShouldMaintainTimestampAfterLeaderChange() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Publish to R=3 memory stream, record timestamp, kill stream leader, // fetch msg from new leader, verify timestamps match. } // ----------------------------------------------------------------------- // 21. TestJetStreamClusterRestoreSingleConsumer // ----------------------------------------------------------------------- [SkippableFact] public void ClusterRestoreSingleConsumer_ShouldRestoreAfterFullClusterRestart() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create stream, publish, subscribe durable, ack, stop all, restart all, // verify stream and consumer are restored. } // ----------------------------------------------------------------------- // 22. TestJetStreamClusterMaxBytesForStream // ----------------------------------------------------------------------- [SkippableFact] public void ClusterMaxBytesForStream_ShouldEnforcePerServerStorageLimit() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create R=2 stream with MaxBytes=2GB (ok), then try 4GB (should fail: no suitable peers). } // ----------------------------------------------------------------------- // 23. TestJetStreamClusterStreamPublishWithActiveConsumers // ----------------------------------------------------------------------- [SkippableFact] public void ClusterStreamPublishWithActiveConsumers_ShouldDeliverInOrderAfterLeaderChange() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create R=3 stream, subscribe durable, publish 10 in sequence, verify order, // kill consumer leader, publish 10 more, verify order continues. } // ----------------------------------------------------------------------- // 24. TestJetStreamClusterStreamOverlapSubjects // ----------------------------------------------------------------------- [SkippableFact] public void ClusterStreamOverlapSubjects_ShouldPreventOverlappingSubjects() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3"); c.WaitOnLeader(); // Create TEST on "foo", try to create TEST2 on "foo" — should fail. // Verify only 1 stream in list. } // ----------------------------------------------------------------------- // 25. TestJetStreamClusterStreamInfoList // ----------------------------------------------------------------------- [SkippableFact] public void ClusterStreamInfoList_ShouldReturnCorrectMsgCountsForAllStreams() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create foo(10), bar(22), baz(33), verify StreamsInfo returns correct counts. } // ----------------------------------------------------------------------- // 26. TestJetStreamClusterConsumerInfoList // ----------------------------------------------------------------------- [SkippableFact] public void ClusterConsumerInfoList_ShouldReturnCorrectConsumerStates() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create R=3 stream, publish 10, create 3 pull consumers with different // fetch/ack combos, verify ConsumersInfo returns correct delivered/ackfloor. } // ----------------------------------------------------------------------- // 27. TestJetStreamClusterStreamUpdate // ----------------------------------------------------------------------- [SkippableFact] public void ClusterStreamUpdate_ShouldUpdateMaxMsgsSuccessfully() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create stream MaxMsgs=10, fill it, expect publish failure. // Update MaxMsgs=20 from non-leader, verify success. // Attempt bad update (name mismatch), verify only 1 response. } // ----------------------------------------------------------------------- // 28. TestJetStreamClusterStreamExtendedUpdates // ----------------------------------------------------------------------- [SkippableFact] public void ClusterStreamExtendedUpdates_ShouldAllowSubjectUpdateButNotMirrorChange() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Subjects can be updated. Mirror changes should return JSStreamMirrorNotUpdatableError. } // ----------------------------------------------------------------------- // 29. TestJetStreamClusterDoubleAdd // ----------------------------------------------------------------------- [SkippableFact] public void ClusterDoubleAdd_ShouldBeIdempotentForStreamsAndConsumers() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(2, "R32"); c.WaitOnLeader(); // Add stream twice — should not error. Add consumer twice — should not error. } // ----------------------------------------------------------------------- // 30. TestJetStreamClusterDefaultMaxAckPending // ----------------------------------------------------------------------- [SkippableFact] public void ClusterDefaultMaxAckPending_ShouldSetDefaultAckPendingOnConsumer() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(2, "R32"); c.WaitOnLeader(); // Create consumer with default config, verify MaxAckPending == JsDefaultMaxAckPending (20000). } // ----------------------------------------------------------------------- // 31. TestJetStreamClusterStreamNormalCatchup // ----------------------------------------------------------------------- [SkippableFact] public void ClusterStreamNormalCatchup_ShouldCatchupAfterRejoiningCluster() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Publish 10 msgs, kill stream leader, publish 11 more, delete one, // restart old leader, wait for cluster formed + current. } // ----------------------------------------------------------------------- // 32. TestJetStreamClusterStreamSnapshotCatchup // ----------------------------------------------------------------------- [SkippableFact] public void ClusterStreamSnapshotCatchup_ShouldCatchupViaSnapshotAfterRejoining() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Publish 2, kill stream leader, publish 100, delete 2 msgs, snapshot, // send more, restart old leader, wait for current, verify states match. } // ----------------------------------------------------------------------- // 33. TestJetStreamClusterDeleteMsg // ----------------------------------------------------------------------- [SkippableFact] public void ClusterDeleteMsg_ShouldDeleteMessageAndSupportPurge() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create R=1 stream, publish 10, delete seq 1, purge stream — all should succeed. } // ----------------------------------------------------------------------- // 34. TestJetStreamClusterDeleteMsgAndRestart // ----------------------------------------------------------------------- [SkippableFact] public void ClusterDeleteMsgAndRestart_ShouldSurviveFullRestart() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create R=2 stream, publish 10, delete seq 1, stop all, restart all, // wait for stream leader. } // ----------------------------------------------------------------------- // 35. TestJetStreamClusterStreamSnapshotCatchupWithPurge // ----------------------------------------------------------------------- [SkippableFact] public void ClusterStreamSnapshotCatchupWithPurge_ShouldHandlePurgeDuringCatchup() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(5, "R5S"); c.WaitOnLeader(); // Kill stream leader, publish 10, snapshot, restart old leader, // purge while recovering, wait for current, verify stream info available. } // ----------------------------------------------------------------------- // 36. TestJetStreamClusterExtendedStreamInfo // ----------------------------------------------------------------------- [SkippableFact] public void ClusterExtendedStreamInfo_ShouldIncludeClusterInfoAndReplicas() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create R=3 stream, publish 50, verify StreamInfo contains cluster info, // cluster name, leader, and 2 replicas. Replicas must be ordered. // Kill leader, verify info still correct, restart, verify current. } // ----------------------------------------------------------------------- // 37. TestJetStreamClusterExtendedStreamInfoSingleReplica // ----------------------------------------------------------------------- [SkippableFact] public void ClusterExtendedStreamInfoSingleReplica_ShouldShowNoReplicasForR1Stream() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create R=1 stream, verify cluster info shows 0 replicas. // Verify ConsumersInfo returns 0 initially, 1 after adding consumer. } // ----------------------------------------------------------------------- // 38. TestJetStreamClusterInterestRetention // ----------------------------------------------------------------------- [SkippableFact] public void ClusterInterestRetention_ShouldDeleteMsgsAfterAckWithInterestPolicy() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create Interest-retention R=3 stream, subscribe durable, publish 1, ack, // verify stream goes to 0. Publish 50 more, delete consumer, verify stream goes to 0. } // ----------------------------------------------------------------------- // 39. TestJetStreamClusterWorkQueueRetention // ----------------------------------------------------------------------- [SkippableFact] public void ClusterWorkQueueRetention_ShouldRemoveMsgsAfterAckInWorkQueueMode() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create WorkQueue R=2 stream, publish 1, pull and ack, verify stream goes to 0. } // ----------------------------------------------------------------------- // 40. TestJetStreamClusterMirrorAndSourceWorkQueues // ----------------------------------------------------------------------- [SkippableFact] public void ClusterMirrorAndSourceWorkQueues_ShouldMirrorWorkQueueMessages() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "WQ"); c.WaitOnLeader(); // Create WQ22 WorkQueue, M mirror, S source. Publish 1 to WQ22. // Verify WQ22=0, M=1, S=1 (because mirror/source consume from work queue). } // ----------------------------------------------------------------------- // 41. TestJetStreamClusterMirrorAndSourceInterestPolicyStream // ----------------------------------------------------------------------- [SkippableFact] public void ClusterMirrorAndSourceInterestPolicyStream_ShouldHandleInterestPolicyWithMirrorAndSource() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "WQ"); c.WaitOnLeader(); // Create IP22 Interest stream, mirror M, source S. // Without other interest: IP22=0, M=1, S=1. // After adding subscriber: IP22=1, M=2, S=2. } // ----------------------------------------------------------------------- // 42. TestJetStreamClusterInterestRetentionWithFilteredConsumers // ----------------------------------------------------------------------- [SkippableFact] public void ClusterInterestRetentionWithFilteredConsumers_ShouldTrackPerFilteredConsumer() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create Interest stream on "*", two filtered consumers (foo, bar). // Verify messages are retained until all consumers ack. // Delete consumers, verify stream goes to 0. // Test same with pull consumer. } // ----------------------------------------------------------------------- // 43. TestJetStreamClusterEphemeralConsumerNoImmediateInterest // ----------------------------------------------------------------------- [SkippableFact] public void ClusterEphemeralConsumerNoImmediateInterest_ShouldCleanUpWithoutActiveSubscriber() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create ephemeral consumer with deliver subject "r", set inactive threshold to 500ms, // verify consumer disappears within 5s. } // ----------------------------------------------------------------------- // 44. TestJetStreamClusterEphemeralConsumerCleanup // ----------------------------------------------------------------------- [SkippableFact] public void ClusterEphemeralConsumerCleanup_ShouldRemoveConsumerOnUnsubscribe() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create R=2 stream, subscribe (ephemeral), set inactive threshold to 10ms, // verify 1 consumer. Unsubscribe, verify consumer removed within 2s. } // ----------------------------------------------------------------------- // 45. TestJetStreamClusterEphemeralConsumersNotReplicated // ----------------------------------------------------------------------- [SkippableFact] public void ClusterEphemeralConsumersNotReplicated_ShouldBeR1Only() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create R=3 stream, ephemeral subscribe, verify consumer cluster has 0 replicas (R=1). // Shut consumer server, verify optimistic delivery may fail (logged, not fatal). } // ----------------------------------------------------------------------- // 46. TestJetStreamClusterUserSnapshotAndRestore // ----------------------------------------------------------------------- [SkippableFact] public void ClusterUserSnapshotAndRestore_ShouldRestoreStreamWithConsumerState() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create R=2 stream, publish 200, create 2 consumers with partial ack state, // snapshot, delete stream, restore, verify message count and consumer state. } // ----------------------------------------------------------------------- // 47. TestJetStreamClusterUserSnapshotAndRestoreConfigChanges // ----------------------------------------------------------------------- [SkippableFact] public void ClusterUserSnapshotAndRestoreConfigChanges_ShouldAllowConfigChangesOnRestore() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Snapshot R=2 stream, delete it, restore with different subjects/storage/replicas. } // ----------------------------------------------------------------------- // 48. TestJetStreamClusterAccountInfoAndLimits // ----------------------------------------------------------------------- [SkippableFact] public void ClusterAccountInfoAndLimits_ShouldEnforceStreamAndConsumerLimits() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(5, "R5S"); c.WaitOnLeader(); // Set limits: 1024 mem, 8000 store, 3 streams, 2 consumers. // Create 3 streams, verify 4th fails. Verify store enforcement. // Create 2 consumers (with idempotent create), verify 3rd fails. } // ----------------------------------------------------------------------- // 49. TestJetStreamClusterMaxStreamsReached // ----------------------------------------------------------------------- [SkippableFact] public void ClusterMaxStreamsReached_ShouldAllowIdempotentCreateUnderLimit() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // MaxStreams=1: 15 parallel creates of same stream — all succeed (idempotent). // MaxStreams=2, 2 existing streams: 15 parallel creates alternating — all succeed. } // ----------------------------------------------------------------------- // 50. TestJetStreamClusterStreamLimits // ----------------------------------------------------------------------- [SkippableFact] public void ClusterStreamLimits_ShouldEnforceMaxMsgSizeAndMaxMsgsAndMaxAge() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // R=5 fails on 3-node. R=3 stream with MaxMsgSize=11, MaxMsgs=5, MaxAge=250ms, DiscardNew. // Large msg fails, 5 msgs ok, 6th fails. After age expires, msgs=0, publish succeeds. } // ----------------------------------------------------------------------- // 51. TestJetStreamClusterStreamInterestOnlyPolicy // ----------------------------------------------------------------------- [SkippableFact] public void ClusterStreamInterestOnlyPolicy_ShouldNotRetainMsgsWithoutInterest() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Publish 10 without consumer → stream stays at 0. // Add consumer, publish 10 → stream has 10. Delete consumer → stream goes to 0. } // ----------------------------------------------------------------------- // 52. TestJetStreamClusterExtendedAccountInfo // ----------------------------------------------------------------------- [SkippableFact] public void ClusterExtendedAccountInfo_ShouldTrackStreamsConsumersAndApiErrors() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create 3 streams with consumers, verify AccountInfo shows 3 streams, 3 consumers, >=7 API calls. // Make 4 bad API calls, verify Errors==4. } // ----------------------------------------------------------------------- // 53. TestJetStreamClusterPeerRemovalAPI // ----------------------------------------------------------------------- [SkippableFact] public void ClusterPeerRemovalApi_ShouldRemovePeerViaApi() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(5, "R5S"); c.WaitOnLeader(); // Unknown peer removal → error. Valid peer removal → success + advisory published. // Verify peer removed from cluster peers within 5s. } // ----------------------------------------------------------------------- // 54. TestJetStreamClusterPeerRemovalAndStreamReassignment // ----------------------------------------------------------------------- [SkippableFact] public void ClusterPeerRemovalAndStreamReassignment_ShouldReassignStreamAfterPeerRemoval() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(5, "R5S"); c.WaitOnLeader(); // Create R=3 stream, remove one non-leader stream peer via API, // verify stream still has 2 current replicas (none is the removed server). } // ----------------------------------------------------------------------- // 55. TestJetStreamClusterPeerRemovalAndStreamReassignmentWithoutSpace // ----------------------------------------------------------------------- [SkippableFact] public void ClusterPeerRemovalAndStreamReassignmentWithoutSpace_ShouldHandleInsufficientPeers() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // R=3 stream in 3-node cluster, remove one peer — cluster goes to 2 nodes. // Stream should scale down to R=2 (no space for R=3). } // ----------------------------------------------------------------------- // 56. TestJetStreamClusterPeerRemovalAndServerBroughtBack // ----------------------------------------------------------------------- [SkippableFact] public void ClusterPeerRemovalAndServerBroughtBack_ShouldHandleServerReintroduction() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Remove server from cluster, bring it back, verify peer count restored to 3. } // ----------------------------------------------------------------------- // 57. TestJetStreamClusterPeerExclusionTag // ----------------------------------------------------------------------- [SkippableFact] public void ClusterPeerExclusionTag_ShouldExcludeTaggedPeersFromPlacement() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Stream with placement excluding a tag should not place on tagged server. } // ----------------------------------------------------------------------- // 58. TestJetStreamClusterAccountPurge // ----------------------------------------------------------------------- [SkippableFact] public void ClusterAccountPurge_ShouldDeleteAllStreamsAndConsumersForAccount() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create several streams with consumers, purge account via $JS.API.ACCOUNT.PURGE, // verify all streams/consumers removed. } // ----------------------------------------------------------------------- // 59. TestJetStreamClusterScaleConsumer // ----------------------------------------------------------------------- [SkippableFact] public void ClusterScaleConsumer_ShouldScaleConsumerReplicasUpAndDown() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "C"); c.WaitOnLeader(); // Create R=3 stream + durable consumer, publish 1000 msgs, // scale consumer down to 1, up to 3, down to 1, up to 0 (inherit from stream), // consuming one msg between each scale, verify state consistency throughout. } // ----------------------------------------------------------------------- // 60. TestJetStreamClusterConsumerScaleUp // ----------------------------------------------------------------------- [SkippableFact] public void ClusterConsumerScaleUp_ShouldMaintainConsumerLeadershipAfterStreamScaleUp() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "HUB"); c.WaitOnLeader(); // Create R=1 stream + durable R=0 consumer, publish 100 msgs, // scale stream to R=2, wait 2s, verify consumer leader still present. } // ----------------------------------------------------------------------- // 61. TestJetStreamClusterPeerOffline // ----------------------------------------------------------------------- [SkippableFact] public void ClusterPeerOffline_ShouldMarkServerOfflineAndOnlineCorrectly() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(5, "R5S"); c.WaitOnLeader(); // Shut a non-leader, verify it shows as offline. Restart it, verify online again. } // ----------------------------------------------------------------------- // 62. TestJetStreamClusterNoQuorumStepdown // ----------------------------------------------------------------------- [SkippableFact] public void ClusterNoQuorumStepdown_ShouldStepDownLeaderWhenQuorumLost() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Shut 2 of 3 servers, verify meta leader steps down (no quorum). } // ----------------------------------------------------------------------- // 63. TestJetStreamClusterCreateResponseAdvisoriesHaveSubject // ----------------------------------------------------------------------- [SkippableFact] public void ClusterCreateResponseAdvisoriesHaveSubject_ShouldIncludeSubjectInAdvisories() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Subscribe to $JS.EVENT.ADVISORY.API.>, create stream, verify advisory has subject field set. } // ----------------------------------------------------------------------- // 64. TestJetStreamClusterRestartAndRemoveAdvisories // ----------------------------------------------------------------------- [SkippableFact] public void ClusterRestartAndRemoveAdvisories_ShouldNotSendAdvisoriesForRemovedOnRestart() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create/delete streams, restart cluster, verify no spurious create/delete advisories on restart. } // ----------------------------------------------------------------------- // 65. TestJetStreamClusterNoDuplicateOnNodeRestart // ----------------------------------------------------------------------- [SkippableFact] public void ClusterNoDuplicateOnNodeRestart_ShouldNotDeliverDuplicateMessagesOnRestart() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Subscribe sync, publish, receive, restart node, verify no duplicate delivery. } // ----------------------------------------------------------------------- // 66. TestJetStreamClusterNoDupePeerSelection // ----------------------------------------------------------------------- [SkippableFact] public void ClusterNoDupePeerSelection_ShouldNotSelectSamePeerTwiceForConsumer() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create R=3 stream, create R=3 consumer, verify consumer cluster has distinct peers. } // ----------------------------------------------------------------------- // 67. TestJetStreamClusterStreamRemovePeer // ----------------------------------------------------------------------- [SkippableFact] public void ClusterStreamRemovePeer_ShouldReassignStreamAfterPeerRemoval() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(5, "R5S"); c.WaitOnLeader(); // Remove stream peer via $JS.API.STREAM.PEER.REMOVE, verify stream reassigned to new peer. } // ----------------------------------------------------------------------- // 68. TestJetStreamClusterStreamLeaderStepDown // ----------------------------------------------------------------------- [SkippableFact] public void ClusterStreamLeaderStepDown_ShouldElectNewStreamLeaderAfterStepDown() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Step down stream leader, verify new leader elected and stream still accessible. } // ----------------------------------------------------------------------- // 69. TestJetStreamClusterRemoveServer // ----------------------------------------------------------------------- [SkippableFact] public void ClusterRemoveServer_ShouldRebalanceStreamsAfterServerRemoval() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(5, "R5S"); c.WaitOnLeader(); // Remove server, verify streams are rebalanced, no stale peer references remain. } // ----------------------------------------------------------------------- // 70. TestJetStreamClusterPurgeReplayAfterRestart // ----------------------------------------------------------------------- [SkippableFact] public void ClusterPurgeReplayAfterRestart_ShouldReplayPurgeAfterRestart() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Publish 10, purge, restart cluster, verify stream is still empty. } // ----------------------------------------------------------------------- // 71. TestJetStreamClusterStreamGetMsg // ----------------------------------------------------------------------- [SkippableFact] public void ClusterStreamGetMsg_ShouldGetMessageBySequenceFromCluster() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Publish a msg, get it via $JS.API.STREAM.MSG.GET, verify data matches. } // ----------------------------------------------------------------------- // 72. TestJetStreamClusterStreamDirectGetMsg // ----------------------------------------------------------------------- [SkippableFact] public void ClusterStreamDirectGetMsg_ShouldSupportDirectGetFromReplica() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create R=3 stream with AllowDirect=true, publish, direct-get from replica. } // ----------------------------------------------------------------------- // 73. TestJetStreamClusterStreamPerf // ----------------------------------------------------------------------- [SkippableFact] public void ClusterStreamPerf_ShouldPublishAndReceiveAllMessagesWithinTimeout() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Publish 5000 msgs to R=3 stream, verify all received by pull consumer. } // ----------------------------------------------------------------------- // 74. TestJetStreamClusterConsumerPerf // ----------------------------------------------------------------------- [SkippableFact] public void ClusterConsumerPerf_ShouldDeliverAllMessagesToPushConsumer() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Publish 5000 msgs to R=3 stream, push-consume all, verify count. } // ----------------------------------------------------------------------- // 75. TestJetStreamClusterQueueSubConsumer // ----------------------------------------------------------------------- [SkippableFact] public void ClusterQueueSubConsumer_ShouldDeliverExactlyOnceAcrossQueueGroup() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create stream, subscribe with queue group, publish 100 msgs, // verify each msg delivered exactly once across all queue members. } // ----------------------------------------------------------------------- // 76. TestJetStreamClusterLeaderStepdown // ----------------------------------------------------------------------- [SkippableFact] public void ClusterLeaderStepdown_ShouldElectNewMetaLeaderAfterStepDown() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Request meta leader stepdown, verify new leader elected. } // ----------------------------------------------------------------------- // 77. TestJetStreamClusterSourcesFilteringAndUpdating // ----------------------------------------------------------------------- [SkippableFact] public void ClusterSourcesFilteringAndUpdating_ShouldFilterSourcesBySubjectAndSupportUpdate() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create source stream, create stream with filtered source, verify filtering, // update source filter, verify updated behavior. } // ----------------------------------------------------------------------- // 78. TestJetStreamClusterSourcesUpdateOriginError // ----------------------------------------------------------------------- [SkippableFact] public void ClusterSourcesUpdateOriginError_ShouldReportErrorWhenSourceOriginChanges() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create stream with source, update to change source origin — should error. } // ----------------------------------------------------------------------- // 79. TestJetStreamClusterMirrorAndSourcesClusterRestart // ----------------------------------------------------------------------- [SkippableFact] public void ClusterMirrorAndSourcesClusterRestart_ShouldContinueAfterRestart() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create mirror and source streams, publish, restart cluster, verify counts preserved. } // ----------------------------------------------------------------------- // 80. TestJetStreamClusterMirrorAndSourcesFilteredConsumers // ----------------------------------------------------------------------- [SkippableFact] public void ClusterMirrorAndSourcesFilteredConsumers_ShouldWorkWithFilteredConsumers() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create mirror with filtered subjects, consume from mirror with consumer filter. } // ----------------------------------------------------------------------- // 81. TestJetStreamClusterCrossAccountMirrorsAndSources // ----------------------------------------------------------------------- [SkippableFact] public void ClusterCrossAccountMirrorsAndSources_ShouldMirrorAcrossAccounts() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamClusterWithTemplate(ConfigHelper.JsClusterAccountsTemplate, 3, "R3S"); c.WaitOnLeader(); // Create stream in account ONE, mirror in account TWO, verify mirror receives msgs. } // ----------------------------------------------------------------------- // 82. TestJetStreamClusterFailMirrorsAndSources // ----------------------------------------------------------------------- [SkippableFact] public void ClusterFailMirrorsAndSources_ShouldFailGracefullyOnInvalidMirrorOrSource() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Attempt to create mirror/source on non-existent stream — should get error response. } // ----------------------------------------------------------------------- // 83. TestJetStreamClusterConsumerDeliveredSyncReporting // ----------------------------------------------------------------------- [SkippableFact] public void ClusterConsumerDeliveredSyncReporting_ShouldReportDeliveredSequenceAccurately() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Push consumer, publish msgs, verify Delivered.Stream and Delivered.Consumer in sync. } // ----------------------------------------------------------------------- // 84. TestJetStreamClusterConsumerAckSyncReporting // ----------------------------------------------------------------------- [SkippableFact] public void ClusterConsumerAckSyncReporting_ShouldReportAckFloorAccurately() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Pull consumer, fetch and ack, verify AckFloor.Stream and AckFloor.Consumer in sync. } // ----------------------------------------------------------------------- // 85. TestJetStreamClusterConsumerDeleteInterestPolicyMultipleConsumers // ----------------------------------------------------------------------- [SkippableFact] public void ClusterConsumerDeleteInterestPolicyMultipleConsumers_ShouldNotPurgeMsgsWithOtherActiveConsumers() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Interest stream, 2 consumers. Delete one — msgs should remain until other acks too. } // ----------------------------------------------------------------------- // 86. TestJetStreamClusterConsumerAckNoneInterestPolicyShouldNotRetainAfterDelivery // ----------------------------------------------------------------------- [SkippableFact] public void ClusterConsumerAckNoneInterestPolicyShouldNotRetainAfterDelivery_ShouldRemoveMsgsOnDelivery() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // AckNone consumer on Interest stream: msgs removed on delivery without explicit ack. } // ----------------------------------------------------------------------- // 87. TestJetStreamClusterConsumerDeleteAckNoneInterestPolicyWithOthers // ----------------------------------------------------------------------- [SkippableFact] public void ClusterConsumerDeleteAckNoneInterestPolicyWithOthers_ShouldHandleDeleteWithMultipleConsumers() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // AckNone + AckExplicit consumers on Interest stream, delete AckNone consumer, // verify explicit consumer still sees msgs. } // ----------------------------------------------------------------------- // 88. TestJetStreamClusterMetaStepdownFromNonSysAccount // ----------------------------------------------------------------------- [SkippableFact] public void ClusterMetaStepdownFromNonSysAccount_ShouldFailWithPermissionError() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Non-system account attempting meta stepdown should get an error. } // ----------------------------------------------------------------------- // 89. TestJetStreamClusterMaxDeliveriesOnInterestStreams // ----------------------------------------------------------------------- [SkippableFact] public void ClusterMaxDeliveriesOnInterestStreams_ShouldRespectMaxDeliveriesSetting() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Interest stream, consumer MaxDelivers=3, verify msg removed after 3 delivery attempts. } // ----------------------------------------------------------------------- // 90. TestJetStreamClusterMetaRecoveryUpdatesDeletesConsumers // ----------------------------------------------------------------------- [SkippableFact] public void ClusterMetaRecoveryUpdatesDeletesConsumers_ShouldRecoverUpdatedAndDeletedConsumers() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create, update, delete consumer. Restart. Verify correct final state recovered. } // ----------------------------------------------------------------------- // 91. TestJetStreamClusterMetaRecoveryRecreateFileStreamAsMemory // ----------------------------------------------------------------------- [SkippableFact] public void ClusterMetaRecoveryRecreateFileStreamAsMemory_ShouldRecoverStreamWithChangedStorageType() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create File stream, delete it, recreate as Memory, restart, verify Memory type recovered. } // ----------------------------------------------------------------------- // 92. TestJetStreamClusterMetaRecoveryConsumerCreateAndRemove // ----------------------------------------------------------------------- [SkippableFact] public void ClusterMetaRecoveryConsumerCreateAndRemove_ShouldRecoverAfterConsumerCreateAndDelete() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create consumer, delete it, restart, verify consumer is not present. } // ----------------------------------------------------------------------- // 93. TestJetStreamClusterMetaRecoveryAddAndUpdateStream // ----------------------------------------------------------------------- [SkippableFact] public void ClusterMetaRecoveryAddAndUpdateStream_ShouldRecoverUpdatedStreamConfig() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create stream, update MaxMsgs, restart, verify updated config recovered. } // ----------------------------------------------------------------------- // 94. TestJetStreamClusterConsumerAckOutOfBounds // ----------------------------------------------------------------------- [SkippableFact] public void ClusterConsumerAckOutOfBounds_ShouldHandleOutOfBoundsAckGracefully() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Ack a sequence beyond delivered range — server should not crash, consumer stays healthy. } // ----------------------------------------------------------------------- // 95. TestJetStreamClusterCatchupLoadNextMsgTooManyDeletes // ----------------------------------------------------------------------- [SkippableFact] public void ClusterCatchupLoadNextMsgTooManyDeletes_ShouldCatchupWithHighDensityDeletes() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Publish 1000 msgs, delete 999 of them, kill stream leader, restart, verify catchup. } // ----------------------------------------------------------------------- // 96. TestJetStreamClusterCatchupMustStallWhenBehindOnApplies // ----------------------------------------------------------------------- [SkippableFact] public void ClusterCatchupMustStallWhenBehindOnApplies_ShouldNotOverloadCatchupQueue() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Kill stream replica, flood with messages, restart replica, verify catchup without queue overflow. } // ----------------------------------------------------------------------- // 97. TestJetStreamClusterConsumerInfoAfterCreate // ----------------------------------------------------------------------- [SkippableFact] public void ClusterConsumerInfoAfterCreate_ShouldReturnConsumerInfoImmediatelyAfterCreate() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create consumer, immediately request ConsumerInfo, verify info is available. } // ----------------------------------------------------------------------- // 98. TestJetStreamClusterStreamUpscalePeersAfterDownscale // ----------------------------------------------------------------------- [SkippableFact] public void ClusterStreamUpscalePeersAfterDownscale_ShouldRestoreAllPeersOnUpscale() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(5, "R5S"); c.WaitOnLeader(); // Scale stream from R=3 to R=1, then back to R=3, verify 3 distinct current peers. } // ----------------------------------------------------------------------- // 99. TestJetStreamClusterClearAllPreAcksOnRemoveMsg // ----------------------------------------------------------------------- [SkippableFact] public void ClusterClearAllPreAcksOnRemoveMsg_ShouldClearPreAcksWhenMessageRemoved() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Interest stream, pre-ack msg, then delete it via DeleteMsg, verify pre-ack state cleared. } // ----------------------------------------------------------------------- // 100. TestJetStreamClusterStreamHealthCheckMustNotRecreate // ----------------------------------------------------------------------- [SkippableFact] public void ClusterStreamHealthCheckMustNotRecreate_ShouldNotRecreateExistingStream() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Health check on existing stream must not trigger recreation or data loss. } // ----------------------------------------------------------------------- // 101. TestJetStreamClusterStreamHealthCheckMustNotDeleteEarly // ----------------------------------------------------------------------- [SkippableFact] public void ClusterStreamHealthCheckMustNotDeleteEarly_ShouldNotDeleteStreamDuringHealthCheck() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Health check on stream with messages must not trigger premature deletion. } // ----------------------------------------------------------------------- // 102. TestJetStreamClusterStreamHealthCheckOnlyReportsSkew // ----------------------------------------------------------------------- [SkippableFact] public void ClusterStreamHealthCheckOnlyReportsSkew_ShouldOnlyReportSkewNotForceRecovery() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Force skew in stream replica, health check should report skew, not force recreation. } // ----------------------------------------------------------------------- // 103. TestJetStreamClusterStreamHealthCheckStreamCatchup // ----------------------------------------------------------------------- [SkippableFact] public void ClusterStreamHealthCheckStreamCatchup_ShouldTriggerCatchupOnHealthCheckFailure() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Simulate replica behind, health check should trigger catchup. } // ----------------------------------------------------------------------- // 104. TestJetStreamClusterConsumerHealthCheckMustNotRecreate // ----------------------------------------------------------------------- [SkippableFact] public void ClusterConsumerHealthCheckMustNotRecreate_ShouldNotRecreateExistingConsumer() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Health check on existing consumer must not trigger recreation. } // ----------------------------------------------------------------------- // 105. TestJetStreamClusterConsumerHealthCheckMustNotDeleteEarly // ----------------------------------------------------------------------- [SkippableFact] public void ClusterConsumerHealthCheckMustNotDeleteEarly_ShouldNotDeleteActiveConsumer() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Active consumer must not be prematurely deleted by health check. } // ----------------------------------------------------------------------- // 106. TestJetStreamClusterConsumerHealthCheckOnlyReportsSkew // ----------------------------------------------------------------------- [SkippableFact] public void ClusterConsumerHealthCheckOnlyReportsSkew_ShouldNotForceRecreateOnSkew() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Skewed consumer state should be reported, not cause forced recreation. } // ----------------------------------------------------------------------- // 107. TestJetStreamClusterConsumerHealthCheckDeleted // ----------------------------------------------------------------------- [SkippableFact] public void ClusterConsumerHealthCheckDeleted_ShouldCleanUpDeletedConsumerOnHealthCheck() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Deleted consumer should be cleaned up gracefully during health check. } // ----------------------------------------------------------------------- // 108. TestJetStreamClusterRespectConsumerStartSeq // ----------------------------------------------------------------------- [SkippableFact] public void ClusterRespectConsumerStartSeq_ShouldStartDeliveryFromConfiguredSequence() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create consumer with DeliverByStartSequence=5, verify first delivered msg is seq 5. } // ----------------------------------------------------------------------- // 109. TestJetStreamClusterPeerRemoveStreamConsumerDesync // ----------------------------------------------------------------------- [SkippableFact] public void ClusterPeerRemoveStreamConsumerDesync_ShouldNotDesyncConsumerAfterPeerRemoval() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(5, "R5S"); c.WaitOnLeader(); // Remove a peer from stream, verify consumer state remains in sync with stream. } // ----------------------------------------------------------------------- // 110. TestJetStreamClusterStuckConsumerAfterLeaderChangeWithUnknownDeliveries // ----------------------------------------------------------------------- [SkippableFact] public void ClusterStuckConsumerAfterLeaderChangeWithUnknownDeliveries_ShouldRecoverFromStuckState() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Consumer with in-flight msgs, leader change — consumer should recover without getting stuck. } // ----------------------------------------------------------------------- // 111. TestJetStreamClusterAccountStatsForReplicatedStreams // ----------------------------------------------------------------------- [SkippableFact] public void ClusterAccountStatsForReplicatedStreams_ShouldCountStorageOnceNotPerReplica() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Publish to R=3 stream, verify account stats count storage once (logical), not 3x. } // ----------------------------------------------------------------------- // 112. TestJetStreamClusterRecreateConsumerFromMetaSnapshot // ----------------------------------------------------------------------- [SkippableFact] public void ClusterRecreateConsumerFromMetaSnapshot_ShouldRecreateConsumerFromSnapshot() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Create consumer, snapshot meta, shut server, restore from snapshot, verify consumer exists. } // ----------------------------------------------------------------------- // 113. TestJetStreamClusterUpgradeStreamVersioning // ----------------------------------------------------------------------- [SkippableFact] public void ClusterUpgradeStreamVersioning_ShouldHandleStreamVersionUpgrade() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Simulate stream version upgrade scenario, verify stream accessible after upgrade. } // ----------------------------------------------------------------------- // 114. TestJetStreamClusterUpgradeConsumerVersioning // ----------------------------------------------------------------------- [SkippableFact] public void ClusterUpgradeConsumerVersioning_ShouldHandleConsumerVersionUpgrade() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Simulate consumer version upgrade, verify consumer accessible after upgrade. } // ----------------------------------------------------------------------- // 115. TestJetStreamClusterInterestPolicyAckAll // ----------------------------------------------------------------------- [SkippableFact] public void ClusterInterestPolicyAckAll_ShouldRemoveMsgOnlyAfterAllConsumersAckAll() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Interest stream, AckAll consumer: msg removed only after all consumers AckAll. } // ----------------------------------------------------------------------- // 116. TestJetStreamClusterPreserveRedeliveredWithLaggingStream // ----------------------------------------------------------------------- [SkippableFact] public void ClusterPreserveRedeliveredWithLaggingStream_ShouldPreserveRedeliveredFlagDuringLag() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Consumer with lagging stream: redelivered flag must be preserved across leader changes. } // ----------------------------------------------------------------------- // 117. TestJetStreamClusterInvalidJSACKOverRoute // ----------------------------------------------------------------------- [SkippableFact] public void ClusterInvalidJsAckOverRoute_ShouldHandleInvalidAckGracefully() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // Invalid JSACK sent over a route should not crash the server. } // ----------------------------------------------------------------------- // 118. TestJetStreamClusterConsumerOnlyDeliverMsgAfterQuorum // ----------------------------------------------------------------------- [SkippableFact] public void ClusterConsumerOnlyDeliverMsgAfterQuorum_ShouldNotDeliverBeforeQuorumAchieved() { Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); c.WaitOnLeader(); // R=3 consumer must not deliver msg until quorum (2 of 3) of replicas have acknowledged the entry. } }