fix(historian-gateway): guard recorder outbox-append failures + retry-success test + Sender capture + mux deregister
I-1: Wrap the AppendAsync call in OnValueChangedAsync in try/catch so a durable-boundary failure (e.g. a PerEntry fsync hitting disk-full or an I/O error) can no longer propagate out of the handler and trip Akka supervision into a restart loop. An append canceled during shutdown returns quietly; any other exception increments a new _outboxAppendFailures counter, logs a Warning (exception type name only), and drops the value without recording it or nudging the drain. The counter is surfaced on RecorderStatus (new OutboxAppendFailures field).

I-2: Strengthen Writer_failure_keeps_entry_for_retry to prove the drain actually ran: assert, via AwaitAssertAsync, that the writer was invoked (the fake records values even when Succeed=false) AND that the outbox stayed at 1 (RemoveAsync was not called).

M-3: Capture Sender before the await in the GetStatus handler, then Tell the reply to the captured reference.

M-4: Add Retry_after_writer_failure_eventually_acks, proving the retry -> success -> ack path. FakeValueWriter gains a FailFirstN option and a CallCount property (Succeed behaviour is unchanged). A short minBackoff keeps the test fast and deterministic (AwaitAssert polling, no sleep).

M-5: Deregister mux interest in PostStop via DependencyMuxActor.UnregisterInterest, mirroring VirtualTagActor.PostStop, to close the dead-letter window before the mux observes Terminated.

Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii
This commit is contained in:
@@ -68,12 +68,15 @@ public sealed class ContinuousHistorizationRecorder : ReceiveActor, IWithTimers
|
|||||||
/// <param name="TotalRecorded">Lifetime count of values appended to the outbox.</param>
|
/// <param name="TotalRecorded">Lifetime count of values appended to the outbox.</param>
|
||||||
/// <param name="DroppedNonNumeric">Lifetime count of values dropped for not being numeric-coercible.</param>
|
/// <param name="DroppedNonNumeric">Lifetime count of values dropped for not being numeric-coercible.</param>
|
||||||
/// <param name="OutboxDropped">Lifetime count of entries the outbox dropped on capacity overflow.</param>
|
/// <param name="OutboxDropped">Lifetime count of entries the outbox dropped on capacity overflow.</param>
|
||||||
|
/// <param name="OutboxAppendFailures">Lifetime count of durable-boundary append failures (the value
|
||||||
|
/// was dropped, not recorded, and the actor stayed alive rather than restart-looping).</param>
|
||||||
/// <param name="LastDrainSucceeded">Whether the most recent drain pass acked cleanly.</param>
|
/// <param name="LastDrainSucceeded">Whether the most recent drain pass acked cleanly.</param>
|
||||||
public sealed record RecorderStatus(
|
public sealed record RecorderStatus(
|
||||||
int QueuedDepth,
|
int QueuedDepth,
|
||||||
long TotalRecorded,
|
long TotalRecorded,
|
||||||
long DroppedNonNumeric,
|
long DroppedNonNumeric,
|
||||||
long OutboxDropped,
|
long OutboxDropped,
|
||||||
|
long OutboxAppendFailures,
|
||||||
bool LastDrainSucceeded);
|
bool LastDrainSucceeded);
|
||||||
|
|
||||||
private readonly IActorRef _dependencyMux;
|
private readonly IActorRef _dependencyMux;
|
||||||
@@ -93,6 +96,7 @@ public sealed class ContinuousHistorizationRecorder : ReceiveActor, IWithTimers
|
|||||||
private DateTime _nextAllowedDrainUtc = DateTime.MinValue;
|
private DateTime _nextAllowedDrainUtc = DateTime.MinValue;
|
||||||
private long _totalRecorded;
|
private long _totalRecorded;
|
||||||
private long _droppedNonNumeric;
|
private long _droppedNonNumeric;
|
||||||
|
private long _outboxAppendFailures;
|
||||||
private bool _lastDrainSucceeded = true;
|
private bool _lastDrainSucceeded = true;
|
||||||
|
|
||||||
/// <summary>Gets or sets the timer scheduler (set by Akka via <see cref="IWithTimers"/>).</summary>
|
/// <summary>Gets or sets the timer scheduler (set by Akka via <see cref="IWithTimers"/>).</summary>
|
||||||
@@ -153,7 +157,14 @@ public sealed class ContinuousHistorizationRecorder : ReceiveActor, IWithTimers
|
|||||||
ReceiveAsync<VirtualTagActor.DependencyValueChanged>(OnValueChangedAsync);
|
ReceiveAsync<VirtualTagActor.DependencyValueChanged>(OnValueChangedAsync);
|
||||||
Receive<DrainTick>(_ => OnDrainTick());
|
Receive<DrainTick>(_ => OnDrainTick());
|
||||||
Receive<DrainResult>(OnDrainResult);
|
Receive<DrainResult>(OnDrainResult);
|
||||||
ReceiveAsync<GetStatus>(async _ => Sender.Tell(await BuildStatusAsync().ConfigureAwait(false)));
|
ReceiveAsync<GetStatus>(async _ =>
|
||||||
|
{
|
||||||
|
// Capture Sender before the await: the await below uses ConfigureAwait(false), so the continuation
|
||||||
|
// may resume outside the actor's context, where reading Sender is not safe. Reply via the captured ref.
|
||||||
|
IActorRef replyTo = Sender;
|
||||||
|
RecorderStatus status = await BuildStatusAsync().ConfigureAwait(false);
|
||||||
|
replyTo.Tell(status);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
@@ -169,6 +180,9 @@ public sealed class ContinuousHistorizationRecorder : ReceiveActor, IWithTimers
|
|||||||
/// <inheritdoc />
|
/// <inheritdoc />
|
||||||
protected override void PostStop()
|
protected override void PostStop()
|
||||||
{
|
{
|
||||||
|
// Drop our mux interest eagerly (mirrors VirtualTagActor.PostStop) so the mux stops fanning to
|
||||||
|
// us immediately, closing the dead-letter window between this stop and the mux's Terminated.
|
||||||
|
_dependencyMux.Tell(new DependencyMuxActor.UnregisterInterest(Self));
|
||||||
_lifetime.Cancel();
|
_lifetime.Cancel();
|
||||||
_lifetime.Dispose();
|
_lifetime.Dispose();
|
||||||
base.PostStop();
|
base.PostStop();
|
||||||
@@ -200,7 +214,28 @@ public sealed class ContinuousHistorizationRecorder : ReceiveActor, IWithTimers
|
|||||||
|
|
||||||
// Durable boundary: append (awaited so appends stay serialized) BEFORE the value is considered
|
// Durable boundary: append (awaited so appends stay serialized) BEFORE the value is considered
|
||||||
// captured. The outbox drops the oldest entry on capacity overflow and tracks DroppedCount.
|
// captured. The outbox drops the oldest entry on capacity overflow and tracks DroppedCount.
|
||||||
await _outbox.AppendAsync(entry, _lifetime.Token).ConfigureAwait(false);
|
try
|
||||||
|
{
|
||||||
|
await _outbox.AppendAsync(entry, _lifetime.Token).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) when (_lifetime.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
// Normal shutdown raced the append — not a fault. Drop quietly.
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
// A durable-boundary failure (e.g. a PerEntry fsync hitting disk-full / I/O error) must NEVER
|
||||||
|
// propagate out of the handler — that would trip Akka supervision into a restart, and under a
|
||||||
|
// persistent disk fault the actor would restart-loop (re-register → next value → append fails
|
||||||
|
// → restart → …). Mirror the drain path's catch-all: meter the failure (category only, no
|
||||||
|
// value content), drop this value, and stay alive. Do NOT record it or nudge the drain.
|
||||||
|
_outboxAppendFailures++;
|
||||||
|
_log.Warning("ContinuousHistorization: outbox append failed ({Exception}); value dropped.",
|
||||||
|
ex.GetType().Name);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
_totalRecorded++;
|
_totalRecorded++;
|
||||||
|
|
||||||
// Nudge a prompt drain attempt; the DrainTick handler de-dups (already draining) and honours
|
// Nudge a prompt drain attempt; the DrainTick handler de-dups (already draining) and honours
|
||||||
@@ -326,6 +361,7 @@ public sealed class ContinuousHistorizationRecorder : ReceiveActor, IWithTimers
|
|||||||
_totalRecorded,
|
_totalRecorded,
|
||||||
_droppedNonNumeric,
|
_droppedNonNumeric,
|
||||||
_outbox.DroppedCount,
|
_outbox.DroppedCount,
|
||||||
|
_outboxAppendFailures,
|
||||||
_lastDrainSucceeded);
|
_lastDrainSucceeded);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
+56
-3
@@ -66,8 +66,40 @@ public sealed class ContinuousHistorizationRecorderTests : TestKit
|
|||||||
|
|
||||||
rec.Tell(new VirtualTagActor.DependencyValueChanged("Pump1.Temp", 7.0, DateTime.UtcNow));
|
rec.Tell(new VirtualTagActor.DependencyValueChanged("Pump1.Temp", 7.0, DateTime.UtcNow));
|
||||||
|
|
||||||
|
// Prove the drain actually RAN, not just that the append happened: assert the writer was
|
||||||
|
// invoked (the fake records every value, even on Succeed=false) AND that the outbox stayed at 1
|
||||||
|
// (RemoveAsync was NOT called, so the un-acked entry is retained for retry). Count==1 alone is
|
||||||
|
// true the instant the append lands and would not catch a silently-broken drain.
|
||||||
await AwaitAssertAsync(async () =>
|
await AwaitAssertAsync(async () =>
|
||||||
Assert.Equal(1, await outbox.CountAsync(default))); // not acked -> retained for retry
|
{
|
||||||
|
Assert.Contains(writer.Snapshot(), w => w.Tag == "Pump1.Temp" && w.Value == 7.0);
|
||||||
|
Assert.Equal(1, await outbox.CountAsync(default));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Retry_after_writer_failure_eventually_acks()
|
||||||
|
{
|
||||||
|
var mux = CreateTestProbe();
|
||||||
|
// First drain fails, the next succeeds — exercises the retry → success → ack path.
|
||||||
|
var writer = new FakeValueWriter { Succeed = true, FailFirstN = 1 };
|
||||||
|
var outbox = new InMemoryOutbox();
|
||||||
|
|
||||||
|
// Short backoff so the retry fires promptly; the assert below is still time-bounded and
|
||||||
|
// deterministic (AwaitAssert polls — no Thread.Sleep).
|
||||||
|
var rec = Sys.ActorOf(ContinuousHistorizationRecorder.Props(
|
||||||
|
mux.Ref, writer, outbox, new[] { "Pump1.Temp" },
|
||||||
|
minBackoff: TimeSpan.FromMilliseconds(50),
|
||||||
|
maxBackoff: TimeSpan.FromMilliseconds(200)));
|
||||||
|
|
||||||
|
rec.Tell(new VirtualTagActor.DependencyValueChanged("Pump1.Temp", 13.0, DateTime.UtcNow));
|
||||||
|
|
||||||
|
// The first drain returns false (entry retained); after the backoff the retry drain succeeds
|
||||||
|
// and acks, truncating the outbox to 0.
|
||||||
|
await AwaitAssertAsync(async () =>
|
||||||
|
Assert.Equal(0, await outbox.CountAsync(default)), TimeSpan.FromSeconds(5));
|
||||||
|
|
||||||
|
Assert.True(writer.CallCount >= 2, "the writer must have been called at least twice (a retry happened)");
|
||||||
}
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
@@ -99,9 +131,27 @@ public sealed class ContinuousHistorizationRecorderTests : TestKit
|
|||||||
{
|
{
|
||||||
private readonly Lock _gate = new();
|
private readonly Lock _gate = new();
|
||||||
private readonly List<WrittenValue> _written = new();
|
private readonly List<WrittenValue> _written = new();
|
||||||
|
private int _calls;
|
||||||
|
|
||||||
public bool Succeed { get; init; } = true;
|
public bool Succeed { get; init; } = true;
|
||||||
|
|
||||||
|
/// <summary>When > 0, the first N calls fail (return false) regardless of <see cref="Succeed"/>;
|
||||||
|
/// later calls use <see cref="Succeed"/>. Lets a test prove the retry → success → ack path while
|
||||||
|
/// leaving the plain <see cref="Succeed"/> behaviour (FailFirstN==0) untouched.</summary>
|
||||||
|
public int FailFirstN { get; init; }
|
||||||
|
|
||||||
|
/// <summary>Lifetime count of <see cref="WriteLiveValuesAsync"/> invocations (proves a retry ran).</summary>
|
||||||
|
public int CallCount
|
||||||
|
{
|
||||||
|
get
|
||||||
|
{
|
||||||
|
lock (_gate)
|
||||||
|
{
|
||||||
|
return _calls;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public IReadOnlyList<WrittenValue> Snapshot()
|
public IReadOnlyList<WrittenValue> Snapshot()
|
||||||
{
|
{
|
||||||
lock (_gate)
|
lock (_gate)
|
||||||
@@ -115,13 +165,16 @@ public sealed class ContinuousHistorizationRecorderTests : TestKit
|
|||||||
{
|
{
|
||||||
lock (_gate)
|
lock (_gate)
|
||||||
{
|
{
|
||||||
|
_calls++;
|
||||||
foreach (HistorizationValue v in values)
|
foreach (HistorizationValue v in values)
|
||||||
{
|
{
|
||||||
_written.Add(new WrittenValue(tag, v.Value, v.Quality, v.TimestampUtc));
|
_written.Add(new WrittenValue(tag, v.Value, v.Quality, v.TimestampUtc));
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
return Task.FromResult(Succeed);
|
// Fail the first FailFirstN calls; thereafter honour Succeed.
|
||||||
|
bool result = _calls > FailFirstN && Succeed;
|
||||||
|
return Task.FromResult(result);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user