using Akka.Actor;
using Akka.Cluster;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event;
using Microsoft.EntityFrameworkCore;
using ZB.MOM.WW.OtOpcUa.Cluster.Redundancy;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
using ZB.MOM.WW.OtOpcUa.Commons.Observability;
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Configuration;
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
using ZB.MOM.WW.OtOpcUa.Runtime.Health;

namespace ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;

/// <summary>
/// Single-threaded bridge between Akka messages and the OPC UA SDK address space. Hosted on
/// the pinned <c>opcua-synchronized-dispatcher</c> (Task 19 HOCON) so the OPC UA SDK sees
/// only one thread per actor instance — its session/subscription locks expect strict
/// single-threaded access.
/// </summary>
/// <remarks>
/// Address-space writes route through <see cref="IOpcUaAddressSpaceSink"/>; ServiceLevel
/// writes route through <see cref="IServiceLevelPublisher"/>. Production binds SDK-backed
/// implementations; dev/Mac/tests bind the Null* defaults so the actor stays decoupled from
/// <c>Opc.Ua.Server</c>. The remaining piece is wiring those bindings to a real
/// <c>StandardServer</c> address space — tracked as F10b.
/// </remarks>
public sealed class OpcUaPublishActor : ReceiveActor
{
    /// <summary>HOCON id of the pinned dispatcher used by <see cref="Props"/>.</summary>
    public const string DispatcherId = "opcua-synchronized-dispatcher";

    /// <summary>DistributedPubSub topic carrying <see cref="RedundancyStateChanged"/> snapshots.</summary>
    public const string RedundancyStateTopic = "redundancy-state";

    /// <summary>Writes a single variable value to the address space through the sink.</summary>
    /// <param name="NodeId">The target variable node id.</param>
    /// <param name="Value">The value to write; may be null.</param>
    /// <param name="Quality">The OPC UA quality to attach to the value.</param>
    /// <param name="TimestampUtc">The source timestamp in UTC.</param>
    public sealed record AttributeValueUpdate(string NodeId, object? Value, OpcUaQuality Quality, DateTime TimestampUtc);

    /// <summary>
    /// Carries the full Part 9 condition state for a scripted alarm to the sink. The snapshot is
    /// the Commons projection the Runtime host maps from the engine's Core
    /// <c>AlarmConditionState</c> + severity/message — the actor stays decoupled from
    /// <c>Core.ScriptedAlarms</c>.
    /// </summary>
    /// <param name="AlarmNodeId">The alarm node id (== ScriptedAlarmId for materialised conditions).</param>
    /// <param name="State">The full condition state to project onto the node.</param>
    /// <param name="TimestampUtc">The source timestamp of the transition in UTC.</param>
    public sealed record AlarmStateUpdate(string AlarmNodeId, AlarmConditionSnapshot State, DateTime TimestampUtc);

    /// <summary>
    /// Triggers an address-space rebuild. <paramref name="DeploymentId"/> is the deployment just
    /// applied by the host; the rebuild loads THAT artifact so materialisation matches the applied
    /// config + the SubscribeBulk pass. It is null only for legacy/dev callers, which fall back to
    /// the latest sealed deployment (lags a not-yet-sealed apply by one revision).
    /// </summary>
    /// <param name="Correlation">Correlation id echoed into the rebuild span and log lines.</param>
    /// <param name="DeploymentId">The deployment whose artifact should be materialised, or null.</param>
    public sealed record RebuildAddressSpace(CorrelationId Correlation, DeploymentId? DeploymentId = null);

    /// <summary>Requests that the given ServiceLevel be published (deduplicated against the last published level).</summary>
    /// <param name="ServiceLevel">The OPC UA ServiceLevel byte.</param>
    public sealed record ServiceLevelChanged(byte ServiceLevel);

