using Akka.Actor;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.Audit;
using ScadaLink.Commons.Messages.RemoteQuery;
using ScadaLink.Commons.Types;
using ScadaLink.Commons.Types.Audit;
using ScadaLink.Communication;

namespace ScadaLink.SiteCallAudit;

/// <summary>
/// Central singleton for Site Call Audit (#22). Receives
/// <see cref="UpsertSiteCallCommand"/> messages and persists each
/// <see cref="SiteCall"/> row via the repository's <c>UpsertAsync</c> — an
/// idempotent monotonic upsert. Out-of-order or duplicate updates are silent
/// no-ops at the repository layer; the actor always replies with
/// <see cref="UpsertSiteCallReply"/> <c>Accepted=true</c> in that case because
/// storage state is consistent and the site is free to consider its packet acked.
/// </summary>
/// <remarks>
/// <para>
/// Query, detail and KPIs (Task 4) and the central→site Retry/Discard relay
/// (Task 5 — the relay handlers live in this actor) are implemented; only
/// reconciliation remains deferred (per CLAUDE.md scope discipline — it lands
/// in a later follow-up).
/// </para>
/// <para>
/// Per CLAUDE.md "audit-write failure NEVER aborts the user-facing action" —
/// the upsert handler catches every exception, including a failure to resolve
/// the scoped repository, logs it and replies <c>Accepted=false</c> without
/// rethrowing, so the central singleton stays alive. The read and relay
/// handlers map a faulted task onto a failure reply via <c>PipeTo</c>. The
/// overridden <c>SupervisorStrategy()</c> only governs children of this actor,
/// not the actor itself.
/// </para>
/// <para>
/// Two constructors exist for the same reason as <c>AuditLogIngestActor</c>:
/// production wiring (Bundle F) resolves the scoped EF repository from a fresh
/// DI scope per message because the actor is a long-lived cluster singleton,
/// while tests inject a concrete repository against a per-test MSSQL fixture
/// so the actor exercises the real monotonic upsert SQL end to end.
/// </para>
/// <para>
/// Upserts are handled with <c>ReceiveAsync</c>. Read and relay handlers hand
/// their work to <c>PipeTo</c> and return immediately, so a later message can
/// be dispatched while an earlier read is still running. In test mode all of
/// them share the single injected repository instance — NOTE(review): confirm
/// the test repository tolerates concurrent use.
/// </para>
/// </remarks>
public class SiteCallAuditActor : ReceiveActor
{
    /// <summary>Maximum page size honoured by a <see cref="SiteCallQueryRequest"/>.</summary>
    private const int MaxPageSize = 200;

    /// <summary>Shared "site unreachable" detail text for both relay directions.</summary>
    private const string SiteUnreachableMessage =
        "The owning site is unreachable; the action was not applied. Retry when the site is back online.";

    private readonly IServiceProvider? _serviceProvider;
    private readonly ISiteCallAuditRepository? _injectedRepository;
    private readonly SiteCallAuditOptions _options;
    private readonly ILogger _logger;

    /// <summary>
    /// Task 5 (#22): the central→site command transport — the
    /// <c>CentralCommunicationActor</c>, which owns the per-site
    /// <c>ClusterClient</c> map and routes a <see cref="SiteEnvelope"/> to the
    /// owning site. Set via <see cref="RegisterCentralCommunication"/> by the
    /// Host after both actors exist (this actor is a cluster singleton; the
    /// transport actor is created separately). Null until registration
    /// completes — a relay arriving before then is answered with a
    /// <see cref="SiteCallRelayOutcome.SiteUnreachable"/> outcome, because there
    /// is genuinely no route to any site yet.
    /// </summary>
    private IActorRef? _centralCommunication;

