// Go ref: TestJetStreamClusterXxx — jetstream_cluster_long_test.go
// Covers: high-volume publish/consume cycles, many sequential fetches, many consumers,
// many streams, repeated publish-ack-fetch cycles, stepdowns during publishing,
// alternating publish+stepdown, create-publish-delete sequences, ack tracking across
// failovers, batch-1 iteration, mixed multi-stream operations, rapid meta stepdowns,
// large R1 message volumes, max-messages stream limits, consumer pending correctness.
using System.Text;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Cluster;
using NATS.Server.JetStream.Models;
using NATS.Server.TestUtilities;
namespace NATS.Server.JetStream.Tests.JetStream.Cluster;
/// <summary>
/// Long-running JetStream cluster tests covering high-volume scenarios,
/// repeated failover cycles, many-stream/many-consumer environments, and
/// limit enforcement under sustained load.
/// Ported from Go jetstream_cluster_long_test.go.
/// All tests are marked [Trait("Category", "LongRunning")].
/// </summary>
public class JsClusterLongRunningTests
{
    // ---------------------------------------------------------------
    // Go ref: TestJetStreamClusterLong5000MessagesR3 — jetstream_cluster_long_test.go
    // ---------------------------------------------------------------
    /// <summary>
    /// Publishes 5000 messages to an R3 stream, asserting each ack succeeds with the
    /// expected monotonically assigned sequence, then verifies the final stream state.
    /// </summary>
    [Fact]
    [Trait("Category", "LongRunning")]
    public async Task Five_thousand_messages_in_R3_stream_maintain_consistency()
    {
        // Go ref: TestJetStreamClusterLong5000MessagesR3 — jetstream_cluster_long_test.go
        await using var cluster = await JetStreamClusterFixture.StartAsync(3);
        await cluster.CreateStreamAsync("LONG5K", ["long5k.>"], replicas: 3);
        for (var i = 0; i < 5000; i++)
        {
            var ack = await cluster.PublishAsync("long5k.data", $"msg-{i}");
            ack.ErrorCode.ShouldBeNull();
            // Sequences are 1-based, so message i must land at seq i+1.
            ack.Seq.ShouldBe((ulong)(i + 1));
        }
        var state = await cluster.GetStreamStateAsync("LONG5K");
        state.Messages.ShouldBe(5000UL);
        state.FirstSeq.ShouldBe(1UL);
        state.LastSeq.ShouldBe(5000UL);
    }

    // ---------------------------------------------------------------
    // Go ref: TestJetStreamClusterLong100SequentialFetchesOf50 — jetstream_cluster_long_test.go
    // ---------------------------------------------------------------
    /// <summary>
    /// Pre-publishes 5000 messages and drains them with 100 sequential fetches of 50,
    /// verifying each batch is full and its sequences are contiguous.
    /// </summary>
    [Fact]
    [Trait("Category", "LongRunning")]
    public async Task One_hundred_sequential_fetches_of_fifty_messages_each()
    {
        // Go ref: TestJetStreamClusterLong100SequentialFetchesOf50 — jetstream_cluster_long_test.go
        await using var cluster = await JetStreamClusterFixture.StartAsync(3);
        await cluster.CreateStreamAsync("SEQFETCH", ["sf.>"], replicas: 3);
        await cluster.CreateConsumerAsync("SEQFETCH", "batcher", filterSubject: "sf.>");
        // Pre-publish 5000 messages
        for (var i = 0; i < 5000; i++)
            await cluster.PublishAsync("sf.event", $"msg-{i}");
        var totalFetched = 0;
        for (var batch = 0; batch < 100; batch++)
        {
            var result = await cluster.FetchAsync("SEQFETCH", "batcher", 50);
            result.Messages.Count.ShouldBe(50);
            totalFetched += result.Messages.Count;
            // Verify sequences are contiguous within each batch
            for (var j = 1; j < result.Messages.Count; j++)
                result.Messages[j].Sequence.ShouldBe(result.Messages[j - 1].Sequence + 1);
        }
        totalFetched.ShouldBe(5000);
    }

