feat(dcl): add failover state machine to DataConnectionActor with round-robin endpoint switching

This commit is contained in:
Joseph Doherty
2026-03-22 08:30:03 -04:00
parent 46304678da
commit da290fa4f8
3 changed files with 56 additions and 6 deletions

View File

@@ -25,11 +25,15 @@ namespace ScadaLink.DataConnectionLayer.Actors;
/// </summary> /// </summary>
public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
{ {
public enum ActiveEndpoint { Primary, Backup }
private readonly ILoggingAdapter _log = Context.GetLogger(); private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly string _connectionName; private readonly string _connectionName;
private readonly IDataConnection _adapter; private IDataConnection _adapter;
private readonly DataConnectionOptions _options; private readonly DataConnectionOptions _options;
private readonly ISiteHealthCollector _healthCollector; private readonly ISiteHealthCollector _healthCollector;
private readonly IDataConnectionFactory _factory;
private readonly string _protocolType;
public IStash Stash { get; set; } = null!; public IStash Stash { get; set; } = null!;
public ITimerScheduler Timers { get; set; } = null!; public ITimerScheduler Timers { get; set; } = null!;
@@ -60,10 +64,12 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
private int _totalSubscribed; private int _totalSubscribed;
private int _resolvedTags; private int _resolvedTags;
private readonly IDictionary<string, string> _connectionDetails; private IDictionary<string, string> _connectionDetails;
private readonly IDictionary<string, string> _primaryConfig; private readonly IDictionary<string, string> _primaryConfig;
private readonly IDictionary<string, string>? _backupConfig; private readonly IDictionary<string, string>? _backupConfig;
private readonly int _failoverRetryCount; private readonly int _failoverRetryCount;
private ActiveEndpoint _activeEndpoint = ActiveEndpoint.Primary;
private int _consecutiveFailures;
/// <summary> /// <summary>
/// Captured Self reference for use from non-actor threads (event handlers, callbacks). /// Captured Self reference for use from non-actor threads (event handlers, callbacks).
@@ -76,6 +82,8 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
IDataConnection adapter, IDataConnection adapter,
DataConnectionOptions options, DataConnectionOptions options,
ISiteHealthCollector healthCollector, ISiteHealthCollector healthCollector,
IDataConnectionFactory factory,
string protocolType,
IDictionary<string, string>? primaryConfig = null, IDictionary<string, string>? primaryConfig = null,
IDictionary<string, string>? backupConfig = null, IDictionary<string, string>? backupConfig = null,
int failoverRetryCount = 3) int failoverRetryCount = 3)
@@ -84,6 +92,8 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
_adapter = adapter; _adapter = adapter;
_options = options; _options = options;
_healthCollector = healthCollector; _healthCollector = healthCollector;
_factory = factory;
_protocolType = protocolType;
_primaryConfig = primaryConfig ?? new Dictionary<string, string>(); _primaryConfig = primaryConfig ?? new Dictionary<string, string>();
_backupConfig = backupConfig; _backupConfig = backupConfig;
_failoverRetryCount = failoverRetryCount; _failoverRetryCount = failoverRetryCount;
@@ -288,7 +298,8 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
{ {
if (result.Success) if (result.Success)
{ {
_log.Info("[{0}] Reconnected successfully", _connectionName); _log.Info("[{0}] Reconnected successfully on {1} endpoint", _connectionName, _activeEndpoint);
_consecutiveFailures = 0;
// WP-10: Transparent re-subscribe — re-establish all active subscriptions // WP-10: Transparent re-subscribe — re-establish all active subscriptions
ReSubscribeAll(); ReSubscribeAll();
@@ -297,8 +308,43 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
} }
else else
{ {
_log.Warning("[{0}] Reconnect failed: {1}. Retrying in {2}s", _consecutiveFailures++;
_connectionName, result.Error, _options.ReconnectInterval.TotalSeconds);
// Failover: switch endpoint after exhausting retry count (only if backup is configured)
if (_backupConfig != null && _consecutiveFailures >= _failoverRetryCount)
{
var previousEndpoint = _activeEndpoint;
_activeEndpoint = _activeEndpoint == ActiveEndpoint.Primary
? ActiveEndpoint.Backup
: ActiveEndpoint.Primary;
_consecutiveFailures = 0;
var newConfig = _activeEndpoint == ActiveEndpoint.Primary
? _primaryConfig
: _backupConfig;
// Dispose old adapter (fire-and-forget — don't await in actor context)
_adapter.Disconnected -= OnAdapterDisconnected;
_ = _adapter.DisposeAsync().AsTask();
// Create new adapter for the target endpoint
_adapter = _factory.Create(_protocolType, newConfig);
_connectionDetails = newConfig;
// Wire disconnect handler on new adapter
_adapter.Disconnected += OnAdapterDisconnected;
_log.Warning("[{0}] Failing over from {1} to {2}",
_connectionName, previousEndpoint, _activeEndpoint);
}
else
{
var retryLimit = _backupConfig != null ? _failoverRetryCount.ToString() : "∞";
_log.Warning("[{0}] Reconnect failed: {1}. Retrying in {2}s (attempt {3}/{4})",
_connectionName, result.Error, _options.ReconnectInterval.TotalSeconds,
_consecutiveFailures, retryLimit);
}
Timers.StartSingleTimer("reconnect", new AttemptConnect(), _options.ReconnectInterval); Timers.StartSingleTimer("reconnect", new AttemptConnect(), _options.ReconnectInterval);
} }
} }

