- AkkaHostedService: SetNodeHostname from NodeOptions
- DataConnectionActor: UpdateConnectionEndpoint on state transitions, track per-tag quality counts and UpdateTagQuality on value changes
- HealthReportSender: query StoreAndForwardStorage for parked message count
- StoreAndForwardStorage: add GetParkedMessageCountAsync()
700 lines
25 KiB
C#
700 lines
25 KiB
C#
using Akka.Actor;
|
|
using Akka.Event;
|
|
using ScadaLink.Commons.Interfaces.Protocol;
|
|
using ScadaLink.Commons.Messages.DataConnection;
|
|
using ScadaLink.Commons.Types.Enums;
|
|
using ScadaLink.HealthMonitoring;
|
|
using ScadaLink.SiteEventLogging;
|
|
|
|
namespace ScadaLink.DataConnectionLayer.Actors;
|
|
|
|
/// <summary>
|
|
/// WP-6: Connection actor using Akka.NET Become/Stash pattern for lifecycle state machine.
|
|
///
|
|
/// States:
|
|
/// - Connecting: stash subscribe/write requests; attempts connection
|
|
/// - Connected: unstash and process all requests
|
|
/// - Reconnecting: push bad quality for all subscribed tags, stash new requests,
|
|
/// fixed-interval reconnect
|
|
///
|
|
/// WP-9: Auto-reconnect with bad quality on disconnect.
|
|
/// WP-10: Transparent re-subscribe after reconnection.
|
|
/// WP-11: Write-back support (synchronous failure to caller, no S&F).
|
|
/// WP-12: Tag path resolution with retry.
|
|
/// WP-13: Health reporting (connection status + tag resolution counts).
|
|
/// WP-14: Subscription lifecycle (register on create, cleanup on stop).
|
|
/// </summary>
|
|
public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
|
|
{
|
|
public enum ActiveEndpoint { Primary, Backup }
|
|
|
|
private readonly ILoggingAdapter _log = Context.GetLogger();
|
|
private readonly string _connectionName;
|
|
private IDataConnection _adapter;
|
|
private readonly DataConnectionOptions _options;
|
|
private readonly ISiteHealthCollector _healthCollector;
|
|
private readonly IDataConnectionFactory _factory;
|
|
private readonly string _protocolType;
|
|
private readonly ISiteEventLogger? _siteEventLogger;
|
|
|
|
public IStash Stash { get; set; } = null!;
|
|
public ITimerScheduler Timers { get; set; } = null!;
|
|
|
|
/// <summary>
|
|
/// Active subscriptions: instanceUniqueName → set of tag paths.
|
|
/// </summary>
|
|
private readonly Dictionary<string, HashSet<string>> _subscriptionsByInstance = new();
|
|
|
|
/// <summary>
|
|
/// Subscription IDs returned by the adapter: tagPath → subscriptionId.
|
|
/// </summary>
|
|
private readonly Dictionary<string, string> _subscriptionIds = new();
|
|
|
|
/// <summary>
|
|
/// Tags whose path resolution failed and are awaiting retry.
|
|
/// </summary>
|
|
private readonly HashSet<string> _unresolvedTags = new();
|
|
|
|
/// <summary>
|
|
/// Subscribers: instanceUniqueName → IActorRef (the Instance Actor).
|
|
/// </summary>
|
|
private readonly Dictionary<string, IActorRef> _subscribers = new();
|
|
|
|
/// <summary>
|
|
/// Tracks total subscribed and resolved tags for health reporting.
|
|
/// </summary>
|
|
private int _totalSubscribed;
|
|
private int _resolvedTags;
|
|
|
|
private int _tagsGoodQuality;
|
|
private int _tagsBadQuality;
|
|
private int _tagsUncertainQuality;
|
|
private readonly Dictionary<string, QualityCode> _lastTagQuality = new();
|
|
|
|
private IDictionary<string, string> _connectionDetails;
|
|
private readonly IDictionary<string, string> _primaryConfig;
|
|
private readonly IDictionary<string, string>? _backupConfig;
|
|
private readonly int _failoverRetryCount;
|
|
private ActiveEndpoint _activeEndpoint = ActiveEndpoint.Primary;
|
|
private int _consecutiveFailures;
|
|
|
|
/// <summary>
|
|
/// Captured Self reference for use from non-actor threads (event handlers, callbacks).
|
|
/// Akka.NET's Self property is only valid inside the actor's message loop.
|
|
/// </summary>
|
|
private IActorRef _self = null!;
|
|
|
|
/// <summary>
/// Creates the actor for one named data connection. The actor starts on the primary
/// endpoint configuration.
/// </summary>
/// <param name="connectionName">Name used in log messages and health updates.</param>
/// <param name="adapter">Initial protocol adapter used for connect/subscribe/read/write.</param>
/// <param name="options">Supplies the reconnect and tag-resolution retry intervals.</param>
/// <param name="healthCollector">Receives connection, endpoint and tag health updates.</param>
/// <param name="factory">Creates a replacement adapter when the actor fails over.</param>
/// <param name="protocolType">Protocol identifier passed to <paramref name="factory"/>.</param>
/// <param name="primaryConfig">Primary endpoint settings; an empty dictionary is used when null.</param>
/// <param name="backupConfig">Backup endpoint settings; when null, failover is never attempted.</param>
/// <param name="failoverRetryCount">Consecutive failed reconnects before switching endpoint.</param>
/// <param name="siteEventLogger">Optional sink for failover and restoration events.</param>
public DataConnectionActor(
    string connectionName,
    IDataConnection adapter,
    DataConnectionOptions options,
    ISiteHealthCollector healthCollector,
    IDataConnectionFactory factory,
    string protocolType,
    IDictionary<string, string>? primaryConfig = null,
    IDictionary<string, string>? backupConfig = null,
    int failoverRetryCount = 3,
    ISiteEventLogger? siteEventLogger = null)
{
    // Endpoint configuration: the primary settings double as the initial connection details.
    var resolvedPrimary = primaryConfig ?? new Dictionary<string, string>();
    _primaryConfig = resolvedPrimary;
    _connectionDetails = resolvedPrimary;
    _backupConfig = backupConfig;
    _failoverRetryCount = failoverRetryCount;

    // Identity and protocol.
    _connectionName = connectionName;
    _protocolType = protocolType;
    _adapter = adapter;
    _factory = factory;

    // Supporting services.
    _options = options;
    _healthCollector = healthCollector;
    _siteEventLogger = siteEventLogger;
}
|
|
|
|
/// <summary>
/// Actor start-up hook: captures <c>Self</c>, hooks the adapter's disconnect event and
/// enters the Connecting state.
/// </summary>
protected override void PreStart()
{
    _log.Info("DataConnectionActor [{0}] starting in Connecting state", _connectionName);

    // Self is only usable inside the actor's message loop, so keep a copy that
    // event handlers and callbacks running elsewhere can use.
    _self = Self;

    // Unexpected adapter disconnections are turned into AdapterDisconnected messages.
    _adapter.Disconnected += OnAdapterDisconnected;

    BecomeConnecting();
}
|
|
|
|
/// <summary>
/// Adapter <c>Disconnected</c> event handler. Forwards the event to the actor's mailbox
/// as an <see cref="AdapterDisconnected"/> message. Uses the captured <c>_self</c>
/// reference because the event may be raised outside the actor's message loop,
/// where <c>Self</c> is not valid.
/// </summary>
private void OnAdapterDisconnected() => _self.Tell(new AdapterDisconnected());
|
|
|
|
/// <summary>
/// Actor shutdown hook: detaches the disconnect handler and disposes the current adapter.
/// </summary>
protected override void PostStop()
{
    _log.Info("DataConnectionActor [{0}] stopping — disposing adapter", _connectionName);

    var adapter = _adapter;
    adapter.Disconnected -= OnAdapterDisconnected;

    // Disposal is started but not awaited; the resulting task is intentionally discarded.
    _ = adapter.DisposeAsync().AsTask();
}
|
|
|
|
/// <summary>
/// Fallback receive handler. PreStart switches behavior via Become, so messages are
/// normally handled by the state methods; anything arriving here is reported as unhandled.
/// </summary>
protected override void OnReceive(object message) => Unhandled(message);
|
|
|
|
// ── Connecting State ──
|
|
|
|
/// <summary>
/// Switches to the Connecting behavior, publishes the matching health status and
/// queues the first connection attempt.
/// </summary>
private void BecomeConnecting()
{
    _log.Info("[{0}] Entering Connecting state", _connectionName);

    // Publish status first so health consumers see "Connecting" before any attempt runs.
    _healthCollector.UpdateConnectionHealth(_connectionName, ConnectionHealth.Connecting);
    _healthCollector.UpdateConnectionEndpoint(_connectionName, "Connecting");

    Become(Connecting);

    // Kick off the first attempt through the mailbox.
    Self.Tell(new AttemptConnect());
}
|
|
|
|
/// <summary>
/// Message handler for the Connecting state. Connection attempts and their results are
/// processed; subscribe, write and unsubscribe requests are stashed until connected;
/// health report queries are answered immediately.
/// </summary>
private void Connecting(object message)
{
    switch (message)
    {
        case AttemptConnect:
            HandleAttemptConnect();
            break;

        case ConnectResult connectResult:
            HandleConnectResult(connectResult);
            break;

        // Defer all request traffic until the connection is established.
        case SubscribeTagsRequest or WriteTagRequest or UnsubscribeTagsRequest:
            Stash.Stash();
            break;

        case GetHealthReport:
            ReplyWithHealthReport();
            break;

        default:
            Unhandled(message);
            break;
    }
}
|
|
|
|
// ── Connected State ──
|
|
|
|
/// <summary>
/// Switches to the Connected behavior, publishes connection / tag-resolution / endpoint
/// health, and replays every request that was stashed while not connected.
/// </summary>
private void BecomeConnected()
{
    _log.Info("[{0}] Entering Connected state", _connectionName);

    _healthCollector.UpdateConnectionHealth(_connectionName, ConnectionHealth.Connected);
    _healthCollector.UpdateTagResolution(_connectionName, _totalSubscribed, _resolvedTags);

    // The endpoint name is only shown when a backup exists. The label is a
    // machine-generated status string, so lower-case it culture-invariantly rather than
    // with the current thread culture.
    var endpointLabel = _backupConfig == null
        ? "Connected"
        : $"Connected to {_activeEndpoint.ToString().ToLowerInvariant()}";
    _healthCollector.UpdateConnectionEndpoint(_connectionName, endpointLabel);

    Become(Connected);

    // Requests stashed during Connecting/Reconnecting are now processed by Connected.
    Stash.UnstashAll();
}
|
|
|
|
/// <summary>
/// Message handler for the Connected state: dispatches every request, adapter callback
/// result and internal timer message to its dedicated handler.
/// </summary>
private void Connected(object message)
{
    switch (message)
    {
        // Requests from instance actors.
        case SubscribeTagsRequest subscribe:
            HandleSubscribe(subscribe);
            break;
        case UnsubscribeTagsRequest unsubscribe:
            HandleUnsubscribe(unsubscribe);
            break;
        case WriteTagRequest write:
            HandleWrite(write);
            break;

        // Results piped back from adapter operations.
        case TagValueReceived valueReceived:
            HandleTagValueReceived(valueReceived);
            break;
        case TagResolutionSucceeded resolved:
            HandleTagResolutionSucceeded(resolved);
            break;
        case TagResolutionFailed failed:
            HandleTagResolutionFailed(failed);
            break;

        // Connection lifecycle and timers.
        case AdapterDisconnected:
            HandleDisconnect();
            break;
        case RetryTagResolution:
            HandleRetryTagResolution();
            break;

        case GetHealthReport:
            ReplyWithHealthReport();
            break;

        default:
            Unhandled(message);
            break;
    }
}
|
|
|
|
// ── Reconnecting State ──
|
|
|
|
/// <summary>
/// Switches to the Reconnecting behavior: reports the connection as disconnected,
/// notifies subscribers of bad quality, and schedules a reconnect attempt after
/// the configured reconnect interval.
/// </summary>
private void BecomeReconnecting()
{
    _log.Warning("[{0}] Entering Reconnecting state", _connectionName);

    _healthCollector.UpdateConnectionHealth(_connectionName, ConnectionHealth.Disconnected);
    _healthCollector.UpdateConnectionEndpoint(_connectionName, "Disconnected");

    Become(Reconnecting);

    // WP-9: subscribers are told the connection quality is bad as soon as we disconnect.
    PushBadQualityForAllTags();

    // One-shot timer; each failed attempt schedules the next one.
    var retryDelay = _options.ReconnectInterval;
    Timers.StartSingleTimer("reconnect", new AttemptConnect(), retryDelay);
}
|
|
|
|
/// <summary>
/// Message handler for the Reconnecting state. Reconnect attempts and their results are
/// processed, subscribe/write requests are stashed, unsubscribe requests are applied
/// immediately, and adapter callback messages are dropped.
/// </summary>
private void Reconnecting(object message)
{
    switch (message)
    {
        case AttemptConnect:
            HandleAttemptConnect();
            break;

        case ConnectResult reconnectResult:
            HandleReconnectResult(reconnectResult);
            break;

        // Held back until the connection is re-established.
        case SubscribeTagsRequest or WriteTagRequest:
            Stash.Stash();
            break;

        // Unsubscribe is allowed while reconnecting so a stopping instance can clean up.
        case UnsubscribeTagsRequest unsubscribe:
            HandleUnsubscribe(unsubscribe);
            break;

        // Stale callbacks/results from the previous connection are ignored;
        // ReSubscribeAll runs after a successful reconnect.
        case TagValueReceived or TagResolutionSucceeded or TagResolutionFailed:
            break;

        case GetHealthReport:
            ReplyWithHealthReport();
            break;

        default:
            Unhandled(message);
            break;
    }
}
|
|
|
|
// ── Connection Management ──
|
|
|
|
/// <summary>
/// Starts an asynchronous connect on the current adapter using the active connection
/// details and pipes the outcome back to this actor as a <see cref="ConnectResult"/>.
/// </summary>
private void HandleAttemptConnect()
{
    _log.Debug("[{0}] Attempting connection...", _connectionName);

    // Capture Self now; the continuation does not run inside the message loop.
    var replyTo = Self;

    _adapter.ConnectAsync(_connectionDetails)
        .ContinueWith(connectTask => connectTask.IsCompletedSuccessfully
            ? new ConnectResult(true, null)
            : new ConnectResult(false, connectTask.Exception?.GetBaseException().Message))
        .PipeTo(replyTo);
}
|
|
|
|
/// <summary>
/// Handles the outcome of a connect attempt made in the Connecting state: on success the
/// actor becomes Connected, otherwise another attempt is scheduled after the reconnect interval.
/// </summary>
/// <param name="result">Outcome piped back by <c>HandleAttemptConnect</c>.</param>
private void HandleConnectResult(ConnectResult result)
{
    if (!result.Success)
    {
        _log.Warning("[{0}] Connection failed: {1}. Retrying in {2}s",
            _connectionName, result.Error, _options.ReconnectInterval.TotalSeconds);
        Timers.StartSingleTimer("reconnect", new AttemptConnect(), _options.ReconnectInterval);
        return;
    }

    _log.Info("[{0}] Connection established", _connectionName);
    BecomeConnected();
}
|
|
|
|
/// <summary>
/// Handles the outcome of a connect attempt made in the Reconnecting state.
/// On success: resets the failure counter, logs a restoration event, re-subscribes all
/// tags and becomes Connected. On failure: counts the failure, switches to the other
/// endpoint (with a fresh adapter) once the failover threshold is reached and a backup
/// is configured, and always schedules the next attempt.
/// </summary>
/// <param name="result">Outcome piped back by <c>HandleAttemptConnect</c>.</param>
private void HandleReconnectResult(ConnectResult result)
{
    if (result.Success)
    {
        _log.Info("[{0}] Reconnected successfully on {1} endpoint", _connectionName, _activeEndpoint);
        _consecutiveFailures = 0;

        // Record the restoration in the site event log (not awaited).
        if (_siteEventLogger != null)
        {
            _ = _siteEventLogger.LogEventAsync(
                "connection", "Info", null, _connectionName,
                $"Connection restored on {_activeEndpoint} endpoint", null);
        }

        // WP-10: re-establish every active subscription before resuming normal operation.
        ReSubscribeAll();
        BecomeConnected();
        return;
    }

    _consecutiveFailures++;

    var shouldFailOver = _backupConfig != null && _consecutiveFailures >= _failoverRetryCount;
    if (!shouldFailOver)
    {
        // Without a backup there is no retry limit, so the attempt counter is open-ended.
        var retryLimit = _backupConfig != null ? _failoverRetryCount.ToString() : "∞";
        _log.Warning("[{0}] Reconnect failed: {1}. Retrying in {2}s (attempt {3}/{4})",
            _connectionName, result.Error, _options.ReconnectInterval.TotalSeconds,
            _consecutiveFailures, retryLimit);
    }
    else
    {
        // Flip to the other endpoint and start counting failures afresh.
        var previousEndpoint = _activeEndpoint;
        if (previousEndpoint == ActiveEndpoint.Primary)
        {
            _activeEndpoint = ActiveEndpoint.Backup;
        }
        else
        {
            _activeEndpoint = ActiveEndpoint.Primary;
        }
        _consecutiveFailures = 0;

        var newConfig = _activeEndpoint == ActiveEndpoint.Primary ? _primaryConfig : _backupConfig!;

        // Retire the old adapter: unhook first, then dispose without awaiting.
        _adapter.Disconnected -= OnAdapterDisconnected;
        _ = _adapter.DisposeAsync().AsTask();

        // Bring up a fresh adapter for the target endpoint and hook its disconnect event.
        _adapter = _factory.Create(_protocolType, newConfig);
        _connectionDetails = newConfig;
        _adapter.Disconnected += OnAdapterDisconnected;

        _log.Warning("[{0}] Failing over from {1} to {2}",
            _connectionName, previousEndpoint, _activeEndpoint);

        // Record the failover in the site event log (not awaited).
        if (_siteEventLogger != null)
        {
            _ = _siteEventLogger.LogEventAsync(
                "connection", "Warning", null, _connectionName,
                $"Failover from {previousEndpoint} to {_activeEndpoint}",
                $"After {_failoverRetryCount} consecutive failures");
        }
    }

    Timers.StartSingleTimer("reconnect", new AttemptConnect(), _options.ReconnectInterval);
}
|
|
|
|
/// <summary>
/// Reacts to an <see cref="AdapterDisconnected"/> message received while Connected by
/// moving the actor into the Reconnecting state.
/// </summary>
private void HandleDisconnect()
{
    _log.Warning(
        "[{0}] AdapterDisconnected message received — transitioning to Reconnecting",
        _connectionName);

    BecomeReconnecting();
}
|
|
|
|
// ── Subscription Management (WP-14) ──
|
|
|
|
/// <summary>
/// Registers the sender as the subscriber for an instance and subscribes its tags on the
/// adapter (WP-14).
/// </summary>
/// <remarks>
/// All bookkeeping on actor state happens here, on the actor thread. The background task
/// only talks to the adapter and reports each outcome back through the mailbox as
/// <see cref="TagResolutionSucceeded"/> / <see cref="TagResolutionFailed"/>, so the
/// dictionaries, sets and counters are never touched from a thread-pool thread.
/// A tag that needs a new adapter subscription is tracked in <c>_unresolvedTags</c> until
/// its success message arrives; the existing resolution handlers then record the
/// subscription id, bump the resolved count and publish tag-resolution health.
/// NOTE(review): if the initial subscribe is still in flight when the retry timer fires,
/// <c>HandleRetryTagResolution</c> will attempt the same tag again — the same overlap the
/// periodic retry already has with itself.
/// </remarks>
/// <param name="request">Instance name, correlation id and tag paths to subscribe.</param>
private void HandleSubscribe(SubscribeTagsRequest request)
{
    _log.Debug("[{0}] Subscribing {1} tags for instance {2}",
        _connectionName, request.TagPaths.Count, request.InstanceUniqueName);

    _subscribers[request.InstanceUniqueName] = Sender;

    if (!_subscriptionsByInstance.TryGetValue(request.InstanceUniqueName, out var instanceTags))
    {
        instanceTags = new HashSet<string>();
        _subscriptionsByInstance[request.InstanceUniqueName] = instanceTags;
    }

    // Decide, on the actor thread, which tags need a new adapter subscription.
    var pending = new List<string>();
    foreach (var tagPath in request.TagPaths)
    {
        instanceTags.Add(tagPath);

        // Already subscribed, or already waiting for resolution: just track it for this instance.
        if (_subscriptionIds.ContainsKey(tagPath) || _unresolvedTags.Contains(tagPath))
            continue;

        _unresolvedTags.Add(tagPath);
        _totalSubscribed++;
        pending.Add(tagPath);
    }

    // Snapshot everything the background task needs; it must not read actor fields.
    var self = Self;
    var sender = Sender;
    var adapter = _adapter;
    var readable = instanceTags.Where(tag => _subscriptionIds.ContainsKey(tag)).ToList();

    Task.Run(async () =>
    {
        foreach (var tagPath in pending)
        {
            try
            {
                var subId = await adapter.SubscribeAsync(tagPath, (path, value) =>
                {
                    self.Tell(new TagValueReceived(path, value));
                });

                self.Tell(new TagResolutionSucceeded(tagPath, subId));
                readable.Add(tagPath);
            }
            catch (Exception ex)
            {
                // WP-12: resolution failure — the tag stays unresolved and is retried later.
                self.Tell(new TagResolutionFailed(tagPath, ex.Message));
            }
        }

        // Initial read — seed current values for resolved tags so the instance does not
        // have to wait for the next change notification from the device.
        foreach (var tagPath in readable)
        {
            try
            {
                var readResult = await adapter.ReadAsync(tagPath);
                if (readResult.Success && readResult.Value != null)
                {
                    self.Tell(new TagValueReceived(tagPath, readResult.Value));
                }
            }
            catch
            {
                // Best-effort — the subscription will deliver subsequent changes.
            }
        }

        return new SubscribeTagsResponse(
            request.CorrelationId, request.InstanceUniqueName, true, null, DateTimeOffset.UtcNow);
    }).PipeTo(sender);

    // Make sure the retry timer is running while anything is unresolved, without
    // resetting the period of a timer that is already active.
    if (_unresolvedTags.Count > 0 && !Timers.IsTimerActive("tag-resolution-retry"))
    {
        Timers.StartPeriodicTimer(
            "tag-resolution-retry",
            new RetryTagResolution(),
            _options.TagResolutionRetryInterval,
            _options.TagResolutionRetryInterval);
    }
}
|
|
|
|
/// <summary>
/// Removes every tag subscription held by an instance (WP-14 cleanup on instance stop).
/// </summary>
/// <remarks>
/// A tag is released only when no other instance still subscribes to it. Releasing covers
/// both resolved tags (adapter unsubscribe, subscription id and counters) and tags that
/// are still awaiting resolution, so an unwanted tag is no longer retried. The tag's
/// last-known quality is dropped from the quality counts as well. Health is republished
/// afterwards. Does nothing when the instance has no recorded subscriptions.
/// </remarks>
/// <param name="request">Identifies the instance whose subscriptions are removed.</param>
private void HandleUnsubscribe(UnsubscribeTagsRequest request)
{
    _log.Debug("[{0}] Unsubscribing all tags for instance {1}",
        _connectionName, request.InstanceUniqueName);

    if (!_subscriptionsByInstance.TryGetValue(request.InstanceUniqueName, out var tags))
        return;

    var qualityCountsChanged = false;

    foreach (var tagPath in tags)
    {
        // Keep the tag if any other instance is still subscribed to it.
        var stillWanted = _subscriptionsByInstance.Any(
            kvp => kvp.Key != request.InstanceUniqueName && kvp.Value.Contains(tagPath));
        if (stillWanted)
            continue;

        var wasUnresolved = _unresolvedTags.Remove(tagPath);

        if (_subscriptionIds.Remove(tagPath, out var subId))
        {
            // Resolved tag: drop the adapter subscription (not awaited) and both counters.
            _ = _adapter.UnsubscribeAsync(subId);
            _totalSubscribed--;
            _resolvedTags--;
        }
        else if (wasUnresolved)
        {
            // Never resolved: nothing to unsubscribe on the adapter, but stop counting it.
            _totalSubscribed--;
        }

        // Forget the tag's last quality so it no longer contributes to the quality counts.
        if (_lastTagQuality.Remove(tagPath, out var lastQuality))
        {
            switch (lastQuality)
            {
                case QualityCode.Good: _tagsGoodQuality--; break;
                case QualityCode.Bad: _tagsBadQuality--; break;
                case QualityCode.Uncertain: _tagsUncertainQuality--; break;
            }

            qualityCountsChanged = true;
        }
    }

    _subscriptionsByInstance.Remove(request.InstanceUniqueName);
    _subscribers.Remove(request.InstanceUniqueName);

    _healthCollector.UpdateTagResolution(_connectionName, _totalSubscribed, _resolvedTags);
    if (qualityCountsChanged)
    {
        _healthCollector.UpdateTagQuality(
            _connectionName, _tagsGoodQuality, _tagsBadQuality, _tagsUncertainQuality);
    }
}
|
|
|
|
// ── Write Support (WP-11) ──
|
|
|
|
/// <summary>
/// Writes a value to a tag through the adapter (WP-11) and pipes a
/// <c>WriteTagResponse</c> carrying the outcome back to the requester.
/// </summary>
/// <param name="request">Correlation id, tag path and value to write.</param>
private void HandleWrite(WriteTagRequest request)
{
    _log.Debug("[{0}] Writing to tag {1}", _connectionName, request.TagPath);

    // Capture the requester now; Sender is not valid inside the continuation.
    var replyTo = Sender;

    _adapter.WriteAsync(request.TagPath, request.Value)
        .ContinueWith(writeTask =>
        {
            // The write task itself did not complete: report its base exception message.
            if (!writeTask.IsCompletedSuccessfully)
            {
                return new WriteTagResponse(
                    request.CorrelationId, false, writeTask.Exception?.GetBaseException().Message, DateTimeOffset.UtcNow);
            }

            // The adapter answered: relay its own success flag and error message.
            var outcome = writeTask.Result;
            return new WriteTagResponse(
                request.CorrelationId, outcome.Success, outcome.ErrorMessage, DateTimeOffset.UtcNow);
        })
        .PipeTo(replyTo);
}
|
|
|
|
// ── Tag Resolution Retry (WP-12) ──
|
|
|
|
/// <summary>
/// Periodic retry (WP-12): re-attempts the adapter subscription for every tag currently
/// marked unresolved and pipes each outcome back to this actor. Cancels the retry timer
/// when nothing is unresolved.
/// </summary>
private void HandleRetryTagResolution()
{
    var outstanding = _unresolvedTags.Count;
    if (outstanding == 0)
    {
        Timers.Cancel("tag-resolution-retry");
        return;
    }

    _log.Debug("[{0}] Retrying resolution for {1} unresolved tags", _connectionName, outstanding);

    var self = Self;

    // Iterate a snapshot of the unresolved set.
    foreach (var tagPath in _unresolvedTags.ToList())
    {
        var subscribeTask = _adapter.SubscribeAsync(
            tagPath,
            (path, value) => { self.Tell(new TagValueReceived(path, value)); });

        subscribeTask
            .ContinueWith(t => t.IsCompletedSuccessfully
                ? (object)new TagResolutionSucceeded(tagPath, t.Result)
                : new TagResolutionFailed(tagPath, t.Exception?.GetBaseException().Message ?? "Unknown error"))
            .PipeTo(self);
    }
}
|
|
|
|
// ── Bad Quality Push (WP-9) ──
|
|
|
|
/// <summary>
/// WP-9: tells every registered subscriber that the connection quality is now bad, marks
/// every tracked tag's last quality as bad, and publishes the resulting quality counts.
/// </summary>
private void PushBadQualityForAllTags()
{
    var timestamp = DateTimeOffset.UtcNow;

    // One notification per instance that has both subscriptions and a known subscriber.
    foreach (var instanceName in _subscriptionsByInstance.Keys)
    {
        if (_subscribers.TryGetValue(instanceName, out var subscriber))
        {
            subscriber.Tell(new ConnectionQualityChanged(_connectionName, QualityCode.Bad, timestamp));
        }
    }

    // Every tracked tag moves into the bad bucket (snapshot the keys before rewriting values).
    foreach (var tagPath in _lastTagQuality.Keys.ToList())
    {
        _lastTagQuality[tagPath] = QualityCode.Bad;
    }

    _tagsGoodQuality = 0;
    _tagsUncertainQuality = 0;
    _tagsBadQuality = _lastTagQuality.Count;

    _healthCollector.UpdateTagQuality(_connectionName, _tagsGoodQuality, _tagsBadQuality, _tagsUncertainQuality);
}
|
|
|
|
// ── Re-subscribe (WP-10) ──
|
|
|
|
/// <summary>
/// WP-10: after a reconnect, resets subscription-id / unresolved bookkeeping and issues a
/// fresh adapter subscription for every distinct tag any instance is subscribed to.
/// Each outcome is piped back to this actor as a resolution success or failure message.
/// </summary>
private void ReSubscribeAll()
{
    // _subscriptionsByInstance is the durable record of what is wanted; _subscriptionIds
    // is cleared below and only refilled as subscriptions succeed.
    var allTags = (from instanceTags in _subscriptionsByInstance.Values
                   from tag in instanceTags
                   select tag)
        .Distinct()
        .ToList();

    _log.Info("[{0}] Re-subscribing {1} tags after reconnect", _connectionName, allTags.Count);

    var self = Self;

    // Start from a clean slate for the new connection.
    _subscriptionIds.Clear();
    _unresolvedTags.Clear();
    _resolvedTags = 0;

    foreach (var tagPath in allTags)
    {
        var subscribeTask = _adapter.SubscribeAsync(
            tagPath,
            (path, value) => { self.Tell(new TagValueReceived(path, value)); });

        subscribeTask
            .ContinueWith(t => t.IsCompletedSuccessfully
                ? (object)new TagResolutionSucceeded(tagPath, t.Result)
                : new TagResolutionFailed(tagPath, t.Exception?.GetBaseException().Message ?? "Unknown error"))
            .PipeTo(self);
    }
}
|
|
|
|
// ── Health Reporting (WP-13) ──
|
|
|
|
/// <summary>
/// WP-13: replies to the sender with a <c>DataConnectionHealthReport</c> containing the
/// adapter status, subscribed / resolved tag counts and the active endpoint label.
/// </summary>
private void ReplyWithHealthReport()
{
    var status = _adapter.Status;

    string endpointLabel;
    if (_backupConfig == null)
    {
        endpointLabel = "Primary (no backup)";
    }
    else
    {
        endpointLabel = _activeEndpoint.ToString();
    }

    var report = new DataConnectionHealthReport(
        _connectionName, status, _totalSubscribed, _resolvedTags, endpointLabel, DateTimeOffset.UtcNow);

    Sender.Tell(report);
}
|
|
|
|
// ── Internal message handlers for piped async results ──
|
|
|
|
/// <summary>
/// Records a successful adapter subscription for a tag and publishes updated
/// tag-resolution health. Cancels the retry timer once nothing is unresolved.
/// </summary>
/// <remarks>
/// A success is recorded when the tag was awaiting retry OR when no subscription id is
/// known for it yet. The second case covers results produced by <c>ReSubscribeAll</c>,
/// which clears both <c>_unresolvedTags</c> and <c>_subscriptionIds</c> before
/// re-subscribing: requiring membership in <c>_unresolvedTags</c> would discard every
/// one of those results and leave the resolved count at zero after a reconnect.
/// A result for a tag that already has a subscription id and was not awaiting retry is
/// treated as a duplicate and ignored.
/// </remarks>
/// <param name="msg">Tag path and the subscription id returned by the adapter.</param>
private void HandleTagResolutionSucceeded(TagResolutionSucceeded msg)
{
    var wasAwaitingRetry = _unresolvedTags.Remove(msg.TagPath);
    var isNewSubscription = !_subscriptionIds.ContainsKey(msg.TagPath);

    if (wasAwaitingRetry || isNewSubscription)
    {
        _subscriptionIds[msg.TagPath] = msg.SubscriptionId;

        // Count the tag as resolved only once, even if its id is being replaced.
        if (isNewSubscription)
        {
            _resolvedTags++;
        }

        _healthCollector.UpdateTagResolution(_connectionName, _totalSubscribed, _resolvedTags);
        _log.Info("[{0}] Tag resolved: {1}", _connectionName, msg.TagPath);
    }

    if (_unresolvedTags.Count == 0)
    {
        Timers.Cancel("tag-resolution-retry");
    }
}
|
|
|
|
/// <summary>
/// Logs a failed tag resolution and, when the tag was not already tracked as unresolved,
/// tracks it and (re)starts the periodic resolution retry timer.
/// </summary>
/// <param name="msg">Tag path and the error reported by the subscribe attempt.</param>
private void HandleTagResolutionFailed(TagResolutionFailed msg)
{
    _log.Debug("[{0}] Tag resolution still failing for {1}: {2}",
        _connectionName, msg.TagPath, msg.Error);

    // Already tracked: the retry timer started earlier will pick it up again.
    var newlyUnresolved = _unresolvedTags.Add(msg.TagPath);
    if (!newlyUnresolved)
    {
        return;
    }

    var interval = _options.TagResolutionRetryInterval;
    Timers.StartPeriodicTimer("tag-resolution-retry", new RetryTagResolution(), interval, interval);
}
|
|
|
|
/// <summary>
/// Forwards a received tag value to every instance subscribed to that tag, then moves the
/// tag between the good / bad / uncertain quality counters and publishes the new counts.
/// </summary>
/// <param name="msg">Tag path and the value (with quality and timestamp) received for it.</param>
private void HandleTagValueReceived(TagValueReceived msg)
{
    // Adds delta to the counter that matches the given quality; other qualities are not counted.
    void AdjustQualityCount(QualityCode quality, int delta)
    {
        switch (quality)
        {
            case QualityCode.Good: _tagsGoodQuality += delta; break;
            case QualityCode.Bad: _tagsBadQuality += delta; break;
            case QualityCode.Uncertain: _tagsUncertainQuality += delta; break;
        }
    }

    // Fan out to each instance that lists this tag and has a registered subscriber.
    foreach (var (instanceName, tags) in _subscriptionsByInstance)
    {
        if (tags.Contains(msg.TagPath) && _subscribers.TryGetValue(instanceName, out var subscriber))
        {
            subscriber.Tell(new TagValueUpdate(
                _connectionName, msg.TagPath, msg.Value.Value, msg.Value.Quality, msg.Value.Timestamp));
        }
    }

    // Quality transition: leave the previous bucket (if the tag was seen before), enter the new one.
    if (_lastTagQuality.TryGetValue(msg.TagPath, out var prevQuality))
    {
        AdjustQualityCount(prevQuality, -1);
    }

    var newQuality = msg.Value.Quality;
    AdjustQualityCount(newQuality, +1);
    _lastTagQuality[msg.TagPath] = newQuality;

    _healthCollector.UpdateTagQuality(_connectionName, _tagsGoodQuality, _tagsBadQuality, _tagsUncertainQuality);
}
|
|
|
|
// ── Internal messages ──
|
|
|
|
internal record AttemptConnect;
|
|
internal record ConnectResult(bool Success, string? Error);
|
|
internal record AdapterDisconnected;
|
|
internal record TagValueReceived(string TagPath, TagValue Value);
|
|
internal record TagResolutionFailed(string TagPath, string Error);
|
|
internal record TagResolutionSucceeded(string TagPath, string SubscriptionId);
|
|
internal record RetryTagResolution;
|
|
public record GetHealthReport;
|
|
}
|