    private readonly IOpcUaAddressSpaceSink _sink;
    private readonly IServiceLevelPublisher _serviceLevel;
    private readonly bool _subscribeRedundancyTopic;
    private readonly NodeId? _localNode;
    private readonly IDbContextFactory? _dbFactory;
    private readonly Phase7Applier? _applier;
    private readonly IActorRef? _dbHealthProbe;
    private readonly TimeSpan _staleWindow;
    private readonly TimeSpan _probeFreshnessWindow;
    private readonly Akka.Cluster.Cluster _cluster = Akka.Cluster.Cluster.Get(Context.System);
    private readonly ILoggingAdapter _log = Context.GetLogger();

    // Incremented with Interlocked and read with Volatile so WriteCount is safe to observe from a test thread.
    private int _writes;

    // Last level the publisher ACCEPTED (only updated after a successful Publish).
    private byte _lastServiceLevel;

    // False until the first successful Publish, so the first computed level is never deduped.
    private bool _publishedAtLeastOnce;

    private DbHealthProbeActor.DbHealthStatus? _lastDbHealth;
    private RedundancyStateChanged? _lastSnapshot;

    // A peer's latest OPC UA probe verdict about THIS node, stamped with the local receive time.
    private (bool Ok, DateTime At)? _probeAboutMe;

    // The composition most recently handed to the applier; the planner diffs new compositions against it.
    private Phase7CompositionResult _lastApplied = new(
        Array.Empty(), Array.Empty(), Array.Empty(), Array.Empty(), Array.Empty());

    /// <summary>Gets the number of writes performed.</summary>
    public int WriteCount => Volatile.Read(ref _writes);

    /// <summary>Gets the last service level the publisher accepted.</summary>
    public byte LastServiceLevel => _lastServiceLevel;

    /// <summary>
    /// Production Props — pins the OPC UA dispatcher + subscribes to the redundancy-state DPS topic
    /// so cluster transitions drive the local ServiceLevel publish path. When
    /// <paramref name="dbFactory"/> + <paramref name="applier"/> are supplied,
    /// <see cref="RebuildAddressSpace"/> reads the applied (or, for legacy callers, latest sealed)
    /// deployment artifact + drives the applier through the sink. When
    /// <paramref name="dbHealthProbe"/> is supplied the local ServiceLevel is computed via
    /// <see cref="ServiceLevelCalculator"/> from real DB-health + staleness + role-leader inputs;
    /// otherwise the legacy role-only switch is used.
    /// </summary>
    /// <param name="sink">The OPC UA address space sink.</param>
    /// <param name="serviceLevel">The service level publisher.</param>
    /// <param name="localNode">The local cluster node ID.</param>
    /// <param name="dbFactory">The optional database context factory.</param>
    /// <param name="applier">The optional Phase 7 applier.</param>
    /// <param name="dbHealthProbe">The optional <see cref="DbHealthProbeActor"/> ref; when null the
    /// legacy role-only ServiceLevel seam is always used. When supplied, the legacy seam is used only
    /// until the first <see cref="DbHealthProbeActor.DbHealthStatus"/> arrives.</param>
    /// <param name="staleWindow">The window beyond which a DB-health sample or redundancy snapshot is
    /// considered stale; defaults to 30 seconds.</param>
    /// <param name="probeFreshnessWindow">The window beyond which a peer's OPC UA probe verdict about
    /// this node is considered stale (and thus given the benefit of the doubt rather than demoting);
    /// defaults to 30 seconds.</param>
    /// <returns>Props bound to <see cref="DispatcherId"/>.</returns>
    public static Props Props(
        IOpcUaAddressSpaceSink? sink = null,
        IServiceLevelPublisher? serviceLevel = null,
        NodeId? localNode = null,
        IDbContextFactory? dbFactory = null,
        Phase7Applier? applier = null,
        IActorRef? dbHealthProbe = null,
        TimeSpan? staleWindow = null,
        TimeSpan? probeFreshnessWindow = null) =>
        Akka.Actor.Props.Create(() => new OpcUaPublishActor(
            sink ?? NullOpcUaAddressSpaceSink.Instance,
            serviceLevel ?? NullServiceLevelPublisher.Instance,
            subscribeRedundancyTopic: true,
            localNode,
            dbFactory,
            applier,
            dbHealthProbe,
            staleWindow,
            probeFreshnessWindow)).WithDispatcher(DispatcherId);

