From 3b418a54f1083fecd201280e138c427caee9a9ee Mon Sep 17 00:00:00 2001
From: Joseph Doherty
Date: Wed, 10 Jun 2026 14:57:42 -0400
Subject: [PATCH] =?UTF-8?q?feat(scripted-alarms):=20ScriptedAlarmHostActor?=
 =?UTF-8?q?=20=E2=80=94=20engine=20runtime=20host=20(T9)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../ScriptedAlarms/ScriptedAlarmHostActor.cs  | 324 ++++++++++++++++++
 .../ScriptedAlarmHostActorTests.cs            | 173 ++++++++++
 2 files changed, 497 insertions(+)
 create mode 100644 src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmHostActor.cs
 create mode 100644 tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/ScriptedAlarmHostActorTests.cs

diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmHostActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmHostActor.cs
new file mode 100644
index 00000000..c170aa33
--- /dev/null
+++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmHostActor.cs
@@ -0,0 +1,324 @@
+using Akka.Actor;
+using Akka.Cluster.Tools.PublishSubscribe;
+using Akka.Event;
+using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts;
+using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
+using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
+using ZB.MOM.WW.OtOpcUa.OpcUaServer;
+using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
+using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
+
+namespace ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms;
+
+/// <summary>
+/// Akka host that owns one <see cref="ScriptedAlarmEngine"/> for an Equipment-namespace
+/// driver node, feeds it live tag values, and bridges its emissions to OPC UA publish + the
+/// cluster <c>alerts</c> DistributedPubSub topic.
+/// </summary>
+/// <remarks>
+/// <para>
+/// The host is the engine's lifecycle owner: the caller (the driver-role startup) builds
+/// the <see cref="DependencyMuxTagUpstreamSource"/>, constructs a <see cref="ScriptedAlarmEngine"/>
+/// around that same upstream, and passes both here. The host disposes the engine in
+/// <see cref="PostStop"/>.
+/// </para>
+/// <para>
+/// <b>Data flow.</b> <see cref="DependencyMuxActor"/> delivers a
+/// <see cref="VirtualTagActor.DependencyValueChanged"/> for every tag the loaded alarms
+/// depend on; the host pushes each into the upstream. The engine self-evaluates: its own
+/// observer re-runs the predicates and raises
+/// <see cref="ScriptedAlarmEngine.OnEvent"/>. The host does NOT call evaluate — it only
+/// feeds values in and fans emissions out.
+/// </para>
+/// <para>
+/// <b>Thread-safety.</b> <see cref="ScriptedAlarmEngine.OnEvent"/> fires on the engine's
+/// background worker thread (its fire-and-forget re-evaluation), NOT on the actor thread.
+/// The ctor subscribes a handler that does nothing but <c>Self.Tell(new EngineEmission(e))</c>
+/// — <c>Tell</c> is thread-safe. ALL sink work (touching <c>Context</c>,
+/// the publish actor ref, the DPS mediator) then happens on the actor thread in
+/// <see cref="OnEngineEmission"/>. No <c>Context</c> or actor state is ever read or
+/// written from the OnEvent callback.
+/// </para>
+/// <para>
+/// <b>Historization.</b> The host publishes each transition as an
+/// <see cref="AlarmTransitionEvent"/> to the <c>alerts</c> topic ONLY. That topic is the
+/// historization path: HistorianAdapterActor's upstream and the Admin UI Alerts page
+/// both consume <c>alerts</c>. The host deliberately does NOT also Tell the historian
+/// adapter directly — doing so would double-historize every transition. (T9 plan called for a
+/// "direct historian tell"; that was dropped because the alerts topic already feeds the
+/// historian path, so a direct tell would duplicate every row.)
+/// </para>
+/// <para>
+/// <b>Restart caveat.</b> NOTE(review): the <c>Props</c> factory captures the engine instance
+/// and <see cref="PostStop"/> disposes it, so a supervisor restart would presumably rebuild the
+/// host around an already-disposed engine — confirm the parent stops (not restarts) this actor.
+/// </para>
+/// </remarks>
+public sealed class ScriptedAlarmHostActor : ReceiveActor
+{
+    /// <summary>The cluster DistributedPubSub topic every alarm transition is published to. Matches
+    /// the constant the (retired) <c>ScriptedAlarmActor</c> used so subscribers stay wired.</summary>
+    public const string AlertsTopic = "alerts";
+
+    /// <summary>Reconcile the loaded alarm set to exactly the enabled subset of <paramref name="Plans"/>:
+    /// builds <see cref="ScriptedAlarmDefinition"/>s (skipping disabled plans), reloads the engine, and
+    /// re-registers mux interest for the union of dependency refs.</summary>
+    /// <param name="Plans">The desired Equipment-namespace scripted-alarm plans.</param>
+    public sealed record ApplyScriptedAlarms(IReadOnlyList<EquipmentScriptedAlarmPlan> Plans);
+
+    /// <summary>Marshals an engine emission off the engine's worker thread onto the actor thread.</summary>
+    /// <param name="Event">Carries the <see cref="ScriptedAlarmEvent"/> the engine raised on <c>OnEvent</c>.</param>
+    private sealed record EngineEmission(ScriptedAlarmEvent Event);
+
+    /// <summary>Pipe-back completion of an in-flight <see cref="ScriptedAlarmEngine.LoadAsync"/>:
+    /// carries the union of dependency refs to register mux interest for AFTER the load completed
+    /// (so the engine's upstream subscriptions exist before any value can arrive).</summary>
+    /// <param name="DepRefs">Distinct dependency refs of the alarms that were just loaded.</param>
+    private sealed record AlarmsLoaded(IReadOnlyList<string> DepRefs);
+
+    private readonly IActorRef _publishActor;
+    private readonly IActorRef? _mux;
+    private readonly DependencyMuxTagUpstreamSource _upstream;
+    private readonly ScriptedAlarmEngine _engine;
+    // NOTE(review): assigned in the ctor but not read anywhere in this class — confirm it is still needed.
+    private readonly Func<DateTime> _clock;
+    private readonly ILoggingAdapter _log = Context.GetLogger();
+    private readonly CancellationTokenSource _cts = new();
+    private readonly EventHandler<ScriptedAlarmEvent> _onEngineEvent;
+
+    /// <summary>Factory method to create Props for a <see cref="ScriptedAlarmHostActor"/>.</summary>
+    /// <param name="publishActor">The OPC UA publish actor that consumes
+    /// <see cref="OpcUaPublishActor.AlarmStateUpdate"/> bridged from engine emissions.</param>
+    /// <param name="mux">Optional dependency multiplexer the host registers interest with so it
+    /// receives a <see cref="VirtualTagActor.DependencyValueChanged"/> per dependency tag. Null on the
+    /// dev/Mac path (no live values).</param>
+    /// <param name="upstream">The mux-fed upstream the engine reads + subscribes from. MUST be the
+    /// same instance the <paramref name="engine"/> was constructed around.</param>
+    /// <param name="engine">The scripted-alarm engine this host owns + disposes.</param>
+    /// <param name="clock">Optional UTC clock; defaults to <see cref="DateTime.UtcNow"/>.</param>
+    public static Props Props(
+        IActorRef publishActor,
+        IActorRef? mux,
+        DependencyMuxTagUpstreamSource upstream,
+        ScriptedAlarmEngine engine,
+        Func<DateTime>? clock = null) =>
+        Akka.Actor.Props.Create(() => new ScriptedAlarmHostActor(publishActor, mux, upstream, engine, clock));
+
+    /// <summary>Initializes a new instance of the <see cref="ScriptedAlarmHostActor"/> class.</summary>
+    /// <param name="publishActor">The OPC UA publish actor emissions are bridged to.</param>
+    /// <param name="mux">Optional dependency multiplexer the host registers dependency interest with.</param>
+    /// <param name="upstream">The mux-fed upstream the engine reads + subscribes from (same instance the engine wraps).</param>
+    /// <param name="engine">The scripted-alarm engine this host owns + disposes.</param>
+    /// <param name="clock">Optional UTC clock; defaults to <see cref="DateTime.UtcNow"/>.</param>
+    public ScriptedAlarmHostActor(
+        IActorRef publishActor,
+        IActorRef? mux,
+        DependencyMuxTagUpstreamSource upstream,
+        ScriptedAlarmEngine engine,
+        Func<DateTime>? clock = null)
+    {
+        ArgumentNullException.ThrowIfNull(publishActor);
+        ArgumentNullException.ThrowIfNull(upstream);
+        ArgumentNullException.ThrowIfNull(engine);
+        _publishActor = publishActor;
+        _mux = mux;
+        _upstream = upstream;
+        _engine = engine;
+        _clock = clock ?? (() => DateTime.UtcNow);
+
+        // OnEvent fires on the engine's worker thread. NEVER touch Context / actor state here —
+        // marshal onto the actor thread via the thread-safe Self.Tell. Keep the handler in a field
+        // so PostStop can unsubscribe it. (Self is captured once; it is stable for the actor's life.)
+        var self = Self;
+        _onEngineEvent = (_, e) => self.Tell(new EngineEmission(e));
+        _engine.OnEvent += _onEngineEvent;
+
+        Receive<ApplyScriptedAlarms>(OnApply);
+        Receive<AlarmsLoaded>(OnAlarmsLoaded);
+        Receive<VirtualTagActor.DependencyValueChanged>(OnDependencyChanged);
+        Receive<EngineEmission>(OnEngineEmission);
+        // A faulted or cancelled LoadAsync pipes back a Status.Failure (see OnApply) — log it and stay
+        // inert so the failure doesn't hit the dead-letter log.
+        Receive<Status.Failure>(OnLoadFailed);
+    }
+
+    /// <summary>Starts an engine reload for the enabled subset of <paramref name="msg"/>'s plans and
+    /// pipes the outcome back to this actor as <see cref="AlarmsLoaded"/> or <see cref="Status.Failure"/>.</summary>
+    /// <param name="msg">The apply request carrying the desired plans.</param>
+    private void OnApply(ApplyScriptedAlarms msg)
+    {
+        // Skip disabled plans entirely — the engine has no Enabled flag, so a disabled alarm is simply
+        // not loaded (no predicate, no upstream subscription, no events).
+        var enabled = msg.Plans.Where(p => p.Enabled).ToList();
+
+        var defs = enabled.Select(ToDefinition).ToList();
+
+        // Union of dependency refs across the loaded (enabled) alarms — interest is registered with
+        // the mux only AFTER LoadAsync completes (see OnAlarmsLoaded), because the engine establishes
+        // its upstream SubscribeTag subscriptions inside LoadAsync; values must not arrive before then.
+        var depRefs = enabled
+            .SelectMany(p => p.DependencyRefs)
+            .Distinct(StringComparer.Ordinal)
+            .ToList();
+
+        // PipeTo marshals the completion back onto the actor thread. Only a load that ran to completion
+        // yields AlarmsLoaded; a faulted OR cancelled load yields Status.Failure, so interest is never
+        // registered for a load that did not finish (the prior generation's subscription, if any,
+        // stays — a failed reload should not silently drop live values). The continuation itself runs
+        // under CancellationToken.None so the outcome mapping always executes.
+        _engine.LoadAsync(defs, _cts.Token)
+            .ContinueWith(
+                t => t.IsCompletedSuccessfully
+                    ? new AlarmsLoaded(depRefs)
+                    : (object)new Status.Failure((Exception?)t.Exception ?? new TaskCanceledException(t)),
+                CancellationToken.None,
+                TaskContinuationOptions.ExecuteSynchronously,
+                TaskScheduler.Default)
+            .PipeTo(Self);
+
+        _log.Debug("ScriptedAlarmHost: applying (enabled={Enabled}/{Total}, depRefs={Refs})",
+            enabled.Count, msg.Plans.Count, depRefs.Count);
+    }
+
+    /// <summary>Logs a load that did not complete: cancellation at Debug, any other failure as a Warning.</summary>
+    /// <param name="msg">The failure piped back from <see cref="OnApply"/>.</param>
+    private void OnLoadFailed(Status.Failure msg)
+    {
+        if (msg.Cause is OperationCanceledException)
+        {
+            _log.Debug("ScriptedAlarmHost: engine LoadAsync cancelled — alarms not (re)loaded");
+            return;
+        }
+
+        _log.Warning(msg.Cause, "ScriptedAlarmHost: engine LoadAsync failed — alarms not (re)loaded");
+    }
+
+    /// <summary>Registers mux interest for the dependency refs of the load that just completed.</summary>
+    /// <param name="msg">The completed-load notification carrying the dependency refs.</param>
+    private void OnAlarmsLoaded(AlarmsLoaded msg)
+    {
+        // Register mux interest only now that LoadAsync has returned — the engine's upstream
+        // subscriptions exist, so any DependencyValueChanged the mux delivers will be observed by the
+        // engine. RegisterInterest replaces a subscriber's prior interest set (see DependencyMuxActor),
+        // so a re-Apply with a fresh union simply supersedes the old one — no explicit unregister needed.
+        _mux?.Tell(new DependencyMuxActor.RegisterInterest(msg.DepRefs, Self));
+        _log.Debug("ScriptedAlarmHost: loaded; registered mux interest for {Count} dep refs", msg.DepRefs.Count);
+    }
+
+    /// <summary>Feeds one live dependency value into the upstream the engine subscribes from.</summary>
+    /// <param name="msg">The tag id, value and timestamp delivered by the mux.</param>
+    private void OnDependencyChanged(VirtualTagActor.DependencyValueChanged msg)
+    {
+        // Feed the live value into the upstream the engine subscribes from. StatusCode 0 = Good; the
+        // mux only forwards values it received from a driver publish, so we treat them as Good-quality.
+        _upstream.Push(msg.TagId, new DataValueSnapshot(msg.Value, 0u, msg.TimestampUtc, msg.TimestampUtc));
+    }
+
+    /// <summary>Bridges one engine emission to the OPC UA publish actor and the <c>alerts</c> topic.
+    /// Runs on the actor thread.</summary>
+    /// <param name="msg">The marshalled engine emission.</param>
+    private void OnEngineEmission(EngineEmission msg)
+    {
+        var e = msg.Event;
+
+        // None = no meaningful change; Suppressed = shelving ate the emission. Neither should reach a
+        // sink. (The engine already filters these out of BuildEmission, but guard defensively.)
+        if (e.Emission is EmissionKind.None or EmissionKind.Suppressed)
+        {
+            return;
+        }
+
+        // Bridge to OPC UA: drive the alarm node's Active / Acknowledged sub-vars. We use e.AlarmId as
+        // the node id for now — T14 will materialise the real condition node at an aligned NodeId and
+        // this id will line up with it.
+        _publishActor.Tell(new OpcUaPublishActor.AlarmStateUpdate(
+            AlarmNodeId: e.AlarmId,
+            Active: e.Condition.Active == AlarmActiveState.Active,
+            Acknowledged: e.Condition.Acked == AlarmAckedState.Acknowledged,
+            TimestampUtc: e.TimestampUtc));
+
+        // Publish the transition to the cluster `alerts` topic — the single historization + live
+        // fan-out path. The mediator is obtained on the ACTOR thread (here), never off-thread.
+        var evt = new AlarmTransitionEvent(
+            AlarmId: e.AlarmId,
+            EquipmentPath: e.EquipmentPath,
+            AlarmName: e.AlarmName,
+            TransitionKind: e.Emission.ToString(),
+            Severity: SeverityToInt(e.Severity),
+            Message: e.Message,
+            User: TransitionUser(e),
+            TimestampUtc: e.TimestampUtc);
+
+        DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(AlertsTopic, evt));
+    }
+
+    /// <inheritdoc />
+    protected override void PostStop()
+    {
+        // Unregister mux interest first so no further DependencyValueChanged arrives while we tear down.
+        _mux?.Tell(new DependencyMuxActor.UnregisterInterest(Self));
+        // Cancel any in-flight LoadAsync, detach the OnEvent handler (so a late engine emission can't
+        // Tell a stopping actor), then dispose the engine (which drains its background work + clears).
+        _cts.Cancel();
+        _engine.OnEvent -= _onEngineEvent;
+        try
+        {
+            _engine.Dispose();
+        }
+        finally
+        {
+            // Release the token source and run the base hook even if the engine's Dispose throws.
+            _cts.Dispose();
+            base.PostStop();
+        }
+    }
+
+    /// <summary>Maps an <see cref="EquipmentScriptedAlarmPlan"/> to the engine's
+    /// <see cref="ScriptedAlarmDefinition"/>. <see cref="EquipmentScriptedAlarmPlan.AlarmType"/> is a
+    /// string that must parse to a defined <see cref="AlarmKind"/>; an unrecognised type (including a
+    /// numeric string with no matching member) falls back to <see cref="AlarmKind.AlarmCondition"/>
+    /// rather than dropping the alarm.</summary>
+    private static ScriptedAlarmDefinition ToDefinition(EquipmentScriptedAlarmPlan p) => new(
+        AlarmId: p.ScriptedAlarmId,
+        EquipmentPath: p.EquipmentId,
+        AlarmName: p.Name,
+        Kind: Enum.TryParse<AlarmKind>(p.AlarmType, out var k) && Enum.IsDefined(k) ? k : AlarmKind.AlarmCondition,
+        Severity: SeverityFromInt(p.Severity),
+        MessageTemplate: p.MessageTemplate,
+        PredicateScriptSource: p.PredicateSource,
+        HistorizeToAveva: p.HistorizeToAveva,
+        Retain: p.Retain);
+
+    /// <summary>The acting user for an <see cref="AlarmTransitionEvent"/>. Engine-driven
+    /// Activated / Cleared transitions are "system"; operator Acknowledged / Confirmed carry the
+    /// recorded user from the condition state, falling back to "system" when none was recorded.</summary>
+    private static string TransitionUser(ScriptedAlarmEvent e) => e.Emission switch
+    {
+        EmissionKind.Acknowledged => e.Condition.LastAckUser ?? "system",
+        EmissionKind.Confirmed => e.Condition.LastConfirmUser ?? "system",
+        _ => "system",
+    };
+
+    // Severity conversion convention: the engine + plan disagree on type (engine = AlarmSeverity enum,
+    // plan + AlarmTransitionEvent = OPC UA 1–1000 int). We bucket the int into quartiles on the way IN
+    // and emit the quartile ceiling on the way OUT, so a round-trip is stable within a bucket.
+
+    /// <summary>Buckets an OPC UA 1–1000 severity into the engine's coarse <see cref="AlarmSeverity"/>
+    /// enum: ≤250 Low, ≤500 Medium, ≤750 High, else Critical.</summary>
+    private static AlarmSeverity SeverityFromInt(int s) =>
+        s <= 250 ? AlarmSeverity.Low
+        : s <= 500 ? AlarmSeverity.Medium
+        : s <= 750 ? AlarmSeverity.High
+        : AlarmSeverity.Critical;
+
+    /// <summary>Maps the engine's coarse <see cref="AlarmSeverity"/> back to an OPC UA 1–1000 severity
+    /// at each bucket's ceiling: Low=250, Medium=500, High=750, Critical=1000.</summary>
+    private static int SeverityToInt(AlarmSeverity s) => s switch
+    {
+        AlarmSeverity.Low => 250,
+        AlarmSeverity.Medium => 500,
+        AlarmSeverity.High => 750,
+        AlarmSeverity.Critical => 1000,
+        _ => 500,
+    };
+}
diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/ScriptedAlarmHostActorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/ScriptedAlarmHostActorTests.cs
new file mode 100644
index 00000000..4229f27e
--- /dev/null
+++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/ScriptedAlarmHostActorTests.cs
@@ -0,0 +1,173 @@
+using Akka.Actor;
+using Akka.Cluster.Tools.PublishSubscribe;
+using Akka.TestKit;
+using Serilog;
+using Shouldly;
+using Xunit;
+using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts;
+using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
+using ZB.MOM.WW.OtOpcUa.Core.Scripting;
+using ZB.MOM.WW.OtOpcUa.OpcUaServer;
+using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
+using ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms;
+using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
+using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
+
+namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.ScriptedAlarms;
+
+/// <summary>
+/// Verifies <see cref="ScriptedAlarmHostActor"/> loads the enabled subset of
+/// <see cref="EquipmentScriptedAlarmPlan"/>s into its <see cref="ScriptedAlarmEngine"/>, registers mux
+/// interest for their dependency refs after the load completes, feeds live
+/// <see cref="VirtualTagActor.DependencyValueChanged"/> values into the engine, and bridges the
+/// engine's emissions to both an <see cref="OpcUaPublishActor.AlarmStateUpdate"/> and an
+/// <see cref="AlarmTransitionEvent"/> on the cluster <c>alerts</c> topic.
+/// </summary>
+public sealed class ScriptedAlarmHostActorTests : RuntimeActorTestBase
+{
+    private static readonly TimeSpan Timeout = TimeSpan.FromSeconds(8);
+
+    /// <summary>Plan whose predicate compares the single tag "M.T" against 90 — enabled by default.</summary>
+    private static EquipmentScriptedAlarmPlan Plan(
+        string id = "alm-1",
+        string equipmentId = "Plant/Line1/M",
+        string name = "HighTemp",
+        string depRef = "M.T",
+        int threshold = 90,
+        bool enabled = true,
+        int severity = 800) =>
+        new(
+            ScriptedAlarmId: id,
+            EquipmentId: equipmentId,
+            Name: name,
+            AlarmType: "AlarmCondition",
+            Severity: severity,
+            MessageTemplate: "condition",
+            PredicateScriptId: $"{id}-script",
+            PredicateSource: $"return (int)ctx.GetTag(\"{depRef}\").Value > {threshold};",
+            DependencyRefs: new[] { depRef },
+            HistorizeToAveva: true,
+            Retain: true,
+            Enabled: enabled);
+
+    private static ScriptedAlarmEngine BuildEngine(DependencyMuxTagUpstreamSource upstream)
+    {
+        var logger = new LoggerConfiguration().CreateLogger();
+        return new ScriptedAlarmEngine(upstream, new InMemoryAlarmStateStore(), new ScriptLoggerFactory(logger), logger);
+    }
+
+    private (IActorRef Host, DependencyMuxTagUpstreamSource Upstream) Spawn(
+        TestProbe publish, TestProbe mux)
+    {
+        var upstream = new DependencyMuxTagUpstreamSource();
+        var engine = BuildEngine(upstream);
+        var host = Sys.ActorOf(ScriptedAlarmHostActor.Props(publish.Ref, mux.Ref, upstream, engine));
+        return (host, upstream);
+    }
+
+    /// <summary>Subscribe <paramref name="probe"/> to the alerts DPS topic and wait for the ack.
+    /// The Subscribe is sent FROM the probe so the SubscribeAck returns to it.</summary>
+    private void SubscribeToAlerts(TestProbe probe)
+    {
+        DistributedPubSub.Get(Sys).Mediator.Tell(
+            new Subscribe(ScriptedAlarmHostActor.AlertsTopic, probe.Ref), probe.Ref);
+        probe.ExpectMsg<SubscribeAck>(Timeout);
+    }
+
+    /// <summary>Load + interest: applying one enabled alarm registers mux interest for its dep ref
+    /// AFTER the engine load completes; a disabled alarm in the same apply contributes no dep ref.</summary>
+    [Fact]
+    public void Apply_loads_enabled_alarm_and_registers_interest()
+    {
+        var publish = CreateTestProbe();
+        var mux = CreateTestProbe();
+        var (host, _) = Spawn(publish, mux);
+
+        host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[]
+        {
+            Plan(id: "alm-1", depRef: "M.T"),
+            Plan(id: "alm-2", depRef: "M.X", enabled: false), // disabled — not loaded, dep absent
+        }));
+
+        var reg = mux.ExpectMsg<DependencyMuxActor.RegisterInterest>(Timeout);
+        reg.TagRefs.ShouldContain("M.T");
+        reg.TagRefs.ShouldNotContain("M.X");
+    }
+
+    /// <summary>Activation path: with the alarm loaded, pushing a value above the threshold drives an
+    /// Inactive→Active transition — the host publishes an AlarmStateUpdate(Active=true) and an
+    /// AlarmTransitionEvent("Activated") on the alerts topic.</summary>
+    [Fact]
+    public void Dependency_change_above_threshold_activates_alarm()
+    {
+        var publish = CreateTestProbe();
+        var mux = CreateTestProbe();
+        var alerts = CreateTestProbe();
+        SubscribeToAlerts(alerts);
+
+        var (host, _) = Spawn(publish, mux);
+        host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(severity: 800) }));
+        mux.ExpectMsg<DependencyMuxActor.RegisterInterest>(Timeout); // load completed
+
+        var now = DateTime.UtcNow;
+        host.Tell(new VirtualTagActor.DependencyValueChanged("M.T", 99, now));
+
+        var state = publish.ExpectMsg<OpcUaPublishActor.AlarmStateUpdate>(Timeout);
+        state.AlarmNodeId.ShouldBe("alm-1");
+        state.Active.ShouldBeTrue();
+
+        var evt = alerts.ExpectMsg<AlarmTransitionEvent>(Timeout);
+        evt.AlarmId.ShouldBe("alm-1");
+        evt.TransitionKind.ShouldBe("Activated");
+        evt.Severity.ShouldBe(1000); // 800 → Critical bucket → 1000
+        evt.User.ShouldBe("system");
+    }
+
+    /// <summary>Clear path: after activating, pushing a value below the threshold drives Active→Inactive
+    /// — AlarmStateUpdate(Active=false) + AlarmTransitionEvent("Cleared").</summary>
+    [Fact]
+    public void Dependency_change_below_threshold_clears_alarm()
+    {
+        var publish = CreateTestProbe();
+        var mux = CreateTestProbe();
+        var alerts = CreateTestProbe();
+        SubscribeToAlerts(alerts);
+
+        var (host, _) = Spawn(publish, mux);
+        host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan() }));
+        mux.ExpectMsg<DependencyMuxActor.RegisterInterest>(Timeout);
+
+        // Activate first.
+        host.Tell(new VirtualTagActor.DependencyValueChanged("M.T", 99, DateTime.UtcNow));
+        publish.FishForMessage<OpcUaPublishActor.AlarmStateUpdate>(m => m.Active, Timeout);
+        alerts.FishForMessage<AlarmTransitionEvent>(e => e.TransitionKind == "Activated", Timeout);
+
+        // Now clear.
+        host.Tell(new VirtualTagActor.DependencyValueChanged("M.T", 10, DateTime.UtcNow));
+
+        var cleared = publish.FishForMessage<OpcUaPublishActor.AlarmStateUpdate>(m => !m.Active, Timeout);
+        cleared.AlarmNodeId.ShouldBe("alm-1");
+
+        var evt = alerts.FishForMessage<AlarmTransitionEvent>(e => e.TransitionKind == "Cleared", Timeout);
+        evt.AlarmId.ShouldBe("alm-1");
+    }
+
+    /// <summary>Re-apply reloads: a second ApplyScriptedAlarms with a different alarm set loads the new
+    /// alarm — a fresh RegisterInterest reflecting the new dependency refs lands on the mux.</summary>
+    [Fact]
+    public void Reapply_reloads_with_new_dependency_refs()
+    {
+        var publish = CreateTestProbe();
+        var mux = CreateTestProbe();
+        var (host, _) = Spawn(publish, mux);
+
+        host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(id: "alm-1", depRef: "M.T") }));
+        var first = mux.ExpectMsg<DependencyMuxActor.RegisterInterest>(Timeout);
+        first.TagRefs.ShouldContain("M.T");
+
+        host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(id: "alm-9", depRef: "M.Y") }));
+        var second = mux.ExpectMsg<DependencyMuxActor.RegisterInterest>(Timeout);
+        second.TagRefs.ShouldContain("M.Y");
+        second.TagRefs.ShouldNotContain("M.T");
+    }
+}