using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; using ScadaLink.Commons.Messages.Health; using ScadaLink.Commons.Types.Enums; namespace ScadaLink.HealthMonitoring.Tests; /// /// A simple fake TimeProvider for testing that allows advancing time manually. /// internal sealed class TestTimeProvider : TimeProvider { private DateTimeOffset _utcNow; public TestTimeProvider(DateTimeOffset startTime) { _utcNow = startTime; } public override DateTimeOffset GetUtcNow() => _utcNow; public void Advance(TimeSpan duration) => _utcNow += duration; } public class CentralHealthAggregatorTests { private readonly TestTimeProvider _timeProvider; private readonly CentralHealthAggregator _aggregator; public CentralHealthAggregatorTests() { _timeProvider = new TestTimeProvider(DateTimeOffset.UtcNow); var options = Options.Create(new HealthMonitoringOptions { OfflineTimeout = TimeSpan.FromSeconds(60) }); _aggregator = new CentralHealthAggregator( options, NullLogger.Instance, _timeProvider); } private static SiteHealthReport MakeReport(string siteId, long seq) => new( SiteId: siteId, SequenceNumber: seq, ReportTimestamp: DateTimeOffset.UtcNow, DataConnectionStatuses: new Dictionary(), TagResolutionCounts: new Dictionary(), ScriptErrorCount: 0, AlarmEvaluationErrorCount: 0, StoreAndForwardBufferDepths: new Dictionary(), DeadLetterCount: 0, DeployedInstanceCount: 0, EnabledInstanceCount: 0, DisabledInstanceCount: 0); [Fact] public void ProcessReport_StoresState_ForNewSite() { _aggregator.ProcessReport(MakeReport("site-1", 1)); var state = _aggregator.GetSiteState("site-1"); Assert.NotNull(state); Assert.True(state.IsOnline); Assert.Equal(1, state.LastSequenceNumber); } [Fact] public void ProcessReport_UpdatesState_WhenSequenceIncreases() { _aggregator.ProcessReport(MakeReport("site-1", 1)); _aggregator.ProcessReport(MakeReport("site-1", 2)); var state = _aggregator.GetSiteState("site-1"); Assert.Equal(2, state!.LastSequenceNumber); } [Fact] public void ProcessReport_RejectsStaleReport_WhenSequenceNotGreater() { _aggregator.ProcessReport(MakeReport("site-1", 5)); _aggregator.ProcessReport(MakeReport("site-1", 3)); var state = _aggregator.GetSiteState("site-1"); Assert.Equal(5, state!.LastSequenceNumber); } [Fact] public void ProcessReport_RejectsEqualSequence() { _aggregator.ProcessReport(MakeReport("site-1", 5)); _aggregator.ProcessReport(MakeReport("site-1", 5)); var state = _aggregator.GetSiteState("site-1"); Assert.Equal(5, state!.LastSequenceNumber); } [Fact] public void OfflineDetection_SiteGoesOffline_WhenNoReportWithinTimeout() { _aggregator.ProcessReport(MakeReport("site-1", 1)); Assert.True(_aggregator.GetSiteState("site-1")!.IsOnline); // Advance past the offline timeout _timeProvider.Advance(TimeSpan.FromSeconds(61)); _aggregator.CheckForOfflineSites(); Assert.False(_aggregator.GetSiteState("site-1")!.IsOnline); } [Fact] public void OnlineRecovery_SiteComesBackOnline_WhenReportReceived() { _aggregator.ProcessReport(MakeReport("site-1", 1)); // Go offline _timeProvider.Advance(TimeSpan.FromSeconds(61)); _aggregator.CheckForOfflineSites(); Assert.False(_aggregator.GetSiteState("site-1")!.IsOnline); // Receive new report → back online _aggregator.ProcessReport(MakeReport("site-1", 2)); Assert.True(_aggregator.GetSiteState("site-1")!.IsOnline); } [Fact] public void OfflineDetection_SiteRemainsOnline_WhenReportWithinTimeout() { _aggregator.ProcessReport(MakeReport("site-1", 1)); _timeProvider.Advance(TimeSpan.FromSeconds(30)); _aggregator.CheckForOfflineSites(); Assert.True(_aggregator.GetSiteState("site-1")!.IsOnline); } [Fact] public void GetAllSiteStates_ReturnsAllKnownSites() { _aggregator.ProcessReport(MakeReport("site-1", 1)); _aggregator.ProcessReport(MakeReport("site-2", 1)); var states = _aggregator.GetAllSiteStates(); Assert.Equal(2, states.Count); Assert.Contains("site-1", states.Keys); Assert.Contains("site-2", states.Keys); } [Fact] public void GetSiteState_ReturnsNull_ForUnknownSite() { var state = _aggregator.GetSiteState("nonexistent"); Assert.Null(state); } [Fact] public void ProcessReport_StoresLatestReport() { var report = MakeReport("site-1", 1) with { ScriptErrorCount = 42 }; _aggregator.ProcessReport(report); var state = _aggregator.GetSiteState("site-1"); Assert.Equal(42, state!.LatestReport!.ScriptErrorCount); } /// /// HealthMonitoring-002 regression: SiteHealthState is mutated from multiple /// threads (ProcessReport, MarkHeartbeat, CheckForOfflineSites). With a mutable /// class and unsynchronized field writes, a snapshot read could observe a torn /// or half-applied state. The state must be immutable and every transition an /// atomic reference swap, so a snapshot is always internally consistent and the /// monotonic sequence-number guard is never subverted by a lost update. /// [Fact] public async Task ProcessReport_ConcurrentUpdates_NeverLoseSequenceOrTearState() { const int iterations = 5_000; // SiteHealthState must be an immutable record so handing the reference to // UI callers (and reading it concurrently) is safe. Assert.True(typeof(SiteHealthState).GetMethod("$") != null, "SiteHealthState must be an immutable record for safe concurrent reads."); _aggregator.ProcessReport(MakeReport("site-1", 0)); var writer = Task.Run(() => { for (long seq = 1; seq <= iterations; seq++) _aggregator.ProcessReport(MakeReport("site-1", seq)); }); var heartbeater = Task.Run(() => { for (int i = 0; i < iterations; i++) _aggregator.MarkHeartbeat("site-1", _timeProvider.GetUtcNow()); }); long maxObserved = 0; var reader = Task.Run(() => { for (int i = 0; i < iterations; i++) { var state = _aggregator.GetSiteState("site-1"); if (state == null) continue; // A consistent snapshot: the stored report's sequence number must // always match the state's LastSequenceNumber (no half-applied update). Assert.Equal(state.LastSequenceNumber, state.LatestReport!.SequenceNumber); if (state.LastSequenceNumber > maxObserved) maxObserved = state.LastSequenceNumber; } }); await Task.WhenAll(writer, heartbeater, reader); // The final state must reflect the highest sequence — no lost update. var final = _aggregator.GetSiteState("site-1"); Assert.Equal(iterations, final!.LastSequenceNumber); Assert.Equal(iterations, final.LatestReport!.SequenceNumber); Assert.True(final.IsOnline); } [Fact] public void SequenceNumberReset_RejectedUntilExceedsPrevMax() { // Site sends seq 10, then restarts and sends seq 1. // Per design: sequence resets on singleton restart. // The aggregator will reject seq 1 < 10 — expected behavior. _aggregator.ProcessReport(MakeReport("site-1", 10)); _aggregator.ProcessReport(MakeReport("site-1", 1)); Assert.Equal(10, _aggregator.GetSiteState("site-1")!.LastSequenceNumber); // Once it exceeds the old max, it works again. SiteHealthState is an // immutable snapshot, so re-fetch to observe the new state. _aggregator.ProcessReport(MakeReport("site-1", 11)); Assert.Equal(11, _aggregator.GetSiteState("site-1")!.LastSequenceNumber); } }