using Akka.Actor;
using Akka.TestKit.Xunit2;
using NSubstitute;
using NSubstitute.Core;
using ScadaLink.Commons.Interfaces.Protocol;
using ScadaLink.Commons.Messages.DataConnection;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.DataConnectionLayer.Actors;
using ScadaLink.HealthMonitoring;

namespace ScadaLink.DataConnectionLayer.Tests;

// NOTE(review): in this copy of the file several calls look like they have lost their
// generic type arguments (for example `Substitute.For()`, `Arg.Any>()`, `Arg.Is>(...)`,
// `new Dictionary { ... }`, `ExpectMsg(...)` and `Raise.Event()`). The code tokens below are
// kept exactly as found; restore the type arguments from the authoritative file before
// compiling.

/// <summary>
/// WP-6: Tests for DataConnectionActor Become/Stash state machine.
/// WP-9: Auto-reconnect and bad quality tests.
/// WP-10: Transparent re-subscribe tests.
/// WP-11: Write-back support tests.
/// WP-12: Tag path resolution with retry tests.
/// WP-13: Health reporting tests.
/// WP-14: Subscription lifecycle tests.
/// Task-4: Failover state machine tests.
/// </summary>
/// <remarks>
/// NOTE(review): only methods prefixed WP6, WP11, WP13 and Task4 are present in this class;
/// no method carries a WP9, WP10, WP12 or WP14 prefix, so the list above may be stale.
/// Most tests synchronise with the actor by polling the substitutes' recorded calls
/// (AwaitCondition) followed by a fixed Task.Delay, so they depend on wall-clock timing.
/// </remarks>
public class DataConnectionActorTests : TestKit
{
    // Substitute adapter used by the single-endpoint tests (created in the constructor).
    private readonly IDataConnection _mockAdapter;

    // Options shared by every actor created in this class (short intervals, see constructor).
    private readonly DataConnectionOptions _options;

    // Substitute health collector handed to every actor.
    private readonly ISiteHealthCollector _mockHealthCollector;

    // Substitute factory; the Task-4 tests stub and inspect its Create calls.
    private readonly IDataConnectionFactory _mockFactory;

    /// <summary>
    /// Starts the TestKit with the configuration string <c>akka.loglevel = DEBUG</c>, creates the
    /// three substitutes, and builds options with a 100 ms reconnect interval, a 200 ms
    /// tag-resolution retry interval and a 5 s write timeout.
    /// </summary>
    public DataConnectionActorTests() : base(@"akka.loglevel = DEBUG")
    {
        _mockAdapter = Substitute.For();
        _mockHealthCollector = Substitute.For();
        _mockFactory = Substitute.For();
        _options = new DataConnectionOptions
        {
            ReconnectInterval = TimeSpan.FromMilliseconds(100),
            TagResolutionRetryInterval = TimeSpan.FromMilliseconds(200),
            WriteTimeout = TimeSpan.FromSeconds(5)
        };
    }

    /// <summary>
    /// Creates a DataConnectionActor in the test actor system using the shared substitutes.
    /// </summary>
    /// <param name="name">
    /// Used both as the first constructor argument of the actor and as the actor's name in the
    /// system; defaults to "test-conn".
    /// </param>
    /// <returns>The reference returned by <c>Sys.ActorOf</c>.</returns>
    private IActorRef CreateConnectionActor(string name = "test-conn")
    {
        return Sys.ActorOf(Props.Create(() => new DataConnectionActor(name, _mockAdapter, _options, _mockHealthCollector, _mockFactory, "OpcUa")), name);
    }

    /// <summary>
    /// Creates a DataConnectionActor with primary/backup failover configuration.
    /// </summary>
    /// <param name="adapter">Adapter instance handed to the actor's constructor.</param>
    /// <param name="name">Used as the constructor's first argument and as the actor name.</param>
    /// <param name="primaryConfig">Configuration dictionary passed as the primary configuration.</param>
    /// <param name="backupConfig">Configuration dictionary passed as the backup configuration; may be null.</param>
    /// <param name="failoverRetryCount">Value forwarded as the constructor's last argument.</param>
    /// <returns>The reference returned by <c>Sys.ActorOf</c>.</returns>
    private IActorRef CreateFailoverActor(
        IDataConnection adapter,
        string name,
        IDictionary primaryConfig,
        IDictionary? backupConfig,
        int failoverRetryCount)
    {
        return Sys.ActorOf(Props.Create(() => new DataConnectionActor(
            name, adapter, _options, _mockHealthCollector, _mockFactory, "OpcUa",
            primaryConfig, backupConfig, failoverRetryCount)), name);
    }

