feat(controlplane): ConfigPublishCoordinator happy path with NodeDeploymentState seeding

This commit is contained in:
Joseph Doherty
2026-05-26 04:53:29 -04:00
parent ef683f5073
commit 62e12dab95
2 changed files with 241 additions and 0 deletions

View File

@@ -0,0 +1,165 @@
using Akka.Actor;
using Akka.Cluster;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event;
using Microsoft.EntityFrameworkCore;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
namespace ZB.MOM.WW.OtOpcUa.ControlPlane.Coordinators;
/// <summary>
/// Admin-role cluster singleton that drives a deployment through its lifecycle: dispatches a
/// <see cref="DispatchDeployment"/> over DistributedPubSub on the <c>deployments</c> topic, gathers
/// per-node <see cref="ApplyAck"/> replies, and seals the deployment when every expected node
/// has acked Applied. Per-node ACKs are persisted in <c>NodeDeploymentState</c> so a failover of
/// this singleton (Task 31) can recover in-flight state from the DB.
///
/// Discovery of the "expected ACK set" comes from <c>Akka.Cluster.State.Members</c> filtered by
/// the <c>driver</c> role — the DB does not own per-node role assignment.
/// </summary>
public sealed class ConfigPublishCoordinator : ReceiveActor
{
public const string DeploymentsTopic = "deployments";
private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory;
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly Dictionary<NodeId, ApplyAckOutcome> _acks = new();
private DeploymentId? _current;
private HashSet<NodeId> _expectedAcks = new();
public static Props Props(IDbContextFactory<OtOpcUaConfigDbContext> dbFactory) =>
Akka.Actor.Props.Create(() => new ConfigPublishCoordinator(dbFactory));
public ConfigPublishCoordinator(IDbContextFactory<OtOpcUaConfigDbContext> dbFactory)
{
_dbFactory = dbFactory;
Receive<DispatchDeployment>(HandleDispatch);
Receive<ApplyAck>(HandleAck);
}
private void HandleDispatch(DispatchDeployment msg)
{
_current = msg.DeploymentId;
_acks.Clear();
_expectedAcks = DiscoverDriverNodes();
// Seed NodeDeploymentState rows so a failover knows which nodes were expected to ack.
using (var db = _dbFactory.CreateDbContext())
{
foreach (var node in _expectedAcks)
{
db.NodeDeploymentStates.Add(new NodeDeploymentState
{
NodeId = node.Value,
DeploymentId = msg.DeploymentId.Value,
Status = NodeDeploymentStatus.Applying,
});
}
UpdateDeploymentStatus(db, msg.DeploymentId, DeploymentStatus.AwaitingApplyAcks);
db.SaveChanges();
}
DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(DeploymentsTopic, msg));
if (_expectedAcks.Count == 0)
{
// No driver-role members. Seal immediately — the alternative is hanging forever
// waiting for ACKs that will never come.
_log.Warning("DispatchDeployment {Id}: no driver-role members in cluster; sealing empty",
msg.DeploymentId);
SealDeployment();
}
}
private void HandleAck(ApplyAck msg)
{
if (_current is null || msg.DeploymentId != _current.Value)
{
_log.Debug("Discarding stale ApplyAck for {Id} (current={Current})",
msg.DeploymentId, _current);
return;
}
_acks[msg.NodeId] = msg.Outcome;
PersistNodeAck(msg);
if (_acks.Count < _expectedAcks.Count) return;
if (_acks.Values.All(o => o == ApplyAckOutcome.Applied))
SealDeployment();
else
MarkPartiallyFailed();
}
private void PersistNodeAck(ApplyAck msg)
{
using var db = _dbFactory.CreateDbContext();
var row = db.NodeDeploymentStates
.FirstOrDefault(x => x.NodeId == msg.NodeId.Value && x.DeploymentId == msg.DeploymentId.Value);
if (row is null) return;
row.Status = msg.Outcome == ApplyAckOutcome.Applied
? NodeDeploymentStatus.Applied
: NodeDeploymentStatus.Failed;
row.AppliedAtUtc = DateTime.UtcNow;
row.FailureReason = msg.FailureReason;
db.SaveChanges();
}
private void SealDeployment()
{
if (_current is null) return;
using var db = _dbFactory.CreateDbContext();
UpdateDeploymentStatus(db, _current.Value, DeploymentStatus.Sealed, sealNow: true);
db.SaveChanges();
_log.Info("Deployment {Id} sealed (acks={Count})", _current.Value, _acks.Count);
ResetForNext();
}
private void MarkPartiallyFailed()
{
if (_current is null) return;
using var db = _dbFactory.CreateDbContext();
UpdateDeploymentStatus(db, _current.Value, DeploymentStatus.PartiallyFailed);
db.SaveChanges();
_log.Warning("Deployment {Id} partially failed; acks={Acks}", _current.Value,
string.Join(",", _acks.Select(kv => $"{kv.Key.Value}={kv.Value}")));
ResetForNext();
}
private void ResetForNext()
{
_current = null;
_expectedAcks.Clear();
_acks.Clear();
}
private static void UpdateDeploymentStatus(
OtOpcUaConfigDbContext db, DeploymentId id, DeploymentStatus status, bool sealNow = false)
{
var d = db.Deployments.FirstOrDefault(x => x.DeploymentId == id.Value);
if (d is null) return;
d.Status = status;
if (sealNow) d.SealedAtUtc = DateTime.UtcNow;
}
private HashSet<NodeId> DiscoverDriverNodes()
{
var cluster = Akka.Cluster.Cluster.Get(Context.System);
var nodes = new HashSet<NodeId>();
foreach (var member in cluster.State.Members)
{
if (member.Status is not (MemberStatus.Up or MemberStatus.Joining)) continue;
if (!member.Roles.Contains("driver")) continue;
var host = member.Address.Host;
if (string.IsNullOrWhiteSpace(host)) continue;
nodes.Add(NodeId.Parse(host));
}
return nodes;
}
}