fix(alarm-historian): resolve High code-review findings (Core.AlarmHistorian-002, -004, -006)

Core.AlarmHistorian-002 — drain loop now honors exponential backoff:
StartDrainLoop arms a self-rescheduling one-shot Timer. RescheduleDrain
sets the next due-time to max(tickInterval, CurrentBackoff) while the
sink is BackingOff, so a historian outage genuinely slows the cadence
down the 1s->2s->5s->15s->60s ladder instead of hammering at the fixed
tick. Class doc-comment updated.

Core.AlarmHistorian-004 — SQLite busy handling: the connection string
is built via SqliteConnectionStringBuilder with DefaultTimeout=5, and a
new OpenConnection helper applies PRAGMA busy_timeout=5000 and
PRAGMA journal_mode=WAL on every open. A concurrent enqueue-vs-drain
file-lock collision now waits the lock out instead of failing fast with
SQLITE_BUSY. All connection open sites switched to the helper.

Core.AlarmHistorian-006 — drain-loop faults are no longer unobserved:
the timer callback (DrainTimerCallback) awaits DrainOnceAsync inside a
try/catch that logs via _logger.Error, records the message into
_lastError, and sets _drainState=BackingOff so a stalled drain is
visible on GetStatus; a finally always re-arms the timer.

Regression tests added to SqliteStoreAndForwardSinkTests:
StartDrainLoop_honors_backoff_and_slows_cadence_under_retry,
StartDrainLoop_keeps_steady_cadence_when_writer_is_healthy,
StartDrainLoop_records_drain_fault_and_keeps_running,
Concurrent_enqueue_and_drain_do_not_throw_sqlite_busy.

findings.md: 002/004/006 marked Resolved; open count 10 -> 7.

Build: clean (0 warnings). Tests: 20/20 passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-22 06:19:55 -04:00
parent 6300a9e4a8
commit 4638366b77
3 changed files with 238 additions and 29 deletions

View File

