using Akka.Actor;
using Akka.Cluster.Tools.Client;
using Akka.Event;
using ScadaLink.Commons.Messages.Artifacts;
using ScadaLink.Commons.Messages.DebugView;
using ScadaLink.Commons.Messages.Deployment;
using ScadaLink.Commons.Messages.Health;
using ScadaLink.Commons.Messages.InboundApi;
using ScadaLink.Commons.Messages.Integration;
using ScadaLink.Commons.Messages.Lifecycle;
using ScadaLink.Commons.Messages.RemoteQuery;

namespace ScadaLink.Communication.Actors;

/// <summary>
/// Site-side actor that receives messages from central via ClusterClient and routes
/// them to the appropriate local actors. Also sends heartbeats and health reports
/// to central via the registered ClusterClient.
///
/// WP-4: Routes all 8 message patterns to local handlers.
/// </summary>
/// <remarks>
/// Messages are either forwarded to the Deployment Manager proxy supplied at
/// construction, or to an optional local handler registered at runtime through
/// <see cref="RegisterLocalHandler"/>. When an optional handler is missing, the
/// actor replies to the sender itself with a "not available" response.
/// </remarks>
public class SiteCommunicationActor : ReceiveActor, IWithTimers
{
    private readonly ILoggingAdapter _log = Context.GetLogger();

    // Site identifier; stamped on every fallback response and on heartbeats.
    private readonly string _siteId;

    // Only TransportHeartbeatInterval is read from the options in this class.
    private readonly CommunicationOptions _options;

    /// <summary>
    /// Reference to the local Deployment Manager singleton proxy.
    /// </summary>
    private readonly IActorRef _deploymentManagerProxy;

    /// <summary>
    /// ClusterClient reference for sending messages to the central cluster.
    /// Set via RegisterCentralClient message; null until that message arrives.
    /// </summary>
    private IActorRef? _centralClient;

    /// <summary>
    /// Local actor references for routing specific message patterns.
    /// Populated via registration messages; each stays null until registered.
    /// </summary>
    private IActorRef? _eventLogHandler;
    private IActorRef? _parkedMessageHandler;
    private IActorRef? _integrationHandler;
    private IActorRef? _artifactHandler;

    /// <summary>
    /// Timer scheduler required by <see cref="IWithTimers"/>. Initialised with
    /// <c>null!</c> here and used in <see cref="PreStart"/>.
    /// </summary>
    public ITimerScheduler Timers { get; set; } = null!;

    /// <summary>
    /// Creates the actor and registers all of its message handlers.
    /// </summary>
    /// <param name="siteId">Identifier of this site, included in replies and heartbeats.</param>
    /// <param name="options">Communication options; supplies the heartbeat interval.</param>
    /// <param name="deploymentManagerProxy">Actor that deployment, lifecycle, debug-view and inbound API messages are forwarded to.</param>
    public SiteCommunicationActor(
        string siteId,
        CommunicationOptions options,
        IActorRef deploymentManagerProxy)
    {
        _siteId = siteId;
        _options = options;
        _deploymentManagerProxy = deploymentManagerProxy;

        // NOTE(review): as seen in this view, the Receive registrations below carry no
        // explicit message type argument. Confirm the message type each handler is
        // bound to before reordering or editing them.

        // Registration: remember the ClusterClient used to reach central.
        Receive(msg =>
        {
            _centralClient = msg.Client;
            _log.Info("Registered central ClusterClient");
        });

        // Registration: store a local handler for one of the optional patterns.
        Receive(HandleRegisterLocalHandler);

        // Pattern 1: Instance Deployment — forward to Deployment Manager
        Receive(msg =>
        {
            _log.Debug("Routing DeployInstanceCommand for {0} to DeploymentManager", msg.InstanceUniqueName);
            _deploymentManagerProxy.Forward(msg);
        });

        // Pattern 2: Lifecycle — forward to Deployment Manager
        Receive(msg => _deploymentManagerProxy.Forward(msg));
        Receive(msg => _deploymentManagerProxy.Forward(msg));
        Receive(msg => _deploymentManagerProxy.Forward(msg));

        // Pattern 3: Artifact Deployment — forward to artifact handler if registered
        Receive(msg =>
        {
            if (_artifactHandler != null)
                _artifactHandler.Forward(msg);
            else
            {
                // No handler yet: answer the sender directly instead of dropping the message.
                _log.Warning("No artifact handler registered, replying with failure");
                Sender.Tell(new ArtifactDeploymentResponse(
                    msg.DeploymentId, _siteId, false,
                    "Artifact handler not available", DateTimeOffset.UtcNow));
            }
        });

        // Pattern 4: Integration Routing — forward to integration handler
        Receive(msg =>
        {
            if (_integrationHandler != null)
                _integrationHandler.Forward(msg);
            else
            {
                // Unlike the artifact path, this fallback replies without logging a warning.
                Sender.Tell(new IntegrationCallResponse(
                    msg.CorrelationId, _siteId, false, null,
                    "Integration handler not available", DateTimeOffset.UtcNow));
            }
        });

        // Pattern 5: Debug View — forward to Deployment Manager (which routes to Instance Actor)
        Receive(msg => _deploymentManagerProxy.Forward(msg));
        Receive(msg => _deploymentManagerProxy.Forward(msg));

        // Pattern 6a: Debug Snapshot (one-shot) — forward to Deployment Manager
        Receive(msg => _deploymentManagerProxy.Forward(msg));

        // Inbound API Route.To().Call() — forward to Deployment Manager for instance routing
        Receive(msg => _deploymentManagerProxy.Forward(msg));

        // Pattern 7: Remote Queries — event log query; replies with an empty result
        // and the "not available" message when no event log handler is registered.
        Receive(msg =>
        {
            if (_eventLogHandler != null)
                _eventLogHandler.Forward(msg);
            else
            {
                Sender.Tell(new EventLogQueryResponse(
                    msg.CorrelationId, _siteId, [], null, false, false,
                    "Event log handler not available", DateTimeOffset.UtcNow));
            }
        });

        // Pattern 7: Remote Queries — parked message query; the fallback reply echoes
        // the requested PageNumber and PageSize alongside an empty result.
        Receive(msg =>
        {
            if (_parkedMessageHandler != null)
                _parkedMessageHandler.Forward(msg);
            else
            {
                Sender.Tell(new ParkedMessageQueryResponse(
                    msg.CorrelationId, _siteId, [], 0, msg.PageNumber, msg.PageSize, false,
                    "Parked message handler not available", DateTimeOffset.UtcNow));
            }
        });

        // Internal: send heartbeat tick
        Receive(_ => SendHeartbeatToCentral());

        // Internal: forward health report to central.
        // The null-conditional call means the report is silently dropped while no
        // central client is registered.
        Receive(msg =>
        {
            _centralClient?.Tell(
                new ClusterClient.Send("/user/central-communication", msg), Self);
        });
    }

