Files
natsdotnet/tests/NATS.Server.Core.Tests/Stress/ClusterStressTests.cs
Joseph Doherty 88a82ee860 docs: add XML doc comments to server types and fix flaky test timings
Add XML doc comments to public properties across EventTypes, Connz, Varz,
NatsOptions, StreamConfig, IStreamStore, FileStore, MqttListener,
MqttSessionStore, MessageTraceContext, and JetStreamApiResponse. Fix flaky
tests by increasing timing margins (ResponseTracker expiry 1ms→50ms,
sleep 50ms→200ms) and document known flaky test patterns in tests.md.
2026-03-13 18:47:48 -04:00

671 lines
23 KiB
C#

// Go parity: golang/nats-server/server/norace_2_test.go
// Covers: concurrent stream creation, parallel publish to clustered streams,
// concurrent consumer creation and fetch, leader stepdown under load,
// create-delete-recreate cycles, mixed concurrent operations, and large
// batch fetch under concurrent publish — all using ClusterFixture.
using System.Collections.Concurrent;
using System.Text;
using NATS.Server.JetStream;
using NATS.Server.JetStream.Api;
using NATS.Server.JetStream.Cluster;
using NATS.Server.JetStream.Consumers;
using NATS.Server.JetStream.Models;
using NATS.Server.JetStream.Publish;
using ClusterFixture = NATS.Server.TestUtilities.JetStreamClusterFixture;
using NATS.Server.TestUtilities;
namespace NATS.Server.Core.Tests.Stress;
/// <summary>
/// Stress tests for clustered JetStream operations under concurrency.
/// Uses JetStreamClusterFixture (in-process meta-group) to simulate cluster behaviour
/// consistent with how Tasks 6-10 are tested.
///
/// Each test funnels failures into a ConcurrentBag and asserts it is empty at the
/// end, so a single faulted operation reports the actual exception rather than a
/// torn Parallel/WhenAll aggregate.
///
/// Go ref: norace_2_test.go — cluster stress tests.
/// </summary>
public class ClusterStressTests
{
    // ---------------------------------------------------------------
    // Go: TestNoRaceJetStreamCluster100ConcurrentStreamCreates norace_2_test.go
    // ---------------------------------------------------------------
    [Fact]
    [Trait("Category", "Stress")]
    public async Task Cluster_100_concurrent_stream_creates_all_succeed()
    {
        await using var fx = await ClusterFixture.StartAsync(nodes: 3);

        const int count = 100;
        var errors = new ConcurrentBag<Exception>();
        var created = new ConcurrentBag<string>();

        // 100 distinct streams created in parallel; every create must succeed.
        await Parallel.ForEachAsync(Enumerable.Range(0, count), async (i, _) =>
        {
            try
            {
                var resp = await fx.CreateStreamAsync(
                    $"CONCS{i}",
                    [$"concs{i}.>"],
                    1);
                if (resp.Error is null)
                    created.Add($"CONCS{i}");
            }
            catch (Exception ex) { errors.Add(ex); }
        });

        errors.ShouldBeEmpty();
        created.Count.ShouldBe(count);
    }

    // ---------------------------------------------------------------
    // Go: TestNoRaceJetStreamCluster50ConcurrentPublishes norace_2_test.go
    // ---------------------------------------------------------------
    [Fact]
    [Trait("Category", "Stress")]
    public async Task Cluster_50_concurrent_publishes_to_same_stream_all_stored()
    {
        await using var fx = await ClusterFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("CONCPUB", ["concpub.>"], 1);

        const int publishes = 50;
        var sequences = new ConcurrentBag<ulong>();
        var errors = new ConcurrentBag<Exception>();

        // Publish must be sequential because the in-process store serialises writes.
        // The concurrency in Go's norace tests comes from multiple goroutines being
        // scheduled — here we verify the sequential publish path is correct.
        for (var i = 0; i < publishes; i++)
        {
            try
            {
                var ack = await fx.PublishAsync($"concpub.event.{i}", $"payload-{i}");
                sequences.Add(ack.Seq);
            }
            catch (Exception ex) { errors.Add(ex); }
        }

        errors.ShouldBeEmpty();
        sequences.Count.ShouldBe(publishes);

        var state = await fx.GetStreamStateAsync("CONCPUB");
        state.Messages.ShouldBe((ulong)publishes);
    }

