feat(sitecallaudit): query, KPI and detail backend for the Site Calls page

This commit is contained in:
Joseph Doherty
2026-05-21 04:14:49 -04:00
parent 6f0d2ca499
commit e3519fdb39
17 changed files with 1514 additions and 18 deletions

View File

@@ -63,4 +63,27 @@ public interface ISiteCallAuditRepository
/// deleted.
/// </summary>
Task<int> PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default);
/// <summary>
/// Computes a point-in-time global <see cref="SiteCallKpiSnapshot"/> from the
/// <c>SiteCalls</c> table. Counts are aggregated server-side (no row
/// materialisation): <c>StuckCount</c> uses <paramref name="stuckCutoff"/>;
/// <c>FailedLastInterval</c> / <c>DeliveredLastInterval</c> use
/// <paramref name="intervalSince"/>; the current time for <c>OldestPendingAge</c>
/// is captured inside the method.
/// </summary>
Task<SiteCallKpiSnapshot> ComputeKpisAsync(
DateTime stuckCutoff,
DateTime intervalSince,
CancellationToken ct = default);
/// <summary>
/// Computes a point-in-time <see cref="SiteCallSiteKpiSnapshot"/> per source
/// site. Sites with no <c>SiteCalls</c> rows at all are omitted. The stuck
/// cutoff and interval bounds are interpreted as in <see cref="ComputeKpisAsync"/>.
/// </summary>
Task<IReadOnlyList<SiteCallSiteKpiSnapshot>> ComputePerSiteKpisAsync(
DateTime stuckCutoff,
DateTime intervalSince,
CancellationToken ct = default);
}

View File

@@ -0,0 +1,153 @@
using ScadaLink.Commons.Types.Audit;
namespace ScadaLink.Commons.Messages.Audit;
/// <summary>
/// Site Calls UI -> Central: paginated, filtered query over the central
/// <c>SiteCalls</c> table (Site Call Audit #22). All filter fields are optional;
/// <see cref="StuckOnly"/> restricts results to stuck cached calls. Mirrors
/// <see cref="ScadaLink.Commons.Messages.Notification.NotificationOutboxQueryRequest"/>
/// but uses keyset paging (<see cref="AfterCreatedAtUtc"/> + <see cref="AfterId"/>)
/// to match the repository's <c>(CreatedAtUtc DESC, TrackedOperationId DESC)</c>
/// cursor, rather than page numbers.
/// </summary>
/// <remarks>
/// <see cref="ChannelFilter"/> matches the <c>SiteCall.Channel</c> column —
/// <c>"ApiOutbound"</c> or <c>"DbOutbound"</c> (the spec's <c>Kind</c> notion;
/// the entity exposes it as <c>Channel</c>). <see cref="TargetKeyword"/> is an
/// exact-match target filter, consistent with the repository's
/// <see cref="SiteCallQueryFilter.Target"/> predicate.
/// </remarks>
public sealed record SiteCallQueryRequest(
string CorrelationId,
string? StatusFilter,
string? SourceSiteFilter,
string? ChannelFilter,
string? TargetKeyword,
bool StuckOnly,
DateTime? FromUtc,
DateTime? ToUtc,
DateTime? AfterCreatedAtUtc,
Guid? AfterId,
int PageSize);
/// <summary>
/// A single <c>SiteCalls</c> row summarised for the Site Calls UI grid. Carries
/// only the columns the <see cref="ScadaLink.Commons.Entities.Audit.SiteCall"/>
/// entity genuinely exposes — there are no source-instance/script provenance
/// columns on that entity, so unlike
/// <see cref="ScadaLink.Commons.Messages.Notification.NotificationSummary"/>
/// none are surfaced here.
/// </summary>
public sealed record SiteCallSummary(
Guid TrackedOperationId,
string SourceSite,
string Channel,
string Target,
string Status,
int RetryCount,
string? LastError,
int? HttpStatus,
DateTime CreatedAtUtc,
DateTime UpdatedAtUtc,
DateTime? TerminalAtUtc,
bool IsStuck);
/// <summary>
/// Central -> Site Calls UI: paginated response for a <see cref="SiteCallQueryRequest"/>.
/// The keyset cursor of the last row is echoed back as
/// <see cref="NextAfterCreatedAtUtc"/> + <see cref="NextAfterId"/> for the caller
/// to request the following page; both are <c>null</c> when the page was empty.
/// On a repository fault <see cref="Success"/> is <c>false</c>,
/// <see cref="ErrorMessage"/> carries the cause and <see cref="SiteCalls"/> is empty.
/// </summary>
public sealed record SiteCallQueryResponse(
string CorrelationId,
bool Success,
string? ErrorMessage,
IReadOnlyList<SiteCallSummary> SiteCalls,
DateTime? NextAfterCreatedAtUtc,
Guid? NextAfterId);
/// <summary>
/// Site Calls UI -> Central: request for the full detail of a single cached call,
/// for the report detail modal.
/// </summary>
public sealed record SiteCallDetailRequest(
string CorrelationId,
Guid TrackedOperationId);
/// <summary>
/// Central -> Site Calls UI: full detail for one cached call. On a repository
/// fault or missing row, <see cref="Success"/> is <c>false</c> /
/// <see cref="Detail"/> is <c>null</c> and <see cref="ErrorMessage"/> carries
/// the cause.
/// </summary>
public sealed record SiteCallDetailResponse(
string CorrelationId,
bool Success,
string? ErrorMessage,
SiteCallDetail? Detail);
/// <summary>
/// Full <c>SiteCalls</c> row detail for the report detail modal — every field
/// on the <see cref="ScadaLink.Commons.Entities.Audit.SiteCall"/> entity,
/// including <see cref="LastError"/> and the <see cref="IngestedAtUtc"/>
/// timestamp the grid summary omits.
/// </summary>
public sealed record SiteCallDetail(
Guid TrackedOperationId,
string SourceSite,
string Channel,
string Target,
string Status,
int RetryCount,
string? LastError,
int? HttpStatus,
DateTime CreatedAtUtc,
DateTime UpdatedAtUtc,
DateTime? TerminalAtUtc,
DateTime IngestedAtUtc);
/// <summary>
/// Site Calls UI -> Central: request for the global <c>SiteCalls</c> KPI summary.
/// Mirrors <see cref="ScadaLink.Commons.Messages.Notification.NotificationKpiRequest"/>.
/// </summary>
public sealed record SiteCallKpiRequest(
string CorrelationId);
/// <summary>
/// Central -> Site Calls UI: KPI summary for the Site Calls dashboard. On a
/// repository fault <see cref="Success"/> is <c>false</c>,
/// <see cref="ErrorMessage"/> carries the cause, and the KPI fields are
/// zeroed/<c>null</c>.
/// </summary>
public sealed record SiteCallKpiResponse(
string CorrelationId,
bool Success,
string? ErrorMessage,
int BufferedCount,
int ParkedCount,
int FailedLastInterval,
int DeliveredLastInterval,
TimeSpan? OldestPendingAge,
int StuckCount);
/// <summary>
/// Site Calls UI -> Central: request for the per-source-site <c>SiteCalls</c>
/// KPI breakdown. Mirrors
/// <see cref="ScadaLink.Commons.Messages.Notification.PerSiteNotificationKpiRequest"/>.
/// </summary>
public sealed record PerSiteSiteCallKpiRequest(
string CorrelationId);
/// <summary>
/// Central -> Site Calls UI: per-site KPI breakdown for the Site Calls KPIs
/// page. On a repository fault <see cref="Success"/> is <c>false</c>,
/// <see cref="ErrorMessage"/> carries the cause, and <see cref="Sites"/> is empty.
/// </summary>
public sealed record PerSiteSiteCallKpiResponse(
string CorrelationId,
bool Success,
string? ErrorMessage,
IReadOnlyList<SiteCallSiteKpiSnapshot> Sites);

View File

@@ -0,0 +1,38 @@
namespace ScadaLink.Commons.Types.Audit;
/// <summary>
/// Point-in-time operational metrics for the central <c>SiteCalls</c> table
/// (Site Call Audit #22), surfaced on the health dashboard. The cached-call
/// counterpart of <see cref="ScadaLink.Commons.Types.Notifications.NotificationKpiSnapshot"/>;
/// mirrors its shape so the Central UI Site Calls KPI tiles can reuse the
/// Notification Outbox tile layout.
/// </summary>
/// <param name="BufferedCount">
/// Count of non-terminal rows (<c>Pending</c> + <c>Retrying</c>) — calls
/// buffered at sites awaiting retry.
/// </param>
/// <param name="ParkedCount">Count of rows in the <c>Parked</c> status.</param>
/// <param name="FailedLastInterval">
/// Count of <c>Failed</c> rows whose <see cref="ScadaLink.Commons.Entities.Audit.SiteCall.TerminalAtUtc"/>
/// is at or after the supplied "since" timestamp.
/// </param>
/// <param name="DeliveredLastInterval">
/// Count of <c>Delivered</c> rows whose <see cref="ScadaLink.Commons.Entities.Audit.SiteCall.TerminalAtUtc"/>
/// is at or after the supplied "since" timestamp.
/// </param>
/// <param name="OldestPendingAge">
/// Age of the oldest non-terminal row (<c>now - min(CreatedAtUtc)</c>), or
/// <c>null</c> when there are no non-terminal rows.
/// </param>
/// <param name="StuckCount">
/// Count of non-terminal rows (<c>Pending</c>/<c>Retrying</c>) whose
/// <see cref="ScadaLink.Commons.Entities.Audit.SiteCall.CreatedAtUtc"/> is older
/// than the supplied stuck cutoff. Display-only — no escalation.
/// </param>
public sealed record SiteCallKpiSnapshot(
int BufferedCount,
int ParkedCount,
int FailedLastInterval,
int DeliveredLastInterval,
TimeSpan? OldestPendingAge,
int StuckCount);

