Files
ScadaBridge/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs
T

588 lines
25 KiB
C#

using System.Collections.Immutable;
using Akka.Actor;
using Akka.Cluster.Tools.Client;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event;
using Microsoft.Extensions.DependencyInjection;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.Audit;
using ScadaLink.Commons.Messages.Communication;
using ScadaLink.Commons.Messages.Health;
using ScadaLink.Commons.Messages.Notification;
using ScadaLink.HealthMonitoring;
namespace ScadaLink.Communication.Actors;
/// <summary>
/// Abstraction for creating ClusterClient instances per site, enabling testability.
/// </summary>
public interface ISiteClientFactory
{
IActorRef Create(ActorSystem system, string siteId, ImmutableHashSet<ActorPath> contacts);
}
/// <summary>
/// Default implementation that creates a real ClusterClient for each site.
/// </summary>
public class DefaultSiteClientFactory : ISiteClientFactory
{
public IActorRef Create(ActorSystem system, string siteId, ImmutableHashSet<ActorPath> contacts)
{
var settings = ClusterClientSettings.Create(system).WithInitialContacts(contacts);
return system.ActorOf(ClusterClient.Props(settings), $"site-client-{siteId}");
}
}
/// <summary>
/// Central-side actor that routes messages from central to site clusters via ClusterClient.
/// Resolves site addresses from the database on a periodic refresh cycle and manages
/// per-site ClusterClient instances.
///
/// WP-4: All 8 message patterns routed through this actor.
/// WP-5: Ask timeout on connection drop (no central buffering). Debug streams killed on interruption.
/// </summary>
public class CentralCommunicationActor : ReceiveActor
{
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly IServiceProvider _serviceProvider;
private readonly ISiteClientFactory _siteClientFactory;
/// <summary>
/// Per-site ClusterClient instances and their contact addresses.
/// Maps SiteIdentifier → (ClusterClient actor, set of contact address strings).
/// Refreshed periodically via RefreshSiteAddresses.
/// </summary>
private readonly Dictionary<string, (IActorRef Client, ImmutableHashSet<string> ContactAddresses)> _siteClients = new();
/// <summary>
/// Tracks active debug view subscriptions: correlationId → (siteId, subscriber).
/// Used to kill debug streams on site disconnection (WP-5).
/// </summary>
private readonly Dictionary<string, (string SiteId, IActorRef Subscriber)> _debugSubscriptions = new();
/// <summary>
/// Tracks in-progress deployments: deploymentId → siteId.
/// On central failover, in-progress deployments are treated as failed (WP-5).
/// </summary>
private readonly Dictionary<string, string> _inProgressDeployments = new();
private ICancelable? _refreshSchedule;
/// <summary>
/// Proxy <see cref="IActorRef"/> for the central NotificationOutboxActor cluster singleton.
/// Set via <see cref="RegisterNotificationOutbox"/> — the Host creates the singleton proxy
/// after this actor and registers it (mirrors how the site-side actor receives its
/// runtime <see cref="IActorRef"/>s). Null until registration completes; a notification
/// arriving before then is rejected with a non-accepted ack so the site retries.
/// </summary>
private IActorRef? _notificationOutboxProxy;
/// <summary>
/// Proxy <see cref="IActorRef"/> for the central AuditLogIngestActor cluster
/// singleton. Set via <see cref="RegisterAuditIngest"/> — the Host creates the
/// singleton proxy after this actor and registers it (mirrors
/// <see cref="_notificationOutboxProxy"/>). Null until registration completes;
/// an audit ingest command arriving before then is answered with an empty
/// reply so the site keeps its rows Pending and retries.
///
/// Once registered, the handler Asks this proxy and pipes the reply straight
/// back to the caller. On an Ask timeout or a faulted reply, PipeTo forwards a
/// <see cref="Status.Failure"/> to the caller — the fault propagates rather
/// than being swallowed. This differs from the gRPC handler
/// (<c>SiteStreamGrpcServer</c>), which catches the exception and returns an
/// empty ack; here the faulted Ask is the transient signal the site relies on
/// (see <see cref="HandleIngestAuditEvents"/>).
/// </summary>
private IActorRef? _auditIngestProxy;
/// <summary>
/// Default Ask timeout for routing audit ingest commands to the
/// AuditLogIngestActor proxy — 30 s, matching the value of
/// <c>SiteStreamGrpcServer.AuditIngestAskTimeout</c> (that constant is private
/// to the gRPC server and not reachable here, so it is declared locally). A
/// generous window absorbs a slow MS SQL connection without the round-trip
/// surfacing as a failure on a healthy site. When the window is exceeded the
/// Ask faults and that fault is piped back to the caller as a
/// <see cref="Status.Failure"/> (see <see cref="HandleIngestAuditEvents"/>).
/// </summary>
private static readonly TimeSpan DefaultAuditIngestAskTimeout = TimeSpan.FromSeconds(30);
/// <summary>
/// Effective Ask timeout for audit ingest routing. Defaults to
/// <see cref="DefaultAuditIngestAskTimeout"/>; overridable via the constructor
/// so tests can exercise the timeout/fault path without waiting 30 s.
/// </summary>
private readonly TimeSpan _auditIngestAskTimeout;
/// <summary>
/// DistributedPubSub topic used to fan health reports out to the peer
/// central node so both per-node aggregators stay in sync. See
/// <see cref="SiteHealthReportReplica"/> for the protocol rationale.
/// </summary>
private const string HealthReportTopic = "site-health-replica";
/// <param name="auditIngestAskTimeout">
/// Optional override for the audit-ingest Ask timeout; defaults to
/// <see cref="DefaultAuditIngestAskTimeout"/> (30 s). Exists only so tests can
/// exercise the timeout/fault path quickly — production always uses the default.
/// </param>
public CentralCommunicationActor(
IServiceProvider serviceProvider,
ISiteClientFactory siteClientFactory,
TimeSpan? auditIngestAskTimeout = null)
{
_serviceProvider = serviceProvider;
_siteClientFactory = siteClientFactory;
_auditIngestAskTimeout = auditIngestAskTimeout ?? DefaultAuditIngestAskTimeout;
// Site address cache loaded from database
Receive<SiteAddressCacheLoaded>(HandleSiteAddressCacheLoaded);
// Periodic refresh trigger
Receive<RefreshSiteAddresses>(_ => LoadSiteAddressesFromDb());
// Communication-006: a faulted LoadSiteAddressesFromDb task is piped here as a
// Status.Failure. Without this handler the failure was an unhandled message
// (debug-level only) and the refresh failed silently — operators could not
// distinguish "no sites configured" from "database is down". Log at Warning.
Receive<Status.Failure>(failure =>
_log.Warning(failure.Cause,
"Failed to load site addresses from the database; the site ClusterClient "
+ "cache was not refreshed and may be stale or empty"));
// Health monitoring: heartbeats and health reports from sites
Receive<HeartbeatMessage>(HandleHeartbeat);
Receive<SiteHealthReport>(HandleSiteHealthReport);
Receive<SiteHealthReportReplica>(r => ProcessLocally(r.Report));
Receive<SubscribeAck>(_ => { /* DistributedPubSub subscribe confirmation */ });
// Connection state changes
Receive<ConnectionStateChanged>(HandleConnectionStateChanged);
// Route enveloped messages to sites
Receive<SiteEnvelope>(HandleSiteEnvelope);
// Notification Outbox: the Host registers the outbox singleton proxy after this
// actor is created (the proxy cannot exist before this actor's construction).
Receive<RegisterNotificationOutbox>(msg =>
{
_notificationOutboxProxy = msg.OutboxProxy;
_log.Info("Registered notification outbox proxy");
});
// Notification Outbox ingest: a site forwards a buffered NotificationSubmit to the
// central cluster via ClusterClient. Forward to the outbox proxy so the original
// Sender (the site's ClusterClient path) is preserved and the NotificationSubmitAck
// routes straight back to the site.
Receive<NotificationSubmit>(HandleNotificationSubmit);
// Notification Outbox status query: forward to the outbox proxy, preserving Sender
// so the NotificationStatusResponse routes back to the querying site.
Receive<NotificationStatusQuery>(HandleNotificationStatusQuery);
// Audit Log (#23): the Host registers the AuditLogIngestActor singleton
// proxy after this actor is created (the proxy cannot exist before this
// actor's construction).
Receive<RegisterAuditIngest>(msg =>
{
_auditIngestProxy = msg.AuditIngestActor;
_log.Info("Registered audit ingest proxy");
});
// Audit Log (#23) site→central ingest: a site forwards a batch of audit
// events to the central cluster via ClusterClient. Ask the ingest proxy
// and pipe the IngestAuditEventsReply back to the original Sender (the
// site's ClusterClient path) so the site can flip its rows to Forwarded.
Receive<IngestAuditEventsCommand>(HandleIngestAuditEvents);
// Audit Log (#23 M3) combined-telemetry ingest: routes to the same proxy
// the same way; the proxy replies with an IngestCachedTelemetryReply.
Receive<IngestCachedTelemetryCommand>(HandleIngestCachedTelemetry);
}
private void HandleNotificationSubmit(NotificationSubmit msg)
{
if (_notificationOutboxProxy == null)
{
// No outbox proxy registered yet. A non-accepted ack makes the site's
// Store-and-Forward forwarder treat this as transient and retry later.
_log.Warning(
"Cannot route NotificationSubmit {0} — notification outbox not available",
msg.NotificationId);
Sender.Tell(new NotificationSubmitAck(
msg.NotificationId, Accepted: false, Error: "notification outbox not available"));
return;
}
_log.Debug("Routing NotificationSubmit {0} to the notification outbox", msg.NotificationId);
_notificationOutboxProxy.Forward(msg);
}
private void HandleNotificationStatusQuery(NotificationStatusQuery msg)
{
if (_notificationOutboxProxy == null)
{
// No outbox proxy registered yet. Reply Found: false so the querying site
// falls back to its local Store-and-Forward buffer to resolve the status.
_log.Warning(
"Cannot route NotificationStatusQuery {0} — notification outbox not available",
msg.NotificationId);
Sender.Tell(new NotificationStatusResponse(
msg.CorrelationId, Found: false, Status: "Unknown",
RetryCount: 0, LastError: null, DeliveredAt: null));
return;
}
_log.Debug("Routing NotificationStatusQuery {0} to the notification outbox", msg.NotificationId);
_notificationOutboxProxy.Forward(msg);
}
private void HandleIngestAuditEvents(IngestAuditEventsCommand msg)
{
if (_auditIngestProxy == null)
{
// No ingest proxy registered yet (host startup race). Reply with an
// empty IngestAuditEventsReply so the site keeps its rows Pending and
// retries — the same behaviour as the gRPC handler's wiring-race path.
_log.Warning(
"Cannot route IngestAuditEventsCommand ({0} events) — audit ingest not available",
msg.Events.Count);
Sender.Tell(new IngestAuditEventsReply(Array.Empty<Guid>()));
return;
}
// Capture Sender before the async/PipeTo — Akka resets Sender between
// dispatches. The reply is piped straight back to the site's ClusterClient.
// On an Ask timeout or a faulted reply, PipeTo delivers a Status.Failure to
// replyTo: the fault propagates to the caller rather than being swallowed.
// The site's own Ask through this path then faults, and the site drain loop
// treats that as a transient failure — rows stay Pending and are retried on
// the next tick. (The gRPC handler instead returns an empty ack on fault;
// propagating the fault here is the cleaner transient signal.)
var replyTo = Sender;
_log.Debug("Routing IngestAuditEventsCommand ({0} events) to the audit ingest actor", msg.Events.Count);
_auditIngestProxy.Ask<IngestAuditEventsReply>(msg, _auditIngestAskTimeout)
.PipeTo(replyTo);
}
private void HandleIngestCachedTelemetry(IngestCachedTelemetryCommand msg)
{
if (_auditIngestProxy == null)
{
_log.Warning(
"Cannot route IngestCachedTelemetryCommand ({0} entries) — audit ingest not available",
msg.Entries.Count);
Sender.Tell(new IngestCachedTelemetryReply(Array.Empty<Guid>()));
return;
}
var replyTo = Sender;
_log.Debug("Routing IngestCachedTelemetryCommand ({0} entries) to the audit ingest actor", msg.Entries.Count);
_auditIngestProxy.Ask<IngestCachedTelemetryReply>(msg, _auditIngestAskTimeout)
.PipeTo(replyTo);
}
private void HandleHeartbeat(HeartbeatMessage heartbeat)
{
var aggregator = _serviceProvider.GetService<ICentralHealthAggregator>();
aggregator?.MarkHeartbeat(heartbeat.SiteId, heartbeat.Timestamp);
}
/// <summary>
/// Handles a report delivered directly from a site (via ClusterClient):
/// process locally, then fan out to the peer central node so its
/// aggregator stays in sync.
/// </summary>
private void HandleSiteHealthReport(SiteHealthReport report)
{
ProcessLocally(report);
try
{
DistributedPubSub.Get(Context.System).Mediator.Tell(
new Publish(HealthReportTopic, new SiteHealthReportReplica(report)));
}
catch
{
// No-op in non-clustered hosts (TestKit).
}
}
/// <summary>
/// Applies a report to the local aggregator without re-broadcasting.
/// Used for both site-originated reports and peer-replicated ones — the
/// aggregator is idempotent via sequence-number comparison.
/// </summary>
private void ProcessLocally(SiteHealthReport report)
{
var aggregator = _serviceProvider.GetService<ICentralHealthAggregator>();
if (aggregator != null)
{
aggregator.ProcessReport(report);
}
else
{
_log.Warning("ICentralHealthAggregator not available, dropping health report from site {0}", report.SiteId);
}
}
private void HandleConnectionStateChanged(ConnectionStateChanged msg)
{
if (!msg.IsConnected)
{
_log.Warning("Site {0} disconnected at {1}", msg.SiteId, msg.Timestamp);
// WP-5: Kill active debug streams for the disconnected site
var toRemove = _debugSubscriptions
.Where(kvp => kvp.Value.SiteId == msg.SiteId)
.ToList();
foreach (var kvp in toRemove)
{
_log.Info("Killing debug stream {0} for disconnected site {1}", kvp.Key, msg.SiteId);
kvp.Value.Subscriber.Tell(new DebugStreamTerminated(msg.SiteId, kvp.Key));
_debugSubscriptions.Remove(kvp.Key);
}
// WP-5: Mark in-progress deployments as failed
var failedDeployments = _inProgressDeployments
.Where(kvp => kvp.Value == msg.SiteId)
.Select(kvp => kvp.Key)
.ToList();
foreach (var deploymentId in failedDeployments)
{
_log.Warning("Deployment {0} to site {1} treated as failed due to disconnection",
deploymentId, msg.SiteId);
_inProgressDeployments.Remove(deploymentId);
}
// Note: Do NOT stop the ClusterClient — it handles reconnection internally
}
else
{
_log.Info("Site {0} connected at {1}", msg.SiteId, msg.Timestamp);
}
}
private void HandleSiteEnvelope(SiteEnvelope envelope)
{
if (!_siteClients.TryGetValue(envelope.SiteId, out var entry))
{
_log.Warning("No ClusterClient for site {0}, cannot route message {1}",
envelope.SiteId, envelope.Message.GetType().Name);
// The Ask will timeout on the caller side — no central buffering (WP-5)
return;
}
// Track debug subscriptions for cleanup on disconnect
TrackMessageForCleanup(envelope);
// Route via ClusterClient — Sender is preserved for Ask response routing
entry.Client.Tell(
new ClusterClient.Send("/user/site-communication", envelope.Message),
Sender);
}
private void LoadSiteAddressesFromDb()
{
var self = Self;
Task.Run(async () =>
{
using var scope = _serviceProvider.CreateScope();
var repo = scope.ServiceProvider.GetRequiredService<ISiteRepository>();
var sites = await repo.GetAllSitesAsync();
var contacts = new Dictionary<string, List<string>>();
foreach (var site in sites)
{
var addrs = new List<string>();
if (!string.IsNullOrWhiteSpace(site.NodeAAddress))
{
var addr = site.NodeAAddress;
// Strip actor path suffix if present (legacy format)
var idx = addr.IndexOf("/user/");
if (idx > 0) addr = addr.Substring(0, idx);
addrs.Add(addr);
}
if (!string.IsNullOrWhiteSpace(site.NodeBAddress))
{
var addr = site.NodeBAddress;
var idx = addr.IndexOf("/user/");
if (idx > 0) addr = addr.Substring(0, idx);
addrs.Add(addr);
}
if (addrs.Count > 0)
contacts[site.SiteIdentifier] = addrs;
}
return new SiteAddressCacheLoaded(contacts);
}).PipeTo(self);
}
private void HandleSiteAddressCacheLoaded(SiteAddressCacheLoaded msg)
{
var newSiteIds = msg.SiteContacts.Keys.ToHashSet();
var existingSiteIds = _siteClients.Keys.ToHashSet();
// Stop ClusterClients for removed sites
foreach (var removed in existingSiteIds.Except(newSiteIds))
{
_log.Info("Stopping ClusterClient for removed site {0}", removed);
Context.Stop(_siteClients[removed].Client);
_siteClients.Remove(removed);
}
// Add or update
foreach (var (siteId, addresses) in msg.SiteContacts)
{
// Communication-009: parse all addresses up front inside a try/catch so a
// single malformed site row cannot abort the whole refresh loop and leave
// the cache half-updated. A bad site is logged and skipped; others proceed.
ImmutableHashSet<ActorPath> contactPaths;
try
{
contactPaths = addresses
.Select(a => ActorPath.Parse($"{a}/system/receptionist"))
.ToImmutableHashSet();
}
catch (Exception ex)
{
_log.Warning(ex,
"Malformed contact address for site {0}; skipping this site in the refresh "
+ "(other sites are unaffected)", siteId);
continue;
}
var contactStrings = addresses.ToImmutableHashSet();
// Skip if unchanged
if (_siteClients.TryGetValue(siteId, out var existing) && existing.ContactAddresses.SetEquals(contactStrings))
continue;
// Stop old client if addresses changed
if (_siteClients.ContainsKey(siteId))
{
_log.Info("Updating ClusterClient for site {0} (addresses changed)", siteId);
Context.Stop(_siteClients[siteId].Client);
}
var client = _siteClientFactory.Create(Context.System, siteId, contactPaths);
_siteClients[siteId] = (client, contactStrings);
_log.Info("Created ClusterClient for site {0} with {1} contact(s)", siteId, addresses.Count);
}
_log.Info("Site ClusterClient cache refreshed with {0} site(s)", _siteClients.Count);
}
private void TrackMessageForCleanup(SiteEnvelope envelope)
{
switch (envelope.Message)
{
case Commons.Messages.DebugView.SubscribeDebugViewRequest sub:
_debugSubscriptions[sub.CorrelationId] = (envelope.SiteId, Sender);
break;
case Commons.Messages.DebugView.UnsubscribeDebugViewRequest unsub:
_debugSubscriptions.Remove(unsub.CorrelationId);
break;
case Commons.Messages.Deployment.DeployInstanceCommand deploy:
_inProgressDeployments[deploy.DeploymentId] = envelope.SiteId;
break;
}
}
/// <summary>
/// Coordinator supervision strategy (CLAUDE.md: "Resume for coordinator actors").
/// CentralCommunicationActor is a long-lived coordinator that owns the per-site
/// ClusterClient map; a transient fault in a child (e.g. a ClusterClient child)
/// must Resume so the child's connection state is preserved rather than wiped by
/// a Restart.
/// </summary>
protected override SupervisorStrategy SupervisorStrategy()
{
return new OneForOneStrategy(
maxNrOfRetries: -1,
withinTimeRange: Timeout.InfiniteTimeSpan,
decider: Decider.From(ex =>
{
_log.Warning(ex, "Child actor of CentralCommunicationActor faulted, resuming (state preserved)");
return Directive.Resume;
}));
}
protected override void PreStart()
{
_log.Info("CentralCommunicationActor started");
// Subscribe to the peer-replication topic so we receive health reports
// delivered to the other central node and keep our local aggregator
// in sync (ClusterClient load-balances reports across nodes).
// Tolerant of non-clustered hosts (TestKit) where the extension is absent.
try
{
DistributedPubSub.Get(Context.System).Mediator.Tell(
new Subscribe(HealthReportTopic, Self));
}
catch (Exception ex)
{
_log.Debug("DistributedPubSub not available — peer health replication disabled: {0}", ex.Message);
}
// Schedule periodic refresh of site addresses from the database
_refreshSchedule = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(
TimeSpan.Zero,
TimeSpan.FromSeconds(60),
Self,
new RefreshSiteAddresses(),
ActorRefs.NoSender);
}
protected override void PostStop()
{
_log.Info("CentralCommunicationActor stopped. In-progress deployments treated as failed (WP-5).");
_refreshSchedule?.Cancel();
// On central failover, all in-progress deployments are failed
_inProgressDeployments.Clear();
_debugSubscriptions.Clear();
}
}
/// <summary>
/// Command to trigger a refresh of site addresses from the database.
/// </summary>
public record RefreshSiteAddresses;
/// <summary>
/// Internal message carrying the loaded site contact data from the database.
/// ClusterClient creation happens on the actor thread in HandleSiteAddressCacheLoaded.
/// </summary>
internal record SiteAddressCacheLoaded(Dictionary<string, List<string>> SiteContacts);
/// <summary>
/// Notification sent to debug view subscribers when the stream is terminated
/// due to site disconnection (WP-5).
/// </summary>
public record DebugStreamTerminated(string SiteId, string CorrelationId);
/// <summary>
/// Registers the central NotificationOutboxActor singleton proxy with the
/// <see cref="CentralCommunicationActor"/> so site-forwarded <see cref="NotificationSubmit"/>
/// and <see cref="NotificationStatusQuery"/> messages can be routed to it. Sent by the Host
/// after the outbox singleton proxy is created.
/// </summary>
public record RegisterNotificationOutbox(IActorRef OutboxProxy);
/// <summary>
/// Registers the central AuditLogIngestActor singleton proxy with the
/// <see cref="CentralCommunicationActor"/> so site-forwarded
/// <see cref="IngestAuditEventsCommand"/> and <see cref="IngestCachedTelemetryCommand"/>
/// messages can be routed to it. Sent by the Host after the audit-ingest
/// singleton proxy is created. Lives here (not in Commons) because
/// <c>ScadaLink.Commons</c> has no Akka package reference and cannot hold an
/// <see cref="IActorRef"/> field.
/// </summary>
public sealed record RegisterAuditIngest(IActorRef AuditIngestActor);