using Akka.Actor;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.ScadaBridge.AuditLog.Central;
using ZB.MOM.WW.ScadaBridge.Commons.Entities.Audit;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces.Repositories;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.Audit;
using ZB.MOM.WW.ScadaBridge.Commons.Messages.RemoteQuery;
using ZB.MOM.WW.ScadaBridge.Commons.Types;
using ZB.MOM.WW.ScadaBridge.Commons.Types.Audit;
using ZB.MOM.WW.ScadaBridge.Communication;

namespace ZB.MOM.WW.ScadaBridge.SiteCallAudit;

/// <summary>
/// Central singleton for Site Call Audit (#22). Receives
/// <see cref="UpsertSiteCallCommand"/> messages and persists each <c>SiteCall</c>
/// row via <c>ISiteCallAuditRepository.UpsertAsync</c> — an idempotent,
/// monotonic upsert. Out-of-order or duplicate updates are silent no-ops at the
/// repository layer; the actor still replies with an
/// <see cref="UpsertSiteCallReply"/> carrying <c>Accepted=true</c> in that case,
/// because storage state is consistent and the site is free to consider its
/// packet acked.
/// </summary>
/// <remarks>
/// <para>
/// Implemented: direct telemetry ingest; query, detail and KPI handlers
/// (Task 4); the central→site Retry/Discard relay (Task 5 — the relay handlers
/// live in this actor); the periodic per-site reconciliation pull that backfills
/// lost telemetry (Piece A — <c>OnReconciliationTickAsync</c>, the documented
/// self-heal pull); and the periodic terminal-row purge (Piece B —
/// <c>OnPurgeTickAsync</c>, which calls
/// <c>ISiteCallAuditRepository.PurgeTerminalAsync</c> on a timer). Both
/// background timers are started in <c>PreStart</c> and are gated on the
/// reconciliation collaborators (<see cref="IPullSiteCallsClient"/> and
/// <see cref="ISiteEnumerator"/>) being available — the repo-only test ctor
/// injects neither, so neither timer runs there.
/// </para>
/// <para>
/// Per CLAUDE.md, "audit-write failure NEVER aborts the user-facing action":
/// the ingest handler catches exceptions from the repository call and replies
/// <c>Accepted=false</c> without rethrowing, so the central singleton stays
/// alive. The <c>SupervisorStrategy</c> override governs this actor's
/// <em>children</em>, not the actor itself. The reconciliation pull and the
/// purge run as timers on this actor rather than as child actors, so it has no
/// children today and the override is currently inert. It returns a one-for-one
/// strategy using Akka's default decider (Restart on most exceptions, Stop on
/// <see cref="ActorInitializationException"/> /
/// <see cref="ActorKilledException"/>) with <c>maxNrOfRetries: 0</c>, so any
/// future child that throws is Stopped on its first failure — a deliberate
/// "fail loudly" posture. Supervision of this actor itself is whatever its
/// parent supplies; the in-handler try/catch blocks are what actually keep the
/// singleton alive across repository faults. That matters because a restart
/// begins with a fresh instance whose in-memory state (reconciliation cursors,
/// the registered central→site transport) is empty.
/// </para>
/// <para>
/// Three constructors exist. The production ctor (Bundle F wiring) resolves the
/// scoped EF repository from a fresh DI scope per message, for the same reason
/// as <c>AuditLogIngestActor</c>: the actor is a long-lived cluster singleton and
/// cannot hold a scope across messages. Two test ctors inject a concrete
/// <see cref="ISiteCallAuditRepository"/> (e.g. against a per-test MSSQL
/// fixture, so the real monotonic upsert SQL runs end to end). One of them also
/// injects the reconciliation collaborators, so the background ticks can be
/// exercised without a DI container.
/// </para>
/// </remarks>
public class SiteCallAuditActor : ReceiveActor
{
    /// <summary>Maximum page size honoured by a <see cref="SiteCallQueryRequest"/>.</summary>
    private const int MaxPageSize = 200;

    // Set only by the production ctor; null when a test ctor injected the repository.
    private readonly IServiceProvider? _serviceProvider;

    // Set only by the test ctors. When non-null, the ingest and tick handlers use
    // it directly instead of opening a DI scope.
    private readonly ISiteCallAuditRepository? _injectedRepository;

    private readonly SiteCallAuditOptions _options;
    private readonly ILogger _logger;

    /// <summary>
    /// Reconciliation collaborators (Piece A): the per-site self-heal pull
    /// (<see cref="IPullSiteCallsClient"/>) and the site list
    /// (<see cref="ISiteEnumerator"/>). On the production path they are resolved
    /// once from the root <see cref="IServiceProvider"/> (central singletons
    /// registered by <c>AddAuditLogCentralReconciliationClient</c>) via
    /// <c>GetService</c>, so they are null if that registration is missing. In
    /// the reconciliation test path they are injected directly. They are always
    /// null when the actor was built via the repo-only test ctor. Whenever either
    /// is null, neither the reconciliation tick nor the purge tick is started
    /// (see <c>StartReconciliationTimer</c> and <c>StartPurgeTimer</c>).
    /// </summary>
    private readonly IPullSiteCallsClient? _pullClient;
    private readonly ISiteEnumerator? _siteEnumerator;

    /// <summary>
    /// Per-site reconciliation watermark — the highest
    /// <c>UpdatedAtUtc</c> seen for that site on a previous
    /// tick.
/// The following tick requests rows at or after this cursor, and the
    /// idempotent monotonic upsert absorbs any rows that repeat the cursor's
    /// timestamp. The map lives only in memory for the singleton's lifetime: a
    /// failover or restart resets every cursor to <see cref="DateTime.MinValue"/>,
    /// which is conservative but correct (the next tick re-pulls and the upsert
    /// dedupes). Mirrors <c>SiteAuditReconciliationActor</c>.
    /// </summary>
    private readonly Dictionary<string, DateTime> _reconciliationCursors = new();

    /// <summary>Repeating reconciliation tick; stays null when the tick is gated off.</summary>
    private ICancelable? _reconciliationTimer;

    /// <summary>Repeating purge tick; stays null when the tick is gated off.</summary>
    private ICancelable? _purgeTimer;

    /// <summary>
    /// Task 5 (#22): the central→site command transport — the
    /// <c>CentralCommunicationActor</c>, which owns the per-site
    /// <c>ClusterClient</c> map and routes a <see cref="SiteEnvelope"/> to the
    /// owning site. It is assigned by the Host's registration message once both
    /// actors exist (this actor is a cluster singleton; the transport actor is
    /// created separately). Until then it is null, and any relay is answered
    /// with a <see cref="SiteCallRelayOutcome.SiteUnreachable"/> outcome, because
    /// there is genuinely no route to any site yet.
    /// </summary>
    private IActorRef? _centralCommunication;