View File

@@ -0,0 +1,34 @@
namespace ScadaLink.Commons.Types.Audit;
/// <summary>
/// Point-in-time <c>SiteCalls</c> metrics scoped to a single source site. The
/// per-site counterpart of <see cref="SiteCallKpiSnapshot"/>; surfaced in the
/// per-site breakdown table on the Site Calls KPIs page. Mirrors
/// <see cref="ScadaLink.Commons.Types.Notifications.SiteNotificationKpiSnapshot"/>.
/// </summary>
/// <param name="SourceSite">The site identifier these metrics are scoped to.</param>
/// <param name="BufferedCount">Count of this site's non-terminal rows (<c>Pending</c> + <c>Retrying</c>).</param>
/// <param name="ParkedCount">Count of this site's rows in the <c>Parked</c> status.</param>
/// <param name="FailedLastInterval">
/// Count of this site's <c>Failed</c> rows whose <c>TerminalAtUtc</c> is at or
/// after the "since" timestamp.
/// </param>
/// <param name="DeliveredLastInterval">
/// Count of this site's <c>Delivered</c> rows whose <c>TerminalAtUtc</c> is at
/// or after the "since" timestamp.
/// </param>
/// <param name="OldestPendingAge">
/// Age of this site's oldest non-terminal row, or <c>null</c> when it has none.
/// </param>
/// <param name="StuckCount">
/// Count of this site's non-terminal rows whose <c>CreatedAtUtc</c> is older
/// than the stuck cutoff.
/// </param>
public sealed record SiteCallSiteKpiSnapshot(
string SourceSite,
int BufferedCount,
int ParkedCount,
int FailedLastInterval,
int DeliveredLastInterval,
TimeSpan? OldestPendingAge,
int StuckCount);

View File

@@ -2,6 +2,7 @@ using Akka.Actor;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ScadaLink.Commons.Messages.Artifacts;
using ScadaLink.Commons.Messages.Audit;
using ScadaLink.Commons.Messages.DebugView;
using ScadaLink.Commons.Messages.Deployment;
using ScadaLink.Commons.Messages.Health;
@@ -25,6 +26,7 @@ public class CommunicationService
private readonly ILogger<CommunicationService> _logger;
private IActorRef? _centralCommunicationActor;
private IActorRef? _notificationOutboxProxy;
private IActorRef? _siteCallAuditProxy;
public CommunicationService(
IOptions<CommunicationOptions> options,
@@ -52,6 +54,17 @@ public class CommunicationService
_notificationOutboxProxy = notificationOutboxProxy;
}
/// <summary>
/// Sets the Site Call Audit (#22) singleton proxy reference. Called during
/// actor system startup. The Site Call Audit actor is central-local, so Site
/// Calls read calls Ask this proxy directly (no SiteEnvelope routing), the
/// same pattern as <see cref="SetNotificationOutbox"/>.
/// </summary>
public void SetSiteCallAudit(IActorRef siteCallAuditProxy)
{
_siteCallAuditProxy = siteCallAuditProxy;
}
/// <summary>
/// Triggers an immediate refresh of the site address cache from the database.
/// </summary>
@@ -80,6 +93,15 @@ public class CommunicationService
?? throw new InvalidOperationException("CommunicationService not initialized. NotificationOutbox proxy not set.");
}
/// <summary>
/// Gets the Site Call Audit proxy reference. Throws if not yet initialized.
/// </summary>
private IActorRef GetSiteCallAudit()
{
return _siteCallAuditProxy
?? throw new InvalidOperationException("CommunicationService not initialized. SiteCallAudit proxy not set.");
}
// ── Pattern 1: Instance Deployment ──
public async Task<DeploymentStatusResponse> DeployInstanceAsync(
@@ -295,6 +317,36 @@ public class CommunicationService
return await GetNotificationOutbox().Ask<PerSiteNotificationKpiResponse>(
request, _options.QueryTimeout, cancellationToken);
}
// ── Site Call Audit (central-local actor — Asked directly, no SiteEnvelope) ──
public async Task<SiteCallQueryResponse> QuerySiteCallsAsync(
SiteCallQueryRequest request, CancellationToken cancellationToken = default)
{
return await GetSiteCallAudit().Ask<SiteCallQueryResponse>(
request, _options.QueryTimeout, cancellationToken);
}
public async Task<SiteCallDetailResponse> GetSiteCallDetailAsync(
SiteCallDetailRequest request, CancellationToken cancellationToken = default)
{
return await GetSiteCallAudit().Ask<SiteCallDetailResponse>(
request, _options.QueryTimeout, cancellationToken);
}
public async Task<SiteCallKpiResponse> GetSiteCallKpisAsync(
SiteCallKpiRequest request, CancellationToken cancellationToken = default)
{
return await GetSiteCallAudit().Ask<SiteCallKpiResponse>(
request, _options.QueryTimeout, cancellationToken);
}
public async Task<PerSiteSiteCallKpiResponse> GetPerSiteSiteCallKpisAsync(
PerSiteSiteCallKpiRequest request, CancellationToken cancellationToken = default)
{
return await GetSiteCallAudit().Ask<PerSiteSiteCallKpiResponse>(
request, _options.QueryTimeout, cancellationToken);
}
}
/// <summary>

View File

