fix(core-alarm-historian): resolve Low code-review findings (Core.AlarmHistorian-008,011)
- Core.AlarmHistorian-008: cache queue depth in an Interlocked counter so EnqueueAsync no longer runs COUNT(*) on every alarm; consolidate DrainOnceAsync onto a single SqliteConnection per tick (purge, batch read, dead-letter, and outcome transaction all share it).
- Core.AlarmHistorian-011: confirm the stale Galaxy.Host XML doc references were already fixed under earlier commits; flip to Resolved.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@@ -7,7 +7,7 @@
 | Review date | 2026-05-22 |
 | Commit reviewed | `76d35d1` |
 | Status | Reviewed |
-| Open findings | 2 |
+| Open findings | 0 |
 
 ## Checklist coverage
 
@@ -138,13 +138,13 @@
 | Severity | Low |
 | Category | Performance & resource management |
 | Location | `src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs:107-127,255-278` |
-| Status | Open |
+| Status | Resolved |
 
 **Description:** Each `EnqueueAsync` (one per alarm transition — a hot path on a busy plant) opens a connection, runs `EnforceCapacity` (a `COUNT(*)` over the queue table on every single enqueue), serializes JSON, inserts, and closes the connection. The unconditional `COUNT(*)` on every enqueue is an avoidable scan; the open/close churn defeats connection pooling benefits and adds lock-acquisition overhead per event. `DrainOnceAsync` similarly opens three separate connections per tick (`PurgeAgedDeadLetters`, `ReadBatch`, the transaction block).
 
 **Recommendation:** Reuse a single pooled write connection. Replace the per-enqueue `COUNT(*)` with a periodic capacity check (every Nth enqueue, or piggy-backed on the drain tick), or maintain an in-memory approximate counter. Combine the drain-tick connections into one.
 
-**Resolution:** _(open)_
+**Resolution:** Resolved 2026-05-23 — added an `Interlocked`-guarded in-memory `_queuedRowCount` seeded from storage at construction and kept current by every mutation (enqueue increment, drain Ack/PermanentFail/corrupt-dead-letter decrements, capacity-eviction adjustment, RetryDeadLettered re-add). `EnqueueAsync` now short-circuits capacity enforcement against the cached counter via `EnforceCapacityFastPathAsync`, only paying for a real `COUNT(*)` when the cached value reaches the capacity wall or the periodic resync interval (every 10,000 enqueues) elapses; the obsolete sync `EnforceCapacity` was removed. `GetStatus()` reads `QueueDepth` from the same counter so a busy Admin UI no longer hits the DB for it. `DrainOnceAsync` is consolidated onto one shared `SqliteConnection` per tick — purge, read, corrupt-dead-letter, and the outcome-applying transaction now reuse it instead of opening three. Regression tests `EnqueueAsync_does_not_count_all_rows_on_every_call_below_capacity`, `Enqueue_and_drain_keep_queue_depth_consistent_with_storage`, and `Counter_remains_consistent_under_concurrent_enqueue_and_drain` added.
 
 ### Core.AlarmHistorian-009
 
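Review note: the gating logic described in the -008 resolution can be sketched in isolation. The block below is a minimal illustration under stated assumptions, not the sink's code: `DepthGate`, `NeedsPreciseCheck`, and `RecordInsert` are hypothetical names, and the `Func<long>` delegate stands in for the real `SELECT COUNT(*)` query.

```csharp
using System;
using System.Threading;

// Hypothetical illustration only; names are not taken from the commit.
public sealed class DepthGate
{
    private readonly long _capacity;
    private readonly long _resyncInterval;
    private readonly Func<long> _probe;   // stands in for SELECT COUNT(*)
    private long _cached;                 // approximate live (non-dead-lettered) row count
    private long _sinceResync;            // enqueues seen since the last resync
    private long _probeCount;             // number of real probes issued so far

    public DepthGate(long capacity, long resyncInterval, Func<long> probe)
    {
        _capacity = capacity;
        _resyncInterval = resyncInterval;
        _probe = probe;
        _cached = Probe();                // seed from storage exactly once
    }

    public long ProbeCount => Interlocked.Read(ref _probeCount);
    public long Cached => Interlocked.Read(ref _cached);

    private long Probe()
    {
        Interlocked.Increment(ref _probeCount);
        return _probe();
    }

    // Returns true when the caller must run the precise, probing capacity path.
    public bool NeedsPreciseCheck()
    {
        // Periodic resync bounds drift between the cache and storage.
        if (Interlocked.Increment(ref _sinceResync) >= _resyncInterval)
        {
            Interlocked.Exchange(ref _cached, Probe());
            Interlocked.Exchange(ref _sinceResync, 0);
        }
        return Interlocked.Read(ref _cached) >= _capacity;
    }

    // Call after a row has actually been inserted.
    public void RecordInsert() => Interlocked.Increment(ref _cached);
}
```

With a capacity and resync interval of 10,000, a run of 200 inserts should issue only the single seeding probe, which is the property the first regression test pins with its much looser bound of 25.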
@@ -183,10 +183,10 @@
 | Severity | Low |
 | Category | Documentation & comments |
 | Location | `src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/IAlarmHistorianSink.cs:5-9,76`, `AlarmHistorianEvent.cs:20` |
-| Status | Open |
+| Status | Resolved |
 
 **Description:** Several doc-comments reference the retired v1 architecture. The `IAlarmHistorianSink` summary says ingestion "routes through Galaxy.Host's pipe" and `IAlarmHistorianWriter` says "Stream G wires this to the Galaxy.Host IPC client", but `docs/AlarmTracking.md` and `CLAUDE.md` state the legacy `Galaxy.Host` project was retired in PR 7.2 and the write path is now the Wonderware historian sidecar (`WonderwareHistorianClient`). `AlarmHistorianEvent.cs:20` likewise says "the Galaxy.Host handler maps to the historian's enum on the wire." These stale references will mislead a reader about where the writer is actually hosted.
 
 **Recommendation:** Update the doc-comments to refer to the Wonderware historian sidecar / `WonderwareHistorianClient` (`IAlarmHistorianWriter` implementation) instead of `Galaxy.Host`, consistent with `docs/AlarmTracking.md`'s "Historian write-back" section.
 
-**Resolution:** _(open)_
+**Resolution:** Resolved 2026-05-23 — the three stale `Galaxy.Host` references were already replaced ahead of this resolution by earlier commits (`bdca772` rewrote the `IAlarmHistorianSink` summary + `IAlarmHistorianWriter` summary to name the Wonderware historian sidecar / `WonderwareHistorianClient`; `f6d487b` rewrote the `AlarmHistorianEvent.EventKind` doc-comment). A fresh grep across the project confirms no remaining `Galaxy.Host` / "Stream G wires this" strings — only the legitimate `Galaxy-native` alarm-source label survives. Status flipped to Resolved during the -008 pass; no new source change was needed.
@@ -87,6 +87,25 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
|
|||||||
// having to scrape the WARN log.
|
// having to scrape the WARN log.
|
||||||
private long _evictedCount;
|
private long _evictedCount;
|
||||||
|
|
||||||
|
// Core.AlarmHistorian-008: keep an approximate in-memory count of non-dead-lettered
|
||||||
|
// rows so EnqueueAsync does not need to run a SELECT COUNT(*) on every call. The
|
||||||
|
// counter is seeded from storage at construction, kept current by every mutation
|
||||||
|
// (Enqueue, Drain, RetryDeadLettered, PurgeAgedDeadLetters, EnforceCapacity), and
|
||||||
|
// periodically re-synced from storage as a safety net against drift.
|
||||||
|
// Mutations cross threads (EnqueueAsync is called from the emitting thread, drain
|
||||||
|
// runs on the timer / drain thread) so it is updated via Interlocked.
|
||||||
|
private long _queuedRowCount;
|
||||||
|
// Probe counter — incremented every time we actually issue a real COUNT(*) for
|
||||||
|
// capacity enforcement. Exposed via DebugCapacityProbeCount for test instrumentation only.
|
||||||
|
private long _capacityProbeCount;
|
||||||
|
// After every Nth enqueue we resync the in-memory counter from storage to defend
|
||||||
|
// against silent drift (e.g. an external process editing the DB).
|
||||||
|
private const long ResyncEnqueueInterval = 10_000;
|
||||||
|
private long _enqueuesSinceResync;
|
||||||
|
|
||||||
|
/// <summary>Test-only: number of times the perf-optimised path fell through to a real <c>COUNT(*)</c>.</summary>
|
||||||
|
public long DebugCapacityProbeCount => Interlocked.Read(ref _capacityProbeCount);
|
||||||
|
|
||||||
public SqliteStoreAndForwardSink(
|
public SqliteStoreAndForwardSink(
|
||||||
string databasePath,
|
string databasePath,
|
||||||
IAlarmHistorianWriter writer,
|
IAlarmHistorianWriter writer,
|
||||||
@@ -115,6 +134,9 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
|
|||||||
}.ToString();
|
}.ToString();
|
||||||
|
|
||||||
InitializeSchema();
|
InitializeSchema();
|
||||||
|
// Core.AlarmHistorian-008: seed the in-memory counter from storage so the
|
||||||
|
// perf-optimised EnqueueAsync path starts in sync with what's on disk.
|
||||||
|
_queuedRowCount = ProbeQueuedRowCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -223,7 +245,11 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
|
|||||||
await conn.OpenAsync(cancellationToken).ConfigureAwait(false);
|
await conn.OpenAsync(cancellationToken).ConfigureAwait(false);
|
||||||
await ApplyPragmasAsync(conn, cancellationToken).ConfigureAwait(false);
|
await ApplyPragmasAsync(conn, cancellationToken).ConfigureAwait(false);
|
||||||
|
|
||||||
await EnforceCapacityAsync(conn, cancellationToken).ConfigureAwait(false);
|
// Core.AlarmHistorian-008: use the in-memory counter to short-circuit the
|
||||||
|
// capacity check on every enqueue. The bare hot path is now one INSERT — no
|
||||||
|
// SELECT COUNT(*). We fall back to a real probe only when the cached counter
|
||||||
|
// says we're at or above capacity, or periodically to defend against drift.
|
||||||
|
await EnforceCapacityFastPathAsync(conn, cancellationToken).ConfigureAwait(false);
|
||||||
|
|
||||||
using var cmd = conn.CreateCommand();
|
using var cmd = conn.CreateCommand();
|
||||||
cmd.CommandText = """
|
cmd.CommandText = """
|
||||||
@@ -234,6 +260,57 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
|
|||||||
cmd.Parameters.AddWithValue("$enqueued", _clock().ToString("O"));
|
cmd.Parameters.AddWithValue("$enqueued", _clock().ToString("O"));
|
||||||
cmd.Parameters.AddWithValue("$payload", JsonSerializer.Serialize(evt));
|
cmd.Parameters.AddWithValue("$payload", JsonSerializer.Serialize(evt));
|
||||||
await cmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
await cmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
|
||||||
|
Interlocked.Increment(ref _queuedRowCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Capacity enforcement on the hot enqueue path: consults the in-memory counter
|
||||||
|
/// first and only probes storage with a real <c>COUNT(*)</c> when (a) the
|
||||||
|
/// cached value indicates the capacity wall is in reach, or (b) the periodic
|
||||||
|
/// resync interval has elapsed. The actual eviction (when over capacity) goes
|
||||||
|
/// through <see cref="EnforceCapacityAsync"/> which still runs a precise
|
||||||
|
/// COUNT to compute the exact number of rows to evict.
|
||||||
|
/// </summary>
|
||||||
|
private async Task EnforceCapacityFastPathAsync(SqliteConnection conn, CancellationToken ct)
|
||||||
|
{
|
||||||
|
var enqueuesSinceResync = Interlocked.Increment(ref _enqueuesSinceResync);
|
||||||
|
var cached = Interlocked.Read(ref _queuedRowCount);
|
||||||
|
|
||||||
|
// Periodic resync — bounded amount of drift even under exotic conditions.
|
||||||
|
if (enqueuesSinceResync >= ResyncEnqueueInterval)
|
||||||
|
{
|
||||||
|
await ResyncQueuedRowCountAsync(conn, ct).ConfigureAwait(false);
|
||||||
|
cached = Interlocked.Read(ref _queuedRowCount);
|
||||||
|
Interlocked.Exchange(ref _enqueuesSinceResync, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Below capacity per the cached counter — skip the COUNT(*) entirely.
|
||||||
|
if (cached < _capacity) return;
|
||||||
|
|
||||||
|
// Cached counter says we're at or above the capacity wall — fall back to the
|
||||||
|
// precise path which probes COUNT(*) and evicts whatever's needed.
|
||||||
|
await EnforceCapacityAsync(conn, ct).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Synchronously query <c>COUNT(*)</c> of non-dead-lettered rows. Used at startup.</summary>
|
||||||
|
private long ProbeQueuedRowCount()
|
||||||
|
{
|
||||||
|
Interlocked.Increment(ref _capacityProbeCount);
|
||||||
|
using var conn = OpenConnection();
|
||||||
|
using var cmd = conn.CreateCommand();
|
||||||
|
cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 0";
|
||||||
|
return (long)(cmd.ExecuteScalar() ?? 0L);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Re-sync the in-memory counter from storage (async path).</summary>
|
||||||
|
private async Task ResyncQueuedRowCountAsync(SqliteConnection conn, CancellationToken ct)
|
||||||
|
{
|
||||||
|
Interlocked.Increment(ref _capacityProbeCount);
|
||||||
|
using var cmd = conn.CreateCommand();
|
||||||
|
cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 0";
|
||||||
|
var live = (long)(await cmd.ExecuteScalarAsync(ct).ConfigureAwait(false) ?? 0L);
|
||||||
|
Interlocked.Exchange(ref _queuedRowCount, live);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -242,6 +319,12 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
|
|||||||
/// on RetryPlease. Safe to call from multiple threads; the semaphore enforces
|
/// on RetryPlease. Safe to call from multiple threads; the semaphore enforces
|
||||||
/// serial execution.
|
/// serial execution.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// Core.AlarmHistorian-008: every per-tick SQLite operation runs through a
|
||||||
|
/// single shared connection (purge, read, corrupt-row dead-letter, and the
|
||||||
|
/// outcome-applying transaction). Pre-fix the drain opened three independent
|
||||||
|
/// connections per tick, each paying the open + PRAGMA cost.
|
||||||
|
/// </remarks>
|
||||||
public async Task DrainOnceAsync(CancellationToken ct)
|
public async Task DrainOnceAsync(CancellationToken ct)
|
||||||
{
|
{
|
||||||
if (_disposed) return;
|
if (_disposed) return;
|
||||||
@@ -254,8 +337,12 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
|
|||||||
_lastDrainUtc = _clock();
|
_lastDrainUtc = _clock();
|
||||||
}
|
}
|
||||||
|
|
||||||
PurgeAgedDeadLetters();
|
// One connection per drain tick — used by purge, read, corrupt-dead-letter,
|
||||||
var batch = ReadBatch();
|
// and the outcome-applying transaction.
|
||||||
|
using var conn = OpenConnection();
|
||||||
|
|
||||||
|
PurgeAgedDeadLetters(conn);
|
||||||
|
var batch = ReadBatch(conn);
|
||||||
if (batch.Count == 0)
|
if (batch.Count == 0)
|
||||||
{
|
{
|
||||||
lock (_statusLock) { _drainState = HistorianDrainState.Idle; }
|
lock (_statusLock) { _drainState = HistorianDrainState.Idle; }
|
||||||
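Review note: the drain change above replaces "each helper opens its own connection" with "one connection handed to every helper". A minimal sketch of the two shapes, assuming a hypothetical `CountingFactory` that only counts opens (no real SQLite connection is involved, and the helper bodies are deliberately empty):

```csharp
using System;

// Hypothetical stand-in for a connection factory; it only counts how many opens occur.
public sealed class CountingFactory
{
    public int OpenCount { get; private set; }

    public IDisposable Open()
    {
        OpenCount++;
        return new Handle();
    }

    private sealed class Handle : IDisposable
    {
        public void Dispose() { }
    }
}

public static class DrainShapes
{
    // Pre-fix shape: each step opens and disposes its own connection.
    public static void PerStep(CountingFactory factory)
    {
        using (factory.Open()) { /* purge aged dead letters */ }
        using (factory.Open()) { /* read batch */ }
        using (factory.Open()) { /* apply outcomes in a transaction */ }
    }

    // Post-fix shape: one connection is opened and passed to every step.
    public static void Shared(CountingFactory factory)
    {
        using var conn = factory.Open();
        Purge(conn);
        ReadBatch(conn);
        ApplyOutcomes(conn);
    }

    private static void Purge(IDisposable conn) { }
    private static void ReadBatch(IDisposable conn) { }
    private static void ApplyOutcomes(IDisposable conn) { }
}
```

In this sketch `PerStep` records three opens per tick and `Shared` records one. In the real sink each open also pays the PRAGMA setup that the `DrainOnceAsync` remarks mention.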
@@ -271,11 +358,13 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
|
|||||||
|
|
||||||
if (corruptRowIds.Count > 0)
|
if (corruptRowIds.Count > 0)
|
||||||
{
|
{
|
||||||
using var corruptConn = OpenConnection();
|
using var corruptTx = conn.BeginTransaction();
|
||||||
using var corruptTx = corruptConn.BeginTransaction();
|
|
||||||
foreach (var rowId in corruptRowIds)
|
foreach (var rowId in corruptRowIds)
|
||||||
DeadLetterRow(corruptConn, corruptTx, rowId, $"corrupt payload at {_clock():O}");
|
DeadLetterRow(conn, corruptTx, rowId, $"corrupt payload at {_clock():O}");
|
||||||
corruptTx.Commit();
|
corruptTx.Commit();
|
||||||
|
// Each corrupt row leaves the non-dead-lettered queue — bookkeeping for
|
||||||
|
// the in-memory counter (Core.AlarmHistorian-008).
|
||||||
|
Interlocked.Add(ref _queuedRowCount, -corruptRowIds.Count);
|
||||||
_logger.Warning(
|
_logger.Warning(
|
||||||
"Dead-lettered {Count} historian queue row(s) with un-deserializable payload",
|
"Dead-lettered {Count} historian queue row(s) with un-deserializable payload",
|
||||||
corruptRowIds.Count);
|
corruptRowIds.Count);
|
||||||
@@ -330,8 +419,9 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
using var conn = OpenConnection();
|
int rowsLeavingQueue = 0;
|
||||||
using var tx = conn.BeginTransaction();
|
using (var tx = conn.BeginTransaction())
|
||||||
|
{
|
||||||
for (var i = 0; i < outcomes.Count; i++)
|
for (var i = 0; i < outcomes.Count; i++)
|
||||||
{
|
{
|
||||||
var outcome = outcomes[i];
|
var outcome = outcomes[i];
|
||||||
@@ -340,9 +430,11 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
|
|||||||
{
|
{
|
||||||
case HistorianWriteOutcome.Ack:
|
case HistorianWriteOutcome.Ack:
|
||||||
DeleteRow(conn, tx, rowId);
|
DeleteRow(conn, tx, rowId);
|
||||||
|
rowsLeavingQueue++;
|
||||||
break;
|
break;
|
||||||
case HistorianWriteOutcome.PermanentFail:
|
case HistorianWriteOutcome.PermanentFail:
|
||||||
DeadLetterRow(conn, tx, rowId, $"permanent fail at {_clock():O}");
|
DeadLetterRow(conn, tx, rowId, $"permanent fail at {_clock():O}");
|
||||||
|
rowsLeavingQueue++;
|
||||||
break;
|
break;
|
||||||
case HistorianWriteOutcome.RetryPlease:
|
case HistorianWriteOutcome.RetryPlease:
|
||||||
BumpAttempt(conn, tx, rowId, "retry-please");
|
BumpAttempt(conn, tx, rowId, "retry-please");
|
||||||
@@ -350,6 +442,11 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
tx.Commit();
|
tx.Commit();
|
||||||
|
}
|
||||||
|
// Ack-deleted + PermanentFail-dead-lettered rows both leave the
|
||||||
|
// non-dead-lettered queue — keep the counter aligned (Core.AlarmHistorian-008).
|
||||||
|
if (rowsLeavingQueue > 0)
|
||||||
|
Interlocked.Add(ref _queuedRowCount, -rowsLeavingQueue);
|
||||||
|
|
||||||
var acks = outcomes.Count(o => o == HistorianWriteOutcome.Ack);
|
var acks = outcomes.Count(o => o == HistorianWriteOutcome.Ack);
|
||||||
lock (_statusLock)
|
lock (_statusLock)
|
||||||
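Review note: the bookkeeping in this hunk tallies rows that leave the live queue and adjusts the counter once, after `tx.Commit()`. A small sketch of that rule, assuming a hypothetical `Outcome` enum that mirrors the three `HistorianWriteOutcome` cases seen in the diff; `DrainBookkeeping` is an illustrative name only.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical mirror of the three outcomes handled in the drain switch.
public enum Outcome { Ack, PermanentFail, RetryPlease }

public static class DrainBookkeeping
{
    // Ack rows are deleted and PermanentFail rows are dead-lettered, so both
    // leave the non-dead-lettered queue. RetryPlease rows stay where they are.
    public static int RowsLeavingQueue(IReadOnlyList<Outcome> outcomes)
    {
        var leaving = 0;
        foreach (var o in outcomes)
            if (o is Outcome.Ack or Outcome.PermanentFail) leaving++;
        return leaving;
    }

    // Applies the delta to the shared counter in one atomic step.
    // Intended to be called only after the transaction has committed.
    public static long ApplyAfterCommit(ref long liveCount, IReadOnlyList<Outcome> outcomes) =>
        Interlocked.Add(ref liveCount, -RowsLeavingQueue(outcomes));
}
```

Adjusting the counter after the commit rather than per row means a rolled-back transaction leaves the cached depth untouched, at the cost of the counter briefly lagging storage while the transaction is open.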
@@ -375,15 +472,15 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
|
|||||||
|
|
||||||
public HistorianSinkStatus GetStatus()
|
public HistorianSinkStatus GetStatus()
|
||||||
{
|
{
|
||||||
using var conn = OpenConnection();
|
// Core.AlarmHistorian-008: read the non-dead-lettered count from the in-memory
|
||||||
|
// counter so a busy Admin UI / health probe does not hammer the DB. Dead-letter
|
||||||
|
// depth changes only on the rare path (rows stay in the table until the retention window expires), so a real
|
||||||
|
// COUNT(*) on a single combined connection is fine.
|
||||||
|
var queued = Interlocked.Read(ref _queuedRowCount);
|
||||||
|
if (queued < 0) queued = 0;
|
||||||
|
|
||||||
long queued;
|
|
||||||
long deadlettered;
|
long deadlettered;
|
||||||
using (var cmd = conn.CreateCommand())
|
using (var conn = OpenConnection())
|
||||||
{
|
|
||||||
cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 0";
|
|
||||||
queued = (long)(cmd.ExecuteScalar() ?? 0L);
|
|
||||||
}
|
|
||||||
using (var cmd = conn.CreateCommand())
|
using (var cmd = conn.CreateCommand())
|
||||||
{
|
{
|
||||||
cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 1";
|
cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 1";
|
||||||
@@ -421,7 +518,11 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
|
|||||||
using var conn = OpenConnection();
|
using var conn = OpenConnection();
|
||||||
using var cmd = conn.CreateCommand();
|
using var cmd = conn.CreateCommand();
|
||||||
cmd.CommandText = "UPDATE Queue SET DeadLettered = 0, AttemptCount = 0, LastError = NULL WHERE DeadLettered = 1";
|
cmd.CommandText = "UPDATE Queue SET DeadLettered = 0, AttemptCount = 0, LastError = NULL WHERE DeadLettered = 1";
|
||||||
return cmd.ExecuteNonQuery();
|
var revived = cmd.ExecuteNonQuery();
|
||||||
|
// Dead-lettered rows rejoin the non-dead-lettered queue — keep the in-memory
|
||||||
|
// counter aligned (Core.AlarmHistorian-008).
|
||||||
|
if (revived > 0) Interlocked.Add(ref _queuedRowCount, revived);
|
||||||
|
return revived;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -432,10 +533,9 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
private readonly record struct QueueRow(long RowId, AlarmHistorianEvent? Event);
|
private readonly record struct QueueRow(long RowId, AlarmHistorianEvent? Event);
|
||||||
|
|
||||||
private List<QueueRow> ReadBatch()
|
private List<QueueRow> ReadBatch(SqliteConnection conn)
|
||||||
{
|
{
|
||||||
var rows = new List<QueueRow>();
|
var rows = new List<QueueRow>();
|
||||||
using var conn = OpenConnection();
|
|
||||||
using var cmd = conn.CreateCommand();
|
using var cmd = conn.CreateCommand();
|
||||||
cmd.CommandText = """
|
cmd.CommandText = """
|
||||||
SELECT RowId, PayloadJson FROM Queue
|
SELECT RowId, PayloadJson FROM Queue
|
||||||
@@ -501,50 +601,21 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
|
|||||||
cmd.ExecuteNonQuery();
|
cmd.ExecuteNonQuery();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void EnforceCapacity(SqliteConnection conn)
|
|
||||||
{
|
|
||||||
// Count non-dead-lettered rows only — dead-lettered rows retain for
|
|
||||||
// post-mortem per the configured retention window.
|
|
||||||
long count;
|
|
||||||
using (var cmd = conn.CreateCommand())
|
|
||||||
{
|
|
||||||
cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 0";
|
|
||||||
count = (long)(cmd.ExecuteScalar() ?? 0L);
|
|
||||||
}
|
|
||||||
if (count < _capacity) return;
|
|
||||||
|
|
||||||
var toEvict = count - _capacity + 1;
|
|
||||||
using (var cmd = conn.CreateCommand())
|
|
||||||
{
|
|
||||||
cmd.CommandText = """
|
|
||||||
DELETE FROM Queue
|
|
||||||
WHERE RowId IN (
|
|
||||||
SELECT RowId FROM Queue
|
|
||||||
WHERE DeadLettered = 0
|
|
||||||
ORDER BY RowId ASC
|
|
||||||
LIMIT $n
|
|
||||||
)
|
|
||||||
""";
|
|
||||||
cmd.Parameters.AddWithValue("$n", toEvict);
|
|
||||||
cmd.ExecuteNonQuery();
|
|
||||||
}
|
|
||||||
// Core.AlarmHistorian-009: increment the lifetime eviction counter so the
|
|
||||||
// Admin UI / health check can report overflow without requiring log scraping.
|
|
||||||
lock (_statusLock) { _evictedCount += toEvict; }
|
|
||||||
_logger.Warning(
|
|
||||||
"Historian queue at capacity {Cap} — evicted {Count} oldest row(s) to make room (lifetime evictions: {Total})",
|
|
||||||
_capacity, toEvict, _evictedCount);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Async variant used by EnqueueAsync (Core.AlarmHistorian-003).
|
// Async variant used by EnqueueAsync (Core.AlarmHistorian-003).
|
||||||
|
// Core.AlarmHistorian-008: the precise path — runs COUNT(*) to compute the exact
|
||||||
|
// number of rows to evict. Reached only from the fast-path fallback when the
|
||||||
|
// in-memory counter says we are at or above capacity.
|
||||||
private async Task EnforceCapacityAsync(SqliteConnection conn, CancellationToken ct)
|
private async Task EnforceCapacityAsync(SqliteConnection conn, CancellationToken ct)
|
||||||
{
|
{
|
||||||
|
Interlocked.Increment(ref _capacityProbeCount);
|
||||||
long count;
|
long count;
|
||||||
using (var cmd = conn.CreateCommand())
|
using (var cmd = conn.CreateCommand())
|
||||||
{
|
{
|
||||||
cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 0";
|
cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 0";
|
||||||
count = (long)(await cmd.ExecuteScalarAsync(ct).ConfigureAwait(false) ?? 0L);
|
count = (long)(await cmd.ExecuteScalarAsync(ct).ConfigureAwait(false) ?? 0L);
|
||||||
}
|
}
|
||||||
|
// Resync the in-memory counter while we have a fresh number.
|
||||||
|
Interlocked.Exchange(ref _queuedRowCount, count);
|
||||||
if (count < _capacity) return;
|
if (count < _capacity) return;
|
||||||
|
|
||||||
var toEvict = count - _capacity + 1;
|
var toEvict = count - _capacity + 1;
|
||||||
@@ -562,16 +633,16 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
|
|||||||
cmd.Parameters.AddWithValue("$n", toEvict);
|
cmd.Parameters.AddWithValue("$n", toEvict);
|
||||||
await cmd.ExecuteNonQueryAsync(ct).ConfigureAwait(false);
|
await cmd.ExecuteNonQueryAsync(ct).ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
|
Interlocked.Add(ref _queuedRowCount, -toEvict);
|
||||||
lock (_statusLock) { _evictedCount += toEvict; }
|
lock (_statusLock) { _evictedCount += toEvict; }
|
||||||
_logger.Warning(
|
_logger.Warning(
|
||||||
"Historian queue at capacity {Cap} — evicted {Count} oldest row(s) to make room (lifetime evictions: {Total})",
|
"Historian queue at capacity {Cap} — evicted {Count} oldest row(s) to make room (lifetime evictions: {Total})",
|
||||||
_capacity, toEvict, _evictedCount);
|
_capacity, toEvict, _evictedCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void PurgeAgedDeadLetters()
|
private void PurgeAgedDeadLetters(SqliteConnection conn)
|
||||||
{
|
{
|
||||||
var cutoff = (_clock() - _deadLetterRetention).ToString("O");
|
var cutoff = (_clock() - _deadLetterRetention).ToString("O");
|
||||||
using var conn = OpenConnection();
|
|
||||||
using var cmd = conn.CreateCommand();
|
using var cmd = conn.CreateCommand();
|
||||||
cmd.CommandText = """
|
cmd.CommandText = """
|
||||||
DELETE FROM Queue
|
DELETE FROM Queue
|
||||||
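Review note: the eviction arithmetic in `EnforceCapacityAsync` (`toEvict = count - _capacity + 1`) frees exactly one slot for the row about to be inserted. A worked sketch under stated assumptions: `EvictionMath` and `Simulate` are hypothetical helpers that model the queue as a plain number, with no database involved.

```csharp
using System;

// Hypothetical helper; models the queue depth as a number only.
public static class EvictionMath
{
    // Rows to evict so that one more insert leaves exactly `capacity` rows.
    public static long RowsToEvict(long count, long capacity) =>
        count < capacity ? 0 : count - capacity + 1;

    // Simulates `enqueues` inserts into an initially empty queue and
    // returns the final depth plus the lifetime eviction total.
    public static (long Depth, long Evicted) Simulate(int enqueues, long capacity)
    {
        long depth = 0, evicted = 0;
        for (var i = 0; i < enqueues; i++)
        {
            var n = RowsToEvict(depth, capacity);
            depth -= n;      // oldest rows are dropped first
            evicted += n;
            depth++;         // then the new row is inserted
        }
        return (depth, evicted);
    }
}
```

For 11 inserts at capacity 8 this yields a depth of 8 and 3 evictions, the same figures the `Enqueue_and_drain_keep_queue_depth_consistent_with_storage` test asserts after its 6 + 5 enqueues.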
|
|||||||
@@ -609,6 +609,130 @@ public sealed class SqliteStoreAndForwardSinkTests : IDisposable
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Regression for Core.AlarmHistorian-008: <c>EnqueueAsync</c> must NOT run a
|
||||||
|
/// <c>SELECT COUNT(*)</c> on every enqueue when we are far below capacity. The
|
||||||
|
/// optimisation tracks the queue depth in memory and only consults the database
|
||||||
|
/// when the cached value indicates the capacity wall is in reach. This regression
|
||||||
|
/// pins the perf characteristic: after many enqueues below capacity, the
|
||||||
|
/// capacity-probe count must stay bounded — not grow proportionally to the
|
||||||
|
/// enqueue count as the un-optimised path did.
|
||||||
|
/// </summary>
|
||||||
|
[Fact]
|
||||||
|
public async Task EnqueueAsync_does_not_count_all_rows_on_every_call_below_capacity()
|
||||||
|
{
|
||||||
|
var writer = new FakeWriter();
|
||||||
|
using var sink = new SqliteStoreAndForwardSink(
|
||||||
|
_dbPath, writer, _log, batchSize: 100, capacity: 10_000);
|
||||||
|
|
||||||
|
for (var i = 0; i < 200; i++)
|
||||||
|
await sink.EnqueueAsync(Event($"E{i}"), CancellationToken.None);
|
||||||
|
|
||||||
|
// Pre-fix: probe count == enqueue count (200). Post-fix: ≤ a handful (initial
|
||||||
|
// load + occasional periodic re-syncs). 25 is generous headroom.
|
||||||
|
sink.DebugCapacityProbeCount.ShouldBeLessThan(25,
|
||||||
|
"EnqueueAsync must not run a per-call SELECT COUNT(*) below capacity");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Regression for Core.AlarmHistorian-008: across every queue mutation (enqueue,
|
||||||
|
/// Ack drain, PermanentFail drain, capacity eviction, RetryDeadLettered) the
|
||||||
|
/// queue depth reported by <see cref="SqliteStoreAndForwardSink.GetStatus"/> must
|
||||||
|
/// stay aligned with a fresh <c>COUNT(*)</c> against the database. Catches drift
|
||||||
|
/// bugs in the in-memory counter introduced by the perf optimisation.
|
||||||
|
/// </summary>
|
||||||
|
[Fact]
|
||||||
|
public async Task Enqueue_and_drain_keep_queue_depth_consistent_with_storage()
|
||||||
|
{
|
||||||
|
var writer = new FakeWriter();
|
||||||
|
using var sink = new SqliteStoreAndForwardSink(
|
||||||
|
_dbPath, writer, _log, batchSize: 5, capacity: 8);
|
||||||
|
|
||||||
|
// Burst-enqueue below capacity — the in-memory counter must stay aligned with the
|
||||||
|
// fresh SELECT COUNT(*) that AssertQueueDepthMatchesStorage runs against storage.
|
||||||
|
for (var i = 0; i < 6; i++)
|
||||||
|
await sink.EnqueueAsync(Event($"burst-{i}"), CancellationToken.None);
|
||||||
|
AssertQueueDepthMatchesStorage(sink);
|
||||||
|
sink.GetStatus().QueueDepth.ShouldBe(6);
|
||||||
|
|
||||||
|
// Push past capacity — capacity must still be enforced even when EnqueueAsync no
|
||||||
|
// longer runs COUNT(*) on every call.
|
||||||
|
for (var i = 0; i < 5; i++)
|
||||||
|
await sink.EnqueueAsync(Event($"overflow-{i}"), CancellationToken.None);
|
||||||
|
sink.GetStatus().QueueDepth.ShouldBe(8, "capacity must still be honoured by the perf-optimised path");
|
||||||
|
sink.GetStatus().EvictedCount.ShouldBe(3, "eviction counter must reflect every evicted row");
|
||||||
|
AssertQueueDepthMatchesStorage(sink);
|
||||||
|
|
||||||
|
// Drain a partial batch (Ack) — the in-memory counter must follow the deletes
|
||||||
|
// applied inside the single consolidated drain transaction.
|
||||||
|
await sink.DrainOnceAsync(CancellationToken.None);
|
||||||
|
AssertQueueDepthMatchesStorage(sink);
|
||||||
|
|
||||||
|
// Add a dead-lettered row and verify the counter does NOT include it (QueueDepth
|
||||||
|
// is non-dead-lettered only).
|
||||||
|
writer.NextOutcomePerEvent.Enqueue(HistorianWriteOutcome.PermanentFail);
|
||||||
|
await sink.EnqueueAsync(Event("to-dead-letter"), CancellationToken.None);
|
||||||
|
await sink.DrainOnceAsync(CancellationToken.None);
|
||||||
|
AssertQueueDepthMatchesStorage(sink);
|
||||||
|
sink.GetStatus().DeadLetterDepth.ShouldBeGreaterThanOrEqualTo(1);
|
||||||
|
|
||||||
|
// RetryDeadLettered moves DLQ rows back into the live queue — the counter must
|
||||||
|
// pick that up.
|
||||||
|
sink.RetryDeadLettered();
|
||||||
|
AssertQueueDepthMatchesStorage(sink);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Stress regression for Core.AlarmHistorian-008: interleave many enqueues and
|
||||||
|
/// drains across threads and confirm the in-memory counter stays consistent
|
||||||
|
/// with storage. Catches drift bugs in the optimised path that would only show
|
||||||
|
/// up under contention.
|
||||||
|
/// </summary>
|
||||||
|
[Fact]
|
||||||
|
public async Task Counter_remains_consistent_under_concurrent_enqueue_and_drain()
|
||||||
|
{
|
||||||
|
var writer = new FakeWriter();
|
||||||
|
using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
|
||||||
|
|
||||||
|
var enqueuers = Enumerable.Range(0, 3).Select(t => Task.Run(async () =>
|
||||||
|
{
|
||||||
|
for (var i = 0; i < 60; i++)
|
||||||
|
await sink.EnqueueAsync(Event($"T{t}-{i}"), CancellationToken.None);
|
||||||
|
}));
|
||||||
|
var drainers = Enumerable.Range(0, 2).Select(_ => Task.Run(async () =>
|
||||||
|
{
|
||||||
|
for (var i = 0; i < 30; i++)
|
||||||
|
{
|
||||||
|
await sink.DrainOnceAsync(CancellationToken.None);
|
||||||
|
await Task.Delay(2);
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
await Task.WhenAll(enqueuers.Concat(drainers));
|
||||||
|
|
||||||
|
// Drain anything left over.
|
||||||
|
for (var i = 0; i < 10; i++)
|
||||||
|
await sink.DrainOnceAsync(CancellationToken.None);
|
||||||
|
|
||||||
|
AssertQueueDepthMatchesStorage(sink);
|
||||||
|
sink.GetStatus().QueueDepth.ShouldBe(0, "every event drained at the end of the run");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Helper that confirms the queue depth surfaced by GetStatus matches a fresh
|
||||||
|
/// COUNT(*) read directly from storage — proves the in-memory counter has not
|
||||||
|
/// drifted from the persisted truth.
|
||||||
|
/// </summary>
|
||||||
|
private void AssertQueueDepthMatchesStorage(SqliteStoreAndForwardSink sink)
|
||||||
|
{
|
||||||
|
using var conn = new SqliteConnection($"Data Source={_dbPath}");
|
||||||
|
conn.Open();
|
||||||
|
using var cmd = conn.CreateCommand();
|
||||||
|
cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 0";
|
||||||
|
var live = (long)(cmd.ExecuteScalar() ?? 0L);
|
||||||
|
sink.GetStatus().QueueDepth.ShouldBe(live, "GetStatus must agree with a fresh COUNT(*)");
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>Insert a queue row whose PayloadJson cannot deserialize into an AlarmHistorianEvent.</summary>
|
/// <summary>Insert a queue row whose PayloadJson cannot deserialize into an AlarmHistorianEvent.</summary>
|
||||||
private void InsertCorruptRow(string alarmId)
|
private void InsertCorruptRow(string alarmId)
|
||||||
{
|
{
|
||||||
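Review note: the concurrency test above relies on `Interlocked` updates never losing an increment or decrement. A minimal standalone sketch of that property, assuming hypothetical `LiveCounter` and `CounterStress` types that are not part of the test suite:

```csharp
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical counter wrapper; every update goes through Interlocked.
public sealed class LiveCounter
{
    private long _value;
    public long Value => Interlocked.Read(ref _value);
    public void Add(long delta) => Interlocked.Add(ref _value, delta);
}

public static class CounterStress
{
    // Runs `producers` tasks that each add 1 `perTask` times and `consumers`
    // tasks that each subtract 1 `perTask` times, then returns the final value.
    public static long Run(int producers, int consumers, int perTask)
    {
        var counter = new LiveCounter();
        var tasks = Enumerable.Range(0, producers)
            .Select(_ => Task.Run(() => { for (var i = 0; i < perTask; i++) counter.Add(1); }))
            .Concat(Enumerable.Range(0, consumers)
                .Select(_ => Task.Run(() => { for (var i = 0; i < perTask; i++) counter.Add(-1); })))
            .ToArray();
        Task.WaitAll(tasks);
        return counter.Value;
    }
}
```

The final value is deterministic, but the counter can dip below zero mid-run when consumers get ahead of producers. That transient state is presumably why `GetStatus` clamps a negative reading to zero.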