    /// <summary>
    /// Test-mode constructor — injects a concrete repository instance whose
    /// lifetime exceeds the test, so the actor reuses the same instance
    /// across every message. Used by Bundle C's MSSQL-backed TestKit fixture.
    /// </summary>
    /// <param name="repository">Repository reused for every message.</param>
    /// <param name="logger">Logger for ingest failures and relay diagnostics.</param>
    /// <param name="options">
    /// Optional options that let a test pin the stuck/KPI windows; when
    /// omitted the production defaults apply.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="repository"/> or <paramref name="logger"/> is null.
    /// </exception>
    public SiteCallAuditActor(
        ISiteCallAuditRepository repository,
        ILogger logger,
        SiteCallAuditOptions? options = null)
    {
        ArgumentNullException.ThrowIfNull(repository);
        ArgumentNullException.ThrowIfNull(logger);

        _injectedRepository = repository;
        _logger = logger;
        _options = options ?? new SiteCallAuditOptions();

        RegisterHandlers();
    }

    /// <summary>
    /// Production constructor — resolves <see cref="ISiteCallAuditRepository"/>
    /// from a fresh DI scope per message because the repository is a scoped EF
    /// Core service registered by <c>AddConfigurationDatabase</c>. The actor
    /// itself is a long-lived cluster singleton, so it cannot hold a scope
    /// across messages.
    /// </summary>
    /// <param name="serviceProvider">Root provider used to create a scope per message.</param>
    /// <param name="options">Stuck/KPI windows and relay timeout.</param>
    /// <param name="logger">Logger for ingest failures and relay diagnostics.</param>
    /// <exception cref="ArgumentNullException">Any argument is null.</exception>
    public SiteCallAuditActor(
        IServiceProvider serviceProvider,
        SiteCallAuditOptions options,
        ILogger logger)
    {
        ArgumentNullException.ThrowIfNull(serviceProvider);
        ArgumentNullException.ThrowIfNull(options);
        ArgumentNullException.ThrowIfNull(logger);

        _serviceProvider = serviceProvider;
        _options = options;
        _logger = logger;

        RegisterHandlers();
    }

    /// <summary>
    /// Wires up the message handlers shared by both constructors: the M3
    /// ingest path, the Task 4 read-side (query, detail, global + per-site
    /// KPI) and the Task 5 Retry/Discard relay. All read and relay handlers
    /// reply to an Ask, so they capture <c>Sender</c> before starting async
    /// work and <c>PipeTo</c> the result back.
    /// </summary>
    private void RegisterHandlers()
    {
        // M3 ingest path.
        ReceiveAsync<UpsertSiteCallCommand>(OnUpsertAsync);

        // Task 4 read-side.
        Receive<SiteCallQueryRequest>(HandleQuery);
        Receive<SiteCallDetailRequest>(HandleDetail);
        Receive<SiteCallKpiRequest>(HandleKpi);
        Receive<PerSiteSiteCallKpiRequest>(HandlePerSiteKpi);

        // Task 5 (#22): central→site Retry/Discard relay for parked cached calls.
        Receive<RegisterCentralCommunication>(msg =>
        {
            _centralCommunication = msg.CentralCommunication;
            _logger.LogInformation("SiteCallAudit registered central→site communication transport");
        });
        Receive<RetrySiteCallRequest>(HandleRetrySiteCall);
        Receive<DiscardSiteCallRequest>(HandleDiscardSiteCall);
    }

    /// <summary>
    /// Supervision policy for any child actors of this actor. Overriding this
    /// method does not change how this actor itself is supervised (that is its
    /// parent's decision). This actor's own "never crash on an audit-write
    /// failure" guarantee comes from its handlers: the upsert handler catches
    /// every exception, and the read/relay handlers map faulted tasks to
    /// failure replies.
    /// </summary>
    /// <remarks>
    /// Ordinary failures resume the child, so its state is kept, in line with
    /// the best-effort audit policy (CLAUDE.md §Audit). The fatal lifecycle
    /// exceptions still stop the child, as Akka's default decider does:
    /// <see cref="ActorInitializationException"/>,
    /// <see cref="ActorKilledException"/> and <see cref="DeathPactException"/>.
    /// The previous policy (zero retries plus the default decider) stopped a
    /// child on its first failure, which contradicted the documented Resume
    /// intent.
    /// </remarks>
    protected override SupervisorStrategy SupervisorStrategy()
    {
        return new OneForOneStrategy(ex => ex switch
        {
            ActorInitializationException or ActorKilledException or DeathPactException => Directive.Stop,
            _ => Directive.Resume,
        });
    }

