diff --git a/src/ScadaLink.AuditLog/Site/Telemetry/CachedCallLifecycleBridge.cs b/src/ScadaLink.AuditLog/Site/Telemetry/CachedCallLifecycleBridge.cs index cc6a975..258bbaa 100644 --- a/src/ScadaLink.AuditLog/Site/Telemetry/CachedCallLifecycleBridge.cs +++ b/src/ScadaLink.AuditLog/Site/Telemetry/CachedCallLifecycleBridge.cs @@ -133,9 +133,17 @@ public sealed class CachedCallLifecycleBridge : ICachedCallLifecycleObserver Channel = channel, Kind = kind, CorrelationId = context.TrackedOperationId.Value, + // Audit Log #23 (ExecutionId Task 4): the originating script + // execution's per-run correlation id, threaded through the S&F + // buffer; null on rows buffered before Task 4 (back-compat). + ExecutionId = context.ExecutionId, SourceSiteId = string.IsNullOrEmpty(context.SourceSite) ? null : context.SourceSite, SourceInstanceId = context.SourceInstanceId, - SourceScript = null, // Not threaded through S&F; left null on retry-loop rows. + // Audit Log #23 (ExecutionId Task 4): SourceScript is now + // threaded through the S&F buffer alongside ExecutionId — the + // retry-loop cached rows carry the same provenance the + // script-side cached rows do. Null on pre-Task-4 buffered rows. + SourceScript = context.SourceScript, Target = context.Target, Status = status, HttpStatus = httpStatus, diff --git a/src/ScadaLink.Commons/Interfaces/Services/ICachedCallLifecycleObserver.cs b/src/ScadaLink.Commons/Interfaces/Services/ICachedCallLifecycleObserver.cs index 8e34cb4..6188f0c 100644 --- a/src/ScadaLink.Commons/Interfaces/Services/ICachedCallLifecycleObserver.cs +++ b/src/ScadaLink.Commons/Interfaces/Services/ICachedCallLifecycleObserver.cs @@ -57,6 +57,20 @@ public interface ICachedCallLifecycleObserver /// When this attempt completed. /// Duration of the attempt in milliseconds (null when not measured). /// Originating instance, when known. +/// +/// Audit Log #23 (ExecutionId Task 4): the originating script execution's +/// per-run correlation id, threaded through the store-and-forward buffer from +/// the cached-call enqueue path. The audit bridge stamps it onto the +/// retry-loop ApiCallCached/DbWriteCached Attempted and +/// CachedResolve rows so they correlate with the rest of the run. +/// null for rows buffered before Task 4 (back-compat). +/// +/// +/// Audit Log #23 (ExecutionId Task 4): the originating script identifier, +/// threaded alongside so the retry-loop audit +/// rows carry the same SourceScript provenance the script-side cached +/// rows already do. null when not known. +/// public sealed record CachedCallAttemptContext( TrackedOperationId TrackedOperationId, string Channel, @@ -69,7 +83,9 @@ public sealed record CachedCallAttemptContext( DateTime CreatedAtUtc, DateTime OccurredAtUtc, int? DurationMs, - string? SourceInstanceId); + string? SourceInstanceId, + Guid? ExecutionId = null, + string? SourceScript = null); /// /// Coarse outcome of one cached-call delivery attempt, observed from inside diff --git a/src/ScadaLink.Commons/Interfaces/Services/IDatabaseGateway.cs b/src/ScadaLink.Commons/Interfaces/Services/IDatabaseGateway.cs index 60defe9..0271bec 100644 --- a/src/ScadaLink.Commons/Interfaces/Services/IDatabaseGateway.cs +++ b/src/ScadaLink.Commons/Interfaces/Services/IDatabaseGateway.cs @@ -29,11 +29,24 @@ public interface IDatabaseGateway /// null — when omitted the S&F engine mints a fresh GUID and no /// M3 telemetry is correlated (pre-M3 caller behaviour). /// + /// + /// Audit Log #23 (ExecutionId Task 4): the originating script execution's + /// per-run correlation id. When the write is buffered on a transient + /// failure this is threaded onto the S&F message so the retry-loop + /// cached-write audit rows carry it. null when not threaded. + /// + /// + /// Audit Log #23 (ExecutionId Task 4): the originating script identifier, + /// threaded onto the buffered S&F message alongside + /// . null when not known. + /// Task CachedWriteAsync( string connectionName, string sql, IReadOnlyDictionary? parameters = null, string? originInstanceName = null, CancellationToken cancellationToken = default, - TrackedOperationId? trackedOperationId = null); + TrackedOperationId? trackedOperationId = null, + Guid? executionId = null, + string? sourceScript = null); } diff --git a/src/ScadaLink.Commons/Interfaces/Services/IExternalSystemClient.cs b/src/ScadaLink.Commons/Interfaces/Services/IExternalSystemClient.cs index 627a81d..d4f855c 100644 --- a/src/ScadaLink.Commons/Interfaces/Services/IExternalSystemClient.cs +++ b/src/ScadaLink.Commons/Interfaces/Services/IExternalSystemClient.cs @@ -30,13 +30,26 @@ public interface IExternalSystemClient /// M3 telemetry is correlated (the legacy behaviour pre-M3 callers rely /// on). /// + /// + /// Audit Log #23 (ExecutionId Task 4): the originating script execution's + /// per-run correlation id. When the call is buffered on a transient + /// failure this is threaded onto the S&F message so the retry-loop + /// cached-call audit rows carry it. null when not threaded. + /// + /// + /// Audit Log #23 (ExecutionId Task 4): the originating script identifier, + /// threaded onto the buffered S&F message alongside + /// . null when not known. + /// Task CachedCallAsync( string systemName, string methodName, IReadOnlyDictionary? parameters = null, string? originInstanceName = null, CancellationToken cancellationToken = default, - TrackedOperationId? trackedOperationId = null); + TrackedOperationId? trackedOperationId = null, + Guid? executionId = null, + string? sourceScript = null); } /// diff --git a/src/ScadaLink.ExternalSystemGateway/DatabaseGateway.cs b/src/ScadaLink.ExternalSystemGateway/DatabaseGateway.cs index 7e48288..efa29c4 100644 --- a/src/ScadaLink.ExternalSystemGateway/DatabaseGateway.cs +++ b/src/ScadaLink.ExternalSystemGateway/DatabaseGateway.cs @@ -84,7 +84,9 @@ public class DatabaseGateway : IDatabaseGateway IReadOnlyDictionary? parameters = null, string? originInstanceName = null, CancellationToken cancellationToken = default, - TrackedOperationId? trackedOperationId = null) + TrackedOperationId? trackedOperationId = null, + Guid? executionId = null, + string? sourceScript = null) { var definition = await ResolveConnectionAsync(connectionName, cancellationToken); if (definition == null) @@ -124,7 +126,13 @@ public class DatabaseGateway : IDatabaseGateway // read it back via StoreAndForwardMessage.Id and emit per-attempt + // terminal cached-write telemetry. Null -> S&F mints its own GUID // (legacy pre-M3 behaviour). - messageId: trackedOperationId?.ToString()); + messageId: trackedOperationId?.ToString(), + // Audit Log #23 (ExecutionId Task 4): thread the originating script + // execution's ExecutionId + SourceScript onto the buffered row so + // the retry-loop cached-write audit rows carry the same provenance + // the script-side cached rows do. + executionId: executionId, + sourceScript: sourceScript); } /// diff --git a/src/ScadaLink.ExternalSystemGateway/ExternalSystemClient.cs b/src/ScadaLink.ExternalSystemGateway/ExternalSystemClient.cs index 0b88de6..dfe042b 100644 --- a/src/ScadaLink.ExternalSystemGateway/ExternalSystemClient.cs +++ b/src/ScadaLink.ExternalSystemGateway/ExternalSystemClient.cs @@ -86,7 +86,9 @@ public class ExternalSystemClient : IExternalSystemClient IReadOnlyDictionary? parameters = null, string? originInstanceName = null, CancellationToken cancellationToken = default, - TrackedOperationId? trackedOperationId = null) + TrackedOperationId? trackedOperationId = null, + Guid? executionId = null, + string? sourceScript = null) { var (system, method) = await ResolveSystemAndMethodAsync(systemName, methodName, cancellationToken); if (system == null || method == null) @@ -144,7 +146,13 @@ public class ExternalSystemClient : IExternalSystemClient // StoreAndForwardMessage.Id and emit per-attempt + terminal // cached-call telemetry (Bundle E Tasks E4/E5). Null -> S&F // mints its own GUID (legacy pre-M3 behaviour). - messageId: trackedOperationId?.ToString()); + messageId: trackedOperationId?.ToString(), + // Audit Log #23 (ExecutionId Task 4): thread the originating + // script execution's ExecutionId + SourceScript onto the + // buffered row so the retry-loop cached-call audit rows carry + // the same provenance the script-side cached rows do. + executionId: executionId, + sourceScript: sourceScript); return new ExternalCallResult(true, null, null, WasBuffered: true); } diff --git a/src/ScadaLink.SiteRuntime/Scripts/ScriptRuntimeContext.cs b/src/ScadaLink.SiteRuntime/Scripts/ScriptRuntimeContext.cs index bfdbaa4..1d085f3 100644 --- a/src/ScadaLink.SiteRuntime/Scripts/ScriptRuntimeContext.cs +++ b/src/ScadaLink.SiteRuntime/Scripts/ScriptRuntimeContext.cs @@ -513,7 +513,12 @@ public class ScriptRuntimeContext parameters, _instanceName, cancellationToken, - trackedId).ConfigureAwait(false); + trackedId, + // Audit Log #23 (ExecutionId Task 4): thread the script + // execution's ExecutionId + SourceScript so a buffered + // cached call's retry-loop audit rows carry them. + executionId: _executionId, + sourceScript: _sourceScript).ConfigureAwait(false); } catch (Exception ex) { @@ -1096,7 +1101,12 @@ public class ScriptRuntimeContext try { await _gateway.CachedWriteAsync( - name, sql, parameters, _instanceName, cancellationToken, trackedId) + name, sql, parameters, _instanceName, cancellationToken, trackedId, + // Audit Log #23 (ExecutionId Task 4): thread the script + // execution's ExecutionId + SourceScript so a buffered + // cached write's retry-loop audit rows carry them. + executionId: _executionId, + sourceScript: _sourceScript) .ConfigureAwait(false); } catch (Exception ex) diff --git a/src/ScadaLink.StoreAndForward/StoreAndForwardMessage.cs b/src/ScadaLink.StoreAndForward/StoreAndForwardMessage.cs index 5695ed2..3e799ac 100644 --- a/src/ScadaLink.StoreAndForward/StoreAndForwardMessage.cs +++ b/src/ScadaLink.StoreAndForward/StoreAndForwardMessage.cs @@ -55,4 +55,25 @@ public class StoreAndForwardMessage /// WP-13: Messages are NOT cleared when instance is deleted. /// public string? OriginInstanceName { get; set; } + + /// + /// Audit Log #23 (ExecutionId Task 4): the originating script execution's + /// per-run correlation id, threaded from ScriptRuntimeContext through + /// the cached-call enqueue path. Carried so the store-and-forward retry loop + /// can stamp it onto the per-attempt / terminal cached-call audit rows + /// (ApiCallCached/DbWriteCached Attempted, CachedResolve). + /// null for non-cached-call categories (notifications) and for rows + /// buffered before this field existed — back-compat with old persisted rows + /// (the column is added by an additive migration and read as null when absent). + /// + public Guid? ExecutionId { get; set; } + + /// + /// Audit Log #23 (ExecutionId Task 4): the originating script identifier, + /// threaded alongside from the cached-call enqueue + /// path so the retry-loop audit rows carry the same SourceScript + /// provenance the script-side cached rows already carry. null when not + /// known (non-cached categories, pre-migration rows). + /// + public string? SourceScript { get; set; } } diff --git a/src/ScadaLink.StoreAndForward/StoreAndForwardService.cs b/src/ScadaLink.StoreAndForward/StoreAndForwardService.cs index 70f6c23..fe27528 100644 --- a/src/ScadaLink.StoreAndForward/StoreAndForwardService.cs +++ b/src/ScadaLink.StoreAndForward/StoreAndForwardService.cs @@ -175,6 +175,18 @@ public class StoreAndForwardService /// it is the buffered row's , it is carried /// inside the payload, and it is the id the forwarder submits to central. /// + /// + /// Audit Log #23 (ExecutionId Task 4): the originating script execution's + /// per-run correlation id. Threaded onto the buffered row so the retry-loop + /// cached-call audit rows carry it. null for callers (notifications, + /// pre-Task-4 callers) that do not supply one. + /// + /// + /// Audit Log #23 (ExecutionId Task 4): the originating script identifier, + /// threaded onto the buffered row alongside + /// so the retry-loop audit rows carry the same provenance the script-side + /// cached rows do. null when not known. + /// public async Task EnqueueAsync( StoreAndForwardCategory category, string target, @@ -183,7 +195,9 @@ public class StoreAndForwardService int? maxRetries = null, TimeSpan? retryInterval = null, bool attemptImmediateDelivery = true, - string? messageId = null) + string? messageId = null, + Guid? executionId = null, + string? sourceScript = null) { var message = new StoreAndForwardMessage { @@ -196,7 +210,9 @@ public class StoreAndForwardService RetryIntervalMs = (long)(retryInterval ?? _options.DefaultRetryInterval).TotalMilliseconds, CreatedAt = DateTimeOffset.UtcNow, Status = StoreAndForwardMessageStatus.Pending, - OriginInstanceName = originInstanceName + OriginInstanceName = originInstanceName, + ExecutionId = executionId, + SourceScript = sourceScript }; // Attempt immediate delivery — unless the caller has already made a @@ -492,7 +508,14 @@ public class StoreAndForwardService CreatedAtUtc: message.CreatedAt.UtcDateTime, OccurredAtUtc: DateTime.SpecifyKind(occurredAtUtc, DateTimeKind.Utc), DurationMs: durationMs, - SourceInstanceId: message.OriginInstanceName); + SourceInstanceId: message.OriginInstanceName, + // Audit Log #23 (ExecutionId Task 4): the buffered message + // carries the originating script execution's ExecutionId + + // SourceScript; surface them on the context so the bridge can + // stamp the retry-loop cached audit rows. Null on rows buffered + // before Task 4 (back-compat). + ExecutionId: message.ExecutionId, + SourceScript: message.SourceScript); } catch (Exception buildEx) { diff --git a/src/ScadaLink.StoreAndForward/StoreAndForwardStorage.cs b/src/ScadaLink.StoreAndForward/StoreAndForwardStorage.cs index 7343f9b..7b92655 100644 --- a/src/ScadaLink.StoreAndForward/StoreAndForwardStorage.cs +++ b/src/ScadaLink.StoreAndForward/StoreAndForwardStorage.cs @@ -65,9 +65,45 @@ public class StoreAndForwardStorage "; await command.ExecuteNonQueryAsync(); + // Audit Log #23 (ExecutionId Task 4): additively add the execution_id / + // source_script columns. CREATE TABLE IF NOT EXISTS above does NOT add + // columns to a table that already exists from before these fields, so a + // databases created by an older build needs the columns ALTER-ed in. + // SQLite has no "ADD COLUMN IF NOT EXISTS"; the column presence is + // probed first and the ALTER skipped when already there. Both columns + // are nullable with no default, so any row buffered before this + // migration reads back ExecutionId/SourceScript = null (back-compat). + await AddColumnIfMissingAsync(connection, "execution_id", "TEXT"); + await AddColumnIfMissingAsync(connection, "source_script", "TEXT"); + _logger.LogInformation("Store-and-forward SQLite storage initialized"); } + /// + /// Audit Log #23 (ExecutionId Task 4): adds a column to sf_messages + /// only when it is not already present. SQLite lacks ADD COLUMN IF NOT + /// EXISTS, so the schema is probed via PRAGMA table_info first. + /// Idempotent — safe to run on every . + /// + private static async Task AddColumnIfMissingAsync( + SqliteConnection connection, string columnName, string columnType) + { + await using var probe = connection.CreateCommand(); + probe.CommandText = "SELECT COUNT(*) FROM pragma_table_info('sf_messages') WHERE name = @name"; + probe.Parameters.AddWithValue("@name", columnName); + var exists = Convert.ToInt32(await probe.ExecuteScalarAsync()) > 0; + if (exists) + { + return; + } + + await using var alter = connection.CreateCommand(); + // Column name + type are caller-controlled constants, never user input — + // safe to interpolate (parameters are not permitted in DDL). + alter.CommandText = $"ALTER TABLE sf_messages ADD COLUMN {columnName} {columnType}"; + await alter.ExecuteNonQueryAsync(); + } + /// /// Ensures the directory for a file-backed SQLite database exists. SQLite creates /// the database file on demand but not its parent directory, so a configured path @@ -105,9 +141,11 @@ public class StoreAndForwardStorage await using var cmd = connection.CreateCommand(); cmd.CommandText = @" INSERT INTO sf_messages (id, category, target, payload_json, retry_count, max_retries, - retry_interval_ms, created_at, last_attempt_at, status, last_error, origin_instance) + retry_interval_ms, created_at, last_attempt_at, status, last_error, + origin_instance, execution_id, source_script) VALUES (@id, @category, @target, @payload, @retryCount, @maxRetries, - @retryIntervalMs, @createdAt, @lastAttempt, @status, @lastError, @origin)"; + @retryIntervalMs, @createdAt, @lastAttempt, @status, @lastError, + @origin, @executionId, @sourceScript)"; cmd.Parameters.AddWithValue("@id", message.Id); cmd.Parameters.AddWithValue("@category", (int)message.Category); @@ -122,6 +160,12 @@ public class StoreAndForwardStorage cmd.Parameters.AddWithValue("@status", (int)message.Status); cmd.Parameters.AddWithValue("@lastError", (object?)message.LastError ?? DBNull.Value); cmd.Parameters.AddWithValue("@origin", (object?)message.OriginInstanceName ?? DBNull.Value); + // Audit Log #23 (ExecutionId Task 4): the execution id is stored as its + // canonical string form ("D") so it round-trips cleanly through the + // TEXT column; null when not a cached call / not threaded. + cmd.Parameters.AddWithValue("@executionId", + message.ExecutionId.HasValue ? message.ExecutionId.Value.ToString("D") : DBNull.Value); + cmd.Parameters.AddWithValue("@sourceScript", (object?)message.SourceScript ?? DBNull.Value); await cmd.ExecuteNonQueryAsync(); } @@ -137,7 +181,8 @@ public class StoreAndForwardStorage await using var cmd = connection.CreateCommand(); cmd.CommandText = @" SELECT id, category, target, payload_json, retry_count, max_retries, - retry_interval_ms, created_at, last_attempt_at, status, last_error, origin_instance + retry_interval_ms, created_at, last_attempt_at, status, last_error, origin_instance, + execution_id, source_script FROM sf_messages WHERE status = @pending AND (last_attempt_at IS NULL @@ -268,7 +313,8 @@ public class StoreAndForwardStorage var categoryFilter = category.HasValue ? " AND category = @category" : ""; pageCmd.CommandText = $@" SELECT id, category, target, payload_json, retry_count, max_retries, - retry_interval_ms, created_at, last_attempt_at, status, last_error, origin_instance + retry_interval_ms, created_at, last_attempt_at, status, last_error, origin_instance, + execution_id, source_script FROM sf_messages WHERE status = @parked{categoryFilter} ORDER BY created_at ASC @@ -389,7 +435,8 @@ public class StoreAndForwardStorage await using var cmd = connection.CreateCommand(); cmd.CommandText = @" SELECT id, category, target, payload_json, retry_count, max_retries, - retry_interval_ms, created_at, last_attempt_at, status, last_error, origin_instance + retry_interval_ms, created_at, last_attempt_at, status, last_error, origin_instance, + execution_id, source_script FROM sf_messages WHERE id = @id"; cmd.Parameters.AddWithValue("@id", messageId); @@ -446,7 +493,12 @@ public class StoreAndForwardStorage LastAttemptAt = reader.IsDBNull(8) ? null : DateTimeOffset.Parse(reader.GetString(8)), Status = (StoreAndForwardMessageStatus)reader.GetInt32(9), LastError = reader.IsDBNull(10) ? null : reader.GetString(10), - OriginInstanceName = reader.IsDBNull(11) ? null : reader.GetString(11) + OriginInstanceName = reader.IsDBNull(11) ? null : reader.GetString(11), + // Audit Log #23 (ExecutionId Task 4): rows persisted before the + // additive migration have no execution_id / source_script value; + // IsDBNull guards keep those reading back as null (back-compat). + ExecutionId = reader.IsDBNull(12) ? null : Guid.Parse(reader.GetString(12)), + SourceScript = reader.IsDBNull(13) ? null : reader.GetString(13) }); } return results; diff --git a/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/CachedCallLifecycleBridgeTests.cs b/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/CachedCallLifecycleBridgeTests.cs index 97cbb84..1438cfe 100644 --- a/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/CachedCallLifecycleBridgeTests.cs +++ b/tests/ScadaLink.AuditLog.Tests/Site/Telemetry/CachedCallLifecycleBridgeTests.cs @@ -31,7 +31,9 @@ public class CachedCallLifecycleBridgeTests string channel = "ApiOutbound", int retryCount = 1, string? lastError = null, - int? httpStatus = null) => + int? httpStatus = null, + Guid? executionId = null, + string? sourceScript = null) => new( TrackedOperationId: _id, Channel: channel, @@ -44,7 +46,9 @@ public class CachedCallLifecycleBridgeTests CreatedAtUtc: new DateTime(2026, 5, 20, 9, 0, 0, DateTimeKind.Utc), OccurredAtUtc: new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc), DurationMs: 42, - SourceInstanceId: "Plant.Pump42"); + SourceInstanceId: "Plant.Pump42", + ExecutionId: executionId, + SourceScript: sourceScript); [Fact] public async Task TransientFailure_EmitsOneAttemptedRow_NoResolve() @@ -184,4 +188,75 @@ public class CachedCallLifecycleBridgeTests Assert.Equal(42, captured.Audit.DurationMs); Assert.Equal(_id.Value, captured.Audit.CorrelationId); } + + // ── Audit Log #23 (ExecutionId Task 4): ExecutionId / SourceScript ── + + [Fact] + public async Task RetryLoopAttemptedRow_CarriesExecutionIdAndSourceScript_FromContext() + { + // Task 4: the ExecutionId + SourceScript threaded through the S&F + // buffer arrive on the CachedCallAttemptContext; the bridge must stamp + // both onto the per-attempt ApiCallCached row (previously SourceScript + // was hard-coded null with a "not threaded through S&F" comment). + var executionId = Guid.NewGuid(); + var captured = new List(); + _forwarder.ForwardAsync(Arg.Do(t => captured.Add(t)), Arg.Any()) + .Returns(Task.CompletedTask); + + var sut = CreateSut(); + await sut.OnAttemptCompletedAsync(Ctx( + CachedCallAttemptOutcome.TransientFailure, + executionId: executionId, + sourceScript: "Plant.Pump42/OnTick")); + + var packet = Assert.Single(captured); + Assert.Equal(AuditKind.ApiCallCached, packet.Audit.Kind); + Assert.Equal(executionId, packet.Audit.ExecutionId); + Assert.Equal("Plant.Pump42/OnTick", packet.Audit.SourceScript); + } + + [Fact] + public async Task RetryLoopCachedResolveRow_CarriesExecutionIdAndSourceScript_FromContext() + { + // The terminal CachedResolve row must also carry the threaded + // provenance so the whole retry-loop lifecycle is correlated. + var executionId = Guid.NewGuid(); + var captured = new List(); + _forwarder.ForwardAsync(Arg.Do(t => captured.Add(t)), Arg.Any()) + .Returns(Task.CompletedTask); + + var sut = CreateSut(); + await sut.OnAttemptCompletedAsync(Ctx( + CachedCallAttemptOutcome.Delivered, + channel: "DbOutbound", + executionId: executionId, + sourceScript: "Plant.Tank/OnAlarm")); + + Assert.Equal(2, captured.Count); + var resolve = Assert.Single(captured, p => p.Audit.Kind == AuditKind.CachedResolve); + Assert.Equal(executionId, resolve.Audit.ExecutionId); + Assert.Equal("Plant.Tank/OnAlarm", resolve.Audit.SourceScript); + + var attempted = Assert.Single(captured, p => p.Audit.Kind == AuditKind.DbWriteCached); + Assert.Equal(executionId, attempted.Audit.ExecutionId); + Assert.Equal("Plant.Tank/OnAlarm", attempted.Audit.SourceScript); + } + + [Fact] + public async Task RetryLoopRow_NullExecutionIdAndSourceScript_RemainNull() + { + // Back-compat: a pre-Task-4 buffered row has no ExecutionId / + // SourceScript; the bridge must leave the audit row's fields null + // rather than throwing. + CachedCallTelemetry? captured = null; + _forwarder.ForwardAsync(Arg.Do(t => captured = t), Arg.Any()) + .Returns(Task.CompletedTask); + + var sut = CreateSut(); + await sut.OnAttemptCompletedAsync(Ctx(CachedCallAttemptOutcome.TransientFailure)); + + Assert.NotNull(captured); + Assert.Null(captured!.Audit.ExecutionId); + Assert.Null(captured.Audit.SourceScript); + } } diff --git a/tests/ScadaLink.SiteRuntime.Tests/Scripts/DatabaseCachedWriteEmissionTests.cs b/tests/ScadaLink.SiteRuntime.Tests/Scripts/DatabaseCachedWriteEmissionTests.cs index 082ffcc..d4a9b2f 100644 --- a/tests/ScadaLink.SiteRuntime.Tests/Scripts/DatabaseCachedWriteEmissionTests.cs +++ b/tests/ScadaLink.SiteRuntime.Tests/Scripts/DatabaseCachedWriteEmissionTests.cs @@ -72,7 +72,8 @@ public class DatabaseCachedWriteEmissionTests It.IsAny?>(), InstanceName, It.IsAny(), - It.IsAny())) + It.IsAny(), + It.IsAny(), It.IsAny())) .Returns(Task.CompletedTask); var forwarder = new CapturingForwarder(); @@ -110,7 +111,8 @@ public class DatabaseCachedWriteEmissionTests It.IsAny?>(), It.IsAny(), It.IsAny(), - It.IsAny())) + It.IsAny(), + It.IsAny(), It.IsAny())) .Returns(Task.CompletedTask); var forwarder = new CapturingForwarder(); @@ -134,7 +136,8 @@ public class DatabaseCachedWriteEmissionTests It.IsAny?>(), It.IsAny(), It.IsAny(), - It.IsAny())) + It.IsAny(), + It.IsAny(), It.IsAny())) .Returns(Task.CompletedTask); var forwarder = new CapturingForwarder(); @@ -147,7 +150,8 @@ public class DatabaseCachedWriteEmissionTests It.IsAny?>(), InstanceName, It.IsAny(), - trackedId), + trackedId, + It.IsAny(), It.IsAny()), Times.Once); } @@ -161,7 +165,8 @@ public class DatabaseCachedWriteEmissionTests It.IsAny?>(), It.IsAny(), It.IsAny(), - It.IsAny())) + It.IsAny(), + It.IsAny(), It.IsAny())) .Returns(Task.CompletedTask); var forwarder = new CapturingForwarder { @@ -177,7 +182,8 @@ public class DatabaseCachedWriteEmissionTests It.IsAny?>(), InstanceName, It.IsAny(), - trackedId), + trackedId, + It.IsAny(), It.IsAny()), Times.Once); } } diff --git a/tests/ScadaLink.SiteRuntime.Tests/Scripts/ExternalSystemCachedCallEmissionTests.cs b/tests/ScadaLink.SiteRuntime.Tests/Scripts/ExternalSystemCachedCallEmissionTests.cs index c65d93b..8f7dde9 100644 --- a/tests/ScadaLink.SiteRuntime.Tests/Scripts/ExternalSystemCachedCallEmissionTests.cs +++ b/tests/ScadaLink.SiteRuntime.Tests/Scripts/ExternalSystemCachedCallEmissionTests.cs @@ -75,7 +75,8 @@ public class ExternalSystemCachedCallEmissionTests It.IsAny?>(), InstanceName, It.IsAny(), - It.IsAny())) + It.IsAny(), + It.IsAny(), It.IsAny())) .ReturnsAsync(new ExternalCallResult(true, null, null, WasBuffered: true)); var forwarder = new CapturingForwarder(); @@ -117,7 +118,8 @@ public class ExternalSystemCachedCallEmissionTests It.IsAny?>(), InstanceName, It.IsAny(), - It.IsAny())) + It.IsAny(), + It.IsAny(), It.IsAny())) .ReturnsAsync(new ExternalCallResult(true, "{\"ok\":true}", null, WasBuffered: false)); var forwarder = new CapturingForwarder(); @@ -153,7 +155,8 @@ public class ExternalSystemCachedCallEmissionTests It.IsAny?>(), It.IsAny(), It.IsAny(), - It.IsAny())) + It.IsAny(), + It.IsAny(), It.IsAny())) .ReturnsAsync(new ExternalCallResult(true, null, null, WasBuffered: true)); var forwarder = new CapturingForwarder(); @@ -172,14 +175,16 @@ public class ExternalSystemCachedCallEmissionTests It.IsAny?>(), InstanceName, It.IsAny(), - id1), + id1, + It.IsAny(), It.IsAny()), Times.Once); client.Verify(c => c.CachedCallAsync( "ERP", "GetOrder", It.IsAny?>(), InstanceName, It.IsAny(), - id2), + id2, + It.IsAny(), It.IsAny()), Times.Once); } @@ -193,7 +198,8 @@ public class ExternalSystemCachedCallEmissionTests It.IsAny?>(), It.IsAny(), It.IsAny(), - It.IsAny())) + It.IsAny(), + It.IsAny(), It.IsAny())) .ReturnsAsync(new ExternalCallResult(true, null, null, WasBuffered: true)); var forwarder = new CapturingForwarder { @@ -212,7 +218,8 @@ public class ExternalSystemCachedCallEmissionTests It.IsAny?>(), InstanceName, It.IsAny(), - trackedId), + trackedId, + It.IsAny(), It.IsAny()), Times.Once); } @@ -226,7 +233,8 @@ public class ExternalSystemCachedCallEmissionTests It.IsAny?>(), It.IsAny(), It.IsAny(), - It.IsAny())) + It.IsAny(), + It.IsAny(), It.IsAny())) .ReturnsAsync(new ExternalCallResult(true, null, null, WasBuffered: true)); var forwarder = new CapturingForwarder(); @@ -252,7 +260,8 @@ public class ExternalSystemCachedCallEmissionTests It.IsAny?>(), It.IsAny(), It.IsAny(), - It.IsAny())) + It.IsAny(), + It.IsAny(), It.IsAny())) .ReturnsAsync(new ExternalCallResult(true, null, null, WasBuffered: true)); var helper = CreateHelper(client.Object, forwarder: null); @@ -264,7 +273,8 @@ public class ExternalSystemCachedCallEmissionTests It.IsAny?>(), InstanceName, It.IsAny(), - trackedId), + trackedId, + It.IsAny(), It.IsAny()), Times.Once); } @@ -293,7 +303,8 @@ public class ExternalSystemCachedCallEmissionTests It.IsAny?>(), InstanceName, It.IsAny(), - It.IsAny())) + It.IsAny(), + It.IsAny(), It.IsAny())) // WasBuffered=false — the immediate HTTP attempt succeeded; S&F // is bypassed entirely. .ReturnsAsync(new ExternalCallResult(true, "{\"ok\":true}", null, WasBuffered: false)); @@ -354,7 +365,8 @@ public class ExternalSystemCachedCallEmissionTests It.IsAny?>(), InstanceName, It.IsAny(), - It.IsAny())) + It.IsAny(), + It.IsAny(), It.IsAny())) .ReturnsAsync(new ExternalCallResult( false, null, "Permanent error: HTTP 422 bad payload", WasBuffered: false)); var forwarder = new CapturingForwarder(); @@ -396,7 +408,8 @@ public class ExternalSystemCachedCallEmissionTests It.IsAny?>(), InstanceName, It.IsAny(), - It.IsAny())) + It.IsAny(), + It.IsAny(), It.IsAny())) // S&F took ownership — Attempted + Resolve come from the // CachedCallLifecycleBridge driven by the retry loop, not the helper. .ReturnsAsync(new ExternalCallResult(true, null, null, WasBuffered: true)); diff --git a/tests/ScadaLink.StoreAndForward.Tests/CachedCallAttemptEmissionTests.cs b/tests/ScadaLink.StoreAndForward.Tests/CachedCallAttemptEmissionTests.cs index 05a8233..501bc3b 100644 --- a/tests/ScadaLink.StoreAndForward.Tests/CachedCallAttemptEmissionTests.cs +++ b/tests/ScadaLink.StoreAndForward.Tests/CachedCallAttemptEmissionTests.cs @@ -277,6 +277,86 @@ public class CachedCallAttemptEmissionTests : IAsyncLifetime, IDisposable Assert.Equal(trackedId, _observer.Notifications[1].TrackedOperationId); } + // ── Audit Log #23 (ExecutionId Task 4): ExecutionId / SourceScript ── + + [Fact] + public async Task Attempt_CarriesExecutionIdAndSourceScript_FromBufferedMessage() + { + // A buffered cached call carries the originating script execution's + // ExecutionId + SourceScript. The retry sweep must surface both on the + // CachedCallAttemptContext handed to the observer so the audit bridge + // can stamp them on the retry-loop cached rows. + var executionId = Guid.NewGuid(); + _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, + _ => throw new HttpRequestException("HTTP 503")); + + var trackedId = TrackedOperationId.New(); + await _service.EnqueueAsync( + StoreAndForwardCategory.ExternalSystem, + "ERP", + """{"payload":"x"}""", + originInstanceName: "Plant.Pump42", + maxRetries: 5, + retryInterval: TimeSpan.Zero, + attemptImmediateDelivery: false, + messageId: trackedId.ToString(), + executionId: executionId, + sourceScript: "Plant.Pump42/OnTick"); + + await _service.RetryPendingMessagesAsync(); + + var notification = Assert.Single(_observer.Notifications); + Assert.Equal(executionId, notification.ExecutionId); + Assert.Equal("Plant.Pump42/OnTick", notification.SourceScript); + } + + [Fact] + public async Task Attempt_NullExecutionIdAndSourceScript_SurfaceAsNull() + { + // Back-compat: a row buffered without ExecutionId / SourceScript (legacy + // enqueue path) must surface them as null on the context, not throw. + _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, + _ => Task.FromResult(true)); + var trackedId = await EnqueueBufferedAsync( + StoreAndForwardCategory.ExternalSystem, "ERP"); + + await _service.RetryPendingMessagesAsync(); + + var notification = Assert.Single(_observer.Notifications); + Assert.Null(notification.ExecutionId); + Assert.Null(notification.SourceScript); + } + + [Fact] + public async Task TerminalResolve_CarriesExecutionIdAndSourceScript() + { + // The terminal Delivered notification must also carry the threaded + // provenance so the CachedResolve audit row is correlated. + var executionId = Guid.NewGuid(); + _service.RegisterDeliveryHandler(StoreAndForwardCategory.CachedDbWrite, + _ => Task.FromResult(true)); + + var trackedId = TrackedOperationId.New(); + await _service.EnqueueAsync( + StoreAndForwardCategory.CachedDbWrite, + "myDb", + """{"payload":"x"}""", + originInstanceName: "Plant.Tank", + maxRetries: 3, + retryInterval: TimeSpan.Zero, + attemptImmediateDelivery: false, + messageId: trackedId.ToString(), + executionId: executionId, + sourceScript: "Plant.Tank/OnAlarm"); + + await _service.RetryPendingMessagesAsync(); + + var notification = Assert.Single(_observer.Notifications); + Assert.Equal(CachedCallAttemptOutcome.Delivered, notification.Outcome); + Assert.Equal(executionId, notification.ExecutionId); + Assert.Equal("Plant.Tank/OnAlarm", notification.SourceScript); + } + // ── Best-effort contract: observer throws must NOT corrupt retry bookkeeping ── [Fact] diff --git a/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardStorageTests.cs b/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardStorageTests.cs index 13f103f..be5820e 100644 --- a/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardStorageTests.cs +++ b/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardStorageTests.cs @@ -293,6 +293,125 @@ public class StoreAndForwardStorageTests : IAsyncLifetime, IDisposable Assert.True(count >= 1); } + // ── Audit Log #23 (ExecutionId Task 4): execution_id / source_script ── + + [Fact] + public async Task EnqueueAsync_RoundTripsExecutionIdAndSourceScript() + { + // A cached call buffered on a transient failure carries the originating + // script execution's ExecutionId + SourceScript; both must survive a + // persist + read-back so the retry loop can stamp them on audit rows. + var executionId = Guid.NewGuid(); + var message = CreateMessage("exec1", StoreAndForwardCategory.ExternalSystem); + message.ExecutionId = executionId; + message.SourceScript = "Plant.Pump42/OnTick"; + + await _storage.EnqueueAsync(message); + + var retrieved = await _storage.GetMessageByIdAsync("exec1"); + Assert.NotNull(retrieved); + Assert.Equal(executionId, retrieved!.ExecutionId); + Assert.Equal("Plant.Pump42/OnTick", retrieved.SourceScript); + } + + [Fact] + public async Task EnqueueAsync_NullExecutionIdAndSourceScript_RoundTripAsNull() + { + // Non-cached-call enqueues (notifications) supply neither field — they + // must round-trip as null rather than throwing or coercing. + var message = CreateMessage("noexec1", StoreAndForwardCategory.Notification); + Assert.Null(message.ExecutionId); + Assert.Null(message.SourceScript); + + await _storage.EnqueueAsync(message); + + var retrieved = await _storage.GetMessageByIdAsync("noexec1"); + Assert.NotNull(retrieved); + Assert.Null(retrieved!.ExecutionId); + Assert.Null(retrieved.SourceScript); + } + + [Fact] + public async Task ExecutionIdAndSourceScript_SurviveRetrySweepRead() + { + // The retry sweep reads due rows via GetMessagesForRetryAsync; the new + // fields must be present on that read path too (it is the path that + // feeds the CachedCallAttemptContext). + var executionId = Guid.NewGuid(); + var message = CreateMessage("sweep1", StoreAndForwardCategory.CachedDbWrite); + message.ExecutionId = executionId; + message.SourceScript = "Plant.Tank/OnAlarm"; + message.LastAttemptAt = null; // due immediately + await _storage.EnqueueAsync(message); + + var due = await _storage.GetMessagesForRetryAsync(); + + var row = Assert.Single(due, m => m.Id == "sweep1"); + Assert.Equal(executionId, row.ExecutionId); + Assert.Equal("Plant.Tank/OnAlarm", row.SourceScript); + } + + [Fact] + public async Task LegacyRowWithoutNewColumns_ReadsBackAsNull() + { + // Back-compat: a row persisted by a build that pre-dates the + // execution_id / source_script columns must still deserialize, with + // ExecutionId / SourceScript reading back as null. Simulate the legacy + // schema by dropping the table and recreating it without the columns, + // inserting directly, then running InitializeAsync (which ALTER-adds + // the columns) and reading the row back. + await using (var setup = new SqliteConnection($"Data Source={_dbName};Mode=Memory;Cache=Shared")) + { + await setup.OpenAsync(); + await using var drop = setup.CreateCommand(); + drop.CommandText = @" + DROP TABLE IF EXISTS sf_messages; + CREATE TABLE sf_messages ( + id TEXT PRIMARY KEY, + category INTEGER NOT NULL, + target TEXT NOT NULL, + payload_json TEXT NOT NULL, + retry_count INTEGER NOT NULL DEFAULT 0, + max_retries INTEGER NOT NULL DEFAULT 50, + retry_interval_ms INTEGER NOT NULL DEFAULT 30000, + created_at TEXT NOT NULL, + last_attempt_at TEXT, + status INTEGER NOT NULL DEFAULT 0, + last_error TEXT, + origin_instance TEXT + ); + INSERT INTO sf_messages (id, category, target, payload_json, created_at, status) + VALUES ('legacy1', 0, 'ERP', '{}', '2026-01-01T00:00:00.0000000+00:00', 0);"; + await drop.ExecuteNonQueryAsync(); + } + + // InitializeAsync must additively ALTER-in the new columns without + // disturbing the pre-existing legacy row. + await _storage.InitializeAsync(); + + var retrieved = await _storage.GetMessageByIdAsync("legacy1"); + Assert.NotNull(retrieved); + Assert.Equal("legacy1", retrieved!.Id); + Assert.Null(retrieved.ExecutionId); + Assert.Null(retrieved.SourceScript); + } + + [Fact] + public async Task InitializeAsync_IsIdempotent_WhenColumnsAlreadyExist() + { + // The additive ALTER must not fail on a second InitializeAsync call + // (SQLite has no ADD COLUMN IF NOT EXISTS — the probe must skip it). + await _storage.InitializeAsync(); + await _storage.InitializeAsync(); + + var message = CreateMessage("idem1", StoreAndForwardCategory.ExternalSystem); + message.ExecutionId = Guid.NewGuid(); + await _storage.EnqueueAsync(message); + var retrieved = await _storage.GetMessageByIdAsync("idem1"); + Assert.NotNull(retrieved); + Assert.Equal(message.ExecutionId, retrieved!.ExecutionId); + } + private static StoreAndForwardMessage CreateMessage(string id, StoreAndForwardCategory category) { return new StoreAndForwardMessage