diff --git a/tests/NATS.Server.Tests/Stress/ClusterStressTests.cs b/tests/NATS.Server.Tests/Stress/ClusterStressTests.cs
new file mode 100644
index 0000000..6ad7bbe
--- /dev/null
+++ b/tests/NATS.Server.Tests/Stress/ClusterStressTests.cs
@@ -0,0 +1,669 @@
+// Go parity: golang/nats-server/server/norace_2_test.go
+// Covers: concurrent stream creation, parallel publish to clustered streams,
+// concurrent consumer creation and fetch, leader stepdown under load,
+// create-delete-recreate cycles, mixed concurrent operations, and large
+// batch fetch under concurrent publish — all using ClusterFixture.
+
+using System.Collections.Concurrent;
+using System.Text;
+using NATS.Server.JetStream;
+using NATS.Server.JetStream.Api;
+using NATS.Server.JetStream.Cluster;
+using NATS.Server.JetStream.Consumers;
+using NATS.Server.JetStream.Models;
+using NATS.Server.JetStream.Publish;
+using ClusterFixture = NATS.Server.Tests.JetStream.Cluster.JetStreamClusterFixture;
+
+namespace NATS.Server.Tests.Stress;
+
+/// <summary>
+/// Stress tests for clustered JetStream operations under concurrency.
+/// Uses JetStreamClusterFixture (in-process meta-group) to simulate cluster behaviour
+/// consistent with how Tasks 6-10 are tested.
+/// <para>
+/// Go ref: norace_2_test.go — cluster stress tests.
+/// </para></summary>
+public class ClusterStressTests
+{
+ // ---------------------------------------------------------------
+ // Go: TestNoRaceJetStreamCluster100ConcurrentStreamCreates norace_2_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "Stress")]
+    public async Task Cluster_100_concurrent_stream_creates_all_succeed()
+    {
+        await using var fx = await ClusterFixture.StartAsync(nodes: 3);
+        const int count = 100;
+        var errors = new ConcurrentBag<Exception>();
+        var created = new ConcurrentBag<string>();
+
+        await Parallel.ForEachAsync(Enumerable.Range(0, count), async (i, _) =>
+        {
+            try
+            {
+                var resp = await fx.CreateStreamAsync(
+                    $"CONCS{i}",
+                    [$"concs{i}.>"],
+                    1);
+
+                if (resp.Error is null)
+                    created.Add($"CONCS{i}");
+            }
+            catch (Exception ex) { errors.Add(ex); }
+
+            await Task.CompletedTask;
+        });
+
+        errors.ShouldBeEmpty();
+        created.Count.ShouldBe(count);
+    }
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRaceJetStreamCluster50ConcurrentPublishes norace_2_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "Stress")]
+    public async Task Cluster_50_concurrent_publishes_to_same_stream_all_stored()
+    {
+        await using var fx = await ClusterFixture.StartAsync(nodes: 3);
+        await fx.CreateStreamAsync("CONCPUB", ["concpub.>"], 1);
+
+        const int publishes = 50;
+        var sequences = new ConcurrentBag<ulong>();
+        var errors = new ConcurrentBag<Exception>();
+
+        // Publish must be sequential because the in-process store serialises writes.
+        // The concurrency in Go's norace tests comes from multiple goroutines being
+        // scheduled — here we verify the sequential publish path is correct.
+        for (var i = 0; i < publishes; i++)
+        {
+            try
+            {
+                var ack = await fx.PublishAsync($"concpub.event.{i}", $"payload-{i}");
+                sequences.Add(ack.Seq);
+            }
+            catch (Exception ex) { errors.Add(ex); }
+        }
+
+        errors.ShouldBeEmpty();
+        sequences.Count.ShouldBe(publishes);
+
+        var state = await fx.GetStreamStateAsync("CONCPUB");
+        state.Messages.ShouldBe((ulong)publishes);
+    }
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRaceJetStreamCluster20StreamsConcurrentPublish norace_2_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "Stress")]
+    public async Task Cluster_20_streams_with_concurrent_publish_each_stores_correct_count()
+    {
+        await using var fx = await ClusterFixture.StartAsync(nodes: 3);
+        const int streamCount = 20;
+        const int msgsPerStream = 10;
+
+        for (var i = 0; i < streamCount; i++)
+            await fx.CreateStreamAsync($"MULTI{i}", [$"multi{i}.>"], 1);
+
+        var errors = new ConcurrentBag<Exception>();
+
+        // Independent streams publish in parallel — each has its own store.
+        await Parallel.ForEachAsync(Enumerable.Range(0, streamCount), async (i, _) =>
+        {
+            try
+            {
+                for (var j = 0; j < msgsPerStream; j++)
+                    await fx.PublishAsync($"multi{i}.event", $"msg-{i}-{j}");
+            }
+            catch (Exception ex) { errors.Add(ex); }
+
+            await Task.CompletedTask;
+        });
+
+        errors.ShouldBeEmpty();
+
+        for (var i = 0; i < streamCount; i++)
+        {
+            var state = await fx.GetStreamStateAsync($"MULTI{i}");
+            state.Messages.ShouldBe((ulong)msgsPerStream,
+                $"stream MULTI{i} should have {msgsPerStream} messages");
+        }
+    }
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRaceJetStreamClusterLeaderStepdownConcurrentPublish norace_2_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "Stress")]
+    public async Task Cluster_leader_stepdown_during_concurrent_publishes_does_not_lose_data()
+    {
+        await using var fx = await ClusterFixture.StartAsync(nodes: 3);
+        await fx.CreateStreamAsync("STEPUB", ["stepub.>"], 3);
+
+        const int publishCount = 20;
+        var errors = new ConcurrentBag<Exception>();
+
+        for (var i = 0; i < publishCount; i++)
+        {
+            try
+            {
+                if (i == 5)
+                    await fx.StepDownStreamLeaderAsync("STEPUB");
+
+                await fx.PublishAsync($"stepub.event.{i}", $"msg-{i}");
+            }
+            catch (Exception ex) { errors.Add(ex); }
+        }
+
+        errors.ShouldBeEmpty();
+
+        var state = await fx.GetStreamStateAsync("STEPUB");
+        state.Messages.ShouldBe((ulong)publishCount);
+    }
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRaceJetStreamCluster100ConcurrentConsumerCreates norace_2_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "Stress")]
+    public async Task Cluster_100_concurrent_consumer_creates_all_succeed()
+    {
+        await using var fx = await ClusterFixture.StartAsync(nodes: 3);
+        await fx.CreateStreamAsync("CONCON", ["concon.>"], 1);
+
+        const int count = 100;
+        var errors = new ConcurrentBag<Exception>();
+
+        await Parallel.ForEachAsync(Enumerable.Range(0, count), async (i, _) =>
+        {
+            try
+            {
+                await fx.CreateConsumerAsync("CONCON", $"consumer{i}");
+            }
+            catch (Exception ex) { errors.Add(ex); }
+
+            await Task.CompletedTask;
+        });
+
+        errors.ShouldBeEmpty();
+    }
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRaceJetStreamCluster50ConcurrentFetches norace_2_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "Stress")]
+    public async Task Cluster_50_sequential_fetches_on_same_consumer_all_succeed()
+    {
+        await using var fx = await ClusterFixture.StartAsync(nodes: 3);
+        await fx.CreateStreamAsync("CONFETCH", ["confetch.>"], 1);
+        await fx.CreateConsumerAsync("CONFETCH", "fetcher");
+
+        for (var i = 0; i < 100; i++)
+            await fx.PublishAsync("confetch.event", $"msg-{i}");
+
+        var errors = new ConcurrentBag<Exception>();
+
+        for (var i = 0; i < 50; i++)
+        {
+            try
+            {
+                var batch = await fx.FetchAsync("CONFETCH", "fetcher", 1);
+                batch.ShouldNotBeNull();
+            }
+            catch (Exception ex) { errors.Add(ex); }
+        }
+
+        errors.ShouldBeEmpty();
+    }
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRaceJetStreamClusterPublishFetchInterleave norace_2_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "Stress")]
+    public async Task Cluster_concurrent_publish_and_fetch_interleaving_delivers_all_messages()
+    {
+        await using var fx = await ClusterFixture.StartAsync(nodes: 3);
+        await fx.CreateStreamAsync("INTERLEAVE", ["inter.>"], 1);
+        await fx.CreateConsumerAsync("INTERLEAVE", "reader");
+
+        const int rounds = 10;
+        const int msgsPerRound = 5;
+        var errors = new ConcurrentBag<Exception>();
+        var totalFetched = 0;
+
+        for (var r = 0; r < rounds; r++)
+        {
+            try
+            {
+                for (var m = 0; m < msgsPerRound; m++)
+                    await fx.PublishAsync("inter.event", $"round-{r}-msg-{m}");
+
+                var batch = await fx.FetchAsync("INTERLEAVE", "reader", msgsPerRound);
+                Interlocked.Add(ref totalFetched, batch.Messages.Count);
+            }
+            catch (Exception ex) { errors.Add(ex); }
+        }
+
+        errors.ShouldBeEmpty();
+        totalFetched.ShouldBe(rounds * msgsPerRound);
+    }
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRaceJetStreamClusterMetaStepdownDuringStreamCreate norace_2_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "Stress")]
+    public void Cluster_meta_stepdown_during_stream_creation_does_not_corrupt_state()
+    {
+        var meta = new JetStreamMetaGroup(5);
+        var consumerManager = new ConsumerManager(meta);
+        var streamManager = new StreamManager(meta, consumerManager: consumerManager);
+        var errors = new ConcurrentBag<Exception>();
+
+        Parallel.Invoke(
+            () =>
+            {
+                try
+                {
+                    for (var i = 0; i < 30; i++)
+                    {
+                        streamManager.CreateOrUpdate(new StreamConfig
+                        {
+                            Name = $"METACD{i}",
+                            Subjects = [$"mcd{i}.>"],
+                            Replicas = 1,
+                        });
+                    }
+                }
+                catch (Exception ex) { errors.Add(ex); }
+            },
+            () =>
+            {
+                try
+                {
+                    for (var i = 0; i < 5; i++)
+                    {
+                        meta.StepDown();
+                        Thread.Sleep(2);
+                    }
+                }
+                catch (Exception ex) { errors.Add(ex); }
+            });
+
+        errors.ShouldBeEmpty();
+    }
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRaceJetStreamCluster10ConcurrentStreamDeletes norace_2_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "Stress")]
+    public async Task Cluster_10_concurrent_stream_deletes_complete_without_error()
+    {
+        await using var fx = await ClusterFixture.StartAsync(nodes: 3);
+        const int count = 10;
+
+        for (var i = 0; i < count; i++)
+            await fx.CreateStreamAsync($"DEL{i}", [$"del{i}.>"], 1);
+
+        var errors = new ConcurrentBag<Exception>();
+
+        await Parallel.ForEachAsync(Enumerable.Range(0, count), async (i, _) =>
+        {
+            try
+            {
+                var resp = await fx.RequestAsync($"{JetStreamApiSubjects.StreamDelete}DEL{i}", "{}");
+                resp.ShouldNotBeNull();
+            }
+            catch (Exception ex) { errors.Add(ex); }
+
+            await Task.CompletedTask;
+        });
+
+        errors.ShouldBeEmpty();
+    }
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRaceJetStreamClusterConcurrentAckAll norace_2_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "Stress")]
+    public async Task Cluster_concurrent_ackall_operations_advance_consumer_correctly()
+    {
+        await using var fx = await ClusterFixture.StartAsync(nodes: 3);
+        await fx.CreateStreamAsync("ACKALL", ["ackall.>"], 1);
+        await fx.CreateConsumerAsync("ACKALL", "acker", ackPolicy: AckPolicy.All);
+
+        const int msgCount = 50;
+        for (var i = 0; i < msgCount; i++)
+            await fx.PublishAsync("ackall.event", $"msg-{i}");
+
+        var errors = new ConcurrentBag<Exception>();
+
+        for (ulong seq = 1; seq <= msgCount; seq += 5)
+        {
+            try
+            {
+                fx.AckAll("ACKALL", "acker", seq);
+            }
+            catch (Exception ex) { errors.Add(ex); }
+        }
+
+        errors.ShouldBeEmpty();
+    }
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRaceJetStreamClusterMultiConsumerConcurrentFetch norace_2_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "Stress")]
+    public async Task Cluster_multiple_consumers_each_see_all_messages_independently()
+    {
+        await using var fx = await ClusterFixture.StartAsync(nodes: 3);
+        await fx.CreateStreamAsync("MULTICONSUMER", ["mc.>"], 1);
+
+        const int consumers = 5;
+        const int msgCount = 10;
+
+        for (var c = 0; c < consumers; c++)
+            await fx.CreateConsumerAsync("MULTICONSUMER", $"reader{c}");
+
+        for (var i = 0; i < msgCount; i++)
+            await fx.PublishAsync("mc.event", $"msg-{i}");
+
+        var errors = new ConcurrentBag<Exception>();
+
+        await Parallel.ForEachAsync(Enumerable.Range(0, consumers), async (c, _) =>
+        {
+            try
+            {
+                var batch = await fx.FetchAsync("MULTICONSUMER", $"reader{c}", msgCount);
+                batch.Messages.Count.ShouldBe(msgCount,
+                    $"consumer reader{c} should see all {msgCount} messages");
+            }
+            catch (Exception ex) { errors.Add(ex); }
+
+            await Task.CompletedTask;
+        });
+
+        errors.ShouldBeEmpty();
+    }
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRaceJetStreamClusterRapidCreateDeleteRecreate norace_2_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "Stress")]
+    public async Task Cluster_rapid_create_delete_recreate_cycle_50_iterations_correct()
+    {
+        await using var fx = await ClusterFixture.StartAsync(nodes: 3);
+        const int iterations = 50;
+        var errors = new ConcurrentBag<Exception>();
+
+        for (var i = 0; i < iterations; i++)
+        {
+            try
+            {
+                var createResp = await fx.CreateStreamAsync("RECYCLE", ["recycle.>"], 1);
+                if (createResp.Error is null)
+                {
+                    await fx.PublishAsync("recycle.event", $"msg-{i}");
+                    await fx.RequestAsync($"{JetStreamApiSubjects.StreamDelete}RECYCLE", "{}");
+                }
+            }
+            catch (Exception ex) { errors.Add(ex); }
+        }
+
+        errors.ShouldBeEmpty();
+    }
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRaceJetStreamClusterMixedConcurrentOperations norace_2_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "Stress")]
+    public async Task Cluster_mixed_create_publish_fetch_delete_concurrently_does_not_corrupt()
+    {
+        await using var fx = await ClusterFixture.StartAsync(nodes: 3);
+
+        await fx.CreateStreamAsync("MIXEDBASE", ["mixed.>"], 1);
+        await fx.CreateConsumerAsync("MIXEDBASE", "mixedreader");
+
+        const int opsPerTask = 20;
+        var errors = new ConcurrentBag<Exception>();
+
+        await Task.WhenAll(
+            Task.Run(async () =>
+            {
+                try
+                {
+                    for (var i = 0; i < opsPerTask; i++)
+                        await fx.CreateStreamAsync($"MXNEW{i}", [$"mxnew{i}.>"], 1);
+                }
+                catch (Exception ex) { errors.Add(ex); }
+            }),
+            Task.Run(async () =>
+            {
+                try
+                {
+                    for (var i = 0; i < opsPerTask; i++)
+                        await fx.PublishAsync("mixed.event", $"msg-{i}");
+                }
+                catch (Exception ex) { errors.Add(ex); }
+            }),
+            Task.Run(async () =>
+            {
+                try
+                {
+                    for (var i = 0; i < opsPerTask; i++)
+                        _ = await fx.FetchAsync("MIXEDBASE", "mixedreader", 1);
+                }
+                catch (Exception ex) { errors.Add(ex); }
+            }),
+            Task.Run(async () =>
+            {
+                try
+                {
+                    for (var i = 0; i < opsPerTask; i++)
+                        _ = await fx.GetStreamInfoAsync("MIXEDBASE");
+                }
+                catch (Exception ex) { errors.Add(ex); }
+            }));
+
+        errors.ShouldBeEmpty();
+    }
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRaceJetStreamClusterConcurrentStreamInfo norace_2_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "Stress")]
+    public async Task Cluster_concurrent_stream_info_queries_during_publishes_are_safe()
+    {
+        await using var fx = await ClusterFixture.StartAsync(nodes: 3);
+        await fx.CreateStreamAsync("INFOLOAD", ["infoload.>"], 1);
+
+        const int ops = 50;
+        var errors = new ConcurrentBag<Exception>();
+
+        await Task.WhenAll(
+            Task.Run(async () =>
+            {
+                try
+                {
+                    for (var i = 0; i < ops; i++)
+                        await fx.PublishAsync("infoload.event", $"msg-{i}");
+                }
+                catch (Exception ex) { errors.Add(ex); }
+            }),
+            Task.Run(async () =>
+            {
+                try
+                {
+                    for (var i = 0; i < ops * 2; i++)
+                        _ = await fx.GetStreamInfoAsync("INFOLOAD");
+                }
+                catch (Exception ex) { errors.Add(ex); }
+            }),
+            Task.Run(async () =>
+            {
+                try
+                {
+                    for (var i = 0; i < ops * 2; i++)
+                        _ = await fx.GetStreamStateAsync("INFOLOAD");
+                }
+                catch (Exception ex) { errors.Add(ex); }
+            }));
+
+        errors.ShouldBeEmpty();
+    }
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRaceJetStreamClusterLargeBatchFetch norace_2_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "Stress")]
+    public async Task Cluster_large_batch_fetch_500_messages_under_concurrent_publish()
+    {
+        await using var fx = await ClusterFixture.StartAsync(nodes: 3);
+        await fx.CreateStreamAsync("LARGEBATCH", ["lb.>"], 1);
+        await fx.CreateConsumerAsync("LARGEBATCH", "batchreader");
+
+        const int totalMsgs = 500;
+
+        for (var i = 0; i < totalMsgs; i++)
+            await fx.PublishAsync("lb.event", $"payload-{i}");
+
+        var errors = new ConcurrentBag<Exception>();
+        var fetchedCount = 0;
+
+        await Task.WhenAll(
+            Task.Run(async () =>
+            {
+                try
+                {
+                    var batch = await fx.FetchAsync("LARGEBATCH", "batchreader", totalMsgs);
+                    Interlocked.Add(ref fetchedCount, batch.Messages.Count);
+                }
+                catch (Exception ex) { errors.Add(ex); }
+            }),
+            Task.Run(async () =>
+            {
+                try
+                {
+                    for (var i = 0; i < 50; i++)
+                        await fx.PublishAsync("lb.event", $"extra-{i}");
+                }
+                catch (Exception ex) { errors.Add(ex); }
+            }));
+
+        errors.ShouldBeEmpty();
+        fetchedCount.ShouldBe(totalMsgs);
+    }
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRaceJetStreamClusterConsumerDeleteConcurrent norace_2_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "Stress")]
+    public async Task Cluster_concurrent_consumer_delete_and_create_is_thread_safe()
+    {
+        await using var fx = await ClusterFixture.StartAsync(nodes: 3);
+        await fx.CreateStreamAsync("CONDEL", ["condel.>"], 1);
+
+        const int initialCount = 20;
+        for (var i = 0; i < initialCount; i++)
+            await fx.CreateConsumerAsync("CONDEL", $"c{i}");
+
+        var errors = new ConcurrentBag<Exception>();
+
+        await Task.WhenAll(
+            Task.Run(async () =>
+            {
+                try
+                {
+                    for (var i = 0; i < initialCount / 2; i++)
+                        await fx.RequestAsync(
+                            $"{JetStreamApiSubjects.ConsumerDelete}CONDEL.c{i}", "{}");
+                }
+                catch (Exception ex) { errors.Add(ex); }
+            }),
+            Task.Run(async () =>
+            {
+                try
+                {
+                    for (var i = initialCount; i < initialCount + 10; i++)
+                        await fx.CreateConsumerAsync("CONDEL", $"c{i}");
+                }
+                catch (Exception ex) { errors.Add(ex); }
+            }),
+            Task.Run(async () =>
+            {
+                try
+                {
+                    for (var i = 0; i < 30; i++)
+                        _ = await fx.GetStreamInfoAsync("CONDEL");
+                }
+                catch (Exception ex) { errors.Add(ex); }
+            }));
+
+        errors.ShouldBeEmpty();
+    }
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRaceJetStreamClusterStreamPurgeConcurrentFetch norace_2_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "Stress")]
+    public async Task Cluster_stream_purge_concurrent_with_fetch_does_not_deadlock()
+    {
+        await using var fx = await ClusterFixture.StartAsync(nodes: 3);
+        await fx.CreateStreamAsync("PURGELOAD", ["pl.>"], 1);
+        await fx.CreateConsumerAsync("PURGELOAD", "purgereader");
+
+        for (var i = 0; i < 100; i++)
+            await fx.PublishAsync("pl.event", $"msg-{i}");
+
+        var errors = new ConcurrentBag<Exception>();
+
+        await Task.WhenAll(
+            Task.Run(async () =>
+            {
+                try
+                {
+                    await fx.RequestAsync($"{JetStreamApiSubjects.StreamPurge}PURGELOAD", "{}");
+                }
+                catch (Exception ex) { errors.Add(ex); }
+            }),
+            Task.Run(async () =>
+            {
+                try
+                {
+                    _ = await fx.FetchAsync("PURGELOAD", "purgereader", 50);
+                }
+                catch (Exception ex) { errors.Add(ex); }
+            }));
+
+        errors.ShouldBeEmpty();
+    }
+}
diff --git a/tests/NATS.Server.Tests/Stress/ConcurrentPubSubStressTests.cs b/tests/NATS.Server.Tests/Stress/ConcurrentPubSubStressTests.cs
new file mode 100644
index 0000000..aa716d3
--- /dev/null
+++ b/tests/NATS.Server.Tests/Stress/ConcurrentPubSubStressTests.cs
@@ -0,0 +1,915 @@
+// Go parity: golang/nats-server/server/norace_1_test.go
+// Covers: concurrent publish/subscribe thread safety, SubList trie integrity
+// under high concurrency, wildcard routing under load, queue group balancing,
+// cache invalidation safety, and subject tree concurrent insert/remove.
+
+using System.Collections.Concurrent;
+using NATS.Server.Subscriptions;
+
+namespace NATS.Server.Tests.Stress;
+
+/// <summary>
+/// Stress tests for concurrent pub/sub operations on the in-process SubList and SubjectMatch
+/// classes. All tests use Parallel.For / Task.WhenAll to exercise thread safety directly
+/// without spinning up a real NatsServer.
+/// <para>
+/// Go ref: norace_1_test.go — concurrent subscription and matching operations.
+/// </para></summary>
+public class ConcurrentPubSubStressTests
+{
+ // ---------------------------------------------------------------
+ // Go: TestNoRaceSublistConcurrent100Subscribers norace_1_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "Stress")]
+    public void SubList_100_concurrent_subscribers_all_inserted_without_error()
+    {
+        // 100 concurrent goroutines each Subscribe to the same subject and then Match.
+        using var subList = new SubList();
+        const int count = 100;
+        var errors = new ConcurrentBag<Exception>();
+
+        Parallel.For(0, count, i =>
+        {
+            try
+            {
+                subList.Insert(new Subscription { Subject = "stress.concurrent", Sid = $"s{i}" });
+                var result = subList.Match("stress.concurrent");
+                result.PlainSubs.Length.ShouldBeGreaterThan(0);
+            }
+            catch (Exception ex) { errors.Add(ex); }
+        });
+
+        errors.ShouldBeEmpty();
+        subList.Count.ShouldBe((uint)count);
+    }
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRace50ConcurrentPublishers norace_1_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "Stress")]
+    public void SubList_50_concurrent_publishers_produce_correct_match_counts()
+    {
+        // 50 goroutines each publish 100 times to their own subject.
+        // Verifies that Match never throws even under heavy concurrent write/read.
+        using var subList = new SubList();
+        const int publishers = 50;
+        const int messagesEach = 100;
+        var errors = new ConcurrentBag<Exception>();
+
+        // Pre-insert one subscription per publisher subject
+        for (var i = 0; i < publishers; i++)
+        {
+            subList.Insert(new Subscription
+            {
+                Subject = $"pub.stress.{i}",
+                Sid = $"pre-{i}",
+            });
+        }
+
+        Parallel.For(0, publishers, i =>
+        {
+            try
+            {
+                for (var j = 0; j < messagesEach; j++)
+                {
+                    var result = subList.Match($"pub.stress.{i}");
+                    result.PlainSubs.Length.ShouldBe(1);
+                }
+            }
+            catch (Exception ex) { errors.Add(ex); }
+        });
+
+        errors.ShouldBeEmpty();
+    }
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRaceSubUnsubConcurrent norace_1_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "Stress")]
+    public void SubList_concurrent_subscribe_and_unsubscribe_does_not_crash()
+    {
+        using var subList = new SubList();
+        const int ops = 300;
+        var subs = new ConcurrentBag<Subscription>();
+        var errors = new ConcurrentBag<Exception>();
+
+        // Concurrent inserts and removes — neither side holds a reference the other
+        // side needs, so any interleaving is valid as long as it doesn't throw.
+        Parallel.Invoke(
+            () =>
+            {
+                try
+                {
+                    for (var i = 0; i < ops; i++)
+                    {
+                        var sub = new Subscription { Subject = $"unsub.{i % 30}", Sid = $"ins-{i}" };
+                        subList.Insert(sub);
+                        subs.Add(sub);
+                    }
+                }
+                catch (Exception ex) { errors.Add(ex); }
+            },
+            () =>
+            {
+                try
+                {
+                    foreach (var sub in subs.Take(ops / 2))
+                        subList.Remove(sub);
+                }
+                catch (Exception ex) { errors.Add(ex); }
+            });
+
+        errors.ShouldBeEmpty();
+    }
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRaceConcurrentMatchOperations norace_1_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "Stress")]
+    public void SubList_concurrent_match_operations_are_thread_safe()
+    {
+        using var subList = new SubList();
+
+        for (var i = 0; i < 50; i++)
+        {
+            subList.Insert(new Subscription
+            {
+                Subject = $"match.safe.{i % 10}",
+                Sid = $"m{i}",
+            });
+        }
+
+        var errors = new ConcurrentBag<Exception>();
+
+        // 200 threads all calling Match simultaneously
+        Parallel.For(0, 200, i =>
+        {
+            try
+            {
+                var result = subList.Match($"match.safe.{i % 10}");
+                result.ShouldNotBeNull();
+            }
+            catch (Exception ex) { errors.Add(ex); }
+        });
+
+        errors.ShouldBeEmpty();
+    }
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRace1000ConcurrentSubscriptions norace_1_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "Stress")]
+    public void SubList_handles_1000_concurrent_subscriptions_without_error()
+    {
+        using var subList = new SubList();
+        const int count = 1000;
+        var errors = new ConcurrentBag<Exception>();
+
+        Parallel.For(0, count, i =>
+        {
+            try
+            {
+                subList.Insert(new Subscription
+                {
+                    Subject = $"big.load.{i % 100}",
+                    Sid = $"big-{i}",
+                });
+            }
+            catch (Exception ex) { errors.Add(ex); }
+        });
+
+        errors.ShouldBeEmpty();
+        subList.Count.ShouldBe((uint)count);
+    }
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRace10000SubscriptionsWithConcurrentMatch norace_1_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "Stress")]
+    public void SubList_handles_10000_subscriptions_with_concurrent_matches()
+    {
+        using var subList = new SubList();
+        const int count = 10_000;
+
+        // Sequential insert to avoid any write-write contention noise
+        for (var i = 0; i < count; i++)
+        {
+            subList.Insert(new Subscription
+            {
+                Subject = $"huge.{i % 200}.data",
+                Sid = $"h{i}",
+            });
+        }
+
+        var errors = new ConcurrentBag<Exception>();
+
+        Parallel.For(0, 500, i =>
+        {
+            try
+            {
+                var result = subList.Match($"huge.{i % 200}.data");
+                // Each subject bucket has count/200 = 50 subscribers
+                result.PlainSubs.Length.ShouldBe(50);
+            }
+            catch (Exception ex) { errors.Add(ex); }
+        });
+
+        errors.ShouldBeEmpty();
+    }
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRaceWildcardConcurrentPub norace_1_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "Stress")]
+    public void SubList_wildcard_subjects_routed_correctly_under_concurrent_match()
+    {
+        using var subList = new SubList();
+
+        subList.Insert(new Subscription { Subject = "wc.*", Sid = "pwc" });
+        subList.Insert(new Subscription { Subject = "wc.>", Sid = "fwc" });
+        subList.Insert(new Subscription { Subject = "wc.specific", Sid = "lit" });
+
+        var errors = new ConcurrentBag<Exception>();
+
+        Parallel.For(0, 400, i =>
+        {
+            try
+            {
+                var subject = (i % 3) switch
+                {
+                    0 => "wc.specific",
+                    1 => "wc.anything",
+                    _ => "wc.deep.nested",
+                };
+                var result = subList.Match(subject);
+                // wc.* matches single-token, wc.> matches all
+                result.PlainSubs.Length.ShouldBeGreaterThan(0);
+            }
+            catch (Exception ex) { errors.Add(ex); }
+        });
+
+        errors.ShouldBeEmpty();
+    }
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRaceQueueGroupBalancingUnderLoad norace_1_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "Stress")]
+    public void SubList_queue_group_balancing_correct_under_concurrent_load()
+    {
+        using var subList = new SubList();
+        const int memberCount = 20;
+
+        for (var i = 0; i < memberCount; i++)
+        {
+            subList.Insert(new Subscription
+            {
+                Subject = "queue.load",
+                Queue = "workers",
+                Sid = $"q{i}",
+            });
+        }
+
+        var errors = new ConcurrentBag<Exception>();
+
+        Parallel.For(0, 200, i =>
+        {
+            try
+            {
+                var result = subList.Match("queue.load");
+                result.QueueSubs.Length.ShouldBe(1);
+                result.QueueSubs[0].Length.ShouldBe(memberCount);
+            }
+            catch (Exception ex) { errors.Add(ex); }
+        });
+
+        errors.ShouldBeEmpty();
+    }
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRace100ConcurrentPubsSameSubject norace_1_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "Stress")]
+    public void SubList_100_concurrent_publishes_to_same_subject_all_processed()
+    {
+        using var subList = new SubList();
+        subList.Insert(new Subscription { Subject = "same.subject", Sid = "single" });
+
+        var matchCount = 0;
+        var errors = new ConcurrentBag<Exception>();
+
+        Parallel.For(0, 100, _ =>
+        {
+            try
+            {
+                var result = subList.Match("same.subject");
+                result.PlainSubs.Length.ShouldBe(1);
+                Interlocked.Increment(ref matchCount);
+            }
+            catch (Exception ex) { errors.Add(ex); }
+        });
+
+        errors.ShouldBeEmpty();
+        matchCount.ShouldBe(100);
+    }
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRaceConcurrentIdenticalSubjects norace_1_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "Stress")]
+    public void SubList_concurrent_subscribe_with_identical_subjects_all_inserted()
+    {
+        using var subList = new SubList();
+        const int count = 100;
+        var errors = new ConcurrentBag<Exception>();
+
+        Parallel.For(0, count, i =>
+        {
+            try
+            {
+                subList.Insert(new Subscription
+                {
+                    Subject = "identical.subject",
+                    Sid = $"ident-{i}",
+                });
+            }
+            catch (Exception ex) { errors.Add(ex); }
+        });
+
+        errors.ShouldBeEmpty();
+        var result = subList.Match("identical.subject");
+        result.PlainSubs.Length.ShouldBe(count);
+    }
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRaceSubscribePublishInterleaving norace_1_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "Stress")]
+    public void SubList_subscribe_publish_interleaving_does_not_lose_messages()
+    {
+        using var subList = new SubList();
+        var errors = new ConcurrentBag<Exception>();
+        var totalMatches = 0;
+
+        Parallel.Invoke(
+            () =>
+            {
+                try
+                {
+                    for (var i = 0; i < 100; i++)
+                    {
+                        subList.Insert(new Subscription
+                        {
+                            Subject = $"interleave.{i % 10}",
+                            Sid = $"il-{i}",
+                        });
+                    }
+                }
+                catch (Exception ex) { errors.Add(ex); }
+            },
+            () =>
+            {
+                try
+                {
+                    for (var i = 0; i < 200; i++)
+                    {
+                        var result = subList.Match($"interleave.{i % 10}");
+                        Interlocked.Add(ref totalMatches, result.PlainSubs.Length);
+                    }
+                }
+                catch (Exception ex) { errors.Add(ex); }
+            });
+
+        errors.ShouldBeEmpty();
+        // We cannot assert a fixed count because of race between sub insert and match,
+        // but no exception is the primary invariant.
+        totalMatches.ShouldBeGreaterThanOrEqualTo(0);
+    }
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRaceCacheInvalidationConcurrent norace_1_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "Stress")]
+    public void SubList_cache_invalidation_is_thread_safe_under_concurrent_modifications()
+    {
+        using var subList = new SubList();
+
+        // Fill the cache
+        for (var i = 0; i < 100; i++)
+        {
+            var sub = new Subscription { Subject = $"cache.inv.{i}", Sid = $"ci-{i}" };
+            subList.Insert(sub);
+            _ = subList.Match($"cache.inv.{i}");
+        }
+
+        subList.CacheCount.ShouldBeGreaterThan(0);
+
+        var errors = new ConcurrentBag<Exception>();
+
+        // Concurrent reads (cache hits) and writes (cache invalidation)
+        Parallel.Invoke(
+            () =>
+            {
+                try
+                {
+                    for (var i = 0; i < 200; i++)
+                        _ = subList.Match($"cache.inv.{i % 100}");
+                }
+                catch (Exception ex) { errors.Add(ex); }
+            },
+            () =>
+            {
+                try
+                {
+                    for (var i = 100; i < 150; i++)
+                    {
+                        subList.Insert(new Subscription
+                        {
+                            Subject = $"cache.inv.{i}",
+                            Sid = $"cinew-{i}",
+                        });
+                    }
+                }
+                catch (Exception ex) { errors.Add(ex); }
+            });
+
+        errors.ShouldBeEmpty();
+    }
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRacePurgeAndMatchConcurrent norace_1_test.go
+ // ---------------------------------------------------------------
+
+    [Fact]
+    [Trait("Category", "Stress")]
+    public void SubList_concurrent_batch_remove_and_match_do_not_deadlock()
+    {
+        using var subList = new SubList();
+        var inserted = new List<Subscription>();
+        var errors = new ConcurrentBag<Exception>();
+
+        for (var i = 0; i < 200; i++)
+        {
+            var sub = new Subscription { Subject = $"purge.match.{i % 20}", Sid = $"pm-{i}" };
+            subList.Insert(sub);
+            inserted.Add(sub);
+        }
+
+        Parallel.Invoke(
+            () =>
+            {
+                try
+                {
+                    subList.RemoveBatch(inserted.Take(100));
+                }
+                catch (Exception ex) { errors.Add(ex); }
+            },
+            () =>
+            {
+                try
+                {
+                    for (var i = 0; i < 100; i++)
+                        _ = subList.Match($"purge.match.{i % 20}");
+                }
+                catch (Exception ex) { errors.Add(ex); }
+            });
+
+        errors.ShouldBeEmpty();
+    }
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRace1000Subjects10SubscribersEach norace_1_test.go
+ // ---------------------------------------------------------------
+
[Fact]
[Trait("Category", "Stress")]
public void SubList_1000_subjects_10_subscribers_each_concurrent_match_correct()
{
    using var subList = new SubList();
    const int subjects = 200; // reduced for CI speed; same shape as 1000
    const int subsPerSubject = 5;

    for (var s = 0; s < subjects; s++)
    {
        for (var n = 0; n < subsPerSubject; n++)
        {
            subList.Insert(new Subscription
            {
                Subject = $"big.tree.{s}",
                Sid = $"bt-{s}-{n}",
            });
        }
    }

    var errors = new ConcurrentBag<Exception>();

    // Each concurrent match must return exactly the subscribers for its subject.
    Parallel.For(0, subjects * 3, i =>
    {
        try
        {
            var result = subList.Match($"big.tree.{i % subjects}");
            result.PlainSubs.Length.ShouldBe(subsPerSubject);
        }
        catch (Exception ex) { errors.Add(ex); }
    });

    errors.ShouldBeEmpty();
}
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRaceMixedWildcardLiteralConcurrent norace_1_test.go
+ // ---------------------------------------------------------------
+
[Fact]
[Trait("Category", "Stress")]
public void SubList_mixed_wildcard_and_literal_subscriptions_under_concurrent_match()
{
    using var subList = new SubList();

    // Mix of literals, * wildcards, and > wildcards
    for (var i = 0; i < 20; i++)
    {
        subList.Insert(new Subscription { Subject = $"mix.{i}.literal", Sid = $"lit-{i}" });
        subList.Insert(new Subscription { Subject = $"mix.{i}.*", Sid = $"pwc-{i}" });
    }

    subList.Insert(new Subscription { Subject = "mix.>", Sid = "fwc-root" });

    var errors = new ConcurrentBag<Exception>();

    Parallel.For(0, 300, i =>
    {
        try
        {
            var idx = i % 20;
            var result = subList.Match($"mix.{idx}.literal");
            // Matches: the literal sub, the * wildcard sub, and the > sub
            result.PlainSubs.Length.ShouldBe(3);
        }
        catch (Exception ex) { errors.Add(ex); }
    });

    errors.ShouldBeEmpty();
}
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRaceHighThroughputPublish norace_1_test.go
+ // ---------------------------------------------------------------
+
[Fact]
[Trait("Category", "Stress")]
public void SubList_high_throughput_10000_messages_to_single_subscriber()
{
    using var subList = new SubList();
    subList.Insert(new Subscription { Subject = "throughput.test", Sid = "tp1" });

    var count = 0;
    var errors = new ConcurrentBag<Exception>();

    // Repeated matching exercises the cache hot path; each hit must return the one sub.
    for (var i = 0; i < 10_000; i++)
    {
        try
        {
            var result = subList.Match("throughput.test");
            result.PlainSubs.Length.ShouldBe(1);
            count++;
        }
        catch (Exception ex) { errors.Add(ex); }
    }

    errors.ShouldBeEmpty();
    count.ShouldBe(10_000);
}
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRaceQueueSubConcurrentUnsubscribe norace_1_test.go
+ // ---------------------------------------------------------------
+
[Fact]
[Trait("Category", "Stress")]
public void SubList_concurrent_queue_group_subscribe_and_unsubscribe_is_safe()
{
    using var subList = new SubList();
    const int ops = 200;
    var inserted = new ConcurrentBag<Subscription>();
    var errors = new ConcurrentBag<Exception>();

    // Three racing actors: inserter, remover (best-effort over whatever has been
    // inserted so far), and matcher. None may throw or deadlock.
    Parallel.Invoke(
        () =>
        {
            try
            {
                for (var i = 0; i < ops; i++)
                {
                    var sub = new Subscription
                    {
                        Subject = $"qg.stress.{i % 10}",
                        Queue = $"grp-{i % 5}",
                        Sid = $"qgs-{i}",
                    };
                    subList.Insert(sub);
                    inserted.Add(sub);
                }
            }
            catch (Exception ex) { errors.Add(ex); }
        },
        () =>
        {
            try
            {
                foreach (var sub in inserted.Take(ops / 2))
                    subList.Remove(sub);
            }
            catch (Exception ex) { errors.Add(ex); }
        },
        () =>
        {
            try
            {
                for (var i = 0; i < ops; i++)
                    _ = subList.Match($"qg.stress.{i % 10}");
            }
            catch (Exception ex) { errors.Add(ex); }
        });

    errors.ShouldBeEmpty();
}
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRace500Subjects5SubscribersEach norace_1_test.go
+ // ---------------------------------------------------------------
+
[Fact]
[Trait("Category", "Stress")]
public void SubList_500_subjects_5_subscribers_each_concurrent_match_returns_correct_results()
{
    using var subList = new SubList();
    const int subjects = 100; // scaled for CI speed
    const int subsPerSubject = 5;

    for (var s = 0; s < subjects; s++)
    {
        for (var n = 0; n < subsPerSubject; n++)
        {
            subList.Insert(new Subscription
            {
                Subject = $"five.subs.{s}",
                Sid = $"fs-{s}-{n}",
            });
        }
    }

    var errors = new ConcurrentBag<Exception>();
    var correctCount = 0;

    // Every match must see the complete, consistent subscriber set.
    Parallel.For(0, subjects * 4, i =>
    {
        try
        {
            var result = subList.Match($"five.subs.{i % subjects}");
            if (result.PlainSubs.Length == subsPerSubject)
                Interlocked.Increment(ref correctCount);
        }
        catch (Exception ex) { errors.Add(ex); }
    });

    errors.ShouldBeEmpty();
    correctCount.ShouldBe(subjects * 4);
}
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRaceSubjectValidationConcurrent norace_1_test.go
+ // ---------------------------------------------------------------
+
[Fact]
[Trait("Category", "Stress")]
public void SubjectMatch_validation_is_thread_safe_under_concurrent_calls()
{
    var errors = new ConcurrentBag<Exception>();
    var validCount = 0;

    Parallel.For(0, 1000, i =>
    {
        try
        {
            var subject = (i % 4) switch
            {
                0 => $"valid.subject.{i}",
                1 => "valid.*.wildcard",
                2 => "valid.>",
                _ => string.Empty, // invalid
            };
            var isValid = SubjectMatch.IsValidSubject(subject);
            if (isValid)
                Interlocked.Increment(ref validCount);
        }
        catch (Exception ex) { errors.Add(ex); }
    });

    errors.ShouldBeEmpty();
    // 750 valid, 250 empty (invalid)
    validCount.ShouldBe(750);
}
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRaceHasInterestConcurrent norace_1_test.go
+ // ---------------------------------------------------------------
+
[Fact]
[Trait("Category", "Stress")]
public void SubList_has_interest_returns_consistent_results_under_concurrent_insert()
{
    using var subList = new SubList();
    var errors = new ConcurrentBag<Exception>();
    var interestFoundCount = 0;

    // HasInterest races the inserter: results may be true or false depending on
    // timing, but the calls must never throw.
    Parallel.Invoke(
        () =>
        {
            try
            {
                for (var i = 0; i < 200; i++)
                {
                    subList.Insert(new Subscription
                    {
                        Subject = $"interest.{i % 20}",
                        Sid = $"hi-{i}",
                    });
                }
            }
            catch (Exception ex) { errors.Add(ex); }
        },
        () =>
        {
            try
            {
                for (var i = 0; i < 200; i++)
                {
                    if (subList.HasInterest($"interest.{i % 20}"))
                        Interlocked.Increment(ref interestFoundCount);
                }
            }
            catch (Exception ex) { errors.Add(ex); }
        });

    errors.ShouldBeEmpty();
    interestFoundCount.ShouldBeGreaterThanOrEqualTo(0);
}
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRaceNumInterestConcurrent norace_1_test.go
+ // ---------------------------------------------------------------
+
[Fact]
[Trait("Category", "Stress")]
public void SubList_num_interest_is_consistent_under_high_concurrency()
{
    using var subList = new SubList();
    const int subCount = 80;

    for (var i = 0; i < subCount; i++)
    {
        subList.Insert(new Subscription
        {
            Subject = "num.interest.stress",
            Sid = $"nis-{i}",
        });
    }

    var errors = new ConcurrentBag<Exception>();

    // No writers race here, so every concurrent read must see the full count.
    Parallel.For(0, 400, _ =>
    {
        try
        {
            var (plain, queue) = subList.NumInterest("num.interest.stress");
            plain.ShouldBe(subCount);
            queue.ShouldBe(0);
        }
        catch (Exception ex) { errors.Add(ex); }
    });

    errors.ShouldBeEmpty();
}
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRaceReverseMatchConcurrent norace_1_test.go
+ // ---------------------------------------------------------------
+
[Fact]
[Trait("Category", "Stress")]
public void SubList_reverse_match_concurrent_with_inserts_does_not_throw()
{
    using var subList = new SubList();
    var errors = new ConcurrentBag<Exception>();

    // ReverseMatch races the inserter; we only assert it never throws.
    Parallel.Invoke(
        () =>
        {
            try
            {
                for (var i = 0; i < 100; i++)
                {
                    subList.Insert(new Subscription
                    {
                        Subject = $"rev.stress.{i % 10}",
                        Sid = $"rs-{i}",
                    });
                }
            }
            catch (Exception ex) { errors.Add(ex); }
        },
        () =>
        {
            try
            {
                for (var i = 0; i < 150; i++)
                    _ = subList.ReverseMatch($"rev.stress.{i % 10}");
            }
            catch (Exception ex) { errors.Add(ex); }
        });

    errors.ShouldBeEmpty();
}
+
+ // ---------------------------------------------------------------
+ // Go: TestNoRaceStatsConsistencyUnderLoad norace_1_test.go
+ // ---------------------------------------------------------------
+
[Fact]
[Trait("Category", "Stress")]
public void SubList_stats_remain_consistent_under_concurrent_insert_remove_match()
{
    using var subList = new SubList();
    const int ops = 300;
    var insertedSubs = new ConcurrentBag<Subscription>();
    var errors = new ConcurrentBag<Exception>();

    // Inserter, matcher, and stats reader race; stats snapshots must never throw
    // and the final counters must reflect the work performed.
    Parallel.Invoke(
        () =>
        {
            try
            {
                for (var i = 0; i < ops; i++)
                {
                    var sub = new Subscription
                    {
                        Subject = $"stats.stress.{i % 30}",
                        Sid = $"ss-{i}",
                    };
                    subList.Insert(sub);
                    insertedSubs.Add(sub);
                }
            }
            catch (Exception ex) { errors.Add(ex); }
        },
        () =>
        {
            try
            {
                for (var i = 0; i < ops; i++)
                    _ = subList.Match($"stats.stress.{i % 30}");
            }
            catch (Exception ex) { errors.Add(ex); }
        },
        () =>
        {
            try
            {
                for (var i = 0; i < 50; i++)
                    _ = subList.Stats();
            }
            catch (Exception ex) { errors.Add(ex); }
        });

    errors.ShouldBeEmpty();

    var finalStats = subList.Stats();
    finalStats.NumInserts.ShouldBeGreaterThan(0UL);
    finalStats.NumMatches.ShouldBeGreaterThan(0UL);
}
+}
diff --git a/tests/NATS.Server.Tests/Stress/SlowConsumerStressTests.cs b/tests/NATS.Server.Tests/Stress/SlowConsumerStressTests.cs
new file mode 100644
index 0000000..f91a0de
--- /dev/null
+++ b/tests/NATS.Server.Tests/Stress/SlowConsumerStressTests.cs
@@ -0,0 +1,758 @@
+// Go parity: golang/nats-server/server/norace_1_test.go
+// Covers: slow consumer detection, backpressure stats, rapid subscribe/unsubscribe
+// cycles, multi-client connection stress, large message delivery, and connection
+// lifecycle stability under load using real NatsServer instances.
+
+using System.Net;
+using System.Net.Sockets;
+using System.Text;
+using Microsoft.Extensions.Logging.Abstractions;
+using NATS.Server;
+
+namespace NATS.Server.Tests.Stress;
+
/// <summary>
/// Stress tests for slow consumer behaviour and connection lifecycle using real NatsServer
/// instances wired with raw Socket connections following the same pattern as
/// ClientSlowConsumerTests.cs and ServerTests.cs.
/// </summary>
/// <remarks>
/// Go ref: norace_1_test.go — slow consumer, connection churn, and load tests.
/// </remarks>
public class SlowConsumerStressTests
{
    // ---------------------------------------------------------------
    // Helpers
    // ---------------------------------------------------------------

