// Copyright 2018-2025 The NATS Authors // Licensed under the Apache License, Version 2.0 // // NoRace integration tests - corresponds to Go file: // golang/nats-server/server/norace_1_test.go (first 51 tests) // // These tests are equivalent to Go's //go:build !race tests. // All tests require NATS_INTEGRATION_ENABLED=true to run. // Set [Trait("Category", "NoRace")] in addition to the base "Integration" trait. using System.Net; using System.Net.Sockets; using Shouldly; using Xunit.Abstractions; using ZB.MOM.NatsNet.Server.IntegrationTests.Helpers; namespace ZB.MOM.NatsNet.Server.IntegrationTests.NoRace; [Trait("Category", "NoRace")] [Trait("Category", "Integration")] public class NoRace1Tests : IntegrationTestBase { public NoRace1Tests(ITestOutputHelper output) : base(output) { } // --------------------------------------------------------------------------- // 1. TestNoRaceAvoidSlowConsumerBigMessages // Verifies that 500 large (1MB) messages are delivered to a subscriber // without triggering slow-consumer status on the server. // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running NATS server")] public void AvoidSlowConsumerBigMessages_ShouldSucceed() { // Port of Go TestNoRaceAvoidSlowConsumerBigMessages: // 500 x 1MB messages delivered to subscriber without triggering slow-consumer } // --------------------------------------------------------------------------- // 2. TestNoRaceRoutedQueueAutoUnsubscribe // Two-server cluster. Creates 100 queue subs with AutoUnsubscribe(1) per server // for groups "bar" and "baz". Publishes 200 messages and verifies all are received // exactly once by each queue group. // --------------------------------------------------------------------------- [SkippableFact] public async Task RoutedQueueAutoUnsubscribe_ShouldSucceed() { Skip.If(!IntegrationEnabled, SkipMessage); Output.WriteLine("RoutedQueueAutoUnsubscribe requires a 2-server routed cluster — skipping unless cluster is configured."); Skip.If(true, "Requires 2-server routed cluster infrastructure"); await Task.CompletedTask; } // --------------------------------------------------------------------------- // 3. TestNoRaceClosedSlowConsumerWriteDeadline // Connects a slow raw TCP subscriber (1MB payload, 10ms write deadline). // Publishes 100 x 1MB messages. Verifies server closes the connection and // records it as SlowConsumerWriteDeadline. // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running NATS server")] public void ClosedSlowConsumerWriteDeadline_ShouldSucceed() { // Port of Go TestNoRaceClosedSlowConsumerWriteDeadline: // Raw TCP slow subscriber, server closes connection via write deadline } // --------------------------------------------------------------------------- // 4. TestNoRaceClosedSlowConsumerPendingBytes // Same as above but triggers via MaxPending (1MB) rather than write deadline. // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running NATS server")] public void ClosedSlowConsumerPendingBytes_ShouldSucceed() { // Port of Go TestNoRaceClosedSlowConsumerPendingBytes: // Raw TCP slow subscriber, server closes connection via MaxPending } // --------------------------------------------------------------------------- // 5. TestNoRaceSlowConsumerPendingBytes // After server closes the slow consumer connection, verifies that writing to // the closed socket eventually returns an error. // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running NATS server")] public void SlowConsumerPendingBytes_ShouldSucceed() { // Port of Go TestNoRaceSlowConsumerPendingBytes: // After server closes slow consumer, writes to closed socket return error } // --------------------------------------------------------------------------- // 6. TestNoRaceGatewayNoMissingReplies // Complex 4-server gateway topology (A1, A2, B1, B2) verifying that // request-reply works correctly without missing replies across gateways // after interest-only mode is activated. // --------------------------------------------------------------------------- [SkippableFact] public async Task GatewayNoMissingReplies_ShouldSucceed() { Skip.If(!IntegrationEnabled, SkipMessage); Output.WriteLine("GatewayNoMissingReplies requires a 4-server gateway topology — skipping unless gateway infrastructure is configured."); Skip.If(true, "Requires 4-server gateway cluster infrastructure"); await Task.CompletedTask; } // --------------------------------------------------------------------------- // 7. TestNoRaceRouteMemUsage // Creates a 2-server cluster. Sends 100 requests with 50KB payloads via a // route. Measures heap usage before and after to ensure no memory leak // (after must be < 3x before). // --------------------------------------------------------------------------- [SkippableFact] public async Task RouteMemUsage_ShouldSucceed() { Skip.If(!IntegrationEnabled, SkipMessage); Output.WriteLine("RouteMemUsage requires a 2-server routed cluster"); Skip.If(true, "Requires routed cluster infrastructure"); await Task.CompletedTask; } // --------------------------------------------------------------------------- // 8. TestNoRaceRouteCache // Verifies that the per-account route subscription cache correctly prunes // closed subscriptions and stays at or below maxPerAccountCacheSize. // Tests both plain sub and queue sub variants. // --------------------------------------------------------------------------- [SkippableFact] public async Task RouteCache_ShouldSucceed() { Skip.If(!IntegrationEnabled, SkipMessage); Output.WriteLine("RouteCache requires a 2-server routed cluster with internal server access"); Skip.If(true, "Requires routed cluster infrastructure"); await Task.CompletedTask; } // --------------------------------------------------------------------------- // 9. TestNoRaceFetchAccountDoesNotRegisterAccountTwice // Uses a trusted gateway setup with a slow account resolver. Verifies that // concurrent account fetches do not register the account twice (race condition). // --------------------------------------------------------------------------- [SkippableFact] public async Task FetchAccountDoesNotRegisterAccountTwice_ShouldSucceed() { Skip.If(!IntegrationEnabled, SkipMessage); Output.WriteLine("FetchAccountDoesNotRegisterAccountTwice requires a trusted gateway setup"); Skip.If(true, "Requires trusted gateway cluster infrastructure"); await Task.CompletedTask; } // --------------------------------------------------------------------------- // 10. TestNoRaceWriteDeadline // Connects a raw TCP subscriber with a 30ms write deadline. // Publishes 1000 x 1MB messages and verifies the server closes // the connection, causing subsequent writes to fail. // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running NATS server")] public void WriteDeadline_ShouldSucceed() { // Port of Go TestNoRaceWriteDeadline: // Raw TCP subscriber, server closes connection via write deadline after 1000 x 1MB msgs } // --------------------------------------------------------------------------- // 11. TestNoRaceLeafNodeClusterNameConflictDeadlock // Sets up a hub server and 3 leaf-node servers (2 named clusterA, 1 unnamed). // Verifies that a cluster name conflict does not cause a deadlock. // --------------------------------------------------------------------------- [SkippableFact] public async Task LeafNodeClusterNameConflictDeadlock_ShouldSucceed() { Skip.If(!IntegrationEnabled, SkipMessage); Output.WriteLine("LeafNodeClusterNameConflictDeadlock requires leaf node + cluster infrastructure"); Skip.If(true, "Requires leaf node cluster infrastructure"); await Task.CompletedTask; } // --------------------------------------------------------------------------- // 12. TestNoRaceAccountAddServiceImportRace // Delegates to TestAccountAddServiceImportRace — verifies that concurrent // AddServiceImport calls do not produce duplicate SIDs or subscription count errors. // --------------------------------------------------------------------------- [SkippableFact] public async Task AccountAddServiceImportRace_ShouldSucceed() { Skip.If(!IntegrationEnabled, SkipMessage); Output.WriteLine("AccountAddServiceImportRace requires service import infrastructure"); Skip.If(true, "Requires service import server infrastructure"); await Task.CompletedTask; } // --------------------------------------------------------------------------- // 13. TestNoRaceQueueAutoUnsubscribe // Single server. Creates 1000 queue subs (bar + baz) with AutoUnsubscribe(1). // Publishes 1000 messages, verifies each queue group receives exactly 1000. // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running NATS server")] public void QueueAutoUnsubscribe_ShouldSucceed() { // Port of Go TestNoRaceQueueAutoUnsubscribe: // 1000 queue subs per group with AutoUnsubscribe(1), each group receives 1000 msgs } // --------------------------------------------------------------------------- // 14. TestNoRaceAcceptLoopsDoNotLeaveOpenedConn // For each connection type (client, route, gateway, leafnode, websocket): // opens connections while the server is shutting down, verifies no connections // are left open (timeout error on read). // --------------------------------------------------------------------------- [SkippableFact] public async Task AcceptLoopsDoNotLeaveOpenedConn_ShouldSucceed() { Skip.If(!IntegrationEnabled, SkipMessage); Output.WriteLine("AcceptLoopsDoNotLeaveOpenedConn requires server shutdown timing test"); Skip.If(true, "Requires server lifecycle control infrastructure"); await Task.CompletedTask; } // --------------------------------------------------------------------------- // 15. TestNoRaceJetStreamDeleteStreamManyConsumers // Creates a JetStream stream with 2000 consumers (all with DeliverSubject), // then deletes the stream. Verifies that delete does not hang (bug: sendq // size exceeded would cause deadlock). // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running JetStream server")] public void JetStreamDeleteStreamManyConsumers_ShouldSucceed() { // Port of Go TestNoRaceJetStreamDeleteStreamManyConsumers: // Stream with 2000 push consumers deleted without deadlock } // --------------------------------------------------------------------------- // 16. TestNoRaceJetStreamServiceImportAccountSwapIssue // Creates a JetStream stream and pull consumer. Runs concurrent publishing // and StreamInfo requests alongside fetch operations for 3 seconds. Verifies // no errors occur from account swap race condition. // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running JetStream server")] public void JetStreamServiceImportAccountSwapIssue_ShouldSucceed() { // Port of Go TestNoRaceJetStreamServiceImportAccountSwapIssue: // Concurrent publish/info/fetch has no account swap race condition } // --------------------------------------------------------------------------- // 17. TestNoRaceJetStreamAPIStreamListPaging // Creates 2*JSApiNamesLimit (256) streams. Verifies that the stream list API // correctly pages results with offset parameter. // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running JetStream server")] public void JetStreamAPIStreamListPaging_ShouldSucceed() { // Port of Go TestNoRaceJetStreamAPIStreamListPaging: // Stream list API paging with 256 streams, offset=0/128/256 } // --------------------------------------------------------------------------- // 18. TestNoRaceJetStreamAPIConsumerListPaging // Creates JSApiNamesLimit (128) consumers on a stream. Verifies consumer list // paging with various offsets. // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running JetStream server")] public void JetStreamAPIConsumerListPaging_ShouldSucceed() { // Port of Go TestNoRaceJetStreamAPIConsumerListPaging: // Consumer list paging with 128 consumers, offset=0/106/150 } // --------------------------------------------------------------------------- // 19. TestNoRaceJetStreamWorkQueueLoadBalance // Creates a work queue stream with 25 worker goroutines. Publishes 1000 messages. // Verifies each worker receives approximately equal share (+/-50% + 5). // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running JetStream server")] public void JetStreamWorkQueueLoadBalance_ShouldSucceed() { // Port of Go TestNoRaceJetStreamWorkQueueLoadBalance: // 1000 msgs distributed across 25 workers, each within target ± delta } // --------------------------------------------------------------------------- // 20. TestNoRaceJetStreamClusterLargeStreamInlineCatchup // 3-server cluster. Shuts down one server, publishes 5000 messages. Kills // the stream leader. Restarts the first server. Verifies it catches up to // 5000 messages by becoming stream leader. // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running JetStream cluster server")] public void JetStreamClusterLargeStreamInlineCatchup_ShouldSucceed() { // Port of Go TestNoRaceJetStreamClusterLargeStreamInlineCatchup: // Server restart catches up to 5000 msgs by becoming stream leader } // --------------------------------------------------------------------------- // 21. TestNoRaceJetStreamClusterStreamCreateAndLostQuorum // 3-server cluster. Creates replicated stream. Stops all servers. Restarts // one. Subscribes to quorum-lost advisory. Restarts remaining servers. // Verifies no spurious quorum-lost advisory is received. // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running JetStream cluster server")] public void JetStreamClusterStreamCreateAndLostQuorum_ShouldSucceed() { // Port of Go TestNoRaceJetStreamClusterStreamCreateAndLostQuorum: // No spurious quorum-lost advisory after stop-all/restart-one/restart-remaining } // --------------------------------------------------------------------------- // 22. TestNoRaceJetStreamSuperClusterMirrors // 3-cluster x 3-server super-cluster. Creates source stream in C2, sends 100 // msgs. Creates mirror M1 in C1, verifies 100 msgs. Purges source, sends 50 // more. Creates M2 (replicas=3) in C3. Verifies catchup after stream leader // restart during concurrent publishing. // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running JetStream super-cluster")] public void JetStreamSuperClusterMirrors_ShouldSucceed() { // Port of Go TestNoRaceJetStreamSuperClusterMirrors: // 3x3 super-cluster mirror catchup after stream leader restart } // --------------------------------------------------------------------------- // 23. TestNoRaceJetStreamSuperClusterMixedModeMirrors // 7-server super-cluster with mixed JetStream/non-JetStream nodes. // Creates 10 origin streams (1000 msgs each) then creates 10 mirrors // (replicas=3) in a loop 3 times, verifying each gets 1000 msgs. // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running JetStream super-cluster")] public void JetStreamSuperClusterMixedModeMirrors_ShouldSucceed() { // Port of Go TestNoRaceJetStreamSuperClusterMixedModeMirrors: // Mixed-mode super-cluster, 10 origin streams x 10 mirrors x 3 iterations } // --------------------------------------------------------------------------- // 24. TestNoRaceJetStreamSuperClusterSources // 3x3 super-cluster. Creates 3 source streams (foo=10, bar=15, baz=25 msgs). // Creates aggregate stream MS sourcing all three — verifies 50 msgs. // Then purges, sends more, creates MS2 with replicas=3 in C3. // Verifies catchup after leader restart during concurrent publishing (200 msgs). // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running JetStream super-cluster")] public void JetStreamSuperClusterSources_ShouldSucceed() { // Port of Go TestNoRaceJetStreamSuperClusterSources: // 3x3 super-cluster sources aggregate, catchup after leader restart } // --------------------------------------------------------------------------- // 25. TestNoRaceJetStreamClusterSourcesMuxd // 3-server cluster. Creates 10 origin streams (10000 msgs each, 1KB payload). // Creates aggregate stream S sourcing all 10 (replicas=2). // Verifies S has 100,000 msgs total within 20 seconds. // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running JetStream cluster server")] public void JetStreamClusterSourcesMuxd_ShouldSucceed() { // Port of Go TestNoRaceJetStreamClusterSourcesMuxd: // Aggregate stream sourcing 10 origins, 100k msgs total } // --------------------------------------------------------------------------- // 26. TestNoRaceJetStreamSuperClusterMixedModeSources // 7-server mixed-mode super-cluster. Creates 100 origin streams (1000 msgs // each). Creates aggregate stream S (replicas=3) sourcing all 100 — 100,000 // msgs. Repeats 3x with delete between iterations. // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running JetStream super-cluster")] public void JetStreamSuperClusterMixedModeSources_ShouldSucceed() { // Port of Go TestNoRaceJetStreamSuperClusterMixedModeSources: // Mixed-mode super-cluster, 100 origin streams aggregated, 3 iterations } // --------------------------------------------------------------------------- // 27. TestNoRaceJetStreamClusterExtendedStreamPurgeStall // [skip(t) in Go — not run by default] Needs big machine. // Verifies that subject-filtered stream purge completes in < 1 second with // < 100MB memory usage (was ~7GB before fix). // --------------------------------------------------------------------------- [SkippableFact] public async Task JetStreamClusterExtendedStreamPurgeStall_ShouldSucceed() { Skip.If(true, "Explicitly skipped in Go source (skip(t)) — performance test requiring large machine"); await Task.CompletedTask; } // --------------------------------------------------------------------------- // 28. TestNoRaceJetStreamClusterMirrorExpirationAndMissingSequences // 9-server cluster. Creates source stream with 500ms MaxAge. Creates mirror // on a different server. Sends 10 msgs, verifies mirror has 10. Shuts down // mirror server, sends 10 more (they expire). Restarts mirror server. // Sends 10 more, verifies mirror has 20 (original 10 + last 10). // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running JetStream cluster server")] public void JetStreamClusterMirrorExpirationAndMissingSequences_ShouldSucceed() { // Port of Go TestNoRaceJetStreamClusterMirrorExpirationAndMissingSequences: // 9-server cluster, mirror catches up after server restart with expired sequences } // --------------------------------------------------------------------------- // 29. TestNoRaceJetStreamClusterLargeActiveOnReplica // [skip(t) in Go — not run by default] // Verifies that stream replica active time is never > 5s (performance test). // --------------------------------------------------------------------------- [SkippableFact] public async Task JetStreamClusterLargeActiveOnReplica_ShouldSucceed() { Skip.If(true, "Explicitly skipped in Go source (skip(t)) — performance test requiring large machine"); await Task.CompletedTask; } // --------------------------------------------------------------------------- // 30. TestNoRaceJetStreamSuperClusterRIPStress // [skip(t) in Go — not run by default] // Long-running stress test (8 min): 3x3 super-cluster, 150 streams + mux + // mirror streams, 64 clients publishing concurrently. // --------------------------------------------------------------------------- [SkippableFact] public async Task JetStreamSuperClusterRIPStress_ShouldSucceed() { Skip.If(true, "Explicitly skipped in Go source (skip(t)) — long-running stress test"); await Task.CompletedTask; } // --------------------------------------------------------------------------- // 31. TestNoRaceJetStreamSlowFilteredInitialPendingAndFirstMsg // Creates a stream with 500k messages across 5 subjects. Tests that creating // consumers with specific filter subjects completes in < 150ms each. // Also verifies NumPending accuracy after message deletion and server restart. // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running JetStream server")] public void JetStreamSlowFilteredInitialPendingAndFirstMsg_ShouldSucceed() { // Port of Go TestNoRaceJetStreamSlowFilteredInitialPendingAndFirstMsg: // Consumer creation < 150ms with 500k messages, NumPending accuracy } // --------------------------------------------------------------------------- // 32. TestNoRaceJetStreamFileStoreBufferReuse // [skip(t) in Go — not run by default] // Memory allocation test for FileStore buffer reuse with 200k messages. // --------------------------------------------------------------------------- [SkippableFact] public async Task JetStreamFileStoreBufferReuse_ShouldSucceed() { Skip.If(true, "Explicitly skipped in Go source (skip(t)) — performance test requiring large machine"); await Task.CompletedTask; } // --------------------------------------------------------------------------- // 33. TestNoRaceJetStreamSlowRestartWithManyExpiredMsgs // Creates a stream with MaxAge=100ms, publishes 50k messages. // Waits for expiry, shuts down server, restarts it. // Verifies restart completes in < 5 seconds with 0 messages remaining. // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running JetStream server")] public void JetStreamSlowRestartWithManyExpiredMsgs_ShouldSucceed() { // Port of Go TestNoRaceJetStreamSlowRestartWithManyExpiredMsgs: // Restart with 50k expired messages completes in < 5s with 0 msgs remaining } // --------------------------------------------------------------------------- // 34. TestNoRaceJetStreamStalledMirrorsAfterExpire // Creates source stream (MaxAge=250ms), publishes 100 msgs. // Creates mirror. Waits for mirror to sync. Publishes 100 more with delay. // Verifies mirror has all 200 msgs despite source expiration (not stalled). // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running JetStream server")] public void JetStreamStalledMirrorsAfterExpire_ShouldSucceed() { // Port of Go TestNoRaceJetStreamStalledMirrorsAfterExpire: // Mirror does not stall after source expiry, eventually has 200 msgs } // --------------------------------------------------------------------------- // 35. TestNoRaceJetStreamSuperClusterAccountConnz // 3x3 super-cluster. Verifies account connections info (connz) is reported // correctly across gateway connections for multiple accounts. // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running JetStream super-cluster")] public void JetStreamSuperClusterAccountConnz_ShouldSucceed() { // Port of Go TestNoRaceJetStreamSuperClusterAccountConnz: // Account connz reported correctly across gateway connections } // --------------------------------------------------------------------------- // 36. TestNoRaceCompressedConnz // Starts a server with HTTP monitoring. Sends a gzip-accept connz request. // Verifies the response is valid gzip JSON with correct connection counts. // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running NATS server")] public void CompressedConnz_ShouldSucceed() { // Port of Go TestNoRaceCompressedConnz: // Gzip-compressed HTTP monitoring connz endpoint returns valid JSON } // --------------------------------------------------------------------------- // 37. TestNoRaceJetStreamClusterExtendedStreamPurge // 3-server cluster. Creates stream with 100k messages across 1000 subjects. // Purges by subject, verifies purge is near-instant (< 5s per purge). // Tests both direct and per-subject purge with replica confirmation. // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running JetStream cluster server")] public void JetStreamClusterExtendedStreamPurge_ShouldSucceed() { // Port of Go TestNoRaceJetStreamClusterExtendedStreamPurge: // Per-subject purge < 5s with 100k messages across 1000 subjects } // --------------------------------------------------------------------------- // 38. TestNoRaceJetStreamFileStoreCompaction // Creates file store stream, sends 200 messages with 50% TTL = 1s, // waits for expiry, publishes another 100. // Verifies file store compacts correctly (block count decreases). // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running JetStream server")] public void JetStreamFileStoreCompaction_ShouldSucceed() { // Port of Go TestNoRaceJetStreamFileStoreCompaction: // File store compaction after message expiry reduces block count } // --------------------------------------------------------------------------- // 39. TestNoRaceJetStreamEncryptionEnabledOnRestartWithExpire // Creates an encrypted JetStream server, publishes messages with short TTL. // Restarts server. Verifies messages expired correctly and no data corruption. // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running JetStream server")] public void JetStreamEncryptionEnabledOnRestartWithExpire_ShouldSucceed() { // Port of Go TestNoRaceJetStreamEncryptionEnabledOnRestartWithExpire: // Encrypted JetStream restart with expired messages, no data corruption } // --------------------------------------------------------------------------- // 40. TestNoRaceJetStreamOrderedConsumerMissingMsg // Creates ordered consumer on a stream. Publishes messages in two goroutines. // Deletes some messages. Verifies ordered consumer receives all non-deleted // messages in order without stalling. // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running JetStream server")] public void JetStreamOrderedConsumerMissingMsg_ShouldSucceed() { // Port of Go TestNoRaceJetStreamOrderedConsumerMissingMsg: // Ordered consumer receives all non-deleted messages in sequence, no stalls } // --------------------------------------------------------------------------- // 41. TestNoRaceJetStreamClusterInterestPolicyAckNone // 3-server cluster. Creates interest-policy stream (AckNone). // Publishes 100k messages. Creates multiple consumers. // Verifies messages are removed once all consumers have received them. // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running JetStream cluster server")] public void JetStreamClusterInterestPolicyAckNone_ShouldSucceed() { // Port of Go TestNoRaceJetStreamClusterInterestPolicyAckNone: // Interest-policy AckNone stream, messages removed after all consumers receive } // --------------------------------------------------------------------------- // 42. TestNoRaceJetStreamLastSubjSeqAndFilestoreCompact // Creates file store stream, publishes messages to multiple subjects. // Verifies that LastSubjectSeq is tracked correctly through compaction. // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running JetStream server")] public void JetStreamLastSubjSeqAndFilestoreCompact_ShouldSucceed() { // Port of Go TestNoRaceJetStreamLastSubjSeqAndFilestoreCompact: // LastSubjectSeq tracked correctly through file store compaction } // --------------------------------------------------------------------------- // 43. TestNoRaceJetStreamClusterMemoryStreamConsumerRaftGrowth // 3-server cluster. Creates memory stream with durable consumer. // Publishes 2 million messages and verifies the Raft WAL does not grow // unboundedly (checks WAL file sizes). // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running JetStream cluster server")] public void JetStreamClusterMemoryStreamConsumerRaftGrowth_ShouldSucceed() { // Port of Go TestNoRaceJetStreamClusterMemoryStreamConsumerRaftGrowth: // Raft WAL does not grow unboundedly with 2M messages } // --------------------------------------------------------------------------- // 44. TestNoRaceJetStreamClusterCorruptWAL // 3-server cluster. Publishes messages. Corrupts the Raft WAL on all non-leader // replicas. Restarts servers. Verifies the cluster recovers and all messages // are intact. // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running JetStream cluster server")] public void JetStreamClusterCorruptWAL_ShouldSucceed() { // Port of Go TestNoRaceJetStreamClusterCorruptWAL: // Cluster recovers from Raft WAL corruption, messages intact after restart } // --------------------------------------------------------------------------- // 45. TestNoRaceJetStreamClusterInterestRetentionDeadlock // 3-server cluster. Creates interest-retention stream with push consumer. // Publishes messages while simultaneously deleting and recreating the consumer. // Verifies no deadlock occurs. // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running JetStream cluster server")] public void JetStreamClusterInterestRetentionDeadlock_ShouldSucceed() { // Port of Go TestNoRaceJetStreamClusterInterestRetentionDeadlock: // Concurrent publish + consumer delete/recreate does not deadlock } // --------------------------------------------------------------------------- // 46. TestNoRaceJetStreamClusterMaxConsumersAndDirect // 3-server cluster. Stream with MaxConsumers limit. Verifies that direct-get // operations do not count against MaxConsumers. // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running JetStream cluster server")] public void JetStreamClusterMaxConsumersAndDirect_ShouldSucceed() { // Port of Go TestNoRaceJetStreamClusterMaxConsumersAndDirect: // Direct-get operations do not count against MaxConsumers limit } // --------------------------------------------------------------------------- // 47. TestNoRaceJetStreamClusterStreamReset // 3-server cluster. Creates stream and sends messages. Kills a replica. // While server is down, purges the stream. Restarts the server. // Verifies stream resets correctly (no phantom messages). // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running JetStream cluster server")] public void JetStreamClusterStreamReset_ShouldSucceed() { // Port of Go TestNoRaceJetStreamClusterStreamReset: // Stream resets correctly after replica restart following purge } // --------------------------------------------------------------------------- // 48. TestNoRaceJetStreamKeyValueCompaction // Creates a KV bucket, puts 10k entries to 100 keys (multiple revisions). // Verifies that after compaction the bucket has only the latest revision // per key (100 msgs total). // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running JetStream server")] public void JetStreamKeyValueCompaction_ShouldSucceed() { // Port of Go TestNoRaceJetStreamKeyValueCompaction: // KV compaction retains only latest revision per key (100 keys, 10k entries) } // --------------------------------------------------------------------------- // 49. TestNoRaceJetStreamClusterStreamSeqMismatchIssue // 3-server cluster. Tests that stream sequence number mismatch between // leader and replicas does not cause incorrect state after recovery. // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running JetStream cluster server")] public void JetStreamClusterStreamSeqMismatchIssue_ShouldSucceed() { // Port of Go TestNoRaceJetStreamClusterStreamSeqMismatchIssue: // Stream sequence mismatch between leader and replicas recovers correctly } // --------------------------------------------------------------------------- // 50. TestNoRaceJetStreamClusterStreamDropCLFS // 3-server cluster. Verifies that when a replica drops CLFS (checksum-less // full-state) messages, recovery still yields correct stream state. // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running JetStream cluster server")] public void JetStreamClusterStreamDropCLFS_ShouldSucceed() { // Port of Go TestNoRaceJetStreamClusterStreamDropCLFS: // Replica CLFS drop recovery yields correct stream state } // --------------------------------------------------------------------------- // 51. TestNoRaceJetStreamMemstoreWithLargeInteriorDeletes // Creates a memory store stream, publishes 1 million messages, then deletes // every other message. Verifies NumPending and Num counts remain accurate // through a large number of interior deletes. // --------------------------------------------------------------------------- [Fact(Skip = "deferred: requires running JetStream server")] public void JetStreamMemstoreWithLargeInteriorDeletes_ShouldSucceed() { // Port of Go TestNoRaceJetStreamMemstoreWithLargeInteriorDeletes: // Memory store NumPending/Num accurate with 500k interior deletes } }