256 lines
10 KiB
C#
256 lines
10 KiB
C#
using Akka.Actor;
|
|
using Akka.TestKit.Xunit2;
|
|
using Microsoft.Extensions.DependencyInjection;
|
|
using Microsoft.Extensions.Logging.Abstractions;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Kpi;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Kpi;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
|
|
using ZB.MOM.WW.ScadaBridge.Commons.Types.Kpi;
|
|
|
|
namespace ZB.MOM.WW.ScadaBridge.KpiHistory.Tests;
|
|
|
|
/// <summary>
|
|
/// K4 tests for <see cref="KpiHistoryRecorderActor"/>. The actor's internal
|
|
/// <c>SampleTick</c>/<c>PurgeTick</c> messages are exposed to this assembly via
|
|
/// <c>InternalsVisibleTo</c> so a tick can be driven deterministically without
|
|
/// racing the periodic timer. Hand-rolled fakes (no mocking lib in this test
|
|
/// project) record what the recorder hands to the repository.
|
|
/// </summary>
|
|
public class KpiHistoryRecorderActorTests : TestKit
|
|
{
|
|
/// <summary>
|
|
/// A healthy sample source that returns a fixed set of samples, stamping each with the
|
|
/// <c>capturedAtUtc</c> the recorder supplies.
|
|
/// </summary>
|
|
private sealed class HealthySource : IKpiSampleSource
|
|
{
|
|
public string Source => KpiSources.NotificationOutbox;
|
|
|
|
public Task<IReadOnlyList<KpiSample>> CollectAsync(
|
|
DateTime capturedAtUtc, CancellationToken cancellationToken = default)
|
|
{
|
|
IReadOnlyList<KpiSample> samples = new List<KpiSample>
|
|
{
|
|
new()
|
|
{
|
|
Source = Source,
|
|
Metric = "QueueDepth",
|
|
Scope = KpiScopes.Global,
|
|
ScopeKey = null,
|
|
Value = 7,
|
|
CapturedAtUtc = capturedAtUtc,
|
|
},
|
|
new()
|
|
{
|
|
Source = Source,
|
|
Metric = "ParkedCount",
|
|
Scope = KpiScopes.Global,
|
|
ScopeKey = null,
|
|
Value = 2,
|
|
CapturedAtUtc = capturedAtUtc,
|
|
},
|
|
};
|
|
return Task.FromResult(samples);
|
|
}
|
|
}
|
|
|
|
/// <summary>A source whose <see cref="CollectAsync"/> always throws — must not abort the pass.</summary>
|
|
private sealed class ThrowingSource : IKpiSampleSource
|
|
{
|
|
public string Source => KpiSources.SiteCallAudit;
|
|
|
|
public Task<IReadOnlyList<KpiSample>> CollectAsync(
|
|
DateTime capturedAtUtc, CancellationToken cancellationToken = default) =>
|
|
throw new InvalidOperationException("simulated source failure");
|
|
}
|
|
|
|
/// <summary>
|
|
/// A source that throws on the first <see cref="CollectAsync"/> call and returns a
|
|
/// healthy sample on every subsequent call. Used to drive a faulted first tick followed
|
|
/// by a healthy second tick on the <em>same</em> actor instance.
|
|
/// </summary>
|
|
private sealed class ThrowOnceSource : IKpiSampleSource
|
|
{
|
|
private int _callCount;
|
|
|
|
public string Source => KpiSources.SiteCallAudit;
|
|
|
|
public Task<IReadOnlyList<KpiSample>> CollectAsync(
|
|
DateTime capturedAtUtc, CancellationToken cancellationToken = default)
|
|
{
|
|
if (Interlocked.Increment(ref _callCount) == 1)
|
|
throw new InvalidOperationException("simulated first-call source failure");
|
|
|
|
IReadOnlyList<KpiSample> samples = new[]
|
|
{
|
|
new KpiSample
|
|
{
|
|
Source = Source,
|
|
Metric = "RecoveredSample",
|
|
Scope = KpiScopes.Global,
|
|
ScopeKey = null,
|
|
Value = 1,
|
|
CapturedAtUtc = capturedAtUtc,
|
|
},
|
|
};
|
|
return Task.FromResult(samples);
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Recording repository fake. Captures the samples handed to
|
|
/// <see cref="RecordSamplesAsync"/> and the cut-off handed to
|
|
/// <see cref="PurgeOlderThanAsync"/>.
|
|
/// </summary>
|
|
private sealed class RecordingRepository : IKpiHistoryRepository
|
|
{
|
|
private readonly object _gate = new();
|
|
private readonly List<KpiSample> _recorded = new();
|
|
private DateTime? _purgeCutoff;
|
|
|
|
public IReadOnlyList<KpiSample> Recorded
|
|
{
|
|
get { lock (_gate) { return _recorded.ToArray(); } }
|
|
}
|
|
|
|
// PurgeOlderThanAsync runs on a threadpool thread; guard the field with
|
|
// the same _gate lock used by _recorded so test-thread reads are race-free.
|
|
public DateTime? PurgeCutoff
|
|
{
|
|
get { lock (_gate) { return _purgeCutoff; } }
|
|
}
|
|
|
|
public Task RecordSamplesAsync(
|
|
IReadOnlyCollection<KpiSample> samples, CancellationToken cancellationToken = default)
|
|
{
|
|
lock (_gate)
|
|
{
|
|
_recorded.AddRange(samples);
|
|
}
|
|
return Task.CompletedTask;
|
|
}
|
|
|
|
public Task<IReadOnlyList<KpiSeriesPoint>> GetRawSeriesAsync(
|
|
string source, string metric, string scope, string? scopeKey,
|
|
DateTime fromUtc, DateTime toUtc, CancellationToken cancellationToken = default) =>
|
|
Task.FromResult<IReadOnlyList<KpiSeriesPoint>>(Array.Empty<KpiSeriesPoint>());
|
|
|
|
public Task<int> PurgeOlderThanAsync(DateTime before, CancellationToken cancellationToken = default)
|
|
{
|
|
lock (_gate) { _purgeCutoff = before; }
|
|
return Task.FromResult(0);
|
|
}
|
|
}
|
|
|
|
private IServiceProvider BuildServiceProvider(
|
|
IKpiHistoryRepository repository, params IKpiSampleSource[] sources)
|
|
{
|
|
var services = new ServiceCollection();
|
|
// Mirror production: sources + repository are scoped, so the recorder opens a fresh
|
|
// scope per tick and resolves there.
|
|
foreach (var source in sources)
|
|
{
|
|
var captured = source;
|
|
services.AddScoped(_ => captured);
|
|
}
|
|
services.AddScoped(_ => repository);
|
|
return services.BuildServiceProvider();
|
|
}
|
|
|
|
/// <summary>
|
|
/// Creates the recorder with both timers set to a long interval so neither periodic timer
|
|
/// fires during a test — ticks are sent manually instead.
|
|
/// </summary>
|
|
private IActorRef CreateActor(IServiceProvider serviceProvider, KpiHistoryOptions? options = null)
|
|
{
|
|
return Sys.ActorOf(Props.Create(() => new KpiHistoryRecorderActor(
|
|
serviceProvider,
|
|
options ?? new KpiHistoryOptions
|
|
{
|
|
SampleInterval = TimeSpan.FromHours(1),
|
|
PurgeInterval = TimeSpan.FromHours(1),
|
|
},
|
|
NullLogger<KpiHistoryRecorderActor>.Instance)));
|
|
}
|
|
|
|
[Fact]
|
|
public void SampleTick_WritesHealthySourceSamples_AndThrowingSourceDoesNotAbortTick()
|
|
{
|
|
var repository = new RecordingRepository();
|
|
// Order the throwing source FIRST so the test also proves a throw early in the
|
|
// enumeration does not suppress a later healthy source.
|
|
var sp = BuildServiceProvider(repository, new ThrowingSource(), new HealthySource());
|
|
var actor = CreateActor(sp);
|
|
|
|
actor.Tell(KpiHistoryRecorderActor.SampleTick.Instance);
|
|
|
|
AwaitAssert(
|
|
() =>
|
|
{
|
|
// The healthy source's two samples were written despite the throwing source.
|
|
Assert.Equal(2, repository.Recorded.Count);
|
|
Assert.Contains(repository.Recorded, s => s.Metric == "QueueDepth" && s.Value == 7);
|
|
Assert.Contains(repository.Recorded, s => s.Metric == "ParkedCount" && s.Value == 2);
|
|
},
|
|
duration: TimeSpan.FromSeconds(3),
|
|
interval: TimeSpan.FromMilliseconds(50));
|
|
}
|
|
|
|
[Fact]
|
|
public void PurgeTick_CallsPurgeWithCutoff_AtUtcNowMinusRetentionDays()
|
|
{
|
|
var repository = new RecordingRepository();
|
|
var sp = BuildServiceProvider(repository, new HealthySource());
|
|
const int retentionDays = 30;
|
|
var actor = CreateActor(sp, new KpiHistoryOptions
|
|
{
|
|
SampleInterval = TimeSpan.FromHours(1),
|
|
PurgeInterval = TimeSpan.FromHours(1),
|
|
RetentionDays = retentionDays,
|
|
});
|
|
|
|
actor.Tell(KpiHistoryRecorderActor.PurgeTick.Instance);
|
|
|
|
AwaitAssert(
|
|
() =>
|
|
{
|
|
Assert.NotNull(repository.PurgeCutoff);
|
|
var expected = DateTime.UtcNow - TimeSpan.FromDays(retentionDays);
|
|
Assert.True(
|
|
Math.Abs((repository.PurgeCutoff!.Value - expected).TotalMinutes) < 1.0,
|
|
$"purge cutoff {repository.PurgeCutoff:o} should be within 1 minute of {expected:o}");
|
|
},
|
|
duration: TimeSpan.FromSeconds(3),
|
|
interval: TimeSpan.FromMilliseconds(50));
|
|
}
|
|
|
|
[Fact]
|
|
public void FaultedTick_DoesNotCrashActor_AndSubsequentTickStillRuns()
|
|
{
|
|
// ThrowOnceSource throws on the first CollectAsync call and returns a healthy
|
|
// sample on every subsequent call. This lets us send two ticks to the SAME
|
|
// actor instance and verify that:
|
|
// • The first tick (faulted source) records nothing but does not crash the actor.
|
|
// • The second tick reaches the same actor and records the recovered sample,
|
|
// proving the singleton's message loop is still alive after a faulted pass.
|
|
var repository = new RecordingRepository();
|
|
var sp = BuildServiceProvider(repository, new ThrowOnceSource());
|
|
var actor = CreateActor(sp);
|
|
|
|
// First tick: source throws on first call — caught per-source, nothing written, actor lives.
|
|
actor.Tell(KpiHistoryRecorderActor.SampleTick.Instance);
|
|
AwaitAssert(
|
|
() => Assert.Empty(repository.Recorded),
|
|
duration: TimeSpan.FromSeconds(2),
|
|
interval: TimeSpan.FromMilliseconds(50));
|
|
|
|
// Second tick to the SAME actor: source now returns a healthy sample.
|
|
// AwaitAssert confirms the actor processed the message and recorded it.
|
|
actor.Tell(KpiHistoryRecorderActor.SampleTick.Instance);
|
|
AwaitAssert(
|
|
() => Assert.Single(repository.Recorded),
|
|
duration: TimeSpan.FromSeconds(3),
|
|
interval: TimeSpan.FromMilliseconds(50));
|
|
}
|
|
}
|