diff --git a/code-reviews/HealthMonitoring/findings.md b/code-reviews/HealthMonitoring/findings.md index 516646e..6e83a5a 100644 --- a/code-reviews/HealthMonitoring/findings.md +++ b/code-reviews/HealthMonitoring/findings.md @@ -8,7 +8,7 @@ | Last reviewed | 2026-05-16 | | Reviewer | claude-agent | | Commit reviewed | `9c60592` | -| Open findings | 12 | +| Open findings | 10 | ## Summary @@ -55,7 +55,7 @@ design-adherence gap. |--|--| | Severity | High | | Category | Design-document adherence | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.HealthMonitoring/SiteHealthCollector.cs:104`, `src/ScadaLink.HealthMonitoring/HealthReportSender.cs:79` | **Description** @@ -79,7 +79,17 @@ the dead setter. Update the placeholder test accordingly once implemented. **Resolution** -_Unresolved._ +Resolved 2026-05-16 (commit ``). `HealthReportSender.ExecuteAsync` now +queries the existing public `StoreAndForwardStorage.GetBufferDepthByCategoryAsync()` +API alongside the parked-count call and feeds the per-category depths into +`SiteHealthCollector.SetStoreAndForwardDepths` (category enum names as keys), so the +documented store-and-forward buffer depth metric is populated in every emitted +report. Regression test `HealthReportSenderTests.ReportsIncludeStoreAndForwardBufferDepthsFromStorage` +verifies populated per-category depths. The obsolete placeholder test +`SiteHealthCollectorTests.StoreAndForwardBufferDepths_IsEmptyPlaceholder` continues +to pass — it only exercises the collector with no setter call and still correctly +asserts the empty default; it was left in place as the collector-level default-state +test. No StoreAndForward source was modified (existing public API only). ### HealthMonitoring-002 — `SiteHealthState` mutable fields written from multiple threads without synchronization @@ -87,7 +97,7 @@ _Unresolved._ |--|--| | Severity | High | | Category | Concurrency & thread safety | -| Status | Open | +| Status | Resolved | | Location | `src/ScadaLink.HealthMonitoring/SiteHealthState.cs:11`, `src/ScadaLink.HealthMonitoring/CentralHealthAggregator.cs:86`, `src/ScadaLink.HealthMonitoring/CentralHealthAggregator.cs:137` | **Description** @@ -112,7 +122,22 @@ a single atomic reference swap. **Resolution** -_Unresolved._ +Resolved 2026-05-16 (commit ``). `SiteHealthState` is now a `sealed record` +with `init`-only properties. `CentralHealthAggregator.ProcessReport`, +`MarkHeartbeat`, and `CheckForOfflineSites` were rewritten to perform every state +transition as an atomic compare-and-swap (`TryAdd`/`TryUpdate`) producing a new +record instance — no field of a stored state is ever mutated in place. `ProcessReport` +uses an explicit CAS retry loop instead of the `AddOrUpdate` update delegate so the +sequence-number guard and the field writes are evaluated against the value actually +installed (this also closes the root cause behind HealthMonitoring-003). Reads via +`GetAllSiteStates`/`GetSiteState` now hand out immutable snapshots, so a concurrent +reader can never observe a torn or half-applied state. `LatestReport` was changed +from `SiteHealthReport` (`null!`) to `SiteHealthReport?`, making the contract honest; +all existing consumers (CentralUI, integration/perf tests) already null-checked it +and continue to build clean. Regression test +`CentralHealthAggregatorTests.ProcessReport_ConcurrentUpdates_NeverLoseSequenceOrTearState` +exercises concurrent report/heartbeat/read threads and asserts snapshot consistency +and no lost updates. ### HealthMonitoring-003 — Shared state mutated inside `ConcurrentDictionary.AddOrUpdate` update delegate diff --git a/src/ScadaLink.HealthMonitoring/CentralHealthAggregator.cs b/src/ScadaLink.HealthMonitoring/CentralHealthAggregator.cs index bff2c75..cb96627 100644 --- a/src/ScadaLink.HealthMonitoring/CentralHealthAggregator.cs +++ b/src/ScadaLink.HealthMonitoring/CentralHealthAggregator.cs @@ -33,16 +33,24 @@ public class CentralHealthAggregator : BackgroundService, ICentralHealthAggregat /// Only replaces stored state if incoming sequence number is greater than last received. /// Auto-marks previously offline sites as online. /// + /// + /// is immutable: each transition produces a brand-new + /// instance, and the dictionary entry is replaced atomically. The mutation is + /// performed in a compare-and-swap retry loop rather than via the + /// AddOrUpdate update delegate so the sequence-number guard and the field + /// writes are evaluated as a single atomic step against the value actually + /// installed — the AddOrUpdate delegate may be invoked more than once + /// under contention and could otherwise act on a value that is then discarded. + /// public void ProcessReport(SiteHealthReport report) { var now = _timeProvider.GetUtcNow(); - _siteStates.AddOrUpdate( - report.SiteId, - _ => + while (true) + { + if (!_siteStates.TryGetValue(report.SiteId, out var existing)) { - _logger.LogInformation("Site {SiteId} registered with sequence #{Seq}", report.SiteId, report.SequenceNumber); - return new SiteHealthState + var registered = new SiteHealthState { SiteId = report.SiteId, LatestReport = report, @@ -51,50 +59,84 @@ public class CentralHealthAggregator : BackgroundService, ICentralHealthAggregat LastSequenceNumber = report.SequenceNumber, IsOnline = true }; - }, - (_, existing) => + + if (_siteStates.TryAdd(report.SiteId, registered)) + { + _logger.LogInformation( + "Site {SiteId} registered with sequence #{Seq}", report.SiteId, report.SequenceNumber); + return; + } + + // Lost the race — another thread registered first; retry as an update. + continue; + } + + if (report.SequenceNumber <= existing.LastSequenceNumber) { - if (report.SequenceNumber <= existing.LastSequenceNumber) + _logger.LogDebug( + "Rejecting stale report from site {SiteId}: seq {Incoming} <= {Last}", + report.SiteId, report.SequenceNumber, existing.LastSequenceNumber); + return; + } + + var updated = existing with + { + LatestReport = report, + LastReportReceivedAt = now, + LastHeartbeatAt = now, + LastSequenceNumber = report.SequenceNumber, + IsOnline = true + }; + + if (_siteStates.TryUpdate(report.SiteId, updated, existing)) + { + if (!existing.IsOnline) { - _logger.LogDebug( - "Rejecting stale report from site {SiteId}: seq {Incoming} <= {Last}", - report.SiteId, report.SequenceNumber, existing.LastSequenceNumber); - return existing; + _logger.LogInformation( + "Site {SiteId} is back online (seq #{Seq})", report.SiteId, report.SequenceNumber); } + return; + } - var wasOffline = !existing.IsOnline; - existing.LatestReport = report; - existing.LastReportReceivedAt = now; - existing.LastHeartbeatAt = now; - existing.LastSequenceNumber = report.SequenceNumber; - existing.IsOnline = true; - - if (wasOffline) - { - _logger.LogInformation("Site {SiteId} is back online (seq #{Seq})", report.SiteId, report.SequenceNumber); - } - - return existing; - }); + // CAS lost — the entry changed under us; retry with the fresh value. + } } /// /// Bumps the last-seen timestamp for a site already known via a prior /// SiteHealthReport. Heartbeats from sites we have not yet received a /// full report from are ignored — registration only happens on report. + /// The update is an atomic compare-and-swap of the immutable state. /// public void MarkHeartbeat(string siteId, DateTimeOffset receivedAt) { - if (!_siteStates.TryGetValue(siteId, out var state)) - return; - - if (receivedAt > state.LastHeartbeatAt) - state.LastHeartbeatAt = receivedAt; - - if (!state.IsOnline) + while (true) { - state.IsOnline = true; - _logger.LogInformation("Site {SiteId} is back online (heartbeat)", siteId); + if (!_siteStates.TryGetValue(siteId, out var existing)) + return; + + var newHeartbeat = receivedAt > existing.LastHeartbeatAt + ? receivedAt + : existing.LastHeartbeatAt; + + // Nothing to change — avoid a needless swap. + if (newHeartbeat == existing.LastHeartbeatAt && existing.IsOnline) + return; + + var updated = existing with + { + LastHeartbeatAt = newHeartbeat, + IsOnline = true + }; + + if (_siteStates.TryUpdate(siteId, updated, existing)) + { + if (!existing.IsOnline) + _logger.LogInformation("Site {SiteId} is back online (heartbeat)", siteId); + return; + } + + // CAS lost — retry with the fresh value. } } @@ -143,13 +185,20 @@ public class CentralHealthAggregator : BackgroundService, ICentralHealthAggregat var state = kvp.Value; if (!state.IsOnline) continue; - // Use LastHeartbeatAt — heartbeats arrive every ~5s from any - // healthy site node, so OfflineTimeout only fires when no node - // can reach central, not during single-node failovers. + // Use LastHeartbeatAt — heartbeats arrive frequently from any + // healthy site node (cadence owned by Cluster Infrastructure / + // SiteCommunicationActor), so OfflineTimeout only fires when no + // node can reach central, not during single-node failovers. var elapsed = now - state.LastHeartbeatAt; - if (elapsed > _options.OfflineTimeout) + if (elapsed <= _options.OfflineTimeout) + continue; + + // Atomically swap to an offline copy. If the CAS loses to a + // concurrent report/heartbeat the site was just heard from, so + // leaving it online is the correct outcome — no retry needed. + var offline = state with { IsOnline = false }; + if (_siteStates.TryUpdate(kvp.Key, offline, state)) { - state.IsOnline = false; _logger.LogWarning( "Site {SiteId} marked offline — no signal for {Elapsed}s (timeout: {Timeout}s)", state.SiteId, elapsed.TotalSeconds, _options.OfflineTimeout.TotalSeconds); diff --git a/src/ScadaLink.HealthMonitoring/HealthReportSender.cs b/src/ScadaLink.HealthMonitoring/HealthReportSender.cs index 7366e48..dc640d4 100644 --- a/src/ScadaLink.HealthMonitoring/HealthReportSender.cs +++ b/src/ScadaLink.HealthMonitoring/HealthReportSender.cs @@ -84,6 +84,20 @@ public class HealthReportSender : BackgroundService _collector.SetParkedMessageCount(parkedCount); } catch { /* Non-fatal — parked count will be 0 */ } + + try + { + // Per-category pending-message buffer depths (the documented + // "store-and-forward buffer depth" triage metric). Keyed by + // StoreAndForwardCategory name so the central dashboard can + // render external/notification/DB-write depths separately. + var depthsByCategory = await _sfStorage.GetBufferDepthByCategoryAsync(); + var depths = depthsByCategory.ToDictionary( + kvp => kvp.Key.ToString(), + kvp => kvp.Value); + _collector.SetStoreAndForwardDepths(depths); + } + catch { /* Non-fatal — buffer depths will be empty */ } } var seq = Interlocked.Increment(ref _sequenceNumber); diff --git a/src/ScadaLink.HealthMonitoring/SiteHealthState.cs b/src/ScadaLink.HealthMonitoring/SiteHealthState.cs index 2b77ef0..2ec0d2a 100644 --- a/src/ScadaLink.HealthMonitoring/SiteHealthState.cs +++ b/src/ScadaLink.HealthMonitoring/SiteHealthState.cs @@ -4,26 +4,37 @@ namespace ScadaLink.HealthMonitoring; /// /// In-memory state for a single site's health, stored by the central aggregator. +/// Immutable: every state transition produces a new instance which the aggregator +/// installs into its ConcurrentDictionary via an atomic compare-and-swap. +/// This makes handing the reference straight to UI callers safe — a consumer can +/// never observe a torn or half-applied update. /// -public class SiteHealthState +public sealed record SiteHealthState { public required string SiteId { get; init; } - public SiteHealthReport LatestReport { get; set; } = null!; + + /// + /// The latest full received for the site, or + /// null if the site is known only via heartbeats and has not yet sent + /// a report. + /// + public SiteHealthReport? LatestReport { get; init; } /// /// Time the latest full was processed. /// Used by the UI to surface report staleness during failover. /// - public DateTimeOffset LastReportReceivedAt { get; set; } + public DateTimeOffset LastReportReceivedAt { get; init; } /// - /// Time the most recent signal of any kind (full report OR ~5s heartbeat) - /// was received. Drives offline detection — heartbeats from the standby - /// keep the site marked online even when the active node is unable to - /// produce a report (mid-failover, brief stalls). + /// Time the most recent signal of any kind (full report OR heartbeat) was + /// received. Drives offline detection — heartbeats from the standby keep the + /// site marked online even when the active node is unable to produce a report + /// (mid-failover, brief stalls). See the heartbeat scheduler owned by the + /// Cluster Infrastructure / SiteCommunicationActor for the actual cadence. /// - public DateTimeOffset LastHeartbeatAt { get; set; } + public DateTimeOffset LastHeartbeatAt { get; init; } - public long LastSequenceNumber { get; set; } - public bool IsOnline { get; set; } + public long LastSequenceNumber { get; init; } + public bool IsOnline { get; init; } } diff --git a/tests/ScadaLink.HealthMonitoring.Tests/CentralHealthAggregatorTests.cs b/tests/ScadaLink.HealthMonitoring.Tests/CentralHealthAggregatorTests.cs index d475f07..fd1f460 100644 --- a/tests/ScadaLink.HealthMonitoring.Tests/CentralHealthAggregatorTests.cs +++ b/tests/ScadaLink.HealthMonitoring.Tests/CentralHealthAggregatorTests.cs @@ -161,7 +161,62 @@ public class CentralHealthAggregatorTests _aggregator.ProcessReport(report); var state = _aggregator.GetSiteState("site-1"); - Assert.Equal(42, state!.LatestReport.ScriptErrorCount); + Assert.Equal(42, state!.LatestReport!.ScriptErrorCount); + } + + /// + /// HealthMonitoring-002 regression: SiteHealthState is mutated from multiple + /// threads (ProcessReport, MarkHeartbeat, CheckForOfflineSites). With a mutable + /// class and unsynchronized field writes, a snapshot read could observe a torn + /// or half-applied state. The state must be immutable and every transition an + /// atomic reference swap, so a snapshot is always internally consistent and the + /// monotonic sequence-number guard is never subverted by a lost update. + /// + [Fact] + public async Task ProcessReport_ConcurrentUpdates_NeverLoseSequenceOrTearState() + { + const int iterations = 5_000; + // SiteHealthState must be an immutable record so handing the reference to + // UI callers (and reading it concurrently) is safe. + Assert.True(typeof(SiteHealthState).GetMethod("$") != null, + "SiteHealthState must be an immutable record for safe concurrent reads."); + + _aggregator.ProcessReport(MakeReport("site-1", 0)); + + var writer = Task.Run(() => + { + for (long seq = 1; seq <= iterations; seq++) + _aggregator.ProcessReport(MakeReport("site-1", seq)); + }); + + var heartbeater = Task.Run(() => + { + for (int i = 0; i < iterations; i++) + _aggregator.MarkHeartbeat("site-1", _timeProvider.GetUtcNow()); + }); + + long maxObserved = 0; + var reader = Task.Run(() => + { + for (int i = 0; i < iterations; i++) + { + var state = _aggregator.GetSiteState("site-1"); + if (state == null) continue; + // A consistent snapshot: the stored report's sequence number must + // always match the state's LastSequenceNumber (no half-applied update). + Assert.Equal(state.LastSequenceNumber, state.LatestReport!.SequenceNumber); + if (state.LastSequenceNumber > maxObserved) + maxObserved = state.LastSequenceNumber; + } + }); + + await Task.WhenAll(writer, heartbeater, reader); + + // The final state must reflect the highest sequence — no lost update. + var final = _aggregator.GetSiteState("site-1"); + Assert.Equal(iterations, final!.LastSequenceNumber); + Assert.Equal(iterations, final.LatestReport!.SequenceNumber); + Assert.True(final.IsOnline); } [Fact] @@ -173,11 +228,11 @@ public class CentralHealthAggregatorTests _aggregator.ProcessReport(MakeReport("site-1", 10)); _aggregator.ProcessReport(MakeReport("site-1", 1)); - var state = _aggregator.GetSiteState("site-1"); - Assert.Equal(10, state!.LastSequenceNumber); + Assert.Equal(10, _aggregator.GetSiteState("site-1")!.LastSequenceNumber); - // Once it exceeds the old max, it works again + // Once it exceeds the old max, it works again. SiteHealthState is an + // immutable snapshot, so re-fetch to observe the new state. _aggregator.ProcessReport(MakeReport("site-1", 11)); - Assert.Equal(11, state.LastSequenceNumber); + Assert.Equal(11, _aggregator.GetSiteState("site-1")!.LastSequenceNumber); } } diff --git a/tests/ScadaLink.HealthMonitoring.Tests/HealthReportSenderTests.cs b/tests/ScadaLink.HealthMonitoring.Tests/HealthReportSenderTests.cs index 5ec5ab1..c23c06b 100644 --- a/tests/ScadaLink.HealthMonitoring.Tests/HealthReportSenderTests.cs +++ b/tests/ScadaLink.HealthMonitoring.Tests/HealthReportSenderTests.cs @@ -1,7 +1,9 @@ +using Microsoft.Data.Sqlite; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; using ScadaLink.Commons.Messages.Health; using ScadaLink.Commons.Types.Enums; +using ScadaLink.StoreAndForward; namespace ScadaLink.HealthMonitoring.Tests; @@ -136,6 +138,76 @@ public class HealthReportSenderTests } } + /// + /// HealthMonitoring-001 regression: the documented "store-and-forward buffer + /// depth" metric (pending messages by category) must actually be populated in + /// the emitted report. Previously SetStoreAndForwardDepths had no callers, so + /// StoreAndForwardBufferDepths was always empty. The sender must query the S&F + /// engine's per-category depth API and include it alongside the parked count. + /// + [Fact] + public async Task ReportsIncludeStoreAndForwardBufferDepthsFromStorage() + { + var dbName = $"HealthSfDepth_{Guid.NewGuid():N}"; + var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared"; + // Keep one connection alive so the in-memory DB persists for the test. + using var keepAlive = new SqliteConnection(connStr); + keepAlive.Open(); + + var storage = new StoreAndForwardStorage(connStr, NullLogger.Instance); + await storage.InitializeAsync(); + + // Two pending ExternalSystem messages and one pending Notification message. + await storage.EnqueueAsync(MakePendingMessage("m1", StoreAndForwardCategory.ExternalSystem)); + await storage.EnqueueAsync(MakePendingMessage("m2", StoreAndForwardCategory.ExternalSystem)); + await storage.EnqueueAsync(MakePendingMessage("m3", StoreAndForwardCategory.Notification)); + + var transport = new FakeTransport(); + var collector = new SiteHealthCollector(); + collector.SetActiveNode(true); + var options = Options.Create(new HealthMonitoringOptions + { + ReportInterval = TimeSpan.FromMilliseconds(50) + }); + + var sender = new HealthReportSender( + collector, + transport, + options, + NullLogger.Instance, + new FakeSiteIdentityProvider(), + sfStorage: storage); + + using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(300)); + try + { + await sender.StartAsync(cts.Token); + await Task.Delay(250, CancellationToken.None); + await sender.StopAsync(CancellationToken.None); + } + catch (OperationCanceledException) { } + + Assert.True(transport.SentReports.Count >= 1); + var depths = transport.SentReports[^1].StoreAndForwardBufferDepths; + Assert.Equal(2, depths[nameof(StoreAndForwardCategory.ExternalSystem)]); + Assert.Equal(1, depths[nameof(StoreAndForwardCategory.Notification)]); + Assert.False(depths.ContainsKey(nameof(StoreAndForwardCategory.CachedDbWrite))); + } + + private static StoreAndForwardMessage MakePendingMessage(string id, StoreAndForwardCategory category) => + new() + { + Id = id, + Category = category, + Target = "target", + PayloadJson = "{}", + RetryCount = 0, + MaxRetries = 50, + RetryIntervalMs = 30_000, + CreatedAt = DateTimeOffset.UtcNow, + Status = StoreAndForwardMessageStatus.Pending + }; + [Fact] public void InitialSequenceNumberSeededWithUnixMs() { diff --git a/tests/ScadaLink.HealthMonitoring.Tests/ScadaLink.HealthMonitoring.Tests.csproj b/tests/ScadaLink.HealthMonitoring.Tests/ScadaLink.HealthMonitoring.Tests.csproj index 1de697f..da63b84 100644 --- a/tests/ScadaLink.HealthMonitoring.Tests/ScadaLink.HealthMonitoring.Tests.csproj +++ b/tests/ScadaLink.HealthMonitoring.Tests/ScadaLink.HealthMonitoring.Tests.csproj @@ -10,6 +10,7 @@ +