diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Coordinators/ConfigPublishCoordinator.cs b/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Coordinators/ConfigPublishCoordinator.cs index 94b73b8..306f18a 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Coordinators/ConfigPublishCoordinator.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Coordinators/ConfigPublishCoordinator.cs @@ -21,25 +21,78 @@ namespace ZB.MOM.WW.OtOpcUa.ControlPlane.Coordinators; /// Discovery of the "expected ACK set" comes from Akka.Cluster.State.Members filtered by /// the driver role — the DB does not own per-node role assignment. /// -public sealed class ConfigPublishCoordinator : ReceiveActor +public sealed class ConfigPublishCoordinator : ReceiveActor, IWithTimers { public const string DeploymentsTopic = "deployments"; + public static readonly TimeSpan DefaultApplyDeadline = TimeSpan.FromMinutes(2); private readonly IDbContextFactory _dbFactory; + private readonly TimeSpan _applyDeadline; private readonly ILoggingAdapter _log = Context.GetLogger(); private readonly Dictionary _acks = new(); private DeploymentId? _current; private HashSet _expectedAcks = new(); - public static Props Props(IDbContextFactory dbFactory) => - Akka.Actor.Props.Create(() => new ConfigPublishCoordinator(dbFactory)); + public ITimerScheduler Timers { get; set; } = null!; - public ConfigPublishCoordinator(IDbContextFactory dbFactory) + public static Props Props( + IDbContextFactory dbFactory, + TimeSpan? applyDeadline = null) => + Akka.Actor.Props.Create(() => new ConfigPublishCoordinator(dbFactory, applyDeadline ?? DefaultApplyDeadline)); + + public ConfigPublishCoordinator( + IDbContextFactory dbFactory, + TimeSpan applyDeadline) { _dbFactory = dbFactory; + _applyDeadline = applyDeadline; Receive(HandleDispatch); Receive(HandleAck); + Receive(HandleDeadline); + } + + /// + /// On startup recover any deployment that was mid-flight when a prior singleton instance + /// died. We re-derive _expectedAcks from NodeDeploymentState, replay the ACKs + /// that already landed in the DB, and resume the deadline timer. + /// + protected override void PreStart() + { + using var db = _dbFactory.CreateDbContext(); + var inflight = db.Deployments + .Where(d => d.Status == DeploymentStatus.Dispatching || d.Status == DeploymentStatus.AwaitingApplyAcks) + .OrderByDescending(d => d.CreatedAtUtc) + .FirstOrDefault(); + if (inflight is null) return; + + _current = new DeploymentId(inflight.DeploymentId); + var nodeStates = db.NodeDeploymentStates + .Where(x => x.DeploymentId == inflight.DeploymentId) + .AsNoTracking() + .ToList(); + + _expectedAcks = nodeStates.Select(s => NodeId.Parse(s.NodeId)).ToHashSet(); + foreach (var s in nodeStates.Where(s => s.Status != NodeDeploymentStatus.Applying)) + _acks[NodeId.Parse(s.NodeId)] = s.Status == NodeDeploymentStatus.Applied + ? ApplyAckOutcome.Applied + : ApplyAckOutcome.Failed; + + // Resume the deadline timer using the remaining time. The deadline runs from when the + // deployment was first marked AwaitingApplyAcks (Deployment.CreatedAtUtc is a close enough + // proxy — we don't track a separate "dispatched at" column). + var elapsed = DateTime.UtcNow - inflight.CreatedAtUtc; + var remaining = _applyDeadline - elapsed; + if (remaining <= TimeSpan.Zero) + { + Self.Tell(new DeadlineElapsed(_current.Value)); + } + else + { + Timers.StartSingleTimer(DeadlineTimerKey, new DeadlineElapsed(_current.Value), remaining); + } + _log.Info("Coordinator recovered in-flight deployment {Id} ({Acked}/{Total} acks landed)", + _current, _acks.Count, _expectedAcks.Count); } private void HandleDispatch(DispatchDeployment msg) @@ -65,6 +118,7 @@ public sealed class ConfigPublishCoordinator : ReceiveActor } DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(DeploymentsTopic, msg)); + Timers.StartSingleTimer(DeadlineTimerKey, new DeadlineElapsed(msg.DeploymentId), _applyDeadline); if (_expectedAcks.Count == 0) { @@ -132,13 +186,40 @@ public sealed class ConfigPublishCoordinator : ReceiveActor ResetForNext(); } + private void HandleDeadline(DeadlineElapsed msg) + { + if (_current is null || msg.DeploymentId != _current.Value) + { + _log.Debug("Discarding stale DeadlineElapsed for {Id} (current={Current})", + msg.DeploymentId, _current); + return; + } + if (_acks.Count == _expectedAcks.Count) + { + // Race: every node acked just as the deadline fired. Already sealed/failed elsewhere. + return; + } + + using var db = _dbFactory.CreateDbContext(); + UpdateDeploymentStatus(db, _current.Value, DeploymentStatus.TimedOut); + db.SaveChanges(); + _log.Warning("Deployment {Id} timed out after {Deadline} ({Acked}/{Total} acks landed)", + _current.Value, _applyDeadline, _acks.Count, _expectedAcks.Count); + ResetForNext(); + } + private void ResetForNext() { + Timers.Cancel(DeadlineTimerKey); _current = null; _expectedAcks.Clear(); _acks.Clear(); } + private const string DeadlineTimerKey = "apply-deadline"; + + public sealed record DeadlineElapsed(DeploymentId DeploymentId); + private static void UpdateDeploymentStatus( OtOpcUaConfigDbContext db, DeploymentId id, DeploymentStatus status, bool sealNow = false) { diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/ConfigPublishCoordinatorTimeoutTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/ConfigPublishCoordinatorTimeoutTests.cs new file mode 100644 index 0000000..15824ab --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/ConfigPublishCoordinatorTimeoutTests.cs @@ -0,0 +1,120 @@ +using Akka.Actor; +using Microsoft.EntityFrameworkCore; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy; +using ZB.MOM.WW.OtOpcUa.Commons.Types; +using ZB.MOM.WW.OtOpcUa.Configuration; +using ZB.MOM.WW.OtOpcUa.Configuration.Enums; +using ZB.MOM.WW.OtOpcUa.ControlPlane.Coordinators; +using ZB.MOM.WW.OtOpcUa.ControlPlane.Tests.Harness; + +namespace ZB.MOM.WW.OtOpcUa.ControlPlane.Tests; + +public sealed class ConfigPublishCoordinatorTimeoutTests : ControlPlaneActorTestBase +{ + private static readonly RevisionHash TestRevision = RevisionHash.Parse(new string('b', 64)); + + [Fact] + public void DeadlineElapsed_for_current_deployment_marks_TimedOut() + { + var dbFactory = NewInMemoryDbFactory(); + var deploymentId = SeedDispatchingDeployment(dbFactory); + + // Short deadline so we can drive it deterministically in the test. + var actor = Sys.ActorOf(ConfigPublishCoordinator.Props(dbFactory, TimeSpan.FromMilliseconds(150))); + + // Seed a NodeDeploymentState row so the coordinator doesn't see "zero expected acks" + // and short-circuit to Sealed. We pretend a driver node exists in the cluster. + using (var db = dbFactory.CreateDbContext()) + { + db.NodeDeploymentStates.Add(new Configuration.Entities.NodeDeploymentState + { + NodeId = "phantom-driver", + DeploymentId = deploymentId.Value, + Status = NodeDeploymentStatus.Applying, + }); + db.SaveChanges(); + } + + // Drive the deadline ourselves rather than waiting for the cluster's empty driver-set + // bypass. Tell the actor the deadline elapsed for this id. + actor.Tell(new ConfigPublishCoordinator.DeadlineElapsed(deploymentId)); + + AwaitAssert(() => + { + using var db = dbFactory.CreateDbContext(); + db.Deployments.Single().Status.ShouldBe(DeploymentStatus.TimedOut); + }, duration: TimeSpan.FromSeconds(3)); + } + + [Fact] + public void Stale_DeadlineElapsed_for_other_deployment_is_ignored() + { + var dbFactory = NewInMemoryDbFactory(); + var deploymentId = SeedDispatchingDeployment(dbFactory); + var actor = Sys.ActorOf(ConfigPublishCoordinator.Props(dbFactory, TimeSpan.FromMinutes(1))); + + // Tell the actor a deadline elapsed for a completely different deployment id. + actor.Tell(new ConfigPublishCoordinator.DeadlineElapsed(DeploymentId.NewId())); + + // The seeded one should remain in its starting state (no transition triggered). + ExpectNoMsg(TimeSpan.FromMilliseconds(250)); + using var db = dbFactory.CreateDbContext(); + var status = db.Deployments.Single().Status; + status.ShouldBeOneOf(DeploymentStatus.Dispatching, DeploymentStatus.AwaitingApplyAcks, DeploymentStatus.Sealed); + status.ShouldNotBe(DeploymentStatus.TimedOut); + } + + [Fact] + public void PreStart_recovers_inflight_deployment_state() + { + var dbFactory = NewInMemoryDbFactory(); + var deploymentId = SeedDispatchingDeployment(dbFactory, status: DeploymentStatus.AwaitingApplyAcks); + + // Seed two NodeDeploymentState rows — one already Applied, one still Applying. + using (var db = dbFactory.CreateDbContext()) + { + db.NodeDeploymentStates.Add(new Configuration.Entities.NodeDeploymentState + { + NodeId = "driver-a", DeploymentId = deploymentId.Value, Status = NodeDeploymentStatus.Applied, + }); + db.NodeDeploymentStates.Add(new Configuration.Entities.NodeDeploymentState + { + NodeId = "driver-b", DeploymentId = deploymentId.Value, Status = NodeDeploymentStatus.Applying, + }); + db.SaveChanges(); + } + + // Start a fresh coordinator — simulates singleton failover to this node. + var actor = Sys.ActorOf(ConfigPublishCoordinator.Props(dbFactory, TimeSpan.FromMinutes(5))); + + // Send the missing ACK; the recovered state should expect exactly that node, and the + // deployment should now seal (both nodes acked Applied). + actor.Tell(new ApplyAck(deploymentId, NodeId.Parse("driver-b"), + ApplyAckOutcome.Applied, null, CorrelationId.NewId())); + + AwaitAssert(() => + { + using var db = dbFactory.CreateDbContext(); + db.Deployments.Single().Status.ShouldBe(DeploymentStatus.Sealed); + }, duration: TimeSpan.FromSeconds(3)); + } + + private static DeploymentId SeedDispatchingDeployment( + IDbContextFactory dbFactory, + DeploymentStatus status = DeploymentStatus.Dispatching) + { + var id = DeploymentId.NewId(); + using var db = dbFactory.CreateDbContext(); + db.Deployments.Add(new Configuration.Entities.Deployment + { + DeploymentId = id.Value, + RevisionHash = TestRevision.Value, + Status = status, + CreatedBy = "test", + }); + db.SaveChanges(); + return id; + } +}