Files
lmxopcua/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs
T

1609 lines
96 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using System.Diagnostics;
using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.OtOpcUa.Commons.Engines;
using ZB.MOM.WW.OtOpcUa.Commons.Interfaces;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
using ZB.MOM.WW.OtOpcUa.Commons.Observability;
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
using ZB.MOM.WW.OtOpcUa.Core.VirtualTags;
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
using ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms;
using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
using CommonsNodeId = ZB.MOM.WW.OtOpcUa.Commons.Types.NodeId;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
/// <summary>
/// Per-node supervisor that receives <see cref="DispatchDeployment"/> from the admin-role
/// coordinator and applies the deployment locally. A PreStart-only bootstrap phase plus three
/// Become states:
///
/// <list type="bullet">
/// <item><c>Bootstrapping</c> — PreStart only. Reads <see cref="NodeDeploymentState"/> for self;
/// chooses next state.</item>
/// <item><c>Steady(rev)</c> — caught up. Idempotent on same-rev dispatch (immediate <c>ApplyAck.Applied</c>).
/// New rev → transitions to <c>Applying</c>.</item>
/// <item><c>Applying(id)</c> — applying a delta. Buffers further dispatches.</item>
/// <item><c>Stale</c> — ConfigDb unreachable on bootstrap. Background reconnect loop tries to advance.</item>
/// </list>
///
/// Children (DriverInstance/VirtualTag/etc.) are fully wired: <c>DispatchDeployment</c> drives
/// <see cref="PushDesiredSubscriptions"/> which runs the spawn-plan via <see cref="IDriverFactory"/>,
/// maintains the <c>FullName→NodeId</c> live-value routing map, and forwards results to the
/// OPC UA publish actor via <c>ForwardToMux</c>.
/// </summary>
public sealed class DriverHostActor : ReceiveActor, IWithTimers
{
/// <summary>DistributedPubSub topic subscribed to in <see cref="PreStart"/> so the coordinator's
/// deployment broadcast reaches this actor.</summary>
public const string DeploymentsTopic = "deployments";
/// <summary>DistributedPubSub topic name for deployment acknowledgements.
/// NOTE(review): the publish site is outside this section — confirm who publishes and who listens.</summary>
public const string DeploymentAcksTopic = "deployment-acks";
/// <summary>DistributedPubSub topic subscribed to in <see cref="PreStart"/> so AdminUI
/// Reconnect/Restart commands reach this actor. Aliases the shared Commons constant so the topic
/// name is defined in exactly one place.</summary>
public const string DriverControlTopic = ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin.DriverControlTopic.Name;
/// <summary>Thirty-second interval.
/// NOTE(review): presumably the cadence of the Stale-state ConfigDb reconnect loop — the timer
/// start is outside this section; confirm.</summary>
public static readonly TimeSpan ReconnectInterval = TimeSpan.FromSeconds(30);
/// <summary>Publishing interval handed to each driver's SubscribeBulk pass after an apply.</summary>
private static readonly TimeSpan SubscriptionPublishingInterval = TimeSpan.FromSeconds(1);
private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbFactory;
private readonly CommonsNodeId _localNode;
private readonly IActorRef? _coordinatorOverride;
private readonly IDriverFactory _driverFactory;
private readonly IReadOnlySet<string> _localRoles;
private readonly IActorRef? _dependencyMux;
private readonly IActorRef? _opcUaPublishActor;
private readonly IDriverHealthPublisher _healthPublisher;
private readonly IVirtualTagEvaluator _virtualTagEvaluator;
private readonly IHistoryWriter _historyWriter;
private readonly IActorRef? _virtualTagHostOverride;
private readonly ILoggerFactory _loggerFactory;
private readonly ScriptRootLogger? _scriptRootLogger;
private readonly IActorRef? _scriptedAlarmHostOverride;
private readonly ILoggingAdapter _log = Context.GetLogger();
/// <summary>The single VirtualTag-host child that spawns/reconciles Equipment-namespace
/// VirtualTagActors and bridges their results onto the OPC UA publish actor. Spawned in
/// <see cref="PreStart"/> when an OPC UA publish actor is wired; receives
/// <see cref="VirtualTagHostActor.ApplyVirtualTags"/> from <see cref="PushDesiredSubscriptions"/>.</summary>
private IActorRef? _virtualTagHost;
/// <summary>The single ScriptedAlarm-host child that owns the per-node
/// <see cref="ScriptedAlarmEngine"/>, feeds it live tag values, and bridges its emissions onto the
/// OPC UA publish actor + the cluster <c>alerts</c> topic. Spawned in <see cref="PreStart"/>
/// alongside the VirtualTag host (requires both an OPC UA publish actor and a
/// <see cref="ScriptRootLogger"/>); receives <see cref="ScriptedAlarmHostActor.ApplyScriptedAlarms"/>
/// from <see cref="PushDesiredSubscriptions"/>.</summary>
private IActorRef? _scriptedAlarmHost;
/// <summary>Revision this node is currently caught up on; null while none is known.
/// <see cref="Bootstrap"/> sets it when the latest persisted node state is Applied.</summary>
private RevisionHash? _currentRevision;
/// <summary>Deployment currently being applied. The <c>Applying</c> behaviour compares incoming
/// dispatches against it so a duplicate dispatch of the in-flight deployment is ignored.</summary>
private DeploymentId? _applyingDeploymentId;
/// <summary>Bookkeeping for the spawned driver children, Ordinal-keyed.
/// NOTE(review): presumably keyed by DriverInstanceId — the insert site is outside this section;
/// confirm.</summary>
private readonly Dictionary<string, ChildEntry> _children = new(StringComparer.Ordinal);
// Monotonic counter feeding the child actor-name suffix (see ActorNameFor / SpawnChild). Single-
// threaded actor, so a plain increment is safe; it only ever grows, guaranteeing a unique name per
// spawn so a restart's respawn never collides with the still-terminating old child.
private long _childSpawnGeneration;
/// <summary>
/// Driver live-value routing map: <c>(DriverInstanceId, FullName) → folder-scoped equipment
/// NodeId(s)</c>. Rebuilt every apply by <see cref="PushDesiredSubscriptions"/> from the
/// composition's <c>EquipmentTags</c> (mirroring <c>VirtualTagHostActor._nodeIdByVtag</c>), and
/// resolved in <see cref="ForwardToMux"/> so a driver value published by wire-ref FullName lands
/// on the variable's actual folder-scoped NodeId. A set because the same driver ref can back
/// several equipment variables (e.g. identical machines sharing a register), and the per-apply
/// rebuild dedups by NodeId.
/// </summary>
private readonly Dictionary<(string DriverInstanceId, string FullName), HashSet<string>> _nodeIdByDriverRef = new();
/// <summary>
/// Inverse of <see cref="_nodeIdByDriverRef"/>: <c>folder-scoped equipment NodeId →
/// (DriverInstanceId, FullName)</c>. Rebuilt every apply by <see cref="PushDesiredSubscriptions"/>
/// from the same <c>EquipmentTags</c> pass, and resolved by <see cref="HandleRouteNodeWrite"/> so an
/// inbound operator write targeting an equipment variable's NodeId is forwarded to the owning
/// driver child as a write of its wire-ref <c>FullName</c>. Each NodeId maps to exactly one driver
/// ref (a variable is backed by a single driver attribute), so this is a flat 1:1 map (the forward
/// map fans out 1:N because one ref can back several variables).
/// </summary>
private readonly Dictionary<string, (string DriverInstanceId, string FullName)> _driverRefByNodeId =
new(StringComparer.Ordinal);
/// <summary>(DriverInstanceId, FullName = alarm ConditionId / AlarmFullReference) → folder-scoped condition NodeId(s).
/// Built from EquipmentTags whose plan carries Alarm, alongside the value maps; resolves a native
/// alarm transition to the materialised Part 9 condition node(s). Alarm tags are conditions, not
/// value variables, so they are kept OUT of the value maps + value-subscription set.</summary>
private readonly Dictionary<(string DriverInstanceId, string FullName), HashSet<string>> _alarmNodeIdByDriverRef = new();
/// <summary>
/// Inverse of <see cref="_alarmNodeIdByDriverRef"/>: <c>folder-scoped condition NodeId →
/// (DriverInstanceId, FullName = alarm ConditionId/AlarmFullReference)</c>. Built in the SAME apply
/// pass from the alarm-bearing EquipmentTags, and resolved by <see cref="HandleRouteNativeAlarmAck"/>
/// so an inbound OPC UA acknowledge of a native condition (resolved to the condition's NodeId by the
/// node manager) is forwarded to the owning driver child as an acknowledge of its wire-ref
/// <c>FullName</c>. Each condition NodeId maps to exactly one driver ref (a condition is backed by a
/// single driver alarm), so this is a flat 1:1 map (the forward map fans out 1:N because one ref can
/// back several conditions on identical machines).
/// </summary>
private readonly Dictionary<string, (string DriverInstanceId, string FullName)> _driverRefByAlarmNodeId =
new(StringComparer.Ordinal);
/// <summary>Condition NodeId → (EquipmentId, tag Name, OPC UA alarm type, HistorizeToAveva) for building
/// the AlarmTransitionEvent fan-out. Built in the same PushDesiredSubscriptions alarm branch.
/// HistorizeToAveva (bool?, null ⇒ historize) is the native per-condition opt-out parsed from
/// <c>TagConfig.alarm.historizeToAveva</c>; it is threaded onto the transition so the
/// HistorianAdapterActor's <c>is not false</c> gate suppresses the durable AVEVA row only on an
/// explicit false (mirroring the scripted-alarm opt-out).</summary>
private readonly Dictionary<string, (string EquipmentId, string Name, string AlarmType, bool? HistorizeToAveva)> _alarmMetaByNodeId =
new(StringComparer.Ordinal);
/// <summary>Derives a full Part 9 condition snapshot from each native alarm transition delta,
/// tracking per-condition-NodeId prior state. Cleared on every apply alongside the value maps so
/// stale condition state never leaks across redeploys.</summary>
private readonly NativeAlarmProjector _nativeAlarmProjector = new();
/// <summary>The composition from the most-recent apply (set at the END of
/// <see cref="PushDesiredSubscriptions"/>). Discovered-node injection
/// (<see cref="HandleDiscoveredNodes"/>) reads it to resolve the equipment bound to a driver (from the
/// composition's <c>EquipmentNodes</c> whose <c>DriverInstanceId</c> matches, UNION the authored
/// <c>EquipmentTags</c> for that driver — so a driver with zero authored tags can still graft onto an
/// equipment bound via <c>EquipmentNode.DriverInstanceId</c>) and to recompute the authored value + alarm
/// subscription sets when merging FixedTree refs. Null until the first apply — a
/// <see cref="DriverInstanceActor.DiscoveredNodesReady"/> arriving before any apply is ignored.</summary>
private AddressSpaceComposition? _lastComposition;
/// <summary>The most-recent discovered-injection plan(s) per driver instance, cached so the redeploy
/// re-inject tail can re-apply the live graft after an address-space rebuild without re-running discovery.
/// Keyed by DriverInstanceId at the OUTER level, then by EquipmentId at the INNER level (driver → (equipment
/// → plan)). Today only the single-equipment case is populated, so the inner map always has exactly one
/// entry; the inner map is shaped per-equipment now so the follow-up multi-device-partition task can hold
/// multiple (equipmentId → plan) entries per driver without reshaping this cache. Inner dict is mutable
/// (the redeploy tail drops stale per-equipment entries in place); both levels are Ordinal-keyed.
/// Last-writer-wins on a re-discovery (the whole inner map is replaced).</summary>
private readonly Dictionary<string, Dictionary<string, DiscoveredInjectionPlan>> _discoveredByDriver =
new(StringComparer.Ordinal);
/// <summary>
/// Cached local <see cref="RedundancyRole"/> from the latest <see cref="RedundancyStateChanged"/>
/// snapshot (null = unknown until the first snapshot arrives, or no local node match). The inbound
/// write gate in <see cref="HandleRouteNodeWrite"/> reuses this signal — the SAME one the
/// scripted-alarm emit gate uses (<c>ScriptedAlarmHostActor._localRole</c>): only the Primary
/// services writes, default-allow while unknown so single-node deploys + the boot window never
/// reject (a node is the sole Primary until told otherwise).
/// </summary>
private RedundancyRole? _localRole;
/// <summary>Cached cluster DistributedPubSub mediator, resolved once in <see cref="PreStart"/> (on the
/// actor thread) and reused for the Primary-gated native-alarm <c>alerts</c> fan-out in
/// <see cref="ForwardNativeAlarm"/> instead of re-resolving it per-publish. Mirrors
/// <c>ScriptedAlarmHostActor._mediator</c>.</summary>
private IActorRef _mediator = null!;
/// <summary>
/// Routes an inbound operator write (resolved from the OPC UA node-manager side, Task 11) to the
/// owning driver child. <see cref="HandleRouteNodeWrite"/> gates on the local node being the
/// driver Primary, resolves the <see cref="_driverRefByNodeId"/> reverse map, and Asks the child a
/// <see cref="DriverInstanceActor.WriteAttribute"/> carrying the driver-side wire-ref
/// <c>FullName</c>. The outcome is reported back as a <see cref="NodeWriteResult"/>.
/// </summary>
/// <param name="NodeId">The folder-scoped equipment-variable NodeId the operator wrote to.</param>
/// <param name="Value">The value to write (the driver coerces it to the attribute's data type).</param>
public sealed record RouteNodeWrite(string NodeId, object? Value);
/// <summary>Reply to <see cref="RouteNodeWrite"/>: either the outcome of forwarding the write to the
/// driver, or a gate/lookup failure that meant the write never reached the driver.</summary>
/// <param name="Success">True when the driver accepted the write.</param>
/// <param name="Reason">Failure detail when <paramref name="Success"/> is false; null on success.</param>
public sealed record NodeWriteResult(bool Success, string? Reason);
/// <summary>
/// Routes an inbound native-condition acknowledge to the owning driver child. The host wires the
/// OPC UA node manager's <c>NativeAlarmAckRouter</c> to Tell this message to the driver host when a
/// client Acknowledges a NATIVE Part 9 condition. <see cref="HandleRouteNativeAlarmAck"/> applies
/// the same Primary gate the inbound write path uses, resolves the
/// <see cref="_driverRefByAlarmNodeId"/> inverse map, and Tells the owning
/// <see cref="DriverInstanceActor"/> a <see cref="DriverInstanceActor.RouteAlarmAck"/> carrying the
/// principal. The forward is fire-and-forget because the Part 9 ack has already committed the local
/// condition state. Deliberately decoupled from the OpcUaServer <c>NativeAlarmAck</c> type: the host
/// maps NativeAlarmAck → this at the wiring boundary so Runtime does not depend on the OPC UA layer.
/// </summary>
/// <param name="ConditionNodeId">The folder-scoped condition NodeId the operator acknowledged.</param>
/// <param name="Comment">Operator-supplied comment forwarded to the upstream alarm system; null when none.</param>
/// <param name="OperatorUser">The authenticated principal performing the acknowledge.</param>
public sealed record RouteNativeAlarmAck(string ConditionNodeId, string? Comment, string OperatorUser);
/// <summary>Bookkeeping record held in <see cref="_children"/> for one driver child: the child's
/// actor reference together with the <see cref="DriverInstanceSpec"/> it is associated with.</summary>
/// <param name="Actor">The child actor reference.</param>
/// <param name="Spec">The driver-instance spec recorded for the child; source of the accessors below.</param>
/// <param name="Stubbed">NOTE(review): presumably true when the driver factory could not build a real
/// driver and a placeholder child was spawned — the spawn site is outside this section; confirm.</param>
private sealed record ChildEntry(IActorRef Actor, DriverInstanceSpec Spec, bool Stubbed)
{
// Convenience accessors for sites that don't need the full spec.
// Both simply project a property of Spec; nothing is cached separately.
public string DriverType => Spec.DriverType;
public string LastConfigJson => Spec.DriverConfig;
}
/// <inheritdoc />
/// <remarks>Required by <see cref="IWithTimers"/>. The <c>null!</c> initialiser only silences the
/// nullable-initialisation warning; the constructor never assigns this property.
/// NOTE(review): assumes Akka assigns the scheduler before any message is processed — confirm
/// against the Akka.NET version in use.</remarks>
public ITimerScheduler Timers { get; set; } = null!;
/// <summary>Payload-free singleton message: the private constructor means <see cref="Instance"/> is
/// the only instance that can exist.
/// NOTE(review): presumably the self-sent tick that drives the Stale-state ConfigDb reconnect
/// attempt every <see cref="ReconnectInterval"/> — the handler is outside this section; confirm.</summary>
public sealed class RetryConfigDbConnection
{
/// <summary>The sole instance; send this rather than constructing a new message.</summary>
public static readonly RetryConfigDbConnection Instance = new();
// Private so callers cannot create additional instances.
private RetryConfigDbConnection() { }
}
/// <summary>Creates props for a DriverHostActor with the specified dependencies.</summary>
/// <param name="dbFactory">Database context factory for configuration database access.</param>
/// <param name="localNode">The local cluster node identifier.</param>
/// <param name="coordinator">Optional coordinator actor reference for deployment coordination.</param>
/// <param name="driverFactory">Optional driver factory; defaults to null factory if not provided.</param>
/// <param name="localRoles">Optional set of roles assigned to the local node.</param>
/// <param name="dependencyMux">Optional actor reference for dependency multiplexing.</param>
/// <param name="opcUaPublishActor">Optional actor reference for OPC UA publishing.</param>
/// <param name="healthPublisher">Optional driver-health publisher; defaults to <see cref="NullDriverHealthPublisher"/>
/// so test harnesses and smoke fixtures don't need to wire it.</param>
/// <param name="virtualTagEvaluator">Optional evaluator handed to the spawned
/// <see cref="VirtualTagHostActor"/>'s children; defaults to <see cref="NullVirtualTagEvaluator"/>
/// (the dev/Mac path where no expression is evaluated). Production passes the DI-resolved
/// Roslyn evaluator.</param>
/// <param name="historyWriter">Optional sink handed to the spawned <see cref="VirtualTagHostActor"/>
/// for VirtualTag results whose plan opted into <c>Historize=true</c>; defaults to
/// <see cref="NullHistoryWriter"/> (the durable AVEVA sink is infra-gated, so no live-data historian
/// write RPC exists). A deployment that binds a real <see cref="IHistoryWriter"/> in DI overrides it.</param>
/// <param name="virtualTagHostOverride">Test seam: when supplied, this actor is used as the
/// VirtualTag host instead of spawning a real <see cref="VirtualTagHostActor"/> child, so tests
/// can intercept the <see cref="VirtualTagHostActor.ApplyVirtualTags"/> message. Null in
/// production (the real host is spawned).</param>
/// <param name="loggerFactory">Optional logger factory used to create the
/// <see cref="EfAlarmConditionStateStore"/>'s logger when spawning the ScriptedAlarm host;
/// defaults to <see cref="NullLoggerFactory"/> when not provided.</param>
/// <param name="scriptRootLogger">Optional root script logger required to spawn the ScriptedAlarm
/// host (the engine + its script logging hang off it). When null the ScriptedAlarm host is left
/// unspawned — the graceful dev/None-deployment path.</param>
/// <param name="scriptedAlarmHostOverride">Test seam: when supplied, this actor is used as the
/// ScriptedAlarm host instead of spawning a real <see cref="ScriptedAlarmHostActor"/> child, so
/// tests can intercept the <see cref="ScriptedAlarmHostActor.ApplyScriptedAlarms"/> message. Null
/// in production (the real host is spawned).</param>
public static Props Props(
    IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
    CommonsNodeId localNode,
    IActorRef? coordinator = null,
    IDriverFactory? driverFactory = null,
    IReadOnlySet<string>? localRoles = null,
    IActorRef? dependencyMux = null,
    IActorRef? opcUaPublishActor = null,
    IDriverHealthPublisher? healthPublisher = null,
    IVirtualTagEvaluator? virtualTagEvaluator = null,
    IHistoryWriter? historyWriter = null,
    IActorRef? virtualTagHostOverride = null,
    ILoggerFactory? loggerFactory = null,
    ScriptRootLogger? scriptRootLogger = null,
    IActorRef? scriptedAlarmHostOverride = null)
{
    // The factory lambda is kept as one constructor-call expression with purely positional
    // arguments, in constructor parameter order. Every argument is forwarded untouched; the
    // null-object defaults are applied by the constructor, not here.
    // The Akka.Actor qualification is needed because this method is itself named Props.
    return Akka.Actor.Props.Create(() => new DriverHostActor(
        dbFactory,
        localNode,
        coordinator,
        driverFactory,
        localRoles,
        dependencyMux,
        opcUaPublishActor,
        healthPublisher,
        virtualTagEvaluator,
        historyWriter,
        virtualTagHostOverride,
        loggerFactory,
        scriptRootLogger,
        scriptedAlarmHostOverride));
}
/// <summary>Initializes a new DriverHostActor with the specified dependencies.</summary>
/// <param name="dbFactory">Database context factory for configuration database access.</param>
/// <param name="localNode">The local cluster node identifier.</param>
/// <param name="coordinator">Optional coordinator actor reference for deployment coordination.</param>
/// <param name="driverFactory">Optional driver factory; defaults to null factory if not provided.</param>
/// <param name="localRoles">Optional set of roles assigned to the local node.</param>
/// <param name="dependencyMux">Optional actor reference for dependency multiplexing.</param>
/// <param name="opcUaPublishActor">Optional actor reference for OPC UA publishing.</param>
/// <param name="healthPublisher">Optional driver-health publisher; defaults to <see cref="NullDriverHealthPublisher"/>.</param>
/// <param name="virtualTagEvaluator">Optional evaluator handed to the VirtualTag host's children;
/// defaults to <see cref="NullVirtualTagEvaluator"/>.</param>
/// <param name="historyWriter">Optional sink handed to the spawned <see cref="VirtualTagHostActor"/>
/// for historized VirtualTag results; defaults to <see cref="NullHistoryWriter"/>.</param>
/// <param name="virtualTagHostOverride">Test seam: when supplied, used as the VirtualTag host
/// instead of spawning a real <see cref="VirtualTagHostActor"/> child.</param>
/// <param name="loggerFactory">Optional logger factory used to create the
/// <see cref="EfAlarmConditionStateStore"/>'s logger; defaults to <see cref="NullLoggerFactory"/>.</param>
/// <param name="scriptRootLogger">Optional root script logger required to spawn the ScriptedAlarm
/// host; when null the host is left unspawned.</param>
/// <param name="scriptedAlarmHostOverride">Test seam: when supplied, used as the ScriptedAlarm host
/// instead of spawning a real <see cref="ScriptedAlarmHostActor"/> child.</param>
public DriverHostActor(
    IDbContextFactory<OtOpcUaConfigDbContext> dbFactory,
    CommonsNodeId localNode,
    IActorRef? coordinator,
    IDriverFactory? driverFactory = null,
    IReadOnlySet<string>? localRoles = null,
    IActorRef? dependencyMux = null,
    IActorRef? opcUaPublishActor = null,
    IDriverHealthPublisher? healthPublisher = null,
    IVirtualTagEvaluator? virtualTagEvaluator = null,
    IHistoryWriter? historyWriter = null,
    IActorRef? virtualTagHostOverride = null,
    ILoggerFactory? loggerFactory = null,
    ScriptRootLogger? scriptRootLogger = null,
    IActorRef? scriptedAlarmHostOverride = null)
{
    // Required collaborators.
    _dbFactory = dbFactory;
    _localNode = localNode;

    // Optional actor wiring and the script root logger: stored as given, null when not supplied.
    _coordinatorOverride = coordinator;
    _dependencyMux = dependencyMux;
    _opcUaPublishActor = opcUaPublishActor;
    _scriptRootLogger = scriptRootLogger;

    // Test seams: stored as given; the Spawn* methods decide whether to use them.
    _virtualTagHostOverride = virtualTagHostOverride;
    _scriptedAlarmHostOverride = scriptedAlarmHostOverride;

    // Optional services fall back to a null-object (or an empty set) when omitted.
    _driverFactory = driverFactory ?? NullDriverFactory.Instance;
    _healthPublisher = healthPublisher ?? NullDriverHealthPublisher.Instance;
    _virtualTagEvaluator = virtualTagEvaluator ?? NullVirtualTagEvaluator.Instance;
    _historyWriter = historyWriter ?? NullHistoryWriter.Instance;
    _loggerFactory = loggerFactory ?? NullLoggerFactory.Instance;
    _localRoles = localRoles ?? new HashSet<string>(StringComparer.Ordinal);

    // Start in Steady; PreStart's bootstrap may switch to Stale or replay an orphaned apply.
    Become(Steady);
}
/// <inheritdoc />
protected override void PreStart()
{
    // Resolve the cluster DistributedPubSub mediator once and keep it: it is used for the
    // subscriptions below and reused later for the native-alarm alerts fan-out.
    _mediator = DistributedPubSub.Get(Context.System).Mediator;

    // Subscriptions, issued in this order:
    //   1. deployments      — the coordinator's deployment broadcast
    //   2. driver-control   — AdminUI Reconnect/Restart commands
    //   3. redundancy-state — caches this node's role for the Primary-only inbound-write gate
    string[] subscribedTopics =
    {
        DeploymentsTopic,
        DriverControlTopic,
        ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.RedundancyStateTopic,
    };
    foreach (var topic in subscribedTopics)
    {
        _mediator.Tell(new Subscribe(topic, Self));
    }

    // Both hosts must be spawned before Bootstrap: the bootstrap-restore path goes through
    // PushDesiredSubscriptions, which Tells ApplyVirtualTags / ApplyScriptedAlarms to them.
    SpawnVirtualTagHost();
    SpawnScriptedAlarmHost();

    Bootstrap();
}
/// <summary>
/// Spawns the single <see cref="VirtualTagHostActor"/> child that owns the Equipment-namespace
/// VirtualTagActors and bridges their results onto the OPC UA publish actor. A test-supplied
/// <c>virtualTagHostOverride</c> short-circuits the spawn so a probe can intercept
/// <see cref="VirtualTagHostActor.ApplyVirtualTags"/>. The real host requires a non-null
/// <see cref="_opcUaPublishActor"/> (its ctor throws otherwise), so when no publish actor is
/// wired (legacy ControlPlane test harnesses with no OPC UA sink) the host is left null and
/// ApplyVirtualTags becomes a no-op — VirtualTags can't have anywhere to publish without it.
/// </summary>
private void SpawnVirtualTagHost()
{
    // A test-supplied probe stands in for the real host; nothing is spawned.
    if (_virtualTagHostOverride is { } probe)
    {
        _virtualTagHost = probe;
        return;
    }

    // No publish actor means no sink for VirtualTag results: leave the host null.
    if (_opcUaPublishActor is not { } publishSink)
    {
        _log.Debug("DriverHost {Node}: no OPC UA publish actor wired; skipping VirtualTag host spawn", _localNode);
        return;
    }

    var hostProps = VirtualTagHostActor.Props(
        publishSink, _dependencyMux, _virtualTagEvaluator, _historyWriter);
    _virtualTagHost = Context.ActorOf(hostProps, "virtual-tag-host");
}
/// <summary>
/// Spawns the single <see cref="ScriptedAlarmHostActor"/> child that owns the per-node
/// <see cref="ScriptedAlarmEngine"/>, feeds it live tag values from the dependency mux, and
/// bridges its emissions onto the OPC UA publish actor + the cluster <c>alerts</c> topic. A
/// test-supplied <c>scriptedAlarmHostOverride</c> short-circuits the spawn so a probe can
/// intercept <see cref="ScriptedAlarmHostActor.ApplyScriptedAlarms"/>. The real host needs
/// both a non-null <see cref="_opcUaPublishActor"/> (the emission sink) and a non-null
/// <see cref="_scriptRootLogger"/> (the engine + its script logging hang off it); when either
/// is missing (legacy ControlPlane test harnesses, dev/None deployments) the host is left
/// null and ApplyScriptedAlarms becomes a no-op. The engine is built around a fresh
/// <see cref="DependencyMuxTagUpstreamSource"/> + an <see cref="EfAlarmConditionStateStore"/>;
/// the host (spawned as a child) owns + disposes the engine in its PostStop, so it stops with
/// the driver host.
/// </summary>
private void SpawnScriptedAlarmHost()
{
    // A test-supplied probe stands in for the real host; nothing is spawned.
    var probe = _scriptedAlarmHostOverride;
    if (probe is not null)
    {
        _scriptedAlarmHost = probe;
        return;
    }

    // The real host needs both the emission sink and the root script logger.
    var publishSink = _opcUaPublishActor;
    var rootLogger = _scriptRootLogger;
    if (publishSink is null || rootLogger is null)
    {
        _log.Debug(
            "DriverHost {Node}: skipping ScriptedAlarm host spawn (no publish actor / root logger)",
            _localNode);
        return;
    }

    // Build the engine's collaborators in the same order as before: upstream source, then the
    // EF-backed condition-state store, then the engine itself.
    var upstream = new DependencyMuxTagUpstreamSource();
    var storeLogger = _loggerFactory.CreateLogger<EfAlarmConditionStateStore>();
    var store = new EfAlarmConditionStateStore(_dbFactory, storeLogger);
    var scriptLoggers = new ScriptLoggerFactory(rootLogger.Logger);
    var engine = new ScriptedAlarmEngine(upstream, store, scriptLoggers, rootLogger.Logger);

    var hostProps = ScriptedAlarmHostActor.Props(
        publishSink, _dependencyMux, upstream, engine, _localNode);
    _scriptedAlarmHost = Context.ActorOf(hostProps, "scripted-alarm-host");
}
private void Bootstrap()
{
    // Decide the starting behaviour from this node's newest NodeDeploymentState row:
    //   no row    → Steady with no revision
    //   Applied   → Steady at that revision, then rebuild the served state
    //   Applying  → orphan left by a crash; replay it (or Steady if its deployment row is gone)
    //   otherwise → Steady
    // Any exception raised in here lands in the catch below and switches to Stale.
    try
    {
        using var db = _dbFactory.CreateDbContext();

        var lastState = db.NodeDeploymentStates
            .Where(s => s.NodeId == _localNode.Value)
            .OrderByDescending(s => s.StartedAtUtc)
            .Select(s => new { s.DeploymentId, s.Status, s.StartedAtUtc })
            .FirstOrDefault();

        if (lastState is null)
        {
            _log.Info("DriverHost {Node}: no prior deployments; entering Steady (no revision)", _localNode);
            Become(Steady);
            return;
        }

        var deploymentRow = db.Deployments
            .AsNoTracking()
            .FirstOrDefault(d => d.DeploymentId == lastState.DeploymentId);

        // The revision stays null when the deployment row can no longer be found.
        RevisionHash? revision = null;
        if (deploymentRow is not null)
        {
            revision = RevisionHash.Parse(deploymentRow.RevisionHash);
        }

        if (lastState.Status == NodeDeploymentStatus.Applied)
        {
            _currentRevision = revision;
            _log.Info("DriverHost {Node}: recovered Applied state at rev {Rev}", _localNode, revision);
            Become(Steady);

            // Only the revision survived the restart; the in-memory driver children and the
            // OPC UA address space did not. Re-spawn, re-materialise and re-subscribe from the
            // applied deployment now, because a later dispatch of the identical revision
            // would be a no-op and never rebuild them.
            RestoreApplied(new DeploymentId(lastState.DeploymentId));
        }
        else if (lastState.Status == NodeDeploymentStatus.Applying)
        {
            _log.Warning("DriverHost {Node}: found orphan Applying row for deployment {Id}; replaying",
                _localNode, lastState.DeploymentId);

            if (revision is not null)
            {
                ApplyAndAck(new DeploymentId(lastState.DeploymentId), revision.Value, CorrelationId.NewId());
            }
            else
            {
                Become(Steady);
            }
        }
        else
        {
            // Failed, plus any status not handled above.
            _log.Info("DriverHost {Node}: prior deployment {Id} failed; entering Steady at last known rev",
                _localNode, lastState.DeploymentId);
            Become(Steady);
        }
    }
    catch (Exception ex)
    {
        _log.Warning(ex, "DriverHost {Node}: ConfigDb unreachable on bootstrap; entering Stale", _localNode);
        Become(Stale);
    }
}
/// <summary>
/// Steady behaviour: a <see cref="DispatchDeployment"/> goes to
/// <see cref="HandleDispatchFromSteady"/>; everything else uses the handler table shared with
/// <see cref="Applying"/>.
/// </summary>
private void Steady()
{
    Receive<DispatchDeployment>(HandleDispatchFromSteady);
    ReceiveSharedRuntimeMessages();
}

/// <summary>
/// Applying behaviour: a <see cref="DispatchDeployment"/> is dropped (duplicate of the in-flight
/// deployment) or deferred (different deployment); everything else uses the handler table shared
/// with <see cref="Steady"/>.
/// </summary>
private void Applying()
{
    Receive<DispatchDeployment>(DeferDispatchWhileApplying);
    ReceiveSharedRuntimeMessages();
}

/// <summary>
/// Handles a dispatch that arrives while another deployment is being applied.
/// </summary>
/// <param name="msg">The incoming dispatch.</param>
private void DeferDispatchWhileApplying(DispatchDeployment msg)
{
    if (_applyingDeploymentId is not null && msg.DeploymentId == _applyingDeploymentId.Value)
    {
        _log.Debug("DriverHost {Node}: duplicate DispatchDeployment for in-flight {Id}; ignoring",
            _localNode, msg.DeploymentId);
        return;
    }

    _log.Info("DriverHost {Node}: dispatch for {Id} received while still applying {Cur}; deferring",
        _localNode, msg.DeploymentId, _applyingDeploymentId);

    // Re-deliver to ourselves, keeping the original sender, so the dispatch is handled again
    // after the transition back.
    // NOTE(review): Forward re-enqueues at the tail of our own mailbox immediately. If this actor
    // is still in Applying when the message is dequeued, it is logged and forwarded again, so a
    // long-lived (or stuck) Applying state would spin. Confirm the apply always leaves Applying
    // before the next mailbox turn, or consider stashing instead.
    Self.Forward(msg);
}

/// <summary>
/// Registers the handlers that are identical in the Steady and Applying behaviours. Kept in one
/// place so the two behaviours cannot drift apart. Must only be called from inside a behaviour
/// configuration method, after that behaviour's own <see cref="DispatchDeployment"/> handler has
/// been registered; the registration order matches the original inline lists.
/// </summary>
private void ReceiveSharedRuntimeMessages()
{
    Receive<GetDiagnostics>(HandleGetDiagnostics);
    Receive<DriverInstanceActor.AttributeValuePublished>(ForwardToMux);
    Receive<DriverInstanceActor.AttributeAlarmPublished>(ForwardNativeAlarm);
    Receive<DriverInstanceActor.DiscoveredNodesReady>(HandleDiscoveredNodes);
    Receive<RestartDriver>(HandleRestartDriver);
    Receive<ReconnectDriver>(HandleReconnectDriver);
    Receive<RouteNodeWrite>(HandleRouteNodeWrite);
    Receive<RouteNativeAlarmAck>(HandleRouteNativeAlarmAck);
    Receive<RedundancyStateChanged>(OnRedundancyStateChanged);
    // The redundancy-state topic also carries OpcUaProbeResult (OpcUaPublishActor peer-probes).
    // We don't consume it here — drop it so it doesn't dead-letter (matches PeerProbeSupervisor).
    Receive<ZB.MOM.WW.OtOpcUa.Runtime.Health.PeerOpcUaProbeActor.OpcUaProbeResult>(_ => { });
    Receive<SubscribeAck>(_ => { /* PubSub ack */ });
}
private void ForwardToMux(DriverInstanceActor.AttributeValuePublished msg)
{
    // Step 1: hand the raw driver value to the dependency mux when one is wired. VirtualTag
    // inputs are keyed by FullReference, so the message is passed on unchanged.
    _dependencyMux?.Tell(msg);

    // Step 2: publish to the OPC UA sink, if there is one.
    var publishSink = _opcUaPublishActor;
    if (publishSink is null)
    {
        return;
    }

    // The driver publishes by wire-ref FullName, while equipment-tag variables live at
    // folder-scoped NodeIds. _nodeIdByDriverRef, rebuilt on every apply, translates
    // (DriverInstanceId, FullName) into the NodeId(s) the value has to land on.
    if (!_nodeIdByDriverRef.TryGetValue((msg.DriverInstanceId, msg.FullReference), out var targetNodeIds))
    {
        _log.Debug("DriverHost {Node}: no equipment-tag NodeId for ({Driver},{Ref}) — value dropped",
            _localNode, msg.DriverInstanceId, msg.FullReference);
        return;
    }

    // One driver ref can back several equipment variables, so fan the value out to each of them.
    foreach (var targetNodeId in targetNodeIds)
    {
        var update = new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.AttributeValueUpdate(
            targetNodeId, msg.Value, msg.Quality, msg.TimestampUtc);
        publishSink.Tell(update);
    }
}
/// <summary>
/// Reacts to a driver child's post-connect <see cref="DriverInstanceActor.DiscoveredNodesReady"/>.
/// The equipment bound to the driver is resolved from the most-recent applied composition
/// (<c>EquipmentNodes</c> whose <c>DriverInstanceId</c> matches, UNION the driver's authored
/// <c>EquipmentTags</c>); the captured FixedTree is mapped under it by <see cref="DiscoveredNodeMapper"/>
/// (nodes shadowing an authored equipment-tag ref are dropped); the resulting per-equipment plan map is
/// cached and grafted onto the served state through <see cref="ApplyDiscoveredPlansForDriver"/>.
/// Duplicate-safe: an unchanged routing set is detected and skipped.
/// </summary>
/// <param name="msg">The discovery result sent by the driver child.</param>
private void HandleDiscoveredNodes(DriverInstanceActor.DiscoveredNodesReady msg)
{
    var composition = _lastComposition;
    if (composition is null)
    {
        _log.Debug("DriverHost {Node}: DiscoveredNodesReady from {Driver} before any composition applied — ignored",
            _localNode, msg.DriverInstanceId);
        return;
    }

    // Every authored tag owned by this driver — used both for equipment resolution and for the
    // authored-ref dedup set further down.
    var driverTags = composition.EquipmentTags
        .Where(t => string.Equals(t.DriverInstanceId, msg.DriverInstanceId, StringComparison.Ordinal))
        .ToList();

    // EquipmentNodes bound to the driver come first (so a driver with ZERO authored tags can still graft
    // onto a tag-less equipment), followed by the equipment of the authored tags. Union keeps one entry
    // per equipment when a driver is both node-bound AND has authored tags under the same equipment.
    var equipmentIds = composition.EquipmentNodes
        .Where(e => e.DriverInstanceId is not null && string.Equals(e.DriverInstanceId, msg.DriverInstanceId, StringComparison.Ordinal))
        .Select(e => e.EquipmentId)
        .Union(driverTags.Select(t => t.EquipmentId), StringComparer.Ordinal)
        .ToList();

    switch (equipmentIds.Count)
    {
        case 0:
            _log.Info("DriverHost {Node}: no equipment for driver {Driver} — skipping discovered-node injection",
                _localNode, msg.DriverInstanceId);
            return;

        case 1:
            break;

        default:
            // NEXT TASK (multi-device partition) REPLACES THIS BRANCH: a driver fanning out to several
            // equipments (one per device-host) will partition its FixedTree by DeviceHost and graft each
            // partition under its matching equipment. Until then: conservative warn + skip.
            _log.Warning("DriverHost {Node}: driver {Driver} maps to {Count} equipments — discovered-node injection skipped (multi-equipment-per-driver is a follow-up)",
                _localNode, msg.DriverInstanceId, equipmentIds.Count);
            return;
    }

    var equipmentId = equipmentIds[0];

    // Authored refs for THIS driver (value + alarm tags alike), handed to the mapper so a discovered node
    // never shadows an authored one. An empty set (tag-less equipment) is fine: nothing gets deduped.
    var authoredRefs = driverTags
        .Select(t => t.FullName)
        .ToHashSet(StringComparer.Ordinal);

    var plan = DiscoveredNodeMapper.Map(equipmentId, msg.Nodes, authoredRefs);
    if (plan.Variables.Count == 0) return; // nothing new to inject (all captured nodes were authored)

    // Per-equipment plan map for this discovery pass. Single-equipment today ⇒ exactly one entry; the
    // multi-device task will add one entry per partitioned equipment.
    var newPlans = new Dictionary<string, DiscoveredInjectionPlan>(StringComparer.Ordinal);
    newPlans[equipmentId] = plan;

    // Unchanged-plan short-circuit. The driver re-discovers every ~2s (up to ~15 passes) until the
    // FixedTree stabilises and re-sends DiscoveredNodesReady each pass. Re-applying an IDENTICAL set would
    // re-send SetDesiredSubscriptions, making the child drop and re-create its whole subscription handle
    // (authored tags included) — a value blip per pass. A GROWING set differs and is still re-applied.
    var unchanged = _discoveredByDriver.TryGetValue(msg.DriverInstanceId, out var previous)
        && PlansRoutingEqual(previous, newPlans);
    if (unchanged)
    {
        _log.Debug("DriverHost {Node}: discovered set for driver {Driver} unchanged ({Count} node(s)) — re-apply skipped",
            _localNode, msg.DriverInstanceId, plan.Variables.Count);
        return;
    }

    _discoveredByDriver[msg.DriverInstanceId] = newPlans;
    ApplyDiscoveredPlansForDriver(msg.DriverInstanceId, newPlans);
}
/// <summary>
/// Compares two routing maps for equality: both must hold the same number of entries and every key of
/// <paramref name="a"/> must map to an ordinally-equal NodeId in <paramref name="b"/>. Used by
/// <see cref="HandleDiscoveredNodes"/> to skip re-applying an unchanged discovered set; a grown or
/// changed set compares unequal.
/// </summary>
/// <param name="a">First routing map (driver ref → NodeId).</param>
/// <param name="b">Second routing map (driver ref → NodeId).</param>
/// <returns><c>true</c> when both maps contain exactly the same key/value pairs.</returns>
private static bool RoutingEquals(IReadOnlyDictionary<string, string> a, IReadOnlyDictionary<string, string> b)
{
    if (a.Count != b.Count)
    {
        return false;
    }

    foreach (var pair in a)
    {
        var matches = b.TryGetValue(pair.Key, out var counterpart)
            && string.Equals(counterpart, pair.Value, StringComparison.Ordinal);
        if (!matches)
        {
            return false;
        }
    }

    return true;
}
/// <summary>
/// Per-equipment plan-map routing equality: both maps must have the same equipment keys and each
/// equipment's plan must carry an equal <see cref="DiscoveredInjectionPlan.RoutingByRef"/> (compared via
/// <see cref="RoutingEquals"/>). Lets <see cref="HandleDiscoveredNodes"/> short-circuit a re-discovery
/// whose WHOLE per-driver set is unchanged; a grown/changed set on any equipment compares unequal.
/// </summary>
/// <param name="a">First plan map (equipment id → plan).</param>
/// <param name="b">Second plan map (equipment id → plan).</param>
/// <returns><c>true</c> when both maps route identically for every equipment.</returns>
private static bool PlansRoutingEqual(
    IReadOnlyDictionary<string, DiscoveredInjectionPlan> a,
    IReadOnlyDictionary<string, DiscoveredInjectionPlan> b)
{
    if (a.Count != b.Count)
    {
        return false;
    }

    foreach (var pair in a)
    {
        if (!b.TryGetValue(pair.Key, out var counterpart))
        {
            return false;
        }

        if (!RoutingEquals(pair.Value.RoutingByRef, counterpart.RoutingByRef))
        {
            return false;
        }
    }

    return true;
}
/// <summary>
/// Grafts a driver's per-equipment <see cref="DiscoveredInjectionPlan"/> map onto the served state in
/// two phases so the resubscribe stays a single push per driver (the shape the multi-device-partition
/// follow-up needs without resubscribe churn):
/// <list type="number">
/// <item><b>Materialise per equipment</b> — for each <c>(equipmentId, plan)</c> entry, extend the
/// live-value routing map (mirroring <see cref="PushDesiredSubscriptions"/>' fan-out so
/// <see cref="ForwardToMux"/> lands FixedTree values on the right node) and Tell the publish actor
/// <see cref="ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.MaterialiseDiscoveredNodes"/> for
/// that equipment (idempotent).</item>
/// <item><b>Subscribe ONCE per driver</b> — compute the union of the driver's authored value refs
/// (recomputed the same way <see cref="PushDesiredSubscriptions"/> does) and the FixedTree refs of
/// ALL the driver's cached plans, then Tell the child a single
/// <see cref="DriverInstanceActor.SetDesiredSubscriptions"/> so the poll engine reads them and the
/// values flow. For a single-equipment driver this equals the prior per-plan behavior.</item>
/// </list>
/// Extracted as a standalone method so the redeploy re-inject tail can re-apply the cached plans after
/// an address-space rebuild without re-running discovery.
/// </summary>
/// <param name="driverId">Driver instance id; the first component of every routing key written here and
/// the key used to look up the running child in <c>_children</c>.</param>
/// <param name="plansByEquipment">The driver's plans keyed by equipment id. Each plan's
/// <c>RoutingByRef</c> feeds the routing maps and the subscription union; its <c>Folders</c> and
/// <c>Variables</c> are sent to the publish actor.</param>
private void ApplyDiscoveredPlansForDriver(
string driverId, IReadOnlyDictionary<string, DiscoveredInjectionPlan> plansByEquipment)
{
// (a) Per-equipment: extend the live-value routing map (fan-out, mirroring PushDesiredSubscriptions'
// pattern) + materialise the discovered folders + variables under that equipment (idempotent). This is
// purely ADDITIVE across passes: a shrinking discovery set would leave the dropped refs' stale routes
// until the next full apply (PushDesiredSubscriptions) clears + rebuilds the maps — acceptable because
// a FOCAS FixedTree only grows-then-stabilises, never shrinks within a connect.
var totalVariables = 0;
foreach (var (equipmentId, plan) in plansByEquipment)
{
foreach (var (driverRef, nodeId) in plan.RoutingByRef)
{
var key = (driverId, driverRef);
// Forward map: one driver ref may fan out to several NodeIds, hence the set per key.
if (!_nodeIdByDriverRef.TryGetValue(key, out var set))
_nodeIdByDriverRef[key] = set = new HashSet<string>(StringComparer.Ordinal);
set.Add(nodeId);
// Reverse map (NodeId -> driver ref): a plain overwrite, so the last writer wins per NodeId.
_driverRefByNodeId[nodeId] = key;
}
// No publish actor wired => the routing maps are still extended, only the materialise Tell is skipped.
_opcUaPublishActor?.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.MaterialiseDiscoveredNodes(
equipmentId, plan.Folders, plan.Variables));
totalVariables += plan.Variables.Count;
}
// (b) ONE subscription push per driver: merge the FixedTree refs from ALL the driver's plans into the
// driver's desired subscription set so the poll engine reads them and ForwardToMux routes the values.
// Recompute the authored value + alarm refs the same way PushDesiredSubscriptions does, then union the
// FixedTree refs onto the value set. Doing the union here (rather than once per plan) means the
// multi-device task adds inner-map entries without changing this single-send shape.
// No running child for this driver => phase (a) has already run, but no subscription is pushed and the
// "injected" Info line below is not logged.
if (!_children.TryGetValue(driverId, out var entry)) return;
// The _lastComposition null-guards below are defensive: HandleDiscoveredNodes already proved it
// non-null, but the redeploy tail also calls this from the PushDesiredSubscriptions tail — keep them
// so that re-apply path can't NRE.
var authoredValueRefs = _lastComposition is null
? Enumerable.Empty<string>()
: _lastComposition.EquipmentTags
.Where(t => t.Alarm is null && string.Equals(t.DriverInstanceId, driverId, StringComparison.Ordinal))
.Select(t => t.FullName);
var alarmRefs = _lastComposition is null
? Array.Empty<string>()
: _lastComposition.EquipmentTags
.Where(t => t.Alarm is not null && string.Equals(t.DriverInstanceId, driverId, StringComparison.Ordinal))
.Select(t => t.FullName)
.Distinct(StringComparer.Ordinal)
.ToArray();
var discoveredRefs = plansByEquipment.Values.SelectMany(p => p.RoutingByRef.Keys);
// Authored refs first, then discovered refs; Distinct collapses a ref present in both.
var union = authoredValueRefs.Concat(discoveredRefs).Distinct(StringComparer.Ordinal).ToArray();
entry.Actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(union, SubscriptionPublishingInterval, alarmRefs));
_log.Info("DriverHost {Node}: injected {Count} discovered node(s) for driver {Driver} across {Equipment} equipment(s)",
_localNode, totalVariables, driverId, plansByEquipment.Count);
}
/// <summary>
/// Routes a native alarm transition (published by a driver child as
/// <see cref="DriverInstanceActor.AttributeAlarmPublished"/>) to its materialised Part 9 condition
/// node(s). The alarm path analogue of <see cref="ForwardToMux"/>: the transition's <c>ConditionId</c>
/// (the dotted alarm full-reference, which equals the authored equipment-tag FullName — NOT the bare
/// <c>SourceNodeId</c> owning object) is resolved by the <see cref="_alarmNodeIdByDriverRef"/> map —
/// built each apply from the alarm-bearing EquipmentTags — to the folder-scoped condition NodeId(s)
/// the materialiser placed the condition(s) at.
/// For each node the <see cref="_nativeAlarmProjector"/> projects the transition delta into a full
/// <c>AlarmConditionSnapshot</c>, then this Tells <see cref="ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.AlarmStateUpdate"/>
/// — the SAME message scripted alarms use, so it routes through <c>WriteAlarmCondition</c>. An
/// unknown ref is Debug-logged and dropped (mirrors the value drop).
///
/// <para>
/// Each transition is ALSO published as an <see cref="AlarmTransitionEvent"/> to the cluster
/// <c>alerts</c> topic — the single historization + live <c>/alerts</c> fan-out path, exactly as
/// scripted alarms do (<c>ScriptedAlarmHostActor.OnEngineEmission</c>). The OPC UA condition
/// write above stays UNGATED (a Secondary keeps its address space warm for failover), but the
/// cluster-wide alerts publish is Primary-gated on the same <see cref="_localRole"/> redundancy
/// signal the inbound-write gate uses — only the Primary publishes the single fleet-wide copy.
/// </para>
///
/// <para>
/// When no publish actor is wired the method returns before doing anything, so in that configuration
/// the alerts-topic publish is skipped as well.
/// </para>
/// </summary>
/// <param name="msg">The native alarm transition published by a driver child.</param>
private void ForwardNativeAlarm(DriverInstanceActor.AttributeAlarmPublished msg)
{
// Early exit covers BOTH outputs: no condition write and no alerts publish without a publish actor.
if (_opcUaPublishActor is null) return;
// Resolve on ConditionId, NOT SourceNodeId: for Galaxy the dotted alarm full-reference — which
// equals the authored equipment-tag FullName the map is keyed by — is carried in ConditionId
// (AlarmFullReference), while SourceNodeId is the bare owning object (SourceObjectReference).
if (!_alarmNodeIdByDriverRef.TryGetValue((msg.DriverInstanceId, msg.Args.ConditionId), out var nodeIds))
{
_log.Debug("DriverHost {Node}: no alarm condition for ({Driver},{Ref}) — transition dropped",
_localNode, msg.DriverInstanceId, msg.Args.ConditionId);
return;
}
foreach (var nodeId in nodeIds)
{
// Project per node: the projector is keyed by the condition NodeId, so each mapped node gets its
// own snapshot of the same transition.
var snapshot = _nativeAlarmProjector.Project(nodeId, msg.Args);
_opcUaPublishActor.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.AlarmStateUpdate(
nodeId, snapshot, msg.Args.SourceTimestampUtc));
// Warm-standby dedup: the OPC UA condition write above is UNGATED (the secondary keeps its
// address space warm), but only the Primary publishes the cluster-wide alerts transition.
// Default-emit until told we are Secondary/Detached so single-node deploys + the boot window
// never drop transitions — the SAME signal HandleRouteNodeWrite gates writes on.
if (_localRole is RedundancyRole.Secondary or RedundancyRole.Detached) continue;
// Missing metadata falls back to the NodeId for both equipment path and name, the generic
// "AlarmCondition" type, and a null (= default-on) historize flag.
var meta = _alarmMetaByNodeId.TryGetValue(nodeId, out var m)
? m : (EquipmentId: nodeId, Name: nodeId, AlarmType: "AlarmCondition", HistorizeToAveva: (bool?)null);
_mediator.Tell(new Publish(ScriptedAlarmHostActor.AlertsTopic, new AlarmTransitionEvent(
AlarmId: nodeId,
EquipmentPath: meta.EquipmentId,
AlarmName: meta.Name,
TransitionKind: ToEventKind(msg.Args.Kind),
// The projector mapped the four-bucket AlarmSeverity onto the OPC UA 1..1000 scale already;
// reuse its ushort so the condition node + the alerts row agree on severity.
Severity: snapshot.Severity,
Message: msg.Args.Message,
// Native transitions are device-driven; an operator comment only rides along on an
// acknowledge from the upstream alarm system. "device" marks a non-operator origin.
// As written: User is empty when there is no operator comment and "device" when there is one.
User: msg.Args.OperatorComment is null ? string.Empty : "device",
TimestampUtc: msg.Args.SourceTimestampUtc,
AlarmTypeName: meta.AlarmType,
Comment: msg.Args.OperatorComment,
// Per-condition opt-out parsed from TagConfig.alarm.historizeToAveva (bool?, null ⇒ absent ⇒
// historize). The HistorianAdapterActor gate (historizeToAveva is not false) historizes null +
// true and suppresses the durable AVEVA row only on an explicit false — the same posture as the
// scripted-alarm opt-out. null here rides through unchanged (the gate treats it as default-on).
HistorizeToAveva: meta.HistorizeToAveva)));
}
}
/// <summary>
/// Translates a native <see cref="AlarmTransitionKind"/> into the canonical alarm event-kind vocabulary
/// scripted alarms emit (the <c>EmissionKind</c> names), so a native row renders with the correct chip on
/// the <c>/alerts</c> page and historizes into the same <c>EventKind</c> column as scripted alarms.
/// </summary>
/// <param name="kind">The native transition kind reported by the driver.</param>
/// <returns><c>"Cleared"</c>, <c>"Acknowledged"</c>, or <c>"Activated"</c> for raise/retrigger and for any
/// unmapped kind (surfaced as visible rather than as a grey unknown label).</returns>
private static string ToEventKind(AlarmTransitionKind kind)
{
    switch (kind)
    {
        case AlarmTransitionKind.Clear:
            return "Cleared";

        case AlarmTransitionKind.Acknowledge:
            return "Acknowledged";

        case AlarmTransitionKind.Raise:
        case AlarmTransitionKind.Retrigger:
        default:
            // Raise/Retrigger are listed for readability; unknown kinds share the same visible label.
            return "Activated";
    }
}
/// <summary>
/// Routes an inbound operator write (Task 11 Asks this from the OPC UA node-manager side) to the
/// owning driver child. Order matters:
/// <list type="number">
/// <item>PRIMARY gate FIRST — reuses the same <see cref="_localRole"/> redundancy signal the
/// scripted-alarm emit gate uses. Only the Primary services writes (default-allow while the
/// role is unknown, so single-node deploys + the boot window never reject). A Secondary/Detached
/// node keeps its address space + driver state warm for failover but must NOT push writes to the
/// shared field device.</item>
/// <item>Resolve the <see cref="_driverRefByNodeId"/> reverse map to the owning
/// <c>(DriverInstanceId, FullName)</c>.</item>
/// <item>Resolve the running driver child.</item>
/// <item>Ask the child a bounded <see cref="DriverInstanceActor.WriteAttribute"/> of the driver-side
/// <see cref="DriverInstanceActor.WriteAttribute.TagId">FullName</see> and pipe the translated
/// result back to the asker.</item>
/// </list>
/// Every branch replies the asker a <see cref="NodeWriteResult"/> exactly once. A failed Ask is
/// reported as <c>"write timeout"</c> only when it actually timed out or was cancelled; any other fault
/// (for example a failure reply from the child) is reported as <c>"write failed: …"</c> with the
/// underlying message, so a real error is not disguised as a timeout.
/// </summary>
/// <param name="msg">The write request carrying the target NodeId and the value to write.</param>
private void HandleRouteNodeWrite(RouteNodeWrite msg)
{
    // PRIMARY GATE FIRST — only the Primary services operator writes (same signal as the alarm-emit
    // gate; unknown role ⇒ treated as Primary so single-node deploys + the boot window aren't blocked).
    if (_localRole is RedundancyRole.Secondary or RedundancyRole.Detached)
    {
        Sender.Tell(new NodeWriteResult(false, "not primary"));
        return;
    }

    if (!_driverRefByNodeId.TryGetValue(msg.NodeId, out var target))
    {
        Sender.Tell(new NodeWriteResult(false, $"no driver mapping for node {msg.NodeId}"));
        return;
    }

    if (!_children.TryGetValue(target.DriverInstanceId, out var entry))
    {
        Sender.Tell(new NodeWriteResult(false, "driver not running"));
        return;
    }

    // Ask the child a bounded write — DriverInstanceActor.HandleWriteAsync already bounds the backend
    // call to 5s, so the 8s Ask is a safety net for an actor that never replies. Capture Sender before
    // the continuation: it runs off the actor thread, where raw Sender is unsafe to read.
    var replyTo = Sender;
    entry.Actor.Ask<DriverInstanceActor.WriteAttributeResult>(
            new DriverInstanceActor.WriteAttribute(target.FullName, msg.Value!), TimeSpan.FromSeconds(8))
        .ContinueWith(
            t =>
            {
                if (t.IsCompletedSuccessfully)
                {
                    return new NodeWriteResult(t.Result.Success, t.Result.Reason);
                }

                // Cancelled tasks carry no exception; a faulted Ask wraps its cause in an
                // AggregateException, so unwrap to the innermost exception before classifying.
                var fault = t.Exception?.GetBaseException();
                return fault is null or AskTimeoutException or OperationCanceledException
                    ? new NodeWriteResult(false, "write timeout")
                    : new NodeWriteResult(false, $"write failed: {fault.Message}");
            },
            CancellationToken.None,
            TaskContinuationOptions.None,
            TaskScheduler.Default)
        .PipeTo(replyTo);
}
/// <summary>
/// Forwards an inbound native-condition acknowledge (sent from the OPC UA node-manager side when a
/// client Acknowledges a NATIVE Part 9 condition) to the owning driver child. Same order and gating as
/// <see cref="HandleRouteNodeWrite"/>, but fire-and-forget — nothing is replied, because the OPC UA
/// Part 9 acknowledge already committed the local condition state and the driver's
/// <see cref="IAlarmSource.AcknowledgeAsync"/> returns no per-condition status.
/// <list type="number">
/// <item>PRIMARY gate FIRST — the same <see cref="_localRole"/> signal as the inbound-write gate
/// (default-allow while the role is unknown). A Secondary/Detached node keeps its condition state
/// warm for failover but must NOT push the command; the ack is debug-logged and dropped.</item>
/// <item>Resolve <see cref="_driverRefByAlarmNodeId"/> to the owning
/// <c>(DriverInstanceId, FullName = ConditionId)</c>; an unmapped node is debug-logged and dropped
/// (no throw), mirroring <see cref="ForwardNativeAlarm"/>'s unknown-ref drop.</item>
/// <item>Resolve the running driver child and Tell it a
/// <see cref="DriverInstanceActor.RouteAlarmAck"/> carrying the wire-ref FullName + principal.</item>
/// </list>
/// </summary>
/// <param name="msg">The acknowledge request (condition NodeId, comment, operator user).</param>
private void HandleRouteNativeAlarmAck(RouteNativeAlarmAck msg)
{
    // Evaluated strictly top-down; the first matching drop reason wins and the final branch delivers.
    if (_localRole is RedundancyRole.Secondary or RedundancyRole.Detached)
    {
        // Only the Primary services operator acks (unknown role ⇒ treated as Primary).
        _log.Debug("DriverHost {Node}: dropping native-alarm ack for {Cond} — not primary",
            _localNode, msg.ConditionNodeId);
    }
    else if (!_driverRefByAlarmNodeId.TryGetValue(msg.ConditionNodeId, out var owner))
    {
        _log.Debug("DriverHost {Node}: no driver mapping for alarm condition node {Cond} — ack dropped",
            _localNode, msg.ConditionNodeId);
    }
    else if (!_children.TryGetValue(owner.DriverInstanceId, out var child))
    {
        _log.Debug("DriverHost {Node}: driver {Driver} not running for alarm ack of {Cond} — dropped",
            _localNode, owner.DriverInstanceId, msg.ConditionNodeId);
    }
    else
    {
        // Fire-and-forget: there is nothing to reply. The driver correlates on ConditionId (= the
        // authored alarm FullName the inverse map is keyed on).
        child.Actor.Tell(new DriverInstanceActor.RouteAlarmAck(owner.FullName, msg.Comment, msg.OperatorUser));
    }
}
/// <summary>
/// Records this node's <see cref="RedundancyRole"/> from a cluster redundancy snapshot so that
/// <see cref="HandleRouteNodeWrite"/> can restrict inbound writes to the Primary. A snapshot that does
/// not mention this node leaves the cached role untouched ⇒ default-allow. Mirrors
/// <c>ScriptedAlarmHostActor.OnRedundancyStateChanged</c> / <c>OpcUaPublishActor</c>.
/// </summary>
/// <param name="msg">The redundancy snapshot listing the cluster's nodes and their roles.</param>
private void OnRedundancyStateChanged(RedundancyStateChanged msg)
{
    foreach (var node in msg.Nodes)
    {
        if (node.NodeId == _localNode)
        {
            // First entry for this node wins; later duplicates (if any) are ignored.
            _localRole = node.Role;
            return;
        }
    }
}
/// <summary>
/// Message handlers for the Stale behaviour (config DB unreachable). Deployments are ignored with a
/// warning, inbound writes are fast-failed, native-alarm acks and discovery results are dropped, while
/// diagnostics, driver restart/reconnect and redundancy-role tracking keep working. Also starts the
/// periodic <c>"retry-db"</c> timer that delivers <see cref="RetryConfigDbConnection"/> every
/// <c>ReconnectInterval</c>, which in turn drives <c>TryRecoverFromStale</c>.
/// </summary>
private void Stale()
{
// Deployments are not applied and not acked while Stale — only logged.
Receive<DispatchDeployment>(_ =>
{
_log.Warning("DriverHost {Node}: ignoring DispatchDeployment while Stale (DB unreachable)", _localNode);
});
Receive<GetDiagnostics>(HandleGetDiagnostics);
// Each timer tick attempts to leave the Stale state.
Receive<RetryConfigDbConnection>(_ => TryRecoverFromStale());
Receive<RestartDriver>(HandleRestartDriver);
Receive<ReconnectDriver>(HandleReconnectDriver);
// Keep tracking the redundancy role so the gates are current once the node recovers.
Receive<RedundancyStateChanged>(OnRedundancyStateChanged);
// The redundancy-state topic also carries OpcUaProbeResult (OpcUaPublishActor peer-probes).
// We don't consume it here — drop it so it doesn't dead-letter (matches PeerProbeSupervisor).
Receive<ZB.MOM.WW.OtOpcUa.Runtime.Health.PeerOpcUaProbeActor.OpcUaProbeResult>(_ => { });
// An inbound operator write can't be serviced while the config DB is unreachable — fast-fail so the
// node-manager's bounded Ask gets an immediate clear status instead of dead-lettering into a timeout.
Receive<RouteNodeWrite>(_ =>
Sender.Tell(new NodeWriteResult(false, "driver host stale (config DB unreachable)")));
// An inbound native-condition ack can't be serviced while Stale either (the alarm inverse map is
// empty until an apply runs). The ack is fire-and-forget (no reply), so just drop it with a log —
// the local OPC UA condition state already committed on the Part 9 ack.
// NOTE(review): the {Node2} placeholder below carries the condition NodeId; the Steady-state ack
// handler names the same value {Cond} — consider aligning the property name.
Receive<RouteNativeAlarmAck>(msg =>
_log.Debug("DriverHost {Node}: dropping native-alarm ack for {Node2} while Stale (config DB unreachable)",
_localNode, msg.ConditionNodeId));
// A driver child's post-connect DiscoveredNodesReady can't be injected while Stale (no composition is
// applied yet, so the equipment can't be resolved). Drop it — Task 6's re-discovery loop re-sends it
// and the Task-8 post-recovery re-apply self-heals it once an apply runs (matches the no-op drops above).
Receive<DriverInstanceActor.DiscoveredNodesReady>(_ => { });
Receive<SubscribeAck>(_ => { /* PubSub ack */ });
// Started as part of installing this behaviour; the timer key is "retry-db".
Timers.StartPeriodicTimer("retry-db", RetryConfigDbConnection.Instance, ReconnectInterval);
}
/// <summary>
/// Replies to the sender with a <see cref="NodeDiagnosticsSnapshot"/> describing this node: its id, its
/// current revision, and one <see cref="DriverInstanceDiagnostics"/> row per entry in <c>_children</c>.
/// Per-driver rows are coarse: the instance id is <see cref="Guid.Empty"/>, device counters are zero,
/// and the state is just <c>"Stubbed"</c> or <c>"Spawned"</c>.
/// </summary>
/// <param name="msg">The diagnostics request (carries no data that is read here).</param>
private void HandleGetDiagnostics(GetDiagnostics msg)
{
    var perDriver = new DriverInstanceDiagnostics[_children.Count];
    var slot = 0;
    foreach (var child in _children)
    {
        perDriver[slot++] = new DriverInstanceDiagnostics(
            DriverInstanceId: Guid.Empty,
            Name: child.Key,
            State: child.Value.Stubbed ? "Stubbed" : "Spawned",
            ConnectedDevices: 0,
            FaultedDevices: 0,
            LastChangeUtc: DateTime.UtcNow);
    }

    Sender.Tell(new NodeDiagnosticsSnapshot(
        NodeId: _localNode,
        CurrentRevision: _currentRevision,
        Drivers: perDriver,
        AsOfUtc: DateTime.UtcNow));
}
/// <summary>
/// Handles a <see cref="DispatchDeployment"/> received while Steady. A dispatch whose revision equals
/// the node's current revision is acknowledged as Applied straight away (idempotent re-dispatch);
/// anything else goes through the full <see cref="ApplyAndAck"/> path.
/// </summary>
/// <param name="msg">The dispatched deployment (id, revision hash, correlation id).</param>
private void HandleDispatchFromSteady(DispatchDeployment msg)
{
    var alreadyAtRevision = _currentRevision is { } current && current == msg.RevisionHash;
    if (!alreadyAtRevision)
    {
        ApplyAndAck(msg.DeploymentId, msg.RevisionHash, msg.CorrelationId);
        return;
    }

    // Nothing to do — this node already serves the dispatched revision. Ack and stay Steady.
    _log.Debug("DriverHost {Node}: dispatch {Id} matches current rev {Rev}; immediate ACK",
        _localNode, msg.DeploymentId, msg.RevisionHash);
    SendAck(msg.DeploymentId, ApplyAckOutcome.Applied, failureReason: null, msg.CorrelationId);
}
/// <summary>
/// Applies a deployment on this node and acks the outcome. Runs in two phases inside the
/// <c>Applying</c> behaviour and always returns to <c>Steady</c>:
/// <list type="number">
/// <item><b>Commit</b> — reconcile the driver children, advance <c>_currentRevision</c>, persist the
/// Applied row and send the Applied ack. Any failure here rolls <c>_currentRevision</c> back to its
/// previous value, persists a Failed row and sends a Failed ack, so a re-dispatch of the same revision
/// is applied again instead of being short-circuited as "already applied".</item>
/// <item><b>Post-commit refresh</b> — trigger the OPC UA address-space rebuild and push the desired
/// subscriptions. This phase is best-effort (the same posture as <see cref="RestoreApplied"/>): a
/// failure is logged as a warning and flagged on the span, but it no longer overwrites the persisted
/// Applied state or sends a second, contradictory Failed ack after the Applied ack.</item>
/// </list>
/// </summary>
/// <param name="deploymentId">The deployment being applied.</param>
/// <param name="revision">The revision hash this node serves once the commit phase succeeds.</param>
/// <param name="correlation">Correlation id forwarded on the ack and the rebuild request so the audit
/// trail joins up.</param>
private void ApplyAndAck(DeploymentId deploymentId, RevisionHash revision, CorrelationId correlation)
{
    _applyingDeploymentId = deploymentId;
    Become(Applying);
    using var span = OtOpcUaTelemetry.StartDeployApplySpan(deploymentId.ToString());
    span?.SetTag("otopcua.node_id", _localNode.ToString());
    span?.SetTag("otopcua.revision", revision.ToString());
    span?.SetTag("otopcua.correlation_id", correlation.ToString());
    var sw = Stopwatch.StartNew();
    // Persist Applying row (idempotent on PK).
    UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Applying, failureReason: null);

    // Remembered so a failed commit can restore the revision this node was serving before.
    var previousRevision = _currentRevision;
    var committed = false;
    try
    {
        try
        {
            ReconcileDrivers(deploymentId);
            _currentRevision = revision;
            UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Applied, failureReason: null);
            SendAck(deploymentId, ApplyAckOutcome.Applied, failureReason: null, correlation);
            committed = true;
        }
        catch (Exception ex)
        {
            // Roll back first so the in-memory revision never claims a deployment that was acked Failed.
            _currentRevision = previousRevision;
            UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Failed, ex.Message);
            SendAck(deploymentId, ApplyAckOutcome.Failed, ex.Message, correlation);
            OtOpcUaTelemetry.DeploymentApplied.Add(1, new KeyValuePair<string, object?>("outcome", "reject"));
            span?.SetStatus(ActivityStatusCode.Error, ex.Message);
            _log.Error(ex, "DriverHost {Node}: apply of {Id} failed", _localNode, deploymentId);
        }

        if (committed)
        {
            try
            {
                // Trigger the OPC UA address-space rebuild so the local SDK reflects the new
                // composition. The publish actor handles the load-compose-diff-apply pipeline; we
                // just forward the same correlation id so the audit trail joins up.
                _opcUaPublishActor?.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.RebuildAddressSpace(correlation, deploymentId));
                // SubscribeBulk pass: hand each driver its desired tag references so live values flow into
                // the just-rebuilt address space instead of staying BadWaitingForInitialData.
                PushDesiredSubscriptions(deploymentId);
            }
            catch (Exception ex)
            {
                // The deployment is already persisted + acked as Applied; do not flip it to Failed.
                span?.SetStatus(ActivityStatusCode.Error, ex.Message);
                _log.Warning(ex, "DriverHost {Node}: deployment {Id} applied, but the post-apply served-state refresh failed",
                    _localNode, deploymentId);
            }

            OtOpcUaTelemetry.DeploymentApplied.Add(1, new KeyValuePair<string, object?>("outcome", "ack"));
            _log.Info("DriverHost {Node}: applied deployment {Id} (rev {Rev}, children={Count})",
                _localNode, deploymentId, revision, _children.Count);
        }
    }
    finally
    {
        OtOpcUaTelemetry.DeploymentApplyDurationSec.Record(sw.Elapsed.TotalSeconds);
        _applyingDeploymentId = null;
        Become(Steady);
    }
}
/// <summary>
/// Loads the deployment artifact and reconciles the running <see cref="DriverInstanceActor"/> children
/// against it: removed/disabled drivers are stopped, changed ones receive a delta, missing ones are
/// spawned — in that order. An empty artifact blob (legacy ControlPlane tests, smoke fixtures) or an
/// <see cref="IDriverFactory"/> that cannot materialise any requested type makes this effectively a
/// no-op. A failure to load the artifact is logged as a warning and the reconcile is skipped.
/// </summary>
/// <param name="deploymentId">The deployment whose artifact describes the desired driver set.</param>
private void ReconcileDrivers(DeploymentId deploymentId)
{
    byte[] artifact;
    try
    {
        using var db = _dbFactory.CreateDbContext();
        var stored = db.Deployments.AsNoTracking()
            .Where(d => d.DeploymentId == deploymentId.Value)
            .Select(d => d.ArtifactBlob)
            .FirstOrDefault();
        // No row / null blob is treated as an empty artifact.
        artifact = stored ?? Array.Empty<byte>();
    }
    catch (Exception ex)
    {
        _log.Warning(ex, "DriverHost {Node}: failed to load artifact for {Id}; skipping reconcile",
            _localNode, deploymentId);
        return;
    }

    var desired = DeploymentArtifact.ParseDriverInstances(artifact, _localNode.Value);

    // Snapshot of what is running right now (type + last applied config), keyed by driver id.
    var running = new Dictionary<string, DriverChildSnapshot>(StringComparer.Ordinal);
    foreach (var child in _children)
    {
        running.Add(child.Key, new DriverChildSnapshot(child.Value.DriverType, child.Value.LastConfigJson));
    }

    var plan = DriverSpawnPlanner.Compute(running, desired);

    foreach (var id in plan.ToStop)
    {
        StopChild(id);
    }

    foreach (var spec in plan.ToApplyDelta)
    {
        ApplyChildDelta(spec);
    }

    foreach (var spec in plan.ToSpawn)
    {
        SpawnChild(spec);
    }
}
/// <summary>
/// Rebuilds the served state for an already-applied deployment after a process restart.
/// <see cref="Bootstrap"/> recovers <see cref="_currentRevision"/> from NodeDeploymentState, but the
/// driver children and the OPC UA address space live in memory only and are gone after a restart —
/// without this step a restarted node would serve an empty address space until the next config change
/// (an identical-config redeploy no-ops on the unchanged revision). The method re-spawns the drivers,
/// requests an address-space rebuild from the applied artifact, and re-pushes SubscribeBulk.
/// No ack is sent: the deployment is already Applied. Failures are logged as warnings, not rethrown.
/// </summary>
/// <param name="deploymentId">The deployment that was already applied before the restart.</param>
private void RestoreApplied(DeploymentId deploymentId)
{
    // A fresh correlation id: this restore is not tied to any coordinator dispatch.
    var restoreCorrelation = CorrelationId.NewId();
    try
    {
        ReconcileDrivers(deploymentId);

        if (_opcUaPublishActor is { } publisher)
        {
            publisher.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.RebuildAddressSpace(restoreCorrelation, deploymentId));
        }

        PushDesiredSubscriptions(deploymentId);
        _log.Info("DriverHost {Node}: restored served state for applied deployment {Id} on bootstrap", _localNode, deploymentId);
    }
    catch (Exception failure)
    {
        _log.Warning(failure, "DriverHost {Node}: failed to restore served state for {Id} on bootstrap", _localNode, deploymentId);
    }
}
/// <summary>
/// SubscribeBulk pass. After an apply, read the deployment's Equipment-namespace tags,
/// group their driver-side FullName references by driver instance, and hand each running driver
/// child its desired subscription set via <see cref="DriverInstanceActor.SetDesiredSubscriptions"/>.
/// The child retains the set and (re)subscribes on every Connected entry, so values stream into
/// the OPC UA sink and resume after reconnects. Drivers with no configured tags get an empty set
/// (which clears any stale subscription from a previous deployment).
/// </summary>
private void PushDesiredSubscriptions(DeploymentId deploymentId)
{
byte[] blob;
try
{
using var db = _dbFactory.CreateDbContext();
blob = db.Deployments.AsNoTracking()
.Where(d => d.DeploymentId == deploymentId.Value)
.Select(d => d.ArtifactBlob)
.FirstOrDefault() ?? Array.Empty<byte>();
}
catch (Exception ex)
{
_log.Warning(ex, "DriverHost {Node}: failed to load artifact for SubscribeBulk ({Id})", _localNode, deploymentId);
return;
}
AddressSpaceComposition composition;
try
{
composition = DeploymentArtifact.ParseComposition(blob, _localNode.Value);
}
catch (Exception ex)
{
_log.Warning(ex, "DriverHost {Node}: failed to parse composition for SubscribeBulk ({Id})", _localNode, deploymentId);
return;
}
// Value-subscription set: alarm-bearing tags are Part 9 conditions, not value variables, so they
// are excluded — the driver must not value-subscribe an alarm attribute (it is fed via the native
// alarm event stream, routed by ForwardNativeAlarm).
var refsByDriver = composition.EquipmentTags
.Where(t => t.Alarm is null)
.GroupBy(t => t.DriverInstanceId, StringComparer.Ordinal)
.ToDictionary(
g => g.Key,
g => (IReadOnlyList<string>)g.Select(t => t.FullName)
.Distinct(StringComparer.Ordinal)
.ToArray(),
StringComparer.Ordinal);
// Native-alarm subscription set: the alarm-bearing tags' FullNames (= the driver's
// ConditionId/AlarmFullReference). An IAlarmSource driver suppresses OnAlarmEvent until at least one
// alarm subscription exists (e.g. GalaxyDriver gates its central feed on _alarmSubscriptions), so the
// instance actor must SubscribeAlarmsAsync these refs to un-gate the feed. Routing stays by
// ConditionId in ForwardNativeAlarm; this set just opens (and scopes) the subscription.
var alarmRefsByDriver = composition.EquipmentTags
.Where(t => t.Alarm is not null)
.GroupBy(t => t.DriverInstanceId, StringComparer.Ordinal)
.ToDictionary(
g => g.Key,
g => (IReadOnlyList<string>)g.Select(t => t.FullName)
.Distinct(StringComparer.Ordinal)
.ToArray(),
StringComparer.Ordinal);
// Rebuild the driver live-value routing map from the SAME EquipmentTags pass (mirrors
// VirtualTagHostActor._nodeIdByVtag): map each tag's (DriverInstanceId, FullName) wire-ref to
// the folder-scoped equipment NodeId the materialiser placed its variable at, so ForwardToMux
// can land driver values on the right node. Clear-and-repopulate every apply so renames
// (Name/FolderPath/EquipmentId changes) and removals are reflected.
_nodeIdByDriverRef.Clear();
// Inverse map for the inbound operator-write path (NodeId → (DriverInstanceId, FullName)): an
// operator writes to the variable's folder-scoped NodeId, but the driver writes by its wire-ref
// FullName. Cleared + repopulated from the SAME EquipmentTags pass so renames/removals are
// reflected. Each NodeId maps to exactly one driver ref (a variable is backed by a single driver
// attribute), so last-writer-wins on the rare duplicate is harmless.
_driverRefByNodeId.Clear();
// Alarm condition routing map: (DriverInstanceId, FullName = alarm ConditionId/AlarmFullReference) → folder-scoped
// condition NodeId(s). Built from the SAME EquipmentTags pass (alarm-bearing tags only) so
// ForwardNativeAlarm can land a native transition on the right condition node. Clear-and-rebuild
// every apply; the projector is Clear()'d too so stale per-condition state never leaks across
// redeploys (renames/removals/address-space rebuilds).
_alarmNodeIdByDriverRef.Clear();
// Inverse alarm map for the inbound native-condition ack path (condition NodeId → (DriverInstanceId,
// FullName)): an OPC UA client acknowledges the condition's folder-scoped NodeId, but the driver
// acknowledges by its wire-ref FullName (= ConditionId). Cleared + repopulated from the SAME
// alarm-bearing EquipmentTags pass so renames/removals are reflected. Each condition NodeId maps to
// exactly one driver ref (a condition is backed by a single driver alarm), so last-writer-wins on the
// rare duplicate is harmless.
_driverRefByAlarmNodeId.Clear();
// Per-condition metadata (EquipmentId / Name / OPC UA alarm type) for the alerts fan-out, built in
// the SAME alarm branch as the node map so a redeploy can't leave it out of sync. Cleared alongside it.
_alarmMetaByNodeId.Clear();
_nativeAlarmProjector.Clear();
foreach (var t in composition.EquipmentTags)
{
var key = (t.DriverInstanceId, t.FullName);
var nodeId = EquipmentNodeIds.Variable(t.EquipmentId, t.FolderPath, t.Name);
if (t.Alarm is not null)
{
// Alarm tags are conditions, not value variables: route them ONLY into the alarm map and
// keep them OUT of the value maps + value-subscription set (so they don't get both a value
// variable AND a condition).
if (!_alarmNodeIdByDriverRef.TryGetValue(key, out var aset))
_alarmNodeIdByDriverRef[key] = aset = new HashSet<string>(StringComparer.Ordinal);
aset.Add(nodeId);
// Inverse 1:1 map for the inbound native-condition ack path: the materialised condition
// NodeId routes back to the owning (DriverInstanceId, FullName=ConditionId) so an OPC UA
// acknowledge of this condition reaches the right driver child.
_driverRefByAlarmNodeId[nodeId] = key;
// Capture the per-condition metadata the alerts fan-out (ForwardNativeAlarm) needs to build
// the AlarmTransitionEvent: the equipment path, the operator-visible alarm name, and the
// OPC UA Part 9 subtype. Keyed by the condition NodeId (the projection's own key).
_alarmMetaByNodeId[nodeId] = (t.EquipmentId, t.Name, t.Alarm.AlarmType, t.Alarm.HistorizeToAveva);
continue;
}
if (!_nodeIdByDriverRef.TryGetValue(key, out var set))
_nodeIdByDriverRef[key] = set = new HashSet<string>(StringComparer.Ordinal);
set.Add(nodeId);
_driverRefByNodeId[nodeId] = key;
}
// Snapshot the cached (FixedTree-discovered) driver set BEFORE the bulk loop, while _discoveredByDriver
// is still untouched (the re-inject tail below drops/removes entries). Cached drivers are SKIPPED in the
// bulk loop because the tail sends each of them EXACTLY ONE SetDesiredSubscriptions for this pass: the
// authored+discovered union (ApplyDiscoveredPlansForDriver) for a survivor, or — if its plan is fully
// dropped — an authored-only fallback. Sending the bulk authored-only set HERE too would force the child
// to drop the whole handle (authored tags included) then re-subscribe — an extra unsub/resub blip of the
// authored values once per cached driver per redeploy. Net effect: exactly ONE send per driver per pass.
var cachedDriverIds = _discoveredByDriver.Keys.ToHashSet(StringComparer.Ordinal);
var total = 0;
foreach (var (driverId, entry) in _children)
{
// Cached drivers are owned exclusively by the re-inject tail (one send each) — skip here. Non-cached
// drivers keep the bulk authored-only send exactly as before.
if (cachedDriverIds.Contains(driverId)) continue;
var refs = refsByDriver.TryGetValue(driverId, out var r) ? r : Array.Empty<string>();
var alarmRefs = alarmRefsByDriver.TryGetValue(driverId, out var ar) ? ar : Array.Empty<string>();
entry.Actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(refs, SubscriptionPublishingInterval, alarmRefs));
total += refs.Count;
}
if (total > 0)
{
_log.Info("DriverHost {Node}: SubscribeBulk pushed {Refs} references across {Drivers} driver(s)",
_localNode, total, refsByDriver.Count);
}
// Hand the Equipment-namespace VirtualTags to the host so it spawns/reconciles a
// VirtualTagActor per plan and streams their evaluated values back onto the just-rebuilt
// address space. Runs on BOTH the fresh-apply path (ApplyAndAck) and the bootstrap-restore
// path (RestoreApplied) because both call this method, so one send covers both.
// NOTE: the Stale-recovery path (TryRecoverFromStale) does NOT call PushDesiredSubscriptions,
// so — like drivers — VirtualTags remain empty after a Stale recovery until the next
// deployment dispatch. This is intentional and consistent with driver recovery: the Stale
// path only restores the revision marker + NodeDeploymentState; a subsequent dispatch
// (or a redeploy from AdminUI) triggers the full apply + subscribe pass.
_virtualTagHost?.Tell(new VirtualTagHostActor.ApplyVirtualTags(composition.EquipmentVirtualTags));
if (composition.EquipmentVirtualTags.Count > 0)
{
_log.Info("DriverHost {Node}: applied {Count} Equipment VirtualTag(s) to the VirtualTag host",
_localNode, composition.EquipmentVirtualTags.Count);
}
// Same pass for Equipment-namespace ScriptedAlarms: hand the plans to the host so it
// (re)loads its engine + re-registers mux interest for the union of dependency refs. Covers
// both the fresh-apply and bootstrap-restore paths (both call this method); the Stale-recovery
// path deliberately does not, matching driver + VirtualTag recovery.
_scriptedAlarmHost?.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(composition.EquipmentScriptedAlarms));
if (composition.EquipmentScriptedAlarms.Count > 0)
{
_log.Info("DriverHost {Node}: applied {Count} Equipment ScriptedAlarm(s) to the ScriptedAlarm host",
_localNode, composition.EquipmentScriptedAlarms.Count);
}
// Cache the applied composition LAST so discovered-node injection (HandleDiscoveredNodes) can resolve
// the equipment bound to a driver + recompute the authored subscription sets when a driver later
// reports its FixedTree. Set here (not in ApplyAndAck) so both the fresh-apply and bootstrap-restore
// paths — which both route through this method — leave a current composition.
_lastComposition = composition;
// Re-inject discovered (FixedTree) nodes after the authored rebuild. PushDesiredSubscriptions cleared
// _nodeIdByDriverRef and re-pushed authored-only subscriptions above; without this, an IN-PROCESS
// redeploy / re-apply (one that runs while the host is alive, so _discoveredByDriver is populated)
// would drop the injected FixedTree routes + materialised nodes until the driver happens to reconnect
// and re-discover. This loop is INERT on the bootstrap-restore path (RestoreApplied): there the actor
// is freshly constructed so _discoveredByDriver is empty — restart survival comes from Task 6's
// post-connect re-discovery, NOT this re-apply. Re-resolve each cached driver's candidate equipments
// from the CURRENT composition (the SAME EquipmentNodes-UNION-EquipmentTags logic HandleDiscoveredNodes
// uses), then validate each cached (equipmentId → plan) entry PER ENTRY: drop the entry if its
// equipmentId is no longer a resolved candidate for the driver, OR the plan's NodeIds aren't scoped to
// that equipmentId (a rebind). A driver whose inner map empties out is removed entirely. The surviving
// entries are re-applied via the single-send-per-driver structure. (The single-equipment case today has
// exactly one inner entry; the multi-device task adds more.)
foreach (var driverId in _discoveredByDriver.Keys.ToList()) // snapshot — we mutate the dict below
{
var fromNodes = composition.EquipmentNodes
.Where(e => e.DriverInstanceId is not null && string.Equals(e.DriverInstanceId, driverId, StringComparison.Ordinal))
.Select(e => e.EquipmentId);
var fromTags = composition.EquipmentTags
.Where(t => string.Equals(t.DriverInstanceId, driverId, StringComparison.Ordinal))
.Select(t => t.EquipmentId);
var candidates = fromNodes.Concat(fromTags).ToHashSet(StringComparer.Ordinal);
var plansByEquipment = _discoveredByDriver[driverId];
// Track whether ANY entry was dropped (no-longer-candidate or rebind) so we can re-trigger this
// driver's discovery exactly ONCE after the inner map is processed (see the post-loop block).
var droppedAny = false;
foreach (var equipmentId in plansByEquipment.Keys.ToList()) // snapshot — we mutate the inner dict
{
var plan = plansByEquipment[equipmentId];
if (!candidates.Contains(equipmentId))
{
plansByEquipment.Remove(equipmentId);
droppedAny = true;
_log.Debug("DriverHost {Node}: dropped cached discovered nodes for {Driver}/{Equipment} — equipment no longer resolves", _localNode, driverId, equipmentId);
continue;
}
// If the equipment was rebound (the cached plan's NodeIds are scoped to the OLD equipment), drop +
// let re-discovery rebuild against the new equipment. The plan's NodeIds are "{equipmentId}/...".
var planEquipmentConsistent = plan.Variables.Count > 0
&& plan.Variables[0].NodeId.StartsWith(equipmentId + "/", StringComparison.Ordinal);
if (!planEquipmentConsistent)
{
plansByEquipment.Remove(equipmentId);
droppedAny = true;
_log.Debug("DriverHost {Node}: dropped cached discovered nodes for {Driver}/{Equipment} — equipment rebound", _localNode, driverId, equipmentId);
}
}
// Re-trigger discovery when ANY entry was dropped (no-longer-candidate or rebind). A CONFIG-UNCHANGED
// rebind (the driver's DriverConfig is identical, only its authored tag's EquipmentId moved) is NOT
// restarted by ReconcileDrivers — the child stays Connected — so without this nudge the FixedTree
// subtree would stay ABSENT under the new equipment until the driver's next natural reconnect. We now
// ask the child to re-run discovery so it re-grafts promptly: the next pass resolves against the new
// _lastComposition (the now-bound equipment). This is a DISCOVERY action, not lifecycle control — no
// stop/restart; it is idempotent, and the child no-ops it if not Connected (handled in
// DriverInstanceActor). Sent at most ONCE per driver per re-inject pass (here, after the inner map is
// processed — so even when the inner map empties below), guarded on the child still existing.
if (droppedAny && _children.TryGetValue(driverId, out var rediscoverEntry))
rediscoverEntry.Actor.Tell(new DriverInstanceActor.TriggerRediscovery());
if (plansByEquipment.Count == 0)
{
_discoveredByDriver.Remove(driverId);
// FALLBACK (one-send invariant): this driver was SKIPPED in the bulk loop (it was cached), and its
// plan is now FULLY DROPPED — so ApplyDiscoveredPlansForDriver won't run for it and it would
// otherwise receive ZERO sends this pass, losing its AUTHORED subscriptions. Send the authored-only
// set NOW (the SAME payload the bulk loop computes), so the authored tags subscribe in THIS pass.
// (The TriggerRediscovery above handles the async FixedTree re-graft separately; this just keeps
// the authored values live meanwhile.) Guarded on the child still existing — a driver removed by
// ReconcileDrivers has no child and correctly gets no send.
if (_children.TryGetValue(driverId, out var fallbackEntry))
{
var refs = refsByDriver.TryGetValue(driverId, out var r) ? r : Array.Empty<string>();
var alarmRefs = alarmRefsByDriver.TryGetValue(driverId, out var ar) ? ar : Array.Empty<string>();
fallbackEntry.Actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(refs, SubscriptionPublishingInterval, alarmRefs));
}
continue;
}
ApplyDiscoveredPlansForDriver(driverId, plansByEquipment);
}
}
/// <summary>
/// Creates a <see cref="DriverInstanceActor"/> child for <paramref name="spec"/> and records it in
/// <c>_children</c> under the spec's <c>DriverInstanceId</c>. A stubbed child is spawned when
/// <see cref="DriverInstanceActor.ShouldStub"/> says so, when the driver factory throws, or when the
/// factory returns <c>null</c>; otherwise a real driver child is spawned and immediately sent
/// <see cref="DriverInstanceActor.InitializeRequested"/> with the spec's <c>DriverConfig</c>.
/// </summary>
/// <param name="spec">The driver instance to spawn a child for.</param>
private void SpawnChild(DriverInstanceSpec spec)
{
    var stub = DriverInstanceActor.ShouldStub(spec.DriverType, _localRoles);
    IDriver? driver = null;
    if (!stub)
    {
        // Tracks whether the factory failed by throwing, so the "no factory" warning below is only
        // emitted when the factory genuinely returned null (previously both warnings fired after a
        // throw, and the second one misreported the cause).
        var factoryThrew = false;
        try
        {
            driver = _driverFactory.TryCreate(spec.DriverType, spec.DriverInstanceId, spec.DriverConfig);
        }
        catch (Exception ex)
        {
            factoryThrew = true;
            _log.Warning(ex, "DriverHost {Node}: factory for {Type} threw on {Id}; stubbing",
                _localNode, spec.DriverType, spec.DriverInstanceId);
        }

        if (driver is null)
        {
            if (!factoryThrew)
            {
                _log.Warning(
                    "DriverHost {Node}: no factory for driver type {Type} (instance {Id}); falling back to stub",
                    _localNode, spec.DriverType, spec.DriverInstanceId);
            }

            stub = true;
        }
    }

    IActorRef child;

    // Prefer the real ClusterId from the deployment artifact; fall back to the local node
    // identity for pre-PR artifacts that don't carry it yet (older deploys persisted before
    // ClusterId was added to DriverInstanceSpec).
    var clusterId = !string.IsNullOrEmpty(spec.ClusterId) ? spec.ClusterId : _localNode.Value;

    // A fresh generation-suffixed name on EVERY spawn so a respawn can never collide with a child
    // still tearing down: Akka frees a stopped actor's name only after it FULLY terminates (async),
    // but HandleRestartDriver stops + respawns within one (synchronous) message handler — the old
    // child is still registered, so reusing the base name throws InvalidActorNameException
    // ("actor name is not unique"). Children are tracked by the _children dict (by IActorRef), never
    // by path, so the suffix is invisible to every caller.
    var actorName = ActorNameFor(spec.DriverInstanceId, _childSpawnGeneration++);
    if (stub)
    {
        // Stub children are started already-stubbed and are not sent InitializeRequested.
        child = Context.ActorOf(
            DriverInstanceActor.Props(
                new StubbedDriver(spec.DriverInstanceId, spec.DriverType),
                reconnectInterval: null,
                startStubbed: true,
                healthPublisher: _healthPublisher,
                clusterId: clusterId),
            actorName);
    }
    else
    {
        child = Context.ActorOf(
            DriverInstanceActor.Props(
                driver!,
                healthPublisher: _healthPublisher,
                clusterId: clusterId),
            actorName);
        child.Tell(new DriverInstanceActor.InitializeRequested(spec.DriverConfig));
    }

    _children[spec.DriverInstanceId] = new ChildEntry(child, spec, stub);
    _log.Info("DriverHost {Node}: spawned {Type} driver {Id} (stub={Stub})",
        _localNode, spec.DriverType, spec.DriverInstanceId, stub);
}
/// <summary>
/// Forwards a configuration delta to the child hosting <paramref name="spec"/>'s driver instance and
/// replaces the tracked spec with the new one. Does nothing when no child is tracked for that instance.
/// </summary>
/// <param name="spec">The updated driver instance spec.</param>
private void ApplyChildDelta(DriverInstanceSpec spec)
{
    var instanceId = spec.DriverInstanceId;
    if (_children.TryGetValue(instanceId, out var tracked))
    {
        var delta = new DriverInstanceActor.ApplyDelta(spec.DriverConfig, CorrelationId.NewId());
        tracked.Actor.Tell(delta);

        // Keep the whole new spec, not just the config: a delta may also carry changes to Name,
        // Enabled, ClusterId and so on.
        _children[instanceId] = tracked with { Spec = spec };
        _log.Debug("DriverHost {Node}: ApplyDelta queued for {Id}", _localNode, instanceId);
    }
}
/// <summary>
/// Stops the child actor tracked for <paramref name="driverInstanceId"/> and forgets it.
/// A no-op when no such child is tracked.
/// </summary>
/// <param name="driverInstanceId">Identifier of the driver instance whose child should stop.</param>
private void StopChild(string driverInstanceId)
{
    if (_children.TryGetValue(driverInstanceId, out var tracked))
    {
        Context.Stop(tracked.Actor);
        _children.Remove(driverInstanceId);
        _log.Info("DriverHost {Node}: stopped driver child {Id}", _localNode, driverInstanceId);
    }
}
/// <summary>
/// Builds the child actor name for a driver instance, in the form <c>drv-{mangledId}-g{generation}</c>.
/// </summary>
/// <param name="driverInstanceId">The driver instance identifier; any character outside
/// ASCII letters, ASCII digits, <c>-</c>, <c>_</c> and <c>.</c> is replaced with <c>_</c>.</param>
/// <param name="generation">Monotonic spawn counter appended as a suffix so that every spawn gets a
/// never-before-used name (see SpawnChild) and a restart's respawn cannot collide with the
/// still-terminating predecessor.</param>
/// <returns>The mangled, generation-suffixed actor name.</returns>
private static string ActorNameFor(string driverInstanceId, long generation)
{
    // Akka actor names cannot contain '/' or whitespace, and only ASCII letters/digits (plus a fixed
    // symbol set) are valid path-element characters. Mangle defensively: char.IsLetterOrDigit is
    // Unicode-aware and would let non-ASCII letters/digits through, so test the ASCII ranges explicitly.
    var chars = driverInstanceId.Select(c => IsNameSafe(c) ? c : '_').ToArray();
    return "drv-" + new string(chars) + "-g" + generation;

    static bool IsNameSafe(char c) =>
        c is (>= 'a' and <= 'z') or (>= 'A' and <= 'Z') or (>= '0' and <= '9') or '-' or '_' or '.';
}
/// <summary>
/// Inert <see cref="IDriver"/> stand-in handed to a <see cref="DriverInstanceActor"/> when
/// <see cref="DriverInstanceActor.ShouldStub"/> returns true or no factory is registered for the
/// driver type. The hosting actor is created with <c>startStubbed:true</c>, so the driver methods
/// on this object never run.
/// </summary>
private sealed class StubbedDriver : IDriver
{
    /// <summary>Initializes a new stubbed driver with the specified ID and type.</summary>
    /// <param name="id">The driver instance identifier.</param>
    /// <param name="type">The driver type name.</param>
    public StubbedDriver(string id, string type) => (DriverInstanceId, DriverType) = (id, type);

