using Akka.Actor;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.TestKit;
using Serilog;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts;
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
using ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms;
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;

namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.ScriptedAlarms;

// NOTE(review): in the listing this text was taken from, the generic probe calls carry no type
// argument (ExpectMsg(Timeout), FishForMessage(m => ...), Array.Empty()). That looks like
// angle-bracket content lost in extraction rather than intent — the comments name the expected
// messages (SubscribeAck, RegisterInterest, AlarmStateUpdate, AlarmTransitionEvent). Confirm the
// type arguments against the checked-in file; the code below is reproduced as listed.

/// <summary>
/// Verifies that <see cref="ScriptedAlarmHostActor"/> loads the enabled subset of
/// <see cref="EquipmentScriptedAlarmPlan"/>s into its <see cref="ScriptedAlarmEngine"/>, registers mux
/// interest for their dependency refs after the load completes, feeds live
/// <see cref="VirtualTagActor.DependencyValueChanged"/> values into the engine, and bridges the
/// engine's emissions to both an <c>AlarmStateUpdate</c> (observed on the publish probe) and an
/// <c>AlarmTransitionEvent</c> on the cluster alerts topic.
/// </summary>
public sealed class ScriptedAlarmHostActorTests : RuntimeActorTestBase
{
    /// <summary>Upper bound for every positive "a message must arrive" wait in this class.</summary>
    private static readonly TimeSpan Timeout = TimeSpan.FromSeconds(8);

    /// <summary>
    /// Plan whose predicate compares the single tag "M.T" against 90 — enabled by default.
    /// Every field can be overridden per test; the predicate source and the single dependency ref
    /// are both derived from <paramref name="depRef"/>, and the script id from <paramref name="id"/>.
    /// </summary>
    private static EquipmentScriptedAlarmPlan Plan(
        string id = "alm-1",
        string equipmentId = "Plant/Line1/M",
        string name = "HighTemp",
        string depRef = "M.T",
        int threshold = 90,
        bool enabled = true,
        int severity = 800) =>
        new(
            ScriptedAlarmId: id,
            EquipmentId: equipmentId,
            Name: name,
            AlarmType: "AlarmCondition",
            Severity: severity,
            MessageTemplate: "condition",
            PredicateScriptId: $"{id}-script",
            PredicateSource: $"return (int)ctx.GetTag(\"{depRef}\").Value > {threshold};",
            DependencyRefs: new[] { depRef },
            HistorizeToAveva: true,
            Retain: true,
            Enabled: enabled);

    /// <summary>
    /// Plan whose predicate references an unknown identifier so it fails to compile — applying it
    /// faults the engine's LoadAsync. Used to prove a faulted load doesn't crash the host.
    /// The plan is enabled and declares no dependency refs.
    /// </summary>
    private static EquipmentScriptedAlarmPlan BadPlan(string id = "bad-1") =>
        new(
            ScriptedAlarmId: id,
            EquipmentId: "Plant/Line1/M",
            Name: "Broken",
            AlarmType: "AlarmCondition",
            Severity: 500,
            MessageTemplate: "broken",
            PredicateScriptId: $"{id}-script",
            PredicateSource: "return unknownIdentifier;", // uncompilable → LoadAsync throws
            DependencyRefs: Array.Empty(),
            HistorizeToAveva: false,
            Retain: false,
            Enabled: true);

    /// <summary>
    /// Builds a <see cref="ScriptedAlarmEngine"/> over <paramref name="upstream"/> with a fresh
    /// <see cref="InMemoryAlarmStateStore"/>. The Serilog logger is created from a
    /// <see cref="LoggerConfiguration"/> with nothing configured on it here, and is shared between
    /// the engine and its <see cref="ScriptLoggerFactory"/>.
    /// </summary>
    private static ScriptedAlarmEngine BuildEngine(DependencyMuxTagUpstreamSource upstream)
    {
        var logger = new LoggerConfiguration().CreateLogger();
        return new ScriptedAlarmEngine(upstream, new InMemoryAlarmStateStore(), new ScriptLoggerFactory(logger), logger);
    }

