feat(configdb): ISiteCallAuditRepository + EF impl, monotonic upsert (#22, #23 M3)

Bundle B3 of Audit Log #23 M3: data-access layer for the central SiteCalls
table introduced in B1+B2. UpsertAsync is insert-if-not-exists then
monotonic-status update so out-of-order telemetry, duplicate gRPC packets,
and reconciliation pulls all converge on the same row without rolling
state backward.

- src/ScadaLink.Commons/Interfaces/Repositories/ISiteCallAuditRepository.cs:
  UpsertAsync (monotonic), GetAsync, QueryAsync, PurgeTerminalAsync.
- src/ScadaLink.Commons/Types/Audit/SiteCallQueryFilter.cs +
  SiteCallPaging.cs: filter (Channel/SourceSite/Status/Target/time range)
  and keyset paging cursor on (CreatedAtUtc DESC, TrackedOperationId DESC),
  mirrored on M1's AuditLog* equivalents.
- src/ScadaLink.ConfigurationDatabase/Repositories/SiteCallAuditRepository.cs:
  raw-SQL InsertIfNotExists + conditional UPDATE with inline CASE rank
  compare (Submitted=0, Forwarded=1, Attempted/Skipped=2, terminal=3 —
  terminal statuses are mutually exclusive so e.g. Delivered cannot
  overwrite Parked). Duplicate-key violations (SQL 2601/2627) are
  swallowed at Debug, identical to AuditLogRepository's race-fix.
  QueryAsync uses FromSqlInterpolated because EF Core 10 cannot translate
  string.Compare against the value-converted TrackedOperationId column
  inside an expression tree.
- ServiceCollectionExtensions wires the repository (scoped, after
  IAuditLogRepository).
- 12 integration tests in tests/ScadaLink.ConfigurationDatabase.Tests/
  Repositories/ (MsSqlMigrationFixture + [SkippableFact]): fresh insert,
  monotonic advance, older-status no-op, same-status no-op,
  terminal-over-terminal no-op, 50-way concurrent-insert race produces
  exactly one row, Get known/unknown, filter by site, keyset paging no
  overlap, purge terminal-and-old, purge keeps non-terminal-and-recent.
This commit is contained in:
Joseph Doherty
2026-05-20 14:10:24 -04:00
parent 6667f345fa
commit bedfa6b8f3
6 changed files with 705 additions and 0 deletions

View File

@@ -0,0 +1,66 @@
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Types;
using ScadaLink.Commons.Types.Audit;
namespace ScadaLink.Commons.Interfaces.Repositories;
/// <summary>
/// Operational-state data access for the central <c>SiteCalls</c> table
/// (Site Call Audit #22, Audit Log #23 M3 Bundle B). One row per
/// <see cref="TrackedOperationId"/>; sites remain the source of truth and this
/// table is an eventually-consistent mirror fed by best-effort gRPC telemetry
/// plus periodic reconciliation pulls.
/// </summary>
/// <remarks>
/// <para>
/// Unlike the partitioned append-only <c>AuditLog</c> (M1), this table holds
/// mutable operational state. <see cref="UpsertAsync"/> is insert-if-not-exists
/// then monotonic update — a status update with rank less than or equal to the
/// stored status is a silent no-op so out-of-order telemetry, duplicate gRPC
/// packets, and reconciliation pulls can all feed the same writer without
/// rolling state backward.
/// </para>
/// <para>
/// Status rank for monotonic comparison (lower wins): <c>Submitted=0,
/// Forwarded=1, Attempted=2, Skipped=2, Delivered=3, Failed=3, Parked=3,
/// Discarded=3</c>. Terminal statuses share rank 3 and are mutually exclusive
/// — an attempt to upsert e.g. <c>Delivered</c> over an existing <c>Parked</c>
/// row is a no-op.
/// </para>
/// </remarks>
public interface ISiteCallAuditRepository
{
/// <summary>
/// Inserts <paramref name="siteCall"/> if no row with the same
/// <see cref="SiteCall.TrackedOperationId"/> exists; otherwise updates the
/// existing row IF AND ONLY IF the incoming status' rank strictly exceeds
/// the stored status' rank. Out-of-order / duplicate updates are silently
/// dropped (monotonic forward-only progression).
/// </summary>
Task UpsertAsync(SiteCall siteCall, CancellationToken ct = default);
/// <summary>
/// Returns the row for the given id, or <c>null</c> if none exists.
/// </summary>
Task<SiteCall?> GetAsync(TrackedOperationId id, CancellationToken ct = default);
/// <summary>
/// Returns up to <see cref="SiteCallPaging.PageSize"/> rows matching
/// <paramref name="filter"/>, ordered by <c>(CreatedAtUtc DESC,
/// TrackedOperationId DESC)</c>. Use keyset paging via
/// <see cref="SiteCallPaging.AfterCreatedAtUtc"/> + <see cref="SiteCallPaging.AfterId"/>
/// to fetch subsequent pages.
/// </summary>
Task<IReadOnlyList<SiteCall>> QueryAsync(
SiteCallQueryFilter filter,
SiteCallPaging paging,
CancellationToken ct = default);
/// <summary>
/// Deletes terminal rows whose <see cref="SiteCall.TerminalAtUtc"/> is
/// strictly older than <paramref name="olderThanUtc"/>. Non-terminal rows
/// (TerminalAtUtc IS NULL) are NEVER purged. Returns the number of rows
/// deleted.
/// </summary>
Task<int> PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default);
}

