using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts;
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
namespace ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms;
///
/// Akka host that owns one <see cref="ScriptedAlarmEngine"/> for an Equipment-namespace
/// driver node, feeds it live tag values, and bridges its emissions to OPC UA publish + the
/// cluster <c>alerts</c> DistributedPubSub topic.
///
///
/// The host is the engine's lifecycle owner: the caller (the driver-role startup) builds
/// the <see cref="DependencyMuxTagUpstreamSource"/>, constructs a <see cref="ScriptedAlarmEngine"/>
/// around that same upstream, and passes both here. The host disposes the engine in
/// <see cref="PostStop"/>.
///
///
///
/// <b>Data flow.</b> <see cref="DependencyMuxActor"/> delivers a
/// <see cref="VirtualTagActor.DependencyValueChanged"/> for every tag the loaded alarms
/// depend on; the host pushes each into the upstream. The engine self-evaluates: its own
/// observer re-runs the predicates and raises
/// <see cref="ScriptedAlarmEngine.OnEvent"/>. The host does NOT call evaluate — it only
/// feeds values in and fans emissions out.
///
///
///
/// <b>Thread-safety.</b> <see cref="ScriptedAlarmEngine.OnEvent"/> fires on the engine's
/// background worker thread (its fire-and-forget re-evaluation), NOT on the actor thread.
/// The ctor subscribes a handler that does nothing but <c>Self.Tell(new EngineEmission(e))</c>
/// — <c>Tell</c> is thread-safe. ALL sink work (touching <c>Context</c>,
/// the publish actor ref, the DPS mediator) then happens on the actor thread in
/// <see cref="OnEngineEmission"/>. No <c>Context</c> or actor state is ever read or
/// written from the OnEvent callback.
///
///
///
/// <b>Historization.</b> The host publishes each transition as an
/// <see cref="AlarmTransitionEvent"/> to the <c>alerts</c> topic ONLY. That topic is the
/// historization path: HistorianAdapterActor's upstream and the Admin UI Alerts page
/// both consume <c>alerts</c>. The host deliberately does NOT also Tell the historian
/// adapter directly — doing so would double-historize every transition. (T9 plan called for a
/// "direct historian tell"; that was dropped because the <c>alerts</c> topic already feeds the
/// historian path, so a direct tell would duplicate every row.)
///
///
public sealed class ScriptedAlarmHostActor : ReceiveActor
{
/// <summary>
/// The cluster DistributedPubSub topic every alarm transition is published to (see
/// <see cref="OnEngineEmission"/>). Matches the constant the (retired) ScriptedAlarmActor used
/// so subscribers stay wired.
/// </summary>
public const string AlertsTopic = "alerts";
/// <summary>
/// The cluster DistributedPubSub topic inbound OPC UA Part 9 alarm method calls
/// (Acknowledge / Confirm / Shelve / AddComment) are routed onto as <see cref="AlarmCommand"/>s.
/// The OPC UA node manager's condition handlers build the command (after the AlarmAck role
/// gate); the host's boot wiring publishes it here; this actor subscribes to the topic in
/// <see cref="PreStart"/> and consumes the commands in <see cref="OnAlarmCommand"/>.
/// </summary>
public const string AlarmCommandsTopic = "alarm-commands";
/// <summary>
/// Reconcile the loaded alarm set to exactly the enabled subset of <paramref name="Plans"/>:
/// the handler builds <see cref="ScriptedAlarmDefinition"/>s (skipping disabled plans), reloads
/// the engine, and re-registers mux interest for the union of dependency refs.
/// </summary>
/// <param name="Plans">The desired Equipment-namespace scripted-alarm plans.</param>
public sealed record ApplyScriptedAlarms(IReadOnlyList<EquipmentScriptedAlarmPlan> Plans);
/// <summary>
/// Marshals an engine emission off the engine's worker thread onto the actor thread.
/// The constructor's OnEvent handler wraps each event in this record and Tells it to Self;
/// <see cref="OnEngineEmission"/> unwraps it.
/// </summary>
/// <param name="Event">The <see cref="ScriptedAlarmEvent"/> the engine raised on OnEvent.</param>
private sealed record EngineEmission(ScriptedAlarmEvent Event);
/// <summary>
/// Pipe-back completion of an in-flight engine load started by <see cref="OnApply"/>:
/// carries the union of dependency refs to register mux interest for AFTER the load completed
/// (so the engine's upstream subscriptions exist before any value can arrive).
/// </summary>
/// <param name="DepRefs">Distinct (ordinal) dependency refs of the enabled alarms that were loaded.</param>
/// <param name="Generation">
/// The load generation captured when <see cref="OnApply"/> kicked the load off; a stale
/// continuation (an earlier generation completing after a newer apply) is discarded in
/// <see cref="OnAlarmsLoaded"/> so out-of-order completions can't register stale dep refs.
/// </param>
private sealed record AlarmsLoaded(IReadOnlyList<string> DepRefs, int Generation);
/// <summary>
/// Pipe-back marker for an engine load that was cancelled because the actor is stopping
/// (its cancellation source is cancelled in <see cref="PostStop"/>). It is a no-op — there is
/// nothing to register and no fault to log — and exists only so a cancelled load pipes back a
/// quiet message instead of a <c>Status.Failure</c> that would log a spurious Warning on every
/// clean shutdown.
/// </summary>
private sealed record AlarmsLoadCanceled;
/// <summary>OPC UA publish actor that receives the alarm state updates bridged from engine emissions.</summary>
private readonly IActorRef _publishActor;

/// <summary>Optional dependency multiplexer; when null no mux interest is registered or unregistered.</summary>
private readonly IActorRef? _mux;

/// <summary>Upstream the host pushes live dependency values into (see <see cref="OnDependencyChanged"/>).</summary>
private readonly DependencyMuxTagUpstreamSource _upstream;

/// <summary>The scripted-alarm engine this host loads, drives, and disposes in <see cref="PostStop"/>.</summary>
private readonly ScriptedAlarmEngine _engine;

private readonly ILoggingAdapter _log = Context.GetLogger();

/// <summary>Cancels any in-flight engine load when the actor stops.</summary>
private readonly CancellationTokenSource _cts = new();

/// <summary>
/// The handler subscribed to the engine's OnEvent. Kept in a field so <see cref="PostStop"/> can
/// unsubscribe the exact same delegate instance. It is typed on <see cref="ScriptedAlarmEvent"/>
/// because the handler forwards its event argument straight into an <see cref="EngineEmission"/>.
/// </summary>
private readonly EventHandler<ScriptedAlarmEvent> _onEngineEvent;

/// <summary>
/// Monotonic load generation, bumped on every <see cref="OnApply"/>. The continuation that
/// pipes back an <see cref="AlarmsLoaded"/> captures the generation it was started under; a stale
/// completion (an earlier generation arriving after a newer apply) is discarded in
/// <see cref="OnAlarmsLoaded"/> so out-of-order loads never register superseded dep refs.
/// </summary>
private int _loadGeneration;

/// <summary>
/// Cached cluster DistributedPubSub mediator, resolved once in <see cref="PreStart"/> (on the
/// actor thread) and reused for every emission instead of re-resolving it per-publish.
/// </summary>
private IActorRef _mediator = null!;
/// <summary>Builds the <see cref="Akka.Actor.Props"/> used to spawn a <see cref="ScriptedAlarmHostActor"/>.</summary>
/// <param name="publishActor">The OPC UA publish actor that consumes the alarm state updates
/// bridged from engine emissions.</param>
/// <param name="mux">Optional dependency multiplexer the host registers interest with so it
/// receives a value-changed message per dependency tag. Null on the dev/Mac path (no live values).</param>
/// <param name="upstream">The mux-fed upstream the engine reads + subscribes from. MUST be the
/// same instance the <paramref name="engine"/> was constructed around.</param>
/// <param name="engine">The scripted-alarm engine this host owns + disposes.</param>
/// <returns>Props whose factory forwards all four arguments to the actor's constructor.</returns>
public static Props Props(
    IActorRef publishActor,
    IActorRef? mux,
    DependencyMuxTagUpstreamSource upstream,
    ScriptedAlarmEngine engine)
{
    // The method shares its name with the Props type, so the type is spelled out in full here.
    return Akka.Actor.Props.Create(() => new ScriptedAlarmHostActor(publishActor, mux, upstream, engine));
}
/// <summary>
/// Initializes a new instance of the <see cref="ScriptedAlarmHostActor"/> class: validates the
/// collaborators, subscribes the thread-marshalling OnEvent handler, and registers one typed
/// message handler per message this actor understands.
/// </summary>
/// <param name="publishActor">The OPC UA publish actor emissions are bridged to.</param>
/// <param name="mux">Optional dependency multiplexer the host registers dependency interest with.</param>
/// <param name="upstream">The mux-fed upstream the engine reads + subscribes from (same instance the engine wraps).</param>
/// <param name="engine">The scripted-alarm engine this host owns + disposes.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="publishActor"/>, <paramref name="upstream"/> or <paramref name="engine"/> is null.
/// </exception>
public ScriptedAlarmHostActor(
    IActorRef publishActor,
    IActorRef? mux,
    DependencyMuxTagUpstreamSource upstream,
    ScriptedAlarmEngine engine)
{
    ArgumentNullException.ThrowIfNull(publishActor);
    ArgumentNullException.ThrowIfNull(upstream);
    ArgumentNullException.ThrowIfNull(engine);

    _publishActor = publishActor;
    _mux = mux;
    _upstream = upstream;
    _engine = engine;

    // OnEvent fires on the engine's worker thread. NEVER touch Context / actor state here —
    // marshal onto the actor thread via the thread-safe Self.Tell. Keep the handler in a field
    // so PostStop can unsubscribe it. (Self is captured once; it is stable for the actor's life.)
    var self = Self;
    _onEngineEvent = (_, e) => self.Tell(new EngineEmission(e));
    _engine.OnEvent += _onEngineEvent;

    // Every registration names its message type explicitly: the type argument cannot be inferred
    // from a method group or from an untyped `_ => { }` lambda.
    Receive<ApplyScriptedAlarms>(OnApply);
    Receive<AlarmsLoaded>(OnAlarmsLoaded);
    Receive<VirtualTagActor.DependencyValueChanged>(OnDependencyChanged);
    Receive<EngineEmission>(OnEngineEmission);

    // Inbound OPC UA Part 9 alarm method calls arrive as AlarmCommands on the cluster
    // `alarm-commands` DPS topic (T18 publishes them after the AlarmAck role gate). The topic is a
    // cluster-wide broadcast — every host node receives every command — so OnAlarmCommand filters to
    // the alarms THIS host's engine owns before driving the matching engine op. The engine ops are
    // async, and this project's Akka analyzer (AK2003) forbids an async-void Receive delegate, so
    // the handler is a Task-returning ReceiveAsync: Akka suspends the mailbox until the op completes
    // (ordered, awaited on the actor thread) and routes any escaped fault through supervision.
    ReceiveAsync<AlarmCommand>(OnAlarmCommand);

    // A faulted LoadAsync pipes back a Status.Failure (see OnApply) — log it and stay inert so the
    // failure doesn't hit the dead-letter log.
    Receive<Status.Failure>(OnLoadFailed);

    // A LoadAsync cancelled by PostStop's _cts pipes back this marker. The actor is stopping, so
    // there's nothing to do — swallow it quietly (no Warning, no dead letter).
    Receive<AlarmsLoadCanceled>(_ => { });

    // DPS Subscribe (PreStart) acks back here once the mediator has registered Self on the topic.
    // No-op — the subscription is live the moment the ack arrives; we only need to keep it off the
    // dead-letter log. Matches OpcUaPublishActor / DriverHostActor's SubscribeAck convention.
    Receive<SubscribeAck>(_ => { /* PubSub ack */ });
}
/// <summary>
/// Starts an engine reload for the enabled subset of the requested plans and pipes the load's
/// outcome back to this actor as <see cref="AlarmsLoaded"/>, <see cref="AlarmsLoadCanceled"/>
/// or a <c>Status.Failure</c>.
/// </summary>
/// <param name="msg">The desired plan set.</param>
private void OnApply(ApplyScriptedAlarms msg)
{
    // Each apply gets its own generation number. Back-to-back applies start concurrent loads
    // whose completions may arrive out of order; OnAlarmsLoaded keeps only the newest one.
    _loadGeneration += 1;
    var generation = _loadGeneration;

    // Disabled plans are simply left out: the engine has no Enabled flag, so "disabled" means
    // "not loaded" (no predicate, no upstream subscription, no events).
    var activePlans = msg.Plans.Where(plan => plan.Enabled).ToList();
    var definitions = activePlans.Select(plan => ToDefinition(plan)).ToList();

    // Distinct dependency refs of the alarms being loaded. Mux interest for them is registered
    // only once the load has completed (OnAlarmsLoaded), because the engine sets up its upstream
    // subscriptions inside LoadAsync and values must not arrive before that.
    var interestRefs = activePlans
        .SelectMany(plan => plan.DependencyRefs)
        .Distinct(StringComparer.Ordinal)
        .ToList();

    var load = _engine.LoadAsync(definitions, _cts.Token);

    // The continuation is scheduled with CancellationToken.None so it ALWAYS runs, even after
    // _cts fired on stop, and turns the load's outcome into exactly one message:
    //   cancelled -> quiet AlarmsLoadCanceled (the actor is stopping);
    //   faulted   -> Status.Failure, logged as a Warning (the previous generation's mux
    //                subscription is left alone — a failed reload must not drop live values);
    //   succeeded -> AlarmsLoaded with the refs and the generation this load ran under.
    var outcome = load.ContinueWith<object>(
        finished =>
        {
            if (finished.IsCanceled)
            {
                return new AlarmsLoadCanceled();
            }

            if (finished.IsFaulted)
            {
                return new Status.Failure(finished.Exception!.GetBaseException());
            }

            return new AlarmsLoaded(interestRefs, generation);
        },
        CancellationToken.None,
        TaskContinuationOptions.ExecuteSynchronously,
        TaskScheduler.Default);

    // PipeTo delivers the outcome as a message, i.e. back on the actor thread.
    outcome.PipeTo(Self);

    _log.Debug("ScriptedAlarmHost: applying (enabled={Enabled}/{Total}, depRefs={Refs})",
        activePlans.Count, msg.Plans.Count, interestRefs.Count);
}
/// <summary>
/// Logs a faulted engine load (piped back by <see cref="OnApply"/>) as a Warning and takes no
/// further action.
/// </summary>
/// <param name="msg">The failure whose <c>Cause</c> is the load's base exception.</param>
private void OnLoadFailed(Status.Failure msg)
{
    _log.Warning(msg.Cause, "ScriptedAlarmHost: engine LoadAsync failed — alarms not (re)loaded");
}
/// <summary>
/// Registers mux interest for the dependency refs of the load that just completed, unless a
/// newer apply has superseded it.
/// </summary>
/// <param name="msg">The completed load's dependency refs and generation.</param>
private void OnAlarmsLoaded(AlarmsLoaded msg)
{
    // A completion from an older generation is stale: registering its refs would bring the
    // superseded interest set back. The newest generation's completion registers the current set.
    var isCurrent = msg.Generation == _loadGeneration;
    if (!isCurrent)
    {
        return;
    }

    var refs = msg.DepRefs;

    // The load has returned, so the engine's upstream subscriptions exist and any value the mux
    // delivers from now on will be observed. RegisterInterest replaces this subscriber's previous
    // interest set (see DependencyMuxActor), so no explicit unregister is needed on re-apply.
    if (_mux is not null)
    {
        _mux.Tell(new DependencyMuxActor.RegisterInterest(refs, Self));
    }

    _log.Debug("ScriptedAlarmHost: loaded; registered mux interest for {Count} dep refs", refs.Count);
}
/// <summary>
/// Pushes a live dependency value into the upstream source the engine subscribes from.
/// </summary>
/// <param name="msg">The changed tag's id, value and timestamp.</param>
private void OnDependencyChanged(VirtualTagActor.DependencyValueChanged msg)
{
    // Status code 0 = Good: the mux only forwards values it received from a driver publish, so
    // they are treated as Good-quality. The message's single timestamp fills both timestamp slots.
    var observedAt = msg.TimestampUtc;
    var snapshot = new DataValueSnapshot(msg.Value, 0u, observedAt, observedAt);

    _upstream.Push(msg.TagId, snapshot);
}
/// <summary>
/// Fans one engine emission out to its two sinks: the OPC UA publish actor (full condition
/// state) and the cluster <c>alerts</c> topic (transition event).
/// </summary>
/// <param name="msg">The emission marshalled onto the actor thread by the OnEvent handler.</param>
private void OnEngineEmission(EngineEmission msg)
{
    var emitted = msg.Event;

    // None = no meaningful change; Suppressed = shelving ate the emission. Neither reaches a sink.
    // (The engine already filters these out of BuildEmission; this is a defensive guard.)
    switch (emitted.Emission)
    {
        case EmissionKind.None:
        case EmissionKind.Suppressed:
            return;
    }

    // Sink 1 — OPC UA: project the FULL Part 9 condition state (enabled/active/acked/confirmed/
    // shelving/severity/message) onto the materialised condition node via the Commons snapshot.
    // The AlarmId doubles as the condition's NodeId (T14 aligned it to the ScriptedAlarmId).
    var nodeUpdate = new OpcUaPublishActor.AlarmStateUpdate(
        AlarmNodeId: emitted.AlarmId,
        State: ToSnapshot(emitted),
        TimestampUtc: emitted.TimestampUtc);
    _publishActor.Tell(nodeUpdate);

    // Sink 2 — cluster `alerts` topic: the single historization + live fan-out path. The mediator
    // was cached on the ACTOR thread in PreStart; here it is only told.
    var transition = new AlarmTransitionEvent(
        AlarmId: emitted.AlarmId,
        EquipmentPath: emitted.EquipmentPath,
        AlarmName: emitted.AlarmName,
        TransitionKind: emitted.Emission.ToString(),
        Severity: SeverityToInt(emitted.Severity),
        Message: emitted.Message,
        User: TransitionUser(emitted),
        TimestampUtc: emitted.TimestampUtc);
    _mediator.Tell(new Publish(AlertsTopic, transition));
}
/// <summary>
/// Drives an inbound OPC UA Part 9 alarm method call (delivered as an <see cref="AlarmCommand"/>
/// on the cluster <c>alarm-commands</c> topic) onto the matching engine operation.
/// </summary>
/// <remarks>
/// <para>
/// <b>Ownership filter.</b> The topic is a cluster-wide broadcast; every host node receives
/// every command, but each owns a disjoint subset of alarms (its engine's loaded set). A
/// command for an alarm this engine does NOT own is a no-op — the owning node will act on it.
/// </para>
/// <para>
/// <b>No re-projection.</b> On success the engine op raises OnEvent, which already marshals
/// back to <see cref="OnEngineEmission"/> and re-projects the condition to the OPC UA node +
/// the <c>alerts</c> topic. This handler therefore only calls the op and awaits it; it never
/// touches the publish actor directly.
/// </para>
/// <para>
/// <b>Async on the actor thread.</b> The handler is Task-returning and registered through
/// ReceiveAsync (this project's AK2003 analyzer forbids an async-void Receive delegate), so
/// Akka suspends the mailbox until the returned task completes. The engine call is wrapped in
/// a try/catch: a faulting op is logged as a Warning, like <see cref="OnLoadFailed"/>, and
/// swallowed rather than faulting the actor.
/// </para>
/// </remarks>
/// <param name="cmd">The inbound alarm command.</param>
private async Task OnAlarmCommand(AlarmCommand cmd)
{
    // Ownership filter FIRST: the same command lands on every host, and only the owner acts.
    var ownedHere = _engine.LoadedAlarmIds.Contains(cmd.AlarmId);
    if (!ownedHere)
    {
        _log.Debug("ScriptedAlarmHost: ignoring AlarmCommand {Op} for unowned alarm {AlarmId}",
            cmd.Operation, cmd.AlarmId);
        return;
    }

    try
    {
        await DispatchOwnedAlarmCommandAsync(cmd);
    }
    catch (Exception ex)
    {
        // A failing engine op must not crash the actor — log and stay inert, as OnLoadFailed does.
        _log.Warning(ex, "ScriptedAlarmHost: engine op {Op} failed for alarm {AlarmId}",
            cmd.Operation, cmd.AlarmId);
    }
}

/// <summary>
/// Maps the command's operation name onto the engine call for an alarm this host owns. Unknown
/// operations and malformed timed shelves are logged and ignored; engine faults propagate to
/// the caller.
/// </summary>
/// <param name="cmd">A command whose alarm is in this engine's loaded set.</param>
private async Task DispatchOwnedAlarmCommandAsync(AlarmCommand cmd)
{
    switch (cmd.Operation)
    {
        case "Acknowledge":
            await _engine.AcknowledgeAsync(cmd.AlarmId, cmd.User, cmd.Comment, CancellationToken.None);
            break;

        case "Confirm":
            await _engine.ConfirmAsync(cmd.AlarmId, cmd.User, cmd.Comment, CancellationToken.None);
            break;

        case "OneShotShelve":
            await _engine.OneShotShelveAsync(cmd.AlarmId, cmd.User, CancellationToken.None);
            break;

        // A timed shelve needs the absolute unshelve instant. T18 derives it from the OPC UA
        // Duration (UtcNow + shelvingTime).
        case "TimedShelve" when cmd.UnshelveAtUtc is { } unshelveAt:
            await _engine.TimedShelveAsync(cmd.AlarmId, cmd.User, unshelveAt, CancellationToken.None);
            break;

        // A TimedShelve without the instant is malformed: log + reject rather than throw.
        case "TimedShelve":
            _log.Warning("ScriptedAlarmHost: rejecting TimedShelve for {AlarmId} — missing UnshelveAtUtc",
                cmd.AlarmId);
            break;

        case "Unshelve":
            await _engine.UnshelveAsync(cmd.AlarmId, cmd.User, CancellationToken.None);
            break;

        case "Enable":
            await _engine.EnableAsync(cmd.AlarmId, cmd.User, CancellationToken.None);
            break;

        case "Disable":
            await _engine.DisableAsync(cmd.AlarmId, cmd.User, CancellationToken.None);
            break;

        case "AddComment":
            // The engine requires a non-null comment text; a null comment is coalesced to empty
            // so a comment-less AddComment stays a valid call rather than an NRE.
            await _engine.AddCommentAsync(cmd.AlarmId, cmd.User, cmd.Comment ?? string.Empty, CancellationToken.None);
            break;

        default:
            _log.Warning("ScriptedAlarmHost: ignoring AlarmCommand with unknown operation {Op} for {AlarmId}",
                cmd.Operation, cmd.AlarmId);
            break;
    }
}
/// <summary>
/// Caches the cluster DistributedPubSub mediator and subscribes this actor to the
/// <c>alarm-commands</c> topic before the base start logic runs.
/// </summary>
protected override void PreStart()
{
    // Resolve the mediator exactly once, on the actor thread; emissions later only Tell it.
    var pubSub = DistributedPubSub.Get(Context.System);
    _mediator = pubSub.Mediator;

    // Inbound OPC UA Part 9 method calls (published by the node manager's condition handlers,
    // T18) arrive on this topic as AlarmCommands. Self is the subscriber, so the SubscribeAck
    // comes back to this actor, where the constructor registered a no-op handler for it.
    var subscription = new Subscribe(AlarmCommandsTopic, Self);
    _mediator.Tell(subscription);

    base.PreStart();
}
/// <summary>
/// Tears the host down in a fixed order: unregister mux interest, cancel any in-flight load,
/// detach the engine's OnEvent handler, dispose the engine, dispose the cancellation source,
/// then run the base stop logic.
/// </summary>
protected override void PostStop()
{
// Unregister mux interest first so no further DependencyValueChanged arrives while we tear down.
// (No-op when no mux was supplied.)
_mux?.Tell(new DependencyMuxActor.UnregisterInterest(Self));
// Cancel any in-flight LoadAsync, detach the OnEvent handler (so a late engine emission can't
// Tell a stopping actor), then dispose the engine (which drains its background work + clears).
// The order matters: the token must be cancelled before the source is disposed, and the handler
// must be detached before the engine is disposed.
_cts.Cancel();
_engine.OnEvent -= _onEngineEvent;
_engine.Dispose();
_cts.Dispose();
base.PostStop();
}
/// <summary>
/// Maps an <see cref="EquipmentScriptedAlarmPlan"/> to the engine's
/// <see cref="ScriptedAlarmDefinition"/>.
/// </summary>
/// <remarks>
/// The plan's <c>AlarmType</c> is a string that is parsed (case-sensitively) as an
/// <see cref="AlarmKind"/>. Parsing alone also accepts numeric strings such as "5" or "-1" and
/// would yield an undefined enum value, so the parsed value is additionally checked with
/// <c>Enum.IsDefined</c>. Anything that does not resolve to a defined member — an unknown name,
/// a null/empty string, or a number with no matching member — falls back to
/// <see cref="AlarmKind.AlarmCondition"/> rather than dropping the alarm.
/// </remarks>
/// <param name="p">The plan to convert.</param>
/// <returns>The engine definition carrying the plan's id, path, name, kind, severity bucket,
/// message template, predicate source and historize/retain flags.</returns>
private static ScriptedAlarmDefinition ToDefinition(EquipmentScriptedAlarmPlan p) => new(
    AlarmId: p.ScriptedAlarmId,
    EquipmentPath: p.EquipmentId,
    AlarmName: p.Name,
    // The enum type must be named explicitly: it cannot be inferred from an `out var` argument.
    Kind: Enum.TryParse<AlarmKind>(p.AlarmType, out var k) && Enum.IsDefined(k) ? k : AlarmKind.AlarmCondition,
    Severity: SeverityFromInt(p.Severity),
    MessageTemplate: p.MessageTemplate,
    PredicateScriptSource: p.PredicateSource,
    HistorizeToAveva: p.HistorizeToAveva,
    Retain: p.Retain);
/// <summary>
/// Flattens an engine event's Core condition state plus its severity/message into the Commons
/// <see cref="AlarmConditionSnapshot"/> sent to the publish actor.
/// </summary>
/// <remarks>
/// Each Core state enum collapses to a bool that is true only for its "positive" member. Severity
/// is the 1..1000 value <see cref="SeverityToInt"/> derives from the coarse engine bucket, cast to
/// ushort. The Core shelving kind is translated by <see cref="MapShelving"/>.
/// </remarks>
/// <param name="e">The engine event to project.</param>
/// <returns>The snapshot describing the condition's current state.</returns>
private static AlarmConditionSnapshot ToSnapshot(ScriptedAlarmEvent e)
{
    var state = e.Condition;

    return new AlarmConditionSnapshot(
        Active: state.Active == AlarmActiveState.Active,
        Acknowledged: state.Acked == AlarmAckedState.Acknowledged,
        Confirmed: state.Confirmed == AlarmConfirmedState.Confirmed,
        Enabled: state.Enabled == AlarmEnabledState.Enabled,
        Shelving: MapShelving(state.Shelving.Kind),
        Severity: (ushort)SeverityToInt(e.Severity),
        Message: e.Message);
}
/// <summary>
/// Translates the Core <see cref="ShelvingKind"/> into its Commons <see cref="AlarmShelvingKind"/>
/// mirror (the Commons assembly can't see the Core enum).
/// </summary>
/// <param name="kind">The Core shelving kind.</param>
/// <returns>OneShot or Timed for the matching Core member; Unshelved for every other value.</returns>
private static AlarmShelvingKind MapShelving(ShelvingKind kind)
{
    switch (kind)
    {
        case ShelvingKind.OneShot:
            return AlarmShelvingKind.OneShot;
        case ShelvingKind.Timed:
            return AlarmShelvingKind.Timed;
        default:
            return AlarmShelvingKind.Unshelved;
    }
}
/// <summary>
/// Resolves the acting user reported on an <see cref="AlarmTransitionEvent"/>.
/// </summary>
/// <remarks>
/// Acknowledged and Confirmed emissions report the user recorded on the condition state
/// (<c>LastAckUser</c> / <c>LastConfirmUser</c>); every other emission, and any case where no
/// user was recorded, reports "system".
/// </remarks>
/// <param name="e">The engine event being published.</param>
/// <returns>The recorded operator, or "system".</returns>
private static string TransitionUser(ScriptedAlarmEvent e)
{
    var kind = e.Emission;
    string? recordedUser;

    if (kind == EmissionKind.Acknowledged)
    {
        recordedUser = e.Condition.LastAckUser;
    }
    else if (kind == EmissionKind.Confirmed)
    {
        recordedUser = e.Condition.LastConfirmUser;
    }
    else
    {
        recordedUser = null;
    }

    return recordedUser ?? "system";
}
// Severity conversion convention: the engine and the plan use different types (engine =
// AlarmSeverity enum; plan + AlarmTransitionEvent = OPC UA 1–1000 int). Integers are bucketed into
// quartiles on the way IN and each bucket is emitted as its ceiling on the way OUT, so a
// round-trip is stable within a bucket.

/// <summary>
/// Buckets an OPC UA 1–1000 severity into the engine's coarse <see cref="AlarmSeverity"/>.
/// </summary>
/// <param name="s">The integer severity from the plan.</param>
/// <returns>Low for values up to 250, Medium up to 500, High up to 750, otherwise Critical.</returns>
private static AlarmSeverity SeverityFromInt(int s) => s switch
{
    <= 250 => AlarmSeverity.Low,
    <= 500 => AlarmSeverity.Medium,
    <= 750 => AlarmSeverity.High,
    _ => AlarmSeverity.Critical,
};
/// <summary>
/// Converts the engine's coarse <see cref="AlarmSeverity"/> back to an OPC UA 1–1000 severity,
/// using each bucket's ceiling.
/// </summary>
/// <param name="s">The engine severity bucket.</param>
/// <returns>250 for Low, 500 for Medium, 750 for High, 1000 for Critical; 500 for any other value.</returns>
private static int SeverityToInt(AlarmSeverity s)
{
    switch (s)
    {
        case AlarmSeverity.Low:
            return 250;
        case AlarmSeverity.Medium:
            return 500;
        case AlarmSeverity.High:
            return 750;
        case AlarmSeverity.Critical:
            return 1000;
        default:
            return 500;
    }
}
}