From 1e93b2ebfb4d56555d7ebbe1440c81d5f0d85361 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Fri, 26 Jun 2026 16:44:48 -0400 Subject: [PATCH] feat(historian-gateway): GatewayHistorianDataSource read paths (raw/processed/at-time) Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii --- .../GatewayHistorianDataSource.cs | 262 ++++++++++++++++++ ...WW.OtOpcUa.Driver.Historian.Gateway.csproj | 1 + .../GatewayHistorianDataSourceTests.cs | 75 +++++ 3 files changed, 338 insertions(+) create mode 100644 src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayHistorianDataSource.cs create mode 100644 tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayHistorianDataSourceTests.cs diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayHistorianDataSource.cs b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayHistorianDataSource.cs new file mode 100644 index 00000000..abf9b614 --- /dev/null +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/GatewayHistorianDataSource.cs @@ -0,0 +1,262 @@ +using Microsoft.Extensions.Logging; +using ZB.MOM.WW.HistorianGateway.Contracts.Grpc; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Mapping; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway; + +/// +/// Server-side backed by the HistorianGateway gRPC surface +/// (via the seam). Translates OPC UA HistoryRead requests +/// to gateway read calls and maps the wire shapes back to the driver-agnostic +/// / carriers using the pure +/// mappers in Mapping/. +/// +/// +/// The data source owns no historian connection of its own — it delegates to the gateway, which +/// pools and amortizes the underlying historian sessions. A thrown gateway exception is recorded +/// as a health failure and rethrown: the node manager turns it into a Bad HistoryRead result, so +/// a backend fault never crashes the host. An empty time window is a successful (GoodNoData) +/// read, not a fault. Health counters follow the single-_healthLock discipline ported +/// from WonderwareHistorianClient so TotalSuccesses + TotalFailures == TotalQueries +/// holds at every observed snapshot. +/// +public sealed class GatewayHistorianDataSource : IHistorianDataSource, IAsyncDisposable +{ + private readonly IHistorianGatewayClient _client; + private readonly ILogger _logger; + + private readonly object _healthLock = new(); + private DateTime? _lastSuccessUtc; + private DateTime? _lastFailureUtc; + private string? _lastError; + private long _totalQueries; + private long _totalSuccesses; + private long _totalFailures; + private int _consecutiveFailures; + + /// Creates a gateway-backed historian data source. + /// The gateway client seam used for all reads. + /// Diagnostic logger; failures are recorded without leaking tag/host detail. + public GatewayHistorianDataSource(IHistorianGatewayClient client, ILogger logger) + { + ArgumentNullException.ThrowIfNull(client); + ArgumentNullException.ThrowIfNull(logger); + _client = client; + _logger = logger; + } + + /// + public async Task ReadRawAsync( + string fullReference, DateTime startUtc, DateTime endUtc, uint maxValuesPerNode, + CancellationToken cancellationToken) + { + try + { + // The gateway seam caps with an int; OPC UA hands us a uint, so clamp to int range. + var maxValues = (int)Math.Min(maxValuesPerNode, int.MaxValue); + var samples = new List(); + await foreach (var sample in _client + .ReadRawAsync(fullReference, startUtc, endUtc, maxValues, cancellationToken) + .ConfigureAwait(false)) + { + samples.Add(sample); + } + + var snapshots = SampleMapper.ToSnapshots(samples); + RecordOutcome(success: true, error: null); + return new HistoryReadResult(snapshots, ContinuationPoint: null); + } + catch (Exception ex) + { + RecordReadFailure(ex); + throw; + } + } + + /// + public async Task ReadProcessedAsync( + string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval, + HistoryAggregateType aggregate, CancellationToken cancellationToken) + { + try + { + // Total/Count are now native gateway retrieval modes — no client-side scaling + // (unlike the Wonderware path that derived Total as Average × interval-seconds). + var mode = AggregateModeMapper.ToRetrievalMode(aggregate); + var buckets = new List(); + await foreach (var bucket in _client + .ReadAggregateAsync(fullReference, startUtc, endUtc, mode, interval, cancellationToken) + .ConfigureAwait(false)) + { + buckets.Add(bucket); + } + + var snapshots = SampleMapper.ToAggregateSnapshots(buckets); + RecordOutcome(success: true, error: null); + return new HistoryReadResult(snapshots, ContinuationPoint: null); + } + catch (Exception ex) + { + RecordReadFailure(ex); + throw; + } + } + + /// + public async Task ReadAtTimeAsync( + string fullReference, IReadOnlyList timestampsUtc, CancellationToken cancellationToken) + { + try + { + var samples = await _client + .ReadAtTimeAsync(fullReference, timestampsUtc, cancellationToken) + .ConfigureAwait(false); + var aligned = AlignAtTimeSnapshots(timestampsUtc, samples); + RecordOutcome(success: true, error: null); + return new HistoryReadResult(aligned, ContinuationPoint: null); + } + catch (Exception ex) + { + RecordReadFailure(ex); + throw; + } + } + + /// + public async Task ReadEventsAsync( + string? sourceName, DateTime startUtc, DateTime endUtc, int maxEvents, + CancellationToken cancellationToken) + { + try + { + var events = new List(); + await foreach (var wireEvent in _client + .ReadEventsAsync(sourceName, startUtc, endUtc, maxEvents, cancellationToken) + .ConfigureAwait(false)) + { + events.Add(wireEvent); + } + + var mapped = EventMapper.ToHistoricalEvents(events); + RecordOutcome(success: true, error: null); + return new HistoricalEventsResult(mapped, ContinuationPoint: null); + } + catch (Exception ex) + { + RecordReadFailure(ex); + throw; + } + } + + /// + public HistorianHealthSnapshot GetHealthSnapshot() + { + lock (_healthLock) + { + return new HistorianHealthSnapshot( + TotalQueries: _totalQueries, + TotalSuccesses: _totalSuccesses, + TotalFailures: _totalFailures, + ConsecutiveFailures: _consecutiveFailures, + LastSuccessTime: _lastSuccessUtc, + LastFailureTime: _lastFailureUtc, + LastError: _lastError, + // Connection-state caching arrives in T8 (RefreshConnectionStateAsync); until then + // both flags read closed. The gateway is non-clustered to us, so node fields are + // null/empty (mirrors the Wonderware client's Finding 010 posture). + ProcessConnectionOpen: false, + EventConnectionOpen: false, + ActiveProcessNode: null, + ActiveEventNode: null, + Nodes: []); + } + } + + /// + /// Reconciles a gateway at-time reply against the requested timestamps to honour the + /// contract: exactly one snapshot per + /// requested timestamp, in request order. Returned samples are indexed by timestamp ticks; + /// any requested timestamp the gateway did not return is filled with a Bad-quality + /// (0x80000000) snapshot stamped at the requested time rather than positionally + /// misaligning values. Ported from WonderwareHistorianClient.AlignAtTimeSnapshots. + /// + private static IReadOnlyList AlignAtTimeSnapshots( + IReadOnlyList timestampsUtc, IReadOnlyList samples) + { + // Index returned samples by timestamp ticks. Duplicate timestamps keep the first. + var byTicks = new Dictionary(samples.Count); + foreach (var sample in samples) + { + if (sample.Timestamp is null) continue; + byTicks.TryAdd(sample.Timestamp.ToDateTime().Ticks, sample); + } + + var result = new DataValueSnapshot[timestampsUtc.Count]; + for (var i = 0; i < timestampsUtc.Count; i++) + { + var requested = DateTime.SpecifyKind(timestampsUtc[i], DateTimeKind.Utc); + if (byTicks.TryGetValue(requested.Ticks, out var sample)) + { + // Reuse the shared sample mapper for value + quality, then re-stamp the source + // timestamp to the requested time per the ReadAtTime contract. + result[i] = SampleMapper.ToSnapshot(sample) with { SourceTimestampUtc = requested }; + } + else + { + // Gap — gateway returned no sample for this timestamp. Per the contract this is a + // Bad-quality snapshot stamped at the requested time, not a dropped row. + result[i] = new DataValueSnapshot( + Value: null, + StatusCode: 0x80000000u, // Bad + SourceTimestampUtc: requested, + ServerTimestampUtc: DateTime.UtcNow); + } + } + + return result; + } + + /// + /// Records a failed read: bumps the health counters and logs a generic, redaction-safe + /// debug line (no tag, host, or value). The exception itself is rethrown by the caller. + /// + private void RecordReadFailure(Exception ex) + { + RecordOutcome(success: false, error: ex.Message); + _logger.LogDebug("Historian gateway read operation failed and was recorded as a health failure."); + } + + /// + /// Records the outcome of a single read — increments _totalQueries and exactly one of + /// _totalSuccesses / _totalFailures under a single _healthLock + /// acquisition so a concurrent never observes a torn state. + /// + private void RecordOutcome(bool success, string? error) + { + lock (_healthLock) + { + _totalQueries++; + if (success) + { + _totalSuccesses++; + _consecutiveFailures = 0; + _lastSuccessUtc = DateTime.UtcNow; + } + else + { + _totalFailures++; + _consecutiveFailures++; + _lastFailureUtc = DateTime.UtcNow; + _lastError = error; + } + } + } + + /// Disposes the underlying gateway client. + public void Dispose() => _client.DisposeAsync().AsTask().GetAwaiter().GetResult(); + + /// Asynchronously disposes the underlying gateway client. + /// A task that completes when the client has been disposed. + public ValueTask DisposeAsync() => _client.DisposeAsync(); +} diff --git a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.csproj b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.csproj index 1f2994d9..63c34238 100644 --- a/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.csproj +++ b/src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.csproj @@ -16,6 +16,7 @@ + diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayHistorianDataSourceTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayHistorianDataSourceTests.cs new file mode 100644 index 00000000..0e7abcbd --- /dev/null +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests/GatewayHistorianDataSourceTests.cs @@ -0,0 +1,75 @@ +using Google.Protobuf.WellKnownTypes; +using Microsoft.Extensions.Logging.Abstractions; +using Xunit; +using ZB.MOM.WW.HistorianGateway.Contracts.Grpc; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests; + +public sealed class GatewayHistorianDataSourceTests +{ + [Fact] + public async Task ReadRaw_maps_samples_and_passes_args() + { + var fake = new FakeHistorianGatewayClient + { + RawSamples = new[] + { + new HistorianSample { Tag = "T", NumericValue = 1.0, OpcQuality = 192, Timestamp = Ts(2026, 1, 1, 0, 0, 0) }, + new HistorianSample { Tag = "T", NumericValue = 2.0, OpcQuality = 0, Timestamp = Ts(2026, 1, 1, 0, 0, 1) }, + }, + }; + var ds = new GatewayHistorianDataSource(fake, NullLogger.Instance); + var r = await ds.ReadRawAsync("T", DateTime.UtcNow.AddMinutes(-5), DateTime.UtcNow, 100, TestContext.Current.CancellationToken); + Assert.Equal(2, r.Samples.Count); + Assert.Equal(0x80000000u, r.Samples[1].StatusCode); // Bad from quality 0 + Assert.Equal("T", fake.LastReadRawTag); + Assert.Equal(100, fake.LastReadRawMaxValues); + } + + [Fact] + public async Task ReadProcessed_uses_aggregate_mode_mapping() + { + var fake = new FakeHistorianGatewayClient(); + var ds = new GatewayHistorianDataSource(fake, NullLogger.Instance); + await ds.ReadProcessedAsync("T", default, default, TimeSpan.FromSeconds(60), HistoryAggregateType.Minimum, TestContext.Current.CancellationToken); + Assert.Equal(RetrievalMode.MinimumWithTime, fake.LastAggregateMode); + Assert.Equal(TimeSpan.FromSeconds(60), fake.LastAggregateInterval); + } + + [Fact] + public async Task ReadAtTime_aligns_one_snapshot_per_timestamp_with_gaps_Bad() + { + var fake = new FakeHistorianGatewayClient(); + var t0 = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc); + var t1 = t0.AddSeconds(1); + fake.AtTimeSamples = new[] { new HistorianSample { NumericValue = 5.0, OpcQuality = 192, Timestamp = Timestamp.FromDateTime(t0) } }; + var ds = new GatewayHistorianDataSource(fake, NullLogger.Instance); + var r = await ds.ReadAtTimeAsync("T", new[] { t0, t1 }, TestContext.Current.CancellationToken); + Assert.Equal(2, r.Samples.Count); // exactly one per requested ts, in order + Assert.Equal(5.0, r.Samples[0].Value); + Assert.Equal(0x80000000u, r.Samples[1].StatusCode); // gap → Bad at requested ts + } + + [Fact] + public async Task Empty_window_is_not_a_fault() + { + var fake = new FakeHistorianGatewayClient { RawSamples = Array.Empty() }; + var ds = new GatewayHistorianDataSource(fake, NullLogger.Instance); + var r = await ds.ReadRawAsync("T", default, default, 10, TestContext.Current.CancellationToken); + Assert.Empty(r.Samples); // GoodNoData-empty, no throw + } + + [Fact] + public async Task Disposing_data_source_disposes_client() + { + var fake = new FakeHistorianGatewayClient(); + var ds = new GatewayHistorianDataSource(fake, NullLogger.Instance); + await ds.DisposeAsync(); + Assert.Equal(1, fake.DisposeCallCount); + } + + // Ts(...) builds a Google.Protobuf.WellKnownTypes.Timestamp from UTC parts. + private static Timestamp Ts(int y, int mo, int d, int h, int mi, int s) + => Timestamp.FromDateTime(new DateTime(y, mo, d, h, mi, s, DateTimeKind.Utc)); +}