    /// <summary>
    /// Persists one site-call update and replies with an
    /// <see cref="UpsertSiteCallReply"/>: <c>Accepted=true</c> on success
    /// (including repository-level no-ops), <c>Accepted=false</c> on any
    /// failure — never rethrows.
    /// </summary>
    /// <param name="cmd">The update packet sent by a site.</param>
    private async Task OnUpsertAsync(UpsertSiteCallCommand cmd)
    {
        // Sender is captured before the first await — Akka resets Sender
        // between message dispatches, so a post-await Tell would go to
        // DeadLetters.
        var replyTo = Sender;
        var id = cmd.SiteCall.TrackedOperationId;

        // Scope-per-message mirrors AuditLogIngestActor — production EF
        // repository is scoped; the injected-repository mode (tests) skips
        // the scope entirely. Resolution happens INSIDE the try so that a DI /
        // scope failure still produces an Accepted=false reply instead of
        // escaping the handler and leaving the site's Ask to time out.
        IServiceScope? scope = null;
        try
        {
            ISiteCallAuditRepository repository;
            (scope, repository) = ResolveRepository();

            await repository.UpsertAsync(cmd.SiteCall).ConfigureAwait(false);
            replyTo.Tell(new UpsertSiteCallReply(id, Accepted: true));
        }
        catch (Exception ex)
        {
            // Per CLAUDE.md: audit-write failure NEVER aborts the user-facing
            // action — log and reply Accepted=false; do NOT rethrow (the
            // central singleton MUST stay alive).
            _logger.LogError(ex, "SiteCallAudit upsert failed for {TrackedOperationId}", id);
            replyTo.Tell(new UpsertSiteCallReply(id, Accepted: false));
        }
        finally
        {
            scope?.Dispose();
        }
    }

    // ── Task 4: read-side (query / detail / KPI) ──

    /// <summary>
    /// Handles a paginated, filtered query over the <c>SiteCalls</c> table.
    /// Builds a <see cref="SiteCallQueryFilter"/> + <see cref="SiteCallPaging"/>
    /// keyset cursor from the request, runs the query on a scoped repository,
    /// and pipes the mapped response back to the captured sender. A repository
    /// fault yields a failure response with an empty list.
    /// </summary>
    private void HandleQuery(SiteCallQueryRequest request)
    {
        var sender = Sender;
        var now = DateTime.UtcNow;

        QueryAsync(request, now).PipeTo(
            sender,
            success: response => response,
            failure: ex => new SiteCallQueryResponse(
                request.CorrelationId,
                Success: false,
                ErrorMessage: ex.GetBaseException().Message,
                SiteCalls: Array.Empty<SiteCallSummary>(),
                NextAfterCreatedAtUtc: null,
                NextAfterId: null));
    }

