using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
using ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms;

namespace ZB.MOM.WW.OtOpcUa.Runtime.Historian;

/// <summary>
/// Thin actor wrapper around <see cref="IAlarmHistorianSink"/>. Engine code (ScriptedAlarmActor,
/// Galaxy native alarm bridge, AB CIP ALMD reader) tells <see cref="AlarmHistorianEvent"/>s to this
/// actor; the actor enqueues them on the sink fire-and-forget. Production deployments register a
/// concrete sink against IAlarmHistorianSink; the sink owns the durable queue +
/// drain-to-Wonderware-pipe loop. The actor here owns nothing operational beyond the message
/// contract — its job is to keep the engine actors on Akka's mailbox without blocking them on disk
/// I/O or pipe handshakes.
/// </summary>
/// <remarks>
/// Query queue depth + drain health via <see cref="GetStatus"/>.
/// </remarks>
public sealed class HistorianAdapterActor : ReceiveActor
{
    /// <summary>
    /// Request message: the actor replies to the sender with the result of the sink's
    /// <c>GetStatus()</c> call.
    /// </summary>
    public sealed record GetStatus
    {
        /// <summary>Shared instance of the request message.</summary>
        public static readonly GetStatus Instance = new();
    }

    private readonly IAlarmHistorianSink _sink;
    private readonly NodeId? _localNode;
    private readonly ILoggingAdapter _log = Context.GetLogger();
    private IActorRef? _mediator;

    /// <summary>
    /// Cached local <see cref="RedundancyRole"/> from the latest <see cref="RedundancyStateChanged"/>
    /// snapshot (null = unknown until the first snapshot arrives, or no local node wired). The
    /// durable sink enqueue in the <see cref="AlarmTransitionEvent"/> handler is gated on this: only the
    /// Primary historizes (default-write while unknown so single-node deploys + the boot window never
    /// drop historization).
    /// </summary>
    private RedundancyRole? _localRole;

    /// <summary>Creates the props for a HistorianAdapterActor instance.</summary>
    /// <param name="sink">The alarm historian sink implementation, or null to use a null sink.</param>
    /// <param name="localNode">The local cluster node id, used to read this node's
    /// <see cref="RedundancyRole"/> from the redundancy-state topic so only the Primary historizes to
    /// the durable sink. Null (the default) leaves the role unknown ⇒ default-write (single-node
    /// deploys + tests).</param>
    /// <returns>Props configured for creating a HistorianAdapterActor.</returns>
    public static Props Props(IAlarmHistorianSink? sink = null, NodeId? localNode = null) =>
        Akka.Actor.Props.Create(() => new HistorianAdapterActor(sink ?? NullAlarmHistorianSink.Instance, localNode));

    /// <summary>Initializes a new instance of the HistorianAdapterActor class.</summary>
    /// <param name="sink">The alarm historian sink to forward enqueued events to.</param>
    /// <param name="localNode">The local cluster node id, used to read this node's
    /// <see cref="RedundancyRole"/> from the redundancy-state topic so only the Primary historizes to
    /// the durable sink. Null leaves the role unknown ⇒ default-write (single-node deploys + tests).</param>
    /// <exception cref="ArgumentNullException"><paramref name="sink"/> is null.</exception>
    public HistorianAdapterActor(IAlarmHistorianSink sink, NodeId? localNode = null)
    {
        // Fail fast: a null sink would otherwise only surface as a NullReferenceException caught and
        // logged inside EnqueueAsync as a misleading "sink rejected event" error on every message.
        _sink = sink ?? throw new ArgumentNullException(nameof(sink));
        _localNode = localNode;

        // A direct AlarmHistorianEvent source is kept for a future engine→historian path; it goes through the
        // SAME ShouldHistorize gate. WARNING: do NOT Tell an AlarmHistorianEvent for a transition that is ALSO
        // published on the `alerts` topic — the sink would then double-write (both handlers enqueue it).
        Receive<AlarmHistorianEvent>(evt =>
        {
            if (ShouldHistorize())
            {
                _ = EnqueueAsync(evt);
            }
        });

        // Live alarm transitions arrive off the cluster `alerts` DPS topic (subscribed in PreStart). The
        // Primary ScriptedAlarmHostActor publishes each transition ONCE, but DistributedPubSub fans that
        // single message to EVERY node's subscriber — including BOTH central nodes' historian adapters. The
        // ShouldHistorize gate keeps only the Primary writing ⇒ exactly-once across the warm pair.
        // NOTE: Translate is intentionally inside the gate so Secondary/Detached nodes never allocate a
        // discarded AlarmHistorianEvent.
        // t.HistorizeToAveva is not false: only explicit false suppresses the durable sink write. null
        // (CLR default for bool?) and true both historize. null is the rolling-restart / cross-version case:
        // an old-format message missing the field deserializes to null and is historized (default-on), so no
        // audit row is dropped at a handover — same posture as the AlarmTypeName null-coalesce in Translate.
        // The producer (ScriptedAlarmHostActor) always sets a concrete true/false.
        Receive<AlarmTransitionEvent>(t =>
        {
            if (ShouldHistorize() && t.HistorizeToAveva is not false)
            {
                _ = EnqueueAsync(Translate(t));
            }
        });

        // Status query: reply to the asker with whatever the sink reports.
        Receive<GetStatus>(_ => Sender.Tell(_sink.GetStatus()));

        // Cluster redundancy snapshots (published on the `redundancy-state` topic, subscribed in PreStart)
        // cache this node's role so the historize gate can scope the durable sink enqueue to the Primary.
        // The PubSub Subscribe acks (redundancy-state + alerts) are acked back to Self (no-op below).
        Receive<RedundancyStateChanged>(OnRedundancyStateChanged);
        Receive<SubscribeAck>(_ => { });
    }

