using System.Collections.Immutable;
using Akka.Actor;
using Akka.Cluster.Tools.Client;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event;
using Microsoft.Extensions.DependencyInjection;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.Audit;
using ScadaLink.Commons.Messages.Communication;
using ScadaLink.Commons.Messages.Health;
using ScadaLink.Commons.Messages.Notification;
using ScadaLink.HealthMonitoring;

namespace ScadaLink.Communication.Actors;

/// <summary>
/// Abstraction for creating ClusterClient instances per site, enabling testability.
/// </summary>
public interface ISiteClientFactory
{
    /// <summary>Creates a ClusterClient actor for the given site with the specified contact points.</summary>
    /// <param name="system">The actor system in which to create the client.</param>
    /// <param name="siteId">The site identifier, used to name the actor.</param>
    /// <param name="contacts">The set of receptionist actor paths to use as initial contacts.</param>
    /// <returns>An actor reference for the new ClusterClient.</returns>
    IActorRef Create(ActorSystem system, string siteId, ImmutableHashSet<ActorPath> contacts);
}

/// <summary>
/// Default implementation that creates a real ClusterClient for each site as a
/// top-level actor of the supplied <see cref="ActorSystem"/>.
/// </summary>
/// <remarks>
/// Every call yields a uniquely named actor (<c>site-client-{siteId}-{n}</c>).
/// When a site's addresses change, the caller stops the old client and creates the
/// replacement in the same message handler. An Akka actor name stays reserved until
/// the stopped actor has fully terminated. Reusing a fixed per-site name there would
/// therefore fail with <see cref="InvalidActorNameException"/>.
/// </remarks>
public class DefaultSiteClientFactory : ISiteClientFactory
{
    // Process-wide generation counter appended to actor names. It is static so that
    // several factory instances sharing one ActorSystem still never produce the same name.
    private static long s_clientGeneration;

    /// <inheritdoc />
    public IActorRef Create(ActorSystem system, string siteId, ImmutableHashSet<ActorPath> contacts)
    {
        var settings = ClusterClientSettings.Create(system).WithInitialContacts(contacts);

        // A fresh suffix per call prevents a name collision with a just-stopped client
        // for the same site whose termination has not completed yet.
        var generation = Interlocked.Increment(ref s_clientGeneration);
        return system.ActorOf(ClusterClient.Props(settings), $"site-client-{siteId}-{generation}");
    }
}

/// <summary>
/// Central-side actor that routes messages from central to site clusters via ClusterClient.
/// Resolves site addresses from the database on a periodic refresh cycle and manages
/// per-site ClusterClient instances.
///
/// WP-4: All 8 message patterns routed through this actor.
/// WP-5: Ask timeout on connection drop (no central buffering). Debug-stream interruption
/// is not handled here: it is detected by the transport (gRPC keepalive) and recovered by
/// DebugStreamBridgeActor's own reconnection logic (Communication-016).
/// </summary>
public class CentralCommunicationActor : ReceiveActor
{
    private readonly ILoggingAdapter _log = Context.GetLogger();
    private readonly IServiceProvider _serviceProvider;
    private readonly ISiteClientFactory _siteClientFactory;