    /// <summary>
    /// Repo-only test constructor. The given repository outlives the test, so the
    /// actor reuses that one instance for every message (Bundle C's MSSQL-backed
    /// TestKit fixture). Passing <paramref name="options"/> lets a test pin the
    /// stuck/KPI windows; otherwise the production defaults apply.
    /// <para>
    /// No reconciliation client or enumerator is supplied here, so neither the
    /// reconciliation tick nor the purge tick is started. This keeps the
    /// MSSQL-backed read/upsert tests free of phantom pulls.
    /// </para>
    /// </summary>
    /// <param name="repository">Repository instance used for every message.</param>
    /// <param name="logger">Logger for diagnostics and error reporting.</param>
    /// <param name="options">Optional configuration overrides; defaults are used when null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="repository"/> or <paramref name="logger"/> is null.</exception>
    public SiteCallAuditActor(
        ISiteCallAuditRepository repository,
        ILogger logger,
        SiteCallAuditOptions? options = null)
    {
        _injectedRepository = repository ?? throw new ArgumentNullException(nameof(repository));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _options = options ?? new SiteCallAuditOptions();

        RegisterHandlers();
    }

    /// <summary>
    /// Reconciliation test constructor (Piece A). It injects a concrete repository
    /// plus both reconciliation collaborators, so the per-site self-heal pull can
    /// be unit-tested in memory without a DI container or a live gRPC channel.
    /// Because both collaborators are present, the reconciliation tick and the
    /// purge tick are both started.
    /// </summary>
    /// <param name="repository">Repository instance used for upserts and purges.</param>
    /// <param name="siteEnumerator">Supplies the sites to reconcile on each tick.</param>
    /// <param name="pullClient">Fetches changed rows from each site.</param>
    /// <param name="logger">Logger for diagnostics and error reporting.</param>
    /// <param name="options">Optional configuration overrides; defaults are used when null.</param>
    /// <exception cref="ArgumentNullException">Any non-optional argument is null.</exception>
    /// <remarks>
    /// This ctor is public rather than internal because Akka's default
    /// <c>ActivatorProducer</c> creates actors via reflection with public-only
    /// binding flags; an internal ctor fails with a
    /// <see cref="MissingMethodException"/> at actor creation. It is told apart
    /// from the production ctor by its concrete-collaborator parameter list. Only
    /// the test project (or a host that hand-resolves the collaborators) uses it.
    /// </remarks>
    public SiteCallAuditActor(
        ISiteCallAuditRepository repository,
        ISiteEnumerator siteEnumerator,
        IPullSiteCallsClient pullClient,
        ILogger logger,
        SiteCallAuditOptions? options = null)
    {
        _injectedRepository = repository ?? throw new ArgumentNullException(nameof(repository));
        _siteEnumerator = siteEnumerator ?? throw new ArgumentNullException(nameof(siteEnumerator));
        _pullClient = pullClient ?? throw new ArgumentNullException(nameof(pullClient));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _options = options ?? new SiteCallAuditOptions();

        RegisterHandlers();
    }

    /// <summary>
    /// Production constructor — resolves <see cref="ISiteCallAuditRepository"/>
    /// from a fresh DI scope per message because the repository is a scoped EF
    /// Core service registered by <c>AddConfigurationDatabase</c>.
/// The actor itself is a long-lived cluster singleton, so it cannot hold a scope
    /// across messages.
    /// </summary>
    /// <param name="serviceProvider">DI service provider. It is used to create a scoped repository per
    /// message and, once at construction, to look up the optional reconciliation collaborators.</param>
    /// <param name="options">Actor configuration (stuck threshold, KPI interval, relay timeout).</param>
    /// <param name="logger">Logger for diagnostics and error reporting.</param>
    /// <exception cref="ArgumentNullException">Any argument is null.</exception>
    public SiteCallAuditActor(
        IServiceProvider serviceProvider,
        SiteCallAuditOptions options,
        ILogger logger)
    {
        ArgumentNullException.ThrowIfNull(serviceProvider);
        ArgumentNullException.ThrowIfNull(options);
        ArgumentNullException.ThrowIfNull(logger);

        _serviceProvider = serviceProvider;
        _options = options;
        _logger = logger;

        // Reconciliation collaborators (Piece A) are central-only singletons
        // registered by AddAuditLogCentralReconciliationClient — always on the
        // central composition root (Program.cs). Resolve them once here (the
        // actor itself is a long-lived singleton; the repository is the only
        // scoped service and is still resolved per-tick/per-message). GetService
        // (not GetRequiredService) so a host that somehow omits the helper
        // degrades to "no reconciliation tick" rather than a startup crash —
        // the tick startup gates on both being non-null.
        _pullClient = serviceProvider.GetService();
        _siteEnumerator = serviceProvider.GetService();

        RegisterHandlers();
    }

    /// <summary>
    /// Wires up the message handlers shared by every constructor:
    /// <list type="bullet">
    /// <item><description>the M3 ingest path;</description></item>
    /// <item><description>the Task 4 read side (query, detail, global and per-site KPI);</description></item>
    /// <item><description>the Task 5 transport registration plus the Retry/Discard relay;</description></item>
    /// <item><description>the Piece A/B self-ticks.</description></item>
    /// </list>
    /// The read and relay handlers answer an Ask, so they capture <c>Sender</c>
    /// before any async work starts and <c>PipeTo</c> the result back.
    /// </summary>
    private void RegisterHandlers()
    {
        // M3 ingest path.
        ReceiveAsync(OnUpsertAsync);

        // Task 4 read side.
        Receive(HandleQuery);
        Receive(HandleDetail);
        Receive(HandleKpi);
        Receive(HandlePerSiteKpi);

        // Task 5 (#22): central→site Retry/Discard relay for parked cached calls.
        // The transport ref is held only in this instance's field. A new actor
        // instance starts without one until another registration message arrives,
        // and relays are reported as site-unreachable in the meantime.
        Receive(msg =>
        {
            _centralCommunication = msg.CentralCommunication;
            _logger.LogInformation("SiteCallAudit registered central→site communication transport");
        });
        Receive(HandleRetrySiteCall);
        Receive(HandleDiscardSiteCall);

        // Piece A/B (#22): self-ticks for the periodic reconciliation pull and
        // the daily terminal-row purge. Handlers stay alive across faults via
        // their own per-site / per-tick try/catch (mirroring the ingest path);
        // the timers are only started when their collaborators are available.
        ReceiveAsync(_ => OnReconciliationTickAsync());
        ReceiveAsync(_ => OnPurgeTickAsync());
    }

    /// <inheritdoc />
    protected override void PreStart()
    {
        base.PreStart();
        // Each starter returns without scheduling when the reconciliation
        // collaborators are missing (repo-only test ctor).
        StartReconciliationTimer();
        StartPurgeTimer();
    }

