feat(kpi): K9 — SiteHealth sample source (per-site, from aggregator)

This commit is contained in:
Joseph Doherty
2026-06-17 20:20:18 -04:00
parent cb2a516187
commit 601cc6f594
3 changed files with 329 additions and 0 deletions
@@ -0,0 +1,150 @@
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Kpi;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Kpi;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Kpi;
namespace ZB.MOM.WW.ScadaBridge.HealthMonitoring.Kpi;
/// <summary>
/// Site Health (#11) <see cref="IKpiSampleSource"/> for the M6 "KPI History &amp;
/// Trends" backbone. Unlike the other M6 sample sources — which each query a
/// central repository — this one reads the in-memory
/// <see cref="ICentralHealthAggregator"/> state, snapshotting the latest
/// <see cref="ZB.MOM.WW.ScadaBridge.Commons.Messages.Health.SiteHealthReport"/>
/// each tracked site has reported into per-site (<see cref="KpiScopes.Site"/>)
/// <see cref="KpiSample"/> rows.
/// </summary>
/// <remarks>
/// <para>
/// Each sampling pass the central recorder enumerates this source and calls
/// <see cref="CollectAsync"/>, which iterates
/// <see cref="ICentralHealthAggregator.GetAllSiteStates"/> and, for every site
/// whose <see cref="SiteHealthState.LatestReport"/> is non-null, emits the
/// fixed metric catalog below. Sites known only via heartbeats (null
/// <see cref="SiteHealthState.LatestReport"/>) contribute nothing.
/// </para>
/// <para>
/// The aggregator is registered as a singleton (its state is shared, immutable,
/// and safe to hand straight to callers); this source is nonetheless registered
/// DI-scoped for consistency with the recorder's per-tick scope and the other
/// M6 sample sources. A scoped consumer resolving a singleton dependency is the
/// normal DI lifetime relationship and introduces no captive-dependency issue.
/// </para>
/// </remarks>
public sealed class SiteHealthKpiSampleSource : IKpiSampleSource
{
    // ── Metric catalog (the M6-agreed metric-name strings for this source) ──
    // Declaration order matches the emission order in AddSiteSnapshot.
    private const string MetricConnectionsUp = "connectionsUp";
    private const string MetricConnectionsDown = "connectionsDown";
    private const string MetricScriptErrors = "scriptErrors";
    private const string MetricAlarmEvalErrors = "alarmEvalErrors";
    private const string MetricSfBufferDepth = "sfBufferDepth";
    private const string MetricDeadLetters = "deadLetters";
    private const string MetricParkedMessages = "parkedMessages";
    private const string MetricDeployedInstances = "deployedInstances";
    private const string MetricEnabledInstances = "enabledInstances";
    private const string MetricDisabledInstances = "disabledInstances";
    private const string MetricAuditBacklogPending = "auditBacklogPending";
    private const string MetricEventLogWriteFailures = "eventLogWriteFailures";

    private readonly ICentralHealthAggregator _aggregator;

    /// <summary>
    /// Creates the sample source over the central in-memory health aggregator.
    /// </summary>
    /// <param name="aggregator">The central health aggregator holding per-site state.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="aggregator"/> is <see langword="null"/>.
    /// </exception>
    public SiteHealthKpiSampleSource(ICentralHealthAggregator aggregator)
    {
        ArgumentNullException.ThrowIfNull(aggregator);
        _aggregator = aggregator;
    }

    /// <inheritdoc />
    public string Source => KpiSources.SiteHealth;

    /// <inheritdoc />
    /// <remarks>
    /// Runs synchronously over the aggregator's current site states and returns
    /// an already-completed task. If <paramref name="cancellationToken"/> is
    /// already signalled on entry, a canceled task is returned and the
    /// aggregator is not read.
    /// </remarks>
    public Task<IReadOnlyList<KpiSample>> CollectAsync(
        DateTime capturedAtUtc, CancellationToken cancellationToken = default)
    {
        // Honor cancellation through the returned task rather than a
        // synchronous throw, so callers observe it when they await.
        if (cancellationToken.IsCancellationRequested)
        {
            return Task.FromCanceled<IReadOnlyList<KpiSample>>(cancellationToken);
        }

        var samples = new List<KpiSample>();
        foreach (var (siteId, state) in _aggregator.GetAllSiteStates())
        {
            // Sites known only via heartbeats (no report yet) contribute nothing.
            if (state.LatestReport is { } report)
            {
                AddSiteSnapshot(samples, capturedAtUtc, siteId, report);
            }
        }

        return Task.FromResult<IReadOnlyList<KpiSample>>(samples);
    }

