From 22711444cc42ca4f9010625e80e8c3e2d3c97737 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 26 Jun 2026 17:47:20 -0400 Subject: [PATCH] fix(historian-gateway): cancellation-safe alarm writer + dispose-safe outbox + provisioner polish + outbox tests I-1: GatewayAlarmHistorianWriter no longer dead-letters events cancelled mid-drain at shutdown. WriteBatchAsync short-circuits remaining events to RetryPlease once cancellation is requested, and SendOneAsync catches OperationCanceledException (when the token is cancelled) -> RetryPlease, so in-flight events stay queued instead of being permanently dropped. I-2: FasterLogHistorizationOutbox.Dispose now guards the awaited periodic loop with a broad catch (Exception) after the OperationCanceledException catch, so a non-Faster teardown fault (e.g. ObjectDisposedException) can never escape Dispose. M-1: GatewayTagProvisioner skips the empty EnsureTags round-trip when every request is non-historizable (early return). M-2: GatewayTagProvisioner handles plain shutdown cancellation quietly (Debug, not Warning), counting the unsent batch as Failed, never throwing. M-3/M-4: Added remove-last-entry (TailAddress truncation branch) and FIFO implicit-ack (RemoveAsync acks up to and including the target) durability tests, both reopen-and-survive. M-5: Clarifying comment in RecoverState on the transient over-capacity rebuild after a crash between append-commit and drop-truncation-commit. 
Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii --- .../GatewayAlarmHistorianWriter.cs | 16 +++++++ .../GatewayTagProvisioner.cs | 14 ++++++ .../Recorder/FasterLogHistorizationOutbox.cs | 11 +++++ .../GatewayAlarmHistorianWriterTests.cs | 17 +++++++ .../GatewayTagProvisionerTests.cs | 22 ++++++++- .../FasterLogHistorizationOutboxTests.cs | 48 +++++++++++++++++++ 6 files changed, 127 insertions(+), 1 deletion(-) diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayAlarmHistorianWriter.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayAlarmHistorianWriter.cs index 535ed779..bddb63d5 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayAlarmHistorianWriter.cs +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayAlarmHistorianWriter.cs @@ -65,6 +65,15 @@ public sealed class GatewayAlarmHistorianWriter : IAlarmHistorianWriter for (var i = 0; i < batch.Count; i++) { + if (cancellationToken.IsCancellationRequested) + { + // Shutdown mid-drain: short-circuit the remaining events to RetryPlease rather than + // calling the gateway with a cancelled token. They stay queued for retry next startup + // — a cancellation must NEVER dead-letter an in-flight event (silent data loss). + outcomes[i] = HistorianWriteOutcome.RetryPlease; + continue; + } + outcomes[i] = await SendOneAsync(batch[i], cancellationToken).ConfigureAwait(false); } @@ -79,6 +88,13 @@ public sealed class GatewayAlarmHistorianWriter : IAlarmHistorianWriter .ConfigureAwait(false); return MapAck(ack); } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + // Cancellation mid-send at shutdown is NOT a poison event. Map to RetryPlease so the + // event stays queued for next startup rather than being dead-lettered (data loss). 
+ _logger.LogDebug("Alarm SendEvent cancelled at shutdown; will retry."); + return HistorianWriteOutcome.RetryPlease; + } catch (Exception exception) { // NEVER throw out of the writer — the drain worker expects a per-event outcome. Classify diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayTagProvisioner.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayTagProvisioner.cs index ebba67f2..08e87f28 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayTagProvisioner.cs +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayTagProvisioner.cs @@ -61,6 +61,12 @@ public sealed class GatewayTagProvisioner : IHistorianProvisioning }); } + // Every request was non-historizable — nothing to send. Skip the empty gateway round-trip. + if (definitions.Count == 0) + { + return new HistorianProvisionResult(requests.Count, Ensured: 0, Skipped: skipped, Failed: 0); + } + try { var results = await _client.EnsureTagsAsync(definitions, ct).ConfigureAwait(false); @@ -68,6 +74,14 @@ public sealed class GatewayTagProvisioner : IHistorianProvisioning var failed = Math.Max(0, definitions.Count - ensured); return new HistorianProvisionResult(requests.Count, ensured, skipped, failed); } + catch (OperationCanceledException) when (ct.IsCancellationRequested) + { + // A plain shutdown cancellation is not a provisioning fault worth a scary Warning. Count + // the batch as Failed (it was never confirmed to land) but log quietly and never throw, keeping + // the non-blocking contract. + _logger.LogDebug("Tag provisioning cancelled at shutdown; deferred."); + return new HistorianProvisionResult(requests.Count, Ensured: 0, Skipped: skipped, Failed: definitions.Count); + } catch (Exception exception) { // Non-blocking: a failed EnsureTags never fails the apply. 
Count the whole sent batch as diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Recorder/FasterLogHistorizationOutbox.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Recorder/FasterLogHistorizationOutbox.cs index 06e33890..1c090969 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Recorder/FasterLogHistorizationOutbox.cs +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/Recorder/FasterLogHistorizationOutbox.cs @@ -197,6 +197,11 @@ public sealed class FasterLogHistorizationOutbox : IHistorizationOutbox // // CTOR-ONLY: called once before the instance is published and before the periodic-commit loop // starts. It unconditionally seeds _nextScanAddress/_live/_index, so it must NEVER run post-ctor. + // + // Capacity note: if a crash lands between an append's commit and the subsequent drop-oldest + // truncation commit, recovery scans the still-present oldest record and may transiently rebuild + // _live with MORE than _capacity entries. This self-corrects on the next AppendAsync — its + // drop-oldest while-loop runs until _live.Count <= _capacity, so the overflow converges away. private void RecoverState() { _nextScanAddress = _log.BeginAddress; @@ -258,6 +263,12 @@ public sealed class FasterLogHistorizationOutbox : IHistorizationOutbox { // Cancellation is the expected stop signal — not an error. } + catch (Exception) + { + // The loop faulted on a non-Faster commit error during teardown (e.g. an + // ObjectDisposedException as the device tears down); swallow — Dispose must not + // throw. Already-committed enqueues remain durable. 
+ } _periodicCommitTimer?.Dispose(); _periodicCommitCts.Dispose(); diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayAlarmHistorianWriterTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayAlarmHistorianWriterTests.cs index 0bbd80a7..bd307d75 100644 --- a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayAlarmHistorianWriterTests.cs +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayAlarmHistorianWriterTests.cs @@ -174,6 +174,23 @@ public sealed class GatewayAlarmHistorianWriterTests Assert.Equal(HistorianWriteOutcome.PermanentFail, outcomes[0]); } + [Fact] + public async Task Cancellation_mid_drain_is_RetryPlease_not_PermanentFail() + { + // Shutdown mid-drain: a cancelled token must NOT dead-letter in-flight events (silent data + // loss). Every outcome is RetryPlease (stays queued for next startup), WriteBatchAsync never + // throws, and the gateway is not called with a cancelled token (short-circuited up front). 
+ using var cts = new CancellationTokenSource(); + await cts.CancelAsync(); + var fake = new FakeHistorianGatewayClient { SendEventThrows = new OperationCanceledException() }; + + var outcomes = await Writer(fake).WriteBatchAsync(new[] { Evt("A"), Evt("B") }, cts.Token); + + Assert.Equal(2, outcomes.Count); + Assert.All(outcomes, o => Assert.Equal(HistorianWriteOutcome.RetryPlease, o)); + Assert.Equal(0, fake.SendEventCallCount); + } + [Fact] public async Task Empty_batch_returns_empty() { diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayTagProvisionerTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayTagProvisionerTests.cs index acdecee9..ad022a47 100644 --- a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayTagProvisionerTests.cs +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayTagProvisionerTests.cs @@ -64,11 +64,31 @@ public sealed class GatewayTagProvisionerTests new[] { new HistorianTagProvisionRequest("Pump1.Name", DriverDataType.String, null, null) }, TestContext.Current.CancellationToken); - Assert.Empty(fake.LastEnsureDefinitions!); // String is deferred → never built into a definition + // String is deferred → never built into a definition, so the empty batch skips the gateway + // round-trip entirely (the call is never made). + Assert.Equal(0, fake.EnsureTagsCallCount); Assert.Equal(1, result.Requested); Assert.Equal(1, result.Skipped); } + [Fact] + public async Task Cancellation_is_quiet_and_not_misreported() + { + // A plain shutdown cancellation must not throw and must not be a scary Warning. The unsent + // batch is counted as Failed (it didn't land) but handled quietly — non-blocking contract. 
+ using var cts = new CancellationTokenSource(); + await cts.CancelAsync(); + var fake = new FakeHistorianGatewayClient { EnsureTagsThrows = new OperationCanceledException() }; + var p = Provisioner(fake); + + var result = await p.EnsureTagsAsync( + new[] { new HistorianTagProvisionRequest("Pump1.Temp", DriverDataType.Float32, null, null) }, + cts.Token); + + Assert.Equal(1, result.Failed); // counted, not thrown + Assert.Equal(0, result.Ensured); + } + [Fact] public async Task Gateway_failure_is_swallowed_and_counted_not_thrown() { diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/Recorder/FasterLogHistorizationOutboxTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/Recorder/FasterLogHistorizationOutboxTests.cs index bc4d1213..fbf80041 100644 --- a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/Recorder/FasterLogHistorizationOutboxTests.cs +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/Recorder/FasterLogHistorizationOutboxTests.cs @@ -59,6 +59,54 @@ public sealed class FasterLogHistorizationOutboxTests : IDisposable Assert.Equal(keep, batch[0].Id); } + [Fact] + public async Task Remove_last_entry_empties_and_survives_restart() + { + // Removing the only entry exercises the node.Next == null ⇒ TailAddress truncation branch: + // the head advances to the tail, the outbox empties, and that empty state must persist across + // a restart (a stale survivor here would silently re-drain an already-acked entry). 
+ var dir = NewTempDir(); + var a = E("A", 1); + { + using var o = new FasterLogHistorizationOutbox(dir, HistorizationCommitMode.PerEntry); + await o.AppendAsync(a, TestContext.Current.CancellationToken); + await o.RemoveAsync(a.Id, TestContext.Current.CancellationToken); // ack the only entry + Assert.Equal(0, await o.CountAsync(TestContext.Current.CancellationToken)); + } + + using var reopened = new FasterLogHistorizationOutbox(dir, HistorizationCommitMode.PerEntry); + Assert.Equal(0, await reopened.CountAsync(TestContext.Current.CancellationToken)); + Assert.Empty(await reopened.PeekBatchAsync(10, TestContext.Current.CancellationToken)); + } + + [Fact] + public async Task Remove_acks_fifo_up_to_and_including_target_and_survives_restart() + { + // FIFO implicit-ack: acking B truncates everything up to AND including B (so A is implicitly + // acked too), leaving only the newer C. This is the documented head-advance semantics in + // RemoveAsync, and it must persist across a restart. + var dir = NewTempDir(); + var a = E("A", 1); + var b = E("B", 2); + var c = E("C", 3); + { + using var o = new FasterLogHistorizationOutbox(dir, HistorizationCommitMode.PerEntry); + await o.AppendAsync(a, TestContext.Current.CancellationToken); + await o.AppendAsync(b, TestContext.Current.CancellationToken); + await o.AppendAsync(c, TestContext.Current.CancellationToken); + await o.RemoveAsync(b.Id, TestContext.Current.CancellationToken); // acks A and B, leaves C + + Assert.Equal(1, await o.CountAsync(TestContext.Current.CancellationToken)); + var remaining = await o.PeekBatchAsync(10, TestContext.Current.CancellationToken); + Assert.Equal(new[] { c.Id }, remaining.Select(e => e.Id)); + } + + using var reopened = new FasterLogHistorizationOutbox(dir, HistorizationCommitMode.PerEntry); + Assert.Equal(1, await reopened.CountAsync(TestContext.Current.CancellationToken)); + var survived = await reopened.PeekBatchAsync(10, TestContext.Current.CancellationToken); + Assert.Equal(new[] { 
c.Id }, survived.Select(e => e.Id)); + } + [Fact] public async Task Capacity_full_drops_oldest_and_counts() {