diff --git a/src/ZB.MOM.WW.ScadaBridge.HealthMonitoring/Kpi/SiteHealthKpiSampleSource.cs b/src/ZB.MOM.WW.ScadaBridge.HealthMonitoring/Kpi/SiteHealthKpiSampleSource.cs
new file mode 100644
index 00000000..0ae3ae4b
--- /dev/null
+++ b/src/ZB.MOM.WW.ScadaBridge.HealthMonitoring/Kpi/SiteHealthKpiSampleSource.cs
@@ -0,0 +1,150 @@
+using ZB.MOM.WW.ScadaBridge.Commons.Entities.Kpi;
+using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Kpi;
+using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
+using ZB.MOM.WW.ScadaBridge.Commons.Types.Kpi;
+
+namespace ZB.MOM.WW.ScadaBridge.HealthMonitoring.Kpi;
+
+/// <summary>
+/// Site Health (#11) <see cref="IKpiSampleSource"/> for the M6 "KPI History &amp;
+/// Trends" backbone. Unlike the other M6 sample sources — which each query a
+/// central repository — this one reads the in-memory
+/// <see cref="ICentralHealthAggregator"/> state, snapshotting the latest
+/// <see cref="ZB.MOM.WW.ScadaBridge.Commons.Messages.Health.SiteHealthReport"/>
+/// each tracked site has reported into per-site (<see cref="KpiScopes.Site"/>)
+/// rows.
+/// </summary>
+/// <remarks>
+/// <para>
+/// Each sampling pass the central recorder enumerates this source and calls
+/// <see cref="CollectAsync"/>, which iterates
+/// <see cref="ICentralHealthAggregator.GetAllSiteStates"/> and, for every site
+/// whose <c>LatestReport</c> is non-null, emits the
+/// fixed metric catalog below. Sites known only via heartbeats (null
+/// <c>LatestReport</c>) contribute nothing.
+/// </para>
+/// <para>
+/// The aggregator is registered as a singleton (its state is shared, immutable,
+/// and safe to hand straight to callers); this source is nonetheless registered
+/// DI-scoped for consistency with the recorder's per-tick scope and the other
+/// M6 sample sources. A scoped consumer resolving a singleton dependency is the
+/// normal DI lifetime relationship and introduces no captive-dependency issue.
+/// </para>
+/// </remarks>
+public sealed class SiteHealthKpiSampleSource : IKpiSampleSource
+{
+    // ── Metric catalog (the M6-agreed metric-name strings for this source) ──
+    // Declaration order matches the emission order in AddSiteSnapshot.
+    private const string MetricConnectionsUp = "connectionsUp";
+    private const string MetricConnectionsDown = "connectionsDown";
+    private const string MetricScriptErrors = "scriptErrors";
+    private const string MetricAlarmEvalErrors = "alarmEvalErrors";
+    private const string MetricSfBufferDepth = "sfBufferDepth";
+    private const string MetricDeadLetters = "deadLetters";
+    private const string MetricParkedMessages = "parkedMessages";
+    private const string MetricDeployedInstances = "deployedInstances";
+    private const string MetricEnabledInstances = "enabledInstances";
+    private const string MetricDisabledInstances = "disabledInstances";
+    private const string MetricAuditBacklogPending = "auditBacklogPending";
+    private const string MetricEventLogWriteFailures = "eventLogWriteFailures";
+
+    private readonly ICentralHealthAggregator _aggregator;
+
+    /// <summary>
+    /// Creates the sample source over the central in-memory health aggregator.
+    /// </summary>
+    /// <param name="aggregator">The central health aggregator holding per-site state.</param>
+    public SiteHealthKpiSampleSource(ICentralHealthAggregator aggregator)
+    {
+        ArgumentNullException.ThrowIfNull(aggregator);
+        _aggregator = aggregator;
+    }
+
+    /// <inheritdoc />
+    public string Source => KpiSources.SiteHealth;
+
+    /// <inheritdoc />
+    public Task> CollectAsync(
+        DateTime capturedAtUtc, CancellationToken cancellationToken = default)
+    {
+        var samples = new List();
+
+        foreach (var (siteId, state) in _aggregator.GetAllSiteStates())
+        {
+            // Sites known only via heartbeats (no report yet) contribute nothing.
+            if (state.LatestReport is not { } report)
+            {
+                continue;
+            }
+
+            AddSiteSnapshot(samples, capturedAtUtc, siteId, report);
+        }
+
+        return Task.FromResult>(samples);
+    }
+
+    /// <summary>
+    /// Appends the per-site metric catalog for one site's latest report,
+    /// mapping each metric to the corresponding
+    /// <see cref="ZB.MOM.WW.ScadaBridge.Commons.Messages.Health.SiteHealthReport"/>
+    /// field.
+    /// </summary>
+    private static void AddSiteSnapshot(
+        List samples,
+        DateTime capturedAtUtc,
+        string siteId,
+        ZB.MOM.WW.ScadaBridge.Commons.Messages.Health.SiteHealthReport report)
+    {
+        // Connection statuses: Connected counts as up, everything else
+        // (Disconnected / Connecting / Error) counts as down.
+        var connectionsUp = 0;
+        var connectionsDown = 0;
+        foreach (var status in report.DataConnectionStatuses.Values)
+        {
+            if (status == ConnectionHealth.Connected)
+            {
+                connectionsUp++;
+            }
+            else
+            {
+                connectionsDown++;
+            }
+        }
+
+        // Sum of the store-and-forward buffer depths across all buffers.
+        long sfBufferDepth = 0;
+        foreach (var depth in report.StoreAndForwardBufferDepths.Values)
+        {
+            sfBufferDepth += depth;
+        }
+
+        samples.Add(Sample(capturedAtUtc, MetricConnectionsUp, siteId, connectionsUp));
+        samples.Add(Sample(capturedAtUtc, MetricConnectionsDown, siteId, connectionsDown));
+        samples.Add(Sample(capturedAtUtc, MetricScriptErrors, siteId, report.ScriptErrorCount));
+        samples.Add(Sample(capturedAtUtc, MetricAlarmEvalErrors, siteId, report.AlarmEvaluationErrorCount));
+        samples.Add(Sample(capturedAtUtc, MetricSfBufferDepth, siteId, sfBufferDepth));
+        samples.Add(Sample(capturedAtUtc, MetricDeadLetters, siteId, report.DeadLetterCount));
+        samples.Add(Sample(capturedAtUtc, MetricParkedMessages, siteId, report.ParkedMessageCount));
+        samples.Add(Sample(capturedAtUtc, MetricDeployedInstances, siteId, report.DeployedInstanceCount));
+        samples.Add(Sample(capturedAtUtc, MetricEnabledInstances, siteId, report.EnabledInstanceCount));
+        samples.Add(Sample(capturedAtUtc, MetricDisabledInstances, siteId, report.DisabledInstanceCount));
+        // Nested audit-backlog object may be null (no data yet) — treat as 0.
+        samples.Add(Sample(
+            capturedAtUtc, MetricAuditBacklogPending, siteId,
+            report.SiteAuditBacklog?.PendingCount ?? 0));
+        samples.Add(Sample(
+            capturedAtUtc, MetricEventLogWriteFailures, siteId, report.SiteEventLogWriteFailures));
+    }
+
+    private static KpiSample Sample(
+        DateTime capturedAtUtc, string metric, string scopeKey, double value) =>
+        new()
+        {
+            Source = KpiSources.SiteHealth,
+            Metric = metric,
+            Scope = KpiScopes.Site,
+            ScopeKey = scopeKey,
+            Value = value,
+            CapturedAtUtc = capturedAtUtc,
+        };
+}
diff --git a/src/ZB.MOM.WW.ScadaBridge.HealthMonitoring/ServiceCollectionExtensions.cs b/src/ZB.MOM.WW.ScadaBridge.HealthMonitoring/ServiceCollectionExtensions.cs
index 54e86fcc..84375ae1 100644
--- a/src/ZB.MOM.WW.ScadaBridge.HealthMonitoring/ServiceCollectionExtensions.cs
+++ b/src/ZB.MOM.WW.ScadaBridge.HealthMonitoring/ServiceCollectionExtensions.cs
@@ -3,6 +3,7 @@ using Microsoft.Extensions.DependencyInjection.Extensions;
 using Microsoft.Extensions.Hosting;
 using Microsoft.Extensions.Logging;
 using Microsoft.Extensions.Options;
