From 0da4f3b63a3f6d61599bfeb6301e5f35994f087e Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 23 May 2026 05:38:26 -0400 Subject: [PATCH] fix(core-alarm-historian): resolve Low code-review findings (Core.AlarmHistorian-008,011) - Core.AlarmHistorian-008: cache queue depth in an Interlocked counter so EnqueueAsync no longer runs COUNT(*) on every alarm; consolidate DrainOnceAsync onto a single SqliteConnection per tick (purge, batch read, dead-letter, and outcome transaction all share it). - Core.AlarmHistorian-011: confirm the stale Galaxy.Host XML doc references were already fixed under earlier commits; flip to Resolved. Co-Authored-By: Claude Opus 4.7 (1M context) --- code-reviews/Core.AlarmHistorian/findings.md | 10 +- .../SqliteStoreAndForwardSink.cs | 209 ++++++++++++------ .../SqliteStoreAndForwardSinkTests.cs | 124 +++++++++++ 3 files changed, 269 insertions(+), 74 deletions(-) diff --git a/code-reviews/Core.AlarmHistorian/findings.md b/code-reviews/Core.AlarmHistorian/findings.md index 12672d8..cc73158 100644 --- a/code-reviews/Core.AlarmHistorian/findings.md +++ b/code-reviews/Core.AlarmHistorian/findings.md @@ -7,7 +7,7 @@ | Review date | 2026-05-22 | | Commit reviewed | `76d35d1` | | Status | Reviewed | -| Open findings | 2 | +| Open findings | 0 | ## Checklist coverage @@ -138,13 +138,13 @@ | Severity | Low | | Category | Performance & resource management | | Location | `src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs:107-127,255-278` | -| Status | Open | +| Status | Resolved | **Description:** Each `EnqueueAsync` (one per alarm transition — a hot path on a busy plant) opens a connection, runs `EnforceCapacity` (a `COUNT(*)` over the queue table on every single enqueue), serializes JSON, inserts, and closes the connection. The unconditional `COUNT(*)` on every enqueue is an avoidable scan; the open/close churn defeats connection pooling benefits and adds lock-acquisition overhead per event. `DrainOnceAsync` similarly opens three separate connections per tick (`PurgeAgedDeadLetters`, `ReadBatch`, the transaction block). **Recommendation:** Reuse a single pooled write connection. Replace the per-enqueue `COUNT(*)` with a periodic capacity check (every Nth enqueue, or piggy-backed on the drain tick), or maintain an in-memory approximate counter. Combine the drain-tick connections into one. -**Resolution:** _(open)_ +**Resolution:** Resolved 2026-05-23 — added an `Interlocked`-guarded in-memory `_queuedRowCount` seeded from storage at construction and kept current by every mutation (enqueue increment, drain Ack/PermanentFail/corrupt-dead-letter decrements, capacity-eviction adjustment, RetryDeadLettered re-add). `EnqueueAsync` now short-circuits capacity enforcement against the cached counter via `EnforceCapacityFastPathAsync`, only paying for a real `COUNT(*)` when the cached value reaches the capacity wall or the periodic resync interval (every 10,000 enqueues) elapses; the obsolete sync `EnforceCapacity` was removed. `GetStatus()` reads `QueueDepth` from the same counter so a busy Admin UI no longer hits the DB for it. `DrainOnceAsync` is consolidated onto one shared `SqliteConnection` per tick — purge, read, corrupt-dead-letter, and the outcome-applying transaction now reuse it instead of opening three. Regression tests `EnqueueAsync_does_not_count_all_rows_on_every_call_below_capacity`, `Enqueue_and_drain_keep_queue_depth_consistent_with_storage`, and `Counter_remains_consistent_under_concurrent_enqueue_and_drain` added. ### Core.AlarmHistorian-009 @@ -183,10 +183,10 @@ | Severity | Low | | Category | Documentation & comments | | Location | `src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/IAlarmHistorianSink.cs:5-9,76`, `AlarmHistorianEvent.cs:20` | -| Status | Open | +| Status | Resolved | **Description:** Several doc-comments reference the retired v1 architecture. The `IAlarmHistorianSink` summary says ingestion "routes through Galaxy.Host's pipe" and `IAlarmHistorianWriter` says "Stream G wires this to the Galaxy.Host IPC client", but `docs/AlarmTracking.md` and `CLAUDE.md` state the legacy `Galaxy.Host` project was retired in PR 7.2 and the write path is now the Wonderware historian sidecar (`WonderwareHistorianClient`). `AlarmHistorianEvent.cs:20` likewise says "the Galaxy.Host handler maps to the historian's enum on the wire." These stale references will mislead a reader about where the writer is actually hosted. **Recommendation:** Update the doc-comments to refer to the Wonderware historian sidecar / `WonderwareHistorianClient` (`IAlarmHistorianWriter` implementation) instead of `Galaxy.Host`, consistent with `docs/AlarmTracking.md`'s "Historian write-back" section. -**Resolution:** _(open)_ +**Resolution:** Resolved 2026-05-23 — the three stale `Galaxy.Host` references were already replaced ahead of this resolution by earlier commits (`bdca772` rewrote the `IAlarmHistorianSink` summary + `IAlarmHistorianWriter` summary to name the Wonderware historian sidecar / `WonderwareHistorianClient`; `f6d487b` rewrote the `AlarmHistorianEvent.EventKind` doc-comment). A fresh grep across the project confirms no remaining `Galaxy.Host` / "Stream G wires this" strings — only the legitimate `Galaxy-native` alarm-source label survives. Status flipped to Resolved during the -008 pass; no new source change was needed. diff --git a/src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs b/src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs index 9e51857..a1005ad 100644 --- a/src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs +++ b/src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs @@ -87,6 +87,25 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable // having to scrape the WARN log. private long _evictedCount; + // Core.AlarmHistorian-008: keep an approximate in-memory count of non-dead-lettered + // rows so EnqueueAsync does not need to run a SELECT COUNT(*) on every call. The + // counter is seeded from storage at construction, kept current by every mutation + // (Enqueue, Drain, RetryDeadLettered, PurgeAgedDeadLetters, EnforceCapacity), and + // periodically re-synced from storage as a safety net against drift. + // Mutations cross threads (EnqueueAsync is called from the emitting thread, drain + // runs on the timer / drain thread) so it is updated via Interlocked. + private long _queuedRowCount; + // Probe counter — incremented every time we actually issue a real COUNT(*) for + // capacity enforcement. Public for test instrumentation only. + private long _capacityProbeCount; + // After every Nth enqueue we resync the in-memory counter from storage to defend + // against silent drift (e.g. an external process editing the DB). + private const long ResyncEnqueueInterval = 10_000; + private long _enqueuesSinceResync; + + /// Test-only: number of times the perf-optimised path fell through to a real COUNT(*). + public long DebugCapacityProbeCount => Interlocked.Read(ref _capacityProbeCount); + public SqliteStoreAndForwardSink( string databasePath, IAlarmHistorianWriter writer, @@ -115,6 +134,9 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable }.ToString(); InitializeSchema(); + // Core.AlarmHistorian-008: seed the in-memory counter from storage so the + // perf-optimised EnqueueAsync path starts in sync with what's on disk. + _queuedRowCount = ProbeQueuedRowCount(); } /// @@ -223,7 +245,11 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable await conn.OpenAsync(cancellationToken).ConfigureAwait(false); await ApplyPragmasAsync(conn, cancellationToken).ConfigureAwait(false); - await EnforceCapacityAsync(conn, cancellationToken).ConfigureAwait(false); + // Core.AlarmHistorian-008: use the in-memory counter to short-circuit the + // capacity check on every enqueue. The bare hot path is now one INSERT — no + // SELECT COUNT(*). We fall back to a real probe only when the cached counter + // says we're at or above capacity, or periodically to defend against drift. + await EnforceCapacityFastPathAsync(conn, cancellationToken).ConfigureAwait(false); using var cmd = conn.CreateCommand(); cmd.CommandText = """ @@ -234,6 +260,57 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable cmd.Parameters.AddWithValue("$enqueued", _clock().ToString("O")); cmd.Parameters.AddWithValue("$payload", JsonSerializer.Serialize(evt)); await cmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false); + + Interlocked.Increment(ref _queuedRowCount); + } + + /// + /// Capacity enforcement on the hot enqueue path: consults the in-memory counter + /// first and only probes storage with a real COUNT(*) when (a) the + /// cached value indicates the capacity wall is in reach, or (b) the periodic + /// resync interval has elapsed. The actual eviction (when over capacity) goes + /// through which still runs a precise + /// COUNT to compute the exact number of rows to evict. + /// + private async Task EnforceCapacityFastPathAsync(SqliteConnection conn, CancellationToken ct) + { + var enqueuesSinceResync = Interlocked.Increment(ref _enqueuesSinceResync); + var cached = Interlocked.Read(ref _queuedRowCount); + + // Periodic resync — bounded amount of drift even under exotic conditions. + if (enqueuesSinceResync >= ResyncEnqueueInterval) + { + await ResyncQueuedRowCountAsync(conn, ct).ConfigureAwait(false); + cached = Interlocked.Read(ref _queuedRowCount); + Interlocked.Exchange(ref _enqueuesSinceResync, 0); + } + + // Below capacity per the cached counter — skip the COUNT(*) entirely. + if (cached < _capacity) return; + + // Cached counter says we're at or above the capacity wall — fall back to the + // precise path which probes COUNT(*) and evicts whatever's needed. + await EnforceCapacityAsync(conn, ct).ConfigureAwait(false); + } + + /// Synchronously query COUNT(*) of non-dead-lettered rows. Used at startup. + private long ProbeQueuedRowCount() + { + Interlocked.Increment(ref _capacityProbeCount); + using var conn = OpenConnection(); + using var cmd = conn.CreateCommand(); + cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 0"; + return (long)(cmd.ExecuteScalar() ?? 0L); + } + + /// Re-sync the in-memory counter from storage (async path). + private async Task ResyncQueuedRowCountAsync(SqliteConnection conn, CancellationToken ct) + { + Interlocked.Increment(ref _capacityProbeCount); + using var cmd = conn.CreateCommand(); + cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 0"; + var live = (long)(await cmd.ExecuteScalarAsync(ct).ConfigureAwait(false) ?? 0L); + Interlocked.Exchange(ref _queuedRowCount, live); } /// @@ -242,6 +319,12 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable /// on RetryPlease. Safe to call from multiple threads; the semaphore enforces /// serial execution. /// + /// + /// Core.AlarmHistorian-008: every per-tick SQLite operation runs through a + /// single shared connection (purge, read, corrupt-row dead-letter, and the + /// outcome-applying transaction). Pre-fix the drain opened three independent + /// connections per tick, each paying the open + PRAGMA cost. + /// public async Task DrainOnceAsync(CancellationToken ct) { if (_disposed) return; @@ -254,8 +337,12 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable _lastDrainUtc = _clock(); } - PurgeAgedDeadLetters(); - var batch = ReadBatch(); + // One connection per drain tick — used by purge, read, corrupt-dead-letter, + // and the outcome-applying transaction. + using var conn = OpenConnection(); + + PurgeAgedDeadLetters(conn); + var batch = ReadBatch(conn); if (batch.Count == 0) { lock (_statusLock) { _drainState = HistorianDrainState.Idle; } @@ -271,11 +358,13 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable if (corruptRowIds.Count > 0) { - using var corruptConn = OpenConnection(); - using var corruptTx = corruptConn.BeginTransaction(); + using var corruptTx = conn.BeginTransaction(); foreach (var rowId in corruptRowIds) - DeadLetterRow(corruptConn, corruptTx, rowId, $"corrupt payload at {_clock():O}"); + DeadLetterRow(conn, corruptTx, rowId, $"corrupt payload at {_clock():O}"); corruptTx.Commit(); + // Each corrupt row leaves the non-dead-lettered queue — bookkeeping for + // the in-memory counter (Core.AlarmHistorian-008). + Interlocked.Add(ref _queuedRowCount, -corruptRowIds.Count); _logger.Warning( "Dead-lettered {Count} historian queue row(s) with un-deserializable payload", corruptRowIds.Count); @@ -330,26 +419,34 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable return; } - using var conn = OpenConnection(); - using var tx = conn.BeginTransaction(); - for (var i = 0; i < outcomes.Count; i++) + int rowsLeavingQueue = 0; + using (var tx = conn.BeginTransaction()) { - var outcome = outcomes[i]; - var rowId = liveRows[i].RowId; - switch (outcome) + for (var i = 0; i < outcomes.Count; i++) { - case HistorianWriteOutcome.Ack: - DeleteRow(conn, tx, rowId); - break; - case HistorianWriteOutcome.PermanentFail: - DeadLetterRow(conn, tx, rowId, $"permanent fail at {_clock():O}"); - break; - case HistorianWriteOutcome.RetryPlease: - BumpAttempt(conn, tx, rowId, "retry-please"); - break; + var outcome = outcomes[i]; + var rowId = liveRows[i].RowId; + switch (outcome) + { + case HistorianWriteOutcome.Ack: + DeleteRow(conn, tx, rowId); + rowsLeavingQueue++; + break; + case HistorianWriteOutcome.PermanentFail: + DeadLetterRow(conn, tx, rowId, $"permanent fail at {_clock():O}"); + rowsLeavingQueue++; + break; + case HistorianWriteOutcome.RetryPlease: + BumpAttempt(conn, tx, rowId, "retry-please"); + break; + } } + tx.Commit(); } - tx.Commit(); + // Ack-deleted + PermanentFail-dead-lettered rows both leave the + // non-dead-lettered queue — keep the counter aligned (Core.AlarmHistorian-008). + if (rowsLeavingQueue > 0) + Interlocked.Add(ref _queuedRowCount, -rowsLeavingQueue); var acks = outcomes.Count(o => o == HistorianWriteOutcome.Ack); lock (_statusLock) @@ -375,15 +472,15 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable public HistorianSinkStatus GetStatus() { - using var conn = OpenConnection(); + // Core.AlarmHistorian-008: read the non-dead-lettered count from the in-memory + // counter so a busy Admin UI / health probe does not hammer the DB. Dead-letter + // depth is rare-path only (it lives in the queue until retention) so a real + // COUNT(*) on a single combined connection is fine. + var queued = Interlocked.Read(ref _queuedRowCount); + if (queued < 0) queued = 0; - long queued; long deadlettered; - using (var cmd = conn.CreateCommand()) - { - cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 0"; - queued = (long)(cmd.ExecuteScalar() ?? 0L); - } + using (var conn = OpenConnection()) using (var cmd = conn.CreateCommand()) { cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 1"; @@ -421,7 +518,11 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable using var conn = OpenConnection(); using var cmd = conn.CreateCommand(); cmd.CommandText = "UPDATE Queue SET DeadLettered = 0, AttemptCount = 0, LastError = NULL WHERE DeadLettered = 1"; - return cmd.ExecuteNonQuery(); + var revived = cmd.ExecuteNonQuery(); + // Dead-lettered rows rejoin the non-dead-lettered queue — keep the in-memory + // counter aligned (Core.AlarmHistorian-008). + if (revived > 0) Interlocked.Add(ref _queuedRowCount, revived); + return revived; } /// @@ -432,10 +533,9 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable /// private readonly record struct QueueRow(long RowId, AlarmHistorianEvent? Event); - private List ReadBatch() + private List ReadBatch(SqliteConnection conn) { var rows = new List(); - using var conn = OpenConnection(); using var cmd = conn.CreateCommand(); cmd.CommandText = """ SELECT RowId, PayloadJson FROM Queue @@ -501,50 +601,21 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable cmd.ExecuteNonQuery(); } - private void EnforceCapacity(SqliteConnection conn) - { - // Count non-dead-lettered rows only — dead-lettered rows retain for - // post-mortem per the configured retention window. - long count; - using (var cmd = conn.CreateCommand()) - { - cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 0"; - count = (long)(cmd.ExecuteScalar() ?? 0L); - } - if (count < _capacity) return; - - var toEvict = count - _capacity + 1; - using (var cmd = conn.CreateCommand()) - { - cmd.CommandText = """ - DELETE FROM Queue - WHERE RowId IN ( - SELECT RowId FROM Queue - WHERE DeadLettered = 0 - ORDER BY RowId ASC - LIMIT $n - ) - """; - cmd.Parameters.AddWithValue("$n", toEvict); - cmd.ExecuteNonQuery(); - } - // Core.AlarmHistorian-009: increment the lifetime eviction counter so the - // Admin UI / health check can report overflow without requiring log scraping. - lock (_statusLock) { _evictedCount += toEvict; } - _logger.Warning( - "Historian queue at capacity {Cap} — evicted {Count} oldest row(s) to make room (lifetime evictions: {Total})", - _capacity, toEvict, _evictedCount); - } - // Async variant used by EnqueueAsync (Core.AlarmHistorian-003). + // Core.AlarmHistorian-008: the precise path — runs COUNT(*) to compute the exact + // number of rows to evict. Reached only from the fast-path fallback when the + // in-memory counter says we are at or above capacity. private async Task EnforceCapacityAsync(SqliteConnection conn, CancellationToken ct) { + Interlocked.Increment(ref _capacityProbeCount); long count; using (var cmd = conn.CreateCommand()) { cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 0"; count = (long)(await cmd.ExecuteScalarAsync(ct).ConfigureAwait(false) ?? 0L); } + // Resync the in-memory counter while we have a fresh number. + Interlocked.Exchange(ref _queuedRowCount, count); if (count < _capacity) return; var toEvict = count - _capacity + 1; @@ -562,16 +633,16 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable cmd.Parameters.AddWithValue("$n", toEvict); await cmd.ExecuteNonQueryAsync(ct).ConfigureAwait(false); } + Interlocked.Add(ref _queuedRowCount, -toEvict); lock (_statusLock) { _evictedCount += toEvict; } _logger.Warning( "Historian queue at capacity {Cap} — evicted {Count} oldest row(s) to make room (lifetime evictions: {Total})", _capacity, toEvict, _evictedCount); } - private void PurgeAgedDeadLetters() + private void PurgeAgedDeadLetters(SqliteConnection conn) { var cutoff = (_clock() - _deadLetterRetention).ToString("O"); - using var conn = OpenConnection(); using var cmd = conn.CreateCommand(); cmd.CommandText = """ DELETE FROM Queue diff --git a/tests/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/SqliteStoreAndForwardSinkTests.cs b/tests/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/SqliteStoreAndForwardSinkTests.cs index 06b24e4..8be3e3b 100644 --- a/tests/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/SqliteStoreAndForwardSinkTests.cs +++ b/tests/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/SqliteStoreAndForwardSinkTests.cs @@ -609,6 +609,130 @@ public sealed class SqliteStoreAndForwardSinkTests : IDisposable } } + /// + /// Regression for Core.AlarmHistorian-008: EnqueueAsync must NOT run a + /// SELECT COUNT(*) on every enqueue when we are far below capacity. The + /// optimisation tracks the queue depth in memory and only consults the database + /// when the cached value indicates the capacity wall is in reach. This regression + /// pins the perf characteristic: after many enqueues below capacity, the + /// capacity-probe count must stay bounded — not grow proportionally to the + /// enqueue count as the un-optimised path did. + /// + [Fact] + public async Task EnqueueAsync_does_not_count_all_rows_on_every_call_below_capacity() + { + var writer = new FakeWriter(); + using var sink = new SqliteStoreAndForwardSink( + _dbPath, writer, _log, batchSize: 100, capacity: 10_000); + + for (var i = 0; i < 200; i++) + await sink.EnqueueAsync(Event($"E{i}"), CancellationToken.None); + + // Pre-fix: probe count == enqueue count (200). Post-fix: ≤ a handful (initial + // load + occasional periodic re-syncs). 25 is generous headroom. + sink.DebugCapacityProbeCount.ShouldBeLessThan(25, + "EnqueueAsync must not run a per-call SELECT COUNT(*) below capacity"); + } + + /// + /// Regression for Core.AlarmHistorian-008: across every queue mutation (enqueue, + /// Ack drain, PermanentFail drain, capacity eviction, RetryDeadLettered) the + /// queue depth reported by must + /// stay aligned with a fresh COUNT(*) against the database. Catches drift + /// bugs in the in-memory counter introduced by the perf optimisation. + /// + [Fact] + public async Task Enqueue_and_drain_keep_queue_depth_consistent_with_storage() + { + var writer = new FakeWriter(); + using var sink = new SqliteStoreAndForwardSink( + _dbPath, writer, _log, batchSize: 5, capacity: 8); + + // Burst-enqueue below capacity — the in-memory counter must stay aligned with the + // SELECT COUNT(*) that GetStatus runs. + for (var i = 0; i < 6; i++) + await sink.EnqueueAsync(Event($"burst-{i}"), CancellationToken.None); + AssertQueueDepthMatchesStorage(sink); + sink.GetStatus().QueueDepth.ShouldBe(6); + + // Push past capacity — capacity must still be enforced even when EnqueueAsync no + // longer runs COUNT(*) on every call. + for (var i = 0; i < 5; i++) + await sink.EnqueueAsync(Event($"overflow-{i}"), CancellationToken.None); + sink.GetStatus().QueueDepth.ShouldBe(8, "capacity must still be honoured by the perf-optimised path"); + sink.GetStatus().EvictedCount.ShouldBe(3, "eviction counter must reflect every evicted row"); + AssertQueueDepthMatchesStorage(sink); + + // Drain a partial batch (Ack) — the in-memory counter must follow the deletes + // applied inside the single consolidated drain transaction. + await sink.DrainOnceAsync(CancellationToken.None); + AssertQueueDepthMatchesStorage(sink); + + // Add a dead-lettered row and verify the counter does NOT include it (QueueDepth + // is non-dead-lettered only). + writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.PermanentFail); + await sink.EnqueueAsync(Event("to-dead-letter"), CancellationToken.None); + await sink.DrainOnceAsync(CancellationToken.None); + AssertQueueDepthMatchesStorage(sink); + sink.GetStatus().DeadLetterDepth.ShouldBeGreaterThanOrEqualTo(1); + + // RetryDeadLettered moves DLQ rows back into the live queue — the counter must + // pick that up. + sink.RetryDeadLettered(); + AssertQueueDepthMatchesStorage(sink); + } + + /// + /// Stress regression for Core.AlarmHistorian-008: interleave many enqueues and + /// drains across threads and confirm the in-memory counter stays consistent + /// with storage. Catches drift bugs in the optimised path that would only show + /// up under contention. + /// + [Fact] + public async Task Counter_remains_consistent_under_concurrent_enqueue_and_drain() + { + var writer = new FakeWriter(); + using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log); + + var enqueuers = Enumerable.Range(0, 3).Select(t => Task.Run(async () => + { + for (var i = 0; i < 60; i++) + await sink.EnqueueAsync(Event($"T{t}-{i}"), CancellationToken.None); + })); + var drainers = Enumerable.Range(0, 2).Select(_ => Task.Run(async () => + { + for (var i = 0; i < 30; i++) + { + await sink.DrainOnceAsync(CancellationToken.None); + await Task.Delay(2); + } + })); + + await Task.WhenAll(enqueuers.Concat(drainers)); + + // Drain anything left over. + for (var i = 0; i < 10; i++) + await sink.DrainOnceAsync(CancellationToken.None); + + AssertQueueDepthMatchesStorage(sink); + sink.GetStatus().QueueDepth.ShouldBe(0, "every event drained at the end of the run"); + } + + /// + /// Helper that confirms the queue depth surfaced by GetStatus matches a fresh + /// COUNT(*) read directly from storage — proves the in-memory counter has not + /// drifted from the persisted truth. + /// + private void AssertQueueDepthMatchesStorage(SqliteStoreAndForwardSink sink) + { + using var conn = new SqliteConnection($"Data Source={_dbPath}"); + conn.Open(); + using var cmd = conn.CreateCommand(); + cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 0"; + var live = (long)(cmd.ExecuteScalar() ?? 0L); + sink.GetStatus().QueueDepth.ShouldBe(live, "GetStatus must agree with a fresh COUNT(*)"); + } + /// Insert a queue row whose PayloadJson cannot deserialize into an AlarmHistorianEvent. private void InsertCorruptRow(string alarmId) {