using Akka.Actor;
using Akka.Cluster.Tools.Client;
using Akka.Event;
using ScadaLink.Commons.Messages.Artifacts;
using ScadaLink.Commons.Messages.Audit;
using ScadaLink.Commons.Messages.DebugView;
using ScadaLink.Commons.Messages.Deployment;
using ScadaLink.Commons.Messages.Health;
using ScadaLink.Commons.Messages.InboundApi;
using ScadaLink.Commons.Messages.Integration;
using ScadaLink.Commons.Messages.Lifecycle;
using ScadaLink.Commons.Messages.Notification;
using ScadaLink.Commons.Messages.RemoteQuery;

namespace ScadaLink.Communication.Actors;

/// <summary>
/// Site-side actor that receives messages from central via ClusterClient and routes
/// them to the appropriate local actors. Also sends heartbeats and health reports
/// to central via the registered ClusterClient.
///
/// WP-4: Routes all 8 message patterns to local handlers.
/// </summary>
/// <remarks>
/// Routing targets fall into three groups: the Deployment Manager proxy supplied at
/// construction, optional local handlers supplied later through
/// <see cref="RegisterLocalHandler"/>, and the central ClusterClient supplied later
/// through <see cref="RegisterCentralClient"/>. Where an optional target has not been
/// registered yet, most handlers reply to the sender with a failure-shaped response
/// instead of forwarding (the heartbeat and health-report relays simply do nothing).
/// </remarks>
public class SiteCommunicationActor : ReceiveActor, IWithTimers
{
    private readonly ILoggingAdapter _log = Context.GetLogger();

    // Site identifier stamped onto locally generated failure responses and heartbeats.
    private readonly string _siteId;

    // Only TransportHeartbeatInterval is read from the options in this file (see PreStart).
    private readonly CommunicationOptions _options;

    /// <summary>
    /// Reference to the local Deployment Manager singleton proxy.
    /// </summary>
    private readonly IActorRef _deploymentManagerProxy;

    /// <summary>
    /// ClusterClient reference for sending messages to the central cluster.
    /// Set via RegisterCentralClient message; null until that message arrives.
    /// </summary>
    private IActorRef? _centralClient;

    /// <summary>
    /// Local actor references for routing specific message patterns.
    /// Populated via registration messages; each stays null until its
    /// RegisterLocalHandler message has been processed.
    /// </summary>
    private IActorRef? _eventLogHandler;
    private IActorRef? _parkedMessageHandler;
    private IActorRef? _integrationHandler;
    private IActorRef? _artifactHandler;

    /// <summary>
    /// Timer scheduler required by <see cref="IWithTimers"/>. Initialised with
    /// <c>null!</c> here; presumably assigned by the actor runtime before
    /// <see cref="PreStart"/> runs — confirm against the Akka.NET timers documentation.
    /// </summary>
    public ITimerScheduler Timers { get; set; } = null!;