    // ---------------------------------------------------------------
    // Go ref: TestJetStreamClusterLong50ConsumersOnSameStream — jetstream_cluster_long_test.go
    // ---------------------------------------------------------------
    /// <summary>
    /// Creates 50 independent consumers on one stream and verifies each delivers
    /// all 100 pre-published messages (consumers do not share cursors).
    /// </summary>
    [Fact]
    [Trait("Category", "LongRunning")]
    public async Task Fifty_consumers_on_same_stream_all_see_all_messages()
    {
        // Go ref: TestJetStreamClusterLong50ConsumersOnSameStream — jetstream_cluster_long_test.go
        await using var cluster = await JetStreamClusterFixture.StartAsync(3);
        await cluster.CreateStreamAsync("FIFTYCONSUMERS", ["fc.>"], replicas: 3);
        for (var i = 0; i < 100; i++)
            await cluster.PublishAsync("fc.event", $"msg-{i}");
        for (var c = 0; c < 50; c++)
            await cluster.CreateConsumerAsync("FIFTYCONSUMERS", $"cons{c}", filterSubject: "fc.>");
        // Each consumer should see all 100 messages independently
        for (var c = 0; c < 50; c++)
        {
            var batch = await cluster.FetchAsync("FIFTYCONSUMERS", $"cons{c}", 100);
            batch.Messages.Count.ShouldBe(100);
        }
    }

    // ---------------------------------------------------------------
    // Go ref: TestJetStreamClusterLong20StreamsIn5NodeCluster — jetstream_cluster_long_test.go
    // ---------------------------------------------------------------
    /// <summary>
    /// Creates 20 R3 streams in a 5-node cluster, publishes 10 messages to each, and
    /// verifies stream states are independent and account info reports all 20 streams.
    /// </summary>
    [Fact]
    [Trait("Category", "LongRunning")]
    public async Task Twenty_streams_in_five_node_cluster_are_independent()
    {
        // Go ref: TestJetStreamClusterLong20StreamsIn5NodeCluster — jetstream_cluster_long_test.go
        await using var cluster = await JetStreamClusterFixture.StartAsync(5);
        for (var i = 0; i < 20; i++)
            await cluster.CreateStreamAsync($"IND{i}", [$"ind{i}.>"], replicas: 3);
        // Publish to each stream
        for (var i = 0; i < 20; i++)
            for (var j = 0; j < 10; j++)
                await cluster.PublishAsync($"ind{i}.event", $"stream{i}-msg{j}");
        // Verify each stream is independent
        for (var i = 0; i < 20; i++)
        {
            var state = await cluster.GetStreamStateAsync($"IND{i}");
            state.Messages.ShouldBe(10UL);
        }
        var accountInfo = await cluster.RequestAsync(JetStreamApiSubjects.Info, "{}");
        accountInfo.AccountInfo!.Streams.ShouldBe(20);
    }

    // ---------------------------------------------------------------
    // Go ref: TestJetStreamClusterLongPublishAckFetchCycle100Times — jetstream_cluster_long_test.go
    // ---------------------------------------------------------------
    /// <summary>
    /// Repeats a publish → fetch → ack cycle 100 times on an AckPolicy.All consumer,
    /// checking each fetched message matches the sequence just published.
    /// </summary>
    [Fact]
    [Trait("Category", "LongRunning")]
    public async Task Publish_ack_fetch_cycle_repeated_100_times()
    {
        // Go ref: TestJetStreamClusterLongPublishAckFetchCycle100Times — jetstream_cluster_long_test.go
        await using var cluster = await JetStreamClusterFixture.StartAsync(3);
        await cluster.CreateStreamAsync("PAFCYCLE", ["paf.>"], replicas: 3);
        await cluster.CreateConsumerAsync("PAFCYCLE", "cycler", filterSubject: "paf.>",
            ackPolicy: AckPolicy.All);
        for (var cycle = 0; cycle < 100; cycle++)
        {
            // Publish one message per cycle
            var ack = await cluster.PublishAsync("paf.event", $"cycle-{cycle}");
            ack.ErrorCode.ShouldBeNull();
            // Fetch one message
            var batch = await cluster.FetchAsync("PAFCYCLE", "cycler", 1);
            batch.Messages.Count.ShouldBe(1);
            batch.Messages[0].Sequence.ShouldBe(ack.Seq);
            // Ack it
            cluster.AckAll("PAFCYCLE", "cycler", ack.Seq);
        }
        var finalState = await cluster.GetStreamStateAsync("PAFCYCLE");
        finalState.Messages.ShouldBe(100UL);
    }

