using System.Collections.Concurrent;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ScadaLink.Commons.Messages.Health;

namespace ScadaLink.HealthMonitoring;

/// <summary>
/// Central-side aggregator that receives health reports from all sites,
/// tracks latest metrics in memory, and detects offline sites.
/// No persistence — display-only for Central UI consumption.
/// </summary>
public class CentralHealthAggregator : BackgroundService, ICentralHealthAggregator
{
    // Latest known state per site id. Values are replaced wholesale via
    // TryAdd/TryUpdate; the entries themselves are never mutated in place.
    private readonly ConcurrentDictionary<string, SiteHealthState> _siteStates = new();
    private readonly HealthMonitoringOptions _options;
    private readonly ILogger<CentralHealthAggregator> _logger;
    private readonly TimeProvider _timeProvider;

    /// <summary>
    /// Creates the aggregator.
    /// </summary>
    /// <param name="options">Health monitoring options; the value is read once at construction.</param>
    /// <param name="logger">Logger for registration, online/offline transitions and rejected reports.</param>
    /// <param name="timeProvider">
    /// Clock used to timestamp reports and evaluate offline timeouts.
    /// Falls back to <see cref="TimeProvider.System"/> when <c>null</c>.
    /// </param>
    public CentralHealthAggregator(
        IOptions<HealthMonitoringOptions> options,
        ILogger<CentralHealthAggregator> logger,
        TimeProvider? timeProvider = null)
    {
        _options = options.Value;
        _logger = logger;
        _timeProvider = timeProvider ?? TimeProvider.System;
    }

    /// <summary>
    /// Process an incoming health report from a site.
    /// Only replaces stored state if incoming sequence number is greater than last received.
    /// Auto-marks previously offline sites as online.
    /// </summary>
    /// <remarks>
    /// <see cref="SiteHealthState"/> is immutable: each transition produces a brand-new
    /// instance, and the dictionary entry is replaced atomically. The mutation is
    /// performed in a compare-and-swap retry loop rather than via the
    /// <c>AddOrUpdate</c> update delegate so the sequence-number guard and the field
    /// writes are evaluated as a single atomic step against the value actually
    /// installed — the <c>AddOrUpdate</c> delegate may be invoked more than once
    /// under contention and could otherwise act on a value that is then discarded.
    /// </remarks>
    /// <param name="report">The report to record; its site id and sequence number drive the update.</param>
    public void ProcessReport(SiteHealthReport report)
    {
        var now = _timeProvider.GetUtcNow();

        while (true)
        {
            if (!_siteStates.TryGetValue(report.SiteId, out var existing))
            {
                var registered = new SiteHealthState
                {
                    SiteId = report.SiteId,
                    LatestReport = report,
                    LastReportReceivedAt = now,
                    LastHeartbeatAt = now,
                    LastSequenceNumber = report.SequenceNumber,
                    IsOnline = true
                };

                if (_siteStates.TryAdd(report.SiteId, registered))
                {
                    _logger.LogInformation(
                        "Site {SiteId} registered with sequence #{Seq}",
                        report.SiteId, report.SequenceNumber);
                    return;
                }

                // Lost the race — another thread registered first; retry as an update.
                continue;
            }

            if (report.SequenceNumber <= existing.LastSequenceNumber)
            {
                _logger.LogDebug(
                    "Rejecting stale report from site {SiteId}: seq {Incoming} <= {Last}",
                    report.SiteId, report.SequenceNumber, existing.LastSequenceNumber);
                return;
            }

            var updated = existing with
            {
                LatestReport = report,
                LastReportReceivedAt = now,
                LastHeartbeatAt = now,
                LastSequenceNumber = report.SequenceNumber,
                IsOnline = true
            };

            if (_siteStates.TryUpdate(report.SiteId, updated, existing))
            {
                if (!existing.IsOnline)
                {
                    _logger.LogInformation(
                        "Site {SiteId} is back online (seq #{Seq})",
                        report.SiteId, report.SequenceNumber);
                }

                return;
            }

            // CAS lost — the entry changed under us; retry with the fresh value.
        }
    }

    /// <summary>
    /// Bumps the last-seen timestamp for a site. If a heartbeat arrives for a
    /// site the aggregator has no state for yet (e.g. immediately after a central
    /// restart/failover, when in-memory state is empty), the site is registered
    /// as online with no <see cref="SiteHealthState.LatestReport"/> — heartbeats
    /// prove the site is reachable, so it shows online straight away rather than
    /// as "unknown" for up to a full report interval. The update is an atomic
    /// compare-and-swap of the immutable state.
    /// </summary>
    /// <param name="siteId">Identifier of the site the heartbeat came from.</param>
    /// <param name="receivedAt">
    /// Timestamp of the heartbeat. The stored heartbeat time only moves forward:
    /// an older value than the one already recorded does not replace it.
    /// </param>
    public void MarkHeartbeat(string siteId, DateTimeOffset receivedAt)
    {
        while (true)
        {
            if (!_siteStates.TryGetValue(siteId, out var existing))
            {
                // Unknown site — register it as online, awaiting its first
                // full report. LatestReport and LastReportReceivedAt both stay
                // null until ProcessReport runs — "no report yet" is an explicit
                // nullable state, not a year-0001 sentinel the UI must special-case.
                var registered = new SiteHealthState
                {
                    SiteId = siteId,
                    LatestReport = null,
                    LastReportReceivedAt = null,
                    LastHeartbeatAt = receivedAt,
                    LastSequenceNumber = 0,
                    IsOnline = true
                };

                if (_siteStates.TryAdd(siteId, registered))
                {
                    _logger.LogInformation(
                        "Site {SiteId} registered online via heartbeat (awaiting first report)",
                        siteId);
                    return;
                }

                // Lost the race — another thread registered first; retry as an update.
                continue;
            }

            var newHeartbeat = receivedAt > existing.LastHeartbeatAt
                ? receivedAt
                : existing.LastHeartbeatAt;

            // Nothing to change — avoid a needless swap.
            if (newHeartbeat == existing.LastHeartbeatAt && existing.IsOnline)
                return;

            var updated = existing with { LastHeartbeatAt = newHeartbeat, IsOnline = true };

            if (_siteStates.TryUpdate(siteId, updated, existing))
            {
                if (!existing.IsOnline)
                    _logger.LogInformation("Site {SiteId} is back online (heartbeat)", siteId);
                return;
            }

            // CAS lost — retry with the fresh value.
        }
    }

