fix: wire DCL connection state changes into ISiteHealthCollector

DataConnectionActor now calls UpdateConnectionHealth() on state
transitions (Connecting/Connected/Reconnecting) and UpdateTagResolution()
on connection establishment. DataConnectionManagerActor calls
RemoveConnection() on actor removal. Health reports now include
data connection statuses when instances are deployed with bindings.
This commit is contained in:
Joseph Doherty
2026-03-18 00:20:02 -04:00
parent 4f22ca2b1f
commit 75a6636a2c
6 changed files with 28 additions and 7 deletions

View File

@@ -3,6 +3,7 @@ using Akka.Event;
using ScadaLink.Commons.Interfaces.Protocol; using ScadaLink.Commons.Interfaces.Protocol;
using ScadaLink.Commons.Messages.DataConnection; using ScadaLink.Commons.Messages.DataConnection;
using ScadaLink.Commons.Types.Enums; using ScadaLink.Commons.Types.Enums;
using ScadaLink.HealthMonitoring;
namespace ScadaLink.DataConnectionLayer.Actors; namespace ScadaLink.DataConnectionLayer.Actors;
@@ -28,6 +29,7 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
private readonly string _connectionName; private readonly string _connectionName;
private readonly IDataConnection _adapter; private readonly IDataConnection _adapter;
private readonly DataConnectionOptions _options; private readonly DataConnectionOptions _options;
private readonly ISiteHealthCollector _healthCollector;
public IStash Stash { get; set; } = null!; public IStash Stash { get; set; } = null!;
public ITimerScheduler Timers { get; set; } = null!; public ITimerScheduler Timers { get; set; } = null!;
@@ -64,11 +66,13 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
string connectionName, string connectionName,
IDataConnection adapter, IDataConnection adapter,
DataConnectionOptions options, DataConnectionOptions options,
ISiteHealthCollector healthCollector,
IDictionary<string, string>? connectionDetails = null) IDictionary<string, string>? connectionDetails = null)
{ {
_connectionName = connectionName; _connectionName = connectionName;
_adapter = adapter; _adapter = adapter;
_options = options; _options = options;
_healthCollector = healthCollector;
_connectionDetails = connectionDetails ?? new Dictionary<string, string>(); _connectionDetails = connectionDetails ?? new Dictionary<string, string>();
} }
@@ -96,6 +100,7 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
private void BecomeConnecting() private void BecomeConnecting()
{ {
_log.Info("[{0}] Entering Connecting state", _connectionName); _log.Info("[{0}] Entering Connecting state", _connectionName);
_healthCollector.UpdateConnectionHealth(_connectionName, ConnectionHealth.Connecting);
Become(Connecting); Become(Connecting);
Self.Tell(new AttemptConnect()); Self.Tell(new AttemptConnect());
} }
@@ -129,6 +134,8 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
private void BecomeConnected() private void BecomeConnected()
{ {
_log.Info("[{0}] Entering Connected state", _connectionName); _log.Info("[{0}] Entering Connected state", _connectionName);
_healthCollector.UpdateConnectionHealth(_connectionName, ConnectionHealth.Connected);
_healthCollector.UpdateTagResolution(_connectionName, _totalSubscribed, _resolvedTags);
Become(Connected); Become(Connected);
Stash.UnstashAll(); Stash.UnstashAll();
} }
@@ -166,6 +173,7 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers
private void BecomeReconnecting() private void BecomeReconnecting()
{ {
_log.Warning("[{0}] Entering Reconnecting state", _connectionName); _log.Warning("[{0}] Entering Reconnecting state", _connectionName);
_healthCollector.UpdateConnectionHealth(_connectionName, ConnectionHealth.Disconnected);
Become(Reconnecting); Become(Reconnecting);
// WP-9: Push bad quality for all subscribed tags on disconnect // WP-9: Push bad quality for all subscribed tags on disconnect

View File

@@ -2,6 +2,7 @@ using Akka.Actor;
using Akka.Event; using Akka.Event;
using ScadaLink.Commons.Interfaces.Protocol; using ScadaLink.Commons.Interfaces.Protocol;
using ScadaLink.Commons.Messages.DataConnection; using ScadaLink.Commons.Messages.DataConnection;
using ScadaLink.HealthMonitoring;
namespace ScadaLink.DataConnectionLayer.Actors; namespace ScadaLink.DataConnectionLayer.Actors;
@@ -15,14 +16,17 @@ public class DataConnectionManagerActor : ReceiveActor
private readonly ILoggingAdapter _log = Context.GetLogger(); private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly IDataConnectionFactory _factory; private readonly IDataConnectionFactory _factory;
private readonly DataConnectionOptions _options; private readonly DataConnectionOptions _options;
private readonly ISiteHealthCollector _healthCollector;
private readonly Dictionary<string, IActorRef> _connectionActors = new(); private readonly Dictionary<string, IActorRef> _connectionActors = new();
public DataConnectionManagerActor( public DataConnectionManagerActor(
IDataConnectionFactory factory, IDataConnectionFactory factory,
DataConnectionOptions options) DataConnectionOptions options,
ISiteHealthCollector healthCollector)
{ {
_factory = factory; _factory = factory;
_options = options; _options = options;
_healthCollector = healthCollector;
Receive<CreateConnectionCommand>(HandleCreateConnection); Receive<CreateConnectionCommand>(HandleCreateConnection);
Receive<SubscribeTagsRequest>(HandleRoute); Receive<SubscribeTagsRequest>(HandleRoute);
@@ -44,7 +48,7 @@ public class DataConnectionManagerActor : ReceiveActor
var adapter = _factory.Create(command.ProtocolType, command.ConnectionDetails); var adapter = _factory.Create(command.ProtocolType, command.ConnectionDetails);
var props = Props.Create(() => new DataConnectionActor( var props = Props.Create(() => new DataConnectionActor(
command.ConnectionName, adapter, _options, command.ConnectionDetails)); command.ConnectionName, adapter, _options, _healthCollector, command.ConnectionDetails));
// Sanitize name for Akka actor path (replace spaces and invalid chars) // Sanitize name for Akka actor path (replace spaces and invalid chars)
var actorName = new string(command.ConnectionName var actorName = new string(command.ConnectionName
@@ -97,6 +101,7 @@ public class DataConnectionManagerActor : ReceiveActor
{ {
Context.Stop(actor); Context.Stop(actor);
_connectionActors.Remove(command.ConnectionName); _connectionActors.Remove(command.ConnectionName);
_healthCollector.RemoveConnection(command.ConnectionName);
_log.Info("Removed DataConnectionActor for {0}", command.ConnectionName); _log.Info("Removed DataConnectionActor for {0}", command.ConnectionName);
} }
} }

View File

@@ -20,6 +20,7 @@
<ItemGroup> <ItemGroup>
<ProjectReference Include="../ScadaLink.Commons/ScadaLink.Commons.csproj" /> <ProjectReference Include="../ScadaLink.Commons/ScadaLink.Commons.csproj" />
<ProjectReference Include="../ScadaLink.HealthMonitoring/ScadaLink.HealthMonitoring.csproj" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@@ -219,9 +219,10 @@ akka {{
IActorRef? dclManager = null; IActorRef? dclManager = null;
if (dclFactory != null) if (dclFactory != null)
{ {
var healthCollector = _serviceProvider.GetRequiredService<ScadaLink.HealthMonitoring.ISiteHealthCollector>();
dclManager = _actorSystem!.ActorOf( dclManager = _actorSystem!.ActorOf(
Props.Create(() => new ScadaLink.DataConnectionLayer.Actors.DataConnectionManagerActor( Props.Create(() => new ScadaLink.DataConnectionLayer.Actors.DataConnectionManagerActor(
dclFactory, dclOptions)), dclFactory, dclOptions, healthCollector)),
"dcl-manager"); "dcl-manager");
_logger.LogInformation("Data Connection Layer manager actor created"); _logger.LogInformation("Data Connection Layer manager actor created");
} }

View File

@@ -5,6 +5,7 @@ using ScadaLink.Commons.Interfaces.Protocol;
using ScadaLink.Commons.Messages.DataConnection; using ScadaLink.Commons.Messages.DataConnection;
using ScadaLink.Commons.Types.Enums; using ScadaLink.Commons.Types.Enums;
using ScadaLink.DataConnectionLayer.Actors; using ScadaLink.DataConnectionLayer.Actors;
using ScadaLink.HealthMonitoring;
namespace ScadaLink.DataConnectionLayer.Tests; namespace ScadaLink.DataConnectionLayer.Tests;
@@ -21,11 +22,13 @@ public class DataConnectionActorTests : TestKit
{ {
private readonly IDataConnection _mockAdapter; private readonly IDataConnection _mockAdapter;
private readonly DataConnectionOptions _options; private readonly DataConnectionOptions _options;
private readonly ISiteHealthCollector _mockHealthCollector;
public DataConnectionActorTests() public DataConnectionActorTests()
: base(@"akka.loglevel = DEBUG") : base(@"akka.loglevel = DEBUG")
{ {
_mockAdapter = Substitute.For<IDataConnection>(); _mockAdapter = Substitute.For<IDataConnection>();
_mockHealthCollector = Substitute.For<ISiteHealthCollector>();
_options = new DataConnectionOptions _options = new DataConnectionOptions
{ {
ReconnectInterval = TimeSpan.FromMilliseconds(100), ReconnectInterval = TimeSpan.FromMilliseconds(100),
@@ -37,7 +40,7 @@ public class DataConnectionActorTests : TestKit
private IActorRef CreateConnectionActor(string name = "test-conn") private IActorRef CreateConnectionActor(string name = "test-conn")
{ {
return Sys.ActorOf(Props.Create(() => return Sys.ActorOf(Props.Create(() =>
new DataConnectionActor(name, _mockAdapter, _options)), name); new DataConnectionActor(name, _mockAdapter, _options, _mockHealthCollector)), name);
} }
[Fact] [Fact]

View File

@@ -4,6 +4,7 @@ using NSubstitute;
using ScadaLink.Commons.Interfaces.Protocol; using ScadaLink.Commons.Interfaces.Protocol;
using ScadaLink.Commons.Messages.DataConnection; using ScadaLink.Commons.Messages.DataConnection;
using ScadaLink.DataConnectionLayer.Actors; using ScadaLink.DataConnectionLayer.Actors;
using ScadaLink.HealthMonitoring;
namespace ScadaLink.DataConnectionLayer.Tests; namespace ScadaLink.DataConnectionLayer.Tests;
@@ -14,11 +15,13 @@ public class DataConnectionManagerActorTests : TestKit
{ {
private readonly IDataConnectionFactory _mockFactory; private readonly IDataConnectionFactory _mockFactory;
private readonly DataConnectionOptions _options; private readonly DataConnectionOptions _options;
private readonly ISiteHealthCollector _mockHealthCollector;
public DataConnectionManagerActorTests() public DataConnectionManagerActorTests()
: base(@"akka.loglevel = DEBUG") : base(@"akka.loglevel = DEBUG")
{ {
_mockFactory = Substitute.For<IDataConnectionFactory>(); _mockFactory = Substitute.For<IDataConnectionFactory>();
_mockHealthCollector = Substitute.For<ISiteHealthCollector>();
_options = new DataConnectionOptions _options = new DataConnectionOptions
{ {
ReconnectInterval = TimeSpan.FromMilliseconds(100), ReconnectInterval = TimeSpan.FromMilliseconds(100),
@@ -30,7 +33,7 @@ public class DataConnectionManagerActorTests : TestKit
public void WriteToUnknownConnection_ReturnsError() public void WriteToUnknownConnection_ReturnsError()
{ {
var manager = Sys.ActorOf(Props.Create(() => var manager = Sys.ActorOf(Props.Create(() =>
new DataConnectionManagerActor(_mockFactory, _options))); new DataConnectionManagerActor(_mockFactory, _options, _mockHealthCollector)));
manager.Tell(new WriteTagRequest( manager.Tell(new WriteTagRequest(
"corr1", "nonexistent", "tag1", 42, DateTimeOffset.UtcNow)); "corr1", "nonexistent", "tag1", 42, DateTimeOffset.UtcNow));
@@ -44,7 +47,7 @@ public class DataConnectionManagerActorTests : TestKit
public void SubscribeToUnknownConnection_ReturnsError() public void SubscribeToUnknownConnection_ReturnsError()
{ {
var manager = Sys.ActorOf(Props.Create(() => var manager = Sys.ActorOf(Props.Create(() =>
new DataConnectionManagerActor(_mockFactory, _options))); new DataConnectionManagerActor(_mockFactory, _options, _mockHealthCollector)));
manager.Tell(new SubscribeTagsRequest( manager.Tell(new SubscribeTagsRequest(
"corr1", "inst1", "nonexistent", ["tag1"], DateTimeOffset.UtcNow)); "corr1", "inst1", "nonexistent", ["tag1"], DateTimeOffset.UtcNow));
@@ -64,7 +67,7 @@ public class DataConnectionManagerActorTests : TestKit
.Returns(mockAdapter); .Returns(mockAdapter);
var manager = Sys.ActorOf(Props.Create(() => var manager = Sys.ActorOf(Props.Create(() =>
new DataConnectionManagerActor(_mockFactory, _options))); new DataConnectionManagerActor(_mockFactory, _options, _mockHealthCollector)));
manager.Tell(new CreateConnectionCommand( manager.Tell(new CreateConnectionCommand(
"conn1", "OpcUa", new Dictionary<string, string>())); "conn1", "OpcUa", new Dictionary<string, string>()));