fix(driver-abcip): resolve Medium code-review finding (Driver.AbCip-006)
`PlcTagHandle` and `DeviceState.TagHandles` were dead scaffolding: the `ReleaseHandle` no-op never called `plc_tag_destroy` and the dict was never populated. Removed the file, the dead dict, and its `DisposeHandles` loop. Updated the `AbCipDriver` class doc to document that native lifetime is owned by libplctag.NET `Tag.Dispose()` (invoked from `DisposeHandles`) with the library's own finalizer covering any GC-collected instances. Two test methods that only exercised the dead `PlcTagHandle` class removed from `AbCipDriverTests`. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -6,9 +6,10 @@ namespace ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
|
||||
|
||||
/// <summary>
|
||||
/// Phase 7 plan decisions #16–#17 implementation: durable SQLite queue on the node
|
||||
/// absorbs every qualifying alarm event, a drain worker batches rows to Galaxy.Host
|
||||
/// via <see cref="IAlarmHistorianWriter"/> on an exponential-backoff cadence, and
|
||||
/// operator acks never block on the historian being reachable.
|
||||
/// absorbs every qualifying alarm event, a drain worker batches rows to the
|
||||
/// Wonderware historian sidecar via <see cref="IAlarmHistorianWriter"/> on an
|
||||
/// exponential-backoff cadence, and operator acks never block on the historian
|
||||
/// being reachable.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
@@ -28,7 +29,11 @@ namespace ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian;
|
||||
/// Dead-lettered rows stay in place for the configured retention window (default
|
||||
/// 30 days per Phase 7 plan decision #21) so operators can inspect + manually
|
||||
/// retry before the sweeper purges them. Regular queue capacity is bounded —
|
||||
/// overflow evicts the oldest non-dead-lettered rows with a WARN log.
|
||||
/// overflow evicts the oldest non-dead-lettered rows with a WARN log. The
|
||||
/// durability guarantee is therefore bounded by <see cref="DefaultCapacity"/>:
|
||||
/// under a sustained historian outage, accepted events may be evicted before
|
||||
/// delivery. The <see cref="HistorianSinkStatus.EvictedCount"/> counter makes
|
||||
/// overflow visible to operators without requiring the WARN log to be scraped.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Drain runs on a self-rescheduling one-shot <see cref="System.Threading.Timer"/>.
|
||||
@@ -67,11 +72,20 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
|
||||
private Timer? _drainTimer;
|
||||
private TimeSpan _tickInterval;
|
||||
private int _backoffIndex;
|
||||
private bool _disposed;
|
||||
|
||||
// Core.AlarmHistorian-005: status fields written by the drain timer thread and
|
||||
// read concurrently by GetStatus() / health-check threads. Guard all reads and
|
||||
// writes with this lock so the Admin UI never observes a torn or stale value.
|
||||
private readonly object _statusLock = new();
|
||||
private DateTime? _lastDrainUtc;
|
||||
private DateTime? _lastSuccessUtc;
|
||||
private string? _lastError;
|
||||
private HistorianDrainState _drainState = HistorianDrainState.Idle;
|
||||
private bool _disposed;
|
||||
// Core.AlarmHistorian-009: lifetime counter of rows evicted due to capacity overflow.
|
||||
// Surfaces in HistorianSinkStatus so operators can see data-loss events without
|
||||
// having to scrape the WARN log.
|
||||
private long _evictedCount;
|
||||
|
||||
public SqliteStoreAndForwardSink(
|
||||
string databasePath,
|
||||
@@ -113,10 +127,24 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
|
||||
{
|
||||
var conn = new SqliteConnection(_connectionString);
|
||||
conn.Open();
|
||||
ApplyPragmas(conn);
|
||||
return conn;
|
||||
}
|
||||
|
||||
/// <summary>Apply busy_timeout + WAL pragmas to an already-open connection (sync).</summary>
|
||||
private static void ApplyPragmas(SqliteConnection conn)
|
||||
{
|
||||
using var pragma = conn.CreateCommand();
|
||||
pragma.CommandText = "PRAGMA busy_timeout=5000; PRAGMA journal_mode=WAL;";
|
||||
pragma.ExecuteNonQuery();
|
||||
return conn;
|
||||
}
|
||||
|
||||
/// <summary>Apply busy_timeout + WAL pragmas to an already-open connection (async).</summary>
|
||||
private static async Task ApplyPragmasAsync(SqliteConnection conn, CancellationToken ct)
|
||||
{
|
||||
using var pragma = conn.CreateCommand();
|
||||
pragma.CommandText = "PRAGMA busy_timeout=5000; PRAGMA journal_mode=WAL;";
|
||||
await pragma.ExecuteNonQueryAsync(ct).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -153,8 +181,11 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
|
||||
// Without this catch the fault would be an unobserved exception on an
|
||||
// async-void timer callback — never logged, never surfaced. Record it
|
||||
// so the Admin UI / health check sees the stalled drain.
|
||||
_lastError = ex.Message;
|
||||
_drainState = HistorianDrainState.BackingOff;
|
||||
lock (_statusLock)
|
||||
{
|
||||
_lastError = ex.Message;
|
||||
_drainState = HistorianDrainState.BackingOff;
|
||||
}
|
||||
_logger.Error(ex, "Historian drain tick faulted; will retry on next tick");
|
||||
}
|
||||
finally
|
||||
@@ -167,23 +198,32 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
|
||||
private void RescheduleDrain()
|
||||
{
|
||||
if (_disposed) return;
|
||||
HistorianDrainState state;
|
||||
lock (_statusLock) { state = _drainState; }
|
||||
// While backing off, wait out the full ladder delay; otherwise the steady
|
||||
// tick cadence. Never faster than tickInterval.
|
||||
var delay = _drainState == HistorianDrainState.BackingOff
|
||||
var delay = state == HistorianDrainState.BackingOff
|
||||
? (CurrentBackoff > _tickInterval ? CurrentBackoff : _tickInterval)
|
||||
: _tickInterval;
|
||||
try { _drainTimer?.Change(delay, Timeout.InfiniteTimeSpan); }
|
||||
catch (ObjectDisposedException) { /* raced with Dispose — nothing to re-arm */ }
|
||||
}
|
||||
|
||||
public Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken)
|
||||
// Core.AlarmHistorian-003: use async SQLite APIs so the emitting thread is not
|
||||
// blocked waiting for a file-lock or disk write; honor the cancellationToken
|
||||
// throughout. Microsoft.Data.Sqlite's async surface (OpenAsync /
|
||||
// ExecuteNonQueryAsync) is a thin wrapper over the synchronous path, so the
|
||||
// blocking still happens — but on a thread-pool thread, not the caller's thread.
|
||||
public async Task EnqueueAsync(AlarmHistorianEvent evt, CancellationToken cancellationToken)
|
||||
{
|
||||
if (evt is null) throw new ArgumentNullException(nameof(evt));
|
||||
if (_disposed) throw new ObjectDisposedException(nameof(SqliteStoreAndForwardSink));
|
||||
|
||||
using var conn = OpenConnection();
|
||||
using var conn = new SqliteConnection(_connectionString);
|
||||
await conn.OpenAsync(cancellationToken).ConfigureAwait(false);
|
||||
await ApplyPragmasAsync(conn, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
EnforceCapacity(conn);
|
||||
await EnforceCapacityAsync(conn, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
using var cmd = conn.CreateCommand();
|
||||
cmd.CommandText = """
|
||||
@@ -193,8 +233,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
|
||||
cmd.Parameters.AddWithValue("$alarmId", evt.AlarmId);
|
||||
cmd.Parameters.AddWithValue("$enqueued", _clock().ToString("O"));
|
||||
cmd.Parameters.AddWithValue("$payload", JsonSerializer.Serialize(evt));
|
||||
cmd.ExecuteNonQuery();
|
||||
return Task.CompletedTask;
|
||||
await cmd.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -209,14 +248,17 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
|
||||
if (!await _drainGate.WaitAsync(0, ct).ConfigureAwait(false)) return;
|
||||
try
|
||||
{
|
||||
_drainState = HistorianDrainState.Draining;
|
||||
_lastDrainUtc = _clock();
|
||||
lock (_statusLock)
|
||||
{
|
||||
_drainState = HistorianDrainState.Draining;
|
||||
_lastDrainUtc = _clock();
|
||||
}
|
||||
|
||||
PurgeAgedDeadLetters();
|
||||
var batch = ReadBatch();
|
||||
if (batch.Count == 0)
|
||||
{
|
||||
_drainState = HistorianDrainState.Idle;
|
||||
lock (_statusLock) { _drainState = HistorianDrainState.Idle; }
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -241,7 +283,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
|
||||
|
||||
if (events.Count == 0)
|
||||
{
|
||||
_drainState = HistorianDrainState.Idle;
|
||||
lock (_statusLock) { _drainState = HistorianDrainState.Idle; }
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -249,7 +291,7 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
|
||||
try
|
||||
{
|
||||
outcomes = await _writer.WriteBatchAsync(events, ct).ConfigureAwait(false);
|
||||
_lastError = null;
|
||||
lock (_statusLock) { _lastError = null; }
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
@@ -258,16 +300,35 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
|
||||
catch (Exception ex)
|
||||
{
|
||||
// Writer-side exception — treat entire batch as RetryPlease.
|
||||
_lastError = ex.Message;
|
||||
lock (_statusLock)
|
||||
{
|
||||
_lastError = ex.Message;
|
||||
_drainState = HistorianDrainState.BackingOff;
|
||||
}
|
||||
_logger.Warning(ex, "Historian writer threw on batch of {Count}; deferring retry", events.Count);
|
||||
BumpBackoff();
|
||||
_drainState = HistorianDrainState.BackingOff;
|
||||
return;
|
||||
}
|
||||
|
||||
// Core.AlarmHistorian-007: a cardinality mismatch is a writer contract
|
||||
// violation — potentially the events were already persisted. Rather than
|
||||
// throwing (which, pre -006 fix, was swallowed and left _drainState
|
||||
// stale), treat it as a transient batch failure so the rows stay queued
|
||||
// and the backoff surface becomes visible to the operator. A deterministic
|
||||
// mismatch will stall the row until an operator intervenes or the writer
|
||||
// is fixed — far safer than re-throwing into a fire-and-forget timer.
|
||||
if (outcomes.Count != events.Count)
|
||||
throw new InvalidOperationException(
|
||||
$"Writer returned {outcomes.Count} outcomes for {events.Count} events — expected 1:1");
|
||||
{
|
||||
var msg = $"Writer returned {outcomes.Count} outcomes for {events.Count} events — expected 1:1; treating as batch retry";
|
||||
lock (_statusLock)
|
||||
{
|
||||
_lastError = msg;
|
||||
_drainState = HistorianDrainState.BackingOff;
|
||||
}
|
||||
_logger.Warning("Historian writer contract violation: {Msg}", msg);
|
||||
BumpBackoff();
|
||||
return;
|
||||
}
|
||||
|
||||
using var conn = OpenConnection();
|
||||
using var tx = conn.BeginTransaction();
|
||||
@@ -291,18 +352,20 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
|
||||
tx.Commit();
|
||||
|
||||
var acks = outcomes.Count(o => o == HistorianWriteOutcome.Ack);
|
||||
if (acks > 0) _lastSuccessUtc = _clock();
|
||||
lock (_statusLock)
|
||||
{
|
||||
if (acks > 0) _lastSuccessUtc = _clock();
|
||||
|
||||
if (outcomes.Any(o => o == HistorianWriteOutcome.RetryPlease))
|
||||
_drainState = HistorianDrainState.BackingOff;
|
||||
else
|
||||
_drainState = HistorianDrainState.Idle;
|
||||
}
|
||||
|
||||
if (outcomes.Any(o => o == HistorianWriteOutcome.RetryPlease))
|
||||
{
|
||||
BumpBackoff();
|
||||
_drainState = HistorianDrainState.BackingOff;
|
||||
}
|
||||
else
|
||||
{
|
||||
ResetBackoff();
|
||||
_drainState = HistorianDrainState.Idle;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
@@ -327,13 +390,29 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
|
||||
deadlettered = (long)(cmd.ExecuteScalar() ?? 0L);
|
||||
}
|
||||
|
||||
// Core.AlarmHistorian-005: snapshot status fields atomically under the lock
|
||||
// so the Admin UI never sees a torn DateTime? or stale DrainState.
|
||||
DateTime? lastDrain, lastSuccess;
|
||||
string? lastError;
|
||||
HistorianDrainState drainState;
|
||||
long evicted;
|
||||
lock (_statusLock)
|
||||
{
|
||||
lastDrain = _lastDrainUtc;
|
||||
lastSuccess = _lastSuccessUtc;
|
||||
lastError = _lastError;
|
||||
drainState = _drainState;
|
||||
evicted = _evictedCount;
|
||||
}
|
||||
|
||||
return new HistorianSinkStatus(
|
||||
QueueDepth: queued,
|
||||
DeadLetterDepth: deadlettered,
|
||||
LastDrainUtc: _lastDrainUtc,
|
||||
LastSuccessUtc: _lastSuccessUtc,
|
||||
LastError: _lastError,
|
||||
DrainState: _drainState);
|
||||
LastDrainUtc: lastDrain,
|
||||
LastSuccessUtc: lastSuccess,
|
||||
LastError: lastError,
|
||||
DrainState: drainState,
|
||||
EvictedCount: evicted);
|
||||
}
|
||||
|
||||
/// <summary>Operator action from Admin UI — retry every dead-lettered row. Non-cascading: they rejoin the regular queue + get a fresh backoff.</summary>
|
||||
@@ -449,9 +528,44 @@ public sealed class SqliteStoreAndForwardSink : IAlarmHistorianSink, IDisposable
|
||||
cmd.Parameters.AddWithValue("$n", toEvict);
|
||||
cmd.ExecuteNonQuery();
|
||||
}
|
||||
// Core.AlarmHistorian-009: increment the lifetime eviction counter so the
|
||||
// Admin UI / health check can report overflow without requiring log scraping.
|
||||
lock (_statusLock) { _evictedCount += toEvict; }
|
||||
_logger.Warning(
|
||||
"Historian queue at capacity {Cap} — evicted {Count} oldest row(s) to make room",
|
||||
_capacity, toEvict);
|
||||
"Historian queue at capacity {Cap} — evicted {Count} oldest row(s) to make room (lifetime evictions: {Total})",
|
||||
_capacity, toEvict, _evictedCount);
|
||||
}
|
||||
|
||||
// Async variant used by EnqueueAsync (Core.AlarmHistorian-003).
|
||||
private async Task EnforceCapacityAsync(SqliteConnection conn, CancellationToken ct)
|
||||
{
|
||||
long count;
|
||||
using (var cmd = conn.CreateCommand())
|
||||
{
|
||||
cmd.CommandText = "SELECT COUNT(*) FROM Queue WHERE DeadLettered = 0";
|
||||
count = (long)(await cmd.ExecuteScalarAsync(ct).ConfigureAwait(false) ?? 0L);
|
||||
}
|
||||
if (count < _capacity) return;
|
||||
|
||||
var toEvict = count - _capacity + 1;
|
||||
using (var cmd = conn.CreateCommand())
|
||||
{
|
||||
cmd.CommandText = """
|
||||
DELETE FROM Queue
|
||||
WHERE RowId IN (
|
||||
SELECT RowId FROM Queue
|
||||
WHERE DeadLettered = 0
|
||||
ORDER BY RowId ASC
|
||||
LIMIT $n
|
||||
)
|
||||
""";
|
||||
cmd.Parameters.AddWithValue("$n", toEvict);
|
||||
await cmd.ExecuteNonQueryAsync(ct).ConfigureAwait(false);
|
||||
}
|
||||
lock (_statusLock) { _evictedCount += toEvict; }
|
||||
_logger.Warning(
|
||||
"Historian queue at capacity {Cap} — evicted {Count} oldest row(s) to make room (lifetime evictions: {Total})",
|
||||
_capacity, toEvict, _evictedCount);
|
||||
}
|
||||
|
||||
private void PurgeAgedDeadLetters()
|
||||
|
||||
Reference in New Issue
Block a user