@@ -201,6 +201,141 @@ ORDER BY CreatedAtUtc DESC, TrackedOperationId DESC;";
ct);
}
// Terminal status string literals for the interval-throughput KPIs. The
// Status column is a plain varchar (no value converter), so these compare
// directly in translated SQL.
//
// NOTE on the "buffered/non-terminal" definition: the SiteCalls operational
// mirror stores AuditStatus-derived strings (Attempted/Delivered/Parked/
// Failed/...), NOT the tracking-lifecycle Pending/Retrying names the spec's
// KPI section uses. There is therefore no Status string that means
// "buffered". The schema-honest predicate for "non-terminal / buffered" is
// TerminalAtUtc IS NULL — consistent with PurgeTerminalAsync's terminal
// predicate and with the SiteCall entity's own contract ("TerminalAtUtc ...
// null while still active"). All buffered / stuck / oldest-pending counts
// below key off TerminalAtUtc, not Status.
private const string StatusParked = "Parked";
private const string StatusDelivered = "Delivered";
private const string StatusFailed = "Failed";
/// <summary>
/// Computes the global KPI snapshot with five server-side aggregate queries
/// against <c>dbo.SiteCalls</c>. No rows are materialised — every count is a
/// translated <c>COUNT</c> and the oldest-pending age is a translated
/// <c>MIN(CreatedAtUtc)</c>. The <c>Status</c> and <c>CreatedAtUtc</c>/<c>TerminalAtUtc</c>
/// columns have no value converter, so the aggregates translate cleanly to
/// SQL Server (unlike the NotificationOutbox's <c>DateTimeOffset</c>-converted
/// column, which forces an order-and-take). "Buffered" / "stuck" key off
/// <c>TerminalAtUtc IS NULL</c> — see the field comments above.
/// </summary>
public async Task<SiteCallKpiSnapshot> ComputeKpisAsync(
DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default)
{
var now = DateTime.UtcNow;
var bufferedCount = await _context.SiteCalls
.CountAsync(s => s.TerminalAtUtc == null, ct);
var parkedCount = await _context.SiteCalls
.CountAsync(s => s.Status == StatusParked, ct);
var failedLastInterval = await _context.SiteCalls
.CountAsync(s => s.Status == StatusFailed
&& s.TerminalAtUtc != null
&& s.TerminalAtUtc >= intervalSince, ct);
var deliveredLastInterval = await _context.SiteCalls
.CountAsync(s => s.Status == StatusDelivered
&& s.TerminalAtUtc != null
&& s.TerminalAtUtc >= intervalSince, ct);
var stuckCount = await _context.SiteCalls
.CountAsync(s => s.TerminalAtUtc == null && s.CreatedAtUtc < stuckCutoff, ct);
var nonTerminal = _context.SiteCalls.Where(s => s.TerminalAtUtc == null);
TimeSpan? oldestPendingAge = null;
if (await nonTerminal.AnyAsync(ct))
{
var oldestCreatedAt = await nonTerminal.MinAsync(s => s.CreatedAtUtc, ct);
oldestPendingAge = now - oldestCreatedAt;
}
return new SiteCallKpiSnapshot(
BufferedCount: bufferedCount,
ParkedCount: parkedCount,
FailedLastInterval: failedLastInterval,
DeliveredLastInterval: deliveredLastInterval,
OldestPendingAge: oldestPendingAge,
StuckCount: stuckCount);
}
/// <summary>
/// Computes the per-source-site KPI breakdown. The five counts are
/// <c>GROUP BY SourceSite</c> aggregates; the oldest-pending age is a
/// per-site <c>MIN(CreatedAtUtc)</c> over the (bounded) non-terminal set —
/// all run server-side. A site appears in the result only if it has at
/// least one row matched by one of the count queries. "Buffered" / "stuck"
/// key off <c>TerminalAtUtc IS NULL</c> — see <see cref="ComputeKpisAsync"/>.
/// </summary>
public async Task<IReadOnlyList<SiteCallSiteKpiSnapshot>> ComputePerSiteKpisAsync(
DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default)
{
var now = DateTime.UtcNow;
var buffered = await CountBySiteAsync(s => s.TerminalAtUtc == null, ct);
var parked = await CountBySiteAsync(s => s.Status == StatusParked, ct);
var failed = await CountBySiteAsync(
s => s.Status == StatusFailed
&& s.TerminalAtUtc != null && s.TerminalAtUtc >= intervalSince, ct);
var delivered = await CountBySiteAsync(
s => s.Status == StatusDelivered
&& s.TerminalAtUtc != null && s.TerminalAtUtc >= intervalSince, ct);
var stuck = await CountBySiteAsync(
s => s.TerminalAtUtc == null && s.CreatedAtUtc < stuckCutoff, ct);
// Oldest non-terminal CreatedAtUtc per site — a server-side GROUP BY MIN.
var oldest = (await _context.SiteCalls
.Where(s => s.TerminalAtUtc == null)
.GroupBy(s => s.SourceSite)
.Select(g => new { Site = g.Key, Oldest = g.Min(s => s.CreatedAtUtc) })
.ToListAsync(ct))
.ToDictionary(x => x.Site, x => x.Oldest);
var siteIds = buffered.Keys
.Concat(parked.Keys).Concat(failed.Keys)
.Concat(delivered.Keys).Concat(stuck.Keys)
.Distinct()
.OrderBy(s => s, StringComparer.Ordinal);
return siteIds.Select(site => new SiteCallSiteKpiSnapshot(
SourceSite: site,
BufferedCount: buffered.GetValueOrDefault(site),
ParkedCount: parked.GetValueOrDefault(site),
FailedLastInterval: failed.GetValueOrDefault(site),
DeliveredLastInterval: delivered.GetValueOrDefault(site),
OldestPendingAge: oldest.TryGetValue(site, out var createdAt)
? now - createdAt
: null,
StuckCount: stuck.GetValueOrDefault(site))).ToList();
}
/// <summary>Counts <c>SiteCalls</c> rows matching <paramref name="predicate"/>, grouped by source site.</summary>
private async Task<Dictionary<string, int>> CountBySiteAsync(
System.Linq.Expressions.Expression<Func<SiteCall, bool>> predicate,
CancellationToken ct)
{
return await _context.SiteCalls
.Where(predicate)
.GroupBy(s => s.SourceSite)
.Select(g => new { Site = g.Key, Count = g.Count() })
.ToDictionaryAsync(x => x.Site, x => x.Count, ct);
}
private static int GetRankOrThrow(string status)
{
if (!StatusRank.TryGetValue(status, out var rank))

View File

@@ -423,10 +423,13 @@ akka {{
// is a scoped EF Core service.
var siteCallAuditLogger = _serviceProvider.GetRequiredService<ILoggerFactory>()
.CreateLogger<ScadaLink.SiteCallAudit.SiteCallAuditActor>();
var siteCallAuditOptions = _serviceProvider
.GetRequiredService<IOptions<ScadaLink.SiteCallAudit.SiteCallAuditOptions>>().Value;
var siteCallAuditSingletonProps = ClusterSingletonManager.Props(
singletonProps: Props.Create(() => new ScadaLink.SiteCallAudit.SiteCallAuditActor(
_serviceProvider,
siteCallAuditOptions,
siteCallAuditLogger)),
terminationMessage: PoisonPill.Instance,
settings: ClusterSingletonManagerSettings.Create(_actorSystem!)
@@ -437,7 +440,12 @@ akka {{
singletonManagerPath: "/user/site-call-audit-singleton",
settings: ClusterSingletonProxySettings.Create(_actorSystem)
.WithSingletonName("site-call-audit"));
_actorSystem.ActorOf(siteCallAuditProxyProps, "site-call-audit-proxy");
var siteCallAuditProxy = _actorSystem.ActorOf(siteCallAuditProxyProps, "site-call-audit-proxy");
// Hand the proxy to the CommunicationService so the Central UI can Ask
// the Site Call Audit actor directly (query, KPIs, detail) — mirrors the
// SetNotificationOutbox wiring above.
commService?.SetSiteCallAudit(siteCallAuditProxy);
_logger.LogInformation("SiteCallAuditActor singleton created");
_logger.LogInformation("Central actors registered. CentralCommunicationActor created.");

View File

@@ -13,6 +13,8 @@
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
<PackageReference Include="Microsoft.Extensions.Options" />
<!-- BindConfiguration extension for the SiteCallAuditOptions binding. -->
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" />
</ItemGroup>
<ItemGroup>

View File

@@ -7,33 +7,34 @@ namespace ScadaLink.SiteCallAudit;
/// </summary>
/// <remarks>
/// <para>
/// M3 Bundle C ships the ingest-only minimum surface (the actor itself); the
/// full DI surface — reconciliation puller, KPI projector, central→site
/// Retry/Discard relay, options + validators — is deferred to a follow-up.
/// Binds <see cref="SiteCallAuditOptions"/> (stuck-call detection + KPI
/// windowing for the read-side query/KPI handlers). The reconciliation puller
/// and central→site Retry/Discard relay are still deferred to later follow-ups.
/// </para>
/// <para>
/// The repository (<c>ISiteCallAuditRepository</c>) is registered by
/// <c>ScadaLink.ConfigurationDatabase.ServiceCollectionExtensions.AddConfigurationDatabase</c>,
/// so callers (the Host on the central node) must also call that. The actor's
/// <c>Props</c> are wired up in Host registration (Bundle F); this extension
/// is currently a no-op placeholder kept for symmetry with the AuditLog and
/// NotificationOutbox composition roots — adding it now means consumers can
/// reference the method without re-touching the Host project later.
/// <c>Props</c> are wired up in Host registration.
/// </para>
/// </remarks>
public static class ServiceCollectionExtensions
{
/// <summary>Configuration section bound to <see cref="SiteCallAuditOptions"/>.</summary>
public const string OptionsSection = "ScadaLink:SiteCallAudit";
/// <summary>
/// Registers Site Call Audit (#22) services. Currently a no-op
/// placeholder — Bundle F will populate this with the actor's Props
/// factory + options bindings. The method is exposed now so the Host
/// wiring call already exists at the API boundary.
/// Registers Site Call Audit (#22) services: the <see cref="SiteCallAuditOptions"/>
/// binding consumed by the actor's read-side KPI/query handlers. The actor's
/// <c>Props</c> are still constructed inline in Host wiring.
/// </summary>
public static IServiceCollection AddSiteCallAudit(this IServiceCollection services)
{
ArgumentNullException.ThrowIfNull(services);
// Actor props are constructed in Host wiring (Bundle F). This
// extension is a placeholder for future config + DI.
services.AddOptions<SiteCallAuditOptions>()
.BindConfiguration(OptionsSection);
return services;
}
}

View File

@@ -1,8 +1,11 @@
using Akka.Actor;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Messages.Audit;
using ScadaLink.Commons.Types;
using ScadaLink.Commons.Types.Audit;
namespace ScadaLink.SiteCallAudit;
@@ -42,26 +45,34 @@ namespace ScadaLink.SiteCallAudit;
/// </remarks>
public class SiteCallAuditActor : ReceiveActor
{
/// <summary>Maximum page size honoured by a <see cref="SiteCallQueryRequest"/>.</summary>
private const int MaxPageSize = 200;
private readonly IServiceProvider? _serviceProvider;
private readonly ISiteCallAuditRepository? _injectedRepository;
private readonly SiteCallAuditOptions _options;
private readonly ILogger<SiteCallAuditActor> _logger;
/// <summary>
/// Test-mode constructor — injects a concrete repository instance whose
/// lifetime exceeds the test, so the actor reuses the same instance
/// across every message. Used by Bundle C's MSSQL-backed TestKit fixture.
/// An optional <paramref name="options"/> lets a test pin the stuck/KPI
/// windows; when omitted the production defaults apply.
/// </summary>
public SiteCallAuditActor(
ISiteCallAuditRepository repository,
ILogger<SiteCallAuditActor> logger)
ILogger<SiteCallAuditActor> logger,
SiteCallAuditOptions? options = null)
{
ArgumentNullException.ThrowIfNull(repository);
ArgumentNullException.ThrowIfNull(logger);
_injectedRepository = repository;
_logger = logger;
_options = options ?? new SiteCallAuditOptions();
ReceiveAsync<UpsertSiteCallCommand>(OnUpsertAsync);
RegisterHandlers();
}
/// <summary>
@@ -73,15 +84,33 @@ public class SiteCallAuditActor : ReceiveActor
/// </summary>
public SiteCallAuditActor(
IServiceProvider serviceProvider,
SiteCallAuditOptions options,
ILogger<SiteCallAuditActor> logger)
{
ArgumentNullException.ThrowIfNull(serviceProvider);
ArgumentNullException.ThrowIfNull(options);
ArgumentNullException.ThrowIfNull(logger);
_serviceProvider = serviceProvider;
_options = options;
_logger = logger;
RegisterHandlers();
}
/// <summary>
/// Wires up the message handlers shared by both constructors: the M3
/// ingest path plus the Task 4 read-side (query, detail, global + per-site
/// KPI). All read handlers reply to an Ask, so they capture <c>Sender</c>
/// before the first await and <c>PipeTo</c> the result back.
/// </summary>
private void RegisterHandlers()
{
ReceiveAsync<UpsertSiteCallCommand>(OnUpsertAsync);
Receive<SiteCallQueryRequest>(HandleQuery);
Receive<SiteCallDetailRequest>(HandleDetail);
Receive<SiteCallKpiRequest>(HandleKpi);
Receive<PerSiteSiteCallKpiRequest>(HandlePerSiteKpi);
}
/// <summary>
@@ -137,4 +166,305 @@ public class SiteCallAuditActor : ReceiveActor
scope?.Dispose();
}
}
// ── Task 4: read-side (query / detail / KPI) ──
/// <summary>
/// Handles a paginated, filtered query over the <c>SiteCalls</c> table.
/// Builds a <see cref="SiteCallQueryFilter"/> + <see cref="SiteCallPaging"/>
/// keyset cursor from the request, runs the query on a scoped repository,
/// and pipes the mapped response back to the captured sender. A repository
/// fault yields a failure response with an empty list.
/// </summary>
private void HandleQuery(SiteCallQueryRequest request)
{
var sender = Sender;
var now = DateTime.UtcNow;
QueryAsync(request, now).PipeTo(
sender,
success: response => response,
failure: ex => new SiteCallQueryResponse(
request.CorrelationId,
Success: false,
ErrorMessage: ex.GetBaseException().Message,
SiteCalls: Array.Empty<SiteCallSummary>(),
NextAfterCreatedAtUtc: null,
NextAfterId: null));
}
private async Task<SiteCallQueryResponse> QueryAsync(SiteCallQueryRequest request, DateTime now)
{
var filter = new SiteCallQueryFilter(
Channel: NullIfBlank(request.ChannelFilter),
SourceSite: NullIfBlank(request.SourceSiteFilter),
Status: NullIfBlank(request.StatusFilter),
Target: NullIfBlank(request.TargetKeyword),
FromUtc: request.FromUtc,
ToUtc: request.ToUtc);
var pageSize = Math.Clamp(request.PageSize, 1, MaxPageSize);
var paging = new SiteCallPaging(
PageSize: pageSize,
AfterCreatedAtUtc: request.AfterCreatedAtUtc,
AfterId: request.AfterId is { } id ? new TrackedOperationId(id) : null);
var (scope, repository) = ResolveRepository();
try
{
var rows = await repository.QueryAsync(filter, paging).ConfigureAwait(false);
var stuckCutoff = now - _options.StuckAgeThreshold;
var summaries = rows
// StuckOnly is post-filtered here rather than pushed into the
// repository SQL — the SiteCallQueryFilter has no stuck predicate
// and a status-aware created-before clause does not compose with
// the keyset cursor. The page may therefore return fewer than
// PageSize rows when StuckOnly is set; that is acceptable for a
// display-only filter.
.Where(row => !request.StuckOnly || IsStuck(row, stuckCutoff))
.Select(row => ToSummary(row, stuckCutoff))
.ToList();
// The next-page cursor is the LAST row of the materialised page —
// before StuckOnly post-filtering, so paging still advances even
// when every row on a page was filtered out.
var cursorRow = rows.Count > 0 ? rows[^1] : null;
return new SiteCallQueryResponse(
request.CorrelationId,
Success: true,
ErrorMessage: null,
SiteCalls: summaries,
NextAfterCreatedAtUtc: cursorRow?.CreatedAtUtc,
NextAfterId: cursorRow?.TrackedOperationId.Value);
}
finally
{
scope?.Dispose();
}
}
/// <summary>
/// Handles a full-detail query for a single cached call — backs the report
/// detail modal. A missing row yields <c>Success=false</c> with a "not
/// found" message; a repository fault yields <c>Success=false</c> with the
/// fault message.
/// </summary>
private void HandleDetail(SiteCallDetailRequest request)
{
var sender = Sender;
DetailAsync(request).PipeTo(
sender,
success: response => response,
failure: ex => new SiteCallDetailResponse(
request.CorrelationId,
Success: false,
ErrorMessage: ex.GetBaseException().Message,
Detail: null));
}
private async Task<SiteCallDetailResponse> DetailAsync(SiteCallDetailRequest request)
{
var (scope, repository) = ResolveRepository();
try
{
var row = await repository
.GetAsync(new TrackedOperationId(request.TrackedOperationId))
.ConfigureAwait(false);
if (row is null)
{
return new SiteCallDetailResponse(
request.CorrelationId,
Success: false,
ErrorMessage: "site call not found",
Detail: null);
}
return new SiteCallDetailResponse(
request.CorrelationId,
Success: true,
ErrorMessage: null,
Detail: ToDetail(row));
}
finally
{
scope?.Dispose();
}
}
/// <summary>
/// Handles a global KPI snapshot request, deriving the stuck cutoff from
/// <see cref="SiteCallAuditOptions.StuckAgeThreshold"/> and the
/// failed/delivered interval bound from <see cref="SiteCallAuditOptions.KpiInterval"/>.
/// </summary>
private void HandleKpi(SiteCallKpiRequest request)
{
var sender = Sender;
var now = DateTime.UtcNow;
var stuckCutoff = now - _options.StuckAgeThreshold;
var intervalSince = now - _options.KpiInterval;
KpiAsync(request.CorrelationId, stuckCutoff, intervalSince).PipeTo(
sender,
success: response => response,
failure: ex => new SiteCallKpiResponse(
request.CorrelationId,
Success: false,
ErrorMessage: ex.GetBaseException().Message,
BufferedCount: 0,
ParkedCount: 0,
FailedLastInterval: 0,
DeliveredLastInterval: 0,
OldestPendingAge: null,
StuckCount: 0));
}
private async Task<SiteCallKpiResponse> KpiAsync(
string correlationId, DateTime stuckCutoff, DateTime intervalSince)
{
var (scope, repository) = ResolveRepository();
try
{
var snapshot = await repository
.ComputeKpisAsync(stuckCutoff, intervalSince)
.ConfigureAwait(false);
return new SiteCallKpiResponse(
correlationId,
Success: true,
ErrorMessage: null,
snapshot.BufferedCount,
snapshot.ParkedCount,
snapshot.FailedLastInterval,
snapshot.DeliveredLastInterval,
snapshot.OldestPendingAge,
snapshot.StuckCount);
}
finally
{
scope?.Dispose();
}
}
/// <summary>
/// Handles a per-source-site KPI request, using the same stuck cutoff and
/// interval bound as <see cref="HandleKpi"/>.
/// </summary>
private void HandlePerSiteKpi(PerSiteSiteCallKpiRequest request)
{
var sender = Sender;
var now = DateTime.UtcNow;
var stuckCutoff = now - _options.StuckAgeThreshold;
var intervalSince = now - _options.KpiInterval;
PerSiteKpiAsync(request.CorrelationId, stuckCutoff, intervalSince).PipeTo(
sender,
success: response => response,
failure: ex => new PerSiteSiteCallKpiResponse(
request.CorrelationId,
Success: false,
ErrorMessage: ex.GetBaseException().Message,
Sites: Array.Empty<SiteCallSiteKpiSnapshot>()));
}
private async Task<PerSiteSiteCallKpiResponse> PerSiteKpiAsync(
string correlationId, DateTime stuckCutoff, DateTime intervalSince)
{
var (scope, repository) = ResolveRepository();
try
{
var sites = await repository
.ComputePerSiteKpisAsync(stuckCutoff, intervalSince)
.ConfigureAwait(false);
return new PerSiteSiteCallKpiResponse(
correlationId, Success: true, ErrorMessage: null, sites);
}
finally
{
scope?.Dispose();
}
}
/// <summary>
/// Resolves an <see cref="ISiteCallAuditRepository"/> for one read message.
/// In test mode the injected instance is returned with a null scope; in
/// production a fresh DI scope is created and returned so the caller can
/// dispose it once the read completes — the same scope-per-message pattern
/// as <see cref="OnUpsertAsync"/>.
/// </summary>
private (IServiceScope? Scope, ISiteCallAuditRepository Repository) ResolveRepository()
{
if (_injectedRepository is not null)
{
return (null, _injectedRepository);
}
var scope = _serviceProvider!.CreateScope();
return (scope, scope.ServiceProvider.GetRequiredService<ISiteCallAuditRepository>());
}
/// <summary>
/// A cached call counts as stuck when it is still non-terminal and was
/// created before <paramref name="stuckCutoff"/>. Non-terminal is keyed off
/// <see cref="SiteCall.TerminalAtUtc"/> being <c>null</c> — the
/// <c>SiteCalls</c> operational mirror stores <c>AuditStatus</c>-derived
/// status strings (<c>Attempted</c>/<c>Delivered</c>/<c>Parked</c>/...), not
/// the tracking-lifecycle <c>Pending</c>/<c>Retrying</c> names the spec's
/// KPI section uses, so there is no status string that means "buffered".
/// <c>TerminalAtUtc</c> is the entity's own active/terminal discriminator
/// and is consistent with the repository KPI counts and
/// <c>PurgeTerminalAsync</c>.
/// </summary>
private static bool IsStuck(SiteCall row, DateTime stuckCutoff)
{
return row.TerminalAtUtc is null && row.CreatedAtUtc < stuckCutoff;
}
private static SiteCallSummary ToSummary(SiteCall row, DateTime stuckCutoff)
{
return new SiteCallSummary(
TrackedOperationId: row.TrackedOperationId.Value,
SourceSite: row.SourceSite,
Channel: row.Channel,
Target: row.Target,
Status: row.Status,
RetryCount: row.RetryCount,
LastError: row.LastError,
HttpStatus: row.HttpStatus,
CreatedAtUtc: row.CreatedAtUtc,
UpdatedAtUtc: row.UpdatedAtUtc,
TerminalAtUtc: row.TerminalAtUtc,
IsStuck: IsStuck(row, stuckCutoff));
}
private static SiteCallDetail ToDetail(SiteCall row)
{
return new SiteCallDetail(
TrackedOperationId: row.TrackedOperationId.Value,
SourceSite: row.SourceSite,
Channel: row.Channel,
Target: row.Target,
Status: row.Status,
RetryCount: row.RetryCount,
LastError: row.LastError,
HttpStatus: row.HttpStatus,
CreatedAtUtc: row.CreatedAtUtc,
UpdatedAtUtc: row.UpdatedAtUtc,
TerminalAtUtc: row.TerminalAtUtc,
IngestedAtUtc: row.IngestedAtUtc);
}
/// <summary>
/// Treats an empty/whitespace filter string as "no constraint" — the
/// repository's <see cref="SiteCallQueryFilter"/> interprets <c>null</c> as
/// a no-op predicate, so a blank UI filter must collapse to <c>null</c>.
/// </summary>
private static string? NullIfBlank(string? value)
{
return string.IsNullOrWhiteSpace(value) ? null : value;
}
}

View File

@@ -0,0 +1,26 @@
namespace ScadaLink.SiteCallAudit;
/// <summary>
/// Configuration options for the Site Call Audit (#22) read-side: stuck-call
/// detection and KPI windowing. Mirrors the KPI-relevant subset of
/// <c>NotificationOutboxOptions</c> — the reconciliation, purge and dispatch
/// cadence options the Notification Outbox carries are not part of the Site
/// Call Audit read-side backend and are deliberately omitted here.
/// </summary>
public class SiteCallAuditOptions
{
/// <summary>
/// Age past which a non-terminal cached call (<c>Pending</c>/<c>Retrying</c>)
/// is considered stuck. Display-only — surfaced as the Stuck KPI and a row
/// badge, with no escalation. Default 10 minutes, matching
/// <c>NotificationOutboxOptions.StuckAgeThreshold</c>.
/// </summary>
public TimeSpan StuckAgeThreshold { get; set; } = TimeSpan.FromMinutes(10);
/// <summary>
/// Trailing window used to compute the delivered- and failed-last-interval
/// throughput KPIs. Default 1 minute, matching
/// <c>NotificationOutboxOptions.DeliveredKpiWindow</c>.
/// </summary>
public TimeSpan KpiInterval { get; set; } = TimeSpan.FromMinutes(1);
}

View File

@@ -356,6 +356,12 @@ public class AuditLogIngestActorCombinedTelemetryTests : TestKit, IClassFixture<
_inner.QueryAsync(filter, paging, ct);
public Task<int> PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default) =>
_inner.PurgeTerminalAsync(olderThanUtc, ct);
public Task<SiteCallKpiSnapshot> ComputeKpisAsync(
DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) =>
_inner.ComputeKpisAsync(stuckCutoff, intervalSince, ct);
public Task<IReadOnlyList<SiteCallSiteKpiSnapshot>> ComputePerSiteKpisAsync(
DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) =>
_inner.ComputePerSiteKpisAsync(stuckCutoff, intervalSince, ct);
}
/// <summary>
@@ -387,5 +393,11 @@ public class AuditLogIngestActorCombinedTelemetryTests : TestKit, IClassFixture<
_inner.QueryAsync(filter, paging, ct);
public Task<int> PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default) =>
_inner.PurgeTerminalAsync(olderThanUtc, ct);
public Task<SiteCallKpiSnapshot> ComputeKpisAsync(
DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) =>
_inner.ComputeKpisAsync(stuckCutoff, intervalSince, ct);
public Task<IReadOnlyList<SiteCallSiteKpiSnapshot>> ComputePerSiteKpisAsync(
DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) =>
_inner.ComputePerSiteKpisAsync(stuckCutoff, intervalSince, ct);
}
}

