diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs index 2231e997..bc7f4f6f 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs @@ -59,6 +59,7 @@ public sealed class OpcUaPublishActor : ReceiveActor private readonly Phase7Applier? _applier; private readonly IActorRef? _dbHealthProbe; private readonly TimeSpan _staleWindow; + private TimeSpan _probeFreshnessWindow; private readonly Akka.Cluster.Cluster _cluster = Akka.Cluster.Cluster.Get(Context.System); private readonly ILoggingAdapter _log = Context.GetLogger(); @@ -67,6 +68,7 @@ public sealed class OpcUaPublishActor : ReceiveActor private bool _publishedAtLeastOnce; private DbHealthProbeActor.DbHealthStatus? _lastDbHealth; private RedundancyStateChanged? _lastSnapshot; + private (bool Ok, DateTime At)? _probeAboutMe; private Phase7CompositionResult _lastApplied = new( Array.Empty(), Array.Empty(), @@ -95,6 +97,9 @@ public sealed class OpcUaPublishActor : ReceiveActor /// legacy role-only ServiceLevel seam is used until a arrives. /// The window beyond which a DB-health sample or redundancy snapshot is /// considered stale; defaults to 30 seconds. + /// The window beyond which a peer's OPC UA probe verdict about + /// this node is considered stale (and thus given the benefit of the doubt rather than demoting); + /// defaults to 30 seconds. public static Props Props( IOpcUaAddressSpaceSink? sink = null, IServiceLevelPublisher? serviceLevel = null, @@ -102,7 +107,8 @@ public sealed class OpcUaPublishActor : ReceiveActor IDbContextFactory? dbFactory = null, Phase7Applier? applier = null, IActorRef? dbHealthProbe = null, - TimeSpan? staleWindow = null) => + TimeSpan? staleWindow = null, + TimeSpan? probeFreshnessWindow = null) => Akka.Actor.Props.Create(() => new OpcUaPublishActor( sink ?? NullOpcUaAddressSpaceSink.Instance, serviceLevel ?? NullServiceLevelPublisher.Instance, @@ -111,7 +117,8 @@ public sealed class OpcUaPublishActor : ReceiveActor dbFactory, applier, dbHealthProbe, - staleWindow)).WithDispatcher(DispatcherId); + staleWindow, + probeFreshnessWindow)).WithDispatcher(DispatcherId); /// Test-only Props that omits the pinned-dispatcher requirement and skips the /// DPS subscribe so unit tests can spin up the actor on a vanilla TestKit cluster. @@ -125,6 +132,9 @@ public sealed class OpcUaPublishActor : ReceiveActor /// legacy role-only ServiceLevel seam is used until a arrives. /// The window beyond which a DB-health sample or redundancy snapshot is /// considered stale; defaults to 30 seconds. + /// The window beyond which a peer's OPC UA probe verdict about + /// this node is considered stale (and thus given the benefit of the doubt rather than demoting); + /// defaults to 30 seconds. public static Props PropsForTests( IOpcUaAddressSpaceSink? sink = null, IServiceLevelPublisher? serviceLevel = null, @@ -133,7 +143,8 @@ public sealed class OpcUaPublishActor : ReceiveActor IDbContextFactory? dbFactory = null, Phase7Applier? applier = null, IActorRef? dbHealthProbe = null, - TimeSpan? staleWindow = null) => + TimeSpan? staleWindow = null, + TimeSpan? probeFreshnessWindow = null) => Akka.Actor.Props.Create(() => new OpcUaPublishActor( sink ?? NullOpcUaAddressSpaceSink.Instance, serviceLevel ?? NullServiceLevelPublisher.Instance, @@ -142,7 +153,8 @@ public sealed class OpcUaPublishActor : ReceiveActor dbFactory, applier, dbHealthProbe, - staleWindow)); + staleWindow, + probeFreshnessWindow)); /// Initializes a new instance of the class. /// The OPC UA address space sink. @@ -155,6 +167,9 @@ public sealed class OpcUaPublishActor : ReceiveActor /// legacy role-only ServiceLevel seam is used until a arrives. /// The window beyond which a DB-health sample or redundancy snapshot is /// considered stale; defaults to 30 seconds. + /// The window beyond which a peer's OPC UA probe verdict about + /// this node is considered stale (and thus given the benefit of the doubt rather than demoting); + /// defaults to 30 seconds. public OpcUaPublishActor( IOpcUaAddressSpaceSink sink, IServiceLevelPublisher serviceLevel, @@ -163,7 +178,8 @@ public sealed class OpcUaPublishActor : ReceiveActor IDbContextFactory? dbFactory = null, Phase7Applier? applier = null, IActorRef? dbHealthProbe = null, - TimeSpan? staleWindow = null) + TimeSpan? staleWindow = null, + TimeSpan? probeFreshnessWindow = null) { _sink = sink; _serviceLevel = serviceLevel; @@ -173,6 +189,7 @@ public sealed class OpcUaPublishActor : ReceiveActor _applier = applier; _dbHealthProbe = dbHealthProbe; _staleWindow = staleWindow ?? TimeSpan.FromSeconds(30); + _probeFreshnessWindow = probeFreshnessWindow ?? TimeSpan.FromSeconds(30); Receive(HandleAttributeUpdate); Receive(HandleAlarmUpdate); @@ -180,6 +197,7 @@ public sealed class OpcUaPublishActor : ReceiveActor Receive(HandleServiceLevelChanged); Receive(HandleRedundancyStateChanged); Receive(HandleDbHealthStatus); + Receive(HandlePeerProbe); Receive(_ => { /* PubSub ack */ }); } @@ -378,6 +396,31 @@ public sealed class OpcUaPublishActor : ReceiveActor RecomputeServiceLevel(); } + /// Records a peer's OPC UA probe verdict about THIS node and recomputes the local + /// ServiceLevel. The probe's is the + /// target that was probed, so a result whose NodeId is not this node is about a peer and + /// is ignored. A matching result is stamped with the receive time so + /// can debounce stale verdicts. + private void HandlePeerProbe(PeerOpcUaProbeActor.OpcUaProbeResult r) + { + // The result targets the probed node. If it isn't me, it's about a peer — ignore it. + if (_localNode is null || r.NodeId != _localNode.Value) return; + _probeAboutMe = (r.Ok, DateTime.UtcNow); + RecomputeServiceLevel(); + } + + /// The OPC UA self-probe input for the calculator: "did a peer recently observe MY OPC UA + /// endpoint as reachable?" Returns true (benefit of the doubt) when no peer verdict has + /// arrived yet (single-node / no peer) or when the latest verdict is older than + /// (the peer went away — don't penalise this node for that). + /// Only an actively-observed, RECENT Ok==false demotes. + private bool OpcUaProbeOk() + { + if (_probeAboutMe is not { } verdict) return true; + if (DateTime.UtcNow - verdict.At > _probeFreshnessWindow) return true; + return verdict.Ok; + } + /// /// Computes the local OPC UA ServiceLevel and routes it through /// (the dedup/publish/metric handler). The full path is @@ -413,7 +456,7 @@ public sealed class OpcUaPublishActor : ReceiveActor var inputs = new NodeHealthInputs( MemberState: SafeSelfStatus(), DbReachable: _lastDbHealth.Reachable, - OpcUaProbeOk: true, // TODO(2b): wire the real OPC UA self-probe result here. + OpcUaProbeOk: OpcUaProbeOk(), Stale: !_lastDbHealth.Reachable || (now - _lastDbHealth.AsOfUtc) > _staleWindow || (now - entry.AsOfUtc) > _staleWindow, diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorTests.cs index cce7bc6f..b14b6c23 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorTests.cs @@ -326,6 +326,119 @@ public sealed class OpcUaPublishActorTests : RuntimeActorTestBase duration: TimeSpan.FromMilliseconds(500)); } + /// Verifies that an actively-observed, recent peer probe of MY endpoint that came back + /// Ok==false demotes the calculator basis to 0. A non-leader Secondary entry (no +10 bonus) + /// is used so the assertion is unambiguous: inputs (DbReachable=true, OpcUaProbeOk=false, + /// Stale=false) match Compute's _ => 0 arm → 0, +0 (non-leader) → 0. + [Fact] + public void Probe_false_about_me_with_healthy_db_publishes_0() + { + var publisher = new RecordingPublisher(); + var local = NodeId.Parse("secondary-node"); + var probe = Sys.ActorOf(Akka.Actor.Props.Create(() => + new StubDbHealth(new DbHealthProbeActor.DbHealthStatus(true, DateTime.UtcNow, null)))); + var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests( + serviceLevel: publisher, localNode: local, dbHealthProbe: probe, + staleWindow: TimeSpan.FromSeconds(30), probeFreshnessWindow: TimeSpan.FromSeconds(30))); + + actor.Tell(new DbHealthProbeActor.DbHealthStatus(true, DateTime.UtcNow, null)); + actor.Tell(new PeerOpcUaProbeActor.OpcUaProbeResult(local, Ok: false)); + actor.Tell(new RedundancyStateChanged( + Nodes: new[] + { + new NodeRedundancyState(local, RedundancyRole.Secondary, + IsClusterLeader: false, IsRoleLeaderForDriver: false, DateTime.UtcNow), + }, + CorrelationId.NewId())); + + AwaitAssert(() => publisher.Levels.ShouldContain((byte)0), + duration: TimeSpan.FromMilliseconds(500)); + } + + /// Verifies that with no peer probe result ever received, OpcUaProbeOk() defaults + /// to true (benefit of the doubt / single-node). A healthy primary-leader thus computes + /// inputs (true, true, false) → 240, +10 leader → 250. + [Fact] + public void No_probe_result_defaults_ok_true_publishes_250() + { + var publisher = new RecordingPublisher(); + var local = NodeId.Parse("primary-node"); + var probe = Sys.ActorOf(Akka.Actor.Props.Create(() => + new StubDbHealth(new DbHealthProbeActor.DbHealthStatus(true, DateTime.UtcNow, null)))); + var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests( + serviceLevel: publisher, localNode: local, dbHealthProbe: probe, + staleWindow: TimeSpan.FromSeconds(30), probeFreshnessWindow: TimeSpan.FromSeconds(30))); + + actor.Tell(new DbHealthProbeActor.DbHealthStatus(true, DateTime.UtcNow, null)); + actor.Tell(new RedundancyStateChanged( + Nodes: new[] + { + new NodeRedundancyState(local, RedundancyRole.Primary, + IsClusterLeader: true, IsRoleLeaderForDriver: true, DateTime.UtcNow), + }, + CorrelationId.NewId())); + + AwaitAssert(() => publisher.Levels.ShouldContain((byte)250), + duration: TimeSpan.FromMilliseconds(500)); + } + + /// Verifies that a later Ok==true peer probe supersedes an earlier Ok==false + /// (recovery). A non-leader Secondary entry is used (no +10 bonus): after the true supersedes, + /// inputs (true, true, false) → 240, +0 → 240. + [Fact] + public void Probe_true_supersedes_earlier_false_publishes_240() + { + var publisher = new RecordingPublisher(); + var local = NodeId.Parse("secondary-node"); + var probe = Sys.ActorOf(Akka.Actor.Props.Create(() => + new StubDbHealth(new DbHealthProbeActor.DbHealthStatus(true, DateTime.UtcNow, null)))); + var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests( + serviceLevel: publisher, localNode: local, dbHealthProbe: probe, + staleWindow: TimeSpan.FromSeconds(30), probeFreshnessWindow: TimeSpan.FromSeconds(30))); + + actor.Tell(new DbHealthProbeActor.DbHealthStatus(true, DateTime.UtcNow, null)); + actor.Tell(new PeerOpcUaProbeActor.OpcUaProbeResult(local, Ok: false)); + actor.Tell(new PeerOpcUaProbeActor.OpcUaProbeResult(local, Ok: true)); + actor.Tell(new RedundancyStateChanged( + Nodes: new[] + { + new NodeRedundancyState(local, RedundancyRole.Secondary, + IsClusterLeader: false, IsRoleLeaderForDriver: false, DateTime.UtcNow), + }, + CorrelationId.NewId())); + + AwaitAssert(() => publisher.Levels.ShouldContain((byte)240), + duration: TimeSpan.FromMilliseconds(500)); + } + + /// Verifies that a peer probe result about a DIFFERENT node is ignored — it does not + /// affect MY OpcUaProbeOk(), which stays at its default true. A healthy + /// primary-leader thus still computes (true, true, false) → 240, +10 → 250. + [Fact] + public void Probe_about_a_different_node_is_ignored() + { + var publisher = new RecordingPublisher(); + var local = NodeId.Parse("primary-node"); + var probe = Sys.ActorOf(Akka.Actor.Props.Create(() => + new StubDbHealth(new DbHealthProbeActor.DbHealthStatus(true, DateTime.UtcNow, null)))); + var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests( + serviceLevel: publisher, localNode: local, dbHealthProbe: probe, + staleWindow: TimeSpan.FromSeconds(30), probeFreshnessWindow: TimeSpan.FromSeconds(30))); + + actor.Tell(new DbHealthProbeActor.DbHealthStatus(true, DateTime.UtcNow, null)); + actor.Tell(new PeerOpcUaProbeActor.OpcUaProbeResult(NodeId.Parse("someone-else"), Ok: false)); + actor.Tell(new RedundancyStateChanged( + Nodes: new[] + { + new NodeRedundancyState(local, RedundancyRole.Primary, + IsClusterLeader: true, IsRoleLeaderForDriver: true, DateTime.UtcNow), + }, + CorrelationId.NewId())); + + AwaitAssert(() => publisher.Levels.ShouldContain((byte)250), + duration: TimeSpan.FromMilliseconds(500)); + } + /// Verifies the legacy back-compat seam: with no DB-health probe wired, the handler /// falls back to the old role-only switch (Primary + leader → 240). [Fact]