Replace all Task.Delay-based interest propagation waits with active probe loops (PeriodicTimer + publish-and-read) in GatewayFailoverTests, LeafNodeFailoverTests, JetStreamClusterTests, and RaftConsensusTests. Fix SW003 empty-catch violations in ClusterResilienceTests by adding _ = e discard statements. Correct State.Messages type from ulong to long to match the NATS.Client.JetStream API.
239 lines
9.3 KiB
C#
using NATS.Client.Core;
|
|
using NATS.Client.JetStream;
|
|
using NATS.Client.JetStream.Models;
|
|
using NATS.E2E.Cluster.Tests.Infrastructure;
|
|
|
|
namespace NATS.E2E.Cluster.Tests;
|
|
|
|
// Go reference: server/jetstream_cluster_test.go — TestJetStreamClusterBasic, TestJetStreamClusterNodeFailure, etc.
|
|
[Collection("E2E-JetStreamCluster")]
public class JetStreamClusterTests(JetStreamClusterFixture fixture) : IClassFixture<JetStreamClusterFixture>
{
    /// <summary>
    /// Creates an R3 (three-replica) stream, publishes 5 messages through node 0,
    /// then probes every node until each independently reports all 5 messages,
    /// confirming RAFT replication completed cluster-wide.
    /// Go reference: server/jetstream_cluster_test.go TestJetStreamClusterBasic
    /// </summary>
    [Fact]
    public async Task R3Stream_CreateAndPublish_ReplicatedAcrossNodes()
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

        await using var seedClient = fixture.CreateClient(0);
        await seedClient.ConnectAsync();

        var seedJs = new NatsJSContext(seedClient);

        await seedJs.CreateStreamAsync(
            new StreamConfig("JS_REPL", ["js.repl.>"]) { NumReplicas = 3 },
            cts.Token);

        for (var seq = 0; seq < 5; seq++)
        {
            await seedJs.PublishAsync("js.repl.data", $"msg-{seq}", cancellationToken: cts.Token);
        }

