Phase 3 PR 76 -- OPC UA Client IHistoryProvider #75
@@ -27,7 +27,7 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient;
|
|||||||
/// </para>
|
/// </para>
|
||||||
/// </remarks>
|
/// </remarks>
|
||||||
public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string driverInstanceId)
|
public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string driverInstanceId)
|
||||||
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IHostConnectivityProbe, IAlarmSource, IDisposable, IAsyncDisposable
|
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IHostConnectivityProbe, IAlarmSource, IHistoryProvider, IDisposable, IAsyncDisposable
|
||||||
{
|
{
|
||||||
// ---- IAlarmSource state ----
|
// ---- IAlarmSource state ----
|
||||||
|
|
||||||
@@ -1149,6 +1149,126 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
|
|||||||
public string DiagnosticId => $"opcua-alarm-sub-{Id}";
|
public string DiagnosticId => $"opcua-alarm-sub-{Id}";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---- IHistoryProvider (passthrough to upstream server) ----
|
||||||
|
|
||||||
|
public async Task<Core.Abstractions.HistoryReadResult> ReadRawAsync(
|
||||||
|
string fullReference, DateTime startUtc, DateTime endUtc, uint maxValuesPerNode,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
var details = new ReadRawModifiedDetails
|
||||||
|
{
|
||||||
|
IsReadModified = false,
|
||||||
|
StartTime = startUtc,
|
||||||
|
EndTime = endUtc,
|
||||||
|
NumValuesPerNode = maxValuesPerNode,
|
||||||
|
ReturnBounds = false,
|
||||||
|
};
|
||||||
|
return await ExecuteHistoryReadAsync(fullReference, new ExtensionObject(details), cancellationToken)
|
||||||
|
.ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<Core.Abstractions.HistoryReadResult> ReadProcessedAsync(
|
||||||
|
string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval,
|
||||||
|
HistoryAggregateType aggregate, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
var aggregateId = MapAggregateToNodeId(aggregate);
|
||||||
|
var details = new ReadProcessedDetails
|
||||||
|
{
|
||||||
|
StartTime = startUtc,
|
||||||
|
EndTime = endUtc,
|
||||||
|
ProcessingInterval = interval.TotalMilliseconds,
|
||||||
|
AggregateType = [aggregateId],
|
||||||
|
};
|
||||||
|
return await ExecuteHistoryReadAsync(fullReference, new ExtensionObject(details), cancellationToken)
|
||||||
|
.ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<Core.Abstractions.HistoryReadResult> ReadAtTimeAsync(
|
||||||
|
string fullReference, IReadOnlyList<DateTime> timestampsUtc, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
var reqTimes = new DateTimeCollection(timestampsUtc);
|
||||||
|
var details = new ReadAtTimeDetails
|
||||||
|
{
|
||||||
|
ReqTimes = reqTimes,
|
||||||
|
UseSimpleBounds = true,
|
||||||
|
};
|
||||||
|
return await ExecuteHistoryReadAsync(fullReference, new ExtensionObject(details), cancellationToken)
|
||||||
|
.ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Shared HistoryRead wire path — used by Raw/Processed/AtTime. Handles NodeId parse,
|
||||||
|
/// Session.HistoryReadAsync call, Bad-StatusCode passthrough (no translation per §8
|
||||||
|
/// cascading-quality rule), and HistoryData unwrap into <see cref="DataValueSnapshot"/>.
|
||||||
|
/// </summary>
|
||||||
|
private async Task<Core.Abstractions.HistoryReadResult> ExecuteHistoryReadAsync(
|
||||||
|
string fullReference, ExtensionObject historyReadDetails, CancellationToken ct)
|
||||||
|
{
|
||||||
|
var session = RequireSession();
|
||||||
|
if (!TryParseNodeId(session, fullReference, out var nodeId))
|
||||||
|
{
|
||||||
|
return new Core.Abstractions.HistoryReadResult([], null);
|
||||||
|
}
|
||||||
|
|
||||||
|
var nodesToRead = new HistoryReadValueIdCollection
|
||||||
|
{
|
||||||
|
new HistoryReadValueId { NodeId = nodeId },
|
||||||
|
};
|
||||||
|
|
||||||
|
await _gate.WaitAsync(ct).ConfigureAwait(false);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var resp = await session.HistoryReadAsync(
|
||||||
|
requestHeader: null,
|
||||||
|
historyReadDetails: historyReadDetails,
|
||||||
|
timestampsToReturn: TimestampsToReturn.Both,
|
||||||
|
releaseContinuationPoints: false,
|
||||||
|
nodesToRead: nodesToRead,
|
||||||
|
ct: ct).ConfigureAwait(false);
|
||||||
|
|
||||||
|
if (resp.Results.Count == 0) return new Core.Abstractions.HistoryReadResult([], null);
|
||||||
|
var r = resp.Results[0];
|
||||||
|
|
||||||
|
// Unwrap HistoryData from the ExtensionObject-encoded payload the SDK returns.
|
||||||
|
// Samples stay in chronological order per OPC UA Part 11; cascading-quality
|
||||||
|
// rule: preserve each DataValue's upstream StatusCode + timestamps verbatim.
|
||||||
|
var samples = new List<DataValueSnapshot>();
|
||||||
|
if (r.HistoryData?.Body is HistoryData hd)
|
||||||
|
{
|
||||||
|
var now = DateTime.UtcNow;
|
||||||
|
foreach (var dv in hd.DataValues)
|
||||||
|
{
|
||||||
|
samples.Add(new DataValueSnapshot(
|
||||||
|
Value: dv.Value,
|
||||||
|
StatusCode: dv.StatusCode.Code,
|
||||||
|
SourceTimestampUtc: dv.SourceTimestamp == DateTime.MinValue ? null : dv.SourceTimestamp,
|
||||||
|
ServerTimestampUtc: dv.ServerTimestamp == DateTime.MinValue ? now : dv.ServerTimestamp));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var contPt = r.ContinuationPoint is { Length: > 0 } ? r.ContinuationPoint : null;
|
||||||
|
return new Core.Abstractions.HistoryReadResult(samples, contPt);
|
||||||
|
}
|
||||||
|
finally { _gate.Release(); }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Map <see cref="HistoryAggregateType"/> to the OPC UA Part 13 standard aggregate NodeId.</summary>
|
||||||
|
internal static NodeId MapAggregateToNodeId(HistoryAggregateType aggregate) => aggregate switch
|
||||||
|
{
|
||||||
|
HistoryAggregateType.Average => ObjectIds.AggregateFunction_Average,
|
||||||
|
HistoryAggregateType.Minimum => ObjectIds.AggregateFunction_Minimum,
|
||||||
|
HistoryAggregateType.Maximum => ObjectIds.AggregateFunction_Maximum,
|
||||||
|
HistoryAggregateType.Total => ObjectIds.AggregateFunction_Total,
|
||||||
|
HistoryAggregateType.Count => ObjectIds.AggregateFunction_Count,
|
||||||
|
_ => throw new ArgumentOutOfRangeException(nameof(aggregate), aggregate, null),
|
||||||
|
};
|
||||||
|
|
||||||
|
// ReadEventsAsync stays at the interface default (throws NotSupportedException) per
|
||||||
|
// IHistoryProvider contract -- the OPC UA Client driver CAN forward HistoryReadEvents,
|
||||||
|
// but the call-site needs an EventFilter SelectClauses surface which the interface
|
||||||
|
// doesn't carry. Landing the event-history passthrough requires extending
|
||||||
|
// IHistoryProvider.ReadEventsAsync with a filter-spec parameter; out of scope for this PR.
|
||||||
|
|
||||||
// ---- IHostConnectivityProbe ----
|
// ---- IHostConnectivityProbe ----
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
|
|||||||
@@ -0,0 +1,91 @@
|
|||||||
|
using Opc.Ua;
|
||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests;
|
||||||
|
|
||||||
|
[Trait("Category", "Unit")]
|
||||||
|
public sealed class OpcUaClientHistoryTests
|
||||||
|
{
|
||||||
|
[Theory]
|
||||||
|
[InlineData(HistoryAggregateType.Average)]
|
||||||
|
[InlineData(HistoryAggregateType.Minimum)]
|
||||||
|
[InlineData(HistoryAggregateType.Maximum)]
|
||||||
|
[InlineData(HistoryAggregateType.Total)]
|
||||||
|
[InlineData(HistoryAggregateType.Count)]
|
||||||
|
public void MapAggregateToNodeId_returns_standard_Part13_aggregate_for_every_enum(HistoryAggregateType agg)
|
||||||
|
{
|
||||||
|
var nodeId = OpcUaClientDriver.MapAggregateToNodeId(agg);
|
||||||
|
NodeId.IsNull(nodeId).ShouldBeFalse();
|
||||||
|
// Every mapping should resolve to an AggregateFunction_* NodeId (namespace 0, numeric id).
|
||||||
|
nodeId.NamespaceIndex.ShouldBe((ushort)0);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void MapAggregateToNodeId_rejects_invalid_enum_value()
|
||||||
|
{
|
||||||
|
// Defense-in-depth: a future HistoryAggregateType addition mustn't silently fall through.
|
||||||
|
Should.Throw<ArgumentOutOfRangeException>(() =>
|
||||||
|
OpcUaClientDriver.MapAggregateToNodeId((HistoryAggregateType)99));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadRawAsync_without_initialize_throws_InvalidOperationException()
|
||||||
|
{
|
||||||
|
using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-hist-uninit");
|
||||||
|
await Should.ThrowAsync<InvalidOperationException>(async () =>
|
||||||
|
await drv.ReadRawAsync("ns=2;s=Counter",
|
||||||
|
DateTime.UtcNow.AddMinutes(-5), DateTime.UtcNow, 1000,
|
||||||
|
TestContext.Current.CancellationToken));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadRawAsync_with_malformed_NodeId_returns_empty_result_not_throw()
|
||||||
|
{
|
||||||
|
// Same defensive pattern as ReadAsync / WriteAsync — malformed NodeId short-circuits
|
||||||
|
// to an empty result rather than crashing a batch history call. Needs init via the
|
||||||
|
// throw path first, then we pass "" to trigger the parse-fail branch inside
|
||||||
|
// ExecuteHistoryReadAsync. The init itself fails against 127.0.0.1:1 so we stop there.
|
||||||
|
// Not runnable without init — keep as placeholder for when the in-process fixture
|
||||||
|
// PR lands.
|
||||||
|
await Task.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadProcessedAsync_without_initialize_throws_InvalidOperationException()
|
||||||
|
{
|
||||||
|
using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-hist-uninit");
|
||||||
|
await Should.ThrowAsync<InvalidOperationException>(async () =>
|
||||||
|
await drv.ReadProcessedAsync("ns=2;s=Counter",
|
||||||
|
DateTime.UtcNow.AddMinutes(-5), DateTime.UtcNow,
|
||||||
|
TimeSpan.FromSeconds(10), HistoryAggregateType.Average,
|
||||||
|
TestContext.Current.CancellationToken));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadAtTimeAsync_without_initialize_throws_InvalidOperationException()
|
||||||
|
{
|
||||||
|
using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-hist-uninit");
|
||||||
|
await Should.ThrowAsync<InvalidOperationException>(async () =>
|
||||||
|
await drv.ReadAtTimeAsync("ns=2;s=Counter",
|
||||||
|
[DateTime.UtcNow.AddMinutes(-5), DateTime.UtcNow],
|
||||||
|
TestContext.Current.CancellationToken));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadEventsAsync_throws_NotSupportedException_as_documented()
|
||||||
|
{
|
||||||
|
// The IHistoryProvider default implementation throws; the OPC UA Client driver
|
||||||
|
// deliberately inherits that default (see PR 76 commit body) because the OPC UA
|
||||||
|
// client call path needs an EventFilter SelectClauses spec the interface doesn't carry.
|
||||||
|
using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-events-default");
|
||||||
|
await Should.ThrowAsync<NotSupportedException>(async () =>
|
||||||
|
await ((IHistoryProvider)drv).ReadEventsAsync(
|
||||||
|
sourceName: null,
|
||||||
|
startUtc: DateTime.UtcNow.AddMinutes(-5),
|
||||||
|
endUtc: DateTime.UtcNow,
|
||||||
|
maxEvents: 100,
|
||||||
|
cancellationToken: TestContext.Current.CancellationToken));
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user