using System.Diagnostics; using Akka.Actor; using Akka.Cluster.Tools.PublishSubscribe; using Akka.Event; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using ZB.MOM.WW.OtOpcUa.Commons.Engines; using ZB.MOM.WW.OtOpcUa.Commons.Interfaces; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy; using ZB.MOM.WW.OtOpcUa.Commons.Observability; using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; using ZB.MOM.WW.OtOpcUa.Commons.Types; using ZB.MOM.WW.OtOpcUa.Configuration; using ZB.MOM.WW.OtOpcUa.Configuration.Entities; using ZB.MOM.WW.OtOpcUa.Configuration.Enums; using ZB.MOM.WW.OtOpcUa.Core.Abstractions; using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms; using ZB.MOM.WW.OtOpcUa.Core.Scripting; using ZB.MOM.WW.OtOpcUa.Core.VirtualTags; using ZB.MOM.WW.OtOpcUa.OpcUaServer; using ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms; using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags; using CommonsNodeId = ZB.MOM.WW.OtOpcUa.Commons.Types.NodeId; namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers; /// /// Per-node supervisor that receives from the admin-role /// coordinator and applies the deployment locally. Three Become states: /// /// /// Bootstrapping — PreStart only. Reads for self; /// chooses next state. /// Steady(rev) — caught up. Idempotent on same-rev dispatch (immediate ApplyAck.Applied). /// New rev → transitions to Applying. /// Applying(id) — applying a delta. Buffers further dispatches. /// Stale — ConfigDb unreachable on bootstrap. Background reconnect loop tries to advance. /// /// /// Children (DriverInstance/VirtualTag/etc.) are fully wired: DispatchDeployment drives /// which runs the spawn-plan via , /// maintains the FullName→NodeId live-value routing map, and forwards results to the /// OPC UA publish actor via ForwardToMux. 
///
public sealed class DriverHostActor : ReceiveActor, IWithTimers
{
    /// <summary>Pub/sub topic subscribed to in <c>PreStart</c> so the coordinator's deployment broadcast lands here.</summary>
    public const string DeploymentsTopic = "deployments";

    /// <summary>Pub/sub topic name for deployment acknowledgements (its use is not visible in this excerpt).</summary>
    public const string DeploymentAcksTopic = "deployment-acks";

    /// <summary>Pub/sub topic subscribed to in <c>PreStart</c> so AdminUI Reconnect/Restart commands land here.</summary>
    public const string DriverControlTopic = ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin.DriverControlTopic.Name;

    /// <summary>
    /// Thirty-second interval. NOTE(review): presumably the Stale-state ConfigDb reconnect cadence —
    /// its consumer is not visible in this excerpt; confirm.
    /// </summary>
    public static readonly TimeSpan ReconnectInterval = TimeSpan.FromSeconds(30);

    /// <summary>Publishing interval handed to each driver's SubscribeBulk pass after an apply.</summary>
    private static readonly TimeSpan SubscriptionPublishingInterval = TimeSpan.FromSeconds(1);

    // Constructor-injected dependencies and test seams; each is assigned once in the constructor.
    private readonly IDbContextFactory _dbFactory;
    private readonly CommonsNodeId _localNode;
    private readonly IActorRef? _coordinatorOverride;
    private readonly IDriverFactory _driverFactory;
    private readonly IReadOnlySet _localRoles;
    private readonly IActorRef? _dependencyMux;
    private readonly IActorRef? _opcUaPublishActor;
    private readonly IDriverHealthPublisher _healthPublisher;
    private readonly IVirtualTagEvaluator _virtualTagEvaluator;
    private readonly IHistoryWriter _historyWriter;
    private readonly IActorRef? _virtualTagHostOverride;
    private readonly ILoggerFactory _loggerFactory;
    private readonly ScriptRootLogger? _scriptRootLogger;
    private readonly IActorRef? _scriptedAlarmHostOverride;

    private readonly ILoggingAdapter _log = Context.GetLogger();

    /// <summary>
    /// The single VirtualTag-host child that spawns/reconciles Equipment-namespace
    /// VirtualTagActors and bridges their results onto the OPC UA publish actor. Assigned in
    /// <c>SpawnVirtualTagHost</c> (the test override, or a real child when an OPC UA publish actor
    /// is wired); per the PreStart notes it is Told ApplyVirtualTags by the
    /// PushDesiredSubscriptions pass.
    /// </summary>
    private IActorRef? _virtualTagHost;

    /// <summary>
    /// The single ScriptedAlarm-host child that owns the per-node <c>ScriptedAlarmEngine</c>,
    /// feeds it live tag values, and bridges its emissions onto the OPC UA publish actor + the
    /// cluster alerts topic. Assigned in <c>SpawnScriptedAlarmHost</c> alongside the VirtualTag
    /// host (a real child requires both an OPC UA publish actor and a <c>ScriptRootLogger</c>);
    /// per the PreStart notes it is Told ApplyScriptedAlarms by the PushDesiredSubscriptions pass.
    /// </summary>
    private IActorRef? _scriptedAlarmHost;

    /// <summary>Revision this node is caught up to; null until one is recovered or applied.</summary>
    private RevisionHash? _currentRevision;

    /// <summary>Deployment currently being applied; compared against incoming dispatches in the Applying behaviour.</summary>
    private DeploymentId? _applyingDeploymentId;

    /// <summary>Running driver children, keyed with ordinal string comparison and looked up by DriverInstanceId.</summary>
    private readonly Dictionary _children = new(StringComparer.Ordinal);

    // Monotonic counter feeding the child actor-name suffix (see ActorNameFor / SpawnChild).
    // Single-threaded actor, so a plain increment is safe; it only ever grows, guaranteeing a unique
    // name per spawn so a restart's respawn never collides with the still-terminating old child.
    private long _childSpawnGeneration;

    /// <summary>
    /// Driver live-value routing map: (DriverInstanceId, FullName) → folder-scoped equipment
    /// NodeId(s). Rebuilt every apply from the composition's EquipmentTags (mirroring
    /// VirtualTagHostActor._nodeIdByVtag), and resolved in <c>ForwardToMux</c> so a driver value
    /// published by wire-ref FullName lands on the variable's actual folder-scoped NodeId. A set
    /// because the same driver ref can back several equipment variables (e.g. identical machines
    /// sharing a register), and the per-apply rebuild dedups by NodeId.
    /// </summary>
    private readonly Dictionary<(string DriverInstanceId, string FullName), HashSet> _nodeIdByDriverRef = new();

    /// <summary>
    /// Inverse of <c>_nodeIdByDriverRef</c>: folder-scoped equipment NodeId →
    /// (DriverInstanceId, FullName). Rebuilt every apply from the same EquipmentTags pass, and
    /// resolved by <c>HandleRouteNodeWrite</c> so an inbound operator write targeting an equipment
    /// variable's NodeId is forwarded to the owning driver child as a write of its wire-ref
    /// FullName. Each NodeId maps to exactly one driver ref (a variable is backed by a single
    /// driver attribute), so this is a flat 1:1 map (the forward map fans out 1:N because one ref
    /// can back several variables).
    /// </summary>
    private readonly Dictionary _driverRefByNodeId = new(StringComparer.Ordinal);

    /// <summary>
    /// (DriverInstanceId, FullName = alarm ConditionId / AlarmFullReference) → folder-scoped condition NodeId(s).
    /// Built from EquipmentTags whose plan carries Alarm, alongside the value maps; resolves a native
    /// alarm transition to the materialised Part 9 condition node(s).
    /// </summary>
/// <remarks>Alarm tags are conditions, not value variables, so they are kept OUT of the value maps + value-subscription set.</remarks>
    private readonly Dictionary<(string DriverInstanceId, string FullName), HashSet> _alarmNodeIdByDriverRef = new();

    /// <summary>
    /// Inverse of <c>_alarmNodeIdByDriverRef</c>: folder-scoped condition NodeId →
    /// (DriverInstanceId, FullName = alarm ConditionId/AlarmFullReference). Built in the SAME apply
    /// pass from the alarm-bearing EquipmentTags, and resolved by <c>HandleRouteNativeAlarmAck</c>
    /// so an inbound OPC UA acknowledge of a native condition (resolved to the condition's NodeId by
    /// the node manager) is forwarded to the owning driver child as an acknowledge of its wire-ref
    /// FullName. Each condition NodeId maps to exactly one driver ref (a condition is backed by a
    /// single driver alarm), so this is a flat 1:1 map (the forward map fans out 1:N because one ref
    /// can back several conditions on identical machines).
    /// </summary>
    private readonly Dictionary _driverRefByAlarmNodeId = new(StringComparer.Ordinal);

    /// <summary>
    /// Condition NodeId → (EquipmentId, tag Name, OPC UA alarm type, HistorizeToAveva) for building
    /// the AlarmTransitionEvent fan-out. Built in the same PushDesiredSubscriptions alarm branch.
    /// HistorizeToAveva (bool?, null ⇒ historize) is the native per-condition opt-out parsed from
    /// TagConfig.alarm.historizeToAveva; it is threaded onto the transition so the
    /// HistorianAdapterActor's "is not false" gate suppresses the durable AVEVA row only on an
    /// explicit false (mirroring the scripted-alarm opt-out).
    /// </summary>
    private readonly Dictionary _alarmMetaByNodeId = new(StringComparer.Ordinal);

    /// <summary>
    /// Derives a full Part 9 condition snapshot from each native alarm transition delta,
    /// tracking per-condition-NodeId prior state. Per the original note it is cleared on every
    /// apply alongside the value maps so stale condition state never leaks across redeploys
    /// (the clearing call itself is outside this excerpt).
    /// </summary>
    private readonly NativeAlarmProjector _nativeAlarmProjector = new();

    /// <summary>
    /// Cached local <c>RedundancyRole</c> from the latest redundancy-state snapshot
    /// (null = unknown until the first snapshot arrives, or no local node match).
    /// </summary>
/// <remarks>
    /// The inbound write gate in <c>HandleRouteNodeWrite</c> reuses this signal — the SAME one the
    /// scripted-alarm emit gate uses (ScriptedAlarmHostActor._localRole): only the Primary
    /// services writes, default-allow while unknown so single-node deploys + the boot window never
    /// reject (a node is the sole Primary until told otherwise).
    /// </remarks>
    private RedundancyRole? _localRole;

