From 65cc7b69cdba7b948cee384cdc753ec18efe4370 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Mar 2026 10:57:57 -0400 Subject: [PATCH] feat(health): wire up NodeHostname, ConnectionEndpoint, TagQuality, ParkedMessageCount collectors - AkkaHostedService: SetNodeHostname from NodeOptions - DataConnectionActor: UpdateConnectionEndpoint on state transitions, track per-tag quality counts and UpdateTagQuality on value changes - HealthReportSender: query StoreAndForwardStorage for parked message count - StoreAndForwardStorage: add GetParkedMessageCountAsync() --- .../Actors/DataConnectionActor.cs | 38 +++++++++++++++++++ .../HealthReportSender.cs | 16 +++++++- .../ScadaLink.HealthMonitoring.csproj | 1 + .../Actors/AkkaHostedService.cs | 1 + .../StoreAndForwardStorage.cs | 14 +++++++ 5 files changed, 69 insertions(+), 1 deletion(-) diff --git a/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs b/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs index 9f1df75..2b9d72b 100644 --- a/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs +++ b/src/ScadaLink.DataConnectionLayer/Actors/DataConnectionActor.cs @@ -66,6 +66,11 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers private int _totalSubscribed; private int _resolvedTags; + private int _tagsGoodQuality; + private int _tagsBadQuality; + private int _tagsUncertainQuality; + private readonly Dictionary _lastTagQuality = new(); + private IDictionary _connectionDetails; private readonly IDictionary _primaryConfig; private readonly IDictionary? _backupConfig; @@ -144,6 +149,7 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers { _log.Info("[{0}] Entering Connecting state", _connectionName); _healthCollector.UpdateConnectionHealth(_connectionName, ConnectionHealth.Connecting); + _healthCollector.UpdateConnectionEndpoint(_connectionName, "Connecting"); Become(Connecting); Self.Tell(new AttemptConnect()); } @@ -179,6 +185,8 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers _log.Info("[{0}] Entering Connected state", _connectionName); _healthCollector.UpdateConnectionHealth(_connectionName, ConnectionHealth.Connected); _healthCollector.UpdateTagResolution(_connectionName, _totalSubscribed, _resolvedTags); + var endpointLabel = _backupConfig == null ? "Connected" : $"Connected to {_activeEndpoint.ToString().ToLower()}"; + _healthCollector.UpdateConnectionEndpoint(_connectionName, endpointLabel); Become(Connected); Stash.UnstashAll(); } @@ -226,6 +234,7 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers { _log.Warning("[{0}] Entering Reconnecting state", _connectionName); _healthCollector.UpdateConnectionHealth(_connectionName, ConnectionHealth.Disconnected); + _healthCollector.UpdateConnectionEndpoint(_connectionName, "Disconnected"); Become(Reconnecting); // WP-9: Push bad quality for all subscribed tags on disconnect @@ -552,6 +561,14 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers subscriber.Tell(new ConnectionQualityChanged(_connectionName, QualityCode.Bad, now)); } + + // All tags now bad quality + _tagsGoodQuality = 0; + _tagsUncertainQuality = 0; + _tagsBadQuality = _lastTagQuality.Count; + foreach (var key in _lastTagQuality.Keys.ToList()) + _lastTagQuality[key] = QualityCode.Bad; + _healthCollector.UpdateTagQuality(_connectionName, _tagsGoodQuality, _tagsBadQuality, _tagsUncertainQuality); } // ── Re-subscribe (WP-10) ── @@ -646,6 +663,27 @@ public class DataConnectionActor : UntypedActor, IWithStash, IWithTimers _connectionName, msg.TagPath, msg.Value.Value, msg.Value.Quality, msg.Value.Timestamp)); } } + + // Track quality transitions + if (_lastTagQuality.TryGetValue(msg.TagPath, out var prevQuality)) + { + // Decrement old quality bucket + switch (prevQuality) + { + case QualityCode.Good: _tagsGoodQuality--; break; + case QualityCode.Bad: _tagsBadQuality--; break; + case QualityCode.Uncertain: _tagsUncertainQuality--; break; + } + } + // Increment new quality bucket + switch (msg.Value.Quality) + { + case QualityCode.Good: _tagsGoodQuality++; break; + case QualityCode.Bad: _tagsBadQuality++; break; + case QualityCode.Uncertain: _tagsUncertainQuality++; break; + } + _lastTagQuality[msg.TagPath] = msg.Value.Quality; + _healthCollector.UpdateTagQuality(_connectionName, _tagsGoodQuality, _tagsBadQuality, _tagsUncertainQuality); } // ── Internal messages ── diff --git a/src/ScadaLink.HealthMonitoring/HealthReportSender.cs b/src/ScadaLink.HealthMonitoring/HealthReportSender.cs index 94e6cb6..2ec652b 100644 --- a/src/ScadaLink.HealthMonitoring/HealthReportSender.cs +++ b/src/ScadaLink.HealthMonitoring/HealthReportSender.cs @@ -2,6 +2,7 @@ using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using ScadaLink.Commons.Messages.Health; +using ScadaLink.StoreAndForward; namespace ScadaLink.HealthMonitoring; @@ -16,6 +17,7 @@ public class HealthReportSender : BackgroundService private readonly HealthMonitoringOptions _options; private readonly ILogger _logger; private readonly string _siteId; + private readonly StoreAndForwardStorage? _sfStorage; private long _sequenceNumber; public HealthReportSender( @@ -23,13 +25,15 @@ public class HealthReportSender : BackgroundService IHealthReportTransport transport, IOptions options, ILogger logger, - ISiteIdentityProvider siteIdentityProvider) + ISiteIdentityProvider siteIdentityProvider, + StoreAndForwardStorage? sfStorage = null) { _collector = collector; _transport = transport; _options = options.Value; _logger = logger; _siteId = siteIdentityProvider.SiteId; + _sfStorage = sfStorage; } /// @@ -54,6 +58,16 @@ public class HealthReportSender : BackgroundService if (!_collector.IsActiveNode) continue; + if (_sfStorage != null) + { + try + { + var parkedCount = await _sfStorage.GetParkedMessageCountAsync(); + _collector.SetParkedMessageCount(parkedCount); + } + catch { /* Non-fatal — parked count will be 0 */ } + } + var seq = Interlocked.Increment(ref _sequenceNumber); var report = _collector.CollectReport(_siteId); diff --git a/src/ScadaLink.HealthMonitoring/ScadaLink.HealthMonitoring.csproj b/src/ScadaLink.HealthMonitoring/ScadaLink.HealthMonitoring.csproj index cd9a58b..b493058 100644 --- a/src/ScadaLink.HealthMonitoring/ScadaLink.HealthMonitoring.csproj +++ b/src/ScadaLink.HealthMonitoring/ScadaLink.HealthMonitoring.csproj @@ -16,6 +16,7 @@ + diff --git a/src/ScadaLink.Host/Actors/AkkaHostedService.cs b/src/ScadaLink.Host/Actors/AkkaHostedService.cs index 6a7da91..24f7310 100644 --- a/src/ScadaLink.Host/Actors/AkkaHostedService.cs +++ b/src/ScadaLink.Host/Actors/AkkaHostedService.cs @@ -241,6 +241,7 @@ akka {{ // Resolve the health collector for the Deployment Manager var siteHealthCollector = _serviceProvider.GetService(); + siteHealthCollector?.SetNodeHostname(_nodeOptions.NodeHostname); // Create SiteReplicationActor on every node (not a singleton) var sfStorage = _serviceProvider.GetRequiredService(); diff --git a/src/ScadaLink.StoreAndForward/StoreAndForwardStorage.cs b/src/ScadaLink.StoreAndForward/StoreAndForwardStorage.cs index b0a6f77..0749cc1 100644 --- a/src/ScadaLink.StoreAndForward/StoreAndForwardStorage.cs +++ b/src/ScadaLink.StoreAndForward/StoreAndForwardStorage.cs @@ -297,6 +297,20 @@ public class StoreAndForwardStorage return messages.FirstOrDefault(); } + /// + /// Gets the count of parked messages (for health reporting). + /// + public async Task GetParkedMessageCountAsync() + { + await using var conn = new SqliteConnection(_connectionString); + await conn.OpenAsync(); + await using var cmd = conn.CreateCommand(); + cmd.CommandText = "SELECT COUNT(*) FROM sf_messages WHERE status = @parked"; + cmd.Parameters.AddWithValue("@parked", (int)StoreAndForwardMessageStatus.Parked); + var result = await cmd.ExecuteScalarAsync(); + return Convert.ToInt32(result); + } + /// /// Gets total message count by status. ///