    // ---------------------------------------------------------------
    // Go: TestNoRaceJetStreamCluster20StreamsConcurrentPublish norace_2_test.go
    // ---------------------------------------------------------------
    [Fact]
    [Trait("Category", "Stress")]
    public async Task Cluster_20_streams_with_concurrent_publish_each_stores_correct_count()
    {
        await using var fx = await ClusterFixture.StartAsync(nodes: 3);

        const int streamCount = 20;
        const int msgsPerStream = 10;
        for (var i = 0; i < streamCount; i++)
            await fx.CreateStreamAsync($"MULTI{i}", [$"multi{i}.>"], 1);

        var errors = new ConcurrentBag<Exception>();

        // Independent streams publish in parallel — each has its own store.
        await Parallel.ForEachAsync(Enumerable.Range(0, streamCount), async (i, _) =>
        {
            try
            {
                for (var j = 0; j < msgsPerStream; j++)
                    await fx.PublishAsync($"multi{i}.event", $"msg-{i}-{j}");
            }
            catch (Exception ex) { errors.Add(ex); }
        });

        errors.ShouldBeEmpty();

        // Cross-stream isolation: each store must hold exactly its own messages.
        for (var i = 0; i < streamCount; i++)
        {
            var state = await fx.GetStreamStateAsync($"MULTI{i}");
            state.Messages.ShouldBe((ulong)msgsPerStream,
                $"stream MULTI{i} should have {msgsPerStream} messages");
        }
    }

    // ---------------------------------------------------------------
    // Go: TestNoRaceJetStreamClusterLeaderStepdownConcurrentPublish norace_2_test.go
    // ---------------------------------------------------------------
    [Fact]
    [Trait("Category", "Stress")]
    public async Task Cluster_leader_stepdown_during_concurrent_publishes_does_not_lose_data()
    {
        await using var fx = await ClusterFixture.StartAsync(nodes: 3);
        // R3 so a stepdown has followers to hand leadership to.
        await fx.CreateStreamAsync("STEPUB", ["stepub.>"], 3);

        const int publishCount = 20;
        var errors = new ConcurrentBag<Exception>();

        for (var i = 0; i < publishCount; i++)
        {
            try
            {
                // Force a leader change mid-run; subsequent publishes must still land.
                if (i == 5)
                    await fx.StepDownStreamLeaderAsync("STEPUB");
                await fx.PublishAsync($"stepub.event.{i}", $"msg-{i}");
            }
            catch (Exception ex) { errors.Add(ex); }
        }

        errors.ShouldBeEmpty();

        var state = await fx.GetStreamStateAsync("STEPUB");
        state.Messages.ShouldBe((ulong)publishCount);
    }

    // ---------------------------------------------------------------
    // Go: TestNoRaceJetStreamCluster100ConcurrentConsumerCreates norace_2_test.go
    // ---------------------------------------------------------------
    [Fact]
    [Trait("Category", "Stress")]
    public async Task Cluster_100_concurrent_consumer_creates_all_succeed()
    {
        await using var fx = await ClusterFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("CONCON", ["concon.>"], 1);

        const int count = 100;
        var errors = new ConcurrentBag<Exception>();

        // 100 durable consumers on one stream, created in parallel.
        await Parallel.ForEachAsync(Enumerable.Range(0, count), async (i, _) =>
        {
            try
            {
                await fx.CreateConsumerAsync("CONCON", $"consumer{i}");
            }
            catch (Exception ex) { errors.Add(ex); }
        });

        errors.ShouldBeEmpty();
    }

