Files
scadalink-design/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionManagerActor.cs
Joseph Doherty 75a6636a2c fix: wire DCL connection state changes into ISiteHealthCollector
DataConnectionActor now calls UpdateConnectionHealth() on state
transitions (Connecting/Connected/Reconnecting) and UpdateTagResolution()
on connection establishment. DataConnectionManagerActor calls
RemoveConnection() on actor removal. Health reports now include
data connection statuses when instances are deployed with bindings.
2026-03-18 00:20:02 -04:00

144 lines
5.4 KiB
C#

using Akka.Actor;
using Akka.Event;
using ScadaLink.Commons.Interfaces.Protocol;
using ScadaLink.Commons.Messages.DataConnection;
using ScadaLink.HealthMonitoring;
namespace ScadaLink.DataConnectionLayer.Actors;
/// <summary>
/// WP-34: Protocol extensibility — manages DataConnectionActor instances.
/// Routes messages to the correct connection actor based on connection name.
/// Adding a new protocol = implement IDataConnection + register with IDataConnectionFactory.
/// </summary>
/// <remarks>
/// Each connection actor is a watched child of this actor. If a child stops for any
/// reason other than an explicit <see cref="RemoveConnectionCommand"/>, its routing
/// entry is dropped so later requests get an "Unknown connection" reply (instead of
/// going to dead letters) and the connection can be created again.
/// </remarks>
public class DataConnectionManagerActor : ReceiveActor
{
    /// <summary>
    /// Non-alphanumeric characters that are kept unchanged when deriving an actor
    /// name from a connection name. Everything else outside ASCII letters and digits
    /// is replaced with a hyphen.
    /// </summary>
    private const string AllowedActorNameSymbols = "-_.*$+:@&=,!~';()";

    private readonly ILoggingAdapter _log = Context.GetLogger();
    private readonly IDataConnectionFactory _factory;
    private readonly DataConnectionOptions _options;
    private readonly ISiteHealthCollector _healthCollector;

    /// <summary>Routing table: connection name (as supplied by callers) to its child actor.</summary>
    private readonly Dictionary<string, IActorRef> _connectionActors = new();

    /// <summary>
    /// Stores the collaborators and registers the message handlers.
    /// </summary>
    /// <param name="factory">Creates the protocol adapter for a new connection.</param>
    /// <param name="options">Options passed through to every connection actor.</param>
    /// <param name="healthCollector">Collector passed to connection actors and notified when a connection is removed.</param>
    public DataConnectionManagerActor(
        IDataConnectionFactory factory,
        DataConnectionOptions options,
        ISiteHealthCollector healthCollector)
    {
        _factory = factory;
        _options = options;
        _healthCollector = healthCollector;

        Receive<CreateConnectionCommand>(HandleCreateConnection);
        Receive<SubscribeTagsRequest>(HandleRoute);
        Receive<UnsubscribeTagsRequest>(HandleRoute);
        Receive<WriteTagRequest>(HandleRouteWrite);
        Receive<RemoveConnectionCommand>(HandleRemoveConnection);
        Receive<GetAllHealthReports>(HandleGetAllHealthReports);
        Receive<Terminated>(HandleConnectionTerminated);
    }

    /// <summary>
    /// Creates, watches and registers a connection actor for the requested connection.
    /// Requests that cannot be honoured (empty name, duplicate connection, actor name
    /// already taken) are logged and ignored rather than failing this actor.
    /// </summary>
    private void HandleCreateConnection(CreateConnectionCommand command)
    {
        if (string.IsNullOrEmpty(command.ConnectionName))
        {
            _log.Warning("Ignoring create request with a null or empty connection name");
            return;
        }

        if (_connectionActors.ContainsKey(command.ConnectionName))
        {
            _log.Warning("Connection {0} already exists", command.ConnectionName);
            return;
        }

        // Sanitize name for Akka actor path (replace spaces and invalid chars)
        var actorName = SanitizeActorName(command.ConnectionName);

        // Two different connection names can sanitize to the same actor name, and a
        // child that is still stopping keeps its name reserved. ActorOf would throw in
        // both cases, so check first — and before the adapter is built, so nothing is
        // created that would then be left without an owner.
        if (!Context.Child(actorName).IsNobody())
        {
            _log.Warning(
                "Cannot create connection {0}: actor name {1} is already in use",
                command.ConnectionName, actorName);
            return;
        }

        // WP-34: Factory creates the correct adapter based on protocol type
        var adapter = _factory.Create(command.ProtocolType, command.ConnectionDetails);
        var props = Props.Create(() => new DataConnectionActor(
            command.ConnectionName, adapter, _options, _healthCollector, command.ConnectionDetails));

        var actorRef = Context.ActorOf(props, actorName);

        // Watch the child so a permanent stop is noticed (see HandleConnectionTerminated).
        Context.Watch(actorRef);
        _connectionActors[command.ConnectionName] = actorRef;

        _log.Info("Created DataConnectionActor for {0} (protocol={1})",
            command.ConnectionName, command.ProtocolType);
    }

    /// <summary>
    /// Derives a valid Akka actor name from a non-empty connection name.
    /// ASCII letters, ASCII digits and the symbols in
    /// <see cref="AllowedActorNameSymbols"/> are kept; every other character
    /// (including non-ASCII letters, which Akka rejects) becomes a hyphen.
    /// </summary>
    private static string SanitizeActorName(string connectionName)
    {
        var chars = connectionName
            .Select(c => IsValidActorNameChar(c) ? c : '-')
            .ToArray();

        // Akka rejects actor names that start with '$'.
        if (chars[0] == '$')
        {
            chars[0] = '-';
        }

        return new string(chars);
    }

