using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using ScadaLink.Commons.Messages.Health;
using ScadaLink.Commons.Types.Enums;

namespace ScadaLink.HealthMonitoring.Tests;

/// <summary>
/// A simple fake <see cref="TimeProvider"/> for testing. The clock only moves when
/// <see cref="Advance"/> is called, so timeout logic can be exercised without real delays.
/// </summary>
internal sealed class TestTimeProvider : TimeProvider
{
    private DateTimeOffset _utcNow;

    /// <summary>Creates a fake clock that initially reports <paramref name="startTime"/>.</summary>
    public TestTimeProvider(DateTimeOffset startTime)
    {
        _utcNow = startTime;
    }

    /// <inheritdoc/>
    public override DateTimeOffset GetUtcNow() => _utcNow;

    /// <summary>Moves the fake clock forward by <paramref name="duration"/>.</summary>
    public void Advance(TimeSpan duration) => _utcNow += duration;
}

public class CentralHealthAggregatorTests
{
    /// <summary>Per-site offline timeout configured on the aggregator under test.</summary>
    private static readonly TimeSpan OfflineTimeout = TimeSpan.FromSeconds(60);

    /// <summary>Smallest silence that pushes a regular site past <see cref="OfflineTimeout"/>.</summary>
    private static readonly TimeSpan PastOfflineTimeout = OfflineTimeout + TimeSpan.FromSeconds(1);

    private readonly TestTimeProvider _timeProvider;
    private readonly CentralHealthAggregator _aggregator;

    public CentralHealthAggregatorTests()
    {
        // Seeded from the wall clock; every later movement is explicit via Advance().
        _timeProvider = new TestTimeProvider(DateTimeOffset.UtcNow);

        var options = Options.Create(new HealthMonitoringOptions
        {
            OfflineTimeout = OfflineTimeout
        });

        _aggregator = new CentralHealthAggregator(
            options,
            NullLogger<CentralHealthAggregator>.Instance,
            _timeProvider);
    }

    /// <summary>
    /// Builds a minimal, error-free report for <paramref name="siteId"/> carrying sequence
    /// number <paramref name="seq"/>. The report is timestamped from the fake clock (not the
    /// wall clock), so it stays consistent with the time the aggregator observes.
    /// </summary>
    // NOTE(review): dictionary value types are reconstructed from the file's usings;
    // confirm they match the SiteHealthReport record declaration.
    private SiteHealthReport MakeReport(string siteId, long seq) => new(
        SiteId: siteId,
        SequenceNumber: seq,
        ReportTimestamp: _timeProvider.GetUtcNow(),
        DataConnectionStatuses: new Dictionary<string, ConnectionHealth>(),
        TagResolutionCounts: new Dictionary<string, TagResolutionStatus>(),
        ScriptErrorCount: 0,
        AlarmEvaluationErrorCount: 0,
        StoreAndForwardBufferDepths: new Dictionary<string, int>(),
        DeadLetterCount: 0,
        DeployedInstanceCount: 0,
        EnabledInstanceCount: 0,
        DisabledInstanceCount: 0);

    [Fact]
    public void ProcessReport_StoresState_ForNewSite()
    {
        _aggregator.ProcessReport(MakeReport("site-1", 1));

        var state = _aggregator.GetSiteState("site-1");

        Assert.NotNull(state);
        Assert.True(state.IsOnline);
        Assert.Equal(1, state.LastSequenceNumber);
    }

    [Fact]
    public void ProcessReport_UpdatesState_WhenSequenceIncreases()
    {
        _aggregator.ProcessReport(MakeReport("site-1", 1));
        _aggregator.ProcessReport(MakeReport("site-1", 2));

        var state = _aggregator.GetSiteState("site-1");

        Assert.Equal(2, state!.LastSequenceNumber);
    }

    [Fact]
    public void ProcessReport_RejectsStaleReport_WhenSequenceNotGreater()
    {
        _aggregator.ProcessReport(MakeReport("site-1", 5));
        _aggregator.ProcessReport(MakeReport("site-1", 3));

        var state = _aggregator.GetSiteState("site-1");

        Assert.Equal(5, state!.LastSequenceNumber);
    }

