Files
ScadaBridge/src/ZB.MOM.WW.ScadaBridge.SiteCallAudit/SiteCallAuditActor.cs
T

1148 lines
50 KiB
C#

using Akka.Actor;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.ScadaBridge.AuditLog.Central;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Audit;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Audit;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.RemoteQuery;
using ZB.MOM.WW.ScadaBridge.Commons.Types;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Audit;
using ZB.MOM.WW.ScadaBridge.Communication;
namespace ZB.MOM.WW.ScadaBridge.SiteCallAudit;
/// <summary>
/// Central singleton for Site Call Audit (#22). Receives
/// <see cref="UpsertSiteCallCommand"/> messages and persists each
/// <see cref="ZB.MOM.WW.ScadaBridge.Commons.Entities.Audit.SiteCall"/> row via
/// <see cref="ISiteCallAuditRepository.UpsertAsync"/> — idempotent monotonic
/// upsert. Out-of-order or duplicate updates are silent no-ops at the
/// repository layer; the actor always replies <see cref="UpsertSiteCallReply"/>
/// with <c>Accepted=true</c> in that case because storage state is consistent
/// and the site is free to consider its packet acked.
/// </summary>
/// <remarks>
/// <para>
/// Implemented: direct <see cref="UpsertSiteCallCommand"/> telemetry ingest,
/// query, detail and KPI handlers (Task 4), the central→site Retry/Discard
/// relay (Task 5 — the relay handlers live in this actor), the periodic
/// per-site reconciliation puller that backfills lost telemetry (Piece A —
/// <see cref="OnReconciliationTickAsync"/>, the documented self-heal pull), and
/// the daily terminal-row purge scheduler (Piece B —
/// <see cref="OnPurgeTickAsync"/>, which invokes
/// <see cref="ISiteCallAuditRepository.PurgeTerminalAsync"/> on a timer). Both
/// background timers are started in <see cref="PreStart"/> and gate on the
/// reconciliation collaborators (<see cref="IPullSiteCallsClient"/> +
/// <see cref="ISiteEnumerator"/>) being available — the repo-only test ctor
/// injects neither, so neither timer runs there.
/// </para>
/// <para>
/// Per CLAUDE.md "audit-write failure NEVER aborts the user-facing action" —
/// the actor catches every exception from the repository call and replies
/// <c>Accepted=false</c> without rethrowing, so the central singleton stays
/// alive. The <see cref="SupervisorStrategy"/> override governs the actor's
/// <em>children</em>, not the actor itself; this actor has no children today,
/// so the override is currently inert. It returns a one-for-one strategy with
/// <see cref="Akka.Actor.SupervisorStrategy.DefaultDecider"/> (Restart on most
/// exceptions, Stop on <see cref="ActorInitializationException"/> /
/// <see cref="ActorKilledException"/>) and <c>maxNrOfRetries: 0</c>, so any
/// future child that throws is Stopped on the first failure — a deliberate
/// "fail loudly" posture for any sub-actors the central singleton may gain
/// later (the reconciliation puller and purge scheduler currently run inside
/// this actor on timers, not as children). Self-supervision of this actor
/// is whatever the parent <see cref="Akka.Cluster.Tools.Singleton.ClusterSingletonManager"/>
/// supplies; the in-handler <c>try/catch</c> in <see cref="OnUpsertAsync"/>
/// is what actually keeps the singleton alive across repository faults.
/// </para>
/// <para>
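/// Separate production and test constructors exist for the same reason as
/// <c>AuditLogIngestActor</c>: production wiring (Bundle F) resolves the
/// scoped EF repository from a fresh DI scope per message because the actor
/// is a long-lived cluster singleton, while tests inject a concrete
/// <see cref="ISiteCallAuditRepository"/> against a per-test MSSQL fixture
/// so the actor exercises the real monotonic upsert SQL end to end. A second
/// test constructor additionally injects the reconciliation collaborators
/// (<see cref="ISiteEnumerator"/> and <see cref="IPullSiteCallsClient"/>) so
/// the self-heal pull can be unit-tested without a DI container.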
/// </para>
/// </remarks>
public class SiteCallAuditActor : ReceiveActor
{
/// <summary>Maximum page size honoured by a <see cref="SiteCallQueryRequest"/>.</summary>
private const int MaxPageSize = 200;
/// <summary>
/// Root DI container. Set only by the production constructor, which uses it to
/// open a fresh scope per message/tick for the scoped repository and to resolve
/// the reconciliation collaborators once. <c>null</c> when either test
/// constructor built the actor.
/// </summary>
private readonly IServiceProvider? _serviceProvider;
/// <summary>
/// Concrete repository supplied by the test constructors and reused for every
/// message and tick (no DI scope is opened). <c>null</c> on the production path.
/// </summary>
private readonly ISiteCallAuditRepository? _injectedRepository;
/// <summary>
/// Actor configuration: stuck-age threshold, KPI interval, reconciliation
/// interval and batch size, purge interval, retention window, and (per the
/// production constructor's docs) the relay timeout. The test constructors
/// fall back to a default-constructed <see cref="SiteCallAuditOptions"/>.
/// </summary>
private readonly SiteCallAuditOptions _options;
/// <summary>Diagnostics and error-reporting sink.</summary>
private readonly ILogger<SiteCallAuditActor> _logger;
/// <summary>
/// Reconciliation collaborators (Piece A). The per-site self-heal pull
/// (<see cref="IPullSiteCallsClient"/>) and the site list
/// (<see cref="ISiteEnumerator"/>). On the production path these are
/// resolved once from the root <see cref="IServiceProvider"/> (central
/// singletons registered by <c>AddAuditLogCentralReconciliationClient</c>);
/// in the test path they are injected directly. They are <c>null</c> when
/// the actor was built via the repo-only test ctor — in that case the
/// reconciliation tick is NOT started (see <see cref="StartReconciliationTimer"/>);
/// the purge tick gates on the same collaborators (see <see cref="StartPurgeTimer"/>).
/// </summary>
private readonly IPullSiteCallsClient? _pullClient;
private readonly ISiteEnumerator? _siteEnumerator;
/// <summary>
/// Per-site reconciliation watermark — the highest
/// <see cref="SiteCall.UpdatedAtUtc"/> seen for that site on a previous
/// tick. The next tick asks for rows at or after this cursor; idempotent
/// monotonic <see cref="ISiteCallAuditRepository.UpsertAsync"/> swallows any
/// duplicate-with-same-timestamp rows. In-memory for the singleton's
/// lifetime — a failover / restart resets every cursor to
/// <see cref="DateTime.MinValue"/>, which is conservative but correct
/// (the next tick re-pulls and idempotent upsert dedupes). Mirrors
/// <c>SiteAuditReconciliationActor</c>.
/// </summary>
private readonly Dictionary<string, DateTime> _reconciliationCursors = new();
/// <summary>
/// Handle for the repeating <see cref="ReconciliationTick"/> schedule. Stays
/// <c>null</c> when the tick was gated off; cancelled in <see cref="PostStop"/>.
/// </summary>
private ICancelable? _reconciliationTimer;
/// <summary>
/// Handle for the repeating <see cref="PurgeTick"/> schedule. Stays
/// <c>null</c> when the tick was gated off; cancelled in <see cref="PostStop"/>.
/// </summary>
private ICancelable? _purgeTimer;
/// <summary>
/// Task 5 (#22): the central→site command transport — the
/// <c>CentralCommunicationActor</c>, which owns the per-site
/// <c>ClusterClient</c> map and routes a <see cref="SiteEnvelope"/> to the
/// owning site. Set via <see cref="RegisterCentralCommunication"/> by the
/// Host after both actors exist (this actor is a cluster singleton; the
/// transport actor is created separately). Null until registration
/// completes — a relay arriving before then is answered with a
/// <see cref="SiteCallRelayOutcome.SiteUnreachable"/> outcome, because there
/// is genuinely no route to any site yet.
/// </summary>
private IActorRef? _centralCommunication;
/// <summary>
/// Repository-only test constructor. The supplied repository outlives the
/// test, so the actor uses that one instance for every message instead of
/// opening a DI scope. Used by Bundle C's MSSQL-backed TestKit fixture.
/// Passing <paramref name="options"/> lets a test pin the stuck/KPI windows;
/// leaving it out falls back to the production defaults.
/// <para>
/// No reconciliation client or site enumerator is supplied here, so the
/// reconciliation tick stays off (see <see cref="StartReconciliationTimer"/>)
/// — the MSSQL-backed read/upsert tests must not fire phantom pulls.
/// </para>
/// </summary>
/// <param name="repository">Repository instance shared by every message.</param>
/// <param name="logger">Logger for diagnostics and error reporting.</param>
/// <param name="options">Optional configuration overrides; <c>null</c> selects production defaults.</param>
/// <exception cref="ArgumentNullException"><paramref name="repository"/> or <paramref name="logger"/> is <c>null</c>.</exception>
public SiteCallAuditActor(
    ISiteCallAuditRepository repository,
    ILogger<SiteCallAuditActor> logger,
    SiteCallAuditOptions? options = null)
{
    _injectedRepository = repository ?? throw new ArgumentNullException(nameof(repository));
    _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    _options = options ?? new SiteCallAuditOptions();

    RegisterHandlers();
}
/// <summary>
/// Test constructor for the reconciliation tick (Piece A). It takes a concrete
/// repository together with both reconciliation collaborators, so the per-site
/// self-heal pull can be exercised in memory without a DI container or a live
/// gRPC channel. Because the client and enumerator are both present, the
/// reconciliation tick and the purge tick are both started (each gates on the
/// collaborators — see <see cref="StartReconciliationTimer"/> and
/// <see cref="StartPurgeTimer"/>).
/// </summary>
/// <param name="repository">Repository used for upserts and purges.</param>
/// <param name="siteEnumerator">Supplies the sites to reconcile on each tick.</param>
/// <param name="pullClient">Fetches changed rows from each site.</param>
/// <param name="logger">Logger for diagnostics and error reporting.</param>
/// <param name="options">Optional configuration overrides; <c>null</c> selects production defaults.</param>
/// <exception cref="ArgumentNullException">Any of the non-optional arguments is <c>null</c>.</exception>
/// <remarks>
/// Must be public: Akka's default <c>ActivatorProducer</c> creates actors by
/// reflection using public-only binding flags, so an internal constructor
/// fails with a <c>MissingMethodException</c> at actor creation. Its
/// concrete-collaborator parameter list keeps it distinct from the production
/// <see cref="IServiceProvider"/> constructor; only the test project (or a
/// host that resolves the collaborators by hand) calls it.
/// </remarks>
public SiteCallAuditActor(
    ISiteCallAuditRepository repository,
    ISiteEnumerator siteEnumerator,
    IPullSiteCallsClient pullClient,
    ILogger<SiteCallAuditActor> logger,
    SiteCallAuditOptions? options = null)
{
    _injectedRepository = repository ?? throw new ArgumentNullException(nameof(repository));
    _siteEnumerator = siteEnumerator ?? throw new ArgumentNullException(nameof(siteEnumerator));
    _pullClient = pullClient ?? throw new ArgumentNullException(nameof(pullClient));
    _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    _options = options ?? new SiteCallAuditOptions();

    RegisterHandlers();
}
/// <summary>
/// Production constructor. The repository is a scoped EF Core service
/// (registered by <c>AddConfigurationDatabase</c>), so it is resolved from a
/// fresh DI scope per message rather than held. This actor is a long-lived
/// cluster singleton and cannot keep a scope open across messages.
/// </summary>
/// <param name="serviceProvider">Root provider used to open a scope per message and to resolve the reconciliation collaborators.</param>
/// <param name="options">Actor configuration (stuck threshold, KPI interval, relay timeout).</param>
/// <param name="logger">Logger for diagnostics and error reporting.</param>
/// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
public SiteCallAuditActor(
    IServiceProvider serviceProvider,
    SiteCallAuditOptions options,
    ILogger<SiteCallAuditActor> logger)
{
    _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
    _options = options ?? throw new ArgumentNullException(nameof(options));
    _logger = logger ?? throw new ArgumentNullException(nameof(logger));

    // The reconciliation collaborators (Piece A) are central-only singletons
    // from AddAuditLogCentralReconciliationClient, so they are resolved once
    // here. Only the repository is scoped and is still resolved per tick and
    // per message. GetService (not GetRequiredService) is used deliberately:
    // a host that omits the registration ends up without the background
    // ticks instead of failing at startup, because the tick start-up checks
    // both for null.
    _pullClient = _serviceProvider.GetService<IPullSiteCallsClient>();
    _siteEnumerator = _serviceProvider.GetService<ISiteEnumerator>();

    RegisterHandlers();
}
/// <summary>
/// Registers every message handler. Called once from each of the three
/// constructors. It wires five groups: the M3 telemetry ingest, the Task 4
/// read side (query, detail, global / per-site / per-node KPI), the Task 5
/// central→site relay, and the Piece A/B background self-ticks.
/// The read handlers answer an Ask, so they capture <c>Sender</c> before
/// starting async work and <c>PipeTo</c> the result back.
/// </summary>
private void RegisterHandlers()
{
    // M3 telemetry ingest.
    ReceiveAsync<UpsertSiteCallCommand>(OnUpsertAsync);

    // Task 4 read side.
    Receive<SiteCallQueryRequest>(HandleQuery);
    Receive<SiteCallDetailRequest>(HandleDetail);
    Receive<SiteCallKpiRequest>(HandleKpi);
    Receive<PerSiteSiteCallKpiRequest>(HandlePerSiteKpi);
    Receive<PerNodeSiteCallKpiRequest>(HandlePerNodeKpi);

    // Task 5 (#22): central→site Retry/Discard relay for parked cached calls.
    // The Host hands over the transport once both actors exist.
    Receive<RegisterCentralCommunication>(registration =>
    {
        _centralCommunication = registration.CentralCommunication;
        _logger.LogInformation("SiteCallAudit registered central→site communication transport");
    });
    Receive<RetrySiteCallRequest>(HandleRetrySiteCall);
    Receive<DiscardSiteCallRequest>(HandleDiscardSiteCall);

    // Piece A/B (#22): periodic reconciliation pull and daily terminal-row
    // purge. Each handler keeps its own try/catch so a fault stays local.
    // The timers that send these ticks start only when their collaborators
    // exist.
    ReceiveAsync<ReconciliationTick>(tick => OnReconciliationTickAsync());
    ReceiveAsync<PurgeTick>(tick => OnPurgeTickAsync());
}
/// <summary>
/// Runs the base start-up hook, then arms the background timers. Each timer
/// method decides for itself whether its collaborators are present.
/// </summary>
protected override void PreStart()
{
    base.PreStart();

    StartReconciliationTimer();
    StartPurgeTimer();
}
/// <summary>
/// Cancels whichever background timers <see cref="PreStart"/> armed, then
/// defers to the base stop hook. A timer that was gated off is <c>null</c>
/// and is skipped.
/// </summary>
protected override void PostStop()
{
    if (_reconciliationTimer is not null)
    {
        _reconciliationTimer.Cancel();
    }

    if (_purgeTimer is not null)
    {
        _purgeTimer.Cancel();
    }

    base.PostStop();
}
/// <summary>
/// Starts the periodic reconciliation tick — but ONLY when both the pull
/// client and the site enumerator are available. The repo-only test ctor
/// injects neither, so the tick is gated off there (the MSSQL read/upsert
/// tests must not fire phantom pulls); the reconciliation test ctor and the
/// production ctor (which resolves both from the SP) start it.
/// </summary>
/// <remarks>
/// On the production path (<see cref="_serviceProvider"/> set), a missing
/// collaborator means a wiring gap rather than a test configuration. The
/// actor then runs without the self-heal pull, so it logs a warning instead
/// of degrading silently. The test constructors never set the provider and
/// stay quiet.
/// </remarks>
private void StartReconciliationTimer()
{
    if (_pullClient is null || _siteEnumerator is null)
    {
        if (_serviceProvider is not null)
        {
            _logger.LogWarning(
                "SiteCallAudit reconciliation tick disabled: IPullSiteCallsClient and/or ISiteEnumerator is not registered (AddAuditLogCentralReconciliationClient missing?); lost telemetry will not be backfilled.");
        }

        return;
    }

    var interval = _options.ResolvedReconciliationInterval;
    _reconciliationTimer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(
        initialDelay: interval,
        interval: interval,
        receiver: Self,
        message: ReconciliationTick.Instance,
        sender: Self);
}
/// <summary>
/// Starts the daily purge tick — gated on the same collaborator presence as
/// the reconciliation tick. The purge itself only needs the repository, but
/// gating both schedulers together keeps the repo-only test ctor (no
/// client/enumerator) free of BOTH background timers, so the MSSQL read/
/// upsert tests see no scheduled side effects.
/// </summary>
/// <remarks>
/// Because of that shared gate, a production host (<see cref="_serviceProvider"/>
/// set) that lacks the reconciliation collaborators also loses retention
/// enforcement. Terminal <c>SiteCalls</c> rows would then accumulate
/// indefinitely, so this case is logged as a warning rather than skipped
/// silently.
/// </remarks>
private void StartPurgeTimer()
{
    if (_pullClient is null || _siteEnumerator is null)
    {
        if (_serviceProvider is not null)
        {
            _logger.LogWarning(
                "SiteCallAudit terminal-row purge disabled: the purge tick is gated on IPullSiteCallsClient/ISiteEnumerator, which are not registered; terminal SiteCalls rows will not be purged.");
        }

        return;
    }

    var interval = _options.ResolvedPurgeInterval;
    _purgeTimer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(
        initialDelay: interval,
        interval: interval,
        receiver: Self,
        message: PurgeTick.Instance,
        sender: Self);
}
/// <summary>
/// Supervision for this actor's children. None exist today, so the strategy is
/// currently inert. It is one-for-one with
/// <see cref="Akka.Actor.SupervisorStrategy.DefaultDecider"/> and zero allowed
/// retries, so a failing child is stopped rather than restarted.
/// </summary>
protected override SupervisorStrategy SupervisorStrategy() =>
    new OneForOneStrategy(
        maxNrOfRetries: 0,
        withinTimeRange: TimeSpan.Zero,
        decider: Akka.Actor.SupervisorStrategy.DefaultDecider);
/// <summary>
/// Persists one pushed <see cref="UpsertSiteCallCommand"/> through the
/// idempotent monotonic upsert and replies <see cref="UpsertSiteCallReply"/>
/// to the original sender: <c>Accepted=true</c> on success, <c>Accepted=false</c>
/// on any fault.
/// </summary>
/// <param name="cmd">Telemetry command carrying the site-call row to upsert.</param>
/// <remarks>
/// Faults from opening the DI scope, resolving the scoped repository, the
/// upsert itself and releasing the scope are all caught and logged. None of
/// them escapes the handler. An escaped exception would leave the sender
/// without a reply and hand the fault to the parent's supervision. If that
/// restarts the actor, in-memory state such as the reconciliation cursors and
/// the registered central transport is lost.
/// </remarks>
private async Task OnUpsertAsync(UpsertSiteCallCommand cmd)
{
    // Capture Sender before the first await — Akka resets Sender between
    // message dispatches, so a post-await Tell would go to DeadLetters.
    var replyTo = Sender;
    var id = cmd.SiteCall.TrackedOperationId;

    // Production path only: one scope per message (the EF repository is
    // scoped). The scope is async-disposed (AuditLog-003) so the DbContext's
    // async cleanup does not block a thread. Injected-repository mode (tests)
    // opens no scope.
    AsyncServiceScope? scope = null;
    try
    {
        ISiteCallAuditRepository repository;
        if (_injectedRepository is not null)
        {
            repository = _injectedRepository;
        }
        else
        {
            // Inside the try on purpose: a DI / DbContext construction fault
            // must be answered with Accepted=false, not escape the handler.
            scope = _serviceProvider!.CreateAsyncScope();
            repository = scope.Value.ServiceProvider.GetRequiredService<ISiteCallAuditRepository>();
        }

        // SiteCallAudit-003: stamp IngestedAtUtc at central-side persist time
        // on every upsert. Callers cannot in general know they run on central,
        // so the actor owns this "central ingested (or last refreshed)" stamp.
        var siteCall = cmd.SiteCall with { IngestedAtUtc = DateTime.UtcNow };
        await repository.UpsertAsync(siteCall).ConfigureAwait(false);
        replyTo.Tell(new UpsertSiteCallReply(id, Accepted: true));
    }
    catch (Exception ex)
    {
        // Per CLAUDE.md: audit-write failure NEVER aborts the user-facing
        // action — log and reply Accepted=false; do NOT rethrow (the central
        // singleton MUST stay alive).
        _logger.LogError(ex, "SiteCallAudit upsert failed for {TrackedOperationId}", id);
        replyTo.Tell(new UpsertSiteCallReply(id, Accepted: false));
    }
    finally
    {
        if (scope is { } activeScope)
        {
            try
            {
                await activeScope.DisposeAsync().ConfigureAwait(false);
            }
            catch (Exception disposeEx)
            {
                // The reply has already been sent; a cleanup fault is only
                // worth a log line, never a handler crash.
                _logger.LogWarning(
                    disposeEx,
                    "SiteCallAudit failed to release the upsert repository scope for {TrackedOperationId}",
                    id);
            }
        }
    }
}
// ── Piece A: periodic per-site reconciliation pull (self-heal) ──
/// <summary>
/// One reconciliation pass: enumerate every known site and, per site, pull
/// changed <see cref="SiteCall"/> rows since that site's cursor and upsert
/// them idempotently — the documented self-heal when best-effort gRPC push
/// telemetry is lost. This is a mirror, NOT a dispatcher: cached-call
/// delivery stays site-local; upserting reconciled rows only refreshes the
/// eventually-consistent central <c>SiteCalls</c> mirror.
/// </summary>
/// <remarks>
/// <para>
/// Mirrors <c>SiteAuditReconciliationActor</c>'s structure (per-site cursor,
/// per-site try/catch failure isolation, advance the cursor by the max
/// observed <see cref="SiteCall.UpdatedAtUtc"/>) but is deliberately simpler:
/// no stalled-detection EventStream machinery — just cursor + pull + upsert
/// + advance. One DI scope per tick is opened and the same repository reused
/// across every site in that tick.
/// </para>
/// <para>
/// No fault escapes this handler:
/// <list type="bullet">
/// <item>A missing collaborator or an enumeration failure skips the tick.</item>
/// <item>Per-site faults are isolated in <see cref="ReconcileSitesAsync"/>.</item>
/// <item>A failure to open, resolve or release the per-tick repository scope
/// is caught and logged here. The next tick retries.</item>
/// </list>
/// </para>
/// </remarks>
private async Task OnReconciliationTickAsync()
{
    // The timer is only scheduled when both collaborators are present
    // (StartReconciliationTimer). Guard explicitly instead of relying on
    // null-forgiveness, so a refactor that drops the gate logs clearly.
    if (_siteEnumerator is not { } enumerator || _pullClient is not { } client)
    {
        _logger.LogError(
            "SiteCallAudit reconciliation tick received without a pull client / site enumerator; ignoring.");
        return;
    }

    IReadOnlyList<SiteEntry> sites;
    try
    {
        // No ambient CancellationToken in a ReceiveActor handler — the work
        // is bounded by the reconciliation interval plus the singleton's
        // graceful-stop drain on PhaseClusterLeave.
        sites = await enumerator.EnumerateAsync().ConfigureAwait(false);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "SiteCallAudit site enumeration failed; skipping reconciliation tick.");
        return;
    }

    if (sites.Count == 0)
    {
        return;
    }

    // Injected-repository test path: no scope to open; ReconcileSitesAsync
    // already isolates every per-site fault.
    if (_injectedRepository is not null)
    {
        await ReconcileSitesAsync(sites, client, _injectedRepository).ConfigureAwait(false);
        return;
    }

    try
    {
        // AuditLog-003: CreateAsyncScope + await using, so the scoped EF Core
        // repository (an IAsyncDisposable DbContext) is released
        // asynchronously. The scope is held across many awaited UpsertAsync
        // calls. Mirrors SiteAuditReconciliationActor.OnTickAsync.
        await using var scope = _serviceProvider!.CreateAsyncScope();
        var repository = scope.ServiceProvider.GetRequiredService<ISiteCallAuditRepository>();
        await ReconcileSitesAsync(sites, client, repository).ConfigureAwait(false);
    }
    catch (Exception ex)
    {
        // Only scope creation / resolution / disposal can land here. Sites
        // already reconciled keep their advanced cursors; the rest retry on
        // the next tick.
        _logger.LogError(
            ex,
            "SiteCallAudit reconciliation tick failed to open or release its repository scope; will retry next tick.");
    }
}
/// <summary>
/// Reconciles the sites of one tick, in order, against a single repository.
/// A fault in one site is logged and does not stop the remaining sites.
/// Because a failing site's cursor is never written, the next tick retries
/// the same window for that site.
/// </summary>
/// <param name="sites">Sites enumerated for this tick.</param>
/// <param name="client">Pull client used for every site.</param>
/// <param name="repository">Repository shared by every site in the tick.</param>
private async Task ReconcileSitesAsync(
    IReadOnlyList<SiteEntry> sites, IPullSiteCallsClient client, ISiteCallAuditRepository repository)
{
    for (var index = 0; index < sites.Count; index++)
    {
        var current = sites[index];
        try
        {
            await ReconcileSiteAsync(current, client, repository).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            // Failure isolation: a transport or repository fault for this
            // site must not sink the rest of the pass.
            _logger.LogWarning(
                ex,
                "SiteCallAudit reconciliation pull failed for site {SiteId}; other sites continue.",
                current.SiteId);
        }
    }
}
/// <summary>
/// Reconciles one site. It issues a single <c>PullSiteCalls</c> RPC from the
/// site's cursor, upserts each returned row (re-stamping
/// <c>IngestedAtUtc</c> with one central timestamp for the whole batch), then
/// moves the cursor to the newest <see cref="SiteCall.UpdatedAtUtc"/> seen.
/// The pull client returns rows oldest-first with <c>SourceSite</c> already
/// re-stamped from the dialed site id, so rows are otherwise upserted as
/// received.
/// </summary>
/// <param name="site">Site to reconcile; its id keys the cursor map.</param>
/// <param name="client">Pull client used for the RPC.</param>
/// <param name="repository">Repository receiving the upserts.</param>
/// <remarks>
/// <para>
/// <b>Coarse per-site retry (differs from <c>SiteAuditReconciliationActor</c>).</b>
/// That sibling (AuditLog-004) counts attempts per EventId and eventually
/// abandons an un-insertable row. This method does not. Any throw propagates
/// to the per-site catch in <see cref="ReconcileSitesAsync"/> before the
/// cursor is written, so the next tick re-pulls the whole batch from
/// <c>since</c>. A persistently-bad row therefore pins the site's cursor.
/// That is acceptable here: the upsert is monotonic and idempotent, and
/// <c>SiteCalls</c> is an eventually-consistent mirror, so a stuck site lags
/// rather than corrupts.
/// </para>
/// <para>
/// <b>Inclusive cursor.</b> The pull asks for rows at or after the cursor
/// (<c>since</c> is <c>&gt;=</c>). The row whose timestamp equals the cursor is
/// re-pulled next tick and deduplicated by the idempotent upsert — the same
/// contract as <c>SiteAuditReconciliationActor</c>.
/// </para>
/// <para>
/// NOTE(review): the cursor advances by timestamp only. If a saturated batch
/// (<see cref="SiteCallAuditOptions.ReconciliationBatchSize"/> rows) consisted
/// entirely of rows sharing the current cursor's <c>UpdatedAtUtc</c>, the
/// cursor would not move and the same batch would be re-pulled every tick.
/// Confirm the site-side query breaks timestamp ties or that such bursts
/// cannot occur.
/// </para>
/// </remarks>
private async Task ReconcileSiteAsync(
    SiteEntry site, IPullSiteCallsClient client, ISiteCallAuditRepository repository)
{
    // TryGetValue leaves default(DateTime) (== MinValue) on a miss; assign it
    // explicitly so the first-ever pull for a site starts from the beginning.
    if (!_reconciliationCursors.TryGetValue(site.SiteId, out var since))
    {
        since = DateTime.MinValue;
    }

    var response = await client
        .PullAsync(site.SiteId, since, _options.ReconciliationBatchSize, CancellationToken.None)
        .ConfigureAwait(false);

    var ingestedAt = DateTime.UtcNow;
    var highWater = since;
    foreach (var pulled in response.SiteCalls)
    {
        // IngestedAtUtc is owned by the central actor, exactly as on the
        // telemetry path; a row already present from a push is a no-op.
        await repository.UpsertAsync(pulled with { IngestedAtUtc = ingestedAt }).ConfigureAwait(false);
        highWater = pulled.UpdatedAtUtc > highWater ? pulled.UpdatedAtUtc : highWater;
    }

    // A saturated (MoreAvailable) batch is not re-pulled immediately; the
    // normal tick cadence drains the backlog from the advanced cursor.
    _reconciliationCursors[site.SiteId] = highWater;
}
// ── Piece B: daily terminal-row purge scheduler ──
/// <summary>
/// One purge pass: drops terminal <c>SiteCalls</c> rows whose
/// <see cref="SiteCall.TerminalAtUtc"/> is older than
/// <c>UtcNow - RetentionDays</c> via
/// <see cref="ISiteCallAuditRepository.PurgeTerminalAsync"/>. Non-terminal
/// rows are never purged (enforced in the repository). The threshold is
/// computed each tick so an operator who lowers <c>RetentionDays</c> sees it
/// applied on the next purge without an actor restart. Mirrors
/// <c>AuditLogPurgeActor</c>'s daily cadence + continue-on-error posture: a
/// purge fault is logged and swallowed so the singleton stays alive.
/// </summary>
/// <remarks>
/// <see cref="PurgeWithRepositoryAsync"/> already swallows faults from the
/// purge call itself. Everything around it is guarded here: the threshold
/// arithmetic (which can overflow for an out-of-range <c>RetentionDays</c>)
/// and the opening, resolution and release of the per-tick DI scope. No
/// purge-tick fault escapes the handler.
/// </remarks>
private async Task OnPurgeTickAsync()
{
    try
    {
        var threshold = DateTime.UtcNow - TimeSpan.FromDays(_options.RetentionDays);

        // Injected-repository test path: reuse the test repo, no scope.
        if (_injectedRepository is not null)
        {
            await PurgeWithRepositoryAsync(_injectedRepository, threshold).ConfigureAwait(false);
            return;
        }

        // AuditLog-003: CreateAsyncScope + await using, so the scoped EF Core
        // repository (an IAsyncDisposable DbContext) is released
        // asynchronously rather than blocking a thread on synchronous
        // Dispose(). Mirrors SiteAuditReconciliationActor.
        await using var scope = _serviceProvider!.CreateAsyncScope();
        var repository = scope.ServiceProvider.GetRequiredService<ISiteCallAuditRepository>();
        await PurgeWithRepositoryAsync(repository, threshold).ConfigureAwait(false);
    }
    catch (Exception ex)
    {
        // Continue-on-error: the central singleton must stay alive; the next
        // tick retries.
        _logger.LogError(
            ex,
            "SiteCallAudit purge tick failed outside the purge call (threshold computation or repository scope); will retry next tick.");
    }
}
/// <summary>
/// Runs one terminal-row purge against <paramref name="repository"/>.
/// It logs the deleted-row count when something was removed. It logs and
/// swallows any fault (continue-on-error), so a transient SQL failure or
/// contention never crashes the central singleton; the next tick retries the
/// same window.
/// </summary>
/// <param name="repository">Repository to purge through.</param>
/// <param name="threshold">UTC cut-off; terminal rows older than this are removed.</param>
private async Task PurgeWithRepositoryAsync(ISiteCallAuditRepository repository, DateTime threshold)
{
    try
    {
        var purged = await repository.PurgeTerminalAsync(threshold).ConfigureAwait(false);
        if (purged <= 0)
        {
            // Nothing aged out this tick — stay quiet.
            return;
        }

        _logger.LogInformation(
            "SiteCallAudit purged {RowsDeleted} terminal SiteCalls rows older than {ThresholdUtc:o}.",
            purged,
            threshold);
    }
    catch (Exception ex)
    {
        _logger.LogError(
            ex,
            "SiteCallAudit terminal-row purge failed (threshold {ThresholdUtc:o}); will retry next tick.",
            threshold);
    }
}
// ── Task 4: read-side (query / detail / KPI) ──
/// <summary>
/// Handles a paginated, filtered query over the <c>SiteCalls</c> table. The
/// work runs in <see cref="QueryAsync"/>, and its response is piped to the
/// sender captured at receive time. A fault becomes a <c>Success=false</c>
/// response carrying the base exception's message, an empty list and no
/// next-page cursor.
/// </summary>
/// <param name="request">The query request.</param>
private void HandleQuery(SiteCallQueryRequest request)
{
    var replyTo = Sender;
    var pending = QueryAsync(request, DateTime.UtcNow);

    pending.PipeTo(
        replyTo,
        success: response => response,
        failure: ex => new SiteCallQueryResponse(
            request.CorrelationId,
            Success: false,
            ErrorMessage: ex.GetBaseException().Message,
            SiteCalls: Array.Empty<SiteCallSummary>(),
            NextAfterCreatedAtUtc: null,
            NextAfterId: null));
}
/// <summary>
/// Builds the repository filter and keyset paging from
/// <paramref name="request"/>, runs the query, and maps the rows to summaries.
/// </summary>
/// <param name="request">The incoming query request.</param>
/// <param name="now">Reference time used to derive the stuck cutoff.</param>
/// <returns>
/// A successful response. Its next-page cursor is the last row of the
/// returned page, or <c>null</c> cursor fields when the page is empty.
/// </returns>
private async Task<SiteCallQueryResponse> QueryAsync(SiteCallQueryRequest request, DateTime now)
{
    var stuckCutoff = now - _options.StuckAgeThreshold;

    var paging = new SiteCallPaging(
        PageSize: Math.Clamp(request.PageSize, 1, MaxPageSize),
        AfterCreatedAtUtc: request.AfterCreatedAtUtc,
        AfterId: request.AfterId is { } afterId ? new TrackedOperationId(afterId) : null);

    var filter = new SiteCallQueryFilter(
        Channel: NullIfBlank(request.ChannelFilter),
        SourceSite: NullIfBlank(request.SourceSiteFilter),
        Status: NullIfBlank(request.StatusFilter),
        Target: NullIfBlank(request.TargetKeyword),
        FromUtc: request.FromUtc,
        ToUtc: request.ToUtc,
        // StuckOnly is evaluated in SQL (TerminalAtUtc IS NULL AND
        // CreatedAtUtc < cutoff). That way it composes with the keyset cursor
        // and pages stay full, with no empty pages carrying a non-null cursor.
        StuckCutoffUtc: request.StuckOnly ? stuckCutoff : null,
        SourceNode: NullIfBlank(request.SourceNodeFilter));

    var (scope, repository) = ResolveRepository();
    using (scope)
    {
        var rows = await repository.QueryAsync(filter, paging).ConfigureAwait(false);
        var summaries = rows.Select(row => ToSummary(row, stuckCutoff)).ToList();

        // The next-page cursor is the last row of the materialised page.
        var lastRow = rows.Count == 0 ? null : rows[^1];

        return new SiteCallQueryResponse(
            request.CorrelationId,
            Success: true,
            ErrorMessage: null,
            SiteCalls: summaries,
            NextAfterCreatedAtUtc: lastRow?.CreatedAtUtc,
            NextAfterId: lastRow?.TrackedOperationId.Value);
    }
}
/// <summary>
/// Handles a full-detail request for one cached call; this backs the report
/// detail modal. The result of <see cref="DetailAsync"/> is piped to the
/// sender captured at receive time. A repository fault becomes a
/// <c>Success=false</c> response carrying the base exception's message.
/// </summary>
/// <param name="request">The detail request.</param>
private void HandleDetail(SiteCallDetailRequest request)
{
    var replyTo = Sender;
    var pending = DetailAsync(request);

    pending.PipeTo(
        replyTo,
        success: response => response,
        failure: ex => new SiteCallDetailResponse(
            request.CorrelationId,
            Success: false,
            ErrorMessage: ex.GetBaseException().Message,
            Detail: null));
}
/// <summary>
/// Loads one site call by tracked-operation id.
/// </summary>
/// <param name="request">The detail request.</param>
/// <returns>
/// <c>Success=true</c> with the mapped detail when the row exists; otherwise
/// <c>Success=false</c> with the message "site call not found".
/// </returns>
private async Task<SiteCallDetailResponse> DetailAsync(SiteCallDetailRequest request)
{
    var (scope, repository) = ResolveRepository();
    using (scope)
    {
        var key = new TrackedOperationId(request.TrackedOperationId);
        var row = await repository.GetAsync(key).ConfigureAwait(false);

        return row is null
            ? new SiteCallDetailResponse(
                request.CorrelationId,
                Success: false,
                ErrorMessage: "site call not found",
                Detail: null)
            : new SiteCallDetailResponse(
                request.CorrelationId,
                Success: true,
                ErrorMessage: null,
                Detail: ToDetail(row));
    }
}
/// <summary>
/// Handles a global KPI snapshot request. The stuck cutoff is now minus
/// <see cref="SiteCallAuditOptions.StuckAgeThreshold"/>. The failed/delivered
/// window starts at now minus <see cref="SiteCallAuditOptions.KpiInterval"/>.
/// A fault becomes a <c>Success=false</c> response with every counter zeroed.
/// </summary>
/// <param name="request">The KPI request.</param>
private void HandleKpi(SiteCallKpiRequest request)
{
    var replyTo = Sender;
    var utcNow = DateTime.UtcNow;
    var pending = KpiAsync(
        request.CorrelationId,
        utcNow - _options.StuckAgeThreshold,
        utcNow - _options.KpiInterval);

    pending.PipeTo(
        replyTo,
        success: response => response,
        failure: ex => new SiteCallKpiResponse(
            request.CorrelationId,
            Success: false,
            ErrorMessage: ex.GetBaseException().Message,
            BufferedCount: 0,
            ParkedCount: 0,
            FailedLastInterval: 0,
            DeliveredLastInterval: 0,
            OldestPendingAge: null,
            StuckCount: 0));
}
/// <summary>
/// Computes the global KPI snapshot and copies it into a successful response.
/// </summary>
/// <param name="correlationId">Correlation id echoed back to the caller.</param>
/// <param name="stuckCutoff">Rows created before this and still non-terminal count as stuck.</param>
/// <param name="intervalSince">Start of the failed/delivered counting window.</param>
/// <returns>A <c>Success=true</c> response populated from the snapshot.</returns>
private async Task<SiteCallKpiResponse> KpiAsync(
    string correlationId, DateTime stuckCutoff, DateTime intervalSince)
{
    var (scope, repository) = ResolveRepository();
    using (scope)
    {
        var snapshot = await repository
            .ComputeKpisAsync(stuckCutoff, intervalSince)
            .ConfigureAwait(false);

        return new SiteCallKpiResponse(
            correlationId,
            Success: true,
            ErrorMessage: null,
            BufferedCount: snapshot.BufferedCount,
            ParkedCount: snapshot.ParkedCount,
            FailedLastInterval: snapshot.FailedLastInterval,
            DeliveredLastInterval: snapshot.DeliveredLastInterval,
            OldestPendingAge: snapshot.OldestPendingAge,
            StuckCount: snapshot.StuckCount);
    }
}
/// <summary>
/// Handles a per-source-site KPI request. It uses the same stuck cutoff and
/// interval window as <see cref="HandleKpi"/>. A fault becomes a
/// <c>Success=false</c> response with an empty site list.
/// </summary>
/// <param name="request">The per-site KPI request.</param>
private void HandlePerSiteKpi(PerSiteSiteCallKpiRequest request)
{
    var replyTo = Sender;
    var utcNow = DateTime.UtcNow;
    var pending = PerSiteKpiAsync(
        request.CorrelationId,
        utcNow - _options.StuckAgeThreshold,
        utcNow - _options.KpiInterval);

    pending.PipeTo(
        replyTo,
        success: response => response,
        failure: ex => new PerSiteSiteCallKpiResponse(
            request.CorrelationId,
            Success: false,
            ErrorMessage: ex.GetBaseException().Message,
            Sites: Array.Empty<SiteCallSiteKpiSnapshot>()));
}
/// <summary>
/// Computes KPI snapshots grouped by source site.
/// </summary>
/// <param name="correlationId">Correlation id echoed back to the caller.</param>
/// <param name="stuckCutoff">Rows created before this and still non-terminal count as stuck.</param>
/// <param name="intervalSince">Start of the failed/delivered counting window.</param>
/// <returns>A <c>Success=true</c> response carrying the per-site snapshots.</returns>
private async Task<PerSiteSiteCallKpiResponse> PerSiteKpiAsync(
    string correlationId, DateTime stuckCutoff, DateTime intervalSince)
{
    var (scope, repository) = ResolveRepository();
    using (scope)
    {
        var perSite = await repository
            .ComputePerSiteKpisAsync(stuckCutoff, intervalSince)
            .ConfigureAwait(false);

        return new PerSiteSiteCallKpiResponse(
            correlationId,
            Success: true,
            ErrorMessage: null,
            Sites: perSite);
    }
}
/// <summary>
/// Handles a per-node KPI request. It uses the same stuck cutoff and interval
/// window as <see cref="HandleKpi"/>. This handler is additive: per-site
/// behaviour in <see cref="HandlePerSiteKpi"/> is unaffected. A fault becomes
/// a <c>Success=false</c> response with an empty node list.
/// </summary>
/// <param name="request">The per-node KPI request.</param>
private void HandlePerNodeKpi(PerNodeSiteCallKpiRequest request)
{
    var replyTo = Sender;
    var utcNow = DateTime.UtcNow;
    var pending = PerNodeKpiAsync(
        request.CorrelationId,
        utcNow - _options.StuckAgeThreshold,
        utcNow - _options.KpiInterval);

    pending.PipeTo(
        replyTo,
        success: response => response,
        failure: ex => new PerNodeSiteCallKpiResponse(
            request.CorrelationId,
            Success: false,
            ErrorMessage: ex.GetBaseException().Message,
            Nodes: Array.Empty<SiteCallNodeKpiSnapshot>()));
}
/// <summary>
/// Reads per-source-node KPI snapshots from the repository and wraps them in a
/// successful <see cref="PerNodeSiteCallKpiResponse"/>. Repository exceptions
/// propagate through the returned task. The DI scope (if one was created) is
/// always disposed.
/// </summary>
/// <param name="correlationId">Echoed back unchanged on the response.</param>
/// <param name="stuckCutoff">Stuck-age cutoff handed to the repository query.</param>
/// <param name="intervalSince">Start of the KPI interval window handed to the repository.</param>
private async Task<PerNodeSiteCallKpiResponse> PerNodeKpiAsync(
    string correlationId, DateTime stuckCutoff, DateTime intervalSince)
{
    var (ownedScope, repo) = ResolveRepository();
    using (ownedScope)
    {
        var perNode = await repo
            .ComputePerNodeKpisAsync(stuckCutoff, intervalSince)
            .ConfigureAwait(false);

        return new PerNodeSiteCallKpiResponse(
            correlationId, Success: true, ErrorMessage: null, perNode);
    }
}
// ── Task 5: central→site Retry/Discard relay ──
/// <summary>
/// Relays an operator Retry of a parked cached call to the site that owns it.
/// The site remains the source of truth, so this handler never writes the
/// central <c>SiteCalls</c> mirror row. The request is wrapped as a
/// <see cref="RetryParkedOperation"/> inside a <see cref="SiteEnvelope"/>
/// addressed to <c>SourceSite</c> and Asked of the registered
/// <c>CentralCommunicationActor</c>, which routes it to the site. The site's
/// <see cref="ParkedOperationActionAck"/> is mapped onto a
/// <see cref="RetrySiteCallResponse"/>. Two cases are reported as the distinct
/// <see cref="SiteCallRelayOutcome.SiteUnreachable"/> outcome rather than a
/// generic failure: a missing transport, and any failure of the Ask itself
/// (including a timeout). This lets the Central UI tell "site offline" apart
/// from "operation failed".
/// </summary>
private void HandleRetrySiteCall(RetrySiteCallRequest request)
{
    var replyTo = Sender;
    var transport = _centralCommunication;
    if (transport is null)
    {
        // Nothing is registered to carry the command yet, so no site can be
        // reached; "unreachable" is the only honest answer.
        _logger.LogWarning(
            "RetrySiteCall {TrackedOperationId} for site {SourceSite} arrived before the "
            + "central→site transport was registered; reporting site unreachable",
            request.TrackedOperationId, request.SourceSite);
        replyTo.Tell(UnreachableRetry(request.CorrelationId));
        return;
    }

    var operationId = new TrackedOperationId(request.TrackedOperationId);
    var envelope = new SiteEnvelope(
        request.SourceSite,
        new RetryParkedOperation(request.CorrelationId, operationId));

    var ackTask = transport.Ask<ParkedOperationActionAck>(envelope, _options.RelayTimeout);
    ackTask.PipeTo(
        replyTo,
        success: ack => MapRetryResponse(request.CorrelationId, ack),
        failure: ex => MapRetryFailure(request.CorrelationId, request.SourceSite, ex));
}
/// <summary>
/// Relays an operator Discard of a parked cached call to the site that owns it.
/// It follows the same contract as <see cref="HandleRetrySiteCall"/>. The
/// central mirror row is never written. A
/// <see cref="DiscardParkedOperation"/> is wrapped in a
/// <see cref="SiteEnvelope"/> for <c>SourceSite</c> and Asked of the
/// registered <c>CentralCommunicationActor</c>. A missing transport or any Ask
/// failure is reported as <see cref="SiteCallRelayOutcome.SiteUnreachable"/>.
/// </summary>
private void HandleDiscardSiteCall(DiscardSiteCallRequest request)
{
    var replyTo = Sender;
    var transport = _centralCommunication;
    if (transport is null)
    {
        _logger.LogWarning(
            "DiscardSiteCall {TrackedOperationId} for site {SourceSite} arrived before the "
            + "central→site transport was registered; reporting site unreachable",
            request.TrackedOperationId, request.SourceSite);
        replyTo.Tell(UnreachableDiscard(request.CorrelationId));
        return;
    }

    var correlationId = request.CorrelationId;
    var sourceSite = request.SourceSite;
    var discard = new DiscardParkedOperation(
        correlationId, new TrackedOperationId(request.TrackedOperationId));

    transport
        .Ask<ParkedOperationActionAck>(new SiteEnvelope(sourceSite, discard), _options.RelayTimeout)
        .PipeTo(
            replyTo,
            success: ack => MapDiscardResponse(correlationId, ack),
            failure: ex => MapDiscardFailure(correlationId, sourceSite, ex));
}
/// <summary>
/// Converts the site's <see cref="ParkedOperationActionAck"/> for a Retry into a
/// <see cref="RetrySiteCallResponse"/>. The ack is classified by
/// <see cref="ClassifyAck"/>. <c>Success</c> is true only for
/// <see cref="SiteCallRelayOutcome.Applied"/>. <c>SiteReachable</c> is always
/// true, because receiving an ack proves the site was reached. The error text
/// comes from <see cref="AckErrorMessage"/>.
/// </summary>
private static RetrySiteCallResponse MapRetryResponse(string correlationId, ParkedOperationActionAck ack)
{
    var classified = ClassifyAck(ack);
    var applied = classified == SiteCallRelayOutcome.Applied;
    var detail = AckErrorMessage(classified, ack);

    return new RetrySiteCallResponse(
        correlationId,
        classified,
        Success: applied,
        SiteReachable: true,
        ErrorMessage: detail);
}
/// <summary>
/// Converts the site's <see cref="ParkedOperationActionAck"/> for a Discard
/// into a <see cref="DiscardSiteCallResponse"/>. The rules are the same as for
/// Retry. Only <see cref="SiteCallRelayOutcome.Applied"/> counts as success,
/// and the site is always reported reachable because it produced an ack.
/// </summary>
private static DiscardSiteCallResponse MapDiscardResponse(string correlationId, ParkedOperationActionAck ack)
{
    var classified = ClassifyAck(ack);
    var applied = classified == SiteCallRelayOutcome.Applied;
    var detail = AckErrorMessage(classified, ack);

    return new DiscardSiteCallResponse(
        correlationId,
        classified,
        Success: applied,
        SiteReachable: true,
        ErrorMessage: detail);
}
/// <summary>
/// Handles a Retry relay whose Ask did not complete, for any exception
/// including a timeout. It logs a warning with the exception and answers with
/// the standard "site unreachable" Retry response.
/// </summary>
private RetrySiteCallResponse MapRetryFailure(string correlationId, string sourceSite, Exception ex)
{
    const string template =
        "Retry relay to site {SourceSite} did not complete; reporting site unreachable";
    _logger.LogWarning(ex, template, sourceSite);
    return UnreachableRetry(correlationId);
}
/// <summary>
/// Handles a Discard relay whose Ask did not complete, for any exception
/// including a timeout. It logs a warning with the exception and answers with
/// the standard "site unreachable" Discard response.
/// </summary>
private DiscardSiteCallResponse MapDiscardFailure(string correlationId, string sourceSite, Exception ex)
{
    const string template =
        "Discard relay to site {SourceSite} did not complete; reporting site unreachable";
    _logger.LogWarning(ex, template, sourceSite);
    return UnreachableDiscard(correlationId);
}
/// <summary>
/// Decides what a site ack means:
/// <list type="bullet">
/// <item><c>Applied=true</c> means <see cref="SiteCallRelayOutcome.Applied"/>.</item>
/// <item><c>Applied=false</c> with a null <c>ErrorMessage</c> means
/// <see cref="SiteCallRelayOutcome.NotParked"/>: the site had nothing parked.</item>
/// <item><c>Applied=false</c> with any non-null <c>ErrorMessage</c> means
/// <see cref="SiteCallRelayOutcome.OperationFailed"/>.</item>
/// </list>
/// </summary>
private static SiteCallRelayOutcome ClassifyAck(ParkedOperationActionAck ack)
{
    if (!ack.Applied)
    {
        return ack.ErrorMessage is { }
            ? SiteCallRelayOutcome.OperationFailed
            : SiteCallRelayOutcome.NotParked;
    }

    return SiteCallRelayOutcome.Applied;
}
/// <summary>
/// Produces the operator-facing detail text for an outcome classified from a
/// site ack:
/// <list type="bullet">
/// <item><see cref="SiteCallRelayOutcome.Applied"/> gives <c>null</c>.</item>
/// <item><see cref="SiteCallRelayOutcome.NotParked"/> gives a fixed explanation.</item>
/// <item><see cref="SiteCallRelayOutcome.OperationFailed"/> gives the site's own
/// <c>ErrorMessage</c>.</item>
/// </list>
/// <see cref="ClassifyAck"/> never yields
/// <see cref="SiteCallRelayOutcome.SiteUnreachable"/>, because unreachable
/// responses are built without an ack. That case is still handled by passing
/// <c>ack.ErrorMessage</c> through rather than throwing, so a future refactor
/// cannot turn it into an exception on the reply path.
/// </summary>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="outcome"/> is not a defined <see cref="SiteCallRelayOutcome"/> value.
/// </exception>
private static string? AckErrorMessage(SiteCallRelayOutcome outcome, ParkedOperationActionAck ack)
{
    switch (outcome)
    {
        case SiteCallRelayOutcome.Applied:
            return null;

        case SiteCallRelayOutcome.NotParked:
            return "The operation is no longer parked at the site (already delivered, discarded, or retrying).";

        case SiteCallRelayOutcome.OperationFailed:
        case SiteCallRelayOutcome.SiteUnreachable: // defensive: not produced from an ack today
            return ack.ErrorMessage;

        default:
            throw new ArgumentOutOfRangeException(
                nameof(outcome), outcome, "unknown SiteCallRelayOutcome");
    }
}
/// <summary>
/// Detail text returned to the operator whenever a Retry or Discard relay
/// cannot reach the owning site.
/// </summary>
private const string SiteUnreachableMessage =
    "The owning site is unreachable; the action was not applied. "
    + "Retry when the site is back online.";
/// <summary>
/// Builds the Retry response for a site that could not be reached. It carries
/// <see cref="SiteCallRelayOutcome.SiteUnreachable"/> and is neither
/// successful nor reachable.
/// </summary>
private static RetrySiteCallResponse UnreachableRetry(string correlationId) =>
    new(
        correlationId,
        SiteCallRelayOutcome.SiteUnreachable,
        Success: false,
        SiteReachable: false,
        ErrorMessage: SiteUnreachableMessage);
/// <summary>
/// Builds the Discard response for a site that could not be reached. It
/// carries <see cref="SiteCallRelayOutcome.SiteUnreachable"/> and is neither
/// successful nor reachable.
/// </summary>
private static DiscardSiteCallResponse UnreachableDiscard(string correlationId) =>
    new(
        correlationId,
        SiteCallRelayOutcome.SiteUnreachable,
        Success: false,
        SiteReachable: false,
        ErrorMessage: SiteUnreachableMessage);
/// <summary>
/// Resolves an <see cref="ISiteCallAuditRepository"/> for one read message.
/// In test mode the injected instance is returned with a <c>null</c> scope. In
/// production a fresh DI scope is created and returned, and the caller must
/// dispose it once the read completes. This is the same scope-per-message
/// pattern as <see cref="OnUpsertAsync"/>.
/// </summary>
/// <returns>
/// The scope the caller owns (or <c>null</c>) and the repository resolved from it.
/// </returns>
/// <exception cref="InvalidOperationException">
/// The actor has neither an injected repository nor a service provider, or the
/// repository is not registered in the container.
/// </exception>
private (IServiceScope? Scope, ISiteCallAuditRepository Repository) ResolveRepository()
{
    if (_injectedRepository is not null)
    {
        return (null, _injectedRepository);
    }

    var provider = _serviceProvider
        ?? throw new InvalidOperationException(
            "SiteCallAuditActor has neither an injected repository nor a service provider.");

    var scope = provider.CreateScope();
    try
    {
        return (scope, scope.ServiceProvider.GetRequiredService<ISiteCallAuditRepository>());
    }
    catch
    {
        // If resolution fails the caller never receives the scope and so can
        // never dispose it. Release it here before propagating the error.
        scope.Dispose();
        throw;
    }
}
/// <summary>
/// Reports whether a cached call is stuck. A call is stuck when it is still
/// non-terminal (<see cref="SiteCall.TerminalAtUtc"/> is <c>null</c>) and its
/// <c>CreatedAtUtc</c> is strictly earlier than <paramref name="stuckCutoff"/>.
/// </summary>
/// <remarks>
/// Non-terminal is keyed off <c>TerminalAtUtc</c> rather than a status string.
/// The <c>SiteCalls</c> mirror stores <c>AuditStatus</c>-derived statuses
/// (<c>Attempted</c>/<c>Delivered</c>/<c>Parked</c>/...), not the tracking
/// lifecycle names (<c>Pending</c>/<c>Retrying</c>) used in the spec's KPI
/// section, so no status string means "buffered". <c>TerminalAtUtc</c> is the
/// entity's own active/terminal discriminator, and it is the same signal the
/// repository KPI counts and <c>PurgeTerminalAsync</c> rely on.
/// </remarks>
private static bool IsStuck(SiteCall row, DateTime stuckCutoff)
{
    if (row.TerminalAtUtc is not null)
    {
        // Terminal rows are finished and can never be stuck.
        return false;
    }

    return row.CreatedAtUtc < stuckCutoff;
}
/// <summary>
/// Projects a <see cref="SiteCall"/> row onto the list-view
/// <see cref="SiteCallSummary"/> DTO. The <c>IsStuck</c> flag is computed
/// against <paramref name="stuckCutoff"/> via <see cref="IsStuck"/>.
/// </summary>
private static SiteCallSummary ToSummary(SiteCall row, DateTime stuckCutoff) =>
    new(
        TrackedOperationId: row.TrackedOperationId.Value,
        SourceSite: row.SourceSite,
        Channel: row.Channel,
        Target: row.Target,
        Status: row.Status,
        RetryCount: row.RetryCount,
        LastError: row.LastError,
        HttpStatus: row.HttpStatus,
        CreatedAtUtc: row.CreatedAtUtc,
        UpdatedAtUtc: row.UpdatedAtUtc,
        TerminalAtUtc: row.TerminalAtUtc,
        IsStuck: IsStuck(row, stuckCutoff),
        SourceNode: row.SourceNode);
/// <summary>
/// Projects a <see cref="SiteCall"/> row onto the detail-view
/// <see cref="SiteCallDetail"/> DTO. Unlike the summary, the detail includes
/// <c>IngestedAtUtc</c> and carries no stuck flag.
/// </summary>
private static SiteCallDetail ToDetail(SiteCall row) =>
    new(
        TrackedOperationId: row.TrackedOperationId.Value,
        SourceSite: row.SourceSite,
        Channel: row.Channel,
        Target: row.Target,
        Status: row.Status,
        RetryCount: row.RetryCount,
        LastError: row.LastError,
        HttpStatus: row.HttpStatus,
        CreatedAtUtc: row.CreatedAtUtc,
        UpdatedAtUtc: row.UpdatedAtUtc,
        TerminalAtUtc: row.TerminalAtUtc,
        IngestedAtUtc: row.IngestedAtUtc,
        SourceNode: row.SourceNode);
/// <summary>
/// Normalises a UI filter string. <c>null</c>, empty or whitespace-only input
/// becomes <c>null</c>, which <see cref="SiteCallQueryFilter"/> treats as "no
/// constraint". Any other value is returned unchanged and is not trimmed.
/// </summary>
private static string? NullIfBlank(string? value)
{
    if (string.IsNullOrWhiteSpace(value))
    {
        return null;
    }

    return value;
}
/// <summary>
/// Self-tick message that starts a reconciliation pass across all sites
/// (Piece A). It carries no data, so the single shared <see cref="Instance"/>
/// is used and the private constructor prevents other instances.
/// </summary>
internal sealed class ReconciliationTick
{
    private ReconciliationTick()
    {
    }

    public static readonly ReconciliationTick Instance = new ReconciliationTick();
}
/// <summary>
/// Self-tick message that starts a terminal-row purge pass (Piece B). It
/// carries no data, so the single shared <see cref="Instance"/> is used and the
/// private constructor prevents other instances.
/// </summary>
internal sealed class PurgeTick
{
    private PurgeTick()
    {
    }

    public static readonly PurgeTick Instance = new PurgeTick();
}
}
/// <summary>
/// Registers the central→site command transport (the <c>CentralCommunicationActor</c>)
/// with the <see cref="SiteCallAuditActor"/> so it can relay Retry/Discard
/// actions on parked cached calls to their owning sites. Sent by the Host after
/// both actors exist. Lives here (not in Commons) because it carries an
/// <see cref="IActorRef"/> and <c>ZB.MOM.WW.ScadaBridge.Commons</c> has no Akka reference —
/// the same rationale as <c>RegisterAuditIngest</c>.
/// </summary>
/// <param name="CentralCommunication">
/// The actor that the audit actor Asks with <see cref="SiteEnvelope"/>-wrapped
/// Retry/Discard commands for routing to the addressed site. Until this message
/// arrives, the audit actor answers every relay request as site-unreachable.
/// </param>
public sealed record RegisterCentralCommunication(IActorRef CentralCommunication);