using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace ZB.MOM.WW.ScadaBridge.HealthMonitoring;

/// <summary>
/// Central-side counterpart to the site-side report sender
/// (<c>HealthReportSender</c>).
/// Periodically builds a SiteHealthReport for the central cluster itself
/// (siteId = <see cref="CentralSiteId"/>) and feeds it into the local
/// CentralHealthAggregator so the UI can render central as another card
/// on /monitoring/health. Only the cluster leader (Primary) generates
/// reports — the standby's aggregator catches up on failover when it
/// becomes Primary and starts its own loop.
/// </summary>
public class CentralHealthReportLoop : BackgroundService
{
    /// <summary>
    /// Reserved siteId used to represent the central cluster in the
    /// shared CentralHealthAggregator keyspace.
    /// </summary>
    /// <remarks>
    /// HealthMonitoring-021: the value is prefixed with <c>$</c> — a character
    /// that is forbidden in real site identifiers (the configuration /
    /// repository layer only permits Sites whose SiteIdentifier is a
    /// plain identifier) — so the synthetic central entry cannot collide with
    /// a real site whose operator-set identifier happened to be the bare word
    /// "central". A collision would have caused the two reports to clobber
    /// each other in the aggregator keyspace via the sequence-number guard,
    /// and the real site would inherit the longer central offline grace and
    /// stay falsely-online for an extra two minutes after going down.
    /// Consumers (including the Central UI health dashboard) reference this
    /// constant rather than the literal string, so the change is local.
    /// </remarks>
    public const string CentralSiteId = "$central";

    private readonly ISiteHealthCollector _collector;
    private readonly ICentralHealthAggregator _aggregator;
    private readonly IClusterNodeProvider _clusterNodeProvider;
    private readonly HealthMonitoringOptions _options;
    private readonly ILogger<CentralHealthReportLoop> _logger;

    // Clock used both for the initial seed and for re-seeding on promotion,
    // so sequence numbering is deterministically testable.
    private readonly TimeProvider _timeProvider;

    // Report sequence number for siteId = CentralSiteId. Seeded with Unix-ms at
    // construction, and raised again to at least "now" (Unix-ms) every time this
    // node is promoted to Primary. Seeding only at construction was not enough:
    // a standby whose process started before (or only slightly after) the prior
    // leader would resume from a number below the old leader's last report, and
    // the aggregator's sequence guard would discard its reports. Re-seeding at
    // promotion keeps a newly-elected leader's reports sorting after the prior
    // leader's, assuming the central nodes' clocks are roughly in sync
    // (NOTE(review): confirm clock-sync expectations for the central cluster).
    private long _sequenceNumber;

    // Primary state observed on the previous tick; used to detect the
    // non-Primary -> Primary transition. Only read/written by the ExecuteAsync loop.
    private bool _wasPrimary;

    /// <summary>
    /// Initializes the central health report loop.
    /// </summary>
    /// <param name="collector">Local health metrics collector for the central node.</param>
    /// <param name="aggregator">Aggregator that stores reports for the Central UI health dashboard.</param>
    /// <param name="clusterNodeProvider">Provider used to determine whether this node is primary.</param>
    /// <param name="options">Health monitoring configuration (report interval, offline threshold).</param>
    /// <param name="logger">Logger for diagnostics.</param>
    /// <param name="timeProvider">Optional time provider; defaults to <see cref="TimeProvider.System"/>.</param>
    public CentralHealthReportLoop(
        ISiteHealthCollector collector,
        ICentralHealthAggregator aggregator,
        IClusterNodeProvider clusterNodeProvider,
        IOptions<HealthMonitoringOptions> options,
        ILogger<CentralHealthReportLoop> logger,
        TimeProvider? timeProvider = null)
    {
        _collector = collector;
        _aggregator = aggregator;
        _clusterNodeProvider = clusterNodeProvider;
        _options = options.Value;
        _logger = logger;
        _timeProvider = timeProvider ?? TimeProvider.System;
        _sequenceNumber = _timeProvider.GetUtcNow().ToUnixTimeMilliseconds();
    }

