using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using NSubstitute;
using ScadaLink.Commons.Interfaces.Protocol;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.DataConnectionLayer;
using ScadaLink.DataConnectionLayer.Adapters;

namespace ScadaLink.DataConnectionLayer.Tests;

/// <summary>
/// DataConnectionLayer-003: structural regression guard. RealOpcUaClient's
/// monitored-item / callback maps are read from the OPC UA SDK's publish threads
/// concurrently with subscribe/disconnect mutations on other threads. They must be
/// concurrent collections, not plain Dictionary. This is verified structurally
/// because RealOpcUaClient wraps concrete OPC Foundation SDK types and cannot be
/// exercised without a live OPC UA server.
/// </summary>
public class RealOpcUaClientThreadSafetyTests
{
    /// <summary>
    /// Looks up the named private instance field on RealOpcUaClient via reflection and
    /// asserts that its declared type is a closed ConcurrentDictionary&lt;,&gt;.
    /// </summary>
    /// <param name="fieldName">Name of the private instance field to inspect.</param>
    [Theory]
    [InlineData("_callbacks")]
    [InlineData("_monitoredItems")]
    public void DCL003_SharedDictionaryFields_AreConcurrentCollections(string fieldName)
    {
        var field = typeof(RealOpcUaClient)
            .GetField(fieldName, System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic);

        Assert.NotNull(field);

        var fieldType = field!.FieldType;
        Assert.True(
            fieldType.IsGenericType &&
            fieldType.GetGenericTypeDefinition() == typeof(System.Collections.Concurrent.ConcurrentDictionary<,>),
            $"RealOpcUaClient.{fieldName} must be a ConcurrentDictionary<,> for thread safety, " +
            $"but was {fieldType.Name}.");
    }
}

/// <summary>
/// DataConnectionLayer-012: secure-by-default certificate handling.
/// </summary>
public class OpcUaCertificateDefaultTests
{
    [Fact]
    public void DCL012_OpcUaConnectionOptions_AutoAcceptUntrustedCerts_DefaultsToFalse()
    {
        // Regression test for DataConnectionLayer-012. AutoAcceptUntrustedCerts defaulted
        // to true, accepting every server certificate unconditionally and defeating the
        // Sign / SignAndEncrypt security modes against an active man-in-the-middle. A
        // secure-by-default posture rejects untrusted certs unless explicitly opted in.
        var options = new OpcUaConnectionOptions();

        Assert.False(options.AutoAcceptUntrustedCerts);
    }
}

/// <summary>
/// DataConnectionLayer-014: the DCL-012 auto-accept-certificate security warning is
/// only effective if RealOpcUaClient is built with a real logger. The only production
/// path that constructs one is RealOpcUaClientFactory.Create(); that factory must
/// thread a logger through, otherwise the warning sinks into NullLogger and an
/// operator who enables AutoAcceptUntrustedCerts sees no signal anywhere.
/// </summary>
public class RealOpcUaClientFactoryLoggerTests
{
    /// <summary>
    /// Reads the private <c>_logger</c> instance field of the given client via reflection.
    /// </summary>
    /// <param name="client">Client whose logger field is inspected.</param>
    /// <returns>The value of the field, cast to <see cref="ILogger"/>.</returns>
    private static ILogger ReadLogger(RealOpcUaClient client)
    {
        var field = typeof(RealOpcUaClient).GetField(
            "_logger",
            System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic);

        Assert.NotNull(field);
        return (ILogger)field!.GetValue(client)!;
    }

    /// <summary>
    /// Asserts that <paramref name="logger"/> is neither the non-generic
    /// <see cref="NullLogger.Instance"/> singleton nor any closed
    /// <see cref="NullLogger{T}"/>.
    /// </summary>
    /// <param name="logger">Logger read back from the client under test.</param>
    private static void AssertIsRealLogger(ILogger logger)
    {
        Assert.NotSame(NullLogger.Instance, logger);

        // A reference comparison against the non-generic singleton alone would pass
        // vacuously if the client held a NullLogger<T>.Instance, so the generic null
        // logger is rejected by type as well.
        var loggerType = logger.GetType();
        Assert.False(
            loggerType.IsGenericType && loggerType.GetGenericTypeDefinition() == typeof(NullLogger<>),
            $"Expected a real logger but found {loggerType.Name}.");
    }