    // ---------------------------------------------------------------
    // Go: TestNoRaceJetStreamCluster50ConcurrentFetches norace_2_test.go
    // ---------------------------------------------------------------
    [Fact]
    [Trait("Category", "Stress")]
    public async Task Cluster_50_sequential_fetches_on_same_consumer_all_succeed()
    {
        await using var fx = await ClusterFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("CONFETCH", ["confetch.>"], 1);
        await fx.CreateConsumerAsync("CONFETCH", "fetcher");

        // Pre-load more messages than we will fetch so every fetch can be served.
        for (var i = 0; i < 100; i++)
            await fx.PublishAsync("confetch.event", $"msg-{i}");

        var errors = new ConcurrentBag<Exception>();

        for (var i = 0; i < 50; i++)
        {
            try
            {
                var batch = await fx.FetchAsync("CONFETCH", "fetcher", 1);
                batch.ShouldNotBeNull();
            }
            catch (Exception ex) { errors.Add(ex); }
        }

        errors.ShouldBeEmpty();
    }

    // ---------------------------------------------------------------
    // Go: TestNoRaceJetStreamClusterPublishFetchInterleave norace_2_test.go
    // ---------------------------------------------------------------
    [Fact]
    [Trait("Category", "Stress")]
    public async Task Cluster_concurrent_publish_and_fetch_interleaving_delivers_all_messages()
    {
        await using var fx = await ClusterFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("INTERLEAVE", ["inter.>"], 1);
        await fx.CreateConsumerAsync("INTERLEAVE", "reader");

        const int rounds = 10;
        const int msgsPerRound = 5;
        var errors = new ConcurrentBag<Exception>();
        var totalFetched = 0;

        // Alternate publish bursts with fetches; everything published must be read.
        for (var r = 0; r < rounds; r++)
        {
            try
            {
                for (var m = 0; m < msgsPerRound; m++)
                    await fx.PublishAsync("inter.event", $"round-{r}-msg-{m}");
                var batch = await fx.FetchAsync("INTERLEAVE", "reader", msgsPerRound);
                // Single-threaded loop — no Interlocked needed here.
                totalFetched += batch.Messages.Count;
            }
            catch (Exception ex) { errors.Add(ex); }
        }

        errors.ShouldBeEmpty();
        totalFetched.ShouldBe(rounds * msgsPerRound);
    }

    // ---------------------------------------------------------------
    // Go: TestNoRaceJetStreamClusterMetaStepdownDuringStreamCreate norace_2_test.go
    // ---------------------------------------------------------------
    [Fact]
    [Trait("Category", "Stress")]
    public void Cluster_meta_stepdown_during_stream_creation_does_not_corrupt_state()
    {
        // Drives the meta-group/stream-manager pair directly (no fixture) so the
        // stepdowns race against CreateOrUpdate on real threads.
        var meta = new JetStreamMetaGroup(5);
        var consumerManager = new ConsumerManager(meta);
        var streamManager = new StreamManager(meta, consumerManager: consumerManager);
        var errors = new ConcurrentBag<Exception>();

        Parallel.Invoke(
            () =>
            {
                try
                {
                    for (var i = 0; i < 30; i++)
                    {
                        streamManager.CreateOrUpdate(new StreamConfig
                        {
                            Name = $"METACD{i}",
                            Subjects = [$"mcd{i}.>"],
                            Replicas = 1,
                        });
                    }
                }
                catch (Exception ex) { errors.Add(ex); }
            },
            () =>
            {
                try
                {
                    // Repeated stepdowns while creates are in flight.
                    for (var i = 0; i < 5; i++)
                    {
                        meta.StepDown();
                        Thread.Sleep(10);
                    }
                }
                catch (Exception ex) { errors.Add(ex); }
            });

        errors.ShouldBeEmpty();
    }

