using Akka.Actor;
using Akka.Event;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Protocol;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.DataConnection;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Management;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Alarms;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
using ZB.MOM.WW.ScadaBridge.HealthMonitoring;
using ZB.MOM.WW.ScadaBridge.SiteEventLogging;
namespace ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Actors;
///
/// WP-6: Connection actor using Akka.NET Become/Stash pattern for lifecycle state machine.
///
/// States:
/// - Connecting: stash subscribe/write requests; attempts connection
/// - Connected: unstash and process all requests
/// - Reconnecting: push bad quality for all subscribed tags, stash new requests,
/// fixed-interval reconnect
///
/// WP-9: Auto-reconnect with bad quality on disconnect.
/// WP-10: Transparent re-subscribe after reconnection.
/// WP-11: Write-back support (synchronous failure to caller, no S&F).
/// WP-12: Tag path resolution with retry.
/// WP-13: Health reporting (connection status + tag resolution counts).
/// WP-14: Subscription lifecycle (register on create, cleanup on stop).
///
public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
{
public enum ActiveEndpoint { Primary, Backup }
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly string _connectionName;
private IDataConnection _adapter;
private readonly DataConnectionOptions _options;
private readonly ISiteHealthCollector _healthCollector;
private readonly IDataConnectionFactory _factory;
private readonly string _protocolType;
private readonly ISiteEventLogger? _siteEventLogger;
/// Stash for holding messages while the connection is not in the Connected state.
public IStash Stash { get; set; } = null!;
/// Timer scheduler for reconnect and tag-resolution retry timers.
public ITimerScheduler Timers { get; set; } = null!;
///
/// Active subscriptions: instanceUniqueName → set of tag paths.
///
private readonly Dictionary> _subscriptionsByInstance = new();
///
/// Subscription IDs returned by the adapter: tagPath → subscriptionId.
///
private readonly Dictionary _subscriptionIds = new();
/// <summary>
/// DataConnectionLayer-008: reverse index of how many instances subscribe to each
/// tag path. Lets <see cref="HandleUnsubscribe"/> decide whether any other instance
/// still needs a tag in O(1) instead of scanning every instance's tag set.
/// </summary>
private readonly Dictionary _tagSubscriberCount = new();
///
/// Tags whose path resolution failed and are awaiting retry.
///
private readonly HashSet _unresolvedTags = new();
///
/// DataConnectionLayer-010: tags whose retry SubscribeAsync is currently in flight.
/// They are excluded from the next retry tick so a slow attempt is not duplicated
/// (which would leak monitored items / subscription ids).
///
private readonly HashSet _resolutionInFlight = new();
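/// <summary>
/// DataConnectionLayer-018: tags whose initial SubscribeAsync (issued from
/// <see cref="HandleSubscribe"/>) is currently in flight. Two parallel
/// SubscribeTagsRequest messages for different instances sharing a tag
/// path would otherwise both observe "not subscribed" against
/// <see cref="_subscriptionIds"/> (the in-flight task has not yet posted its
/// <see cref="SubscribeCompleted"/>), both call _adapter.SubscribeAsync,
/// and the second subscription id would be silently dropped at the existing
/// _subscriptionIds.ContainsKey guard in <see cref="HandleSubscribeCompleted"/>
/// — orphaning the adapter's monitored item (duplicate notifications + leaked
/// memory until the connection drops). This set is read and written only on the
/// actor thread. NOTE(review): the original text also named the place where the
/// set is cleared and the set it is kept symmetric with, but both references were
/// lost — presumably the re-subscribe path and <see cref="_resolutionInFlight"/>;
/// confirm against the rest of the file.
/// </summary>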
private readonly HashSet _subscribesInFlight = new();
///
/// Subscribers: instanceUniqueName → IActorRef (the Instance Actor).
///
private readonly Dictionary _subscribers = new();
// ── Native alarm subscriptions (Task-10) ──
// The connection opens one alarm feed per source reference; transitions are
// routed to subscribers (NativeAlarmActors) by source-object reference.
/// sourceReference → set of subscriber actor refs (NativeAlarmActors), for routing + ref-count.
private readonly Dictionary> _alarmSourceSubscribers = new();
/// sourceReference → raw condition filter string passed to the adapter (first subscriber wins).
private readonly Dictionary _alarmSourceFilter = new();
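/// <summary>
/// sourceReference → parsed condition-type predicate (M2.4 / #8). The authoritative
/// client-side gate applied when alarm transitions are routed to subscribers
/// (presumably in HandleAlarmTransitionReceived — the reference was lost, confirm);
/// applies uniformly across OPC UA and the gateway-wide MxGateway feed.
/// </summary>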
private readonly Dictionary _alarmSourceFilterPredicate = new();
/// sourceReference → adapter alarm subscription id.
private readonly Dictionary _alarmSubscriptionIds = new();
/// sourceReferences whose adapter SubscribeAlarmsAsync is currently in flight.
private readonly HashSet _alarmSubscribesInFlight = new();
///
/// Tracks total subscribed and resolved tags for health reporting.
///
private int _totalSubscribed;
private int _resolvedTags;
private int _tagsGoodQuality;
private int _tagsBadQuality;
private int _tagsUncertainQuality;
private readonly Dictionary _lastTagQuality = new();
private IDictionary _connectionDetails;
private readonly IDictionary _primaryConfig;
private readonly IDictionary? _backupConfig;
private readonly int _failoverRetryCount;
private ActiveEndpoint _activeEndpoint = ActiveEndpoint.Primary;
private int _consecutiveFailures;
private int _consecutiveUnstableDisconnects;
private DateTimeOffset _lastConnectedAt;
/// <summary>
/// DataConnectionLayer-011: monotonically increasing tag that identifies the
/// current adapter instance. Subscription callbacks capture the generation in
/// effect when they were created; a <see cref="TagValueReceived"/> whose
/// generation no longer matches comes from a disposed adapter and is dropped so
/// stale pre-failover device data is never forwarded to Instance Actors.
/// </summary>
private int _adapterGeneration;
///
/// Captured Self reference for use from non-actor threads (event handlers, callbacks).
/// Akka.NET's Self property is only valid inside the actor's message loop.
///
private IActorRef _self = null!;
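/// <summary>
/// Initializes the data connection actor with its adapter and configuration.
/// </summary>
/// <param name="connectionName">Human-readable name used in logs and health metrics.</param>
/// <param name="adapter">The protocol adapter for the primary endpoint.</param>
/// <param name="options">Data connection layer configuration options.</param>
/// <param name="healthCollector">Collector for site health metrics.</param>
/// <param name="factory">Factory used to create replacement adapters on failover.</param>
/// <param name="protocolType">Protocol type identifier (e.g. "OpcUa").</param>
/// <param name="primaryConfig">Configuration dictionary for the primary endpoint.</param>
/// <param name="backupConfig">Optional configuration dictionary for the backup endpoint.</param>
/// <param name="failoverRetryCount">Number of consecutive failures before switching to the backup endpoint.</param>
/// <param name="siteEventLogger">Optional site event logger for operational events.</param>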
public DataConnectionActor(
string connectionName,
IDataConnection adapter,
DataConnectionOptions options,
ISiteHealthCollector healthCollector,
IDataConnectionFactory factory,
string protocolType,
IDictionary? primaryConfig = null,
IDictionary? backupConfig = null,
int failoverRetryCount = 3,
ISiteEventLogger? siteEventLogger = null)
{
// The constructor only stores its arguments; connecting starts later from PreStart.
_connectionName = connectionName;
_adapter = adapter;
_options = options;
_healthCollector = healthCollector;
_factory = factory;
_protocolType = protocolType;
// A missing primary configuration is replaced by an empty dictionary so the
// field is never null.
_primaryConfig = primaryConfig ?? new Dictionary();
// A null backup configuration disables failover (every failover check tests it).
_backupConfig = backupConfig;
_failoverRetryCount = failoverRetryCount;
_siteEventLogger = siteEventLogger;
// The actor starts on the primary endpoint, so connect attempts use its configuration.
_connectionDetails = _primaryConfig;
}
///
protected override void PreStart()
{
    // Self is only valid inside the actor's message loop, so keep a copy that
    // event handlers and callbacks running on other threads can use.
    _self = Self;

    _log.Info("DataConnectionActor [{0}] starting in Connecting state", _connectionName);

    // Get notified when the adapter drops the connection unexpectedly.
    _adapter.Disconnected += OnAdapterDisconnected;

    // The lifecycle state machine always begins in the Connecting state.
    BecomeConnecting();
}
// Adapter event handler. It runs on a background thread (gRPC stream reader) where
// Self would throw, so the notification is posted to the actor's mailbox through
// the captured _self reference and handled on the message loop.
private void OnAdapterDisconnected() => _self.Tell(new AdapterDisconnected());
///
protected override void PostStop()
{
    _log.Info("DataConnectionActor [{0}] stopping — disposing adapter", _connectionName);

    var adapter = _adapter;

    // Detach first so the adapter cannot post further disconnect notifications.
    adapter.Disconnected -= OnAdapterDisconnected;

    // Fire-and-forget: the disposal task is started but deliberately not awaited.
    _ = adapter.DisposeAsync().AsTask();
}
///
// Fallback behaviour only: PreStart installs a state handler via Become, so this
// should not be reached. Anything that does arrive is reported as unhandled.
protected override void OnReceive(object message) => Unhandled(message);
// ── Connecting State ──
private void BecomeConnecting()
{
    var name = _connectionName;
    _log.Info("[{0}] Entering Connecting state", name);

    // Publish the new lifecycle status before switching behaviour.
    _healthCollector.UpdateConnectionHealth(name, ConnectionHealth.Connecting);
    _healthCollector.UpdateConnectionEndpoint(name, "Connecting");

    Become(Connecting);

    // Kick off the first connection attempt through the mailbox.
    Self.Tell(new AttemptConnect());
}
private void Connecting(object message)
{
    switch (message)
    {
        case AttemptConnect:
            HandleAttemptConnect();
            break;

        case ConnectResult connectResult:
            HandleConnectResult(connectResult);
            break;

        // Requests that need a live connection are parked until BecomeConnected
        // unstashes them.
        case SubscribeTagsRequest
            or WriteTagRequest
            or UnsubscribeTagsRequest
            or SubscribeAlarmsRequest
            or UnsubscribeAlarmsRequest:
            Stash.Stash();
            break;

        case SubscribeCompleted subscribeCompleted:
            // A subscribe that began while Connected may finish after a state change;
            // apply it so its state is still there for the next ReSubscribeAll.
            HandleSubscribeCompleted(subscribeCompleted);
            break;

        case AlarmSubscribeCompleted alarmSubscribeCompleted:
            HandleAlarmSubscribeCompleted(alarmSubscribeCompleted);
            break;

        case AlarmTransitionReceived:
            // There is no live feed in Connecting; drop it (a snapshot replays on subscribe).
            break;

        case BrowseNodeCommand browseCommand:
            // Browse is an interactive design-time query and is never stashed. The
            // adapter has no session yet, so the reply is a typed
            // ConnectionNotConnected failure the dialog can show as an inline banner.
            HandleBrowse(browseCommand);
            break;

        case ReadTagValuesCommand readCommand:
            // Same rule as browse: never stashed. The adapter is not connected yet,
            // so HandleReadTagValues short-circuits to ConnectionNotConnected.
            HandleReadTagValues(readCommand);
            break;

        case GetHealthReport:
            ReplyWithHealthReport();
            break;

        default:
            Unhandled(message);
            break;
    }
}
// ── Connected State ──
// Switches the actor into the Connected state: records the connect time, publishes
// health and endpoint status, installs the Connected handler, and replays every
// request that was stashed while no connection was available.
private void BecomeConnected()
{
    _log.Info("[{0}] Entering Connected state", _connectionName);

    // Remembered so BecomeReconnecting can tell a stable connection from a short-lived one.
    _lastConnectedAt = DateTimeOffset.UtcNow;

    _healthCollector.UpdateConnectionHealth(_connectionName, ConnectionHealth.Connected);
    _healthCollector.UpdateTagResolution(_connectionName, _totalSubscribed, _resolvedTags);

    // The endpoint name is only part of the label when a backup endpoint exists.
    // ToLowerInvariant keeps the label independent of the machine's current culture.
    var endpointLabel = _backupConfig == null
        ? "Connected"
        : $"Connected to {_activeEndpoint.ToString().ToLowerInvariant()}";
    _healthCollector.UpdateConnectionEndpoint(_connectionName, endpointLabel);

    Become(Connected);
    Stash.UnstashAll();
}
private void Connected(object message)
{
    switch (message)
    {
        // ── Tag subscriptions ──
        case SubscribeTagsRequest subscribeRequest:
            HandleSubscribe(subscribeRequest);
            break;

        case SubscribeCompleted subscribeCompleted:
            // DataConnectionLayer-004: while Connected, a connection-level subscribe
            // failure has to drive the reconnection state machine.
            var connectionLost = HandleSubscribeCompleted(subscribeCompleted);
            if (connectionLost)
            {
                _log.Warning("[{0}] Connection-level subscribe failure — entering Reconnecting", _connectionName);
                BecomeReconnecting();
            }
            break;

        case UnsubscribeTagsRequest unsubscribeRequest:
            HandleUnsubscribe(unsubscribeRequest);
            break;

        // ── Native alarm subscriptions ──
        case SubscribeAlarmsRequest alarmSubscribe:
            HandleSubscribeAlarms(alarmSubscribe);
            break;

        case UnsubscribeAlarmsRequest alarmUnsubscribe:
            HandleUnsubscribeAlarms(alarmUnsubscribe);
            break;

        case AlarmSubscribeCompleted alarmSubscribeCompleted:
            HandleAlarmSubscribeCompleted(alarmSubscribeCompleted);
            break;

        case AlarmTransitionReceived alarmTransition:
            HandleAlarmTransitionReceived(alarmTransition);
            break;

        // ── Writes and live values ──
        case WriteTagRequest writeRequest:
            HandleWrite(writeRequest);
            break;

        case TagValueReceived tagValue:
            HandleTagValueReceived(tagValue);
            break;

        // ── Tag path resolution retry ──
        case TagResolutionSucceeded resolved:
            HandleTagResolutionSucceeded(resolved);
            break;

        case TagResolutionFailed unresolved:
            HandleTagResolutionFailed(unresolved);
            break;

        case AdapterDisconnected:
            HandleDisconnect();
            break;

        case RetryTagResolution:
            HandleRetryTagResolution();
            break;

        // ── Interactive queries and health ──
        case BrowseNodeCommand browseCommand:
            HandleBrowse(browseCommand);
            break;

        case ReadTagValuesCommand readCommand:
            HandleReadTagValues(readCommand);
            break;

        case GetHealthReport:
            ReplyWithHealthReport();
            break;

        default:
            Unhandled(message);
            break;
    }
}
// ── Reconnecting State ──
private void BecomeReconnecting()
{
    _log.Warning("[{0}] Entering Reconnecting state", _connectionName);
    _healthCollector.UpdateConnectionHealth(_connectionName, ConnectionHealth.Disconnected);
    _healthCollector.UpdateConnectionEndpoint(_connectionName, "Disconnected");

    // Count unstable connections toward failover. A connection that lasted less than
    // the stability threshold (e.g. connect succeeded but the heartbeat went stale)
    // is an unstable cycle; anything else resets the streak.
    var connectionDuration = DateTimeOffset.UtcNow - _lastConnectedAt;
    if (_lastConnectedAt == default || connectionDuration >= _options.StableConnectionThreshold)
    {
        _consecutiveUnstableDisconnects = 0;
    }
    else
    {
        _consecutiveUnstableDisconnects++;
        _log.Warning("[{0}] Unstable connection (lasted {1:F0}s) — consecutive unstable disconnects: {2}/{3}",
            _connectionName, connectionDuration.TotalSeconds, _consecutiveUnstableDisconnects,
            _backupConfig != null ? _failoverRetryCount : 0);
    }

    // Fail over once the connection has repeatedly connected and then gone stale.
    if (_backupConfig != null && _consecutiveUnstableDisconnects >= _failoverRetryCount)
    {
        var previousEndpoint = _activeEndpoint;
        _activeEndpoint = previousEndpoint == ActiveEndpoint.Primary
            ? ActiveEndpoint.Backup
            : ActiveEndpoint.Primary;

        // Both failure counters start fresh on the new endpoint.
        _consecutiveUnstableDisconnects = 0;
        _consecutiveFailures = 0;

        var newConfig = _activeEndpoint == ActiveEndpoint.Primary
            ? _primaryConfig
            : _backupConfig;

        // Retire the old adapter (disposal is fire-and-forget).
        _adapter.Disconnected -= OnAdapterDisconnected;
        _ = _adapter.DisposeAsync().AsTask();

        // Bring up a replacement adapter for the target endpoint.
        _adapter = _factory.Create(_protocolType, newConfig);
        _connectionDetails = newConfig;
        _adapter.Disconnected += OnAdapterDisconnected;

        // DataConnectionLayer-011: a new adapter means a new generation, so callbacks
        // from the disposed adapter are recognised as stale and dropped.
        _adapterGeneration++;

        _log.Warning("[{0}] Failing over from {1} to {2} (unstable connection pattern)",
            _connectionName, previousEndpoint, _activeEndpoint);

        _ = _siteEventLogger?.LogEventAsync(
            "connection", "Warning", null, _connectionName,
            $"Failover from {previousEndpoint} to {_activeEndpoint} (unstable connection)",
            $"Connection lasted {connectionDuration.TotalSeconds:F0}s, threshold {_options.StableConnectionThreshold.TotalSeconds:F0}s");
    }

    // Record the disconnect in the site event log when a logger is configured.
    _ = _siteEventLogger?.LogEventAsync(
        "connection", "Warning", null, _connectionName,
        $"Connection lost — entering reconnect cycle", null);

    Become(Reconnecting);

    // WP-9: every subscribed tag is pushed as bad quality on disconnect.
    PushBadQualityForAllTags();

    // Task-10: tell native alarm subscribers the source feed is unavailable
    // (mirrored alarms become uncertain; the reconnect snapshot reconciles them).
    PushAlarmSourceUnavailable();

    // Arm the next reconnect attempt.
    Timers.StartSingleTimer("reconnect", new AttemptConnect(), _options.ReconnectInterval);
}
// Message handler for the Reconnecting state. Requests that need a live connection
// are stashed, cleanup requests are still served, and messages left over from the
// previous connection are ignored explicitly instead of being reported as unhandled.
private void Reconnecting(object message)
{
    switch (message)
    {
        case AttemptConnect:
            HandleAttemptConnect();
            break;
        case ConnectResult result:
            HandleReconnectResult(result);
            break;
        case SubscribeTagsRequest:
        case WriteTagRequest:
        case SubscribeAlarmsRequest:
            Stash.Stash();
            break;
        case UnsubscribeTagsRequest req:
            // Allow unsubscribe even during reconnect (for cleanup on instance stop)
            HandleUnsubscribe(req);
            break;
        case UnsubscribeAlarmsRequest areq:
            // Allow alarm unsubscribe during reconnect (cleanup on instance stop).
            HandleUnsubscribeAlarms(areq);
            break;
        case TagValueReceived:
            // Ignore — stale callback from previous connection
            break;
        case AlarmTransitionReceived:
            // Ignore — stale alarm callback from previous connection; ReSubscribeAll re-seeds.
            break;
        case TagResolutionSucceeded:
        case TagResolutionFailed:
            // Ignore — stale results from previous connection; ReSubscribeAll runs after reconnect
            break;
        case RetryTagResolution:
            // Ignore — the periodic "tag-resolution-retry" timer can be running in this
            // state (HandleSubscribeCompleted may start it here), but nothing can be
            // resolved without a connection. Handling the tick explicitly keeps it from
            // being published as an unhandled message on every interval.
            break;
        case AdapterDisconnected:
            // Ignore — the actor is already in the reconnect cycle, so another
            // disconnect notification from the adapter needs no further action.
            break;
        case SubscribeCompleted sc:
            // A subscribe started while Connected can complete after a transition;
            // apply it so its state survives into the next ReSubscribeAll.
            HandleSubscribeCompleted(sc);
            break;
        case AlarmSubscribeCompleted asc:
            HandleAlarmSubscribeCompleted(asc);
            break;
        case BrowseNodeCommand browse:
            // Browse is design-time and never stashed. While reconnecting
            // the adapter has no live session, so the adapter call will
            // throw ConnectionNotConnectedException — mapped by HandleBrowse.
            HandleBrowse(browse);
            break;
        case ReadTagValuesCommand read:
            // Same rule as browse — never stashed; while reconnecting the
            // adapter is not Connected so HandleReadTagValues short-circuits
            // to a ConnectionNotConnected failure.
            HandleReadTagValues(read);
            break;
        case GetHealthReport:
            ReplyWithHealthReport();
            break;
        default:
            Unhandled(message);
            break;
    }
}
// ── Connection Management ──
private void HandleAttemptConnect()
{
    _log.Debug("[{0}] Attempting connection...", _connectionName);

    // Capture Self now; the continuation runs after this handler has returned.
    var replyTarget = Self;

    // Turn the outcome of the connect task into a ConnectResult message and pipe
    // it back to this actor, so the result is handled on the message loop.
    var connectTask = _adapter.ConnectAsync(_connectionDetails);
    connectTask
        .ContinueWith(completed => completed.IsCompletedSuccessfully
            ? new ConnectResult(true, null)
            : new ConnectResult(false, completed.Exception?.GetBaseException().Message))
        .PipeTo(replyTarget);
}
private void HandleConnectResult(ConnectResult result)
{
    if (!result.Success)
    {
        // DataConnectionLayer-015: the INITIAL connect takes part in the failover
        // counter just like a reconnect. Otherwise a primary that is unreachable when
        // the actor first starts (fresh deployment, site restart, or a primary that is
        // simply down) would be retried forever and the configured backup never tried.
        // Count the failure, switch endpoint once the retry count is exhausted, then
        // re-arm the timer.
        _consecutiveFailures++;
        CountFailureAndMaybeFailover(result.Error);
        Timers.StartSingleTimer("reconnect", new AttemptConnect(), _options.ReconnectInterval);
        return;
    }

    _log.Info("[{0}] Connection established", _connectionName);
    BecomeConnected();
}
private void HandleReconnectResult(ConnectResult result)
{
    if (!result.Success)
    {
        // Count the failure (possibly failing over) and schedule the next attempt.
        _consecutiveFailures++;
        CountFailureAndMaybeFailover(result.Error);
        Timers.StartSingleTimer("reconnect", new AttemptConnect(), _options.ReconnectInterval);
        return;
    }

    _log.Info("[{0}] Reconnected successfully on {1} endpoint", _connectionName, _activeEndpoint);
    _consecutiveFailures = 0;

    // Record the restoration in the site event log when a logger is configured.
    _ = _siteEventLogger?.LogEventAsync(
        "connection", "Info", null, _connectionName,
        $"Connection restored on {_activeEndpoint} endpoint", null);

    // WP-10: transparent re-subscribe — bring back every active subscription.
    ReSubscribeAll();

    // Task-10: bring back the native alarm feeds (the source replays a snapshot).
    ReSubscribeAllAlarms();

    BecomeConnected();
}
/// <summary>
/// Shared connect-failure handling for both the initial connect (Connecting state)
/// and reconnect (Reconnecting state). Assumes <see cref="_consecutiveFailures"/> has
/// already been incremented for the current failure. Switches to the other endpoint
/// once the retry count is exhausted and a backup is configured
/// (DataConnectionLayer-015 brought the initial connect onto this path).
/// </summary>
private void CountFailureAndMaybeFailover(string? error)
{
    // No failover without a backup endpoint, or while retries remain: just log the
    // failure and leave the retry scheduling to the caller.
    if (_backupConfig == null || _consecutiveFailures < _failoverRetryCount)
    {
        var retryLimit = _backupConfig != null ? _failoverRetryCount.ToString() : "∞";
        _log.Warning("[{0}] Connect failed: {1}. Retrying in {2}s (attempt {3}/{4})",
            _connectionName, error, _options.ReconnectInterval.TotalSeconds,
            _consecutiveFailures, retryLimit);
        return;
    }

    // Retry count exhausted: flip to the other endpoint and restart the count.
    var previousEndpoint = _activeEndpoint;
    _activeEndpoint = previousEndpoint == ActiveEndpoint.Primary
        ? ActiveEndpoint.Backup
        : ActiveEndpoint.Primary;
    _consecutiveFailures = 0;

    var newConfig = _activeEndpoint == ActiveEndpoint.Primary
        ? _primaryConfig
        : _backupConfig;

    // Retire the old adapter (fire-and-forget — nothing is awaited in actor context).
    _adapter.Disconnected -= OnAdapterDisconnected;
    _ = _adapter.DisposeAsync().AsTask();

    // Bring up a replacement adapter for the target endpoint and wire its
    // disconnect handler.
    _adapter = _factory.Create(_protocolType, newConfig);
    _connectionDetails = newConfig;
    _adapter.Disconnected += OnAdapterDisconnected;

    // DataConnectionLayer-011: a new adapter means a new generation, so callbacks
    // from the disposed adapter are recognised as stale and dropped.
    _adapterGeneration++;

    _log.Warning("[{0}] Failing over from {1} to {2}",
        _connectionName, previousEndpoint, _activeEndpoint);

    // Record the failover in the site event log when a logger is configured.
    _ = _siteEventLogger?.LogEventAsync(
        "connection", "Warning", null, _connectionName,
        $"Failover from {previousEndpoint} to {_activeEndpoint}",
        $"After {_failoverRetryCount} consecutive failures");
}
// Reacts to an AdapterDisconnected message received in the Connected state by
// starting the reconnect cycle.
private void HandleDisconnect()
{
    var name = _connectionName;
    _log.Warning("[{0}] AdapterDisconnected message received — transitioning to Reconnecting", name);

    BecomeReconnecting();
}
// ── Subscription Management (WP-14) ──
// Handles a SubscribeTagsRequest in the Connected state. On the actor thread it
// registers the sender as the instance's subscriber and decides which tags need a
// new adapter subscription. The adapter I/O (subscribe plus an initial read to seed
// values) then runs on a background task whose outcome is piped back to this actor
// as a SubscribeCompleted message; all state changes happen when that is handled.
private void HandleSubscribe(SubscribeTagsRequest request)
{
    _log.Debug("[{0}] Subscribing {1} tags for instance {2}",
        _connectionName, request.TagPaths.Count, request.InstanceUniqueName);

    _subscribers[request.InstanceUniqueName] = Sender;
    // NOTE(review): tag paths are assumed to be strings here and below — the generic
    // type arguments were missing from the text under review; confirm.
    if (!_subscriptionsByInstance.ContainsKey(request.InstanceUniqueName))
        _subscriptionsByInstance[request.InstanceUniqueName] = new HashSet<string>();

    var self = Self;
    var sender = Sender;

    // DataConnectionLayer-011: capture the current adapter generation so callbacks
    // from this adapter can be distinguished from a later (post-failover) adapter.
    var generation = _adapterGeneration;

    // Capture the adapter together with its generation. The background task must not
    // read the _adapter field: a failover can replace it while the task is running,
    // and the task would then subscribe on the NEW adapter while tagging callbacks
    // with the OLD generation.
    var adapter = _adapter;

    // DataConnectionLayer-018: partition tags on the actor thread into "this
    // request will issue SubscribeAsync" vs. "already subscribed (by us or by
    // another in-flight SubscribeTagsRequest)". A tag that is already in
    // _subscriptionIds OR currently in _subscribesInFlight is treated as
    // AlreadySubscribed — the eventual SubscribeCompleted of the in-flight
    // request will populate _subscriptionIds, at which point a subsequent
    // unsubscribe by either instance correctly references the adapter handle.
    // The background task below must NOT read or mutate actor state — these
    // partitioned lists and the captured locals are the only state it sees.
    var tagsToSubscribe = new List<string>(request.TagPaths.Count);
    var preResolvedResults = new List<SubscribeTagResult>();
    foreach (var tagPath in request.TagPaths)
    {
        if (_subscriptionIds.ContainsKey(tagPath) || _subscribesInFlight.Contains(tagPath))
        {
            preResolvedResults.Add(new SubscribeTagResult(
                tagPath, AlreadySubscribed: true, Success: true, null, null));
        }
        else
        {
            tagsToSubscribe.Add(tagPath);
            _subscribesInFlight.Add(tagPath);
        }
    }

    Task.Run(async () =>
    {
        var results = new List<SubscribeTagResult>(request.TagPaths.Count);
        results.AddRange(preResolvedResults);

        // Tags that get an initial read: the already-subscribed ones plus every tag
        // this task subscribes successfully.
        var tagsToSeed = new List<string>(preResolvedResults.Count + tagsToSubscribe.Count);
        foreach (var r in preResolvedResults)
        {
            tagsToSeed.Add(r.TagPath);
        }

        foreach (var tagPath in tagsToSubscribe)
        {
            try
            {
                var subId = await adapter.SubscribeAsync(tagPath, (path, value) =>
                {
                    self.Tell(new TagValueReceived(path, value, generation));
                });
                results.Add(new SubscribeTagResult(tagPath, AlreadySubscribed: false, Success: true, subId, null));
                tagsToSeed.Add(tagPath);
            }
            catch (Exception ex)
            {
                // DataConnectionLayer-004: distinguish a connection-level fault
                // (adapter not connected / transport down) from a genuine
                // node-not-found. Connection-level faults must drive the
                // reconnection state machine, not be retried as unresolved tags.
                var connectionLevel = IsConnectionLevelFailure(ex);
                results.Add(new SubscribeTagResult(
                    tagPath, AlreadySubscribed: false, Success: false, null, ex.Message,
                    ConnectionLevelFailure: connectionLevel));
            }
        }

        // Initial read — seed current values for resolved tags so the Instance Actor
        // doesn't stay Uncertain until the next OPC UA data change notification.
        // Tell is thread-safe, so seeded values are delivered directly as messages.
        foreach (var tagPath in tagsToSeed)
        {
            try
            {
                var readResult = await adapter.ReadAsync(tagPath);
                if (readResult.Success && readResult.Value != null)
                {
                    self.Tell(new TagValueReceived(tagPath, readResult.Value, generation));
                }
            }
            catch
            {
                // Deliberately best-effort — a failed seed read is not an error; the
                // subscription will deliver subsequent changes.
            }
        }

        return new SubscribeCompleted(request, sender, results);
    }).PipeTo(self);
}
///
/// Applies the result of an asynchronous subscribe on the actor thread. ALL mutation
/// of subscription state and counters happens here — never on the background task —
/// so the actor model's single-threaded state guarantee holds.
/// Returns true if any tag failed at connection level (DataConnectionLayer-004),
/// signalling the caller (only the Connected state) to enter Reconnecting.
///
private bool HandleSubscribeCompleted(SubscribeCompleted msg)
{
// Overview: (1) if the instance vanished while the subscribe was in flight, release
// what this request created and stop; (2) otherwise apply each per-tag result to the
// subscription maps and health counters; (3) make sure the retry timer runs if any
// tag is unresolved; (4) reply to the original requester and report whether a
// connection-level failure was seen.
var instanceName = msg.Request.InstanceUniqueName;
if (!_subscriptionsByInstance.TryGetValue(instanceName, out var instanceTags))
{
// DataConnectionLayer-021: the instance was unsubscribed while the
// subscribe I/O was in flight. Re-creating the per-instance entry and
// applying counter/handle mutations here would permanently leak state
// — _subscriptionsByInstance[instanceName] resurrected with no
// subscriber to receive callbacks, _tagSubscriberCount inflated forever
// (no future HandleUnsubscribe will drop it), and _totalSubscribed /
// _resolvedTags drifting above the real instance count across the
// adapter lifetime (also re-issued by ReSubscribeAll on every
// reconnect). Instead: drop all state mutations for this stale
// message and release the adapter-level monitored items we just
// created so the device doesn't keep streaming change notifications
// for a tag nobody is subscribed to.
_log.Warning(
"[{0}] SubscribeCompleted arrived for instance {1} but the instance " +
"was unsubscribed while the subscribe was in flight; releasing " +
"{2} adapter handle(s) and discarding state mutations.",
_connectionName, instanceName, msg.Results.Count(r => r.Success && !r.AlreadySubscribed));
foreach (var result in msg.Results)
{
// DCL-018: clear in-flight markers we placed in HandleSubscribe.
if (!result.AlreadySubscribed)
_subscribesInFlight.Remove(result.TagPath);
// Fire-and-forget release of any subscription id this request
// genuinely created. AlreadySubscribed=true means another caller
// owns the adapter handle and unsubscribing it would break them.
// NOTE(review): another instance whose request saw this tag in
// _subscribesInFlight was answered AlreadySubscribed and relies on the
// handle released here; its own completion skips AlreadySubscribed results,
// so _subscriptionIds would never get an entry for the tag — confirm
// whether that race needs handling.
if (result is { Success: true, AlreadySubscribed: false, SubscriptionId: not null })
{
_ = _adapter.UnsubscribeAsync(result.SubscriptionId);
}
}
// The original sender is already gone (unsubscribed). Telling a dead
// ref produces a dead letter, which is the harmless and observable
// outcome — but skipping the reply altogether keeps dead-letter noise
// out of the log when this race fires in the normal disable/redeploy
// path. The unsubscribe message did NOT request a response of its own.
return false;
}
// DataConnectionLayer-004: if any tag failed because the adapter is not
// connected (a connection-level fault), the subscribe needs the reconnection
// state machine, not the tag-resolution retry. Drive a disconnect and let the
// request be re-stashed/retried after reconnect via ReSubscribeAll.
var connectionLevelFailure = msg.Results.Any(r => !r.Success && r.ConnectionLevelFailure);
foreach (var result in msg.Results)
{
// DataConnectionLayer-018: a result with AlreadySubscribed: false means
// this request was responsible for the SubscribeAsync call — the tag
// was added to _subscribesInFlight in HandleSubscribe. Clear it now so
// a later SubscribeTagsRequest for the same tag isn't forever treated
// as in-flight. AlreadySubscribed: true tags were not added to the
// set (another request owned the in-flight slot).
if (!result.AlreadySubscribed)
_subscribesInFlight.Remove(result.TagPath);
// DataConnectionLayer-008: only a tag newly added to THIS instance's set
// increments the reference count, so the count stays an accurate "number
// of distinct instances subscribed to this tag".
// Note: this runs for every result, including failed ones, so the instance
// keeps the tag in its set even when the subscribe did not succeed.
if (instanceTags.Add(result.TagPath))
_tagSubscriberCount[result.TagPath] =
_tagSubscriberCount.GetValueOrDefault(result.TagPath) + 1;
// Re-check against current state: another subscribe may have resolved the
// same tag while this request's I/O was in flight.
if (result.AlreadySubscribed || _subscriptionIds.ContainsKey(result.TagPath))
continue;
if (result.Success)
{
_subscriptionIds[result.TagPath] = result.SubscriptionId!;
// DataConnectionLayer-020: distinguish fresh subscribe from
// unresolved → resolved promotion. If an earlier instance's
// subscribe for this tag had failed at the resolution layer
// (the tag was already added to _unresolvedTags AND already
// counted in _totalSubscribed), this success transitions it
// from unresolved to resolved — increment _resolvedTags ONLY.
// Incrementing _totalSubscribed again here would over-count by
// one until HandleTagResolutionSucceeded reconciled. Mirrors
// HandleTagResolutionSucceeded's promotion shape so both paths
// resolve a previously-failed tag identically.
if (_unresolvedTags.Remove(result.TagPath))
{
_resolutionInFlight.Remove(result.TagPath);
_resolvedTags++;
}
else
{
_totalSubscribed++;
_resolvedTags++;
}
}
else if (result.ConnectionLevelFailure)
{
// Connection-level fault — do not count as an unresolved tag.
// ReSubscribeAll after reconnect derives the tag from
// _subscriptionsByInstance (already updated above).
_log.Warning("[{0}] Subscribe for {1} failed at connection level: {2}",
_connectionName, result.TagPath, result.Error);
}
else
{
// WP-12: genuine tag resolution failure — mark unresolved so the
// periodic retry timer picks it up. DataConnectionLayer-020:
// only increment _totalSubscribed when the tag is genuinely
// newly-tracked. A second instance failing to resolve a tag the
// first instance already added to _unresolvedTags is the same
// logical tag, counted once — bumping _totalSubscribed again
// would over-report TotalSubscribedTags forever.
var newlyUnresolved = _unresolvedTags.Add(result.TagPath);
if (newlyUnresolved)
{
_totalSubscribed++;
}
_log.Debug("[{0}] Tag resolution failed for {1}: {2}",
_connectionName, result.TagPath, result.Error);
// DataConnectionLayer-004 / design doc Tag Path Resolution step 2:
// mark the attribute quality `bad` so the Instance Actor sees a
// signal rather than staying Uncertain indefinitely.
if (_subscribers.TryGetValue(instanceName, out var subscriber))
{
subscriber.Tell(new TagValueUpdate(
_connectionName, result.TagPath, null, QualityCode.Bad, DateTimeOffset.UtcNow));
}
}
}
// Start the tag-resolution retry timer if any tags are unresolved.
// DataConnectionLayer-022: StartPeriodicTimer with an existing key CANCELS
// and replaces the prior timer, so a fan-out of SubscribeTagsRequests
// arriving faster than TagResolutionRetryInterval would keep resetting
// the timer and starve the retry indefinitely. Gating on IsTimerActive
// means the first failure starts the timer and subsequent failures
// simply pile onto _unresolvedTags without restarting the clock.
if (_unresolvedTags.Count > 0 && !Timers.IsTimerActive("tag-resolution-retry"))
{
Timers.StartPeriodicTimer(
"tag-resolution-retry",
new RetryTagResolution(),
_options.TagResolutionRetryInterval,
_options.TagResolutionRetryInterval);
}
// DataConnectionLayer-016: the response must match the actor's own assessment.
// When a connection-level failure is driving the actor into Reconnecting, the
// tags were never subscribed at the adapter — replying Success: true would tell
// the Instance Actor the subscribe succeeded when it did not. Genuine
// tag-resolution failures stay Success: true (they are a runtime quality concern
// tracked via _unresolvedTags, with a Bad-quality TagValueUpdate already pushed).
msg.ReplyTo.Tell(connectionLevelFailure
? new SubscribeTagsResponse(
msg.Request.CorrelationId, instanceName, false,
"connection unavailable — will re-subscribe on reconnect", DateTimeOffset.UtcNow)
: new SubscribeTagsResponse(
msg.Request.CorrelationId, instanceName, true, null, DateTimeOffset.UtcNow));
// The caller (Connected state only) decides whether to enter Reconnecting.
// In Connecting/Reconnecting the connection is not established anyway, so the
// existing reconnect cycle handles recovery without a re-trigger here.
return connectionLevelFailure;
}
/// <summary>
/// DataConnectionLayer-004: decides whether a subscribe exception is a
/// connection-level fault (adapter not connected / transport down) rather than a
/// genuine tag-resolution failure (the node does not exist on the device).
/// Connection-level faults must drive the reconnection state machine; resolution
/// failures are retried on the tag-resolution timer.
/// </summary>
/// <param name="ex">The exception raised by the subscribe attempt.</param>
/// <returns>
/// <c>true</c> when the (unwrapped) exception is an InvalidOperationException,
/// SocketException, TimeoutException or IOException; otherwise <c>false</c>.
/// </returns>
private static bool IsConnectionLevelFailure(Exception ex)
{
    // Only an AggregateException is unwrapped; any other exception is classified
    // by its own type, not by its innermost cause.
    var candidate = ex;
    if (ex is AggregateException aggregate)
    {
        candidate = aggregate.GetBaseException();
    }

    switch (candidate)
    {
        case InvalidOperationException:
        case System.Net.Sockets.SocketException:
        case TimeoutException:
        case System.IO.IOException:
            return true;
        default:
            return false;
    }
}
/// <summary>
/// WP-14: removes every tag subscription held by one instance. For each of the
/// instance's tags the per-tag reference count is decremented; when it reaches
/// zero the tag is released (adapter unsubscribe for resolved tags, retry
/// bookkeeping for unresolved ones) and the health counters are adjusted.
/// Finally the instance is forgotten and the health collector is updated.
/// </summary>
/// <param name="request">Identifies the instance whose subscriptions are dropped.</param>
private void HandleUnsubscribe(UnsubscribeTagsRequest request)
{
_log.Debug("[{0}] Unsubscribing all tags for instance {1}",
_connectionName, request.InstanceUniqueName);
// Unknown instance (never subscribed or already cleaned up): nothing to do.
if (!_subscriptionsByInstance.TryGetValue(request.InstanceUniqueName, out var tags))
return;
// WP-14: Cleanup on Instance Actor stop
foreach (var tagPath in tags)
{
// DataConnectionLayer-008: drop this instance's reference; the tag is only
// released at the adapter when no other instance still subscribes to it.
// The reference count makes this O(1) instead of an O(instances) scan.
// A missing count reads as 0, so the tag is treated as having no other subscriber.
var remaining = _tagSubscriberCount.GetValueOrDefault(tagPath) - 1;
if (remaining > 0)
{
_tagSubscriberCount[tagPath] = remaining;
continue;
}
_tagSubscriberCount.Remove(tagPath);
// Last subscriber gone. A tag that has a subscription id is handled as a
// resolved tag: release it at the adapter (the returned task is discarded,
// so the unsubscribe is not awaited) and decrement both the subscribed
// total and the resolved count.
if (_subscriptionIds.TryGetValue(tagPath, out var subId))
{
_ = _adapter.UnsubscribeAsync(subId);
_subscriptionIds.Remove(tagPath);
_resolutionInFlight.Remove(tagPath);
_totalSubscribed--;
_resolvedTags--;
// DataConnectionLayer-006: drop the tag's tracked quality so it is no
// longer counted by PushBadQualityForAllTags (which sets _tagsBadQuality
// from _lastTagQuality.Count). Leaving it here drifts the quality
// counters above _totalSubscribed across disconnect cycles.
if (_lastTagQuality.Remove(tagPath, out var droppedQuality))
{
// Take the tag out of whichever quality bucket it was last counted in.
switch (droppedQuality)
{
case QualityCode.Good: _tagsGoodQuality--; break;
case QualityCode.Bad: _tagsBadQuality--; break;
case QualityCode.Uncertain: _tagsUncertainQuality--; break;
}
}
}
else if (_unresolvedTags.Remove(tagPath))
{
// Last subscriber gone for a tag that had never resolved: stop
// retrying it and drop it from the subscribed total. Without this
// branch an unresolved tag would stay in the retry set and in
// TotalSubscribedTags after its instance unsubscribed.
_resolutionInFlight.Remove(tagPath);
_totalSubscribed--;
}
}
// Forget the instance itself: its tag set and its reply target.
_subscriptionsByInstance.Remove(request.InstanceUniqueName);
_subscribers.Remove(request.InstanceUniqueName);
// DataConnectionLayer-006: keep the reported quality counters in sync after the
// unsubscribed tags' buckets were decremented above.
_healthCollector.UpdateTagQuality(_connectionName, _tagsGoodQuality, _tagsBadQuality, _tagsUncertainQuality);
_healthCollector.UpdateTagResolution(_connectionName, _totalSubscribed, _resolvedTags);
}
// ── Write Support (WP-11) ──
/// <summary>
/// WP-11: forwards a write to the adapter and pipes a single WriteTagResponse back
/// to the requester. Success, adapter-reported failure, timeout/cancellation and
/// any other exception are all translated into a response; nothing is stored or
/// retried here.
/// </summary>
/// <param name="request">Tag path, value and correlation id of the write.</param>
private void HandleWrite(WriteTagRequest request)
{
    _log.Debug("[{0}] Writing to tag {1}", _connectionName, request.TagPath);

    // Capture the requester now; the continuation below runs after this handler returns.
    var replyTo = Sender;

    // DataConnectionLayer-005: bound the write with WriteTimeout. A device write
    // that never completes would otherwise leave the caller without any response.
    // The token is handed to the adapter; cancellation is reported as a failed
    // WriteTagResponse.
    var timeout = new CancellationTokenSource(_options.WriteTimeout);

    _adapter.WriteAsync(request.TagPath, request.Value, timeout.Token)
        .ContinueWith(write =>
        {
            // The write has finished one way or another; the timeout source is no longer needed.
            timeout.Dispose();

            if (write.IsCompletedSuccessfully)
            {
                var outcome = write.Result;
                return new WriteTagResponse(
                    request.CorrelationId, outcome.Success, outcome.ErrorMessage, DateTimeOffset.UtcNow);
            }

            var fault = write.Exception?.GetBaseException();
            var timedOut = write.IsCanceled || fault is OperationCanceledException;
            var error = timedOut
                ? $"Write timeout after {_options.WriteTimeout.TotalSeconds:F0}s"
                : fault?.Message;

            return new WriteTagResponse(request.CorrelationId, false, error, DateTimeOffset.UtcNow);
        })
        .PipeTo(replyTo);
}
// ── OPC UA Tag Browser (interactive design-time query) ──
/// <summary>
/// Handles a BrowseNodeCommand. The capability check (does this adapter support
/// browsing?) and all browse-failure mapping live here because the adapter is
/// held by this actor.
/// </summary>
/// <remarks>
/// Failure mapping (BrowseFailureKind):
/// - NotBrowsable — the adapter is not an IBrowsableDataConnection, or it threw
///   NotSupportedException (browsable adapter, but the server/protocol cannot
///   browse — e.g. a gateway build predating the browse RPC); the exception
///   message is carried verbatim in the latter case.
/// - ConnectionNotConnected — the adapter threw ConnectionNotConnectedException.
/// - Timeout — the adapter threw OperationCanceledException.
/// - ServerError — any other exception, message carried verbatim.
///
/// The reply is sent via PipeTo(sender) — the same pattern used by HandleWrite —
/// so the captured Sender is safe to use from the continuation (which runs off
/// the actor thread).
/// </remarks>
/// <param name="command">Carries the parent node id to browse; null means the root.</param>
private void HandleBrowse(BrowseNodeCommand command)
{
// Capture the requester before any asynchronous work starts.
var sender = Sender;
if (_adapter is not IBrowsableDataConnection browsable)
{
// Capability check failed: answer immediately, no adapter call is made.
_log.Debug("[{0}] Browse requested but adapter does not implement IBrowsableDataConnection", _connectionName);
sender.Tell(new BrowseNodeResult(
Array.Empty(),
Truncated: false,
new BrowseFailure(
BrowseFailureKind.NotBrowsable,
$"Connection '{_connectionName}' does not support browsing.")));
return;
}
_log.Debug("[{0}] Browsing children of {1}", _connectionName, command.ParentNodeId ?? "(root)");
browsable.BrowseChildrenAsync(command.ParentNodeId).ContinueWith(t =>
{
if (t.IsCompletedSuccessfully)
{
// Bound the reply to stay under Akka's remote frame size before it
// crosses the site→central boundary (see CapBrowseChildren).
var (children, truncated) = CapBrowseChildren(t.Result.Children, t.Result.Truncated);
return new BrowseNodeResult(children, truncated, Failure: null);
}
// Faulted or cancelled: classify by the innermost exception (null when the
// task has no exception, which falls through to the ServerError arm).
var baseEx = t.Exception?.GetBaseException();
return baseEx switch
{
ConnectionNotConnectedException notConnected => new BrowseNodeResult(
Array.Empty(),
Truncated: false,
new BrowseFailure(BrowseFailureKind.ConnectionNotConnected, notConnected.Message)),
OperationCanceledException => new BrowseNodeResult(
Array.Empty(),
Truncated: false,
new BrowseFailure(BrowseFailureKind.Timeout, "Browse cancelled.")),
// Adapter reachable but the protocol/server cannot browse (e.g. an
// MxGateway build that predates the BrowseChildren RPC). Carry the
// adapter's explanatory message through as NotBrowsable.
NotSupportedException notSupported => new BrowseNodeResult(
Array.Empty(),
Truncated: false,
new BrowseFailure(BrowseFailureKind.NotBrowsable, notSupported.Message)),
_ => new BrowseNodeResult(
Array.Empty(),
Truncated: false,
new BrowseFailure(
BrowseFailureKind.ServerError,
baseEx?.Message ?? "Unknown browse error.")),
};
}).PipeTo(sender);
}
/// <summary>
/// Estimated-byte ceiling (100 * 1024 bytes) for a single BrowseNodeResult, kept
/// comfortably below Akka's default 128 KB remote frame size. A browse reply
/// crosses the site→central frame on a temp Ask actor; an oversized reply is
/// silently discarded by remoting (the picker then hangs on "loading…"). The
/// limit is a byte budget rather than a child count because the only thing
/// that actually consumes frame space is serialized size — OPC UA NodeIds and
/// MxGateway tag references vary widely in length, so a fixed count is not a
/// safe proxy.
/// </summary>
private const int BrowseResultByteBudget = 100 * 1024;
/// <summary>
/// Truncates a browse child list to BrowseResultByteBudget using a conservative
/// per-node size estimate (JSON structural overhead plus the two variable-length
/// strings — ASCII NodeId/DisplayName ≈ 1 byte/char). Returns the kept prefix
/// and a Truncated flag OR-ed with the adapter's own truncation signal, so the
/// picker shows its "use manual entry" hint when the level is clipped.
/// Protocol-agnostic: every adapter's reply funnels through here regardless of
/// how it paginates upstream.
/// </summary>
/// <param name="children">Child nodes returned by the adapter, in adapter order.</param>
/// <param name="truncated">The adapter's own truncation flag; passed through unless this method clips.</param>
/// <returns>The leading nodes that fit the budget, and the combined truncation flag.</returns>
private static (IReadOnlyList Children, bool Truncated) CapBrowseChildren(
IReadOnlyList children, bool truncated)
{
// Despite its name, `budget` is the running estimate of bytes consumed so far.
var budget = 0;
var kept = new List(children.Count);
foreach (var node in children)
{
// 64 is the fixed per-node overhead estimate; string lengths are counted in chars.
budget += 64 + (node.NodeId?.Length ?? 0) + (node.DisplayName?.Length ?? 0);
if (budget > BrowseResultByteBudget)
{
// The node that crosses the budget is dropped together with everything after it.
truncated = true;
break;
}
kept.Add(node);
}
return (kept, truncated);
}
// ── Test Bindings (one-shot live read of bound tags) ──
/// <summary>
/// Handles a ReadTagValuesCommand (one-shot live read of bound tags).
/// Short-circuits to a ReadTagValuesFailureKind.ConnectionNotConnected failure
/// when the adapter is not currently Connected (Connecting / Reconnecting
/// states) so the dialog can render an inline banner without waiting for
/// the adapter to fail per-tag with a generic "client is not connected"
/// message. Otherwise calls _adapter.ReadBatchAsync and maps the
/// resulting per-tag map onto a list of TagReadOutcome
/// (preserving every requested tag — missing
/// adapter entries become failure outcomes).
/// </summary>
/// <remarks>
/// Failure mapping mirrors HandleBrowse (ReadTagValuesFailureKind):
/// - ConnectionNotConnected — adapter status is not ConnectionHealth.Connected.
/// - Timeout — batch cancelled (OperationCanceledException).
/// - ServerError — any other exception, message carried verbatim.
///
/// The reply is sent via PipeTo(sender) — same pattern as HandleWrite and
/// HandleBrowse — so the captured Sender is safe to use from the continuation.
/// </remarks>
/// <param name="command">Carries the tag paths to read.</param>
private void HandleReadTagValues(ReadTagValuesCommand command)
{
// Capture the requester before any asynchronous work starts.
var sender = Sender;
if (_adapter.Status != ConnectionHealth.Connected)
{
_log.Debug("[{0}] Test-bindings read requested but adapter status is {1}", _connectionName, _adapter.Status);
sender.Tell(new ReadTagValuesResult(
Array.Empty(),
new ReadTagValuesFailure(
ReadTagValuesFailureKind.ConnectionNotConnected,
"Connection is not yet established.")));
return;
}
_log.Debug("[{0}] Test-bindings read of {1} tag(s)", _connectionName, command.TagPaths.Count);
// Snapshot the requested paths so the continuation iterates a private copy.
var tagPaths = command.TagPaths.ToList();
_adapter.ReadBatchAsync(tagPaths).ContinueWith(t =>
{
if (t.IsCompletedSuccessfully)
{
// One timestamp shared by every failure outcome of this batch.
var nowUtc = DateTimeOffset.UtcNow;
var outcomes = new List(tagPaths.Count);
// Iterate the REQUESTED paths (not the adapter's map) so every requested
// tag produces exactly one outcome, in request order.
foreach (var tagPath in tagPaths)
{
if (t.Result.TryGetValue(tagPath, out var result) && result.Success && result.Value is not null)
{
outcomes.Add(new TagReadOutcome(
tagPath,
Success: true,
Value: result.Value.Value,
Quality: result.Value.Quality.ToString(),
Timestamp: result.Value.Timestamp,
ErrorMessage: null));
}
else
{
// Prefer the adapter's own error text; otherwise distinguish "entry present
// without a usable value" from "entry absent from the adapter result".
var errMsg = result?.ErrorMessage
?? (t.Result.ContainsKey(tagPath)
? "Read returned no value."
: "Tag missing from adapter result.");
outcomes.Add(new TagReadOutcome(
tagPath,
Success: false,
Value: null,
Quality: "Bad",
Timestamp: nowUtc,
ErrorMessage: errMsg));
}
}
return new ReadTagValuesResult(outcomes, Failure: null);
}
// Faulted or cancelled batch: a single batch-level failure with no per-tag outcomes.
var baseEx = t.Exception?.GetBaseException();
return baseEx switch
{
OperationCanceledException => new ReadTagValuesResult(
Array.Empty(),
new ReadTagValuesFailure(ReadTagValuesFailureKind.Timeout, "Read cancelled.")),
_ => new ReadTagValuesResult(
Array.Empty(),
new ReadTagValuesFailure(
ReadTagValuesFailureKind.ServerError,
baseEx?.Message ?? "Unknown read error.")),
};
}).PipeTo(sender);
}
// ── Tag Resolution Retry (WP-12) ──
/// <summary>
/// WP-12: timer tick that re-attempts the adapter subscribe for every unresolved
/// tag that has no attempt in flight. Each outcome is piped back to this actor as
/// TagResolutionSucceeded or TagResolutionFailed.
/// </summary>
private void HandleRetryTagResolution()
{
    // Nothing left to resolve — the periodic timer has no more work to do.
    if (_unresolvedTags.Count == 0)
    {
        Timers.Cancel("tag-resolution-retry");
        return;
    }

    var actor = Self;

    // DataConnectionLayer-010: skip tags whose previous attempt has not finished
    // yet. A slow SubscribeAsync overlapping the next tick would otherwise yield
    // duplicate concurrent subscribes for the same tag.
    var pending = (from candidate in _unresolvedTags
                   where !_resolutionInFlight.Contains(candidate)
                   select candidate).ToList();

    if (pending.Count == 0)
    {
        _log.Debug("[{0}] Tag-resolution retry skipped — {1} attempt(s) still in flight",
            _connectionName, _resolutionInFlight.Count);
        return;
    }

    _log.Debug("[{0}] Retrying resolution for {1} unresolved tags", _connectionName, pending.Count);

    // Stamp value callbacks with the adapter generation current at subscribe time.
    var adapterGeneration = _adapterGeneration;
    foreach (var tagPath in pending)
    {
        _resolutionInFlight.Add(tagPath);

        var subscribe = _adapter.SubscribeAsync(
            tagPath,
            (path, value) => actor.Tell(new TagValueReceived(path, value, adapterGeneration)));

        subscribe
            .ContinueWith(attempt => attempt.IsCompletedSuccessfully
                ? new TagResolutionSucceeded(tagPath, attempt.Result) as object
                : new TagResolutionFailed(
                    tagPath, attempt.Exception?.GetBaseException().Message ?? "Unknown error"))
            .PipeTo(actor);
    }
}
// ── Bad Quality Push (WP-9) ──
/// <summary>
/// WP-9: tells every instance that has a registered subscriber that the connection
/// quality is now Bad, marks every tracked tag quality as Bad, and reports the
/// resulting quality counters to the health collector.
/// </summary>
private void PushBadQualityForAllTags()
{
    var timestamp = DateTimeOffset.UtcNow;

    // One connection-level notification per instance; instances without a
    // registered subscriber are skipped.
    foreach (var instanceName in _subscriptionsByInstance.Keys)
    {
        if (_subscribers.TryGetValue(instanceName, out var target))
        {
            target.Tell(new ConnectionQualityChanged(_connectionName, QualityCode.Bad, timestamp));
        }
    }

    // Snapshot the keys first: the dictionary is written to while we walk them.
    var trackedTags = _lastTagQuality.Keys.ToList();
    foreach (var tagPath in trackedTags)
    {
        _lastTagQuality[tagPath] = QualityCode.Bad;
    }

    // Every tracked tag now sits in the Bad bucket.
    _tagsBadQuality = trackedTags.Count;
    _tagsGoodQuality = 0;
    _tagsUncertainQuality = 0;

    _healthCollector.UpdateTagQuality(_connectionName, _tagsGoodQuality, _tagsBadQuality, _tagsUncertainQuality);
}
// ── Re-subscribe (WP-10) ──
/// <summary>
/// WP-10: after a reconnect, resets all per-tag subscription and quality tracking
/// and issues a fresh adapter subscribe for every distinct tag any instance is
/// subscribed to. Each outcome is piped back to this actor as
/// TagResolutionSucceeded or TagResolutionFailed.
/// </summary>
private void ReSubscribeAll()
{
    // _subscriptionsByInstance is the durable source of truth for what must be
    // restored; _subscriptionIds is cleared below and only refilled on success.
    var tagsToRestore = _subscriptionsByInstance
        .SelectMany(entry => entry.Value)
        .Distinct()
        .ToList();

    _log.Info("[{0}] Re-subscribing {1} tags after reconnect", _connectionName, tagsToRestore.Count);

    var actor = Self;

    // Forget everything learned from the previous connection.
    _subscriptionIds.Clear();
    _unresolvedTags.Clear();
    _resolutionInFlight.Clear();

    // DataConnectionLayer-018: mirror of the _resolutionInFlight reset. An
    // initial-subscribe completion still pending from the previous adapter
    // generation will find nothing to remove here, which is harmless.
    _subscribesInFlight.Clear();
    _resolvedTags = 0;

    // DataConnectionLayer-006: quality tracking starts over as well. Without this,
    // a tag first resolved after the reconnect would only ever increment a bucket
    // and the totals would drift above _totalSubscribed. Fresh TagValueReceived
    // messages repopulate the tracking once subscriptions are active.
    _lastTagQuality.Clear();
    _tagsGoodQuality = 0;
    _tagsBadQuality = 0;
    _tagsUncertainQuality = 0;
    _healthCollector.UpdateTagQuality(_connectionName, _tagsGoodQuality, _tagsBadQuality, _tagsUncertainQuality);

    // Stamp value callbacks with the adapter generation current at subscribe time.
    var adapterGeneration = _adapterGeneration;
    foreach (var tagPath in tagsToRestore)
    {
        var subscribe = _adapter.SubscribeAsync(
            tagPath,
            (path, value) => actor.Tell(new TagValueReceived(path, value, adapterGeneration)));

        subscribe
            .ContinueWith(attempt => attempt.IsCompletedSuccessfully
                ? new TagResolutionSucceeded(tagPath, attempt.Result) as object
                : new TagResolutionFailed(
                    tagPath, attempt.Exception?.GetBaseException().Message ?? "Unknown error"))
            .PipeTo(actor);
    }
}
// ── Health Reporting (WP-13) ──
/// <summary>
/// WP-13: replies to the current sender with a DataConnectionHealthReport holding
/// the adapter status, the subscribed/resolved tag counts and a label for the
/// endpoint in use.
/// </summary>
private void ReplyWithHealthReport()
{
    var adapterStatus = _adapter.Status;

    // Without a backup configuration the label is fixed; otherwise it names the
    // endpoint currently in use.
    var activeEndpointLabel = _backupConfig != null
        ? _activeEndpoint.ToString()
        : "Primary (no backup)";

    var report = new DataConnectionHealthReport(
        _connectionName,
        adapterStatus,
        _totalSubscribed,
        _resolvedTags,
        activeEndpointLabel,
        DateTimeOffset.UtcNow);

    Sender.Tell(report);
}
// ── Internal message handlers for piped async results ──
/// <summary>
/// Handles the piped result of a successful adapter subscribe issued by either the
/// tag-resolution retry tick or the post-reconnect re-subscribe pass: records the
/// subscription id, bumps the resolved count, reports it to the health collector,
/// and cancels the retry timer once no unresolved tags remain.
/// </summary>
/// <param name="msg">The resolved tag path and the adapter's subscription id.</param>
private void HandleTagResolutionSucceeded(TagResolutionSucceeded msg)
{
    // DataConnectionLayer-010: the attempt for this tag has completed.
    _resolutionInFlight.Remove(msg.TagPath);

    var wasUnresolved = _unresolvedTags.Remove(msg.TagPath);

    // ReSubscribeAll clears _unresolvedTags before re-issuing its subscribes, so its
    // completions are never found in that set. Gating on wasUnresolved alone dropped
    // every post-reconnect success: the subscription id was never stored (so it
    // could not be released later) and the resolved count stayed at zero. Accept
    // such a completion too, provided some instance still subscribes to the tag and
    // no id is recorded for it yet (which also prevents double counting).
    // NOTE(review): this message carries no adapter generation, so a completion
    // from a superseded adapter cannot be told apart here.
    var restoredAfterReconnect = !wasUnresolved
        && !_subscriptionIds.ContainsKey(msg.TagPath)
        && _subscriptionsByInstance.Values.Any(tags => tags.Contains(msg.TagPath));

    if (wasUnresolved || restoredAfterReconnect)
    {
        _subscriptionIds[msg.TagPath] = msg.SubscriptionId;
        _resolvedTags++;
        _healthCollector.UpdateTagResolution(_connectionName, _totalSubscribed, _resolvedTags);
        _log.Info("[{0}] Tag resolved: {1}", _connectionName, msg.TagPath);
    }

    // Nothing left to retry: stop the periodic tick.
    if (_unresolvedTags.Count == 0)
    {
        Timers.Cancel("tag-resolution-retry");
    }
}
/// <summary>
/// Handles the piped result of a failed adapter subscribe: clears the in-flight
/// marker and, if the tag is still subscribed by some instance, tracks it as
/// unresolved and makes sure the periodic retry timer is running.
/// </summary>
/// <param name="msg">The tag path that failed to resolve and the error text.</param>
private void HandleTagResolutionFailed(TagResolutionFailed msg)
{
    _log.Debug("[{0}] Tag resolution still failing for {1}: {2}",
        _connectionName, msg.TagPath, msg.Error);

    // DataConnectionLayer-010: the attempt for this tag has completed —
    // it is eligible for the next retry tick again.
    _resolutionInFlight.Remove(msg.TagPath);

    // The last subscriber may have unsubscribed while this attempt was in flight.
    // HandleUnsubscribe already removed the tag from the retry set; re-adding it
    // here would retry a tag nobody wants until the actor stops.
    var stillSubscribed = _subscriptionsByInstance.Values.Any(tags => tags.Contains(msg.TagPath));
    if (!stillSubscribed)
    {
        return;
    }

    // Track as unresolved so the periodic retry picks it up. DCL-022: gate on
    // IsTimerActive so a stream of TagResolutionFailed events doesn't keep
    // cancelling and re-starting the timer faster than its own interval.
    if (_unresolvedTags.Add(msg.TagPath) && !Timers.IsTimerActive("tag-resolution-retry"))
    {
        Timers.StartPeriodicTimer(
            "tag-resolution-retry",
            new RetryTagResolution(),
            _options.TagResolutionRetryInterval,
            _options.TagResolutionRetryInterval);
    }
}
/// <summary>
/// Forwards a tag value from the adapter to every instance subscribed to that tag
/// and keeps the per-quality tag counters up to date. Values stamped with a
/// different adapter generation than the current one are dropped.
/// </summary>
/// <param name="msg">Tag path, value and the adapter generation that produced it.</param>
private void HandleTagValueReceived(TagValueReceived msg)
{
    // DataConnectionLayer-011: a disposed adapter may still fire callbacks after a
    // failover; those carry a stale generation and must not reach Instance Actors.
    if (msg.AdapterGeneration != _adapterGeneration)
    {
        _log.Debug("[{0}] Dropping stale tag value for {1} from adapter generation {2} (current {3})",
            _connectionName, msg.TagPath, msg.AdapterGeneration, _adapterGeneration);
        return;
    }

    var tagPath = msg.TagPath;
    var sample = msg.Value;

    // Fan out to every instance that lists this tag and has a registered subscriber.
    foreach (var (instanceName, tags) in _subscriptionsByInstance)
    {
        if (tags.Contains(tagPath) && _subscribers.TryGetValue(instanceName, out var target))
        {
            target.Tell(new TagValueUpdate(
                _connectionName, tagPath, sample.Value, sample.Quality, sample.Timestamp));
        }
    }

    // Move the tag from its previous quality bucket (if it had one) to the new one.
    if (_lastTagQuality.TryGetValue(tagPath, out var previousQuality))
    {
        AdjustBucket(previousQuality, -1);
    }

    AdjustBucket(sample.Quality, 1);
    _lastTagQuality[tagPath] = sample.Quality;

    _healthCollector.UpdateTagQuality(_connectionName, _tagsGoodQuality, _tagsBadQuality, _tagsUncertainQuality);

    // Adds delta to the counter for the given quality; other quality codes are not counted.
    void AdjustBucket(QualityCode quality, int delta)
    {
        switch (quality)
        {
            case QualityCode.Good:
                _tagsGoodQuality += delta;
                break;
            case QualityCode.Bad:
                _tagsBadQuality += delta;
                break;
            case QualityCode.Uncertain:
                _tagsUncertainQuality += delta;
                break;
        }
    }
}
// ── Native alarm subscriptions (Task-10) ──
/// <summary>
/// Task-10: registers the sender as a subscriber of a native alarm source and,
/// unless an adapter feed for that source already exists or is being established,
/// starts one. Replies with a SubscribeAlarmsResponse: immediately when the
/// adapter is not alarm-capable or an existing feed serves the request, otherwise
/// once the adapter subscribe completes (via AlarmSubscribeCompleted).
/// </summary>
/// <param name="request">Source reference, condition filter, correlation id and instance name.</param>
private void HandleSubscribeAlarms(SubscribeAlarmsRequest request)
{
// Capture the requester before any asynchronous work starts.
var subscriber = Sender;
var now = DateTimeOffset.UtcNow;
if (_adapter is not IAlarmSubscribableConnection alarmable)
{
subscriber.Tell(new SubscribeAlarmsResponse(
request.CorrelationId, request.InstanceUniqueName, false,
$"Connection '{_connectionName}' is not alarm-capable.", now));
return;
}
// Register the subscriber for routing (idempotent) before issuing the
// adapter subscribe so a transition that arrives mid-subscribe is routed.
if (!_alarmSourceSubscribers.TryGetValue(request.SourceReference, out var subs))
{
subs = new HashSet();
_alarmSourceSubscribers[request.SourceReference] = subs;
}
subs.Add(subscriber);
// The filter is stored per source, so the most recent request for a source
// overwrites the filter of earlier subscribers to that source.
_alarmSourceFilter[request.SourceReference] = request.ConditionFilter;
// Parse the type-name filter once; this is the authoritative client-side
// gate consulted on every routed transition (M2.4 / #8).
_alarmSourceFilterPredicate[request.SourceReference] = AlarmConditionFilter.Parse(request.ConditionFilter);
// If the adapter feed for this source is already (being) established, the
// existing subscription serves the new subscriber too. Success is reported
// right away, even when the in-flight subscribe has not completed yet.
if (_alarmSubscriptionIds.ContainsKey(request.SourceReference) ||
_alarmSubscribesInFlight.Contains(request.SourceReference))
{
subscriber.Tell(new SubscribeAlarmsResponse(
request.CorrelationId, request.InstanceUniqueName, true, null, now));
return;
}
_alarmSubscribesInFlight.Add(request.SourceReference);
// Copy everything the continuation needs into locals.
var self = Self;
var generation = _adapterGeneration;
var sourceRef = request.SourceReference;
var filter = request.ConditionFilter;
var corr = request.CorrelationId;
var inst = request.InstanceUniqueName;
// Transitions are stamped with the adapter generation current at subscribe time;
// the subscribe outcome is piped back to this actor as AlarmSubscribeCompleted.
alarmable.SubscribeAlarmsAsync(sourceRef, filter,
t => self.Tell(new AlarmTransitionReceived(t, generation)))
.ContinueWith(task => task.IsCompletedSuccessfully
? new AlarmSubscribeCompleted(sourceRef, true, task.Result, null, subscriber, corr, inst) as object
: new AlarmSubscribeCompleted(sourceRef, false, null,
task.Exception?.GetBaseException().Message ?? "Unknown error", subscriber, corr, inst))
.PipeTo(self);
}
/// <summary>
/// Handles the piped outcome of an adapter alarm subscribe: clears the in-flight
/// marker, records the subscription id on success (or logs a warning on failure),
/// and answers the original requester when there is one.
/// </summary>
/// <param name="msg">Outcome of the subscribe plus the optional reply target.</param>
private void HandleAlarmSubscribeCompleted(AlarmSubscribeCompleted msg)
{
    var source = msg.SourceReference;
    _alarmSubscribesInFlight.Remove(source);

    if (!msg.Success)
    {
        _log.Warning("[{0}] Alarm subscribe failed for source {1}: {2}",
            _connectionName, source, msg.Error);
    }
    else if (msg.SubscriptionId != null)
    {
        _alarmSubscriptionIds[source] = msg.SubscriptionId;
        _log.Info("[{0}] Alarm feed subscribed for source {1}", _connectionName, source);
    }

    // Reconnect re-subscribes carry no ReplyTo: there is no original requester to answer.
    if (msg.ReplyTo is { } requester)
    {
        requester.Tell(new SubscribeAlarmsResponse(
            msg.CorrelationId ?? string.Empty,
            msg.InstanceUniqueName ?? string.Empty,
            msg.Success,
            msg.Error,
            DateTimeOffset.UtcNow));
    }
}
/// <summary>
/// Routes a native alarm transition to every subscriber whose source reference is
/// an ordinal prefix of the transition's source object reference or source
/// reference, subject to that source's condition filter. Each subscriber is told
/// at most once per transition, even when several of its sources match.
/// </summary>
/// <param name="msg">The transition and the adapter generation that produced it.</param>
private void HandleAlarmTransitionReceived(AlarmTransitionReceived msg)
{
// DataConnectionLayer-011: drop transitions from a disposed adapter after failover.
if (msg.AdapterGeneration != _adapterGeneration)
return;
var transition = msg.Transition;
// Subscribers already told about this transition (de-duplicates across sources).
var notified = new HashSet();
foreach (var (sourceRef, subs) in _alarmSourceSubscribers)
{
// A subscriber bound to source S receives a transition whose source
// object (or full reference) falls under S.
var match = transition.SourceObjectReference.StartsWith(sourceRef, StringComparison.Ordinal)
|| transition.SourceReference.StartsWith(sourceRef, StringComparison.Ordinal);
if (!match)
continue;
// M2.4 (#8): authoritative client-side condition-type gate. Applied
// per matched source because two sources may share a prefix yet carry
// different filters. Empty filter = allow all (historical behaviour);
// framing sentinels (SnapshotComplete) are never dropped.
// A source with no stored predicate is not filtered at all.
if (_alarmSourceFilterPredicate.TryGetValue(sourceRef, out var predicate) &&
!predicate.IsAllowed(transition))
continue;
foreach (var sub in subs)
{
// Add returns false for a subscriber that was already notified.
if (notified.Add(sub))
sub.Tell(new NativeAlarmTransitionUpdate(_connectionName, transition));
}
}
}
/// <summary>
/// Removes the sender from an alarm source's subscriber set. When that leaves the
/// source without subscribers, its routing and filter state is dropped and the
/// adapter feed, if one was recorded, is released without awaiting the result.
/// </summary>
/// <param name="request">Identifies the alarm source to unsubscribe from.</param>
private void HandleUnsubscribeAlarms(UnsubscribeAlarmsRequest request)
{
    var source = request.SourceReference;

    if (!_alarmSourceSubscribers.TryGetValue(source, out var remaining))
    {
        return;
    }

    remaining.Remove(Sender);
    if (remaining.Count != 0)
    {
        // Other subscribers still depend on this feed.
        return;
    }

    // No subscribers remain for this source — tear down its state and the adapter feed.
    _alarmSourceSubscribers.Remove(source);
    _alarmSourceFilter.Remove(source);
    _alarmSourceFilterPredicate.Remove(source);

    if (!_alarmSubscriptionIds.Remove(source, out var feedId))
    {
        return;
    }

    if (_adapter is IAlarmSubscribableConnection alarmable)
    {
        _ = alarmable.UnsubscribeAlarmsAsync(feedId);
    }
}
/// <summary>
/// Re-establishes all native alarm feeds after a reconnect; the source replays a snapshot.
/// Does nothing when there are no alarm subscribers or the adapter is not alarm-capable.
/// </summary>
private void ReSubscribeAllAlarms()
{
    if (_alarmSourceSubscribers.Count == 0)
    {
        return;
    }

    if (_adapter is not IAlarmSubscribableConnection alarmable)
    {
        return;
    }

    // Subscription ids and in-flight markers from before the reconnect are discarded.
    _alarmSubscriptionIds.Clear();
    _alarmSubscribesInFlight.Clear();

    var actor = Self;
    var adapterGeneration = _adapterGeneration;

    // Iterate a snapshot of the source references.
    foreach (var sourceRef in _alarmSourceSubscribers.Keys.ToList())
    {
        var conditionFilter = _alarmSourceFilter.GetValueOrDefault(sourceRef);
        _alarmSubscribesInFlight.Add(sourceRef);

        var subscribe = alarmable.SubscribeAlarmsAsync(
            sourceRef,
            conditionFilter,
            transition => actor.Tell(new AlarmTransitionReceived(transition, adapterGeneration)));

        // No reply target, correlation id or instance name: nobody asked for this subscribe.
        subscribe
            .ContinueWith(task => task.IsCompletedSuccessfully
                ? new AlarmSubscribeCompleted(sourceRef, true, task.Result, null, null, null, null) as object
                : new AlarmSubscribeCompleted(sourceRef, false, null,
                    task.Exception?.GetBaseException().Message ?? "Unknown error", null, null, null))
            .PipeTo(actor);
    }
}
/// <summary>
/// Notifies alarm subscribers that the source feed is unavailable (connection lost).
/// Every subscriber of every source receives one NativeAlarmSourceUnavailable per
/// source it is registered for, all carrying the same timestamp.
/// </summary>
private void PushAlarmSourceUnavailable()
{
    var timestamp = DateTimeOffset.UtcNow;

    foreach (var entry in _alarmSourceSubscribers)
    {
        var sourceRef = entry.Key;
        foreach (var subscriber in entry.Value)
        {
            subscriber.Tell(new NativeAlarmSourceUnavailable(_connectionName, sourceRef, timestamp));
        }
    }
}
// ── Internal messages ──
// NOTE(review): presumably the self-message that triggers a connection attempt;
// its handler is outside this section — confirm.
internal record AttemptConnect;
// NOTE(review): presumably the piped outcome of a connection attempt; its
// handler is outside this section — confirm.
internal record ConnectResult(bool Success, string? Error);
// NOTE(review): presumably signals that the adapter reported a disconnect; its
// handler is outside this section — confirm.
internal record AdapterDisconnected;
// A value delivered by an adapter subscription callback. AdapterGeneration is the
// generation captured when the subscribe was issued; HandleTagValueReceived drops
// the message when it differs from the current generation.
internal record TagValueReceived(string TagPath, TagValue Value, int AdapterGeneration);
// Piped to self when a retry or re-subscribe attempt for TagPath fails.
internal record TagResolutionFailed(string TagPath, string Error);
// Piped to self when a retry or re-subscribe attempt for TagPath succeeds.
internal record TagResolutionSucceeded(string TagPath, string SubscriptionId);
// Timer message for the "tag-resolution-retry" periodic timer.
internal record RetryTagResolution;
// NOTE(review): presumably the per-tag outcome collected into
// SubscribeCompleted.Results; construction is outside this section — confirm.
internal record SubscribeTagResult(
string TagPath, bool AlreadySubscribed, bool Success, string? SubscriptionId, string? Error,
bool ConnectionLevelFailure = false);
// Outcome of an initial subscribe request: the original request, the actor to
// answer, and the per-tag results.
internal record SubscribeCompleted(
SubscribeTagsRequest Request, IActorRef ReplyTo, IReadOnlyList Results);
// A native alarm transition delivered by an adapter alarm feed, stamped with the
// adapter generation captured at subscribe time.
internal record AlarmTransitionReceived(NativeAlarmTransition Transition, int AdapterGeneration);
// Piped outcome of an adapter alarm subscribe. ReplyTo, CorrelationId and
// InstanceUniqueName are null for reconnect re-subscribes, which have no requester.
internal record AlarmSubscribeCompleted(
string SourceReference, bool Success, string? SubscriptionId, string? Error,
IActorRef? ReplyTo, string? CorrelationId, string? InstanceUniqueName);
// Request answered by ReplyWithHealthReport with a DataConnectionHealthReport.
// NOTE(review): the dispatch from this message to that method is outside this section — confirm.
public record GetHealthReport;
}