    [Fact]
    public void DCL014_RealOpcUaClientFactory_CreatesClientWithRealLogger()
    {
        // Regression test for DataConnectionLayer-014. RealOpcUaClientFactory.Create()
        // constructed `new RealOpcUaClient(_globalOptions)` with no logger, so the
        // DCL-012 man-in-the-middle warning was always discarded by NullLogger in
        // production. After the fix the factory accepts an ILoggerFactory and passes a
        // real ILogger into every client it creates.
        using var loggerFactory = LoggerFactory.Create(b => { });
        var factory = new RealOpcUaClientFactory(new OpcUaGlobalOptions(), loggerFactory);

        var client = factory.Create();

        var logger = ReadLogger((RealOpcUaClient)client);
        AssertIsRealLogger(logger);
    }

    [Fact]
    public void DCL014_DataConnectionFactory_ThreadsLoggerToRealOpcUaClient()
    {
        // The full production wiring: DataConnectionFactory holds an ILoggerFactory and
        // registers the OpcUa adapter. The RealOpcUaClient it ultimately builds must end
        // up with a real (non-Null) logger so the auto-accept-cert warning is visible.
        using var loggerFactory = LoggerFactory.Create(b => { });
        var dataConnectionFactory = new DataConnectionFactory(loggerFactory);

        // NOTE(review): the dictionary's type arguments were lost in this copy of the
        // file; <string, string> is assumed — confirm against DataConnectionFactory.Create.
        var adapter = (OpcUaDataConnection)dataConnectionFactory.Create(
            "OpcUa", new Dictionary<string, string>());

        // Reach the RealOpcUaClient the adapter would create on connect.
        var clientFactoryField = typeof(OpcUaDataConnection).GetField(
            "_clientFactory",
            System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic);
        Assert.NotNull(clientFactoryField);

        var clientFactory = (RealOpcUaClientFactory)clientFactoryField!.GetValue(adapter)!;
        var client = (RealOpcUaClient)clientFactory.Create();

        var logger = ReadLogger(client);
        AssertIsRealLogger(logger);
    }
}

/// <summary>
/// DataConnectionLayer-009: failover-stability tunables must be configurable.
/// </summary>
public class DataConnectionOptionsStabilityTests
{
    [Fact]
    public void DCL009_StableConnectionThreshold_IsConfigurable_WithSixtySecondDefault()
    {
        // Regression test for DataConnectionLayer-009. The unstable-disconnect failover
        // path used a hard-coded 60s StableConnectionThreshold constant inside
        // DataConnectionActor. It must live on DataConnectionOptions like the other
        // tunables (ReconnectInterval, TagResolutionRetryInterval, WriteTimeout) so it
        // is configurable via appsettings.json.
        var options = new DataConnectionOptions();

        Assert.Equal(TimeSpan.FromSeconds(60), options.StableConnectionThreshold);

        options.StableConnectionThreshold = TimeSpan.FromSeconds(30);
        Assert.Equal(TimeSpan.FromSeconds(30), options.StableConnectionThreshold);
    }
}

/// <summary>
/// WP-7: Tests for OPC UA adapter.
/// </summary>
public class OpcUaDataConnectionTests
{
    // NOTE(review): generic type arguments were lost in this copy of the file and have
    // been restored from what the file itself shows (field types, literal arguments,
    // exception types named in comments). The assumed ones are:
    //   - connection config: Dictionary<string, string>
    //   - write batch payload: Dictionary<string, object?>
    //   - client options argument: OpcUaConnectionOptions?
    //   - trailing client argument: CancellationToken
    //   - IOpcUaClient.ConnectionLost delegate: Action
    // Confirm each against IOpcUaClient / OpcUaDataConnection.