    /// <inheritdoc />
    protected override void PostStop()
    {
        // Null-conditional: the timers are never created when gated off.
        _reconciliationTimer?.Cancel();
        _purgeTimer?.Cancel();
        base.PostStop();
    }

    /// <summary>
    /// Starts the periodic reconciliation tick, but ONLY when both the pull
    /// client and the site enumerator are available. The repo-only test ctor
    /// injects neither, so the tick is gated off there (the MSSQL read/upsert
    /// tests must not fire phantom pulls). The reconciliation test ctor and the
    /// production ctor (which resolves both from the service provider) start it.
    /// The first tick fires one full interval after start, not immediately.
    /// </summary>
    private void StartReconciliationTimer()
    {
        if (_pullClient is null || _siteEnumerator is null)
        {
            return;
        }

        var interval = _options.ResolvedReconciliationInterval;
        _reconciliationTimer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(
            initialDelay: interval,
            interval: interval,
            receiver: Self,
            message: ReconciliationTick.Instance,
            sender: Self);
    }

    /// <summary>
    /// Starts the periodic terminal-row purge tick (interval from
    /// <c>ResolvedPurgeInterval</c>). It is gated on the same collaborator
    /// presence as the reconciliation tick. The purge itself only needs the
    /// repository, but gating both schedulers together keeps the repo-only test
    /// ctor (no client/enumerator) free of BOTH background timers, so the MSSQL
    /// read/upsert tests see no scheduled side effects.
/// </summary>
    private void StartPurgeTimer()
    {
        if (_pullClient is null || _siteEnumerator is null)
        {
            return;
        }

        var interval = _options.ResolvedPurgeInterval;
        _purgeTimer = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(
            initialDelay: interval,
            interval: interval,
            receiver: Self,
            message: PurgeTick.Instance,
            sender: Self);
    }

    /// <inheritdoc />
    /// <remarks>
    /// One-for-one with Akka's default decider and <c>maxNrOfRetries: 0</c>: a
    /// child the decider would restart is stopped on its first failure instead.
    /// This actor creates no children today, so the strategy is currently inert.
    /// </remarks>
    protected override SupervisorStrategy SupervisorStrategy()
    {
        return new OneForOneStrategy(
            maxNrOfRetries: 0,
            withinTimeRange: TimeSpan.Zero,
            decider: Akka.Actor.SupervisorStrategy.DefaultDecider);
    }

    /// <summary>
    /// Ingests one <see cref="UpsertSiteCallCommand"/>. It stamps
    /// <c>IngestedAtUtc</c>, runs the idempotent monotonic upsert, and replies
    /// to the original sender with an <see cref="UpsertSiteCallReply"/>.
    /// </summary>
    /// <remarks>
    /// Every fault is logged and answered with <c>Accepted=false</c>, including a
    /// failure to create the DI scope or resolve the scoped repository. Nothing is
    /// rethrown. An escaping exception would fault the central singleton. If the
    /// parent then restarts it, the fresh instance has no registered central→site
    /// transport (every relay would report unreachable until re-registration) and
    /// empty reconciliation cursors.
    /// </remarks>
    private async Task OnUpsertAsync(UpsertSiteCallCommand cmd)
    {
        // Sender is captured before the first await — Akka resets Sender
        // between message dispatches, so a post-await Tell would go to
        // DeadLetters.
        var replyTo = Sender;
        var id = cmd.SiteCall.TrackedOperationId;

        // Scope-per-message mirrors AuditLogIngestActor — production EF
        // repository is scoped; the injected-repository mode (tests) skips
        // the scope entirely.
        IServiceScope? scope = null;
        try
        {
            ISiteCallAuditRepository repository;
            if (_injectedRepository is not null)
            {
                repository = _injectedRepository;
            }
            else
            {
                // Resolved INSIDE the try: a DI/scope failure is an audit-write
                // failure like any other and must be answered with Accepted=false.
                scope = _serviceProvider!.CreateScope();
                repository = scope.ServiceProvider.GetRequiredService<ISiteCallAuditRepository>();
            }

            // SiteCallAudit-003: stamp IngestedAtUtc at central-side persist
            // time on every upsert, mirroring AuditLogIngestActor's combined-
            // telemetry hot path. IngestedAtUtc is the "central ingested (or
            // last refreshed) this row" timestamp; callers (telemetry,
            // reconciliation puller, direct-writes) cannot in general know
            // they are running on central, so the actor owns the stamp.
            var siteCall = cmd.SiteCall with { IngestedAtUtc = DateTime.UtcNow };
            await repository.UpsertAsync(siteCall).ConfigureAwait(false);
            replyTo.Tell(new UpsertSiteCallReply(id, Accepted: true));
        }
        catch (Exception ex)
        {
            // Per CLAUDE.md: audit-write failure NEVER aborts the user-facing
            // action — log and reply Accepted=false; do NOT rethrow (the
            // central singleton MUST stay alive).
            _logger.LogError(ex, "SiteCallAudit upsert failed for {TrackedOperationId}", id);
            replyTo.Tell(new UpsertSiteCallReply(id, Accepted: false));
        }
        finally
        {
            scope?.Dispose();
        }
    }

    // ── Piece A: periodic per-site reconciliation pull (self-heal) ──

    /// <summary>
    /// One reconciliation pass: enumerate every known site and, per site, pull
    /// changed rows since that site's cursor and upsert them idempotently. This is
    /// the documented self-heal when best-effort gRPC push telemetry is lost. It
    /// is a mirror, NOT a dispatcher: cached-call delivery stays site-local, and
    /// upserting reconciled rows only refreshes the eventually-consistent central
    /// <c>SiteCalls</c> mirror.
    /// </summary>
    /// <remarks>
    /// Mirrors <c>SiteAuditReconciliationActor</c>'s structure: per-site cursor,
    /// per-site try/catch failure isolation, and advancing the cursor by the max
    /// observed <c>UpdatedAtUtc</c>. It is deliberately simpler, with no
    /// stalled-detection EventStream machinery — just cursor + pull + upsert +
    /// advance. One DI scope is opened per tick and the same repository is reused
    /// across every site in that tick. Failures to enumerate sites, or to open or
    /// dispose that scope, are logged and end the tick early rather than being
    /// rethrown; the next tick retries.
    /// </remarks>
    private async Task OnReconciliationTickAsync()
    {
        // The collaborators are guaranteed non-null: the tick is only scheduled
        // when both are present (StartReconciliationTimer). Assert via the
        // local copies so a future refactor that drops the gate fails loudly.
        var enumerator = _siteEnumerator!;
        var client = _pullClient!;

        IReadOnlyList<SiteEntry> sites;
        try
        {
            // No ambient CancellationToken in a ReceiveActor handler — None is
            // intentional; the work is bounded by the reconciliation interval
            // plus the singleton's graceful-stop drain on PhaseClusterLeave.
            sites = await enumerator.EnumerateAsync().ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "SiteCallAudit site enumeration failed; skipping reconciliation tick.");
            return;
        }

