feat(scripted-alarms): ScriptedAlarmHostActor — engine runtime host (T9)

This commit is contained in:
Joseph Doherty
2026-06-10 14:57:42 -04:00
parent c9590c03d0
commit 3b418a54f1
2 changed files with 462 additions and 0 deletions
@@ -0,0 +1,289 @@
using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
namespace ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms;
/// <summary>
/// Akka host that owns one <see cref="ScriptedAlarmEngine"/> for an Equipment-namespace
/// driver node, feeds it live tag values, and bridges its emissions to OPC UA publish + the
/// cluster <c>alerts</c> DistributedPubSub topic.
///
/// <para>
/// The host is the engine's lifecycle owner: the caller (the driver-role startup) builds
/// the <see cref="DependencyMuxTagUpstreamSource"/>, constructs a <see cref="ScriptedAlarmEngine"/>
/// <b>around that same upstream</b>, and passes both here. The host disposes the engine in
/// <see cref="PostStop"/>.
/// </para>
///
/// <para>
/// <b>Data flow.</b> <see cref="DependencyMuxActor"/> delivers a
/// <see cref="VirtualTagActor.DependencyValueChanged"/> for every tag the loaded alarms
/// depend on; the host pushes each into the upstream. The engine self-evaluates: its own
/// <see cref="ITagUpstreamSource.SubscribeTag"/> observer re-runs the predicates and raises
/// <see cref="ScriptedAlarmEngine.OnEvent"/>. The host does NOT call evaluate — it only
/// feeds values in and fans emissions out.
/// </para>
///
/// <para>
/// <b>Thread-safety.</b> <see cref="ScriptedAlarmEngine.OnEvent"/> fires on the engine's
/// background worker thread (its fire-and-forget re-evaluation), NOT on the actor thread.
/// The ctor subscribes a handler that does nothing but <c>Self.Tell(new EngineEmission(e))</c>
/// — <see cref="ICanTell.Tell"/> is thread-safe. ALL sink work (touching <see cref="Context"/>,
/// the publish actor ref, the DPS mediator) then happens on the actor thread in
/// <see cref="OnEngineEmission"/>. No <see cref="Context"/> or actor state is ever read or
/// written from the OnEvent callback.
/// </para>
///
/// <para>
/// <b>Historization.</b> The host publishes each transition as an
/// <see cref="AlarmTransitionEvent"/> to the <c>alerts</c> topic ONLY. That topic is the
/// historization path: <c>HistorianAdapterActor</c>'s upstream and the Admin UI Alerts page
/// both consume <c>alerts</c>. The host deliberately does NOT also <c>Tell</c> the historian
/// adapter directly — doing so would double-historize every transition. (T9 plan called for a
/// "direct historian tell"; that was dropped because the alerts topic already feeds the
/// historian path, so a direct tell would duplicate every row.)
/// </para>
/// </summary>
public sealed class ScriptedAlarmHostActor : ReceiveActor
{
/// <summary>The cluster DistributedPubSub topic every alarm transition is published to. Matches
/// the constant the (retired) <c>ScriptedAlarmActor</c> used so subscribers stay wired.</summary>
public const string AlertsTopic = "alerts";
/// <summary>Reconcile the loaded alarm set to exactly the enabled subset of <paramref name="Plans"/>:
/// builds <see cref="ScriptedAlarmDefinition"/>s (skipping disabled plans), reloads the engine, and
/// re-registers mux interest for the union of dependency refs.</summary>
/// <param name="Plans">The desired Equipment-namespace scripted-alarm plans.</param>
public sealed record ApplyScriptedAlarms(IReadOnlyList<EquipmentScriptedAlarmPlan> Plans);
/// <summary>Marshals an engine emission off the engine's worker thread onto the actor thread.
/// Carries the <see cref="ScriptedAlarmEvent"/> the engine raised on <c>OnEvent</c>.</summary>
private sealed record EngineEmission(ScriptedAlarmEvent Event);
/// <summary>Pipe-back completion of an in-flight <see cref="ScriptedAlarmEngine.LoadAsync"/>:
/// carries the union of dependency refs to register mux interest for AFTER the load completed
/// (so the engine's upstream subscriptions exist before any value can arrive).</summary>
private sealed record AlarmsLoaded(IReadOnlyList<string> DepRefs);
/// <summary>OPC UA publish actor; receives an <see cref="OpcUaPublishActor.AlarmStateUpdate"/> for
/// every engine emission the host forwards.</summary>
private readonly IActorRef _publishActor;
/// <summary>Optional dependency multiplexer. When null, the host neither registers nor unregisters
/// mux interest (every use is null-conditional).</summary>
private readonly IActorRef? _mux;
/// <summary>Upstream source that each incoming <see cref="VirtualTagActor.DependencyValueChanged"/>
/// value is pushed into.</summary>
private readonly DependencyMuxTagUpstreamSource _upstream;
/// <summary>The engine this host loads definitions into, subscribes to via <c>OnEvent</c>, and
/// disposes in <see cref="PostStop"/>.</summary>
private readonly ScriptedAlarmEngine _engine;
/// <summary>UTC clock supplied by the caller (defaults to <see cref="DateTime.UtcNow"/>).
/// NOTE(review): assigned in the constructor but not read anywhere in this class as shown —
/// confirm whether it is still needed.</summary>
private readonly Func<DateTime> _clock;
/// <summary>Akka logging adapter bound to this actor's context.</summary>
private readonly ILoggingAdapter _log = Context.GetLogger();
/// <summary>Cancellation source whose token is handed to <see cref="ScriptedAlarmEngine.LoadAsync"/>;
/// cancelled and disposed in <see cref="PostStop"/>.</summary>
private readonly CancellationTokenSource _cts = new();
/// <summary>The delegate subscribed to the engine's <c>OnEvent</c>; kept in a field so
/// <see cref="PostStop"/> can unsubscribe the very same instance.</summary>
private readonly EventHandler<ScriptedAlarmEvent> _onEngineEvent;
/// <summary>Builds the <see cref="Akka.Actor.Props"/> used to spawn a
/// <see cref="ScriptedAlarmHostActor"/> with the given collaborators.</summary>
/// <param name="publishActor">OPC UA publish actor that receives the
/// <see cref="OpcUaPublishActor.AlarmStateUpdate"/> messages bridged from engine emissions.</param>
/// <param name="mux">Optional dependency multiplexer. When supplied, the host registers interest
/// with it so a <see cref="VirtualTagActor.DependencyValueChanged"/> arrives per dependency tag.
/// Null on the dev/Mac path (no live values).</param>
/// <param name="upstream">Mux-fed upstream the engine reads and subscribes from. MUST be the very
/// instance the <paramref name="engine"/> was constructed around.</param>
/// <param name="engine">The scripted-alarm engine the host owns and disposes.</param>
/// <param name="clock">Optional UTC clock; <see cref="DateTime.UtcNow"/> is used when omitted.</param>
/// <returns>Props that construct the actor through its public constructor.</returns>
public static Props Props(
    IActorRef publishActor,
    IActorRef? mux,
    DependencyMuxTagUpstreamSource upstream,
    ScriptedAlarmEngine engine,
    Func<DateTime>? clock = null)
{
    return Akka.Actor.Props.Create(
        () => new ScriptedAlarmHostActor(publishActor, mux, upstream, engine, clock));
}
/// <summary>Creates the host: validates its collaborators, hooks the engine's <c>OnEvent</c>, and
/// registers the actor's message handlers.</summary>
/// <param name="publishActor">The OPC UA publish actor emissions are bridged to.</param>
/// <param name="mux">Optional dependency multiplexer the host registers dependency interest with.</param>
/// <param name="upstream">The mux-fed upstream the engine reads + subscribes from (same instance the engine wraps).</param>
/// <param name="engine">The scripted-alarm engine this host owns + disposes.</param>
/// <param name="clock">Optional UTC clock; defaults to <see cref="DateTime.UtcNow"/>.</param>
/// <exception cref="ArgumentNullException"><paramref name="publishActor"/>,
/// <paramref name="upstream"/> or <paramref name="engine"/> is null.</exception>
public ScriptedAlarmHostActor(
    IActorRef publishActor,
    IActorRef? mux,
    DependencyMuxTagUpstreamSource upstream,
    ScriptedAlarmEngine engine,
    Func<DateTime>? clock = null)
{
    ArgumentNullException.ThrowIfNull(publishActor);
    ArgumentNullException.ThrowIfNull(upstream);
    ArgumentNullException.ThrowIfNull(engine);

    (_publishActor, _mux, _upstream, _engine) = (publishActor, mux, upstream, engine);
    _clock = clock ?? (() => DateTime.UtcNow);

    // The engine raises OnEvent on its own worker thread, so the handler must not touch Context
    // or any actor state. It only forwards the event to this actor's mailbox through a reference
    // captured here, on the actor thread. The delegate is stored so PostStop can detach it.
    var hostRef = Self;
    _onEngineEvent = (_, alarmEvent) => hostRef.Tell(new EngineEmission(alarmEvent));
    _engine.OnEvent += _onEngineEvent;

    Receive<ApplyScriptedAlarms>(OnApply);
    Receive<AlarmsLoaded>(OnAlarmsLoaded);
    Receive<VirtualTagActor.DependencyValueChanged>(OnDependencyChanged);
    Receive<EngineEmission>(OnEngineEmission);

    // OnApply pipes a Status.Failure back when LoadAsync faults; handling it here (log only)
    // keeps that failure out of the dead-letter log.
    Receive<Status.Failure>(OnLoadFailed);
}
/// <summary>
/// Handles <see cref="ApplyScriptedAlarms"/>: converts the enabled plans to engine definitions,
/// starts an asynchronous engine load, and pipes the outcome back to this actor as either
/// <see cref="AlarmsLoaded"/> (load ran to completion) or <see cref="Status.Failure"/> (load
/// faulted or was cancelled).
/// </summary>
/// <param name="msg">The desired set of scripted-alarm plans; disabled plans are ignored.</param>
private void OnApply(ApplyScriptedAlarms msg)
{
    // Skip disabled plans entirely — the engine has no Enabled flag, so a disabled alarm is simply
    // not loaded (no predicate, no upstream subscription, no events).
    var enabled = msg.Plans.Where(p => p.Enabled).ToList();
    var defs = enabled.Select(ToDefinition).ToList();

    // Union of dependency refs across the loaded (enabled) alarms — interest is registered with
    // the mux only AFTER LoadAsync completes (see OnAlarmsLoaded), because the engine establishes
    // its upstream SubscribeTag subscriptions inside LoadAsync; values must not arrive before then.
    var depRefs = enabled
        .SelectMany(p => p.DependencyRefs)
        .Distinct(StringComparer.Ordinal)
        .ToList();

    // PipeTo marshals the completion back onto the actor thread. Only a load that ran to
    // completion may produce AlarmsLoaded: a faulted OR cancelled load becomes a Status.Failure,
    // so interest is never registered for alarms the engine did not actually load (the prior
    // generation's subscription, if any, stays — a failed reload should not silently drop live
    // values). Checking IsFaulted alone would report a cancelled load as a success.
    _engine.LoadAsync(defs, _cts.Token)
        .ContinueWith(
            t => t.IsCompletedSuccessfully
                ? (object)new AlarmsLoaded(depRefs)
                : new Status.Failure(FailureCause(t)),
            _cts.Token,
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default)
        .PipeTo(Self);

    _log.Debug("ScriptedAlarmHost: applying (enabled={Enabled}/{Total}, depRefs={Refs})",
        enabled.Count, msg.Plans.Count, depRefs.Count);

    // Picks the exception to report for a load that did not complete successfully: the single
    // inner exception when the AggregateException wraps exactly one (so the log shows the real
    // cause), the aggregate itself when there are several, or a TaskCanceledException when the
    // task was cancelled and therefore carries no exception at all.
    static Exception FailureCause(Task load) =>
        load.Exception is { } aggregate
            ? (aggregate.InnerExceptions.Count == 1 ? aggregate.InnerExceptions[0] : aggregate)
            : new TaskCanceledException(load);
}
/// <summary>Logs a <see cref="Status.Failure"/> delivered to this actor as a Warning. The actor
/// takes no other action, so whatever the engine had loaded before remains in effect.</summary>
/// <param name="msg">The failure whose <c>Cause</c> is logged.</param>
private void OnLoadFailed(Status.Failure msg)
{
    _log.Warning(msg.Cause, "ScriptedAlarmHost: engine LoadAsync failed — alarms not (re)loaded");
}
/// <summary>Runs once an engine load has finished: registers this actor's interest in the loaded
/// alarms' dependency refs with the mux (when one is configured) and logs the count.</summary>
/// <param name="msg">Carries the dependency refs gathered when the load was started.</param>
private void OnAlarmsLoaded(AlarmsLoaded msg)
{
    var refs = msg.DepRefs;

    // Interest is registered only at this point, after LoadAsync has returned, so the engine's
    // upstream subscriptions already exist when the mux starts delivering values. Per
    // DependencyMuxActor, RegisterInterest replaces the subscriber's previous interest set, which
    // is why a re-apply needs no explicit unregister.
    if (_mux is not null)
    {
        _mux.Tell(new DependencyMuxActor.RegisterInterest(refs, Self));
    }

    _log.Debug("ScriptedAlarmHost: loaded; registered mux interest for {Count} dep refs", refs.Count);
}
/// <summary>Pushes a live dependency value into the upstream the engine subscribes from.</summary>
/// <param name="msg">The changed tag's id, value and UTC timestamp.</param>
private void OnDependencyChanged(VirtualTagActor.DependencyValueChanged msg)
{
    // StatusCode 0 = Good. The mux only forwards values it received from a driver publish, so
    // they are treated as Good-quality.
    const uint GoodStatus = 0u;

    // The message carries a single timestamp; it is used for both timestamp slots of the snapshot.
    var stamp = msg.TimestampUtc;
    var snapshot = new DataValueSnapshot(msg.Value, GoodStatus, stamp, stamp);

    _upstream.Push(msg.TagId, snapshot);
}
/// <summary>Fans one engine emission out to its two sinks: an
/// <see cref="OpcUaPublishActor.AlarmStateUpdate"/> to the publish actor and an
/// <see cref="AlarmTransitionEvent"/> on the <see cref="AlertsTopic"/> pub-sub topic.
/// Emissions of kind <c>None</c> or <c>Suppressed</c> are dropped.</summary>
/// <param name="msg">Wrapper around the event the engine raised.</param>
private void OnEngineEmission(EngineEmission msg)
{
    var alarmEvent = msg.Event;

    // None = no meaningful change; Suppressed = shelving ate the emission. Neither may reach a
    // sink. The engine is expected to filter these already; this is a defensive guard.
    switch (alarmEvent.Emission)
    {
        case EmissionKind.None:
        case EmissionKind.Suppressed:
            return;
    }

    var condition = alarmEvent.Condition;
    var isActive = condition.Active == AlarmActiveState.Active;
    var isAcknowledged = condition.Acked == AlarmAckedState.Acknowledged;

    // OPC UA side: drive the alarm node's Active / Acknowledged state. The alarm id doubles as the
    // node id for now — T14 is planned to materialise the condition node at an aligned NodeId.
    _publishActor.Tell(new OpcUaPublishActor.AlarmStateUpdate(
        AlarmNodeId: alarmEvent.AlarmId,
        Active: isActive,
        Acknowledged: isAcknowledged,
        TimestampUtc: alarmEvent.TimestampUtc));

    // Cluster side: the `alerts` topic is the single historization + live fan-out path.
    var transition = new AlarmTransitionEvent(
        AlarmId: alarmEvent.AlarmId,
        EquipmentPath: alarmEvent.EquipmentPath,
        AlarmName: alarmEvent.AlarmName,
        TransitionKind: alarmEvent.Emission.ToString(),
        Severity: SeverityToInt(alarmEvent.Severity),
        Message: alarmEvent.Message,
        User: TransitionUser(alarmEvent),
        TimestampUtc: alarmEvent.TimestampUtc);

    // The mediator is resolved here, on the actor thread, never from the engine callback.
    var mediator = DistributedPubSub.Get(Context.System).Mediator;
    mediator.Tell(new Publish(AlertsTopic, transition));
}
/// <summary>
/// Tears the host down: unregisters mux interest, cancels any in-flight load, detaches the engine
/// event handler, disposes the engine, then disposes the cancellation source and runs the base
/// hook. Each later step is placed in a <c>finally</c> so an exception thrown by an earlier step
/// (for example from <c>Cancel</c> or the engine's <c>Dispose</c>) cannot skip the remaining
/// cleanup; the exception itself still propagates.
/// </summary>
protected override void PostStop()
{
    // Unregister mux interest first so no further DependencyValueChanged arrives while we tear down.
    _mux?.Tell(new DependencyMuxActor.UnregisterInterest(Self));

    try
    {
        // Cancel any in-flight LoadAsync.
        _cts.Cancel();
    }
    finally
    {
        // Detach the OnEvent handler so a late engine emission can't Tell a stopping actor.
        _engine.OnEvent -= _onEngineEvent;

        try
        {
            // Dispose the engine (which drains its background work + clears).
            _engine.Dispose();
        }
        finally
        {
            _cts.Dispose();
            base.PostStop();
        }
    }
}
/// <summary>Maps an <see cref="EquipmentScriptedAlarmPlan"/> to the engine's
/// <see cref="ScriptedAlarmDefinition"/>. <see cref="EquipmentScriptedAlarmPlan.AlarmType"/> is a
/// string that must name a defined <see cref="AlarmKind"/> member; anything else falls back to
/// <see cref="AlarmKind.AlarmCondition"/> rather than dropping the alarm.</summary>
/// <param name="p">The plan to convert.</param>
/// <returns>The equivalent engine definition.</returns>
private static ScriptedAlarmDefinition ToDefinition(EquipmentScriptedAlarmPlan p) => new(
    AlarmId: p.ScriptedAlarmId,
    EquipmentPath: p.EquipmentId,
    AlarmName: p.Name,
    // Enum.TryParse also succeeds for numeric text (e.g. "42") and comma-separated lists, which
    // can yield a value that is not a declared AlarmKind member. Enum.IsDefined rejects those so
    // the documented fallback really applies to every unrecognised type string.
    Kind: Enum.TryParse<AlarmKind>(p.AlarmType, out var k) && Enum.IsDefined(k)
        ? k
        : AlarmKind.AlarmCondition,
    Severity: SeverityFromInt(p.Severity),
    MessageTemplate: p.MessageTemplate,
    PredicateScriptSource: p.PredicateSource,
    HistorizeToAveva: p.HistorizeToAveva,
    Retain: p.Retain);
/// <summary>Resolves the user recorded on an <see cref="AlarmTransitionEvent"/>. Acknowledged and
/// Confirmed emissions report the user stored on the condition (<c>LastAckUser</c> /
/// <c>LastConfirmUser</c>), or <c>"system"</c> when that is null; every other emission kind
/// reports <c>"system"</c>.</summary>
/// <param name="e">The engine event being published.</param>
/// <returns>The user name to publish; never null.</returns>
private static string TransitionUser(ScriptedAlarmEvent e)
{
    const string SystemUser = "system";

    switch (e.Emission)
    {
        case EmissionKind.Acknowledged:
            return e.Condition.LastAckUser ?? SystemUser;
        case EmissionKind.Confirmed:
            return e.Condition.LastConfirmUser ?? SystemUser;
        default:
            return SystemUser;
    }
}
// Severity conversion convention: the engine + plan disagree on type (engine = AlarmSeverity enum,
// plan + AlarmTransitionEvent = OPC UA 1–1000 int). We bucket the int into quartiles on the way IN
// and emit the quartile ceiling on the way OUT, so a round-trip is stable within a bucket.
/// <summary>Collapses an OPC UA 1–1000 integer severity into the engine's coarse
/// <see cref="AlarmSeverity"/> enum: values up to 250 are Low, up to 500 Medium, up to 750 High,
/// and anything above 750 is Critical.</summary>
/// <param name="s">The integer severity from the plan.</param>
/// <returns>The bucket the value falls into.</returns>
private static AlarmSeverity SeverityFromInt(int s) => s switch
{
    <= 250 => AlarmSeverity.Low,
    <= 500 => AlarmSeverity.Medium,
    <= 750 => AlarmSeverity.High,
    _ => AlarmSeverity.Critical,
};
/// <summary>Converts the engine's coarse <see cref="AlarmSeverity"/> back to an OPC UA 1–1000
/// integer severity, using the ceiling of each bucket: Low=250, Medium=500, High=750,
/// Critical=1000. Any other value maps to 500.</summary>
/// <param name="s">The engine severity.</param>
/// <returns>The integer severity to publish.</returns>
private static int SeverityToInt(AlarmSeverity s)
{
    switch (s)
    {
        case AlarmSeverity.Low:
            return 250;
        case AlarmSeverity.Medium:
            return 500;
        case AlarmSeverity.High:
            return 750;
        case AlarmSeverity.Critical:
            return 1000;
        default:
            return 500;
    }
}
}
@@ -0,0 +1,173 @@
using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.TestKit;
using Serilog;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts;
using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
using ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms;
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.ScriptedAlarms;
/// <summary>
/// Verifies <see cref="ScriptedAlarmHostActor"/> loads the enabled subset of
/// <see cref="EquipmentScriptedAlarmPlan"/>s into its <see cref="ScriptedAlarmEngine"/>, registers mux
/// interest for their dependency refs after the load completes, feeds live
/// <see cref="VirtualTagActor.DependencyValueChanged"/> values into the engine, and bridges the
/// engine's emissions to both an <see cref="OpcUaPublishActor.AlarmStateUpdate"/> and an
/// <see cref="AlarmTransitionEvent"/> on the cluster <c>alerts</c> topic.
/// </summary>
public sealed class ScriptedAlarmHostActorTests : RuntimeActorTestBase
{
/// <summary>Upper bound passed to every probe expectation (<c>ExpectMsg</c> /
/// <c>FishForMessage</c>) in this fixture.</summary>
private static readonly TimeSpan Timeout = TimeSpan.FromSeconds(8);
/// <summary>Builds a test plan whose predicate script returns true when the integer value of tag
/// <paramref name="depRef"/> exceeds <paramref name="threshold"/>. With the defaults this is the
/// enabled alarm "alm-1" comparing "M.T" against 90.</summary>
private static EquipmentScriptedAlarmPlan Plan(
    string id = "alm-1",
    string equipmentId = "Plant/Line1/M",
    string name = "HighTemp",
    string depRef = "M.T",
    int threshold = 90,
    bool enabled = true,
    int severity = 800)
{
    var scriptId = $"{id}-script";
    var predicate = $"return (int)ctx.GetTag(\"{depRef}\").Value > {threshold};";

    return new EquipmentScriptedAlarmPlan(
        ScriptedAlarmId: id,
        EquipmentId: equipmentId,
        Name: name,
        AlarmType: "AlarmCondition",
        Severity: severity,
        MessageTemplate: "condition",
        PredicateScriptId: scriptId,
        PredicateSource: predicate,
        DependencyRefs: new[] { depRef },
        HistorizeToAveva: true,
        Retain: true,
        Enabled: enabled);
}
/// <summary>Creates a <see cref="ScriptedAlarmEngine"/> over <paramref name="upstream"/> with an
/// in-memory state store and a Serilog logger built from an empty configuration.</summary>
private static ScriptedAlarmEngine BuildEngine(DependencyMuxTagUpstreamSource upstream)
{
    var serilog = new LoggerConfiguration().CreateLogger();
    var stateStore = new InMemoryAlarmStateStore();
    var scriptLoggers = new ScriptLoggerFactory(serilog);

    return new ScriptedAlarmEngine(upstream, stateStore, scriptLoggers, serilog);
}
/// <summary>Starts a <see cref="ScriptedAlarmHostActor"/> wired to the two probes, with a fresh
/// upstream and an engine built around that same upstream.</summary>
/// <returns>The host actor reference and the upstream it was created with.</returns>
private (IActorRef Host, DependencyMuxTagUpstreamSource Upstream) Spawn(
    TestProbe publish, TestProbe mux)
{
    var source = new DependencyMuxTagUpstreamSource();
    var props = ScriptedAlarmHostActor.Props(publish.Ref, mux.Ref, source, BuildEngine(source));

    return (Sys.ActorOf(props), source);
}
/// <summary>Subscribes <paramref name="probe"/> to the <c>alerts</c> pub-sub topic and blocks until
/// the acknowledgement arrives. The probe itself is the sender of the Subscribe, so the
/// SubscribeAck is routed back to it.</summary>
private void SubscribeToAlerts(TestProbe probe)
{
    var mediator = DistributedPubSub.Get(Sys).Mediator;
    var subscribe = new Subscribe(ScriptedAlarmHostActor.AlertsTopic, probe.Ref);

    mediator.Tell(subscribe, probe.Ref);
    probe.ExpectMsg<SubscribeAck>(Timeout);
}
/// <summary>Load + interest: an apply containing one enabled and one disabled alarm results in a
/// RegisterInterest on the mux that includes the enabled alarm's dep ref and omits the disabled
/// alarm's dep ref.</summary>
[Fact]
public void Apply_loads_enabled_alarm_and_registers_interest()
{
    var publishProbe = CreateTestProbe();
    var muxProbe = CreateTestProbe();
    var host = Spawn(publishProbe, muxProbe).Host;
    var plans = new[]
    {
        Plan(id: "alm-1", depRef: "M.T"),
        Plan(id: "alm-2", depRef: "M.X", enabled: false), // disabled — not loaded, dep absent
    };

    host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(plans));

