fix(historian): dead-letter poison events after maxAttempts (finding 002)

This commit is contained in:
Joseph Doherty
2026-06-16 05:25:43 -04:00
parent 5e27b5f708
commit fcb3801415
4 changed files with 67 additions and 6 deletions
@@ -51,6 +51,9 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
public const long DefaultCapacity = 1_000_000; public const long DefaultCapacity = 1_000_000;
public static readonly TimeSpan DefaultDeadLetterRetention = TimeSpan.FromDays(30); public static readonly TimeSpan DefaultDeadLetterRetention = TimeSpan.FromDays(30);
/// <summary>Default max delivery attempts before a perpetually-retrying (poison) row is dead-lettered.</summary>
public const int DefaultMaxAttempts = 10;
private static readonly TimeSpan[] BackoffLadder = private static readonly TimeSpan[] BackoffLadder =
[ [
TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1),
@@ -66,6 +69,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
private readonly int _batchSize; private readonly int _batchSize;
private readonly long _capacity; private readonly long _capacity;
private readonly TimeSpan _deadLetterRetention; private readonly TimeSpan _deadLetterRetention;
private readonly int _maxAttempts;
private readonly Func<DateTime> _clock; private readonly Func<DateTime> _clock;
private readonly SemaphoreSlim _drainGate = new(1, 1); private readonly SemaphoreSlim _drainGate = new(1, 1);
@@ -115,6 +119,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
/// <param name="batchSize">The maximum number of rows to forward in a single batch. Defaults to 100.</param> /// <param name="batchSize">The maximum number of rows to forward in a single batch. Defaults to 100.</param>
/// <param name="capacity">The maximum queue capacity before evicting oldest rows. Defaults to 1,000,000.</param> /// <param name="capacity">The maximum queue capacity before evicting oldest rows. Defaults to 1,000,000.</param>
/// <param name="deadLetterRetention">The timespan to retain dead-lettered rows before purging. Defaults to 30 days.</param> /// <param name="deadLetterRetention">The timespan to retain dead-lettered rows before purging. Defaults to 30 days.</param>
/// <param name="maxAttempts">The maximum number of delivery attempts before a perpetually-retrying (poison) row is dead-lettered. Defaults to 10.</param>
/// <param name="clock">Optional clock function for testing; defaults to <see cref="DateTime.UtcNow"/>.</param> /// <param name="clock">Optional clock function for testing; defaults to <see cref="DateTime.UtcNow"/>.</param>
public SqliteStoreAndForwardSink( public SqliteStoreAndForwardSink(
string databasePath, string databasePath,
@@ -123,6 +128,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
int batchSize = 100, int batchSize = 100,
long capacity = DefaultCapacity, long capacity = DefaultCapacity,
TimeSpan? deadLetterRetention = null, TimeSpan? deadLetterRetention = null,
int maxAttempts = DefaultMaxAttempts,
Func<DateTime>? clock = null) Func<DateTime>? clock = null)
{ {
if (string.IsNullOrWhiteSpace(databasePath)) if (string.IsNullOrWhiteSpace(databasePath))
@@ -132,6 +138,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
_batchSize = batchSize > 0 ? batchSize : throw new ArgumentOutOfRangeException(nameof(batchSize)); _batchSize = batchSize > 0 ? batchSize : throw new ArgumentOutOfRangeException(nameof(batchSize));
_capacity = capacity > 0 ? capacity : throw new ArgumentOutOfRangeException(nameof(capacity)); _capacity = capacity > 0 ? capacity : throw new ArgumentOutOfRangeException(nameof(capacity));
_deadLetterRetention = deadLetterRetention ?? DefaultDeadLetterRetention; _deadLetterRetention = deadLetterRetention ?? DefaultDeadLetterRetention;
_maxAttempts = maxAttempts > 0 ? maxAttempts : throw new ArgumentOutOfRangeException(nameof(maxAttempts));
_clock = clock ?? (() => DateTime.UtcNow); _clock = clock ?? (() => DateTime.UtcNow);
// DefaultTimeout gives ADO.NET command-level retry; the PRAGMA busy_timeout // DefaultTimeout gives ADO.NET command-level retry; the PRAGMA busy_timeout
// applied in OpenConnection backs it with SQLite's own busy-handler so an // applied in OpenConnection backs it with SQLite's own busy-handler so an
@@ -457,14 +464,28 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
rowsLeavingQueue++; rowsLeavingQueue++;
break; break;
case HistorianWriteOutcome.RetryPlease: case HistorianWriteOutcome.RetryPlease:
BumpAttempt(conn, tx, rowId, "retry-please"); // finding 002: cap retries so a perpetually-RetryPlease (poison)
// row cannot retry forever at the 60s backoff floor. The incoming
// AttemptCount is the count BEFORE this attempt; +1 accounts for the
// bump this drain represents. At the cap, dead-letter instead of
// bumping — and count it as leaving the live queue like PermanentFail.
if (liveRows[i].AttemptCount + 1 >= _maxAttempts)
{
DeadLetterRow(conn, tx, rowId, $"max attempts ({_maxAttempts}) exceeded");
rowsLeavingQueue++;
}
else
{
BumpAttempt(conn, tx, rowId, "retry-please");
}
break; break;
} }
} }
tx.Commit(); tx.Commit();
} }
// Ack-deleted + PermanentFail-dead-lettered rows both leave the // Ack-deleted + PermanentFail-dead-lettered rows both leave the
// non-dead-lettered queue — keep the counter aligned (Core.AlarmHistorian-008). // non-dead-lettered queue, as do RetryPlease rows that hit the max-attempts
// cap (finding 002) — keep the counter aligned (Core.AlarmHistorian-008).
if (rowsLeavingQueue > 0) if (rowsLeavingQueue > 0)
Interlocked.Add(ref _queuedRowCount, -rowsLeavingQueue); Interlocked.Add(ref _queuedRowCount, -rowsLeavingQueue);
@@ -552,14 +573,14 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
/// the <see cref="RowId"/> always stays bound to its own row so outcomes can /// the <see cref="RowId"/> always stays bound to its own row so outcomes can
/// never be mapped to the wrong row. /// never be mapped to the wrong row.
/// </summary> /// </summary>
private readonly record struct QueueRow(long RowId, AlarmHistorianEvent? Event); private readonly record struct QueueRow(long RowId, AlarmHistorianEvent? Event, long AttemptCount);
private List<QueueRow> ReadBatch(SqliteConnection conn) private List<QueueRow> ReadBatch(SqliteConnection conn)
{ {
var rows = new List<QueueRow>(); var rows = new List<QueueRow>();
using var cmd = conn.CreateCommand(); using var cmd = conn.CreateCommand();
cmd.CommandText = """ cmd.CommandText = """
SELECT RowId, PayloadJson FROM Queue SELECT RowId, PayloadJson, AttemptCount FROM Queue
WHERE DeadLettered = 0 WHERE DeadLettered = 0
ORDER BY RowId ASC ORDER BY RowId ASC
LIMIT $limit LIMIT $limit
@@ -570,6 +591,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
{ {
var rowId = reader.GetInt64(0); var rowId = reader.GetInt64(0);
var payload = reader.GetString(1); var payload = reader.GetString(1);
var attemptCount = reader.GetInt64(2);
AlarmHistorianEvent? evt; AlarmHistorianEvent? evt;
try try
{ {
@@ -580,7 +602,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
// Malformed JSON — carry a null event so the caller dead-letters this row. // Malformed JSON — carry a null event so the caller dead-letters this row.
evt = null; evt = null;
} }
rows.Add(new QueueRow(rowId, evt)); rows.Add(new QueueRow(rowId, evt, attemptCount));
} }
return rows; return rows;
} }
@@ -53,6 +53,10 @@ public sealed class AlarmHistorianOptions
/// <summary>Days to retain dead-lettered rows before purge. Defaults to 30.</summary> /// <summary>Days to retain dead-lettered rows before purge. Defaults to 30.</summary>
public int DeadLetterRetentionDays { get; init; } = 30; public int DeadLetterRetentionDays { get; init; } = 30;
/// <summary>Maximum delivery attempts before a perpetually-retrying (poison) row is dead-lettered.
/// Defaults to 10 (matches <c>SqliteStoreAndForwardSink</c>'s <c>DefaultMaxAttempts</c>).</summary>
public int MaxAttempts { get; init; } = SqliteStoreAndForwardSink.DefaultMaxAttempts;
/// <summary>Returns operator-facing misconfiguration warnings for an <c>Enabled</c> historian /// <summary>Returns operator-facing misconfiguration warnings for an <c>Enabled</c> historian
/// (empty when disabled or correctly configured). Pure — the registration logs each entry.</summary> /// (empty when disabled or correctly configured). Pure — the registration logs each entry.</summary>
/// <returns>Zero or more human-readable warning messages.</returns> /// <returns>Zero or more human-readable warning messages.</returns>
@@ -70,6 +74,8 @@ public sealed class AlarmHistorianOptions
warnings.Add($"AlarmHistorian:Capacity is {Capacity} — must be > 0; the sink constructor will throw at startup."); warnings.Add($"AlarmHistorian:Capacity is {Capacity} — must be > 0; the sink constructor will throw at startup.");
if (DeadLetterRetentionDays <= 0) if (DeadLetterRetentionDays <= 0)
warnings.Add($"AlarmHistorian:DeadLetterRetentionDays is {DeadLetterRetentionDays} — must be > 0; dead-lettered rows would be purged on every drain tick."); warnings.Add($"AlarmHistorian:DeadLetterRetentionDays is {DeadLetterRetentionDays} — must be > 0; dead-lettered rows would be purged on every drain tick.");
if (MaxAttempts <= 0)
warnings.Add($"AlarmHistorian:MaxAttempts is {MaxAttempts} — must be > 0; the sink constructor will throw at startup.");
return warnings; return warnings;
} }
} }
@@ -93,7 +93,8 @@ public static class ServiceCollectionExtensions
Serilog.Log.Logger.ForContext<SqliteStoreAndForwardSink>(), Serilog.Log.Logger.ForContext<SqliteStoreAndForwardSink>(),
batchSize: opts.BatchSize, batchSize: opts.BatchSize,
capacity: opts.Capacity, capacity: opts.Capacity,
deadLetterRetention: TimeSpan.FromDays(opts.DeadLetterRetentionDays)); deadLetterRetention: TimeSpan.FromDays(opts.DeadLetterRetentionDays),
maxAttempts: opts.MaxAttempts);
sink.StartDrainLoop(TimeSpan.FromSeconds(opts.DrainIntervalSeconds)); sink.StartDrainLoop(TimeSpan.FromSeconds(opts.DrainIntervalSeconds));
return sink; return sink;
}); });
@@ -268,6 +268,37 @@ public sealed class SqliteStoreAndForwardSinkTests : IDisposable
sink.CurrentBackoff.ShouldBe(TimeSpan.FromSeconds(60)); sink.CurrentBackoff.ShouldBe(TimeSpan.FromSeconds(60));
} }
/// <summary>
/// Regression for finding 002: a permanently-malformed (poison) event that the
/// writer can only ever map to <see cref="HistorianWriteOutcome.RetryPlease"/>
/// must NOT retry forever. After <c>maxAttempts</c> retry-please drains the row
/// is dead-lettered so the queue head can never stall on it indefinitely.
/// </summary>
/// <remarks>
/// The sink is built with <c>maxAttempts: 3</c>, so the expected sequence is:
/// drains 1 and 2 leave the row queued, and drain 3 moves it to the dead-letter
/// set. Each step is checked through <c>GetStatus()</c> (<c>QueueDepth</c> and
/// <c>DeadLetterDepth</c>).
/// </remarks>
[Fact]
public async Task RetryPlease_dead_letters_row_after_MaxAttempts()
{
// Arrange: a writer whose default outcome is RetryPlease, and a sink capped at
// three attempts. (FakeWriter is not visible here — presumably it returns
// DefaultOutcome for every event; confirm against its definition.)
var writer = new FakeWriter { DefaultOutcome = HistorianWriteOutcome.RetryPlease };
using var sink = new SqliteStoreAndForwardSink(
_dbPath, writer, _log, maxAttempts: 3);
await sink.EnqueueAsync(Event("poison"), CancellationToken.None);
// Drain 1 + 2: still retrying — the row stays queued, not yet dead-lettered.
await sink.DrainOnceAsync(CancellationToken.None);
sink.GetStatus().QueueDepth.ShouldBe(1, "after 1 attempt the row is still queued");
sink.GetStatus().DeadLetterDepth.ShouldBe(0, "not yet dead-lettered after 1 attempt");
await sink.DrainOnceAsync(CancellationToken.None);
sink.GetStatus().QueueDepth.ShouldBe(1, "after 2 attempts the row is still queued");
sink.GetStatus().DeadLetterDepth.ShouldBe(0, "not yet dead-lettered after 2 attempts");
// Drain 3: the 3rd attempt hits the cap — dead-letter it.
await sink.DrainOnceAsync(CancellationToken.None);
// Assert: the row has left the live queue and is counted exactly once as
// dead-lettered.
var status = sink.GetStatus();
status.QueueDepth.ShouldBe(0, "row left the live queue once max attempts exceeded");
status.DeadLetterDepth.ShouldBe(1, "poison row dead-lettered at the max-attempts cap");
}
/// <summary>Verifies that NullAlarmHistorianSink reports disabled status.</summary> /// <summary>Verifies that NullAlarmHistorianSink reports disabled status.</summary>
[Fact] [Fact]
public void NullAlarmHistorianSink_reports_disabled_status() public void NullAlarmHistorianSink_reports_disabled_status()
@@ -295,6 +326,7 @@ public sealed class SqliteStoreAndForwardSinkTests : IDisposable
Should.Throw<ArgumentNullException>(() => new SqliteStoreAndForwardSink(_dbPath, w, null!)); Should.Throw<ArgumentNullException>(() => new SqliteStoreAndForwardSink(_dbPath, w, null!));
Should.Throw<ArgumentOutOfRangeException>(() => new SqliteStoreAndForwardSink(_dbPath, w, _log, batchSize: 0)); Should.Throw<ArgumentOutOfRangeException>(() => new SqliteStoreAndForwardSink(_dbPath, w, _log, batchSize: 0));
Should.Throw<ArgumentOutOfRangeException>(() => new SqliteStoreAndForwardSink(_dbPath, w, _log, capacity: 0)); Should.Throw<ArgumentOutOfRangeException>(() => new SqliteStoreAndForwardSink(_dbPath, w, _log, capacity: 0));
Should.Throw<ArgumentOutOfRangeException>(() => new SqliteStoreAndForwardSink(_dbPath, w, _log, maxAttempts: 0));
} }
/// <summary>Verifies that a disposed sink rejects enqueue operations.</summary> /// <summary>Verifies that a disposed sink rejects enqueue operations.</summary>