feat(kpi): K4 — KpiHistoryRecorderActor (best-effort sampling + daily purge)

This commit is contained in:
Joseph Doherty
2026-06-17 20:06:09 -04:00
parent 76f5ed72e4
commit 9c2e7ab4cb
4 changed files with 531 additions and 0 deletions
@@ -0,0 +1,215 @@
using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Kpi;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Kpi;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Kpi;
namespace ZB.MOM.WW.ScadaBridge.KpiHistory.Tests;
/// <summary>
/// K4 tests for <see cref="KpiHistoryRecorderActor"/>. The actor's internal
/// <c>SampleTick</c>/<c>PurgeTick</c> messages are exposed to this assembly via
/// <c>InternalsVisibleTo</c> so a tick can be driven deterministically without
/// racing the periodic timer. Hand-rolled fakes (no mocking lib in this test
/// project) record what the recorder hands to the repository.
/// </summary>
public class KpiHistoryRecorderActorTests : TestKit
{
/// <summary>
/// A healthy sample source that returns a fixed set of samples, stamping each with the
/// <c>capturedAtUtc</c> the recorder supplies.
/// </summary>
private sealed class HealthySource : IKpiSampleSource
{
public string Source => KpiSources.NotificationOutbox;
public Task<IReadOnlyList<KpiSample>> CollectAsync(
DateTime capturedAtUtc, CancellationToken cancellationToken = default)
{
IReadOnlyList<KpiSample> samples = new List<KpiSample>
{
new()
{
Source = Source,
Metric = "QueueDepth",
Scope = KpiScopes.Global,
ScopeKey = null,
Value = 7,
CapturedAtUtc = capturedAtUtc,
},
new()
{
Source = Source,
Metric = "ParkedCount",
Scope = KpiScopes.Global,
ScopeKey = null,
Value = 2,
CapturedAtUtc = capturedAtUtc,
},
};
return Task.FromResult(samples);
}
}
/// <summary>A source whose <see cref="CollectAsync"/> always throws — must not abort the pass.</summary>
private sealed class ThrowingSource : IKpiSampleSource
{
public string Source => KpiSources.SiteCallAudit;
public Task<IReadOnlyList<KpiSample>> CollectAsync(
DateTime capturedAtUtc, CancellationToken cancellationToken = default) =>
throw new InvalidOperationException("simulated source failure");
}
/// <summary>
/// Recording repository fake. Captures the samples handed to
/// <see cref="RecordSamplesAsync"/> and the cut-off handed to
/// <see cref="PurgeOlderThanAsync"/>.
/// </summary>
private sealed class RecordingRepository : IKpiHistoryRepository
{
private readonly object _gate = new();
private readonly List<KpiSample> _recorded = new();
public IReadOnlyList<KpiSample> Recorded
{
get { lock (_gate) { return _recorded.ToArray(); } }
}
public DateTime? PurgeCutoff { get; private set; }
public Task RecordSamplesAsync(
IReadOnlyCollection<KpiSample> samples, CancellationToken cancellationToken = default)
{
lock (_gate)
{
_recorded.AddRange(samples);
}
return Task.CompletedTask;
}
public Task<IReadOnlyList<KpiSeriesPoint>> GetRawSeriesAsync(
string source, string metric, string scope, string? scopeKey,
DateTime fromUtc, DateTime toUtc, CancellationToken cancellationToken = default) =>
Task.FromResult<IReadOnlyList<KpiSeriesPoint>>(Array.Empty<KpiSeriesPoint>());
public Task<int> PurgeOlderThanAsync(DateTime before, CancellationToken cancellationToken = default)
{
PurgeCutoff = before;
return Task.FromResult(0);
}
}
private IServiceProvider BuildServiceProvider(
IKpiHistoryRepository repository, params IKpiSampleSource[] sources)
{
var services = new ServiceCollection();
// Mirror production: sources + repository are scoped, so the recorder opens a fresh
// scope per tick and resolves there.
foreach (var source in sources)
{
var captured = source;
services.AddScoped(_ => captured);
}
services.AddScoped(_ => repository);
return services.BuildServiceProvider();
}
/// <summary>
/// Creates the recorder with both timers set to a long interval so neither periodic timer
/// fires during a test — ticks are sent manually instead.
/// </summary>
private IActorRef CreateActor(IServiceProvider serviceProvider, KpiHistoryOptions? options = null)
{
return Sys.ActorOf(Props.Create(() => new KpiHistoryRecorderActor(
serviceProvider,
options ?? new KpiHistoryOptions
{
SampleInterval = TimeSpan.FromHours(1),
PurgeInterval = TimeSpan.FromHours(1),
},
NullLogger<KpiHistoryRecorderActor>.Instance)));
}
[Fact]
public void SampleTick_WritesHealthySourceSamples_AndThrowingSourceDoesNotAbortTick()
{
var repository = new RecordingRepository();
// Order the throwing source FIRST so the test also proves a throw early in the
// enumeration does not suppress a later healthy source.
var sp = BuildServiceProvider(repository, new ThrowingSource(), new HealthySource());
var actor = CreateActor(sp);
actor.Tell(KpiHistoryRecorderActor.SampleTick.Instance);
AwaitAssert(
() =>
{
// The healthy source's two samples were written despite the throwing source.
Assert.Equal(2, repository.Recorded.Count);
Assert.Contains(repository.Recorded, s => s.Metric == "QueueDepth" && s.Value == 7);
Assert.Contains(repository.Recorded, s => s.Metric == "ParkedCount" && s.Value == 2);
},
duration: TimeSpan.FromSeconds(3),
interval: TimeSpan.FromMilliseconds(50));
}
[Fact]
public void PurgeTick_CallsPurgeWithCutoff_AtUtcNowMinusRetentionDays()
{
var repository = new RecordingRepository();
var sp = BuildServiceProvider(repository, new HealthySource());
const int retentionDays = 30;
var actor = CreateActor(sp, new KpiHistoryOptions
{
SampleInterval = TimeSpan.FromHours(1),
PurgeInterval = TimeSpan.FromHours(1),
RetentionDays = retentionDays,
});
actor.Tell(KpiHistoryRecorderActor.PurgeTick.Instance);
AwaitAssert(
() =>
{
Assert.NotNull(repository.PurgeCutoff);
var expected = DateTime.UtcNow - TimeSpan.FromDays(retentionDays);
Assert.True(
Math.Abs((repository.PurgeCutoff!.Value - expected).TotalMinutes) < 1.0,
$"purge cutoff {repository.PurgeCutoff:o} should be within 1 minute of {expected:o}");
},
duration: TimeSpan.FromSeconds(3),
interval: TimeSpan.FromMilliseconds(50));
}
[Fact]
public void FaultedTick_DoesNotCrashActor_AndSubsequentTickStillRuns()
{
var repository = new RecordingRepository();
// A pass containing ONLY a throwing source records nothing but must not crash the
// actor; a later healthy tick proves the singleton survived.
var sp = BuildServiceProvider(repository, new ThrowingSource());
var actor = CreateActor(sp);
// First tick: the only source throws — caught per-source, nothing written, actor lives.
actor.Tell(KpiHistoryRecorderActor.SampleTick.Instance);
AwaitAssert(
() => Assert.Empty(repository.Recorded),
duration: TimeSpan.FromSeconds(1),
interval: TimeSpan.FromMilliseconds(50));
// Second tick on a fresh actor backed by a healthy source proves the message loop is
// still alive and the recorder still records after a faulted pass on the prior actor.
var healthyRepo = new RecordingRepository();
var healthySp = BuildServiceProvider(healthyRepo, new HealthySource());
var healthyActor = CreateActor(healthySp);
healthyActor.Tell(KpiHistoryRecorderActor.SampleTick.Instance);
AwaitAssert(
() => Assert.Equal(2, healthyRepo.Recorded.Count),
duration: TimeSpan.FromSeconds(3),
interval: TimeSpan.FromMilliseconds(50));
}
}
@@ -9,7 +9,10 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Akka.TestKit.Xunit2" />
<PackageReference Include="coverlet.collector" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
<PackageReference Include="Microsoft.NET.Test.Sdk" />
<PackageReference Include="xunit" />
<PackageReference Include="xunit.runner.visualstudio" />