    [Fact]
    public void ProcessReport_RejectsEqualSequence()
    {
        _aggregator.ProcessReport(MakeReport("site-1", 5));
        _aggregator.ProcessReport(MakeReport("site-1", 5));

        var state = _aggregator.GetSiteState("site-1");

        Assert.Equal(5, state!.LastSequenceNumber);
    }

    [Fact]
    public void OfflineDetection_SiteGoesOffline_WhenNoReportWithinTimeout()
    {
        _aggregator.ProcessReport(MakeReport("site-1", 1));
        Assert.True(_aggregator.GetSiteState("site-1")!.IsOnline);

        // Advance past the offline timeout with no further reports or heartbeats.
        _timeProvider.Advance(PastOfflineTimeout);
        _aggregator.CheckForOfflineSites();

        Assert.False(_aggregator.GetSiteState("site-1")!.IsOnline);
    }

    [Fact]
    public void OnlineRecovery_SiteComesBackOnline_WhenReportReceived()
    {
        _aggregator.ProcessReport(MakeReport("site-1", 1));

        // Go offline.
        _timeProvider.Advance(PastOfflineTimeout);
        _aggregator.CheckForOfflineSites();
        Assert.False(_aggregator.GetSiteState("site-1")!.IsOnline);

        // Receive a new report → back online.
        _aggregator.ProcessReport(MakeReport("site-1", 2));

        Assert.True(_aggregator.GetSiteState("site-1")!.IsOnline);
    }

    [Fact]
    public void OfflineDetection_SiteRemainsOnline_WhenReportWithinTimeout()
    {
        _aggregator.ProcessReport(MakeReport("site-1", 1));

        _timeProvider.Advance(OfflineTimeout / 2);
        _aggregator.CheckForOfflineSites();

        Assert.True(_aggregator.GetSiteState("site-1")!.IsOnline);
    }

    [Fact]
    public void GetAllSiteStates_ReturnsAllKnownSites()
    {
        _aggregator.ProcessReport(MakeReport("site-1", 1));
        _aggregator.ProcessReport(MakeReport("site-2", 1));

        var states = _aggregator.GetAllSiteStates();

        Assert.Equal(2, states.Count);
        Assert.Contains("site-1", states.Keys);
        Assert.Contains("site-2", states.Keys);
    }

    [Fact]
    public void GetSiteState_ReturnsNull_ForUnknownSite()
    {
        var state = _aggregator.GetSiteState("nonexistent");

        Assert.Null(state);
    }

    [Fact]
    public void ProcessReport_StoresLatestReport()
    {
        var report = MakeReport("site-1", 1) with { ScriptErrorCount = 42 };

        _aggregator.ProcessReport(report);

        var state = _aggregator.GetSiteState("site-1");
        Assert.Equal(42, state!.LatestReport!.ScriptErrorCount);
    }

