From 6f1f6b846728b9ee7603a5cf642c98e91b311e8b Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 13 May 2026 06:20:07 -0400 Subject: [PATCH] fix(health): replicate site health reports between central nodes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CentralHealthAggregator is a per-node hosted singleton, but site health reports flow through ClusterClient which round-robins each report to one central node only. The other node's aggregator never saw those reports and marked sites offline at the 60s threshold — sites constantly flapped between online and offline on the monitoring page. On receive, the active CentralCommunicationActor now republishes a SiteHealthReportReplica wrapper on a DistributedPubSub topic. Both central nodes subscribe to the topic and process replicas through a dedicated path that updates the local aggregator without re-broadcasting (avoids fan-out loops). The aggregator's existing sequence-number idempotency makes self-delivery a cheap no-op. DistributedPubSubExtensionProvider is now listed in the HOCON `akka.extensions` block so the mediator is initialised at cluster start, eliminating a race where the first Subscribe arrived before the extension was loaded. --- .../Messages/Health/SiteHealthReport.cs | 10 ++++ .../Actors/CentralCommunicationActor.cs | 49 +++++++++++++++++++ .../Actors/AkkaHostedService.cs | 3 ++ 3 files changed, 62 insertions(+) diff --git a/src/ScadaLink.Commons/Messages/Health/SiteHealthReport.cs b/src/ScadaLink.Commons/Messages/Health/SiteHealthReport.cs index 793c5fd..d61b37e 100644 --- a/src/ScadaLink.Commons/Messages/Health/SiteHealthReport.cs +++ b/src/ScadaLink.Commons/Messages/Health/SiteHealthReport.cs @@ -21,3 +21,13 @@ public record SiteHealthReport( IReadOnlyDictionary? DataConnectionTagQuality = null, int ParkedMessageCount = 0, IReadOnlyList? 
ClusterNodes = null); + +/// <summary> +/// Broadcast wrapper used between central nodes to keep per-node +/// CentralHealthAggregator state in sync. ClusterClient load-balances each +/// incoming SiteHealthReport to one central node; that node re-publishes +/// this wrapper on a DistributedPubSub topic so the peer node's aggregator +/// also processes the report (idempotently — sequence numbers guard against +/// double-counting). +/// </summary> +public record SiteHealthReportReplica(SiteHealthReport Report); diff --git a/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs b/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs index 080661f..13f12d5 100644 --- a/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs +++ b/src/ScadaLink.Communication/Actors/CentralCommunicationActor.cs @@ -1,6 +1,7 @@ using System.Collections.Immutable; using Akka.Actor; using Akka.Cluster.Tools.Client; +using Akka.Cluster.Tools.PublishSubscribe; using Akka.Event; using Microsoft.Extensions.DependencyInjection; using ScadaLink.Commons.Interfaces.Repositories; @@ -65,6 +66,13 @@ public class CentralCommunicationActor : ReceiveActor private ICancelable? _refreshSchedule; + /// <summary> + /// DistributedPubSub topic used to fan health reports out to the peer + /// central node so both per-node aggregators stay in sync. See + /// <see cref="SiteHealthReportReplica"/> for the protocol rationale. 
+ /// </summary> + private const string HealthReportTopic = "site-health-replica"; + public CentralCommunicationActor(IServiceProvider serviceProvider, ISiteClientFactory siteClientFactory) { _serviceProvider = serviceProvider; @@ -79,6 +87,8 @@ public class CentralCommunicationActor : ReceiveActor // Health monitoring: heartbeats and health reports from sites Receive(HandleHeartbeat); Receive<SiteHealthReport>(HandleSiteHealthReport); + Receive<SiteHealthReportReplica>(r => ProcessLocally(r.Report)); + Receive<SubscribeAck>(_ => { /* DistributedPubSub subscribe confirmation */ }); // Connection state changes Receive(HandleConnectionStateChanged); @@ -94,7 +104,32 @@ public class CentralCommunicationActor : ReceiveActor Context.Parent.Tell(heartbeat); } + /// <summary> + /// Handles a report delivered directly from a site (via ClusterClient): + /// process locally, then fan out to the peer central node so its + /// aggregator stays in sync. + /// </summary> private void HandleSiteHealthReport(SiteHealthReport report) + { + ProcessLocally(report); + + try + { + DistributedPubSub.Get(Context.System).Mediator.Tell( + new Publish(HealthReportTopic, new SiteHealthReportReplica(report))); + } + catch + { + // No-op in non-clustered hosts (TestKit). + } + } + + /// <summary> + /// Applies a report to the local aggregator without re-broadcasting. + /// Used for both site-originated reports and peer-replicated ones — the + /// aggregator is idempotent via sequence-number comparison. + /// </summary> + private void ProcessLocally(SiteHealthReport report) { var aggregator = _serviceProvider.GetService(); if (aggregator != null) @@ -265,6 +300,20 @@ public class CentralCommunicationActor : ReceiveActor { _log.Info("CentralCommunicationActor started"); + // Subscribe to the peer-replication topic so we receive health reports + // delivered to the other central node and keep our local aggregator + // in sync (ClusterClient load-balances reports across nodes). + // Tolerant of non-clustered hosts (TestKit) where the extension is absent. 
+ try + { + DistributedPubSub.Get(Context.System).Mediator.Tell( + new Subscribe(HealthReportTopic, Self)); + } + catch (Exception ex) + { + _log.Debug("DistributedPubSub not available — peer health replication disabled: {0}", ex.Message); + } + // Schedule periodic refresh of site addresses from the database _refreshSchedule = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable( TimeSpan.Zero, diff --git a/src/ScadaLink.Host/Actors/AkkaHostedService.cs b/src/ScadaLink.Host/Actors/AkkaHostedService.cs index 24f7310..9bfd1ac 100644 --- a/src/ScadaLink.Host/Actors/AkkaHostedService.cs +++ b/src/ScadaLink.Host/Actors/AkkaHostedService.cs @@ -69,6 +69,9 @@ public class AkkaHostedService : IHostedService var hocon = $@" akka {{ + extensions = [ + ""Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubExtensionProvider, Akka.Cluster.Tools"" + ] actor {{ provider = cluster }}