    /// <summary>The local node id used by the redundancy-gating tests.</summary>
    private static readonly NodeId LocalNode = new("node-A");

    /// <summary>
    /// Spawns a <see cref="ScriptedAlarmHostActor"/> wired to the given probes, backed by a fresh
    /// upstream source and a fresh engine built from it.
    /// </summary>
    /// <param name="publish">Probe standing in for the host's publish target.</param>
    /// <param name="mux">Probe standing in for the dependency mux.</param>
    /// <param name="localNode">
    /// Node id handed to the host's Props; <c>null</c> (the default) for tests that do not exercise
    /// the redundancy gate.
    /// </param>
    /// <returns>The spawned host together with the upstream source its engine reads from.</returns>
    private (IActorRef Host, DependencyMuxTagUpstreamSource Upstream) Spawn(
        TestProbe publish, TestProbe mux, NodeId? localNode = null)
    {
        var upstream = new DependencyMuxTagUpstreamSource();
        var engine = BuildEngine(upstream);
        var host = Sys.ActorOf(ScriptedAlarmHostActor.Props(publish.Ref, mux.Ref, upstream, engine, localNode));
        return (host, upstream);
    }

    /// <summary>
    /// Tell the host a <see cref="RedundancyStateChanged"/> snapshot marking <see cref="LocalNode"/>
    /// with <paramref name="role"/> so the gate observes the local role. Both leader flags in the
    /// snapshot are set to true only when the role is <see cref="RedundancyRole.Primary"/>.
    /// </summary>
    private static void TellRedundancyRole(IActorRef host, RedundancyRole role) =>
        host.Tell(new RedundancyStateChanged(
            new[]
            {
                new NodeRedundancyState(
                    NodeId: LocalNode,
                    Role: role,
                    IsClusterLeader: role == RedundancyRole.Primary,
                    IsRoleLeaderForDriver: role == RedundancyRole.Primary,
                    AsOfUtc: DateTime.UtcNow),
            },
            CorrelationId.NewId()));

    /// <summary>
    /// Subscribe <paramref name="probe"/> to the alerts DPS topic and wait for the ack.
    /// The Subscribe is sent FROM the probe so the SubscribeAck returns to it.
    /// </summary>
    private void SubscribeToAlerts(TestProbe probe)
    {
        DistributedPubSub.Get(Sys).Mediator.Tell(
            new Subscribe(ScriptedAlarmHostActor.AlertsTopic, probe.Ref), probe.Ref);
        probe.ExpectMsg(Timeout);
    }

