using Akka.Actor; using Akka.Event; using ScadaLink.Commons.Interfaces.Protocol; using ScadaLink.Commons.Messages.DataConnection; using ScadaLink.HealthMonitoring; using ScadaLink.SiteEventLogging; namespace ScadaLink.DataConnectionLayer.Actors; /// /// WP-34: Protocol extensibility — manages DataConnectionActor instances. /// Routes messages to the correct connection actor based on connection name. /// Adding a new protocol = implement IDataConnection + register with IDataConnectionFactory. /// public class DataConnectionManagerActor : ReceiveActor { private readonly ILoggingAdapter _log = Context.GetLogger(); private readonly IDataConnectionFactory _factory; private readonly DataConnectionOptions _options; private readonly ISiteHealthCollector _healthCollector; private readonly ISiteEventLogger? _siteEventLogger; private readonly Dictionary _connectionActors = new(); public DataConnectionManagerActor( IDataConnectionFactory factory, DataConnectionOptions options, ISiteHealthCollector healthCollector, ISiteEventLogger? siteEventLogger = null) { _factory = factory; _options = options; _healthCollector = healthCollector; _siteEventLogger = siteEventLogger; Receive(HandleCreateConnection); Receive(HandleRoute); Receive(HandleRoute); Receive(HandleRouteWrite); Receive(HandleRemoveConnection); Receive(HandleGetAllHealthReports); } private void HandleCreateConnection(CreateConnectionCommand command) { if (_connectionActors.ContainsKey(command.ConnectionName)) { _log.Warning("Connection {0} already exists", command.ConnectionName); return; } // WP-34: Factory creates the correct adapter based on protocol type var adapter = _factory.Create(command.ProtocolType, command.PrimaryConnectionDetails); var props = Props.Create(() => new DataConnectionActor( command.ConnectionName, adapter, _options, _healthCollector, _factory, command.ProtocolType, command.PrimaryConnectionDetails, command.BackupConnectionDetails, command.FailoverRetryCount, _siteEventLogger)); // Sanitize name for Akka actor path (replace spaces and invalid chars) var actorName = new string(command.ConnectionName .Select(c => char.IsLetterOrDigit(c) || "-_.*$+:@&=,!~';()".Contains(c) ? c : '-') .ToArray()); var actorRef = Context.ActorOf(props, actorName); _connectionActors[command.ConnectionName] = actorRef; _log.Info("Created DataConnectionActor for {0} (protocol={1})", command.ConnectionName, command.ProtocolType); } private void HandleRoute(SubscribeTagsRequest request) { if (_connectionActors.TryGetValue(request.ConnectionName, out var actor)) actor.Forward(request); else { _log.Warning("No connection actor for {0}", request.ConnectionName); Sender.Tell(new SubscribeTagsResponse( request.CorrelationId, request.InstanceUniqueName, false, $"Unknown connection: {request.ConnectionName}", DateTimeOffset.UtcNow)); } } private void HandleRoute(UnsubscribeTagsRequest request) { if (_connectionActors.TryGetValue(request.ConnectionName, out var actor)) actor.Forward(request); else _log.Warning("No connection actor for {0} during unsubscribe", request.ConnectionName); } private void HandleRouteWrite(WriteTagRequest request) { if (_connectionActors.TryGetValue(request.ConnectionName, out var actor)) actor.Forward(request); else { _log.Warning("No connection actor for {0}", request.ConnectionName); Sender.Tell(new WriteTagResponse( request.CorrelationId, false, $"Unknown connection: {request.ConnectionName}", DateTimeOffset.UtcNow)); } } private void HandleRemoveConnection(RemoveConnectionCommand command) { if (_connectionActors.TryGetValue(command.ConnectionName, out var actor)) { Context.Stop(actor); _connectionActors.Remove(command.ConnectionName); _healthCollector.RemoveConnection(command.ConnectionName); _log.Info("Removed DataConnectionActor for {0}", command.ConnectionName); } } private void HandleGetAllHealthReports(GetAllHealthReports _) { // Forward health report requests to all connection actors foreach (var actor in _connectionActors.Values) { actor.Forward(new DataConnectionActor.GetHealthReport()); } } /// /// OneForOneStrategy with Resume for connection actors. /// /// DataConnectionLayer-002: a DataConnectionActor is a long-lived, stateful /// coordinator — its in-memory subscription registry (_subscriptionsByInstance, /// _subscriptionIds, _subscribers) is the only record of which Instance Actors /// subscribed to which tags, and there is no durable store to rebuild it from. /// Restart would create a fresh instance and silently discard that registry, /// breaking the design doc's "transparent re-subscribe" guarantee (WP-10): /// subscribers would never be re-subscribed and would sit at stale quality with /// no error. Resume keeps the actor instance and its state intact, so a transient /// exception in a message handler does not lose subscription state. The actor's /// own Become/Stash reconnect state machine already recovers connection-level /// faults, so it does not need a restart to re-establish the connection. /// This matches the ScadaLink convention of Resume for coordinator actors. /// protected override SupervisorStrategy SupervisorStrategy() { return new OneForOneStrategy( maxNrOfRetries: 10, withinTimeRange: TimeSpan.FromMinutes(1), decider: Decider.From(ex => { _log.Warning(ex, "DataConnectionActor threw exception, resuming (subscription state preserved)"); return Directive.Resume; })); } } /// /// Command to remove a data connection actor. /// public record RemoveConnectionCommand(string ConnectionName); /// /// Request for health reports from all active connections. /// public record GetAllHealthReports;