102 lines
4.0 KiB
C#
102 lines
4.0 KiB
C#
using Akka.Cluster;
|
|
using Microsoft.EntityFrameworkCore;
|
|
using Microsoft.Extensions.DependencyInjection;
|
|
using Shouldly;
|
|
using Xunit;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Interfaces;
|
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin;
|
|
using ZB.MOM.WW.OtOpcUa.Configuration;
|
|
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Host.IntegrationTests;
|
|
|
|
/// <summary>
|
|
/// Failover scenarios layered on <see cref="TwoNodeClusterHarness"/> Stop/Restart primitives.
|
|
/// Covers graceful node loss, rejoin on the same Akka port, and deployment under reduced membership.
|
|
/// </summary>
|
|
public sealed class FailoverDuringDeployTests
|
|
{
|
|
private static CancellationToken Ct => TestContext.Current.CancellationToken;
|
|
|
|
[Fact]
|
|
public async Task Stopping_node_b_shrinks_cluster_to_one_up_member()
|
|
{
|
|
await using var harness = await TwoNodeClusterHarness.StartAsync();
|
|
Akka.Cluster.Cluster.Get(harness.NodeASystem).State.Members
|
|
.Count(m => m.Status == MemberStatus.Up).ShouldBe(2);
|
|
|
|
await harness.StopNodeBAsync();
|
|
await harness.WaitForClusterSizeAsync(1, TimeSpan.FromSeconds(20));
|
|
|
|
Akka.Cluster.Cluster.Get(harness.NodeASystem).State.Members
|
|
.Count(m => m.Status == MemberStatus.Up).ShouldBe(1);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Restarted_node_b_rejoins_cluster_on_same_port()
|
|
{
|
|
await using var harness = await TwoNodeClusterHarness.StartAsync();
|
|
|
|
await harness.StopNodeBAsync();
|
|
await harness.WaitForClusterSizeAsync(1, TimeSpan.FromSeconds(20));
|
|
|
|
await harness.RestartNodeBAsync();
|
|
|
|
Akka.Cluster.Cluster.Get(harness.NodeASystem).State.Members
|
|
.Count(m => m.Status == MemberStatus.Up).ShouldBe(2);
|
|
Akka.Cluster.Cluster.Get(harness.NodeBSystem).State.Members
|
|
.Count(m => m.Status == MemberStatus.Up).ShouldBe(2);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Deployment_started_with_node_b_down_seals_with_one_node_state()
|
|
{
|
|
// Establishes that ConfigPublishCoordinator.DiscoverDriverNodes snapshots membership at
|
|
// dispatch time — when only node A is Up, only one ApplyAck is expected and the
|
|
// deployment seals without B ever participating.
|
|
await using var harness = await TwoNodeClusterHarness.StartAsync();
|
|
|
|
await harness.StopNodeBAsync();
|
|
await harness.WaitForClusterSizeAsync(1, TimeSpan.FromSeconds(20));
|
|
|
|
await using var scope = harness.NodeA.Services.CreateAsyncScope();
|
|
var client = scope.ServiceProvider.GetRequiredService<IAdminOperationsClient>();
|
|
|
|
var result = await client.StartDeploymentAsync(createdBy: "alice@test", Ct);
|
|
result.Outcome.ShouldBe(StartDeploymentOutcome.Accepted);
|
|
var deploymentId = result.DeploymentId!.Value.Value;
|
|
|
|
await WaitForAsync(async () =>
|
|
{
|
|
await using var db = await CreateDbAsync(harness);
|
|
var d = await db.Deployments.AsNoTracking()
|
|
.FirstOrDefaultAsync(d => d.DeploymentId == deploymentId, Ct);
|
|
return d?.Status == DeploymentStatus.Sealed;
|
|
}, TimeSpan.FromSeconds(15));
|
|
|
|
await using var db = await CreateDbAsync(harness);
|
|
var nodeStates = await db.NodeDeploymentStates.AsNoTracking()
|
|
.Where(s => s.DeploymentId == deploymentId)
|
|
.ToListAsync(Ct);
|
|
nodeStates.Count.ShouldBe(1);
|
|
nodeStates[0].Status.ShouldBe(NodeDeploymentStatus.Applied);
|
|
}
|
|
|
|
private static async Task<OtOpcUaConfigDbContext> CreateDbAsync(TwoNodeClusterHarness harness)
|
|
{
|
|
var factory = harness.NodeA.Services.GetRequiredService<IDbContextFactory<OtOpcUaConfigDbContext>>();
|
|
return await factory.CreateDbContextAsync();
|
|
}
|
|
|
|
private static async Task WaitForAsync(Func<Task<bool>> condition, TimeSpan timeout)
|
|
{
|
|
var deadline = DateTime.UtcNow + timeout;
|
|
while (DateTime.UtcNow < deadline)
|
|
{
|
|
if (await condition()) return;
|
|
await Task.Delay(200);
|
|
}
|
|
throw new TimeoutException($"Condition not met within {timeout}");
|
|
}
|
|
}
|