Files
lmxopcua/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Drivers/DriverInstanceActor.cs
T

959 lines
55 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using Akka.Actor;
using Akka.Event;
using ZB.MOM.WW.OtOpcUa.Commons.Observability;
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
using ZB.MOM.WW.OtOpcUa.Commons.Types;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
/// <summary>
/// Marker message delivered by the periodic "health-poll" timer. Declared <c>file</c>-scoped so the
/// type name stays unique to this source file.
/// </summary>
file sealed class HealthPollTick
{
    // Private constructor: the shared Instance below is the only object of this type.
    private HealthPollTick()
    {
    }

    /// <summary>The single shared tick instance handed to the timer scheduler.</summary>
    public static readonly HealthPollTick Instance = new HealthPollTick();
}
/// <summary>
/// Akka wrapper for a single <see cref="IDriver"/> instance. States:
///
/// <list type="bullet">
/// <item><c>Connecting</c> — calling <see cref="IDriver.InitializeAsync"/>.</item>
/// <item><c>Connected</c> — initialised; serving Read/Write/Subscribe requests.</item>
/// <item><c>Reconnecting</c> — disconnect observed; periodic retry of Initialize.</item>
/// <item><c>Failed</c> — terminal until parent restarts the actor.</item>
/// </list>
///
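/// Engine wiring lives in this actor: driver data changes are published to the parent as
/// <see cref="AttributeValuePublished"/>, an <see cref="ApplyDelta"/> received while <c>Connected</c>
/// re-initialises the driver in place, and each <see cref="WriteAttribute"/> is answered with a
/// <see cref="WriteAttributeResult"/>. A separate DEV-STUB behaviour (see <see cref="ShouldStub"/>)
/// accepts the same message contracts without touching a real backend.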
/// </summary>
public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
{
/// <summary>Default interval between reconnect attempts (10 seconds); used by <see cref="Props"/> when
/// no <c>reconnectInterval</c> is supplied.</summary>
public static readonly TimeSpan DefaultReconnectInterval = TimeSpan.FromSeconds(10);
/// <summary>Default interval between bounded post-connect re-discovery passes.</summary>
public static readonly TimeSpan DefaultRediscoverInterval = TimeSpan.FromSeconds(2);
/// <summary>Default cap on the number of post-connect re-discovery passes.</summary>
public const int DefaultRediscoverMaxAttempts = 15;
/// <summary>Default per-pass timeout for <see cref="ITagDiscovery.DiscoverAsync"/> during
/// bounded post-connect re-discovery. Bounds the mailbox suspension time; production default 30 s.</summary>
public static readonly TimeSpan DefaultRediscoverDiscoverTimeout = TimeSpan.FromSeconds(30);
/// <summary>Asks the actor to initialise the wrapped driver with the given config JSON. Handled in the
/// <c>Connecting</c> state, where it starts an initialise attempt.</summary>
public sealed record InitializeRequested(string DriverConfigJson);
/// <summary>Self-sent when an initialise attempt completed without throwing. <paramref name="Generation"/>
/// identifies the attempt; results whose generation is not the latest are dropped.</summary>
public sealed record InitializeSucceeded(int Generation);
/// <summary>Self-sent when an initialise attempt threw. <paramref name="Reason"/> is the exception message;
/// <paramref name="Generation"/> identifies the attempt.</summary>
public sealed record InitializeFailed(string Reason, int Generation);
/// <summary>Reports that the driver connection was lost. While <c>Connected</c> it moves the actor into
/// <c>Reconnecting</c> and records a fault.</summary>
public sealed record DisconnectObserved(string Reason);
/// <summary>Delivers a new driver config. While <c>Connected</c> the driver is re-initialised in place; while
/// (re)connecting the new config is adopted and a fresh initialise attempt is started. Always answered with an
/// <see cref="ApplyResult"/> carrying the same <paramref name="Correlation"/>.</summary>
public sealed record ApplyDelta(string DriverConfigJson, CorrelationId Correlation);
/// <summary>Reply to <see cref="ApplyDelta"/>. <paramref name="Reason"/> carries the failure message, or an
/// informational note on some success paths (e.g. "stubbed"); null otherwise.</summary>
public sealed record ApplyResult(bool Success, string? Reason, CorrelationId Correlation);
/// <summary>Requests a single-tag write. Answered with a <see cref="WriteAttributeResult"/> in every state
/// (fast-failed with "driver not connected" while the driver is not connected).</summary>
public sealed record WriteAttribute(string TagId, object Value);
/// <summary>Reply to <see cref="WriteAttribute"/>; <paramref name="Reason"/> describes the failure when
/// <paramref name="Success"/> is false.</summary>
public sealed record WriteAttributeResult(bool Success, string? Reason);
/// <summary>
/// Sent by <see cref="DriverHostActor"/> when an OPC UA client Acknowledges a NATIVE Part 9
/// condition (resolved from the condition NodeId to this driver via the host's alarm inverse map).
/// The actor forwards it to the driver's <see cref="IAlarmSource.AcknowledgeAsync"/>, carrying the
/// authored alarm full-reference (= the driver's <c>ConditionId</c>/AlarmFullReference) and the
/// authenticated principal. Mirrors <see cref="WriteAttribute"/>, but the ack is fire-and-forget:
/// the driver's <see cref="IAlarmSource.AcknowledgeAsync"/> returns no per-condition status and the
/// OPC UA Part 9 ack already committed the local condition state, so there is no reply to surface.
/// </summary>
/// <param name="ConditionId">The authored alarm full-reference the driver correlates the ack on
/// (= the equipment tag's <c>FullName</c>/AlarmFullReference).</param>
/// <param name="Comment">Operator-supplied comment forwarded to the upstream alarm system; null when none.</param>
/// <param name="OperatorUser">The authenticated principal performing the acknowledge.</param>
public sealed record RouteAlarmAck(string ConditionId, string? Comment, string OperatorUser);
/// <summary>One-shot subscription request for the given references at the given publishing interval.
/// Handled while <c>Connected</c>; the sender is answered with <see cref="SubscriptionEstablished"/> or
/// <see cref="SubscriptionFailed"/>. The actor also sends this to itself to apply the desired set.</summary>
public sealed record Subscribe(IReadOnlyList<string> FullReferences, TimeSpan PublishingInterval);
/// <summary>
/// Sets the set of references this driver should keep subscribed for the lifetime of the
/// current deployment. Unlike the one-shot <see cref="Subscribe"/>, the desired set is
/// retained and (re)established automatically every time the actor (re)enters
/// <c>Connected</c> — closing the F8b/#113 "re-subscribe across reconnects" gap and giving
/// <see cref="DriverHostActor"/> a single message to drive the SubscribeBulk pass after an
/// apply. Sending an empty set clears the desired subscription.
/// </summary>
/// <param name="FullReferences">The value references to keep subscribed; an empty list clears the desired set.</param>
/// <param name="PublishingInterval">The publishing interval to request when the desired set is subscribed.</param>
/// <param name="AlarmReferences">
/// The native-alarm references (alarm-bearing equipment-tag FullNames = the driver's
/// <c>ConditionId</c>/AlarmFullReference) this driver should keep an alarm subscription open for.
/// An <see cref="IAlarmSource"/> driver suppresses <see cref="IAlarmSource.OnAlarmEvent"/> until at
/// least one alarm subscription exists, so the actor calls
/// <see cref="IAlarmSource.SubscribeAlarmsAsync"/> with this set to un-gate the native feed. Empty
/// (or null) means the driver has no alarm tags. Defaults to null so non-alarm callers are unchanged.
/// </param>
public sealed record SetDesiredSubscriptions(
IReadOnlyList<string> FullReferences,
TimeSpan PublishingInterval,
IReadOnlyList<string>? AlarmReferences = null);
/// <summary>Success reply to <see cref="Subscribe"/>. When the subscribe was self-sent the reply comes back
/// to this actor, which only logs it.</summary>
public sealed record SubscriptionEstablished(string DiagnosticId, int ReferenceCount);
/// <summary>Failure reply to <see cref="Subscribe"/>; <paramref name="Reason"/> describes the cause.</summary>
public sealed record SubscriptionFailed(string Reason);
/// <summary>Asks the actor to drop its current value subscription. Handled while <c>Connected</c>; the actor
/// also sends it to itself when the desired set becomes empty while a subscription is live.</summary>
public sealed record Unsubscribe;
/// <summary>
/// Sent by <see cref="DriverHostActor"/> when the AdminUI issues a Reconnect operation.
/// Pushes the actor out of <c>Connected</c> into <c>Reconnecting</c> so the transport is
/// re-established without fully stopping and respawning the actor. Safe to send in any
/// state — a no-op when already Reconnecting or Connecting.
/// </summary>
public sealed record ForceReconnect;
/// <summary>Published to the actor's parent whenever the subscribed IDriver fires
/// <see cref="ISubscribable.OnDataChange"/>. The parent forwards to OpcUaPublishActor.</summary>
public sealed record AttributeValuePublished(string DriverInstanceId, string FullReference, object? Value, OpcUaQuality Quality, DateTime TimestampUtc);
/// <summary>Internal message carrying one data change (reference + snapshot) into the actor's mailbox; only
/// the <c>Connected</c> state handles it. NOTE(review): presumably posted from the driver's data-change
/// callback — the posting site is outside this view.</summary>
private sealed record DataChangeForward(string FullReference, DataValueSnapshot Snapshot);
/// <summary>Published to the parent whenever the subscribed driver (an <see cref="IAlarmSource"/>) fires
/// <see cref="IAlarmSource.OnAlarmEvent"/>. The parent (<see cref="DriverHostActor"/>) projects + routes it
/// to the materialised Part 9 condition. Parallels <see cref="AttributeValuePublished"/>.</summary>
public sealed record AttributeAlarmPublished(string DriverInstanceId, AlarmEventArgs Args);
/// <summary>Internal message carrying a native alarm transition onto the actor thread. While <c>Connected</c>
/// it is forwarded to the parent as <see cref="AttributeAlarmPublished"/>; while (re)connecting it is dropped.</summary>
private sealed record NativeAlarmRaised(AlarmEventArgs Args);
/// <summary>Self-sent on Connected entry / when alarm refs are (re)pushed, to establish the native-alarm
/// subscription that un-gates an <see cref="IAlarmSource"/> driver's feed. Handled async so the
/// <see cref="IAlarmSource.SubscribeAlarmsAsync"/> call is bounded + off the synchronous handlers.</summary>
private sealed record SubscribeAlarms;
/// <summary>Published to the parent (DriverHostActor) after each post-connect discovery pass so it can
/// graft the driver's discovered FixedTree nodes under the equipment. Empty/duplicate sets are fine —
/// the parent dedups and injection is idempotent.</summary>
public sealed record DiscoveredNodesReady(string DriverInstanceId, IReadOnlyList<DiscoveredNode> Nodes);
/// <summary>
/// Sent by <see cref="DriverHostActor"/> to ask this driver child to re-run post-connect discovery
/// after the host rebinds the driver to a new equipment. Handled only in <c>Connected</c>, where it
/// re-kicks <see cref="StartDiscovery"/> — which already honours the driver's
/// <see cref="ITagDiscovery.RediscoverPolicy"/> and the <see cref="ITagDiscovery"/> guard, tagging the
/// fresh pass with the current init generation. In any non-Connected state it is a deliberate no-op:
/// the driver's eventual (re)connect re-discovers anyway, so there is nothing to do and nothing to log.
/// </summary>
public sealed record TriggerRediscovery;
/// <summary>Internal self-tick driving bounded post-connect re-discovery (FixedTree populates ~02s after connect).
/// <paramref name="PreviousSignature"/> is the ordered-distinct full-reference signature of the prior pass's
/// captured set (empty string on the first tick); re-discovery stops once a non-empty set repeats it.</summary>
private sealed record RediscoverTick(int Generation, int Attempt, string PreviousSignature);
/// <summary>
/// Singleton tick delivered by the periodic "retry-connect" timer while the actor is
/// <c>Reconnecting</c>; each tick starts another initialise attempt with the current config.
/// </summary>
public sealed class RetryConnect
{
    // Private constructor: the shared Instance below is the only object of this type.
    private RetryConnect()
    {
    }

    /// <summary>The single shared instance handed to the timer scheduler.</summary>
    public static readonly RetryConnect Instance = new RetryConnect();
}
/// <summary>Interval between periodic health-poll heartbeats sent to the snapshot store.</summary>
public static readonly TimeSpan HealthPollInterval = TimeSpan.FromSeconds(30);
/// <summary>The wrapped driver; every initialise/reinitialise/write/ack call is made against it.</summary>
private readonly IDriver _driver;
/// <summary>Copy of the driver's <c>DriverInstanceId</c> taken at construction; used in log lines and in
/// messages published to the parent.</summary>
private readonly string _driverInstanceId;
/// <summary>Cluster identifier supplied at construction; empty string when none was provided.</summary>
private readonly string _clusterId;
/// <summary>Health sink supplied at construction; falls back to <see cref="NullDriverHealthPublisher"/> when
/// none was provided.</summary>
private readonly IDriverHealthPublisher _healthPublisher;
/// <summary>Period of the "retry-connect" timer that runs while the actor is <c>Reconnecting</c>.</summary>
private readonly TimeSpan _reconnectInterval;
/// <summary>Interval between bounded post-connect re-discovery passes. Production default 2s; tests
/// inject a tiny value so the loop runs without real-time waits.</summary>
private readonly TimeSpan _rediscoverInterval;
/// <summary>Cap on the number of post-connect re-discovery passes — a backstop so a never-stabilising
/// (or perpetually-empty) discovered set cannot spin the loop forever. Production default 15.</summary>
private readonly int _rediscoverMaxAttempts;
/// <summary>Per-pass timeout for <see cref="ITagDiscovery.DiscoverAsync"/> during bounded post-connect
/// re-discovery. Bounds the mailbox suspension time. Production default 30 s; tests may inject a shorter
/// value. Stored to allow injection rather than hardcoding.</summary>
private readonly TimeSpan _rediscoverDiscoverTimeout;
/// <summary>Logger obtained from the actor context when the instance is created.</summary>
private readonly ILoggingAdapter _log = Context.GetLogger();
/// <summary>The most recent driver config JSON: set by every initialise attempt and by a successful
/// in-place reinitialise. Reconnect retries reuse it, falling back to <c>"{}"</c> while it is still null.</summary>
private string? _currentConfigJson;
/// <summary>Monotonic token tagging each <see cref="InitializeAsync"/> attempt. An init result is
/// honoured only when its generation matches the latest; an older result is from a superseded attempt
/// (e.g. an <see cref="ApplyDelta"/> adopted a new config mid-(re)connect) and is dropped. Touched only
/// on the actor thread, so no lock is needed.</summary>
private int _initGeneration;
/// <summary>
/// Timestamps of recent Faulted-state transitions; used to compute the 5-minute error count.
/// No lock needed — every read/write site runs inside an Akka message handler, which is
/// single-threaded per actor instance.
/// </summary>
private readonly Queue<DateTime> _faultTimestamps = new();
/// <summary>Active subscription handle (null when not subscribed). Tracks the current live
/// subscription; the actor auto-(re)subscribes on (re)connect and on each <see cref="Subscribe"/>
/// message via <see cref="ResubscribeDesired"/> / <see cref="HandleSubscribeAsync"/>, so callers
/// do not need to re-send subscription requests after a reconnect.</summary>
private ISubscriptionHandle? _subscriptionHandle;
// NOTE(review): presumably the delegate currently attached to the driver's data-change event, kept so it
// can be detached later — the attach/detach sites are outside this view; confirm.
private EventHandler<DataChangeEventArgs>? _dataChangeHandler;
// NOTE(review): presumably the delegate currently attached to the driver's alarm event (see
// AttachAlarmSource / DetachAlarmSource, not visible here); confirm.
private EventHandler<AlarmEventArgs>? _alarmEventHandler;
/// <summary>The references the host wants kept subscribed (set by <see cref="SetDesiredSubscriptions"/>).
/// Re-applied on every entry into <c>Connected</c> so values resume after a reconnect or redeploy.</summary>
private IReadOnlyList<string> _desiredRefs = Array.Empty<string>();
/// <summary>Publishing interval used when the desired set is subscribed; 1 second until changed.
/// NOTE(review): presumably updated from <see cref="SetDesiredSubscriptions"/> — the store method is outside
/// this view.</summary>
private TimeSpan _desiredInterval = TimeSpan.FromSeconds(1);
/// <summary>The native-alarm references the host wants kept subscribed (set by
/// <see cref="SetDesiredSubscriptions"/>). Re-applied on every <c>Connected</c> entry so the alarm
/// feed is re-un-gated after a reconnect/redeploy.</summary>
private IReadOnlyList<string> _desiredAlarmRefs = Array.Empty<string>();
/// <summary>The active native-alarm subscription handle for an <see cref="IAlarmSource"/> driver, or
/// null when none is established. Reset on <see cref="DetachAlarmSource"/> so the next Connected entry
/// re-subscribes against the freshly re-initialised driver; the null check makes the subscribe
/// idempotent across repeated <see cref="SetDesiredSubscriptions"/> pushes.</summary>
private IAlarmSubscriptionHandle? _alarmSubscriptionHandle;
/// <summary>
/// Gets or sets the timer scheduler required by <see cref="IWithTimers"/>. This actor uses it for the
/// periodic "health-poll" heartbeat, the "retry-connect" reconnect loop, and the "rediscover" timer.
/// </summary>
public ITimerScheduler Timers { get; set; } = null!;
/// <summary>
/// Builds the Akka <c>Props</c> used to spawn a <see cref="DriverInstanceActor"/>, substituting a
/// default for each optional argument that was not supplied.
/// </summary>
/// <param name="driver">The driver instance the actor will wrap.</param>
/// <param name="reconnectInterval">Interval between reconnect attempts; <see cref="DefaultReconnectInterval"/> when null.</param>
/// <param name="startStubbed">When true the actor starts in stub mode (testing / unavailable platforms).</param>
/// <param name="healthPublisher">Health sink; <see cref="NullDriverHealthPublisher"/> when null, so tests and
/// stub paths need not provide one.</param>
/// <param name="clusterId">Cluster identifier forwarded in <see cref="DriverHealthChanged"/> messages;
/// empty string when null (e.g. in unit tests).</param>
/// <param name="rediscoverInterval">Interval between post-connect re-discovery passes; the actor applies its
/// 2-second default when null.</param>
/// <param name="rediscoverMaxAttempts">Cap on re-discovery passes; defaults to 15.</param>
/// <param name="rediscoverDiscoverTimeout">Per-pass timeout for <see cref="ITagDiscovery.DiscoverAsync"/>; the
/// actor applies its 30-second default when null.</param>
public static Props Props(
    IDriver driver,
    TimeSpan? reconnectInterval = null,
    bool startStubbed = false,
    IDriverHealthPublisher? healthPublisher = null,
    string? clusterId = null,
    TimeSpan? rediscoverInterval = null,
    int rediscoverMaxAttempts = DefaultRediscoverMaxAttempts,
    TimeSpan? rediscoverDiscoverTimeout = null)
{
    // Resolve the defaults up front so the factory expression below only captures plain values.
    var effectiveReconnectInterval = reconnectInterval ?? DefaultReconnectInterval;
    var effectiveHealthPublisher = healthPublisher ?? NullDriverHealthPublisher.Instance;
    var effectiveClusterId = clusterId ?? string.Empty;

    return Akka.Actor.Props.Create(() => new DriverInstanceActor(
        driver,
        effectiveReconnectInterval,
        startStubbed,
        effectiveHealthPublisher,
        effectiveClusterId,
        rediscoverInterval,
        rediscoverMaxAttempts,
        rediscoverDiscoverTimeout));
}
/// <summary>
/// Decides whether a driver should boot in DEV-STUB mode, based on the host platform and the
/// configured roles. Only the v1 in-process driver types are treated as Windows-only:
/// <list type="bullet">
/// <item><c>"Galaxy"</c> — legacy MXAccess COM proxy (retired in PR 7.2; gated for any
/// leftover DriverInstance rows that still reference the old type name).</item>
/// <item><c>"Historian.Wonderware"</c> — Wonderware Historian sidecar over Windows-only
/// named pipes.</item>
/// </list>
/// Every other type — including the v2 <c>"GalaxyMxGateway"</c> driver, which talks gRPC to an
/// external mxaccessgw process and so runs on any platform .NET 10 supports — is never stubbed.
/// </summary>
/// <param name="driverType">The type identifier of the driver.</param>
/// <param name="roles">Operational roles configured for this instance.</param>
/// <returns>True when the driver type is Windows-only and either the host is not Windows or the
/// roles include <c>"dev"</c>; otherwise false.</returns>
public static bool ShouldStub(string driverType, IEnumerable<string> roles)
{
    var windowsOnly = driverType switch
    {
        "Galaxy" or "Historian.Wonderware" => true,
        _ => false,
    };

    // Off Windows a Windows-only driver can never run for real; the roles are not consulted.
    if (windowsOnly && !OperatingSystem.IsWindows())
    {
        return true;
    }

    // The role lookup is evaluated before the driver-type check, so the roles are always enumerated here.
    return roles.Contains("dev") && windowsOnly;
}
/// <summary>
/// Creates the actor, records a lifecycle telemetry event, and installs the initial behaviour:
/// <c>Stubbed</c> when <paramref name="startStubbed"/> is true, otherwise <c>Connecting</c>.
/// </summary>
/// <param name="driver">The driver instance to wrap and manage.</param>
/// <param name="reconnectInterval">Interval between reconnection attempts.</param>
/// <param name="startStubbed">If true, start in stub mode for testing or unavailable platforms.</param>
/// <param name="healthPublisher">Sink for health-change notifications; <see cref="NullDriverHealthPublisher"/> is used when null.</param>
/// <param name="clusterId">Cluster identifier forwarded in health snapshots; empty string when null.</param>
/// <param name="rediscoverInterval">Interval between post-connect re-discovery passes; defaults to 2 seconds.</param>
/// <param name="rediscoverMaxAttempts">Cap on the number of re-discovery passes; defaults to 15.</param>
/// <param name="rediscoverDiscoverTimeout">Per-pass timeout for <see cref="ITagDiscovery.DiscoverAsync"/>; defaults to 30 seconds.</param>
public DriverInstanceActor(
    IDriver driver,
    TimeSpan reconnectInterval,
    bool startStubbed = false,
    IDriverHealthPublisher? healthPublisher = null,
    string? clusterId = null,
    TimeSpan? rediscoverInterval = null,
    int rediscoverMaxAttempts = DefaultRediscoverMaxAttempts,
    TimeSpan? rediscoverDiscoverTimeout = null)
{
    // Identity of the wrapped driver.
    _driver = driver;
    _driverInstanceId = driver.DriverInstanceId;

    // Collaborators, with null-object / empty fallbacks.
    _healthPublisher = healthPublisher ?? NullDriverHealthPublisher.Instance;
    _clusterId = clusterId ?? string.Empty;

    // Timing knobs.
    _reconnectInterval = reconnectInterval;
    _rediscoverInterval = rediscoverInterval ?? DefaultRediscoverInterval;
    _rediscoverMaxAttempts = rediscoverMaxAttempts;
    _rediscoverDiscoverTimeout = rediscoverDiscoverTimeout ?? DefaultRediscoverDiscoverTimeout;

    var lifecycleEvent = startStubbed ? "spawn_stub" : "spawn";
    OtOpcUaTelemetry.DriverInstanceLifecycle.Add(
        1,
        new KeyValuePair<string, object?>("event", lifecycleEvent),
        new KeyValuePair<string, object?>("driver_type", driver.DriverType));

    if (!startStubbed)
    {
        Become(Connecting);
        return;
    }

    Context.GetLogger().Info("[DEV-STUB] driver={Name} type={Type}",
        _driverInstanceId, driver.DriverType);
    Become(Stubbed);
}
/// <inheritdoc />
protected override void PreStart()
{
    // Publish once right away so AdminUI sees the current state as soon as the actor starts,
    // before any state transition has fired.
    PublishHealthSnapshot();

    // Then keep publishing on a fixed period so a long-lived Healthy driver's snapshot stays
    // fresh for SignalR clients that join later.
    const string healthPollTimerKey = "health-poll";
    Timers.StartPeriodicTimer(healthPollTimerKey, HealthPollTick.Instance, HealthPollInterval);
}
/// <summary>
/// Behaviour for DEV-STUB drivers: the standard message contracts are accepted and answered with
/// deterministic success, and nothing touches a real backend.
/// </summary>
private void Stubbed()
{
    // Request/reply contracts: always succeed, tagged "stubbed".
    Receive<ApplyDelta>(delta =>
    {
        Sender.Tell(new ApplyResult(true, "stubbed", delta.Correlation));
    });
    Receive<WriteAttribute>(_ =>
    {
        Sender.Tell(new WriteAttributeResult(true, "stubbed"));
    });

    // The desired subscription set is still recorded.
    Receive<SetDesiredSubscriptions>(StoreDesiredSubscriptions);

    // Lifecycle messages have no meaning for a stub.
    Receive<InitializeRequested>(_ => { });
    Receive<DisconnectObserved>(_ => { });
    Receive<ForceReconnect>(_ => { });

    // No upstream alarm system exists; the ack is fire-and-forget, so there is no reply to send.
    Receive<RouteAlarmAck>(_ => { });

    // A stub never enters Connected, so it never kicks discovery. Swallow these defensively so a
    // stray tick or trigger does not surface as an Akka Unhandled message.
    Receive<RediscoverTick>(_ => { });
    Receive<TriggerRediscovery>(_ => { });

    // Keep the health snapshot fresh.
    Receive<HealthPollTick>(_ =>
    {
        PublishHealthSnapshot();
    });
}
/// <summary>
/// Behaviour installed by the constructor for a non-stubbed actor: waits for an
/// <see cref="InitializeRequested"/>, runs the initialise attempt, and moves to <c>Connected</c> on a
/// generation-matched success or to <c>Reconnecting</c> on a generation-matched failure. Every
/// request/reply message that cannot be served yet is fast-failed, and every message that can race in
/// is swallowed, so nothing surfaces as an Akka Unhandled message.
/// </summary>
private void Connecting()
{
    Receive<InitializeRequested>(msg => InitializeAsync(msg.DriverConfigJson));
    // Fast-fail writes while still connecting — without this the inbound WriteAttribute dead-letters
    // and DriverHostActor.HandleRouteNodeWrite waits its full 8s Ask before reporting a generic
    // "write timeout". Synchronous Receive: Sender.Tell on the actor thread is safe (#4a-instance).
    Receive<WriteAttribute>(_ =>
        Sender.Tell(new WriteAttributeResult(false, "driver not connected")));
    // Same reasoning for the one-shot Subscribe contract: answer immediately instead of leaving the
    // requester to wait out its own timeout. The desired set (SetDesiredSubscriptions) is applied on
    // Connected entry, so nothing is lost by refusing here.
    Receive<Subscribe>(_ =>
        Sender.Tell(new SubscriptionFailed("driver not connected")));
    // There is no live subscription before the first connect, so an Unsubscribe has nothing to undo.
    Receive<Unsubscribe>(_ => { });
    // An ack arriving while still connecting can't reach the upstream alarm system; drop it (the ack is
    // fire-and-forget — no reply to surface — and the OPC UA condition state already committed locally).
    Receive<RouteAlarmAck>(_ =>
        _log.Debug("DriverInstance {Id}: alarm ack arrived during connect — dropped (driver not connected)", _driverInstanceId));
    // A disconnect reported before the first connect completed changes nothing: the pending initialise
    // result drives the next transition. Swallow it so it stays a clean no-op (no Unhandled event).
    Receive<DisconnectObserved>(msg =>
        _log.Debug("DriverInstance {Id}: disconnect observed during connect ({Reason}) — ignored", _driverInstanceId, msg.Reason));
    Receive<ApplyDelta>(AdoptConfigDuringInit);
    Receive<InitializeSucceeded>(msg =>
    {
        // Result of a superseded attempt — a newer initialise is in flight; ignore.
        if (msg.Generation != _initGeneration) return;
        _log.Info("DriverInstance {Id}: connected", _driverInstanceId);
        Become(Connected);
        PublishHealthSnapshot();
        ResubscribeDesired();
        AttachAlarmSource();
        SubscribeDesiredAlarms();
        StartDiscovery();
    });
    Receive<InitializeFailed>(msg =>
    {
        // Result of a superseded attempt — a newer initialise is in flight; ignore.
        if (msg.Generation != _initGeneration) return;
        _log.Warning("DriverInstance {Id}: initialize failed: {Reason}", _driverInstanceId, msg.Reason);
        RecordFault();
        Become(Reconnecting);
        PublishHealthSnapshot();
    });
    Receive<SetDesiredSubscriptions>(StoreDesiredSubscriptions);
    Receive<ForceReconnect>(_ => { /* already connecting — no-op */ });
    // ResubscribeDesired self-Tells Subscribe; HandleSubscribeAsync replies SubscriptionEstablished to the
    // sender, which on the self-resubscribe path is Self. Swallow it (trace only) so it doesn't dead-letter.
    Receive<SubscriptionEstablished>(msg =>
        _log.Debug("DriverInstance {Id}: subscription established ({Count} refs, {Diag})",
            _driverInstanceId, msg.ReferenceCount, msg.DiagnosticId));
    // Symmetric to the SubscriptionEstablished swallow: a failed self-resubscribe replies SubscriptionFailed
    // to Self (HandleSubscribeAsync already logged the underlying cause). Swallow it so it doesn't dead-letter.
    Receive<SubscriptionFailed>(msg =>
        _log.Debug("DriverInstance {Id}: resubscribe reported failure: {Reason}", _driverInstanceId, msg.Reason));
    // A native alarm transition can race in while still (re)connecting (the driver's feed runs on its
    // own thread); drop it — the feed re-delivers active alarms once Connected. Trace only.
    Receive<NativeAlarmRaised>(_ =>
        _log.Debug("DriverInstance {Id}: native alarm arrived during connect — dropped (feed re-delivers)", _driverInstanceId));
    // A SubscribeAlarms self-tell (from Connected) can be overtaken by an already-queued disconnect into
    // this state; swallow it so it doesn't dead-letter — the next Connected entry re-subscribes.
    Receive<SubscribeAlarms>(_ => { });
    // Likewise the attempt-0 re-discovery self-tick (sent on Connected entry) can be overtaken by an
    // already-queued disconnect; swallow it — the next Connected entry re-kicks discovery.
    Receive<RediscoverTick>(_ => { });
    // A TriggerRediscovery arriving while not Connected is a deliberate no-op — the (re)connect path
    // re-runs discovery anyway. Swallow it so it stays a clean silent no-op (no Unhandled event).
    Receive<TriggerRediscovery>(_ => { });
    Receive<HealthPollTick>(_ => PublishHealthSnapshot());
}
/// <summary>
/// Behaviour while the driver is initialised and serving requests: in-place reconfiguration, writes,
/// alarm acknowledges, value and alarm subscriptions, post-connect discovery, and forwarding of driver
/// data changes / alarm transitions to the parent. A <see cref="DisconnectObserved"/> or
/// <see cref="ForceReconnect"/> tears the subscription down and moves the actor to <c>Reconnecting</c>.
/// </summary>
private void Connected()
{
    ReceiveAsync<ApplyDelta>(HandleApplyDeltaAsync);
    Receive<DisconnectObserved>(msg =>
    {
        _log.Warning("DriverInstance {Id}: disconnect observed ({Reason}); reconnecting",
            _driverInstanceId, msg.Reason);
        Timers.Cancel("rediscover");
        DetachSubscription();
        RecordFault();
        Become(Reconnecting);
        PublishHealthSnapshot();
    });
    Receive<ForceReconnect>(_ =>
    {
        // Operator-requested: same teardown as a disconnect, but no fault is recorded.
        _log.Info("DriverInstance {Id}: ForceReconnect requested by admin; re-entering Reconnecting", _driverInstanceId);
        Timers.Cancel("rediscover");
        DetachSubscription();
        Become(Reconnecting);
        PublishHealthSnapshot();
    });
    // Initialise results can still arrive here: overlapping attempts (a retry tick or an ApplyDelta
    // re-init started while an earlier attempt was in flight) each report back, and the generation guard
    // lives only in Connecting/Reconnecting. Once Connected, any such result belongs to a superseded
    // attempt, so drop it (trace only) instead of letting it surface as an Unhandled message.
    Receive<InitializeSucceeded>(msg =>
        _log.Debug("DriverInstance {Id}: stale initialize success (generation {Generation}) ignored — already connected",
            _driverInstanceId, msg.Generation));
    Receive<InitializeFailed>(msg =>
        _log.Debug("DriverInstance {Id}: stale initialize failure (generation {Generation}) ignored — already connected: {Reason}",
            _driverInstanceId, msg.Generation, msg.Reason));
    ReceiveAsync<RediscoverTick>(HandleRediscoverAsync);
    // The host asks for a fresh discovery pass after rebinding the driver to a new equipment. Cancel any
    // pending rediscover tick FIRST — mirroring ForceReconnect/DisconnectObserved — so a stale tick left
    // over from the prior loop can't fire alongside the freshly-kicked one, then re-kick the bounded loop
    // via StartDiscovery (honours RediscoverPolicy + the ITagDiscovery guard, tagged with the current
    // _initGeneration). Only handled here in Connected — non-Connected states no-op it. A stale tick
    // that still slips through (one already mid-async-handler) is benign: the parent dedups
    // DiscoveredNodesReady and node injection is idempotent — the Cancel just avoids the avoidable double
    // pass in the common case.
    Receive<TriggerRediscovery>(_ =>
    {
        Timers.Cancel("rediscover");
        StartDiscovery();
    });
    ReceiveAsync<WriteAttribute>(HandleWriteAsync);
    ReceiveAsync<RouteAlarmAck>(HandleAcknowledgeAsync);
    ReceiveAsync<Subscribe>(HandleSubscribeAsync);
    ReceiveAsync<Unsubscribe>(_ => UnsubscribeAsync());
    Receive<SetDesiredSubscriptions>(msg =>
    {
        StoreDesiredSubscriptions(msg);
        // Apply the new desired value set through the normal Subscribe/Unsubscribe handlers.
        if (_desiredRefs.Count > 0) Self.Tell(new Subscribe(_desiredRefs, _desiredInterval));
        else if (_subscriptionHandle is not null) Self.Tell(new Unsubscribe());
        // Native-alarm analogue: un-gate the IAlarmSource feed when alarm tags are (now) present. The
        // common live path — a deploy delivers SetDesiredSubscriptions while the driver is already
        // Connected — flows through HERE, so the alarm subscribe must happen on this message, not only
        // on Connected entry. Unlike the value path above, there is deliberately no unsubscribe-on-empty:
        // a removed alarm tag's condition node is torn down on the address-space rebuild and
        // DriverHostActor.ForwardNativeAlarm drops any transition whose ConditionId no longer maps, so a
        // lingering session-less alarm subscription can never surface a removed alarm.
        SubscribeDesiredAlarms();
    });
    ReceiveAsync<SubscribeAlarms>(HandleSubscribeAlarmsAsync);
    Receive<DataChangeForward>(OnDataChangeForward);
    // Native alarm transition marshaled onto the actor thread from the driver's OnAlarmEvent;
    // project it to the parent the same way DataChangeForward projects AttributeValuePublished.
    Receive<NativeAlarmRaised>(m => Context.Parent.Tell(new AttributeAlarmPublished(_driverInstanceId, m.Args)));
    // ResubscribeDesired self-Tells Subscribe; HandleSubscribeAsync replies SubscriptionEstablished to the
    // sender, which on the self-resubscribe path is Self. Swallow it (trace only) so it doesn't dead-letter.
    Receive<SubscriptionEstablished>(msg =>
        _log.Debug("DriverInstance {Id}: subscription established ({Count} refs, {Diag})",
            _driverInstanceId, msg.ReferenceCount, msg.DiagnosticId));
    // Symmetric to the SubscriptionEstablished swallow: a failed self-resubscribe replies SubscriptionFailed
    // to Self (HandleSubscribeAsync already logged the underlying cause). Swallow it so it doesn't dead-letter.
    Receive<SubscriptionFailed>(msg =>
        _log.Debug("DriverInstance {Id}: resubscribe reported failure: {Reason}", _driverInstanceId, msg.Reason));
    Receive<HealthPollTick>(_ => PublishHealthSnapshot());
}
/// <summary>
/// Behaviour after a disconnect, forced reconnect, or failed first initialise: a periodic
/// "retry-connect" timer re-runs the initialise with the current config until a generation-matched
/// <see cref="InitializeSucceeded"/> moves the actor back to <c>Connected</c>. Requests that cannot be
/// served are fast-failed, and messages queued while the actor was still <c>Connected</c> are swallowed
/// so they do not surface as Akka Unhandled messages.
/// </summary>
private void Reconnecting()
{
    Receive<RetryConnect>(_ => InitializeAsync(_currentConfigJson ?? "{}"));
    // Fast-fail writes while reconnecting (same reason as Connecting — avoids the 8s host Ask
    // timeout on an inbound write to a transiently-down driver). Synchronous Receive (#4a-instance).
    Receive<WriteAttribute>(_ =>
        Sender.Tell(new WriteAttributeResult(false, "driver not connected")));
    // Connected self-Tells Subscribe/Unsubscribe when the desired set changes; an already-queued
    // disconnect can move the actor here before they are processed. Fast-fail the Subscribe (an external
    // requester gets an immediate answer; a self-sent one comes back as the SubscriptionFailed swallowed
    // below) and ignore the Unsubscribe — the subscription was already detached on leaving Connected and
    // the desired set is re-applied on the next Connected entry.
    Receive<Subscribe>(_ =>
        Sender.Tell(new SubscriptionFailed("driver not connected")));
    Receive<Unsubscribe>(_ => { });
    // Data changes forwarded into the mailbox before the subscription was detached can still be queued;
    // drop them rather than publishing values from a connection that is known to be down.
    Receive<DataChangeForward>(_ => { });
    // A second disconnect report while already reconnecting adds nothing — the retry timer is running.
    Receive<DisconnectObserved>(msg =>
        _log.Debug("DriverInstance {Id}: disconnect observed while reconnecting ({Reason}) — ignored", _driverInstanceId, msg.Reason));
    // An ack arriving while reconnecting can't reach the upstream alarm system; drop it (fire-and-forget,
    // no reply — the OPC UA condition state already committed locally on the Part 9 ack).
    Receive<RouteAlarmAck>(_ =>
        _log.Debug("DriverInstance {Id}: alarm ack arrived during reconnect — dropped (driver not connected)", _driverInstanceId));
    Receive<ApplyDelta>(AdoptConfigDuringInit);
    Receive<InitializeSucceeded>(msg =>
    {
        // Result of a superseded attempt — a newer initialise is in flight; ignore.
        if (msg.Generation != _initGeneration) return;
        Timers.Cancel("retry-connect");
        _log.Info("DriverInstance {Id}: reconnected", _driverInstanceId);
        Become(Connected);
        PublishHealthSnapshot();
        ResubscribeDesired();
        AttachAlarmSource();
        SubscribeDesiredAlarms();
        StartDiscovery(); // re-run discovery on reconnect — keeps the injected tree fresh if the backend's capabilities changed
    });
    // A failure here is a no-op regardless of generation — the retry timer keeps trying the
    // current config; only a (generation-matched) InitializeSucceeded transitions state.
    Receive<InitializeFailed>(_ => { /* keep retrying via timer */ });
    Receive<SetDesiredSubscriptions>(StoreDesiredSubscriptions);
    Receive<ForceReconnect>(_ => { /* already reconnecting — no-op */ });
    // ResubscribeDesired self-Tells Subscribe; HandleSubscribeAsync replies SubscriptionEstablished to the
    // sender, which on the self-resubscribe path is Self. Swallow it (trace only) so it doesn't dead-letter.
    Receive<SubscriptionEstablished>(msg =>
        _log.Debug("DriverInstance {Id}: subscription established ({Count} refs, {Diag})",
            _driverInstanceId, msg.ReferenceCount, msg.DiagnosticId));
    // Symmetric to the SubscriptionEstablished swallow: a failed self-resubscribe replies SubscriptionFailed
    // to Self (HandleSubscribeAsync already logged the underlying cause). Swallow it so it doesn't dead-letter.
    Receive<SubscriptionFailed>(msg =>
        _log.Debug("DriverInstance {Id}: resubscribe reported failure: {Reason}", _driverInstanceId, msg.Reason));
    // A native alarm transition can race in while still reconnecting (the driver's feed runs on its
    // own thread); drop it — the feed re-delivers active alarms once Connected. Trace only.
    Receive<NativeAlarmRaised>(_ =>
        _log.Debug("DriverInstance {Id}: native alarm arrived during reconnect — dropped (feed re-delivers)", _driverInstanceId));
    // A SubscribeAlarms self-tell (from Connected) can be overtaken by an already-queued disconnect into
    // this state; swallow it so it doesn't dead-letter — the next Connected entry re-subscribes.
    Receive<SubscribeAlarms>(_ => { });
    // Likewise the attempt-0 re-discovery self-tick (sent on Connected entry) can be overtaken by an
    // already-queued disconnect; swallow it — the next Connected entry re-kicks discovery.
    Receive<RediscoverTick>(_ => { });
    // A TriggerRediscovery arriving while not Connected is a deliberate no-op — the (re)connect path
    // re-runs discovery anyway. Swallow it so it stays a clean silent no-op (no Unhandled event).
    Receive<TriggerRediscovery>(_ => { });
    Receive<HealthPollTick>(_ => PublishHealthSnapshot());
    // Start (or restart) the retry loop every time this behaviour is installed.
    Timers.StartPeriodicTimer("retry-connect", RetryConnect.Instance, _reconnectInterval);
}
/// <summary>
/// Starts a fire-and-forget initialise attempt against the driver. The config is remembered as the
/// current one, the attempt is tagged with a freshly incremented generation, and the outcome is posted
/// back to this actor as <see cref="InitializeSucceeded"/> or <see cref="InitializeFailed"/> carrying
/// that generation. Returns immediately; it does not wait for the attempt to finish.
/// </summary>
/// <param name="driverConfigJson">The driver config JSON to initialise with.</param>
private void InitializeAsync(string driverConfigJson)
{
    _currentConfigJson = driverConfigJson;
    _initGeneration++;
    var attempt = _initGeneration;

    // Capture Self on the actor thread; the actor context is not available on the worker thread.
    var actor = Self;

    async Task RunAttemptAsync()
    {
        object outcome;
        try
        {
            await _driver.InitializeAsync(driverConfigJson, CancellationToken.None);
            outcome = new InitializeSucceeded(attempt);
        }
        catch (Exception ex)
        {
            outcome = new InitializeFailed(ex.Message, attempt);
        }

        actor.Tell(outcome);
    }

    _ = Task.Run(() => RunAttemptAsync());
}
/// <summary>
/// Handles an <see cref="ApplyDelta"/> that arrives in Connecting/Reconnecting: the new config is
/// adopted by starting a fresh initialise attempt right away. <see cref="InitializeAsync"/> replaces
/// <c>_currentConfigJson</c> and bumps the generation, so the result of any attempt still in flight with
/// the old config is dropped. The actor does not change state here — the new attempt's result drives
/// the next transition. In Reconnecting the retry timer keeps running, so a failed immediate attempt
/// is followed by further retries with the new config (overlapping attempts are deduped by the
/// generation guard). The sender is told the config was adopted.
/// </summary>
/// <param name="msg">The delta carrying the new driver config and the correlation to echo back.</param>
private void AdoptConfigDuringInit(ApplyDelta msg)
{
    var requester = Sender;

    _log.Info("DriverInstance {Id}: ApplyDelta during (re)connect — adopting new config, re-initialising now",
        _driverInstanceId);
    InitializeAsync(msg.DriverConfigJson);

    var acknowledgement = new ApplyResult(true, "config adopted; reinitializing", msg.Correlation);
    requester.Tell(acknowledgement);
}
/// <summary>
/// Applies a new config to an already-connected driver by re-initialising it in place. On success the
/// config becomes the current one; either way the original sender receives an <see cref="ApplyResult"/>
/// echoing the request's correlation (the exception message is reported as the reason on failure).
/// </summary>
/// <param name="msg">The delta carrying the new driver config and the correlation to echo back.</param>
private async Task HandleApplyDeltaAsync(ApplyDelta msg)
{
    // Capture the sender before the first await.
    var requester = Sender;

    ApplyResult outcome;
    try
    {
        await _driver.ReinitializeAsync(msg.DriverConfigJson, CancellationToken.None);
        _currentConfigJson = msg.DriverConfigJson;
        outcome = new ApplyResult(true, null, msg.Correlation);
    }
    catch (Exception ex)
    {
        outcome = new ApplyResult(false, ex.Message, msg.Correlation);
    }

    requester.Tell(outcome);
}
/// <summary>
/// Performs a single-tag write through the driver's <see cref="IWritable"/> capability and answers the
/// original sender with a <see cref="WriteAttributeResult"/>. The write succeeds only when the driver
/// returns exactly one result with a good status; any other outcome is reported as a failure with the
/// first status code (or <c>0xFFFFFFFF</c> when no result came back), "write timeout" on cancellation,
/// or the exception message.
/// </summary>
/// <param name="msg">The tag id and value to write.</param>
private async Task HandleWriteAsync(WriteAttribute msg)
{
    // Capture the sender before the first await.
    var requester = Sender;

    if (_driver is not IWritable writable)
    {
        requester.Tell(new WriteAttributeResult(false, "Driver does not implement IWritable"));
        return;
    }

    var batch = new[] { new WriteRequest(msg.TagId, msg.Value) };

    // Bound the write to 5 seconds so a hung backend can't pin this actor forever — decision #44/#45
    // keeps retry off by default, but a stalled call still needs an answer.
    using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));

    WriteAttributeResult reply;
    try
    {
        var outcomes = await writable.WriteAsync(batch, timeout.Token);
        var succeeded = outcomes is { Count: 1 } && IsGoodStatus(outcomes[0].StatusCode);
        if (succeeded)
        {
            reply = new WriteAttributeResult(true, null);
        }
        else
        {
            // Report the first status when there is one; all-ones marks "no result returned".
            var code = outcomes is { Count: > 0 } ? outcomes[0].StatusCode : 0xFFFFFFFF;
            reply = new WriteAttributeResult(false, $"StatusCode=0x{code:X8}");
        }
    }
    catch (OperationCanceledException)
    {
        reply = new WriteAttributeResult(false, "write timeout");
    }
    catch (Exception ex)
    {
        reply = new WriteAttributeResult(false, ex.Message);
    }

    requester.Tell(reply);
}
/// <summary>
/// Passes an inbound native-condition acknowledge (routed by <see cref="DriverHostActor"/> from a
/// resolved condition NodeId) on to the driver's <see cref="IAlarmSource.AcknowledgeAsync"/>.
/// </summary>
/// <remarks>
/// <para>The driver correlates on <see cref="AlarmAcknowledgeRequest.ConditionId"/> (= the authored alarm
/// full-reference); <see cref="AlarmAcknowledgeRequest.SourceNodeId"/> is filled with the same reference
/// because the driver's ack path keys on ConditionId.</para>
/// <para>The call is bounded to 5s so a hung backend can't pin this actor. It is fire-and-forget: the
/// OPC UA Part 9 ack already committed the local condition state and
/// <see cref="IAlarmSource.AcknowledgeAsync"/> returns no per-condition status, so no reply is sent. A
/// failure is logged and dropped — the local condition stays Acknowledged regardless.</para>
/// </remarks>
/// <param name="msg">The condition id, operator comment and operator user of the acknowledge.</param>
private async Task HandleAcknowledgeAsync(RouteAlarmAck msg)
{
    var alarmSource = _driver as IAlarmSource;
    if (alarmSource is null)
    {
        _log.Warning("DriverInstance {Id}: alarm ack dropped — driver does not implement IAlarmSource", _driverInstanceId);
        return;
    }

    var conditionId = msg.ConditionId;
    var acknowledge = new AlarmAcknowledgeRequest(
        SourceNodeId: conditionId,
        ConditionId: conditionId,
        Comment: msg.Comment,
        OperatorUser: msg.OperatorUser);
    var batch = new[] { acknowledge };

    using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
    try
    {
        await alarmSource.AcknowledgeAsync(batch, timeout.Token);
        _log.Info("DriverInstance {Id}: acknowledged native condition {Cond} by {User}",
            _driverInstanceId, conditionId, msg.OperatorUser);
    }
    catch (Exception ex)
    {
        _log.Warning(ex, "DriverInstance {Id}: native-alarm acknowledge of {Cond} failed",
            _driverInstanceId, conditionId);
    }
}
/// <summary>
/// Establishes the driver data-change subscription described by <paramref name="msg"/> and replies to the
/// sender with <see cref="SubscriptionEstablished"/> on success or <see cref="SubscriptionFailed"/> on
/// failure. A subscription that already exists is dropped first (subscribe-twice). The driver's
/// subscribe call is bounded to 10s.
/// </summary>
/// <remarks>
/// On failure only the wiring this handler installed is rolled back: the <c>OnDataChange</c> handler is
/// removed and the data subscription handle is cleared. The native-alarm handler and alarm subscription
/// handle are deliberately left alone — this handler never attached them, and tearing them down through
/// <see cref="DetachSubscription"/> would stop alarm events reaching the actor just because a data
/// subscribe failed.
/// </remarks>
/// <param name="msg">The full references to subscribe to and the publishing interval.</param>
private async Task HandleSubscribeAsync(Subscribe msg)
{
    // Capture Sender/Self BEFORE any await. The re-subscribe path below awaits
    // UnsubscribeAsync, and a real async backend can resume the continuation off Akka's
    // ActorContext — reading raw Sender/Self/Context past that point throws
    // NotSupportedException ("no active ActorContext"). Keep ConfigureAwait off the awaits
    // in this handler so continuations resume on the actor context.
    var replyTo = Sender;
    var self = Self;
    if (_driver is not ISubscribable subscribable)
    {
        replyTo.Tell(new SubscriptionFailed("Driver does not implement ISubscribable"));
        return;
    }
    if (_subscriptionHandle is not null)
    {
        // Subscribe-twice — drop the prior subscription before establishing the new one.
        // NOTE(review): UnsubscribeAsync ends in DetachSubscription, which also detaches the
        // native-alarm handler and drops the alarm subscription handle; nothing in this handler
        // restores them. Confirm that is intended for a re-subscribe while still connected.
        await UnsubscribeAsync();
    }
    try
    {
        // Attach the handler before subscribing so no change raised during SubscribeAsync is missed.
        _dataChangeHandler = (_, args) => self.Tell(new DataChangeForward(args.FullReference, args.Snapshot));
        subscribable.OnDataChange += _dataChangeHandler;
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        _subscriptionHandle = await subscribable
            .SubscribeAsync(msg.FullReferences, msg.PublishingInterval, cts.Token);
        replyTo.Tell(new SubscriptionEstablished(_subscriptionHandle.DiagnosticId, msg.FullReferences.Count));
        _log.Info("DriverInstance {Id}: subscribed to {Count} refs ({Diag})",
            _driverInstanceId, msg.FullReferences.Count, _subscriptionHandle.DiagnosticId);
    }
    catch (Exception ex)
    {
        // Roll back the data-change wiring only; the alarm wiring is independent of this subscribe
        // attempt and must survive its failure.
        if (_dataChangeHandler is not null)
        {
            subscribable.OnDataChange -= _dataChangeHandler;
        }
        _dataChangeHandler = null;
        _subscriptionHandle = null;
        _log.Warning(ex, "DriverInstance {Id}: subscribe failed", _driverInstanceId);
        replyTo.Tell(new SubscriptionFailed(ex.Message));
    }
}
/// <summary>
/// Drops the current driver subscription, if there is one, and always finishes by running
/// <see cref="DetachSubscription"/>. The driver's unsubscribe call is bounded to 5s; an exception from
/// it is logged and swallowed so the local teardown still happens.
/// </summary>
private async Task UnsubscribeAsync()
{
    var handle = _subscriptionHandle;
    if (handle is null || _driver is not ISubscribable subscribable)
    {
        // Nothing to unsubscribe on the driver side — only the local wiring needs clearing.
        DetachSubscription();
        return;
    }

    try
    {
        using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        await subscribable.UnsubscribeAsync(handle, timeout.Token);
    }
    catch (Exception ex)
    {
        _log.Warning(ex, "DriverInstance {Id}: unsubscribe threw (continuing)", _driverInstanceId);
    }
    finally
    {
        DetachSubscription();
    }
}
/// <summary>
/// Removes the data-change handler from the driver, clears the subscription handle, and then tears down
/// the native-alarm wiring via <see cref="DetachAlarmSource"/>.
/// </summary>
/// <remarks>
/// Called from the Unsubscribe path, on PostStop, and on Connected → Reconnecting transitions so a stale
/// handler doesn't push data-change / alarm events to an actor that has lost its driver connection.
/// </remarks>
private void DetachSubscription()
{
    var handler = _dataChangeHandler;
    if (handler is not null && _driver is ISubscribable subscribable)
    {
        subscribable.OnDataChange -= handler;
    }

    _subscriptionHandle = null;
    _dataChangeHandler = null;

    DetachAlarmSource();
}
/// <summary>
/// Hooks the driver's native alarm event when the driver is an <see cref="IAlarmSource"/>, forwarding
/// each event to this actor as a <see cref="NativeAlarmRaised"/> message so it is handled on the actor
/// thread. Idempotent — does nothing when a handler is already attached. Mirrors the OnDataChange attach.
/// </summary>
private void AttachAlarmSource()
{
    if (_alarmEventHandler is not null)
    {
        return; // already attached
    }

    if (_driver is not IAlarmSource alarmSource)
    {
        return; // driver has no native alarm feed
    }

    // Capture Self now: the event is raised by the driver, outside any actor context.
    var actor = Self;
    _alarmEventHandler = (_, alarmEvent) => actor.Tell(new NativeAlarmRaised(alarmEvent));
    alarmSource.OnAlarmEvent += _alarmEventHandler;
}
/// <summary>
/// Counterpart of <see cref="AttachAlarmSource"/>: unhooks the native alarm event handler and forgets the
/// alarm subscription handle. Called from <see cref="DetachSubscription"/> and PostStop so a stale
/// handler never pushes to a disconnected actor.
/// </summary>
private void DetachAlarmSource()
{
    var handler = _alarmEventHandler;
    if (handler is not null && _driver is IAlarmSource alarmSource)
    {
        alarmSource.OnAlarmEvent -= handler;
    }

    _alarmEventHandler = null;

    // Forget our handle so the next Connected entry re-subscribes; the desired alarm refs themselves
    // persist across reconnects. NOTE: the driver-side subscription is NOT torn down here.
    //  - Session-bound IAlarmSource: the old subscription dies with the session, so nothing accumulates.
    //  - Session-less feed (GalaxyDriver's always-on central monitor): the subscription survives an
    //    in-place reconnect, making the re-subscribe additive — but the driver now collapses to a single
    //    live handle on each SubscribeAlarmsAsync (GalaxyDriver.SubscribeAlarmsAsync clears the set
    //    before adding), so handles no longer accumulate across reconnects.
    // The gate (Count > 0) and the one-shot fan-out are unchanged.
    _alarmSubscriptionHandle = null;
}
/// <summary>
/// Requests the native-alarm subscription that un-gates an <see cref="IAlarmSource"/> driver's feed —
/// the driver suppresses <see cref="IAlarmSource.OnAlarmEvent"/> until at least one alarm subscription
/// exists. The work itself is done asynchronously: this method only self-sends
/// <see cref="SubscribeAlarms"/>, which the Connected behaviour handles.
/// </summary>
/// <remarks>
/// Idempotent: nothing is sent unless the driver is an <see cref="IAlarmSource"/>, at least one alarm
/// ref is desired, and no alarm subscription handle is held yet.
/// </remarks>
private void SubscribeDesiredAlarms()
{
    if (_driver is not IAlarmSource)
    {
        return;
    }

    if (_desiredAlarmRefs.Count == 0)
    {
        return;
    }

    if (_alarmSubscriptionHandle is not null)
    {
        return;
    }

    Self.Tell(new SubscribeAlarms());
}
/// <summary>
/// Registers the alarm subscription that un-gates the driver's native feed by calling
/// <see cref="IAlarmSource.SubscribeAlarmsAsync"/> (bounded to 10s) and caching the returned handle.
/// </summary>
/// <remarks>
/// The guard is re-evaluated here because the message was queued: the desired set may have been cleared,
/// or an earlier <see cref="SubscribeAlarms"/> may already have established a handle. A failure is logged
/// and retried on the next Connected entry — until then the feed simply stays gated.
/// </remarks>
private async Task HandleSubscribeAlarmsAsync(SubscribeAlarms _)
{
    var alreadySubscribed = _alarmSubscriptionHandle is not null;
    if (alreadySubscribed || _desiredAlarmRefs.Count == 0 || _driver is not IAlarmSource alarmSource)
    {
        return;
    }

    var wanted = _desiredAlarmRefs;
    using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(10));
    try
    {
        var handle = await alarmSource.SubscribeAlarmsAsync(wanted, timeout.Token);
        _alarmSubscriptionHandle = handle;
        _log.Info("DriverInstance {Id}: native-alarm subscription established for {Count} alarm ref(s) ({Diag})",
            _driverInstanceId, wanted.Count, handle.DiagnosticId);
    }
    catch (Exception ex)
    {
        _log.Warning(ex, "DriverInstance {Id}: native-alarm subscription failed — feed stays gated until reconnect",
            _driverInstanceId);
    }
}
/// <summary>
/// Starts the bounded post-connect re-discovery loop on a <c>Connected</c> entry by self-sending the
/// first <see cref="RediscoverTick"/>. The tick is tagged with the current init generation so a tick
/// that outlives a reconnect is rejected by the generation guard in <see cref="HandleRediscoverAsync"/>.
/// </summary>
/// <remarks>
/// <para>Does nothing when the driver does not expose <see cref="ITagDiscovery"/> — there is nothing to
/// inject in that case.</para>
/// <para>The driver's <see cref="ITagDiscovery.RediscoverPolicy"/> is honoured: <c>Never</c> opts out
/// entirely (no tick is scheduled); <c>Once</c> runs a single pass (the loop stops after the first
/// publish in <see cref="HandleRediscoverAsync"/>); <c>UntilStable</c> retries on each (re)connect,
/// bounded by stop-on-stable (the discovered-set signature repeats) plus the attempt cap.</para>
/// </remarks>
private void StartDiscovery()
{
    var discovery = _driver as ITagDiscovery;
    if (discovery is null)
    {
        return; // driver doesn't expose discovery — nothing to inject
    }

    if (discovery.RediscoverPolicy != DiscoveryRediscoverPolicy.Never)
    {
        Self.Tell(new RediscoverTick(_initGeneration, Attempt: 0, PreviousSignature: string.Empty));
        return;
    }

    // Driver opts out of post-connect discovery — don't even schedule the first tick.
    _log.Debug("DriverInstance {Id}: RediscoverPolicy=Never — skipping post-connect discovery", _driverInstanceId);
}
/// <summary>
/// Executes a single post-connect discovery pass. The driver's streamed FixedTree is captured through a
/// <see cref="CapturingAddressSpaceBuilder"/> and sent to the parent as <see cref="DiscoveredNodesReady"/>
/// (empty or duplicate sets are fine — the parent dedups and injection is idempotent).
/// </summary>
/// <remarks>
/// <para>Further passes are scheduled every <see cref="_rediscoverInterval"/> until either the non-empty
/// discovered SET has STABILISED (its ordered-distinct full-reference signature repeats — robust for
/// incremental/paged browsers where a count alone could falsely settle a partial tree) or the
/// <see cref="_rediscoverMaxAttempts"/> cap is reached, whichever happens first. An empty result keeps
/// the loop going because a FOCAS-style FixedTree cache may still be populating.</para>
/// <para>Limitation: this assumes a driver's discovered set only GROWS toward a stable shape (true for
/// FOCAS — its FixedTree appears once, and on the wonder deploy the driver-config <c>_options.Tags</c> is
/// empty so the set is 0 until the cache populates). A driver that emits an initial non-empty set and
/// later grows could stop early on a transient repeat; acceptable for current scope.</para>
/// </remarks>
/// <param name="tick">The pass to run: init generation, zero-based attempt number and the previous
/// pass's signature.</param>
private async Task HandleRediscoverAsync(RediscoverTick tick)
{
    // Drop stale ticks (a reconnect superseded this pass) and drivers without discovery.
    if (tick.Generation != _initGeneration || _driver is not ITagDiscovery discovery)
    {
        return;
    }

    // A failed pass is treated as an empty one, so start from empty and only replace on success.
    IReadOnlyList<DiscoveredNode> discovered = Array.Empty<DiscoveredNode>();
    try
    {
        var capture = new CapturingAddressSpaceBuilder();
        // Bound the browse — ReceiveAsync suspends the mailbox for the whole handler, so an unbounded
        // DiscoverAsync would block DisconnectObserved / ForceReconnect / writes / health-poll behind it.
        using var browseTimeout = new CancellationTokenSource(_rediscoverDiscoverTimeout);
        // NO ConfigureAwait(false): a genuinely-async DiscoverAsync (Galaxy / OpcUaClient / TwinCAT) must
        // resume on the actor task scheduler so the Context.Parent.Tell + Timers calls below run with a
        // live ActorContext. ConfigureAwait(false) would resume off-context and throw
        // NotSupportedException("no active ActorContext") — see the same warning on HandleSubscribeAsync.
        await discovery.DiscoverAsync(capture, browseTimeout.Token);
        discovered = capture.Nodes.ToArray(); // immutable snapshot — never hand the builder's live list across actors
    }
    catch (Exception ex)
    {
        _log.Warning(ex, "DriverInstance {Id}: discovery pass {Attempt} failed; will retry", _driverInstanceId, tick.Attempt);
    }

    // Belt-and-suspenders: under ReceiveAsync the mailbox is suspended for the whole handler, so
    // _initGeneration cannot change mid-await — the pre-await guard + Timers.Cancel("rediscover") on
    // disconnect + single-timer key reuse are the primary protections. Re-checked in case that changes.
    if (tick.Generation != _initGeneration)
    {
        return;
    }

    Context.Parent.Tell(new DiscoveredNodesReady(_driverInstanceId, discovered));

    // Honour the driver's re-discovery policy. A Once driver runs a single post-connect pass per
    // (re)connect regardless of whether DiscoverAsync is synchronous or async — one published pass is
    // complete, so no further tick is scheduled. (Never does not reach this point — StartDiscovery
    // returns before the first tick.) UntilStable continues to the stop-on-stable + attempt-cap logic.
    if (discovery.RediscoverPolicy == DiscoveryRediscoverPolicy.Once)
    {
        _log.Debug("DriverInstance {Id}: RediscoverPolicy=Once — single discovery pass, not scheduling another", _driverInstanceId);
        return;
    }

    // Signature of the discovered set: distinct full references in ordinal order. The first tick
    // carries "" as its previous signature, so it can never count as a repeat of a non-empty set.
    var orderedRefs = discovered
        .Select(node => node.FullReference)
        .Distinct(StringComparer.Ordinal)
        .OrderBy(reference => reference, StringComparer.Ordinal);
    var signature = string.Join('\u0001', orderedRefs);

    var passesRun = tick.Attempt + 1;
    var settled = discovered.Count > 0 && string.Equals(signature, tick.PreviousSignature, StringComparison.Ordinal);
    var capReached = passesRun >= _rediscoverMaxAttempts;

    if (settled || capReached)
    {
        _log.Debug("DriverInstance {Id}: discovery settled after {Attempt} pass(es), {Count} node(s)", _driverInstanceId, passesRun, discovered.Count);
        return;
    }

    // Not settled yet (still empty, or the set changed) and attempts remain — schedule the next pass.
    Timers.StartSingleTimer("rediscover", new RediscoverTick(tick.Generation, passesRun, signature), _rediscoverInterval);
}
/// <summary>
/// Remembers the subscription set the host wants (data references, publishing interval and alarm
/// references) without touching the live subscription. The stored set is (re)applied by
/// <see cref="ResubscribeDesired"/> on the next <c>Connected</c> entry.
/// </summary>
/// <param name="msg">The desired set; a null alarm-reference list is stored as an empty one.</param>
private void StoreDesiredSubscriptions(SetDesiredSubscriptions msg)
{
    // Normalise a missing alarm list to empty so later Count checks never see null.
    _desiredAlarmRefs = msg.AlarmReferences ?? Array.Empty<string>();
    _desiredInterval = msg.PublishingInterval;
    _desiredRefs = msg.FullReferences;
}
/// <summary>
/// Restores the desired data subscription after a (re)connect by self-sending the one-shot
/// <see cref="Subscribe"/> that the Connected behaviour already handles (that handler drops any prior
/// handle first). Values therefore resume streaming after a reconnect or redeploy without host
/// involvement. Nothing is sent when no references are desired.
/// </summary>
private void ResubscribeDesired()
{
    if (_desiredRefs.Count == 0)
    {
        return;
    }

    var request = new Subscribe(_desiredRefs, _desiredInterval);
    Self.Tell(request);
}
/// <summary>
/// Converts a forwarded driver data change into an <see cref="AttributeValuePublished"/> and sends it to
/// the parent. Quality is derived from the snapshot's status code via <see cref="QualityFromStatus"/>;
/// the timestamp is the source timestamp, falling back to the server timestamp when the source one is
/// absent.
/// </summary>
/// <param name="msg">The full reference that changed and its value snapshot.</param>
private void OnDataChangeForward(DataChangeForward msg)
{
    var snapshot = msg.Snapshot;
    var timestamp = snapshot.SourceTimestampUtc ?? snapshot.ServerTimestampUtc;
    var quality = QualityFromStatus(snapshot.StatusCode);

    var published = new AttributeValuePublished(
        _driverInstanceId,
        msg.FullReference,
        snapshot.Value,
        quality,
        timestamp);
    Context.Parent.Tell(published);
}
/// <summary>
/// Maps an OPC UA status code onto the 3-state <see cref="OpcUaQuality"/> projection the publish actor
/// consumes. Only the severity bits (top 2) are inspected: 00 = Good, 01 = Uncertain, 10/11 = Bad.
/// </summary>
/// <param name="statusCode">The raw 32-bit OPC UA status code.</param>
/// <returns>The quality matching the status code's severity.</returns>
private static OpcUaQuality QualityFromStatus(uint statusCode)
{
    const uint SeverityMask = 0xC0000000u;  // top two bits
    const uint UncertainBits = 0x40000000u; // severity 01

    var severityBits = statusCode & SeverityMask;
    if (severityBits == 0u)
    {
        return OpcUaQuality.Good;
    }

    if (severityBits == UncertainBits)
    {
        return OpcUaQuality.Uncertain;
    }

    // Severity 10 and 11 both project to Bad.
    return OpcUaQuality.Bad;
}
/// <summary>True when the status code's severity bits (top 2) are 00, i.e. the status is Good.</summary>
/// <param name="statusCode">The raw 32-bit OPC UA status code.</param>
private static bool IsGoodStatus(uint statusCode) => (statusCode & 0xC0000000u) == 0u;
/// <summary>
/// Notes one transition into a Faulted / error state by appending the current UTC time to the queue
/// that backs the 5-minute sliding fault counter (see <see cref="ErrorCount5Min"/>).
/// </summary>
private void RecordFault() => _faultTimestamps.Enqueue(DateTime.UtcNow);
/// <summary>
/// Counts the fault transitions recorded during the last 5 minutes. Entries older than the window are
/// removed from the front of the queue as a side effect, so the queue only ever holds in-window faults
/// after this call.
/// </summary>
/// <returns>The number of recorded faults that are not older than 5 minutes.</returns>
private int ErrorCount5Min()
{
    var windowStart = DateTime.UtcNow.AddMinutes(-5);

    // Entries are appended with the time of recording, so expired ones sit at the head of the queue.
    while (_faultTimestamps.TryPeek(out var oldest) && oldest < windowStart)
    {
        _faultTimestamps.Dequeue();
    }

    return _faultTimestamps.Count;
}
/// <summary>
/// Polls <see cref="IDriver.GetHealth"/> and forwards the snapshot to the health publisher.
/// Called on every observable state change and by the periodic <see cref="HealthPollTick"/>
/// so the AdminUI snapshot store is warmed up for newly-joined SignalR clients.
/// Deduplicates: if the resulting (state, lastSuccess, lastError, errorCount) tuple matches
/// the last publish, this call is a no-op. Stops flood-publishing identical Healthy snapshots
/// every 30s when nothing has changed. Newly-joined SignalR clients still get the current
/// snapshot via <c>DriverStatusHub.JoinDriver</c> which reads the store directly.
/// </summary>
/// <remarks>
/// The dedup fingerprint is recorded only after the publisher call has returned. If the publish throws,
/// the fingerprint is left untouched, so the next call with the same snapshot is attempted again instead
/// of being mistaken for an already-published duplicate. Any exception (from the driver or the publisher)
/// is logged and swallowed — health publishing is best-effort.
/// </remarks>
private void PublishHealthSnapshot()
{
    try
    {
        var health = _driver.GetHealth();
        var errorCount = ErrorCount5Min();
        var fingerprint = (health.State, health.LastSuccessfulRead, health.LastError, errorCount);
        if (_lastPublishedFingerprint is { } prev && prev.Equals(fingerprint))
            return;
        _healthPublisher.Publish(_clusterId, _driverInstanceId, health, errorCount);
        // Only now is the snapshot actually published; recording the fingerprint earlier would make a
        // failed publish look like a success and suppress every identical retry.
        _lastPublishedFingerprint = fingerprint;
    }
    catch (Exception ex)
    {
        _log.Warning(ex, "DriverInstance {Id}: GetHealth threw during health publish; skipping", _driverInstanceId);
    }
}
/// <summary>
/// Fingerprint (driver state, last successful read time, last error text, 5-minute error count) of the
/// most recent health snapshot recorded by <see cref="PublishHealthSnapshot"/>; null until the first one.
/// Each new snapshot is compared against it so an unchanged snapshot is not published again.
/// </summary>
private (DriverState State, DateTime? LastSuccess, string? LastError, int ErrorCount)? _lastPublishedFingerprint;
/// <inheritdoc />
/// <remarks>
/// Teardown order: detach the data-change / alarm handlers, shut the driver down (blocking until
/// <see cref="IDriver.ShutdownAsync"/> completes; a failure is logged and ignored), then record a
/// <c>stop</c> lifecycle telemetry event tagged with the driver type.
/// </remarks>
protected override void PostStop()
{
    DetachSubscription();

    try
    {
        // PostStop is synchronous, so the async shutdown is waited on here.
        _driver.ShutdownAsync(CancellationToken.None).GetAwaiter().GetResult();
    }
    catch (Exception ex)
    {
        _log.Warning(ex, "DriverInstance {Id}: ShutdownAsync threw on PostStop", _driverInstanceId);
    }

    var eventTag = new KeyValuePair<string, object?>("event", "stop");
    var driverTypeTag = new KeyValuePair<string, object?>("driver_type", _driver.DriverType);
    OtOpcUaTelemetry.DriverInstanceLifecycle.Add(1, eventTag, driverTypeTag);
}
}