    private readonly IOpcUaClient _mockClient;
    private readonly IOpcUaClientFactory _mockFactory;
    private readonly OpcUaDataConnection _adapter;

    /// <summary>
    /// Builds a fresh adapter per test, wired to a substitute client factory that always
    /// hands out the same substitute client.
    /// </summary>
    public OpcUaDataConnectionTests()
    {
        _mockClient = Substitute.For<IOpcUaClient>();
        _mockFactory = Substitute.For<IOpcUaClientFactory>();
        _mockFactory.Create().Returns(_mockClient);
        _adapter = new OpcUaDataConnection(_mockFactory, NullLogger<OpcUaDataConnection>.Instance);
    }

    /// <summary>
    /// Shared arrange step: stubs the client as connected, then connects the adapter.
    /// </summary>
    /// <param name="config">Connection settings; an empty dictionary is used when null.</param>
    private async Task ConnectAdapterAsync(Dictionary<string, string>? config = null)
    {
        _mockClient.IsConnected.Returns(true);
        await _adapter.ConnectAsync(config ?? new Dictionary<string, string>());
    }

    [Fact]
    public async Task Connect_SetsStatusToConnected()
    {
        await ConnectAdapterAsync(new Dictionary<string, string>
        {
            ["EndpointUrl"] = "opc.tcp://localhost:4840"
        });

        Assert.Equal(ConnectionHealth.Connected, _adapter.Status);
        await _mockClient.Received(1).ConnectAsync(
            "opc.tcp://localhost:4840",
            Arg.Any<OpcUaConnectionOptions?>(),
            Arg.Any<CancellationToken>());
    }

    [Fact]
    public async Task Disconnect_SetsStatusToDisconnected()
    {
        await ConnectAdapterAsync();

        await _adapter.DisconnectAsync();

        Assert.Equal(ConnectionHealth.Disconnected, _adapter.Status);
    }

    [Fact]
    public async Task DCL013_ConcurrentConnectionLost_RaisesDisconnectedExactlyOnce()
    {
        // Regression test for DataConnectionLayer-013. RaiseDisconnected used a
        // non-atomic check-then-set on a volatile bool: two threads racing through it
        // (e.g. the keep-alive thread and a ReadAsync failure path, both routed via
        // OnClientConnectionLost) could both observe _disconnectFired == false and both
        // invoke Disconnected. The guard is now an atomic Interlocked.Exchange, so a
        // burst of concurrent connection-lost callbacks fires the event exactly once.

        // Repeat the burst: reconnecting between rounds re-arms the guard, so each
        // round must independently fire Disconnected exactly once. Repetition makes
        // the (timing-dependent) non-atomic race overwhelmingly likely to be caught.
        const int rounds = 25;
        const int threads = 32;

        for (var round = 0; round < rounds; round++)
        {
            await ConnectAdapterAsync();

            var fired = 0;
            void Handler() => Interlocked.Increment(ref fired);
            _adapter.Disconnected += Handler;

            // Fan out: many threads raise the client's ConnectionLost event together.
            using (var ready = new Barrier(threads))
            {
                // Every participant blocks in SignalAndWait until all of them have
                // arrived. LongRunning asks the scheduler for dedicated threads so the
                // round does not wait on the thread pool growing to `threads` workers.
                var tasks = Enumerable.Range(0, threads)
                    .Select(_ => Task.Factory.StartNew(
                        () =>
                        {
                            ready.SignalAndWait();
                            _mockClient.ConnectionLost += Raise.Event<Action>();
                        },
                        CancellationToken.None,
                        TaskCreationOptions.LongRunning,
                        TaskScheduler.Default))
                    .ToArray();

                await Task.WhenAll(tasks);
            }

            _adapter.Disconnected -= Handler;
            Assert.Equal(1, fired);
        }
    }

