using System.Text.Json;
using Microsoft.Data.Sqlite;
using Serilog;

namespace ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;

/// <summary>
/// Phase 7 plan decisions #16–#17 implementation: durable SQLite queue on the node
/// absorbs every qualifying alarm event, a drain worker batches rows to Galaxy.Host
/// via <see cref="IAlarmHistorianWriter"/>, and operator acks never block on the
/// historian being reachable.
/// </summary>
/// <remarks>
/// <para>
/// Queue schema:
/// <code>
/// CREATE TABLE Queue (
///     RowId           INTEGER PRIMARY KEY AUTOINCREMENT,
///     AlarmId         TEXT NOT NULL,
///     EnqueuedUtc     TEXT NOT NULL,
///     PayloadJson     TEXT NOT NULL,
///     AttemptCount    INTEGER NOT NULL DEFAULT 0,
///     LastAttemptUtc  TEXT NULL,
///     LastError       TEXT NULL,
///     DeadLettered    INTEGER NOT NULL DEFAULT 0
/// );
/// </code>
/// Dead-lettered rows stay in place for the configured retention window (default
/// 30 days per Phase 7 plan decision #21) so operators can inspect + manually
/// retry before the sweeper purges them. Regular queue capacity is bounded —
/// overflow evicts the oldest non-dead-lettered rows with a WARN log.
/// </para>
/// <para>
/// Drain runs on a <see cref="Timer"/> started by <see cref="StartDrainLoop"/>.
/// A <see cref="HistorianWriteOutcome.RetryPlease"/> outcome (or a writer exception)
/// advances the backoff ladder 1s → 2s → 5s → 15s → 60s cap, exposed through
/// <see cref="CurrentBackoff"/>. <see cref="HistorianWriteOutcome.PermanentFail"/>
/// rows flip the <c>DeadLettered</c> flag on the individual row; neighbors in the
/// batch are handled according to their own outcome.
/// </para>
/// <para>
/// NOTE(review): nothing in this class delays a timer tick by
/// <see cref="CurrentBackoff"/> — the timer fires at the fixed interval given to
/// <see cref="StartDrainLoop"/>. Presumably the host is expected to honor the
/// backoff value; confirm against the caller.
/// </para>
/// </remarks>
public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
{
    /// <summary>Default queue capacity — oldest non-dead-lettered rows evicted past this.</summary>
    public const long DefaultCapacity = 1_000_000;

    /// <summary>Default time a dead-lettered row is kept before the purge removes it.</summary>
    public static readonly TimeSpan DefaultDeadLetterRetention = TimeSpan.FromDays(30);

    // Backoff steps; BumpBackoff walks forward and clamps at the last entry.
    private static readonly TimeSpan[] BackoffLadder =
    [
        TimeSpan.FromSeconds(1),
        TimeSpan.FromSeconds(2),
        TimeSpan.FromSeconds(5),
        TimeSpan.FromSeconds(15),
        TimeSpan.FromSeconds(60),
    ];

    private readonly string _connectionString;
    private readonly IAlarmHistorianWriter _writer;
    private readonly ILogger _logger;
    private readonly int _batchSize;
    private readonly long _capacity;
    private readonly TimeSpan _deadLetterRetention;
    private readonly Func<DateTime> _clock;

    // Single-permit gate: at most one drain pass runs at a time.
    private readonly SemaphoreSlim _drainGate = new(1, 1);

    private Timer? _drainTimer;
    private int _backoffIndex;
    private DateTime? _lastDrainUtc;
    private DateTime? _lastSuccessUtc;
    private string? _lastError;
    private HistorianDrainState _drainState = HistorianDrainState.Idle;
    private bool _disposed;

    /// <summary>
    /// Creates the sink and ensures the queue table and drain index exist.
    /// </summary>
    /// <param name="databasePath">SQLite database file path; used as the connection's <c>Data Source</c>.</param>
    /// <param name="writer">Destination that receives drained batches.</param>
    /// <param name="logger">Serilog logger for eviction, purge and failure messages.</param>
    /// <param name="batchSize">Maximum rows read per drain pass; must be positive.</param>
    /// <param name="capacity">Maximum non-dead-lettered rows kept; must be positive.</param>
    /// <param name="deadLetterRetention">Retention for dead-lettered rows; defaults to <see cref="DefaultDeadLetterRetention"/>.</param>
    /// <param name="clock">Time source; defaults to <see cref="DateTime.UtcNow"/>.</param>
    /// <exception cref="ArgumentException"><paramref name="databasePath"/> is null or whitespace.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="writer"/> or <paramref name="logger"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="batchSize"/> or <paramref name="capacity"/> is not positive.</exception>
    public SqliteStoreAndForwardSink(
        string databasePath,
        IAlarmHistorianWriter writer,
        ILogger logger,
        int batchSize = 100,
        long capacity = DefaultCapacity,
        TimeSpan? deadLetterRetention = null,
        Func<DateTime>? clock = null)
    {
        if (string.IsNullOrWhiteSpace(databasePath))
            throw new ArgumentException("Database path required.", nameof(databasePath));
        _writer = writer ?? throw new ArgumentNullException(nameof(writer));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _batchSize = batchSize > 0 ? batchSize : throw new ArgumentOutOfRangeException(nameof(batchSize));
        _capacity = capacity > 0 ? capacity : throw new ArgumentOutOfRangeException(nameof(capacity));
        _deadLetterRetention = deadLetterRetention ?? DefaultDeadLetterRetention;
        _clock = clock ?? (() => DateTime.UtcNow);
        _connectionString = $"Data Source={databasePath}";

        InitializeSchema();
    }

    /// <summary>
    /// Start the background drain worker. Not started automatically so tests can
    /// drive <see cref="DrainOnceAsync"/> deterministically. Calling it again
    /// replaces the previous timer.
    /// </summary>
    /// <param name="tickInterval">Both the initial delay and the period of the timer.</param>
    /// <exception cref="ObjectDisposedException">The sink has been disposed.</exception>
    public void StartDrainLoop(TimeSpan tickInterval)
    {
        if (_disposed) throw new ObjectDisposedException(nameof(SqliteStoreAndForwardSink));
        _drainTimer?.Dispose();
        _drainTimer = new Timer(_ => _ = DrainFromTimerAsync(), null, tickInterval, tickInterval);
    }

    /// <summary>
    /// Timer entry point. The timer discards the returned task, so every failure
    /// must be observed here or it would vanish as an unobserved task exception.
    /// </summary>
    private async Task DrainFromTimerAsync()
    {
        try
        {
            await DrainOnceAsync(CancellationToken.None).ConfigureAwait(false);
        }
        catch (ObjectDisposedException)
        {
            // Dispose() raced with this tick; nothing left to drain into.
        }
        catch (Exception ex)
        {
            _lastError = ex.Message;
            _logger.Error(ex, "Historian drain tick failed");
        }
    }

    /// <summary>
    /// Inserts the event into the queue, first evicting the oldest
    /// non-dead-lettered rows if the queue is at capacity. The work is done
    /// synchronously; the returned task is already completed.
    /// </summary>
    /// <param name="evt">Event to persist; serialized to JSON for the <c>PayloadJson</c> column.</param>
    /// <param name="cancellationToken">Not consulted by this implementation.</param>
    /// <exception cref="ArgumentNullException"><paramref name="evt"/> is null.</exception>
    /// <exception cref="ObjectDisposedException">The sink has been disposed.</exception>
    public Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken)
    {
        if (evt is null) throw new ArgumentNullException(nameof(evt));
        if (_disposed) throw new ObjectDisposedException(nameof(SqliteStoreAndForwardSink));

        using var conn = new SqliteConnection(_connectionString);
        conn.Open();

        EnforceCapacity(conn);

        using var cmd = conn.CreateCommand();
        cmd.CommandText = """
            INSERT INTO Queue (AlarmId, EnqueuedUtc, PayloadJson, AttemptCount)
            VALUES ($alarmId, $enqueued, $payload, 0);
            """;
        cmd.Parameters.AddWithValue("$alarmId", evt.AlarmId);
        cmd.Parameters.AddWithValue("$enqueued", _clock().ToString("O"));
        cmd.Parameters.AddWithValue("$payload", JsonSerializer.Serialize(evt));
        cmd.ExecuteNonQuery();
        return Task.CompletedTask;
    }

    /// <summary>
    /// Read up to <c>batchSize</c> queued rows, forward through the writer,
    /// remove Ack'd rows, dead-letter PermanentFail rows, and extend the backoff
    /// on RetryPlease. Safe to call from multiple threads; the semaphore enforces
    /// serial execution — a call that finds a drain already running returns
    /// immediately without doing anything.
    /// </summary>
    /// <param name="ct">Cancels the gate wait and is passed to the writer.</param>
    /// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled.</exception>
    /// <exception cref="InvalidOperationException">The writer returned a different number of outcomes than events.</exception>
    public async Task DrainOnceAsync(CancellationToken ct)
    {
        if (_disposed) return;
        if (!await _drainGate.WaitAsync(0, ct).ConfigureAwait(false)) return;
        try
        {
            _drainState = HistorianDrainState.Draining;
            _lastDrainUtc = _clock();

            PurgeAgedDeadLetters();
            var (rowIds, events) = ReadBatch();
            if (rowIds.Count == 0)
            {
                _drainState = HistorianDrainState.Idle;
                return;
            }

            IReadOnlyList<HistorianWriteOutcome> outcomes;
            try
            {
                outcomes = await _writer.WriteBatchAsync(events, ct).ConfigureAwait(false);
                _lastError = null;
            }
            catch (OperationCanceledException)
            {
                throw;
            }
            catch (Exception ex)
            {
                // Writer-side exception — treat entire batch as RetryPlease.
                _lastError = ex.Message;
                _logger.Warning(ex, "Historian writer threw on batch of {Count}; deferring retry", events.Count);
                BumpBackoff();
                _drainState = HistorianDrainState.BackingOff;
                return;
            }

            if (outcomes.Count != events.Count)
                throw new InvalidOperationException(
                    $"Writer returned {outcomes.Count} outcomes for {events.Count} events — expected 1:1");

            using var conn = new SqliteConnection(_connectionString);
            conn.Open();
            using var tx = conn.BeginTransaction();
            for (var i = 0; i < outcomes.Count; i++)
            {
                // rowIds and events are index-aligned by ReadBatch, so outcome i belongs to rowIds[i].
                var outcome = outcomes[i];
                var rowId = rowIds[i];
                switch (outcome)
                {
                    case HistorianWriteOutcome.Ack:
                        DeleteRow(conn, tx, rowId);
                        break;
                    case HistorianWriteOutcome.PermanentFail:
                        DeadLetterRow(conn, tx, rowId, $"permanent fail at {_clock():O}");
                        break;
                    case HistorianWriteOutcome.RetryPlease:
                        BumpAttempt(conn, tx, rowId, "retry-please");
                        break;
                }
            }
            tx.Commit();

            var acks = outcomes.Count(o => o == HistorianWriteOutcome.Ack);
            if (acks > 0) _lastSuccessUtc = _clock();

            if (outcomes.Any(o => o == HistorianWriteOutcome.RetryPlease))
            {
                BumpBackoff();
                _drainState = HistorianDrainState.BackingOff;
            }
            else
            {
                ResetBackoff();
                _drainState = HistorianDrainState.Idle;
            }
        }
        finally
        {
            // Every normal exit sets Idle or BackingOff; still being Draining here means an
            // exception (or cancellation) is propagating, so don't leave the status stuck.
            if (_drainState == HistorianDrainState.Draining) _drainState = HistorianDrainState.Idle;

            try
            {
                _drainGate.Release();
            }
            catch (ObjectDisposedException)
            {
                // Dispose() ran while this pass was in flight; the gate is gone and
                // there is nobody left to hand the permit to.
            }
        }
    }

    /// <summary>
    /// Returns current queue and dead-letter depths (counted from the database)
    /// together with the in-memory drain bookkeeping.
    /// </summary>
    public HistorianSinkStatus GetStatus()
    {
        using var conn = new SqliteConnection(_connectionString);
        conn.Open();

        long queued;
        long deadlettered;
        using (var cmd = conn.CreateCommand())
        {
            cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 0";
            queued = (long)(cmd.ExecuteScalar() ?? 0L);
        }
        using (var cmd = conn.CreateCommand())
        {
            cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 1";
            deadlettered = (long)(cmd.ExecuteScalar() ?? 0L);
        }

        return new HistorianSinkStatus(
            QueueDepth: queued,
            DeadLetterDepth: deadlettered,
            LastDrainUtc: _lastDrainUtc,
            LastSuccessUtc: _lastSuccessUtc,
            LastError: _lastError,
            DrainState: _drainState);
    }

    /// <summary>
    /// Operator action from Admin UI — retry every dead-lettered row.
    /// Non-cascading: they rejoin the regular queue with <c>AttemptCount</c> reset
    /// to 0 and <c>LastError</c> cleared.
    /// </summary>
    /// <returns>Number of rows moved back to the regular queue.</returns>
    public int RetryDeadLettered()
    {
        using var conn = new SqliteConnection(_connectionString);
        conn.Open();
        using var cmd = conn.CreateCommand();
        cmd.CommandText = "UPDATE Queue SET DeadLettered = 0, AttemptCount = 0, LastError = NULL WHERE DeadLettered = 1";
        return cmd.ExecuteNonQuery();
    }

    /// <summary>
    /// Reads the oldest non-dead-lettered rows (at most the configured batch size)
    /// and deserializes their payloads. The two returned lists are index-aligned:
    /// <c>events[i]</c> was read from row <c>rowIds[i]</c>.
    /// </summary>
    /// <remarks>
    /// Rows whose payload throws <see cref="JsonException"/> or deserializes to
    /// null are dead-lettered here and left out of both lists. Otherwise one
    /// unreadable row would either misalign ids and events or fail every drain
    /// pass while sitting at the head of the queue. A batch made up solely of
    /// such rows yields empty lists; later rows are picked up by the next pass.
    /// </remarks>
    private (List<long> rowIds, List<AlarmHistorianEvent> events) ReadBatch()
    {
        var rowIds = new List<long>();
        var events = new List<AlarmHistorianEvent>();
        var unreadable = new List<(long RowId, string Reason)>();

        using var conn = new SqliteConnection(_connectionString);
        conn.Open();

        using (var cmd = conn.CreateCommand())
        {
            cmd.CommandText = """
                SELECT RowId, PayloadJson
                FROM Queue
                WHERE DeadLettered = 0
                ORDER BY RowId ASC
                LIMIT $limit
                """;
            cmd.Parameters.AddWithValue("$limit", _batchSize);

            using var reader = cmd.ExecuteReader();
            while (reader.Read())
            {
                var rowId = reader.GetInt64(0);
                var payload = reader.GetString(1);

                AlarmHistorianEvent? evt = null;
                var failure = "payload deserialized to null";
                try
                {
                    evt = JsonSerializer.Deserialize<AlarmHistorianEvent>(payload);
                }
                catch (JsonException ex)
                {
                    failure = ex.Message;
                }

                if (evt is null)
                {
                    unreadable.Add((rowId, failure));
                    continue;
                }

                // Added together so the lists can never drift apart.
                rowIds.Add(rowId);
                events.Add(evt);
            }
        }

        if (unreadable.Count > 0)
        {
            // The reader is closed by now, so the same connection can take the writes.
            using var tx = conn.BeginTransaction();
            foreach (var (rowId, reason) in unreadable)
                DeadLetterRow(conn, tx, rowId, $"unreadable payload: {reason}");
            tx.Commit();

            _logger.Warning(
                "Dead-lettered {Count} queued row(s) whose payload could not be deserialized",
                unreadable.Count);
        }

        return (rowIds, events);
    }

    /// <summary>Deletes one queue row inside the given transaction.</summary>
    private static void DeleteRow(SqliteConnection conn, SqliteTransaction tx, long rowId)
    {
        using var cmd = conn.CreateCommand();
        cmd.Transaction = tx;
        cmd.CommandText = "DELETE FROM Queue WHERE RowId = $id";
        cmd.Parameters.AddWithValue("$id", rowId);
        cmd.ExecuteNonQuery();
    }

    /// <summary>
    /// Marks one row dead-lettered inside the given transaction, stamping the
    /// attempt time, recording <paramref name="reason"/> as <c>LastError</c> and
    /// incrementing <c>AttemptCount</c>.
    /// </summary>
    private void DeadLetterRow(SqliteConnection conn, SqliteTransaction tx, long rowId, string reason)
    {
        using var cmd = conn.CreateCommand();
        cmd.Transaction = tx;
        cmd.CommandText = """
            UPDATE Queue
            SET DeadLettered = 1,
                LastAttemptUtc = $now,
                LastError = $err,
                AttemptCount = AttemptCount + 1
            WHERE RowId = $id
            """;
        cmd.Parameters.AddWithValue("$now", _clock().ToString("O"));
        cmd.Parameters.AddWithValue("$err", reason);
        cmd.Parameters.AddWithValue("$id", rowId);
        cmd.ExecuteNonQuery();
    }

    /// <summary>
    /// Records a failed-but-retryable attempt on one row inside the given
    /// transaction: stamps the attempt time, stores <paramref name="reason"/> and
    /// increments <c>AttemptCount</c>. The row stays in the regular queue.
    /// </summary>
    private void BumpAttempt(SqliteConnection conn, SqliteTransaction tx, long rowId, string reason)
    {
        using var cmd = conn.CreateCommand();
        cmd.Transaction = tx;
        cmd.CommandText = """
            UPDATE Queue
            SET LastAttemptUtc = $now,
                LastError = $err,
                AttemptCount = AttemptCount + 1
            WHERE RowId = $id
            """;
        cmd.Parameters.AddWithValue("$now", _clock().ToString("O"));
        cmd.Parameters.AddWithValue("$err", reason);
        cmd.Parameters.AddWithValue("$id", rowId);
        cmd.ExecuteNonQuery();
    }

    /// <summary>
    /// Called before an insert: if the regular queue already holds the configured
    /// capacity or more, deletes enough of the oldest rows that one new row fits,
    /// and logs a warning.
    /// </summary>
    private void EnforceCapacity(SqliteConnection conn)
    {
        // Count non-dead-lettered rows only — dead-lettered rows retain for
        // post-mortem per the configured retention window.
        long count;
        using (var cmd = conn.CreateCommand())
        {
            cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 0";
            count = (long)(cmd.ExecuteScalar() ?? 0L);
        }
        if (count < _capacity) return;

        // +1 leaves a free slot for the row the caller is about to insert.
        var toEvict = count - _capacity + 1;
        using (var cmd = conn.CreateCommand())
        {
            cmd.CommandText = """
                DELETE FROM Queue
                WHERE RowId IN (
                    SELECT RowId FROM Queue
                    WHERE DeadLettered = 0
                    ORDER BY RowId ASC
                    LIMIT $n
                )
                """;
            cmd.Parameters.AddWithValue("$n", toEvict);
            cmd.ExecuteNonQuery();
        }
        _logger.Warning(
            "Historian queue at capacity {Cap} — evicted {Count} oldest row(s) to make room",
            _capacity, toEvict);
    }

    /// <summary>
    /// Deletes dead-lettered rows whose <c>LastAttemptUtc</c> is older than the
    /// retention window. The comparison is done on the round-trip ("O") timestamp
    /// strings stored in the table.
    /// </summary>
    private void PurgeAgedDeadLetters()
    {
        var cutoff = (_clock() - _deadLetterRetention).ToString("O");
        using var conn = new SqliteConnection(_connectionString);
        conn.Open();
        using var cmd = conn.CreateCommand();
        cmd.CommandText = """
            DELETE FROM Queue
            WHERE DeadLettered = 1
              AND LastAttemptUtc IS NOT NULL
              AND LastAttemptUtc < $cutoff
            """;
        cmd.Parameters.AddWithValue("$cutoff", cutoff);
        var purged = cmd.ExecuteNonQuery();
        if (purged > 0)
            _logger.Information("Purged {Count} dead-lettered row(s) past retention window", purged);
    }

    /// <summary>Creates the <c>Queue</c> table and its drain index when they do not exist yet.</summary>
    private void InitializeSchema()
    {
        using var conn = new SqliteConnection(_connectionString);
        conn.Open();
        using var cmd = conn.CreateCommand();
        cmd.CommandText = """
            CREATE TABLE IF NOT EXISTS Queue (
                RowId           INTEGER PRIMARY KEY AUTOINCREMENT,
                AlarmId         TEXT NOT NULL,
                EnqueuedUtc     TEXT NOT NULL,
                PayloadJson     TEXT NOT NULL,
                AttemptCount    INTEGER NOT NULL DEFAULT 0,
                LastAttemptUtc  TEXT NULL,
                LastError       TEXT NULL,
                DeadLettered    INTEGER NOT NULL DEFAULT 0
            );
            CREATE INDEX IF NOT EXISTS IX_Queue_Drain ON Queue (DeadLettered, RowId);
            """;
        cmd.ExecuteNonQuery();
    }

    /// <summary>Moves one step up the backoff ladder, clamping at the last entry.</summary>
    private void BumpBackoff() => _backoffIndex = Math.Min(_backoffIndex + 1, BackoffLadder.Length - 1);

    /// <summary>Returns to the first step of the backoff ladder.</summary>
    private void ResetBackoff() => _backoffIndex = 0;

    /// <summary>Backoff ladder entry selected by the most recent drain outcome.</summary>
    public TimeSpan CurrentBackoff => BackoffLadder[_backoffIndex];

    /// <summary>
    /// Stops the drain timer and releases the gate. Idempotent. A drain pass that
    /// is already running is not awaited; it finishes on its own.
    /// </summary>
    public void Dispose()
    {
        if (_disposed) return;
        _disposed = true;
        _drainTimer?.Dispose();
        _drainGate.Dispose();
    }
}