fix(data-connection-layer): resolve DataConnectionLayer-014..017 — real logger for OPC UA client, initial-connect failover, accurate subscribe response, per-tag write-batch results

This commit is contained in:
Joseph Doherty
2026-05-17 03:18:24 -04:00
parent 3d3f43229f
commit 14ba5495d1
7 changed files with 408 additions and 66 deletions

View File

@@ -410,8 +410,14 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
}
else
{
_log.Warning("[{0}] Connection failed: {1}. Retrying in {2}s",
_connectionName, result.Error, _options.ReconnectInterval.TotalSeconds);
// DataConnectionLayer-015: the INITIAL connect must participate in the
// failover counter exactly like a reconnect. Without this a primary that is
// unreachable when the actor first starts (fresh deployment, site restart, or
// a primary simply down) is retried forever and the configured backup is
// never tried. Count the failure and switch endpoint once the retry count is
// exhausted, then re-arm the timer.
_consecutiveFailures++;
CountFailureAndMaybeFailover(result.Error);
Timers.StartSingleTimer("reconnect", new AttemptConnect(), _options.ReconnectInterval);
}
}
@@ -439,59 +445,69 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
else
{
_consecutiveFailures++;
// Failover: switch endpoint after exhausting retry count (only if backup is configured)
if (_backupConfig != null && _consecutiveFailures >= _failoverRetryCount)
{
var previousEndpoint = _activeEndpoint;
_activeEndpoint = _activeEndpoint == ActiveEndpoint.Primary
? ActiveEndpoint.Backup
: ActiveEndpoint.Primary;
_consecutiveFailures = 0;
var newConfig = _activeEndpoint == ActiveEndpoint.Primary
? _primaryConfig
: _backupConfig;
// Dispose old adapter (fire-and-forget — don't await in actor context)
_adapter.Disconnected -= OnAdapterDisconnected;
_ = _adapter.DisposeAsync().AsTask();
// Create new adapter for the target endpoint
_adapter = _factory.Create(_protocolType, newConfig);
_connectionDetails = newConfig;
// Wire disconnect handler on new adapter
_adapter.Disconnected += OnAdapterDisconnected;
// DataConnectionLayer-011: new adapter — bump the generation so callbacks
// from the disposed adapter are recognised as stale and dropped.
_adapterGeneration++;
_log.Warning("[{0}] Failing over from {1} to {2}",
_connectionName, previousEndpoint, _activeEndpoint);
// Log failover event to site event log
if (_siteEventLogger != null)
{
_ = _siteEventLogger.LogEventAsync(
"connection", "Warning", null, _connectionName,
$"Failover from {previousEndpoint} to {_activeEndpoint}",
$"After {_failoverRetryCount} consecutive failures");
}
}
else
{
var retryLimit = _backupConfig != null ? _failoverRetryCount.ToString() : "∞";
_log.Warning("[{0}] Reconnect failed: {1}. Retrying in {2}s (attempt {3}/{4})",
_connectionName, result.Error, _options.ReconnectInterval.TotalSeconds,
_consecutiveFailures, retryLimit);
}
CountFailureAndMaybeFailover(result.Error);
Timers.StartSingleTimer("reconnect", new AttemptConnect(), _options.ReconnectInterval);
}
}
/// <summary>
/// Shared connect-failure handling for both the initial connect (Connecting state)
/// and reconnect (Reconnecting state) — DataConnectionLayer-015 brought the initial
/// connect onto this path.
/// </summary>
/// <remarks>
/// Despite the name, this method does NOT increment <see cref="_consecutiveFailures"/>;
/// the caller must have done so for the current failure before calling. It also does
/// not re-arm the reconnect timer — that is likewise the caller's job.
/// When a backup is configured and the failure count has reached the failover retry
/// count, the active endpoint is toggled (Primary ↔ Backup), the adapter is replaced
/// and the counter is reset; otherwise the failure is only logged.
/// </remarks>
/// <param name="error">
/// Error text of the failed connect attempt. Logged on the plain-retry path only.
/// </param>
private void CountFailureAndMaybeFailover(string? error)
{
    // Failover: switch endpoint after exhausting retry count (only if backup is configured)
    if (_backupConfig != null && _consecutiveFailures >= _failoverRetryCount)
    {
        var previousEndpoint = _activeEndpoint;
        var targetEndpoint = previousEndpoint == ActiveEndpoint.Primary
            ? ActiveEndpoint.Backup
            : ActiveEndpoint.Primary;
        var newConfig = targetEndpoint == ActiveEndpoint.Primary
            ? _primaryConfig
            : _backupConfig;

        // Build the replacement adapter BEFORE touching any actor state. If the
        // factory throws, the current adapter stays wired and undisposed and the
        // endpoint/counter are unchanged, instead of leaving the actor holding a
        // disposed adapter on a flipped endpoint.
        var replacementAdapter = _factory.Create(_protocolType, newConfig);

        // Retire the old adapter (fire-and-forget — don't await in actor context)
        var retiredAdapter = _adapter;
        retiredAdapter.Disconnected -= OnAdapterDisconnected;
        _ = retiredAdapter.DisposeAsync().AsTask();

        // Commit the switch to the target endpoint.
        _activeEndpoint = targetEndpoint;
        _consecutiveFailures = 0;
        _adapter = replacementAdapter;
        _connectionDetails = newConfig;

        // Wire disconnect handler on new adapter
        _adapter.Disconnected += OnAdapterDisconnected;

        // DataConnectionLayer-011: new adapter — bump the generation so callbacks
        // from the disposed adapter are recognised as stale and dropped.
        _adapterGeneration++;

        _log.Warning("[{0}] Failing over from {1} to {2}",
            _connectionName, previousEndpoint, _activeEndpoint);

        // Log failover event to site event log (fire-and-forget, optional dependency)
        if (_siteEventLogger != null)
        {
            _ = _siteEventLogger.LogEventAsync(
                "connection", "Warning", null, _connectionName,
                $"Failover from {previousEndpoint} to {_activeEndpoint}",
                $"After {_failoverRetryCount} consecutive failures");
        }
    }
    else
    {
        // Without a backup there is no failover threshold, so the limit is shown as "∞".
        var retryLimit = _backupConfig != null ? _failoverRetryCount.ToString() : "∞";
        _log.Warning("[{0}] Connect failed: {1}. Retrying in {2}s (attempt {3}/{4})",
            _connectionName, error, _options.ReconnectInterval.TotalSeconds,
            _consecutiveFailures, retryLimit);
    }
}
private void HandleDisconnect()
{
_log.Warning("[{0}] AdapterDisconnected message received — transitioning to Reconnecting", _connectionName);
@@ -663,8 +679,18 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
_options.TagResolutionRetryInterval);
}
msg.ReplyTo.Tell(new SubscribeTagsResponse(
msg.Request.CorrelationId, instanceName, true, null, DateTimeOffset.UtcNow));
// DataConnectionLayer-016: the response must match the actor's own assessment.
// When a connection-level failure is driving the actor into Reconnecting, the
// tags were never subscribed at the adapter — replying Success: true would tell
// the Instance Actor the subscribe succeeded when it did not. Genuine
// tag-resolution failures stay Success: true (they are a runtime quality concern
// tracked via _unresolvedTags, with a Bad-quality TagValueUpdate already pushed).
msg.ReplyTo.Tell(connectionLevelFailure
? new SubscribeTagsResponse(
msg.Request.CorrelationId, instanceName, false,
"connection unavailable — will re-subscribe on reconnect", DateTimeOffset.UtcNow)
: new SubscribeTagsResponse(
msg.Request.CorrelationId, instanceName, true, null, DateTimeOffset.UtcNow));
// The caller (Connected state only) decides whether to enter Reconnecting.
// In Connecting/Reconnecting the connection is not established anyway, so the