    /// <summary>
    /// Current sequence number (for testing).
    /// </summary>
    public long CurrentSequenceNumber => Interlocked.Read(ref _sequenceNumber);

    /// <inheritdoc />
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation(
            "Central health report loop starting, interval {Interval}s",
            _options.ReportInterval.TotalSeconds);

        using var timer = new PeriodicTimer(_options.ReportInterval);
        while (await timer.WaitForNextTickAsync(stoppingToken).ConfigureAwait(false))
        {
            try
            {
                var isPrimary = _clusterNodeProvider.SelfIsPrimary;
                _collector.SetActiveNode(isPrimary);

                // Detect promotion before updating the remembered state so the
                // re-seed below runs exactly once per non-Primary -> Primary edge
                // (including the first tick on which this node is Primary).
                var promoted = isPrimary && !_wasPrimary;
                _wasPrimary = isPrimary;

                if (!isPrimary)
                {
                    continue;
                }

                if (promoted)
                {
                    RaiseSequenceFloorToNow();
                }

                _collector.SetClusterNodes(_clusterNodeProvider.GetClusterNodes());
                var seq = Interlocked.Increment(ref _sequenceNumber);

                // HealthMonitoring-018: CollectReport atomically read-and-resets
                // the per-interval error counters via Interlocked.Exchange. If
                // ProcessReport throws (or any other failure occurs between the
                // collect and the publish), those counts would otherwise be
                // lost — neither in the un-published report nor in the
                // now-zeroed collector. Snapshot the freshly-collected report
                // so that on a publish failure we can atomically restore the
                // counts back into the shared SiteHealthCollector via
                // Interlocked.Add. Concurrent increments arriving during the
                // ProcessReport call are preserved on the counter; the restore
                // Add safely sums with any such concurrent increments. Same
                // shape as the HealthMonitoring-017 fix in HealthReportSender.
                var report = _collector.CollectReport(CentralSiteId);
                var reportWithSeq = report with { SequenceNumber = seq };

                try
                {
                    _aggregator.ProcessReport(reportWithSeq);
                }
                catch
                {
                    // Restore the captured per-interval counters atomically so
                    // they roll forward into the next report — see
                    // HealthMonitoring-018. The rethrow lets the outer handler log it.
                    _collector.AddIntervalCounters(
                        scriptErrors: report.ScriptErrorCount,
                        alarmErrors: report.AlarmEvaluationErrorCount,
                        deadLetters: report.DeadLetterCount,
                        siteAuditWriteFailures: report.SiteAuditWriteFailures,
                        auditRedactionFailures: report.AuditRedactionFailure);
                    throw;
                }

                _logger.LogDebug("Generated central health report #{Seq}", seq);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to generate central health report");
            }
        }
    }

    /// <summary>
    /// Raises <see cref="_sequenceNumber"/> to at least the current Unix-ms time
    /// from the injected <see cref="TimeProvider"/>. Called when this node is
    /// promoted to Primary so its reports sort after the prior leader's.
    /// </summary>
    private void RaiseSequenceFloorToNow()
    {
        var nowMs = _timeProvider.GetUtcNow().ToUnixTimeMilliseconds();

        // Never move the counter backwards (e.g. clock stepped back, or this node
        // already issued numbers past "now"): this node's own sequence must stay
        // strictly increasing. Only the ExecuteAsync loop writes the field, so a
        // read-then-exchange is sufficient; Interlocked keeps the 64-bit write
        // atomic for concurrent CurrentSequenceNumber readers.
        if (nowMs > Interlocked.Read(ref _sequenceNumber))
        {
            Interlocked.Exchange(ref _sequenceNumber, nowMs);
        }
    }
}