    [Fact]
    public async Task Subscribe_DelegatesAndReturnsId()
    {
        // ReturnsForAnyArgs ignores the argument values, which is what matching every
        // argument with Arg.Any did; it also avoids spelling out the client callback's
        // delegate type, which is not recoverable from this copy of the file.
        _mockClient.CreateSubscriptionAsync(default!, default!, default)
            .ReturnsForAnyArgs("sub-001");
        await ConnectAdapterAsync();

        var subId = await _adapter.SubscribeAsync("ns=2;s=Tag1", (_, _) => { });

        Assert.Equal("sub-001", subId);
    }

    [Fact]
    public async Task Write_Success_ReturnsGoodResult()
    {
        _mockClient.WriteValueAsync("ns=2;s=Tag1", 42, Arg.Any<CancellationToken>())
            .Returns((uint)0);
        await ConnectAdapterAsync();

        var result = await _adapter.WriteAsync("ns=2;s=Tag1", 42);

        Assert.True(result.Success);
        Assert.Null(result.ErrorMessage);
    }

    [Fact]
    public async Task Write_Failure_ReturnsError()
    {
        _mockClient.WriteValueAsync("ns=2;s=Tag1", 42, Arg.Any<CancellationToken>())
            .Returns(0x80000000u);
        await ConnectAdapterAsync();

        var result = await _adapter.WriteAsync("ns=2;s=Tag1", 42);

        Assert.False(result.Success);
        Assert.Contains("0x80000000", result.ErrorMessage);
    }

    [Fact]
    public async Task Read_BadStatus_ReturnsBadResult()
    {
        _mockClient.ReadValueAsync("ns=2;s=Tag1", Arg.Any<CancellationToken>())
            .Returns((null, DateTime.UtcNow, 0x80000000u));
        await ConnectAdapterAsync();

        var result = await _adapter.ReadAsync("ns=2;s=Tag1");

        Assert.False(result.Success);
    }

    [Fact]
    public async Task Read_GoodStatus_ReturnsValue()
    {
        _mockClient.ReadValueAsync("ns=2;s=Tag1", Arg.Any<CancellationToken>())
            .Returns((42.5, DateTime.UtcNow, 0u));
        await ConnectAdapterAsync();

        var result = await _adapter.ReadAsync("ns=2;s=Tag1");

        Assert.True(result.Success);
        Assert.NotNull(result.Value);
        Assert.Equal(42.5, result.Value!.Value);
        Assert.Equal(QualityCode.Good, result.Value.Quality);
    }

    [Fact]
    public async Task ReadBatch_ReadsAllTags()
    {
        _mockClient.ReadValueAsync(Arg.Any<string>(), Arg.Any<CancellationToken>())
            .Returns((1.0, DateTime.UtcNow, 0u));
        await ConnectAdapterAsync();

        var results = await _adapter.ReadBatchAsync(["tag1", "tag2", "tag3"]);

        Assert.Equal(3, results.Count);
        Assert.All(results.Values, r => Assert.True(r.Success));
    }

    [Fact]
    public async Task DCL007_ReadBatch_ReturnsPerTagResults_WhenOneTagFails()
    {
        // Regression test for DataConnectionLayer-007. ReadBatchAsync looped calling
        // ReadAsync per tag; ReadAsync re-throws any non-cancellation exception, so a
        // single failing tag aborted the whole batch and the caller got NO results for
        // the tags that did read successfully — even though ReadResult already carries
        // a per-tag Success/ErrorMessage shape. After the fix the batch catches per-tag
        // exceptions and returns a complete map.
        _mockClient.ReadValueAsync("good1", Arg.Any<CancellationToken>())
            .Returns((1.0, DateTime.UtcNow, 0u));
        _mockClient.ReadValueAsync("bad", Arg.Any<CancellationToken>())
            .Returns<(object?, DateTime, uint)>(_ => throw new InvalidOperationException("node not found"));
        _mockClient.ReadValueAsync("good2", Arg.Any<CancellationToken>())
            .Returns((2.0, DateTime.UtcNow, 0u));
        await ConnectAdapterAsync();

        var results = await _adapter.ReadBatchAsync(["good1", "bad", "good2"]);

        // Every requested tag is present in the result map.
        Assert.Equal(3, results.Count);
        Assert.True(results["good1"].Success);
        Assert.True(results["good2"].Success);

        // The failing tag is reported as a failed ReadResult, not by aborting the batch.
        Assert.False(results["bad"].Success);
        Assert.NotNull(results["bad"].ErrorMessage);
    }

