feat(redundancy): periodic HealthTick refreshes DB reachability via Ask/PipeTo

This commit is contained in:
Joseph Doherty
2026-06-15 13:15:26 -04:00
parent 5382eea9b5
commit 37b32a5623
2 changed files with 81 additions and 6 deletions
@@ -27,11 +27,18 @@ namespace ZB.MOM.WW.OtOpcUa.Runtime.OpcUa;
/// <c>Opc.Ua.Server</c>. The remaining piece is wiring those bindings to a real
/// <c>StandardServer</c> address space — tracked as F10b.
/// </summary>
public sealed class OpcUaPublishActor : ReceiveActor
public sealed class OpcUaPublishActor : ReceiveActor, IWithTimers
{
public const string DispatcherId = "opcua-synchronized-dispatcher";
public const string RedundancyStateTopic = "redundancy-state";
/// <summary>Gets or sets the timer scheduler for the periodic DB-health refresh tick.</summary>
/// <remarks>Required by <see cref="IWithTimers"/>. Initialized to <c>null!</c> because the actor
/// never assigns it itself — presumably the Akka.NET runtime injects the scheduler before the actor
/// starts (confirm against the <see cref="IWithTimers"/> contract).</remarks>
public ITimerScheduler Timers { get; set; } = null!;
/// <summary>
/// Marker message the actor sends to itself to trigger a DB-health refresh, i.e. an Ask of the
/// local <see cref="DbHealthProbeActor"/> for its cached status. It is private and exposes a single
/// shared <see cref="Instance"/>, so no code outside this actor can create or send one.
/// </summary>
private sealed class HealthTick
{
    /// <summary>Prevents instantiation; use <see cref="Instance"/>.</summary>
    private HealthTick()
    {
    }