    /// <summary>
    /// Cached cluster DistributedPubSub mediator, resolved once in <c>PreStart</c> (on the
    /// actor thread) and reused for the Primary-gated native-alarm alerts fan-out in
    /// <c>ForwardNativeAlarm</c> instead of re-resolving it per-publish. Mirrors
    /// ScriptedAlarmHostActor._mediator.
    /// </summary>
    private IActorRef _mediator = null!;

    /// <summary>
    /// Routes an inbound operator write (resolved from the OPC UA node-manager side, Task 11) to the
    /// owning driver child. <c>HandleRouteNodeWrite</c> gates on the local node being the
    /// driver Primary, resolves the reverse map <c>_driverRefByNodeId</c>, and Asks the child a
    /// <c>DriverInstanceActor.WriteAttribute</c> carrying the driver-side FullName.
    /// </summary>
    /// <param name="NodeId">The folder-scoped equipment-variable NodeId the operator wrote to.</param>
    /// <param name="Value">The value to write (the driver coerces it to the attribute's data type).</param>
    public sealed record RouteNodeWrite(string NodeId, object? Value);

    /// <summary>
    /// Reply to <see cref="RouteNodeWrite"/>: the outcome of forwarding the write to the driver
    /// (or a gate/lookup failure that never reached the driver).
    /// </summary>
    /// <param name="Success">True when the driver accepted the write.</param>
    /// <param name="Reason">Failure detail when <paramref name="Success"/> is false; null on success.</param>
    public sealed record NodeWriteResult(bool Success, string? Reason);

    /// <summary>
    /// Routes an inbound native-condition acknowledge to the owning driver child. The host wires the
    /// OPC UA node manager's NativeAlarmAckRouter to Tell this in when a client Acknowledges a
    /// NATIVE Part 9 condition; <c>HandleRouteNativeAlarmAck</c> applies the same Primary gate the
    /// inbound write path uses, resolves the inverse <c>_driverRefByAlarmNodeId</c> map, and Tells
    /// the owning driver child a <c>DriverInstanceActor.RouteAlarmAck</c> carrying the principal —
    /// fire-and-forget (the Part 9 ack already committed the local condition state).
    /// </summary>
/// <remarks>
    /// Deliberately decoupled from the OpcUaServer NativeAlarmAck type: the host maps
    /// NativeAlarmAck → this at the wiring boundary so Runtime does not depend on the OPC UA layer.
    /// </remarks>
    /// <param name="ConditionNodeId">The folder-scoped condition NodeId the operator acknowledged.</param>
    /// <param name="Comment">Operator-supplied comment forwarded to the upstream alarm system; null when none.</param>
    /// <param name="OperatorUser">The authenticated principal performing the acknowledge.</param>
    public sealed record RouteNativeAlarmAck(string ConditionNodeId, string? Comment, string OperatorUser);

    /// <summary>
    /// Bookkeeping entry for one driver child: its actor ref, the spec it was spawned from, and a
    /// <c>Stubbed</c> flag (the flag's semantics are not visible in this excerpt).
    /// </summary>
    private sealed record ChildEntry(IActorRef Actor, DriverInstanceSpec Spec, bool Stubbed)
    {
        // Convenience accessors for sites that don't need the full spec.
        public string DriverType => Spec.DriverType;
        public string LastConfigJson => Spec.DriverConfig;
    }

    /// <inheritdoc />
    public ITimerScheduler Timers { get; set; } = null!;

    /// <summary>
    /// Singleton message type (private constructor, single <see cref="Instance"/>).
    /// NOTE(review): presumably the tick that drives the Stale-state ConfigDb reconnect loop — the
    /// handler is outside this excerpt; confirm.
    /// </summary>
    public sealed class RetryConfigDbConnection
    {
        public static readonly RetryConfigDbConnection Instance = new();
        private RetryConfigDbConnection() { }
    }

    /// <summary>Creates props for a DriverHostActor with the specified dependencies.</summary>
    /// <param name="dbFactory">Database context factory for configuration database access.</param>
    /// <param name="localNode">The local cluster node identifier.</param>
    /// <param name="coordinator">Optional coordinator actor reference for deployment coordination.</param>
    /// <param name="driverFactory">Optional driver factory; defaults to the null factory if not provided.</param>
    /// <param name="localRoles">Optional set of roles assigned to the local node.</param>
    /// <param name="dependencyMux">Optional actor reference for dependency multiplexing.</param>
    /// <param name="opcUaPublishActor">Optional actor reference for OPC UA publishing.</param>
    /// <param name="healthPublisher">Optional driver-health publisher; defaults to
    /// <c>NullDriverHealthPublisher</c> so test harnesses and smoke fixtures don't need to wire it.</param>
    /// <param name="virtualTagEvaluator">Optional evaluator handed to the spawned
    /// <c>VirtualTagHostActor</c>'s children; defaults to <c>NullVirtualTagEvaluator</c>
    /// (the dev/Mac path where no expression is evaluated). Production passes the DI-resolved
    /// Roslyn evaluator.</param>
    /// <param name="historyWriter">Optional sink handed to the spawned <c>VirtualTagHostActor</c>
    /// for VirtualTag results whose plan opted into Historize=true; defaults to
    /// <c>NullHistoryWriter</c> (the durable AVEVA sink is infra-gated, so no live-data historian
    /// write RPC exists).</param>
/// <remarks>A deployment that binds a real <c>IHistoryWriter</c> in DI overrides the default history writer.</remarks>
    /// <param name="virtualTagHostOverride">Test seam: when supplied, this actor is used as the
    /// VirtualTag host instead of spawning a real <c>VirtualTagHostActor</c> child, so tests
    /// can intercept the ApplyVirtualTags message. Null in production (the real host is spawned).</param>
    /// <param name="loggerFactory">Optional logger factory used to create the
    /// <c>EfAlarmConditionStateStore</c>'s logger when spawning the ScriptedAlarm host;
    /// defaults to <c>NullLoggerFactory</c> when not provided.</param>
    /// <param name="scriptRootLogger">Optional root script logger required to spawn the ScriptedAlarm
    /// host (the engine + its script logging hang off it). When null the ScriptedAlarm host is left
    /// unspawned — the graceful dev/None-deployment path.</param>
    /// <param name="scriptedAlarmHostOverride">Test seam: when supplied, this actor is used as the
    /// ScriptedAlarm host instead of spawning a real <c>ScriptedAlarmHostActor</c> child, so
    /// tests can intercept the ApplyScriptedAlarms message. Null in production (the real host is
    /// spawned).</param>
    /// <returns>Props whose factory expression forwards every argument, in order, to the constructor.</returns>
    public static Props Props(
        IDbContextFactory dbFactory,
        CommonsNodeId localNode,
        IActorRef? coordinator = null,
        IDriverFactory? driverFactory = null,
        IReadOnlySet? localRoles = null,
        IActorRef? dependencyMux = null,
        IActorRef? opcUaPublishActor = null,
        IDriverHealthPublisher? healthPublisher = null,
        IVirtualTagEvaluator? virtualTagEvaluator = null,
        IHistoryWriter? historyWriter = null,
        IActorRef? virtualTagHostOverride = null,
        ILoggerFactory? loggerFactory = null,
        ScriptRootLogger? scriptRootLogger = null,
        IActorRef? scriptedAlarmHostOverride = null)
    {
        // Fully qualified because this factory method shadows the Props type name inside the class.
        return Akka.Actor.Props.Create(() => new DriverHostActor(
            dbFactory,
            localNode,
            coordinator,
            driverFactory,
            localRoles,
            dependencyMux,
            opcUaPublishActor,
            healthPublisher,
            virtualTagEvaluator,
            historyWriter,
            virtualTagHostOverride,
            loggerFactory,
            scriptRootLogger,
            scriptedAlarmHostOverride));
    }

    /// <summary>Initializes a new DriverHostActor with the specified dependencies.</summary>
    /// <param name="dbFactory">Database context factory for configuration database access.</param>
    /// <param name="localNode">The local cluster node identifier.</param>
    /// <param name="coordinator">Optional coordinator actor reference for deployment coordination.</param>
    /// <param name="driverFactory">Optional driver factory; defaults to the null factory if not provided.</param>
/// <param name="localRoles">Optional set of roles assigned to the local node.</param>
    /// <param name="dependencyMux">Optional actor reference for dependency multiplexing.</param>
    /// <param name="opcUaPublishActor">Optional actor reference for OPC UA publishing.</param>
    /// <param name="healthPublisher">Optional driver-health publisher; defaults to <c>NullDriverHealthPublisher</c>.</param>
    /// <param name="virtualTagEvaluator">Optional evaluator handed to the VirtualTag host's children;
    /// defaults to <c>NullVirtualTagEvaluator</c>.</param>
    /// <param name="historyWriter">Optional sink handed to the spawned <c>VirtualTagHostActor</c>
    /// for historized VirtualTag results; defaults to <c>NullHistoryWriter</c>.</param>
    /// <param name="virtualTagHostOverride">Test seam: when supplied, used as the VirtualTag host
    /// instead of spawning a real child.</param>
    /// <param name="loggerFactory">Optional logger factory used to create the
    /// <c>EfAlarmConditionStateStore</c>'s logger; defaults to <c>NullLoggerFactory</c>.</param>
    /// <param name="scriptRootLogger">Optional root script logger required to spawn the ScriptedAlarm
    /// host; when null the host is left unspawned.</param>
    /// <param name="scriptedAlarmHostOverride">Test seam: when supplied, used as the ScriptedAlarm host
    /// instead of spawning a real child.</param>
    public DriverHostActor(
        IDbContextFactory dbFactory,
        CommonsNodeId localNode,
        IActorRef? coordinator,
        IDriverFactory? driverFactory = null,
        IReadOnlySet? localRoles = null,
        IActorRef? dependencyMux = null,
        IActorRef? opcUaPublishActor = null,
        IDriverHealthPublisher? healthPublisher = null,
        IVirtualTagEvaluator? virtualTagEvaluator = null,
        IHistoryWriter? historyWriter = null,
        IActorRef? virtualTagHostOverride = null,
        ILoggerFactory? loggerFactory = null,
        ScriptRootLogger? scriptRootLogger = null,
        IActorRef? scriptedAlarmHostOverride = null)
    {
        // Required collaborators.
        _dbFactory = dbFactory;
        _localNode = localNode;

        // Optional collaborators that fall back to a null-object / empty default.
        _driverFactory = driverFactory ?? NullDriverFactory.Instance;
        _localRoles = localRoles ?? new HashSet(StringComparer.Ordinal);
        _healthPublisher = healthPublisher ?? NullDriverHealthPublisher.Instance;
        _virtualTagEvaluator = virtualTagEvaluator ?? NullVirtualTagEvaluator.Instance;
        _historyWriter = historyWriter ?? NullHistoryWriter.Instance;
        _loggerFactory = loggerFactory ?? NullLoggerFactory.Instance;

        // Optional collaborators that legitimately stay null, plus the test seams.
        _coordinatorOverride = coordinator;
        _dependencyMux = dependencyMux;
        _opcUaPublishActor = opcUaPublishActor;
        _scriptRootLogger = scriptRootLogger;
        _virtualTagHostOverride = virtualTagHostOverride;
        _scriptedAlarmHostOverride = scriptedAlarmHostOverride;

        // Default behavior is Steady — PreStart may flip to Stale or replay an orphan apply.
        Become(Steady);
    }

