feat(runtime): DriverHostActor state machine with PreStart recovery + DispatchDeployment + stale fallback

This commit is contained in:
Joseph Doherty
2026-05-26 05:02:42 -04:00
parent ea6f972e96
commit ed130135ca
5 changed files with 521 additions and 0 deletions

View File

@@ -0,0 +1,284 @@
using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event;
using Microsoft.EntityFrameworkCore;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
using CommonsNodeId = ZB.MOM.WW.OtOpcUa.Commons.Types.NodeId;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
/// <summary>
/// Per-node supervisor that receives <see cref="DispatchDeployment"/> from the admin-role
/// coordinator and applies the deployment locally. A one-off bootstrap phase runs in
/// <c>PreStart</c>; afterwards the actor is always in one of three Become states
/// (<c>Steady</c>, <c>Applying</c>, <c>Stale</c>):
///
/// <list type="bullet">
///   <item><c>Bootstrapping</c> — PreStart only (a phase, not a Become state). Reads the latest
///         <see cref="NodeDeploymentState"/> for self and chooses the next state.</item>
///   <item><c>Steady(rev)</c> — caught up. Idempotent on same-rev dispatch (immediate
///         <c>ApplyAckOutcome.Applied</c> ACK). New rev → transitions to <c>Applying</c>.</item>
///   <item><c>Applying(id)</c> — applying a delta. Duplicates of the in-flight deployment are
///         dropped; dispatches for other deployments are buffered and re-delivered once the
///         actor is back in <c>Steady</c>.</item>
///   <item><c>Stale</c> — ConfigDb unreachable on bootstrap. A periodic timer retries the
///         connection; dispatches are ignored (and not ACKed) in the meantime.</item>
/// </list>
///
/// Children (DriverInstance/VirtualTag/etc.) are spawned in Phase 6 follow-up tasks (41-44).
/// For now the dispatch handler treats the apply as a no-op and writes the ACK back.
/// </summary>
public sealed class DriverHostActor : ReceiveActor, IWithTimers
{
    /// <summary>
    /// DistributedPubSub topic this actor subscribes to in <see cref="PreStart"/>. It is also
    /// the topic ACKs are published to when no coordinator reference was supplied.
    /// </summary>
    public const string DeploymentsTopic = "deployments";

    /// <summary>Period of the ConfigDb reconnect timer that runs while the actor is Stale.</summary>
    public static readonly TimeSpan ReconnectInterval = TimeSpan.FromSeconds(30);

    // Key of the periodic reconnect timer; shared by Stale() and TryRecoverFromStale().
    private const string RetryTimerKey = "retry-db";

    private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory;
    private readonly CommonsNodeId _localNode;
    private readonly IActorRef? _coordinatorOverride;
    private readonly ILoggingAdapter _log = Context.GetLogger();

    // Dispatches that arrived while another deployment was being applied. They are drained
    // back into the mailbox when ApplyAndAck returns the actor to Steady.
    private readonly Queue<DispatchDeployment> _deferredDispatches = new();

    // Revision this node considers itself caught up to; null when nothing has been applied/recovered.
    private RevisionHash? _currentRevision;

    // Deployment currently being applied; null outside ApplyAndAck.
    private DeploymentId? _applyingDeploymentId;

    /// <summary>
    /// Timer scheduler required by <see cref="IWithTimers"/>. It is expected to be assigned by
    /// the Akka runtime before <see cref="PreStart"/> runs (hence the <c>null!</c> initialiser).
    /// </summary>
    public ITimerScheduler Timers { get; set; } = null!;

    /// <summary>Singleton timer message that triggers a ConfigDb reconnect attempt while Stale.</summary>
    public sealed class RetryConfigDbConnection
    {
        public static readonly RetryConfigDbConnection Instance = new();

        private RetryConfigDbConnection() { }
    }

    /// <summary>Creates the <see cref="Akka.Actor.Props"/> for a <see cref="DriverHostActor"/>.</summary>
    /// <param name="dbFactory">Factory used to open a short-lived ConfigDb context per operation.</param>
    /// <param name="localNode">Identity of the node this actor supervises.</param>
    /// <param name="coordinator">
    /// Optional direct reference ACKs are sent to. When <c>null</c>, ACKs are published to
    /// <see cref="DeploymentsTopic"/> through DistributedPubSub instead.
    /// </param>
    public static Props Props(
        IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
        CommonsNodeId localNode,
        IActorRef? coordinator = null) =>
        Akka.Actor.Props.Create(() => new DriverHostActor(dbFactory, localNode, coordinator));

