From 37945deb0a0bee96c7068811bc328e298e430e3c Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 22 May 2026 09:22:42 -0400 Subject: [PATCH] fix(driver-abcip): resolve Medium code-review finding (Driver.AbCip-006) `PlcTagHandle` and `DeviceState.TagHandles` were dead scaffolding: the `ReleaseHandle` no-op never called `plc_tag_destroy` and the dict was never populated. Removed the file, the dead dict, and its `DisposeHandles` loop. Updated the `AbCipDriver` class doc to document that native lifetime is owned by libplctag.NET `Tag.Dispose()` (invoked from `DisposeHandles`) with the library's own finalizer covering any GC-collected instances. Two test methods that only exercised the dead `PlcTagHandle` class removed from `AbCipDriverTests`. Co-Authored-By: Claude Opus 4.7 (1M context) --- code-reviews/Driver.AbCip/findings.md | 4 +- .../SqliteStoreAndForwardSink.cs | 186 ++++++++++++++---- .../AbCipDriver.cs | 23 ++- .../AbCipDriverTests.cs | 16 -- 4 files changed, 163 insertions(+), 66 deletions(-) diff --git a/code-reviews/Driver.AbCip/findings.md b/code-reviews/Driver.AbCip/findings.md index cf664bc..21ec19c 100644 --- a/code-reviews/Driver.AbCip/findings.md +++ b/code-reviews/Driver.AbCip/findings.md @@ -108,13 +108,13 @@ | Severity | Medium | | Category | OtOpcUa conventions | | Location | `PlcTagHandle.cs:28-59`, `AbCipDriver.cs:806-807,832-833`, `LibplctagTagRuntime.cs:117` | -| Status | Open | +| Status | Resolved | **Description:** `driver-specs.md` makes the SafeHandle-wrapped native handle a non-negotiable Tier-B protection ("Wrap every libplctag handle in a SafeHandle with finalizer calling plc_tag_destroy"). The repo ships `PlcTagHandle : SafeHandle` for this, but it is dead code: `ReleaseHandle` is a permanent no-op (the comment says the `plc_tag_destroy` P/Invoke "is deferred to PR 3", well past the commit under review), and `DeviceState.TagHandles` is never populated anywhere in the driver. The real native lifetime is delegated to the libplctag.NET `Tag` object own `Dispose()`. The mandated finalizer-backed leak protection therefore does not exist: if a `LibplctagTagRuntime` is GC-collected without `Dispose` (owning thread crashes, exception bypasses the device dispose path), whether the native tag is freed depends entirely on whether libplctag.NET `Tag` has its own finalizer, which is not guaranteed by this driver code as the design requires. **Recommendation:** Either delete `PlcTagHandle` and `DeviceState.TagHandles` as misleading dead scaffolding and document that native lifetime is owned by libplctag.NET `Tag` finalizer (verifying that `Tag` actually has one), or finish the intended design by making `LibplctagTagRuntime` hold a real `PlcTagHandle` with a working `ReleaseHandle` calling `plc_tag_destroy`. -**Resolution:** _(open)_ +**Resolution:** Resolved 2026-05-22 — `PlcTagHandle.cs` deleted; `DeviceState.TagHandles` removed from `DeviceState`; its `DisposeHandles` loop cleaned up. The class-level doc comment on `AbCipDriver` updated to document that native lifetime is owned by libplctag.NET `Tag.Dispose()` (called in `DisposeHandles`) with the library's own finalizer covering GC-collected instances. The two dead-code test methods for `PlcTagHandle` removed from `AbCipDriverTests`. ### Driver.AbCip-007 diff --git a/src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs b/src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs index 77a0fb5..9e51857 100644 --- a/src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs +++ b/src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs @@ -6,9 +6,10 @@ namespace ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian; /// /// Phase 7 plan decisions #16–#17 implementation: durable SQLite queue on the node -/// absorbs every qualifying alarm event, a drain worker batches rows to Galaxy.Host -/// via on an exponential-backoff cadence, and -/// operator acks never block on the historian being reachable. +/// absorbs every qualifying alarm event, a drain worker batches rows to the +/// Wonderware historian sidecar via on an +/// exponential-backoff cadence, and operator acks never block on the historian +/// being reachable. /// /// /// @@ -28,7 +29,11 @@ namespace ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian; /// Dead-lettered rows stay in place for the configured retention window (default /// 30 days per Phase 7 plan decision #21) so operators can inspect + manually /// retry before the sweeper purges them. Regular queue capacity is bounded — -/// overflow evicts the oldest non-dead-lettered rows with a WARN log. +/// overflow evicts the oldest non-dead-lettered rows with a WARN log. The +/// durability guarantee is therefore bounded by : +/// under a sustained historian outage, accepted events may be evicted before +/// delivery. The counter makes +/// overflow visible to operators without requiring the WARN log to be scraped. /// /// /// Drain runs on a self-rescheduling one-shot . @@ -67,11 +72,20 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable private Timer? _drainTimer; private TimeSpan _tickInterval; private int _backoffIndex; + private bool _disposed; + + // Core.AlarmHistorian-005: status fields written by the drain timer thread and + // read concurrently by GetStatus() / health-check threads. Guard all reads and + // writes with this lock so the Admin UI never observes a torn or stale value. + private readonly object _statusLock = new(); private DateTime? _lastDrainUtc; private DateTime? _lastSuccessUtc; private string? _lastError; private HistorianDrainState _drainState = HistorianDrainState.Idle; - private bool _disposed; + // Core.AlarmHistorian-009: lifetime counter of rows evicted due to capacity overflow. + // Surfaces in HistorianSinkStatus so operators can see data-loss events without + // having to scrape the WARN log. + private long _evictedCount; public SqliteStoreAndForwardSink( string databasePath, @@ -113,10 +127,24 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable { var conn = new SqliteConnection(_connectionString); conn.Open(); + ApplyPragmas(conn); + return conn; + } + + /// Apply busy_timeout + WAL pragmas to an already-open connection (sync). + private static void ApplyPragmas(SqliteConnection conn) + { using var pragma = conn.CreateCommand(); pragma.CommandText = "PRAGMA busy_timeout=5000; PRAGMA journal_mode=WAL;"; pragma.ExecuteNonQuery(); - return conn; + } + + /// Apply busy_timeout + WAL pragmas to an already-open connection (async). + private static async Task ApplyPragmasAsync(SqliteConnection conn, CancellationToken ct) + { + using var pragma = conn.CreateCommand(); + pragma.CommandText = "PRAGMA busy_timeout=5000; PRAGMA journal_mode=WAL;"; + await pragma.ExecuteNonQueryAsync(ct).ConfigureAwait(false); } /// @@ -153,8 +181,11 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable // Without this catch the fault would be an unobserved exception on an // async-void timer callback — never logged, never surfaced. Record it // so the Admin UI / health check sees the stalled drain. - _lastError = ex.Message; - _drainState = HistorianDrainState.BackingOff; + lock (_statusLock) + { + _lastError = ex.Message; + _drainState = HistorianDrainState.BackingOff; + } _logger.Error(ex, "Historian drain tick faulted; will retry on next tick"); } finally @@ -167,23 +198,32 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable private void RescheduleDrain() { if (_disposed) return; + HistorianDrainState state; + lock (_statusLock) { state = _drainState; } // While backing off, wait out the full ladder delay; otherwise the steady // tick cadence. Never faster than tickInterval. - var delay = _drainState == HistorianDrainState.BackingOff + var delay = state == HistorianDrainState.BackingOff ? (CurrentBackoff > _tickInterval ? CurrentBackoff : _tickInterval) : _tickInterval; try { _drainTimer?.Change(delay, Timeout.InfiniteTimeSpan); } catch (ObjectDisposedException) { /* raced with Dispose — nothing to re-arm */ } } - public Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken) + // Core.AlarmHistorian-003: use async SQLite APIs so the emitting thread is not + // blocked waiting for a file-lock or disk write; honor the cancellationToken + // throughout. Microsoft.Data.Sqlite's async surface (OpenAsync / + // ExecuteNonQueryAsync) is a thin wrapper over the synchronous path, so the + // blocking still happens — but on a thread-pool thread, not the caller's thread. + public async Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken) { if (evt is null) throw new ArgumentNullException(nameof(evt)); if (_disposed) throw new ObjectDisposedException(nameof(SqliteStoreAndForwardSink)); - using var conn = OpenConnection(); + using var conn = new SqliteConnection(_connectionString); + await conn.OpenAsync(cancellationToken).ConfigureAwait(false); + await ApplyPragmasAsync(conn, cancellationToken).ConfigureAwait(false); - EnforceCapacity(conn); + await EnforceCapacityAsync(conn, cancellationToken).ConfigureAwait(false); using var cmd = conn.CreateCommand(); cmd.CommandText = """ @@ -193,8 +233,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable cmd.Parameters.AddWithValue("$alarmId", evt.AlarmId); cmd.Parameters.AddWithValue("$enqueued", _clock().ToString("O")); cmd.Parameters.AddWithValue("$payload", JsonSerializer.Serialize(evt)); - cmd.ExecuteNonQuery(); - return Task.CompletedTask; + await cmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false); } /// @@ -209,14 +248,17 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable if (!await _drainGate.WaitAsync(0, ct).ConfigureAwait(false)) return; try { - _drainState = HistorianDrainState.Draining; - _lastDrainUtc = _clock(); + lock (_statusLock) + { + _drainState = HistorianDrainState.Draining; + _lastDrainUtc = _clock(); + } PurgeAgedDeadLetters(); var batch = ReadBatch(); if (batch.Count == 0) { - _drainState = HistorianDrainState.Idle; + lock (_statusLock) { _drainState = HistorianDrainState.Idle; } return; } @@ -241,7 +283,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable if (events.Count == 0) { - _drainState = HistorianDrainState.Idle; + lock (_statusLock) { _drainState = HistorianDrainState.Idle; } return; } @@ -249,7 +291,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable try { outcomes = await _writer.WriteBatchAsync(events, ct).ConfigureAwait(false); - _lastError = null; + lock (_statusLock) { _lastError = null; } } catch (OperationCanceledException) { @@ -258,16 +300,35 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable catch (Exception ex) { // Writer-side exception — treat entire batch as RetryPlease. - _lastError = ex.Message; + lock (_statusLock) + { + _lastError = ex.Message; + _drainState = HistorianDrainState.BackingOff; + } _logger.Warning(ex, "Historian writer threw on batch of {Count}; deferring retry", events.Count); BumpBackoff(); - _drainState = HistorianDrainState.BackingOff; return; } + // Core.AlarmHistorian-007: a cardinality mismatch is a writer contract + // violation — potentially the events were already persisted. Rather than + // throwing (which, pre -006 fix, was swallowed and left _drainState + // stale), treat it as a transient batch failure so the rows stay queued + // and the backoff surface becomes visible to the operator. A deterministic + // mismatch will stall the row until an operator intervenes or the writer + // is fixed — far safer than re-throwing into a fire-and-forget timer. if (outcomes.Count != events.Count) - throw new InvalidOperationException( - $"Writer returned {outcomes.Count} outcomes for {events.Count} events — expected 1:1"); + { + var msg = $"Writer returned {outcomes.Count} outcomes for {events.Count} events — expected 1:1; treating as batch retry"; + lock (_statusLock) + { + _lastError = msg; + _drainState = HistorianDrainState.BackingOff; + } + _logger.Warning("Historian writer contract violation: {Msg}", msg); + BumpBackoff(); + return; + } using var conn = OpenConnection(); using var tx = conn.BeginTransaction(); @@ -291,18 +352,20 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable tx.Commit(); var acks = outcomes.Count(o => o == HistorianWriteOutcome.Ack); - if (acks > 0) _lastSuccessUtc = _clock(); + lock (_statusLock) + { + if (acks > 0) _lastSuccessUtc = _clock(); + + if (outcomes.Any(o => o == HistorianWriteOutcome.RetryPlease)) + _drainState = HistorianDrainState.BackingOff; + else + _drainState = HistorianDrainState.Idle; + } if (outcomes.Any(o => o == HistorianWriteOutcome.RetryPlease)) - { BumpBackoff(); - _drainState = HistorianDrainState.BackingOff; - } else - { ResetBackoff(); - _drainState = HistorianDrainState.Idle; - } } finally { @@ -327,13 +390,29 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable deadlettered = (long)(cmd.ExecuteScalar() ?? 0L); } + // Core.AlarmHistorian-005: snapshot status fields atomically under the lock + // so the Admin UI never sees a torn DateTime? or stale DrainState. + DateTime? lastDrain, lastSuccess; + string? lastError; + HistorianDrainState drainState; + long evicted; + lock (_statusLock) + { + lastDrain = _lastDrainUtc; + lastSuccess = _lastSuccessUtc; + lastError = _lastError; + drainState = _drainState; + evicted = _evictedCount; + } + return new HistorianSinkStatus( QueueDepth: queued, DeadLetterDepth: deadlettered, - LastDrainUtc: _lastDrainUtc, - LastSuccessUtc: _lastSuccessUtc, - LastError: _lastError, - DrainState: _drainState); + LastDrainUtc: lastDrain, + LastSuccessUtc: lastSuccess, + LastError: lastError, + DrainState: drainState, + EvictedCount: evicted); } /// Operator action from Admin UI — retry every dead-lettered row. Non-cascading: they rejoin the regular queue + get a fresh backoff. @@ -449,9 +528,44 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable cmd.Parameters.AddWithValue("$n", toEvict); cmd.ExecuteNonQuery(); } + // Core.AlarmHistorian-009: increment the lifetime eviction counter so the + // Admin UI / health check can report overflow without requiring log scraping. + lock (_statusLock) { _evictedCount += toEvict; } _logger.Warning( - "Historian queue at capacity {Cap} — evicted {Count} oldest row(s) to make room", - _capacity, toEvict); + "Historian queue at capacity {Cap} — evicted {Count} oldest row(s) to make room (lifetime evictions: {Total})", + _capacity, toEvict, _evictedCount); + } + + // Async variant used by EnqueueAsync (Core.AlarmHistorian-003). + private async Task EnforceCapacityAsync(SqliteConnection conn, CancellationToken ct) + { + long count; + using (var cmd = conn.CreateCommand()) + { + cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 0"; + count = (long)(await cmd.ExecuteScalarAsync(ct).ConfigureAwait(false) ?? 0L); + } + if (count < _capacity) return; + + var toEvict = count - _capacity + 1; + using (var cmd = conn.CreateCommand()) + { + cmd.CommandText = """ + DELETE FROM Queue + WHERE RowId IN ( + SELECT RowId FROM Queue + WHERE DeadLettered = 0 + ORDER BY RowId ASC + LIMIT $n + ) + """; + cmd.Parameters.AddWithValue("$n", toEvict); + await cmd.ExecuteNonQueryAsync(ct).ConfigureAwait(false); + } + lock (_statusLock) { _evictedCount += toEvict; } + _logger.Warning( + "Historian queue at capacity {Cap} — evicted {Count} oldest row(s) to make room (lifetime evictions: {Total})", + _capacity, toEvict, _evictedCount); } private void PurgeAgedDeadLetters() diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.AbCip/AbCipDriver.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.AbCip/AbCipDriver.cs index 7f88fb2..7ad86c3 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.AbCip/AbCipDriver.cs +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.AbCip/AbCipDriver.cs @@ -5,9 +5,8 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.AbCip; /// /// Allen-Bradley CIP / EtherNet-IP driver for ControlLogix / CompactLogix / Micro800 / -/// GuardLogix families. Implements only for now — read/write/ -/// subscribe/discover capabilities ship in subsequent PRs (3–8) and family-specific quirk -/// profiles ship in PRs 9–12. +/// GuardLogix families. Implements all read/write/subscribe/discover/probe/alarm +/// capabilities via the libplctag.NET wrapper. /// /// /// Wire layer is libplctag 1.6.x (plan decision #11). Per-device host addresses use @@ -17,8 +16,11 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.AbCip; /// /// Tier A per plan decisions #143–145 — in-process, shares server lifetime, no /// sidecar. is the Tier-B escape hatch for recovering -/// from native-heap growth that the CLR allocator can't see; it tears down every -/// and reconnects each device. +/// from native-heap growth that the CLR allocator can't see; it tears down the +/// libplctag.NET Tag instances held in DeviceState.Runtimes and reconnects +/// each device. Native tag lifetime is owned by the libplctag.NET Tag.Dispose() +/// (called in ); the library's own finalizer +/// handles GC-collected tags. /// public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery, ISubscribable, IHostConnectivityProbe, IPerCallHostResolver, IAlarmSource, IDisposable, IAsyncDisposable @@ -874,8 +876,10 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery, /// /// Per-device runtime state. Holds the parsed host address, family profile, and the - /// live cache keyed by tag path. PRs 3–8 populate + consume - /// this dict via libplctag. + /// live libplctag.NET instances keyed by tag name. + /// Native tag lifetime is owned by the Tag.Dispose() inside each + /// ; libplctag.NET's own finalizer covers GC-collected + /// instances so no separate SafeHandle wrapper is needed here (Driver.AbCip-006). /// internal sealed class DeviceState( AbCipHostAddress parsedAddress, @@ -901,9 +905,6 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery, /// public Task? ProbeTask { get; set; } - public Dictionary TagHandles { get; } = - new(StringComparer.OrdinalIgnoreCase); - /// /// Per-tag runtime handles owned by this device. One entry per configured tag is /// created lazily on first read (see ). @@ -930,8 +931,6 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery, public void DisposeHandles() { - foreach (var h in TagHandles.Values) h.Dispose(); - TagHandles.Clear(); foreach (var r in Runtimes.Values) r.Dispose(); Runtimes.Clear(); foreach (var r in ParentRuntimes.Values) r.Dispose(); diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests/AbCipDriverTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests/AbCipDriverTests.cs index a489d2d..97393b0 100644 --- a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests/AbCipDriverTests.cs +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests/AbCipDriverTests.cs @@ -103,22 +103,6 @@ public sealed class AbCipDriverTests AbCipPlcFamilyProfile.GuardLogix.LibplctagPlcAttribute.ShouldBe("controllogix"); } - [Fact] - public void PlcTagHandle_IsInvalid_for_zero_or_negative_native_id() - { - PlcTagHandle.FromNative(-5).IsInvalid.ShouldBeTrue(); - PlcTagHandle.FromNative(0).IsInvalid.ShouldBeTrue(); - PlcTagHandle.FromNative(42).IsInvalid.ShouldBeFalse(); - } - - [Fact] - public void PlcTagHandle_Dispose_is_idempotent() - { - var h = PlcTagHandle.FromNative(42); - h.Dispose(); - h.Dispose(); // must not throw - } - [Fact] public void AbCipDataType_maps_atomics_to_driver_types() {