diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs index 4b3e65ae..cf18c778 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/OpcUa/OpcUaPublishActor.cs @@ -27,11 +27,18 @@ namespace ZB.MOM.WW.OtOpcUa.Runtime.OpcUa; /// Opc.Ua.Server. The remaining piece is wiring those bindings to a real /// StandardServer address space — tracked as F10b. /// -public sealed class OpcUaPublishActor : ReceiveActor +public sealed class OpcUaPublishActor : ReceiveActor, IWithTimers { public const string DispatcherId = "opcua-synchronized-dispatcher"; public const string RedundancyStateTopic = "redundancy-state"; + /// Gets or sets the timer scheduler for the periodic DB-health refresh tick. + public ITimerScheduler Timers { get; set; } = null!; + + /// Self-tick that drives the periodic Ask of the local + /// for its cached status. Private singleton — the actor pumps this for itself, no external sender. + private sealed class HealthTick { public static readonly HealthTick Instance = new(); private HealthTick() { } } + public sealed record AttributeValueUpdate(string NodeId, object? Value, OpcUaQuality Quality, DateTime TimestampUtc); /// Carries the full Part 9 condition state for a scripted alarm to the sink. The @@ -60,6 +67,7 @@ public sealed class OpcUaPublishActor : ReceiveActor private readonly IActorRef? _dbHealthProbe; private readonly TimeSpan _staleWindow; private readonly TimeSpan _probeFreshnessWindow; + private readonly TimeSpan _healthTickInterval; private readonly Akka.Cluster.Cluster _cluster = Akka.Cluster.Cluster.Get(Context.System); private readonly ILoggingAdapter _log = Context.GetLogger(); @@ -100,6 +108,9 @@ public sealed class OpcUaPublishActor : ReceiveActor /// The window beyond which a peer's OPC UA probe verdict about /// this node is considered stale (and thus given the benefit of the doubt rather than demoting); /// defaults to 30 seconds. + /// The period between self-driven DB-health refresh ticks (each + /// Asks for its cached status); defaults to 5 seconds. No timer is + /// started when is null. public static Props Props( IOpcUaAddressSpaceSink? sink = null, IServiceLevelPublisher? serviceLevel = null, @@ -108,7 +119,8 @@ public sealed class OpcUaPublishActor : ReceiveActor Phase7Applier? applier = null, IActorRef? dbHealthProbe = null, TimeSpan? staleWindow = null, - TimeSpan? probeFreshnessWindow = null) => + TimeSpan? probeFreshnessWindow = null, + TimeSpan? healthTickInterval = null) => Akka.Actor.Props.Create(() => new OpcUaPublishActor( sink ?? NullOpcUaAddressSpaceSink.Instance, serviceLevel ?? NullServiceLevelPublisher.Instance, @@ -118,7 +130,8 @@ public sealed class OpcUaPublishActor : ReceiveActor applier, dbHealthProbe, staleWindow, - probeFreshnessWindow)).WithDispatcher(DispatcherId); + probeFreshnessWindow, + healthTickInterval)).WithDispatcher(DispatcherId); /// Test-only Props that omits the pinned-dispatcher requirement and skips the /// DPS subscribe so unit tests can spin up the actor on a vanilla TestKit cluster. @@ -135,6 +148,9 @@ public sealed class OpcUaPublishActor : ReceiveActor /// The window beyond which a peer's OPC UA probe verdict about /// this node is considered stale (and thus given the benefit of the doubt rather than demoting); /// defaults to 30 seconds. + /// The period between self-driven DB-health refresh ticks (each + /// Asks for its cached status); defaults to 5 seconds. No timer is + /// started when is null. public static Props PropsForTests( IOpcUaAddressSpaceSink? sink = null, IServiceLevelPublisher? serviceLevel = null, @@ -144,7 +160,8 @@ public sealed class OpcUaPublishActor : ReceiveActor Phase7Applier? applier = null, IActorRef? dbHealthProbe = null, TimeSpan? staleWindow = null, - TimeSpan? probeFreshnessWindow = null) => + TimeSpan? probeFreshnessWindow = null, + TimeSpan? healthTickInterval = null) => Akka.Actor.Props.Create(() => new OpcUaPublishActor( sink ?? NullOpcUaAddressSpaceSink.Instance, serviceLevel ?? NullServiceLevelPublisher.Instance, @@ -154,7 +171,8 @@ public sealed class OpcUaPublishActor : ReceiveActor applier, dbHealthProbe, staleWindow, - probeFreshnessWindow)); + probeFreshnessWindow, + healthTickInterval)); /// Initializes a new instance of the class. /// The OPC UA address space sink. @@ -170,6 +188,9 @@ public sealed class OpcUaPublishActor : ReceiveActor /// The window beyond which a peer's OPC UA probe verdict about /// this node is considered stale (and thus given the benefit of the doubt rather than demoting); /// defaults to 30 seconds. + /// The period between self-driven DB-health refresh ticks (each + /// Asks for its cached status); defaults to 5 seconds. No timer is + /// started when is null. public OpcUaPublishActor( IOpcUaAddressSpaceSink sink, IServiceLevelPublisher serviceLevel, @@ -179,7 +200,8 @@ public sealed class OpcUaPublishActor : ReceiveActor Phase7Applier? applier = null, IActorRef? dbHealthProbe = null, TimeSpan? staleWindow = null, - TimeSpan? probeFreshnessWindow = null) + TimeSpan? probeFreshnessWindow = null, + TimeSpan? healthTickInterval = null) { _sink = sink; _serviceLevel = serviceLevel; @@ -190,6 +212,7 @@ public sealed class OpcUaPublishActor : ReceiveActor _dbHealthProbe = dbHealthProbe; _staleWindow = staleWindow ?? TimeSpan.FromSeconds(30); _probeFreshnessWindow = probeFreshnessWindow ?? TimeSpan.FromSeconds(30); + _healthTickInterval = healthTickInterval ?? TimeSpan.FromSeconds(5); Receive(HandleAttributeUpdate); Receive(HandleAlarmUpdate); @@ -197,6 +220,7 @@ public sealed class OpcUaPublishActor : ReceiveActor Receive(HandleServiceLevelChanged); Receive(HandleRedundancyStateChanged); Receive(HandleDbHealthStatus); + Receive(_ => OnHealthTick()); Receive(HandlePeerProbe); Receive(_ => { /* PubSub ack */ }); } @@ -208,6 +232,16 @@ public sealed class OpcUaPublishActor : ReceiveActor { DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(RedundancyStateTopic, Self)); } + + // Production delivery of DB-health: when a probe is wired, kick an immediate refresh + start the + // periodic tick so _lastDbHealth gets populated and kept fresh without any external pump. Gated + // ONLY on the probe being present (independent of the DPS subscribe) so the calculator path works + // in tests too. No probe → stay on the legacy role-only seam (no timer). + if (_dbHealthProbe is not null) + { + Self.Tell(HealthTick.Instance); // immediate first refresh + Timers.StartPeriodicTimer("db-health", HealthTick.Instance, _healthTickInterval); + } } private void HandleAttributeUpdate(AttributeValueUpdate msg) @@ -396,6 +430,23 @@ public sealed class OpcUaPublishActor : ReceiveActor RecomputeServiceLevel(); } + /// Periodic self-tick: Asks the local for its cached + /// status and pipes the reply back to , where + /// caches it + recomputes. A hung/late probe (Ask timeout) pipes a Reachable=false status so + /// the node FAIL-SAFE-DEMOTES rather than freezing the last-known-good. The continuation only + /// CONSTRUCTS a record off the actor thread — the actual state mutation happens on the actor thread + /// when the piped is received, so no actor field is + /// touched here. + private void OnHealthTick() + { + _dbHealthProbe!.Ask( + DbHealthProbeActor.GetStatus.Instance, TimeSpan.FromSeconds(1)) + .ContinueWith(t => t.IsCompletedSuccessfully + ? t.Result + : new DbHealthProbeActor.DbHealthStatus(false, DateTime.UtcNow, "db-health ask timeout")) + .PipeTo(Self); + } + /// Records a peer's OPC UA probe verdict about THIS node and recomputes the local /// ServiceLevel. The probe's is the /// target that was probed, so a result whose NodeId is not this node is about a peer and diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorTests.cs index 4dc5d16e..7d829ec3 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/OpcUa/OpcUaPublishActorTests.cs @@ -505,6 +505,30 @@ public sealed class OpcUaPublishActorTests : RuntimeActorTestBase duration: TimeSpan.FromMilliseconds(500)); } + /// Verifies that the periodic HealthTick Asks the local + /// for status and pipes the reply back to itself — populating _lastDbHealth in production WITHOUT + /// any external pump (no direct Tell). With a healthy DB + /// reply and a primary-leader snapshot, the calculator path therefore reaches 250. + [Fact] + public void HealthTick_asks_db_probe_and_publishes_calculator_byte() + { + var publisher = new RecordingPublisher(); + var local = NodeId.Parse("primary-node"); + var db = Sys.ActorOf(Akka.Actor.Props.Create(() => new StubDbHealth( + new DbHealthProbeActor.DbHealthStatus(true, DateTime.UtcNow, null)))); + var actor = Sys.ActorOf(OpcUaPublishActor.PropsForTests( + serviceLevel: publisher, localNode: local, dbHealthProbe: db, + staleWindow: TimeSpan.FromSeconds(30), probeFreshnessWindow: TimeSpan.FromSeconds(30), + healthTickInterval: TimeSpan.FromMilliseconds(100))); + actor.Tell(new RedundancyStateChanged(new[] + { + new NodeRedundancyState(local, RedundancyRole.Primary, + IsClusterLeader: true, IsRoleLeaderForDriver: true, DateTime.UtcNow), + }, CorrelationId.NewId())); + // No direct DbHealthStatus Tell — the periodic HealthTick Ask must populate it → 250. + AwaitAssert(() => publisher.Levels.ShouldContain((byte)250), TimeSpan.FromSeconds(2)); + } + /// Stub DB-health probe actor that answers /// with a fixed status, so the calculator path is deterministic without a real timer. private sealed class StubDbHealth : Akka.Actor.ReceiveActor