        if (sites.Count == 0)
        {
            return;
        }

        // AuditLog-003: open the scope INLINE with CreateAsyncScope + await using
        // so the scoped EF Core repository (an IAsyncDisposable DbContext) disposes
        // asynchronously at end of tick rather than blocking the Akka dispatcher
        // thread on a synchronous Dispose() of pending connection cleanup — the tick
        // holds the scope across many awaited UpsertAsync calls. Mirrors the sibling
        // SiteAuditReconciliationActor.OnTickAsync. ResolveRepository() (sync Dispose)
        // is retained for the synchronous message-handler paths. In the injected-
        // repository test path there is no scope to open and the test repo is reused.
        if (_injectedRepository is not null)
        {
            await ReconcileSitesAsync(sites, client, _injectedRepository).ConfigureAwait(false);
            return;
        }

        try
        {
            await using var scope = _serviceProvider!.CreateAsyncScope();
            var repository = scope.ServiceProvider.GetRequiredService<ISiteCallAuditRepository>();
            await ReconcileSitesAsync(sites, client, repository).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            // Per-site faults are already isolated inside ReconcileSitesAsync;
            // this only sees scope creation / repository resolution / scope
            // disposal failures. Escaping here would fault the singleton and
            // discard every site's cursor, so log and let the next tick retry.
            _logger.LogError(
                ex,
                "SiteCallAudit reconciliation tick failed to open or dispose its repository scope; skipping reconciliation tick.");
        }
    }

    /// <summary>
    /// Reconciles every site in the tick against a single resolved repository.
    /// Per-site faults are isolated so one bad site never sinks the rest of the
    /// pass; the failing site's cursor is left at its previous value so the next
    /// tick retries the same window.
    /// </summary>
    private async Task ReconcileSitesAsync(
        IReadOnlyList<SiteEntry> sites,
        IPullSiteCallsClient client,
        ISiteCallAuditRepository repository)
    {
        foreach (var site in sites)
        {
            try
            {
                await ReconcileSiteAsync(site, client, repository).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                // Failure-isolation invariant: one site's fault (transport,
                // repository write) must NOT sink the rest of the tick. The
                // failing site's cursor is left at its previous value so the
                // next tick retries the same window.
                _logger.LogWarning(
                    ex,
                    "SiteCallAudit reconciliation pull failed for site {SiteId}; other sites continue.",
                    site.SiteId);
            }
        }
    }

    /// <summary>
    /// Issues one <c>PullSiteCalls</c> RPC against the site, upserts the returned
    /// rows idempotently, and advances the site's cursor to the maximum
    /// <c>UpdatedAtUtc</c> observed. The pull client returns rows oldest-first
    /// with <c>SourceSite</c> already re-stamped from the dialed site id. The
    /// actor therefore upserts them verbatim, only re-stamping
    /// <c>IngestedAtUtc</c> at central persist time, as the telemetry path does.
    /// </summary>
    /// <remarks>
    /// <para>
    /// <b>Coarse per-site retry</b> — a deliberate divergence from
    /// <c>SiteAuditReconciliationActor</c>. That sibling (AuditLog-004) tracks a
    /// per-EventId attempt counter and permanently abandons a row after a
    /// threshold, so a single un-insertable row cannot block a site's cursor
    /// forever. This actor deliberately does NOT. Any throw inside the loop
    /// propagates to <c>ReconcileSitesAsync</c>'s per-site catch, which leaves
    /// the site's cursor at its previous value, so the next tick re-pulls the
    /// whole batch from <c>since</c>. A persistently-bad row therefore holds the
    /// site's cursor and causes the batch to be re-pulled every tick. This is
    /// acceptable because the upsert is monotonic and idempotent — re-pulling
    /// already-ingested rows is a cheap no-op — and because the <c>SiteCalls</c>
    /// table is an eventually-consistent mirror, not the source of truth. A slow
    /// site simply lags rather than corrupting data.
    /// </para>
    /// <para>
    /// <b>Inclusive cursor boundary.</b> The cursor is advanced to the maximum
    /// <c>UpdatedAtUtc</c> seen, and the pull asks for rows at or after it
    /// (<c>since</c> is <c>&gt;=</c>, not <c>&gt;</c>). The row whose timestamp
    /// equals the cursor is therefore re-pulled on the next tick and deduplicated
    /// by the idempotent monotonic upsert. This is the same inclusive-boundary
    /// contract as <c>SiteAuditReconciliationActor</c>'s cursor.
    /// </para>
    /// <para>
    /// NOTE(review): because the boundary is inclusive and the cursor only moves
    /// on a strictly greater <c>UpdatedAtUtc</c>, a site with more than
    /// <c>ReconciliationBatchSize</c> rows sharing the cursor timestamp would
    /// return the same batch forever. Confirm the site-side pull's ordering and
    /// paging rule this out.
    /// </para>
    /// </remarks>
    private async Task ReconcileSiteAsync(
        SiteEntry site,
        IPullSiteCallsClient client,
        ISiteCallAuditRepository repository)
    {
        var since = _reconciliationCursors.TryGetValue(site.SiteId, out var c) ? c : DateTime.MinValue;

        var response = await client
            .PullAsync(site.SiteId, since, _options.ReconciliationBatchSize, CancellationToken.None)
            .ConfigureAwait(false);

        var maxUpdated = since;
        var nowUtc = DateTime.UtcNow;
        foreach (var row in response.SiteCalls)
        {
            // IngestedAtUtc is the "central ingested (or last refreshed) this
            // row" stamp — owned by the central actor, exactly as OnUpsertAsync
            // does for the telemetry path. Monotonic UpsertAsync makes a row
            // already present (from a prior push) a silent no-op.
            var siteCall = row with { IngestedAtUtc = nowUtc };
            await repository.UpsertAsync(siteCall).ConfigureAwait(false);

            if (row.UpdatedAtUtc > maxUpdated)
            {
                maxUpdated = row.UpdatedAtUtc;
            }
        }

        // Advance the cursor to the newest row seen. A MoreAvailable response
        // means the site saturated the batch; the next tick continues draining
        // from the advanced cursor (no immediate re-pull loop — the natural
        // tick cadence drains the backlog, matching SiteAuditReconciliationActor).
        _reconciliationCursors[site.SiteId] = maxUpdated;
    }

    // ── Piece B: periodic terminal-row purge scheduler ──

    /// <summary>
    /// One purge pass: drops terminal <c>SiteCalls</c> rows older than
    /// <c>UtcNow - RetentionDays</c> via
    /// <c>ISiteCallAuditRepository.PurgeTerminalAsync</c>. Per the repository
    /// contract, non-terminal rows are never purged. The threshold is computed
    /// each tick, so an operator who lowers <c>RetentionDays</c> sees it applied
    /// on the next purge without an actor restart.
    /// </summary>
    /// <remarks>
    /// Mirrors <c>AuditLogPurgeActor</c>'s cadence and continue-on-error posture.
    /// Every fault is logged and swallowed so the singleton stays alive. This
    /// includes a threshold that cannot be represented (an out-of-range
    /// <c>RetentionDays</c>) and a failure to open or dispose the repository scope.
    /// </remarks>
    private async Task OnPurgeTickAsync()
    {
        try
        {
            // Inside the try: TimeSpan.FromDays / DateTime subtraction throw for
            // out-of-range retention values, and that must not fault the actor.
            var threshold = DateTime.UtcNow - TimeSpan.FromDays(_options.RetentionDays);

            // AuditLog-003: open the scope INLINE with CreateAsyncScope + await using
            // so the scoped EF Core repository (an IAsyncDisposable DbContext) disposes
            // asynchronously rather than blocking the Akka dispatcher thread on a
            // synchronous Dispose(). Mirrors SiteAuditReconciliationActor; the
            // injected-repository test path reuses the test repo with no scope.
            if (_injectedRepository is not null)
            {
                await PurgeWithRepositoryAsync(_injectedRepository, threshold).ConfigureAwait(false);
                return;
            }

            await using var scope = _serviceProvider!.CreateAsyncScope();
            var repository = scope.ServiceProvider.GetRequiredService<ISiteCallAuditRepository>();
            await PurgeWithRepositoryAsync(repository, threshold).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            // PurgeWithRepositoryAsync already swallows purge faults; this only
            // sees threshold computation and scope open/dispose failures.
            _logger.LogError(
                ex,
                "SiteCallAudit purge tick failed before or after the purge (threshold or repository scope); will retry next tick.");
        }
    }

    /// <summary>
    /// Runs one terminal-row purge against the resolved repository. Any fault is
    /// logged and swallowed (continue-on-error), so a transient SQL failure or
    /// contention never crashes the central singleton; the next tick retries the
    /// same window.
    /// </summary>
    private async Task PurgeWithRepositoryAsync(ISiteCallAuditRepository repository, DateTime threshold)
    {
        try
        {
            var rowsDeleted = await repository.PurgeTerminalAsync(threshold).ConfigureAwait(false);
            if (rowsDeleted > 0)
            {
                _logger.LogInformation(
                    "SiteCallAudit purged {RowsDeleted} terminal SiteCalls rows older than {ThresholdUtc:o}.",
                    rowsDeleted,
                    threshold);
            }
        }
        catch (Exception ex)
        {
            // Continue-on-error: a purge fault (transient SQL failure,
            // contention) must NOT crash the central singleton. The next tick
            // retries the same window.
            _logger.LogError(
                ex,
                "SiteCallAudit terminal-row purge failed (threshold {ThresholdUtc:o}); will retry next tick.",
                threshold);
        }
    }

    // ── Task 4: read-side (query / detail / KPI) ──

    /// <summary>
    /// Handles a paginated, filtered query over the <c>SiteCalls</c> table.
    /// Builds a <see cref="SiteCallQueryFilter"/> + <see cref="SiteCallPaging"/>
    /// keyset cursor from the request, runs the query on a scoped repository,
    /// and pipes the mapped response back to the captured sender. A repository
    /// fault yields a failure response with an empty list.
