feat(runtime): publish DriverHealthChanged via DriverInstanceActor

- IDriverHealthPublisher in Core.Abstractions + NullDriverHealthPublisher
  no-op for tests/dev-stub paths.
- AkkaDriverHealthPublisher in Runtime forwards to the cluster-wide
  `driver-health` DPS topic.
- DriverInstanceActor instrumented to publish snapshots on every
  observable state change + a periodic 30s heartbeat so the AdminUI
  snapshot store warms up for newly-joined SignalR clients.
- Sliding 5-minute Faulted-count tracked per actor via Queue<DateTime>.
- DriverHostActor.SpawnChild threads clusterId (_localNode.Value) and
  the health publisher down to every DriverInstanceActor child.
- ServiceCollectionExtensions.AddOtOpcUaRuntime registers
  AkkaDriverHealthPublisher as IDriverHealthPublisher singleton.
This commit is contained in:
Joseph Doherty
2026-05-28 10:22:44 -04:00
parent 29370fde3c
commit 4203b84d51
5 changed files with 205 additions and 10 deletions
@@ -7,6 +7,13 @@ using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
// private timer key type — file-scoped so the name stays unique per-file
file sealed class HealthPollTick
{
public static readonly HealthPollTick Instance = new();
private HealthPollTick() { }
}
/// <summary>
/// Akka wrapper for a single <see cref="IDriver"/> instance. States:
///
@@ -47,12 +54,21 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
private RetryConnect() { }
}
/// <summary>Interval between periodic health-poll heartbeats sent to the snapshot store.</summary>
public static readonly TimeSpan HealthPollInterval = TimeSpan.FromSeconds(30);
private readonly IDriver _driver;
private readonly string _driverInstanceId;
private readonly string _clusterId;
private readonly IDriverHealthPublisher _healthPublisher;
private readonly TimeSpan _reconnectInterval;
private readonly ILoggingAdapter _log = Context.GetLogger();
private string? _currentConfigJson;
/// <summary>Timestamps of recent Faulted-state transitions; used to compute the 5-minute error count.</summary>
private readonly Queue<DateTime> _faultTimestamps = new();
private readonly object _faultLock = new();
/// <summary>Active subscription handle (null when not subscribed). Lifetime is one-per-actor —
/// re-subscribe across reconnects is the consumer's responsibility today (subscribe-once
/// semantics keep the actor simple; mux-driven re-subscribe is tracked as F8b/#113).</summary>
@@ -70,8 +86,22 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
/// <param name="driver">The driver instance to wrap.</param>
/// <param name="reconnectInterval">Optional interval for reconnection attempts; defaults to 10 seconds.</param>
/// <param name="startStubbed">If true, the actor starts in stub mode for testing or unavailable platforms.</param>
public static Props Props(IDriver driver, TimeSpan? reconnectInterval = null, bool startStubbed = false) =>
Akka.Actor.Props.Create(() => new DriverInstanceActor(driver, reconnectInterval ?? DefaultReconnectInterval, startStubbed));
/// <param name="healthPublisher">Optional health publisher; defaults to <see cref="NullDriverHealthPublisher"/> so tests and
/// stub paths don't need to provide one.</param>
/// <param name="clusterId">Optional cluster identifier forwarded in <see cref="DriverHealthChanged"/> messages;
/// defaults to an empty string when not provided (e.g. in unit tests).</param>
public static Props Props(
IDriver driver,
TimeSpan? reconnectInterval = null,
bool startStubbed = false,
IDriverHealthPublisher? healthPublisher = null,
string? clusterId = null) =>
Akka.Actor.Props.Create(() => new DriverInstanceActor(
driver,
reconnectInterval ?? DefaultReconnectInterval,
startStubbed,
healthPublisher ?? NullDriverHealthPublisher.Instance,
clusterId ?? string.Empty));
/// <summary>
/// Returns true when the driver should boot in DEV-STUB mode based on host platform and
@@ -101,10 +131,19 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
/// <param name="driver">The driver instance to wrap and manage.</param>
/// <param name="reconnectInterval">Interval between reconnection attempts.</param>
/// <param name="startStubbed">If true, start in stub mode for testing or unavailable platforms.</param>
public DriverInstanceActor(IDriver driver, TimeSpan reconnectInterval, bool startStubbed = false)
/// <param name="healthPublisher">Sink for health-change notifications; must not be null.</param>
/// <param name="clusterId">Cluster identifier forwarded in health snapshots.</param>
public DriverInstanceActor(
IDriver driver,
TimeSpan reconnectInterval,
bool startStubbed = false,
IDriverHealthPublisher? healthPublisher = null,
string? clusterId = null)
{
_driver = driver;
_driverInstanceId = driver.DriverInstanceId;
_clusterId = clusterId ?? string.Empty;
_healthPublisher = healthPublisher ?? NullDriverHealthPublisher.Instance;
_reconnectInterval = reconnectInterval;
OtOpcUaTelemetry.DriverInstanceLifecycle.Add(1,
new KeyValuePair<string, object?>("event", startStubbed ? "spawn_stub" : "spawn"),
@@ -121,6 +160,16 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
}
}
/// <inheritdoc />
protected override void PreStart()
{
// Warm up the snapshot store immediately so AdminUI sees current state as soon as the
// actor starts, before any state transition fires. Also start the periodic heartbeat so
// long-lived Healthy drivers keep their snapshot fresh for newly-joined SignalR clients.
PublishHealthSnapshot();
Timers.StartPeriodicTimer("health-poll", HealthPollTick.Instance, HealthPollInterval);
}
private void Stubbed()
{
// Stubbed drivers accept the standard message contracts but return deterministic
@@ -129,6 +178,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
Receive<ApplyDelta>(msg => Sender.Tell(new ApplyResult(true, "stubbed", msg.Correlation)));
Receive<WriteAttribute>(_ => Sender.Tell(new WriteAttributeResult(true, "stubbed")));
Receive<DisconnectObserved>(_ => { /* stubbed drivers don't disconnect */ });
Receive<HealthPollTick>(_ => PublishHealthSnapshot());
}
private void Connecting()
@@ -138,12 +188,16 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
{
_log.Info("DriverInstance {Id}: connected", _driverInstanceId);
Become(Connected);
PublishHealthSnapshot();
});
Receive<InitializeFailed>(msg =>
{
_log.Warning("DriverInstance {Id}: initialize failed: {Reason}", _driverInstanceId, msg.Reason);
RecordFault();
Become(Reconnecting);
PublishHealthSnapshot();
});
Receive<HealthPollTick>(_ => PublishHealthSnapshot());
}
private void Connected()
@@ -154,12 +208,15 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
_log.Warning("DriverInstance {Id}: disconnect observed ({Reason}); reconnecting",
_driverInstanceId, msg.Reason);
DetachSubscription();
RecordFault();
Become(Reconnecting);
PublishHealthSnapshot();
});
ReceiveAsync<WriteAttribute>(HandleWriteAsync);
ReceiveAsync<Subscribe>(HandleSubscribeAsync);
ReceiveAsync<Unsubscribe>(_ => UnsubscribeAsync());
Receive<DataChangeForward>(OnDataChangeForward);
Receive<HealthPollTick>(_ => PublishHealthSnapshot());
}
private void Reconnecting()
@@ -170,8 +227,10 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
Timers.Cancel("retry-connect");
_log.Info("DriverInstance {Id}: reconnected", _driverInstanceId);
Become(Connected);
PublishHealthSnapshot();
});
Receive<InitializeFailed>(_ => { /* keep retrying via timer */ });
Receive<HealthPollTick>(_ => PublishHealthSnapshot());
Timers.StartPeriodicTimer("retry-connect", RetryConnect.Instance, _reconnectInterval);
}
@@ -336,6 +395,48 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
private static bool IsGoodStatus(uint statusCode) => (statusCode >> 30) == 0;
/// <summary>
/// Records a transition into a Faulted / error state for the 5-minute sliding counter.
/// Thread-safe: called from actor message-handling (single-threaded) but guard is cheap.
/// </summary>
private void RecordFault()
{
lock (_faultLock)
{
_faultTimestamps.Enqueue(DateTime.UtcNow);
}
}
/// <summary>Returns how many fault transitions occurred in the last 5 minutes.</summary>
private int ErrorCount5Min()
{
var cutoff = DateTime.UtcNow.AddMinutes(-5);
lock (_faultLock)
{
while (_faultTimestamps.Count > 0 && _faultTimestamps.Peek() < cutoff)
_faultTimestamps.Dequeue();
return _faultTimestamps.Count;
}
}
/// <summary>
/// Polls <see cref="IDriver.GetHealth"/> and forwards the snapshot to the health publisher.
/// Called on every observable state change and by the periodic <see cref="HealthPollTick"/>
/// so the AdminUI snapshot store is warmed up for newly-joined SignalR clients.
/// </summary>
private void PublishHealthSnapshot()
{
try
{
var health = _driver.GetHealth();
_healthPublisher.Publish(_clusterId, _driverInstanceId, health, ErrorCount5Min());
}
catch (Exception ex)
{
_log.Warning(ex, "DriverInstance {Id}: GetHealth threw during health publish; skipping", _driverInstanceId);
}
}
/// <inheritdoc />
protected override void PostStop()
{