using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Kpi;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Kpi;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Kpi;

namespace ZB.MOM.WW.ScadaBridge.KpiHistory.Tests;

/// <summary>
/// K4 tests for <see cref="KpiHistoryRecorderActor"/>. The actor's internal
/// SampleTick/PurgeTick messages are exposed to this assembly via
/// InternalsVisibleTo so a tick can be driven deterministically without
/// racing the periodic timer. Hand-rolled fakes (no mocking lib in this test
/// project) record what the recorder hands to the repository.
/// </summary>
public class KpiHistoryRecorderActorTests : TestKit
{
    /// <summary>
    /// A healthy sample source that returns a fixed set of samples, stamping each with the
    /// <c>capturedAtUtc</c> the recorder supplies.
    /// </summary>
    private sealed class HealthySource : IKpiSampleSource
    {
        public string Source => KpiSources.NotificationOutbox;

        public Task<IReadOnlyList<KpiSample>> CollectAsync(
            DateTime capturedAtUtc,
            CancellationToken cancellationToken = default)
        {
            IReadOnlyList<KpiSample> samples = new List<KpiSample>
            {
                new()
                {
                    Source = Source,
                    Metric = "QueueDepth",
                    Scope = KpiScopes.Global,
                    ScopeKey = null,
                    Value = 7,
                    CapturedAtUtc = capturedAtUtc,
                },
                new()
                {
                    Source = Source,
                    Metric = "ParkedCount",
                    Scope = KpiScopes.Global,
                    ScopeKey = null,
                    Value = 2,
                    CapturedAtUtc = capturedAtUtc,
                },
            };

            return Task.FromResult(samples);
        }
    }

    /// <summary>
    /// A source whose <see cref="CollectAsync"/> always throws — must not abort the pass.
    /// </summary>
    private sealed class ThrowingSource : IKpiSampleSource
    {
        public string Source => KpiSources.SiteCallAudit;

        public Task<IReadOnlyList<KpiSample>> CollectAsync(
            DateTime capturedAtUtc,
            CancellationToken cancellationToken = default)
            => throw new InvalidOperationException("simulated source failure");
    }

    /// <summary>
    /// A source that throws on the first <see cref="CollectAsync"/> call and returns a
    /// healthy sample on every subsequent call. Used to drive a faulted first tick followed
    /// by a healthy second tick on the same actor instance.
    /// </summary>
    private sealed class ThrowOnceSource : IKpiSampleSource
    {
        private int _callCount;

        public string Source => KpiSources.SiteCallAudit;

        /// <summary>
        /// Number of <see cref="CollectAsync"/> invocations so far. Lets a test wait until
        /// the faulting call has actually happened before asserting on its outcome.
        /// </summary>
        public int CallCount => Volatile.Read(ref _callCount);

        public Task<IReadOnlyList<KpiSample>> CollectAsync(
            DateTime capturedAtUtc,
            CancellationToken cancellationToken = default)
        {
            // Interlocked keeps the "first call" decision atomic even if calls overlap.
            if (Interlocked.Increment(ref _callCount) == 1)
            {
                throw new InvalidOperationException("simulated first-call source failure");
            }

            IReadOnlyList<KpiSample> samples = new[]
            {
                new KpiSample
                {
                    Source = Source,
                    Metric = "RecoveredSample",
                    Scope = KpiScopes.Global,
                    ScopeKey = null,
                    Value = 1,
                    CapturedAtUtc = capturedAtUtc,
                },
            };

            return Task.FromResult(samples);
        }
    }

    /// <summary>
    /// Recording repository fake. Captures the samples handed to
    /// <see cref="RecordSamplesAsync"/> and the cut-off handed to
    /// <see cref="PurgeOlderThanAsync"/>.
    /// </summary>
    private sealed class RecordingRepository : IKpiHistoryRepository
    {
        private readonly object _gate = new();
        private readonly List<KpiSample> _recorded = new();
        private DateTime? _purgeCutoff;

        /// <summary>Snapshot copy of every sample recorded so far.</summary>
        public IReadOnlyList<KpiSample> Recorded
        {
            get
            {
                lock (_gate)
                {
                    return _recorded.ToArray();
                }
            }
        }

        // PurgeOlderThanAsync runs on a threadpool thread; guard the field with
        // the same _gate lock used by _recorded so test-thread reads are race-free.
        public DateTime? PurgeCutoff
        {
            get
            {
                lock (_gate)
                {
                    return _purgeCutoff;
                }
            }
        }

        public Task RecordSamplesAsync(
            IReadOnlyCollection<KpiSample> samples,
            CancellationToken cancellationToken = default)
        {
            lock (_gate)
            {
                _recorded.AddRange(samples);
            }

            return Task.CompletedTask;
        }

        // NOTE(review): the generic arguments of this signature were lost in the mangled
        // source; IReadOnlyList<KpiSample> is a reconstruction — confirm the element type
        // against IKpiHistoryRepository.GetRawSeriesAsync.
        public Task<IReadOnlyList<KpiSample>> GetRawSeriesAsync(
            string source,
            string metric,
            string scope,
            string? scopeKey,
            DateTime fromUtc,
            DateTime toUtc,
            CancellationToken cancellationToken = default)
            => Task.FromResult<IReadOnlyList<KpiSample>>(Array.Empty<KpiSample>());

        public Task<int> PurgeOlderThanAsync(DateTime before, CancellationToken cancellationToken = default)
        {
            lock (_gate)
            {
                _purgeCutoff = before;
            }

            // The fake deletes nothing, so it reports zero.
            return Task.FromResult(0);
        }
    }

