diff --git a/src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs b/src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs
index 570ded36..f00ea542 100644
--- a/src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs
+++ b/src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs
@@ -51,6 +51,9 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
public const long DefaultCapacity = 1_000_000;
public static readonly TimeSpan DefaultDeadLetterRetention = TimeSpan.FromDays(30);
+ /// <summary>Default max delivery attempts before a perpetually-retrying (poison) row is dead-lettered.</summary>
+ public const int DefaultMaxAttempts = 10;
+
private static readonly TimeSpan[] BackoffLadder =
[
TimeSpan.FromSeconds(1),
@@ -66,6 +69,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
private readonly int _batchSize;
private readonly long _capacity;
private readonly TimeSpan _deadLetterRetention;
+ private readonly int _maxAttempts;
private readonly Func _clock;
private readonly SemaphoreSlim _drainGate = new(1, 1);
@@ -115,6 +119,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
/// <param name="batchSize">The maximum number of rows to forward in a single batch. Defaults to 100.</param>
/// <param name="capacity">The maximum queue capacity before evicting oldest rows. Defaults to 1,000,000.</param>
/// <param name="deadLetterRetention">The timespan to retain dead-lettered rows before purging. Defaults to 30 days.</param>
+ /// <param name="maxAttempts">The maximum number of delivery attempts before a perpetually-retrying (poison) row is dead-lettered. Defaults to 10.</param>
/// <param name="clock">Optional clock function for testing; defaults to <see cref="DateTime.UtcNow"/>.</param>
public SqliteStoreAndForwardSink(
string databasePath,
@@ -123,6 +128,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
int batchSize = 100,
long capacity = DefaultCapacity,
TimeSpan? deadLetterRetention = null,
+ int maxAttempts = DefaultMaxAttempts,
Func? clock = null)
{
if (string.IsNullOrWhiteSpace(databasePath))
@@ -132,6 +138,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
_batchSize = batchSize > 0 ? batchSize : throw new ArgumentOutOfRangeException(nameof(batchSize));
_capacity = capacity > 0 ? capacity : throw new ArgumentOutOfRangeException(nameof(capacity));
_deadLetterRetention = deadLetterRetention ?? DefaultDeadLetterRetention;
+ _maxAttempts = maxAttempts > 0 ? maxAttempts : throw new ArgumentOutOfRangeException(nameof(maxAttempts));
_clock = clock ?? (() => DateTime.UtcNow);
// DefaultTimeout gives ADO.NET command-level retry; the PRAGMA busy_timeout
// applied in OpenConnection backs it with SQLite's own busy-handler so an
@@ -457,14 +464,28 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
rowsLeavingQueue++;
break;
case HistorianWriteOutcome.RetryPlease:
- BumpAttempt(conn, tx, rowId, "retry-please");
+ // finding 002: cap retries so a perpetually-RetryPlease (poison)
+ // row cannot retry forever at the 60s backoff floor. The incoming
+ // AttemptCount is the count BEFORE this attempt; +1 accounts for the
+ // bump this drain represents. At the cap, dead-letter instead of
+ // bumping — and count it as leaving the live queue like PermanentFail.
+ if (liveRows[i].AttemptCount + 1 >= _maxAttempts)
+ {
+ DeadLetterRow(conn, tx, rowId, $"max attempts ({_maxAttempts}) exceeded");
+ rowsLeavingQueue++;
+ }
+ else
+ {
+ BumpAttempt(conn, tx, rowId, "retry-please");
+ }
break;
}
}
tx.Commit();
}
// Ack-deleted + PermanentFail-dead-lettered rows both leave the
- // non-dead-lettered queue — keep the counter aligned (Core.AlarmHistorian-008).
+ // non-dead-lettered queue, as do RetryPlease rows that hit the max-attempts
+ // cap (finding 002) — keep the counter aligned (Core.AlarmHistorian-008).
if (rowsLeavingQueue > 0)
Interlocked.Add(ref _queuedRowCount, -rowsLeavingQueue);
@@ -552,14 +573,14 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
/// the always stays bound to its own row so outcomes can
/// never be mapped to the wrong row.
///
- private readonly record struct QueueRow(long RowId, AlarmHistorianEvent? Event);
+ private readonly record struct QueueRow(long RowId, AlarmHistorianEvent? Event, long AttemptCount);
private List ReadBatch(SqliteConnection conn)
{
var rows = new List();
using var cmd = conn.CreateCommand();
cmd.CommandText = """
- SELECT RowId, PayloadJson FROM Queue
+ SELECT RowId, PayloadJson, AttemptCount FROM Queue
WHERE DeadLettered = 0
ORDER BY RowId ASC
LIMIT $limit
@@ -570,6 +591,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
{
var rowId = reader.GetInt64(0);
var payload = reader.GetString(1);
+ var attemptCount = reader.GetInt64(2);
AlarmHistorianEvent? evt;
try
{
@@ -580,7 +602,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
// Malformed JSON — carry a null event so the caller dead-letters this row.
evt = null;
}
- rows.Add(new QueueRow(rowId, evt));
+ rows.Add(new QueueRow(rowId, evt, attemptCount));
}
return rows;
}
diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/AlarmHistorianOptions.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/AlarmHistorianOptions.cs
index 5bdb8096..c01fc059 100644
--- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/AlarmHistorianOptions.cs
+++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/AlarmHistorianOptions.cs
@@ -53,6 +53,10 @@ public sealed class AlarmHistorianOptions
/// Days to retain dead-lettered rows before purge. Defaults to 30.
public int DeadLetterRetentionDays { get; init; } = 30;
+ /// <summary>Maximum delivery attempts before a perpetually-retrying (poison) row is dead-lettered.
+ /// Defaults to 10 (matches SqliteStoreAndForwardSink's DefaultMaxAttempts).</summary>
+ public int MaxAttempts { get; init; } = SqliteStoreAndForwardSink.DefaultMaxAttempts;
+
/// Returns operator-facing misconfiguration warnings for an Enabled historian
/// (empty when disabled or correctly configured). Pure — the registration logs each entry.
/// Zero or more human-readable warning messages.
@@ -70,6 +74,8 @@ public sealed class AlarmHistorianOptions
warnings.Add($"AlarmHistorian:Capacity is {Capacity} — must be > 0; the sink constructor will throw at startup.");
if (DeadLetterRetentionDays <= 0)
warnings.Add($"AlarmHistorian:DeadLetterRetentionDays is {DeadLetterRetentionDays} — must be > 0; dead-lettered rows would be purged on every drain tick.");
+ if (MaxAttempts <= 0)
+ warnings.Add($"AlarmHistorian:MaxAttempts is {MaxAttempts} — must be > 0; the sink constructor will throw at startup.");
return warnings;
}
}
diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs
index c97131ac..c0604569 100644
--- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs
+++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/ServiceCollectionExtensions.cs
@@ -93,7 +93,8 @@ public static class ServiceCollectionExtensions
Serilog.Log.Logger.ForContext(),
batchSize: opts.BatchSize,
capacity: opts.Capacity,
- deadLetterRetention: TimeSpan.FromDays(opts.DeadLetterRetentionDays));
+ deadLetterRetention: TimeSpan.FromDays(opts.DeadLetterRetentionDays),
+ maxAttempts: opts.MaxAttempts);
sink.StartDrainLoop(TimeSpan.FromSeconds(opts.DrainIntervalSeconds));
return sink;
});
diff --git a/tests/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/SqliteStoreAndForwardSinkTests.cs b/tests/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/SqliteStoreAndForwardSinkTests.cs
index 0ae583d6..d0348822 100644
--- a/tests/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/SqliteStoreAndForwardSinkTests.cs
+++ b/tests/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian.Tests/SqliteStoreAndForwardSinkTests.cs
@@ -268,6 +268,37 @@ public sealed class SqliteStoreAndForwardSinkTests : IDisposable
sink.CurrentBackoff.ShouldBe(TimeSpan.FromSeconds(60));
}
+ /// <summary>
+ /// Regression for finding 002: a permanently-malformed (poison) event that the
+ /// writer can only ever map to <see cref="HistorianWriteOutcome.RetryPlease"/>
+ /// must NOT retry forever. After <c>maxAttempts</c> retry-please drains the row
+ /// is dead-lettered so the queue head can never stall on it indefinitely.
+ /// </summary>
+ [Fact]
+ public async Task RetryPlease_dead_letters_row_after_MaxAttempts()
+ {
+ var writer = new FakeWriter { DefaultOutcome = HistorianWriteOutcome.RetryPlease };
+ using var sink = new SqliteStoreAndForwardSink(
+ _dbPath, writer, _log, maxAttempts: 3);
+
+ await sink.EnqueueAsync(Event("poison"), CancellationToken.None);
+
+ // Drains 1 and 2: still retrying — the row stays queued, not yet dead-lettered.
+ await sink.DrainOnceAsync(CancellationToken.None);
+ sink.GetStatus().QueueDepth.ShouldBe(1, "after 1 attempt the row is still queued");
+ sink.GetStatus().DeadLetterDepth.ShouldBe(0, "not yet dead-lettered after 1 attempt");
+
+ await sink.DrainOnceAsync(CancellationToken.None);
+ sink.GetStatus().QueueDepth.ShouldBe(1, "after 2 attempts the row is still queued");
+ sink.GetStatus().DeadLetterDepth.ShouldBe(0, "not yet dead-lettered after 2 attempts");
+
+ // Drain 3: the 3rd attempt reaches the cap (maxAttempts: 3) — the row is dead-lettered.
+ await sink.DrainOnceAsync(CancellationToken.None);
+ var status = sink.GetStatus();
+ status.QueueDepth.ShouldBe(0, "row left the live queue once max attempts exceeded");
+ status.DeadLetterDepth.ShouldBe(1, "poison row dead-lettered at the max-attempts cap");
+ }
+
/// Verifies that NullAlarmHistorianSink reports disabled status.
[Fact]
public void NullAlarmHistorianSink_reports_disabled_status()
@@ -295,6 +326,7 @@ public sealed class SqliteStoreAndForwardSinkTests : IDisposable
Should.Throw(() => new SqliteStoreAndForwardSink(_dbPath, w, null!));
Should.Throw(() => new SqliteStoreAndForwardSink(_dbPath, w, _log, batchSize: 0));
Should.Throw(() => new SqliteStoreAndForwardSink(_dbPath, w, _log, capacity: 0));
+ Should.Throw(() => new SqliteStoreAndForwardSink(_dbPath, w, _log, maxAttempts: 0));
}
/// Verifies that a disposed sink rejects enqueue operations.