fix(data-connection-layer): resolve DataConnectionLayer-002/003/004/005 — Resume supervision, concurrent dicts, subscribe-failure classification, write timeout

This commit is contained in:
Joseph Doherty
2026-05-16 19:40:40 -04:00
parent d7630d80fe
commit fccd3274d3
7 changed files with 350 additions and 25 deletions

View File

@@ -213,7 +213,13 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
HandleSubscribe(req);
break;
case SubscribeCompleted sc:
HandleSubscribeCompleted(sc);
// In Connected state, a connection-level subscribe failure must drive
// the reconnection state machine (DataConnectionLayer-004).
if (HandleSubscribeCompleted(sc))
{
_log.Warning("[{0}] Connection-level subscribe failure — entering Reconnecting", _connectionName);
BecomeReconnecting();
}
break;
case UnsubscribeTagsRequest req:
HandleUnsubscribe(req);
@@ -514,8 +520,14 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
}
catch (Exception ex)
{
// WP-12: Tag path resolution failure — reported back as unresolved.
results.Add(new SubscribeTagResult(tagPath, AlreadySubscribed: false, Success: false, null, ex.Message));
// DataConnectionLayer-004: distinguish a connection-level fault
// (adapter not connected / transport down) from a genuine
// node-not-found. Connection-level faults must drive the
// reconnection state machine, not be retried as unresolved tags.
var connectionLevel = IsConnectionLevelFailure(ex);
results.Add(new SubscribeTagResult(
tagPath, AlreadySubscribed: false, Success: false, null, ex.Message,
ConnectionLevelFailure: connectionLevel));
}
}
@@ -546,8 +558,10 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
/// Applies the result of an asynchronous subscribe on the actor thread. ALL mutation
/// of subscription state and counters happens here — never on the background task —
/// so the actor model's single-threaded state guarantee holds.
/// Returns <c>true</c> if any tag failed at connection level (DataConnectionLayer-004),
/// signalling the caller (only the Connected state) to enter Reconnecting.
/// </summary>
private void HandleSubscribeCompleted(SubscribeCompleted msg)
private bool HandleSubscribeCompleted(SubscribeCompleted msg)
{
var instanceName = msg.Request.InstanceUniqueName;
if (!_subscriptionsByInstance.TryGetValue(instanceName, out var instanceTags))
@@ -557,6 +571,12 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
_subscriptionsByInstance[instanceName] = instanceTags;
}
// DataConnectionLayer-004: if any tag failed because the adapter is not
// connected (a connection-level fault), the subscribe needs the reconnection
// state machine, not the tag-resolution retry. Drive a disconnect and let the
// request be re-stashed/retried after reconnect via ReSubscribeAll.
var connectionLevelFailure = msg.Results.Any(r => !r.Success && r.ConnectionLevelFailure);
foreach (var result in msg.Results)
{
instanceTags.Add(result.TagPath);
@@ -572,13 +592,31 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
_totalSubscribed++;
_resolvedTags++;
}
else if (result.ConnectionLevelFailure)
{
// Connection-level fault — do not count as an unresolved tag.
// ReSubscribeAll after reconnect derives the tag from
// _subscriptionsByInstance (already updated above).
_log.Warning("[{0}] Subscribe for {1} failed at connection level: {2}",
_connectionName, result.TagPath, result.Error);
}
else
{
// WP-12: mark unresolved so the periodic retry timer picks it up.
// WP-12: genuine tag resolution failure — mark unresolved so the
// periodic retry timer picks it up.
_unresolvedTags.Add(result.TagPath);
_totalSubscribed++;
_log.Debug("[{0}] Tag resolution failed for {1}: {2}",
_connectionName, result.TagPath, result.Error);
// DataConnectionLayer-004 / design doc Tag Path Resolution step 2:
// mark the attribute quality `bad` so the Instance Actor sees a
// signal rather than staying Uncertain indefinitely.
if (_subscribers.TryGetValue(instanceName, out var subscriber))
{
subscriber.Tell(new TagValueUpdate(
_connectionName, result.TagPath, null, QualityCode.Bad, DateTimeOffset.UtcNow));
}
}
}
@@ -594,6 +632,27 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
msg.ReplyTo.Tell(new SubscribeTagsResponse(
msg.Request.CorrelationId, instanceName, true, null, DateTimeOffset.UtcNow));
// The caller (Connected state only) decides whether to enter Reconnecting.
// In Connecting/Reconnecting the connection is not established anyway, so the
// existing reconnect cycle handles recovery without a re-trigger here.
return connectionLevelFailure;
}
/// <summary>
/// DataConnectionLayer-004: classifies a subscribe exception as a connection-level
/// fault (adapter not connected / transport down) versus a genuine tag-resolution
/// failure (the node does not exist on the device). Connection-level faults must
/// drive the reconnection state machine; resolution failures are retried on the
/// tag-resolution timer.
/// </summary>
/// <remarks>
/// Two rules are applied:
/// <list type="bullet">
/// <item><description>
/// The exception itself (after unwrapping single-inner <see cref="AggregateException"/>
/// layers via <see cref="AggregateException.GetBaseException"/>) is an
/// <see cref="InvalidOperationException"/>. This type is matched only at the top
/// level because it is too broad to trust when buried inside another exception.
/// </description></item>
/// <item><description>
/// A transport fault (<see cref="System.Net.Sockets.SocketException"/>,
/// <see cref="TimeoutException"/> or <see cref="System.IO.IOException"/>) appears
/// anywhere in the inner-exception tree, including every branch of a multi-inner
/// <see cref="AggregateException"/>. Client libraries commonly wrap the socket or
/// I/O error in their own exception type, so the outermost type alone is not enough.
/// </description></item>
/// </list>
/// </remarks>
/// <param name="ex">The exception caught while subscribing a tag.</param>
/// <returns>
/// <c>true</c> when the failure is a connection-level fault; <c>false</c> when it
/// should be treated as a tag-resolution failure.
/// </returns>
private static bool IsConnectionLevelFailure(Exception ex)
{
    var baseEx = ex is AggregateException agg ? agg.GetBaseException() : ex;
    if (baseEx is InvalidOperationException)
    {
        return true;
    }

    return ContainsTransportFault(ex);

    // Depth-first search of the inner-exception tree for a transport-level fault.
    static bool ContainsTransportFault(Exception? candidate)
    {
        for (var current = candidate; current is not null; current = current.InnerException)
        {
            if (current is System.Net.Sockets.SocketException
                or TimeoutException
                or System.IO.IOException)
            {
                return true;
            }

            if (current is AggregateException aggregate)
            {
                // InnerException of an AggregateException is just InnerExceptions[0],
                // so visiting every branch here covers the rest of the chain.
                foreach (var inner in aggregate.InnerExceptions)
                {
                    if (ContainsTransportFault(inner))
                    {
                        return true;
                    }
                }

                return false;
            }
        }

        return false;
    }
}
private void HandleUnsubscribe(UnsubscribeTagsRequest request)
@@ -634,15 +693,29 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
_log.Debug("[{0}] Writing to tag {1}", _connectionName, request.TagPath);
var sender = Sender;
// DataConnectionLayer-005: bound the write with WriteTimeout. A hung device
// write (TCP black-hole) would otherwise never complete, so PipeTo never
// fires and the calling script gets no DCL-level error. The CancellationToken
// is passed to the adapter; on timeout we translate cancellation into a
// failed WriteTagResponse so the failure is returned synchronously (WP-11).
var cts = new CancellationTokenSource(_options.WriteTimeout);
// WP-11: Write through DCL to device, failure returned synchronously
_adapter.WriteAsync(request.TagPath, request.Value).ContinueWith(t =>
_adapter.WriteAsync(request.TagPath, request.Value, cts.Token).ContinueWith(t =>
{
cts.Dispose();
if (t.IsCompletedSuccessfully)
{
var result = t.Result;
return new WriteTagResponse(
request.CorrelationId, result.Success, result.ErrorMessage, DateTimeOffset.UtcNow);
}
if (t.IsCanceled || t.Exception?.GetBaseException() is OperationCanceledException)
{
return new WriteTagResponse(
request.CorrelationId, false,
$"Write timeout after {_options.WriteTimeout.TotalSeconds:F0}s", DateTimeOffset.UtcNow);
}
return new WriteTagResponse(
request.CorrelationId, false, t.Exception?.GetBaseException().Message, DateTimeOffset.UtcNow);
}).PipeTo(sender);
@@ -824,7 +897,8 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
internal record TagResolutionSucceeded(string TagPath, string SubscriptionId);
internal record RetryTagResolution;
internal record SubscribeTagResult(
string TagPath, bool AlreadySubscribed, bool Success, string? SubscriptionId, string? Error);
string TagPath, bool AlreadySubscribed, bool Success, string? SubscriptionId, string? Error,
bool ConnectionLevelFailure = false);
internal record SubscribeCompleted(
SubscribeTagsRequest Request, IActorRef ReplyTo, IReadOnlyList<SubscribeTagResult> Results);
public record GetHealthReport;

