diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs index aafeecbf..bb63e8a5 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs @@ -8,6 +8,7 @@ using Microsoft.Extensions.Logging.Abstractions; using ZB.MOM.WW.OtOpcUa.Commons.Engines; using ZB.MOM.WW.OtOpcUa.Commons.Interfaces; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy; @@ -116,6 +117,11 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers /// value variables, so they are kept OUT of the value maps + value-subscription set. private readonly Dictionary<(string DriverInstanceId, string FullName), HashSet> _alarmNodeIdByDriverRef = new(); + /// Condition NodeId → (EquipmentId, tag Name, OPC UA alarm type) for building the + /// AlarmTransitionEvent fan-out. Built in the same PushDesiredSubscriptions alarm branch. + private readonly Dictionary _alarmMetaByNodeId = + new(StringComparer.Ordinal); + /// Derives a full Part 9 condition snapshot from each native alarm transition delta, /// tracking per-condition-NodeId prior state. 'd on every apply alongside the /// value maps so stale condition state never leaks across redeploys. @@ -131,6 +137,12 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers /// private RedundancyRole? _localRole; + /// Cached cluster DistributedPubSub mediator, resolved once in (on the + /// actor thread) and reused for the Primary-gated native-alarm alerts fan-out in + /// instead of re-resolving it per-publish. Mirrors + /// ScriptedAlarmHostActor._mediator. 
+ private IActorRef _mediator = null!; + /// /// Routes an inbound operator write (resolved from the OPC UA node-manager side, Task 11) to the /// owning driver child. gates on the local node being the @@ -265,14 +277,17 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers /// protected override void PreStart() { + // Resolve the cluster DPS mediator once, on the actor thread, and reuse it for every subscribe + + // the Primary-gated native-alarm alerts fan-out in ForwardNativeAlarm (mirrors ScriptedAlarmHostActor). + _mediator = DistributedPubSub.Get(Context.System).Mediator; // Subscribe to deployments topic so the coordinator's broadcast lands here. - DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(DeploymentsTopic, Self)); + _mediator.Tell(new Subscribe(DeploymentsTopic, Self)); // Subscribe to driver-control topic so AdminUI Reconnect/Restart commands land here. - DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(DriverControlTopic, Self)); + _mediator.Tell(new Subscribe(DriverControlTopic, Self)); // Subscribe to the redundancy-state topic so cluster role changes cache this node's role — the // inbound-write gate in HandleRouteNodeWrite reuses the SAME signal the scripted-alarm emit gate // uses so only the Primary services operator writes (the secondary keeps state warm for failover). - DistributedPubSub.Get(Context.System).Mediator.Tell( + _mediator.Tell( new Subscribe(ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.RedundancyStateTopic, Self)); // Spawn the VirtualTag host BEFORE Bootstrap so the bootstrap-restore path (which routes // through PushDesiredSubscriptions and Tells ApplyVirtualTags) has a live host to target. @@ -489,8 +504,16 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers /// For each node the projects the transition delta into a full /// AlarmConditionSnapshot, then this Tells /// — the SAME message scripted alarms use, so it routes through WriteAlarmCondition. 
An - /// unknown ref is Debug-logged and dropped (mirrors the value drop). The /alerts fan-out is a - /// separate concern (Task 7) and is NOT emitted here. + /// unknown ref is Debug-logged and dropped (mirrors the value drop). + /// + /// + /// Each transition is ALSO published as an AlarmTransitionEvent to the cluster + /// alerts topic — the single historization + live /alerts fan-out path, exactly as + /// scripted alarms do (ScriptedAlarmHostActor.OnEngineEmission). The OPC UA condition + /// write above stays UNGATED (a Secondary keeps its address space warm for failover), but the + /// cluster-wide alerts publish is Primary-gated on the same redundancy + /// signal the inbound-write gate uses — only the Primary publishes the single fleet-wide copy. + /// + /// private void ForwardNativeAlarm(DriverInstanceActor.AttributeAlarmPublished msg) { @@ -506,6 +529,33 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers var snapshot = _nativeAlarmProjector.Project(nodeId, msg.Args); _opcUaPublishActor.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.AlarmStateUpdate( nodeId, snapshot, msg.Args.SourceTimestampUtc)); + + // Warm-standby dedup: the OPC UA condition write above is UNGATED (the secondary keeps its + // address space warm), but only the Primary publishes the cluster-wide alerts transition. + // Default-emit until told we are Secondary/Detached so single-node deploys + the boot window + // never drop transitions — the SAME signal HandleRouteNodeWrite gates writes on. + if (_localRole is RedundancyRole.Secondary or RedundancyRole.Detached) continue; + + var meta = _alarmMetaByNodeId.TryGetValue(nodeId, out var m) + ? 
m : (EquipmentId: nodeId, Name: nodeId, AlarmType: "AlarmCondition"); + _mediator.Tell(new Publish(ScriptedAlarmHostActor.AlertsTopic, new AlarmTransitionEvent( + AlarmId: nodeId, + EquipmentPath: meta.EquipmentId, + AlarmName: meta.Name, + TransitionKind: msg.Args.Kind.ToString(), + // The projector mapped the four-bucket AlarmSeverity onto the OPC UA 1..1000 scale already; + // reuse its ushort so the condition node + the alerts row agree on severity. + Severity: snapshot.Severity, + Message: msg.Args.Message, + // Native transitions are device-driven; an operator comment only rides along on an + // acknowledge from the upstream alarm system. "device" marks a non-operator origin. + User: msg.Args.OperatorComment is null ? string.Empty : "device", + TimestampUtc: msg.Args.SourceTimestampUtc, + AlarmTypeName: meta.AlarmType, + Comment: msg.Args.OperatorComment, + // Native alarms always historize (no per-condition opt-out surface yet — that is a + // scripted-alarm plan flag); pass a concrete true so the historian's null-default isn't relied on. + HistorizeToAveva: true))); } } @@ -805,6 +855,9 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers // every apply; the projector is Clear()'d too so stale per-condition state never leaks across // redeploys (renames/removals/address-space rebuilds). _alarmNodeIdByDriverRef.Clear(); + // Per-condition metadata (EquipmentId / Name / OPC UA alarm type) for the alerts fan-out, built in + // the SAME alarm branch as the node map so a redeploy can't leave it out of sync. Cleared alongside it. 
+ _alarmMetaByNodeId.Clear(); _nativeAlarmProjector.Clear(); foreach (var t in composition.EquipmentTags) { @@ -818,6 +871,10 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers if (!_alarmNodeIdByDriverRef.TryGetValue(key, out var aset)) _alarmNodeIdByDriverRef[key] = aset = new HashSet(StringComparer.Ordinal); aset.Add(nodeId); + // Capture the per-condition metadata the alerts fan-out (ForwardNativeAlarm) needs to build + // the AlarmTransitionEvent: the equipment path, the operator-visible alarm name, and the + // OPC UA Part 9 subtype. Keyed by the condition NodeId (the projection's own key). + _alarmMetaByNodeId[nodeId] = (t.EquipmentId, t.Name, t.Alarm.AlarmType); continue; } if (!_nodeIdByDriverRef.TryGetValue(key, out var set)) diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverHostActorNativeAlarmTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverHostActorNativeAlarmTests.cs index d4a475c4..1bc2a7f2 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverHostActorNativeAlarmTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Drivers/DriverHostActorNativeAlarmTests.cs @@ -1,11 +1,15 @@ using System.Text.Json; using Akka.Actor; +using Akka.Cluster.Tools.PublishSubscribe; +using Akka.TestKit; using Microsoft.EntityFrameworkCore; using Shouldly; using Xunit; using ZB.MOM.WW.OtOpcUa.Commons.Engines; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy; using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; using ZB.MOM.WW.OtOpcUa.Commons.Types; using ZB.MOM.WW.OtOpcUa.Configuration; @@ -14,6 +18,7 @@ using ZB.MOM.WW.OtOpcUa.Configuration.Enums; using ZB.MOM.WW.OtOpcUa.Core.Abstractions; using ZB.MOM.WW.OtOpcUa.Runtime.Drivers; using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa; +using ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms; using 
ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness; namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers; @@ -99,6 +104,116 @@ public sealed class DriverHostActorNativeAlarmTests : RuntimeActorTestBase publish.ExpectNoMsg(TimeSpan.FromMilliseconds(500)); } + /// Primary (default/unset role) fan-out (Phase B WS-5): a native alarm RAISE on a known ref + /// publishes exactly one to the cluster alerts topic with + /// AlarmId = the folder-scoped condition NodeId, alongside the (ungated) OPC UA condition + /// update. No is sent, so the cached role is unknown ⇒ emit. + [Fact] + public void Native_alarm_publishes_AlarmTransitionEvent_to_alerts_when_primary() + { + var db = NewInMemoryDbFactory(); + var deploymentId = SeedDeploymentWithAlarmTag(db, RevA, + Equip: "eq-1", Driver: "drv-1", FullName: "Temp.HiHi", Folder: null, Name: "temp_hi"); + + var alerts = CreateTestProbe(); + SubscribeToAlerts(alerts); + + var (actor, publish) = SpawnHostAndApply(db, deploymentId); + + actor.Tell(new DriverInstanceActor.AttributeAlarmPublished("drv-1", new AlarmEventArgs( + new StubAlarmHandle(), + SourceNodeId: "Temp.HiHi", + ConditionId: "cond-1", + AlarmType: "OffNormalAlarm", + Message: "temperature high", + Severity: AlarmSeverity.High, + SourceTimestampUtc: Ts, + Kind: AlarmTransitionKind.Raise))); + + // The OPC UA condition update is UNGATED — it must arrive. + var update = publish.ExpectMsg(TimeSpan.FromSeconds(5)); + update.AlarmNodeId.ShouldBe("eq-1/temp_hi"); + + // Role unknown ⇒ default-emit: exactly one AlarmTransitionEvent on the alerts topic. 
+ var evt = alerts.ExpectMsg(TimeSpan.FromSeconds(5)); + evt.AlarmId.ShouldBe("eq-1/temp_hi"); // the folder-scoped condition NodeId + evt.EquipmentPath.ShouldBe("eq-1"); // from the alarm-bearing tag's EquipmentId + evt.AlarmName.ShouldBe("temp_hi"); // from the tag's Name + evt.TransitionKind.ShouldBe("Raise"); // AlarmEventArgs.Kind.ToString() + evt.AlarmTypeName.ShouldBe("OffNormalAlarm"); // the tag's alarm AlarmType + evt.Severity.ShouldBe(700); // AlarmSeverity.High → projector 700 + evt.Message.ShouldBe("temperature high"); + evt.User.ShouldBe(string.Empty); // no operator comment ⇒ device-origin (empty user) + evt.HistorizeToAveva.ShouldBe(true); + alerts.ExpectNoMsg(TimeSpan.FromMilliseconds(300)); // exactly one + } + + /// Secondary suppression (Phase B WS-5): when the cached local role is Secondary the host + /// MUST still write the local OPC UA condition node (ungated — keeps the standby's address space warm + /// for failover) but MUST NOT publish the cluster-wide alerts transition (the Primary publishes + /// the single fleet-wide copy). + [Fact] + public void Secondary_node_suppresses_alerts_publish_but_still_updates_condition() + { + var db = NewInMemoryDbFactory(); + var deploymentId = SeedDeploymentWithAlarmTag(db, RevA, + Equip: "eq-1", Driver: "drv-1", FullName: "Temp.HiHi", Folder: null, Name: "temp_hi"); + + var alerts = CreateTestProbe(); + SubscribeToAlerts(alerts); + + var (actor, publish) = SpawnHostAndApply(db, deploymentId); + + // Mark this node Secondary so the alerts publish is gated off. 
+ TellRedundancyRole(actor, RedundancyRole.Secondary); + + actor.Tell(new DriverInstanceActor.AttributeAlarmPublished("drv-1", new AlarmEventArgs( + new StubAlarmHandle(), + SourceNodeId: "Temp.HiHi", + ConditionId: "cond-1", + AlarmType: "OffNormalAlarm", + Message: "temperature high", + Severity: AlarmSeverity.High, + SourceTimestampUtc: Ts, + Kind: AlarmTransitionKind.Raise))); + + // The OPC UA condition update is UNGATED — it must still arrive on the secondary. + var update = publish.ExpectMsg(TimeSpan.FromSeconds(5)); + update.AlarmNodeId.ShouldBe("eq-1/temp_hi"); + update.State.Active.ShouldBeTrue(); + + // The cluster-wide alerts publish is gated off on the secondary. + alerts.ExpectNoMsg(TimeSpan.FromMilliseconds(500)); + } + + /// Subscribe to the alerts DPS topic and wait for the ack. + /// The Subscribe is sent FROM the probe so the SubscribeAck returns to it. Mirrors the + /// ScriptedAlarmHostActor test harness. + private void SubscribeToAlerts(TestProbe probe) + { + DistributedPubSub.Get(Sys).Mediator.Tell( + new Subscribe(ScriptedAlarmHostActor.AlertsTopic, probe.Ref), probe.Ref); + probe.ExpectMsg(TimeSpan.FromSeconds(5)); + } + + /// Tell the host a RedundancyStateChanged snapshot marking the host's own + /// node (TestNode, which equals the host's _localNode) with the given role + /// so the alerts-publish gate observes the local role. + private static void TellRedundancyRole(IActorRef host, RedundancyRole role) + { + host.Tell(new RedundancyStateChanged( + new[] + { + new NodeRedundancyState( + NodeId: TestNode, + Role: role, + IsClusterLeader: role == RedundancyRole.Primary, + IsRoleLeaderForDriver: role == RedundancyRole.Primary, + AsOfUtc: DateTime.UtcNow), + }, + CorrelationId.NewId())); + } + /// Spawns the host with a publish probe, dispatches the deployment, and waits for the Applied /// ACK so the apply (and thus the alarm-map build in PushDesiredSubscriptions) has completed before the /// test publishes an alarm. A VirtualTag-host probe is injected so the real host isn't spawned.