using Microsoft.Extensions.Logging;
using ZB.MOM.WW.HistorianGateway.Contracts.Grpc;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Mapping;
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway;
///
/// Server-side backed by the HistorianGateway gRPC surface
/// (via the seam). Translates OPC UA HistoryRead requests
/// to gateway read calls and maps the wire shapes back to the driver-agnostic
/// / carriers using the pure
/// mappers in Mapping/.
///
///
/// The data source owns no historian connection of its own — it delegates to the gateway, which
/// pools and amortizes the underlying historian sessions. A thrown gateway exception is recorded
/// as a health failure and rethrown: the node manager turns it into a Bad HistoryRead result, so
/// a backend fault never crashes the host. An empty time window is a successful (GoodNoData)
/// read, not a fault. Health counters follow the single-_healthLock discipline ported
/// from WonderwareHistorianClient so TotalSuccesses + TotalFailures == TotalQueries
/// holds at every observed snapshot.
///
public sealed class GatewayHistorianDataSource : IHistorianDataSource, IAsyncDisposable
{
///
/// is a combinable [Flags] value: the
/// process-data connection is bit 0 (value 1), the event connection is bit 1 (value 2).
///
private const uint ProcessConnectionFlag = 1;
private const uint EventConnectionFlag = 2;
private readonly IHistorianGatewayClient _client;
private readonly ILogger _logger;
private readonly object _healthLock = new();
private DateTime? _lastSuccessUtc;
private DateTime? _lastFailureUtc;
private string? _lastError;
private long _totalQueries;
private long _totalSuccesses;
private long _totalFailures;
private int _consecutiveFailures;
private bool _processConnectionOpen;
private bool _eventConnectionOpen;
/// Creates a gateway-backed historian data source.
/// The gateway client seam used for all reads.
/// Diagnostic logger; failures are recorded without leaking tag/host detail.
public GatewayHistorianDataSource(IHistorianGatewayClient client, ILogger logger)
{
ArgumentNullException.ThrowIfNull(client);
ArgumentNullException.ThrowIfNull(logger);
_client = client;
_logger = logger;
}
///
public async Task ReadRawAsync(
string fullReference, DateTime startUtc, DateTime endUtc, uint maxValuesPerNode,
CancellationToken cancellationToken)
{
try
{
// The gateway seam caps with an int; OPC UA hands us a uint, so clamp to int range.
var maxValues = (int)Math.Min(maxValuesPerNode, int.MaxValue);
var samples = new List();
await foreach (var sample in _client
.ReadRawAsync(fullReference, startUtc, endUtc, maxValues, cancellationToken)
.ConfigureAwait(false))
{
samples.Add(sample);
}
var snapshots = SampleMapper.ToSnapshots(samples);
RecordOutcome(success: true, error: null);
return new HistoryReadResult(snapshots, ContinuationPoint: null);
}
catch (Exception ex)
{
RecordReadFailure(ex);
throw;
}
}
///
public async Task ReadProcessedAsync(
string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval,
HistoryAggregateType aggregate, CancellationToken cancellationToken)
{
try
{
// Total/Count are now native gateway retrieval modes — no client-side scaling
// (unlike the Wonderware path that derived Total as Average × interval-seconds).
var mode = AggregateModeMapper.ToRetrievalMode(aggregate);
var buckets = new List();
await foreach (var bucket in _client
.ReadAggregateAsync(fullReference, startUtc, endUtc, mode, interval, cancellationToken)
.ConfigureAwait(false))
{
buckets.Add(bucket);
}
var snapshots = SampleMapper.ToAggregateSnapshots(buckets);
RecordOutcome(success: true, error: null);
return new HistoryReadResult(snapshots, ContinuationPoint: null);
}
catch (Exception ex)
{
RecordReadFailure(ex);
throw;
}
}
///
public async Task ReadAtTimeAsync(
string fullReference, IReadOnlyList timestampsUtc, CancellationToken cancellationToken)
{
try
{
var samples = await _client
.ReadAtTimeAsync(fullReference, timestampsUtc, cancellationToken)
.ConfigureAwait(false);
var aligned = AlignAtTimeSnapshots(timestampsUtc, samples);
RecordOutcome(success: true, error: null);
return new HistoryReadResult(aligned, ContinuationPoint: null);
}
catch (Exception ex)
{
RecordReadFailure(ex);
throw;
}
}
///
///
/// Depends on the target gateway running with RuntimeDb:EventReadsEnabled=true (the
/// SQL alarm-history path). The is passed through to the
/// gateway, but its SQL ReadEvents source filter may not be present yet — so this
/// adapter also filters the mapped events by
/// client-side (defensive; remove once the server filter is confirmed). The
/// cap is enforced client-side by early stream termination:
/// a non-positive value applies no client cap (the gateway may still apply its
/// EventReadMaxRows); a positive cap stops at N and sets a non-null
/// iff at least one further matching
/// event existed (the Core.Abstractions-009 truncation signal).
///
public async Task ReadEventsAsync(
string? sourceName, DateTime startUtc, DateTime endUtc, int maxEvents,
CancellationToken cancellationToken)
{
try
{
var hasCap = maxEvents > 0;
var collected = new List(hasCap ? maxEvents : 0);
var truncated = false;
await foreach (var wireEvent in _client
.ReadEventsAsync(sourceName, startUtc, endUtc, maxEvents, cancellationToken)
.ConfigureAwait(false))
{
var mapped = EventMapper.ToHistoricalEvent(wireEvent);
// Defensive client-side source filter: the gateway's SQL ReadEvents source filter
// may not be present, so drop any event whose source does not match the request.
if (sourceName is not null && !string.Equals(mapped.SourceName, sourceName, StringComparison.Ordinal))
{
continue;
}
// One more matching event arriving once the cap is full means the result is
// truncated — stop draining and flag it (Core.Abstractions-009).
if (hasCap && collected.Count == maxEvents)
{
truncated = true;
break;
}
collected.Add(mapped);
}
RecordOutcome(success: true, error: null);
// A non-null, opaque token signals truncation to the caller (Core.Abstractions-009).
// The gateway has no resumable cursor, so the token's contents carry no paging state —
// its presence alone is the "more events exist" signal. A fresh array per call keeps it
// from being shared/mutated.
return new HistoricalEventsResult(collected, truncated ? new byte[] { 0x01 } : null);
}
catch (Exception ex)
{
RecordReadFailure(ex);
throw;
}
}
///
public HistorianHealthSnapshot GetHealthSnapshot()
{
lock (_healthLock)
{
return new HistorianHealthSnapshot(
TotalQueries: _totalQueries,
TotalSuccesses: _totalSuccesses,
TotalFailures: _totalFailures,
ConsecutiveFailures: _consecutiveFailures,
LastSuccessTime: _lastSuccessUtc,
LastFailureTime: _lastFailureUtc,
LastError: _lastError,
// Cached connection flags last observed by RefreshConnectionStateAsync. The gateway
// is non-clustered to us, so node fields are null/empty (mirrors the Wonderware
// client's Finding 010 posture).
ProcessConnectionOpen: _processConnectionOpen,
EventConnectionOpen: _eventConnectionOpen,
ActiveProcessNode: null,
ActiveEventNode: null,
Nodes: []);
}
}
///
/// Refreshes the cached process / event connection flags by querying the gateway's
/// connection status. Intended to be driven by a periodic health hosted-service, keeping
/// pure observation (it never performs I/O). The flags are
/// derived from AND the matching
/// flag bit. A failed status query is a health
/// probe — it never throws to the caller; both flags degrade to closed until the next
/// successful refresh.
///
/// A token to cancel the status query.
/// A task that completes when the cached flags have been updated.
public async Task RefreshConnectionStateAsync(CancellationToken cancellationToken)
{
bool processOpen;
bool eventOpen;
try
{
var status = await _client.GetConnectionStatusAsync(cancellationToken).ConfigureAwait(false);
var connected = status.ConnectedToServer;
processOpen = connected && (status.ConnectionKind & ProcessConnectionFlag) != 0;
eventOpen = connected && (status.ConnectionKind & EventConnectionFlag) != 0;
}
catch (Exception)
{
// A health probe must never crash the host; an unreachable gateway degrades both
// connection flags to closed until the next successful refresh.
_logger.LogDebug("Historian gateway connection-status refresh failed; treating both connections as closed.");
processOpen = false;
eventOpen = false;
}
lock (_healthLock)
{
_processConnectionOpen = processOpen;
_eventConnectionOpen = eventOpen;
}
}
///
/// Reconciles a gateway at-time reply against the requested timestamps to honour the
/// contract: exactly one snapshot per
/// requested timestamp, in request order. Returned samples are indexed by timestamp ticks;
/// any requested timestamp the gateway did not return is filled with a Bad-quality
/// (0x80000000) snapshot stamped at the requested time rather than positionally
/// misaligning values. The alignment logic was ported from the now-retired Wonderware
/// client's at-time snapshot reconciliation.
///
private static IReadOnlyList AlignAtTimeSnapshots(
IReadOnlyList timestampsUtc, IReadOnlyList samples)
{
// Index returned samples by timestamp ticks. Duplicate timestamps keep the first.
var byTicks = new Dictionary(samples.Count);
foreach (var sample in samples)
{
if (sample.Timestamp is null) continue;
byTicks.TryAdd(sample.Timestamp.ToDateTime().Ticks, sample);
}
var result = new DataValueSnapshot[timestampsUtc.Count];
for (var i = 0; i < timestampsUtc.Count; i++)
{
var requested = DateTime.SpecifyKind(timestampsUtc[i], DateTimeKind.Utc);
if (byTicks.TryGetValue(requested.Ticks, out var sample))
{
// Reuse the shared sample mapper for value + quality, then re-stamp the source
// timestamp to the requested time per the ReadAtTime contract.
result[i] = SampleMapper.ToSnapshot(sample) with { SourceTimestampUtc = requested };
}
else
{
// Gap — gateway returned no sample for this timestamp. Per the contract this is a
// Bad-quality snapshot stamped at the requested time, not a dropped row.
result[i] = new DataValueSnapshot(
Value: null,
StatusCode: 0x80000000u, // Bad
SourceTimestampUtc: requested,
ServerTimestampUtc: DateTime.UtcNow);
}
}
return result;
}
///
/// Records a failed read: bumps the health counters and logs a generic, redaction-safe
/// debug line (no tag, host, or value). The exception itself is rethrown by the caller.
///
private void RecordReadFailure(Exception ex)
{
RecordOutcome(success: false, error: ex.Message);
_logger.LogDebug("Historian gateway read operation failed and was recorded as a health failure.");
}
///
/// Records the outcome of a single read — increments _totalQueries and exactly one of
/// _totalSuccesses / _totalFailures under a single _healthLock
/// acquisition so a concurrent never observes a torn state.
///
private void RecordOutcome(bool success, string? error)
{
lock (_healthLock)
{
_totalQueries++;
if (success)
{
_totalSuccesses++;
_consecutiveFailures = 0;
_lastSuccessUtc = DateTime.UtcNow;
}
else
{
_totalFailures++;
_consecutiveFailures++;
_lastFailureUtc = DateTime.UtcNow;
_lastError = error;
}
}
}
/// Disposes the underlying gateway client. Prefer .
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();
/// Asynchronously disposes the underlying gateway client.
/// A task that completes when the client has been disposed.
public ValueTask DisposeAsync() => _client.DisposeAsync();
}