    /// <summary>
    /// Raises the Disconnected event on a NSubstitute mock IDataConnection.
    /// </summary>
    /// <param name="adapter">The substitute whose Disconnected event is raised.</param>
    private static void RaiseDisconnected(IDataConnection adapter)
    {
        adapter.Disconnected += Raise.Event();
    }

    /// <summary>
    /// Creates the actor with a connect stub that completes immediately and waits up to 2 s for
    /// the adapter substitute to record a ConnectAsync call.
    /// </summary>
    [Fact]
    public void WP6_StartsInConnectingState_AttemptsConnect()
    {
        _mockAdapter.ConnectAsync(Arg.Any>(), Arg.Any())
            .Returns(Task.CompletedTask);

        // The reference is not used afterwards; the actor only needs to be started.
        var actor = CreateConnectionActor();

        // Give it time to attempt connection
        AwaitCondition(
            () => _mockAdapter.ReceivedCalls().Any(c => c.GetMethodInfo().Name == "ConnectAsync"),
            TimeSpan.FromSeconds(2));
    }

    /// <summary>
    /// Keeps ConnectAsync pending, sends a SubscribeTagsRequest, expects no reply for 200 ms, then
    /// completes the connect task and expects a reply within 2 s.
    /// </summary>
    [Fact]
    public void WP6_ConnectingState_StashesSubscribeRequests()
    {
        // Make connect hang so we stay in Connecting
        var tcs = new TaskCompletionSource();
        _mockAdapter.ConnectAsync(Arg.Any>(), Arg.Any())
            .Returns(tcs.Task);

        var actor = CreateConnectionActor("stash-test");

        // Send subscribe while connecting — should be stashed
        actor.Tell(new SubscribeTagsRequest(
            "corr1", "inst1", "stash-test", ["tag1"], DateTimeOffset.UtcNow));

        // No response yet (stashed)
        ExpectNoMsg(TimeSpan.FromMilliseconds(200));

        // Complete connection — should unstash and process
        _mockAdapter.SubscribeAsync(Arg.Any(), Arg.Any(), Arg.Any())
            .Returns("sub-001");
        tcs.SetResult();

        // Now we should get the response
        ExpectMsg(TimeSpan.FromSeconds(2));
    }

    /// <summary>
    /// Stubs WriteAsync("tag1", 42, ...) to return a successful WriteResult, sends a
    /// WriteTagRequest once a ConnectAsync call has been recorded, and asserts that the reply
    /// reports success.
    /// </summary>
    [Fact]
    public async Task WP11_ConnectedState_Write_ReturnsResult()
    {
        _mockAdapter.ConnectAsync(Arg.Any>(), Arg.Any())
            .Returns(Task.CompletedTask);
        _mockAdapter.WriteAsync("tag1", 42, Arg.Any())
            .Returns(new WriteResult(true, null));

        var actor = CreateConnectionActor("write-test");

        // Wait for connected state
        AwaitCondition(
            () => _mockAdapter.ReceivedCalls().Any(c => c.GetMethodInfo().Name == "ConnectAsync"),
            TimeSpan.FromSeconds(2));

        // Small delay for state transition
        await Task.Delay(200);

        actor.Tell(new WriteTagRequest("corr1", "write-test", "tag1", 42, DateTimeOffset.UtcNow));

        var response = ExpectMsg(TimeSpan.FromSeconds(3));
        Assert.True(response.Success);
    }

