From b86d7c61abf88859d6a68c113d76040917c870a1 Mon Sep 17 00:00:00 2001
From: Joseph Doherty
Date: Wed, 20 May 2026 13:51:09 -0400
Subject: [PATCH] feat(siteruntime): OperationTrackingStore site-local SQLite (#23 M3)

---
 .../Interfaces/IOperationTrackingStore.cs     |  87 ++++
 .../Types/TrackingStatusSnapshot.cs           |  40 ++
 .../Tracking/OperationTrackingOptions.cs      |  21 +
 .../Tracking/OperationTrackingStore.cs        | 387 ++++++++++++++++++
 .../Tracking/OperationTrackingStoreTests.cs   | 319 +++++++++++++++
 5 files changed, 854 insertions(+)
 create mode 100644 src/ScadaLink.Commons/Interfaces/IOperationTrackingStore.cs
 create mode 100644 src/ScadaLink.Commons/Types/TrackingStatusSnapshot.cs
 create mode 100644 src/ScadaLink.SiteRuntime/Tracking/OperationTrackingOptions.cs
 create mode 100644 src/ScadaLink.SiteRuntime/Tracking/OperationTrackingStore.cs
 create mode 100644 tests/ScadaLink.SiteRuntime.Tests/Tracking/OperationTrackingStoreTests.cs

diff --git a/src/ScadaLink.Commons/Interfaces/IOperationTrackingStore.cs b/src/ScadaLink.Commons/Interfaces/IOperationTrackingStore.cs
new file mode 100644
index 0000000..add0a8c
--- /dev/null
+++ b/src/ScadaLink.Commons/Interfaces/IOperationTrackingStore.cs
@@ -0,0 +1,87 @@
+using ScadaLink.Commons.Types;
+
+namespace ScadaLink.Commons.Interfaces;
+
+/// <summary>
+/// Site-local source of truth for cached-operation tracking
+/// (<c>ExternalSystem.CachedCall</c> / <c>Database.CachedWrite</c>) — alongside the
+/// Store-and-Forward buffer, this is the row that <c>Tracking.Status(id)</c>
+/// reads (Audit Log #23 / M3). One row per <see cref="TrackedOperationId"/>;
+/// terminal rows are purged after a configurable retention window
+/// (default 7 days).
+/// </summary>
+/// <remarks>
+/// <para>
+/// The store is intentionally a thin write-API on top of SQLite — not a
+/// dispatcher. Status transitions follow
+/// <c>Submitted → Retrying → Delivered / Parked / Failed / Discarded</c>; rows
+/// in a terminal state never roll back. Implementations must:
+/// <list type="bullet">
+/// <item><see cref="RecordEnqueueAsync"/> is insert-if-not-exists
+/// (caller-supplied id is the idempotency key — duplicate enqueues are no-ops).</item>
+/// <item><see cref="RecordAttemptAsync"/> only updates non-terminal rows.</item>
+/// <item><see cref="RecordTerminalAsync"/> only flips a non-terminal row to terminal.</item>
+/// <item><see cref="PurgeTerminalAsync"/> deletes terminal rows whose
+/// <c>TerminalAtUtc</c> is strictly older than the supplied threshold.</item>
+/// </list>
+/// </para>
+/// </remarks>
+public interface IOperationTrackingStore
+{
+    /// <summary>
+    /// Insert a new tracking row in <c>Submitted</c> state with <c>RetryCount = 0</c>.
+    /// Idempotent — a duplicate id is silently ignored (the existing row is left
+    /// untouched), matching the at-least-once semantics of the calling site
+    /// store-and-forward path.
+    /// </summary>
+    Task RecordEnqueueAsync(
+        TrackedOperationId id,
+        string kind,
+        string? targetSummary,
+        string? sourceInstanceId,
+        string? sourceScript,
+        CancellationToken ct = default);
+
+    /// <summary>
+    /// Advance an in-flight tracking row's status, retry counter, and most-
+    /// recent error/HTTP-status. Terminal rows (<see cref="RecordTerminalAsync"/>
+    /// already applied) are NOT mutated — the operation has reached its final
+    /// outcome and any late-arriving attempt telemetry is dropped on the floor.
+    /// </summary>
+    Task RecordAttemptAsync(
+        TrackedOperationId id,
+        string status,
+        int retryCount,
+        string? lastError,
+        int? httpStatus,
+        CancellationToken ct = default);
+
+    /// <summary>
+    /// Flip a non-terminal tracking row to terminal — sets
+    /// <c>TerminalAtUtc = now</c> and writes the final status / error. A row
+    /// already in terminal state is left untouched (first-write-wins).
+    /// </summary>
+    Task RecordTerminalAsync(
+        TrackedOperationId id,
+        string status,
+        string? lastError,
+        int? httpStatus,
+        CancellationToken ct = default);
+
+    /// <summary>
+    /// Return the latest snapshot for the supplied id, or <c>null</c> when no
+    /// tracking row exists (purged or never recorded).
+    /// </summary>
+    Task<TrackingStatusSnapshot?> GetStatusAsync(
+        TrackedOperationId id,
+        CancellationToken ct = default);
+
+    /// <summary>
+    /// Delete terminal rows whose <c>TerminalAtUtc</c> is strictly older than
+    /// <paramref name="olderThanUtc"/>. Non-terminal rows are kept regardless
+    /// of age (the operation is still in flight).
+    /// </summary>
+    Task PurgeTerminalAsync(
+        DateTime olderThanUtc,
+        CancellationToken ct = default);
+}
diff --git a/src/ScadaLink.Commons/Types/TrackingStatusSnapshot.cs b/src/ScadaLink.Commons/Types/TrackingStatusSnapshot.cs
new file mode 100644
index 0000000..22136ff
--- /dev/null
+++ b/src/ScadaLink.Commons/Types/TrackingStatusSnapshot.cs
@@ -0,0 +1,40 @@
+namespace ScadaLink.Commons.Types;
+
+/// <summary>
+/// Site-local snapshot of a cached operation's tracking state, returned by the
+/// <c>Tracking.Status(TrackedOperationId)</c> script API (Audit Log #23 / M3).
+/// </summary>
+/// <param name="Id">Tracking handle returned by CachedCall/CachedWrite.</param>
+/// <param name="Kind">
+/// Operation category — <c>"ApiCallCached"</c> or <c>"DbWriteCached"</c> — mirroring
+/// the per-attempt vocabulary.
+/// </param>
+/// <param name="TargetSummary">
+/// Human-readable target (e.g. <c>"ERP.GetOrder"</c> or <c>"WarehouseDb"</c>); may be
+/// null for early-lifecycle rows recorded before the target was resolved.
+/// </param>
+/// <param name="Status">
+/// Lifecycle status — one of <c>Submitted</c>, <c>Forwarded</c>, <c>Retrying</c>,
+/// <c>Attempted</c>, <c>Delivered</c>, <c>Failed</c>, <c>Parked</c>, <c>Discarded</c>.
+/// </param>
+/// <param name="RetryCount">Number of attempts made; 0 prior to first dispatch.</param>
+/// <param name="LastError">Most recent error message; null while non-terminal-and-no-failures.</param>
+/// <param name="HttpStatus">Most recent HTTP status code where applicable; null otherwise.</param>
+/// <param name="CreatedAtUtc">UTC timestamp the tracking row was first recorded.</param>
+/// <param name="UpdatedAtUtc">UTC timestamp of the latest status mutation.</param>
+/// <param name="TerminalAtUtc">UTC timestamp the row reached a terminal status; null while still active.</param>
+/// <param name="SourceInstanceId">Instance id that issued the cached call, when known.</param>
+/// <param name="SourceScript">Script that issued the cached call, when known.</param>
+public sealed record TrackingStatusSnapshot(
+    TrackedOperationId Id,
+    string Kind,
+    string? TargetSummary,
+    string Status,
+    int RetryCount,
+    string? LastError,
+    int? HttpStatus,
+    DateTime CreatedAtUtc,
+    DateTime UpdatedAtUtc,
+    DateTime? TerminalAtUtc,
+    string? SourceInstanceId,
+    string? SourceScript);
diff --git a/src/ScadaLink.SiteRuntime/Tracking/OperationTrackingOptions.cs b/src/ScadaLink.SiteRuntime/Tracking/OperationTrackingOptions.cs
new file mode 100644
index 0000000..b2fcf7d
--- /dev/null
+++ b/src/ScadaLink.SiteRuntime/Tracking/OperationTrackingOptions.cs
@@ -0,0 +1,21 @@
+namespace ScadaLink.SiteRuntime.Tracking;
+
+/// <summary>
+/// Options for <see cref="OperationTrackingStore"/> — site-local cached-call
+/// tracking SQLite store (Audit Log #23 / M3).
+/// </summary>
+public class OperationTrackingOptions
+{
+    /// <summary>
+    /// Full ADO.NET connection string for the SQLite database (e.g.
+    /// <c>"Data Source=site-tracking.db"</c>). Tests use the
+    /// <c>Mode=Memory;Cache=Shared</c> form to keep the database in-memory.
+    /// </summary>
+    public string ConnectionString { get; set; } = "Data Source=site-tracking.db";
+
+    /// <summary>
+    /// Retention window for terminal tracking rows. The default purge cadence
+    /// (driven by the host) deletes terminal rows older than this many days.
+    /// </summary>
+    public int RetentionDays { get; set; } = 7;
+}
diff --git a/src/ScadaLink.SiteRuntime/Tracking/OperationTrackingStore.cs b/src/ScadaLink.SiteRuntime/Tracking/OperationTrackingStore.cs
new file mode 100644
index 0000000..8ef0d2f
--- /dev/null
+++ b/src/ScadaLink.SiteRuntime/Tracking/OperationTrackingStore.cs
@@ -0,0 +1,387 @@
+using System.Globalization;
+using Microsoft.Data.Sqlite;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using ScadaLink.Commons.Interfaces;
+using ScadaLink.Commons.Types;
+
+namespace ScadaLink.SiteRuntime.Tracking;
+
+/// <summary>
+/// Site-local SQLite source-of-truth for cached-operation tracking — the row
+/// that <c>Tracking.Status(TrackedOperationId)</c> reads (Audit Log #23 / M3).
+/// </summary>
+/// <remarks>
+/// <para>
+/// One row per <see cref="TrackedOperationId"/>; lifecycle is
+/// <c>Submitted → Retrying → Delivered / Parked / Failed / Discarded</c>; terminal
+/// rows are purged after the configured retention window
+/// (<see cref="OperationTrackingOptions.RetentionDays"/>). Volume is bounded —
+/// only cached calls produce rows, and only a handful of lifecycle events per
+/// call — so we keep the implementation deliberately simple: a single owned
+/// <see cref="SqliteConnection"/> serialised behind a <see cref="SemaphoreSlim"/>
+/// (one async writer at a time). This is the pattern the M3 brief calls out as
+/// "cleaner than the M2 Channel&lt;T&gt; pipeline given the volume"; the M2
+/// audit-writer's batched-channel design is reserved for the high-volume audit
+/// hot-path.
+/// </para>
+/// <para>
+/// All mutations are idempotent / monotonic: <see cref="RecordEnqueueAsync"/> is
+/// <c>INSERT OR IGNORE</c>, <see cref="RecordAttemptAsync"/> filters out terminal
+/// rows in the WHERE clause, and <see cref="RecordTerminalAsync"/> only
+/// fires on rows that haven't terminated yet (first-write-wins). This makes the
+/// store safe under the at-least-once semantics of the site→central telemetry
+/// path.
+/// </para>
+/// <para>
+/// Disposal is idempotent: the first <see cref="Dispose"/> or
+/// <see cref="DisposeAsync"/> call closes the connection, later calls are
+/// no-ops, and any operation started afterwards throws
+/// <see cref="ObjectDisposedException"/>.
+/// </para>
+/// </remarks>
+public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable, IDisposable
+{
+    private readonly SqliteConnection _connection;
+    private readonly SemaphoreSlim _gate = new(1, 1);
+    private readonly ILogger<OperationTrackingStore> _logger;
+
+    // Set under _gate once the connection has been closed.
+    private bool _disposed;
+
+    // 0 = live, 1 = a Dispose/DisposeAsync call has claimed teardown.
+    private int _disposeRequested;
+
+    public OperationTrackingStore(
+        IOptions<OperationTrackingOptions> options,
+        ILogger<OperationTrackingStore> logger)
+    {
+        ArgumentNullException.ThrowIfNull(options);
+        ArgumentNullException.ThrowIfNull(logger);
+
+        _logger = logger;
+        _connection = new SqliteConnection(options.Value.ConnectionString);
+        try
+        {
+            _connection.Open();
+            InitializeSchema();
+        }
+        catch
+        {
+            // Do not leak the native handle when open / schema creation fails.
+            _connection.Dispose();
+            throw;
+        }
+    }
+
+    private void InitializeSchema()
+    {
+        using var cmd = _connection.CreateCommand();
+        cmd.CommandText = """
+            CREATE TABLE IF NOT EXISTS OperationTracking (
+                TrackedOperationId TEXT NOT NULL PRIMARY KEY,
+                Kind TEXT NOT NULL,
+                TargetSummary TEXT NULL,
+                Status TEXT NOT NULL,
+                RetryCount INTEGER NOT NULL DEFAULT 0,
+                LastError TEXT NULL,
+                HttpStatus INTEGER NULL,
+                CreatedAtUtc TEXT NOT NULL,
+                UpdatedAtUtc TEXT NOT NULL,
+                TerminalAtUtc TEXT NULL,
+                SourceInstanceId TEXT NULL,
+                SourceScript TEXT NULL
+            );
+            CREATE INDEX IF NOT EXISTS IX_OperationTracking_Status_Updated
+                ON OperationTracking (Status, UpdatedAtUtc);
+            """;
+        cmd.ExecuteNonQuery();
+    }
+
+    /// <inheritdoc/>
+    public async Task RecordEnqueueAsync(
+        TrackedOperationId id,
+        string kind,
+        string? targetSummary,
+        string? sourceInstanceId,
+        string? sourceScript,
+        CancellationToken ct = default)
+    {
+        ArgumentNullException.ThrowIfNull(kind);
+
+        await _gate.WaitAsync(ct).ConfigureAwait(false);
+        try
+        {
+            ObjectDisposedException.ThrowIf(_disposed, this);
+
+            var now = DateTime.UtcNow.ToString("o", CultureInfo.InvariantCulture);
+
+            using var cmd = _connection.CreateCommand();
+            // INSERT OR IGNORE: duplicate ids are no-ops (first-write-wins) —
+            // matches the at-least-once semantics the site emits under.
+            cmd.CommandText = """
+                INSERT OR IGNORE INTO OperationTracking (
+                    TrackedOperationId, Kind, TargetSummary, Status,
+                    RetryCount, LastError, HttpStatus,
+                    CreatedAtUtc, UpdatedAtUtc, TerminalAtUtc,
+                    SourceInstanceId, SourceScript
+                ) VALUES (
+                    $id, $kind, $targetSummary, $status,
+                    0, NULL, NULL,
+                    $now, $now, NULL,
+                    $sourceInstanceId, $sourceScript
+                );
+                """;
+            cmd.Parameters.AddWithValue("$id", id.ToString());
+            cmd.Parameters.AddWithValue("$kind", kind);
+            cmd.Parameters.AddWithValue("$targetSummary", (object?)targetSummary ?? DBNull.Value);
+            cmd.Parameters.AddWithValue("$status", "Submitted");
+            cmd.Parameters.AddWithValue("$now", now);
+            cmd.Parameters.AddWithValue("$sourceInstanceId", (object?)sourceInstanceId ?? DBNull.Value);
+            cmd.Parameters.AddWithValue("$sourceScript", (object?)sourceScript ?? DBNull.Value);
+
+            cmd.ExecuteNonQuery();
+        }
+        finally
+        {
+            _gate.Release();
+        }
+    }
+
+    /// <inheritdoc/>
+    public async Task RecordAttemptAsync(
+        TrackedOperationId id,
+        string status,
+        int retryCount,
+        string? lastError,
+        int? httpStatus,
+        CancellationToken ct = default)
+    {
+        ArgumentNullException.ThrowIfNull(status);
+
+        await _gate.WaitAsync(ct).ConfigureAwait(false);
+        try
+        {
+            ObjectDisposedException.ThrowIf(_disposed, this);
+
+            var now = DateTime.UtcNow.ToString("o", CultureInfo.InvariantCulture);
+
+            using var cmd = _connection.CreateCommand();
+            // Terminal rows are immutable — the WHERE clause filters them out so
+            // late-arriving attempt telemetry never overwrites a resolved row.
+            cmd.CommandText = """
+                UPDATE OperationTracking
+                SET Status = $status,
+                    RetryCount = $retryCount,
+                    LastError = $lastError,
+                    HttpStatus = $httpStatus,
+                    UpdatedAtUtc = $now
+                WHERE TrackedOperationId = $id
+                  AND TerminalAtUtc IS NULL;
+                """;
+            cmd.Parameters.AddWithValue("$id", id.ToString());
+            cmd.Parameters.AddWithValue("$status", status);
+            cmd.Parameters.AddWithValue("$retryCount", retryCount);
+            cmd.Parameters.AddWithValue("$lastError", (object?)lastError ?? DBNull.Value);
+            cmd.Parameters.AddWithValue("$httpStatus", (object?)httpStatus ?? DBNull.Value);
+            cmd.Parameters.AddWithValue("$now", now);
+
+            cmd.ExecuteNonQuery();
+        }
+        finally
+        {
+            _gate.Release();
+        }
+    }
+
+    /// <inheritdoc/>
+    public async Task RecordTerminalAsync(
+        TrackedOperationId id,
+        string status,
+        string? lastError,
+        int? httpStatus,
+        CancellationToken ct = default)
+    {
+        ArgumentNullException.ThrowIfNull(status);
+
+        await _gate.WaitAsync(ct).ConfigureAwait(false);
+        try
+        {
+            ObjectDisposedException.ThrowIf(_disposed, this);
+
+            var now = DateTime.UtcNow.ToString("o", CultureInfo.InvariantCulture);
+
+            using var cmd = _connection.CreateCommand();
+            // First-write-wins on the terminal flip: only update rows that
+            // haven't already terminated.
+            cmd.CommandText = """
+                UPDATE OperationTracking
+                SET Status = $status,
+                    LastError = $lastError,
+                    HttpStatus = $httpStatus,
+                    UpdatedAtUtc = $now,
+                    TerminalAtUtc = $now
+                WHERE TrackedOperationId = $id
+                  AND TerminalAtUtc IS NULL;
+                """;
+            cmd.Parameters.AddWithValue("$id", id.ToString());
+            cmd.Parameters.AddWithValue("$status", status);
+            cmd.Parameters.AddWithValue("$lastError", (object?)lastError ?? DBNull.Value);
+            cmd.Parameters.AddWithValue("$httpStatus", (object?)httpStatus ?? DBNull.Value);
+            cmd.Parameters.AddWithValue("$now", now);
+
+            cmd.ExecuteNonQuery();
+        }
+        finally
+        {
+            _gate.Release();
+        }
+    }
+
+    /// <inheritdoc/>
+    public async Task<TrackingStatusSnapshot?> GetStatusAsync(
+        TrackedOperationId id,
+        CancellationToken ct = default)
+    {
+        await _gate.WaitAsync(ct).ConfigureAwait(false);
+        try
+        {
+            ObjectDisposedException.ThrowIf(_disposed, this);
+
+            using var cmd = _connection.CreateCommand();
+            cmd.CommandText = """
+                SELECT TrackedOperationId, Kind, TargetSummary, Status,
+                       RetryCount, LastError, HttpStatus,
+                       CreatedAtUtc, UpdatedAtUtc, TerminalAtUtc,
+                       SourceInstanceId, SourceScript
+                FROM OperationTracking
+                WHERE TrackedOperationId = $id;
+                """;
+            cmd.Parameters.AddWithValue("$id", id.ToString());
+
+            using var reader = cmd.ExecuteReader();
+            if (!reader.Read())
+            {
+                return null;
+            }
+
+            return new TrackingStatusSnapshot(
+                Id: TrackedOperationId.Parse(reader.GetString(0)),
+                Kind: reader.GetString(1),
+                TargetSummary: reader.IsDBNull(2) ? null : reader.GetString(2),
+                Status: reader.GetString(3),
+                RetryCount: reader.GetInt32(4),
+                LastError: reader.IsDBNull(5) ? null : reader.GetString(5),
+                HttpStatus: reader.IsDBNull(6) ? null : reader.GetInt32(6),
+                CreatedAtUtc: ParseUtc(reader.GetString(7)),
+                UpdatedAtUtc: ParseUtc(reader.GetString(8)),
+                TerminalAtUtc: reader.IsDBNull(9) ? null : ParseUtc(reader.GetString(9)),
+                SourceInstanceId: reader.IsDBNull(10) ? null : reader.GetString(10),
+                SourceScript: reader.IsDBNull(11) ? null : reader.GetString(11));
+        }
+        finally
+        {
+            _gate.Release();
+        }
+    }
+
+    /// <inheritdoc/>
+    public async Task PurgeTerminalAsync(
+        DateTime olderThanUtc,
+        CancellationToken ct = default)
+    {
+        // Stored timestamps are UTC round-trip ("o") strings that SQLite compares
+        // as text, so the threshold has to be rendered in UTC as well: Local
+        // values are converted, Unspecified values are taken to already be UTC.
+        var thresholdUtc = olderThanUtc.Kind == DateTimeKind.Local
+            ? olderThanUtc.ToUniversalTime()
+            : DateTime.SpecifyKind(olderThanUtc, DateTimeKind.Utc);
+
+        await _gate.WaitAsync(ct).ConfigureAwait(false);
+        try
+        {
+            ObjectDisposedException.ThrowIf(_disposed, this);
+
+            using var cmd = _connection.CreateCommand();
+            // Non-terminal rows (TerminalAtUtc IS NULL) are kept regardless of
+            // age — the operation is still in flight.
+            cmd.CommandText = """
+                DELETE FROM OperationTracking
+                WHERE TerminalAtUtc IS NOT NULL
+                  AND TerminalAtUtc < $threshold;
+                """;
+            cmd.Parameters.AddWithValue(
+                "$threshold",
+                thresholdUtc.ToString("o", CultureInfo.InvariantCulture));
+
+            cmd.ExecuteNonQuery();
+        }
+        finally
+        {
+            _gate.Release();
+        }
+    }
+
+    private static DateTime ParseUtc(string raw)
+    {
+        return DateTime.Parse(
+            raw,
+            CultureInfo.InvariantCulture,
+            DateTimeStyles.RoundtripKind);
+    }
+
+    /// <summary>
+    /// Closes the SQLite connection after any in-flight operation has released
+    /// the gate. Safe to call repeatedly and alongside <see cref="DisposeAsync"/>.
+    /// </summary>
+    public void Dispose()
+    {
+        // Claim teardown exactly once so repeated Dispose/DisposeAsync calls
+        // are no-ops instead of re-entering a torn-down store.
+        if (Interlocked.Exchange(ref _disposeRequested, 1) != 0)
+        {
+            return;
+        }
+
+        // Blocking wait (not sync-over-async): operations hold the gate only
+        // for a short synchronous SQLite call.
+        _gate.Wait();
+        try
+        {
+            _disposed = true;
+            _connection.Dispose();
+        }
+        finally
+        {
+            // The gate itself is deliberately left undisposed so callers racing
+            // with teardown observe ObjectDisposedException from this store.
+            _gate.Release();
+        }
+
+        GC.SuppressFinalize(this);
+    }
+
+    /// <summary>
+    /// Asynchronous counterpart of <see cref="Dispose"/>; equally idempotent.
+    /// </summary>
+    public async ValueTask DisposeAsync()
+    {
+        if (Interlocked.Exchange(ref _disposeRequested, 1) != 0)
+        {
+            return;
+        }
+
+        await _gate.WaitAsync().ConfigureAwait(false);
+        try
+        {
+            _disposed = true;
+            await _connection.DisposeAsync().ConfigureAwait(false);
+        }
+        finally
+        {
+            _gate.Release();
+        }
+
+        GC.SuppressFinalize(this);
+    }
+}
diff --git a/tests/ScadaLink.SiteRuntime.Tests/Tracking/OperationTrackingStoreTests.cs b/tests/ScadaLink.SiteRuntime.Tests/Tracking/OperationTrackingStoreTests.cs
new file mode 100644
index 0000000..952d7dc
--- /dev/null
+++ b/tests/ScadaLink.SiteRuntime.Tests/Tracking/OperationTrackingStoreTests.cs
@@ -0,0 +1,319 @@
+using Microsoft.Data.Sqlite;
+using Microsoft.Extensions.Logging.Abstractions;
+using Microsoft.Extensions.Options;
+using ScadaLink.Commons.Types;
+using ScadaLink.Commons.Types.Enums;
+using ScadaLink.SiteRuntime.Tracking;
+
+namespace ScadaLink.SiteRuntime.Tests.Tracking;
+
+/// <summary>
+/// Audit Log #23 (M3 Bundle A — Task A2) — schema + behaviour tests for the
+/// site-local <see cref="OperationTrackingStore"/>. Each test uses a unique
+/// shared-cache in-memory SQLite database so the store and the verifier share
+/// the same store without touching disk.
+/// </summary>
+public class OperationTrackingStoreTests
+{
+    private static (OperationTrackingStore store, string dataSource) CreateStore(
+        string testName)
+    {
+        var dataSource = $"file:{testName}-{Guid.NewGuid():N}?mode=memory&cache=shared";
+        var connectionString = $"Data Source={dataSource};Cache=Shared";
+        var options = new OperationTrackingOptions
+        {
+            ConnectionString = connectionString,
+        };
+        var store = new OperationTrackingStore(
+            Options.Create(options),
+            NullLogger<OperationTrackingStore>.Instance);
+        return (store, dataSource);
+    }
+
+    private static SqliteConnection OpenVerifierConnection(string dataSource)
+    {
+        var connection = new SqliteConnection($"Data Source={dataSource};Cache=Shared");
+        connection.Open();
+        return connection;
+    }
+
+    [Fact]
+    public void Constructor_CreatesOperationTracking_SchemaOnFirstUse()
+    {
+        var (store, dataSource) = CreateStore(nameof(Constructor_CreatesOperationTracking_SchemaOnFirstUse));
+        using (store)
+        {
+            using var connection = OpenVerifierConnection(dataSource);
+            using var cmd = connection.CreateCommand();
+            cmd.CommandText = "PRAGMA table_info(OperationTracking);";
+            using var reader = cmd.ExecuteReader();
+
+            var columns = new List<(string Name, int Pk, int NotNull)>();
+            while (reader.Read())
+            {
+                columns.Add((reader.GetString(1), reader.GetInt32(5), reader.GetInt32(3)));
+            }
+
+            var expected = new[]
+            {
+                "TrackedOperationId", "Kind", "TargetSummary", "Status",
+                "RetryCount", "LastError", "HttpStatus", "CreatedAtUtc",
+                "UpdatedAtUtc", "TerminalAtUtc", "SourceInstanceId", "SourceScript",
+            };
+            Assert.Equal(
+                expected.OrderBy(n => n),
+                columns.Select(c => c.Name).OrderBy(n => n));
+
+            var pkColumns = columns.Where(c => c.Pk > 0).Select(c => c.Name).ToList();
+            Assert.Single(pkColumns);
+            Assert.Equal("TrackedOperationId", pkColumns[0]);
+        }
+    }
+
+    [Fact]
+    public async Task RecordEnqueueAsync_InsertsSubmittedRow_WithRetryCountZero()
+    {
+        var (store, dataSource) = CreateStore(nameof(RecordEnqueueAsync_InsertsSubmittedRow_WithRetryCountZero));
+        await using var _ = store;
+
+        var id = TrackedOperationId.New();
+        await store.RecordEnqueueAsync(
+            id,
+            kind: nameof(AuditKind.ApiCallCached),
+            targetSummary: "ERP.GetOrder",
+            sourceInstanceId: "Plant.Pump42",
+            sourceScript: "ScriptActor:OnTick");
+
+        var snapshot = await store.GetStatusAsync(id);
+        Assert.NotNull(snapshot);
+        Assert.Equal(id, snapshot!.Id);
+        Assert.Equal(nameof(AuditKind.ApiCallCached), snapshot.Kind);
+        Assert.Equal("ERP.GetOrder", snapshot.TargetSummary);
+        Assert.Equal(nameof(AuditStatus.Submitted), snapshot.Status);
+        Assert.Equal(0, snapshot.RetryCount);
+        Assert.Null(snapshot.LastError);
+        Assert.Null(snapshot.HttpStatus);
+        Assert.Null(snapshot.TerminalAtUtc);
+        Assert.Equal("Plant.Pump42", snapshot.SourceInstanceId);
+        Assert.Equal("ScriptActor:OnTick", snapshot.SourceScript);
+        Assert.Equal(DateTimeKind.Utc, snapshot.CreatedAtUtc.Kind);
+        Assert.Equal(DateTimeKind.Utc, snapshot.UpdatedAtUtc.Kind);
+    }
+
+    [Fact]
+    public async Task RecordEnqueueAsync_Duplicate_IsNoOp_FirstWriteWins()
+    {
+        var (store, _) = CreateStore(nameof(RecordEnqueueAsync_Duplicate_IsNoOp_FirstWriteWins));
+        await using var _store = store;
+
+        var id = TrackedOperationId.New();
+        await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", "Plant.Pump42", "ScriptActor:OnTick");
+        await store.RecordEnqueueAsync(id, "ApiCallCached", "OtherTarget", "Other.Instance", "ScriptActor:Other");
+
+        var snapshot = await store.GetStatusAsync(id);
+        Assert.NotNull(snapshot);
+        // First-write-wins: the second enqueue is ignored — Target/Source stay first.
+        Assert.Equal("ERP.GetOrder", snapshot!.TargetSummary);
+        Assert.Equal("Plant.Pump42", snapshot.SourceInstanceId);
+        Assert.Equal("ScriptActor:OnTick", snapshot.SourceScript);
+        Assert.Equal(nameof(AuditStatus.Submitted), snapshot.Status);
+        Assert.Equal(0, snapshot.RetryCount);
+    }
+
+    [Fact]
+    public async Task RecordAttemptAsync_AdvancesStatusAndRetryCount_OnNonTerminalRow()
+    {
+        var (store, _) = CreateStore(nameof(RecordAttemptAsync_AdvancesStatusAndRetryCount_OnNonTerminalRow));
+        await using var _store = store;
+
+        var id = TrackedOperationId.New();
+        await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", null, null);
+
+        await store.RecordAttemptAsync(
+            id,
+            status: nameof(AuditStatus.Attempted),
+            retryCount: 1,
+            lastError: "HTTP 503 from ERP",
+            httpStatus: 503);
+
+        var snapshot = await store.GetStatusAsync(id);
+        Assert.NotNull(snapshot);
+        Assert.Equal(nameof(AuditStatus.Attempted), snapshot!.Status);
+        Assert.Equal(1, snapshot.RetryCount);
+        Assert.Equal("HTTP 503 from ERP", snapshot.LastError);
+        Assert.Equal(503, snapshot.HttpStatus);
+        Assert.Null(snapshot.TerminalAtUtc);
+
+        // UpdatedAtUtc never precedes CreatedAtUtc (a same-tick update is allowed).
+        Assert.True(snapshot.UpdatedAtUtc >= snapshot.CreatedAtUtc);
+    }
+
+    [Fact]
+    public async Task RecordAttemptAsync_OnTerminalRow_IsNoOp()
+    {
+        var (store, _) = CreateStore(nameof(RecordAttemptAsync_OnTerminalRow_IsNoOp));
+        await using var _store = store;
+
+        var id = TrackedOperationId.New();
+        await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", null, null);
+        await store.RecordTerminalAsync(
+            id,
+            status: nameof(AuditStatus.Delivered),
+            lastError: null,
+            httpStatus: 200);
+
+        var terminalSnapshot = await store.GetStatusAsync(id);
+        Assert.NotNull(terminalSnapshot);
+        Assert.NotNull(terminalSnapshot!.TerminalAtUtc);
+
+        // Late attempt telemetry must NOT overwrite the terminal row.
+        await store.RecordAttemptAsync(
+            id,
+            status: nameof(AuditStatus.Attempted),
+            retryCount: 5,
+            lastError: "late attempt",
+            httpStatus: 500);
+
+        var afterLate = await store.GetStatusAsync(id);
+        Assert.NotNull(afterLate);
+        Assert.Equal(nameof(AuditStatus.Delivered), afterLate!.Status);
+        Assert.Equal(0, afterLate.RetryCount);
+        Assert.Null(afterLate.LastError);
+        Assert.Equal(200, afterLate.HttpStatus);
+        Assert.NotNull(afterLate.TerminalAtUtc);
+    }
+
+    [Fact]
+    public async Task RecordTerminalAsync_FlipsToTerminal_WithTerminalAtUtcSet()
+    {
+        var (store, _) = CreateStore(nameof(RecordTerminalAsync_FlipsToTerminal_WithTerminalAtUtcSet));
+        await using var _store = store;
+
+        var id = TrackedOperationId.New();
+        await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", null, null);
+
+        var beforeTerminal = DateTime.UtcNow;
+        await store.RecordTerminalAsync(
+            id,
+            status: nameof(AuditStatus.Parked),
+            lastError: "HTTP 503 (max retries)",
+            httpStatus: 503);
+
+        var snapshot = await store.GetStatusAsync(id);
+        Assert.NotNull(snapshot);
+        Assert.Equal(nameof(AuditStatus.Parked), snapshot!.Status);
+        Assert.NotNull(snapshot.TerminalAtUtc);
+        Assert.Equal(DateTimeKind.Utc, snapshot.TerminalAtUtc!.Value.Kind);
+        Assert.True(snapshot.TerminalAtUtc >= beforeTerminal.AddSeconds(-1));
+        Assert.Equal("HTTP 503 (max retries)", snapshot.LastError);
+        Assert.Equal(503, snapshot.HttpStatus);
+    }
+
+    [Fact]
+    public async Task GetStatusAsync_Unknown_ReturnsNull()
+    {
+        var (store, _) = CreateStore(nameof(GetStatusAsync_Unknown_ReturnsNull));
+        await using var _store = store;
+
+        var unknown = TrackedOperationId.New();
+        var snapshot = await store.GetStatusAsync(unknown);
+
+        Assert.Null(snapshot);
+    }
+
+    [Fact]
+    public async Task GetStatusAsync_ReturnsLatestSnapshot_AfterMultipleAttempts()
+    {
+        var (store, _) = CreateStore(nameof(GetStatusAsync_ReturnsLatestSnapshot_AfterMultipleAttempts));
+        await using var _store = store;
+
+        var id = TrackedOperationId.New();
+        await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", null, null);
+        await store.RecordAttemptAsync(id, nameof(AuditStatus.Attempted), 1, "first failure", 503);
+        await store.RecordAttemptAsync(id, nameof(AuditStatus.Attempted), 2, "second failure", 503);
+        await store.RecordAttemptAsync(id, nameof(AuditStatus.Attempted), 3, "third failure", 504);
+
+        var snapshot = await store.GetStatusAsync(id);
+        Assert.NotNull(snapshot);
+        Assert.Equal(3, snapshot!.RetryCount);
+        Assert.Equal("third failure", snapshot.LastError);
+        Assert.Equal(504, snapshot.HttpStatus);
+    }
+
+    [Fact]
+    public async Task PurgeTerminalAsync_RemovesOldTerminalRows_KeepsRecent_KeepsNonTerminal()
+    {
+        var (store, dataSource) = CreateStore(nameof(PurgeTerminalAsync_RemovesOldTerminalRows_KeepsRecent_KeepsNonTerminal));
+        await using var _store = store;
+
+        // Three rows:
+        //   (a) terminal, old → should be purged
+        //   (b) terminal, fresh → should be kept
+        //   (c) non-terminal, ancient CreatedAt → should be kept (no TerminalAtUtc)
+        var aId = TrackedOperationId.New();
+        var bId = TrackedOperationId.New();
+        var cId = TrackedOperationId.New();
+
+        await store.RecordEnqueueAsync(aId, "ApiCallCached", "A", null, null);
+        await store.RecordEnqueueAsync(bId, "ApiCallCached", "B", null, null);
+        await store.RecordEnqueueAsync(cId, "ApiCallCached", "C", null, null);
+
+        await store.RecordTerminalAsync(aId, nameof(AuditStatus.Delivered), null, 200);
+        await store.RecordTerminalAsync(bId, nameof(AuditStatus.Delivered), null, 200);
+
+        // Backdate the (a) row's TerminalAtUtc to 30 days ago via a direct UPDATE
+        // — RecordTerminalAsync stamps DateTime.UtcNow which we cannot inject.
+        // The verifier connection shares the same in-memory store thanks to
+        // mode=memory&cache=shared.
+        using (var connection = OpenVerifierConnection(dataSource))
+        using (var cmd = connection.CreateCommand())
+        {
+            cmd.CommandText =
+                "UPDATE OperationTracking SET TerminalAtUtc = $old WHERE TrackedOperationId = $id;";
+            cmd.Parameters.AddWithValue("$old", DateTime.UtcNow.AddDays(-30).ToString("o"));
+            cmd.Parameters.AddWithValue("$id", aId.ToString());
+            cmd.ExecuteNonQuery();
+        }
+
+        // Purge anything terminal older than 7 days.
+        var threshold = DateTime.UtcNow.AddDays(-7);
+        await store.PurgeTerminalAsync(threshold);
+
+        Assert.Null(await store.GetStatusAsync(aId)); // purged
+        Assert.NotNull(await store.GetStatusAsync(bId)); // kept (recent terminal)
+        Assert.NotNull(await store.GetStatusAsync(cId)); // kept (non-terminal)
+    }
+
+    [Fact]
+    public async Task PurgeTerminalAsync_LocalKindThreshold_IsComparedInUtc()
+    {
+        var (store, _) = CreateStore(nameof(PurgeTerminalAsync_LocalKindThreshold_IsComparedInUtc));
+        await using var _store = store;
+
+        var id = TrackedOperationId.New();
+        await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", null, null);
+        await store.RecordTerminalAsync(id, nameof(AuditStatus.Delivered), null, 200);
+
+        // A Local-kind threshold just before "now" must keep the fresh row,
+        // whatever the machine's UTC offset is.
+        await store.PurgeTerminalAsync(DateTime.UtcNow.AddMinutes(-5).ToLocalTime());
+        Assert.NotNull(await store.GetStatusAsync(id));
+
+        // ...and one just after "now" must purge it.
+        await store.PurgeTerminalAsync(DateTime.UtcNow.AddMinutes(5).ToLocalTime());
+        Assert.Null(await store.GetStatusAsync(id));
+    }
+
+    [Fact]
+    public async Task Dispose_IsIdempotent_AndLaterCallsThrowObjectDisposed()
+    {
+        var (store, _) = CreateStore(nameof(Dispose_IsIdempotent_AndLaterCallsThrowObjectDisposed));
+
+        store.Dispose();
+        store.Dispose(); // second call must be a no-op
+        await store.DisposeAsync(); // mixing sync and async disposal is safe too
+
+        await Assert.ThrowsAsync<ObjectDisposedException>(
+            () => store.GetStatusAsync(TrackedOperationId.New()));
+    }
+}