    /// <summary>
    /// Creates the actor and registers every message handler.
    /// </summary>
    /// <param name="siteId">Identifier of this site, used in replies, heartbeats and logs.</param>
    /// <param name="options">Communication options; supplies the heartbeat interval.</param>
    /// <param name="deploymentManagerProxy">Actor that deployment, lifecycle, debug-view and inbound-API messages are forwarded to.</param>
    public SiteCommunicationActor(
        string siteId,
        CommunicationOptions options,
        IActorRef deploymentManagerProxy)
    {
        _siteId = siteId;
        _options = options;
        _deploymentManagerProxy = deploymentManagerProxy;

        // NOTE(review): in this copy of the file the Receive registrations below carry
        // no explicit message-type argument, so the message type each one handles
        // cannot be read from the code. The pattern comments and the reply types are
        // the only hints; confirm each registration's type against the project sources.
        // Keep the registration order unchanged unless the types are known to be disjoint.

        // Registration: remember the ClusterClient used for every site→central send.
        Receive(msg =>
        {
            _centralClient = msg.Client;
            _log.Info("Registered central ClusterClient");
        });

        // Registration: store a local handler for one of the optional message patterns.
        Receive(HandleRegisterLocalHandler);

        // Pattern 1: Instance Deployment — forward to Deployment Manager
        Receive(msg =>
        {
            _log.Debug("Routing DeployInstanceCommand for {0} to DeploymentManager", msg.InstanceUniqueName);
            _deploymentManagerProxy.Forward(msg);
        });

        // Pattern 2: Lifecycle — forward to Deployment Manager
        Receive(msg => _deploymentManagerProxy.Forward(msg));
        Receive(msg => _deploymentManagerProxy.Forward(msg));
        Receive(msg => _deploymentManagerProxy.Forward(msg));

        // DeploymentManager-006: query-the-site-before-redeploy — forward to
        // the Deployment Manager, which owns the deployed-config store and
        // answers with the instance's currently-applied deployment identity.
        Receive(msg => _deploymentManagerProxy.Forward(msg));

        // Pattern 3: Artifact Deployment — forward to artifact handler if registered.
        // This is the only missing-handler path in this file that also logs a warning.
        Receive(msg =>
        {
            if (_artifactHandler != null)
                _artifactHandler.Forward(msg);
            else
            {
                _log.Warning("No artifact handler registered, replying with failure");
                Sender.Tell(new ArtifactDeploymentResponse(
                    msg.DeploymentId, _siteId, false, "Artifact handler not available", DateTimeOffset.UtcNow));
            }
        });

        // Pattern 4: Integration Routing — forward to integration handler, or reply
        // with a failure response (no log entry) when none is registered.
        Receive(msg =>
        {
            if (_integrationHandler != null)
                _integrationHandler.Forward(msg);
            else
            {
                Sender.Tell(new IntegrationCallResponse(
                    msg.CorrelationId, _siteId, false, null, "Integration handler not available", DateTimeOffset.UtcNow));
            }
        });

        // Pattern 5: Debug View — forward to Deployment Manager (which routes to Instance Actor)
        Receive(msg => _deploymentManagerProxy.Forward(msg));
        Receive(msg => _deploymentManagerProxy.Forward(msg));

        // Pattern 6a: Debug Snapshot (one-shot) — forward to Deployment Manager
        Receive(msg => _deploymentManagerProxy.Forward(msg));

        // Inbound API Route.To() — forward to Deployment Manager for instance routing
        Receive(msg => _deploymentManagerProxy.Forward(msg));
        Receive(msg => _deploymentManagerProxy.Forward(msg));
        Receive(msg => _deploymentManagerProxy.Forward(msg));

        // Pattern 7: Remote Queries
        // Event-log query: forward to the event-log handler, or reply with an empty,
        // failed result when none is registered.
        Receive(msg =>
        {
            if (_eventLogHandler != null)
                _eventLogHandler.Forward(msg);
            else
            {
                Sender.Tell(new EventLogQueryResponse(
                    msg.CorrelationId, _siteId, [], null, false, false,
                    "Event log handler not available", DateTimeOffset.UtcNow));
            }
        });

        // Parked-message query: the fallback echoes the requested page number/size
        // back with an empty item list and a zero count.
        Receive(msg =>
        {
            if (_parkedMessageHandler != null)
                _parkedMessageHandler.Forward(msg);
            else
            {
                Sender.Tell(new ParkedMessageQueryResponse(
                    msg.CorrelationId, _siteId, [], 0, msg.PageNumber, msg.PageSize, false,
                    "Parked message handler not available", DateTimeOffset.UtcNow));
            }
        });

        // Parked-message retry: same handler, failure reply when it is not registered.
        Receive(msg =>
        {
            if (_parkedMessageHandler != null)
                _parkedMessageHandler.Forward(msg);
            else
            {
                Sender.Tell(new ParkedMessageRetryResponse(
                    msg.CorrelationId, false, "Parked message handler not available"));
            }
        });

        // Parked-message discard: same handler, failure reply when it is not registered.
        Receive(msg =>
        {
            if (_parkedMessageHandler != null)
                _parkedMessageHandler.Forward(msg);
            else
            {
                Sender.Tell(new ParkedMessageDiscardResponse(
                    msg.CorrelationId, false, "Parked message handler not available"));
            }
        });

        // Task 5 (#22): central→site Retry/Discard relay for parked cached
        // operations. SiteCallAuditActor relays these over the command/control
        // channel; the parked-message handler executes them against the local
        // S&F buffer and replies a ParkedOperationActionAck that routes back to
        // the relaying SiteCallAuditActor's Ask.
        // The two registrations below are identical in body; they differ only in
        // the (not visible here) message type they are registered for.
        Receive(msg =>
        {
            if (_parkedMessageHandler != null)
                _parkedMessageHandler.Forward(msg);
            else
            {
                Sender.Tell(new ParkedOperationActionAck(
                    msg.CorrelationId, Applied: false, "Parked message handler not available"));
            }
        });

        Receive(msg =>
        {
            if (_parkedMessageHandler != null)
                _parkedMessageHandler.Forward(msg);
            else
            {
                Sender.Tell(new ParkedOperationActionAck(
                    msg.CorrelationId, Applied: false, "Parked message handler not available"));
            }
        });

        // Notification Outbox: forward a buffered notification submitted by the site
        // Store-and-Forward Engine to the central cluster. The original Sender (the
        // S&F forwarder's Ask) is forwarded as the ClusterClient.Send sender so the
        // NotificationSubmitAck routes straight back to the waiting Ask, not here.
        Receive(msg =>
        {
            if (_centralClient == null)
            {
                // No ClusterClient registered yet (e.g. central contact points not
                // configured, or registration not yet completed). A non-accepted ack
                // makes the S&F forwarder treat this as transient and retry later.
                _log.Warning(
                    "Cannot forward NotificationSubmit {0} — no central ClusterClient registered",
                    msg.NotificationId);
                Sender.Tell(new NotificationSubmitAck(
                    msg.NotificationId, Accepted: false, Error: "Central ClusterClient not registered"));
                return;
            }

            _log.Debug("Forwarding NotificationSubmit {0} to central", msg.NotificationId);
            _centralClient.Tell(
                new ClusterClient.Send("/user/central-communication", msg), Sender);
        });

        // Notification Outbox: forward a Notify.Status query to the central cluster.
        // The original Sender (the Notify helper's Ask) is forwarded as the
        // ClusterClient.Send sender so the NotificationStatusResponse routes straight
        // back to the waiting Ask, not here.
        Receive(msg =>
        {
            if (_centralClient == null)
            {
                // No ClusterClient registered yet. Reply Found: false so Notify.Status
                // falls back to the site S&F buffer to decide Forwarding vs Unknown.
                _log.Warning(
                    "Cannot forward NotificationStatusQuery {0} — no central ClusterClient registered",
                    msg.NotificationId);
                Sender.Tell(new NotificationStatusResponse(
                    msg.CorrelationId, Found: false, Status: "Unknown",
                    RetryCount: 0, LastError: null, DeliveredAt: null));
                return;
            }

            _log.Debug("Forwarding NotificationStatusQuery {0} to central", msg.NotificationId);
            _centralClient.Tell(
                new ClusterClient.Send("/user/central-communication", msg), Sender);
        });

        // Audit Log (#23): forward a batch of site-local audit events to the
        // central cluster. The site SiteAuditTelemetryActor drains its SQLite
        // Pending queue through the ClusterClientSiteAuditClient, which Asks
        // this actor; the original Sender (that Ask) is passed as the
        // ClusterClient.Send sender so the IngestAuditEventsReply routes
        // straight back to the waiting Ask, not here. Mirrors NotificationSubmit.
        Receive(msg =>
        {
            if (_centralClient == null)
            {
                // No ClusterClient registered yet (e.g. central contact points
                // not configured, or registration not yet completed). Faulting
                // the Ask makes the SiteAuditTelemetryActor drain loop treat
                // this as transient and keep the rows Pending for the next tick.
                _log.Warning(
                    "Cannot forward IngestAuditEventsCommand ({0} events) — no central ClusterClient registered",
                    msg.Events.Count);
                Sender.Tell(new Status.Failure(
                    new InvalidOperationException("Central ClusterClient not registered")));
                return;
            }

            _log.Debug("Forwarding IngestAuditEventsCommand ({0} events) to central", msg.Events.Count);
            _centralClient.Tell(
                new ClusterClient.Send("/user/central-communication", msg), Sender);
        });

        // Audit Log (#23) M3: forward a batch of combined cached-call telemetry
        // packets to the central cluster. Same forward + reply-routing pattern
        // as IngestAuditEventsCommand; central replies with an
        // IngestCachedTelemetryReply.
        Receive(msg =>
        {
            if (_centralClient == null)
            {
                // Same fallback as the audit-event batch above: fail the sender's
                // request with Status.Failure rather than a typed reply.
                _log.Warning(
                    "Cannot forward IngestCachedTelemetryCommand ({0} entries) — no central ClusterClient registered",
                    msg.Entries.Count);
                Sender.Tell(new Status.Failure(
                    new InvalidOperationException("Central ClusterClient not registered")));
                return;
            }

            _log.Debug("Forwarding IngestCachedTelemetryCommand ({0} entries) to central", msg.Entries.Count);
            _centralClient.Tell(
                new ClusterClient.Send("/user/central-communication", msg), Sender);
        });

        // Internal: send heartbeat tick (scheduled in PreStart).
        Receive(_ => SendHeartbeatToCentral());

        // Internal: forward health report to central. Unlike the relays above, this
        // one passes Self as the sender and, because of the null-conditional call,
        // drops the report without logging or replying when no ClusterClient is
        // registered.
        // NOTE(review): presumably acceptable because reports are periodic — confirm.
        Receive(msg =>
        {
            _centralClient?.Tell(
                new ClusterClient.Send("/user/central-communication", msg), Self);
        });
    }

