test(integration): F22 — failover scenario tests + harness Stop/Restart primitives
Extends TwoNodeClusterHarness with three lifecycle primitives: - StopNodeBAsync() — graceful CoordinatedShutdown (Cluster.Leave) - RestartNodeBAsync() — rebuild node B on same Akka port + same in-memory DB - WaitForClusterSizeAsync(n) — converge assertion helper Adds three failover scenario tests: - Stopping node B shrinks cluster to 1 Up member - Restarted node B rejoins on the same Akka port - Deployment started with B down seals with a single NodeDeploymentState (validates ConfigPublishCoordinator.DiscoverDriverNodes snapshots membership at dispatch time) Closes follow-up F22. Integration test count: 6 → 9 (+3).
This commit is contained in:
@@ -0,0 +1,101 @@
|
||||
using Akka.Cluster;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Interfaces;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Host.IntegrationTests;
|
||||
|
||||
/// <summary>
|
||||
/// Failover scenarios layered on <see cref="TwoNodeClusterHarness"/> Stop/Restart primitives.
|
||||
/// Covers graceful node loss, rejoin on the same Akka port, and deployment under reduced membership.
|
||||
/// </summary>
|
||||
public sealed class FailoverScenarioTests
|
||||
{
|
||||
private static CancellationToken Ct => TestContext.Current.CancellationToken;
|
||||
|
||||
[Fact]
|
||||
public async Task Stopping_node_b_shrinks_cluster_to_one_up_member()
|
||||
{
|
||||
await using var harness = await TwoNodeClusterHarness.StartAsync();
|
||||
Akka.Cluster.Cluster.Get(harness.NodeASystem).State.Members
|
||||
.Count(m => m.Status == MemberStatus.Up).ShouldBe(2);
|
||||
|
||||
await harness.StopNodeBAsync();
|
||||
await harness.WaitForClusterSizeAsync(1, TimeSpan.FromSeconds(20));
|
||||
|
||||
Akka.Cluster.Cluster.Get(harness.NodeASystem).State.Members
|
||||
.Count(m => m.Status == MemberStatus.Up).ShouldBe(1);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Restarted_node_b_rejoins_cluster_on_same_port()
|
||||
{
|
||||
await using var harness = await TwoNodeClusterHarness.StartAsync();
|
||||
|
||||
await harness.StopNodeBAsync();
|
||||
await harness.WaitForClusterSizeAsync(1, TimeSpan.FromSeconds(20));
|
||||
|
||||
await harness.RestartNodeBAsync();
|
||||
|
||||
Akka.Cluster.Cluster.Get(harness.NodeASystem).State.Members
|
||||
.Count(m => m.Status == MemberStatus.Up).ShouldBe(2);
|
||||
Akka.Cluster.Cluster.Get(harness.NodeBSystem).State.Members
|
||||
.Count(m => m.Status == MemberStatus.Up).ShouldBe(2);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Deployment_started_with_node_b_down_seals_with_one_node_state()
|
||||
{
|
||||
// Establishes that ConfigPublishCoordinator.DiscoverDriverNodes snapshots membership at
|
||||
// dispatch time — when only node A is Up, only one ApplyAck is expected and the
|
||||
// deployment seals without B ever participating.
|
||||
await using var harness = await TwoNodeClusterHarness.StartAsync();
|
||||
|
||||
await harness.StopNodeBAsync();
|
||||
await harness.WaitForClusterSizeAsync(1, TimeSpan.FromSeconds(20));
|
||||
|
||||
await using var scope = harness.NodeA.Services.CreateAsyncScope();
|
||||
var client = scope.ServiceProvider.GetRequiredService<IAdminOperationsClient>();
|
||||
|
||||
var result = await client.StartDeploymentAsync(createdBy: "alice@test", Ct);
|
||||
result.Outcome.ShouldBe(StartDeploymentOutcome.Accepted);
|
||||
var deploymentId = result.DeploymentId!.Value.Value;
|
||||
|
||||
await WaitForAsync(async () =>
|
||||
{
|
||||
await using var db = await CreateDbAsync(harness);
|
||||
var d = await db.Deployments.AsNoTracking()
|
||||
.FirstOrDefaultAsync(d => d.DeploymentId == deploymentId, Ct);
|
||||
return d?.Status == DeploymentStatus.Sealed;
|
||||
}, TimeSpan.FromSeconds(15));
|
||||
|
||||
await using var db = await CreateDbAsync(harness);
|
||||
var nodeStates = await db.NodeDeploymentStates.AsNoTracking()
|
||||
.Where(s => s.DeploymentId == deploymentId)
|
||||
.ToListAsync(Ct);
|
||||
nodeStates.Count.ShouldBe(1);
|
||||
nodeStates[0].Status.ShouldBe(NodeDeploymentStatus.Applied);
|
||||
}
|
||||
|
||||
private static async Task<OtOpcUaConfigDbContext> CreateDbAsync(TwoNodeClusterHarness harness)
|
||||
{
|
||||
var factory = harness.NodeA.Services.GetRequiredService<IDbContextFactory<OtOpcUaConfigDbContext>>();
|
||||
return await factory.CreateDbContextAsync();
|
||||
}
|
||||
|
||||
private static async Task WaitForAsync(Func<Task<bool>> condition, TimeSpan timeout)
|
||||
{
|
||||
var deadline = DateTime.UtcNow + timeout;
|
||||
while (DateTime.UtcNow < deadline)
|
||||
{
|
||||
if (await condition()) return;
|
||||
await Task.Delay(200);
|
||||
}
|
||||
throw new TimeoutException($"Condition not met within {timeout}");
|
||||
}
|
||||
}
|
||||
@@ -88,6 +88,58 @@ public sealed class TwoNodeClusterHarness : IAsyncDisposable
|
||||
return harness;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gracefully shuts down node B via <see cref="WebApplication.DisposeAsync"/>, which runs
|
||||
/// CoordinatedShutdown → Cluster.Leave. Node A sees the member transition to Removed within
|
||||
/// a couple of seconds. Use this for failover scenarios; call <see cref="RestartNodeBAsync"/>
|
||||
/// to bring it back on the same Akka port.
|
||||
/// </summary>
|
||||
public async Task StopNodeBAsync()
|
||||
{
|
||||
if (NodeB is null) return;
|
||||
await NodeB.DisposeAsync();
|
||||
NodeB = null!;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Rebuilds node B on the same Akka port + same in-memory ConfigDb and waits for the cluster
|
||||
/// to re-converge to 2 Up members. Use after <see cref="StopNodeBAsync"/> to test rejoin.
|
||||
/// </summary>
|
||||
public async Task RestartNodeBAsync(TimeSpan? formationTimeout = null)
|
||||
{
|
||||
NodeB = await BuildNodeAsync(
|
||||
host: LoopbackHost,
|
||||
akkaPort: NodeBAkkaPort,
|
||||
seedHost: LoopbackHost,
|
||||
seedAkkaPort: NodeAAkkaPort,
|
||||
dbName: SharedDbName);
|
||||
|
||||
await WaitForClusterFormationAsync(
|
||||
NodeASystem,
|
||||
NodeBSystem,
|
||||
formationTimeout ?? TimeSpan.FromSeconds(20));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Waits for node A's cluster view to reach <paramref name="expectedUpMembers"/> members in
|
||||
/// <see cref="MemberStatus.Up"/>. Used for asserting shrink-after-stop or grow-after-restart.
|
||||
/// </summary>
|
||||
public async Task WaitForClusterSizeAsync(int expectedUpMembers, TimeSpan timeout)
|
||||
{
|
||||
var deadline = DateTime.UtcNow + timeout;
|
||||
while (DateTime.UtcNow < deadline)
|
||||
{
|
||||
var count = Akka.Cluster.Cluster.Get(NodeASystem).State.Members
|
||||
.Count(m => m.Status == MemberStatus.Up);
|
||||
if (count == expectedUpMembers) return;
|
||||
await Task.Delay(200);
|
||||
}
|
||||
var actual = Akka.Cluster.Cluster.Get(NodeASystem).State.Members
|
||||
.Count(m => m.Status == MemberStatus.Up);
|
||||
throw new TimeoutException(
|
||||
$"Cluster did not converge to {expectedUpMembers} Up members within {timeout}. Actual={actual}");
|
||||
}
|
||||
|
||||
private static async Task<WebApplication> BuildNodeAsync(
|
||||
string host, int akkaPort, string seedHost, int seedAkkaPort, string dbName)
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user