diff --git a/src/ScadaLink.Commons/Interfaces/Repositories/ISiteCallAuditRepository.cs b/src/ScadaLink.Commons/Interfaces/Repositories/ISiteCallAuditRepository.cs index 7bb3790..81c3a48 100644 --- a/src/ScadaLink.Commons/Interfaces/Repositories/ISiteCallAuditRepository.cs +++ b/src/ScadaLink.Commons/Interfaces/Repositories/ISiteCallAuditRepository.cs @@ -63,4 +63,27 @@ public interface ISiteCallAuditRepository /// deleted. /// Task PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default); + + /// + /// Computes a point-in-time global from the + /// SiteCalls table. Counts are aggregated server-side (no row + /// materialisation): StuckCount uses ; + /// FailedLastInterval / DeliveredLastInterval use + /// ; the current time for OldestPendingAge + /// is captured inside the method. + /// + Task ComputeKpisAsync( + DateTime stuckCutoff, + DateTime intervalSince, + CancellationToken ct = default); + + /// + /// Computes a point-in-time per source + /// site. Sites with no SiteCalls rows at all are omitted. The stuck + /// cutoff and interval bounds are interpreted as in . + /// + Task> ComputePerSiteKpisAsync( + DateTime stuckCutoff, + DateTime intervalSince, + CancellationToken ct = default); } diff --git a/src/ScadaLink.Commons/Messages/Audit/SiteCallQueries.cs b/src/ScadaLink.Commons/Messages/Audit/SiteCallQueries.cs new file mode 100644 index 0000000..d23a467 --- /dev/null +++ b/src/ScadaLink.Commons/Messages/Audit/SiteCallQueries.cs @@ -0,0 +1,153 @@ +using ScadaLink.Commons.Types.Audit; + +namespace ScadaLink.Commons.Messages.Audit; + +/// +/// Site Calls UI -> Central: paginated, filtered query over the central +/// SiteCalls table (Site Call Audit #22). All filter fields are optional; +/// restricts results to stuck cached calls. Mirrors +/// +/// but uses keyset paging ( + ) +/// to match the repository's (CreatedAtUtc DESC, TrackedOperationId DESC) +/// cursor, rather than page numbers. +/// +/// +/// matches the SiteCall.Channel column — +/// "ApiOutbound" or "DbOutbound" (the spec's Kind notion; +/// the entity exposes it as Channel). is an +/// exact-match target filter, consistent with the repository's +/// predicate. +/// +public sealed record SiteCallQueryRequest( + string CorrelationId, + string? StatusFilter, + string? SourceSiteFilter, + string? ChannelFilter, + string? TargetKeyword, + bool StuckOnly, + DateTime? FromUtc, + DateTime? ToUtc, + DateTime? AfterCreatedAtUtc, + Guid? AfterId, + int PageSize); + +/// +/// A single SiteCalls row summarised for the Site Calls UI grid. Carries +/// only the columns the +/// entity genuinely exposes — there are no source-instance/script provenance +/// columns on that entity, so unlike +/// +/// none are surfaced here. +/// +public sealed record SiteCallSummary( + Guid TrackedOperationId, + string SourceSite, + string Channel, + string Target, + string Status, + int RetryCount, + string? LastError, + int? HttpStatus, + DateTime CreatedAtUtc, + DateTime UpdatedAtUtc, + DateTime? TerminalAtUtc, + bool IsStuck); + +/// +/// Central -> Site Calls UI: paginated response for a . +/// The keyset cursor of the last row is echoed back as +/// + for the caller +/// to request the following page; both are null when the page was empty. +/// On a repository fault is false, +/// carries the cause and is empty. +/// +public sealed record SiteCallQueryResponse( + string CorrelationId, + bool Success, + string? ErrorMessage, + IReadOnlyList SiteCalls, + DateTime? NextAfterCreatedAtUtc, + Guid? NextAfterId); + +/// +/// Site Calls UI -> Central: request for the full detail of a single cached call, +/// for the report detail modal. +/// +public sealed record SiteCallDetailRequest( + string CorrelationId, + Guid TrackedOperationId); + +/// +/// Central -> Site Calls UI: full detail for one cached call. On a repository +/// fault or missing row, is false / +/// is null and carries +/// the cause. +/// +public sealed record SiteCallDetailResponse( + string CorrelationId, + bool Success, + string? ErrorMessage, + SiteCallDetail? Detail); + +/// +/// Full SiteCalls row detail for the report detail modal — every field +/// on the entity, +/// including and the +/// timestamp the grid summary omits. +/// +public sealed record SiteCallDetail( + Guid TrackedOperationId, + string SourceSite, + string Channel, + string Target, + string Status, + int RetryCount, + string? LastError, + int? HttpStatus, + DateTime CreatedAtUtc, + DateTime UpdatedAtUtc, + DateTime? TerminalAtUtc, + DateTime IngestedAtUtc); + +/// +/// Site Calls UI -> Central: request for the global SiteCalls KPI summary. +/// Mirrors . +/// +public sealed record SiteCallKpiRequest( + string CorrelationId); + +/// +/// Central -> Site Calls UI: KPI summary for the Site Calls dashboard. On a +/// repository fault is false, +/// carries the cause, and the KPI fields are +/// zeroed/null. +/// +public sealed record SiteCallKpiResponse( + string CorrelationId, + bool Success, + string? ErrorMessage, + int BufferedCount, + int ParkedCount, + int FailedLastInterval, + int DeliveredLastInterval, + TimeSpan? OldestPendingAge, + int StuckCount); + +/// +/// Site Calls UI -> Central: request for the per-source-site SiteCalls +/// KPI breakdown. Mirrors +/// . +/// +public sealed record PerSiteSiteCallKpiRequest( + string CorrelationId); + +/// +/// Central -> Site Calls UI: per-site KPI breakdown for the Site Calls KPIs +/// page. On a repository fault is false, +/// carries the cause, and is empty. +/// +public sealed record PerSiteSiteCallKpiResponse( + string CorrelationId, + bool Success, + string? ErrorMessage, + IReadOnlyList Sites); diff --git a/src/ScadaLink.Commons/Types/Audit/SiteCallKpiSnapshot.cs b/src/ScadaLink.Commons/Types/Audit/SiteCallKpiSnapshot.cs new file mode 100644 index 0000000..07873fb --- /dev/null +++ b/src/ScadaLink.Commons/Types/Audit/SiteCallKpiSnapshot.cs @@ -0,0 +1,38 @@ +namespace ScadaLink.Commons.Types.Audit; + +/// +/// Point-in-time operational metrics for the central SiteCalls table +/// (Site Call Audit #22), surfaced on the health dashboard. The cached-call +/// counterpart of ; +/// mirrors its shape so the Central UI Site Calls KPI tiles can reuse the +/// Notification Outbox tile layout. +/// +/// +/// Count of non-terminal rows (Pending + Retrying) — calls +/// buffered at sites awaiting retry. +/// +/// Count of rows in the Parked status. +/// +/// Count of Failed rows whose +/// is at or after the supplied "since" timestamp. +/// +/// +/// Count of Delivered rows whose +/// is at or after the supplied "since" timestamp. +/// +/// +/// Age of the oldest non-terminal row (now - min(CreatedAtUtc)), or +/// null when there are no non-terminal rows. +/// +/// +/// Count of non-terminal rows (Pending/Retrying) whose +/// is older +/// than the supplied stuck cutoff. Display-only — no escalation. +/// +public sealed record SiteCallKpiSnapshot( + int BufferedCount, + int ParkedCount, + int FailedLastInterval, + int DeliveredLastInterval, + TimeSpan? OldestPendingAge, + int StuckCount); diff --git a/src/ScadaLink.Commons/Types/Audit/SiteCallSiteKpiSnapshot.cs b/src/ScadaLink.Commons/Types/Audit/SiteCallSiteKpiSnapshot.cs new file mode 100644 index 0000000..c67c895 --- /dev/null +++ b/src/ScadaLink.Commons/Types/Audit/SiteCallSiteKpiSnapshot.cs @@ -0,0 +1,34 @@ +namespace ScadaLink.Commons.Types.Audit; + +/// +/// Point-in-time SiteCalls metrics scoped to a single source site. The +/// per-site counterpart of ; surfaced in the +/// per-site breakdown table on the Site Calls KPIs page. Mirrors +/// . +/// +/// The site identifier these metrics are scoped to. +/// Count of this site's non-terminal rows (Pending + Retrying). +/// Count of this site's rows in the Parked status. +/// +/// Count of this site's Failed rows whose TerminalAtUtc is at or +/// after the "since" timestamp. +/// +/// +/// Count of this site's Delivered rows whose TerminalAtUtc is at +/// or after the "since" timestamp. +/// +/// +/// Age of this site's oldest non-terminal row, or null when it has none. +/// +/// +/// Count of this site's non-terminal rows whose CreatedAtUtc is older +/// than the stuck cutoff. +/// +public sealed record SiteCallSiteKpiSnapshot( + string SourceSite, + int BufferedCount, + int ParkedCount, + int FailedLastInterval, + int DeliveredLastInterval, + TimeSpan? OldestPendingAge, + int StuckCount); diff --git a/src/ScadaLink.Communication/CommunicationService.cs b/src/ScadaLink.Communication/CommunicationService.cs index a6ea2c7..c83901b 100644 --- a/src/ScadaLink.Communication/CommunicationService.cs +++ b/src/ScadaLink.Communication/CommunicationService.cs @@ -2,6 +2,7 @@ using Akka.Actor; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using ScadaLink.Commons.Messages.Artifacts; +using ScadaLink.Commons.Messages.Audit; using ScadaLink.Commons.Messages.DebugView; using ScadaLink.Commons.Messages.Deployment; using ScadaLink.Commons.Messages.Health; @@ -25,6 +26,7 @@ public class CommunicationService private readonly ILogger _logger; private IActorRef? _centralCommunicationActor; private IActorRef? _notificationOutboxProxy; + private IActorRef? _siteCallAuditProxy; public CommunicationService( IOptions options, @@ -52,6 +54,17 @@ public class CommunicationService _notificationOutboxProxy = notificationOutboxProxy; } + /// + /// Sets the Site Call Audit (#22) singleton proxy reference. Called during + /// actor system startup. The Site Call Audit actor is central-local, so Site + /// Calls read calls Ask this proxy directly (no SiteEnvelope routing), the + /// same pattern as . + /// + public void SetSiteCallAudit(IActorRef siteCallAuditProxy) + { + _siteCallAuditProxy = siteCallAuditProxy; + } + /// /// Triggers an immediate refresh of the site address cache from the database. /// @@ -80,6 +93,15 @@ public class CommunicationService ?? throw new InvalidOperationException("CommunicationService not initialized. NotificationOutbox proxy not set."); } + /// + /// Gets the Site Call Audit proxy reference. Throws if not yet initialized. + /// + private IActorRef GetSiteCallAudit() + { + return _siteCallAuditProxy + ?? throw new InvalidOperationException("CommunicationService not initialized. SiteCallAudit proxy not set."); + } + // ── Pattern 1: Instance Deployment ── public async Task DeployInstanceAsync( @@ -295,6 +317,36 @@ public class CommunicationService return await GetNotificationOutbox().Ask( request, _options.QueryTimeout, cancellationToken); } + + // ── Site Call Audit (central-local actor — Asked directly, no SiteEnvelope) ── + + public async Task QuerySiteCallsAsync( + SiteCallQueryRequest request, CancellationToken cancellationToken = default) + { + return await GetSiteCallAudit().Ask( + request, _options.QueryTimeout, cancellationToken); + } + + public async Task GetSiteCallDetailAsync( + SiteCallDetailRequest request, CancellationToken cancellationToken = default) + { + return await GetSiteCallAudit().Ask( + request, _options.QueryTimeout, cancellationToken); + } + + public async Task GetSiteCallKpisAsync( + SiteCallKpiRequest request, CancellationToken cancellationToken = default) + { + return await GetSiteCallAudit().Ask( + request, _options.QueryTimeout, cancellationToken); + } + + public async Task GetPerSiteSiteCallKpisAsync( + PerSiteSiteCallKpiRequest request, CancellationToken cancellationToken = default) + { + return await GetSiteCallAudit().Ask( + request, _options.QueryTimeout, cancellationToken); + } } /// diff --git a/src/ScadaLink.ConfigurationDatabase/Repositories/SiteCallAuditRepository.cs b/src/ScadaLink.ConfigurationDatabase/Repositories/SiteCallAuditRepository.cs index 3fdff7e..954e490 100644 --- a/src/ScadaLink.ConfigurationDatabase/Repositories/SiteCallAuditRepository.cs +++ b/src/ScadaLink.ConfigurationDatabase/Repositories/SiteCallAuditRepository.cs @@ -201,6 +201,141 @@ ORDER BY CreatedAtUtc DESC, TrackedOperationId DESC;"; ct); } + // Terminal status string literals for the interval-throughput KPIs. The + // Status column is a plain varchar (no value converter), so these compare + // directly in translated SQL. + // + // NOTE on the "buffered/non-terminal" definition: the SiteCalls operational + // mirror stores AuditStatus-derived strings (Attempted/Delivered/Parked/ + // Failed/...), NOT the tracking-lifecycle Pending/Retrying names the spec's + // KPI section uses. There is therefore no Status string that means + // "buffered". The schema-honest predicate for "non-terminal / buffered" is + // TerminalAtUtc IS NULL — consistent with PurgeTerminalAsync's terminal + // predicate and with the SiteCall entity's own contract ("TerminalAtUtc ... + // null while still active"). All buffered / stuck / oldest-pending counts + // below key off TerminalAtUtc, not Status. + private const string StatusParked = "Parked"; + private const string StatusDelivered = "Delivered"; + private const string StatusFailed = "Failed"; + + /// + /// Computes the global KPI snapshot with five server-side aggregate queries + /// against dbo.SiteCalls. No rows are materialised — every count is a + /// translated COUNT and the oldest-pending age is a translated + /// MIN(CreatedAtUtc). The Status and CreatedAtUtc/TerminalAtUtc + /// columns have no value converter, so the aggregates translate cleanly to + /// SQL Server (unlike the NotificationOutbox's DateTimeOffset-converted + /// column, which forces an order-and-take). "Buffered" / "stuck" key off + /// TerminalAtUtc IS NULL — see the field comments above. + /// + public async Task ComputeKpisAsync( + DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) + { + var now = DateTime.UtcNow; + + var bufferedCount = await _context.SiteCalls + .CountAsync(s => s.TerminalAtUtc == null, ct); + + var parkedCount = await _context.SiteCalls + .CountAsync(s => s.Status == StatusParked, ct); + + var failedLastInterval = await _context.SiteCalls + .CountAsync(s => s.Status == StatusFailed + && s.TerminalAtUtc != null + && s.TerminalAtUtc >= intervalSince, ct); + + var deliveredLastInterval = await _context.SiteCalls + .CountAsync(s => s.Status == StatusDelivered + && s.TerminalAtUtc != null + && s.TerminalAtUtc >= intervalSince, ct); + + var stuckCount = await _context.SiteCalls + .CountAsync(s => s.TerminalAtUtc == null && s.CreatedAtUtc < stuckCutoff, ct); + + var nonTerminal = _context.SiteCalls.Where(s => s.TerminalAtUtc == null); + + TimeSpan? oldestPendingAge = null; + if (await nonTerminal.AnyAsync(ct)) + { + var oldestCreatedAt = await nonTerminal.MinAsync(s => s.CreatedAtUtc, ct); + oldestPendingAge = now - oldestCreatedAt; + } + + return new SiteCallKpiSnapshot( + BufferedCount: bufferedCount, + ParkedCount: parkedCount, + FailedLastInterval: failedLastInterval, + DeliveredLastInterval: deliveredLastInterval, + OldestPendingAge: oldestPendingAge, + StuckCount: stuckCount); + } + + /// + /// Computes the per-source-site KPI breakdown. The five counts are + /// GROUP BY SourceSite aggregates; the oldest-pending age is a + /// per-site MIN(CreatedAtUtc) over the (bounded) non-terminal set — + /// all run server-side. A site appears in the result only if it has at + /// least one row matched by one of the count queries. "Buffered" / "stuck" + /// key off TerminalAtUtc IS NULL — see . + /// + public async Task> ComputePerSiteKpisAsync( + DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) + { + var now = DateTime.UtcNow; + + var buffered = await CountBySiteAsync(s => s.TerminalAtUtc == null, ct); + + var parked = await CountBySiteAsync(s => s.Status == StatusParked, ct); + + var failed = await CountBySiteAsync( + s => s.Status == StatusFailed + && s.TerminalAtUtc != null && s.TerminalAtUtc >= intervalSince, ct); + + var delivered = await CountBySiteAsync( + s => s.Status == StatusDelivered + && s.TerminalAtUtc != null && s.TerminalAtUtc >= intervalSince, ct); + + var stuck = await CountBySiteAsync( + s => s.TerminalAtUtc == null && s.CreatedAtUtc < stuckCutoff, ct); + + // Oldest non-terminal CreatedAtUtc per site — a server-side GROUP BY MIN. + var oldest = (await _context.SiteCalls + .Where(s => s.TerminalAtUtc == null) + .GroupBy(s => s.SourceSite) + .Select(g => new { Site = g.Key, Oldest = g.Min(s => s.CreatedAtUtc) }) + .ToListAsync(ct)) + .ToDictionary(x => x.Site, x => x.Oldest); + + var siteIds = buffered.Keys + .Concat(parked.Keys).Concat(failed.Keys) + .Concat(delivered.Keys).Concat(stuck.Keys) + .Distinct() + .OrderBy(s => s, StringComparer.Ordinal); + + return siteIds.Select(site => new SiteCallSiteKpiSnapshot( + SourceSite: site, + BufferedCount: buffered.GetValueOrDefault(site), + ParkedCount: parked.GetValueOrDefault(site), + FailedLastInterval: failed.GetValueOrDefault(site), + DeliveredLastInterval: delivered.GetValueOrDefault(site), + OldestPendingAge: oldest.TryGetValue(site, out var createdAt) + ? now - createdAt + : null, + StuckCount: stuck.GetValueOrDefault(site))).ToList(); + } + + /// Counts SiteCalls rows matching , grouped by source site. + private async Task> CountBySiteAsync( + System.Linq.Expressions.Expression> predicate, + CancellationToken ct) + { + return await _context.SiteCalls + .Where(predicate) + .GroupBy(s => s.SourceSite) + .Select(g => new { Site = g.Key, Count = g.Count() }) + .ToDictionaryAsync(x => x.Site, x => x.Count, ct); + } + private static int GetRankOrThrow(string status) { if (!StatusRank.TryGetValue(status, out var rank)) diff --git a/src/ScadaLink.Host/Actors/AkkaHostedService.cs b/src/ScadaLink.Host/Actors/AkkaHostedService.cs index ac9bb89..4708744 100644 --- a/src/ScadaLink.Host/Actors/AkkaHostedService.cs +++ b/src/ScadaLink.Host/Actors/AkkaHostedService.cs @@ -423,10 +423,13 @@ akka {{ // is a scoped EF Core service. var siteCallAuditLogger = _serviceProvider.GetRequiredService() .CreateLogger(); + var siteCallAuditOptions = _serviceProvider + .GetRequiredService>().Value; var siteCallAuditSingletonProps = ClusterSingletonManager.Props( singletonProps: Props.Create(() => new ScadaLink.SiteCallAudit.SiteCallAuditActor( _serviceProvider, + siteCallAuditOptions, siteCallAuditLogger)), terminationMessage: PoisonPill.Instance, settings: ClusterSingletonManagerSettings.Create(_actorSystem!) @@ -437,7 +440,12 @@ akka {{ singletonManagerPath: "/user/site-call-audit-singleton", settings: ClusterSingletonProxySettings.Create(_actorSystem) .WithSingletonName("site-call-audit")); - _actorSystem.ActorOf(siteCallAuditProxyProps, "site-call-audit-proxy"); + var siteCallAuditProxy = _actorSystem.ActorOf(siteCallAuditProxyProps, "site-call-audit-proxy"); + + // Hand the proxy to the CommunicationService so the Central UI can Ask + // the Site Call Audit actor directly (query, KPIs, detail) — mirrors the + // SetNotificationOutbox wiring above. + commService?.SetSiteCallAudit(siteCallAuditProxy); _logger.LogInformation("SiteCallAuditActor singleton created"); _logger.LogInformation("Central actors registered. CentralCommunicationActor created."); diff --git a/src/ScadaLink.SiteCallAudit/ScadaLink.SiteCallAudit.csproj b/src/ScadaLink.SiteCallAudit/ScadaLink.SiteCallAudit.csproj index d8b0e7a..7603dd6 100644 --- a/src/ScadaLink.SiteCallAudit/ScadaLink.SiteCallAudit.csproj +++ b/src/ScadaLink.SiteCallAudit/ScadaLink.SiteCallAudit.csproj @@ -13,6 +13,8 @@ + + diff --git a/src/ScadaLink.SiteCallAudit/ServiceCollectionExtensions.cs b/src/ScadaLink.SiteCallAudit/ServiceCollectionExtensions.cs index 6eb0d80..e764b68 100644 --- a/src/ScadaLink.SiteCallAudit/ServiceCollectionExtensions.cs +++ b/src/ScadaLink.SiteCallAudit/ServiceCollectionExtensions.cs @@ -7,33 +7,34 @@ namespace ScadaLink.SiteCallAudit; /// /// /// -/// M3 Bundle C ships the ingest-only minimum surface (the actor itself); the -/// full DI surface — reconciliation puller, KPI projector, central→site -/// Retry/Discard relay, options + validators — is deferred to a follow-up. +/// Binds (stuck-call detection + KPI +/// windowing for the read-side query/KPI handlers). The reconciliation puller +/// and central→site Retry/Discard relay are still deferred to later follow-ups. /// /// /// The repository (ISiteCallAuditRepository) is registered by /// ScadaLink.ConfigurationDatabase.ServiceCollectionExtensions.AddConfigurationDatabase, /// so callers (the Host on the central node) must also call that. The actor's -/// Props are wired up in Host registration (Bundle F); this extension -/// is currently a no-op placeholder kept for symmetry with the AuditLog and -/// NotificationOutbox composition roots — adding it now means consumers can -/// reference the method without re-touching the Host project later. +/// Props are wired up in Host registration. /// /// public static class ServiceCollectionExtensions { + /// Configuration section bound to . + public const string OptionsSection = "ScadaLink:SiteCallAudit"; + /// - /// Registers Site Call Audit (#22) services. Currently a no-op - /// placeholder — Bundle F will populate this with the actor's Props - /// factory + options bindings. The method is exposed now so the Host - /// wiring call already exists at the API boundary. + /// Registers Site Call Audit (#22) services: the + /// binding consumed by the actor's read-side KPI/query handlers. The actor's + /// Props are still constructed inline in Host wiring. /// public static IServiceCollection AddSiteCallAudit(this IServiceCollection services) { ArgumentNullException.ThrowIfNull(services); - // Actor props are constructed in Host wiring (Bundle F). This - // extension is a placeholder for future config + DI. + + services.AddOptions() + .BindConfiguration(OptionsSection); + return services; } } diff --git a/src/ScadaLink.SiteCallAudit/SiteCallAuditActor.cs b/src/ScadaLink.SiteCallAudit/SiteCallAuditActor.cs index 7506681..a6537dd 100644 --- a/src/ScadaLink.SiteCallAudit/SiteCallAuditActor.cs +++ b/src/ScadaLink.SiteCallAudit/SiteCallAuditActor.cs @@ -1,8 +1,11 @@ using Akka.Actor; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; +using ScadaLink.Commons.Entities.Audit; using ScadaLink.Commons.Interfaces.Repositories; using ScadaLink.Commons.Messages.Audit; +using ScadaLink.Commons.Types; +using ScadaLink.Commons.Types.Audit; namespace ScadaLink.SiteCallAudit; @@ -42,26 +45,34 @@ namespace ScadaLink.SiteCallAudit; /// public class SiteCallAuditActor : ReceiveActor { + /// Maximum page size honoured by a . + private const int MaxPageSize = 200; + private readonly IServiceProvider? _serviceProvider; private readonly ISiteCallAuditRepository? _injectedRepository; + private readonly SiteCallAuditOptions _options; private readonly ILogger _logger; /// /// Test-mode constructor — injects a concrete repository instance whose /// lifetime exceeds the test, so the actor reuses the same instance /// across every message. Used by Bundle C's MSSQL-backed TestKit fixture. + /// An optional lets a test pin the stuck/KPI + /// windows; when omitted the production defaults apply. /// public SiteCallAuditActor( ISiteCallAuditRepository repository, - ILogger logger) + ILogger logger, + SiteCallAuditOptions? options = null) { ArgumentNullException.ThrowIfNull(repository); ArgumentNullException.ThrowIfNull(logger); _injectedRepository = repository; _logger = logger; + _options = options ?? new SiteCallAuditOptions(); - ReceiveAsync(OnUpsertAsync); + RegisterHandlers(); } /// @@ -73,15 +84,33 @@ public class SiteCallAuditActor : ReceiveActor /// public SiteCallAuditActor( IServiceProvider serviceProvider, + SiteCallAuditOptions options, ILogger logger) { ArgumentNullException.ThrowIfNull(serviceProvider); + ArgumentNullException.ThrowIfNull(options); ArgumentNullException.ThrowIfNull(logger); _serviceProvider = serviceProvider; + _options = options; _logger = logger; + RegisterHandlers(); + } + + /// + /// Wires up the message handlers shared by both constructors: the M3 + /// ingest path plus the Task 4 read-side (query, detail, global + per-site + /// KPI). All read handlers reply to an Ask, so they capture Sender + /// before the first await and PipeTo the result back. + /// + private void RegisterHandlers() + { ReceiveAsync(OnUpsertAsync); + Receive(HandleQuery); + Receive(HandleDetail); + Receive(HandleKpi); + Receive(HandlePerSiteKpi); } /// @@ -137,4 +166,305 @@ public class SiteCallAuditActor : ReceiveActor scope?.Dispose(); } } + + // ── Task 4: read-side (query / detail / KPI) ── + + /// + /// Handles a paginated, filtered query over the SiteCalls table. + /// Builds a + + /// keyset cursor from the request, runs the query on a scoped repository, + /// and pipes the mapped response back to the captured sender. A repository + /// fault yields a failure response with an empty list. + /// + private void HandleQuery(SiteCallQueryRequest request) + { + var sender = Sender; + var now = DateTime.UtcNow; + + QueryAsync(request, now).PipeTo( + sender, + success: response => response, + failure: ex => new SiteCallQueryResponse( + request.CorrelationId, + Success: false, + ErrorMessage: ex.GetBaseException().Message, + SiteCalls: Array.Empty(), + NextAfterCreatedAtUtc: null, + NextAfterId: null)); + } + + private async Task QueryAsync(SiteCallQueryRequest request, DateTime now) + { + var filter = new SiteCallQueryFilter( + Channel: NullIfBlank(request.ChannelFilter), + SourceSite: NullIfBlank(request.SourceSiteFilter), + Status: NullIfBlank(request.StatusFilter), + Target: NullIfBlank(request.TargetKeyword), + FromUtc: request.FromUtc, + ToUtc: request.ToUtc); + + var pageSize = Math.Clamp(request.PageSize, 1, MaxPageSize); + var paging = new SiteCallPaging( + PageSize: pageSize, + AfterCreatedAtUtc: request.AfterCreatedAtUtc, + AfterId: request.AfterId is { } id ? new TrackedOperationId(id) : null); + + var (scope, repository) = ResolveRepository(); + try + { + var rows = await repository.QueryAsync(filter, paging).ConfigureAwait(false); + + var stuckCutoff = now - _options.StuckAgeThreshold; + var summaries = rows + // StuckOnly is post-filtered here rather than pushed into the + // repository SQL — the SiteCallQueryFilter has no stuck predicate + // and a status-aware created-before clause does not compose with + // the keyset cursor. The page may therefore return fewer than + // PageSize rows when StuckOnly is set; that is acceptable for a + // display-only filter. + .Where(row => !request.StuckOnly || IsStuck(row, stuckCutoff)) + .Select(row => ToSummary(row, stuckCutoff)) + .ToList(); + + // The next-page cursor is the LAST row of the materialised page — + // before StuckOnly post-filtering, so paging still advances even + // when every row on a page was filtered out. + var cursorRow = rows.Count > 0 ? rows[^1] : null; + + return new SiteCallQueryResponse( + request.CorrelationId, + Success: true, + ErrorMessage: null, + SiteCalls: summaries, + NextAfterCreatedAtUtc: cursorRow?.CreatedAtUtc, + NextAfterId: cursorRow?.TrackedOperationId.Value); + } + finally + { + scope?.Dispose(); + } + } + + /// + /// Handles a full-detail query for a single cached call — backs the report + /// detail modal. A missing row yields Success=false with a "not + /// found" message; a repository fault yields Success=false with the + /// fault message. + /// + private void HandleDetail(SiteCallDetailRequest request) + { + var sender = Sender; + + DetailAsync(request).PipeTo( + sender, + success: response => response, + failure: ex => new SiteCallDetailResponse( + request.CorrelationId, + Success: false, + ErrorMessage: ex.GetBaseException().Message, + Detail: null)); + } + + private async Task DetailAsync(SiteCallDetailRequest request) + { + var (scope, repository) = ResolveRepository(); + try + { + var row = await repository + .GetAsync(new TrackedOperationId(request.TrackedOperationId)) + .ConfigureAwait(false); + + if (row is null) + { + return new SiteCallDetailResponse( + request.CorrelationId, + Success: false, + ErrorMessage: "site call not found", + Detail: null); + } + + return new SiteCallDetailResponse( + request.CorrelationId, + Success: true, + ErrorMessage: null, + Detail: ToDetail(row)); + } + finally + { + scope?.Dispose(); + } + } + + /// + /// Handles a global KPI snapshot request, deriving the stuck cutoff from + /// and the + /// failed/delivered interval bound from . + /// + private void HandleKpi(SiteCallKpiRequest request) + { + var sender = Sender; + var now = DateTime.UtcNow; + var stuckCutoff = now - _options.StuckAgeThreshold; + var intervalSince = now - _options.KpiInterval; + + KpiAsync(request.CorrelationId, stuckCutoff, intervalSince).PipeTo( + sender, + success: response => response, + failure: ex => new SiteCallKpiResponse( + request.CorrelationId, + Success: false, + ErrorMessage: ex.GetBaseException().Message, + BufferedCount: 0, + ParkedCount: 0, + FailedLastInterval: 0, + DeliveredLastInterval: 0, + OldestPendingAge: null, + StuckCount: 0)); + } + + private async Task KpiAsync( + string correlationId, DateTime stuckCutoff, DateTime intervalSince) + { + var (scope, repository) = ResolveRepository(); + try + { + var snapshot = await repository + .ComputeKpisAsync(stuckCutoff, intervalSince) + .ConfigureAwait(false); + + return new SiteCallKpiResponse( + correlationId, + Success: true, + ErrorMessage: null, + snapshot.BufferedCount, + snapshot.ParkedCount, + snapshot.FailedLastInterval, + snapshot.DeliveredLastInterval, + snapshot.OldestPendingAge, + snapshot.StuckCount); + } + finally + { + scope?.Dispose(); + } + } + + /// + /// Handles a per-source-site KPI request, using the same stuck cutoff and + /// interval bound as . + /// + private void HandlePerSiteKpi(PerSiteSiteCallKpiRequest request) + { + var sender = Sender; + var now = DateTime.UtcNow; + var stuckCutoff = now - _options.StuckAgeThreshold; + var intervalSince = now - _options.KpiInterval; + + PerSiteKpiAsync(request.CorrelationId, stuckCutoff, intervalSince).PipeTo( + sender, + success: response => response, + failure: ex => new PerSiteSiteCallKpiResponse( + request.CorrelationId, + Success: false, + ErrorMessage: ex.GetBaseException().Message, + Sites: Array.Empty())); + } + + private async Task PerSiteKpiAsync( + string correlationId, DateTime stuckCutoff, DateTime intervalSince) + { + var (scope, repository) = ResolveRepository(); + try + { + var sites = await repository + .ComputePerSiteKpisAsync(stuckCutoff, intervalSince) + .ConfigureAwait(false); + + return new PerSiteSiteCallKpiResponse( + correlationId, Success: true, ErrorMessage: null, sites); + } + finally + { + scope?.Dispose(); + } + } + + /// + /// Resolves an for one read message. + /// In test mode the injected instance is returned with a null scope; in + /// production a fresh DI scope is created and returned so the caller can + /// dispose it once the read completes — the same scope-per-message pattern + /// as . + /// + private (IServiceScope? Scope, ISiteCallAuditRepository Repository) ResolveRepository() + { + if (_injectedRepository is not null) + { + return (null, _injectedRepository); + } + + var scope = _serviceProvider!.CreateScope(); + return (scope, scope.ServiceProvider.GetRequiredService()); + } + + /// + /// A cached call counts as stuck when it is still non-terminal and was + /// created before . Non-terminal is keyed off + /// being null — the + /// SiteCalls operational mirror stores AuditStatus-derived + /// status strings (Attempted/Delivered/Parked/...), not + /// the tracking-lifecycle Pending/Retrying names the spec's + /// KPI section uses, so there is no status string that means "buffered". + /// TerminalAtUtc is the entity's own active/terminal discriminator + /// and is consistent with the repository KPI counts and + /// PurgeTerminalAsync. + /// + private static bool IsStuck(SiteCall row, DateTime stuckCutoff) + { + return row.TerminalAtUtc is null && row.CreatedAtUtc < stuckCutoff; + } + + private static SiteCallSummary ToSummary(SiteCall row, DateTime stuckCutoff) + { + return new SiteCallSummary( + TrackedOperationId: row.TrackedOperationId.Value, + SourceSite: row.SourceSite, + Channel: row.Channel, + Target: row.Target, + Status: row.Status, + RetryCount: row.RetryCount, + LastError: row.LastError, + HttpStatus: row.HttpStatus, + CreatedAtUtc: row.CreatedAtUtc, + UpdatedAtUtc: row.UpdatedAtUtc, + TerminalAtUtc: row.TerminalAtUtc, + IsStuck: IsStuck(row, stuckCutoff)); + } + + private static SiteCallDetail ToDetail(SiteCall row) + { + return new SiteCallDetail( + TrackedOperationId: row.TrackedOperationId.Value, + SourceSite: row.SourceSite, + Channel: row.Channel, + Target: row.Target, + Status: row.Status, + RetryCount: row.RetryCount, + LastError: row.LastError, + HttpStatus: row.HttpStatus, + CreatedAtUtc: row.CreatedAtUtc, + UpdatedAtUtc: row.UpdatedAtUtc, + TerminalAtUtc: row.TerminalAtUtc, + IngestedAtUtc: row.IngestedAtUtc); + } + + /// + /// Treats an empty/whitespace filter string as "no constraint" — the + /// repository's interprets null as + /// a no-op predicate, so a blank UI filter must collapse to null. + /// + private static string? NullIfBlank(string? value) + { + return string.IsNullOrWhiteSpace(value) ? null : value; + } } diff --git a/src/ScadaLink.SiteCallAudit/SiteCallAuditOptions.cs b/src/ScadaLink.SiteCallAudit/SiteCallAuditOptions.cs new file mode 100644 index 0000000..572fec6 --- /dev/null +++ b/src/ScadaLink.SiteCallAudit/SiteCallAuditOptions.cs @@ -0,0 +1,26 @@ +namespace ScadaLink.SiteCallAudit; + +/// +/// Configuration options for the Site Call Audit (#22) read-side: stuck-call +/// detection and KPI windowing. Mirrors the KPI-relevant subset of +/// NotificationOutboxOptions — the reconciliation, purge and dispatch +/// cadence options the Notification Outbox carries are not part of the Site +/// Call Audit read-side backend and are deliberately omitted here. +/// +public class SiteCallAuditOptions +{ + /// + /// Age past which a non-terminal cached call (Pending/Retrying) + /// is considered stuck. Display-only — surfaced as the Stuck KPI and a row + /// badge, with no escalation. Default 10 minutes, matching + /// NotificationOutboxOptions.StuckAgeThreshold. + /// + public TimeSpan StuckAgeThreshold { get; set; } = TimeSpan.FromMinutes(10); + + /// + /// Trailing window used to compute the delivered- and failed-last-interval + /// throughput KPIs. Default 1 minute, matching + /// NotificationOutboxOptions.DeliveredKpiWindow. + /// + public TimeSpan KpiInterval { get; set; } = TimeSpan.FromMinutes(1); +} diff --git a/tests/ScadaLink.AuditLog.Tests/Central/AuditLogIngestActorCombinedTelemetryTests.cs b/tests/ScadaLink.AuditLog.Tests/Central/AuditLogIngestActorCombinedTelemetryTests.cs index f61d1cd..7e3d1ea 100644 --- a/tests/ScadaLink.AuditLog.Tests/Central/AuditLogIngestActorCombinedTelemetryTests.cs +++ b/tests/ScadaLink.AuditLog.Tests/Central/AuditLogIngestActorCombinedTelemetryTests.cs @@ -356,6 +356,12 @@ public class AuditLogIngestActorCombinedTelemetryTests : TestKit, IClassFixture< _inner.QueryAsync(filter, paging, ct); public Task PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default) => _inner.PurgeTerminalAsync(olderThanUtc, ct); + public Task ComputeKpisAsync( + DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) => + _inner.ComputeKpisAsync(stuckCutoff, intervalSince, ct); + public Task> ComputePerSiteKpisAsync( + DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) => + _inner.ComputePerSiteKpisAsync(stuckCutoff, intervalSince, ct); } /// @@ -387,5 +393,11 @@ public class AuditLogIngestActorCombinedTelemetryTests : TestKit, IClassFixture< _inner.QueryAsync(filter, paging, ct); public Task PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default) => _inner.PurgeTerminalAsync(olderThanUtc, ct); + public Task ComputeKpisAsync( + DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) => + _inner.ComputeKpisAsync(stuckCutoff, intervalSince, ct); + public Task> ComputePerSiteKpisAsync( + DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) => + _inner.ComputePerSiteKpisAsync(stuckCutoff, intervalSince, ct); } } diff --git a/tests/ScadaLink.Commons.Tests/Messages/SiteCallQueriesTests.cs b/tests/ScadaLink.Commons.Tests/Messages/SiteCallQueriesTests.cs new file mode 100644 index 0000000..a702e14 --- /dev/null +++ b/tests/ScadaLink.Commons.Tests/Messages/SiteCallQueriesTests.cs @@ -0,0 +1,128 @@ +using ScadaLink.Commons.Messages.Audit; +using ScadaLink.Commons.Types.Audit; + +namespace ScadaLink.Commons.Tests.Messages; + +/// +/// Site Call Audit (#22): construction, value-equality and optionality tests +/// for the Site Calls UI query / KPI / detail message contracts. Mirrors the +/// Notification Outbox NotificationMessagesTests coverage of the read +/// side, scoped to the contracts the Site Calls page consumes. +/// +public class SiteCallQueriesTests +{ + [Fact] + public void SiteCallQueryRequest_PositionalConstruction_SetsAllFields() + { + var afterCreated = DateTime.UtcNow; + var afterId = Guid.NewGuid(); + var request = new SiteCallQueryRequest( + "corr-1", "Parked", "plant-a", "ApiOutbound", "ERP.GetOrder", true, + new DateTime(2026, 5, 1), new DateTime(2026, 5, 20), afterCreated, afterId, 50); + + Assert.Equal("corr-1", request.CorrelationId); + Assert.Equal("Parked", request.StatusFilter); + Assert.Equal("plant-a", request.SourceSiteFilter); + Assert.Equal("ApiOutbound", request.ChannelFilter); + Assert.Equal("ERP.GetOrder", request.TargetKeyword); + Assert.True(request.StuckOnly); + Assert.Equal(new DateTime(2026, 5, 1), request.FromUtc); + Assert.Equal(new DateTime(2026, 5, 20), request.ToUtc); + Assert.Equal(afterCreated, request.AfterCreatedAtUtc); + Assert.Equal(afterId, request.AfterId); + Assert.Equal(50, request.PageSize); + } + + [Fact] + public void SiteCallQueryRequest_AllowsNullOptionalFilters() + { + var request = new SiteCallQueryRequest( + "corr-2", null, null, null, null, false, null, null, null, null, 25); + + Assert.Null(request.StatusFilter); + Assert.Null(request.SourceSiteFilter); + Assert.Null(request.ChannelFilter); + Assert.Null(request.TargetKeyword); + Assert.False(request.StuckOnly); + Assert.Null(request.FromUtc); + Assert.Null(request.AfterId); + } + + [Fact] + public void SiteCallQueryResponse_ValueEquality_EqualWhenAllFieldsMatch() + { + var a = new SiteCallQueryResponse("c", true, null, Array.Empty(), null, null); + var b = new SiteCallQueryResponse("c", true, null, Array.Empty(), null, null); + + Assert.Equal(a, b); + Assert.Equal(a.GetHashCode(), b.GetHashCode()); + } + + [Fact] + public void SiteCallSummary_CarriesEntityColumnsAndStuckFlag() + { + var id = Guid.NewGuid(); + var created = DateTime.UtcNow.AddMinutes(-30); + var summary = new SiteCallSummary( + id, "plant-a", "DbOutbound", "InventoryDb", "Retrying", 3, + "transient 503", 503, created, created.AddMinutes(1), null, IsStuck: true); + + Assert.Equal(id, summary.TrackedOperationId); + Assert.Equal("DbOutbound", summary.Channel); + Assert.Equal("InventoryDb", summary.Target); + Assert.Equal("Retrying", summary.Status); + Assert.Equal(3, summary.RetryCount); + Assert.Equal(503, summary.HttpStatus); + Assert.Null(summary.TerminalAtUtc); + Assert.True(summary.IsStuck); + } + + [Fact] + public void SiteCallDetailResponse_MissingRow_HasNullDetail() + { + var response = new SiteCallDetailResponse("c", false, "site call not found", null); + + Assert.False(response.Success); + Assert.Null(response.Detail); + Assert.Equal("site call not found", response.ErrorMessage); + } + + [Fact] + public void SiteCallKpiResponse_FailureShape_ZeroesKpiFields() + { + var response = new SiteCallKpiResponse( + "c", Success: false, ErrorMessage: "db down", + BufferedCount: 0, ParkedCount: 0, FailedLastInterval: 0, + DeliveredLastInterval: 0, OldestPendingAge: null, StuckCount: 0); + + Assert.False(response.Success); + Assert.Equal("db down", response.ErrorMessage); + Assert.Equal(0, response.BufferedCount); + Assert.Null(response.OldestPendingAge); + } + + [Fact] + public void PerSiteSiteCallKpiResponse_CarriesPerSiteSnapshots() + { + var response = new PerSiteSiteCallKpiResponse( + "c", true, null, + new[] + { + new SiteCallSiteKpiSnapshot("plant-a", 4, 1, 0, 9, TimeSpan.FromMinutes(15), 2), + }); + + Assert.True(response.Success); + var site = Assert.Single(response.Sites); + Assert.Equal("plant-a", site.SourceSite); + Assert.Equal(4, site.BufferedCount); + Assert.Equal(2, site.StuckCount); + Assert.Equal(TimeSpan.FromMinutes(15), site.OldestPendingAge); + } + + [Fact] + public void SiteCallKpiSnapshot_OldestPendingAge_IsNullableForEmptyTable() + { + var snapshot = new SiteCallKpiSnapshot(0, 0, 0, 0, null, 0); + Assert.Null(snapshot.OldestPendingAge); + } +} diff --git a/tests/ScadaLink.Communication.Tests/CommunicationServiceTests.cs b/tests/ScadaLink.Communication.Tests/CommunicationServiceTests.cs index 7ac43dc..b1a1de8 100644 --- a/tests/ScadaLink.Communication.Tests/CommunicationServiceTests.cs +++ b/tests/ScadaLink.Communication.Tests/CommunicationServiceTests.cs @@ -2,8 +2,10 @@ using Akka.Actor; using Akka.TestKit.Xunit2; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; +using ScadaLink.Commons.Messages.Audit; using ScadaLink.Commons.Messages.Deployment; using ScadaLink.Commons.Messages.Notification; +using ScadaLink.Commons.Types.Audit; using ScadaLink.Commons.Types.Notifications; namespace ScadaLink.Communication.Tests; @@ -236,6 +238,150 @@ public class CommunicationServiceTests : TestKit Assert.Equal("plant-a", result.Sites[0].SourceSiteId); } + // ── Site Call Audit: central-side audit actor calls ── + + [Fact] + public async Task QuerySiteCallsAsync_BeforeSiteCallAuditSet_Throws() + { + var service = new CommunicationService( + Options.Create(new CommunicationOptions()), + NullLogger.Instance); + + await Assert.ThrowsAsync(() => + service.QuerySiteCallsAsync(new SiteCallQueryRequest( + "corr-1", null, null, null, null, false, null, null, null, null, 50))); + } + + [Fact] + public async Task GetSiteCallKpisAsync_BeforeSiteCallAuditSet_Throws() + { + var service = new CommunicationService( + Options.Create(new CommunicationOptions()), + NullLogger.Instance); + + await Assert.ThrowsAsync(() => + service.GetSiteCallKpisAsync(new SiteCallKpiRequest("corr-1"))); + } + + [Fact] + public async Task GetSiteCallDetailAsync_BeforeSiteCallAuditSet_Throws() + { + var service = new CommunicationService( + Options.Create(new CommunicationOptions()), + NullLogger.Instance); + + await Assert.ThrowsAsync(() => + service.GetSiteCallDetailAsync(new SiteCallDetailRequest("corr-1", Guid.NewGuid()))); + } + + [Fact] + public async Task GetPerSiteSiteCallKpisAsync_BeforeSiteCallAuditSet_Throws() + { + var service = new CommunicationService( + Options.Create(new CommunicationOptions()), + NullLogger.Instance); + + await Assert.ThrowsAsync(() => + service.GetPerSiteSiteCallKpisAsync(new PerSiteSiteCallKpiRequest("corr-1"))); + } + + [Fact] + public async Task QuerySiteCallsAsync_AsksSiteCallAuditProxyDirectly() + { + // The Site Call Audit actor is central-local: the request must be Asked + // directly to its proxy (no SiteEnvelope wrapping). + var service = new CommunicationService( + Options.Create(new CommunicationOptions()), + NullLogger.Instance); + var probe = CreateTestProbe(); + service.SetSiteCallAudit(probe.Ref); + + var request = new SiteCallQueryRequest( + "corr-q", "Parked", "plant-a", "ApiOutbound", "ERP.GetOrder", true, + null, null, null, null, 25); + var task = service.QuerySiteCallsAsync(request); + + var received = probe.ExpectMsg(); + Assert.Same(request, received); + var reply = new SiteCallQueryResponse( + "corr-q", true, null, Array.Empty(), null, null); + probe.Reply(reply); + + Assert.Same(reply, await task); + } + + [Fact] + public async Task GetSiteCallDetailAsync_AsksSiteCallAuditProxyDirectly() + { + var service = new CommunicationService( + Options.Create(new CommunicationOptions()), + NullLogger.Instance); + var probe = CreateTestProbe(); + service.SetSiteCallAudit(probe.Ref); + + var request = new SiteCallDetailRequest("corr-d", Guid.NewGuid()); + var task = service.GetSiteCallDetailAsync(request); + + var received = probe.ExpectMsg(); + Assert.Same(request, received); + var reply = new SiteCallDetailResponse("corr-d", false, "site call not found", null); + probe.Reply(reply); + + var result = await task; + Assert.Same(reply, result); + Assert.False(result.Success); + } + + [Fact] + public async Task GetSiteCallKpisAsync_AsksSiteCallAuditProxyDirectly() + { + var service = new CommunicationService( + Options.Create(new CommunicationOptions()), + NullLogger.Instance); + var probe = CreateTestProbe(); + service.SetSiteCallAudit(probe.Ref); + + var request = new SiteCallKpiRequest("corr-k"); + var task = service.GetSiteCallKpisAsync(request); + + var received = probe.ExpectMsg(); + Assert.Same(request, received); + var reply = new SiteCallKpiResponse( + "corr-k", true, null, 4, 1, 2, 9, TimeSpan.FromMinutes(7), 1); + probe.Reply(reply); + + var result = await task; + Assert.Same(reply, result); + Assert.Equal(4, result.BufferedCount); + Assert.Equal(1, result.StuckCount); + } + + [Fact] + public async Task GetPerSiteSiteCallKpisAsync_AsksSiteCallAuditProxyDirectly() + { + var service = new CommunicationService( + Options.Create(new CommunicationOptions()), + NullLogger.Instance); + var probe = CreateTestProbe(); + service.SetSiteCallAudit(probe.Ref); + + var request = new PerSiteSiteCallKpiRequest("corr-ps"); + var task = service.GetPerSiteSiteCallKpisAsync(request); + + var received = probe.ExpectMsg(); + Assert.Same(request, received); + var reply = new PerSiteSiteCallKpiResponse( + "corr-ps", true, null, + new[] { new SiteCallSiteKpiSnapshot("plant-a", 3, 0, 0, 5, null, 0) }); + probe.Reply(reply); + + var result = await task; + Assert.Same(reply, result); + Assert.True(result.Success); + Assert.Single(result.Sites); + Assert.Equal("plant-a", result.Sites[0].SourceSite); + } + /// /// Stand-in for CentralCommunicationActor: verifies the message is wrapped /// in a SiteEnvelope targeting the requested site and replies with a typed diff --git a/tests/ScadaLink.ConfigurationDatabase.Tests/Repositories/SiteCallAuditRepositoryTests.cs b/tests/ScadaLink.ConfigurationDatabase.Tests/Repositories/SiteCallAuditRepositoryTests.cs index ae5dd90..1156420 100644 --- a/tests/ScadaLink.ConfigurationDatabase.Tests/Repositories/SiteCallAuditRepositoryTests.cs +++ b/tests/ScadaLink.ConfigurationDatabase.Tests/Repositories/SiteCallAuditRepositoryTests.cs @@ -338,6 +338,104 @@ public class SiteCallAuditRepositoryTests : IClassFixture Assert.NotNull(await repo.GetAsync(recentTerminalId)); } + // --- KPI snapshot tests ------------------------------------------------- + + [SkippableFact] + public async Task ComputeKpisAsync_CountsBufferedParkedFailedDeliveredAndStuck() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var site = NewSiteId(); + await using var context = CreateContext(); + var repo = new SiteCallAuditRepository(context); + + var now = DateTime.UtcNow; + var stuckCutoff = now.AddMinutes(-10); + var intervalSince = now.AddHours(-1); + + // Buffered + stuck (non-terminal Attempted, created 30 min ago). + await repo.UpsertAsync(NewRow( + TrackedOperationId.New(), site, status: "Attempted", createdAtUtc: now.AddMinutes(-30))); + // Buffered but NOT stuck (non-terminal Attempted, created 2 min ago). + await repo.UpsertAsync(NewRow( + TrackedOperationId.New(), site, status: "Attempted", createdAtUtc: now.AddMinutes(-2))); + // Parked (terminal). + await repo.UpsertAsync(NewRow( + TrackedOperationId.New(), site, status: "Parked", + createdAtUtc: now.AddMinutes(-5), updatedAtUtc: now.AddMinutes(-4), + terminal: true, terminalAtUtc: now.AddMinutes(-4))); + // Delivered within the interval. + await repo.UpsertAsync(NewRow( + TrackedOperationId.New(), site, status: "Delivered", + createdAtUtc: now.AddMinutes(-4), updatedAtUtc: now.AddMinutes(-1), + terminal: true, terminalAtUtc: now.AddMinutes(-1))); + // Failed within the interval. + await repo.UpsertAsync(NewRow( + TrackedOperationId.New(), site, status: "Failed", + createdAtUtc: now.AddMinutes(-6), updatedAtUtc: now.AddMinutes(-2), + terminal: true, terminalAtUtc: now.AddMinutes(-2))); + // Delivered OUTSIDE the interval (2 hours ago) — must not count. + await repo.UpsertAsync(NewRow( + TrackedOperationId.New(), site, status: "Delivered", + createdAtUtc: now.AddHours(-3), updatedAtUtc: now.AddHours(-2), + terminal: true, terminalAtUtc: now.AddHours(-2))); + + var snapshot = await repo.ComputeKpisAsync(stuckCutoff, intervalSince); + + // Counts are global; assert the floor since the table is shared with + // other tests. The OUTSIDE-interval Delivered row proves the window + // bounds the throughput counts. + Assert.True(snapshot.BufferedCount >= 2); + Assert.True(snapshot.ParkedCount >= 1); + Assert.True(snapshot.StuckCount >= 1); + Assert.True(snapshot.DeliveredLastInterval >= 1); + Assert.True(snapshot.FailedLastInterval >= 1); + Assert.NotNull(snapshot.OldestPendingAge); + Assert.True(snapshot.OldestPendingAge >= TimeSpan.FromMinutes(25)); + } + + [SkippableFact] + public async Task ComputePerSiteKpisAsync_ScopesCountsToEachSite() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteA = NewSiteId(); + var siteB = NewSiteId(); + await using var context = CreateContext(); + var repo = new SiteCallAuditRepository(context); + + var now = DateTime.UtcNow; + var stuckCutoff = now.AddMinutes(-10); + var intervalSince = now.AddHours(-1); + + // siteA: 2 buffered (one stuck), 1 parked. + await repo.UpsertAsync(NewRow(TrackedOperationId.New(), siteA, status: "Attempted", createdAtUtc: now.AddMinutes(-30))); + await repo.UpsertAsync(NewRow(TrackedOperationId.New(), siteA, status: "Attempted", createdAtUtc: now.AddMinutes(-2))); + await repo.UpsertAsync(NewRow( + TrackedOperationId.New(), siteA, status: "Parked", + createdAtUtc: now.AddMinutes(-5), updatedAtUtc: now.AddMinutes(-4), + terminal: true, terminalAtUtc: now.AddMinutes(-4))); + // siteB: 1 delivered within interval only. + await repo.UpsertAsync(NewRow( + TrackedOperationId.New(), siteB, status: "Delivered", + createdAtUtc: now.AddMinutes(-4), updatedAtUtc: now.AddMinutes(-1), + terminal: true, terminalAtUtc: now.AddMinutes(-1))); + + var perSite = await repo.ComputePerSiteKpisAsync(stuckCutoff, intervalSince); + + var a = Assert.Single(perSite, s => s.SourceSite == siteA); + Assert.Equal(2, a.BufferedCount); + Assert.Equal(1, a.ParkedCount); + Assert.Equal(1, a.StuckCount); + Assert.NotNull(a.OldestPendingAge); + + var b = Assert.Single(perSite, s => s.SourceSite == siteB); + Assert.Equal(0, b.BufferedCount); + Assert.Equal(1, b.DeliveredLastInterval); + // siteB has no non-terminal rows — no oldest-pending age. + Assert.Null(b.OldestPendingAge); + } + // --- helpers ------------------------------------------------------------ private ScadaLinkDbContext CreateContext() diff --git a/tests/ScadaLink.SiteCallAudit.Tests/SiteCallAuditActorTests.cs b/tests/ScadaLink.SiteCallAudit.Tests/SiteCallAuditActorTests.cs index e9ef807..73f3ead 100644 --- a/tests/ScadaLink.SiteCallAudit.Tests/SiteCallAuditActorTests.cs +++ b/tests/ScadaLink.SiteCallAudit.Tests/SiteCallAuditActorTests.cs @@ -70,10 +70,12 @@ public class SiteCallAuditActorTests : TestKit, IClassFixture + private IActorRef CreateActor( + ISiteCallAuditRepository repository, SiteCallAuditOptions? options = null) => Sys.ActorOf(Props.Create(() => new SiteCallAuditActor( repository, - NullLogger.Instance))); + NullLogger.Instance, + options))); [SkippableFact] public async Task Receive_UpsertSiteCallCommand_Persists_Replies_Accepted() @@ -182,6 +184,291 @@ public class SiteCallAuditActorTests : TestKit, IClassFixture(TimeSpan.FromSeconds(10)); + Assert.True(response.Success); + Assert.Equal("corr-q1", response.CorrelationId); + Assert.Equal(2, response.SiteCalls.Count); + Assert.All(response.SiteCalls, s => Assert.Equal(siteId, s.SourceSite)); + // Newest first — ordered (CreatedAtUtc DESC). + Assert.Equal("Delivered", response.SiteCalls[0].Status); + // Cursor echoes the last (oldest) row of the page. + Assert.Equal(t0, response.NextAfterCreatedAtUtc); + Assert.Equal(response.SiteCalls[^1].TrackedOperationId, response.NextAfterId); + } + + [SkippableFact] + public async Task SiteCallQueryRequest_KeysetPaging_AdvancesViaCursor() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteId = NewSiteId(); + await using var context = CreateContext(); + var repo = new SiteCallAuditRepository(context); + var actor = CreateActor(repo); + + var t0 = new DateTime(2026, 5, 20, 13, 0, 0, DateTimeKind.Utc); + for (var i = 0; i < 3; i++) + { + await repo.UpsertAsync(NewRow(TrackedOperationId.New(), siteId, createdAtUtc: t0.AddMinutes(i))); + } + + actor.Tell( + new SiteCallQueryRequest( + "corr-q2", null, siteId, null, null, false, null, null, null, null, PageSize: 2), + TestActor); + var page1 = ExpectMsg(TimeSpan.FromSeconds(10)); + Assert.Equal(2, page1.SiteCalls.Count); + + actor.Tell( + new SiteCallQueryRequest( + "corr-q3", null, siteId, null, null, false, null, null, + page1.NextAfterCreatedAtUtc, page1.NextAfterId, PageSize: 2), + TestActor); + var page2 = ExpectMsg(TimeSpan.FromSeconds(10)); + Assert.Single(page2.SiteCalls); + + // No overlap across the two pages. + var allIds = page1.SiteCalls.Concat(page2.SiteCalls) + .Select(s => s.TrackedOperationId).ToHashSet(); + Assert.Equal(3, allIds.Count); + } + + [SkippableFact] + public async Task SiteCallQueryRequest_StuckOnly_ReturnsOnlyOldNonTerminalRows() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteId = NewSiteId(); + await using var context = CreateContext(); + var repo = new SiteCallAuditRepository(context); + // 10-minute stuck threshold (the production default). + var actor = CreateActor(repo, new SiteCallAuditOptions { StuckAgeThreshold = TimeSpan.FromMinutes(10) }); + + var now = DateTime.UtcNow; + // Stuck: non-terminal (Attempted, TerminalAtUtc null), created 30 min ago. + var stuckId = TrackedOperationId.New(); + await repo.UpsertAsync(NewRow(stuckId, siteId, status: "Attempted", createdAtUtc: now.AddMinutes(-30))); + // Not stuck: non-terminal but recent. + await repo.UpsertAsync(NewRow(TrackedOperationId.New(), siteId, status: "Attempted", createdAtUtc: now.AddMinutes(-2))); + // Not stuck: old but terminal (Delivered, TerminalAtUtc set). + await repo.UpsertAsync(NewRow( + TrackedOperationId.New(), siteId, status: "Delivered", + createdAtUtc: now.AddMinutes(-40), terminal: true)); + + actor.Tell( + new SiteCallQueryRequest( + "corr-stuck", null, siteId, null, null, StuckOnly: true, + null, null, null, null, PageSize: 50), + TestActor); + + var response = ExpectMsg(TimeSpan.FromSeconds(10)); + Assert.True(response.Success); + Assert.Single(response.SiteCalls); + Assert.Equal(stuckId.Value, response.SiteCalls[0].TrackedOperationId); + Assert.True(response.SiteCalls[0].IsStuck); + } + + [SkippableFact] + public async Task SiteCallDetailRequest_KnownId_ReturnsFullDetail() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteId = NewSiteId(); + var id = TrackedOperationId.New(); + await using var context = CreateContext(); + var repo = new SiteCallAuditRepository(context); + var actor = CreateActor(repo); + + await repo.UpsertAsync(NewRow(id, siteId, status: "Attempted", retryCount: 2, lastError: "503")); + + actor.Tell(new SiteCallDetailRequest("corr-d1", id.Value), TestActor); + + var response = ExpectMsg(TimeSpan.FromSeconds(10)); + Assert.True(response.Success); + Assert.NotNull(response.Detail); + Assert.Equal(id.Value, response.Detail!.TrackedOperationId); + Assert.Equal("Attempted", response.Detail.Status); + Assert.Equal(2, response.Detail.RetryCount); + Assert.Equal("503", response.Detail.LastError); + Assert.Equal(siteId, response.Detail.SourceSite); + } + + [SkippableFact] + public async Task SiteCallDetailRequest_UnknownId_RepliesNotFound() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + await using var context = CreateContext(); + var repo = new SiteCallAuditRepository(context); + var actor = CreateActor(repo); + + actor.Tell(new SiteCallDetailRequest("corr-d2", Guid.NewGuid()), TestActor); + + var response = ExpectMsg(TimeSpan.FromSeconds(10)); + Assert.False(response.Success); + Assert.Null(response.Detail); + Assert.NotNull(response.ErrorMessage); + } + + [SkippableFact] + public async Task SiteCallKpiRequest_ComputesPointInTimeCounts() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteId = NewSiteId(); + await using var context = CreateContext(); + var repo = new SiteCallAuditRepository(context); + var actor = CreateActor(repo, new SiteCallAuditOptions + { + StuckAgeThreshold = TimeSpan.FromMinutes(10), + KpiInterval = TimeSpan.FromHours(1), + }); + + var now = DateTime.UtcNow; + // Buffered (non-terminal Attempted) + stuck (created 30 min ago). + await repo.UpsertAsync(NewRow(TrackedOperationId.New(), siteId, status: "Attempted", createdAtUtc: now.AddMinutes(-30))); + // Buffered (non-terminal Attempted), not stuck. + await repo.UpsertAsync(NewRow(TrackedOperationId.New(), siteId, status: "Attempted", createdAtUtc: now.AddMinutes(-2))); + // Parked (terminal). + await repo.UpsertAsync(NewRow( + TrackedOperationId.New(), siteId, status: "Parked", + createdAtUtc: now.AddMinutes(-5), terminal: true)); + // Delivered within the interval. + await repo.UpsertAsync(NewRow( + TrackedOperationId.New(), siteId, status: "Delivered", + createdAtUtc: now.AddMinutes(-3), updatedAtUtc: now.AddMinutes(-1), terminal: true)); + + actor.Tell(new SiteCallKpiRequest("corr-kpi"), TestActor); + + var response = ExpectMsg(TimeSpan.FromSeconds(10)); + Assert.True(response.Success); + // Per-site rows are isolated by the unique siteId — but KPIs are global, + // so assert the floor (>=) rather than exact counts: other tests' rows + // may share the table. + Assert.True(response.BufferedCount >= 2); + Assert.True(response.ParkedCount >= 1); + Assert.True(response.DeliveredLastInterval >= 1); + Assert.True(response.StuckCount >= 1); + Assert.NotNull(response.OldestPendingAge); + } + + [SkippableFact] + public async Task PerSiteSiteCallKpiRequest_ScopesCountsToEachSite() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteId = NewSiteId(); + await using var context = CreateContext(); + var repo = new SiteCallAuditRepository(context); + var actor = CreateActor(repo, new SiteCallAuditOptions + { + StuckAgeThreshold = TimeSpan.FromMinutes(10), + KpiInterval = TimeSpan.FromHours(1), + }); + + var now = DateTime.UtcNow; + // Non-terminal Attempted, created 30 min ago — buffered + stuck. + await repo.UpsertAsync(NewRow(TrackedOperationId.New(), siteId, status: "Attempted", createdAtUtc: now.AddMinutes(-30))); + // Terminal Parked. + await repo.UpsertAsync(NewRow( + TrackedOperationId.New(), siteId, status: "Parked", + createdAtUtc: now.AddMinutes(-5), terminal: true)); + + actor.Tell(new PerSiteSiteCallKpiRequest("corr-psk"), TestActor); + + var response = ExpectMsg(TimeSpan.FromSeconds(10)); + Assert.True(response.Success); + + var mySite = Assert.Single(response.Sites, s => s.SourceSite == siteId); + Assert.Equal(1, mySite.BufferedCount); + Assert.Equal(1, mySite.ParkedCount); + Assert.Equal(1, mySite.StuckCount); + Assert.NotNull(mySite.OldestPendingAge); + } + + [SkippableFact] + public async Task SiteCallQueryRequest_RepoThrows_RepliesFailure_ActorStaysAlive() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteId = NewSiteId(); + await using var context = CreateContext(); + var realRepo = new SiteCallAuditRepository(context); + var actor = CreateActor(new QueryThrowingRepository(realRepo)); + + actor.Tell( + new SiteCallQueryRequest( + "corr-fault", null, siteId, null, null, false, null, null, null, null, 50), + TestActor); + + var response = ExpectMsg(TimeSpan.FromSeconds(10)); + Assert.False(response.Success); + Assert.Empty(response.SiteCalls); + Assert.NotNull(response.ErrorMessage); + Assert.Equal("corr-fault", response.CorrelationId); + } + + /// + /// Test double whose always + /// throws — used to verify the query handler's failure projection produces a + /// Success=false response without crashing the actor. + /// + private sealed class QueryThrowingRepository : ISiteCallAuditRepository + { + private readonly ISiteCallAuditRepository _inner; + + public QueryThrowingRepository(ISiteCallAuditRepository inner) + { + _inner = inner; + } + + public Task UpsertAsync(SiteCall siteCall, CancellationToken ct = default) => + _inner.UpsertAsync(siteCall, ct); + + public Task GetAsync(TrackedOperationId id, CancellationToken ct = default) => + _inner.GetAsync(id, ct); + + public Task> QueryAsync( + SiteCallQueryFilter filter, SiteCallPaging paging, CancellationToken ct = default) => + throw new InvalidOperationException("simulated query failure"); + + public Task PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default) => + _inner.PurgeTerminalAsync(olderThanUtc, ct); + + public Task ComputeKpisAsync( + DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) => + _inner.ComputeKpisAsync(stuckCutoff, intervalSince, ct); + + public Task> ComputePerSiteKpisAsync( + DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) => + _inner.ComputePerSiteKpisAsync(stuckCutoff, intervalSince, ct); + } + /// /// Tiny test double that delegates to a real repository but throws on a /// specified . Used to verify the actor's @@ -217,5 +504,13 @@ public class SiteCallAuditActorTests : TestKit, IClassFixture PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default) => _inner.PurgeTerminalAsync(olderThanUtc, ct); + + public Task ComputeKpisAsync( + DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) => + _inner.ComputeKpisAsync(stuckCutoff, intervalSince, ct); + + public Task> ComputePerSiteKpisAsync( + DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) => + _inner.ComputePerSiteKpisAsync(stuckCutoff, intervalSince, ct); } } diff --git a/tests/ScadaLink.SiteCallAudit.Tests/SiteCallAuditOptionsTests.cs b/tests/ScadaLink.SiteCallAudit.Tests/SiteCallAuditOptionsTests.cs new file mode 100644 index 0000000..6778dc1 --- /dev/null +++ b/tests/ScadaLink.SiteCallAudit.Tests/SiteCallAuditOptionsTests.cs @@ -0,0 +1,15 @@ +namespace ScadaLink.SiteCallAudit.Tests; + +public class SiteCallAuditOptionsTests +{ + [Fact] + public void Defaults_AreExpectedValues() + { + var options = new SiteCallAuditOptions(); + + // Stuck threshold mirrors NotificationOutboxOptions.StuckAgeThreshold. + Assert.Equal(TimeSpan.FromMinutes(10), options.StuckAgeThreshold); + // KPI interval mirrors NotificationOutboxOptions.DeliveredKpiWindow. + Assert.Equal(TimeSpan.FromMinutes(1), options.KpiInterval); + } +}