diff --git a/code-reviews/Core.AlarmHistorian/findings.md b/code-reviews/Core.AlarmHistorian/findings.md index e5dd30d..8aeda09 100644 --- a/code-reviews/Core.AlarmHistorian/findings.md +++ b/code-reviews/Core.AlarmHistorian/findings.md @@ -7,7 +7,7 @@ | Review date | 2026-05-22 | | Commit reviewed | `76d35d1` | | Status | Reviewed | -| Open findings | 11 | +| Open findings | 10 | ## Checklist coverage @@ -33,13 +33,13 @@ | Severity | Critical | | Category | Correctness & logic bugs | | Location | `src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs:255-278` | -| Status | Open | +| Status | Resolved | **Description:** `ReadBatch` builds two parallel lists, `rowIds` and `events`, that `DrainOnceAsync` later indexes together (`rowIds[i]` paired with `outcomes[i]`, where `outcomes` is 1:1 with `events`). But `rowIds.Add(reader.GetInt64(0))` runs unconditionally for every row, while `events.Add(evt)` is guarded by `if (evt is not null)`. If `JsonSerializer.Deserialize` returns `null` for any row (corrupt or empty payload), `rowIds` gains an entry but `events` does not. The writer then returns `outcomes.Count == events.Count`, which passes the `outcomes.Count != events.Count` guard, and the per-row loop applies each outcome to `rowIds[i]` — every row from the skipped index onward is mapped to the wrong event's outcome. An `Ack` can delete a row whose event was never sent to the historian (silent alarm-event data loss), and a `PermanentFail` can dead-letter an unrelated good row. The corrupt row itself is never advanced and is re-read on every drain forever, permanently stalling the queue head. **Recommendation:** Keep `rowIds` and `events` strictly aligned. Either skip the `rowId` when deserialization returns `null`, or — better — treat a `null`/failed deserialization as an immediate dead-letter for that specific `RowId` (it can never succeed) and exclude it from the batch passed to the writer. 
Carry the `rowId` inside a single list of `(long RowId, AlarmHistorianEvent Event)` tuples so the two can never drift. -**Resolution:** _(open)_ +**Resolution:** Resolved 2026-05-22 — `ReadBatch` now returns a single list of `QueueRow(long RowId, AlarmHistorianEvent? Event)` records so a rowId can never drift from its event; `DrainOnceAsync` immediately dead-letters rows whose payload is null/un-deserializable (also catching `JsonException`) and forwards only well-formed events to the writer, mapping outcomes by `liveRows[i].RowId`. Regression tests `Drain_with_corrupt_payload_row_deadletters_it_and_keeps_good_rows_aligned` and `Drain_with_corrupt_head_row_does_not_stall_queue` added. ### Core.AlarmHistorian-002 diff --git a/src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs b/src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs index 1b56ba5..77481cc 100644 --- a/src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs +++ b/src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs @@ -142,8 +142,34 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable _lastDrainUtc = _clock(); PurgeAgedDeadLetters(); - var (rowIds, events) = ReadBatch(); - if (rowIds.Count == 0) + var batch = ReadBatch(); + if (batch.Count == 0) + { + _drainState = HistorianDrainState.Idle; + return; + } + + // A null/un-deserializable payload can never succeed — dead-letter it + // immediately for its own RowId so it cannot stall the queue head, and + // exclude it from the batch handed to the writer. 
+ var corruptRowIds = batch.Where(r => r.Event is null).Select(r => r.RowId).ToList(); + var liveRows = batch.Where(r => r.Event is not null).ToList(); + var events = liveRows.Select(r => r.Event!).ToList(); + + if (corruptRowIds.Count > 0) + { + using var corruptConn = new SqliteConnection(_connectionString); + corruptConn.Open(); + using var corruptTx = corruptConn.BeginTransaction(); + foreach (var rowId in corruptRowIds) + DeadLetterRow(corruptConn, corruptTx, rowId, $"corrupt payload at {_clock():O}"); + corruptTx.Commit(); + _logger.Warning( + "Dead-lettered {Count} historian queue row(s) with un-deserializable payload", + corruptRowIds.Count); + } + + if (events.Count == 0) { _drainState = HistorianDrainState.Idle; return; @@ -179,7 +205,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable for (var i = 0; i < outcomes.Count; i++) { var outcome = outcomes[i]; - var rowId = rowIds[i]; + var rowId = liveRows[i].RowId; switch (outcome) { case HistorianWriteOutcome.Ack: @@ -252,10 +278,17 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable return cmd.ExecuteNonQuery(); } - private (List<long> rowIds, List<AlarmHistorianEvent> events) ReadBatch() + /// <summary> + /// One queued row paired with its deserialized event. <see cref="Event"/> is + /// null when the row's PayloadJson is corrupt or un-deserializable — + /// the <see cref="RowId"/> always stays bound to its own row so outcomes can + /// never be mapped to the wrong row. + /// </summary> + private readonly record struct QueueRow(long RowId, AlarmHistorianEvent? 
Event); + + private List<QueueRow> ReadBatch() { - var rowIds = new List<long>(); - var events = new List<AlarmHistorianEvent>(); + var rows = new List<QueueRow>(); using var conn = new SqliteConnection(_connectionString); conn.Open(); using var cmd = conn.CreateCommand(); @@ -269,12 +302,21 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable using var reader = cmd.ExecuteReader(); while (reader.Read()) { - rowIds.Add(reader.GetInt64(0)); + var rowId = reader.GetInt64(0); var payload = reader.GetString(1); - var evt = JsonSerializer.Deserialize<AlarmHistorianEvent>(payload); - if (evt is not null) events.Add(evt); + AlarmHistorianEvent? evt; + try + { + evt = JsonSerializer.Deserialize<AlarmHistorianEvent>(payload); + } + catch (JsonException) + { + // Malformed JSON — carry a null event so the caller dead-letters this row. + evt = null; + } + rows.Add(new QueueRow(rowId, evt)); } - return (rowIds, events); + return rows; } private static void DeleteRow(SqliteConnection conn, SqliteTransaction tx, long rowId) diff --git a/tests/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/SqliteStoreAndForwardSinkTests.cs b/tests/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/SqliteStoreAndForwardSinkTests.cs index 7ed87f1..79aa6a9 100644 --- a/tests/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/SqliteStoreAndForwardSinkTests.cs +++ b/tests/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/SqliteStoreAndForwardSinkTests.cs @@ -1,3 +1,4 @@ +using Microsoft.Data.Sqlite; using Serilog; using Serilog.Core; using Serilog.Events; @@ -283,4 +284,78 @@ public sealed class SqliteStoreAndForwardSinkTests : IDisposable await Should.ThrowAsync( () => sink.EnqueueAsync(Event("A1"), CancellationToken.None)); } + + /// <summary> + /// Regression for Core.AlarmHistorian-001: a corrupt / un-deserializable + /// PayloadJson row sitting between good rows must not desync the rowId/event + /// lists. 
The corrupt row is dead-lettered (so it cannot stall the queue head + /// forever), and every good row's outcome is applied to the CORRECT RowId — + /// no good event is silently lost. + /// </summary> + [Fact] + public async Task Drain_with_corrupt_payload_row_deadletters_it_and_keeps_good_rows_aligned() + { + var writer = new FakeWriter(); + using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log); + + // Row 1: good. Row 2: corrupt JSON (inserted directly). Row 3: good. + await sink.EnqueueAsync(Event("good-1"), CancellationToken.None); + InsertCorruptRow("corrupt"); + await sink.EnqueueAsync(Event("good-2"), CancellationToken.None); + + await sink.DrainOnceAsync(CancellationToken.None); + + // The writer must only see the two well-formed events, in order. + writer.Batches.Count.ShouldBe(1); + writer.Batches[0].Select(e => e.AlarmId).ShouldBe(["good-1", "good-2"]); + + var status = sink.GetStatus(); + // Both good rows acked + removed; the corrupt row is dead-lettered, not lost. + status.QueueDepth.ShouldBe(0, "both good rows acked and removed"); + status.DeadLetterDepth.ShouldBe(1, "corrupt row dead-lettered, not silently dropped"); + + // A second drain must be a clean no-op — the corrupt row no longer stalls the head. + await sink.DrainOnceAsync(CancellationToken.None); + writer.Batches.Count.ShouldBe(1, "no further batches — corrupt row no longer re-read"); + } + + /// <summary> + /// A corrupt row at the very head of the queue must be dead-lettered and not + /// prevent the good rows behind it from draining. 
+ /// </summary> + [Fact] + public async Task Drain_with_corrupt_head_row_does_not_stall_queue() + { + var writer = new FakeWriter(); + using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log); + + InsertCorruptRow("corrupt-head"); + await sink.EnqueueAsync(Event("good-1"), CancellationToken.None); + + await sink.DrainOnceAsync(CancellationToken.None); + + writer.Batches.Count.ShouldBe(1); + writer.Batches[0].Select(e => e.AlarmId).ShouldBe(["good-1"]); + + var status = sink.GetStatus(); + status.QueueDepth.ShouldBe(0); + status.DeadLetterDepth.ShouldBe(1); + } + + /// <summary>Insert a queue row whose PayloadJson cannot deserialize into an AlarmHistorianEvent.</summary> + private void InsertCorruptRow(string alarmId) + { + using var conn = new SqliteConnection($"Data Source={_dbPath}"); + conn.Open(); + using var cmd = conn.CreateCommand(); + cmd.CommandText = """ + INSERT INTO Queue (AlarmId, EnqueuedUtc, PayloadJson, AttemptCount) + VALUES ($alarmId, $enqueued, $payload, 0); + """; + cmd.Parameters.AddWithValue("$alarmId", alarmId); + cmd.Parameters.AddWithValue("$enqueued", DateTime.UtcNow.ToString("O")); + // JSON literal "null" round-trips through Deserialize as a null reference. + cmd.Parameters.AddWithValue("$payload", "null"); + cmd.ExecuteNonQuery(); + } }