Files
lmxopcua/src/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs
Joseph Doherty 25ad4b1929 Phase 7 Stream D — Historian alarm sink (SQLite store-and-forward + Galaxy.Host IPC contracts)
Phase 7 plan decisions #16, #17, #19, #21 implementation. Durable local SQLite queue
absorbs every qualifying alarm event; drain worker forwards batches to Galaxy.Host
(reusing the already-loaded 32-bit aahClientManaged DLLs) on an exponential-backoff
cadence; operator acks never block on the historian being reachable.

## New project Core.AlarmHistorian (net10)

- AlarmHistorianEvent — source-agnostic event shape (scripted alarms + Galaxy-native +
  AB CIP ALMD + any future IAlarmSource)
- IAlarmHistorianSink / NullAlarmHistorianSink — interface + disabled default
- IAlarmHistorianWriter — per-event outcome (Ack / RetryPlease / PermanentFail); Stream G
  wires the Galaxy.Host IPC client implementation
- SqliteStoreAndForwardSink — full implementation:
  - Queue table with AttemptCount / LastError / DeadLettered columns
  - DrainOnceAsync serialised via SemaphoreSlim
  - BackoffLadder 1s → 2s → 5s → 15s → 60s (cap)
  - DefaultCapacity 1,000,000 rows — overflow evicts oldest non-dead-lettered
  - DefaultDeadLetterRetention 30 days — sweeper purges on every drain tick
  - RetryDeadLettered operator action reattaches dead-letters to the regular queue
  - Writer-side exceptions treated as whole-batch RetryPlease (no data loss)

## New IPC contracts in Driver.Galaxy.Shared

- HistorianAlarmEventRequest — batched up to 100 events/request per plan Stream D.5
- HistorianAlarmEventResponse — per-event outcome (1:1 with request order)
- HistorianAlarmEventOutcomeDto enum (byte on the wire — Ack/RetryPlease/PermanentFail)
- HistorianAlarmEventDto — mirrors Core.AlarmHistorian.AlarmHistorianEvent
- HistorianConnectivityStatusNotification — Host pushes proactively when the SDK
  session drops so /alarms/historian flips red without waiting for the next drain
- MessageKind additions: 0x80 HistorianAlarmEventRequest / 0x81 HistorianAlarmEventResponse
  / 0x82 HistorianConnectivityStatus

## Tests — 14/14

SqliteStoreAndForwardSinkTests covers: enqueue→drain→Ack round-trip, empty-queue no-op,
RetryPlease bumps backoff + keeps row, Ack after Retry resets backoff, PermanentFail
dead-letters one row without blocking neighbors, writer exception treated as whole-batch
retry with error surfaced in status, capacity eviction drops oldest non-dead-lettered,
dead-letters purged past retention window, RetryDeadLettered requeues, ladder caps at
60s after 10 retries, Null sink reports Disabled status, null sink swallows enqueue,
ctor argument validation, disposed sink rejects enqueue.

## Totals
Full Phase 7 tests: 160 green (63 Scripting + 36 VirtualTags + 47 ScriptedAlarms +
14 AlarmHistorian). Stream G wires this into the real Galaxy.Host IPC pipe.
2026-04-20 19:11:17 -04:00

