using Akka.Actor;
using Akka.Event;
using ZB.MOM.WW.OtOpcUa.Commons.Observability;
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;

namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers;

// private timer key type — file-scoped so the name stays unique per-file
file sealed class HealthPollTick
{
    public static readonly HealthPollTick Instance = new();

    private HealthPollTick()
    {
    }
}

/// <summary>
/// Akka wrapper for a single <see cref="IDriver"/> instance. States:
/// <list type="bullet">
/// <item><description>Connecting — calling <c>IDriver.InitializeAsync</c>.</description></item>
/// <item><description>Connected — initialised; serving Write/Subscribe/ApplyDelta requests.</description></item>
/// <item><description>Reconnecting — disconnect observed; periodic retry of Initialize.</description></item>
/// <item><description>Stubbed — DEV-STUB mode; requests get deterministic replies without touching the driver.</description></item>
/// </list>
/// </summary>
/// <remarks>
/// Engine wiring (subscriptions → AttributeValueUpdate publishes, ApplyDelta-driven Reinitialize,
/// per-tag write Asks) is staged for follow-up F7. This skeleton compiles + has a working
/// state machine so the Phase 6 control-plane integration tests can target it.
/// </remarks>
public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
{
    /// <summary>Reconnect retry interval used by <see cref="Props"/> when the caller supplies none.</summary>
    public static readonly TimeSpan DefaultReconnectInterval = TimeSpan.FromSeconds(10);

    /// <summary>Asks the actor to initialise the driver with the given configuration JSON.</summary>
    public sealed record InitializeRequested(string DriverConfigJson);

    /// <summary>Self-message posted when the background initialise call completed without throwing.</summary>
    public sealed record InitializeSucceeded;

    /// <summary>Self-message posted when the background initialise call threw; carries the exception message.</summary>
    public sealed record InitializeFailed(string Reason);

    /// <summary>Tells a Connected actor that the driver connection was lost.</summary>
    public sealed record DisconnectObserved(string Reason);

    /// <summary>Asks a Connected actor to reinitialise the driver with new configuration JSON.</summary>
    public sealed record ApplyDelta(string DriverConfigJson, CorrelationId Correlation);

    /// <summary>Reply to <see cref="ApplyDelta"/>; echoes the request's correlation id.</summary>
    public sealed record ApplyResult(bool Success, string? Reason, CorrelationId Correlation);

    /// <summary>Asks the actor to write a single value to one tag.</summary>
    public sealed record WriteAttribute(string TagId, object Value);

    /// <summary>Reply to <see cref="WriteAttribute"/>.</summary>
    public sealed record WriteAttributeResult(bool Success, string? Reason);

    /// <summary>Asks the actor to subscribe to data changes for the given references.</summary>
    public sealed record Subscribe(IReadOnlyList<string> FullReferences, TimeSpan PublishingInterval);

    /// <summary>Reply to <see cref="Subscribe"/> on success.</summary>
    public sealed record SubscriptionEstablished(string DiagnosticId, int ReferenceCount);

    /// <summary>Reply to <see cref="Subscribe"/> on failure.</summary>
    public sealed record SubscriptionFailed(string Reason);

    /// <summary>Asks the actor to drop its active subscription, if any. No reply is sent.</summary>
    public sealed record Unsubscribe;

    /// <summary>
    /// Sent when the AdminUI issues a Reconnect operation.
    /// Pushes the actor out of Connected into Reconnecting so the transport is
    /// re-established without fully stopping and respawning the actor. Safe to send in any
    /// state — a no-op when already Reconnecting or Connecting (and when Stubbed).
    /// </summary>
    public sealed record ForceReconnect;

    /// <summary>
    /// Published to the actor's parent whenever the subscribed driver raises its
    /// <c>OnDataChange</c> event. The parent forwards to OpcUaPublishActor.
    /// </summary>
    public sealed record AttributeValuePublished(string FullReference, object? Value, OpcUaQuality Quality, DateTime TimestampUtc);

    // Internal hop: the driver's event callback posts this to Self so the publish to the
    // parent happens inside the actor's own message loop.
    private sealed record DataChangeForward(string FullReference, DataValueSnapshot Snapshot);

    /// <summary>Timer message that triggers one reconnect attempt while in the Reconnecting state.</summary>
    public sealed class RetryConnect
    {
        public static readonly RetryConnect Instance = new();

        private RetryConnect()
        {
        }
    }

    /// <summary>Interval between periodic health-poll heartbeats sent to the snapshot store.</summary>
    public static readonly TimeSpan HealthPollInterval = TimeSpan.FromSeconds(30);

    // Timer keys — named so Start/Cancel call sites cannot drift apart.
    private const string HealthPollTimerKey = "health-poll";
    private const string RetryConnectTimerKey = "retry-connect";

    // Upper bounds for driver calls made from message handlers.
    private static readonly TimeSpan WriteTimeout = TimeSpan.FromSeconds(5);
    private static readonly TimeSpan SubscribeTimeout = TimeSpan.FromSeconds(10);
    private static readonly TimeSpan UnsubscribeTimeout = TimeSpan.FromSeconds(5);

    private readonly IDriver _driver;
    private readonly string _driverInstanceId;
    private readonly string _clusterId;
    private readonly IDriverHealthPublisher _healthPublisher;
    private readonly TimeSpan _reconnectInterval;
    private readonly ILoggingAdapter _log = Context.GetLogger();

    // Last configuration handed to InitializeAsync / accepted by ApplyDelta; reused by reconnect retries.
    private string? _currentConfigJson;

    /// <summary>Timestamps of recent Faulted-state transitions; used to compute the 5-minute error count.</summary>
    private readonly Queue<DateTime> _faultTimestamps = new();
    private readonly object _faultLock = new();

    /// <summary>
    /// Active subscription handle (null when not subscribed). Lifetime is one-per-actor —
    /// re-subscribe across reconnects is the consumer's responsibility today (subscribe-once
    /// semantics keep the actor simple; mux-driven re-subscribe is tracked as F8b/#113).
    /// </summary>
    private ISubscriptionHandle? _subscriptionHandle;

    // NOTE(review): the event-args type is inferred from the handler's use of
    // args.FullReference / args.Snapshot — confirm against ISubscribable.OnDataChange.
    private EventHandler<DataChangeEventArgs>? _dataChangeHandler;

    /// <summary>
    /// Gets or sets the timer scheduler used for the health-poll and reconnect-retry timers.
    /// </summary>
    public ITimerScheduler Timers { get; set; } = null!;

    /// <summary>
    /// Creates a <see cref="Akka.Actor.Props"/> object for instantiating a <see cref="DriverInstanceActor"/>.
    /// </summary>
    /// <param name="driver">The driver instance to wrap.</param>
    /// <param name="reconnectInterval">Optional interval for reconnection attempts; defaults to
    /// <see cref="DefaultReconnectInterval"/>.</param>
    /// <param name="startStubbed">If true, the actor starts in stub mode for testing or unavailable platforms.</param>
    /// <param name="healthPublisher">Optional health publisher; defaults to
    /// <c>NullDriverHealthPublisher.Instance</c> so tests and stub paths don't need to provide one.</param>
    /// <param name="clusterId">Optional cluster identifier passed to the health publisher;
    /// defaults to an empty string when not provided (e.g. in unit tests).</param>
    /// <returns>Props that construct the actor with the resolved defaults.</returns>
    public static Props Props(
        IDriver driver,
        TimeSpan? reconnectInterval = null,
        bool startStubbed = false,
        IDriverHealthPublisher? healthPublisher = null,
        string? clusterId = null) =>
        Akka.Actor.Props.Create(() => new DriverInstanceActor(
            driver,
            reconnectInterval ?? DefaultReconnectInterval,
            startStubbed,
            healthPublisher ?? NullDriverHealthPublisher.Instance,
            clusterId ?? string.Empty));

    /// <summary>
    /// Returns true when the driver should boot in DEV-STUB mode based on host platform and
    /// configured roles. Only the v1 in-process types stay Windows-only:
    /// <list type="bullet">
    /// <item><description>"Galaxy" — legacy MXAccess COM proxy (retired in PR 7.2; gated for any
    /// leftover DriverInstance rows that still reference the old type name).</description></item>
    /// <item><description>"Historian.Wonderware" — Wonderware Historian sidecar over Windows-only
    /// named pipes.</description></item>
    /// </list>
    /// The v2 "GalaxyMxGateway" driver talks gRPC to an external mxaccessgw process,
    /// so it runs on any platform .NET 10 supports — Linux containers included. Not stubbed.
    /// </summary>
    /// <param name="driverType">The type identifier of the driver.</param>
    /// <param name="roles">Operational roles configured for this instance.</param>
    /// <returns>True for a Windows-only driver type on a non-Windows host or under the "dev" role; otherwise false.</returns>
    public static bool ShouldStub(string driverType, IEnumerable<string> roles)
    {
        var isWindowsOnly = driverType is "Galaxy" or "Historian.Wonderware";

        if (!OperatingSystem.IsWindows() && isWindowsOnly)
        {
            return true;
        }

        if (roles.Contains("dev") && isWindowsOnly)
        {
            return true;
        }

        return false;
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="DriverInstanceActor"/> class, records a
    /// spawn telemetry event and enters either the Stubbed or the Connecting state.
    /// </summary>
    /// <param name="driver">The driver instance to wrap and manage.</param>
    /// <param name="reconnectInterval">Interval between reconnection attempts.</param>
    /// <param name="startStubbed">If true, start in stub mode for testing or unavailable platforms.</param>
    /// <param name="healthPublisher">Sink for health-change notifications; a null value falls back to
    /// <c>NullDriverHealthPublisher.Instance</c>.</param>
    /// <param name="clusterId">Cluster identifier forwarded in health snapshots; null becomes an empty string.</param>
    public DriverInstanceActor(
        IDriver driver,
        TimeSpan reconnectInterval,
        bool startStubbed = false,
        IDriverHealthPublisher? healthPublisher = null,
        string? clusterId = null)
    {
        _driver = driver;
        _driverInstanceId = driver.DriverInstanceId;
        _clusterId = clusterId ?? string.Empty;
        _healthPublisher = healthPublisher ?? NullDriverHealthPublisher.Instance;
        _reconnectInterval = reconnectInterval;

        OtOpcUaTelemetry.DriverInstanceLifecycle.Add(
            1,
            new KeyValuePair<string, object?>("event", startStubbed ? "spawn_stub" : "spawn"),
            new KeyValuePair<string, object?>("driver_type", driver.DriverType));

        if (startStubbed)
        {
            _log.Info("[DEV-STUB] driver={Name} type={Type}", _driverInstanceId, driver.DriverType);
            Become(Stubbed);
        }
        else
        {
            Become(Connecting);
        }
    }

    /// <inheritdoc />
    protected override void PreStart()
    {
        // Warm up the snapshot store immediately so AdminUI sees current state as soon as the
        // actor starts, before any state transition fires. Also start the periodic heartbeat so
        // long-lived Healthy drivers keep their snapshot fresh for newly-joined SignalR clients.
        PublishHealthSnapshot();
        Timers.StartPeriodicTimer(HealthPollTimerKey, HealthPollTick.Instance, HealthPollInterval);
    }

    /// <summary>Behaviour for DEV-STUB mode: requests are acknowledged without calling the driver.</summary>
    private void Stubbed()
    {
        // Stubbed drivers accept the standard message contracts but return deterministic
        // success without touching real hardware.
        Receive<InitializeRequested>(_ => { /* no-op */ });
        Receive<ApplyDelta>(msg => Sender.Tell(new ApplyResult(true, "stubbed", msg.Correlation)));
        Receive<WriteAttribute>(_ => Sender.Tell(new WriteAttributeResult(true, "stubbed")));
        Receive<DisconnectObserved>(_ => { /* stubbed drivers don't disconnect */ });
        Receive<ForceReconnect>(_ => { /* stubbed drivers don't reconnect */ });
        Receive<HealthPollTick>(_ => PublishHealthSnapshot());
    }

    /// <summary>Behaviour while the first initialise call is pending or not yet requested.</summary>
    private void Connecting()
    {
        Receive<InitializeRequested>(msg => InitializeAsync(msg.DriverConfigJson));
        Receive<InitializeSucceeded>(_ =>
        {
            _log.Info("DriverInstance {Id}: connected", _driverInstanceId);
            Become(Connected);
            PublishHealthSnapshot();
        });
        Receive<InitializeFailed>(msg =>
        {
            _log.Warning("DriverInstance {Id}: initialize failed: {Reason}", _driverInstanceId, msg.Reason);
            RecordFault();
            Become(Reconnecting);
            PublishHealthSnapshot();
        });
        Receive<ForceReconnect>(_ => { /* already connecting — no-op */ });
        Receive<HealthPollTick>(_ => PublishHealthSnapshot());
    }

    /// <summary>Behaviour while the driver is initialised and serving requests.</summary>
    private void Connected()
    {
        ReceiveAsync<ApplyDelta>(HandleApplyDeltaAsync);
        Receive<DisconnectObserved>(msg =>
        {
            _log.Warning("DriverInstance {Id}: disconnect observed ({Reason}); reconnecting", _driverInstanceId, msg.Reason);
            DetachSubscription();
            RecordFault();
            Become(Reconnecting);
            PublishHealthSnapshot();
        });
        Receive<ForceReconnect>(_ =>
        {
            // Admin-initiated: same transition as a disconnect, but not counted as a fault.
            _log.Info("DriverInstance {Id}: ForceReconnect requested by admin; re-entering Reconnecting", _driverInstanceId);
            DetachSubscription();
            Become(Reconnecting);
            PublishHealthSnapshot();
        });
        ReceiveAsync<WriteAttribute>(HandleWriteAsync);
        ReceiveAsync<Subscribe>(HandleSubscribeAsync);
        ReceiveAsync<Unsubscribe>(_ => UnsubscribeAsync());
        Receive<DataChangeForward>(OnDataChangeForward);
        Receive<HealthPollTick>(_ => PublishHealthSnapshot());
    }

    /// <summary>
    /// Behaviour after a failed initialise or a lost connection: a periodic timer re-runs
    /// initialise with the last known configuration until one attempt succeeds.
    /// </summary>
    private void Reconnecting()
    {
        Receive<RetryConnect>(_ => InitializeAsync(_currentConfigJson ?? "{}"));
        Receive<InitializeSucceeded>(_ =>
        {
            Timers.Cancel(RetryConnectTimerKey);
            _log.Info("DriverInstance {Id}: reconnected", _driverInstanceId);
            Become(Connected);
            PublishHealthSnapshot();
        });
        Receive<InitializeFailed>(_ => { /* keep retrying via timer */ });
        Receive<ForceReconnect>(_ => { /* already reconnecting — no-op */ });
        Receive<HealthPollTick>(_ => PublishHealthSnapshot());

        Timers.StartPeriodicTimer(RetryConnectTimerKey, RetryConnect.Instance, _reconnectInterval);
    }

    /// <summary>
    /// Remembers the configuration and runs the driver's initialise call on a background task,
    /// reporting the outcome back to this actor as <see cref="InitializeSucceeded"/> or
    /// <see cref="InitializeFailed"/>.
    /// </summary>
    /// <param name="driverConfigJson">Configuration JSON passed to the driver.</param>
    private void InitializeAsync(string driverConfigJson)
    {
        _currentConfigJson = driverConfigJson;

        // Capture Self now; the task body runs outside the actor's message-handling context.
        var self = Self;
        _ = Task.Run(async () =>
        {
            try
            {
                await _driver.InitializeAsync(driverConfigJson, CancellationToken.None);
                self.Tell(new InitializeSucceeded());
            }
            catch (Exception ex)
            {
                self.Tell(new InitializeFailed(ex.Message));
            }
        });
    }

    /// <summary>
    /// Reinitialises the driver with the supplied configuration and replies with an
    /// <see cref="ApplyResult"/>. The stored configuration is only replaced on success.
    /// </summary>
    private async Task HandleApplyDeltaAsync(ApplyDelta msg)
    {
        var replyTo = Sender;
        try
        {
            await _driver.ReinitializeAsync(msg.DriverConfigJson, CancellationToken.None);
            _currentConfigJson = msg.DriverConfigJson;
            replyTo.Tell(new ApplyResult(true, null, msg.Correlation));
        }
        catch (Exception ex)
        {
            replyTo.Tell(new ApplyResult(false, ex.Message, msg.Correlation));
        }
    }

    /// <summary>
    /// Writes one value through the driver and replies with a <see cref="WriteAttributeResult"/>.
    /// Failure reasons: driver not writable, non-Good status code, timeout, or the exception message.
    /// </summary>
    private async Task HandleWriteAsync(WriteAttribute msg)
    {
        // Capture before any await — Sender is only valid inside the actor's message context.
        var replyTo = Sender;

        if (_driver is not IWritable writable)
        {
            replyTo.Tell(new WriteAttributeResult(false, "Driver does not implement IWritable"));
            return;
        }

        var request = new[] { new WriteRequest(msg.TagId, msg.Value) };

        // Bound the write so a hung backend can't pin this actor forever — decision #44/#45 keeps
        // retry off by default, but a stalled call still needs an answer.
        using var cts = new CancellationTokenSource(WriteTimeout);
        try
        {
            var results = await writable.WriteAsync(request, cts.Token).ConfigureAwait(false);
            if (results is { Count: 1 } && IsGoodStatus(results[0].StatusCode))
            {
                replyTo.Tell(new WriteAttributeResult(true, null));
                return;
            }

            // No result at all is reported as 0xFFFFFFFF so the caller still gets a code.
            var status = results is { Count: > 0 } ? results[0].StatusCode : 0xFFFFFFFF;
            replyTo.Tell(new WriteAttributeResult(false, $"StatusCode=0x{status:X8}"));
        }
        catch (OperationCanceledException)
        {
            replyTo.Tell(new WriteAttributeResult(false, "write timeout"));
        }
        catch (Exception ex)
        {
            replyTo.Tell(new WriteAttributeResult(false, ex.Message));
        }
    }

    /// <summary>
    /// Establishes a driver subscription (replacing any existing one) and replies with
    /// <see cref="SubscriptionEstablished"/> or <see cref="SubscriptionFailed"/>.
    /// </summary>
    private async Task HandleSubscribeAsync(Subscribe msg)
    {
        // Capture BEFORE the first await. The subscribe-twice path below awaits with
        // ConfigureAwait(false), so the continuation may resume outside the actor context,
        // where Sender and Self can no longer be read.
        var replyTo = Sender;
        var self = Self;

        if (_driver is not ISubscribable subscribable)
        {
            replyTo.Tell(new SubscriptionFailed("Driver does not implement ISubscribable"));
            return;
        }

        if (_subscriptionHandle is not null)
        {
            // Subscribe-twice — drop the prior subscription before establishing the new one.
            await UnsubscribeAsync().ConfigureAwait(false);
        }

        try
        {
            // The driver raises the event on its own thread; hop back into the mailbox.
            _dataChangeHandler = (_, args) => self.Tell(new DataChangeForward(args.FullReference, args.Snapshot));
            subscribable.OnDataChange += _dataChangeHandler;

            using var cts = new CancellationTokenSource(SubscribeTimeout);
            _subscriptionHandle = await subscribable
                .SubscribeAsync(msg.FullReferences, msg.PublishingInterval, cts.Token)
                .ConfigureAwait(false);

            replyTo.Tell(new SubscriptionEstablished(_subscriptionHandle.DiagnosticId, msg.FullReferences.Count));
            _log.Info(
                "DriverInstance {Id}: subscribed to {Count} refs ({Diag})",
                _driverInstanceId,
                msg.FullReferences.Count,
                _subscriptionHandle.DiagnosticId);
        }
        catch (Exception ex)
        {
            // Roll back the handler attached above so a failed subscribe leaves no listener behind.
            DetachSubscription();
            _log.Warning(ex, "DriverInstance {Id}: subscribe failed", _driverInstanceId);
            replyTo.Tell(new SubscriptionFailed(ex.Message));
        }
    }

    /// <summary>
    /// Cancels the active driver subscription, if any. Driver errors are logged and swallowed;
    /// local subscription state is always cleared.
    /// </summary>
    private async Task UnsubscribeAsync()
    {
        if (_driver is not ISubscribable subscribable || _subscriptionHandle is null)
        {
            DetachSubscription();
            return;
        }

        try
        {
            using var cts = new CancellationTokenSource(UnsubscribeTimeout);
            await subscribable.UnsubscribeAsync(_subscriptionHandle, cts.Token).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            _log.Warning(ex, "DriverInstance {Id}: unsubscribe threw (continuing)", _driverInstanceId);
        }
        finally
        {
            DetachSubscription();
        }
    }

    /// <summary>
    /// Tear down the event handler + null the handle. Called from Unsubscribe path, on
    /// PostStop, and on Connected → Reconnecting transitions so a stale handler doesn't push
    /// data-change events to an actor that has lost its driver connection.
    /// </summary>
    private void DetachSubscription()
    {
        if (_driver is ISubscribable subscribable && _dataChangeHandler is not null)
        {
            subscribable.OnDataChange -= _dataChangeHandler;
        }

        _dataChangeHandler = null;
        _subscriptionHandle = null;
    }

    /// <summary>
    /// Converts a forwarded data change into an <see cref="AttributeValuePublished"/> and sends
    /// it to the parent. The source timestamp is preferred; the server timestamp is the fallback.
    /// </summary>
    private void OnDataChangeForward(DataChangeForward msg)
    {
        var quality = QualityFromStatus(msg.Snapshot.StatusCode);
        var ts = msg.Snapshot.SourceTimestampUtc ?? msg.Snapshot.ServerTimestampUtc;
        Context.Parent.Tell(new AttributeValuePublished(msg.FullReference, msg.Snapshot.Value, quality, ts));
    }

    /// <summary>
    /// Translate an OPC UA status code to the 3-state <see cref="OpcUaQuality"/> projection
    /// the publish actor consumes. Severity bits (top 2): 00 = Good, 01 = Uncertain, 10/11 = Bad.
    /// </summary>
    private static OpcUaQuality QualityFromStatus(uint statusCode)
    {
        var severity = statusCode >> 30;
        return severity switch
        {
            0 => OpcUaQuality.Good,
            1 => OpcUaQuality.Uncertain,
            _ => OpcUaQuality.Bad,
        };
    }

    /// <summary>Returns true when the top two severity bits of the status code are both zero.</summary>
    private static bool IsGoodStatus(uint statusCode) => (statusCode >> 30) == 0;

    /// <summary>
    /// Records a transition into a Faulted / error state for the 5-minute sliding counter.
    /// Thread-safe: called from actor message-handling (single-threaded) but guard is cheap.
    /// </summary>
    private void RecordFault()
    {
        lock (_faultLock)
        {
            _faultTimestamps.Enqueue(DateTime.UtcNow);
        }
    }

    /// <summary>Returns how many fault transitions occurred in the last 5 minutes.</summary>
    private int ErrorCount5Min()
    {
        var cutoff = DateTime.UtcNow.AddMinutes(-5);
        lock (_faultLock)
        {
            // Timestamps are enqueued in order, so expired entries are always at the head.
            while (_faultTimestamps.Count > 0 && _faultTimestamps.Peek() < cutoff)
            {
                _faultTimestamps.Dequeue();
            }

            return _faultTimestamps.Count;
        }
    }

    /// <summary>
    /// Polls the driver's <c>GetHealth</c> and forwards the snapshot to the health publisher.
    /// Called on every observable state change and by the periodic health-poll timer
    /// so the AdminUI snapshot store is warmed up for newly-joined SignalR clients.
    /// Exceptions are logged and swallowed so a failing health probe never crashes the actor.
    /// </summary>
    private void PublishHealthSnapshot()
    {
        try
        {
            var health = _driver.GetHealth();
            _healthPublisher.Publish(_clusterId, _driverInstanceId, health, ErrorCount5Min());
        }
        catch (Exception ex)
        {
            _log.Warning(ex, "DriverInstance {Id}: GetHealth threw during health publish; skipping", _driverInstanceId);
        }
    }

    /// <inheritdoc />
    protected override void PostStop()
    {
        DetachSubscription();
        try
        {
            // PostStop is synchronous, so the shutdown call is awaited by blocking here.
            _driver.ShutdownAsync(CancellationToken.None).GetAwaiter().GetResult();
        }
        catch (Exception ex)
        {
            _log.Warning(ex, "DriverInstance {Id}: ShutdownAsync threw on PostStop", _driverInstanceId);
        }

        OtOpcUaTelemetry.DriverInstanceLifecycle.Add(
            1,
            new KeyValuePair<string, object?>("event", "stop"),
            new KeyValuePair<string, object?>("driver_type", _driver.DriverType));
    }
}