fix(alarm-historian): keep queue rows aligned to events on drain (Core.AlarmHistorian-001)
ReadBatch built parallel rowIds / events lists: rowIds.Add ran for every row but events.Add was guarded by `if (evt is not null)`. A corrupt / null-deserializing payload desynced the lists, so DrainOnceAsync applied each outcome to the wrong RowId — an Ack could delete an un-sent event (silent alarm-event data loss) and the corrupt row stalled the queue head forever. ReadBatch now returns a single list of QueueRow(long RowId, AlarmHistorianEvent? Event) records so a rowId can never drift from its event; deserialization is wrapped to yield null on JsonException. DrainOnceAsync immediately dead-letters rows whose payload is null/un-deserializable and forwards only well-formed events to the writer, mapping outcomes by RowId. Regression tests cover a corrupt row mid-batch and at the queue head. Core.AlarmHistorian suite: 16/16 pass. Resolves code-review finding Core.AlarmHistorian-001 (Critical). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -7,7 +7,7 @@
|
||||
| Review date | 2026-05-22 |
|
||||
| Commit reviewed | `76d35d1` |
|
||||
| Status | Reviewed |
|
||||
| Open findings | 11 |
|
||||
| Open findings | 10 |
|
||||
|
||||
## Checklist coverage
|
||||
|
||||
@@ -33,13 +33,13 @@
|
||||
| Severity | Critical |
|
||||
| Category | Correctness & logic bugs |
|
||||
| Location | `src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs:255-278` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** `ReadBatch` builds two parallel lists, `rowIds` and `events`, that `DrainOnceAsync` later indexes together (`rowIds[i]` paired with `outcomes[i]`, where `outcomes` is 1:1 with `events`). But `rowIds.Add(reader.GetInt64(0))` runs unconditionally for every row, while `events.Add(evt)` is guarded by `if (evt is not null)`. If `JsonSerializer.Deserialize<AlarmHistorianEvent>` returns `null` for any row (corrupt or empty payload), `rowIds` gains an entry but `events` does not. The writer then returns `outcomes.Count == events.Count`, which passes the `outcomes.Count != events.Count` guard, and the per-row loop applies each outcome to `rowIds[i]` — every row from the skipped index onward is mapped to the wrong event's outcome. An `Ack` can delete a row whose event was never sent to the historian (silent alarm-event data loss), and a `PermanentFail` can dead-letter an unrelated good row. The corrupt row itself is never advanced and is re-read on every drain forever, permanently stalling the queue head.
|
||||
|
||||
**Recommendation:** Keep `rowIds` and `events` strictly aligned. Either skip the `rowId` when deserialization returns `null`, or — better — treat a `null`/failed deserialization as an immediate dead-letter for that specific `RowId` (it can never succeed) and exclude it from the batch passed to the writer. Carry the `rowId` inside a single list of `(long RowId, AlarmHistorianEvent Event)` tuples so the two can never drift.
|
||||
|
||||
**Resolution:** _(open)_
|
||||
**Resolution:** Resolved 2026-05-22 — `ReadBatch` now returns a single list of `QueueRow(long RowId, AlarmHistorianEvent? Event)` records so a rowId can never drift from its event; `DrainOnceAsync` immediately dead-letters rows whose payload is null/un-deserializable (also catching `JsonException`) and forwards only well-formed events to the writer, mapping outcomes by `liveRows[i].RowId`. Regression tests `Drain_with_corrupt_payload_row_deadletters_it_and_keeps_good_rows_aligned` and `Drain_with_corrupt_head_row_does_not_stall_queue` added.
|
||||
|
||||
### Core.AlarmHistorian-002
|
||||
|
||||
|
||||
@@ -142,8 +142,34 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
|
||||
_lastDrainUtc = _clock();
|
||||
|
||||
PurgeAgedDeadLetters();
|
||||
var (rowIds, events) = ReadBatch();
|
||||
if (rowIds.Count == 0)
|
||||
var batch = ReadBatch();
|
||||
if (batch.Count == 0)
|
||||
{
|
||||
_drainState = HistorianDrainState.Idle;
|
||||
return;
|
||||
}
|
||||
|
||||
// A null/un-deserializable payload can never succeed — dead-letter it
|
||||
// immediately for its own RowId so it cannot stall the queue head, and
|
||||
// exclude it from the batch handed to the writer.
|
||||
var corruptRowIds = batch.Where(r => r.Event is null).Select(r => r.RowId).ToList();
|
||||
var liveRows = batch.Where(r => r.Event is not null).ToList();
|
||||
var events = liveRows.Select(r => r.Event!).ToList();
|
||||
|
||||
if (corruptRowIds.Count > 0)
|
||||
{
|
||||
using var corruptConn = new SqliteConnection(_connectionString);
|
||||
corruptConn.Open();
|
||||
using var corruptTx = corruptConn.BeginTransaction();
|
||||
foreach (var rowId in corruptRowIds)
|
||||
DeadLetterRow(corruptConn, corruptTx, rowId, $"corrupt payload at {_clock():O}");
|
||||
corruptTx.Commit();
|
||||
_logger.Warning(
|
||||
"Dead-lettered {Count} historian queue row(s) with un-deserializable payload",
|
||||
corruptRowIds.Count);
|
||||
}
|
||||
|
||||
if (events.Count == 0)
|
||||
{
|
||||
_drainState = HistorianDrainState.Idle;
|
||||
return;
|
||||
@@ -179,7 +205,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
|
||||
for (var i = 0; i < outcomes.Count; i++)
|
||||
{
|
||||
var outcome = outcomes[i];
|
||||
var rowId = rowIds[i];
|
||||
var rowId = liveRows[i].RowId;
|
||||
switch (outcome)
|
||||
{
|
||||
case HistorianWriteOutcome.Ack:
|
||||
@@ -252,10 +278,17 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
|
||||
return cmd.ExecuteNonQuery();
|
||||
}
|
||||
|
||||
private (List<long> rowIds, List<AlarmHistorianEvent> events) ReadBatch()
|
||||
/// <summary>
|
||||
/// One queued row paired with its deserialized event. <see cref="Event"/> is
|
||||
/// <c>null</c> when the row's <c>PayloadJson</c> is corrupt or un-deserializable —
|
||||
/// the <see cref="RowId"/> always stays bound to its own row so outcomes can
|
||||
/// never be mapped to the wrong row.
|
||||
/// </summary>
|
||||
private readonly record struct QueueRow(long RowId, AlarmHistorianEvent? Event);
|
||||
|
||||
private List<QueueRow> ReadBatch()
|
||||
{
|
||||
var rowIds = new List<long>();
|
||||
var events = new List<AlarmHistorianEvent>();
|
||||
var rows = new List<QueueRow>();
|
||||
using var conn = new SqliteConnection(_connectionString);
|
||||
conn.Open();
|
||||
using var cmd = conn.CreateCommand();
|
||||
@@ -269,12 +302,21 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
|
||||
using var reader = cmd.ExecuteReader();
|
||||
while (reader.Read())
|
||||
{
|
||||
rowIds.Add(reader.GetInt64(0));
|
||||
var rowId = reader.GetInt64(0);
|
||||
var payload = reader.GetString(1);
|
||||
var evt = JsonSerializer.Deserialize<AlarmHistorianEvent>(payload);
|
||||
if (evt is not null) events.Add(evt);
|
||||
AlarmHistorianEvent? evt;
|
||||
try
|
||||
{
|
||||
evt = JsonSerializer.Deserialize<AlarmHistorianEvent>(payload);
|
||||
}
|
||||
catch (JsonException)
|
||||
{
|
||||
// Malformed JSON — carry a null event so the caller dead-letters this row.
|
||||
evt = null;
|
||||
}
|
||||
rows.Add(new QueueRow(rowId, evt));
|
||||
}
|
||||
return (rowIds, events);
|
||||
return rows;
|
||||
}
|
||||
|
||||
private static void DeleteRow(SqliteConnection conn, SqliteTransaction tx, long rowId)
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
using Microsoft.Data.Sqlite;
|
||||
using Serilog;
|
||||
using Serilog.Core;
|
||||
using Serilog.Events;
|
||||
@@ -283,4 +284,78 @@ public sealed class SqliteStoreAndForwardSinkTests : IDisposable
|
||||
await Should.ThrowAsync<ObjectDisposedException>(
|
||||
() => sink.EnqueueAsync(Event("A1"), CancellationToken.None));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Regression for Core.AlarmHistorian-001: a corrupt / un-deserializable
|
||||
/// PayloadJson row sitting between good rows must not desync the rowId/event
|
||||
/// lists. The corrupt row is dead-lettered (so it cannot stall the queue head
|
||||
/// forever), and every good row's outcome is applied to the CORRECT RowId —
|
||||
/// no good event is silently lost.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task Drain_with_corrupt_payload_row_deadletters_it_and_keeps_good_rows_aligned()
|
||||
{
|
||||
var writer = new FakeWriter();
|
||||
using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
|
||||
|
||||
// Row 1: good. Row 2: corrupt JSON (inserted directly). Row 3: good.
|
||||
await sink.EnqueueAsync(Event("good-1"), CancellationToken.None);
|
||||
InsertCorruptRow("corrupt");
|
||||
await sink.EnqueueAsync(Event("good-2"), CancellationToken.None);
|
||||
|
||||
await sink.DrainOnceAsync(CancellationToken.None);
|
||||
|
||||
// The writer must only see the two well-formed events, in order.
|
||||
writer.Batches.Count.ShouldBe(1);
|
||||
writer.Batches[0].Select(e => e.AlarmId).ShouldBe(["good-1", "good-2"]);
|
||||
|
||||
var status = sink.GetStatus();
|
||||
// Both good rows acked + removed; the corrupt row is dead-lettered, not lost.
|
||||
status.QueueDepth.ShouldBe(0, "both good rows acked and removed");
|
||||
status.DeadLetterDepth.ShouldBe(1, "corrupt row dead-lettered, not silently dropped");
|
||||
|
||||
// A second drain must be a clean no-op — the corrupt row no longer stalls the head.
|
||||
await sink.DrainOnceAsync(CancellationToken.None);
|
||||
writer.Batches.Count.ShouldBe(1, "no further batches — corrupt row no longer re-read");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// A corrupt row at the very head of the queue must be dead-lettered and not
|
||||
/// prevent the good rows behind it from draining.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task Drain_with_corrupt_head_row_does_not_stall_queue()
|
||||
{
|
||||
var writer = new FakeWriter();
|
||||
using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
|
||||
|
||||
InsertCorruptRow("corrupt-head");
|
||||
await sink.EnqueueAsync(Event("good-1"), CancellationToken.None);
|
||||
|
||||
await sink.DrainOnceAsync(CancellationToken.None);
|
||||
|
||||
writer.Batches.Count.ShouldBe(1);
|
||||
writer.Batches[0].Select(e => e.AlarmId).ShouldBe(["good-1"]);
|
||||
|
||||
var status = sink.GetStatus();
|
||||
status.QueueDepth.ShouldBe(0);
|
||||
status.DeadLetterDepth.ShouldBe(1);
|
||||
}
|
||||
|
||||
/// <summary>Insert a queue row whose PayloadJson cannot deserialize into an AlarmHistorianEvent.</summary>
|
||||
private void InsertCorruptRow(string alarmId)
|
||||
{
|
||||
using var conn = new SqliteConnection($"Data Source={_dbPath}");
|
||||
conn.Open();
|
||||
using var cmd = conn.CreateCommand();
|
||||
cmd.CommandText = """
|
||||
INSERT INTO Queue (AlarmId, EnqueuedUtc, PayloadJson, AttemptCount)
|
||||
VALUES ($alarmId, $enqueued, $payload, 0);
|
||||
""";
|
||||
cmd.Parameters.AddWithValue("$alarmId", alarmId);
|
||||
cmd.Parameters.AddWithValue("$enqueued", DateTime.UtcNow.ToString("O"));
|
||||
// JSON literal "null" round-trips through Deserialize<T> as a null reference.
|
||||
cmd.Parameters.AddWithValue("$payload", "null");
|
||||
cmd.ExecuteNonQuery();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user