diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmHostActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmHostActor.cs index d9872016..d87de52f 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmHostActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmHostActor.cs @@ -153,12 +153,24 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor Receive(OnAlarmsLoaded); Receive(OnDependencyChanged); Receive(OnEngineEmission); + // Inbound OPC UA Part 9 alarm method calls arrive as AlarmCommands on the cluster + // `alarm-commands` DPS topic (T18 publishes them after the AlarmAck role gate). The topic is a + // cluster-wide broadcast — every host node receives every command — so OnAlarmCommand filters to + // the alarms THIS host's engine owns before driving the matching engine op. The engine ops are + // async, and this project's Akka analyzer (AK2003) forbids an async-void Receive delegate, so + // the handler is a Task-returning ReceiveAsync: Akka suspends the mailbox until the op completes + // (ordered, awaited on the actor thread) and routes any escaped fault through supervision. + ReceiveAsync(OnAlarmCommand); // A faulted LoadAsync pipes back a Status.Failure (see OnApply) — log it and stay inert so the // failure doesn't hit the dead-letter log. Receive(OnLoadFailed); // A LoadAsync cancelled by PostStop's _cts pipes back this marker. The actor is stopping, so // there's nothing to do — swallow it quietly (no Warning, no dead letter). Receive(_ => { }); + // DPS Subscribe (PreStart) acks back here once the mediator has registered Self on the topic. + // No-op — the subscription is live the moment the ack arrives; we only need to keep it off the + // dead-letter log. Matches OpcUaPublishActor / DriverHostActor's SubscribeAck convention. + Receive(_ => { /* PubSub ack */ }); } private void OnApply(ApplyScriptedAlarms msg) @@ -264,11 +276,110 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor _mediator.Tell(new Publish(AlertsTopic, evt)); } + /// + /// Drives an inbound OPC UA Part 9 alarm method call (delivered as an + /// on the cluster alarm-commands topic) onto the matching + /// operation. + /// + /// + /// Ownership filter. The topic is a cluster-wide broadcast; every host node receives + /// every command, but each owns a disjoint subset of alarms (its engine's loaded set). A + /// command for an alarm this engine does NOT own is a no-op — the owning node will act on it. + /// + /// + /// + /// No re-projection. The engine op raises on + /// success, which already marshals back to and re-projects the + /// condition to the OPC UA node + the alerts topic. So this handler just calls the op and + /// awaits; it never touches the publish actor directly. + /// + /// + /// + /// Async on the actor thread. The handler is a Task-returning + /// ReceiveAsync (this project's AK2003 analyzer forbids an async-void Receive + /// delegate). Akka suspends the actor's mailbox until the returned task completes, so the op + /// runs ordered + awaited on the actor thread — never overlapping the next message. The engine + /// also serialises every operation behind its own _evalGate and marshals every emission + /// back via Self.Tell (never touching off-thread). The whole body + /// is wrapped in a try/catch so a faulting op can never escape the handler and fault the actor + /// — failures are logged like and swallowed. + /// + /// + /// The inbound alarm command. + private async Task OnAlarmCommand(AlarmCommand cmd) + { + // Ownership filter FIRST: ignore commands for alarms this engine doesn't own. The topic is a + // cluster-wide broadcast, so the same command lands on every host — only the owner acts. + if (!_engine.LoadedAlarmIds.Contains(cmd.AlarmId)) + { + _log.Debug("ScriptedAlarmHost: ignoring AlarmCommand {Op} for unowned alarm {AlarmId}", + cmd.Operation, cmd.AlarmId); + return; + } + + try + { + switch (cmd.Operation) + { + case "Acknowledge": + await _engine.AcknowledgeAsync(cmd.AlarmId, cmd.User, cmd.Comment, CancellationToken.None); + break; + case "Confirm": + await _engine.ConfirmAsync(cmd.AlarmId, cmd.User, cmd.Comment, CancellationToken.None); + break; + case "OneShotShelve": + await _engine.OneShotShelveAsync(cmd.AlarmId, cmd.User, CancellationToken.None); + break; + case "TimedShelve": + // A timed shelve needs the absolute unshelve instant. T18 derives it from the OPC UA + // Duration (UtcNow + shelvingTime); a command missing it is malformed — log + reject + // rather than throw (a throw out of this async void would crash the actor). + if (cmd.UnshelveAtUtc is not { } unshelveAt) + { + _log.Warning("ScriptedAlarmHost: rejecting TimedShelve for {AlarmId} — missing UnshelveAtUtc", + cmd.AlarmId); + return; + } + await _engine.TimedShelveAsync(cmd.AlarmId, cmd.User, unshelveAt, CancellationToken.None); + break; + case "Unshelve": + await _engine.UnshelveAsync(cmd.AlarmId, cmd.User, CancellationToken.None); + break; + case "Enable": + await _engine.EnableAsync(cmd.AlarmId, cmd.User, CancellationToken.None); + break; + case "Disable": + await _engine.DisableAsync(cmd.AlarmId, cmd.User, CancellationToken.None); + break; + case "AddComment": + // AddComment's text is required by the engine (ApplyAddComment takes a non-null text); + // coalesce a null comment to empty so a comment-less AddComment is still a valid no-op + // rather than an NRE. + await _engine.AddCommentAsync(cmd.AlarmId, cmd.User, cmd.Comment ?? string.Empty, CancellationToken.None); + break; + default: + _log.Warning("ScriptedAlarmHost: ignoring AlarmCommand with unknown operation {Op} for {AlarmId}", + cmd.Operation, cmd.AlarmId); + break; + } + } + catch (Exception ex) + { + // A failing engine op must not crash the actor — mirror OnLoadFailed's log-and-stay-inert style. + _log.Warning(ex, "ScriptedAlarmHost: engine op {Op} failed for alarm {AlarmId}", + cmd.Operation, cmd.AlarmId); + } + } + /// protected override void PreStart() { // Resolve the cluster DPS mediator once, on the actor thread, so emissions only Tell it. _mediator = DistributedPubSub.Get(Context.System).Mediator; + // Subscribe to the `alarm-commands` topic so inbound OPC UA Part 9 method calls (published by + // the node manager's condition handlers, T18) land here as AlarmCommands. The Subscribe is sent + // from Self so the SubscribeAck returns to this actor (handled as a no-op in the ctor wiring). + _mediator.Tell(new Subscribe(AlarmCommandsTopic, Self)); base.PreStart(); } diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/ScriptedAlarmHostActorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/ScriptedAlarmHostActorTests.cs index e8c0c7c1..3b6db3f2 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/ScriptedAlarmHostActorTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/ScriptedAlarmHostActorTests.cs @@ -247,4 +247,86 @@ public sealed class ScriptedAlarmHostActorTests : RuntimeActorTestBase // No LATER RegisterInterest may re-introduce the first (superseded) apply's "M.A" ref. mux.ExpectNoMsg(TimeSpan.FromMilliseconds(500)); } + + /// Command path: an AlarmCommand("Acknowledge") for an alarm this host owns (and that is + /// currently active+unacknowledged) drives the engine's AcknowledgeAsync — observed via the resulting + /// AlarmStateUpdate(Acknowledged=true) and an AlarmTransitionEvent("Acknowledged") on the alerts topic + /// carrying the command's User (the user threads through AcknowledgeAsync → LastAckUser → evt.User). + [Fact] + public void AlarmCommand_acknowledge_drives_engine_with_mapped_args() + { + var publish = CreateTestProbe(); + var mux = CreateTestProbe(); + var alerts = CreateTestProbe(); + SubscribeToAlerts(alerts); + + var (host, _) = Spawn(publish, mux); + host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(id: "alm-1", depRef: "M.T") })); + mux.ExpectMsg(Timeout); // load completed + + // Activate so there is something to acknowledge (Acknowledge no-ops on an already-acked alarm). + host.Tell(new VirtualTagActor.DependencyValueChanged("M.T", 99, DateTime.UtcNow)); + publish.FishForMessage(m => m.State.Active && !m.State.Acknowledged, Timeout); + alerts.FishForMessage(e => e.TransitionKind == "Activated", Timeout); + + // Acknowledge via the command topic — the host owns alm-1, so AcknowledgeAsync runs. + host.Tell(new AlarmCommand( + AlarmId: "alm-1", Operation: "Acknowledge", User: "alice", Comment: "ack-note", UnshelveAtUtc: null)); + + var acked = publish.FishForMessage(m => m.State.Acknowledged, Timeout); + acked.AlarmNodeId.ShouldBe("alm-1"); + acked.State.Acknowledged.ShouldBeTrue(); + + // The transition carries the acting user mapped from cmd.User (proves AcknowledgeAsync got it). + var evt = alerts.FishForMessage(e => e.TransitionKind == "Acknowledged", Timeout); + evt.AlarmId.ShouldBe("alm-1"); + evt.User.ShouldBe("alice"); + } + + /// Ownership filter: an AlarmCommand for an AlarmId this host's engine does NOT own is ignored + /// — the engine op never runs, so no AlarmStateUpdate and no alerts transition are produced. + [Fact] + public void AlarmCommand_for_unowned_alarm_is_ignored() + { + var publish = CreateTestProbe(); + var mux = CreateTestProbe(); + var alerts = CreateTestProbe(); + SubscribeToAlerts(alerts); + + var (host, _) = Spawn(publish, mux); + host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(id: "alm-1", depRef: "M.T") })); + mux.ExpectMsg(Timeout); // load completed; host owns only alm-1 + + // Command targets an alarm this engine never loaded — it must be a no-op. + host.Tell(new AlarmCommand( + AlarmId: "not-mine", Operation: "Acknowledge", User: "alice", Comment: null, UnshelveAtUtc: null)); + + publish.ExpectNoMsg(TimeSpan.FromMilliseconds(500)); // no engine op → no projection + alerts.ExpectNoMsg(TimeSpan.FromMilliseconds(500)); // no engine op → no transition + } + + /// Validation: a TimedShelve command missing UnshelveAtUtc is rejected (logged), NOT thrown — + /// the actor stays alive and still processes a subsequent valid command, proving it didn't fault. + [Fact] + public void AlarmCommand_timed_shelve_missing_unshelve_time_is_rejected_not_thrown() + { + var publish = CreateTestProbe(); + var mux = CreateTestProbe(); + var alerts = CreateTestProbe(); + SubscribeToAlerts(alerts); + + var (host, _) = Spawn(publish, mux); + host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(id: "alm-1", depRef: "M.T") })); + mux.ExpectMsg(Timeout); // load completed + + // TimedShelve with a null UnshelveAtUtc is malformed — the host rejects + logs, does not throw. + host.Tell(new AlarmCommand( + AlarmId: "alm-1", Operation: "TimedShelve", User: "alice", Comment: null, UnshelveAtUtc: null)); + publish.ExpectNoMsg(TimeSpan.FromMilliseconds(500)); // rejected → no engine op → no projection + + // Prove the actor survived: activate the alarm and observe the normal projection flow. + host.Tell(new VirtualTagActor.DependencyValueChanged("M.T", 99, DateTime.UtcNow)); + var state = publish.FishForMessage(m => m.State.Active, Timeout); + state.AlarmNodeId.ShouldBe("alm-1"); + } }