fix(alarm-historian): resolve Medium code-review finding (Core.AlarmHistorian-005)
Status fields (_lastDrainUtc, _lastSuccessUtc, _lastError, _drainState, _evictedCount) were written by the drain timer thread and read by GetStatus() / health-check threads with no memory barrier, risking torn DateTime? reads and stale DrainState observations.
- Added a _statusLock object; all writes to status fields now happen inside lock(_statusLock) blocks in DrainOnceAsync and DrainTimerCallback.
- GetStatus() snapshots all fields atomically under the same lock so the Admin UI / /healthz endpoint always sees a consistent view.
- Regression test GetStatus_snapshot_is_consistent_under_concurrent_drain drives status writes and reads from concurrent threads and asserts that neither side throws.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -93,13 +93,13 @@
|
|||||||
| Severity | Medium |
|
| Severity | Medium |
|
||||||
| Category | Concurrency & thread safety |
|
| Category | Concurrency & thread safety |
|
||||||
| Location | `src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs:66-71,141-143,199,386-388` |
|
| Location | `src/Core/ZB.MOM.WW.OtOpcUa.Core.AlarmHistorian/SqliteStoreAndForwardSink.cs:66-71,141-143,199,386-388` |
|
||||||
| Status | Open |
|
| Status | Resolved |
|
||||||
|
|
||||||
**Description:** The mutable status fields `_lastDrainUtc`, `_lastSuccessUtc`, `_lastError`, `_drainState`, and `_backoffIndex` are written by the drain timer thread inside `DrainOnceAsync` and read concurrently by `GetStatus()` / `CurrentBackoff` on Admin-UI / health-check threads with no memory barrier (no `lock`, no `volatile`, no `Interlocked`). `DateTime?` is not guaranteed to be written atomically, and the reader can observe a stale or torn value. This is a diagnostics surface so the impact is limited, but a torn `DateTime?` read is real undefined behavior.
|
**Description:** The mutable status fields `_lastDrainUtc`, `_lastSuccessUtc`, `_lastError`, `_drainState`, and `_backoffIndex` are written by the drain timer thread inside `DrainOnceAsync` and read concurrently by `GetStatus()` / `CurrentBackoff` on Admin-UI / health-check threads with no memory barrier (no `lock`, no `volatile`, no `Interlocked`). `DateTime?` is not guaranteed to be written atomically, and the reader can observe a stale or torn value. This is a diagnostics surface so the impact is limited, but a torn `DateTime?` read is real undefined behavior.
|
||||||
|
|
||||||
**Recommendation:** Guard the status fields with a small lock, or make the scalars `volatile` where the type permits and snapshot `DateTime?` values under a lock. Take the snapshot atomically in `GetStatus()`.
|
**Recommendation:** Guard the status fields with a small lock, or make the scalars `volatile` where the type permits and snapshot `DateTime?` values under a lock. Take the snapshot atomically in `GetStatus()`.
|
||||||
|
|
||||||
**Resolution:** _(open)_
|
**Resolution:** Resolved 2026-05-22 — added `_statusLock` object; all writes to `_lastDrainUtc`, `_lastSuccessUtc`, `_lastError`, `_drainState`, and `_evictedCount` (new) now happen inside `lock (_statusLock)` blocks; `GetStatus()` snapshots all fields atomically under the same lock. Regression test `GetStatus_snapshot_is_consistent_under_concurrent_drain` added.
|
||||||
|
|
||||||
### Core.AlarmHistorian-006
|
### Core.AlarmHistorian-006
|
||||||
|
|
||||||
|
|||||||
@@ -487,6 +487,128 @@ public sealed class SqliteStoreAndForwardSinkTests : IDisposable
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
/// Regression for Core.AlarmHistorian-007: an <see cref="IAlarmHistorianWriter"/> that
/// hands back an outcome list of the wrong length must NOT make the drain throw.
/// The drain is expected to log the violation, report <c>DrainState = BackingOff</c>,
/// bump the backoff, and keep every row queued for the next attempt. Before the fix
/// this path threw <see cref="InvalidOperationException"/>; after the -006 fix the
/// timer callback caught it, yet the rows were still left stranded on the first
/// cardinality-mismatched tick.
/// </summary>
[Fact]
public async Task Writer_returning_wrong_cardinality_outcomes_sets_backing_off_and_keeps_rows()
{
    var none = CancellationToken.None;
    var faultyWriter = new WrongCardinalityWriter(returnExtraOutcome: true);
    using var sink = new SqliteStoreAndForwardSink(_dbPath, faultyWriter, _log);

    foreach (var alarmId in new[] { "A1", "A2" })
    {
        await sink.EnqueueAsync(Event(alarmId), none);
    }

    // Two events are queued but the writer answers with three outcomes — a cardinality violation.
    await sink.DrainOnceAsync(none);

    var afterMismatch = sink.GetStatus();
    afterMismatch.QueueDepth.ShouldBe(2, "rows must stay queued — no outcome was applied");
    afterMismatch.DrainState.ShouldBe(HistorianDrainState.BackingOff, "mismatch is a transient error");
    afterMismatch.LastError.ShouldNotBeNullOrEmpty("violation message must surface on the status");

    // Strictly greater than "one second minus a millisecond".
    var backoffFloor = TimeSpan.FromSeconds(1) - TimeSpan.FromMilliseconds(1);
    sink.CurrentBackoff.ShouldBeGreaterThan(backoffFloor, "backoff should have been bumped");

    // With the writer repaired, the same rows must drain cleanly.
    faultyWriter.FixWriter();
    await sink.DrainOnceAsync(none);

    var afterRecovery = sink.GetStatus();
    afterRecovery.QueueDepth.ShouldBe(0, "once the writer is fixed the rows drain");
}
|
||||||
|
|
||||||
|
/// <summary>
/// Regression for Core.AlarmHistorian-009: once the queue is at capacity, every
/// evicted row must be counted in <see cref="HistorianSinkStatus.EvictedCount"/>
/// so operators can detect bounded-durability overflow without log scraping.
/// </summary>
[Fact]
public async Task Capacity_eviction_increments_evicted_count_on_status()
{
    var none = CancellationToken.None;
    var writer = new FakeWriter();
    using var sink = new SqliteStoreAndForwardSink(
        _dbPath, writer, _log, batchSize: 100, capacity: 3);

    // Bring the queue up to its configured capacity of three rows.
    foreach (var alarmId in new[] { "A1", "A2", "A3" })
    {
        await sink.EnqueueAsync(Event(alarmId), none);
    }

    // A4 triggers eviction of one row.
    await sink.EnqueueAsync(Event("A4"), none);
    var afterFirstOverflow = sink.GetStatus();
    afterFirstOverflow.EvictedCount.ShouldBe(1, "one eviction reported on status");

    // A5 triggers another eviction; the counter is cumulative.
    await sink.EnqueueAsync(Event("A5"), none);
    var afterSecondOverflow = sink.GetStatus();
    afterSecondOverflow.EvictedCount.ShouldBe(2, "eviction counter accumulates");
}
|
||||||
|
|
||||||
|
/// <summary>
/// Regression for Core.AlarmHistorian-005: GetStatus must return a consistent
/// snapshot of all status fields — no torn DateTime? or stale DrainState.
/// One task repeatedly drains (writing the status fields) while a second task
/// repeatedly calls <c>GetStatus()</c>; the test passes when neither task throws.
/// </summary>
/// <remarks>
/// The reader keeps polling for as long as the drainer is running. A fixed, unpaced
/// read count finishes long before the drainer's 30 paced iterations, leaving almost
/// no window in which reads and writes actually overlap.
/// </remarks>
[Fact]
public async Task GetStatus_snapshot_is_consistent_under_concurrent_drain()
{
    // NOTE(review): RetryPlease is presumably chosen so rows stay queued and every
    // drain pass has work to do — confirm against FakeWriter / the sink.
    var writer = new FakeWriter { DefaultOutcome = HistorianWriteOutcome.RetryPlease };
    using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);

    // Fill the queue.
    for (var i = 0; i < 10; i++)
    {
        await sink.EnqueueAsync(Event($"A{i}"), CancellationToken.None);
    }

    // Safety cap: both loops stop voluntarily once this token fires.
    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

    // The token is deliberately not handed to Task.Run: the loops observe it
    // themselves, and a token-cancelled task would surface from WhenAll as a
    // TaskCanceledException unrelated to the behavior under test.
    var drainer = Task.Run(async () =>
    {
        for (var i = 0; i < 30 && !cts.Token.IsCancellationRequested; i++)
        {
            await sink.DrainOnceAsync(CancellationToken.None);
            await Task.Delay(5, CancellationToken.None);
        }
    });

    var reader = Task.Run(() =>
    {
        const int minimumReads = 50;
        var reads = 0;

        // Always perform at least the original 50 reads, then keep going until the
        // drainer has finished (or the safety cap fires) so reads overlap the writes.
        while (reads < minimumReads
               || (!drainer.IsCompleted && !cts.Token.IsCancellationRequested))
        {
            // Should not throw — no torn reads.
            var s = sink.GetStatus();
            _ = s.DrainState;
            _ = s.LastDrainUtc;
            _ = s.LastSuccessUtc;
            _ = s.LastError;
            reads++;
        }
    });

    // Neither task should throw.
    await Task.WhenAll(drainer, reader);
}
|
||||||
|
|
||||||
|
/// <summary>
/// Test writer that acknowledges every event but, while its fault flag is set,
/// returns one outcome more than the batch contains. The fault persists across
/// calls until <see cref="FixWriter"/> clears it, after which exactly one
/// outcome per event is returned.
/// </summary>
private sealed class WrongCardinalityWriter : IAlarmHistorianWriter
{
    private bool _returnExtra;