    /// <summary>
    /// Appends the per-site metric catalog for one site's latest report,
    /// mapping each metric to the corresponding
    /// <see cref="ZB.MOM.WW.ScadaBridge.Commons.Messages.Health.SiteHealthReport"/>
    /// field.
    /// </summary>
    /// <param name="samples">The list the new samples are appended to.</param>
    /// <param name="capturedAtUtc">Capture timestamp stamped on every sample.</param>
    /// <param name="siteId">Site identifier, used as each sample's scope key.</param>
    /// <param name="report">The site's latest health report.</param>
    private static void AddSiteSnapshot(
        List<KpiSample> samples,
        DateTime capturedAtUtc,
        string siteId,
        ZB.MOM.WW.ScadaBridge.Commons.Messages.Health.SiteHealthReport report)
    {
        // Connection statuses: Connected counts as up, everything else
        // (Disconnected / Connecting / Error) counts as down.
        var connectionsUp = 0;
        var connectionsDown = 0;
        foreach (var status in report.DataConnectionStatuses.Values)
        {
            if (status == ConnectionHealth.Connected)
            {
                connectionsUp++;
            }
            else
            {
                connectionsDown++;
            }
        }

        // Sum of the store-and-forward buffer depths across all buffers;
        // accumulated as long so the total is not limited to the element width.
        long sfBufferDepth = 0;
        foreach (var depth in report.StoreAndForwardBufferDepths.Values)
        {
            sfBufferDepth += depth;
        }

        // Emission order mirrors the declaration order of the metric constants.
        Emit(MetricConnectionsUp, connectionsUp);
        Emit(MetricConnectionsDown, connectionsDown);
        Emit(MetricScriptErrors, report.ScriptErrorCount);
        Emit(MetricAlarmEvalErrors, report.AlarmEvaluationErrorCount);
        Emit(MetricSfBufferDepth, sfBufferDepth);
        Emit(MetricDeadLetters, report.DeadLetterCount);
        Emit(MetricParkedMessages, report.ParkedMessageCount);
        Emit(MetricDeployedInstances, report.DeployedInstanceCount);
        Emit(MetricEnabledInstances, report.EnabledInstanceCount);
        Emit(MetricDisabledInstances, report.DisabledInstanceCount);
        // Nested audit-backlog object may be null (no data yet) — treat as 0.
        Emit(MetricAuditBacklogPending, report.SiteAuditBacklog?.PendingCount ?? 0);
        Emit(MetricEventLogWriteFailures, report.SiteEventLogWriteFailures);

        // Appends one sample for this site; keeps the catalog above to one line per metric.
        void Emit(string metric, double value) =>
            samples.Add(Sample(capturedAtUtc, metric, siteId, value));
    }

    /// <summary>
    /// Builds a single site-scoped <see cref="KpiSample"/> for this source.
    /// </summary>
    /// <param name="capturedAtUtc">Capture timestamp for the sample.</param>
    /// <param name="metric">Metric name from the catalog above.</param>
    /// <param name="scopeKey">Scope key of the sample (the site identifier).</param>
    /// <param name="value">Metric value.</param>
    private static KpiSample Sample(
        DateTime capturedAtUtc, string metric, string scopeKey, double value) =>
        new()
        {
            Source = KpiSources.SiteHealth,
            Metric = metric,
            Scope = KpiScopes.Site,
            ScopeKey = scopeKey,
            Value = value,
            CapturedAtUtc = capturedAtUtc,
        };
}
@@ -3,6 +3,7 @@ using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Kpi;
namespace ZB.MOM.WW.ScadaBridge.HealthMonitoring;
@@ -61,6 +62,14 @@ public static class ServiceCollectionExtensions
services.AddSingleton<ICentralHealthAggregator>(sp => sp.GetRequiredService<CentralHealthAggregator>());
services.AddHostedService(sp => sp.GetRequiredService<CentralHealthAggregator>());
services.AddHostedService<CentralHealthReportLoop>();
// M6 "KPI History & Trends" (K9): per-site Site Health KPI sample source.
// Reads the in-memory central aggregator (a singleton) rather than a
// repository; registered Scoped to match the recorder's per-tick scope
// and the other M6 sample sources (a scoped source over a singleton
// dependency is fine — no captive dependency).
services.TryAddEnumerable(
ServiceDescriptor.Scoped<IKpiSampleSource, Kpi.SiteHealthKpiSampleSource>());
return services;
}
@@ -0,0 +1,170 @@
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Kpi;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Health;
using ZB.MOM.WW.ScadaBridge.Commons.Types;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Kpi;
using ZB.MOM.WW.ScadaBridge.HealthMonitoring.Kpi;
namespace ZB.MOM.WW.ScadaBridge.HealthMonitoring.Tests.Kpi;
/// <summary>
/// M6 "KPI History &amp; Trends" (K9) coverage for
/// <see cref="SiteHealthKpiSampleSource"/>. The source reads the in-memory
/// <see cref="ICentralHealthAggregator"/> and emits per-site
/// (<see cref="KpiScopes.Site"/>) <see cref="KpiSample"/> rows for each site that
/// has reported a <see cref="SiteHealthReport"/>; sites known only via heartbeats
/// (null <see cref="SiteHealthState.LatestReport"/>) contribute nothing.
/// </summary>
public class SiteHealthKpiSampleSourceTests
{
    // Fixed capture instant so assertions on CapturedAtUtc are deterministic.
    private static readonly DateTime CapturedAt =
        new(2026, 6, 17, 12, 0, 0, DateTimeKind.Utc);

