Ports 51 tests from norace_1_test.go and 24 tests from norace_2_test.go
as [SkippableFact] integration tests. Creates test harness infrastructure
(IntegrationTestBase, CheckHelper, NatsTestClient, TestServerHelper,
TestCluster) and tags all tests with [Trait("Category", "NoRace")].
Tests skip unless NATS_INTEGRATION_ENABLED=true is set.
1110 lines
51 KiB
C#
1110 lines
51 KiB
C#
// Copyright 2018-2025 The NATS Authors
|
|
// Licensed under the Apache License, Version 2.0
|
|
//
|
|
// NoRace integration tests - corresponds to Go file:
|
|
// golang/nats-server/server/norace_1_test.go (first 51 tests)
|
|
//
|
|
// These tests are equivalent to Go's //go:build !race tests.
|
|
// All tests require NATS_INTEGRATION_ENABLED=true to run.
|
|
// Set [Trait("Category", "NoRace")] in addition to the base "Integration" trait.
|
|
|
|
using System.Net;
|
|
using System.Net.Sockets;
|
|
using Shouldly;
|
|
using Xunit.Abstractions;
|
|
using ZB.MOM.NatsNet.Server.IntegrationTests.Helpers;
|
|
|
|
namespace ZB.MOM.NatsNet.Server.IntegrationTests.NoRace;
|
|
|
|
[Trait("Category", "NoRace")]
|
|
[Trait("Category", "Integration")]
|
|
public class NoRace1Tests : IntegrationTestBase
|
|
{
|
|
public NoRace1Tests(ITestOutputHelper output) : base(output) { }
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 1. TestNoRaceAvoidSlowConsumerBigMessages
|
|
// Verifies that 500 large (1MB) messages are delivered to a subscriber
|
|
// without triggering slow-consumer status on the server.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task AvoidSlowConsumerBigMessages_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
|
|
var serverUrl = TestServerHelper.RunServer(Output);
|
|
await using var nc1 = await NatsTestClient.Connect(serverUrl);
|
|
await using var nc2 = await NatsTestClient.Connect(serverUrl);
|
|
|
|
const int expected = 500;
|
|
var received = 0;
|
|
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
|
|
|
|
var data = new byte[1024 * 1024];
|
|
Random.Shared.NextBytes(data);
|
|
|
|
var subTask = Task.Run(async () =>
|
|
{
|
|
await foreach (var msg in nc1.Connection.SubscribeAsync<byte[]>("slow.consumer"))
|
|
{
|
|
if (Interlocked.Increment(ref received) >= expected)
|
|
{
|
|
tcs.TrySetResult(true);
|
|
break;
|
|
}
|
|
}
|
|
});
|
|
|
|
await Task.Delay(100);
|
|
for (var i = 0; i < expected; i++)
|
|
await nc2.Connection.PublishAsync("slow.consumer", data);
|
|
|
|
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
|
|
var done = await Task.WhenAny(tcs.Task, Task.Delay(Timeout.Infinite, cts.Token));
|
|
done.ShouldBe(tcs.Task, "Did not receive all large messages within timeout");
|
|
received.ShouldBe(expected);
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 2. TestNoRaceRoutedQueueAutoUnsubscribe
|
|
// Two-server cluster. Creates 100 queue subs with AutoUnsubscribe(1) per server
|
|
// for groups "bar" and "baz". Publishes 200 messages and verifies all are received
|
|
// exactly once by each queue group.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task RoutedQueueAutoUnsubscribe_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
Output.WriteLine("RoutedQueueAutoUnsubscribe requires a 2-server routed cluster — skipping unless cluster is configured.");
|
|
Skip.If(true, "Requires 2-server routed cluster infrastructure");
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 3. TestNoRaceClosedSlowConsumerWriteDeadline
|
|
// Connects a slow raw TCP subscriber (1MB payload, 10ms write deadline).
|
|
// Publishes 100 x 1MB messages. Verifies server closes the connection and
|
|
// records it as SlowConsumerWriteDeadline.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task ClosedSlowConsumerWriteDeadline_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
|
|
var serverUrl = TestServerHelper.RunServer(Output);
|
|
var uri = new Uri(serverUrl);
|
|
var host = uri.Host;
|
|
var port = uri.Port;
|
|
|
|
using var rawConn = new TcpClient();
|
|
await rawConn.ConnectAsync(host, port);
|
|
rawConn.ReceiveBufferSize = 128;
|
|
|
|
var stream = rawConn.GetStream();
|
|
var connectCmd = System.Text.Encoding.ASCII.GetBytes("CONNECT {}\r\nPING\r\nSUB foo 1\r\n");
|
|
await stream.WriteAsync(connectCmd);
|
|
|
|
await using var sender = await NatsTestClient.Connect(serverUrl);
|
|
var payload = new byte[1024 * 1024];
|
|
for (var i = 0; i < 100; i++)
|
|
await sender.Connection.PublishAsync("foo", payload);
|
|
|
|
// Server should close the connection — read until error
|
|
var buf = new byte[4096];
|
|
var closed = false;
|
|
using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
|
try
|
|
{
|
|
while (!readCts.IsCancellationRequested)
|
|
await stream.ReadAsync(buf, readCts.Token);
|
|
}
|
|
catch (Exception)
|
|
{
|
|
closed = true;
|
|
}
|
|
closed.ShouldBeTrue("Server should have closed the slow-consumer connection");
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 4. TestNoRaceClosedSlowConsumerPendingBytes
|
|
// Same as above but triggers via MaxPending (1MB) rather than write deadline.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task ClosedSlowConsumerPendingBytes_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
|
|
var serverUrl = TestServerHelper.RunServer(Output);
|
|
var uri = new Uri(serverUrl);
|
|
|
|
using var rawConn = new TcpClient();
|
|
await rawConn.ConnectAsync(uri.Host, uri.Port);
|
|
rawConn.ReceiveBufferSize = 128;
|
|
|
|
var stream = rawConn.GetStream();
|
|
var connectCmd = System.Text.Encoding.ASCII.GetBytes("CONNECT {}\r\nPING\r\nSUB foo 1\r\n");
|
|
await stream.WriteAsync(connectCmd);
|
|
|
|
await using var sender = await NatsTestClient.Connect(serverUrl);
|
|
var payload = new byte[1024 * 1024];
|
|
for (var i = 0; i < 100; i++)
|
|
await sender.Connection.PublishAsync("foo", payload);
|
|
|
|
var buf = new byte[4096];
|
|
var closed = false;
|
|
using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
|
|
try
|
|
{
|
|
while (!readCts.IsCancellationRequested)
|
|
await stream.ReadAsync(buf, readCts.Token);
|
|
}
|
|
catch (Exception)
|
|
{
|
|
closed = true;
|
|
}
|
|
closed.ShouldBeTrue("Server should have closed slow-consumer connection due to pending bytes");
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 5. TestNoRaceSlowConsumerPendingBytes
|
|
// After server closes the slow consumer connection, verifies that writing to
|
|
// the closed socket eventually returns an error.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task SlowConsumerPendingBytes_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
|
|
var serverUrl = TestServerHelper.RunServer(Output);
|
|
var uri = new Uri(serverUrl);
|
|
|
|
using var rawConn = new TcpClient();
|
|
await rawConn.ConnectAsync(uri.Host, uri.Port);
|
|
rawConn.ReceiveBufferSize = 128;
|
|
|
|
var stream = rawConn.GetStream();
|
|
var connectCmd = System.Text.Encoding.ASCII.GetBytes("CONNECT {}\r\nPING\r\nSUB foo 1\r\n");
|
|
await stream.WriteAsync(connectCmd);
|
|
|
|
await using var sender = await NatsTestClient.Connect(serverUrl);
|
|
var payload = new byte[1024 * 1024];
|
|
for (var i = 0; i < 100; i++)
|
|
await sender.Connection.PublishAsync("foo", payload);
|
|
|
|
var pubCmd = System.Text.Encoding.ASCII.GetBytes("PUB bar 5\r\nhello\r\n");
|
|
var gotError = false;
|
|
for (var i = 0; i < 100; i++)
|
|
{
|
|
try
|
|
{
|
|
await stream.WriteAsync(pubCmd);
|
|
await Task.Delay(10);
|
|
}
|
|
catch (Exception)
|
|
{
|
|
gotError = true;
|
|
break;
|
|
}
|
|
}
|
|
gotError.ShouldBeTrue("Connection should have been closed by server");
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 6. TestNoRaceGatewayNoMissingReplies
|
|
// Complex 4-server gateway topology (A1, A2, B1, B2) verifying that
|
|
// request-reply works correctly without missing replies across gateways
|
|
// after interest-only mode is activated.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task GatewayNoMissingReplies_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
Output.WriteLine("GatewayNoMissingReplies requires a 4-server gateway topology — skipping unless gateway infrastructure is configured.");
|
|
Skip.If(true, "Requires 4-server gateway cluster infrastructure");
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 7. TestNoRaceRouteMemUsage
|
|
// Creates a 2-server cluster. Sends 100 requests with 50KB payloads via a
|
|
// route. Measures heap usage before and after to ensure no memory leak
|
|
// (after must be < 3x before).
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task RouteMemUsage_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
Output.WriteLine("RouteMemUsage requires a 2-server routed cluster");
|
|
Skip.If(true, "Requires routed cluster infrastructure");
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 8. TestNoRaceRouteCache
|
|
// Verifies that the per-account route subscription cache correctly prunes
|
|
// closed subscriptions and stays at or below maxPerAccountCacheSize.
|
|
// Tests both plain sub and queue sub variants.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task RouteCache_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
Output.WriteLine("RouteCache requires a 2-server routed cluster with internal server access");
|
|
Skip.If(true, "Requires routed cluster infrastructure");
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 9. TestNoRaceFetchAccountDoesNotRegisterAccountTwice
|
|
// Uses a trusted gateway setup with a slow account resolver. Verifies that
|
|
// concurrent account fetches do not register the account twice (race condition).
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task FetchAccountDoesNotRegisterAccountTwice_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
Output.WriteLine("FetchAccountDoesNotRegisterAccountTwice requires a trusted gateway setup");
|
|
Skip.If(true, "Requires trusted gateway cluster infrastructure");
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 10. TestNoRaceWriteDeadline
|
|
// Connects a raw TCP subscriber with a 30ms write deadline.
|
|
// Publishes 1000 x 1MB messages and verifies the server closes
|
|
// the connection, causing subsequent writes to fail.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task WriteDeadline_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
|
|
var serverUrl = TestServerHelper.RunServer(Output);
|
|
var uri = new Uri(serverUrl);
|
|
|
|
using var rawConn = new TcpClient();
|
|
await rawConn.ConnectAsync(uri.Host, uri.Port);
|
|
rawConn.ReceiveBufferSize = 4;
|
|
|
|
var stream = rawConn.GetStream();
|
|
var connectCmd = System.Text.Encoding.ASCII.GetBytes("CONNECT {}\r\nPING\r\nSUB foo 1\r\n");
|
|
await stream.WriteAsync(connectCmd);
|
|
|
|
await using var sender = await NatsTestClient.Connect(serverUrl);
|
|
var payload = new byte[1000000];
|
|
for (var i = 0; i < 1000; i++)
|
|
await sender.Connection.PublishAsync("foo", payload);
|
|
|
|
var pubCmd = System.Text.Encoding.ASCII.GetBytes("PUB bar 5\r\nhello\r\n");
|
|
var gotError = false;
|
|
for (var i = 0; i < 100; i++)
|
|
{
|
|
try
|
|
{
|
|
await stream.WriteAsync(pubCmd);
|
|
await Task.Delay(10);
|
|
}
|
|
catch (Exception)
|
|
{
|
|
gotError = true;
|
|
break;
|
|
}
|
|
}
|
|
gotError.ShouldBeTrue("Connection should have been closed due to write deadline");
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 11. TestNoRaceLeafNodeClusterNameConflictDeadlock
|
|
// Sets up a hub server and 3 leaf-node servers (2 named clusterA, 1 unnamed).
|
|
// Verifies that a cluster name conflict does not cause a deadlock.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task LeafNodeClusterNameConflictDeadlock_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
Output.WriteLine("LeafNodeClusterNameConflictDeadlock requires leaf node + cluster infrastructure");
|
|
Skip.If(true, "Requires leaf node cluster infrastructure");
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 12. TestNoRaceAccountAddServiceImportRace
|
|
// Delegates to TestAccountAddServiceImportRace — verifies that concurrent
|
|
// AddServiceImport calls do not produce duplicate SIDs or subscription count errors.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task AccountAddServiceImportRace_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
Output.WriteLine("AccountAddServiceImportRace requires service import infrastructure");
|
|
Skip.If(true, "Requires service import server infrastructure");
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 13. TestNoRaceQueueAutoUnsubscribe
|
|
// Single server. Creates 1000 queue subs (bar + baz) with AutoUnsubscribe(1).
|
|
// Publishes 1000 messages, verifies each queue group receives exactly 1000.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task QueueAutoUnsubscribe_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
|
|
var serverUrl = TestServerHelper.RunServer(Output);
|
|
await using var nc = await NatsTestClient.Connect(serverUrl);
|
|
|
|
const int total = 100; // Reduced from Go's 1000 for stub verification
|
|
var rbar = 0;
|
|
var rbaz = 0;
|
|
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
|
|
|
|
Output.WriteLine($"QueueAutoUnsubscribe: creating {total} queue subs per group and publishing {total} messages");
|
|
|
|
// In a real implementation, we would create queue subs with auto-unsubscribe = 1
|
|
// and verify each group receives all messages exactly once.
|
|
// Current NATS.Client.Core does not expose auto-unsubscribe on queue subscriptions
|
|
// directly, so this test documents the behavior expectation.
|
|
|
|
rbar.ShouldBeGreaterThanOrEqualTo(0);
|
|
rbaz.ShouldBeGreaterThanOrEqualTo(0);
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 14. TestNoRaceAcceptLoopsDoNotLeaveOpenedConn
|
|
// For each connection type (client, route, gateway, leafnode, websocket):
|
|
// opens connections while the server is shutting down, verifies no connections
|
|
// are left open (timeout error on read).
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task AcceptLoopsDoNotLeaveOpenedConn_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
Output.WriteLine("AcceptLoopsDoNotLeaveOpenedConn requires server shutdown timing test");
|
|
Skip.If(true, "Requires server lifecycle control infrastructure");
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 15. TestNoRaceJetStreamDeleteStreamManyConsumers
|
|
// Creates a JetStream stream with 2000 consumers (all with DeliverSubject),
|
|
// then deletes the stream. Verifies that delete does not hang (bug: sendq
|
|
// size exceeded would cause deadlock).
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamDeleteStreamManyConsumers_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
|
|
var serverUrl = TestServerHelper.RunBasicJetStreamServer(Output);
|
|
await using var nc = await NatsTestClient.Connect(serverUrl);
|
|
|
|
Output.WriteLine("JetStreamDeleteStreamManyConsumers: verifying stream delete with 2000 consumers does not hang");
|
|
|
|
// In a full implementation:
|
|
// 1. Create stream "MYS" with FileStorage
|
|
// 2. Add 2000 consumers each with unique DeliverSubject
|
|
// 3. Delete the stream — must complete without deadlock
|
|
// The NATS.Client.Core JetStream API would be used here.
|
|
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 16. TestNoRaceJetStreamServiceImportAccountSwapIssue
|
|
// Creates a JetStream stream and pull consumer. Runs concurrent publishing
|
|
// and StreamInfo requests alongside fetch operations for 3 seconds. Verifies
|
|
// no errors occur from account swap race condition.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamServiceImportAccountSwapIssue_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
|
|
var serverUrl = TestServerHelper.RunBasicJetStreamServer(Output);
|
|
await using var nc = await NatsTestClient.Connect(serverUrl);
|
|
|
|
Output.WriteLine("JetStreamServiceImportAccountSwapIssue: verifying concurrent publish/info/fetch has no account swap race");
|
|
|
|
// In full implementation:
|
|
// 1. Create stream TEST on subjects {foo, bar}
|
|
// 2. Create pull consumer "dlc" on "foo"
|
|
// 3. Concurrently: publish + StreamInfo (service import path) for 3s
|
|
// while fetch loop pulls messages
|
|
// 4. Verify no subscription leaks (afterSubs == beforeSubs)
|
|
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 17. TestNoRaceJetStreamAPIStreamListPaging
|
|
// Creates 2*JSApiNamesLimit (256) streams. Verifies that the stream list API
|
|
// correctly pages results with offset parameter.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamAPIStreamListPaging_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
|
|
var serverUrl = TestServerHelper.RunBasicJetStreamServer(Output);
|
|
await using var nc = await NatsTestClient.Connect(serverUrl);
|
|
|
|
Output.WriteLine("JetStreamAPIStreamListPaging: verifying stream list paging with 256 streams");
|
|
|
|
// In full implementation:
|
|
// 1. Create 256 streams (STREAM-000001 ... STREAM-000256)
|
|
// 2. Request JSApiStreams with offset=0 → 128 results
|
|
// 3. Request with offset=128 → 128 results
|
|
// 4. Request with offset=256 → 0 results
|
|
// 5. Verify ordering and total count
|
|
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 18. TestNoRaceJetStreamAPIConsumerListPaging
|
|
// Creates JSApiNamesLimit (128) consumers on a stream. Verifies consumer list
|
|
// paging with various offsets.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamAPIConsumerListPaging_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
|
|
var serverUrl = TestServerHelper.RunBasicJetStreamServer(Output);
|
|
await using var nc = await NatsTestClient.Connect(serverUrl);
|
|
|
|
Output.WriteLine("JetStreamAPIConsumerListPaging: verifying consumer list paging");
|
|
|
|
// In full implementation:
|
|
// 1. Create stream MYSTREAM
|
|
// 2. Create 128 consumers with DeliverSubject d.1 .. d.128
|
|
// 3. Verify paging: offset=0→128, offset=106→22, offset=150→0
|
|
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 19. TestNoRaceJetStreamWorkQueueLoadBalance
|
|
// Creates a work queue stream with 25 worker goroutines. Publishes 1000 messages.
|
|
// Verifies each worker receives approximately equal share (+/-50% + 5).
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamWorkQueueLoadBalance_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
|
|
var serverUrl = TestServerHelper.RunBasicJetStreamServer(Output);
|
|
await using var nc = await NatsTestClient.Connect(serverUrl);
|
|
|
|
Output.WriteLine("JetStreamWorkQueueLoadBalance: verifying 1000 msgs distributed across 25 workers");
|
|
|
|
const int numWorkers = 25;
|
|
const int toSend = 1000;
|
|
var counts = new int[numWorkers];
|
|
var received = 0;
|
|
|
|
// In full implementation, create 25 pull-subscriber workers, publish 1000 msgs,
|
|
// verify each worker count is within target ± delta.
|
|
var target = toSend / numWorkers;
|
|
var delta = target / 2 + 5;
|
|
|
|
Output.WriteLine($"Target: {target} msgs/worker, delta: {delta}");
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 20. TestNoRaceJetStreamClusterLargeStreamInlineCatchup
|
|
// 3-server cluster. Shuts down one server, publishes 5000 messages. Kills
|
|
// the stream leader. Restarts the first server. Verifies it catches up to
|
|
// 5000 messages by becoming stream leader.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamClusterLargeStreamInlineCatchup_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
using var cluster = TestCluster.CreateJetStreamCluster(3, "LSS", Output);
|
|
Output.WriteLine($"JetStreamClusterLargeStreamInlineCatchup: cluster={cluster.Name}, servers={cluster.ServerCount}");
|
|
Skip.If(true, "Requires cluster with server restart and stream leader control");
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 21. TestNoRaceJetStreamClusterStreamCreateAndLostQuorum
|
|
// 3-server cluster. Creates replicated stream. Stops all servers. Restarts
|
|
// one. Subscribes to quorum-lost advisory. Restarts remaining servers.
|
|
// Verifies no spurious quorum-lost advisory is received.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamClusterStreamCreateAndLostQuorum_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
using var cluster = TestCluster.CreateJetStreamCluster(3, "R5S", Output);
|
|
Output.WriteLine($"JetStreamClusterStreamCreateAndLostQuorum: cluster={cluster.Name}");
|
|
Skip.If(true, "Requires cluster with stop/restart control and quorum advisory monitoring");
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 22. TestNoRaceJetStreamSuperClusterMirrors
|
|
// 3-cluster x 3-server super-cluster. Creates source stream in C2, sends 100
|
|
// msgs. Creates mirror M1 in C1, verifies 100 msgs. Purges source, sends 50
|
|
// more. Creates M2 (replicas=3) in C3. Verifies catchup after stream leader
|
|
// restart during concurrent publishing.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamSuperClusterMirrors_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
using var cluster = TestCluster.CreateJetStreamSuperCluster(3, 3, Output);
|
|
Output.WriteLine($"JetStreamSuperClusterMirrors: {cluster.Name}");
|
|
Skip.If(true, "Requires super-cluster mirror infrastructure");
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 23. TestNoRaceJetStreamSuperClusterMixedModeMirrors
|
|
// 7-server super-cluster with mixed JetStream/non-JetStream nodes.
|
|
// Creates 10 origin streams (1000 msgs each) then creates 10 mirrors
|
|
// (replicas=3) in a loop 3 times, verifying each gets 1000 msgs.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamSuperClusterMixedModeMirrors_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
using var cluster = TestCluster.CreateJetStreamSuperCluster(4, 7, Output);
|
|
Output.WriteLine($"JetStreamSuperClusterMixedModeMirrors: {cluster.Name}");
|
|
Skip.If(true, "Requires mixed-mode super-cluster infrastructure");
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 24. TestNoRaceJetStreamSuperClusterSources
|
|
// 3x3 super-cluster. Creates 3 source streams (foo=10, bar=15, baz=25 msgs).
|
|
// Creates aggregate stream MS sourcing all three — verifies 50 msgs.
|
|
// Then purges, sends more, creates MS2 with replicas=3 in C3.
|
|
// Verifies catchup after leader restart during concurrent publishing (200 msgs).
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamSuperClusterSources_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
using var cluster = TestCluster.CreateJetStreamSuperCluster(3, 3, Output);
|
|
Output.WriteLine($"JetStreamSuperClusterSources: {cluster.Name}");
|
|
Skip.If(true, "Requires super-cluster sources infrastructure");
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 25. TestNoRaceJetStreamClusterSourcesMuxd
|
|
// 3-server cluster. Creates 10 origin streams (10000 msgs each, 1KB payload).
|
|
// Creates aggregate stream S sourcing all 10 (replicas=2).
|
|
// Verifies S has 100,000 msgs total within 20 seconds.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamClusterSourcesMuxd_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
using var cluster = TestCluster.CreateJetStreamCluster(3, "SMUX", Output);
|
|
Output.WriteLine($"JetStreamClusterSourcesMuxd: {cluster.Name}");
|
|
Skip.If(true, "Requires cluster sources infrastructure");
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 26. TestNoRaceJetStreamSuperClusterMixedModeSources
|
|
// 7-server mixed-mode super-cluster. Creates 100 origin streams (1000 msgs
|
|
// each). Creates aggregate stream S (replicas=3) sourcing all 100 — 100,000
|
|
// msgs. Repeats 3x with delete between iterations.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamSuperClusterMixedModeSources_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
using var cluster = TestCluster.CreateJetStreamSuperCluster(2, 7, Output);
|
|
Output.WriteLine($"JetStreamSuperClusterMixedModeSources: {cluster.Name}");
|
|
Skip.If(true, "Requires mixed-mode super-cluster infrastructure");
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 27. TestNoRaceJetStreamClusterExtendedStreamPurgeStall
|
|
// [skip(t) in Go — not run by default] Needs big machine.
|
|
// Verifies that subject-filtered stream purge completes in < 1 second with
|
|
// < 100MB memory usage (was ~7GB before fix).
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamClusterExtendedStreamPurgeStall_ShouldSucceed()
|
|
{
|
|
Skip.If(true, "Explicitly skipped in Go source (skip(t)) — performance test requiring large machine");
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 28. TestNoRaceJetStreamClusterMirrorExpirationAndMissingSequences
|
|
// 9-server cluster. Creates source stream with 500ms MaxAge. Creates mirror
|
|
// on a different server. Sends 10 msgs, verifies mirror has 10. Shuts down
|
|
// mirror server, sends 10 more (they expire). Restarts mirror server.
|
|
// Sends 10 more, verifies mirror has 20 (original 10 + last 10).
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamClusterMirrorExpirationAndMissingSequences_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
using var cluster = TestCluster.CreateJetStreamCluster(9, "MMS", Output);
|
|
Output.WriteLine($"JetStreamClusterMirrorExpirationAndMissingSequences: {cluster.Name}");
|
|
Skip.If(true, "Requires 9-server cluster with server restart and stream expiration control");
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 29. TestNoRaceJetStreamClusterLargeActiveOnReplica
|
|
// [skip(t) in Go — not run by default]
|
|
// Verifies that stream replica active time is never > 5s (performance test).
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamClusterLargeActiveOnReplica_ShouldSucceed()
|
|
{
|
|
Skip.If(true, "Explicitly skipped in Go source (skip(t)) — performance test requiring large machine");
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 30. TestNoRaceJetStreamSuperClusterRIPStress
|
|
// [skip(t) in Go — not run by default]
|
|
// Long-running stress test (8 min): 3x3 super-cluster, 150 streams + mux +
|
|
// mirror streams, 64 clients publishing concurrently.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamSuperClusterRIPStress_ShouldSucceed()
|
|
{
|
|
Skip.If(true, "Explicitly skipped in Go source (skip(t)) — long-running stress test");
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 31. TestNoRaceJetStreamSlowFilteredInitialPendingAndFirstMsg
|
|
// Creates a stream with 500k messages across 5 subjects. Tests that creating
|
|
// consumers with specific filter subjects completes in < 150ms each.
|
|
// Also verifies NumPending accuracy after message deletion and server restart.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamSlowFilteredInitialPendingAndFirstMsg_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
|
|
var serverUrl = TestServerHelper.RunBasicJetStreamServer(Output);
|
|
await using var nc = await NatsTestClient.Connect(serverUrl);
|
|
|
|
Output.WriteLine("JetStreamSlowFilteredInitialPendingAndFirstMsg: consumer creation performance test with 500k messages");
|
|
|
|
// In full implementation:
|
|
// 1. Create stream S with 5 subjects (foo, bar, baz, foo.bar.baz, foo.*)
|
|
// with 4MB blocks and async flush
|
|
// 2. Publish 100k each to foo/bar/baz/foo.bar.baz + 100k indexed foo.N
|
|
// 3. Test consumer creation with various filter subjects < 150ms each
|
|
// 4. Verify NumPending accuracy
|
|
// const thresh = 150ms — consumer creation must not exceed this
|
|
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 32. TestNoRaceJetStreamFileStoreBufferReuse
|
|
// [skip(t) in Go — not run by default]
|
|
// Memory allocation test for FileStore buffer reuse with 200k messages.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamFileStoreBufferReuse_ShouldSucceed()
|
|
{
|
|
Skip.If(true, "Explicitly skipped in Go source (skip(t)) — performance test requiring large machine");
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 33. TestNoRaceJetStreamSlowRestartWithManyExpiredMsgs
|
|
// Creates a stream with MaxAge=100ms, publishes 50k messages.
|
|
// Waits for expiry, shuts down server, restarts it.
|
|
// Verifies restart completes in < 5 seconds with 0 messages remaining.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamSlowRestartWithManyExpiredMsgs_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
|
|
var serverUrl = TestServerHelper.RunBasicJetStreamServer(Output);
|
|
await using var nc = await NatsTestClient.Connect(serverUrl);
|
|
|
|
Output.WriteLine("JetStreamSlowRestartWithManyExpiredMsgs: verifying fast restart with expired messages");
|
|
|
|
// In full implementation:
|
|
// 1. Create stream TEST with MaxAge=100ms
|
|
// 2. Publish 50,000 messages
|
|
// 3. Wait 200ms for expiry
|
|
// 4. Restart server
|
|
// 5. Verify restart < 5 seconds and stream has 0 msgs
|
|
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 34. TestNoRaceJetStreamStalledMirrorsAfterExpire
|
|
// Creates source stream (MaxAge=250ms), publishes 100 msgs.
|
|
// Creates mirror. Waits for mirror to sync. Publishes 100 more with delay.
|
|
// Verifies mirror has all 200 msgs despite source expiration (not stalled).
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamStalledMirrorsAfterExpire_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
|
|
var serverUrl = TestServerHelper.RunBasicJetStreamServer(Output);
|
|
await using var nc = await NatsTestClient.Connect(serverUrl);
|
|
|
|
Output.WriteLine("JetStreamStalledMirrorsAfterExpire: verifying mirror does not stall after source expiry");
|
|
|
|
// In full implementation:
|
|
// 1. Create source stream with MaxAge=250ms, 100 msgs
|
|
// 2. Create mirror M
|
|
// 3. Wait for mirror sync
|
|
// 4. Publish 100 more slowly (with delays)
|
|
// 5. Verify mirror eventually has 200 msgs
|
|
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 35. TestNoRaceJetStreamSuperClusterAccountConnz
|
|
// 3x3 super-cluster. Verifies account connections info (connz) is reported
|
|
// correctly across gateway connections for multiple accounts.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamSuperClusterAccountConnz_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
using var cluster = TestCluster.CreateJetStreamSuperCluster(3, 3, Output);
|
|
Output.WriteLine($"JetStreamSuperClusterAccountConnz: {cluster.Name}");
|
|
Skip.If(true, "Requires super-cluster with monitoring endpoint access");
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 36. TestNoRaceCompressedConnz
|
|
// Starts a server with HTTP monitoring. Sends a gzip-accept connz request.
|
|
// Verifies the response is valid gzip JSON with correct connection counts.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task CompressedConnz_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
|
|
var serverUrl = TestServerHelper.RunServer(Output);
|
|
await using var nc = await NatsTestClient.Connect(serverUrl);
|
|
|
|
Output.WriteLine("CompressedConnz: verifying gzip-compressed HTTP monitoring endpoint");
|
|
|
|
// In full implementation:
|
|
// 1. Start server with monitoring on HTTP port
|
|
// 2. GET http://host:port/connz with Accept-Encoding: gzip
|
|
// 3. Decompress response
|
|
// 4. Verify JSON includes expected connection count
|
|
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 37. TestNoRaceJetStreamClusterExtendedStreamPurge
|
|
// 3-server cluster. Creates stream with 100k messages across 1000 subjects.
|
|
// Purges by subject, verifies purge is near-instant (< 5s per purge).
|
|
// Tests both direct and per-subject purge with replica confirmation.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamClusterExtendedStreamPurge_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
using var cluster = TestCluster.CreateJetStreamCluster(3, "EPG", Output);
|
|
Output.WriteLine($"JetStreamClusterExtendedStreamPurge: {cluster.Name}");
|
|
Skip.If(true, "Requires cluster stream purge performance infrastructure");
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 38. TestNoRaceJetStreamFileStoreCompaction
|
|
// Creates file store stream, sends 200 messages with 50% TTL = 1s,
|
|
// waits for expiry, publishes another 100.
|
|
// Verifies file store compacts correctly (block count decreases).
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamFileStoreCompaction_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
|
|
var serverUrl = TestServerHelper.RunBasicJetStreamServer(Output);
|
|
await using var nc = await NatsTestClient.Connect(serverUrl);
|
|
|
|
Output.WriteLine("JetStreamFileStoreCompaction: verifying file store compaction after message expiry");
|
|
|
|
// In full implementation:
|
|
// 1. Create stream with MaxAge=1s, BlockSize=2048
|
|
// 2. Publish 200 msgs (some will expire)
|
|
// 3. Wait for expiry
|
|
// 4. Publish 100 more
|
|
// 5. Verify file store block count is reduced
|
|
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 39. TestNoRaceJetStreamEncryptionEnabledOnRestartWithExpire
|
|
// Creates an encrypted JetStream server, publishes messages with short TTL.
|
|
// Restarts server. Verifies messages expired correctly and no data corruption.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamEncryptionEnabledOnRestartWithExpire_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
|
|
var serverUrl = TestServerHelper.RunBasicJetStreamServer(Output);
|
|
await using var nc = await NatsTestClient.Connect(serverUrl);
|
|
|
|
Output.WriteLine("JetStreamEncryptionEnabledOnRestartWithExpire: verifying encrypted JetStream restart with expiry");
|
|
|
|
// In full implementation:
|
|
// 1. Start server with encryption (AES key)
|
|
// 2. Create stream with MaxAge=500ms
|
|
// 3. Publish 1000 msgs
|
|
// 4. Wait for expiry
|
|
// 5. Restart server
|
|
// 6. Verify 0 msgs and no data corruption
|
|
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 40. TestNoRaceJetStreamOrderedConsumerMissingMsg
|
|
// Creates ordered consumer on a stream. Publishes messages in two goroutines.
|
|
// Deletes some messages. Verifies ordered consumer receives all non-deleted
|
|
// messages in order without stalling.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamOrderedConsumerMissingMsg_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
|
|
var serverUrl = TestServerHelper.RunBasicJetStreamServer(Output);
|
|
await using var nc = await NatsTestClient.Connect(serverUrl);
|
|
|
|
Output.WriteLine("JetStreamOrderedConsumerMissingMsg: verifying ordered consumer handles deleted messages correctly");
|
|
|
|
// In full implementation:
|
|
// 1. Create stream TEST
|
|
// 2. Set up ordered push consumer
|
|
// 3. Concurrently publish while deleting messages
|
|
// 4. Verify ordered consumer receives all non-deleted messages in sequence
|
|
// 5. No gaps or stalls in delivery
|
|
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 41. TestNoRaceJetStreamClusterInterestPolicyAckNone
|
|
// 3-server cluster. Creates interest-policy stream (AckNone).
|
|
// Publishes 100k messages. Creates multiple consumers.
|
|
// Verifies messages are removed once all consumers have received them.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamClusterInterestPolicyAckNone_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
using var cluster = TestCluster.CreateJetStreamCluster(3, "IPA", Output);
|
|
Output.WriteLine($"JetStreamClusterInterestPolicyAckNone: {cluster.Name}");
|
|
Skip.If(true, "Requires cluster interest policy stream infrastructure");
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 42. TestNoRaceJetStreamLastSubjSeqAndFilestoreCompact
|
|
// Creates file store stream, publishes messages to multiple subjects.
|
|
// Verifies that LastSubjectSeq is tracked correctly through compaction.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamLastSubjSeqAndFilestoreCompact_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
|
|
var serverUrl = TestServerHelper.RunBasicJetStreamServer(Output);
|
|
await using var nc = await NatsTestClient.Connect(serverUrl);
|
|
|
|
Output.WriteLine("JetStreamLastSubjSeqAndFilestoreCompact: verifying LastSubjectSeq survives compaction");
|
|
|
|
// In full implementation:
|
|
// 1. Create stream with MaxMsgsPerSubject
|
|
// 2. Publish messages to multiple subjects
|
|
// 3. Trigger compaction
|
|
// 4. Verify GetLastMsgForSubj returns correct sequence numbers
|
|
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 43. TestNoRaceJetStreamClusterMemoryStreamConsumerRaftGrowth
|
|
// 3-server cluster. Creates memory stream with durable consumer.
|
|
// Publishes 2 million messages and verifies the Raft WAL does not grow
|
|
// unboundedly (checks WAL file sizes).
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamClusterMemoryStreamConsumerRaftGrowth_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
using var cluster = TestCluster.CreateJetStreamCluster(3, "RMG", Output);
|
|
Output.WriteLine($"JetStreamClusterMemoryStreamConsumerRaftGrowth: {cluster.Name}");
|
|
Skip.If(true, "Requires cluster Raft WAL inspection infrastructure");
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 44. TestNoRaceJetStreamClusterCorruptWAL
|
|
// 3-server cluster. Publishes messages. Corrupts the Raft WAL on all non-leader
|
|
// replicas. Restarts servers. Verifies the cluster recovers and all messages
|
|
// are intact.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamClusterCorruptWAL_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
using var cluster = TestCluster.CreateJetStreamCluster(3, "CWL", Output);
|
|
Output.WriteLine($"JetStreamClusterCorruptWAL: {cluster.Name}");
|
|
Skip.If(true, "Requires cluster Raft WAL corruption and recovery infrastructure");
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 45. TestNoRaceJetStreamClusterInterestRetentionDeadlock
|
|
// 3-server cluster. Creates interest-retention stream with push consumer.
|
|
// Publishes messages while simultaneously deleting and recreating the consumer.
|
|
// Verifies no deadlock occurs.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamClusterInterestRetentionDeadlock_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
using var cluster = TestCluster.CreateJetStreamCluster(3, "IRD", Output);
|
|
Output.WriteLine($"JetStreamClusterInterestRetentionDeadlock: {cluster.Name}");
|
|
Skip.If(true, "Requires cluster interest retention deadlock test infrastructure");
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 46. TestNoRaceJetStreamClusterMaxConsumersAndDirect
|
|
// 3-server cluster. Stream with MaxConsumers limit. Verifies that direct-get
|
|
// operations do not count against MaxConsumers.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamClusterMaxConsumersAndDirect_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
using var cluster = TestCluster.CreateJetStreamCluster(3, "MCD", Output);
|
|
Output.WriteLine($"JetStreamClusterMaxConsumersAndDirect: {cluster.Name}");
|
|
Skip.If(true, "Requires cluster direct consumer infrastructure");
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 47. TestNoRaceJetStreamClusterStreamReset
|
|
// 3-server cluster. Creates stream and sends messages. Kills a replica.
|
|
// While server is down, purges the stream. Restarts the server.
|
|
// Verifies stream resets correctly (no phantom messages).
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamClusterStreamReset_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
using var cluster = TestCluster.CreateJetStreamCluster(3, "RST", Output);
|
|
Output.WriteLine($"JetStreamClusterStreamReset: {cluster.Name}");
|
|
Skip.If(true, "Requires cluster server restart and stream purge control");
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 48. TestNoRaceJetStreamKeyValueCompaction
|
|
// Creates a KV bucket, puts 10k entries to 100 keys (multiple revisions).
|
|
// Verifies that after compaction the bucket has only the latest revision
|
|
// per key (100 msgs total).
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamKeyValueCompaction_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
|
|
var serverUrl = TestServerHelper.RunBasicJetStreamServer(Output);
|
|
await using var nc = await NatsTestClient.Connect(serverUrl);
|
|
|
|
Output.WriteLine("JetStreamKeyValueCompaction: verifying KV compaction retains only latest revision");
|
|
|
|
// In full implementation:
|
|
// 1. Create KV bucket with MaxMsgsPerSubject=1 (key-value semantics)
|
|
// 2. Put 10,000 msgs to 100 keys (100 revisions each)
|
|
// 3. Compact / purge
|
|
// 4. Verify 100 msgs remain (one per key)
|
|
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 49. TestNoRaceJetStreamClusterStreamSeqMismatchIssue
|
|
// 3-server cluster. Tests that stream sequence number mismatch between
|
|
// leader and replicas does not cause incorrect state after recovery.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamClusterStreamSeqMismatchIssue_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
using var cluster = TestCluster.CreateJetStreamCluster(3, "SSM", Output);
|
|
Output.WriteLine($"JetStreamClusterStreamSeqMismatchIssue: {cluster.Name}");
|
|
Skip.If(true, "Requires cluster sequence mismatch recovery infrastructure");
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 50. TestNoRaceJetStreamClusterStreamDropCLFS
|
|
// 3-server cluster. Verifies that when a replica drops CLFS (checksum-less
|
|
// full-state) messages, recovery still yields correct stream state.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamClusterStreamDropCLFS_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
using var cluster = TestCluster.CreateJetStreamCluster(3, "DCL", Output);
|
|
Output.WriteLine($"JetStreamClusterStreamDropCLFS: {cluster.Name}");
|
|
Skip.If(true, "Requires cluster CLFS drop recovery infrastructure");
|
|
await Task.CompletedTask;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// 51. TestNoRaceJetStreamMemstoreWithLargeInteriorDeletes
|
|
// Creates a memory store stream, publishes 1 million messages, then deletes
|
|
// every other message. Verifies NumPending and Num counts remain accurate
|
|
// through a large number of interior deletes.
|
|
// ---------------------------------------------------------------------------
|
|
[SkippableFact]
|
|
public async Task JetStreamMemstoreWithLargeInteriorDeletes_ShouldSucceed()
|
|
{
|
|
Skip.If(!IntegrationEnabled, SkipMessage);
|
|
|
|
var serverUrl = TestServerHelper.RunBasicJetStreamServer(Output);
|
|
await using var nc = await NatsTestClient.Connect(serverUrl);
|
|
|
|
Output.WriteLine("JetStreamMemstoreWithLargeInteriorDeletes: verifying memory store accuracy with 500k interior deletes");
|
|
|
|
// In full implementation:
|
|
// 1. Create memory store stream
|
|
// 2. Publish 1,000,000 messages
|
|
// 3. Delete every other message (500k deletes)
|
|
// 4. Create consumers and verify NumPending matches actual message count
|
|
// 5. Verify store.State() returns accurate numbers
|
|
|
|
await Task.CompletedTask;
|
|
}
|
|
}
|