using Microsoft.Data.Sqlite;
using Microsoft.Extensions.Logging.Abstractions;
using ScadaLink.Commons.Types.Enums;

namespace ScadaLink.StoreAndForward.Tests;

/// <summary>
/// WP-9: Tests for SQLite persistence layer.
/// Uses in-memory SQLite with a kept-alive connection for test isolation.
/// </summary>
public class StoreAndForwardStorageTests : IAsyncLifetime, IDisposable
{
    private readonly SqliteConnection _keepAlive;
    private readonly StoreAndForwardStorage _storage;
    private readonly string _dbName;

    /// <summary>
    /// Creates a uniquely named shared-cache in-memory database for this test
    /// instance and the storage under test that points at it.
    /// </summary>
    public StoreAndForwardStorageTests()
    {
        _dbName = $"StorageTests_{Guid.NewGuid():N}";
        var connStr = $"Data Source={_dbName};Mode=Memory;Cache=Shared";

        // Keep one connection alive so the in-memory DB persists
        _keepAlive = new SqliteConnection(connStr);
        _keepAlive.Open();

        _storage = new StoreAndForwardStorage(connStr, NullLogger<StoreAndForwardStorage>.Instance);
    }

    /// <summary>Initializes the storage under test before each test runs.</summary>
    public async Task InitializeAsync() => await _storage.InitializeAsync();

    /// <summary>No asynchronous teardown is required; see <see cref="Dispose"/>.</summary>
    public Task DisposeAsync() => Task.CompletedTask;

    /// <summary>Releases the keep-alive connection opened in the constructor.</summary>
    public void Dispose()
    {
        _keepAlive.Dispose();
    }

    [Fact]
    public async Task EnqueueAsync_StoresMessage()
    {
        var message = CreateMessage("msg1", StoreAndForwardCategory.ExternalSystem);

        await _storage.EnqueueAsync(message);

        var retrieved = await _storage.GetMessageByIdAsync("msg1");
        Assert.NotNull(retrieved);
        Assert.Equal("msg1", retrieved!.Id);
        Assert.Equal(StoreAndForwardCategory.ExternalSystem, retrieved.Category);
        Assert.Equal("target1", retrieved.Target);
    }

    [Fact]
    public async Task EnqueueAsync_AllCategories()
    {
        await _storage.EnqueueAsync(CreateMessage("es1", StoreAndForwardCategory.ExternalSystem));
        await _storage.EnqueueAsync(CreateMessage("n1", StoreAndForwardCategory.Notification));
        await _storage.EnqueueAsync(CreateMessage("db1", StoreAndForwardCategory.CachedDbWrite));

        var es = await _storage.GetMessageByIdAsync("es1");
        var n = await _storage.GetMessageByIdAsync("n1");
        var db = await _storage.GetMessageByIdAsync("db1");

        Assert.Equal(StoreAndForwardCategory.ExternalSystem, es!.Category);
        Assert.Equal(StoreAndForwardCategory.Notification, n!.Category);
        Assert.Equal(StoreAndForwardCategory.CachedDbWrite, db!.Category);
    }

    [Fact]
    public async Task RemoveMessageAsync_RemovesSuccessfully()
    {
        await _storage.EnqueueAsync(CreateMessage("rm1", StoreAndForwardCategory.ExternalSystem));

        await _storage.RemoveMessageAsync("rm1");

        var retrieved = await _storage.GetMessageByIdAsync("rm1");
        Assert.Null(retrieved);
    }

    [Fact]
    public async Task UpdateMessageAsync_UpdatesFields()
    {
        var message = CreateMessage("upd1", StoreAndForwardCategory.ExternalSystem);
        await _storage.EnqueueAsync(message);

        message.RetryCount = 5;
        message.LastAttemptAt = DateTimeOffset.UtcNow;
        message.Status = StoreAndForwardMessageStatus.Parked;
        message.LastError = "Connection refused";
        await _storage.UpdateMessageAsync(message);

        var retrieved = await _storage.GetMessageByIdAsync("upd1");
        Assert.Equal(5, retrieved!.RetryCount);
        Assert.Equal(StoreAndForwardMessageStatus.Parked, retrieved.Status);
        Assert.Equal("Connection refused", retrieved.LastError);
    }

