From fcb38014159b82eb04e37e2f2196b9726d878d80 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 16 Jun 2026 05:25:43 -0400 Subject: [PATCH] fix(historian): dead-letter poison events after maxAttempts (finding 002) --- .../SqliteStoreAndForwardSink.cs | 32 ++++++++++++++++--- .../Historian/AlarmHistorianOptions.cs | 6 ++++ .../ServiceCollectionExtensions.cs | 3 +- .../SqliteStoreAndForwardSinkTests.cs | 32 +++++++++++++++++++ 4 files changed, 67 insertions(+), 6 deletions(-) diff --git a/src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs b/src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs index 570ded36..f00ea542 100644 --- a/src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs +++ b/src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs @@ -51,6 +51,9 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable public const long DefaultCapacity = 1_000_000; public static readonly TimeSpan DefaultDeadLetterRetention = TimeSpan.FromDays(30); + /// Default max delivery attempts before a perpetually-retrying (poison) row is dead-lettered. + public const int DefaultMaxAttempts = 10; + private static readonly TimeSpan[] BackoffLadder = [ TimeSpan.FromSeconds(1), @@ -66,6 +69,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable private readonly int _batchSize; private readonly long _capacity; private readonly TimeSpan _deadLetterRetention; + private readonly int _maxAttempts; private readonly Func _clock; private readonly SemaphoreSlim _drainGate = new(1, 1); @@ -115,6 +119,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable /// The maximum number of rows to forward in a single batch. Defaults to 100. /// The maximum queue capacity before evicting oldest rows. Defaults to 1,000,000. /// The timespan to retain dead-lettered rows before purging. Defaults to 30 days. + /// The maximum number of delivery attempts before a perpetually-retrying (poison) row is dead-lettered. Defaults to 10. /// Optional clock function for testing; defaults to . public SqliteStoreAndForwardSink( string databasePath, @@ -123,6 +128,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable int batchSize = 100, long capacity = DefaultCapacity, TimeSpan? deadLetterRetention = null, + int maxAttempts = DefaultMaxAttempts, Func? clock = null) { if (string.IsNullOrWhiteSpace(databasePath)) @@ -132,6 +138,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable _batchSize = batchSize > 0 ? batchSize : throw new ArgumentOutOfRangeException(nameof(batchSize)); _capacity = capacity > 0 ? capacity : throw new ArgumentOutOfRangeException(nameof(capacity)); _deadLetterRetention = deadLetterRetention ?? DefaultDeadLetterRetention; + _maxAttempts = maxAttempts > 0 ? maxAttempts : throw new ArgumentOutOfRangeException(nameof(maxAttempts)); _clock = clock ?? (() => DateTime.UtcNow); // DefaultTimeout gives ADO.NET command-level retry; the PRAGMA busy_timeout // applied in OpenConnection backs it with SQLite's own busy-handler so an @@ -457,14 +464,28 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable rowsLeavingQueue++; break; case HistorianWriteOutcome.RetryPlease: - BumpAttempt(conn, tx, rowId, "retry-please"); + // finding 002: cap retries so a perpetually-RetryPlease (poison) + // row cannot retry forever at the 60s backoff floor. The incoming + // AttemptCount is the count BEFORE this attempt; +1 accounts for the + // bump this drain represents. At the cap, dead-letter instead of + // bumping — and count it as leaving the live queue like PermanentFail. + if (liveRows[i].AttemptCount + 1 >= _maxAttempts) + { + DeadLetterRow(conn, tx, rowId, $"max attempts ({_maxAttempts}) exceeded"); + rowsLeavingQueue++; + } + else + { + BumpAttempt(conn, tx, rowId, "retry-please"); + } break; } } tx.Commit(); } // Ack-deleted + PermanentFail-dead-lettered rows both leave the - // non-dead-lettered queue — keep the counter aligned (Core.AlarmHistorian-008). + // non-dead-lettered queue, as do RetryPlease rows that hit the max-attempts + // cap (finding 002) — keep the counter aligned (Core.AlarmHistorian-008). if (rowsLeavingQueue > 0) Interlocked.Add(ref _queuedRowCount, -rowsLeavingQueue); @@ -552,14 +573,14 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable /// the always stays bound to its own row so outcomes can /// never be mapped to the wrong row. /// - private readonly record struct QueueRow(long RowId, AlarmHistorianEvent? Event); + private readonly record struct QueueRow(long RowId, AlarmHistorianEvent? Event, long AttemptCount); private List ReadBatch(SqliteConnection conn) { var rows = new List(); using var cmd = conn.CreateCommand(); cmd.CommandText = """ - SELECT RowId, PayloadJson FROM Queue + SELECT RowId, PayloadJson, AttemptCount FROM Queue WHERE DeadLettered = 0 ORDER BY RowId ASC LIMIT $limit @@ -570,6 +591,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable { var rowId = reader.GetInt64(0); var payload = reader.GetString(1); + var attemptCount = reader.GetInt64(2); AlarmHistorianEvent? evt; try { @@ -580,7 +602,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable // Malformed JSON — carry a null event so the caller dead-letters this row. evt = null; } - rows.Add(new QueueRow(rowId, evt)); + rows.Add(new QueueRow(rowId, evt, attemptCount)); } return rows; } diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/AlarmHistorianOptions.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/AlarmHistorianOptions.cs index 5bdb8096..c01fc059 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/AlarmHistorianOptions.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/AlarmHistorianOptions.cs @@ -53,6 +53,10 @@ public sealed class AlarmHistorianOptions /// Days to retain dead-lettered rows before purge. Defaults to 30. public int DeadLetterRetentionDays { get; init; } = 30; + /// Maximum delivery attempts before a perpetually-retrying (poison) row is dead-lettered. + /// Defaults to 10 (matches SqliteStoreAndForwardSink's DefaultMaxAttempts). + public int MaxAttempts { get; init; } = SqliteStoreAndForwardSink.DefaultMaxAttempts; + /// Returns operator-facing misconfiguration warnings for an Enabled historian /// (empty when disabled or correctly configured). Pure — the registration logs each entry. /// Zero or more human-readable warning messages. @@ -70,6 +74,8 @@ public sealed class AlarmHistorianOptions warnings.Add($"AlarmHistorian:Capacity is {Capacity} — must be > 0; the sink constructor will throw at startup."); if (DeadLetterRetentionDays <= 0) warnings.Add($"AlarmHistorian:DeadLetterRetentionDays is {DeadLetterRetentionDays} — must be > 0; dead-lettered rows would be purged on every drain tick."); + if (MaxAttempts <= 0) + warnings.Add($"AlarmHistorian:MaxAttempts is {MaxAttempts} — must be > 0; the sink constructor will throw at startup."); return warnings; } } diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs index c97131ac..c0604569 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs @@ -93,7 +93,8 @@ public static class ServiceCollectionExtensions Serilog.Log.Logger.ForContext(), batchSize: opts.BatchSize, capacity: opts.Capacity, - deadLetterRetention: TimeSpan.FromDays(opts.DeadLetterRetentionDays)); + deadLetterRetention: TimeSpan.FromDays(opts.DeadLetterRetentionDays), + maxAttempts: opts.MaxAttempts); sink.StartDrainLoop(TimeSpan.FromSeconds(opts.DrainIntervalSeconds)); return sink; }); diff --git a/tests/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/SqliteStoreAndForwardSinkTests.cs b/tests/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/SqliteStoreAndForwardSinkTests.cs index 0ae583d6..d0348822 100644 --- a/tests/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/SqliteStoreAndForwardSinkTests.cs +++ b/tests/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/SqliteStoreAndForwardSinkTests.cs @@ -268,6 +268,37 @@ public sealed class SqliteStoreAndForwardSinkTests : IDisposable sink.CurrentBackoff.ShouldBe(TimeSpan.FromSeconds(60)); } + /// + /// Regression for finding 002: a permanently-malformed (poison) event that the + /// writer can only ever map to + /// must NOT retry forever. After maxAttempts retry-please drains the row + /// is dead-lettered so the queue head can never stall on it indefinitely. + /// + [Fact] + public async Task RetryPlease_dead_letters_row_after_MaxAttempts() + { + var writer = new FakeWriter { DefaultOutcome = HistorianWriteOutcome.RetryPlease }; + using var sink = new SqliteStoreAndForwardSink( + _dbPath, writer, _log, maxAttempts: 3); + + await sink.EnqueueAsync(Event("poison"), CancellationToken.None); + + // Drain 1 + 2: still retrying — the row stays queued, not yet dead-lettered. + await sink.DrainOnceAsync(CancellationToken.None); + sink.GetStatus().QueueDepth.ShouldBe(1, "after 1 attempt the row is still queued"); + sink.GetStatus().DeadLetterDepth.ShouldBe(0, "not yet dead-lettered after 1 attempt"); + + await sink.DrainOnceAsync(CancellationToken.None); + sink.GetStatus().QueueDepth.ShouldBe(1, "after 2 attempts the row is still queued"); + sink.GetStatus().DeadLetterDepth.ShouldBe(0, "not yet dead-lettered after 2 attempts"); + + // Drain 3: the 3rd attempt hits the cap — dead-letter it. + await sink.DrainOnceAsync(CancellationToken.None); + var status = sink.GetStatus(); + status.QueueDepth.ShouldBe(0, "row left the live queue once max attempts exceeded"); + status.DeadLetterDepth.ShouldBe(1, "poison row dead-lettered at the max-attempts cap"); + } + /// Verifies that NullAlarmHistorianSink reports disabled status. [Fact] public void NullAlarmHistorianSink_reports_disabled_status() @@ -295,6 +326,7 @@ public sealed class SqliteStoreAndForwardSinkTests : IDisposable Should.Throw(() => new SqliteStoreAndForwardSink(_dbPath, w, null!)); Should.Throw(() => new SqliteStoreAndForwardSink(_dbPath, w, _log, batchSize: 0)); Should.Throw(() => new SqliteStoreAndForwardSink(_dbPath, w, _log, capacity: 0)); + Should.Throw(() => new SqliteStoreAndForwardSink(_dbPath, w, _log, maxAttempts: 0)); } /// Verifies that a disposed sink rejects enqueue operations.