    /// <summary>
    /// Stubs WriteAsync to return a failed WriteResult with the message "Device offline" and
    /// asserts that the reply carries Success == false and that same error message.
    /// </summary>
    /// <remarks>
    /// NOTE(review): unlike WP11_ConnectedState_Write_ReturnsResult, this test does not poll for
    /// the ConnectAsync call; it relies solely on a fixed 300 ms delay before sending the write.
    /// </remarks>
    [Fact]
    public async Task WP11_Write_Failure_ReturnedSynchronously()
    {
        _mockAdapter.ConnectAsync(Arg.Any>(), Arg.Any())
            .Returns(Task.CompletedTask);
        _mockAdapter.WriteAsync("tag1", 42, Arg.Any())
            .Returns(new WriteResult(false, "Device offline"));

        var actor = CreateConnectionActor("write-fail-test");
        await Task.Delay(300);

        actor.Tell(new WriteTagRequest("corr1", "write-fail-test", "tag1", 42, DateTimeOffset.UtcNow));

        var response = ExpectMsg(TimeSpan.FromSeconds(3));
        Assert.False(response.Success);
        Assert.Equal("Device offline", response.ErrorMessage);
    }

    /// <summary>
    /// Stubs the adapter's Status property to ConnectionHealth.Connected, sends
    /// DataConnectionActor.GetHealthReport, and asserts the reply's ConnectionName is
    /// "health-test" and its Status is ConnectionHealth.Connected.
    /// </summary>
    /// <remarks>
    /// NOTE(review): waits a fixed 300 ms after creating the actor instead of polling for the
    /// ConnectAsync call.
    /// </remarks>
    [Fact]
    public async Task WP13_HealthReport_ReturnsConnectionStatus()
    {
        _mockAdapter.ConnectAsync(Arg.Any>(), Arg.Any())
            .Returns(Task.CompletedTask);
        _mockAdapter.Status.Returns(ConnectionHealth.Connected);

        var actor = CreateConnectionActor("health-test");
        await Task.Delay(300);

        actor.Tell(new DataConnectionActor.GetHealthReport());

        var report = ExpectMsg(TimeSpan.FromSeconds(2));
        Assert.Equal("health-test", report.ConnectionName);
        Assert.Equal(ConnectionHealth.Connected, report.Status);
    }

    // ── Task-4: Failover state machine tests ──

    /// <summary>
    /// With failoverRetryCount = 2: lets the primary connect once, re-stubs its ConnectAsync to
    /// fail, raises Disconnected, and waits up to 5 s for the factory to record a Create call
    /// whose second argument has Endpoint "opc.tcp://backup:4840".
    /// </summary>
    [Fact]
    public async Task Task4_FailoverAfterNRetries_SwitchesToBackup()
    {
        // Arrange: primary + backup, failoverRetryCount = 2
        var primaryConfig = new Dictionary { ["Endpoint"] = "opc.tcp://primary:4840" };
        var backupConfig = new Dictionary { ["Endpoint"] = "opc.tcp://backup:4840" };

        var primaryAdapter = Substitute.For();
        var backupAdapter = Substitute.For();

        // Initial connect succeeds on primary
        primaryAdapter.ConnectAsync(Arg.Any>(), Arg.Any())
            .Returns(Task.CompletedTask);

        // Factory returns backup adapter when called with backup config
        _mockFactory.Create("OpcUa", Arg.Is>(d => d["Endpoint"] == "opc.tcp://backup:4840"))
            .Returns(backupAdapter);

        // Backup adapter connect succeeds (so failover can complete)
        backupAdapter.ConnectAsync(Arg.Any>(), Arg.Any())
            .Returns(Task.CompletedTask);

        var actor = CreateFailoverActor(primaryAdapter, "failover-test", primaryConfig, backupConfig, failoverRetryCount: 2);

        // Wait for initial connection on primary
        AwaitCondition(
            () => primaryAdapter.ReceivedCalls().Any(c => c.GetMethodInfo().Name == "ConnectAsync"),
            TimeSpan.FromSeconds(2));
        await Task.Delay(200); // State transition to Connected

        // Now make primary reconnect fail
        // (re-configures the same call; the most recently configured return is the one used)
        primaryAdapter.ConnectAsync(Arg.Any>(), Arg.Any())
            .Returns(Task.FromException(new Exception("Connection refused")));

        // Trigger disconnect
        RaiseDisconnected(primaryAdapter);

        // Wait for failover: after 2 failures, factory should be called with backup config
        AwaitCondition(
            () => _mockFactory.ReceivedCalls().Any(c =>
                c.GetMethodInfo().Name == "Create" &&
                c.GetArguments()[1] is IDictionary d && d["Endpoint"] == "opc.tcp://backup:4840"),
            TimeSpan.FromSeconds(5));
    }

