fix(health-monitoring): resolve HealthMonitoring-001/002 — populate S&F buffer depth, make SiteHealthState immutable
This commit is contained in:
@@ -8,7 +8,7 @@
|
||||
| Last reviewed | 2026-05-16 |
|
||||
| Reviewer | claude-agent |
|
||||
| Commit reviewed | `9c60592` |
|
||||
| Open findings | 12 |
|
||||
| Open findings | 10 |
|
||||
|
||||
## Summary
|
||||
|
||||
@@ -55,7 +55,7 @@ design-adherence gap.
|
||||
|--|--|
|
||||
| Severity | High |
|
||||
| Category | Design-document adherence |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
| Location | `src/ScadaLink.HealthMonitoring/SiteHealthCollector.cs:104`, `src/ScadaLink.HealthMonitoring/HealthReportSender.cs:79` |
|
||||
|
||||
**Description**
|
||||
@@ -79,7 +79,17 @@ the dead setter. Update the placeholder test accordingly once implemented.
|
||||
|
||||
**Resolution**
|
||||
|
||||
_Unresolved._
|
||||
Resolved 2026-05-16 (commit `<pending>`). `HealthReportSender.ExecuteAsync` now
|
||||
queries the existing public `StoreAndForwardStorage.GetBufferDepthByCategoryAsync()`
|
||||
API alongside the parked-count call and feeds the per-category depths into
|
||||
`SiteHealthCollector.SetStoreAndForwardDepths` (category enum names as keys), so the
|
||||
documented store-and-forward buffer depth metric is populated in every emitted
|
||||
report. Regression test `HealthReportSenderTests.ReportsIncludeStoreAndForwardBufferDepthsFromStorage`
|
||||
verifies populated per-category depths. The obsolete placeholder test
|
||||
`SiteHealthCollectorTests.StoreAndForwardBufferDepths_IsEmptyPlaceholder` continues
|
||||
to pass — it only exercises the collector with no setter call and still correctly
|
||||
asserts the empty default; it was left in place as the collector-level default-state
|
||||
test. No StoreAndForward source was modified (existing public API only).
|
||||
|
||||
### HealthMonitoring-002 — `SiteHealthState` mutable fields written from multiple threads without synchronization
|
||||
|
||||
@@ -87,7 +97,7 @@ _Unresolved._
|
||||
|--|--|
|
||||
| Severity | High |
|
||||
| Category | Concurrency & thread safety |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
| Location | `src/ScadaLink.HealthMonitoring/SiteHealthState.cs:11`, `src/ScadaLink.HealthMonitoring/CentralHealthAggregator.cs:86`, `src/ScadaLink.HealthMonitoring/CentralHealthAggregator.cs:137` |
|
||||
|
||||
**Description**
|
||||
@@ -112,7 +122,22 @@ a single atomic reference swap.
|
||||
|
||||
**Resolution**
|
||||
|
||||
_Unresolved._
|
||||
Resolved 2026-05-16 (commit `<pending>`). `SiteHealthState` is now a `sealed record`
|
||||
with `init`-only properties. `CentralHealthAggregator.ProcessReport`,
|
||||
`MarkHeartbeat`, and `CheckForOfflineSites` were rewritten to perform every state
|
||||
transition as an atomic compare-and-swap (`TryAdd`/`TryUpdate`) producing a new
|
||||
record instance — no field of a stored state is ever mutated in place. `ProcessReport`
|
||||
uses an explicit CAS retry loop instead of the `AddOrUpdate` update delegate so the
|
||||
sequence-number guard and the field writes are evaluated against the value actually
|
||||
installed (this also closes the root cause behind HealthMonitoring-003). Reads via
|
||||
`GetAllSiteStates`/`GetSiteState` now hand out immutable snapshots, so a concurrent
|
||||
reader can never observe a torn or half-applied state. `LatestReport` was changed
|
||||
from `SiteHealthReport` (`null!`) to `SiteHealthReport?`, making the contract honest;
|
||||
all existing consumers (CentralUI, integration/perf tests) already null-checked it
|
||||
and continue to build clean. Regression test
|
||||
`CentralHealthAggregatorTests.ProcessReport_ConcurrentUpdates_NeverLoseSequenceOrTearState`
|
||||
exercises concurrent report/heartbeat/read threads and asserts snapshot consistency
|
||||
and no lost updates.
|
||||
|
||||
### HealthMonitoring-003 — Shared state mutated inside `ConcurrentDictionary.AddOrUpdate` update delegate
|
||||
|
||||
|
||||
@@ -33,16 +33,24 @@ public class CentralHealthAggregator : BackgroundService, ICentralHealthAggregat
|
||||
/// Only replaces stored state if incoming sequence number is greater than last received.
|
||||
/// Auto-marks previously offline sites as online.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <see cref="SiteHealthState"/> is immutable: each transition produces a brand-new
|
||||
/// instance, and the dictionary entry is replaced atomically. The mutation is
|
||||
/// performed in a compare-and-swap retry loop rather than via the
|
||||
/// <c>AddOrUpdate</c> update delegate so the sequence-number guard and the field
|
||||
/// writes are evaluated as a single atomic step against the value actually
|
||||
/// installed — the <c>AddOrUpdate</c> delegate may be invoked more than once
|
||||
/// under contention and could otherwise act on a value that is then discarded.
|
||||
/// </remarks>
|
||||
public void ProcessReport(SiteHealthReport report)
|
||||
{
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
|
||||
_siteStates.AddOrUpdate(
|
||||
report.SiteId,
|
||||
_ =>
|
||||
while (true)
|
||||
{
|
||||
_logger.LogInformation("Site {SiteId} registered with sequence #{Seq}", report.SiteId, report.SequenceNumber);
|
||||
return new SiteHealthState
|
||||
if (!_siteStates.TryGetValue(report.SiteId, out var existing))
|
||||
{
|
||||
var registered = new SiteHealthState
|
||||
{
|
||||
SiteId = report.SiteId,
|
||||
LatestReport = report,
|
||||
@@ -51,50 +59,84 @@ public class CentralHealthAggregator : BackgroundService, ICentralHealthAggregat
|
||||
LastSequenceNumber = report.SequenceNumber,
|
||||
IsOnline = true
|
||||
};
|
||||
},
|
||||
(_, existing) =>
|
||||
|
||||
if (_siteStates.TryAdd(report.SiteId, registered))
|
||||
{
|
||||
_logger.LogInformation(
|
||||
"Site {SiteId} registered with sequence #{Seq}", report.SiteId, report.SequenceNumber);
|
||||
return;
|
||||
}
|
||||
|
||||
// Lost the race — another thread registered first; retry as an update.
|
||||
continue;
|
||||
}
|
||||
|
||||
if (report.SequenceNumber <= existing.LastSequenceNumber)
|
||||
{
|
||||
_logger.LogDebug(
|
||||
"Rejecting stale report from site {SiteId}: seq {Incoming} <= {Last}",
|
||||
report.SiteId, report.SequenceNumber, existing.LastSequenceNumber);
|
||||
return existing;
|
||||
return;
|
||||
}
|
||||
|
||||
var wasOffline = !existing.IsOnline;
|
||||
existing.LatestReport = report;
|
||||
existing.LastReportReceivedAt = now;
|
||||
existing.LastHeartbeatAt = now;
|
||||
existing.LastSequenceNumber = report.SequenceNumber;
|
||||
existing.IsOnline = true;
|
||||
|
||||
if (wasOffline)
|
||||
var updated = existing with
|
||||
{
|
||||
_logger.LogInformation("Site {SiteId} is back online (seq #{Seq})", report.SiteId, report.SequenceNumber);
|
||||
LatestReport = report,
|
||||
LastReportReceivedAt = now,
|
||||
LastHeartbeatAt = now,
|
||||
LastSequenceNumber = report.SequenceNumber,
|
||||
IsOnline = true
|
||||
};
|
||||
|
||||
if (_siteStates.TryUpdate(report.SiteId, updated, existing))
|
||||
{
|
||||
if (!existing.IsOnline)
|
||||
{
|
||||
_logger.LogInformation(
|
||||
"Site {SiteId} is back online (seq #{Seq})", report.SiteId, report.SequenceNumber);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
return existing;
|
||||
});
|
||||
// CAS lost — the entry changed under us; retry with the fresh value.
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Bumps the last-seen timestamp for a site already known via a prior
|
||||
/// SiteHealthReport. Heartbeats from sites we have not yet received a
|
||||
/// full report from are ignored — registration only happens on report.
|
||||
/// The update is an atomic compare-and-swap of the immutable state.
|
||||
/// </summary>
|
||||
public void MarkHeartbeat(string siteId, DateTimeOffset receivedAt)
|
||||
{
|
||||
if (!_siteStates.TryGetValue(siteId, out var state))
|
||||
while (true)
|
||||
{
|
||||
if (!_siteStates.TryGetValue(siteId, out var existing))
|
||||
return;
|
||||
|
||||
if (receivedAt > state.LastHeartbeatAt)
|
||||
state.LastHeartbeatAt = receivedAt;
|
||||
var newHeartbeat = receivedAt > existing.LastHeartbeatAt
|
||||
? receivedAt
|
||||
: existing.LastHeartbeatAt;
|
||||
|
||||
if (!state.IsOnline)
|
||||
// Nothing to change — avoid a needless swap.
|
||||
if (newHeartbeat == existing.LastHeartbeatAt && existing.IsOnline)
|
||||
return;
|
||||
|
||||
var updated = existing with
|
||||
{
|
||||
state.IsOnline = true;
|
||||
LastHeartbeatAt = newHeartbeat,
|
||||
IsOnline = true
|
||||
};
|
||||
|
||||
if (_siteStates.TryUpdate(siteId, updated, existing))
|
||||
{
|
||||
if (!existing.IsOnline)
|
||||
_logger.LogInformation("Site {SiteId} is back online (heartbeat)", siteId);
|
||||
return;
|
||||
}
|
||||
|
||||
// CAS lost — retry with the fresh value.
|
||||
}
|
||||
}
|
||||
|
||||
@@ -143,13 +185,20 @@ public class CentralHealthAggregator : BackgroundService, ICentralHealthAggregat
|
||||
var state = kvp.Value;
|
||||
if (!state.IsOnline) continue;
|
||||
|
||||
// Use LastHeartbeatAt — heartbeats arrive every ~5s from any
|
||||
// healthy site node, so OfflineTimeout only fires when no node
|
||||
// can reach central, not during single-node failovers.
|
||||
// Use LastHeartbeatAt — heartbeats arrive frequently from any
|
||||
// healthy site node (cadence owned by Cluster Infrastructure /
|
||||
// SiteCommunicationActor), so OfflineTimeout only fires when no
|
||||
// node can reach central, not during single-node failovers.
|
||||
var elapsed = now - state.LastHeartbeatAt;
|
||||
if (elapsed > _options.OfflineTimeout)
|
||||
if (elapsed <= _options.OfflineTimeout)
|
||||
continue;
|
||||
|
||||
// Atomically swap to an offline copy. If the CAS loses to a
|
||||
// concurrent report/heartbeat the site was just heard from, so
|
||||
// leaving it online is the correct outcome — no retry needed.
|
||||
var offline = state with { IsOnline = false };
|
||||
if (_siteStates.TryUpdate(kvp.Key, offline, state))
|
||||
{
|
||||
state.IsOnline = false;
|
||||
_logger.LogWarning(
|
||||
"Site {SiteId} marked offline — no signal for {Elapsed}s (timeout: {Timeout}s)",
|
||||
state.SiteId, elapsed.TotalSeconds, _options.OfflineTimeout.TotalSeconds);
|
||||
|
||||
@@ -84,6 +84,20 @@ public class HealthReportSender : BackgroundService
|
||||
_collector.SetParkedMessageCount(parkedCount);
|
||||
}
|
||||
catch { /* Non-fatal — parked count will be 0 */ }
|
||||
|
||||
try
|
||||
{
|
||||
// Per-category pending-message buffer depths (the documented
|
||||
// "store-and-forward buffer depth" triage metric). Keyed by
|
||||
// StoreAndForwardCategory name so the central dashboard can
|
||||
// render external/notification/DB-write depths separately.
|
||||
var depthsByCategory = await _sfStorage.GetBufferDepthByCategoryAsync();
|
||||
var depths = depthsByCategory.ToDictionary(
|
||||
kvp => kvp.Key.ToString(),
|
||||
kvp => kvp.Value);
|
||||
_collector.SetStoreAndForwardDepths(depths);
|
||||
}
|
||||
catch { /* Non-fatal — buffer depths will be empty */ }
|
||||
}
|
||||
|
||||
var seq = Interlocked.Increment(ref _sequenceNumber);
|
||||
|
||||
@@ -4,26 +4,37 @@ namespace ScadaLink.HealthMonitoring;
|
||||
|
||||
/// <summary>
|
||||
/// In-memory state for a single site's health, stored by the central aggregator.
|
||||
/// Immutable: every state transition produces a new instance which the aggregator
|
||||
/// installs into its <c>ConcurrentDictionary</c> via an atomic compare-and-swap.
|
||||
/// This makes handing the reference straight to UI callers safe — a consumer can
|
||||
/// never observe a torn or half-applied update.
|
||||
/// </summary>
|
||||
public class SiteHealthState
|
||||
public sealed record SiteHealthState
|
||||
{
|
||||
public required string SiteId { get; init; }
|
||||
public SiteHealthReport LatestReport { get; set; } = null!;
|
||||
|
||||
/// <summary>
|
||||
/// The latest full <see cref="SiteHealthReport"/> received for the site, or
|
||||
/// <c>null</c> if the site is known only via heartbeats and has not yet sent
|
||||
/// a report.
|
||||
/// </summary>
|
||||
public SiteHealthReport? LatestReport { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// Time the latest full <see cref="SiteHealthReport"/> was processed.
|
||||
/// Used by the UI to surface report staleness during failover.
|
||||
/// </summary>
|
||||
public DateTimeOffset LastReportReceivedAt { get; set; }
|
||||
public DateTimeOffset LastReportReceivedAt { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// Time the most recent signal of any kind (full report OR ~5s heartbeat)
|
||||
/// was received. Drives offline detection — heartbeats from the standby
|
||||
/// keep the site marked online even when the active node is unable to
|
||||
/// produce a report (mid-failover, brief stalls).
|
||||
/// Time the most recent signal of any kind (full report OR heartbeat) was
|
||||
/// received. Drives offline detection — heartbeats from the standby keep the
|
||||
/// site marked online even when the active node is unable to produce a report
|
||||
/// (mid-failover, brief stalls). See the heartbeat scheduler owned by the
|
||||
/// Cluster Infrastructure / SiteCommunicationActor for the actual cadence.
|
||||
/// </summary>
|
||||
public DateTimeOffset LastHeartbeatAt { get; set; }
|
||||
public DateTimeOffset LastHeartbeatAt { get; init; }
|
||||
|
||||
public long LastSequenceNumber { get; set; }
|
||||
public bool IsOnline { get; set; }
|
||||
public long LastSequenceNumber { get; init; }
|
||||
public bool IsOnline { get; init; }
|
||||
}
|
||||
|
||||
@@ -161,7 +161,62 @@ public class CentralHealthAggregatorTests
|
||||
_aggregator.ProcessReport(report);
|
||||
|
||||
var state = _aggregator.GetSiteState("site-1");
|
||||
Assert.Equal(42, state!.LatestReport.ScriptErrorCount);
|
||||
Assert.Equal(42, state!.LatestReport!.ScriptErrorCount);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// HealthMonitoring-002 regression: SiteHealthState is mutated from multiple
|
||||
/// threads (ProcessReport, MarkHeartbeat, CheckForOfflineSites). With a mutable
|
||||
/// class and unsynchronized field writes, a snapshot read could observe a torn
|
||||
/// or half-applied state. The state must be immutable and every transition an
|
||||
/// atomic reference swap, so a snapshot is always internally consistent and the
|
||||
/// monotonic sequence-number guard is never subverted by a lost update.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task ProcessReport_ConcurrentUpdates_NeverLoseSequenceOrTearState()
|
||||
{
|
||||
const int iterations = 5_000;
|
||||
// SiteHealthState must be an immutable record so handing the reference to
|
||||
// UI callers (and reading it concurrently) is safe.
|
||||
Assert.True(typeof(SiteHealthState).GetMethod("<Clone>$") != null,
|
||||
"SiteHealthState must be an immutable record for safe concurrent reads.");
|
||||
|
||||
_aggregator.ProcessReport(MakeReport("site-1", 0));
|
||||
|
||||
var writer = Task.Run(() =>
|
||||
{
|
||||
for (long seq = 1; seq <= iterations; seq++)
|
||||
_aggregator.ProcessReport(MakeReport("site-1", seq));
|
||||
});
|
||||
|
||||
var heartbeater = Task.Run(() =>
|
||||
{
|
||||
for (int i = 0; i < iterations; i++)
|
||||
_aggregator.MarkHeartbeat("site-1", _timeProvider.GetUtcNow());
|
||||
});
|
||||
|
||||
long maxObserved = 0;
|
||||
var reader = Task.Run(() =>
|
||||
{
|
||||
for (int i = 0; i < iterations; i++)
|
||||
{
|
||||
var state = _aggregator.GetSiteState("site-1");
|
||||
if (state == null) continue;
|
||||
// A consistent snapshot: the stored report's sequence number must
|
||||
// always match the state's LastSequenceNumber (no half-applied update).
|
||||
Assert.Equal(state.LastSequenceNumber, state.LatestReport!.SequenceNumber);
|
||||
if (state.LastSequenceNumber > maxObserved)
|
||||
maxObserved = state.LastSequenceNumber;
|
||||
}
|
||||
});
|
||||
|
||||
await Task.WhenAll(writer, heartbeater, reader);
|
||||
|
||||
// The final state must reflect the highest sequence — no lost update.
|
||||
var final = _aggregator.GetSiteState("site-1");
|
||||
Assert.Equal(iterations, final!.LastSequenceNumber);
|
||||
Assert.Equal(iterations, final.LatestReport!.SequenceNumber);
|
||||
Assert.True(final.IsOnline);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
@@ -173,11 +228,11 @@ public class CentralHealthAggregatorTests
|
||||
_aggregator.ProcessReport(MakeReport("site-1", 10));
|
||||
_aggregator.ProcessReport(MakeReport("site-1", 1));
|
||||
|
||||
var state = _aggregator.GetSiteState("site-1");
|
||||
Assert.Equal(10, state!.LastSequenceNumber);
|
||||
Assert.Equal(10, _aggregator.GetSiteState("site-1")!.LastSequenceNumber);
|
||||
|
||||
// Once it exceeds the old max, it works again
|
||||
// Once it exceeds the old max, it works again. SiteHealthState is an
|
||||
// immutable snapshot, so re-fetch to observe the new state.
|
||||
_aggregator.ProcessReport(MakeReport("site-1", 11));
|
||||
Assert.Equal(11, state.LastSequenceNumber);
|
||||
Assert.Equal(11, _aggregator.GetSiteState("site-1")!.LastSequenceNumber);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
using Microsoft.Data.Sqlite;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Microsoft.Extensions.Options;
|
||||
using ScadaLink.Commons.Messages.Health;
|
||||
using ScadaLink.Commons.Types.Enums;
|
||||
using ScadaLink.StoreAndForward;
|
||||
|
||||
namespace ScadaLink.HealthMonitoring.Tests;
|
||||
|
||||
@@ -136,6 +138,76 @@ public class HealthReportSenderTests
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// HealthMonitoring-001 regression: the documented "store-and-forward buffer
|
||||
/// depth" metric (pending messages by category) must actually be populated in
|
||||
/// the emitted report. Previously SetStoreAndForwardDepths had no callers, so
|
||||
/// StoreAndForwardBufferDepths was always empty. The sender must query the S&F
|
||||
/// engine's per-category depth API and include it alongside the parked count.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task ReportsIncludeStoreAndForwardBufferDepthsFromStorage()
|
||||
{
|
||||
var dbName = $"HealthSfDepth_{Guid.NewGuid():N}";
|
||||
var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared";
|
||||
// Keep one connection alive so the in-memory DB persists for the test.
|
||||
using var keepAlive = new SqliteConnection(connStr);
|
||||
keepAlive.Open();
|
||||
|
||||
var storage = new StoreAndForwardStorage(connStr, NullLogger<StoreAndForwardStorage>.Instance);
|
||||
await storage.InitializeAsync();
|
||||
|
||||
// Two pending ExternalSystem messages and one pending Notification message.
|
||||
await storage.EnqueueAsync(MakePendingMessage("m1", StoreAndForwardCategory.ExternalSystem));
|
||||
await storage.EnqueueAsync(MakePendingMessage("m2", StoreAndForwardCategory.ExternalSystem));
|
||||
await storage.EnqueueAsync(MakePendingMessage("m3", StoreAndForwardCategory.Notification));
|
||||
|
||||
var transport = new FakeTransport();
|
||||
var collector = new SiteHealthCollector();
|
||||
collector.SetActiveNode(true);
|
||||
var options = Options.Create(new HealthMonitoringOptions
|
||||
{
|
||||
ReportInterval = TimeSpan.FromMilliseconds(50)
|
||||
});
|
||||
|
||||
var sender = new HealthReportSender(
|
||||
collector,
|
||||
transport,
|
||||
options,
|
||||
NullLogger<HealthReportSender>.Instance,
|
||||
new FakeSiteIdentityProvider(),
|
||||
sfStorage: storage);
|
||||
|
||||
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(300));
|
||||
try
|
||||
{
|
||||
await sender.StartAsync(cts.Token);
|
||||
await Task.Delay(250, CancellationToken.None);
|
||||
await sender.StopAsync(CancellationToken.None);
|
||||
}
|
||||
catch (OperationCanceledException) { }
|
||||
|
||||
Assert.True(transport.SentReports.Count >= 1);
|
||||
var depths = transport.SentReports[^1].StoreAndForwardBufferDepths;
|
||||
Assert.Equal(2, depths[nameof(StoreAndForwardCategory.ExternalSystem)]);
|
||||
Assert.Equal(1, depths[nameof(StoreAndForwardCategory.Notification)]);
|
||||
Assert.False(depths.ContainsKey(nameof(StoreAndForwardCategory.CachedDbWrite)));
|
||||
}
|
||||
|
||||
private static StoreAndForwardMessage MakePendingMessage(string id, StoreAndForwardCategory category) =>
|
||||
new()
|
||||
{
|
||||
Id = id,
|
||||
Category = category,
|
||||
Target = "target",
|
||||
PayloadJson = "{}",
|
||||
RetryCount = 0,
|
||||
MaxRetries = 50,
|
||||
RetryIntervalMs = 30_000,
|
||||
CreatedAt = DateTimeOffset.UtcNow,
|
||||
Status = StoreAndForwardMessageStatus.Pending
|
||||
};
|
||||
|
||||
[Fact]
|
||||
public void InitialSequenceNumberSeededWithUnixMs()
|
||||
{
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="coverlet.collector" />
|
||||
<PackageReference Include="Microsoft.Data.Sqlite" />
|
||||
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
|
||||
<PackageReference Include="Microsoft.Extensions.Options" />
|
||||
<PackageReference Include="Microsoft.NET.Test.Sdk" />
|
||||
|
||||
Reference in New Issue
Block a user