    /// <inheritdoc />
    public string DriverInstanceId { get; }

    /// <inheritdoc />
    public string DriverType { get; }

    /// <inheritdoc />
    public Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    /// <inheritdoc />
    public Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    /// <inheritdoc />
    public Task ShutdownAsync(CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    /// <inheritdoc />
    public Task FlushOptionalCachesAsync(CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    /// <inheritdoc />
    public DriverHealth GetHealth()
    {
        return new DriverHealth(DriverState.Healthy, DateTime.UtcNow, LastError: null);
    }

    /// <inheritdoc />
    public long GetMemoryFootprint()
    {
        return 0;
    }
}
/// <summary>
/// Handles a <see cref="RestartDriver"/> request by stopping the tracked child for the requested
/// driver instance and spawning a replacement from the spec that child was tracked with.
/// </summary>
/// <param name="msg">The restart request naming the driver instance and the requesting user.</param>
private void HandleRestartDriver(RestartDriver msg)
{
    var instanceId = msg.DriverInstanceId;

    // The request is a DPS broadcast: a node that does not host the instance simply ignores it.
    if (_children.TryGetValue(instanceId, out var current))
    {
        _log.Info("DriverHost {Node}: restarting driver {Id} by request of {User}",
            _localNode, instanceId, msg.ActorByUserName);

        // Stopping the child lets DriverInstanceActor.PostStop call ShutdownAsync.
        var previousSpec = current.Spec;
        Context.Stop(current.Actor);
        _children.Remove(instanceId);

        // Reuse the spec from the last reconcile so RowId, Name and ClusterId carry over.
        SpawnChild(previousSpec);
    }
}
/// <summary>
/// Handles a <see cref="ReconnectDriver"/> request by sending
/// <see cref="DriverInstanceActor.ForceReconnect"/> to the tracked child for the requested instance.
/// </summary>
/// <param name="msg">The reconnect request naming the driver instance and the requesting user.</param>
private void HandleReconnectDriver(ReconnectDriver msg)
{
    var instanceId = msg.DriverInstanceId;

    // The request is a DPS broadcast: a node that does not host the instance simply ignores it.
    if (_children.TryGetValue(instanceId, out var hosted))
    {
        _log.Info("DriverHost {Node}: reconnecting driver {Id} by request of {User}",
            _localNode, instanceId, msg.ActorByUserName);

        // The child drops its transport and re-enters the Reconnecting state.
        var reconnect = new DriverInstanceActor.ForceReconnect();
        hosted.Actor.Tell(reconnect);
    }
}
/// <summary>
/// Attempts to leave the Stale state: queries the ConfigDb for the most recently sealed deployment
/// and, when the query succeeds, adopts that deployment's revision (if any), records it as Applied
/// for this node, cancels the <c>retry-db</c> timer and becomes <c>Steady</c>. Any exception leaves
/// the actor in Stale with the retry timer still armed. This path only restores the revision marker
/// and NodeDeploymentState; it does not re-push driver subscriptions.
/// </summary>
private void TryRecoverFromStale()
{
    try
    {
        using var db = _dbFactory.CreateDbContext();
        var latestSealed = db.Deployments
            .AsNoTracking()
            .Where(d => d.Status == DeploymentStatus.Sealed)
            .OrderByDescending(d => d.SealedAtUtc)
            .Select(d => new { d.DeploymentId, d.RevisionHash })
            .FirstOrDefault();

        if (latestSealed is not null)
        {
            // Do all fallible conversion BEFORE mutating state or cancelling the retry timer: if the
            // stored hash cannot be parsed (RevisionHash.Parse presumably throws on malformed input),
            // the catch below must leave the actor Stale WITH its retry timer intact. Previously the
            // timer was cancelled first, so such a failure stranded the actor in Stale with no retry.
            var revision = RevisionHash.Parse(latestSealed.RevisionHash);
            var deploymentId = new DeploymentId(latestSealed.DeploymentId);

            _currentRevision = revision;
            UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Applied, failureReason: null);
        }

        _log.Info("DriverHost {Node}: ConfigDb back; recovering from Stale", _localNode);
        Timers.Cancel("retry-db");
        Become(Steady);
    }
    catch (Exception ex)
    {
        _log.Debug(ex, "DriverHost {Node}: still Stale; will retry in {Interval}", _localNode, ReconnectInterval);
    }
}
/// <summary>
/// Inserts or updates this node's <see cref="NodeDeploymentState"/> row for
/// <paramref name="deploymentId"/>. Best-effort: any exception is logged as a warning and swallowed.
/// </summary>
/// <param name="deploymentId">The deployment the state row belongs to.</param>
/// <param name="status">The status to record; <c>Applied</c> also stamps <c>AppliedAtUtc</c>.</param>
/// <param name="failureReason">Failure detail to store, or <c>null</c>.</param>
private void UpsertNodeDeploymentState(DeploymentId deploymentId, NodeDeploymentStatus status, string? failureReason)
{
    try
    {
        using var context = _dbFactory.CreateDbContext();
        var row = context.NodeDeploymentStates.FirstOrDefault(
            s => s.NodeId == _localNode.Value && s.DeploymentId == deploymentId.Value);
        var isApplied = status == NodeDeploymentStatus.Applied;

        if (row is not null)
        {
            row.Status = status;
            row.FailureReason = failureReason;

            // A non-Applied update leaves any earlier AppliedAtUtc untouched.
            if (isApplied)
            {
                row.AppliedAtUtc = DateTime.UtcNow;
            }
        }
        else
        {
            context.NodeDeploymentStates.Add(new NodeDeploymentState
            {
                NodeId = _localNode.Value,
                DeploymentId = deploymentId.Value,
                Status = status,
                FailureReason = failureReason,
                AppliedAtUtc = isApplied ? DateTime.UtcNow : null,
            });
        }

        context.SaveChanges();
    }
    catch (Exception ex)
    {
        _log.Warning(ex, "DriverHost {Node}: failed to upsert NodeDeploymentState for {Id}", _localNode, deploymentId);
    }
}
/// <summary>
/// Builds an <see cref="ApplyAck"/> for this node and delivers it to the coordinator: directly to
/// <c>_coordinatorOverride</c> when one is set, otherwise by publishing on <c>DeploymentAcksTopic</c>.
/// </summary>
/// <param name="deploymentId">The deployment being acknowledged.</param>
/// <param name="outcome">The apply outcome to report.</param>
/// <param name="failureReason">Failure detail, or <c>null</c>.</param>
/// <param name="correlation">Correlation id carried on the ack.</param>
private void SendAck(DeploymentId deploymentId, ApplyAckOutcome outcome, string? failureReason, CorrelationId correlation)
{
    var ack = new ApplyAck(deploymentId, _localNode, outcome, failureReason, correlation);

    if (_coordinatorOverride is { } coordinator)
    {
        coordinator.Tell(ack);
        return;
    }

    // Without a direct coordinator handle, publish on the dedicated ACK topic instead. The coordinator
    // singleton subscribes to it in PreStart, so the ACK reaches whichever admin node hosts it
    // without an actor-path lookup.
    var mediator = DistributedPubSub.Get(Context.System).Mediator;
    mediator.Tell(new Publish(DeploymentAcksTopic, ack));
}
}