using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.DependencyInjection; using Shouldly; using Xunit; using ZB.MOM.WW.OtOpcUa.Commons.Interfaces; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin; using ZB.MOM.WW.OtOpcUa.Configuration; using ZB.MOM.WW.OtOpcUa.Configuration.Enums; namespace ZB.MOM.WW.OtOpcUa.Host.IntegrationTests; /// /// End-to-end deploy: AdminOperationsActor → ConfigPublishCoordinator → DriverHostActor on both /// nodes → ApplyAck → coordinator seals. Verifies the cluster singleton + DistributedPubSub /// dispatch path actually works against the real Akka cluster the harness builds. /// public sealed class DeployHappyPathTests { private static CancellationToken Ct => TestContext.Current.CancellationToken; [Fact] public async Task StartDeployment_seals_after_both_nodes_apply() { await using var harness = await TwoNodeClusterHarness.StartAsync(); await using var scope = harness.NodeA.Services.CreateAsyncScope(); var client = scope.ServiceProvider.GetRequiredService(); var result = await client.StartDeploymentAsync(createdBy: "alice@test", Ct); result.Outcome.ShouldBe(StartDeploymentOutcome.Accepted); result.DeploymentId.ShouldNotBeNull(); var deploymentId = result.DeploymentId!.Value.Value; // Wait up to 15s for coordinator to seal after both DriverHostActors ack. await WaitForAsync(async () => { await using var db = await CreateDbAsync(harness); var d = await db.Deployments.AsNoTracking().FirstOrDefaultAsync(d => d.DeploymentId == deploymentId, Ct); return d?.Status == DeploymentStatus.Sealed; }, TimeSpan.FromSeconds(15)); await using var db = await CreateDbAsync(harness); var deployment = await db.Deployments.AsNoTracking() .FirstAsync(d => d.DeploymentId == deploymentId, Ct); deployment.Status.ShouldBe(DeploymentStatus.Sealed); deployment.SealedAtUtc.ShouldNotBeNull(); var nodeStates = await db.NodeDeploymentStates.AsNoTracking() .Where(s => s.DeploymentId == deploymentId) .ToListAsync(Ct); nodeStates.Count.ShouldBe(2); nodeStates.ShouldAllBe(s => s.Status == NodeDeploymentStatus.Applied); } [Fact] public async Task Replaying_dispatch_to_same_revision_is_idempotent_no_op() { await using var harness = await TwoNodeClusterHarness.StartAsync(); await using var scope = harness.NodeA.Services.CreateAsyncScope(); var client = scope.ServiceProvider.GetRequiredService(); var first = await client.StartDeploymentAsync(createdBy: "alice@test", Ct); first.Outcome.ShouldBe(StartDeploymentOutcome.Accepted); await WaitForAsync(async () => { await using var db = await CreateDbAsync(harness); var d = await db.Deployments.AsNoTracking() .FirstOrDefaultAsync(d => d.DeploymentId == first.DeploymentId!.Value.Value, Ct); return d?.Status == DeploymentStatus.Sealed; }, TimeSpan.FromSeconds(15)); // Same DB state → same revision hash → AdminOperations short-circuits with NoChanges, // OR (if it accepts) the second dispatch is idempotent (DriverHostActor recognises // _currentRevision == msg.RevisionHash and acks Applied without re-applying). var second = await client.StartDeploymentAsync(createdBy: "alice@test", Ct); if (second.Outcome == StartDeploymentOutcome.Accepted) { // Second deployment row should also seal quickly because both drivers immediate-ack. await WaitForAsync(async () => { await using var db = await CreateDbAsync(harness); var d = await db.Deployments.AsNoTracking() .FirstOrDefaultAsync(d => d.DeploymentId == second.DeploymentId!.Value.Value, Ct); return d?.Status == DeploymentStatus.Sealed; }, TimeSpan.FromSeconds(15)); second.RevisionHash!.Value.Value.ShouldBe(first.RevisionHash!.Value.Value); } else { second.Outcome.ShouldBeOneOf(StartDeploymentOutcome.NoChanges, StartDeploymentOutcome.AnotherDeploymentInFlight); } } private static async Task CreateDbAsync(TwoNodeClusterHarness harness) { var factory = harness.NodeA.Services.GetRequiredService>(); return await factory.CreateDbContextAsync(); } private static async Task WaitForAsync(Func> condition, TimeSpan timeout) { var deadline = DateTime.UtcNow + timeout; while (DateTime.UtcNow < deadline) { if (await condition()) return; await Task.Delay(200); } throw new TimeoutException($"Condition not met within {timeout}"); } }