fix(historian-gateway): cancellation-safe alarm writer + dispose-safe outbox + provisioner polish + outbox tests
I-1: GatewayAlarmHistorianWriter no longer dead-letters events cancelled mid-drain at shutdown. WriteBatchAsync short-circuits remaining events to RetryPlease once cancellation is requested, and SendOneAsync catches OperationCanceledException (when the token is cancelled) -> RetryPlease, so in-flight events stay queued instead of being permanently dropped. I-2: FasterLogHistorizationOutbox.Dispose now guards the awaited periodic loop with a broad catch (Exception) after the OperationCanceledException catch, so a non-Faster teardown fault (e.g. ObjectDisposedException) can never escape Dispose. M-1: GatewayTagProvisioner skips the empty EnsureTags round-trip when every request is non-historizable (early return). M-2: GatewayTagProvisioner handles plain shutdown cancellation quietly (Debug, not Warning), counting the unsent batch as Failed, never throwing. M-3/M-4: Added remove-last-entry (TailAddress truncation branch) and FIFO implicit-ack (RemoveAsync acks up to and including the target) durability tests, both reopen-and-survive. M-5: Clarifying comment in RecoverState on the transient over-capacity rebuild after a crash between append-commit and drop-truncation-commit. Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii
This commit is contained in:
@@ -65,6 +65,15 @@ public sealed class GatewayAlarmHistorianWriter : IAlarmHistorianWriter
|
||||
|
||||
for (var i = 0; i < batch.Count; i++)
|
||||
{
|
||||
if (cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
// Shutdown mid-drain: short-circuit the remaining events to RetryPlease rather than
|
||||
// calling the gateway with a cancelled token. They stay queued for retry next startup
|
||||
// — a cancellation must NEVER dead-letter an in-flight event (silent data loss).
|
||||
outcomes[i] = HistorianWriteOutcome.RetryPlease;
|
||||
continue;
|
||||
}
|
||||
|
||||
outcomes[i] = await SendOneAsync(batch[i], cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
@@ -79,6 +88,13 @@ public sealed class GatewayAlarmHistorianWriter : IAlarmHistorianWriter
|
||||
.ConfigureAwait(false);
|
||||
return MapAck(ack);
|
||||
}
|
||||
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
// Cancellation mid-send at shutdown is NOT a poison event. Map to RetryPlease so the
|
||||
// event stays queued for next startup rather than being dead-lettered (data loss).
|
||||
_logger.LogDebug("Alarm SendEvent cancelled at shutdown; will retry.");
|
||||
return HistorianWriteOutcome.RetryPlease;
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
// NEVER throw out of the writer — the drain worker expects a per-event outcome. Classify
|
||||
|
||||
@@ -61,6 +61,12 @@ public sealed class GatewayTagProvisioner : IHistorianProvisioning
|
||||
});
|
||||
}
|
||||
|
||||
// Every request was non-historizable — nothing to send. Skip the empty gateway round-trip.
|
||||
if (definitions.Count == 0)
|
||||
{
|
||||
return new HistorianProvisionResult(requests.Count, Ensured: 0, Skipped: skipped, Failed: 0);
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
var results = await _client.EnsureTagsAsync(definitions, ct).ConfigureAwait(false);
|
||||
@@ -68,6 +74,14 @@ public sealed class GatewayTagProvisioner : IHistorianProvisioning
|
||||
var failed = Math.Max(0, definitions.Count - ensured);
|
||||
return new HistorianProvisionResult(requests.Count, ensured, skipped, failed);
|
||||
}
|
||||
catch (OperationCanceledException) when (ct.IsCancellationRequested)
|
||||
{
|
||||
// A plain shutdown cancellation is not a provisioning fault worth a scary Warning. Count
|
||||
// the unsent batch as Failed (it didn't land) but log quietly and never throw, keeping
|
||||
// the non-blocking contract.
|
||||
_logger.LogDebug("Tag provisioning cancelled at shutdown; deferred.");
|
||||
return new HistorianProvisionResult(requests.Count, Ensured: 0, Skipped: skipped, Failed: definitions.Count);
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
// Non-blocking: a failed EnsureTags never fails the apply. Count the whole sent batch as
|
||||
|
||||
+11
@@ -197,6 +197,11 @@ public sealed class FasterLogHistorizationOutbox : IHistorizationOutbox
|
||||
//
|
||||
// CTOR-ONLY: called once before the instance is published and before the periodic-commit loop
|
||||
// starts. It unconditionally seeds _nextScanAddress/_live/_index, so it must NEVER run post-ctor.
|
||||
//
|
||||
// Capacity note: if a crash lands between an append's commit and the subsequent drop-oldest
|
||||
// truncation commit, recovery scans the still-present oldest record and may transiently rebuild
|
||||
// _live with MORE than _capacity entries. This self-corrects on the next AppendAsync — its
|
||||
// drop-oldest while-loop runs until _live.Count <= _capacity, so the overflow converges away.
|
||||
private void RecoverState()
|
||||
{
|
||||
_nextScanAddress = _log.BeginAddress;
|
||||
@@ -258,6 +263,12 @@ public sealed class FasterLogHistorizationOutbox : IHistorizationOutbox
|
||||
{
|
||||
// Cancellation is the expected stop signal — not an error.
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
// The loop faulted on a non-Faster commit error during teardown (e.g. an
|
||||
// ObjectDisposedException as the device tears down); swallow — Dispose must not
|
||||
// throw. Already-committed enqueues remain durable.
|
||||
}
|
||||
|
||||
_periodicCommitTimer?.Dispose();
|
||||
_periodicCommitCts.Dispose();
|
||||
|
||||
+17
@@ -174,6 +174,23 @@ public sealed class GatewayAlarmHistorianWriterTests
|
||||
Assert.Equal(HistorianWriteOutcome.PermanentFail, outcomes[0]);
|
||||
}
|
||||
|
||||
[Fact]
public async Task Cancellation_mid_drain_is_RetryPlease_not_PermanentFail()
{
    // Contract under test: when the token is already cancelled, WriteBatchAsync must return
    // RetryPlease for every event (never PermanentFail, which would dead-letter it), must not
    // throw, and must not reach the gateway at all.

    // Arrange — a token that is cancelled before the batch is handed to the writer.
    using var shutdown = new CancellationTokenSource();
    await shutdown.CancelAsync();

    // SendEventThrows is configured with an OperationCanceledException; the call-count assertion
    // below verifies the gateway is never actually invoked on this path.
    var gateway = new FakeHistorianGatewayClient { SendEventThrows = new OperationCanceledException() };
    var writer = Writer(gateway);
    var batch = new[] { Evt("A"), Evt("B") };

    // Act
    var results = await writer.WriteBatchAsync(batch, shutdown.Token);

    // Assert — one outcome per event, each of them retryable.
    Assert.Equal(2, results.Count);
    foreach (var outcome in results)
    {
        Assert.Equal(HistorianWriteOutcome.RetryPlease, outcome);
    }

    // The short-circuit happens up front: no SendEvent call was made with the cancelled token.
    Assert.Equal(0, gateway.SendEventCallCount);
}
|
||||
|
||||
[Fact]
|
||||
public async Task Empty_batch_returns_empty()
|
||||
{
|
||||
|
||||
+21
-1
@@ -64,11 +64,31 @@ public sealed class GatewayTagProvisionerTests
|
||||
new[] { new HistorianTagProvisionRequest("Pump1.Name", DriverDataType.String, null, null) },
|
||||
TestContext.Current.CancellationToken);
|
||||
|
||||
Assert.Empty(fake.LastEnsureDefinitions!); // String is deferred → never built into a definition
|
||||
// String is deferred → never built into a definition, so the empty batch skips the gateway
|
||||
// round-trip entirely (the call is never made).
|
||||
Assert.Equal(0, fake.EnsureTagsCallCount);
|
||||
Assert.Equal(1, result.Requested);
|
||||
Assert.Equal(1, result.Skipped);
|
||||
}
|
||||
|
||||
[Fact]
public async Task Cancellation_is_quiet_and_not_misreported()
{
    // Contract under test: a cancelled EnsureTags call must not throw out of the provisioner;
    // the batch that did not land is reported as Failed and nothing is reported as Ensured.
    // NOTE(review): the log level (Debug rather than Warning) is not asserted here — only the
    // returned counts and the absence of an exception are checked.

    // Arrange — an already-cancelled token and a fake gateway set up with an
    // OperationCanceledException for EnsureTags.
    using var shutdown = new CancellationTokenSource();
    await shutdown.CancelAsync();
    var gateway = new FakeHistorianGatewayClient { EnsureTagsThrows = new OperationCanceledException() };
    var provisioner = Provisioner(gateway);
    var requests = new[]
    {
        new HistorianTagProvisionRequest("Pump1.Temp", DriverDataType.Float32, null, null),
    };

    // Act — must complete normally rather than surface the cancellation.
    var outcome = await provisioner.EnsureTagsAsync(requests, shutdown.Token);

    // Assert — counted, not thrown.
    Assert.Equal(1, outcome.Failed);
    Assert.Equal(0, outcome.Ensured);
}
|
||||
|
||||
[Fact]
|
||||
public async Task Gateway_failure_is_swallowed_and_counted_not_thrown()
|
||||
{
|
||||
|
||||
+48
@@ -59,6 +59,54 @@ public sealed class FasterLogHistorizationOutboxTests : IDisposable
|
||||
Assert.Equal(keep, batch[0].Id);
|
||||
}
|
||||
|
||||
[Fact]
public async Task Remove_last_entry_empties_and_survives_restart()
{
    // Scenario: append a single entry, acknowledge it, then reopen the outbox over the same
    // directory. The outbox must be empty both before and after the reopen — an entry that
    // reappeared after restart would be drained a second time even though it was already acked.
    // (Per the original test note, removing the only entry is the case where there is no next
    // node and truncation goes to the tail address.)
    var ct = TestContext.Current.CancellationToken;
    var directory = NewTempDir();
    var only = E("A", 1);

    // First lifetime: append, ack, and confirm the live count drops to zero. The using block
    // disposes the outbox before the directory is opened again.
    using (var outbox = new FasterLogHistorizationOutbox(directory, HistorizationCommitMode.PerEntry))
    {
        await outbox.AppendAsync(only, ct);
        await outbox.RemoveAsync(only.Id, ct); // ack the only entry

        var liveCount = await outbox.CountAsync(ct);
        Assert.Equal(0, liveCount);
    }

    // Second lifetime: the empty state must have been persisted.
    using var reopened = new FasterLogHistorizationOutbox(directory, HistorizationCommitMode.PerEntry);

    var recoveredCount = await reopened.CountAsync(ct);
    Assert.Equal(0, recoveredCount);

    var recoveredBatch = await reopened.PeekBatchAsync(10, ct);
    Assert.Empty(recoveredBatch);
}
|
||||
|
||||
[Fact]
public async Task Remove_acks_fifo_up_to_and_including_target_and_survives_restart()
{
    // Scenario: append A, B, C in that order and acknowledge B. Removal is FIFO with implicit
    // ack, so everything up to and including B goes away (A is acked implicitly) and only the
    // newer C remains. That result must also be what a reopened outbox recovers.
    var ct = TestContext.Current.CancellationToken;
    var directory = NewTempDir();
    var first = E("A", 1);
    var second = E("B", 2);
    var third = E("C", 3);

    // First lifetime: fill, ack the middle entry, and verify only the newest entry is left.
    using (var outbox = new FasterLogHistorizationOutbox(directory, HistorizationCommitMode.PerEntry))
    {
        await outbox.AppendAsync(first, ct);
        await outbox.AppendAsync(second, ct);
        await outbox.AppendAsync(third, ct);
        await outbox.RemoveAsync(second.Id, ct); // acks A and B, leaves C

        var liveCount = await outbox.CountAsync(ct);
        Assert.Equal(1, liveCount);

        var remaining = await outbox.PeekBatchAsync(10, ct);
        var remainingIds = remaining.Select(entry => entry.Id);
        Assert.Equal(new[] { third.Id }, remainingIds);
    }

    // Second lifetime: the head advance must have been persisted, so C is the sole survivor.
    using var reopened = new FasterLogHistorizationOutbox(directory, HistorizationCommitMode.PerEntry);

    var recoveredCount = await reopened.CountAsync(ct);
    Assert.Equal(1, recoveredCount);

    var survived = await reopened.PeekBatchAsync(10, ct);
    var survivedIds = survived.Select(entry => entry.Id);
    Assert.Equal(new[] { third.Id }, survivedIds);
}
|
||||
|
||||
[Fact]
|
||||
public async Task Capacity_full_drops_oldest_and_counts()
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user