950 lines
54 KiB
C#
950 lines
54 KiB
C#
using Akka.Actor;
|
||
using Akka.Event;
|
||
using ZB.MOM.WW.OtOpcUa.Commons.Observability;
|
||
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
||
using ZB.MOM.WW.OtOpcUa.Commons.Types;
|
||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||
|
||
namespace ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
|
||
|
||
// private timer key type — file-scoped so the name stays unique per-file
|
||
/// <summary>
/// Self-message delivered by the periodic "health-poll" timer started in <c>PreStart</c>; every actor state
/// answers it by re-publishing the health snapshot.
/// </summary>
file sealed class HealthPollTick
{
    /// <summary>The one shared tick instance the timer re-sends on every interval.</summary>
    public static HealthPollTick Instance { get; } = new HealthPollTick();

    // Singleton — callers always use Instance.
    private HealthPollTick()
    {
    }
}
|
||
|
||
/// <summary>
/// Akka wrapper for a single <see cref="IDriver"/> instance. States:
///
/// <list type="bullet">
/// <item><c>Stubbed</c> — DEV-STUB mode (see <see cref="ShouldStub"/>); accepts the standard messages and replies
/// with deterministic "stubbed" results without touching the driver.</item>
/// <item><c>Connecting</c> — initial state; calling <see cref="IDriver.InitializeAsync"/>.</item>
/// <item><c>Connected</c> — initialised; serving ApplyDelta, write, subscribe, alarm-ack and re-discovery requests.</item>
/// <item><c>Reconnecting</c> — init failed or a disconnect/ForceReconnect was observed; Initialize is retried on a periodic timer.</item>
/// <item><c>Failed</c> — terminal until parent restarts the actor.</item>
/// </list>
///
/// The engine wiring originally staged for follow-up F7 — data-change → <see cref="AttributeValuePublished"/>
/// publishes, ApplyDelta-driven Reinitialize, and per-tag write Asks — is now handled in the <c>Connected</c>
/// state, on top of the state machine the Phase 6 control-plane integration tests target.
/// </summary>
|
||
public sealed class DriverInstanceActor : ReceiveActor, IWithTimers
|
||
{
|
||
/// <summary>Fallback cap on post-connect re-discovery passes when the caller does not supply one.</summary>
public const int DefaultRediscoverMaxAttempts = 15;

/// <summary>Fallback period of the <c>Reconnecting</c> retry timer when the caller does not supply one.</summary>
public static readonly TimeSpan DefaultReconnectInterval = TimeSpan.FromSeconds(10);

/// <summary>Fallback gap between bounded post-connect re-discovery passes.</summary>
public static readonly TimeSpan DefaultRediscoverInterval = TimeSpan.FromSeconds(2);

/// <summary>Fallback per-pass timeout for <see cref="ITagDiscovery.DiscoverAsync"/> during bounded post-connect
/// re-discovery; it bounds how long the mailbox can stay suspended on one pass.</summary>
public static readonly TimeSpan DefaultRediscoverDiscoverTimeout = TimeSpan.FromSeconds(30);
|
||
|
||
/// <summary>Asks a <c>Connecting</c> actor to initialise the driver with <c>DriverConfigJson</c>.</summary>
public sealed record InitializeRequested(string DriverConfigJson);

/// <summary>Self-message: the init attempt tagged <c>Generation</c> completed without throwing.</summary>
public sealed record InitializeSucceeded(int Generation);

/// <summary>Self-message: the init attempt tagged <c>Generation</c> threw; <c>Reason</c> is the exception message.</summary>
public sealed record InitializeFailed(string Reason, int Generation);

/// <summary>Reports that the driver's transport dropped; moves a <c>Connected</c> actor into <c>Reconnecting</c>.</summary>
public sealed record DisconnectObserved(string Reason);

/// <summary>Delivers a new driver config: reinitialised in place when <c>Connected</c>, adopted immediately
/// (with a fresh init) while connecting or reconnecting.</summary>
public sealed record ApplyDelta(string DriverConfigJson, CorrelationId Correlation);

/// <summary>Reply to <see cref="ApplyDelta"/>, echoing its correlation id.</summary>
public sealed record ApplyResult(bool Success, string? Reason, CorrelationId Correlation);

/// <summary>Requests a write of <c>Value</c> to tag <c>TagId</c>; always answered with a <see cref="WriteAttributeResult"/>.</summary>
public sealed record WriteAttribute(string TagId, object Value);

/// <summary>Reply to <see cref="WriteAttribute"/>; <c>Reason</c> describes a failure, or is "stubbed" in DEV-STUB mode.</summary>
public sealed record WriteAttributeResult(bool Success, string? Reason);

/// <summary>
/// Forwards an OPC UA client's Acknowledge of a NATIVE Part 9 condition to this driver. <see cref="DriverHostActor"/>
/// resolves the condition NodeId to this driver through its alarm inverse map and sends this message; the actor
/// hands it to <see cref="IAlarmSource.AcknowledgeAsync"/> together with the authored alarm full-reference
/// (= the driver's <c>ConditionId</c>/AlarmFullReference) and the authenticated principal.
/// Unlike <see cref="WriteAttribute"/> this is fire-and-forget: <see cref="IAlarmSource.AcknowledgeAsync"/> yields
/// no per-condition status, and the Part 9 ack has already committed the local condition state, so nothing is replied.
/// </summary>
/// <param name="ConditionId">Authored alarm full-reference the driver correlates the ack on
/// (= the equipment tag's <c>FullName</c>/AlarmFullReference).</param>
/// <param name="Comment">Operator comment forwarded to the upstream alarm system, or null when absent.</param>
/// <param name="OperatorUser">Authenticated principal performing the acknowledge.</param>
public sealed record RouteAlarmAck(string ConditionId, string? Comment, string OperatorUser);

/// <summary>One-shot value subscription for <c>FullReferences</c> at <c>PublishingInterval</c>; served in
/// <c>Connected</c>, which replies <see cref="SubscriptionEstablished"/> or <see cref="SubscriptionFailed"/>.</summary>
public sealed record Subscribe(IReadOnlyList<string> FullReferences, TimeSpan PublishingInterval);

/// <summary>
/// Replaces the set of references this driver should keep subscribed for the current deployment. In contrast to
/// the one-shot <see cref="Subscribe"/>, the set is remembered and re-established automatically each time the
/// actor (re)enters <c>Connected</c>. This closes the F8b/#113 "re-subscribe across reconnects" gap and lets
/// <see cref="DriverHostActor"/> drive the SubscribeBulk pass after an apply with a single message.
/// An empty set clears the desired subscription.
/// </summary>
/// <param name="AlarmReferences">
/// Native-alarm references (alarm-bearing equipment-tag FullNames = the driver's
/// <c>ConditionId</c>/AlarmFullReference) that need an open alarm subscription. An <see cref="IAlarmSource"/>
/// driver holds back <see cref="IAlarmSource.OnAlarmEvent"/> until some alarm subscription exists, so the actor
/// passes this set to <see cref="IAlarmSource.SubscribeAlarmsAsync"/> to open the native feed. Null or empty
/// means the driver has no alarm tags; the null default keeps non-alarm callers unchanged.
/// </param>
public sealed record SetDesiredSubscriptions(
    IReadOnlyList<string> FullReferences,
    TimeSpan PublishingInterval,
    IReadOnlyList<string>? AlarmReferences = null);

/// <summary>Reply to <see cref="Subscribe"/> on success.</summary>
public sealed record SubscriptionEstablished(string DiagnosticId, int ReferenceCount);

/// <summary>Reply to <see cref="Subscribe"/> on failure.</summary>
public sealed record SubscriptionFailed(string Reason);

/// <summary>Drops the active value subscription (served in <c>Connected</c>).</summary>
public sealed record Unsubscribe;

/// <summary>
/// AdminUI Reconnect operation, relayed by <see cref="DriverHostActor"/>. A <c>Connected</c> actor drops into
/// <c>Reconnecting</c> so the transport is rebuilt without stopping and respawning the actor. Any state accepts
/// it; it does nothing while already Connecting or Reconnecting.
/// </summary>
public sealed record ForceReconnect;

/// <summary>Sent to the parent each time the subscribed IDriver raises <see cref="ISubscribable.OnDataChange"/>;
/// the parent relays it to OpcUaPublishActor.</summary>
public sealed record AttributeValuePublished(string DriverInstanceId, string FullReference, object? Value, OpcUaQuality Quality, DateTime TimestampUtc);

// A driver data-change moved onto the actor thread before being projected to AttributeValuePublished.
private sealed record DataChangeForward(string FullReference, DataValueSnapshot Snapshot);

/// <summary>Sent to the parent each time the subscribed <see cref="IAlarmSource"/> raises
/// <see cref="IAlarmSource.OnAlarmEvent"/>. The parent (<see cref="DriverHostActor"/>) projects and routes it to the
/// materialised Part 9 condition — the alarm counterpart of <see cref="AttributeValuePublished"/>.</summary>
public sealed record AttributeAlarmPublished(string DriverInstanceId, AlarmEventArgs Args);

// A native alarm transition moved onto the actor thread from the driver's OnAlarmEvent.
private sealed record NativeAlarmRaised(AlarmEventArgs Args);

/// <summary>Self-message sent on Connected entry, or when alarm refs are (re)pushed, to open the native-alarm
/// subscription that releases an <see cref="IAlarmSource"/> driver's feed. Its handler is async, so the
/// <see cref="IAlarmSource.SubscribeAlarmsAsync"/> call stays bounded and out of the synchronous handlers.</summary>
private sealed record SubscribeAlarms;

/// <summary>Sent to the parent (DriverHostActor) after every post-connect discovery pass so it can graft the
/// driver's discovered FixedTree nodes under the equipment. Empty or repeated sets are harmless — the parent
/// dedups and injection is idempotent.</summary>
public sealed record DiscoveredNodesReady(string DriverInstanceId, IReadOnlyList<DiscoveredNode> Nodes);

/// <summary>
/// Sent by <see cref="DriverHostActor"/> after it rebinds this driver to a new equipment, asking for a fresh
/// post-connect discovery run. Only <c>Connected</c> acts on it, by re-kicking <see cref="StartDiscovery"/>,
/// which already respects the driver's <see cref="ITagDiscovery.RediscoverPolicy"/> and the
/// <see cref="ITagDiscovery"/> guard and tags the new pass with the current init generation. Every other state
/// ignores it silently, because the eventual (re)connect re-discovers anyway.
/// </summary>
public sealed record TriggerRediscovery;

/// <summary>Internal self-tick for bounded post-connect re-discovery (FixedTree fills in roughly 0–2 s after connect).
/// <c>PreviousSignature</c> is the ordered-distinct full-reference signature of the previous pass's captured set
/// (empty on the first tick); re-discovery ends once a non-empty set reproduces it.</summary>
private sealed record RediscoverTick(int Generation, int Attempt, string PreviousSignature);
|
||
/// <summary>Tick of the periodic "retry-connect" timer that <c>Reconnecting</c> uses to re-attempt Initialize.</summary>
public sealed class RetryConnect
{
    /// <summary>Shared singleton tick instance.</summary>
    public static readonly RetryConnect Instance = new RetryConnect();

    // Singleton — use Instance.
    private RetryConnect()
    {
    }
}

/// <summary>Period of the health-poll heartbeat that refreshes this driver's entry in the snapshot store.</summary>
public static readonly TimeSpan HealthPollInterval = TimeSpan.FromSeconds(30);
|
||
|
||
// --- Immutable configuration captured by the constructor -------------------------------------------------

private readonly IDriver _driver;
private readonly string _driverInstanceId;
private readonly string _clusterId;
private readonly IDriverHealthPublisher _healthPublisher;
private readonly TimeSpan _reconnectInterval;

/// <summary>Gap between bounded post-connect re-discovery passes (2 s in production; tests pass a tiny value so
/// the loop runs without real-time waits).</summary>
private readonly TimeSpan _rediscoverInterval;

/// <summary>Upper bound on post-connect re-discovery passes, so a set that never stabilises (or stays empty)
/// cannot keep the loop alive forever. 15 in production.</summary>
private readonly int _rediscoverMaxAttempts;

/// <summary>Per-pass timeout for <see cref="ITagDiscovery.DiscoverAsync"/> during bounded post-connect
/// re-discovery, limiting how long the mailbox stays suspended. 30 s in production; injectable so tests can
/// shorten it.</summary>
private readonly TimeSpan _rediscoverDiscoverTimeout;

private readonly ILoggingAdapter _log = Context.GetLogger();

// --- Mutable actor state (only ever touched inside message handlers) -------------------------------------

private string? _currentConfigJson;

/// <summary>Monotonic tag stamped on every <see cref="InitializeAsync"/> attempt. A result that carries an older
/// generation comes from an attempt that was superseded (for example by an <see cref="ApplyDelta"/> that adopted a
/// new config mid-(re)connect) and is dropped. Actor-thread only, hence lock-free.</summary>
private int _initGeneration;

/// <summary>
/// When recent Faulted-state transitions happened; feeds the 5-minute error count. Every access happens inside an
/// Akka message handler, which runs one at a time per actor, so no lock is required.
/// </summary>
private readonly Queue<DateTime> _faultTimestamps = new Queue<DateTime>();

/// <summary>Handle of the live value subscription, or null when none is active. The actor re-subscribes by itself
/// on (re)connect and on each <see cref="Subscribe"/> through <see cref="ResubscribeDesired"/> /
/// <see cref="HandleSubscribeAsync"/>, so callers never have to repeat a subscription after a reconnect.</summary>
private ISubscriptionHandle? _subscriptionHandle;

private EventHandler<DataChangeEventArgs>? _dataChangeHandler;

private EventHandler<AlarmEventArgs>? _alarmEventHandler;

/// <summary>References the host wants kept subscribed (from <see cref="SetDesiredSubscriptions"/>); re-applied on
/// each entry into <c>Connected</c> so values come back after a reconnect or redeploy.</summary>
private IReadOnlyList<string> _desiredRefs = Array.Empty<string>();

private TimeSpan _desiredInterval = TimeSpan.FromSeconds(1);

/// <summary>Native-alarm references the host wants kept subscribed (from <see cref="SetDesiredSubscriptions"/>);
/// re-applied on each <c>Connected</c> entry so the alarm feed is opened again after a reconnect or redeploy.</summary>
private IReadOnlyList<string> _desiredAlarmRefs = Array.Empty<string>();

/// <summary>Live native-alarm subscription of an <see cref="IAlarmSource"/> driver, or null when none exists.
/// <see cref="DetachAlarmSource"/> clears it so the next Connected entry subscribes against the freshly
/// re-initialised driver; the null check keeps repeated <see cref="SetDesiredSubscriptions"/> pushes idempotent.</summary>
private IAlarmSubscriptionHandle? _alarmSubscriptionHandle;

/// <summary>
/// Timer scheduler (injected by Akka through <see cref="IWithTimers"/>) used for the retry-connect,
/// health-poll and re-discovery timers.
/// </summary>
public ITimerScheduler Timers { get; set; } = null!;
|
||
|
||
/// <summary>
/// Builds the <see cref="Akka.Actor.Props"/> used to spawn a <see cref="DriverInstanceActor"/>, filling in
/// defaults for every optional setting.
/// </summary>
/// <param name="driver">Driver the actor wraps.</param>
/// <param name="reconnectInterval">Retry-connect period; <see cref="DefaultReconnectInterval"/> (10 s) when null.</param>
/// <param name="startStubbed">True to boot straight into DEV-STUB mode (tests / unsupported platforms).</param>
/// <param name="healthPublisher">Health sink; <see cref="NullDriverHealthPublisher"/> when null, so tests and
/// stub paths can omit it.</param>
/// <param name="clusterId">Cluster id stamped on <see cref="DriverHealthChanged"/> messages; empty when null
/// (e.g. in unit tests).</param>
/// <param name="rediscoverInterval">Gap between post-connect re-discovery passes; 2 s when null.</param>
/// <param name="rediscoverMaxAttempts">Cap on re-discovery passes; 15 by default.</param>
/// <param name="rediscoverDiscoverTimeout">Per-pass <see cref="ITagDiscovery.DiscoverAsync"/> timeout; 30 s when null.</param>
public static Props Props(
    IDriver driver,
    TimeSpan? reconnectInterval = null,
    bool startStubbed = false,
    IDriverHealthPublisher? healthPublisher = null,
    string? clusterId = null,
    TimeSpan? rediscoverInterval = null,
    int rediscoverMaxAttempts = DefaultRediscoverMaxAttempts,
    TimeSpan? rediscoverDiscoverTimeout = null)
{
    // Resolve the defaults up front; the re-discovery settings are resolved by the constructor itself.
    var effectiveReconnectInterval = reconnectInterval ?? DefaultReconnectInterval;
    var effectiveHealthPublisher = healthPublisher ?? NullDriverHealthPublisher.Instance;
    var effectiveClusterId = clusterId ?? string.Empty;

    return Akka.Actor.Props.Create(() => new DriverInstanceActor(
        driver,
        effectiveReconnectInterval,
        startStubbed,
        effectiveHealthPublisher,
        effectiveClusterId,
        rediscoverInterval,
        rediscoverMaxAttempts,
        rediscoverDiscoverTimeout));
}
|
||
|
||
/// <summary>
/// Decides whether a driver must boot in DEV-STUB mode, given the host platform and the configured roles.
/// Only the v1 in-process types are tied to Windows:
/// <list type="bullet">
/// <item><c>"Galaxy"</c> — the legacy MXAccess COM proxy (retired in PR 7.2; still gated for leftover
/// DriverInstance rows that reference the old type name).</item>
/// <item><c>"Historian.Wonderware"</c> — the Wonderware Historian sidecar, which uses Windows-only named pipes.</item>
/// </list>
/// Those types are stubbed on non-Windows hosts, or anywhere the <c>"dev"</c> role is present. The v2
/// <c>"GalaxyMxGateway"</c> driver reaches an external mxaccessgw process over gRPC, runs on any platform
/// .NET 10 supports (Linux containers included), and is never stubbed.
/// </summary>
/// <param name="driverType">Driver type identifier.</param>
/// <param name="roles">Operational roles configured for this instance.</param>
public static bool ShouldStub(string driverType, IEnumerable<string> roles)
{
    var windowsOnlyType = driverType is "Galaxy" or "Historian.Wonderware";
    var hostLacksWindows = !OperatingSystem.IsWindows();

    // Rule 1: a Windows-only type on a non-Windows host can never run for real.
    if (hostLacksWindows && windowsOnlyType)
    {
        return true;
    }

    // Rule 2: the "dev" role forces the stub for Windows-only types on any host.
    var hasDevRole = roles.Contains("dev");
    return hasDevRole && windowsOnlyType;
}
|
||
|
||
/// <summary>
/// Builds a <see cref="DriverInstanceActor"/> around <paramref name="driver"/>, records a spawn telemetry event,
/// and starts in either the <c>Stubbed</c> or the <c>Connecting</c> behaviour.
/// </summary>
/// <param name="driver">Driver to wrap and manage.</param>
/// <param name="reconnectInterval">Period of the retry-connect timer while <c>Reconnecting</c>.</param>
/// <param name="startStubbed">True to boot in DEV-STUB mode (tests / unsupported platforms).</param>
/// <param name="healthPublisher">Health sink; <see cref="NullDriverHealthPublisher"/> is used when null.</param>
/// <param name="clusterId">Cluster id stamped on health snapshots; empty when null.</param>
/// <param name="rediscoverInterval">Gap between post-connect re-discovery passes; 2 s when null.</param>
/// <param name="rediscoverMaxAttempts">Cap on re-discovery passes; 15 by default.</param>
/// <param name="rediscoverDiscoverTimeout">Per-pass <see cref="ITagDiscovery.DiscoverAsync"/> timeout; 30 s when null.</param>
public DriverInstanceActor(
    IDriver driver,
    TimeSpan reconnectInterval,
    bool startStubbed = false,
    IDriverHealthPublisher? healthPublisher = null,
    string? clusterId = null,
    TimeSpan? rediscoverInterval = null,
    int rediscoverMaxAttempts = DefaultRediscoverMaxAttempts,
    TimeSpan? rediscoverDiscoverTimeout = null)
{
    // Identity + collaborators.
    _driver = driver;
    _driverInstanceId = driver.DriverInstanceId;
    _healthPublisher = healthPublisher ?? NullDriverHealthPublisher.Instance;
    _clusterId = clusterId ?? string.Empty;

    // Timing knobs, with defaults for anything the caller left unset.
    _reconnectInterval = reconnectInterval;
    _rediscoverInterval = rediscoverInterval ?? DefaultRediscoverInterval;
    _rediscoverMaxAttempts = rediscoverMaxAttempts;
    _rediscoverDiscoverTimeout = rediscoverDiscoverTimeout ?? DefaultRediscoverDiscoverTimeout;

    var lifecycleEvent = startStubbed ? "spawn_stub" : "spawn";
    OtOpcUaTelemetry.DriverInstanceLifecycle.Add(1,
        new KeyValuePair<string, object?>("event", lifecycleEvent),
        new KeyValuePair<string, object?>("driver_type", driver.DriverType));

    if (!startStubbed)
    {
        Become(Connecting);
        return;
    }

    Context.GetLogger().Info("[DEV-STUB] driver={Name} type={Type}",
        _driverInstanceId, driver.DriverType);
    Become(Stubbed);
}
|
||
|
||
/// <inheritdoc />
protected override void PreStart()
{
    const string healthPollTimerKey = "health-poll";

    // Publish once right away so the AdminUI sees this driver before any state transition has fired.
    PublishHealthSnapshot();

    // Then keep re-publishing on a fixed period so the snapshot of a long-lived Healthy driver stays fresh
    // for SignalR clients that join later.
    Timers.StartPeriodicTimer(healthPollTimerKey, HealthPollTick.Instance, HealthPollInterval);
}
|
||
|
||
/// <summary>
/// DEV-STUB behaviour: the standard message contracts are accepted and answered deterministically without ever
/// touching the driver. ApplyDelta and WriteAttribute report success with reason "stubbed"; everything else is
/// absorbed.
/// </summary>
private void Stubbed()
{
    // Request/reply contracts — deterministic "stubbed" success.
    Receive<ApplyDelta>(msg => Sender.Tell(new ApplyResult(true, "stubbed", msg.Correlation)));
    Receive<WriteAttribute>(_ => Sender.Tell(new WriteAttributeResult(true, "stubbed")));

    // Desired subscriptions are still remembered, so the bookkeeping matches a real driver.
    Receive<SetDesiredSubscriptions>(StoreDesiredSubscriptions);

    // Heartbeat keeps the stub's health snapshot fresh like any other driver.
    Receive<HealthPollTick>(_ => PublishHealthSnapshot());

    // Lifecycle messages have no meaning without a real transport.
    Receive<InitializeRequested>(_ => { });
    Receive<DisconnectObserved>(_ => { });
    Receive<ForceReconnect>(_ => { });

    // No upstream alarm system: the fire-and-forget ack is dropped (there is no reply to send).
    Receive<RouteAlarmAck>(_ => { });

    // A stub never reaches Connected, so it never starts discovery. Both messages are absorbed defensively so
    // they never show up as Akka Unhandled events.
    Receive<RediscoverTick>(_ => { });
    Receive<TriggerRediscovery>(_ => { });
}
|
||
|
||
/// <summary>
/// Initial (non-stubbed) behaviour, entered from the constructor: waits for <see cref="InitializeRequested"/>
/// and for the generation-matched result of the init attempt. Success moves to <c>Connected</c> (re-applying
/// desired value/alarm subscriptions and starting discovery); failure records a fault and moves to
/// <c>Reconnecting</c>. Requests that need a live driver are fast-failed or dropped, never left Unhandled.
/// </summary>
private void Connecting()
{
    Receive<InitializeRequested>(msg => InitializeAsync(msg.DriverConfigJson));
    // Fast-fail writes while still connecting. Otherwise the inbound WriteAttribute dead-letters and
    // DriverHostActor.HandleRouteNodeWrite waits out its full 8s Ask before reporting a generic
    // "write timeout". Synchronous Receive, so Sender.Tell on the actor thread is safe (#4a-instance).
    Receive<WriteAttribute>(_ =>
        Sender.Tell(new WriteAttributeResult(false, "driver not connected")));
    // A one-shot Subscribe can't be served before the driver is up. Fast-fail it like WriteAttribute so an
    // asking caller isn't left waiting on a timeout. Desired subscriptions (SetDesiredSubscriptions) are
    // re-applied automatically once Connected.
    Receive<Subscribe>(_ =>
        Sender.Tell(new SubscriptionFailed("driver not connected")));
    // No subscription exists before the first connect, so there is nothing to tear down.
    Receive<Unsubscribe>(_ => { });
    // An ack that arrives while still connecting can't reach the upstream alarm system, so it is dropped.
    // The ack is fire-and-forget (no reply to surface) and the OPC UA condition state already committed locally.
    Receive<RouteAlarmAck>(_ =>
        _log.Debug("DriverInstance {Id}: alarm ack arrived during connect — dropped (driver not connected)", _driverInstanceId));
    Receive<ApplyDelta>(AdoptConfigDuringInit);
    Receive<InitializeSucceeded>(msg =>
    {
        if (msg.Generation != _initGeneration) return;
        _log.Info("DriverInstance {Id}: connected", _driverInstanceId);
        Become(Connected);
        PublishHealthSnapshot();
        ResubscribeDesired();
        AttachAlarmSource();
        SubscribeDesiredAlarms();
        StartDiscovery();
    });
    Receive<InitializeFailed>(msg =>
    {
        if (msg.Generation != _initGeneration) return;
        _log.Warning("DriverInstance {Id}: initialize failed: {Reason}", _driverInstanceId, msg.Reason);
        RecordFault();
        Become(Reconnecting);
        PublishHealthSnapshot();
    });
    // A disconnect signal during the initial connect has nothing to tear down; the pending init result
    // drives the next transition. Trace only, so it doesn't surface as an Unhandled message.
    Receive<DisconnectObserved>(msg =>
        _log.Debug("DriverInstance {Id}: disconnect observed during connect ({Reason}) — ignored; awaiting init result",
            _driverInstanceId, msg.Reason));
    Receive<SetDesiredSubscriptions>(StoreDesiredSubscriptions);
    Receive<ForceReconnect>(_ => { /* already connecting — no-op */ });
    // ResubscribeDesired self-Tells Subscribe, and HandleSubscribeAsync replies SubscriptionEstablished to
    // the sender, which on the self-resubscribe path is Self. Swallow it (trace only) so it doesn't dead-letter.
    Receive<SubscriptionEstablished>(msg =>
        _log.Debug("DriverInstance {Id}: subscription established ({Count} refs, {Diag})",
            _driverInstanceId, msg.ReferenceCount, msg.DiagnosticId));
    // The failure counterpart: a failed self-resubscribe replies SubscriptionFailed to Self, and
    // HandleSubscribeAsync already logged the cause. Swallow it so it doesn't dead-letter.
    Receive<SubscriptionFailed>(msg =>
        _log.Debug("DriverInstance {Id}: resubscribe reported failure: {Reason}", _driverInstanceId, msg.Reason));
    // A native alarm transition can arrive while still (re)connecting, because the driver's feed runs on its
    // own thread. Drop it; the feed re-delivers active alarms once Connected. Trace only.
    Receive<NativeAlarmRaised>(_ =>
        _log.Debug("DriverInstance {Id}: native alarm arrived during connect — dropped (feed re-delivers)", _driverInstanceId));
    // A SubscribeAlarms self-tell (from Connected) can be overtaken by an already-queued disconnect into this
    // state. Swallow it so it doesn't dead-letter; the next Connected entry re-subscribes.
    Receive<SubscribeAlarms>(_ => { });
    // Likewise the attempt-0 re-discovery self-tick (sent on Connected entry) can be overtaken by an
    // already-queued disconnect. Swallow it; the next Connected entry re-kicks discovery.
    Receive<RediscoverTick>(_ => { });
    // TriggerRediscovery outside Connected is a deliberate no-op, because the (re)connect path re-runs
    // discovery anyway. Swallow it so it stays silent (no Unhandled event).
    Receive<TriggerRediscovery>(_ => { });
    Receive<HealthPollTick>(_ => PublishHealthSnapshot());
}
|
||
|
||
/// <summary>
/// Behaviour once the driver is initialised: serves ApplyDelta (in-place reinitialise), writes, alarm acks,
/// value/alarm subscriptions and re-discovery, and projects data-change and native-alarm events to the parent.
/// A disconnect or an admin ForceReconnect cancels re-discovery, detaches the subscription and moves to
/// <c>Reconnecting</c>.
/// </summary>
private void Connected()
{
    ReceiveAsync<ApplyDelta>(HandleApplyDeltaAsync);
    Receive<DisconnectObserved>(msg =>
    {
        _log.Warning("DriverInstance {Id}: disconnect observed ({Reason}); reconnecting",
            _driverInstanceId, msg.Reason);
        Timers.Cancel("rediscover");
        DetachSubscription();
        RecordFault();
        Become(Reconnecting);
        PublishHealthSnapshot();
    });
    Receive<ForceReconnect>(_ =>
    {
        _log.Info("DriverInstance {Id}: ForceReconnect requested by admin; re-entering Reconnecting", _driverInstanceId);
        Timers.Cancel("rediscover");
        DetachSubscription();
        Become(Reconnecting);
        PublishHealthSnapshot();
    });
    ReceiveAsync<RediscoverTick>(HandleRediscoverAsync);
    // After the host rebinds the driver to a new equipment it asks for a fresh discovery pass. Re-kick the
    // bounded loop via StartDiscovery, which honours RediscoverPolicy and the ITagDiscovery guard and tags
    // the pass with the current _initGeneration. Non-Connected states treat this message as a no-op.
    Receive<TriggerRediscovery>(_ => StartDiscovery());
    ReceiveAsync<WriteAttribute>(HandleWriteAsync);
    ReceiveAsync<RouteAlarmAck>(HandleAcknowledgeAsync);
    ReceiveAsync<Subscribe>(HandleSubscribeAsync);
    ReceiveAsync<Unsubscribe>(_ => UnsubscribeAsync());
    Receive<SetDesiredSubscriptions>(msg =>
    {
        StoreDesiredSubscriptions(msg);
        if (_desiredRefs.Count > 0) Self.Tell(new Subscribe(_desiredRefs, _desiredInterval));
        else if (_subscriptionHandle is not null) Self.Tell(new Unsubscribe());
        // Native-alarm analogue: open the IAlarmSource feed when alarm tags are (now) present. The common live
        // path, a deploy that delivers SetDesiredSubscriptions while the driver is already Connected, comes
        // through HERE, so the alarm subscribe must happen on this message and not only on Connected entry.
        // Unlike the value path above, empty does NOT unsubscribe, on purpose. A removed alarm tag's condition
        // node is torn down on the address-space rebuild, and DriverHostActor.ForwardNativeAlarm drops any
        // transition whose ConditionId no longer maps, so a lingering session-less alarm subscription can
        // never surface a removed alarm.
        SubscribeDesiredAlarms();
    });
    ReceiveAsync<SubscribeAlarms>(HandleSubscribeAlarmsAsync);
    Receive<DataChangeForward>(OnDataChangeForward);
    // Native alarm transition marshalled onto the actor thread from the driver's OnAlarmEvent. Project it to
    // the parent the same way DataChangeForward projects AttributeValuePublished.
    Receive<NativeAlarmRaised>(m => Context.Parent.Tell(new AttributeAlarmPublished(_driverInstanceId, m.Args)));
    // ResubscribeDesired self-Tells Subscribe, and HandleSubscribeAsync replies SubscriptionEstablished to
    // the sender, which on the self-resubscribe path is Self. Swallow it (trace only) so it doesn't dead-letter.
    Receive<SubscriptionEstablished>(msg =>
        _log.Debug("DriverInstance {Id}: subscription established ({Count} refs, {Diag})",
            _driverInstanceId, msg.ReferenceCount, msg.DiagnosticId));
    // The failure counterpart: a failed self-resubscribe replies SubscriptionFailed to Self, and
    // HandleSubscribeAsync already logged the cause. Swallow it so it doesn't dead-letter.
    Receive<SubscriptionFailed>(msg =>
        _log.Debug("DriverInstance {Id}: resubscribe reported failure: {Reason}", _driverInstanceId, msg.Reason));
    // Init results from attempts that lost the race can still land here after we connected: either an
    // attempt superseded by an ApplyDelta mid-(re)connect, or an overlapping retry attempt that finished after
    // another one connected us. Nothing to act on. Trace only, so they don't surface as Unhandled messages.
    Receive<InitializeSucceeded>(msg =>
        _log.Debug("DriverInstance {Id}: stale init success (generation {Gen}) ignored while connected",
            _driverInstanceId, msg.Generation));
    Receive<InitializeFailed>(msg =>
        _log.Debug("DriverInstance {Id}: stale init failure (generation {Gen}) ignored while connected: {Reason}",
            _driverInstanceId, msg.Generation, msg.Reason));
    Receive<HealthPollTick>(_ => PublishHealthSnapshot());
}
|
||
|
||
/// <summary>
/// Lowest init generation whose <see cref="InitializeSucceeded"/> the <c>Reconnecting</c> state honours.
/// It is reset on every entry into <c>Reconnecting</c> and raised whenever an <see cref="ApplyDelta"/> adopts a
/// new config there. Touched only on the actor thread, so no lock is needed.
/// </summary>
private int _reconnectAcceptFromGeneration;

/// <summary>
/// Behaviour while the driver is down (init failed, disconnect observed, or admin ForceReconnect). A periodic
/// "retry-connect" timer re-runs Initialize with the current config until an attempt succeeds. Success cancels
/// the timer and moves to <c>Connected</c>, re-applying desired value/alarm subscriptions and restarting
/// discovery. Requests that need a live driver are fast-failed or dropped, never left Unhandled.
/// </summary>
private void Reconnecting()
{
    // Every retry re-initialises with the SAME config, so a success from ANY attempt launched since the
    // config was last adopted is a valid connection. Requiring the exact latest generation would livelock
    // whenever InitializeAsync takes longer than _reconnectInterval: each RetryConnect tick bumps the
    // generation before the in-flight attempt can report, so every success would be dropped. Attempts
    // started before this entry (already resolved, or superseded old-config inits) stay below the floor.
    _reconnectAcceptFromGeneration = _initGeneration + 1;

    Receive<RetryConnect>(_ => InitializeAsync(_currentConfigJson ?? "{}"));
    // Fast-fail writes while reconnecting, for the same reason as Connecting: it avoids the 8s host Ask
    // timeout on an inbound write to a transiently-down driver. Synchronous Receive (#4a-instance).
    Receive<WriteAttribute>(_ =>
        Sender.Tell(new WriteAttributeResult(false, "driver not connected")));
    // A one-shot Subscribe can't be served while the driver is down, so fast-fail it. This also covers a
    // Subscribe that Connected self-told just before a queued disconnect; that reply goes to Self and is
    // swallowed below. Desired subscriptions are re-applied on reconnect.
    Receive<Subscribe>(_ =>
        Sender.Tell(new SubscriptionFailed("driver not connected")));
    // The value subscription was detached on the way into this state (or never existed), so there is
    // nothing to tear down.
    Receive<Unsubscribe>(_ => { });
    // An ack that arrives while reconnecting can't reach the upstream alarm system, so it is dropped.
    // It is fire-and-forget (no reply), and the OPC UA condition state already committed on the Part 9 ack.
    Receive<RouteAlarmAck>(_ =>
        _log.Debug("DriverInstance {Id}: alarm ack arrived during reconnect — dropped (driver not connected)", _driverInstanceId));
    Receive<ApplyDelta>(msg =>
    {
        AdoptConfigDuringInit(msg);
        // A new config was just adopted, so attempts launched before it used the old config and must not
        // connect us. Only the attempt just started and later retries, which use the new config, count.
        _reconnectAcceptFromGeneration = _initGeneration;
    });
    Receive<InitializeSucceeded>(msg =>
    {
        if (msg.Generation < _reconnectAcceptFromGeneration) return;
        Timers.Cancel("retry-connect");
        _log.Info("DriverInstance {Id}: reconnected", _driverInstanceId);
        Become(Connected);
        PublishHealthSnapshot();
        ResubscribeDesired();
        AttachAlarmSource();
        SubscribeDesiredAlarms();
        StartDiscovery(); // re-run discovery on reconnect, keeping the injected tree fresh if the backend's capabilities changed
    });
    // A failure here is a no-op regardless of generation: the retry timer keeps trying the current
    // config, and only an accepted InitializeSucceeded transitions state.
    Receive<InitializeFailed>(_ => { /* keep retrying via timer */ });
    // Already reconnecting. A further disconnect signal (e.g. one queued behind the one that brought us
    // here) changes nothing.
    Receive<DisconnectObserved>(msg =>
        _log.Debug("DriverInstance {Id}: disconnect observed while reconnecting ({Reason}) — ignored",
            _driverInstanceId, msg.Reason));
    // Data changes queued from the detached subscription are stale and must not be published.
    Receive<DataChangeForward>(_ => { });
    Receive<SetDesiredSubscriptions>(StoreDesiredSubscriptions);
    Receive<ForceReconnect>(_ => { /* already reconnecting — no-op */ });
    // ResubscribeDesired self-Tells Subscribe, and HandleSubscribeAsync replies SubscriptionEstablished to
    // the sender, which on the self-resubscribe path is Self. Swallow it (trace only) so it doesn't dead-letter.
    Receive<SubscriptionEstablished>(msg =>
        _log.Debug("DriverInstance {Id}: subscription established ({Count} refs, {Diag})",
            _driverInstanceId, msg.ReferenceCount, msg.DiagnosticId));
    // The failure counterpart: a failed self-resubscribe replies SubscriptionFailed to Self, and
    // HandleSubscribeAsync already logged the cause. Swallow it so it doesn't dead-letter.
    Receive<SubscriptionFailed>(msg =>
        _log.Debug("DriverInstance {Id}: resubscribe reported failure: {Reason}", _driverInstanceId, msg.Reason));
    // A native alarm transition can arrive while still reconnecting, because the driver's feed runs on its
    // own thread. Drop it; the feed re-delivers active alarms once Connected. Trace only.
    Receive<NativeAlarmRaised>(_ =>
        _log.Debug("DriverInstance {Id}: native alarm arrived during reconnect — dropped (feed re-delivers)", _driverInstanceId));
    // A SubscribeAlarms self-tell (from Connected) can be overtaken by an already-queued disconnect into this
    // state. Swallow it so it doesn't dead-letter; the next Connected entry re-subscribes.
    Receive<SubscribeAlarms>(_ => { });
    // Likewise the attempt-0 re-discovery self-tick (sent on Connected entry) can be overtaken by an
    // already-queued disconnect. Swallow it; the next Connected entry re-kicks discovery.
    Receive<RediscoverTick>(_ => { });
    // TriggerRediscovery outside Connected is a deliberate no-op, because the (re)connect path re-runs
    // discovery anyway. Swallow it so it stays silent (no Unhandled event).
    Receive<TriggerRediscovery>(_ => { });
    Receive<HealthPollTick>(_ => PublishHealthSnapshot());
    Timers.StartPeriodicTimer("retry-connect", RetryConnect.Instance, _reconnectInterval);
}
|
||
|
||
/// <summary>
/// Starts a fire-and-forget initialisation of the driver with <paramref name="driverConfigJson"/>. It records the
/// config as current and stamps the attempt with a fresh generation. The outcome comes back to this actor as
/// <see cref="InitializeSucceeded"/> or <see cref="InitializeFailed"/> carrying that generation, so handlers can
/// ignore results from superseded attempts.
/// </summary>
/// <param name="driverConfigJson">Driver configuration JSON handed to <see cref="IDriver.InitializeAsync"/>.</param>
private void InitializeAsync(string driverConfigJson)
{
    _currentConfigJson = driverConfigJson;
    _initGeneration++;
    var attemptGeneration = _initGeneration;

    // Capture the reply target on the actor thread; Self must not be read from the background task.
    var replyTarget = Self;

    _ = Task.Run(async () =>
    {
        try
        {
            await _driver.InitializeAsync(driverConfigJson, CancellationToken.None);
            replyTarget.Tell(new InitializeSucceeded(attemptGeneration));
        }
        catch (Exception failure)
        {
            replyTarget.Tell(new InitializeFailed(failure.Message, attemptGeneration));
        }
    });
}
|
||
|
||
/// <summary>
/// Handles an <see cref="ApplyDelta"/> that arrives while still Connecting or Reconnecting by taking the new
/// config on straight away. The config is passed to <see cref="InitializeAsync"/>, which records it as the
/// current config and bumps the init generation; the older in-flight (old-config) attempt is thereby
/// superseded and its result dropped.
/// <para>No behaviour switch happens here; the new attempt's outcome decides the next transition. In
/// Reconnecting the periodic retry timer is deliberately left running, so if this immediate attempt fails the
/// retries continue with the new config. An overlapping redundant attempt is de-duplicated by the generation
/// guard.</para>
/// <para>The sender always receives a successful <see cref="ApplyResult"/> noting the config was adopted.</para>
/// </summary>
private void AdoptConfigDuringInit(ApplyDelta msg)
{
    var requester = Sender;

    _log.Info("DriverInstance {Id}: ApplyDelta during (re)connect — adopting new config, re-initialising now",
        _driverInstanceId);

    InitializeAsync(msg.DriverConfigJson);

    requester.Tell(new ApplyResult(true, "config adopted; reinitializing", msg.Correlation));
}
|
||
|
||
/// <summary>
/// Applies a config delta in place through <see cref="IDriver.ReinitializeAsync"/>. On success the new
/// config becomes the current config and the sender gets <c>ApplyResult(true, null, correlation)</c>. On any
/// exception the current config is left untouched and the sender gets a failed result carrying the
/// exception message.
/// </summary>
/// <remarks>NOTE(review): unlike the write/subscribe paths, this reinitialize call is not bounded
/// (<c>CancellationToken.None</c>). Confirm a hung driver cannot stall this actor here.</remarks>
private async Task HandleApplyDeltaAsync(ApplyDelta msg)
{
    // Sender must be captured before the first await.
    var requester = Sender;

    ApplyResult outcome;
    try
    {
        await _driver.ReinitializeAsync(msg.DriverConfigJson, CancellationToken.None);
        _currentConfigJson = msg.DriverConfigJson;
        outcome = new ApplyResult(true, null, msg.Correlation);
    }
    catch (Exception ex)
    {
        outcome = new ApplyResult(false, ex.Message, msg.Correlation);
    }

    requester.Tell(outcome);
}
|
||
|
||
/// <summary>
/// Writes one tag value through the driver's <see cref="IWritable"/> surface and answers the sender with a
/// <see cref="WriteAttributeResult"/>. The possible replies are:
/// <list type="bullet">
/// <item>success: the driver returned exactly one result and its status severity is Good;</item>
/// <item>failure <c>StatusCode=0x…</c>: the first result's status, or <c>0xFFFFFFFF</c> if no result came back;</item>
/// <item>failure <c>write timeout</c>: the call was cancelled (5 s cap);</item>
/// <item>failure with the exception message: any other error;</item>
/// <item>immediate failure: the driver does not implement <see cref="IWritable"/>.</item>
/// </list>
/// </summary>
private async Task HandleWriteAsync(WriteAttribute msg)
{
    var requester = Sender;

    if (_driver is not IWritable writable)
    {
        requester.Tell(new WriteAttributeResult(false, "Driver does not implement IWritable"));
        return;
    }

    var batch = new[] { new WriteRequest(msg.TagId, msg.Value) };

    // Hard 5 s cap. Retries are off by default (decisions #44/#45), but a stalled backend call still has to
    // produce an answer instead of pinning this actor.
    using var writeTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));

    WriteAttributeResult reply;
    try
    {
        var results = await writable.WriteAsync(batch, writeTimeout.Token);
        if (results is { Count: 1 } && IsGoodStatus(results[0].StatusCode))
        {
            reply = new WriteAttributeResult(true, null);
        }
        else
        {
            // An empty/missing result set is reported as the all-ones status code.
            var firstStatus = results is { Count: > 0 } ? results[0].StatusCode : 0xFFFFFFFF;
            reply = new WriteAttributeResult(false, $"StatusCode=0x{firstStatus:X8}");
        }
    }
    catch (OperationCanceledException)
    {
        reply = new WriteAttributeResult(false, "write timeout");
    }
    catch (Exception ex)
    {
        reply = new WriteAttributeResult(false, ex.Message);
    }

    requester.Tell(reply);
}
|
||
|
||
/// <summary>
/// Passes an inbound native-condition acknowledge (a <see cref="RouteAlarmAck"/> routed here by
/// <see cref="DriverHostActor"/> from a resolved condition NodeId) to the driver's
/// <see cref="IAlarmSource.AcknowledgeAsync"/>.
/// <para>Both <see cref="AlarmAcknowledgeRequest.SourceNodeId"/> and
/// <see cref="AlarmAcknowledgeRequest.ConditionId"/> are set to the message's condition id (the authored alarm
/// full-reference). The driver's ack path correlates on ConditionId.</para>
/// <para>The driver call is capped at 5 s. Nothing is replied to the sender: the ack is fire-and-forget
/// because the local condition state was already committed by the OPC UA Part 9 ack and
/// <see cref="IAlarmSource.AcknowledgeAsync"/> reports no per-condition status. The outcome is only logged:
/// Info on success, Warning on failure or timeout, and Warning when the driver is not an
/// <see cref="IAlarmSource"/>.</para>
/// </summary>
private async Task HandleAcknowledgeAsync(RouteAlarmAck msg)
{
    var alarmSource = _driver as IAlarmSource;
    if (alarmSource is null)
    {
        _log.Warning("DriverInstance {Id}: alarm ack dropped — driver does not implement IAlarmSource", _driverInstanceId);
        return;
    }

    var ackBatch = new[]
    {
        new AlarmAcknowledgeRequest(
            SourceNodeId: msg.ConditionId,
            ConditionId: msg.ConditionId,
            Comment: msg.Comment,
            OperatorUser: msg.OperatorUser),
    };

    using var ackTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
    try
    {
        await alarmSource.AcknowledgeAsync(ackBatch, ackTimeout.Token);
    }
    catch (Exception ex)
    {
        // Best effort: the local condition stays Acknowledged regardless of the driver outcome.
        _log.Warning(ex, "DriverInstance {Id}: native-alarm acknowledge of {Cond} failed",
            _driverInstanceId, msg.ConditionId);
        return;
    }

    _log.Info("DriverInstance {Id}: acknowledged native condition {Cond} by {User}",
        _driverInstanceId, msg.ConditionId, msg.OperatorUser);
}
|
||
|
||
/// <summary>
/// Establishes the driver data-change subscription for the message's full references and publishing interval.
/// <list type="bullet">
/// <item>A driver that is not <see cref="ISubscribable"/> gets an immediate <see cref="SubscriptionFailed"/> reply.</item>
/// <item>An existing subscription is released first via <see cref="UnsubscribeAsync"/>.</item>
/// <item>The <c>OnDataChange</c> handler is attached before <c>SubscribeAsync</c> is called (10 s cap). Each
/// change is forwarded to this actor as a <see cref="DataChangeForward"/>.</item>
/// <item>On success the sender gets <see cref="SubscriptionEstablished"/> (diagnostic id, reference count).</item>
/// <item>On failure <see cref="DetachSubscription"/> runs, a warning is logged, and the sender gets
/// <see cref="SubscriptionFailed"/> with the exception message.</item>
/// </list>
/// </summary>
private async Task HandleSubscribeAsync(Subscribe msg)
{
    // Snapshot Sender and Self before any await. The re-subscribe path awaits UnsubscribeAsync, and a genuinely
    // async backend may resume the continuation off Akka's ActorContext; reading raw Sender/Self/Context there
    // throws NotSupportedException ("no active ActorContext"). Do not add ConfigureAwait(false) to the awaits in
    // this handler, so continuations resume on the actor context.
    var requester = Sender;
    var selfRef = Self;

    var subscribable = _driver as ISubscribable;
    if (subscribable is null)
    {
        requester.Tell(new SubscriptionFailed("Driver does not implement ISubscribable"));
        return;
    }

    // Subscribe-twice: release the previous subscription before creating its replacement.
    // NOTE(review): UnsubscribeAsync ends in DetachSubscription, which also detaches the native-alarm handler
    // and clears the alarm subscription handle. Confirm that is intended on this re-subscribe path.
    if (_subscriptionHandle is not null)
    {
        await UnsubscribeAsync();
    }

    try
    {
        // Driver callbacks arrive on driver threads; marshal each one onto this actor's mailbox.
        _dataChangeHandler = (_, change) => selfRef.Tell(new DataChangeForward(change.FullReference, change.Snapshot));
        subscribable.OnDataChange += _dataChangeHandler;

        using var subscribeTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        var handle = await subscribable.SubscribeAsync(msg.FullReferences, msg.PublishingInterval, subscribeTimeout.Token);
        _subscriptionHandle = handle;

        requester.Tell(new SubscriptionEstablished(handle.DiagnosticId, msg.FullReferences.Count));
        _log.Info("DriverInstance {Id}: subscribed to {Count} refs ({Diag})",
            _driverInstanceId, msg.FullReferences.Count, handle.DiagnosticId);
    }
    catch (Exception ex)
    {
        // Roll back the handler attached above (DetachSubscription also clears alarm-source state).
        DetachSubscription();
        _log.Warning(ex, "DriverInstance {Id}: subscribe failed", _driverInstanceId);
        requester.Tell(new SubscriptionFailed(ex.Message));
    }
}
|
||
|
||
/// <summary>
/// Releases the current driver data subscription, always finishing with <see cref="DetachSubscription"/>.
/// When there is no handle, or the driver is not <see cref="ISubscribable"/>, it only detaches. Otherwise it
/// calls <see cref="ISubscribable.UnsubscribeAsync"/> with a 5 s cap; an exception from that call is logged
/// and swallowed.
/// </summary>
private async Task UnsubscribeAsync()
{
    var handle = _subscriptionHandle;
    if (handle is null || _driver is not ISubscribable subscribable)
    {
        DetachSubscription();
        return;
    }

    try
    {
        using var unsubscribeTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        await subscribable.UnsubscribeAsync(handle, unsubscribeTimeout.Token);
    }
    catch (Exception ex)
    {
        _log.Warning(ex, "DriverInstance {Id}: unsubscribe threw (continuing)", _driverInstanceId);
    }
    finally
    {
        DetachSubscription();
    }
}
|
||
|
||
/// <summary>
/// Unhooks this actor's <c>OnDataChange</c> handler from the driver (if one is attached and the driver is
/// <see cref="ISubscribable"/>). It then forgets the handler and the data subscription handle and finally runs
/// <see cref="DetachAlarmSource"/>. Used by the unsubscribe path, PostStop, and Connected → Reconnecting
/// transitions, so stale handlers stop pushing data-change / alarm events to an actor that has lost its
/// driver connection. Does not call the driver's unsubscribe API itself.
/// </summary>
private void DetachSubscription()
{
    var handler = _dataChangeHandler;
    if (handler is not null && _driver is ISubscribable subscribable)
    {
        subscribable.OnDataChange -= handler;
    }

    _dataChangeHandler = null;
    _subscriptionHandle = null;

    DetachAlarmSource();
}
|
||
|
||
/// <summary>
/// Hooks the driver's native <see cref="IAlarmSource.OnAlarmEvent"/> so each transition reaches this actor as
/// a <see cref="NativeAlarmRaised"/> message, mirroring the OnDataChange attach. Does nothing if the driver is
/// not an <see cref="IAlarmSource"/> or a handler is already attached, so repeated calls are harmless.
/// </summary>
private void AttachAlarmSource()
{
    if (_alarmEventHandler is not null || _driver is not IAlarmSource alarmSource)
    {
        return;
    }

    // Events fire on driver threads; resolve Self now and only Tell from the handler.
    var selfRef = Self;
    _alarmEventHandler = (_, transition) => selfRef.Tell(new NativeAlarmRaised(transition));
    alarmSource.OnAlarmEvent += _alarmEventHandler;
}
|
||
|
||
/// <summary>
/// Counterpart of <see cref="AttachAlarmSource"/>, called from <see cref="DetachSubscription"/> and PostStop.
/// Unhooks the native alarm handler (if any) so it never pushes to a disconnected actor, and forgets the
/// native-alarm subscription handle.
/// </summary>
private void DetachAlarmSource()
{
    var handler = _alarmEventHandler;
    if (handler is not null && _driver is IAlarmSource alarmSource)
    {
        alarmSource.OnAlarmEvent -= handler;
    }

    _alarmEventHandler = null;

    // Forget our handle so the next Connected entry subscribes again; the desired alarm refs survive reconnects.
    // This does NOT cancel the driver-side subscription:
    // - A session-bound IAlarmSource loses the old subscription with its session, so nothing accumulates.
    // - A session-less feed (GalaxyDriver's always-on central monitor) keeps it across an in-place reconnect,
    //   so the re-subscribe is additive. GalaxyDriver.SubscribeAlarmsAsync clears its set before adding, so
    //   handles collapse to one live subscription instead of accumulating.
    // The Count > 0 gate and the one-shot fan-out are unaffected.
    _alarmSubscriptionHandle = null;
}
|
||
|
||
/// <summary>
/// Requests the native-alarm subscription that un-gates an <see cref="IAlarmSource"/> driver's feed (the driver
/// suppresses <see cref="IAlarmSource.OnAlarmEvent"/> until at least one alarm subscription exists).
/// It self-sends <see cref="SubscribeAlarms"/>, which the Connected behaviour handles asynchronously. It only
/// does so when the driver is an <see cref="IAlarmSource"/>, there are desired alarm refs, and no alarm
/// subscription handle is held yet; otherwise it is a no-op.
/// </summary>
private void SubscribeDesiredAlarms()
{
    var shouldSubscribe = _driver is IAlarmSource
        && _desiredAlarmRefs.Count > 0
        && _alarmSubscriptionHandle is null;

    if (shouldSubscribe)
    {
        Self.Tell(new SubscribeAlarms());
    }
}
|
||
|
||
/// <summary>
/// Registers the native-alarm subscription via <see cref="IAlarmSource.SubscribeAlarmsAsync"/> (10 s cap) and
/// keeps the returned handle. The guard is re-checked because, while this message sat in the mailbox, the
/// desired set may have emptied or another SubscribeAlarms may already have produced a handle. A failure is
/// logged and not retried here; the next Connected entry tries again, and until then the feed stays gated.
/// </summary>
private async Task HandleSubscribeAlarmsAsync(SubscribeAlarms _)
{
    if (_driver is not IAlarmSource alarmSource || _desiredAlarmRefs.Count == 0 || _alarmSubscriptionHandle is not null)
    {
        return;
    }

    var requestedRefs = _desiredAlarmRefs;
    using var subscribeTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(10));
    try
    {
        var handle = await alarmSource.SubscribeAlarmsAsync(requestedRefs, subscribeTimeout.Token);
        _alarmSubscriptionHandle = handle;
        _log.Info("DriverInstance {Id}: native-alarm subscription established for {Count} alarm ref(s) ({Diag})",
            _driverInstanceId, requestedRefs.Count, handle.DiagnosticId);
    }
    catch (Exception ex)
    {
        _log.Warning(ex, "DriverInstance {Id}: native-alarm subscription failed — feed stays gated until reconnect",
            _driverInstanceId);
    }
}
|
||
|
||
/// <summary>
/// Starts the bounded post-connect re-discovery loop on a <c>Connected</c> entry by self-sending the first
/// <see cref="RediscoverTick"/> (attempt 0, empty previous signature). The tick is stamped with the current
/// init generation, so one that outlives a reconnect is rejected by <see cref="HandleRediscoverAsync"/>.
/// <para>Does nothing for drivers without <see cref="ITagDiscovery"/>. Honours
/// <see cref="ITagDiscovery.RediscoverPolicy"/>:
/// <list type="bullet">
/// <item><c>Never</c>: no tick is scheduled at all.</item>
/// <item><c>Once</c>: the loop stops after the first published pass.</item>
/// <item><c>UntilStable</c>: passes repeat until the discovered-set signature repeats or the attempt cap is reached.</item>
/// </list></para>
/// </summary>
private void StartDiscovery()
{
    if (_driver is ITagDiscovery discovery)
    {
        var policy = discovery.RediscoverPolicy;
        if (policy == DiscoveryRediscoverPolicy.Never)
        {
            _log.Debug("DriverInstance {Id}: RediscoverPolicy=Never — skipping post-connect discovery", _driverInstanceId);
            return;
        }

        Self.Tell(new RediscoverTick(_initGeneration, Attempt: 0, PreviousSignature: string.Empty));
    }
}
|
||
|
||
/// <summary>
/// Executes one post-connect discovery pass. The driver streams its FixedTree into a
/// <see cref="CapturingAddressSpaceBuilder"/>, and an immutable snapshot of the captured nodes is sent to the
/// parent as <see cref="DiscoveredNodesReady"/>. Empty or repeated sets are fine: the parent de-duplicates and
/// injection is idempotent.
/// <para>Each <c>DiscoverAsync</c> is bounded by <c>_rediscoverDiscoverTimeout</c>. A failed or timed-out pass
/// is logged and published as an empty set.</para>
/// <para>Under <c>UntilStable</c> another tick is scheduled after <see cref="_rediscoverInterval"/> until one
/// of these holds:</para>
/// <list type="bullet">
/// <item>The non-empty discovered SET has stabilised, meaning the ordinal, sorted, distinct full-reference
/// signature equals the previous pass's. This is more robust than a count for incremental/paged browsers,
/// where a count could settle on a partial tree.</item>
/// <item>The <see cref="_rediscoverMaxAttempts"/> cap is reached.</item>
/// </list>
/// <para>Empty passes keep retrying because a FOCAS-style FixedTree cache may still be populating.
/// <c>Once</c> stops after the first published pass.</para>
/// <para>Limitation: this assumes the discovered set only GROWS toward a stable shape. That holds for FOCAS:
/// its FixedTree appears once, and on the wonder deploy the driver-config <c>_options.Tags</c> is empty, so the
/// set is 0 until the cache populates. A driver that emits a non-empty set and later grows could stop early on
/// a transient repeat; acceptable for current scope.</para>
/// </summary>
private async Task HandleRediscoverAsync(RediscoverTick tick)
{
    // A tick from an older init generation belongs to a pass superseded by a reconnect.
    if (tick.Generation != _initGeneration || _driver is not ITagDiscovery discovery)
    {
        return;
    }

    IReadOnlyList<DiscoveredNode> nodes = Array.Empty<DiscoveredNode>();
    try
    {
        var capture = new CapturingAddressSpaceBuilder();

        // ReceiveAsync keeps the mailbox suspended for the whole handler. An unbounded browse would queue
        // DisconnectObserved / ForceReconnect / writes / health polls behind it indefinitely.
        using var browseTimeout = new CancellationTokenSource(_rediscoverDiscoverTimeout);

        // Deliberately no ConfigureAwait(false). A truly async DiscoverAsync (Galaxy / OpcUaClient / TwinCAT)
        // must resume on the actor task scheduler so Context.Parent.Tell and Timers below see a live
        // ActorContext. Resuming off-context throws NotSupportedException("no active ActorContext").
        await discovery.DiscoverAsync(capture, browseTimeout.Token);

        // Hand the parent a copy, never the builder's live list.
        nodes = capture.Nodes.ToArray();
    }
    catch (Exception ex)
    {
        // nodes stays empty; the failed pass is published as an empty set.
        _log.Warning(ex, "DriverInstance {Id}: discovery pass {Attempt} failed; will retry", _driverInstanceId, tick.Attempt);
    }

    // Defensive re-check. With the mailbox suspended the generation cannot move mid-await today; the primary
    // protections are the guard above, Timers.Cancel("rediscover") on disconnect, and single-timer key reuse.
    if (tick.Generation != _initGeneration)
    {
        return;
    }

    Context.Parent.Tell(new DiscoveredNodesReady(_driverInstanceId, nodes));

    // Once: a single published pass per (re)connect completes the job, whether DiscoverAsync ran synchronously
    // or asynchronously. Never does not get here, because StartDiscovery schedules no tick for it.
    if (discovery.RediscoverPolicy == DiscoveryRediscoverPolicy.Once)
    {
        _log.Debug("DriverInstance {Id}: RediscoverPolicy=Once — single discovery pass, not scheduling another", _driverInstanceId);
        return;
    }

    // UntilStable: compare this pass's set signature with the previous one (the first tick carries "").
    var orderedRefs = nodes
        .Select(node => node.FullReference)
        .Distinct(StringComparer.Ordinal)
        .OrderBy(reference => reference, StringComparer.Ordinal);
    var signature = string.Join('\u0001', orderedRefs);

    var settled = nodes.Count > 0 && string.Equals(signature, tick.PreviousSignature, StringComparison.Ordinal);
    var passesDone = tick.Attempt + 1;

    if (settled || passesDone >= _rediscoverMaxAttempts)
    {
        _log.Debug("DriverInstance {Id}: discovery settled after {Attempt} pass(es), {Count} node(s)", _driverInstanceId, passesDone, nodes.Count);
        return;
    }

    Timers.StartSingleTimer("rediscover", new RediscoverTick(tick.Generation, passesDone, signature), _rediscoverInterval);
}
|
||
|
||
/// <summary>
/// Remembers the host's desired data references, publishing interval, and alarm references; a missing alarm
/// list is stored as empty. The live subscription is not touched; <see cref="ResubscribeDesired"/> applies
/// the stored set on the next <c>Connected</c> entry.
/// </summary>
private void StoreDesiredSubscriptions(SetDesiredSubscriptions msg)
{
    (_desiredRefs, _desiredInterval, _desiredAlarmRefs) =
        (msg.FullReferences, msg.PublishingInterval, msg.AlarmReferences ?? Array.Empty<string>());
}
|
||
|
||
/// <summary>
/// After a (re)connect, replays the stored desired subscription by self-sending a one-shot
/// <see cref="Subscribe"/>. The Connected behaviour handles that message and drops any prior handle first, so
/// values resume streaming after a reconnect or redeploy without the host stepping in. Does nothing when no
/// references are desired.
/// </summary>
private void ResubscribeDesired()
{
    if (_desiredRefs.Count == 0)
    {
        return;
    }

    Self.Tell(new Subscribe(_desiredRefs, _desiredInterval));
}
|
||
|
||
/// <summary>
/// Converts a driver data change (marshalled onto the actor as <see cref="DataChangeForward"/>) into an
/// <see cref="AttributeValuePublished"/> for the parent. The status code is projected to 3-state quality,
/// and the source timestamp is used, falling back to the server timestamp when absent.
/// </summary>
private void OnDataChangeForward(DataChangeForward msg)
{
    var snapshot = msg.Snapshot;
    Context.Parent.Tell(new AttributeValuePublished(
        _driverInstanceId,
        msg.FullReference,
        snapshot.Value,
        QualityFromStatus(snapshot.StatusCode),
        snapshot.SourceTimestampUtc ?? snapshot.ServerTimestampUtc));
}
|
||
|
||
/// <summary>
/// Maps an OPC UA status code onto the 3-state <see cref="OpcUaQuality"/> used by the publish actor, using
/// the two severity bits (31..30): 00 → Good, 01 → Uncertain, 10/11 → Bad.
/// </summary>
private static OpcUaQuality QualityFromStatus(uint statusCode) => (statusCode >> 30) switch
{
    0u => OpcUaQuality.Good,
    1u => OpcUaQuality.Uncertain,
    _ => OpcUaQuality.Bad,
};
|
||
|
||
/// <summary>True when both OPC UA severity bits (31 and 30) of <paramref name="statusCode"/> are clear, i.e. Good.</summary>
private static bool IsGoodStatus(uint statusCode) => (statusCode & 0xC0000000u) == 0u;
|
||
|
||
/// <summary>Stamps (in UTC) a transition into a Faulted / error state; <see cref="ErrorCount5Min"/> counts these over a sliding 5-minute window.</summary>
private void RecordFault() => _faultTimestamps.Enqueue(DateTime.UtcNow);
|
||
|
||
/// <summary>
/// Counts fault transitions recorded within the last 5 minutes. Older timestamps are first pruned from the
/// front of the queue, relying on <see cref="RecordFault"/> appending in time order.
/// </summary>
private int ErrorCount5Min()
{
    var windowStart = DateTime.UtcNow - TimeSpan.FromMinutes(5);
    while (_faultTimestamps.TryPeek(out var oldest) && oldest < windowStart)
    {
        _faultTimestamps.Dequeue();
    }

    return _faultTimestamps.Count;
}
|
||
|
||
/// <summary>
/// Reads <see cref="IDriver.GetHealth"/> and the 5-minute fault count and forwards them to the health publisher.
/// It runs on every periodic <see cref="HealthPollTick"/> and is intended to run on each observable state
/// change as well, keeping the AdminUI snapshot store warm.
/// <para>Publishes are de-duplicated. When the (state, last successful read, last error, error count) tuple
/// equals the last snapshot that was successfully published, the call is a no-op, so unchanged Healthy
/// snapshots are not re-sent on every poll. Newly-joined SignalR clients still get the current snapshot
/// through <c>DriverStatusHub.JoinDriver</c>, which reads the store directly.</para>
/// <para>The fingerprint is recorded only after <c>Publish</c> returns. If the publisher throws, the next call
/// retries instead of treating the unpublished snapshot as already sent until the health state happens to
/// change. Exceptions from <c>GetHealth</c> or <c>Publish</c> are logged and swallowed.</para>
/// </summary>
private void PublishHealthSnapshot()
{
    try
    {
        var health = _driver.GetHealth();
        var errorCount = ErrorCount5Min();
        var fingerprint = (health.State, health.LastSuccessfulRead, health.LastError, errorCount);
        if (_lastPublishedFingerprint is { } prev && prev.Equals(fingerprint))
        {
            return;
        }

        _healthPublisher.Publish(_clusterId, _driverInstanceId, health, errorCount);

        // Record only after a successful publish so a throwing publisher does not suppress the retry.
        _lastPublishedFingerprint = fingerprint;
    }
    catch (Exception ex)
    {
        _log.Warning(ex, "DriverInstance {Id}: health snapshot publish failed (GetHealth or Publish threw); skipping", _driverInstanceId);
    }
}