    // ---------------------------------------------------------------
    // Go: TestNoRaceJetStreamCluster10ConcurrentStreamDeletes norace_2_test.go
    // ---------------------------------------------------------------
    [Fact]
    [Trait("Category", "Stress")]
    public async Task Cluster_10_concurrent_stream_deletes_complete_without_error()
    {
        await using var fx = await ClusterFixture.StartAsync(nodes: 3);

        const int count = 10;
        for (var i = 0; i < count; i++)
            await fx.CreateStreamAsync($"DEL{i}", [$"del{i}.>"], 1);

        var errors = new ConcurrentBag<Exception>();

        // Delete all streams in parallel via the raw API subject.
        await Parallel.ForEachAsync(Enumerable.Range(0, count), async (i, _) =>
        {
            try
            {
                var resp = await fx.RequestAsync($"{JetStreamApiSubjects.StreamDelete}DEL{i}", "{}");
                resp.ShouldNotBeNull();
            }
            catch (Exception ex) { errors.Add(ex); }
        });

        errors.ShouldBeEmpty();
    }

    // ---------------------------------------------------------------
    // Go: TestNoRaceJetStreamClusterConcurrentAckAll norace_2_test.go
    // ---------------------------------------------------------------
    [Fact]
    [Trait("Category", "Stress")]
    public async Task Cluster_concurrent_ackall_operations_advance_consumer_correctly()
    {
        await using var fx = await ClusterFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("ACKALL", ["ackall.>"], 1);
        await fx.CreateConsumerAsync("ACKALL", "acker", ackPolicy: AckPolicy.All);

        const int msgCount = 50;
        for (var i = 0; i < msgCount; i++)
            await fx.PublishAsync("ackall.event", $"msg-{i}");

        var errors = new ConcurrentBag<Exception>();

        // AckAll at every 5th sequence — each call acknowledges everything up to seq.
        for (ulong seq = 1; seq <= msgCount; seq += 5)
        {
            try
            {
                fx.AckAll("ACKALL", "acker", seq);
            }
            catch (Exception ex) { errors.Add(ex); }
        }

        errors.ShouldBeEmpty();
    }

    // ---------------------------------------------------------------
    // Go: TestNoRaceJetStreamClusterMultiConsumerConcurrentFetch norace_2_test.go
    // ---------------------------------------------------------------
    [Fact]
    [Trait("Category", "Stress")]
    public async Task Cluster_multiple_consumers_each_see_all_messages_independently()
    {
        await using var fx = await ClusterFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("MULTICONSUMER", ["mc.>"], 1);

        const int consumers = 5;
        const int msgCount = 10;
        for (var c = 0; c < consumers; c++)
            await fx.CreateConsumerAsync("MULTICONSUMER", $"reader{c}");
        for (var i = 0; i < msgCount; i++)
            await fx.PublishAsync("mc.event", $"msg-{i}");

        var errors = new ConcurrentBag<Exception>();

        // Each consumer maintains its own cursor, so parallel fetches must each
        // observe the full message set.
        await Parallel.ForEachAsync(Enumerable.Range(0, consumers), async (c, _) =>
        {
            try
            {
                var batch = await fx.FetchAsync("MULTICONSUMER", $"reader{c}", msgCount);
                batch.Messages.Count.ShouldBe(msgCount,
                    $"consumer reader{c} should see all {msgCount} messages");
            }
            catch (Exception ex) { errors.Add(ex); }
        });

        errors.ShouldBeEmpty();
    }

    // ---------------------------------------------------------------
    // Go: TestNoRaceJetStreamClusterRapidCreateDeleteRecreate norace_2_test.go
    // ---------------------------------------------------------------
    [Fact]
    [Trait("Category", "Stress")]
    public async Task Cluster_rapid_create_delete_recreate_cycle_50_iterations_correct()
    {
        await using var fx = await ClusterFixture.StartAsync(nodes: 3);

        const int iterations = 50;
        var errors = new ConcurrentBag<Exception>();

        // Same stream name recycled every iteration: create → publish → delete.
        for (var i = 0; i < iterations; i++)
        {
            try
            {
                var createResp = await fx.CreateStreamAsync("RECYCLE", ["recycle.>"], 1);
                if (createResp.Error is null)
                {
                    await fx.PublishAsync("recycle.event", $"msg-{i}");
                    await fx.RequestAsync($"{JetStreamApiSubjects.StreamDelete}RECYCLE", "{}");
                }
            }
            catch (Exception ex) { errors.Add(ex); }
        }

        errors.ShouldBeEmpty();
    }