    /// <summary>
    /// Load + interest: applying one enabled alarm registers mux interest for its dep ref
    /// AFTER the engine load completes; a disabled alarm in the same apply contributes no dep ref.
    /// </summary>
    [Fact]
    public void Apply_loads_enabled_alarm_and_registers_interest()
    {
        var publish = CreateTestProbe();
        var mux = CreateTestProbe();
        var (host, _) = Spawn(publish, mux);

        host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[]
        {
            Plan(id: "alm-1", depRef: "M.T"),
            Plan(id: "alm-2", depRef: "M.X", enabled: false), // disabled — not loaded, dep absent
        }));

        var reg = mux.ExpectMsg(Timeout);
        reg.TagRefs.ShouldContain("M.T");
        reg.TagRefs.ShouldNotContain("M.X");
    }

    /// <summary>
    /// Activation path: with the alarm loaded, pushing a value above the threshold drives an
    /// Inactive→Active transition — the host publishes an AlarmStateUpdate(Active=true) and an
    /// AlarmTransitionEvent("Activated") on the alerts topic.
    /// </summary>
    [Fact]
    public void Dependency_change_above_threshold_activates_alarm()
    {
        var publish = CreateTestProbe();
        var mux = CreateTestProbe();
        var alerts = CreateTestProbe();
        SubscribeToAlerts(alerts);
        var (host, _) = Spawn(publish, mux);

        host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(severity: 800) }));
        mux.ExpectMsg(Timeout); // load completed

        var now = DateTime.UtcNow;
        host.Tell(new VirtualTagActor.DependencyValueChanged("M.T", 99, now));

        var state = publish.ExpectMsg(Timeout);
        state.AlarmNodeId.ShouldBe("alm-1");
        // The full Part 9 snapshot bridges through (T15) — every Core condition field maps:
        // on activation the engine sets Active, clears Ack AND Confirm (a new active occurrence needs a
        // fresh ack→clear→confirm cycle), keeps Enabled, and leaves Shelving unshelved.
        state.State.Active.ShouldBeTrue(); // Condition.Active == Active
        state.State.Acknowledged.ShouldBeFalse(); // Condition.Acked == Unacknowledged on activation
        state.State.Confirmed.ShouldBeFalse(); // Condition.Confirmed == Unconfirmed on activation
        state.State.Enabled.ShouldBeTrue(); // Condition.Enabled == Enabled
        state.State.Shelving.ShouldBe(AlarmShelvingKind.Unshelved); // Condition.Shelving.Kind == Unshelved
        state.State.Severity.ShouldBe((ushort)1000); // 800 → Critical bucket → 1000
        state.State.Message.ShouldBe("condition"); // e.Message

        var evt = alerts.ExpectMsg(Timeout);
        evt.AlarmId.ShouldBe("alm-1");
        evt.TransitionKind.ShouldBe("Activated");
        evt.Severity.ShouldBe(1000); // 800 → Critical bucket → 1000
        evt.User.ShouldBe("system");
    }

    /// <summary>
    /// Clear path: after activating, pushing a value below the threshold drives Active→Inactive
    /// — AlarmStateUpdate(Active=false) + AlarmTransitionEvent("Cleared").
    /// </summary>
    [Fact]
    public void Dependency_change_below_threshold_clears_alarm()
    {
        var publish = CreateTestProbe();
        var mux = CreateTestProbe();
        var alerts = CreateTestProbe();
        SubscribeToAlerts(alerts);
        var (host, _) = Spawn(publish, mux);

        host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan() }));
        mux.ExpectMsg(Timeout);

        // Activate first.
        host.Tell(new VirtualTagActor.DependencyValueChanged("M.T", 99, DateTime.UtcNow));
        publish.FishForMessage(m => m.State.Active, Timeout);
        alerts.FishForMessage(e => e.TransitionKind == "Activated", Timeout);

        // Now clear.
        host.Tell(new VirtualTagActor.DependencyValueChanged("M.T", 10, DateTime.UtcNow));
        var cleared = publish.FishForMessage(m => !m.State.Active, Timeout);
        cleared.AlarmNodeId.ShouldBe("alm-1");

        var evt = alerts.FishForMessage(e => e.TransitionKind == "Cleared", Timeout);
        evt.AlarmId.ShouldBe("alm-1");
    }

    /// <summary>
    /// Re-apply reloads: a second ApplyScriptedAlarms with a different alarm set loads the new
    /// alarm — a fresh RegisterInterest reflecting the new dependency refs lands on the mux.
    /// </summary>
    [Fact]
    public void Reapply_reloads_with_new_dependency_refs()
    {
        var publish = CreateTestProbe();
        var mux = CreateTestProbe();
        var (host, _) = Spawn(publish, mux);

        host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(id: "alm-1", depRef: "M.T") }));
        var first = mux.ExpectMsg(Timeout);
        first.TagRefs.ShouldContain("M.T");

        host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(id: "alm-9", depRef: "M.Y") }));
        var second = mux.ExpectMsg(Timeout);
        second.TagRefs.ShouldContain("M.Y");
        second.TagRefs.ShouldNotContain("M.T");
    }

    /// <summary>
    /// Faulted load resilience: applying an alarm whose predicate doesn't compile faults the
    /// engine's LoadAsync. The host logs a Warning (Status.Failure handler) and stays alive — it must
    /// still process a subsequent valid apply, registering interest for that apply's dep refs.
    /// </summary>
    [Fact]
    public void Faulted_load_does_not_crash_host()
    {
        var publish = CreateTestProbe();
        var mux = CreateTestProbe();
        var (host, _) = Spawn(publish, mux);

        // Apply a plan that fails to compile — LoadAsync faults, the host swallows it as a Warning and
        // does NOT register interest (no dep refs to register, and the load never completed).
        host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { BadPlan() }));
        mux.ExpectNoMsg(TimeSpan.FromMilliseconds(500));

        // Prove the actor is still responsive: a later valid apply loads + registers interest as normal.
        host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(id: "alm-ok", depRef: "M.T") }));
        var reg = mux.ExpectMsg(Timeout);
        reg.TagRefs.ShouldContain("M.T");
    }

    /// <summary>
    /// Stale-load guard (fix A): two back-to-back applies with different dep-ref sets must end
    /// with the mux holding the SECOND (latest-generation) set — a stale earlier completion must never
    /// re-introduce the first set's refs.
    /// </summary>
    /// <remarks>
    /// Limitation: forcing the two LoadAsync continuations to complete out of order is not
    /// deterministic via real async timing (both loads are short + run on the thread pool). This test
    /// therefore validates the guard's observable contract rather than the race itself: it fishes for
    /// the RegisterInterest carrying the second apply's refs, then asserts no LATER RegisterInterest
    /// re-introduces the first apply's refs. With the generation guard in place the latest apply always
    /// wins; without it, an out-of-order stale completion could land a "M.A"-bearing RegisterInterest
    /// after the "M.B" one — which this assertion would catch.
    /// </remarks>
    [Fact]
    public void Stale_load_does_not_register_superseded_dep_refs()
    {
        var publish = CreateTestProbe();
        var mux = CreateTestProbe();
        var (host, _) = Spawn(publish, mux);

        // Fire two applies in quick succession with disjoint dep refs; the second supersedes the first.
        host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(id: "alm-a", depRef: "M.A") }));
        host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(id: "alm-b", depRef: "M.B") }));

        // The latest generation must win: fish for the RegisterInterest reflecting the second apply.
        mux.FishForMessage(r => r.TagRefs.Contains("M.B"), Timeout);

        // No LATER RegisterInterest may re-introduce the first (superseded) apply's "M.A" ref.
        // (The check is stricter than that: no further mux message of any kind may arrive in the window.)
        mux.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
    }

    /// <summary>
    /// Command path: an AlarmCommand("Acknowledge") for an alarm this host owns (and that is
    /// currently active+unacknowledged) drives the engine's AcknowledgeAsync — observed via the resulting
    /// AlarmStateUpdate(Acknowledged=true) and an AlarmTransitionEvent("Acknowledged") on the alerts topic
    /// carrying the command's User (the user threads through AcknowledgeAsync → LastAckUser → evt.User).
    /// </summary>
    [Fact]
    public void AlarmCommand_acknowledge_drives_engine_with_mapped_args()
    {
        var publish = CreateTestProbe();
        var mux = CreateTestProbe();
        var alerts = CreateTestProbe();
        SubscribeToAlerts(alerts);
        var (host, _) = Spawn(publish, mux);

        host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(id: "alm-1", depRef: "M.T") }));
        mux.ExpectMsg(Timeout); // load completed

        // Activate so there is something to acknowledge (Acknowledge no-ops on an already-acked alarm).
        host.Tell(new VirtualTagActor.DependencyValueChanged("M.T", 99, DateTime.UtcNow));
        publish.FishForMessage(m => m.State.Active && !m.State.Acknowledged, Timeout);
        alerts.FishForMessage(e => e.TransitionKind == "Activated", Timeout);

        // Acknowledge via the command topic — the host owns alm-1, so AcknowledgeAsync runs.
        host.Tell(new AlarmCommand(
            AlarmId: "alm-1",
            Operation: "Acknowledge",
            User: "alice",
            Comment: "ack-note",
            UnshelveAtUtc: null));

        var acked = publish.FishForMessage(m => m.State.Acknowledged, Timeout);
        acked.AlarmNodeId.ShouldBe("alm-1");
        acked.State.Acknowledged.ShouldBeTrue();

        // The transition carries the acting user mapped from cmd.User (proves AcknowledgeAsync got it).
        var evt = alerts.FishForMessage(e => e.TransitionKind == "Acknowledged", Timeout);
        evt.AlarmId.ShouldBe("alm-1");
        evt.User.ShouldBe("alice");
    }

    /// <summary>
    /// Ownership filter: an AlarmCommand for an AlarmId this host's engine does NOT own is ignored
    /// — the engine op never runs, so no AlarmStateUpdate and no alerts transition are produced.
    /// </summary>
    [Fact]
    public void AlarmCommand_for_unowned_alarm_is_ignored()
    {
        var publish = CreateTestProbe();
        var mux = CreateTestProbe();
        var alerts = CreateTestProbe();
        SubscribeToAlerts(alerts);
        var (host, _) = Spawn(publish, mux);

        host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(id: "alm-1", depRef: "M.T") }));
        mux.ExpectMsg(Timeout); // load completed; host owns only alm-1

        // Command targets an alarm this engine never loaded — it must be a no-op.
        host.Tell(new AlarmCommand(
            AlarmId: "not-mine",
            Operation: "Acknowledge",
            User: "alice",
            Comment: null,
            UnshelveAtUtc: null));

        publish.ExpectNoMsg(TimeSpan.FromMilliseconds(500)); // no engine op → no projection
        alerts.ExpectNoMsg(TimeSpan.FromMilliseconds(500)); // no engine op → no transition
    }

    /// <summary>
    /// Validation: an AddComment command with empty or whitespace comment text is rejected (logged),
    /// NOT propagated to the engine — the actor stays alive and still processes a subsequent valid command,
    /// proving it didn't fault and the engine's AddCommentAsync was never driven.
    /// </summary>
    [Theory]
    [InlineData("")]
    [InlineData(" ")]
    public void AlarmCommand_add_comment_empty_text_is_rejected_not_driven(string emptyComment)
    {
        var publish = CreateTestProbe();
        var mux = CreateTestProbe();
        var alerts = CreateTestProbe();
        SubscribeToAlerts(alerts);
        var (host, _) = Spawn(publish, mux);

        host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(id: "alm-1", depRef: "M.T") }));
        mux.ExpectMsg(Timeout); // load completed

        // AddComment with empty/whitespace text is rejected before reaching the engine.
        host.Tell(new AlarmCommand(
            AlarmId: "alm-1",
            Operation: "AddComment",
            User: "alice",
            Comment: emptyComment,
            UnshelveAtUtc: null));

        publish.ExpectNoMsg(TimeSpan.FromMilliseconds(500)); // rejected → no engine op → no OPC UA projection
        // NOTE(review): 200 ms here vs. the 500 ms quiet window used by the other tests — presumably
        // shortened because it follows the 500 ms publish wait; confirm this is intentional.
        alerts.ExpectNoMsg(TimeSpan.FromMilliseconds(200)); // rejected → no alerts event

        // Prove the actor survived: activate the alarm and observe the normal projection flow.
        host.Tell(new VirtualTagActor.DependencyValueChanged("M.T", 99, DateTime.UtcNow));
        var state = publish.FishForMessage(m => m.State.Active, Timeout);
        state.AlarmNodeId.ShouldBe("alm-1");
    }

    /// <summary>
    /// Positive AddComment path: a non-empty AddComment for a loaded alarm drives the engine's
    /// AddCommentAsync — observed via an AlarmTransitionEvent("CommentAdded") on the alerts topic
    /// (proves the op ran end-to-end through the host dispatch).
    /// </summary>
    [Fact]
    public void AlarmCommand_add_comment_nonempty_drives_engine()
    {
        var publish = CreateTestProbe();
        var mux = CreateTestProbe();
        var alerts = CreateTestProbe();
        SubscribeToAlerts(alerts);
        var (host, _) = Spawn(publish, mux);

        host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(id: "alm-1", depRef: "M.T") }));
        mux.ExpectMsg(Timeout); // load completed

        // AddComment with a non-empty comment drives the engine — CommentAdded transition emitted.
        host.Tell(new AlarmCommand(
            AlarmId: "alm-1",
            Operation: "AddComment",
            User: "bob",
            Comment: "note from operator",
            UnshelveAtUtc: null));

        // NOTE(review): only the alarm id is asserted; the acting user ("bob") is not checked on the
        // event even though the Acknowledge test checks evt.User — consider whether it should be.
        var evt = alerts.FishForMessage(e => e.TransitionKind == "CommentAdded", Timeout);
        evt.AlarmId.ShouldBe("alm-1");
    }

    /// <summary>
    /// Validation: a TimedShelve command missing UnshelveAtUtc is rejected (logged), NOT thrown —
    /// the actor stays alive and still processes a subsequent valid command, proving it didn't fault.
    /// </summary>
    [Fact]
    public void AlarmCommand_timed_shelve_missing_unshelve_time_is_rejected_not_thrown()
    {
        var publish = CreateTestProbe();
        var mux = CreateTestProbe();
        var alerts = CreateTestProbe();
        SubscribeToAlerts(alerts);
        var (host, _) = Spawn(publish, mux);

        host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(id: "alm-1", depRef: "M.T") }));
        mux.ExpectMsg(Timeout); // load completed

        // TimedShelve with a null UnshelveAtUtc is malformed — the host rejects + logs, does not throw.
        host.Tell(new AlarmCommand(
            AlarmId: "alm-1",
            Operation: "TimedShelve",
            User: "alice",
            Comment: null,
            UnshelveAtUtc: null));

        publish.ExpectNoMsg(TimeSpan.FromMilliseconds(500)); // rejected → no engine op → no projection

        // Prove the actor survived: activate the alarm and observe the normal projection flow.
        host.Tell(new VirtualTagActor.DependencyValueChanged("M.T", 99, DateTime.UtcNow));
        var state = publish.FishForMessage(m => m.State.Active, Timeout);
        state.AlarmNodeId.ShouldBe("alm-1");
    }

    /// <summary>
    /// Default-emit (T1): before ANY RedundancyStateChanged snapshot arrives — the boot window,
    /// and the steady state for single-node deploys (the sole node is always Primary) — the host MUST
    /// publish the cluster-wide alerts transition. Constructed WITH a localNode but no snapshot sent, so
    /// the cached local role is unknown ⇒ treated as Primary/emit.
    /// </summary>
    [Fact]
    public void Emission_is_published_to_alerts_by_default_before_any_redundancy_state()
    {
        var publish = CreateTestProbe();
        var mux = CreateTestProbe();
        var alerts = CreateTestProbe();
        SubscribeToAlerts(alerts);
        var (host, _) = Spawn(publish, mux, LocalNode);

        host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(severity: 800) }));
        mux.ExpectMsg(Timeout); // load completed

        // No RedundancyStateChanged sent — local role unknown ⇒ default-emit.
        host.Tell(new VirtualTagActor.DependencyValueChanged("M.T", 99, DateTime.UtcNow));

        // The OPC UA node write happens AND the alerts transition is published.
        publish.FishForMessage(m => m.State.Active, Timeout);
        var evt = alerts.ExpectMsg(Timeout);
        evt.AlarmId.ShouldBe("alm-1");
        evt.TransitionKind.ShouldBe("Activated");
    }

    /// <summary>
    /// Secondary suppression (T1): when the cached local role is Secondary, the host MUST NOT
    /// publish the cluster-wide alerts transition (the Primary publishes the single copy) — but it MUST
    /// still write the local OPC UA condition node so the secondary's address space stays warm for failover.
    /// </summary>
    [Fact]
    public void Secondary_node_suppresses_alerts_publish_but_still_writes_opcua()
    {
        var publish = CreateTestProbe();
        var mux = CreateTestProbe();
        var alerts = CreateTestProbe();
        SubscribeToAlerts(alerts);
        var (host, _) = Spawn(publish, mux, LocalNode);

        host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(severity: 800) }));
        mux.ExpectMsg(Timeout); // load completed

        // Mark this node Secondary, then activate.
        TellRedundancyRole(host, RedundancyRole.Secondary);
        host.Tell(new VirtualTagActor.DependencyValueChanged("M.T", 99, DateTime.UtcNow));

        // The local OPC UA node write is UNGATED — it must still arrive.
        var state = publish.FishForMessage(m => m.State.Active, Timeout);
        state.AlarmNodeId.ShouldBe("alm-1");

        // The cluster-wide alerts publish is gated off on the secondary.
        alerts.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
    }

    /// <summary>
    /// Primary publishes (T1): when the cached local role is Primary, the host publishes the
    /// cluster-wide alerts transition as normal (this is the single copy the fleet sees).
    /// </summary>
    [Fact]
    public void Primary_node_publishes_alerts()
    {
        var publish = CreateTestProbe();
        var mux = CreateTestProbe();
        var alerts = CreateTestProbe();
        SubscribeToAlerts(alerts);
        var (host, _) = Spawn(publish, mux, LocalNode);

        host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(severity: 800) }));
        mux.ExpectMsg(Timeout); // load completed

        // Mark this node Primary, then activate.
        TellRedundancyRole(host, RedundancyRole.Primary);
        host.Tell(new VirtualTagActor.DependencyValueChanged("M.T", 99, DateTime.UtcNow));

        publish.FishForMessage(m => m.State.Active, Timeout);
        var evt = alerts.ExpectMsg(Timeout);
        evt.AlarmId.ShouldBe("alm-1");
        evt.TransitionKind.ShouldBe("Activated");
    }

    /// <summary>
    /// Inbound command ungated by role (T1): the alerts-publish gate must NOT affect inbound
    /// command processing. Under a Secondary role, an AlarmCommand("Acknowledge") for an owned, active
    /// alarm still drives the engine — observed via the resulting AlarmStateUpdate(Acknowledged=true)
    /// (the OPC UA node write is ungated so the secondary's engine state + address space stay consistent).
    /// </summary>
    [Fact]
    public void Inbound_AlarmCommand_is_processed_regardless_of_role()
    {
        var publish = CreateTestProbe();
        var mux = CreateTestProbe();
        var (host, _) = Spawn(publish, mux, LocalNode);

        host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(id: "alm-1", depRef: "M.T") }));
        mux.ExpectMsg(Timeout); // load completed

        // Mark this node Secondary — the alerts publish is gated, but command processing is NOT.
        TellRedundancyRole(host, RedundancyRole.Secondary);

        // Activate so there is something to acknowledge.
        host.Tell(new VirtualTagActor.DependencyValueChanged("M.T", 99, DateTime.UtcNow));
        publish.FishForMessage(m => m.State.Active && !m.State.Acknowledged, Timeout);

        // Acknowledge via the command topic — the engine must process it even on the secondary.
        host.Tell(new AlarmCommand(
            AlarmId: "alm-1",
            Operation: "Acknowledge",
            User: "alice",
            Comment: "ack-note",
            UnshelveAtUtc: null));

        var acked = publish.FishForMessage(m => m.State.Acknowledged, Timeout);
        acked.AlarmNodeId.ShouldBe("alm-1");
        acked.State.Acknowledged.ShouldBeTrue();
    }
}