    [Fact]
    public void Constructor_NullAggregator_Throws()
    {
        // The constructor guards its dependency with ArgumentNullException.ThrowIfNull.
        Assert.Throws<ArgumentNullException>(() => new SiteHealthKpiSampleSource(null!));
    }

    [Fact]
    public void Source_Is_SiteHealth()
    {
        var source = new SiteHealthKpiSampleSource(new StubAggregator());

        Assert.Equal(KpiSources.SiteHealth, source.Source);
    }

    [Fact]
    public async Task CollectAsync_PopulatedSite_EmitsAllMetrics_NullReportSite_EmitsNothing()
    {
        // site-a: fully-populated report. site-b: heartbeat-only (null report).
        var report = new SiteHealthReport(
            SiteId: "site-a",
            SequenceNumber: 5,
            ReportTimestamp: CapturedAt,
            DataConnectionStatuses: new Dictionary<string, ConnectionHealth>
            {
                ["conn-1"] = ConnectionHealth.Connected,
                ["conn-2"] = ConnectionHealth.Connected,
                ["conn-3"] = ConnectionHealth.Disconnected,
                ["conn-4"] = ConnectionHealth.Connecting,
                ["conn-5"] = ConnectionHealth.Error,
            },
            TagResolutionCounts: new Dictionary<string, TagResolutionStatus>(),
            ScriptErrorCount: 3,
            AlarmEvaluationErrorCount: 4,
            StoreAndForwardBufferDepths: new Dictionary<string, int>
            {
                ["buf-1"] = 10,
                ["buf-2"] = 15,
            },
            DeadLetterCount: 6,
            DeployedInstanceCount: 20,
            EnabledInstanceCount: 18,
            DisabledInstanceCount: 2,
            ParkedMessageCount: 7,
            SiteAuditBacklog: new SiteAuditBacklogSnapshot(
                PendingCount: 9, OldestPendingUtc: null, OnDiskBytes: 0),
            SiteEventLogWriteFailures: 11);
        var aggregator = new StubAggregator
        {
            States =
            {
                ["site-a"] = StateWithReport("site-a", report),
                ["site-b"] = StateWithReport("site-b", null),
            },
        };
        var source = new SiteHealthKpiSampleSource(aggregator);

        var samples = await source.CollectAsync(CapturedAt);

        // Every sample is for site-a only — the null-report site yields nothing.
        Assert.All(samples, s =>
        {
            Assert.Equal(KpiSources.SiteHealth, s.Source);
            Assert.Equal(KpiScopes.Site, s.Scope);
            Assert.Equal("site-a", s.ScopeKey);
            Assert.Equal(CapturedAt, s.CapturedAtUtc);
        });
        Assert.DoesNotContain(samples, s => s.ScopeKey == "site-b");

        // Exact (Metric, Value) tuples for the populated site.
        var byMetric = samples.ToDictionary(s => s.Metric, s => s.Value);
        Assert.Equal(2, byMetric["connectionsUp"]);      // 2 Connected
        Assert.Equal(3, byMetric["connectionsDown"]);    // Disconnected + Connecting + Error
        Assert.Equal(3, byMetric["scriptErrors"]);
        Assert.Equal(4, byMetric["alarmEvalErrors"]);
        Assert.Equal(25, byMetric["sfBufferDepth"]);     // 10 + 15
        Assert.Equal(6, byMetric["deadLetters"]);
        Assert.Equal(7, byMetric["parkedMessages"]);
        Assert.Equal(20, byMetric["deployedInstances"]);
        Assert.Equal(18, byMetric["enabledInstances"]);
        Assert.Equal(2, byMetric["disabledInstances"]);
        Assert.Equal(9, byMetric["auditBacklogPending"]);
        Assert.Equal(11, byMetric["eventLogWriteFailures"]);

        // All 12 metrics emitted, exactly once each, for the one populated site.
        Assert.Equal(12, byMetric.Count);
        Assert.Equal(12, samples.Count);
    }

