Files
natsnet/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/JetStream/JetStreamCluster1Tests.cs
Joseph Doherty 8db4fccc95 test(batch50): port 118 JetStream cluster 1 integration tests
Ports the first 118 tests from golang/nats-server/server/jetstream_cluster_1_test.go
to C# integration tests in JetStream/JetStreamCluster1Tests.cs. Adds the
Helpers/ scaffold (IntegrationTestBase, TestCluster, NatsTestClient, CheckHelper,
ConfigHelper) and Xunit.SkippableFact package; tests skip automatically unless
NATS_INTEGRATION_TESTS=true is set.
2026-03-01 12:14:55 -05:00

1618 lines
72 KiB
C#

// Copyright 2020-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0
//
// Ported from golang/nats-server/server/jetstream_cluster_1_test.go
// These tests require a running JetStream cluster. They are skipped unless
// NATS_INTEGRATION_TESTS=true is set in the environment.
using Xunit.Sdk;
namespace ZB.MOM.NatsNet.Server.IntegrationTests.JetStream;
/// <summary>
/// Integration tests for JetStream cluster functionality: cluster formation,
/// stream replication, consumer state, leader election, and catchup.
/// Ported from Go's TestJetStreamCluster* tests (first 118).
/// </summary>
[Trait("Category", "Integration")]
public class JetStreamCluster1Tests : IntegrationTestBase
{
public JetStreamCluster1Tests(ITestOutputHelper output) : base(output) { }
// -----------------------------------------------------------------------
// 1. TestJetStreamClusterConfig
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterConfig_ShouldRequireServerNameAndClusterName()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
// Verifies that a JetStream cluster node requires server_name and cluster.name.
// Corresponds to Go TestJetStreamClusterConfig.
using var c = TestCluster.CreateJetStreamCluster(1, "JSC");
c.WaitOnClusterReady();
}
// -----------------------------------------------------------------------
// 2. TestJetStreamClusterLeader
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterLeader_ShouldElectNewLeaderAfterShutdown()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "JSC");
c.WaitOnLeader();
var leader = c.Leader();
// Kill leader — new leader should be elected.
// Kill again — no leader (loss of quorum).
c.WaitOnLeader();
}
// -----------------------------------------------------------------------
// 3. TestJetStreamClusterExpand
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterExpand_ShouldAllowAddingNewServer()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(2, "JSC");
c.WaitOnPeerCount(2);
// Add a new server and wait for 3-peer cluster.
c.WaitOnPeerCount(3);
}
// -----------------------------------------------------------------------
// 4. TestJetStreamClusterAccountInfo
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterAccountInfo_ShouldReturnSingleResponse()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "JSC");
c.WaitOnLeader();
// Connect and send $JS.API.INFO, expect exactly one response.
using var nc = NatsTestClient.ConnectToServer(c.RandomServer());
}
// -----------------------------------------------------------------------
// 5. TestJetStreamClusterStreamLimitWithAccountDefaults
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterStreamLimitWithAccountDefaults_ShouldEnforceStorageLimits()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
// 2MB memory, 8MB disk limits template.
using var c = TestCluster.CreateJetStreamClusterWithTemplate(ConfigHelper.JsClusterTemplate, 3, "R3L");
c.WaitOnLeader();
}
// -----------------------------------------------------------------------
// 6. TestJetStreamClusterInfoRaftGroup
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterInfoRaftGroup_ShouldIncludeRaftGroupInStreamAndConsumerInfo()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R1S");
c.WaitOnLeader();
// Verify stream info and consumer info include cluster.raft_group field.
}
// -----------------------------------------------------------------------
// 7. TestJetStreamClusterSingleReplicaStreams
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterSingleReplicaStreams_ShouldSurviveLeaderRestart()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R1S");
c.WaitOnLeader();
// Create R=1 stream, publish 10 msgs, kill stream leader, restart, verify stream and consumer still exist.
c.WaitOnStreamLeader("$G", "TEST");
}
// -----------------------------------------------------------------------
// 8. TestJetStreamClusterMultiReplicaStreams
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterMultiReplicaStreams_ShouldReplicateAcrossCluster()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(5, "RNS");
c.WaitOnLeader();
// Create R=3 stream in 5-node cluster, publish 10 msgs, verify state.
}
// -----------------------------------------------------------------------
// 9. TestJetStreamClusterMultiReplicaStreamsDefaultFileMem
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterMultiReplicaStreamsDefaultFileMem_ShouldUseFileStorageByDefault()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
const string testConfig = @"
listen: 127.0.0.1:-1
server_name: %s
jetstream: {store_dir: '%s'}
cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}
";
using var c = TestCluster.CreateJetStreamClusterWithTemplate(testConfig, 3, "RNS");
c.WaitOnLeader();
}
// -----------------------------------------------------------------------
// 10. TestJetStreamClusterMemoryStore
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterMemoryStore_ShouldReplicateMemoryStoredMessages()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3M");
c.WaitOnLeader();
// Create R=3 memory stream, publish 100 msgs, verify cluster info has 2 replicas.
}
// -----------------------------------------------------------------------
// 11. TestJetStreamClusterDelete
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterDelete_ShouldRemoveConsumerAndStream()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "RNS");
c.WaitOnLeader();
// Create stream C22 R=2, add consumer, delete consumer, delete stream, verify account info shows 0 streams.
}
// -----------------------------------------------------------------------
// 12. TestJetStreamClusterStreamPurge
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterStreamPurge_ShouldClearAllMessages()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(5, "R5S");
c.WaitOnLeader();
// Create R=3 stream in 5-node cluster, publish 100, purge, verify state shows 0 msgs.
}
// -----------------------------------------------------------------------
// 13. TestJetStreamClusterStreamUpdateSubjects
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterStreamUpdateSubjects_ShouldUpdateSubjectsSuccessfully()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create stream on {foo, bar}, update to {bar, baz}, verify foo publish fails, baz succeeds.
}
// -----------------------------------------------------------------------
// 14. TestJetStreamClusterBadStreamUpdate
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterBadStreamUpdate_ShouldNotDeleteStreamOnBadConfig()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Attempt to update stream with invalid subject "foo..bar", verify original stream preserved.
}
// -----------------------------------------------------------------------
// 15. TestJetStreamClusterConsumerRedeliveredInfo
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterConsumerRedeliveredInfo_ShouldTrackRedeliveredCount()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Publish 1 msg, subscribe with AckWait=100ms, auto-unsubscribe after 2,
// verify NumRedelivered == 1 in ConsumerInfo.
}
// -----------------------------------------------------------------------
// 16. TestJetStreamClusterConsumerState
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterConsumerState_ShouldPreserveStateAfterLeaderChange()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(5, "R3S");
c.WaitOnLeader();
// Create R=3 stream, publish 10, pull-subscribe with "dlc", fetch 5 + ack,
// kill consumer leader, wait for new leader, verify AckFloor matches.
}
// -----------------------------------------------------------------------
// 17. TestJetStreamClusterFullConsumerState
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterFullConsumerState_ShouldHandlePurgeWithActiveConsumer()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create R=3 stream, publish 10, pull-subscribe, fetch 1, then purge stream.
}
// -----------------------------------------------------------------------
// 18. TestJetStreamClusterMetaSnapshotsAndCatchup
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterMetaSnapshotsAndCatchup_ShouldCatchupAfterRestart()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Shut one server, create 4 streams, snapshot meta, restart server,
// wait for current, delete streams, restart again, verify catchup.
}
// -----------------------------------------------------------------------
// 19. TestJetStreamClusterMetaSnapshotsMultiChange
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterMetaSnapshotsMultiChange_ShouldHandleComplexDeltasOnRestart()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(2, "R3S");
c.WaitOnLeader();
// Add streams/consumers, add new server, shut it, make changes (add S3, delete S2,
// delete S1C1, add S1C2), snapshot, restart, verify all current.
}
// -----------------------------------------------------------------------
// 20. TestJetStreamClusterStreamSynchedTimeStamps
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterStreamSynchedTimeStamps_ShouldMaintainTimestampAfterLeaderChange()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Publish to R=3 memory stream, record timestamp, kill stream leader,
// fetch msg from new leader, verify timestamps match.
}
// -----------------------------------------------------------------------
// 21. TestJetStreamClusterRestoreSingleConsumer
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterRestoreSingleConsumer_ShouldRestoreAfterFullClusterRestart()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create stream, publish, subscribe durable, ack, stop all, restart all,
// verify stream and consumer are restored.
}
// -----------------------------------------------------------------------
// 22. TestJetStreamClusterMaxBytesForStream
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterMaxBytesForStream_ShouldEnforcePerServerStorageLimit()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create R=2 stream with MaxBytes=2GB (ok), then try 4GB (should fail: no suitable peers).
}
// -----------------------------------------------------------------------
// 23. TestJetStreamClusterStreamPublishWithActiveConsumers
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterStreamPublishWithActiveConsumers_ShouldDeliverInOrderAfterLeaderChange()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create R=3 stream, subscribe durable, publish 10 in sequence, verify order,
// kill consumer leader, publish 10 more, verify order continues.
}
// -----------------------------------------------------------------------
// 24. TestJetStreamClusterStreamOverlapSubjects
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterStreamOverlapSubjects_ShouldPreventOverlappingSubjects()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3");
c.WaitOnLeader();
// Create TEST on "foo", try to create TEST2 on "foo" — should fail.
// Verify only 1 stream in list.
}
// -----------------------------------------------------------------------
// 25. TestJetStreamClusterStreamInfoList
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterStreamInfoList_ShouldReturnCorrectMsgCountsForAllStreams()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create foo(10), bar(22), baz(33), verify StreamsInfo returns correct counts.
}
// -----------------------------------------------------------------------
// 26. TestJetStreamClusterConsumerInfoList
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterConsumerInfoList_ShouldReturnCorrectConsumerStates()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create R=3 stream, publish 10, create 3 pull consumers with different
// fetch/ack combos, verify ConsumersInfo returns correct delivered/ackfloor.
}
// -----------------------------------------------------------------------
// 27. TestJetStreamClusterStreamUpdate
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterStreamUpdate_ShouldUpdateMaxMsgsSuccessfully()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create stream MaxMsgs=10, fill it, expect publish failure.
// Update MaxMsgs=20 from non-leader, verify success.
// Attempt bad update (name mismatch), verify only 1 response.
}
// -----------------------------------------------------------------------
// 28. TestJetStreamClusterStreamExtendedUpdates
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterStreamExtendedUpdates_ShouldAllowSubjectUpdateButNotMirrorChange()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Subjects can be updated. Mirror changes should return JSStreamMirrorNotUpdatableError.
}
// -----------------------------------------------------------------------
// 29. TestJetStreamClusterDoubleAdd
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterDoubleAdd_ShouldBeIdempotentForStreamsAndConsumers()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(2, "R32");
c.WaitOnLeader();
// Add stream twice — should not error. Add consumer twice — should not error.
}
// -----------------------------------------------------------------------
// 30. TestJetStreamClusterDefaultMaxAckPending
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterDefaultMaxAckPending_ShouldSetDefaultAckPendingOnConsumer()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(2, "R32");
c.WaitOnLeader();
// Create consumer with default config, verify MaxAckPending == JsDefaultMaxAckPending (20000).
}
// -----------------------------------------------------------------------
// 31. TestJetStreamClusterStreamNormalCatchup
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterStreamNormalCatchup_ShouldCatchupAfterRejoiningCluster()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Publish 10 msgs, kill stream leader, publish 11 more, delete one,
// restart old leader, wait for cluster formed + current.
}
// -----------------------------------------------------------------------
// 32. TestJetStreamClusterStreamSnapshotCatchup
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterStreamSnapshotCatchup_ShouldCatchupViaSnapshotAfterRejoining()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Publish 2, kill stream leader, publish 100, delete 2 msgs, snapshot,
// send more, restart old leader, wait for current, verify states match.
}
// -----------------------------------------------------------------------
// 33. TestJetStreamClusterDeleteMsg
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterDeleteMsg_ShouldDeleteMessageAndSupportPurge()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create R=1 stream, publish 10, delete seq 1, purge stream — all should succeed.
}
// -----------------------------------------------------------------------
// 34. TestJetStreamClusterDeleteMsgAndRestart
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterDeleteMsgAndRestart_ShouldSurviveFullRestart()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create R=2 stream, publish 10, delete seq 1, stop all, restart all,
// wait for stream leader.
}
// -----------------------------------------------------------------------
// 35. TestJetStreamClusterStreamSnapshotCatchupWithPurge
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterStreamSnapshotCatchupWithPurge_ShouldHandlePurgeDuringCatchup()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(5, "R5S");
c.WaitOnLeader();
// Kill stream leader, publish 10, snapshot, restart old leader,
// purge while recovering, wait for current, verify stream info available.
}
// -----------------------------------------------------------------------
// 36. TestJetStreamClusterExtendedStreamInfo
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterExtendedStreamInfo_ShouldIncludeClusterInfoAndReplicas()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create R=3 stream, publish 50, verify StreamInfo contains cluster info,
// cluster name, leader, and 2 replicas. Replicas must be ordered.
// Kill leader, verify info still correct, restart, verify current.
}
// -----------------------------------------------------------------------
// 37. TestJetStreamClusterExtendedStreamInfoSingleReplica
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterExtendedStreamInfoSingleReplica_ShouldShowNoReplicasForR1Stream()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create R=1 stream, verify cluster info shows 0 replicas.
// Verify ConsumersInfo returns 0 initially, 1 after adding consumer.
}
// -----------------------------------------------------------------------
// 38. TestJetStreamClusterInterestRetention
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterInterestRetention_ShouldDeleteMsgsAfterAckWithInterestPolicy()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create Interest-retention R=3 stream, subscribe durable, publish 1, ack,
// verify stream goes to 0. Publish 50 more, delete consumer, verify stream goes to 0.
}
// -----------------------------------------------------------------------
// 39. TestJetStreamClusterWorkQueueRetention
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterWorkQueueRetention_ShouldRemoveMsgsAfterAckInWorkQueueMode()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create WorkQueue R=2 stream, publish 1, pull and ack, verify stream goes to 0.
}
// -----------------------------------------------------------------------
// 40. TestJetStreamClusterMirrorAndSourceWorkQueues
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterMirrorAndSourceWorkQueues_ShouldMirrorWorkQueueMessages()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "WQ");
c.WaitOnLeader();
// Create WQ22 WorkQueue, M mirror, S source. Publish 1 to WQ22.
// Verify WQ22=0, M=1, S=1 (because mirror/source consume from work queue).
}
// -----------------------------------------------------------------------
// 41. TestJetStreamClusterMirrorAndSourceInterestPolicyStream
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterMirrorAndSourceInterestPolicyStream_ShouldHandleInterestPolicyWithMirrorAndSource()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "WQ");
c.WaitOnLeader();
// Create IP22 Interest stream, mirror M, source S.
// Without other interest: IP22=0, M=1, S=1.
// After adding subscriber: IP22=1, M=2, S=2.
}
// -----------------------------------------------------------------------
// 42. TestJetStreamClusterInterestRetentionWithFilteredConsumers
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterInterestRetentionWithFilteredConsumers_ShouldTrackPerFilteredConsumer()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create Interest stream on "*", two filtered consumers (foo, bar).
// Verify messages are retained until all consumers ack.
// Delete consumers, verify stream goes to 0.
// Test same with pull consumer.
}
// -----------------------------------------------------------------------
// 43. TestJetStreamClusterEphemeralConsumerNoImmediateInterest
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterEphemeralConsumerNoImmediateInterest_ShouldCleanUpWithoutActiveSubscriber()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create ephemeral consumer with deliver subject "r", set inactive threshold to 500ms,
// verify consumer disappears within 5s.
}
// -----------------------------------------------------------------------
// 44. TestJetStreamClusterEphemeralConsumerCleanup
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterEphemeralConsumerCleanup_ShouldRemoveConsumerOnUnsubscribe()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create R=2 stream, subscribe (ephemeral), set inactive threshold to 10ms,
// verify 1 consumer. Unsubscribe, verify consumer removed within 2s.
}
// -----------------------------------------------------------------------
// 45. TestJetStreamClusterEphemeralConsumersNotReplicated
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterEphemeralConsumersNotReplicated_ShouldBeR1Only()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create R=3 stream, ephemeral subscribe, verify consumer cluster has 0 replicas (R=1).
// Shut consumer server, verify optimistic delivery may fail (logged, not fatal).
}
// -----------------------------------------------------------------------
// 46. TestJetStreamClusterUserSnapshotAndRestore
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterUserSnapshotAndRestore_ShouldRestoreStreamWithConsumerState()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create R=2 stream, publish 200, create 2 consumers with partial ack state,
// snapshot, delete stream, restore, verify message count and consumer state.
}
// -----------------------------------------------------------------------
// 47. TestJetStreamClusterUserSnapshotAndRestoreConfigChanges
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterUserSnapshotAndRestoreConfigChanges_ShouldAllowConfigChangesOnRestore()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Snapshot R=2 stream, delete it, restore with different subjects/storage/replicas.
}
// -----------------------------------------------------------------------
// 48. TestJetStreamClusterAccountInfoAndLimits
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterAccountInfoAndLimits_ShouldEnforceStreamAndConsumerLimits()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(5, "R5S");
c.WaitOnLeader();
// Set limits: 1024 mem, 8000 store, 3 streams, 2 consumers.
// Create 3 streams, verify 4th fails. Verify store enforcement.
// Create 2 consumers (with idempotent create), verify 3rd fails.
}
// -----------------------------------------------------------------------
// 49. TestJetStreamClusterMaxStreamsReached
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterMaxStreamsReached_ShouldAllowIdempotentCreateUnderLimit()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// MaxStreams=1: 15 parallel creates of same stream — all succeed (idempotent).
// MaxStreams=2, 2 existing streams: 15 parallel creates alternating — all succeed.
}
// -----------------------------------------------------------------------
// 50. TestJetStreamClusterStreamLimits
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterStreamLimits_ShouldEnforceMaxMsgSizeAndMaxMsgsAndMaxAge()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// R=5 fails on 3-node. R=3 stream with MaxMsgSize=11, MaxMsgs=5, MaxAge=250ms, DiscardNew.
// Large msg fails, 5 msgs ok, 6th fails. After age expires, msgs=0, publish succeeds.
}
// -----------------------------------------------------------------------
// 51. TestJetStreamClusterStreamInterestOnlyPolicy
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterStreamInterestOnlyPolicy_ShouldNotRetainMsgsWithoutInterest()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Publish 10 without consumer → stream stays at 0.
// Add consumer, publish 10 → stream has 10. Delete consumer → stream goes to 0.
}
// -----------------------------------------------------------------------
// 52. TestJetStreamClusterExtendedAccountInfo
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterExtendedAccountInfo_ShouldTrackStreamsConsumersAndApiErrors()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create 3 streams with consumers, verify AccountInfo shows 3 streams, 3 consumers, >=7 API calls.
// Make 4 bad API calls, verify Errors==4.
}
// -----------------------------------------------------------------------
// 53. TestJetStreamClusterPeerRemovalAPI
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterPeerRemovalApi_ShouldRemovePeerViaApi()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(5, "R5S");
c.WaitOnLeader();
// Unknown peer removal → error. Valid peer removal → success + advisory published.
// Verify peer removed from cluster peers within 5s.
}
// -----------------------------------------------------------------------
// 54. TestJetStreamClusterPeerRemovalAndStreamReassignment
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterPeerRemovalAndStreamReassignment_ShouldReassignStreamAfterPeerRemoval()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(5, "R5S");
c.WaitOnLeader();
// Create R=3 stream, remove one non-leader stream peer via API,
// verify stream still has 2 current replicas (none is the removed server).
}
// -----------------------------------------------------------------------
// 55. TestJetStreamClusterPeerRemovalAndStreamReassignmentWithoutSpace
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterPeerRemovalAndStreamReassignmentWithoutSpace_ShouldHandleInsufficientPeers()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// R=3 stream in 3-node cluster, remove one peer — cluster goes to 2 nodes.
// Stream should scale down to R=2 (no space for R=3).
}
// -----------------------------------------------------------------------
// 56. TestJetStreamClusterPeerRemovalAndServerBroughtBack
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterPeerRemovalAndServerBroughtBack_ShouldHandleServerReintroduction()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Remove server from cluster, bring it back, verify peer count restored to 3.
}
// -----------------------------------------------------------------------
// 57. TestJetStreamClusterPeerExclusionTag
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterPeerExclusionTag_ShouldExcludeTaggedPeersFromPlacement()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Stream with placement excluding a tag should not place on tagged server.
}
// -----------------------------------------------------------------------
// 58. TestJetStreamClusterAccountPurge
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterAccountPurge_ShouldDeleteAllStreamsAndConsumersForAccount()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create several streams with consumers, purge account via $JS.API.ACCOUNT.PURGE,
// verify all streams/consumers removed.
}
// -----------------------------------------------------------------------
// 59. TestJetStreamClusterScaleConsumer
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterScaleConsumer_ShouldScaleConsumerReplicasUpAndDown()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "C");
c.WaitOnLeader();
// Create R=3 stream + durable consumer, publish 1000 msgs,
// scale consumer down to 1, up to 3, down to 1, up to 0 (inherit from stream),
// consuming one msg between each scale, verify state consistency throughout.
}
// -----------------------------------------------------------------------
// 60. TestJetStreamClusterConsumerScaleUp
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterConsumerScaleUp_ShouldMaintainConsumerLeadershipAfterStreamScaleUp()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "HUB");
c.WaitOnLeader();
// Create R=1 stream + durable R=0 consumer, publish 100 msgs,
// scale stream to R=2, wait 2s, verify consumer leader still present.
}
// -----------------------------------------------------------------------
// 61. TestJetStreamClusterPeerOffline
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterPeerOffline_ShouldMarkServerOfflineAndOnlineCorrectly()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(5, "R5S");
c.WaitOnLeader();
// Shut a non-leader, verify it shows as offline. Restart it, verify online again.
}
// -----------------------------------------------------------------------
// 62. TestJetStreamClusterNoQuorumStepdown
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterNoQuorumStepdown_ShouldStepDownLeaderWhenQuorumLost()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Shut 2 of 3 servers, verify meta leader steps down (no quorum).
}
// -----------------------------------------------------------------------
// 63. TestJetStreamClusterCreateResponseAdvisoriesHaveSubject
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterCreateResponseAdvisoriesHaveSubject_ShouldIncludeSubjectInAdvisories()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Subscribe to $JS.EVENT.ADVISORY.API.>, create stream, verify advisory has subject field set.
}
// -----------------------------------------------------------------------
// 64. TestJetStreamClusterRestartAndRemoveAdvisories
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterRestartAndRemoveAdvisories_ShouldNotSendAdvisoriesForRemovedOnRestart()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create/delete streams, restart cluster, verify no spurious create/delete advisories on restart.
}
// -----------------------------------------------------------------------
// 65. TestJetStreamClusterNoDuplicateOnNodeRestart
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterNoDuplicateOnNodeRestart_ShouldNotDeliverDuplicateMessagesOnRestart()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Subscribe sync, publish, receive, restart node, verify no duplicate delivery.
}
// -----------------------------------------------------------------------
// 66. TestJetStreamClusterNoDupePeerSelection
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterNoDupePeerSelection_ShouldNotSelectSamePeerTwiceForConsumer()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create R=3 stream, create R=3 consumer, verify consumer cluster has distinct peers.
}
// -----------------------------------------------------------------------
// 67. TestJetStreamClusterStreamRemovePeer
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterStreamRemovePeer_ShouldReassignStreamAfterPeerRemoval()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(5, "R5S");
c.WaitOnLeader();
// Remove stream peer via $JS.API.STREAM.PEER.REMOVE, verify stream reassigned to new peer.
}
// -----------------------------------------------------------------------
// 68. TestJetStreamClusterStreamLeaderStepDown
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterStreamLeaderStepDown_ShouldElectNewStreamLeaderAfterStepDown()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Step down stream leader, verify new leader elected and stream still accessible.
}
// -----------------------------------------------------------------------
// 69. TestJetStreamClusterRemoveServer
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterRemoveServer_ShouldRebalanceStreamsAfterServerRemoval()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(5, "R5S");
c.WaitOnLeader();
// Remove server, verify streams are rebalanced, no stale peer references remain.
}
// -----------------------------------------------------------------------
// 70. TestJetStreamClusterPurgeReplayAfterRestart
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterPurgeReplayAfterRestart_ShouldReplayPurgeAfterRestart()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Publish 10, purge, restart cluster, verify stream is still empty.
}
// -----------------------------------------------------------------------
// 71. TestJetStreamClusterStreamGetMsg
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterStreamGetMsg_ShouldGetMessageBySequenceFromCluster()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Publish a msg, get it via $JS.API.STREAM.MSG.GET, verify data matches.
}
// -----------------------------------------------------------------------
// 72. TestJetStreamClusterStreamDirectGetMsg
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterStreamDirectGetMsg_ShouldSupportDirectGetFromReplica()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create R=3 stream with AllowDirect=true, publish, direct-get from replica.
}
// -----------------------------------------------------------------------
// 73. TestJetStreamClusterStreamPerf
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterStreamPerf_ShouldPublishAndReceiveAllMessagesWithinTimeout()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Publish 5000 msgs to R=3 stream, verify all received by pull consumer.
}
// -----------------------------------------------------------------------
// 74. TestJetStreamClusterConsumerPerf
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterConsumerPerf_ShouldDeliverAllMessagesToPushConsumer()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Publish 5000 msgs to R=3 stream, push-consume all, verify count.
}
// -----------------------------------------------------------------------
// 75. TestJetStreamClusterQueueSubConsumer
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterQueueSubConsumer_ShouldDeliverExactlyOnceAcrossQueueGroup()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create stream, subscribe with queue group, publish 100 msgs,
// verify each msg delivered exactly once across all queue members.
}
// -----------------------------------------------------------------------
// 76. TestJetStreamClusterLeaderStepdown
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterLeaderStepdown_ShouldElectNewMetaLeaderAfterStepDown()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Request meta leader stepdown, verify new leader elected.
}
// -----------------------------------------------------------------------
// 77. TestJetStreamClusterSourcesFilteringAndUpdating
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterSourcesFilteringAndUpdating_ShouldFilterSourcesBySubjectAndSupportUpdate()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create source stream, create stream with filtered source, verify filtering,
// update source filter, verify updated behavior.
}
// -----------------------------------------------------------------------
// 78. TestJetStreamClusterSourcesUpdateOriginError
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterSourcesUpdateOriginError_ShouldReportErrorWhenSourceOriginChanges()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create stream with source, update to change source origin — should error.
}
// -----------------------------------------------------------------------
// 79. TestJetStreamClusterMirrorAndSourcesClusterRestart
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterMirrorAndSourcesClusterRestart_ShouldContinueAfterRestart()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create mirror and source streams, publish, restart cluster, verify counts preserved.
}
// -----------------------------------------------------------------------
// 80. TestJetStreamClusterMirrorAndSourcesFilteredConsumers
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterMirrorAndSourcesFilteredConsumers_ShouldWorkWithFilteredConsumers()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create mirror with filtered subjects, consume from mirror with consumer filter.
}
// -----------------------------------------------------------------------
// 81. TestJetStreamClusterCrossAccountMirrorsAndSources
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterCrossAccountMirrorsAndSources_ShouldMirrorAcrossAccounts()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamClusterWithTemplate(ConfigHelper.JsClusterAccountsTemplate, 3, "R3S");
c.WaitOnLeader();
// Create stream in account ONE, mirror in account TWO, verify mirror receives msgs.
}
// -----------------------------------------------------------------------
// 82. TestJetStreamClusterFailMirrorsAndSources
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterFailMirrorsAndSources_ShouldFailGracefullyOnInvalidMirrorOrSource()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Attempt to create mirror/source on non-existent stream — should get error response.
}
// -----------------------------------------------------------------------
// 83. TestJetStreamClusterConsumerDeliveredSyncReporting
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterConsumerDeliveredSyncReporting_ShouldReportDeliveredSequenceAccurately()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Push consumer, publish msgs, verify Delivered.Stream and Delivered.Consumer in sync.
}
// -----------------------------------------------------------------------
// 84. TestJetStreamClusterConsumerAckSyncReporting
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterConsumerAckSyncReporting_ShouldReportAckFloorAccurately()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Pull consumer, fetch and ack, verify AckFloor.Stream and AckFloor.Consumer in sync.
}
// -----------------------------------------------------------------------
// 85. TestJetStreamClusterConsumerDeleteInterestPolicyMultipleConsumers
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterConsumerDeleteInterestPolicyMultipleConsumers_ShouldNotPurgeMsgsWithOtherActiveConsumers()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Interest stream, 2 consumers. Delete one — msgs should remain until other acks too.
}
// -----------------------------------------------------------------------
// 86. TestJetStreamClusterConsumerAckNoneInterestPolicyShouldNotRetainAfterDelivery
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterConsumerAckNoneInterestPolicyShouldNotRetainAfterDelivery_ShouldRemoveMsgsOnDelivery()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// AckNone consumer on Interest stream: msgs removed on delivery without explicit ack.
}
// -----------------------------------------------------------------------
// 87. TestJetStreamClusterConsumerDeleteAckNoneInterestPolicyWithOthers
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterConsumerDeleteAckNoneInterestPolicyWithOthers_ShouldHandleDeleteWithMultipleConsumers()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// AckNone + AckExplicit consumers on Interest stream, delete AckNone consumer,
// verify explicit consumer still sees msgs.
}
// -----------------------------------------------------------------------
// 88. TestJetStreamClusterMetaStepdownFromNonSysAccount
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterMetaStepdownFromNonSysAccount_ShouldFailWithPermissionError()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Non-system account attempting meta stepdown should get an error.
}
// -----------------------------------------------------------------------
// 89. TestJetStreamClusterMaxDeliveriesOnInterestStreams
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterMaxDeliveriesOnInterestStreams_ShouldRespectMaxDeliveriesSetting()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Interest stream, consumer MaxDelivers=3, verify msg removed after 3 delivery attempts.
}
// -----------------------------------------------------------------------
// 90. TestJetStreamClusterMetaRecoveryUpdatesDeletesConsumers
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterMetaRecoveryUpdatesDeletesConsumers_ShouldRecoverUpdatedAndDeletedConsumers()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create, update, delete consumer. Restart. Verify correct final state recovered.
}
// -----------------------------------------------------------------------
// 91. TestJetStreamClusterMetaRecoveryRecreateFileStreamAsMemory
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterMetaRecoveryRecreateFileStreamAsMemory_ShouldRecoverStreamWithChangedStorageType()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create File stream, delete it, recreate as Memory, restart, verify Memory type recovered.
}
// -----------------------------------------------------------------------
// 92. TestJetStreamClusterMetaRecoveryConsumerCreateAndRemove
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterMetaRecoveryConsumerCreateAndRemove_ShouldRecoverAfterConsumerCreateAndDelete()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create consumer, delete it, restart, verify consumer is not present.
}
// -----------------------------------------------------------------------
// 93. TestJetStreamClusterMetaRecoveryAddAndUpdateStream
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterMetaRecoveryAddAndUpdateStream_ShouldRecoverUpdatedStreamConfig()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create stream, update MaxMsgs, restart, verify updated config recovered.
}
// -----------------------------------------------------------------------
// 94. TestJetStreamClusterConsumerAckOutOfBounds
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterConsumerAckOutOfBounds_ShouldHandleOutOfBoundsAckGracefully()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Ack a sequence beyond delivered range — server should not crash, consumer stays healthy.
}
// -----------------------------------------------------------------------
// 95. TestJetStreamClusterCatchupLoadNextMsgTooManyDeletes
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterCatchupLoadNextMsgTooManyDeletes_ShouldCatchupWithHighDensityDeletes()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Publish 1000 msgs, delete 999 of them, kill stream leader, restart, verify catchup.
}
// -----------------------------------------------------------------------
// 96. TestJetStreamClusterCatchupMustStallWhenBehindOnApplies
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterCatchupMustStallWhenBehindOnApplies_ShouldNotOverloadCatchupQueue()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Kill stream replica, flood with messages, restart replica, verify catchup without queue overflow.
}
// -----------------------------------------------------------------------
// 97. TestJetStreamClusterConsumerInfoAfterCreate
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterConsumerInfoAfterCreate_ShouldReturnConsumerInfoImmediatelyAfterCreate()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create consumer, immediately request ConsumerInfo, verify info is available.
}
// -----------------------------------------------------------------------
// 98. TestJetStreamClusterStreamUpscalePeersAfterDownscale
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterStreamUpscalePeersAfterDownscale_ShouldRestoreAllPeersOnUpscale()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(5, "R5S");
c.WaitOnLeader();
// Scale stream from R=3 to R=1, then back to R=3, verify 3 distinct current peers.
}
// -----------------------------------------------------------------------
// 99. TestJetStreamClusterClearAllPreAcksOnRemoveMsg
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterClearAllPreAcksOnRemoveMsg_ShouldClearPreAcksWhenMessageRemoved()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Interest stream, pre-ack msg, then delete it via DeleteMsg, verify pre-ack state cleared.
}
// -----------------------------------------------------------------------
// 100. TestJetStreamClusterStreamHealthCheckMustNotRecreate
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterStreamHealthCheckMustNotRecreate_ShouldNotRecreateExistingStream()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Health check on existing stream must not trigger recreation or data loss.
}
// -----------------------------------------------------------------------
// 101. TestJetStreamClusterStreamHealthCheckMustNotDeleteEarly
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterStreamHealthCheckMustNotDeleteEarly_ShouldNotDeleteStreamDuringHealthCheck()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Health check on stream with messages must not trigger premature deletion.
}
// -----------------------------------------------------------------------
// 102. TestJetStreamClusterStreamHealthCheckOnlyReportsSkew
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterStreamHealthCheckOnlyReportsSkew_ShouldOnlyReportSkewNotForceRecovery()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Force skew in stream replica, health check should report skew, not force recreation.
}
// -----------------------------------------------------------------------
// 103. TestJetStreamClusterStreamHealthCheckStreamCatchup
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterStreamHealthCheckStreamCatchup_ShouldTriggerCatchupOnHealthCheckFailure()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Simulate replica behind, health check should trigger catchup.
}
// -----------------------------------------------------------------------
// 104. TestJetStreamClusterConsumerHealthCheckMustNotRecreate
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterConsumerHealthCheckMustNotRecreate_ShouldNotRecreateExistingConsumer()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Health check on existing consumer must not trigger recreation.
}
// -----------------------------------------------------------------------
// 105. TestJetStreamClusterConsumerHealthCheckMustNotDeleteEarly
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterConsumerHealthCheckMustNotDeleteEarly_ShouldNotDeleteActiveConsumer()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Active consumer must not be prematurely deleted by health check.
}
// -----------------------------------------------------------------------
// 106. TestJetStreamClusterConsumerHealthCheckOnlyReportsSkew
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterConsumerHealthCheckOnlyReportsSkew_ShouldNotForceRecreateOnSkew()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Skewed consumer state should be reported, not cause forced recreation.
}
// -----------------------------------------------------------------------
// 107. TestJetStreamClusterConsumerHealthCheckDeleted
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterConsumerHealthCheckDeleted_ShouldCleanUpDeletedConsumerOnHealthCheck()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Deleted consumer should be cleaned up gracefully during health check.
}
// -----------------------------------------------------------------------
// 108. TestJetStreamClusterRespectConsumerStartSeq
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterRespectConsumerStartSeq_ShouldStartDeliveryFromConfiguredSequence()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create consumer with DeliverByStartSequence=5, verify first delivered msg is seq 5.
}
// -----------------------------------------------------------------------
// 109. TestJetStreamClusterPeerRemoveStreamConsumerDesync
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterPeerRemoveStreamConsumerDesync_ShouldNotDesyncConsumerAfterPeerRemoval()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(5, "R5S");
c.WaitOnLeader();
// Remove a peer from stream, verify consumer state remains in sync with stream.
}
// -----------------------------------------------------------------------
// 110. TestJetStreamClusterStuckConsumerAfterLeaderChangeWithUnknownDeliveries
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterStuckConsumerAfterLeaderChangeWithUnknownDeliveries_ShouldRecoverFromStuckState()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Consumer with in-flight msgs, leader change — consumer should recover without getting stuck.
}
// -----------------------------------------------------------------------
// 111. TestJetStreamClusterAccountStatsForReplicatedStreams
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterAccountStatsForReplicatedStreams_ShouldCountStorageOnceNotPerReplica()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Publish to R=3 stream, verify account stats count storage once (logical), not 3x.
}
// -----------------------------------------------------------------------
// 112. TestJetStreamClusterRecreateConsumerFromMetaSnapshot
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterRecreateConsumerFromMetaSnapshot_ShouldRecreateConsumerFromSnapshot()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Create consumer, snapshot meta, shut server, restore from snapshot, verify consumer exists.
}
// -----------------------------------------------------------------------
// 113. TestJetStreamClusterUpgradeStreamVersioning
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterUpgradeStreamVersioning_ShouldHandleStreamVersionUpgrade()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Simulate stream version upgrade scenario, verify stream accessible after upgrade.
}
// -----------------------------------------------------------------------
// 114. TestJetStreamClusterUpgradeConsumerVersioning
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterUpgradeConsumerVersioning_ShouldHandleConsumerVersionUpgrade()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Simulate consumer version upgrade, verify consumer accessible after upgrade.
}
// -----------------------------------------------------------------------
// 115. TestJetStreamClusterInterestPolicyAckAll
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterInterestPolicyAckAll_ShouldRemoveMsgOnlyAfterAllConsumersAckAll()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Interest stream, AckAll consumer: msg removed only after all consumers AckAll.
}
// -----------------------------------------------------------------------
// 116. TestJetStreamClusterPreserveRedeliveredWithLaggingStream
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterPreserveRedeliveredWithLaggingStream_ShouldPreserveRedeliveredFlagDuringLag()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Consumer with lagging stream: redelivered flag must be preserved across leader changes.
}
// -----------------------------------------------------------------------
// 117. TestJetStreamClusterInvalidJSACKOverRoute
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterInvalidJsAckOverRoute_ShouldHandleInvalidAckGracefully()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// Invalid JSACK sent over a route should not crash the server.
}
// -----------------------------------------------------------------------
// 118. TestJetStreamClusterConsumerOnlyDeliverMsgAfterQuorum
// -----------------------------------------------------------------------
[SkippableFact]
public void ClusterConsumerOnlyDeliverMsgAfterQuorum_ShouldNotDeliverBeforeQuorumAchieved()
{
Skip.If(ShouldSkip(), "Cluster integration tests are not enabled.");
using var c = TestCluster.CreateJetStreamCluster(3, "R3S");
c.WaitOnLeader();
// R=3 consumer must not deliver msg until quorum (2 of 3) of replicas have acknowledged the entry.
}
}