    // ---------------------------------------------------------------
    // Go ref: TestJetStreamClusterLong10StepdownsDuringPublish — jetstream_cluster_long_test.go
    // ---------------------------------------------------------------
    /// <summary>
    /// Interleaves 10 batches of 50 publishes with a stream-leader stepdown after each
    /// batch; asserts no messages are lost and the last sequence matches the total.
    /// </summary>
    [Fact]
    [Trait("Category", "LongRunning")]
    public async Task Ten_stepdowns_during_continuous_publish_preserve_all_messages()
    {
        // Go ref: TestJetStreamClusterLong10StepdownsDuringPublish — jetstream_cluster_long_test.go
        await using var cluster = await JetStreamClusterFixture.StartAsync(3);
        await cluster.CreateStreamAsync("STEPDURINGPUB", ["sdp.>"], replicas: 3);
        var totalPublished = 0;
        // Publish 50 messages per batch, then step down (10 iterations = 500 msgs + 10 stepdowns)
        for (var sd = 0; sd < 10; sd++)
        {
            for (var i = 0; i < 50; i++)
            {
                var ack = await cluster.PublishAsync("sdp.event", $"batch{sd}-msg{i}");
                ack.ErrorCode.ShouldBeNull();
                totalPublished++;
            }
            (await cluster.StepDownStreamLeaderAsync("STEPDURINGPUB")).Success.ShouldBeTrue();
        }
        var state = await cluster.GetStreamStateAsync("STEPDURINGPUB");
        state.Messages.ShouldBe((ulong)totalPublished);
        state.LastSeq.ShouldBe((ulong)totalPublished);
    }

    // ---------------------------------------------------------------
    // Go ref: TestJetStreamClusterLongAlternatingPublishAndStepdown — jetstream_cluster_long_test.go
    // ---------------------------------------------------------------
    /// <summary>
    /// Alternates a single publish with a stream-leader stepdown 20 times and verifies
    /// acked sequences stay strictly monotonic across every leadership change.
    /// </summary>
    [Fact]
    [Trait("Category", "LongRunning")]
    public async Task Alternating_publish_and_stepdown_20_iterations_preserves_monotonic_sequence()
    {
        // Go ref: TestJetStreamClusterLongAlternatingPublishAndStepdown — jetstream_cluster_long_test.go
        await using var cluster = await JetStreamClusterFixture.StartAsync(3);
        await cluster.CreateStreamAsync("ALTPUBSD", ["aps.>"], replicas: 3);
        // FIX: original read `new List()` (missing type argument — does not compile);
        // sequences are ulong (ack.Seq, compared against 20UL below).
        var allSeqs = new List<ulong>();
        for (var iter = 0; iter < 20; iter++)
        {
            var ack = await cluster.PublishAsync("aps.event", $"iter-{iter}");
            ack.ErrorCode.ShouldBeNull();
            allSeqs.Add(ack.Seq);
            (await cluster.StepDownStreamLeaderAsync("ALTPUBSD")).Success.ShouldBeTrue();
        }
        // Verify strictly monotonically increasing sequences across all stepdowns
        for (var i = 1; i < allSeqs.Count; i++)
            allSeqs[i].ShouldBeGreaterThan(allSeqs[i - 1]);
        allSeqs[^1].ShouldBe(20UL);
    }

