using Akka.Actor;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces.Repositories;
namespace ScadaLink.AuditLog.Central;
///
/// Central singleton (M6 Bundle B) that drives the audit-log reconciliation
/// pull loop. On a configurable timer (default 5 minutes) the actor walks every
/// known site, asks the site for any rows with
/// >= the site's last reconciled
/// cursor, ingests them idempotently into the central
/// , and advances the cursor.
///
///
///
/// Self-healing telemetry, not a dispatcher. The push path
/// ( +
/// IngestAuditEvents) is the primary mechanism. This actor exists so a
/// missed push (gRPC blip, central restart, site offline) is eventually
/// repaired by central re-pulling whatever the site still has in
/// Pending/Forwarded state. Idempotency on
/// (M2 Bundle A's race-fix) makes duplicate
/// arrivals from both paths a silent no-op.
///
///
/// Cursor lifetime. The per-site LastReconciledAt watermark is
/// kept in-memory for the actor's lifetime. The cluster singleton normally
/// survives the host process; on a deliberate failover OR a singleton restart
/// the cursors reset to . That is conservative
/// but correct — the next tick simply asks for everything the site still has,
/// and idempotent ingest swallows the dupes. Persisting cursors to MS SQL was
/// considered and rejected for M6: the cost of a write per tick outweighs the
/// rare benefit of avoiding one over-broad pull after a restart.
///
///
/// Stalled detection. The brief calls a site "stalled" when two
/// consecutive pull cycles BOTH return non-empty AND MoreAvailable=true
/// — i.e. the backlog isn't draining. The actor publishes
/// on the actor system's
/// EventStream so a future ICentralHealthCollector bridge (M6 Bundle E)
/// can flip the health metric without coupling this actor to the health
/// collection surface today.
///
///
/// Failure isolation. A single site that throws (DNS, transport,
/// repository write) must NOT prevent other sites from being polled on the
/// same tick. The per-site work runs inside its own try/catch — that
/// per-site catch is what keeps the actor running across handler throws.
/// The override returns
/// (Restart
/// semantics) and governs children only; this actor has no children today,
/// so the override is a forward-compat placeholder. If it ever did fire,
/// restart would reset the in-memory cursors — but as noted above that's
/// a safe (over-pull, idempotent) recovery.
///
///
/// DI scopes. is a scoped EF Core
/// service registered by AddConfigurationDatabase. The singleton actor
/// opens one DI scope per tick and reuses the same repository across all
/// sites in that tick — one DbContext per tick mirrors the
/// AuditLogIngestActor + NotificationOutboxActor pattern.
///
///
public class SiteAuditReconciliationActor : ReceiveActor
{
private readonly ISiteEnumerator _sites;
private readonly IPullAuditEventsClient _client;
private readonly IServiceProvider _services;
private readonly SiteAuditReconciliationOptions _options;
private readonly ILogger _logger;
///
/// Per-site reconciliation watermark — the highest
/// seen for that site on a previous
/// tick. Asking for OccurredAtUtc >= cursor rather than >
/// is the site contract ();
/// duplicate-with-same-timestamp rows are filtered out by the idempotent
/// repository write.
///
private readonly Dictionary _cursors = new();
///
/// Per-site count of consecutive non-draining cycles. Resets to zero on the
/// first draining (or empty) cycle.
///
private readonly Dictionary _nonDrainingCycles = new();
///
/// Per-site latched stalled state — used so the actor only publishes a
/// transition when the
/// stalled flag actually changes, not on every tick while stalled.
///
private readonly Dictionary _stalled = new();
///
/// AuditLog-004: per-EventId retry counter for rows whose central insert
/// threw. While a row keeps failing AND is below
/// , the cursor is held back so the
/// next reconciliation tick re-pulls and retries the row. Crossing the
/// threshold logs Critical and permanently abandons the row (cursor
/// advances past it) so a truly broken row cannot block all subsequent
/// progress for a site. The counter is in-memory only — singleton restart
/// resets it, which is safe because the cursor also resets on restart and
/// the next tick re-pulls everything.
///
private readonly Dictionary _failedInsertAttempts = new();
///
/// AuditLog-004: number of consecutive central-insert failures before a row
/// is permanently abandoned with a Critical log entry and the cursor is
/// allowed to advance past it. Five attempts at the 5-minute default tick
/// is ~25 min of retry budget before a stuck row stops blocking progress.
///
private const int MaxPermanentInsertAttempts = 5;
private ICancelable? _timer;
///
/// Initializes the reconciliation actor with its dependencies and registers the tick handler.
///
/// Enumerates the known sites to reconcile.
/// Client used to pull audit events from individual sites.
/// Root service provider for opening a per-tick DI scope.
/// Reconciliation configuration (interval, page size).
/// Logger for reconciliation diagnostics.
public SiteAuditReconciliationActor(
ISiteEnumerator sites,
IPullAuditEventsClient client,
IServiceProvider services,
IOptions options,
ILogger logger)
{
ArgumentNullException.ThrowIfNull(sites);
ArgumentNullException.ThrowIfNull(client);
ArgumentNullException.ThrowIfNull(services);
ArgumentNullException.ThrowIfNull(options);
ArgumentNullException.ThrowIfNull(logger);
_sites = sites;
_client = client;
_services = services;
_options = options.Value;
_logger = logger;
ReceiveAsync(_ => OnTickAsync());
}
///
protected override void PreStart()
{
base.PreStart();
var interval = _options.ReconciliationInterval;
_timer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(
initialDelay: interval,
interval: interval,
receiver: Self,
message: ReconciliationTick.Instance,
sender: Self);
}
///
protected override void PostStop()
{
_timer?.Cancel();
base.PostStop();
}
private async Task OnTickAsync()
{
// Capture EventStream BEFORE the first await. Accessing Context (and
// therefore Context.System) after an await is unsafe because Akka's
// ActorBase.Context throws "no active ActorContext" once the
// continuation runs on a thread that isn't currently dispatching this
// actor — mirrors the AuditLogPurgeActor.OnTickAsync fix and the
// AuditLogIngestActor.OnIngestAsync Sender-capture pattern.
var eventStream = Context.System.EventStream;
IReadOnlyList sites;
try
{
sites = await _sites.EnumerateAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
_logger.LogError(ex, "Site enumeration failed; skipping reconciliation tick.");
return;
}
if (sites.Count == 0)
{
return;
}
IServiceScope? scope = null;
IAuditLogRepository repository;
try
{
scope = _services.CreateScope();
repository = scope.ServiceProvider.GetRequiredService();
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to resolve IAuditLogRepository for reconciliation tick.");
scope?.Dispose();
return;
}
try
{
foreach (var site in sites)
{
try
{
await PullSiteAsync(site, repository, eventStream).ConfigureAwait(false);
}
catch (Exception ex)
{
// Catch-all per the failure-isolation invariant: one site's
// fault must not sink the rest of the tick. The cursor for
// the failing site is left at its previous value so the
// next tick retries the same window.
_logger.LogWarning(
ex,
"Reconciliation pull failed for site {SiteId}; other sites continue.",
site.SiteId);
}
}
}
finally
{
scope.Dispose();
}
}
///
/// Issues one PullAuditEvents RPC against the site, ingests the
/// returned rows idempotently into the central repository, and advances
/// the cursor based on the maximum
/// observed. The brief's "saturate until backlog clears" intent is met by
/// the natural cadence — each tick issues one pull, and a backed-up site
/// drains across consecutive ticks. The stalled signal (two non-draining
/// ticks in a row) surfaces when that drain isn't keeping up.
///
private async Task PullSiteAsync(SiteEntry site, IAuditLogRepository repository, Akka.Event.EventStream eventStream)
{
var since = _cursors.TryGetValue(site.SiteId, out var c) ? c : DateTime.MinValue;
var response = await _client.PullAsync(
site.SiteId, since, _options.BatchSize, CancellationToken.None)
.ConfigureAwait(false);
var maxOccurred = since;
var hasUnresolvedFailure = false;
var nowUtc = DateTime.UtcNow;
foreach (var evt in response.Events)
{
var advanceForThisRow = false;
try
{
// Idempotent repository write: duplicate EventIds (from a
// concurrent push, or a retry of this very pull) collapse to
// a no-op courtesy of M2 Bundle A's race-fix on
// InsertIfNotExistsAsync.
var ingested = evt with { IngestedAtUtc = nowUtc };
await repository.InsertIfNotExistsAsync(ingested).ConfigureAwait(false);
_failedInsertAttempts.Remove(evt.EventId);
advanceForThisRow = true;
}
catch (Exception ex)
{
// AuditLog-004: per-row catch so one bad event does not abandon
// the rest of the batch. Track the failure count per EventId —
// below MaxPermanentInsertAttempts the cursor is HELD BACK so
// the next tick re-pulls and retries; at the threshold the row
// is permanently abandoned (LogCritical + cursor advances past)
// to keep a truly broken row from blocking all subsequent
// progress for the site.
var attempts = _failedInsertAttempts.GetValueOrDefault(evt.EventId) + 1;
_failedInsertAttempts[evt.EventId] = attempts;
if (attempts >= MaxPermanentInsertAttempts)
{
_logger.LogCritical(
ex,
"Permanently abandoning AuditEvent {EventId} from site {SiteId} after {Attempts} consecutive insert failures; cursor will advance past it.",
evt.EventId,
site.SiteId,
attempts);
_failedInsertAttempts.Remove(evt.EventId);
advanceForThisRow = true;
}
else
{
_logger.LogError(
ex,
"Reconciliation ingest failed for AuditEvent {EventId} from site {SiteId} (attempt {Attempts}/{Max}); cursor held back for retry.",
evt.EventId,
site.SiteId,
attempts,
MaxPermanentInsertAttempts);
hasUnresolvedFailure = true;
}
}
if (advanceForThisRow && evt.OccurredAtUtc > maxOccurred)
{
maxOccurred = evt.OccurredAtUtc;
}
}
// AuditLog-004: only advance the persisted cursor if no event in this
// batch is still being retried. Leaving the cursor at `since` re-pulls
// the whole batch next tick — successful rows are no-ops thanks to
// InsertIfNotExistsAsync's idempotency, and the failing row gets
// another attempt. Once it succeeds (or hits the permanent-abandon
// threshold) the cursor unblocks naturally.
_cursors[site.SiteId] = hasUnresolvedFailure ? since : maxOccurred;
var nonDraining = response.MoreAvailable && response.Events.Count > 0;
UpdateStalledState(site.SiteId, draining: !nonDraining, eventStream);
}
///
/// Flips the per-site stalled flag based on whether this tick drained the
/// queue. A "draining" cycle is one where the server reported no more rows
/// available OR returned zero events. A "non-draining" cycle is the
/// inverse (events returned AND MoreAvailable=true).
///
///
/// The state machine: counter increments on each consecutive non-draining
/// tick. On reaching
/// the actor latches Stalled=true and publishes the transition; on
/// any subsequent draining tick the counter resets to zero AND, if the
/// latch is currently true, the actor publishes Stalled=false. Only
/// transitions are published — repeated ticks in the same state are
/// silent so a downstream subscriber doesn't see a flood of redundant
/// notifications.
///
private void UpdateStalledState(string siteId, bool draining, Akka.Event.EventStream eventStream)
{
var wasStalled = _stalled.TryGetValue(siteId, out var prior) && prior;
if (draining)
{
_nonDrainingCycles[siteId] = 0;
if (wasStalled)
{
_stalled[siteId] = false;
eventStream.Publish(
new SiteAuditTelemetryStalledChanged(siteId, Stalled: false));
}
return;
}
var consecutive = _nonDrainingCycles.GetValueOrDefault(siteId) + 1;
_nonDrainingCycles[siteId] = consecutive;
if (consecutive >= _options.StalledAfterNonDrainingCycles && !wasStalled)
{
_stalled[siteId] = true;
eventStream.Publish(
new SiteAuditTelemetryStalledChanged(siteId, Stalled: true));
}
}
///
protected override SupervisorStrategy SupervisorStrategy()
{
return new OneForOneStrategy(
maxNrOfRetries: 0,
withinTimeRange: TimeSpan.Zero,
decider: Akka.Actor.SupervisorStrategy.DefaultDecider);
}
/// Self-tick triggering a reconciliation pass across all sites.
internal sealed class ReconciliationTick
{
public static readonly ReconciliationTick Instance = new();
private ReconciliationTick() { }
}
}
///
/// Published on the actor system EventStream when a site's reconciliation
/// puller transitions into or out of the "stalled" state (backlog not
/// draining across multiple cycles). The M6 Bundle E central health collector
/// will subscribe to this and surface
/// SiteAuditTelemetryStalled on the health-report payload.
///
public sealed record SiteAuditTelemetryStalledChanged(string SiteId, bool Stalled);