/// <summary>Fingerprint of the last snapshot successfully published by <see cref="PublishHealthSnapshot"/>; null until the first successful publish.</summary>
private (DriverState State, DateTime? LastSuccess, string? LastError, int ErrorCount)? _lastPublishedFingerprint;
|
||
|
||
/// <inheritdoc />
/// <remarks>
/// Detaches the data-change and native-alarm handlers first, so no further driver callbacks target this stopped
/// actor. It then shuts the driver down and records a "stop" lifecycle metric tagged with the driver type.
/// <para>PostStop is synchronous, so the shutdown is blocked on. The call gets a 10 s cancellation token,
/// matching the bounded write/ack/subscribe/unsubscribe calls in this actor, so a hung driver does not hold up
/// actor termination forever. NOTE: this relies on the driver honouring the token; one that ignores
/// cancellation can still block here. Shutdown failures, including cancellation, are logged and swallowed so
/// the metric is always recorded.</para>
/// </remarks>
protected override void PostStop()
{
    DetachSubscription();

    try
    {
        using var shutdownTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        _driver.ShutdownAsync(shutdownTimeout.Token).GetAwaiter().GetResult();
    }
    catch (Exception ex)
    {
        _log.Warning(ex, "DriverInstance {Id}: ShutdownAsync threw on PostStop", _driverInstanceId);
    }

    OtOpcUaTelemetry.DriverInstanceLifecycle.Add(1,
        new KeyValuePair<string, object?>("event", "stop"),
        new KeyValuePair<string, object?>("driver_type", _driver.DriverType));
}
|
||
}
|