    /// <summary>
    /// With no backup configuration and failoverRetryCount = 3: lets the first connect succeed,
    /// fails every later one, raises Disconnected, waits until at least 8 connect calls have been
    /// counted, and verifies the factory's Create was never called.
    /// </summary>
    [Fact]
    public async Task Task4_SingleEndpoint_RetriesIndefinitely_NoFailover()
    {
        // Arrange: primary only, no backup
        var primaryConfig = new Dictionary { ["Endpoint"] = "opc.tcp://primary:4840" };
        var primaryAdapter = Substitute.For();

        // Number of ConnectAsync invocations seen by the stub below.
        // NOTE(review): incremented with Interlocked but read with plain reads in the polling
        // lambdas; presumably the stub runs on a different thread than the test — confirm.
        var connectCount = 0;

        // First connect succeeds, all subsequent fail
        primaryAdapter.ConnectAsync(Arg.Any>(), Arg.Any())
            .Returns(callInfo =>
            {
                var count = Interlocked.Increment(ref connectCount);
                if (count == 1) return Task.CompletedTask;
                return Task.FromException(new Exception("Connection refused"));
            });

        var actor = CreateFailoverActor(primaryAdapter, "no-backup-test", primaryConfig, backupConfig: null, failoverRetryCount: 3);

        // Wait for initial connection
        AwaitCondition(() => connectCount >= 1, TimeSpan.FromSeconds(2));
        await Task.Delay(200);

        // Trigger disconnect — starts reconnect attempts
        RaiseDisconnected(primaryAdapter);

        // Wait for many reconnect failures (well over the failoverRetryCount threshold)
        AwaitCondition(() => connectCount >= 8, TimeSpan.FromSeconds(10));

        // Factory should never be called — no backup to fail over to
        _mockFactory.DidNotReceive().Create(Arg.Any(), Arg.Any>());
    }

