From 847302e297fed2057df0c96f5e0a6bdfa36ec949 Mon Sep 17 00:00:00 2001
From: Joseph Doherty
Date: Sun, 22 Mar 2026 08:47:44 -0400
Subject: [PATCH] test(dcl): add failover state machine tests for DataConnectionActor

---
Review notes (ignored by git am, not part of the commit message):
- Angle-bracket content (generic type arguments, XML doc tags in context
  lines, the author address) appears to be missing from this copy of the
  patch. Those tokens are reproduced as found - TODO restore them from the
  original commit before applying.
- Hunk headers and the diffstat reflect this revision; the blob hashes on
  the "index" line are still those of the original commit.

 .../DataConnectionActorTests.cs               | 334 ++++++++++++++++++
 1 file changed, 334 insertions(+)

diff --git a/tests/ScadaLink.DataConnectionLayer.Tests/DataConnectionActorTests.cs b/tests/ScadaLink.DataConnectionLayer.Tests/DataConnectionActorTests.cs
index 05f51c1..6bb0396 100644
--- a/tests/ScadaLink.DataConnectionLayer.Tests/DataConnectionActorTests.cs
+++ b/tests/ScadaLink.DataConnectionLayer.Tests/DataConnectionActorTests.cs
@@ -1,6 +1,7 @@
 using Akka.Actor;
 using Akka.TestKit.Xunit2;
 using NSubstitute;
+using NSubstitute.Core;
 using ScadaLink.Commons.Interfaces.Protocol;
 using ScadaLink.Commons.Messages.DataConnection;
 using ScadaLink.Commons.Types.Enums;
@@ -17,6 +18,7 @@ namespace ScadaLink.DataConnectionLayer.Tests;
 /// WP-12: Tag path resolution with retry tests.
 /// WP-13: Health reporting tests.
 /// WP-14: Subscription lifecycle tests.
+/// Task-4: Failover state machine tests.
 ///
 public class DataConnectionActorTests : TestKit
 {
@@ -46,6 +48,35 @@ public class DataConnectionActorTests : TestKit
                 _mockFactory, "OpcUa")), name);
     }
 
+    /// <summary>
+    /// Creates a DataConnectionActor with primary/backup failover configuration.
+    /// </summary>
+    /// <param name="adapter">Adapter instance handed to the actor's constructor.</param>
+    /// <param name="name">Passed as the actor's first constructor argument and used as the Akka actor name.</param>
+    /// <param name="primaryConfig">Primary endpoint settings, passed through to the actor.</param>
+    /// <param name="backupConfig">Backup endpoint settings, or null when the test has no backup.</param>
+    /// <param name="failoverRetryCount">Failover retry count, passed through to the actor.</param>
+    private IActorRef CreateFailoverActor(
+        IDataConnection adapter,
+        string name,
+        IDictionary primaryConfig,
+        IDictionary? backupConfig,
+        int failoverRetryCount)
+    {
+        return Sys.ActorOf(Props.Create(() =>
+            new DataConnectionActor(
+                name, adapter, _options, _mockHealthCollector, _mockFactory, "OpcUa",
+                primaryConfig, backupConfig, failoverRetryCount)), name);
+    }
+
+    /// <summary>
+    /// Raises the Disconnected event on an NSubstitute mock IDataConnection.
+    /// </summary>
+    private static void RaiseDisconnected(IDataConnection adapter)
+    {
+        adapter.Disconnected += Raise.Event();
+    }
+
     [Fact]
     public void WP6_StartsInConnectingState_AttemptsConnect()
     {
@@ -147,4 +178,307 @@ public class DataConnectionActorTests : TestKit
         Assert.Equal("health-test", report.ConnectionName);
         Assert.Equal(ConnectionHealth.Connected, report.Status);
     }
