using System.Collections.Immutable;
using Akka.Actor;
using Akka.Cluster.Tools.Client;
using Akka.Event;
using Microsoft.Extensions.DependencyInjection;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.Communication;
using ScadaLink.Commons.Messages.Health;
using ScadaLink.HealthMonitoring;

namespace ScadaLink.Communication.Actors;

/// <summary>
/// Abstraction for creating ClusterClient instances per site, enabling testability.
/// </summary>
public interface ISiteClientFactory
{
    /// <summary>
    /// Creates a client actor for <paramref name="siteId"/> whose initial contacts are
    /// <paramref name="contacts"/> (receptionist actor paths on the site's nodes).
    /// </summary>
    IActorRef Create(ActorSystem system, string siteId, ImmutableHashSet<ActorPath> contacts);
}

/// <summary>
/// Default implementation that creates a real ClusterClient for each site.
/// </summary>
/// <remarks>
/// Clients are created as top-level actors named <c>site-client-{siteId}</c>. Because the name is
/// fixed per site, a replacement for a site can only be created once the previous client has
/// fully terminated; otherwise <c>ActorSystem.ActorOf</c> throws <see cref="InvalidActorNameException"/>.
/// </remarks>
public class DefaultSiteClientFactory : ISiteClientFactory
{
    public IActorRef Create(ActorSystem system, string siteId, ImmutableHashSet<ActorPath> contacts)
    {
        var settings = ClusterClientSettings.Create(system).WithInitialContacts(contacts);
        return system.ActorOf(ClusterClient.Props(settings), $"site-client-{siteId}");
    }
}

/// <summary>
/// Central-side actor that routes messages from central to site clusters via ClusterClient.
/// Resolves site addresses from the database on a periodic refresh cycle and manages
/// per-site ClusterClient instances.
///
/// WP-4: All 8 message patterns routed through this actor.
/// WP-5: Ask timeout on connection drop (no central buffering). Debug streams killed on interruption.
/// </summary>
public class CentralCommunicationActor : ReceiveActor
{
    /// <summary>Path of the site-side actor that receives every routed message.</summary>
    private const string SiteCommunicationPath = "/user/site-communication";

    /// <summary>Legacy address suffix stripped from database addresses.</summary>
    private const string LegacyActorPathMarker = "/user/";

    /// <summary>How often site addresses are re-read from the database.</summary>
    private static readonly TimeSpan RefreshInterval = TimeSpan.FromSeconds(60);

    private readonly ILoggingAdapter _log = Context.GetLogger();
    private readonly IServiceProvider _serviceProvider;
    private readonly ISiteClientFactory _siteClientFactory;

    /// <summary>
    /// Per-site ClusterClient instances and their contact addresses.
    /// Maps SiteIdentifier → (ClusterClient actor, set of contact address strings).
    /// Refreshed periodically via RefreshSiteAddresses.
    /// </summary>
    private readonly Dictionary<string, (IActorRef Client, ImmutableHashSet<string> ContactAddresses)> _siteClients = new();

    /// <summary>
    /// Clients being stopped because their site's addresses changed: client → siteId.
    /// The replacement is created only after the old client's <see cref="Terminated"/> arrives,
    /// because stopping is asynchronous and the factory may reuse the same actor name.
    /// </summary>
    private readonly Dictionary<IActorRef, string> _retiringClients = new();

    /// <summary>
    /// Tracks active debug view subscriptions: correlationId → (siteId, subscriber).
    /// Used to kill debug streams on site disconnection (WP-5).
    /// </summary>
    private readonly Dictionary<string, (string SiteId, IActorRef Subscriber)> _debugSubscriptions = new();

    /// <summary>
    /// Tracks in-progress deployments: deploymentId → siteId.
    /// On central failover, in-progress deployments are treated as failed (WP-5).
    /// </summary>
    // NOTE(review): entries are only removed on site disconnect or actor stop. Deployment responses go
    // straight to the original sender, never through this actor, so completed deployments stay here and
    // are later logged as "failed" on disconnect. Also assumes DeploymentId is a string — confirm.
    private readonly Dictionary<string, string> _inProgressDeployments = new();

    private ICancelable? _refreshSchedule;

