feat(site-runtime): add SourceNode column to OperationTracking + thread through RecordEnqueueAsync

This commit is contained in:
Joseph Doherty
2026-05-23 16:54:48 -04:00
parent f3cb8c0791
commit 277882d230
7 changed files with 238 additions and 19 deletions

View File

@@ -128,12 +128,16 @@ public sealed class CachedCallTelemetryForwarder : ICachedCallTelemetryForwarder
// Enqueue — insert-if-not-exists with the operational
// channel as the kind discriminator. RetryCount is fixed
// at 0 by the tracking store's INSERT contract.
// sourceNode plumbed through but left null here; stamping
// is wired in a later task (Task 14) once the
// INodeIdentityProvider is threaded into the forwarder.
await _trackingStore.RecordEnqueueAsync(
telemetry.Operational.TrackedOperationId,
telemetry.Operational.Channel,
telemetry.Operational.Target,
telemetry.Audit.SourceInstanceId,
telemetry.Audit.SourceScript,
sourceNode: null,
ct).ConfigureAwait(false);
break;

View File

@@ -40,6 +40,7 @@ public interface IOperationTrackingStore
string? targetSummary,
string? sourceInstanceId,
string? sourceScript,
string? sourceNode,
CancellationToken ct = default);
/// <summary>

View File

@@ -25,6 +25,11 @@ namespace ScadaLink.Commons.Types;
/// <param name="TerminalAtUtc">UTC timestamp the row reached a terminal status; null while still active.</param>
/// <param name="SourceInstanceId">Instance id that issued the cached call, when known.</param>
/// <param name="SourceScript">Script that issued the cached call, when known.</param>
/// <param name="SourceNode">
/// Cluster node that submitted the cached call (e.g. <c>"node-a"</c> /
/// <c>"node-b"</c>), captured at enqueue time. Null on rows persisted before
/// the SourceNode stamping migration; stamping itself is wired in a later task.
/// </param>
public sealed record TrackingStatusSnapshot(
TrackedOperationId Id,
string Kind,
@@ -37,4 +42,5 @@ public sealed record TrackingStatusSnapshot(
DateTime UpdatedAtUtc,
DateTime? TerminalAtUtc,
string? SourceInstanceId,
string? SourceScript);
string? SourceScript,
string? SourceNode);

View File