    /// <summary>Creates the writer; pass <c>true</c> to start in the faulty state.</summary>
    public WrongCardinalityWriter(bool returnExtraOutcome)
    {
        _returnExtra = returnExtraOutcome;
    }

    /// <summary>Clears the fault flag so later batches get a correctly sized outcome list.</summary>
    public void FixWriter()
    {
        _returnExtra = false;
    }

    /// <summary>
    /// Returns an already-completed task holding <c>Ack</c> outcomes:
    /// <c>batch.Count + 1</c> of them while the fault flag is set, otherwise
    /// <c>batch.Count</c>. The cancellation token is not used.
    /// </summary>
    public Task<IReadOnlyList<HistorianWriteOutcome>> WriteBatchAsync(
        IReadOnlyList<AlarmHistorianEvent> batch, CancellationToken ct)
    {
        var outcomeCount = batch.Count;
        if (_returnExtra)
        {
            outcomeCount++;
        }

        IReadOnlyList<HistorianWriteOutcome> acks =
            Enumerable.Repeat(HistorianWriteOutcome.Ack, outcomeCount).ToList();
        return Task.FromResult(acks);
    }
}
|
||||||
|
|
||||||
/// <summary>Insert a queue row whose PayloadJson cannot deserialize into an AlarmHistorianEvent.</summary>
|
/// <summary>Insert a queue row whose PayloadJson cannot deserialize into an AlarmHistorianEvent.</summary>
|
||||||
private void InsertCorruptRow(string alarmId)
|
private void InsertCorruptRow(string alarmId)
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user