fix: wire up health report pipeline between sites and central aggregator
Sites now send SiteHealthReport via AkkaHealthReportTransport → SiteCommunicationActor → CentralCommunicationActor → CentralHealthAggregator. Added IHealthReportTransport impl, ISiteIdentityProvider impl, registered HealthReportSender on site nodes, and added SiteHealthReport handler in CentralCommunicationActor. Health Dashboard now shows all 3 sites online.
This commit is contained in:
@@ -4,6 +4,7 @@ using Microsoft.Extensions.DependencyInjection;
|
|||||||
using ScadaLink.Commons.Interfaces.Repositories;
|
using ScadaLink.Commons.Interfaces.Repositories;
|
||||||
using ScadaLink.Commons.Messages.Communication;
|
using ScadaLink.Commons.Messages.Communication;
|
||||||
using ScadaLink.Commons.Messages.Health;
|
using ScadaLink.Commons.Messages.Health;
|
||||||
|
using ScadaLink.HealthMonitoring;
|
||||||
|
|
||||||
namespace ScadaLink.Communication.Actors;
|
namespace ScadaLink.Communication.Actors;
|
||||||
|
|
||||||
@@ -50,8 +51,9 @@ public class CentralCommunicationActor : ReceiveActor
|
|||||||
// Periodic refresh trigger
|
// Periodic refresh trigger
|
||||||
Receive<RefreshSiteAddresses>(_ => LoadSiteAddressesFromDb());
|
Receive<RefreshSiteAddresses>(_ => LoadSiteAddressesFromDb());
|
||||||
|
|
||||||
// Site registration via heartbeats
|
// Health monitoring: heartbeats and health reports from sites
|
||||||
Receive<HeartbeatMessage>(HandleHeartbeat);
|
Receive<HeartbeatMessage>(HandleHeartbeat);
|
||||||
|
Receive<SiteHealthReport>(HandleSiteHealthReport);
|
||||||
|
|
||||||
// Connection state changes
|
// Connection state changes
|
||||||
Receive<ConnectionStateChanged>(HandleConnectionStateChanged);
|
Receive<ConnectionStateChanged>(HandleConnectionStateChanged);
|
||||||
@@ -62,10 +64,23 @@ public class CentralCommunicationActor : ReceiveActor
|
|||||||
|
|
||||||
private void HandleHeartbeat(HeartbeatMessage heartbeat)
|
private void HandleHeartbeat(HeartbeatMessage heartbeat)
|
||||||
{
|
{
|
||||||
// Forward heartbeat to parent/subscribers (central health monitoring)
|
// Forward heartbeat to parent for any interested central actors
|
||||||
Context.Parent.Tell(heartbeat);
|
Context.Parent.Tell(heartbeat);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void HandleSiteHealthReport(SiteHealthReport report)
|
||||||
|
{
|
||||||
|
var aggregator = _serviceProvider.GetService<ICentralHealthAggregator>();
|
||||||
|
if (aggregator != null)
|
||||||
|
{
|
||||||
|
aggregator.ProcessReport(report);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
_log.Warning("ICentralHealthAggregator not available, dropping health report from site {0}", report.SiteId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void HandleConnectionStateChanged(ConnectionStateChanged msg)
|
private void HandleConnectionStateChanged(ConnectionStateChanged msg)
|
||||||
{
|
{
|
||||||
if (!msg.IsConnected)
|
if (!msg.IsConnected)
|
||||||
|
|||||||
@@ -20,6 +20,7 @@
|
|||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<ProjectReference Include="../ScadaLink.Commons/ScadaLink.Commons.csproj" />
|
<ProjectReference Include="../ScadaLink.Commons/ScadaLink.Commons.csproj" />
|
||||||
|
<ProjectReference Include="../ScadaLink.HealthMonitoring/ScadaLink.HealthMonitoring.csproj" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
</Project>
|
</Project>
|
||||||
|
|||||||
29
src/ScadaLink.Host/AkkaHealthReportTransport.cs
Normal file
29
src/ScadaLink.Host/AkkaHealthReportTransport.cs
Normal file
@@ -0,0 +1,29 @@
|
|||||||
|
using Akka.Actor;
|
||||||
|
using ScadaLink.Commons.Messages.Health;
|
||||||
|
using ScadaLink.HealthMonitoring;
|
||||||
|
using ScadaLink.Host.Actors;
|
||||||
|
|
||||||
|
namespace ScadaLink.Host;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Sends SiteHealthReport to the local SiteCommunicationActor via Akka ActorSelection.
|
||||||
|
/// The SiteCommunicationActor forwards it to central.
|
||||||
|
/// </summary>
|
||||||
|
public class AkkaHealthReportTransport : IHealthReportTransport
|
||||||
|
{
|
||||||
|
private readonly AkkaHostedService _akkaService;
|
||||||
|
|
||||||
|
public AkkaHealthReportTransport(AkkaHostedService akkaService)
|
||||||
|
{
|
||||||
|
_akkaService = akkaService;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Send(SiteHealthReport report)
|
||||||
|
{
|
||||||
|
var actorSystem = _akkaService.ActorSystem;
|
||||||
|
if (actorSystem == null) return;
|
||||||
|
|
||||||
|
var siteComm = actorSystem.ActorSelection("/user/site-communication");
|
||||||
|
siteComm.Tell(report, ActorRefs.NoSender);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -151,10 +151,14 @@ try
|
|||||||
// Shared components
|
// Shared components
|
||||||
services.AddClusterInfrastructure();
|
services.AddClusterInfrastructure();
|
||||||
services.AddCommunication();
|
services.AddCommunication();
|
||||||
services.AddHealthMonitoring();
|
services.AddSiteHealthMonitoring();
|
||||||
services.AddExternalSystemGateway();
|
services.AddExternalSystemGateway();
|
||||||
services.AddNotificationService();
|
services.AddNotificationService();
|
||||||
|
|
||||||
|
// Health report transport: sends SiteHealthReport to SiteCommunicationActor via Akka
|
||||||
|
services.AddSingleton<ISiteIdentityProvider, SiteIdentityProvider>();
|
||||||
|
services.AddSingleton<IHealthReportTransport, AkkaHealthReportTransport>();
|
||||||
|
|
||||||
// Site-only components — AddSiteRuntime registers SiteStorageService with SQLite path
|
// Site-only components — AddSiteRuntime registers SiteStorageService with SQLite path
|
||||||
// and site-local repository implementations (IExternalSystemRepository, INotificationRepository)
|
// and site-local repository implementations (IExternalSystemRepository, INotificationRepository)
|
||||||
var siteDbPath = context.Configuration["ScadaLink:Database:SiteDbPath"] ?? "site.db";
|
var siteDbPath = context.Configuration["ScadaLink:Database:SiteDbPath"] ?? "site.db";
|
||||||
|
|||||||
18
src/ScadaLink.Host/SiteIdentityProvider.cs
Normal file
18
src/ScadaLink.Host/SiteIdentityProvider.cs
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
using Microsoft.Extensions.Options;
|
||||||
|
using ScadaLink.HealthMonitoring;
|
||||||
|
|
||||||
|
namespace ScadaLink.Host;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Provides the site identity from NodeOptions configuration.
|
||||||
|
/// </summary>
|
||||||
|
public class SiteIdentityProvider : ISiteIdentityProvider
|
||||||
|
{
|
||||||
|
public string SiteId { get; }
|
||||||
|
|
||||||
|
public SiteIdentityProvider(IOptions<NodeOptions> nodeOptions)
|
||||||
|
{
|
||||||
|
SiteId = nodeOptions.Value.SiteId
|
||||||
|
?? throw new InvalidOperationException("SiteId is required for site nodes.");
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user