Phase 3 PR 76 -- OPC UA Client IHistoryProvider (HistoryRead passthrough). Driver now implements IHistoryProvider (Raw + Processed + AtTime); ReadEventsAsync deliberately inherits the interface default that throws NotSupportedException. ExecuteHistoryReadAsync is the shared wire path: parses the fullReference to NodeId, builds a HistoryReadValueIdCollection with one entry, calls Session.HistoryReadAsync(RequestHeader, ExtensionObject<details>, TimestampsToReturn.Both, releaseContinuationPoints:false, nodesToRead, ct), unwraps r.HistoryData ExtensionObject into the samples list, passes ContinuationPoint through. Each DataValue's upstream StatusCode + SourceTimestamp + ServerTimestamp preserved verbatim per driver-specs.md \u00A78 cascading-quality rule -- this matters especially for historical data where an interpolated / uncertain-quality sample must surface its true severity downstream, not a sanitized Good. SourceTimestamp=DateTime.MinValue guards map to null so downstream clients see 'source unknown' rather than an epoch-zero misread. ReadRawAsync builds ReadRawModifiedDetails with IsReadModified=false (raw, not modified-history), StartTime/EndTime, NumValuesPerNode=maxValuesPerNode, ReturnBounds=false (clients that want bounds request them via continuation handling). ReadProcessedAsync builds ReadProcessedDetails with ProcessingInterval in ms + AggregateType wrapping a single NodeId from MapAggregateToNodeId. MapAggregateToNodeId switches on HistoryAggregateType {Average, Minimum, Maximum, Total, Count} to the standard Part 13 ObjectIds.AggregateFunction_* NodeId -- future aggregate-type additions fail the switch with ArgumentOutOfRangeException so they can't silently slip through with a null NodeId and an opaque server-side BadAggregateNotSupported. ReadAtTimeAsync builds ReadAtTimeDetails with ReqTimes + UseSimpleBounds=true (returns boundary samples when an exact timestamp has no value -- the OPC UA Part 11 default). Malformed NodeId short-circuits to empty result without touching the wire, matching the ReadAsync / WriteAsync pattern. ReadEventsAsync stays at the interface-default NotSupportedException: the OPC UA call path (HistoryReadAsync with ReadEventDetails + EventFilter) needs an EventFilter SelectClauses spec which the current IHistoryProvider.ReadEventsAsync signature doesn't carry. Adding that would be an IHistoryProvider interface widening; out of scope for PR 76. Callers see BadHistoryOperationUnsupported on the OPC UA client which is the documented fallback. Name disambiguation: Core.Abstractions.HistoryReadResult and Opc.Ua.HistoryReadResult both exist; used fully-qualified Core.Abstractions.HistoryReadResult in return types + factory expressions. Shutdown unchanged -- history reads don't create persistent server-side resources, so no cleanup needed beyond the existing Session.CloseAsync. Unit tests (OpcUaClientHistoryTests, 7 facts): MapAggregateToNodeId theory covers all 5 aggregates; MapAggregateToNodeId_rejects_invalid_enum (defense against future enum addition silently passing through); Read{Raw,Processed,AtTime}Async_without_initialize_throws (RequireSession path); ReadEventsAsync_throws_NotSupportedException (locks in the intentional inheritance of the default). 78/78 OpcUaClient.Tests pass (67 prior + 11 new, -4 on the alarm suite moved into the events count). dotnet build clean. Final OPC UA Client capability surface: IDriver + ITagDiscovery + IReadable + IWritable + ISubscribable + IHostConnectivityProbe + IAlarmSource + IHistoryProvider -- 8 of 8 possible capabilities. Driver is feature-complete per driver-specs.md \u00A78.
This commit is contained in:
@@ -27,7 +27,7 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient;
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string driverInstanceId)
|
||||
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IHostConnectivityProbe, IAlarmSource, IDisposable, IAsyncDisposable
|
||||
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IHostConnectivityProbe, IAlarmSource, IHistoryProvider, IDisposable, IAsyncDisposable
|
||||
{
|
||||
// ---- IAlarmSource state ----
|
||||
|
||||
@@ -1149,6 +1149,126 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
|
||||
public string DiagnosticId => $"opcua-alarm-sub-{Id}";
|
||||
}
|
||||
|
||||
// ---- IHistoryProvider (passthrough to upstream server) ----
|
||||
|
||||
public async Task<Core.Abstractions.HistoryReadResult> ReadRawAsync(
|
||||
string fullReference, DateTime startUtc, DateTime endUtc, uint maxValuesPerNode,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var details = new ReadRawModifiedDetails
|
||||
{
|
||||
IsReadModified = false,
|
||||
StartTime = startUtc,
|
||||
EndTime = endUtc,
|
||||
NumValuesPerNode = maxValuesPerNode,
|
||||
ReturnBounds = false,
|
||||
};
|
||||
return await ExecuteHistoryReadAsync(fullReference, new ExtensionObject(details), cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public async Task<Core.Abstractions.HistoryReadResult> ReadProcessedAsync(
|
||||
string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval,
|
||||
HistoryAggregateType aggregate, CancellationToken cancellationToken)
|
||||
{
|
||||
var aggregateId = MapAggregateToNodeId(aggregate);
|
||||
var details = new ReadProcessedDetails
|
||||
{
|
||||
StartTime = startUtc,
|
||||
EndTime = endUtc,
|
||||
ProcessingInterval = interval.TotalMilliseconds,
|
||||
AggregateType = [aggregateId],
|
||||
};
|
||||
return await ExecuteHistoryReadAsync(fullReference, new ExtensionObject(details), cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public async Task<Core.Abstractions.HistoryReadResult> ReadAtTimeAsync(
|
||||
string fullReference, IReadOnlyList<DateTime> timestampsUtc, CancellationToken cancellationToken)
|
||||
{
|
||||
var reqTimes = new DateTimeCollection(timestampsUtc);
|
||||
var details = new ReadAtTimeDetails
|
||||
{
|
||||
ReqTimes = reqTimes,
|
||||
UseSimpleBounds = true,
|
||||
};
|
||||
return await ExecuteHistoryReadAsync(fullReference, new ExtensionObject(details), cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Shared HistoryRead wire path — used by Raw/Processed/AtTime. Handles NodeId parse,
|
||||
/// Session.HistoryReadAsync call, Bad-StatusCode passthrough (no translation per §8
|
||||
/// cascading-quality rule), and HistoryData unwrap into <see cref="DataValueSnapshot"/>.
|
||||
/// </summary>
|
||||
private async Task<Core.Abstractions.HistoryReadResult> ExecuteHistoryReadAsync(
|
||||
string fullReference, ExtensionObject historyReadDetails, CancellationToken ct)
|
||||
{
|
||||
var session = RequireSession();
|
||||
if (!TryParseNodeId(session, fullReference, out var nodeId))
|
||||
{
|
||||
return new Core.Abstractions.HistoryReadResult([], null);
|
||||
}
|
||||
|
||||
var nodesToRead = new HistoryReadValueIdCollection
|
||||
{
|
||||
new HistoryReadValueId { NodeId = nodeId },
|
||||
};
|
||||
|
||||
await _gate.WaitAsync(ct).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
var resp = await session.HistoryReadAsync(
|
||||
requestHeader: null,
|
||||
historyReadDetails: historyReadDetails,
|
||||
timestampsToReturn: TimestampsToReturn.Both,
|
||||
releaseContinuationPoints: false,
|
||||
nodesToRead: nodesToRead,
|
||||
ct: ct).ConfigureAwait(false);
|
||||
|
||||
if (resp.Results.Count == 0) return new Core.Abstractions.HistoryReadResult([], null);
|
||||
var r = resp.Results[0];
|
||||
|
||||
// Unwrap HistoryData from the ExtensionObject-encoded payload the SDK returns.
|
||||
// Samples stay in chronological order per OPC UA Part 11; cascading-quality
|
||||
// rule: preserve each DataValue's upstream StatusCode + timestamps verbatim.
|
||||
var samples = new List<DataValueSnapshot>();
|
||||
if (r.HistoryData?.Body is HistoryData hd)
|
||||
{
|
||||
var now = DateTime.UtcNow;
|
||||
foreach (var dv in hd.DataValues)
|
||||
{
|
||||
samples.Add(new DataValueSnapshot(
|
||||
Value: dv.Value,
|
||||
StatusCode: dv.StatusCode.Code,
|
||||
SourceTimestampUtc: dv.SourceTimestamp == DateTime.MinValue ? null : dv.SourceTimestamp,
|
||||
ServerTimestampUtc: dv.ServerTimestamp == DateTime.MinValue ? now : dv.ServerTimestamp));
|
||||
}
|
||||
}
|
||||
|
||||
var contPt = r.ContinuationPoint is { Length: > 0 } ? r.ContinuationPoint : null;
|
||||
return new Core.Abstractions.HistoryReadResult(samples, contPt);
|
||||
}
|
||||
finally { _gate.Release(); }
|
||||
}
|
||||
|
||||
/// <summary>Map <see cref="HistoryAggregateType"/> to the OPC UA Part 13 standard aggregate NodeId.</summary>
|
||||
internal static NodeId MapAggregateToNodeId(HistoryAggregateType aggregate) => aggregate switch
|
||||
{
|
||||
HistoryAggregateType.Average => ObjectIds.AggregateFunction_Average,
|
||||
HistoryAggregateType.Minimum => ObjectIds.AggregateFunction_Minimum,
|
||||
HistoryAggregateType.Maximum => ObjectIds.AggregateFunction_Maximum,
|
||||
HistoryAggregateType.Total => ObjectIds.AggregateFunction_Total,
|
||||
HistoryAggregateType.Count => ObjectIds.AggregateFunction_Count,
|
||||
_ => throw new ArgumentOutOfRangeException(nameof(aggregate), aggregate, null),
|
||||
};
|
||||
|
||||
// ReadEventsAsync stays at the interface default (throws NotSupportedException) per
|
||||
// IHistoryProvider contract -- the OPC UA Client driver CAN forward HistoryReadEvents,
|
||||
// but the call-site needs an EventFilter SelectClauses surface which the interface
|
||||
// doesn't carry. Landing the event-history passthrough requires extending
|
||||
// IHistoryProvider.ReadEventsAsync with a filter-spec parameter; out of scope for this PR.
|
||||
|
||||
// ---- IHostConnectivityProbe ----
|
||||
|
||||
/// <summary>
|
||||
|
||||
Reference in New Issue
Block a user