- AkkaHostedService: SetNodeHostname from NodeOptions - DataConnectionActor: UpdateConnectionEndpoint on state transitions, track per-tag quality counts and UpdateTagQuality on value changes - HealthReportSender: query StoreAndForwardStorage for parked message count - StoreAndForwardStorage: add GetParkedMessageCountAsync()
354 lines
15 KiB
C#
354 lines
15 KiB
C#
using Microsoft.Data.Sqlite;
|
|
using Microsoft.Extensions.Logging;
|
|
using ScadaLink.Commons.Types.Enums;
|
|
|
|
namespace ScadaLink.StoreAndForward;
|
|
|
|
/// <summary>
|
|
/// WP-9: SQLite persistence layer for store-and-forward messages.
|
|
/// Uses direct Microsoft.Data.Sqlite (not EF Core) for lightweight site-side storage.
|
|
/// No max buffer size per design decision.
|
|
/// </summary>
|
|
public class StoreAndForwardStorage
|
|
{
|
|
private readonly string _connectionString;
|
|
private readonly ILogger<StoreAndForwardStorage> _logger;
|
|
|
|
public StoreAndForwardStorage(string connectionString, ILogger<StoreAndForwardStorage> logger)
|
|
{
|
|
_connectionString = connectionString;
|
|
_logger = logger;
|
|
}
|
|
|
|
/// <summary>
|
|
/// Creates the sf_messages table if it does not exist.
|
|
/// </summary>
|
|
public async Task InitializeAsync()
|
|
{
|
|
await using var connection = new SqliteConnection(_connectionString);
|
|
await connection.OpenAsync();
|
|
|
|
await using var command = connection.CreateCommand();
|
|
command.CommandText = @"
|
|
CREATE TABLE IF NOT EXISTS sf_messages (
|
|
id TEXT PRIMARY KEY,
|
|
category INTEGER NOT NULL,
|
|
target TEXT NOT NULL,
|
|
payload_json TEXT NOT NULL,
|
|
retry_count INTEGER NOT NULL DEFAULT 0,
|
|
max_retries INTEGER NOT NULL DEFAULT 50,
|
|
retry_interval_ms INTEGER NOT NULL DEFAULT 30000,
|
|
created_at TEXT NOT NULL,
|
|
last_attempt_at TEXT,
|
|
status INTEGER NOT NULL DEFAULT 0,
|
|
last_error TEXT,
|
|
origin_instance TEXT
|
|
);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_sf_messages_status ON sf_messages(status);
|
|
CREATE INDEX IF NOT EXISTS idx_sf_messages_category ON sf_messages(category);
|
|
";
|
|
await command.ExecuteNonQueryAsync();
|
|
|
|
_logger.LogInformation("Store-and-forward SQLite storage initialized");
|
|
}
|
|
|
|
/// <summary>
|
|
/// WP-9: Enqueues a new message with Pending status.
|
|
/// </summary>
|
|
public async Task EnqueueAsync(StoreAndForwardMessage message)
|
|
{
|
|
await using var connection = new SqliteConnection(_connectionString);
|
|
await connection.OpenAsync();
|
|
|
|
await using var cmd = connection.CreateCommand();
|
|
cmd.CommandText = @"
|
|
INSERT INTO sf_messages (id, category, target, payload_json, retry_count, max_retries,
|
|
retry_interval_ms, created_at, last_attempt_at, status, last_error, origin_instance)
|
|
VALUES (@id, @category, @target, @payload, @retryCount, @maxRetries,
|
|
@retryIntervalMs, @createdAt, @lastAttempt, @status, @lastError, @origin)";
|
|
|
|
cmd.Parameters.AddWithValue("@id", message.Id);
|
|
cmd.Parameters.AddWithValue("@category", (int)message.Category);
|
|
cmd.Parameters.AddWithValue("@target", message.Target);
|
|
cmd.Parameters.AddWithValue("@payload", message.PayloadJson);
|
|
cmd.Parameters.AddWithValue("@retryCount", message.RetryCount);
|
|
cmd.Parameters.AddWithValue("@maxRetries", message.MaxRetries);
|
|
cmd.Parameters.AddWithValue("@retryIntervalMs", message.RetryIntervalMs);
|
|
cmd.Parameters.AddWithValue("@createdAt", message.CreatedAt.ToString("O"));
|
|
cmd.Parameters.AddWithValue("@lastAttempt", message.LastAttemptAt.HasValue
|
|
? message.LastAttemptAt.Value.ToString("O") : DBNull.Value);
|
|
cmd.Parameters.AddWithValue("@status", (int)message.Status);
|
|
cmd.Parameters.AddWithValue("@lastError", (object?)message.LastError ?? DBNull.Value);
|
|
cmd.Parameters.AddWithValue("@origin", (object?)message.OriginInstanceName ?? DBNull.Value);
|
|
|
|
await cmd.ExecuteNonQueryAsync();
|
|
}
|
|
|
|
/// <summary>
|
|
/// WP-10: Gets all messages that are due for retry (Pending status, last attempt older than retry interval).
|
|
/// </summary>
|
|
public async Task<List<StoreAndForwardMessage>> GetMessagesForRetryAsync()
|
|
{
|
|
await using var connection = new SqliteConnection(_connectionString);
|
|
await connection.OpenAsync();
|
|
|
|
await using var cmd = connection.CreateCommand();
|
|
cmd.CommandText = @"
|
|
SELECT id, category, target, payload_json, retry_count, max_retries,
|
|
retry_interval_ms, created_at, last_attempt_at, status, last_error, origin_instance
|
|
FROM sf_messages
|
|
WHERE status = @pending
|
|
AND (last_attempt_at IS NULL
|
|
OR retry_interval_ms = 0
|
|
OR (julianday('now') - julianday(last_attempt_at)) * 86400000 >= retry_interval_ms)
|
|
ORDER BY created_at ASC";
|
|
|
|
cmd.Parameters.AddWithValue("@pending", (int)StoreAndForwardMessageStatus.Pending);
|
|
|
|
return await ReadMessagesAsync(cmd);
|
|
}
|
|
|
|
/// <summary>
|
|
/// WP-10: Updates a message after a delivery attempt.
|
|
/// </summary>
|
|
public async Task UpdateMessageAsync(StoreAndForwardMessage message)
|
|
{
|
|
await using var connection = new SqliteConnection(_connectionString);
|
|
await connection.OpenAsync();
|
|
|
|
await using var cmd = connection.CreateCommand();
|
|
cmd.CommandText = @"
|
|
UPDATE sf_messages
|
|
SET retry_count = @retryCount,
|
|
last_attempt_at = @lastAttempt,
|
|
status = @status,
|
|
last_error = @lastError
|
|
WHERE id = @id";
|
|
|
|
cmd.Parameters.AddWithValue("@id", message.Id);
|
|
cmd.Parameters.AddWithValue("@retryCount", message.RetryCount);
|
|
cmd.Parameters.AddWithValue("@lastAttempt", message.LastAttemptAt.HasValue
|
|
? message.LastAttemptAt.Value.ToString("O") : DBNull.Value);
|
|
cmd.Parameters.AddWithValue("@status", (int)message.Status);
|
|
cmd.Parameters.AddWithValue("@lastError", (object?)message.LastError ?? DBNull.Value);
|
|
|
|
await cmd.ExecuteNonQueryAsync();
|
|
}
|
|
|
|
/// <summary>
|
|
/// WP-10: Removes a successfully delivered message.
|
|
/// </summary>
|
|
public async Task RemoveMessageAsync(string messageId)
|
|
{
|
|
await using var connection = new SqliteConnection(_connectionString);
|
|
await connection.OpenAsync();
|
|
|
|
await using var cmd = connection.CreateCommand();
|
|
cmd.CommandText = "DELETE FROM sf_messages WHERE id = @id";
|
|
cmd.Parameters.AddWithValue("@id", messageId);
|
|
|
|
await cmd.ExecuteNonQueryAsync();
|
|
}
|
|
|
|
/// <summary>
|
|
/// WP-12: Gets all parked messages, optionally filtered by category, with pagination.
|
|
/// </summary>
|
|
public async Task<(List<StoreAndForwardMessage> Messages, int TotalCount)> GetParkedMessagesAsync(
|
|
StoreAndForwardCategory? category = null,
|
|
int pageNumber = 1,
|
|
int pageSize = 50)
|
|
{
|
|
await using var connection = new SqliteConnection(_connectionString);
|
|
await connection.OpenAsync();
|
|
|
|
// Count
|
|
await using var countCmd = connection.CreateCommand();
|
|
countCmd.CommandText = category.HasValue
|
|
? "SELECT COUNT(*) FROM sf_messages WHERE status = @parked AND category = @category"
|
|
: "SELECT COUNT(*) FROM sf_messages WHERE status = @parked";
|
|
countCmd.Parameters.AddWithValue("@parked", (int)StoreAndForwardMessageStatus.Parked);
|
|
if (category.HasValue) countCmd.Parameters.AddWithValue("@category", (int)category.Value);
|
|
var totalCount = Convert.ToInt32(await countCmd.ExecuteScalarAsync());
|
|
|
|
// Page
|
|
await using var pageCmd = connection.CreateCommand();
|
|
var categoryFilter = category.HasValue ? " AND category = @category" : "";
|
|
pageCmd.CommandText = $@"
|
|
SELECT id, category, target, payload_json, retry_count, max_retries,
|
|
retry_interval_ms, created_at, last_attempt_at, status, last_error, origin_instance
|
|
FROM sf_messages
|
|
WHERE status = @parked{categoryFilter}
|
|
ORDER BY created_at ASC
|
|
LIMIT @limit OFFSET @offset";
|
|
|
|
pageCmd.Parameters.AddWithValue("@parked", (int)StoreAndForwardMessageStatus.Parked);
|
|
if (category.HasValue) pageCmd.Parameters.AddWithValue("@category", (int)category.Value);
|
|
pageCmd.Parameters.AddWithValue("@limit", pageSize);
|
|
pageCmd.Parameters.AddWithValue("@offset", (pageNumber - 1) * pageSize);
|
|
|
|
var messages = await ReadMessagesAsync(pageCmd);
|
|
return (messages, totalCount);
|
|
}
|
|
|
|
/// <summary>
|
|
/// WP-12: Moves a parked message back to pending for retry.
|
|
/// </summary>
|
|
public async Task<bool> RetryParkedMessageAsync(string messageId)
|
|
{
|
|
await using var connection = new SqliteConnection(_connectionString);
|
|
await connection.OpenAsync();
|
|
|
|
await using var cmd = connection.CreateCommand();
|
|
cmd.CommandText = @"
|
|
UPDATE sf_messages
|
|
SET status = @pending, retry_count = 0, last_error = NULL
|
|
WHERE id = @id AND status = @parked";
|
|
|
|
cmd.Parameters.AddWithValue("@id", messageId);
|
|
cmd.Parameters.AddWithValue("@pending", (int)StoreAndForwardMessageStatus.Pending);
|
|
cmd.Parameters.AddWithValue("@parked", (int)StoreAndForwardMessageStatus.Parked);
|
|
|
|
var rows = await cmd.ExecuteNonQueryAsync();
|
|
return rows > 0;
|
|
}
|
|
|
|
/// <summary>
|
|
/// WP-12: Permanently discards a parked message.
|
|
/// </summary>
|
|
public async Task<bool> DiscardParkedMessageAsync(string messageId)
|
|
{
|
|
await using var connection = new SqliteConnection(_connectionString);
|
|
await connection.OpenAsync();
|
|
|
|
await using var cmd = connection.CreateCommand();
|
|
cmd.CommandText = "DELETE FROM sf_messages WHERE id = @id AND status = @parked";
|
|
cmd.Parameters.AddWithValue("@id", messageId);
|
|
cmd.Parameters.AddWithValue("@parked", (int)StoreAndForwardMessageStatus.Parked);
|
|
|
|
var rows = await cmd.ExecuteNonQueryAsync();
|
|
return rows > 0;
|
|
}
|
|
|
|
/// <summary>
|
|
/// WP-14: Gets buffer depth by category (count of pending messages per category).
|
|
/// </summary>
|
|
public async Task<Dictionary<StoreAndForwardCategory, int>> GetBufferDepthByCategoryAsync()
|
|
{
|
|
await using var connection = new SqliteConnection(_connectionString);
|
|
await connection.OpenAsync();
|
|
|
|
await using var cmd = connection.CreateCommand();
|
|
cmd.CommandText = @"
|
|
SELECT category, COUNT(*) as cnt
|
|
FROM sf_messages
|
|
WHERE status = @pending
|
|
GROUP BY category";
|
|
cmd.Parameters.AddWithValue("@pending", (int)StoreAndForwardMessageStatus.Pending);
|
|
|
|
var result = new Dictionary<StoreAndForwardCategory, int>();
|
|
await using var reader = await cmd.ExecuteReaderAsync();
|
|
while (await reader.ReadAsync())
|
|
{
|
|
var category = (StoreAndForwardCategory)reader.GetInt32(0);
|
|
var count = reader.GetInt32(1);
|
|
result[category] = count;
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
/// <summary>
|
|
/// WP-13: Verifies messages are NOT deleted when an instance is deleted.
|
|
/// Returns the count of messages for a given origin instance.
|
|
/// </summary>
|
|
public async Task<int> GetMessageCountByOriginInstanceAsync(string instanceName)
|
|
{
|
|
await using var connection = new SqliteConnection(_connectionString);
|
|
await connection.OpenAsync();
|
|
|
|
await using var cmd = connection.CreateCommand();
|
|
cmd.CommandText = @"
|
|
SELECT COUNT(*)
|
|
FROM sf_messages
|
|
WHERE origin_instance = @origin";
|
|
cmd.Parameters.AddWithValue("@origin", instanceName);
|
|
|
|
return Convert.ToInt32(await cmd.ExecuteScalarAsync());
|
|
}
|
|
|
|
/// <summary>
|
|
/// Gets a message by ID.
|
|
/// </summary>
|
|
public async Task<StoreAndForwardMessage?> GetMessageByIdAsync(string messageId)
|
|
{
|
|
await using var connection = new SqliteConnection(_connectionString);
|
|
await connection.OpenAsync();
|
|
|
|
await using var cmd = connection.CreateCommand();
|
|
cmd.CommandText = @"
|
|
SELECT id, category, target, payload_json, retry_count, max_retries,
|
|
retry_interval_ms, created_at, last_attempt_at, status, last_error, origin_instance
|
|
FROM sf_messages
|
|
WHERE id = @id";
|
|
cmd.Parameters.AddWithValue("@id", messageId);
|
|
|
|
var messages = await ReadMessagesAsync(cmd);
|
|
return messages.FirstOrDefault();
|
|
}
|
|
|
|
/// <summary>
|
|
/// Gets the count of parked messages (for health reporting).
|
|
/// </summary>
|
|
public async Task<int> GetParkedMessageCountAsync()
|
|
{
|
|
await using var conn = new SqliteConnection(_connectionString);
|
|
await conn.OpenAsync();
|
|
await using var cmd = conn.CreateCommand();
|
|
cmd.CommandText = "SELECT COUNT(*) FROM sf_messages WHERE status = @parked";
|
|
cmd.Parameters.AddWithValue("@parked", (int)StoreAndForwardMessageStatus.Parked);
|
|
var result = await cmd.ExecuteScalarAsync();
|
|
return Convert.ToInt32(result);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Gets total message count by status.
|
|
/// </summary>
|
|
public async Task<int> GetMessageCountByStatusAsync(StoreAndForwardMessageStatus status)
|
|
{
|
|
await using var connection = new SqliteConnection(_connectionString);
|
|
await connection.OpenAsync();
|
|
|
|
await using var cmd = connection.CreateCommand();
|
|
cmd.CommandText = "SELECT COUNT(*) FROM sf_messages WHERE status = @status";
|
|
cmd.Parameters.AddWithValue("@status", (int)status);
|
|
|
|
return Convert.ToInt32(await cmd.ExecuteScalarAsync());
|
|
}
|
|
|
|
private static async Task<List<StoreAndForwardMessage>> ReadMessagesAsync(SqliteCommand cmd)
|
|
{
|
|
var results = new List<StoreAndForwardMessage>();
|
|
await using var reader = await cmd.ExecuteReaderAsync();
|
|
while (await reader.ReadAsync())
|
|
{
|
|
results.Add(new StoreAndForwardMessage
|
|
{
|
|
Id = reader.GetString(0),
|
|
Category = (StoreAndForwardCategory)reader.GetInt32(1),
|
|
Target = reader.GetString(2),
|
|
PayloadJson = reader.GetString(3),
|
|
RetryCount = reader.GetInt32(4),
|
|
MaxRetries = reader.GetInt32(5),
|
|
RetryIntervalMs = reader.GetInt64(6),
|
|
CreatedAt = DateTimeOffset.Parse(reader.GetString(7)),
|
|
LastAttemptAt = reader.IsDBNull(8) ? null : DateTimeOffset.Parse(reader.GetString(8)),
|
|
Status = (StoreAndForwardMessageStatus)reader.GetInt32(9),
|
|
LastError = reader.IsDBNull(10) ? null : reader.GetString(10),
|
|
OriginInstanceName = reader.IsDBNull(11) ? null : reader.GetString(11)
|
|
});
|
|
}
|
|
return results;
|
|
}
|
|
}
|