    [Fact]
    public async Task DCL017_WriteBatch_ReturnsPerTagResults_WhenConnectionDropsMidBatch()
    {
        // Regression test for DataConnectionLayer-017. WriteBatchAsync looped calling
        // WriteAsync per tag; WriteAsync first calls EnsureConnected(), which throws
        // InvalidOperationException when the client is disconnected. WriteBatchAsync did
        // not catch that, so a connection dropping partway through a batch made the whole
        // WriteBatchAsync throw — the caller lost the per-tag outcomes for the tags that
        // already wrote. After the fix (mirroring DCL-007's ReadBatchAsync) each per-tag
        // failure is recorded as a failed WriteResult and the batch returns a complete map.
        _mockClient.WriteValueAsync(Arg.Any<string>(), Arg.Any<object?>(), Arg.Any<CancellationToken>())
            .Returns((uint)0);

        // IsConnected stays true while the adapter connects.
        await ConnectAdapterAsync();

        // Arm the drop: IsConnected is true for the first check after this point and
        // false for every later one, i.e. true for tag1's check, false for tag2 and tag3.
        // NOTE(review): this relies on the batch visiting tags in insertion order.
        var connectionChecks = 0;
        _mockClient.IsConnected.Returns(_ => Interlocked.Increment(ref connectionChecks) <= 1);

        var results = await _adapter.WriteBatchAsync(new Dictionary<string, object?>
        {
            ["tag1"] = 1,
            ["tag2"] = 2,
            ["tag3"] = 3
        });

        // Every requested tag is present in the result map — the batch was not aborted.
        Assert.Equal(3, results.Count);
        Assert.True(results["tag1"].Success);

        // tag2 and tag3 fail at the connection check but are reported per-tag.
        Assert.False(results["tag2"].Success);
        Assert.NotNull(results["tag2"].ErrorMessage);
        Assert.False(results["tag3"].Success);
        Assert.NotNull(results["tag3"].ErrorMessage);
    }

    [Fact]
    public async Task DCL017_WriteBatch_CancellationAbortsWholeBatch()
    {
        // Companion guard: a cancelled batch must still abort as a whole — only
        // connection/device faults are demoted to per-tag results, never cancellation.
        await ConnectAdapterAsync();

        using var cts = new CancellationTokenSource();
        cts.Cancel();

        _mockClient.WriteValueAsync(Arg.Any<string>(), Arg.Any<object?>(), Arg.Any<CancellationToken>())
            .Returns<uint>(_ => throw new OperationCanceledException());

        await Assert.ThrowsAnyAsync<OperationCanceledException>(() =>
            _adapter.WriteBatchAsync(new Dictionary<string, object?> { ["tag1"] = 1 }, cts.Token));
    }

    [Fact]
    public async Task NotConnected_ThrowsOnOperations()
    {
        _mockClient.IsConnected.Returns(false);

        await Assert.ThrowsAsync<InvalidOperationException>(() => _adapter.ReadAsync("tag1"));
    }

    [Fact]
    public async Task DisposeAsync_CleansUp()
    {
        await ConnectAdapterAsync();

        await _adapter.DisposeAsync();

        Assert.Equal(ConnectionHealth.Disconnected, _adapter.Status);
    }

    // --- Configuration Parsing ---