    /// <summary>
    /// With failoverRetryCount = 1: fails the primary after its first connect, waits for the
    /// factory to be asked for the backup endpoint, fails the backup after its first connect, and
    /// waits up to 5 s for the factory to be asked for the primary endpoint again.
    /// </summary>
    [Fact]
    public async Task Task4_RoundRobin_BackToPrimary_AfterBackupFails()
    {
        // Arrange: primary + backup, failoverRetryCount = 1
        var primaryConfig = new Dictionary { ["Endpoint"] = "opc.tcp://primary:4840" };
        var backupConfig = new Dictionary { ["Endpoint"] = "opc.tcp://backup:4840" };

        var primaryAdapter = Substitute.For();
        var backupAdapter = Substitute.For();
        var secondPrimaryAdapter = Substitute.For();

        // Initial connect on primary succeeds
        // NOTE(review): this stub is configured for the same call as the callback stub a few
        // lines below, which replaces it (count == 1 there already returns a completed task);
        // it looks redundant — confirm and consider removing.
        primaryAdapter.ConnectAsync(Arg.Any>(), Arg.Any())
            .Returns(Task.CompletedTask);

        // After disconnect, primary reconnect fails (triggers failover to backup)
        var primaryConnectCount = 0;
        primaryAdapter.ConnectAsync(Arg.Any>(), Arg.Any())
            .Returns(callInfo =>
            {
                var count = Interlocked.Increment(ref primaryConnectCount);
                if (count == 1) return Task.CompletedTask; // Initial connect
                return Task.FromException(new Exception("Primary down")); // Reconnect fails
            });

        // Factory: backup config → backupAdapter, primary config → secondPrimaryAdapter
        _mockFactory.Create("OpcUa", Arg.Is>(d => d["Endpoint"] == "opc.tcp://backup:4840"))
            .Returns(backupAdapter);
        _mockFactory.Create("OpcUa", Arg.Is>(d => d["Endpoint"] == "opc.tcp://primary:4840"))
            .Returns(secondPrimaryAdapter);

        // Backup connect succeeds first time, then fails on reconnect
        var backupConnectCount = 0;
        backupAdapter.ConnectAsync(Arg.Any>(), Arg.Any())
            .Returns(callInfo =>
            {
                var count = Interlocked.Increment(ref backupConnectCount);
                if (count == 1) return Task.CompletedTask; // First backup connect succeeds
                return Task.FromException(new Exception("Backup down")); // Backup reconnect fails
            });

        // Second primary adapter connects fine
        secondPrimaryAdapter.ConnectAsync(Arg.Any>(), Arg.Any())
            .Returns(Task.CompletedTask);

        var actor = CreateFailoverActor(primaryAdapter, "roundrobin-test", primaryConfig, backupConfig, failoverRetryCount: 1);

        // Wait for initial primary connect
        AwaitCondition(() => primaryConnectCount >= 1, TimeSpan.FromSeconds(2));
        await Task.Delay(200);

        // Disconnect primary → 1 failure → failover to backup
        RaiseDisconnected(primaryAdapter);

        // Wait for backup adapter creation
        AwaitCondition(
            () => _mockFactory.ReceivedCalls().Any(c =>
                c.GetMethodInfo().Name == "Create" &&
                c.GetArguments()[1] is IDictionary d && d["Endpoint"] == "opc.tcp://backup:4840"),
            TimeSpan.FromSeconds(5));

        // Wait for backup to connect successfully
        AwaitCondition(() => backupConnectCount >= 1, TimeSpan.FromSeconds(2));
        await Task.Delay(200);

        // Now disconnect backup → 1 failure → failover back to primary
        RaiseDisconnected(backupAdapter);

        // Wait for primary adapter re-creation
        AwaitCondition(
            () => _mockFactory.ReceivedCalls().Any(c =>
                c.GetMethodInfo().Name == "Create" &&
                c.GetArguments()[1] is IDictionary d && d["Endpoint"] == "opc.tcp://primary:4840"),
            TimeSpan.FromSeconds(5));
    }

    /// <summary>
    /// With failoverRetryCount = 3: scripts the primary's connect results as success, two
    /// failures, success, two failures, success; raises Disconnected twice and verifies the
    /// factory's Create was never called.
    /// </summary>
    [Fact]
    public async Task Task4_SuccessfulReconnect_ResetsFailureCounter()
    {
        // Arrange: primary + backup, failoverRetryCount = 3
        var primaryConfig = new Dictionary { ["Endpoint"] = "opc.tcp://primary:4840" };
        var backupConfig = new Dictionary { ["Endpoint"] = "opc.tcp://backup:4840" };

        var primaryAdapter = Substitute.For();
        var connectCount = 0;

        // First connect succeeds, then 2 failures, then success, then 2 more failures, then success
        primaryAdapter.ConnectAsync(Arg.Any>(), Arg.Any())
            .Returns(callInfo =>
            {
                var count = Interlocked.Increment(ref connectCount);
                // count 1: initial connect → success
                // count 2,3: reconnect failures
                // count 4: reconnect success (resets counter)
                // count 5,6: reconnect failures again
                // count 7: reconnect success again
                return count switch
                {
                    1 => Task.CompletedTask,
                    2 or 3 => Task.FromException(new Exception("Fail")),
                    4 => Task.CompletedTask,
                    5 or 6 => Task.FromException(new Exception("Fail")),
                    _ => Task.CompletedTask
                };
            });

        var actor = CreateFailoverActor(primaryAdapter, "reset-counter-test", primaryConfig, backupConfig, failoverRetryCount: 3);

        // Wait for initial connect
        AwaitCondition(() => connectCount >= 1, TimeSpan.FromSeconds(2));
        await Task.Delay(200);

        // Disconnect: triggers 2 failures then success (count 2,3,4)
        RaiseDisconnected(primaryAdapter);

        // Wait for successful reconnect (count 4)
        AwaitCondition(() => connectCount >= 4, TimeSpan.FromSeconds(5));
        await Task.Delay(200);

        // Disconnect again: triggers 2 more failures then success (count 5,6,7)
        RaiseDisconnected(primaryAdapter);

        // Wait for second successful reconnect (count 7)
        AwaitCondition(() => connectCount >= 7, TimeSpan.FromSeconds(5));
        await Task.Delay(200);

        // Factory should never be called — counter reset each time before reaching 3
        _mockFactory.DidNotReceive().Create(Arg.Any(), Arg.Any>());
    }

