diff --git a/code-reviews/DataConnectionLayer/findings.md b/code-reviews/DataConnectionLayer/findings.md index 95aeba2..62e163d 100644 --- a/code-reviews/DataConnectionLayer/findings.md +++ b/code-reviews/DataConnectionLayer/findings.md @@ -8,7 +8,7 @@ | Last reviewed | 2026-05-16 | | Reviewer | claude-agent | | Commit reviewed | `9c60592` | -| Open findings | 13 | +| Open findings | 12 | ## Summary @@ -53,7 +53,7 @@ tag-resolution retry, disconnect/re-subscribe, and concurrency around `HandleSub |--|--| | Severity | Critical | | Category | Concurrency & thread safety | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs:473-538` | **Description** @@ -82,7 +82,18 @@ handler too. **Resolution** -_Unresolved._ +Resolved 2026-05-16. `HandleSubscribe` was restructured to follow the actor's own +`PipeTo(Self)` pattern (the one already used by `HandleRetryTagResolution`): the +background `Task.Run` now performs only adapter I/O (`SubscribeAsync`/`ReadAsync`), +collects per-tag outcomes into an immutable `SubscribeCompleted` message, and pipes +that to `Self`. All mutation of `_subscriptionIds`, `_subscriptionsByInstance`, +`_totalSubscribed`, `_resolvedTags` and `_unresolvedTags` now happens in the new +`HandleSubscribeCompleted` handler on the actor thread; it is wired into the +Connected, Connecting and Reconnecting states so an in-flight subscribe is applied +regardless of state transitions. Regression test +`DCL001_ConcurrentSubscribes_DoNotCorruptSubscriptionCounters` (30×30 concurrent +subscribes) fails against the pre-fix code and passes after. Fixed by the commit +whose message references `DataConnectionLayer-001`. ### DataConnectionLayer-002 — `Restart` supervision discards all subscription state on connection-actor crash diff --git a/code-reviews/README.md b/code-reviews/README.md index cb5e42a..9fcf56d 100644 --- a/code-reviews/README.md +++ b/code-reviews/README.md @@ -28,53 +28,55 @@ code-reviews/ ## Baseline review — 2026-05-16 -All 19 modules were reviewed at commit `9c60592`. This established the baseline below. +All 19 modules were reviewed at commit `9c60592` (241 findings: 6 Critical, 46 High, +100 Medium, 89 Low). The tables below track what remains **open** as findings are +resolved. | Severity | Open findings | |----------|---------------| -| Critical | 6 | +| Critical | 5 | | High | 46 | | Medium | 100 | | Low | 89 | -| **Total** | **241** | +| **Total** | **240** | ## Module Status -| Module | Review status | Last reviewed | Commit | Open (C/H/M/L) | Total | -|--------|---------------|---------------|--------|----------------|-------| -| [CentralUI](CentralUI/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 1/3/10/5 | 19 | -| [CLI](CLI/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/1/6/6 | 13 | -| [ClusterInfrastructure](ClusterInfrastructure/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/1/4/3 | 8 | -| [Commons](Commons/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/0/4/8 | 12 | -| [Communication](Communication/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 1/2/5/3 | 11 | -| [ConfigurationDatabase](ConfigurationDatabase/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/1/4/6 | 11 | -| [DataConnectionLayer](DataConnectionLayer/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 1/4/6/2 | 13 | -| [DeploymentManager](DeploymentManager/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/3/6/5 | 14 | -| [ExternalSystemGateway](ExternalSystemGateway/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 1/2/7/4 | 14 | -| [HealthMonitoring](HealthMonitoring/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/2/5/5 | 12 | -| [Host](Host/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/1/3/7 | 11 | -| [InboundAPI](InboundAPI/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/3/5/5 | 13 | -| [ManagementService](ManagementService/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/3/5/5 | 13 | -| [NotificationService](NotificationService/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 1/3/5/3 | 12 | -| [Security](Security/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/3/4/4 | 11 | -| [SiteEventLogging](SiteEventLogging/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/4/4/3 | 11 | -| [SiteRuntime](SiteRuntime/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/3/8/5 | 16 | -| [StoreAndForward](StoreAndForward/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 1/2/4/6 | 13 | -| [TemplateEngine](TemplateEngine/findings.md) | Reviewed | 2026-05-16 | `9c60592` | 0/5/5/4 | 14 | +| Module | Last reviewed | Commit | Open (C/H/M/L) | Open | Total | +|--------|---------------|--------|----------------|------|-------| +| [CentralUI](CentralUI/findings.md) | 2026-05-16 | `9c60592` | 1/3/10/5 | 19 | 19 | +| [CLI](CLI/findings.md) | 2026-05-16 | `9c60592` | 0/1/6/6 | 13 | 13 | +| [ClusterInfrastructure](ClusterInfrastructure/findings.md) | 2026-05-16 | `9c60592` | 0/1/4/3 | 8 | 8 | +| [Commons](Commons/findings.md) | 2026-05-16 | `9c60592` | 0/0/4/8 | 12 | 12 | +| [Communication](Communication/findings.md) | 2026-05-16 | `9c60592` | 1/2/5/3 | 11 | 11 | +| [ConfigurationDatabase](ConfigurationDatabase/findings.md) | 2026-05-16 | `9c60592` | 0/1/4/6 | 11 | 11 | +| [DataConnectionLayer](DataConnectionLayer/findings.md) | 2026-05-16 | `9c60592` | 0/4/6/2 | 12 | 13 | +| [DeploymentManager](DeploymentManager/findings.md) | 2026-05-16 | `9c60592` | 0/3/6/5 | 14 | 14 | +| [ExternalSystemGateway](ExternalSystemGateway/findings.md) | 2026-05-16 | `9c60592` | 1/2/7/4 | 14 | 14 | +| [HealthMonitoring](HealthMonitoring/findings.md) | 2026-05-16 | `9c60592` | 0/2/5/5 | 12 | 12 | +| [Host](Host/findings.md) | 2026-05-16 | `9c60592` | 0/1/3/7 | 11 | 11 | +| [InboundAPI](InboundAPI/findings.md) | 2026-05-16 | `9c60592` | 0/3/5/5 | 13 | 13 | +| [ManagementService](ManagementService/findings.md) | 2026-05-16 | `9c60592` | 0/3/5/5 | 13 | 13 | +| [NotificationService](NotificationService/findings.md) | 2026-05-16 | `9c60592` | 1/3/5/3 | 12 | 12 | +| [Security](Security/findings.md) | 2026-05-16 | `9c60592` | 0/3/4/4 | 11 | 11 | +| [SiteEventLogging](SiteEventLogging/findings.md) | 2026-05-16 | `9c60592` | 0/4/4/3 | 11 | 11 | +| [SiteRuntime](SiteRuntime/findings.md) | 2026-05-16 | `9c60592` | 0/3/8/5 | 16 | 16 | +| [StoreAndForward](StoreAndForward/findings.md) | 2026-05-16 | `9c60592` | 1/2/4/6 | 13 | 13 | +| [TemplateEngine](TemplateEngine/findings.md) | 2026-05-16 | `9c60592` | 0/5/5/4 | 14 | 14 | ## Pending Findings -All findings are currently `Open`. As findings are resolved, remove them from the -tables below (see [REVIEW-PROCESS.md](REVIEW-PROCESS.md) §5). Full detail for each -finding — description, location, recommendation — lives in the module's `findings.md`. +Every `Open` / `In Progress` finding across all modules, highest severity first. +Resolved findings drop off this list but remain recorded in their module's +`findings.md` (see [REVIEW-PROCESS.md](REVIEW-PROCESS.md) §4–§5). Full detail — +description, location, recommendation — lives in the module's `findings.md`. -### Critical (6) +### Critical (5) | ID | Module | Title | |----|--------|-------| | CentralUI-001 | [CentralUI](CentralUI/findings.md) | Test Run sandbox executes arbitrary C# with no trust-model enforcement | | Communication-001 | [Communication](Communication/findings.md) | Snapshot timeout leaves orphaned bridge actor and site subscription | -| DataConnectionLayer-001 | [DataConnectionLayer](DataConnectionLayer/findings.md) | `Task.Run` in `HandleSubscribe` mutates actor state off the actor thread | | ExternalSystemGateway-001 | [ExternalSystemGateway](ExternalSystemGateway/findings.md) | No S&F delivery handler registered; cached calls and writes can never be delivered | | NotificationService-001 | [NotificationService](NotificationService/findings.md) | Buffered notifications are never retried (no S&F delivery handler) | | StoreAndForward-001 | [StoreAndForward](StoreAndForward/findings.md) | Replication to standby is never triggered by the active node | diff --git a/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs b/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs index 50a4904..810dcb5 100644 --- a/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs +++ b/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs @@ -171,6 +171,11 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers case UnsubscribeTagsRequest: Stash.Stash(); break; + case SubscribeCompleted sc: + // A subscribe started while Connected can complete after a transition; + // apply it so its state survives into the next ReSubscribeAll. + HandleSubscribeCompleted(sc); + break; case GetHealthReport: ReplyWithHealthReport(); break; @@ -207,6 +212,9 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers case SubscribeTagsRequest req: HandleSubscribe(req); break; + case SubscribeCompleted sc: + HandleSubscribeCompleted(sc); + break; case UnsubscribeTagsRequest req: HandleUnsubscribe(req); break; @@ -338,6 +346,11 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers case TagResolutionFailed: // Ignore — stale results from previous connection; ReSubscribeAll runs after reconnect break; + case SubscribeCompleted sc: + // A subscribe started while Connected can complete after a transition; + // apply it so its state survives into the next ReSubscribeAll. + HandleSubscribeCompleted(sc); + break; case GetHealthReport: ReplyWithHealthReport(); break; @@ -466,18 +479,27 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers if (!_subscriptionsByInstance.ContainsKey(request.InstanceUniqueName)) _subscriptionsByInstance[request.InstanceUniqueName] = new HashSet(); - var instanceTags = _subscriptionsByInstance[request.InstanceUniqueName]; var self = Self; var sender = Sender; + // Snapshot the already-subscribed tag set on the actor thread. The background + // task below must NOT read or mutate actor state — it performs only adapter + // I/O and reports results back via a SubscribeCompleted message, which is + // applied to actor state on the actor thread (see HandleSubscribeCompleted). + var alreadySubscribed = new HashSet(_subscriptionIds.Keys); + Task.Run(async () => { + var results = new List(request.TagPaths.Count); + var tagsToSeed = new List(); + foreach (var tagPath in request.TagPaths) { - if (_subscriptionIds.ContainsKey(tagPath)) + if (alreadySubscribed.Contains(tagPath)) { - // Already subscribed — just track for this instance - instanceTags.Add(tagPath); + // Already subscribed by another instance — just track for this one. + results.Add(new SubscribeTagResult(tagPath, AlreadySubscribed: true, Success: true, null, null)); + tagsToSeed.Add(tagPath); continue; } @@ -487,27 +509,21 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers { self.Tell(new TagValueReceived(path, value)); }); - _subscriptionIds[tagPath] = subId; - instanceTags.Add(tagPath); - _totalSubscribed++; - _resolvedTags++; + results.Add(new SubscribeTagResult(tagPath, AlreadySubscribed: false, Success: true, subId, null)); + tagsToSeed.Add(tagPath); } catch (Exception ex) { - // WP-12: Tag path resolution failure — mark as unresolved, retry later - _unresolvedTags.Add(tagPath); - instanceTags.Add(tagPath); - _totalSubscribed++; - - self.Tell(new TagResolutionFailed(tagPath, ex.Message)); + // WP-12: Tag path resolution failure — reported back as unresolved. + results.Add(new SubscribeTagResult(tagPath, AlreadySubscribed: false, Success: false, null, ex.Message)); } } - // Initial read — seed current values for all resolved tags so the Instance Actor - // doesn't stay Uncertain until the next OPC UA data change notification - foreach (var tagPath in instanceTags) + // Initial read — seed current values for resolved tags so the Instance Actor + // doesn't stay Uncertain until the next OPC UA data change notification. + // Tell is thread-safe, so seeded values are delivered directly as messages. + foreach (var tagPath in tagsToSeed) { - if (_unresolvedTags.Contains(tagPath)) continue; try { var readResult = await _adapter.ReadAsync(tagPath); @@ -522,11 +538,51 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers } } - return new SubscribeTagsResponse( - request.CorrelationId, request.InstanceUniqueName, true, null, DateTimeOffset.UtcNow); - }).PipeTo(sender); + return new SubscribeCompleted(request, sender, results); + }).PipeTo(self); + } - // Start tag resolution retry timer if we have unresolved tags + /// + /// Applies the result of an asynchronous subscribe on the actor thread. ALL mutation + /// of subscription state and counters happens here — never on the background task — + /// so the actor model's single-threaded state guarantee holds. + /// + private void HandleSubscribeCompleted(SubscribeCompleted msg) + { + var instanceName = msg.Request.InstanceUniqueName; + if (!_subscriptionsByInstance.TryGetValue(instanceName, out var instanceTags)) + { + // The instance was unsubscribed while the subscribe I/O was in flight. + instanceTags = new HashSet(); + _subscriptionsByInstance[instanceName] = instanceTags; + } + + foreach (var result in msg.Results) + { + instanceTags.Add(result.TagPath); + + // Re-check against current state: another subscribe may have resolved the + // same tag while this request's I/O was in flight. + if (result.AlreadySubscribed || _subscriptionIds.ContainsKey(result.TagPath)) + continue; + + if (result.Success) + { + _subscriptionIds[result.TagPath] = result.SubscriptionId!; + _totalSubscribed++; + _resolvedTags++; + } + else + { + // WP-12: mark unresolved so the periodic retry timer picks it up. + _unresolvedTags.Add(result.TagPath); + _totalSubscribed++; + _log.Debug("[{0}] Tag resolution failed for {1}: {2}", + _connectionName, result.TagPath, result.Error); + } + } + + // Start the tag-resolution retry timer if any tags are unresolved. if (_unresolvedTags.Count > 0) { Timers.StartPeriodicTimer( @@ -535,6 +591,9 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers _options.TagResolutionRetryInterval, _options.TagResolutionRetryInterval); } + + msg.ReplyTo.Tell(new SubscribeTagsResponse( + msg.Request.CorrelationId, instanceName, true, null, DateTimeOffset.UtcNow)); } private void HandleUnsubscribe(UnsubscribeTagsRequest request) @@ -764,5 +823,9 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers internal record TagResolutionFailed(string TagPath, string Error); internal record TagResolutionSucceeded(string TagPath, string SubscriptionId); internal record RetryTagResolution; + internal record SubscribeTagResult( + string TagPath, bool AlreadySubscribed, bool Success, string? SubscriptionId, string? Error); + internal record SubscribeCompleted( + SubscribeTagsRequest Request, IActorRef ReplyTo, IReadOnlyList Results); public record GetHealthReport; } diff --git a/tests/ScadaLink.DataConnectionLayer.Tests/DataConnectionActorTests.cs b/tests/ScadaLink.DataConnectionLayer.Tests/DataConnectionActorTests.cs index 8fce188..1d05a69 100644 --- a/tests/ScadaLink.DataConnectionLayer.Tests/DataConnectionActorTests.cs +++ b/tests/ScadaLink.DataConnectionLayer.Tests/DataConnectionActorTests.cs @@ -458,4 +458,87 @@ public class DataConnectionActorTests : TestKit await backupAdapter.Received().SubscribeAsync( "sensor/temp", Arg.Any(), Arg.Any()); } + + // ── DataConnectionLayer-001: subscribe must not mutate actor state off-thread ── + + private static async Task DelayedSubscribeAsync() + { + // A short delay so concurrent subscribe background tasks pile up and their + // post-await state mutations would race under the pre-fix implementation. + await Task.Delay(1); + return "sub-" + Guid.NewGuid().ToString("N"); + } + + [Fact] + public async Task DCL001_ConcurrentSubscribes_DoNotCorruptSubscriptionCounters() + { + // Regression test for DataConnectionLayer-001. HandleSubscribe used to mutate + // actor state (_subscriptionIds, _totalSubscribed, _resolvedTags, the per-instance + // HashSet) from a Task.Run background thread. Many concurrent subscribes then race + // on non-thread-safe Dictionary/HashSet and on non-atomic int++ — losing increments + // or throwing. After the fix every mutation is applied on the actor thread via a + // SubscribeCompleted message, so the final counts are exact. + _mockAdapter.ConnectAsync(Arg.Any>(), Arg.Any()) + .Returns(Task.CompletedTask); + _mockAdapter.Status.Returns(ConnectionHealth.Connected); + _mockAdapter.SubscribeAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(_ => DelayedSubscribeAsync()); + _mockAdapter.ReadAsync(Arg.Any(), Arg.Any()) + .Returns(new ReadResult(false, null, null)); + + var actor = CreateConnectionActor("dcl001-concurrent"); + await Task.Delay(300); // reach Connected state + + const int instances = 30; + const int tagsPerInstance = 30; + for (var i = 0; i < instances; i++) + { + var tags = Enumerable.Range(0, tagsPerInstance) + .Select(j => $"inst{i}/tag{j}") + .ToArray(); + actor.Tell(new SubscribeTagsRequest( + $"corr{i}", $"inst{i}", "dcl001-concurrent", tags, DateTimeOffset.UtcNow)); + } + + // Every subscribe must be acknowledged. + for (var i = 0; i < instances; i++) + ExpectMsg(TimeSpan.FromSeconds(15)); + + actor.Tell(new DataConnectionActor.GetHealthReport()); + var report = ExpectMsg(TimeSpan.FromSeconds(5)); + + // Every tag is distinct, so each is a fresh, resolved subscription. + Assert.Equal(instances * tagsPerInstance, report.TotalSubscribedTags); + Assert.Equal(instances * tagsPerInstance, report.ResolvedTags); + } + + [Fact] + public async Task DCL001_SubscribeWithFailedTags_CountsResolvedAndUnresolvedSeparately() + { + // Behavioural guard: the restructured subscribe must preserve the original + // accounting — failed tags count toward TotalSubscribed but not ResolvedTags. + _mockAdapter.ConnectAsync(Arg.Any>(), Arg.Any()) + .Returns(Task.CompletedTask); + _mockAdapter.Status.Returns(ConnectionHealth.Connected); + _mockAdapter.SubscribeAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(ci => ((string)ci[0]).StartsWith("bad") + ? Task.FromException(new Exception("tag not found")) + : Task.FromResult("sub-ok")); + _mockAdapter.ReadAsync(Arg.Any(), Arg.Any()) + .Returns(new ReadResult(false, null, null)); + + var actor = CreateConnectionActor("dcl001-failed-tags"); + await Task.Delay(300); + + actor.Tell(new SubscribeTagsRequest( + "c1", "inst1", "dcl001-failed-tags", + ["good/a", "good/b", "good/c", "bad/x", "bad/y"], DateTimeOffset.UtcNow)); + ExpectMsg(TimeSpan.FromSeconds(5)); + + actor.Tell(new DataConnectionActor.GetHealthReport()); + var report = ExpectMsg(TimeSpan.FromSeconds(3)); + + Assert.Equal(5, report.TotalSubscribedTags); // all 5 tags tracked + Assert.Equal(3, report.ResolvedTags); // only the 3 good ones resolved + } }