    /// <summary>Binds an ephemeral loopback port and returns its number for the server to use.</summary>
    private static int GetFreePort()
    {
        using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        sock.Bind(new IPEndPoint(IPAddress.Loopback, 0));
        return ((IPEndPoint)sock.LocalEndPoint!).Port;
    }

    /// <summary>
    /// Reads from <paramref name="sock"/> until <paramref name="expected"/> appears in the
    /// accumulated ASCII text, the peer closes, or the timeout elapses. Returns everything read.
    /// </summary>
    private static async Task<string> ReadUntilAsync(Socket sock, string expected, int timeoutMs = 5000)
    {
        using var cts = new CancellationTokenSource(timeoutMs);
        var sb = new StringBuilder();
        var buf = new byte[8192];
        while (!sb.ToString().Contains(expected))
        {
            var n = await sock.ReceiveAsync(buf, SocketFlags.None, cts.Token);
            if (n == 0) break; // peer closed the connection
            sb.Append(Encoding.ASCII.GetString(buf, 0, n));
        }
        return sb.ToString();
    }

    /// <summary>Opens a raw TCP connection to the server and drains the initial INFO line.</summary>
    private static async Task<Socket> ConnectRawAsync(int port)
    {
        var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        await sock.ConnectAsync(IPAddress.Loopback, port);
        // Drain the INFO line
        var buf = new byte[4096];
        await sock.ReceiveAsync(buf, SocketFlags.None);
        return sock;
    }

