diff --git a/code-reviews/DataConnectionLayer/findings.md b/code-reviews/DataConnectionLayer/findings.md index 8c35124..b0f6be1 100644 --- a/code-reviews/DataConnectionLayer/findings.md +++ b/code-reviews/DataConnectionLayer/findings.md @@ -8,7 +8,7 @@ | Last reviewed | 2026-05-16 | | Reviewer | claude-agent | | Commit reviewed | `9c60592` | -| Open findings | 2 | +| Open findings | 0 | ## Summary @@ -381,7 +381,7 @@ after. |--|--| | Severity | Low | | Category | Performance & resource management | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs:540-569` | **Description** @@ -404,7 +404,26 @@ prior state captured before removal. **Resolution** -_Unresolved._ +Resolved 2026-05-16 (commit pending). A `tagPath → subscriber-count` reverse index +(`_tagSubscriberCount`) was added: `HandleSubscribeCompleted` increments it whenever a +tag is newly added to an instance's set, and `HandleUnsubscribe` decrements it, +releasing a tag at the adapter only when the count reaches zero. The "any other +subscriber" check is now O(1) per tag instead of an O(instances) `Where(...).Any()` +scan. The redundant `!_unresolvedTags.Contains(tagPath)` re-check (always true after +the unconditional `Remove` on the line above) was removed — the surviving branch is +entered only for tags that have a subscription id, which are by definition resolved, +so `_resolvedTags--` is now unconditional with an explanatory comment. The cleanup +also fixed a latent leak the original code could not reach: an unresolved tag whose +last subscriber unsubscribes is now removed from `_unresolvedTags`/`_resolutionInFlight` +and decremented from `_totalSubscribed` (previously it lingered in the retry timer and +the subscribed total forever). 
Regression test +`DCL008_Unsubscribe_OnlyReleasesTagWhenLastSubscriberLeaves` subscribes a tag from two +instances plus an exclusive tag, then unsubscribes each instance and asserts the +shared tag is released at the adapter only after the last subscriber leaves and the +health counters track correctly. (This finding is a performance refactor, not a +correctness bug — the pre-fix `Where(...).Any()` logic was functionally correct, so +the test passes against both versions and serves as a behavioural guard for the +refactor.) ### DataConnectionLayer-009 — Implemented failover heuristic diverges from the documented state machine @@ -606,7 +625,7 @@ deliberately not made here because this task is scoped to |--|--| | Severity | Low | | Category | Documentation & comments | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.DataConnectionLayer/Adapters/OpcUaDataConnection.cs:270-281` | **Description** @@ -629,4 +648,16 @@ under a race and is tolerated downstream." **Resolution** -_Unresolved._ +Resolved 2026-05-16 (commit pending). Rather than weaken the XML comment to match the +weak guard, the guard was made genuinely atomic so the documented "only the first +caller fires the event" guarantee becomes true. `OpcUaDataConnection._disconnectFired` +and `RealOpcUaClient._connectionLostFired` were changed from `volatile bool` to `int`, +and the check-then-set in `RaiseDisconnected` / `OnSessionKeepAlive` replaced with a +single atomic `Interlocked.Exchange(ref flag, 1) != 0` test-and-set (an unconditional atomic swap, not a compare-and-swap); the reset on connect +uses `Interlocked.Exchange(ref flag, 0)`. The XML comments on both methods were updated +to describe the atomic guard explicitly. 
Regression test +`DCL013_ConcurrentConnectionLost_RaisesDisconnectedExactlyOnce` runs 25 rounds, each +fanning 32 barrier-synchronised threads that raise the client's `ConnectionLost` event +simultaneously, and asserts `Disconnected` fires exactly once per round; against a +non-atomic check-then-set it double-fires (verified by temporarily reverting the +guard), and it passes against the atomic fix. diff --git a/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs b/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs index 3478bee..a08a16b 100644 --- a/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs +++ b/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs @@ -50,6 +50,13 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers /// private readonly Dictionary _subscriptionIds = new(); + /// + /// DataConnectionLayer-008: reverse index of how many instances subscribe to each + /// tag path. Lets decide whether any other instance + /// still needs a tag in O(1) instead of scanning every instance's tag set. + /// + private readonly Dictionary _tagSubscriberCount = new(); + /// /// Tags whose path resolution failed and are awaiting retry. /// @@ -600,7 +607,12 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers foreach (var result in msg.Results) { - instanceTags.Add(result.TagPath); + // DataConnectionLayer-008: only a tag newly added to THIS instance's set + // increments the reference count, so the count stays an accurate "number + // of distinct instances subscribed to this tag". + if (instanceTags.Add(result.TagPath)) + _tagSubscriberCount[result.TagPath] = + _tagSubscriberCount.GetValueOrDefault(result.TagPath) + 1; // Re-check against current state: another subscribe may have resolved the // same tag while this request's I/O was in flight. 
@@ -687,20 +699,29 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers // WP-14: Cleanup on Instance Actor stop foreach (var tagPath in tags) { - // Check if any other instance is still subscribed to this tag - var otherSubscribers = _subscriptionsByInstance - .Where(kvp => kvp.Key != request.InstanceUniqueName && kvp.Value.Contains(tagPath)) - .Any(); + // DataConnectionLayer-008: drop this instance's reference; the tag is only + // released at the adapter when no other instance still subscribes to it. + // The reference count makes this O(1) instead of an O(instances) scan. + var remaining = _tagSubscriberCount.GetValueOrDefault(tagPath) - 1; + if (remaining > 0) + { + _tagSubscriberCount[tagPath] = remaining; + continue; + } + _tagSubscriberCount.Remove(tagPath); - if (!otherSubscribers && _subscriptionIds.TryGetValue(tagPath, out var subId)) + // Last subscriber gone. A tag with a subscription id is a resolved tag; + // an unresolved tag never has a subscription id, so reaching this branch + // via TryGetValue means the tag was resolved — decrement _resolvedTags + // unconditionally (the previous `!_unresolvedTags.Contains` re-check after + // an unconditional Remove was always-true dead logic). + if (_subscriptionIds.TryGetValue(tagPath, out var subId)) { _ = _adapter.UnsubscribeAsync(subId); _subscriptionIds.Remove(tagPath); - _unresolvedTags.Remove(tagPath); _resolutionInFlight.Remove(tagPath); _totalSubscribed--; - if (!_unresolvedTags.Contains(tagPath)) - _resolvedTags--; + _resolvedTags--; // DataConnectionLayer-006: drop the tag's tracked quality so it is no // longer counted by PushBadQualityForAllTags (which sets _tagsBadQuality @@ -716,6 +737,16 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers } } } + else if (_unresolvedTags.Remove(tagPath)) + { + // Last subscriber gone for a tag that had never resolved: stop + // retrying it and drop it from the subscribed total. 
The previous + // implementation never reached this case (its guard required a + // subscription id), so an unresolved tag leaked into the retry timer + // and TotalSubscribedTags forever after its instance unsubscribed. + _resolutionInFlight.Remove(tagPath); + _totalSubscribed--; + } } _subscriptionsByInstance.Remove(request.InstanceUniqueName); diff --git a/src/ScadaLink.DataConnectionLayer/Adapters/OpcUaDataConnection.cs b/src/ScadaLink.DataConnectionLayer/Adapters/OpcUaDataConnection.cs index 68adc85..e9c0614 100644 --- a/src/ScadaLink.DataConnectionLayer/Adapters/OpcUaDataConnection.cs +++ b/src/ScadaLink.DataConnectionLayer/Adapters/OpcUaDataConnection.cs @@ -38,7 +38,12 @@ public class OpcUaDataConnection : IDataConnection _logger = logger; } - private volatile bool _disconnectFired; + // DataConnectionLayer-013: an int flag toggled with Interlocked.Exchange so the + // "only the first caller fires Disconnected" guard in RaiseDisconnected is genuinely + // atomic. A plain volatile bool gives visibility but not atomicity — two threads + // (e.g. the keep-alive thread and a ReadAsync failure path) could both observe it + // false and both raise the event. 0 = not fired, 1 = fired. + private int _disconnectFired; public ConnectionHealth Status => _status; public event Action? Disconnected; @@ -82,7 +87,7 @@ public class OpcUaDataConnection : IDataConnection await _client.ConnectAsync(_endpointUrl, options, cancellationToken); _status = ConnectionHealth.Connected; - _disconnectFired = false; + Interlocked.Exchange(ref _disconnectFired, 0); _logger.LogInformation("OPC UA connected to {Endpoint}", _endpointUrl); await StartHeartbeatMonitorAsync(config.Heartbeat, cancellationToken); @@ -285,12 +290,15 @@ public class OpcUaDataConnection : IDataConnection /// /// Marks the connection as disconnected and fires the Disconnected event once. - /// Thread-safe: only the first caller triggers the event. 
+ /// Thread-safe: the firing guard is an atomic compare-and-set + /// (), so when several threads race + /// here — e.g. the keep-alive thread via and a + /// ReadAsync failure path — exactly one of them observes the 0→1 transition + /// and invokes . /// private void RaiseDisconnected() { - if (_disconnectFired) return; - _disconnectFired = true; + if (Interlocked.Exchange(ref _disconnectFired, 1) != 0) return; _status = ConnectionHealth.Disconnected; _logger.LogWarning("OPC UA connection to {Endpoint} lost", _endpointUrl); Disconnected?.Invoke(); diff --git a/src/ScadaLink.DataConnectionLayer/Adapters/RealOpcUaClient.cs b/src/ScadaLink.DataConnectionLayer/Adapters/RealOpcUaClient.cs index 0b2efd7..b949ffb 100644 --- a/src/ScadaLink.DataConnectionLayer/Adapters/RealOpcUaClient.cs +++ b/src/ScadaLink.DataConnectionLayer/Adapters/RealOpcUaClient.cs @@ -24,7 +24,10 @@ public class RealOpcUaClient : IOpcUaClient // Clear() is undefined behaviour, so they must be ConcurrentDictionary. private readonly ConcurrentDictionary _monitoredItems = new(); private readonly ConcurrentDictionary> _callbacks = new(); - private volatile bool _connectionLostFired; + // DataConnectionLayer-013: int flag toggled with Interlocked.Exchange so the + // once-only ConnectionLost guard in OnSessionKeepAlive is atomic, not just visible. + // 0 = not fired, 1 = fired. 
+ private int _connectionLostFired; private OpcUaConnectionOptions _options = new(); private readonly OpcUaGlobalOptions _globalOptions; private readonly ILogger _logger; @@ -112,7 +115,7 @@ public class RealOpcUaClient : IOpcUaClient "ScadaLink-DCL-Session", (uint)opts.SessionTimeoutMs, userIdentity, null, cancellationToken); // Detect server going offline via keep-alive failures - _connectionLostFired = false; + Interlocked.Exchange(ref _connectionLostFired, 0); _session.KeepAlive += OnSessionKeepAlive; // Store options for monitored item creation @@ -243,14 +246,15 @@ public class RealOpcUaClient : IOpcUaClient /// /// Called by the OPC UA SDK when a keep-alive response arrives (or fails). - /// When CurrentState is bad, the server is unreachable. + /// When CurrentState is bad, the server is unreachable. The once-only guard is an + /// atomic compare-and-set, so a burst of failed keep-alives raises + /// exactly once. /// private void OnSessionKeepAlive(ISession session, KeepAliveEventArgs e) { if (ServiceResult.IsBad(e.Status)) { - if (_connectionLostFired) return; - _connectionLostFired = true; + if (Interlocked.Exchange(ref _connectionLostFired, 1) != 0) return; ConnectionLost?.Invoke(); } } diff --git a/tests/ScadaLink.DataConnectionLayer.Tests/DataConnectionActorTests.cs b/tests/ScadaLink.DataConnectionLayer.Tests/DataConnectionActorTests.cs index 4e20479..a1e7820 100644 --- a/tests/ScadaLink.DataConnectionLayer.Tests/DataConnectionActorTests.cs +++ b/tests/ScadaLink.DataConnectionLayer.Tests/DataConnectionActorTests.cs @@ -836,4 +836,63 @@ public class DataConnectionActorTests : TestKit Assert.Equal(5, report.TotalSubscribedTags); // all 5 tags tracked Assert.Equal(3, report.ResolvedTags); // only the 3 good ones resolved } + + // ── DataConnectionLayer-008: HandleUnsubscribe shared-tag reference counting ── + + [Fact] + public async Task DCL008_Unsubscribe_OnlyReleasesTagWhenLastSubscriberLeaves() + { + // Regression test for DataConnectionLayer-008. 
HandleUnsubscribe must release a + // tag at the adapter only when no other instance still subscribes to it. The + // O(n) per-tag scan over every instance was replaced with an O(1) reference + // count; this guards that the reference count tracks shared subscriptions + // correctly — a shared tag is kept while any subscriber remains and the + // resolved-tag counter and adapter UnsubscribeAsync stay consistent. + var unsubscribed = new System.Collections.Concurrent.ConcurrentBag(); + _mockAdapter.ConnectAsync(Arg.Any>(), Arg.Any()) + .Returns(Task.CompletedTask); + _mockAdapter.Status.Returns(ConnectionHealth.Connected); + _mockAdapter.SubscribeAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(ci => Task.FromResult("sub-" + (string)ci[0])); + _mockAdapter.ReadAsync(Arg.Any(), Arg.Any()) + .Returns(new ReadResult(false, null, null)); + _mockAdapter.UnsubscribeAsync(Arg.Any(), Arg.Any()) + .Returns(ci => { unsubscribed.Add((string)ci[0]); return Task.CompletedTask; }); + + var actor = CreateConnectionActor("dcl008-shared"); + await Task.Delay(300); + + // Two instances both subscribe to the shared tag; instA also has an exclusive tag. + actor.Tell(new SubscribeTagsRequest("c1", "instA", "dcl008-shared", + ["shared/tag", "exclusive/a"], DateTimeOffset.UtcNow)); + ExpectMsg(TimeSpan.FromSeconds(5)); + actor.Tell(new SubscribeTagsRequest("c2", "instB", "dcl008-shared", + ["shared/tag"], DateTimeOffset.UtcNow)); + ExpectMsg(TimeSpan.FromSeconds(5)); + + // Unsubscribe instA — shared/tag must stay (instB still subscribes); only + // exclusive/a is released at the adapter. + actor.Tell(new UnsubscribeTagsRequest("c3", "instA", "dcl008-shared", DateTimeOffset.UtcNow)); + await Task.Delay(300); + + Assert.Contains("sub-exclusive/a", unsubscribed); + Assert.DoesNotContain("sub-shared/tag", unsubscribed); + + // Health: 1 tag still subscribed and resolved (shared/tag held by instB). 
+ actor.Tell(new DataConnectionActor.GetHealthReport()); + var report1 = ExpectMsg(TimeSpan.FromSeconds(3)); + Assert.Equal(1, report1.TotalSubscribedTags); + Assert.Equal(1, report1.ResolvedTags); + + // Unsubscribe instB — now shared/tag has no subscribers and is released. + actor.Tell(new UnsubscribeTagsRequest("c4", "instB", "dcl008-shared", DateTimeOffset.UtcNow)); + await Task.Delay(300); + + Assert.Contains("sub-shared/tag", unsubscribed); + + actor.Tell(new DataConnectionActor.GetHealthReport()); + var report2 = ExpectMsg(TimeSpan.FromSeconds(3)); + Assert.Equal(0, report2.TotalSubscribedTags); + Assert.Equal(0, report2.ResolvedTags); + } } diff --git a/tests/ScadaLink.DataConnectionLayer.Tests/OpcUaDataConnectionTests.cs b/tests/ScadaLink.DataConnectionLayer.Tests/OpcUaDataConnectionTests.cs index f8187e8..f36f388 100644 --- a/tests/ScadaLink.DataConnectionLayer.Tests/OpcUaDataConnectionTests.cs +++ b/tests/ScadaLink.DataConnectionLayer.Tests/OpcUaDataConnectionTests.cs @@ -116,6 +116,45 @@ public class OpcUaDataConnectionTests Assert.Equal(ConnectionHealth.Disconnected, _adapter.Status); } + [Fact] + public async Task DCL013_ConcurrentConnectionLost_RaisesDisconnectedExactlyOnce() + { + // Regression test for DataConnectionLayer-013. RaiseDisconnected used a + // non-atomic check-then-set on a volatile bool: two threads racing through it + // (e.g. the keep-alive thread and a ReadAsync failure path, both routed via + // OnClientConnectionLost) could both observe _disconnectFired == false and both + // invoke Disconnected. The guard is now an atomic Interlocked.Exchange, so a + // burst of concurrent connection-lost callbacks fires the event exactly once. + // Repeat the burst: reconnecting between rounds re-arms the guard, so each + // round must independently fire Disconnected exactly once. Repetition makes + // the (timing-dependent) non-atomic race overwhelmingly likely to be caught. 
+ const int rounds = 25; + const int threads = 32; + for (var round = 0; round < rounds; round++) + { + _mockClient.IsConnected.Returns(true); + await _adapter.ConnectAsync(new Dictionary()); + + var fired = 0; + void Handler() => Interlocked.Increment(ref fired); + _adapter.Disconnected += Handler; + + // Fan out: many threads raise the client's ConnectionLost event together. + using (var ready = new Barrier(threads)) + { + var tasks = Enumerable.Range(0, threads).Select(_ => Task.Run(() => + { + ready.SignalAndWait(); + _mockClient.ConnectionLost += Raise.Event(); + })).ToArray(); + await Task.WhenAll(tasks); + } + + _adapter.Disconnected -= Handler; + Assert.Equal(1, fired); + } + } + [Fact] public async Task Subscribe_DelegatesAndReturnsId() {