    /// <summary>
    /// Per-site ClusterClient instances and their contact addresses.
    /// Maps SiteIdentifier → (ClusterClient actor, set of contact address strings).
    /// Rebuilt by HandleSiteAddressCacheLoaded after each RefreshSiteAddresses cycle and
    /// read by HandleSiteEnvelope to pick the client for an outbound envelope.
    /// </summary>
    private readonly Dictionary ContactAddresses)> _siteClients = new();

    // Communication-016: the previous _debugSubscriptions / _inProgressDeployments
    // dictionaries existed solely to support a documented "synchronous kill streams +
    // mark deployments failed on site disconnect" workflow triggered by
    // ConnectionStateChanged. No production code ever emitted that message — only
    // the unit test did — so the workflow was dead from end to end. Disconnect
    // detection is owned by the underlying transports: the gRPC keepalive PING
    // signals stream interruption in ~25s (handled by DebugStreamBridgeActor's own
    // reconnection logic), and an Ask round-trip for a deploy times out at the
    // CommunicationService layer (caller sees failure). The tracking dicts +
    // ConnectionStateChanged record + HandleConnectionStateChanged handler are
    // removed; see docs/requirements/Component-Communication.md "Connection
    // Failure Behavior" for the keepalive-based contract that survives.

    /// <summary>
    /// Handle for the repeating RefreshSiteAddresses timer started in <see cref="PreStart"/>.
    /// Cancelled in <see cref="PostStop"/>.
    /// </summary>
    private ICancelable? _refreshSchedule;

    /// <summary>
    /// Communication-019: per-actor lifecycle CTS. Its token is passed into the periodic
    /// GetAllSitesAsync repository call, so a hung MS SQL connection is bounded by actor
    /// shutdown rather than holding piped tasks open indefinitely. Cancelled and disposed
    /// in <see cref="PostStop"/>; never reset.
    /// </summary>
    private readonly CancellationTokenSource _lifecycleCts = new();

    /// <summary>
    /// Proxy for the central NotificationOutboxActor cluster singleton.
    /// Set via <see cref="RegisterNotificationOutbox"/>. The Host creates the singleton
    /// proxy after this actor and registers it, mirroring how the site-side actor receives
    /// its runtime <see cref="IActorRef"/>s. Null until registration completes. Before then,
    /// a NotificationSubmit is rejected with a non-accepted ack so the site retries, and a
    /// NotificationStatusQuery is answered with Found: false.
    /// </summary>
    private IActorRef? _notificationOutboxProxy;

    /// <summary>
    /// Proxy for the central AuditLogIngestActor cluster singleton. Set via
    /// <see cref="RegisterAuditIngest"/>. The Host creates the singleton proxy after this
    /// actor and registers it (mirrors <see cref="_notificationOutboxProxy"/>). Null until
    /// registration completes. An audit ingest command arriving before then is answered
    /// with an empty reply, so the site keeps its rows Pending and retries.
    /// <para>
    /// Once registered, the handler Asks this proxy and pipes the reply straight back to
    /// the caller. On an Ask timeout or a faulted reply, PipeTo forwards a
    /// <see cref="Status.Failure"/> to the caller: the fault propagates rather than being
    /// swallowed. This differs from the gRPC handler (SiteStreamGrpcServer), which catches
    /// the exception and returns an empty ack. Here the faulted Ask is the transient signal
    /// the site relies on (see <see cref="HandleIngestAuditEvents"/>).
    /// </para>
    /// </summary>
    private IActorRef? _auditIngestProxy;

    /// <summary>
    /// Default Ask timeout for routing audit ingest commands to the AuditLogIngestActor
    /// proxy. It is 30 s, matching SiteStreamGrpcServer.AuditIngestAskTimeout; that constant
    /// is private to the gRPC server and not reachable here, so it is declared locally. The
    /// generous window absorbs a slow MS SQL connection without the round-trip surfacing as
    /// a failure on a healthy site. When the window is exceeded, the Ask faults and the fault
    /// is piped back to the caller as a <see cref="Status.Failure"/>
    /// (see <see cref="HandleIngestAuditEvents"/>).
    /// </summary>
    private static readonly TimeSpan DefaultAuditIngestAskTimeout = TimeSpan.FromSeconds(30);

    /// <summary>
    /// Effective Ask timeout for audit ingest routing, used by both HandleIngestAuditEvents
    /// and HandleIngestCachedTelemetry. Defaults to <see cref="DefaultAuditIngestAskTimeout"/>.
    /// It can be overridden via the constructor so tests can exercise the timeout/fault path
    /// without waiting 30 s.
    /// </summary>
    private readonly TimeSpan _auditIngestAskTimeout;

    /// <summary>
    /// DistributedPubSub topic used to fan health reports out to the peer central node, so
    /// both per-node aggregators stay in sync. Reports are published to it in
    /// <see cref="HandleSiteHealthReport"/>, and this actor subscribes to it in
    /// <see cref="PreStart"/>.
    /// </summary>
    private const string HealthReportTopic = "site-health-replica";

    /// <summary>Initializes the <see cref="CentralCommunicationActor"/> and wires all message handlers.</summary>
    /// <param name="serviceProvider">DI service provider for scoped repository and aggregator access.</param>
    /// <param name="siteClientFactory">Factory used to create per-site ClusterClient actors.</param>
    /// <param name="auditIngestAskTimeout">
    /// Optional override for the audit-ingest Ask timeout; defaults to
    /// <see cref="DefaultAuditIngestAskTimeout"/> (30 s). Exists only so tests can
    /// exercise the timeout/fault path quickly — production always uses the default.
    /// </param>
    public CentralCommunicationActor(
        IServiceProvider serviceProvider,
        ISiteClientFactory siteClientFactory,
        TimeSpan? auditIngestAskTimeout = null)
    {
        _serviceProvider = serviceProvider;
        _siteClientFactory = siteClientFactory;
        _auditIngestAskTimeout = auditIngestAskTimeout ?? DefaultAuditIngestAskTimeout;

        // Site address cache loaded from database
        Receive(HandleSiteAddressCacheLoaded);

        // Periodic refresh trigger
        Receive(_ => LoadSiteAddressesFromDb());

        // Communication-006: a faulted LoadSiteAddressesFromDb task is piped here as a
        // Status.Failure. Without this handler the failure was an unhandled message
        // (debug-level only) and the refresh failed silently — operators could not
        // distinguish "no sites configured" from "database is down". Log at Warning.
        // NOTE(review): this matches every Status.Failure delivered to Self. Today only
        // LoadSiteAddressesFromDb pipes to Self (the ingest handlers pipe to the original
        // Sender), so the message text is accurate. Revisit if another PipeTo(Self) is added.
        Receive(failure => _log.Warning(failure.Cause,
            "Failed to load site addresses from the database; the site ClusterClient " +
            "cache was not refreshed and may be stale or empty"));

        // Health monitoring: heartbeats and health reports from sites
        Receive(HandleHeartbeat);
        Receive(HandleSiteHealthReport);
        // Peer-replicated report (the replica published by HandleSiteHealthReport on the
        // other central node): apply it locally only, without re-publishing.
        Receive(r => ProcessLocally(r.Report));
        Receive(_ => { /* DistributedPubSub subscribe confirmation */ });

        // Route enveloped messages to sites
        Receive(HandleSiteEnvelope);

        // Notification Outbox: the Host registers the outbox singleton proxy after this
        // actor is created (the proxy cannot exist before this actor's construction).
        Receive(msg =>
        {
            _notificationOutboxProxy = msg.OutboxProxy;
            _log.Info("Registered notification outbox proxy");
        });

        // Notification Outbox ingest: a site forwards a buffered NotificationSubmit to the
        // central cluster via ClusterClient. Forward to the outbox proxy so the original
        // Sender (the site's ClusterClient path) is preserved and the NotificationSubmitAck
        // routes straight back to the site.
        Receive(HandleNotificationSubmit);

        // Notification Outbox status query: forward to the outbox proxy, preserving Sender
        // so the NotificationStatusResponse routes back to the querying site.
        Receive(HandleNotificationStatusQuery);

        // Audit Log (#23): the Host registers the AuditLogIngestActor singleton
        // proxy after this actor is created (the proxy cannot exist before this
        // actor's construction).
        Receive(msg =>
        {
            _auditIngestProxy = msg.AuditIngestActor;
            _log.Info("Registered audit ingest proxy");
        });

        // Audit Log (#23) site→central ingest: a site forwards a batch of audit
        // events to the central cluster via ClusterClient. Ask the ingest proxy
        // and pipe the IngestAuditEventsReply back to the original Sender (the
        // site's ClusterClient path) so the site can flip its rows to Forwarded.
        Receive(HandleIngestAuditEvents);

        // Audit Log (#23 M3) combined-telemetry ingest: routes to the same proxy
        // the same way; the proxy replies with an IngestCachedTelemetryReply.
        Receive(HandleIngestCachedTelemetry);
    }

    /// <summary>
    /// Routes a site-forwarded NotificationSubmit to the notification outbox proxy using
    /// Forward, which keeps the original Sender so the outbox's ack goes straight back to
    /// the site. If no proxy is registered yet, replies directly with a non-accepted
    /// NotificationSubmitAck.
    /// </summary>
    /// <param name="msg">The notification submitted by a site.</param>
    private void HandleNotificationSubmit(NotificationSubmit msg)
    {
        if (_notificationOutboxProxy == null)
        {
            // No outbox proxy registered yet. A non-accepted ack makes the site's
            // Store-and-Forward forwarder treat this as transient and retry later.
            _log.Warning(
                "Cannot route NotificationSubmit {0} — notification outbox not available",
                msg.NotificationId);
            Sender.Tell(new NotificationSubmitAck(
                msg.NotificationId, Accepted: false, Error: "notification outbox not available"));
            return;
        }

        _log.Debug("Routing NotificationSubmit {0} to the notification outbox", msg.NotificationId);
        _notificationOutboxProxy.Forward(msg);
    }

    /// <summary>
    /// Routes a site's NotificationStatusQuery to the notification outbox proxy using
    /// Forward, which keeps the original Sender. If no proxy is registered yet, replies
    /// directly with a Found: false / Status "Unknown" response.
    /// </summary>
    /// <param name="msg">The status query sent by a site.</param>
    private void HandleNotificationStatusQuery(NotificationStatusQuery msg)
    {
        if (_notificationOutboxProxy == null)
        {
            // No outbox proxy registered yet. Reply Found: false so the querying site
            // falls back to its local Store-and-Forward buffer to resolve the status.
            _log.Warning(
                "Cannot route NotificationStatusQuery {0} — notification outbox not available",
                msg.NotificationId);
            Sender.Tell(new NotificationStatusResponse(
                msg.CorrelationId,
                Found: false,
                Status: "Unknown",
                RetryCount: 0,
                LastError: null,
                DeliveredAt: null));
            return;
        }

        _log.Debug("Routing NotificationStatusQuery {0} to the notification outbox", msg.NotificationId);
        _notificationOutboxProxy.Forward(msg);
    }

    /// <summary>
    /// Routes a site-forwarded audit-event batch to the audit ingest proxy. It Asks the proxy
    /// with <see cref="_auditIngestAskTimeout"/> and pipes the reply, or a
    /// <see cref="Status.Failure"/> on timeout/fault, to the Sender captured at dispatch time.
    /// If no proxy is registered yet, replies with an empty IngestAuditEventsReply.
    /// </summary>
    /// <param name="msg">The batch of audit events forwarded by a site.</param>
    private void HandleIngestAuditEvents(IngestAuditEventsCommand msg)
    {
        if (_auditIngestProxy == null)
        {
            // No ingest proxy registered yet (host startup race). Reply with an
            // empty IngestAuditEventsReply so the site keeps its rows Pending and
            // retries — the same behaviour as the gRPC handler's wiring-race path.
            _log.Warning(
                "Cannot route IngestAuditEventsCommand ({0} events) — audit ingest not available",
                msg.Events.Count);
            Sender.Tell(new IngestAuditEventsReply(Array.Empty()));
            return;
        }

        // Capture Sender before the async/PipeTo — Akka resets Sender between
        // dispatches. The reply is piped straight back to the site's ClusterClient.
        // On an Ask timeout or a faulted reply, PipeTo delivers a Status.Failure to
        // replyTo: the fault propagates to the caller rather than being swallowed.
        // The site's own Ask through this path then faults, and the site drain loop
        // treats that as a transient failure — rows stay Pending and are retried on
        // the next tick. (The gRPC handler instead returns an empty ack on fault;
        // propagating the fault here is the cleaner transient signal.)
        var replyTo = Sender;
        _log.Debug("Routing IngestAuditEventsCommand ({0} events) to the audit ingest actor", msg.Events.Count);
        _auditIngestProxy.Ask(msg, _auditIngestAskTimeout)
            .PipeTo(replyTo);
    }

    /// <summary>
    /// Combined-telemetry counterpart of <see cref="HandleIngestAuditEvents"/>. If no proxy
    /// is registered yet, replies with an empty IngestCachedTelemetryReply. Otherwise it Asks
    /// the audit ingest proxy and pipes the result, or a Status.Failure, to the captured Sender.
    /// </summary>
    /// <param name="msg">The cached-telemetry batch forwarded by a site.</param>
    private void HandleIngestCachedTelemetry(IngestCachedTelemetryCommand msg)
    {
        if (_auditIngestProxy == null)
        {
            _log.Warning(
                "Cannot route IngestCachedTelemetryCommand ({0} entries) — audit ingest not available",
                msg.Entries.Count);
            Sender.Tell(new IngestCachedTelemetryReply(Array.Empty()));
            return;
        }

        // Capture Sender on the actor thread; it must not be read once the Ask completes.
        var replyTo = Sender;
        _log.Debug("Routing IngestCachedTelemetryCommand ({0} entries) to the audit ingest actor", msg.Entries.Count);
        _auditIngestProxy.Ask(msg, _auditIngestAskTimeout)
            .PipeTo(replyTo);
    }

    /// <summary>
    /// Records a site heartbeat on the health aggregator resolved from DI (presumably the
    /// same ICentralHealthAggregator used by ProcessLocally — confirm). If no aggregator is
    /// registered, the heartbeat is ignored without logging.
    /// </summary>
    /// <param name="heartbeat">The heartbeat sent by a site.</param>
    private void HandleHeartbeat(HeartbeatMessage heartbeat)
    {
        var aggregator = _serviceProvider.GetService();
        aggregator?.MarkHeartbeat(heartbeat.SiteId, heartbeat.Timestamp);
    }

    /// <summary>
    /// Handles a report delivered directly from a site (via ClusterClient):
    /// process locally, then fan out to the peer central node so its
    /// aggregator stays in sync.
    /// </summary>
    /// <param name="report">The health report sent by a site.</param>
    private void HandleSiteHealthReport(SiteHealthReport report)
    {
        ProcessLocally(report);
        try
        {
            DistributedPubSub.Get(Context.System).Mediator.Tell(
                new Publish(HealthReportTopic, new SiteHealthReportReplica(report)));
        }
        catch
        {
            // No-op in non-clustered hosts (TestKit).
            // NOTE(review): DistributedPubSub.Get is re-evaluated for every report, and this
            // catch-all also hides genuine publish failures on clustered hosts. Consider
            // resolving the mediator once in PreStart, which already probes the extension
            // and logs when it is absent.
        }
    }

    /// <summary>
    /// Applies a report to the local aggregator without re-broadcasting.
    /// Used for both site-originated reports and peer-replicated ones — the
    /// aggregator is idempotent via sequence-number comparison.
    /// Logs a warning and drops the report when no ICentralHealthAggregator is registered.
    /// </summary>
    /// <param name="report">The report to apply.</param>
    private void ProcessLocally(SiteHealthReport report)
    {
        var aggregator = _serviceProvider.GetService();
        if (aggregator != null)
        {
            aggregator.ProcessReport(report);
        }
        else
        {
            _log.Warning("ICentralHealthAggregator not available, dropping health report from site {0}", report.SiteId);
        }
    }

    // Communication-016: HandleConnectionStateChanged removed — no production
    // caller emitted ConnectionStateChanged, so the workflow ran only in tests.
    // Disconnect detection is owned by the transport layers (gRPC keepalive +
    // ClusterClient/Ask timeout).

    /// <summary>
    /// Sends the envelope's inner message through the site's ClusterClient to the
    /// "/user/site-communication" path, passing the current Sender so the site's reply
    /// reaches the original asker. If the site has no ClusterClient, the message is only
    /// logged and dropped, and the caller's Ask times out.
    /// </summary>
    /// <param name="envelope">The site-addressed message to route.</param>
    private void HandleSiteEnvelope(SiteEnvelope envelope)
    {
        if (!_siteClients.TryGetValue(envelope.SiteId, out var entry))
        {
            _log.Warning("No ClusterClient for site {0}, cannot route message {1}",
                envelope.SiteId, envelope.Message.GetType().Name);
            // The Ask will timeout on the caller side — no central buffering (WP-5)
            return;
        }

        // Route via ClusterClient — Sender is preserved for Ask response routing
        entry.Client.Tell(
            new ClusterClient.Send("/user/site-communication", envelope.Message),
            Sender);
    }

    /// <summary>
    /// Loads all sites from the repository on a thread-pool task and builds a
    /// SiteIdentifier → contact-address map from each site's NodeAAddress/NodeBAddress.
    /// Sites with no non-blank address are left out. The result is piped to Self as a
    /// SiteAddressCacheLoaded; a faulted task arrives as a Status.Failure.
    /// Returns without loading if the lifecycle token source has already been disposed.
    /// </summary>
    private void LoadSiteAddressesFromDb()
    {
        var self = Self;

        // Communication-019: pass the actor's lifecycle CT into the repository
        // call so a hung database query is cancelled when the actor stops
        // rather than leaving the piped task to accumulate. Captured locally
        // because the lifecycle CTS may have been disposed by PostStop on a
        // racing late tick; treat that as "actor gone, give up".
        CancellationToken ct;
        try
        {
            ct = _lifecycleCts.Token;
        }
        catch (ObjectDisposedException)
        {
            return;
        }

        Task.Run(async () =>
        {
            using var scope = _serviceProvider.CreateScope();
            var repo = scope.ServiceProvider.GetRequiredService();
            var sites = await repo.GetAllSitesAsync(ct).ConfigureAwait(false);

            var contacts = new Dictionary>();
            foreach (var site in sites)
            {
                var addrs = new List();
                if (!string.IsNullOrWhiteSpace(site.NodeAAddress))
                {
                    var addr = site.NodeAAddress;
                    // Strip actor path suffix if present (legacy format)
                    // NOTE(review): IndexOf(string) is culture-sensitive; address parsing
                    // conventionally uses IndexOf("/user/", StringComparison.Ordinal) (CA1310).
                    // Also, because the check is idx > 0, a value that *starts* with "/user/"
                    // is kept unchanged.
                    var idx = addr.IndexOf("/user/");
                    if (idx > 0) addr = addr.Substring(0, idx);
                    addrs.Add(addr);
                }
                // Same normalization as NodeA; the duplicated logic could share one helper.
                if (!string.IsNullOrWhiteSpace(site.NodeBAddress))
                {
                    var addr = site.NodeBAddress;
                    var idx = addr.IndexOf("/user/");
                    if (idx > 0) addr = addr.Substring(0, idx);
                    addrs.Add(addr);
                }
                // A site with no usable address is omitted entirely, so
                // HandleSiteAddressCacheLoaded will treat it as removed.
                if (addrs.Count > 0)
                    contacts[site.SiteIdentifier] = addrs;
            }

            // Communication-020: freeze the cross-task payload before piping to
            // Self. The message record exposes read-only types (
            // IReadOnlyDictionary / IReadOnlyList) so the Akka.NET message-
            // immutability convention is enforced by type, not just convention.
            var frozen = contacts.ToDictionary(
                kvp => kvp.Key,
                kvp => (IReadOnlyList)kvp.Value.AsReadOnly());
            return new SiteAddressCacheLoaded(frozen);
        }).PipeTo(self);
    }

    /// <summary>
    /// Reconciles <see cref="_siteClients"/> with a freshly loaded address map on the actor
    /// thread, in this order:
    /// <list type="number">
    /// <item>stops the clients of sites absent from the map;</item>
    /// <item>skips any site whose addresses fail ActorPath parsing (its existing client, if
    /// any, is kept);</item>
    /// <item>leaves sites with an unchanged address set alone;</item>
    /// <item>otherwise stops the old client, if any, and creates a new one through the
    /// factory.</item>
    /// </list>
    /// </summary>
    /// <param name="msg">The address map produced by LoadSiteAddressesFromDb.</param>
    private void HandleSiteAddressCacheLoaded(SiteAddressCacheLoaded msg)
    {
        var newSiteIds = msg.SiteContacts.Keys.ToHashSet();
        var existingSiteIds = _siteClients.Keys.ToHashSet();

        // Stop ClusterClients for removed sites
        foreach (var removed in existingSiteIds.Except(newSiteIds))
        {
            _log.Info("Stopping ClusterClient for removed site {0}", removed);
            Context.Stop(_siteClients[removed].Client);
            _siteClients.Remove(removed);
        }

        // Add or update
        foreach (var (siteId, addresses) in msg.SiteContacts)
        {
            // Communication-009: parse all addresses up front inside a try/catch so a
            // single malformed site row cannot abort the whole refresh loop and leave
            // the cache half-updated. A bad site is logged and skipped; others proceed.
            ImmutableHashSet contactPaths;
            try
            {
                contactPaths = addresses
                    .Select(a => ActorPath.Parse($"{a}/system/receptionist"))
                    .ToImmutableHashSet();
            }
            catch (Exception ex)
            {
                _log.Warning(ex,
                    "Malformed contact address for site {0}; skipping this site in the refresh " +
                    "(other sites are unaffected)", siteId);
                continue;
            }

            var contactStrings = addresses.ToImmutableHashSet();

            // Skip if unchanged
            if (_siteClients.TryGetValue(siteId, out var existing) &&
                existing.ContactAddresses.SetEquals(contactStrings))
                continue;

            // Stop old client if addresses changed
            // NOTE(review): `existing` from the TryGetValue above already holds this entry,
            // so ContainsKey + indexer repeats the lookup.
            if (_siteClients.ContainsKey(siteId))
            {
                _log.Info("Updating ClusterClient for site {0} (addresses changed)", siteId);
                Context.Stop(_siteClients[siteId].Client);
            }

            // NOTE(review): this call is outside the Communication-009 try/catch. Suppose the
            // factory reuses a fixed per-site actor name (for example "site-client-{siteId}").
            // Creating it right after the Context.Stop above can then throw
            // InvalidActorNameException, because Akka keeps the name reserved until the
            // stopped actor has terminated. That exception would escape this handler
            // mid-loop. Confirm the factory's naming, or guard this call like the parse step.
            var client = _siteClientFactory.Create(Context.System, siteId, contactPaths);
            _siteClients[siteId] = (client, contactStrings);
            // addresses.Count counts raw entries; identical NodeA/NodeB values collapse
            // to a single contact in contactPaths.
            _log.Info("Created ClusterClient for site {0} with {1} contact(s)", siteId, addresses.Count);
        }

        _log.Info("Site ClusterClient cache refreshed with {0} site(s)", _siteClients.Count);
    }

    // Communication-016: TrackMessageForCleanup removed — the dicts it fed
    // existed solely to support the dead ConnectionStateChanged workflow.

    /// <summary>
    /// Resumes any faulted child actor, keeping its state, with unlimited retries and no
    /// time window. Each fault is logged at Warning.
    /// NOTE(review): this class contains no Context.ActorOf call. If the site-client
    /// factory creates clients with system.ActorOf, they are top-level actors rather than
    /// children, and this strategy does not govern them. Confirm which children it is
    /// intended for.
    /// </summary>
    protected override SupervisorStrategy SupervisorStrategy()
    {
        return new OneForOneStrategy(
            maxNrOfRetries: -1,
            withinTimeRange: Timeout.InfiniteTimeSpan,
            decider: Decider.From(ex =>
            {
                _log.Warning(ex, "Child actor of CentralCommunicationActor faulted, resuming (state preserved)");
                return Directive.Resume;
            }));
    }

    /// <summary>
    /// Subscribes to <see cref="HealthReportTopic"/> (best effort) and starts the repeating
    /// RefreshSiteAddresses timer. The first tick fires immediately and later ticks follow
    /// every 60 s.
    /// </summary>
    protected override void PreStart()
    {
        _log.Info("CentralCommunicationActor started");

        // Subscribe to the peer-replication topic so we receive health reports
        // delivered to the other central node and keep our local aggregator
        // in sync (ClusterClient load-balances reports across nodes).
        // Tolerant of non-clustered hosts (TestKit) where the extension is absent.
        try
        {
            DistributedPubSub.Get(Context.System).Mediator.Tell(
                new Subscribe(HealthReportTopic, Self));
        }
        catch (Exception ex)
        {
            _log.Debug("DistributedPubSub not available — peer health replication disabled: {0}", ex.Message);
        }

        // Schedule periodic refresh of site addresses from the database
        _refreshSchedule = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(
            TimeSpan.Zero,
            TimeSpan.FromSeconds(60),
            Self,
            new RefreshSiteAddresses(),
            ActorRefs.NoSender);
    }

    /// <summary>
    /// Cancels the refresh timer and the lifecycle token, which aborts any in-flight
    /// GetAllSitesAsync, then disposes the token source.
    /// NOTE(review): the per-site ClusterClients held in <see cref="_siteClients"/> are not
    /// stopped here. If they are not children of this actor (e.g. created via
    /// system.ActorOf), they outlive a stop or restart of this actor. The next
    /// incarnation's refresh would then create clients for the same sites again. Confirm
    /// whether they should be stopped here.
    /// </summary>
    protected override void PostStop()
    {
        _log.Info("CentralCommunicationActor stopped");
        _refreshSchedule?.Cancel();

        // Communication-019: cancel any in-flight LoadSiteAddressesFromDb so a
        // hung MS SQL query does not outlive the actor.
        try
        {
            _lifecycleCts.Cancel();
        }
        catch (ObjectDisposedException)
        {
            // Double-stop is benign.
        }
        _lifecycleCts.Dispose();
    }
}

