using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
using ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Historian;
///
/// Thin actor wrapper around . Engine code (ScriptedAlarmActor,
/// Galaxy native alarm bridge, AB CIP ALMD reader) tells s to this
/// actor; the actor enqueues them on the sink fire-and-forget. Production deployments register
/// against IAlarmHistorianSink; the sink owns the
/// durable queue + drain-to-Wonderware-pipe loop. The actor here owns nothing operational beyond
/// the message contract — its job is to keep the engine actors on Akka's mailbox without blocking
/// them on disk I/O or pipe handshakes.
///
/// Query queue depth + drain health via .
///
public sealed class HistorianAdapterActor : ReceiveActor
{
public sealed record GetStatus
{
public static readonly GetStatus Instance = new();
}
private readonly IAlarmHistorianSink _sink;
private readonly NodeId? _localNode;
private readonly ILoggingAdapter _log = Context.GetLogger();
private IActorRef? _mediator;
/// Cached local from the latest
/// snapshot (null = unknown until the first snapshot arrives, or no wired). The
/// durable sink enqueue in the handler is gated on this: only the
/// Primary historizes (default-write while unknown so single-node deploys + the boot window never drop
/// historization).
private RedundancyRole? _localRole;
/// Creates the props for a HistorianAdapterActor instance.
/// The alarm historian sink implementation, or null to use a null sink.
/// The local cluster node id, used to read this node's
/// from the redundancy-state topic so only the Primary historizes to the durable sink. Null (the
/// default) leaves the role unknown ⇒ default-write (single-node deploys + tests).
/// Props configured for creating a HistorianAdapterActor.
public static Props Props(IAlarmHistorianSink? sink = null, NodeId? localNode = null) =>
Akka.Actor.Props.Create(() => new HistorianAdapterActor(sink ?? NullAlarmHistorianSink.Instance, localNode));
/// Initializes a new instance of the HistorianAdapterActor class.
/// The alarm historian sink to forward enqueued events to.
/// The local cluster node id, used to read this node's
/// from the redundancy-state topic so only the Primary historizes to the durable sink. Null leaves
/// the role unknown ⇒ default-write (single-node deploys + tests).
public HistorianAdapterActor(IAlarmHistorianSink sink, NodeId? localNode = null)
{
_sink = sink;
_localNode = localNode;
// A direct AlarmHistorianEvent source (kept for a future engine→historian path) goes through the
// same Primary gate as the alerts-topic feed below.
Receive(Historize);
// Live alarm transitions arrive off the cluster `alerts` DPS topic (subscribed in PreStart). The
// Primary ScriptedAlarmHostActor publishes each transition ONCE, but DistributedPubSub fans that
// single message to EVERY node's subscriber — including BOTH central nodes' historian adapters. The
// Primary gate in Historize keeps only the Primary writing ⇒ exactly-once across the warm pair.
Receive(t => Historize(Translate(t)));
Receive(_ => Sender.Tell(_sink.GetStatus()));
// Cluster redundancy snapshots (published on the `redundancy-state` topic, subscribed in PreStart)
// cache this node's role so the historize gate can scope the durable sink enqueue to the Primary.
// The PubSub Subscribe acks (redundancy-state + alerts) are acked back to Self (no-op below).
Receive(OnRedundancyStateChanged);
Receive(_ => { });
}
/// Gates a historian event to the Primary then enqueues it fire-and-forget. Warm-standby
/// dedup: only the Primary historizes to the durable sink so the per-node alerts feed writes exactly
/// once. Default-write until told Secondary/Detached so single-node deploys + the boot window never
/// drop historization. Fire-and-forget because persists to
/// local SQLite synchronously inside (it returns once the row is committed),
/// so we don't block the mailbox on network/pipe latency; failures surface via .
/// The historian event to gate + enqueue.
private void Historize(AlarmHistorianEvent evt)
{
if (_localRole is RedundancyRole.Secondary or RedundancyRole.Detached)
{
return;
}
_ = EnqueueAsync(evt);
}
/// Translates a live (the alerts-topic shape) into the
/// historian's . AlarmTypeName is null-coalesced to
/// "AlarmCondition": during a rolling restart Akka's JSON serializer applies the CLR default (null) to
/// an old-format message's AlarmTypeName rather than the record's call-site default, and the
/// historian must never store a null alarm type.
/// The live transition published on the alerts topic.
/// The translated historian event.
private static AlarmHistorianEvent Translate(AlarmTransitionEvent t) => new(
AlarmId: t.AlarmId,
EquipmentPath: t.EquipmentPath,
AlarmName: t.AlarmName,
AlarmTypeName: string.IsNullOrEmpty(t.AlarmTypeName) ? "AlarmCondition" : t.AlarmTypeName,
Severity: ToSeverity(t.Severity),
EventKind: t.TransitionKind,
Message: t.Message,
User: t.User,
Comment: t.Comment,
TimestampUtc: t.TimestampUtc);
/// Maps the OPC UA 1–1000 numeric severity carried on a transition back to the coarse
/// , inverting ScriptedAlarmHostActor.SeverityToInt's bucket ceilings
/// (Low=250, Medium=500, High=750, Critical=1000).
/// The OPC UA 1–1000 numeric severity.
/// The coarse alarm severity bucket.
private static AlarmSeverity ToSeverity(int severity) => severity switch
{
<= 250 => AlarmSeverity.Low,
<= 500 => AlarmSeverity.Medium,
<= 750 => AlarmSeverity.High,
_ => AlarmSeverity.Critical,
};
/// Subscribes to the redundancy-state topic (so cluster role changes land as
/// and cache this node's role — the historize enqueue is gated to
/// the Primary so the alerts feed doesn't double-write across the warm-redundant pair) and to the
/// alerts topic (so live s are translated + historized).
protected override void PreStart()
{
_mediator = DistributedPubSub.Get(Context.System).Mediator;
_mediator.Tell(new Subscribe(OpcUaPublishActor.RedundancyStateTopic, Self));
_mediator.Tell(new Subscribe(ScriptedAlarmHostActor.AlertsTopic, Self));
base.PreStart();
}
/// Caches this node's from a cluster redundancy snapshot so the
/// handler can gate the durable sink enqueue to the Primary. A snapshot
/// that doesn't mention (or no local node wired) leaves the cached role unchanged
/// ⇒ default-write. Mirrors 's handler.
/// The cluster redundancy snapshot.
private void OnRedundancyStateChanged(RedundancyStateChanged msg)
{
if (_localNode is null)
{
return;
}
var local = msg.Nodes.FirstOrDefault(n => n.NodeId == _localNode.Value);
if (local is not null)
{
_localRole = local.Role;
}
}
private async Task EnqueueAsync(AlarmHistorianEvent evt)
{
try
{
await _sink.EnqueueAsync(evt, CancellationToken.None);
}
catch (Exception ex)
{
_log.Error(ex, "Historian sink rejected event for {AlarmId} at {Ts}",
evt.AlarmId, evt.TimestampUtc);
}
}
}