feat(kpi): K6 — NotificationOutbox sample source (global/site/node)

This commit is contained in:
Joseph Doherty
2026-06-17 19:53:39 -04:00
parent 9ffa34d3e7
commit 0d6c026dff
4 changed files with 331 additions and 0 deletions
@@ -0,0 +1,142 @@
using Microsoft.Extensions.Options;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Kpi;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Kpi;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Kpi;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Notifications;
namespace ZB.MOM.WW.ScadaBridge.NotificationOutbox.Kpi;
/// <summary>
/// <see cref="IKpiSampleSource"/> for the Notification Outbox (#21): snapshots the same
/// point-in-time delivery KPIs the live Health-dashboard tiles surface — queue depth, stuck
/// count, parked count, delivered-last-interval, and oldest-pending age — at every sampling
/// pass of the central KPI-history recorder (M6 "KPI History &amp; Trends").
/// </summary>
/// <remarks>
/// <para>
/// Computes the same cutoffs the live KPI handlers in
/// <see cref="NotificationOutboxActor"/> use, anchored on the recorder's shared
/// <c>capturedAtUtc</c> rather than wall-clock <c>now</c>: the stuck cutoff is
/// <c>capturedAtUtc - <see cref="NotificationOutboxOptions.StuckAgeThreshold"/></c> and the
/// delivered window is <c>capturedAtUtc - <see cref="NotificationOutboxOptions.DeliveredKpiWindow"/></c>.
/// So a sample captured at the same instant equals the live tile.
/// </para>
/// <para>
/// Emits Global (<c>ScopeKey == null</c>), per-Site (<c>ScopeKey == SourceSiteId</c>), and
/// per-Node (<c>ScopeKey == SourceNode</c>) samples, mirroring the repository's three KPI
/// computation methods. The oldest-pending age (a <see cref="TimeSpan"/>?) maps to
/// <c>oldestPendingAgeSeconds</c> via <see cref="TimeSpan.TotalSeconds"/>; that one metric is
/// omitted when the age is <c>null</c>.
/// </para>
/// </remarks>
public sealed class NotificationOutboxKpiSampleSource : IKpiSampleSource
{
private const string MetricQueueDepth = "queueDepth";
private const string MetricStuckCount = "stuckCount";
private const string MetricParkedCount = "parkedCount";
private const string MetricDeliveredLastInterval = "deliveredLastInterval";
private const string MetricOldestPendingAgeSeconds = "oldestPendingAgeSeconds";
private readonly INotificationOutboxRepository _repository;
private readonly NotificationOutboxOptions _options;
/// <summary>
/// Creates the sample source.
/// </summary>
/// <param name="repository">Outbox repository providing the KPI computation methods.</param>
/// <param name="options">Outbox options carrying the stuck-age and delivered-window cutoffs.</param>
public NotificationOutboxKpiSampleSource(
INotificationOutboxRepository repository,
IOptions<NotificationOutboxOptions> options)
{
ArgumentNullException.ThrowIfNull(repository);
ArgumentNullException.ThrowIfNull(options);
_repository = repository;
_options = options.Value;
}
/// <inheritdoc />
public string Source => KpiSources.NotificationOutbox;
/// <inheritdoc />
public async Task<IReadOnlyList<KpiSample>> CollectAsync(
DateTime capturedAtUtc, CancellationToken cancellationToken = default)
{
// Anchor the live KPI cutoffs on the recorder's shared capture instant, so a sample
// captured at the same moment equals the live Health-dashboard tile.
var capturedAt = new DateTimeOffset(capturedAtUtc, TimeSpan.Zero);
var stuckCutoff = capturedAt - _options.StuckAgeThreshold;
var deliveredSince = capturedAt - _options.DeliveredKpiWindow;
var samples = new List<KpiSample>();
var global = await _repository.ComputeKpisAsync(stuckCutoff, deliveredSince, cancellationToken)
.ConfigureAwait(false);
AddSnapshot(
samples, capturedAtUtc, KpiScopes.Global, scopeKey: null,
global.QueueDepth, global.StuckCount, global.ParkedCount,
global.DeliveredLastInterval, global.OldestPendingAge);
var perSite = await _repository.ComputePerSiteKpisAsync(stuckCutoff, deliveredSince, cancellationToken)
.ConfigureAwait(false);
foreach (var site in perSite)
{
AddSnapshot(
samples, capturedAtUtc, KpiScopes.Site, site.SourceSiteId,
site.QueueDepth, site.StuckCount, site.ParkedCount,
site.DeliveredLastInterval, site.OldestPendingAge);
}
var perNode = await _repository.ComputePerNodeKpisAsync(stuckCutoff, deliveredSince, cancellationToken)
.ConfigureAwait(false);
foreach (var node in perNode)
{
AddSnapshot(
samples, capturedAtUtc, KpiScopes.Node, node.SourceNode,
node.QueueDepth, node.StuckCount, node.ParkedCount,
node.DeliveredLastInterval, node.OldestPendingAge);
}
return samples;
}
/// <summary>
/// Appends the five outbox metrics for one snapshot at the given scope, omitting
/// <c>oldestPendingAgeSeconds</c> when <paramref name="oldestPendingAge"/> is <c>null</c>.
/// </summary>
private void AddSnapshot(
List<KpiSample> samples,
DateTime capturedAtUtc,
string scope,
string? scopeKey,
int queueDepth,
int stuckCount,
int parkedCount,
int deliveredLastInterval,
TimeSpan? oldestPendingAge)
{
samples.Add(Sample(capturedAtUtc, MetricQueueDepth, scope, scopeKey, queueDepth));
samples.Add(Sample(capturedAtUtc, MetricStuckCount, scope, scopeKey, stuckCount));
samples.Add(Sample(capturedAtUtc, MetricParkedCount, scope, scopeKey, parkedCount));
samples.Add(Sample(capturedAtUtc, MetricDeliveredLastInterval, scope, scopeKey, deliveredLastInterval));
if (oldestPendingAge is { } age)
{
samples.Add(Sample(capturedAtUtc, MetricOldestPendingAgeSeconds, scope, scopeKey, age.TotalSeconds));
}
}
private KpiSample Sample(
DateTime capturedAtUtc, string metric, string scope, string? scopeKey, double value) =>
new()
{
Source = KpiSources.NotificationOutbox,
Metric = metric,
Scope = scope,
ScopeKey = scopeKey,
Value = value,
CapturedAtUtc = capturedAtUtc,
};
}
@@ -1,5 +1,7 @@
using Microsoft.Extensions.DependencyInjection;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Kpi;
using ZB.MOM.WW.ScadaBridge.NotificationOutbox.Delivery;
using ZB.MOM.WW.ScadaBridge.NotificationOutbox.Kpi;
namespace ZB.MOM.WW.ScadaBridge.NotificationOutbox;
@@ -46,6 +48,10 @@ public static class ServiceCollectionExtensions
services.AddScoped<INotificationDeliveryAdapter>(
sp => sp.GetRequiredService<EmailNotificationDeliveryAdapter>());
// KPI history (M6): the recorder singleton enumerates every IKpiSampleSource each
// sampling pass to snapshot the outbox delivery KPIs into the central history store.
services.AddScoped<IKpiSampleSource, NotificationOutboxKpiSampleSource>();
return services;
}
}
@@ -0,0 +1,182 @@
using Microsoft.Extensions.Options;
using NSubstitute;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Kpi;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Kpi;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Notifications;
using ZB.MOM.WW.ScadaBridge.NotificationOutbox;
using ZB.MOM.WW.ScadaBridge.NotificationOutbox.Kpi;
namespace ZB.MOM.WW.ScadaBridge.NotificationService.Tests.Kpi;
/// <summary>
/// Tests for <see cref="NotificationOutboxKpiSampleSource"/> — the M6 KPI sample source that
/// snapshots the Notification Outbox delivery KPIs (global / per-site / per-node) into the
/// central KPI-history store.
/// </summary>
public class NotificationOutboxKpiSampleSourceTests
{
private static readonly DateTime CapturedAt = new(2026, 6, 15, 12, 0, 0, DateTimeKind.Utc);
private static readonly NotificationOutboxOptions Options = new()
{
StuckAgeThreshold = TimeSpan.FromMinutes(10),
DeliveredKpiWindow = TimeSpan.FromMinutes(1),
};
private static NotificationOutboxKpiSampleSource CreateSource(INotificationOutboxRepository repository) =>
new(repository, Microsoft.Extensions.Options.Options.Create(Options));
[Fact]
public void Source_IsNotificationOutbox()
{
var source = CreateSource(Substitute.For<INotificationOutboxRepository>());
Assert.Equal(KpiSources.NotificationOutbox, source.Source);
}
[Fact]
public async Task CollectAsync_PassesCutoffsAnchoredOnCapturedAt()
{
var repository = Substitute.For<INotificationOutboxRepository>();
StubEmptySnapshots(repository);
var source = CreateSource(repository);
await source.CollectAsync(CapturedAt);
var expectedStuckCutoff = new DateTimeOffset(CapturedAt, TimeSpan.Zero) - Options.StuckAgeThreshold;
var expectedDeliveredSince = new DateTimeOffset(CapturedAt, TimeSpan.Zero) - Options.DeliveredKpiWindow;
await repository.Received(1).ComputeKpisAsync(
expectedStuckCutoff, expectedDeliveredSince, Arg.Any<CancellationToken>());
await repository.Received(1).ComputePerSiteKpisAsync(
expectedStuckCutoff, expectedDeliveredSince, Arg.Any<CancellationToken>());
await repository.Received(1).ComputePerNodeKpisAsync(
expectedStuckCutoff, expectedDeliveredSince, Arg.Any<CancellationToken>());
}
[Fact]
public async Task CollectAsync_EmitsGlobalSiteAndNodeSamples_WithExpectedTuples()
{
var repository = Substitute.For<INotificationOutboxRepository>();
repository.ComputeKpisAsync(Arg.Any<DateTimeOffset>(), Arg.Any<DateTimeOffset>(), Arg.Any<CancellationToken>())
.Returns(new NotificationKpiSnapshot(
QueueDepth: 5,
StuckCount: 2,
ParkedCount: 1,
DeliveredLastInterval: 7,
OldestPendingAge: TimeSpan.FromSeconds(90)));
repository.ComputePerSiteKpisAsync(Arg.Any<DateTimeOffset>(), Arg.Any<DateTimeOffset>(), Arg.Any<CancellationToken>())
.Returns(new[]
{
new SiteNotificationKpiSnapshot(
SourceSiteId: "site-a",
QueueDepth: 3,
StuckCount: 1,
ParkedCount: 0,
DeliveredLastInterval: 4,
OldestPendingAge: TimeSpan.FromSeconds(30)),
});
repository.ComputePerNodeKpisAsync(Arg.Any<DateTimeOffset>(), Arg.Any<DateTimeOffset>(), Arg.Any<CancellationToken>())
.Returns(new[]
{
new NodeNotificationKpiSnapshot(
SourceNode: "node-a",
QueueDepth: 2,
StuckCount: 1,
ParkedCount: 1,
DeliveredLastInterval: 3,
OldestPendingAge: TimeSpan.FromSeconds(60)),
});
var source = CreateSource(repository);
var samples = await source.CollectAsync(CapturedAt);
// 3 scopes × 5 metrics (all ages non-null) = 15 samples.
Assert.Equal(15, samples.Count);
Assert.All(samples, s => Assert.Equal(KpiSources.NotificationOutbox, s.Source));
Assert.All(samples, s => Assert.Equal(CapturedAt, s.CapturedAtUtc));
// Global — null ScopeKey.
AssertSample(samples, "queueDepth", KpiScopes.Global, null, 5);
AssertSample(samples, "stuckCount", KpiScopes.Global, null, 2);
AssertSample(samples, "parkedCount", KpiScopes.Global, null, 1);
AssertSample(samples, "deliveredLastInterval", KpiScopes.Global, null, 7);
AssertSample(samples, "oldestPendingAgeSeconds", KpiScopes.Global, null, 90);
// Site — ScopeKey == site id.
AssertSample(samples, "queueDepth", KpiScopes.Site, "site-a", 3);
AssertSample(samples, "stuckCount", KpiScopes.Site, "site-a", 1);
AssertSample(samples, "parkedCount", KpiScopes.Site, "site-a", 0);
AssertSample(samples, "deliveredLastInterval", KpiScopes.Site, "site-a", 4);
AssertSample(samples, "oldestPendingAgeSeconds", KpiScopes.Site, "site-a", 30);
// Node — ScopeKey == node name.
AssertSample(samples, "queueDepth", KpiScopes.Node, "node-a", 2);
AssertSample(samples, "stuckCount", KpiScopes.Node, "node-a", 1);
AssertSample(samples, "parkedCount", KpiScopes.Node, "node-a", 1);
AssertSample(samples, "deliveredLastInterval", KpiScopes.Node, "node-a", 3);
AssertSample(samples, "oldestPendingAgeSeconds", KpiScopes.Node, "node-a", 60);
}
[Fact]
public async Task CollectAsync_OmitsOldestPendingAge_WhenNull()
{
var repository = Substitute.For<INotificationOutboxRepository>();
repository.ComputeKpisAsync(Arg.Any<DateTimeOffset>(), Arg.Any<DateTimeOffset>(), Arg.Any<CancellationToken>())
.Returns(new NotificationKpiSnapshot(
QueueDepth: 0,
StuckCount: 0,
ParkedCount: 0,
DeliveredLastInterval: 0,
OldestPendingAge: null));
repository.ComputePerSiteKpisAsync(Arg.Any<DateTimeOffset>(), Arg.Any<DateTimeOffset>(), Arg.Any<CancellationToken>())
.Returns(Array.Empty<SiteNotificationKpiSnapshot>());
repository.ComputePerNodeKpisAsync(Arg.Any<DateTimeOffset>(), Arg.Any<DateTimeOffset>(), Arg.Any<CancellationToken>())
.Returns(Array.Empty<NodeNotificationKpiSnapshot>());
var source = CreateSource(repository);
var samples = await source.CollectAsync(CapturedAt);
// Only the global snapshot, age omitted -> the four count metrics.
Assert.Equal(4, samples.Count);
Assert.DoesNotContain(samples, s => s.Metric == "oldestPendingAgeSeconds");
AssertSample(samples, "queueDepth", KpiScopes.Global, null, 0);
AssertSample(samples, "stuckCount", KpiScopes.Global, null, 0);
AssertSample(samples, "parkedCount", KpiScopes.Global, null, 0);
AssertSample(samples, "deliveredLastInterval", KpiScopes.Global, null, 0);
}
[Fact]
public async Task CollectAsync_ReturnsEmptyList_NeverNull_WhenNothingToReport()
{
// ComputeKpisAsync always returns a global snapshot; the only way the list is empty is
// a guard that produces no samples. Confirm an all-zero global with null age still yields
// the four count metrics (i.e. the list is never null even at idle).
var repository = Substitute.For<INotificationOutboxRepository>();
StubEmptySnapshots(repository);
var source = CreateSource(repository);
var samples = await source.CollectAsync(CapturedAt);
Assert.NotNull(samples);
}
private static void StubEmptySnapshots(INotificationOutboxRepository repository)
{
repository.ComputeKpisAsync(Arg.Any<DateTimeOffset>(), Arg.Any<DateTimeOffset>(), Arg.Any<CancellationToken>())
.Returns(new NotificationKpiSnapshot(0, 0, 0, 0, null));
repository.ComputePerSiteKpisAsync(Arg.Any<DateTimeOffset>(), Arg.Any<DateTimeOffset>(), Arg.Any<CancellationToken>())
.Returns(Array.Empty<SiteNotificationKpiSnapshot>());
repository.ComputePerNodeKpisAsync(Arg.Any<DateTimeOffset>(), Arg.Any<DateTimeOffset>(), Arg.Any<CancellationToken>())
.Returns(Array.Empty<NodeNotificationKpiSnapshot>());
}
private static void AssertSample(
IReadOnlyList<KpiSample> samples, string metric, string scope, string? scopeKey, double value)
{
var match = Assert.Single(
samples, s => s.Metric == metric && s.Scope == scope && s.ScopeKey == scopeKey);
Assert.Equal(value, match.Value);
}
}
@@ -23,6 +23,7 @@
<ItemGroup>
<ProjectReference Include="../../src/ZB.MOM.WW.ScadaBridge.NotificationService/ZB.MOM.WW.ScadaBridge.NotificationService.csproj" />
<ProjectReference Include="../../src/ZB.MOM.WW.ScadaBridge.NotificationOutbox/ZB.MOM.WW.ScadaBridge.NotificationOutbox.csproj" />
<ProjectReference Include="../../src/ZB.MOM.WW.ScadaBridge.Commons/ZB.MOM.WW.ScadaBridge.Commons.csproj" />
<ProjectReference Include="../../src/ZB.MOM.WW.ScadaBridge.StoreAndForward/ZB.MOM.WW.ScadaBridge.StoreAndForward.csproj" />
</ItemGroup>