    /// <summary>
    /// Coordinator supervision strategy (CLAUDE.md: "Resume for coordinator actors").
    /// SiteCommunicationActor is a long-lived coordinator routing all message
    /// patterns to local handlers; a transient fault in a child must Resume so the
    /// child's in-memory state is preserved rather than discarded by a Restart.
    /// </summary>
    /// <returns>
    /// A one-for-one strategy whose decider logs every child exception as a warning
    /// and always answers <c>Directive.Resume</c>, with no retry limit
    /// (<c>maxNrOfRetries: -1</c>, infinite time range).
    /// </returns>
    protected override SupervisorStrategy SupervisorStrategy()
    {
        return new OneForOneStrategy(
            maxNrOfRetries: -1,
            withinTimeRange: Timeout.InfiniteTimeSpan,
            decider: Decider.From(ex =>
            {
                _log.Warning(ex, "Child actor of SiteCommunicationActor faulted, resuming (state preserved)");
                return Directive.Resume;
            }));
    }

    /// <summary>
    /// Logs start-up and starts the periodic heartbeat timer.
    /// </summary>
    protected override void PreStart()
    {
        _log.Info("SiteCommunicationActor started for site {0}", _siteId);

        // Schedule periodic heartbeat to central: a SendHeartbeat message is delivered
        // to this actor under the "heartbeat" timer key, and the constructor routes it
        // to SendHeartbeatToCentral.
        Timers.StartPeriodicTimer(
            "heartbeat",
            new SendHeartbeat(),
            TimeSpan.FromSeconds(1), // initial delay
            _options.TransportHeartbeatInterval);
    }

