diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/ContinuousHistorizationRecorder.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/ContinuousHistorizationRecorder.cs index de0f76d0..1af30213 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/ContinuousHistorizationRecorder.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/ContinuousHistorizationRecorder.cs @@ -68,12 +68,15 @@ public sealed class ContinuousHistorizationRecorder : ReceiveActor, IWithTimers /// Lifetime count of values appended to the outbox. /// Lifetime count of values dropped for not being numeric-coercible. /// Lifetime count of entries the outbox dropped on capacity overflow. + /// Lifetime count of durable-boundary append failures (the value + /// was dropped, not recorded, and the actor stayed alive rather than restart-looping). /// Whether the most recent drain pass acked cleanly. public sealed record RecorderStatus( int QueuedDepth, long TotalRecorded, long DroppedNonNumeric, long OutboxDropped, + long OutboxAppendFailures, bool LastDrainSucceeded); private readonly IActorRef _dependencyMux; @@ -93,6 +96,7 @@ public sealed class ContinuousHistorizationRecorder : ReceiveActor, IWithTimers private DateTime _nextAllowedDrainUtc = DateTime.MinValue; private long _totalRecorded; private long _droppedNonNumeric; + private long _outboxAppendFailures; private bool _lastDrainSucceeded = true; /// Gets or sets the timer scheduler (set by Akka via ). @@ -153,7 +157,14 @@ public sealed class ContinuousHistorizationRecorder : ReceiveActor, IWithTimers ReceiveAsync(OnValueChangedAsync); Receive(_ => OnDrainTick()); Receive(OnDrainResult); - ReceiveAsync(async _ => Sender.Tell(await BuildStatusAsync().ConfigureAwait(false))); + ReceiveAsync(async _ => + { + // Capture Sender before the await: although Akka restores the actor context across awaits, + // capturing first is the robust idiom (Sender after an await is brittle). + IActorRef replyTo = Sender; + RecorderStatus status = await BuildStatusAsync().ConfigureAwait(false); + replyTo.Tell(status); + }); } /// @@ -169,6 +180,9 @@ public sealed class ContinuousHistorizationRecorder : ReceiveActor, IWithTimers /// protected override void PostStop() { + // Drop our mux interest eagerly (mirrors VirtualTagActor.PostStop) so the mux stops fanning to + // us immediately, closing the dead-letter window between this stop and the mux's Terminated. + _dependencyMux.Tell(new DependencyMuxActor.UnregisterInterest(Self)); _lifetime.Cancel(); _lifetime.Dispose(); base.PostStop(); @@ -200,7 +214,28 @@ public sealed class ContinuousHistorizationRecorder : ReceiveActor, IWithTimers // Durable boundary: append (awaited so appends stay serialized) BEFORE the value is considered // captured. The outbox drops the oldest entry on capacity overflow and tracks DroppedCount. - await _outbox.AppendAsync(entry, _lifetime.Token).ConfigureAwait(false); + try + { + await _outbox.AppendAsync(entry, _lifetime.Token).ConfigureAwait(false); + } + catch (OperationCanceledException) when (_lifetime.IsCancellationRequested) + { + // Normal shutdown raced the append — not a fault. Drop quietly. + return; + } + catch (Exception ex) + { + // A durable-boundary failure (e.g. a PerEntry fsync hitting disk-full / I/O error) must NEVER + // propagate out of the handler — that would trip Akka supervision into a restart, and under a + // persistent disk fault the actor would restart-loop (re-register → next value → append fails + // → restart → …). Mirror the drain path's catch-all: meter the failure (category only, no + // value content), drop this value, and stay alive. Do NOT record it or nudge the drain. + _outboxAppendFailures++; + _log.Warning("ContinuousHistorization: outbox append failed ({Exception}); value dropped.", + ex.GetType().Name); + return; + } + _totalRecorded++; // Nudge a prompt drain attempt; the DrainTick handler de-dups (already draining) and honours @@ -326,6 +361,7 @@ public sealed class ContinuousHistorizationRecorder : ReceiveActor, IWithTimers _totalRecorded, _droppedNonNumeric, _outbox.DroppedCount, + _outboxAppendFailures, _lastDrainSucceeded); } diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Historian/ContinuousHistorizationRecorderTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Historian/ContinuousHistorizationRecorderTests.cs index 42ee87fe..f01d4053 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Historian/ContinuousHistorizationRecorderTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests/Historian/ContinuousHistorizationRecorderTests.cs @@ -66,8 +66,40 @@ public sealed class ContinuousHistorizationRecorderTests : TestKit rec.Tell(new VirtualTagActor.DependencyValueChanged("Pump1.Temp", 7.0, DateTime.UtcNow)); + // Prove the drain actually RAN, not just that the append happened: assert the writer was + // invoked (the fake records every value, even on Succeed=false) AND that the outbox stayed at 1 + // (RemoveAsync was NOT called, so the un-acked entry is retained for retry). Count==1 alone is + // true the instant the append lands and would not catch a silently-broken drain. await AwaitAssertAsync(async () => - Assert.Equal(1, await outbox.CountAsync(default))); // not acked -> retained for retry + { + Assert.Contains(writer.Snapshot(), w => w.Tag == "Pump1.Temp" && w.Value == 7.0); + Assert.Equal(1, await outbox.CountAsync(default)); + }); + } + + [Fact] + public async Task Retry_after_writer_failure_eventually_acks() + { + var mux = CreateTestProbe(); + // First drain fails, the next succeeds — exercises the retry → success → ack path. + var writer = new FakeValueWriter { Succeed = true, FailFirstN = 1 }; + var outbox = new InMemoryOutbox(); + + // Short backoff so the retry fires promptly; the assert below is still time-bounded and + // deterministic (AwaitAssert polls — no Thread.Sleep). + var rec = Sys.ActorOf(ContinuousHistorizationRecorder.Props( + mux.Ref, writer, outbox, new[] { "Pump1.Temp" }, + minBackoff: TimeSpan.FromMilliseconds(50), + maxBackoff: TimeSpan.FromMilliseconds(200))); + + rec.Tell(new VirtualTagActor.DependencyValueChanged("Pump1.Temp", 13.0, DateTime.UtcNow)); + + // The first drain returns false (entry retained); after the backoff the retry drain succeeds + // and acks, truncating the outbox to 0. + await AwaitAssertAsync(async () => + Assert.Equal(0, await outbox.CountAsync(default)), TimeSpan.FromSeconds(5)); + + Assert.True(writer.CallCount >= 2, "the writer must have been called at least twice (a retry happened)"); } [Fact] @@ -99,9 +131,27 @@ public sealed class ContinuousHistorizationRecorderTests : TestKit { private readonly Lock _gate = new(); private readonly List _written = new(); + private int _calls; public bool Succeed { get; init; } = true; + /// When > 0, the first N calls fail (return false) regardless of ; + /// later calls use . Lets a test prove the retry → success → ack path while + /// leaving the plain behaviour (FailFirstN==0) untouched. + public int FailFirstN { get; init; } + + /// Lifetime count of invocations (proves a retry ran). + public int CallCount + { + get + { + lock (_gate) + { + return _calls; + } + } + } + public IReadOnlyList Snapshot() { lock (_gate) @@ -115,13 +165,16 @@ public sealed class ContinuousHistorizationRecorderTests : TestKit { lock (_gate) { + _calls++; foreach (HistorizationValue v in values) { _written.Add(new WrittenValue(tag, v.Value, v.Quality, v.TimestampUtc)); } - } - return Task.FromResult(Succeed); + // Fail the first FailFirstN calls; thereafter honour Succeed. + bool result = _calls > FailFirstN && Succeed; + return Task.FromResult(result); + } } }