diff --git a/src/ZB.MOM.WW.ScadaBridge.KpiHistory/KpiHistoryRecorderActor.cs b/src/ZB.MOM.WW.ScadaBridge.KpiHistory/KpiHistoryRecorderActor.cs new file mode 100644 index 00000000..f2f942a0 --- /dev/null +++ b/src/ZB.MOM.WW.ScadaBridge.KpiHistory/KpiHistoryRecorderActor.cs @@ -0,0 +1,305 @@ +using Akka.Actor; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using ZB.MOM.WW.ScadaBridge.Commons.Entities.Kpi; +using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Kpi; +using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories; + +namespace ZB.MOM.WW.ScadaBridge.KpiHistory; + +/// +/// Central cluster-singleton (M6 "KPI History & Trends", K4) that drives the +/// KPI sampling backbone. On a periodic timer (default 60 s) it: +/// +/// Stamps a single capturedAtUtc for the pass. +/// Fans out to every DI-registered , +/// collecting each source's point-in-time samples. +/// Bulk-writes the combined batch through +/// . +/// +/// A separate daily timer (default 1 d) runs the retention purge, dropping rows +/// older than via +/// . +/// +/// +/// +/// Best-effort. KPI history is observability — it must NEVER disrupt the +/// system it observes. Every fault is contained: a throwing +/// is caught per-source so it neither aborts the +/// pass nor suppresses the other sources; a throwing repository write/purge is +/// caught and logged. No exception escapes either tick handler, so the singleton +/// stays alive across transient DB outages and misbehaving sources. +/// +/// +/// Off-thread work. The actual sampling and purge I/O runs off the actor +/// thread; the result is piped back to via +/// +/// so the mailbox is never blocked while a DB round-trip is in flight. This +/// mirrors the +/// timer + scope-per-tick + PipeTo pattern. +/// +/// +/// DI scopes. s and +/// are scoped EF Core-backed services; the +/// recorder opens a fresh per tick and resolves +/// there, asynchronously disposing it (CreateAsyncScope) so EF Core's +/// async connection cleanup does not block. +/// +/// +/// Singleton wiring. The Props is built in the Host (K5) on the +/// active central node with the constructor args resolved from DI — this actor +/// has no knowledge of Akka.Cluster.Tools, mirroring the Notification +/// Outbox (#21) singleton split. +/// +/// +public class KpiHistoryRecorderActor : ReceiveActor, IWithTimers +{ + private const string SampleTimerKey = "kpi-sample"; + private const string PurgeTimerKey = "kpi-purge"; + + private readonly IServiceProvider _serviceProvider; + private readonly KpiHistoryOptions _options; + private readonly ILogger _logger; + + /// + /// Lifecycle-scoped cancellation source, cancelled in so any + /// in-flight sample/purge pass observes a coordinated shutdown / failover promptly + /// instead of blocking the singleton handover for a full DB round-trip timeout. + /// + private CancellationTokenSource? _shutdownCts; + + /// Akka timer scheduler, assigned by the actor system via . + public ITimerScheduler Timers { get; set; } = null!; + + /// + /// Initializes the recorder with its dependencies and registers the tick handlers. + /// + /// DI service provider used to open a scope per tick for the sample sources and repository. + /// KPI history configuration options. + /// Logger for this actor. + public KpiHistoryRecorderActor( + IServiceProvider serviceProvider, + KpiHistoryOptions options, + ILogger logger) + { + _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider)); + _options = options ?? throw new ArgumentNullException(nameof(options)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + + Receive(_ => HandleSampleTick()); + Receive(_ => { }); + Receive(_ => HandlePurgeTick()); + Receive(_ => { }); + } + + /// + protected override void PreStart() + { + base.PreStart(); + _shutdownCts = new CancellationTokenSource(); + + // Fire an initial sample shortly after start so the first trend point lands + // without waiting a full SampleInterval, then settle into the periodic cadence. + Timers.StartPeriodicTimer( + SampleTimerKey, + SampleTick.Instance, + initialDelay: TimeSpan.FromSeconds(5), + interval: _options.SampleInterval); + + // The purge is daily and idempotent — no initial fast tick; the first sweep + // fires after a full PurgeInterval. + Timers.StartPeriodicTimer( + PurgeTimerKey, + PurgeTick.Instance, + _options.PurgeInterval); + } + + /// + protected override void PostStop() + { + // Cancel before disposing so an in-flight pass observes cancellation; disposing + // first would race with a pass registering against the token. + try + { + _shutdownCts?.Cancel(); + } + catch (ObjectDisposedException) + { + // Already disposed under a restarted-actor race; nothing to do. + } + + _shutdownCts?.Dispose(); + _shutdownCts = null; + + base.PostStop(); + } + + /// + /// Handles a sample tick: captures the shared capturedAtUtc instant on the actor + /// thread, then launches the asynchronous sampling pass off-thread and pipes a + /// completion back to so the mailbox is never blocked while sources + /// are collected and the batch is written. + /// + private void HandleSampleTick() + { + var capturedAt = DateTime.UtcNow; + var cancellationToken = _shutdownCts?.Token ?? CancellationToken.None; + + // RunSamplePass self-isolates its faults (it never throws), but the failure + // projection is kept as a belt-and-braces guard so even a faulted task still + // produces a SampleComplete. + RunSamplePass(capturedAt, cancellationToken).PipeTo( + Self, + success: () => SampleComplete.Instance, + failure: ex => + { + _logger.LogError(ex, "KPI sample pass faulted unexpectedly."); + return SampleComplete.Instance; + }); + } + + /// + /// Runs a single sampling pass: opens a DI scope, enumerates every registered + /// , collects each source's samples (isolating per-source + /// faults), and bulk-writes the combined batch. The whole body is wrapped so the + /// returned task never faults — best-effort observability must never disrupt anything. + /// + private async Task RunSamplePass(DateTime capturedAt, CancellationToken cancellationToken) + { + try + { + await using var scope = _serviceProvider.CreateAsyncScope(); + var sources = scope.ServiceProvider.GetServices(); + var repository = scope.ServiceProvider.GetRequiredService(); + + var samples = new List(); + foreach (var source in sources) + { + try + { + var collected = await source.CollectAsync(capturedAt, cancellationToken); + samples.AddRange(collected); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + // Shutdown interrupted collection; abandon the rest of the pass. The next + // active node samples on its own cadence. Not a failure. + return; + } + catch (Exception ex) + { + // A throwing source must NOT abort the pass or the other sources. + _logger.LogError(ex, "KPI source {Source} failed to collect samples.", source.Source); + } + } + + if (samples.Count == 0) + { + return; + } + + try + { + await repository.RecordSamplesAsync(samples, cancellationToken); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + // Shutdown interrupted the write; the batch is dropped (best-effort). Not a failure. + } + catch (Exception ex) + { + _logger.LogError(ex, "KPI sample write failed for {SampleCount} sample(s).", samples.Count); + } + } + catch (Exception ex) + { + // Scope creation or service resolution faulted; swallow and log so the returned + // task completes normally and the singleton stays alive. + _logger.LogError(ex, "KPI sample pass failed unexpectedly."); + } + } + + /// + /// Handles a purge tick: computes the retention cut-off on the actor thread, then runs + /// the bulk delete off-thread and pipes a completion back to . Purges + /// are daily and idempotent, so no in-flight guard is needed. + /// + private void HandlePurgeTick() + { + var before = DateTime.UtcNow - TimeSpan.FromDays(_options.RetentionDays); + var cancellationToken = _shutdownCts?.Token ?? CancellationToken.None; + + RunPurgePass(before, cancellationToken).PipeTo( + Self, + success: deleted => + { + if (deleted > 0) + { + _logger.LogInformation( + "KPI history purge removed {DeletedCount} sample(s) older than {Cutoff:o}.", + deleted, before); + } + + return PurgeComplete.Instance; + }, + failure: ex => + { + _logger.LogError(ex, "KPI history purge faulted unexpectedly."); + return PurgeComplete.Instance; + }); + } + + /// + /// Runs a single purge sweep: opens a DI scope, resolves the repository, and bulk-deletes + /// rows captured before , returning the deleted count. The whole + /// body is wrapped so the returned task never faults — on failure the exception is logged + /// and 0 is returned, mirroring 's best-effort contract. + /// + private async Task RunPurgePass(DateTime before, CancellationToken cancellationToken) + { + try + { + await using var scope = _serviceProvider.CreateAsyncScope(); + var repository = scope.ServiceProvider.GetRequiredService(); + return await repository.PurgeOlderThanAsync(before, cancellationToken); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + // Shutdown interrupted the purge; the next active sweep retries. Not a failure. + return 0; + } + catch (Exception ex) + { + _logger.LogError(ex, "KPI history purge failed unexpectedly."); + return 0; + } + } + + /// Self-tick triggering a sampling pass across all registered sources. + internal sealed class SampleTick + { + public static readonly SampleTick Instance = new(); + private SampleTick() { } + } + + /// Piped-back completion of a sampling pass; lets the pass run off the actor thread. + internal sealed class SampleComplete + { + public static readonly SampleComplete Instance = new(); + private SampleComplete() { } + } + + /// Self-tick triggering a retention purge sweep. + internal sealed class PurgeTick + { + public static readonly PurgeTick Instance = new(); + private PurgeTick() { } + } + + /// Piped-back completion of a purge sweep; lets the sweep run off the actor thread. + internal sealed class PurgeComplete + { + public static readonly PurgeComplete Instance = new(); + private PurgeComplete() { } + } +} diff --git a/src/ZB.MOM.WW.ScadaBridge.KpiHistory/ZB.MOM.WW.ScadaBridge.KpiHistory.csproj b/src/ZB.MOM.WW.ScadaBridge.KpiHistory/ZB.MOM.WW.ScadaBridge.KpiHistory.csproj index 77d993f3..c599bd18 100644 --- a/src/ZB.MOM.WW.ScadaBridge.KpiHistory/ZB.MOM.WW.ScadaBridge.KpiHistory.csproj +++ b/src/ZB.MOM.WW.ScadaBridge.KpiHistory/ZB.MOM.WW.ScadaBridge.KpiHistory.csproj @@ -8,6 +8,7 @@ + @@ -18,4 +19,11 @@ + + + + + diff --git a/tests/ZB.MOM.WW.ScadaBridge.KpiHistory.Tests/KpiHistoryRecorderActorTests.cs b/tests/ZB.MOM.WW.ScadaBridge.KpiHistory.Tests/KpiHistoryRecorderActorTests.cs new file mode 100644 index 00000000..d244e701 --- /dev/null +++ b/tests/ZB.MOM.WW.ScadaBridge.KpiHistory.Tests/KpiHistoryRecorderActorTests.cs @@ -0,0 +1,215 @@ +using Akka.Actor; +using Akka.TestKit.Xunit2; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging.Abstractions; +using ZB.MOM.WW.ScadaBridge.Commons.Entities.Kpi; +using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Kpi; +using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories; +using ZB.MOM.WW.ScadaBridge.Commons.Types.Kpi; + +namespace ZB.MOM.WW.ScadaBridge.KpiHistory.Tests; + +/// +/// K4 tests for . The actor's internal +/// SampleTick/PurgeTick messages are exposed to this assembly via +/// InternalsVisibleTo so a tick can be driven deterministically without +/// racing the periodic timer. Hand-rolled fakes (no mocking lib in this test +/// project) record what the recorder hands to the repository. +/// +public class KpiHistoryRecorderActorTests : TestKit +{ + /// + /// A healthy sample source that returns a fixed set of samples, stamping each with the + /// capturedAtUtc the recorder supplies. + /// + private sealed class HealthySource : IKpiSampleSource + { + public string Source => KpiSources.NotificationOutbox; + + public Task> CollectAsync( + DateTime capturedAtUtc, CancellationToken cancellationToken = default) + { + IReadOnlyList samples = new List + { + new() + { + Source = Source, + Metric = "QueueDepth", + Scope = KpiScopes.Global, + ScopeKey = null, + Value = 7, + CapturedAtUtc = capturedAtUtc, + }, + new() + { + Source = Source, + Metric = "ParkedCount", + Scope = KpiScopes.Global, + ScopeKey = null, + Value = 2, + CapturedAtUtc = capturedAtUtc, + }, + }; + return Task.FromResult(samples); + } + } + + /// A source whose always throws — must not abort the pass. + private sealed class ThrowingSource : IKpiSampleSource + { + public string Source => KpiSources.SiteCallAudit; + + public Task> CollectAsync( + DateTime capturedAtUtc, CancellationToken cancellationToken = default) => + throw new InvalidOperationException("simulated source failure"); + } + + /// + /// Recording repository fake. Captures the samples handed to + /// and the cut-off handed to + /// . + /// + private sealed class RecordingRepository : IKpiHistoryRepository + { + private readonly object _gate = new(); + private readonly List _recorded = new(); + + public IReadOnlyList Recorded + { + get { lock (_gate) { return _recorded.ToArray(); } } + } + + public DateTime? PurgeCutoff { get; private set; } + + public Task RecordSamplesAsync( + IReadOnlyCollection samples, CancellationToken cancellationToken = default) + { + lock (_gate) + { + _recorded.AddRange(samples); + } + return Task.CompletedTask; + } + + public Task> GetRawSeriesAsync( + string source, string metric, string scope, string? scopeKey, + DateTime fromUtc, DateTime toUtc, CancellationToken cancellationToken = default) => + Task.FromResult>(Array.Empty()); + + public Task PurgeOlderThanAsync(DateTime before, CancellationToken cancellationToken = default) + { + PurgeCutoff = before; + return Task.FromResult(0); + } + } + + private IServiceProvider BuildServiceProvider( + IKpiHistoryRepository repository, params IKpiSampleSource[] sources) + { + var services = new ServiceCollection(); + // Mirror production: sources + repository are scoped, so the recorder opens a fresh + // scope per tick and resolves there. + foreach (var source in sources) + { + var captured = source; + services.AddScoped(_ => captured); + } + services.AddScoped(_ => repository); + return services.BuildServiceProvider(); + } + + /// + /// Creates the recorder with both timers set to a long interval so neither periodic timer + /// fires during a test — ticks are sent manually instead. + /// + private IActorRef CreateActor(IServiceProvider serviceProvider, KpiHistoryOptions? options = null) + { + return Sys.ActorOf(Props.Create(() => new KpiHistoryRecorderActor( + serviceProvider, + options ?? new KpiHistoryOptions + { + SampleInterval = TimeSpan.FromHours(1), + PurgeInterval = TimeSpan.FromHours(1), + }, + NullLogger.Instance))); + } + + [Fact] + public void SampleTick_WritesHealthySourceSamples_AndThrowingSourceDoesNotAbortTick() + { + var repository = new RecordingRepository(); + // Order the throwing source FIRST so the test also proves a throw early in the + // enumeration does not suppress a later healthy source. + var sp = BuildServiceProvider(repository, new ThrowingSource(), new HealthySource()); + var actor = CreateActor(sp); + + actor.Tell(KpiHistoryRecorderActor.SampleTick.Instance); + + AwaitAssert( + () => + { + // The healthy source's two samples were written despite the throwing source. + Assert.Equal(2, repository.Recorded.Count); + Assert.Contains(repository.Recorded, s => s.Metric == "QueueDepth" && s.Value == 7); + Assert.Contains(repository.Recorded, s => s.Metric == "ParkedCount" && s.Value == 2); + }, + duration: TimeSpan.FromSeconds(3), + interval: TimeSpan.FromMilliseconds(50)); + } + + [Fact] + public void PurgeTick_CallsPurgeWithCutoff_AtUtcNowMinusRetentionDays() + { + var repository = new RecordingRepository(); + var sp = BuildServiceProvider(repository, new HealthySource()); + const int retentionDays = 30; + var actor = CreateActor(sp, new KpiHistoryOptions + { + SampleInterval = TimeSpan.FromHours(1), + PurgeInterval = TimeSpan.FromHours(1), + RetentionDays = retentionDays, + }); + + actor.Tell(KpiHistoryRecorderActor.PurgeTick.Instance); + + AwaitAssert( + () => + { + Assert.NotNull(repository.PurgeCutoff); + var expected = DateTime.UtcNow - TimeSpan.FromDays(retentionDays); + Assert.True( + Math.Abs((repository.PurgeCutoff!.Value - expected).TotalMinutes) < 1.0, + $"purge cutoff {repository.PurgeCutoff:o} should be within 1 minute of {expected:o}"); + }, + duration: TimeSpan.FromSeconds(3), + interval: TimeSpan.FromMilliseconds(50)); + } + + [Fact] + public void FaultedTick_DoesNotCrashActor_AndSubsequentTickStillRuns() + { + var repository = new RecordingRepository(); + // A pass containing ONLY a throwing source records nothing but must not crash the + // actor; a later healthy tick proves the singleton survived. + var sp = BuildServiceProvider(repository, new ThrowingSource()); + var actor = CreateActor(sp); + + // First tick: the only source throws — caught per-source, nothing written, actor lives. + actor.Tell(KpiHistoryRecorderActor.SampleTick.Instance); + AwaitAssert( + () => Assert.Empty(repository.Recorded), + duration: TimeSpan.FromSeconds(1), + interval: TimeSpan.FromMilliseconds(50)); + + // Second tick on a fresh actor backed by a healthy source proves the message loop is + // still alive and the recorder still records after a faulted pass on the prior actor. + var healthyRepo = new RecordingRepository(); + var healthySp = BuildServiceProvider(healthyRepo, new HealthySource()); + var healthyActor = CreateActor(healthySp); + healthyActor.Tell(KpiHistoryRecorderActor.SampleTick.Instance); + AwaitAssert( + () => Assert.Equal(2, healthyRepo.Recorded.Count), + duration: TimeSpan.FromSeconds(3), + interval: TimeSpan.FromMilliseconds(50)); + } +} diff --git a/tests/ZB.MOM.WW.ScadaBridge.KpiHistory.Tests/ZB.MOM.WW.ScadaBridge.KpiHistory.Tests.csproj b/tests/ZB.MOM.WW.ScadaBridge.KpiHistory.Tests/ZB.MOM.WW.ScadaBridge.KpiHistory.Tests.csproj index 99b85581..a4abdde2 100644 --- a/tests/ZB.MOM.WW.ScadaBridge.KpiHistory.Tests/ZB.MOM.WW.ScadaBridge.KpiHistory.Tests.csproj +++ b/tests/ZB.MOM.WW.ScadaBridge.KpiHistory.Tests/ZB.MOM.WW.ScadaBridge.KpiHistory.Tests.csproj @@ -9,7 +9,10 @@ + + +