review(Core.AlarmHistorian): reset drain state on cancel + volatile _disposed

Re-review at 7286d320. -012 (Medium): OperationCanceledException left _drainState stuck
at Draining on the status surface; now resets to BackingOff + test. -013: _disposed ->
volatile (mirrors _backoffIndex). -014 (post-dispose status guards) deferred cross-module.
This commit is contained in:
Joseph Doherty
2026-06-19 11:21:35 -04:00
parent 48af117bff
commit 6b4210cb17
3 changed files with 125 additions and 4 deletions
@@ -786,6 +786,55 @@ public sealed class SqliteStoreAndForwardSinkTests : IDisposable
sink.GetStatus().QueueDepth.ShouldBe(0, "every event drained at the end of the run");
}
/// <summary>
/// Regression for Core.AlarmHistorian-012: when WriteBatchAsync throws
/// <see cref="OperationCanceledException"/> the drain exits via re-throw
/// without resetting <c>_drainState</c>, leaving the status surface permanently
/// showing <c>Draining</c>. The next call to <see cref="SqliteStoreAndForwardSink.GetStatus"/>
/// must reflect <c>Idle</c> or <c>BackingOff</c>, not the stale <c>Draining</c>
/// state that was set at the top of the drain tick.
/// </summary>
[Fact]
public async Task Drain_cancelled_mid_write_resets_drain_state_to_not_draining()
{
var writer = new CancellableWriter();
using var sink = new SqliteStoreAndForwardSink(_dbPath, writer, _log);
await sink.EnqueueAsync(Event("A1"), CancellationToken.None);
using var cts = new CancellationTokenSource();
writer.CancelOn(cts);
// The drain will cancel inside WriteBatchAsync — OperationCanceledException re-throws.
await Should.ThrowAsync<OperationCanceledException>(
() => sink.DrainOnceAsync(cts.Token));
// DrainState must NOT be left as Draining after the cancellation escapes.
sink.GetStatus().DrainState.ShouldNotBe(HistorianDrainState.Draining,
"cancellation must clear the Draining status; row stays queued for next tick");
// The row must still be in the queue — the cancelled drain applied no outcomes.
sink.GetStatus().QueueDepth.ShouldBe(1, "row stays queued when drain is cancelled");
}
/// <summary>A writer that cancels its own CancellationTokenSource on the first call.</summary>
private sealed class CancellableWriter : IAlarmHistorianWriter
{
private CancellationTokenSource? _cts;
/// <summary>Register the token source to cancel on first write.</summary>
public void CancelOn(CancellationTokenSource cts) => _cts = cts;
/// <summary>Writes a batch; cancels via the registered CTS on the first call.</summary>
public Task<IReadOnlyList<HistorianWriteOutcome>> WriteBatchAsync(
IReadOnlyList<AlarmHistorianEvent> batch, CancellationToken ct)
{
_cts?.Cancel();
ct.ThrowIfCancellationRequested();
var outcomes = Enumerable.Repeat(HistorianWriteOutcome.Ack, batch.Count).ToList();
return Task.FromResult<IReadOnlyList<HistorianWriteOutcome>>(outcomes);
}
}
/// <summary>
/// Helper that confirms the queue depth surfaced by GetStatus matches a fresh
/// COUNT(*) read directly from storage — proves the in-memory counter has not