    /// <inheritdoc />
    protected override void PreStart()
    {
        // Resolve the cluster DPS mediator once, on the actor thread; it is reused for every
        // subscribe below and for the Primary-gated native-alarm alerts fan-out in ForwardNativeAlarm
        // (mirrors ScriptedAlarmHostActor).
        _mediator = DistributedPubSub.Get(Context.System).Mediator;

        // Subscriptions, in order:
        //   1. deployments      — the coordinator's broadcast lands here;
        //   2. driver-control   — AdminUI Reconnect/Restart commands land here;
        //   3. redundancy-state — cluster role changes cache this node's role, the SAME signal the
        //      scripted-alarm emit gate uses, so only the Primary services operator writes (the
        //      secondary keeps state warm for failover).
        string[] topics =
        {
            DeploymentsTopic,
            DriverControlTopic,
            ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.RedundancyStateTopic,
        };
        foreach (var topic in topics)
        {
            _mediator.Tell(new Subscribe(topic, Self));
        }

        // Both hosts are spawned BEFORE Bootstrap so the bootstrap-restore path (which routes through
        // PushDesiredSubscriptions and Tells ApplyVirtualTags / ApplyScriptedAlarms) has live hosts
        // to target.
        SpawnVirtualTagHost();
        SpawnScriptedAlarmHost();

        Bootstrap();
    }

    /// <summary>
    /// Spawns the single <c>VirtualTagHostActor</c> child that owns the Equipment-namespace
    /// VirtualTagActors and bridges their results onto the OPC UA publish actor. A test-supplied
    /// virtualTagHostOverride short-circuits the spawn so a probe can intercept ApplyVirtualTags.
    /// </summary>
/// <remarks>
    /// The real host requires a non-null OPC UA publish actor (per the original note its ctor throws
    /// otherwise), so when no publish actor is wired (legacy ControlPlane test harnesses with no OPC UA
    /// sink) the host is left null and ApplyVirtualTags becomes a no-op — VirtualTags can't have
    /// anywhere to publish without it.
    /// </remarks>
    private void SpawnVirtualTagHost()
    {
        if (_virtualTagHostOverride is { } probe)
        {
            // Test seam wins: no real child is created.
            _virtualTagHost = probe;
        }
        else if (_opcUaPublishActor is { } publishActor)
        {
            _virtualTagHost = Context.ActorOf(
                VirtualTagHostActor.Props(publishActor, _dependencyMux, _virtualTagEvaluator, _historyWriter),
                "virtual-tag-host");
        }
        else
        {
            // No sink to publish to: leave _virtualTagHost null.
            _log.Debug("DriverHost {Node}: no OPC UA publish actor wired; skipping VirtualTag host spawn", _localNode);
        }
    }

    /// <summary>
    /// Spawns the single <c>ScriptedAlarmHostActor</c> child that owns the per-node
    /// <c>ScriptedAlarmEngine</c>, feeds it live tag values from the dependency mux, and
    /// bridges its emissions onto the OPC UA publish actor + the cluster alerts topic. A
    /// test-supplied scriptedAlarmHostOverride short-circuits the spawn so a probe can
    /// intercept ApplyScriptedAlarms. The real host needs both a non-null OPC UA publish actor
    /// (the emission sink) and a non-null <c>ScriptRootLogger</c> (the engine + its script logging
    /// hang off it); when either is missing (legacy ControlPlane test harnesses, dev/None
    /// deployments) the host is left null and ApplyScriptedAlarms becomes a no-op. The engine is
    /// built around a fresh <c>DependencyMuxTagUpstreamSource</c> + an
    /// <c>EfAlarmConditionStateStore</c>; per the original note the host (spawned as a child)
    /// owns + disposes the engine in its PostStop, so it stops with the driver host.
    /// </summary>
/// <remarks>Called once from <c>PreStart</c>, after <c>SpawnVirtualTagHost</c> and before <c>Bootstrap</c>.</remarks>
    private void SpawnScriptedAlarmHost()
    {
        // Test seam wins: no engine and no real child are created.
        if (_scriptedAlarmHostOverride is not null)
        {
            _scriptedAlarmHost = _scriptedAlarmHostOverride;
            return;
        }
        // Both the emission sink and the root script logger are required; otherwise leave the host null.
        if (_opcUaPublishActor is null || _scriptRootLogger is null)
        {
            _log.Debug(
                "DriverHost {Node}: skipping ScriptedAlarm host spawn (no publish actor / root logger)",
                _localNode);
            return;
        }
        // The same upstream instance is handed to both the engine and the host actor.
        var upstream = new DependencyMuxTagUpstreamSource();
        var store = new EfAlarmConditionStateStore(
            _dbFactory, _loggerFactory.CreateLogger());
        var engine = new ScriptedAlarmEngine(
            upstream, store, new ScriptLoggerFactory(_scriptRootLogger.Logger), _scriptRootLogger.Logger);
        _scriptedAlarmHost = Context.ActorOf(
            ScriptedAlarmHostActor.Props(_opcUaPublishActor, _dependencyMux, upstream, engine, _localNode),
            "scripted-alarm-host");
    }

    /// <summary>
    /// Chooses the initial behaviour from this node's most recent NodeDeploymentState row:
    /// no row ⇒ Steady with no revision; Applied ⇒ Steady at the recovered revision, then
    /// <c>RestoreApplied</c>; Applying ⇒ replay through <c>ApplyAndAck</c> when the revision can be
    /// resolved, else Steady; Failed/other ⇒ Steady. Any exception thrown inside the try block ⇒ Stale.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the catch covers the whole try block, so a failure in <c>RevisionHash.Parse</c>,
    /// <c>RestoreApplied</c> or <c>ApplyAndAck</c> is also logged as "ConfigDb unreachable" and lands
    /// in Stale — confirm that is intended.
    /// </remarks>
    private void Bootstrap()
    {
        // Read the most-recent NodeDeploymentState for this node; if it's Applied, jump
        // to Steady with that revision. If Applying (orphan from a crash), discard and replay.
        // If the DB is unreachable, fall back to Stale and start the reconnect loop.
        try
        {
            using var db = _dbFactory.CreateDbContext();
            // Only the three columns needed below are projected; newest row first.
            var latest = db.NodeDeploymentStates
                .Where(s => s.NodeId == _localNode.Value)
                .OrderByDescending(s => s.StartedAtUtc)
                .Select(s => new { s.DeploymentId, s.Status, s.StartedAtUtc })
                .FirstOrDefault();
            if (latest is null)
            {
                _log.Info("DriverHost {Node}: no prior deployments; entering Steady (no revision)", _localNode);
                Become(Steady);
                return;
            }
            var deployment = db.Deployments
                .AsNoTracking()
                .FirstOrDefault(d => d.DeploymentId == latest.DeploymentId);
            // A state row whose deployment row is missing yields a null revision.
            var revision = deployment is null ? (RevisionHash?)null : RevisionHash.Parse(deployment.RevisionHash);
            switch (latest.Status)
            {
                case NodeDeploymentStatus.Applied:
                    _currentRevision = revision;
                    _log.Info("DriverHost {Node}: recovered Applied state at rev {Rev}", _localNode, revision);
                    Become(Steady);
                    // The revision is recovered but the in-memory driver children + OPC UA address
                    // space were lost on restart. Re-spawn + re-materialise + re-subscribe from the
                    // applied deployment so a restarted/rebuilt node restores its served state instead
                    // of waiting for a config change (whose identical-config revision would no-op).
                    RestoreApplied(new DeploymentId(latest.DeploymentId));
                    break;
                case NodeDeploymentStatus.Applying:
                    _log.Warning("DriverHost {Node}: found orphan Applying row for deployment {Id}; replaying", _localNode, latest.DeploymentId);
                    if (revision is not null)
                        ApplyAndAck(new DeploymentId(latest.DeploymentId), revision.Value, CorrelationId.NewId());
                    else
                        Become(Steady);
                    break;
                case NodeDeploymentStatus.Failed:
                default:
                    // NOTE(review): _currentRevision is not assigned on this path, so "last known rev"
                    // is whatever the field already held (null on a fresh start) — confirm.
                    _log.Info("DriverHost {Node}: prior deployment {Id} failed; entering Steady at last known rev", _localNode, latest.DeploymentId);
                    Become(Steady);
                    break;
            }
        }
        catch (Exception ex)
        {
            _log.Warning(ex, "DriverHost {Node}: ConfigDb unreachable on bootstrap; entering Stale", _localNode);
            Become(Stale);
        }
    }

    /// <summary>
    /// Steady behaviour: registers the dispatch handler plus the diagnostics, value/alarm forwarding,
    /// driver-control, write/ack routing and redundancy handlers, and swallows the PubSub ack.
    /// </summary>
    private void Steady()
    {
        Receive(HandleDispatchFromSteady);
        Receive(HandleGetDiagnostics);
        Receive(ForwardToMux);
        Receive(ForwardNativeAlarm);
        Receive(HandleRestartDriver);
        Receive(HandleReconnectDriver);
        Receive(HandleRouteNodeWrite);
        Receive(HandleRouteNativeAlarmAck);
        Receive(OnRedundancyStateChanged);
        Receive(_ => { /* PubSub ack */ });
    }