    /// <summary>
    /// HealthMonitoring-002 regression: SiteHealthState is mutated from multiple
    /// threads (ProcessReport, MarkHeartbeat, CheckForOfflineSites). With a mutable
    /// class and unsynchronized field writes, a snapshot read could observe a torn
    /// or half-applied state. The state must be immutable and every transition an
    /// atomic reference swap, so a snapshot is always internally consistent and the
    /// monotonic sequence-number guard is never subverted by a lost update.
    /// </summary>
    [Fact]
    public async Task ProcessReport_ConcurrentUpdates_NeverLoseSequenceOrTearState()
    {
        const int iterations = 5_000;

        // The C# compiler synthesizes a "<Clone>$" method only for record types, so its
        // presence shows SiteHealthState is a record. That makes it safe to hand the
        // reference to UI callers and to read it concurrently.
        Assert.True(
            typeof(SiteHealthState).GetMethod("<Clone>$") != null,
            "SiteHealthState must be an immutable record for safe concurrent reads.");

        _aggregator.ProcessReport(MakeReport("site-1", 0));

        var writer = Task.Run(() =>
        {
            for (long seq = 1; seq <= iterations; seq++)
            {
                _aggregator.ProcessReport(MakeReport("site-1", seq));
            }
        });

        var heartbeater = Task.Run(() =>
        {
            for (int i = 0; i < iterations; i++)
            {
                _aggregator.MarkHeartbeat("site-1", _timeProvider.GetUtcNow());
            }
        });

        var reader = Task.Run(() =>
        {
            long maxObserved = 0;
            for (int i = 0; i < iterations; i++)
            {
                var state = _aggregator.GetSiteState("site-1");
                if (state is null)
                {
                    continue;
                }

                // A consistent snapshot: the stored report's sequence number must
                // always match the state's LastSequenceNumber (no half-applied update).
                Assert.Equal(state.LastSequenceNumber, state.LatestReport!.SequenceNumber);

                // The monotonic guard must hold across successive reads as well: a
                // heartbeat racing a report must never swap in an older sequence.
                Assert.True(
                    state.LastSequenceNumber >= maxObserved,
                    $"Sequence went backwards: observed {state.LastSequenceNumber} after {maxObserved}.");
                maxObserved = state.LastSequenceNumber;
            }
        });

        await Task.WhenAll(writer, heartbeater, reader);

        // The final state must reflect the highest sequence — no lost update.
        var final = _aggregator.GetSiteState("site-1");
        Assert.Equal(iterations, final!.LastSequenceNumber);
        Assert.Equal(iterations, final.LatestReport!.SequenceNumber);
        Assert.True(final.IsOnline);
    }

    /// <summary>
    /// HealthMonitoring-007 regression: a heartbeat for a site that has not yet
    /// sent a full report must register the site as online rather than being
    /// silently discarded. This happens, for example, immediately after a central
    /// restart/failover, when the aggregator's in-memory state is empty. Otherwise
    /// reachable sites show as "unknown" for up to a full report interval during
    /// the failover window.
    /// </summary>
    [Fact]
    public void MarkHeartbeat_RegistersUnknownSite_AsOnlineAwaitingReport()
    {
        var now = _timeProvider.GetUtcNow();

        _aggregator.MarkHeartbeat("site-new", now);

        var state = _aggregator.GetSiteState("site-new");
        Assert.NotNull(state);
        Assert.True(state.IsOnline);
        Assert.Null(state.LatestReport);
        Assert.Equal(now, state.LastHeartbeatAt);
    }

    [Fact]
    public void MarkHeartbeat_KeepsSiteOnline_BetweenReports()
    {
        _aggregator.ProcessReport(MakeReport("site-1", 1));

        // Total elapsed time goes past the offline timeout, but each gap between
        // heartbeats (45s) is shorter than it, so the site must stay online.
        _timeProvider.Advance(TimeSpan.FromSeconds(45));
        _aggregator.MarkHeartbeat("site-1", _timeProvider.GetUtcNow());
        _timeProvider.Advance(TimeSpan.FromSeconds(45));
        _aggregator.MarkHeartbeat("site-1", _timeProvider.GetUtcNow());

        _aggregator.CheckForOfflineSites();

        Assert.True(_aggregator.GetSiteState("site-1")!.IsOnline);
    }

    [Fact]
    public void MarkHeartbeat_BringsOfflineSiteBackOnline()
    {
        _aggregator.ProcessReport(MakeReport("site-1", 1));
        _timeProvider.Advance(PastOfflineTimeout);
        _aggregator.CheckForOfflineSites();
        Assert.False(_aggregator.GetSiteState("site-1")!.IsOnline);

        _aggregator.MarkHeartbeat("site-1", _timeProvider.GetUtcNow());

        Assert.True(_aggregator.GetSiteState("site-1")!.IsOnline);
    }