    /// <summary>
    /// Stores the supplied actor as the local handler for the given handler type,
    /// replacing any handler previously stored for that type.
    /// </summary>
    /// <param name="msg">Registration carrying the handler type and the handler actor.</param>
    private void HandleRegisterLocalHandler(RegisterLocalHandler msg)
    {
        // NOTE(review): there is no default branch, so a HandlerType value outside the
        // four cases below stores nothing, yet the "Registered" log line still runs.
        switch (msg.HandlerType)
        {
            case LocalHandlerType.EventLog:
                _eventLogHandler = msg.Handler;
                break;
            case LocalHandlerType.ParkedMessages:
                _parkedMessageHandler = msg.Handler;
                break;
            case LocalHandlerType.Integration:
                _integrationHandler = msg.Handler;
                break;
            case LocalHandlerType.Artifacts:
                _artifactHandler = msg.Handler;
                break;
        }

        _log.Info("Registered local handler for {0}", msg.HandlerType);
    }

    /// <summary>
    /// Sends one heartbeat for this site to central through the registered
    /// ClusterClient. Does nothing when no ClusterClient has been registered yet.
    /// </summary>
    private void SendHeartbeatToCentral()
    {
        if (_centralClient == null)
            return;

        // The heartbeat carries the site id, this machine's name, a hard-coded
        // IsActive: true flag and the current UTC time.
        var hostname = Environment.MachineName;
        var heartbeat = new HeartbeatMessage(
            _siteId,
            hostname,
            IsActive: true,
            DateTimeOffset.UtcNow);

        _centralClient.Tell(
            new ClusterClient.Send("/user/central-communication", heartbeat), Self);
    }

    // ── Internal messages ──

    // Timer tick message scheduled in PreStart; carries no data.
    internal record SendHeartbeat;
}

/// <summary>
/// Command to register a ClusterClient for communicating with the central cluster.
/// </summary>
public record RegisterCentralClient(IActorRef Client);

/// <summary>
/// Command to register a local actor as a handler for a specific message pattern.
/// </summary>
public record RegisterLocalHandler(LocalHandlerType HandlerType, IActorRef Handler);

/// <summary>
/// Identifies which optional local handler slot a <see cref="RegisterLocalHandler"/>
/// message fills.
/// </summary>
public enum LocalHandlerType
{
    EventLog,
    ParkedMessages,
    Integration,
    Artifacts
}