View File

@@ -125,8 +125,20 @@ public class DataConnectionManagerActor : ReceiveActor
}
/// <summary>
/// OneForOneStrategy with Restart for connection actors — a failed connection
/// should restart and attempt reconnection.
/// OneForOneStrategy with Resume for connection actors.
///
/// DataConnectionLayer-002: a DataConnectionActor is a long-lived, stateful
/// coordinator — its in-memory subscription registry (_subscriptionsByInstance,
/// _subscriptionIds, _subscribers) is the only record of which Instance Actors
/// subscribed to which tags, and there is no durable store to rebuild it from.
/// Restart would create a fresh instance and silently discard that registry,
/// breaking the design doc's "transparent re-subscribe" guarantee (WP-10):
/// subscribers would never be re-subscribed and would sit at stale quality with
/// no error. Resume keeps the actor instance and its state intact, so a transient
/// exception in a message handler does not lose subscription state. The actor's
/// own Become/Stash reconnect state machine already recovers connection-level
/// faults, so it does not need a restart to re-establish the connection.
/// This matches the ScadaLink convention of Resume for coordinator actors.
/// </summary>
protected override SupervisorStrategy SupervisorStrategy()
{
@@ -135,8 +147,8 @@ public class DataConnectionManagerActor : ReceiveActor
withinTimeRange: TimeSpan.FromMinutes(1),
decider: Decider.From(ex =>
{
_log.Warning(ex, "DataConnectionActor threw exception, restarting");
return Directive.Restart;
_log.Warning(ex, "DataConnectionActor threw exception, resuming (subscription state preserved)");
return Directive.Resume;
}));
}
}