    /// <summary>
    /// Runs one page of the site-call query. The stuck cutoff is derived from
    /// <paramref name="now"/> and <see cref="SiteCallAuditOptions.StuckAgeThreshold"/>;
    /// the page size is clamped to <c>[1, MaxPageSize]</c>.
    /// </summary>
    private async Task<SiteCallQueryResponse> QueryAsync(SiteCallQueryRequest request, DateTime now)
    {
        var stuckCutoff = now - _options.StuckAgeThreshold;

        var filter = new SiteCallQueryFilter(
            Channel: NullIfBlank(request.ChannelFilter),
            SourceSite: NullIfBlank(request.SourceSiteFilter),
            Status: NullIfBlank(request.StatusFilter),
            Target: NullIfBlank(request.TargetKeyword),
            FromUtc: request.FromUtc,
            ToUtc: request.ToUtc,
            // StuckOnly is pushed into the repository SQL via StuckCutoffUtc —
            // TerminalAtUtc IS NULL AND CreatedAtUtc < cutoff composes with the
            // keyset cursor, so the page is always honest (full pages, no empty
            // pages with a non-null next cursor).
            StuckCutoffUtc: request.StuckOnly ? stuckCutoff : null);

        var pageSize = Math.Clamp(request.PageSize, 1, MaxPageSize);
        var paging = new SiteCallPaging(
            PageSize: pageSize,
            AfterCreatedAtUtc: request.AfterCreatedAtUtc,
            AfterId: request.AfterId is { } id ? new TrackedOperationId(id) : null);

        var (scope, repository) = ResolveRepository();
        try
        {
            var rows = await repository.QueryAsync(filter, paging).ConfigureAwait(false);

            var summaries = rows
                .Select(row => ToSummary(row, stuckCutoff))
                .ToList();

            // The next-page cursor is the last row of the materialised page.
            var cursorRow = rows.Count > 0 ? rows[^1] : null;

            return new SiteCallQueryResponse(
                request.CorrelationId,
                Success: true,
                ErrorMessage: null,
                SiteCalls: summaries,
                NextAfterCreatedAtUtc: cursorRow?.CreatedAtUtc,
                NextAfterId: cursorRow?.TrackedOperationId.Value);
        }
        finally
        {
            scope?.Dispose();
        }
    }

    /// <summary>
    /// Handles a full-detail query for a single cached call — backs the report
    /// detail modal. A missing row yields <c>Success=false</c> with a "not
    /// found" message; a repository fault yields <c>Success=false</c> with the
    /// fault message.
    /// </summary>
    private void HandleDetail(SiteCallDetailRequest request)
    {
        var sender = Sender;

        DetailAsync(request).PipeTo(
            sender,
            success: response => response,
            failure: ex => new SiteCallDetailResponse(
                request.CorrelationId,
                Success: false,
                ErrorMessage: ex.GetBaseException().Message,
                Detail: null));
    }

    /// <summary>Loads one row by id and maps it to a detail response.</summary>
    private async Task<SiteCallDetailResponse> DetailAsync(SiteCallDetailRequest request)
    {
        var (scope, repository) = ResolveRepository();
        try
        {
            var row = await repository
                .GetAsync(new TrackedOperationId(request.TrackedOperationId))
                .ConfigureAwait(false);

            if (row is null)
            {
                return new SiteCallDetailResponse(
                    request.CorrelationId,
                    Success: false,
                    ErrorMessage: "site call not found",
                    Detail: null);
            }

            return new SiteCallDetailResponse(
                request.CorrelationId,
                Success: true,
                ErrorMessage: null,
                Detail: ToDetail(row));
        }
        finally
        {
            scope?.Dispose();
        }
    }

    /// <summary>
    /// Handles a global KPI snapshot request, deriving the stuck cutoff from
    /// <see cref="SiteCallAuditOptions.StuckAgeThreshold"/> and the
    /// failed/delivered interval bound from <see cref="SiteCallAuditOptions.KpiInterval"/>.
    /// A repository fault yields <c>Success=false</c> with zeroed counters.
    /// </summary>
    private void HandleKpi(SiteCallKpiRequest request)
    {
        var sender = Sender;
        var now = DateTime.UtcNow;
        var stuckCutoff = now - _options.StuckAgeThreshold;
        var intervalSince = now - _options.KpiInterval;

        KpiAsync(request.CorrelationId, stuckCutoff, intervalSince).PipeTo(
            sender,
            success: response => response,
            failure: ex => new SiteCallKpiResponse(
                request.CorrelationId,
                Success: false,
                ErrorMessage: ex.GetBaseException().Message,
                BufferedCount: 0,
                ParkedCount: 0,
                FailedLastInterval: 0,
                DeliveredLastInterval: 0,
                OldestPendingAge: null,
                StuckCount: 0));
    }

