using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Logging.Abstractions;
using ScadaLink.Commons.Types.Enums;
namespace ScadaLink.StoreAndForward.Tests;
/// <summary>
/// WP-9: Tests for the SQLite persistence layer.
/// Uses in-memory SQLite with a kept-alive connection for test isolation.
/// </summary>
public class StoreAndForwardStorageTests : IAsyncLifetime, IDisposable
{
private readonly SqliteConnection _keepAlive;
private readonly StoreAndForwardStorage _storage;
private readonly string _dbName;
public StoreAndForwardStorageTests()
{
    // Each instance of this class gets its own uniquely named shared-cache
    // in-memory database.
    _dbName = $"StorageTests_{Guid.NewGuid():N}";
    var connectionString = $"Data Source={_dbName};Mode=Memory;Cache=Shared";

    // An in-memory database only lives while at least one connection to it is
    // open, so hold one open until Dispose().
    _keepAlive = new SqliteConnection(connectionString);
    _keepAlive.Open();

    // NOTE(review): if the storage constructor expects a typed logger, this may
    // need to be the generic NullLogger for StoreAndForwardStorage — confirm.
    _storage = new StoreAndForwardStorage(connectionString, NullLogger.Instance);
}
public async Task InitializeAsync()
{
    // Async setup hook (IAsyncLifetime): initialize the storage under test.
    await _storage.InitializeAsync();
}
public Task DisposeAsync()
{
    // Nothing to tear down asynchronously; the keep-alive connection is
    // released in Dispose().
    return Task.CompletedTask;
}
// Releases the keep-alive connection opened in the constructor.
public void Dispose() => _keepAlive.Dispose();
[Fact]
public async Task EnqueueAsync_StoresMessage()
{
    // Enqueue a single message, then confirm a lookup by id returns it with
    // the id, category and target it was created with.
    const string id = "msg1";
    await _storage.EnqueueAsync(CreateMessage(id, StoreAndForwardCategory.ExternalSystem));

    var stored = await _storage.GetMessageByIdAsync(id);

    Assert.NotNull(stored);
    Assert.Equal(id, stored!.Id);
    Assert.Equal(StoreAndForwardCategory.ExternalSystem, stored.Category);
    Assert.Equal("target1", stored.Target);
}
[Fact]
public async Task EnqueueAsync_AllCategories()
{
    // One message per category; each must read back with the category it was
    // enqueued under.
    var cases = new (string Id, StoreAndForwardCategory Category)[]
    {
        ("es1", StoreAndForwardCategory.ExternalSystem),
        ("n1", StoreAndForwardCategory.Notification),
        ("db1", StoreAndForwardCategory.CachedDbWrite),
    };

    foreach (var (id, category) in cases)
    {
        await _storage.EnqueueAsync(CreateMessage(id, category));
    }

    foreach (var (id, category) in cases)
    {
        var stored = await _storage.GetMessageByIdAsync(id);
        Assert.Equal(category, stored!.Category);
    }
}
[Fact]
public async Task RemoveMessageAsync_RemovesSuccessfully()
{
    // After enqueue followed by remove, a lookup by id must return null.
    const string id = "rm1";
    await _storage.EnqueueAsync(CreateMessage(id, StoreAndForwardCategory.ExternalSystem));

    await _storage.RemoveMessageAsync(id);

    Assert.Null(await _storage.GetMessageByIdAsync(id));
}
[Fact]
public async Task UpdateMessageAsync_UpdatesFields()
{
    // Persist a message, mutate its retry bookkeeping, save it again, and
    // check the changed fields on the row that is read back.
    const string lastError = "Connection refused";
    var original = CreateMessage("upd1", StoreAndForwardCategory.ExternalSystem);
    await _storage.EnqueueAsync(original);

    original.RetryCount = 5;
    original.LastAttemptAt = DateTimeOffset.UtcNow;
    original.Status = StoreAndForwardMessageStatus.Parked;
    original.LastError = lastError;
    await _storage.UpdateMessageAsync(original);

    var reloaded = await _storage.GetMessageByIdAsync("upd1");
    Assert.Equal(5, reloaded!.RetryCount);
    Assert.Equal(StoreAndForwardMessageStatus.Parked, reloaded.Status);
    Assert.Equal(lastError, reloaded.LastError);
}
[Fact]
public async Task GetMessagesForRetryAsync_ReturnsOnlyPendingMessages()
{
    // Store one pending and one parked message; the retry query must return
    // the pending one and must not return the parked one.
    var pending = CreateMessage("pend1", StoreAndForwardCategory.ExternalSystem);
    pending.Status = StoreAndForwardMessageStatus.Pending;
    await _storage.EnqueueAsync(pending);

    var parked = CreateMessage("park1", StoreAndForwardCategory.ExternalSystem);
    parked.Status = StoreAndForwardMessageStatus.Parked;
    await _storage.EnqueueAsync(parked);
    await _storage.UpdateMessageAsync(parked);

    var forRetry = await _storage.GetMessagesForRetryAsync();

    // Assert.All alone passes vacuously on an empty result, so also pin the
    // expected membership. The pending message has never been attempted
    // (LastAttemptAt is unset), which the other retry tests in this class
    // treat as immediately due.
    Assert.Contains(forRetry, m => m.Id == "pend1");
    Assert.DoesNotContain(forRetry, m => m.Id == "park1");
    Assert.All(forRetry, m => Assert.Equal(StoreAndForwardMessageStatus.Pending, m.Status));
}
[Fact]
public async Task GetMessagesForRetryAsync_NonZeroInterval_ExcludesNotYetDueIncludesDue()
{
    // StoreAndForward-013: cover the elapsed-time comparison with a non-zero
    // retry interval. Three messages are stored:
    //   - "notdue": 1 h interval, attempted just now      -> must be excluded
    //   - "due":    5 min interval, attempted 2 h ago      -> must be included
    //   - "never":  1 h interval, no attempt recorded      -> must be included
    var oneHourMs = (long)TimeSpan.FromHours(1).TotalMilliseconds;
    var fiveMinutesMs = (long)TimeSpan.FromMinutes(5).TotalMilliseconds;

    var attemptedJustNow = CreateMessage("notdue", StoreAndForwardCategory.ExternalSystem);
    attemptedJustNow.RetryIntervalMs = oneHourMs;
    attemptedJustNow.LastAttemptAt = DateTimeOffset.UtcNow;
    await _storage.EnqueueAsync(attemptedJustNow);

    var attemptedLongAgo = CreateMessage("due", StoreAndForwardCategory.ExternalSystem);
    attemptedLongAgo.RetryIntervalMs = fiveMinutesMs;
    attemptedLongAgo.LastAttemptAt = DateTimeOffset.UtcNow.AddHours(-2);
    await _storage.EnqueueAsync(attemptedLongAgo);

    var unattempted = CreateMessage("never", StoreAndForwardCategory.ExternalSystem);
    unattempted.RetryIntervalMs = oneHourMs;
    unattempted.LastAttemptAt = null;
    await _storage.EnqueueAsync(unattempted);

    var dueMessages = await _storage.GetMessagesForRetryAsync();
    var dueIds = dueMessages.Select(m => m.Id).ToHashSet();

    Assert.DoesNotContain("notdue", dueIds);
    Assert.Contains("due", dueIds);
    Assert.Contains("never", dueIds);
}
[Fact]
public async Task GetParkedMessagesAsync_ReturnsParkedOnly()
{
    // Store one parked message; the parked query must report a non-zero total
    // and every row it returns must have Parked status.
    var parked = CreateMessage("prk1", StoreAndForwardCategory.Notification);
    parked.Status = StoreAndForwardMessageStatus.Parked;
    await _storage.EnqueueAsync(parked);
    await _storage.UpdateMessageAsync(parked);

    var (rows, totalCount) = await _storage.GetParkedMessagesAsync();

    Assert.True(totalCount > 0);
    Assert.All(rows, row => Assert.Equal(StoreAndForwardMessageStatus.Parked, row.Status));
}
[Fact]
public async Task RetryParkedMessageAsync_MovesToPending()
{
    // A parked message with a non-zero retry count is re-queued; afterwards it
    // must be Pending with its retry count back at zero.
    const string id = "retry1";
    var parked = CreateMessage(id, StoreAndForwardCategory.ExternalSystem);
    parked.Status = StoreAndForwardMessageStatus.Parked;
    parked.RetryCount = 10;
    await _storage.EnqueueAsync(parked);
    await _storage.UpdateMessageAsync(parked);

    var requeued = await _storage.RetryParkedMessageAsync(id);
    Assert.True(requeued);

    var reloaded = await _storage.GetMessageByIdAsync(id);
    Assert.Equal(StoreAndForwardMessageStatus.Pending, reloaded!.Status);
    Assert.Equal(0, reloaded.RetryCount);
}
[Fact]
public async Task RetryParkedMessageAsync_ClearsLastAttemptAt_SoMessageIsImmediatelyDue()
{
    // StoreAndForward-010: re-queuing a parked message must make it due for the
    // next sweep no matter how recent its previous attempt was. The one-hour
    // retry interval ensures a leftover last-attempt timestamp would keep the
    // message out of the due set.
    const string id = "requeue1";
    var parked = CreateMessage(id, StoreAndForwardCategory.ExternalSystem);
    parked.RetryIntervalMs = (long)TimeSpan.FromHours(1).TotalMilliseconds;
    parked.LastAttemptAt = DateTimeOffset.UtcNow; // attempted a moment ago
    parked.Status = StoreAndForwardMessageStatus.Parked;
    await _storage.EnqueueAsync(parked);
    await _storage.UpdateMessageAsync(parked);

    var wasRequeued = await _storage.RetryParkedMessageAsync(id);
    Assert.True(wasRequeued);

    var reloaded = await _storage.GetMessageByIdAsync(id);
    Assert.Null(reloaded!.LastAttemptAt);

    // Despite the one-hour interval not having elapsed since the original
    // attempt, the message must be in the retry-due set.
    var dueMessages = await _storage.GetMessagesForRetryAsync();
    Assert.Contains(dueMessages, m => m.Id == id);
}
[Fact]
public async Task DiscardParkedMessageAsync_RemovesMessage()
{
    // Discarding a parked message must report success and leave nothing to
    // read back under that id.
    const string id = "disc1";
    var parked = CreateMessage(id, StoreAndForwardCategory.ExternalSystem);
    parked.Status = StoreAndForwardMessageStatus.Parked;
    await _storage.EnqueueAsync(parked);
    await _storage.UpdateMessageAsync(parked);

    var discarded = await _storage.DiscardParkedMessageAsync(id);
    Assert.True(discarded);

    Assert.Null(await _storage.GetMessageByIdAsync(id));
}
[Fact]
public async Task GetBufferDepthByCategoryAsync_ReturnsCorrectCounts()
{
    // Two ExternalSystem messages and one Notification message are stored;
    // the per-category depth must report exactly those counts.
    await _storage.EnqueueAsync(CreateMessage("bd1", StoreAndForwardCategory.ExternalSystem));
    await _storage.EnqueueAsync(CreateMessage("bd2", StoreAndForwardCategory.ExternalSystem));
    await _storage.EnqueueAsync(CreateMessage("bd3", StoreAndForwardCategory.Notification));

    var depth = await _storage.GetBufferDepthByCategoryAsync();

    // The database name is unique per test-class instance, so exact counts are
    // safe here. Previously only a lower bound on ExternalSystem was checked
    // and the Notification message was never verified.
    Assert.Equal(2, depth.GetValueOrDefault(StoreAndForwardCategory.ExternalSystem));
    Assert.Equal(1, depth.GetValueOrDefault(StoreAndForwardCategory.Notification));
    // A category with no stored messages must not report a depth.
    Assert.Equal(0, depth.GetValueOrDefault(StoreAndForwardCategory.CachedDbWrite));
}
[Fact]
public async Task GetMessageCountByOriginInstanceAsync_ReturnsCount()
{
    // Two messages of different categories share one origin instance name;
    // the per-origin count must be 2.
    const string origin = "Pump1";

    var externalCall = CreateMessage("oi1", StoreAndForwardCategory.ExternalSystem);
    externalCall.OriginInstanceName = origin;
    await _storage.EnqueueAsync(externalCall);

    var notification = CreateMessage("oi2", StoreAndForwardCategory.Notification);
    notification.OriginInstanceName = origin;
    await _storage.EnqueueAsync(notification);

    var messageCount = await _storage.GetMessageCountByOriginInstanceAsync(origin);
    Assert.Equal(2, messageCount);
}
[Fact]
public async Task GetParkedMessagesAsync_Pagination()
{
    // Park five messages, then request pages of two: the first and second
    // pages must each hold two rows and the total must cover all five.
    foreach (var index in Enumerable.Range(0, 5))
    {
        var parked = CreateMessage($"page{index}", StoreAndForwardCategory.ExternalSystem);
        parked.Status = StoreAndForwardMessageStatus.Parked;
        await _storage.EnqueueAsync(parked);
        await _storage.UpdateMessageAsync(parked);
    }

    var (firstPage, totalCount) = await _storage.GetParkedMessagesAsync(pageNumber: 1, pageSize: 2);
    Assert.Equal(2, firstPage.Count);
    Assert.True(totalCount >= 5);

    var (secondPage, _) = await _storage.GetParkedMessagesAsync(pageNumber: 2, pageSize: 2);
    Assert.Equal(2, secondPage.Count);
}
[Fact]
public async Task GetParkedMessagesAsync_TransactionedReads_CountMatchesFullResultSet()
{
    // StoreAndForward-006: the COUNT(*) and paged SELECT now run inside one
    // transaction so they share a consistent snapshot. This functional check
    // guards the fix — it verifies the transaction wiring did not break paging:
    // the reported TotalCount and the rows assembled across all pages agree, and
    // a page wide enough to hold every parked row contains exactly TotalCount rows.
    const int parkedCount = 25;
    const int pageSize = 7;

    for (int i = 0; i < parkedCount; i++)
    {
        var m = CreateMessage($"txn-{i}", StoreAndForwardCategory.ExternalSystem);
        m.Status = StoreAndForwardMessageStatus.Parked;
        await _storage.EnqueueAsync(m);
        await _storage.UpdateMessageAsync(m);
    }

    var (wholePage, wholeTotal) = await _storage.GetParkedMessagesAsync(pageNumber: 1, pageSize: 1000);
    Assert.Equal(parkedCount, wholeTotal);
    Assert.Equal(wholeTotal, wholePage.Count);

    // Message ids are strings, so the collected ids need a List<string>
    // (the type argument was missing).
    var collected = new List<string>();
    int reportedTotal = -1;

    // Bound the walk: a paging bug that keeps returning full pages must fail
    // the count assertion below rather than loop forever.
    int maxPages = parkedCount / pageSize + 2;
    for (int page = 1; page <= maxPages; page++)
    {
        var (rows, total) = await _storage.GetParkedMessagesAsync(pageNumber: page, pageSize: pageSize);
        reportedTotal = total;
        collected.AddRange(rows.Select(r => r.Id));
        if (rows.Count < pageSize)
        {
            break; // a short page is the last page
        }
    }

    Assert.Equal(reportedTotal, collected.Count);
    Assert.Equal(parkedCount, collected.Distinct().Count());
}
[Fact]
public async Task GetMessageCountByStatusAsync_ReturnsAccurateCount()
{
    // CreateMessage builds messages with Pending status, so after one enqueue
    // the Pending count must be at least one.
    await _storage.EnqueueAsync(CreateMessage("cnt1", StoreAndForwardCategory.ExternalSystem));

    var pendingCount = await _storage.GetMessageCountByStatusAsync(StoreAndForwardMessageStatus.Pending);

    Assert.True(pendingCount >= 1);
}
// ── Audit Log #23 (ExecutionId Task 4): execution_id / source_script ──
[Fact]
public async Task EnqueueAsync_RoundTripsExecutionIdAndSourceScript()
{
    // A buffered cached call carries the ExecutionId and SourceScript of the
    // script execution that produced it. Both values must come back unchanged
    // after a persist + read so they can later be stamped on audit rows.
    const string id = "exec1";
    const string sourceScript = "Plant.Pump42/OnTick";
    var expectedExecutionId = Guid.NewGuid();

    var outgoing = CreateMessage(id, StoreAndForwardCategory.ExternalSystem);
    outgoing.ExecutionId = expectedExecutionId;
    outgoing.SourceScript = sourceScript;
    await _storage.EnqueueAsync(outgoing);

    var stored = await _storage.GetMessageByIdAsync(id);

    Assert.NotNull(stored);
    Assert.Equal(expectedExecutionId, stored!.ExecutionId);
    Assert.Equal(sourceScript, stored.SourceScript);
}
[Fact]
public async Task EnqueueAsync_NullExecutionIdAndSourceScript_RoundTripAsNull()
{
    // Enqueues that are not cached calls (e.g. notifications) set neither
    // field; both must read back as null instead of throwing or being coerced.
    const string id = "noexec1";
    var outgoing = CreateMessage(id, StoreAndForwardCategory.Notification);

    // Precondition: the helper leaves both fields unset.
    Assert.Null(outgoing.ExecutionId);
    Assert.Null(outgoing.SourceScript);

    await _storage.EnqueueAsync(outgoing);
    var stored = await _storage.GetMessageByIdAsync(id);

    Assert.NotNull(stored);
    Assert.Null(stored!.ExecutionId);
    Assert.Null(stored.SourceScript);
}
[Fact]
public async Task ExecutionIdAndSourceScript_SurviveRetrySweepRead()
{
    // Due rows are read through GetMessagesForRetryAsync during the retry
    // sweep, so ExecutionId and SourceScript must be populated on that read
    // path as well as on the by-id lookup.
    const string sourceScript = "Plant.Tank/OnAlarm";
    var expectedExecutionId = Guid.NewGuid();

    var outgoing = CreateMessage("sweep1", StoreAndForwardCategory.CachedDbWrite);
    outgoing.ExecutionId = expectedExecutionId;
    outgoing.SourceScript = sourceScript;
    outgoing.LastAttemptAt = null; // never attempted, so due straight away
    await _storage.EnqueueAsync(outgoing);

    var dueMessages = await _storage.GetMessagesForRetryAsync();

    var sweptRow = Assert.Single(dueMessages, m => m.Id == "sweep1");
    Assert.Equal(expectedExecutionId, sweptRow.ExecutionId);
    Assert.Equal(sourceScript, sweptRow.SourceScript);
}
[Fact]
public async Task LegacyRowWithoutNewColumns_ReadsBackAsNull()
{
    // Back-compat check for rows written before the execution_id /
    // source_script columns existed. The legacy schema is simulated by
    // replacing sf_messages with a table that lacks those columns and
    // inserting a row directly; InitializeAsync is then run again (it is
    // expected to add the missing columns) and the row is read back, with
    // ExecutionId / SourceScript required to be null.
    var connectionString = $"Data Source={_dbName};Mode=Memory;Cache=Shared";
    await using (var legacyConnection = new SqliteConnection(connectionString))
    {
        await legacyConnection.OpenAsync();
        await using var recreateLegacyTable = legacyConnection.CreateCommand();
        recreateLegacyTable.CommandText = @"
DROP TABLE IF EXISTS sf_messages;
CREATE TABLE sf_messages (
id TEXT PRIMARY KEY,
category INTEGER NOT NULL,
target TEXT NOT NULL,
payload_json TEXT NOT NULL,
retry_count INTEGER NOT NULL DEFAULT 0,
max_retries INTEGER NOT NULL DEFAULT 50,
retry_interval_ms INTEGER NOT NULL DEFAULT 30000,
created_at TEXT NOT NULL,
last_attempt_at TEXT,
status INTEGER NOT NULL DEFAULT 0,
last_error TEXT,
origin_instance TEXT
);
INSERT INTO sf_messages (id, category, target, payload_json, created_at, status)
VALUES ('legacy1', 0, 'ERP', '{}', '2026-01-01T00:00:00.0000000+00:00', 0);";
        await recreateLegacyTable.ExecuteNonQueryAsync();
    }

    // Re-running initialization must upgrade the schema in place and leave
    // the legacy row untouched.
    await _storage.InitializeAsync();

    var legacyRow = await _storage.GetMessageByIdAsync("legacy1");
    Assert.NotNull(legacyRow);
    Assert.Equal("legacy1", legacyRow!.Id);
    Assert.Null(legacyRow.ExecutionId);
    Assert.Null(legacyRow.SourceScript);
}
[Fact]
public async Task MalformedExecutionId_ReadsBackAsNull_DoesNotAbortRetrySweep()
{
    // Defensive read path: an execution_id that is present but not a valid
    // GUID must be read as "no execution id" instead of raising
    // FormatException, because one bad row must not abort a sweep that reads
    // many rows. Two due rows are stored and one has its execution_id
    // overwritten with garbage directly in the database.
    var wellFormedExecutionId = Guid.NewGuid();

    var wellFormed = CreateMessage("good1", StoreAndForwardCategory.ExternalSystem);
    wellFormed.ExecutionId = wellFormedExecutionId;
    wellFormed.LastAttemptAt = null; // never attempted, so due straight away
    await _storage.EnqueueAsync(wellFormed);

    var toCorrupt = CreateMessage("bad1", StoreAndForwardCategory.ExternalSystem);
    toCorrupt.ExecutionId = Guid.NewGuid();
    toCorrupt.LastAttemptAt = null; // never attempted, so due straight away
    await _storage.EnqueueAsync(toCorrupt);

    var connectionString = $"Data Source={_dbName};Mode=Memory;Cache=Shared";
    await using (var rawConnection = new SqliteConnection(connectionString))
    {
        await rawConnection.OpenAsync();
        await using var corruptCommand = rawConnection.CreateCommand();
        corruptCommand.CommandText =
            "UPDATE sf_messages SET execution_id = 'not-a-guid' WHERE id = 'bad1';";
        await corruptCommand.ExecuteNonQueryAsync();
    }

    // The sweep read must complete: the corrupted row surfaces with a null
    // ExecutionId while the intact row keeps its original value.
    var dueMessages = await _storage.GetMessagesForRetryAsync();
    var corruptedRow = Assert.Single(dueMessages, m => m.Id == "bad1");
    Assert.Null(corruptedRow.ExecutionId);
    var intactRow = Assert.Single(dueMessages, m => m.Id == "good1");
    Assert.Equal(wellFormedExecutionId, intactRow.ExecutionId);

    // The by-id lookup must be just as tolerant.
    var byId = await _storage.GetMessageByIdAsync("bad1");
    Assert.NotNull(byId);
    Assert.Null(byId!.ExecutionId);
}
[Fact]
public async Task InitializeAsync_IsIdempotent_WhenColumnsAlreadyExist()
{
    // Initialization has already run once in the fixture's setup hook; running
    // it twice more must not fail when the added columns are already present
    // (SQLite has no ADD COLUMN IF NOT EXISTS, so they must be skipped).
    for (var pass = 0; pass < 2; pass++)
    {
        await _storage.InitializeAsync();
    }

    // The storage must still work normally afterwards.
    const string id = "idem1";
    var outgoing = CreateMessage(id, StoreAndForwardCategory.ExternalSystem);
    outgoing.ExecutionId = Guid.NewGuid();
    await _storage.EnqueueAsync(outgoing);

    var stored = await _storage.GetMessageByIdAsync(id);
    Assert.NotNull(stored);
    Assert.Equal(outgoing.ExecutionId, stored!.ExecutionId);
}
// Builds a Pending message with the given id and category and fixed defaults
// for every other field the tests rely on (target "target1", zero retries so
// far, 50 max retries, 30-second retry interval, created "now" in UTC).
private static StoreAndForwardMessage CreateMessage(string id, StoreAndForwardCategory category) =>
    new StoreAndForwardMessage
    {
        Id = id,
        Category = category,
        Target = "target1",
        PayloadJson = """{"method":"Test","args":{}}""",
        RetryCount = 0,
        MaxRetries = 50,
        RetryIntervalMs = 30000,
        CreatedAt = DateTimeOffset.UtcNow,
        Status = StoreAndForwardMessageStatus.Pending
    };
[Fact]
public async Task InitializeAsync_FileInMissingDirectory_CreatesDirectory()
{
    // SQLite creates the database file on demand but not its parent directory;
    // the storage must create the directory itself or OpenAsync fails with
    // "unable to open database file" (the cause of the SiteActorPathTests failures).
    var directory = Path.Combine(Path.GetTempPath(), "sf-storage-test-" + Guid.NewGuid().ToString("N"));
    var dbPath = Path.Combine(directory, "store-and-forward.db");
    Assert.False(Directory.Exists(directory));
    try
    {
        var storage = new StoreAndForwardStorage(
            $"Data Source={dbPath}", NullLogger.Instance);
        await storage.InitializeAsync();
        Assert.True(Directory.Exists(directory));
        Assert.True(File.Exists(dbPath));
    }
    finally
    {
        // Microsoft.Data.Sqlite pools connections by default, so the database
        // file can still be held open after the storage has finished with its
        // connections. Release the pooled handles first; otherwise the delete
        // below can fail on Windows and hide the real outcome of the test.
        // Only idle pooled connections are closed, so the open keep-alive
        // connection of this fixture is unaffected.
        SqliteConnection.ClearAllPools();

        if (Directory.Exists(directory))
        {
            Directory.Delete(directory, recursive: true);
        }
    }
}
}