using Microsoft.Data.Sqlite; using Microsoft.Extensions.Logging; using ScadaLink.Commons.Types.Enums; namespace ScadaLink.StoreAndForward; /// /// WP-9: SQLite persistence layer for store-and-forward messages. /// Uses direct Microsoft.Data.Sqlite (not EF Core) for lightweight site-side storage. /// No max buffer size per design decision. /// /// StoreAndForward-008: every method opens a fresh for /// the duration of the call rather than holding a long-lived connection. This is a /// deliberate trade-off, not an oversight: Microsoft.Data.Sqlite maintains an internal /// connection pool keyed on the connection string, so OpenAsync on a previously /// used connection string reuses a pooled handle instead of performing a real file /// open. The retry sweep therefore relies on that pool for acceptable performance — /// it calls / /// once per due message, and with no max buffer size (by design) the buffer can grow /// large. The connection-per-call style keeps each method self-contained and /// transaction-scoped; if profiling ever shows the pooled open to be a bottleneck on /// the hot retry path, the remedy is a batched sweep API that opens one connection (and /// one transaction) per sweep. /// public class StoreAndForwardStorage { private readonly string _connectionString; private readonly ILogger _logger; public StoreAndForwardStorage(string connectionString, ILogger logger) { _connectionString = connectionString; _logger = logger; } /// /// Creates the sf_messages table if it does not exist. /// public async Task InitializeAsync() { EnsureDatabaseDirectoryExists(); await using var connection = new SqliteConnection(_connectionString); await connection.OpenAsync(); await using var command = connection.CreateCommand(); command.CommandText = @" CREATE TABLE IF NOT EXISTS sf_messages ( id TEXT PRIMARY KEY, category INTEGER NOT NULL, target TEXT NOT NULL, payload_json TEXT NOT NULL, retry_count INTEGER NOT NULL DEFAULT 0, max_retries INTEGER NOT NULL DEFAULT 50, retry_interval_ms INTEGER NOT NULL DEFAULT 30000, created_at TEXT NOT NULL, last_attempt_at TEXT, status INTEGER NOT NULL DEFAULT 0, last_error TEXT, origin_instance TEXT ); CREATE INDEX IF NOT EXISTS idx_sf_messages_status ON sf_messages(status); CREATE INDEX IF NOT EXISTS idx_sf_messages_category ON sf_messages(category); "; await command.ExecuteNonQueryAsync(); // Audit Log #23 (ExecutionId Task 4): additively add the execution_id / // source_script columns. CREATE TABLE IF NOT EXISTS above does NOT add // columns to a table that already exists from before these fields, so a // databases created by an older build needs the columns ALTER-ed in. // SQLite has no "ADD COLUMN IF NOT EXISTS"; the column presence is // probed first and the ALTER skipped when already there. Both columns // are nullable with no default, so any row buffered before this // migration reads back ExecutionId/SourceScript = null (back-compat). await AddColumnIfMissingAsync(connection, "execution_id", "TEXT"); await AddColumnIfMissingAsync(connection, "source_script", "TEXT"); _logger.LogInformation("Store-and-forward SQLite storage initialized"); } /// /// Audit Log #23 (ExecutionId Task 4): adds a column to sf_messages /// only when it is not already present. SQLite lacks ADD COLUMN IF NOT /// EXISTS, so the schema is probed via PRAGMA table_info first. /// Idempotent — safe to run on every . /// private static async Task AddColumnIfMissingAsync( SqliteConnection connection, string columnName, string columnType) { await using var probe = connection.CreateCommand(); probe.CommandText = "SELECT COUNT(*) FROM pragma_table_info('sf_messages') WHERE name = @name"; probe.Parameters.AddWithValue("@name", columnName); var exists = Convert.ToInt32(await probe.ExecuteScalarAsync()) > 0; if (exists) { return; } await using var alter = connection.CreateCommand(); // Column name + type are caller-controlled constants, never user input — // safe to interpolate (parameters are not permitted in DDL). alter.CommandText = $"ALTER TABLE sf_messages ADD COLUMN {columnName} {columnType}"; await alter.ExecuteNonQueryAsync(); } /// /// Ensures the directory for a file-backed SQLite database exists. SQLite creates /// the database file on demand but not its parent directory, so a configured path /// such as "./data/store-and-forward.db" fails to open ("unable to open database /// file") when the "data" directory does not yet exist. In-memory databases and /// bare filenames in the working directory have no directory to create and are /// skipped. /// private void EnsureDatabaseDirectoryExists() { var builder = new SqliteConnectionStringBuilder(_connectionString); if (builder.Mode == SqliteOpenMode.Memory) return; var dataSource = builder.DataSource; if (string.IsNullOrEmpty(dataSource) || dataSource == ":memory:") return; var directory = System.IO.Path.GetDirectoryName(System.IO.Path.GetFullPath(dataSource)); if (!string.IsNullOrEmpty(directory) && !System.IO.Directory.Exists(directory)) { System.IO.Directory.CreateDirectory(directory); _logger.LogInformation("Created store-and-forward database directory: {Directory}", directory); } } /// /// WP-9: Enqueues a new message with Pending status. /// public async Task EnqueueAsync(StoreAndForwardMessage message) { await using var connection = new SqliteConnection(_connectionString); await connection.OpenAsync(); await using var cmd = connection.CreateCommand(); cmd.CommandText = @" INSERT INTO sf_messages (id, category, target, payload_json, retry_count, max_retries, retry_interval_ms, created_at, last_attempt_at, status, last_error, origin_instance, execution_id, source_script) VALUES (@id, @category, @target, @payload, @retryCount, @maxRetries, @retryIntervalMs, @createdAt, @lastAttempt, @status, @lastError, @origin, @executionId, @sourceScript)"; cmd.Parameters.AddWithValue("@id", message.Id); cmd.Parameters.AddWithValue("@category", (int)message.Category); cmd.Parameters.AddWithValue("@target", message.Target); cmd.Parameters.AddWithValue("@payload", message.PayloadJson); cmd.Parameters.AddWithValue("@retryCount", message.RetryCount); cmd.Parameters.AddWithValue("@maxRetries", message.MaxRetries); cmd.Parameters.AddWithValue("@retryIntervalMs", message.RetryIntervalMs); cmd.Parameters.AddWithValue("@createdAt", message.CreatedAt.ToString("O")); cmd.Parameters.AddWithValue("@lastAttempt", message.LastAttemptAt.HasValue ? message.LastAttemptAt.Value.ToString("O") : DBNull.Value); cmd.Parameters.AddWithValue("@status", (int)message.Status); cmd.Parameters.AddWithValue("@lastError", (object?)message.LastError ?? DBNull.Value); cmd.Parameters.AddWithValue("@origin", (object?)message.OriginInstanceName ?? DBNull.Value); // Audit Log #23 (ExecutionId Task 4): the execution id is stored as its // canonical string form ("D") so it round-trips cleanly through the // TEXT column; null when not a cached call / not threaded. cmd.Parameters.AddWithValue("@executionId", message.ExecutionId.HasValue ? message.ExecutionId.Value.ToString("D") : DBNull.Value); cmd.Parameters.AddWithValue("@sourceScript", (object?)message.SourceScript ?? DBNull.Value); await cmd.ExecuteNonQueryAsync(); } /// /// WP-10: Gets all messages that are due for retry (Pending status, last attempt older than retry interval). /// public async Task> GetMessagesForRetryAsync() { await using var connection = new SqliteConnection(_connectionString); await connection.OpenAsync(); await using var cmd = connection.CreateCommand(); cmd.CommandText = @" SELECT id, category, target, payload_json, retry_count, max_retries, retry_interval_ms, created_at, last_attempt_at, status, last_error, origin_instance, execution_id, source_script FROM sf_messages WHERE status = @pending AND (last_attempt_at IS NULL OR retry_interval_ms = 0 OR (julianday('now') - julianday(last_attempt_at)) * 86400000 >= retry_interval_ms) ORDER BY created_at ASC"; cmd.Parameters.AddWithValue("@pending", (int)StoreAndForwardMessageStatus.Pending); return await ReadMessagesAsync(cmd); } /// /// WP-10: Updates a message after a delivery attempt. /// public async Task UpdateMessageAsync(StoreAndForwardMessage message) { await using var connection = new SqliteConnection(_connectionString); await connection.OpenAsync(); await using var cmd = connection.CreateCommand(); cmd.CommandText = @" UPDATE sf_messages SET retry_count = @retryCount, last_attempt_at = @lastAttempt, status = @status, last_error = @lastError WHERE id = @id"; cmd.Parameters.AddWithValue("@id", message.Id); cmd.Parameters.AddWithValue("@retryCount", message.RetryCount); cmd.Parameters.AddWithValue("@lastAttempt", message.LastAttemptAt.HasValue ? message.LastAttemptAt.Value.ToString("O") : DBNull.Value); cmd.Parameters.AddWithValue("@status", (int)message.Status); cmd.Parameters.AddWithValue("@lastError", (object?)message.LastError ?? DBNull.Value); await cmd.ExecuteNonQueryAsync(); } /// /// WP-10: Updates a message after a delivery attempt, but only if the row is still /// in the expected status. Returns true if the row was updated, false if it had /// already been changed (e.g. an operator retried or discarded the message) and so /// was skipped. /// /// StoreAndForward-005: the retry sweep uses this for its state-changing writes so /// it cannot clobber a concurrent operator action (RetryParkedMessageAsync / /// DiscardParkedMessageAsync). Those operator operations are themselves SQL- /// conditional on status = Parked; making the sweep's writes conditional on /// the status the sweep observed closes the sweep-vs-management race rather than /// relying only on the in-process overlapping-sweep guard. /// public async Task UpdateMessageIfStatusAsync( StoreAndForwardMessage message, StoreAndForwardMessageStatus expectedStatus) { await using var connection = new SqliteConnection(_connectionString); await connection.OpenAsync(); await using var cmd = connection.CreateCommand(); cmd.CommandText = @" UPDATE sf_messages SET retry_count = @retryCount, last_attempt_at = @lastAttempt, status = @status, last_error = @lastError WHERE id = @id AND status = @expectedStatus"; cmd.Parameters.AddWithValue("@id", message.Id); cmd.Parameters.AddWithValue("@retryCount", message.RetryCount); cmd.Parameters.AddWithValue("@lastAttempt", message.LastAttemptAt.HasValue ? message.LastAttemptAt.Value.ToString("O") : DBNull.Value); cmd.Parameters.AddWithValue("@status", (int)message.Status); cmd.Parameters.AddWithValue("@lastError", (object?)message.LastError ?? DBNull.Value); cmd.Parameters.AddWithValue("@expectedStatus", (int)expectedStatus); var rows = await cmd.ExecuteNonQueryAsync(); return rows > 0; } /// /// WP-10: Removes a successfully delivered message. /// public async Task RemoveMessageAsync(string messageId) { await using var connection = new SqliteConnection(_connectionString); await connection.OpenAsync(); await using var cmd = connection.CreateCommand(); cmd.CommandText = "DELETE FROM sf_messages WHERE id = @id"; cmd.Parameters.AddWithValue("@id", messageId); await cmd.ExecuteNonQueryAsync(); } /// /// WP-12: Gets all parked messages, optionally filtered by category, with pagination. /// /// StoreAndForward-006: the COUNT(*) and the paged SELECT run inside a single /// transaction so they observe one consistent snapshot. Without it, a concurrent /// enqueue/park/discard arriving between the two statements yields a TotalCount /// inconsistent with the returned page (flickering totals / off-by-one page math /// in the paginated UI). /// public async Task<(List Messages, int TotalCount)> GetParkedMessagesAsync( StoreAndForwardCategory? category = null, int pageNumber = 1, int pageSize = 50) { await using var connection = new SqliteConnection(_connectionString); await connection.OpenAsync(); await using var transaction = (SqliteTransaction)await connection.BeginTransactionAsync(); // Count await using var countCmd = connection.CreateCommand(); countCmd.Transaction = transaction; countCmd.CommandText = category.HasValue ? "SELECT COUNT(*) FROM sf_messages WHERE status = @parked AND category = @category" : "SELECT COUNT(*) FROM sf_messages WHERE status = @parked"; countCmd.Parameters.AddWithValue("@parked", (int)StoreAndForwardMessageStatus.Parked); if (category.HasValue) countCmd.Parameters.AddWithValue("@category", (int)category.Value); var totalCount = Convert.ToInt32(await countCmd.ExecuteScalarAsync()); // Page await using var pageCmd = connection.CreateCommand(); pageCmd.Transaction = transaction; var categoryFilter = category.HasValue ? " AND category = @category" : ""; pageCmd.CommandText = $@" SELECT id, category, target, payload_json, retry_count, max_retries, retry_interval_ms, created_at, last_attempt_at, status, last_error, origin_instance, execution_id, source_script FROM sf_messages WHERE status = @parked{categoryFilter} ORDER BY created_at ASC LIMIT @limit OFFSET @offset"; pageCmd.Parameters.AddWithValue("@parked", (int)StoreAndForwardMessageStatus.Parked); if (category.HasValue) pageCmd.Parameters.AddWithValue("@category", (int)category.Value); pageCmd.Parameters.AddWithValue("@limit", pageSize); pageCmd.Parameters.AddWithValue("@offset", (pageNumber - 1) * pageSize); var messages = await ReadMessagesAsync(pageCmd); await transaction.CommitAsync(); return (messages, totalCount); } /// /// WP-12: Moves a parked message back to pending for retry. /// /// StoreAndForward-010: last_attempt_at is reset to NULL so the re-queued /// message is unambiguously due on the next retry sweep. An operator-initiated /// retry means "attempt this again now"; leaving the stale parked timestamp in /// place would make the message's retry timing depend on the configured retry /// interval relative to the original (pre-park) attempt — "try immediately" only /// by accident, and a long interval would instead delay the operator's retry. /// public async Task RetryParkedMessageAsync(string messageId) { await using var connection = new SqliteConnection(_connectionString); await connection.OpenAsync(); await using var cmd = connection.CreateCommand(); cmd.CommandText = @" UPDATE sf_messages SET status = @pending, retry_count = 0, last_error = NULL, last_attempt_at = NULL WHERE id = @id AND status = @parked"; cmd.Parameters.AddWithValue("@id", messageId); cmd.Parameters.AddWithValue("@pending", (int)StoreAndForwardMessageStatus.Pending); cmd.Parameters.AddWithValue("@parked", (int)StoreAndForwardMessageStatus.Parked); var rows = await cmd.ExecuteNonQueryAsync(); return rows > 0; } /// /// WP-12: Permanently discards a parked message. /// public async Task DiscardParkedMessageAsync(string messageId) { await using var connection = new SqliteConnection(_connectionString); await connection.OpenAsync(); await using var cmd = connection.CreateCommand(); cmd.CommandText = "DELETE FROM sf_messages WHERE id = @id AND status = @parked"; cmd.Parameters.AddWithValue("@id", messageId); cmd.Parameters.AddWithValue("@parked", (int)StoreAndForwardMessageStatus.Parked); var rows = await cmd.ExecuteNonQueryAsync(); return rows > 0; } /// /// WP-14: Gets buffer depth by category (count of pending messages per category). /// public async Task> GetBufferDepthByCategoryAsync() { await using var connection = new SqliteConnection(_connectionString); await connection.OpenAsync(); await using var cmd = connection.CreateCommand(); cmd.CommandText = @" SELECT category, COUNT(*) as cnt FROM sf_messages WHERE status = @pending GROUP BY category"; cmd.Parameters.AddWithValue("@pending", (int)StoreAndForwardMessageStatus.Pending); var result = new Dictionary(); await using var reader = await cmd.ExecuteReaderAsync(); while (await reader.ReadAsync()) { var category = (StoreAndForwardCategory)reader.GetInt32(0); var count = reader.GetInt32(1); result[category] = count; } return result; } /// /// WP-13: Verifies messages are NOT deleted when an instance is deleted. /// Returns the count of messages for a given origin instance. /// public async Task GetMessageCountByOriginInstanceAsync(string instanceName) { await using var connection = new SqliteConnection(_connectionString); await connection.OpenAsync(); await using var cmd = connection.CreateCommand(); cmd.CommandText = @" SELECT COUNT(*) FROM sf_messages WHERE origin_instance = @origin"; cmd.Parameters.AddWithValue("@origin", instanceName); return Convert.ToInt32(await cmd.ExecuteScalarAsync()); } /// /// Gets a message by ID. /// public async Task GetMessageByIdAsync(string messageId) { await using var connection = new SqliteConnection(_connectionString); await connection.OpenAsync(); await using var cmd = connection.CreateCommand(); cmd.CommandText = @" SELECT id, category, target, payload_json, retry_count, max_retries, retry_interval_ms, created_at, last_attempt_at, status, last_error, origin_instance, execution_id, source_script FROM sf_messages WHERE id = @id"; cmd.Parameters.AddWithValue("@id", messageId); var messages = await ReadMessagesAsync(cmd); return messages.FirstOrDefault(); } /// /// Gets the count of parked messages (for health reporting). /// public async Task GetParkedMessageCountAsync() { await using var conn = new SqliteConnection(_connectionString); await conn.OpenAsync(); await using var cmd = conn.CreateCommand(); cmd.CommandText = "SELECT COUNT(*) FROM sf_messages WHERE status = @parked"; cmd.Parameters.AddWithValue("@parked", (int)StoreAndForwardMessageStatus.Parked); var result = await cmd.ExecuteScalarAsync(); return Convert.ToInt32(result); } /// /// Gets total message count by status. /// public async Task GetMessageCountByStatusAsync(StoreAndForwardMessageStatus status) { await using var connection = new SqliteConnection(_connectionString); await connection.OpenAsync(); await using var cmd = connection.CreateCommand(); cmd.CommandText = "SELECT COUNT(*) FROM sf_messages WHERE status = @status"; cmd.Parameters.AddWithValue("@status", (int)status); return Convert.ToInt32(await cmd.ExecuteScalarAsync()); } private static async Task> ReadMessagesAsync(SqliteCommand cmd) { var results = new List(); await using var reader = await cmd.ExecuteReaderAsync(); while (await reader.ReadAsync()) { results.Add(new StoreAndForwardMessage { Id = reader.GetString(0), Category = (StoreAndForwardCategory)reader.GetInt32(1), Target = reader.GetString(2), PayloadJson = reader.GetString(3), RetryCount = reader.GetInt32(4), MaxRetries = reader.GetInt32(5), RetryIntervalMs = reader.GetInt64(6), CreatedAt = DateTimeOffset.Parse(reader.GetString(7)), LastAttemptAt = reader.IsDBNull(8) ? null : DateTimeOffset.Parse(reader.GetString(8)), Status = (StoreAndForwardMessageStatus)reader.GetInt32(9), LastError = reader.IsDBNull(10) ? null : reader.GetString(10), OriginInstanceName = reader.IsDBNull(11) ? null : reader.GetString(11), // Audit Log #23 (ExecutionId Task 4): rows persisted before the // additive migration have no execution_id / source_script value; // IsDBNull guards keep those reading back as null (back-compat). // Guid.TryParse (not Parse) guards the retry sweep: a corrupt // non-null execution_id is treated as "no execution id" rather // than throwing FormatException and aborting the whole sweep. ExecutionId = ParseExecutionId(reader, 12), SourceScript = reader.IsDBNull(13) ? null : reader.GetString(13) }); } return results; } /// /// Audit Log #23 (ExecutionId Task 4): defensively reads the /// execution_id column. A null value (legacy pre-migration /// rows) and a malformed non-null value both yield null — a corrupt /// id must not throw and abort the retry sweep, which reads many rows. /// private static Guid? ParseExecutionId(System.Data.Common.DbDataReader reader, int ordinal) { if (reader.IsDBNull(ordinal)) { return null; } return Guid.TryParse(reader.GetString(ordinal), out var executionId) ? executionId : null; } }