    /// <summary>Computes the global KPI snapshot on a scoped repository.</summary>
    private async Task<SiteCallKpiResponse> KpiAsync(
        string correlationId, DateTime stuckCutoff, DateTime intervalSince)
    {
        var (scope, repository) = ResolveRepository();
        try
        {
            var snapshot = await repository
                .ComputeKpisAsync(stuckCutoff, intervalSince)
                .ConfigureAwait(false);

            return new SiteCallKpiResponse(
                correlationId,
                Success: true,
                ErrorMessage: null,
                snapshot.BufferedCount,
                snapshot.ParkedCount,
                snapshot.FailedLastInterval,
                snapshot.DeliveredLastInterval,
                snapshot.OldestPendingAge,
                snapshot.StuckCount);
        }
        finally
        {
            scope?.Dispose();
        }
    }

    /// <summary>
    /// Handles a per-source-site KPI request, using the same stuck cutoff and
    /// interval bound as <see cref="HandleKpi"/>. A repository fault yields
    /// <c>Success=false</c> with an empty site list.
    /// </summary>
    private void HandlePerSiteKpi(PerSiteSiteCallKpiRequest request)
    {
        var sender = Sender;
        var now = DateTime.UtcNow;
        var stuckCutoff = now - _options.StuckAgeThreshold;
        var intervalSince = now - _options.KpiInterval;

        PerSiteKpiAsync(request.CorrelationId, stuckCutoff, intervalSince).PipeTo(
            sender,
            success: response => response,
            failure: ex => new PerSiteSiteCallKpiResponse(
                request.CorrelationId,
                Success: false,
                ErrorMessage: ex.GetBaseException().Message,
                Sites: []));
    }

    /// <summary>Computes the per-site KPI rows on a scoped repository.</summary>
    private async Task<PerSiteSiteCallKpiResponse> PerSiteKpiAsync(
        string correlationId, DateTime stuckCutoff, DateTime intervalSince)
    {
        var (scope, repository) = ResolveRepository();
        try
        {
            var sites = await repository
                .ComputePerSiteKpisAsync(stuckCutoff, intervalSince)
                .ConfigureAwait(false);

            return new PerSiteSiteCallKpiResponse(
                correlationId,
                Success: true,
                ErrorMessage: null,
                sites);
        }
        finally
        {
            scope?.Dispose();
        }
    }

    // ── Task 5: central→site Retry/Discard relay ──

    /// <summary>
    /// Relays an operator Retry of a parked cached call to its owning site.
    /// </summary>
    /// <remarks>
    /// The site is the source of truth — this handler NEVER writes the central
    /// <c>SiteCalls</c> mirror row. It works as follows:
    /// <list type="bullet">
    /// <item>Wraps a <see cref="RetryParkedOperation"/> in a
    /// <see cref="SiteEnvelope"/> addressed to <c>SourceSite</c>.</item>
    /// <item>Asks the <c>CentralCommunicationActor</c>, which routes it over
    /// the per-site <c>ClusterClient</c>.</item>
    /// <item>Maps the site's <see cref="ParkedOperationActionAck"/> — or an Ask
    /// timeout — onto a <see cref="RetrySiteCallResponse"/>.</item>
    /// </list>
    /// A timeout / no-route is reported as the distinct
    /// <see cref="SiteCallRelayOutcome.SiteUnreachable"/> outcome rather than a
    /// generic failure, so the Central UI can tell "site offline" from
    /// "operation failed".
    /// </remarks>
    private void HandleRetrySiteCall(RetrySiteCallRequest request)
    {
        var sender = Sender;

        if (_centralCommunication is null)
        {
            // No transport registered yet — there is genuinely no route to any
            // site, so the only honest answer is unreachable.
            _logger.LogWarning(
                "RetrySiteCall {TrackedOperationId} for site {SourceSite} arrived before the " +
                "central→site transport was registered; reporting site unreachable",
                request.TrackedOperationId, request.SourceSite);
            sender.Tell(UnreachableRetry(request.CorrelationId));
            return;
        }

        var relay = new RetryParkedOperation(
            request.CorrelationId, new TrackedOperationId(request.TrackedOperationId));
        var envelope = new SiteEnvelope(request.SourceSite, relay);

        _centralCommunication
            .Ask<ParkedOperationActionAck>(envelope, _options.RelayTimeout)
            .PipeTo(
                sender,
                success: ack => MapRetryResponse(request.CorrelationId, ack),
                failure: ex => MapRetryFailure(request.CorrelationId, request.SourceSite, ex));
    }

