From e5eb8719613cf145bc173ade43af210645c9fef8 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 17 Mar 2026 23:46:17 -0400 Subject: [PATCH] fix: wire up health report pipeline between sites and central aggregator MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sites now send SiteHealthReport via AkkaHealthReportTransport → SiteCommunicationActor → CentralCommunicationActor → CentralHealthAggregator. Added IHealthReportTransport impl, ISiteIdentityProvider impl, registered HealthReportSender on site nodes, and added SiteHealthReport handler in CentralCommunicationActor. Health Dashboard now shows all 3 sites online. --- .../Actors/CentralCommunicationActor.cs | 19 ++++++++++-- .../ScadaLink.Communication.csproj | 1 + .../AkkaHealthReportTransport.cs | 29 +++++++++++++++++++ src/ScadaLink.Host/Program.cs | 6 +++- src/ScadaLink.Host/SiteIdentityProvider.cs | 18 ++++++++++++ 5 files changed, 70 insertions(+), 3 deletions(-) create mode 100644 src/ScadaLink.Host/AkkaHealthReportTransport.cs create mode 100644 src/ScadaLink.Host/SiteIdentityProvider.cs diff --git a/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs b/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs index 54267d0..883f0bc 100644 --- a/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs +++ b/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs @@ -4,6 +4,7 @@ using Microsoft.Extensions.DependencyInjection; using ScadaLink.Commons.Interfaces.Repositories; using ScadaLink.Commons.Messages.Communication; using ScadaLink.Commons.Messages.Health; +using ScadaLink.HealthMonitoring; namespace ScadaLink.Communication.Actors; @@ -50,8 +51,9 @@ public class CentralCommunicationActor : ReceiveActor // Periodic refresh trigger Receive(_ => LoadSiteAddressesFromDb()); - // Site registration via heartbeats + // Health monitoring: heartbeats and health reports from sites Receive(HandleHeartbeat); + Receive(HandleSiteHealthReport); // Connection state changes Receive(HandleConnectionStateChanged); @@ -62,10 +64,23 @@ public class CentralCommunicationActor : ReceiveActor private void HandleHeartbeat(HeartbeatMessage heartbeat) { - // Forward heartbeat to parent/subscribers (central health monitoring) + // Forward heartbeat to parent for any interested central actors Context.Parent.Tell(heartbeat); } + private void HandleSiteHealthReport(SiteHealthReport report) + { + var aggregator = _serviceProvider.GetService(); + if (aggregator != null) + { + aggregator.ProcessReport(report); + } + else + { + _log.Warning("ICentralHealthAggregator not available, dropping health report from site {0}", report.SiteId); + } + } + private void HandleConnectionStateChanged(ConnectionStateChanged msg) { if (!msg.IsConnected) diff --git a/src/ScadaLink.Communication/ScadaLink.Communication.csproj b/src/ScadaLink.Communication/ScadaLink.Communication.csproj index a41b756..30d0b03 100644 --- a/src/ScadaLink.Communication/ScadaLink.Communication.csproj +++ b/src/ScadaLink.Communication/ScadaLink.Communication.csproj @@ -20,6 +20,7 @@ + diff --git a/src/ScadaLink.Host/AkkaHealthReportTransport.cs b/src/ScadaLink.Host/AkkaHealthReportTransport.cs new file mode 100644 index 0000000..ca3508c --- /dev/null +++ b/src/ScadaLink.Host/AkkaHealthReportTransport.cs @@ -0,0 +1,29 @@ +using Akka.Actor; +using ScadaLink.Commons.Messages.Health; +using ScadaLink.HealthMonitoring; +using ScadaLink.Host.Actors; + +namespace ScadaLink.Host; + +/// +/// Sends SiteHealthReport to the local SiteCommunicationActor via Akka ActorSelection. +/// The SiteCommunicationActor forwards it to central. +/// +public class AkkaHealthReportTransport : IHealthReportTransport +{ + private readonly AkkaHostedService _akkaService; + + public AkkaHealthReportTransport(AkkaHostedService akkaService) + { + _akkaService = akkaService; + } + + public void Send(SiteHealthReport report) + { + var actorSystem = _akkaService.ActorSystem; + if (actorSystem == null) return; + + var siteComm = actorSystem.ActorSelection("/user/site-communication"); + siteComm.Tell(report, ActorRefs.NoSender); + } +} diff --git a/src/ScadaLink.Host/Program.cs b/src/ScadaLink.Host/Program.cs index 38b8b34..c8a6aa3 100644 --- a/src/ScadaLink.Host/Program.cs +++ b/src/ScadaLink.Host/Program.cs @@ -151,10 +151,14 @@ try // Shared components services.AddClusterInfrastructure(); services.AddCommunication(); - services.AddHealthMonitoring(); + services.AddSiteHealthMonitoring(); services.AddExternalSystemGateway(); services.AddNotificationService(); + // Health report transport: sends SiteHealthReport to SiteCommunicationActor via Akka + services.AddSingleton(); + services.AddSingleton(); + // Site-only components — AddSiteRuntime registers SiteStorageService with SQLite path // and site-local repository implementations (IExternalSystemRepository, INotificationRepository) var siteDbPath = context.Configuration["ScadaLink:Database:SiteDbPath"] ?? "site.db"; diff --git a/src/ScadaLink.Host/SiteIdentityProvider.cs b/src/ScadaLink.Host/SiteIdentityProvider.cs new file mode 100644 index 0000000..7be9067 --- /dev/null +++ b/src/ScadaLink.Host/SiteIdentityProvider.cs @@ -0,0 +1,18 @@ +using Microsoft.Extensions.Options; +using ScadaLink.HealthMonitoring; + +namespace ScadaLink.Host; + +/// +/// Provides the site identity from NodeOptions configuration. +/// +public class SiteIdentityProvider : ISiteIdentityProvider +{ + public string SiteId { get; } + + public SiteIdentityProvider(IOptions nodeOptions) + { + SiteId = nodeOptions.Value.SiteId + ?? throw new InvalidOperationException("SiteId is required for site nodes."); + } +}