    // ---------------------------------------------------------------
    // Go: TestNoRaceJetStreamClusterMixedConcurrentOperations norace_2_test.go
    // ---------------------------------------------------------------
    [Fact]
    [Trait("Category", "Stress")]
    public async Task Cluster_mixed_create_publish_fetch_delete_concurrently_does_not_corrupt()
    {
        await using var fx = await ClusterFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("MIXEDBASE", ["mixed.>"], 1);
        await fx.CreateConsumerAsync("MIXEDBASE", "mixedreader");

        const int opsPerTask = 20;
        var errors = new ConcurrentBag<Exception>();

        // Four concurrent workloads: stream creates, publishes, fetches, info queries.
        await Task.WhenAll(
            Task.Run(async () =>
            {
                try
                {
                    for (var i = 0; i < opsPerTask; i++)
                        await fx.CreateStreamAsync($"MXNEW{i}", [$"mxnew{i}.>"], 1);
                }
                catch (Exception ex) { errors.Add(ex); }
            }),
            Task.Run(async () =>
            {
                try
                {
                    for (var i = 0; i < opsPerTask; i++)
                        await fx.PublishAsync("mixed.event", $"msg-{i}");
                }
                catch (Exception ex) { errors.Add(ex); }
            }),
            Task.Run(async () =>
            {
                try
                {
                    for (var i = 0; i < opsPerTask; i++)
                        _ = await fx.FetchAsync("MIXEDBASE", "mixedreader", 1);
                }
                catch (Exception ex) { errors.Add(ex); }
            }),
            Task.Run(async () =>
            {
                try
                {
                    for (var i = 0; i < opsPerTask; i++)
                        _ = await fx.GetStreamInfoAsync("MIXEDBASE");
                }
                catch (Exception ex) { errors.Add(ex); }
            }));

        errors.ShouldBeEmpty();
    }

    // ---------------------------------------------------------------
    // Go: TestNoRaceJetStreamClusterConcurrentStreamInfo norace_2_test.go
    // ---------------------------------------------------------------
    [Fact]
    [Trait("Category", "Stress")]
    public async Task Cluster_concurrent_stream_info_queries_during_publishes_are_safe()
    {
        await using var fx = await ClusterFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("INFOLOAD", ["infoload.>"], 1);

        const int ops = 50;
        var errors = new ConcurrentBag<Exception>();

        // Read-heavy load (info + state at 2x rate) racing a publish loop.
        await Task.WhenAll(
            Task.Run(async () =>
            {
                try
                {
                    for (var i = 0; i < ops; i++)
                        await fx.PublishAsync("infoload.event", $"msg-{i}");
                }
                catch (Exception ex) { errors.Add(ex); }
            }),
            Task.Run(async () =>
            {
                try
                {
                    for (var i = 0; i < ops * 2; i++)
                        _ = await fx.GetStreamInfoAsync("INFOLOAD");
                }
                catch (Exception ex) { errors.Add(ex); }
            }),
            Task.Run(async () =>
            {
                try
                {
                    for (var i = 0; i < ops * 2; i++)
                        _ = await fx.GetStreamStateAsync("INFOLOAD");
                }
                catch (Exception ex) { errors.Add(ex); }
            }));

        errors.ShouldBeEmpty();
    }

