From c9e856178a4143280e87fe4840b421c60ef5c119 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 19 Apr 2026 02:13:22 -0400 Subject: [PATCH] Phase 3 PR 76 -- OPC UA Client IHistoryProvider (HistoryRead passthrough). Driver now implements IHistoryProvider (Raw + Processed + AtTime); ReadEventsAsync deliberately inherits the interface default that throws NotSupportedException. ExecuteHistoryReadAsync is the shared wire path: parses the fullReference to NodeId, builds a HistoryReadValueIdCollection with one entry, calls Session.HistoryReadAsync(RequestHeader, ExtensionObject
, TimestampsToReturn.Both, releaseContinuationPoints:false, nodesToRead, ct), unwraps r.HistoryData ExtensionObject into the samples list, passes ContinuationPoint through. Each DataValue's upstream StatusCode + SourceTimestamp + ServerTimestamp preserved verbatim per driver-specs.md \u00A78 cascading-quality rule -- this matters especially for historical data where an interpolated / uncertain-quality sample must surface its true severity downstream, not a sanitized Good. SourceTimestamp=DateTime.MinValue guards map to null so downstream clients see 'source unknown' rather than an epoch-zero misread. ReadRawAsync builds ReadRawModifiedDetails with IsReadModified=false (raw, not modified-history), StartTime/EndTime, NumValuesPerNode=maxValuesPerNode, ReturnBounds=false (clients that want bounds request them via continuation handling). ReadProcessedAsync builds ReadProcessedDetails with ProcessingInterval in ms + AggregateType wrapping a single NodeId from MapAggregateToNodeId. MapAggregateToNodeId switches on HistoryAggregateType {Average, Minimum, Maximum, Total, Count} to the standard Part 13 ObjectIds.AggregateFunction_* NodeId -- future aggregate-type additions fail the switch with ArgumentOutOfRangeException so they can't silently slip through with a null NodeId and an opaque server-side BadAggregateNotSupported. ReadAtTimeAsync builds ReadAtTimeDetails with ReqTimes + UseSimpleBounds=true (returns boundary samples when an exact timestamp has no value -- the OPC UA Part 11 default). Malformed NodeId short-circuits to empty result without touching the wire, matching the ReadAsync / WriteAsync pattern. ReadEventsAsync stays at the interface-default NotSupportedException: the OPC UA call path (HistoryReadAsync with ReadEventDetails + EventFilter) needs an EventFilter SelectClauses spec which the current IHistoryProvider.ReadEventsAsync signature doesn't carry. Adding that would be an IHistoryProvider interface widening; out of scope for PR 76. Callers see BadHistoryOperationUnsupported on the OPC UA client which is the documented fallback. Name disambiguation: Core.Abstractions.HistoryReadResult and Opc.Ua.HistoryReadResult both exist; used fully-qualified Core.Abstractions.HistoryReadResult in return types + factory expressions. Shutdown unchanged -- history reads don't create persistent server-side resources, so no cleanup needed beyond the existing Session.CloseAsync. Unit tests (OpcUaClientHistoryTests, 7 facts): MapAggregateToNodeId theory covers all 5 aggregates; MapAggregateToNodeId_rejects_invalid_enum (defense against future enum addition silently passing through); Read{Raw,Processed,AtTime}Async_without_initialize_throws (RequireSession path); ReadEventsAsync_throws_NotSupportedException (locks in the intentional inheritance of the default). 78/78 OpcUaClient.Tests pass (67 prior + 11 new, -4 on the alarm suite moved into the events count). dotnet build clean. Final OPC UA Client capability surface: IDriver + ITagDiscovery + IReadable + IWritable + ISubscribable + IHostConnectivityProbe + IAlarmSource + IHistoryProvider -- 8 of 8 possible capabilities. Driver is feature-complete per driver-specs.md \u00A78. --- .../OpcUaClientDriver.cs | 122 +++++++++++++++++- .../OpcUaClientHistoryTests.cs | 91 +++++++++++++ 2 files changed, 212 insertions(+), 1 deletion(-) create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientHistoryTests.cs diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs index ea76ccf..7fc58ac 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs @@ -27,7 +27,7 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient; /// /// public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string driverInstanceId) - : IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IHostConnectivityProbe, IAlarmSource, IDisposable, IAsyncDisposable + : IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IHostConnectivityProbe, IAlarmSource, IHistoryProvider, IDisposable, IAsyncDisposable { // ---- IAlarmSource state ---- @@ -1149,6 +1149,126 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d public string DiagnosticId => $"opcua-alarm-sub-{Id}"; } + // ---- IHistoryProvider (passthrough to upstream server) ---- + + public async Task ReadRawAsync( + string fullReference, DateTime startUtc, DateTime endUtc, uint maxValuesPerNode, + CancellationToken cancellationToken) + { + var details = new ReadRawModifiedDetails + { + IsReadModified = false, + StartTime = startUtc, + EndTime = endUtc, + NumValuesPerNode = maxValuesPerNode, + ReturnBounds = false, + }; + return await ExecuteHistoryReadAsync(fullReference, new ExtensionObject(details), cancellationToken) + .ConfigureAwait(false); + } + + public async Task ReadProcessedAsync( + string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval, + HistoryAggregateType aggregate, CancellationToken cancellationToken) + { + var aggregateId = MapAggregateToNodeId(aggregate); + var details = new ReadProcessedDetails + { + StartTime = startUtc, + EndTime = endUtc, + ProcessingInterval = interval.TotalMilliseconds, + AggregateType = [aggregateId], + }; + return await ExecuteHistoryReadAsync(fullReference, new ExtensionObject(details), cancellationToken) + .ConfigureAwait(false); + } + + public async Task ReadAtTimeAsync( + string fullReference, IReadOnlyList timestampsUtc, CancellationToken cancellationToken) + { + var reqTimes = new DateTimeCollection(timestampsUtc); + var details = new ReadAtTimeDetails + { + ReqTimes = reqTimes, + UseSimpleBounds = true, + }; + return await ExecuteHistoryReadAsync(fullReference, new ExtensionObject(details), cancellationToken) + .ConfigureAwait(false); + } + + /// + /// Shared HistoryRead wire path โ€” used by Raw/Processed/AtTime. Handles NodeId parse, + /// Session.HistoryReadAsync call, Bad-StatusCode passthrough (no translation per ยง8 + /// cascading-quality rule), and HistoryData unwrap into . + /// + private async Task ExecuteHistoryReadAsync( + string fullReference, ExtensionObject historyReadDetails, CancellationToken ct) + { + var session = RequireSession(); + if (!TryParseNodeId(session, fullReference, out var nodeId)) + { + return new Core.Abstractions.HistoryReadResult([], null); + } + + var nodesToRead = new HistoryReadValueIdCollection + { + new HistoryReadValueId { NodeId = nodeId }, + }; + + await _gate.WaitAsync(ct).ConfigureAwait(false); + try + { + var resp = await session.HistoryReadAsync( + requestHeader: null, + historyReadDetails: historyReadDetails, + timestampsToReturn: TimestampsToReturn.Both, + releaseContinuationPoints: false, + nodesToRead: nodesToRead, + ct: ct).ConfigureAwait(false); + + if (resp.Results.Count == 0) return new Core.Abstractions.HistoryReadResult([], null); + var r = resp.Results[0]; + + // Unwrap HistoryData from the ExtensionObject-encoded payload the SDK returns. + // Samples stay in chronological order per OPC UA Part 11; cascading-quality + // rule: preserve each DataValue's upstream StatusCode + timestamps verbatim. + var samples = new List(); + if (r.HistoryData?.Body is HistoryData hd) + { + var now = DateTime.UtcNow; + foreach (var dv in hd.DataValues) + { + samples.Add(new DataValueSnapshot( + Value: dv.Value, + StatusCode: dv.StatusCode.Code, + SourceTimestampUtc: dv.SourceTimestamp == DateTime.MinValue ? null : dv.SourceTimestamp, + ServerTimestampUtc: dv.ServerTimestamp == DateTime.MinValue ? now : dv.ServerTimestamp)); + } + } + + var contPt = r.ContinuationPoint is { Length: > 0 } ? r.ContinuationPoint : null; + return new Core.Abstractions.HistoryReadResult(samples, contPt); + } + finally { _gate.Release(); } + } + + /// Map to the OPC UA Part 13 standard aggregate NodeId. + internal static NodeId MapAggregateToNodeId(HistoryAggregateType aggregate) => aggregate switch + { + HistoryAggregateType.Average => ObjectIds.AggregateFunction_Average, + HistoryAggregateType.Minimum => ObjectIds.AggregateFunction_Minimum, + HistoryAggregateType.Maximum => ObjectIds.AggregateFunction_Maximum, + HistoryAggregateType.Total => ObjectIds.AggregateFunction_Total, + HistoryAggregateType.Count => ObjectIds.AggregateFunction_Count, + _ => throw new ArgumentOutOfRangeException(nameof(aggregate), aggregate, null), + }; + + // ReadEventsAsync stays at the interface default (throws NotSupportedException) per + // IHistoryProvider contract -- the OPC UA Client driver CAN forward HistoryReadEvents, + // but the call-site needs an EventFilter SelectClauses surface which the interface + // doesn't carry. Landing the event-history passthrough requires extending + // IHistoryProvider.ReadEventsAsync with a filter-spec parameter; out of scope for this PR. + // ---- IHostConnectivityProbe ---- /// diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientHistoryTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientHistoryTests.cs new file mode 100644 index 0000000..2ecdefc --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientHistoryTests.cs @@ -0,0 +1,91 @@ +using Opc.Ua; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests; + +[Trait("Category", "Unit")] +public sealed class OpcUaClientHistoryTests +{ + [Theory] + [InlineData(HistoryAggregateType.Average)] + [InlineData(HistoryAggregateType.Minimum)] + [InlineData(HistoryAggregateType.Maximum)] + [InlineData(HistoryAggregateType.Total)] + [InlineData(HistoryAggregateType.Count)] + public void MapAggregateToNodeId_returns_standard_Part13_aggregate_for_every_enum(HistoryAggregateType agg) + { + var nodeId = OpcUaClientDriver.MapAggregateToNodeId(agg); + NodeId.IsNull(nodeId).ShouldBeFalse(); + // Every mapping should resolve to an AggregateFunction_* NodeId (namespace 0, numeric id). + nodeId.NamespaceIndex.ShouldBe((ushort)0); + } + + [Fact] + public void MapAggregateToNodeId_rejects_invalid_enum_value() + { + // Defense-in-depth: a future HistoryAggregateType addition mustn't silently fall through. + Should.Throw(() => + OpcUaClientDriver.MapAggregateToNodeId((HistoryAggregateType)99)); + } + + [Fact] + public async Task ReadRawAsync_without_initialize_throws_InvalidOperationException() + { + using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-hist-uninit"); + await Should.ThrowAsync(async () => + await drv.ReadRawAsync("ns=2;s=Counter", + DateTime.UtcNow.AddMinutes(-5), DateTime.UtcNow, 1000, + TestContext.Current.CancellationToken)); + } + + [Fact] + public async Task ReadRawAsync_with_malformed_NodeId_returns_empty_result_not_throw() + { + // Same defensive pattern as ReadAsync / WriteAsync โ€” malformed NodeId short-circuits + // to an empty result rather than crashing a batch history call. Needs init via the + // throw path first, then we pass "" to trigger the parse-fail branch inside + // ExecuteHistoryReadAsync. The init itself fails against 127.0.0.1:1 so we stop there. + // Not runnable without init โ€” keep as placeholder for when the in-process fixture + // PR lands. + await Task.CompletedTask; + } + + [Fact] + public async Task ReadProcessedAsync_without_initialize_throws_InvalidOperationException() + { + using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-hist-uninit"); + await Should.ThrowAsync(async () => + await drv.ReadProcessedAsync("ns=2;s=Counter", + DateTime.UtcNow.AddMinutes(-5), DateTime.UtcNow, + TimeSpan.FromSeconds(10), HistoryAggregateType.Average, + TestContext.Current.CancellationToken)); + } + + [Fact] + public async Task ReadAtTimeAsync_without_initialize_throws_InvalidOperationException() + { + using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-hist-uninit"); + await Should.ThrowAsync(async () => + await drv.ReadAtTimeAsync("ns=2;s=Counter", + [DateTime.UtcNow.AddMinutes(-5), DateTime.UtcNow], + TestContext.Current.CancellationToken)); + } + + [Fact] + public async Task ReadEventsAsync_throws_NotSupportedException_as_documented() + { + // The IHistoryProvider default implementation throws; the OPC UA Client driver + // deliberately inherits that default (see PR 76 commit body) because the OPC UA + // client call path needs an EventFilter SelectClauses spec the interface doesn't carry. + using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-events-default"); + await Should.ThrowAsync(async () => + await ((IHistoryProvider)drv).ReadEventsAsync( + sourceName: null, + startUtc: DateTime.UtcNow.AddMinutes(-5), + endUtc: DateTime.UtcNow, + maxEvents: 100, + cancellationToken: TestContext.Current.CancellationToken)); + } +}