feat(siteruntime): OperationTrackingStore site-local SQLite (#23 M3)
This commit is contained in:
87
src/ScadaLink.Commons/Interfaces/IOperationTrackingStore.cs
Normal file
87
src/ScadaLink.Commons/Interfaces/IOperationTrackingStore.cs
Normal file
@@ -0,0 +1,87 @@
|
||||
using ScadaLink.Commons.Types;
|
||||
|
||||
namespace ScadaLink.Commons.Interfaces;
|
||||
|
||||
/// <summary>
|
||||
/// Site-local source of truth for cached-operation tracking
|
||||
/// (<c>ExternalSystem.CachedCall</c> / <c>Database.CachedWrite</c>) — alongside the
|
||||
/// Store-and-Forward buffer, this is the row that <c>Tracking.Status(id)</c>
|
||||
/// reads (Audit Log #23 / M3). One row per <see cref="TrackedOperationId"/>;
|
||||
/// terminal rows are purged after a configurable retention window
|
||||
/// (default 7 days).
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// The store is intentionally a thin write-API on top of SQLite — not a
|
||||
/// dispatcher. Status transitions follow
|
||||
/// <c>Submitted → Retrying → Delivered / Parked / Failed / Discarded</c>; rows
|
||||
/// in a terminal state never roll back. Implementations must:
|
||||
/// <list type="bullet">
|
||||
/// <item><description><see cref="RecordEnqueueAsync"/> is insert-if-not-exists
|
||||
/// (caller-supplied id is the idempotency key — duplicate enqueues are no-ops).</description></item>
|
||||
/// <item><description><see cref="RecordAttemptAsync"/> only updates non-terminal rows.</description></item>
|
||||
/// <item><description><see cref="RecordTerminalAsync"/> only flips a non-terminal row to terminal.</description></item>
|
||||
/// <item><description><see cref="PurgeTerminalAsync"/> deletes terminal rows whose
|
||||
/// <c>TerminalAtUtc</c> is strictly older than the supplied threshold.</description></item>
|
||||
/// </list>
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public interface IOperationTrackingStore
|
||||
{
|
||||
/// <summary>
|
||||
/// Insert a new tracking row in <c>Submitted</c> state with <c>RetryCount = 0</c>.
|
||||
/// Idempotent — a duplicate id is silently ignored (the existing row is left
|
||||
/// untouched), matching the at-least-once semantics of the calling site
|
||||
/// store-and-forward path.
|
||||
/// </summary>
|
||||
Task RecordEnqueueAsync(
|
||||
TrackedOperationId id,
|
||||
string kind,
|
||||
string? targetSummary,
|
||||
string? sourceInstanceId,
|
||||
string? sourceScript,
|
||||
CancellationToken ct = default);
|
||||
|
||||
/// <summary>
|
||||
/// Advance an in-flight tracking row's status, retry counter, and most-
|
||||
/// recent error/HTTP-status. Terminal rows (<see cref="RecordTerminalAsync"/>
|
||||
/// already applied) are NOT mutated — the operation has reached its final
|
||||
/// outcome and any late-arriving attempt telemetry is dropped on the floor.
|
||||
/// </summary>
|
||||
Task RecordAttemptAsync(
|
||||
TrackedOperationId id,
|
||||
string status,
|
||||
int retryCount,
|
||||
string? lastError,
|
||||
int? httpStatus,
|
||||
CancellationToken ct = default);
|
||||
|
||||
/// <summary>
|
||||
/// Flip a non-terminal tracking row to terminal — sets
|
||||
/// <c>TerminalAtUtc = now</c> and writes the final status / error. A row
|
||||
/// already in terminal state is left untouched (first-write-wins).
|
||||
/// </summary>
|
||||
Task RecordTerminalAsync(
|
||||
TrackedOperationId id,
|
||||
string status,
|
||||
string? lastError,
|
||||
int? httpStatus,
|
||||
CancellationToken ct = default);
|
||||
|
||||
/// <summary>
|
||||
/// Return the latest snapshot for the supplied id, or <c>null</c> when no
|
||||
/// tracking row exists (purged or never recorded).
|
||||
/// </summary>
|
||||
Task<TrackingStatusSnapshot?> GetStatusAsync(
|
||||
TrackedOperationId id,
|
||||
CancellationToken ct = default);
|
||||
|
||||
/// <summary>
|
||||
/// Delete terminal rows whose <c>TerminalAtUtc</c> is strictly older than
|
||||
/// <paramref name="olderThanUtc"/>. Non-terminal rows are kept regardless
|
||||
/// of age (the operation is still in flight).
|
||||
/// </summary>
|
||||
Task PurgeTerminalAsync(
|
||||
DateTime olderThanUtc,
|
||||
CancellationToken ct = default);
|
||||
}
|
||||
40
src/ScadaLink.Commons/Types/TrackingStatusSnapshot.cs
Normal file
40
src/ScadaLink.Commons/Types/TrackingStatusSnapshot.cs
Normal file
@@ -0,0 +1,40 @@
|
||||
namespace ScadaLink.Commons.Types;
|
||||
|
||||
/// <summary>
|
||||
/// Site-local snapshot of a cached operation's tracking state, returned by the
|
||||
/// <c>Tracking.Status(TrackedOperationId)</c> script API (Audit Log #23 / M3).
|
||||
/// </summary>
|
||||
/// <param name="Id">Tracking handle returned by <c>CachedCall</c>/<c>CachedWrite</c>.</param>
|
||||
/// <param name="Kind">
|
||||
/// Operation category — <c>"ApiCallCached"</c> or <c>"DbWriteCached"</c> — mirroring
|
||||
/// the <see cref="ScadaLink.Commons.Types.Enums.AuditKind"/> per-attempt vocabulary.
|
||||
/// </param>
|
||||
/// <param name="TargetSummary">
|
||||
/// Human-readable target (e.g. <c>"ERP.GetOrder"</c> or <c>"WarehouseDb"</c>); may be
|
||||
/// null for early-lifecycle rows recorded before the target was resolved.
|
||||
/// </param>
|
||||
/// <param name="Status">
|
||||
/// Lifecycle status — one of <c>Submitted</c>, <c>Forwarded</c>, <c>Retrying</c>,
|
||||
/// <c>Attempted</c>, <c>Delivered</c>, <c>Failed</c>, <c>Parked</c>, <c>Discarded</c>.
|
||||
/// </param>
|
||||
/// <param name="RetryCount">Number of attempts made; 0 prior to first dispatch.</param>
|
||||
/// <param name="LastError">Most recent error message; null while non-terminal-and-no-failures.</param>
|
||||
/// <param name="HttpStatus">Most recent HTTP status code where applicable; null otherwise.</param>
|
||||
/// <param name="CreatedAtUtc">UTC timestamp the tracking row was first recorded.</param>
|
||||
/// <param name="UpdatedAtUtc">UTC timestamp of the latest status mutation.</param>
|
||||
/// <param name="TerminalAtUtc">UTC timestamp the row reached a terminal status; null while still active.</param>
|
||||
/// <param name="SourceInstanceId">Instance id that issued the cached call, when known.</param>
|
||||
/// <param name="SourceScript">Script that issued the cached call, when known.</param>
|
||||
public sealed record TrackingStatusSnapshot(
|
||||
TrackedOperationId Id,
|
||||
string Kind,
|
||||
string? TargetSummary,
|
||||
string Status,
|
||||
int RetryCount,
|
||||
string? LastError,
|
||||
int? HttpStatus,
|
||||
DateTime CreatedAtUtc,
|
||||
DateTime UpdatedAtUtc,
|
||||
DateTime? TerminalAtUtc,
|
||||
string? SourceInstanceId,
|
||||
string? SourceScript);
|
||||
@@ -0,0 +1,21 @@
|
||||
namespace ScadaLink.SiteRuntime.Tracking;
|
||||
|
||||
/// <summary>
|
||||
/// Options for <see cref="OperationTrackingStore"/> — site-local cached-call
|
||||
/// tracking SQLite store (Audit Log #23 / M3).
|
||||
/// </summary>
|
||||
public class OperationTrackingOptions
|
||||
{
|
||||
/// <summary>
|
||||
/// Full ADO.NET connection string for the SQLite database (e.g.
|
||||
/// <c>"Data Source=site-tracking.db"</c>). Tests use the
|
||||
/// <c>Mode=Memory;Cache=Shared</c> form to keep the database in-memory.
|
||||
/// </summary>
|
||||
public string ConnectionString { get; set; } = "Data Source=site-tracking.db";
|
||||
|
||||
/// <summary>
|
||||
/// Retention window for terminal tracking rows. The default purge cadence
|
||||
/// (driven by the host) deletes terminal rows older than this many days.
|
||||
/// </summary>
|
||||
public int RetentionDays { get; set; } = 7;
|
||||
}
|
||||
333
src/ScadaLink.SiteRuntime/Tracking/OperationTrackingStore.cs
Normal file
333
src/ScadaLink.SiteRuntime/Tracking/OperationTrackingStore.cs
Normal file
@@ -0,0 +1,333 @@
|
||||
using System.Globalization;
|
||||
using Microsoft.Data.Sqlite;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using ScadaLink.Commons.Interfaces;
|
||||
using ScadaLink.Commons.Types;
|
||||
|
||||
namespace ScadaLink.SiteRuntime.Tracking;
|
||||
|
||||
/// <summary>
|
||||
/// Site-local SQLite source-of-truth for cached-operation tracking — the row
|
||||
/// that <c>Tracking.Status(TrackedOperationId)</c> reads (Audit Log #23 / M3).
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// One row per <see cref="TrackedOperationId"/>; lifecycle is
|
||||
/// <c>Submitted → Retrying → Delivered / Parked / Failed / Discarded</c>; terminal
|
||||
/// rows are purged after the configured retention window
|
||||
/// (<see cref="OperationTrackingOptions.RetentionDays"/>). Volume is bounded —
|
||||
/// only cached calls produce rows, and only a handful of lifecycle events per
|
||||
/// call — so we keep the implementation deliberately simple: a single owned
|
||||
/// <see cref="SqliteConnection"/> serialised behind a <see cref="SemaphoreSlim"/>
|
||||
/// (one async writer at a time). This is the pattern the M3 brief calls out as
|
||||
/// "cleaner than the M2 Channel<T> pipeline given the volume"; the M2
|
||||
/// audit-writer's batched-channel design is reserved for the high-volume audit
|
||||
/// hot-path.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// All mutations are idempotent / monotonic: <see cref="RecordEnqueueAsync"/> is
|
||||
/// <c>INSERT OR IGNORE</c>, <see cref="RecordAttemptAsync"/> filters out terminal
|
||||
/// rows in the <c>WHERE</c> clause, and <see cref="RecordTerminalAsync"/> only
|
||||
/// fires on rows that haven't terminated yet (first-write-wins). This makes the
|
||||
/// store safe under the at-least-once semantics of the site→central telemetry
|
||||
/// path.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable, IDisposable
|
||||
{
|
||||
private readonly SqliteConnection _connection;
|
||||
private readonly SemaphoreSlim _gate = new(1, 1);
|
||||
private readonly ILogger<OperationTrackingStore> _logger;
|
||||
private bool _disposed;
|
||||
|
||||
public OperationTrackingStore(
|
||||
IOptions<OperationTrackingOptions> options,
|
||||
ILogger<OperationTrackingStore> logger)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(options);
|
||||
ArgumentNullException.ThrowIfNull(logger);
|
||||
|
||||
_logger = logger;
|
||||
_connection = new SqliteConnection(options.Value.ConnectionString);
|
||||
_connection.Open();
|
||||
InitializeSchema();
|
||||
}
|
||||
|
||||
private void InitializeSchema()
|
||||
{
|
||||
using var cmd = _connection.CreateCommand();
|
||||
cmd.CommandText = """
|
||||
CREATE TABLE IF NOT EXISTS OperationTracking (
|
||||
TrackedOperationId TEXT NOT NULL PRIMARY KEY,
|
||||
Kind TEXT NOT NULL,
|
||||
TargetSummary TEXT NULL,
|
||||
Status TEXT NOT NULL,
|
||||
RetryCount INTEGER NOT NULL DEFAULT 0,
|
||||
LastError TEXT NULL,
|
||||
HttpStatus INTEGER NULL,
|
||||
CreatedAtUtc TEXT NOT NULL,
|
||||
UpdatedAtUtc TEXT NOT NULL,
|
||||
TerminalAtUtc TEXT NULL,
|
||||
SourceInstanceId TEXT NULL,
|
||||
SourceScript TEXT NULL
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS IX_OperationTracking_Status_Updated
|
||||
ON OperationTracking (Status, UpdatedAtUtc);
|
||||
""";
|
||||
cmd.ExecuteNonQuery();
|
||||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
public async Task RecordEnqueueAsync(
|
||||
TrackedOperationId id,
|
||||
string kind,
|
||||
string? targetSummary,
|
||||
string? sourceInstanceId,
|
||||
string? sourceScript,
|
||||
CancellationToken ct = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(kind);
|
||||
|
||||
await _gate.WaitAsync(ct).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
ObjectDisposedException.ThrowIf(_disposed, this);
|
||||
|
||||
var now = DateTime.UtcNow.ToString("o", CultureInfo.InvariantCulture);
|
||||
|
||||
using var cmd = _connection.CreateCommand();
|
||||
// INSERT OR IGNORE: duplicate ids are no-ops (first-write-wins) —
|
||||
// matches the at-least-once semantics the site emits under.
|
||||
cmd.CommandText = """
|
||||
INSERT OR IGNORE INTO OperationTracking (
|
||||
TrackedOperationId, Kind, TargetSummary, Status,
|
||||
RetryCount, LastError, HttpStatus,
|
||||
CreatedAtUtc, UpdatedAtUtc, TerminalAtUtc,
|
||||
SourceInstanceId, SourceScript
|
||||
) VALUES (
|
||||
$id, $kind, $targetSummary, $status,
|
||||
0, NULL, NULL,
|
||||
$now, $now, NULL,
|
||||
$sourceInstanceId, $sourceScript
|
||||
);
|
||||
""";
|
||||
cmd.Parameters.AddWithValue("$id", id.ToString());
|
||||
cmd.Parameters.AddWithValue("$kind", kind);
|
||||
cmd.Parameters.AddWithValue("$targetSummary", (object?)targetSummary ?? DBNull.Value);
|
||||
cmd.Parameters.AddWithValue("$status", "Submitted");
|
||||
cmd.Parameters.AddWithValue("$now", now);
|
||||
cmd.Parameters.AddWithValue("$sourceInstanceId", (object?)sourceInstanceId ?? DBNull.Value);
|
||||
cmd.Parameters.AddWithValue("$sourceScript", (object?)sourceScript ?? DBNull.Value);
|
||||
|
||||
cmd.ExecuteNonQuery();
|
||||
}
|
||||
finally
|
||||
{
|
||||
_gate.Release();
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
public async Task RecordAttemptAsync(
|
||||
TrackedOperationId id,
|
||||
string status,
|
||||
int retryCount,
|
||||
string? lastError,
|
||||
int? httpStatus,
|
||||
CancellationToken ct = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(status);
|
||||
|
||||
await _gate.WaitAsync(ct).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
ObjectDisposedException.ThrowIf(_disposed, this);
|
||||
|
||||
var now = DateTime.UtcNow.ToString("o", CultureInfo.InvariantCulture);
|
||||
|
||||
using var cmd = _connection.CreateCommand();
|
||||
// Terminal rows are immutable — the WHERE clause filters them out so
|
||||
// late-arriving attempt telemetry never overwrites a resolved row.
|
||||
cmd.CommandText = """
|
||||
UPDATE OperationTracking
|
||||
SET Status = $status,
|
||||
RetryCount = $retryCount,
|
||||
LastError = $lastError,
|
||||
HttpStatus = $httpStatus,
|
||||
UpdatedAtUtc = $now
|
||||
WHERE TrackedOperationId = $id
|
||||
AND TerminalAtUtc IS NULL;
|
||||
""";
|
||||
cmd.Parameters.AddWithValue("$id", id.ToString());
|
||||
cmd.Parameters.AddWithValue("$status", status);
|
||||
cmd.Parameters.AddWithValue("$retryCount", retryCount);
|
||||
cmd.Parameters.AddWithValue("$lastError", (object?)lastError ?? DBNull.Value);
|
||||
cmd.Parameters.AddWithValue("$httpStatus", (object?)httpStatus ?? DBNull.Value);
|
||||
cmd.Parameters.AddWithValue("$now", now);
|
||||
|
||||
cmd.ExecuteNonQuery();
|
||||
}
|
||||
finally
|
||||
{
|
||||
_gate.Release();
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
public async Task RecordTerminalAsync(
|
||||
TrackedOperationId id,
|
||||
string status,
|
||||
string? lastError,
|
||||
int? httpStatus,
|
||||
CancellationToken ct = default)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(status);
|
||||
|
||||
await _gate.WaitAsync(ct).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
ObjectDisposedException.ThrowIf(_disposed, this);
|
||||
|
||||
var now = DateTime.UtcNow.ToString("o", CultureInfo.InvariantCulture);
|
||||
|
||||
using var cmd = _connection.CreateCommand();
|
||||
// First-write-wins on the terminal flip: only update rows that
|
||||
// haven't already terminated.
|
||||
cmd.CommandText = """
|
||||
UPDATE OperationTracking
|
||||
SET Status = $status,
|
||||
LastError = $lastError,
|
||||
HttpStatus = $httpStatus,
|
||||
UpdatedAtUtc = $now,
|
||||
TerminalAtUtc = $now
|
||||
WHERE TrackedOperationId = $id
|
||||
AND TerminalAtUtc IS NULL;
|
||||
""";
|
||||
cmd.Parameters.AddWithValue("$id", id.ToString());
|
||||
cmd.Parameters.AddWithValue("$status", status);
|
||||
cmd.Parameters.AddWithValue("$lastError", (object?)lastError ?? DBNull.Value);
|
||||
cmd.Parameters.AddWithValue("$httpStatus", (object?)httpStatus ?? DBNull.Value);
|
||||
cmd.Parameters.AddWithValue("$now", now);
|
||||
|
||||
cmd.ExecuteNonQuery();
|
||||
}
|
||||
finally
|
||||
{
|
||||
_gate.Release();
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
public async Task<TrackingStatusSnapshot?> GetStatusAsync(
|
||||
TrackedOperationId id,
|
||||
CancellationToken ct = default)
|
||||
{
|
||||
await _gate.WaitAsync(ct).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
ObjectDisposedException.ThrowIf(_disposed, this);
|
||||
|
||||
using var cmd = _connection.CreateCommand();
|
||||
cmd.CommandText = """
|
||||
SELECT TrackedOperationId, Kind, TargetSummary, Status,
|
||||
RetryCount, LastError, HttpStatus,
|
||||
CreatedAtUtc, UpdatedAtUtc, TerminalAtUtc,
|
||||
SourceInstanceId, SourceScript
|
||||
FROM OperationTracking
|
||||
WHERE TrackedOperationId = $id;
|
||||
""";
|
||||
cmd.Parameters.AddWithValue("$id", id.ToString());
|
||||
|
||||
using var reader = cmd.ExecuteReader();
|
||||
if (!reader.Read())
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
return new TrackingStatusSnapshot(
|
||||
Id: TrackedOperationId.Parse(reader.GetString(0)),
|
||||
Kind: reader.GetString(1),
|
||||
TargetSummary: reader.IsDBNull(2) ? null : reader.GetString(2),
|
||||
Status: reader.GetString(3),
|
||||
RetryCount: reader.GetInt32(4),
|
||||
LastError: reader.IsDBNull(5) ? null : reader.GetString(5),
|
||||
HttpStatus: reader.IsDBNull(6) ? null : reader.GetInt32(6),
|
||||
CreatedAtUtc: ParseUtc(reader.GetString(7)),
|
||||
UpdatedAtUtc: ParseUtc(reader.GetString(8)),
|
||||
TerminalAtUtc: reader.IsDBNull(9) ? null : ParseUtc(reader.GetString(9)),
|
||||
SourceInstanceId: reader.IsDBNull(10) ? null : reader.GetString(10),
|
||||
SourceScript: reader.IsDBNull(11) ? null : reader.GetString(11));
|
||||
}
|
||||
finally
|
||||
{
|
||||
_gate.Release();
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
public async Task PurgeTerminalAsync(
|
||||
DateTime olderThanUtc,
|
||||
CancellationToken ct = default)
|
||||
{
|
||||
await _gate.WaitAsync(ct).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
ObjectDisposedException.ThrowIf(_disposed, this);
|
||||
|
||||
using var cmd = _connection.CreateCommand();
|
||||
// Non-terminal rows (TerminalAtUtc IS NULL) are kept regardless of
|
||||
// age — the operation is still in flight.
|
||||
cmd.CommandText = """
|
||||
DELETE FROM OperationTracking
|
||||
WHERE TerminalAtUtc IS NOT NULL
|
||||
AND TerminalAtUtc < $threshold;
|
||||
""";
|
||||
cmd.Parameters.AddWithValue(
|
||||
"$threshold",
|
||||
olderThanUtc.ToString("o", CultureInfo.InvariantCulture));
|
||||
|
||||
cmd.ExecuteNonQuery();
|
||||
}
|
||||
finally
|
||||
{
|
||||
_gate.Release();
|
||||
}
|
||||
}
|
||||
|
||||
private static DateTime ParseUtc(string raw)
|
||||
{
|
||||
return DateTime.Parse(
|
||||
raw,
|
||||
CultureInfo.InvariantCulture,
|
||||
DateTimeStyles.RoundtripKind);
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
DisposeAsyncCore().AsTask().GetAwaiter().GetResult();
|
||||
GC.SuppressFinalize(this);
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
await DisposeAsyncCore().ConfigureAwait(false);
|
||||
GC.SuppressFinalize(this);
|
||||
}
|
||||
|
||||
private async ValueTask DisposeAsyncCore()
|
||||
{
|
||||
await _gate.WaitAsync().ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
if (_disposed) return;
|
||||
_disposed = true;
|
||||
_connection.Dispose();
|
||||
}
|
||||
finally
|
||||
{
|
||||
_gate.Release();
|
||||
_gate.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,286 @@
|
||||
using Microsoft.Data.Sqlite;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Microsoft.Extensions.Options;
|
||||
using ScadaLink.Commons.Types;
|
||||
using ScadaLink.Commons.Types.Enums;
|
||||
using ScadaLink.SiteRuntime.Tracking;
|
||||
|
||||
namespace ScadaLink.SiteRuntime.Tests.Tracking;
|
||||
|
||||
/// <summary>
|
||||
/// Audit Log #23 (M3 Bundle A — Task A2) — schema + behaviour tests for the
|
||||
/// site-local <see cref="OperationTrackingStore"/>. Each test uses a unique
|
||||
/// shared-cache in-memory SQLite database so the store and the verifier share
|
||||
/// the same store without touching disk.
|
||||
/// </summary>
|
||||
public class OperationTrackingStoreTests
|
||||
{
|
||||
private static (OperationTrackingStore store, string dataSource) CreateStore(
|
||||
string testName)
|
||||
{
|
||||
var dataSource = $"file:{testName}-{Guid.NewGuid():N}?mode=memory&cache=shared";
|
||||
var connectionString = $"Data Source={dataSource};Cache=Shared";
|
||||
var options = new OperationTrackingOptions
|
||||
{
|
||||
ConnectionString = connectionString,
|
||||
};
|
||||
var store = new OperationTrackingStore(
|
||||
Options.Create(options),
|
||||
NullLogger<OperationTrackingStore>.Instance);
|
||||
return (store, dataSource);
|
||||
}
|
||||
|
||||
private static SqliteConnection OpenVerifierConnection(string dataSource)
|
||||
{
|
||||
var connection = new SqliteConnection($"Data Source={dataSource};Cache=Shared");
|
||||
connection.Open();
|
||||
return connection;
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Constructor_CreatesOperationTracking_SchemaOnFirstUse()
|
||||
{
|
||||
var (store, dataSource) = CreateStore(nameof(Constructor_CreatesOperationTracking_SchemaOnFirstUse));
|
||||
using (store)
|
||||
{
|
||||
using var connection = OpenVerifierConnection(dataSource);
|
||||
using var cmd = connection.CreateCommand();
|
||||
cmd.CommandText = "PRAGMA table_info(OperationTracking);";
|
||||
using var reader = cmd.ExecuteReader();
|
||||
|
||||
var columns = new List<(string Name, int Pk, int NotNull)>();
|
||||
while (reader.Read())
|
||||
{
|
||||
columns.Add((reader.GetString(1), reader.GetInt32(5), reader.GetInt32(3)));
|
||||
}
|
||||
|
||||
var expected = new[]
|
||||
{
|
||||
"TrackedOperationId", "Kind", "TargetSummary", "Status",
|
||||
"RetryCount", "LastError", "HttpStatus", "CreatedAtUtc",
|
||||
"UpdatedAtUtc", "TerminalAtUtc", "SourceInstanceId", "SourceScript",
|
||||
};
|
||||
Assert.Equal(
|
||||
expected.OrderBy(n => n),
|
||||
columns.Select(c => c.Name).OrderBy(n => n));
|
||||
|
||||
var pkColumns = columns.Where(c => c.Pk > 0).Select(c => c.Name).ToList();
|
||||
Assert.Single(pkColumns);
|
||||
Assert.Equal("TrackedOperationId", pkColumns[0]);
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task RecordEnqueueAsync_InsertsSubmittedRow_WithRetryCountZero()
|
||||
{
|
||||
var (store, dataSource) = CreateStore(nameof(RecordEnqueueAsync_InsertsSubmittedRow_WithRetryCountZero));
|
||||
await using var _ = store;
|
||||
|
||||
var id = TrackedOperationId.New();
|
||||
await store.RecordEnqueueAsync(
|
||||
id,
|
||||
kind: nameof(AuditKind.ApiCallCached),
|
||||
targetSummary: "ERP.GetOrder",
|
||||
sourceInstanceId: "Plant.Pump42",
|
||||
sourceScript: "ScriptActor:OnTick");
|
||||
|
||||
var snapshot = await store.GetStatusAsync(id);
|
||||
Assert.NotNull(snapshot);
|
||||
Assert.Equal(id, snapshot!.Id);
|
||||
Assert.Equal(nameof(AuditKind.ApiCallCached), snapshot.Kind);
|
||||
Assert.Equal("ERP.GetOrder", snapshot.TargetSummary);
|
||||
Assert.Equal(nameof(AuditStatus.Submitted), snapshot.Status);
|
||||
Assert.Equal(0, snapshot.RetryCount);
|
||||
Assert.Null(snapshot.LastError);
|
||||
Assert.Null(snapshot.HttpStatus);
|
||||
Assert.Null(snapshot.TerminalAtUtc);
|
||||
Assert.Equal("Plant.Pump42", snapshot.SourceInstanceId);
|
||||
Assert.Equal("ScriptActor:OnTick", snapshot.SourceScript);
|
||||
Assert.Equal(DateTimeKind.Utc, snapshot.CreatedAtUtc.Kind);
|
||||
Assert.Equal(DateTimeKind.Utc, snapshot.UpdatedAtUtc.Kind);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task RecordEnqueueAsync_Duplicate_IsNoOp_FirstWriteWins()
|
||||
{
|
||||
var (store, _) = CreateStore(nameof(RecordEnqueueAsync_Duplicate_IsNoOp_FirstWriteWins));
|
||||
await using var _store = store;
|
||||
|
||||
var id = TrackedOperationId.New();
|
||||
await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", "Plant.Pump42", "ScriptActor:OnTick");
|
||||
await store.RecordEnqueueAsync(id, "ApiCallCached", "OtherTarget", "Other.Instance", "ScriptActor:Other");
|
||||
|
||||
var snapshot = await store.GetStatusAsync(id);
|
||||
Assert.NotNull(snapshot);
|
||||
// First-write-wins: the second enqueue is ignored — Target/Source stay first.
|
||||
Assert.Equal("ERP.GetOrder", snapshot!.TargetSummary);
|
||||
Assert.Equal("Plant.Pump42", snapshot.SourceInstanceId);
|
||||
Assert.Equal("ScriptActor:OnTick", snapshot.SourceScript);
|
||||
Assert.Equal(nameof(AuditStatus.Submitted), snapshot.Status);
|
||||
Assert.Equal(0, snapshot.RetryCount);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task RecordAttemptAsync_AdvancesStatusAndRetryCount_OnNonTerminalRow()
|
||||
{
|
||||
var (store, _) = CreateStore(nameof(RecordAttemptAsync_AdvancesStatusAndRetryCount_OnNonTerminalRow));
|
||||
await using var _store = store;
|
||||
|
||||
var id = TrackedOperationId.New();
|
||||
await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", null, null);
|
||||
|
||||
await store.RecordAttemptAsync(
|
||||
id,
|
||||
status: nameof(AuditStatus.Attempted),
|
||||
retryCount: 1,
|
||||
lastError: "HTTP 503 from ERP",
|
||||
httpStatus: 503);
|
||||
|
||||
var snapshot = await store.GetStatusAsync(id);
|
||||
Assert.NotNull(snapshot);
|
||||
Assert.Equal(nameof(AuditStatus.Attempted), snapshot!.Status);
|
||||
Assert.Equal(1, snapshot.RetryCount);
|
||||
Assert.Equal("HTTP 503 from ERP", snapshot.LastError);
|
||||
Assert.Equal(503, snapshot.HttpStatus);
|
||||
Assert.Null(snapshot.TerminalAtUtc);
|
||||
|
||||
// UpdatedAtUtc advances past CreatedAtUtc.
|
||||
Assert.True(snapshot.UpdatedAtUtc >= snapshot.CreatedAtUtc);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task RecordAttemptAsync_OnTerminalRow_IsNoOp()
|
||||
{
|
||||
var (store, _) = CreateStore(nameof(RecordAttemptAsync_OnTerminalRow_IsNoOp));
|
||||
await using var _store = store;
|
||||
|
||||
var id = TrackedOperationId.New();
|
||||
await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", null, null);
|
||||
await store.RecordTerminalAsync(
|
||||
id,
|
||||
status: nameof(AuditStatus.Delivered),
|
||||
lastError: null,
|
||||
httpStatus: 200);
|
||||
|
||||
var terminalSnapshot = await store.GetStatusAsync(id);
|
||||
Assert.NotNull(terminalSnapshot);
|
||||
Assert.NotNull(terminalSnapshot!.TerminalAtUtc);
|
||||
|
||||
// Late attempt telemetry must NOT overwrite the terminal row.
|
||||
await store.RecordAttemptAsync(
|
||||
id,
|
||||
status: nameof(AuditStatus.Attempted),
|
||||
retryCount: 5,
|
||||
lastError: "late attempt",
|
||||
httpStatus: 500);
|
||||
|
||||
var afterLate = await store.GetStatusAsync(id);
|
||||
Assert.NotNull(afterLate);
|
||||
Assert.Equal(nameof(AuditStatus.Delivered), afterLate!.Status);
|
||||
Assert.Equal(0, afterLate.RetryCount);
|
||||
Assert.Null(afterLate.LastError);
|
||||
Assert.Equal(200, afterLate.HttpStatus);
|
||||
Assert.NotNull(afterLate.TerminalAtUtc);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task RecordTerminalAsync_FlipsToTerminal_WithTerminalAtUtcSet()
|
||||
{
|
||||
var (store, _) = CreateStore(nameof(RecordTerminalAsync_FlipsToTerminal_WithTerminalAtUtcSet));
|
||||
await using var _store = store;
|
||||
|
||||
var id = TrackedOperationId.New();
|
||||
await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", null, null);
|
||||
|
||||
var beforeTerminal = DateTime.UtcNow;
|
||||
await store.RecordTerminalAsync(
|
||||
id,
|
||||
status: nameof(AuditStatus.Parked),
|
||||
lastError: "HTTP 503 (max retries)",
|
||||
httpStatus: 503);
|
||||
|
||||
var snapshot = await store.GetStatusAsync(id);
|
||||
Assert.NotNull(snapshot);
|
||||
Assert.Equal(nameof(AuditStatus.Parked), snapshot!.Status);
|
||||
Assert.NotNull(snapshot.TerminalAtUtc);
|
||||
Assert.Equal(DateTimeKind.Utc, snapshot.TerminalAtUtc!.Value.Kind);
|
||||
Assert.True(snapshot.TerminalAtUtc >= beforeTerminal.AddSeconds(-1));
|
||||
Assert.Equal("HTTP 503 (max retries)", snapshot.LastError);
|
||||
Assert.Equal(503, snapshot.HttpStatus);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task GetStatusAsync_Unknown_ReturnsNull()
|
||||
{
|
||||
var (store, _) = CreateStore(nameof(GetStatusAsync_Unknown_ReturnsNull));
|
||||
await using var _store = store;
|
||||
|
||||
var unknown = TrackedOperationId.New();
|
||||
var snapshot = await store.GetStatusAsync(unknown);
|
||||
|
||||
Assert.Null(snapshot);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task GetStatusAsync_ReturnsLatestSnapshot_AfterMultipleAttempts()
|
||||
{
|
||||
var (store, _) = CreateStore(nameof(GetStatusAsync_ReturnsLatestSnapshot_AfterMultipleAttempts));
|
||||
await using var _store = store;
|
||||
|
||||
var id = TrackedOperationId.New();
|
||||
await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", null, null);
|
||||
await store.RecordAttemptAsync(id, nameof(AuditStatus.Attempted), 1, "first failure", 503);
|
||||
await store.RecordAttemptAsync(id, nameof(AuditStatus.Attempted), 2, "second failure", 503);
|
||||
await store.RecordAttemptAsync(id, nameof(AuditStatus.Attempted), 3, "third failure", 504);
|
||||
|
||||
var snapshot = await store.GetStatusAsync(id);
|
||||
Assert.NotNull(snapshot);
|
||||
Assert.Equal(3, snapshot!.RetryCount);
|
||||
Assert.Equal("third failure", snapshot.LastError);
|
||||
Assert.Equal(504, snapshot.HttpStatus);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task PurgeTerminalAsync_RemovesOldTerminalRows_KeepsRecent_KeepsNonTerminal()
|
||||
{
|
||||
var (store, dataSource) = CreateStore(nameof(PurgeTerminalAsync_RemovesOldTerminalRows_KeepsRecent_KeepsNonTerminal));
|
||||
await using var _store = store;
|
||||
|
||||
// Three rows:
|
||||
// (a) terminal, old → should be purged
|
||||
// (b) terminal, fresh → should be kept
|
||||
// (c) non-terminal, ancient CreatedAt → should be kept (no TerminalAtUtc)
|
||||
var aId = TrackedOperationId.New();
|
||||
var bId = TrackedOperationId.New();
|
||||
var cId = TrackedOperationId.New();
|
||||
|
||||
await store.RecordEnqueueAsync(aId, "ApiCallCached", "A", null, null);
|
||||
await store.RecordEnqueueAsync(bId, "ApiCallCached", "B", null, null);
|
||||
await store.RecordEnqueueAsync(cId, "ApiCallCached", "C", null, null);
|
||||
|
||||
await store.RecordTerminalAsync(aId, nameof(AuditStatus.Delivered), null, 200);
|
||||
await store.RecordTerminalAsync(bId, nameof(AuditStatus.Delivered), null, 200);
|
||||
|
||||
// Backdate the (a) row's TerminalAtUtc to 30 days ago via a direct UPDATE
|
||||
// — RecordTerminalAsync stamps DateTime.UtcNow which we cannot inject.
|
||||
// The verifier connection shares the same in-memory store thanks to
|
||||
// mode=memory&cache=shared.
|
||||
using (var connection = OpenVerifierConnection(dataSource))
|
||||
using (var cmd = connection.CreateCommand())
|
||||
{
|
||||
cmd.CommandText =
|
||||
"UPDATE OperationTracking SET TerminalAtUtc = $old WHERE TrackedOperationId = $id;";
|
||||
cmd.Parameters.AddWithValue("$old", DateTime.UtcNow.AddDays(-30).ToString("o"));
|
||||
cmd.Parameters.AddWithValue("$id", aId.ToString());
|
||||
cmd.ExecuteNonQuery();
|
||||
}
|
||||
|
||||
// Purge anything terminal older than 7 days.
|
||||
var threshold = DateTime.UtcNow.AddDays(-7);
|
||||
await store.PurgeTerminalAsync(threshold);
|
||||
|
||||
Assert.Null(await store.GetStatusAsync(aId)); // purged
|
||||
Assert.NotNull(await store.GetStatusAsync(bId)); // kept (recent terminal)
|
||||
Assert.NotNull(await store.GetStatusAsync(cId)); // kept (non-terminal)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user