From 62e12dab95fc13d2464a301e6172f131e3b50cd8 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 26 May 2026 04:53:29 -0400 Subject: [PATCH] feat(controlplane): ConfigPublishCoordinator happy path with NodeDeploymentState seeding --- .../Coordinators/ConfigPublishCoordinator.cs | 165 ++++++++++++++++++ .../ConfigPublishCoordinatorTests.cs | 76 ++++++++ 2 files changed, 241 insertions(+) create mode 100644 src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Coordinators/ConfigPublishCoordinator.cs create mode 100644 tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/ConfigPublishCoordinatorTests.cs diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Coordinators/ConfigPublishCoordinator.cs b/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Coordinators/ConfigPublishCoordinator.cs new file mode 100644 index 0000000..94b73b8 --- /dev/null +++ b/src/Server/ZB.MOM.WW.OtOpcUa.ControlPlane/Coordinators/ConfigPublishCoordinator.cs @@ -0,0 +1,165 @@ +using Akka.Actor; +using Akka.Cluster; +using Akka.Cluster.Tools.PublishSubscribe; +using Akka.Event; +using Microsoft.EntityFrameworkCore; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy; +using ZB.MOM.WW.OtOpcUa.Commons.Types; +using ZB.MOM.WW.OtOpcUa.Configuration; +using ZB.MOM.WW.OtOpcUa.Configuration.Entities; +using ZB.MOM.WW.OtOpcUa.Configuration.Enums; + +namespace ZB.MOM.WW.OtOpcUa.ControlPlane.Coordinators; + +/// +/// Admin-role cluster singleton that drives a deployment through its lifecycle: dispatches a +/// over DistributedPubSub on the deployments topic, gathers +/// per-node replies, and seals the deployment when every expected node +/// has acked Applied. Per-node ACKs are persisted in NodeDeploymentState so a failover of +/// this singleton (Task 31) can recover in-flight state from the DB. +/// +/// Discovery of the "expected ACK set" comes from Akka.Cluster.State.Members filtered by +/// the driver role — the DB does not own per-node role assignment. +/// +public sealed class ConfigPublishCoordinator : ReceiveActor +{ + public const string DeploymentsTopic = "deployments"; + + private readonly IDbContextFactory _dbFactory; + private readonly ILoggingAdapter _log = Context.GetLogger(); + private readonly Dictionary _acks = new(); + + private DeploymentId? _current; + private HashSet _expectedAcks = new(); + + public static Props Props(IDbContextFactory dbFactory) => + Akka.Actor.Props.Create(() => new ConfigPublishCoordinator(dbFactory)); + + public ConfigPublishCoordinator(IDbContextFactory dbFactory) + { + _dbFactory = dbFactory; + Receive(HandleDispatch); + Receive(HandleAck); + } + + private void HandleDispatch(DispatchDeployment msg) + { + _current = msg.DeploymentId; + _acks.Clear(); + _expectedAcks = DiscoverDriverNodes(); + + // Seed NodeDeploymentState rows so a failover knows which nodes were expected to ack. + using (var db = _dbFactory.CreateDbContext()) + { + foreach (var node in _expectedAcks) + { + db.NodeDeploymentStates.Add(new NodeDeploymentState + { + NodeId = node.Value, + DeploymentId = msg.DeploymentId.Value, + Status = NodeDeploymentStatus.Applying, + }); + } + UpdateDeploymentStatus(db, msg.DeploymentId, DeploymentStatus.AwaitingApplyAcks); + db.SaveChanges(); + } + + DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(DeploymentsTopic, msg)); + + if (_expectedAcks.Count == 0) + { + // No driver-role members. Seal immediately — the alternative is hanging forever + // waiting for ACKs that will never come. + _log.Warning("DispatchDeployment {Id}: no driver-role members in cluster; sealing empty", + msg.DeploymentId); + SealDeployment(); + } + } + + private void HandleAck(ApplyAck msg) + { + if (_current is null || msg.DeploymentId != _current.Value) + { + _log.Debug("Discarding stale ApplyAck for {Id} (current={Current})", + msg.DeploymentId, _current); + return; + } + + _acks[msg.NodeId] = msg.Outcome; + PersistNodeAck(msg); + + if (_acks.Count < _expectedAcks.Count) return; + + if (_acks.Values.All(o => o == ApplyAckOutcome.Applied)) + SealDeployment(); + else + MarkPartiallyFailed(); + } + + private void PersistNodeAck(ApplyAck msg) + { + using var db = _dbFactory.CreateDbContext(); + var row = db.NodeDeploymentStates + .FirstOrDefault(x => x.NodeId == msg.NodeId.Value && x.DeploymentId == msg.DeploymentId.Value); + if (row is null) return; + + row.Status = msg.Outcome == ApplyAckOutcome.Applied + ? NodeDeploymentStatus.Applied + : NodeDeploymentStatus.Failed; + row.AppliedAtUtc = DateTime.UtcNow; + row.FailureReason = msg.FailureReason; + db.SaveChanges(); + } + + private void SealDeployment() + { + if (_current is null) return; + using var db = _dbFactory.CreateDbContext(); + UpdateDeploymentStatus(db, _current.Value, DeploymentStatus.Sealed, sealNow: true); + db.SaveChanges(); + _log.Info("Deployment {Id} sealed (acks={Count})", _current.Value, _acks.Count); + ResetForNext(); + } + + private void MarkPartiallyFailed() + { + if (_current is null) return; + using var db = _dbFactory.CreateDbContext(); + UpdateDeploymentStatus(db, _current.Value, DeploymentStatus.PartiallyFailed); + db.SaveChanges(); + _log.Warning("Deployment {Id} partially failed; acks={Acks}", _current.Value, + string.Join(",", _acks.Select(kv => $"{kv.Key.Value}={kv.Value}"))); + ResetForNext(); + } + + private void ResetForNext() + { + _current = null; + _expectedAcks.Clear(); + _acks.Clear(); + } + + private static void UpdateDeploymentStatus( + OtOpcUaConfigDbContext db, DeploymentId id, DeploymentStatus status, bool sealNow = false) + { + var d = db.Deployments.FirstOrDefault(x => x.DeploymentId == id.Value); + if (d is null) return; + d.Status = status; + if (sealNow) d.SealedAtUtc = DateTime.UtcNow; + } + + private HashSet DiscoverDriverNodes() + { + var cluster = Akka.Cluster.Cluster.Get(Context.System); + var nodes = new HashSet(); + foreach (var member in cluster.State.Members) + { + if (member.Status is not (MemberStatus.Up or MemberStatus.Joining)) continue; + if (!member.Roles.Contains("driver")) continue; + var host = member.Address.Host; + if (string.IsNullOrWhiteSpace(host)) continue; + nodes.Add(NodeId.Parse(host)); + } + return nodes; + } +} diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/ConfigPublishCoordinatorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/ConfigPublishCoordinatorTests.cs new file mode 100644 index 0000000..2bb5b29 --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests/ConfigPublishCoordinatorTests.cs @@ -0,0 +1,76 @@ +using Akka.Actor; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy; +using ZB.MOM.WW.OtOpcUa.Commons.Types; +using ZB.MOM.WW.OtOpcUa.Configuration.Enums; +using ZB.MOM.WW.OtOpcUa.ControlPlane.Coordinators; +using ZB.MOM.WW.OtOpcUa.ControlPlane.Tests.Harness; + +namespace ZB.MOM.WW.OtOpcUa.ControlPlane.Tests; + +public sealed class ConfigPublishCoordinatorTests : ControlPlaneActorTestBase +{ + private static readonly RevisionHash TestRevision = RevisionHash.Parse(new string('a', 64)); + + [Fact] + public void EmptyCluster_dispatch_seals_immediately() + { + // With no driver-role cluster members in scope, the coordinator has nobody to wait for + // and seals the deployment right after writing the AwaitingApplyAcks status. + var dbFactory = NewInMemoryDbFactory(); + var deploymentId = SeedDispatchingDeployment(dbFactory); + + var actor = Sys.ActorOf(ConfigPublishCoordinator.Props(dbFactory)); + actor.Tell(new DispatchDeployment(deploymentId, TestRevision, CorrelationId.NewId())); + + AwaitAssert(() => + { + using var db = dbFactory.CreateDbContext(); + var status = db.Deployments.Single().Status; + status.ShouldBe(DeploymentStatus.Sealed); + }, duration: TimeSpan.FromSeconds(3)); + } + + [Fact] + public void Stale_ApplyAck_after_seal_is_ignored() + { + var dbFactory = NewInMemoryDbFactory(); + var deploymentId = SeedDispatchingDeployment(dbFactory); + var actor = Sys.ActorOf(ConfigPublishCoordinator.Props(dbFactory)); + + actor.Tell(new DispatchDeployment(deploymentId, TestRevision, CorrelationId.NewId())); + + // Wait for seal. + AwaitAssert(() => + { + using var db = dbFactory.CreateDbContext(); + db.Deployments.Single().Status.ShouldBe(DeploymentStatus.Sealed); + }, duration: TimeSpan.FromSeconds(3)); + + // Now send a late ApplyAck for the just-sealed deployment. Should be a no-op — neither + // crash the actor nor modify the row. We give it a beat and re-check the status. + actor.Tell(new ApplyAck(deploymentId, NodeId.Parse("ghost-node"), + ApplyAckOutcome.Applied, null, CorrelationId.NewId())); + + ExpectNoMsg(TimeSpan.FromMilliseconds(250)); + using var db = dbFactory.CreateDbContext(); + db.Deployments.Single().Status.ShouldBe(DeploymentStatus.Sealed); + } + + private static DeploymentId SeedDispatchingDeployment( + Microsoft.EntityFrameworkCore.IDbContextFactory dbFactory) + { + var id = DeploymentId.NewId(); + using var db = dbFactory.CreateDbContext(); + db.Deployments.Add(new Configuration.Entities.Deployment + { + DeploymentId = id.Value, + RevisionHash = TestRevision.Value, + Status = DeploymentStatus.Dispatching, + CreatedBy = "test", + }); + db.SaveChanges(); + return id; + } +}