    [Fact]
    public async Task GetMessagesForRetryAsync_ReturnsOnlyPendingMessages()
    {
        var pending = CreateMessage("pend1", StoreAndForwardCategory.ExternalSystem);
        pending.Status = StoreAndForwardMessageStatus.Pending;
        await _storage.EnqueueAsync(pending);

        var parked = CreateMessage("park1", StoreAndForwardCategory.ExternalSystem);
        parked.Status = StoreAndForwardMessageStatus.Parked;
        await _storage.EnqueueAsync(parked);
        await _storage.UpdateMessageAsync(parked);

        var forRetry = await _storage.GetMessagesForRetryAsync();

        // Assert.All passes vacuously on an empty collection, so also pin the
        // expected membership: the never-attempted pending message must be
        // returned and the parked one must not.
        Assert.Contains(forRetry, m => m.Id == "pend1");
        Assert.DoesNotContain(forRetry, m => m.Id == "park1");
        Assert.All(forRetry, m => Assert.Equal(StoreAndForwardMessageStatus.Pending, m.Status));
    }

    [Fact]
    public async Task GetMessagesForRetryAsync_NonZeroInterval_ExcludesNotYetDueIncludesDue()
    {
        // StoreAndForward-013: exercise the julianday elapsed-time comparison with a
        // non-zero retry interval. A message attempted just now must NOT be due; one
        // attempted long ago must be due.
        var notDue = CreateMessage("notdue", StoreAndForwardCategory.ExternalSystem);
        notDue.RetryIntervalMs = (long)TimeSpan.FromHours(1).TotalMilliseconds;
        notDue.LastAttemptAt = DateTimeOffset.UtcNow;
        await _storage.EnqueueAsync(notDue);

        var due = CreateMessage("due", StoreAndForwardCategory.ExternalSystem);
        due.RetryIntervalMs = (long)TimeSpan.FromMinutes(5).TotalMilliseconds;
        due.LastAttemptAt = DateTimeOffset.UtcNow.AddHours(-2);
        await _storage.EnqueueAsync(due);

        var neverAttempted = CreateMessage("never", StoreAndForwardCategory.ExternalSystem);
        neverAttempted.RetryIntervalMs = (long)TimeSpan.FromHours(1).TotalMilliseconds;
        neverAttempted.LastAttemptAt = null;
        await _storage.EnqueueAsync(neverAttempted);

        var forRetry = await _storage.GetMessagesForRetryAsync();
        var ids = forRetry.Select(m => m.Id).ToHashSet();

        Assert.DoesNotContain("notdue", ids);
        Assert.Contains("due", ids);
        Assert.Contains("never", ids);
    }

    [Fact]
    public async Task GetParkedMessagesAsync_ReturnsParkedOnly()
    {
        var msg = CreateMessage("prk1", StoreAndForwardCategory.Notification);
        msg.Status = StoreAndForwardMessageStatus.Parked;
        await _storage.EnqueueAsync(msg);
        await _storage.UpdateMessageAsync(msg);

        var (messages, total) = await _storage.GetParkedMessagesAsync();

        Assert.True(total > 0);
        Assert.All(messages, m => Assert.Equal(StoreAndForwardMessageStatus.Parked, m.Status));
    }

    [Fact]
    public async Task RetryParkedMessageAsync_MovesToPending()
    {
        var msg = CreateMessage("retry1", StoreAndForwardCategory.ExternalSystem);
        msg.Status = StoreAndForwardMessageStatus.Parked;
        msg.RetryCount = 10;
        await _storage.EnqueueAsync(msg);
        await _storage.UpdateMessageAsync(msg);

        var success = await _storage.RetryParkedMessageAsync("retry1");

        Assert.True(success);
        var retrieved = await _storage.GetMessageByIdAsync("retry1");
        Assert.Equal(StoreAndForwardMessageStatus.Pending, retrieved!.Status);
        Assert.Equal(0, retrieved.RetryCount);
    }