    /// <summary>
    /// Applying behaviour: same handlers as <c>Steady</c> except for dispatches — a duplicate of the
    /// in-flight deployment is ignored, any other dispatch is forwarded back to Self.
    /// </summary>
    /// <remarks>
    /// NOTE(review): <c>Self.Forward</c> re-enqueues immediately, so if this behaviour stays active
    /// across several message turns the deferred dispatch is re-received, re-logged at Info and
    /// re-forwarded on every pass until the state changes. Consider stashing instead — confirm how
    /// long Applying actually lasts (the transition back is outside this excerpt).
    /// </remarks>
    private void Applying()
    {
        Receive(msg =>
        {
            if (_applyingDeploymentId is not null && msg.DeploymentId == _applyingDeploymentId.Value)
            {
                _log.Debug("DriverHost {Node}: duplicate DispatchDeployment for in-flight {Id}; ignoring", _localNode, msg.DeploymentId);
                return;
            }
            _log.Info("DriverHost {Node}: dispatch for {Id} received while still applying {Cur}; deferring", _localNode, msg.DeploymentId, _applyingDeploymentId);
            Self.Forward(msg); // re-deliver after we transition back
        });
        Receive(HandleGetDiagnostics);
        Receive(ForwardToMux);
        Receive(ForwardNativeAlarm);
        Receive(HandleRestartDriver);
        Receive(HandleReconnectDriver);
        Receive(HandleRouteNodeWrite);
        Receive(HandleRouteNativeAlarmAck);
        Receive(OnRedundancyStateChanged);
        Receive(_ => { /* PubSub ack */ });
    }

    /// <summary>
    /// Fans a driver-published value out to the dependency mux (when wired) and to the OPC UA publish
    /// actor at every folder-scoped NodeId mapped to the publishing (DriverInstanceId, FullReference).
    /// </summary>
    /// <param name="msg">The value published by a driver child.</param>
    private void ForwardToMux(DriverInstanceActor.AttributeValuePublished msg)
    {
        // Pass driver-published values to the dependency mux when one is wired (VirtualTag inputs,
        // keyed by FullReference). Without a mux, VirtualTagActor evaluation can't fire — that's the
        // dev/Mac path (no virtual tags registered); production binds the mux via the RuntimeActors
        // extension. KEEP this unchanged — VirtualTag inputs are still keyed by FullReference.
        _dependencyMux?.Tell(msg);
        if (_opcUaPublishActor is null) return;
        // Route the value to the OPC UA sink at the variable's ACTUAL NodeId. Equipment-tag variables
        // are materialised with folder-scoped NodeIds (EquipmentId/FolderPath/Name), while the driver
        // publishes keyed by its wire-ref FullName (FullReference). The _nodeIdByDriverRef map — built
        // each apply from the composition's EquipmentTags — resolves (DriverInstanceId, FullName) to
        // the folder-scoped NodeId(s) the materialiser placed the variable(s) at, so the value lands
        // instead of leaving the variable at BadWaitingForInitialData. One driver ref can back several
        // equipment variables (identical machines sharing a register), hence the fan-out.
        if (_nodeIdByDriverRef.TryGetValue((msg.DriverInstanceId, msg.FullReference), out var nodeIds))
        {
            foreach (var nodeId in nodeIds)
                _opcUaPublishActor.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.AttributeValueUpdate(
                    nodeId, msg.Value, msg.Quality, msg.TimestampUtc));
        }
        else
        {
            _log.Debug("DriverHost {Node}: no equipment-tag NodeId for ({Driver},{Ref}) — value dropped",
                _localNode, msg.DriverInstanceId, msg.FullReference);
        }
    }

    /// <summary>
    /// Routes a native alarm transition (published by a driver child as
    /// <c>DriverInstanceActor.AttributeAlarmPublished</c>) to its materialised Part 9 condition
    /// node(s).
    /// </summary>
/// <remarks>
    /// <para>
    /// The alarm path analogue of <c>ForwardToMux</c>: the transition's ConditionId
    /// (the dotted alarm full-reference, which equals the authored equipment-tag FullName — NOT the bare
    /// SourceNodeId owning object) is resolved by the <c>_alarmNodeIdByDriverRef</c> map —
    /// built each apply from the alarm-bearing EquipmentTags — to the folder-scoped condition NodeId(s)
    /// the materialiser placed the condition(s) at. For each node the <c>NativeAlarmProjector</c>
    /// projects the transition delta into a full snapshot, which is Told to the OPC UA publish actor as
    /// an <c>AlarmStateUpdate</c> — per the original note the SAME message scripted alarms use, so it
    /// routes through WriteAlarmCondition. An unknown ref is Debug-logged and dropped (mirrors the
    /// value drop).
    /// </para>
    /// <para>
    /// Each transition is ALSO published as an <c>AlarmTransitionEvent</c> to the cluster alerts
    /// topic — the single historization + live /alerts fan-out path, exactly as scripted alarms do
    /// (ScriptedAlarmHostActor.OnEngineEmission). The OPC UA condition write stays UNGATED (a Secondary
    /// keeps its address space warm for failover), but the cluster-wide alerts publish is Primary-gated
    /// on the same redundancy signal the inbound-write gate uses — only the Primary publishes the single
    /// fleet-wide copy.
    /// </para>
    /// </remarks>
    /// <param name="msg">The alarm transition published by a driver child.</param>
    private void ForwardNativeAlarm(DriverInstanceActor.AttributeAlarmPublished msg)
    {
        if (_opcUaPublishActor is null)
        {
            return;
        }

        // Resolve on ConditionId, NOT SourceNodeId: for Galaxy the dotted alarm full-reference — which
        // equals the authored equipment-tag FullName the map is keyed by — is carried in ConditionId
        // (AlarmFullReference), while SourceNodeId is the bare owning object (SourceObjectReference).
        if (!_alarmNodeIdByDriverRef.TryGetValue((msg.DriverInstanceId, msg.Args.ConditionId), out var conditionNodeIds))
        {
            _log.Debug("DriverHost {Node}: no alarm condition for ({Driver},{Ref}) — transition dropped",
                _localNode, msg.DriverInstanceId, msg.Args.ConditionId);
            return;
        }

        // Warm-standby dedup: only the Primary publishes the cluster-wide alerts transition.
        // Default-emit until told we are Secondary/Detached so single-node deploys + the boot window
        // never drop transitions — the SAME signal HandleRouteNodeWrite gates writes on. The role
        // cannot change while this one message is being handled, so it is read once up front.
        var alertsFanOutSuppressed = _localRole is RedundancyRole.Secondary or RedundancyRole.Detached;

        foreach (var conditionNodeId in conditionNodeIds)
        {
            // UNGATED: every node (including a secondary) keeps its condition nodes current.
            var snapshot = _nativeAlarmProjector.Project(conditionNodeId, msg.Args);
            _opcUaPublishActor.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.AlarmStateUpdate(
                conditionNodeId, snapshot, msg.Args.SourceTimestampUtc));

            if (alertsFanOutSuppressed)
            {
                continue;
            }

            // Fall back to NodeId-derived metadata when the apply pass recorded none for this condition.
            if (!_alarmMetaByNodeId.TryGetValue(conditionNodeId, out var meta))
            {
                meta = (EquipmentId: conditionNodeId, Name: conditionNodeId, AlarmType: "AlarmCondition", HistorizeToAveva: (bool?)null);
            }

            var transition = new AlarmTransitionEvent(
                AlarmId: conditionNodeId,
                EquipmentPath: meta.EquipmentId,
                AlarmName: meta.Name,
                TransitionKind: ToEventKind(msg.Args.Kind),
                // The projector mapped the four-bucket AlarmSeverity onto the OPC UA 1..1000 scale
                // already; reuse its value so the condition node + the alerts row agree on severity.
                Severity: snapshot.Severity,
                Message: msg.Args.Message,
                // Native transitions are device-driven; an operator comment only rides along on an
                // acknowledge from the upstream alarm system. "device" marks a non-operator origin.
                // NOTE(review): "device" is emitted only when a comment IS present and the empty
                // string otherwise — confirm this matches the intent described above.
                User: msg.Args.OperatorComment is not null ? "device" : string.Empty,
                TimestampUtc: msg.Args.SourceTimestampUtc,
                AlarmTypeName: meta.AlarmType,
                Comment: msg.Args.OperatorComment,
                // Per-condition opt-out parsed from TagConfig.alarm.historizeToAveva (bool?, null ⇒
                // absent ⇒ historize). The HistorianAdapterActor gate (historizeToAveva is not false)
                // historizes null + true and suppresses the durable AVEVA row only on an explicit
                // false — the same posture as the scripted-alarm opt-out. null rides through unchanged.
                HistorizeToAveva: meta.HistorizeToAveva);

            _mediator.Tell(new Publish(ScriptedAlarmHostActor.AlertsTopic, transition));
        }
    }

    /// <summary>
    /// Maps a native <c>AlarmTransitionKind</c> onto the canonical alarm event-kind vocabulary
    /// scripted alarms emit (the EmissionKind names) so a native row renders with the correct chip on
    /// the /alerts page and historizes into the same EventKind column as scripted alarms. An
    /// unmapped/unknown transition surfaces as Activated (visible) rather than a grey unknown label.
    /// </summary>
    /// <param name="kind">The native transition kind.</param>
    /// <returns>"Activated", "Cleared" or "Acknowledged".</returns>
    private static string ToEventKind(AlarmTransitionKind kind)
    {
        switch (kind)
        {
            case AlarmTransitionKind.Clear:
                return "Cleared";
            case AlarmTransitionKind.Acknowledge:
                return "Acknowledged";
            case AlarmTransitionKind.Raise:
            case AlarmTransitionKind.Retrigger:
            default:
                return "Activated";
        }
    }

    /// <summary>
    /// Routes an inbound operator write (Task 11 Asks this from the OPC UA node-manager side) to the
    /// owning driver child. Order matters:
    /// <list type="number">
    /// <item><description>PRIMARY gate FIRST — reuses the same redundancy signal the
    /// scripted-alarm emit gate uses. Only the Primary services writes (default-allow while the
    /// role is unknown, so single-node deploys + the boot window never reject). A Secondary/Detached
    /// node keeps its address space + driver state warm for failover but must NOT push writes to the
    /// shared field device.</description></item>
    /// <item><description>Resolve the reverse map <c>_driverRefByNodeId</c> to the owning
    /// (DriverInstanceId, FullName).</description></item>
    /// <item><description>Resolve the running driver child.</description></item>
    /// <item><description>Ask the child a bounded <c>DriverInstanceActor.WriteAttribute</c> of the
    /// driver-side FullName and pipe the translated result back to the asker.</description></item>
    /// </list>
    /// Every branch replies the asker a <see cref="NodeWriteResult"/> exactly once.
    /// </summary>