    public CentralCommunicationActor(IServiceProvider serviceProvider, ISiteClientFactory siteClientFactory)
    {
        _serviceProvider = serviceProvider;
        _siteClientFactory = siteClientFactory;

        // Site address cache loaded from database
        Receive<SiteAddressCacheLoaded>(HandleSiteAddressCacheLoaded);

        // A failed database load arrives as Status.Failure via PipeTo
        Receive<Akka.Actor.Status.Failure>(HandleSiteAddressLoadFailed);

        // Periodic refresh trigger
        Receive<RefreshSiteAddresses>(_ => LoadSiteAddressesFromDb());

        // A retired ClusterClient has stopped, so its replacement can now be created
        Receive<Terminated>(HandleRetiredClientTerminated);

        // Health monitoring: heartbeats and health reports from sites
        Receive<HeartbeatMessage>(HandleHeartbeat);
        Receive<SiteHealthReport>(HandleSiteHealthReport);

        // Connection state changes
        Receive<ConnectionStateChanged>(HandleConnectionStateChanged);

        // Route enveloped messages to sites
        Receive<SiteEnvelope>(HandleSiteEnvelope);

        // Route debug stream events from sites to the correct bridge actor
        Receive<Commons.Messages.DebugView.DebugStreamEvent>(HandleDebugStreamEvent);
    }

    private void HandleHeartbeat(HeartbeatMessage heartbeat)
    {
        // Forward heartbeat to parent for any interested central actors
        Context.Parent.Tell(heartbeat);
    }

    private void HandleDebugStreamEvent(Commons.Messages.DebugView.DebugStreamEvent msg)
    {
        if (_debugSubscriptions.TryGetValue(msg.CorrelationId, out var entry))
        {
            entry.Subscriber.Tell(msg.Event);
        }
        else
        {
            _log.Debug("No debug subscription found for correlationId {0}, dropping event", msg.CorrelationId);
        }
    }

    private void HandleSiteHealthReport(SiteHealthReport report)
    {
        var aggregator = _serviceProvider.GetService<ICentralHealthAggregator>();
        if (aggregator != null)
        {
            aggregator.ProcessReport(report);
        }
        else
        {
            _log.Warning("ICentralHealthAggregator not available, dropping health report from site {0}", report.SiteId);
        }
    }

    private void HandleConnectionStateChanged(ConnectionStateChanged msg)
    {
        if (!msg.IsConnected)
        {
            _log.Warning("Site {0} disconnected at {1}", msg.SiteId, msg.Timestamp);

            // WP-5: Kill active debug streams for the disconnected site
            var toRemove = _debugSubscriptions
                .Where(kvp => kvp.Value.SiteId == msg.SiteId)
                .ToList();

            foreach (var kvp in toRemove)
            {
                _log.Info("Killing debug stream {0} for disconnected site {1}", kvp.Key, msg.SiteId);
                kvp.Value.Subscriber.Tell(new DebugStreamTerminated(msg.SiteId, kvp.Key));
                _debugSubscriptions.Remove(kvp.Key);
            }

            // WP-5: Mark in-progress deployments as failed
            var failedDeployments = _inProgressDeployments
                .Where(kvp => kvp.Value == msg.SiteId)
                .Select(kvp => kvp.Key)
                .ToList();

            foreach (var deploymentId in failedDeployments)
            {
                _log.Warning("Deployment {0} to site {1} treated as failed due to disconnection", deploymentId, msg.SiteId);
                _inProgressDeployments.Remove(deploymentId);
            }

            // Note: Do NOT stop the ClusterClient — it handles reconnection internally
        }
        else
        {
            _log.Info("Site {0} connected at {1}", msg.SiteId, msg.Timestamp);
        }
    }

    private void HandleSiteEnvelope(SiteEnvelope envelope)
    {
        if (!_siteClients.TryGetValue(envelope.SiteId, out var entry))
        {
            // Also hit while a site's client is being replaced after an address change.
            _log.Warning("No ClusterClient for site {0}, cannot route message {1}", envelope.SiteId, envelope.Message.GetType().Name);
            // The Ask will timeout on the caller side — no central buffering (WP-5)
            return;
        }

        // Track debug subscriptions for cleanup on disconnect
        TrackMessageForCleanup(envelope);

        // Route via ClusterClient — Sender is preserved for Ask response routing
        entry.Client.Tell(new ClusterClient.Send(SiteCommunicationPath, envelope.Message), Sender);
    }

