using System.Diagnostics;
using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.OtOpcUa.Commons.Engines;
using ZB.MOM.WW.OtOpcUa.Commons.Interfaces;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
using ZB.MOM.WW.OtOpcUa.Commons.Observability;
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
using ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
using ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms;
using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
using CommonsNodeId = ZB.MOM.WW.OtOpcUa.Commons.Types.NodeId;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
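/// <summary>
/// Per-node supervisor that receives deployment dispatches (<c>DispatchDeployment</c>) from the
/// admin-role coordinator over the <c>deployments</c> topic and applies the deployment locally.
/// A bootstrap pass runs at start-up, after which the actor lives in one of three Become states.
/// </summary>
/// <remarks>
/// <list type="bullet">
/// <item><description>Bootstrapping — PreStart only (not a Become state). Reads the latest
/// <c>NodeDeploymentState</c> for this node and chooses the next state.</description></item>
/// <item><description>Steady(rev) — caught up. Idempotent on same-rev dispatch (immediate ApplyAck.Applied).
/// A new rev transitions to Applying.</description></item>
/// <item><description>Applying(id) — applying a delta. Defers further dispatches by re-sending them to Self.</description></item>
/// <item><description>Stale — ConfigDb unreachable on bootstrap. A background reconnect loop tries to advance.</description></item>
/// </list>
/// <para>
/// Children (DriverInstance/VirtualTag/etc.) are fully wired: a DispatchDeployment drives the apply
/// path (presumably <c>ApplyAndAck</c> — confirm), which runs the spawn-plan, maintains the
/// FullName→NodeId live-value routing map, and forwards results to the OPC UA publish actor via
/// ForwardToMux.
/// </para>
/// </remarks>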
public sealed class DriverHostActor : ReceiveActor, IWithTimers
{
/// <summary>DistributedPubSub topic the coordinator broadcasts deployment dispatches on; subscribed in <c>PreStart</c>.</summary>
public const string DeploymentsTopic = "deployments";
/// <summary>DistributedPubSub topic name for deployment acknowledgements (its publisher is not in this chunk — confirm usage).</summary>
public const string DeploymentAcksTopic = "deployment-acks";
/// <summary>DistributedPubSub topic carrying AdminUI driver Reconnect/Restart commands; subscribed in <c>PreStart</c>.</summary>
public const string DriverControlTopic = ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin.DriverControlTopic.Name;
/// <summary>Retry period of the ConfigDb reconnect loop (presumably the Stale-state retry timer; its consumer is not in this chunk).</summary>
public static readonly TimeSpan ReconnectInterval = TimeSpan.FromSeconds(30);
/// <summary>Publishing interval handed to each driver's SubscribeBulk pass after an apply.</summary>
private static readonly TimeSpan SubscriptionPublishingInterval = TimeSpan.FromSeconds(1);
// Collaborators injected through the constructor. Optional services are replaced there by their
// Null* implementations; the nullable actor refs stay null when not wired.
private readonly IDbContextFactory _dbFactory;
private readonly CommonsNodeId _localNode;
// The ctor's `coordinator` argument; null when none was supplied.
private readonly IActorRef? _coordinatorOverride;
private readonly IDriverFactory _driverFactory;
private readonly IReadOnlySet _localRoles;
private readonly IActorRef? _dependencyMux;
private readonly IActorRef? _opcUaPublishActor;
private readonly IDriverHealthPublisher _healthPublisher;
private readonly IVirtualTagEvaluator _virtualTagEvaluator;
private readonly IHistoryWriter _historyWriter;
// Test seams: when non-null these replace the spawned VirtualTag / ScriptedAlarm host children.
private readonly IActorRef? _virtualTagHostOverride;
private readonly ILoggerFactory _loggerFactory;
// Required (together with a publish actor) to spawn the real ScriptedAlarm host.
private readonly ScriptRootLogger? _scriptRootLogger;
private readonly IActorRef? _scriptedAlarmHostOverride;
// Akka logging adapter for this actor.
private readonly ILoggingAdapter _log = Context.GetLogger();
/// <summary>
/// The single VirtualTag-host child that spawns/reconciles Equipment-namespace
/// VirtualTagActors and bridges their results onto the OPC UA publish actor. Set in
/// <c>SpawnVirtualTagHost</c> (to the test override, or a spawned host when an OPC UA publish actor
/// is wired); receives <c>ApplyVirtualTags</c> from <c>PushDesiredSubscriptions</c>. Null when neither applies.
/// </summary>
private IActorRef? _virtualTagHost;
/// <summary>
/// The single ScriptedAlarm-host child that owns the per-node
/// <c>ScriptedAlarmEngine</c>, feeds it live tag values, and bridges its emissions onto the
/// OPC UA publish actor + the cluster alerts topic. Set in <c>SpawnScriptedAlarmHost</c>
/// alongside the VirtualTag host (a real spawn requires both an OPC UA publish actor and a
/// <c>ScriptRootLogger</c>); receives <c>ApplyScriptedAlarms</c> from <c>PushDesiredSubscriptions</c>.
/// </summary>
private IActorRef? _scriptedAlarmHost;
/// <summary>Revision this node is caught up at; Bootstrap sets it from a recovered Applied row. Null = no revision known.</summary>
private RevisionHash? _currentRevision;
/// <summary>Deployment currently being applied; the Applying behavior compares incoming dispatches against it to drop duplicates.</summary>
private DeploymentId? _applyingDeploymentId;
// Spawned driver children, ordinal-keyed (presumably by DriverInstanceId — confirm against SpawnChild).
private readonly Dictionary _children = new(StringComparer.Ordinal);
// Monotonic counter feeding the child actor-name suffix (see ActorNameFor / SpawnChild). Single-
// threaded actor, so a plain increment is safe; it only ever grows, guaranteeing a unique name per
// spawn so a restart's respawn never collides with the still-terminating old child.
private long _childSpawnGeneration;
/// <summary>
/// Driver live-value routing map: (DriverInstanceId, FullName) → folder-scoped equipment
/// NodeId(s). Rebuilt every apply by <c>PushDesiredSubscriptions</c> from the
/// composition's EquipmentTags (mirroring VirtualTagHostActor._nodeIdByVtag), and
/// resolved in <c>ForwardToMux</c> so a driver value published by wire-ref FullName lands
/// on the variable's actual folder-scoped NodeId. A set because the same driver ref can back
/// several equipment variables (e.g. identical machines sharing a register), and the per-apply
/// rebuild dedups by NodeId.
/// </summary>
private readonly Dictionary<(string DriverInstanceId, string FullName), HashSet> _nodeIdByDriverRef = new();
/// <summary>
/// Inverse of <c>_nodeIdByDriverRef</c>: folder-scoped equipment NodeId →
/// (DriverInstanceId, FullName). Rebuilt every apply by <c>PushDesiredSubscriptions</c>
/// from the same EquipmentTags pass, and resolved by <c>HandleRouteNodeWrite</c> so an
/// inbound operator write targeting an equipment variable's NodeId is forwarded to the owning
/// driver child as a write of its wire-ref FullName. Each NodeId maps to exactly one driver
/// ref (a variable is backed by a single driver attribute), so this is a flat 1:1 map (the forward
/// map fans out 1:N because one ref can back several variables).
/// </summary>
private readonly Dictionary _driverRefByNodeId =
new(StringComparer.Ordinal);
/// <summary>
/// (DriverInstanceId, FullName = alarm ConditionId / AlarmFullReference) → folder-scoped condition NodeId(s).
/// Built from EquipmentTags whose plan carries Alarm, alongside the value maps; resolves a native
/// alarm transition (presumably in <c>ForwardNativeAlarm</c>) to the materialised Part 9 condition node(s).
/// Alarm tags are conditions, not value variables, so they are kept OUT of the value maps +
/// value-subscription set.
/// </summary>
private readonly Dictionary<(string DriverInstanceId, string FullName), HashSet> _alarmNodeIdByDriverRef = new();
/// <summary>
/// Inverse of <c>_alarmNodeIdByDriverRef</c>: folder-scoped condition NodeId →
/// (DriverInstanceId, FullName = alarm ConditionId/AlarmFullReference). Built in the SAME apply
/// pass from the alarm-bearing EquipmentTags, and resolved by <c>HandleRouteNativeAlarmAck</c>
/// so an inbound OPC UA acknowledge of a native condition (resolved to the condition's NodeId by the
/// node manager) is forwarded to the owning driver child as an acknowledge of its wire-ref
/// FullName. Each condition NodeId maps to exactly one driver ref (a condition is backed by a
/// single driver alarm), so this is a flat 1:1 map (the forward map fans out 1:N because one ref can
/// back several conditions on identical machines).
/// </summary>
private readonly Dictionary _driverRefByAlarmNodeId =
new(StringComparer.Ordinal);
/// <summary>
/// Condition NodeId → (EquipmentId, tag Name, OPC UA alarm type, HistorizeToAveva) for building
/// the AlarmTransitionEvent fan-out. Built in the same PushDesiredSubscriptions alarm branch.
/// HistorizeToAveva (bool?, null ⇒ historize) is the native per-condition opt-out parsed from
/// TagConfig.alarm.historizeToAveva; it is threaded onto the transition so the
/// HistorianAdapterActor's "is not false" gate suppresses the durable AVEVA row only on an
/// explicit false (mirroring the scripted-alarm opt-out).
/// </summary>
private readonly Dictionary _alarmMetaByNodeId =
new(StringComparer.Ordinal);
/// <summary>
/// Derives a full Part 9 condition snapshot from each native alarm transition delta,
/// tracking per-condition-NodeId prior state. Reset on every apply (via its clear/reset member —
/// the exact call is outside this chunk) alongside the value maps so stale condition state never
/// leaks across redeploys.
/// </summary>
private readonly NativeAlarmProjector _nativeAlarmProjector = new();
/// <summary>
/// The composition from the most-recent apply (set at the END of the apply pass — presumably
/// <c>PushDesiredSubscriptions</c>; confirm). Discovered-node injection
/// (<c>HandleDiscoveredNodes</c>) reads it to resolve the equipment bound to a driver (from the
/// composition's EquipmentNodes whose DriverInstanceId matches, UNION the authored
/// EquipmentTags for that driver — so a driver with zero authored tags can still graft onto an
/// equipment bound via EquipmentNode.DriverInstanceId) and to recompute the authored value + alarm
/// subscription sets when merging FixedTree refs. Null until the first apply — a
/// <c>DriverInstanceActor.DiscoveredNodesReady</c> arriving before any apply is ignored.
/// </summary>
private AddressSpaceComposition? _lastComposition;
/// <summary>
/// The most-recent discovered-injection plan(s) per driver instance, cached so the redeploy
/// re-inject tail can re-apply the live graft after an address-space rebuild without re-running discovery.
/// Keyed by DriverInstanceId at the OUTER level, then by EquipmentId at the INNER level (driver → (equipment
/// → plan)). Today only the single-equipment case is populated, so the inner map always has exactly one
/// entry; the inner map is shaped per-equipment now so the follow-up multi-device-partition task can hold
/// multiple (equipmentId → plan) entries per driver without reshaping this cache. Inner dict is mutable
/// (the redeploy tail drops stale per-equipment entries in place); both levels are Ordinal-keyed.
/// Last-writer-wins on a re-discovery (the whole inner map is replaced).
/// </summary>
private readonly Dictionary> _discoveredByDriver =
new(StringComparer.Ordinal);
/// <summary>
/// Cached local <c>RedundancyRole</c> from the latest redundancy-state snapshot (handled by
/// <c>OnRedundancyStateChanged</c>); null = unknown until the first snapshot arrives, or no local
/// node match. The inbound write gate in <c>HandleRouteNodeWrite</c> reuses this signal — the SAME
/// one the scripted-alarm emit gate uses (ScriptedAlarmHostActor._localRole): only the Primary
/// services writes, default-allow while unknown so single-node deploys + the boot window never
/// reject (a node is the sole Primary until told otherwise).
/// </summary>
private RedundancyRole? _localRole;
/// <summary>
/// Cached cluster DistributedPubSub mediator, resolved once in <c>PreStart</c> (on the
/// actor thread) and reused for the Primary-gated native-alarm alerts fan-out in
/// <c>ForwardNativeAlarm</c> instead of re-resolving it per-publish. Mirrors
/// ScriptedAlarmHostActor._mediator. Null-forgiven because it is assigned in PreStart.
/// </summary>
private IActorRef _mediator = null!;
/// <summary>
/// Routes an inbound operator write (resolved from the OPC UA node-manager side, Task 11) to the
/// owning driver child. <c>HandleRouteNodeWrite</c> gates on the local node being the
/// driver Primary, resolves the <c>_driverRefByNodeId</c> reverse map, and Asks the child a
/// write request carrying the driver-side wire-ref FullName. The reply is a <c>NodeWriteResult</c>.
/// </summary>
/// <param name="NodeId">The folder-scoped equipment-variable NodeId the operator wrote to.</param>
/// <param name="Value">The value to write (the driver coerces it to the attribute's data type).</param>
public sealed record RouteNodeWrite(string NodeId, object? Value);
/// <summary>Reply to <c>RouteNodeWrite</c>: the outcome of forwarding the write to the driver
/// (or a gate/lookup failure that never reached the driver).</summary>
/// <param name="Success">True when the driver accepted the write.</param>
/// <param name="Reason">Failure detail when <c>Success</c> is false; null on success.</param>
public sealed record NodeWriteResult(bool Success, string? Reason);
/// <summary>
/// Routes an inbound native-condition acknowledge to the owning driver child. The host wires the
/// OPC UA node manager's NativeAlarmAckRouter to Tell this in when a client Acknowledges a
/// NATIVE Part 9 condition; <c>HandleRouteNativeAlarmAck</c> applies the same Primary gate the
/// inbound write path uses, resolves the <c>_driverRefByAlarmNodeId</c> inverse map, and Tells
/// the owning driver child an acknowledge message carrying the principal — fire-and-forget (the
/// Part 9 ack already committed the local condition state). Deliberately decoupled from the
/// OpcUaServer NativeAlarmAck type: the host maps NativeAlarmAck → this at the wiring boundary so
/// Runtime does not depend on the OPC UA layer.
/// </summary>
/// <param name="ConditionNodeId">The folder-scoped condition NodeId the operator acknowledged.</param>
/// <param name="Comment">Operator-supplied comment forwarded to the upstream alarm system; null when none.</param>
/// <param name="OperatorUser">The authenticated principal performing the acknowledge.</param>
public sealed record RouteNativeAlarmAck(string ConditionNodeId, string? Comment, string OperatorUser);
/// <summary>
/// Bookkeeping for one spawned driver child: the child's actor, the <c>DriverInstanceSpec</c>
/// recorded for it, and a <c>Stubbed</c> flag (its meaning is assigned by the spawn code, which is
/// not in this chunk).
/// </summary>
private sealed record ChildEntry(IActorRef Actor, DriverInstanceSpec Spec, bool Stubbed)
{
    /// <summary>Shorthand for <c>Spec.DriverType</c>.</summary>
    public string DriverType
    {
        get { return Spec.DriverType; }
    }

    /// <summary>Shorthand for <c>Spec.DriverConfig</c> — the driver config JSON held by this entry's spec.</summary>
    public string LastConfigJson
    {
        get { return Spec.DriverConfig; }
    }
}
/// <inheritdoc />
// Injected by Akka for IWithTimers actors; null! only until that assignment.
public ITimerScheduler Timers { get; set; } = null!;
/// <summary>
/// Payload-free self-message used to retry the ConfigDb connection (presumably scheduled by the
/// Stale-state reconnect loop; its sender is not in this chunk). Exposed as one shared instance.
/// </summary>
public sealed class RetryConfigDbConnection
{
    // Construction is closed off; callers use Instance.
    private RetryConfigDbConnection()
    {
    }

    /// <summary>The single shared instance of this message.</summary>
    public static readonly RetryConfigDbConnection Instance = new RetryConfigDbConnection();
}
/// <summary>Creates props for a DriverHostActor with the specified dependencies.</summary>
/// <param name="dbFactory">Database context factory for configuration database access.</param>
/// <param name="localNode">The local cluster node identifier.</param>
/// <param name="coordinator">Optional coordinator actor reference for deployment coordination.</param>
/// <param name="driverFactory">Optional driver factory; the constructor substitutes <c>NullDriverFactory.Instance</c> when null.</param>
/// <param name="localRoles">Optional set of roles assigned to the local node; an empty ordinal set when null.</param>
/// <param name="dependencyMux">Optional actor reference for dependency multiplexing.</param>
/// <param name="opcUaPublishActor">Optional actor reference for OPC UA publishing.</param>
/// <param name="healthPublisher">Optional driver-health publisher; defaults to <c>NullDriverHealthPublisher.Instance</c>
/// so test harnesses and smoke fixtures don't need to wire it.</param>
/// <param name="virtualTagEvaluator">Optional evaluator handed to the spawned <c>VirtualTagHostActor</c>'s
/// children; defaults to <c>NullVirtualTagEvaluator.Instance</c> (the dev/Mac path where no expression is
/// evaluated). Production passes the DI-resolved Roslyn evaluator.</param>
/// <param name="historyWriter">Optional sink handed to the spawned <c>VirtualTagHostActor</c> for VirtualTag
/// results whose plan opted into Historize=true; defaults to <c>NullHistoryWriter.Instance</c> (the durable
/// AVEVA sink is infra-gated, so no live-data historian write RPC exists). A deployment that binds a real
/// <c>IHistoryWriter</c> in DI overrides it.</param>
/// <param name="virtualTagHostOverride">Test seam: when supplied, this actor is used as the VirtualTag host
/// instead of spawning a real <c>VirtualTagHostActor</c> child, so tests can intercept the
/// <c>ApplyVirtualTags</c> message. Null in production (the real host is spawned).</param>
/// <param name="loggerFactory">Optional logger factory used to create the <c>EfAlarmConditionStateStore</c>'s
/// logger when spawning the ScriptedAlarm host; defaults to <c>NullLoggerFactory.Instance</c> when not provided.</param>
/// <param name="scriptRootLogger">Optional root script logger required to spawn the ScriptedAlarm host (the
/// engine + its script logging hang off it). When null the ScriptedAlarm host is left unspawned — the
/// graceful dev/None-deployment path.</param>
/// <param name="scriptedAlarmHostOverride">Test seam: when supplied, this actor is used as the ScriptedAlarm
/// host instead of spawning a real <c>ScriptedAlarmHostActor</c> child, so tests can intercept the
/// <c>ApplyScriptedAlarms</c> message. Null in production (the real host is spawned).</param>
/// <returns>Props that construct a <c>DriverHostActor</c> with exactly these arguments.</returns>
public static Props Props(
IDbContextFactory dbFactory,
CommonsNodeId localNode,
IActorRef? coordinator = null,
IDriverFactory? driverFactory = null,
IReadOnlySet? localRoles = null,
IActorRef? dependencyMux = null,
IActorRef? opcUaPublishActor = null,
IDriverHealthPublisher? healthPublisher = null,
IVirtualTagEvaluator? virtualTagEvaluator = null,
IHistoryWriter? historyWriter = null,
IActorRef? virtualTagHostOverride = null,
ILoggerFactory? loggerFactory = null,
ScriptRootLogger? scriptRootLogger = null,
IActorRef? scriptedAlarmHostOverride = null) =>
Akka.Actor.Props.Create(() => new DriverHostActor(
dbFactory, localNode, coordinator, driverFactory, localRoles, dependencyMux, opcUaPublishActor,
healthPublisher, virtualTagEvaluator, historyWriter, virtualTagHostOverride,
loggerFactory, scriptRootLogger, scriptedAlarmHostOverride));
/// <summary>Initializes a new DriverHostActor with the specified dependencies.</summary>
/// <param name="dbFactory">Database context factory for configuration database access.</param>
/// <param name="localNode">The local cluster node identifier.</param>
/// <param name="coordinator">Optional coordinator actor reference for deployment coordination (stored as-is, may be null).</param>
/// <param name="driverFactory">Optional driver factory; <c>NullDriverFactory.Instance</c> when null.</param>
/// <param name="localRoles">Optional set of roles assigned to the local node; an empty ordinal set when null.</param>
/// <param name="dependencyMux">Optional actor reference for dependency multiplexing.</param>
/// <param name="opcUaPublishActor">Optional actor reference for OPC UA publishing.</param>
/// <param name="healthPublisher">Optional driver-health publisher; defaults to <c>NullDriverHealthPublisher.Instance</c>.</param>
/// <param name="virtualTagEvaluator">Optional evaluator handed to the VirtualTag host's children;
/// defaults to <c>NullVirtualTagEvaluator.Instance</c>.</param>
/// <param name="historyWriter">Optional sink handed to the spawned <c>VirtualTagHostActor</c>
/// for historized VirtualTag results; defaults to <c>NullHistoryWriter.Instance</c>.</param>
/// <param name="virtualTagHostOverride">Test seam: when supplied, used as the VirtualTag host
/// instead of spawning a real child.</param>
/// <param name="loggerFactory">Optional logger factory used to create the
/// <c>EfAlarmConditionStateStore</c>'s logger; defaults to <c>NullLoggerFactory.Instance</c>.</param>
/// <param name="scriptRootLogger">Optional root script logger required to spawn the ScriptedAlarm
/// host; when null the host is left unspawned.</param>
/// <param name="scriptedAlarmHostOverride">Test seam: when supplied, used as the ScriptedAlarm host
/// instead of spawning a real child.</param>
public DriverHostActor(
IDbContextFactory dbFactory,
CommonsNodeId localNode,
IActorRef? coordinator,
IDriverFactory? driverFactory = null,
IReadOnlySet? localRoles = null,
IActorRef? dependencyMux = null,
IActorRef? opcUaPublishActor = null,
IDriverHealthPublisher? healthPublisher = null,
IVirtualTagEvaluator? virtualTagEvaluator = null,
IHistoryWriter? historyWriter = null,
IActorRef? virtualTagHostOverride = null,
ILoggerFactory? loggerFactory = null,
ScriptRootLogger? scriptRootLogger = null,
IActorRef? scriptedAlarmHostOverride = null)
{
_dbFactory = dbFactory;
_localNode = localNode;
_coordinatorOverride = coordinator;
// Optional services fall back to their Null* implementations so every field below is non-null.
_driverFactory = driverFactory ?? NullDriverFactory.Instance;
_localRoles = localRoles ?? new HashSet(StringComparer.Ordinal);
_dependencyMux = dependencyMux;
_opcUaPublishActor = opcUaPublishActor;
_healthPublisher = healthPublisher ?? NullDriverHealthPublisher.Instance;
_virtualTagEvaluator = virtualTagEvaluator ?? NullVirtualTagEvaluator.Instance;
_historyWriter = historyWriter ?? NullHistoryWriter.Instance;
_virtualTagHostOverride = virtualTagHostOverride;
_loggerFactory = loggerFactory ?? NullLoggerFactory.Instance;
_scriptRootLogger = scriptRootLogger;
_scriptedAlarmHostOverride = scriptedAlarmHostOverride;
// Default behavior is Steady — PreStart may flip to Stale or replay an orphan apply.
Become(Steady);
}
/// <inheritdoc />
protected override void PreStart()
{
    // Resolve the DistributedPubSub mediator exactly once, on the actor thread. Every topic
    // subscription below, and the Primary-gated native-alarm alerts fan-out in ForwardNativeAlarm,
    // reuse it (same pattern as ScriptedAlarmHostActor).
    _mediator = DistributedPubSub.Get(Context.System).Mediator;

    // Topics this actor listens on, subscribed in this order:
    //  - deployments:      the coordinator's deployment broadcast;
    //  - driver-control:   AdminUI Reconnect/Restart commands;
    //  - redundancy-state: cluster role changes, cached as this node's role. The inbound-write gate
    //    in HandleRouteNodeWrite reuses the SAME signal as the scripted-alarm emit gate, so only the
    //    Primary services operator writes (the secondary keeps state warm for failover).
    string[] topics =
    {
        DeploymentsTopic,
        DriverControlTopic,
        ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.RedundancyStateTopic,
    };
    foreach (var topic in topics)
    {
        _mediator.Tell(new Subscribe(topic, Self));
    }

    // Both hosts must exist BEFORE Bootstrap: its restore path runs PushDesiredSubscriptions, which
    // Tells ApplyVirtualTags / ApplyScriptedAlarms to them.
    SpawnVirtualTagHost();
    SpawnScriptedAlarmHost();
    Bootstrap();
}
/// <summary>
/// Sets <c>_virtualTagHost</c>, the single <c>VirtualTagHostActor</c> child that owns the
/// Equipment-namespace VirtualTagActors and bridges their results onto the OPC UA publish actor.
/// Resolution order:
/// <list type="number">
/// <item><description>a test-supplied <c>virtualTagHostOverride</c> is used as-is, so a probe can
/// intercept <c>ApplyVirtualTags</c>;</description></item>
/// <item><description>otherwise, when an OPC UA publish actor is wired, a real host is spawned as
/// <c>virtual-tag-host</c>;</description></item>
/// <item><description>otherwise (legacy ControlPlane harnesses with no OPC UA sink) the host stays null
/// and ApplyVirtualTags becomes a no-op — VirtualTags have nowhere to publish, and the real host's
/// ctor requires a non-null publish actor.</description></item>
/// </list>
/// </summary>
private void SpawnVirtualTagHost()
{
    if (_virtualTagHostOverride is { } injectedHost)
    {
        _virtualTagHost = injectedHost;
    }
    else if (_opcUaPublishActor is { } publishActor)
    {
        var hostProps = VirtualTagHostActor.Props(publishActor, _dependencyMux, _virtualTagEvaluator, _historyWriter);
        _virtualTagHost = Context.ActorOf(hostProps, "virtual-tag-host");
    }
    else
    {
        _log.Debug("DriverHost {Node}: no OPC UA publish actor wired; skipping VirtualTag host spawn", _localNode);
    }
}
/// <summary>
/// Sets <c>_scriptedAlarmHost</c>, the single <c>ScriptedAlarmHostActor</c> child that owns the per-node
/// <c>ScriptedAlarmEngine</c>, feeds it live tag values from the dependency mux, and
/// bridges its emissions onto the OPC UA publish actor + the cluster alerts topic. A
/// test-supplied scriptedAlarmHostOverride short-circuits the spawn so a probe can
/// intercept <c>ApplyScriptedAlarms</c>. The real host needs
/// both a non-null <c>_opcUaPublishActor</c> (the emission sink) and a non-null
/// <c>_scriptRootLogger</c> (the engine + its script logging hang off it); when either
/// is missing (legacy ControlPlane test harnesses, dev/None deployments) the host is left
/// null and ApplyScriptedAlarms becomes a no-op. The engine is built around a fresh
/// <c>DependencyMuxTagUpstreamSource</c> + an <c>EfAlarmConditionStateStore</c>;
/// the host (spawned as a child) owns + disposes the engine in its PostStop, so it stops with
/// the driver host.
/// </summary>
private void SpawnScriptedAlarmHost()
{
if (_scriptedAlarmHostOverride is not null)
{
_scriptedAlarmHost = _scriptedAlarmHostOverride;
return;
}
if (_opcUaPublishActor is null || _scriptRootLogger is null)
{
_log.Debug(
"DriverHost {Node}: skipping ScriptedAlarm host spawn (no publish actor / root logger)",
_localNode);
return;
}
// The SAME upstream instance is handed to both the engine and the host actor below.
var upstream = new DependencyMuxTagUpstreamSource();
var store = new EfAlarmConditionStateStore(
_dbFactory, _loggerFactory.CreateLogger());
var engine = new ScriptedAlarmEngine(
upstream, store, new ScriptLoggerFactory(_scriptRootLogger.Logger), _scriptRootLogger.Logger);
// NOTE(review): engine disposal is delegated to the host's PostStop. If ActorOf throws, or the host
// fails to construct, the engine built above is never disposed — confirm whether that matters.
_scriptedAlarmHost = Context.ActorOf(
ScriptedAlarmHostActor.Props(_opcUaPublishActor, _dependencyMux, upstream, engine, _localNode),
"scripted-alarm-host");
}
/// <summary>
/// Start-up recovery: reads the most recent <c>NodeDeploymentState</c> row for this node from the
/// ConfigDb and picks the initial behavior.
/// <list type="bullet">
/// <item><description>No row → Steady with no revision.</description></item>
/// <item><description>Applied → Steady at the recovered revision, then <c>RestoreApplied</c> re-spawns and
/// re-materialises the served state that was lost on restart.</description></item>
/// <item><description>Applying (orphan from a crash) → replayed via <c>ApplyAndAck</c>. When the deployment's
/// <c>Deployments</c> row is missing there is no revision to replay, so the node logs a warning and
/// enters Steady with no revision.</description></item>
/// <item><description>Failed / any other status → Steady.</description></item>
/// </list>
/// Any exception inside this pass is logged and the actor enters Stale. That includes the DB reads,
/// <c>RevisionHash.Parse</c>, and the restore/replay calls.
/// </summary>
private void Bootstrap()
{
    try
    {
        using var db = _dbFactory.CreateDbContext();

        // Latest state row for this node. Only the id and status are read below, so only those
        // are projected; the ordering column stays in the ORDER BY.
        var latest = db.NodeDeploymentStates
            .Where(s => s.NodeId == _localNode.Value)
            .OrderByDescending(s => s.StartedAtUtc)
            .Select(s => new { s.DeploymentId, s.Status })
            .FirstOrDefault();

        if (latest is null)
        {
            _log.Info("DriverHost {Node}: no prior deployments; entering Steady (no revision)", _localNode);
            Become(Steady);
            return;
        }

        // The revision lives on the Deployments row, which may be gone (then there is no revision).
        var deployment = db.Deployments
            .AsNoTracking()
            .FirstOrDefault(d => d.DeploymentId == latest.DeploymentId);
        var revision = deployment is null
            ? (RevisionHash?)null
            : RevisionHash.Parse(deployment.RevisionHash);

        switch (latest.Status)
        {
            case NodeDeploymentStatus.Applied:
                _currentRevision = revision;
                _log.Info("DriverHost {Node}: recovered Applied state at rev {Rev}", _localNode, revision);
                Become(Steady);
                // The revision is recovered but the in-memory driver children + OPC UA address
                // space were lost on restart. Re-spawn + re-materialise + re-subscribe from the
                // applied deployment so a restarted/rebuilt node restores its served state instead
                // of waiting for a config change (whose identical-config revision would no-op).
                RestoreApplied(new DeploymentId(latest.DeploymentId));
                break;

            case NodeDeploymentStatus.Applying:
                _log.Warning("DriverHost {Node}: found orphan Applying row for deployment {Id}; replaying",
                    _localNode, latest.DeploymentId);
                if (revision is not null)
                {
                    ApplyAndAck(new DeploymentId(latest.DeploymentId), revision.Value, CorrelationId.NewId());
                }
                else
                {
                    // Without a Deployments row there is no revision to replay. Previously this fell
                    // into Steady silently; surface it so the skipped replay is visible.
                    _log.Warning(
                        "DriverHost {Node}: orphan Applying deployment {Id} has no Deployment row (no revision); cannot replay, entering Steady (no revision)",
                        _localNode, latest.DeploymentId);
                    Become(Steady);
                }
                break;

            case NodeDeploymentStatus.Failed:
            default:
                _log.Info("DriverHost {Node}: prior deployment {Id} failed; entering Steady at last known rev",
                    _localNode, latest.DeploymentId);
                Become(Steady);
                break;
        }
    }
    catch (Exception ex)
    {
        _log.Warning(ex, "DriverHost {Node}: ConfigDb unreachable on bootstrap; entering Stale", _localNode);
        Become(Stale);
    }
}
/// <summary>
/// Steady behavior: caught up (at <c>_currentRevision</c>, or with no revision yet). New dispatches go
/// to <c>HandleDispatchFromSteady</c>. The remaining handlers are the same set the Applying behavior
/// registers: diagnostics, driver value/alarm forwarding, discovered-node injection, driver
/// restart/reconnect, inbound write/ack routing, and redundancy-state caching.
/// </summary>
private void Steady()
{
Receive(HandleDispatchFromSteady);
Receive(HandleGetDiagnostics);
Receive(ForwardToMux);
Receive(ForwardNativeAlarm);
Receive(HandleDiscoveredNodes);
Receive(HandleRestartDriver);
Receive(HandleReconnectDriver);
Receive(HandleRouteNodeWrite);
Receive(HandleRouteNativeAlarmAck);
Receive(OnRedundancyStateChanged);
// The redundancy-state topic also carries OpcUaProbeResult (OpcUaPublishActor peer-probes).
// We don't consume it here — drop it so it doesn't dead-letter (matches PeerProbeSupervisor).
Receive(_ => { });
Receive(_ => { /* PubSub ack */ });
}
/// <summary>
/// Applying behavior: a deployment (<c>_applyingDeploymentId</c>) is in flight. A dispatch for that
/// same deployment is ignored as a duplicate. A dispatch for any other deployment is re-sent to Self
/// via <c>Self.Forward</c>, which keeps the original sender. All other handlers match the Steady behavior.
/// </summary>
private void Applying()
{
Receive(msg =>
{
if (_applyingDeploymentId is not null && msg.DeploymentId == _applyingDeploymentId.Value)
{
_log.Debug("DriverHost {Node}: duplicate DispatchDeployment for in-flight {Id}; ignoring",
_localNode, msg.DeploymentId);
return;
}
_log.Info("DriverHost {Node}: dispatch for {Id} received while still applying {Cur}; deferring",
_localNode, msg.DeploymentId, _applyingDeploymentId);
// NOTE(review): Forward puts the dispatch straight back into this mailbox; it is NOT held until
// the transition back to Steady. If an apply spans several messages (async completion), the
// dispatch cycles Applying → Forward → Applying, logging at Info each pass, until the apply
// finishes. Stash/UnstashAll (IWithUnboundedStash) on the state transition is the usual Akka
// idiom — confirm how the apply completes before changing this.
Self.Forward(msg); // re-enqueued to Self immediately (see NOTE above)
});
Receive(HandleGetDiagnostics);
Receive(ForwardToMux);
Receive(ForwardNativeAlarm);
Receive(HandleDiscoveredNodes);
Receive(HandleRestartDriver);
Receive(HandleReconnectDriver);
Receive(HandleRouteNodeWrite);
Receive(HandleRouteNativeAlarmAck);
Receive(OnRedundancyStateChanged);
// The redundancy-state topic also carries OpcUaProbeResult (OpcUaPublishActor peer-probes).
// We don't consume it here — drop it so it doesn't dead-letter (matches PeerProbeSupervisor).
Receive(_ => { });
Receive(_ => { /* PubSub ack */ });
}
/// <summary>
/// Fans a driver-published value out to its consumers:
/// <list type="bullet">
/// <item><description>the dependency mux, when wired. These are VirtualTag inputs, still keyed by
/// wire-ref FullReference.</description></item>
/// <item><description>the OPC UA publish actor, when wired, at every folder-scoped equipment NodeId the
/// current apply mapped this (DriverInstanceId, FullReference) to. An unmapped ref is dropped
/// with a debug log.</description></item>
/// </list>
/// </summary>
/// <param name="msg">The value a driver child published.</param>
private void ForwardToMux(DriverInstanceActor.AttributeValuePublished msg)
{
    // VirtualTag inputs stay keyed by FullReference. With no mux wired (the dev/Mac path, no virtual
    // tags) nothing evaluates; production binds the mux via the RuntimeActors extension.
    _dependencyMux?.Tell(msg);

    var publisher = _opcUaPublishActor;
    if (publisher is null)
    {
        return;
    }

    // Equipment-tag variables live at folder-scoped NodeIds (EquipmentId/FolderPath/Name), but drivers
    // publish by wire-ref FullName. _nodeIdByDriverRef, rebuilt each apply from EquipmentTags, bridges
    // the two so the variable doesn't sit at BadWaitingForInitialData.
    if (!_nodeIdByDriverRef.TryGetValue((msg.DriverInstanceId, msg.FullReference), out var targets))
    {
        _log.Debug("DriverHost {Node}: no equipment-tag NodeId for ({Driver},{Ref}) — value dropped",
            _localNode, msg.DriverInstanceId, msg.FullReference);
        return;
    }

    // One driver ref may back several variables (identical machines sharing a register), so fan out.
    foreach (var target in targets)
    {
        publisher.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.AttributeValueUpdate(
            target, msg.Value, msg.Quality, msg.TimestampUtc));
    }
}
/// <summary>
/// Handles a driver child's post-connect <c>DriverInstanceActor.DiscoveredNodesReady</c>.
/// <list type="number">
/// <item><description>Resolves the equipment the driver is bound to from the most-recent applied
/// composition: its EquipmentNodes bound by DriverInstanceId UNION its authored EquipmentTags.</description></item>
/// <item><description>Maps the captured FixedTree under that equipment via <c>DiscoveredNodeMapper.Map</c>,
/// deduping any node that shadows an authored equipment-tag ref.</description></item>
/// <item><description>Caches the per-equipment plan map.</description></item>
/// <item><description>Grafts it onto the served address space + live-value maps + subscription set via
/// <c>ApplyDiscoveredPlansForDriver</c>.</description></item>
/// </list>
/// The handler is a no-op when no composition has been applied yet, when zero or several equipments
/// resolve, when nothing new remains after dedup, or when the routing is unchanged from the cached
/// pass. It is idempotent / duplicate-safe: the mapper is pure, materialisation is idempotent, and
/// the routing-map extension + subscription merge are set-based.
/// </summary>
/// <param name="msg">The driver's discovered-node snapshot (DriverInstanceId + captured nodes).</param>
private void HandleDiscoveredNodes(DriverInstanceActor.DiscoveredNodesReady msg)
{
if (_lastComposition is null)
{
_log.Debug("DriverHost {Node}: DiscoveredNodesReady from {Driver} before any composition applied — ignored",
_localNode, msg.DriverInstanceId);
return;
}
// Resolve the equipment bound to this driver from BOTH the composition's EquipmentNodes (whose
// DriverInstanceId matches — this lets a driver with ZERO authored tags graft onto a tag-less
// equipment) UNION the authored EquipmentTags for the driver (the original resolution). Distinct so a
// driver that is both EquipmentNode-bound AND has authored tags under the same equipment resolves once.
var fromNodes = _lastComposition.EquipmentNodes
.Where(e => e.DriverInstanceId is not null && string.Equals(e.DriverInstanceId, msg.DriverInstanceId, StringComparison.Ordinal))
.Select(e => e.EquipmentId);
var fromTags = _lastComposition.EquipmentTags
.Where(t => string.Equals(t.DriverInstanceId, msg.DriverInstanceId, StringComparison.Ordinal))
.Select(t => t.EquipmentId);
var equipmentIds = fromNodes.Concat(fromTags).Distinct(StringComparer.Ordinal).ToList();
if (equipmentIds.Count == 0)
{
_log.Info("DriverHost {Node}: no equipment for driver {Driver} — skipping discovered-node injection",
_localNode, msg.DriverInstanceId);
return;
}
if (equipmentIds.Count > 1)
{
// NEXT TASK (multi-device partition) REPLACES THIS BRANCH: a driver that fans out to multiple
// equipments (one per device-host) will partition its discovered FixedTree by DeviceHost and graft
// each partition under its matching equipment, populating multiple inner-map entries. Until then we
// keep the conservative warn+skip — a single-equipment graft is the only shape this task handles.
_log.Warning("DriverHost {Node}: driver {Driver} maps to {Count} equipments — discovered-node injection skipped (multi-equipment-per-driver is a follow-up)",
_localNode, msg.DriverInstanceId, equipmentIds.Count);
return;
}
var equipmentId = equipmentIds[0];
// Authored refs for THIS driver (both value + alarm tags) so a discovered node never shadows an
// authored one — the mapper drops any captured node whose FullReference is already authored. May be
// EMPTY for a tag-less equipment, which is fine: Map dedups against an empty set (keeps everything).
var authoredRefs = _lastComposition.EquipmentTags
.Where(t => string.Equals(t.DriverInstanceId, msg.DriverInstanceId, StringComparison.Ordinal))
.Select(t => t.FullName)
.ToHashSet(StringComparer.Ordinal);
var plan = DiscoveredNodeMapper.Map(equipmentId, msg.Nodes, authoredRefs);
// NOTE(review): this early return leaves any previously cached plan for the driver in
// _discoveredByDriver untouched — confirm the redeploy re-inject tail copes with a cached plan
// whose nodes are now all shadowed by authored tags.
if (plan.Variables.Count == 0) return; // nothing new to inject (all captured nodes were authored)
// The driver's per-equipment plan map for this discovery. Single-equipment today ⇒ one entry; the
// multi-device task will add an entry per partitioned equipment here.
var newPlans = new Dictionary(StringComparer.Ordinal) { [equipmentId] = plan };
// Unchanged-plan short-circuit: the driver re-discovers every ~2s (up to ~15 passes) until the
// FixedTree set stabilises, re-sending DiscoveredNodesReady each pass. Re-applying an IDENTICAL set
// would re-send SetDesiredSubscriptions, forcing the child to UnsubscribeAsync (dropping the WHOLE
// handle — authored tags included) then re-Subscribe — blipping authored-tag values up to ~15× across
// the discovery window. Skip when the WHOLE per-equipment routing is unchanged from the last applied
// pass; a GROWING set still differs (superset) and re-applies. This is _discoveredByDriver's first reader.
if (_discoveredByDriver.TryGetValue(msg.DriverInstanceId, out var cached)
&& PlansRoutingEqual(cached, newPlans))
{
_log.Debug("DriverHost {Node}: discovered set for driver {Driver} unchanged ({Count} node(s)) — re-apply skipped",
_localNode, msg.DriverInstanceId, plan.Variables.Count);
return;
}
_discoveredByDriver[msg.DriverInstanceId] = newPlans;
ApplyDiscoveredPlansForDriver(msg.DriverInstanceId, newPlans);
}
/// <summary>
/// Routing-map equality: both maps have the same count, and every key in <paramref name="a"/> maps in
/// <paramref name="b"/> to an ordinally-equal NodeId. Lets <c>HandleDiscoveredNodes</c> (through
/// <c>PlansRoutingEqual</c>) skip re-applying an unchanged discovered set across the driver's repeated
/// post-connect re-discovery passes (a grown/changed set differs and re-applies).
/// </summary>
/// <param name="a">First routing map (driver ref → NodeId, as used for <c>RoutingByRef</c>).</param>
/// <param name="b">Second routing map; its key comparer is the one used for lookups.</param>
/// <returns>True when both maps hold the same keys with ordinally-equal NodeIds.</returns>
private static bool RoutingEquals(IReadOnlyDictionary a, IReadOnlyDictionary b)
=> a.Count == b.Count
&& a.All(kv => b.TryGetValue(kv.Key, out var v) && string.Equals(v, kv.Value, StringComparison.Ordinal));
/// <summary>
/// Per-equipment plan-map routing equality: same equipment keys, and each equipment's plan has the
/// same <c>RoutingByRef</c> (compared via <c>RoutingEquals</c>). Lets <c>HandleDiscoveredNodes</c>
/// short-circuit a re-discovery whose WHOLE per-driver set is unchanged (a grown/changed set on any
/// equipment differs and re-applies). Only routing is compared, not folders or other plan content.
/// </summary>
/// <param name="a">First EquipmentId → plan map.</param>
/// <param name="b">Second EquipmentId → plan map.</param>
/// <returns>True when both maps cover the same equipments with equal per-plan routing.</returns>
private static bool PlansRoutingEqual(
IReadOnlyDictionary a,
IReadOnlyDictionary b)
=> a.Count == b.Count
&& a.All(kv => b.TryGetValue(kv.Key, out var p) && RoutingEquals(kv.Value.RoutingByRef, p.RoutingByRef));
/// <summary>
/// Grafts a driver's per-equipment discovered (FixedTree) plans onto the served state in two phases, so the
/// driver still receives exactly one subscription push:
/// <list type="number">
/// <item><description>Per equipment: add each plan's <c>RoutingByRef</c> entries to the live-value routing
/// map and its NodeId → driver-ref inverse (additive only), then Tell the OPC UA publish actor
/// <c>MaterialiseDiscoveredNodes</c> for that equipment's folders + variables.</description></item>
/// <item><description>Once per driver: union the driver's authored non-alarm value refs (taken from
/// <c>_lastComposition</c>, as <c>PushDesiredSubscriptions</c> computes them) with the FixedTree refs of ALL
/// the given plans, and Tell the driver child a single <c>SetDesiredSubscriptions</c> carrying that union plus
/// the driver's distinct alarm refs. For a single-equipment driver this equals the per-plan behavior.</description></item>
/// </list>
/// Kept standalone so the redeploy re-inject tail can re-apply cached plans after an address-space rebuild
/// without re-running discovery. When the driver child is not running, phase 2 and the info log are skipped.
/// </summary>
/// <param name="driverId">Driver instance the plans were discovered by.</param>
/// <param name="plansByEquipment">Discovered plans keyed by equipment id.</param>
private void ApplyDiscoveredPlansForDriver(
    string driverId, IReadOnlyDictionary plansByEquipment)
{
    // Phase 1 — per equipment. Purely ADDITIVE across passes: refs dropped by a shrinking discovery set keep
    // their stale routes until the next full apply (PushDesiredSubscriptions) clears and rebuilds the maps.
    // That is acceptable because a FOCAS FixedTree only grows-then-stabilises within a connect.
    var injectedVariables = 0;
    foreach (var (equipmentId, plan) in plansByEquipment)
    {
        foreach (var (driverRef, nodeId) in plan.RoutingByRef)
        {
            var routeKey = (driverId, driverRef);
            if (!_nodeIdByDriverRef.TryGetValue(routeKey, out var routedNodeIds))
            {
                routedNodeIds = new HashSet(StringComparer.Ordinal);
                _nodeIdByDriverRef[routeKey] = routedNodeIds;
            }

            routedNodeIds.Add(nodeId);
            _driverRefByNodeId[nodeId] = routeKey;
        }

        _opcUaPublishActor?.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.MaterialiseDiscoveredNodes(
            equipmentId, plan.Folders, plan.Variables));
        injectedVariables += plan.Variables.Count;
    }

    // Phase 2 — ONE subscription push per driver, and only if the child is running.
    if (!_children.TryGetValue(driverId, out var child))
    {
        return;
    }

    // Null-guarded on purpose. HandleDiscoveredNodes already proved the composition non-null, but the
    // redeploy tail (PushDesiredSubscriptions) also calls this method, so the re-apply path must not NRE.
    var composition = _lastComposition;
    var authoredValueRefs = composition is null
        ? Enumerable.Empty()
        : composition.EquipmentTags
            .Where(t => t.Alarm is null && string.Equals(t.DriverInstanceId, driverId, StringComparison.Ordinal))
            .Select(t => t.FullName);
    var alarmRefs = composition is null
        ? Array.Empty()
        : composition.EquipmentTags
            .Where(t => t.Alarm is not null && string.Equals(t.DriverInstanceId, driverId, StringComparison.Ordinal))
            .Select(t => t.FullName)
            .Distinct(StringComparer.Ordinal)
            .ToArray();

    var fixedTreeRefs = plansByEquipment.Values.SelectMany(p => p.RoutingByRef.Keys);
    var desiredValueRefs = authoredValueRefs
        .Concat(fixedTreeRefs)
        .Distinct(StringComparer.Ordinal)
        .ToArray();

    child.Actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(desiredValueRefs, SubscriptionPublishingInterval, alarmRefs));
    _log.Info("DriverHost {Node}: injected {Count} discovered node(s) for driver {Driver} across {Equipment} equipment(s)",
        _localNode, injectedVariables, driverId, plansByEquipment.Count);
}
/// <summary>
/// Routes a native alarm transition (an <c>AttributeAlarmPublished</c> from a driver child) to its materialised
/// Part 9 condition node(s). The lookup key is the transition's ConditionId: the dotted alarm full-reference,
/// which equals the authored equipment-tag FullName and is NOT the bare SourceNodeId owning object. That key
/// is resolved through <c>_alarmNodeIdByDriverRef</c>, which is rebuilt each apply from the alarm-bearing
/// EquipmentTags. For each resolved node, the native-alarm projector turns the transition delta into a full
/// condition snapshot. The snapshot is sent to the publish actor as <c>AlarmStateUpdate</c>, the same message
/// scripted alarms use. An unknown ref is Debug-logged and dropped. Nothing at all happens when no publish
/// actor is wired.
/// </summary>
/// <remarks>
/// Each routed transition is ALSO published as an <c>AlarmTransitionEvent</c> on the cluster alerts topic,
/// the same historization + live /alerts fan-out path scripted alarms use. The OPC UA condition write is
/// ungated, so a Secondary keeps its address space warm for failover. The alerts publish is skipped on a
/// Secondary or Detached node. An unknown role still publishes, so single-node deploys and the boot window
/// never drop transitions.
/// </remarks>
private void ForwardNativeAlarm(DriverInstanceActor.AttributeAlarmPublished msg)
{
    var publishActor = _opcUaPublishActor;
    if (publishActor is null)
    {
        return;
    }

    var transition = msg.Args;

    // Resolve on ConditionId, NOT SourceNodeId. For Galaxy, the dotted alarm full-reference (the map's key)
    // travels in ConditionId (AlarmFullReference); SourceNodeId is the bare owning object.
    var routeKey = (msg.DriverInstanceId, transition.ConditionId);
    if (!_alarmNodeIdByDriverRef.TryGetValue(routeKey, out var conditionNodeIds))
    {
        _log.Debug("DriverHost {Node}: no alarm condition for ({Driver},{Ref}) — transition dropped",
            _localNode, msg.DriverInstanceId, transition.ConditionId);
        return;
    }

    // Warm-standby dedup, using the same signal HandleRouteNodeWrite gates writes on. The role can't change
    // while this handler runs, so it is evaluated once for all condition nodes.
    var publishAlerts = _localRole is not (RedundancyRole.Secondary or RedundancyRole.Detached);

    foreach (var conditionNodeId in conditionNodeIds)
    {
        var snapshot = _nativeAlarmProjector.Project(conditionNodeId, transition);
        publishActor.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.AlarmStateUpdate(
            conditionNodeId, snapshot, transition.SourceTimestampUtc));

        if (!publishAlerts)
        {
            continue;
        }

        // Fall back to the node id itself when no per-condition metadata was captured at apply time.
        var meta = _alarmMetaByNodeId.TryGetValue(conditionNodeId, out var knownMeta)
            ? knownMeta
            : (EquipmentId: conditionNodeId, Name: conditionNodeId, AlarmType: "AlarmCondition", HistorizeToAveva: (bool?)null);

        var alertEvent = new AlarmTransitionEvent(
            AlarmId: conditionNodeId,
            EquipmentPath: meta.EquipmentId,
            AlarmName: meta.Name,
            TransitionKind: ToEventKind(transition.Kind),
            // The projector already mapped the four-bucket severity onto the OPC UA 1..1000 scale; reusing it
            // keeps the condition node and the alerts row in agreement.
            Severity: snapshot.Severity,
            Message: transition.Message,
            // Native transitions are device-driven. "device" marks a non-operator origin whenever an upstream
            // comment rides along; otherwise the user is left empty.
            User: transition.OperatorComment is not null ? "device" : string.Empty,
            TimestampUtc: transition.SourceTimestampUtc,
            AlarmTypeName: meta.AlarmType,
            Comment: transition.OperatorComment,
            // Per-condition opt-out (null ⇒ absent ⇒ historize). The historian gate only suppresses the
            // durable AVEVA row on an explicit false, so null rides through unchanged.
            HistorizeToAveva: meta.HistorizeToAveva);

        _mediator.Tell(new Publish(ScriptedAlarmHostActor.AlertsTopic, alertEvent));
    }
}
/// <summary>
/// Maps a native <c>AlarmTransitionKind</c> onto the event-kind vocabulary scripted alarms emit (the
/// EmissionKind names). This lets a native row render the right chip on the /alerts page and historize into
/// the same EventKind column. Raise, Retrigger and any unmapped kind all surface as "Activated" (visible)
/// rather than an unknown label.
/// </summary>
private static string ToEventKind(AlarmTransitionKind kind)
{
    switch (kind)
    {
        case AlarmTransitionKind.Clear:
            return "Cleared";
        case AlarmTransitionKind.Acknowledge:
            return "Acknowledged";
        default:
            // Covers Raise, Retrigger and any kind added later.
            return "Activated";
    }
}
/// <summary>
/// Routes an inbound operator write (asked from the OPC UA node-manager side) to the owning driver child, and
/// replies the asker a <c>NodeWriteResult</c> exactly once on every branch. Order matters:
/// <list type="number">
/// <item><description>PRIMARY gate first. A Secondary/Detached node replies "not primary". An unknown role is
/// treated as Primary (default-allow), so single-node deploys and the boot window never reject. A standby node
/// keeps its address space + driver state warm but must not push writes to the shared field device.</description></item>
/// <item><description>Resolve the NodeId → (DriverInstanceId, FullName) reverse map.</description></item>
/// <item><description>Resolve the running driver child.</description></item>
/// <item><description>Ask the child a bounded <c>WriteAttribute</c> of the driver-side FullName and pipe the
/// translated result back to the asker. A timed-out (or cancelled) Ask replies "write timeout". Any other
/// Ask failure replies "write failed: {reason}" instead of being misreported as a timeout.</description></item>
/// </list>
/// </summary>
private void HandleRouteNodeWrite(RouteNodeWrite msg)
{
    // PRIMARY GATE FIRST. Only the Primary services operator writes (same signal as the alarm-emit gate;
    // an unknown role counts as Primary so single-node deploys + the boot window aren't blocked).
    if (_localRole is RedundancyRole.Secondary or RedundancyRole.Detached)
    {
        Sender.Tell(new NodeWriteResult(false, "not primary"));
        return;
    }
    if (!_driverRefByNodeId.TryGetValue(msg.NodeId, out var target))
    {
        Sender.Tell(new NodeWriteResult(false, $"no driver mapping for node {msg.NodeId}"));
        return;
    }
    if (!_children.TryGetValue(target.DriverInstanceId, out var entry))
    {
        Sender.Tell(new NodeWriteResult(false, "driver not running"));
        return;
    }
    // DriverInstanceActor.HandleWriteAsync already bounds the backend call to 5s, so the 8s Ask is a safety
    // net for an actor that never replies. Capture Sender before the continuation: the continuation runs off
    // the actor thread, where raw Sender is unsafe to read.
    var replyTo = Sender;
    entry.Actor.Ask(
        new DriverInstanceActor.WriteAttribute(target.FullName, msg.Value!), TimeSpan.FromSeconds(8))
        .ContinueWith(
            t => t.IsCompletedSuccessfully
                ? new NodeWriteResult(t.Result.Success, t.Result.Reason)
                : new NodeWriteResult(false, DescribeWriteAskFailure(t)),
            CancellationToken.None,
            TaskContinuationOptions.None,
            TaskScheduler.Default)
        .PipeTo(replyTo);
}

/// <summary>
/// Translates a non-successful write Ask into the reason replied to the asker. The Ask deadline expiring
/// (<c>AskTimeoutException</c>) or the task being cancelled is reported as "write timeout". Any other fault
/// (e.g. the child failing the Ask, or an unexpected reply type) is reported as "write failed: {message}",
/// using the innermost exception's message.
/// </summary>
/// <param name="askTask">The completed, non-successful Ask task.</param>
/// <returns>The reason string for the <c>NodeWriteResult</c>.</returns>
private static string DescribeWriteAskFailure(Task askTask)
{
    if (askTask.IsCanceled)
    {
        return "write timeout";
    }

    // Task.Exception is an AggregateException; GetBaseException unwraps it to the original cause.
    return askTask.Exception?.GetBaseException() switch
    {
        null or AskTimeoutException or OperationCanceledException => "write timeout",
        { } fault => $"write failed: {fault.Message}",
    };
}
/// <summary>
/// Routes an inbound native-condition acknowledge (Told from the OPC UA node-manager side when a client
/// acknowledges a NATIVE Part 9 condition) to the owning driver child. It follows the same order and gating
/// as <c>HandleRouteNodeWrite</c>, but is fire-and-forget (no reply). The Part 9 acknowledge has already
/// committed the local condition state, and the driver's acknowledge surfaces no per-condition status.
/// <list type="number">
/// <item><description>PRIMARY gate first. A Secondary/Detached node drops the ack with a debug log. An
/// unknown role is treated as Primary (default-allow).</description></item>
/// <item><description>Resolve the condition NodeId → (DriverInstanceId, FullName = ConditionId) inverse map.
/// An unmapped node is debug-logged and dropped, not thrown.</description></item>
/// <item><description>Resolve the running driver child and Tell it a <c>RouteAlarmAck</c> carrying the
/// wire-ref FullName, the comment and the operator user.</description></item>
/// </list>
/// </summary>
private void HandleRouteNativeAlarmAck(RouteNativeAlarmAck msg)
{
    var conditionNodeId = msg.ConditionNodeId;

    // PRIMARY GATE FIRST. Same signal as the inbound-write + alarm-emit gates; an unknown role counts as
    // Primary so single-node deploys + the boot window aren't blocked.
    var isStandby = _localRole is RedundancyRole.Secondary or RedundancyRole.Detached;
    if (isStandby)
    {
        _log.Debug("DriverHost {Node}: dropping native-alarm ack for {Cond} — not primary",
            _localNode, conditionNodeId);
        return;
    }

    if (!_driverRefByAlarmNodeId.TryGetValue(conditionNodeId, out var owner))
    {
        _log.Debug("DriverHost {Node}: no driver mapping for alarm condition node {Cond} — ack dropped",
            _localNode, conditionNodeId);
        return;
    }

    if (!_children.TryGetValue(owner.DriverInstanceId, out var child))
    {
        _log.Debug("DriverHost {Node}: driver {Driver} not running for alarm ack of {Cond} — dropped",
            _localNode, owner.DriverInstanceId, conditionNodeId);
        return;
    }

    // The driver correlates on ConditionId, which is the authored alarm FullName the inverse map is keyed on.
    var ack = new DriverInstanceActor.RouteAlarmAck(owner.FullName, msg.Comment, msg.OperatorUser);
    child.Actor.Tell(ack);
}
/// <summary>
/// Caches this node's redundancy role from a cluster redundancy snapshot. The inbound-write, native-alarm-ack
/// and alerts-publish gates read that role to restrict work to the Primary. A snapshot with no entry for this
/// node leaves the cached role unchanged, so an unknown role stays default-allow. Mirrors
/// ScriptedAlarmHostActor.OnRedundancyStateChanged / OpcUaPublishActor.
/// </summary>
private void OnRedundancyStateChanged(RedundancyStateChanged msg)
{
    foreach (var node in msg.Nodes)
    {
        if (node.NodeId == _localNode)
        {
            // Only the first entry for this node counts.
            _localRole = node.Role;
            return;
        }
    }
}
/// <summary>
/// Stale behavior: the node could not reach the config DB on bootstrap (see the DispatchDeployment warning
/// below). While Stale:
/// <list type="bullet">
/// <item><description>A deployment dispatch is only logged and ignored; no ack is sent from here.</description></item>
/// <item><description>Diagnostics, HandleRestartDriver / HandleReconnectDriver and redundancy-role updates
/// are still serviced.</description></item>
/// <item><description>Inbound operator writes are fast-failed with a NodeWriteResult.</description></item>
/// <item><description>Native-alarm acks, discovered-node notifications, peer-probe results and PubSub acks
/// are dropped.</description></item>
/// </list>
/// A periodic "retry-db" timer (every ReconnectInterval) sends RetryConfigDbConnection.Instance. The handler
/// that calls TryRecoverFromStale presumably services it — NOTE(review): the message type of each Receive
/// below is not visible here; confirm against the registrations.
/// </summary>
private void Stale()
{
Receive(_ =>
{
_log.Warning("DriverHost {Node}: ignoring DispatchDeployment while Stale (DB unreachable)", _localNode);
});
Receive(HandleGetDiagnostics);
// Recovery attempt; presumably the retry-db timer's message (see the timer started at the end).
Receive(_ => TryRecoverFromStale());
Receive(HandleRestartDriver);
Receive(HandleReconnectDriver);
// Keep tracking the redundancy role so the Primary gates are correct once recovered.
Receive(OnRedundancyStateChanged);
// The redundancy-state topic also carries OpcUaProbeResult (OpcUaPublishActor peer-probes).
// We don't consume it here — drop it so it doesn't dead-letter (matches PeerProbeSupervisor).
Receive(_ => { });
// An inbound operator write can't be serviced while the config DB is unreachable — fast-fail so the
// node-manager's bounded Ask gets an immediate clear status instead of dead-lettering into a timeout.
Receive(_ =>
Sender.Tell(new NodeWriteResult(false, "driver host stale (config DB unreachable)")));
// An inbound native-condition ack can't be serviced while Stale either (the alarm inverse map is
// empty until an apply runs). The ack is fire-and-forget (no reply), so just drop it with a log —
// the local OPC UA condition state already committed on the Part 9 ack.
Receive(msg =>
_log.Debug("DriverHost {Node}: dropping native-alarm ack for {Node2} while Stale (config DB unreachable)",
_localNode, msg.ConditionNodeId));
// A driver child's post-connect DiscoveredNodesReady can't be injected while Stale (no composition is
// applied yet, so the equipment can't be resolved). Drop it — Task 6's re-discovery loop re-sends it
// and the Task-8 post-recovery re-apply self-heal it once an apply runs (matches the no-op drops above).
Receive(_ => { });
Receive(_ => { /* PubSub ack */ });
// Periodic reconnect attempt while Stale, keyed "retry-db".
Timers.StartPeriodicTimer("retry-db", RetryConfigDbConnection.Instance, ReconnectInterval);
}
/// <summary>
/// Replies the sender a <c>NodeDiagnosticsSnapshot</c> for this node: the current revision plus one entry per
/// driver child. Each entry's Name is the child's key, and its State is "Stubbed" or "Spawned". Every
/// timestamp in the reply is the same as-of instant, so the snapshot is internally consistent.
/// </summary>
/// <remarks>
/// The host does not track per-child DriverInstanceId, device counts or last-change time here. Those fields
/// are reported as Guid.Empty, 0 and the snapshot instant respectively.
/// </remarks>
private void HandleGetDiagnostics(GetDiagnostics msg)
{
    // Read the clock once. Previously each driver row and the snapshot called DateTime.UtcNow separately,
    // so one reply could carry several slightly different "now" values.
    var asOfUtc = DateTime.UtcNow;
    var drivers = _children
        .Select(kv => new DriverInstanceDiagnostics(
            DriverInstanceId: Guid.Empty,
            Name: kv.Key,
            State: kv.Value.Stubbed ? "Stubbed" : "Spawned",
            ConnectedDevices: 0,
            FaultedDevices: 0,
            LastChangeUtc: asOfUtc))
        .ToArray();
    var snapshot = new NodeDiagnosticsSnapshot(
        NodeId: _localNode,
        CurrentRevision: _currentRevision,
        Drivers: drivers,
        AsOfUtc: asOfUtc);
    Sender.Tell(snapshot);
}
/// <summary>
/// Steady-state DispatchDeployment handler. A dispatch for the revision already applied is acked Applied
/// immediately (idempotent; the actor stays Steady). Any other revision, or no recorded revision, goes
/// through <c>ApplyAndAck</c>.
/// </summary>
private void HandleDispatchFromSteady(DispatchDeployment msg)
{
    var alreadyAtRevision = _currentRevision is { } applied && applied == msg.RevisionHash;
    if (!alreadyAtRevision)
    {
        ApplyAndAck(msg.DeploymentId, msg.RevisionHash, msg.CorrelationId);
        return;
    }

    // Idempotent — already at this rev. Ack and stay Steady.
    _log.Debug("DriverHost {Node}: dispatch {Id} matches current rev {Rev}; immediate ACK",
        _localNode, msg.DeploymentId, msg.RevisionHash);
    SendAck(msg.DeploymentId, ApplyAckOutcome.Applied, failureReason: null, msg.CorrelationId);
}
/// <summary>
/// Applies <paramref name="deploymentId"/> at <paramref name="revision"/> and acks the coordinator exactly
/// once. The ack is Applied once drivers are reconciled and the Applied state row is persisted, or Failed if
/// any step up to and including that ack throws. After the Applied ack, the method triggers the OPC UA
/// address-space rebuild and the SubscribeBulk pass.
/// </summary>
/// <remarks>
/// <para>A failure in a post-ack step is logged and marks the span as errored. It does NOT persist Failed or
/// send a second, contradictory Failed ack.</para>
/// <para>A failure before the ack clears the revision marker. The served state is then indeterminate, so the
/// next dispatch, even of the same revision, re-applies instead of being short-circuited as already
/// current.</para>
/// <para>The work is synchronous, and the finally block always returns the actor to Steady.</para>
/// </remarks>
private void ApplyAndAck(DeploymentId deploymentId, RevisionHash revision, CorrelationId correlation)
{
    _applyingDeploymentId = deploymentId;
    Become(Applying);
    using var span = OtOpcUaTelemetry.StartDeployApplySpan(deploymentId.ToString());
    span?.SetTag("otopcua.node_id", _localNode.ToString());
    span?.SetTag("otopcua.revision", revision.ToString());
    span?.SetTag("otopcua.correlation_id", correlation.ToString());
    var sw = Stopwatch.StartNew();
    // Set once the Applied ack has gone out. It decides which catch clause owns a failure, so the
    // coordinator never receives both an Applied and a Failed ack for the same deployment.
    var acked = false;
    try
    {
        // Persist Applying row (idempotent on PK). This is inside the try so a failure here still acks Failed
        // and the finally below returns the actor to Steady.
        UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Applying, failureReason: null);
        ReconcileDrivers(deploymentId);
        _currentRevision = revision;
        UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Applied, failureReason: null);
        SendAck(deploymentId, ApplyAckOutcome.Applied, failureReason: null, correlation);
        acked = true;
        OtOpcUaTelemetry.DeploymentApplied.Add(1, new KeyValuePair("outcome", "ack"));
        // Trigger the OPC UA address-space rebuild so the local SDK reflects the new composition. The publish
        // actor handles the load-compose-diff-apply pipeline; the same correlation id is forwarded so the
        // audit trail joins up.
        _opcUaPublishActor?.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.RebuildAddressSpace(correlation, deploymentId));
        // SubscribeBulk pass: hand each driver its desired tag references so live values flow into the
        // just-rebuilt address space instead of staying BadWaitingForInitialData.
        PushDesiredSubscriptions(deploymentId);
        _log.Info("DriverHost {Node}: applied deployment {Id} (rev {Rev}, children={Count})",
            _localNode, deploymentId, revision, _children.Count);
    }
    catch (Exception ex) when (acked)
    {
        // Post-ack failure. The deployment has already been persisted + acked Applied, so don't contradict it
        // with a Failed row/ack; surface the failure in the span and the log instead.
        span?.SetStatus(ActivityStatusCode.Error, ex.Message);
        _log.Error(ex, "DriverHost {Node}: post-apply step for {Id} failed after Applied ACK", _localNode, deploymentId);
    }
    catch (Exception ex)
    {
        // Pre-ack failure. ReconcileDrivers may have run partially or fully, so forget the revision marker.
        // Otherwise a re-dispatch could be short-circuited as "already applied" by HandleDispatchFromSteady
        // while the node's real state doesn't match.
        _currentRevision = null;
        UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Failed, ex.Message);
        SendAck(deploymentId, ApplyAckOutcome.Failed, ex.Message, correlation);
        OtOpcUaTelemetry.DeploymentApplied.Add(1, new KeyValuePair("outcome", "reject"));
        span?.SetStatus(ActivityStatusCode.Error, ex.Message);
        _log.Error(ex, "DriverHost {Node}: apply of {Id} failed", _localNode, deploymentId);
    }
    finally
    {
        OtOpcUaTelemetry.DeploymentApplyDurationSec.Record(sw.Elapsed.TotalSeconds);
        _applyingDeploymentId = null;
        Become(Steady);
    }
}
/// <summary>
/// Reads the deployment artifact for <paramref name="deploymentId"/> and reconciles the running driver
/// children against the driver instances it declares for this node. <c>DriverSpawnPlanner</c> computes the
/// plan, which is carried out as: stop removed/disabled children, apply a config delta to changed ones, then
/// spawn missing ones.
/// </summary>
/// <remarks>
/// The DB read is best-effort. If loading the artifact throws, a warning is logged and the reconcile is
/// skipped; the method returns normally. A missing row or null blob is treated as an empty artifact. That
/// case (legacy ControlPlane tests, smoke fixtures), or a driver factory that can't materialise any requested
/// type, makes this effectively a no-op.
/// </remarks>
private void ReconcileDrivers(DeploymentId deploymentId)
{
    byte[] artifactBlob;
    try
    {
        using var dbContext = _dbFactory.CreateDbContext();
        artifactBlob = dbContext.Deployments.AsNoTracking()
            .Where(d => d.DeploymentId == deploymentId.Value)
            .Select(d => d.ArtifactBlob)
            .FirstOrDefault() ?? Array.Empty();
    }
    catch (Exception ex)
    {
        _log.Warning(ex, "DriverHost {Node}: failed to load artifact for {Id}; skipping reconcile",
            _localNode, deploymentId);
        return;
    }

    var desiredSpecs = DeploymentArtifact.ParseDriverInstances(artifactBlob, _localNode.Value);
    var currentChildren = _children.ToDictionary(
        child => child.Key,
        child => new DriverChildSnapshot(child.Value.DriverType, child.Value.LastConfigJson),
        StringComparer.Ordinal);
    var spawnPlan = DriverSpawnPlanner.Compute(currentChildren, desiredSpecs);

    // Stop → delta → spawn.
    foreach (var stoppedId in spawnPlan.ToStop)
    {
        StopChild(stoppedId);
    }
    foreach (var changedSpec in spawnPlan.ToApplyDelta)
    {
        ApplyChildDelta(changedSpec);
    }
    foreach (var newSpec in spawnPlan.ToSpawn)
    {
        SpawnChild(newSpec);
    }
}
/// <summary>
/// Restores the served state for an already-applied deployment after a process restart. Bootstrap recovers
/// the revision from NodeDeploymentState, but the driver children and the OPC UA address space live in memory
/// and are gone after a restart. Without this, a restarted node would serve an empty address space until the
/// next config change, because an identical-config redeploy no-ops on the unchanged revision.
/// </summary>
/// <remarks>
/// Re-spawns drivers, asks the publish actor to rebuild the address space from the applied artifact (under a
/// fresh correlation id), and re-pushes SubscribeBulk. No ack is sent, since the deployment is already
/// Applied. Any exception is logged as a warning and swallowed.
/// </remarks>
private void RestoreApplied(DeploymentId deploymentId)
{
    var restoreCorrelation = CorrelationId.NewId();
    try
    {
        ReconcileDrivers(deploymentId);
        if (_opcUaPublishActor is { } publishActor)
        {
            publishActor.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.RebuildAddressSpace(restoreCorrelation, deploymentId));
        }
        PushDesiredSubscriptions(deploymentId);
        _log.Info("DriverHost {Node}: restored served state for applied deployment {Id} on bootstrap", _localNode, deploymentId);
    }
    catch (Exception ex)
    {
        _log.Warning(ex, "DriverHost {Node}: failed to restore served state for {Id} on bootstrap", _localNode, deploymentId);
    }
}
/// <summary>
/// SubscribeBulk pass. After an apply, reads the deployment's Equipment-namespace tags, groups their
/// driver-side FullName references by driver instance, and hands each running driver child its desired value
/// and native-alarm subscription sets via DriverInstanceActor.SetDesiredSubscriptions. The child retains the
/// set and (re)subscribes on every Connected entry, so values stream into the OPC UA sink and resume after
/// reconnects. A running driver with no configured tags gets an empty set, which clears any stale
/// subscription from a previous deployment.
/// </summary>
/// <remarks>
/// The same parsed composition is also used to:
/// <list type="bullet">
/// <item><description>clear and rebuild the value/alarm routing maps, their inverses and the per-condition
/// alerts metadata, and Clear() the native-alarm projector;</description></item>
/// <item><description>hand the Equipment VirtualTags and ScriptedAlarms to their hosts;</description></item>
/// <item><description>cache the composition in _lastComposition;</description></item>
/// <item><description>re-inject any cached discovered (FixedTree) plans that still resolve.</description></item>
/// </list>
/// If the artifact cannot be loaded or parsed, a warning is logged and the method returns before touching any
/// of that state.
/// </remarks>
private void PushDesiredSubscriptions(DeploymentId deploymentId)
{
byte[] blob;
try
{
using var db = _dbFactory.CreateDbContext();
blob = db.Deployments.AsNoTracking()
.Where(d => d.DeploymentId == deploymentId.Value)
.Select(d => d.ArtifactBlob)
.FirstOrDefault() ?? Array.Empty();
}
catch (Exception ex)
{
_log.Warning(ex, "DriverHost {Node}: failed to load artifact for SubscribeBulk ({Id})", _localNode, deploymentId);
return;
}
AddressSpaceComposition composition;
try
{
composition = DeploymentArtifact.ParseComposition(blob, _localNode.Value);
}
catch (Exception ex)
{
_log.Warning(ex, "DriverHost {Node}: failed to parse composition for SubscribeBulk ({Id})", _localNode, deploymentId);
return;
}
// Value-subscription set: alarm-bearing tags are Part 9 conditions, not value variables, so they
// are excluded — the driver must not value-subscribe an alarm attribute (it is fed via the native
// alarm event stream, routed by ForwardNativeAlarm).
var refsByDriver = composition.EquipmentTags
.Where(t => t.Alarm is null)
.GroupBy(t => t.DriverInstanceId, StringComparer.Ordinal)
.ToDictionary(
g => g.Key,
g => (IReadOnlyList)g.Select(t => t.FullName)
.Distinct(StringComparer.Ordinal)
.ToArray(),
StringComparer.Ordinal);
// Native-alarm subscription set: the alarm-bearing tags' FullNames (= the driver's
// ConditionId/AlarmFullReference). An IAlarmSource driver suppresses OnAlarmEvent until at least one
// alarm subscription exists (e.g. GalaxyDriver gates its central feed on _alarmSubscriptions), so the
// instance actor must SubscribeAlarmsAsync these refs to un-gate the feed. Routing stays by
// ConditionId in ForwardNativeAlarm; this set just opens (and scopes) the subscription.
var alarmRefsByDriver = composition.EquipmentTags
.Where(t => t.Alarm is not null)
.GroupBy(t => t.DriverInstanceId, StringComparer.Ordinal)
.ToDictionary(
g => g.Key,
g => (IReadOnlyList)g.Select(t => t.FullName)
.Distinct(StringComparer.Ordinal)
.ToArray(),
StringComparer.Ordinal);
// Rebuild the driver live-value routing map from the SAME EquipmentTags pass (mirrors
// VirtualTagHostActor._nodeIdByVtag): map each tag's (DriverInstanceId, FullName) wire-ref to
// the folder-scoped equipment NodeId the materialiser placed its variable at, so ForwardToMux
// can land driver values on the right node. Clear-and-repopulate every apply so renames
// (Name/FolderPath/EquipmentId changes) and removals are reflected.
_nodeIdByDriverRef.Clear();
// Inverse map for the inbound operator-write path (NodeId → (DriverInstanceId, FullName)): an
// operator writes to the variable's folder-scoped NodeId, but the driver writes by its wire-ref
// FullName. Cleared + repopulated from the SAME EquipmentTags pass so renames/removals are
// reflected. Each NodeId maps to exactly one driver ref (a variable is backed by a single driver
// attribute), so last-writer-wins on the rare duplicate is harmless.
_driverRefByNodeId.Clear();
// Alarm condition routing map: (DriverInstanceId, FullName = alarm ConditionId/AlarmFullReference) → folder-scoped
// condition NodeId(s). Built from the SAME EquipmentTags pass (alarm-bearing tags only) so
// ForwardNativeAlarm can land a native transition on the right condition node. Clear-and-rebuild
// every apply; the projector is Clear()'d too so stale per-condition state never leaks across
// redeploys (renames/removals/address-space rebuilds).
_alarmNodeIdByDriverRef.Clear();
// Inverse alarm map for the inbound native-condition ack path (condition NodeId → (DriverInstanceId,
// FullName)): an OPC UA client acknowledges the condition's folder-scoped NodeId, but the driver
// acknowledges by its wire-ref FullName (= ConditionId). Cleared + repopulated from the SAME
// alarm-bearing EquipmentTags pass so renames/removals are reflected. Each condition NodeId maps to
// exactly one driver ref (a condition is backed by a single driver alarm), so last-writer-wins on the
// rare duplicate is harmless.
_driverRefByAlarmNodeId.Clear();
// Per-condition metadata (EquipmentId / Name / OPC UA alarm type) for the alerts fan-out, built in
// the SAME alarm branch as the node map so a redeploy can't leave it out of sync. Cleared alongside it.
_alarmMetaByNodeId.Clear();
_nativeAlarmProjector.Clear();
foreach (var t in composition.EquipmentTags)
{
var key = (t.DriverInstanceId, t.FullName);
var nodeId = EquipmentNodeIds.Variable(t.EquipmentId, t.FolderPath, t.Name);
if (t.Alarm is not null)
{
// Alarm tags are conditions, not value variables: route them ONLY into the alarm map and
// keep them OUT of the value maps + value-subscription set (so they don't get both a value
// variable AND a condition).
if (!_alarmNodeIdByDriverRef.TryGetValue(key, out var aset))
_alarmNodeIdByDriverRef[key] = aset = new HashSet(StringComparer.Ordinal);
aset.Add(nodeId);
// Inverse 1:1 map for the inbound native-condition ack path: the materialised condition
// NodeId routes back to the owning (DriverInstanceId, FullName=ConditionId) so an OPC UA
// acknowledge of this condition reaches the right driver child.
_driverRefByAlarmNodeId[nodeId] = key;
// Capture the per-condition metadata the alerts fan-out (ForwardNativeAlarm) needs to build
// the AlarmTransitionEvent: the equipment path, the operator-visible alarm name, and the
// OPC UA Part 9 subtype. Keyed by the condition NodeId (the projection's own key).
_alarmMetaByNodeId[nodeId] = (t.EquipmentId, t.Name, t.Alarm.AlarmType, t.Alarm.HistorizeToAveva);
continue;
}
if (!_nodeIdByDriverRef.TryGetValue(key, out var set))
_nodeIdByDriverRef[key] = set = new HashSet(StringComparer.Ordinal);
set.Add(nodeId);
_driverRefByNodeId[nodeId] = key;
}
// Push to every RUNNING child (not every driver in the composition). A child with no tags gets empty
// value/alarm sets, which clears any subscription left over from a previous deployment.
var total = 0;
foreach (var (driverId, entry) in _children)
{
var refs = refsByDriver.TryGetValue(driverId, out var r) ? r : Array.Empty();
var alarmRefs = alarmRefsByDriver.TryGetValue(driverId, out var ar) ? ar : Array.Empty();
entry.Actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(refs, SubscriptionPublishingInterval, alarmRefs));
total += refs.Count;
}
// NOTE(review): {Drivers} logs refsByDriver.Count, i.e. drivers that have value tags in the composition.
// That is not the number of running children the sets were pushed to above, and the two can differ.
// Confirm which count is intended. {Refs} counts value refs only (alarm refs are not included).
if (total > 0)
{
_log.Info("DriverHost {Node}: SubscribeBulk pushed {Refs} references across {Drivers} driver(s)",
_localNode, total, refsByDriver.Count);
}
// Hand the Equipment-namespace VirtualTags to the host so it spawns/reconciles a
// VirtualTagActor per plan and streams their evaluated values back onto the just-rebuilt
// address space. Runs on BOTH the fresh-apply path (ApplyAndAck) and the bootstrap-restore
// path (RestoreApplied) because both call this method, so one send covers both.
// NOTE: the Stale-recovery path (TryRecoverFromStale) does NOT call PushDesiredSubscriptions,
// so — like drivers — VirtualTags remain empty after a Stale recovery until the next
// deployment dispatch. This is intentional and consistent with driver recovery: the Stale
// path only restores the revision marker + NodeDeploymentState; a subsequent dispatch
// (or a redeploy from AdminUI) triggers the full apply + subscribe pass.
_virtualTagHost?.Tell(new VirtualTagHostActor.ApplyVirtualTags(composition.EquipmentVirtualTags));
if (composition.EquipmentVirtualTags.Count > 0)
{
_log.Info("DriverHost {Node}: applied {Count} Equipment VirtualTag(s) to the VirtualTag host",
_localNode, composition.EquipmentVirtualTags.Count);
}
// Same pass for Equipment-namespace ScriptedAlarms: hand the plans to the host so it
// (re)loads its engine + re-registers mux interest for the union of dependency refs. Covers
// both the fresh-apply and bootstrap-restore paths (both call this method); the Stale-recovery
// path deliberately does not, matching driver + VirtualTag recovery.
_scriptedAlarmHost?.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(composition.EquipmentScriptedAlarms));
if (composition.EquipmentScriptedAlarms.Count > 0)
{
_log.Info("DriverHost {Node}: applied {Count} Equipment ScriptedAlarm(s) to the ScriptedAlarm host",
_localNode, composition.EquipmentScriptedAlarms.Count);
}
// Cache the applied composition LAST so discovered-node injection (HandleDiscoveredNodes) can resolve
// the equipment bound to a driver + recompute the authored subscription sets when a driver later
// reports its FixedTree. Set here (not in ApplyAndAck) so both the fresh-apply and bootstrap-restore
// paths — which both route through this method — leave a current composition.
_lastComposition = composition;
// Re-inject discovered (FixedTree) nodes after the authored rebuild. PushDesiredSubscriptions cleared
// _nodeIdByDriverRef and re-pushed authored-only subscriptions above; without this, an IN-PROCESS
// redeploy / re-apply (one that runs while the host is alive, so _discoveredByDriver is populated)
// would drop the injected FixedTree routes + materialised nodes until the driver happens to reconnect
// and re-discover. This loop is INERT on the bootstrap-restore path (RestoreApplied): there the actor
// is freshly constructed so _discoveredByDriver is empty — restart survival comes from Task 6's
// post-connect re-discovery, NOT this re-apply. Re-resolve each cached driver's candidate equipments
// from the CURRENT composition (the SAME EquipmentNodes-UNION-EquipmentTags logic HandleDiscoveredNodes
// uses), then validate each cached (equipmentId → plan) entry PER ENTRY: drop the entry if its
// equipmentId is no longer a resolved candidate for the driver, OR the plan's NodeIds aren't scoped to
// that equipmentId (a rebind). A driver whose inner map empties out is removed entirely. The surviving
// entries are re-applied via the single-send-per-driver structure. (The single-equipment case today has
// exactly one inner entry; the multi-device task adds more.)
foreach (var driverId in _discoveredByDriver.Keys.ToList()) // snapshot — we mutate the dict below
{
var fromNodes = composition.EquipmentNodes
.Where(e => e.DriverInstanceId is not null && string.Equals(e.DriverInstanceId, driverId, StringComparison.Ordinal))
.Select(e => e.EquipmentId);
var fromTags = composition.EquipmentTags
.Where(t => string.Equals(t.DriverInstanceId, driverId, StringComparison.Ordinal))
.Select(t => t.EquipmentId);
var candidates = fromNodes.Concat(fromTags).ToHashSet(StringComparer.Ordinal);
var plansByEquipment = _discoveredByDriver[driverId];
// Track whether ANY entry was dropped (no-longer-candidate or rebind) so we can re-trigger this
// driver's discovery exactly ONCE after the inner map is processed (see the post-loop block).
var droppedAny = false;
foreach (var equipmentId in plansByEquipment.Keys.ToList()) // snapshot — we mutate the inner dict
{
var plan = plansByEquipment[equipmentId];
if (!candidates.Contains(equipmentId))
{
plansByEquipment.Remove(equipmentId);
droppedAny = true;
_log.Debug("DriverHost {Node}: dropped cached discovered nodes for {Driver}/{Equipment} — equipment no longer resolves", _localNode, driverId, equipmentId);
continue;
}
// If the equipment was rebound (the cached plan's NodeIds are scoped to the OLD equipment), drop +
// let re-discovery rebuild against the new equipment. The plan's NodeIds are "{equipmentId}/...".
// NOTE(review): only Variables[0] is inspected, and a plan with zero Variables is treated as rebound.
// Such a plan is dropped and rediscovery is triggered even if its routing is still valid; confirm
// this is intended.
var planEquipmentConsistent = plan.Variables.Count > 0
&& plan.Variables[0].NodeId.StartsWith(equipmentId + "/", StringComparison.Ordinal);
if (!planEquipmentConsistent)
{
plansByEquipment.Remove(equipmentId);
droppedAny = true;
_log.Debug("DriverHost {Node}: dropped cached discovered nodes for {Driver}/{Equipment} — equipment rebound", _localNode, driverId, equipmentId);
}
}
// Re-trigger discovery when ANY entry was dropped (no-longer-candidate or rebind). A CONFIG-UNCHANGED
// rebind (the driver's DriverConfig is identical, only its authored tag's EquipmentId moved) is NOT
// restarted by ReconcileDrivers — the child stays Connected — so without this nudge the FixedTree
// subtree would stay ABSENT under the new equipment until the driver's next natural reconnect. We now
// ask the child to re-run discovery so it re-grafts promptly: the next pass resolves against the new
// _lastComposition (the now-bound equipment). This is a DISCOVERY action, not lifecycle control — no
// stop/restart; it is idempotent, and the child no-ops it if not Connected (handled in
// DriverInstanceActor). Sent at most ONCE per driver per re-inject pass (here, after the inner map is
// processed — so even when the inner map empties below), guarded on the child still existing.
if (droppedAny && _children.TryGetValue(driverId, out var rediscoverEntry))
rediscoverEntry.Actor.Tell(new DriverInstanceActor.TriggerRediscovery());
if (plansByEquipment.Count == 0)
{
_discoveredByDriver.Remove(driverId);
continue;
}
ApplyDiscoveredPlansForDriver(driverId, plansByEquipment);
}
}
private void SpawnChild(DriverInstanceSpec spec)
{
    // Policy-driven stubbing first (driver type vs. this node's roles). A factory miss or a throwing
    // factory below also degrades to a stub, so a child actor is spawned for the instance either way.
    var stub = DriverInstanceActor.ShouldStub(spec.DriverType, _localRoles);
    IDriver? driver = null;
    if (!stub)
    {
        var factoryThrew = false;
        try { driver = _driverFactory.TryCreate(spec.DriverType, spec.DriverInstanceId, spec.DriverConfig); }
        catch (Exception ex)
        {
            factoryThrew = true;
            _log.Warning(ex, "DriverHost {Node}: factory for {Type} threw on {Id}; stubbing",
                _localNode, spec.DriverType, spec.DriverInstanceId);
        }
        // "No factory" is only the right diagnosis when TryCreate returned null WITHOUT throwing. A
        // throwing factory was already reported above and must not also be logged as unregistered.
        if (driver is null && !factoryThrew)
        {
            _log.Warning(
                "DriverHost {Node}: no factory for driver type {Type} (instance {Id}); falling back to stub",
                _localNode, spec.DriverType, spec.DriverInstanceId);
        }
        stub = driver is null;
    }
    IActorRef child;
    // Prefer the real ClusterId from the deployment artifact; fall back to the local node
    // identity for pre-PR artifacts that don't carry it yet (older deploys persisted before
    // ClusterId was added to DriverInstanceSpec).
    var clusterId = !string.IsNullOrEmpty(spec.ClusterId) ? spec.ClusterId : _localNode.Value;
    // A fresh generation-suffixed name on EVERY spawn so a respawn can never collide with a child
    // still tearing down: Akka frees a stopped actor's name only after it FULLY terminates (async),
    // but HandleRestartDriver stops + respawns within one (synchronous) message handler — the old
    // child is still registered, so reusing the base name throws InvalidActorNameException
    // ("actor name is not unique"). Children are tracked by the _children dict (by IActorRef), never
    // by path, so the suffix is invisible to every caller.
    var actorName = ActorNameFor(spec.DriverInstanceId, _childSpawnGeneration++);
    if (stub)
    {
        // Placeholder driver; the actor is told to start stubbed and gets no reconnect interval.
        child = Context.ActorOf(
            DriverInstanceActor.Props(
                new StubbedDriver(spec.DriverInstanceId, spec.DriverType),
                reconnectInterval: null,
                startStubbed: true,
                healthPublisher: _healthPublisher,
                clusterId: clusterId),
            actorName);
    }
    else
    {
        // Non-stub implies the factory produced a driver (stub == driver is null above).
        child = Context.ActorOf(
            DriverInstanceActor.Props(
                driver!,
                healthPublisher: _healthPublisher,
                clusterId: clusterId),
            actorName);
        child.Tell(new DriverInstanceActor.InitializeRequested(spec.DriverConfig));
    }
    _children[spec.DriverInstanceId] = new ChildEntry(child, spec, stub);
    _log.Info("DriverHost {Node}: spawned {Type} driver {Id} (stub={Stub})",
        _localNode, spec.DriverType, spec.DriverInstanceId, stub);
}
private void ApplyChildDelta(DriverInstanceSpec spec)
{
    // Forwards a config delta to an already-running child and refreshes the cached spec.
    // Unknown instances are ignored (nothing hosted here to update).
    var instanceId = spec.DriverInstanceId;
    if (_children.TryGetValue(instanceId, out var current))
    {
        var delta = new DriverInstanceActor.ApplyDelta(spec.DriverConfig, CorrelationId.NewId());
        current.Actor.Tell(delta);
        // Keep the whole spec, not just the config: a delta may also alter Name, Enabled, ClusterId, ...
        _children[instanceId] = current with { Spec = spec };
        _log.Debug("DriverHost {Node}: ApplyDelta queued for {Id}", _localNode, instanceId);
    }
}
private void StopChild(string driverInstanceId)
{
    // Stops the hosted child for this instance (if any) and forgets it; a miss is a silent no-op.
    if (_children.TryGetValue(driverInstanceId, out var victim))
    {
        Context.Stop(victim.Actor);
        _children.Remove(driverInstanceId);
        _log.Info("DriverHost {Node}: stopped driver child {Id}", _localNode, driverInstanceId);
    }
}
private static string ActorNameFor(string driverInstanceId, long generation)
{
    // Builds the child actor name "drv-{mangledId}-g{generation}".
    // Akka.NET only accepts ASCII letters/digits plus a fixed symbol set in actor names ('/' and
    // whitespace, among others, are rejected). char.IsLetterOrDigit would also let non-ASCII letters
    // and digits (e.g. 'é') through and trip InvalidActorNameException, so keep only ASCII
    // alphanumerics plus '-', '_' and '.', and mangle everything else to '_'. The monotonic
    // generation suffix guarantees a never-before-used name on every spawn (see SpawnChild) so a
    // restart's respawn can never collide with the still-terminating predecessor.
    var chars = driverInstanceId
        .Select(c => c is (>= 'a' and <= 'z') or (>= 'A' and <= 'Z') or (>= '0' and <= '9') or '-' or '_' or '.'
            ? c
            : '_')
        .ToArray();
    return "drv-" + new string(chars) + "-g" + generation;
}
/// <summary>
/// Inert placeholder driver. It is used when no factory is registered for a driver type, when the
/// factory throws, or when DriverInstanceActor.ShouldStub returns true. Its hosting
/// DriverInstanceActor is spawned with startStubbed: true, which is expected to keep these driver
/// methods from running; every member is a no-op regardless.
/// </summary>
private sealed class StubbedDriver : IDriver
{
    /// <summary>Creates a stub that reports the given identity.</summary>
    /// <param name="id">The driver instance identifier to report.</param>
    /// <param name="type">The driver type name to report.</param>
    public StubbedDriver(string id, string type)
    {
        DriverInstanceId = id;
        DriverType = type;
    }

    /// <inheritdoc />
    public string DriverInstanceId { get; }

    /// <inheritdoc />
    public string DriverType { get; }

    /// <inheritdoc />
    public Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
        => Task.CompletedTask;

    /// <inheritdoc />
    public Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
        => Task.CompletedTask;

    /// <inheritdoc />
    public Task ShutdownAsync(CancellationToken cancellationToken)
        => Task.CompletedTask;

    /// <inheritdoc />
    public Task FlushOptionalCachesAsync(CancellationToken cancellationToken)
        => Task.CompletedTask;

    /// <inheritdoc />
    /// <remarks>Always reports Healthy, stamped with the current UTC time and no error.</remarks>
    public DriverHealth GetHealth()
        => new DriverHealth(DriverState.Healthy, DateTime.UtcNow, LastError: null);

    /// <inheritdoc />
    /// <remarks>A stub holds no driver-side state, so its footprint is always zero.</remarks>
    public long GetMemoryFootprint() => 0;
}
private void HandleRestartDriver(RestartDriver msg)
{
    // Arrives via DPS broadcast: nodes that don't host this instance simply ignore it.
    if (_children.TryGetValue(msg.DriverInstanceId, out var current))
    {
        _log.Info("DriverHost {Node}: restarting driver {Id} by request of {User}",
            _localNode, msg.DriverInstanceId, msg.ActorByUserName);
        // Tear down the running child (per the original design, DriverInstanceActor.PostStop runs
        // ShutdownAsync), then forget it.
        Context.Stop(current.Actor);
        _children.Remove(msg.DriverInstanceId);
        // Respawn from the last reconciled spec so RowId, Name and ClusterId carry over unchanged.
        SpawnChild(current.Spec);
    }
}
private void HandleReconnectDriver(ReconnectDriver msg)
{
    // Arrives via DPS broadcast: nodes that don't host this instance simply ignore it.
    if (_children.TryGetValue(msg.DriverInstanceId, out var current))
    {
        _log.Info("DriverHost {Node}: reconnecting driver {Id} by request of {User}",
            _localNode, msg.DriverInstanceId, msg.ActorByUserName);
        // Ask the child to drop its transport and go back through its Reconnecting state.
        var forceReconnect = new DriverInstanceActor.ForceReconnect();
        current.Actor.Tell(forceReconnect);
    }
}
private void TryRecoverFromStale()
{
    // Attempts to leave the Stale state: if ConfigDb answers, adopt the most recently sealed
    // deployment's revision (when one exists), stop the "retry-db" timer and become Steady.
    // On any failure we stay Stale and the still-running retry timer calls us again.
    try
    {
        using var db = _dbFactory.CreateDbContext();
        var latestSealed = db.Deployments
            .AsNoTracking()
            .Where(d => d.Status == DeploymentStatus.Sealed)
            .OrderByDescending(d => d.SealedAtUtc)
            .Select(d => new { d.DeploymentId, d.RevisionHash })
            .FirstOrDefault();
        // Everything that can still throw (the query above, RevisionHash.Parse) must run BEFORE the
        // retry timer is cancelled. Cancelling first would let a Parse failure land in the catch
        // below, which promises a retry, with no timer left to deliver it — the actor would stay
        // Stale indefinitely.
        if (latestSealed is not null)
        {
            _currentRevision = RevisionHash.Parse(latestSealed.RevisionHash);
            // UpsertNodeDeploymentState logs and swallows its own failures, so it cannot abort recovery.
            UpsertNodeDeploymentState(new DeploymentId(latestSealed.DeploymentId),
                NodeDeploymentStatus.Applied, failureReason: null);
        }
        _log.Info("DriverHost {Node}: ConfigDb back; recovering from Stale", _localNode);
        Timers.Cancel("retry-db");
        Become(Steady);
    }
    catch (Exception ex)
    {
        _log.Debug(ex, "DriverHost {Node}: still Stale; will retry in {Interval}", _localNode, ReconnectInterval);
    }
}
private void UpsertNodeDeploymentState(DeploymentId deploymentId, NodeDeploymentStatus status, string? failureReason)
{
    // Best-effort insert-or-update of this node's row for the given deployment. Failures are
    // logged as warnings and swallowed so callers never fail on bookkeeping.
    try
    {
        using var db = _dbFactory.CreateDbContext();
        var nodeKey = _localNode.Value;
        var deploymentKey = deploymentId.Value;
        var isApplied = status == NodeDeploymentStatus.Applied;
        var row = db.NodeDeploymentStates.FirstOrDefault(
            x => x.NodeId == nodeKey && x.DeploymentId == deploymentKey);
        if (row is not null)
        {
            row.Status = status;
            row.FailureReason = failureReason;
            // AppliedAtUtc only moves forward on an Applied transition; other statuses leave it as-is.
            if (isApplied)
            {
                row.AppliedAtUtc = DateTime.UtcNow;
            }
        }
        else
        {
            db.NodeDeploymentStates.Add(new NodeDeploymentState
            {
                NodeId = nodeKey,
                DeploymentId = deploymentKey,
                Status = status,
                FailureReason = failureReason,
                AppliedAtUtc = isApplied ? DateTime.UtcNow : null,
            });
        }
        db.SaveChanges();
    }
    catch (Exception ex)
    {
        _log.Warning(ex, "DriverHost {Node}: failed to upsert NodeDeploymentState for {Id}", _localNode, deploymentId);
    }
}
private void SendAck(DeploymentId deploymentId, ApplyAckOutcome outcome, string? failureReason, CorrelationId correlation)
{
    // Reports this node's apply outcome back to the deployment coordinator.
    var ack = new ApplyAck(deploymentId, _localNode, outcome, failureReason, correlation);
    if (_coordinatorOverride is { } coordinator)
    {
        // An explicit coordinator handle was supplied: talk to it directly.
        coordinator.Tell(ack);
        return;
    }
    // Otherwise publish on the dedicated ACK topic. The coordinator singleton subscribes there in
    // PreStart, so the ACK reaches whichever admin node hosts it without an actor-path lookup.
    var mediator = DistributedPubSub.Get(Context.System).Mediator;
    mediator.Tell(new Publish(DeploymentAcksTopic, ack));
}
}