    /// <summary>
    /// HealthMonitoring-005 regression: the synthetic "central" site has no
    /// heartbeat source. Its LastHeartbeatAt is only bumped by the 30s
    /// CentralHealthReportLoop self-report. A single skipped or late self-report
    /// (leader GC pause, brief stall, mid-failover) would leave it with no signal
    /// for more than 60s and flap it offline even though the central cluster is
    /// healthy. The "central" keyspace entry must get a longer offline grace than
    /// real sites.
    /// </summary>
    [Fact]
    public void OfflineDetection_CentralSite_HasLongerGraceThanRealSites()
    {
        _aggregator.ProcessReport(MakeReport(CentralHealthReportLoop.CentralSiteId, 1));
        _aggregator.ProcessReport(MakeReport("site-1", 1));

        // 75s is past the 60s site timeout, but within one missed central self-report
        // (~30s) plus that timeout. A real site is offline here; central must not be,
        // because it only gets one self-report every 30s (60s is barely two reports).
        _timeProvider.Advance(TimeSpan.FromSeconds(75));
        _aggregator.CheckForOfflineSites();

        Assert.False(_aggregator.GetSiteState("site-1")!.IsOnline);
        Assert.True(
            _aggregator.GetSiteState(CentralHealthReportLoop.CentralSiteId)!.IsOnline,
            "central must survive a single missed self-report");
    }

    [Fact]
    public void OfflineDetection_CentralSite_StillGoesOfflineOnGenuineLoss()
    {
        _aggregator.ProcessReport(MakeReport(CentralHealthReportLoop.CentralSiteId, 1));

        // Well beyond even the central grace window — genuine total loss.
        _timeProvider.Advance(TimeSpan.FromMinutes(10));
        _aggregator.CheckForOfflineSites();

        Assert.False(_aggregator.GetSiteState(CentralHealthReportLoop.CentralSiteId)!.IsOnline);
    }

    /// <summary>
    /// HealthMonitoring-013 regression: the offline-check cadence must be derived
    /// from the *shorter* of <see cref="HealthMonitoringOptions.OfflineTimeout"/> and
    /// <see cref="HealthMonitoringOptions.CentralOfflineTimeout"/>. An operator may
    /// configure CentralOfflineTimeout smaller than OfflineTimeout. In that case
    /// central offline detection must still be timely, instead of being delayed by
    /// up to a full OfflineTimeout / 2.
    /// </summary>
    [Theory]
    // Default: OfflineTimeout (60s) is the shorter of the two.
    [InlineData(60, 180, 30)]
    // Operator configures CentralOfflineTimeout shorter — cadence must adapt.
    [InlineData(60, 20, 10)]
    public void CheckInterval_IsHalfTheShorterTimeout(
        int offlineTimeoutSeconds,
        int centralOfflineTimeoutSeconds,
        int expectedIntervalSeconds)
    {
        var options = new HealthMonitoringOptions
        {
            OfflineTimeout = TimeSpan.FromSeconds(offlineTimeoutSeconds),
            CentralOfflineTimeout = TimeSpan.FromSeconds(centralOfflineTimeoutSeconds)
        };

        var interval = CentralHealthAggregator.ComputeCheckInterval(options);

        Assert.Equal(TimeSpan.FromSeconds(expectedIntervalSeconds), interval);
    }

    [Fact]
    public void SequenceNumberReset_RejectedUntilExceedsPrevMax()
    {
        // Site sends seq 10, then restarts and sends seq 1.
        // Per design: sequence resets on singleton restart.
        // The aggregator will reject seq 1 < 10 — expected behavior.
        _aggregator.ProcessReport(MakeReport("site-1", 10));
        _aggregator.ProcessReport(MakeReport("site-1", 1));
        Assert.Equal(10, _aggregator.GetSiteState("site-1")!.LastSequenceNumber);

        // Once it exceeds the old max, it works again. SiteHealthState is an
        // immutable snapshot, so re-fetch to observe the new state.
        _aggregator.ProcessReport(MakeReport("site-1", 11));
        Assert.Equal(11, _aggregator.GetSiteState("site-1")!.LastSequenceNumber);
    }
}