    /// <summary>
    /// Test-only Props that omits the pinned-dispatcher requirement and, by default, skips the DPS
    /// subscribe so unit tests can spin up the actor on a vanilla TestKit cluster.
    /// </summary>
    /// <param name="sink">The OPC UA address space sink.</param>
    /// <param name="serviceLevel">The service level publisher.</param>
    /// <param name="subscribeRedundancyTopic">Whether to subscribe to the redundancy topic.</param>
    /// <param name="localNode">The local cluster node ID.</param>
    /// <param name="dbFactory">The optional database context factory.</param>
    /// <param name="applier">The optional Phase 7 applier.</param>
    /// <param name="dbHealthProbe">The optional <see cref="DbHealthProbeActor"/> ref; when null the
    /// legacy role-only ServiceLevel seam is always used. When supplied, the legacy seam is used only
    /// until the first <see cref="DbHealthProbeActor.DbHealthStatus"/> arrives.</param>
    /// <param name="staleWindow">The window beyond which a DB-health sample or redundancy snapshot is
    /// considered stale; defaults to 30 seconds.</param>
    /// <param name="probeFreshnessWindow">The window beyond which a peer's OPC UA probe verdict about
    /// this node is considered stale (and thus given the benefit of the doubt rather than demoting);
    /// defaults to 30 seconds.</param>
    /// <returns>Props on the default dispatcher.</returns>
    public static Props PropsForTests(
        IOpcUaAddressSpaceSink? sink = null,
        IServiceLevelPublisher? serviceLevel = null,
        bool subscribeRedundancyTopic = false,
        NodeId? localNode = null,
        IDbContextFactory? dbFactory = null,
        Phase7Applier? applier = null,
        IActorRef? dbHealthProbe = null,
        TimeSpan? staleWindow = null,
        TimeSpan? probeFreshnessWindow = null) =>
        Akka.Actor.Props.Create(() => new OpcUaPublishActor(
            sink ?? NullOpcUaAddressSpaceSink.Instance,
            serviceLevel ?? NullServiceLevelPublisher.Instance,
            subscribeRedundancyTopic,
            localNode,
            dbFactory,
            applier,
            dbHealthProbe,
            staleWindow,
            probeFreshnessWindow));

    /// <summary>Initializes a new instance of the <see cref="OpcUaPublishActor"/> class.</summary>
    /// <param name="sink">The OPC UA address space sink.</param>
    /// <param name="serviceLevel">The service level publisher.</param>
    /// <param name="subscribeRedundancyTopic">Whether to subscribe to the redundancy topic.</param>
    /// <param name="localNode">The local cluster node ID.</param>
    /// <param name="dbFactory">The optional database context factory.</param>
    /// <param name="applier">The optional Phase 7 applier.</param>
    /// <param name="dbHealthProbe">The optional <see cref="DbHealthProbeActor"/> ref; when null the
    /// legacy role-only ServiceLevel seam is always used. When supplied, the legacy seam is used only
    /// until the first <see cref="DbHealthProbeActor.DbHealthStatus"/> arrives.</param>
    /// <param name="staleWindow">The window beyond which a DB-health sample or redundancy snapshot is
    /// considered stale; defaults to 30 seconds.</param>
    /// <param name="probeFreshnessWindow">The window beyond which a peer's OPC UA probe verdict about
    /// this node is considered stale (and thus given the benefit of the doubt rather than demoting);
    /// defaults to 30 seconds.</param>
    public OpcUaPublishActor(
        IOpcUaAddressSpaceSink sink,
        IServiceLevelPublisher serviceLevel,
        bool subscribeRedundancyTopic,
        NodeId? localNode,
        IDbContextFactory? dbFactory = null,
        Phase7Applier? applier = null,
        IActorRef? dbHealthProbe = null,
        TimeSpan? staleWindow = null,
        TimeSpan? probeFreshnessWindow = null)
    {
        _sink = sink;
        _serviceLevel = serviceLevel;
        _subscribeRedundancyTopic = subscribeRedundancyTopic;
        _localNode = localNode;
        _dbFactory = dbFactory;
        _applier = applier;
        _dbHealthProbe = dbHealthProbe;
        _staleWindow = staleWindow ?? TimeSpan.FromSeconds(30);
        _probeFreshnessWindow = probeFreshnessWindow ?? TimeSpan.FromSeconds(30);

        Receive(HandleAttributeUpdate);
        Receive(HandleAlarmUpdate);
        Receive(HandleRebuild);
        Receive(HandleServiceLevelChanged);
        Receive(HandleRedundancyStateChanged);
        Receive(HandleDbHealthStatus);
        Receive(HandlePeerProbe);
        Receive(_ => { /* PubSub ack */ });
    }