        // Each node must independently converge on 5 messages before we assert.
        for (var node = 0; node < 3; node++)
        {
            await using var nodeClient = fixture.CreateClient(node);
            await nodeClient.ConnectAsync();

            var nodeJs = new NatsJSContext(nodeClient);
            await WaitForStreamMessagesAsync(nodeJs, "JS_REPL", minMessages: 5, cts.Token);

            var info = await nodeJs.GetStreamAsync("JS_REPL", cancellationToken: cts.Token);
            info.Info.State.Messages.ShouldBe(5L,
                $"Node {node} should report 5 messages but reported {info.Info.State.Messages}");
        }
    }

    /// <summary>
    /// Kills node 2 mid-stream and verifies publishing continues on the surviving
    /// 2/3 quorum, then restores node 2 and waits for the full route mesh to reform.
    /// Go reference: server/jetstream_cluster_test.go TestJetStreamClusterNodeFailure
    /// </summary>
    [Fact]
    public async Task R3Stream_NodeDies_PublishContinues()
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

        await using var client0 = fixture.CreateClient(0);
        await client0.ConnectAsync();

        var js0 = new NatsJSContext(client0);

        await js0.CreateStreamAsync(
            new StreamConfig("JS_NODEDIES", ["js.nodedies.>"]) { NumReplicas = 3 },
            cts.Token);

        // Seed the stream with 3 messages while all replicas are healthy.
        for (var seq = 0; seq < 3; seq++)
        {
            await js0.PublishAsync("js.nodedies.data", $"pre-{seq}", cancellationToken: cts.Token);
        }

        // Hard-kill node 2, then probe node 1 until a new leader serves the stream.
        await fixture.KillNode(2);

        await using var client1Early = fixture.CreateClient(1);
        await client1Early.ConnectAsync();
        var js1Early = new NatsJSContext(client1Early);
        await WaitForStreamMessagesAsync(js1Early, "JS_NODEDIES", minMessages: 3, cts.Token);

        // With 2/3 still up the quorum holds, so these publishes must succeed.
        for (var seq = 0; seq < 5; seq++)
        {
            await js0.PublishAsync("js.nodedies.data", $"post-{seq}", cancellationToken: cts.Token);
        }

        // Probe node 1 until all 8 messages (3 pre + 5 post) are visible there.
        await using var client1 = fixture.CreateClient(1);
        await client1.ConnectAsync();

        var js1 = new NatsJSContext(client1);
        await WaitForStreamMessagesAsync(js1, "JS_NODEDIES", minMessages: 8, cts.Token);

        var info = await js1.GetStreamAsync("JS_NODEDIES", cancellationToken: cts.Token);
        info.Info.State.Messages.ShouldBeGreaterThanOrEqualTo(8L,
            $"Node 1 should have >= 8 messages but has {info.Info.State.Messages}");

        // Bring node 2 back and wait until every node sees 2 routes again.
        await fixture.RestartNode(2);
        await fixture.WaitForFullMeshAsync(expectedRoutes: 2);
    }

    /// <summary>
    /// Kills node 2 while a pull consumer exists on an R3 stream and verifies the
    /// consumer remains accessible through surviving node 1, then restores the mesh.
    /// Go reference: server/jetstream_cluster_test.go TestJetStreamClusterConsumerHardKill
    /// </summary>
    [Fact]
    public async Task Consumer_NodeDies_PullContinuesOnSurvivor()
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

        await using var client0 = fixture.CreateClient(0);
        await client0.ConnectAsync();

        var js0 = new NatsJSContext(client0);

        var stream = await js0.CreateStreamAsync(
            new StreamConfig("JS_CONS_FAIL", ["js.consfail.>"]) { NumReplicas = 3 },
            cts.Token);

        // Seed 5 messages for the consumer to eventually deliver.
        for (var seq = 0; seq < 5; seq++)
        {
            await js0.PublishAsync("js.consfail.data", $"msg-{seq}", cancellationToken: cts.Token);
        }

        // Durable pull consumer with explicit acks.
        await stream.CreateOrUpdateConsumerAsync(
            new ConsumerConfig("cons-fail") { AckPolicy = ConsumerConfigAckPolicy.Explicit },
            cts.Token);

        // Hard-kill node 2; the new leader must re-expose both stream and consumer.
        await fixture.KillNode(2);

        await using var client1 = fixture.CreateClient(1);
        await client1.ConnectAsync();

        var js1 = new NatsJSContext(client1);
        await WaitForStreamMessagesAsync(js1, "JS_CONS_FAIL", minMessages: 5, cts.Token);

        var stream1 = await js1.GetStreamAsync("JS_CONS_FAIL", cancellationToken: cts.Token);
        var consumer = await stream1.GetConsumerAsync("cons-fail", cts.Token);

        consumer.ShouldNotBeNull("Consumer 'cons-fail' should be accessible on surviving node 1");

        // Bring node 2 back and wait until every node sees 2 routes again.
        await fixture.RestartNode(2);
        await fixture.WaitForFullMeshAsync(expectedRoutes: 2);
    }

    /// <summary>
    /// Purges an R3 stream and probes all 3 nodes until each reports exactly 0
    /// messages, proving the purge itself replicated through RAFT.
    /// Go reference: server/jetstream_cluster_test.go TestJetStreamClusterStreamPurge
    /// </summary>
    [Fact]
    public async Task R3Stream_Purge_ReplicatedAcrossNodes()
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

        await using var seedClient = fixture.CreateClient(0);
        await seedClient.ConnectAsync();

        var seedJs = new NatsJSContext(seedClient);

        await seedJs.CreateStreamAsync(
            new StreamConfig("JS_PURGE", ["js.purge.>"]) { NumReplicas = 3 },
            cts.Token);

        // Seed 5 messages that the purge will later remove.
        for (var seq = 0; seq < 5; seq++)
        {
            await seedJs.PublishAsync("js.purge.data", $"msg-{seq}", cancellationToken: cts.Token);
        }

        // Every node must hold the 5 pre-purge messages before we purge, otherwise
        // a later zero-count could mean "never replicated" instead of "purged".
        for (var node = 0; node < 3; node++)
        {
            await using var nc = fixture.CreateClient(node);
            await nc.ConnectAsync();
            await WaitForStreamMessagesAsync(new NatsJSContext(nc), "JS_PURGE", minMessages: 5, cts.Token);
        }

        await seedJs.PurgeStreamAsync("JS_PURGE", new StreamPurgeRequest(), cts.Token);

        // Probe each node until the purge has replicated (exactly 0 messages).
        for (var node = 0; node < 3; node++)
        {
            await using var nodeClient = fixture.CreateClient(node);
            await nodeClient.ConnectAsync();

            var nodeJs = new NatsJSContext(nodeClient);
            await WaitForStreamMessagesAsync(nodeJs, "JS_PURGE", minMessages: 0, cts.Token,
                exactMatch: true);

            var info = await nodeJs.GetStreamAsync("JS_PURGE", cancellationToken: cts.Token);
            info.Info.State.Messages.ShouldBe(0L,
                $"Node {node} should report 0 messages after purge but reported {info.Info.State.Messages}");
        }
    }

    // ---------------------------------------------------------------------------
    // Helpers
    // ---------------------------------------------------------------------------

    /// <summary>
    /// Probes the stream on the given JetStream context every 200 ms until it
    /// reports at least <paramref name="minMessages"/> messages (or exactly that
    /// count when <paramref name="exactMatch"/> is true). Exits only on success
    /// or when <paramref name="ct"/> fires (throwing OperationCanceledException).
    /// </summary>
    private static async Task WaitForStreamMessagesAsync(
        NatsJSContext js,
        string streamName,
        long minMessages,
        CancellationToken ct,
        bool exactMatch = false)
    {
        using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(200));
        while (await timer.WaitForNextTickAsync(ct).ConfigureAwait(false))
        {
            try
            {
                var state = (await js.GetStreamAsync(streamName, cancellationToken: ct)).Info.State;
                if (exactMatch ? state.Messages == minMessages : state.Messages >= minMessages)
                {
                    return;
                }
            }
            catch (NatsJSApiException ex)
            {
                // Stream metadata not yet served by this node (e.g. right after a
                // failover) — keep polling. Discard satisfies SW003 (no empty catch).
                _ = ex;
            }
        }
    }
}
|