feat(alarms): Primary-gated AlarmTransitionEvent fan-out for native alarms (Phase B WS-5)
This commit is contained in:
@@ -8,6 +8,7 @@ using Microsoft.Extensions.Logging.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Engines;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Interfaces;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Admin;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
|
||||
@@ -116,6 +117,11 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
/// value variables, so they are kept OUT of the value maps + value-subscription set.</summary>
|
||||
private readonly Dictionary<(string DriverInstanceId, string FullName), HashSet<string>> _alarmNodeIdByDriverRef = new();
|
||||
|
||||
/// <summary>Condition NodeId → (EquipmentId, tag Name, OPC UA alarm type) for building the
|
||||
/// AlarmTransitionEvent fan-out. Built in the same PushDesiredSubscriptions alarm branch.</summary>
|
||||
private readonly Dictionary<string, (string EquipmentId, string Name, string AlarmType)> _alarmMetaByNodeId =
|
||||
new(StringComparer.Ordinal);
|
||||
|
||||
/// <summary>Derives a full Part 9 condition snapshot from each native alarm transition delta,
|
||||
/// tracking per-condition-NodeId prior state. <see cref="Clear"/>'d on every apply alongside the
|
||||
/// value maps so stale condition state never leaks across redeploys.</summary>
|
||||
@@ -131,6 +137,12 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
/// </summary>
|
||||
private RedundancyRole? _localRole;
|
||||
|
||||
/// <summary>Cached cluster DistributedPubSub mediator, resolved once in <see cref="PreStart"/> (on the
|
||||
/// actor thread) and reused for the Primary-gated native-alarm <c>alerts</c> fan-out in
|
||||
/// <see cref="ForwardNativeAlarm"/> instead of re-resolving it per-publish. Mirrors
|
||||
/// <c>ScriptedAlarmHostActor._mediator</c>.</summary>
|
||||
private IActorRef _mediator = null!;
|
||||
|
||||
/// <summary>
|
||||
/// Routes an inbound operator write (resolved from the OPC UA node-manager side, Task 11) to the
|
||||
/// owning driver child. <see cref="HandleRouteNodeWrite"/> gates on the local node being the
|
||||
@@ -265,14 +277,17 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
/// <inheritdoc />
|
||||
protected override void PreStart()
|
||||
{
|
||||
// Resolve the cluster DPS mediator once, on the actor thread, and reuse it for every subscribe +
|
||||
// the Primary-gated native-alarm alerts fan-out in ForwardNativeAlarm (mirrors ScriptedAlarmHostActor).
|
||||
_mediator = DistributedPubSub.Get(Context.System).Mediator;
|
||||
// Subscribe to deployments topic so the coordinator's broadcast lands here.
|
||||
DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(DeploymentsTopic, Self));
|
||||
_mediator.Tell(new Subscribe(DeploymentsTopic, Self));
|
||||
// Subscribe to driver-control topic so AdminUI Reconnect/Restart commands land here.
|
||||
DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(DriverControlTopic, Self));
|
||||
_mediator.Tell(new Subscribe(DriverControlTopic, Self));
|
||||
// Subscribe to the redundancy-state topic so cluster role changes cache this node's role — the
|
||||
// inbound-write gate in HandleRouteNodeWrite reuses the SAME signal the scripted-alarm emit gate
|
||||
// uses so only the Primary services operator writes (the secondary keeps state warm for failover).
|
||||
DistributedPubSub.Get(Context.System).Mediator.Tell(
|
||||
_mediator.Tell(
|
||||
new Subscribe(ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.RedundancyStateTopic, Self));
|
||||
// Spawn the VirtualTag host BEFORE Bootstrap so the bootstrap-restore path (which routes
|
||||
// through PushDesiredSubscriptions and Tells ApplyVirtualTags) has a live host to target.
|
||||
@@ -489,8 +504,16 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
/// For each node the <see cref="_nativeAlarmProjector"/> projects the transition delta into a full
|
||||
/// <c>AlarmConditionSnapshot</c>, then this Tells <see cref="ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.AlarmStateUpdate"/>
|
||||
/// — the SAME message scripted alarms use, so it routes through <c>WriteAlarmCondition</c>. An
|
||||
/// unknown ref is Debug-logged and dropped (mirrors the value drop). The <c>/alerts</c> fan-out is a
|
||||
/// separate concern (Task 7) and is NOT emitted here.
|
||||
/// unknown ref is Debug-logged and dropped (mirrors the value drop).
|
||||
///
|
||||
/// <para>
|
||||
/// Each transition is ALSO published as an <see cref="AlarmTransitionEvent"/> to the cluster
|
||||
/// <c>alerts</c> topic — the single historization + live <c>/alerts</c> fan-out path, exactly as
|
||||
/// scripted alarms do (<c>ScriptedAlarmHostActor.OnEngineEmission</c>). The OPC UA condition
|
||||
/// write above stays UNGATED (a Secondary keeps its address space warm for failover), but the
|
||||
/// cluster-wide alerts publish is Primary-gated on the same <see cref="_localRole"/> redundancy
|
||||
/// signal the inbound-write gate uses — only the Primary publishes the single fleet-wide copy.
|
||||
/// </para>
|
||||
/// </summary>
|
||||
private void ForwardNativeAlarm(DriverInstanceActor.AttributeAlarmPublished msg)
|
||||
{
|
||||
@@ -506,6 +529,33 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
var snapshot = _nativeAlarmProjector.Project(nodeId, msg.Args);
|
||||
_opcUaPublishActor.Tell(new ZB.MOM.WW.OtOpcUa.Runtime.OpcUa.OpcUaPublishActor.AlarmStateUpdate(
|
||||
nodeId, snapshot, msg.Args.SourceTimestampUtc));
|
||||
|
||||
// Warm-standby dedup: the OPC UA condition write above is UNGATED (the secondary keeps its
|
||||
// address space warm), but only the Primary publishes the cluster-wide alerts transition.
|
||||
// Default-emit until told we are Secondary/Detached so single-node deploys + the boot window
|
||||
// never drop transitions — the SAME signal HandleRouteNodeWrite gates writes on.
|
||||
if (_localRole is RedundancyRole.Secondary or RedundancyRole.Detached) continue;
|
||||
|
||||
var meta = _alarmMetaByNodeId.TryGetValue(nodeId, out var m)
|
||||
? m : (EquipmentId: nodeId, Name: nodeId, AlarmType: "AlarmCondition");
|
||||
_mediator.Tell(new Publish(ScriptedAlarmHostActor.AlertsTopic, new AlarmTransitionEvent(
|
||||
AlarmId: nodeId,
|
||||
EquipmentPath: meta.EquipmentId,
|
||||
AlarmName: meta.Name,
|
||||
TransitionKind: msg.Args.Kind.ToString(),
|
||||
// The projector mapped the four-bucket AlarmSeverity onto the OPC UA 1..1000 scale already;
|
||||
// reuse its ushort so the condition node + the alerts row agree on severity.
|
||||
Severity: snapshot.Severity,
|
||||
Message: msg.Args.Message,
|
||||
// Native transitions are device-driven; an operator comment only rides along on an
|
||||
// acknowledge from the upstream alarm system. "device" marks a non-operator origin.
|
||||
User: msg.Args.OperatorComment is null ? string.Empty : "device",
|
||||
TimestampUtc: msg.Args.SourceTimestampUtc,
|
||||
AlarmTypeName: meta.AlarmType,
|
||||
Comment: msg.Args.OperatorComment,
|
||||
// Native alarms always historize (no per-condition opt-out surface yet — that is a
|
||||
// scripted-alarm plan flag); pass a concrete true so the historian's null-default isn't relied on.
|
||||
HistorizeToAveva: true)));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -805,6 +855,9 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
// every apply; the projector is Clear()'d too so stale per-condition state never leaks across
|
||||
// redeploys (renames/removals/address-space rebuilds).
|
||||
_alarmNodeIdByDriverRef.Clear();
|
||||
// Per-condition metadata (EquipmentId / Name / OPC UA alarm type) for the alerts fan-out, built in
|
||||
// the SAME alarm branch as the node map so a redeploy can't leave it out of sync. Cleared alongside it.
|
||||
_alarmMetaByNodeId.Clear();
|
||||
_nativeAlarmProjector.Clear();
|
||||
foreach (var t in composition.EquipmentTags)
|
||||
{
|
||||
@@ -818,6 +871,10 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
||||
if (!_alarmNodeIdByDriverRef.TryGetValue(key, out var aset))
|
||||
_alarmNodeIdByDriverRef[key] = aset = new HashSet<string>(StringComparer.Ordinal);
|
||||
aset.Add(nodeId);
|
||||
// Capture the per-condition metadata the alerts fan-out (ForwardNativeAlarm) needs to build
|
||||
// the AlarmTransitionEvent: the equipment path, the operator-visible alarm name, and the
|
||||
// OPC UA Part 9 subtype. Keyed by the condition NodeId (the projection's own key).
|
||||
_alarmMetaByNodeId[nodeId] = (t.EquipmentId, t.Name, t.Alarm.AlarmType);
|
||||
continue;
|
||||
}
|
||||
if (!_nodeIdByDriverRef.TryGetValue(key, out var set))
|
||||
|
||||
+115
@@ -1,11 +1,15 @@
|
||||
using System.Text.Json;
|
||||
using Akka.Actor;
|
||||
using Akka.Cluster.Tools.PublishSubscribe;
|
||||
using Akka.TestKit;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Engines;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Deploy;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Fleet;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||
@@ -14,6 +18,7 @@ using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Drivers;
|
||||
@@ -99,6 +104,116 @@ public sealed class DriverHostActorNativeAlarmTests : RuntimeActorTestBase
|
||||
publish.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
|
||||
}
|
||||
|
||||
/// <summary>Primary (default/unset role) fan-out (Phase B WS-5): a native alarm RAISE on a known ref
|
||||
/// publishes exactly one <see cref="AlarmTransitionEvent"/> to the cluster <c>alerts</c> topic with
|
||||
/// <c>AlarmId</c> = the folder-scoped condition NodeId, alongside the (ungated) OPC UA condition
|
||||
/// update. No <see cref="RedundancyStateChanged"/> is sent, so the cached role is unknown ⇒ emit.</summary>
|
||||
[Fact]
|
||||
public void Native_alarm_publishes_AlarmTransitionEvent_to_alerts_when_primary()
|
||||
{
|
||||
var db = NewInMemoryDbFactory();
|
||||
var deploymentId = SeedDeploymentWithAlarmTag(db, RevA,
|
||||
Equip: "eq-1", Driver: "drv-1", FullName: "Temp.HiHi", Folder: null, Name: "temp_hi");
|
||||
|
||||
var alerts = CreateTestProbe();
|
||||
SubscribeToAlerts(alerts);
|
||||
|
||||
var (actor, publish) = SpawnHostAndApply(db, deploymentId);
|
||||
|
||||
actor.Tell(new DriverInstanceActor.AttributeAlarmPublished("drv-1", new AlarmEventArgs(
|
||||
new StubAlarmHandle(),
|
||||
SourceNodeId: "Temp.HiHi",
|
||||
ConditionId: "cond-1",
|
||||
AlarmType: "OffNormalAlarm",
|
||||
Message: "temperature high",
|
||||
Severity: AlarmSeverity.High,
|
||||
SourceTimestampUtc: Ts,
|
||||
Kind: AlarmTransitionKind.Raise)));
|
||||
|
||||
// The OPC UA condition update is UNGATED — it must arrive.
|
||||
var update = publish.ExpectMsg<OpcUaPublishActor.AlarmStateUpdate>(TimeSpan.FromSeconds(5));
|
||||
update.AlarmNodeId.ShouldBe("eq-1/temp_hi");
|
||||
|
||||
// Role unknown ⇒ default-emit: exactly one AlarmTransitionEvent on the alerts topic.
|
||||
var evt = alerts.ExpectMsg<AlarmTransitionEvent>(TimeSpan.FromSeconds(5));
|
||||
evt.AlarmId.ShouldBe("eq-1/temp_hi"); // the folder-scoped condition NodeId
|
||||
evt.EquipmentPath.ShouldBe("eq-1"); // from the alarm-bearing tag's EquipmentId
|
||||
evt.AlarmName.ShouldBe("temp_hi"); // from the tag's Name
|
||||
evt.TransitionKind.ShouldBe("Raise"); // AlarmEventArgs.Kind.ToString()
|
||||
evt.AlarmTypeName.ShouldBe("OffNormalAlarm"); // the tag's alarm AlarmType
|
||||
evt.Severity.ShouldBe(700); // AlarmSeverity.High → projector 700
|
||||
evt.Message.ShouldBe("temperature high");
|
||||
evt.User.ShouldBe(string.Empty); // no operator comment ⇒ device-origin (empty user)
|
||||
evt.HistorizeToAveva.ShouldBe(true);
|
||||
alerts.ExpectNoMsg(TimeSpan.FromMilliseconds(300)); // exactly one
|
||||
}
|
||||
|
||||
/// <summary>Secondary suppression (Phase B WS-5): when the cached local role is Secondary the host
|
||||
/// MUST still write the local OPC UA condition node (ungated — keeps the standby's address space warm
|
||||
/// for failover) but MUST NOT publish the cluster-wide <c>alerts</c> transition (the Primary publishes
|
||||
/// the single fleet-wide copy).</summary>
|
||||
[Fact]
|
||||
public void Secondary_node_suppresses_alerts_publish_but_still_updates_condition()
|
||||
{
|
||||
var db = NewInMemoryDbFactory();
|
||||
var deploymentId = SeedDeploymentWithAlarmTag(db, RevA,
|
||||
Equip: "eq-1", Driver: "drv-1", FullName: "Temp.HiHi", Folder: null, Name: "temp_hi");
|
||||
|
||||
var alerts = CreateTestProbe();
|
||||
SubscribeToAlerts(alerts);
|
||||
|
||||
var (actor, publish) = SpawnHostAndApply(db, deploymentId);
|
||||
|
||||
// Mark this node Secondary so the alerts publish is gated off.
|
||||
TellRedundancyRole(actor, RedundancyRole.Secondary);
|
||||
|
||||
actor.Tell(new DriverInstanceActor.AttributeAlarmPublished("drv-1", new AlarmEventArgs(
|
||||
new StubAlarmHandle(),
|
||||
SourceNodeId: "Temp.HiHi",
|
||||
ConditionId: "cond-1",
|
||||
AlarmType: "OffNormalAlarm",
|
||||
Message: "temperature high",
|
||||
Severity: AlarmSeverity.High,
|
||||
SourceTimestampUtc: Ts,
|
||||
Kind: AlarmTransitionKind.Raise)));
|
||||
|
||||
// The OPC UA condition update is UNGATED — it must still arrive on the secondary.
|
||||
var update = publish.ExpectMsg<OpcUaPublishActor.AlarmStateUpdate>(TimeSpan.FromSeconds(5));
|
||||
update.AlarmNodeId.ShouldBe("eq-1/temp_hi");
|
||||
update.State.Active.ShouldBeTrue();
|
||||
|
||||
// The cluster-wide alerts publish is gated off on the secondary.
|
||||
alerts.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
|
||||
}
|
||||
|
||||
/// <summary>Subscribe <paramref name="probe"/> to the <c>alerts</c> DPS topic and wait for the ack.
|
||||
/// The Subscribe is sent FROM the probe so the SubscribeAck returns to it. Mirrors the
|
||||
/// ScriptedAlarmHostActor test harness.</summary>
|
||||
private void SubscribeToAlerts(TestProbe probe)
|
||||
{
|
||||
DistributedPubSub.Get(Sys).Mediator.Tell(
|
||||
new Subscribe(ScriptedAlarmHostActor.AlertsTopic, probe.Ref), probe.Ref);
|
||||
probe.ExpectMsg<SubscribeAck>(TimeSpan.FromSeconds(5));
|
||||
}
|
||||
|
||||
/// <summary>Tell the host a <see cref="RedundancyStateChanged"/> snapshot marking the host's own
|
||||
/// node (<see cref="TestNode"/>, which equals the host's <c>_localNode</c>) with <paramref name="role"/>
|
||||
/// so the alerts-publish gate observes the local role.</summary>
|
||||
private static void TellRedundancyRole(IActorRef host, RedundancyRole role)
|
||||
{
|
||||
host.Tell(new RedundancyStateChanged(
|
||||
new[]
|
||||
{
|
||||
new NodeRedundancyState(
|
||||
NodeId: TestNode,
|
||||
Role: role,
|
||||
IsClusterLeader: role == RedundancyRole.Primary,
|
||||
IsRoleLeaderForDriver: role == RedundancyRole.Primary,
|
||||
AsOfUtc: DateTime.UtcNow),
|
||||
},
|
||||
CorrelationId.NewId()));
|
||||
}
|
||||
|
||||
/// <summary>Spawns the host with a publish probe, dispatches the deployment, and waits for the Applied
|
||||
/// ACK so the apply (and thus the alarm-map build in PushDesiredSubscriptions) has completed before the
|
||||
/// test publishes an alarm. A VirtualTag-host probe is injected so the real host isn't spawned.</summary>
|
||||
|
||||
Reference in New Issue
Block a user