    /// <summary>
    /// Get the current health state for all known sites.
    /// </summary>
    /// <returns>A new dictionary copied from the internal state, keyed by site id.</returns>
    public IReadOnlyDictionary<string, SiteHealthState> GetAllSiteStates()
    {
        return new Dictionary<string, SiteHealthState>(_siteStates);
    }

    /// <summary>
    /// Get the current health state for a specific site, or null if unknown.
    /// </summary>
    /// <param name="siteId">Identifier of the site to look up.</param>
    /// <returns>The stored state, or <c>null</c> when no entry exists for <paramref name="siteId"/>.</returns>
    public SiteHealthState? GetSiteState(string siteId)
    {
        _siteStates.TryGetValue(siteId, out var state);
        return state;
    }

    /// <summary>
    /// Background task that periodically checks for offline sites.
    /// </summary>
    /// <param name="stoppingToken">Token that ends the periodic wait loop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation(
            "Central health aggregator started, offline timeout {Timeout}s (central {CentralTimeout}s)",
            _options.OfflineTimeout.TotalSeconds,
            _options.CentralOfflineTimeout.TotalSeconds);

        // Check at half the shorter of the two offline timeouts so detection is
        // timely for whichever site class (real or "central") has the tighter
        // window — see ComputeCheckInterval.
        using var timer = new PeriodicTimer(ComputeCheckInterval(_options));

        while (await timer.WaitForNextTickAsync(stoppingToken).ConfigureAwait(false))
        {
            CheckForOfflineSites();
        }
    }

    /// <summary>
    /// Computes the offline-check timer cadence: half of the shorter of
    /// <see cref="HealthMonitoringOptions.OfflineTimeout"/> and
    /// <see cref="HealthMonitoringOptions.CentralOfflineTimeout"/>. Deriving it
    /// from the shorter timeout guarantees that whichever site class has the
    /// tighter window is still polled at least twice within it — so if an
    /// operator configures <c>CentralOfflineTimeout</c> smaller than
    /// <c>OfflineTimeout</c>, central offline detection is not delayed by up to a
    /// full <c>OfflineTimeout / 2</c>.
    /// </summary>
    /// <param name="options">Options supplying the two offline timeouts.</param>
    /// <returns>Half of the smaller of the two configured timeouts.</returns>
    internal static TimeSpan ComputeCheckInterval(HealthMonitoringOptions options)
    {
        var shorter = options.OfflineTimeout < options.CentralOfflineTimeout
            ? options.OfflineTimeout
            : options.CentralOfflineTimeout;

        return TimeSpan.FromMilliseconds(shorter.TotalMilliseconds / 2);
    }

    /// <summary>
    /// Scans every site currently flagged online and flips it to offline when
    /// the time since its last heartbeat exceeds the timeout that applies to it.
    /// </summary>
    internal void CheckForOfflineSites()
    {
        var now = _timeProvider.GetUtcNow();

        foreach (var kvp in _siteStates)
        {
            var state = kvp.Value;
            if (!state.IsOnline)
                continue;

            // Use LastHeartbeatAt — heartbeats arrive every ~5s from any
            // healthy site node (cadence owned by Cluster Infrastructure /
            // SiteCommunicationActor — CommunicationOptions.TransportHeartbeatInterval),
            // so the 60s OfflineTimeout tolerates several missed heartbeats and
            // only fires when no node can reach central, not during single-node
            // failovers.
            //
            // The synthetic "central" site has no heartbeat source — its only
            // signal is the 30s CentralHealthReportLoop self-report — so it gets
            // a longer grace window (CentralOfflineTimeout) to survive a single
            // skipped/late self-report.
            var timeout = kvp.Key == CentralHealthReportLoop.CentralSiteId
                ? _options.CentralOfflineTimeout
                : _options.OfflineTimeout;

            var elapsed = now - state.LastHeartbeatAt;
            if (elapsed <= timeout)
                continue;

            // Atomically swap to an offline copy. If the CAS loses to a
            // concurrent report/heartbeat the site was just heard from, so
            // leaving it online is the correct outcome — no retry needed.
            var offline = state with { IsOnline = false };

            if (_siteStates.TryUpdate(kvp.Key, offline, state))
            {
                _logger.LogWarning(
                    "Site {SiteId} marked offline — no signal for {Elapsed}s (timeout: {Timeout}s)",
                    state.SiteId, elapsed.TotalSeconds, timeout.TotalSeconds);
            }
        }
    }
}