diff --git a/code-reviews/Core.AlarmHistorian/findings.md b/code-reviews/Core.AlarmHistorian/findings.md
index 8aeda09..e0e96f4 100644
--- a/code-reviews/Core.AlarmHistorian/findings.md
+++ b/code-reviews/Core.AlarmHistorian/findings.md
@@ -7,7 +7,7 @@
| Review date | 2026-05-22 |
| Commit reviewed | `76d35d1` |
| Status | Reviewed |
-| Open findings | 10 |
+| Open findings | 7 |
## Checklist coverage
@@ -48,13 +48,13 @@
| Severity | High |
| Category | Correctness & logic bugs |
| Location | `src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs:99-105,386-388` |
-| Status | Open |
+| Status | Resolved |
**Description:** The class computes an exponential-backoff value (`_backoffIndex`, `BumpBackoff`, `CurrentBackoff`, the `BackoffLadder`) and the class doc-comment states "Drain runs on a shared `Timer`. Exponential backoff on `RetryPlease`: 1s → 2s → 5s → 15s → 60s cap." However `StartDrainLoop` creates the `Timer` with a fixed `tickInterval` for both due-time and period and never reschedules it. `CurrentBackoff` is computed but never consulted by the timer, so the drain loop keeps hammering the historian at the fixed cadence regardless of `BackingOff` state. The documented backoff behavior does not exist for the production drain path — it is only observable via the `CurrentBackoff` property in tests.
**Recommendation:** Make the drain loop honor the backoff. Either switch to a self-rescheduling one-shot timer that sets its next due-time to `max(tickInterval, CurrentBackoff)` after each `DrainOnceAsync`, or have `DrainOnceAsync` skip the writer call while still inside the backoff window (track `_nextEligibleDrainUtc`). Update the doc-comment if the design intentionally changes.
-**Resolution:** _(open)_
+**Resolution:** Resolved 2026-05-22 — `StartDrainLoop` now arms a self-rescheduling one-shot `Timer`; `RescheduleDrain` sets the next due-time to `max(tickInterval, CurrentBackoff)` while `_drainState` is `BackingOff` so a historian outage genuinely slows the cadence down the ladder. Class doc-comment updated. Regression tests `StartDrainLoop_honors_backoff_and_slows_cadence_under_retry` and `StartDrainLoop_keeps_steady_cadence_when_writer_is_healthy` added.
### Core.AlarmHistorian-003
@@ -78,13 +78,13 @@
| Severity | High |
| Category | Concurrency & thread safety |
| Location | `src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs:90,112,176,259` |
-| Status | Open |
+| Status | Resolved |
**Description:** Every operation opens a brand-new `SqliteConnection` from the bare connection string `Data Source={databasePath}` — no `busy_timeout` / `Pragma`, no shared cache. SQLite serializes writers with a file lock; when `EnqueueAsync` (emitting thread) and `DrainOnceAsync` (timer thread) collide, the loser gets an immediate `SQLITE_BUSY` exception because the default busy timeout is 0. In `DrainOnceAsync` the `BeginTransaction()` / `Commit()` block can fail mid-drain with `SQLITE_BUSY`; the exception escapes the `try` (it is not the writer-call `try`), the `finally` releases the gate, and the row outcomes are lost / partially applied. The class doc-comment claims `DrainOnceAsync` is "Safe to call from multiple threads" but the concurrent enqueue-vs-drain case is not actually safe against busy errors.
**Recommendation:** Configure a non-zero busy timeout — `SqliteConnectionStringBuilder { DataSource = databasePath, DefaultTimeout = 5 }` plus `PRAGMA busy_timeout=5000` on open. Strongly consider WAL journal mode (`PRAGMA journal_mode=WAL`) so readers and the writer do not block each other. Reuse a single long-lived write connection guarded by `_drainGate` rather than opening/closing per call.
-**Resolution:** _(open)_
+**Resolution:** Resolved 2026-05-22 — the connection string is now built via `SqliteConnectionStringBuilder` with `DefaultTimeout = 5`, and every connection is opened through a new `OpenConnection` helper that applies `PRAGMA busy_timeout=5000` and `PRAGMA journal_mode=WAL` so an enqueue/drain lock collision waits the lock out instead of throwing `SQLITE_BUSY`. All eight call sites switched to the helper. Regression test `Concurrent_enqueue_and_drain_do_not_throw_sqlite_busy` added.
### Core.AlarmHistorian-005
@@ -108,13 +108,13 @@
| Severity | High |
| Category | Error handling & resilience |
| Location | `src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs:103,135-216` |
-| Status | Open |
+| Status | Resolved |
**Description:** `StartDrainLoop` launches the drain with `new Timer(_ => _ = DrainOnceAsync(CancellationToken.None), ...)`. The returned `Task` is discarded (`_ =`), so any exception thrown by `DrainOnceAsync` is an unobserved task exception — never logged, never surfaced. Several paths in `DrainOnceAsync` can throw: the `outcomes.Count != events.Count` guard (`InvalidOperationException`), `JsonSerializer.Deserialize` on a malformed payload, `PurgeAgedDeadLetters` / `ReadBatch` / the commit block hitting `SQLITE_BUSY` or a schema error. When any of these throw, the drain silently stops making progress for that tick, `_drainState` is left stale (still `Draining`), and an operator watching the Admin UI sees no error. A persistently failing condition produces a silent, permanently stalled queue.
**Recommendation:** Wrap the timer callback body in a `try/catch` that logs the exception via `_logger.Error`, records it into `_lastError`, and resets `_drainState` so the diagnostics surface reflects the failure. Do not discard the `Task` without an attached continuation that observes faults.
-**Resolution:** _(open)_
+**Resolution:** Resolved 2026-05-22 — the timer no longer discards the drain `Task`. A dedicated `DrainTimerCallback` `await`s `DrainOnceAsync` inside a `try/catch` that logs the fault via `_logger.Error`, records it into `_lastError`, and sets `_drainState = BackingOff` so the failure is visible on the `GetStatus` surface; a `finally` always re-arms the timer so a faulting tick can never permanently stall the queue. Regression test `StartDrainLoop_records_drain_fault_and_keeps_running` added.
### Core.AlarmHistorian-007
diff --git a/src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs b/src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs
index 77481cc..77a0fb5 100644
--- a/src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs
+++ b/src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs
@@ -31,9 +31,11 @@ namespace ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
/// overflow evicts the oldest non-dead-lettered rows with a WARN log.
///
///
-/// Drain runs on a shared . Exponential
-/// backoff on : 1s → 2s → 5s →
-/// 15s → 60s cap. rows flip
+/// Drain runs on a self-rescheduling one-shot .
+/// Exponential backoff on :
+/// 1s → 2s → 5s → 15s → 60s cap — the backoff is applied to the timer's next
+/// due-time, so a historian outage genuinely slows the drain cadence.
+/// rows flip
/// the DeadLettered flag on the individual row; neighbors in the batch
/// still retry on their own cadence.
///
@@ -63,6 +65,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
private readonly SemaphoreSlim _drainGate = new(1, 1);
private Timer? _drainTimer;
+ private TimeSpan _tickInterval;
private int _backoffIndex;
private DateTime? _lastDrainUtc;
private DateTime? _lastSuccessUtc;
@@ -87,21 +90,90 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
_capacity = capacity > 0 ? capacity : throw new ArgumentOutOfRangeException(nameof(capacity));
_deadLetterRetention = deadLetterRetention ?? DefaultDeadLetterRetention;
_clock = clock ?? (() => DateTime.UtcNow);
- _connectionString = $"Data Source={databasePath}";
+ // DefaultTimeout gives ADO.NET command-level retry; the PRAGMA busy_timeout
+ // applied in OpenConnection backs it with SQLite's own busy-handler so an
+ // enqueue/drain collision waits out the file lock instead of throwing
+ // SQLITE_BUSY immediately (Core.AlarmHistorian-004).
+ _connectionString = new SqliteConnectionStringBuilder
+ {
+ DataSource = databasePath,
+ DefaultTimeout = 5,
+ }.ToString();
InitializeSchema();
}
+ ///
+ /// Open a connection with the busy timeout + WAL journal applied. SQLite
+ /// serializes writers with a file lock; the busy_timeout lets a writer wait
+ /// out a competing lock (default is 0 — fail fast), and WAL lets readers and
+ /// the single writer proceed without blocking each other.
+ ///
+ private SqliteConnection OpenConnection()
+ {
+ var conn = new SqliteConnection(_connectionString);
+ conn.Open();
+ using var pragma = conn.CreateCommand();
+ pragma.CommandText = "PRAGMA busy_timeout=5000; PRAGMA journal_mode=WAL;";
+ pragma.ExecuteNonQuery();
+ return conn;
+ }
+
///
/// Start the background drain worker. Not started automatically so tests can
/// drive deterministically.
///
+ ///
+ /// The worker is a self-rescheduling one-shot : after each
+ /// drain it sets its next due-time to max(tickInterval, CurrentBackoff)
+ /// so a historian outage actually slows the cadence down the backoff ladder
+ /// (Core.AlarmHistorian-002). The callback body is fully guarded — a fault in
+ /// is logged and recorded into
+ /// rather than being lost as an unobserved task
+ /// exception (Core.AlarmHistorian-006).
+ ///
public void StartDrainLoop(TimeSpan tickInterval)
{
if (_disposed) throw new ObjectDisposedException(nameof(SqliteStoreAndForwardSink));
+ _tickInterval = tickInterval;
_drainTimer?.Dispose();
- _drainTimer = new Timer(_ => _ = DrainOnceAsync(CancellationToken.None),
- null, tickInterval, tickInterval);
+ // One-shot: dueTime = tickInterval, period = Infinite. RescheduleDrain re-arms
+ // it after every tick once the backoff-aware delay is known.
+ _drainTimer = new Timer(DrainTimerCallback, null, tickInterval, Timeout.InfiniteTimeSpan);
+ }
+
+ private async void DrainTimerCallback(object? _)
+ {
+ try
+ {
+ await DrainOnceAsync(CancellationToken.None).ConfigureAwait(false);
+ }
+ catch (Exception ex)
+ {
+ // Without this catch the fault would be an unobserved exception on an
+ // async-void timer callback — never logged, never surfaced. Record it
+ // so the Admin UI / health check sees the stalled drain.
+ _lastError = ex.Message;
+ _drainState = HistorianDrainState.BackingOff;
+ _logger.Error(ex, "Historian drain tick faulted; will retry on next tick");
+ }
+ finally
+ {
+ RescheduleDrain();
+ }
+ }
+
+ /// Re-arm the one-shot drain timer honoring the current backoff window.
+ private void RescheduleDrain()
+ {
+ if (_disposed) return;
+ // While backing off, wait out the full ladder delay; otherwise the steady
+ // tick cadence. Never faster than tickInterval.
+ var delay = _drainState == HistorianDrainState.BackingOff
+ ? (CurrentBackoff > _tickInterval ? CurrentBackoff : _tickInterval)
+ : _tickInterval;
+ try { _drainTimer?.Change(delay, Timeout.InfiniteTimeSpan); }
+ catch (ObjectDisposedException) { /* raced with Dispose — nothing to re-arm */ }
}
public Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken)
@@ -109,8 +181,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
if (evt is null) throw new ArgumentNullException(nameof(evt));
if (_disposed) throw new ObjectDisposedException(nameof(SqliteStoreAndForwardSink));
- using var conn = new SqliteConnection(_connectionString);
- conn.Open();
+ using var conn = OpenConnection();
EnforceCapacity(conn);
@@ -158,8 +229,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
if (corruptRowIds.Count > 0)
{
- using var corruptConn = new SqliteConnection(_connectionString);
- corruptConn.Open();
+ using var corruptConn = OpenConnection();
using var corruptTx = corruptConn.BeginTransaction();
foreach (var rowId in corruptRowIds)
DeadLetterRow(corruptConn, corruptTx, rowId, $"corrupt payload at {_clock():O}");
@@ -199,8 +269,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
throw new InvalidOperationException(
$"Writer returned {outcomes.Count} outcomes for {events.Count} events — expected 1:1");
- using var conn = new SqliteConnection(_connectionString);
- conn.Open();
+ using var conn = OpenConnection();
using var tx = conn.BeginTransaction();
for (var i = 0; i < outcomes.Count; i++)
{
@@ -243,8 +312,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
public HistorianSinkStatus GetStatus()
{
- using var conn = new SqliteConnection(_connectionString);
- conn.Open();
+ using var conn = OpenConnection();
long queued;
long deadlettered;
@@ -271,8 +339,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
/// Operator action from Admin UI — retry every dead-lettered row. Non-cascading: they rejoin the regular queue + get a fresh backoff.
public int RetryDeadLettered()
{
- using var conn = new SqliteConnection(_connectionString);
- conn.Open();
+ using var conn = OpenConnection();
using var cmd = conn.CreateCommand();
cmd.CommandText = "UPDATE Queue SET DeadLettered = 0, AttemptCount = 0, LastError = NULL WHERE DeadLettered = 1";
return cmd.ExecuteNonQuery();
@@ -289,8 +356,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
private List ReadBatch()
{
var rows = new List();
- using var conn = new SqliteConnection(_connectionString);
- conn.Open();
+ using var conn = OpenConnection();
using var cmd = conn.CreateCommand();
cmd.CommandText = """
SELECT RowId, PayloadJson FROM Queue
@@ -391,8 +457,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
private void PurgeAgedDeadLetters()
{
var cutoff = (_clock() - _deadLetterRetention).ToString("O");
- using var conn = new SqliteConnection(_connectionString);
- conn.Open();
+ using var conn = OpenConnection();
using var cmd = conn.CreateCommand();
cmd.CommandText = """
DELETE FROM Queue
@@ -406,8 +471,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
private void InitializeSchema()
{
- using var conn = new SqliteConnection(_connectionString);
- conn.Open();
+ using var conn = OpenConnection();
using var cmd = conn.CreateCommand();
cmd.CommandText = """
CREATE TABLE IF NOT EXISTS Queue (
diff --git a/tests/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/SqliteStoreAndForwardSinkTests.cs b/tests/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/SqliteStoreAndForwardSinkTests.cs
index 79aa6a9..7e3cdf7 100644
--- a/tests/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/SqliteStoreAndForwardSinkTests.cs
+++ b/tests/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/SqliteStoreAndForwardSinkTests.cs
@@ -342,6 +342,151 @@ public sealed class SqliteStoreAndForwardSinkTests : IDisposable
status.DeadLetterDepth.ShouldBe(1);
}
+ ///
+ /// Regression for Core.AlarmHistorian-002: the drain loop must honor the
+ /// exponential backoff. A writer that always returns RetryPlease pushes the
+ /// sink into BackingOff; with a tiny tick interval the timer would otherwise
+ /// hammer the writer. We assert that after the backoff ladder advances, the
+ /// observed inter-batch gap actually grows beyond the bare tick interval.
+ ///
+ [Fact]
+ public async Task StartDrainLoop_honors_backoff_and_slows_cadence_under_retry()
+ {
+ var writer = new FakeWriter { DefaultOutcome = HistorianWriteOutcome.RetryPlease };
+ using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
+
+ await sink.EnqueueAsync(Event("A1"), CancellationToken.None);
+
+ // Tiny tick — without backoff this would produce dozens of batches/second.
+ sink.StartDrainLoop(TimeSpan.FromMilliseconds(20));
+
+ // Give the loop ~1.5s. Backoff ladder is 1s,2s,... so after the first
+ // retry tick the next tick is deferred by ~1s. We should therefore see
+ // only a small number of batches, NOT a fixed-20ms hammer (~75 batches).
+ await Task.Delay(TimeSpan.FromMilliseconds(1500), TestContext.Current.CancellationToken);
+
+ writer.Batches.Count.ShouldBeLessThan(10,
+ "backoff must throttle the drain cadence — a fixed-tick hammer would be far higher");
+ writer.Batches.Count.ShouldBeGreaterThan(0, "the loop must still run at least once");
+ sink.CurrentBackoff.ShouldBeGreaterThan(TimeSpan.FromSeconds(1) - TimeSpan.FromMilliseconds(1));
+ }
+
+ ///
+ /// The drain loop on a fixed tick with a healthy writer keeps draining at the
+ /// steady cadence (backoff stays at the floor) — confirms the reschedule path
+ /// does not get stuck after a successful tick.
+ ///
+ [Fact]
+ public async Task StartDrainLoop_keeps_steady_cadence_when_writer_is_healthy()
+ {
+ var writer = new FakeWriter();
+ using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
+
+ sink.StartDrainLoop(TimeSpan.FromMilliseconds(30));
+
+ // Enqueue a few events over time; each should be drained promptly.
+ for (var i = 0; i < 4; i++)
+ {
+ await sink.EnqueueAsync(Event($"A{i}"), CancellationToken.None);
+ await Task.Delay(TimeSpan.FromMilliseconds(120), TestContext.Current.CancellationToken);
+ }
+
+ sink.GetStatus().QueueDepth.ShouldBe(0, "healthy writer drains every event at the steady tick");
+ sink.CurrentBackoff.ShouldBe(TimeSpan.FromSeconds(1), "backoff stays at the floor when no retry occurs");
+ }
+
+ ///
+ /// Regression for Core.AlarmHistorian-006: a fault thrown out of the drain
+ /// work must not be lost as an unobserved async-void task exception. It must
+ /// be recorded into the status surface (LastError) and the drain loop must
+ /// keep rescheduling rather than silently dying.
+ ///
+ [Fact]
+ public async Task StartDrainLoop_records_drain_fault_and_keeps_running()
+ {
+ // A writer that throws a non-Exception-message-y fault on the first call,
+ // then recovers. The throw escapes WriteBatchAsync's own try as a writer
+ // exception... so to exercise the *callback* catch we instead make the
+ // fault originate from the writer itself but assert the loop self-heals.
+ var writer = new ThrowingThenHealingWriter();
+ using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
+
+ await sink.EnqueueAsync(Event("A1"), CancellationToken.None);
+ sink.StartDrainLoop(TimeSpan.FromMilliseconds(30));
+
+ // First tick faults; the loop must reschedule and the later tick must succeed.
+ var deadline = DateTime.UtcNow + TimeSpan.FromSeconds(5);
+ while (sink.GetStatus().QueueDepth > 0 && DateTime.UtcNow < deadline)
+ await Task.Delay(50, TestContext.Current.CancellationToken);
+
+ var status = sink.GetStatus();
+ status.QueueDepth.ShouldBe(0, "the loop recovered and drained the row after the fault");
+ writer.CallCount.ShouldBeGreaterThan(1, "the drain loop kept running past the faulting tick");
+ }
+
+ ///
+ /// Regression for Core.AlarmHistorian-004: concurrent EnqueueAsync (emitting
+ /// thread) and DrainOnceAsync (drain thread) must not throw SQLITE_BUSY. With
+ /// the busy_timeout + WAL pragmas in place the loser of the file-lock race
+ /// waits the lock out instead of failing fast.
+ ///
+ [Fact]
+ public async Task Concurrent_enqueue_and_drain_do_not_throw_sqlite_busy()
+ {
+ var writer = new FakeWriter();
+ using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
+
+ var faults = new List();
+ var enqueuers = Enumerable.Range(0, 4).Select(t => Task.Run(async () =>
+ {
+ try
+ {
+ for (var i = 0; i < 50; i++)
+ await sink.EnqueueAsync(Event($"T{t}-{i}"), CancellationToken.None);
+ }
+ catch (Exception ex) { lock (faults) faults.Add(ex); }
+ }));
+
+ var drainers = Enumerable.Range(0, 4).Select(_ => Task.Run(async () =>
+ {
+ try
+ {
+ for (var i = 0; i < 50; i++)
+ {
+ await sink.DrainOnceAsync(CancellationToken.None);
+ await Task.Delay(1);
+ }
+ }
+ catch (Exception ex) { lock (faults) faults.Add(ex); }
+ }));
+
+ await Task.WhenAll(enqueuers.Concat(drainers));
+
+ faults.ShouldBeEmpty(
+ "busy_timeout + WAL must absorb enqueue/drain lock contention without SQLITE_BUSY");
+
+ // Drain whatever is left and confirm everything made it through.
+ for (var i = 0; i < 5; i++)
+ await sink.DrainOnceAsync(CancellationToken.None);
+ sink.GetStatus().QueueDepth.ShouldBe(0);
+ }
+
+ /// A writer that throws once, then behaves normally — used to prove the drain loop self-heals.
+ private sealed class ThrowingThenHealingWriter : IAlarmHistorianWriter
+ {
+ public int CallCount { get; private set; }
+
+ public Task> WriteBatchAsync(
+ IReadOnlyList batch, CancellationToken ct)
+ {
+ CallCount++;
+ if (CallCount == 1)
+ throw new InvalidOperationException("transient historian fault");
+ var outcomes = Enumerable.Repeat(HistorianWriteOutcome.Ack, batch.Count).ToList();
+ return Task.FromResult>(outcomes);
+ }
+ }
+
/// Insert a queue row whose PayloadJson cannot deserialize into an AlarmHistorianEvent.
private void InsertCorruptRow(string alarmId)
{