    var registration = muxProbe.ExpectMsg<DependencyMuxActor.RegisterInterest>(Timeout);
    registration.TagRefs.ShouldContain("M.T");
    registration.TagRefs.ShouldNotContain("M.X");
}
/// <summary>Activation path: once the alarm is loaded, a value above the threshold makes the host
/// send an AlarmStateUpdate with Active=true and publish an "Activated" AlarmTransitionEvent on
/// the alerts topic, attributed to "system" with the severity mapped to its bucket ceiling.</summary>
[Fact]
public void Dependency_change_above_threshold_activates_alarm()
{
    var publishProbe = CreateTestProbe();
    var muxProbe = CreateTestProbe();
    var alertsProbe = CreateTestProbe();
    SubscribeToAlerts(alertsProbe);
    var host = Spawn(publishProbe, muxProbe).Host;
    var plans = new[] { Plan(severity: 800) };

    host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(plans));
    muxProbe.ExpectMsg<DependencyMuxActor.RegisterInterest>(Timeout); // load completed

    var aboveThreshold = new VirtualTagActor.DependencyValueChanged("M.T", 99, DateTime.UtcNow);
    host.Tell(aboveThreshold);

    var update = publishProbe.ExpectMsg<OpcUaPublishActor.AlarmStateUpdate>(Timeout);
    update.AlarmNodeId.ShouldBe("alm-1");
    update.Active.ShouldBeTrue();

    var transition = alertsProbe.ExpectMsg<AlarmTransitionEvent>(Timeout);
    transition.AlarmId.ShouldBe("alm-1");
    transition.TransitionKind.ShouldBe("Activated");
    transition.Severity.ShouldBe(1000); // 800 → Critical bucket → 1000
    transition.User.ShouldBe("system");
}
/// <summary>Clear path: after the alarm has been activated, a value below the threshold makes the
/// host send an AlarmStateUpdate with Active=false and publish a "Cleared"
/// AlarmTransitionEvent.</summary>
[Fact]
public void Dependency_change_below_threshold_clears_alarm()
{
    var publishProbe = CreateTestProbe();
    var muxProbe = CreateTestProbe();
    var alertsProbe = CreateTestProbe();
    SubscribeToAlerts(alertsProbe);
    var host = Spawn(publishProbe, muxProbe).Host;
    var plans = new[] { Plan() };

    host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(plans));
    muxProbe.ExpectMsg<DependencyMuxActor.RegisterInterest>(Timeout);

    // Step 1: drive the alarm active and wait until both sinks have seen it.
    var high = new VirtualTagActor.DependencyValueChanged("M.T", 99, DateTime.UtcNow);
    host.Tell(high);
    publishProbe.FishForMessage<OpcUaPublishActor.AlarmStateUpdate>(m => m.Active, Timeout);
    alertsProbe.FishForMessage<AlarmTransitionEvent>(e => e.TransitionKind == "Activated", Timeout);

    // Step 2: drop below the threshold and expect the clear on both sinks.
    var low = new VirtualTagActor.DependencyValueChanged("M.T", 10, DateTime.UtcNow);
    host.Tell(low);
    var clearedUpdate = publishProbe.FishForMessage<OpcUaPublishActor.AlarmStateUpdate>(m => !m.Active, Timeout);
    clearedUpdate.AlarmNodeId.ShouldBe("alm-1");
    var clearedEvent = alertsProbe.FishForMessage<AlarmTransitionEvent>(e => e.TransitionKind == "Cleared", Timeout);
    clearedEvent.AlarmId.ShouldBe("alm-1");
}
/// <summary>Re-apply reloads: a second ApplyScriptedAlarms carrying a different alarm produces a
/// second RegisterInterest on the mux that lists the new dependency ref and no longer lists the
/// old one.</summary>
[Fact]
public void Reapply_reloads_with_new_dependency_refs()
{
    var publishProbe = CreateTestProbe();
    var muxProbe = CreateTestProbe();
    var host = Spawn(publishProbe, muxProbe).Host;

    var initialPlans = new[] { Plan(id: "alm-1", depRef: "M.T") };
    host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(initialPlans));
    var initial = muxProbe.ExpectMsg<DependencyMuxActor.RegisterInterest>(Timeout);
    initial.TagRefs.ShouldContain("M.T");

    var replacementPlans = new[] { Plan(id: "alm-9", depRef: "M.Y") };
    host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(replacementPlans));
    var replacement = muxProbe.ExpectMsg<DependencyMuxActor.RegisterInterest>(Timeout);
    replacement.TagRefs.ShouldContain("M.Y");
    replacement.TagRefs.ShouldNotContain("M.T");
}
}