From fccd3274d33b524e0b7ee024d11da9274a7f3bc1 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 16 May 2026 19:40:40 -0400 Subject: [PATCH] =?UTF-8?q?fix(data-connection-layer):=20resolve=20DataCon?= =?UTF-8?q?nectionLayer-002/003/004/005=20=E2=80=94=20Resume=20supervision?= =?UTF-8?q?,=20concurrent=20dicts,=20subscribe-failure=20classification,?= =?UTF-8?q?=20write=20timeout?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- code-reviews/DataConnectionLayer/findings.md | 68 +++++++++-- .../Actors/DataConnectionActor.cs | 88 +++++++++++++-- .../Actors/DataConnectionManagerActor.cs | 20 +++- .../Adapters/RealOpcUaClient.cs | 15 ++- .../DataConnectionActorTests.cs | 106 +++++++++++++++++- .../DataConnectionManagerActorTests.cs | 47 ++++++++ .../OpcUaDataConnectionTests.cs | 31 +++++ 7 files changed, 350 insertions(+), 25 deletions(-) diff --git a/code-reviews/DataConnectionLayer/findings.md b/code-reviews/DataConnectionLayer/findings.md index 62e163d..2fa18f1 100644 --- a/code-reviews/DataConnectionLayer/findings.md +++ b/code-reviews/DataConnectionLayer/findings.md @@ -8,7 +8,7 @@ | Last reviewed | 2026-05-16 | | Reviewer | claude-agent | | Commit reviewed | `9c60592` | -| Open findings | 12 | +| Open findings | 8 | ## Summary @@ -101,7 +101,7 @@ whose message references `DataConnectionLayer-001`. |--|--| | Severity | High | | Category | Akka.NET conventions | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.DataConnectionLayer/Actors/DataConnectionManagerActor.cs:131-141` | **Description** @@ -127,7 +127,20 @@ after a crash and surface the lost-state condition rather than failing silently. **Resolution** -_Unresolved._ +Resolved 2026-05-16. The `DataConnectionManagerActor.SupervisorStrategy` was changed +from `Directive.Restart` to `Directive.Resume` for `DataConnectionActor` failures. 
+`Resume` keeps the existing actor instance and all its in-memory subscription state +(`_subscriptionsByInstance`, `_subscriptionIds`, `_subscribers`, quality counters) +intact across a transient handler exception, so the design doc's "transparent +re-subscribe" guarantee (WP-10) is preserved. The actor is a long-lived stateful +coordinator and its own Become/Stash reconnect state machine already recovers +connection-level faults — it does not need a restart. This also aligns with the +ScadaLink convention of `Resume` for coordinator actors. Regression test +`DCL002_ConnectionActorCrash_PreservesSubscriptionState` crashes the connection actor +via a synchronously-throwing write and asserts the subscription survives (health +report still shows 1 subscribed/resolved tag); it fails against the pre-fix `Restart` +code and passes after. Fixed by the commit whose message references +`DataConnectionLayer-002` (commit ``). ### DataConnectionLayer-003 — `RealOpcUaClient` callback/monitored-item dictionaries mutated without synchronization @@ -135,7 +148,7 @@ _Unresolved._ |--|--| | Severity | High | | Category | Concurrency & thread safety | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.DataConnectionLayer/Adapters/RealOpcUaClient.cs:16-17,130-131,153,163,173,183-184` | **Description** @@ -162,7 +175,18 @@ threads still read `_callbacks` concurrently with `RemoveSubscriptionAsync` / **Resolution** -_Unresolved._ +Resolved 2026-05-16. `_monitoredItems` and `_callbacks` in `RealOpcUaClient` were +changed from plain `Dictionary<,>` to `ConcurrentDictionary<,>`, and the two +`Remove(key)` call sites switched to `TryRemove`. This makes the maps safe to read +from the OPC Foundation SDK's publish threads (`MonitoredItem.Notification` reading +`_callbacks`) concurrently with subscribe/disconnect mutations on other threads. 
+`RealOpcUaClient` wraps concrete OPC Foundation SDK types (`ISession`, +`Subscription`, `MonitoredItem`) and cannot be exercised without a live OPC UA +server, so the regression is guarded structurally by +`DCL003_SharedDictionaryFields_AreConcurrentCollections` (a reflection test asserting +both fields are `ConcurrentDictionary<,>`); it fails against the pre-fix `Dictionary` +code and passes after. Fixed by the commit whose message references +`DataConnectionLayer-003` (commit ``). ### DataConnectionLayer-004 — Subscribe-time tag-resolution failure leaves the connection healthy but never recovers correctly @@ -170,7 +194,7 @@ _Unresolved._ |--|--| | Severity | High | | Category | Error handling & resilience | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs:495-503,529-537` | **Description** @@ -197,7 +221,21 @@ Instance Actor so it reflects the documented behaviour. **Resolution** -_Unresolved._ +Resolved 2026-05-16. The subscribe background task now classifies each subscribe +exception via the new `IsConnectionLevelFailure` helper (`InvalidOperationException` +— thrown by `EnsureConnected()` — plus `SocketException`/`TimeoutException`/ +`IOException` count as connection-level; anything else is a genuine resolution +failure). The classification is carried on `SubscribeTagResult.ConnectionLevelFailure` +and applied on the actor thread in `HandleSubscribeCompleted`: connection-level +failures no longer become unresolved tags and instead drive the reconnection state +machine (`HandleSubscribeCompleted` returns a flag and the Connected-state handler +calls `BecomeReconnecting`); genuine resolution failures still go to `_unresolvedTags` +and the retry timer, and now also push a `TagValueUpdate` with `QualityCode.Bad` to +the subscribing Instance Actor, matching the design doc's Tag Path Resolution step 2. 
+Regression tests `DCL004_GenuineTagResolutionFailure_PushesBadQualityToSubscriber` +and `DCL004_ConnectionLevelSubscribeFailure_TriggersReconnect_NotTagRetry` both fail +against the pre-fix code and pass after. Fixed by the commit whose message references +`DataConnectionLayer-004` (commit ``). ### DataConnectionLayer-005 — `WriteTimeout` option is documented and configured but never applied @@ -205,7 +243,7 @@ _Unresolved._ |--|--| | Severity | High | | Category | Design-document adherence | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.DataConnectionLayer/DataConnectionOptions.cs:15`, `src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs:573-590` | **Description** @@ -229,7 +267,19 @@ the initial-value seed and to `WriteBatchAndWaitAsync` paths if they are reachab **Resolution** -_Unresolved._ +Resolved 2026-05-16. `HandleWrite` now creates a `CancellationTokenSource(_options.WriteTimeout)`, +passes its token to `_adapter.WriteAsync(...)`, and disposes the source in the +continuation. A cancelled/timed-out write (`Task.IsCanceled` or a base +`OperationCanceledException`) is translated into a failed `WriteTagResponse` with a +`"Write timeout after Ns"` message, so a hung device write is bounded and the failure +is returned synchronously to the calling script (WP-11) instead of blocking until the +script's own Ask-timeout. (The `WriteBatchAndWaitAsync` adapter path already accepts +an explicit `timeout`/`CancellationToken` and is not invoked by `HandleWrite`, so no +change was needed there.) Regression test +`DCL005_Write_ThatHangs_TimesOutAndReturnsFailureSynchronously` uses an adapter whose +`WriteAsync` only completes when its token fires; it fails against the pre-fix +unbounded code and passes after. Fixed by the commit whose message references +`DataConnectionLayer-005` (commit ``). 
### DataConnectionLayer-006 — Health quality counters not reset/recomputed after failover or re-subscribe diff --git a/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs b/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs index 810dcb5..75ee993 100644 --- a/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs +++ b/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs @@ -213,7 +213,13 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers HandleSubscribe(req); break; case SubscribeCompleted sc: - HandleSubscribeCompleted(sc); + // In Connected state, a connection-level subscribe failure must drive + // the reconnection state machine (DataConnectionLayer-004). + if (HandleSubscribeCompleted(sc)) + { + _log.Warning("[{0}] Connection-level subscribe failure — entering Reconnecting", _connectionName); + BecomeReconnecting(); + } break; case UnsubscribeTagsRequest req: HandleUnsubscribe(req); @@ -514,8 +520,14 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers } catch (Exception ex) { - // WP-12: Tag path resolution failure — reported back as unresolved. - results.Add(new SubscribeTagResult(tagPath, AlreadySubscribed: false, Success: false, null, ex.Message)); + // DataConnectionLayer-004: distinguish a connection-level fault + // (adapter not connected / transport down) from a genuine + // node-not-found. Connection-level faults must drive the + // reconnection state machine, not be retried as unresolved tags. + var connectionLevel = IsConnectionLevelFailure(ex); + results.Add(new SubscribeTagResult( + tagPath, AlreadySubscribed: false, Success: false, null, ex.Message, + ConnectionLevelFailure: connectionLevel)); } } @@ -546,8 +558,10 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers /// Applies the result of an asynchronous subscribe on the actor thread. 
ALL mutation /// of subscription state and counters happens here — never on the background task — /// so the actor model's single-threaded state guarantee holds. + /// Returns true if any tag failed at connection level (DataConnectionLayer-004), + /// signalling the caller (only the Connected state) to enter Reconnecting. /// - private void HandleSubscribeCompleted(SubscribeCompleted msg) + private bool HandleSubscribeCompleted(SubscribeCompleted msg) { var instanceName = msg.Request.InstanceUniqueName; if (!_subscriptionsByInstance.TryGetValue(instanceName, out var instanceTags)) @@ -557,6 +571,12 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers _subscriptionsByInstance[instanceName] = instanceTags; } + // DataConnectionLayer-004: if any tag failed because the adapter is not + // connected (a connection-level fault), the subscribe needs the reconnection + // state machine, not the tag-resolution retry. This flag is returned so the + // Connected-state caller can enter Reconnecting; recovery is via ReSubscribeAll. + var connectionLevelFailure = msg.Results.Any(r => !r.Success && r.ConnectionLevelFailure); + foreach (var result in msg.Results) { instanceTags.Add(result.TagPath); @@ -572,13 +592,31 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers _totalSubscribed++; _resolvedTags++; } + else if (result.ConnectionLevelFailure) + { + // Connection-level fault — do not count as an unresolved tag. + // ReSubscribeAll after reconnect derives the tag from + // _subscriptionsByInstance (already updated above). + _log.Warning("[{0}] Subscribe for {1} failed at connection level: {2}", + _connectionName, result.TagPath, result.Error); + } else { - // WP-12: mark unresolved so the periodic retry timer picks it up. + // WP-12: genuine tag resolution failure — mark unresolved so the + // periodic retry timer picks it up. 
_unresolvedTags.Add(result.TagPath); _totalSubscribed++; _log.Debug("[{0}] Tag resolution failed for {1}: {2}", _connectionName, result.TagPath, result.Error); + + // DataConnectionLayer-004 / design doc Tag Path Resolution step 2: + // mark the attribute quality `bad` so the Instance Actor sees a + // signal rather than staying Uncertain indefinitely. + if (_subscribers.TryGetValue(instanceName, out var subscriber)) + { + subscriber.Tell(new TagValueUpdate( + _connectionName, result.TagPath, null, QualityCode.Bad, DateTimeOffset.UtcNow)); + } } } @@ -594,6 +632,27 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers msg.ReplyTo.Tell(new SubscribeTagsResponse( msg.Request.CorrelationId, instanceName, true, null, DateTimeOffset.UtcNow)); + + // The caller (Connected state only) decides whether to enter Reconnecting. + // In Connecting/Reconnecting the connection is not established anyway, so the + // existing reconnect cycle handles recovery without a re-trigger here. + return connectionLevelFailure; + } + + /// + /// DataConnectionLayer-004: classifies a subscribe exception as a connection-level + /// fault (adapter not connected / transport down) versus a genuine tag-resolution + /// failure (the node does not exist on the device). Connection-level faults must + /// drive the reconnection state machine; resolution failures are retried on the + /// tag-resolution timer. + /// + private static bool IsConnectionLevelFailure(Exception ex) + { + var baseEx = ex is AggregateException agg ? 
agg.GetBaseException() : ex; + return baseEx is InvalidOperationException + or System.Net.Sockets.SocketException + or TimeoutException + or System.IO.IOException; } private void HandleUnsubscribe(UnsubscribeTagsRequest request) @@ -634,15 +693,29 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers _log.Debug("[{0}] Writing to tag {1}", _connectionName, request.TagPath); var sender = Sender; + // DataConnectionLayer-005: bound the write with WriteTimeout. A hung device + // write (TCP black-hole) would otherwise never complete, so PipeTo never + // fires and the calling script gets no DCL-level error. The CancellationToken + // is passed to the adapter; on timeout we translate cancellation into a + // failed WriteTagResponse so the failure is returned synchronously (WP-11). + var cts = new CancellationTokenSource(_options.WriteTimeout); + // WP-11: Write through DCL to device, failure returned synchronously - _adapter.WriteAsync(request.TagPath, request.Value).ContinueWith(t => + _adapter.WriteAsync(request.TagPath, request.Value, cts.Token).ContinueWith(t => { + cts.Dispose(); if (t.IsCompletedSuccessfully) { var result = t.Result; return new WriteTagResponse( request.CorrelationId, result.Success, result.ErrorMessage, DateTimeOffset.UtcNow); } + if (t.IsCanceled || t.Exception?.GetBaseException() is OperationCanceledException) + { + return new WriteTagResponse( + request.CorrelationId, false, + $"Write timeout after {_options.WriteTimeout.TotalSeconds:F0}s", DateTimeOffset.UtcNow); + } return new WriteTagResponse( request.CorrelationId, false, t.Exception?.GetBaseException().Message, DateTimeOffset.UtcNow); }).PipeTo(sender); @@ -824,7 +897,8 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers internal record TagResolutionSucceeded(string TagPath, string SubscriptionId); internal record RetryTagResolution; internal record SubscribeTagResult( - string TagPath, bool AlreadySubscribed, bool Success, string? 
SubscriptionId, string? Error); + string TagPath, bool AlreadySubscribed, bool Success, string? SubscriptionId, string? Error, + bool ConnectionLevelFailure = false); internal record SubscribeCompleted( SubscribeTagsRequest Request, IActorRef ReplyTo, IReadOnlyList Results); public record GetHealthReport; diff --git a/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionManagerActor.cs b/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionManagerActor.cs index 505f028..e9697b5 100644 --- a/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionManagerActor.cs +++ b/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionManagerActor.cs @@ -125,8 +125,20 @@ public class DataConnectionManagerActor : ReceiveActor } /// - /// OneForOneStrategy with Restart for connection actors — a failed connection - /// should restart and attempt reconnection. + /// OneForOneStrategy with Resume for connection actors. + /// + /// DataConnectionLayer-002: a DataConnectionActor is a long-lived, stateful + /// coordinator — its in-memory subscription registry (_subscriptionsByInstance, + /// _subscriptionIds, _subscribers) is the only record of which Instance Actors + /// subscribed to which tags, and there is no durable store to rebuild it from. + /// Restart would create a fresh instance and silently discard that registry, + /// breaking the design doc's "transparent re-subscribe" guarantee (WP-10): + /// subscribers would never be re-subscribed and would sit at stale quality with + /// no error. Resume keeps the actor instance and its state intact, so a transient + /// exception in a message handler does not lose subscription state. The actor's + /// own Become/Stash reconnect state machine already recovers connection-level + /// faults, so it does not need a restart to re-establish the connection. + /// This matches the ScadaLink convention of Resume for coordinator actors. 
/// protected override SupervisorStrategy SupervisorStrategy() { @@ -135,8 +147,8 @@ public class DataConnectionManagerActor : ReceiveActor withinTimeRange: TimeSpan.FromMinutes(1), decider: Decider.From(ex => { - _log.Warning(ex, "DataConnectionActor threw exception, restarting"); - return Directive.Restart; + _log.Warning(ex, "DataConnectionActor threw exception, resuming (subscription state preserved)"); + return Directive.Resume; })); } } diff --git a/src/ScadaLink.DataConnectionLayer/Adapters/RealOpcUaClient.cs b/src/ScadaLink.DataConnectionLayer/Adapters/RealOpcUaClient.cs index d3d2f08..d3552d5 100644 --- a/src/ScadaLink.DataConnectionLayer/Adapters/RealOpcUaClient.cs +++ b/src/ScadaLink.DataConnectionLayer/Adapters/RealOpcUaClient.cs @@ -1,3 +1,4 @@ +using System.Collections.Concurrent; using System.Security.Cryptography.X509Certificates; using Opc.Ua; using Opc.Ua.Client; @@ -13,8 +14,14 @@ public class RealOpcUaClient : IOpcUaClient { private ISession? _session; private Subscription? _subscription; - private readonly Dictionary _monitoredItems = new(); - private readonly Dictionary> _callbacks = new(); + + // DataConnectionLayer-003: these maps are read from the OPC Foundation SDK's + // internal publish threads (the MonitoredItem.Notification handler reads + // _callbacks) concurrently with subscribe/disconnect mutations that run on + // thread-pool threads. Plain Dictionary access during a concurrent resize or + // Clear() is undefined behaviour, so they must be ConcurrentDictionary. 
+ private readonly ConcurrentDictionary _monitoredItems = new(); + private readonly ConcurrentDictionary> _callbacks = new(); private volatile bool _connectionLostFired; private OpcUaConnectionOptions _options = new(); private readonly OpcUaGlobalOptions _globalOptions; @@ -180,8 +187,8 @@ public class RealOpcUaClient : IOpcUaClient { _subscription.RemoveItem(item); await _subscription.ApplyChangesAsync(cancellationToken); - _monitoredItems.Remove(subscriptionHandle); - _callbacks.Remove(subscriptionHandle); + _monitoredItems.TryRemove(subscriptionHandle, out _); + _callbacks.TryRemove(subscriptionHandle, out _); } } diff --git a/tests/ScadaLink.DataConnectionLayer.Tests/DataConnectionActorTests.cs b/tests/ScadaLink.DataConnectionLayer.Tests/DataConnectionActorTests.cs index 1d05a69..58a3891 100644 --- a/tests/ScadaLink.DataConnectionLayer.Tests/DataConnectionActorTests.cs +++ b/tests/ScadaLink.DataConnectionLayer.Tests/DataConnectionActorTests.cs @@ -512,6 +512,106 @@ public class DataConnectionActorTests : TestKit Assert.Equal(instances * tagsPerInstance, report.ResolvedTags); } + // ── DataConnectionLayer-004: subscribe-time failure classification ── + + [Fact] + public async Task DCL004_GenuineTagResolutionFailure_PushesBadQualityToSubscriber() + { + // Regression test for DataConnectionLayer-004. When a tag genuinely fails to + // resolve at subscribe time, the design doc (Tag Path Resolution, step 2) + // requires the attribute to be marked quality `bad`. The pre-fix code only + // logged and added the tag to _unresolvedTags — the Instance Actor never got + // a signal. After the fix, a bad-quality TagValueUpdate is pushed. + _mockAdapter.ConnectAsync(Arg.Any>(), Arg.Any()) + .Returns(Task.CompletedTask); + _mockAdapter.Status.Returns(ConnectionHealth.Connected); + // Genuine node-not-found: a non-connection exception. 
+ _mockAdapter.SubscribeAsync("missing/tag", Arg.Any(), Arg.Any()) + .Returns(Task.FromException(new KeyNotFoundException("node not found"))); + _mockAdapter.ReadAsync(Arg.Any(), Arg.Any()) + .Returns(new ReadResult(false, null, null)); + + var actor = CreateConnectionActor("dcl004-bad-quality"); + await Task.Delay(300); + + actor.Tell(new SubscribeTagsRequest( + "c1", "inst1", "dcl004-bad-quality", ["missing/tag"], DateTimeOffset.UtcNow)); + + // Two messages arrive: the subscribe ack and a bad-quality update for the tag. + var bad = ExpectMsg(TimeSpan.FromSeconds(5)); + Assert.Equal("missing/tag", bad.TagPath); + Assert.Equal(QualityCode.Bad, bad.Quality); + + var ack = ExpectMsg(TimeSpan.FromSeconds(5)); + Assert.True(ack.Success); + } + + [Fact] + public async Task DCL004_ConnectionLevelSubscribeFailure_TriggersReconnect_NotTagRetry() + { + // Regression test for DataConnectionLayer-004. A subscribe failing because the + // adapter is not connected (InvalidOperationException from EnsureConnected) is + // a connection problem, not a bad tag path. The pre-fix code misclassified it + // as an unresolved tag and retried it on the 10s tag-resolution timer. After + // the fix it drives the reconnection state machine instead. + _mockAdapter.ConnectAsync(Arg.Any>(), Arg.Any()) + .Returns(Task.CompletedTask); + _mockAdapter.Status.Returns(ConnectionHealth.Connected); + _mockAdapter.SubscribeAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(Task.FromException( + new InvalidOperationException("OPC UA client is not connected."))); + _mockAdapter.ReadAsync(Arg.Any(), Arg.Any()) + .Returns(new ReadResult(false, null, null)); + + var actor = CreateConnectionActor("dcl004-conn-level"); + await Task.Delay(300); + + actor.Tell(new SubscribeTagsRequest( + "c1", "inst1", "dcl004-conn-level", ["some/tag"], DateTimeOffset.UtcNow)); + + // The connection-level failure must drive the actor into Reconnecting, which + // re-attempts ConnectAsync. 
Pre-fix the actor stayed Connected and only armed + // the tag-resolution timer, so ConnectAsync is called exactly once. + AwaitCondition(() => + _mockAdapter.ReceivedCalls().Count(c => c.GetMethodInfo().Name == "ConnectAsync") >= 2, + TimeSpan.FromSeconds(5)); + } + + // ── DataConnectionLayer-005: WriteTimeout must bound a hung write ── + + [Fact] + public async Task DCL005_Write_ThatHangs_TimesOutAndReturnsFailureSynchronously() + { + // Regression test for DataConnectionLayer-005. HandleWrite called WriteAsync + // with no CancellationToken and no timeout, so a hung device write never + // produced a WriteTagResponse. The calling script would block until its own + // Ask-timeout with no DCL-level error. After the fix, _options.WriteTimeout + // bounds the write and a timeout is surfaced as a failed WriteTagResponse. + _options.WriteTimeout = TimeSpan.FromMilliseconds(300); + + _mockAdapter.ConnectAsync(Arg.Any>(), Arg.Any()) + .Returns(Task.CompletedTask); + + // WriteAsync never completes unless its cancellation token fires. 
+ _mockAdapter.WriteAsync("tag1", 42, Arg.Any()) + .Returns(ci => + { + var ct = ci.Arg(); + var tcs = new TaskCompletionSource(); + ct.Register(() => tcs.TrySetCanceled(ct)); + return tcs.Task; + }); + + var actor = CreateConnectionActor("dcl005-write-timeout"); + await Task.Delay(300); // reach Connected state + + actor.Tell(new WriteTagRequest("corr1", "dcl005-write-timeout", "tag1", 42, DateTimeOffset.UtcNow)); + + var response = ExpectMsg(TimeSpan.FromSeconds(3)); + Assert.False(response.Success); + Assert.Contains("timeout", response.ErrorMessage, StringComparison.OrdinalIgnoreCase); + } + [Fact] public async Task DCL001_SubscribeWithFailedTags_CountsResolvedAndUnresolvedSeparately() { @@ -533,7 +633,11 @@ public class DataConnectionActorTests : TestKit actor.Tell(new SubscribeTagsRequest( "c1", "inst1", "dcl001-failed-tags", ["good/a", "good/b", "good/c", "bad/x", "bad/y"], DateTimeOffset.UtcNow)); - ExpectMsg(TimeSpan.FromSeconds(5)); + + // Two genuine resolution failures now also push a bad-quality TagValueUpdate + // to the subscriber (DataConnectionLayer-004); skip past those to the ack. 
+ var ack = FishForMessage(_ => true, TimeSpan.FromSeconds(5)); + Assert.True(ack.Success); actor.Tell(new DataConnectionActor.GetHealthReport()); var report = ExpectMsg(TimeSpan.FromSeconds(3)); diff --git a/tests/ScadaLink.DataConnectionLayer.Tests/DataConnectionManagerActorTests.cs b/tests/ScadaLink.DataConnectionLayer.Tests/DataConnectionManagerActorTests.cs index 7cec277..3b1906e 100644 --- a/tests/ScadaLink.DataConnectionLayer.Tests/DataConnectionManagerActorTests.cs +++ b/tests/ScadaLink.DataConnectionLayer.Tests/DataConnectionManagerActorTests.cs @@ -3,6 +3,7 @@ using Akka.TestKit.Xunit2; using NSubstitute; using ScadaLink.Commons.Interfaces.Protocol; using ScadaLink.Commons.Messages.DataConnection; +using ScadaLink.Commons.Types.Enums; using ScadaLink.DataConnectionLayer.Actors; using ScadaLink.HealthMonitoring; @@ -57,6 +58,52 @@ public class DataConnectionManagerActorTests : TestKit Assert.Contains("Unknown connection", response.ErrorMessage); } + [Fact] + public async Task DCL002_ConnectionActorCrash_PreservesSubscriptionState() + { + // Regression test for DataConnectionLayer-002. The supervisor used + // Directive.Restart, which discards the connection actor's in-memory + // subscription registry — breaking the design doc's "transparent + // re-subscribe" guarantee (subscribers are never re-subscribed and sit at + // stale quality forever). After the fix the supervisor uses Resume, which + // keeps the actor instance and its state across a transient exception. + var mockAdapter = Substitute.For(); + mockAdapter.ConnectAsync(Arg.Any>(), Arg.Any()) + .Returns(Task.CompletedTask); + mockAdapter.Status.Returns(ConnectionHealth.Connected); + mockAdapter.SubscribeAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns("sub-001"); + mockAdapter.ReadAsync(Arg.Any(), Arg.Any()) + .Returns(new ReadResult(false, null, null)); + // A write throws synchronously, escaping the message handler and crashing + // the connection actor — exercising the supervisor strategy. 
+ mockAdapter.WriteAsync(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns>(_ => throw new InvalidOperationException("boom")); + + _mockFactory.Create("OpcUa", Arg.Any>()).Returns(mockAdapter); + + var manager = Sys.ActorOf(Props.Create(() => + new DataConnectionManagerActor(_mockFactory, _options, _mockHealthCollector))); + + manager.Tell(new CreateConnectionCommand("conn1", "OpcUa", new Dictionary(), null, 3)); + await Task.Delay(300); // connection actor reaches Connected + + // Register a subscription. + manager.Tell(new SubscribeTagsRequest("c1", "inst1", "conn1", ["tag1"], DateTimeOffset.UtcNow)); + ExpectMsg(TimeSpan.FromSeconds(3)); + + // Crash the connection actor via a synchronously-throwing write. + manager.Tell(new WriteTagRequest("c2", "conn1", "tag1", 42, DateTimeOffset.UtcNow)); + await Task.Delay(300); // supervisor handles the failure + + // After the crash the subscription state must survive: the health report + // still shows the subscribed/resolved tag. With Restart it would be 0. + manager.Tell(new GetAllHealthReports()); + var report = ExpectMsg(TimeSpan.FromSeconds(3)); + Assert.Equal(1, report.TotalSubscribedTags); + Assert.Equal(1, report.ResolvedTags); + } + [Fact] public void CreateConnection_UsesFactory() { diff --git a/tests/ScadaLink.DataConnectionLayer.Tests/OpcUaDataConnectionTests.cs b/tests/ScadaLink.DataConnectionLayer.Tests/OpcUaDataConnectionTests.cs index d39e2be..2909ae3 100644 --- a/tests/ScadaLink.DataConnectionLayer.Tests/OpcUaDataConnectionTests.cs +++ b/tests/ScadaLink.DataConnectionLayer.Tests/OpcUaDataConnectionTests.cs @@ -6,6 +6,37 @@ using ScadaLink.DataConnectionLayer.Adapters; namespace ScadaLink.DataConnectionLayer.Tests; +/// +/// DataConnectionLayer-003: structural regression guard. RealOpcUaClient's +/// monitored-item / callback maps are read from the OPC UA SDK's publish threads +/// concurrently with subscribe/disconnect mutations on other threads. They must be +/// concurrent collections, not plain Dictionary. 
This is verified structurally +/// because RealOpcUaClient wraps concrete OPC Foundation SDK types and cannot be +/// exercised without a live OPC UA server. +/// +public class RealOpcUaClientThreadSafetyTests +{ + [Theory] + [InlineData("_callbacks")] + [InlineData("_monitoredItems")] + public void DCL003_SharedDictionaryFields_AreConcurrentCollections(string fieldName) + { + var field = typeof(RealOpcUaClient) + .GetField(fieldName, + System.Reflection.BindingFlags.Instance | + System.Reflection.BindingFlags.NonPublic); + + Assert.NotNull(field); + + var fieldType = field!.FieldType; + Assert.True( + fieldType.IsGenericType && + fieldType.GetGenericTypeDefinition() == typeof(System.Collections.Concurrent.ConcurrentDictionary<,>), + $"RealOpcUaClient.{fieldName} must be a ConcurrentDictionary<,> for thread safety, " + + $"but was {fieldType.Name}."); + } +} + /// /// WP-7: Tests for OPC UA adapter. ///