diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs index e5c5f227..9336560a 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs @@ -1,7 +1,9 @@ using Akka.Actor; +using Akka.Cluster; using Akka.Cluster.Tools.PublishSubscribe; using Akka.Event; using Microsoft.EntityFrameworkCore; +using ZB.MOM.WW.OtOpcUa.Cluster.Redundancy; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy; using ZB.MOM.WW.OtOpcUa.Commons.Observability; using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; @@ -9,6 +11,7 @@ using ZB.MOM.WW.OtOpcUa.Commons.Types; using ZB.MOM.WW.OtOpcUa.Configuration; using ZB.MOM.WW.OtOpcUa.OpcUaServer; using ZB.MOM.WW.OtOpcUa.Runtime.Drivers; +using ZB.MOM.WW.OtOpcUa.Runtime.Health; namespace ZB.MOM.WW.OtOpcUa.Runtime.OpcUa; @@ -54,10 +57,15 @@ public sealed class OpcUaPublishActor : ReceiveActor private readonly NodeId? _localNode; private readonly IDbContextFactory? _dbFactory; private readonly Phase7Applier? _applier; + private readonly IActorRef? _dbHealthProbe; + private readonly TimeSpan _staleWindow; + private readonly Akka.Cluster.Cluster _cluster = Akka.Cluster.Cluster.Get(Context.System); private readonly ILoggingAdapter _log = Context.GetLogger(); private int _writes; private byte _lastServiceLevel; + private DbHealthProbeActor.DbHealthStatus? _lastDbHealth; + private RedundancyStateChanged? _lastSnapshot; private Phase7CompositionResult _lastApplied = new( Array.Empty(), Array.Empty(), @@ -74,25 +82,35 @@ public sealed class OpcUaPublishActor : ReceiveActor /// redundancy-state DPS topic so cluster transitions drive the local ServiceLevel /// publish path. When + are supplied, /// reads the latest deployment artifact + drives the - /// applier through the sink. + /// applier through the sink. When is supplied the local + /// ServiceLevel is computed via from real DB-health + + /// staleness + role-leader inputs; otherwise the legacy role-only switch is used. /// The OPC UA address space sink. /// The service level publisher. /// The local cluster node ID. /// The optional database context factory. /// The optional Phase 7 applier. + /// The optional ref; when null the + /// legacy role-only ServiceLevel seam is used until a arrives. + /// The window beyond which a DB-health sample or redundancy snapshot is + /// considered stale; defaults to 30 seconds. public static Props Props( IOpcUaAddressSpaceSink? sink = null, IServiceLevelPublisher? serviceLevel = null, NodeId? localNode = null, IDbContextFactory? dbFactory = null, - Phase7Applier? applier = null) => + Phase7Applier? applier = null, + IActorRef? dbHealthProbe = null, + TimeSpan? staleWindow = null) => Akka.Actor.Props.Create(() => new OpcUaPublishActor( sink ?? NullOpcUaAddressSpaceSink.Instance, serviceLevel ?? NullServiceLevelPublisher.Instance, subscribeRedundancyTopic: true, localNode, dbFactory, - applier)).WithDispatcher(DispatcherId); + applier, + dbHealthProbe, + staleWindow)).WithDispatcher(DispatcherId); /// Test-only Props that omits the pinned-dispatcher requirement and skips the /// DPS subscribe so unit tests can spin up the actor on a vanilla TestKit cluster. @@ -102,20 +120,28 @@ public sealed class OpcUaPublishActor : ReceiveActor /// The local cluster node ID. /// The optional database context factory. /// The optional Phase 7 applier. + /// The optional ref; when null the + /// legacy role-only ServiceLevel seam is used until a arrives. + /// The window beyond which a DB-health sample or redundancy snapshot is + /// considered stale; defaults to 30 seconds. public static Props PropsForTests( IOpcUaAddressSpaceSink? sink = null, IServiceLevelPublisher? serviceLevel = null, bool subscribeRedundancyTopic = false, NodeId? localNode = null, IDbContextFactory? dbFactory = null, - Phase7Applier? applier = null) => + Phase7Applier? applier = null, + IActorRef? dbHealthProbe = null, + TimeSpan? staleWindow = null) => Akka.Actor.Props.Create(() => new OpcUaPublishActor( sink ?? NullOpcUaAddressSpaceSink.Instance, serviceLevel ?? NullServiceLevelPublisher.Instance, subscribeRedundancyTopic, localNode, dbFactory, - applier)); + applier, + dbHealthProbe, + staleWindow)); /// Initializes a new instance of the class. /// The OPC UA address space sink. @@ -124,13 +150,19 @@ public sealed class OpcUaPublishActor : ReceiveActor /// The local cluster node ID. /// The optional database context factory. /// The optional Phase 7 applier. + /// The optional ref; when null the + /// legacy role-only ServiceLevel seam is used until a arrives. + /// The window beyond which a DB-health sample or redundancy snapshot is + /// considered stale; defaults to 30 seconds. public OpcUaPublishActor( IOpcUaAddressSpaceSink sink, IServiceLevelPublisher serviceLevel, bool subscribeRedundancyTopic, NodeId? localNode, IDbContextFactory? dbFactory = null, - Phase7Applier? applier = null) + Phase7Applier? applier = null, + IActorRef? dbHealthProbe = null, + TimeSpan? staleWindow = null) { _sink = sink; _serviceLevel = serviceLevel; @@ -138,12 +170,15 @@ public sealed class OpcUaPublishActor : ReceiveActor _localNode = localNode; _dbFactory = dbFactory; _applier = applier; + _dbHealthProbe = dbHealthProbe; + _staleWindow = staleWindow ?? TimeSpan.FromSeconds(30); Receive(HandleAttributeUpdate); Receive(HandleAlarmUpdate); Receive(HandleRebuild); Receive(HandleServiceLevelChanged); Receive(HandleRedundancyStateChanged); + Receive(HandleDbHealthStatus); Receive(_ => { /* PubSub ack */ }); } @@ -319,30 +354,91 @@ public sealed class OpcUaPublishActor : ReceiveActor } } - /// - /// Compute a coarse ServiceLevel from the cluster snapshot and forward to the - /// . This is a placeholder for F10b's full health - /// aggregation — for now we surface "primary-leader → 240, secondary → 100, detached → 0" - /// so the local SDK at least reflects role state. The full - /// path (with DB-reachable, OPC UA probe inputs) lives in RedundancyStateActor on - /// admin nodes; this driver-side mirror exists so each node's own SDK exposes a sensible - /// ServiceLevel without round-tripping back through the admin singleton. - /// + /// Caches the latest redundancy snapshot and recomputes the local ServiceLevel. + /// The actual byte is produced by — either via the + /// health-aware (once a DB-health probe + sample are + /// wired) or via the legacy role-only seam (back-compat / bootstrap). private void HandleRedundancyStateChanged(RedundancyStateChanged msg) { - if (_localNode is null) return; + _lastSnapshot = msg; + RecomputeServiceLevel(); + } - var local = msg.Nodes.FirstOrDefault(n => n.NodeId == _localNode.Value); - if (local is null) return; + /// Caches the latest DB-health sample and recomputes the local ServiceLevel. The + /// probe pushes these (or the actor Asks for them); either way the freshest sample feeds the + /// calculator's DbReachable/Stale inputs. + private void HandleDbHealthStatus(DbHealthProbeActor.DbHealthStatus msg) + { + _lastDbHealth = msg; + RecomputeServiceLevel(); + } - byte level = local.Role switch + /// + /// Computes the local OPC UA ServiceLevel and routes it through + /// (the dedup/publish/metric handler). The full path is + /// used once a DB-health probe is wired AND a sample has arrived; until then (and when no probe + /// is supplied at all) a legacy role-only seam keeps the historical "primary-leader → 240, + /// secondary → 100, detached → 0" behaviour. The calculator does not model Detached, so a + /// detached local node is guarded to 0 before either path runs. + /// + private void RecomputeServiceLevel() + { + if (_localNode is null || _lastSnapshot is null) return; + + var entry = _lastSnapshot.Nodes.FirstOrDefault(n => n.NodeId == _localNode.Value); + + // The calculator does NOT model Detached — a healthy detached node would wrongly compute + // 240, so guard it (and the missing-entry case) to 0 here. + if (entry is null || entry.Role == RedundancyRole.Detached) { - RedundancyRole.Primary when local.IsRoleLeaderForDriver => 240, - RedundancyRole.Primary => 200, - RedundancyRole.Secondary => 100, - RedundancyRole.Detached => 0, - _ => 0, - }; - Self.Tell(new ServiceLevelChanged(level)); + Self.Tell(new ServiceLevelChanged(0)); + return; + } + + // Legacy / back-compat seam: with no DB-health probe wired (or before the first sample + // arrives) fall back to the old role-only switch. This preserves historical behaviour and + // is the bootstrap value until the first DbHealthStatus lands. + if (_dbHealthProbe is null || _lastDbHealth is null) + { + Self.Tell(new ServiceLevelChanged(LegacyRoleOnly(entry))); + return; + } + + var now = DateTime.UtcNow; + var inputs = new NodeHealthInputs( + MemberState: SafeSelfStatus(), + DbReachable: _lastDbHealth.Reachable, + OpcUaProbeOk: true, // TODO(2b): wire the real OPC UA self-probe result here. + Stale: !_lastDbHealth.Reachable + || (now - _lastDbHealth.AsOfUtc) > _staleWindow + || (now - entry.AsOfUtc) > _staleWindow, + IsDriverRoleLeader: entry.IsRoleLeaderForDriver); + + Self.Tell(new ServiceLevelChanged(ServiceLevelCalculator.Compute(inputs))); + } + + /// The legacy role-only ServiceLevel switch (primary-leader → 240, primary → 200, + /// secondary → 100, _ → 0). Preserved as the back-compat / bootstrap seam. + private static byte LegacyRoleOnly(NodeRedundancyState entry) => entry.Role switch + { + RedundancyRole.Primary when entry.IsRoleLeaderForDriver => 240, + RedundancyRole.Primary => 200, + RedundancyRole.Secondary => 100, + _ => 0, + }; + + /// Reads this node's cluster , returning + /// if the cluster is unavailable (so the calculator treats it + /// as untrusted → 0 rather than throwing). + private MemberStatus SafeSelfStatus() + { + try + { + return _cluster.SelfMember.Status; + } + catch + { + return MemberStatus.Removed; + } } } diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorTests.cs index 19ce2d73..1dcd9ddd 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorTests.cs @@ -5,6 +5,7 @@ using Xunit; using ZB.MOM.WW.OtOpcUa.Commons.Messages.Redundancy; using ZB.MOM.WW.OtOpcUa.Commons.OpcUa; using ZB.MOM.WW.OtOpcUa.Commons.Types; +using ZB.MOM.WW.OtOpcUa.Runtime.Health; using ZB.MOM.WW.OtOpcUa.Runtime.OpcUa; using ZB.MOM.WW.OtOpcUa.Runtime.Tests.Harness; @@ -157,6 +158,181 @@ public sealed class OpcUaPublishActorTests : RuntimeActorTestBase duration: TimeSpan.FromMilliseconds(500)); } + /// Verifies that the calculator path computes 250 for a healthy primary role-leader + /// (basis 240 from DB-reachable + probe-ok + fresh, +10 driver-role-leader bonus). + [Fact] + public void Calculator_path_healthy_primary_leader_publishes_250() + { + var publisher = new RecordingPublisher(); + var local = NodeId.Parse("primary-node"); + var probe = Sys.ActorOf(Akka.Actor.Props.Create(() => + new StubDbHealth(new DbHealthProbeActor.DbHealthStatus(true, DateTime.UtcNow, null)))); + var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests( + serviceLevel: publisher, localNode: local, dbHealthProbe: probe)); + + actor.Tell(new DbHealthProbeActor.DbHealthStatus(true, DateTime.UtcNow, null)); + actor.Tell(new RedundancyStateChanged( + Nodes: new[] + { + new NodeRedundancyState(local, RedundancyRole.Primary, + IsClusterLeader: true, IsRoleLeaderForDriver: true, DateTime.UtcNow), + }, + CorrelationId.NewId())); + + AwaitAssert(() => publisher.Levels.ShouldContain((byte)250), + duration: TimeSpan.FromMilliseconds(500)); + } + + /// Verifies that the calculator path computes 240 for a healthy non-leader secondary + /// (basis 240 from DB-reachable + probe-ok + fresh, no leader bonus). Documented change from the + /// legacy role-only path, which mapped Secondary → 100. + [Fact] + public void Calculator_path_healthy_secondary_publishes_240() + { + var publisher = new RecordingPublisher(); + var local = NodeId.Parse("secondary-node"); + var probe = Sys.ActorOf(Akka.Actor.Props.Create(() => + new StubDbHealth(new DbHealthProbeActor.DbHealthStatus(true, DateTime.UtcNow, null)))); + var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests( + serviceLevel: publisher, localNode: local, dbHealthProbe: probe)); + + actor.Tell(new DbHealthProbeActor.DbHealthStatus(true, DateTime.UtcNow, null)); + actor.Tell(new RedundancyStateChanged( + Nodes: new[] + { + new NodeRedundancyState(local, RedundancyRole.Secondary, + IsClusterLeader: false, IsRoleLeaderForDriver: false, DateTime.UtcNow), + }, + CorrelationId.NewId())); + + AwaitAssert(() => publisher.Levels.ShouldContain((byte)240), + duration: TimeSpan.FromMilliseconds(500)); + } + + /// Verifies that the calculator path computes 100 when the DB is unreachable + /// ((false,_,true) basis, stale via !DbReachable) for a non-leader secondary (no bonus). + [Fact] + public void Calculator_path_db_unreachable_publishes_100() + { + var publisher = new RecordingPublisher(); + var local = NodeId.Parse("secondary-node"); + var probe = Sys.ActorOf(Akka.Actor.Props.Create(() => + new StubDbHealth(new DbHealthProbeActor.DbHealthStatus(false, DateTime.UtcNow, "down")))); + var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests( + serviceLevel: publisher, localNode: local, dbHealthProbe: probe)); + + actor.Tell(new DbHealthProbeActor.DbHealthStatus(false, DateTime.UtcNow, "down")); + actor.Tell(new RedundancyStateChanged( + Nodes: new[] + { + new NodeRedundancyState(local, RedundancyRole.Secondary, + IsClusterLeader: false, IsRoleLeaderForDriver: false, DateTime.UtcNow), + }, + CorrelationId.NewId())); + + AwaitAssert(() => publisher.Levels.ShouldContain((byte)100), + duration: TimeSpan.FromMilliseconds(500)); + } + + /// Verifies that the calculator path computes 200 for a stale snapshot when the DB is + /// reachable + fresh but the redundancy entry's AsOfUtc is older than the stale window + /// ((true,_,true) basis) for a non-leader secondary (no bonus). + [Fact] + public void Calculator_path_stale_snapshot_publishes_200() + { + var publisher = new RecordingPublisher(); + var local = NodeId.Parse("secondary-node"); + var probe = Sys.ActorOf(Akka.Actor.Props.Create(() => + new StubDbHealth(new DbHealthProbeActor.DbHealthStatus(true, DateTime.UtcNow, null)))); + var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests( + serviceLevel: publisher, localNode: local, dbHealthProbe: probe, + staleWindow: TimeSpan.FromSeconds(2))); + + actor.Tell(new DbHealthProbeActor.DbHealthStatus(true, DateTime.UtcNow, null)); + actor.Tell(new RedundancyStateChanged( + Nodes: new[] + { + new NodeRedundancyState(local, RedundancyRole.Secondary, + IsClusterLeader: false, IsRoleLeaderForDriver: false, + DateTime.UtcNow - TimeSpan.FromMinutes(1)), + }, + CorrelationId.NewId())); + + AwaitAssert(() => publisher.Levels.ShouldContain((byte)200), + duration: TimeSpan.FromMilliseconds(500)); + } + + /// Verifies that a detached local node publishes 0 (the calculator does not model + /// Detached, so the handler guards it before the calculator path). The node first goes healthy + /// (250) so the dedup'd transition down to 0 is observable. + [Fact] + public void Calculator_path_detached_publishes_0() + { + var publisher = new RecordingPublisher(); + var local = NodeId.Parse("detached-node"); + var probe = Sys.ActorOf(Akka.Actor.Props.Create(() => + new StubDbHealth(new DbHealthProbeActor.DbHealthStatus(true, DateTime.UtcNow, null)))); + var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests( + serviceLevel: publisher, localNode: local, dbHealthProbe: probe)); + + actor.Tell(new DbHealthProbeActor.DbHealthStatus(true, DateTime.UtcNow, null)); + // First go healthy primary-leader (250) so the transition down to 0 is not dedup'd against + // the initial 0. + actor.Tell(new RedundancyStateChanged( + Nodes: new[] + { + new NodeRedundancyState(local, RedundancyRole.Primary, + IsClusterLeader: true, IsRoleLeaderForDriver: true, DateTime.UtcNow), + }, + CorrelationId.NewId())); + AwaitAssert(() => publisher.Levels.ShouldContain((byte)250), + duration: TimeSpan.FromMilliseconds(500)); + + // Now detach — expect the guard to drive ServiceLevel down to 0. + actor.Tell(new RedundancyStateChanged( + Nodes: new[] + { + new NodeRedundancyState(local, RedundancyRole.Detached, + IsClusterLeader: false, IsRoleLeaderForDriver: false, DateTime.UtcNow), + }, + CorrelationId.NewId())); + + AwaitAssert(() => publisher.Levels.ShouldContain((byte)0), + duration: TimeSpan.FromMilliseconds(500)); + } + + /// Verifies the legacy back-compat seam: with no DB-health probe wired, the handler + /// falls back to the old role-only switch (Primary + leader → 240). + [Fact] + public void Legacy_path_no_db_probe_keeps_role_only() + { + var publisher = new RecordingPublisher(); + var local = NodeId.Parse("primary-node"); + var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests( + serviceLevel: publisher, localNode: local)); + + actor.Tell(new RedundancyStateChanged( + Nodes: new[] + { + new NodeRedundancyState(local, RedundancyRole.Primary, + IsClusterLeader: true, IsRoleLeaderForDriver: true, DateTime.UtcNow), + }, + CorrelationId.NewId())); + + AwaitAssert(() => publisher.Levels.ShouldContain((byte)240), + duration: TimeSpan.FromMilliseconds(500)); + } + + /// Stub DB-health probe actor that answers + /// with a fixed status, so the calculator path is deterministic without a real timer. + private sealed class StubDbHealth : Akka.Actor.ReceiveActor + { + /// Initializes a new instance of the class. + /// The fixed DB-health status to reply with. + public StubDbHealth(DbHealthProbeActor.DbHealthStatus status) => + Receive(_ => Sender.Tell(status)); + } + /// Test implementation of IOpcUaAddressSpaceSink that records calls. private sealed class RecordingSink : IOpcUaAddressSpaceSink {