    /// <summary>Returns true when <paramref name="c"/> may appear unchanged in an actor name.</summary>
    private static bool IsValidActorNameChar(char c) =>
        c is (>= 'a' and <= 'z') or (>= 'A' and <= 'Z') or (>= '0' and <= '9')
        || AllowedActorNameSymbols.Contains(c);

    /// <summary>
    /// Forwards a subscribe request to the named connection actor, or replies to the
    /// sender with a failed <see cref="SubscribeTagsResponse"/> when the connection is unknown.
    /// </summary>
    private void HandleRoute(SubscribeTagsRequest request)
    {
        if (_connectionActors.TryGetValue(request.ConnectionName, out var actor))
        {
            actor.Forward(request);
        }
        else
        {
            _log.Warning("No connection actor for {0}", request.ConnectionName);
            Sender.Tell(new SubscribeTagsResponse(
                request.CorrelationId, request.InstanceUniqueName, false,
                $"Unknown connection: {request.ConnectionName}", DateTimeOffset.UtcNow));
        }
    }

    /// <summary>
    /// Forwards an unsubscribe request to the named connection actor. An unknown
    /// connection is only logged; no reply is sent.
    /// </summary>
    private void HandleRoute(UnsubscribeTagsRequest request)
    {
        if (_connectionActors.TryGetValue(request.ConnectionName, out var actor))
        {
            actor.Forward(request);
        }
        else
        {
            _log.Warning("No connection actor for {0} during unsubscribe", request.ConnectionName);
        }
    }

    /// <summary>
    /// Forwards a write request to the named connection actor, or replies to the
    /// sender with a failed <see cref="WriteTagResponse"/> when the connection is unknown.
    /// </summary>
    private void HandleRouteWrite(WriteTagRequest request)
    {
        if (_connectionActors.TryGetValue(request.ConnectionName, out var actor))
        {
            actor.Forward(request);
        }
        else
        {
            _log.Warning("No connection actor for {0}", request.ConnectionName);
            Sender.Tell(new WriteTagResponse(
                request.CorrelationId, false,
                $"Unknown connection: {request.ConnectionName}", DateTimeOffset.UtcNow));
        }
    }

    /// <summary>
    /// Stops the named connection actor and removes it from the routing table and the
    /// health collector. Unknown connection names are ignored.
    /// </summary>
    private void HandleRemoveConnection(RemoveConnectionCommand command)
    {
        if (_connectionActors.TryGetValue(command.ConnectionName, out var actor))
        {
            // This stop is intentional and fully handled here, so no Terminated
            // notification is needed for it.
            Context.Unwatch(actor);
            Context.Stop(actor);
            _connectionActors.Remove(command.ConnectionName);
            _healthCollector.RemoveConnection(command.ConnectionName);
            _log.Info("Removed DataConnectionActor for {0}", command.ConnectionName);
        }
    }

    /// <summary>
    /// Cleans up after a connection actor that stopped without a
    /// <see cref="RemoveConnectionCommand"/> — for example when the supervisor
    /// strategy's restart limit was exceeded and the child was stopped.
    /// </summary>
    private void HandleConnectionTerminated(Terminated terminated)
    {
        // Match on the actor reference rather than the name so a notification for an
        // old incarnation can never evict a newer actor registered under the same name.
        var entry = _connectionActors.FirstOrDefault(kvp => kvp.Value.Equals(terminated.ActorRef));
        if (entry.Key is null)
        {
            return;
        }

        _connectionActors.Remove(entry.Key);

        // Mirror HandleRemoveConnection so the collector does not keep reporting the
        // last known state of an actor that no longer exists.
        _healthCollector.RemoveConnection(entry.Key);

        _log.Error("DataConnectionActor for {0} stopped unexpectedly; connection removed", entry.Key);
    }

    /// <summary>
    /// Forwards a <see cref="DataConnectionActor.GetHealthReport"/> to every connection
    /// actor. This actor does not reply itself.
    /// </summary>
    private void HandleGetAllHealthReports(GetAllHealthReports _)
    {
        // Forward health report requests to all connection actors
        foreach (var actor in _connectionActors.Values)
        {
            actor.Forward(new DataConnectionActor.GetHealthReport());
        }
    }

    /// <summary>
    /// OneForOneStrategy with Restart for connection actors — a failed connection
    /// should restart and attempt reconnection. Restarts are limited to 10 within
    /// one minute.
    /// </summary>
    protected override SupervisorStrategy SupervisorStrategy()
    {
        return new OneForOneStrategy(
            maxNrOfRetries: 10,
            withinTimeRange: TimeSpan.FromMinutes(1),
            decider: Decider.From(ex =>
            {
                _log.Warning(ex, "DataConnectionActor threw exception, restarting");
                return Directive.Restart;
            }));
    }
}
/// <summary>
/// Command to remove a data connection actor.
/// Handled by <see cref="DataConnectionManagerActor"/>, which stops the connection's
/// actor and removes the connection from the health collector. A name with no
/// registered actor is ignored.
/// </summary>
/// <param name="ConnectionName">Name of the connection to remove, as used when it was created.</param>
public record RemoveConnectionCommand(string ConnectionName);
/// <summary>
/// Request for health reports from all active connections.
/// <see cref="DataConnectionManagerActor"/> forwards a
/// <see cref="DataConnectionActor.GetHealthReport"/> to each connection actor and
/// sends no reply of its own.
/// </summary>
public record GetAllHealthReports;