    /// <summary>
    /// Relays an operator Discard of a parked cached call to its owning site.
    /// Mirrors <see cref="HandleRetrySiteCall"/> — see that method for the
    /// source-of-truth and site-unreachable rationale.
    /// </summary>
    private void HandleDiscardSiteCall(DiscardSiteCallRequest request)
    {
        var sender = Sender;

        if (_centralCommunication is null)
        {
            _logger.LogWarning(
                "DiscardSiteCall {TrackedOperationId} for site {SourceSite} arrived before the " +
                "central→site transport was registered; reporting site unreachable",
                request.TrackedOperationId, request.SourceSite);
            sender.Tell(UnreachableDiscard(request.CorrelationId));
            return;
        }

        var relay = new DiscardParkedOperation(
            request.CorrelationId, new TrackedOperationId(request.TrackedOperationId));
        var envelope = new SiteEnvelope(request.SourceSite, relay);

        _centralCommunication
            .Ask<ParkedOperationActionAck>(envelope, _options.RelayTimeout)
            .PipeTo(
                sender,
                success: ack => MapDiscardResponse(request.CorrelationId, ack),
                failure: ex => MapDiscardFailure(request.CorrelationId, request.SourceSite, ex));
    }

    /// <summary>
    /// Maps the site's <see cref="ParkedOperationActionAck"/> for a Retry onto a
    /// <see cref="RetrySiteCallResponse"/>. In every case the site WAS reached:
    /// <list type="bullet">
    /// <item>An applied action is <see cref="SiteCallRelayOutcome.Applied"/>.</item>
    /// <item>A clean no-op (<c>Applied=false</c>, no error) is
    /// <see cref="SiteCallRelayOutcome.NotParked"/>.</item>
    /// <item>An ack carrying an error is
    /// <see cref="SiteCallRelayOutcome.OperationFailed"/>.</item>
    /// </list>
    /// </summary>
    private static RetrySiteCallResponse MapRetryResponse(string correlationId, ParkedOperationActionAck ack)
    {
        var outcome = ClassifyAck(ack);
        return new RetrySiteCallResponse(
            correlationId,
            outcome,
            Success: outcome == SiteCallRelayOutcome.Applied,
            SiteReachable: true,
            ErrorMessage: AckErrorMessage(outcome, ack));
    }

    /// <summary>Discard counterpart of <see cref="MapRetryResponse"/>.</summary>
    private static DiscardSiteCallResponse MapDiscardResponse(string correlationId, ParkedOperationActionAck ack)
    {
        var outcome = ClassifyAck(ack);
        return new DiscardSiteCallResponse(
            correlationId,
            outcome,
            Success: outcome == SiteCallRelayOutcome.Applied,
            SiteReachable: true,
            ErrorMessage: AckErrorMessage(outcome, ack));
    }