///
    private void HandleRouteNodeWrite(RouteNodeWrite msg)
    {
        // Read Sender once, up front and on the actor thread. The Ask continuation at the bottom
        // runs off the actor thread, where reading the raw Sender is unsafe.
        var asker = Sender;

        // Primary gate comes first: a Secondary or Detached node refuses the write. Any other
        // (including not-yet-known) role falls through, so single-node deploys and the boot window
        // are not blocked. This is the same signal the alarm-emit gate uses.
        if (_localRole is RedundancyRole.Secondary or RedundancyRole.Detached)
        {
            asker.Tell(new NodeWriteResult(false, "not primary"));
            return;
        }

        // Map the written NodeId back to the (DriverInstanceId, FullName) that backs it.
        if (!_driverRefByNodeId.TryGetValue(msg.NodeId, out var route))
        {
            asker.Tell(new NodeWriteResult(false, $"no driver mapping for node {msg.NodeId}"));
            return;
        }

        // The owning driver must currently have a child actor.
        if (!_children.TryGetValue(route.DriverInstanceId, out var child))
        {
            asker.Tell(new NodeWriteResult(false, "driver not running"));
            return;
        }

        // Bounded Ask: per the original note, DriverInstanceActor.HandleWriteAsync already bounds
        // the backend call to 5s, so the 8s limit here is only a safety net for a child that
        // never replies.
        var pendingWrite = child.Actor.Ask(
            new DriverInstanceActor.WriteAttribute(route.FullName, msg.Value!),
            TimeSpan.FromSeconds(8));

        // Translate the child's answer; anything other than a successfully completed task is
        // reported to the asker as a timeout.
        var translated = pendingWrite.ContinueWith(
            t =>
            {
                if (!t.IsCompletedSuccessfully)
                {
                    return new NodeWriteResult(false, "write timeout");
                }

                return new NodeWriteResult(t.Result.Success, t.Result.Reason);
            },
            CancellationToken.None,
            TaskContinuationOptions.None,
            TaskScheduler.Default);

        translated.PipeTo(asker);
    }

    ///
    /// Routes an inbound native-condition acknowledge (the host Tells this from the OPC UA node-manager
    /// side when a client Acknowledges a NATIVE Part 9 condition) to the owning driver child. Mirrors
    /// HandleRouteNodeWrite's order + gating, but the ack is fire-and-forget (no reply):
    /// the OPC UA Part 9 acknowledge already committed the local condition state and the driver's
    /// AcknowledgeAsync returns no per-condition status.
    ///
    /// PRIMARY gate FIRST — reuses the same redundancy signal as the
    /// inbound-write gate. Only the Primary pushes the ack to the shared upstream alarm system
    /// (default-allow while the role is unknown).
/// A Secondary/Detached node keeps its condition
    /// state warm for failover but must NOT push the command. Drop (debug-log) on non-primary.
    /// Resolve the inverse map (_driverRefByAlarmNodeId) to the owning
    /// (DriverInstanceId, FullName = ConditionId); an unmapped node is debug-logged + dropped
    /// (no throw).
    /// Resolve the running driver child and Tell it a DriverInstanceActor.RouteAlarmAck
    /// carrying the wire-ref FullName + principal.
    ///
    ///
    private void HandleRouteNativeAlarmAck(RouteNativeAlarmAck msg)
    {
        var conditionNode = msg.ConditionNodeId;

        // Primary gate comes first: a Secondary or Detached node drops the ack. Any other
        // (including not-yet-known) role falls through, matching the inbound-write and alarm-emit
        // gates, so single-node deploys and the boot window are not blocked.
        if (_localRole is RedundancyRole.Secondary or RedundancyRole.Detached)
        {
            _log.Debug("DriverHost {Node}: dropping native-alarm ack for {Cond} — not primary",
                _localNode, conditionNode);
            return;
        }

        // Condition NodeId -> (DriverInstanceId, FullName); an unmapped node is logged and dropped.
        if (!_driverRefByAlarmNodeId.TryGetValue(conditionNode, out var route))
        {
            _log.Debug("DriverHost {Node}: no driver mapping for alarm condition node {Cond} — ack dropped",
                _localNode, conditionNode);
            return;
        }

        // The owning driver must currently have a child actor.
        if (!_children.TryGetValue(route.DriverInstanceId, out var child))
        {
            _log.Debug("DriverHost {Node}: driver {Driver} not running for alarm ack of {Cond} — dropped",
                _localNode, route.DriverInstanceId, conditionNode);
            return;
        }

        // No reply is sent to anyone: the OPC UA Part 9 ack already committed the local condition
        // state, and (per the original note) the driver's AcknowledgeAsync surfaces no
        // per-condition status. The driver correlates on ConditionId, which is the FullName the
        // inverse map was keyed on.
        var ack = new DriverInstanceActor.RouteAlarmAck(route.FullName, msg.Comment, msg.OperatorUser);
        child.Actor.Tell(ack);
    }

    /// Caches this node's redundancy role from a cluster redundancy snapshot so
    /// HandleRouteNodeWrite can gate inbound writes to the Primary. A snapshot that doesn't
    /// mention this node leaves the cached role unchanged ⇒ default-allow.
/// Mirrors
    /// ScriptedAlarmHostActor.OnRedundancyStateChanged / OpcUaPublishActor.
    private void OnRedundancyStateChanged(RedundancyStateChanged msg)
    {
        // Find this node's own entry in the snapshot; when there is none, _localRole is left as-is.
        var local = msg.Nodes.FirstOrDefault(n => n.NodeId == _localNode);
        if (local is not null)
        {
            _localRole = local.Role;
        }
    }

    /// <summary>
    /// Installs the message handlers for the Stale behaviour and starts the periodic "retry-db"
    /// timer that delivers <c>RetryConfigDbConnection.Instance</c> every
    /// <see cref="ReconnectInterval"/>.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the generic message-type arguments of the <c>Receive</c> registrations are not
    /// visible in this view; the per-handler notes below are taken from the handler bodies and the
    /// existing comments. Registration order is left untouched because it may be significant.
    /// </remarks>
    private void Stale()
    {
        // Deployment dispatches are not applied while Stale; they are only logged.
        Receive(_ =>
        {
            _log.Warning("DriverHost {Node}: ignoring DispatchDeployment while Stale (DB unreachable)", _localNode);
        });
        Receive(HandleGetDiagnostics);
        // Presumably the RetryConfigDbConnection timer tick (see the timer started below) — TODO confirm.
        Receive(_ => TryRecoverFromStale());
        Receive(HandleRestartDriver);
        Receive(HandleReconnectDriver);
        Receive(OnRedundancyStateChanged);
        // An inbound operator write can't be serviced while the config DB is unreachable — fast-fail so the
        // node-manager's bounded Ask gets an immediate clear status instead of dead-lettering into a timeout.
        Receive(_ => Sender.Tell(new NodeWriteResult(false, "driver host stale (config DB unreachable)")));
        // An inbound native-condition ack can't be serviced while Stale either (the alarm inverse map is
        // empty until an apply runs). The ack is fire-and-forget (no reply), so just drop it with a log —
        // the local OPC UA condition state already committed on the Part 9 ack.
        Receive(msg => _log.Debug("DriverHost {Node}: dropping native-alarm ack for {Node2} while Stale (config DB unreachable)", _localNode, msg.ConditionNodeId));
        Receive(_ => { /* PubSub ack */ });
        Timers.StartPeriodicTimer("retry-db", RetryConfigDbConnection.Instance, ReconnectInterval);
    }

    /// <summary>
    /// Replies to the sender with a <c>NodeDiagnosticsSnapshot</c> holding this node's id, the
    /// current revision and one <c>DriverInstanceDiagnostics</c> row per tracked child.
    /// </summary>
    /// <remarks>
    /// Several row fields are fixed values rather than live data: <c>DriverInstanceId</c> is always
    /// <c>Guid.Empty</c> (the child's dictionary key is reported as <c>Name</c>), the device counts
    /// are always 0, and <c>LastChangeUtc</c> is the time of this call. <c>State</c> is "Stubbed"
    /// or "Spawned" according to the child entry's <c>Stubbed</c> flag.
    /// </remarks>
    /// <param name="msg">The diagnostics request; its contents are not read.</param>
    private void HandleGetDiagnostics(GetDiagnostics msg)
    {
        var drivers = _children
            .Select(kv => new DriverInstanceDiagnostics(
                DriverInstanceId: Guid.Empty,
                Name: kv.Key,
                State: kv.Value.Stubbed ? "Stubbed" : "Spawned",
                ConnectedDevices: 0,
                FaultedDevices: 0,
                LastChangeUtc: DateTime.UtcNow))
            .ToArray();
        var snapshot = new NodeDiagnosticsSnapshot(
            NodeId: _localNode,
            CurrentRevision: _currentRevision,
            Drivers: drivers,
            AsOfUtc: DateTime.UtcNow);
        Sender.Tell(snapshot);
    }

    /// <summary>
    /// Handles a deployment dispatch received in the Steady behaviour. A dispatch whose revision
    /// equals the current one is acknowledged as Applied without re-applying; any other revision
    /// (or no current revision at all) goes through <see cref="ApplyAndAck"/>.
    /// </summary>
    /// <param name="msg">The dispatched deployment (id, revision hash, correlation id).</param>
    private void HandleDispatchFromSteady(DispatchDeployment msg)
    {
        if (_currentRevision is { } cur && cur == msg.RevisionHash)
        {
            // Idempotent — already at this rev. Ack and stay Steady.
            _log.Debug("DriverHost {Node}: dispatch {Id} matches current rev {Rev}; immediate ACK",
                _localNode, msg.DeploymentId, msg.RevisionHash);
            SendAck(msg.DeploymentId, ApplyAckOutcome.Applied, failureReason: null, msg.CorrelationId);
            return;
        }
        ApplyAndAck(msg.DeploymentId, msg.RevisionHash, msg.CorrelationId);
    }

    /// <summary>
    /// Applies a deployment synchronously and acknowledges the outcome. Switches to the Applying
    /// behaviour, records an Applying row, reconciles the driver children, and on success stores
    /// the new revision, records Applied, sends the Applied ack, requests an address-space rebuild
    /// and pushes the desired subscriptions. On any exception it records Failed and sends a Failed
    /// ack carrying the exception message. In all cases it records the apply duration, clears
    /// <c>_applyingDeploymentId</c> and returns to Steady.
    /// </summary>
    /// <param name="deploymentId">The deployment being applied.</param>
    /// <param name="revision">The revision hash stored as current on success.</param>
    /// <param name="correlation">Correlation id forwarded on the ack, the rebuild request and the span.</param>
    private void ApplyAndAck(DeploymentId deploymentId, RevisionHash revision, CorrelationId correlation)
    {
        _applyingDeploymentId = deploymentId;
        Become(Applying);

        // Telemetry span for this apply; the null-conditional calls tolerate a null span.
        using var span = OtOpcUaTelemetry.StartDeployApplySpan(deploymentId.ToString());
        span?.SetTag("otopcua.node_id", _localNode.ToString());
        span?.SetTag("otopcua.revision", revision.ToString());
        span?.SetTag("otopcua.correlation_id", correlation.ToString());
        var sw = Stopwatch.StartNew();

        // Persist Applying row (idempotent on PK).
        UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Applying, failureReason: null);

        try
        {
            ReconcileDrivers(deploymentId);
            _currentRevision = revision;
            UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Applied, failureReason: null);
            SendAck(deploymentId, ApplyAckOutcome.Applied, failureReason: null, correlation);
            // Trigger the OPC UA address-space rebuild so the local SDK reflects the new
            // composition. The publish actor handles the load-compose-diff-apply pipeline; we
            // just forward the same correlation id so the audit trail joins up.
