using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Kpi;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Kpi;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Kpi;

namespace ZB.MOM.WW.ScadaBridge.KpiHistory.Tests;

/// <summary>
/// K4 tests for <see cref="KpiHistoryRecorderActor"/>. The actor's internal
/// SampleTick/PurgeTick messages are exposed to this assembly via
/// InternalsVisibleTo so a tick can be driven deterministically without
/// racing the periodic timer. Hand-rolled fakes (no mocking lib in this test
/// project) record what the recorder hands to the repository.
/// </summary>
/// <remarks>
/// NOTE(review): the generic type arguments <c>KpiSample</c> (sample element type) and
/// <c>KpiSeriesPoint</c> (raw-series element type) are inferred from how the values are used
/// in this file — confirm both against <see cref="IKpiSampleSource"/> and
/// <see cref="IKpiHistoryRepository"/>.
/// </remarks>
public class KpiHistoryRecorderActorTests : TestKit
{
    /// <summary>Upper bound for a manually sent tick to become observable through a fake.</summary>
    private static readonly TimeSpan AssertTimeout = TimeSpan.FromSeconds(3);

    /// <summary>Polling interval used by every <c>AwaitAssert</c> in this class.</summary>
    private static readonly TimeSpan PollInterval = TimeSpan.FromMilliseconds(50);

    /// <summary>
    /// A healthy sample source that returns a fixed set of samples, stamping each with the
    /// capturedAtUtc the recorder supplies.
    /// </summary>
    private sealed class HealthySource : IKpiSampleSource
    {
        public string Source => KpiSources.NotificationOutbox;

        /// <summary>Returns two global samples: QueueDepth = 7 and ParkedCount = 2.</summary>
        public Task<IReadOnlyList<KpiSample>> CollectAsync(
            DateTime capturedAtUtc,
            CancellationToken cancellationToken = default)
        {
            IReadOnlyList<KpiSample> samples = new List<KpiSample>
            {
                new()
                {
                    Source = Source,
                    Metric = "QueueDepth",
                    Scope = KpiScopes.Global,
                    ScopeKey = null,
                    Value = 7,
                    CapturedAtUtc = capturedAtUtc,
                },
                new()
                {
                    Source = Source,
                    Metric = "ParkedCount",
                    Scope = KpiScopes.Global,
                    ScopeKey = null,
                    Value = 2,
                    CapturedAtUtc = capturedAtUtc,
                },
            };

            return Task.FromResult(samples);
        }
    }

    /// <summary>A source whose <see cref="CollectAsync"/> always throws — must not abort the pass.</summary>
    private sealed class ThrowingSource : IKpiSampleSource
    {
        public string Source => KpiSources.SiteCallAudit;

        public Task<IReadOnlyList<KpiSample>> CollectAsync(
            DateTime capturedAtUtc,
            CancellationToken cancellationToken = default)
            => throw new InvalidOperationException("simulated source failure");
    }

    /// <summary>
    /// A source that throws on its first <see cref="CollectAsync"/> call and behaves like
    /// <see cref="HealthySource"/> on every later call. It lets a single actor be driven through
    /// a faulted pass followed by a healthy one. <see cref="BuildServiceProvider"/> registers the
    /// instance through a factory that returns the same object for every scope, so the call
    /// counter survives across ticks.
    /// </summary>
    private sealed class FailFirstCallSource : IKpiSampleSource
    {
        private readonly HealthySource _healthy = new();
        private int _calls;

        /// <summary>Number of <see cref="CollectAsync"/> invocations so far (safe to read from the test thread).</summary>
        public int Calls => Volatile.Read(ref _calls);

        public string Source => _healthy.Source;

        public Task<IReadOnlyList<KpiSample>> CollectAsync(
            DateTime capturedAtUtc,
            CancellationToken cancellationToken = default)
        {
            // Count before throwing so the test can observe that the faulted pass really ran.
            if (Interlocked.Increment(ref _calls) == 1)
            {
                throw new InvalidOperationException("simulated source failure");
            }

            return _healthy.CollectAsync(capturedAtUtc, cancellationToken);
        }
    }

    /// <summary>
    /// Recording repository fake. Captures the samples handed to
    /// <see cref="RecordSamplesAsync"/> and the cut-off handed to
    /// <see cref="PurgeOlderThanAsync"/>. All captured state is guarded by one lock because the
    /// recorder writes it from its own thread while the test thread polls it.
    /// </summary>
    private sealed class RecordingRepository : IKpiHistoryRepository
    {
        private readonly object _gate = new();
        private readonly List<KpiSample> _recorded = new();
        private DateTime? _purgeCutoff;

        /// <summary>Snapshot copy of every sample recorded so far.</summary>
        public IReadOnlyList<KpiSample> Recorded
        {
            get
            {
                lock (_gate)
                {
                    return _recorded.ToArray();
                }
            }
        }

        /// <summary>The cut-off passed to the most recent purge, or <c>null</c> if none ran.</summary>
        public DateTime? PurgeCutoff
        {
            get
            {
                lock (_gate)
                {
                    return _purgeCutoff;
                }
            }
        }

        public Task RecordSamplesAsync(
            IReadOnlyCollection<KpiSample> samples,
            CancellationToken cancellationToken = default)
        {
            lock (_gate)
            {
                _recorded.AddRange(samples);
            }

            return Task.CompletedTask;
        }