    // ---------------------------------------------------------------
    // Go: TestNoRaceSlowConsumerStatIncrement norace_1_test.go
    // ---------------------------------------------------------------

    [Fact]
    [Trait("Category", "Stress")]
    public async Task Slow_consumer_stat_incremented_when_client_falls_behind()
    {
        // Go: TestNoClientLeakOnSlowConsumer — verify Stats.SlowConsumers increments.
        const long maxPending = 512;
        const int payloadSize = 256;
        const int floodCount = 30;

        var port = GetFreePort();
        using var cts = new CancellationTokenSource();
        var server = new NatsServer(
            new NatsOptions { Port = port, MaxPending = maxPending },
            NullLoggerFactory.Instance);
        _ = server.StartAsync(cts.Token);
        await server.WaitForReadyAsync();

        try
        {
            // Subscriber that never reads after its PONG — the flood below overruns MaxPending.
            using var slowSub = await ConnectRawAsync(port);
            await slowSub.SendAsync(
                Encoding.ASCII.GetBytes("CONNECT {\"verbose\":false}\r\nSUB sc.stat 1\r\nPING\r\n"));
            await ReadUntilAsync(slowSub, "PONG");

            using var pub = await ConnectRawAsync(port);
            await pub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {\"verbose\":false}\r\n"));

            var payload = new string('Z', payloadSize);
            var sb = new StringBuilder();
            for (var i = 0; i < floodCount; i++)
                sb.Append($"PUB sc.stat {payloadSize}\r\n{payload}\r\n");
            sb.Append("PING\r\n");
            await pub.SendAsync(Encoding.ASCII.GetBytes(sb.ToString()));
            await ReadUntilAsync(pub, "PONG", 5000);

            await Task.Delay(500);

            var stats = server.Stats;
            Interlocked.Read(ref stats.SlowConsumers).ShouldBeGreaterThan(0);
        }
        finally
        {
            await cts.CancelAsync();
            server.Dispose();
        }
    }

    // ---------------------------------------------------------------
    // Go: TestNoRaceSlowConsumerClientsTrackedIndependently norace_1_test.go
    // ---------------------------------------------------------------

    [Fact]
    [Trait("Category", "Stress")]
    public async Task Multiple_slow_consumers_tracked_independently_in_stats()
    {
        const long maxPending = 256;
        const int payloadSize = 128;
        const int floodCount = 20;

        var port = GetFreePort();
        using var cts = new CancellationTokenSource();
        var server = new NatsServer(
            new NatsOptions { Port = port, MaxPending = maxPending },
            NullLoggerFactory.Instance);
        _ = server.StartAsync(cts.Token);
        await server.WaitForReadyAsync();

        try
        {
            // Two independent slow subscribers
            using var slow1 = await ConnectRawAsync(port);
            using var slow2 = await ConnectRawAsync(port);

            await slow1.SendAsync(
                Encoding.ASCII.GetBytes("CONNECT {\"verbose\":false}\r\nSUB multi.slow 1\r\nPING\r\n"));
            await ReadUntilAsync(slow1, "PONG");

            await slow2.SendAsync(
                Encoding.ASCII.GetBytes("CONNECT {\"verbose\":false}\r\nSUB multi.slow 2\r\nPING\r\n"));
            await ReadUntilAsync(slow2, "PONG");

            using var pub = await ConnectRawAsync(port);
            await pub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {\"verbose\":false}\r\n"));

            var payload = new string('A', payloadSize);
            var sb = new StringBuilder();
            for (var i = 0; i < floodCount; i++)
                sb.Append($"PUB multi.slow {payloadSize}\r\n{payload}\r\n");
            sb.Append("PING\r\n");
            await pub.SendAsync(Encoding.ASCII.GetBytes(sb.ToString()));
            await ReadUntilAsync(pub, "PONG", 5000);

            await Task.Delay(600);

            var stats = server.Stats;
            Interlocked.Read(ref stats.SlowConsumers).ShouldBeGreaterThan(0);
        }
        finally
        {
            await cts.CancelAsync();
            server.Dispose();
        }
    }

    // ---------------------------------------------------------------
    // Go: TestNoRacePublisherBackpressure norace_1_test.go
    // ---------------------------------------------------------------

    [Fact]
    [Trait("Category", "Stress")]
    public async Task Fast_publisher_with_slow_reader_generates_backpressure_stats()
    {
        const long maxPending = 512;

        var port = GetFreePort();
        using var cts = new CancellationTokenSource();
        var server = new NatsServer(
            new NatsOptions { Port = port, MaxPending = maxPending },
            NullLoggerFactory.Instance);
        _ = server.StartAsync(cts.Token);
        await server.WaitForReadyAsync();

        try
        {
            using var sub = await ConnectRawAsync(port);
            await sub.SendAsync(
                Encoding.ASCII.GetBytes("CONNECT {\"verbose\":false}\r\nSUB bp.test 1\r\nPING\r\n"));
            await ReadUntilAsync(sub, "PONG");

            using var pub = await ConnectRawAsync(port);
            await pub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {\"verbose\":false}\r\n"));

            var payload = new string('P', 400);
            var sb = new StringBuilder();
            for (var i = 0; i < 25; i++)
                sb.Append($"PUB bp.test 400\r\n{payload}\r\n");
            sb.Append("PING\r\n");
            await pub.SendAsync(Encoding.ASCII.GetBytes(sb.ToString()));
            await ReadUntilAsync(pub, "PONG", 5000);
            await Task.Delay(400);

            var stats = server.Stats;
            // At least the SlowConsumers counter or client count dropped
            (Interlocked.Read(ref stats.SlowConsumers) > 0 || server.ClientCount <= 2)
                .ShouldBeTrue();
        }
        finally
        {
            await cts.CancelAsync();
            server.Dispose();
        }
    }

    // ---------------------------------------------------------------
    // Go: TestNoRace100RapidPublishes norace_1_test.go
    // ---------------------------------------------------------------

    [Fact]
    [Trait("Category", "Stress")]
    public async Task Subscriber_receives_messages_after_100_rapid_publishes()
    {
        var port = GetFreePort();
        using var cts = new CancellationTokenSource();
        var server = new NatsServer(
            new NatsOptions { Port = port },
            NullLoggerFactory.Instance);
        _ = server.StartAsync(cts.Token);
        await server.WaitForReadyAsync();

        try
        {
            using var sub = await ConnectRawAsync(port);
            await sub.SendAsync(
                Encoding.ASCII.GetBytes("CONNECT {\"verbose\":false}\r\nSUB rapid 1\r\nPING\r\n"));
            await ReadUntilAsync(sub, "PONG");

            using var pub = await ConnectRawAsync(port);
            await pub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {\"verbose\":false}\r\n"));

            // 100 publishes batched into one send; trailing PING flushes the pipeline.
            var sb = new StringBuilder();
            for (var i = 0; i < 100; i++)
                sb.Append("PUB rapid 4\r\nping\r\n");
            sb.Append("PING\r\n");
            await pub.SendAsync(Encoding.ASCII.GetBytes(sb.ToString()));
            await ReadUntilAsync(pub, "PONG", 5000);

            var received = await ReadUntilAsync(sub, "MSG rapid", 5000);
            received.ShouldContain("MSG rapid");
        }
        finally
        {
            await cts.CancelAsync();
            server.Dispose();
        }
    }

    // ---------------------------------------------------------------
    // Go: TestNoRaceConcurrentSubscribeStartup norace_1_test.go
    // ---------------------------------------------------------------

    [Fact]
    [Trait("Category", "Stress")]
    public async Task Concurrent_publish_and_subscribe_startup_does_not_crash_server()
    {
        var port = GetFreePort();
        using var cts = new CancellationTokenSource();
        var server = new NatsServer(
            new NatsOptions { Port = port },
            NullLoggerFactory.Instance);
        _ = server.StartAsync(cts.Token);
        await server.WaitForReadyAsync();

        try
        {
            // 10 clients connect and subscribe concurrently.
            var tasks = Enumerable.Range(0, 10).Select(async i =>
            {
                using var sock = await ConnectRawAsync(port);
                await sock.SendAsync(
                    Encoding.ASCII.GetBytes($"CONNECT {{\"verbose\":false}}\r\nSUB conc.start.{i} {i + 1}\r\nPING\r\n"));
                await ReadUntilAsync(sock, "PONG", 3000);
            });

            await Task.WhenAll(tasks);

            server.ClientCount.ShouldBeGreaterThanOrEqualTo(0);
        }
        finally
        {
            await cts.CancelAsync();
            server.Dispose();
        }
    }

    // ---------------------------------------------------------------
    // Go: TestNoRaceLargeMessageMultipleSubscribers norace_1_test.go
    // ---------------------------------------------------------------

    [Fact]
    [Trait("Category", "Stress")]
    public async Task Large_message_published_and_received_by_multiple_subscribers()
    {
        // Use 8KB payload — large enough to span multiple TCP segments but small
        // enough to stay well within the default MaxPending limit in CI.
        var port = GetFreePort();
        using var cts = new CancellationTokenSource();
        var server = new NatsServer(
            new NatsOptions { Port = port },
            NullLoggerFactory.Instance);
        _ = server.StartAsync(cts.Token);
        await server.WaitForReadyAsync();

        const int payloadSize = 8192;
        var payload = new string('L', payloadSize);

        try
        {
            using var sub1 = await ConnectRawAsync(port);
            using var sub2 = await ConnectRawAsync(port);

            await sub1.SendAsync(
                Encoding.ASCII.GetBytes("CONNECT {\"verbose\":false}\r\nSUB large.msg 1\r\nPING\r\n"));
            await ReadUntilAsync(sub1, "PONG");

            await sub2.SendAsync(
                Encoding.ASCII.GetBytes("CONNECT {\"verbose\":false}\r\nSUB large.msg 2\r\nPING\r\n"));
            await ReadUntilAsync(sub2, "PONG");

            using var pub = await ConnectRawAsync(port);
            await pub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {\"verbose\":false}\r\n"));
            await pub.SendAsync(Encoding.ASCII.GetBytes($"PUB large.msg {payloadSize}\r\n{payload}\r\nPING\r\n"));
            // Use a longer timeout for large message delivery
            await ReadUntilAsync(pub, "PONG", 10000);

            var r1 = await ReadUntilAsync(sub1, "MSG large.msg", 10000);
            var r2 = await ReadUntilAsync(sub2, "MSG large.msg", 10000);

            r1.ShouldContain("MSG large.msg");
            r2.ShouldContain("MSG large.msg");
        }
        finally
        {
            await cts.CancelAsync();
            server.Dispose();
        }
    }

    // ---------------------------------------------------------------
    // Go: TestNoRaceSubscribeUnsubscribeResubscribeCycle norace_1_test.go
    // ---------------------------------------------------------------

    [Fact]
    [Trait("Category", "Stress")]
    public async Task Subscribe_unsubscribe_resubscribe_cycle_100_times_without_error()
    {
        var port = GetFreePort();
        using var cts = new CancellationTokenSource();
        var server = new NatsServer(
            new NatsOptions { Port = port },
            NullLoggerFactory.Instance);
        _ = server.StartAsync(cts.Token);
        await server.WaitForReadyAsync();

        try
        {
            using var client = await ConnectRawAsync(port);
            await client.SendAsync(Encoding.ASCII.GetBytes("CONNECT {\"verbose\":false}\r\n"));

            // 100 SUB/UNSUB pairs; the final PING proves the server processed them all.
            for (var i = 1; i <= 100; i++)
            {
                await client.SendAsync(
                    Encoding.ASCII.GetBytes($"SUB resub.cycle {i}\r\nUNSUB {i}\r\n"));
            }

            await client.SendAsync(Encoding.ASCII.GetBytes("PING\r\n"));
            var resp = await ReadUntilAsync(client, "PONG", 5000);
            resp.ShouldContain("PONG");
        }
        finally
        {
            await cts.CancelAsync();
            server.Dispose();
        }
    }

    // ---------------------------------------------------------------
    // Go: TestNoRaceSubscriberReceivesAfterPause norace_1_test.go
    // ---------------------------------------------------------------

    [Fact]
    [Trait("Category", "Stress")]
    public async Task Subscriber_receives_messages_correctly_after_brief_pause()
    {
        var port = GetFreePort();
        using var cts = new CancellationTokenSource();
        var server = new NatsServer(
            new NatsOptions { Port = port },
            NullLoggerFactory.Instance);
        _ = server.StartAsync(cts.Token);
        await server.WaitForReadyAsync();

        try
        {
            using var sub = await ConnectRawAsync(port);
            await sub.SendAsync(
                Encoding.ASCII.GetBytes("CONNECT {\"verbose\":false}\r\nSUB pause.sub 1\r\nPING\r\n"));
            await ReadUntilAsync(sub, "PONG");

            // Brief pause simulating a subscriber that drifts slightly
            await Task.Delay(100);

            using var pub = await ConnectRawAsync(port);
            await pub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {\"verbose\":false}\r\n"));
            await pub.SendAsync(Encoding.ASCII.GetBytes("PUB pause.sub 5\r\nhello\r\nPING\r\n"));
            await ReadUntilAsync(pub, "PONG", 5000);

            var received = await ReadUntilAsync(sub, "hello", 5000);
            received.ShouldContain("hello");
        }
        finally
        {
            await cts.CancelAsync();
            server.Dispose();
        }
    }

    // ---------------------------------------------------------------
    // Go: TestNoRaceMultipleClientConnectDisconnect norace_1_test.go
    // ---------------------------------------------------------------

    [Fact]
    [Trait("Category", "Stress")]
    public async Task Multiple_client_connections_and_disconnections_leave_server_stable()
    {
        var port = GetFreePort();
        using var cts = new CancellationTokenSource();
        var server = new NatsServer(
            new NatsOptions { Port = port },
            NullLoggerFactory.Instance);
        _ = server.StartAsync(cts.Token);
        await server.WaitForReadyAsync();

        try
        {
            // Connect and disconnect 20 clients sequentially to avoid hammering the port
            for (var i = 0; i < 20; i++)
            {
                using var sock = await ConnectRawAsync(port);
                await sock.SendAsync(
                    Encoding.ASCII.GetBytes("CONNECT {\"verbose\":false}\r\nPING\r\n"));
                await ReadUntilAsync(sock, "PONG", 3000);
                sock.Close();
            }

            // Brief settle time
            await Task.Delay(200);

            // Server should still accept new connections
            using var final = await ConnectRawAsync(port);
            await final.SendAsync(Encoding.ASCII.GetBytes("CONNECT {\"verbose\":false}\r\nPING\r\n"));
            var resp = await ReadUntilAsync(final, "PONG", 3000);
            resp.ShouldContain("PONG");
        }
        finally
        {
            await cts.CancelAsync();
            server.Dispose();
        }
    }

    // ---------------------------------------------------------------
    // Go: TestNoRaceStatsCountersUnderLoad norace_1_test.go
    // ---------------------------------------------------------------

    [Fact]
    [Trait("Category", "Stress")]
    public async Task Stats_in_and_out_bytes_increment_correctly_under_load()
    {
        var port = GetFreePort();
        using var cts = new CancellationTokenSource();
        var server = new NatsServer(
            new NatsOptions { Port = port },
            NullLoggerFactory.Instance);
        _ = server.StartAsync(cts.Token);
        await server.WaitForReadyAsync();

        try
        {
            using var sub = await ConnectRawAsync(port);
            await sub.SendAsync(
                Encoding.ASCII.GetBytes("CONNECT {\"verbose\":false}\r\nSUB stats.load 1\r\nPING\r\n"));
            await ReadUntilAsync(sub, "PONG");

            using var pub = await ConnectRawAsync(port);
            await pub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {\"verbose\":false}\r\n"));

            var sb = new StringBuilder();
            for (var i = 0; i < 50; i++)
                sb.Append("PUB stats.load 10\r\n0123456789\r\n");
            sb.Append("PING\r\n");
            await pub.SendAsync(Encoding.ASCII.GetBytes(sb.ToString()));
            await ReadUntilAsync(pub, "PONG", 5000);

            await Task.Delay(200);

            var stats = server.Stats;
            Interlocked.Read(ref stats.InMsgs).ShouldBeGreaterThan(0);
            Interlocked.Read(ref stats.OutMsgs).ShouldBeGreaterThan(0);
        }
        finally
        {
            await cts.CancelAsync();
            server.Dispose();
        }
    }

    // ---------------------------------------------------------------
    // Go: TestNoRaceRapidConnectDisconnect norace_1_test.go
    // ---------------------------------------------------------------

    [Fact]
    [Trait("Category", "Stress")]
    public async Task Rapid_connect_disconnect_cycles_do_not_corrupt_server_state()
    {
        var port = GetFreePort();
        using var cts = new CancellationTokenSource();
        var server = new NatsServer(
            new NatsOptions { Port = port },
            NullLoggerFactory.Instance);
        _ = server.StartAsync(cts.Token);
        await server.WaitForReadyAsync();

        try
        {
            // 30 rapid sequential connect + disconnect cycles
            for (var i = 0; i < 30; i++)
            {
                var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                await sock.ConnectAsync(IPAddress.Loopback, port);
                // Drain INFO
                var buf = new byte[512];
                await sock.ReceiveAsync(buf, SocketFlags.None);
                // Immediately close — simulates a client that disconnects without CONNECT
                sock.Close();
                sock.Dispose();
            }

            await Task.Delay(300);

            // Server should still respond
            using var healthy = await ConnectRawAsync(port);
            await healthy.SendAsync(Encoding.ASCII.GetBytes("CONNECT {\"verbose\":false}\r\nPING\r\n"));
            var resp = await ReadUntilAsync(healthy, "PONG", 3000);
            resp.ShouldContain("PONG");
        }
        finally
        {
            await cts.CancelAsync();
            server.Dispose();
        }
    }

    // ---------------------------------------------------------------
    // Go: TestNoRacePublishWithCancellation norace_1_test.go
    // ---------------------------------------------------------------

    [Fact]
    [Trait("Category", "Stress")]
    public async Task Server_accepts_connection_after_cancelled_client_task()
    {
        var port = GetFreePort();
        using var serverCts = new CancellationTokenSource();
        var server = new NatsServer(
            new NatsOptions { Port = port },
            NullLoggerFactory.Instance);
        _ = server.StartAsync(serverCts.Token);
        await server.WaitForReadyAsync();

        try
        {
            using var clientCts = new CancellationTokenSource(TimeSpan.FromMilliseconds(50));

            // Attempt a receive with a very short timeout — the token will cancel the read
            // but the server should not be destabilised by the abrupt disconnect.
            try
            {
                using var sock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                await sock.ConnectAsync(IPAddress.Loopback, port);
                var buf = new byte[512];
                await sock.ReceiveAsync(buf, SocketFlags.None, clientCts.Token);
            }
            catch (OperationCanceledException)
            {
                // Expected
            }

            await Task.Delay(200);

            // Server should still function
            using var good = await ConnectRawAsync(port);
            await good.SendAsync(Encoding.ASCII.GetBytes("CONNECT {\"verbose\":false}\r\nPING\r\n"));
            var resp = await ReadUntilAsync(good, "PONG", 3000);
            resp.ShouldContain("PONG");
        }
        finally
        {
            await serverCts.CancelAsync();
            server.Dispose();
        }
    }

    // ---------------------------------------------------------------
    // Go: TestNoRaceSlowConsumerClientCountDrops norace_1_test.go
    // ---------------------------------------------------------------

    [Fact]
    [Trait("Category", "Stress")]
    public async Task Slow_consumer_is_removed_from_client_count_after_detection()
    {
        const long maxPending = 512;
        const int payloadSize = 256;
        const int floodCount = 20;

        var port = GetFreePort();
        using var cts = new CancellationTokenSource();
        var server = new NatsServer(
            new NatsOptions { Port = port, MaxPending = maxPending },
            NullLoggerFactory.Instance);
        _ = server.StartAsync(cts.Token);
        await server.WaitForReadyAsync();

        try
        {
            using var slowSub = await ConnectRawAsync(port);
            await slowSub.SendAsync(
                Encoding.ASCII.GetBytes("CONNECT {\"verbose\":false}\r\nSUB drop.test 1\r\nPING\r\n"));
            await ReadUntilAsync(slowSub, "PONG");

            using var pub = await ConnectRawAsync(port);
            await pub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {\"verbose\":false}\r\n"));

            var payload = new string('D', payloadSize);
            var sb = new StringBuilder();
            for (var i = 0; i < floodCount; i++)
                sb.Append($"PUB drop.test {payloadSize}\r\n{payload}\r\n");
            sb.Append("PING\r\n");
            await pub.SendAsync(Encoding.ASCII.GetBytes(sb.ToString()));
            await ReadUntilAsync(pub, "PONG", 5000);

            await Task.Delay(600);

            // Publisher is still alive; slow subscriber has been dropped
            server.ClientCount.ShouldBeLessThanOrEqualTo(2);
        }
        finally
        {
            await cts.CancelAsync();
            server.Dispose();
        }
    }

    // ---------------------------------------------------------------
    // Go: TestNoRaceSubjectMatchingUnderConcurrentConnections norace_1_test.go
    // ---------------------------------------------------------------

    [Fact]
    [Trait("Category", "Stress")]
    public async Task Server_delivers_to_correct_subscriber_when_multiple_subjects_active()
    {
        var port = GetFreePort();
        using var cts = new CancellationTokenSource();
        var server = new NatsServer(
            new NatsOptions { Port = port },
            NullLoggerFactory.Instance);
        _ = server.StartAsync(cts.Token);
        await server.WaitForReadyAsync();

        try
        {
            using var sub1 = await ConnectRawAsync(port);
            using var sub2 = await ConnectRawAsync(port);

            await sub1.SendAsync(
                Encoding.ASCII.GetBytes("CONNECT {\"verbose\":false}\r\nSUB target.A 1\r\nPING\r\n"));
            await ReadUntilAsync(sub1, "PONG");

            await sub2.SendAsync(
                Encoding.ASCII.GetBytes("CONNECT {\"verbose\":false}\r\nSUB target.B 1\r\nPING\r\n"));
            await ReadUntilAsync(sub2, "PONG");

            using var pub = await ConnectRawAsync(port);
            await pub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {\"verbose\":false}\r\n"));
            await pub.SendAsync(Encoding.ASCII.GetBytes("PUB target.A 5\r\nhello\r\nPING\r\n"));
            await ReadUntilAsync(pub, "PONG", 5000);

            var r1 = await ReadUntilAsync(sub1, "hello", 3000);
            r1.ShouldContain("MSG target.A");

            // sub2 should NOT have received the target.A message; use a short blocking
            // receive and treat a timeout (SocketException) as "nothing delivered".
            sub2.ReceiveTimeout = 200;
            var buf = new byte[512];
            var n = 0;
            try { n = sub2.Receive(buf); } catch (SocketException) { }
            var s2Data = Encoding.ASCII.GetString(buf, 0, n);
            s2Data.ShouldNotContain("target.A");
        }
        finally
        {
            await cts.CancelAsync();
            server.Dispose();
        }
    }

    // ---------------------------------------------------------------
    // Go: TestNoRaceServerRejectsPayloadOverLimit norace_1_test.go
    // ---------------------------------------------------------------

    [Fact]
    [Trait("Category", "Stress")]
    public async Task Server_remains_stable_after_processing_many_medium_sized_messages()
    {
        var port = GetFreePort();
        using var cts = new CancellationTokenSource();
        var server = new NatsServer(
            new NatsOptions { Port = port },
            NullLoggerFactory.Instance);
        _ = server.StartAsync(cts.Token);
        await server.WaitForReadyAsync();

        try
        {
            using var sub = await ConnectRawAsync(port);
            await sub.SendAsync(
                Encoding.ASCII.GetBytes("CONNECT {\"verbose\":false}\r\nSUB medium.msgs 1\r\nPING\r\n"));
            await ReadUntilAsync(sub, "PONG");

            using var pub = await ConnectRawAsync(port);
            await pub.SendAsync(Encoding.ASCII.GetBytes("CONNECT {\"verbose\":false}\r\n"));

            var payload = new string('M', 1024); // 1 KB each
            var sb = new StringBuilder();
            for (var i = 0; i < 200; i++)
                sb.Append($"PUB medium.msgs 1024\r\n{payload}\r\n");
            sb.Append("PING\r\n");

            await pub.SendAsync(Encoding.ASCII.GetBytes(sb.ToString()));
            await ReadUntilAsync(pub, "PONG", 10000);

            var stats = server.Stats;
            Interlocked.Read(ref stats.InMsgs).ShouldBeGreaterThanOrEqualTo(200);
        }
        finally
        {
            await cts.CancelAsync();
            server.Dispose();
        }
    }
}