From bedfa6b8f38435312c783341e039b6f388e960c8 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 20 May 2026 14:10:24 -0400 Subject: [PATCH] feat(configdb): ISiteCallAuditRepository + EF impl, monotonic upsert (#22, #23 M3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bundle B3 of Audit Log #23 M3: data-access layer for the central SiteCalls table introduced in B1+B2. UpsertAsync is insert-if-not-exists then monotonic-status update so out-of-order telemetry, duplicate gRPC packets, and reconciliation pulls all converge on the same row without rolling state backward. - src/ScadaLink.Commons/Interfaces/Repositories/ISiteCallAuditRepository.cs: UpsertAsync (monotonic), GetAsync, QueryAsync, PurgeTerminalAsync. - src/ScadaLink.Commons/Types/Audit/SiteCallQueryFilter.cs + SiteCallPaging.cs: filter (Channel/SourceSite/Status/Target/time range) and keyset paging cursor on (CreatedAtUtc DESC, TrackedOperationId DESC), mirrored on M1's AuditLog* equivalents. - src/ScadaLink.ConfigurationDatabase/Repositories/SiteCallAuditRepository.cs: raw-SQL InsertIfNotExists + conditional UPDATE with inline CASE rank compare (Submitted=0, Forwarded=1, Attempted/Skipped=2, terminal=3 — terminal statuses are mutually exclusive so e.g. Delivered cannot overwrite Parked). Duplicate-key violations (SQL 2601/2627) are swallowed at Debug, identical to AuditLogRepository's race-fix. QueryAsync uses FromSqlInterpolated because EF Core 10 cannot translate string.Compare against the value-converted TrackedOperationId column inside an expression tree. - ServiceCollectionExtensions wires the repository (scoped, after IAuditLogRepository). - 12 integration tests in tests/ScadaLink.ConfigurationDatabase.Tests/ Repositories/ (MsSqlMigrationFixture + [SkippableFact]): fresh insert, monotonic advance, older-status no-op, same-status no-op, terminal-over-terminal no-op, 50-way concurrent-insert race produces exactly one row, Get known/unknown, filter by site, keyset paging no overlap, purge terminal-and-old, purge keeps non-terminal-and-recent. --- .../Repositories/ISiteCallAuditRepository.cs | 66 +++ .../Types/Audit/SiteCallPaging.cs | 15 + .../Types/Audit/SiteCallQueryFilter.cs | 21 + .../Repositories/SiteCallAuditRepository.cs | 214 ++++++++++ .../ServiceCollectionExtensions.cs | 1 + .../SiteCallAuditRepositoryTests.cs | 388 ++++++++++++++++++ 6 files changed, 705 insertions(+) create mode 100644 src/ScadaLink.Commons/Interfaces/Repositories/ISiteCallAuditRepository.cs create mode 100644 src/ScadaLink.Commons/Types/Audit/SiteCallPaging.cs create mode 100644 src/ScadaLink.Commons/Types/Audit/SiteCallQueryFilter.cs create mode 100644 src/ScadaLink.ConfigurationDatabase/Repositories/SiteCallAuditRepository.cs create mode 100644 tests/ScadaLink.ConfigurationDatabase.Tests/Repositories/SiteCallAuditRepositoryTests.cs diff --git a/src/ScadaLink.Commons/Interfaces/Repositories/ISiteCallAuditRepository.cs b/src/ScadaLink.Commons/Interfaces/Repositories/ISiteCallAuditRepository.cs new file mode 100644 index 0000000..7bb3790 --- /dev/null +++ b/src/ScadaLink.Commons/Interfaces/Repositories/ISiteCallAuditRepository.cs @@ -0,0 +1,66 @@ +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Types; +using ScadaLink.Commons.Types.Audit; + +namespace ScadaLink.Commons.Interfaces.Repositories; + +/// +/// Operational-state data access for the central SiteCalls table +/// (Site Call Audit #22, Audit Log #23 M3 Bundle B). One row per +/// ; sites remain the source of truth and this +/// table is an eventually-consistent mirror fed by best-effort gRPC telemetry +/// plus periodic reconciliation pulls. +/// +/// +/// +/// Unlike the partitioned append-only AuditLog (M1), this table holds +/// mutable operational state. is insert-if-not-exists +/// then monotonic update — a status update with rank less than or equal to the +/// stored status is a silent no-op so out-of-order telemetry, duplicate gRPC +/// packets, and reconciliation pulls can all feed the same writer without +/// rolling state backward. +/// +/// +/// Status rank for monotonic comparison (lower wins): Submitted=0, +/// Forwarded=1, Attempted=2, Skipped=2, Delivered=3, Failed=3, Parked=3, +/// Discarded=3. Terminal statuses share rank 3 and are mutually exclusive +/// — an attempt to upsert e.g. Delivered over an existing Parked +/// row is a no-op. +/// +/// +public interface ISiteCallAuditRepository +{ + /// + /// Inserts if no row with the same + /// exists; otherwise updates the + /// existing row IF AND ONLY IF the incoming status' rank strictly exceeds + /// the stored status' rank. Out-of-order / duplicate updates are silently + /// dropped (monotonic forward-only progression). + /// + Task UpsertAsync(SiteCall siteCall, CancellationToken ct = default); + + /// + /// Returns the row for the given id, or null if none exists. + /// + Task GetAsync(TrackedOperationId id, CancellationToken ct = default); + + /// + /// Returns up to rows matching + /// , ordered by (CreatedAtUtc DESC, + /// TrackedOperationId DESC). Use keyset paging via + /// + + /// to fetch subsequent pages. + /// + Task> QueryAsync( + SiteCallQueryFilter filter, + SiteCallPaging paging, + CancellationToken ct = default); + + /// + /// Deletes terminal rows whose is + /// strictly older than . Non-terminal rows + /// (TerminalAtUtc IS NULL) are NEVER purged. Returns the number of rows + /// deleted. + /// + Task PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default); +} diff --git a/src/ScadaLink.Commons/Types/Audit/SiteCallPaging.cs b/src/ScadaLink.Commons/Types/Audit/SiteCallPaging.cs new file mode 100644 index 0000000..4a8f790 --- /dev/null +++ b/src/ScadaLink.Commons/Types/Audit/SiteCallPaging.cs @@ -0,0 +1,15 @@ +namespace ScadaLink.Commons.Types.Audit; + +/// +/// Keyset paging cursor for +/// . +/// The repository orders by (CreatedAtUtc DESC, TrackedOperationId DESC) — newest +/// calls first, with the strong-typed id breaking ties when two calls share an exact +/// CreatedAtUtc. Callers pass the last row of the previous page back as +/// + to fetch the next page. +/// Both must be non-null together, or both null (first page). +/// +public sealed record SiteCallPaging( + int PageSize, + DateTime? AfterCreatedAtUtc = null, + TrackedOperationId? AfterId = null); diff --git a/src/ScadaLink.Commons/Types/Audit/SiteCallQueryFilter.cs b/src/ScadaLink.Commons/Types/Audit/SiteCallQueryFilter.cs new file mode 100644 index 0000000..cf7e7d4 --- /dev/null +++ b/src/ScadaLink.Commons/Types/Audit/SiteCallQueryFilter.cs @@ -0,0 +1,21 @@ +namespace ScadaLink.Commons.Types.Audit; + +/// +/// Filter predicate for . +/// Any field left null means "do not constrain on that column". Time bounds +/// are half-open in the spec sense — is inclusive and +/// is inclusive of the upper bound; the repository SQL uses +/// >= / <= respectively. All filter fields are AND-combined. +/// +/// +/// Channel / Status / SourceSite / Target are matched as exact strings — the +/// underlying columns are bounded ASCII (varchar) and the Central UI Site Calls +/// page exposes them as drop-down filters, not free-text search. +/// +public sealed record SiteCallQueryFilter( + string? Channel = null, + string? SourceSite = null, + string? Status = null, + string? Target = null, + DateTime? FromUtc = null, + DateTime? ToUtc = null); diff --git a/src/ScadaLink.ConfigurationDatabase/Repositories/SiteCallAuditRepository.cs b/src/ScadaLink.ConfigurationDatabase/Repositories/SiteCallAuditRepository.cs new file mode 100644 index 0000000..3fdff7e --- /dev/null +++ b/src/ScadaLink.ConfigurationDatabase/Repositories/SiteCallAuditRepository.cs @@ -0,0 +1,214 @@ +using Microsoft.Data.SqlClient; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Interfaces.Repositories; +using ScadaLink.Commons.Types; +using ScadaLink.Commons.Types.Audit; + +namespace ScadaLink.ConfigurationDatabase.Repositories; + +/// +/// EF Core implementation of . See the +/// interface for the monotonic-upsert contract; this class adds notes on the +/// data-access strategy used by each method. +/// +public class SiteCallAuditRepository : ISiteCallAuditRepository +{ + // SQL Server duplicate-key error numbers, identical to the AuditLogRepository + // race-fix: 2601 = unique-index violation, 2627 = PK/unique-constraint + // violation. The IF NOT EXISTS … INSERT pattern has a check-then-act window + // and the loser surfaces as one of these; monotonic-upsert semantics demand + // we swallow them. + private const int SqlErrorUniqueIndexViolation = 2601; + private const int SqlErrorPrimaryKeyViolation = 2627; + + // Monotonic status ordering. Lower rank wins on tie (same-rank upserts are + // no-ops, including terminal-over-terminal). Spec from Bundle B3 plan: + // Submitted < Forwarded < Attempted == Skipped < Delivered == Failed == Parked == Discarded. + private static readonly Dictionary StatusRank = new(StringComparer.Ordinal) + { + ["Submitted"] = 0, + ["Forwarded"] = 1, + ["Attempted"] = 2, + ["Skipped"] = 2, + ["Delivered"] = 3, + ["Failed"] = 3, + ["Parked"] = 3, + ["Discarded"] = 3, + }; + + private readonly ScadaLinkDbContext _context; + private readonly ILogger _logger; + + public SiteCallAuditRepository(ScadaLinkDbContext context, ILogger? logger = null) + { + _context = context ?? throw new ArgumentNullException(nameof(context)); + _logger = logger ?? NullLogger.Instance; + } + + /// + /// Two-step: IF NOT EXISTS INSERT then conditional UPDATE with + /// an inline CASE rank comparison. Both go through + /// + /// so the change tracker is bypassed and the value-converted PK column is + /// written as the canonical "D"-format GUID string. Duplicate-key violations + /// from the insert race are swallowed. + /// + public async Task UpsertAsync(SiteCall siteCall, CancellationToken ct = default) + { + if (siteCall is null) + { + throw new ArgumentNullException(nameof(siteCall)); + } + + var idText = siteCall.TrackedOperationId.Value.ToString("D"); + var incomingRank = GetRankOrThrow(siteCall.Status); + + // Step 1: insert-if-not-exists. Like AuditLogRepository.InsertIfNotExistsAsync + // this is check-then-act so a duplicate-key violation may surface under + // concurrent inserts on the same id — caught + logged at Debug. + try + { + await _context.Database.ExecuteSqlInterpolatedAsync( + $@"IF NOT EXISTS (SELECT 1 FROM dbo.SiteCalls WHERE TrackedOperationId = {idText}) +INSERT INTO dbo.SiteCalls + (TrackedOperationId, Channel, Target, SourceSite, Status, RetryCount, + LastError, HttpStatus, CreatedAtUtc, UpdatedAtUtc, TerminalAtUtc, IngestedAtUtc) +VALUES + ({idText}, {siteCall.Channel}, {siteCall.Target}, {siteCall.SourceSite}, {siteCall.Status}, {siteCall.RetryCount}, + {siteCall.LastError}, {siteCall.HttpStatus}, {siteCall.CreatedAtUtc}, {siteCall.UpdatedAtUtc}, {siteCall.TerminalAtUtc}, {siteCall.IngestedAtUtc});", + ct); + } + catch (SqlException ex) when ( + ex.Number == SqlErrorUniqueIndexViolation + || ex.Number == SqlErrorPrimaryKeyViolation) + { + _logger.LogDebug( + ex, + "SiteCallAuditRepository.UpsertAsync swallowed duplicate-key violation (error {SqlErrorNumber}) for TrackedOperationId {TrackedOperationId}; falling through to monotonic update.", + ex.Number, + idText); + } + + // Step 2: monotonic update. The CASE expression maps the stored Status + // string to the same rank table the caller uses; we only mutate if the + // incoming rank is strictly greater. Same-rank (including + // terminal-over-terminal) is a no-op — first-write-wins at each rank. + await _context.Database.ExecuteSqlInterpolatedAsync( + $@"UPDATE dbo.SiteCalls +SET Status = {siteCall.Status}, + RetryCount = {siteCall.RetryCount}, + LastError = {siteCall.LastError}, + HttpStatus = {siteCall.HttpStatus}, + UpdatedAtUtc = {siteCall.UpdatedAtUtc}, + TerminalAtUtc = {siteCall.TerminalAtUtc}, + IngestedAtUtc = {siteCall.IngestedAtUtc} +WHERE TrackedOperationId = {idText} + AND {incomingRank} > (CASE Status + WHEN 'Submitted' THEN 0 + WHEN 'Forwarded' THEN 1 + WHEN 'Attempted' THEN 2 + WHEN 'Skipped' THEN 2 + WHEN 'Delivered' THEN 3 + WHEN 'Failed' THEN 3 + WHEN 'Parked' THEN 3 + WHEN 'Discarded' THEN 3 + ELSE -1 + END);", + ct); + } + + /// + /// Single FindAsync against the PK. Returns null for unknown ids. + /// + public async Task GetAsync(TrackedOperationId id, CancellationToken ct = default) + { + return await _context.Set().FindAsync(new object?[] { id }, ct); + } + + /// + /// Builds a parameterised SQL query against dbo.SiteCalls ordered by + /// (CreatedAtUtc DESC, TrackedOperationId DESC), with keyset paging. + /// Raw SQL is used here (rather than LINQ) because EF Core 10 cannot + /// translate the lexicographic string comparison against the value-converted + /// column inside an expression tree — the + /// converter is applied to equality but not to inequality comparisons + /// against the underlying Guid. The keyset tiebreaker is varchar lex order, + /// which is deterministic and gives "no overlap, every row exactly once" + /// paging without depending on Guid byte ordering. + /// + public async Task> QueryAsync( + SiteCallQueryFilter filter, SiteCallPaging paging, CancellationToken ct = default) + { + if (filter is null) + { + throw new ArgumentNullException(nameof(filter)); + } + + if (paging is null) + { + throw new ArgumentNullException(nameof(paging)); + } + + // FormattableString interpolation parameterises every value (no concatenation) + // so this is injection-safe. EF Core resolves the parameter values, the + // composed sql is shaped to SQL Server's grammar and projected into the + // SiteCall entity via FromSqlInterpolated. The CASE expressions wrap each + // optional predicate so a null filter field degrades to a no-op (matches + // every row) instead of branching at C# level into N variants. + var afterCreated = paging.AfterCreatedAtUtc; + var afterIdString = paging.AfterId?.Value.ToString("D"); + var hasCursor = afterCreated is not null && afterIdString is not null; + + var fromUtc = filter.FromUtc; + var toUtc = filter.ToUtc; + + FormattableString sql = $@" +SELECT TOP ({paging.PageSize}) + TrackedOperationId, Channel, Target, SourceSite, Status, RetryCount, + LastError, HttpStatus, CreatedAtUtc, UpdatedAtUtc, TerminalAtUtc, IngestedAtUtc +FROM dbo.SiteCalls +WHERE ({filter.Channel} IS NULL OR Channel = {filter.Channel}) + AND ({filter.SourceSite} IS NULL OR SourceSite = {filter.SourceSite}) + AND ({filter.Status} IS NULL OR Status = {filter.Status}) + AND ({filter.Target} IS NULL OR Target = {filter.Target}) + AND ({fromUtc} IS NULL OR CreatedAtUtc >= {fromUtc}) + AND ({toUtc} IS NULL OR CreatedAtUtc <= {toUtc}) + AND ({(hasCursor ? 1 : 0)} = 0 + OR CreatedAtUtc < {afterCreated} + OR (CreatedAtUtc = {afterCreated} AND TrackedOperationId < {afterIdString})) +ORDER BY CreatedAtUtc DESC, TrackedOperationId DESC;"; + + var rows = await _context.Set() + .FromSqlInterpolated(sql) + .AsNoTracking() + .ToListAsync(ct); + + return rows; + } + + /// + /// Deletes rows whose is non-null AND + /// strictly less than . Non-terminal rows are + /// never touched. Returns the number of rows removed. + /// + public async Task PurgeTerminalAsync(DateTime olderThanUtc, CancellationToken ct = default) + { + return await _context.Database.ExecuteSqlInterpolatedAsync( + $"DELETE FROM dbo.SiteCalls WHERE TerminalAtUtc IS NOT NULL AND TerminalAtUtc < {olderThanUtc};", + ct); + } + + private static int GetRankOrThrow(string status) + { + if (!StatusRank.TryGetValue(status, out var rank)) + { + throw new ArgumentException( + $"Unknown SiteCall status '{status}'. Expected one of: {string.Join(", ", StatusRank.Keys)}.", + nameof(status)); + } + return rank; + } +} diff --git a/src/ScadaLink.ConfigurationDatabase/ServiceCollectionExtensions.cs b/src/ScadaLink.ConfigurationDatabase/ServiceCollectionExtensions.cs index b7ba6b4..bf79b29 100644 --- a/src/ScadaLink.ConfigurationDatabase/ServiceCollectionExtensions.cs +++ b/src/ScadaLink.ConfigurationDatabase/ServiceCollectionExtensions.cs @@ -47,6 +47,7 @@ public static class ServiceCollectionExtensions services.AddScoped(); services.AddScoped(); services.AddScoped(); + services.AddScoped(); services.AddScoped(); services.AddScoped(); services.AddScoped(); diff --git a/tests/ScadaLink.ConfigurationDatabase.Tests/Repositories/SiteCallAuditRepositoryTests.cs b/tests/ScadaLink.ConfigurationDatabase.Tests/Repositories/SiteCallAuditRepositoryTests.cs new file mode 100644 index 0000000..ae5dd90 --- /dev/null +++ b/tests/ScadaLink.ConfigurationDatabase.Tests/Repositories/SiteCallAuditRepositoryTests.cs @@ -0,0 +1,388 @@ +using Microsoft.EntityFrameworkCore; +using ScadaLink.Commons.Entities.Audit; +using ScadaLink.Commons.Types; +using ScadaLink.Commons.Types.Audit; +using ScadaLink.ConfigurationDatabase; +using ScadaLink.ConfigurationDatabase.Repositories; +using ScadaLink.ConfigurationDatabase.Tests.Migrations; +using Xunit; + +namespace ScadaLink.ConfigurationDatabase.Tests.Repositories; + +/// +/// Bundle B3 (#22, #23 M3) integration tests for . +/// Uses the same as the Bundle B2 migration tests so +/// the monotonic-upsert SQL executes against the real SiteCalls schema. Each test +/// scopes its data by minting a fresh (or a per-test +/// SourceSite suffix) so tests neither collide nor require teardown. +/// +public class SiteCallAuditRepositoryTests : IClassFixture +{ + private readonly MsSqlMigrationFixture _fixture; + + public SiteCallAuditRepositoryTests(MsSqlMigrationFixture fixture) + { + _fixture = fixture; + } + + [SkippableFact] + public async Task UpsertAsync_FreshId_InsertsOneRow() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var id = TrackedOperationId.New(); + await using var context = CreateContext(); + var repo = new SiteCallAuditRepository(context); + + var row = NewRow(id, status: "Submitted", retryCount: 0); + await repo.UpsertAsync(row); + + await using var readContext = CreateContext(); + var loaded = await readContext.Set() + .Where(s => s.TrackedOperationId == id) + .ToListAsync(); + + Assert.Single(loaded); + Assert.Equal("Submitted", loaded[0].Status); + Assert.Equal(0, loaded[0].RetryCount); + } + + [SkippableFact] + public async Task UpsertAsync_AdvancedStatus_UpdatesRow() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var id = TrackedOperationId.New(); + await using var context = CreateContext(); + var repo = new SiteCallAuditRepository(context); + + // Submitted (rank 0) → Forwarded (rank 1) → Attempted (rank 2) — every + // step strictly advances the rank, so each upsert must mutate the row. + await repo.UpsertAsync(NewRow(id, status: "Submitted", retryCount: 0)); + await repo.UpsertAsync(NewRow(id, status: "Forwarded", retryCount: 0)); + await repo.UpsertAsync(NewRow(id, status: "Attempted", retryCount: 1, lastError: "transient 503")); + + var loaded = await repo.GetAsync(id); + Assert.NotNull(loaded); + Assert.Equal("Attempted", loaded!.Status); + Assert.Equal(1, loaded.RetryCount); + Assert.Equal("transient 503", loaded.LastError); + } + + [SkippableFact] + public async Task UpsertAsync_OlderStatus_IsNoOp_RowUnchanged() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var id = TrackedOperationId.New(); + await using var context = CreateContext(); + var repo = new SiteCallAuditRepository(context); + + // First land Attempted (rank 2). A late-arriving Submitted (rank 0) must + // NOT roll the row back — silent no-op. + await repo.UpsertAsync(NewRow(id, status: "Attempted", retryCount: 5, lastError: "transient")); + var attemptedSnapshot = await repo.GetAsync(id); + + await repo.UpsertAsync(NewRow(id, status: "Submitted", retryCount: 0, lastError: null)); + var afterStale = await repo.GetAsync(id); + + Assert.NotNull(afterStale); + Assert.Equal("Attempted", afterStale!.Status); + Assert.Equal(5, afterStale.RetryCount); + Assert.Equal("transient", afterStale.LastError); + // UpdatedAtUtc should not have moved when the stale write was rejected. + Assert.Equal(attemptedSnapshot!.UpdatedAtUtc, afterStale.UpdatedAtUtc); + } + + [SkippableFact] + public async Task UpsertAsync_SameStatus_IsNoOp() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var id = TrackedOperationId.New(); + await using var context = CreateContext(); + var repo = new SiteCallAuditRepository(context); + + await repo.UpsertAsync(NewRow(id, status: "Attempted", retryCount: 1, lastError: "first")); + var snapshot = await repo.GetAsync(id); + + // Same rank (2) — repository must treat this as a no-op (no fields move). + await repo.UpsertAsync(NewRow(id, status: "Attempted", retryCount: 2, lastError: "second")); + var afterDuplicate = await repo.GetAsync(id); + + Assert.NotNull(afterDuplicate); + Assert.Equal("Attempted", afterDuplicate!.Status); + Assert.Equal(1, afterDuplicate.RetryCount); + Assert.Equal("first", afterDuplicate.LastError); + Assert.Equal(snapshot!.UpdatedAtUtc, afterDuplicate.UpdatedAtUtc); + } + + [SkippableFact] + public async Task UpsertAsync_TerminalOverTerminal_IsNoOp() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + // Bundle B3 plan: terminal statuses share rank 3 and are mutually + // exclusive — Delivered cannot overwrite Parked. + var id = TrackedOperationId.New(); + await using var context = CreateContext(); + var repo = new SiteCallAuditRepository(context); + + await repo.UpsertAsync(NewRow(id, status: "Parked", retryCount: 3, lastError: "parked-reason", terminal: true)); + var afterPark = await repo.GetAsync(id); + + await repo.UpsertAsync(NewRow(id, status: "Delivered", retryCount: 4, lastError: null, terminal: true)); + var afterDeliveredAttempt = await repo.GetAsync(id); + + Assert.NotNull(afterDeliveredAttempt); + Assert.Equal("Parked", afterDeliveredAttempt!.Status); + Assert.Equal("parked-reason", afterDeliveredAttempt.LastError); + Assert.Equal(afterPark!.UpdatedAtUtc, afterDeliveredAttempt.UpdatedAtUtc); + } + + [SkippableFact] + public async Task UpsertAsync_ConcurrentInserts_SameId_OnlyOneRow() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + // 50 parallel inserters with the same id. The IF NOT EXISTS … INSERT + // pattern has a check-then-act race; concurrent losers must surface as + // silent duplicate-key swallows, not thrown exceptions. Final row + // count must be exactly 1. + var id = TrackedOperationId.New(); + var row = NewRow(id, status: "Submitted", retryCount: 0); + + await Parallel.ForEachAsync( + Enumerable.Range(0, 50), + new ParallelOptions { MaxDegreeOfParallelism = 50 }, + async (_, ct) => + { + await using var context = CreateContext(); + var repo = new SiteCallAuditRepository(context); + await repo.UpsertAsync(row, ct); + }); + + await using var readContext = CreateContext(); + var count = await readContext.Set() + .Where(s => s.TrackedOperationId == id) + .CountAsync(); + Assert.Equal(1, count); + } + + [SkippableFact] + public async Task GetAsync_KnownId_ReturnsRow() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var id = TrackedOperationId.New(); + await using var context = CreateContext(); + var repo = new SiteCallAuditRepository(context); + + await repo.UpsertAsync(NewRow(id, status: "Submitted", retryCount: 0)); + + var loaded = await repo.GetAsync(id); + Assert.NotNull(loaded); + Assert.Equal(id, loaded!.TrackedOperationId); + } + + [SkippableFact] + public async Task GetAsync_UnknownId_ReturnsNull() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + await using var context = CreateContext(); + var repo = new SiteCallAuditRepository(context); + + var loaded = await repo.GetAsync(TrackedOperationId.New()); + Assert.Null(loaded); + } + + [SkippableFact] + public async Task QueryAsync_FilterBySourceSite_ReturnsMatchingRows() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var siteA = NewSiteId(); + var siteB = NewSiteId(); + await using var context = CreateContext(); + var repo = new SiteCallAuditRepository(context); + + var t0 = new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc); + await repo.UpsertAsync(NewRow(TrackedOperationId.New(), sourceSite: siteA, createdAtUtc: t0)); + await repo.UpsertAsync(NewRow(TrackedOperationId.New(), sourceSite: siteA, createdAtUtc: t0.AddMinutes(1))); + await repo.UpsertAsync(NewRow(TrackedOperationId.New(), sourceSite: siteB, createdAtUtc: t0.AddMinutes(2))); + + var rows = await repo.QueryAsync( + new SiteCallQueryFilter(SourceSite: siteA), + new SiteCallPaging(PageSize: 10)); + + Assert.Equal(2, rows.Count); + Assert.All(rows, r => Assert.Equal(siteA, r.SourceSite)); + } + + [SkippableFact] + public async Task QueryAsync_KeysetPaging_NoOverlap() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var site = NewSiteId(); + await using var context = CreateContext(); + var repo = new SiteCallAuditRepository(context); + + // Five rows with distinct CreatedAtUtc. Page-size 2 → page 1 returns + // minutes 4,3; cursor (minutes 3) → page 2 returns minutes 2,1; cursor + // (minutes 1) → page 3 returns minute 0. + var t0 = new DateTime(2026, 5, 20, 9, 0, 0, DateTimeKind.Utc); + for (var i = 0; i < 5; i++) + { + await repo.UpsertAsync(NewRow(TrackedOperationId.New(), sourceSite: site, createdAtUtc: t0.AddMinutes(i))); + } + + var page1 = await repo.QueryAsync( + new SiteCallQueryFilter(SourceSite: site), + new SiteCallPaging(PageSize: 2)); + Assert.Equal(2, page1.Count); + Assert.Equal(t0.AddMinutes(4), page1[0].CreatedAtUtc); + Assert.Equal(t0.AddMinutes(3), page1[1].CreatedAtUtc); + + var cursor1 = page1[^1]; + var page2 = await repo.QueryAsync( + new SiteCallQueryFilter(SourceSite: site), + new SiteCallPaging( + PageSize: 2, + AfterCreatedAtUtc: cursor1.CreatedAtUtc, + AfterId: cursor1.TrackedOperationId)); + Assert.Equal(2, page2.Count); + Assert.Equal(t0.AddMinutes(2), page2[0].CreatedAtUtc); + Assert.Equal(t0.AddMinutes(1), page2[1].CreatedAtUtc); + + var cursor2 = page2[^1]; + var page3 = await repo.QueryAsync( + new SiteCallQueryFilter(SourceSite: site), + new SiteCallPaging( + PageSize: 2, + AfterCreatedAtUtc: cursor2.CreatedAtUtc, + AfterId: cursor2.TrackedOperationId)); + Assert.Single(page3); + Assert.Equal(t0.AddMinutes(0), page3[0].CreatedAtUtc); + + // No overlap across pages. + var allIds = page1.Concat(page2).Concat(page3).Select(r => r.TrackedOperationId).ToHashSet(); + Assert.Equal(5, allIds.Count); + } + + [SkippableFact] + public async Task PurgeTerminalAsync_RemovesTerminalAndOld() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var site = NewSiteId(); + await using var context = CreateContext(); + var repo = new SiteCallAuditRepository(context); + + // One row that's been Delivered for a long time (5 days ago) — should be purged. + var oldId = TrackedOperationId.New(); + var fiveDaysAgo = DateTime.UtcNow.AddDays(-5); + await repo.UpsertAsync(NewRow( + oldId, + sourceSite: site, + status: "Delivered", + retryCount: 1, + createdAtUtc: fiveDaysAgo.AddMinutes(-1), + updatedAtUtc: fiveDaysAgo, + terminal: true, + terminalAtUtc: fiveDaysAgo)); + + var purged = await repo.PurgeTerminalAsync(DateTime.UtcNow.AddDays(-1)); + + Assert.True(purged >= 1, $"Expected at least one purged row; got {purged}."); + Assert.Null(await repo.GetAsync(oldId)); + } + + [SkippableFact] + public async Task PurgeTerminalAsync_KeepsNonTerminalAndRecent() + { + Skip.IfNot(_fixture.Available, _fixture.SkipReason); + + var site = NewSiteId(); + await using var context = CreateContext(); + var repo = new SiteCallAuditRepository(context); + + // Non-terminal row: never eligible. + var activeId = TrackedOperationId.New(); + await repo.UpsertAsync(NewRow( + activeId, + sourceSite: site, + status: "Attempted", + retryCount: 1, + createdAtUtc: DateTime.UtcNow.AddDays(-10), + updatedAtUtc: DateTime.UtcNow.AddDays(-10), + terminal: false)); + + // Recent terminal row: TerminalAtUtc within the keep window. + var recentTerminalId = TrackedOperationId.New(); + await repo.UpsertAsync(NewRow( + recentTerminalId, + sourceSite: site, + status: "Delivered", + retryCount: 0, + createdAtUtc: DateTime.UtcNow.AddHours(-2), + updatedAtUtc: DateTime.UtcNow.AddHours(-1), + terminal: true, + terminalAtUtc: DateTime.UtcNow.AddHours(-1))); + + // Purge older than 1 day — both rows must survive. + await repo.PurgeTerminalAsync(DateTime.UtcNow.AddDays(-1)); + + Assert.NotNull(await repo.GetAsync(activeId)); + Assert.NotNull(await repo.GetAsync(recentTerminalId)); + } + + // --- helpers ------------------------------------------------------------ + + private ScadaLinkDbContext CreateContext() + { + var options = new DbContextOptionsBuilder() + .UseSqlServer(_fixture.ConnectionString) + .Options; + return new ScadaLinkDbContext(options); + } + + private static string NewSiteId() => + "site-b3-" + Guid.NewGuid().ToString("N").Substring(0, 8); + + private static SiteCall NewRow( + TrackedOperationId id, + string? sourceSite = null, + string status = "Submitted", + int retryCount = 0, + string? lastError = null, + int? httpStatus = null, + DateTime? createdAtUtc = null, + DateTime? updatedAtUtc = null, + bool terminal = false, + DateTime? terminalAtUtc = null) + { + var created = createdAtUtc ?? DateTime.UtcNow; + var updated = updatedAtUtc ?? created; + DateTime? terminalAt = terminal + ? (terminalAtUtc ?? updated) + : null; + + return new SiteCall + { + TrackedOperationId = id, + Channel = "ApiOutbound", + Target = "ERP.GetOrder", + SourceSite = sourceSite ?? NewSiteId(), + Status = status, + RetryCount = retryCount, + LastError = lastError, + HttpStatus = httpStatus, + CreatedAtUtc = created, + UpdatedAtUtc = updated, + TerminalAtUtc = terminalAt, + IngestedAtUtc = DateTime.UtcNow, + }; + } +}