From 277882d230a9214468de76de727b640c8f5e0336 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 23 May 2026 16:54:48 -0400 Subject: [PATCH] feat(site-runtime): add SourceNode column to OperationTracking + thread through RecordEnqueueAsync --- .../Telemetry/CachedCallTelemetryForwarder.cs | 4 + .../Interfaces/IOperationTrackingStore.cs | 1 + .../Types/TrackingStatusSnapshot.cs | 8 +- .../Tracking/OperationTrackingStore.cs | 58 +++++- .../CachedCallTelemetryForwarderTests.cs | 8 +- .../Scripts/TrackingApiTests.cs | 3 +- .../Tracking/OperationTrackingStoreTests.cs | 175 +++++++++++++++++- 7 files changed, 238 insertions(+), 19 deletions(-) diff --git a/src/ScadaLink.AuditLog/Site/Telemetry/CachedCallTelemetryForwarder.cs b/src/ScadaLink.AuditLog/Site/Telemetry/CachedCallTelemetryForwarder.cs index c2cea5b..dd3958e 100644 --- a/src/ScadaLink.AuditLog/Site/Telemetry/CachedCallTelemetryForwarder.cs +++ b/src/ScadaLink.AuditLog/Site/Telemetry/CachedCallTelemetryForwarder.cs @@ -128,12 +128,16 @@ public sealed class CachedCallTelemetryForwarder : ICachedCallTelemetryForwarder // Enqueue — insert-if-not-exists with the operational // channel as the kind discriminator. RetryCount is fixed // at 0 by the tracking store's INSERT contract. + // sourceNode plumbed through but left null here; stamping + // is wired in a later task (Task 14) once the + // INodeIdentityProvider is threaded into the forwarder. await _trackingStore.RecordEnqueueAsync( telemetry.Operational.TrackedOperationId, telemetry.Operational.Channel, telemetry.Operational.Target, telemetry.Audit.SourceInstanceId, telemetry.Audit.SourceScript, + sourceNode: null, ct).ConfigureAwait(false); break; diff --git a/src/ScadaLink.Commons/Interfaces/IOperationTrackingStore.cs b/src/ScadaLink.Commons/Interfaces/IOperationTrackingStore.cs index add0a8c..b192441 100644 --- a/src/ScadaLink.Commons/Interfaces/IOperationTrackingStore.cs +++ b/src/ScadaLink.Commons/Interfaces/IOperationTrackingStore.cs @@ -40,6 +40,7 @@ public interface IOperationTrackingStore string? targetSummary, string? sourceInstanceId, string? sourceScript, + string? sourceNode, CancellationToken ct = default); /// diff --git a/src/ScadaLink.Commons/Types/TrackingStatusSnapshot.cs b/src/ScadaLink.Commons/Types/TrackingStatusSnapshot.cs index 22136ff..ea9b5a6 100644 --- a/src/ScadaLink.Commons/Types/TrackingStatusSnapshot.cs +++ b/src/ScadaLink.Commons/Types/TrackingStatusSnapshot.cs @@ -25,6 +25,11 @@ namespace ScadaLink.Commons.Types; /// UTC timestamp the row reached a terminal status; null while still active. /// Instance id that issued the cached call, when known. /// Script that issued the cached call, when known. +/// +/// Cluster node that submitted the cached call (e.g. "node-a" / +/// "node-b"), captured at enqueue time. Null on rows persisted before +/// the SourceNode stamping migration; stamping itself is wired in a later task. +/// public sealed record TrackingStatusSnapshot( TrackedOperationId Id, string Kind, @@ -37,4 +42,5 @@ public sealed record TrackingStatusSnapshot( DateTime UpdatedAtUtc, DateTime? TerminalAtUtc, string? SourceInstanceId, - string? SourceScript); + string? SourceScript, + string? SourceNode); diff --git a/src/ScadaLink.SiteRuntime/Tracking/OperationTrackingStore.cs b/src/ScadaLink.SiteRuntime/Tracking/OperationTrackingStore.cs index 8ef0d2f..9d6305f 100644 --- a/src/ScadaLink.SiteRuntime/Tracking/OperationTrackingStore.cs +++ b/src/ScadaLink.SiteRuntime/Tracking/OperationTrackingStore.cs @@ -70,12 +70,57 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable, UpdatedAtUtc TEXT NOT NULL, TerminalAtUtc TEXT NULL, SourceInstanceId TEXT NULL, - SourceScript TEXT NULL + SourceScript TEXT NULL, + SourceNode TEXT NULL ); CREATE INDEX IF NOT EXISTS IX_OperationTracking_Status_Updated ON OperationTracking (Status, UpdatedAtUtc); """; cmd.ExecuteNonQuery(); + + // SourceNode stamping: additively add the SourceNode column. + // CREATE TABLE IF NOT EXISTS above does NOT add columns to an + // OperationTracking table that already exists from a pre-SourceNode + // build, so a tracking.db created by an older build needs the column + // ALTER-ed in. The file is durable across restart/failover by design + // (retention window default 7 days), so without this step every + // RecordEnqueueAsync on an upgraded deployment would bind $sourceNode + // against a missing column and the write would fail. + // SQLite has no "ADD COLUMN IF NOT EXISTS"; the column presence is + // probed first and the ALTER skipped when already there. The column is + // nullable with no default, so any row written before this migration + // reads back SourceNode = null (back-compat). + // + // NOTE: This is the FIRST idempotent column-upgrade in + // OperationTrackingStore — prior schema changes pre-dated any + // production rollout and relied solely on CREATE TABLE IF NOT EXISTS. + // The helper mirrors the SqliteAuditWriter precedent. + AddColumnIfMissing("SourceNode", "TEXT NULL"); + } + + /// + /// Additively adds a column to OperationTracking only when it is not + /// already present. SQLite lacks ADD COLUMN IF NOT EXISTS, so the + /// schema is probed via PRAGMA table_info first. Idempotent — safe + /// to run on every . Mirrors the + /// SqliteAuditWriter.AddColumnIfMissing precedent. + /// + private void AddColumnIfMissing(string columnName, string columnDefinition) + { + using var probe = _connection.CreateCommand(); + probe.CommandText = "SELECT COUNT(*) FROM pragma_table_info('OperationTracking') WHERE name = $name"; + probe.Parameters.AddWithValue("$name", columnName); + var exists = Convert.ToInt32(probe.ExecuteScalar()) > 0; + if (exists) + { + return; + } + + using var alter = _connection.CreateCommand(); + // Column name + definition are caller-controlled constants, never user + // input — safe to interpolate (parameters are not permitted in DDL). + alter.CommandText = $"ALTER TABLE OperationTracking ADD COLUMN {columnName} {columnDefinition}"; + alter.ExecuteNonQuery(); } /// @@ -85,6 +130,7 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable, string? targetSummary, string? sourceInstanceId, string? sourceScript, + string? sourceNode, CancellationToken ct = default) { ArgumentNullException.ThrowIfNull(kind); @@ -104,12 +150,12 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable, TrackedOperationId, Kind, TargetSummary, Status, RetryCount, LastError, HttpStatus, CreatedAtUtc, UpdatedAtUtc, TerminalAtUtc, - SourceInstanceId, SourceScript + SourceInstanceId, SourceScript, SourceNode ) VALUES ( $id, $kind, $targetSummary, $status, 0, NULL, NULL, $now, $now, NULL, - $sourceInstanceId, $sourceScript + $sourceInstanceId, $sourceScript, $sourceNode ); """; cmd.Parameters.AddWithValue("$id", id.ToString()); @@ -119,6 +165,7 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable, cmd.Parameters.AddWithValue("$now", now); cmd.Parameters.AddWithValue("$sourceInstanceId", (object?)sourceInstanceId ?? DBNull.Value); cmd.Parameters.AddWithValue("$sourceScript", (object?)sourceScript ?? DBNull.Value); + cmd.Parameters.AddWithValue("$sourceNode", (object?)sourceNode ?? DBNull.Value); cmd.ExecuteNonQuery(); } @@ -233,7 +280,7 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable, SELECT TrackedOperationId, Kind, TargetSummary, Status, RetryCount, LastError, HttpStatus, CreatedAtUtc, UpdatedAtUtc, TerminalAtUtc, - SourceInstanceId, SourceScript + SourceInstanceId, SourceScript, SourceNode FROM OperationTracking WHERE TrackedOperationId = $id; """; @@ -257,7 +304,8 @@ public class OperationTrackingStore : IOperationTrackingStore, IAsyncDisposable, UpdatedAtUtc: ParseUtc(reader.GetString(8)), TerminalAtUtc: reader.IsDBNull(9) ? null : ParseUtc(reader.GetString(9)), SourceInstanceId: reader.IsDBNull(10) ? null : reader.GetString(10), - SourceScript: reader.IsDBNull(11) ? null : reader.GetString(11)); + SourceScript: reader.IsDBNull(11) ? null : reader.GetString(11), + SourceNode: reader.IsDBNull(12) ? null : reader.GetString(12)); } finally { diff --git a/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/CachedCallTelemetryForwarderTests.cs b/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/CachedCallTelemetryForwarderTests.cs index 1ec3d78..7be51a6 100644 --- a/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/CachedCallTelemetryForwarderTests.cs +++ b/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/CachedCallTelemetryForwarderTests.cs @@ -135,12 +135,14 @@ public class CachedCallTelemetryForwarderTests Arg.Any()); // Tracking row: insert-if-not-exists with kind discriminator. + // sourceNode is null until Task 14 wires the INodeIdentityProvider through. await _tracking.Received(1).RecordEnqueueAsync( _id, "ApiOutbound", "ERP.GetOrder", "inst-1", "ScriptActor:doStuff", + null, Arg.Any()); await _tracking.DidNotReceiveWithAnyArgs().RecordAttemptAsync( default, default!, default, default, default, default); @@ -166,7 +168,7 @@ public class CachedCallTelemetryForwarderTests await _tracking.Received(1).RecordAttemptAsync( _id, "Attempted", 2, "HTTP 503", 503, Arg.Any()); await _tracking.DidNotReceiveWithAnyArgs().RecordEnqueueAsync( - default, default!, default, default, default, default); + default, default!, default, default, default, default, default); await _tracking.DidNotReceiveWithAnyArgs().RecordTerminalAsync( default, default!, default, default, default); } @@ -189,7 +191,7 @@ public class CachedCallTelemetryForwarderTests await _tracking.Received(1).RecordTerminalAsync( _id, "Delivered", null, null, Arg.Any()); await _tracking.DidNotReceiveWithAnyArgs().RecordEnqueueAsync( - default, default!, default, default, default, default); + default, default!, default, default, default, default, default); await _tracking.DidNotReceiveWithAnyArgs().RecordAttemptAsync( default, default!, default, default, default, default); } @@ -213,6 +215,7 @@ public class CachedCallTelemetryForwarderTests Arg.Any(), Arg.Any(), Arg.Any(), + Arg.Any(), Arg.Any()); } @@ -225,6 +228,7 @@ public class CachedCallTelemetryForwarderTests Arg.Any(), Arg.Any(), Arg.Any(), + Arg.Any(), Arg.Any()) .Throws(new InvalidOperationException("sqlite locked")); diff --git a/tests/ScadaLink.SiteRuntime.Tests/Scripts/TrackingApiTests.cs b/tests/ScadaLink.SiteRuntime.Tests/Scripts/TrackingApiTests.cs index a0acd1c..ef8ffcb 100644 --- a/tests/ScadaLink.SiteRuntime.Tests/Scripts/TrackingApiTests.cs +++ b/tests/ScadaLink.SiteRuntime.Tests/Scripts/TrackingApiTests.cs @@ -51,7 +51,8 @@ public class TrackingApiTests UpdatedAtUtc: new DateTime(2026, 5, 20, 10, 2, 30, DateTimeKind.Utc), TerminalAtUtc: new DateTime(2026, 5, 20, 10, 2, 30, DateTimeKind.Utc), SourceInstanceId: "Plant.Pump42", - SourceScript: "ScriptActor:OnTick"); + SourceScript: "ScriptActor:OnTick", + SourceNode: null); var store = new Mock(); store diff --git a/tests/ScadaLink.SiteRuntime.Tests/Tracking/OperationTrackingStoreTests.cs b/tests/ScadaLink.SiteRuntime.Tests/Tracking/OperationTrackingStoreTests.cs index 952d7dc..b302683 100644 --- a/tests/ScadaLink.SiteRuntime.Tests/Tracking/OperationTrackingStoreTests.cs +++ b/tests/ScadaLink.SiteRuntime.Tests/Tracking/OperationTrackingStoreTests.cs @@ -59,6 +59,7 @@ public class OperationTrackingStoreTests "TrackedOperationId", "Kind", "TargetSummary", "Status", "RetryCount", "LastError", "HttpStatus", "CreatedAtUtc", "UpdatedAtUtc", "TerminalAtUtc", "SourceInstanceId", "SourceScript", + "SourceNode", }; Assert.Equal( expected.OrderBy(n => n), @@ -70,6 +71,159 @@ public class OperationTrackingStoreTests } } + [Fact] + public void Initialize_creates_OperationTracking_with_SourceNode_column() + { + var (store, dataSource) = CreateStore(nameof(Initialize_creates_OperationTracking_with_SourceNode_column)); + using (store) + { + using var connection = OpenVerifierConnection(dataSource); + Assert.True( + ColumnExists(connection, "SourceNode"), + "Fresh OperationTracking schema must include the SourceNode column."); + } + } + + /// + /// The pre-SourceNode OperationTracking schema — the 12-column + /// CREATE TABLE that has the original source-provenance columns + /// (SourceInstanceId, SourceScript) but is WITHOUT + /// SourceNode. A deployment that ran before the SourceNode + /// stamping work already has an on-disk tracking.db in exactly + /// this shape, and CREATE TABLE IF NOT EXISTS is a no-op against it. + /// + private const string OldPreSourceNodeSchema = """ + CREATE TABLE IF NOT EXISTS OperationTracking ( + TrackedOperationId TEXT NOT NULL PRIMARY KEY, + Kind TEXT NOT NULL, + TargetSummary TEXT NULL, + Status TEXT NOT NULL, + RetryCount INTEGER NOT NULL DEFAULT 0, + LastError TEXT NULL, + HttpStatus INTEGER NULL, + CreatedAtUtc TEXT NOT NULL, + UpdatedAtUtc TEXT NOT NULL, + TerminalAtUtc TEXT NULL, + SourceInstanceId TEXT NULL, + SourceScript TEXT NULL + ); + CREATE INDEX IF NOT EXISTS IX_OperationTracking_Status_Updated + ON OperationTracking (Status, UpdatedAtUtc); + """; + + private static SqliteConnection SeedPreSourceNodeSchemaDatabase(string dataSource) + { + var connection = new SqliteConnection($"Data Source={dataSource};Cache=Shared"); + connection.Open(); + using var cmd = connection.CreateCommand(); + cmd.CommandText = OldPreSourceNodeSchema; + cmd.ExecuteNonQuery(); + return connection; + } + + private static bool ColumnExists(SqliteConnection connection, string columnName) + { + using var cmd = connection.CreateCommand(); + cmd.CommandText = "SELECT COUNT(*) FROM pragma_table_info('OperationTracking') WHERE name = $name"; + cmd.Parameters.AddWithValue("$name", columnName); + return Convert.ToInt32(cmd.ExecuteScalar()) > 0; + } + + private static OperationTrackingStore CreateStoreOver(string dataSource) + { + var connectionString = $"Data Source={dataSource};Cache=Shared"; + var options = new OperationTrackingOptions { ConnectionString = connectionString }; + return new OperationTrackingStore( + Options.Create(options), + NullLogger.Instance); + } + + [Fact] + public async Task Initialize_adds_SourceNode_to_pre_existing_schema() + { + var dataSource = $"file:{nameof(Initialize_adds_SourceNode_to_pre_existing_schema)}-{Guid.NewGuid():N}?mode=memory&cache=shared"; + + // A pre-SourceNode deployment: tracking.db already exists with the + // 12-column schema and NO SourceNode column. + using var seedConnection = SeedPreSourceNodeSchemaDatabase(dataSource); + Assert.True(ColumnExists(seedConnection, "SourceInstanceId")); + Assert.True(ColumnExists(seedConnection, "SourceScript")); + Assert.False(ColumnExists(seedConnection, "SourceNode")); + + // Upgrade: a post-branch OperationTrackingStore opens the same database. + // Its InitializeSchema must ALTER the missing SourceNode column in — + // the CREATE TABLE IF NOT EXISTS alone is a no-op against the existing + // table. + await using (var store = CreateStoreOver(dataSource)) + { + Assert.True( + ColumnExists(seedConnection, "SourceNode"), + "OperationTrackingStore must ALTER the SourceNode column into a pre-existing OperationTracking table."); + + // A RecordEnqueueAsync binding $sourceNode must now succeed; without + // the ALTER it would fail with "no such column: SourceNode". + var id = TrackedOperationId.New(); + await store.RecordEnqueueAsync( + id, + kind: "ApiCallCached", + targetSummary: "ERP.GetOrder", + sourceInstanceId: "inst-1", + sourceScript: "ScriptActor:OnTick", + sourceNode: "node-a"); + + var snapshot = await store.GetStatusAsync(id); + Assert.NotNull(snapshot); + Assert.Equal("node-a", snapshot!.SourceNode); + } + + // Idempotency: a second store over the now-upgraded DB must not error + // (the probe sees SourceNode already present and skips the ALTER). + await using (var storeAgain = CreateStoreOver(dataSource)) + { + Assert.True(ColumnExists(seedConnection, "SourceNode")); + } + } + + [Fact] + public async Task RecordEnqueueAsync_persists_SourceNode() + { + var (store, _) = CreateStore(nameof(RecordEnqueueAsync_persists_SourceNode)); + await using var _store = store; + + var id = TrackedOperationId.New(); + await store.RecordEnqueueAsync( + id, + kind: nameof(AuditKind.ApiCallCached), + targetSummary: "ERP.GetOrder", + sourceInstanceId: "Plant.Pump42", + sourceScript: "ScriptActor:OnTick", + sourceNode: "node-a"); + + var snapshot = await store.GetStatusAsync(id); + Assert.NotNull(snapshot); + Assert.Equal("node-a", snapshot!.SourceNode); + } + + [Fact] + public async Task RecordEnqueueAsync_persists_null_SourceNode() + { + var (store, _) = CreateStore(nameof(RecordEnqueueAsync_persists_null_SourceNode)); + await using var _store = store; + + var id = TrackedOperationId.New(); + await store.RecordEnqueueAsync( + id, + kind: nameof(AuditKind.ApiCallCached), + targetSummary: "ERP.GetOrder", + sourceInstanceId: null, + sourceScript: null, + sourceNode: null); + + var snapshot = await store.GetStatusAsync(id); + Assert.NotNull(snapshot); + Assert.Null(snapshot!.SourceNode); + } + [Fact] public async Task RecordEnqueueAsync_InsertsSubmittedRow_WithRetryCountZero() { @@ -82,7 +236,8 @@ public class OperationTrackingStoreTests kind: nameof(AuditKind.ApiCallCached), targetSummary: "ERP.GetOrder", sourceInstanceId: "Plant.Pump42", - sourceScript: "ScriptActor:OnTick"); + sourceScript: "ScriptActor:OnTick", + sourceNode: null); var snapshot = await store.GetStatusAsync(id); Assert.NotNull(snapshot); @@ -107,8 +262,8 @@ public class OperationTrackingStoreTests await using var _store = store; var id = TrackedOperationId.New(); - await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", "Plant.Pump42", "ScriptActor:OnTick"); - await store.RecordEnqueueAsync(id, "ApiCallCached", "OtherTarget", "Other.Instance", "ScriptActor:Other"); + await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", "Plant.Pump42", "ScriptActor:OnTick", sourceNode: null); + await store.RecordEnqueueAsync(id, "ApiCallCached", "OtherTarget", "Other.Instance", "ScriptActor:Other", sourceNode: null); var snapshot = await store.GetStatusAsync(id); Assert.NotNull(snapshot); @@ -127,7 +282,7 @@ public class OperationTrackingStoreTests await using var _store = store; var id = TrackedOperationId.New(); - await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", null, null); + await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", null, null, sourceNode: null); await store.RecordAttemptAsync( id, @@ -155,7 +310,7 @@ public class OperationTrackingStoreTests await using var _store = store; var id = TrackedOperationId.New(); - await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", null, null); + await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", null, null, sourceNode: null); await store.RecordTerminalAsync( id, status: nameof(AuditStatus.Delivered), @@ -190,7 +345,7 @@ public class OperationTrackingStoreTests await using var _store = store; var id = TrackedOperationId.New(); - await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", null, null); + await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", null, null, sourceNode: null); var beforeTerminal = DateTime.UtcNow; await store.RecordTerminalAsync( @@ -228,7 +383,7 @@ public class OperationTrackingStoreTests await using var _store = store; var id = TrackedOperationId.New(); - await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", null, null); + await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", null, null, sourceNode: null); await store.RecordAttemptAsync(id, nameof(AuditStatus.Attempted), 1, "first failure", 503); await store.RecordAttemptAsync(id, nameof(AuditStatus.Attempted), 2, "second failure", 503); await store.RecordAttemptAsync(id, nameof(AuditStatus.Attempted), 3, "third failure", 504); @@ -254,9 +409,9 @@ public class OperationTrackingStoreTests var bId = TrackedOperationId.New(); var cId = TrackedOperationId.New(); - await store.RecordEnqueueAsync(aId, "ApiCallCached", "A", null, null); - await store.RecordEnqueueAsync(bId, "ApiCallCached", "B", null, null); - await store.RecordEnqueueAsync(cId, "ApiCallCached", "C", null, null); + await store.RecordEnqueueAsync(aId, "ApiCallCached", "A", null, null, sourceNode: null); + await store.RecordEnqueueAsync(bId, "ApiCallCached", "B", null, null, sourceNode: null); + await store.RecordEnqueueAsync(cId, "ApiCallCached", "C", null, null, sourceNode: null); await store.RecordTerminalAsync(aId, nameof(AuditStatus.Delivered), null, 200); await store.RecordTerminalAsync(bId, nameof(AuditStatus.Delivered), null, 200);