/// </summary>
    /// <remarks>
    /// The current time is captured here, on the actor thread, and passed into
    /// <c>QueryAsync</c>, so the stuck cutoff is derived from the moment the
    /// message was handled.
    /// </remarks>
    private void HandleQuery(SiteCallQueryRequest request)
    {
        var sender = Sender;
        var now = DateTime.UtcNow;
        QueryAsync(request, now).PipeTo(
            sender,
            success: response => response,
            failure: ex => new SiteCallQueryResponse(
                request.CorrelationId,
                Success: false,
                ErrorMessage: ex.GetBaseException().Message,
                SiteCalls: Array.Empty(),
                NextAfterCreatedAtUtc: null,
                NextAfterId: null));
    }

    /// <summary>
    /// Builds the filter and keyset paging from <paramref name="request"/>, runs
    /// the query on a repository obtained from <c>ResolveRepository</c>, and maps
    /// each row to a summary using the stuck cutoff derived from
    /// <paramref name="now"/>.
    /// </summary>
    /// <remarks>
    /// The page size is clamped to <c>[1, MaxPageSize]</c>, and blank text
    /// filters are normalised to null via <c>NullIfBlank</c>. The next-page cursor
    /// is the last row of any non-empty page, including a short final page. A
    /// client paging to the end therefore makes one extra request that returns
    /// an empty page with null cursor fields.
    /// </remarks>
    private async Task QueryAsync(SiteCallQueryRequest request, DateTime now)
    {
        var stuckCutoff = now - _options.StuckAgeThreshold;

        var filter = new SiteCallQueryFilter(
            Channel: NullIfBlank(request.ChannelFilter),
            SourceSite: NullIfBlank(request.SourceSiteFilter),
            Status: NullIfBlank(request.StatusFilter),
            Target: NullIfBlank(request.TargetKeyword),
            FromUtc: request.FromUtc,
            ToUtc: request.ToUtc,
            // StuckOnly is pushed into the repository SQL via StuckCutoffUtc —
            // TerminalAtUtc IS NULL AND CreatedAtUtc < cutoff composes with the
            // keyset cursor, so the page is always honest (full pages, no empty
            // pages with a non-null next cursor).
            StuckCutoffUtc: request.StuckOnly ? stuckCutoff : null,
            SourceNode: NullIfBlank(request.SourceNodeFilter));

        var pageSize = Math.Clamp(request.PageSize, 1, MaxPageSize);
        var paging = new SiteCallPaging(
            PageSize: pageSize,
            AfterCreatedAtUtc: request.AfterCreatedAtUtc,
            AfterId: request.AfterId is { } id ? new TrackedOperationId(id) : null);

        var (scope, repository) = ResolveRepository();
        try
        {
            var rows = await repository.QueryAsync(filter, paging).ConfigureAwait(false);

            var summaries = rows
                .Select(row => ToSummary(row, stuckCutoff))
                .ToList();

            // The next-page cursor is the last row of the materialised page.
            var cursorRow = rows.Count > 0 ? rows[^1] : null;

            return new SiteCallQueryResponse(
                request.CorrelationId,
                Success: true,
                ErrorMessage: null,
                SiteCalls: summaries,
                NextAfterCreatedAtUtc: cursorRow?.CreatedAtUtc,
                NextAfterId: cursorRow?.TrackedOperationId.Value);
        }
        finally
        {
            // Null in the injected-repository path (no scope was opened).
            scope?.Dispose();
        }
    }

    /// <summary>
    /// Handles a full-detail query for a single cached call; it backs the report
    /// detail modal. A missing row yields <c>Success=false</c> with a "not found"
    /// message, and a repository fault yields <c>Success=false</c> with the
    /// fault's base message.
    /// </summary>
    private void HandleDetail(SiteCallDetailRequest request)
    {
        var sender = Sender;
        DetailAsync(request).PipeTo(
            sender,
            success: response => response,
            failure: ex => new SiteCallDetailResponse(
                request.CorrelationId,
                Success: false,
                ErrorMessage: ex.GetBaseException().Message,
                Detail: null));
    }

    /// <summary>
    /// Looks up one row by tracked-operation id and maps it to a detail response
    /// via <c>ToDetail</c>. A missing row is reported as <c>Success=false</c>
    /// rather than as a fault.
    /// </summary>
    private async Task DetailAsync(SiteCallDetailRequest request)
    {
        var (scope, repository) = ResolveRepository();
        try
        {
            var row = await repository
                .GetAsync(new TrackedOperationId(request.TrackedOperationId))
                .ConfigureAwait(false);

            if (row is null)
            {
                return new SiteCallDetailResponse(
                    request.CorrelationId,
                    Success: false,
                    ErrorMessage: "site call not found",
                    Detail: null);
            }

            return new SiteCallDetailResponse(
                request.CorrelationId,
                Success: true,
                ErrorMessage: null,
                Detail: ToDetail(row));
        }
        finally
        {
            scope?.Dispose();
        }
    }

    /// <summary>
    /// Handles a global KPI snapshot request, deriving the stuck cutoff from
    /// <c>SiteCallAuditOptions.StuckAgeThreshold</c> and the
    /// failed/delivered interval bound from <c>SiteCallAuditOptions.KpiInterval</c>.