    // ---------------------------------------------------------------
    // Go ref: TestJetStreamClusterLongCreatePublishDelete20Streams — jetstream_cluster_long_test.go
    // ---------------------------------------------------------------
    /// <summary>
    /// Runs 20 sequential create → publish(10) → verify → delete cycles, then confirms
    /// the account reports zero remaining streams.
    /// </summary>
    [Fact]
    [Trait("Category", "LongRunning")]
    public async Task Create_publish_delete_20_streams_sequentially()
    {
        // Go ref: TestJetStreamClusterLongCreatePublishDelete20Streams — jetstream_cluster_long_test.go
        await using var cluster = await JetStreamClusterFixture.StartAsync(3);
        for (var i = 0; i < 20; i++)
        {
            var streamName = $"SEQ{i}";
            var create = await cluster.CreateStreamAsync(streamName, [$"seq{i}.>"], replicas: 3);
            create.Error.ShouldBeNull();
            for (var j = 0; j < 10; j++)
                await cluster.PublishAsync($"seq{i}.event", $"msg-{j}");
            var state = await cluster.GetStreamStateAsync(streamName);
            state.Messages.ShouldBe(10UL);
            var del = await cluster.RequestAsync($"{JetStreamApiSubjects.StreamDelete}{streamName}", "{}");
            del.Success.ShouldBeTrue();
        }
        // All streams deleted
        var accountInfo = await cluster.RequestAsync(JetStreamApiSubjects.Info, "{}");
        accountInfo.AccountInfo!.Streams.ShouldBe(0);
    }

    // ---------------------------------------------------------------
    // Go ref: TestJetStreamClusterLongConsumerAckAfter10Failovers — jetstream_cluster_long_test.go
    // ---------------------------------------------------------------
    /// <summary>
    /// Fetches and acks 10-message batches across 10 stream-leader failovers, verifying
    /// the consumer's ack floor advances through all 100 pre-published messages.
    /// </summary>
    [Fact]
    [Trait("Category", "LongRunning")]
    public async Task Consumer_ack_tracking_correct_after_ten_leader_failovers()
    {
        // Go ref: TestJetStreamClusterLongConsumerAckAfter10Failovers — jetstream_cluster_long_test.go
        await using var cluster = await JetStreamClusterFixture.StartAsync(3);
        await cluster.CreateStreamAsync("ACKFAIL", ["af.>"], replicas: 3);
        await cluster.CreateConsumerAsync("ACKFAIL", "tracker", filterSubject: "af.>",
            ackPolicy: AckPolicy.All);
        // Pre-publish 100 messages
        for (var i = 0; i < 100; i++)
            await cluster.PublishAsync("af.event", $"msg-{i}");
        // Fetch and ack in batches across 10 failovers
        var ackedThrough = 0UL;
        for (var failover = 0; failover < 10; failover++)
        {
            var batch = await cluster.FetchAsync("ACKFAIL", "tracker", 10);
            batch.Messages.Count.ShouldBe(10);
            var lastSeq = batch.Messages[^1].Sequence;
            cluster.AckAll("ACKFAIL", "tracker", lastSeq);
            ackedThrough = lastSeq;
            (await cluster.StepDownStreamLeaderAsync("ACKFAIL")).Success.ShouldBeTrue();
        }
        ackedThrough.ShouldBe(100UL);
    }

    // ---------------------------------------------------------------
    // Go ref: TestJetStreamClusterLongFetchBatch1Iterated500Times — jetstream_cluster_long_test.go
    // ---------------------------------------------------------------
    /// <summary>
    /// Drains 500 messages with 500 batch-size-1 fetches and verifies every fetch yields
    /// exactly one message with strictly increasing sequences.
    /// </summary>
    [Fact]
    [Trait("Category", "LongRunning")]
    public async Task Fetch_with_batch_1_iterated_500_times_reads_all_messages()
    {
        // Go ref: TestJetStreamClusterLongFetchBatch1Iterated500Times — jetstream_cluster_long_test.go
        await using var cluster = await JetStreamClusterFixture.StartAsync(3);
        await cluster.CreateStreamAsync("BATCH1ITER", ["b1i.>"], replicas: 3);
        await cluster.CreateConsumerAsync("BATCH1ITER", "one_at_a_time", filterSubject: "b1i.>");
        for (var i = 0; i < 500; i++)
            await cluster.PublishAsync("b1i.event", $"msg-{i}");
        // FIX: original read `new List()` (missing type argument — does not compile);
        // message sequences are ulong.
        var allSeqs = new List<ulong>();
        for (var i = 0; i < 500; i++)
        {
            var batch = await cluster.FetchAsync("BATCH1ITER", "one_at_a_time", 1);
            batch.Messages.Count.ShouldBe(1);
            allSeqs.Add(batch.Messages[0].Sequence);
        }
        // All 500 sequences read, strictly increasing
        allSeqs.Count.ShouldBe(500);
        for (var i = 1; i < allSeqs.Count; i++)
            allSeqs[i].ShouldBeGreaterThan(allSeqs[i - 1]);
    }

