feat(historian-gateway): GatewayHistorianDataSource read paths (raw/processed/at-time)
Claude-Session: https://claude.ai/code/session_012SDSQ3AcaXqPcBtDESBRii
This commit is contained in:
@@ -0,0 +1,262 @@
|
||||
using Microsoft.Extensions.Logging;
|
||||
using ZB.MOM.WW.HistorianGateway.Contracts.Grpc;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Mapping;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway;
|
||||
|
||||
/// <summary>
|
||||
/// Server-side <see cref="IHistorianDataSource"/> backed by the HistorianGateway gRPC surface
|
||||
/// (via the <see cref="IHistorianGatewayClient"/> seam). Translates OPC UA HistoryRead requests
|
||||
/// to gateway read calls and maps the wire shapes back to the driver-agnostic
|
||||
/// <see cref="DataValueSnapshot"/> / <see cref="HistoricalEvent"/> carriers using the pure
|
||||
/// mappers in <c>Mapping/</c>.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// The data source owns no historian connection of its own — it delegates to the gateway, which
|
||||
/// pools and amortizes the underlying historian sessions. A thrown gateway exception is recorded
|
||||
/// as a health failure and rethrown: the node manager turns it into a Bad HistoryRead result, so
|
||||
/// a backend fault never crashes the host. An empty time window is a successful (GoodNoData)
|
||||
/// read, not a fault. Health counters follow the single-<c>_healthLock</c> discipline ported
|
||||
/// from <c>WonderwareHistorianClient</c> so <c>TotalSuccesses + TotalFailures == TotalQueries</c>
|
||||
/// holds at every observed snapshot.
|
||||
/// </remarks>
|
||||
public sealed class GatewayHistorianDataSource : IHistorianDataSource, IAsyncDisposable
|
||||
{
|
||||
private readonly IHistorianGatewayClient _client;
|
||||
private readonly ILogger<GatewayHistorianDataSource> _logger;
|
||||
|
||||
private readonly object _healthLock = new();
|
||||
private DateTime? _lastSuccessUtc;
|
||||
private DateTime? _lastFailureUtc;
|
||||
private string? _lastError;
|
||||
private long _totalQueries;
|
||||
private long _totalSuccesses;
|
||||
private long _totalFailures;
|
||||
private int _consecutiveFailures;
|
||||
|
||||
/// <summary>Creates a gateway-backed historian data source.</summary>
|
||||
/// <param name="client">The gateway client seam used for all reads.</param>
|
||||
/// <param name="logger">Diagnostic logger; failures are recorded without leaking tag/host detail.</param>
|
||||
public GatewayHistorianDataSource(IHistorianGatewayClient client, ILogger<GatewayHistorianDataSource> logger)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(client);
|
||||
ArgumentNullException.ThrowIfNull(logger);
|
||||
_client = client;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task<HistoryReadResult> ReadRawAsync(
|
||||
string fullReference, DateTime startUtc, DateTime endUtc, uint maxValuesPerNode,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
// The gateway seam caps with an int; OPC UA hands us a uint, so clamp to int range.
|
||||
var maxValues = (int)Math.Min(maxValuesPerNode, int.MaxValue);
|
||||
var samples = new List<HistorianSample>();
|
||||
await foreach (var sample in _client
|
||||
.ReadRawAsync(fullReference, startUtc, endUtc, maxValues, cancellationToken)
|
||||
.ConfigureAwait(false))
|
||||
{
|
||||
samples.Add(sample);
|
||||
}
|
||||
|
||||
var snapshots = SampleMapper.ToSnapshots(samples);
|
||||
RecordOutcome(success: true, error: null);
|
||||
return new HistoryReadResult(snapshots, ContinuationPoint: null);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
RecordReadFailure(ex);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task<HistoryReadResult> ReadProcessedAsync(
|
||||
string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval,
|
||||
HistoryAggregateType aggregate, CancellationToken cancellationToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Total/Count are now native gateway retrieval modes — no client-side scaling
|
||||
// (unlike the Wonderware path that derived Total as Average × interval-seconds).
|
||||
var mode = AggregateModeMapper.ToRetrievalMode(aggregate);
|
||||
var buckets = new List<HistorianAggregateSample>();
|
||||
await foreach (var bucket in _client
|
||||
.ReadAggregateAsync(fullReference, startUtc, endUtc, mode, interval, cancellationToken)
|
||||
.ConfigureAwait(false))
|
||||
{
|
||||
buckets.Add(bucket);
|
||||
}
|
||||
|
||||
var snapshots = SampleMapper.ToAggregateSnapshots(buckets);
|
||||
RecordOutcome(success: true, error: null);
|
||||
return new HistoryReadResult(snapshots, ContinuationPoint: null);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
RecordReadFailure(ex);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task<HistoryReadResult> ReadAtTimeAsync(
|
||||
string fullReference, IReadOnlyList<DateTime> timestampsUtc, CancellationToken cancellationToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
var samples = await _client
|
||||
.ReadAtTimeAsync(fullReference, timestampsUtc, cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
var aligned = AlignAtTimeSnapshots(timestampsUtc, samples);
|
||||
RecordOutcome(success: true, error: null);
|
||||
return new HistoryReadResult(aligned, ContinuationPoint: null);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
RecordReadFailure(ex);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task<HistoricalEventsResult> ReadEventsAsync(
|
||||
string? sourceName, DateTime startUtc, DateTime endUtc, int maxEvents,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
var events = new List<HistorianEvent>();
|
||||
await foreach (var wireEvent in _client
|
||||
.ReadEventsAsync(sourceName, startUtc, endUtc, maxEvents, cancellationToken)
|
||||
.ConfigureAwait(false))
|
||||
{
|
||||
events.Add(wireEvent);
|
||||
}
|
||||
|
||||
var mapped = EventMapper.ToHistoricalEvents(events);
|
||||
RecordOutcome(success: true, error: null);
|
||||
return new HistoricalEventsResult(mapped, ContinuationPoint: null);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
RecordReadFailure(ex);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public HistorianHealthSnapshot GetHealthSnapshot()
|
||||
{
|
||||
lock (_healthLock)
|
||||
{
|
||||
return new HistorianHealthSnapshot(
|
||||
TotalQueries: _totalQueries,
|
||||
TotalSuccesses: _totalSuccesses,
|
||||
TotalFailures: _totalFailures,
|
||||
ConsecutiveFailures: _consecutiveFailures,
|
||||
LastSuccessTime: _lastSuccessUtc,
|
||||
LastFailureTime: _lastFailureUtc,
|
||||
LastError: _lastError,
|
||||
// Connection-state caching arrives in T8 (RefreshConnectionStateAsync); until then
|
||||
// both flags read closed. The gateway is non-clustered to us, so node fields are
|
||||
// null/empty (mirrors the Wonderware client's Finding 010 posture).
|
||||
ProcessConnectionOpen: false,
|
||||
EventConnectionOpen: false,
|
||||
ActiveProcessNode: null,
|
||||
ActiveEventNode: null,
|
||||
Nodes: []);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Reconciles a gateway at-time reply against the requested timestamps to honour the
|
||||
/// <see cref="IHistorianDataSource.ReadAtTimeAsync"/> contract: exactly one snapshot per
|
||||
/// requested timestamp, in request order. Returned samples are indexed by timestamp ticks;
|
||||
/// any requested timestamp the gateway did not return is filled with a Bad-quality
|
||||
/// (<c>0x80000000</c>) snapshot stamped at the requested time rather than positionally
|
||||
/// misaligning values. Ported from <c>WonderwareHistorianClient.AlignAtTimeSnapshots</c>.
|
||||
/// </summary>
|
||||
private static IReadOnlyList<DataValueSnapshot> AlignAtTimeSnapshots(
|
||||
IReadOnlyList<DateTime> timestampsUtc, IReadOnlyList<HistorianSample> samples)
|
||||
{
|
||||
// Index returned samples by timestamp ticks. Duplicate timestamps keep the first.
|
||||
var byTicks = new Dictionary<long, HistorianSample>(samples.Count);
|
||||
foreach (var sample in samples)
|
||||
{
|
||||
if (sample.Timestamp is null) continue;
|
||||
byTicks.TryAdd(sample.Timestamp.ToDateTime().Ticks, sample);
|
||||
}
|
||||
|
||||
var result = new DataValueSnapshot[timestampsUtc.Count];
|
||||
for (var i = 0; i < timestampsUtc.Count; i++)
|
||||
{
|
||||
var requested = DateTime.SpecifyKind(timestampsUtc[i], DateTimeKind.Utc);
|
||||
if (byTicks.TryGetValue(requested.Ticks, out var sample))
|
||||
{
|
||||
// Reuse the shared sample mapper for value + quality, then re-stamp the source
|
||||
// timestamp to the requested time per the ReadAtTime contract.
|
||||
result[i] = SampleMapper.ToSnapshot(sample) with { SourceTimestampUtc = requested };
|
||||
}
|
||||
else
|
||||
{
|
||||
// Gap — gateway returned no sample for this timestamp. Per the contract this is a
|
||||
// Bad-quality snapshot stamped at the requested time, not a dropped row.
|
||||
result[i] = new DataValueSnapshot(
|
||||
Value: null,
|
||||
StatusCode: 0x80000000u, // Bad
|
||||
SourceTimestampUtc: requested,
|
||||
ServerTimestampUtc: DateTime.UtcNow);
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Records a failed read: bumps the health counters and logs a generic, redaction-safe
|
||||
/// debug line (no tag, host, or value). The exception itself is rethrown by the caller.
|
||||
/// </summary>
|
||||
private void RecordReadFailure(Exception ex)
|
||||
{
|
||||
RecordOutcome(success: false, error: ex.Message);
|
||||
_logger.LogDebug("Historian gateway read operation failed and was recorded as a health failure.");
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Records the outcome of a single read — increments <c>_totalQueries</c> and exactly one of
|
||||
/// <c>_totalSuccesses</c> / <c>_totalFailures</c> under a single <c>_healthLock</c>
|
||||
/// acquisition so a concurrent <see cref="GetHealthSnapshot"/> never observes a torn state.
|
||||
/// </summary>
|
||||
private void RecordOutcome(bool success, string? error)
|
||||
{
|
||||
lock (_healthLock)
|
||||
{
|
||||
_totalQueries++;
|
||||
if (success)
|
||||
{
|
||||
_totalSuccesses++;
|
||||
_consecutiveFailures = 0;
|
||||
_lastSuccessUtc = DateTime.UtcNow;
|
||||
}
|
||||
else
|
||||
{
|
||||
_totalFailures++;
|
||||
_consecutiveFailures++;
|
||||
_lastFailureUtc = DateTime.UtcNow;
|
||||
_lastError = error;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Disposes the underlying gateway client.</summary>
|
||||
public void Dispose() => _client.DisposeAsync().AsTask().GetAwaiter().GetResult();
|
||||
|
||||
/// <summary>Asynchronously disposes the underlying gateway client.</summary>
|
||||
/// <returns>A task that completes when the client has been disposed.</returns>
|
||||
public ValueTask DisposeAsync() => _client.DisposeAsync();
|
||||
}
|
||||
+1
@@ -16,6 +16,7 @@
|
||||
<ItemGroup>
|
||||
<PackageReference Include="ZB.MOM.WW.HistorianGateway.Client" />
|
||||
<PackageReference Include="ZB.MOM.WW.HistorianGateway.Contracts" />
|
||||
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
|
||||
+75
@@ -0,0 +1,75 @@
|
||||
using Google.Protobuf.WellKnownTypes;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.HistorianGateway.Contracts.Grpc;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Gateway.Tests;
|
||||
|
||||
public sealed class GatewayHistorianDataSourceTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task ReadRaw_maps_samples_and_passes_args()
|
||||
{
|
||||
var fake = new FakeHistorianGatewayClient
|
||||
{
|
||||
RawSamples = new[]
|
||||
{
|
||||
new HistorianSample { Tag = "T", NumericValue = 1.0, OpcQuality = 192, Timestamp = Ts(2026, 1, 1, 0, 0, 0) },
|
||||
new HistorianSample { Tag = "T", NumericValue = 2.0, OpcQuality = 0, Timestamp = Ts(2026, 1, 1, 0, 0, 1) },
|
||||
},
|
||||
};
|
||||
var ds = new GatewayHistorianDataSource(fake, NullLogger<GatewayHistorianDataSource>.Instance);
|
||||
var r = await ds.ReadRawAsync("T", DateTime.UtcNow.AddMinutes(-5), DateTime.UtcNow, 100, TestContext.Current.CancellationToken);
|
||||
Assert.Equal(2, r.Samples.Count);
|
||||
Assert.Equal(0x80000000u, r.Samples[1].StatusCode); // Bad from quality 0
|
||||
Assert.Equal("T", fake.LastReadRawTag);
|
||||
Assert.Equal(100, fake.LastReadRawMaxValues);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ReadProcessed_uses_aggregate_mode_mapping()
|
||||
{
|
||||
var fake = new FakeHistorianGatewayClient();
|
||||
var ds = new GatewayHistorianDataSource(fake, NullLogger<GatewayHistorianDataSource>.Instance);
|
||||
await ds.ReadProcessedAsync("T", default, default, TimeSpan.FromSeconds(60), HistoryAggregateType.Minimum, TestContext.Current.CancellationToken);
|
||||
Assert.Equal(RetrievalMode.MinimumWithTime, fake.LastAggregateMode);
|
||||
Assert.Equal(TimeSpan.FromSeconds(60), fake.LastAggregateInterval);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ReadAtTime_aligns_one_snapshot_per_timestamp_with_gaps_Bad()
|
||||
{
|
||||
var fake = new FakeHistorianGatewayClient();
|
||||
var t0 = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc);
|
||||
var t1 = t0.AddSeconds(1);
|
||||
fake.AtTimeSamples = new[] { new HistorianSample { NumericValue = 5.0, OpcQuality = 192, Timestamp = Timestamp.FromDateTime(t0) } };
|
||||
var ds = new GatewayHistorianDataSource(fake, NullLogger<GatewayHistorianDataSource>.Instance);
|
||||
var r = await ds.ReadAtTimeAsync("T", new[] { t0, t1 }, TestContext.Current.CancellationToken);
|
||||
Assert.Equal(2, r.Samples.Count); // exactly one per requested ts, in order
|
||||
Assert.Equal(5.0, r.Samples[0].Value);
|
||||
Assert.Equal(0x80000000u, r.Samples[1].StatusCode); // gap → Bad at requested ts
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Empty_window_is_not_a_fault()
|
||||
{
|
||||
var fake = new FakeHistorianGatewayClient { RawSamples = Array.Empty<HistorianSample>() };
|
||||
var ds = new GatewayHistorianDataSource(fake, NullLogger<GatewayHistorianDataSource>.Instance);
|
||||
var r = await ds.ReadRawAsync("T", default, default, 10, TestContext.Current.CancellationToken);
|
||||
Assert.Empty(r.Samples); // GoodNoData-empty, no throw
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Disposing_data_source_disposes_client()
|
||||
{
|
||||
var fake = new FakeHistorianGatewayClient();
|
||||
var ds = new GatewayHistorianDataSource(fake, NullLogger<GatewayHistorianDataSource>.Instance);
|
||||
await ds.DisposeAsync();
|
||||
Assert.Equal(1, fake.DisposeCallCount);
|
||||
}
|
||||
|
||||
// Ts(...) builds a Google.Protobuf.WellKnownTypes.Timestamp from UTC parts.
|
||||
private static Timestamp Ts(int y, int mo, int d, int h, int mi, int s)
|
||||
=> Timestamp.FromDateTime(new DateTime(y, mo, d, h, mi, s, DateTimeKind.Utc));
|
||||
}
|
||||
Reference in New Issue
Block a user