    [Fact]
    public async Task RetryParkedMessageAsync_ClearsLastAttemptAt_SoMessageIsImmediatelyDue()
    {
        // StoreAndForward-010: a re-queued parked message must be unambiguously due
        // for the next sweep regardless of its (stale) last_attempt_at. Use a large
        // retry interval so a leftover timestamp would otherwise exclude the message.
        var msg = CreateMessage("requeue1", StoreAndForwardCategory.ExternalSystem);
        msg.RetryIntervalMs = (long)TimeSpan.FromHours(1).TotalMilliseconds;
        msg.LastAttemptAt = DateTimeOffset.UtcNow; // recent attempt
        msg.Status = StoreAndForwardMessageStatus.Parked;
        await _storage.EnqueueAsync(msg);
        await _storage.UpdateMessageAsync(msg);

        var requeued = await _storage.RetryParkedMessageAsync("requeue1");

        Assert.True(requeued);
        var retrieved = await _storage.GetMessageByIdAsync("requeue1");
        Assert.Null(retrieved!.LastAttemptAt);

        // It must appear in the retry-due set even though the configured interval
        // (1 hour) has not elapsed since the original attempt.
        var due = await _storage.GetMessagesForRetryAsync();
        Assert.Contains(due, m => m.Id == "requeue1");
    }

    [Fact]
    public async Task DiscardParkedMessageAsync_RemovesMessage()
    {
        var msg = CreateMessage("disc1", StoreAndForwardCategory.ExternalSystem);
        msg.Status = StoreAndForwardMessageStatus.Parked;
        await _storage.EnqueueAsync(msg);
        await _storage.UpdateMessageAsync(msg);

        var success = await _storage.DiscardParkedMessageAsync("disc1");

        Assert.True(success);
        var retrieved = await _storage.GetMessageByIdAsync("disc1");
        Assert.Null(retrieved);
    }

    [Fact]
    public async Task GetBufferDepthByCategoryAsync_ReturnsCorrectCounts()
    {
        await _storage.EnqueueAsync(CreateMessage("bd1", StoreAndForwardCategory.ExternalSystem));
        await _storage.EnqueueAsync(CreateMessage("bd2", StoreAndForwardCategory.ExternalSystem));
        await _storage.EnqueueAsync(CreateMessage("bd3", StoreAndForwardCategory.Notification));

        var depth = await _storage.GetBufferDepthByCategoryAsync();

        Assert.True(depth.GetValueOrDefault(StoreAndForwardCategory.ExternalSystem) >= 2);
        // The Notification message enqueued above must be counted as well.
        Assert.True(depth.GetValueOrDefault(StoreAndForwardCategory.Notification) >= 1);
    }

    [Fact]
    public async Task GetMessageCountByOriginInstanceAsync_ReturnsCount()
    {
        var msg1 = CreateMessage("oi1", StoreAndForwardCategory.ExternalSystem);
        msg1.OriginInstanceName = "Pump1";
        await _storage.EnqueueAsync(msg1);

        var msg2 = CreateMessage("oi2", StoreAndForwardCategory.Notification);
        msg2.OriginInstanceName = "Pump1";
        await _storage.EnqueueAsync(msg2);

        var count = await _storage.GetMessageCountByOriginInstanceAsync("Pump1");

        Assert.Equal(2, count);
    }

    [Fact]
    public async Task GetParkedMessagesAsync_Pagination()
    {
        for (int i = 0; i < 5; i++)
        {
            var msg = CreateMessage($"page{i}", StoreAndForwardCategory.ExternalSystem);
            msg.Status = StoreAndForwardMessageStatus.Parked;
            await _storage.EnqueueAsync(msg);
            await _storage.UpdateMessageAsync(msg);
        }

        var (page1, total) = await _storage.GetParkedMessagesAsync(pageNumber: 1, pageSize: 2);
        Assert.Equal(2, page1.Count);
        Assert.True(total >= 5);

        var (page2, _) = await _storage.GetParkedMessagesAsync(pageNumber: 2, pageSize: 2);
        Assert.Equal(2, page2.Count);
    }

