fix(health-monitoring): resolve HealthMonitoring-003..009 — central offline grace, register unknown-site heartbeats, test coverage
This commit is contained in:
@@ -8,7 +8,7 @@
|
||||
| Last reviewed | 2026-05-16 |
|
||||
| Reviewer | claude-agent |
|
||||
| Commit reviewed | `9c60592` |
|
||||
| Open findings | 10 |
|
||||
| Open findings | 5 |
|
||||
|
||||
## Summary
|
||||
|
||||
@@ -143,10 +143,10 @@ and no lost updates.
|
||||
|
||||
| | |
|
||||
|--|--|
|
||||
| Severity | Medium |
|
||||
| Severity | Medium — re-triaged: already resolved as a side-effect of HealthMonitoring-002. |
|
||||
| Category | Concurrency & thread safety |
|
||||
| Status | Open |
|
||||
| Location | `src/ScadaLink.HealthMonitoring/CentralHealthAggregator.cs:55-78` |
|
||||
| Status | Resolved |
|
||||
| Location | `src/ScadaLink.HealthMonitoring/CentralHealthAggregator.cs:45-103` |
|
||||
|
||||
**Description**
|
||||
|
||||
@@ -169,7 +169,17 @@ reference swap with no observable intermediate state.
|
||||
|
||||
**Resolution**
|
||||
|
||||
_Unresolved._
|
||||
Resolved 2026-05-16 (commit `pending`). Re-triaged: verified against the current
|
||||
source — the root cause was already eliminated by the HealthMonitoring-002 fix.
|
||||
`ProcessReport` no longer uses `AddOrUpdate` at all; it is now an explicit
|
||||
compare-and-swap retry loop (`TryGetValue` → guard → `TryAdd`/`TryUpdate`) that
|
||||
produces a brand-new immutable `SiteHealthState` record per transition and never
|
||||
mutates a stored value in place. The sequence-number guard and the field writes are
|
||||
evaluated against the value actually installed by the CAS, so the "only replace if
|
||||
sequence is higher" invariant holds. The concurrency stress test
|
||||
`CentralHealthAggregatorTests.ProcessReport_ConcurrentUpdates_NeverLoseSequenceOrTearState`
|
||||
(added under HealthMonitoring-002) already exercises this path and asserts no lost
|
||||
updates and no torn snapshots. No further code change was required for this finding.
|
||||
|
||||
### HealthMonitoring-004 — Inconsistent heartbeat interval described across XML docs
|
||||
|
||||
@@ -205,7 +215,7 @@ _Unresolved._
|
||||
|--|--|
|
||||
| Severity | Medium |
|
||||
| Category | Correctness & logic bugs |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
| Location | `src/ScadaLink.HealthMonitoring/CentralHealthReportLoop.cs:48-81`, `src/ScadaLink.HealthMonitoring/CentralHealthAggregator.cs:149` |
|
||||
|
||||
**Description**
|
||||
@@ -230,7 +240,18 @@ leader starts the report loop promptly on failover.
|
||||
|
||||
**Resolution**
|
||||
|
||||
_Unresolved._
|
||||
Resolved 2026-05-16 (commit `pending`). Applied the distinct-timeout option. A new
|
||||
`HealthMonitoringOptions.CentralOfflineTimeout` (default 3x the report interval =
|
||||
3 minutes) is applied by `CentralHealthAggregator.CheckForOfflineSites` to the
|
||||
`central` keyspace entry only — real sites keep the existing `OfflineTimeout`. This
|
||||
gives the synthetic `central` site (which has no heartbeat source and is fed solely
|
||||
by the 30s leader-only `CentralHealthReportLoop`) enough grace to survive a single
|
||||
skipped or late self-report — the equivalent of the "one missed report" grace the
|
||||
design doc grants real sites — while still going offline on genuine total loss.
|
||||
Regression tests `CentralHealthAggregatorTests.OfflineDetection_CentralSite_HasLongerGraceThanRealSites`
|
||||
(central survives 75s of silence while a real site goes offline) and
|
||||
`OfflineDetection_CentralSite_StillGoesOfflineOnGenuineLoss` (central still detected
|
||||
offline after 10 minutes) verify the behaviour.
|
||||
|
||||
### HealthMonitoring-006 — Sequence seeding contradicts the doc's "starting at 1" wording and is untestable
|
||||
|
||||
@@ -270,7 +291,7 @@ _Unresolved._
|
||||
|--|--|
|
||||
| Severity | Medium |
|
||||
| Category | Correctness & logic bugs |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
| Location | `src/ScadaLink.HealthMonitoring/CentralHealthAggregator.cs:86-99` |
|
||||
|
||||
**Description**
|
||||
@@ -293,16 +314,29 @@ after a central restart.
|
||||
|
||||
**Resolution**
|
||||
|
||||
_Unresolved._
|
||||
Resolved 2026-05-16 (commit `pending`). `CentralHealthAggregator.MarkHeartbeat` no
|
||||
longer returns early for an unknown site. When a heartbeat arrives for a site with no
|
||||
aggregator state, it now atomically registers (`TryAdd`, with CAS-loss retry) a
|
||||
minimal `SiteHealthState` that is `IsOnline = true`, `LatestReport = null`,
|
||||
`LastSequenceNumber = 0` and `LastHeartbeatAt = receivedAt` — an "online, awaiting
|
||||
first report" state. This relies on the HealthMonitoring-002 change that made
|
||||
`LatestReport` properly nullable, so UI consumers already handle the null case.
|
||||
Reachable sites therefore show online immediately after a central restart/failover
|
||||
instead of being absent ("unknown") for up to a full report interval. The
|
||||
`ICentralHealthAggregator.MarkHeartbeat` XML doc was corrected to describe the new
|
||||
behaviour. Regression test
|
||||
`CentralHealthAggregatorTests.MarkHeartbeat_RegistersUnknownSite_AsOnlineAwaitingReport`
|
||||
verifies the registration; `MarkHeartbeat_KeepsSiteOnline_BetweenReports` and
|
||||
`MarkHeartbeat_BringsOfflineSiteBackOnline` cover the already-registered paths.
|
||||
|
||||
### HealthMonitoring-008 — `GetAllSiteStates` / `GetSiteState` leak live mutable state objects to callers
|
||||
|
||||
| | |
|
||||
|--|--|
|
||||
| Severity | Medium |
|
||||
| Severity | Medium — re-triaged: already resolved as a side-effect of HealthMonitoring-002. |
|
||||
| Category | Concurrency & thread safety |
|
||||
| Status | Open |
|
||||
| Location | `src/ScadaLink.HealthMonitoring/CentralHealthAggregator.cs:104-116` |
|
||||
| Status | Resolved |
|
||||
| Location | `src/ScadaLink.HealthMonitoring/CentralHealthAggregator.cs:146-158` |
|
||||
|
||||
**Description**
|
||||
|
||||
@@ -323,7 +357,15 @@ state into an immutable DTO before returning.
|
||||
|
||||
**Resolution**
|
||||
|
||||
_Unresolved._
|
||||
Resolved 2026-05-16 (commit `pending`). Re-triaged: verified against the current
|
||||
source — the root cause was already eliminated by the HealthMonitoring-002 fix.
|
||||
`SiteHealthState` is now a `sealed record` with `init`-only properties (fully
|
||||
immutable). Every aggregator transition installs a brand-new record instance via an
|
||||
atomic compare-and-swap, so the references `GetAllSiteStates` and `GetSiteState` hand
|
||||
out are immutable snapshots — a UI consumer reading one on its own thread can never
|
||||
observe a torn or half-applied state, and cannot mutate aggregator state through the
|
||||
returned reference. The recommended fix (make `SiteHealthState` a record) is exactly
|
||||
what the HealthMonitoring-002 change did, so no further code change was required.
|
||||
|
||||
### HealthMonitoring-009 — Missing test coverage for central report loop, heartbeat path, replication, and collector setters
|
||||
|
||||
@@ -331,7 +373,7 @@ _Unresolved._
|
||||
|--|--|
|
||||
| Severity | Medium |
|
||||
| Category | Testing coverage |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
| Location | `tests/ScadaLink.HealthMonitoring.Tests/` |
|
||||
|
||||
**Description**
|
||||
@@ -358,7 +400,29 @@ setters' presence in `CollectReport` output.
|
||||
|
||||
**Resolution**
|
||||
|
||||
_Unresolved._
|
||||
Resolved 2026-05-16 (commit `pending`). Added the missing coverage:
|
||||
|
||||
- **`CentralHealthReportLoopTests`** (new file) — `GeneratesCentralReports_WhenSelfIsPrimary`,
|
||||
`GeneratesNoReports_WhenNotPrimary` (leader-only `SelfIsPrimary` gating with a fake
|
||||
`IClusterNodeProvider`), `AssignsMonotonicSequenceNumbers`, and
|
||||
`SetsActiveNodeFlag_EvenWhenNotPrimary`.
|
||||
- **`CentralHealthAggregatorTests`** — `MarkHeartbeat_RegistersUnknownSite_AsOnlineAwaitingReport`,
|
||||
`MarkHeartbeat_KeepsSiteOnline_BetweenReports` (heartbeat keeps a site online past
|
||||
the offline timeout — the path the design depends on), and
|
||||
`MarkHeartbeat_BringsOfflineSiteBackOnline`.
|
||||
- **`SiteHealthCollectorTests`** — reflected-in-report tests for `SetClusterNodes`,
|
||||
`SetInstanceCounts`, `SetParkedMessageCount`, `SetNodeHostname`,
|
||||
`SetActiveNode`/`NodeRole`, `UpdateTagQuality`, `UpdateConnectionEndpoint`, and
|
||||
`SetStoreAndForwardDepths`.
|
||||
|
||||
The `SiteHealthReportReplica` idempotency item is **out of scope** for this module:
|
||||
`SiteHealthReportReplica` is declared in `ScadaLink.Commons` and published/consumed by
|
||||
`CentralCommunicationActor` in the `ScadaLink.Communication` module — the
|
||||
HealthMonitoring module itself has no replication code. Replica double-delivery
|
||||
idempotency is already covered by `ProcessReport`'s sequence-number guard
|
||||
(`ProcessReport_RejectsEqualSequence`, `ProcessReport_RejectsStaleReport_WhenSequenceNotGreater`);
|
||||
testing the actor-side double-publish belongs in the Communication module's review.
|
||||
The HealthMonitoring test suite now stands at 47 passing tests (was 30).
|
||||
|
||||
### HealthMonitoring-010 — `HealthReportSender` silently swallows inner failures with bare `catch {}`
|
||||
|
||||
|
||||
@@ -103,17 +103,42 @@ public class CentralHealthAggregator : BackgroundService, ICentralHealthAggregat
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Bumps the last-seen timestamp for a site already known via a prior
|
||||
/// SiteHealthReport. Heartbeats from sites we have not yet received a
|
||||
/// full report from are ignored — registration only happens on report.
|
||||
/// The update is an atomic compare-and-swap of the immutable state.
|
||||
/// Bumps the last-seen timestamp for a site. If a heartbeat arrives for a
|
||||
/// site the aggregator has no state for yet (e.g. immediately after a central
|
||||
/// restart/failover, when in-memory state is empty), the site is registered
|
||||
/// as online with no <see cref="SiteHealthState.LatestReport"/> — heartbeats
|
||||
/// prove the site is reachable, so it shows online straight away rather than
|
||||
/// as "unknown" for up to a full report interval. The update is an atomic
|
||||
/// compare-and-swap of the immutable state.
|
||||
/// </summary>
|
||||
public void MarkHeartbeat(string siteId, DateTimeOffset receivedAt)
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
if (!_siteStates.TryGetValue(siteId, out var existing))
|
||||
return;
|
||||
{
|
||||
// Unknown site — register it as online, awaiting its first
|
||||
// full report. LatestReport stays null until ProcessReport runs.
|
||||
var registered = new SiteHealthState
|
||||
{
|
||||
SiteId = siteId,
|
||||
LatestReport = null,
|
||||
LastReportReceivedAt = default,
|
||||
LastHeartbeatAt = receivedAt,
|
||||
LastSequenceNumber = 0,
|
||||
IsOnline = true
|
||||
};
|
||||
|
||||
if (_siteStates.TryAdd(siteId, registered))
|
||||
{
|
||||
_logger.LogInformation(
|
||||
"Site {SiteId} registered online via heartbeat (awaiting first report)", siteId);
|
||||
return;
|
||||
}
|
||||
|
||||
// Lost the race — another thread registered first; retry as an update.
|
||||
continue;
|
||||
}
|
||||
|
||||
var newHeartbeat = receivedAt > existing.LastHeartbeatAt
|
||||
? receivedAt
|
||||
@@ -163,10 +188,10 @@ public class CentralHealthAggregator : BackgroundService, ICentralHealthAggregat
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
_logger.LogInformation(
|
||||
"Central health aggregator started, offline timeout {Timeout}s",
|
||||
_options.OfflineTimeout.TotalSeconds);
|
||||
"Central health aggregator started, offline timeout {Timeout}s (central {CentralTimeout}s)",
|
||||
_options.OfflineTimeout.TotalSeconds, _options.CentralOfflineTimeout.TotalSeconds);
|
||||
|
||||
// Check at half the offline timeout interval for timely detection
|
||||
// Check at half the (shorter) offline timeout interval for timely detection
|
||||
var checkInterval = TimeSpan.FromMilliseconds(_options.OfflineTimeout.TotalMilliseconds / 2);
|
||||
using var timer = new PeriodicTimer(checkInterval);
|
||||
|
||||
@@ -189,8 +214,17 @@ public class CentralHealthAggregator : BackgroundService, ICentralHealthAggregat
|
||||
// healthy site node (cadence owned by Cluster Infrastructure /
|
||||
// SiteCommunicationActor), so OfflineTimeout only fires when no
|
||||
// node can reach central, not during single-node failovers.
|
||||
//
|
||||
// The synthetic "central" site has no heartbeat source — its only
|
||||
// signal is the 30s CentralHealthReportLoop self-report — so it gets
|
||||
// a longer grace window (CentralOfflineTimeout) to survive a single
|
||||
// skipped/late self-report.
|
||||
var timeout = kvp.Key == CentralHealthReportLoop.CentralSiteId
|
||||
? _options.CentralOfflineTimeout
|
||||
: _options.OfflineTimeout;
|
||||
|
||||
var elapsed = now - state.LastHeartbeatAt;
|
||||
if (elapsed <= _options.OfflineTimeout)
|
||||
if (elapsed <= timeout)
|
||||
continue;
|
||||
|
||||
// Atomically swap to an offline copy. If the CAS loses to a
|
||||
@@ -201,7 +235,7 @@ public class CentralHealthAggregator : BackgroundService, ICentralHealthAggregat
|
||||
{
|
||||
_logger.LogWarning(
|
||||
"Site {SiteId} marked offline — no signal for {Elapsed}s (timeout: {Timeout}s)",
|
||||
state.SiteId, elapsed.TotalSeconds, _options.OfflineTimeout.TotalSeconds);
|
||||
state.SiteId, elapsed.TotalSeconds, timeout.TotalSeconds);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,4 +4,17 @@ public class HealthMonitoringOptions
|
||||
{
|
||||
public TimeSpan ReportInterval { get; set; } = TimeSpan.FromSeconds(30);
|
||||
public TimeSpan OfflineTimeout { get; set; } = TimeSpan.FromMinutes(1);
|
||||
|
||||
/// <summary>
|
||||
/// Offline timeout applied to the synthetic "central" site only. Real sites
|
||||
/// emit frequent heartbeats that keep <c>LastHeartbeatAt</c> fresh, so the
|
||||
/// normal <see cref="OfflineTimeout"/> only fires on genuine total loss. The
|
||||
/// "central" self-report has no heartbeat source — its only signal is the
|
||||
/// 30s <see cref="CentralHealthReportLoop"/>, so a single skipped/late
|
||||
/// self-report (leader GC pause, brief stall, mid-failover before the new
|
||||
/// leader's loop spins up) would flap it offline under the 60s site timeout.
|
||||
/// A longer central grace gives the equivalent of "one missed report" that
|
||||
/// the design doc grants real sites. Default: 3x the report interval.
|
||||
/// </summary>
|
||||
public TimeSpan CentralOfflineTimeout { get; set; } = TimeSpan.FromMinutes(3);
|
||||
}
|
||||
|
||||
@@ -11,10 +11,13 @@ public interface ICentralHealthAggregator
|
||||
void ProcessReport(SiteHealthReport report);
|
||||
|
||||
/// <summary>
|
||||
/// Bumps the last-seen timestamp for a site already known via a prior
|
||||
/// SiteHealthReport. Used to keep a site marked online between full
|
||||
/// 30s reports when ~2s heartbeats are arriving — protects against the
|
||||
/// 60s offline threshold firing on a transiently delayed report.
|
||||
/// Bumps the last-seen timestamp for a site, keeping it marked online
|
||||
/// between full 30s reports when heartbeats are arriving — protects against
|
||||
/// the offline threshold firing on a transiently delayed report. A heartbeat
|
||||
/// for a site with no aggregator state yet (e.g. just after a central
|
||||
/// restart/failover) registers that site as online with no
|
||||
/// <see cref="SiteHealthState.LatestReport"/>, so reachable sites are not
|
||||
/// shown as "unknown" during the failover window.
|
||||
/// </summary>
|
||||
void MarkHeartbeat(string siteId, DateTimeOffset receivedAt);
|
||||
|
||||
|
||||
@@ -219,6 +219,94 @@ public class CentralHealthAggregatorTests
|
||||
Assert.True(final.IsOnline);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// HealthMonitoring-007 regression: a heartbeat for a site that has not yet
|
||||
/// sent a full report (e.g. immediately after a central restart/failover, when
|
||||
/// the aggregator's in-memory state is empty) must register the site as online
|
||||
/// rather than being silently discarded. Otherwise reachable sites show as
|
||||
/// "unknown" for up to a full report interval during the failover window.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public void MarkHeartbeat_RegistersUnknownSite_AsOnlineAwaitingReport()
|
||||
{
|
||||
var now = _timeProvider.GetUtcNow();
|
||||
|
||||
_aggregator.MarkHeartbeat("site-new", now);
|
||||
|
||||
var state = _aggregator.GetSiteState("site-new");
|
||||
Assert.NotNull(state);
|
||||
Assert.True(state.IsOnline);
|
||||
Assert.Null(state.LatestReport);
|
||||
Assert.Equal(now, state.LastHeartbeatAt);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void MarkHeartbeat_KeepsSiteOnline_BetweenReports()
|
||||
{
|
||||
_aggregator.ProcessReport(MakeReport("site-1", 1));
|
||||
|
||||
// Time advances past the offline timeout, but heartbeats keep arriving.
|
||||
_timeProvider.Advance(TimeSpan.FromSeconds(45));
|
||||
_aggregator.MarkHeartbeat("site-1", _timeProvider.GetUtcNow());
|
||||
_timeProvider.Advance(TimeSpan.FromSeconds(45));
|
||||
_aggregator.MarkHeartbeat("site-1", _timeProvider.GetUtcNow());
|
||||
|
||||
_aggregator.CheckForOfflineSites();
|
||||
|
||||
Assert.True(_aggregator.GetSiteState("site-1")!.IsOnline);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void MarkHeartbeat_BringsOfflineSiteBackOnline()
|
||||
{
|
||||
_aggregator.ProcessReport(MakeReport("site-1", 1));
|
||||
|
||||
_timeProvider.Advance(TimeSpan.FromSeconds(61));
|
||||
_aggregator.CheckForOfflineSites();
|
||||
Assert.False(_aggregator.GetSiteState("site-1")!.IsOnline);
|
||||
|
||||
_aggregator.MarkHeartbeat("site-1", _timeProvider.GetUtcNow());
|
||||
Assert.True(_aggregator.GetSiteState("site-1")!.IsOnline);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// HealthMonitoring-005 regression: the synthetic "central" site has no
|
||||
/// heartbeat source — its LastHeartbeatAt is only bumped by the 30s
|
||||
/// CentralHealthReportLoop self-report. A single skipped/late self-report
|
||||
/// (leader GC pause, brief stall, mid-failover) would leave it with no signal
|
||||
/// for >60s and flap it offline even though the central cluster is healthy.
|
||||
/// The "central" keyspace entry must get a longer offline grace than real sites.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public void OfflineDetection_CentralSite_HasLongerGraceThanRealSites()
|
||||
{
|
||||
_aggregator.ProcessReport(MakeReport(CentralHealthReportLoop.CentralSiteId, 1));
|
||||
_aggregator.ProcessReport(MakeReport("site-1", 1));
|
||||
|
||||
// One missed central self-report (~30s) plus the normal 60s site timeout:
|
||||
// a real site would already be offline here, but central must not be —
|
||||
// it only gets one self-report every 30s, so 60s is barely two reports.
|
||||
_timeProvider.Advance(TimeSpan.FromSeconds(75));
|
||||
_aggregator.CheckForOfflineSites();
|
||||
|
||||
Assert.False(_aggregator.GetSiteState("site-1")!.IsOnline);
|
||||
Assert.True(
|
||||
_aggregator.GetSiteState(CentralHealthReportLoop.CentralSiteId)!.IsOnline,
|
||||
"central must survive a single missed self-report");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void OfflineDetection_CentralSite_StillGoesOfflineOnGenuineLoss()
|
||||
{
|
||||
_aggregator.ProcessReport(MakeReport(CentralHealthReportLoop.CentralSiteId, 1));
|
||||
|
||||
// Well beyond even the central grace window — genuine total loss.
|
||||
_timeProvider.Advance(TimeSpan.FromMinutes(10));
|
||||
_aggregator.CheckForOfflineSites();
|
||||
|
||||
Assert.False(_aggregator.GetSiteState(CentralHealthReportLoop.CentralSiteId)!.IsOnline);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void SequenceNumberReset_RejectedUntilExceedsPrevMax()
|
||||
{
|
||||
|
||||
@@ -0,0 +1,134 @@
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Microsoft.Extensions.Options;
|
||||
using ScadaLink.Commons.Messages.Health;
|
||||
|
||||
namespace ScadaLink.HealthMonitoring.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// HealthMonitoring-009 regression: the central self-report loop had no test
|
||||
/// coverage at all. These tests exercise leader-only gating (SelfIsPrimary),
|
||||
/// self-report generation for siteId="central", and monotonic sequence
|
||||
/// assignment.
|
||||
/// </summary>
|
||||
public class CentralHealthReportLoopTests
|
||||
{
|
||||
private sealed class FakeClusterNodeProvider : IClusterNodeProvider
|
||||
{
|
||||
public bool SelfIsPrimary { get; set; }
|
||||
public IReadOnlyList<NodeStatus> Nodes { get; set; } = [];
|
||||
public IReadOnlyList<NodeStatus> GetClusterNodes() => Nodes;
|
||||
}
|
||||
|
||||
private sealed class RecordingAggregator : ICentralHealthAggregator
|
||||
{
|
||||
public List<SiteHealthReport> Processed { get; } = [];
|
||||
public void ProcessReport(SiteHealthReport report) => Processed.Add(report);
|
||||
public void MarkHeartbeat(string siteId, DateTimeOffset receivedAt) { }
|
||||
public IReadOnlyDictionary<string, SiteHealthState> GetAllSiteStates() =>
|
||||
new Dictionary<string, SiteHealthState>();
|
||||
public SiteHealthState? GetSiteState(string siteId) => null;
|
||||
}
|
||||
|
||||
private static async Task RunLoopBriefly(CentralHealthReportLoop loop, int runForMs)
|
||||
{
|
||||
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(runForMs + 100));
|
||||
try
|
||||
{
|
||||
await loop.StartAsync(cts.Token);
|
||||
await Task.Delay(runForMs, CancellationToken.None);
|
||||
await loop.StopAsync(CancellationToken.None);
|
||||
}
|
||||
catch (OperationCanceledException) { }
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task GeneratesCentralReports_WhenSelfIsPrimary()
|
||||
{
|
||||
var collector = new SiteHealthCollector();
|
||||
var aggregator = new RecordingAggregator();
|
||||
var clusterNodes = new FakeClusterNodeProvider { SelfIsPrimary = true };
|
||||
var options = Options.Create(new HealthMonitoringOptions
|
||||
{
|
||||
ReportInterval = TimeSpan.FromMilliseconds(50)
|
||||
});
|
||||
|
||||
var loop = new CentralHealthReportLoop(
|
||||
collector, aggregator, clusterNodes, options,
|
||||
NullLogger<CentralHealthReportLoop>.Instance);
|
||||
|
||||
await RunLoopBriefly(loop, 250);
|
||||
|
||||
Assert.NotEmpty(aggregator.Processed);
|
||||
Assert.All(aggregator.Processed,
|
||||
r => Assert.Equal(CentralHealthReportLoop.CentralSiteId, r.SiteId));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task GeneratesNoReports_WhenNotPrimary()
|
||||
{
|
||||
var collector = new SiteHealthCollector();
|
||||
var aggregator = new RecordingAggregator();
|
||||
var clusterNodes = new FakeClusterNodeProvider { SelfIsPrimary = false };
|
||||
var options = Options.Create(new HealthMonitoringOptions
|
||||
{
|
||||
ReportInterval = TimeSpan.FromMilliseconds(50)
|
||||
});
|
||||
|
||||
var loop = new CentralHealthReportLoop(
|
||||
collector, aggregator, clusterNodes, options,
|
||||
NullLogger<CentralHealthReportLoop>.Instance);
|
||||
|
||||
await RunLoopBriefly(loop, 250);
|
||||
|
||||
Assert.Empty(aggregator.Processed);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task AssignsMonotonicSequenceNumbers()
|
||||
{
|
||||
var collector = new SiteHealthCollector();
|
||||
var aggregator = new RecordingAggregator();
|
||||
var clusterNodes = new FakeClusterNodeProvider { SelfIsPrimary = true };
|
||||
var options = Options.Create(new HealthMonitoringOptions
|
||||
{
|
||||
ReportInterval = TimeSpan.FromMilliseconds(50)
|
||||
});
|
||||
|
||||
var loop = new CentralHealthReportLoop(
|
||||
collector, aggregator, clusterNodes, options,
|
||||
NullLogger<CentralHealthReportLoop>.Instance);
|
||||
|
||||
await RunLoopBriefly(loop, 300);
|
||||
|
||||
Assert.True(aggregator.Processed.Count >= 2,
|
||||
$"Expected at least 2 reports, got {aggregator.Processed.Count}");
|
||||
for (int i = 1; i < aggregator.Processed.Count; i++)
|
||||
{
|
||||
Assert.True(
|
||||
aggregator.Processed[i].SequenceNumber > aggregator.Processed[i - 1].SequenceNumber,
|
||||
$"Sequence numbers not strictly increasing at index {i}");
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task SetsActiveNodeFlag_EvenWhenNotPrimary()
|
||||
{
|
||||
// The loop must still report the node's role to the collector when it is
|
||||
// the standby, so the standby's own node card shows the correct role.
|
||||
var collector = new SiteHealthCollector();
|
||||
var aggregator = new RecordingAggregator();
|
||||
var clusterNodes = new FakeClusterNodeProvider { SelfIsPrimary = false };
|
||||
var options = Options.Create(new HealthMonitoringOptions
|
||||
{
|
||||
ReportInterval = TimeSpan.FromMilliseconds(50)
|
||||
});
|
||||
|
||||
var loop = new CentralHealthReportLoop(
|
||||
collector, aggregator, clusterNodes, options,
|
||||
NullLogger<CentralHealthReportLoop>.Instance);
|
||||
|
||||
await RunLoopBriefly(loop, 150);
|
||||
|
||||
Assert.False(collector.IsActiveNode);
|
||||
}
|
||||
}
|
||||
@@ -138,6 +138,111 @@ public class SiteHealthCollectorTests
|
||||
Assert.Equal(0, report.SequenceNumber);
|
||||
}
|
||||
|
||||
// HealthMonitoring-009 regression: the remaining collector setters had no
|
||||
// "reflected in report" coverage. The following tests verify each setter's
|
||||
// value reaches CollectReport output.
|
||||
|
||||
[Fact]
|
||||
public void SetClusterNodes_ReflectedInReport()
|
||||
{
|
||||
var nodes = new List<ScadaLink.Commons.Messages.Health.NodeStatus>
|
||||
{
|
||||
new("node-a", true, "Active"),
|
||||
new("node-b", true, "Standby")
|
||||
};
|
||||
_collector.SetClusterNodes(nodes);
|
||||
|
||||
var report = _collector.CollectReport("site-1");
|
||||
|
||||
Assert.NotNull(report.ClusterNodes);
|
||||
Assert.Equal(2, report.ClusterNodes!.Count);
|
||||
Assert.Equal("node-a", report.ClusterNodes[0].Hostname);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void SetInstanceCounts_ReflectedInReport()
|
||||
{
|
||||
_collector.SetInstanceCounts(deployed: 10, enabled: 7, disabled: 3);
|
||||
|
||||
var report = _collector.CollectReport("site-1");
|
||||
|
||||
Assert.Equal(10, report.DeployedInstanceCount);
|
||||
Assert.Equal(7, report.EnabledInstanceCount);
|
||||
Assert.Equal(3, report.DisabledInstanceCount);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void SetParkedMessageCount_ReflectedInReport()
|
||||
{
|
||||
_collector.SetParkedMessageCount(42);
|
||||
|
||||
var report = _collector.CollectReport("site-1");
|
||||
|
||||
Assert.Equal(42, report.ParkedMessageCount);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void SetNodeHostname_ReflectedInReport()
|
||||
{
|
||||
_collector.SetNodeHostname("site-host-1");
|
||||
|
||||
var report = _collector.CollectReport("site-1");
|
||||
|
||||
Assert.Equal("site-host-1", report.NodeHostname);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void SetActiveNode_ReflectedInNodeRole()
|
||||
{
|
||||
_collector.SetActiveNode(true);
|
||||
Assert.Equal("Active", _collector.CollectReport("site-1").NodeRole);
|
||||
Assert.True(_collector.IsActiveNode);
|
||||
|
||||
_collector.SetActiveNode(false);
|
||||
Assert.Equal("Standby", _collector.CollectReport("site-1").NodeRole);
|
||||
Assert.False(_collector.IsActiveNode);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void UpdateTagQuality_ReflectedInReport()
|
||||
{
|
||||
_collector.UpdateTagQuality("opc-1", good: 80, bad: 15, uncertain: 5);
|
||||
|
||||
var report = _collector.CollectReport("site-1");
|
||||
|
||||
Assert.NotNull(report.DataConnectionTagQuality);
|
||||
var quality = report.DataConnectionTagQuality!["opc-1"];
|
||||
Assert.Equal(80, quality.Good);
|
||||
Assert.Equal(15, quality.Bad);
|
||||
Assert.Equal(5, quality.Uncertain);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void UpdateConnectionEndpoint_ReflectedInReport()
|
||||
{
|
||||
_collector.UpdateConnectionEndpoint("opc-1", "opc.tcp://plc-1:4840");
|
||||
|
||||
var report = _collector.CollectReport("site-1");
|
||||
|
||||
Assert.NotNull(report.DataConnectionEndpoints);
|
||||
Assert.Equal("opc.tcp://plc-1:4840", report.DataConnectionEndpoints!["opc-1"]);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void SetStoreAndForwardDepths_ReflectedInReport()
|
||||
{
|
||||
_collector.SetStoreAndForwardDepths(new Dictionary<string, int>
|
||||
{
|
||||
["ExternalSystem"] = 5,
|
||||
["Notification"] = 2
|
||||
});
|
||||
|
||||
var report = _collector.CollectReport("site-1");
|
||||
|
||||
Assert.Equal(5, report.StoreAndForwardBufferDepths["ExternalSystem"]);
|
||||
Assert.Equal(2, report.StoreAndForwardBufferDepths["Notification"]);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ThreadSafety_ConcurrentIncrements()
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user