using Akka.Actor;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Kpi;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Kpi;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
namespace ZB.MOM.WW.ScadaBridge.KpiHistory;
///
/// Central cluster-singleton (M6 "KPI History & Trends", K4) that drives the
/// KPI sampling backbone. On a periodic timer (default 60 s) it:
///
/// - Stamps a single capturedAtUtc for the pass.
/// - Fans out to every DI-registered ,
/// collecting each source's point-in-time samples.
/// - Bulk-writes the combined batch through
/// .
///
/// A separate daily timer (default 1 d) runs the retention purge, dropping rows
/// older than via
/// .
///
///
///
/// Best-effort. KPI history is observability — it must NEVER disrupt the
/// system it observes. Every fault is contained: a throwing
/// is caught per-source so it neither aborts the
/// pass nor suppresses the other sources; a throwing repository write/purge is
/// caught and logged. No exception escapes either tick handler, so the singleton
/// stays alive across transient DB outages and misbehaving sources.
///
///
/// Off-thread work. The actual sampling and purge I/O runs off the actor
/// thread; the result is piped back to via
///
/// so the mailbox is never blocked while a DB round-trip is in flight. This
/// mirrors the
/// timer + scope-per-tick + PipeTo pattern.
///
///
/// DI scopes. s and
/// are scoped EF Core-backed services; the
/// recorder opens a fresh per tick and resolves
/// there, asynchronously disposing it (CreateAsyncScope) so EF Core's
/// async connection cleanup does not block.
///
///
/// Singleton wiring. The Props is built in the Host (K5) on the
/// active central node with the constructor args resolved from DI — this actor
/// has no knowledge of Akka.Cluster.Tools, mirroring the Notification
/// Outbox (#21) singleton split.
///
///
public class KpiHistoryRecorderActor : ReceiveActor, IWithTimers
{
private const string SampleTimerKey = "kpi-sample";
private const string PurgeTimerKey = "kpi-purge";
private readonly IServiceProvider _serviceProvider;
private readonly KpiHistoryOptions _options;
private readonly ILogger _logger;
///
/// Lifecycle-scoped cancellation source, cancelled in so any
/// in-flight sample/purge pass observes a coordinated shutdown / failover promptly
/// instead of blocking the singleton handover for a full DB round-trip timeout.
///
private CancellationTokenSource? _shutdownCts;
/// Akka timer scheduler, assigned by the actor system via .
public ITimerScheduler Timers { get; set; } = null!;
///
/// Initializes the recorder with its dependencies and registers the tick handlers.
///
/// DI service provider used to open a scope per tick for the sample sources and repository.
/// KPI history configuration options.
/// Logger for this actor.
public KpiHistoryRecorderActor(
IServiceProvider serviceProvider,
KpiHistoryOptions options,
ILogger logger)
{
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
_options = options ?? throw new ArgumentNullException(nameof(options));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
Receive(_ => HandleSampleTick());
Receive(_ => { }); // best-effort: no actor state to reset on completion
Receive(_ => HandlePurgeTick());
Receive(_ => { }); // best-effort: no actor state to reset on completion
}
///
protected override void PreStart()
{
base.PreStart();
_shutdownCts = new CancellationTokenSource();
// Fire an initial sample shortly after start so the first trend point lands
// without waiting a full SampleInterval, then settle into the periodic cadence.
Timers.StartPeriodicTimer(
SampleTimerKey,
SampleTick.Instance,
initialDelay: TimeSpan.FromSeconds(5),
interval: _options.SampleInterval);
// The purge is daily and idempotent — no initial fast tick; the first sweep
// fires after a full PurgeInterval.
Timers.StartPeriodicTimer(
PurgeTimerKey,
PurgeTick.Instance,
_options.PurgeInterval);
}
///
protected override void PostStop()
{
// Cancel before disposing so an in-flight pass observes cancellation; disposing
// first would race with a pass registering against the token.
try
{
_shutdownCts?.Cancel();
}
catch (ObjectDisposedException)
{
// Already disposed under a restarted-actor race; nothing to do.
}
_shutdownCts?.Dispose();
_shutdownCts = null;
base.PostStop();
}
///
/// Handles a sample tick: captures the shared capturedAtUtc instant on the actor
/// thread, then launches the asynchronous sampling pass off-thread and pipes a
/// completion back to so the mailbox is never blocked while sources
/// are collected and the batch is written.
///
private void HandleSampleTick()
{
var capturedAt = DateTime.UtcNow;
var cancellationToken = _shutdownCts?.Token ?? CancellationToken.None;
// RunSamplePass self-isolates its faults (it never throws), but the failure
// projection is kept as a belt-and-braces guard so even a faulted task still
// produces a SampleComplete.
RunSamplePass(capturedAt, cancellationToken).PipeTo(
Self,
success: () => SampleComplete.Instance,
failure: ex =>
{
_logger.LogError(ex, "KPI sample pass faulted unexpectedly.");
return SampleComplete.Instance;
});
}
///
/// Runs a single sampling pass: opens a DI scope, enumerates every registered
/// , collects each source's samples (isolating per-source
/// faults), and bulk-writes the combined batch. The whole body is wrapped so the
/// returned task never faults — best-effort observability must never disrupt anything.
///
private async Task RunSamplePass(DateTime capturedAt, CancellationToken cancellationToken)
{
try
{
await using var scope = _serviceProvider.CreateAsyncScope();
var sources = scope.ServiceProvider.GetServices();
var repository = scope.ServiceProvider.GetRequiredService();
var samples = new List();
foreach (var source in sources)
{
try
{
var collected = await source.CollectAsync(capturedAt, cancellationToken);
samples.AddRange(collected);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// Shutdown interrupted collection; abandon the rest of the pass. The next
// active node samples on its own cadence. Not a failure.
return;
}
catch (Exception ex)
{
// A throwing source must NOT abort the pass or the other sources.
_logger.LogError(ex, "KPI source {Source} failed to collect samples.", source.Source);
}
}
if (samples.Count == 0)
{
return;
}
try
{
await repository.RecordSamplesAsync(samples, cancellationToken);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// Shutdown interrupted the write; the batch is dropped (best-effort). Not a failure.
}
catch (Exception ex)
{
_logger.LogError(ex, "KPI sample write failed for {SampleCount} sample(s).", samples.Count);
}
}
catch (Exception ex)
{
// Scope creation or service resolution faulted; swallow and log so the returned
// task completes normally and the singleton stays alive.
_logger.LogError(ex, "KPI sample pass failed unexpectedly.");
}
}
///
/// Handles a purge tick: computes the retention cut-off on the actor thread, then runs
/// the bulk delete off-thread and pipes a completion back to . Purges
/// are daily and idempotent, so no in-flight guard is needed.
///
private void HandlePurgeTick()
{
var before = DateTime.UtcNow - TimeSpan.FromDays(_options.RetentionDays);
var cancellationToken = _shutdownCts?.Token ?? CancellationToken.None;
RunPurgePass(before, cancellationToken).PipeTo(
Self,
success: deleted =>
{
if (deleted > 0)
{
_logger.LogInformation(
"KPI history purge removed {DeletedCount} sample(s) older than {Cutoff:o}.",
deleted, before);
}
return PurgeComplete.Instance;
},
failure: ex =>
{
_logger.LogError(ex, "KPI history purge faulted unexpectedly.");
return PurgeComplete.Instance;
});
}
///
/// Runs a single purge sweep: opens a DI scope, resolves the repository, and bulk-deletes
/// rows captured before , returning the deleted count. The whole
/// body is wrapped so the returned task never faults — on failure the exception is logged
/// and 0 is returned, mirroring 's best-effort contract.
///
private async Task RunPurgePass(DateTime before, CancellationToken cancellationToken)
{
try
{
await using var scope = _serviceProvider.CreateAsyncScope();
var repository = scope.ServiceProvider.GetRequiredService();
return await repository.PurgeOlderThanAsync(before, cancellationToken);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// Shutdown interrupted the purge; the next active sweep retries. Not a failure.
return 0;
}
catch (Exception ex)
{
_logger.LogError(ex, "KPI history purge failed unexpectedly.");
return 0;
}
}
/// Self-tick triggering a sampling pass across all registered sources.
internal sealed class SampleTick
{
public static readonly SampleTick Instance = new();
private SampleTick() { }
}
/// Piped-back completion of a sampling pass; lets the pass run off the actor thread.
internal sealed class SampleComplete
{
public static readonly SampleComplete Instance = new();
private SampleComplete() { }
}
/// Self-tick triggering a retention purge sweep.
internal sealed class PurgeTick
{
public static readonly PurgeTick Instance = new();
private PurgeTick() { }
}
/// Piped-back completion of a purge sweep; lets the sweep run off the actor thread.
internal sealed class PurgeComplete
{
public static readonly PurgeComplete Instance = new();
private PurgeComplete() { }
}
}