    /// <summary>
    /// Reads all sites from the database off the actor thread and pipes a
    /// <see cref="SiteAddressCacheLoaded"/> (or a <c>Status.Failure</c>) back to this actor.
    /// </summary>
    private void LoadSiteAddressesFromDb()
    {
        var self = Self;
        Task.Run(async () =>
        {
            using var scope = _serviceProvider.CreateScope();
            // NOTE(review): the repository type argument is not visible in the reviewed source;
            // ISiteRepository is assumed — confirm against ScadaLink.Commons.Interfaces.Repositories.
            var repo = scope.ServiceProvider.GetRequiredService<ISiteRepository>();
            var sites = await repo.GetAllSitesAsync();

            var contacts = new Dictionary<string, List<string>>();
            foreach (var site in sites)
            {
                var addrs = new List<string>();
                AddContactAddress(addrs, site.NodeAAddress);
                AddContactAddress(addrs, site.NodeBAddress);

                if (addrs.Count > 0)
                {
                    contacts[site.SiteIdentifier] = addrs;
                }
            }

            return new SiteAddressCacheLoaded(contacts);
        }).PipeTo(self);
    }

    /// <summary>
    /// Appends <paramref name="address"/> to <paramref name="addresses"/> unless it is blank,
    /// first stripping an actor-path suffix (legacy format, e.g. <c>akka.tcp://sys@host:port/user/x</c>).
    /// </summary>
    private static void AddContactAddress(List<string> addresses, string? address)
    {
        if (string.IsNullOrWhiteSpace(address))
        {
            return;
        }

        // Ordinal: this is a path fragment, not linguistic text.
        var idx = address.IndexOf(LegacyActorPathMarker, StringComparison.Ordinal);
        addresses.Add(idx > 0 ? address.Substring(0, idx) : address);
    }

    private void HandleSiteAddressLoadFailed(Akka.Actor.Status.Failure failure)
    {
        // Keep the current clients; the next scheduled refresh retries the load.
        _log.Error(failure.Cause, "Failed to load site addresses from database; keeping {0} existing site client(s)", _siteClients.Count);
    }

    private void HandleSiteAddressCacheLoaded(SiteAddressCacheLoaded msg)
    {
        // Stop ClusterClients for removed sites (ToList: the dictionary is modified in the loop)
        var removedSiteIds = _siteClients.Keys
            .Where(siteId => !msg.SiteContacts.ContainsKey(siteId))
            .ToList();

        foreach (var removed in removedSiteIds)
        {
            _log.Info("Stopping ClusterClient for removed site {0}", removed);
            Context.Stop(_siteClients[removed].Client);
            _siteClients.Remove(removed);
        }

        // Add or update
        foreach (var (siteId, addresses) in msg.SiteContacts)
        {
            var contactStrings = addresses.ToImmutableHashSet();
            var hasExisting = _siteClients.TryGetValue(siteId, out var existing);

            // Skip if unchanged
            if (hasExisting && existing.ContactAddresses.SetEquals(contactStrings))
            {
                continue;
            }

            // A previous client for this site is still stopping; the Terminated handler
            // triggers another load that creates the replacement.
            if (_retiringClients.ContainsValue(siteId))
            {
                continue;
            }

            var contactPaths = ParseContactPaths(siteId, addresses);
            if (contactPaths.IsEmpty)
            {
                _log.Warning("No valid contact addresses for site {0}; leaving its ClusterClient unchanged", siteId);
                continue;
            }

            if (hasExisting)
            {
                // Creating the replacement right away would collide with the old client's actor
                // name, because Context.Stop is asynchronous. Retire it and recreate on Terminated.
                _log.Info("Updating ClusterClient for site {0} (addresses changed)", siteId);
                RetireClient(siteId, existing.Client);
                continue;
            }

            TryCreateClient(siteId, contactPaths, contactStrings);
        }

        _log.Info("Site ClusterClient cache refreshed with {0} site(s)", _siteClients.Count);
    }

