using Akka.Actor;
using Akka.Event;
using ZB.MOM.WW.OtOpcUa.Commons.Observability;
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;

namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers;

// Private timer message type — file-scoped so the name stays unique per-file.
file sealed class HealthPollTick
{
    public static readonly HealthPollTick Instance = new();

    private HealthPollTick() { }
}

/// <summary>
/// Akka wrapper for a single <c>IDriver</c> instance. States:
/// <list type="bullet">
///   <item>Connecting — calling <c>IDriver.InitializeAsync</c>.</item>
///   <item>Connected — initialised; serving Read/Write/Subscribe requests.</item>
///   <item>Reconnecting — disconnect observed; periodic retry of Initialize.</item>
///   <item>Failed — terminal until parent restarts the actor.</item>
/// </list>
/// </summary>
/// <remarks>
/// Engine wiring (subscriptions → AttributeValueUpdate publishes, ApplyDelta-driven Reinitialize,
/// per-tag write Asks) is staged for follow-up F7. This skeleton compiles + has a working
/// state machine so the Phase 6 control-plane integration tests can target it.
/// </remarks>
public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
{
    /// <summary>Default interval between reconnect attempts while in Reconnecting.</summary>
    public static readonly TimeSpan DefaultReconnectInterval = TimeSpan.FromSeconds(10);

    /// <summary>Interval between periodic health-poll heartbeats sent to the snapshot store.</summary>
    public static readonly TimeSpan HealthPollInterval = TimeSpan.FromSeconds(30);

    // Timer keys — named once so Start/Cancel sites cannot drift apart.
    private const string HealthPollTimerKey = "health-poll";
    private const string RetryConnectTimerKey = "retry-connect";

    // ---------------------------------------------------------------------------------------------
    // Message contracts
    // ---------------------------------------------------------------------------------------------

    public sealed record InitializeRequested(string DriverConfigJson);

    public sealed record InitializeSucceeded(int Generation);

    public sealed record InitializeFailed(string Reason, int Generation);

    public sealed record DisconnectObserved(string Reason);

    public sealed record ApplyDelta(string DriverConfigJson, CorrelationId Correlation);

    public sealed record ApplyResult(bool Success, string? Reason, CorrelationId Correlation);

    public sealed record WriteAttribute(string TagId, object Value);

    public sealed record WriteAttributeResult(bool Success, string? Reason);

    public sealed record Subscribe(IReadOnlyList<string> FullReferences, TimeSpan PublishingInterval);

    /// <summary>
    /// Sets the set of references this driver should keep subscribed for the lifetime of the
    /// current deployment. Unlike the one-shot <see cref="Subscribe"/>, the desired set is
    /// retained and (re)established automatically every time the actor (re)enters
    /// Connected — closing the F8b/#113 "re-subscribe across reconnects" gap and giving
    /// the host a single message to drive the SubscribeBulk pass after an
    /// apply. Sending an empty set clears the desired subscription.
    /// </summary>
    /// <param name="FullReferences">Value references to keep subscribed.</param>
    /// <param name="PublishingInterval">Publishing interval passed to the driver's subscribe call.</param>
    /// <param name="AlarmReferences">
    /// The native-alarm references (alarm-bearing equipment-tag FullNames = the driver's
    /// ConditionId/AlarmFullReference) this driver should keep an alarm subscription open for.
    /// An <c>IAlarmSource</c> driver suppresses <c>OnAlarmEvent</c> until at
    /// least one alarm subscription exists, so the actor calls <c>SubscribeAlarmsAsync</c>
    /// with this set to un-gate the native feed. Empty
    /// (or null) means the driver has no alarm tags. Defaults to null so non-alarm callers are unchanged.
    /// </param>
    public sealed record SetDesiredSubscriptions(
        IReadOnlyList<string> FullReferences,
        TimeSpan PublishingInterval,
        IReadOnlyList<string>? AlarmReferences = null);

    public sealed record SubscriptionEstablished(string DiagnosticId, int ReferenceCount);

    public sealed record SubscriptionFailed(string Reason);

    public sealed record Unsubscribe;

    /// <summary>
    /// Sent when the AdminUI issues a Reconnect operation.
    /// Pushes the actor out of Connected into Reconnecting so the transport is
    /// re-established without fully stopping and respawning the actor. Safe to send in any
    /// state — a no-op when already Reconnecting or Connecting.
    /// </summary>
    public sealed record ForceReconnect;

    /// <summary>Published to the actor's parent whenever the subscribed driver fires
    /// <c>OnDataChange</c>. The parent forwards to OpcUaPublishActor.</summary>
    public sealed record AttributeValuePublished(
        string DriverInstanceId,
        string FullReference,
        object? Value,
        OpcUaQuality Quality,
        DateTime TimestampUtc);

    private sealed record DataChangeForward(string FullReference, DataValueSnapshot Snapshot);

    /// <summary>Published to the parent whenever the subscribed driver (an <c>IAlarmSource</c>) fires
    /// <c>OnAlarmEvent</c>. The parent projects + routes it
    /// to the materialised Part 9 condition. Parallels <see cref="AttributeValuePublished"/>.</summary>
    public sealed record AttributeAlarmPublished(string DriverInstanceId, AlarmEventArgs Args);

    private sealed record NativeAlarmRaised(AlarmEventArgs Args);

    /// <summary>Self-sent on Connected entry / when alarm refs are (re)pushed, to establish the native-alarm
    /// subscription that un-gates an <c>IAlarmSource</c> driver's feed. Handled async so the
    /// driver call is bounded + off the synchronous handlers.</summary>
    private sealed record SubscribeAlarms;

    /// <summary>Timer message that triggers one reconnect attempt while in Reconnecting.</summary>
    public sealed class RetryConnect
    {
        public static readonly RetryConnect Instance = new();

        private RetryConnect() { }
    }

    // ---------------------------------------------------------------------------------------------
    // State
    // ---------------------------------------------------------------------------------------------

    private readonly IDriver _driver;
    private readonly string _driverInstanceId;
    private readonly string _clusterId;
    private readonly IDriverHealthPublisher _healthPublisher;
    private readonly TimeSpan _reconnectInterval;
    private readonly ILoggingAdapter _log = Context.GetLogger();

    private string? _currentConfigJson;

    /// <summary>Monotonic token tagging each <see cref="InitializeAsync"/> attempt. An init result is
    /// honoured only when its generation matches the latest; an older result is from a superseded attempt
    /// (e.g. an <see cref="ApplyDelta"/> adopted a new config mid-(re)connect) and is dropped. Touched only
    /// on the actor thread, so no lock is needed.</summary>
    private int _initGeneration;

    /// <summary>
    /// Timestamps of recent Faulted-state transitions; used to compute the 5-minute error count.
    /// No lock needed — every read/write site runs inside an Akka message handler, which is
    /// single-threaded per actor instance.
    /// </summary>
    private readonly Queue<DateTime> _faultTimestamps = new();

    /// <summary>Active subscription handle (null when not subscribed). Tracks the current live
    /// subscription; the actor auto-(re)subscribes on (re)connect and on each
    /// <see cref="SetDesiredSubscriptions"/> message via <see cref="ResubscribeDesired"/>, so callers
    /// do not need to re-send subscription requests after a reconnect.</summary>
    private ISubscriptionHandle? _subscriptionHandle;

    private EventHandler<DataChangeEventArgs>? _dataChangeHandler;
    private EventHandler<AlarmEventArgs>? _alarmEventHandler;

    /// <summary>The references the host wants kept subscribed (set by <see cref="SetDesiredSubscriptions"/>).
    /// Re-applied on every entry into Connected so values resume after a reconnect or redeploy.</summary>
    private IReadOnlyList<string> _desiredRefs = Array.Empty<string>();

    private TimeSpan _desiredInterval = TimeSpan.FromSeconds(1);

    /// <summary>The native-alarm references the host wants kept subscribed (set by
    /// <see cref="SetDesiredSubscriptions"/>). Re-applied on every Connected entry so the alarm
    /// feed is re-un-gated after a reconnect/redeploy.</summary>
    private IReadOnlyList<string> _desiredAlarmRefs = Array.Empty<string>();

    /// <summary>The active native-alarm subscription handle for an <c>IAlarmSource</c> driver, or
    /// null when none is established. Reset by <see cref="DetachAlarmSource"/> so the next Connected entry
    /// re-subscribes against the freshly re-initialised driver; the null check makes the subscribe
    /// idempotent across repeated pushes.</summary>
    private IAlarmSubscriptionHandle? _alarmSubscriptionHandle;

    /// <summary>Fingerprint of the last successful <see cref="PublishHealthSnapshot"/> call; null until first publish.</summary>
    private (DriverState State, DateTime? LastSuccess, string? LastError, int ErrorCount)? _lastPublishedFingerprint;

    /// <summary>
    /// Gets or sets the timer scheduler used for the reconnect retry and health-poll timers.
    /// </summary>
    public ITimerScheduler Timers { get; set; } = null!;

    // ---------------------------------------------------------------------------------------------
    // Construction
    // ---------------------------------------------------------------------------------------------

    /// <summary>
    /// Creates a Props object for instantiating a <see cref="DriverInstanceActor"/>.
    /// </summary>
    /// <param name="driver">The driver instance to wrap.</param>
    /// <param name="reconnectInterval">Optional interval for reconnection attempts; defaults to 10 seconds.</param>
    /// <param name="startStubbed">If true, the actor starts in stub mode for testing or unavailable platforms.</param>
    /// <param name="healthPublisher">Optional health publisher; defaults to <c>NullDriverHealthPublisher</c> so tests and
    /// stub paths don't need to provide one.</param>
    /// <param name="clusterId">Optional cluster identifier forwarded to the health publisher;
    /// defaults to an empty string when not provided (e.g. in unit tests).</param>
    public static Props Props(
        IDriver driver,
        TimeSpan? reconnectInterval = null,
        bool startStubbed = false,
        IDriverHealthPublisher? healthPublisher = null,
        string? clusterId = null) =>
        Akka.Actor.Props.Create(() => new DriverInstanceActor(
            driver,
            reconnectInterval ?? DefaultReconnectInterval,
            startStubbed,
            healthPublisher ?? NullDriverHealthPublisher.Instance,
            clusterId ?? string.Empty));

    /// <summary>
    /// Returns true when the driver should boot in DEV-STUB mode based on host platform and
    /// configured roles. Only the v1 in-process types stay Windows-only:
    /// <list type="bullet">
    ///   <item>"Galaxy" — legacy MXAccess COM proxy (retired in PR 7.2; gated for any
    ///   leftover DriverInstance rows that still reference the old type name).</item>
    ///   <item>"Historian.Wonderware" — Wonderware Historian sidecar over Windows-only
    ///   named pipes.</item>
    /// </list>
    /// The v2 "GalaxyMxGateway" driver talks gRPC to an external mxaccessgw process,
    /// so it runs on any platform .NET 10 supports — Linux containers included. Not stubbed.
    /// </summary>
    /// <param name="driverType">The type identifier of the driver.</param>
    /// <param name="roles">Operational roles configured for this instance.</param>
    /// <returns>True for a Windows-only driver type on a non-Windows host or under the "dev" role.</returns>
    public static bool ShouldStub(string driverType, IEnumerable<string> roles)
    {
        // Only Windows-only driver types are ever stubbed, so everything else short-circuits
        // without enumerating the roles.
        if (driverType is not ("Galaxy" or "Historian.Wonderware"))
        {
            return false;
        }

        return !OperatingSystem.IsWindows() || roles.Contains("dev");
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="DriverInstanceActor"/> class.
    /// </summary>
    /// <param name="driver">The driver instance to wrap and manage.</param>
    /// <param name="reconnectInterval">Interval between reconnection attempts.</param>
    /// <param name="startStubbed">If true, start in stub mode for testing or unavailable platforms.</param>
    /// <param name="healthPublisher">Sink for health-change notifications; null falls back to <c>NullDriverHealthPublisher</c>.</param>
    /// <param name="clusterId">Cluster identifier forwarded in health snapshots; null becomes an empty string.</param>
    /// <exception cref="ArgumentNullException"><paramref name="driver"/> is null.</exception>
    public DriverInstanceActor(
        IDriver driver,
        TimeSpan reconnectInterval,
        bool startStubbed = false,
        IDriverHealthPublisher? healthPublisher = null,
        string? clusterId = null)
    {
        ArgumentNullException.ThrowIfNull(driver);

        _driver = driver;
        _driverInstanceId = driver.DriverInstanceId;
        _clusterId = clusterId ?? string.Empty;
        _healthPublisher = healthPublisher ?? NullDriverHealthPublisher.Instance;
        _reconnectInterval = reconnectInterval;

        OtOpcUaTelemetry.DriverInstanceLifecycle.Add(
            1,
            new KeyValuePair<string, object?>("event", startStubbed ? "spawn_stub" : "spawn"),
            new KeyValuePair<string, object?>("driver_type", driver.DriverType));

        if (startStubbed)
        {
            _log.Info("[DEV-STUB] driver={Name} type={Type}", _driverInstanceId, driver.DriverType);
            Become(Stubbed);
        }
        else
        {
            Become(Connecting);
        }
    }

    /// <inheritdoc />
    protected override void PreStart()
    {
        // Warm up the snapshot store immediately so AdminUI sees current state as soon as the
        // actor starts, before any state transition fires. Also start the periodic heartbeat so
        // long-lived Healthy drivers keep their snapshot fresh for newly-joined SignalR clients.
        PublishHealthSnapshot();
        Timers.StartPeriodicTimer(HealthPollTimerKey, HealthPollTick.Instance, HealthPollInterval);
    }

    // ---------------------------------------------------------------------------------------------
    // Behaviours
    // ---------------------------------------------------------------------------------------------

    /// <summary>Behaviour for DEV-STUB drivers: accepts the standard message contracts but returns
    /// deterministic success without touching the wrapped driver (other than health polling).</summary>
    private void Stubbed()
    {
        Receive<InitializeRequested>(_ => { /* no-op */ });
        Receive<ApplyDelta>(msg => Sender.Tell(new ApplyResult(true, "stubbed", msg.Correlation)));
        Receive<WriteAttribute>(_ => Sender.Tell(new WriteAttributeResult(true, "stubbed")));
        Receive<DisconnectObserved>(_ => { /* stubbed drivers don't disconnect */ });
        Receive<ForceReconnect>(_ => { /* stubbed drivers don't reconnect */ });
        Receive<SetDesiredSubscriptions>(StoreDesiredSubscriptions);
        Receive<HealthPollTick>(_ => PublishHealthSnapshot());
    }

    /// <summary>Initial behaviour: waits for <see cref="InitializeRequested"/>, runs the first
    /// initialise, and moves to Connected on success or Reconnecting on failure.</summary>
    private void Connecting()
    {
        Receive<InitializeRequested>(msg => InitializeAsync(msg.DriverConfigJson));

        // Fast-fail writes while still connecting — without this the inbound WriteAttribute dead-letters
        // and DriverHostActor.HandleRouteNodeWrite waits its full 8s Ask before reporting a generic
        // "write timeout". Synchronous Receive: Sender.Tell on the actor thread is safe (#4a-instance).
        Receive<WriteAttribute>(_ => Sender.Tell(new WriteAttributeResult(false, "driver not connected")));

        Receive<ApplyDelta>(AdoptConfigDuringInit);

        Receive<InitializeSucceeded>(msg =>
        {
            // Result of a superseded attempt — ignore.
            if (msg.Generation != _initGeneration)
            {
                return;
            }

            _log.Info("DriverInstance {Id}: connected", _driverInstanceId);
            EnterConnected();
        });

        Receive<InitializeFailed>(msg =>
        {
            // Result of a superseded attempt — ignore.
            if (msg.Generation != _initGeneration)
            {
                return;
            }

            _log.Warning("DriverInstance {Id}: initialize failed: {Reason}", _driverInstanceId, msg.Reason);
            RecordFault();
            Become(Reconnecting);
            PublishHealthSnapshot();
        });

        Receive<SetDesiredSubscriptions>(StoreDesiredSubscriptions);
        Receive<ForceReconnect>(_ => { /* already connecting — no-op */ });

        ReceiveSubscriptionReplies();

        // A native alarm transition can race in while still (re)connecting (the driver's feed runs on its
        // own thread); drop it — the feed re-delivers active alarms once Connected. Trace only.
        Receive<NativeAlarmRaised>(_ =>
            _log.Debug("DriverInstance {Id}: native alarm arrived during connect — dropped (feed re-delivers)", _driverInstanceId));

        // A SubscribeAlarms self-tell (from Connected) can be overtaken by an already-queued disconnect into
        // this state; swallow it so it doesn't dead-letter — the next Connected entry re-subscribes.
        Receive<SubscribeAlarms>(_ => { });

        Receive<HealthPollTick>(_ => PublishHealthSnapshot());
    }

    /// <summary>Steady-state behaviour: serves writes, subscriptions and config deltas, forwards
    /// data-change / alarm events to the parent, and drops to Reconnecting on disconnect.</summary>
    private void Connected()
    {
        ReceiveAsync<ApplyDelta>(HandleApplyDeltaAsync);

        Receive<DisconnectObserved>(msg =>
        {
            _log.Warning("DriverInstance {Id}: disconnect observed ({Reason}); reconnecting", _driverInstanceId, msg.Reason);
            DetachSubscription();
            RecordFault();
            Become(Reconnecting);
            PublishHealthSnapshot();
        });

        Receive<ForceReconnect>(_ =>
        {
            // Operator-requested, so deliberately not counted as a fault.
            _log.Info("DriverInstance {Id}: ForceReconnect requested by admin; re-entering Reconnecting", _driverInstanceId);
            DetachSubscription();
            Become(Reconnecting);
            PublishHealthSnapshot();
        });

        ReceiveAsync<WriteAttribute>(HandleWriteAsync);
        ReceiveAsync<Subscribe>(HandleSubscribeAsync);
        ReceiveAsync<Unsubscribe>(_ => UnsubscribeAsync());

        Receive<SetDesiredSubscriptions>(msg =>
        {
            StoreDesiredSubscriptions(msg);

            if (_desiredRefs.Count > 0)
            {
                Self.Tell(new Subscribe(_desiredRefs, _desiredInterval));
            }
            else if (_subscriptionHandle is not null)
            {
                Self.Tell(new Unsubscribe());
            }

            // Native-alarm analogue: un-gate the IAlarmSource feed when alarm tags are (now) present. The
            // common live path — a deploy delivers SetDesiredSubscriptions while the driver is already
            // Connected — flows through HERE, so the alarm subscribe must happen on this message, not only
            // on Connected entry. Unlike the value path above, there is deliberately no unsubscribe-on-empty:
            // a removed alarm tag's condition node is torn down on the address-space rebuild and
            // DriverHostActor.ForwardNativeAlarm drops any transition whose ConditionId no longer maps, so a
            // lingering session-less alarm subscription can never surface a removed alarm.
            SubscribeDesiredAlarms();
        });

        ReceiveAsync<SubscribeAlarms>(HandleSubscribeAlarmsAsync);
        Receive<DataChangeForward>(OnDataChangeForward);

        // Native alarm transition marshaled onto the actor thread from the driver's OnAlarmEvent;
        // project it to the parent the same way DataChangeForward projects AttributeValuePublished.
        Receive<NativeAlarmRaised>(m => Context.Parent.Tell(new AttributeAlarmPublished(_driverInstanceId, m.Args)));

        ReceiveSubscriptionReplies();

        Receive<HealthPollTick>(_ => PublishHealthSnapshot());
    }

    /// <summary>Behaviour after a failed initialise or an observed disconnect: a periodic
    /// <see cref="RetryConnect"/> timer re-runs the initialise with the current config until one succeeds.</summary>
    private void Reconnecting()
    {
        // NOTE(review): every tick bumps the init generation, so an attempt that takes longer than the
        // reconnect interval is superseded and its (eventual) success is dropped by the generation guard
        // below. If a driver can legitimately need longer than the interval to connect, confirm this
        // cannot starve the reconnect.
        Receive<RetryConnect>(_ => InitializeAsync(_currentConfigJson ?? "{}"));

        // Fast-fail writes while reconnecting (same reason as Connecting — avoids the 8s host Ask
        // timeout on an inbound write to a transiently-down driver). Synchronous Receive (#4a-instance).
        Receive<WriteAttribute>(_ => Sender.Tell(new WriteAttributeResult(false, "driver not connected")));

        Receive<ApplyDelta>(AdoptConfigDuringInit);

        Receive<InitializeSucceeded>(msg =>
        {
            // Result of a superseded attempt — ignore.
            if (msg.Generation != _initGeneration)
            {
                return;
            }

            Timers.Cancel(RetryConnectTimerKey);
            _log.Info("DriverInstance {Id}: reconnected", _driverInstanceId);
            EnterConnected();
        });

        // A failure here is a no-op regardless of generation — the retry timer keeps trying the
        // current config; only a (generation-matched) InitializeSucceeded transitions state.
        Receive<InitializeFailed>(_ => { /* keep retrying via timer */ });

        Receive<SetDesiredSubscriptions>(StoreDesiredSubscriptions);
        Receive<ForceReconnect>(_ => { /* already reconnecting — no-op */ });

        ReceiveSubscriptionReplies();

        // A native alarm transition can race in while still reconnecting (the driver's feed runs on its
        // own thread); drop it — the feed re-delivers active alarms once Connected. Trace only.
        Receive<NativeAlarmRaised>(_ =>
            _log.Debug("DriverInstance {Id}: native alarm arrived during reconnect — dropped (feed re-delivers)", _driverInstanceId));

        // A SubscribeAlarms self-tell (from Connected) can be overtaken by an already-queued disconnect into
        // this state; swallow it so it doesn't dead-letter — the next Connected entry re-subscribes.
        Receive<SubscribeAlarms>(_ => { });

        Receive<HealthPollTick>(_ => PublishHealthSnapshot());

        Timers.StartPeriodicTimer(RetryConnectTimerKey, RetryConnect.Instance, _reconnectInterval);
    }

    /// <summary>Registers the swallow handlers for the replies <see cref="HandleSubscribeAsync"/> sends back.
    /// <see cref="ResubscribeDesired"/> self-Tells <see cref="Subscribe"/>, so on the self-resubscribe path
    /// the reply target is Self; these handlers (trace only) keep the replies from dead-lettering.
    /// Must be called from inside a behaviour-configuration method.</summary>
    private void ReceiveSubscriptionReplies()
    {
        Receive<SubscriptionEstablished>(msg =>
            _log.Debug("DriverInstance {Id}: subscription established ({Count} refs, {Diag})",
                _driverInstanceId, msg.ReferenceCount, msg.DiagnosticId));

        // HandleSubscribeAsync already logged the underlying cause.
        Receive<SubscriptionFailed>(msg =>
            _log.Debug("DriverInstance {Id}: resubscribe reported failure: {Reason}", _driverInstanceId, msg.Reason));
    }

    /// <summary>Shared Connected-entry sequence for both Connecting and Reconnecting: switch behaviour,
    /// publish health, then re-establish the desired value subscription and the native alarm feed.</summary>
    private void EnterConnected()
    {
        Become(Connected);
        PublishHealthSnapshot();
        ResubscribeDesired();
        AttachAlarmSource();
        SubscribeDesiredAlarms();
    }

    // ---------------------------------------------------------------------------------------------
    // Initialise / config handling
    // ---------------------------------------------------------------------------------------------

    /// <summary>Records <paramref name="driverConfigJson"/> as the current config, bumps the init
    /// generation, and runs the driver's initialise on the thread pool. The outcome is reported back
    /// to Self as <see cref="InitializeSucceeded"/> / <see cref="InitializeFailed"/> tagged with the
    /// generation of this attempt.</summary>
    private void InitializeAsync(string driverConfigJson)
    {
        _currentConfigJson = driverConfigJson;
        var generation = ++_initGeneration;

        // Capture Self on the actor thread; the task body runs outside the actor context.
        var self = Self;

        _ = Task.Run(async () =>
        {
            try
            {
                await _driver.InitializeAsync(driverConfigJson, CancellationToken.None);
                self.Tell(new InitializeSucceeded(generation));
            }
            catch (Exception ex)
            {
                self.Tell(new InitializeFailed(ex.Message, generation));
            }
        });
    }

    /// <summary>Adopt a new config while not connected: ApplyDelta in Connecting/Reconnecting re-inits
    /// immediately with the new config. <see cref="InitializeAsync"/> swaps _currentConfigJson and
    /// bumps the generation, so the in-flight (old-config) init is superseded and its result is dropped.
    /// The actor stays in its current state; the new init's result drives the next transition. In
    /// Reconnecting the retry timer is left running — if this immediate attempt fails it keeps retrying
    /// the new config (a redundant concurrent attempt is deduped by the generation guard).</summary>
    private void AdoptConfigDuringInit(ApplyDelta msg)
    {
        _log.Info("DriverInstance {Id}: ApplyDelta during (re)connect — adopting new config, re-initialising now", _driverInstanceId);
        InitializeAsync(msg.DriverConfigJson);
        Sender.Tell(new ApplyResult(true, "config adopted; reinitializing", msg.Correlation));
    }

    /// <summary>Connected-state ApplyDelta: re-initialises the driver in place and replies with an
    /// <see cref="ApplyResult"/>. The stored config is only replaced when the re-initialise succeeds.</summary>
    private async Task HandleApplyDeltaAsync(ApplyDelta msg)
    {
        // Capture before the await — Sender is not safe to read afterwards.
        var replyTo = Sender;

        try
        {
            await _driver.ReinitializeAsync(msg.DriverConfigJson, CancellationToken.None);
            _currentConfigJson = msg.DriverConfigJson;
            replyTo.Tell(new ApplyResult(true, null, msg.Correlation));
        }
        catch (Exception ex)
        {
            replyTo.Tell(new ApplyResult(false, ex.Message, msg.Correlation));
        }
    }

    // ---------------------------------------------------------------------------------------------
    // Write
    // ---------------------------------------------------------------------------------------------

    /// <summary>Writes a single tag through the driver (when it implements <c>IWritable</c>) and replies
    /// with a <see cref="WriteAttributeResult"/>. The call is bounded to 5 seconds.</summary>
    private async Task HandleWriteAsync(WriteAttribute msg)
    {
        var replyTo = Sender;

        if (_driver is not IWritable writable)
        {
            replyTo.Tell(new WriteAttributeResult(false, "Driver does not implement IWritable"));
            return;
        }

        var request = new[] { new WriteRequest(msg.TagId, msg.Value) };

        // Bound the write so a hung backend can't pin this actor forever — decision #44/#45 keeps
        // retry off by default, but a stalled call still needs an answer.
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

        try
        {
            var results = await writable.WriteAsync(request, cts.Token);

            if (results is { Count: 1 } && IsGoodStatus(results[0].StatusCode))
            {
                replyTo.Tell(new WriteAttributeResult(true, null));
                return;
            }

            // No result at all is reported as an all-ones status code.
            var status = results is { Count: > 0 } ? results[0].StatusCode : 0xFFFFFFFF;
            replyTo.Tell(new WriteAttributeResult(false, $"StatusCode=0x{status:X8}"));
        }
        catch (OperationCanceledException)
        {
            replyTo.Tell(new WriteAttributeResult(false, "write timeout"));
        }
        catch (Exception ex)
        {
            replyTo.Tell(new WriteAttributeResult(false, ex.Message));
        }
    }

    // ---------------------------------------------------------------------------------------------
    // Value subscription
    // ---------------------------------------------------------------------------------------------

    /// <summary>Establishes a value subscription for the requested references, replacing any prior
    /// one, and replies <see cref="SubscriptionEstablished"/> or <see cref="SubscriptionFailed"/>.
    /// The driver call is bounded to 10 seconds.</summary>
    private async Task HandleSubscribeAsync(Subscribe msg)
    {
        // Capture Sender/Self BEFORE any await. The re-subscribe path below awaits
        // UnsubscribeAsync, and a real async backend can resume the continuation off Akka's
        // ActorContext — reading raw Sender/Self/Context past that point throws
        // NotSupportedException ("no active ActorContext"). Keep ConfigureAwait off the awaits
        // in this handler so continuations resume on the actor context.
        var replyTo = Sender;
        var self = Self;

        if (_driver is not ISubscribable subscribable)
        {
            replyTo.Tell(new SubscriptionFailed("Driver does not implement ISubscribable"));
            return;
        }

        if (_subscriptionHandle is not null)
        {
            // Subscribe-twice — drop the prior subscription before establishing the new one.
            await UnsubscribeAsync();
        }

        try
        {
            _dataChangeHandler = (_, args) => self.Tell(new DataChangeForward(args.FullReference, args.Snapshot));
            subscribable.OnDataChange += _dataChangeHandler;

            using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
            _subscriptionHandle = await subscribable
                .SubscribeAsync(msg.FullReferences, msg.PublishingInterval, cts.Token);

            replyTo.Tell(new SubscriptionEstablished(_subscriptionHandle.DiagnosticId, msg.FullReferences.Count));
            _log.Info("DriverInstance {Id}: subscribed to {Count} refs ({Diag})",
                _driverInstanceId, msg.FullReferences.Count, _subscriptionHandle.DiagnosticId);
        }
        catch (Exception ex)
        {
            // Value side only: the actor is still Connected, so the native alarm feed must stay attached.
            DetachDataChange();
            _log.Warning(ex, "DriverInstance {Id}: subscribe failed", _driverInstanceId);
            replyTo.Tell(new SubscriptionFailed(ex.Message));
        }
    }

    /// <summary>Drops the current value subscription (bounded to 5 seconds). A driver-side failure is
    /// logged and ignored; the local handler and handle are always released. The native alarm feed is
    /// left untouched — this runs while still Connected (explicit Unsubscribe or a re-subscribe).</summary>
    private async Task UnsubscribeAsync()
    {
        if (_driver is not ISubscribable subscribable || _subscriptionHandle is null)
        {
            DetachDataChange();
            return;
        }

        try
        {
            using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
            await subscribable.UnsubscribeAsync(_subscriptionHandle, cts.Token);
        }
        catch (Exception ex)
        {
            _log.Warning(ex, "DriverInstance {Id}: unsubscribe threw (continuing)", _driverInstanceId);
        }
        finally
        {
            DetachDataChange();
        }
    }

    /// <summary>Remove the data-change event handler and null the value-subscription handle. Does not
    /// touch the native alarm handler, so it is safe to call while the actor remains Connected.</summary>
    private void DetachDataChange()
    {
        if (_driver is ISubscribable subscribable && _dataChangeHandler is not null)
        {
            subscribable.OnDataChange -= _dataChangeHandler;
        }

        _dataChangeHandler = null;
        _subscriptionHandle = null;
    }

    /// <summary>Tear down the data-change + native-alarm event handlers + null the handles. Called on
    /// PostStop and on Connected → Reconnecting transitions so a stale handler doesn't
    /// push data-change / alarm events to an actor that has lost its driver connection.</summary>
    private void DetachSubscription()
    {
        DetachDataChange();
        DetachAlarmSource();
    }

    // ---------------------------------------------------------------------------------------------
    // Native alarm feed
    // ---------------------------------------------------------------------------------------------

    /// <summary>Subscribe the driver's native alarm event (if it is an <c>IAlarmSource</c>),
    /// marshaling each transition to the actor thread. Idempotent; mirrors the OnDataChange attach.</summary>
    private void AttachAlarmSource()
    {
        if (_driver is not IAlarmSource src || _alarmEventHandler is not null)
        {
            return;
        }

        var self = Self;
        _alarmEventHandler = (_, e) => self.Tell(new NativeAlarmRaised(e));
        src.OnAlarmEvent += _alarmEventHandler;
    }

    /// <summary>Symmetric teardown — called from <see cref="DetachSubscription"/> (and therefore PostStop)
    /// so a stale handler never pushes to a disconnected actor.</summary>
    private void DetachAlarmSource()
    {
        if (_driver is IAlarmSource src && _alarmEventHandler is not null)
        {
            src.OnAlarmEvent -= _alarmEventHandler;
        }

        _alarmEventHandler = null;

        // Drop our handle so the next Connected entry re-subscribes; the desired alarm refs persist across
        // reconnects. NOTE: this does NOT tear down the driver-side subscription. For a session-bound
        // IAlarmSource the old subscription dies with the session (no accumulation). For a session-less feed
        // (GalaxyDriver's always-on central monitor) it survives an in-place reconnect, so the re-subscribe
        // is additive — but the driver now collapses to a single live handle on each SubscribeAlarmsAsync
        // (GalaxyDriver.SubscribeAlarmsAsync clears the set before adding), so handles no longer accumulate
        // across reconnects. The gate (Count > 0) and the one-shot fan-out are unchanged.
        _alarmSubscriptionHandle = null;
    }

    /// <summary>Establish the native-alarm subscription that un-gates an <c>IAlarmSource</c> driver's
    /// feed — the driver suppresses <c>OnAlarmEvent</c> until at least one alarm
    /// subscription exists. Self-sends the async <see cref="SubscribeAlarms"/> the Connected behaviour
    /// handles. Idempotent: a no-op unless the driver is an <c>IAlarmSource</c>, alarm refs are
    /// desired, and no subscription is yet established.</summary>
    private void SubscribeDesiredAlarms()
    {
        if (_driver is IAlarmSource && _desiredAlarmRefs.Count > 0 && _alarmSubscriptionHandle is null)
        {
            Self.Tell(new SubscribeAlarms());
        }
    }

    /// <summary>Calls the driver's <c>SubscribeAlarmsAsync</c> (bounded to 10 seconds) to register the
    /// alarm subscription that un-gates its native feed, caching the returned handle. Re-checks the guard
    /// (the desired set may have cleared, or another SubscribeAlarms may have already established a handle,
    /// while this was queued). Failures are logged and retried on the next Connected entry — the feed simply
    /// stays gated until then.</summary>
    private async Task HandleSubscribeAlarmsAsync(SubscribeAlarms _)
    {
        if (_driver is not IAlarmSource src || _desiredAlarmRefs.Count == 0 || _alarmSubscriptionHandle is not null)
        {
            return;
        }

        // Snapshot the list so the log below reports the set that was actually subscribed.
        var refs = _desiredAlarmRefs;
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));

        try
        {
            _alarmSubscriptionHandle = await src.SubscribeAlarmsAsync(refs, cts.Token);
            _log.Info("DriverInstance {Id}: native-alarm subscription established for {Count} alarm ref(s) ({Diag})",
                _driverInstanceId, refs.Count, _alarmSubscriptionHandle.DiagnosticId);
        }
        catch (Exception ex)
        {
            _log.Warning(ex, "DriverInstance {Id}: native-alarm subscription failed — feed stays gated until reconnect",
                _driverInstanceId);
        }
    }

    // ---------------------------------------------------------------------------------------------
    // Desired-subscription bookkeeping
    // ---------------------------------------------------------------------------------------------

    /// <summary>Records the host's desired subscription set without touching the live subscription.
    /// The set is (re)applied by <see cref="ResubscribeDesired"/> on the next Connected entry.</summary>
    private void StoreDesiredSubscriptions(SetDesiredSubscriptions msg)
    {
        _desiredRefs = msg.FullReferences;
        _desiredInterval = msg.PublishingInterval;
        _desiredAlarmRefs = msg.AlarmReferences ?? Array.Empty<string>();
    }

    /// <summary>Re-establish the desired subscription after (re)connecting. Self-sends the one-shot
    /// <see cref="Subscribe"/> the Connected behaviour already handles (which drops any prior handle
    /// first), so values resume streaming after a reconnect or redeploy without host involvement.</summary>
    private void ResubscribeDesired()
    {
        if (_desiredRefs.Count > 0)
        {
            Self.Tell(new Subscribe(_desiredRefs, _desiredInterval));
        }
    }

    /// <summary>Projects a driver data-change (already marshaled onto the actor thread) into an
    /// <see cref="AttributeValuePublished"/> for the parent. Prefers the source timestamp and falls
    /// back to the server timestamp.</summary>
    private void OnDataChangeForward(DataChangeForward msg)
    {
        var quality = QualityFromStatus(msg.Snapshot.StatusCode);
        var ts = msg.Snapshot.SourceTimestampUtc ?? msg.Snapshot.ServerTimestampUtc;

        Context.Parent.Tell(new AttributeValuePublished(
            _driverInstanceId, msg.FullReference, msg.Snapshot.Value, quality, ts));
    }

    /// <summary>Translate an OPC UA status code to the 3-state <see cref="OpcUaQuality"/> projection
    /// the publish actor consumes. Severity bits (top 2): 00 = Good, 01 = Uncertain, 10/11 = Bad.</summary>
    private static OpcUaQuality QualityFromStatus(uint statusCode)
    {
        var severity = statusCode >> 30;
        return severity switch
        {
            0 => OpcUaQuality.Good,
            1 => OpcUaQuality.Uncertain,
            _ => OpcUaQuality.Bad,
        };
    }

    /// <summary>True when the top two (severity) bits of the status code are 00.</summary>
    private static bool IsGoodStatus(uint statusCode) => (statusCode >> 30) == 0;

    // ---------------------------------------------------------------------------------------------
    // Health
    // ---------------------------------------------------------------------------------------------

    /// <summary>Records a transition into a Faulted / error state for the 5-minute sliding counter.</summary>
    private void RecordFault()
    {
        _faultTimestamps.Enqueue(DateTime.UtcNow);
    }

    /// <summary>Returns how many fault transitions occurred in the last 5 minutes, pruning older entries.</summary>
    private int ErrorCount5Min()
    {
        var cutoff = DateTime.UtcNow.AddMinutes(-5);

        while (_faultTimestamps.Count > 0 && _faultTimestamps.Peek() < cutoff)
        {
            _faultTimestamps.Dequeue();
        }

        return _faultTimestamps.Count;
    }

    /// <summary>
    /// Polls the driver's <c>GetHealth</c> and forwards the snapshot to the health publisher.
    /// Called on every observable state change and by the periodic <c>HealthPollTick</c>
    /// so the AdminUI snapshot store is warmed up for newly-joined SignalR clients.
    /// Deduplicates: if the resulting (state, lastSuccess, lastError, errorCount) tuple matches
    /// the last successful publish, this call is a no-op. Stops flood-publishing identical Healthy
    /// snapshots every 30s when nothing has changed. Newly-joined SignalR clients still get the current
    /// snapshot via DriverStatusHub.JoinDriver which reads the store directly.
    /// </summary>
    private void PublishHealthSnapshot()
    {
        try
        {
            var health = _driver.GetHealth();
            var errorCount = ErrorCount5Min();
            var fingerprint = (health.State, health.LastSuccessfulRead, health.LastError, errorCount);

            if (_lastPublishedFingerprint is { } prev && prev.Equals(fingerprint))
            {
                return;
            }

            _healthPublisher.Publish(_clusterId, _driverInstanceId, health, errorCount);

            // Remember the fingerprint only once the publish went through; recording it earlier would
            // make the dedup check suppress every retry of a snapshot whose publish threw.
            _lastPublishedFingerprint = fingerprint;
        }
        catch (Exception ex)
        {
            _log.Warning(ex, "DriverInstance {Id}: GetHealth threw during health publish; skipping", _driverInstanceId);
        }
    }

    /// <inheritdoc />
    protected override void PostStop()
    {
        DetachSubscription();

        try
        {
            // NOTE(review): blocks the stopping actor with no timeout; a hung ShutdownAsync would stall
            // actor termination. Confirm drivers bound their own shutdown.
            _driver.ShutdownAsync(CancellationToken.None).GetAwaiter().GetResult();
        }
        catch (Exception ex)
        {
            _log.Warning(ex, "DriverInstance {Id}: ShutdownAsync threw on PostStop", _driverInstanceId);
        }

        OtOpcUaTelemetry.DriverInstanceLifecycle.Add(
            1,
            new KeyValuePair<string, object?>("event", "stop"),
            new KeyValuePair<string, object?>("driver_type", _driver.DriverType));
    }
}