    /// <summary>The one shared tick instance.</summary>
    public static readonly HealthTick Instance = new();
}
/// <summary>Message carrying one attribute value for a node; received by this actor and
/// dispatched to <c>HandleAttributeUpdate</c>.</summary>
/// <param name="NodeId">Identifier of the node the value belongs to.</param>
/// <param name="Value">The value being reported; may be <c>null</c>.</param>
/// <param name="Quality">The quality accompanying the value.</param>
/// <param name="TimestampUtc">Timestamp of the value (UTC, per the parameter name).</param>
public sealed record AttributeValueUpdate(string NodeId, object? Value, OpcUaQuality Quality, DateTime TimestampUtc);
/// <summary>Carries the full Part 9 condition state for a scripted alarm to the sink. The
@@ -60,6 +67,7 @@ public sealed class OpcUaPublishActor : ReceiveActor
private readonly IActorRef? _dbHealthProbe;
private readonly TimeSpan _staleWindow;
private readonly TimeSpan _probeFreshnessWindow;
private readonly TimeSpan _healthTickInterval;
private readonly Akka.Cluster.Cluster _cluster = Akka.Cluster.Cluster.Get(Context.System);
private readonly ILoggingAdapter _log = Context.GetLogger();
@@ -100,6 +108,9 @@ public sealed class OpcUaPublishActor : ReceiveActor
/// <param name="probeFreshnessWindow">The window beyond which a peer's OPC UA probe verdict about
/// this node is considered stale (and thus given the benefit of the doubt rather than demoting);
/// defaults to 30 seconds.</param>
/// <param name="healthTickInterval">The period between self-driven DB-health refresh ticks (each
/// Asks <paramref name="dbHealthProbe"/> for its cached status); defaults to 5 seconds. No timer is
/// started when <paramref name="dbHealthProbe"/> is null.</param>
public static Props Props(
IOpcUaAddressSpaceSink? sink = null,
IServiceLevelPublisher? serviceLevel = null,
@@ -108,7 +119,8 @@ public sealed class OpcUaPublishActor : ReceiveActor
Phase7Applier? applier = null,
IActorRef? dbHealthProbe = null,
TimeSpan? staleWindow = null,
TimeSpan? probeFreshnessWindow = null) =>
TimeSpan? probeFreshnessWindow = null,
TimeSpan? healthTickInterval = null) =>
Akka.Actor.Props.Create(() => new OpcUaPublishActor(
sink ?? NullOpcUaAddressSpaceSink.Instance,
serviceLevel ?? NullServiceLevelPublisher.Instance,
@@ -118,7 +130,8 @@ public sealed class OpcUaPublishActor : ReceiveActor
applier,
dbHealthProbe,
staleWindow,
probeFreshnessWindow)).WithDispatcher(DispatcherId);
probeFreshnessWindow,
healthTickInterval)).WithDispatcher(DispatcherId);
/// <summary>Test-only Props that omits the pinned-dispatcher requirement and skips the
/// DPS subscribe so unit tests can spin up the actor on a vanilla TestKit cluster.</summary>
@@ -135,6 +148,9 @@ public sealed class OpcUaPublishActor : ReceiveActor
/// <param name="probeFreshnessWindow">The window beyond which a peer's OPC UA probe verdict about
/// this node is considered stale (and thus given the benefit of the doubt rather than demoting);
/// defaults to 30 seconds.</param>
/// <param name="healthTickInterval">The period between self-driven DB-health refresh ticks (each
/// Asks <paramref name="dbHealthProbe"/> for its cached status); defaults to 5 seconds. No timer is
/// started when <paramref name="dbHealthProbe"/> is null.</param>
public static Props PropsForTests(
IOpcUaAddressSpaceSink? sink = null,
IServiceLevelPublisher? serviceLevel = null,
@@ -144,7 +160,8 @@ public sealed class OpcUaPublishActor : ReceiveActor
Phase7Applier? applier = null,
IActorRef? dbHealthProbe = null,
TimeSpan? staleWindow = null,
TimeSpan? probeFreshnessWindow = null) =>
TimeSpan? probeFreshnessWindow = null,
TimeSpan? healthTickInterval = null) =>
Akka.Actor.Props.Create(() => new OpcUaPublishActor(
sink ?? NullOpcUaAddressSpaceSink.Instance,
serviceLevel ?? NullServiceLevelPublisher.Instance,
@@ -154,7 +171,8 @@ public sealed class OpcUaPublishActor : ReceiveActor
applier,
dbHealthProbe,
staleWindow,
probeFreshnessWindow));
probeFreshnessWindow,
healthTickInterval));
/// <summary>Initializes a new instance of the <see cref="OpcUaPublishActor"/> class.</summary>
/// <param name="sink">The OPC UA address space sink.</param>
@@ -170,6 +188,9 @@ public sealed class OpcUaPublishActor : ReceiveActor
/// <param name="probeFreshnessWindow">The window beyond which a peer's OPC UA probe verdict about
/// this node is considered stale (and thus given the benefit of the doubt rather than demoting);
/// defaults to 30 seconds.</param>
/// <param name="healthTickInterval">The period between self-driven DB-health refresh ticks (each
/// Asks <paramref name="dbHealthProbe"/> for its cached status); defaults to 5 seconds. No timer is
/// started when <paramref name="dbHealthProbe"/> is null.</param>
public OpcUaPublishActor(
IOpcUaAddressSpaceSink sink,
IServiceLevelPublisher serviceLevel,
@@ -179,7 +200,8 @@ public sealed class OpcUaPublishActor : ReceiveActor
Phase7Applier? applier = null,
IActorRef? dbHealthProbe = null,
TimeSpan? staleWindow = null,
TimeSpan? probeFreshnessWindow = null)
TimeSpan? probeFreshnessWindow = null,
TimeSpan? healthTickInterval = null)
{
_sink = sink;
_serviceLevel = serviceLevel;
@@ -190,6 +212,7 @@ public sealed class OpcUaPublishActor : ReceiveActor
_dbHealthProbe = dbHealthProbe;
_staleWindow = staleWindow ?? TimeSpan.FromSeconds(30);
_probeFreshnessWindow = probeFreshnessWindow ?? TimeSpan.FromSeconds(30);
_healthTickInterval = healthTickInterval ?? TimeSpan.FromSeconds(5);
Receive<AttributeValueUpdate>(HandleAttributeUpdate);
Receive<AlarmStateUpdate>(HandleAlarmUpdate);
@@ -197,6 +220,7 @@ public sealed class OpcUaPublishActor : ReceiveActor
Receive<ServiceLevelChanged>(HandleServiceLevelChanged);
Receive<RedundancyStateChanged>(HandleRedundancyStateChanged);
Receive<DbHealthProbeActor.DbHealthStatus>(HandleDbHealthStatus);
Receive<HealthTick>(_ => OnHealthTick());
Receive<PeerOpcUaProbeActor.OpcUaProbeResult>(HandlePeerProbe);
Receive<SubscribeAck>(_ => { /* PubSub ack */ });
}
@@ -208,6 +232,16 @@ public sealed class OpcUaPublishActor : ReceiveActor
{
DistributedPubSub.Get(Context.System).Mediator.Tell(new Subscribe(RedundancyStateTopic, Self));
}
// Production delivery of DB-health: when a probe is wired, kick an immediate refresh + start the
// periodic tick so _lastDbHealth gets populated and kept fresh without any external pump. Gated
// ONLY on the probe being present (independent of the DPS subscribe) so the calculator path works
// in tests too. No probe → stay on the legacy role-only seam (no timer).
if (_dbHealthProbe is not null)
{
Self.Tell(HealthTick.Instance); // immediate first refresh
Timers.StartPeriodicTimer("db-health", HealthTick.Instance, _healthTickInterval);
}
}
private void HandleAttributeUpdate(AttributeValueUpdate msg)
@@ -396,6 +430,23 @@ public sealed class OpcUaPublishActor : ReceiveActor
RecomputeServiceLevel();
}
/// <summary>Periodic self-tick: Asks the local <see cref="DbHealthProbeActor"/> for its cached
/// status (1 second Ask timeout) and pipes the reply back to <see cref="Self"/>, where
/// <see cref="HandleDbHealthStatus"/> caches it + recomputes. If the Ask fails for any reason
/// (timeout, fault or cancellation) a <c>Reachable=false</c> status is piped instead, so the node
/// FAIL-SAFE-DEMOTES rather than freezing the last-known-good.</summary>
/// <remarks>
/// The failure mapping is handed to <c>PipeTo</c> rather than a bare <c>ContinueWith</c>, so no
/// ambient <c>TaskScheduler.Current</c> is captured. The failure callback only CONSTRUCTS a record
/// off the actor thread — the actual state mutation happens on the actor thread when the piped
/// <see cref="DbHealthProbeActor.DbHealthStatus"/> is received, so no actor field is touched there.
/// </remarks>
private void OnHealthTick()
{
    // The tick is only scheduled when a probe is wired, but the field is declared nullable:
    // guard explicitly instead of suppressing the warning, so a stray tick can never throw.
    if (_dbHealthProbe is not { } probe)
    {
        return;
    }

    probe.Ask<DbHealthProbeActor.DbHealthStatus>(
            DbHealthProbeActor.GetStatus.Instance, TimeSpan.FromSeconds(1))
        .PipeTo(
            Self,
            // Success: the probe's reply is forwarded unchanged (default PipeTo behaviour).
            // Failure: substitute an "unreachable" status stamped at the time of failure.
            failure: _ => new DbHealthProbeActor.DbHealthStatus(
                false, DateTime.UtcNow, "db-health ask timeout"));
}
/// <summary>Records a peer's OPC UA probe verdict about THIS node and recomputes the local
/// ServiceLevel. The probe's <see cref="PeerOpcUaProbeActor.OpcUaProbeResult.NodeId"/> is the
/// target that was probed, so a result whose <c>NodeId</c> is not this node is about a peer and