/// </summary>
    /// <remarks>
    /// A repository fault is answered with <c>Success=false</c>, the fault's base
    /// message and zeroed counters, rather than leaving the Ask to time out.
    /// </remarks>
    private void HandleKpi(SiteCallKpiRequest request)
    {
        // Capture the asker now; PipeTo delivers the eventual reply to this ref.
        var replyTo = Sender;
        var utcNow = DateTime.UtcNow;

        var snapshotTask = KpiAsync(
            request.CorrelationId,
            stuckCutoff: utcNow - _options.StuckAgeThreshold,
            intervalSince: utcNow - _options.KpiInterval);

        snapshotTask.PipeTo(
            replyTo,
            success: kpi => kpi,
            failure: error => new SiteCallKpiResponse(
                request.CorrelationId,
                Success: false,
                ErrorMessage: error.GetBaseException().Message,
                BufferedCount: 0,
                ParkedCount: 0,
                FailedLastInterval: 0,
                DeliveredLastInterval: 0,
                OldestPendingAge: null,
                StuckCount: 0));
    }

    /// <summary>
    /// Computes the global KPI snapshot on a repository from
    /// <c>ResolveRepository</c> and copies each counter into a successful
    /// <see cref="SiteCallKpiResponse"/>.
    /// </summary>
    /// <param name="correlationId">Echoed back so the asker can match the reply.</param>
    /// <param name="stuckCutoff">Pending rows created before this instant count as stuck.</param>
    /// <param name="intervalSince">Start of the failed/delivered counting window.</param>
    private async Task<SiteCallKpiResponse> KpiAsync(
        string correlationId,
        DateTime stuckCutoff,
        DateTime intervalSince)
    {
        var (scope, repository) = ResolveRepository();
        try
        {
            var kpis = await repository
                .ComputeKpisAsync(stuckCutoff, intervalSince)
                .ConfigureAwait(false);

            return new SiteCallKpiResponse(
                correlationId,
                Success: true,
                ErrorMessage: null,
                BufferedCount: kpis.BufferedCount,
                ParkedCount: kpis.ParkedCount,
                FailedLastInterval: kpis.FailedLastInterval,
                DeliveredLastInterval: kpis.DeliveredLastInterval,
                OldestPendingAge: kpis.OldestPendingAge,
                StuckCount: kpis.StuckCount);
        }
        finally
        {
            // No scope exists when a repository was injected.
            scope?.Dispose();
        }
    }

    /// <summary>
    /// Handles a per-source-site KPI request, using the same stuck cutoff and
    /// interval bound as <see cref="HandleKpi"/>.