    /// <summary>Stores the dependencies and installs <c>Steady</c> as the initial behaviour.</summary>
    /// <param name="dbFactory">Factory used to open a short-lived ConfigDb context per operation.</param>
    /// <param name="localNode">Identity of the node this actor supervises.</param>
    /// <param name="coordinator">Optional direct ACK target; see <see cref="Props"/>.</param>
    public DriverHostActor(
        IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
        CommonsNodeId localNode,
        IActorRef? coordinator)
    {
        _dbFactory = dbFactory;
        _localNode = localNode;
        _coordinatorOverride = coordinator;

        // Default behavior is Steady — PreStart may flip to Stale or replay an orphan apply.
        Become(Steady);
    }

    /// <summary>Subscribes to <see cref="DeploymentsTopic"/> and then runs the bootstrap recovery.</summary>
    protected override void PreStart()
    {
        // Subscribe to deployments topic so the coordinator's broadcast lands here.
        DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(DeploymentsTopic, Self));
        Bootstrap();
    }

    /// <summary>
    /// Chooses the initial state from this node's most recent <see cref="NodeDeploymentState"/> row:
    /// <c>Applied</c> → Steady at that revision; <c>Applying</c> (orphan from a crash) → replay the
    /// apply; anything else → Steady at the revision of the most recent Applied row, if any.
    /// Any exception thrown while doing so sends the actor to Stale.
    /// </summary>
    private void Bootstrap()
    {
        try
        {
            using var db = _dbFactory.CreateDbContext();

            var latest = db.NodeDeploymentStates
                .Where(s => s.NodeId == _localNode.Value)
                .OrderByDescending(s => s.StartedAtUtc)
                .Select(s => new { s.DeploymentId, s.Status, s.StartedAtUtc })
                .FirstOrDefault();

            if (latest is null)
            {
                _log.Info("DriverHost {Node}: no prior deployments; entering Steady (no revision)", _localNode);
                Become(Steady);
                return;
            }

            var deployment = db.Deployments
                .AsNoTracking()
                .FirstOrDefault(d => d.DeploymentId == latest.DeploymentId);

            // NOTE(review): a malformed hash makes Parse throw, which the catch below reports as
            // "ConfigDb unreachable" — confirm whether that classification is intended.
            var revision = deployment is null
                ? (RevisionHash?)null
                : RevisionHash.Parse(deployment.RevisionHash);

            switch (latest.Status)
            {
                case NodeDeploymentStatus.Applied:
                    _currentRevision = revision;
                    _log.Info("DriverHost {Node}: recovered Applied state at rev {Rev}", _localNode, revision);
                    Become(Steady);
                    break;

                case NodeDeploymentStatus.Applying:
                    _log.Warning("DriverHost {Node}: found orphan Applying row for deployment {Id}; replaying",
                        _localNode, latest.DeploymentId);
                    if (revision is not null)
                    {
                        // The original correlation id is not persisted, so the replay gets a fresh one.
                        ApplyAndAck(new DeploymentId(latest.DeploymentId), revision.Value, CorrelationId.NewId());
                    }
                    else
                    {
                        // Deployment row is gone — nothing to replay.
                        Become(Steady);
                    }
                    break;

                case NodeDeploymentStatus.Failed:
                default:
                    // The newest row did not succeed, so the node is still at whatever it last
                    // applied successfully; recover that so same-rev dispatches stay idempotent.
                    _currentRevision = LoadLastAppliedRevision(db);
                    _log.Info("DriverHost {Node}: prior deployment {Id} failed; entering Steady at last known rev",
                        _localNode, latest.DeploymentId);
                    Become(Steady);
                    break;
            }
        }
        catch (Exception ex)
        {
            _log.Warning(ex, "DriverHost {Node}: ConfigDb unreachable on bootstrap; entering Stale", _localNode);
            Become(Stale);
        }
    }

    /// <summary>
    /// Returns the revision of this node's most recent <c>Applied</c> row, or <c>null</c> when the
    /// node has no Applied row or the referenced deployment no longer exists.
    /// </summary>
    /// <param name="db">Open ConfigDb context to query.</param>
    private RevisionHash? LoadLastAppliedRevision(OtOpcUaConfigDbContext db)
    {
        var lastApplied = db.NodeDeploymentStates
            .Where(s => s.NodeId == _localNode.Value && s.Status == NodeDeploymentStatus.Applied)
            .OrderByDescending(s => s.StartedAtUtc)
            .Select(s => new { s.DeploymentId })
            .FirstOrDefault();

        if (lastApplied is null)
        {
            return null;
        }

        var deployment = db.Deployments
            .AsNoTracking()
            .FirstOrDefault(d => d.DeploymentId == lastApplied.DeploymentId);

        return deployment is null
            ? (RevisionHash?)null
            : RevisionHash.Parse(deployment.RevisionHash);
    }

    /// <summary>Steady behaviour: dispatches are applied, or ACKed immediately when already at that revision.</summary>
    private void Steady()
    {
        Receive<DispatchDeployment>(HandleDispatchFromSteady);
        IgnoreTopicNoise();
    }

    /// <summary>
    /// Applying behaviour: drops duplicates of the in-flight deployment and buffers every other
    /// dispatch until <see cref="ApplyAndAck"/> returns the actor to Steady.
    /// </summary>
    private void Applying()
    {
        Receive<DispatchDeployment>(msg =>
        {
            if (_applyingDeploymentId is not null && msg.DeploymentId == _applyingDeploymentId.Value)
            {
                _log.Debug("DriverHost {Node}: duplicate DispatchDeployment for in-flight {Id}; ignoring",
                    _localNode, msg.DeploymentId);
                return;
            }

            _log.Info("DriverHost {Node}: dispatch for {Id} received while still applying {Cur}; deferring",
                _localNode, msg.DeploymentId, _applyingDeploymentId);

            // Buffer instead of re-sending to Self right away: an immediate re-send would be
            // handled by this same state again and spin until the apply completes.
            _deferredDispatches.Enqueue(msg);
        });
        IgnoreTopicNoise();
    }

    /// <summary>
    /// Stale behaviour: ignores dispatches (no ACK is sent) and starts the periodic timer that
    /// drives <see cref="TryRecoverFromStale"/>.
    /// </summary>
    private void Stale()
    {
        Receive<DispatchDeployment>(_ =>
        {
            _log.Warning("DriverHost {Node}: ignoring DispatchDeployment while Stale (DB unreachable)", _localNode);
        });
        Receive<RetryConfigDbConnection>(_ => TryRecoverFromStale());
        IgnoreTopicNoise();

        Timers.StartPeriodicTimer(RetryTimerKey, RetryConfigDbConnection.Instance, ReconnectInterval);
    }

    /// <summary>
    /// Registers no-op handlers for messages that reach this actor only as a side effect of its
    /// topic subscription. Must be called while a behaviour is being configured.
    /// </summary>
    private void IgnoreTopicNoise()
    {
        Receive<SubscribeAck>(_ => { /* PubSub ack */ });

        // SendAck publishes ApplyAck to DeploymentsTopic when no coordinator ref was supplied,
        // and this actor is itself subscribed to that topic, so ACKs echo back here.
        Receive<ApplyAck>(_ => { /* ACK echo from the shared topic */ });
    }

    /// <summary>Handles a dispatch while Steady: immediate ACK when the revision matches, otherwise apply.</summary>
    /// <param name="msg">The dispatch received from the coordinator.</param>
    private void HandleDispatchFromSteady(DispatchDeployment msg)
    {
        if (_currentRevision is { } cur && cur == msg.RevisionHash)
        {
            // Idempotent — already at this rev. Ack and stay Steady.
            _log.Debug("DriverHost {Node}: dispatch {Id} matches current rev {Rev}; immediate ACK",
                _localNode, msg.DeploymentId, msg.RevisionHash);
            SendAck(msg.DeploymentId, ApplyAckOutcome.Applied, failureReason: null, msg.CorrelationId);
            return;
        }

        ApplyAndAck(msg.DeploymentId, msg.RevisionHash, msg.CorrelationId);
    }

    /// <summary>
    /// Applies a deployment (currently a no-op), persists the Applying → Applied/Failed
    /// transitions, ACKs the outcome, and always returns to Steady, re-delivering any dispatches
    /// that were buffered meanwhile.
    /// </summary>
    /// <param name="deploymentId">Deployment being applied.</param>
    /// <param name="revision">Revision the node is at once the apply succeeds.</param>
    /// <param name="correlation">Correlation id echoed back in the ACK.</param>
    private void ApplyAndAck(DeploymentId deploymentId, RevisionHash revision, CorrelationId correlation)
    {
        // Remembered so a failed apply does not leave the node claiming the new revision.
        var previousRevision = _currentRevision;

        _applyingDeploymentId = deploymentId;
        Become(Applying);

        // Persist Applying row (idempotent on PK).
        UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Applying, failureReason: null);

        try
        {
            // Future: dispatch ApplyDelta to children, wait for acks. For Task 37/38, just no-op.
            _currentRevision = revision;
            UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Applied, failureReason: null);
            SendAck(deploymentId, ApplyAckOutcome.Applied, failureReason: null, correlation);
            _log.Info("DriverHost {Node}: applied deployment {Id} (rev {Rev})", _localNode, deploymentId, revision);
        }
        catch (Exception ex)
        {
            _currentRevision = previousRevision;
            UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Failed, ex.Message);
            SendAck(deploymentId, ApplyAckOutcome.Failed, ex.Message, correlation);
            _log.Error(ex, "DriverHost {Node}: apply of {Id} failed", _localNode, deploymentId);
        }
        finally
        {
            _applyingDeploymentId = null;
            Become(Steady);

            // Re-deliver in arrival order now that Steady can handle them.
            while (_deferredDispatches.TryDequeue(out var deferred))
            {
                Self.Tell(deferred);
            }
        }
    }

    /// <summary>
    /// One reconnect attempt while Stale. On success the retry timer is cancelled, the latest
    /// sealed deployment (if any) is adopted as the current revision and recorded as Applied,
    /// and the actor becomes Steady. On any exception the actor stays Stale and the timer keeps
    /// running.
    /// </summary>
    private void TryRecoverFromStale()
    {
        try
        {
            using var db = _dbFactory.CreateDbContext();

            var latestSealed = db.Deployments
                .AsNoTracking()
                .Where(d => d.Status == DeploymentStatus.Sealed)
                .OrderByDescending(d => d.SealedAtUtc)
                .Select(d => new { d.DeploymentId, d.RevisionHash })
                .FirstOrDefault();

            // Do everything that can throw BEFORE cancelling the timer: if the hash fails to
            // parse, the catch below promises a retry, so the timer must still be running.
            var recoveredRevision = latestSealed is null
                ? (RevisionHash?)null
                : RevisionHash.Parse(latestSealed.RevisionHash);

            _log.Info("DriverHost {Node}: ConfigDb back; recovering from Stale", _localNode);
            Timers.Cancel(RetryTimerKey);

            if (latestSealed is not null)
            {
                _currentRevision = recoveredRevision;
                UpsertNodeDeploymentState(new DeploymentId(latestSealed.DeploymentId),
                    NodeDeploymentStatus.Applied, failureReason: null);
            }

            Become(Steady);
        }
        catch (Exception ex)
        {
            _log.Debug(ex, "DriverHost {Node}: still Stale; will retry in {Interval}", _localNode, ReconnectInterval);
        }
    }

    /// <summary>
    /// Best-effort insert-or-update of this node's <see cref="NodeDeploymentState"/> row for the
    /// given deployment. Failures are logged as warnings and not propagated to the caller.
    /// </summary>
    /// <param name="deploymentId">Deployment the row belongs to.</param>
    /// <param name="status">Status to record.</param>
    /// <param name="failureReason">Failure text to record; <c>null</c> clears any previous reason.</param>
    private void UpsertNodeDeploymentState(DeploymentId deploymentId, NodeDeploymentStatus status, string? failureReason)
    {
        try
        {
            using var db = _dbFactory.CreateDbContext();

            var existing = db.NodeDeploymentStates.FirstOrDefault(
                x => x.NodeId == _localNode.Value && x.DeploymentId == deploymentId.Value);

            if (existing is null)
            {
                // NOTE(review): StartedAtUtc is not set here although Bootstrap orders by it —
                // presumably the entity or the database supplies a default; confirm.
                db.NodeDeploymentStates.Add(new NodeDeploymentState
                {
                    NodeId = _localNode.Value,
                    DeploymentId = deploymentId.Value,
                    Status = status,
                    FailureReason = failureReason,
                    AppliedAtUtc = status == NodeDeploymentStatus.Applied ? DateTime.UtcNow : null,
                });
            }
            else
            {
                existing.Status = status;
                existing.FailureReason = failureReason;
                if (status == NodeDeploymentStatus.Applied)
                {
                    existing.AppliedAtUtc = DateTime.UtcNow;
                }
            }

            db.SaveChanges();
        }
        catch (Exception ex)
        {
            _log.Warning(ex, "DriverHost {Node}: failed to upsert NodeDeploymentState for {Id}", _localNode, deploymentId);
        }
    }

    /// <summary>
    /// Sends an <see cref="ApplyAck"/> for this node, either directly to the coordinator
    /// reference supplied at construction or, when there is none, by publishing it to
    /// <see cref="DeploymentsTopic"/>.
    /// </summary>
    /// <param name="deploymentId">Deployment being acknowledged.</param>
    /// <param name="outcome">Outcome reported to the coordinator.</param>
    /// <param name="failureReason">Failure text, or <c>null</c> on success.</param>
    /// <param name="correlation">Correlation id of the dispatch being answered.</param>
    private void SendAck(DeploymentId deploymentId, ApplyAckOutcome outcome, string? failureReason, CorrelationId correlation)
    {
        var ack = new ApplyAck(deploymentId, _localNode, outcome, failureReason, correlation);

        if (_coordinatorOverride is not null)
        {
            _coordinatorOverride.Tell(ack);
        }
        else
        {
            // No direct coordinator handle — publish back through DistributedPubSub so the
            // singleton routes it.
            // NOTE(review): this is the same topic the hosts subscribe to, so every host also
            // receives the ACK (ignored in each state). Confirm which topic the coordinator
            // actually listens on; a dedicated ACK topic would avoid the fan-out.
            DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(DeploymentsTopic, ack));
        }
    }
}