View File

@@ -228,10 +228,28 @@ public class OpcUaDataConnection : IDataConnection
/// <summary>
/// Writes every tag in <paramref name="values"/> sequentially via <c>WriteAsync</c>
/// and returns one <see cref="WriteResult"/> per tag path.
/// </summary>
/// <param name="values">Tag path → value pairs to write.</param>
/// <param name="cancellationToken">Token passed through to each individual write.</param>
/// <returns>
/// A result map with an entry for every tag that was attempted; a tag whose write
/// threw is recorded as a failed <see cref="WriteResult"/> carrying the exception message.
/// </returns>
/// <exception cref="OperationCanceledException">
/// Propagated unchanged so a cancelled batch aborts as a whole.
/// </exception>
public async Task<IReadOnlyDictionary<string, WriteResult>> WriteBatchAsync(IDictionary<string, object?> values, CancellationToken cancellationToken = default)
{
    // DataConnectionLayer-017: a mid-batch fault must not abort the whole batch.
    // WriteAsync calls EnsureConnected(), which throws InvalidOperationException when
    // the connection drops partway through; catch per-tag exceptions and record a
    // failed WriteResult so the caller (including WriteBatchAndWaitAsync) receives a
    // complete result map. OperationCanceledException is still propagated so a
    // cancelled batch aborts as a whole — mirrors the DCL-007 fix for ReadBatchAsync.
    var results = new Dictionary<string, WriteResult>(values.Count);

    foreach (var (tagPath, value) in values)
    {
        // The write must happen ONLY inside the try: an unguarded write ahead of it
        // would both defeat the per-tag isolation and issue every write twice.
        try
        {
            results[tagPath] = await WriteAsync(tagPath, value, cancellationToken);
        }
        catch (OperationCanceledException)
        {
            // Cancellation aborts the whole batch — propagate it.
            throw;
        }
        catch (Exception ex)
        {
            results[tagPath] = new WriteResult(false, ex.Message);
        }
    }

    return results;
}