+
+    // ── Task-4: Failover state machine tests ──
+
+    /// <summary>
+    /// After failoverRetryCount (2) failed reconnects on the primary, the actor is expected
+    /// to ask the factory for an adapter built from the backup configuration.
+    /// </summary>
+    [Fact]
+    public async Task Task4_FailoverAfterNRetries_SwitchesToBackup()
+    {
+        // Arrange: primary + backup, failoverRetryCount = 2
+        var primaryConfig = new Dictionary { ["Endpoint"] = "opc.tcp://primary:4840" };
+        var backupConfig = new Dictionary { ["Endpoint"] = "opc.tcp://backup:4840" };
+        var primaryAdapter = Substitute.For();
+        var backupAdapter = Substitute.For();
+
+        // Primary: the initial connect succeeds, every later reconnect fails.
+        // Stubbed once up front (same pattern as the other Task-4 tests) so the
+        // substitute is never reconfigured while the actor may be calling it.
+        var primaryConnectCount = 0;
+        primaryAdapter.ConnectAsync(Arg.Any>(), Arg.Any())
+            .Returns(callInfo =>
+            {
+                var count = Interlocked.Increment(ref primaryConnectCount);
+                if (count == 1) return Task.CompletedTask;
+                return Task.FromException(new Exception("Connection refused"));
+            });
+
+        // Factory returns backup adapter when called with backup config
+        _mockFactory.Create("OpcUa", Arg.Is>(d => d["Endpoint"] == "opc.tcp://backup:4840"))
+            .Returns(backupAdapter);
+
+        // Backup adapter connect succeeds (so failover can complete)
+        backupAdapter.ConnectAsync(Arg.Any>(), Arg.Any())
+            .Returns(Task.CompletedTask);
+
+        _ = CreateFailoverActor(primaryAdapter, "failover-test", primaryConfig, backupConfig, failoverRetryCount: 2);
+
+        // Wait for initial connection on primary
+        AwaitCondition(() => primaryConnectCount >= 1, TimeSpan.FromSeconds(2));
+        await Task.Delay(200); // State transition to Connected
+
+        // Trigger disconnect
+        RaiseDisconnected(primaryAdapter);
+
+        // Wait for failover: after 2 failures, factory should be called with backup config
+        AwaitCondition(() =>
+            _mockFactory.ReceivedCalls().Any(c =>
+                c.GetMethodInfo().Name == "Create" &&
+                c.GetArguments()[1] is IDictionary d &&
+                d["Endpoint"] == "opc.tcp://backup:4840"),
+            TimeSpan.FromSeconds(5));
+    }
+
+    /// <summary>
+    /// With no backup configured, the actor is expected to keep retrying the primary
+    /// well past failoverRetryCount and never ask the factory for another adapter.
+    /// </summary>
+    [Fact]
+    public async Task Task4_SingleEndpoint_RetriesIndefinitely_NoFailover()
+    {
+        // Arrange: primary only, no backup
+        var primaryConfig = new Dictionary { ["Endpoint"] = "opc.tcp://primary:4840" };
+        var primaryAdapter = Substitute.For();
+        var connectCount = 0;
+
+        // First connect succeeds, all subsequent fail
+        primaryAdapter.ConnectAsync(Arg.Any>(), Arg.Any())
+            .Returns(callInfo =>
+            {
+                var count = Interlocked.Increment(ref connectCount);
+                if (count == 1) return Task.CompletedTask;
+                return Task.FromException(new Exception("Connection refused"));
+            });
+
+        _ = CreateFailoverActor(primaryAdapter, "no-backup-test", primaryConfig, backupConfig: null, failoverRetryCount: 3);
+
+        // Wait for initial connection
+        AwaitCondition(() => connectCount >= 1, TimeSpan.FromSeconds(2));
+        await Task.Delay(200);
+
+        // Trigger disconnect — starts reconnect attempts
+        RaiseDisconnected(primaryAdapter);
+
+        // Wait for many reconnect failures (well over the failoverRetryCount threshold)
+        AwaitCondition(() => connectCount >= 8, TimeSpan.FromSeconds(10));
+
+        // Factory should never be called — no backup to fail over to
+        _mockFactory.DidNotReceive().Create(Arg.Any(), Arg.Any>());
+    }
+
+    /// <summary>
+    /// With failoverRetryCount = 1, a failed primary reconnect should fail over to the backup,
+    /// and a failed backup reconnect should then request a new adapter for the primary config.
+    /// </summary>
+    [Fact]
+    public async Task Task4_RoundRobin_BackToPrimary_AfterBackupFails()
+    {
+        // Arrange: primary + backup, failoverRetryCount = 1
+        var primaryConfig = new Dictionary { ["Endpoint"] = "opc.tcp://primary:4840" };
+        var backupConfig = new Dictionary { ["Endpoint"] = "opc.tcp://backup:4840" };
+        var primaryAdapter = Substitute.For();
+        var backupAdapter = Substitute.For();
+        var secondPrimaryAdapter = Substitute.For();
+
+        // Primary: initial connect succeeds; after the disconnect every reconnect fails
+        // (triggers failover to backup). A single stub covers both phases.
+        var primaryConnectCount = 0;
+        primaryAdapter.ConnectAsync(Arg.Any>(), Arg.Any())
+            .Returns(callInfo =>
+            {
+                var count = Interlocked.Increment(ref primaryConnectCount);
+                if (count == 1) return Task.CompletedTask; // Initial connect
+                return Task.FromException(new Exception("Primary down")); // Reconnect fails
+            });
+
+        // Factory: backup config → backupAdapter, primary config → secondPrimaryAdapter
+        _mockFactory.Create("OpcUa", Arg.Is>(d => d["Endpoint"] == "opc.tcp://backup:4840"))
+            .Returns(backupAdapter);
+        _mockFactory.Create("OpcUa", Arg.Is>(d => d["Endpoint"] == "opc.tcp://primary:4840"))
+            .Returns(secondPrimaryAdapter);
+
+        // Backup connect succeeds first time, then fails on reconnect
+        var backupConnectCount = 0;
+        backupAdapter.ConnectAsync(Arg.Any>(), Arg.Any())
+            .Returns(callInfo =>
+            {
+                var count = Interlocked.Increment(ref backupConnectCount);
+                if (count == 1) return Task.CompletedTask; // First backup connect succeeds
+                return Task.FromException(new Exception("Backup down")); // Backup reconnect fails
+            });
+
+        // Second primary adapter connects fine
+        secondPrimaryAdapter.ConnectAsync(Arg.Any>(), Arg.Any())
+            .Returns(Task.CompletedTask);
+
+        _ = CreateFailoverActor(primaryAdapter, "roundrobin-test", primaryConfig, backupConfig, failoverRetryCount: 1);
+
+        // Wait for initial primary connect
+        AwaitCondition(() => primaryConnectCount >= 1, TimeSpan.FromSeconds(2));
+        await Task.Delay(200);
+
+        // Disconnect primary → 1 failure → failover to backup
+        RaiseDisconnected(primaryAdapter);
+
+        // Wait for backup adapter creation
+        AwaitCondition(() =>
+            _mockFactory.ReceivedCalls().Any(c =>
+                c.GetMethodInfo().Name == "Create" &&
+                c.GetArguments()[1] is IDictionary d &&
+                d["Endpoint"] == "opc.tcp://backup:4840"),
+            TimeSpan.FromSeconds(5));
+
+        // Wait for backup to connect successfully
+        AwaitCondition(() => backupConnectCount >= 1, TimeSpan.FromSeconds(2));
+        await Task.Delay(200);
+
+        // Now disconnect backup → 1 failure → failover back to primary
+        RaiseDisconnected(backupAdapter);
+
+        // Wait for primary adapter re-creation
+        AwaitCondition(() =>
+            _mockFactory.ReceivedCalls().Any(c =>
+                c.GetMethodInfo().Name == "Create" &&
+                c.GetArguments()[1] is IDictionary d &&
+                d["Endpoint"] == "opc.tcp://primary:4840"),
+            TimeSpan.FromSeconds(5));
+    }
+
+    /// <summary>
+    /// Two disconnects that each see two failed reconnects followed by a success should never
+    /// reach failoverRetryCount = 3, so the factory must not be asked for another adapter.
+    /// </summary>
+    [Fact]
+    public async Task Task4_SuccessfulReconnect_ResetsFailureCounter()
+    {
+        // Arrange: primary + backup, failoverRetryCount = 3
+        var primaryConfig = new Dictionary { ["Endpoint"] = "opc.tcp://primary:4840" };
+        var backupConfig = new Dictionary { ["Endpoint"] = "opc.tcp://backup:4840" };
+        var primaryAdapter = Substitute.For();
+        var connectCount = 0;
+
+        // First connect succeeds, then 2 failures, then success, then 2 more failures, then success
+        primaryAdapter.ConnectAsync(Arg.Any>(), Arg.Any())
+            .Returns(callInfo =>
+            {
+                var count = Interlocked.Increment(ref connectCount);
+                // count 1: initial connect → success
+                // count 2,3: reconnect failures
+                // count 4: reconnect success (resets counter)
+                // count 5,6: reconnect failures again
+                // count 7: reconnect success again
+                return count switch
+                {
+                    1 => Task.CompletedTask,
+                    2 or 3 => Task.FromException(new Exception("Fail")),
+                    4 => Task.CompletedTask,
+                    5 or 6 => Task.FromException(new Exception("Fail")),
+                    _ => Task.CompletedTask
+                };
+            });
+
+        _ = CreateFailoverActor(primaryAdapter, "reset-counter-test", primaryConfig, backupConfig, failoverRetryCount: 3);
+
+        // Wait for initial connect
+        AwaitCondition(() => connectCount >= 1, TimeSpan.FromSeconds(2));
+        await Task.Delay(200);
+
+        // Disconnect: triggers 2 failures then success (count 2,3,4)
+        RaiseDisconnected(primaryAdapter);
+
+        // Wait for successful reconnect (count 4)
+        AwaitCondition(() => connectCount >= 4, TimeSpan.FromSeconds(5));
+        await Task.Delay(200);
+
+        // Disconnect again: triggers 2 more failures then success (count 5,6,7)
+        RaiseDisconnected(primaryAdapter);
+
+        // Wait for second successful reconnect (count 7)
+        AwaitCondition(() => connectCount >= 7, TimeSpan.FromSeconds(5));
+        await Task.Delay(200);
+
+        // Factory should never be called — counter reset each time before reaching 3
+        _mockFactory.DidNotReceive().Create(Arg.Any(), Arg.Any>());
+    }
+
+    /// <summary>
+    /// A tag subscribed while on the primary should be subscribed again on the backup adapter
+    /// once the actor has failed over to it.
+    /// </summary>
+    [Fact]
+    public async Task Task4_ReSubscribeAll_CalledAfterFailoverReconnect()
+    {
+        // Arrange: primary + backup, failoverRetryCount = 1
+        var primaryConfig = new Dictionary { ["Endpoint"] = "opc.tcp://primary:4840" };
+        var backupConfig = new Dictionary { ["Endpoint"] = "opc.tcp://backup:4840" };
+        var primaryAdapter = Substitute.For();
+        var backupAdapter = Substitute.For();
+
+        // Primary: initial connect succeeds, every reconnect fails
+        var primaryConnectCount = 0;
+        primaryAdapter.ConnectAsync(Arg.Any>(), Arg.Any())
+            .Returns(callInfo =>
+            {
+                var count = Interlocked.Increment(ref primaryConnectCount);
+                if (count == 1) return Task.CompletedTask;
+                return Task.FromException(new Exception("Primary down"));
+            });
+
+        // Primary subscribe succeeds
+        primaryAdapter.SubscribeAsync(Arg.Any(), Arg.Any(), Arg.Any())
+            .Returns("sub-primary-001");
+
+        // Primary read succeeds (for initial read after subscribe)
+        primaryAdapter.ReadAsync(Arg.Any(), Arg.Any())
+            .Returns(new ReadResult(true, new TagValue(42.0, QualityCode.Good, DateTimeOffset.UtcNow), null));
+
+        // Factory returns backup adapter
+        _mockFactory.Create("OpcUa", Arg.Is>(d => d["Endpoint"] == "opc.tcp://backup:4840"))
+            .Returns(backupAdapter);
+
+        // Backup connect succeeds
+        backupAdapter.ConnectAsync(Arg.Any>(), Arg.Any())
+            .Returns(Task.CompletedTask);
+
+        // Backup subscribe succeeds (for re-subscribe after failover)
+        backupAdapter.SubscribeAsync(Arg.Any(), Arg.Any(), Arg.Any())
+            .Returns("sub-backup-001");
+
+        var actor = CreateFailoverActor(primaryAdapter, "resub-test", primaryConfig, backupConfig, failoverRetryCount: 1);
+
+        // Wait for initial connect
+        AwaitCondition(() => primaryConnectCount >= 1, TimeSpan.FromSeconds(2));
+        await Task.Delay(200);
+
+        // Subscribe to tags while connected on primary
+        actor.Tell(new SubscribeTagsRequest("corr1", "inst1", "resub-test", ["sensor/temp"], DateTimeOffset.UtcNow));
+        ExpectMsg(TimeSpan.FromSeconds(3));
+
+        // Verify primary adapter received subscribe call
+        await primaryAdapter.Received().SubscribeAsync(
+            "sensor/temp", Arg.Any(), Arg.Any());
+
+        // Disconnect primary → 1 failure → failover to backup
+        RaiseDisconnected(primaryAdapter);
+
+        // Wait for backup adapter creation and connect
+        AwaitCondition(() =>
+            _mockFactory.ReceivedCalls().Any(c =>
+                c.GetMethodInfo().Name == "Create" &&
+                c.GetArguments()[1] is IDictionary d &&
+                d["Endpoint"] == "opc.tcp://backup:4840"),
+            TimeSpan.FromSeconds(5));
+
+        // Wait for ReSubscribeAll to fire on backup adapter
+        AwaitCondition(() =>
+            backupAdapter.ReceivedCalls().Any(c => c.GetMethodInfo().Name == "SubscribeAsync"),
+            TimeSpan.FromSeconds(5));
+
+        // Verify backup adapter received SubscribeAsync for the same tag
+        await backupAdapter.Received().SubscribeAsync(
+            "sensor/temp", Arg.Any(), Arg.Any());
+    }
 }