    [Fact]
    public async Task Connect_ParsesAllConfigurationKeys()
    {
        await ConnectAdapterAsync(new Dictionary<string, string>
        {
            ["EndpointUrl"] = "opc.tcp://myserver:4840",
            ["SessionTimeoutMs"] = "120000",
            ["OperationTimeoutMs"] = "30000",
            ["PublishingIntervalMs"] = "500",
            ["KeepAliveCount"] = "5",
            ["LifetimeCount"] = "15",
            ["MaxNotificationsPerPublish"] = "200",
            ["SamplingIntervalMs"] = "250",
            ["QueueSize"] = "20",
            ["SecurityMode"] = "SignAndEncrypt",
            ["AutoAcceptUntrustedCerts"] = "false"
        });

        await _mockClient.Received(1).ConnectAsync(
            "opc.tcp://myserver:4840",
            Arg.Is<OpcUaConnectionOptions?>(o =>
                o != null &&
                o.SessionTimeoutMs == 120000 &&
                o.OperationTimeoutMs == 30000 &&
                o.PublishingIntervalMs == 500 &&
                o.KeepAliveCount == 5 &&
                o.LifetimeCount == 15 &&
                o.MaxNotificationsPerPublish == 200 &&
                o.SamplingIntervalMs == 250 &&
                o.QueueSize == 20 &&
                o.SecurityMode == "SignAndEncrypt" &&
                o.AutoAcceptUntrustedCerts == false),
            Arg.Any<CancellationToken>());
    }

    [Fact]
    public async Task Connect_UsesDefaults_WhenKeysNotProvided()
    {
        await ConnectAdapterAsync();

        // NOTE(review): this expects AutoAcceptUntrustedCerts == true when the key is
        // absent, whereas the DCL-012 test in this file asserts that a default
        // OpcUaConnectionOptions has it false. Confirm whether the adapter's
        // missing-key default is meant to differ from the options type's default.
        await _mockClient.Received(1).ConnectAsync(
            "opc.tcp://localhost:4840",
            Arg.Is<OpcUaConnectionOptions?>(o =>
                o != null &&
                o.SessionTimeoutMs == 60000 &&
                o.OperationTimeoutMs == 15000 &&
                o.PublishingIntervalMs == 1000 &&
                o.KeepAliveCount == 10 &&
                o.LifetimeCount == 30 &&
                o.MaxNotificationsPerPublish == 100 &&
                o.SamplingIntervalMs == 1000 &&
                o.QueueSize == 10 &&
                o.SecurityMode == "None" &&
                o.AutoAcceptUntrustedCerts == true),
            Arg.Any<CancellationToken>());
    }

    [Fact]
    public async Task Connect_IgnoresInvalidNumericValues()
    {
        await ConnectAdapterAsync(new Dictionary<string, string>
        {
            ["SessionTimeoutMs"] = "notanumber",
            ["OperationTimeoutMs"] = "",
            ["PublishingIntervalMs"] = "abc",
            ["QueueSize"] = "12.5"
        });

        // Each unparsable value is expected to leave the same value that
        // Connect_UsesDefaults_WhenKeysNotProvided expects for an absent key.
        await _mockClient.Received(1).ConnectAsync(
            Arg.Any<string>(),
            Arg.Is<OpcUaConnectionOptions?>(o =>
                o != null &&
                o.SessionTimeoutMs == 60000 &&
                o.OperationTimeoutMs == 15000 &&
                o.PublishingIntervalMs == 1000 &&
                o.QueueSize == 10),
            Arg.Any<CancellationToken>());
    }

    [Fact]
    public async Task Connect_ParsesSecurityMode()
    {
        await ConnectAdapterAsync(new Dictionary<string, string>
        {
            ["SecurityMode"] = "Sign"
        });

        await _mockClient.Received(1).ConnectAsync(
            Arg.Any<string>(),
            Arg.Is<OpcUaConnectionOptions?>(o => o != null && o.SecurityMode == "Sign"),
            Arg.Any<CancellationToken>());
    }

    [Fact]
    public async Task Connect_ParsesAutoAcceptCerts()
    {
        await ConnectAdapterAsync(new Dictionary<string, string>
        {
            ["AutoAcceptUntrustedCerts"] = "false"
        });

        await _mockClient.Received(1).ConnectAsync(
            Arg.Any<string>(),
            Arg.Is<OpcUaConnectionOptions?>(o => o != null && o.AutoAcceptUntrustedCerts == false),
            Arg.Any<CancellationToken>());
    }
}