View File

@@ -0,0 +1,128 @@
using ScadaLink.Commons.Messages.Audit;
using ScadaLink.Commons.Types.Audit;
namespace ScadaLink.Commons.Tests.Messages;
/// <summary>
/// Site Call Audit (#22): construction, value-equality and optionality tests
/// for the Site Calls UI query / KPI / detail message contracts. Mirrors the
/// Notification Outbox <c>NotificationMessagesTests</c> coverage of the read
/// side, scoped to the contracts the Site Calls page consumes.
/// </summary>
public class SiteCallQueriesTests
{
[Fact]
public void SiteCallQueryRequest_PositionalConstruction_SetsAllFields()
{
var afterCreated = DateTime.UtcNow;
var afterId = Guid.NewGuid();
var request = new SiteCallQueryRequest(
"corr-1", "Parked", "plant-a", "ApiOutbound", "ERP.GetOrder", true,
new DateTime(2026, 5, 1), new DateTime(2026, 5, 20), afterCreated, afterId, 50);
Assert.Equal("corr-1", request.CorrelationId);
Assert.Equal("Parked", request.StatusFilter);
Assert.Equal("plant-a", request.SourceSiteFilter);
Assert.Equal("ApiOutbound", request.ChannelFilter);
Assert.Equal("ERP.GetOrder", request.TargetKeyword);
Assert.True(request.StuckOnly);
Assert.Equal(new DateTime(2026, 5, 1), request.FromUtc);
Assert.Equal(new DateTime(2026, 5, 20), request.ToUtc);
Assert.Equal(afterCreated, request.AfterCreatedAtUtc);
Assert.Equal(afterId, request.AfterId);
Assert.Equal(50, request.PageSize);
}
[Fact]
public void SiteCallQueryRequest_AllowsNullOptionalFilters()
{
var request = new SiteCallQueryRequest(
"corr-2", null, null, null, null, false, null, null, null, null, 25);
Assert.Null(request.StatusFilter);
Assert.Null(request.SourceSiteFilter);
Assert.Null(request.ChannelFilter);
Assert.Null(request.TargetKeyword);
Assert.False(request.StuckOnly);
Assert.Null(request.FromUtc);
Assert.Null(request.AfterId);
}
[Fact]
public void SiteCallQueryResponse_ValueEquality_EqualWhenAllFieldsMatch()
{
var a = new SiteCallQueryResponse("c", true, null, Array.Empty<SiteCallSummary>(), null, null);
var b = new SiteCallQueryResponse("c", true, null, Array.Empty<SiteCallSummary>(), null, null);
Assert.Equal(a, b);
Assert.Equal(a.GetHashCode(), b.GetHashCode());
}
[Fact]
public void SiteCallSummary_CarriesEntityColumnsAndStuckFlag()
{
var id = Guid.NewGuid();
var created = DateTime.UtcNow.AddMinutes(-30);
var summary = new SiteCallSummary(
id, "plant-a", "DbOutbound", "InventoryDb", "Retrying", 3,
"transient 503", 503, created, created.AddMinutes(1), null, IsStuck: true);
Assert.Equal(id, summary.TrackedOperationId);
Assert.Equal("DbOutbound", summary.Channel);
Assert.Equal("InventoryDb", summary.Target);
Assert.Equal("Retrying", summary.Status);
Assert.Equal(3, summary.RetryCount);
Assert.Equal(503, summary.HttpStatus);
Assert.Null(summary.TerminalAtUtc);
Assert.True(summary.IsStuck);
}
[Fact]
public void SiteCallDetailResponse_MissingRow_HasNullDetail()
{
var response = new SiteCallDetailResponse("c", false, "site call not found", null);
Assert.False(response.Success);
Assert.Null(response.Detail);
Assert.Equal("site call not found", response.ErrorMessage);
}
[Fact]
public void SiteCallKpiResponse_FailureShape_ZeroesKpiFields()
{
var response = new SiteCallKpiResponse(
"c", Success: false, ErrorMessage: "db down",
BufferedCount: 0, ParkedCount: 0, FailedLastInterval: 0,
DeliveredLastInterval: 0, OldestPendingAge: null, StuckCount: 0);
Assert.False(response.Success);
Assert.Equal("db down", response.ErrorMessage);
Assert.Equal(0, response.BufferedCount);
Assert.Null(response.OldestPendingAge);
}
[Fact]
public void PerSiteSiteCallKpiResponse_CarriesPerSiteSnapshots()
{
var response = new PerSiteSiteCallKpiResponse(
"c", true, null,
new[]
{
new SiteCallSiteKpiSnapshot("plant-a", 4, 1, 0, 9, TimeSpan.FromMinutes(15), 2),
});
Assert.True(response.Success);
var site = Assert.Single(response.Sites);
Assert.Equal("plant-a", site.SourceSite);
Assert.Equal(4, site.BufferedCount);
Assert.Equal(2, site.StuckCount);
Assert.Equal(TimeSpan.FromMinutes(15), site.OldestPendingAge);
}
[Fact]
public void SiteCallKpiSnapshot_OldestPendingAge_IsNullableForEmptyTable()
{
var snapshot = new SiteCallKpiSnapshot(0, 0, 0, 0, null, 0);
Assert.Null(snapshot.OldestPendingAge);
}
}