View File

@@ -0,0 +1,15 @@
namespace ScadaLink.Commons.Types.Audit;
/// <summary>
/// Keyset paging cursor for
/// <see cref="ScadaLink.Commons.Interfaces.Repositories.ISiteCallAuditRepository.QueryAsync"/>.
/// The repository orders by <c>(CreatedAtUtc DESC, TrackedOperationId DESC)</c> — newest
/// calls first, with the strong-typed id breaking ties when two calls share an exact
/// <c>CreatedAtUtc</c>. Callers pass the last row of the previous page back as
/// <see cref="AfterCreatedAtUtc"/> + <see cref="AfterId"/> to fetch the next page.
/// Both must be non-null together, or both null (first page).
/// </summary>
public sealed record SiteCallPaging(
int PageSize,
DateTime? AfterCreatedAtUtc = null,
TrackedOperationId? AfterId = null);

View File

@@ -0,0 +1,21 @@
namespace ScadaLink.Commons.Types.Audit;
/// <summary>
/// Filter predicate for <see cref="ScadaLink.Commons.Interfaces.Repositories.ISiteCallAuditRepository.QueryAsync"/>.
/// Any field left <c>null</c> means "do not constrain on that column". Time bounds
/// are half-open in the spec sense — <see cref="FromUtc"/> is inclusive and
/// <see cref="ToUtc"/> is inclusive of the upper bound; the repository SQL uses
/// <c>&gt;=</c> / <c>&lt;=</c> respectively. All filter fields are AND-combined.
/// </summary>
/// <remarks>
/// Channel / Status / SourceSite / Target are matched as exact strings — the
/// underlying columns are bounded ASCII (varchar) and the Central UI Site Calls
/// page exposes them as drop-down filters, not free-text search.
/// </remarks>
public sealed record SiteCallQueryFilter(
string? Channel = null,
string? SourceSite = null,
string? Status = null,
string? Target = null,
DateTime? FromUtc = null,
DateTime? ToUtc = null);

View File