/// <summary>
/// Command to trigger a refresh of site addresses from the database.
/// CentralCommunicationActor sends it to itself on a repeating timer.
/// </summary>
public record RefreshSiteAddresses;

/// <summary>
/// Internal message carrying the loaded site contact data from the database.
/// ClusterClient creation happens on the actor thread in HandleSiteAddressCacheLoaded.
/// <para>
/// Communication-020: the payload is exposed as <see cref="IReadOnlyDictionary{TKey, TValue}"/>
/// of <see cref="IReadOnlyList{T}"/>, so the Akka.NET "messages are immutable" convention is
/// enforced at the type level rather than relying on producer discipline. The producer
/// wraps the constructed buckets with List&lt;T&gt;.AsReadOnly() before piping to Self.
/// </para>
/// </summary>
internal record SiteAddressCacheLoaded(IReadOnlyDictionary> SiteContacts);

/// <summary>
/// Notification sent to debug view subscribers when the stream is terminated
/// due to site disconnection (WP-5).
/// NOTE(review): nothing in CentralCommunicationActor creates this message any more, since
/// its kill-streams path was removed under Communication-016. Confirm whether another
/// component still sends it.
/// </summary>
public record DebugStreamTerminated(string SiteId, string CorrelationId);

/// <summary>
/// Registers the central NotificationOutboxActor singleton proxy with the
/// <see cref="CentralCommunicationActor"/> so site-forwarded NotificationSubmit
/// and NotificationStatusQuery messages can be routed to it. Sent by the Host
/// after the outbox singleton proxy is created.
/// </summary>
public record RegisterNotificationOutbox(IActorRef OutboxProxy);

/// <summary>
/// Registers the central AuditLogIngestActor singleton proxy with the
/// <see cref="CentralCommunicationActor"/> so site-forwarded IngestAuditEventsCommand
/// and IngestCachedTelemetryCommand messages can be routed to it. Sent by the Host after
/// the audit-ingest singleton proxy is created. It lives here rather than in Commons
/// because ScadaLink.Commons has no Akka package reference and cannot hold an
/// <see cref="IActorRef"/> field.
/// </summary>
public sealed record RegisterAuditIngest(IActorRef AuditIngestActor);