/// </summary>
    private void HandlePerSiteKpi(PerSiteSiteCallKpiRequest request)
    {
        var sender = Sender;
        var now = DateTime.UtcNow;
        var stuckCutoff = now - _options.StuckAgeThreshold;
        var intervalSince = now - _options.KpiInterval;

        // A repository fault becomes Success=false with an empty site list.
        PerSiteKpiAsync(request.CorrelationId, stuckCutoff, intervalSince).PipeTo(
            sender,
            success: response => response,
            failure: ex => new PerSiteSiteCallKpiResponse(
                request.CorrelationId,
                Success: false,
                ErrorMessage: ex.GetBaseException().Message,
                Sites: Array.Empty()));
    }

    /// <summary>
    /// Computes per-source-site KPIs on a repository from
    /// <c>ResolveRepository</c> and wraps them in a successful response.
    /// </summary>
    private async Task PerSiteKpiAsync(
        string correlationId,
        DateTime stuckCutoff,
        DateTime intervalSince)
    {
        var (scope, repository) = ResolveRepository();
        try
        {
            var sites = await repository
                .ComputePerSiteKpisAsync(stuckCutoff, intervalSince)
                .ConfigureAwait(false);

            return new PerSiteSiteCallKpiResponse(
                correlationId,
                Success: true,
                ErrorMessage: null,
                sites);
        }
        finally
        {
            scope?.Dispose();
        }
    }

    // ── Task 5: central→site Retry/Discard relay ──

    /// <summary>
    /// Relays an operator Retry of a parked cached call to its owning site. The
    /// site is the source of truth, so this handler NEVER writes the central
    /// <c>SiteCalls</c> mirror row. It wraps a <see cref="RetryParkedOperation"/>
    /// in a <see cref="SiteEnvelope"/> addressed to <c>SourceSite</c>, Asks the
    /// <c>CentralCommunicationActor</c> (which routes it over the per-site
    /// <c>ClusterClient</c>), and maps the site's
    /// <see cref="ParkedOperationActionAck"/> — or an Ask timeout — onto a
    /// <see cref="RetrySiteCallResponse"/>. A timeout or missing route is reported
    /// as the distinct <see cref="SiteCallRelayOutcome.SiteUnreachable"/> outcome,
    /// not a generic failure, so the Central UI can tell "site offline" from
    /// "operation failed".
    /// </summary>
    /// <remarks>
    /// NOTE(review): the failure mapper is used for every Ask failure, not only
    /// timeouts. A fault reply or an unexpected reply type is therefore also
    /// reported as <c>SiteUnreachable</c>; the cause is logged in
    /// <c>MapRetryFailure</c>.
    /// </remarks>
    private void HandleRetrySiteCall(RetrySiteCallRequest request)
    {
        var sender = Sender;

        if (_centralCommunication is null)
        {
            // No transport registered yet — there is genuinely no route to any
            // site, so the only honest answer is unreachable.
            _logger.LogWarning(
                "RetrySiteCall {TrackedOperationId} for site {SourceSite} arrived before the " +
                "central→site transport was registered; reporting site unreachable",
                request.TrackedOperationId,
                request.SourceSite);
            sender.Tell(UnreachableRetry(request.CorrelationId));
            return;
        }

        var relay = new RetryParkedOperation(
            request.CorrelationId,
            new TrackedOperationId(request.TrackedOperationId));
        var envelope = new SiteEnvelope(request.SourceSite, relay);

        // The Ask is bounded by RelayTimeout; expiry surfaces through the
        // failure mapper as site-unreachable.
        _centralCommunication.Ask(envelope, _options.RelayTimeout)
            .PipeTo(
                sender,
                success: ack => MapRetryResponse(request.CorrelationId, ack),
                failure: ex => MapRetryFailure(request.CorrelationId, request.SourceSite, ex));
    }

    /// <summary>
    /// Relays an operator Discard of a parked cached call to its owning site.
    /// Mirrors <see cref="HandleRetrySiteCall"/> — see that method for the
    /// source-of-truth and site-unreachable rationale — but sends a
    /// <see cref="DiscardParkedOperation"/> and answers with a
    /// <see cref="DiscardSiteCallResponse"/>.
    /// </summary>
    private void HandleDiscardSiteCall(DiscardSiteCallRequest request)
    {
        var sender = Sender;

        if (_centralCommunication is null)
        {
            _logger.LogWarning(
                "DiscardSiteCall {TrackedOperationId} for site {SourceSite} arrived before the " +
                "central→site transport was registered; reporting site unreachable",
                request.TrackedOperationId,
                request.SourceSite);
            sender.Tell(UnreachableDiscard(request.CorrelationId));
            return;
        }

        var relay = new DiscardParkedOperation(
            request.CorrelationId,
            new TrackedOperationId(request.TrackedOperationId));
        var envelope = new SiteEnvelope(request.SourceSite, relay);

        _centralCommunication.Ask(envelope, _options.RelayTimeout)
            .PipeTo(
                sender,
                success: ack => MapDiscardResponse(request.CorrelationId, ack),
                failure: ex => MapDiscardFailure(request.CorrelationId, request.SourceSite, ex));
    }

    /// <summary>
    /// Maps the site's <see cref="ParkedOperationActionAck"/> for a Retry onto a
    /// <see cref="RetrySiteCallResponse"/>: an applied action is
    /// <see cref="SiteCallRelayOutcome.Applied"/>; a clean no-op
    /// (<c>Applied=false</c>, no error) is <see cref="SiteCallRelayOutcome.NotParked"/>;
    /// an ack carrying an error is
    /// <see cref="SiteCallRelayOutcome.OperationFailed"/> — in every case the site WAS reached.
/// </summary>
    private static RetrySiteCallResponse MapRetryResponse(string correlationId, ParkedOperationActionAck ack)
    {
        SiteCallRelayOutcome classified = ClassifyAck(ack);
        bool applied = classified == SiteCallRelayOutcome.Applied;

        return new RetrySiteCallResponse(
            correlationId,
            classified,
            Success: applied,
            SiteReachable: true,
            ErrorMessage: AckErrorMessage(classified, ack));
    }

    /// <summary>
    /// Maps the site's ack for a Discard onto a <see cref="DiscardSiteCallResponse"/>.
    /// It uses the same classification as the Retry mapping, and the site is always
    /// marked reachable.
    /// </summary>
    private static DiscardSiteCallResponse MapDiscardResponse(string correlationId, ParkedOperationActionAck ack)
    {
        SiteCallRelayOutcome classified = ClassifyAck(ack);
        bool applied = classified == SiteCallRelayOutcome.Applied;

        return new DiscardSiteCallResponse(
            correlationId,
            classified,
            Success: applied,
            SiteReachable: true,
            ErrorMessage: AckErrorMessage(classified, ack));
    }

    /// <summary>
    /// Logs a failed Retry Ask (timeout, no route, or other fault) and reports
    /// the site as unreachable.
    /// </summary>
    private RetrySiteCallResponse MapRetryFailure(string correlationId, string sourceSite, Exception ex)
    {
        _logger.LogWarning(
            ex,
            "Retry relay to site {SourceSite} did not complete; reporting site unreachable",
            sourceSite);
        return UnreachableRetry(correlationId);
    }

    /// <summary>
    /// Logs a failed Discard Ask (timeout, no route, or other fault) and reports
    /// the site as unreachable.
    /// </summary>
    private DiscardSiteCallResponse MapDiscardFailure(string correlationId, string sourceSite, Exception ex)
    {
        _logger.LogWarning(
            ex,
            "Discard relay to site {SourceSite} did not complete; reporting site unreachable",
            sourceSite);
        return UnreachableDiscard(correlationId);
    }

    /// <summary>
    /// Classifies a site ack:
    /// <list type="bullet">
    /// <item>Applied=true means the action was applied.</item>
    /// <item>Applied=false with no error means the site had nothing parked.</item>
    /// <item>Applied=false with an error means the site could not apply the action.</item>
    /// </list>
    /// </summary>
    private static SiteCallRelayOutcome ClassifyAck(ParkedOperationActionAck ack)
    {
        if (!ack.Applied)
        {
            return ack.ErrorMessage is not null
                ? SiteCallRelayOutcome.OperationFailed
                : SiteCallRelayOutcome.NotParked;
        }

        return SiteCallRelayOutcome.Applied;
    }

    /// <summary>
    /// Chooses the operator-facing error text for a classified ack:
    /// <list type="bullet">
    /// <item>Applied: none.</item>
    /// <item>NotParked: a fixed explanation.</item>
    /// <item>OperationFailed: the site's own message.</item>
    /// </list>
    /// </summary>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="outcome"/> is not a recognised <see cref="SiteCallRelayOutcome"/> value.
    /// </exception>
    private static string? AckErrorMessage(SiteCallRelayOutcome outcome, ParkedOperationActionAck ack)
    {
        switch (outcome)
        {
            case SiteCallRelayOutcome.Applied:
                return null;

            case SiteCallRelayOutcome.NotParked:
                return "The operation is no longer parked at the site (already delivered, discarded, or retrying).";

            case SiteCallRelayOutcome.OperationFailed:
                return ack.ErrorMessage;

            case SiteCallRelayOutcome.SiteUnreachable:
                // ClassifyAck never yields SiteUnreachable. Unreachable replies are
                // built by UnreachableRetry/UnreachableDiscard without any ack, so
                // this case is purely defensive. It echoes the ack's text instead of
                // throwing so this mapper stays total and side-effect-free.
                return ack.ErrorMessage;

            default:
                throw new ArgumentOutOfRangeException(
                    nameof(outcome), outcome, "unknown SiteCallRelayOutcome");
        }
    }

    /// <summary>Shared "site unreachable" detail text for both relay directions.</summary>
    private const string SiteUnreachableMessage =
        "The owning site is unreachable; the action was not applied. Retry when the site is back online.";

    /// <summary>Builds the canonical "site unreachable" reply for a Retry.</summary>
    private static RetrySiteCallResponse UnreachableRetry(string correlationId) =>
        new RetrySiteCallResponse(
            correlationId,
            SiteCallRelayOutcome.SiteUnreachable,
            Success: false,
            SiteReachable: false,
            ErrorMessage: SiteUnreachableMessage);

    /// <summary>Builds the canonical "site unreachable" reply for a Discard.</summary>
    private static DiscardSiteCallResponse UnreachableDiscard(string correlationId) =>
        new DiscardSiteCallResponse(
            correlationId,
            SiteCallRelayOutcome.SiteUnreachable,
            Success: false,
            SiteReachable: false,
            ErrorMessage: SiteUnreachableMessage);

    /// <summary>
    /// Resolves an <see cref="ISiteCallAuditRepository"/> for one read message.
