using Akka.Actor; using Akka.Event; using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Protocol; using ZB.MOM.WW.ScadaBridge.Commons.Messages.DataConnection; using ZB.MOM.WW.ScadaBridge.Commons.Messages.Management; using ZB.MOM.WW.ScadaBridge.HealthMonitoring; using ZB.MOM.WW.ScadaBridge.SiteEventLogging; namespace ZB.MOM.WW.ScadaBridge.DataConnectionLayer.Actors; /// /// WP-34: Protocol extensibility — manages DataConnectionActor instances. /// Routes messages to the correct connection actor based on connection name. /// Adding a new protocol = implement IDataConnection + register with IDataConnectionFactory. /// public class DataConnectionManagerActor : ReceiveActor { private readonly ILoggingAdapter _log = Context.GetLogger(); private readonly IDataConnectionFactory _factory; private readonly DataConnectionOptions _options; private readonly ISiteHealthCollector _healthCollector; private readonly ISiteEventLogger? _siteEventLogger; private readonly Dictionary _connectionActors = new(); /// /// Initializes a new with the required dependencies. /// /// Factory used to create protocol-specific data connection adapters. /// Configuration options for data connections. /// Collector for site health metrics reported by connection actors. /// Optional logger for site event entries; null disables site event logging. public DataConnectionManagerActor( IDataConnectionFactory factory, DataConnectionOptions options, ISiteHealthCollector healthCollector, ISiteEventLogger? siteEventLogger = null) { _factory = factory; _options = options; _healthCollector = healthCollector; _siteEventLogger = siteEventLogger; Receive(HandleCreateConnection); Receive(HandleRoute); Receive(HandleRoute); Receive(HandleRouteWrite); Receive(HandleRemoveConnection); Receive(HandleGetAllHealthReports); Receive(HandleBrowse); Receive(HandleReadTagValues); } private void HandleCreateConnection(CreateConnectionCommand command) { if (_connectionActors.ContainsKey(command.ConnectionName)) { _log.Warning("Connection {0} already exists", command.ConnectionName); return; } // WP-34: Factory creates the correct adapter based on protocol type var adapter = _factory.Create(command.ProtocolType, command.PrimaryConnectionDetails); var props = Props.Create(() => new DataConnectionActor( command.ConnectionName, adapter, _options, _healthCollector, _factory, command.ProtocolType, command.PrimaryConnectionDetails, command.BackupConnectionDetails, command.FailoverRetryCount, _siteEventLogger)); // Sanitize name for Akka actor path (replace spaces and invalid chars) var actorName = new string(command.ConnectionName .Select(c => char.IsLetterOrDigit(c) || "-_.*$+:@&=,!~';()".Contains(c) ? c : '-') .ToArray()); var actorRef = Context.ActorOf(props, actorName); _connectionActors[command.ConnectionName] = actorRef; _log.Info("Created DataConnectionActor for {0} (protocol={1})", command.ConnectionName, command.ProtocolType); } private void HandleRoute(SubscribeTagsRequest request) { if (_connectionActors.TryGetValue(request.ConnectionName, out var actor)) actor.Forward(request); else { _log.Warning("No connection actor for {0}", request.ConnectionName); Sender.Tell(new SubscribeTagsResponse( request.CorrelationId, request.InstanceUniqueName, false, $"Unknown connection: {request.ConnectionName}", DateTimeOffset.UtcNow)); } } private void HandleRoute(UnsubscribeTagsRequest request) { if (_connectionActors.TryGetValue(request.ConnectionName, out var actor)) actor.Forward(request); else _log.Warning("No connection actor for {0} during unsubscribe", request.ConnectionName); } private void HandleRouteWrite(WriteTagRequest request) { if (_connectionActors.TryGetValue(request.ConnectionName, out var actor)) actor.Forward(request); else { _log.Warning("No connection actor for {0}", request.ConnectionName); Sender.Tell(new WriteTagResponse( request.CorrelationId, false, $"Unknown connection: {request.ConnectionName}", DateTimeOffset.UtcNow)); } } /// /// Routes a from the central UI's OPC UA /// Tag Browser to the child that owns the /// named connection. The manager is the only actor that knows whether a /// connection exists at this site — so it owns the /// failure. Everything /// else (capability check, session state, server errors) lives inside the /// child where the adapter is held. /// private void HandleBrowse(BrowseNodeCommand command) { if (_connectionActors.TryGetValue(command.ConnectionName, out var actor)) { actor.Forward(command); } else { _log.Warning("No connection actor for {0} during browse", command.ConnectionName); Sender.Tell(new BrowseNodeResult( Array.Empty(), Truncated: false, new BrowseFailure( BrowseFailureKind.ConnectionNotFound, $"No data connection named '{command.ConnectionName}' at this site."))); } } /// /// Routes a from the CentralUI's Test /// Bindings dialog to the child that /// owns the named connection. Same split as — /// the manager owns /// because it is /// the only actor with site-level visibility; every other failure /// (not connected, server error, timeout) is resolved by the child where /// the adapter is held. /// private void HandleReadTagValues(ReadTagValuesCommand command) { if (_connectionActors.TryGetValue(command.ConnectionName, out var actor)) { actor.Forward(command); } else { _log.Warning("No connection actor for {0} during test-bindings read", command.ConnectionName); Sender.Tell(new ReadTagValuesResult( Array.Empty(), new ReadTagValuesFailure( ReadTagValuesFailureKind.ConnectionNotFound, $"No data connection named '{command.ConnectionName}' at this site."))); } } private void HandleRemoveConnection(RemoveConnectionCommand command) { if (_connectionActors.TryGetValue(command.ConnectionName, out var actor)) { Context.Stop(actor); _connectionActors.Remove(command.ConnectionName); _healthCollector.RemoveConnection(command.ConnectionName); _log.Info("Removed DataConnectionActor for {0}", command.ConnectionName); } } private void HandleGetAllHealthReports(GetAllHealthReports _) { // Forward health report requests to all connection actors foreach (var actor in _connectionActors.Values) { actor.Forward(new DataConnectionActor.GetHealthReport()); } } /// protected override SupervisorStrategy SupervisorStrategy() { return new OneForOneStrategy( maxNrOfRetries: 10, withinTimeRange: TimeSpan.FromMinutes(1), decider: Decider.From(ex => { _log.Warning(ex, "DataConnectionActor threw exception, resuming (subscription state preserved)"); return Directive.Resume; })); } } /// /// Command to remove a data connection actor. /// public record RemoveConnectionCommand(string ConnectionName); /// /// Request for health reports from all active connections. /// public record GetAllHealthReports;