View File

@@ -2,8 +2,10 @@ using Akka.Actor;
using Akka.TestKit.Xunit2;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using ScadaLink.Commons.Messages.Audit;
using ScadaLink.Commons.Messages.Deployment;
using ScadaLink.Commons.Messages.Notification;
using ScadaLink.Commons.Types.Audit;
using ScadaLink.Commons.Types.Notifications;
namespace ScadaLink.Communication.Tests;
@@ -236,6 +238,150 @@ public class CommunicationServiceTests : TestKit
Assert.Equal("plant-a", result.Sites[0].SourceSiteId);
}
// ── Site Call Audit: central-side audit actor calls ──
[Fact]
public async Task QuerySiteCallsAsync_BeforeSiteCallAuditSet_Throws()
{
var service = new CommunicationService(
Options.Create(new CommunicationOptions()),
NullLogger<CommunicationService>.Instance);
await Assert.ThrowsAsync<InvalidOperationException>(() =>
service.QuerySiteCallsAsync(new SiteCallQueryRequest(
"corr-1", null, null, null, null, false, null, null, null, null, 50)));
}
[Fact]
public async Task GetSiteCallKpisAsync_BeforeSiteCallAuditSet_Throws()
{
var service = new CommunicationService(
Options.Create(new CommunicationOptions()),
NullLogger<CommunicationService>.Instance);
await Assert.ThrowsAsync<InvalidOperationException>(() =>
service.GetSiteCallKpisAsync(new SiteCallKpiRequest("corr-1")));
}
[Fact]
public async Task GetSiteCallDetailAsync_BeforeSiteCallAuditSet_Throws()
{
var service = new CommunicationService(
Options.Create(new CommunicationOptions()),
NullLogger<CommunicationService>.Instance);
await Assert.ThrowsAsync<InvalidOperationException>(() =>
service.GetSiteCallDetailAsync(new SiteCallDetailRequest("corr-1", Guid.NewGuid())));
}
[Fact]
public async Task GetPerSiteSiteCallKpisAsync_BeforeSiteCallAuditSet_Throws()
{
var service = new CommunicationService(
Options.Create(new CommunicationOptions()),
NullLogger<CommunicationService>.Instance);
await Assert.ThrowsAsync<InvalidOperationException>(() =>
service.GetPerSiteSiteCallKpisAsync(new PerSiteSiteCallKpiRequest("corr-1")));
}
[Fact]
public async Task QuerySiteCallsAsync_AsksSiteCallAuditProxyDirectly()
{
// The Site Call Audit actor is central-local: the request must be Asked
// directly to its proxy (no SiteEnvelope wrapping).
var service = new CommunicationService(
Options.Create(new CommunicationOptions()),
NullLogger<CommunicationService>.Instance);
var probe = CreateTestProbe();
service.SetSiteCallAudit(probe.Ref);
var request = new SiteCallQueryRequest(
"corr-q", "Parked", "plant-a", "ApiOutbound", "ERP.GetOrder", true,
null, null, null, null, 25);
var task = service.QuerySiteCallsAsync(request);
var received = probe.ExpectMsg<SiteCallQueryRequest>();
Assert.Same(request, received);
var reply = new SiteCallQueryResponse(
"corr-q", true, null, Array.Empty<SiteCallSummary>(), null, null);
probe.Reply(reply);
Assert.Same(reply, await task);
}
[Fact]
public async Task GetSiteCallDetailAsync_AsksSiteCallAuditProxyDirectly()
{
var service = new CommunicationService(
Options.Create(new CommunicationOptions()),
NullLogger<CommunicationService>.Instance);
var probe = CreateTestProbe();
service.SetSiteCallAudit(probe.Ref);
var request = new SiteCallDetailRequest("corr-d", Guid.NewGuid());
var task = service.GetSiteCallDetailAsync(request);
var received = probe.ExpectMsg<SiteCallDetailRequest>();
Assert.Same(request, received);
var reply = new SiteCallDetailResponse("corr-d", false, "site call not found", null);
probe.Reply(reply);
var result = await task;
Assert.Same(reply, result);
Assert.False(result.Success);
}
[Fact]
public async Task GetSiteCallKpisAsync_AsksSiteCallAuditProxyDirectly()
{
var service = new CommunicationService(
Options.Create(new CommunicationOptions()),
NullLogger<CommunicationService>.Instance);
var probe = CreateTestProbe();
service.SetSiteCallAudit(probe.Ref);
var request = new SiteCallKpiRequest("corr-k");
var task = service.GetSiteCallKpisAsync(request);
var received = probe.ExpectMsg<SiteCallKpiRequest>();
Assert.Same(request, received);
var reply = new SiteCallKpiResponse(
"corr-k", true, null, 4, 1, 2, 9, TimeSpan.FromMinutes(7), 1);
probe.Reply(reply);
var result = await task;
Assert.Same(reply, result);
Assert.Equal(4, result.BufferedCount);
Assert.Equal(1, result.StuckCount);
}
[Fact]
public async Task GetPerSiteSiteCallKpisAsync_AsksSiteCallAuditProxyDirectly()
{
var service = new CommunicationService(
Options.Create(new CommunicationOptions()),
NullLogger<CommunicationService>.Instance);
var probe = CreateTestProbe();
service.SetSiteCallAudit(probe.Ref);
var request = new PerSiteSiteCallKpiRequest("corr-ps");
var task = service.GetPerSiteSiteCallKpisAsync(request);
var received = probe.ExpectMsg<PerSiteSiteCallKpiRequest>();
Assert.Same(request, received);
var reply = new PerSiteSiteCallKpiResponse(
"corr-ps", true, null,
new[] { new SiteCallSiteKpiSnapshot("plant-a", 3, 0, 0, 5, null, 0) });
probe.Reply(reply);
var result = await task;
Assert.Same(reply, result);
Assert.True(result.Success);
Assert.Single(result.Sites);
Assert.Equal("plant-a", result.Sites[0].SourceSite);
}
/// <summary>
/// Stand-in for CentralCommunicationActor: verifies the message is wrapped
/// in a SiteEnvelope targeting the requested site and replies with a typed