_opcUaPublishActor?.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.RebuildAddressSpace(correlation, deploymentId));
            // SubscribeBulk pass: hand each driver its desired tag references so live values flow into
            // the just-rebuilt address space instead of staying BadWaitingForInitialData.
            PushDesiredSubscriptions(deploymentId);
            OtOpcUaTelemetry.DeploymentApplied.Add(1, new KeyValuePair("outcome", "ack"));
            _log.Info("DriverHost {Node}: applied deployment {Id} (rev {Rev}, children={Count})",
                _localNode, deploymentId, revision, _children.Count);
        }
        catch (Exception ex)
        {
            // Failure path: _currentRevision is left as it was before this apply (it is only
            // assigned after ReconcileDrivers returns) unless the throw happened later in the try.
            UpsertNodeDeploymentState(deploymentId, NodeDeploymentStatus.Failed, ex.Message);
            SendAck(deploymentId, ApplyAckOutcome.Failed, ex.Message, correlation);
            OtOpcUaTelemetry.DeploymentApplied.Add(1, new KeyValuePair("outcome", "reject"));
            span?.SetStatus(ActivityStatusCode.Error, ex.Message);
            _log.Error(ex, "DriverHost {Node}: apply of {Id} failed", _localNode, deploymentId);
        }
        finally
        {
            // Runs on both outcomes: record the duration and go back to Steady.
            OtOpcUaTelemetry.DeploymentApplyDurationSec.Record(sw.Elapsed.TotalSeconds);
            _applyingDeploymentId = null;
            Become(Steady);
        }
    }

    /// <summary>
    /// Read the deployment artifact + reconcile the set of running driver
    /// children. Spawn missing, ApplyDelta on config change, stop removed/disabled drivers.
    /// When the artifact blob is empty (legacy ControlPlane tests, smoke fixtures) or the
    /// configured driver factory can't materialise any of the requested
    /// types, this is effectively a no-op.
    /// </summary>
    /// <remarks>
    /// A failure to load the artifact is logged as a warning and swallowed: the method returns
    /// without reconciling and without throwing, so the caller carries on as if reconcile succeeded.
    /// </remarks>
    /// <param name="deploymentId">The deployment whose artifact blob is read.</param>
    private void ReconcileDrivers(DeploymentId deploymentId)
    {
        byte[] blob;
        try
        {
            using var db = _dbFactory.CreateDbContext();
            // A missing row or a null blob both fall back to an empty array.
            blob = db.Deployments.AsNoTracking()
                .Where(d => d.DeploymentId == deploymentId.Value)
                .Select(d => d.ArtifactBlob)
                .FirstOrDefault() ?? Array.Empty();
        }
        catch (Exception ex)
        {
            _log.Warning(ex, "DriverHost {Node}: failed to load artifact for {Id}; skipping reconcile",
                _localNode, deploymentId);
            return;
        }

        var specs = DeploymentArtifact.ParseDriverInstances(blob, _localNode.Value);

        // Snapshot what is running now (driver type + last config JSON) for the planner to diff.
        var snapshots = _children.ToDictionary(
            kv => kv.Key,
            kv => new DriverChildSnapshot(kv.Value.DriverType, kv.Value.LastConfigJson),
            StringComparer.Ordinal);

        var plan = DriverSpawnPlanner.Compute(snapshots, specs);

        // Applied in this order: stops, then deltas, then spawns.
        foreach (var id in plan.ToStop) StopChild(id);
        foreach (var spec in plan.ToApplyDelta) ApplyChildDelta(spec);
        foreach (var spec in plan.ToSpawn) SpawnChild(spec);
    }

    /// <summary>
    /// Restore the served state for an already-applied deployment after a process restart.
    /// Per the original note, bootstrap recovers the current revision from NodeDeploymentState
    /// (that code is outside this view), but the driver children and OPC UA address space are
    /// in-memory and gone after a restart —
    /// so without this a restarted node serves an empty address space until the next config
    /// change (and an identical-config redeploy no-ops on the unchanged revision). Re-spawns
    /// drivers, rebuilds the address space from the applied artifact, and re-pushes SubscribeBulk.
    /// No re-ack: the deployment is already Applied.
    /// </summary>
    /// <remarks>Any exception is logged as a warning and swallowed.</remarks>
    /// <param name="deploymentId">The already-applied deployment to restore.</param>
    private void RestoreApplied(DeploymentId deploymentId)
    {
        // A new correlation id is generated for the rebuild request.
        var correlation = CorrelationId.NewId();
        try
        {
            ReconcileDrivers(deploymentId);
            _opcUaPublishActor?.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.RebuildAddressSpace(correlation, deploymentId));
            PushDesiredSubscriptions(deploymentId);
            _log.Info("DriverHost {Node}: restored served state for applied deployment {Id} on bootstrap",
                _localNode, deploymentId);
        }
        catch (Exception ex)
        {
            _log.Warning(ex, "DriverHost {Node}: failed to restore served state for {Id} on bootstrap",
                _localNode, deploymentId);
        }
    }

    ///
    /// SubscribeBulk pass. After an apply, read the deployment's Equipment-namespace tags,
    /// group their driver-side FullName references by driver instance, and hand each running driver
    /// child its desired subscription set via DriverInstanceActor.SetDesiredSubscriptions.
