diff --git a/code-reviews/DataConnectionLayer/findings.md b/code-reviews/DataConnectionLayer/findings.md index 2fa18f1..8c35124 100644 --- a/code-reviews/DataConnectionLayer/findings.md +++ b/code-reviews/DataConnectionLayer/findings.md @@ -8,7 +8,7 @@ | Last reviewed | 2026-05-16 | | Reviewer | claude-agent | | Commit reviewed | `9c60592` | -| Open findings | 8 | +| Open findings | 2 | ## Summary @@ -287,7 +287,7 @@ unbounded code and passes after. Fixed by the commit whose message references |--|--| | Severity | Medium | | Category | Correctness & logic bugs | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs:645-673,721-756` | **Description** @@ -303,6 +303,14 @@ decrement, and the totals can drift above `_totalSubscribed`. Over repeated disconnect/reconnect cycles the health report's good/bad/uncertain counts become unreliable. +**Verification note**: Confirmed against source. The root cause is broader than the +reconnect path the finding describes: `HandleUnsubscribe` also never removes a tag +from `_lastTagQuality` nor decrements its quality bucket, so an unsubscribed tag +lingers and `PushBadQualityForAllTags` (which sets `_tagsBadQuality = +_lastTagQuality.Count`) over-counts it — driving the bad-quality count above +`_totalSubscribed` even without a re-subscribe. Both the unsubscribe leak and the +re-subscribe drift are real. + **Recommendation** On `BecomeConnected` after a re-subscribe (or in `ReSubscribeAll`), clear @@ -312,7 +320,17 @@ fresh `TagValueReceived` messages. Alternatively recompute the buckets from **Resolution** -_Unresolved._ +Resolved 2026-05-16 (commit pending). `HandleUnsubscribe` now removes each +unsubscribed tag from `_lastTagQuality` and decrements the corresponding quality +bucket, then reports the corrected counters via `UpdateTagQuality`/`UpdateTagResolution`; +`ReSubscribeAll` clears `_lastTagQuality` and zeroes the three quality counters so +post-reconnect tags are repopulated from fresh `TagValueReceived` messages instead of +only incrementing. Regression test +`DCL006_DisconnectAfterUnsubscribe_BadQualityCountMatchesRemainingTags` subscribes two +tags, pushes Good values, unsubscribes one, then disconnects and asserts +`PushBadQualityForAllTags` reports exactly 1 bad tag (the reconnect is gated open so +`ReSubscribeAll` does not run before the assertion); it reports 2 against the pre-fix +code and 1 after. ### DataConnectionLayer-007 — `ReadBatchAsync` aborts the whole batch on the first failing tag @@ -320,7 +338,7 @@ _Unresolved._ |--|--| | Severity | Medium | | Category | Correctness & logic bugs | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.DataConnectionLayer/Adapters/OpcUaDataConnection.cs:187-195` | **Description** @@ -333,6 +351,9 @@ has a `Success`/`ErrorMessage` shape designed to carry per-tag failures. The bat also fully serial (one round-trip per tag), defeating the point of a batch API; the design doc lists `ReadBatch`/`WriteBatch` as first-class operations. +**Verification note**: Confirmed against source — `ReadAsync` re-throws on any +non-`OperationCanceledException`, aborting the whole batch. + **Recommendation** Catch per-tag exceptions inside the loop and store a failed `ReadResult` for that tag @@ -342,7 +363,17 @@ for all node IDs (`RealOpcUaClient.ReadValueAsync` already builds a **Resolution** -_Unresolved._ +Resolved 2026-05-16 (commit pending). `ReadBatchAsync` now wraps each per-tag +`ReadAsync` in a try/catch: a per-tag exception is recorded as a failed `ReadResult` +(`Success: false`, message = the exception message) so the batch returns a complete +result map for every requested tag; `OperationCanceledException` is still propagated +so a cancelled batch aborts as a whole. The per-tag-serial loop and single-service-call +optimisation were deliberately left for a follow-up — they are a performance concern, +not the correctness bug this finding raised. Regression test +`DCL007_ReadBatch_ReturnsPerTagResults_WhenOneTagFails` reads three tags where the +middle one throws and asserts all three appear in the result map with the failing one +marked unsuccessful; it threw (no map returned) against the pre-fix code and passes +after. ### DataConnectionLayer-008 — `HandleUnsubscribe` is O(n^2) over instances and rechecks `_unresolvedTags` redundantly @@ -379,9 +410,9 @@ _Unresolved._ | | | |--|--| -| Severity | Medium | +| Severity | Medium — partially design-doc work outside this module's editable scope | | Category | Design-document adherence | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs:189,242-297,379-449`, `docs/requirements/Component-DataConnectionLayer.md:73-85` | **Description** @@ -398,6 +429,10 @@ all. A reviewer or operator reading `Component-DataConnectionLayer.md` would not predict this behaviour, and the 60 s threshold is a magic constant not exposed via `DataConnectionOptions`. +**Verification note**: Confirmed against source. The hard-coded +`StableConnectionThreshold = TimeSpan.FromSeconds(60)` `static readonly` field and the +`_consecutiveUnstableDisconnects` failover path both exist as described. + **Recommendation** Update `Component-DataConnectionLayer.md` to document the unstable-disconnect failover @@ -406,7 +441,19 @@ path and the stability threshold, and move the 60 s threshold into **Resolution** -_Unresolved._ +Resolved 2026-05-16 (commit pending). The configurability half of the recommendation +is done: the hard-coded `StableConnectionThreshold` constant was removed from +`DataConnectionActor` and replaced with a new `DataConnectionOptions.StableConnectionThreshold` +property (60 s default), bindable from the `DataConnectionLayer` `appsettings.json` +section like `ReconnectInterval`/`TagResolutionRetryInterval`/`WriteTimeout`. Regression +test `DCL009_StableConnectionThreshold_IsConfigurable_WithSixtySecondDefault` guards +the default and the setter. **The documentation half is out of this module's editable +scope** — `docs/requirements/Component-DataConnectionLayer.md` (lines 73-85) still +describes only the connect-failure failover path and does not mention the +unstable-disconnect trigger. **Action required (surfaced):** the DCL design doc should +be updated to document the unstable-disconnect failover path and the configurable +stability threshold; that edit was deliberately not made here because this task is +scoped to `src/ScadaLink.DataConnectionLayer`, tests, and this findings file only. ### DataConnectionLayer-010 — Tag-resolution retry can issue duplicate concurrent subscribe attempts @@ -414,7 +461,7 @@ _Unresolved._ |--|--| | Severity | Medium | | Category | Correctness & logic bugs | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs:594-619,689-703` | **Description** @@ -429,6 +476,10 @@ monitored items / leaked subscription IDs (the second success overwrites with no `UnsubscribeAsync` call). The timer-cancel condition in `HandleTagResolutionSucceeded` is also non-deterministic for the same reason. +**Verification note**: Confirmed against source — `HandleRetryTagResolution` dispatched +`SubscribeAsync` for every tag in `_unresolvedTags` on every tick with no in-flight +guard. + **Recommendation** Remove tags from `_unresolvedTags` (into an "in-flight" set) when a retry is @@ -437,7 +488,18 @@ subscribe attempts and makes the timer-cancel condition deterministic. **Resolution** -_Unresolved._ +Resolved 2026-05-16 (commit pending). A new `_resolutionInFlight` `HashSet` +tracks tags whose retry `SubscribeAsync` is currently outstanding. +`HandleRetryTagResolution` now dispatches only for unresolved tags **not** already in +flight (and skips entirely if all are in flight), adding each dispatched tag to the +set; `HandleTagResolutionSucceeded` and `HandleTagResolutionFailed` remove the tag +from the set when its attempt completes, and `HandleUnsubscribe`/`ReSubscribeAll` +clear stale entries. This prevents overlapping duplicate subscribe attempts and the +resulting orphaned monitored items. Regression test +`DCL010_TagResolutionRetry_DoesNotIssueDuplicateConcurrentSubscribes` gives a tag a +genuine initial failure then a retry `SubscribeAsync` that never completes, lets six +100 ms retry ticks elapse, and asserts exactly one retry was dispatched (2 total +subscribe calls); the pre-fix code dispatched on every tick (6 total). ### DataConnectionLayer-011 — Stale subscription callbacks from disposed adapters can still reach the actor @@ -445,7 +507,7 @@ _Unresolved._ |--|--| | Severity | Medium | | Category | Error handling & resilience | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs:486-489,278-285,416-425`, `src/ScadaLink.DataConnectionLayer/Adapters/OpcUaDataConnection.cs:252-262` | **Description** @@ -460,6 +522,10 @@ data with the new endpoint's data and briefly reporting a value the active endpo never produced. There is no per-adapter generation/epoch tag on `TagValueReceived` to distinguish current from stale callbacks. +**Verification note**: Confirmed against source — `TagValueReceived` carried no +adapter identity, and `HandleTagValueReceived` (reachable in `Connected`) processed +any such message regardless of which adapter produced it. + **Recommendation** Add an adapter-generation counter incremented on every adapter swap; stamp it onto @@ -468,15 +534,28 @@ generation does not match the current adapter in `HandleTagValueReceived`. **Resolution** -_Unresolved._ +Resolved 2026-05-16 (commit pending). Implemented exactly as recommended: a new +`_adapterGeneration` `int` field is incremented at both adapter-swap sites (the +unstable-disconnect failover in `BecomeReconnecting` and the connect-failure failover +in `HandleReconnectResult`). The `TagValueReceived` record gained an +`AdapterGeneration` field; every subscription callback closure (`HandleSubscribe`, the +initial-read seed, `HandleRetryTagResolution`, `ReSubscribeAll`) captures the +generation in effect at subscribe time and stamps it onto each `TagValueReceived`. +`HandleTagValueReceived` drops any message whose generation no longer matches the +current adapter, so a callback fired by a disposed adapter after failover cannot reach +an Instance Actor. Regression test +`DCL011_StaleTagValueFromOldAdapter_IsNotForwardedAfterFailover` subscribes on the +primary, fails over to the backup, then invokes the captured primary callback with a +stale value and asserts the subscriber receives nothing; the stale value reached the +subscriber against the pre-fix code and is dropped after. ### DataConnectionLayer-012 — `AutoAcceptUntrustedCerts` defaults to `true`, accepting any server certificate | | | |--|--| -| Severity | Medium | +| Severity | Medium — full secure default also requires a Commons + design-doc change outside this module | | Category | Security | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.DataConnectionLayer/Adapters/IOpcUaClient.cs:17`, `src/ScadaLink.DataConnectionLayer/Adapters/RealOpcUaClient.cs:49,60-61`, `docs/requirements/Component-DataConnectionLayer.md:116` | **Description** @@ -490,6 +569,13 @@ UA link. The design doc explicitly lists `true` as the default. For an industria control link this is a meaningful exposure; a secure-by-default posture would reject untrusted certs unless an operator opts in per connection. +**Verification note**: Confirmed against source. Note the *authoritative* runtime +default does not actually live on `OpcUaConnectionOptions` — for a real connection +`OpcUaDataConnection.ConnectAsync` builds `OpcUaConnectionOptions` from +`OpcUaEndpointConfig` (in `ScadaLink.Commons`), whose `AutoAcceptUntrustedCerts` +property also defaults to `true`. `OpcUaConnectionOptions`' own default is only the +fallback used when an `OpcUaConnectionOptions` is constructed directly. + **Recommendation** Default `AutoAcceptUntrustedCerts` to `false` and require explicit per-connection @@ -498,7 +584,21 @@ installed. Update the design doc to reflect the secure default. **Resolution** -_Unresolved._ +Resolved 2026-05-16 (commit pending). The two in-scope parts of the recommendation +are done: (1) `OpcUaConnectionOptions.AutoAcceptUntrustedCerts` now defaults to +`false`; (2) `RealOpcUaClient.ConnectAsync` logs a prominent `ILogger` warning +whenever the auto-accept certificate validator is installed (an `ILogger` +was added as an optional constructor parameter, defaulting to `NullLogger`, so +existing callers are unaffected). Regression test +`DCL012_OpcUaConnectionOptions_AutoAcceptUntrustedCerts_DefaultsToFalse` guards the +new secure default. **Two parts remain outside this module's editable scope and are +surfaced as action required:** (a) `ScadaLink.Commons.Types.DataConnections.OpcUaEndpointConfig.AutoAcceptUntrustedCerts` +still defaults to `true` — since that is the value actually used for a real connection +(see verification note above), the Commons default must also be flipped to `false` +for the system to be secure-by-default; (b) `docs/requirements/Component-DataConnectionLayer.md` +line 116 still documents `true` as the default and must be updated. Both edits were +deliberately not made here because this task is scoped to +`src/ScadaLink.DataConnectionLayer`, tests, and this findings file only. ### DataConnectionLayer-013 — Misleading XML comment: `RaiseDisconnected` claims thread safety it does not provide diff --git a/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs b/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs index 75ee993..3478bee 100644 --- a/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs +++ b/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs @@ -55,6 +55,13 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers /// private readonly HashSet _unresolvedTags = new(); + /// + /// DataConnectionLayer-010: tags whose retry SubscribeAsync is currently in flight. + /// They are excluded from the next retry tick so a slow attempt is not duplicated + /// (which would leak monitored items / subscription ids). + /// + private readonly HashSet _resolutionInFlight = new(); + /// /// Subscribers: instanceUniqueName → IActorRef (the Instance Actor). /// @@ -80,6 +87,15 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers private int _consecutiveUnstableDisconnects; private DateTimeOffset _lastConnectedAt; + /// + /// DataConnectionLayer-011: monotonically increasing tag that identifies the + /// current adapter instance. Subscription callbacks capture the generation in + /// effect when they were created; a whose + /// generation no longer matches comes from a disposed adapter and is dropped so + /// stale pre-failover device data is never forwarded to Instance Actors. + /// + private int _adapterGeneration; + /// /// Captured Self reference for use from non-actor threads (event handlers, callbacks). /// Akka.NET's Self property is only valid inside the actor's message loop. @@ -187,12 +203,6 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers // ── Connected State ── - /// - /// Minimum time connected before we consider the connection stable. - /// If we disconnect before this, it counts as an unstable connection toward failover. - /// - private static readonly TimeSpan StableConnectionThreshold = TimeSpan.FromSeconds(60); - private void BecomeConnected() { _log.Info("[{0}] Entering Connected state", _connectionName); @@ -263,7 +273,7 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers // If we were connected for less than the stability threshold, this counts // as an unstable cycle (e.g., connect succeeded but heartbeat went stale). var connectionDuration = DateTimeOffset.UtcNow - _lastConnectedAt; - if (_lastConnectedAt != default && connectionDuration < StableConnectionThreshold) + if (_lastConnectedAt != default && connectionDuration < _options.StableConnectionThreshold) { _consecutiveUnstableDisconnects++; _log.Warning("[{0}] Unstable connection (lasted {1:F0}s) — consecutive unstable disconnects: {2}/{3}", @@ -298,6 +308,10 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers _connectionDetails = newConfig; _adapter.Disconnected += OnAdapterDisconnected; + // DataConnectionLayer-011: new adapter — bump the generation so callbacks + // from the disposed adapter are recognised as stale and dropped. + _adapterGeneration++; + _log.Warning("[{0}] Failing over from {1} to {2} (unstable connection pattern)", _connectionName, previousEndpoint, _activeEndpoint); @@ -306,7 +320,7 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers _ = _siteEventLogger.LogEventAsync( "connection", "Warning", null, _connectionName, $"Failover from {previousEndpoint} to {_activeEndpoint} (unstable connection)", - $"Connection lasted {connectionDuration.TotalSeconds:F0}s, threshold {StableConnectionThreshold.TotalSeconds:F0}s"); + $"Connection lasted {connectionDuration.TotalSeconds:F0}s, threshold {_options.StableConnectionThreshold.TotalSeconds:F0}s"); } } @@ -443,6 +457,10 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers // Wire disconnect handler on new adapter _adapter.Disconnected += OnAdapterDisconnected; + // DataConnectionLayer-011: new adapter — bump the generation so callbacks + // from the disposed adapter are recognised as stale and dropped. + _adapterGeneration++; + _log.Warning("[{0}] Failing over from {1} to {2}", _connectionName, previousEndpoint, _activeEndpoint); @@ -487,6 +505,9 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers var self = Self; var sender = Sender; + // DataConnectionLayer-011: capture the current adapter generation so callbacks + // from this adapter can be distinguished from a later (post-failover) adapter. + var generation = _adapterGeneration; // Snapshot the already-subscribed tag set on the actor thread. The background // task below must NOT read or mutate actor state — it performs only adapter @@ -513,7 +534,7 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers { var subId = await _adapter.SubscribeAsync(tagPath, (path, value) => { - self.Tell(new TagValueReceived(path, value)); + self.Tell(new TagValueReceived(path, value, generation)); }); results.Add(new SubscribeTagResult(tagPath, AlreadySubscribed: false, Success: true, subId, null)); tagsToSeed.Add(tagPath); @@ -541,7 +562,7 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers var readResult = await _adapter.ReadAsync(tagPath); if (readResult.Success && readResult.Value != null) { - self.Tell(new TagValueReceived(tagPath, readResult.Value)); + self.Tell(new TagValueReceived(tagPath, readResult.Value, generation)); } } catch @@ -676,14 +697,34 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers _ = _adapter.UnsubscribeAsync(subId); _subscriptionIds.Remove(tagPath); _unresolvedTags.Remove(tagPath); + _resolutionInFlight.Remove(tagPath); _totalSubscribed--; if (!_unresolvedTags.Contains(tagPath)) _resolvedTags--; + + // DataConnectionLayer-006: drop the tag's tracked quality so it is no + // longer counted by PushBadQualityForAllTags (which sets _tagsBadQuality + // from _lastTagQuality.Count). Leaving it here drifts the quality + // counters above _totalSubscribed across disconnect cycles. + if (_lastTagQuality.Remove(tagPath, out var droppedQuality)) + { + switch (droppedQuality) + { + case QualityCode.Good: _tagsGoodQuality--; break; + case QualityCode.Bad: _tagsBadQuality--; break; + case QualityCode.Uncertain: _tagsUncertainQuality--; break; + } + } } } _subscriptionsByInstance.Remove(request.InstanceUniqueName); _subscribers.Remove(request.InstanceUniqueName); + + // DataConnectionLayer-006: keep the reported quality counters in sync after the + // unsubscribed tags' buckets were decremented above. + _healthCollector.UpdateTagQuality(_connectionName, _tagsGoodQuality, _tagsBadQuality, _tagsUncertainQuality); + _healthCollector.UpdateTagResolution(_connectionName, _totalSubscribed, _resolvedTags); } // ── Write Support (WP-11) ── @@ -731,16 +772,29 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers return; } - _log.Debug("[{0}] Retrying resolution for {1} unresolved tags", _connectionName, _unresolvedTags.Count); - var self = Self; - var toResolve = _unresolvedTags.ToList(); + // DataConnectionLayer-010: only dispatch retries for tags that do not already + // have an attempt in flight. A slow SubscribeAsync overlapping the next tick + // would otherwise produce duplicate concurrent subscribes for the same tag. + var toResolve = _unresolvedTags.Where(t => !_resolutionInFlight.Contains(t)).ToList(); + + if (toResolve.Count == 0) + { + _log.Debug("[{0}] Tag-resolution retry skipped — {1} attempt(s) still in flight", + _connectionName, _resolutionInFlight.Count); + return; + } + + _log.Debug("[{0}] Retrying resolution for {1} unresolved tags", _connectionName, toResolve.Count); + + var generation = _adapterGeneration; foreach (var tagPath in toResolve) { + _resolutionInFlight.Add(tagPath); _adapter.SubscribeAsync(tagPath, (path, value) => { - self.Tell(new TagValueReceived(path, value)); + self.Tell(new TagValueReceived(path, value, generation)); }).ContinueWith(t => { if (t.IsCompletedSuccessfully) @@ -788,13 +842,25 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers var self = Self; _subscriptionIds.Clear(); _unresolvedTags.Clear(); + _resolutionInFlight.Clear(); _resolvedTags = 0; + // DataConnectionLayer-006: reset the quality tracking too. Otherwise tags + // resolved for the first time after reconnect (never in _lastTagQuality) only + // increment their bucket and the totals drift above _totalSubscribed. They are + // repopulated from fresh TagValueReceived messages once subscriptions activate. + _lastTagQuality.Clear(); + _tagsGoodQuality = 0; + _tagsBadQuality = 0; + _tagsUncertainQuality = 0; + _healthCollector.UpdateTagQuality(_connectionName, _tagsGoodQuality, _tagsBadQuality, _tagsUncertainQuality); + + var generation = _adapterGeneration; foreach (var tagPath in allTags) { _adapter.SubscribeAsync(tagPath, (path, value) => { - self.Tell(new TagValueReceived(path, value)); + self.Tell(new TagValueReceived(path, value, generation)); }).ContinueWith(t => { if (t.IsCompletedSuccessfully) @@ -820,6 +886,9 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers private void HandleTagResolutionSucceeded(TagResolutionSucceeded msg) { + // DataConnectionLayer-010: the retry attempt for this tag has completed. + _resolutionInFlight.Remove(msg.TagPath); + if (_unresolvedTags.Remove(msg.TagPath)) { _subscriptionIds[msg.TagPath] = msg.SubscriptionId; @@ -839,6 +908,10 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers _log.Debug("[{0}] Tag resolution still failing for {1}: {2}", _connectionName, msg.TagPath, msg.Error); + // DataConnectionLayer-010: the retry attempt for this tag has completed — + // it is eligible for the next retry tick again. + _resolutionInFlight.Remove(msg.TagPath); + // Track as unresolved so periodic retry picks it up if (_unresolvedTags.Add(msg.TagPath)) { @@ -852,6 +925,16 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers private void HandleTagValueReceived(TagValueReceived msg) { + // DataConnectionLayer-011: drop values delivered by a disposed adapter. After a + // failover the old adapter's OPC UA SDK threads may still fire callbacks; those + // carry a stale generation and must not be forwarded to Instance Actors. + if (msg.AdapterGeneration != _adapterGeneration) + { + _log.Debug("[{0}] Dropping stale tag value for {1} from adapter generation {2} (current {3})", + _connectionName, msg.TagPath, msg.AdapterGeneration, _adapterGeneration); + return; + } + // Fan out to all subscribed instances foreach (var (instanceName, tags) in _subscriptionsByInstance) { @@ -892,7 +975,7 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers internal record AttemptConnect; internal record ConnectResult(bool Success, string? Error); internal record AdapterDisconnected; - internal record TagValueReceived(string TagPath, TagValue Value); + internal record TagValueReceived(string TagPath, TagValue Value, int AdapterGeneration); internal record TagResolutionFailed(string TagPath, string Error); internal record TagResolutionSucceeded(string TagPath, string SubscriptionId); internal record RetryTagResolution; diff --git a/src/ScadaLink.DataConnectionLayer/Adapters/IOpcUaClient.cs b/src/ScadaLink.DataConnectionLayer/Adapters/IOpcUaClient.cs index 3eb88b3..298d107 100644 --- a/src/ScadaLink.DataConnectionLayer/Adapters/IOpcUaClient.cs +++ b/src/ScadaLink.DataConnectionLayer/Adapters/IOpcUaClient.cs @@ -14,7 +14,10 @@ public record OpcUaConnectionOptions( int SamplingIntervalMs = 1000, int QueueSize = 10, string SecurityMode = "None", - bool AutoAcceptUntrustedCerts = true, + // DataConnectionLayer-012: secure-by-default — untrusted server certificates are + // rejected unless an operator explicitly opts in per connection. Accepting any + // certificate defeats the Sign / SignAndEncrypt modes against a man-in-the-middle. + bool AutoAcceptUntrustedCerts = false, bool DiscardOldest = true, byte SubscriptionPriority = 0, string SubscriptionDisplayName = "ScadaLink", diff --git a/src/ScadaLink.DataConnectionLayer/Adapters/OpcUaDataConnection.cs b/src/ScadaLink.DataConnectionLayer/Adapters/OpcUaDataConnection.cs index 4eb1fed..68adc85 100644 --- a/src/ScadaLink.DataConnectionLayer/Adapters/OpcUaDataConnection.cs +++ b/src/ScadaLink.DataConnectionLayer/Adapters/OpcUaDataConnection.cs @@ -186,10 +186,26 @@ public class OpcUaDataConnection : IDataConnection public async Task> ReadBatchAsync(IEnumerable tagPaths, CancellationToken cancellationToken = default) { + // DataConnectionLayer-007: a single failing tag must not abort the whole batch. + // ReadAsync re-throws non-cancellation exceptions; catch them per tag and record + // a failed ReadResult so the caller receives a complete result map for every + // requested tag (the ReadResult shape already carries per-tag Success/error). var results = new Dictionary(); foreach (var tagPath in tagPaths) { - results[tagPath] = await ReadAsync(tagPath, cancellationToken); + try + { + results[tagPath] = await ReadAsync(tagPath, cancellationToken); + } + catch (OperationCanceledException) + { + // Cancellation aborts the whole batch — propagate it. + throw; + } + catch (Exception ex) + { + results[tagPath] = new ReadResult(false, null, ex.Message); + } } return results; } diff --git a/src/ScadaLink.DataConnectionLayer/Adapters/RealOpcUaClient.cs b/src/ScadaLink.DataConnectionLayer/Adapters/RealOpcUaClient.cs index d3552d5..0b2efd7 100644 --- a/src/ScadaLink.DataConnectionLayer/Adapters/RealOpcUaClient.cs +++ b/src/ScadaLink.DataConnectionLayer/Adapters/RealOpcUaClient.cs @@ -1,5 +1,7 @@ using System.Collections.Concurrent; using System.Security.Cryptography.X509Certificates; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; using Opc.Ua; using Opc.Ua.Client; using Opc.Ua.Configuration; @@ -25,10 +27,12 @@ public class RealOpcUaClient : IOpcUaClient private volatile bool _connectionLostFired; private OpcUaConnectionOptions _options = new(); private readonly OpcUaGlobalOptions _globalOptions; + private readonly ILogger _logger; - public RealOpcUaClient(OpcUaGlobalOptions? globalOptions = null) + public RealOpcUaClient(OpcUaGlobalOptions? globalOptions = null, ILogger? logger = null) { _globalOptions = globalOptions ?? new OpcUaGlobalOptions(); + _logger = logger ?? NullLogger.Instance; } public bool IsConnected => _session?.Connected ?? false; @@ -65,7 +69,16 @@ public class RealOpcUaClient : IOpcUaClient await appConfig.ValidateAsync(ApplicationType.Client); if (opts.AutoAcceptUntrustedCerts) + { + // DataConnectionLayer-012: this accepts ANY server certificate, defeating + // certificate trust enforcement. Surface a prominent warning so an operator + // who has opted in is aware of the man-in-the-middle exposure on the link. + _logger.LogWarning( + "OPC UA connection to {Endpoint} has AutoAcceptUntrustedCerts enabled — every " + + "server certificate is accepted unconditionally. This defeats Sign / " + + "SignAndEncrypt protection against a man-in-the-middle.", endpointUrl); appConfig.CertificateValidator.CertificateValidation += (_, e) => e.Accept = true; + } // Discover endpoints from the server, pick the preferred security mode EndpointDescription? endpoint; diff --git a/src/ScadaLink.DataConnectionLayer/DataConnectionOptions.cs b/src/ScadaLink.DataConnectionLayer/DataConnectionOptions.cs index 57b58b4..c7c8f6e 100644 --- a/src/ScadaLink.DataConnectionLayer/DataConnectionOptions.cs +++ b/src/ScadaLink.DataConnectionLayer/DataConnectionOptions.cs @@ -13,4 +13,11 @@ public class DataConnectionOptions /// Timeout for synchronous write operations to devices. public TimeSpan WriteTimeout { get; set; } = TimeSpan.FromSeconds(30); + + /// + /// Minimum time a connection must stay up before it is considered stable. + /// If a connection drops before this threshold, it counts as an unstable + /// disconnect toward the failover retry count (DataConnectionLayer-009). + /// + public TimeSpan StableConnectionThreshold { get; set; } = TimeSpan.FromSeconds(60); } diff --git a/tests/ScadaLink.DataConnectionLayer.Tests/DataConnectionActorTests.cs b/tests/ScadaLink.DataConnectionLayer.Tests/DataConnectionActorTests.cs index 58a3891..4e20479 100644 --- a/tests/ScadaLink.DataConnectionLayer.Tests/DataConnectionActorTests.cs +++ b/tests/ScadaLink.DataConnectionLayer.Tests/DataConnectionActorTests.cs @@ -612,6 +612,197 @@ public class DataConnectionActorTests : TestKit Assert.Contains("timeout", response.ErrorMessage, StringComparison.OrdinalIgnoreCase); } + // ── DataConnectionLayer-006: quality counters must not drift after unsubscribe/reconnect ── + + [Fact] + public async Task DCL006_DisconnectAfterUnsubscribe_BadQualityCountMatchesRemainingTags() + { + // Regression test for DataConnectionLayer-006. _lastTagQuality and the three + // quality counters were never cleaned up on unsubscribe, so a tag removed via + // HandleUnsubscribe lingered in _lastTagQuality. PushBadQualityForAllTags then + // set _tagsBadQuality = _lastTagQuality.Count, counting the dropped tag and + // drifting the bad-quality count above the number of currently subscribed tags. + var callbacks = new System.Collections.Concurrent.ConcurrentDictionary(); + var connectCount = 0; + var reconnectGate = new TaskCompletionSource(); + // First connect succeeds; the reconnect after the disconnect hangs so the actor + // stays in Reconnecting and ReSubscribeAll does not run before the assertion. + _mockAdapter.ConnectAsync(Arg.Any>(), Arg.Any()) + .Returns(_ => Interlocked.Increment(ref connectCount) == 1 + ? Task.CompletedTask + : reconnectGate.Task); + _mockAdapter.Status.Returns(ConnectionHealth.Connected); + _mockAdapter.SubscribeAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(ci => + { + callbacks[(string)ci[0]] = (SubscriptionCallback)ci[1]; + return Task.FromResult("sub-" + (string)ci[0]); + }); + _mockAdapter.ReadAsync(Arg.Any(), Arg.Any()) + .Returns(new ReadResult(false, null, null)); + + var actor = CreateConnectionActor("dcl006-drift"); + await Task.Delay(300); + + // Two instances, one tag each. + actor.Tell(new SubscribeTagsRequest("c1", "instA", "dcl006-drift", ["tagA"], DateTimeOffset.UtcNow)); + ExpectMsg(TimeSpan.FromSeconds(5)); + actor.Tell(new SubscribeTagsRequest("c2", "instB", "dcl006-drift", ["tagB"], DateTimeOffset.UtcNow)); + ExpectMsg(TimeSpan.FromSeconds(5)); + + // Push a Good value for each tag so both land in _lastTagQuality. + AwaitCondition(() => callbacks.ContainsKey("tagA") && callbacks.ContainsKey("tagB"), + TimeSpan.FromSeconds(3)); + callbacks["tagA"]("tagA", new TagValue(1, QualityCode.Good, DateTimeOffset.UtcNow)); + callbacks["tagB"]("tagB", new TagValue(2, QualityCode.Good, DateTimeOffset.UtcNow)); + await Task.Delay(200); + + // Unsubscribe instance B — tagB is no longer subscribed by anyone. + actor.Tell(new UnsubscribeTagsRequest("c3", "instB", "dcl006-drift", DateTimeOffset.UtcNow)); + await Task.Delay(200); + + _mockHealthCollector.ClearReceivedCalls(); + + // Disconnect — PushBadQualityForAllTags runs (the reconnect hangs on the gate, + // so the actor stays in Reconnecting and ReSubscribeAll does not run). + RaiseDisconnected(_mockAdapter); + await Task.Delay(300); + + // PushBadQualityForAllTags must report exactly 1 bad tag (only tagA is still + // subscribed). Pre-fix tagB lingered in _lastTagQuality and bad was reported as 2. + var qualityCall = _mockHealthCollector.ReceivedCalls() + .Where(c => c.GetMethodInfo().Name == "UpdateTagQuality") + .FirstOrDefault(); + Assert.NotNull(qualityCall); + var args = qualityCall!.GetArguments(); + var bad = (int)args[2]!; + Assert.Equal(1, bad); + + reconnectGate.SetCanceled(); + } + + // ── DataConnectionLayer-010: tag-resolution retry must not double-dispatch ── + + [Fact] + public async Task DCL010_TagResolutionRetry_DoesNotIssueDuplicateConcurrentSubscribes() + { + // Regression test for DataConnectionLayer-010. HandleRetryTagResolution fired a + // SubscribeAsync for every unresolved tag without removing it from _unresolvedTags + // first. A slow SubscribeAsync overlapping the next retry tick produced duplicate + // concurrent subscribe attempts for the same tag, leaking the first monitored + // item / subscription id. After the fix a tag in flight is excluded from the + // next retry until its attempt completes. + _options.TagResolutionRetryInterval = TimeSpan.FromMilliseconds(100); + + var subscribeGate = new TaskCompletionSource(); + var subscribeCalls = 0; + _mockAdapter.ConnectAsync(Arg.Any>(), Arg.Any()) + .Returns(Task.CompletedTask); + _mockAdapter.Status.Returns(ConnectionHealth.Connected); + _mockAdapter.SubscribeAsync("slow/tag", Arg.Any(), Arg.Any()) + .Returns(ci => + { + var n = Interlocked.Increment(ref subscribeCalls); + // First call (initial subscribe) fails genuinely → unresolved. + if (n == 1) return Task.FromException(new KeyNotFoundException("not found yet")); + // Subsequent calls are retry attempts — block on the gate so they stay + // in flight across multiple retry ticks. + return subscribeGate.Task; + }); + _mockAdapter.ReadAsync(Arg.Any(), Arg.Any()) + .Returns(new ReadResult(false, null, null)); + + var actor = CreateConnectionActor("dcl010-retry"); + await Task.Delay(300); + + actor.Tell(new SubscribeTagsRequest("c1", "inst1", "dcl010-retry", ["slow/tag"], DateTimeOffset.UtcNow)); + // Initial subscribe fails → bad-quality push then ack. + ExpectMsg(TimeSpan.FromSeconds(5)); + ExpectMsg(TimeSpan.FromSeconds(5)); + + // Let several retry ticks (100ms each) elapse while the first retry is blocked. + await Task.Delay(600); + + // Exactly one retry attempt should be in flight: 1 initial + 1 retry = 2 total. + // Pre-fix, every 100ms tick dispatched another → far more than 2. + Assert.Equal(2, Volatile.Read(ref subscribeCalls)); + + subscribeGate.SetCanceled(); + } + + // ── DataConnectionLayer-011: stale callbacks from a disposed adapter must be dropped ── + + [Fact] + public async Task DCL011_StaleTagValueFromOldAdapter_IsNotForwardedAfterFailover() + { + // Regression test for DataConnectionLayer-011. On failover the old adapter is + // disposed and a fresh one created, but the old adapter's subscription callbacks + // captured Self and keep Telling TagValueReceived. With no per-adapter generation + // tag, a value from the disposed adapter delivered after the actor is Connected + // on the new endpoint would be forwarded to the Instance Actor, mixing + // pre-failover device data with the active endpoint's data. + var primaryConfig = new Dictionary { ["Endpoint"] = "opc.tcp://primary:4840" }; + var backupConfig = new Dictionary { ["Endpoint"] = "opc.tcp://backup:4840" }; + var primaryAdapter = Substitute.For(); + var backupAdapter = Substitute.For(); + + SubscriptionCallback? primaryCallback = null; + + var primaryConnectCount = 0; + primaryAdapter.ConnectAsync(Arg.Any>(), Arg.Any()) + .Returns(_ => Interlocked.Increment(ref primaryConnectCount) == 1 + ? Task.CompletedTask + : Task.FromException(new Exception("Primary down"))); + primaryAdapter.Status.Returns(ConnectionHealth.Connected); + primaryAdapter.SubscribeAsync("sensor/temp", Arg.Any(), Arg.Any()) + .Returns(ci => + { + primaryCallback = (SubscriptionCallback)ci[1]; + return Task.FromResult("sub-primary"); + }); + primaryAdapter.ReadAsync(Arg.Any(), Arg.Any()) + .Returns(new ReadResult(false, null, null)); + + _mockFactory.Create("OpcUa", Arg.Is>(d => d["Endpoint"] == "opc.tcp://backup:4840")) + .Returns(backupAdapter); + backupAdapter.ConnectAsync(Arg.Any>(), Arg.Any()) + .Returns(Task.CompletedTask); + backupAdapter.Status.Returns(ConnectionHealth.Connected); + backupAdapter.SubscribeAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns("sub-backup"); + backupAdapter.ReadAsync(Arg.Any(), Arg.Any()) + .Returns(new ReadResult(false, null, null)); + + var actor = CreateFailoverActor(primaryAdapter, "dcl011-stale", primaryConfig, backupConfig, failoverRetryCount: 1); + + AwaitCondition(() => primaryConnectCount >= 1, TimeSpan.FromSeconds(2)); + await Task.Delay(200); + + actor.Tell(new SubscribeTagsRequest("c1", TestActor.Path.Name, "dcl011-stale", ["sensor/temp"], DateTimeOffset.UtcNow)); + ExpectMsg(TimeSpan.FromSeconds(3)); + AwaitCondition(() => primaryCallback != null, TimeSpan.FromSeconds(3)); + + // Fail over to backup. + RaiseDisconnected(primaryAdapter); + + // The disconnect pushes a bad-quality ConnectionQualityChanged to the subscriber. + ExpectMsg(TimeSpan.FromSeconds(3)); + + AwaitCondition(() => + backupAdapter.ReceivedCalls().Any(c => c.GetMethodInfo().Name == "SubscribeAsync"), + TimeSpan.FromSeconds(5)); + await Task.Delay(300); // actor is Connected on backup + + // Drain any value updates produced by the re-subscribe path. + ExpectNoMsg(TimeSpan.FromMilliseconds(300)); + + // The disposed primary adapter's callback fires a stale value. + primaryCallback!("sensor/temp", new TagValue(999, QualityCode.Good, DateTimeOffset.UtcNow)); + + // That stale value must NOT reach the subscriber. + ExpectNoMsg(TimeSpan.FromSeconds(1)); + } + [Fact] public async Task DCL001_SubscribeWithFailedTags_CountsResolvedAndUnresolvedSeparately() { diff --git a/tests/ScadaLink.DataConnectionLayer.Tests/OpcUaDataConnectionTests.cs b/tests/ScadaLink.DataConnectionLayer.Tests/OpcUaDataConnectionTests.cs index 2909ae3..f8187e8 100644 --- a/tests/ScadaLink.DataConnectionLayer.Tests/OpcUaDataConnectionTests.cs +++ b/tests/ScadaLink.DataConnectionLayer.Tests/OpcUaDataConnectionTests.cs @@ -37,6 +37,44 @@ public class RealOpcUaClientThreadSafetyTests } } +/// +/// DataConnectionLayer-012: secure-by-default certificate handling. +/// +public class OpcUaCertificateDefaultTests +{ + [Fact] + public void DCL012_OpcUaConnectionOptions_AutoAcceptUntrustedCerts_DefaultsToFalse() + { + // Regression test for DataConnectionLayer-012. AutoAcceptUntrustedCerts defaulted + // to true, accepting every server certificate unconditionally and defeating the + // Sign / SignAndEncrypt security modes against an active man-in-the-middle. A + // secure-by-default posture rejects untrusted certs unless explicitly opted in. + var options = new OpcUaConnectionOptions(); + Assert.False(options.AutoAcceptUntrustedCerts); + } +} + +/// +/// DataConnectionLayer-009: failover-stability tunables must be configurable. +/// +public class DataConnectionOptionsStabilityTests +{ + [Fact] + public void DCL009_StableConnectionThreshold_IsConfigurable_WithSixtySecondDefault() + { + // Regression test for DataConnectionLayer-009. The unstable-disconnect failover + // path used a hard-coded 60s StableConnectionThreshold constant inside + // DataConnectionActor. It must live on DataConnectionOptions like the other + // tunables (ReconnectInterval, TagResolutionRetryInterval, WriteTimeout) so it + // is configurable via appsettings.json. + var options = new DataConnectionOptions(); + Assert.Equal(TimeSpan.FromSeconds(60), options.StableConnectionThreshold); + + options.StableConnectionThreshold = TimeSpan.FromSeconds(30); + Assert.Equal(TimeSpan.FromSeconds(30), options.StableConnectionThreshold); + } +} + /// /// WP-7: Tests for OPC UA adapter. /// @@ -162,6 +200,36 @@ public class OpcUaDataConnectionTests Assert.All(results.Values, r => Assert.True(r.Success)); } + [Fact] + public async Task DCL007_ReadBatch_ReturnsPerTagResults_WhenOneTagFails() + { + // Regression test for DataConnectionLayer-007. ReadBatchAsync looped calling + // ReadAsync per tag; ReadAsync re-throws any non-cancellation exception, so a + // single failing tag aborted the whole batch and the caller got NO results for + // the tags that did read successfully — even though ReadResult already carries + // a per-tag Success/ErrorMessage shape. After the fix the batch catches per-tag + // exceptions and returns a complete map. + _mockClient.IsConnected.Returns(true); + _mockClient.ReadValueAsync("good1", Arg.Any()) + .Returns((1.0, DateTime.UtcNow, 0u)); + _mockClient.ReadValueAsync("bad", Arg.Any()) + .Returns<(object?, DateTime, uint)>(_ => throw new InvalidOperationException("node not found")); + _mockClient.ReadValueAsync("good2", Arg.Any()) + .Returns((2.0, DateTime.UtcNow, 0u)); + + await _adapter.ConnectAsync(new Dictionary()); + + var results = await _adapter.ReadBatchAsync(["good1", "bad", "good2"]); + + // Every requested tag is present in the result map. + Assert.Equal(3, results.Count); + Assert.True(results["good1"].Success); + Assert.True(results["good2"].Success); + // The failing tag is reported as a failed ReadResult, not by aborting the batch. + Assert.False(results["bad"].Success); + Assert.NotNull(results["bad"].ErrorMessage); + } + [Fact] public async Task NotConnected_ThrowsOnOperations() {