398 lines
15 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using System.Text.Json;
using Microsoft.Data.Sqlite;
using Serilog;
namespace ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
/// <summary>
/// Phase 7 plan decisions #16 and #17 implementation: durable SQLite queue on the node
/// absorbs every qualifying alarm event, a drain worker batches rows to Galaxy.Host
/// via <see cref="IAlarmHistorianWriter"/> on an exponential-backoff cadence, and
/// operator acks never block on the historian being reachable.
/// </summary>
/// <remarks>
/// <para>
/// Queue schema:
/// <code>
/// CREATE TABLE Queue (
/// RowId INTEGER PRIMARY KEY AUTOINCREMENT,
/// AlarmId TEXT NOT NULL,
/// EnqueuedUtc TEXT NOT NULL,
/// PayloadJson TEXT NOT NULL,
/// AttemptCount INTEGER NOT NULL DEFAULT 0,
/// LastAttemptUtc TEXT NULL,
/// LastError TEXT NULL,
/// DeadLettered INTEGER NOT NULL DEFAULT 0
/// );
/// </code>
/// Dead-lettered rows stay in place for the configured retention window (default
/// 30 days per Phase 7 plan decision #21) so operators can inspect + manually
/// retry before the sweeper purges them. Regular queue capacity is bounded —
/// overflow evicts the oldest non-dead-lettered rows with a WARN log.
/// </para>
/// <para>
/// Drain ticks come from a <see cref="System.Threading.Timer"/> owned by this instance
/// (created by <see cref="StartDrainLoop"/>). The exponential-backoff ladder advances on
/// <see cref="HistorianWriteOutcome.RetryPlease"/>: 1s → 2s → 5s →
/// 15s → 60s cap. <see cref="HistorianWriteOutcome.PermanentFail"/> rows flip
/// the <c>DeadLettered</c> flag on the individual row; neighbors in the batch
/// still retry on their own cadence.
/// </para>
/// </remarks>
public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
{
/// <summary>Default queue capacity — oldest non-dead-lettered rows evicted past this.</summary>
public const long DefaultCapacity = 1_000_000;
/// <summary>Dead-letter retention used when the constructor is given no explicit window (30 days).</summary>
public static readonly TimeSpan DefaultDeadLetterRetention = TimeSpan.FromDays(30);
/// <summary>Backoff steps indexed by <see cref="_backoffIndex"/>; the last entry (60 s) is the cap.</summary>
private static readonly TimeSpan[] BackoffLadder =
[
TimeSpan.FromSeconds(1),
TimeSpan.FromSeconds(2),
TimeSpan.FromSeconds(5),
TimeSpan.FromSeconds(15),
TimeSpan.FromSeconds(60),
];
// SQLite connection string derived from the database path given to the constructor.
private readonly string _connectionString;
// Destination that drained batches are forwarded to.
private readonly IAlarmHistorianWriter _writer;
private readonly ILogger _logger;
// Maximum number of rows read (and forwarded) per drain pass.
private readonly int _batchSize;
// Maximum number of non-dead-lettered rows kept before the oldest are evicted on enqueue.
private readonly long _capacity;
// Dead-lettered rows whose LastAttemptUtc is older than this are purged at the start of a drain pass.
private readonly TimeSpan _deadLetterRetention;
// Time source; defaults to DateTime.UtcNow and is injectable so tests can control time.
private readonly Func<DateTime> _clock;
// Single-permit gate held for the duration of a drain pass so passes never overlap.
private readonly SemaphoreSlim _drainGate = new(1, 1);
// Background tick source; null until StartDrainLoop is called.
private Timer? _drainTimer;
// Current position in BackoffLadder.
private int _backoffIndex;
// Set at the start of every drain pass that acquires the gate.
private DateTime? _lastDrainUtc;
// Set after a drain pass in which at least one row was Ack'd.
private DateTime? _lastSuccessUtc;
// Message of the most recent writer exception; cleared when the writer call returns normally.
private string? _lastError;
private HistorianDrainState _drainState = HistorianDrainState.Idle;
private bool _disposed;
/// <summary>
/// Creates the sink and makes sure the queue schema exists in the SQLite database
/// at <paramref name="databasePath"/>. The drain loop is not started here.
/// </summary>
/// <param name="databasePath">Path of the SQLite database file backing the queue.</param>
/// <param name="writer">Destination that drained batches are forwarded to.</param>
/// <param name="logger">Logger used for eviction, purge and writer-failure messages.</param>
/// <param name="batchSize">Maximum rows forwarded per drain pass; must be positive.</param>
/// <param name="capacity">Maximum non-dead-lettered rows retained; must be positive.</param>
/// <param name="deadLetterRetention">How long dead-lettered rows are kept; defaults to <see cref="DefaultDeadLetterRetention"/>.</param>
/// <param name="clock">Time source; defaults to <see cref="DateTime.UtcNow"/>.</param>
/// <exception cref="ArgumentException"><paramref name="databasePath"/> is null, empty or whitespace.</exception>
/// <exception cref="ArgumentNullException"><paramref name="writer"/> or <paramref name="logger"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="batchSize"/> or <paramref name="capacity"/> is not positive.</exception>
public SqliteStoreAndForwardSink(
    string databasePath,
    IAlarmHistorianWriter writer,
    ILogger logger,
    int batchSize = 100,
    long capacity = DefaultCapacity,
    TimeSpan? deadLetterRetention = null,
    Func<DateTime>? clock = null)
{
    if (string.IsNullOrWhiteSpace(databasePath))
        throw new ArgumentException("Database path required.", nameof(databasePath));
    _writer = writer ?? throw new ArgumentNullException(nameof(writer));
    _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    _batchSize = batchSize > 0 ? batchSize : throw new ArgumentOutOfRangeException(nameof(batchSize));
    _capacity = capacity > 0 ? capacity : throw new ArgumentOutOfRangeException(nameof(capacity));
    _deadLetterRetention = deadLetterRetention ?? DefaultDeadLetterRetention;
    _clock = clock ?? (() => DateTime.UtcNow);

    // Let the builder quote the path: raw interpolation breaks (or lets the path inject
    // extra keywords) when it contains ';', '=' or quote characters.
    _connectionString = new SqliteConnectionStringBuilder { DataSource = databasePath }.ToString();

    InitializeSchema();
}
/// <summary>
/// Start the background drain worker. Not started automatically so tests can
/// drive <see cref="DrainOnceAsync"/> deterministically. Calling this again
/// disposes the previous timer and replaces it.
/// </summary>
/// <param name="tickInterval">Delay before the first tick and the period between ticks.</param>
/// <exception cref="ObjectDisposedException">The sink has already been disposed.</exception>
public void StartDrainLoop(TimeSpan tickInterval)
{
    if (_disposed) throw new ObjectDisposedException(nameof(SqliteStoreAndForwardSink));
    _drainTimer?.Dispose();
    _drainTimer = new Timer(_ => _ = DrainTickAsync(), null, tickInterval, tickInterval);

    // The timer callback cannot await, so the drain task is observed here; otherwise any
    // failure inside DrainOnceAsync would vanish as an unobserved task exception.
    async Task DrainTickAsync()
    {
        try
        {
            await DrainOnceAsync(CancellationToken.None).ConfigureAwait(false);
        }
        catch (ObjectDisposedException)
        {
            // A tick raced with Dispose — the sink is gone, nothing to report.
        }
        catch (Exception ex)
        {
            _logger.Error(ex, "Historian drain tick failed");
        }
    }
}
/// <summary>
/// Appends one alarm event to the durable queue, first evicting the oldest pending
/// rows if the queue is at capacity. The work is done synchronously; the returned
/// task is already completed. <paramref name="cancellationToken"/> is not consulted.
/// </summary>
/// <param name="evt">Event to persist; serialized to JSON for storage.</param>
/// <param name="cancellationToken">Accepted for interface compatibility; unused.</param>
/// <exception cref="ArgumentNullException"><paramref name="evt"/> is null.</exception>
/// <exception cref="ObjectDisposedException">The sink has been disposed.</exception>
public Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken)
{
    if (evt is null)
    {
        throw new ArgumentNullException(nameof(evt));
    }

    if (_disposed)
    {
        throw new ObjectDisposedException(nameof(SqliteStoreAndForwardSink));
    }

    using var connection = new SqliteConnection(_connectionString);
    connection.Open();

    // Make room before inserting so the new row is never the one evicted.
    EnforceCapacity(connection);

    using var insert = connection.CreateCommand();
    insert.CommandText = """
        INSERT INTO Queue (AlarmId, EnqueuedUtc, PayloadJson, AttemptCount)
        VALUES ($alarmId, $enqueued, $payload, 0);
        """;

    var parameters = insert.Parameters;
    parameters.AddWithValue("$alarmId", evt.AlarmId);
    parameters.AddWithValue("$enqueued", _clock().ToString("O"));
    parameters.AddWithValue("$payload", JsonSerializer.Serialize(evt));

    insert.ExecuteNonQuery();
    return Task.CompletedTask;
}
/// <summary>
/// Read up to <see cref="_batchSize"/> queued rows, forward through the writer,
/// remove Ack'd rows, dead-letter PermanentFail rows, and extend the backoff
/// on RetryPlease. Safe to call from multiple threads; the semaphore enforces
/// serial execution — a call that finds a drain already running returns immediately.
/// </summary>
/// <param name="ct">Cancels the gate wait and the writer call.</param>
/// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled.</exception>
/// <exception cref="InvalidOperationException">
/// The writer returned a different number of outcomes than events, or the batch read
/// produced row IDs and events that do not line up 1:1.
/// </exception>
public async Task DrainOnceAsync(CancellationToken ct)
{
    if (_disposed) return;
    if (!await _drainGate.WaitAsync(0, ct).ConfigureAwait(false)) return;
    try
    {
        _drainState = HistorianDrainState.Draining;
        _lastDrainUtc = _clock();
        PurgeAgedDeadLetters();

        var (rowIds, events) = ReadBatch();
        if (rowIds.Count == 0)
        {
            _drainState = HistorianDrainState.Idle;
            return;
        }

        // Outcomes are applied to rows by index below. If the two lists are out of
        // step, an Ack would delete the wrong row — fail loudly instead of losing data.
        if (events.Count != rowIds.Count)
            throw new InvalidOperationException(
                $"Batch read returned {rowIds.Count} row id(s) but {events.Count} event(s) — refusing to apply outcomes to misaligned rows");

        IReadOnlyList<HistorianWriteOutcome> outcomes;
        try
        {
            outcomes = await _writer.WriteBatchAsync(events, ct).ConfigureAwait(false);
            _lastError = null;
        }
        catch (OperationCanceledException)
        {
            throw;
        }
        catch (Exception ex)
        {
            // Writer-side exception — treat entire batch as RetryPlease.
            _lastError = ex.Message;
            _logger.Warning(ex, "Historian writer threw on batch of {Count}; deferring retry", events.Count);
            BumpBackoff();
            _drainState = HistorianDrainState.BackingOff;
            return;
        }

        if (outcomes.Count != events.Count)
            throw new InvalidOperationException(
                $"Writer returned {outcomes.Count} outcomes for {events.Count} events — expected 1:1");

        var ackCount = 0;
        var retryRequested = false;

        using var conn = new SqliteConnection(_connectionString);
        conn.Open();
        using var tx = conn.BeginTransaction();
        for (var i = 0; i < outcomes.Count; i++)
        {
            var rowId = rowIds[i];
            switch (outcomes[i])
            {
                case HistorianWriteOutcome.Ack:
                    DeleteRow(conn, tx, rowId);
                    ackCount++;
                    break;
                case HistorianWriteOutcome.PermanentFail:
                    DeadLetterRow(conn, tx, rowId, $"permanent fail at {_clock():O}");
                    break;
                case HistorianWriteOutcome.RetryPlease:
                    BumpAttempt(conn, tx, rowId, "retry-please");
                    retryRequested = true;
                    break;
            }
        }
        tx.Commit();

        if (ackCount > 0) _lastSuccessUtc = _clock();

        if (retryRequested)
        {
            BumpBackoff();
            _drainState = HistorianDrainState.BackingOff;
        }
        else
        {
            ResetBackoff();
            _drainState = HistorianDrainState.Idle;
        }
    }
    catch (Exception ex) when (ex is not OperationCanceledException)
    {
        // Surface unexpected failures (SQLite errors, contract violations) in the status
        // snapshot; the exception still propagates to the caller unchanged.
        _lastError = ex.Message;
        throw;
    }
    finally
    {
        // Every normal path sets the state explicitly; this only fires on exceptional
        // exits, so the status does not report "Draining" forever.
        if (_drainState == HistorianDrainState.Draining)
            _drainState = HistorianDrainState.Idle;

        try
        {
            _drainGate.Release();
        }
        catch (ObjectDisposedException)
        {
            // Dispose() ran while this pass was in flight; the gate no longer exists.
        }
    }
}
/// <summary>
/// Builds a status snapshot: pending and dead-lettered row counts queried from the
/// database, plus the in-memory drain timestamps, last error and drain state.
/// </summary>
public HistorianSinkStatus GetStatus()
{
    using var connection = new SqliteConnection(_connectionString);
    connection.Open();

    var pending = CountRows("SELECT COUNT(*) FROM Queue WHERE DeadLettered = 0");
    var parked = CountRows("SELECT COUNT(*) FROM Queue WHERE DeadLettered = 1");

    return new HistorianSinkStatus(
        QueueDepth: pending,
        DeadLetterDepth: parked,
        LastDrainUtc: _lastDrainUtc,
        LastSuccessUtc: _lastSuccessUtc,
        LastError: _lastError,
        DrainState: _drainState);

    // Runs a scalar COUNT query on the open connection.
    long CountRows(string sql)
    {
        using var query = connection.CreateCommand();
        query.CommandText = sql;
        return (long)(query.ExecuteScalar() ?? 0L);
    }
}
/// <summary>
/// Operator action from Admin UI: moves every dead-lettered row back into the regular
/// queue by clearing its <c>DeadLettered</c> flag and <c>LastError</c> and resetting
/// <c>AttemptCount</c> to zero.
/// </summary>
/// <returns>Number of rows that were re-queued.</returns>
public int RetryDeadLettered()
{
    const string requeueSql = "UPDATE Queue SET DeadLettered = 0, AttemptCount = 0, LastError = NULL WHERE DeadLettered = 1";

    using var connection = new SqliteConnection(_connectionString);
    connection.Open();

    using var requeue = connection.CreateCommand();
    requeue.CommandText = requeueSql;

    var requeuedRows = requeue.ExecuteNonQuery();
    return requeuedRows;
}
/// <summary>
/// Reads the oldest non-dead-lettered rows (at most <see cref="_batchSize"/>) and
/// deserializes their payloads. The two returned lists are always index-aligned:
/// <c>events[i]</c> is the payload of <c>rowIds[i]</c>.
/// </summary>
/// <remarks>
/// Rows whose payload cannot be deserialized (malformed JSON, or JSON <c>null</c>)
/// are dead-lettered immediately and left out of the batch. Without this a single
/// corrupt row at the head of the queue would fail every drain pass.
/// </remarks>
/// <returns>Row IDs and their events, in ascending <c>RowId</c> order.</returns>
private (List<long> rowIds, List<AlarmHistorianEvent> events) ReadBatch()
{
    var rowIds = new List<long>();
    var events = new List<AlarmHistorianEvent>();
    var unreadable = new List<(long RowId, string Reason)>();

    using var conn = new SqliteConnection(_connectionString);
    conn.Open();

    // Scoped so the reader is closed before any dead-letter update runs.
    using (var cmd = conn.CreateCommand())
    {
        cmd.CommandText = """
            SELECT RowId, PayloadJson FROM Queue
            WHERE DeadLettered = 0
            ORDER BY RowId ASC
            LIMIT $limit
            """;
        cmd.Parameters.AddWithValue("$limit", _batchSize);

        using var reader = cmd.ExecuteReader();
        while (reader.Read())
        {
            var rowId = reader.GetInt64(0);
            var payload = reader.GetString(1);

            AlarmHistorianEvent? evt;
            try
            {
                evt = JsonSerializer.Deserialize<AlarmHistorianEvent>(payload);
            }
            catch (JsonException ex)
            {
                unreadable.Add((rowId, $"unreadable payload: {ex.Message}"));
                continue;
            }

            if (evt is null)
            {
                unreadable.Add((rowId, "unreadable payload: deserialized to null"));
                continue;
            }

            // Add the id and the event together so the lists can never drift apart.
            rowIds.Add(rowId);
            events.Add(evt);
        }
    }

    if (unreadable.Count > 0)
    {
        using var tx = conn.BeginTransaction();
        foreach (var (rowId, reason) in unreadable)
        {
            DeadLetterRow(conn, tx, rowId, reason);
        }
        tx.Commit();

        _logger.Warning(
            "Dead-lettered {Count} historian queue row(s) with unreadable payloads",
            unreadable.Count);
    }

    return (rowIds, events);
}
/// <summary>Removes a single queue row by its <c>RowId</c> inside the supplied transaction.</summary>
/// <param name="conn">Open connection the transaction belongs to.</param>
/// <param name="tx">Transaction the delete is enlisted in.</param>
/// <param name="rowId">Primary key of the row to remove.</param>
private static void DeleteRow(SqliteConnection conn, SqliteTransaction tx, long rowId)
{
    using (var delete = conn.CreateCommand())
    {
        delete.Transaction = tx;
        delete.CommandText = "DELETE FROM Queue WHERE RowId = $id";
        delete.Parameters.AddWithValue("$id", rowId);
        delete.ExecuteNonQuery();
    }
}
/// <summary>
/// Flags one row as dead-lettered inside the supplied transaction, stamping the
/// attempt time, recording <paramref name="reason"/> as its last error and
/// incrementing its attempt count.
/// </summary>
/// <param name="conn">Open connection the transaction belongs to.</param>
/// <param name="tx">Transaction the update is enlisted in.</param>
/// <param name="rowId">Primary key of the row to dead-letter.</param>
/// <param name="reason">Text stored in the row's <c>LastError</c> column.</param>
private void DeadLetterRow(SqliteConnection conn, SqliteTransaction tx, long rowId, string reason)
{
    using var update = conn.CreateCommand();
    update.Transaction = tx;
    update.CommandText = """
        UPDATE Queue SET DeadLettered = 1, LastAttemptUtc = $now, LastError = $err, AttemptCount = AttemptCount + 1
        WHERE RowId = $id
        """;

    var attemptedAt = _clock().ToString("O");
    var parameters = update.Parameters;
    parameters.AddWithValue("$now", attemptedAt);
    parameters.AddWithValue("$err", reason);
    parameters.AddWithValue("$id", rowId);

    update.ExecuteNonQuery();
}
/// <summary>
/// Records a failed delivery attempt on one row inside the supplied transaction:
/// stamps the attempt time, stores <paramref name="reason"/> as the last error and
/// increments the attempt count. The row stays in the regular queue.
/// </summary>
/// <param name="conn">Open connection the transaction belongs to.</param>
/// <param name="tx">Transaction the update is enlisted in.</param>
/// <param name="rowId">Primary key of the row to update.</param>
/// <param name="reason">Text stored in the row's <c>LastError</c> column.</param>
private void BumpAttempt(SqliteConnection conn, SqliteTransaction tx, long rowId, string reason)
{
    using var update = conn.CreateCommand();
    update.Transaction = tx;
    update.CommandText = """
        UPDATE Queue SET LastAttemptUtc = $now, LastError = $err, AttemptCount = AttemptCount + 1
        WHERE RowId = $id
        """;

    var attemptedAt = _clock().ToString("O");
    var parameters = update.Parameters;
    parameters.AddWithValue("$now", attemptedAt);
    parameters.AddWithValue("$err", reason);
    parameters.AddWithValue("$id", rowId);

    update.ExecuteNonQuery();
}
/// <summary>
/// Makes room for one incoming row: when the number of non-dead-lettered rows has
/// reached <see cref="_capacity"/>, deletes just enough of the oldest ones (lowest
/// <c>RowId</c>) to leave space for the new row, and logs a warning.
/// </summary>
/// <param name="conn">Open connection to run the count and eviction on.</param>
private void EnforceCapacity(SqliteConnection conn)
{
    // Dead-lettered rows are excluded from the count; they are kept for post-mortem
    // until the retention sweep removes them.
    long pending;
    using (var countCommand = conn.CreateCommand())
    {
        countCommand.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 0";
        pending = (long)(countCommand.ExecuteScalar() ?? 0L);
    }

    // "+ 1" reserves the slot for the row about to be inserted.
    var overflow = pending - _capacity + 1;
    if (overflow <= 0)
    {
        return;
    }

    using (var evictCommand = conn.CreateCommand())
    {
        evictCommand.CommandText = """
            DELETE FROM Queue
            WHERE RowId IN (
            SELECT RowId FROM Queue
            WHERE DeadLettered = 0
            ORDER BY RowId ASC
            LIMIT $n
            )
            """;
        evictCommand.Parameters.AddWithValue("$n", overflow);
        evictCommand.ExecuteNonQuery();
    }

    _logger.Warning(
        "Historian queue at capacity {Cap} — evicted {Count} oldest row(s) to make room",
        _capacity, overflow);
}
/// <summary>
/// Deletes dead-lettered rows whose <c>LastAttemptUtc</c> is older than the retention
/// window, and logs how many were removed when any were. The cutoff is compared as a
/// round-trip ("O") formatted string against the stored text.
/// </summary>
private void PurgeAgedDeadLetters()
{
    var oldestKept = _clock() - _deadLetterRetention;

    using var connection = new SqliteConnection(_connectionString);
    connection.Open();

    using var sweep = connection.CreateCommand();
    sweep.CommandText = """
        DELETE FROM Queue
        WHERE DeadLettered = 1 AND LastAttemptUtc IS NOT NULL AND LastAttemptUtc < $cutoff
        """;
    sweep.Parameters.AddWithValue("$cutoff", oldestKept.ToString("O"));

    var removed = sweep.ExecuteNonQuery();
    if (removed <= 0)
    {
        return;
    }

    _logger.Information("Purged {Count} dead-lettered row(s) past retention window", removed);
}
/// <summary>
/// Creates the <c>Queue</c> table and its drain index if they do not already exist.
/// Called once from the constructor.
/// </summary>
private void InitializeSchema()
{
    const string schemaSql = """
        CREATE TABLE IF NOT EXISTS Queue (
        RowId INTEGER PRIMARY KEY AUTOINCREMENT,
        AlarmId TEXT NOT NULL,
        EnqueuedUtc TEXT NOT NULL,
        PayloadJson TEXT NOT NULL,
        AttemptCount INTEGER NOT NULL DEFAULT 0,
        LastAttemptUtc TEXT NULL,
        LastError TEXT NULL,
        DeadLettered INTEGER NOT NULL DEFAULT 0
        );
        CREATE INDEX IF NOT EXISTS IX_Queue_Drain ON Queue (DeadLettered, RowId);
        """;

    using var connection = new SqliteConnection(_connectionString);
    connection.Open();

    using var ddl = connection.CreateCommand();
    ddl.CommandText = schemaSql;
    ddl.ExecuteNonQuery();
}
/// <summary>Moves one step up the backoff ladder, staying on the last (cap) entry once reached.</summary>
private void BumpBackoff()
{
    var capIndex = BackoffLadder.Length - 1;
    var nextIndex = _backoffIndex + 1;
    _backoffIndex = nextIndex < capIndex ? nextIndex : capIndex;
}

/// <summary>Returns to the first (shortest) backoff step.</summary>
private void ResetBackoff()
{
    _backoffIndex = 0;
}

/// <summary>The backoff interval at the current ladder position.</summary>
public TimeSpan CurrentBackoff
{
    get { return BackoffLadder[_backoffIndex]; }
}
/// <summary>
/// Marks the sink disposed and releases the drain timer and the drain gate.
/// Subsequent calls are no-ops.
/// </summary>
public void Dispose()
{
    if (!_disposed)
    {
        // Flag first so EnqueueAsync/DrainOnceAsync callers arriving from now on bail out.
        _disposed = true;
        _drainTimer?.Dispose();
        _drainGate.Dispose();
    }
}
}