@@ -0,0 +1,214 @@
using Microsoft.Data.SqlClient;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Interfaces.Repositories;
using ScadaLink.Commons.Types;
using ScadaLink.Commons.Types.Audit;
namespace ScadaLink.ConfigurationDatabase.Repositories;
/// <summary>
/// EF Core implementation of <see cref="ISiteCallAuditRepository"/>. See the
/// interface for the monotonic-upsert contract; this class adds notes on the
/// data-access strategy used by each method.
/// </summary>
public class SiteCallAuditRepository : ISiteCallAuditRepository
{
// SQL Server duplicate-key error numbers, identical to the AuditLogRepository
// race-fix: 2601 = unique-index violation, 2627 = PK/unique-constraint
// violation. The IF NOT EXISTS … INSERT pattern has a check-then-act window
// and the loser surfaces as one of these; monotonic-upsert semantics demand
// we swallow them.
private const int SqlErrorUniqueIndexViolation = 2601;
private const int SqlErrorPrimaryKeyViolation = 2627;
// Monotonic status ordering. Lower rank wins on tie (same-rank upserts are
// no-ops, including terminal-over-terminal). Spec from Bundle B3 plan:
// Submitted < Forwarded < Attempted == Skipped < Delivered == Failed == Parked == Discarded.
private static readonly Dictionary<string, int> StatusRank = new(StringComparer.Ordinal)
{
["Submitted"] = 0,
["Forwarded"] = 1,
["Attempted"] = 2,
["Skipped"] = 2,
["Delivered"] = 3,
["Failed"] = 3,
["Parked"] = 3,
["Discarded"] = 3,
};
private readonly ScadaLinkDbContext _context;
private readonly ILogger<SiteCallAuditRepository> _logger;
public SiteCallAuditRepository(ScadaLinkDbContext context, ILogger<SiteCallAuditRepository>? logger = null)
{
_context = context ?? throw new ArgumentNullException(nameof(context));
_logger = logger ?? NullLogger<SiteCallAuditRepository>.Instance;
}
/// <summary>
/// Two-step: <c>IF NOT EXISTS INSERT</c> then conditional <c>UPDATE</c> with
/// an inline <c>CASE</c> rank comparison. Both go through
/// <see cref="Microsoft.EntityFrameworkCore.RelationalDatabaseFacadeExtensions.ExecuteSqlInterpolatedAsync"/>
/// so the change tracker is bypassed and the value-converted PK column is
/// written as the canonical "D"-format GUID string. Duplicate-key violations
/// from the insert race are swallowed.
/// </summary>
public async Task UpsertAsync(SiteCall siteCall, CancellationToken ct = default)
{
if (siteCall is null)
{
throw new ArgumentNullException(nameof(siteCall));
}
var idText = siteCall.TrackedOperationId.Value.ToString("D");
var incomingRank = GetRankOrThrow(siteCall.Status);
// Step 1: insert-if-not-exists. Like AuditLogRepository.InsertIfNotExistsAsync
// this is check-then-act so a duplicate-key violation may surface under
// concurrent inserts on the same id — caught + logged at Debug.
try
{
await _context.Database.ExecuteSqlInterpolatedAsync(
$@"IF NOT EXISTS (SELECT 1 FROM dbo.SiteCalls WHERE TrackedOperationId = {idText})
INSERT INTO dbo.SiteCalls
(TrackedOperationId, Channel, Target, SourceSite, Status, RetryCount,
LastError, HttpStatus, CreatedAtUtc, UpdatedAtUtc, TerminalAtUtc, IngestedAtUtc)
VALUES
({idText}, {siteCall.Channel}, {siteCall.Target}, {siteCall.SourceSite}, {siteCall.Status}, {siteCall.RetryCount},
{siteCall.LastError}, {siteCall.HttpStatus}, {siteCall.CreatedAtUtc}, {siteCall.UpdatedAtUtc}, {siteCall.TerminalAtUtc}, {siteCall.IngestedAtUtc});",
ct);
}
catch (SqlException ex) when (
ex.Number == SqlErrorUniqueIndexViolation
|| ex.Number == SqlErrorPrimaryKeyViolation)
{
_logger.LogDebug(
ex,
"SiteCallAuditRepository.UpsertAsync swallowed duplicate-key violation (error {SqlErrorNumber}) for TrackedOperationId {TrackedOperationId}; falling through to monotonic update.",
ex.Number,
idText);
}
// Step 2: monotonic update. The CASE expression maps the stored Status
// string to the same rank table the caller uses; we only mutate if the
// incoming rank is strictly greater. Same-rank (including
// terminal-over-terminal) is a no-op — first-write-wins at each rank.
await _context.Database.ExecuteSqlInterpolatedAsync(
$@"UPDATE dbo.SiteCalls
SET Status = {siteCall.Status},
RetryCount = {siteCall.RetryCount},
LastError = {siteCall.LastError},
HttpStatus = {siteCall.HttpStatus},
UpdatedAtUtc = {siteCall.UpdatedAtUtc},
TerminalAtUtc = {siteCall.TerminalAtUtc},
IngestedAtUtc = {siteCall.IngestedAtUtc}
WHERE TrackedOperationId = {idText}
AND {incomingRank} > (CASE Status
WHEN 'Submitted' THEN 0
WHEN 'Forwarded' THEN 1
WHEN 'Attempted' THEN 2
WHEN 'Skipped' THEN 2
WHEN 'Delivered' THEN 3
WHEN 'Failed' THEN 3
WHEN 'Parked' THEN 3
WHEN 'Discarded' THEN 3
ELSE -1
END);",
ct);
}
/// <summary>
/// Single <c>FindAsync</c> against the PK. Returns <c>null</c> for unknown ids.
/// </summary>
public async Task<SiteCall?> GetAsync(TrackedOperationId id, CancellationToken ct = default)
{
return await _context.Set<SiteCall>().FindAsync(new object?[] { id }, ct);
}
/// <summary>
/// Builds a parameterised SQL query against <c>dbo.SiteCalls</c> ordered by
/// <c>(CreatedAtUtc DESC, TrackedOperationId DESC)</c>, with keyset paging.
/// Raw SQL is used here (rather than LINQ) because EF Core 10 cannot
/// translate the lexicographic string comparison against the value-converted
/// <see cref="TrackedOperationId"/> column inside an expression tree — the
/// converter is applied to equality but not to inequality comparisons
/// against the underlying Guid. The keyset tiebreaker is varchar lex order,
/// which is deterministic and gives "no overlap, every row exactly once"
/// paging without depending on Guid byte ordering.
/// </summary>
public async Task<IReadOnlyList<SiteCall>> QueryAsync(
SiteCallQueryFilter filter, SiteCallPaging paging, CancellationToken ct = default)
{
if (filter is null)
{
throw new ArgumentNullException(nameof(filter));
}
if (paging is null)
{
throw new ArgumentNullException(nameof(paging));
}
// FormattableString interpolation parameterises every value (no concatenation)
// so this is injection-safe. EF Core resolves the parameter values, the
// composed sql is shaped to SQL Server's grammar and projected into the
// SiteCall entity via FromSqlInterpolated. The CASE expressions wrap each
// optional predicate so a null filter field degrades to a no-op (matches
// every row) instead of branching at C# level into N variants.
var afterCreated = paging.AfterCreatedAtUtc;
var afterIdString = paging.AfterId?.Value.ToString("D");
var hasCursor = afterCreated is not null && afterIdString is not null;
var fromUtc = filter.FromUtc;
var toUtc = filter.ToUtc;
FormattableString sql = $@"
SELECT TOP ({paging.PageSize})
TrackedOperationId, Channel, Target, SourceSite, Status, RetryCount,
LastError, HttpStatus, CreatedAtUtc, UpdatedAtUtc, TerminalAtUtc, IngestedAtUtc
FROM dbo.SiteCalls
WHERE ({filter.Channel} IS NULL OR Channel = {filter.Channel})
AND ({filter.SourceSite} IS NULL OR SourceSite = {filter.SourceSite})
AND ({filter.Status} IS NULL OR Status = {filter.Status})
AND ({filter.Target} IS NULL OR Target = {filter.Target})
AND ({fromUtc} IS NULL OR CreatedAtUtc >= {fromUtc})
AND ({toUtc} IS NULL OR CreatedAtUtc <= {toUtc})
AND ({(hasCursor ? 1 : 0)} = 0
OR CreatedAtUtc < {afterCreated}
OR (CreatedAtUtc = {afterCreated} AND TrackedOperationId < {afterIdString}))
ORDER BY CreatedAtUtc DESC, TrackedOperationId DESC;";
var rows = await _context.Set<SiteCall>()
.FromSqlInterpolated(sql)
.AsNoTracking()
.ToListAsync(ct);
return rows;
}
/// <summary>
/// Deletes rows whose <see cref="SiteCall.TerminalAtUtc"/> is non-null AND
/// strictly less than <paramref name="olderThanUtc"/>. Non-terminal rows are
/// never touched. Returns the number of rows removed.
/// </summary>
public async Task<int> PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default)
{
return await _context.Database.ExecuteSqlInterpolatedAsync(
$"DELETE FROM dbo.SiteCalls WHERE TerminalAtUtc IS NOT NULL AND TerminalAtUtc < {olderThanUtc};",
ct);
}
private static int GetRankOrThrow(string status)
{
if (!StatusRank.TryGetValue(status, out var rank))
{
throw new ArgumentException(
$"Unknown SiteCall status '{status}'. Expected one of: {string.Join(", ", StatusRank.Keys)}.",
nameof(status));
}
return rank;
}
}

