fix(data-connection): resolve DataConnectionLayer-001 — off-thread actor state mutation
HandleSubscribe spawned a Task.Run that mutated DataConnectionActor private state (_subscriptionIds, _subscriptionsByInstance, _totalSubscribed, _resolvedTags, _unresolvedTags) from a thread-pool thread, racing the actor's own message loop — a data race on non-thread-safe Dictionary/HashSet and non-atomic counters. Restructured HandleSubscribe to follow the actor's existing PipeTo(Self) pattern: the background task now performs only adapter I/O and pipes a SubscribeCompleted message to Self; all subscription-state mutation happens in the new HandleSubscribeCompleted handler on the actor thread (wired into the Connected, Connecting and Reconnecting states). Adds DCL001_ConcurrentSubscribes_DoNotCorruptSubscriptionCounters (30x30 concurrent subscribes) which fails against the pre-fix code and passes after.
This commit is contained in:
@@ -171,6 +171,11 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
|
||||
case UnsubscribeTagsRequest:
|
||||
Stash.Stash();
|
||||
break;
|
||||
case SubscribeCompleted sc:
|
||||
// A subscribe started while Connected can complete after a transition;
|
||||
// apply it so its state survives into the next ReSubscribeAll.
|
||||
HandleSubscribeCompleted(sc);
|
||||
break;
|
||||
case GetHealthReport:
|
||||
ReplyWithHealthReport();
|
||||
break;
|
||||
@@ -207,6 +212,9 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
|
||||
case SubscribeTagsRequest req:
|
||||
HandleSubscribe(req);
|
||||
break;
|
||||
case SubscribeCompleted sc:
|
||||
HandleSubscribeCompleted(sc);
|
||||
break;
|
||||
case UnsubscribeTagsRequest req:
|
||||
HandleUnsubscribe(req);
|
||||
break;
|
||||
@@ -338,6 +346,11 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
|
||||
case TagResolutionFailed:
|
||||
// Ignore — stale results from previous connection; ReSubscribeAll runs after reconnect
|
||||
break;
|
||||
case SubscribeCompleted sc:
|
||||
// A subscribe started while Connected can complete after a transition;
|
||||
// apply it so its state survives into the next ReSubscribeAll.
|
||||
HandleSubscribeCompleted(sc);
|
||||
break;
|
||||
case GetHealthReport:
|
||||
ReplyWithHealthReport();
|
||||
break;
|
||||
@@ -466,18 +479,27 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
|
||||
if (!_subscriptionsByInstance.ContainsKey(request.InstanceUniqueName))
|
||||
_subscriptionsByInstance[request.InstanceUniqueName] = new HashSet<string>();
|
||||
|
||||
var instanceTags = _subscriptionsByInstance[request.InstanceUniqueName];
|
||||
var self = Self;
|
||||
var sender = Sender;
|
||||
|
||||
// Snapshot the already-subscribed tag set on the actor thread. The background
|
||||
// task below must NOT read or mutate actor state — it performs only adapter
|
||||
// I/O and reports results back via a SubscribeCompleted message, which is
|
||||
// applied to actor state on the actor thread (see HandleSubscribeCompleted).
|
||||
var alreadySubscribed = new HashSet<string>(_subscriptionIds.Keys);
|
||||
|
||||
Task.Run(async () =>
|
||||
{
|
||||
var results = new List<SubscribeTagResult>(request.TagPaths.Count);
|
||||
var tagsToSeed = new List<string>();
|
||||
|
||||
foreach (var tagPath in request.TagPaths)
|
||||
{
|
||||
if (_subscriptionIds.ContainsKey(tagPath))
|
||||
if (alreadySubscribed.Contains(tagPath))
|
||||
{
|
||||
// Already subscribed — just track for this instance
|
||||
instanceTags.Add(tagPath);
|
||||
// Already subscribed by another instance — just track for this one.
|
||||
results.Add(new SubscribeTagResult(tagPath, AlreadySubscribed: true, Success: true, null, null));
|
||||
tagsToSeed.Add(tagPath);
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -487,27 +509,21 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
|
||||
{
|
||||
self.Tell(new TagValueReceived(path, value));
|
||||
});
|
||||
_subscriptionIds[tagPath] = subId;
|
||||
instanceTags.Add(tagPath);
|
||||
_totalSubscribed++;
|
||||
_resolvedTags++;
|
||||
results.Add(new SubscribeTagResult(tagPath, AlreadySubscribed: false, Success: true, subId, null));
|
||||
tagsToSeed.Add(tagPath);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// WP-12: Tag path resolution failure — mark as unresolved, retry later
|
||||
_unresolvedTags.Add(tagPath);
|
||||
instanceTags.Add(tagPath);
|
||||
_totalSubscribed++;
|
||||
|
||||
self.Tell(new TagResolutionFailed(tagPath, ex.Message));
|
||||
// WP-12: Tag path resolution failure — reported back as unresolved.
|
||||
results.Add(new SubscribeTagResult(tagPath, AlreadySubscribed: false, Success: false, null, ex.Message));
|
||||
}
|
||||
}
|
||||
|
||||
// Initial read — seed current values for all resolved tags so the Instance Actor
|
||||
// doesn't stay Uncertain until the next OPC UA data change notification
|
||||
foreach (var tagPath in instanceTags)
|
||||
// Initial read — seed current values for resolved tags so the Instance Actor
|
||||
// doesn't stay Uncertain until the next OPC UA data change notification.
|
||||
// Tell is thread-safe, so seeded values are delivered directly as messages.
|
||||
foreach (var tagPath in tagsToSeed)
|
||||
{
|
||||
if (_unresolvedTags.Contains(tagPath)) continue;
|
||||
try
|
||||
{
|
||||
var readResult = await _adapter.ReadAsync(tagPath);
|
||||
@@ -522,11 +538,51 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
|
||||
}
|
||||
}
|
||||
|
||||
return new SubscribeTagsResponse(
|
||||
request.CorrelationId, request.InstanceUniqueName, true, null, DateTimeOffset.UtcNow);
|
||||
}).PipeTo(sender);
|
||||
return new SubscribeCompleted(request, sender, results);
|
||||
}).PipeTo(self);
|
||||
}
|
||||
|
||||
// Start tag resolution retry timer if we have unresolved tags
|
||||
/// <summary>
|
||||
/// Applies the result of an asynchronous subscribe on the actor thread. ALL mutation
|
||||
/// of subscription state and counters happens here — never on the background task —
|
||||
/// so the actor model's single-threaded state guarantee holds.
|
||||
/// </summary>
|
||||
private void HandleSubscribeCompleted(SubscribeCompleted msg)
|
||||
{
|
||||
var instanceName = msg.Request.InstanceUniqueName;
|
||||
if (!_subscriptionsByInstance.TryGetValue(instanceName, out var instanceTags))
|
||||
{
|
||||
// The instance was unsubscribed while the subscribe I/O was in flight.
|
||||
instanceTags = new HashSet<string>();
|
||||
_subscriptionsByInstance[instanceName] = instanceTags;
|
||||
}
|
||||
|
||||
foreach (var result in msg.Results)
|
||||
{
|
||||
instanceTags.Add(result.TagPath);
|
||||
|
||||
// Re-check against current state: another subscribe may have resolved the
|
||||
// same tag while this request's I/O was in flight.
|
||||
if (result.AlreadySubscribed || _subscriptionIds.ContainsKey(result.TagPath))
|
||||
continue;
|
||||
|
||||
if (result.Success)
|
||||
{
|
||||
_subscriptionIds[result.TagPath] = result.SubscriptionId!;
|
||||
_totalSubscribed++;
|
||||
_resolvedTags++;
|
||||
}
|
||||
else
|
||||
{
|
||||
// WP-12: mark unresolved so the periodic retry timer picks it up.
|
||||
_unresolvedTags.Add(result.TagPath);
|
||||
_totalSubscribed++;
|
||||
_log.Debug("[{0}] Tag resolution failed for {1}: {2}",
|
||||
_connectionName, result.TagPath, result.Error);
|
||||
}
|
||||
}
|
||||
|
||||
// Start the tag-resolution retry timer if any tags are unresolved.
|
||||
if (_unresolvedTags.Count > 0)
|
||||
{
|
||||
Timers.StartPeriodicTimer(
|
||||
@@ -535,6 +591,9 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
|
||||
_options.TagResolutionRetryInterval,
|
||||
_options.TagResolutionRetryInterval);
|
||||
}
|
||||
|
||||
msg.ReplyTo.Tell(new SubscribeTagsResponse(
|
||||
msg.Request.CorrelationId, instanceName, true, null, DateTimeOffset.UtcNow));
|
||||
}
|
||||
|
||||
private void HandleUnsubscribe(UnsubscribeTagsRequest request)
|
||||
@@ -764,5 +823,9 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
|
||||
internal record TagResolutionFailed(string TagPath, string Error);
|
||||
internal record TagResolutionSucceeded(string TagPath, string SubscriptionId);
|
||||
internal record RetryTagResolution;
|
||||
internal record SubscribeTagResult(
|
||||
string TagPath, bool AlreadySubscribed, bool Success, string? SubscriptionId, string? Error);
|
||||
internal record SubscribeCompleted(
|
||||
SubscribeTagsRequest Request, IActorRef ReplyTo, IReadOnlyList<SubscribeTagResult> Results);
|
||||
public record GetHealthReport;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user