diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmHostActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmHostActor.cs
new file mode 100644
index 00000000..c170aa33
--- /dev/null
+++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmHostActor.cs
@@ -0,0 +1,289 @@
+using Akka.Actor;
+using Akka.Cluster.Tools.PublishSubscribe;
+using Akka.Event;
+using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts;
+using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
+using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
+using ZB.MOM.WW.OtOpcUa.OpcUaServer;
+using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
+using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
+
+namespace ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms;
+
+///
+/// Akka host that owns one for an Equipment-namespace
+/// driver node, feeds it live tag values, and bridges its emissions to OPC UA publish + the
+/// cluster alerts DistributedPubSub topic.
+///
+///
+/// The host is the engine's lifecycle owner: the caller (the driver-role startup) builds
+/// the , constructs a
+/// around that same upstream, and passes both here. The host disposes the engine in
+/// .
+///
+///
+///
+/// Data flow. delivers a
+/// for every tag the loaded alarms
+/// depend on; the host pushes each into the upstream. The engine self-evaluates: its own
+/// observer re-runs the predicates and raises
+/// . The host does NOT call evaluate — it only
+/// feeds values in and fans emissions out.
+///
+///
+///
+/// Thread-safety. fires on the engine's
+/// background worker thread (its fire-and-forget re-evaluation), NOT on the actor thread.
+/// The ctor subscribes a handler that does nothing but Self.Tell(new EngineEmission(e))
+/// — is thread-safe. ALL sink work (touching ,
+/// the publish actor ref, the DPS mediator) then happens on the actor thread in
+/// . No or actor state is ever read or
+/// written from the OnEvent callback.
+///
+///
+///
+/// Historization. The host publishes each transition as an
+/// to the alerts topic ONLY. That topic is the
+/// historization path: HistorianAdapterActor's upstream and the Admin UI Alerts page
+/// both consume alerts. The host deliberately does NOT also Tell the historian
+/// adapter directly — doing so would double-historize every transition. (T9 plan called for a
+/// "direct historian tell"; that was dropped because the alerts topic already feeds the
+/// historian path, so a direct tell would duplicate every row.)
+///
+///
+public sealed class ScriptedAlarmHostActor : ReceiveActor
+{
+ /// The cluster DistributedPubSub topic every alarm transition is published to. Matches
+ /// the constant the (retired) ScriptedAlarmActor used so subscribers stay wired.
+ public const string AlertsTopic = "alerts";
+
+ /// Reconcile the loaded alarm set to exactly the enabled subset of :
+ /// builds s (skipping disabled plans), reloads the engine, and
+ /// re-registers mux interest for the union of dependency refs.
+ /// The desired Equipment-namespace scripted-alarm plans.
+ public sealed record ApplyScriptedAlarms(IReadOnlyList Plans);
+
+ /// Marshals an engine emission off the engine's worker thread onto the actor thread.
+ /// Carries the the engine raised on OnEvent.
+ private sealed record EngineEmission(ScriptedAlarmEvent Event);
+
+ /// Pipe-back completion of an in-flight :
+ /// carries the union of dependency refs to register mux interest for AFTER the load completed
+ /// (so the engine's upstream subscriptions exist before any value can arrive).
+ private sealed record AlarmsLoaded(IReadOnlyList DepRefs);
+
+ private readonly IActorRef _publishActor;
+ private readonly IActorRef? _mux;
+ private readonly DependencyMuxTagUpstreamSource _upstream;
+ private readonly ScriptedAlarmEngine _engine;
+ private readonly Func _clock;
+ private readonly ILoggingAdapter _log = Context.GetLogger();
+ private readonly CancellationTokenSource _cts = new();
+ private readonly EventHandler _onEngineEvent;
+
+ /// Factory method to create Props for a .
+ /// The OPC UA publish actor that consumes
+ /// bridged from engine emissions.
+ /// Optional dependency multiplexer the host registers interest with so it
+ /// receives a per dependency tag. Null on the
+ /// dev/Mac path (no live values).
+ /// The mux-fed upstream the engine reads + subscribes from. MUST be the
+ /// same instance the was constructed around.
+ /// The scripted-alarm engine this host owns + disposes.
+ /// Optional UTC clock; defaults to .
+ public static Props Props(
+ IActorRef publishActor,
+ IActorRef? mux,
+ DependencyMuxTagUpstreamSource upstream,
+ ScriptedAlarmEngine engine,
+ Func? clock = null) =>
+ Akka.Actor.Props.Create(() => new ScriptedAlarmHostActor(publishActor, mux, upstream, engine, clock));
+
+ /// Initializes a new instance of the class.
+ /// The OPC UA publish actor emissions are bridged to.
+ /// Optional dependency multiplexer the host registers dependency interest with.
+ /// The mux-fed upstream the engine reads + subscribes from (same instance the engine wraps).
+ /// The scripted-alarm engine this host owns + disposes.
+ /// Optional UTC clock; defaults to .
+ public ScriptedAlarmHostActor(
+ IActorRef publishActor,
+ IActorRef? mux,
+ DependencyMuxTagUpstreamSource upstream,
+ ScriptedAlarmEngine engine,
+ Func? clock = null)
+ {
+ ArgumentNullException.ThrowIfNull(publishActor);
+ ArgumentNullException.ThrowIfNull(upstream);
+ ArgumentNullException.ThrowIfNull(engine);
+ _publishActor = publishActor;
+ _mux = mux;
+ _upstream = upstream;
+ _engine = engine;
+ _clock = clock ?? (() => DateTime.UtcNow);
+
+ // OnEvent fires on the engine's worker thread. NEVER touch Context / actor state here —
+ // marshal onto the actor thread via the thread-safe Self.Tell. Keep the handler in a field
+ // so PostStop can unsubscribe it. (Self is captured once; it is stable for the actor's life.)
+ var self = Self;
+ _onEngineEvent = (_, e) => self.Tell(new EngineEmission(e));
+ _engine.OnEvent += _onEngineEvent;
+
+ Receive(OnApply);
+ Receive(OnAlarmsLoaded);
+ Receive(OnDependencyChanged);
+ Receive(OnEngineEmission);
+ // A faulted LoadAsync pipes back a Status.Failure (see OnApply) — log it and stay inert so the
+ // failure doesn't hit the dead-letter log.
+ Receive(OnLoadFailed);
+ }
+
+ private void OnApply(ApplyScriptedAlarms msg)
+ {
+ // Skip disabled plans entirely — the engine has no Enabled flag, so a disabled alarm is simply
+ // not loaded (no predicate, no upstream subscription, no events).
+ var enabled = msg.Plans.Where(p => p.Enabled).ToList();
+
+ var defs = enabled.Select(ToDefinition).ToList();
+
+ // Union of dependency refs across the loaded (enabled) alarms — interest is registered with
+ // the mux only AFTER LoadAsync completes (see OnAlarmsLoaded), because the engine establishes
+ // its upstream SubscribeTag subscriptions inside LoadAsync; values must not arrive before then.
+ var depRefs = enabled
+ .SelectMany(p => p.DependencyRefs)
+ .Distinct(StringComparer.Ordinal)
+ .ToList();
+
+ // PipeTo marshals the completion back onto the actor thread; on failure we log a Warning and
+ // do NOT register interest (the prior generation's subscription, if any, stays — a failed
+ // reload should not silently drop live values). Self-message AlarmsLoaded carries the refs.
+ _engine.LoadAsync(defs, _cts.Token)
+ .ContinueWith(
+ t => t.IsFaulted
+ ? (object)new Status.Failure(t.Exception!)
+ : new AlarmsLoaded(depRefs),
+ _cts.Token,
+ TaskContinuationOptions.ExecuteSynchronously,
+ TaskScheduler.Default)
+ .PipeTo(Self);
+
+ _log.Debug("ScriptedAlarmHost: applying (enabled={Enabled}/{Total}, depRefs={Refs})",
+ enabled.Count, msg.Plans.Count, depRefs.Count);
+ }
+
+ private void OnLoadFailed(Status.Failure msg)
+ => _log.Warning(msg.Cause, "ScriptedAlarmHost: engine LoadAsync failed — alarms not (re)loaded");
+
+ private void OnAlarmsLoaded(AlarmsLoaded msg)
+ {
+ // Register mux interest only now that LoadAsync has returned — the engine's upstream
+ // subscriptions exist, so any DependencyValueChanged the mux delivers will be observed by the
+ // engine. RegisterInterest replaces a subscriber's prior interest set (see DependencyMuxActor),
+ // so a re-Apply with a fresh union simply supersedes the old one — no explicit unregister needed.
+ _mux?.Tell(new DependencyMuxActor.RegisterInterest(msg.DepRefs, Self));
+ _log.Debug("ScriptedAlarmHost: loaded; registered mux interest for {Count} dep refs", msg.DepRefs.Count);
+ }
+
+ private void OnDependencyChanged(VirtualTagActor.DependencyValueChanged msg)
+ {
+ // Feed the live value into the upstream the engine subscribes from. StatusCode 0 = Good; the
+ // mux only forwards values it received from a driver publish, so we treat them as Good-quality.
+ _upstream.Push(msg.TagId, new DataValueSnapshot(msg.Value, 0u, msg.TimestampUtc, msg.TimestampUtc));
+ }
+
+ private void OnEngineEmission(EngineEmission msg)
+ {
+ var e = msg.Event;
+
+ // None = no meaningful change; Suppressed = shelving ate the emission. Neither should reach a
+ // sink. (The engine already filters these out of BuildEmission, but guard defensively.)
+ if (e.Emission is EmissionKind.None or EmissionKind.Suppressed)
+ {
+ return;
+ }
+
+ // Bridge to OPC UA: drive the alarm node's Active / Acknowledged sub-vars. We use e.AlarmId as
+ // the node id for now — T14 will materialise the real condition node at an aligned NodeId and
+ // this id will line up with it.
+ _publishActor.Tell(new OpcUaPublishActor.AlarmStateUpdate(
+ AlarmNodeId: e.AlarmId,
+ Active: e.Condition.Active == AlarmActiveState.Active,
+ Acknowledged: e.Condition.Acked == AlarmAckedState.Acknowledged,
+ TimestampUtc: e.TimestampUtc));
+
+ // Publish the transition to the cluster `alerts` topic — the single historization + live
+ // fan-out path. The mediator is obtained on the ACTOR thread (here), never off-thread.
+ var evt = new AlarmTransitionEvent(
+ AlarmId: e.AlarmId,
+ EquipmentPath: e.EquipmentPath,
+ AlarmName: e.AlarmName,
+ TransitionKind: e.Emission.ToString(),
+ Severity: SeverityToInt(e.Severity),
+ Message: e.Message,
+ User: TransitionUser(e),
+ TimestampUtc: e.TimestampUtc);
+
+ DistributedPubSub.Get(Context.System).Mediator.Tell(new Publish(AlertsTopic, evt));
+ }
+
+ ///
+ protected override void PostStop()
+ {
+ // Unregister mux interest first so no further DependencyValueChanged arrives while we tear down.
+ _mux?.Tell(new DependencyMuxActor.UnregisterInterest(Self));
+ // Cancel any in-flight LoadAsync, detach the OnEvent handler (so a late engine emission can't
+ // Tell a stopping actor), then dispose the engine (which drains its background work + clears).
+ _cts.Cancel();
+ _engine.OnEvent -= _onEngineEvent;
+ _engine.Dispose();
+ _cts.Dispose();
+ base.PostStop();
+ }
+
+ /// Maps an to the engine's
+ /// . is a
+ /// string that must parse to an ; an unrecognised type falls back to
+ /// rather than dropping the alarm.
+ private static ScriptedAlarmDefinition ToDefinition(EquipmentScriptedAlarmPlan p) => new(
+ AlarmId: p.ScriptedAlarmId,
+ EquipmentPath: p.EquipmentId,
+ AlarmName: p.Name,
+ Kind: Enum.TryParse(p.AlarmType, out var k) ? k : AlarmKind.AlarmCondition,
+ Severity: SeverityFromInt(p.Severity),
+ MessageTemplate: p.MessageTemplate,
+ PredicateScriptSource: p.PredicateSource,
+ HistorizeToAveva: p.HistorizeToAveva,
+ Retain: p.Retain);
+
+ /// The acting user for an . Engine-driven
+ /// Activated / Cleared transitions are "system"; operator Acknowledged / Confirmed carry the
+ /// recorded user from the condition state, falling back to "system" when none was recorded.
+ private static string TransitionUser(ScriptedAlarmEvent e) => e.Emission switch
+ {
+ EmissionKind.Acknowledged => e.Condition.LastAckUser ?? "system",
+ EmissionKind.Confirmed => e.Condition.LastConfirmUser ?? "system",
+ _ => "system",
+ };
+
+ // Severity conversion convention: the engine + plan disagree on type (engine = AlarmSeverity enum,
+ // plan + AlarmTransitionEvent = OPC UA 1–1000 int). We bucket the int into quartiles on the way IN
+ // and emit the quartile ceiling on the way OUT, so a round-trip is stable within a bucket.
+
+ /// Buckets an OPC UA 1–1000 severity into the engine's coarse
+ /// enum: ≤250 Low, ≤500 Medium, ≤750 High, else Critical.
+ private static AlarmSeverity SeverityFromInt(int s) =>
+ s <= 250 ? AlarmSeverity.Low
+ : s <= 500 ? AlarmSeverity.Medium
+ : s <= 750 ? AlarmSeverity.High
+ : AlarmSeverity.Critical;
+
+ /// Maps the engine's coarse back to an OPC UA 1–1000 severity
+ /// at each bucket's ceiling: Low=250, Medium=500, High=750, Critical=1000.
+ private static int SeverityToInt(AlarmSeverity s) => s switch
+ {
+ AlarmSeverity.Low => 250,
+ AlarmSeverity.Medium => 500,
+ AlarmSeverity.High => 750,
+ AlarmSeverity.Critical => 1000,
+ _ => 500,
+ };
+}
diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/ScriptedAlarmHostActorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/ScriptedAlarmHostActorTests.cs
new file mode 100644
index 00000000..4229f27e
--- /dev/null
+++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/ScriptedAlarmHostActorTests.cs
@@ -0,0 +1,173 @@
+using Akka.Actor;
+using Akka.Cluster.Tools.PublishSubscribe;
+using Akka.TestKit;
+using Serilog;
+using Shouldly;
+using Xunit;
+using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts;
+using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
+using ZB.MOM.WW.OtOpcUa.Core.Scripting;
+using ZB.MOM.WW.OtOpcUa.OpcUaServer;
+using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
+using ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms;
+using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
+using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
+
+namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.ScriptedAlarms;
+
+///
+/// Verifies loads the enabled subset of
+/// s into its , registers mux
+/// interest for their dependency refs after the load completes, feeds live
+/// values into the engine, and bridges the
+/// engine's emissions to both an and an
+/// on the cluster alerts topic.
+///
+public sealed class ScriptedAlarmHostActorTests : RuntimeActorTestBase
+{
+ private static readonly TimeSpan Timeout = TimeSpan.FromSeconds(8);
+
+ /// Plan whose predicate compares the single tag "M.T" against 90 — enabled by default.
+ private static EquipmentScriptedAlarmPlan Plan(
+ string id = "alm-1",
+ string equipmentId = "Plant/Line1/M",
+ string name = "HighTemp",
+ string depRef = "M.T",
+ int threshold = 90,
+ bool enabled = true,
+ int severity = 800) =>
+ new(
+ ScriptedAlarmId: id,
+ EquipmentId: equipmentId,
+ Name: name,
+ AlarmType: "AlarmCondition",
+ Severity: severity,
+ MessageTemplate: "condition",
+ PredicateScriptId: $"{id}-script",
+ PredicateSource: $"return (int)ctx.GetTag(\"{depRef}\").Value > {threshold};",
+ DependencyRefs: new[] { depRef },
+ HistorizeToAveva: true,
+ Retain: true,
+ Enabled: enabled);
+
+ private static ScriptedAlarmEngine BuildEngine(DependencyMuxTagUpstreamSource upstream)
+ {
+ var logger = new LoggerConfiguration().CreateLogger();
+ return new ScriptedAlarmEngine(upstream, new InMemoryAlarmStateStore(), new ScriptLoggerFactory(logger), logger);
+ }
+
+ private (IActorRef Host, DependencyMuxTagUpstreamSource Upstream) Spawn(
+ TestProbe publish, TestProbe mux)
+ {
+ var upstream = new DependencyMuxTagUpstreamSource();
+ var engine = BuildEngine(upstream);
+ var host = Sys.ActorOf(ScriptedAlarmHostActor.Props(publish.Ref, mux.Ref, upstream, engine));
+ return (host, upstream);
+ }
+
+ /// Subscribe to the alerts DPS topic and wait for the ack.
+ /// The Subscribe is sent FROM the probe so the SubscribeAck returns to it.
+ private void SubscribeToAlerts(TestProbe probe)
+ {
+ DistributedPubSub.Get(Sys).Mediator.Tell(
+ new Subscribe(ScriptedAlarmHostActor.AlertsTopic, probe.Ref), probe.Ref);
+ probe.ExpectMsg(Timeout);
+ }
+
+ /// Load + interest: applying one enabled alarm registers mux interest for its dep ref
+ /// AFTER the engine load completes; a disabled alarm in the same apply contributes no dep ref.
+ [Fact]
+ public void Apply_loads_enabled_alarm_and_registers_interest()
+ {
+ var publish = CreateTestProbe();
+ var mux = CreateTestProbe();
+ var (host, _) = Spawn(publish, mux);
+
+ host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[]
+ {
+ Plan(id: "alm-1", depRef: "M.T"),
+ Plan(id: "alm-2", depRef: "M.X", enabled: false), // disabled — not loaded, dep absent
+ }));
+
+ var reg = mux.ExpectMsg(Timeout);
+ reg.TagRefs.ShouldContain("M.T");
+ reg.TagRefs.ShouldNotContain("M.X");
+ }
+
+ /// Activation path: with the alarm loaded, pushing a value above the threshold drives an
+ /// Inactive→Active transition — the host publishes an AlarmStateUpdate(Active=true) and an
+ /// AlarmTransitionEvent("Activated") on the alerts topic.
+ [Fact]
+ public void Dependency_change_above_threshold_activates_alarm()
+ {
+ var publish = CreateTestProbe();
+ var mux = CreateTestProbe();
+ var alerts = CreateTestProbe();
+ SubscribeToAlerts(alerts);
+
+ var (host, _) = Spawn(publish, mux);
+ host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(severity: 800) }));
+ mux.ExpectMsg(Timeout); // load completed
+
+ var now = DateTime.UtcNow;
+ host.Tell(new VirtualTagActor.DependencyValueChanged("M.T", 99, now));
+
+ var state = publish.ExpectMsg(Timeout);
+ state.AlarmNodeId.ShouldBe("alm-1");
+ state.Active.ShouldBeTrue();
+
+ var evt = alerts.ExpectMsg(Timeout);
+ evt.AlarmId.ShouldBe("alm-1");
+ evt.TransitionKind.ShouldBe("Activated");
+ evt.Severity.ShouldBe(1000); // 800 → Critical bucket → 1000
+ evt.User.ShouldBe("system");
+ }
+
+ /// Clear path: after activating, pushing a value below the threshold drives Active→Inactive
+ /// — AlarmStateUpdate(Active=false) + AlarmTransitionEvent("Cleared").
+ [Fact]
+ public void Dependency_change_below_threshold_clears_alarm()
+ {
+ var publish = CreateTestProbe();
+ var mux = CreateTestProbe();
+ var alerts = CreateTestProbe();
+ SubscribeToAlerts(alerts);
+
+ var (host, _) = Spawn(publish, mux);
+ host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan() }));
+ mux.ExpectMsg(Timeout);
+
+ // Activate first.
+ host.Tell(new VirtualTagActor.DependencyValueChanged("M.T", 99, DateTime.UtcNow));
+ publish.FishForMessage(m => m.Active, Timeout);
+ alerts.FishForMessage(e => e.TransitionKind == "Activated", Timeout);
+
+ // Now clear.
+ host.Tell(new VirtualTagActor.DependencyValueChanged("M.T", 10, DateTime.UtcNow));
+
+ var cleared = publish.FishForMessage(m => !m.Active, Timeout);
+ cleared.AlarmNodeId.ShouldBe("alm-1");
+
+ var evt = alerts.FishForMessage(e => e.TransitionKind == "Cleared", Timeout);
+ evt.AlarmId.ShouldBe("alm-1");
+ }
+
+ /// Re-apply reloads: a second ApplyScriptedAlarms with a different alarm set loads the new
+ /// alarm — a fresh RegisterInterest reflecting the new dependency refs lands on the mux.
+ [Fact]
+ public void Reapply_reloads_with_new_dependency_refs()
+ {
+ var publish = CreateTestProbe();
+ var mux = CreateTestProbe();
+ var (host, _) = Spawn(publish, mux);
+
+ host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(id: "alm-1", depRef: "M.T") }));
+ var first = mux.ExpectMsg(Timeout);
+ first.TagRefs.ShouldContain("M.T");
+
+ host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(id: "alm-9", depRef: "M.Y") }));
+ var second = mux.ExpectMsg(Timeout);
+ second.TagRefs.ShouldContain("M.Y");
+ second.TagRefs.ShouldNotContain("M.T");
+ }
+}