+using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Kpi;
 
 namespace ZB.MOM.WW.ScadaBridge.HealthMonitoring;
 
@@ -61,6 +62,14 @@ public static class ServiceCollectionExtensions
         services.AddSingleton(sp => sp.GetRequiredService());
         services.AddHostedService(sp => sp.GetRequiredService());
         services.AddHostedService();
+
+        // M6 "KPI History & Trends" (K9): per-site Site Health KPI sample source.
+        // Reads the in-memory central aggregator (a singleton) rather than a
+        // repository; registered Scoped to match the recorder's per-tick scope
+        // and the other M6 sample sources (a scoped source over a singleton
+        // dependency is fine — no captive dependency).
+        services.TryAddEnumerable(
+            ServiceDescriptor.Scoped());
 
         return services;
     }
diff --git a/tests/ZB.MOM.WW.ScadaBridge.HealthMonitoring.Tests/Kpi/SiteHealthKpiSampleSourceTests.cs b/tests/ZB.MOM.WW.ScadaBridge.HealthMonitoring.Tests/Kpi/SiteHealthKpiSampleSourceTests.cs
new file mode 100644
index 00000000..5d2ff2a9
--- /dev/null
+++ b/tests/ZB.MOM.WW.ScadaBridge.HealthMonitoring.Tests/Kpi/SiteHealthKpiSampleSourceTests.cs
@@ -0,0 +1,170 @@
+using ZB.MOM.WW.ScadaBridge.Commons.Entities.Kpi;
+using ZB.MOM.WW.ScadaBridge.Commons.Messages.Health;
+using ZB.MOM.WW.ScadaBridge.Commons.Types;
+using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums;
+using ZB.MOM.WW.ScadaBridge.Commons.Types.Kpi;
+using ZB.MOM.WW.ScadaBridge.HealthMonitoring.Kpi;
+
+namespace ZB.MOM.WW.ScadaBridge.HealthMonitoring.Tests.Kpi;
+
+/// <summary>
+/// M6 "KPI History &amp; Trends" (K9) coverage for
+/// <see cref="SiteHealthKpiSampleSource"/>. The source reads the in-memory
+/// <see cref="ICentralHealthAggregator"/> and emits per-site
+/// (<see cref="KpiScopes.Site"/>) rows for each site that
+/// has reported a <see cref="SiteHealthReport"/>; sites known only via heartbeats
+/// (null <see cref="SiteHealthState.LatestReport"/>) contribute nothing.
+/// </summary>
+public class SiteHealthKpiSampleSourceTests
+{
+    private static readonly DateTime CapturedAt =
+        new(2026, 6, 17, 12, 0, 0, DateTimeKind.Utc);
+
+    [Fact]
+    public void Source_Is_SiteHealth()
+    {
+        var source = new SiteHealthKpiSampleSource(new StubAggregator());
+        Assert.Equal(KpiSources.SiteHealth, source.Source);
+    }
+
+    [Fact]
+    public async Task CollectAsync_PopulatedSite_EmitsAllMetrics_NullReportSite_EmitsNothing()
+    {
+        // site-a: fully-populated report. site-b: heartbeat-only (null report).
+        var report = new SiteHealthReport(
+            SiteId: "site-a",
+            SequenceNumber: 5,
+            ReportTimestamp: CapturedAt,
+            DataConnectionStatuses: new Dictionary
+            {
+                ["conn-1"] = ConnectionHealth.Connected,
+                ["conn-2"] = ConnectionHealth.Connected,
+                ["conn-3"] = ConnectionHealth.Disconnected,
+                ["conn-4"] = ConnectionHealth.Connecting,
+                ["conn-5"] = ConnectionHealth.Error,
+            },
+            TagResolutionCounts: new Dictionary(),
+            ScriptErrorCount: 3,
+            AlarmEvaluationErrorCount: 4,
+            StoreAndForwardBufferDepths: new Dictionary
+            {
+                ["buf-1"] = 10,
+                ["buf-2"] = 15,
+            },
+            DeadLetterCount: 6,
+            DeployedInstanceCount: 20,
+            EnabledInstanceCount: 18,
+            DisabledInstanceCount: 2,
+            ParkedMessageCount: 7,
+            SiteAuditBacklog: new SiteAuditBacklogSnapshot(
+                PendingCount: 9, OldestPendingUtc: null, OnDiskBytes: 0),
+            SiteEventLogWriteFailures: 11);
+
+        var aggregator = new StubAggregator
+        {
+            States =
+            {
+                ["site-a"] = new SiteHealthState { SiteId = "site-a", LatestReport = report },
+                ["site-b"] = new SiteHealthState { SiteId = "site-b", LatestReport = null },
+            },
+        };
+
+        var source = new SiteHealthKpiSampleSource(aggregator);
+
+        var samples = await source.CollectAsync(CapturedAt);
+
+        // Every sample is for site-a only — the null-report site yields nothing.
+        Assert.All(samples, s =>
+        {
+            Assert.Equal(KpiSources.SiteHealth, s.Source);
+            Assert.Equal(KpiScopes.Site, s.Scope);
+            Assert.Equal("site-a", s.ScopeKey);
+            Assert.Equal(CapturedAt, s.CapturedAtUtc);
+        });
+        Assert.DoesNotContain(samples, s => s.ScopeKey == "site-b");
+
+        // Exact (Metric, Value) tuples for the populated site.
+        var byMetric = samples.ToDictionary(s => s.Metric, s => s.Value);
+
+        Assert.Equal(2, byMetric["connectionsUp"]); // 2 Connected
+        Assert.Equal(3, byMetric["connectionsDown"]); // Disconnected + Connecting + Error
+        Assert.Equal(3, byMetric["scriptErrors"]);
+        Assert.Equal(4, byMetric["alarmEvalErrors"]);
+        Assert.Equal(25, byMetric["sfBufferDepth"]); // 10 + 15
+        Assert.Equal(6, byMetric["deadLetters"]);
+        Assert.Equal(7, byMetric["parkedMessages"]);
+        Assert.Equal(20, byMetric["deployedInstances"]);
+        Assert.Equal(18, byMetric["enabledInstances"]);
+        Assert.Equal(2, byMetric["disabledInstances"]);
+        Assert.Equal(9, byMetric["auditBacklogPending"]);
+        Assert.Equal(11, byMetric["eventLogWriteFailures"]);
+
+        // All 12 metrics emitted, exactly once each, for the one populated site.
+        Assert.Equal(12, byMetric.Count);
+        Assert.Equal(12, samples.Count);
+    }
+
+    [Fact]
+    public async Task CollectAsync_NullAuditBacklog_EmitsZeroAuditBacklogPending()
+    {
+        var report = MinimalReport("site-a") with { SiteAuditBacklog = null };
+        var aggregator = new StubAggregator
+        {
+            States = { ["site-a"] = new SiteHealthState { SiteId = "site-a", LatestReport = report } },
+        };
+
+        var samples = await new SiteHealthKpiSampleSource(aggregator).CollectAsync(CapturedAt);
+
+        Assert.Equal(0, samples.Single(s => s.Metric == "auditBacklogPending").Value);
+    }
+
+    [Fact]
+    public async Task CollectAsync_NoSitesWithReports_ReturnsEmptyList()
+    {
+        var aggregator = new StubAggregator
+        {
+            States = { ["site-b"] = new SiteHealthState { SiteId = "site-b", LatestReport = null } },
+        };
+
+        var samples = await new SiteHealthKpiSampleSource(aggregator).CollectAsync(CapturedAt);
+
+        Assert.NotNull(samples);
+        Assert.Empty(samples);
+    }
+
+    private static SiteHealthReport MinimalReport(string siteId) =>
+        new(
+            SiteId: siteId,
+            SequenceNumber: 1,
+            ReportTimestamp: CapturedAt,
+            DataConnectionStatuses: new Dictionary(),
+            TagResolutionCounts: new Dictionary(),
+            ScriptErrorCount: 0,
+            AlarmEvaluationErrorCount: 0,
+            StoreAndForwardBufferDepths: new Dictionary(),
+            DeadLetterCount: 0,
+            DeployedInstanceCount: 0,
+            EnabledInstanceCount: 0,
+            DisabledInstanceCount: 0);
+
+    /// <summary>
+    /// Hand-rolled <see cref="ICentralHealthAggregator"/> stub — the
+    /// HealthMonitoring.Tests project has no mocking library. Only
+    /// <see cref="GetAllSiteStates"/> is exercised by the source under test.
+    /// </summary>
+    private sealed class StubAggregator : ICentralHealthAggregator
+    {
+        public Dictionary States { get; } = new();
+
+        public IReadOnlyDictionary GetAllSiteStates() => States;
+
+        public SiteHealthState? GetSiteState(string siteId) =>
+            States.TryGetValue(siteId, out var state) ? state : null;
+
+        public void ProcessReport(SiteHealthReport report) =>
+            throw new NotSupportedException();
+
+        public void MarkHeartbeat(string siteId, DateTimeOffset receivedAt) =>
+            throw new NotSupportedException();
+    }
+}