    /// <summary>
    /// True iff this node should write to the durable sink: only the Primary historizes
    /// (default-write while the role is unknown so single-node deploys + the boot window never drop
    /// historization). DPS fans the Primary's single alerts publish to BOTH nodes' historians, so this
    /// gate is what keeps the durable write exactly-once across the warm-redundant pair.
    /// </summary>
    /// <returns>False only when the cached role is Secondary or Detached; true otherwise (including
    /// while the role is still unknown).</returns>
    private bool ShouldHistorize() =>
        _localRole is not (RedundancyRole.Secondary or RedundancyRole.Detached);

    /// <summary>
    /// Translates a live <see cref="AlarmTransitionEvent"/> (the alerts-topic shape) into the
    /// historian's <see cref="AlarmHistorianEvent"/>. AlarmTypeName is null-coalesced to
    /// "AlarmCondition": during a rolling restart Akka's JSON serializer applies the CLR default (null) to
    /// an old-format message's AlarmTypeName rather than the record's call-site default, and the
    /// historian must never store a null alarm type.
    /// </summary>
    /// <param name="t">The live transition published on the alerts topic.</param>
    /// <returns>The translated historian event.</returns>
    private static AlarmHistorianEvent Translate(AlarmTransitionEvent t) =>
        new(
            AlarmId: t.AlarmId,
            EquipmentPath: t.EquipmentPath,
            AlarmName: t.AlarmName,
            AlarmTypeName: string.IsNullOrEmpty(t.AlarmTypeName) ? "AlarmCondition" : t.AlarmTypeName,
            Severity: ToSeverity(t.Severity),
            EventKind: t.TransitionKind,
            Message: t.Message,
            User: t.User,
            Comment: t.Comment,
            TimestampUtc: t.TimestampUtc);

    /// <summary>
    /// Maps the OPC UA 1–1000 numeric severity carried on a transition back to the coarse
    /// <see cref="AlarmSeverity"/>, inverting ScriptedAlarmHostActor.SeverityToInt's bucket ceilings
    /// (Low=250, Medium=500, High=750, Critical=1000).
    /// </summary>
    /// <param name="severity">The OPC UA 1–1000 numeric severity.</param>
    /// <returns>The coarse alarm severity bucket.</returns>
    private static AlarmSeverity ToSeverity(int severity) => severity switch
    {
        <= 250 => AlarmSeverity.Low,
        <= 500 => AlarmSeverity.Medium,
        <= 750 => AlarmSeverity.High,
        _ => AlarmSeverity.Critical,
    };

    /// <summary>
    /// Subscribes to the redundancy-state topic (so cluster role changes land as
    /// <see cref="RedundancyStateChanged"/> and cache this node's role — the historize enqueue is gated to
    /// the Primary so the alerts feed doesn't double-write across the warm-redundant pair) and to the
    /// alerts topic (so live <see cref="AlarmTransitionEvent"/>s are translated + historized).
    /// </summary>
    protected override void PreStart()
    {
        _mediator = DistributedPubSub.Get(Context.System).Mediator;
        _mediator.Tell(new Subscribe(OpcUaPublishActor.RedundancyStateTopic, Self));
        _mediator.Tell(new Subscribe(ScriptedAlarmHostActor.AlertsTopic, Self));
        base.PreStart();
    }

    /// <summary>
    /// Caches this node's <see cref="RedundancyRole"/> from a cluster redundancy snapshot so the
    /// <see cref="AlarmTransitionEvent"/> handler can gate the durable sink enqueue to the Primary. A
    /// snapshot that doesn't mention <see cref="_localNode"/> (or no local node wired) leaves the cached
    /// role unchanged ⇒ default-write.
    /// </summary>
    /// <param name="msg">The cluster redundancy snapshot.</param>
    private void OnRedundancyStateChanged(RedundancyStateChanged msg)
    {
        if (_localNode is null)
        {
            return;
        }

        var local = msg.Nodes.FirstOrDefault(n => n.NodeId == _localNode.Value);
        if (local is not null)
        {
            _localRole = local.Role;
        }
    }

    /// <summary>
    /// Forwards one event to the sink. Callers discard the returned task (fire-and-forget), so any
    /// exception from the sink is caught and logged here rather than left unobserved.
    /// </summary>
    /// <param name="evt">The historian event to enqueue.</param>
    /// <returns>A task that completes once the sink call has finished or its failure has been logged.</returns>
    private async Task EnqueueAsync(AlarmHistorianEvent evt)
    {
        try
        {
            await _sink.EnqueueAsync(evt, CancellationToken.None);
        }
        catch (Exception ex)
        {
            _log.Error(ex, "Historian sink rejected event for {AlarmId} at {Ts}", evt.AlarmId, evt.TimestampUtc);
        }
    }
}