feat(historian-gateway): wire ContinuousHistorizationRecorder into DI + hosted lifecycle + meters

Bind ContinuousHistorizationOptions (Enabled/OutboxPath/CommitMode/
CommitIntervalMs/DrainBatchSize/DrainIntervalSeconds/Capacity/backoff) with a
warn-only Validate(); gated on Enabled AND the ServerHistorian gateway being
configured, the Host registers the durable FasterLogHistorizationOutbox (container
-disposed) + a gateway-backed GatewayHistorianValueWriter, and binds outbox
depth/dropped observable gauges on the central scraped meter. WithOtOpcUaRuntimeActors
spawns the recorder (over the same dependency-mux ref) when the options + writer +
outbox resolve, registering ContinuousHistorizationRecorderKey. Spawned with an EMPTY
historized-ref set: the deployed address space builds later, so ref population is a
documented follow-on (a later SetHistorizedRefs feed) — T18 wires the actor + outbox
+ writer + meters; the ref feed is the known remaining gap.

Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii
This commit is contained in:
Joseph Doherty
2026-06-26 18:47:20 -04:00
parent 97528c500f
commit 2a5c717755
6 changed files with 384 additions and 0 deletions
@@ -0,0 +1,59 @@
using ZB.MOM.WW.OtOpcUa.Commons.Observability;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions.Historian;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Historian;
/// <summary>
/// Observable-gauge instruments for the continuous-historization durable outbox, hung off the
/// central <see cref="OtOpcUaTelemetry.Meter"/> (the same meter the Host's OpenTelemetry /
/// Prometheus binding already scrapes), so no extra meter allowlist entry is needed.
/// <para>
/// The gauges read the bound outbox directly rather than Ask-ing the recorder actor: an
/// <c>Ask</c> inside a synchronous observable-gauge callback would block the metrics-collection
/// thread on the actor mailbox. The outbox exposes both gauge sources cheaply and
/// non-blockingly — <see cref="IHistorizationOutbox.CountAsync"/> completes synchronously (it
/// just reads an in-memory FIFO count) and <see cref="IHistorizationOutbox.DroppedCount"/> is a
/// plain property. The recorder's other counters (TotalRecorded / DroppedNonNumeric /
/// OutboxAppendFailures) remain available via its <c>GetStatus</c> Ask for a health hook, but are
/// not surfaced as gauges here (Ask-in-gauge is the awkward path the plan calls out).
/// </para>
/// </summary>
public static class ContinuousHistorizationMetrics
{
private static volatile IHistorizationOutbox? _outbox;
static ContinuousHistorizationMetrics()
{
// Registered once on first touch (i.e. when BindOutbox is first called at host start). Instruments
// are no-op until a listener attaches, so an unbound process pays nothing; the callbacks are
// null-safe and return 0 until BindOutbox supplies the live outbox.
OtOpcUaTelemetry.Meter.CreateObservableGauge(
"otopcua.historization.outbox.depth",
ObserveDepth,
unit: "{entry}",
description: "Un-acked entries currently held in the continuous-historization durable outbox.");
OtOpcUaTelemetry.Meter.CreateObservableGauge(
"otopcua.historization.outbox.dropped",
ObserveDropped,
unit: "{entry}",
description: "Lifetime entries the continuous-historization outbox dropped on capacity overflow.");
}
/// <summary>Binds the process outbox the gauges observe. Called once by the Host when continuous
/// historization is enabled; subsequent calls re-point the gauges (idempotent in practice).</summary>
/// <param name="outbox">The durable outbox the recorder drains.</param>
public static void BindOutbox(IHistorizationOutbox outbox)
=> _outbox = outbox ?? throw new ArgumentNullException(nameof(outbox));
private static long ObserveDepth()
{
IHistorizationOutbox? outbox = _outbox;
if (outbox is null) return 0L;
// CountAsync over the FasterLog outbox completes synchronously (in-memory FIFO count); read the
// already-completed result without blocking. A (theoretical) pending result reports 0 this scrape.
ValueTask<int> pending = outbox.CountAsync(CancellationToken.None);
return pending.IsCompletedSuccessfully ? pending.Result : 0L;
}
private static long ObserveDropped() => _outbox?.DroppedCount ?? 0L;
}
@@ -0,0 +1,83 @@
using System.Collections.Generic;
namespace ZB.MOM.WW.OtOpcUa.Runtime.Historian;
/// <summary>
/// Binds the <c>ContinuousHistorization</c> configuration section that gates the continuous
/// historization of driver (non-Galaxy) tag values. When <see cref="Enabled"/> is <c>true</c>
/// <em>and</em> the <c>ServerHistorian</c> gateway is configured, the Host builds a durable,
/// crash-safe <c>FasterLogHistorizationOutbox</c> + a gateway-backed <c>IHistorianValueWriter</c>
/// and <c>WithOtOpcUaRuntimeActors</c> spawns the <see cref="ContinuousHistorizationRecorder"/>;
/// otherwise no recorder is spawned and driver tag values are not historized.
/// <para>
/// The recorder taps the per-node dependency-mux value fan-out, appends each numeric value to
/// the outbox (the crash boundary), and drains the outbox to the historian's SQL live-value
/// write path (<c>WriteLiveValues</c>) through the single <c>ServerHistorian</c> gateway. The
/// gateway connection (endpoint / key / TLS) is sourced from <c>ServerHistorianOptions</c> — this
/// section carries only the recorder + outbox knobs, never a gateway address or credential.
/// </para>
/// </summary>
public sealed class ContinuousHistorizationOptions
{
/// <summary>The configuration section name this options class binds.</summary>
public const string SectionName = "ContinuousHistorization";
/// <summary>
/// When <c>true</c> (and the <c>ServerHistorian</c> gateway is configured), the
/// continuous-historization recorder + its durable outbox are wired and spawned; when
/// <c>false</c> (the default) no recorder is spawned and driver tag values are not historized.
/// </summary>
public bool Enabled { get; init; }
/// <summary>
/// Directory holding the FasterLog outbox segment + commit files. Required when
/// <see cref="Enabled"/> is <c>true</c>. In production set an <b>absolute</b> path on durable
/// storage — a relative path resolves against the host's working directory, which may change
/// across deployments.
/// </summary>
public string OutboxPath { get; init; } = "";
/// <summary>
/// Outbox commit cadence: <c>PerEntry</c> (the default) fsyncs the log before each append
/// returns (safest, no loss window); <c>Periodic</c> batches commits onto a background timer
/// every <see cref="CommitIntervalMs"/> ms (higher throughput, a bounded worst-case loss window).
/// Parsed case-insensitively against <c>HistorizationCommitMode</c>; an unrecognized value falls
/// back to <c>PerEntry</c>.
/// </summary>
public string CommitMode { get; init; } = "PerEntry";
/// <summary>Periodic-mode commit cadence in milliseconds; must be positive when
/// <see cref="CommitMode"/> is <c>Periodic</c>. Ignored under <c>PerEntry</c>.</summary>
public int CommitIntervalMs { get; init; } = 100;
/// <summary>Maximum outbox entries peeked + written per drain pass; clamped to a positive value by
/// the recorder (a non-positive value falls back to 64).</summary>
public int DrainBatchSize { get; init; } = 64;
/// <summary>Steady drain cadence in seconds (also the post-success reschedule). Defaults to 2.</summary>
public double DrainIntervalSeconds { get; init; } = 2;
/// <summary>Maximum un-acked outbox entries before the drop-oldest capacity policy kicks in;
/// <c>0</c> (the default) means unbounded.</summary>
public int Capacity { get; init; }
/// <summary>Initial retry backoff (seconds) after a failed drain pass. Defaults to 1.</summary>
public double MinBackoffSeconds { get; init; } = 1;
/// <summary>Cap (seconds) on the exponential retry backoff after repeated drain failures. Defaults to 30.</summary>
public double MaxBackoffSeconds { get; init; } = 30;
/// <summary>Returns operator-facing misconfiguration warnings for an <c>Enabled</c> recorder
/// (empty when disabled or correctly configured). Pure — the registration logs each entry.</summary>
/// <returns>Zero or more human-readable warning messages (never carrying secret values).</returns>
public IReadOnlyList<string> Validate()
{
var warnings = new List<string>();
if (!Enabled) return warnings;
if (string.IsNullOrWhiteSpace(OutboxPath))
warnings.Add("ContinuousHistorization:OutboxPath is empty while historization is enabled — the durable outbox has no directory to persist to; the recorder cannot be wired.");
if (string.Equals(CommitMode, "Periodic", StringComparison.OrdinalIgnoreCase) && CommitIntervalMs <= 0)
warnings.Add($"ContinuousHistorization:CommitIntervalMs is {CommitIntervalMs} — must be > 0 in Periodic commit mode; the periodic-commit loop cannot run.");
return warnings;
}
}