    [Fact]
    public async Task CollectAsync_MultipleReportingSites_EmitsFullCatalogPerSite()
    {
        // Two reporting sites, distinguishable by their script-error counts.
        var aggregator = new StubAggregator
        {
            States =
            {
                ["site-a"] = StateWithReport(
                    "site-a", MinimalReport("site-a") with { ScriptErrorCount = 1 }),
                ["site-b"] = StateWithReport(
                    "site-b", MinimalReport("site-b") with { ScriptErrorCount = 2 }),
            },
        };

        var samples = await new SiteHealthKpiSampleSource(aggregator).CollectAsync(CapturedAt);

        // 12 metrics per site, each keyed to the site it came from.
        Assert.Equal(24, samples.Count);
        Assert.Equal(12, samples.Count(s => s.ScopeKey == "site-a"));
        Assert.Equal(12, samples.Count(s => s.ScopeKey == "site-b"));
        Assert.Equal(
            1, samples.Single(s => s.ScopeKey == "site-a" && s.Metric == "scriptErrors").Value);
        Assert.Equal(
            2, samples.Single(s => s.ScopeKey == "site-b" && s.Metric == "scriptErrors").Value);
    }

    [Fact]
    public async Task CollectAsync_NullAuditBacklog_EmitsZeroAuditBacklogPending()
    {
        var report = MinimalReport("site-a") with { SiteAuditBacklog = null };
        var aggregator = new StubAggregator
        {
            States = { ["site-a"] = StateWithReport("site-a", report) },
        };

        var samples = await new SiteHealthKpiSampleSource(aggregator).CollectAsync(CapturedAt);

        Assert.Equal(0, samples.Single(s => s.Metric == "auditBacklogPending").Value);
    }

    [Fact]
    public async Task CollectAsync_NoSitesWithReports_ReturnsEmptyList()
    {
        var aggregator = new StubAggregator
        {
            States = { ["site-b"] = StateWithReport("site-b", null) },
        };

        var samples = await new SiteHealthKpiSampleSource(aggregator).CollectAsync(CapturedAt);

        Assert.NotNull(samples);
        Assert.Empty(samples);
    }

    /// <summary>
    /// Builds a <see cref="SiteHealthState"/> for <paramref name="siteId"/> holding
    /// <paramref name="report"/> as its latest report (null = heartbeat-only site).
    /// </summary>
    private static SiteHealthState StateWithReport(string siteId, SiteHealthReport? report) =>
        new() { SiteId = siteId, LatestReport = report };

    /// <summary>
    /// Builds a report with empty dictionaries and zero counts for the given site;
    /// tests override individual fields via <c>with</c>.
    /// </summary>
    private static SiteHealthReport MinimalReport(string siteId) =>
        new(
            SiteId: siteId,
            SequenceNumber: 1,
            ReportTimestamp: CapturedAt,
            DataConnectionStatuses: new Dictionary<string, ConnectionHealth>(),
            TagResolutionCounts: new Dictionary<string, TagResolutionStatus>(),
            ScriptErrorCount: 0,
            AlarmEvaluationErrorCount: 0,
            StoreAndForwardBufferDepths: new Dictionary<string, int>(),
            DeadLetterCount: 0,
            DeployedInstanceCount: 0,
            EnabledInstanceCount: 0,
            DisabledInstanceCount: 0);

    /// <summary>
    /// Hand-rolled <see cref="ICentralHealthAggregator"/> stub — the
    /// HealthMonitoring.Tests project has no mocking library. Only
    /// <see cref="GetAllSiteStates"/> is exercised by the source under test;
    /// the mutating members throw so accidental use fails loudly.
    /// </summary>
    private sealed class StubAggregator : ICentralHealthAggregator
    {
        public Dictionary<string, SiteHealthState> States { get; } = new();

        public IReadOnlyDictionary<string, SiteHealthState> GetAllSiteStates() => States;

        public SiteHealthState? GetSiteState(string siteId) =>
            States.TryGetValue(siteId, out var state) ? state : null;

        public void ProcessReport(SiteHealthReport report) =>
            throw new NotSupportedException();

        public void MarkHeartbeat(string siteId, DateTimeOffset receivedAt) =>
            throw new NotSupportedException();
    }
}