Files
ScadaBridge/src/ZB.MOM.WW.ScadaBridge.SiteRuntime/Tracking/OperationTrackingStore.cs
T
Joseph Doherty 7b0b9c7365 refactor: rename ScadaLink → ZB.MOM.WW.ScadaBridge (code + projects + namespaces)
Solution + 23 src projects + 26 test projects renamed; folders, csproj,
namespaces, and ScadaLinkDbContext/ScadaBridgeDbContext class updated.
ActorSystem "scadalink" → "scadabridge", Akka seed-node URLs migrated.
SQL roles/logins, LDAP domains, CLI command name, and CLI config dir
(~/.scadalink → ~/.scadabridge) also renamed.

Build green; 5 Host.Tests fail awaiting SQL login rename in next commit.
Pre-existing StaleTagMonitor timing flakes unchanged.

Rename script committed at tools/rename-to-scadabridge.sh.
2026-05-28 09:37:45 -04:00

436 lines
18 KiB
C#

using System.Globalization;
using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.ScadaBridge.Commons.Interfaces;
using ZB.MOM.WW.ScadaBridge.Commons.Types;
namespace ZB.MOM.WW.ScadaBridge.SiteRuntime.Tracking;
/// <summary>
/// Site-local SQLite source-of-truth for cached-operation tracking — the row
/// that <c>Tracking.Status(TrackedOperationId)</c> reads (Audit Log #23 / M3).
/// </summary>
/// <remarks>
/// <para>
/// One row per <see cref="TrackedOperationId"/>; lifecycle is
/// <c>Submitted → Retrying → Delivered / Parked / Failed / Discarded</c>; terminal
/// rows are purged after the configured retention window
/// (<see cref="OperationTrackingOptions.RetentionDays"/>). Volume is bounded —
/// only cached calls produce rows, and only a handful of lifecycle events per
/// call — so we keep the implementation deliberately simple: a single owned
/// <see cref="SqliteConnection"/> serialised behind a <see cref="SemaphoreSlim"/>
/// (one async writer at a time). This is the pattern the M3 brief calls out as
/// "cleaner than the M2 Channel&lt;T&gt; pipeline given the volume"; the M2
/// audit-writer's batched-channel design is reserved for the high-volume audit
/// hot-path.
/// </para>
/// <para>
/// All mutations are idempotent / monotonic: <see cref="RecordEnqueueAsync"/> is
/// <c>INSERT OR IGNORE</c>, <see cref="RecordAttemptAsync"/> filters out terminal
/// rows in the <c>WHERE</c> clause, and <see cref="RecordTerminalAsync"/> only
/// fires on rows that haven't terminated yet (first-write-wins). This makes the
/// store safe under the at-least-once semantics of the site→central telemetry
/// path.
/// </para>
/// </remarks>
public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable, IDisposable
{
// SiteRuntime-024: writer state — one owned SqliteConnection serialised behind
// _writeGate. Readers do NOT share this connection or gate; see GetStatusAsync.
private readonly SqliteConnection _writeConnection;
private readonly SemaphoreSlim _writeGate = new(1, 1);
private readonly string _connectionString;
private readonly ILogger<OperationTrackingStore> _logger;
// SiteRuntime-024: dispose-once state shared by the sync Dispose and async
// DisposeAsync paths. Interlocked.Exchange is the race-safe primitive here —
// a plain bool can be flipped twice if Dispose() and DisposeAsync() are
// invoked concurrently (e.g. host shutdown bridging both). 0 = live,
// 1 = disposed. Writers read it via Volatile.Read after taking _writeGate;
// GetStatusAsync reads it up front without the gate. Both raise
// ObjectDisposedException when it is set.
private int _disposeState;
/// <summary>
/// Initializes the tracking store, opens the SQLite connection, and applies the schema.
/// </summary>
/// <param name="options">Tracking store configuration (connection string, retention window).</param>
/// <param name="logger">Logger for diagnostics.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="options"/> or <paramref name="logger"/> is <see langword="null"/>.
/// </exception>
/// <remarks>
/// Any exception raised while opening the connection or applying the schema
/// is rethrown unchanged after the partially-built resources are released.
/// </remarks>
public OperationTrackingStore(
    IOptions<OperationTrackingOptions> options,
    ILogger<OperationTrackingStore> logger)
{
    ArgumentNullException.ThrowIfNull(options);
    ArgumentNullException.ThrowIfNull(logger);

    _logger = logger;
    _connectionString = options.Value.ConnectionString;
    _writeConnection = new SqliteConnection(_connectionString);

    try
    {
        _writeConnection.Open();
        InitializeSchema();
    }
    catch
    {
        // A throwing constructor never hands the instance to the caller, so
        // nobody can call Dispose/DisposeAsync on it. Release what was
        // already created here, then let the original exception propagate.
        _writeConnection.Dispose();
        _writeGate.Dispose();
        throw;
    }
}
/// <summary>
/// Creates the <c>OperationTracking</c> table and its status/updated index
/// when they do not exist yet, then applies the additive column upgrades.
/// Runs on the writer connection.
/// </summary>
private void InitializeSchema()
{
    const string createSchemaSql = """
        CREATE TABLE IF NOT EXISTS OperationTracking (
        TrackedOperationId TEXT NOT NULL PRIMARY KEY,
        Kind TEXT NOT NULL,
        TargetSummary TEXT NULL,
        Status TEXT NOT NULL,
        RetryCount INTEGER NOT NULL DEFAULT 0,
        LastError TEXT NULL,
        HttpStatus INTEGER NULL,
        CreatedAtUtc TEXT NOT NULL,
        UpdatedAtUtc TEXT NOT NULL,
        TerminalAtUtc TEXT NULL,
        SourceInstanceId TEXT NULL,
        SourceScript TEXT NULL,
        SourceNode TEXT NULL
        );
        CREATE INDEX IF NOT EXISTS IX_OperationTracking_Status_Updated
        ON OperationTracking (Status, UpdatedAtUtc);
        """;

    using var createSchema = _writeConnection.CreateCommand();
    createSchema.CommandText = createSchemaSql;
    createSchema.ExecuteNonQuery();

    // SourceNode stamping — additive upgrade for databases that pre-date the
    // column. CREATE TABLE IF NOT EXISTS leaves an already-existing table
    // untouched, so a tracking.db written by a pre-SourceNode build still
    // lacks the column. The file is durable across restart/failover by
    // design (retention window default 7 days); without this step every
    // RecordEnqueueAsync on an upgraded deployment would reference a missing
    // column and the write would fail.
    //
    // SQLite has no "ADD COLUMN IF NOT EXISTS", so the helper probes for the
    // column and only issues the ALTER when it is absent. The column is
    // nullable with no default, so rows written before the upgrade read back
    // SourceNode = null (back-compat).
    //
    // NOTE: this is the FIRST idempotent column-upgrade in
    // OperationTrackingStore — earlier schema changes pre-dated any
    // production rollout and relied solely on CREATE TABLE IF NOT EXISTS.
    // The helper mirrors the SqliteAuditWriter precedent.
    AddColumnIfMissing("SourceNode", "TEXT NULL");
}
/// <summary>
/// Adds <paramref name="columnName"/> to <c>OperationTracking</c> unless the
/// table already has a column with that name. Presence is checked through
/// <c>pragma_table_info</c> because SQLite lacks <c>ADD COLUMN IF NOT EXISTS</c>,
/// which makes the call safe to repeat on every <see cref="InitializeSchema"/>.
/// Mirrors the <c>SqliteAuditWriter.AddColumnIfMissing</c> precedent.
/// </summary>
/// <param name="columnName">Name of the column to add.</param>
/// <param name="columnDefinition">Type/constraint text placed after the column name in the ALTER statement.</param>
private void AddColumnIfMissing(string columnName, string columnDefinition)
{
    using var probe = _writeConnection.CreateCommand();
    probe.CommandText = "SELECT COUNT(*) FROM pragma_table_info('OperationTracking') WHERE name = $name";
    probe.Parameters.AddWithValue("$name", columnName);

    var matchingColumns = Convert.ToInt32(probe.ExecuteScalar());
    if (matchingColumns <= 0)
    {
        // DDL cannot take bound parameters, so the statement is built by
        // interpolation. Both values are constants supplied by this class,
        // never user input.
        using var alter = _writeConnection.CreateCommand();
        alter.CommandText = $"ALTER TABLE OperationTracking ADD COLUMN {columnName} {columnDefinition}";
        alter.ExecuteNonQuery();
    }
}
/// <inheritdoc/>
/// <remarks>
/// Inserts the row in the <c>Submitted</c> state with a retry count of zero
/// and identical created/updated timestamps. The statement is
/// <c>INSERT OR IGNORE</c>, so a repeated call for an id that already has a
/// row changes nothing.
/// </remarks>
public async Task RecordEnqueueAsync(
    TrackedOperationId id,
    string kind,
    string? targetSummary,
    string? sourceInstanceId,
    string? sourceScript,
    string? sourceNode,
    CancellationToken ct = default)
{
    ArgumentNullException.ThrowIfNull(kind);

    // INSERT OR IGNORE: a duplicate id is a no-op (first-write-wins), which
    // matches the at-least-once semantics the site emits under.
    const string insertSql = """
        INSERT OR IGNORE INTO OperationTracking (
        TrackedOperationId, Kind, TargetSummary, Status,
        RetryCount, LastError, HttpStatus,
        CreatedAtUtc, UpdatedAtUtc, TerminalAtUtc,
        SourceInstanceId, SourceScript, SourceNode
        ) VALUES (
        $id, $kind, $targetSummary, $status,
        0, NULL, NULL,
        $now, $now, NULL,
        $sourceInstanceId, $sourceScript, $sourceNode
        );
        """;

    await _writeGate.WaitAsync(ct).ConfigureAwait(false);
    try
    {
        // Checked only once the gate is held, so a disposer that drained the
        // gate first is observed here.
        ObjectDisposedException.ThrowIf(Volatile.Read(ref _disposeState) != 0, this);

        var timestamp = DateTime.UtcNow.ToString("o", CultureInfo.InvariantCulture);

        using var insert = _writeConnection.CreateCommand();
        insert.CommandText = insertSql;

        var parameters = insert.Parameters;
        parameters.AddWithValue("$id", id.ToString());
        parameters.AddWithValue("$kind", kind);
        parameters.AddWithValue("$targetSummary", OrDbNull(targetSummary));
        parameters.AddWithValue("$status", "Submitted");
        parameters.AddWithValue("$now", timestamp);
        parameters.AddWithValue("$sourceInstanceId", OrDbNull(sourceInstanceId));
        parameters.AddWithValue("$sourceScript", OrDbNull(sourceScript));
        parameters.AddWithValue("$sourceNode", OrDbNull(sourceNode));

        insert.ExecuteNonQuery();
    }
    finally
    {
        _writeGate.Release();
    }

    // Maps a null CLR value onto the database NULL marker.
    static object OrDbNull(object? value) => value ?? DBNull.Value;
}
/// <inheritdoc/>
/// <remarks>
/// Overwrites status, retry count, last error, HTTP status and the updated
/// timestamp of the row for <paramref name="id"/>, but only while that row
/// has no terminal timestamp. Rows that are missing or already terminal are
/// left untouched.
/// </remarks>
public async Task RecordAttemptAsync(
    TrackedOperationId id,
    string status,
    int retryCount,
    string? lastError,
    int? httpStatus,
    CancellationToken ct = default)
{
    ArgumentNullException.ThrowIfNull(status);

    // Terminal rows are immutable: the TerminalAtUtc IS NULL predicate keeps
    // late-arriving attempt telemetry from overwriting a resolved row.
    const string updateSql = """
        UPDATE OperationTracking
        SET Status = $status,
        RetryCount = $retryCount,
        LastError = $lastError,
        HttpStatus = $httpStatus,
        UpdatedAtUtc = $now
        WHERE TrackedOperationId = $id
        AND TerminalAtUtc IS NULL;
        """;

    await _writeGate.WaitAsync(ct).ConfigureAwait(false);
    try
    {
        ObjectDisposedException.ThrowIf(Volatile.Read(ref _disposeState) != 0, this);

        var timestamp = DateTime.UtcNow.ToString("o", CultureInfo.InvariantCulture);

        using var update = _writeConnection.CreateCommand();
        update.CommandText = updateSql;

        var parameters = update.Parameters;
        parameters.AddWithValue("$id", id.ToString());
        parameters.AddWithValue("$status", status);
        parameters.AddWithValue("$retryCount", retryCount);
        parameters.AddWithValue("$lastError", OrDbNull(lastError));
        parameters.AddWithValue("$httpStatus", OrDbNull(httpStatus));
        parameters.AddWithValue("$now", timestamp);

        update.ExecuteNonQuery();
    }
    finally
    {
        _writeGate.Release();
    }

    // Maps a null CLR value (including an empty int?) onto database NULL.
    static object OrDbNull(object? value) => value ?? DBNull.Value;
}
/// <inheritdoc/>
/// <remarks>
/// Stamps the row for <paramref name="id"/> with the given status, error and
/// HTTP status, and sets both the updated and terminal timestamps to the
/// same instant. Only rows without a terminal timestamp are touched, so the
/// first terminal write wins and later ones are no-ops.
/// </remarks>
public async Task RecordTerminalAsync(
    TrackedOperationId id,
    string status,
    string? lastError,
    int? httpStatus,
    CancellationToken ct = default)
{
    ArgumentNullException.ThrowIfNull(status);

    // First-write-wins on the terminal flip: rows that have already
    // terminated are excluded by the TerminalAtUtc IS NULL predicate.
    const string terminateSql = """
        UPDATE OperationTracking
        SET Status = $status,
        LastError = $lastError,
        HttpStatus = $httpStatus,
        UpdatedAtUtc = $now,
        TerminalAtUtc = $now
        WHERE TrackedOperationId = $id
        AND TerminalAtUtc IS NULL;
        """;

    await _writeGate.WaitAsync(ct).ConfigureAwait(false);
    try
    {
        ObjectDisposedException.ThrowIf(Volatile.Read(ref _disposeState) != 0, this);

        var timestamp = DateTime.UtcNow.ToString("o", CultureInfo.InvariantCulture);

        using var terminate = _writeConnection.CreateCommand();
        terminate.CommandText = terminateSql;

        var parameters = terminate.Parameters;
        parameters.AddWithValue("$id", id.ToString());
        parameters.AddWithValue("$status", status);
        parameters.AddWithValue("$lastError", OrDbNull(lastError));
        parameters.AddWithValue("$httpStatus", OrDbNull(httpStatus));
        parameters.AddWithValue("$now", timestamp);

        terminate.ExecuteNonQuery();
    }
    finally
    {
        _writeGate.Release();
    }

    // Maps a null CLR value (including an empty int?) onto database NULL.
    static object OrDbNull(object? value) => value ?? DBNull.Value;
}
/// <inheritdoc/>
/// <remarks>
/// Returns <see langword="null"/> when no row exists for <paramref name="id"/>.
/// The query runs on its own short-lived connection and never takes the
/// write gate.
/// </remarks>
public async Task<TrackingStatusSnapshot?> GetStatusAsync(
    TrackedOperationId id,
    CancellationToken ct = default)
{
    ObjectDisposedException.ThrowIf(Volatile.Read(ref _disposeState) != 0, this);

    const string selectSql = """
        SELECT TrackedOperationId, Kind, TargetSummary, Status,
        RetryCount, LastError, HttpStatus,
        CreatedAtUtc, UpdatedAtUtc, TerminalAtUtc,
        SourceInstanceId, SourceScript, SourceNode
        FROM OperationTracking
        WHERE TrackedOperationId = $id;
        """;

    // SiteRuntime-024: each read opens a fresh, ungated connection from the
    // writer's connection string, so a long-running write does not block
    // status queries. Mirrors the SiteStorageService precedent.
    // NOTE(review): isolation between this reader and the writer connection
    // is left to SQLite itself — confirm the journal/cache mode in use.
    await using var readConnection = new SqliteConnection(_connectionString);
    await readConnection.OpenAsync(ct).ConfigureAwait(false);

    await using var select = readConnection.CreateCommand();
    select.CommandText = selectSql;
    select.Parameters.AddWithValue("$id", id.ToString());

    await using var row = await select.ExecuteReaderAsync(ct).ConfigureAwait(false);
    var found = await row.ReadAsync(ct).ConfigureAwait(false);
    if (!found)
    {
        return null;
    }

    // Ordinals follow the SELECT list above.
    var terminalRaw = TextOrNull(9);

    return new TrackingStatusSnapshot(
        Id: TrackedOperationId.Parse(row.GetString(0)),
        Kind: row.GetString(1),
        TargetSummary: TextOrNull(2),
        Status: row.GetString(3),
        RetryCount: row.GetInt32(4),
        LastError: TextOrNull(5),
        HttpStatus: row.IsDBNull(6) ? null : row.GetInt32(6),
        CreatedAtUtc: ParseUtc(row.GetString(7)),
        UpdatedAtUtc: ParseUtc(row.GetString(8)),
        TerminalAtUtc: terminalRaw is null ? null : ParseUtc(terminalRaw),
        SourceInstanceId: TextOrNull(10),
        SourceScript: TextOrNull(11),
        SourceNode: TextOrNull(12));

    // Reads a nullable TEXT column, translating database NULL to null.
    string? TextOrNull(int ordinal) => row.IsDBNull(ordinal) ? null : row.GetString(ordinal);
}
/// <inheritdoc/>
/// <remarks>
/// Deletes rows whose terminal timestamp is strictly earlier than
/// <paramref name="olderThanUtc"/>. Rows without a terminal timestamp are
/// never deleted. A <see cref="DateTimeKind.Local"/> argument is converted
/// to UTC first; <see cref="DateTimeKind.Unspecified"/> is taken to already
/// be UTC, as the parameter name states.
/// </remarks>
public async Task PurgeTerminalAsync(
    DateTime olderThanUtc,
    CancellationToken ct = default)
{
    // Stored timestamps are round-trip ("o") strings of DateTime.UtcNow and
    // the DELETE compares them to the threshold as TEXT. A Local-kind value
    // would format as local wall-clock time plus an offset suffix, which
    // does not order correctly against the stored "...Z" strings, so the
    // threshold is normalised to UTC before formatting.
    var thresholdUtc = olderThanUtc.Kind == DateTimeKind.Local
        ? olderThanUtc.ToUniversalTime()
        : DateTime.SpecifyKind(olderThanUtc, DateTimeKind.Utc);
    var threshold = thresholdUtc.ToString("o", CultureInfo.InvariantCulture);

    await _writeGate.WaitAsync(ct).ConfigureAwait(false);
    try
    {
        ObjectDisposedException.ThrowIf(Volatile.Read(ref _disposeState) != 0, this);

        using var cmd = _writeConnection.CreateCommand();
        // Non-terminal rows (TerminalAtUtc IS NULL) are kept regardless of
        // age — the operation is still in flight.
        cmd.CommandText = """
            DELETE FROM OperationTracking
            WHERE TerminalAtUtc IS NOT NULL
            AND TerminalAtUtc < $threshold;
            """;
        cmd.Parameters.AddWithValue("$threshold", threshold);
        cmd.ExecuteNonQuery();
    }
    finally
    {
        _writeGate.Release();
    }
}
/// <summary>
/// Parses a stored timestamp string with the invariant culture, keeping the
/// <see cref="DateTimeKind"/> encoded in the text (round-trip semantics).
/// </summary>
/// <param name="raw">Timestamp text as read from the database.</param>
/// <returns>The parsed <see cref="DateTime"/>.</returns>
private static DateTime ParseUtc(string raw) =>
    DateTime.Parse(raw, CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind);
/// <summary>
/// Synchronously releases the writer connection and the write gate. Only the
/// first call (across both <see cref="Dispose"/> and <see cref="DisposeAsync"/>)
/// does any work; later calls return immediately.
/// </summary>
/// <remarks>
/// SiteRuntime-024: this path deliberately does NOT bridge to the async
/// variant via <c>.AsTask().GetAwaiter().GetResult()</c>. Sync-over-async on a
/// SemaphoreSlim can deadlock when invoked from a non-reentrant SyncContext
/// (e.g. host shutdown continuations observed on the host sync context).
/// Because the gate is not taken here, a write that is in flight when
/// <see cref="Dispose"/> runs will fail its next operation against the
/// disposed connection with <see cref="ObjectDisposedException"/>. Callers
/// must ensure nothing is running concurrently, or use
/// <see cref="DisposeAsync"/> to drain in-flight writes before close.
/// </remarks>
public void Dispose()
{
    // Atomically claim the dispose; a non-zero previous value means another
    // Dispose/DisposeAsync call already did (or is doing) the work.
    var previousState = Interlocked.Exchange(ref _disposeState, 1);
    if (previousState != 0)
    {
        return;
    }

    _writeConnection.Dispose();
    _writeGate.Dispose();

    GC.SuppressFinalize(this);
}
/// <summary>
/// Asynchronously releases the writer connection and the write gate. The
/// write gate is acquired before the connection is closed, so a write that
/// is currently executing a command finishes before the connection is freed.
/// Only the first dispose call (sync or async) does any work.
/// </summary>
public async ValueTask DisposeAsync()
{
    var previousState = Interlocked.Exchange(ref _disposeState, 1);
    if (previousState != 0)
    {
        return;
    }

    // Drain: wait for whichever writer currently holds the gate. The flag
    // is already set, so any writer that takes the gate from here on sees
    // it and raises ObjectDisposedException instead of touching the
    // connection.
    try
    {
        await _writeGate.WaitAsync().ConfigureAwait(false);
    }
    catch (ObjectDisposedException)
    {
        // Defensive only: the exchange above should make this disposer the
        // sole owner, so the gate should not already be disposed.
    }

    try
    {
        _writeConnection.Dispose();
    }
    finally
    {
        ReleaseGateQuietly();
        _writeGate.Dispose();
    }

    GC.SuppressFinalize(this);

    // Releases the gate, tolerating a gate that has already been disposed.
    void ReleaseGateQuietly()
    {
        try
        {
            _writeGate.Release();
        }
        catch (ObjectDisposedException)
        {
        }
    }
}
}