    /// <summary>
    /// Logs startup and starts the periodic heartbeat timer.
    /// </summary>
    protected override void PreStart()
    {
        _log.Info("SiteCommunicationActor started for site {0}", _siteId);

        // Schedule periodic heartbeat to central
        Timers.StartPeriodicTimer(
            "heartbeat",
            new SendHeartbeat(),
            TimeSpan.FromSeconds(1), // initial delay
            _options.TransportHeartbeatInterval);
    }

    /// <summary>
    /// Stores the handler carried by <paramref name="msg"/> in the field that matches
    /// its <see cref="LocalHandlerType"/>, replacing any previously stored handler.
    /// </summary>
    /// <param name="msg">Registration message naming the pattern and the handler actor.</param>
    private void HandleRegisterLocalHandler(RegisterLocalHandler msg)
    {
        switch (msg.HandlerType)
        {
            case LocalHandlerType.EventLog:
                _eventLogHandler = msg.Handler;
                break;
            case LocalHandlerType.ParkedMessages:
                _parkedMessageHandler = msg.Handler;
                break;
            case LocalHandlerType.Integration:
                _integrationHandler = msg.Handler;
                break;
            case LocalHandlerType.Artifacts:
                _artifactHandler = msg.Handler;
                break;
        }

        // NOTE(review): there is no default branch, so this line is logged for any
        // HandlerType value, including one that matched no case above.
        _log.Info("Registered local handler for {0}", msg.HandlerType);
    }

    /// <summary>
    /// Sends a <c>HeartbeatMessage</c> for this site to central through the registered
    /// ClusterClient. Does nothing when no ClusterClient has been registered yet.
    /// </summary>
    private void SendHeartbeatToCentral()
    {
        if (_centralClient == null)
            return;

        var hostname = Environment.MachineName;

        // IsActive is always sent as true from here.
        var heartbeat = new HeartbeatMessage(
            _siteId, hostname, IsActive: true, DateTimeOffset.UtcNow);

        // Same central actor path as the health report forwarding in the constructor.
        _centralClient.Tell(
            new ClusterClient.Send("/user/central-communication", heartbeat), Self);
    }

    // ── Internal messages ──

    // Timer tick message scheduled in PreStart.
    internal record SendHeartbeat;
}

/// <summary>
/// Command to register a ClusterClient for communicating with the central cluster.
/// </summary>
public record RegisterCentralClient(IActorRef Client);

/// <summary>
/// Command to register a local actor as a handler for a specific message pattern.
/// </summary>
public record RegisterLocalHandler(LocalHandlerType HandlerType, IActorRef Handler);

/// <summary>
/// Identifies which optional local handler a <see cref="RegisterLocalHandler"/>
/// message sets.
/// </summary>
public enum LocalHandlerType
{
    EventLog,
    ParkedMessages,
    Integration,
    Artifacts
}