using Akka.Actor;
using Akka.Event;
using ScadaLink.Commons.Interfaces.Protocol;
using ScadaLink.Commons.Messages.DataConnection;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.HealthMonitoring;
using ScadaLink.SiteEventLogging;
namespace ScadaLink.DataConnectionLayer.Actors;
///
/// WP-6: Connection actor using Akka.NET Become/Stash pattern for lifecycle state machine.
///
/// States:
/// - Connecting: stash subscribe/write requests; attempts connection
/// - Connected: unstash and process all requests
/// - Reconnecting: push bad quality for all subscribed tags, stash new requests,
/// fixed-interval reconnect
///
/// WP-9: Auto-reconnect with bad quality on disconnect.
/// WP-10: Transparent re-subscribe after reconnection.
/// WP-11: Write-back support (synchronous failure to caller, no S&F).
/// WP-12: Tag path resolution with retry.
/// WP-13: Health reporting (connection status + tag resolution counts).
/// WP-14: Subscription lifecycle (register on create, cleanup on stop).
///
public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
{
public enum ActiveEndpoint { Primary, Backup }
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly string _connectionName;
private IDataConnection _adapter;
private readonly DataConnectionOptions _options;
private readonly ISiteHealthCollector _healthCollector;
private readonly IDataConnectionFactory _factory;
private readonly string _protocolType;
private readonly ISiteEventLogger? _siteEventLogger;
public IStash Stash { get; set; } = null!;
public ITimerScheduler Timers { get; set; } = null!;
///
/// Active subscriptions: instanceUniqueName → set of tag paths.
///
private readonly Dictionary> _subscriptionsByInstance = new();
///
/// Subscription IDs returned by the adapter: tagPath → subscriptionId.
///
private readonly Dictionary _subscriptionIds = new();
///
/// Tags whose path resolution failed and are awaiting retry.
///
private readonly HashSet _unresolvedTags = new();
///
/// Subscribers: instanceUniqueName → IActorRef (the Instance Actor).
///
private readonly Dictionary _subscribers = new();
///
/// Tracks total subscribed and resolved tags for health reporting.
///
private int _totalSubscribed;
private int _resolvedTags;
private int _tagsGoodQuality;
private int _tagsBadQuality;
private int _tagsUncertainQuality;
private readonly Dictionary _lastTagQuality = new();
private IDictionary _connectionDetails;
private readonly IDictionary _primaryConfig;
private readonly IDictionary? _backupConfig;
private readonly int _failoverRetryCount;
private ActiveEndpoint _activeEndpoint = ActiveEndpoint.Primary;
private int _consecutiveFailures;
private int _consecutiveUnstableDisconnects;
private DateTimeOffset _lastConnectedAt;
///
/// Captured Self reference for use from non-actor threads (event handlers, callbacks).
/// Akka.NET's Self property is only valid inside the actor's message loop.
///
private IActorRef _self = null!;
public DataConnectionActor(
string connectionName,
IDataConnection adapter,
DataConnectionOptions options,
ISiteHealthCollector healthCollector,
IDataConnectionFactory factory,
string protocolType,
IDictionary? primaryConfig = null,
IDictionary? backupConfig = null,
int failoverRetryCount = 3,
ISiteEventLogger? siteEventLogger = null)
{
_connectionName = connectionName;
_adapter = adapter;
_options = options;
_healthCollector = healthCollector;
_factory = factory;
_protocolType = protocolType;
_primaryConfig = primaryConfig ?? new Dictionary();
_backupConfig = backupConfig;
_failoverRetryCount = failoverRetryCount;
_siteEventLogger = siteEventLogger;
_connectionDetails = _primaryConfig;
}
protected override void PreStart()
{
_log.Info("DataConnectionActor [{0}] starting in Connecting state", _connectionName);
// Capture Self for use from non-actor threads (event handlers, callbacks).
// Akka.NET's Self property is only valid inside the actor's message loop.
_self = Self;
// Listen for unexpected adapter disconnections
_adapter.Disconnected += OnAdapterDisconnected;
BecomeConnecting();
}
private void OnAdapterDisconnected()
{
// Marshal the event onto the actor's message loop using captured _self reference.
// This runs on a background thread (gRPC stream reader), so Self would throw.
_self.Tell(new AdapterDisconnected());
}
protected override void PostStop()
{
_log.Info("DataConnectionActor [{0}] stopping — disposing adapter", _connectionName);
_adapter.Disconnected -= OnAdapterDisconnected;
_ = _adapter.DisposeAsync().AsTask();
}
protected override void OnReceive(object message)
{
// Default handler — should not be reached due to Become
Unhandled(message);
}
// ── Connecting State ──
private void BecomeConnecting()
{
_log.Info("[{0}] Entering Connecting state", _connectionName);
_healthCollector.UpdateConnectionHealth(_connectionName, ConnectionHealth.Connecting);
_healthCollector.UpdateConnectionEndpoint(_connectionName, "Connecting");
Become(Connecting);
Self.Tell(new AttemptConnect());
}
private void Connecting(object message)
{
switch (message)
{
case AttemptConnect:
HandleAttemptConnect();
break;
case ConnectResult result:
HandleConnectResult(result);
break;
case SubscribeTagsRequest:
case WriteTagRequest:
case UnsubscribeTagsRequest:
Stash.Stash();
break;
case GetHealthReport:
ReplyWithHealthReport();
break;
default:
Unhandled(message);
break;
}
}
// ── Connected State ──
///
/// Minimum time connected before we consider the connection stable.
/// If we disconnect before this, it counts as an unstable connection toward failover.
///
private static readonly TimeSpan StableConnectionThreshold = TimeSpan.FromSeconds(60);
private void BecomeConnected()
{
_log.Info("[{0}] Entering Connected state", _connectionName);
_lastConnectedAt = DateTimeOffset.UtcNow;
_healthCollector.UpdateConnectionHealth(_connectionName, ConnectionHealth.Connected);
_healthCollector.UpdateTagResolution(_connectionName, _totalSubscribed, _resolvedTags);
var endpointLabel = _backupConfig == null ? "Connected" : $"Connected to {_activeEndpoint.ToString().ToLower()}";
_healthCollector.UpdateConnectionEndpoint(_connectionName, endpointLabel);
Become(Connected);
Stash.UnstashAll();
}
private void Connected(object message)
{
switch (message)
{
case SubscribeTagsRequest req:
HandleSubscribe(req);
break;
case UnsubscribeTagsRequest req:
HandleUnsubscribe(req);
break;
case WriteTagRequest req:
HandleWrite(req);
break;
case TagValueReceived tvr:
HandleTagValueReceived(tvr);
break;
case TagResolutionSucceeded trs:
HandleTagResolutionSucceeded(trs);
break;
case TagResolutionFailed trf:
HandleTagResolutionFailed(trf);
break;
case AdapterDisconnected:
HandleDisconnect();
break;
case RetryTagResolution:
HandleRetryTagResolution();
break;
case GetHealthReport:
ReplyWithHealthReport();
break;
default:
Unhandled(message);
break;
}
}
// ── Reconnecting State ──
private void BecomeReconnecting()
{
_log.Warning("[{0}] Entering Reconnecting state", _connectionName);
_healthCollector.UpdateConnectionHealth(_connectionName, ConnectionHealth.Disconnected);
_healthCollector.UpdateConnectionEndpoint(_connectionName, "Disconnected");
// Track unstable connections toward failover.
// If we were connected for less than the stability threshold, this counts
// as an unstable cycle (e.g., connect succeeded but heartbeat went stale).
var connectionDuration = DateTimeOffset.UtcNow - _lastConnectedAt;
if (_lastConnectedAt != default && connectionDuration < StableConnectionThreshold)
{
_consecutiveUnstableDisconnects++;
_log.Warning("[{0}] Unstable connection (lasted {1:F0}s) — consecutive unstable disconnects: {2}/{3}",
_connectionName, connectionDuration.TotalSeconds, _consecutiveUnstableDisconnects,
_backupConfig != null ? _failoverRetryCount : 0);
}
else
{
_consecutiveUnstableDisconnects = 0;
}
// Failover if we keep connecting and going stale repeatedly
if (_backupConfig != null && _consecutiveUnstableDisconnects >= _failoverRetryCount)
{
var previousEndpoint = _activeEndpoint;
_activeEndpoint = _activeEndpoint == ActiveEndpoint.Primary
? ActiveEndpoint.Backup
: ActiveEndpoint.Primary;
_consecutiveUnstableDisconnects = 0;
_consecutiveFailures = 0;
var newConfig = _activeEndpoint == ActiveEndpoint.Primary
? _primaryConfig
: _backupConfig;
// Dispose old adapter
_adapter.Disconnected -= OnAdapterDisconnected;
_ = _adapter.DisposeAsync().AsTask();
// Create new adapter for the target endpoint
_adapter = _factory.Create(_protocolType, newConfig);
_connectionDetails = newConfig;
_adapter.Disconnected += OnAdapterDisconnected;
_log.Warning("[{0}] Failing over from {1} to {2} (unstable connection pattern)",
_connectionName, previousEndpoint, _activeEndpoint);
if (_siteEventLogger != null)
{
_ = _siteEventLogger.LogEventAsync(
"connection", "Warning", null, _connectionName,
$"Failover from {previousEndpoint} to {_activeEndpoint} (unstable connection)",
$"Connection lasted {connectionDuration.TotalSeconds:F0}s, threshold {StableConnectionThreshold.TotalSeconds:F0}s");
}
}
// Log disconnect to site event log
if (_siteEventLogger != null)
{
_ = _siteEventLogger.LogEventAsync(
"connection", "Warning", null, _connectionName,
$"Connection lost — entering reconnect cycle", null);
}
Become(Reconnecting);
// WP-9: Push bad quality for all subscribed tags on disconnect
PushBadQualityForAllTags();
// Schedule reconnect attempt
Timers.StartSingleTimer("reconnect", new AttemptConnect(), _options.ReconnectInterval);
}
private void Reconnecting(object message)
{
switch (message)
{
case AttemptConnect:
HandleAttemptConnect();
break;
case ConnectResult result:
HandleReconnectResult(result);
break;
case SubscribeTagsRequest:
case WriteTagRequest:
Stash.Stash();
break;
case UnsubscribeTagsRequest req:
// Allow unsubscribe even during reconnect (for cleanup on instance stop)
HandleUnsubscribe(req);
break;
case TagValueReceived:
// Ignore — stale callback from previous connection
break;
case TagResolutionSucceeded:
case TagResolutionFailed:
// Ignore — stale results from previous connection; ReSubscribeAll runs after reconnect
break;
case GetHealthReport:
ReplyWithHealthReport();
break;
default:
Unhandled(message);
break;
}
}
// ── Connection Management ──
private void HandleAttemptConnect()
{
_log.Debug("[{0}] Attempting connection...", _connectionName);
var self = Self;
_adapter.ConnectAsync(_connectionDetails).ContinueWith(t =>
{
if (t.IsCompletedSuccessfully)
return new ConnectResult(true, null);
return new ConnectResult(false, t.Exception?.GetBaseException().Message);
}).PipeTo(self);
}
private void HandleConnectResult(ConnectResult result)
{
if (result.Success)
{
_log.Info("[{0}] Connection established", _connectionName);
BecomeConnected();
}
else
{
_log.Warning("[{0}] Connection failed: {1}. Retrying in {2}s",
_connectionName, result.Error, _options.ReconnectInterval.TotalSeconds);
Timers.StartSingleTimer("reconnect", new AttemptConnect(), _options.ReconnectInterval);
}
}
private void HandleReconnectResult(ConnectResult result)
{
if (result.Success)
{
_log.Info("[{0}] Reconnected successfully on {1} endpoint", _connectionName, _activeEndpoint);
_consecutiveFailures = 0;
// Log restoration event to site event log
if (_siteEventLogger != null)
{
_ = _siteEventLogger.LogEventAsync(
"connection", "Info", null, _connectionName,
$"Connection restored on {_activeEndpoint} endpoint", null);
}
// WP-10: Transparent re-subscribe — re-establish all active subscriptions
ReSubscribeAll();
BecomeConnected();
}
else
{
_consecutiveFailures++;
// Failover: switch endpoint after exhausting retry count (only if backup is configured)
if (_backupConfig != null && _consecutiveFailures >= _failoverRetryCount)
{
var previousEndpoint = _activeEndpoint;
_activeEndpoint = _activeEndpoint == ActiveEndpoint.Primary
? ActiveEndpoint.Backup
: ActiveEndpoint.Primary;
_consecutiveFailures = 0;
var newConfig = _activeEndpoint == ActiveEndpoint.Primary
? _primaryConfig
: _backupConfig;
// Dispose old adapter (fire-and-forget — don't await in actor context)
_adapter.Disconnected -= OnAdapterDisconnected;
_ = _adapter.DisposeAsync().AsTask();
// Create new adapter for the target endpoint
_adapter = _factory.Create(_protocolType, newConfig);
_connectionDetails = newConfig;
// Wire disconnect handler on new adapter
_adapter.Disconnected += OnAdapterDisconnected;
_log.Warning("[{0}] Failing over from {1} to {2}",
_connectionName, previousEndpoint, _activeEndpoint);
// Log failover event to site event log
if (_siteEventLogger != null)
{
_ = _siteEventLogger.LogEventAsync(
"connection", "Warning", null, _connectionName,
$"Failover from {previousEndpoint} to {_activeEndpoint}",
$"After {_failoverRetryCount} consecutive failures");
}
}
else
{
var retryLimit = _backupConfig != null ? _failoverRetryCount.ToString() : "∞";
_log.Warning("[{0}] Reconnect failed: {1}. Retrying in {2}s (attempt {3}/{4})",
_connectionName, result.Error, _options.ReconnectInterval.TotalSeconds,
_consecutiveFailures, retryLimit);
}
Timers.StartSingleTimer("reconnect", new AttemptConnect(), _options.ReconnectInterval);
}
}
private void HandleDisconnect()
{
_log.Warning("[{0}] AdapterDisconnected message received — transitioning to Reconnecting", _connectionName);
BecomeReconnecting();
}
// ── Subscription Management (WP-14) ──
private void HandleSubscribe(SubscribeTagsRequest request)
{
_log.Debug("[{0}] Subscribing {1} tags for instance {2}",
_connectionName, request.TagPaths.Count, request.InstanceUniqueName);
_subscribers[request.InstanceUniqueName] = Sender;
if (!_subscriptionsByInstance.ContainsKey(request.InstanceUniqueName))
_subscriptionsByInstance[request.InstanceUniqueName] = new HashSet();
var instanceTags = _subscriptionsByInstance[request.InstanceUniqueName];
var self = Self;
var sender = Sender;
Task.Run(async () =>
{
foreach (var tagPath in request.TagPaths)
{
if (_subscriptionIds.ContainsKey(tagPath))
{
// Already subscribed — just track for this instance
instanceTags.Add(tagPath);
continue;
}
try
{
var subId = await _adapter.SubscribeAsync(tagPath, (path, value) =>
{
self.Tell(new TagValueReceived(path, value));
});
_subscriptionIds[tagPath] = subId;
instanceTags.Add(tagPath);
_totalSubscribed++;
_resolvedTags++;
}
catch (Exception ex)
{
// WP-12: Tag path resolution failure — mark as unresolved, retry later
_unresolvedTags.Add(tagPath);
instanceTags.Add(tagPath);
_totalSubscribed++;
self.Tell(new TagResolutionFailed(tagPath, ex.Message));
}
}
// Initial read — seed current values for all resolved tags so the Instance Actor
// doesn't stay Uncertain until the next OPC UA data change notification
foreach (var tagPath in instanceTags)
{
if (_unresolvedTags.Contains(tagPath)) continue;
try
{
var readResult = await _adapter.ReadAsync(tagPath);
if (readResult.Success && readResult.Value != null)
{
self.Tell(new TagValueReceived(tagPath, readResult.Value));
}
}
catch
{
// Best-effort — subscription will deliver subsequent changes
}
}
return new SubscribeTagsResponse(
request.CorrelationId, request.InstanceUniqueName, true, null, DateTimeOffset.UtcNow);
}).PipeTo(sender);
// Start tag resolution retry timer if we have unresolved tags
if (_unresolvedTags.Count > 0)
{
Timers.StartPeriodicTimer(
"tag-resolution-retry",
new RetryTagResolution(),
_options.TagResolutionRetryInterval,
_options.TagResolutionRetryInterval);
}
}
private void HandleUnsubscribe(UnsubscribeTagsRequest request)
{
_log.Debug("[{0}] Unsubscribing all tags for instance {1}",
_connectionName, request.InstanceUniqueName);
if (!_subscriptionsByInstance.TryGetValue(request.InstanceUniqueName, out var tags))
return;
// WP-14: Cleanup on Instance Actor stop
foreach (var tagPath in tags)
{
// Check if any other instance is still subscribed to this tag
var otherSubscribers = _subscriptionsByInstance
.Where(kvp => kvp.Key != request.InstanceUniqueName && kvp.Value.Contains(tagPath))
.Any();
if (!otherSubscribers && _subscriptionIds.TryGetValue(tagPath, out var subId))
{
_ = _adapter.UnsubscribeAsync(subId);
_subscriptionIds.Remove(tagPath);
_unresolvedTags.Remove(tagPath);
_totalSubscribed--;
if (!_unresolvedTags.Contains(tagPath))
_resolvedTags--;
}
}
_subscriptionsByInstance.Remove(request.InstanceUniqueName);
_subscribers.Remove(request.InstanceUniqueName);
}
// ── Write Support (WP-11) ──
private void HandleWrite(WriteTagRequest request)
{
_log.Debug("[{0}] Writing to tag {1}", _connectionName, request.TagPath);
var sender = Sender;
// WP-11: Write through DCL to device, failure returned synchronously
_adapter.WriteAsync(request.TagPath, request.Value).ContinueWith(t =>
{
if (t.IsCompletedSuccessfully)
{
var result = t.Result;
return new WriteTagResponse(
request.CorrelationId, result.Success, result.ErrorMessage, DateTimeOffset.UtcNow);
}
return new WriteTagResponse(
request.CorrelationId, false, t.Exception?.GetBaseException().Message, DateTimeOffset.UtcNow);
}).PipeTo(sender);
}
// ── Tag Resolution Retry (WP-12) ──
private void HandleRetryTagResolution()
{
if (_unresolvedTags.Count == 0)
{
Timers.Cancel("tag-resolution-retry");
return;
}
_log.Debug("[{0}] Retrying resolution for {1} unresolved tags", _connectionName, _unresolvedTags.Count);
var self = Self;
var toResolve = _unresolvedTags.ToList();
foreach (var tagPath in toResolve)
{
_adapter.SubscribeAsync(tagPath, (path, value) =>
{
self.Tell(new TagValueReceived(path, value));
}).ContinueWith(t =>
{
if (t.IsCompletedSuccessfully)
return new TagResolutionSucceeded(tagPath, t.Result) as object;
return new TagResolutionFailed(tagPath, t.Exception?.GetBaseException().Message ?? "Unknown error");
}).PipeTo(self);
}
}
// ── Bad Quality Push (WP-9) ──
private void PushBadQualityForAllTags()
{
var now = DateTimeOffset.UtcNow;
foreach (var (instanceName, tags) in _subscriptionsByInstance)
{
if (!_subscribers.TryGetValue(instanceName, out var subscriber))
continue;
subscriber.Tell(new ConnectionQualityChanged(_connectionName, QualityCode.Bad, now));
}
// All tags now bad quality
_tagsGoodQuality = 0;
_tagsUncertainQuality = 0;
_tagsBadQuality = _lastTagQuality.Count;
foreach (var key in _lastTagQuality.Keys.ToList())
_lastTagQuality[key] = QualityCode.Bad;
_healthCollector.UpdateTagQuality(_connectionName, _tagsGoodQuality, _tagsBadQuality, _tagsUncertainQuality);
}
// ── Re-subscribe (WP-10) ──
private void ReSubscribeAll()
{
// Derive tag list from _subscriptionsByInstance (durable source of truth),
// not _subscriptionIds which gets cleared and is only repopulated on success.
var allTags = _subscriptionsByInstance.Values
.SelectMany(tags => tags)
.Distinct()
.ToList();
_log.Info("[{0}] Re-subscribing {1} tags after reconnect", _connectionName, allTags.Count);
var self = Self;
_subscriptionIds.Clear();
_unresolvedTags.Clear();
_resolvedTags = 0;
foreach (var tagPath in allTags)
{
_adapter.SubscribeAsync(tagPath, (path, value) =>
{
self.Tell(new TagValueReceived(path, value));
}).ContinueWith(t =>
{
if (t.IsCompletedSuccessfully)
return new TagResolutionSucceeded(tagPath, t.Result) as object;
return new TagResolutionFailed(tagPath, t.Exception?.GetBaseException().Message ?? "Unknown error");
}).PipeTo(self);
}
}
// ── Health Reporting (WP-13) ──
private void ReplyWithHealthReport()
{
var status = _adapter.Status;
var endpointLabel = _backupConfig == null
? "Primary (no backup)"
: _activeEndpoint.ToString();
Sender.Tell(new DataConnectionHealthReport(
_connectionName, status, _totalSubscribed, _resolvedTags, endpointLabel, DateTimeOffset.UtcNow));
}
// ── Internal message handlers for piped async results ──
private void HandleTagResolutionSucceeded(TagResolutionSucceeded msg)
{
if (_unresolvedTags.Remove(msg.TagPath))
{
_subscriptionIds[msg.TagPath] = msg.SubscriptionId;
_resolvedTags++;
_healthCollector.UpdateTagResolution(_connectionName, _totalSubscribed, _resolvedTags);
_log.Info("[{0}] Tag resolved: {1}", _connectionName, msg.TagPath);
}
if (_unresolvedTags.Count == 0)
{
Timers.Cancel("tag-resolution-retry");
}
}
private void HandleTagResolutionFailed(TagResolutionFailed msg)
{
_log.Debug("[{0}] Tag resolution still failing for {1}: {2}",
_connectionName, msg.TagPath, msg.Error);
// Track as unresolved so periodic retry picks it up
if (_unresolvedTags.Add(msg.TagPath))
{
Timers.StartPeriodicTimer(
"tag-resolution-retry",
new RetryTagResolution(),
_options.TagResolutionRetryInterval,
_options.TagResolutionRetryInterval);
}
}
private void HandleTagValueReceived(TagValueReceived msg)
{
// Fan out to all subscribed instances
foreach (var (instanceName, tags) in _subscriptionsByInstance)
{
if (!tags.Contains(msg.TagPath))
continue;
if (_subscribers.TryGetValue(instanceName, out var subscriber))
{
subscriber.Tell(new TagValueUpdate(
_connectionName, msg.TagPath, msg.Value.Value, msg.Value.Quality, msg.Value.Timestamp));
}
}
// Track quality transitions
if (_lastTagQuality.TryGetValue(msg.TagPath, out var prevQuality))
{
// Decrement old quality bucket
switch (prevQuality)
{
case QualityCode.Good: _tagsGoodQuality--; break;
case QualityCode.Bad: _tagsBadQuality--; break;
case QualityCode.Uncertain: _tagsUncertainQuality--; break;
}
}
// Increment new quality bucket
switch (msg.Value.Quality)
{
case QualityCode.Good: _tagsGoodQuality++; break;
case QualityCode.Bad: _tagsBadQuality++; break;
case QualityCode.Uncertain: _tagsUncertainQuality++; break;
}
_lastTagQuality[msg.TagPath] = msg.Value.Quality;
_healthCollector.UpdateTagQuality(_connectionName, _tagsGoodQuality, _tagsBadQuality, _tagsUncertainQuality);
}
// ── Internal messages ──
internal record AttemptConnect;
internal record ConnectResult(bool Success, string? Error);
internal record AdapterDisconnected;
internal record TagValueReceived(string TagPath, TagValue Value);
internal record TagResolutionFailed(string TagPath, string Error);
internal record TagResolutionSucceeded(string TagPath, string SubscriptionId);
internal record RetryTagResolution;
public record GetHealthReport;
}