Files
scadalink-design/tests/ScadaLink.SiteRuntime.Tests/Tracking/OperationTrackingStoreTests.cs

442 lines
18 KiB
C#

using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using ScadaLink.Commons.Types;
using ScadaLink.Commons.Types.Enums;
using ScadaLink.SiteRuntime.Tracking;
namespace ScadaLink.SiteRuntime.Tests.Tracking;
/// <summary>
/// Audit Log #23 (M3 Bundle A — Task A2) — schema + behaviour tests for the
/// site-local <see cref="OperationTrackingStore"/>. Each test uses a unique
/// shared-cache in-memory SQLite database so the store and the verifier share
/// the same store without touching disk.
/// </summary>
public class OperationTrackingStoreTests
{
private static (OperationTrackingStore store, string dataSource) CreateStore(
string testName)
{
var dataSource = $"file:{testName}-{Guid.NewGuid():N}?mode=memory&cache=shared";
var connectionString = $"Data Source={dataSource};Cache=Shared";
var options = new OperationTrackingOptions
{
ConnectionString = connectionString,
};
var store = new OperationTrackingStore(
Options.Create(options),
NullLogger<OperationTrackingStore>.Instance);
return (store, dataSource);
}
private static SqliteConnection OpenVerifierConnection(string dataSource)
{
var connection = new SqliteConnection($"Data Source={dataSource};Cache=Shared");
connection.Open();
return connection;
}
[Fact]
public void Constructor_CreatesOperationTracking_SchemaOnFirstUse()
{
var (store, dataSource) = CreateStore(nameof(Constructor_CreatesOperationTracking_SchemaOnFirstUse));
using (store)
{
using var connection = OpenVerifierConnection(dataSource);
using var cmd = connection.CreateCommand();
cmd.CommandText = "PRAGMA table_info(OperationTracking);";
using var reader = cmd.ExecuteReader();
var columns = new List<(string Name, int Pk, int NotNull)>();
while (reader.Read())
{
columns.Add((reader.GetString(1), reader.GetInt32(5), reader.GetInt32(3)));
}
var expected = new[]
{
"TrackedOperationId", "Kind", "TargetSummary", "Status",
"RetryCount", "LastError", "HttpStatus", "CreatedAtUtc",
"UpdatedAtUtc", "TerminalAtUtc", "SourceInstanceId", "SourceScript",
"SourceNode",
};
Assert.Equal(
expected.OrderBy(n => n),
columns.Select(c => c.Name).OrderBy(n => n));
var pkColumns = columns.Where(c => c.Pk > 0).Select(c => c.Name).ToList();
Assert.Single(pkColumns);
Assert.Equal("TrackedOperationId", pkColumns[0]);
}
}
[Fact]
public void Initialize_creates_OperationTracking_with_SourceNode_column()
{
var (store, dataSource) = CreateStore(nameof(Initialize_creates_OperationTracking_with_SourceNode_column));
using (store)
{
using var connection = OpenVerifierConnection(dataSource);
Assert.True(
ColumnExists(connection, "SourceNode"),
"Fresh OperationTracking schema must include the SourceNode column.");
}
}
/// <summary>
/// The pre-SourceNode <c>OperationTracking</c> schema — the 12-column
/// CREATE TABLE that has the original source-provenance columns
/// (<c>SourceInstanceId</c>, <c>SourceScript</c>) but is WITHOUT
/// <c>SourceNode</c>. A deployment that ran before the SourceNode
/// stamping work already has an on-disk <c>tracking.db</c> in exactly
/// this shape, and <c>CREATE TABLE IF NOT EXISTS</c> is a no-op against it.
/// </summary>
private const string OldPreSourceNodeSchema = """
CREATE TABLE IF NOT EXISTS OperationTracking (
TrackedOperationId TEXT NOT NULL PRIMARY KEY,
Kind TEXT NOT NULL,
TargetSummary TEXT NULL,
Status TEXT NOT NULL,
RetryCount INTEGER NOT NULL DEFAULT 0,
LastError TEXT NULL,
HttpStatus INTEGER NULL,
CreatedAtUtc TEXT NOT NULL,
UpdatedAtUtc TEXT NOT NULL,
TerminalAtUtc TEXT NULL,
SourceInstanceId TEXT NULL,
SourceScript TEXT NULL
);
CREATE INDEX IF NOT EXISTS IX_OperationTracking_Status_Updated
ON OperationTracking (Status, UpdatedAtUtc);
""";
private static SqliteConnection SeedPreSourceNodeSchemaDatabase(string dataSource)
{
var connection = new SqliteConnection($"Data Source={dataSource};Cache=Shared");
connection.Open();
using var cmd = connection.CreateCommand();
cmd.CommandText = OldPreSourceNodeSchema;
cmd.ExecuteNonQuery();
return connection;
}
private static bool ColumnExists(SqliteConnection connection, string columnName)
{
using var cmd = connection.CreateCommand();
cmd.CommandText = "SELECT COUNT(*) FROM pragma_table_info('OperationTracking') WHERE name = $name";
cmd.Parameters.AddWithValue("$name", columnName);
return Convert.ToInt32(cmd.ExecuteScalar()) > 0;
}
private static OperationTrackingStore CreateStoreOver(string dataSource)
{
var connectionString = $"Data Source={dataSource};Cache=Shared";
var options = new OperationTrackingOptions { ConnectionString = connectionString };
return new OperationTrackingStore(
Options.Create(options),
NullLogger<OperationTrackingStore>.Instance);
}
[Fact]
public async Task Initialize_adds_SourceNode_to_pre_existing_schema()
{
var dataSource = $"file:{nameof(Initialize_adds_SourceNode_to_pre_existing_schema)}-{Guid.NewGuid():N}?mode=memory&cache=shared";
// A pre-SourceNode deployment: tracking.db already exists with the
// 12-column schema and NO SourceNode column.
using var seedConnection = SeedPreSourceNodeSchemaDatabase(dataSource);
Assert.True(ColumnExists(seedConnection, "SourceInstanceId"));
Assert.True(ColumnExists(seedConnection, "SourceScript"));
Assert.False(ColumnExists(seedConnection, "SourceNode"));
// Upgrade: a post-branch OperationTrackingStore opens the same database.
// Its InitializeSchema must ALTER the missing SourceNode column in —
// the CREATE TABLE IF NOT EXISTS alone is a no-op against the existing
// table.
await using (var store = CreateStoreOver(dataSource))
{
Assert.True(
ColumnExists(seedConnection, "SourceNode"),
"OperationTrackingStore must ALTER the SourceNode column into a pre-existing OperationTracking table.");
// A RecordEnqueueAsync binding $sourceNode must now succeed; without
// the ALTER it would fail with "no such column: SourceNode".
var id = TrackedOperationId.New();
await store.RecordEnqueueAsync(
id,
kind: "ApiCallCached",
targetSummary: "ERP.GetOrder",
sourceInstanceId: "inst-1",
sourceScript: "ScriptActor:OnTick",
sourceNode: "node-a");
var snapshot = await store.GetStatusAsync(id);
Assert.NotNull(snapshot);
Assert.Equal("node-a", snapshot!.SourceNode);
}
// Idempotency: a second store over the now-upgraded DB must not error
// (the probe sees SourceNode already present and skips the ALTER).
await using (var storeAgain = CreateStoreOver(dataSource))
{
Assert.True(ColumnExists(seedConnection, "SourceNode"));
}
}
[Fact]
public async Task RecordEnqueueAsync_persists_SourceNode()
{
var (store, _) = CreateStore(nameof(RecordEnqueueAsync_persists_SourceNode));
await using var _store = store;
var id = TrackedOperationId.New();
await store.RecordEnqueueAsync(
id,
kind: nameof(AuditKind.ApiCallCached),
targetSummary: "ERP.GetOrder",
sourceInstanceId: "Plant.Pump42",
sourceScript: "ScriptActor:OnTick",
sourceNode: "node-a");
var snapshot = await store.GetStatusAsync(id);
Assert.NotNull(snapshot);
Assert.Equal("node-a", snapshot!.SourceNode);
}
[Fact]
public async Task RecordEnqueueAsync_persists_null_SourceNode()
{
var (store, _) = CreateStore(nameof(RecordEnqueueAsync_persists_null_SourceNode));
await using var _store = store;
var id = TrackedOperationId.New();
await store.RecordEnqueueAsync(
id,
kind: nameof(AuditKind.ApiCallCached),
targetSummary: "ERP.GetOrder",
sourceInstanceId: null,
sourceScript: null,
sourceNode: null);
var snapshot = await store.GetStatusAsync(id);
Assert.NotNull(snapshot);
Assert.Null(snapshot!.SourceNode);
}
[Fact]
public async Task RecordEnqueueAsync_InsertsSubmittedRow_WithRetryCountZero()
{
var (store, dataSource) = CreateStore(nameof(RecordEnqueueAsync_InsertsSubmittedRow_WithRetryCountZero));
await using var _ = store;
var id = TrackedOperationId.New();
await store.RecordEnqueueAsync(
id,
kind: nameof(AuditKind.ApiCallCached),
targetSummary: "ERP.GetOrder",
sourceInstanceId: "Plant.Pump42",
sourceScript: "ScriptActor:OnTick",
sourceNode: null);
var snapshot = await store.GetStatusAsync(id);
Assert.NotNull(snapshot);
Assert.Equal(id, snapshot!.Id);
Assert.Equal(nameof(AuditKind.ApiCallCached), snapshot.Kind);
Assert.Equal("ERP.GetOrder", snapshot.TargetSummary);
Assert.Equal(nameof(AuditStatus.Submitted), snapshot.Status);
Assert.Equal(0, snapshot.RetryCount);
Assert.Null(snapshot.LastError);
Assert.Null(snapshot.HttpStatus);
Assert.Null(snapshot.TerminalAtUtc);
Assert.Equal("Plant.Pump42", snapshot.SourceInstanceId);
Assert.Equal("ScriptActor:OnTick", snapshot.SourceScript);
Assert.Equal(DateTimeKind.Utc, snapshot.CreatedAtUtc.Kind);
Assert.Equal(DateTimeKind.Utc, snapshot.UpdatedAtUtc.Kind);
}
[Fact]
public async Task RecordEnqueueAsync_Duplicate_IsNoOp_FirstWriteWins()
{
var (store, _) = CreateStore(nameof(RecordEnqueueAsync_Duplicate_IsNoOp_FirstWriteWins));
await using var _store = store;
var id = TrackedOperationId.New();
await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", "Plant.Pump42", "ScriptActor:OnTick", sourceNode: null);
await store.RecordEnqueueAsync(id, "ApiCallCached", "OtherTarget", "Other.Instance", "ScriptActor:Other", sourceNode: null);
var snapshot = await store.GetStatusAsync(id);
Assert.NotNull(snapshot);
// First-write-wins: the second enqueue is ignored — Target/Source stay first.
Assert.Equal("ERP.GetOrder", snapshot!.TargetSummary);
Assert.Equal("Plant.Pump42", snapshot.SourceInstanceId);
Assert.Equal("ScriptActor:OnTick", snapshot.SourceScript);
Assert.Equal(nameof(AuditStatus.Submitted), snapshot.Status);
Assert.Equal(0, snapshot.RetryCount);
}
[Fact]
public async Task RecordAttemptAsync_AdvancesStatusAndRetryCount_OnNonTerminalRow()
{
var (store, _) = CreateStore(nameof(RecordAttemptAsync_AdvancesStatusAndRetryCount_OnNonTerminalRow));
await using var _store = store;
var id = TrackedOperationId.New();
await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", null, null, sourceNode: null);
await store.RecordAttemptAsync(
id,
status: nameof(AuditStatus.Attempted),
retryCount: 1,
lastError: "HTTP 503 from ERP",
httpStatus: 503);
var snapshot = await store.GetStatusAsync(id);
Assert.NotNull(snapshot);
Assert.Equal(nameof(AuditStatus.Attempted), snapshot!.Status);
Assert.Equal(1, snapshot.RetryCount);
Assert.Equal("HTTP 503 from ERP", snapshot.LastError);
Assert.Equal(503, snapshot.HttpStatus);
Assert.Null(snapshot.TerminalAtUtc);
// UpdatedAtUtc advances past CreatedAtUtc.
Assert.True(snapshot.UpdatedAtUtc >= snapshot.CreatedAtUtc);
}
[Fact]
public async Task RecordAttemptAsync_OnTerminalRow_IsNoOp()
{
var (store, _) = CreateStore(nameof(RecordAttemptAsync_OnTerminalRow_IsNoOp));
await using var _store = store;
var id = TrackedOperationId.New();
await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", null, null, sourceNode: null);
await store.RecordTerminalAsync(
id,
status: nameof(AuditStatus.Delivered),
lastError: null,
httpStatus: 200);
var terminalSnapshot = await store.GetStatusAsync(id);
Assert.NotNull(terminalSnapshot);
Assert.NotNull(terminalSnapshot!.TerminalAtUtc);
// Late attempt telemetry must NOT overwrite the terminal row.
await store.RecordAttemptAsync(
id,
status: nameof(AuditStatus.Attempted),
retryCount: 5,
lastError: "late attempt",
httpStatus: 500);
var afterLate = await store.GetStatusAsync(id);
Assert.NotNull(afterLate);
Assert.Equal(nameof(AuditStatus.Delivered), afterLate!.Status);
Assert.Equal(0, afterLate.RetryCount);
Assert.Null(afterLate.LastError);
Assert.Equal(200, afterLate.HttpStatus);
Assert.NotNull(afterLate.TerminalAtUtc);
}
[Fact]
public async Task RecordTerminalAsync_FlipsToTerminal_WithTerminalAtUtcSet()
{
var (store, _) = CreateStore(nameof(RecordTerminalAsync_FlipsToTerminal_WithTerminalAtUtcSet));
await using var _store = store;
var id = TrackedOperationId.New();
await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", null, null, sourceNode: null);
var beforeTerminal = DateTime.UtcNow;
await store.RecordTerminalAsync(
id,
status: nameof(AuditStatus.Parked),
lastError: "HTTP 503 (max retries)",
httpStatus: 503);
var snapshot = await store.GetStatusAsync(id);
Assert.NotNull(snapshot);
Assert.Equal(nameof(AuditStatus.Parked), snapshot!.Status);
Assert.NotNull(snapshot.TerminalAtUtc);
Assert.Equal(DateTimeKind.Utc, snapshot.TerminalAtUtc!.Value.Kind);
Assert.True(snapshot.TerminalAtUtc >= beforeTerminal.AddSeconds(-1));
Assert.Equal("HTTP 503 (max retries)", snapshot.LastError);
Assert.Equal(503, snapshot.HttpStatus);
}
[Fact]
public async Task GetStatusAsync_Unknown_ReturnsNull()
{
var (store, _) = CreateStore(nameof(GetStatusAsync_Unknown_ReturnsNull));
await using var _store = store;
var unknown = TrackedOperationId.New();
var snapshot = await store.GetStatusAsync(unknown);
Assert.Null(snapshot);
}
[Fact]
public async Task GetStatusAsync_ReturnsLatestSnapshot_AfterMultipleAttempts()
{
var (store, _) = CreateStore(nameof(GetStatusAsync_ReturnsLatestSnapshot_AfterMultipleAttempts));
await using var _store = store;
var id = TrackedOperationId.New();
await store.RecordEnqueueAsync(id, "ApiCallCached", "ERP.GetOrder", null, null, sourceNode: null);
await store.RecordAttemptAsync(id, nameof(AuditStatus.Attempted), 1, "first failure", 503);
await store.RecordAttemptAsync(id, nameof(AuditStatus.Attempted), 2, "second failure", 503);
await store.RecordAttemptAsync(id, nameof(AuditStatus.Attempted), 3, "third failure", 504);
var snapshot = await store.GetStatusAsync(id);
Assert.NotNull(snapshot);
Assert.Equal(3, snapshot!.RetryCount);
Assert.Equal("third failure", snapshot.LastError);
Assert.Equal(504, snapshot.HttpStatus);
}
[Fact]
public async Task PurgeTerminalAsync_RemovesOldTerminalRows_KeepsRecent_KeepsNonTerminal()
{
var (store, dataSource) = CreateStore(nameof(PurgeTerminalAsync_RemovesOldTerminalRows_KeepsRecent_KeepsNonTerminal));
await using var _store = store;
// Three rows:
// (a) terminal, old → should be purged
// (b) terminal, fresh → should be kept
// (c) non-terminal, ancient CreatedAt → should be kept (no TerminalAtUtc)
var aId = TrackedOperationId.New();
var bId = TrackedOperationId.New();
var cId = TrackedOperationId.New();
await store.RecordEnqueueAsync(aId, "ApiCallCached", "A", null, null, sourceNode: null);
await store.RecordEnqueueAsync(bId, "ApiCallCached", "B", null, null, sourceNode: null);
await store.RecordEnqueueAsync(cId, "ApiCallCached", "C", null, null, sourceNode: null);
await store.RecordTerminalAsync(aId, nameof(AuditStatus.Delivered), null, 200);
await store.RecordTerminalAsync(bId, nameof(AuditStatus.Delivered), null, 200);
// Backdate the (a) row's TerminalAtUtc to 30 days ago via a direct UPDATE
// — RecordTerminalAsync stamps DateTime.UtcNow which we cannot inject.
// The verifier connection shares the same in-memory store thanks to
// mode=memory&cache=shared.
using (var connection = OpenVerifierConnection(dataSource))
using (var cmd = connection.CreateCommand())
{
cmd.CommandText =
"UPDATE OperationTracking SET TerminalAtUtc = $old WHERE TrackedOperationId = $id;";
cmd.Parameters.AddWithValue("$old", DateTime.UtcNow.AddDays(-30).ToString("o"));
cmd.Parameters.AddWithValue("$id", aId.ToString());
cmd.ExecuteNonQuery();
}
// Purge anything terminal older than 7 days.
var threshold = DateTime.UtcNow.AddDays(-7);
await store.PurgeTerminalAsync(threshold);
Assert.Null(await store.GetStatusAsync(aId)); // purged
Assert.NotNull(await store.GetStatusAsync(bId)); // kept (recent terminal)
Assert.NotNull(await store.GetStatusAsync(cId)); // kept (non-terminal)
}
}