fix(health): replicate site health reports between central nodes

CentralHealthAggregator is a per-node hosted singleton, but site health
reports flow through ClusterClient which round-robins each report to one
central node only. The other node's aggregator never saw those reports
and marked sites offline at the 60s threshold — sites constantly flapped
between online and offline on the monitoring page.

On receive, the active CentralCommunicationActor now republishes a
SiteHealthReportReplica wrapper on a DistributedPubSub topic. Both
central nodes subscribe to the topic and process replicas through a
dedicated path that updates the local aggregator without re-broadcasting
(avoids fan-out loops). The aggregator's existing sequence-number
idempotency makes self-delivery a cheap no-op.

DistributedPubSubExtensionProvider is now listed in the HOCON
`akka.extensions` block so the mediator is initialised at cluster
start, eliminating a race where the first Subscribe arrived before the
extension was loaded.
This commit is contained in:
Joseph Doherty
2026-05-13 06:20:07 -04:00
parent d9caa3dd7e
commit 6f1f6b8467
3 changed files with 62 additions and 0 deletions

View File

@@ -21,3 +21,13 @@ public record SiteHealthReport(
IReadOnlyDictionary<string, TagQualityCounts>? DataConnectionTagQuality = null,
int ParkedMessageCount = 0,
IReadOnlyList<NodeStatus>? ClusterNodes = null);
/// <summary>
/// Broadcast wrapper exchanged between central nodes to keep each node's
/// CentralHealthAggregator state in sync. ClusterClient load-balances each
/// incoming SiteHealthReport to one central node; that node re-publishes
/// this wrapper on a DistributedPubSub topic so the peer node's aggregator
/// also processes the report (idempotently — sequence numbers guard against
/// double-counting).
/// </summary>
/// <param name="Report">The original site health report being replicated to the peer node.</param>
public record SiteHealthReportReplica(SiteHealthReport Report);

View File

@@ -1,6 +1,7 @@
using System.Collections.Immutable;
using Akka.Actor;
using Akka.Cluster.Tools.Client;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Event;
using Microsoft.Extensions.DependencyInjection;
using ScadaLink.Commons.Interfaces.Repositories;
@@ -65,6 +66,13 @@ public class CentralCommunicationActor : ReceiveActor
private ICancelable? _refreshSchedule;
/// <summary>
/// DistributedPubSub topic used to fan health reports out to the peer
/// central node so both per-node aggregators stay in sync. The same topic
/// name is used when publishing replicas and when subscribing to them. See
/// <see cref="SiteHealthReportReplica"/> for the protocol rationale.
/// </summary>
private const string HealthReportTopic = "site-health-replica";
public CentralCommunicationActor(IServiceProvider serviceProvider, ISiteClientFactory siteClientFactory)
{
_serviceProvider = serviceProvider;
@@ -79,6 +87,8 @@ public class CentralCommunicationActor : ReceiveActor
// Health monitoring: heartbeats and health reports from sites
Receive<HeartbeatMessage>(HandleHeartbeat);
Receive<SiteHealthReport>(HandleSiteHealthReport);
Receive<SiteHealthReportReplica>(r => ProcessLocally(r.Report));
Receive<SubscribeAck>(_ => { /* DistributedPubSub subscribe confirmation */ });
// Connection state changes
Receive<ConnectionStateChanged>(HandleConnectionStateChanged);
@@ -94,7 +104,32 @@ public class CentralCommunicationActor : ReceiveActor
Context.Parent.Tell(heartbeat);
}
/// <summary>
/// Handles a report delivered directly from a site (via ClusterClient):
/// applies it to the local aggregator first, then publishes a
/// <see cref="SiteHealthReportReplica"/> on <see cref="HealthReportTopic"/>
/// so the peer central node's aggregator stays in sync.
/// </summary>
/// <param name="report">The health report received from a site.</param>
private void HandleSiteHealthReport(SiteHealthReport report)
{
    // Local state is updated before (and independently of) replication, so a
    // failed publish never costs this node the report.
    ProcessLocally(report);

    try
    {
        DistributedPubSub.Get(Context.System).Mediator.Tell(
            new Publish(HealthReportTopic, new SiteHealthReportReplica(report)));
    }
    catch (Exception ex)
    {
        // Replication is best-effort: non-clustered hosts (TestKit) have no
        // DistributedPubSub extension. Log instead of swallowing silently so a
        // replication failure on a real cluster is diagnosable, matching how
        // the topic subscription failure is reported at startup.
        _log.Debug("DistributedPubSub not available — health report not replicated to peer: {0}", ex.Message);
    }
}
/// <summary>
/// Applies a report to the local aggregator without re-broadcasting.
/// Used for both site-originated reports and peer-replicated ones — the
/// aggregator is idempotent via sequence-number comparison.
/// </summary>
private void ProcessLocally(SiteHealthReport report)
{
var aggregator = _serviceProvider.GetService<ICentralHealthAggregator>();
if (aggregator != null)
@@ -265,6 +300,20 @@ public class CentralCommunicationActor : ReceiveActor
{
_log.Info("CentralCommunicationActor started");
// Subscribe to the peer-replication topic so we receive health reports
// delivered to the other central node and keep our local aggregator
// in sync (ClusterClient load-balances reports across nodes).
// Tolerant of non-clustered hosts (TestKit) where the extension is absent.
try
{
DistributedPubSub.Get(Context.System).Mediator.Tell(
new Subscribe(HealthReportTopic, Self));
}
catch (Exception ex)
{
_log.Debug("DistributedPubSub not available — peer health replication disabled: {0}", ex.Message);
}
// Schedule periodic refresh of site addresses from the database
_refreshSchedule = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(
TimeSpan.Zero,

View File

@@ -69,6 +69,9 @@ public class AkkaHostedService : IHostedService
var hocon = $@"
akka {{
extensions = [
""Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubExtensionProvider, Akka.Cluster.Tools""
]
actor {{
provider = cluster
}}