View File

@@ -49,6 +49,7 @@ public class DataConnectionManagerActor : ReceiveActor
var props = Props.Create(() => new DataConnectionActor( var props = Props.Create(() => new DataConnectionActor(
command.ConnectionName, adapter, _options, _healthCollector, command.ConnectionName, adapter, _options, _healthCollector,
_factory, command.ProtocolType,
command.PrimaryConnectionDetails, command.PrimaryConnectionDetails,
command.BackupConnectionDetails, command.BackupConnectionDetails,
command.FailoverRetryCount)); command.FailoverRetryCount));

View File

@@ -23,12 +23,14 @@ public class DataConnectionActorTests : TestKit
private readonly IDataConnection _mockAdapter; private readonly IDataConnection _mockAdapter;
private readonly DataConnectionOptions _options; private readonly DataConnectionOptions _options;
private readonly ISiteHealthCollector _mockHealthCollector; private readonly ISiteHealthCollector _mockHealthCollector;
private readonly IDataConnectionFactory _mockFactory;
public DataConnectionActorTests() public DataConnectionActorTests()
: base(@"akka.loglevel = DEBUG") : base(@"akka.loglevel = DEBUG")
{ {
_mockAdapter = Substitute.For<IDataConnection>(); _mockAdapter = Substitute.For<IDataConnection>();
_mockHealthCollector = Substitute.For<ISiteHealthCollector>(); _mockHealthCollector = Substitute.For<ISiteHealthCollector>();
_mockFactory = Substitute.For<IDataConnectionFactory>();
_options = new DataConnectionOptions _options = new DataConnectionOptions
{ {
ReconnectInterval = TimeSpan.FromMilliseconds(100), ReconnectInterval = TimeSpan.FromMilliseconds(100),
@@ -40,7 +42,8 @@ public class DataConnectionActorTests : TestKit
private IActorRef CreateConnectionActor(string name = "test-conn") private IActorRef CreateConnectionActor(string name = "test-conn")
{ {
return Sys.ActorOf(Props.Create(() => return Sys.ActorOf(Props.Create(() =>
new DataConnectionActor(name, _mockAdapter, _options, _mockHealthCollector)), name); new DataConnectionActor(name, _mockAdapter, _options, _mockHealthCollector,
_mockFactory, "OpcUa")), name);
} }
[Fact] [Fact]