    [Fact]
    public async Task GetParkedMessagesAsync_TransactionedReads_CountMatchesFullResultSet()
    {
        // StoreAndForward-006: the COUNT(*) and paged SELECT now run inside one
        // transaction so they share a consistent snapshot. This functional check
        // guards the fix — it verifies the transaction wiring did not break paging:
        // the reported TotalCount and the rows assembled across all pages agree, and
        // a page wide enough to hold every parked row contains exactly TotalCount rows.
        for (int i = 0; i < 25; i++)
        {
            var m = CreateMessage($"txn-{i}", StoreAndForwardCategory.ExternalSystem);
            m.Status = StoreAndForwardMessageStatus.Parked;
            await _storage.EnqueueAsync(m);
            await _storage.UpdateMessageAsync(m);
        }

        var (wholePage, wholeTotal) = await _storage.GetParkedMessagesAsync(pageNumber: 1, pageSize: 1000);
        Assert.Equal(25, wholeTotal);
        Assert.Equal(wholeTotal, wholePage.Count);

        // Walk every page with a size that does not divide 25 evenly, so the
        // final short page terminates the loop.
        const int pageSize = 7;
        var collected = new List<string>();
        int reportedTotal = -1;
        for (int page = 1; ; page++)
        {
            var (rows, total) = await _storage.GetParkedMessagesAsync(pageNumber: page, pageSize: pageSize);
            reportedTotal = total;
            collected.AddRange(rows.Select(r => r.Id));
            if (rows.Count < pageSize)
            {
                break;
            }
        }

        Assert.Equal(reportedTotal, collected.Count);
        Assert.Equal(25, collected.Distinct().Count());
    }

    [Fact]
    public async Task GetMessageCountByStatusAsync_ReturnsAccurateCount()
    {
        var msg = CreateMessage("cnt1", StoreAndForwardCategory.ExternalSystem);
        await _storage.EnqueueAsync(msg);

        var count = await _storage.GetMessageCountByStatusAsync(StoreAndForwardMessageStatus.Pending);

        Assert.True(count >= 1);
    }

    /// <summary>
    /// Builds a pending message with fixed target, payload and retry settings.
    /// </summary>
    /// <param name="id">Identifier assigned to the message.</param>
    /// <param name="category">Category assigned to the message.</param>
    /// <returns>A new message that has not been stored yet.</returns>
    private static StoreAndForwardMessage CreateMessage(string id, StoreAndForwardCategory category)
    {
        return new StoreAndForwardMessage
        {
            Id = id,
            Category = category,
            Target = "target1",
            PayloadJson = """{"method":"Test","args":{}}""",
            RetryCount = 0,
            MaxRetries = 50,
            RetryIntervalMs = 30000,
            CreatedAt = DateTimeOffset.UtcNow,
            Status = StoreAndForwardMessageStatus.Pending
        };
    }

    [Fact]
    public async Task InitializeAsync_FileInMissingDirectory_CreatesDirectory()
    {
        // SQLite creates the database file on demand but not its parent directory;
        // the storage must create the directory itself or OpenAsync fails with
        // "unable to open database file" (the cause of the SiteActorPathTests failures).
        var directory = Path.Combine(Path.GetTempPath(), "sf-storage-test-" + Guid.NewGuid().ToString("N"));
        var dbPath = Path.Combine(directory, "store-and-forward.db");
        Assert.False(Directory.Exists(directory));

        try
        {
            var storage = new StoreAndForwardStorage(
                $"Data Source={dbPath}",
                NullLogger<StoreAndForwardStorage>.Instance);

            await storage.InitializeAsync();

            Assert.True(Directory.Exists(directory));
            Assert.True(File.Exists(dbPath));
        }
        finally
        {
            // Microsoft.Data.Sqlite pools connections, and a pooled connection may
            // keep the database file open (which blocks deletion on Windows).
            // Release pooled handles before removing the temporary directory.
            SqliteConnection.ClearAllPools();

            if (Directory.Exists(directory))
            {
                Directory.Delete(directory, recursive: true);
            }
        }
    }
}