fix(data-connection): resolve DataConnectionLayer-006..012 — quality-counter reconciliation, per-tag batch reads, configurable failover threshold, dedup retry, stale-callback guard, secure cert default

This commit is contained in:
Joseph Doherty
2026-05-16 21:11:24 -04:00
parent 0c82ffcbe6
commit c9b236e507
8 changed files with 515 additions and 34 deletions

View File

@@ -55,6 +55,13 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
/// </summary>
private readonly HashSet<string> _unresolvedTags = new();
/// <summary>
/// DataConnectionLayer-010: tags whose retry SubscribeAsync is currently in flight.
/// They are excluded from the next retry tick so a slow attempt is not duplicated
/// (which would leak monitored items / subscription ids).
/// </summary>
private readonly HashSet<string> _resolutionInFlight = new();
/// <summary>
/// Subscribers: instanceUniqueName → IActorRef (the Instance Actor).
/// </summary>
@@ -80,6 +87,15 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
private int _consecutiveUnstableDisconnects;
private DateTimeOffset _lastConnectedAt;
/// <summary>
/// DataConnectionLayer-011: monotonically increasing tag that identifies the
/// current adapter instance. Subscription callbacks capture the generation in
/// effect when they were created; a <see cref="TagValueReceived"/> whose
/// generation no longer matches comes from a disposed adapter and is dropped so
/// stale pre-failover device data is never forwarded to Instance Actors.
/// </summary>
private int _adapterGeneration;
/// <summary>
/// Captured Self reference for use from non-actor threads (event handlers, callbacks).
/// Akka.NET's Self property is only valid inside the actor's message loop.
@@ -187,12 +203,6 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
// ── Connected State ──
/// <summary>
/// Minimum time connected before we consider the connection stable.
/// If we disconnect before this, it counts as an unstable connection toward failover.
/// </summary>
private static readonly TimeSpan StableConnectionThreshold = TimeSpan.FromSeconds(60);
private void BecomeConnected()
{
_log.Info("[{0}] Entering Connected state", _connectionName);
@@ -263,7 +273,7 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
// If we were connected for less than the stability threshold, this counts
// as an unstable cycle (e.g., connect succeeded but heartbeat went stale).
var connectionDuration = DateTimeOffset.UtcNow - _lastConnectedAt;
if (_lastConnectedAt != default && connectionDuration < StableConnectionThreshold)
if (_lastConnectedAt != default && connectionDuration < _options.StableConnectionThreshold)
{
_consecutiveUnstableDisconnects++;
_log.Warning("[{0}] Unstable connection (lasted {1:F0}s) — consecutive unstable disconnects: {2}/{3}",
@@ -298,6 +308,10 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
_connectionDetails = newConfig;
_adapter.Disconnected += OnAdapterDisconnected;
// DataConnectionLayer-011: new adapter — bump the generation so callbacks
// from the disposed adapter are recognised as stale and dropped.
_adapterGeneration++;
_log.Warning("[{0}] Failing over from {1} to {2} (unstable connection pattern)",
_connectionName, previousEndpoint, _activeEndpoint);
@@ -306,7 +320,7 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
_ = _siteEventLogger.LogEventAsync(
"connection", "Warning", null, _connectionName,
$"Failover from {previousEndpoint} to {_activeEndpoint} (unstable connection)",
$"Connection lasted {connectionDuration.TotalSeconds:F0}s, threshold {StableConnectionThreshold.TotalSeconds:F0}s");
$"Connection lasted {connectionDuration.TotalSeconds:F0}s, threshold {_options.StableConnectionThreshold.TotalSeconds:F0}s");
}
}
@@ -443,6 +457,10 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
// Wire disconnect handler on new adapter
_adapter.Disconnected += OnAdapterDisconnected;
// DataConnectionLayer-011: new adapter — bump the generation so callbacks
// from the disposed adapter are recognised as stale and dropped.
_adapterGeneration++;
_log.Warning("[{0}] Failing over from {1} to {2}",
_connectionName, previousEndpoint, _activeEndpoint);
@@ -487,6 +505,9 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
var self = Self;
var sender = Sender;
// DataConnectionLayer-011: capture the current adapter generation so callbacks
// from this adapter can be distinguished from a later (post-failover) adapter.
var generation = _adapterGeneration;
// Snapshot the already-subscribed tag set on the actor thread. The background
// task below must NOT read or mutate actor state — it performs only adapter
@@ -513,7 +534,7 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
{
var subId = await _adapter.SubscribeAsync(tagPath, (path, value) =>
{
self.Tell(new TagValueReceived(path, value));
self.Tell(new TagValueReceived(path, value, generation));
});
results.Add(new SubscribeTagResult(tagPath, AlreadySubscribed: false, Success: true, subId, null));
tagsToSeed.Add(tagPath);
@@ -541,7 +562,7 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
var readResult = await _adapter.ReadAsync(tagPath);
if (readResult.Success && readResult.Value != null)
{
self.Tell(new TagValueReceived(tagPath, readResult.Value));
self.Tell(new TagValueReceived(tagPath, readResult.Value, generation));
}
}
catch
@@ -676,14 +697,34 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
_ = _adapter.UnsubscribeAsync(subId);
_subscriptionIds.Remove(tagPath);
_unresolvedTags.Remove(tagPath);
_resolutionInFlight.Remove(tagPath);
_totalSubscribed--;
if (!_unresolvedTags.Contains(tagPath))
_resolvedTags--;
// DataConnectionLayer-006: drop the tag's tracked quality so it is no
// longer counted by PushBadQualityForAllTags (which sets _tagsBadQuality
// from _lastTagQuality.Count). Leaving it here drifts the quality
// counters above _totalSubscribed across disconnect cycles.
if (_lastTagQuality.Remove(tagPath, out var droppedQuality))
{
switch (droppedQuality)
{
case QualityCode.Good: _tagsGoodQuality--; break;
case QualityCode.Bad: _tagsBadQuality--; break;
case QualityCode.Uncertain: _tagsUncertainQuality--; break;
}
}
}
}
_subscriptionsByInstance.Remove(request.InstanceUniqueName);
_subscribers.Remove(request.InstanceUniqueName);
// DataConnectionLayer-006: keep the reported quality counters in sync after the
// unsubscribed tags' buckets were decremented above.
_healthCollector.UpdateTagQuality(_connectionName, _tagsGoodQuality, _tagsBadQuality, _tagsUncertainQuality);
_healthCollector.UpdateTagResolution(_connectionName, _totalSubscribed, _resolvedTags);
}
// ── Write Support (WP-11) ──
@@ -731,16 +772,29 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
return;
}
_log.Debug("[{0}] Retrying resolution for {1} unresolved tags", _connectionName, _unresolvedTags.Count);
var self = Self;
var toResolve = _unresolvedTags.ToList();
// DataConnectionLayer-010: only dispatch retries for tags that do not already
// have an attempt in flight. A slow SubscribeAsync overlapping the next tick
// would otherwise produce duplicate concurrent subscribes for the same tag.
var toResolve = _unresolvedTags.Where(t => !_resolutionInFlight.Contains(t)).ToList();
if (toResolve.Count == 0)
{
_log.Debug("[{0}] Tag-resolution retry skipped — {1} attempt(s) still in flight",
_connectionName, _resolutionInFlight.Count);
return;
}
_log.Debug("[{0}] Retrying resolution for {1} unresolved tags", _connectionName, toResolve.Count);
var generation = _adapterGeneration;
foreach (var tagPath in toResolve)
{
_resolutionInFlight.Add(tagPath);
_adapter.SubscribeAsync(tagPath, (path, value) =>
{
self.Tell(new TagValueReceived(path, value));
self.Tell(new TagValueReceived(path, value, generation));
}).ContinueWith(t =>
{
if (t.IsCompletedSuccessfully)
@@ -788,13 +842,25 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
var self = Self;
_subscriptionIds.Clear();
_unresolvedTags.Clear();
_resolutionInFlight.Clear();
_resolvedTags = 0;
// DataConnectionLayer-006: reset the quality tracking too. Otherwise tags
// resolved for the first time after reconnect (never in _lastTagQuality) only
// increment their bucket and the totals drift above _totalSubscribed. They are
// repopulated from fresh TagValueReceived messages once subscriptions activate.
_lastTagQuality.Clear();
_tagsGoodQuality = 0;
_tagsBadQuality = 0;
_tagsUncertainQuality = 0;
_healthCollector.UpdateTagQuality(_connectionName, _tagsGoodQuality, _tagsBadQuality, _tagsUncertainQuality);
var generation = _adapterGeneration;
foreach (var tagPath in allTags)
{
_adapter.SubscribeAsync(tagPath, (path, value) =>
{
self.Tell(new TagValueReceived(path, value));
self.Tell(new TagValueReceived(path, value, generation));
}).ContinueWith(t =>
{
if (t.IsCompletedSuccessfully)
@@ -820,6 +886,9 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
private void HandleTagResolutionSucceeded(TagResolutionSucceeded msg)
{
// DataConnectionLayer-010: the retry attempt for this tag has completed.
_resolutionInFlight.Remove(msg.TagPath);
if (_unresolvedTags.Remove(msg.TagPath))
{
_subscriptionIds[msg.TagPath] = msg.SubscriptionId;
@@ -839,6 +908,10 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
_log.Debug("[{0}] Tag resolution still failing for {1}: {2}",
_connectionName, msg.TagPath, msg.Error);
// DataConnectionLayer-010: the retry attempt for this tag has completed —
// it is eligible for the next retry tick again.
_resolutionInFlight.Remove(msg.TagPath);
// Track as unresolved so periodic retry picks it up
if (_unresolvedTags.Add(msg.TagPath))
{
@@ -852,6 +925,16 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
private void HandleTagValueReceived(TagValueReceived msg)
{
// DataConnectionLayer-011: drop values delivered by a disposed adapter. After a
// failover the old adapter's OPC UA SDK threads may still fire callbacks; those
// carry a stale generation and must not be forwarded to Instance Actors.
if (msg.AdapterGeneration != _adapterGeneration)
{
_log.Debug("[{0}] Dropping stale tag value for {1} from adapter generation {2} (current {3})",
_connectionName, msg.TagPath, msg.AdapterGeneration, _adapterGeneration);
return;
}
// Fan out to all subscribed instances
foreach (var (instanceName, tags) in _subscriptionsByInstance)
{
@@ -892,7 +975,7 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
// ── Internal actor messages ──

// Asks the actor to start a connection attempt.
internal record AttemptConnect;

// Outcome of a connection attempt; Error carries the failure text when Success is false.
internal record ConnectResult(bool Success, string? Error);

// Signals that the adapter reported a disconnect.
internal record AdapterDisconnected;

// A value delivered for a tag. AdapterGeneration is the adapter generation captured when
// the subscription callback (or seed read) was created; the actor compares it with its
// current generation and drops the message when they differ.
internal record TagValueReceived(string TagPath, TagValue Value, int AdapterGeneration);

// A subscribe attempt for TagPath failed with the given error text.
internal record TagResolutionFailed(string TagPath, string Error);

// A subscribe attempt for TagPath succeeded and produced SubscriptionId.
internal record TagResolutionSucceeded(string TagPath, string SubscriptionId);

// Timer tick that triggers another resolution pass over the unresolved tags.
internal record RetryTagResolution;

View File

@@ -14,7 +14,10 @@ public record OpcUaConnectionOptions(
int SamplingIntervalMs = 1000,
int QueueSize = 10,
string SecurityMode = "None",
bool AutoAcceptUntrustedCerts = true,
// DataConnectionLayer-012: secure-by-default — untrusted server certificates are
// rejected unless an operator explicitly opts in per connection. Accepting any
// certificate defeats the Sign / SignAndEncrypt modes against a man-in-the-middle.
bool AutoAcceptUntrustedCerts = false,
bool DiscardOldest = true,
byte SubscriptionPriority = 0,
string SubscriptionDisplayName = "ScadaLink",

View File

@@ -186,10 +186,26 @@ public class OpcUaDataConnection : IDataConnection
public async Task<IReadOnlyDictionary<string, ReadResult>> ReadBatchAsync(IEnumerable<string> tagPaths, CancellationToken cancellationToken = default)
{
    // DataConnectionLayer-007: a single failing tag must not abort the whole batch.
    // Each tag is read exactly once, inside its own try block. A non-cancellation
    // exception is converted into a failed ReadResult carrying the exception message,
    // so the caller receives an entry for every requested tag path.
    var results = new Dictionary<string, ReadResult>();

    foreach (var tagPath in tagPaths)
    {
        try
        {
            results[tagPath] = await ReadAsync(tagPath, cancellationToken);
        }
        catch (OperationCanceledException)
        {
            // Cancellation aborts the whole batch — propagate it with the original stack.
            throw;
        }
        catch (Exception ex)
        {
            // Record the failure for this tag only and continue with the next one.
            results[tagPath] = new ReadResult(false, null, ex.Message);
        }
    }

    return results;
}

View File

@@ -1,5 +1,7 @@
using System.Collections.Concurrent;
using System.Security.Cryptography.X509Certificates;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Opc.Ua;
using Opc.Ua.Client;
using Opc.Ua.Configuration;
@@ -25,10 +27,12 @@ public class RealOpcUaClient : IOpcUaClient
private volatile bool _connectionLostFired;
private OpcUaConnectionOptions _options = new();
private readonly OpcUaGlobalOptions _globalOptions;
private readonly ILogger<RealOpcUaClient> _logger;
// Creates the client. Both arguments are optional so existing callers that pass
// nothing, or only the global options, keep working:
//   globalOptions — falls back to a default-constructed OpcUaGlobalOptions when null.
//   logger        — falls back to the no-op NullLogger when null.
public RealOpcUaClient(OpcUaGlobalOptions? globalOptions = null, ILogger<RealOpcUaClient>? logger = null)
{
    _globalOptions = globalOptions ?? new OpcUaGlobalOptions();
    _logger = logger ?? NullLogger<RealOpcUaClient>.Instance;
}
public bool IsConnected => _session?.Connected ?? false;
@@ -65,7 +69,16 @@ public class RealOpcUaClient : IOpcUaClient
await appConfig.ValidateAsync(ApplicationType.Client);
if (opts.AutoAcceptUntrustedCerts)
{
// DataConnectionLayer-012: this accepts ANY server certificate, defeating
// certificate trust enforcement. Surface a prominent warning so an operator
// who has opted in is aware of the man-in-the-middle exposure on the link.
_logger.LogWarning(
"OPC UA connection to {Endpoint} has AutoAcceptUntrustedCerts enabled — every " +
"server certificate is accepted unconditionally. This defeats Sign / " +
"SignAndEncrypt protection against a man-in-the-middle.", endpointUrl);
appConfig.CertificateValidator.CertificateValidation += (_, e) => e.Accept = true;
}
// Discover endpoints from the server, pick the preferred security mode
EndpointDescription? endpoint;

View File

@@ -13,4 +13,11 @@ public class DataConnectionOptions
/// <summary>Timeout for synchronous write operations to devices. Defaults to 30 seconds.</summary>
public TimeSpan WriteTimeout { get; set; } = TimeSpan.FromSeconds(30);
/// <summary>
/// Minimum time a connection must stay up before it is considered stable.
/// A connection that drops before this threshold has elapsed counts as an
/// unstable disconnect toward the failover retry count (DataConnectionLayer-009).
/// Defaults to 60 seconds.
/// </summary>
public TimeSpan StableConnectionThreshold { get; set; } = TimeSpan.FromSeconds(60);
}