    /// <summary>Logs a failed Retry Ask (timeout / no route) and reports the site unreachable.</summary>
    private RetrySiteCallResponse MapRetryFailure(string correlationId, string sourceSite, Exception ex)
    {
        _logger.LogWarning(ex,
            "Retry relay to site {SourceSite} did not complete; reporting site unreachable", sourceSite);
        return UnreachableRetry(correlationId);
    }

    /// <summary>Logs a failed Discard Ask (timeout / no route) and reports the site unreachable.</summary>
    private DiscardSiteCallResponse MapDiscardFailure(string correlationId, string sourceSite, Exception ex)
    {
        _logger.LogWarning(ex,
            "Discard relay to site {SourceSite} did not complete; reporting site unreachable", sourceSite);
        return UnreachableDiscard(correlationId);
    }

    /// <summary>
    /// Classifies a site ack:
    /// <list type="bullet">
    /// <item><c>Applied=true</c> → applied.</item>
    /// <item><c>Applied=false</c> with no error → the site definitively had
    /// nothing parked.</item>
    /// <item><c>Applied=false</c> with an error → the site could not apply the
    /// action.</item>
    /// </list>
    /// </summary>
    private static SiteCallRelayOutcome ClassifyAck(ParkedOperationActionAck ack)
    {
        if (ack.Applied)
        {
            return SiteCallRelayOutcome.Applied;
        }

        return ack.ErrorMessage is null
            ? SiteCallRelayOutcome.NotParked
            : SiteCallRelayOutcome.OperationFailed;
    }

    /// <summary>Operator-facing error text for an ack-derived outcome (null when applied).</summary>
    private static string? AckErrorMessage(SiteCallRelayOutcome outcome, ParkedOperationActionAck ack)
    {
        return outcome switch
        {
            SiteCallRelayOutcome.Applied => null,
            SiteCallRelayOutcome.NotParked =>
                "The operation is no longer parked at the site (already delivered, discarded, or retrying).",
            SiteCallRelayOutcome.OperationFailed => ack.ErrorMessage,
            // SiteUnreachable is never produced from a ParkedOperationActionAck —
            // unreachable responses are built by UnreachableRetry/UnreachableDiscard
            // before any ack is classified, so this arm is unreachable by construction.
            SiteCallRelayOutcome.SiteUnreachable => ack.ErrorMessage,
            _ => throw new ArgumentOutOfRangeException(
                nameof(outcome), outcome, "unknown SiteCallRelayOutcome"),
        };
    }

    /// <summary>Builds the Retry response for a site that could not be reached.</summary>
    private static RetrySiteCallResponse UnreachableRetry(string correlationId)
    {
        return new RetrySiteCallResponse(
            correlationId,
            SiteCallRelayOutcome.SiteUnreachable,
            Success: false,
            SiteReachable: false,
            ErrorMessage: SiteUnreachableMessage);
    }

    /// <summary>Builds the Discard response for a site that could not be reached.</summary>
    private static DiscardSiteCallResponse UnreachableDiscard(string correlationId)
    {
        return new DiscardSiteCallResponse(
            correlationId,
            SiteCallRelayOutcome.SiteUnreachable,
            Success: false,
            SiteReachable: false,
            ErrorMessage: SiteUnreachableMessage);
    }

    /// <summary>
    /// Resolves an <see cref="ISiteCallAuditRepository"/> for one message.
    /// In test mode the injected instance is returned with a null scope; in
    /// production a fresh DI scope is created and returned so the caller can
    /// dispose it once the work completes — the scope-per-message pattern
    /// shared by the upsert and read handlers.
    /// </summary>
    /// <returns>The scope to dispose (null in test mode) and the repository.</returns>
    private (IServiceScope? Scope, ISiteCallAuditRepository Repository) ResolveRepository()
    {
        if (_injectedRepository is not null)
        {
            return (null, _injectedRepository);
        }

        var scope = _serviceProvider!.CreateScope();
        try
        {
            return (scope, scope.ServiceProvider.GetRequiredService<ISiteCallAuditRepository>());
        }
        catch
        {
            // Don't leak the scope when resolution fails — the caller never sees it.
            scope.Dispose();
            throw;
        }
    }