View File

@@ -1,3 +1,4 @@
using System.Collections.Concurrent;
using System.Security.Cryptography.X509Certificates;
using Opc.Ua;
using Opc.Ua.Client;
@@ -13,8 +14,14 @@ public class RealOpcUaClient : IOpcUaClient
{
private ISession? _session;
private Subscription? _subscription;
private readonly Dictionary<string, MonitoredItem> _monitoredItems = new();
private readonly Dictionary<string, Action<string, object?, DateTime, uint>> _callbacks = new();
// DataConnectionLayer-003: these maps are read from the OPC Foundation SDK's
// internal publish threads (the MonitoredItem.Notification handler reads
// _callbacks) concurrently with subscribe/disconnect mutations that run on
// thread-pool threads. Plain Dictionary access during a concurrent resize or
// Clear() is undefined behaviour, so they must be ConcurrentDictionary.
private readonly ConcurrentDictionary<string, MonitoredItem> _monitoredItems = new();
private readonly ConcurrentDictionary<string, Action<string, object?, DateTime, uint>> _callbacks = new();
private volatile bool _connectionLostFired;
private OpcUaConnectionOptions _options = new();
private readonly OpcUaGlobalOptions _globalOptions;
@@ -180,8 +187,8 @@ public class RealOpcUaClient : IOpcUaClient
{
_subscription.RemoveItem(item);
await _subscription.ApplyChangesAsync(cancellationToken);
_monitoredItems.Remove(subscriptionHandle);
_callbacks.Remove(subscriptionHandle);
_monitoredItems.TryRemove(subscriptionHandle, out _);
_callbacks.TryRemove(subscriptionHandle, out _);
}
}