Phase 3 PR 76 -- OPC UA Client IHistoryProvider #75
@@ -27,7 +27,7 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient;
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string driverInstanceId)
|
||||
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IHostConnectivityProbe, IAlarmSource, IDisposable, IAsyncDisposable
|
||||
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IHostConnectivityProbe, IAlarmSource, IHistoryProvider, IDisposable, IAsyncDisposable
|
||||
{
|
||||
// ---- IAlarmSource state ----
|
||||
|
||||
@@ -1149,6 +1149,126 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
|
||||
public string DiagnosticId => $"opcua-alarm-sub-{Id}";
|
||||
}
|
||||
|
||||
// ---- IHistoryProvider (passthrough to upstream server) ----
|
||||
|
||||
public async Task<Core.Abstractions.HistoryReadResult> ReadRawAsync(
|
||||
string fullReference, DateTime startUtc, DateTime endUtc, uint maxValuesPerNode,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var details = new ReadRawModifiedDetails
|
||||
{
|
||||
IsReadModified = false,
|
||||
StartTime = startUtc,
|
||||
EndTime = endUtc,
|
||||
NumValuesPerNode = maxValuesPerNode,
|
||||
ReturnBounds = false,
|
||||
};
|
||||
return await ExecuteHistoryReadAsync(fullReference, new ExtensionObject(details), cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public async Task<Core.Abstractions.HistoryReadResult> ReadProcessedAsync(
|
||||
string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval,
|
||||
HistoryAggregateType aggregate, CancellationToken cancellationToken)
|
||||
{
|
||||
var aggregateId = MapAggregateToNodeId(aggregate);
|
||||
var details = new ReadProcessedDetails
|
||||
{
|
||||
StartTime = startUtc,
|
||||
EndTime = endUtc,
|
||||
ProcessingInterval = interval.TotalMilliseconds,
|
||||
AggregateType = [aggregateId],
|
||||
};
|
||||
return await ExecuteHistoryReadAsync(fullReference, new ExtensionObject(details), cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
public async Task<Core.Abstractions.HistoryReadResult> ReadAtTimeAsync(
|
||||
string fullReference, IReadOnlyList<DateTime> timestampsUtc, CancellationToken cancellationToken)
|
||||
{
|
||||
var reqTimes = new DateTimeCollection(timestampsUtc);
|
||||
var details = new ReadAtTimeDetails
|
||||
{
|
||||
ReqTimes = reqTimes,
|
||||
UseSimpleBounds = true,
|
||||
};
|
||||
return await ExecuteHistoryReadAsync(fullReference, new ExtensionObject(details), cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Shared HistoryRead wire path — used by Raw/Processed/AtTime. Handles NodeId parse,
|
||||
/// Session.HistoryReadAsync call, Bad-StatusCode passthrough (no translation per §8
|
||||
/// cascading-quality rule), and HistoryData unwrap into <see cref="DataValueSnapshot"/>.
|
||||
/// </summary>
|
||||
private async Task<Core.Abstractions.HistoryReadResult> ExecuteHistoryReadAsync(
|
||||
string fullReference, ExtensionObject historyReadDetails, CancellationToken ct)
|
||||
{
|
||||
var session = RequireSession();
|
||||
if (!TryParseNodeId(session, fullReference, out var nodeId))
|
||||
{
|
||||
return new Core.Abstractions.HistoryReadResult([], null);
|
||||
}
|
||||
|
||||
var nodesToRead = new HistoryReadValueIdCollection
|
||||
{
|
||||
new HistoryReadValueId { NodeId = nodeId },
|
||||
};
|
||||
|
||||
await _gate.WaitAsync(ct).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
var resp = await session.HistoryReadAsync(
|
||||
requestHeader: null,
|
||||
historyReadDetails: historyReadDetails,
|
||||
timestampsToReturn: TimestampsToReturn.Both,
|
||||
releaseContinuationPoints: false,
|
||||
nodesToRead: nodesToRead,
|
||||
ct: ct).ConfigureAwait(false);
|
||||
|
||||
if (resp.Results.Count == 0) return new Core.Abstractions.HistoryReadResult([], null);
|
||||
var r = resp.Results[0];
|
||||
|
||||
// Unwrap HistoryData from the ExtensionObject-encoded payload the SDK returns.
|
||||
// Samples stay in chronological order per OPC UA Part 11; cascading-quality
|
||||
// rule: preserve each DataValue's upstream StatusCode + timestamps verbatim.
|
||||
var samples = new List<DataValueSnapshot>();
|
||||
if (r.HistoryData?.Body is HistoryData hd)
|
||||
{
|
||||
var now = DateTime.UtcNow;
|
||||
foreach (var dv in hd.DataValues)
|
||||
{
|
||||
samples.Add(new DataValueSnapshot(
|
||||
Value: dv.Value,
|
||||
StatusCode: dv.StatusCode.Code,
|
||||
SourceTimestampUtc: dv.SourceTimestamp == DateTime.MinValue ? null : dv.SourceTimestamp,
|
||||
ServerTimestampUtc: dv.ServerTimestamp == DateTime.MinValue ? now : dv.ServerTimestamp));
|
||||
}
|
||||
}
|
||||
|
||||
var contPt = r.ContinuationPoint is { Length: > 0 } ? r.ContinuationPoint : null;
|
||||
return new Core.Abstractions.HistoryReadResult(samples, contPt);
|
||||
}
|
||||
finally { _gate.Release(); }
|
||||
}
|
||||
|
||||
/// <summary>Map <see cref="HistoryAggregateType"/> to the OPC UA Part 13 standard aggregate NodeId.</summary>
|
||||
internal static NodeId MapAggregateToNodeId(HistoryAggregateType aggregate) => aggregate switch
|
||||
{
|
||||
HistoryAggregateType.Average => ObjectIds.AggregateFunction_Average,
|
||||
HistoryAggregateType.Minimum => ObjectIds.AggregateFunction_Minimum,
|
||||
HistoryAggregateType.Maximum => ObjectIds.AggregateFunction_Maximum,
|
||||
HistoryAggregateType.Total => ObjectIds.AggregateFunction_Total,
|
||||
HistoryAggregateType.Count => ObjectIds.AggregateFunction_Count,
|
||||
_ => throw new ArgumentOutOfRangeException(nameof(aggregate), aggregate, null),
|
||||
};
|
||||
|
||||
// ReadEventsAsync stays at the interface default (throws NotSupportedException) per
|
||||
// IHistoryProvider contract -- the OPC UA Client driver CAN forward HistoryReadEvents,
|
||||
// but the call-site needs an EventFilter SelectClauses surface which the interface
|
||||
// doesn't carry. Landing the event-history passthrough requires extending
|
||||
// IHistoryProvider.ReadEventsAsync with a filter-spec parameter; out of scope for this PR.
|
||||
|
||||
// ---- IHostConnectivityProbe ----
|
||||
|
||||
/// <summary>
|
||||
|
||||
@@ -0,0 +1,91 @@
|
||||
using Opc.Ua;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests;
|
||||
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class OpcUaClientHistoryTests
|
||||
{
|
||||
[Theory]
|
||||
[InlineData(HistoryAggregateType.Average)]
|
||||
[InlineData(HistoryAggregateType.Minimum)]
|
||||
[InlineData(HistoryAggregateType.Maximum)]
|
||||
[InlineData(HistoryAggregateType.Total)]
|
||||
[InlineData(HistoryAggregateType.Count)]
|
||||
public void MapAggregateToNodeId_returns_standard_Part13_aggregate_for_every_enum(HistoryAggregateType agg)
|
||||
{
|
||||
var nodeId = OpcUaClientDriver.MapAggregateToNodeId(agg);
|
||||
NodeId.IsNull(nodeId).ShouldBeFalse();
|
||||
// Every mapping should resolve to an AggregateFunction_* NodeId (namespace 0, numeric id).
|
||||
nodeId.NamespaceIndex.ShouldBe((ushort)0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void MapAggregateToNodeId_rejects_invalid_enum_value()
|
||||
{
|
||||
// Defense-in-depth: a future HistoryAggregateType addition mustn't silently fall through.
|
||||
Should.Throw<ArgumentOutOfRangeException>(() =>
|
||||
OpcUaClientDriver.MapAggregateToNodeId((HistoryAggregateType)99));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ReadRawAsync_without_initialize_throws_InvalidOperationException()
|
||||
{
|
||||
using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-hist-uninit");
|
||||
await Should.ThrowAsync<InvalidOperationException>(async () =>
|
||||
await drv.ReadRawAsync("ns=2;s=Counter",
|
||||
DateTime.UtcNow.AddMinutes(-5), DateTime.UtcNow, 1000,
|
||||
TestContext.Current.CancellationToken));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ReadRawAsync_with_malformed_NodeId_returns_empty_result_not_throw()
|
||||
{
|
||||
// Same defensive pattern as ReadAsync / WriteAsync — malformed NodeId short-circuits
|
||||
// to an empty result rather than crashing a batch history call. Needs init via the
|
||||
// throw path first, then we pass "" to trigger the parse-fail branch inside
|
||||
// ExecuteHistoryReadAsync. The init itself fails against 127.0.0.1:1 so we stop there.
|
||||
// Not runnable without init — keep as placeholder for when the in-process fixture
|
||||
// PR lands.
|
||||
await Task.CompletedTask;
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ReadProcessedAsync_without_initialize_throws_InvalidOperationException()
|
||||
{
|
||||
using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-hist-uninit");
|
||||
await Should.ThrowAsync<InvalidOperationException>(async () =>
|
||||
await drv.ReadProcessedAsync("ns=2;s=Counter",
|
||||
DateTime.UtcNow.AddMinutes(-5), DateTime.UtcNow,
|
||||
TimeSpan.FromSeconds(10), HistoryAggregateType.Average,
|
||||
TestContext.Current.CancellationToken));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ReadAtTimeAsync_without_initialize_throws_InvalidOperationException()
|
||||
{
|
||||
using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-hist-uninit");
|
||||
await Should.ThrowAsync<InvalidOperationException>(async () =>
|
||||
await drv.ReadAtTimeAsync("ns=2;s=Counter",
|
||||
[DateTime.UtcNow.AddMinutes(-5), DateTime.UtcNow],
|
||||
TestContext.Current.CancellationToken));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ReadEventsAsync_throws_NotSupportedException_as_documented()
|
||||
{
|
||||
// The IHistoryProvider default implementation throws; the OPC UA Client driver
|
||||
// deliberately inherits that default (see PR 76 commit body) because the OPC UA
|
||||
// client call path needs an EventFilter SelectClauses spec the interface doesn't carry.
|
||||
using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-events-default");
|
||||
await Should.ThrowAsync<NotSupportedException>(async () =>
|
||||
await ((IHistoryProvider)drv).ReadEventsAsync(
|
||||
sourceName: null,
|
||||
startUtc: DateTime.UtcNow.AddMinutes(-5),
|
||||
endUtc: DateTime.UtcNow,
|
||||
maxEvents: 100,
|
||||
cancellationToken: TestContext.Current.CancellationToken));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user