diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs index a8f2a737..6e2247b6 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs @@ -280,7 +280,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers var engine = new ScriptedAlarmEngine( upstream, store, new ScriptLoggerFactory(_scriptRootLogger.Logger), _scriptRootLogger.Logger); _scriptedAlarmHost = Context.ActorOf( - ScriptedAlarmHostActor.Props(_opcUaPublishActor, _dependencyMux, upstream, engine), + ScriptedAlarmHostActor.Props(_opcUaPublishActor, _dependencyMux, upstream, engine, _localNode), "scripted-alarm-host"); } diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmHostActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmHostActor.cs index 65e93433..94e49d51 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmHostActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ScriptedAlarms/ScriptedAlarmHostActor.cs @@ -2,7 +2,9 @@ using Akka.Actor; using Akka.Cluster.Tools.PublishSubscribe; using Akka.Event; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy; using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; +using ZB.MOM.WW.OtOpcUa.Commons.Types; using ZB.MOM.WW.OtOpcUa.Core.Abstractions; using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms; using ZB.MOM.WW.OtOpcUa.OpcUaServer; @@ -95,10 +97,17 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor private readonly IActorRef? _mux; private readonly DependencyMuxTagUpstreamSource _upstream; private readonly ScriptedAlarmEngine _engine; + private readonly NodeId? _localNode; private readonly ILoggingAdapter _log = Context.GetLogger(); private readonly CancellationTokenSource _cts = new(); private readonly EventHandler _onEngineEvent; + /// Cached local from the latest + /// snapshot (null = unknown until the first snapshot arrives, or no wired). The + /// cluster-wide alerts publish in is gated on this: only the Primary + /// publishes (default-emit while unknown so single-node deploys + the boot window never drop transitions). + private RedundancyRole? _localRole; + /// Monotonic load generation, bumped on every . The continuation that /// pipes back an captures the generation it was started under; a stale /// completion (an earlier generation arriving after a newer apply) is discarded in @@ -118,23 +127,31 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor /// The mux-fed upstream the engine reads + subscribes from. MUST be the /// same instance the was constructed around. /// The scripted-alarm engine this host owns + disposes. + /// The local cluster node id, used to read this node's + /// from the redundancy-state topic so only the Primary publishes the cluster-wide alerts + /// transition. Null (the default) leaves the role unknown ⇒ default-emit (single-node deploys + tests). public static Props Props( IActorRef publishActor, IActorRef? mux, DependencyMuxTagUpstreamSource upstream, - ScriptedAlarmEngine engine) => - Akka.Actor.Props.Create(() => new ScriptedAlarmHostActor(publishActor, mux, upstream, engine)); + ScriptedAlarmEngine engine, + NodeId? localNode = null) => + Akka.Actor.Props.Create(() => new ScriptedAlarmHostActor(publishActor, mux, upstream, engine, localNode)); /// Initializes a new instance of the class. /// The OPC UA publish actor emissions are bridged to. /// Optional dependency multiplexer the host registers dependency interest with. /// The mux-fed upstream the engine reads + subscribes from (same instance the engine wraps). /// The scripted-alarm engine this host owns + disposes. + /// The local cluster node id, used to read this node's + /// from the redundancy-state topic so only the Primary publishes the cluster-wide alerts + /// transition. Null leaves the role unknown ⇒ default-emit (single-node deploys + tests). public ScriptedAlarmHostActor( IActorRef publishActor, IActorRef? mux, DependencyMuxTagUpstreamSource upstream, - ScriptedAlarmEngine engine) + ScriptedAlarmEngine engine, + NodeId? localNode = null) { ArgumentNullException.ThrowIfNull(publishActor); ArgumentNullException.ThrowIfNull(upstream); @@ -143,6 +160,7 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor _mux = mux; _upstream = upstream; _engine = engine; + _localNode = localNode; // OnEvent fires on the engine's worker thread. NEVER touch Context / actor state here — // marshal onto the actor thread via the thread-safe Self.Tell. Keep the handler in a field @@ -155,6 +173,9 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor Receive(OnAlarmsLoaded); Receive(OnDependencyChanged); Receive(OnEngineEmission); + // Cluster redundancy snapshots (published on the `redundancy-state` topic, subscribed in PreStart) + // cache this node's role so OnEngineEmission can gate the cluster-wide alerts publish to the Primary. + Receive(OnRedundancyStateChanged); // Inbound OPC UA Part 9 alarm method calls arrive as AlarmCommands on the cluster // `alarm-commands` DPS topic (T18 publishes them after the AlarmAck role gate). The topic is a // cluster-wide broadcast — every host node receives every command — so OnAlarmCommand filters to @@ -275,9 +296,37 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor User: TransitionUser(e), TimestampUtc: e.TimestampUtc); + // Warm-standby dedup: only the Primary (driver-role leader) publishes the cluster-wide + // transition. Default-emit until told we are Secondary/Detached so single-node deploys + the + // boot window never drop transitions. The OPC UA node write above + inbound command processing + // stay ungated (the secondary keeps its address space + engine state warm for failover). + if (_localRole is RedundancyRole.Secondary or RedundancyRole.Detached) + { + return; + } + _mediator.Tell(new Publish(AlertsTopic, evt)); } + /// Caches this node's from a cluster redundancy snapshot so + /// can gate the cluster-wide alerts publish to the Primary. A + /// snapshot that doesn't mention (or no local node wired) leaves the cached + /// role unchanged ⇒ default-emit. Mirrors 's handler. + /// The cluster redundancy snapshot. + private void OnRedundancyStateChanged(RedundancyStateChanged msg) + { + if (_localNode is null) + { + return; + } + + var local = msg.Nodes.FirstOrDefault(n => n.NodeId == _localNode.Value); + if (local is not null) + { + _localRole = local.Role; + } + } + /// /// Drives an inbound OPC UA Part 9 alarm method call (delivered as an /// on the cluster alarm-commands topic) onto the matching @@ -386,6 +435,10 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor // the node manager's condition handlers, T18) land here as AlarmCommands. The Subscribe is sent // from Self so the SubscribeAck returns to this actor (handled as a no-op in the ctor wiring). _mediator.Tell(new Subscribe(AlarmCommandsTopic, Self)); + // Also subscribe to the `redundancy-state` topic so cluster role changes land as + // RedundancyStateChanged and cache this node's role — OnEngineEmission gates the cluster-wide + // alerts publish to the Primary so a 2-node single-cluster deploy doesn't double-emit transitions. + _mediator.Tell(new Subscribe(OpcUaPublishActor.RedundancyStateTopic, Self)); base.PreStart(); } diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/ScriptedAlarmHostActorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/ScriptedAlarmHostActorTests.cs index 983df7f2..0c6b89c9 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/ScriptedAlarmHostActorTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/ScriptedAlarms/ScriptedAlarmHostActorTests.cs @@ -5,7 +5,9 @@ using Serilog; using Shouldly; using Xunit; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy; using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; +using ZB.MOM.WW.OtOpcUa.Commons.Types; using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms; using ZB.MOM.WW.OtOpcUa.Core.Scripting; using ZB.MOM.WW.OtOpcUa.OpcUaServer; @@ -74,15 +76,33 @@ public sealed class ScriptedAlarmHostActorTests : RuntimeActorTestBase return new ScriptedAlarmEngine(upstream, new InMemoryAlarmStateStore(), new ScriptLoggerFactory(logger), logger); } + /// The local node id used by the redundancy-gating tests. + private static readonly NodeId LocalNode = new("node-A"); + private (IActorRef Host, DependencyMuxTagUpstreamSource Upstream) Spawn( - TestProbe publish, TestProbe mux) + TestProbe publish, TestProbe mux, NodeId? localNode = null) { var upstream = new DependencyMuxTagUpstreamSource(); var engine = BuildEngine(upstream); - var host = Sys.ActorOf(ScriptedAlarmHostActor.Props(publish.Ref, mux.Ref, upstream, engine)); + var host = Sys.ActorOf(ScriptedAlarmHostActor.Props(publish.Ref, mux.Ref, upstream, engine, localNode)); return (host, upstream); } + /// Tell the host a snapshot marking + /// with so the gate observes the local role. + private static void TellRedundancyRole(IActorRef host, RedundancyRole role) => + host.Tell(new RedundancyStateChanged( + new[] + { + new NodeRedundancyState( + NodeId: LocalNode, + Role: role, + IsClusterLeader: role == RedundancyRole.Primary, + IsRoleLeaderForDriver: role == RedundancyRole.Primary, + AsOfUtc: DateTime.UtcNow), + }, + CorrelationId.NewId())); + /// Subscribe to the alerts DPS topic and wait for the ack. /// The Subscribe is sent FROM the probe so the SubscribeAck returns to it. private void SubscribeToAlerts(TestProbe probe) @@ -381,4 +401,111 @@ public sealed class ScriptedAlarmHostActorTests : RuntimeActorTestBase var state = publish.FishForMessage(m => m.State.Active, Timeout); state.AlarmNodeId.ShouldBe("alm-1"); } + + /// Default-emit (T1): before ANY RedundancyStateChanged snapshot arrives — the boot window, + /// and the steady state for single-node deploys (the sole node is always Primary) — the host MUST + /// publish the cluster-wide alerts transition. Constructed WITH a localNode but no snapshot sent, so + /// the cached local role is unknown ⇒ treated as Primary/emit. + [Fact] + public void Emission_is_published_to_alerts_by_default_before_any_redundancy_state() + { + var publish = CreateTestProbe(); + var mux = CreateTestProbe(); + var alerts = CreateTestProbe(); + SubscribeToAlerts(alerts); + + var (host, _) = Spawn(publish, mux, LocalNode); + host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(severity: 800) })); + mux.ExpectMsg(Timeout); // load completed + + // No RedundancyStateChanged sent — local role unknown ⇒ default-emit. + host.Tell(new VirtualTagActor.DependencyValueChanged("M.T", 99, DateTime.UtcNow)); + + // The OPC UA node write happens AND the alerts transition is published. + publish.FishForMessage(m => m.State.Active, Timeout); + var evt = alerts.ExpectMsg(Timeout); + evt.AlarmId.ShouldBe("alm-1"); + evt.TransitionKind.ShouldBe("Activated"); + } + + /// Secondary suppression (T1): when the cached local role is Secondary, the host MUST NOT + /// publish the cluster-wide alerts transition (the Primary publishes the single copy) — but it MUST + /// still write the local OPC UA condition node so the secondary's address space stays warm for failover. + [Fact] + public void Secondary_node_suppresses_alerts_publish_but_still_writes_opcua() + { + var publish = CreateTestProbe(); + var mux = CreateTestProbe(); + var alerts = CreateTestProbe(); + SubscribeToAlerts(alerts); + + var (host, _) = Spawn(publish, mux, LocalNode); + host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(severity: 800) })); + mux.ExpectMsg(Timeout); // load completed + + // Mark this node Secondary, then activate. + TellRedundancyRole(host, RedundancyRole.Secondary); + host.Tell(new VirtualTagActor.DependencyValueChanged("M.T", 99, DateTime.UtcNow)); + + // The local OPC UA node write is UNGATED — it must still arrive. + var state = publish.FishForMessage(m => m.State.Active, Timeout); + state.AlarmNodeId.ShouldBe("alm-1"); + + // The cluster-wide alerts publish is gated off on the secondary. + alerts.ExpectNoMsg(TimeSpan.FromMilliseconds(500)); + } + + /// Primary publishes (T1): when the cached local role is Primary, the host publishes the + /// cluster-wide alerts transition as normal (this is the single copy the fleet sees). + [Fact] + public void Primary_node_publishes_alerts() + { + var publish = CreateTestProbe(); + var mux = CreateTestProbe(); + var alerts = CreateTestProbe(); + SubscribeToAlerts(alerts); + + var (host, _) = Spawn(publish, mux, LocalNode); + host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(severity: 800) })); + mux.ExpectMsg(Timeout); // load completed + + // Mark this node Primary, then activate. + TellRedundancyRole(host, RedundancyRole.Primary); + host.Tell(new VirtualTagActor.DependencyValueChanged("M.T", 99, DateTime.UtcNow)); + + publish.FishForMessage(m => m.State.Active, Timeout); + var evt = alerts.ExpectMsg(Timeout); + evt.AlarmId.ShouldBe("alm-1"); + evt.TransitionKind.ShouldBe("Activated"); + } + + /// Inbound command ungated by role (T1): the alerts-publish gate must NOT affect inbound + /// command processing. Under a Secondary role, an AlarmCommand("Acknowledge") for an owned, active + /// alarm still drives the engine — observed via the resulting AlarmStateUpdate(Acknowledged=true) + /// (the OPC UA node write is ungated so the secondary's engine state + address space stay consistent). + [Fact] + public void Inbound_AlarmCommand_is_processed_regardless_of_role() + { + var publish = CreateTestProbe(); + var mux = CreateTestProbe(); + var (host, _) = Spawn(publish, mux, LocalNode); + + host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(id: "alm-1", depRef: "M.T") })); + mux.ExpectMsg(Timeout); // load completed + + // Mark this node Secondary — the alerts publish is gated, but command processing is NOT. + TellRedundancyRole(host, RedundancyRole.Secondary); + + // Activate so there is something to acknowledge. + host.Tell(new VirtualTagActor.DependencyValueChanged("M.T", 99, DateTime.UtcNow)); + publish.FishForMessage(m => m.State.Active && !m.State.Acknowledged, Timeout); + + // Acknowledge via the command topic — the engine must process it even on the secondary. + host.Tell(new AlarmCommand( + AlarmId: "alm-1", Operation: "Acknowledge", User: "alice", Comment: "ack-note", UnshelveAtUtc: null)); + + var acked = publish.FishForMessage(m => m.State.Acknowledged, Timeout); + acked.AlarmNodeId.ShouldBe("alm-1"); + acked.State.Acknowledged.ShouldBeTrue(); + } }