diff --git a/code-reviews/Driver.AbCip/findings.md b/code-reviews/Driver.AbCip/findings.md
index cf664bc..21ec19c 100644
--- a/code-reviews/Driver.AbCip/findings.md
+++ b/code-reviews/Driver.AbCip/findings.md
@@ -108,13 +108,13 @@
| Severity | Medium |
| Category | OtOpcUa conventions |
| Location | `PlcTagHandle.cs:28-59`, `AbCipDriver.cs:806-807,832-833`, `LibplctagTagRuntime.cs:117` |
-| Status | Open |
+| Status | Resolved |
**Description:** `driver-specs.md` makes the SafeHandle-wrapped native handle a non-negotiable Tier-B protection ("Wrap every libplctag handle in a SafeHandle with finalizer calling plc_tag_destroy"). The repo ships `PlcTagHandle : SafeHandle` for this, but it is dead code: `ReleaseHandle` is a permanent no-op (the comment says the `plc_tag_destroy` P/Invoke "is deferred to PR 3", well past the commit under review), and `DeviceState.TagHandles` is never populated anywhere in the driver. The real native lifetime is delegated to the libplctag.NET `Tag` object own `Dispose()`. The mandated finalizer-backed leak protection therefore does not exist: if a `LibplctagTagRuntime` is GC-collected without `Dispose` (owning thread crashes, exception bypasses the device dispose path), whether the native tag is freed depends entirely on whether libplctag.NET `Tag` has its own finalizer, which is not guaranteed by this driver code as the design requires.
**Recommendation:** Either delete `PlcTagHandle` and `DeviceState.TagHandles` as misleading dead scaffolding and document that native lifetime is owned by libplctag.NET `Tag` finalizer (verifying that `Tag` actually has one), or finish the intended design by making `LibplctagTagRuntime` hold a real `PlcTagHandle` with a working `ReleaseHandle` calling `plc_tag_destroy`.
-**Resolution:** _(open)_
+**Resolution:** Resolved 2026-05-22 — `PlcTagHandle.cs` deleted; `DeviceState.TagHandles` removed from `DeviceState`; its `DisposeHandles` loop cleaned up. The class-level doc comment on `AbCipDriver` updated to document that native lifetime is owned by libplctag.NET `Tag.Dispose()` (called in `DisposeHandles`) with the library's own finalizer covering GC-collected instances. The two dead-code test methods for `PlcTagHandle` removed from `AbCipDriverTests`.
### Driver.AbCip-007
diff --git a/src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs b/src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs
index 77a0fb5..9e51857 100644
--- a/src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs
+++ b/src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs
@@ -6,9 +6,10 @@ namespace ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
///
/// Phase 7 plan decisions #16–#17 implementation: durable SQLite queue on the node
-/// absorbs every qualifying alarm event, a drain worker batches rows to Galaxy.Host
-/// via on an exponential-backoff cadence, and
-/// operator acks never block on the historian being reachable.
+/// absorbs every qualifying alarm event, a drain worker batches rows to the
+/// Wonderware historian sidecar via on an
+/// exponential-backoff cadence, and operator acks never block on the historian
+/// being reachable.
///
///
///
@@ -28,7 +29,11 @@ namespace ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
/// Dead-lettered rows stay in place for the configured retention window (default
/// 30 days per Phase 7 plan decision #21) so operators can inspect + manually
/// retry before the sweeper purges them. Regular queue capacity is bounded —
-/// overflow evicts the oldest non-dead-lettered rows with a WARN log.
+/// overflow evicts the oldest non-dead-lettered rows with a WARN log. The
+/// durability guarantee is therefore bounded by :
+/// under a sustained historian outage, accepted events may be evicted before
+/// delivery. The counter makes
+/// overflow visible to operators without requiring the WARN log to be scraped.
///
///
/// Drain runs on a self-rescheduling one-shot .
@@ -67,11 +72,20 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
private Timer? _drainTimer;
private TimeSpan _tickInterval;
private int _backoffIndex;
+ private bool _disposed;
+
+ // Core.AlarmHistorian-005: status fields written by the drain timer thread and
+ // read concurrently by GetStatus() / health-check threads. Guard all reads and
+ // writes with this lock so the Admin UI never observes a torn or stale value.
+ private readonly object _statusLock = new();
private DateTime? _lastDrainUtc;
private DateTime? _lastSuccessUtc;
private string? _lastError;
private HistorianDrainState _drainState = HistorianDrainState.Idle;
- private bool _disposed;
+ // Core.AlarmHistorian-009: lifetime counter of rows evicted due to capacity overflow.
+ // Surfaces in HistorianSinkStatus so operators can see data-loss events without
+ // having to scrape the WARN log.
+ private long _evictedCount;
public SqliteStoreAndForwardSink(
string databasePath,
@@ -113,10 +127,24 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
{
var conn = new SqliteConnection(_connectionString);
conn.Open();
+ ApplyPragmas(conn);
+ return conn;
+ }
+
+ /// Apply busy_timeout + WAL pragmas to an already-open connection (sync).
+ private static void ApplyPragmas(SqliteConnection conn)
+ {
using var pragma = conn.CreateCommand();
pragma.CommandText = "PRAGMA busy_timeout=5000; PRAGMA journal_mode=WAL;";
pragma.ExecuteNonQuery();
- return conn;
+ }
+
+ /// Apply busy_timeout + WAL pragmas to an already-open connection (async).
+ private static async Task ApplyPragmasAsync(SqliteConnection conn, CancellationToken ct)
+ {
+ using var pragma = conn.CreateCommand();
+ pragma.CommandText = "PRAGMA busy_timeout=5000; PRAGMA journal_mode=WAL;";
+ await pragma.ExecuteNonQueryAsync(ct).ConfigureAwait(false);
}
///
@@ -153,8 +181,11 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
// Without this catch the fault would be an unobserved exception on an
// async-void timer callback — never logged, never surfaced. Record it
// so the Admin UI / health check sees the stalled drain.
- _lastError = ex.Message;
- _drainState = HistorianDrainState.BackingOff;
+ lock (_statusLock)
+ {
+ _lastError = ex.Message;
+ _drainState = HistorianDrainState.BackingOff;
+ }
_logger.Error(ex, "Historian drain tick faulted; will retry on next tick");
}
finally
@@ -167,23 +198,32 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
private void RescheduleDrain()
{
if (_disposed) return;
+ HistorianDrainState state;
+ lock (_statusLock) { state = _drainState; }
// While backing off, wait out the full ladder delay; otherwise the steady
// tick cadence. Never faster than tickInterval.
- var delay = _drainState == HistorianDrainState.BackingOff
+ var delay = state == HistorianDrainState.BackingOff
? (CurrentBackoff > _tickInterval ? CurrentBackoff : _tickInterval)
: _tickInterval;
try { _drainTimer?.Change(delay, Timeout.InfiniteTimeSpan); }
catch (ObjectDisposedException) { /* raced with Dispose — nothing to re-arm */ }
}
- public Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken)
+ // Core.AlarmHistorian-003: use async SQLite APIs so the emitting thread is not
+ // blocked waiting for a file-lock or disk write; honor the cancellationToken
+ // throughout. Microsoft.Data.Sqlite's async surface (OpenAsync /
+ // ExecuteNonQueryAsync) is a thin wrapper over the synchronous path, so the
+ // blocking still happens — but on a thread-pool thread, not the caller's thread.
+ public async Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken)
{
if (evt is null) throw new ArgumentNullException(nameof(evt));
if (_disposed) throw new ObjectDisposedException(nameof(SqliteStoreAndForwardSink));
- using var conn = OpenConnection();
+ using var conn = new SqliteConnection(_connectionString);
+ await conn.OpenAsync(cancellationToken).ConfigureAwait(false);
+ await ApplyPragmasAsync(conn, cancellationToken).ConfigureAwait(false);
- EnforceCapacity(conn);
+ await EnforceCapacityAsync(conn, cancellationToken).ConfigureAwait(false);
using var cmd = conn.CreateCommand();
cmd.CommandText = """
@@ -193,8 +233,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
cmd.Parameters.AddWithValue("$alarmId", evt.AlarmId);
cmd.Parameters.AddWithValue("$enqueued", _clock().ToString("O"));
cmd.Parameters.AddWithValue("$payload", JsonSerializer.Serialize(evt));
- cmd.ExecuteNonQuery();
- return Task.CompletedTask;
+ await cmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
///
@@ -209,14 +248,17 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
if (!await _drainGate.WaitAsync(0, ct).ConfigureAwait(false)) return;
try
{
- _drainState = HistorianDrainState.Draining;
- _lastDrainUtc = _clock();
+ lock (_statusLock)
+ {
+ _drainState = HistorianDrainState.Draining;
+ _lastDrainUtc = _clock();
+ }
PurgeAgedDeadLetters();
var batch = ReadBatch();
if (batch.Count == 0)
{
- _drainState = HistorianDrainState.Idle;
+ lock (_statusLock) { _drainState = HistorianDrainState.Idle; }
return;
}
@@ -241,7 +283,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
if (events.Count == 0)
{
- _drainState = HistorianDrainState.Idle;
+ lock (_statusLock) { _drainState = HistorianDrainState.Idle; }
return;
}
@@ -249,7 +291,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
try
{
outcomes = await _writer.WriteBatchAsync(events, ct).ConfigureAwait(false);
- _lastError = null;
+ lock (_statusLock) { _lastError = null; }
}
catch (OperationCanceledException)
{
@@ -258,16 +300,35 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
catch (Exception ex)
{
// Writer-side exception — treat entire batch as RetryPlease.
- _lastError = ex.Message;
+ lock (_statusLock)
+ {
+ _lastError = ex.Message;
+ _drainState = HistorianDrainState.BackingOff;
+ }
_logger.Warning(ex, "Historian writer threw on batch of {Count}; deferring retry", events.Count);
BumpBackoff();
- _drainState = HistorianDrainState.BackingOff;
return;
}
+ // Core.AlarmHistorian-007: a cardinality mismatch is a writer contract
+ // violation — potentially the events were already persisted. Rather than
+ // throwing (which, pre -006 fix, was swallowed and left _drainState
+ // stale), treat it as a transient batch failure so the rows stay queued
+ // and the backoff surface becomes visible to the operator. A deterministic
+ // mismatch will stall the row until an operator intervenes or the writer
+ // is fixed — far safer than re-throwing into a fire-and-forget timer.
if (outcomes.Count != events.Count)
- throw new InvalidOperationException(
- $"Writer returned {outcomes.Count} outcomes for {events.Count} events — expected 1:1");
+ {
+ var msg = $"Writer returned {outcomes.Count} outcomes for {events.Count} events — expected 1:1; treating as batch retry";
+ lock (_statusLock)
+ {
+ _lastError = msg;
+ _drainState = HistorianDrainState.BackingOff;
+ }
+ _logger.Warning("Historian writer contract violation: {Msg}", msg);
+ BumpBackoff();
+ return;
+ }
using var conn = OpenConnection();
using var tx = conn.BeginTransaction();
@@ -291,18 +352,20 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
tx.Commit();
var acks = outcomes.Count(o => o == HistorianWriteOutcome.Ack);
- if (acks > 0) _lastSuccessUtc = _clock();
+ lock (_statusLock)
+ {
+ if (acks > 0) _lastSuccessUtc = _clock();
+
+ if (outcomes.Any(o => o == HistorianWriteOutcome.RetryPlease))
+ _drainState = HistorianDrainState.BackingOff;
+ else
+ _drainState = HistorianDrainState.Idle;
+ }
if (outcomes.Any(o => o == HistorianWriteOutcome.RetryPlease))
- {
BumpBackoff();
- _drainState = HistorianDrainState.BackingOff;
- }
else
- {
ResetBackoff();
- _drainState = HistorianDrainState.Idle;
- }
}
finally
{
@@ -327,13 +390,29 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
deadlettered = (long)(cmd.ExecuteScalar() ?? 0L);
}
+ // Core.AlarmHistorian-005: snapshot status fields atomically under the lock
+ // so the Admin UI never sees a torn DateTime? or stale DrainState.
+ DateTime? lastDrain, lastSuccess;
+ string? lastError;
+ HistorianDrainState drainState;
+ long evicted;
+ lock (_statusLock)
+ {
+ lastDrain = _lastDrainUtc;
+ lastSuccess = _lastSuccessUtc;
+ lastError = _lastError;
+ drainState = _drainState;
+ evicted = _evictedCount;
+ }
+
return new HistorianSinkStatus(
QueueDepth: queued,
DeadLetterDepth: deadlettered,
- LastDrainUtc: _lastDrainUtc,
- LastSuccessUtc: _lastSuccessUtc,
- LastError: _lastError,
- DrainState: _drainState);
+ LastDrainUtc: lastDrain,
+ LastSuccessUtc: lastSuccess,
+ LastError: lastError,
+ DrainState: drainState,
+ EvictedCount: evicted);
}
/// Operator action from Admin UI — retry every dead-lettered row. Non-cascading: they rejoin the regular queue + get a fresh backoff.
@@ -449,9 +528,44 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
cmd.Parameters.AddWithValue("$n", toEvict);
cmd.ExecuteNonQuery();
}
+ // Core.AlarmHistorian-009: increment the lifetime eviction counter so the
+ // Admin UI / health check can report overflow without requiring log scraping.
+ lock (_statusLock) { _evictedCount += toEvict; }
_logger.Warning(
- "Historian queue at capacity {Cap} — evicted {Count} oldest row(s) to make room",
- _capacity, toEvict);
+ "Historian queue at capacity {Cap} — evicted {Count} oldest row(s) to make room (lifetime evictions: {Total})",
+ _capacity, toEvict, _evictedCount);
+ }
+
+ // Async variant used by EnqueueAsync (Core.AlarmHistorian-003).
+ private async Task EnforceCapacityAsync(SqliteConnection conn, CancellationToken ct)
+ {
+ long count;
+ using (var cmd = conn.CreateCommand())
+ {
+ cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 0";
+ count = (long)(await cmd.ExecuteScalarAsync(ct).ConfigureAwait(false) ?? 0L);
+ }
+ if (count < _capacity) return;
+
+ var toEvict = count - _capacity + 1;
+ using (var cmd = conn.CreateCommand())
+ {
+ cmd.CommandText = """
+ DELETE FROM Queue
+ WHERE RowId IN (
+ SELECT RowId FROM Queue
+ WHERE DeadLettered = 0
+ ORDER BY RowId ASC
+ LIMIT $n
+ )
+ """;
+ cmd.Parameters.AddWithValue("$n", toEvict);
+ await cmd.ExecuteNonQueryAsync(ct).ConfigureAwait(false);
+ }
+ lock (_statusLock) { _evictedCount += toEvict; }
+ _logger.Warning(
+ "Historian queue at capacity {Cap} — evicted {Count} oldest row(s) to make room (lifetime evictions: {Total})",
+ _capacity, toEvict, _evictedCount);
}
private void PurgeAgedDeadLetters()
diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.AbCip/AbCipDriver.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.AbCip/AbCipDriver.cs
index 7f88fb2..7ad86c3 100644
--- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.AbCip/AbCipDriver.cs
+++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.AbCip/AbCipDriver.cs
@@ -5,9 +5,8 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.AbCip;
///
/// Allen-Bradley CIP / EtherNet-IP driver for ControlLogix / CompactLogix / Micro800 /
-/// GuardLogix families. Implements only for now — read/write/
-/// subscribe/discover capabilities ship in subsequent PRs (3–8) and family-specific quirk
-/// profiles ship in PRs 9–12.
+/// GuardLogix families. Implements all read/write/subscribe/discover/probe/alarm
+/// capabilities via the libplctag.NET wrapper.
///
///
/// Wire layer is libplctag 1.6.x (plan decision #11). Per-device host addresses use
@@ -17,8 +16,11 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.AbCip;
///
/// Tier A per plan decisions #143–145 — in-process, shares server lifetime, no
/// sidecar. is the Tier-B escape hatch for recovering
-/// from native-heap growth that the CLR allocator can't see; it tears down every
-/// and reconnects each device.
+/// from native-heap growth that the CLR allocator can't see; it tears down the
+/// libplctag.NET Tag instances held in DeviceState.Runtimes and reconnects
+/// each device. Native tag lifetime is owned by the libplctag.NET Tag.Dispose()
+/// (called in ); the library's own finalizer
+/// handles GC-collected tags.
///
public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery, ISubscribable,
IHostConnectivityProbe, IPerCallHostResolver, IAlarmSource, IDisposable, IAsyncDisposable
@@ -874,8 +876,10 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery,
///
/// Per-device runtime state. Holds the parsed host address, family profile, and the
- /// live cache keyed by tag path. PRs 3–8 populate + consume
- /// this dict via libplctag.
+ /// live libplctag.NET instances keyed by tag name.
+ /// Native tag lifetime is owned by the Tag.Dispose() inside each
+ /// ; libplctag.NET's own finalizer covers GC-collected
+ /// instances so no separate SafeHandle wrapper is needed here (Driver.AbCip-006).
///
internal sealed class DeviceState(
AbCipHostAddress parsedAddress,
@@ -901,9 +905,6 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery,
///
public Task? ProbeTask { get; set; }
- public Dictionary TagHandles { get; } =
- new(StringComparer.OrdinalIgnoreCase);
-
///
/// Per-tag runtime handles owned by this device. One entry per configured tag is
/// created lazily on first read (see ).
@@ -930,8 +931,6 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery,
public void DisposeHandles()
{
- foreach (var h in TagHandles.Values) h.Dispose();
- TagHandles.Clear();
foreach (var r in Runtimes.Values) r.Dispose();
Runtimes.Clear();
foreach (var r in ParentRuntimes.Values) r.Dispose();
diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests/AbCipDriverTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests/AbCipDriverTests.cs
index a489d2d..97393b0 100644
--- a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests/AbCipDriverTests.cs
+++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests/AbCipDriverTests.cs
@@ -103,22 +103,6 @@ public sealed class AbCipDriverTests
AbCipPlcFamilyProfile.GuardLogix.LibplctagPlcAttribute.ShouldBe("controllogix");
}
- [Fact]
- public void PlcTagHandle_IsInvalid_for_zero_or_negative_native_id()
- {
- PlcTagHandle.FromNative(-5).IsInvalid.ShouldBeTrue();
- PlcTagHandle.FromNative(0).IsInvalid.ShouldBeTrue();
- PlcTagHandle.FromNative(42).IsInvalid.ShouldBeFalse();
- }
-
- [Fact]
- public void PlcTagHandle_Dispose_is_idempotent()
- {
- var h = PlcTagHandle.FromNative(42);
- h.Dispose();
- h.Dispose(); // must not throw
- }
-
[Fact]
public void AbCipDataType_maps_atomics_to_driver_types()
{