    /// <summary>
    /// Converts site node addresses into receptionist actor paths, skipping (and logging)
    /// any address that does not parse.
    /// </summary>
    private ImmutableHashSet<ActorPath> ParseContactPaths(string siteId, IEnumerable<string> addresses)
    {
        var builder = ImmutableHashSet.CreateBuilder<ActorPath>();
        foreach (var address in addresses)
        {
            if (ActorPath.TryParse($"{address}/system/receptionist", out var path))
            {
                builder.Add(path);
            }
            else
            {
                _log.Warning("Ignoring invalid contact address {0} for site {1}", address, siteId);
            }
        }

        return builder.ToImmutable();
    }

    /// <summary>
    /// Creates and registers a client for <paramref name="siteId"/>. If the actor name is still
    /// taken (or invalid), the failure is logged and the next refresh retries, instead of
    /// crashing and restarting this actor.
    /// </summary>
    private void TryCreateClient(string siteId, ImmutableHashSet<ActorPath> contactPaths, ImmutableHashSet<string> contactAddresses)
    {
        try
        {
            var client = _siteClientFactory.Create(Context.System, siteId, contactPaths);
            _siteClients[siteId] = (client, contactAddresses);
            _log.Info("Created ClusterClient for site {0} with {1} contact(s)", siteId, contactPaths.Count);
        }
        catch (InvalidActorNameException ex)
        {
            _log.Warning("Could not create ClusterClient for site {0}: {1}. Will retry on next refresh.", siteId, ex.Message);
        }
    }

    /// <summary>
    /// Unregisters and stops a site's client, watching it so the replacement can be created
    /// once it has terminated.
    /// </summary>
    private void RetireClient(string siteId, IActorRef client)
    {
        _siteClients.Remove(siteId);
        _retiringClients[client] = siteId;
        // Watch before Stop: Terminated is still delivered if the client is already dead.
        Context.Watch(client);
        Context.Stop(client);
    }

    private void HandleRetiredClientTerminated(Terminated terminated)
    {
        if (!_retiringClients.Remove(terminated.ActorRef, out var siteId))
        {
            return;
        }

        _log.Info("Previous ClusterClient for site {0} terminated; reloading site addresses to create its replacement", siteId);
        LoadSiteAddressesFromDb();
    }

    private void TrackMessageForCleanup(SiteEnvelope envelope)
    {
        switch (envelope.Message)
        {
            case Commons.Messages.DebugView.SubscribeDebugViewRequest sub:
                _debugSubscriptions[sub.CorrelationId] = (envelope.SiteId, Sender);
                break;
            case Commons.Messages.DebugView.UnsubscribeDebugViewRequest unsub:
                _debugSubscriptions.Remove(unsub.CorrelationId);
                break;
            case Commons.Messages.Deployment.DeployInstanceCommand deploy:
                _inProgressDeployments[deploy.DeploymentId] = envelope.SiteId;
                break;
        }
    }

    protected override void PreStart()
    {
        _log.Info("CentralCommunicationActor started");

        // Schedule periodic refresh of site addresses from the database
        _refreshSchedule = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(
            TimeSpan.Zero,
            RefreshInterval,
            Self,
            new RefreshSiteAddresses(),
            ActorRefs.NoSender);
    }

    protected override void PostStop()
    {
        _log.Info("CentralCommunicationActor stopped. In-progress deployments treated as failed (WP-5).");
        _refreshSchedule?.Cancel();

        // The default factory creates site clients with system.ActorOf, so they are not children
        // and would outlive this actor. Stop them so they do not leak, and so a restarted instance
        // (PostStop also runs on restart) can create clients under the same names again.
        foreach (var (client, _) in _siteClients.Values)
        {
            Context.Stop(client);
        }
        _siteClients.Clear();
        _retiringClients.Clear();

        // On central failover, all in-progress deployments are failed
        _inProgressDeployments.Clear();
        _debugSubscriptions.Clear();
    }
}

/// <summary>
/// Command to trigger a refresh of site addresses from the database.
/// </summary>
public record RefreshSiteAddresses;

/// <summary>
/// Internal message carrying the loaded site contact data from the database.
/// ClusterClient creation happens on the actor thread in HandleSiteAddressCacheLoaded.
/// </summary>
internal record SiteAddressCacheLoaded(Dictionary<string, List<string>> SiteContacts);

/// <summary>
/// Notification sent to debug view subscribers when the stream is terminated
/// due to site disconnection (WP-5).
/// </summary>
public record DebugStreamTerminated(string SiteId, string CorrelationId);