View File

@@ -338,6 +338,104 @@ public class SiteCallAuditRepositoryTests : IClassFixture<MsSqlMigrationFixture>
Assert.NotNull(await repo.GetAsync(recentTerminalId));
}
// --- KPI snapshot tests -------------------------------------------------
[SkippableFact]
public async Task ComputeKpisAsync_CountsBufferedParkedFailedDeliveredAndStuck()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
var site = NewSiteId();
await using var context = CreateContext();
var repo = new SiteCallAuditRepository(context);
var now = DateTime.UtcNow;
var stuckCutoff = now.AddMinutes(-10);
var intervalSince = now.AddHours(-1);
// Buffered + stuck (non-terminal Attempted, created 30 min ago).
await repo.UpsertAsync(NewRow(
TrackedOperationId.New(), site, status: "Attempted", createdAtUtc: now.AddMinutes(-30)));
// Buffered but NOT stuck (non-terminal Attempted, created 2 min ago).
await repo.UpsertAsync(NewRow(
TrackedOperationId.New(), site, status: "Attempted", createdAtUtc: now.AddMinutes(-2)));
// Parked (terminal).
await repo.UpsertAsync(NewRow(
TrackedOperationId.New(), site, status: "Parked",
createdAtUtc: now.AddMinutes(-5), updatedAtUtc: now.AddMinutes(-4),
terminal: true, terminalAtUtc: now.AddMinutes(-4)));
// Delivered within the interval.
await repo.UpsertAsync(NewRow(
TrackedOperationId.New(), site, status: "Delivered",
createdAtUtc: now.AddMinutes(-4), updatedAtUtc: now.AddMinutes(-1),
terminal: true, terminalAtUtc: now.AddMinutes(-1)));
// Failed within the interval.
await repo.UpsertAsync(NewRow(
TrackedOperationId.New(), site, status: "Failed",
createdAtUtc: now.AddMinutes(-6), updatedAtUtc: now.AddMinutes(-2),
terminal: true, terminalAtUtc: now.AddMinutes(-2)));
// Delivered OUTSIDE the interval (2 hours ago) — must not count.
await repo.UpsertAsync(NewRow(
TrackedOperationId.New(), site, status: "Delivered",
createdAtUtc: now.AddHours(-3), updatedAtUtc: now.AddHours(-2),
terminal: true, terminalAtUtc: now.AddHours(-2)));
var snapshot = await repo.ComputeKpisAsync(stuckCutoff, intervalSince);
// Counts are global; assert the floor since the table is shared with
// other tests. The OUTSIDE-interval Delivered row proves the window
// bounds the throughput counts.
Assert.True(snapshot.BufferedCount >= 2);
Assert.True(snapshot.ParkedCount >= 1);
Assert.True(snapshot.StuckCount >= 1);
Assert.True(snapshot.DeliveredLastInterval >= 1);
Assert.True(snapshot.FailedLastInterval >= 1);
Assert.NotNull(snapshot.OldestPendingAge);
Assert.True(snapshot.OldestPendingAge >= TimeSpan.FromMinutes(25));
}
[SkippableFact]
public async Task ComputePerSiteKpisAsync_ScopesCountsToEachSite()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
var siteA = NewSiteId();
var siteB = NewSiteId();
await using var context = CreateContext();
var repo = new SiteCallAuditRepository(context);
var now = DateTime.UtcNow;
var stuckCutoff = now.AddMinutes(-10);
var intervalSince = now.AddHours(-1);
// siteA: 2 buffered (one stuck), 1 parked.
await repo.UpsertAsync(NewRow(TrackedOperationId.New(), siteA, status: "Attempted", createdAtUtc: now.AddMinutes(-30)));
await repo.UpsertAsync(NewRow(TrackedOperationId.New(), siteA, status: "Attempted", createdAtUtc: now.AddMinutes(-2)));
await repo.UpsertAsync(NewRow(
TrackedOperationId.New(), siteA, status: "Parked",
createdAtUtc: now.AddMinutes(-5), updatedAtUtc: now.AddMinutes(-4),
terminal: true, terminalAtUtc: now.AddMinutes(-4)));
// siteB: 1 delivered within interval only.
await repo.UpsertAsync(NewRow(
TrackedOperationId.New(), siteB, status: "Delivered",
createdAtUtc: now.AddMinutes(-4), updatedAtUtc: now.AddMinutes(-1),
terminal: true, terminalAtUtc: now.AddMinutes(-1)));
var perSite = await repo.ComputePerSiteKpisAsync(stuckCutoff, intervalSince);
var a = Assert.Single(perSite, s => s.SourceSite == siteA);
Assert.Equal(2, a.BufferedCount);
Assert.Equal(1, a.ParkedCount);
Assert.Equal(1, a.StuckCount);
Assert.NotNull(a.OldestPendingAge);
var b = Assert.Single(perSite, s => s.SourceSite == siteB);
Assert.Equal(0, b.BufferedCount);
Assert.Equal(1, b.DeliveredLastInterval);
// siteB has no non-terminal rows — no oldest-pending age.
Assert.Null(b.OldestPendingAge);
}
// --- helpers ------------------------------------------------------------
private ScadaLinkDbContext CreateContext()