    /// <summary>
    /// Builds a container holding the supplied repository and sample sources, each
    /// registered as a scoped service that always resolves to the given instance.
    /// </summary>
    /// <param name="repository">Repository instance every scope resolves to.</param>
    /// <param name="sources">Sample sources, registered in the order given.</param>
    /// <returns>The built service provider.</returns>
    private IServiceProvider BuildServiceProvider(
        IKpiHistoryRepository repository,
        params IKpiSampleSource[] sources)
    {
        var services = new ServiceCollection();

        // Mirror production: sources + repository are scoped, so the recorder opens a fresh
        // scope per tick and resolves there.
        foreach (var source in sources)
        {
            var captured = source;
            services.AddScoped<IKpiSampleSource>(_ => captured);
        }

        services.AddScoped<IKpiHistoryRepository>(_ => repository);
        return services.BuildServiceProvider();
    }

    /// <summary>
    /// Creates the recorder with both timers set to a long interval so neither periodic timer
    /// fires during a test — ticks are sent manually instead.
    /// </summary>
    /// <param name="serviceProvider">Container the recorder resolves its dependencies from.</param>
    /// <param name="options">Optional overrides; defaults to one-hour sample and purge intervals.</param>
    /// <returns>A reference to the newly created recorder actor.</returns>
    private IActorRef CreateActor(IServiceProvider serviceProvider, KpiHistoryOptions? options = null)
    {
        var effectiveOptions = options ?? new KpiHistoryOptions
        {
            SampleInterval = TimeSpan.FromHours(1),
            PurgeInterval = TimeSpan.FromHours(1),
        };

        // NOTE(review): the logger's type argument was lost in the mangled source;
        // NullLogger<KpiHistoryRecorderActor> is a reconstruction — confirm against the
        // actor's constructor signature.
        return Sys.ActorOf(Props.Create(() => new KpiHistoryRecorderActor(
            serviceProvider,
            effectiveOptions,
            NullLogger<KpiHistoryRecorderActor>.Instance)));
    }

    [Fact]
    public void SampleTick_WritesHealthySourceSamples_AndThrowingSourceDoesNotAbortTick()
    {
        var repository = new RecordingRepository();

        // Order the throwing source FIRST so the test also proves a throw early in the
        // enumeration does not suppress a later healthy source.
        var sp = BuildServiceProvider(repository, new ThrowingSource(), new HealthySource());
        var actor = CreateActor(sp);

        actor.Tell(KpiHistoryRecorderActor.SampleTick.Instance);

        AwaitAssert(
            () =>
            {
                // The healthy source's two samples were written despite the throwing source.
                var recorded = repository.Recorded;
                Assert.Equal(2, recorded.Count);
                Assert.Contains(recorded, s => s.Metric == "QueueDepth" && s.Value == 7);
                Assert.Contains(recorded, s => s.Metric == "ParkedCount" && s.Value == 2);
            },
            duration: TimeSpan.FromSeconds(3),
            interval: TimeSpan.FromMilliseconds(50));
    }

    [Fact]
    public void PurgeTick_CallsPurgeWithCutoff_AtUtcNowMinusRetentionDays()
    {
        var repository = new RecordingRepository();
        var sp = BuildServiceProvider(repository, new HealthySource());
        const int retentionDays = 30;
        var actor = CreateActor(sp, new KpiHistoryOptions
        {
            SampleInterval = TimeSpan.FromHours(1),
            PurgeInterval = TimeSpan.FromHours(1),
            RetentionDays = retentionDays,
        });

        actor.Tell(KpiHistoryRecorderActor.PurgeTick.Instance);

        AwaitAssert(
            () =>
            {
                var cutoff = repository.PurgeCutoff;
                Assert.NotNull(cutoff);

                // The actor reads the clock itself, so compare with a generous tolerance
                // rather than for equality.
                var expected = DateTime.UtcNow - TimeSpan.FromDays(retentionDays);
                Assert.True(
                    Math.Abs((cutoff!.Value - expected).TotalMinutes) < 1.0,
                    $"purge cutoff {cutoff:o} should be within 1 minute of {expected:o}");
            },
            duration: TimeSpan.FromSeconds(3),
            interval: TimeSpan.FromMilliseconds(50));
    }

    [Fact]
    public void FaultedTick_DoesNotCrashActor_AndSubsequentTickStillRuns()
    {
        // ThrowOnceSource throws on the first CollectAsync call and returns a healthy
        // sample on every subsequent call. This lets us send two ticks to the SAME
        // actor reference and verify that:
        //   • The first tick (faulted source) records nothing.
        //   • The second tick is still processed and records the recovered sample,
        //     showing the recorder keeps handling messages after a faulted pass.
        var repository = new RecordingRepository();
        var source = new ThrowOnceSource();
        var sp = BuildServiceProvider(repository, source);
        var actor = CreateActor(sp);

        // First tick: wait until the source has actually been invoked (and has thrown).
        // Asserting "nothing recorded" without this wait would pass on the very first poll,
        // before the actor had handled the tick at all.
        actor.Tell(KpiHistoryRecorderActor.SampleTick.Instance);
        AwaitAssert(
            () => Assert.Equal(1, source.CallCount),
            duration: TimeSpan.FromSeconds(2),
            interval: TimeSpan.FromMilliseconds(50));
        Assert.Empty(repository.Recorded);

        // Second tick to the SAME actor: source now returns a healthy sample.
        // AwaitAssert confirms the actor processed the message and recorded it.
        actor.Tell(KpiHistoryRecorderActor.SampleTick.Instance);
        AwaitAssert(
            () => Assert.Single(repository.Recorded),
            duration: TimeSpan.FromSeconds(3),
            interval: TimeSpan.FromMilliseconds(50));
    }
}