    /// <inheritdoc />
    protected override void PreStart()
    {
        if (_subscribeRedundancyTopic)
        {
            DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(RedundancyStateTopic, Self));
        }
    }

    /// <summary>Forwards a value update to the sink; sink failures are logged, never rethrown.</summary>
    private void HandleAttributeUpdate(AttributeValueUpdate msg)
    {
        try
        {
            _sink.WriteValue(msg.NodeId, msg.Value, msg.Quality, msg.TimestampUtc);
            Interlocked.Increment(ref _writes);
            OtOpcUaTelemetry.OpcUaSinkWrite.Add(1, new KeyValuePair("kind", "value"));
        }
        catch (Exception ex)
        {
            _log.Warning(ex, "OpcUaPublish: sink.WriteValue threw for {Node}", msg.NodeId);
        }
    }

    /// <summary>Forwards an alarm condition update to the sink; sink failures are logged, never rethrown.</summary>
    private void HandleAlarmUpdate(AlarmStateUpdate msg)
    {
        try
        {
            _sink.WriteAlarmCondition(msg.AlarmNodeId, msg.State, msg.TimestampUtc);
            Interlocked.Increment(ref _writes);
            OtOpcUaTelemetry.OpcUaSinkWrite.Add(1, new KeyValuePair("kind", "alarm"));
        }
        catch (Exception ex)
        {
            _log.Warning(ex, "OpcUaPublish: sink.WriteAlarmCondition threw for {Node}", msg.AlarmNodeId);
        }
    }

    /// <summary>
    /// Rebuilds the address space. Without a DB factory + applier this is a raw sink rebuild;
    /// with them it loads the deployment artifact, diffs it against the last applied composition,
    /// applies the plan and then runs the hierarchy / scripted-alarm / equipment-tag /
    /// equipment-virtual-tag materialisation passes. Any failure is logged, never rethrown.
    /// </summary>
    private void HandleRebuild(RebuildAddressSpace msg)
    {
        using var span = OtOpcUaTelemetry.StartAddressSpaceRebuildSpan();
        span?.SetTag("otopcua.correlation_id", msg.Correlation.ToString());

        // Two modes: when dbFactory + applier are wired, do a real diff-and-apply pass against
        // the deployment artifact. Without them, fall back to a raw sink rebuild — the
        // F10b/dev path before the integration completes.
        if (_dbFactory is null || _applier is null)
        {
            try
            {
                _sink.RebuildAddressSpace();
                OtOpcUaTelemetry.OpcUaSinkWrite.Add(1, new KeyValuePair("kind", "rebuild"));
            }
            catch (Exception ex)
            {
                _log.Error(ex, "OpcUaPublish: sink.RebuildAddressSpace threw (correlation={Correlation})", msg.Correlation);
            }
            return;
        }

        try
        {
            // Prefer the artifact of the deployment the host just applied — at apply time it is not
            // yet Sealed, so LoadLatestArtifact would return the PREVIOUS revision and materialise a
            // stale composition (variables that don't match the SubscribeBulk refs). Fall back to
            // latest-sealed only for legacy callers that don't carry a DeploymentId.
            var artifact = msg.DeploymentId is { } depId ? LoadArtifact(depId) : LoadLatestArtifact();

            // An empty blob is the loaders' "nothing to load" sentinel (query failed, row missing or
            // blob null). Diffing it against _lastApplied would plan the removal of everything that
            // was previously materialised, so a transient ConfigDb outage would tear down the address
            // space. Treat it as the no-op the loaders' log messages promise instead.
            if (artifact.Length == 0)
            {
                _log.Warning("OpcUaPublish: no deployment artifact available; rebuild skipped to keep the current address space (correlation={Correlation})", msg.Correlation);
                return;
            }

            var composition = _localNode is { } ln
                ? DeploymentArtifact.ParseComposition(artifact, ln.Value,
                    inconsistency => _log.Warning("OpcUaPublish {Node}: cross-cluster binding — {Message}", ln, inconsistency))
                : DeploymentArtifact.ParseComposition(artifact);

            var plan = Phase7Planner.Compute(_lastApplied, composition);
            if (plan.IsEmpty)
            {
                // NOTE(review): _lastApplied is updated before the Materialise* passes below, so if one
                // of them throws, a retry with the same artifact ends here and that pass is not re-run.
                // Confirm whether that is intended.
                _log.Debug("OpcUaPublish: rebuild requested but plan is empty (correlation={Correlation})", msg.Correlation);
                return;
            }

            var outcome = _applier.Apply(plan);
            _lastApplied = composition;

            // #85 — after the plan diff lands, rebuild the UNS folder hierarchy so OPC UA
            // clients see Area/Line/Equipment as proper folders. Idempotent; Phase7Applier
            // skips folders that already exist with the same node id.
            _applier.MaterialiseHierarchy(composition);

            // T14 — scripted alarms get their own pass right after the hierarchy so the equipment
            // folders they parent under already exist. Materialises real Part 9 AlarmConditionState
            // nodes (keyed by ScriptedAlarmId so AlarmStateUpdate writes target them); disabled
            // alarms are skipped.
            _applier.MaterialiseScriptedAlarms(composition);

            // Equipment-namespace tags get their own pass: ensures each signal's Variable (and any
            // FolderPath sub-folder) exists under its already-materialised equipment folder so
            // clients can browse them. Live values are pushed by DriverHostActor.ForwardToMux after
            // each subscription cycle; variables show BadWaitingForInitialData only until the first
            // publish interval fires.
            _applier.MaterialiseEquipmentTags(composition);

            // Equipment-namespace VirtualTags get their own pass right after the equipment tags:
            // ensures each computed signal's Variable (and any FolderPath sub-folder) exists under its
            // equipment folder with a folder-scoped NodeId. VirtualTagHostActor.OnResult pushes live
            // values once the first dependency update arrives; until then variables show BadWaitingForInitialData.
            _applier.MaterialiseEquipmentVirtualTags(composition);

            OtOpcUaTelemetry.OpcUaSinkWrite.Add(1, new KeyValuePair("kind", "rebuild"));
            _log.Info("OpcUaPublish: applied rebuild (correlation={Correlation}, added={Added}, removed={Removed}, changed={Changed}, rebuild={Rebuild})",
                msg.Correlation, outcome.AddedNodes, outcome.RemovedNodes, outcome.ChangedNodes, outcome.RebuildCalled);
        }
        catch (Exception ex)
        {
            _log.Error(ex, "OpcUaPublish: rebuild pipeline threw (correlation={Correlation})", msg.Correlation);
        }
    }

    /// <summary>
    /// Reads a specific deployment's artifact blob from ConfigDb (the one just applied, which may
    /// not be Sealed yet).
    /// </summary>
    /// <param name="deploymentId">The deployment whose artifact to load.</param>
    /// <returns>The artifact bytes, or an empty array when no row/blob matches or the query throws
    /// (the exception case is logged).</returns>
    private byte[] LoadArtifact(DeploymentId deploymentId)
    {
        try
        {
            using var db = _dbFactory!.CreateDbContext();
            return db.Deployments.AsNoTracking()
                .Where(d => d.DeploymentId == deploymentId.Value)
                .Select(d => d.ArtifactBlob)
                .FirstOrDefault() ?? Array.Empty();
        }
        catch (Exception ex)
        {
            _log.Warning(ex, "OpcUaPublish: failed to load artifact for deployment {Id}; rebuild becomes no-op", deploymentId);
            return Array.Empty();
        }
    }

    /// <summary>Reads the most recently sealed deployment's artifact blob from ConfigDb.</summary>
    /// <returns>The artifact bytes, or an empty array when no sealed deployment/blob exists or the
    /// query throws (the exception case is logged).</returns>
    private byte[] LoadLatestArtifact()
    {
        try
        {
            using var db = _dbFactory!.CreateDbContext();
            return db.Deployments.AsNoTracking()
                .Where(d => d.Status == Configuration.Enums.DeploymentStatus.Sealed)
                .OrderByDescending(d => d.SealedAtUtc)
                .Select(d => d.ArtifactBlob)
                .FirstOrDefault() ?? Array.Empty();
        }
        catch (Exception ex)
        {
            _log.Warning(ex, "OpcUaPublish: failed to load latest deployment artifact; rebuild becomes no-op");
            return Array.Empty();
        }
    }

    /// <summary>
    /// Publishes a ServiceLevel unless it equals the last successfully published level. A level is
    /// only recorded as published once the publisher accepts it, so a failed publish is retried the
    /// next time the same level is computed.
    /// </summary>
    private void HandleServiceLevelChanged(ServiceLevelChanged msg)
    {
        // Always publish the FIRST computed level, even if it equals the byte-default 0. Otherwise a
        // node starting Detached/role-less (first level = 0) would be dedup'd away, leaving the SDK's
        // built-in default (255 = full service) standing — a degraded node wrongly advertising 255.
        if (_publishedAtLeastOnce && msg.ServiceLevel == _lastServiceLevel) return;

        try
        {
            _serviceLevel.Publish(msg.ServiceLevel);

            // Record the level only after Publish succeeds: if it threw, the dedup guard above must not
            // treat the failed level as already published, or no retry would happen until the level changes.
            _lastServiceLevel = msg.ServiceLevel;
            _publishedAtLeastOnce = true;
            OtOpcUaTelemetry.ServiceLevelChange.Add(1, new KeyValuePair("level", msg.ServiceLevel));
            _log.Debug("OpcUaPublish: ServiceLevel={Level}", msg.ServiceLevel);
        }
        catch (Exception ex)
        {
            _log.Warning(ex, "OpcUaPublish: ServiceLevel publisher threw at level {Level}", msg.ServiceLevel);
        }
    }

    /// <summary>
    /// Caches the latest redundancy snapshot and recomputes the local ServiceLevel. The actual byte
    /// is produced by <see cref="RecomputeServiceLevel"/> — either via the health-aware
    /// <see cref="ServiceLevelCalculator"/> (once a DB-health probe + sample are wired) or via the
    /// legacy role-only seam (back-compat / bootstrap).
    /// </summary>
    private void HandleRedundancyStateChanged(RedundancyStateChanged msg)
    {
        _lastSnapshot = msg;
        RecomputeServiceLevel();
    }

    /// <summary>
    /// Caches the latest DB-health sample and recomputes the local ServiceLevel. This actor never
    /// sends to the probe itself (it only checks that one was supplied), so samples are expected to
    /// be pushed in; the freshest one feeds the calculator's DbReachable/Stale inputs.
    /// </summary>
    private void HandleDbHealthStatus(DbHealthProbeActor.DbHealthStatus msg)
    {
        _lastDbHealth = msg;
        RecomputeServiceLevel();
    }

    /// <summary>
    /// Records a peer's OPC UA probe verdict about THIS node and recomputes the local ServiceLevel.
    /// The probe result's NodeId is the target that was probed, so a result whose NodeId is not this
    /// node is about a peer and is ignored. A matching result is stamped with the receive time so
    /// <see cref="OpcUaProbeOk"/> can debounce stale verdicts.
    /// </summary>
    private void HandlePeerProbe(PeerOpcUaProbeActor.OpcUaProbeResult r)
    {
        // The result targets the probed node. If it isn't me, it's about a peer — ignore it.
        if (_localNode is null || r.NodeId != _localNode.Value) return;

        _probeAboutMe = (r.Ok, DateTime.UtcNow);
        RecomputeServiceLevel();
    }

    /// <summary>
    /// The OPC UA self-probe input for the calculator: "did a peer recently observe MY OPC UA
    /// endpoint as reachable?" Returns true (benefit of the doubt) when no peer verdict has arrived
    /// yet (single-node / no peer) or when the latest verdict is older than the probe freshness
    /// window (the peer went away — don't penalise this node for that). Only an actively-observed,
    /// RECENT Ok==false demotes.
    /// </summary>
    private bool OpcUaProbeOk()
    {
        if (_probeAboutMe is not { } verdict) return true;
        if (DateTime.UtcNow - verdict.At > _probeFreshnessWindow) return true;
        return verdict.Ok;
    }

    /// <summary>
    /// Computes the local OPC UA ServiceLevel and routes it through
    /// <see cref="HandleServiceLevelChanged"/> (the dedup/publish/metric handler) by telling Self.
    /// The full <see cref="ServiceLevelCalculator"/> path is used once a DB-health probe is wired AND
    /// a sample has arrived; until then (and when no probe is supplied at all) a legacy role-only
    /// seam keeps the historical "primary-leader → 240, primary → 200, secondary → 100,
    /// otherwise → 0" behaviour. The calculator does not model Detached, so a detached (or
    /// missing-from-snapshot) local node is guarded to 0 before either path runs.
    /// </summary>
    private void RecomputeServiceLevel()
    {
        if (_localNode is null || _lastSnapshot is null) return;

        var entry = _lastSnapshot.Nodes.FirstOrDefault(n => n.NodeId == _localNode.Value);

        // The calculator does NOT model Detached — a healthy detached node would wrongly compute
        // 240, so guard it (and the missing-entry case) to 0 here.
        if (entry is null || entry.Role == RedundancyRole.Detached)
        {
            Self.Tell(new ServiceLevelChanged(0));
            return;
        }

        // Legacy / back-compat seam: with no DB-health probe wired (or before the first sample
        // arrives) fall back to the old role-only switch. This preserves historical behaviour and
        // is the bootstrap value until the first DbHealthStatus lands.
        if (_dbHealthProbe is null || _lastDbHealth is null)
        {
            Self.Tell(new ServiceLevelChanged(LegacyRoleOnly(entry)));
            return;
        }

        var now = DateTime.UtcNow;
        var inputs = new NodeHealthInputs(
            MemberState: SafeSelfStatus(),
            DbReachable: _lastDbHealth.Reachable,
            OpcUaProbeOk: OpcUaProbeOk(),
            Stale: !_lastDbHealth.Reachable
                || (now - _lastDbHealth.AsOfUtc) > _staleWindow
                || (now - entry.AsOfUtc) > _staleWindow,
            IsDriverRoleLeader: entry.IsRoleLeaderForDriver);

        Self.Tell(new ServiceLevelChanged(ServiceLevelCalculator.Compute(inputs)));
    }

    /// <summary>
    /// The legacy role-only ServiceLevel switch (primary-leader → 240, primary → 200,
    /// secondary → 100, anything else → 0). Preserved as the back-compat / bootstrap seam.
    /// </summary>
    private static byte LegacyRoleOnly(NodeRedundancyState entry) => entry.Role switch
    {
        RedundancyRole.Primary when entry.IsRoleLeaderForDriver => 240,
        RedundancyRole.Primary => 200,
        RedundancyRole.Secondary => 100,
        _ => 0,
    };

    /// <summary>
    /// Reads this node's cluster <see cref="MemberStatus"/>, returning
    /// <see cref="MemberStatus.Removed"/> if reading it throws, so the calculator receives an
    /// untrusted status (logged as ServiceLevel→0) instead of an exception.
    /// </summary>
    private MemberStatus SafeSelfStatus()
    {
        try
        {
            return _cluster.SelfMember.Status;
        }
        catch (Exception ex)
        {
            _log.Debug(ex, "OpcUaPublish: SelfMember status unavailable; treating as Removed (ServiceLevel→0)");
            return MemberStatus.Removed;
        }
    }
}