    // ---------------------------------------------------------------
    // Go ref: TestJetStreamClusterLongMixedMultiStreamOps — jetstream_cluster_long_test.go
    // ---------------------------------------------------------------
    /// <summary>
    /// Exercises five streams in parallel shape: 100 publishes each, one consumer each,
    /// full fetch from each, then verifies account-level stream/consumer counts.
    /// </summary>
    [Fact]
    [Trait("Category", "LongRunning")]
    public async Task Mixed_ops_five_streams_100_messages_each_consumers_fetch_all()
    {
        // Go ref: TestJetStreamClusterLongMixedMultiStreamOps — jetstream_cluster_long_test.go
        await using var cluster = await JetStreamClusterFixture.StartAsync(3);
        // Create 5 streams
        for (var s = 0; s < 5; s++)
            await cluster.CreateStreamAsync($"MIXED{s}", [$"mixed{s}.>"], replicas: 3);
        // Publish 100 messages to each
        for (var s = 0; s < 5; s++)
            for (var i = 0; i < 100; i++)
                await cluster.PublishAsync($"mixed{s}.event", $"stream{s}-msg{i}");
        // Create one consumer per stream
        for (var s = 0; s < 5; s++)
            await cluster.CreateConsumerAsync($"MIXED{s}", $"reader{s}", filterSubject: $"mixed{s}.>");
        // Fetch all messages from each stream consumer
        for (var s = 0; s < 5; s++)
        {
            var batch = await cluster.FetchAsync($"MIXED{s}", $"reader{s}", 100);
            batch.Messages.Count.ShouldBe(100);
            batch.Messages[0].Sequence.ShouldBe(1UL);
            batch.Messages[^1].Sequence.ShouldBe(100UL);
        }
        var info = await cluster.RequestAsync(JetStreamApiSubjects.Info, "{}");
        info.AccountInfo!.Streams.ShouldBe(5);
        info.AccountInfo.Consumers.ShouldBe(5);
    }

    // ---------------------------------------------------------------
    // Go ref: TestJetStreamClusterLongRapidMetaStepdowns — jetstream_cluster_long_test.go
    // ---------------------------------------------------------------
    /// <summary>
    /// Performs 20 rapid meta-leader stepdowns and verifies the leadership version is
    /// strictly increasing and all previously created streams remain listable.
    /// </summary>
    [Fact]
    [Trait("Category", "LongRunning")]
    public async Task Rapid_meta_stepdowns_20_times_all_streams_remain_accessible()
    {
        // Go ref: TestJetStreamClusterLongRapidMetaStepdowns — jetstream_cluster_long_test.go
        await using var cluster = await JetStreamClusterFixture.StartAsync(3);
        // Create streams before stepdowns
        for (var i = 0; i < 5; i++)
            await cluster.CreateStreamAsync($"RAPID{i}", [$"rapid{i}.>"], replicas: 3);
        // FIX: original read `new List()` (missing type argument — does not compile);
        // NOTE(review): assumes LeadershipVersion is ulong — confirm against MetaState model.
        var leaderVersions = new List<ulong>();
        var initialState = cluster.GetMetaState();
        leaderVersions.Add(initialState!.LeadershipVersion);
        // Perform 20 rapid meta stepdowns
        for (var sd = 0; sd < 20; sd++)
        {
            cluster.StepDownMetaLeader();
            var state = cluster.GetMetaState();
            leaderVersions.Add(state!.LeadershipVersion);
        }
        // Leadership version must monotonically increase
        for (var i = 1; i < leaderVersions.Count; i++)
            leaderVersions[i].ShouldBeGreaterThan(leaderVersions[i - 1]);
        // All streams still accessible
        var names = await cluster.RequestAsync(JetStreamApiSubjects.StreamNames, "{}");
        names.StreamNames!.Count.ShouldBe(5);
        for (var i = 0; i < 5; i++)
            names.StreamNames.ShouldContain($"RAPID{i}");
    }

