test(auditlog): assert ExecutionId threading hops; defensive Guid parse on S&F read
This commit is contained in:
@@ -497,10 +497,31 @@ public class StoreAndForwardStorage
|
|||||||
// Audit Log #23 (ExecutionId Task 4): rows persisted before the
|
// Audit Log #23 (ExecutionId Task 4): rows persisted before the
|
||||||
// additive migration have no execution_id / source_script value;
|
// additive migration have no execution_id / source_script value;
|
||||||
// IsDBNull guards keep those reading back as null (back-compat).
|
// IsDBNull guards keep those reading back as null (back-compat).
|
||||||
ExecutionId = reader.IsDBNull(12) ? null : Guid.Parse(reader.GetString(12)),
|
// Guid.TryParse (not Parse) guards the retry sweep: a corrupt
|
||||||
|
// non-null execution_id is treated as "no execution id" rather
|
||||||
|
// than throwing FormatException and aborting the whole sweep.
|
||||||
|
ExecutionId = ParseExecutionId(reader, 12),
|
||||||
SourceScript = reader.IsDBNull(13) ? null : reader.GetString(13)
|
SourceScript = reader.IsDBNull(13) ? null : reader.GetString(13)
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
return results;
|
return results;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Audit Log #23 (ExecutionId Task 4): defensively reads the
|
||||||
|
/// <c>execution_id</c> column. A <c>null</c> value (legacy pre-migration
|
||||||
|
/// rows) and a malformed non-null value both yield <c>null</c> — a corrupt
|
||||||
|
/// id must not throw and abort the retry sweep, which reads many rows.
|
||||||
|
/// </summary>
|
||||||
|
private static Guid? ParseExecutionId(System.Data.Common.DbDataReader reader, int ordinal)
|
||||||
|
{
|
||||||
|
if (reader.IsDBNull(ordinal))
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return Guid.TryParse(reader.GetString(ordinal), out var executionId)
|
||||||
|
? executionId
|
||||||
|
: null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -101,15 +101,27 @@ public class DatabaseGatewayTests
|
|||||||
|
|
||||||
var gateway = new DatabaseGateway(_repository, NullLogger<DatabaseGateway>.Instance, storeAndForward: sf);
|
var gateway = new DatabaseGateway(_repository, NullLogger<DatabaseGateway>.Instance, storeAndForward: sf);
|
||||||
|
|
||||||
|
// Audit Log #23 (ExecutionId Task 4): a known execution id / source
|
||||||
|
// script so the gateway -> EnqueueAsync hop can be asserted below.
|
||||||
|
var executionId = Guid.NewGuid();
|
||||||
|
const string sourceScript = "ScriptActor:WriteAudit";
|
||||||
|
|
||||||
await gateway.CachedWriteAsync("testDb", "INSERT INTO t VALUES (@v)",
|
await gateway.CachedWriteAsync("testDb", "INSERT INTO t VALUES (@v)",
|
||||||
new Dictionary<string, object?> { ["v"] = 1 });
|
new Dictionary<string, object?> { ["v"] = 1 },
|
||||||
|
executionId: executionId, sourceScript: sourceScript);
|
||||||
|
|
||||||
var depth = await storage.GetBufferDepthByCategoryAsync();
|
var depth = await storage.GetBufferDepthByCategoryAsync();
|
||||||
Assert.Equal(1, depth[ScadaLink.Commons.Types.Enums.StoreAndForwardCategory.CachedDbWrite]);
|
Assert.Equal(1, depth[ScadaLink.Commons.Types.Enums.StoreAndForwardCategory.CachedDbWrite]);
|
||||||
|
|
||||||
var (maxRetries, retryIntervalMs) = ReadBufferedRetrySettings(connStr);
|
var buffered = ReadBufferedRetrySettings(connStr);
|
||||||
Assert.Equal(5, maxRetries);
|
Assert.Equal(5, buffered.MaxRetries);
|
||||||
Assert.Equal((long)TimeSpan.FromSeconds(12).TotalMilliseconds, retryIntervalMs);
|
Assert.Equal((long)TimeSpan.FromSeconds(12).TotalMilliseconds, buffered.RetryIntervalMs);
|
||||||
|
|
||||||
|
// ExecutionId Task 4: the gateway must forward executionId / sourceScript
|
||||||
|
// into EnqueueAsync, and the S&F layer must persist them on the
|
||||||
|
// sf_messages row so the retry loop can stamp the right provenance.
|
||||||
|
Assert.Equal(executionId, buffered.ExecutionId);
|
||||||
|
Assert.Equal(sourceScript, buffered.SourceScript);
|
||||||
}
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
@@ -148,21 +160,27 @@ public class DatabaseGatewayTests
|
|||||||
|
|
||||||
await gateway.CachedWriteAsync("testDb", "INSERT INTO t VALUES (1)");
|
await gateway.CachedWriteAsync("testDb", "INSERT INTO t VALUES (1)");
|
||||||
|
|
||||||
var (maxRetries, _) = ReadBufferedRetrySettings(connStr);
|
var (maxRetries, _, _, _) = ReadBufferedRetrySettings(connStr);
|
||||||
// Must be the bounded S&F default, never 0 — a stored 0 would mean retry-forever.
|
// Must be the bounded S&F default, never 0 — a stored 0 would mean retry-forever.
|
||||||
Assert.Equal(99, maxRetries);
|
Assert.Equal(99, maxRetries);
|
||||||
Assert.NotEqual(0, maxRetries);
|
Assert.NotEqual(0, maxRetries);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static (int MaxRetries, long RetryIntervalMs) ReadBufferedRetrySettings(string connStr)
|
private static (int MaxRetries, long RetryIntervalMs, Guid? ExecutionId, string? SourceScript)
|
||||||
|
ReadBufferedRetrySettings(string connStr)
|
||||||
{
|
{
|
||||||
using var conn = new Microsoft.Data.Sqlite.SqliteConnection(connStr);
|
using var conn = new Microsoft.Data.Sqlite.SqliteConnection(connStr);
|
||||||
conn.Open();
|
conn.Open();
|
||||||
using var cmd = conn.CreateCommand();
|
using var cmd = conn.CreateCommand();
|
||||||
cmd.CommandText = "SELECT max_retries, retry_interval_ms FROM sf_messages";
|
cmd.CommandText =
|
||||||
|
"SELECT max_retries, retry_interval_ms, execution_id, source_script FROM sf_messages";
|
||||||
using var reader = cmd.ExecuteReader();
|
using var reader = cmd.ExecuteReader();
|
||||||
Assert.True(reader.Read(), "expected exactly one buffered message");
|
Assert.True(reader.Read(), "expected exactly one buffered message");
|
||||||
var result = (reader.GetInt32(0), reader.GetInt64(1));
|
var result = (
|
||||||
|
reader.GetInt32(0),
|
||||||
|
reader.GetInt64(1),
|
||||||
|
reader.IsDBNull(2) ? (Guid?)null : Guid.Parse(reader.GetString(2)),
|
||||||
|
reader.IsDBNull(3) ? null : reader.GetString(3));
|
||||||
Assert.False(reader.Read(), "expected exactly one buffered message");
|
Assert.False(reader.Read(), "expected exactly one buffered message");
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -371,26 +371,45 @@ public class ExternalSystemClientTests
|
|||||||
_httpClientFactory, _repository, NullLogger<ExternalSystemClient>.Instance,
|
_httpClientFactory, _repository, NullLogger<ExternalSystemClient>.Instance,
|
||||||
storeAndForward: sf);
|
storeAndForward: sf);
|
||||||
|
|
||||||
var result = await client.CachedCallAsync("TestAPI", "postData");
|
// Audit Log #23 (ExecutionId Task 4): a known execution id / source
|
||||||
|
// script so the gateway -> EnqueueAsync hop can be asserted below.
|
||||||
|
var executionId = Guid.NewGuid();
|
||||||
|
const string sourceScript = "ScriptActor:CheckPressure";
|
||||||
|
|
||||||
|
var result = await client.CachedCallAsync(
|
||||||
|
"TestAPI", "postData",
|
||||||
|
executionId: executionId, sourceScript: sourceScript);
|
||||||
Assert.True(result.WasBuffered);
|
Assert.True(result.WasBuffered);
|
||||||
|
|
||||||
var depth = await storage.GetBufferDepthByCategoryAsync();
|
var depth = await storage.GetBufferDepthByCategoryAsync();
|
||||||
Assert.Equal(1, depth[ScadaLink.Commons.Types.Enums.StoreAndForwardCategory.ExternalSystem]);
|
Assert.Equal(1, depth[ScadaLink.Commons.Types.Enums.StoreAndForwardCategory.ExternalSystem]);
|
||||||
|
|
||||||
var (maxRetries, retryIntervalMs) = ReadBufferedRetrySettings(connStr);
|
var buffered = ReadBufferedRetrySettings(connStr);
|
||||||
Assert.Equal(7, maxRetries);
|
Assert.Equal(7, buffered.MaxRetries);
|
||||||
Assert.Equal((long)TimeSpan.FromSeconds(42).TotalMilliseconds, retryIntervalMs);
|
Assert.Equal((long)TimeSpan.FromSeconds(42).TotalMilliseconds, buffered.RetryIntervalMs);
|
||||||
|
|
||||||
|
// ExecutionId Task 4: the gateway must forward executionId / sourceScript
|
||||||
|
// into EnqueueAsync, and the S&F layer must persist them on the
|
||||||
|
// sf_messages row so the retry loop can stamp the right provenance.
|
||||||
|
Assert.Equal(executionId, buffered.ExecutionId);
|
||||||
|
Assert.Equal(sourceScript, buffered.SourceScript);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static (int MaxRetries, long RetryIntervalMs) ReadBufferedRetrySettings(string connStr)
|
private static (int MaxRetries, long RetryIntervalMs, Guid? ExecutionId, string? SourceScript)
|
||||||
|
ReadBufferedRetrySettings(string connStr)
|
||||||
{
|
{
|
||||||
using var conn = new SqliteConnection(connStr);
|
using var conn = new SqliteConnection(connStr);
|
||||||
conn.Open();
|
conn.Open();
|
||||||
using var cmd = conn.CreateCommand();
|
using var cmd = conn.CreateCommand();
|
||||||
cmd.CommandText = "SELECT max_retries, retry_interval_ms FROM sf_messages";
|
cmd.CommandText =
|
||||||
|
"SELECT max_retries, retry_interval_ms, execution_id, source_script FROM sf_messages";
|
||||||
using var reader = cmd.ExecuteReader();
|
using var reader = cmd.ExecuteReader();
|
||||||
Assert.True(reader.Read(), "expected exactly one buffered message");
|
Assert.True(reader.Read(), "expected exactly one buffered message");
|
||||||
var result = (reader.GetInt32(0), reader.GetInt64(1));
|
var result = (
|
||||||
|
reader.GetInt32(0),
|
||||||
|
reader.GetInt64(1),
|
||||||
|
reader.IsDBNull(2) ? (Guid?)null : Guid.Parse(reader.GetString(2)),
|
||||||
|
reader.IsDBNull(3) ? null : reader.GetString(3));
|
||||||
Assert.False(reader.Read(), "expected exactly one buffered message");
|
Assert.False(reader.Read(), "expected exactly one buffered message");
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
@@ -436,7 +455,7 @@ public class ExternalSystemClientTests
|
|||||||
|
|
||||||
await client.CachedCallAsync("TestAPI", "postData");
|
await client.CachedCallAsync("TestAPI", "postData");
|
||||||
|
|
||||||
var (maxRetries, _) = ReadBufferedRetrySettings(connStr);
|
var (maxRetries, _, _, _) = ReadBufferedRetrySettings(connStr);
|
||||||
// Must be the bounded S&F default, never 0 — a stored 0 would mean retry-forever.
|
// Must be the bounded S&F default, never 0 — a stored 0 would mean retry-forever.
|
||||||
Assert.Equal(99, maxRetries);
|
Assert.Equal(99, maxRetries);
|
||||||
Assert.NotEqual(0, maxRetries);
|
Assert.NotEqual(0, maxRetries);
|
||||||
|
|||||||
@@ -155,6 +155,46 @@ public class DatabaseCachedWriteEmissionTests
|
|||||||
Times.Once);
|
Times.Once);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Audit Log #23 (ExecutionId Task 4): the helper → gateway hop of the
|
||||||
|
/// threading chain. The cached-write helper must forward the runtime
|
||||||
|
/// context's <c>ExecutionId</c> and <c>SourceScript</c> verbatim into
|
||||||
|
/// <see cref="IDatabaseGateway.CachedWriteAsync"/> — so the buffered retry
|
||||||
|
/// loop later stamps the right provenance onto its audit rows. This
|
||||||
|
/// asserts the exact id/script (not <c>It.IsAny</c>), so a regression that
|
||||||
|
/// dropped the threading would fail here.
|
||||||
|
/// </summary>
|
||||||
|
[Fact]
|
||||||
|
public async Task CachedWrite_ThreadsExecutionIdAndSourceScript_IntoGateway()
|
||||||
|
{
|
||||||
|
var gateway = new Mock<IDatabaseGateway>();
|
||||||
|
gateway
|
||||||
|
.Setup(g => g.CachedWriteAsync(
|
||||||
|
"myDb", "INSERT INTO t VALUES (1)",
|
||||||
|
It.IsAny<IReadOnlyDictionary<string, object?>?>(),
|
||||||
|
InstanceName,
|
||||||
|
It.IsAny<CancellationToken>(),
|
||||||
|
It.IsAny<TrackedOperationId?>(),
|
||||||
|
It.IsAny<Guid?>(), It.IsAny<string?>()))
|
||||||
|
.Returns(Task.CompletedTask);
|
||||||
|
var forwarder = new CapturingForwarder();
|
||||||
|
|
||||||
|
var helper = CreateHelper(gateway.Object, forwarder);
|
||||||
|
await helper.CachedWrite("myDb", "INSERT INTO t VALUES (1)");
|
||||||
|
|
||||||
|
// The known TestExecutionId and SourceScript must reach the gateway
|
||||||
|
// unchanged — these are what the S&F retry loop persists and replays.
|
||||||
|
gateway.Verify(g => g.CachedWriteAsync(
|
||||||
|
"myDb", "INSERT INTO t VALUES (1)",
|
||||||
|
It.IsAny<IReadOnlyDictionary<string, object?>?>(),
|
||||||
|
InstanceName,
|
||||||
|
It.IsAny<CancellationToken>(),
|
||||||
|
It.IsAny<TrackedOperationId?>(),
|
||||||
|
It.Is<Guid?>(id => id == TestExecutionId),
|
||||||
|
It.Is<string?>(s => s == SourceScript)),
|
||||||
|
Times.Once);
|
||||||
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
public async Task CachedWrite_ForwarderThrows_StillReturnsTrackedOperationId()
|
public async Task CachedWrite_ForwarderThrows_StillReturnsTrackedOperationId()
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -188,6 +188,46 @@ public class ExternalSystemCachedCallEmissionTests
|
|||||||
Times.Once);
|
Times.Once);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Audit Log #23 (ExecutionId Task 4): the helper → gateway hop of the
|
||||||
|
/// threading chain. The cached-call helper must forward the runtime
|
||||||
|
/// context's <c>ExecutionId</c> and <c>SourceScript</c> verbatim into
|
||||||
|
/// <see cref="IExternalSystemClient.CachedCallAsync"/> — so the buffered
|
||||||
|
/// retry loop later stamps the right provenance onto its audit rows.
|
||||||
|
/// This asserts the exact id/script (not <c>It.IsAny</c>), so a regression
|
||||||
|
/// that dropped the threading would fail here.
|
||||||
|
/// </summary>
|
||||||
|
[Fact]
|
||||||
|
public async Task CachedCall_ThreadsExecutionIdAndSourceScript_IntoClient()
|
||||||
|
{
|
||||||
|
var client = new Mock<IExternalSystemClient>();
|
||||||
|
client
|
||||||
|
.Setup(c => c.CachedCallAsync(
|
||||||
|
"ERP", "GetOrder",
|
||||||
|
It.IsAny<IReadOnlyDictionary<string, object?>?>(),
|
||||||
|
InstanceName,
|
||||||
|
It.IsAny<CancellationToken>(),
|
||||||
|
It.IsAny<TrackedOperationId?>(),
|
||||||
|
It.IsAny<Guid?>(), It.IsAny<string?>()))
|
||||||
|
.ReturnsAsync(new ExternalCallResult(true, null, null, WasBuffered: true));
|
||||||
|
var forwarder = new CapturingForwarder();
|
||||||
|
|
||||||
|
var helper = CreateHelper(client.Object, forwarder);
|
||||||
|
await helper.CachedCall("ERP", "GetOrder");
|
||||||
|
|
||||||
|
// The known TestExecutionId and SourceScript must reach the client
|
||||||
|
// unchanged — these are what the S&F retry loop persists and replays.
|
||||||
|
client.Verify(c => c.CachedCallAsync(
|
||||||
|
"ERP", "GetOrder",
|
||||||
|
It.IsAny<IReadOnlyDictionary<string, object?>?>(),
|
||||||
|
InstanceName,
|
||||||
|
It.IsAny<CancellationToken>(),
|
||||||
|
It.IsAny<TrackedOperationId?>(),
|
||||||
|
It.Is<Guid?>(id => id == TestExecutionId),
|
||||||
|
It.Is<string?>(s => s == SourceScript)),
|
||||||
|
Times.Once);
|
||||||
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
public async Task CachedCall_ForwarderThrows_StillReturnsTrackedOperationId_OriginalCallProceeds()
|
public async Task CachedCall_ForwarderThrows_StillReturnsTrackedOperationId_OriginalCallProceeds()
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -396,6 +396,46 @@ public class StoreAndForwardStorageTests : IAsyncLifetime, IDisposable
|
|||||||
Assert.Null(retrieved.SourceScript);
|
Assert.Null(retrieved.SourceScript);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task MalformedExecutionId_ReadsBackAsNull_DoesNotAbortRetrySweep()
|
||||||
|
{
|
||||||
|
// Defensive read path: a corrupt (non-null, non-GUID) execution_id must
|
||||||
|
// be treated as "no execution id" rather than throwing FormatException
|
||||||
|
// — a single bad row must not abort the whole GetMessagesForRetryAsync
|
||||||
|
// sweep, which reads many rows. Persist two due rows, then corrupt the
|
||||||
|
// execution_id of one directly in the DB.
|
||||||
|
var goodId = Guid.NewGuid();
|
||||||
|
var good = CreateMessage("good1", StoreAndForwardCategory.ExternalSystem);
|
||||||
|
good.ExecutionId = goodId;
|
||||||
|
good.LastAttemptAt = null; // due immediately
|
||||||
|
await _storage.EnqueueAsync(good);
|
||||||
|
|
||||||
|
var bad = CreateMessage("bad1", StoreAndForwardCategory.ExternalSystem);
|
||||||
|
bad.ExecutionId = Guid.NewGuid();
|
||||||
|
bad.LastAttemptAt = null; // due immediately
|
||||||
|
await _storage.EnqueueAsync(bad);
|
||||||
|
|
||||||
|
await using (var conn = new SqliteConnection($"Data Source={_dbName};Mode=Memory;Cache=Shared"))
|
||||||
|
{
|
||||||
|
await conn.OpenAsync();
|
||||||
|
await using var corrupt = conn.CreateCommand();
|
||||||
|
corrupt.CommandText =
|
||||||
|
"UPDATE sf_messages SET execution_id = 'not-a-guid' WHERE id = 'bad1';";
|
||||||
|
await corrupt.ExecuteNonQueryAsync();
|
||||||
|
}
|
||||||
|
|
||||||
|
// The sweep must not throw; the corrupt row reads back with a null
|
||||||
|
// ExecutionId, the well-formed row keeps its value.
|
||||||
|
var due = await _storage.GetMessagesForRetryAsync();
|
||||||
|
Assert.Null(Assert.Single(due, m => m.Id == "bad1").ExecutionId);
|
||||||
|
Assert.Equal(goodId, Assert.Single(due, m => m.Id == "good1").ExecutionId);
|
||||||
|
|
||||||
|
// The single-row read path is equally defensive.
|
||||||
|
var retrieved = await _storage.GetMessageByIdAsync("bad1");
|
||||||
|
Assert.NotNull(retrieved);
|
||||||
|
Assert.Null(retrieved!.ExecutionId);
|
||||||
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
public async Task InitializeAsync_IsIdempotent_WhenColumnsAlreadyExist()
|
public async Task InitializeAsync_IsIdempotent_WhenColumnsAlreadyExist()
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user