    // ---------------------------------------------------------------
    // Go: TestNoRaceJetStreamClusterLargeBatchFetch norace_2_test.go
    // ---------------------------------------------------------------
    [Fact]
    [Trait("Category", "Stress")]
    public async Task Cluster_large_batch_fetch_500_messages_under_concurrent_publish()
    {
        await using var fx = await ClusterFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("LARGEBATCH", ["lb.>"], 1);
        await fx.CreateConsumerAsync("LARGEBATCH", "batchreader");

        // All 500 messages are stored before the fetch starts, so the batch can
        // (and must) be filled to exactly totalMsgs even while extras arrive.
        const int totalMsgs = 500;
        for (var i = 0; i < totalMsgs; i++)
            await fx.PublishAsync("lb.event", $"payload-{i}");

        var errors = new ConcurrentBag<Exception>();
        var fetchedCount = 0;

        await Task.WhenAll(
            Task.Run(async () =>
            {
                try
                {
                    var batch = await fx.FetchAsync("LARGEBATCH", "batchreader", totalMsgs);
                    // Written on a pool thread; Interlocked keeps it race-free.
                    Interlocked.Add(ref fetchedCount, batch.Messages.Count);
                }
                catch (Exception ex) { errors.Add(ex); }
            }),
            Task.Run(async () =>
            {
                try
                {
                    for (var i = 0; i < 50; i++)
                        await fx.PublishAsync("lb.event", $"extra-{i}");
                }
                catch (Exception ex) { errors.Add(ex); }
            }));

        errors.ShouldBeEmpty();
        fetchedCount.ShouldBe(totalMsgs);
    }

    // ---------------------------------------------------------------
    // Go: TestNoRaceJetStreamClusterConsumerDeleteConcurrent norace_2_test.go
    // ---------------------------------------------------------------
    [Fact]
    [Trait("Category", "Stress")]
    public async Task Cluster_concurrent_consumer_delete_and_create_is_thread_safe()
    {
        await using var fx = await ClusterFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("CONDEL", ["condel.>"], 1);

        const int initialCount = 20;
        for (var i = 0; i < initialCount; i++)
            await fx.CreateConsumerAsync("CONDEL", $"c{i}");

        var errors = new ConcurrentBag<Exception>();

        // Deletes of the first half race creates of new consumers and info reads.
        await Task.WhenAll(
            Task.Run(async () =>
            {
                try
                {
                    for (var i = 0; i < initialCount / 2; i++)
                        await fx.RequestAsync(
                            $"{JetStreamApiSubjects.ConsumerDelete}CONDEL.c{i}", "{}");
                }
                catch (Exception ex) { errors.Add(ex); }
            }),
            Task.Run(async () =>
            {
                try
                {
                    for (var i = initialCount; i < initialCount + 10; i++)
                        await fx.CreateConsumerAsync("CONDEL", $"c{i}");
                }
                catch (Exception ex) { errors.Add(ex); }
            }),
            Task.Run(async () =>
            {
                try
                {
                    for (var i = 0; i < 30; i++)
                        _ = await fx.GetStreamInfoAsync("CONDEL");
                }
                catch (Exception ex) { errors.Add(ex); }
            }));

        errors.ShouldBeEmpty();
    }

    // ---------------------------------------------------------------
    // Go: TestNoRaceJetStreamClusterStreamPurgeConcurrentFetch norace_2_test.go
    // ---------------------------------------------------------------
    [Fact]
    [Trait("Category", "Stress")]
    public async Task Cluster_stream_purge_concurrent_with_fetch_does_not_deadlock()
    {
        await using var fx = await ClusterFixture.StartAsync(nodes: 3);
        await fx.CreateStreamAsync("PURGELOAD", ["pl.>"], 1);
        await fx.CreateConsumerAsync("PURGELOAD", "purgereader");

        for (var i = 0; i < 100; i++)
            await fx.PublishAsync("pl.event", $"msg-{i}");

        var errors = new ConcurrentBag<Exception>();

        // Purge racing a fetch: either order is acceptable, but both must complete.
        await Task.WhenAll(
            Task.Run(async () =>
            {
                try
                {
                    await fx.RequestAsync($"{JetStreamApiSubjects.StreamPurge}PURGELOAD", "{}");
                }
                catch (Exception ex) { errors.Add(ex); }
            }),
            Task.Run(async () =>
            {
                try
                {
                    _ = await fx.FetchAsync("PURGELOAD", "purgereader", 50);
                }
                catch (Exception ex) { errors.Add(ex); }
            }));

        errors.ShouldBeEmpty();
    }
}