using ZB.MOM.WW.OtOpcUa.Commons.Observability; using ZB.MOM.WW.OtOpcUa.Core.Abstractions.Historian; namespace ZB.MOM.WW.OtOpcUa.Runtime.Historian; /// /// Observable-gauge instruments for the continuous-historization durable outbox, hung off the /// central (the same meter the Host's OpenTelemetry / /// Prometheus binding already scrapes), so no extra meter allowlist entry is needed. /// /// The gauges read the bound outbox directly rather than Ask-ing the recorder actor: an /// Ask inside a synchronous observable-gauge callback would block the metrics-collection /// thread on the actor mailbox. The outbox exposes both gauge sources cheaply and /// non-blockingly — completes synchronously (it /// just reads an in-memory FIFO count) and is a /// plain property. The recorder's other counters (TotalRecorded / DroppedNonNumeric / /// OutboxAppendFailures) remain available via its GetStatus Ask for a health hook, but are /// not surfaced as gauges here (Ask-in-gauge is the awkward path the plan calls out). /// /// public static class ContinuousHistorizationMetrics { private static volatile IHistorizationOutbox? _outbox; static ContinuousHistorizationMetrics() { // Registered once on first touch (i.e. when BindOutbox is first called at host start). Instruments // are no-op until a listener attaches, so an unbound process pays nothing; the callbacks are // null-safe and return 0 until BindOutbox supplies the live outbox. OtOpcUaTelemetry.Meter.CreateObservableGauge( "otopcua.historization.outbox.depth", ObserveDepth, unit: "{entry}", description: "Un-acked entries currently held in the continuous-historization durable outbox."); OtOpcUaTelemetry.Meter.CreateObservableGauge( "otopcua.historization.outbox.dropped", ObserveDropped, unit: "{entry}", description: "Lifetime entries the continuous-historization outbox dropped on capacity overflow."); } /// Binds the process outbox the gauges observe. Called once by the Host when continuous /// historization is enabled; subsequent calls re-point the gauges (idempotent in practice). /// The durable outbox the recorder drains. public static void BindOutbox(IHistorizationOutbox outbox) => _outbox = outbox ?? throw new ArgumentNullException(nameof(outbox)); private static long ObserveDepth() { IHistorizationOutbox? outbox = _outbox; if (outbox is null) return 0L; // CountAsync over the FasterLog outbox completes synchronously (in-memory FIFO count); read the // already-completed result without blocking. A (theoretical) pending result reports 0 this scrape. ValueTask pending = outbox.CountAsync(CancellationToken.None); return pending.IsCompletedSuccessfully ? pending.Result : 0L; } private static long ObserveDropped() => _outbox?.DroppedCount ?? 0L; }