    /// <summary>
    /// With failoverRetryCount = 1: subscribes to "sensor/temp" while on the primary, fails the
    /// primary so the factory is asked for the backup endpoint, then verifies the backup adapter
    /// received SubscribeAsync for "sensor/temp".
    /// </summary>
    [Fact]
    public async Task Task4_ReSubscribeAll_CalledAfterFailoverReconnect()
    {
        // Arrange: primary + backup, failoverRetryCount = 1
        var primaryConfig = new Dictionary { ["Endpoint"] = "opc.tcp://primary:4840" };
        var backupConfig = new Dictionary { ["Endpoint"] = "opc.tcp://backup:4840" };

        var primaryAdapter = Substitute.For();
        var backupAdapter = Substitute.For();

        // Primary initial connect succeeds
        var primaryConnectCount = 0;
        primaryAdapter.ConnectAsync(Arg.Any>(), Arg.Any())
            .Returns(callInfo =>
            {
                var count = Interlocked.Increment(ref primaryConnectCount);
                if (count == 1) return Task.CompletedTask;
                return Task.FromException(new Exception("Primary down"));
            });

        // Primary subscribe succeeds
        primaryAdapter.SubscribeAsync(Arg.Any(), Arg.Any(), Arg.Any())
            .Returns("sub-primary-001");

        // Primary read succeeds (for initial read after subscribe)
        primaryAdapter.ReadAsync(Arg.Any(), Arg.Any())
            .Returns(new ReadResult(true, new TagValue(42.0, QualityCode.Good, DateTimeOffset.UtcNow), null));

        // Factory returns backup adapter
        _mockFactory.Create("OpcUa", Arg.Is>(d => d["Endpoint"] == "opc.tcp://backup:4840"))
            .Returns(backupAdapter);

        // Backup connect succeeds
        backupAdapter.ConnectAsync(Arg.Any>(), Arg.Any())
            .Returns(Task.CompletedTask);

        // Backup subscribe succeeds (for re-subscribe after failover)
        backupAdapter.SubscribeAsync(Arg.Any(), Arg.Any(), Arg.Any())
            .Returns("sub-backup-001");

        var actor = CreateFailoverActor(primaryAdapter, "resub-test", primaryConfig, backupConfig, failoverRetryCount: 1);

        // Wait for initial connect
        AwaitCondition(() => primaryConnectCount >= 1, TimeSpan.FromSeconds(2));
        await Task.Delay(200);

        // Subscribe to tags while connected on primary
        actor.Tell(new SubscribeTagsRequest("corr1", "inst1", "resub-test", ["sensor/temp"], DateTimeOffset.UtcNow));
        ExpectMsg(TimeSpan.FromSeconds(3));

        // Verify primary adapter received subscribe call
        await primaryAdapter.Received().SubscribeAsync(
            "sensor/temp", Arg.Any(), Arg.Any());

        // Disconnect primary → 1 failure → failover to backup
        RaiseDisconnected(primaryAdapter);

        // Wait for backup adapter creation and connect
        AwaitCondition(
            () => _mockFactory.ReceivedCalls().Any(c =>
                c.GetMethodInfo().Name == "Create" &&
                c.GetArguments()[1] is IDictionary d && d["Endpoint"] == "opc.tcp://backup:4840"),
            TimeSpan.FromSeconds(5));

        // Wait for ReSubscribeAll to fire on backup adapter
        AwaitCondition(
            () => backupAdapter.ReceivedCalls().Any(c => c.GetMethodInfo().Name == "SubscribeAsync"),
            TimeSpan.FromSeconds(5));

        // Verify backup adapter received SubscribeAsync for the same tag
        await backupAdapter.Received().SubscribeAsync(
            "sensor/temp", Arg.Any(), Arg.Any());
    }
}