    /// <summary>
    /// A cached call counts as stuck when it is still non-terminal and was
    /// created before <paramref name="stuckCutoff"/>. Non-terminal is keyed off
    /// <see cref="SiteCall.TerminalAtUtc"/> being null.
    /// </summary>
    /// <remarks>
    /// The status string cannot be used here. The <c>SiteCalls</c> operational
    /// mirror stores AuditStatus-derived status strings
    /// (<c>Attempted</c>/<c>Delivered</c>/<c>Parked</c>/...), not the
    /// tracking-lifecycle <c>Pending</c>/<c>Retrying</c> names used by the
    /// spec's KPI section, so no status string means "buffered".
    /// <c>TerminalAtUtc</c> is the entity's own active/terminal discriminator
    /// and is consistent with the repository KPI counts and
    /// <c>PurgeTerminalAsync</c>.
    /// </remarks>
    private static bool IsStuck(SiteCall row, DateTime stuckCutoff)
    {
        return row.TerminalAtUtc is null && row.CreatedAtUtc < stuckCutoff;
    }

    /// <summary>Maps a row to the list-view summary, flagging it if stuck.</summary>
    private static SiteCallSummary ToSummary(SiteCall row, DateTime stuckCutoff)
    {
        return new SiteCallSummary(
            TrackedOperationId: row.TrackedOperationId.Value,
            SourceSite: row.SourceSite,
            Channel: row.Channel,
            Target: row.Target,
            Status: row.Status,
            RetryCount: row.RetryCount,
            LastError: row.LastError,
            HttpStatus: row.HttpStatus,
            CreatedAtUtc: row.CreatedAtUtc,
            UpdatedAtUtc: row.UpdatedAtUtc,
            TerminalAtUtc: row.TerminalAtUtc,
            IsStuck: IsStuck(row, stuckCutoff));
    }

    /// <summary>Maps a row to the detail-modal DTO (adds the ingest timestamp).</summary>
    private static SiteCallDetail ToDetail(SiteCall row)
    {
        return new SiteCallDetail(
            TrackedOperationId: row.TrackedOperationId.Value,
            SourceSite: row.SourceSite,
            Channel: row.Channel,
            Target: row.Target,
            Status: row.Status,
            RetryCount: row.RetryCount,
            LastError: row.LastError,
            HttpStatus: row.HttpStatus,
            CreatedAtUtc: row.CreatedAtUtc,
            UpdatedAtUtc: row.UpdatedAtUtc,
            TerminalAtUtc: row.TerminalAtUtc,
            IngestedAtUtc: row.IngestedAtUtc);
    }

    /// <summary>
    /// Treats an empty/whitespace filter string as "no constraint" — the
    /// repository's <see cref="SiteCallQueryFilter"/> interprets null as a
    /// no-op predicate, so a blank UI filter must collapse to null.
    /// </summary>
    private static string? NullIfBlank(string? value)
    {
        return string.IsNullOrWhiteSpace(value) ? null : value;
    }
}

/// <summary>
/// Registers the central→site command transport (the <c>CentralCommunicationActor</c>)
/// with the <see cref="SiteCallAuditActor"/> so it can relay Retry/Discard
/// actions on parked cached calls to their owning sites. Sent by the Host after
/// both actors exist.
/// </summary>
/// <remarks>
/// Lives here, not in Commons, because it carries an <see cref="IActorRef"/>
/// and <c>ScadaLink.Commons</c> has no Akka reference — the same rationale as
/// <c>RegisterAuditIngest</c>.
/// </remarks>
public sealed record RegisterCentralCommunication(IActorRef CentralCommunication);