/// The child retains the set and (re)subscribes on every Connected entry, so values stream into
    /// the OPC UA sink and resume after reconnects. Drivers with no configured tags get an empty set
    /// (which clears any stale subscription from a previous deployment).
    /// This method also rebuilds the value and alarm routing maps, clears the native alarm
    /// projector, and forwards the Equipment VirtualTag and ScriptedAlarm plans to their hosts.
    /// A failure to load or parse the artifact is logged as a warning and the method returns
    /// before touching any map.
    ///
    private void PushDesiredSubscriptions(DeploymentId deploymentId)
    {
        byte[] blob;
        try
        {
            using var db = _dbFactory.CreateDbContext();
            // A missing row or a null blob both fall back to an empty array.
            blob = db.Deployments.AsNoTracking()
                .Where(d => d.DeploymentId == deploymentId.Value)
                .Select(d => d.ArtifactBlob)
                .FirstOrDefault() ?? Array.Empty();
        }
        catch (Exception ex)
        {
            _log.Warning(ex, "DriverHost {Node}: failed to load artifact for SubscribeBulk ({Id})",
                _localNode, deploymentId);
            return;
        }

        Phase7CompositionResult composition;
        try
        {
            composition = DeploymentArtifact.ParseComposition(blob, _localNode.Value);
        }
        catch (Exception ex)
        {
            _log.Warning(ex, "DriverHost {Node}: failed to parse composition for SubscribeBulk ({Id})",
                _localNode, deploymentId);
            return;
        }

        // Value-subscription set: alarm-bearing tags are Part 9 conditions, not value variables, so they
        // are excluded — the driver must not value-subscribe an alarm attribute (it is fed via the native
        // alarm event stream, routed by ForwardNativeAlarm).
        var refsByDriver = composition.EquipmentTags
            .Where(t => t.Alarm is null)
            .GroupBy(t => t.DriverInstanceId, StringComparer.Ordinal)
            .ToDictionary(
                g => g.Key,
                g => (IReadOnlyList)g.Select(t => t.FullName)
                    .Distinct(StringComparer.Ordinal)
                    .ToArray(),
                StringComparer.Ordinal);

        // Native-alarm subscription set: the alarm-bearing tags' FullNames (= the driver's
        // ConditionId/AlarmFullReference). An IAlarmSource driver suppresses OnAlarmEvent until at least one
        // alarm subscription exists (e.g. GalaxyDriver gates its central feed on _alarmSubscriptions), so the
        // instance actor must SubscribeAlarmsAsync these refs to un-gate the feed. Routing stays by
        // ConditionId in ForwardNativeAlarm; this set just opens (and scopes) the subscription.
        var alarmRefsByDriver = composition.EquipmentTags
            .Where(t => t.Alarm is not null)
            .GroupBy(t => t.DriverInstanceId, StringComparer.Ordinal)
            .ToDictionary(
                g => g.Key,
                g => (IReadOnlyList)g.Select(t => t.FullName)
                    .Distinct(StringComparer.Ordinal)
                    .ToArray(),
                StringComparer.Ordinal);

        // Rebuild the driver live-value routing map from the SAME EquipmentTags pass (mirrors
        // VirtualTagHostActor._nodeIdByVtag): map each tag's (DriverInstanceId, FullName) wire-ref to
        // the folder-scoped equipment NodeId the materialiser placed its variable at, so ForwardToMux
        // can land driver values on the right node. Clear-and-repopulate every apply so renames
        // (Name/FolderPath/EquipmentId changes) and removals are reflected.
        _nodeIdByDriverRef.Clear();
        // Inverse map for the inbound operator-write path (NodeId → (DriverInstanceId, FullName)): an
        // operator writes to the variable's folder-scoped NodeId, but the driver writes by its wire-ref
        // FullName. Cleared + repopulated from the SAME EquipmentTags pass so renames/removals are
        // reflected. Each NodeId maps to exactly one driver ref (a variable is backed by a single driver
        // attribute), so last-writer-wins on the rare duplicate is harmless.
        _driverRefByNodeId.Clear();
        // Alarm condition routing map: (DriverInstanceId, FullName = alarm ConditionId/AlarmFullReference) → folder-scoped
        // condition NodeId(s). Built from the SAME EquipmentTags pass (alarm-bearing tags only) so
        // ForwardNativeAlarm can land a native transition on the right condition node. Clear-and-rebuild
        // every apply; the projector is Clear()'d too so stale per-condition state never leaks across
        // redeploys (renames/removals/address-space rebuilds).
        _alarmNodeIdByDriverRef.Clear();
        // Inverse alarm map for the inbound native-condition ack path (condition NodeId → (DriverInstanceId,
        // FullName)): an OPC UA client acknowledges the condition's folder-scoped NodeId, but the driver
        // acknowledges by its wire-ref FullName (= ConditionId). Cleared + repopulated from the SAME
        // alarm-bearing EquipmentTags pass so renames/removals are reflected. Each condition NodeId maps to
        // exactly one driver ref (a condition is backed by a single driver alarm), so last-writer-wins on the
        // rare duplicate is harmless.
        _driverRefByAlarmNodeId.Clear();
        // Per-condition metadata (EquipmentId / Name / OPC UA alarm type) for the alerts fan-out, built in
        // the SAME alarm branch as the node map so a redeploy can't leave it out of sync. Cleared alongside it.
        _alarmMetaByNodeId.Clear();
        _nativeAlarmProjector.Clear();
        foreach (var t in composition.EquipmentTags)
        {
            var key = (t.DriverInstanceId, t.FullName);
            // Alarm and value tags both derive their NodeId through EquipmentNodeIds.Variable.
            var nodeId = EquipmentNodeIds.Variable(t.EquipmentId, t.FolderPath, t.Name);
            if (t.Alarm is not null)
            {
                // Alarm tags are conditions, not value variables: route them ONLY into the alarm map and
                // keep them OUT of the value maps + value-subscription set (so they don't get both a value
                // variable AND a condition).
                if (!_alarmNodeIdByDriverRef.TryGetValue(key, out var aset))
                    _alarmNodeIdByDriverRef[key] = aset = new HashSet(StringComparer.Ordinal);
                aset.Add(nodeId);
                // Inverse 1:1 map for the inbound native-condition ack path: the materialised condition
                // NodeId routes back to the owning (DriverInstanceId, FullName=ConditionId) so an OPC UA
                // acknowledge of this condition reaches the right driver child.
                _driverRefByAlarmNodeId[nodeId] = key;
                // Capture the per-condition metadata the alerts fan-out (ForwardNativeAlarm) needs to build
                // the AlarmTransitionEvent: the equipment path, the operator-visible alarm name, and the
                // OPC UA Part 9 subtype. Keyed by the condition NodeId (the projection's own key).
                _alarmMetaByNodeId[nodeId] = (t.EquipmentId, t.Name, t.Alarm.AlarmType, t.Alarm.HistorizeToAveva);
                continue;
            }
            // Value tag: forward map (driver ref → NodeIds) plus inverse map (NodeId → driver ref).
            if (!_nodeIdByDriverRef.TryGetValue(key, out var set))
                _nodeIdByDriverRef[key] = set = new HashSet(StringComparer.Ordinal);
            set.Add(nodeId);
            _driverRefByNodeId[nodeId] = key;
        }

        // Every tracked child gets a message, including those with no tags (empty sets).
        var total = 0;
        foreach (var (driverId, entry) in _children)
        {
            var refs = refsByDriver.TryGetValue(driverId, out var r) ? r : Array.Empty();
            var alarmRefs = alarmRefsByDriver.TryGetValue(driverId, out var ar) ? ar : Array.Empty();
            entry.Actor.Tell(new DriverInstanceActor.SetDesiredSubscriptions(refs, SubscriptionPublishingInterval, alarmRefs));
            // Only value references are counted; alarm references are not part of the total.
            total += refs.Count;
        }
        if (total > 0)
        {
            // NOTE(review): {Drivers} is refsByDriver.Count, i.e. drivers with value tags in the
            // composition, which need not equal the number of children that were told above.
            _log.Info("DriverHost {Node}: SubscribeBulk pushed {Refs} references across {Drivers} driver(s)",
                _localNode, total, refsByDriver.Count);
        }

        // Hand the Equipment-namespace VirtualTags to the host so it spawns/reconciles a
        // VirtualTagActor per plan and streams their evaluated values back onto the just-rebuilt
        // address space. Runs on BOTH the fresh-apply path (ApplyAndAck) and the bootstrap-restore
        // path (RestoreApplied) because both call this method, so one send covers both.
        // NOTE: the Stale-recovery path (TryRecoverFromStale) does NOT call PushDesiredSubscriptions,
        // so — like drivers — VirtualTags remain empty after a Stale recovery until the next
        // deployment dispatch. This is intentional and consistent with driver recovery: the Stale
        // path only restores the revision marker + NodeDeploymentState; a subsequent dispatch
        // (or a redeploy from AdminUI) triggers the full apply + subscribe pass.
        _virtualTagHost?.Tell(new VirtualTagHostActor.ApplyVirtualTags(composition.EquipmentVirtualTags));
        if (composition.EquipmentVirtualTags.Count > 0)
        {
            _log.Info("DriverHost {Node}: applied {Count} Equipment VirtualTag(s) to the VirtualTag host",
                _localNode, composition.EquipmentVirtualTags.Count);
        }

        // Same pass for Equipment-namespace ScriptedAlarms: hand the plans to the host so it
        // (re)loads its engine + re-registers mux interest for the union of dependency refs. Covers
        // both the fresh-apply and bootstrap-restore paths (both call this method); the Stale-recovery
        // path deliberately does not, matching driver + VirtualTag recovery.
        _scriptedAlarmHost?.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(composition.EquipmentScriptedAlarms));
        if (composition.EquipmentScriptedAlarms.Count > 0)
        {
            _log.Info("DriverHost {Node}: applied {Count} Equipment ScriptedAlarm(s) to the ScriptedAlarm host",
                _localNode, composition.EquipmentScriptedAlarms.Count);
        }
    }

    /// <summary>
    /// Creates the child actor for one driver instance and records it in <c>_children</c>.
    /// A real driver is requested from the driver factory unless
    /// <c>DriverInstanceActor.ShouldStub</c> says to stub; if the factory throws or returns null
    /// the child is created around a <c>StubbedDriver</c> instead. Only a non-stub child is sent
    /// <c>InitializeRequested</c>.
    /// </summary>
    /// <param name="spec">The driver instance to spawn (type, id, config, optional cluster id).</param>
    private void SpawnChild(DriverInstanceSpec spec)
    {
        var stub = DriverInstanceActor.ShouldStub(spec.DriverType, _localRoles);
        IDriver? driver = null;

        if (!stub)
        {
            try
            {
                driver = _driverFactory.TryCreate(spec.DriverType, spec.DriverInstanceId, spec.DriverConfig);
            }
            catch (Exception ex)
            {
                _log.Warning(ex, "DriverHost {Node}: factory for {Type} threw on {Id}; stubbing",
                    _localNode, spec.DriverType, spec.DriverInstanceId);
            }

            // Reached with driver == null both when the factory returned null and when it threw,
            // so a throwing factory produces this second warning as well.
            if (driver is null)
            {
                _log.Warning(
                    "DriverHost {Node}: no factory for driver type {Type} (instance {Id}); falling back to stub",
                    _localNode, spec.DriverType, spec.DriverInstanceId);
                stub = true;
            }
        }

        IActorRef child;
        // Prefer the real ClusterId from the deployment artifact; fall back to the local node
        // identity for pre-PR artifacts that don't carry it yet (older deploys persisted before
        // ClusterId was added to DriverInstanceSpec).
        var clusterId = !string.IsNullOrEmpty(spec.ClusterId) ? spec.ClusterId : _localNode.Value;
        // A fresh generation-suffixed name on EVERY spawn so a respawn can never collide with a child
        // still tearing down: Akka frees a stopped actor's name only after it FULLY terminates (async),
        // but HandleRestartDriver stops + respawns within one (synchronous) message handler — the old
        // child is still registered, so reusing the base name throws InvalidActorNameException
        // ("actor name is not unique"). Children are tracked by the _children dict (by IActorRef), never
        // by path, so the suffix is invisible to every caller.
        var actorName = ActorNameFor(spec.DriverInstanceId, _childSpawnGeneration++);
        if (stub)
        {
            child = Context.ActorOf(
                DriverInstanceActor.Props(
                    new StubbedDriver(spec.DriverInstanceId, spec.DriverType),
                    reconnectInterval: null,
                    startStubbed: true,
                    healthPublisher: _healthPublisher,
                    clusterId: clusterId),
                actorName);
        }
        else
        {
            child = Context.ActorOf(
                DriverInstanceActor.Props(
                    driver!,
                    healthPublisher: _healthPublisher,
                    clusterId: clusterId),
                actorName);
            child.Tell(new DriverInstanceActor.InitializeRequested(spec.DriverConfig));
        }

        _children[spec.DriverInstanceId] = new ChildEntry(child, spec, stub);
        _log.Info("DriverHost {Node}: spawned {Type} driver {Id} (stub={Stub})",
            _localNode, spec.DriverType, spec.DriverInstanceId, stub);
    }

    /// <summary>
    /// Sends an <c>ApplyDelta</c> with the new config to an already-tracked child and replaces the
    /// stored spec for that child. Does nothing when the instance id is not tracked.
    /// </summary>
    /// <param name="spec">The updated driver instance spec.</param>
    private void ApplyChildDelta(DriverInstanceSpec spec)
    {
        if (!_children.TryGetValue(spec.DriverInstanceId, out var entry)) return;
        // The delta carries its own freshly generated correlation id.
        entry.Actor.Tell(new DriverInstanceActor.ApplyDelta(spec.DriverConfig, CorrelationId.NewId()));
        // Store the full new spec — a delta can change Name, Enabled, ClusterId, etc. in addition to config.
_children[spec.DriverInstanceId] = entry with { Spec = spec };
        _log.Debug("DriverHost {Node}: ApplyDelta queued for {Id}", _localNode, spec.DriverInstanceId);
    }

    /// <summary>
    /// Stops the child actor tracked under <paramref name="driverInstanceId"/> and removes it from
    /// <c>_children</c>. Does nothing when the id is not tracked.
    /// </summary>
    /// <param name="driverInstanceId">The driver instance whose child should be stopped.</param>
    private void StopChild(string driverInstanceId)
    {
        if (!_children.TryGetValue(driverInstanceId, out var entry)) return;
        Context.Stop(entry.Actor);
        _children.Remove(driverInstanceId);
        _log.Info("DriverHost {Node}: stopped driver child {Id}", _localNode, driverInstanceId);
    }

    /// <summary>
    /// Builds the actor name for a driver child: <c>"drv-"</c>, the instance id with every
    /// character outside <c>[A-Za-z0-9._-]</c> replaced by <c>'_'</c>, then <c>"-g"</c> and the
    /// spawn generation.
    /// </summary>
    /// <remarks>
    /// The allowed set is deliberately ASCII-only. <c>char.IsLetterOrDigit</c> also accepts
    /// non-ASCII letters and digits, which Akka does not accept in an actor name, so an id
    /// containing one would make <c>Context.ActorOf</c> throw instead of being mangled.
    /// </remarks>
    /// <param name="driverInstanceId">The driver instance id to derive the name from.</param>
    /// <param name="generation">Monotonic spawn counter appended as a suffix.</param>
    /// <returns>An actor name that has not been used by an earlier spawn.</returns>
    private static string ActorNameFor(string driverInstanceId, long generation)
    {
        // Akka actor names cannot contain '/' or whitespace. Mangle defensively. The monotonic
        // generation suffix guarantees a never-before-used name on every spawn (see SpawnChild) so a
        // restart's respawn can never collide with the still-terminating predecessor.
        static bool IsAllowed(char c) =>
            c is (>= 'a' and <= 'z') or (>= 'A' and <= 'Z') or (>= '0' and <= '9') or '-' or '_' or '.';

        var chars = driverInstanceId.Select(c => IsAllowed(c) ? c : '_').ToArray();
        return "drv-" + new string(chars) + "-g" + generation;
    }

    /// <summary>
    /// Minimal placeholder driver used when no factory is registered for a driver type or when
    /// <c>DriverInstanceActor.ShouldStub</c> returns true. The child actor
    /// is started with startStubbed:true so the driver methods on this object never run.
    /// </summary>
    private sealed class StubbedDriver : IDriver
    {
        /// <inheritdoc />
        public string DriverInstanceId { get; }

        /// <inheritdoc />
        public string DriverType { get; }

        /// <summary>Initializes a new stubbed driver with the specified ID and type.</summary>
        /// <param name="id">The driver instance identifier.</param>
        /// <param name="type">The driver type name.</param>
        public StubbedDriver(string id, string type)
        {
            DriverInstanceId = id;
            DriverType = type;
        }

        /// <inheritdoc />
        public Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken) => Task.CompletedTask;

        /// <inheritdoc />
        public Task ReinitializeAsync(string driverConfigJson, CancellationToken cancellationToken) => Task.CompletedTask;

        /// <inheritdoc />
        public Task ShutdownAsync(CancellationToken cancellationToken) => Task.CompletedTask;

        /// <inheritdoc />
        public DriverHealth GetHealth() => new(DriverState.Healthy, DateTime.UtcNow, LastError: null);

        /// <inheritdoc />
        public long GetMemoryFootprint() => 0;

        /// <inheritdoc />
        public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask;
    }

    /// <summary>
    /// Restarts a driver child on request: stops the tracked child and spawns a new one from the
    /// spec stored with it. Ignored when this node does not track the requested instance.
    /// </summary>
    /// <remarks>
    /// NOTE(review): this method only stops and respawns; it does not itself send the new child a
    /// <c>SetDesiredSubscriptions</c>. Whether the subscription set is restored elsewhere is not
    /// visible here — confirm.
    /// </remarks>
    /// <param name="msg">The restart request (driver instance id and requesting user name).</param>
    private void HandleRestartDriver(RestartDriver msg)
    {
        // DPS broadcast — only act if this node hosts the requested instance.
        if (!_children.TryGetValue(msg.DriverInstanceId, out var entry)) return;

        _log.Info("DriverHost {Node}: restarting driver {Id} by request of {User}",
            _localNode, msg.DriverInstanceId, msg.ActorByUserName);

        // Stop the existing child actor — DriverInstanceActor.PostStop calls ShutdownAsync.
        Context.Stop(entry.Actor);
        _children.Remove(msg.DriverInstanceId);

        // Respawn from the same spec the last reconcile used — preserves RowId, Name, ClusterId.
        SpawnChild(entry.Spec);
    }

    /// <summary>
    /// Forwards a reconnect request to the owning driver child as a <c>ForceReconnect</c>.
    /// Ignored when this node does not track the requested instance.
    /// </summary>
    /// <param name="msg">The reconnect request (driver instance id and requesting user name).</param>
    private void HandleReconnectDriver(ReconnectDriver msg)
    {
        // DPS broadcast — only act if this node hosts the requested instance.
        if (!_children.TryGetValue(msg.DriverInstanceId, out var entry)) return;

        _log.Info("DriverHost {Node}: reconnecting driver {Id} by request of {User}",
            _localNode, msg.DriverInstanceId, msg.ActorByUserName);

        // Tell the child to drop its transport and re-enter the Reconnecting state.
entry.Actor.Tell(new DriverInstanceActor.ForceReconnect());
    }

    /// <summary>
    /// Attempts to leave the Stale behaviour. Queries the config DB for the most recently sealed
    /// deployment; when the query succeeds, adopts that deployment's revision (if there is one),
    /// records it as Applied for this node, stops the "retry-db" timer and becomes Steady.
    /// </summary>
    /// <remarks>
    /// The timer is cancelled only after everything that can throw has completed. If it were
    /// cancelled first, a failure while parsing the revision hash would land in the catch block
    /// below with no retry scheduled, leaving the actor Stale with nothing to wake it up. Any
    /// exception keeps the actor Stale and the periodic timer running.
    /// </remarks>
    private void TryRecoverFromStale()
    {
        try
        {
            using var db = _dbFactory.CreateDbContext();
            var latestSealed = db.Deployments
                .AsNoTracking()
                .Where(d => d.Status == DeploymentStatus.Sealed)
                .OrderByDescending(d => d.SealedAtUtc)
                .Select(d => new { d.DeploymentId, d.RevisionHash })
                .FirstOrDefault();

            if (latestSealed is not null)
            {
                // Parse before mutating any state so a malformed hash leaves the actor unchanged.
                var recoveredRevision = RevisionHash.Parse(latestSealed.RevisionHash);
                var recoveredDeployment = new DeploymentId(latestSealed.DeploymentId);

                _currentRevision = recoveredRevision;
                UpsertNodeDeploymentState(recoveredDeployment, NodeDeploymentStatus.Applied, failureReason: null);
            }

            // Point of no return: recovery succeeded, so the retry loop can stop.
            _log.Info("DriverHost {Node}: ConfigDb back; recovering from Stale", _localNode);
            Timers.Cancel("retry-db");
            Become(Steady);
        }
        catch (Exception ex)
        {
            _log.Debug(ex, "DriverHost {Node}: still Stale; will retry in {Interval}", _localNode, ReconnectInterval);
        }
    }

    /// <summary>
    /// Inserts or updates this node's <c>NodeDeploymentState</c> row for a deployment.
    /// <c>AppliedAtUtc</c> is stamped with the current UTC time only when the status is Applied;
    /// an existing row's <c>AppliedAtUtc</c> is left untouched for any other status.
    /// </summary>
    /// <remarks>Best effort: any exception is logged as a warning and swallowed.</remarks>
    /// <param name="deploymentId">The deployment the row belongs to.</param>
    /// <param name="status">The status to store.</param>
    /// <param name="failureReason">Failure text to store, or null; it overwrites any previous value.</param>
    private void UpsertNodeDeploymentState(DeploymentId deploymentId, NodeDeploymentStatus status, string? failureReason)
    {
        try
        {
            using var db = _dbFactory.CreateDbContext();
            var existing = db.NodeDeploymentStates.FirstOrDefault(
                x => x.NodeId == _localNode.Value && x.DeploymentId == deploymentId.Value);
            if (existing is null)
            {
                db.NodeDeploymentStates.Add(new NodeDeploymentState
                {
                    NodeId = _localNode.Value,
                    DeploymentId = deploymentId.Value,
                    Status = status,
                    FailureReason = failureReason,
                    AppliedAtUtc = status == NodeDeploymentStatus.Applied ? DateTime.UtcNow : null,
                });
            }
            else
            {
                existing.Status = status;
                existing.FailureReason = failureReason;
                if (status == NodeDeploymentStatus.Applied) existing.AppliedAtUtc = DateTime.UtcNow;
            }
            db.SaveChanges();
        }
        catch (Exception ex)
        {
            _log.Warning(ex, "DriverHost {Node}: failed to upsert NodeDeploymentState for {Id}",
                _localNode, deploymentId);
        }
    }

    /// <summary>
    /// Sends an <c>ApplyAck</c> for a deployment: directly to the coordinator override when one
    /// was supplied, otherwise published on <see cref="DeploymentAcksTopic"/> through the
    /// distributed pub-sub mediator.
    /// </summary>
    /// <param name="deploymentId">The deployment being acknowledged.</param>
    /// <param name="outcome">The apply outcome.</param>
    /// <param name="failureReason">Failure text, or null.</param>
    /// <param name="correlation">Correlation id carried on the ack.</param>
    private void SendAck(DeploymentId deploymentId, ApplyAckOutcome outcome, string? failureReason, CorrelationId correlation)
    {
        var ack = new ApplyAck(deploymentId, _localNode, outcome, failureReason, correlation);
        if (_coordinatorOverride is not null)
        {
            _coordinatorOverride.Tell(ack);
        }
        else
        {
            // No direct coordinator handle — publish on the dedicated ACK topic. The coordinator
            // singleton subscribes there in PreStart so the ACK reaches whichever admin node hosts
            // it without an actor-path lookup.
            DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(DeploymentAcksTopic, ack));
        }
    }
}