    // ---------------------------------------------------------------
    // Go ref: TestJetStreamClusterLong10000MessagesR1 — jetstream_cluster_long_test.go
    // ---------------------------------------------------------------
    /// <summary>
    /// Publishes 10000 small messages into an unreplicated (R1) stream and verifies
    /// the final message count and last sequence.
    /// </summary>
    [Fact]
    [Trait("Category", "LongRunning")]
    public async Task Ten_thousand_small_messages_in_R1_stream()
    {
        // Go ref: TestJetStreamClusterLong10000MessagesR1 — jetstream_cluster_long_test.go
        await using var cluster = await JetStreamClusterFixture.StartAsync(3);
        await cluster.CreateStreamAsync("R1HUGE", ["r1h.>"], replicas: 1);
        for (var i = 0; i < 10000; i++)
        {
            var ack = await cluster.PublishAsync("r1h.event", $"x{i}");
            ack.ErrorCode.ShouldBeNull();
        }
        var state = await cluster.GetStreamStateAsync("R1HUGE");
        state.Messages.ShouldBe(10000UL);
        state.LastSeq.ShouldBe(10000UL);
    }

    // ---------------------------------------------------------------
    // Go ref: TestJetStreamClusterLongMaxMessagesLimit — jetstream_cluster_long_test.go
    // ---------------------------------------------------------------
    /// <summary>
    /// Publishes 1000 messages into a stream capped at MaxMsgs=100 and verifies the
    /// retained count never exceeds the limit (oldest messages discarded).
    /// </summary>
    [Fact]
    [Trait("Category", "LongRunning")]
    public async Task Stream_with_max_messages_100_has_exactly_100_after_1000_publishes()
    {
        // Go ref: TestJetStreamClusterLongMaxMessagesLimit — jetstream_cluster_long_test.go
        await using var cluster = await JetStreamClusterFixture.StartAsync(3);
        var cfg = new StreamConfig
        {
            Name = "MAXLIMIT",
            Subjects = ["ml.>"],
            Replicas = 3,
            MaxMsgs = 100,
        };
        cluster.CreateStreamDirect(cfg);
        for (var i = 0; i < 1000; i++)
            await cluster.PublishAsync("ml.event", $"msg-{i}");
        var state = await cluster.GetStreamStateAsync("MAXLIMIT");
        // MaxMsgs=100: only the latest 100 messages retained (old ones discarded)
        state.Messages.ShouldBeLessThanOrEqualTo(100UL);
        state.Messages.ShouldBeGreaterThan(0UL);
    }

    // ---------------------------------------------------------------
    // Go ref: TestJetStreamClusterLongConsumerPendingWithMaxMessages — jetstream_cluster_long_test.go
    // ---------------------------------------------------------------
    /// <summary>
    /// Creates a consumer after MaxMsgs eviction has occurred and verifies it only sees
    /// the messages the stream actually retained.
    /// </summary>
    [Fact]
    [Trait("Category", "LongRunning")]
    public async Task Consumer_on_max_messages_stream_tracks_correct_pending()
    {
        // Go ref: TestJetStreamClusterLongConsumerPendingWithMaxMessages — jetstream_cluster_long_test.go
        await using var cluster = await JetStreamClusterFixture.StartAsync(3);
        var cfg = new StreamConfig
        {
            Name = "MAXPEND",
            Subjects = ["mp.>"],
            Replicas = 3,
            MaxMsgs = 50,
        };
        cluster.CreateStreamDirect(cfg);
        // Publish 200 messages (150 will be evicted by MaxMsgs)
        for (var i = 0; i < 200; i++)
            await cluster.PublishAsync("mp.event", $"msg-{i}");
        var state = await cluster.GetStreamStateAsync("MAXPEND");
        // Stream retains at most 50 messages
        state.Messages.ShouldBeLessThanOrEqualTo(50UL);
        // Create consumer after publishes (starts at current first seq)
        await cluster.CreateConsumerAsync("MAXPEND", "latecons", filterSubject: "mp.>",
            ackPolicy: AckPolicy.None);
        var batch = await cluster.FetchAsync("MAXPEND", "latecons", 100);
        // Consumer should see only retained messages
        ((ulong)batch.Messages.Count).ShouldBeLessThanOrEqualTo(state.Messages);
    }
}