@@ -70,12 +70,57 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable,
UpdatedAtUtc TEXT NOT NULL,
TerminalAtUtc TEXT NULL,
SourceInstanceId TEXT NULL,
SourceScript TEXT NULL
SourceScript TEXT NULL,
SourceNode TEXT NULL
);
CREATE INDEX IF NOT EXISTS IX_OperationTracking_Status_Updated
ON OperationTracking (Status, UpdatedAtUtc);
""";
cmd.ExecuteNonQuery();
// SourceNode stamping: additively add the SourceNode column.
// CREATE TABLE IF NOT EXISTS above does NOT add columns to an
// OperationTracking table that already exists from a pre-SourceNode
// build, so a tracking.db created by an older build needs the column
// ALTER-ed in. The file is durable across restart/failover by design
// (retention window default 7 days), so without this step every
// RecordEnqueueAsync on an upgraded deployment would bind $sourceNode
// against a missing column and the write would fail.
// SQLite has no "ADD COLUMN IF NOT EXISTS"; the column presence is
// probed first and the ALTER skipped when already there. The column is
// nullable with no default, so any row written before this migration
// reads back SourceNode = null (back-compat).
//
// NOTE: This is the FIRST idempotent column-upgrade in
// OperationTrackingStore — prior schema changes pre-dated any
// production rollout and relied solely on CREATE TABLE IF NOT EXISTS.
// The helper mirrors the SqliteAuditWriter precedent.
AddColumnIfMissing("SourceNode", "TEXT NULL");
}
/// <summary>
/// Additively adds a column to <c>OperationTracking</c> only when it is not
/// already present. SQLite lacks <c>ADD COLUMN IF NOT EXISTS</c>, so the
/// schema is probed via <c>pragma_table_info</c> first and the ALTER is
/// skipped when the column is found. Idempotent — safe to run on every
/// <see cref="InitializeSchema"/>. Follows the same probe-then-ALTER approach
/// as the <c>SqliteAuditWriter.AddColumnIfMissing</c> precedent.
/// </summary>
/// <param name="columnName">
/// Name of the column to add. Interpolated into DDL, so it must be a
/// caller-controlled constant, never user input.
/// </param>
/// <param name="columnDefinition">
/// Column type/constraint clause appended after the name (e.g. <c>TEXT NULL</c>).
/// Interpolated into DDL under the same constant-only restriction.
/// </param>
private void AddColumnIfMissing(string columnName, string columnDefinition)
{
    // Scope the probe so its command is disposed before any DDL is issued.
    using (var probe = _connection.CreateCommand())
    {
        // SQLite resolves column names case-insensitively, so the probe must
        // too: with the default binary comparison a column that exists under
        // different casing would be missed and the ALTER below would fail
        // with "duplicate column name".
        probe.CommandText = "SELECT COUNT(*) FROM pragma_table_info('OperationTracking') WHERE name = $name COLLATE NOCASE";
        probe.Parameters.AddWithValue("$name", columnName);

        var alreadyPresent = Convert.ToInt32(probe.ExecuteScalar()) > 0;
        if (alreadyPresent)
        {
            return;
        }
    }

    using var alter = _connection.CreateCommand();

    // Column name + definition are caller-controlled constants, never user
    // input — safe to interpolate (parameters are not permitted in DDL).
    alter.CommandText = $"ALTER TABLE OperationTracking ADD COLUMN {columnName} {columnDefinition}";
    alter.ExecuteNonQuery();
}
/// <inheritdoc/>
@@ -85,6 +130,7 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable,
string? targetSummary,
string? sourceInstanceId,
string? sourceScript,
string? sourceNode,
CancellationToken ct = default)
{
ArgumentNullException.ThrowIfNull(kind);
@@ -104,12 +150,12 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable,
TrackedOperationId, Kind, TargetSummary, Status,
RetryCount, LastError, HttpStatus,
CreatedAtUtc, UpdatedAtUtc, TerminalAtUtc,
SourceInstanceId, SourceScript
SourceInstanceId, SourceScript, SourceNode
) VALUES (
$id, $kind, $targetSummary, $status,
0, NULL, NULL,
$now, $now, NULL,
$sourceInstanceId, $sourceScript
$sourceInstanceId, $sourceScript, $sourceNode
);
""";
cmd.Parameters.AddWithValue("$id", id.ToString());
@@ -119,6 +165,7 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable,
cmd.Parameters.AddWithValue("$now", now);
cmd.Parameters.AddWithValue("$sourceInstanceId", (object?)sourceInstanceId ?? DBNull.Value);
cmd.Parameters.AddWithValue("$sourceScript", (object?)sourceScript ?? DBNull.Value);
cmd.Parameters.AddWithValue("$sourceNode", (object?)sourceNode ?? DBNull.Value);
cmd.ExecuteNonQuery();
}
@@ -233,7 +280,7 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable,
SELECT TrackedOperationId, Kind, TargetSummary, Status,
RetryCount, LastError, HttpStatus,
CreatedAtUtc, UpdatedAtUtc, TerminalAtUtc,
SourceInstanceId, SourceScript
SourceInstanceId, SourceScript, SourceNode
FROM OperationTracking
WHERE TrackedOperationId = $id;
""";
@@ -257,7 +304,8 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable,
UpdatedAtUtc: ParseUtc(reader.GetString(8)),
TerminalAtUtc: reader.IsDBNull(9) ? null : ParseUtc(reader.GetString(9)),
SourceInstanceId: reader.IsDBNull(10) ? null : reader.GetString(10),
SourceScript: reader.IsDBNull(11) ? null : reader.GetString(11));
SourceScript: reader.IsDBNull(11) ? null : reader.GetString(11),
SourceNode: reader.IsDBNull(12) ? null : reader.GetString(12));
}
finally
{

View File

@@ -135,12 +135,14 @@ public class CachedCallTelemetryForwarderTests
Arg.Any<CancellationToken>());
// Tracking row: insert-if-not-exists with kind discriminator.
// sourceNode is null until Task 14 wires the INodeIdentityProvider through.
await _tracking.Received(1).RecordEnqueueAsync(
_id,
"ApiOutbound",
"ERP.GetOrder",
"inst-1",
"ScriptActor:doStuff",
null,
Arg.Any<CancellationToken>());
await _tracking.DidNotReceiveWithAnyArgs().RecordAttemptAsync(
default, default!, default, default, default, default);
@@ -166,7 +168,7 @@ public class CachedCallTelemetryForwarderTests
await _tracking.Received(1).RecordAttemptAsync(
_id, "Attempted", 2, "HTTP 503", 503, Arg.Any<CancellationToken>());
await _tracking.DidNotReceiveWithAnyArgs().RecordEnqueueAsync(
default, default!, default, default, default, default);
default, default!, default, default, default, default, default);
await _tracking.DidNotReceiveWithAnyArgs().RecordTerminalAsync(
default, default!, default, default, default);
}
@@ -189,7 +191,7 @@ public class CachedCallTelemetryForwarderTests
await _tracking.Received(1).RecordTerminalAsync(
_id, "Delivered", null, null, Arg.Any<CancellationToken>());
await _tracking.DidNotReceiveWithAnyArgs().RecordEnqueueAsync(
default, default!, default, default, default, default);
default, default!, default, default, default, default, default);
await _tracking.DidNotReceiveWithAnyArgs().RecordAttemptAsync(
default, default!, default, default, default, default);
}
@@ -213,6 +215,7 @@ public class CachedCallTelemetryForwarderTests
Arg.Any<string?>(),
Arg.Any<string?>(),
Arg.Any<string?>(),
Arg.Any<string?>(),
Arg.Any<CancellationToken>());
}
@@ -225,6 +228,7 @@ public class CachedCallTelemetryForwarderTests
Arg.Any<string?>(),
Arg.Any<string?>(),
Arg.Any<string?>(),
Arg.Any<string?>(),
Arg.Any<CancellationToken>())
.Throws(new InvalidOperationException("sqlite locked"));

View File

@@ -51,7 +51,8 @@ public class TrackingApiTests
UpdatedAtUtc: new DateTime(2026, 5, 20, 10, 2, 30, DateTimeKind.Utc),
TerminalAtUtc: new DateTime(2026, 5, 20, 10, 2, 30, DateTimeKind.Utc),
SourceInstanceId: "Plant.Pump42",
SourceScript: "ScriptActor:OnTick");
SourceScript: "ScriptActor:OnTick",
SourceNode: null);
var store = new Mock<IOperationTrackingStore>();
store

View File

@@ -59,6 +59,7 @@ public class OperationTrackingStoreTests
"TrackedOperationId", "Kind", "TargetSummary", "Status",
"RetryCount", "LastError", "HttpStatus", "CreatedAtUtc",
"UpdatedAtUtc", "TerminalAtUtc", "SourceInstanceId", "SourceScript",
"SourceNode",
};
Assert.Equal(
expected.OrderBy(n => n),
@@ -70,6 +71,159 @@ public class OperationTrackingStoreTests
}
}
/// <summary>
/// A store created over a fresh database must expose the <c>SourceNode</c>
/// column on <c>OperationTracking</c>.
/// </summary>
[Fact]
public void Initialize_creates_OperationTracking_with_SourceNode_column()
{
    var (store, dataSource) = CreateStore(nameof(Initialize_creates_OperationTracking_with_SourceNode_column));

    using (store)
    {
        // Inspect the schema through a separate connection while the store is alive.
        using var verifier = OpenVerifierConnection(dataSource);
        var hasSourceNode = ColumnExists(verifier, "SourceNode");

        Assert.True(
            hasSourceNode,
            "Fresh OperationTracking schema must include the SourceNode column.");
    }
}
/// <summary>
/// The pre-SourceNode <c>OperationTracking</c> schema — the 12-column
/// CREATE TABLE that has the original source-provenance columns
/// (<c>SourceInstanceId</c>, <c>SourceScript</c>) but is WITHOUT
/// <c>SourceNode</c>. A deployment that ran before the SourceNode
/// stamping work already has an on-disk <c>tracking.db</c> in exactly
/// this shape, and <c>CREATE TABLE IF NOT EXISTS</c> is a no-op against it.
/// </summary>
/// <remarks>
/// Executed verbatim by <c>SeedPreSourceNodeSchemaDatabase</c> to build the
/// "old deployment" fixture. The script also creates the
/// <c>IX_OperationTracking_Status_Updated</c> index on
/// <c>(Status, UpdatedAtUtc)</c>.
/// </remarks>
private const string OldPreSourceNodeSchema = """
CREATE TABLE IF NOT EXISTS OperationTracking (
TrackedOperationId TEXT NOT NULL PRIMARY KEY,
Kind TEXT NOT NULL,
TargetSummary TEXT NULL,
Status TEXT NOT NULL,
RetryCount INTEGER NOT NULL DEFAULT 0,
LastError TEXT NULL,
HttpStatus INTEGER NULL,
CreatedAtUtc TEXT NOT NULL,
UpdatedAtUtc TEXT NOT NULL,
TerminalAtUtc TEXT NULL,
SourceInstanceId TEXT NULL,
SourceScript TEXT NULL
);
CREATE INDEX IF NOT EXISTS IX_OperationTracking_Status_Updated
ON OperationTracking (Status, UpdatedAtUtc);
""";
/// <summary>
/// Opens a connection to <paramref name="dataSource"/> and creates the
/// pre-SourceNode <c>OperationTracking</c> table (and its index) by executing
/// <see cref="OldPreSourceNodeSchema"/>.
/// </summary>
/// <param name="dataSource">
/// SQLite data source; combined with <c>Cache=Shared</c> in the connection string.
/// </param>
/// <returns>
/// The still-open connection. The caller owns it and must dispose it; callers
/// in this file keep it open while stores are created over the same data
/// source (NOTE(review): presumably this is what keeps a shared in-memory
/// database alive between store instances — confirm against the
/// Microsoft.Data.Sqlite in-memory documentation).
/// </returns>
private static SqliteConnection SeedPreSourceNodeSchemaDatabase(string dataSource)
{
    var connection = new SqliteConnection($"Data Source={dataSource};Cache=Shared");
    try
    {
        connection.Open();

        using var cmd = connection.CreateCommand();
        cmd.CommandText = OldPreSourceNodeSchema;
        cmd.ExecuteNonQuery();

        return connection;
    }
    catch
    {
        // Ownership only transfers to the caller on success; if opening or
        // seeding fails, release the connection here so it is not leaked.
        connection.Dispose();
        throw;
    }
}
/// <summary>
/// Reports whether <c>OperationTracking</c> currently has a column named
/// <paramref name="columnName"/>, by counting matching rows in
/// <c>pragma_table_info</c>.
/// </summary>
/// <param name="connection">Open connection to the database under inspection.</param>
/// <param name="columnName">Column name to look for.</param>
/// <returns><c>true</c> when at least one matching column row is reported.</returns>
private static bool ColumnExists(SqliteConnection connection, string columnName)
{
    using var query = connection.CreateCommand();
    query.CommandText = "SELECT COUNT(*) FROM pragma_table_info('OperationTracking') WHERE name = $name";
    query.Parameters.AddWithValue("$name", columnName);

    var matchCount = Convert.ToInt32(query.ExecuteScalar());
    return matchCount > 0;
}
/// <summary>
/// Builds an <see cref="OperationTrackingStore"/> over an existing data source,
/// using a <c>Cache=Shared</c> connection string and a no-op logger.
/// </summary>
/// <param name="dataSource">SQLite data source the store should open.</param>
/// <returns>A new store instance; the caller is responsible for disposing it.</returns>
private static OperationTrackingStore CreateStoreOver(string dataSource)
{
    var wrappedOptions = Options.Create(new OperationTrackingOptions
    {
        ConnectionString = $"Data Source={dataSource};Cache=Shared",
    });

    return new OperationTrackingStore(wrappedOptions, NullLogger<OperationTrackingStore>.Instance);
}
/// <summary>
/// Upgrade path: a store opened over a database that already holds the
/// pre-SourceNode table must add the <c>SourceNode</c> column, accept writes
/// that bind it, and tolerate being opened again once the column exists.
/// </summary>
[Fact]
public async Task Initialize_adds_SourceNode_to_pre_existing_schema()
{
    var dataSource = $"file:{nameof(Initialize_adds_SourceNode_to_pre_existing_schema)}-{Guid.NewGuid():N}?mode=memory&cache=shared";

    // Simulate a pre-SourceNode deployment: the table exists in its
    // 12-column shape, with the older provenance columns but no SourceNode.
    using var legacyConnection = SeedPreSourceNodeSchemaDatabase(dataSource);
    Assert.True(ColumnExists(legacyConnection, "SourceInstanceId"));
    Assert.True(ColumnExists(legacyConnection, "SourceScript"));
    Assert.False(ColumnExists(legacyConnection, "SourceNode"));

    // Upgrade: constructing the current store over the same database has to
    // ALTER the column in, because CREATE TABLE IF NOT EXISTS does nothing
    // to a table that is already there.
    await using (var upgradedStore = CreateStoreOver(dataSource))
    {
        var columnAdded = ColumnExists(legacyConnection, "SourceNode");
        Assert.True(
            columnAdded,
            "OperationTrackingStore must ALTER the SourceNode column into a pre-existing OperationTracking table.");

        // Writing a row that binds $sourceNode must now work; without the
        // ALTER it would fail with "no such column: SourceNode".
        var operationId = TrackedOperationId.New();
        await upgradedStore.RecordEnqueueAsync(
            operationId,
            kind: "ApiCallCached",
            targetSummary: "ERP.GetOrder",
            sourceInstanceId: "inst-1",
            sourceScript: "ScriptActor:OnTick",
            sourceNode: "node-a");

        var status = await upgradedStore.GetStatusAsync(operationId);
        Assert.NotNull(status);
        Assert.Equal("node-a", status!.SourceNode);
    }

    // Idempotency: opening a second store over the upgraded database must
    // not throw — the probe finds SourceNode and the ALTER is skipped.
    await using (var reopenedStore = CreateStoreOver(dataSource))
    {
        Assert.True(ColumnExists(legacyConnection, "SourceNode"));
    }
}
/// <summary>
/// A non-null <c>sourceNode</c> passed to <c>RecordEnqueueAsync</c> must be
/// returned unchanged by <c>GetStatusAsync</c>.
/// </summary>
[Fact]
public async Task RecordEnqueueAsync_persists_SourceNode()
{
    var (store, _) = CreateStore(nameof(RecordEnqueueAsync_persists_SourceNode));
    await using var storeScope = store;

    var operationId = TrackedOperationId.New();
    await store.RecordEnqueueAsync(
        operationId,
        kind: nameof(AuditKind.ApiCallCached),
        targetSummary: "ERP.GetOrder",
        sourceInstanceId: "Plant.Pump42",
        sourceScript: "ScriptActor:OnTick",
        sourceNode: "node-a");

    var status = await store.GetStatusAsync(operationId);

    Assert.NotNull(status);
    Assert.Equal("node-a", status!.SourceNode);
}
/// <summary>
/// A null <c>sourceNode</c> passed to <c>RecordEnqueueAsync</c> must read back
/// as null from <c>GetStatusAsync</c>.
/// </summary>
[Fact]
public async Task RecordEnqueueAsync_persists_null_SourceNode()
{
    var (store, _) = CreateStore(nameof(RecordEnqueueAsync_persists_null_SourceNode));
    await using var storeScope = store;

    var operationId = TrackedOperationId.New();
    await store.RecordEnqueueAsync(
        operationId,
        kind: nameof(AuditKind.ApiCallCached),
        targetSummary: "ERP.GetOrder",
        sourceInstanceId: null,
        sourceScript: null,
        sourceNode: null);

    var status = await store.GetStatusAsync(operationId);

    Assert.NotNull(status);
    Assert.Null(status!.SourceNode);
}
[Fact]
public async Task RecordEnqueueAsync_InsertsSubmittedRow_WithRetryCountZero()
{
@@ -82,7 +236,8 @@ public class OperationTrackingStoreTests
kind: nameof(AuditKind.ApiCallCached),
targetSummary: "ERP.GetOrder",
sourceInstanceId: "Plant.Pump42",
sourceScript: "ScriptActor:OnTick");
sourceScript: "ScriptActor:OnTick",
sourceNode: null);
var snapshot = await store.GetStatusAsync(id);
Assert.NotNull(snapshot);
@@ -107,8 +262,8 @@ public class OperationTrackingStoreTests
await using var _store = store;
var id = TrackedOperationId.New();
await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", "Plant.Pump42", "ScriptActor:OnTick");
await store.RecordEnqueueAsync(id, "ApiCallCached", "OtherTarget", "Other.Instance", "ScriptActor:Other");
await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", "Plant.Pump42", "ScriptActor:OnTick", sourceNode: null);
await store.RecordEnqueueAsync(id, "ApiCallCached", "OtherTarget", "Other.Instance", "ScriptActor:Other", sourceNode: null);
var snapshot = await store.GetStatusAsync(id);
Assert.NotNull(snapshot);
@@ -127,7 +282,7 @@ public class OperationTrackingStoreTests
await using var _store = store;
var id = TrackedOperationId.New();
await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", null, null);
await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", null, null, sourceNode: null);
await store.RecordAttemptAsync(
id,
@@ -155,7 +310,7 @@ public class OperationTrackingStoreTests
await using var _store = store;
var id = TrackedOperationId.New();
await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", null, null);
await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", null, null, sourceNode: null);
await store.RecordTerminalAsync(
id,
status: nameof(AuditStatus.Delivered),
@@ -190,7 +345,7 @@ public class OperationTrackingStoreTests
await using var _store = store;
var id = TrackedOperationId.New();
await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", null, null);
await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", null, null, sourceNode: null);
var beforeTerminal = DateTime.UtcNow;
await store.RecordTerminalAsync(
@@ -228,7 +383,7 @@ public class OperationTrackingStoreTests
await using var _store = store;
var id = TrackedOperationId.New();
await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", null, null);
await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", null, null, sourceNode: null);
await store.RecordAttemptAsync(id, nameof(AuditStatus.Attempted), 1, "first failure", 503);
await store.RecordAttemptAsync(id, nameof(AuditStatus.Attempted), 2, "second failure", 503);
await store.RecordAttemptAsync(id, nameof(AuditStatus.Attempted), 3, "third failure", 504);
@@ -254,9 +409,9 @@ public class OperationTrackingStoreTests
var bId = TrackedOperationId.New();
var cId = TrackedOperationId.New();
await store.RecordEnqueueAsync(aId, "ApiCallCached", "A", null, null);
await store.RecordEnqueueAsync(bId, "ApiCallCached", "B", null, null);
await store.RecordEnqueueAsync(cId, "ApiCallCached", "C", null, null);
await store.RecordEnqueueAsync(aId, "ApiCallCached", "A", null, null, sourceNode: null);
await store.RecordEnqueueAsync(bId, "ApiCallCached", "B", null, null, sourceNode: null);
await store.RecordEnqueueAsync(cId, "ApiCallCached", "C", null, null, sourceNode: null);
await store.RecordTerminalAsync(aId, nameof(AuditStatus.Delivered), null, 200);
await store.RecordTerminalAsync(bId, nameof(AuditStatus.Delivered), null, 200);