View File

@@ -316,11 +316,24 @@ public class RealOpcUaClientFactory : IOpcUaClientFactory
{
private readonly OpcUaGlobalOptions _globalOptions;
// DataConnectionLayer-014: a real logger must be threaded through to every
// RealOpcUaClient this factory builds, otherwise the DCL-012 auto-accept-certificate
// warning emitted in RealOpcUaClient.ConnectAsync sinks into NullLogger and is never
// seen in production. The factory is constructed by DataConnectionFactory, which has
// an ILoggerFactory available.
private readonly ILoggerFactory _loggerFactory;
/// <summary>
/// Creates a factory with a default-constructed <see cref="OpcUaGlobalOptions"/>
/// and <see cref="NullLoggerFactory.Instance"/> as the logger factory.
/// </summary>
public RealOpcUaClientFactory()
    : this(new OpcUaGlobalOptions())
{
}

/// <summary>
/// Creates a factory with the given options and
/// <see cref="NullLoggerFactory.Instance"/> as the logger factory.
/// </summary>
/// <param name="globalOptions">Options handed to every client this factory builds.</param>
public RealOpcUaClientFactory(OpcUaGlobalOptions globalOptions)
    : this(globalOptions, NullLoggerFactory.Instance)
{
}

/// <summary>
/// Creates a factory with the given options and logger factory
/// (DataConnectionLayer-014: lets built clients log through a real logger).
/// </summary>
/// <param name="globalOptions">Options handed to every client this factory builds.</param>
/// <param name="loggerFactory">Source of the logger given to each built client.</param>
public RealOpcUaClientFactory(OpcUaGlobalOptions globalOptions, ILoggerFactory loggerFactory)
    => (_globalOptions, _loggerFactory) = (globalOptions, loggerFactory);
/// <summary>
/// Builds a new <see cref="RealOpcUaClient"/> from this factory's global options,
/// giving it a logger created from the factory's <see cref="ILoggerFactory"/>.
/// </summary>
/// <remarks>
/// This is the only <c>Create()</c> overload: a second, logger-less definition with
/// the same signature would be a duplicate member and would bypass the
/// DataConnectionLayer-014 logger wiring.
/// </remarks>
/// <returns>A newly constructed client.</returns>
public IOpcUaClient Create() =>
    new RealOpcUaClient(_globalOptions, _loggerFactory.CreateLogger<RealOpcUaClient>());
}

View File

@@ -22,9 +22,12 @@ public class DataConnectionFactory : IDataConnectionFactory
_loggerFactory = loggerFactory;
var globalOptions = opcUaGlobalOptions.Value;
// Register built-in protocols
// Register built-in protocols.
// DataConnectionLayer-014: pass the ILoggerFactory into RealOpcUaClientFactory so
// the RealOpcUaClient it builds gets a real logger — without it the DCL-012
// auto-accept-certificate security warning is silently discarded by NullLogger.
RegisterAdapter("OpcUa", details => new OpcUaDataConnection(
new RealOpcUaClientFactory(globalOptions),
new RealOpcUaClientFactory(globalOptions, _loggerFactory),
_loggerFactory.CreateLogger<OpcUaDataConnection>()));
}