View File

@@ -47,6 +47,7 @@ public static class ServiceCollectionExtensions
services.AddScoped<INotificationRepository, NotificationRepository>(); services.AddScoped<INotificationRepository, NotificationRepository>();
services.AddScoped<INotificationOutboxRepository, NotificationOutboxRepository>(); services.AddScoped<INotificationOutboxRepository, NotificationOutboxRepository>();
services.AddScoped<IAuditLogRepository, AuditLogRepository>(); services.AddScoped<IAuditLogRepository, AuditLogRepository>();
services.AddScoped<ISiteCallAuditRepository, SiteCallAuditRepository>();
services.AddScoped<IInboundApiRepository, InboundApiRepository>(); services.AddScoped<IInboundApiRepository, InboundApiRepository>();
services.AddScoped<IAuditService, AuditService>(); services.AddScoped<IAuditService, AuditService>();
services.AddScoped<IInstanceLocator, InstanceLocator>(); services.AddScoped<IInstanceLocator, InstanceLocator>();

View File

@@ -0,0 +1,388 @@
using Microsoft.EntityFrameworkCore;
using ScadaLink.Commons.Entities.Audit;
using ScadaLink.Commons.Types;
using ScadaLink.Commons.Types.Audit;
using ScadaLink.ConfigurationDatabase;
using ScadaLink.ConfigurationDatabase.Repositories;
using ScadaLink.ConfigurationDatabase.Tests.Migrations;
using Xunit;
namespace ScadaLink.ConfigurationDatabase.Tests.Repositories;
/// <summary>
/// Bundle B3 (#22, #23 M3) integration tests for <see cref="SiteCallAuditRepository"/>.
/// Uses the same <see cref="MsSqlMigrationFixture"/> as the Bundle B2 migration tests so
/// the monotonic-upsert SQL executes against the real <c>SiteCalls</c> schema. Each test
/// scopes its data by minting a fresh <see cref="TrackedOperationId"/> (or a per-test
/// <c>SourceSite</c> suffix) so tests neither collide nor require teardown.
/// </summary>
public class SiteCallAuditRepositoryTests : IClassFixture<MsSqlMigrationFixture>
{
private readonly MsSqlMigrationFixture _fixture;
public SiteCallAuditRepositoryTests(MsSqlMigrationFixture fixture)
{
_fixture = fixture;
}
[SkippableFact]
public async Task UpsertAsync_FreshId_InsertsOneRow()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
var id = TrackedOperationId.New();
await using var context = CreateContext();
var repo = new SiteCallAuditRepository(context);
var row = NewRow(id, status: "Submitted", retryCount: 0);
await repo.UpsertAsync(row);
await using var readContext = CreateContext();
var loaded = await readContext.Set<SiteCall>()
.Where(s => s.TrackedOperationId == id)
.ToListAsync();
Assert.Single(loaded);
Assert.Equal("Submitted", loaded[0].Status);
Assert.Equal(0, loaded[0].RetryCount);
}
[SkippableFact]
public async Task UpsertAsync_AdvancedStatus_UpdatesRow()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
var id = TrackedOperationId.New();
await using var context = CreateContext();
var repo = new SiteCallAuditRepository(context);
// Submitted (rank 0) → Forwarded (rank 1) → Attempted (rank 2) — every
// step strictly advances the rank, so each upsert must mutate the row.
await repo.UpsertAsync(NewRow(id, status: "Submitted", retryCount: 0));
await repo.UpsertAsync(NewRow(id, status: "Forwarded", retryCount: 0));
await repo.UpsertAsync(NewRow(id, status: "Attempted", retryCount: 1, lastError: "transient 503"));
var loaded = await repo.GetAsync(id);
Assert.NotNull(loaded);
Assert.Equal("Attempted", loaded!.Status);
Assert.Equal(1, loaded.RetryCount);
Assert.Equal("transient 503", loaded.LastError);
}
[SkippableFact]
public async Task UpsertAsync_OlderStatus_IsNoOp_RowUnchanged()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
var id = TrackedOperationId.New();
await using var context = CreateContext();
var repo = new SiteCallAuditRepository(context);
// First land Attempted (rank 2). A late-arriving Submitted (rank 0) must
// NOT roll the row back — silent no-op.
await repo.UpsertAsync(NewRow(id, status: "Attempted", retryCount: 5, lastError: "transient"));
var attemptedSnapshot = await repo.GetAsync(id);
await repo.UpsertAsync(NewRow(id, status: "Submitted", retryCount: 0, lastError: null));
var afterStale = await repo.GetAsync(id);
Assert.NotNull(afterStale);
Assert.Equal("Attempted", afterStale!.Status);
Assert.Equal(5, afterStale.RetryCount);
Assert.Equal("transient", afterStale.LastError);
// UpdatedAtUtc should not have moved when the stale write was rejected.
Assert.Equal(attemptedSnapshot!.UpdatedAtUtc, afterStale.UpdatedAtUtc);
}
[SkippableFact]
public async Task UpsertAsync_SameStatus_IsNoOp()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
var id = TrackedOperationId.New();
await using var context = CreateContext();
var repo = new SiteCallAuditRepository(context);
await repo.UpsertAsync(NewRow(id, status: "Attempted", retryCount: 1, lastError: "first"));
var snapshot = await repo.GetAsync(id);
// Same rank (2) — repository must treat this as a no-op (no fields move).
await repo.UpsertAsync(NewRow(id, status: "Attempted", retryCount: 2, lastError: "second"));
var afterDuplicate = await repo.GetAsync(id);
Assert.NotNull(afterDuplicate);
Assert.Equal("Attempted", afterDuplicate!.Status);
Assert.Equal(1, afterDuplicate.RetryCount);
Assert.Equal("first", afterDuplicate.LastError);
Assert.Equal(snapshot!.UpdatedAtUtc, afterDuplicate.UpdatedAtUtc);
}
[SkippableFact]
public async Task UpsertAsync_TerminalOverTerminal_IsNoOp()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
// Bundle B3 plan: terminal statuses share rank 3 and are mutually
// exclusive — Delivered cannot overwrite Parked.
var id = TrackedOperationId.New();
await using var context = CreateContext();
var repo = new SiteCallAuditRepository(context);
await repo.UpsertAsync(NewRow(id, status: "Parked", retryCount: 3, lastError: "parked-reason", terminal: true));
var afterPark = await repo.GetAsync(id);
await repo.UpsertAsync(NewRow(id, status: "Delivered", retryCount: 4, lastError: null, terminal: true));
var afterDeliveredAttempt = await repo.GetAsync(id);
Assert.NotNull(afterDeliveredAttempt);
Assert.Equal("Parked", afterDeliveredAttempt!.Status);
Assert.Equal("parked-reason", afterDeliveredAttempt.LastError);
Assert.Equal(afterPark!.UpdatedAtUtc, afterDeliveredAttempt.UpdatedAtUtc);
}
[SkippableFact]
public async Task UpsertAsync_ConcurrentInserts_SameId_OnlyOneRow()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
// 50 parallel inserters with the same id. The IF NOT EXISTS … INSERT
// pattern has a check-then-act race; concurrent losers must surface as
// silent duplicate-key swallows, not thrown exceptions. Final row
// count must be exactly 1.
var id = TrackedOperationId.New();
var row = NewRow(id, status: "Submitted", retryCount: 0);
await Parallel.ForEachAsync(
Enumerable.Range(0, 50),
new ParallelOptions { MaxDegreeOfParallelism = 50 },
async (_, ct) =>
{
await using var context = CreateContext();
var repo = new SiteCallAuditRepository(context);
await repo.UpsertAsync(row, ct);
});
await using var readContext = CreateContext();
var count = await readContext.Set<SiteCall>()
.Where(s => s.TrackedOperationId == id)
.CountAsync();
Assert.Equal(1, count);
}
[SkippableFact]
public async Task GetAsync_KnownId_ReturnsRow()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
var id = TrackedOperationId.New();
await using var context = CreateContext();
var repo = new SiteCallAuditRepository(context);
await repo.UpsertAsync(NewRow(id, status: "Submitted", retryCount: 0));
var loaded = await repo.GetAsync(id);
Assert.NotNull(loaded);
Assert.Equal(id, loaded!.TrackedOperationId);
}
[SkippableFact]
public async Task GetAsync_UnknownId_ReturnsNull()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
await using var context = CreateContext();
var repo = new SiteCallAuditRepository(context);
var loaded = await repo.GetAsync(TrackedOperationId.New());
Assert.Null(loaded);
}
[SkippableFact]
public async Task QueryAsync_FilterBySourceSite_ReturnsMatchingRows()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
var siteA = NewSiteId();
var siteB = NewSiteId();
await using var context = CreateContext();
var repo = new SiteCallAuditRepository(context);
var t0 = new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc);
await repo.UpsertAsync(NewRow(TrackedOperationId.New(), sourceSite: siteA, createdAtUtc: t0));
await repo.UpsertAsync(NewRow(TrackedOperationId.New(), sourceSite: siteA, createdAtUtc: t0.AddMinutes(1)));
await repo.UpsertAsync(NewRow(TrackedOperationId.New(), sourceSite: siteB, createdAtUtc: t0.AddMinutes(2)));
var rows = await repo.QueryAsync(
new SiteCallQueryFilter(SourceSite: siteA),
new SiteCallPaging(PageSize: 10));
Assert.Equal(2, rows.Count);
Assert.All(rows, r => Assert.Equal(siteA, r.SourceSite));
}
[SkippableFact]
public async Task QueryAsync_KeysetPaging_NoOverlap()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
var site = NewSiteId();
await using var context = CreateContext();
var repo = new SiteCallAuditRepository(context);
// Five rows with distinct CreatedAtUtc. Page-size 2 → page 1 returns
// minutes 4,3; cursor (minutes 3) → page 2 returns minutes 2,1; cursor
// (minutes 1) → page 3 returns minute 0.
var t0 = new DateTime(2026, 5, 20, 9, 0, 0, DateTimeKind.Utc);
for (var i = 0; i < 5; i++)
{
await repo.UpsertAsync(NewRow(TrackedOperationId.New(), sourceSite: site, createdAtUtc: t0.AddMinutes(i)));
}
var page1 = await repo.QueryAsync(
new SiteCallQueryFilter(SourceSite: site),
new SiteCallPaging(PageSize: 2));
Assert.Equal(2, page1.Count);
Assert.Equal(t0.AddMinutes(4), page1[0].CreatedAtUtc);
Assert.Equal(t0.AddMinutes(3), page1[1].CreatedAtUtc);
var cursor1 = page1[^1];
var page2 = await repo.QueryAsync(
new SiteCallQueryFilter(SourceSite: site),
new SiteCallPaging(
PageSize: 2,
AfterCreatedAtUtc: cursor1.CreatedAtUtc,
AfterId: cursor1.TrackedOperationId));
Assert.Equal(2, page2.Count);
Assert.Equal(t0.AddMinutes(2), page2[0].CreatedAtUtc);
Assert.Equal(t0.AddMinutes(1), page2[1].CreatedAtUtc);
var cursor2 = page2[^1];
var page3 = await repo.QueryAsync(
new SiteCallQueryFilter(SourceSite: site),
new SiteCallPaging(
PageSize: 2,
AfterCreatedAtUtc: cursor2.CreatedAtUtc,
AfterId: cursor2.TrackedOperationId));
Assert.Single(page3);
Assert.Equal(t0.AddMinutes(0), page3[0].CreatedAtUtc);
// No overlap across pages.
var allIds = page1.Concat(page2).Concat(page3).Select(r => r.TrackedOperationId).ToHashSet();
Assert.Equal(5, allIds.Count);
}
[SkippableFact]
public async Task PurgeTerminalAsync_RemovesTerminalAndOld()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
var site = NewSiteId();
await using var context = CreateContext();
var repo = new SiteCallAuditRepository(context);
// One row that's been Delivered for a long time (5 days ago) — should be purged.
var oldId = TrackedOperationId.New();
var fiveDaysAgo = DateTime.UtcNow.AddDays(-5);
await repo.UpsertAsync(NewRow(
oldId,
sourceSite: site,
status: "Delivered",
retryCount: 1,
createdAtUtc: fiveDaysAgo.AddMinutes(-1),
updatedAtUtc: fiveDaysAgo,
terminal: true,
terminalAtUtc: fiveDaysAgo));
var purged = await repo.PurgeTerminalAsync(DateTime.UtcNow.AddDays(-1));
Assert.True(purged >= 1, $"Expected at least one purged row; got {purged}.");
Assert.Null(await repo.GetAsync(oldId));
}
[SkippableFact]
public async Task PurgeTerminalAsync_KeepsNonTerminalAndRecent()
{
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
var site = NewSiteId();
await using var context = CreateContext();
var repo = new SiteCallAuditRepository(context);
// Non-terminal row: never eligible.
var activeId = TrackedOperationId.New();
await repo.UpsertAsync(NewRow(
activeId,
sourceSite: site,
status: "Attempted",
retryCount: 1,
createdAtUtc: DateTime.UtcNow.AddDays(-10),
updatedAtUtc: DateTime.UtcNow.AddDays(-10),
terminal: false));
// Recent terminal row: TerminalAtUtc within the keep window.
var recentTerminalId = TrackedOperationId.New();
await repo.UpsertAsync(NewRow(
recentTerminalId,
sourceSite: site,
status: "Delivered",
retryCount: 0,
createdAtUtc: DateTime.UtcNow.AddHours(-2),
updatedAtUtc: DateTime.UtcNow.AddHours(-1),
terminal: true,
terminalAtUtc: DateTime.UtcNow.AddHours(-1)));
// Purge older than 1 day — both rows must survive.
await repo.PurgeTerminalAsync(DateTime.UtcNow.AddDays(-1));
Assert.NotNull(await repo.GetAsync(activeId));
Assert.NotNull(await repo.GetAsync(recentTerminalId));
}
// --- helpers ------------------------------------------------------------
private ScadaLinkDbContext CreateContext()
{
var options = new DbContextOptionsBuilder<ScadaLinkDbContext>()
.UseSqlServer(_fixture.ConnectionString)
.Options;
return new ScadaLinkDbContext(options);
}
private static string NewSiteId() =>
"site-b3-" + Guid.NewGuid().ToString("N").Substring(0, 8);
private static SiteCall NewRow(
TrackedOperationId id,
string? sourceSite = null,
string status = "Submitted",
int retryCount = 0,
string? lastError = null,
int? httpStatus = null,
DateTime? createdAtUtc = null,
DateTime? updatedAtUtc = null,
bool terminal = false,
DateTime? terminalAtUtc = null)
{
var created = createdAtUtc ?? DateTime.UtcNow;
var updated = updatedAtUtc ?? created;
DateTime? terminalAt = terminal
? (terminalAtUtc ?? updated)
: null;
return new SiteCall
{
TrackedOperationId = id,
Channel = "ApiOutbound",
Target = "ERP.GetOrder",
SourceSite = sourceSite ?? NewSiteId(),
Status = status,
RetryCount = retryCount,
LastError = lastError,
HttpStatus = httpStatus,
CreatedAtUtc = created,
UpdatedAtUtc = updated,
TerminalAtUtc = terminalAt,
IngestedAtUtc = DateTime.UtcNow,
};
}
}