feat(historian-gateway): ContinuousHistorizationRecorder actor (outbox->WriteLiveValues, backoff)

Continuous-historization engine for non-Galaxy driver tags. Registers
interest with the per-node DependencyMuxActor for the historized refs and
taps the VirtualTagActor.DependencyValueChanged values the mux fans:
coerce to numeric -> append to the durable IHistorizationOutbox (crash
boundary) -> off-thread drain writes batches through IHistorianValueWriter
and acks (FIFO-truncates) on success, backing off (exponential, capped) on
failure. Non-numeric values are dropped + metered (SQL analog path is
numeric-only).

- New seam IHistorianValueWriter + HistorizationValue in Core.Abstractions
  so Runtime stays free of the gRPC driver.
- GatewayHistorianValueWriter (driver) adapts IHistorianGatewayClient.
  WriteLiveValues: HistorizationValue -> HistorianLiveValue proto, WriteAck
  Success||Queued -> true; non-throwing (errors -> false for retry).
- Drain runs via PipeTo(Self) so the mailbox never blocks on the gateway
  write; appends awaited on the actor thread to stay serialized.

Adaptation vs plan: the mux fans DependencyValueChanged (TagId/Value/
TimestampUtc, no quality), not DriverInstanceActor.AttributeValuePublished,
so values are recorded Good-quality (192) by the same convention the
scripted-alarm host uses.

Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii
This commit is contained in:
Joseph Doherty
2026-06-26 18:18:34 -04:00
parent 8b4028de84
commit bbfbc7b215
4 changed files with 681 additions and 0 deletions
@@ -0,0 +1,35 @@
namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions.Historian;
/// <summary>
/// One numeric sample the continuous-historization recorder drains to the historian's SQL
/// live-value write path. Carries the minimal payload that path can ingest: an optional UTC
/// timestamp, the coerced numeric value, and an OPC-DA quality byte.
/// </summary>
/// <param name="TimestampUtc">
/// UTC source timestamp of the sample, or <c>null</c> to defer to the historian's server-stamped
/// current-time path (the SQL writer uses <c>SYSDATETIME()</c> when the timestamp is absent).
/// </param>
/// <param name="Value">The coerced numeric value (the SQL analog write path is numeric-only).</param>
/// <param name="Quality">OPC-DA-derived quality code carried to the historian (192 = Good).</param>
public readonly record struct HistorizationValue(DateTime? TimestampUtc, double Value, ushort Quality);
/// <summary>
/// Seam over the historian's live-value write path used by the continuous-historization recorder.
/// Lives in the abstraction layer so the Runtime recorder depends on it without taking a hard
/// reference on the gRPC gateway driver; the gateway driver supplies the concrete adapter
/// (<c>GatewayHistorianValueWriter</c>).
/// </summary>
public interface IHistorianValueWriter
{
/// <summary>
/// Writes a batch of live values for a single tag through the historian's SQL live-write path.
/// Implementations are expected to be non-throwing: a transport/gateway error is surfaced as a
/// <c>false</c> result so the recorder retains the entries and retries, rather than as an
/// exception.
/// </summary>
/// <param name="tag">Fully-qualified historian tag the values are recorded against.</param>
/// <param name="values">The numeric samples to write, in append order.</param>
/// <param name="ct">Cancellation token.</param>
/// <returns><c>true</c> on a successful (or durably-queued) gateway ack; <c>false</c> on a retryable failure.</returns>
Task<bool> WriteLiveValuesAsync(string tag, IReadOnlyList<HistorizationValue> values, CancellationToken ct);
}
@@ -0,0 +1,89 @@
using Google.Protobuf.WellKnownTypes;
using Microsoft.Extensions.Logging;
using ZB.MOM.WW.HistorianGateway.Contracts.Grpc;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions.Historian;
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Recorder;
/// <summary>
/// Adapts the gateway client's <c>WriteLiveValues</c> RPC to the Runtime recorder's
/// <see cref="IHistorianValueWriter"/> seam. Maps each <see cref="HistorizationValue"/> onto a
/// proto <see cref="HistorianLiveValue"/> (numeric value + quality, with an optional timestamp —
/// a null timestamp leaves the proto field unset so the gateway's SQL writer server-stamps the
/// current time) and folds the returned <see cref="WriteAck"/> to a single retry/ack boolean.
/// </summary>
/// <remarks>
/// <para>
/// <b>Non-throwing by contract.</b> The recorder's drain loop stays simple by treating the
/// writer as never throwing: any gateway/transport error (and a non-success, non-queued ack)
/// is mapped to <c>false</c> so the recorder retains the outbox entries and retries. Only the
/// failure category (the exception type name) is logged — never tag values, hostnames, or
/// credentials.
/// </para>
/// <para>
/// A success ack OR a store-forward-queued ack maps to <c>true</c>: a value the gateway
/// durably queued must not be re-drained.
/// </para>
/// </remarks>
public sealed class GatewayHistorianValueWriter : IHistorianValueWriter
{
private readonly IHistorianGatewayClient _client;
private readonly ILogger<GatewayHistorianValueWriter> _logger;
/// <summary>Creates the writer over a gateway client seam.</summary>
/// <param name="client">The gateway client used for the <c>WriteLiveValues</c> write path.</param>
/// <param name="logger">Logger for failure-category diagnostics (never logs value content).</param>
public GatewayHistorianValueWriter(IHistorianGatewayClient client, ILogger<GatewayHistorianValueWriter> logger)
{
_client = client ?? throw new ArgumentNullException(nameof(client));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <inheritdoc />
public async Task<bool> WriteLiveValuesAsync(
string tag, IReadOnlyList<HistorizationValue> values, CancellationToken ct)
{
ArgumentException.ThrowIfNullOrEmpty(tag);
ArgumentNullException.ThrowIfNull(values);
if (values.Count == 0)
{
// Nothing to write is a trivially-successful ack — the recorder treats it as drained.
return true;
}
try
{
var liveValues = new List<HistorianLiveValue>(values.Count);
foreach (HistorizationValue value in values)
{
var live = new HistorianLiveValue
{
NumericValue = value.Value,
Quality = value.Quality,
};
if (value.TimestampUtc is { } timestampUtc)
{
// Timestamp.FromDateTime requires Utc kind; coerce defensively. A null timestamp
// leaves the proto field unset -> the gateway's SQL writer server-stamps now.
live.Timestamp = Timestamp.FromDateTime(DateTime.SpecifyKind(timestampUtc, DateTimeKind.Utc));
}
liveValues.Add(live);
}
WriteAck ack = await _client.WriteLiveValuesAsync(tag, liveValues, ct).ConfigureAwait(false);
return ack.Success || ack.Queued;
}
catch (Exception exception)
{
// NEVER throw out of the writer — the recorder's drain expects a bool so its retain/retry
// logic stays simple. Log only the failure category (no value content, hostnames, or creds).
_logger.LogDebug(
"WriteLiveValues failed ({Exception}); recorder will retain and retry.",
exception.GetType().Name);
return false;
}
}
}
@@ -0,0 +1,353 @@
using Akka.Actor;
using Akka.Event;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions.Historian;
using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Historian;
/// <summary>
/// Continuous-historization engine for non-Galaxy (driver) tags. Registers interest with the
/// per-node <see cref="DependencyMuxActor"/> for the configured historized tag refs, then taps the
/// <see cref="VirtualTagActor.DependencyValueChanged"/> values the mux fans for those refs:
/// value → coerce to numeric → append to the durable <see cref="IHistorizationOutbox"/> (the crash
/// boundary) → a self-paced drain writes the outbox through <see cref="IHistorianValueWriter"/>'s
/// live-value path and acks (truncates) on success.
/// </summary>
/// <remarks>
/// <para>
/// <b>Message path (grounded against the real actors).</b> The mux fans
/// <see cref="VirtualTagActor.DependencyValueChanged"/> — which carries only
/// <c>(TagId, Value, TimestampUtc)</c> and <em>no quality</em> — to every registered subscriber.
/// The recorder reuses that existing fan-out (no change to <c>DriverHostActor</c>), so values
/// arrive Good-quality by the same convention the scripted-alarm host uses (the mux only
/// forwards driver-published values); historized samples are recorded with
/// <see cref="GoodQuality"/>.
/// </para>
/// <para>
/// <b>Numeric-only gate.</b> The SQL analog live-value write path is numeric-only, so a
/// non-coercible value (string/null/reference) is dropped and metered rather than appended.
/// </para>
/// <para>
/// <b>Non-blocking drain.</b> Appends run on the actor thread (an awaited
/// <see cref="IHistorizationOutbox.AppendAsync"/> so appends stay serialized — the durable
/// boundary completes before the next message), but the drain runs off the actor thread and
/// pipes its outcome back to <c>Self</c> (<see cref="DrainResult"/>) so the mailbox is never
/// blocked on the gateway write. A drain never throws into the actor: the drain task catches
/// everything and reports a failed <see cref="DrainResult"/>, which backs the next attempt off
/// (exponential, capped).
/// </para>
/// </remarks>
public sealed class ContinuousHistorizationRecorder : ReceiveActor, IWithTimers
{
/// <summary>OPC-DA quality byte recorded for mux-fanned values (the mux drops quality; driver-published
/// values are Good by the same convention the scripted-alarm host applies).</summary>
public const ushort GoodQuality = 192;
private const string DrainTimerKey = "drain";
/// <summary>Self-tick that triggers a drain attempt (from the timer or a fresh append).</summary>
private sealed class DrainTick
{
public static readonly DrainTick Instance = new();
private DrainTick() { }
}
/// <summary>Outcome of one off-thread drain pass, piped back to the actor thread.</summary>
private sealed record DrainResult(bool Success, int Acked, int Attempted);
/// <summary>Ask the recorder for its current counters (used by the meter/health wiring).</summary>
public sealed class GetStatus
{
/// <summary>The singleton request instance.</summary>
public static readonly GetStatus Instance = new();
private GetStatus() { }
}
/// <summary>A point-in-time snapshot of the recorder's counters.</summary>
/// <param name="QueuedDepth">Un-acked entries currently held in the durable outbox.</param>
/// <param name="TotalRecorded">Lifetime count of values appended to the outbox.</param>
/// <param name="DroppedNonNumeric">Lifetime count of values dropped for not being numeric-coercible.</param>
/// <param name="OutboxDropped">Lifetime count of entries the outbox dropped on capacity overflow.</param>
/// <param name="LastDrainSucceeded">Whether the most recent drain pass acked cleanly.</param>
public sealed record RecorderStatus(
int QueuedDepth,
long TotalRecorded,
long DroppedNonNumeric,
long OutboxDropped,
bool LastDrainSucceeded);
private readonly IActorRef _dependencyMux;
private readonly IHistorianValueWriter _writer;
private readonly IHistorizationOutbox _outbox;
private readonly IReadOnlyList<string> _historizedRefs;
private readonly HashSet<string> _historizedSet;
private readonly int _drainBatchSize;
private readonly TimeSpan _drainInterval;
private readonly TimeSpan _minBackoff;
private readonly TimeSpan _maxBackoff;
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly CancellationTokenSource _lifetime = new();
private bool _draining;
private TimeSpan _currentBackoff;
private DateTime _nextAllowedDrainUtc = DateTime.MinValue;
private long _totalRecorded;
private long _droppedNonNumeric;
private bool _lastDrainSucceeded = true;
/// <summary>Gets or sets the timer scheduler (set by Akka via <see cref="IWithTimers"/>).</summary>
public ITimerScheduler Timers { get; set; } = null!;
/// <summary>Builds props for the recorder.</summary>
/// <param name="dependencyMux">The per-node dependency mux to register historized-ref interest with.</param>
/// <param name="writer">The live-value write seam the drain pushes batches through.</param>
/// <param name="outbox">The durable FIFO outbox values are appended to before they are written.</param>
/// <param name="historizedRefs">The fully-qualified tag refs to historize.</param>
/// <param name="drainBatchSize">Max entries peeked + written per drain pass (must be positive).</param>
/// <param name="drainInterval">Steady drain cadence; also the post-success reschedule.</param>
/// <param name="minBackoff">Initial retry backoff after a failed drain.</param>
/// <param name="maxBackoff">Cap on the exponential retry backoff.</param>
/// <returns>Props for the actor.</returns>
public static Props Props(
IActorRef dependencyMux,
IHistorianValueWriter writer,
IHistorizationOutbox outbox,
IReadOnlyList<string> historizedRefs,
int drainBatchSize = 64,
TimeSpan? drainInterval = null,
TimeSpan? minBackoff = null,
TimeSpan? maxBackoff = null) =>
Akka.Actor.Props.Create(() => new ContinuousHistorizationRecorder(
dependencyMux, writer, outbox, historizedRefs, drainBatchSize, drainInterval, minBackoff, maxBackoff));
/// <summary>Initializes a new instance of the <see cref="ContinuousHistorizationRecorder"/> class.</summary>
/// <param name="dependencyMux">The per-node dependency mux to register historized-ref interest with.</param>
/// <param name="writer">The live-value write seam the drain pushes batches through.</param>
/// <param name="outbox">The durable FIFO outbox values are appended to before they are written.</param>
/// <param name="historizedRefs">The fully-qualified tag refs to historize.</param>
/// <param name="drainBatchSize">Max entries peeked + written per drain pass (must be positive).</param>
/// <param name="drainInterval">Steady drain cadence; also the post-success reschedule.</param>
/// <param name="minBackoff">Initial retry backoff after a failed drain.</param>
/// <param name="maxBackoff">Cap on the exponential retry backoff.</param>
public ContinuousHistorizationRecorder(
IActorRef dependencyMux,
IHistorianValueWriter writer,
IHistorizationOutbox outbox,
IReadOnlyList<string> historizedRefs,
int drainBatchSize = 64,
TimeSpan? drainInterval = null,
TimeSpan? minBackoff = null,
TimeSpan? maxBackoff = null)
{
_dependencyMux = dependencyMux ?? throw new ArgumentNullException(nameof(dependencyMux));
_writer = writer ?? throw new ArgumentNullException(nameof(writer));
_outbox = outbox ?? throw new ArgumentNullException(nameof(outbox));
_historizedRefs = historizedRefs ?? throw new ArgumentNullException(nameof(historizedRefs));
_historizedSet = new HashSet<string>(_historizedRefs, StringComparer.Ordinal);
_drainBatchSize = drainBatchSize > 0 ? drainBatchSize : 64;
_drainInterval = drainInterval is { } di && di > TimeSpan.Zero ? di : TimeSpan.FromSeconds(2);
_minBackoff = minBackoff is { } mb && mb > TimeSpan.Zero ? mb : TimeSpan.FromSeconds(1);
_maxBackoff = maxBackoff is { } xb && xb > TimeSpan.Zero ? xb : TimeSpan.FromSeconds(30);
_currentBackoff = _minBackoff;
ReceiveAsync<VirtualTagActor.DependencyValueChanged>(OnValueChangedAsync);
Receive<DrainTick>(_ => OnDrainTick());
Receive<DrainResult>(OnDrainResult);
ReceiveAsync<GetStatus>(async _ => Sender.Tell(await BuildStatusAsync().ConfigureAwait(false)));
}
/// <inheritdoc />
protected override void PreStart()
{
// Register interest for the historized refs so the mux fans their DependencyValueChanged to us.
_dependencyMux.Tell(new DependencyMuxActor.RegisterInterest(_historizedRefs, Self));
// Seed the steady drain cadence; appends also nudge a prompt drain (see OnValueChangedAsync).
Timers.StartSingleTimer(DrainTimerKey, DrainTick.Instance, _drainInterval);
base.PreStart();
}
/// <inheritdoc />
protected override void PostStop()
{
_lifetime.Cancel();
_lifetime.Dispose();
base.PostStop();
}
private async Task OnValueChangedAsync(VirtualTagActor.DependencyValueChanged msg)
{
// Defensive: only historize refs we registered interest for (the mux already scopes to these).
if (!_historizedSet.Contains(msg.TagId))
{
return;
}
if (!TryCoerceToDouble(msg.Value, out double numeric))
{
// The SQL analog live-value path is numeric-only — drop non-coercible values (string/null/
// reference) and meter the drop. No value content is logged.
_droppedNonNumeric++;
_log.Debug("ContinuousHistorization: dropped non-numeric value for a historized ref.");
return;
}
var entry = new HistorizationOutboxEntry(
Guid.NewGuid(),
msg.TagId,
numeric,
GoodQuality,
DateTime.SpecifyKind(msg.TimestampUtc, DateTimeKind.Utc));
// Durable boundary: append (awaited so appends stay serialized) BEFORE the value is considered
// captured. The outbox drops the oldest entry on capacity overflow and tracks DroppedCount.
await _outbox.AppendAsync(entry, _lifetime.Token).ConfigureAwait(false);
_totalRecorded++;
// Nudge a prompt drain attempt; the DrainTick handler de-dups (already draining) and honours
// any active backoff window, so this never bypasses a failure cooldown.
Self.Tell(DrainTick.Instance);
}
private void OnDrainTick()
{
if (_draining)
{
// A drain is already in flight; it reschedules itself on completion.
return;
}
if (DateTime.UtcNow < _nextAllowedDrainUtc)
{
// Inside a post-failure backoff cooldown — the scheduled timer will fire the retry.
return;
}
_draining = true;
DrainOnceAsync(_lifetime.Token)
.PipeTo(Self, success: result => result, failure: ToFailedDrain);
}
// Defensive failure mapper: DrainOnceAsync catches its own exceptions, so this only fires if the
// PipeTo plumbing itself faults. Map to a failed drain so the actor backs off rather than dies.
private static object ToFailedDrain(Exception _) => new DrainResult(false, 0, 0);
private async Task<DrainResult> DrainOnceAsync(CancellationToken ct)
{
try
{
IReadOnlyList<HistorizationOutboxEntry> batch = await _outbox.PeekBatchAsync(_drainBatchSize, ct)
.ConfigureAwait(false);
if (batch.Count == 0)
{
return new DrainResult(true, 0, 0);
}
// Write per-tag batches. The outbox's RemoveAsync truncates the FIFO head THROUGH the acked
// id, so partial per-tag acking could drop un-written older entries; ack all-or-nothing —
// only when every tag's batch wrote do we truncate the whole peeked prefix (one commit).
var allOk = true;
foreach (IGrouping<string, HistorizationOutboxEntry> group in
batch.GroupBy(e => e.Tag, StringComparer.Ordinal))
{
var values = group
.Select(e => new HistorizationValue(e.TimestampUtc, e.NumericValue, e.Quality))
.ToList();
bool ok;
try
{
ok = await _writer.WriteLiveValuesAsync(group.Key, values, ct).ConfigureAwait(false);
}
catch
{
// The writer is non-throwing by contract; guard defensively so one tag's fault
// cannot fail the whole drain task.
ok = false;
}
if (!ok)
{
allOk = false;
}
}
if (!allOk)
{
// Leave the entire peeked prefix queued for the next (backed-off) attempt.
return new DrainResult(false, 0, batch.Count);
}
// Ack the whole prefix by truncating through the last entry (FIFO head advance + commit).
await _outbox.RemoveAsync(batch[^1].Id, ct).ConfigureAwait(false);
return new DrainResult(true, batch.Count, batch.Count);
}
catch (OperationCanceledException) when (ct.IsCancellationRequested)
{
// Shutdown mid-drain: not a fault. Entries stay durable and drain next startup.
return new DrainResult(true, 0, 0);
}
catch (Exception exception)
{
// A drain exception must NEVER kill the actor. Log the category only (no value content) and
// report a failed pass so the next attempt backs off.
_log.Warning("ContinuousHistorization: drain pass failed ({Exception}); will retry.",
exception.GetType().Name);
return new DrainResult(false, 0, 0);
}
}
private void OnDrainResult(DrainResult result)
{
_draining = false;
_lastDrainSucceeded = result.Success;
if (result.Success)
{
// Healthy: reset backoff, clear the cooldown, and keep a steady cadence to catch stragglers
// appended during this pass.
_currentBackoff = _minBackoff;
_nextAllowedDrainUtc = DateTime.MinValue;
Timers.StartSingleTimer(DrainTimerKey, DrainTick.Instance, _drainInterval);
return;
}
// Failed: schedule the retry after the current backoff, then grow it (capped) for the next.
TimeSpan delay = _currentBackoff;
_nextAllowedDrainUtc = DateTime.UtcNow + delay;
_currentBackoff = Min(TimeSpan.FromTicks(_currentBackoff.Ticks * 2), _maxBackoff);
Timers.StartSingleTimer(DrainTimerKey, DrainTick.Instance, delay);
}
private async Task<RecorderStatus> BuildStatusAsync()
{
int depth = await _outbox.CountAsync(_lifetime.Token).ConfigureAwait(false);
return new RecorderStatus(
depth,
_totalRecorded,
_droppedNonNumeric,
_outbox.DroppedCount,
_lastDrainSucceeded);
}
private static TimeSpan Min(TimeSpan a, TimeSpan b) => a < b ? a : b;
private static bool TryCoerceToDouble(object? value, out double result)
{
switch (value)
{
case double d: result = d; return true;
case float f: result = f; return true;
case int i: result = i; return true;
case long l: result = l; return true;
case short s: result = s; return true;
case ushort us: result = us; return true;
case uint ui: result = ui; return true;
case ulong ul: result = ul; return true;
case byte b: result = b; return true;
case sbyte sb: result = sb; return true;
case decimal dec: result = (double)dec; return true;
case bool bo: result = bo ? 1d : 0d; return true; // Boolean historizes as Int1 (1/0).
default: result = 0d; return false; // string / null / DateTime / reference -> dropped.
}
}
}
@@ -0,0 +1,204 @@
using Akka.Actor;
using Akka.TestKit.Xunit2;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions.Historian;
using ZB.MOM.WW.OtOpcUa.Runtime.Historian;
using ZB.MOM.WW.OtOpcUa.Runtime.VirtualTags;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Tests.Historian;
/// <summary>
/// Verifies the <see cref="ContinuousHistorizationRecorder"/>: it registers historized-ref
/// interest with the dependency mux on start, appends mux-fanned
/// <see cref="VirtualTagActor.DependencyValueChanged"/> values to the durable outbox then drains
/// them to the live-value writer, retains entries when the writer fails, and drops non-numeric
/// values (the SQL analog write path is numeric-only).
/// </summary>
/// <remarks>
/// Adapted from the plan's Task 17: the recorder handles the REAL fan-out message the mux emits —
/// <see cref="VirtualTagActor.DependencyValueChanged"/> (TagId/Value/TimestampUtc, no quality) —
/// not <c>DriverInstanceActor.AttributeValuePublished</c>. The mux drops quality, so the recorder
/// records Good-quality (the same convention the scripted-alarm host uses for mux values).
/// </remarks>
public sealed class ContinuousHistorizationRecorderTests : TestKit
{
[Fact]
public void Registers_interest_for_historized_refs_on_start()
{
var mux = CreateTestProbe();
var writer = new FakeValueWriter();
var outbox = new InMemoryOutbox();
Sys.ActorOf(ContinuousHistorizationRecorder.Props(
mux.Ref, writer, outbox, historizedRefs: new[] { "Pump1.Temp" }));
var reg = mux.ExpectMsg<DependencyMuxActor.RegisterInterest>();
Assert.Contains("Pump1.Temp", reg.TagRefs);
}
[Fact]
public async Task DependencyValueChanged_appends_to_outbox_then_drains_to_writer()
{
var mux = CreateTestProbe();
var writer = new FakeValueWriter { Succeed = true };
var outbox = new InMemoryOutbox();
var rec = Sys.ActorOf(ContinuousHistorizationRecorder.Props(
mux.Ref, writer, outbox, new[] { "Pump1.Temp" }));
rec.Tell(new VirtualTagActor.DependencyValueChanged("Pump1.Temp", 42.0, DateTime.UtcNow));
await AwaitAssertAsync(() =>
Assert.Contains(writer.Snapshot(), w => w.Tag == "Pump1.Temp" && w.Value == 42.0));
await AwaitAssertAsync(async () =>
Assert.Equal(0, await outbox.CountAsync(default))); // acked -> truncated
}
[Fact]
public async Task Writer_failure_keeps_entry_for_retry()
{
var mux = CreateTestProbe();
var writer = new FakeValueWriter { Succeed = false };
var outbox = new InMemoryOutbox();
var rec = Sys.ActorOf(ContinuousHistorizationRecorder.Props(
mux.Ref, writer, outbox, new[] { "Pump1.Temp" }));
rec.Tell(new VirtualTagActor.DependencyValueChanged("Pump1.Temp", 7.0, DateTime.UtcNow));
await AwaitAssertAsync(async () =>
Assert.Equal(1, await outbox.CountAsync(default))); // not acked -> retained for retry
}
[Fact]
public async Task Non_numeric_value_is_dropped_with_metric()
{
var mux = CreateTestProbe();
var writer = new FakeValueWriter();
var outbox = new InMemoryOutbox();
var rec = Sys.ActorOf(ContinuousHistorizationRecorder.Props(
mux.Ref, writer, outbox, new[] { "Pump1.Name" }));
rec.Tell(new VirtualTagActor.DependencyValueChanged("Pump1.Name", "text", DateTime.UtcNow));
// A string value can't ride the SQL analog write path -> dropped (metered), never appended.
await AwaitAssertAsync(async () =>
{
var status = await rec.Ask<ContinuousHistorizationRecorder.RecorderStatus>(
ContinuousHistorizationRecorder.GetStatus.Instance, TimeSpan.FromSeconds(3));
Assert.Equal(1, status.DroppedNonNumeric);
Assert.Equal(0, status.QueuedDepth);
});
Assert.Empty(writer.Snapshot());
}
/// <summary>In-memory <see cref="IHistorianValueWriter"/> double: records every value written and
/// returns <see cref="Succeed"/> as the ack. Thread-safe — the recorder drains off the actor thread.</summary>
private sealed class FakeValueWriter : IHistorianValueWriter
{
private readonly Lock _gate = new();
private readonly List<WrittenValue> _written = new();
public bool Succeed { get; init; } = true;
public IReadOnlyList<WrittenValue> Snapshot()
{
lock (_gate)
{
return _written.ToArray();
}
}
public Task<bool> WriteLiveValuesAsync(
string tag, IReadOnlyList<HistorizationValue> values, CancellationToken ct)
{
lock (_gate)
{
foreach (HistorizationValue v in values)
{
_written.Add(new WrittenValue(tag, v.Value, v.Quality, v.TimestampUtc));
}
}
return Task.FromResult(Succeed);
}
}
private sealed record WrittenValue(string Tag, double Value, ushort Quality, DateTime? TimestampUtc);
/// <summary>In-memory <see cref="IHistorizationOutbox"/> double honouring the FIFO-truncate
/// <see cref="IHistorizationOutbox.RemoveAsync"/> contract (remove the id plus any older entries
/// ahead of it) and the optional drop-oldest capacity. Thread-safe.</summary>
private sealed class InMemoryOutbox : IHistorizationOutbox
{
private readonly Lock _gate = new();
private readonly List<HistorizationOutboxEntry> _entries = new();
private readonly int _capacity;
private long _dropped;
public InMemoryOutbox(int capacity = 0) => _capacity = capacity;
public long DroppedCount
{
get
{
lock (_gate)
{
return _dropped;
}
}
}
public ValueTask AppendAsync(HistorizationOutboxEntry entry, CancellationToken ct)
{
lock (_gate)
{
_entries.Add(entry);
while (_capacity > 0 && _entries.Count > _capacity)
{
_entries.RemoveAt(0); // drop oldest on overflow
_dropped++;
}
}
return ValueTask.CompletedTask;
}
public ValueTask<IReadOnlyList<HistorizationOutboxEntry>> PeekBatchAsync(int max, CancellationToken ct)
{
lock (_gate)
{
IReadOnlyList<HistorizationOutboxEntry> batch = _entries.Take(max).ToArray();
return ValueTask.FromResult(batch);
}
}
public ValueTask RemoveAsync(Guid id, CancellationToken ct)
{
lock (_gate)
{
int idx = _entries.FindIndex(e => e.Id == id);
if (idx >= 0)
{
// FIFO ack: remove the target plus everything ahead of it in the buffer.
_entries.RemoveRange(0, idx + 1);
}
}
return ValueTask.CompletedTask;
}
public ValueTask<int> CountAsync(CancellationToken ct)
{
lock (_gate)
{
return ValueTask.FromResult(_entries.Count);
}
}
public void Dispose()
{
}
}
}