/// In test mode the injected instance is returned with a null scope. In
    /// production a fresh DI scope is created and returned, and the caller must
    /// dispose it once the read completes. This scope-per-message pattern exists
    /// because the actor is a long-lived singleton while the repository is scoped.
    /// </summary>
    /// <returns>
    /// The scope to dispose (null in test mode) and the repository to use.
    /// </returns>
    /// <exception cref="InvalidOperationException">
    /// Neither an injected repository nor a service provider is available.
    /// </exception>
    private (IServiceScope? Scope, ISiteCallAuditRepository Repository) ResolveRepository()
    {
        if (_injectedRepository is not null)
        {
            return (null, _injectedRepository);
        }

        if (_serviceProvider is null)
        {
            throw new InvalidOperationException(
                "SiteCallAuditActor has neither an injected repository nor a service provider.");
        }

        var scope = _serviceProvider.CreateScope();
        try
        {
            return (scope, scope.ServiceProvider.GetRequiredService<ISiteCallAuditRepository>());
        }
        catch
        {
            // The caller only receives the scope on success. If resolution throws
            // (missing registration, failing DbContext construction, ...), the scope
            // and anything it already resolved would leak, so dispose it here
            // before propagating.
            scope.Dispose();
            throw;
        }
    }

    /// <summary>
    /// A cached call counts as stuck when it is still non-terminal and was
    /// created before <paramref name="stuckCutoff"/>. Non-terminal means
    /// <c>TerminalAtUtc</c> is null. The SiteCalls operational mirror stores
    /// AuditStatus-derived status strings (Attempted/Delivered/Parked/...), not
    /// the tracking-lifecycle Pending/Retrying names used in the spec's KPI
    /// section, so no status string means "buffered". TerminalAtUtc is the
    /// entity's own active/terminal discriminator. It is consistent with the
    /// repository KPI counts and PurgeTerminalAsync.
/// </summary>
    private static bool IsStuck(SiteCall row, DateTime stuckCutoff) =>
        row.TerminalAtUtc is null && row.CreatedAtUtc < stuckCutoff;

    /// <summary>
    /// Projects a <see cref="SiteCall"/> row onto the list-view
    /// <see cref="SiteCallSummary"/> shape. IsStuck is evaluated against
    /// <paramref name="stuckCutoff"/>.
    /// </summary>
    private static SiteCallSummary ToSummary(SiteCall row, DateTime stuckCutoff) =>
        new(
            TrackedOperationId: row.TrackedOperationId.Value,
            SourceSite: row.SourceSite,
            Channel: row.Channel,
            Target: row.Target,
            Status: row.Status,
            RetryCount: row.RetryCount,
            LastError: row.LastError,
            HttpStatus: row.HttpStatus,
            CreatedAtUtc: row.CreatedAtUtc,
            UpdatedAtUtc: row.UpdatedAtUtc,
            TerminalAtUtc: row.TerminalAtUtc,
            IsStuck: IsStuck(row, stuckCutoff),
            SourceNode: row.SourceNode);

    /// <summary>
    /// Projects a <see cref="SiteCall"/> row onto the <see cref="SiteCallDetail"/>
    /// shape. It carries the central ingest timestamp in addition to the summary fields.
    /// </summary>
    private static SiteCallDetail ToDetail(SiteCall row) =>
        new(
            TrackedOperationId: row.TrackedOperationId.Value,
            SourceSite: row.SourceSite,
            Channel: row.Channel,
            Target: row.Target,
            Status: row.Status,
            RetryCount: row.RetryCount,
            LastError: row.LastError,
            HttpStatus: row.HttpStatus,
            CreatedAtUtc: row.CreatedAtUtc,
            UpdatedAtUtc: row.UpdatedAtUtc,
            TerminalAtUtc: row.TerminalAtUtc,
            IngestedAtUtc: row.IngestedAtUtc,
            SourceNode: row.SourceNode);

    /// <summary>
    /// Collapses an empty or whitespace filter string to null. The repository's
    /// query filter treats null as "no constraint", so a blank UI filter must not
    /// reach it as a literal empty string.
    /// </summary>
    private static string? NullIfBlank(string? value)
    {
        if (string.IsNullOrWhiteSpace(value))
        {
            return null;
        }

        return value;
    }

    /// <summary>Self-tick triggering a reconciliation pass across all sites (Piece A).</summary>
    internal sealed class ReconciliationTick
    {
        /// <summary>The single shared tick message.</summary>
        public static readonly ReconciliationTick Instance = new();

        private ReconciliationTick()
        {
        }
    }

    /// <summary>Self-tick triggering a terminal-row purge pass (Piece B).</summary>
    internal sealed class PurgeTick
    {
        /// <summary>The single shared tick message.</summary>
        public static readonly PurgeTick Instance = new();

        private PurgeTick()
        {
        }
    }
}

/// <summary>
/// Registers the central→site command transport (the CentralCommunicationActor)
/// with the <see cref="SiteCallAuditActor"/> so it can relay Retry/Discard
/// actions on parked cached calls to their owning sites.
/// Sent by the Host after both actors exist. It lives here (not in Commons)
/// because it carries an <see cref="IActorRef"/>, and
/// ZB.MOM.WW.ScadaBridge.Commons has no Akka reference. RegisterAuditIngest
/// lives outside Commons for the same reason.
/// </summary>
/// <param name="CentralCommunication">
/// The CentralCommunicationActor reference to register as the central→site transport.
/// </param>
public sealed record RegisterCentralCommunication(IActorRef CentralCommunication);