diff --git a/src/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions/IDriverHealthPublisher.cs b/src/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions/IDriverHealthPublisher.cs new file mode 100644 index 00000000..fa2516db --- /dev/null +++ b/src/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions/IDriverHealthPublisher.cs @@ -0,0 +1,39 @@ +namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +/// +/// Sink for driver-health state-change notifications. The runtime DI wires the +/// Akka-DistributedPubSub-backed implementation; tests and dev-stub paths use +/// to opt out without changing call sites. +/// +public interface IDriverHealthPublisher +{ + /// + /// Publishes a health snapshot for one driver instance. Implementations must be + /// non-blocking and tolerant of being called from any thread. + /// + void Publish( + string clusterId, + string driverInstanceId, + DriverHealth health, + int errorCount5Min); +} + +/// +/// Drop-in no-op for tests and dev-stub paths. Production wires the Akka-backed +/// implementation in the Runtime project. +/// +public sealed class NullDriverHealthPublisher : IDriverHealthPublisher +{ + /// Singleton instance. + public static readonly NullDriverHealthPublisher Instance = new(); + + private NullDriverHealthPublisher() { } + + /// + public void Publish( + string clusterId, + string driverInstanceId, + DriverHealth health, + int errorCount5Min) + { /* no-op */ } +} diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/AkkaDriverHealthPublisher.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/AkkaDriverHealthPublisher.cs new file mode 100644 index 00000000..f48fce0a --- /dev/null +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/AkkaDriverHealthPublisher.cs @@ -0,0 +1,37 @@ +using Akka.Actor; +using Akka.Cluster.Tools.PublishSubscribe; +using ZB.MOM.WW.OtOpcUa.Commons.Messages.Drivers; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers; + +/// +/// Forwards transitions to the cluster-wide +/// driver-health DistributedPubSub topic. Consumed by the AdminUI +/// DriverStatusSignalRBridge. +/// +public sealed class AkkaDriverHealthPublisher : IDriverHealthPublisher +{ + /// The DistributedPubSub topic name for driver-health snapshots. + public const string TopicName = "driver-health"; + + private readonly ActorSystem _system; + + /// Initializes a new instance of . + /// The Akka actor system used to resolve the DPS mediator. + public AkkaDriverHealthPublisher(ActorSystem system) => _system = system; + + /// + public void Publish(string clusterId, string driverInstanceId, DriverHealth health, int errorCount5Min) + { + var msg = new DriverHealthChanged( + clusterId, + driverInstanceId, + health.State.ToString(), + health.LastSuccessfulRead, + health.LastError, + errorCount5Min, + DateTime.UtcNow); + DistributedPubSub.Get(_system).Mediator.Tell(new Publish(TopicName, msg)); + } +} diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs index d80fcb46..71139a28 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverHostActor.cs @@ -45,6 +45,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers private readonly IReadOnlySet _localRoles; private readonly IActorRef? _dependencyMux; private readonly IActorRef? _opcUaPublishActor; + private readonly IDriverHealthPublisher _healthPublisher; private readonly ILoggingAdapter _log = Context.GetLogger(); private RevisionHash? _currentRevision; @@ -71,6 +72,8 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers /// Optional set of roles assigned to the local node. /// Optional actor reference for dependency multiplexing. /// Optional actor reference for OPC UA publishing. + /// Optional driver-health publisher; defaults to + /// so test harnesses and smoke fixtures don't need to wire it. public static Props Props( IDbContextFactory dbFactory, CommonsNodeId localNode, @@ -78,9 +81,10 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers IDriverFactory? driverFactory = null, IReadOnlySet? localRoles = null, IActorRef? dependencyMux = null, - IActorRef? opcUaPublishActor = null) => + IActorRef? opcUaPublishActor = null, + IDriverHealthPublisher? healthPublisher = null) => Akka.Actor.Props.Create(() => new DriverHostActor( - dbFactory, localNode, coordinator, driverFactory, localRoles, dependencyMux, opcUaPublishActor)); + dbFactory, localNode, coordinator, driverFactory, localRoles, dependencyMux, opcUaPublishActor, healthPublisher)); /// Initializes a new DriverHostActor with the specified dependencies. /// Database context factory for configuration database access. @@ -90,6 +94,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers /// Optional set of roles assigned to the local node. /// Optional actor reference for dependency multiplexing. /// Optional actor reference for OPC UA publishing. + /// Optional driver-health publisher; defaults to . public DriverHostActor( IDbContextFactory dbFactory, CommonsNodeId localNode, @@ -97,7 +102,8 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers IDriverFactory? driverFactory = null, IReadOnlySet? localRoles = null, IActorRef? dependencyMux = null, - IActorRef? opcUaPublishActor = null) + IActorRef? opcUaPublishActor = null, + IDriverHealthPublisher? healthPublisher = null) { _dbFactory = dbFactory; _localNode = localNode; @@ -106,6 +112,7 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers _localRoles = localRoles ?? new HashSet(StringComparer.Ordinal); _dependencyMux = dependencyMux; _opcUaPublishActor = opcUaPublishActor; + _healthPublisher = healthPublisher ?? NullDriverHealthPublisher.Instance; // Default behavior is Steady — PreStart may flip to Stale or replay an orphan apply. Become(Steady); @@ -357,17 +364,25 @@ public sealed class DriverHostActor : ReceiveActor, IWithTimers } IActorRef child; + var clusterId = _localNode.Value; if (stub) { child = Context.ActorOf( - DriverInstanceActor.Props(new StubbedDriver(spec.DriverInstanceId, spec.DriverType), - reconnectInterval: null, startStubbed: true), + DriverInstanceActor.Props( + new StubbedDriver(spec.DriverInstanceId, spec.DriverType), + reconnectInterval: null, + startStubbed: true, + healthPublisher: _healthPublisher, + clusterId: clusterId), ActorNameFor(spec.DriverInstanceId)); } else { child = Context.ActorOf( - DriverInstanceActor.Props(driver!), + DriverInstanceActor.Props( + driver!, + healthPublisher: _healthPublisher, + clusterId: clusterId), ActorNameFor(spec.DriverInstanceId)); child.Tell(new DriverInstanceActor.InitializeRequested(spec.DriverConfig)); } diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs index 4ccb26d3..4fd8de48 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs @@ -7,6 +7,13 @@ using ZB.MOM.WW.OtOpcUa.Core.Abstractions; namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers; +// private timer key type — file-scoped so the name stays unique per-file +file sealed class HealthPollTick +{ + public static readonly HealthPollTick Instance = new(); + private HealthPollTick() { } +} + /// /// Akka wrapper for a single instance. States: /// @@ -47,12 +54,21 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers private RetryConnect() { } } + /// Interval between periodic health-poll heartbeats sent to the snapshot store. + public static readonly TimeSpan HealthPollInterval = TimeSpan.FromSeconds(30); + private readonly IDriver _driver; private readonly string _driverInstanceId; + private readonly string _clusterId; + private readonly IDriverHealthPublisher _healthPublisher; private readonly TimeSpan _reconnectInterval; private readonly ILoggingAdapter _log = Context.GetLogger(); private string? _currentConfigJson; + /// Timestamps of recent Faulted-state transitions; used to compute the 5-minute error count. + private readonly Queue _faultTimestamps = new(); + private readonly object _faultLock = new(); + /// Active subscription handle (null when not subscribed). Lifetime is one-per-actor — /// re-subscribe across reconnects is the consumer's responsibility today (subscribe-once /// semantics keep the actor simple; mux-driven re-subscribe is tracked as F8b/#113). @@ -70,8 +86,22 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers /// The driver instance to wrap. /// Optional interval for reconnection attempts; defaults to 10 seconds. /// If true, the actor starts in stub mode for testing or unavailable platforms. - public static Props Props(IDriver driver, TimeSpan? reconnectInterval = null, bool startStubbed = false) => - Akka.Actor.Props.Create(() => new DriverInstanceActor(driver, reconnectInterval ?? DefaultReconnectInterval, startStubbed)); + /// Optional health publisher; defaults to so tests and + /// stub paths don't need to provide one. + /// Optional cluster identifier forwarded in messages; + /// defaults to an empty string when not provided (e.g. in unit tests). + public static Props Props( + IDriver driver, + TimeSpan? reconnectInterval = null, + bool startStubbed = false, + IDriverHealthPublisher? healthPublisher = null, + string? clusterId = null) => + Akka.Actor.Props.Create(() => new DriverInstanceActor( + driver, + reconnectInterval ?? DefaultReconnectInterval, + startStubbed, + healthPublisher ?? NullDriverHealthPublisher.Instance, + clusterId ?? string.Empty)); /// /// Returns true when the driver should boot in DEV-STUB mode based on host platform and @@ -101,10 +131,19 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers /// The driver instance to wrap and manage. /// Interval between reconnection attempts. /// If true, start in stub mode for testing or unavailable platforms. - public DriverInstanceActor(IDriver driver, TimeSpan reconnectInterval, bool startStubbed = false) + /// Sink for health-change notifications; must not be null. + /// Cluster identifier forwarded in health snapshots. + public DriverInstanceActor( + IDriver driver, + TimeSpan reconnectInterval, + bool startStubbed = false, + IDriverHealthPublisher? healthPublisher = null, + string? clusterId = null) { _driver = driver; _driverInstanceId = driver.DriverInstanceId; + _clusterId = clusterId ?? string.Empty; + _healthPublisher = healthPublisher ?? NullDriverHealthPublisher.Instance; _reconnectInterval = reconnectInterval; OtOpcUaTelemetry.DriverInstanceLifecycle.Add(1, new KeyValuePair("event", startStubbed ? "spawn_stub" : "spawn"), @@ -121,6 +160,16 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers } } + /// + protected override void PreStart() + { + // Warm up the snapshot store immediately so AdminUI sees current state as soon as the + // actor starts, before any state transition fires. Also start the periodic heartbeat so + // long-lived Healthy drivers keep their snapshot fresh for newly-joined SignalR clients. + PublishHealthSnapshot(); + Timers.StartPeriodicTimer("health-poll", HealthPollTick.Instance, HealthPollInterval); + } + private void Stubbed() { // Stubbed drivers accept the standard message contracts but return deterministic @@ -129,6 +178,7 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers Receive(msg => Sender.Tell(new ApplyResult(true, "stubbed", msg.Correlation))); Receive(_ => Sender.Tell(new WriteAttributeResult(true, "stubbed"))); Receive(_ => { /* stubbed drivers don't disconnect */ }); + Receive(_ => PublishHealthSnapshot()); } private void Connecting() @@ -138,12 +188,16 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers { _log.Info("DriverInstance {Id}: connected", _driverInstanceId); Become(Connected); + PublishHealthSnapshot(); }); Receive(msg => { _log.Warning("DriverInstance {Id}: initialize failed: {Reason}", _driverInstanceId, msg.Reason); + RecordFault(); Become(Reconnecting); + PublishHealthSnapshot(); }); + Receive(_ => PublishHealthSnapshot()); } private void Connected() @@ -154,12 +208,15 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers _log.Warning("DriverInstance {Id}: disconnect observed ({Reason}); reconnecting", _driverInstanceId, msg.Reason); DetachSubscription(); + RecordFault(); Become(Reconnecting); + PublishHealthSnapshot(); }); ReceiveAsync(HandleWriteAsync); ReceiveAsync(HandleSubscribeAsync); ReceiveAsync(_ => UnsubscribeAsync()); Receive(OnDataChangeForward); + Receive(_ => PublishHealthSnapshot()); } private void Reconnecting() @@ -170,8 +227,10 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers Timers.Cancel("retry-connect"); _log.Info("DriverInstance {Id}: reconnected", _driverInstanceId); Become(Connected); + PublishHealthSnapshot(); }); Receive(_ => { /* keep retrying via timer */ }); + Receive(_ => PublishHealthSnapshot()); Timers.StartPeriodicTimer("retry-connect", RetryConnect.Instance, _reconnectInterval); } @@ -336,6 +395,48 @@ public sealed class DriverInstanceActor : ReceiveActor, IWithTimers private static bool IsGoodStatus(uint statusCode) => (statusCode >> 30) == 0; + /// + /// Records a transition into a Faulted / error state for the 5-minute sliding counter. + /// Thread-safe: called from actor message-handling (single-threaded) but guard is cheap. + /// + private void RecordFault() + { + lock (_faultLock) + { + _faultTimestamps.Enqueue(DateTime.UtcNow); + } + } + + /// Returns how many fault transitions occurred in the last 5 minutes. + private int ErrorCount5Min() + { + var cutoff = DateTime.UtcNow.AddMinutes(-5); + lock (_faultLock) + { + while (_faultTimestamps.Count > 0 && _faultTimestamps.Peek() < cutoff) + _faultTimestamps.Dequeue(); + return _faultTimestamps.Count; + } + } + + /// + /// Polls and forwards the snapshot to the health publisher. + /// Called on every observable state change and by the periodic + /// so the AdminUI snapshot store is warmed up for newly-joined SignalR clients. + /// + private void PublishHealthSnapshot() + { + try + { + var health = _driver.GetHealth(); + _healthPublisher.Publish(_clusterId, _driverInstanceId, health, ErrorCount5Min()); + } + catch (Exception ex) + { + _log.Warning(ex, "DriverInstance {Id}: GetHealth threw during health publish; skipping", _driverInstanceId); + } + } + /// protected override void PostStop() { diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs index 83c8e3d1..ae03e824 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs @@ -42,6 +42,7 @@ public static class ServiceCollectionExtensions services.TryAddSingleton(NullDriverFactory.Instance); services.TryAddSingleton(NullOpcUaAddressSpaceSink.Instance); services.TryAddSingleton(NullServiceLevelPublisher.Instance); + services.TryAddSingleton(); return services; } @@ -87,6 +88,7 @@ public static class ServiceCollectionExtensions var addressSpaceSink = resolver.GetService() ?? NullOpcUaAddressSpaceSink.Instance; var serviceLevel = resolver.GetService() ?? NullServiceLevelPublisher.Instance; var loggerFactory = resolver.GetService() ?? NullLoggerFactory.Instance; + var healthPublisher = resolver.GetService() ?? NullDriverHealthPublisher.Instance; var dbHealth = system.ActorOf( DbHealthProbeActor.Props(dbFactory), @@ -116,7 +118,8 @@ public static class ServiceCollectionExtensions DriverHostActor.Props(dbFactory, roleInfo.LocalNode, coordinator: null, driverFactory: driverFactory, localRoles: roleInfo.LocalRoles, dependencyMux: mux, - opcUaPublishActor: publishActor), + opcUaPublishActor: publishActor, + healthPublisher: healthPublisher), DriverHostActorName); registry.Register(driverHost);