using Akka.Actor; using Akka.Cluster.Tools.PublishSubscribe; using Akka.Event; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy; using ZB.MOM.WW.OtOpcUa.Commons.Types; using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian; using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa; namespace ZB.MOM.WW.OtOpcUa.Runtime.Historian; /// /// Thin actor wrapper around . Engine code (ScriptedAlarmActor, /// Galaxy native alarm bridge, AB CIP ALMD reader) tells s to this /// actor; the actor enqueues them on the sink fire-and-forget. Production deployments register /// against IAlarmHistorianSink; the sink owns the /// durable queue + drain-to-Wonderware-pipe loop. The actor here owns nothing operational beyond /// the message contract — its job is to keep the engine actors on Akka's mailbox without blocking /// them on disk I/O or pipe handshakes. /// /// Query queue depth + drain health via . /// public sealed class HistorianAdapterActor : ReceiveActor { public sealed record GetStatus { public static readonly GetStatus Instance = new(); } private readonly IAlarmHistorianSink _sink; private readonly NodeId? _localNode; private readonly ILoggingAdapter _log = Context.GetLogger(); private IActorRef? _mediator; /// Cached local from the latest /// snapshot (null = unknown until the first snapshot arrives, or no wired). The /// durable sink enqueue in the handler is gated on this: only the /// Primary historizes (default-write while unknown so single-node deploys + the boot window never drop /// historization). private RedundancyRole? _localRole; /// Creates the props for a HistorianAdapterActor instance. /// The alarm historian sink implementation, or null to use a null sink. /// The local cluster node id, used to read this node's /// from the redundancy-state topic so only the Primary historizes to the durable sink. Null (the /// default) leaves the role unknown ⇒ default-write (single-node deploys + tests). /// Props configured for creating a HistorianAdapterActor. public static Props Props(IAlarmHistorianSink? sink = null, NodeId? localNode = null) => Akka.Actor.Props.Create(() => new HistorianAdapterActor(sink ?? NullAlarmHistorianSink.Instance, localNode)); /// Initializes a new instance of the HistorianAdapterActor class. /// The alarm historian sink to forward enqueued events to. /// The local cluster node id, used to read this node's /// from the redundancy-state topic so only the Primary historizes to the durable sink. Null leaves /// the role unknown ⇒ default-write (single-node deploys + tests). public HistorianAdapterActor(IAlarmHistorianSink sink, NodeId? localNode = null) { _sink = sink; _localNode = localNode; Receive(evt => { // Warm-standby dedup (forward-looking): only the Primary historizes to the durable sink so a // future per-node feeder writes exactly once. Default-write until told Secondary/Detached so // single-node deploys + the boot window never drop historization. (Currently the actor has no // production feeder — this is a defensive guard for when engine→historian wiring lands.) if (_localRole is RedundancyRole.Secondary or RedundancyRole.Detached) { return; } // Fire-and-forget: SqliteStoreAndForwardSink persists to local SQLite synchronously // inside EnqueueAsync (it returns once the row is committed), so we don't block on // network/pipe latency. Failures are surfaced via GetStatus's LastError + drain state. _ = EnqueueAsync(evt); }); Receive(_ => Sender.Tell(_sink.GetStatus())); // Cluster redundancy snapshots (published on the `redundancy-state` topic, subscribed in PreStart) // cache this node's role so the AlarmHistorianEvent handler can gate the durable sink enqueue to the // Primary. The PubSub Subscribe is acked back to Self (no-op below). Receive(OnRedundancyStateChanged); Receive(_ => { }); } /// Subscribes to the redundancy-state topic so cluster role changes land as /// and cache this node's role — the historian enqueue is gated to /// the Primary so a future per-node feeder doesn't double-write across the warm-redundant pair. protected override void PreStart() { _mediator = DistributedPubSub.Get(Context.System).Mediator; _mediator.Tell(new Subscribe(OpcUaPublishActor.RedundancyStateTopic, Self)); base.PreStart(); } /// Caches this node's from a cluster redundancy snapshot so the /// handler can gate the durable sink enqueue to the Primary. A snapshot /// that doesn't mention (or no local node wired) leaves the cached role unchanged /// ⇒ default-write. Mirrors 's handler. /// The cluster redundancy snapshot. private void OnRedundancyStateChanged(RedundancyStateChanged msg) { if (_localNode is null) { return; } var local = msg.Nodes.FirstOrDefault(n => n.NodeId == _localNode.Value); if (local is not null) { _localRole = local.Role; } } private async Task EnqueueAsync(AlarmHistorianEvent evt) { try { await _sink.EnqueueAsync(evt, CancellationToken.None); } catch (Exception ex) { _log.Error(ex, "Historian sink rejected event for {AlarmId} at {Ts}", evt.AlarmId, evt.TimestampUtc); } } }