feat(redundancy): gate scripted-alarm alerts publish on Primary (A1)
This commit is contained in:
@@ -280,7 +280,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers
|
|||||||
var engine = new ScriptedAlarmEngine(
|
var engine = new ScriptedAlarmEngine(
|
||||||
upstream, store, new ScriptLoggerFactory(_scriptRootLogger.Logger), _scriptRootLogger.Logger);
|
upstream, store, new ScriptLoggerFactory(_scriptRootLogger.Logger), _scriptRootLogger.Logger);
|
||||||
_scriptedAlarmHost = Context.ActorOf(
|
_scriptedAlarmHost = Context.ActorOf(
|
||||||
ScriptedAlarmHostActor.Props(_opcUaPublishActor, _dependencyMux, upstream, engine),
|
ScriptedAlarmHostActor.Props(_opcUaPublishActor, _dependencyMux, upstream, engine, _localNode),
|
||||||
"scripted-alarm-host");
|
"scripted-alarm-host");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -2,7 +2,9 @@ using Akka.Actor;
|
|||||||
using Akka.Cluster.Tools.PublishSubscribe;
|
using Akka.Cluster.Tools.PublishSubscribe;
|
||||||
using Akka.Event;
|
using Akka.Event;
|
||||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts;
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
|
||||||
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||||
using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
|
using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
|
||||||
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
||||||
@@ -95,10 +97,17 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor
|
|||||||
private readonly IActorRef? _mux;
|
private readonly IActorRef? _mux;
|
||||||
private readonly DependencyMuxTagUpstreamSource _upstream;
|
private readonly DependencyMuxTagUpstreamSource _upstream;
|
||||||
private readonly ScriptedAlarmEngine _engine;
|
private readonly ScriptedAlarmEngine _engine;
|
||||||
|
private readonly NodeId? _localNode;
|
||||||
private readonly ILoggingAdapter _log = Context.GetLogger();
|
private readonly ILoggingAdapter _log = Context.GetLogger();
|
||||||
private readonly CancellationTokenSource _cts = new();
|
private readonly CancellationTokenSource _cts = new();
|
||||||
private readonly EventHandler<ScriptedAlarmEvent> _onEngineEvent;
|
private readonly EventHandler<ScriptedAlarmEvent> _onEngineEvent;
|
||||||
|
|
||||||
|
/// <summary>Cached local <see cref="RedundancyRole"/> from the latest <see cref="RedundancyStateChanged"/>
|
||||||
|
/// snapshot (null = unknown until the first snapshot arrives, or no <see cref="_localNode"/> wired). The
|
||||||
|
/// cluster-wide <c>alerts</c> publish in <see cref="OnEngineEmission"/> is gated on this: only the Primary
|
||||||
|
/// publishes (default-emit while unknown so single-node deploys + the boot window never drop transitions).</summary>
|
||||||
|
private RedundancyRole? _localRole;
|
||||||
|
|
||||||
/// <summary>Monotonic load generation, bumped on every <see cref="OnApply"/>. The continuation that
|
/// <summary>Monotonic load generation, bumped on every <see cref="OnApply"/>. The continuation that
|
||||||
/// pipes back an <see cref="AlarmsLoaded"/> captures the generation it was started under; a stale
|
/// pipes back an <see cref="AlarmsLoaded"/> captures the generation it was started under; a stale
|
||||||
/// completion (an earlier generation arriving after a newer apply) is discarded in
|
/// completion (an earlier generation arriving after a newer apply) is discarded in
|
||||||
@@ -118,23 +127,31 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor
|
|||||||
/// <param name="upstream">The mux-fed upstream the engine reads + subscribes from. MUST be the
|
/// <param name="upstream">The mux-fed upstream the engine reads + subscribes from. MUST be the
|
||||||
/// same instance the <paramref name="engine"/> was constructed around.</param>
|
/// same instance the <paramref name="engine"/> was constructed around.</param>
|
||||||
/// <param name="engine">The scripted-alarm engine this host owns + disposes.</param>
|
/// <param name="engine">The scripted-alarm engine this host owns + disposes.</param>
|
||||||
|
/// <param name="localNode">The local cluster node id, used to read this node's <see cref="RedundancyRole"/>
|
||||||
|
/// from the <c>redundancy-state</c> topic so only the Primary publishes the cluster-wide <c>alerts</c>
|
||||||
|
/// transition. Null (the default) leaves the role unknown ⇒ default-emit (single-node deploys + tests).</param>
|
||||||
public static Props Props(
|
public static Props Props(
|
||||||
IActorRef publishActor,
|
IActorRef publishActor,
|
||||||
IActorRef? mux,
|
IActorRef? mux,
|
||||||
DependencyMuxTagUpstreamSource upstream,
|
DependencyMuxTagUpstreamSource upstream,
|
||||||
ScriptedAlarmEngine engine) =>
|
ScriptedAlarmEngine engine,
|
||||||
Akka.Actor.Props.Create(() => new ScriptedAlarmHostActor(publishActor, mux, upstream, engine));
|
NodeId? localNode = null) =>
|
||||||
|
Akka.Actor.Props.Create(() => new ScriptedAlarmHostActor(publishActor, mux, upstream, engine, localNode));
|
||||||
|
|
||||||
/// <summary>Initializes a new instance of the <see cref="ScriptedAlarmHostActor"/> class.</summary>
|
/// <summary>Initializes a new instance of the <see cref="ScriptedAlarmHostActor"/> class.</summary>
|
||||||
/// <param name="publishActor">The OPC UA publish actor emissions are bridged to.</param>
|
/// <param name="publishActor">The OPC UA publish actor emissions are bridged to.</param>
|
||||||
/// <param name="mux">Optional dependency multiplexer the host registers dependency interest with.</param>
|
/// <param name="mux">Optional dependency multiplexer the host registers dependency interest with.</param>
|
||||||
/// <param name="upstream">The mux-fed upstream the engine reads + subscribes from (same instance the engine wraps).</param>
|
/// <param name="upstream">The mux-fed upstream the engine reads + subscribes from (same instance the engine wraps).</param>
|
||||||
/// <param name="engine">The scripted-alarm engine this host owns + disposes.</param>
|
/// <param name="engine">The scripted-alarm engine this host owns + disposes.</param>
|
||||||
|
/// <param name="localNode">The local cluster node id, used to read this node's <see cref="RedundancyRole"/>
|
||||||
|
/// from the <c>redundancy-state</c> topic so only the Primary publishes the cluster-wide <c>alerts</c>
|
||||||
|
/// transition. Null leaves the role unknown ⇒ default-emit (single-node deploys + tests).</param>
|
||||||
public ScriptedAlarmHostActor(
|
public ScriptedAlarmHostActor(
|
||||||
IActorRef publishActor,
|
IActorRef publishActor,
|
||||||
IActorRef? mux,
|
IActorRef? mux,
|
||||||
DependencyMuxTagUpstreamSource upstream,
|
DependencyMuxTagUpstreamSource upstream,
|
||||||
ScriptedAlarmEngine engine)
|
ScriptedAlarmEngine engine,
|
||||||
|
NodeId? localNode = null)
|
||||||
{
|
{
|
||||||
ArgumentNullException.ThrowIfNull(publishActor);
|
ArgumentNullException.ThrowIfNull(publishActor);
|
||||||
ArgumentNullException.ThrowIfNull(upstream);
|
ArgumentNullException.ThrowIfNull(upstream);
|
||||||
@@ -143,6 +160,7 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor
|
|||||||
_mux = mux;
|
_mux = mux;
|
||||||
_upstream = upstream;
|
_upstream = upstream;
|
||||||
_engine = engine;
|
_engine = engine;
|
||||||
|
_localNode = localNode;
|
||||||
|
|
||||||
// OnEvent fires on the engine's worker thread. NEVER touch Context / actor state here —
|
// OnEvent fires on the engine's worker thread. NEVER touch Context / actor state here —
|
||||||
// marshal onto the actor thread via the thread-safe Self.Tell. Keep the handler in a field
|
// marshal onto the actor thread via the thread-safe Self.Tell. Keep the handler in a field
|
||||||
@@ -155,6 +173,9 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor
|
|||||||
Receive<AlarmsLoaded>(OnAlarmsLoaded);
|
Receive<AlarmsLoaded>(OnAlarmsLoaded);
|
||||||
Receive<VirtualTagActor.DependencyValueChanged>(OnDependencyChanged);
|
Receive<VirtualTagActor.DependencyValueChanged>(OnDependencyChanged);
|
||||||
Receive<EngineEmission>(OnEngineEmission);
|
Receive<EngineEmission>(OnEngineEmission);
|
||||||
|
// Cluster redundancy snapshots (published on the `redundancy-state` topic, subscribed in PreStart)
|
||||||
|
// cache this node's role so OnEngineEmission can gate the cluster-wide alerts publish to the Primary.
|
||||||
|
Receive<RedundancyStateChanged>(OnRedundancyStateChanged);
|
||||||
// Inbound OPC UA Part 9 alarm method calls arrive as AlarmCommands on the cluster
|
// Inbound OPC UA Part 9 alarm method calls arrive as AlarmCommands on the cluster
|
||||||
// `alarm-commands` DPS topic (T18 publishes them after the AlarmAck role gate). The topic is a
|
// `alarm-commands` DPS topic (T18 publishes them after the AlarmAck role gate). The topic is a
|
||||||
// cluster-wide broadcast — every host node receives every command — so OnAlarmCommand filters to
|
// cluster-wide broadcast — every host node receives every command — so OnAlarmCommand filters to
|
||||||
@@ -275,9 +296,37 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor
|
|||||||
User: TransitionUser(e),
|
User: TransitionUser(e),
|
||||||
TimestampUtc: e.TimestampUtc);
|
TimestampUtc: e.TimestampUtc);
|
||||||
|
|
||||||
|
// Warm-standby dedup: only the Primary (driver-role leader) publishes the cluster-wide
|
||||||
|
// transition. Default-emit until told we are Secondary/Detached so single-node deploys + the
|
||||||
|
// boot window never drop transitions. The OPC UA node write above + inbound command processing
|
||||||
|
// stay ungated (the secondary keeps its address space + engine state warm for failover).
|
||||||
|
if (_localRole is RedundancyRole.Secondary or RedundancyRole.Detached)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
_mediator.Tell(new Publish(AlertsTopic, evt));
|
_mediator.Tell(new Publish(AlertsTopic, evt));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>Caches this node's <see cref="RedundancyRole"/> from a cluster redundancy snapshot so
|
||||||
|
/// <see cref="OnEngineEmission"/> can gate the cluster-wide <c>alerts</c> publish to the Primary. A
|
||||||
|
/// snapshot that doesn't mention <see cref="_localNode"/> (or no local node wired) leaves the cached
|
||||||
|
/// role unchanged ⇒ default-emit. Mirrors <see cref="OpcUaPublishActor"/>'s handler.</summary>
|
||||||
|
/// <param name="msg">The cluster redundancy snapshot.</param>
|
||||||
|
private void OnRedundancyStateChanged(RedundancyStateChanged msg)
|
||||||
|
{
|
||||||
|
if (_localNode is null)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
var local = msg.Nodes.FirstOrDefault(n => n.NodeId == _localNode.Value);
|
||||||
|
if (local is not null)
|
||||||
|
{
|
||||||
|
_localRole = local.Role;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Drives an inbound OPC UA Part 9 alarm method call (delivered as an <see cref="AlarmCommand"/>
|
/// Drives an inbound OPC UA Part 9 alarm method call (delivered as an <see cref="AlarmCommand"/>
|
||||||
/// on the cluster <c>alarm-commands</c> topic) onto the matching <see cref="ScriptedAlarmEngine"/>
|
/// on the cluster <c>alarm-commands</c> topic) onto the matching <see cref="ScriptedAlarmEngine"/>
|
||||||
@@ -386,6 +435,10 @@ public sealed class ScriptedAlarmHostActor : ReceiveActor
|
|||||||
// the node manager's condition handlers, T18) land here as AlarmCommands. The Subscribe is sent
|
// the node manager's condition handlers, T18) land here as AlarmCommands. The Subscribe is sent
|
||||||
// from Self so the SubscribeAck returns to this actor (handled as a no-op in the ctor wiring).
|
// from Self so the SubscribeAck returns to this actor (handled as a no-op in the ctor wiring).
|
||||||
_mediator.Tell(new Subscribe(AlarmCommandsTopic, Self));
|
_mediator.Tell(new Subscribe(AlarmCommandsTopic, Self));
|
||||||
|
// Also subscribe to the `redundancy-state` topic so cluster role changes land as
|
||||||
|
// RedundancyStateChanged and cache this node's role — OnEngineEmission gates the cluster-wide
|
||||||
|
// alerts publish to the Primary so a 2-node single-cluster deploy doesn't double-emit transitions.
|
||||||
|
_mediator.Tell(new Subscribe(OpcUaPublishActor.RedundancyStateTopic, Self));
|
||||||
base.PreStart();
|
base.PreStart();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
+129
-2
@@ -5,7 +5,9 @@ using Serilog;
|
|||||||
using Shouldly;
|
using Shouldly;
|
||||||
using Xunit;
|
using Xunit;
|
||||||
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts;
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Alerts;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy;
|
||||||
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||||||
using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
|
using ZB.MOM.WW.OtOpcUa.Core.ScriptedAlarms;
|
||||||
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
|
using ZB.MOM.WW.OtOpcUa.Core.Scripting;
|
||||||
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
using ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
||||||
@@ -74,15 +76,33 @@ public sealed class ScriptedAlarmHostActorTests : RuntimeActorTestBase
|
|||||||
return new ScriptedAlarmEngine(upstream, new InMemoryAlarmStateStore(), new ScriptLoggerFactory(logger), logger);
|
return new ScriptedAlarmEngine(upstream, new InMemoryAlarmStateStore(), new ScriptLoggerFactory(logger), logger);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>The local node id used by the redundancy-gating tests.</summary>
|
||||||
|
private static readonly NodeId LocalNode = new("node-A");
|
||||||
|
|
||||||
private (IActorRef Host, DependencyMuxTagUpstreamSource Upstream) Spawn(
|
private (IActorRef Host, DependencyMuxTagUpstreamSource Upstream) Spawn(
|
||||||
TestProbe publish, TestProbe mux)
|
TestProbe publish, TestProbe mux, NodeId? localNode = null)
|
||||||
{
|
{
|
||||||
var upstream = new DependencyMuxTagUpstreamSource();
|
var upstream = new DependencyMuxTagUpstreamSource();
|
||||||
var engine = BuildEngine(upstream);
|
var engine = BuildEngine(upstream);
|
||||||
var host = Sys.ActorOf(ScriptedAlarmHostActor.Props(publish.Ref, mux.Ref, upstream, engine));
|
var host = Sys.ActorOf(ScriptedAlarmHostActor.Props(publish.Ref, mux.Ref, upstream, engine, localNode));
|
||||||
return (host, upstream);
|
return (host, upstream);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>Tell the host a <see cref="RedundancyStateChanged"/> snapshot marking
|
||||||
|
/// <see cref="LocalNode"/> with <paramref name="role"/> so the gate observes the local role.</summary>
|
||||||
|
private static void TellRedundancyRole(IActorRef host, RedundancyRole role) =>
|
||||||
|
host.Tell(new RedundancyStateChanged(
|
||||||
|
new[]
|
||||||
|
{
|
||||||
|
new NodeRedundancyState(
|
||||||
|
NodeId: LocalNode,
|
||||||
|
Role: role,
|
||||||
|
IsClusterLeader: role == RedundancyRole.Primary,
|
||||||
|
IsRoleLeaderForDriver: role == RedundancyRole.Primary,
|
||||||
|
AsOfUtc: DateTime.UtcNow),
|
||||||
|
},
|
||||||
|
CorrelationId.NewId()));
|
||||||
|
|
||||||
/// <summary>Subscribe <paramref name="probe"/> to the <c>alerts</c> DPS topic and wait for the ack.
|
/// <summary>Subscribe <paramref name="probe"/> to the <c>alerts</c> DPS topic and wait for the ack.
|
||||||
/// The Subscribe is sent FROM the probe so the SubscribeAck returns to it.</summary>
|
/// The Subscribe is sent FROM the probe so the SubscribeAck returns to it.</summary>
|
||||||
private void SubscribeToAlerts(TestProbe probe)
|
private void SubscribeToAlerts(TestProbe probe)
|
||||||
@@ -381,4 +401,111 @@ public sealed class ScriptedAlarmHostActorTests : RuntimeActorTestBase
|
|||||||
var state = publish.FishForMessage<OpcUaPublishActor.AlarmStateUpdate>(m => m.State.Active, Timeout);
|
var state = publish.FishForMessage<OpcUaPublishActor.AlarmStateUpdate>(m => m.State.Active, Timeout);
|
||||||
state.AlarmNodeId.ShouldBe("alm-1");
|
state.AlarmNodeId.ShouldBe("alm-1");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>Default-emit (T1): before ANY RedundancyStateChanged snapshot arrives — the boot window,
|
||||||
|
/// and the steady state for single-node deploys (the sole node is always Primary) — the host MUST
|
||||||
|
/// publish the cluster-wide alerts transition. Constructed WITH a localNode but no snapshot sent, so
|
||||||
|
/// the cached local role is unknown ⇒ treated as Primary/emit.</summary>
|
||||||
|
[Fact]
|
||||||
|
public void Emission_is_published_to_alerts_by_default_before_any_redundancy_state()
|
||||||
|
{
|
||||||
|
var publish = CreateTestProbe();
|
||||||
|
var mux = CreateTestProbe();
|
||||||
|
var alerts = CreateTestProbe();
|
||||||
|
SubscribeToAlerts(alerts);
|
||||||
|
|
||||||
|
var (host, _) = Spawn(publish, mux, LocalNode);
|
||||||
|
host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(severity: 800) }));
|
||||||
|
mux.ExpectMsg<DependencyMuxActor.RegisterInterest>(Timeout); // load completed
|
||||||
|
|
||||||
|
// No RedundancyStateChanged sent — local role unknown ⇒ default-emit.
|
||||||
|
host.Tell(new VirtualTagActor.DependencyValueChanged("M.T", 99, DateTime.UtcNow));
|
||||||
|
|
||||||
|
// The OPC UA node write happens AND the alerts transition is published.
|
||||||
|
publish.FishForMessage<OpcUaPublishActor.AlarmStateUpdate>(m => m.State.Active, Timeout);
|
||||||
|
var evt = alerts.ExpectMsg<AlarmTransitionEvent>(Timeout);
|
||||||
|
evt.AlarmId.ShouldBe("alm-1");
|
||||||
|
evt.TransitionKind.ShouldBe("Activated");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Secondary suppression (T1): when the cached local role is Secondary, the host MUST NOT
|
||||||
|
/// publish the cluster-wide alerts transition (the Primary publishes the single copy) — but it MUST
|
||||||
|
/// still write the local OPC UA condition node so the secondary's address space stays warm for failover.</summary>
|
||||||
|
[Fact]
|
||||||
|
public void Secondary_node_suppresses_alerts_publish_but_still_writes_opcua()
|
||||||
|
{
|
||||||
|
var publish = CreateTestProbe();
|
||||||
|
var mux = CreateTestProbe();
|
||||||
|
var alerts = CreateTestProbe();
|
||||||
|
SubscribeToAlerts(alerts);
|
||||||
|
|
||||||
|
var (host, _) = Spawn(publish, mux, LocalNode);
|
||||||
|
host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(severity: 800) }));
|
||||||
|
mux.ExpectMsg<DependencyMuxActor.RegisterInterest>(Timeout); // load completed
|
||||||
|
|
||||||
|
// Mark this node Secondary, then activate.
|
||||||
|
TellRedundancyRole(host, RedundancyRole.Secondary);
|
||||||
|
host.Tell(new VirtualTagActor.DependencyValueChanged("M.T", 99, DateTime.UtcNow));
|
||||||
|
|
||||||
|
// The local OPC UA node write is UNGATED — it must still arrive.
|
||||||
|
var state = publish.FishForMessage<OpcUaPublishActor.AlarmStateUpdate>(m => m.State.Active, Timeout);
|
||||||
|
state.AlarmNodeId.ShouldBe("alm-1");
|
||||||
|
|
||||||
|
// The cluster-wide alerts publish is gated off on the secondary.
|
||||||
|
alerts.ExpectNoMsg(TimeSpan.FromMilliseconds(500));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Primary publishes (T1): when the cached local role is Primary, the host publishes the
|
||||||
|
/// cluster-wide alerts transition as normal (this is the single copy the fleet sees).</summary>
|
||||||
|
[Fact]
|
||||||
|
public void Primary_node_publishes_alerts()
|
||||||
|
{
|
||||||
|
var publish = CreateTestProbe();
|
||||||
|
var mux = CreateTestProbe();
|
||||||
|
var alerts = CreateTestProbe();
|
||||||
|
SubscribeToAlerts(alerts);
|
||||||
|
|
||||||
|
var (host, _) = Spawn(publish, mux, LocalNode);
|
||||||
|
host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(severity: 800) }));
|
||||||
|
mux.ExpectMsg<DependencyMuxActor.RegisterInterest>(Timeout); // load completed
|
||||||
|
|
||||||
|
// Mark this node Primary, then activate.
|
||||||
|
TellRedundancyRole(host, RedundancyRole.Primary);
|
||||||
|
host.Tell(new VirtualTagActor.DependencyValueChanged("M.T", 99, DateTime.UtcNow));
|
||||||
|
|
||||||
|
publish.FishForMessage<OpcUaPublishActor.AlarmStateUpdate>(m => m.State.Active, Timeout);
|
||||||
|
var evt = alerts.ExpectMsg<AlarmTransitionEvent>(Timeout);
|
||||||
|
evt.AlarmId.ShouldBe("alm-1");
|
||||||
|
evt.TransitionKind.ShouldBe("Activated");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Inbound command ungated by role (T1): the alerts-publish gate must NOT affect inbound
|
||||||
|
/// command processing. Under a Secondary role, an AlarmCommand("Acknowledge") for an owned, active
|
||||||
|
/// alarm still drives the engine — observed via the resulting AlarmStateUpdate(Acknowledged=true)
|
||||||
|
/// (the OPC UA node write is ungated so the secondary's engine state + address space stay consistent).</summary>
|
||||||
|
[Fact]
|
||||||
|
public void Inbound_AlarmCommand_is_processed_regardless_of_role()
|
||||||
|
{
|
||||||
|
var publish = CreateTestProbe();
|
||||||
|
var mux = CreateTestProbe();
|
||||||
|
var (host, _) = Spawn(publish, mux, LocalNode);
|
||||||
|
|
||||||
|
host.Tell(new ScriptedAlarmHostActor.ApplyScriptedAlarms(new[] { Plan(id: "alm-1", depRef: "M.T") }));
|
||||||
|
mux.ExpectMsg<DependencyMuxActor.RegisterInterest>(Timeout); // load completed
|
||||||
|
|
||||||
|
// Mark this node Secondary — the alerts publish is gated, but command processing is NOT.
|
||||||
|
TellRedundancyRole(host, RedundancyRole.Secondary);
|
||||||
|
|
||||||
|
// Activate so there is something to acknowledge.
|
||||||
|
host.Tell(new VirtualTagActor.DependencyValueChanged("M.T", 99, DateTime.UtcNow));
|
||||||
|
publish.FishForMessage<OpcUaPublishActor.AlarmStateUpdate>(m => m.State.Active && !m.State.Acknowledged, Timeout);
|
||||||
|
|
||||||
|
// Acknowledge via the command topic — the engine must process it even on the secondary.
|
||||||
|
host.Tell(new AlarmCommand(
|
||||||
|
AlarmId: "alm-1", Operation: "Acknowledge", User: "alice", Comment: "ack-note", UnshelveAtUtc: null));
|
||||||
|
|
||||||
|
var acked = publish.FishForMessage<OpcUaPublishActor.AlarmStateUpdate>(m => m.State.Acknowledged, Timeout);
|
||||||
|
acked.AlarmNodeId.ShouldBe("alm-1");
|
||||||
|
acked.State.Acknowledged.ShouldBeTrue();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user