diff --git a/src/ScadaLink.StoreAndForward/StoreAndForwardStorage.cs b/src/ScadaLink.StoreAndForward/StoreAndForwardStorage.cs index 7b92655..f7564fa 100644 --- a/src/ScadaLink.StoreAndForward/StoreAndForwardStorage.cs +++ b/src/ScadaLink.StoreAndForward/StoreAndForwardStorage.cs @@ -497,10 +497,31 @@ public class StoreAndForwardStorage // Audit Log #23 (ExecutionId Task 4): rows persisted before the // additive migration have no execution_id / source_script value; // IsDBNull guards keep those reading back as null (back-compat). - ExecutionId = reader.IsDBNull(12) ? null : Guid.Parse(reader.GetString(12)), + // Guid.TryParse (not Parse) guards the retry sweep: a corrupt + // non-null execution_id is treated as "no execution id" rather + // than throwing FormatException and aborting the whole sweep. + ExecutionId = ParseExecutionId(reader, 12), SourceScript = reader.IsDBNull(13) ? null : reader.GetString(13) }); } return results; } + + /// + /// Audit Log #23 (ExecutionId Task 4): defensively reads the + /// execution_id column. A null value (legacy pre-migration + /// rows) and a malformed non-null value both yield null — a corrupt + /// id must not throw and abort the retry sweep, which reads many rows. + /// + private static Guid? ParseExecutionId(System.Data.Common.DbDataReader reader, int ordinal) + { + if (reader.IsDBNull(ordinal)) + { + return null; + } + + return Guid.TryParse(reader.GetString(ordinal), out var executionId) + ? executionId + : null; + } } diff --git a/tests/ScadaLink.ExternalSystemGateway.Tests/DatabaseGatewayTests.cs b/tests/ScadaLink.ExternalSystemGateway.Tests/DatabaseGatewayTests.cs index 3e8e796..36fbd9c 100644 --- a/tests/ScadaLink.ExternalSystemGateway.Tests/DatabaseGatewayTests.cs +++ b/tests/ScadaLink.ExternalSystemGateway.Tests/DatabaseGatewayTests.cs @@ -101,15 +101,27 @@ public class DatabaseGatewayTests var gateway = new DatabaseGateway(_repository, NullLogger.Instance, storeAndForward: sf); + // Audit Log #23 (ExecutionId Task 4): a known execution id / source + // script so the gateway -> EnqueueAsync hop can be asserted below. + var executionId = Guid.NewGuid(); + const string sourceScript = "ScriptActor:WriteAudit"; + await gateway.CachedWriteAsync("testDb", "INSERT INTO t VALUES (@v)", - new Dictionary { ["v"] = 1 }); + new Dictionary { ["v"] = 1 }, + executionId: executionId, sourceScript: sourceScript); var depth = await storage.GetBufferDepthByCategoryAsync(); Assert.Equal(1, depth[ScadaLink.Commons.Types.Enums.StoreAndForwardCategory.CachedDbWrite]); - var (maxRetries, retryIntervalMs) = ReadBufferedRetrySettings(connStr); - Assert.Equal(5, maxRetries); - Assert.Equal((long)TimeSpan.FromSeconds(12).TotalMilliseconds, retryIntervalMs); + var buffered = ReadBufferedRetrySettings(connStr); + Assert.Equal(5, buffered.MaxRetries); + Assert.Equal((long)TimeSpan.FromSeconds(12).TotalMilliseconds, buffered.RetryIntervalMs); + + // ExecutionId Task 4: the gateway must forward executionId / sourceScript + // into EnqueueAsync, and the S&F layer must persist them on the + // sf_messages row so the retry loop can stamp the right provenance. + Assert.Equal(executionId, buffered.ExecutionId); + Assert.Equal(sourceScript, buffered.SourceScript); } [Fact] @@ -148,21 +160,27 @@ public class DatabaseGatewayTests await gateway.CachedWriteAsync("testDb", "INSERT INTO t VALUES (1)"); - var (maxRetries, _) = ReadBufferedRetrySettings(connStr); + var (maxRetries, _, _, _) = ReadBufferedRetrySettings(connStr); // Must be the bounded S&F default, never 0 — a stored 0 would mean retry-forever. Assert.Equal(99, maxRetries); Assert.NotEqual(0, maxRetries); } - private static (int MaxRetries, long RetryIntervalMs) ReadBufferedRetrySettings(string connStr) + private static (int MaxRetries, long RetryIntervalMs, Guid? ExecutionId, string? SourceScript) + ReadBufferedRetrySettings(string connStr) { using var conn = new Microsoft.Data.Sqlite.SqliteConnection(connStr); conn.Open(); using var cmd = conn.CreateCommand(); - cmd.CommandText = "SELECT max_retries, retry_interval_ms FROM sf_messages"; + cmd.CommandText = + "SELECT max_retries, retry_interval_ms, execution_id, source_script FROM sf_messages"; using var reader = cmd.ExecuteReader(); Assert.True(reader.Read(), "expected exactly one buffered message"); - var result = (reader.GetInt32(0), reader.GetInt64(1)); + var result = ( + reader.GetInt32(0), + reader.GetInt64(1), + reader.IsDBNull(2) ? (Guid?)null : Guid.Parse(reader.GetString(2)), + reader.IsDBNull(3) ? null : reader.GetString(3)); Assert.False(reader.Read(), "expected exactly one buffered message"); return result; } diff --git a/tests/ScadaLink.ExternalSystemGateway.Tests/ExternalSystemClientTests.cs b/tests/ScadaLink.ExternalSystemGateway.Tests/ExternalSystemClientTests.cs index 43def9b..3898456 100644 --- a/tests/ScadaLink.ExternalSystemGateway.Tests/ExternalSystemClientTests.cs +++ b/tests/ScadaLink.ExternalSystemGateway.Tests/ExternalSystemClientTests.cs @@ -371,26 +371,45 @@ public class ExternalSystemClientTests _httpClientFactory, _repository, NullLogger.Instance, storeAndForward: sf); - var result = await client.CachedCallAsync("TestAPI", "postData"); + // Audit Log #23 (ExecutionId Task 4): a known execution id / source + // script so the gateway -> EnqueueAsync hop can be asserted below. + var executionId = Guid.NewGuid(); + const string sourceScript = "ScriptActor:CheckPressure"; + + var result = await client.CachedCallAsync( + "TestAPI", "postData", + executionId: executionId, sourceScript: sourceScript); Assert.True(result.WasBuffered); var depth = await storage.GetBufferDepthByCategoryAsync(); Assert.Equal(1, depth[ScadaLink.Commons.Types.Enums.StoreAndForwardCategory.ExternalSystem]); - var (maxRetries, retryIntervalMs) = ReadBufferedRetrySettings(connStr); - Assert.Equal(7, maxRetries); - Assert.Equal((long)TimeSpan.FromSeconds(42).TotalMilliseconds, retryIntervalMs); + var buffered = ReadBufferedRetrySettings(connStr); + Assert.Equal(7, buffered.MaxRetries); + Assert.Equal((long)TimeSpan.FromSeconds(42).TotalMilliseconds, buffered.RetryIntervalMs); + + // ExecutionId Task 4: the gateway must forward executionId / sourceScript + // into EnqueueAsync, and the S&F layer must persist them on the + // sf_messages row so the retry loop can stamp the right provenance. + Assert.Equal(executionId, buffered.ExecutionId); + Assert.Equal(sourceScript, buffered.SourceScript); } - private static (int MaxRetries, long RetryIntervalMs) ReadBufferedRetrySettings(string connStr) + private static (int MaxRetries, long RetryIntervalMs, Guid? ExecutionId, string? SourceScript) + ReadBufferedRetrySettings(string connStr) { using var conn = new SqliteConnection(connStr); conn.Open(); using var cmd = conn.CreateCommand(); - cmd.CommandText = "SELECT max_retries, retry_interval_ms FROM sf_messages"; + cmd.CommandText = + "SELECT max_retries, retry_interval_ms, execution_id, source_script FROM sf_messages"; using var reader = cmd.ExecuteReader(); Assert.True(reader.Read(), "expected exactly one buffered message"); - var result = (reader.GetInt32(0), reader.GetInt64(1)); + var result = ( + reader.GetInt32(0), + reader.GetInt64(1), + reader.IsDBNull(2) ? (Guid?)null : Guid.Parse(reader.GetString(2)), + reader.IsDBNull(3) ? null : reader.GetString(3)); Assert.False(reader.Read(), "expected exactly one buffered message"); return result; } @@ -436,7 +455,7 @@ public class ExternalSystemClientTests await client.CachedCallAsync("TestAPI", "postData"); - var (maxRetries, _) = ReadBufferedRetrySettings(connStr); + var (maxRetries, _, _, _) = ReadBufferedRetrySettings(connStr); // Must be the bounded S&F default, never 0 — a stored 0 would mean retry-forever. Assert.Equal(99, maxRetries); Assert.NotEqual(0, maxRetries); diff --git a/tests/ScadaLink.SiteRuntime.Tests/Scripts/DatabaseCachedWriteEmissionTests.cs b/tests/ScadaLink.SiteRuntime.Tests/Scripts/DatabaseCachedWriteEmissionTests.cs index d4a9b2f..405b38e 100644 --- a/tests/ScadaLink.SiteRuntime.Tests/Scripts/DatabaseCachedWriteEmissionTests.cs +++ b/tests/ScadaLink.SiteRuntime.Tests/Scripts/DatabaseCachedWriteEmissionTests.cs @@ -155,6 +155,46 @@ public class DatabaseCachedWriteEmissionTests Times.Once); } + /// + /// Audit Log #23 (ExecutionId Task 4): the helper → gateway hop of the + /// threading chain. The cached-write helper must forward the runtime + /// context's ExecutionId and SourceScript verbatim into + /// — so the buffered retry + /// loop later stamps the right provenance onto its audit rows. This + /// asserts the exact id/script (not It.IsAny), so a regression that + /// dropped the threading would fail here. + /// + [Fact] + public async Task CachedWrite_ThreadsExecutionIdAndSourceScript_IntoGateway() + { + var gateway = new Mock(); + gateway + .Setup(g => g.CachedWriteAsync( + "myDb", "INSERT INTO t VALUES (1)", + It.IsAny?>(), + InstanceName, + It.IsAny(), + It.IsAny(), + It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + var forwarder = new CapturingForwarder(); + + var helper = CreateHelper(gateway.Object, forwarder); + await helper.CachedWrite("myDb", "INSERT INTO t VALUES (1)"); + + // The known TestExecutionId and SourceScript must reach the gateway + // unchanged — these are what the S&F retry loop persists and replays. + gateway.Verify(g => g.CachedWriteAsync( + "myDb", "INSERT INTO t VALUES (1)", + It.IsAny?>(), + InstanceName, + It.IsAny(), + It.IsAny(), + It.Is(id => id == TestExecutionId), + It.Is(s => s == SourceScript)), + Times.Once); + } + [Fact] public async Task CachedWrite_ForwarderThrows_StillReturnsTrackedOperationId() { diff --git a/tests/ScadaLink.SiteRuntime.Tests/Scripts/ExternalSystemCachedCallEmissionTests.cs b/tests/ScadaLink.SiteRuntime.Tests/Scripts/ExternalSystemCachedCallEmissionTests.cs index 8f7dde9..ce392cb 100644 --- a/tests/ScadaLink.SiteRuntime.Tests/Scripts/ExternalSystemCachedCallEmissionTests.cs +++ b/tests/ScadaLink.SiteRuntime.Tests/Scripts/ExternalSystemCachedCallEmissionTests.cs @@ -188,6 +188,46 @@ public class ExternalSystemCachedCallEmissionTests Times.Once); } + /// + /// Audit Log #23 (ExecutionId Task 4): the helper → gateway hop of the + /// threading chain. The cached-call helper must forward the runtime + /// context's ExecutionId and SourceScript verbatim into + /// — so the buffered + /// retry loop later stamps the right provenance onto its audit rows. + /// This asserts the exact id/script (not It.IsAny), so a regression + /// that dropped the threading would fail here. + /// + [Fact] + public async Task CachedCall_ThreadsExecutionIdAndSourceScript_IntoClient() + { + var client = new Mock(); + client + .Setup(c => c.CachedCallAsync( + "ERP", "GetOrder", + It.IsAny?>(), + InstanceName, + It.IsAny(), + It.IsAny(), + It.IsAny(), It.IsAny())) + .ReturnsAsync(new ExternalCallResult(true, null, null, WasBuffered: true)); + var forwarder = new CapturingForwarder(); + + var helper = CreateHelper(client.Object, forwarder); + await helper.CachedCall("ERP", "GetOrder"); + + // The known TestExecutionId and SourceScript must reach the client + // unchanged — these are what the S&F retry loop persists and replays. + client.Verify(c => c.CachedCallAsync( + "ERP", "GetOrder", + It.IsAny?>(), + InstanceName, + It.IsAny(), + It.IsAny(), + It.Is(id => id == TestExecutionId), + It.Is(s => s == SourceScript)), + Times.Once); + } + [Fact] public async Task CachedCall_ForwarderThrows_StillReturnsTrackedOperationId_OriginalCallProceeds() { diff --git a/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardStorageTests.cs b/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardStorageTests.cs index be5820e..6fed58e 100644 --- a/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardStorageTests.cs +++ b/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardStorageTests.cs @@ -396,6 +396,46 @@ public class StoreAndForwardStorageTests : IAsyncLifetime, IDisposable Assert.Null(retrieved.SourceScript); } + [Fact] + public async Task MalformedExecutionId_ReadsBackAsNull_DoesNotAbortRetrySweep() + { + // Defensive read path: a corrupt (non-null, non-GUID) execution_id must + // be treated as "no execution id" rather than throwing FormatException + // — a single bad row must not abort the whole GetMessagesForRetryAsync + // sweep, which reads many rows. Persist two due rows, then corrupt the + // execution_id of one directly in the DB. + var goodId = Guid.NewGuid(); + var good = CreateMessage("good1", StoreAndForwardCategory.ExternalSystem); + good.ExecutionId = goodId; + good.LastAttemptAt = null; // due immediately + await _storage.EnqueueAsync(good); + + var bad = CreateMessage("bad1", StoreAndForwardCategory.ExternalSystem); + bad.ExecutionId = Guid.NewGuid(); + bad.LastAttemptAt = null; // due immediately + await _storage.EnqueueAsync(bad); + + await using (var conn = new SqliteConnection($"Data Source={_dbName};Mode=Memory;Cache=Shared")) + { + await conn.OpenAsync(); + await using var corrupt = conn.CreateCommand(); + corrupt.CommandText = + "UPDATE sf_messages SET execution_id = 'not-a-guid' WHERE id = 'bad1';"; + await corrupt.ExecuteNonQueryAsync(); + } + + // The sweep must not throw; the corrupt row reads back with a null + // ExecutionId, the well-formed row keeps its value. + var due = await _storage.GetMessagesForRetryAsync(); + Assert.Null(Assert.Single(due, m => m.Id == "bad1").ExecutionId); + Assert.Equal(goodId, Assert.Single(due, m => m.Id == "good1").ExecutionId); + + // The single-row read path is equally defensive. + var retrieved = await _storage.GetMessageByIdAsync("bad1"); + Assert.NotNull(retrieved); + Assert.Null(retrieved!.ExecutionId); + } + [Fact] public async Task InitializeAsync_IsIdempotent_WhenColumnsAlreadyExist() {