        /// <summary>Not exercised by these tests; always returns an empty series.</summary>
        public Task<IReadOnlyList<KpiSeriesPoint>> GetRawSeriesAsync(
            string source,
            string metric,
            string scope,
            string? scopeKey,
            DateTime fromUtc,
            DateTime toUtc,
            CancellationToken cancellationToken = default)
            => Task.FromResult<IReadOnlyList<KpiSeriesPoint>>(Array.Empty<KpiSeriesPoint>());

        /// <summary>Records the cut-off and reports zero deleted rows.</summary>
        public Task<int> PurgeOlderThanAsync(DateTime before, CancellationToken cancellationToken = default)
        {
            lock (_gate)
            {
                _purgeCutoff = before;
            }

            return Task.FromResult(0);
        }
    }

    /// <summary>
    /// Builds a container holding the given repository and sample sources.
    /// </summary>
    /// <param name="repository">Repository instance returned for every scope.</param>
    /// <param name="sources">Sample sources, registered in the order given.</param>
    private IServiceProvider BuildServiceProvider(
        IKpiHistoryRepository repository,
        params IKpiSampleSource[] sources)
    {
        var services = new ServiceCollection();

        // Mirror production: sources + repository are scoped, so the recorder opens a fresh
        // scope per tick and resolves there.
        foreach (var source in sources)
        {
            var captured = source;
            services.AddScoped<IKpiSampleSource>(_ => captured);
        }

        services.AddScoped<IKpiHistoryRepository>(_ => repository);
        return services.BuildServiceProvider();
    }

    /// <summary>
    /// Creates the recorder with both timers set to a long interval so neither periodic timer
    /// fires during a test — ticks are sent manually instead.
    /// </summary>
    /// <param name="serviceProvider">Container the recorder resolves its dependencies from.</param>
    /// <param name="options">Optional override; defaults to one-hour sample and purge intervals.</param>
    private IActorRef CreateActor(IServiceProvider serviceProvider, KpiHistoryOptions? options = null)
    {
        var effectiveOptions = options ?? new KpiHistoryOptions
        {
            SampleInterval = TimeSpan.FromHours(1),
            PurgeInterval = TimeSpan.FromHours(1),
        };

        return Sys.ActorOf(Props.Create(() => new KpiHistoryRecorderActor(
            serviceProvider,
            effectiveOptions,
            NullLogger<KpiHistoryRecorderActor>.Instance)));
    }

    [Fact]
    public void SampleTick_WritesHealthySourceSamples_AndThrowingSourceDoesNotAbortTick()
    {
        var repository = new RecordingRepository();

        // Order the throwing source FIRST so the test also proves a throw early in the
        // enumeration does not suppress a later healthy source.
        var sp = BuildServiceProvider(repository, new ThrowingSource(), new HealthySource());
        var actor = CreateActor(sp);

        actor.Tell(KpiHistoryRecorderActor.SampleTick.Instance);

        AwaitAssert(
            () =>
            {
                // One snapshot per poll so all three assertions see the same state.
                var recorded = repository.Recorded;

                // The healthy source's two samples were written despite the throwing source.
                Assert.Equal(2, recorded.Count);
                Assert.Contains(recorded, s => s.Metric == "QueueDepth" && s.Value == 7);
                Assert.Contains(recorded, s => s.Metric == "ParkedCount" && s.Value == 2);
            },
            duration: AssertTimeout,
            interval: PollInterval);
    }

    [Fact]
    public void PurgeTick_CallsPurgeWithCutoff_AtUtcNowMinusRetentionDays()
    {
        var repository = new RecordingRepository();
        var sp = BuildServiceProvider(repository, new HealthySource());
        const int retentionDays = 30;
        var actor = CreateActor(sp, new KpiHistoryOptions
        {
            SampleInterval = TimeSpan.FromHours(1),
            PurgeInterval = TimeSpan.FromHours(1),
            RetentionDays = retentionDays,
        });

        actor.Tell(KpiHistoryRecorderActor.PurgeTick.Instance);

        AwaitAssert(
            () =>
            {
                var cutoff = repository.PurgeCutoff;
                Assert.NotNull(cutoff);

                // The recorder reads its own clock, so allow a generous tolerance.
                var expected = DateTime.UtcNow - TimeSpan.FromDays(retentionDays);
                Assert.True(
                    Math.Abs((cutoff!.Value - expected).TotalMinutes) < 1.0,
                    $"purge cutoff {cutoff:o} should be within 1 minute of {expected:o}");
            },
            duration: AssertTimeout,
            interval: PollInterval);
    }

    [Fact]
    public void FaultedTick_DoesNotCrashActor_AndSubsequentTickStillRuns()
    {
        var repository = new RecordingRepository();

        // The only source throws on its first call and is healthy afterwards, so the SAME actor
        // can be driven through a faulted pass and then a healthy one.
        var source = new FailFirstCallSource();
        var sp = BuildServiceProvider(repository, source);
        var actor = CreateActor(sp);

        // First tick: wait until the source has actually been invoked. Asserting only that the
        // repository is empty would pass before the actor had even dequeued the tick.
        actor.Tell(KpiHistoryRecorderActor.SampleTick.Instance);
        AwaitAssert(
            () => Assert.Equal(1, source.Calls),
            duration: AssertTimeout,
            interval: PollInterval);
        Assert.Empty(repository.Recorded);

        // Second tick to the same actor: the samples only reach the repository if the recorder
        // is still processing messages after the faulted pass.
        actor.Tell(KpiHistoryRecorderActor.SampleTick.Instance);
        AwaitAssert(
            () =>
            {
                Assert.Equal(2, source.Calls);
                Assert.Equal(2, repository.Recorded.Count);
            },
            duration: AssertTimeout,
            interval: PollInterval);
    }
}