@@ -31,9 +31,11 @@ namespace ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
/// overflow evicts the oldest non-dead-lettered rows with a WARN log.
/// </para>
/// <para>
/// Drain runs on a shared <see cref="System.Threading.Timer"/>. Exponential
/// backoff on <see cref="HistorianWriteOutcome.RetryPlease"/>: 1s → 2s → 5s →
/// 15s → 60s cap. <see cref="HistorianWriteOutcome.PermanentFail"/> rows flip
/// Drain runs on a self-rescheduling one-shot <see cref="System.Threading.Timer"/>.
/// Exponential backoff on <see cref="HistorianWriteOutcome.RetryPlease"/>:
/// 1s → 2s → 5s → 15s → 60s cap — the backoff is applied to the timer's next
/// due-time, so a historian outage genuinely slows the drain cadence.
/// <see cref="HistorianWriteOutcome.PermanentFail"/> rows flip
/// the <c>DeadLettered</c> flag on the individual row; neighbors in the batch
/// still retry on their own cadence.
/// </para>
@@ -63,6 +65,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
private readonly SemaphoreSlim _drainGate = new(1, 1);
private Timer? _drainTimer;
private TimeSpan _tickInterval;
private int _backoffIndex;
private DateTime? _lastDrainUtc;
private DateTime? _lastSuccessUtc;
@@ -87,21 +90,90 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
_capacity = capacity > 0 ? capacity : throw new ArgumentOutOfRangeException(nameof(capacity));
_deadLetterRetention = deadLetterRetention ?? DefaultDeadLetterRetention;
_clock = clock ?? (() => DateTime.UtcNow);
_connectionString = $"Data Source={databasePath}";
// DefaultTimeout gives ADO.NET command-level retry; the PRAGMA busy_timeout
// applied in OpenConnection backs it with SQLite's own busy-handler so an
// enqueue/drain collision waits out the file lock instead of throwing
// SQLITE_BUSY immediately (Core.AlarmHistorian-004).
_connectionString = new SqliteConnectionStringBuilder
{
DataSource = databasePath,
DefaultTimeout = 5,
}.ToString();
InitializeSchema();
}
/// <summary>
/// Open a connection with the busy timeout + WAL journal applied. SQLite
/// serializes writers with a file lock; the busy_timeout lets a writer wait
/// out a competing lock (default is 0 — fail fast), and WAL lets readers and
/// the single writer proceed without blocking each other.
/// </summary>
private SqliteConnection OpenConnection()
{
var conn = new SqliteConnection(_connectionString);
conn.Open();
using var pragma = conn.CreateCommand();
pragma.CommandText = "PRAGMA busy_timeout=5000; PRAGMA journal_mode=WAL;";
pragma.ExecuteNonQuery();
return conn;
}
/// <summary>
/// Start the background drain worker. Not started automatically so tests can
/// drive <see cref="DrainOnceAsync"/> deterministically.
/// </summary>
/// <remarks>
/// The worker is a self-rescheduling one-shot <see cref="Timer"/>: after each
/// drain it sets its next due-time to <c>max(tickInterval, CurrentBackoff)</c>
/// so a historian outage actually slows the cadence down the backoff ladder
/// (Core.AlarmHistorian-002). The callback body is fully guarded — a fault in
/// <see cref="DrainOnceAsync"/> is logged and recorded into
/// <see cref="GetStatus"/> rather than being lost as an unobserved task
/// exception (Core.AlarmHistorian-006).
/// </remarks>
public void StartDrainLoop(TimeSpan tickInterval)
{
if (_disposed) throw new ObjectDisposedException(nameof(SqliteStoreAndForwardSink));
_tickInterval = tickInterval;
_drainTimer?.Dispose();
_drainTimer = new Timer(_ => _ = DrainOnceAsync(CancellationToken.None),
null, tickInterval, tickInterval);
// One-shot: dueTime = tickInterval, period = Infinite. RescheduleDrain re-arms
// it after every tick once the backoff-aware delay is known.
_drainTimer = new Timer(DrainTimerCallback, null, tickInterval, Timeout.InfiniteTimeSpan);
}
private async void DrainTimerCallback(object? _)
{
try
{
await DrainOnceAsync(CancellationToken.None).ConfigureAwait(false);
}
catch (Exception ex)
{
// Without this catch the fault would be an unobserved exception on an
// async-void timer callback — never logged, never surfaced. Record it
// so the Admin UI / health check sees the stalled drain.
_lastError = ex.Message;
_drainState = HistorianDrainState.BackingOff;
_logger.Error(ex, "Historian drain tick faulted; will retry on next tick");
}
finally
{
RescheduleDrain();
}
}
/// <summary>Re-arm the one-shot drain timer honoring the current backoff window.</summary>
private void RescheduleDrain()
{
if (_disposed) return;
// While backing off, wait out the full ladder delay; otherwise the steady
// tick cadence. Never faster than tickInterval.
var delay = _drainState == HistorianDrainState.BackingOff
? (CurrentBackoff > _tickInterval ? CurrentBackoff : _tickInterval)
: _tickInterval;
try { _drainTimer?.Change(delay, Timeout.InfiniteTimeSpan); }
catch (ObjectDisposedException) { /* raced with Dispose — nothing to re-arm */ }
}
public Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken)
@@ -109,8 +181,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
if (evt is null) throw new ArgumentNullException(nameof(evt));
if (_disposed) throw new ObjectDisposedException(nameof(SqliteStoreAndForwardSink));
using var conn = new SqliteConnection(_connectionString);
conn.Open();
using var conn = OpenConnection();
EnforceCapacity(conn);
@@ -158,8 +229,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
if (corruptRowIds.Count > 0)
{
using var corruptConn = new SqliteConnection(_connectionString);
corruptConn.Open();
using var corruptConn = OpenConnection();
using var corruptTx = corruptConn.BeginTransaction();
foreach (var rowId in corruptRowIds)
DeadLetterRow(corruptConn, corruptTx, rowId, $"corrupt payload at {_clock():O}");
@@ -199,8 +269,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
throw new InvalidOperationException(
$"Writer returned {outcomes.Count} outcomes for {events.Count} events — expected 1:1");
using var conn = new SqliteConnection(_connectionString);
conn.Open();
using var conn = OpenConnection();
using var tx = conn.BeginTransaction();
for (var i = 0; i < outcomes.Count; i++)
{
@@ -243,8 +312,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
public HistorianSinkStatus GetStatus()
{
using var conn = new SqliteConnection(_connectionString);
conn.Open();
using var conn = OpenConnection();
long queued;
long deadlettered;
@@ -271,8 +339,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
/// <summary>Operator action from Admin UI — retry every dead-lettered row. Non-cascading: they rejoin the regular queue + get a fresh backoff.</summary>
public int RetryDeadLettered()
{
using var conn = new SqliteConnection(_connectionString);
conn.Open();
using var conn = OpenConnection();
using var cmd = conn.CreateCommand();
cmd.CommandText = "UPDATE Queue SET DeadLettered = 0, AttemptCount = 0, LastError = NULL WHERE DeadLettered = 1";
return cmd.ExecuteNonQuery();
@@ -289,8 +356,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
private List<QueueRow> ReadBatch()
{
var rows = new List<QueueRow>();
using var conn = new SqliteConnection(_connectionString);
conn.Open();
using var conn = OpenConnection();
using var cmd = conn.CreateCommand();
cmd.CommandText = """
SELECT RowId, PayloadJson FROM Queue
@@ -391,8 +457,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
private void PurgeAgedDeadLetters()
{
var cutoff = (_clock() - _deadLetterRetention).ToString("O");
using var conn = new SqliteConnection(_connectionString);
conn.Open();
using var conn = OpenConnection();
using var cmd = conn.CreateCommand();
cmd.CommandText = """
DELETE FROM Queue
@@ -406,8 +471,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
private void InitializeSchema()
{
using var conn = new SqliteConnection(_connectionString);
conn.Open();
using var conn = OpenConnection();
using var cmd = conn.CreateCommand();
cmd.CommandText = """
CREATE TABLE IF NOT EXISTS Queue (