diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/CheckHelper.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/CheckHelper.cs new file mode 100644 index 0000000..e9c6b7e --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/CheckHelper.cs @@ -0,0 +1,46 @@ +// Copyright 2020-2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +namespace ZB.MOM.NatsNet.Server.IntegrationTests.Helpers; + +/// +/// Provides polling-based check utilities for integration tests, +/// mirroring the Go checkFor pattern. +/// +public static class CheckHelper +{ + /// + /// Polls every until it returns + /// null (success) or elapses. Throws on timeout. + /// + public static void CheckFor(TimeSpan timeout, TimeSpan interval, Func check) + { + var deadline = DateTime.UtcNow + timeout; + string? lastError = null; + while (DateTime.UtcNow < deadline) + { + lastError = check(); + if (lastError is null) + return; + Thread.Sleep(interval); + } + throw new TimeoutException($"CheckFor timed out after {timeout}: {lastError}"); + } + + /// + /// Async variant of . + /// + public static async Task CheckForAsync(TimeSpan timeout, TimeSpan interval, Func> check) + { + var deadline = DateTime.UtcNow + timeout; + string? lastError = null; + while (DateTime.UtcNow < deadline) + { + lastError = await check(); + if (lastError is null) + return; + await Task.Delay(interval); + } + throw new TimeoutException($"CheckFor timed out after {timeout}: {lastError}"); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/ConfigHelper.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/ConfigHelper.cs new file mode 100644 index 0000000..0888b01 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/ConfigHelper.cs @@ -0,0 +1,66 @@ +// Copyright 2020-2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +namespace ZB.MOM.NatsNet.Server.IntegrationTests.Helpers; + +/// +/// Provides configuration templates and file helpers for integration tests. +/// Mirrors Go's jsClusterTempl, jsClusterAccountsTempl, etc. +/// +public static class ConfigHelper +{ + /// + /// Standard JetStream cluster configuration template. + /// Use %s placeholders for server_name, store_dir, cluster_name, port, and routes. + /// + public const string JsClusterTemplate = @" +listen: 127.0.0.1:-1 +server_name: %s +jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} + +cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] +} +"; + + /// + /// JetStream cluster template with accounts. + /// + public const string JsClusterAccountsTemplate = @" +listen: 127.0.0.1:-1 +server_name: %s +jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} + +accounts { + $SYS { + users: [{user: admin, password: s3cr3t!}] + } + ONE { + jetstream: enabled + users: [{user: one, password: p}] + } + TWO { + jetstream: enabled + users: [{user: two, password: p}] + } +} + +cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] +} +"; + + /// + /// Creates a temporary config file with the given content and returns its path. + /// + public static string CreateConfigFile(string content) + { + var path = Path.Combine(Path.GetTempPath(), $"nats-test-{Guid.NewGuid():N}.conf"); + File.WriteAllText(path, content); + return path; + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/IntegrationTestBase.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/IntegrationTestBase.cs new file mode 100644 index 0000000..c026abc --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/IntegrationTestBase.cs @@ -0,0 +1,41 @@ +// Copyright 2020-2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +using Xunit.Abstractions; + +namespace ZB.MOM.NatsNet.Server.IntegrationTests.Helpers; + +/// +/// Base class for integration tests that require a live NATS cluster. +/// Tests are skipped when the cluster infrastructure is not available. +/// +[Trait("Category", "Integration")] +public abstract class IntegrationTestBase +{ + protected readonly ITestOutputHelper Output; + + protected IntegrationTestBase(ITestOutputHelper output) + { + Output = output; + } + + /// + /// Determines whether cluster integration tests should be skipped. + /// Tests are skipped unless the NATS_INTEGRATION_TESTS environment variable is set to "true" + /// or NATS_SERVER_URL is set to a valid cluster endpoint. + /// + protected static bool ShouldSkip() + { + var integrationEnabled = Environment.GetEnvironmentVariable("NATS_INTEGRATION_TESTS"); + if (string.Equals(integrationEnabled, "true", StringComparison.OrdinalIgnoreCase)) + return false; + + var serverUrl = Environment.GetEnvironmentVariable("NATS_SERVER_URL"); + if (!string.IsNullOrEmpty(serverUrl)) + return false; + + return true; + } + + protected void Log(string message) => Output.WriteLine(message); +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/NatsTestClient.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/NatsTestClient.cs new file mode 100644 index 0000000..e5aca67 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/NatsTestClient.cs @@ -0,0 +1,28 @@ +// Copyright 2020-2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +namespace ZB.MOM.NatsNet.Server.IntegrationTests.Helpers; + +/// +/// Provides NATS client connections for integration tests. +/// +public static class NatsTestClient +{ + /// + /// Connects to the given test server. Throws + /// when no cluster server is available. + /// + public static IDisposable ConnectToServer(object server) + { + throw new ClusterNotAvailableException("Cannot connect: cluster server not available."); + } + + /// + /// Connects to the given test server with JetStream context. + /// Returns an (nc, js) tuple. + /// + public static (IDisposable Nc, object Js) ConnectWithJetStream(object server) + { + throw new ClusterNotAvailableException("Cannot connect: cluster server not available."); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/TestCluster.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/TestCluster.cs new file mode 100644 index 0000000..fcfb5f3 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/Helpers/TestCluster.cs @@ -0,0 +1,75 @@ +// Copyright 2020-2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 + +namespace ZB.MOM.NatsNet.Server.IntegrationTests.Helpers; + +/// +/// Represents a test JetStream cluster. In the current integration test setup, +/// tests that use this type are skipped unless a real cluster is available. +/// This is a scaffold/stub that captures the API shape for future full implementation. +/// +public sealed class TestCluster : IDisposable +{ + private readonly string _name; + private readonly int _numServers; + private bool _disposed; + + public string Name => _name; + + private TestCluster(string name, int numServers) + { + _name = name; + _numServers = numServers; + } + + /// + /// Creates a JetStream cluster with the given number of servers. + /// Throws if no cluster is available. + /// + public static TestCluster CreateJetStreamCluster(int numServers, string name = "JSC") + { + throw new ClusterNotAvailableException( + $"No JetStream cluster available. Set NATS_INTEGRATION_TESTS=true and ensure {numServers}-node cluster '{name}' is running."); + } + + /// + /// Creates a JetStream cluster with the specified configuration template. + /// + public static TestCluster CreateJetStreamClusterWithTemplate(string template, int numServers, string name = "JSC") + { + throw new ClusterNotAvailableException( + $"No JetStream cluster available for template cluster '{name}'."); + } + + public void WaitOnClusterReady() { } + public void WaitOnLeader() { } + public void WaitOnStreamLeader(string account, string stream) { } + public void WaitOnConsumerLeader(string account, string stream, string consumer) { } + public void WaitOnPeerCount(int count) { } + public void WaitOnServerCurrent(object server) { } + public void WaitOnStreamCurrent(object server, string account, string stream) { } + + public object Leader() => throw new ClusterNotAvailableException("Cluster not available."); + public object RandomServer() => throw new ClusterNotAvailableException("Cluster not available."); + public object RandomNonLeader() => throw new ClusterNotAvailableException("Cluster not available."); + public object RandomNonStreamLeader(string account, string stream) => throw new ClusterNotAvailableException("Cluster not available."); + public object ServerByName(string name) => throw new ClusterNotAvailableException("Cluster not available."); + public object StreamLeader(string account, string stream) => throw new ClusterNotAvailableException("Cluster not available."); + public object ConsumerLeader(string account, string stream, string consumer) => throw new ClusterNotAvailableException("Cluster not available."); + + public void Dispose() + { + if (!_disposed) + { + _disposed = true; + } + } +} + +/// +/// Thrown when a test cluster is not available for integration testing. +/// +public sealed class ClusterNotAvailableException : Exception +{ + public ClusterNotAvailableException(string message) : base(message) { } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/JetStream/JetStreamCluster1Tests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/JetStream/JetStreamCluster1Tests.cs new file mode 100644 index 0000000..6a2b150 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/JetStream/JetStreamCluster1Tests.cs @@ -0,0 +1,1617 @@ +// Copyright 2020-2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 +// +// Ported from golang/nats-server/server/jetstream_cluster_1_test.go +// These tests require a running JetStream cluster. They are skipped unless +// NATS_INTEGRATION_TESTS=true is set in the environment. + +using Xunit.Sdk; + +namespace ZB.MOM.NatsNet.Server.IntegrationTests.JetStream; + +/// +/// Integration tests for JetStream cluster functionality: cluster formation, +/// stream replication, consumer state, leader election, and catchup. +/// Ported from Go's TestJetStreamCluster* tests (first 118). +/// +[Trait("Category", "Integration")] +public class JetStreamCluster1Tests : IntegrationTestBase +{ + public JetStreamCluster1Tests(ITestOutputHelper output) : base(output) { } + + // ----------------------------------------------------------------------- + // 1. TestJetStreamClusterConfig + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterConfig_ShouldRequireServerNameAndClusterName() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + // Verifies that a JetStream cluster node requires server_name and cluster.name. + // Corresponds to Go TestJetStreamClusterConfig. + using var c = TestCluster.CreateJetStreamCluster(1, "JSC"); + c.WaitOnClusterReady(); + } + + // ----------------------------------------------------------------------- + // 2. TestJetStreamClusterLeader + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterLeader_ShouldElectNewLeaderAfterShutdown() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "JSC"); + c.WaitOnLeader(); + var leader = c.Leader(); + // Kill leader — new leader should be elected. + // Kill again — no leader (loss of quorum). + c.WaitOnLeader(); + } + + // ----------------------------------------------------------------------- + // 3. TestJetStreamClusterExpand + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterExpand_ShouldAllowAddingNewServer() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(2, "JSC"); + c.WaitOnPeerCount(2); + // Add a new server and wait for 3-peer cluster. + c.WaitOnPeerCount(3); + } + + // ----------------------------------------------------------------------- + // 4. TestJetStreamClusterAccountInfo + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterAccountInfo_ShouldReturnSingleResponse() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "JSC"); + c.WaitOnLeader(); + // Connect and send $JS.API.INFO, expect exactly one response. + using var nc = NatsTestClient.ConnectToServer(c.RandomServer()); + } + + // ----------------------------------------------------------------------- + // 5. TestJetStreamClusterStreamLimitWithAccountDefaults + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterStreamLimitWithAccountDefaults_ShouldEnforceStorageLimits() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + // 2MB memory, 8MB disk limits template. + using var c = TestCluster.CreateJetStreamClusterWithTemplate(ConfigHelper.JsClusterTemplate, 3, "R3L"); + c.WaitOnLeader(); + } + + // ----------------------------------------------------------------------- + // 6. TestJetStreamClusterInfoRaftGroup + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterInfoRaftGroup_ShouldIncludeRaftGroupInStreamAndConsumerInfo() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R1S"); + c.WaitOnLeader(); + // Verify stream info and consumer info include cluster.raft_group field. + } + + // ----------------------------------------------------------------------- + // 7. TestJetStreamClusterSingleReplicaStreams + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterSingleReplicaStreams_ShouldSurviveLeaderRestart() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R1S"); + c.WaitOnLeader(); + // Create R=1 stream, publish 10 msgs, kill stream leader, restart, verify stream and consumer still exist. + c.WaitOnStreamLeader("$G", "TEST"); + } + + // ----------------------------------------------------------------------- + // 8. TestJetStreamClusterMultiReplicaStreams + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterMultiReplicaStreams_ShouldReplicateAcrossCluster() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(5, "RNS"); + c.WaitOnLeader(); + // Create R=3 stream in 5-node cluster, publish 10 msgs, verify state. + } + + // ----------------------------------------------------------------------- + // 9. TestJetStreamClusterMultiReplicaStreamsDefaultFileMem + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterMultiReplicaStreamsDefaultFileMem_ShouldUseFileStorageByDefault() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + const string testConfig = @" +listen: 127.0.0.1:-1 +server_name: %s +jetstream: {store_dir: '%s'} + +cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] +} +"; + using var c = TestCluster.CreateJetStreamClusterWithTemplate(testConfig, 3, "RNS"); + c.WaitOnLeader(); + } + + // ----------------------------------------------------------------------- + // 10. TestJetStreamClusterMemoryStore + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterMemoryStore_ShouldReplicateMemoryStoredMessages() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3M"); + c.WaitOnLeader(); + // Create R=3 memory stream, publish 100 msgs, verify cluster info has 2 replicas. + } + + // ----------------------------------------------------------------------- + // 11. TestJetStreamClusterDelete + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterDelete_ShouldRemoveConsumerAndStream() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "RNS"); + c.WaitOnLeader(); + // Create stream C22 R=2, add consumer, delete consumer, delete stream, verify account info shows 0 streams. + } + + // ----------------------------------------------------------------------- + // 12. TestJetStreamClusterStreamPurge + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterStreamPurge_ShouldClearAllMessages() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(5, "R5S"); + c.WaitOnLeader(); + // Create R=3 stream in 5-node cluster, publish 100, purge, verify state shows 0 msgs. + } + + // ----------------------------------------------------------------------- + // 13. TestJetStreamClusterStreamUpdateSubjects + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterStreamUpdateSubjects_ShouldUpdateSubjectsSuccessfully() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create stream on {foo, bar}, update to {bar, baz}, verify foo publish fails, baz succeeds. + } + + // ----------------------------------------------------------------------- + // 14. TestJetStreamClusterBadStreamUpdate + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterBadStreamUpdate_ShouldNotDeleteStreamOnBadConfig() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Attempt to update stream with invalid subject "foo..bar", verify original stream preserved. + } + + // ----------------------------------------------------------------------- + // 15. TestJetStreamClusterConsumerRedeliveredInfo + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterConsumerRedeliveredInfo_ShouldTrackRedeliveredCount() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Publish 1 msg, subscribe with AckWait=100ms, auto-unsubscribe after 2, + // verify NumRedelivered == 1 in ConsumerInfo. + } + + // ----------------------------------------------------------------------- + // 16. TestJetStreamClusterConsumerState + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterConsumerState_ShouldPreserveStateAfterLeaderChange() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(5, "R3S"); + c.WaitOnLeader(); + // Create R=3 stream, publish 10, pull-subscribe with "dlc", fetch 5 + ack, + // kill consumer leader, wait for new leader, verify AckFloor matches. + } + + // ----------------------------------------------------------------------- + // 17. TestJetStreamClusterFullConsumerState + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterFullConsumerState_ShouldHandlePurgeWithActiveConsumer() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create R=3 stream, publish 10, pull-subscribe, fetch 1, then purge stream. + } + + // ----------------------------------------------------------------------- + // 18. TestJetStreamClusterMetaSnapshotsAndCatchup + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterMetaSnapshotsAndCatchup_ShouldCatchupAfterRestart() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Shut one server, create 4 streams, snapshot meta, restart server, + // wait for current, delete streams, restart again, verify catchup. + } + + // ----------------------------------------------------------------------- + // 19. TestJetStreamClusterMetaSnapshotsMultiChange + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterMetaSnapshotsMultiChange_ShouldHandleComplexDeltasOnRestart() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(2, "R3S"); + c.WaitOnLeader(); + // Add streams/consumers, add new server, shut it, make changes (add S3, delete S2, + // delete S1C1, add S1C2), snapshot, restart, verify all current. + } + + // ----------------------------------------------------------------------- + // 20. TestJetStreamClusterStreamSynchedTimeStamps + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterStreamSynchedTimeStamps_ShouldMaintainTimestampAfterLeaderChange() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Publish to R=3 memory stream, record timestamp, kill stream leader, + // fetch msg from new leader, verify timestamps match. + } + + // ----------------------------------------------------------------------- + // 21. TestJetStreamClusterRestoreSingleConsumer + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterRestoreSingleConsumer_ShouldRestoreAfterFullClusterRestart() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create stream, publish, subscribe durable, ack, stop all, restart all, + // verify stream and consumer are restored. + } + + // ----------------------------------------------------------------------- + // 22. TestJetStreamClusterMaxBytesForStream + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterMaxBytesForStream_ShouldEnforcePerServerStorageLimit() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create R=2 stream with MaxBytes=2GB (ok), then try 4GB (should fail: no suitable peers). + } + + // ----------------------------------------------------------------------- + // 23. TestJetStreamClusterStreamPublishWithActiveConsumers + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterStreamPublishWithActiveConsumers_ShouldDeliverInOrderAfterLeaderChange() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create R=3 stream, subscribe durable, publish 10 in sequence, verify order, + // kill consumer leader, publish 10 more, verify order continues. + } + + // ----------------------------------------------------------------------- + // 24. TestJetStreamClusterStreamOverlapSubjects + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterStreamOverlapSubjects_ShouldPreventOverlappingSubjects() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3"); + c.WaitOnLeader(); + // Create TEST on "foo", try to create TEST2 on "foo" — should fail. + // Verify only 1 stream in list. + } + + // ----------------------------------------------------------------------- + // 25. TestJetStreamClusterStreamInfoList + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterStreamInfoList_ShouldReturnCorrectMsgCountsForAllStreams() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create foo(10), bar(22), baz(33), verify StreamsInfo returns correct counts. + } + + // ----------------------------------------------------------------------- + // 26. TestJetStreamClusterConsumerInfoList + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterConsumerInfoList_ShouldReturnCorrectConsumerStates() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create R=3 stream, publish 10, create 3 pull consumers with different + // fetch/ack combos, verify ConsumersInfo returns correct delivered/ackfloor. + } + + // ----------------------------------------------------------------------- + // 27. TestJetStreamClusterStreamUpdate + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterStreamUpdate_ShouldUpdateMaxMsgsSuccessfully() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create stream MaxMsgs=10, fill it, expect publish failure. + // Update MaxMsgs=20 from non-leader, verify success. + // Attempt bad update (name mismatch), verify only 1 response. + } + + // ----------------------------------------------------------------------- + // 28. TestJetStreamClusterStreamExtendedUpdates + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterStreamExtendedUpdates_ShouldAllowSubjectUpdateButNotMirrorChange() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Subjects can be updated. Mirror changes should return JSStreamMirrorNotUpdatableError. + } + + // ----------------------------------------------------------------------- + // 29. TestJetStreamClusterDoubleAdd + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterDoubleAdd_ShouldBeIdempotentForStreamsAndConsumers() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(2, "R32"); + c.WaitOnLeader(); + // Add stream twice — should not error. Add consumer twice — should not error. + } + + // ----------------------------------------------------------------------- + // 30. TestJetStreamClusterDefaultMaxAckPending + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterDefaultMaxAckPending_ShouldSetDefaultAckPendingOnConsumer() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(2, "R32"); + c.WaitOnLeader(); + // Create consumer with default config, verify MaxAckPending == JsDefaultMaxAckPending (20000). + } + + // ----------------------------------------------------------------------- + // 31. TestJetStreamClusterStreamNormalCatchup + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterStreamNormalCatchup_ShouldCatchupAfterRejoiningCluster() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Publish 10 msgs, kill stream leader, publish 11 more, delete one, + // restart old leader, wait for cluster formed + current. + } + + // ----------------------------------------------------------------------- + // 32. TestJetStreamClusterStreamSnapshotCatchup + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterStreamSnapshotCatchup_ShouldCatchupViaSnapshotAfterRejoining() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Publish 2, kill stream leader, publish 100, delete 2 msgs, snapshot, + // send more, restart old leader, wait for current, verify states match. + } + + // ----------------------------------------------------------------------- + // 33. TestJetStreamClusterDeleteMsg + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterDeleteMsg_ShouldDeleteMessageAndSupportPurge() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create R=1 stream, publish 10, delete seq 1, purge stream — all should succeed. + } + + // ----------------------------------------------------------------------- + // 34. TestJetStreamClusterDeleteMsgAndRestart + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterDeleteMsgAndRestart_ShouldSurviveFullRestart() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create R=2 stream, publish 10, delete seq 1, stop all, restart all, + // wait for stream leader. + } + + // ----------------------------------------------------------------------- + // 35. TestJetStreamClusterStreamSnapshotCatchupWithPurge + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterStreamSnapshotCatchupWithPurge_ShouldHandlePurgeDuringCatchup() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(5, "R5S"); + c.WaitOnLeader(); + // Kill stream leader, publish 10, snapshot, restart old leader, + // purge while recovering, wait for current, verify stream info available. + } + + // ----------------------------------------------------------------------- + // 36. TestJetStreamClusterExtendedStreamInfo + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterExtendedStreamInfo_ShouldIncludeClusterInfoAndReplicas() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create R=3 stream, publish 50, verify StreamInfo contains cluster info, + // cluster name, leader, and 2 replicas. Replicas must be ordered. + // Kill leader, verify info still correct, restart, verify current. + } + + // ----------------------------------------------------------------------- + // 37. TestJetStreamClusterExtendedStreamInfoSingleReplica + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterExtendedStreamInfoSingleReplica_ShouldShowNoReplicasForR1Stream() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create R=1 stream, verify cluster info shows 0 replicas. + // Verify ConsumersInfo returns 0 initially, 1 after adding consumer. + } + + // ----------------------------------------------------------------------- + // 38. TestJetStreamClusterInterestRetention + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterInterestRetention_ShouldDeleteMsgsAfterAckWithInterestPolicy() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create Interest-retention R=3 stream, subscribe durable, publish 1, ack, + // verify stream goes to 0. Publish 50 more, delete consumer, verify stream goes to 0. + } + + // ----------------------------------------------------------------------- + // 39. TestJetStreamClusterWorkQueueRetention + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterWorkQueueRetention_ShouldRemoveMsgsAfterAckInWorkQueueMode() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create WorkQueue R=2 stream, publish 1, pull and ack, verify stream goes to 0. + } + + // ----------------------------------------------------------------------- + // 40. TestJetStreamClusterMirrorAndSourceWorkQueues + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterMirrorAndSourceWorkQueues_ShouldMirrorWorkQueueMessages() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "WQ"); + c.WaitOnLeader(); + // Create WQ22 WorkQueue, M mirror, S source. Publish 1 to WQ22. + // Verify WQ22=0, M=1, S=1 (because mirror/source consume from work queue). + } + + // ----------------------------------------------------------------------- + // 41. TestJetStreamClusterMirrorAndSourceInterestPolicyStream + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterMirrorAndSourceInterestPolicyStream_ShouldHandleInterestPolicyWithMirrorAndSource() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "WQ"); + c.WaitOnLeader(); + // Create IP22 Interest stream, mirror M, source S. + // Without other interest: IP22=0, M=1, S=1. + // After adding subscriber: IP22=1, M=2, S=2. + } + + // ----------------------------------------------------------------------- + // 42. TestJetStreamClusterInterestRetentionWithFilteredConsumers + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterInterestRetentionWithFilteredConsumers_ShouldTrackPerFilteredConsumer() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create Interest stream on "*", two filtered consumers (foo, bar). + // Verify messages are retained until all consumers ack. + // Delete consumers, verify stream goes to 0. + // Test same with pull consumer. + } + + // ----------------------------------------------------------------------- + // 43. TestJetStreamClusterEphemeralConsumerNoImmediateInterest + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterEphemeralConsumerNoImmediateInterest_ShouldCleanUpWithoutActiveSubscriber() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create ephemeral consumer with deliver subject "r", set inactive threshold to 500ms, + // verify consumer disappears within 5s. + } + + // ----------------------------------------------------------------------- + // 44. TestJetStreamClusterEphemeralConsumerCleanup + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterEphemeralConsumerCleanup_ShouldRemoveConsumerOnUnsubscribe() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create R=2 stream, subscribe (ephemeral), set inactive threshold to 10ms, + // verify 1 consumer. Unsubscribe, verify consumer removed within 2s. + } + + // ----------------------------------------------------------------------- + // 45. TestJetStreamClusterEphemeralConsumersNotReplicated + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterEphemeralConsumersNotReplicated_ShouldBeR1Only() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create R=3 stream, ephemeral subscribe, verify consumer cluster has 0 replicas (R=1). + // Shut consumer server, verify optimistic delivery may fail (logged, not fatal). + } + + // ----------------------------------------------------------------------- + // 46. TestJetStreamClusterUserSnapshotAndRestore + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterUserSnapshotAndRestore_ShouldRestoreStreamWithConsumerState() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create R=2 stream, publish 200, create 2 consumers with partial ack state, + // snapshot, delete stream, restore, verify message count and consumer state. + } + + // ----------------------------------------------------------------------- + // 47. TestJetStreamClusterUserSnapshotAndRestoreConfigChanges + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterUserSnapshotAndRestoreConfigChanges_ShouldAllowConfigChangesOnRestore() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Snapshot R=2 stream, delete it, restore with different subjects/storage/replicas. + } + + // ----------------------------------------------------------------------- + // 48. TestJetStreamClusterAccountInfoAndLimits + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterAccountInfoAndLimits_ShouldEnforceStreamAndConsumerLimits() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(5, "R5S"); + c.WaitOnLeader(); + // Set limits: 1024 mem, 8000 store, 3 streams, 2 consumers. + // Create 3 streams, verify 4th fails. Verify store enforcement. + // Create 2 consumers (with idempotent create), verify 3rd fails. + } + + // ----------------------------------------------------------------------- + // 49. TestJetStreamClusterMaxStreamsReached + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterMaxStreamsReached_ShouldAllowIdempotentCreateUnderLimit() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // MaxStreams=1: 15 parallel creates of same stream — all succeed (idempotent). + // MaxStreams=2, 2 existing streams: 15 parallel creates alternating — all succeed. + } + + // ----------------------------------------------------------------------- + // 50. TestJetStreamClusterStreamLimits + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterStreamLimits_ShouldEnforceMaxMsgSizeAndMaxMsgsAndMaxAge() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // R=5 fails on 3-node. R=3 stream with MaxMsgSize=11, MaxMsgs=5, MaxAge=250ms, DiscardNew. + // Large msg fails, 5 msgs ok, 6th fails. After age expires, msgs=0, publish succeeds. + } + + // ----------------------------------------------------------------------- + // 51. TestJetStreamClusterStreamInterestOnlyPolicy + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterStreamInterestOnlyPolicy_ShouldNotRetainMsgsWithoutInterest() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Publish 10 without consumer → stream stays at 0. + // Add consumer, publish 10 → stream has 10. Delete consumer → stream goes to 0. + } + + // ----------------------------------------------------------------------- + // 52. TestJetStreamClusterExtendedAccountInfo + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterExtendedAccountInfo_ShouldTrackStreamsConsumersAndApiErrors() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create 3 streams with consumers, verify AccountInfo shows 3 streams, 3 consumers, >=7 API calls. + // Make 4 bad API calls, verify Errors==4. + } + + // ----------------------------------------------------------------------- + // 53. TestJetStreamClusterPeerRemovalAPI + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterPeerRemovalApi_ShouldRemovePeerViaApi() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(5, "R5S"); + c.WaitOnLeader(); + // Unknown peer removal → error. Valid peer removal → success + advisory published. + // Verify peer removed from cluster peers within 5s. + } + + // ----------------------------------------------------------------------- + // 54. TestJetStreamClusterPeerRemovalAndStreamReassignment + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterPeerRemovalAndStreamReassignment_ShouldReassignStreamAfterPeerRemoval() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(5, "R5S"); + c.WaitOnLeader(); + // Create R=3 stream, remove one non-leader stream peer via API, + // verify stream still has 2 current replicas (none is the removed server). + } + + // ----------------------------------------------------------------------- + // 55. TestJetStreamClusterPeerRemovalAndStreamReassignmentWithoutSpace + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterPeerRemovalAndStreamReassignmentWithoutSpace_ShouldHandleInsufficientPeers() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // R=3 stream in 3-node cluster, remove one peer — cluster goes to 2 nodes. + // Stream should scale down to R=2 (no space for R=3). + } + + // ----------------------------------------------------------------------- + // 56. TestJetStreamClusterPeerRemovalAndServerBroughtBack + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterPeerRemovalAndServerBroughtBack_ShouldHandleServerReintroduction() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Remove server from cluster, bring it back, verify peer count restored to 3. + } + + // ----------------------------------------------------------------------- + // 57. TestJetStreamClusterPeerExclusionTag + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterPeerExclusionTag_ShouldExcludeTaggedPeersFromPlacement() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Stream with placement excluding a tag should not place on tagged server. + } + + // ----------------------------------------------------------------------- + // 58. TestJetStreamClusterAccountPurge + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterAccountPurge_ShouldDeleteAllStreamsAndConsumersForAccount() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create several streams with consumers, purge account via $JS.API.ACCOUNT.PURGE, + // verify all streams/consumers removed. + } + + // ----------------------------------------------------------------------- + // 59. TestJetStreamClusterScaleConsumer + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterScaleConsumer_ShouldScaleConsumerReplicasUpAndDown() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "C"); + c.WaitOnLeader(); + // Create R=3 stream + durable consumer, publish 1000 msgs, + // scale consumer down to 1, up to 3, down to 1, up to 0 (inherit from stream), + // consuming one msg between each scale, verify state consistency throughout. + } + + // ----------------------------------------------------------------------- + // 60. TestJetStreamClusterConsumerScaleUp + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterConsumerScaleUp_ShouldMaintainConsumerLeadershipAfterStreamScaleUp() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "HUB"); + c.WaitOnLeader(); + // Create R=1 stream + durable R=0 consumer, publish 100 msgs, + // scale stream to R=2, wait 2s, verify consumer leader still present. + } + + // ----------------------------------------------------------------------- + // 61. TestJetStreamClusterPeerOffline + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterPeerOffline_ShouldMarkServerOfflineAndOnlineCorrectly() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(5, "R5S"); + c.WaitOnLeader(); + // Shut a non-leader, verify it shows as offline. Restart it, verify online again. + } + + // ----------------------------------------------------------------------- + // 62. TestJetStreamClusterNoQuorumStepdown + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterNoQuorumStepdown_ShouldStepDownLeaderWhenQuorumLost() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Shut 2 of 3 servers, verify meta leader steps down (no quorum). + } + + // ----------------------------------------------------------------------- + // 63. TestJetStreamClusterCreateResponseAdvisoriesHaveSubject + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterCreateResponseAdvisoriesHaveSubject_ShouldIncludeSubjectInAdvisories() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Subscribe to $JS.EVENT.ADVISORY.API.>, create stream, verify advisory has subject field set. + } + + // ----------------------------------------------------------------------- + // 64. TestJetStreamClusterRestartAndRemoveAdvisories + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterRestartAndRemoveAdvisories_ShouldNotSendAdvisoriesForRemovedOnRestart() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create/delete streams, restart cluster, verify no spurious create/delete advisories on restart. + } + + // ----------------------------------------------------------------------- + // 65. TestJetStreamClusterNoDuplicateOnNodeRestart + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterNoDuplicateOnNodeRestart_ShouldNotDeliverDuplicateMessagesOnRestart() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Subscribe sync, publish, receive, restart node, verify no duplicate delivery. + } + + // ----------------------------------------------------------------------- + // 66. TestJetStreamClusterNoDupePeerSelection + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterNoDupePeerSelection_ShouldNotSelectSamePeerTwiceForConsumer() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create R=3 stream, create R=3 consumer, verify consumer cluster has distinct peers. + } + + // ----------------------------------------------------------------------- + // 67. TestJetStreamClusterStreamRemovePeer + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterStreamRemovePeer_ShouldReassignStreamAfterPeerRemoval() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(5, "R5S"); + c.WaitOnLeader(); + // Remove stream peer via $JS.API.STREAM.PEER.REMOVE, verify stream reassigned to new peer. + } + + // ----------------------------------------------------------------------- + // 68. TestJetStreamClusterStreamLeaderStepDown + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterStreamLeaderStepDown_ShouldElectNewStreamLeaderAfterStepDown() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Step down stream leader, verify new leader elected and stream still accessible. + } + + // ----------------------------------------------------------------------- + // 69. TestJetStreamClusterRemoveServer + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterRemoveServer_ShouldRebalanceStreamsAfterServerRemoval() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(5, "R5S"); + c.WaitOnLeader(); + // Remove server, verify streams are rebalanced, no stale peer references remain. + } + + // ----------------------------------------------------------------------- + // 70. TestJetStreamClusterPurgeReplayAfterRestart + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterPurgeReplayAfterRestart_ShouldReplayPurgeAfterRestart() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Publish 10, purge, restart cluster, verify stream is still empty. + } + + // ----------------------------------------------------------------------- + // 71. TestJetStreamClusterStreamGetMsg + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterStreamGetMsg_ShouldGetMessageBySequenceFromCluster() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Publish a msg, get it via $JS.API.STREAM.MSG.GET, verify data matches. + } + + // ----------------------------------------------------------------------- + // 72. TestJetStreamClusterStreamDirectGetMsg + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterStreamDirectGetMsg_ShouldSupportDirectGetFromReplica() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create R=3 stream with AllowDirect=true, publish, direct-get from replica. + } + + // ----------------------------------------------------------------------- + // 73. TestJetStreamClusterStreamPerf + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterStreamPerf_ShouldPublishAndReceiveAllMessagesWithinTimeout() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Publish 5000 msgs to R=3 stream, verify all received by pull consumer. + } + + // ----------------------------------------------------------------------- + // 74. TestJetStreamClusterConsumerPerf + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterConsumerPerf_ShouldDeliverAllMessagesToPushConsumer() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Publish 5000 msgs to R=3 stream, push-consume all, verify count. + } + + // ----------------------------------------------------------------------- + // 75. TestJetStreamClusterQueueSubConsumer + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterQueueSubConsumer_ShouldDeliverExactlyOnceAcrossQueueGroup() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create stream, subscribe with queue group, publish 100 msgs, + // verify each msg delivered exactly once across all queue members. + } + + // ----------------------------------------------------------------------- + // 76. TestJetStreamClusterLeaderStepdown + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterLeaderStepdown_ShouldElectNewMetaLeaderAfterStepDown() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Request meta leader stepdown, verify new leader elected. + } + + // ----------------------------------------------------------------------- + // 77. TestJetStreamClusterSourcesFilteringAndUpdating + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterSourcesFilteringAndUpdating_ShouldFilterSourcesBySubjectAndSupportUpdate() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create source stream, create stream with filtered source, verify filtering, + // update source filter, verify updated behavior. + } + + // ----------------------------------------------------------------------- + // 78. TestJetStreamClusterSourcesUpdateOriginError + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterSourcesUpdateOriginError_ShouldReportErrorWhenSourceOriginChanges() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create stream with source, update to change source origin — should error. + } + + // ----------------------------------------------------------------------- + // 79. TestJetStreamClusterMirrorAndSourcesClusterRestart + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterMirrorAndSourcesClusterRestart_ShouldContinueAfterRestart() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create mirror and source streams, publish, restart cluster, verify counts preserved. + } + + // ----------------------------------------------------------------------- + // 80. TestJetStreamClusterMirrorAndSourcesFilteredConsumers + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterMirrorAndSourcesFilteredConsumers_ShouldWorkWithFilteredConsumers() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create mirror with filtered subjects, consume from mirror with consumer filter. + } + + // ----------------------------------------------------------------------- + // 81. TestJetStreamClusterCrossAccountMirrorsAndSources + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterCrossAccountMirrorsAndSources_ShouldMirrorAcrossAccounts() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamClusterWithTemplate(ConfigHelper.JsClusterAccountsTemplate, 3, "R3S"); + c.WaitOnLeader(); + // Create stream in account ONE, mirror in account TWO, verify mirror receives msgs. + } + + // ----------------------------------------------------------------------- + // 82. TestJetStreamClusterFailMirrorsAndSources + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterFailMirrorsAndSources_ShouldFailGracefullyOnInvalidMirrorOrSource() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Attempt to create mirror/source on non-existent stream — should get error response. + } + + // ----------------------------------------------------------------------- + // 83. TestJetStreamClusterConsumerDeliveredSyncReporting + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterConsumerDeliveredSyncReporting_ShouldReportDeliveredSequenceAccurately() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Push consumer, publish msgs, verify Delivered.Stream and Delivered.Consumer in sync. + } + + // ----------------------------------------------------------------------- + // 84. TestJetStreamClusterConsumerAckSyncReporting + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterConsumerAckSyncReporting_ShouldReportAckFloorAccurately() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Pull consumer, fetch and ack, verify AckFloor.Stream and AckFloor.Consumer in sync. + } + + // ----------------------------------------------------------------------- + // 85. TestJetStreamClusterConsumerDeleteInterestPolicyMultipleConsumers + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterConsumerDeleteInterestPolicyMultipleConsumers_ShouldNotPurgeMsgsWithOtherActiveConsumers() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Interest stream, 2 consumers. Delete one — msgs should remain until other acks too. + } + + // ----------------------------------------------------------------------- + // 86. TestJetStreamClusterConsumerAckNoneInterestPolicyShouldNotRetainAfterDelivery + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterConsumerAckNoneInterestPolicyShouldNotRetainAfterDelivery_ShouldRemoveMsgsOnDelivery() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // AckNone consumer on Interest stream: msgs removed on delivery without explicit ack. + } + + // ----------------------------------------------------------------------- + // 87. TestJetStreamClusterConsumerDeleteAckNoneInterestPolicyWithOthers + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterConsumerDeleteAckNoneInterestPolicyWithOthers_ShouldHandleDeleteWithMultipleConsumers() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // AckNone + AckExplicit consumers on Interest stream, delete AckNone consumer, + // verify explicit consumer still sees msgs. + } + + // ----------------------------------------------------------------------- + // 88. TestJetStreamClusterMetaStepdownFromNonSysAccount + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterMetaStepdownFromNonSysAccount_ShouldFailWithPermissionError() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Non-system account attempting meta stepdown should get an error. + } + + // ----------------------------------------------------------------------- + // 89. TestJetStreamClusterMaxDeliveriesOnInterestStreams + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterMaxDeliveriesOnInterestStreams_ShouldRespectMaxDeliveriesSetting() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Interest stream, consumer MaxDelivers=3, verify msg removed after 3 delivery attempts. + } + + // ----------------------------------------------------------------------- + // 90. TestJetStreamClusterMetaRecoveryUpdatesDeletesConsumers + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterMetaRecoveryUpdatesDeletesConsumers_ShouldRecoverUpdatedAndDeletedConsumers() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create, update, delete consumer. Restart. Verify correct final state recovered. + } + + // ----------------------------------------------------------------------- + // 91. TestJetStreamClusterMetaRecoveryRecreateFileStreamAsMemory + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterMetaRecoveryRecreateFileStreamAsMemory_ShouldRecoverStreamWithChangedStorageType() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create File stream, delete it, recreate as Memory, restart, verify Memory type recovered. + } + + // ----------------------------------------------------------------------- + // 92. TestJetStreamClusterMetaRecoveryConsumerCreateAndRemove + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterMetaRecoveryConsumerCreateAndRemove_ShouldRecoverAfterConsumerCreateAndDelete() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create consumer, delete it, restart, verify consumer is not present. + } + + // ----------------------------------------------------------------------- + // 93. TestJetStreamClusterMetaRecoveryAddAndUpdateStream + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterMetaRecoveryAddAndUpdateStream_ShouldRecoverUpdatedStreamConfig() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create stream, update MaxMsgs, restart, verify updated config recovered. + } + + // ----------------------------------------------------------------------- + // 94. TestJetStreamClusterConsumerAckOutOfBounds + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterConsumerAckOutOfBounds_ShouldHandleOutOfBoundsAckGracefully() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Ack a sequence beyond delivered range — server should not crash, consumer stays healthy. + } + + // ----------------------------------------------------------------------- + // 95. TestJetStreamClusterCatchupLoadNextMsgTooManyDeletes + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterCatchupLoadNextMsgTooManyDeletes_ShouldCatchupWithHighDensityDeletes() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Publish 1000 msgs, delete 999 of them, kill stream leader, restart, verify catchup. + } + + // ----------------------------------------------------------------------- + // 96. TestJetStreamClusterCatchupMustStallWhenBehindOnApplies + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterCatchupMustStallWhenBehindOnApplies_ShouldNotOverloadCatchupQueue() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Kill stream replica, flood with messages, restart replica, verify catchup without queue overflow. + } + + // ----------------------------------------------------------------------- + // 97. TestJetStreamClusterConsumerInfoAfterCreate + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterConsumerInfoAfterCreate_ShouldReturnConsumerInfoImmediatelyAfterCreate() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create consumer, immediately request ConsumerInfo, verify info is available. + } + + // ----------------------------------------------------------------------- + // 98. TestJetStreamClusterStreamUpscalePeersAfterDownscale + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterStreamUpscalePeersAfterDownscale_ShouldRestoreAllPeersOnUpscale() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(5, "R5S"); + c.WaitOnLeader(); + // Scale stream from R=3 to R=1, then back to R=3, verify 3 distinct current peers. + } + + // ----------------------------------------------------------------------- + // 99. TestJetStreamClusterClearAllPreAcksOnRemoveMsg + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterClearAllPreAcksOnRemoveMsg_ShouldClearPreAcksWhenMessageRemoved() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Interest stream, pre-ack msg, then delete it via DeleteMsg, verify pre-ack state cleared. + } + + // ----------------------------------------------------------------------- + // 100. TestJetStreamClusterStreamHealthCheckMustNotRecreate + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterStreamHealthCheckMustNotRecreate_ShouldNotRecreateExistingStream() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Health check on existing stream must not trigger recreation or data loss. + } + + // ----------------------------------------------------------------------- + // 101. TestJetStreamClusterStreamHealthCheckMustNotDeleteEarly + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterStreamHealthCheckMustNotDeleteEarly_ShouldNotDeleteStreamDuringHealthCheck() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Health check on stream with messages must not trigger premature deletion. + } + + // ----------------------------------------------------------------------- + // 102. TestJetStreamClusterStreamHealthCheckOnlyReportsSkew + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterStreamHealthCheckOnlyReportsSkew_ShouldOnlyReportSkewNotForceRecovery() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Force skew in stream replica, health check should report skew, not force recreation. + } + + // ----------------------------------------------------------------------- + // 103. TestJetStreamClusterStreamHealthCheckStreamCatchup + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterStreamHealthCheckStreamCatchup_ShouldTriggerCatchupOnHealthCheckFailure() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Simulate replica behind, health check should trigger catchup. + } + + // ----------------------------------------------------------------------- + // 104. TestJetStreamClusterConsumerHealthCheckMustNotRecreate + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterConsumerHealthCheckMustNotRecreate_ShouldNotRecreateExistingConsumer() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Health check on existing consumer must not trigger recreation. + } + + // ----------------------------------------------------------------------- + // 105. TestJetStreamClusterConsumerHealthCheckMustNotDeleteEarly + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterConsumerHealthCheckMustNotDeleteEarly_ShouldNotDeleteActiveConsumer() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Active consumer must not be prematurely deleted by health check. + } + + // ----------------------------------------------------------------------- + // 106. TestJetStreamClusterConsumerHealthCheckOnlyReportsSkew + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterConsumerHealthCheckOnlyReportsSkew_ShouldNotForceRecreateOnSkew() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Skewed consumer state should be reported, not cause forced recreation. + } + + // ----------------------------------------------------------------------- + // 107. TestJetStreamClusterConsumerHealthCheckDeleted + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterConsumerHealthCheckDeleted_ShouldCleanUpDeletedConsumerOnHealthCheck() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Deleted consumer should be cleaned up gracefully during health check. + } + + // ----------------------------------------------------------------------- + // 108. TestJetStreamClusterRespectConsumerStartSeq + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterRespectConsumerStartSeq_ShouldStartDeliveryFromConfiguredSequence() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create consumer with DeliverByStartSequence=5, verify first delivered msg is seq 5. + } + + // ----------------------------------------------------------------------- + // 109. TestJetStreamClusterPeerRemoveStreamConsumerDesync + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterPeerRemoveStreamConsumerDesync_ShouldNotDesyncConsumerAfterPeerRemoval() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(5, "R5S"); + c.WaitOnLeader(); + // Remove a peer from stream, verify consumer state remains in sync with stream. + } + + // ----------------------------------------------------------------------- + // 110. TestJetStreamClusterStuckConsumerAfterLeaderChangeWithUnknownDeliveries + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterStuckConsumerAfterLeaderChangeWithUnknownDeliveries_ShouldRecoverFromStuckState() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Consumer with in-flight msgs, leader change — consumer should recover without getting stuck. + } + + // ----------------------------------------------------------------------- + // 111. TestJetStreamClusterAccountStatsForReplicatedStreams + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterAccountStatsForReplicatedStreams_ShouldCountStorageOnceNotPerReplica() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Publish to R=3 stream, verify account stats count storage once (logical), not 3x. + } + + // ----------------------------------------------------------------------- + // 112. TestJetStreamClusterRecreateConsumerFromMetaSnapshot + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterRecreateConsumerFromMetaSnapshot_ShouldRecreateConsumerFromSnapshot() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Create consumer, snapshot meta, shut server, restore from snapshot, verify consumer exists. + } + + // ----------------------------------------------------------------------- + // 113. TestJetStreamClusterUpgradeStreamVersioning + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterUpgradeStreamVersioning_ShouldHandleStreamVersionUpgrade() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Simulate stream version upgrade scenario, verify stream accessible after upgrade. + } + + // ----------------------------------------------------------------------- + // 114. TestJetStreamClusterUpgradeConsumerVersioning + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterUpgradeConsumerVersioning_ShouldHandleConsumerVersionUpgrade() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Simulate consumer version upgrade, verify consumer accessible after upgrade. + } + + // ----------------------------------------------------------------------- + // 115. TestJetStreamClusterInterestPolicyAckAll + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterInterestPolicyAckAll_ShouldRemoveMsgOnlyAfterAllConsumersAckAll() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Interest stream, AckAll consumer: msg removed only after all consumers AckAll. + } + + // ----------------------------------------------------------------------- + // 116. TestJetStreamClusterPreserveRedeliveredWithLaggingStream + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterPreserveRedeliveredWithLaggingStream_ShouldPreserveRedeliveredFlagDuringLag() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Consumer with lagging stream: redelivered flag must be preserved across leader changes. + } + + // ----------------------------------------------------------------------- + // 117. TestJetStreamClusterInvalidJSACKOverRoute + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterInvalidJsAckOverRoute_ShouldHandleInvalidAckGracefully() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // Invalid JSACK sent over a route should not crash the server. + } + + // ----------------------------------------------------------------------- + // 118. TestJetStreamClusterConsumerOnlyDeliverMsgAfterQuorum + // ----------------------------------------------------------------------- + [SkippableFact] + public void ClusterConsumerOnlyDeliverMsgAfterQuorum_ShouldNotDeliverBeforeQuorumAchieved() + { + Skip.If(ShouldSkip(), "Cluster integration tests are not enabled."); + + using var c = TestCluster.CreateJetStreamCluster(3, "R3S"); + c.WaitOnLeader(); + // R=3 consumer must not deliver msg until quorum (2 of 3) of replicas have acknowledged the entry. + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/ZB.MOM.NatsNet.Server.IntegrationTests.csproj b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/ZB.MOM.NatsNet.Server.IntegrationTests.csproj index e8740b2..0e119b9 100644 --- a/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/ZB.MOM.NatsNet.Server.IntegrationTests.csproj +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.IntegrationTests/ZB.MOM.NatsNet.Server.IntegrationTests.csproj @@ -17,13 +17,17 @@ + + + +