View File

@@ -70,10 +70,12 @@ public class SiteCallAuditActorTests : TestKit, IClassFixture<MsSqlMigrationFixt
};
}
private IActorRef CreateActor(ISiteCallAuditRepository repository) =>
private IActorRef CreateActor(
ISiteCallAuditRepository repository, SiteCallAuditOptions? options = null) =>
Sys.ActorOf(Props.Create(() => new SiteCallAuditActor(
repository,
NullLogger<SiteCallAuditActor>.Instance)));
NullLogger<SiteCallAuditActor>.Instance,
options)));
[SkippableFact]
public async Task Receive_UpsertSiteCallCommand_Persists_Replies_Accepted()
@@ -182,6 +184,291 @@ public class SiteCallAuditActorTests : TestKit, IClassFixture<MsSqlMigrationFixt
Assert.Equal(healthyId, rows[0].TrackedOperationId);
}
// ── Task 4: read-side (query / detail / KPI) handlers ──
[SkippableFact]
public async Task SiteCallQueryRequest_FilterBySourceSite_ReturnsMatchingSummaries()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
var siteId = NewSiteId();
await using var context = CreateContext();
var repo = new SiteCallAuditRepository(context);
var actor = CreateActor(repo);
var t0 = new DateTime(2026, 5, 20, 12, 0, 0, DateTimeKind.Utc);
await repo.UpsertAsync(NewRow(TrackedOperationId.New(), siteId, status: "Attempted", createdAtUtc: t0));
await repo.UpsertAsync(NewRow(
TrackedOperationId.New(), siteId, status: "Delivered",
createdAtUtc: t0.AddMinutes(1), terminal: true));
actor.Tell(
new SiteCallQueryRequest(
"corr-q1", StatusFilter: null, SourceSiteFilter: siteId, ChannelFilter: null,
TargetKeyword: null, StuckOnly: false, FromUtc: null, ToUtc: null,
AfterCreatedAtUtc: null, AfterId: null, PageSize: 50),
TestActor);
var response = ExpectMsg<SiteCallQueryResponse>(TimeSpan.FromSeconds(10));
Assert.True(response.Success);
Assert.Equal("corr-q1", response.CorrelationId);
Assert.Equal(2, response.SiteCalls.Count);
Assert.All(response.SiteCalls, s => Assert.Equal(siteId, s.SourceSite));
// Newest first — ordered (CreatedAtUtc DESC).
Assert.Equal("Delivered", response.SiteCalls[0].Status);
// Cursor echoes the last (oldest) row of the page.
Assert.Equal(t0, response.NextAfterCreatedAtUtc);
Assert.Equal(response.SiteCalls[^1].TrackedOperationId, response.NextAfterId);
}
[SkippableFact]
public async Task SiteCallQueryRequest_KeysetPaging_AdvancesViaCursor()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
var siteId = NewSiteId();
await using var context = CreateContext();
var repo = new SiteCallAuditRepository(context);
var actor = CreateActor(repo);
var t0 = new DateTime(2026, 5, 20, 13, 0, 0, DateTimeKind.Utc);
for (var i = 0; i < 3; i++)
{
await repo.UpsertAsync(NewRow(TrackedOperationId.New(), siteId, createdAtUtc: t0.AddMinutes(i)));
}
actor.Tell(
new SiteCallQueryRequest(
"corr-q2", null, siteId, null, null, false, null, null, null, null, PageSize: 2),
TestActor);
var page1 = ExpectMsg<SiteCallQueryResponse>(TimeSpan.FromSeconds(10));
Assert.Equal(2, page1.SiteCalls.Count);
actor.Tell(
new SiteCallQueryRequest(
"corr-q3", null, siteId, null, null, false, null, null,
page1.NextAfterCreatedAtUtc, page1.NextAfterId, PageSize: 2),
TestActor);
var page2 = ExpectMsg<SiteCallQueryResponse>(TimeSpan.FromSeconds(10));
Assert.Single(page2.SiteCalls);
// No overlap across the two pages.
var allIds = page1.SiteCalls.Concat(page2.SiteCalls)
.Select(s => s.TrackedOperationId).ToHashSet();
Assert.Equal(3, allIds.Count);
}
[SkippableFact]
public async Task SiteCallQueryRequest_StuckOnly_ReturnsOnlyOldNonTerminalRows()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
var siteId = NewSiteId();
await using var context = CreateContext();
var repo = new SiteCallAuditRepository(context);
// 10-minute stuck threshold (the production default).
var actor = CreateActor(repo, new SiteCallAuditOptions { StuckAgeThreshold = TimeSpan.FromMinutes(10) });
var now = DateTime.UtcNow;
// Stuck: non-terminal (Attempted, TerminalAtUtc null), created 30 min ago.
var stuckId = TrackedOperationId.New();
await repo.UpsertAsync(NewRow(stuckId, siteId, status: "Attempted", createdAtUtc: now.AddMinutes(-30)));
// Not stuck: non-terminal but recent.
await repo.UpsertAsync(NewRow(TrackedOperationId.New(), siteId, status: "Attempted", createdAtUtc: now.AddMinutes(-2)));
// Not stuck: old but terminal (Delivered, TerminalAtUtc set).
await repo.UpsertAsync(NewRow(
TrackedOperationId.New(), siteId, status: "Delivered",
createdAtUtc: now.AddMinutes(-40), terminal: true));
actor.Tell(
new SiteCallQueryRequest(
"corr-stuck", null, siteId, null, null, StuckOnly: true,
null, null, null, null, PageSize: 50),
TestActor);
var response = ExpectMsg<SiteCallQueryResponse>(TimeSpan.FromSeconds(10));
Assert.True(response.Success);
Assert.Single(response.SiteCalls);
Assert.Equal(stuckId.Value, response.SiteCalls[0].TrackedOperationId);
Assert.True(response.SiteCalls[0].IsStuck);
}
[SkippableFact]
public async Task SiteCallDetailRequest_KnownId_ReturnsFullDetail()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
var siteId = NewSiteId();
var id = TrackedOperationId.New();
await using var context = CreateContext();
var repo = new SiteCallAuditRepository(context);
var actor = CreateActor(repo);
await repo.UpsertAsync(NewRow(id, siteId, status: "Attempted", retryCount: 2, lastError: "503"));
actor.Tell(new SiteCallDetailRequest("corr-d1", id.Value), TestActor);
var response = ExpectMsg<SiteCallDetailResponse>(TimeSpan.FromSeconds(10));
Assert.True(response.Success);
Assert.NotNull(response.Detail);
Assert.Equal(id.Value, response.Detail!.TrackedOperationId);
Assert.Equal("Attempted", response.Detail.Status);
Assert.Equal(2, response.Detail.RetryCount);
Assert.Equal("503", response.Detail.LastError);
Assert.Equal(siteId, response.Detail.SourceSite);
}
[SkippableFact]
public async Task SiteCallDetailRequest_UnknownId_RepliesNotFound()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
await using var context = CreateContext();
var repo = new SiteCallAuditRepository(context);
var actor = CreateActor(repo);
actor.Tell(new SiteCallDetailRequest("corr-d2", Guid.NewGuid()), TestActor);
var response = ExpectMsg<SiteCallDetailResponse>(TimeSpan.FromSeconds(10));
Assert.False(response.Success);
Assert.Null(response.Detail);
Assert.NotNull(response.ErrorMessage);
}
[SkippableFact]
public async Task SiteCallKpiRequest_ComputesPointInTimeCounts()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
var siteId = NewSiteId();
await using var context = CreateContext();
var repo = new SiteCallAuditRepository(context);
var actor = CreateActor(repo, new SiteCallAuditOptions
{
StuckAgeThreshold = TimeSpan.FromMinutes(10),
KpiInterval = TimeSpan.FromHours(1),
});
var now = DateTime.UtcNow;
// Buffered (non-terminal Attempted) + stuck (created 30 min ago).
await repo.UpsertAsync(NewRow(TrackedOperationId.New(), siteId, status: "Attempted", createdAtUtc: now.AddMinutes(-30)));
// Buffered (non-terminal Attempted), not stuck.
await repo.UpsertAsync(NewRow(TrackedOperationId.New(), siteId, status: "Attempted", createdAtUtc: now.AddMinutes(-2)));
// Parked (terminal).
await repo.UpsertAsync(NewRow(
TrackedOperationId.New(), siteId, status: "Parked",
createdAtUtc: now.AddMinutes(-5), terminal: true));
// Delivered within the interval.
await repo.UpsertAsync(NewRow(
TrackedOperationId.New(), siteId, status: "Delivered",
createdAtUtc: now.AddMinutes(-3), updatedAtUtc: now.AddMinutes(-1), terminal: true));
actor.Tell(new SiteCallKpiRequest("corr-kpi"), TestActor);
var response = ExpectMsg<SiteCallKpiResponse>(TimeSpan.FromSeconds(10));
Assert.True(response.Success);
// Per-site rows are isolated by the unique siteId — but KPIs are global,
// so assert the floor (>=) rather than exact counts: other tests' rows
// may share the table.
Assert.True(response.BufferedCount >= 2);
Assert.True(response.ParkedCount >= 1);
Assert.True(response.DeliveredLastInterval >= 1);
Assert.True(response.StuckCount >= 1);
Assert.NotNull(response.OldestPendingAge);
}
[SkippableFact]
public async Task PerSiteSiteCallKpiRequest_ScopesCountsToEachSite()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
var siteId = NewSiteId();
await using var context = CreateContext();
var repo = new SiteCallAuditRepository(context);
var actor = CreateActor(repo, new SiteCallAuditOptions
{
StuckAgeThreshold = TimeSpan.FromMinutes(10),
KpiInterval = TimeSpan.FromHours(1),
});
var now = DateTime.UtcNow;
// Non-terminal Attempted, created 30 min ago — buffered + stuck.
await repo.UpsertAsync(NewRow(TrackedOperationId.New(), siteId, status: "Attempted", createdAtUtc: now.AddMinutes(-30)));
// Terminal Parked.
await repo.UpsertAsync(NewRow(
TrackedOperationId.New(), siteId, status: "Parked",
createdAtUtc: now.AddMinutes(-5), terminal: true));
actor.Tell(new PerSiteSiteCallKpiRequest("corr-psk"), TestActor);
var response = ExpectMsg<PerSiteSiteCallKpiResponse>(TimeSpan.FromSeconds(10));
Assert.True(response.Success);
var mySite = Assert.Single(response.Sites, s => s.SourceSite == siteId);
Assert.Equal(1, mySite.BufferedCount);
Assert.Equal(1, mySite.ParkedCount);
Assert.Equal(1, mySite.StuckCount);
Assert.NotNull(mySite.OldestPendingAge);
}
[SkippableFact]
public async Task SiteCallQueryRequest_RepoThrows_RepliesFailure_ActorStaysAlive()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
var siteId = NewSiteId();
await using var context = CreateContext();
var realRepo = new SiteCallAuditRepository(context);
var actor = CreateActor(new QueryThrowingRepository(realRepo));
actor.Tell(
new SiteCallQueryRequest(
"corr-fault", null, siteId, null, null, false, null, null, null, null, 50),
TestActor);
var response = ExpectMsg<SiteCallQueryResponse>(TimeSpan.FromSeconds(10));
Assert.False(response.Success);
Assert.Empty(response.SiteCalls);
Assert.NotNull(response.ErrorMessage);
Assert.Equal("corr-fault", response.CorrelationId);
}
/// <summary>
/// Test double whose <see cref="ISiteCallAuditRepository.QueryAsync"/> always
/// throws — used to verify the query handler's failure projection produces a
/// <c>Success=false</c> response without crashing the actor.
/// </summary>
private sealed class QueryThrowingRepository : ISiteCallAuditRepository
{
private readonly ISiteCallAuditRepository _inner;
public QueryThrowingRepository(ISiteCallAuditRepository inner)
{
_inner = inner;
}
public Task UpsertAsync(SiteCall siteCall, CancellationToken ct = default) =>
_inner.UpsertAsync(siteCall, ct);
public Task<SiteCall?> GetAsync(TrackedOperationId id, CancellationToken ct = default) =>
_inner.GetAsync(id, ct);
public Task<IReadOnlyList<SiteCall>> QueryAsync(
SiteCallQueryFilter filter, SiteCallPaging paging, CancellationToken ct = default) =>
throw new InvalidOperationException("simulated query failure");
public Task<int> PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default) =>
_inner.PurgeTerminalAsync(olderThanUtc, ct);
public Task<SiteCallKpiSnapshot> ComputeKpisAsync(
DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) =>
_inner.ComputeKpisAsync(stuckCutoff, intervalSince, ct);
public Task<IReadOnlyList<SiteCallSiteKpiSnapshot>> ComputePerSiteKpisAsync(
DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) =>
_inner.ComputePerSiteKpisAsync(stuckCutoff, intervalSince, ct);
}
/// <summary>
/// Tiny test double that delegates to a real repository but throws on a
/// specified <see cref="TrackedOperationId"/>. Used to verify the actor's
@@ -217,5 +504,13 @@ public class SiteCallAuditActorTests : TestKit, IClassFixture<MsSqlMigrationFixt
public Task<int> PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default) =>
_inner.PurgeTerminalAsync(olderThanUtc, ct);
public Task<SiteCallKpiSnapshot> ComputeKpisAsync(
DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) =>
_inner.ComputeKpisAsync(stuckCutoff, intervalSince, ct);
public Task<IReadOnlyList<SiteCallSiteKpiSnapshot>> ComputePerSiteKpisAsync(
DateTime stuckCutoff, DateTime intervalSince, CancellationToken ct = default) =>
_inner.ComputePerSiteKpisAsync(stuckCutoff, intervalSince, ct);
}
}

View File

@@ -0,0 +1,15 @@
namespace ScadaLink.SiteCallAudit.Tests;
public class SiteCallAuditOptionsTests
{
[Fact]
public void Defaults_AreExpectedValues()
{
var options = new SiteCallAuditOptions();
// Stuck threshold mirrors NotificationOutboxOptions.StuckAgeThreshold.
Assert.Equal(TimeSpan.FromMinutes(10), options.StuckAgeThreshold);
// KPI interval mirrors NotificationOutboxOptions.DeliveredKpiWindow.
Assert.Equal(TimeSpan.FromMinutes(1), options.KpiInterval);
}
}