Phase 3 PR 38 — DriverNodeManager HistoryRead override (LMX #1 finish) #37

Merged
dohertj2 merged 1 commits from phase-3-pr38-historyread-servicehandler into v2 2026-04-18 17:53:26 -04:00
4 changed files with 952 additions and 17 deletions

View File

@@ -7,24 +7,50 @@ Basic256Sha256 endpoints and alarms are observable through
specific before the stack can fully replace the v1 deployment, in
rough priority order.
## 1. Proxy-side `IHistoryProvider` for `ReadAtTime` / `ReadEvents`
**Status**: Capability surface complete (PR 35). OPC UA HistoryRead service-handler
wiring in `DriverNodeManager` remains as the next step; integration-test still
pending.
## 1. Proxy-side `IHistoryProvider` for `ReadAtTime` / `ReadEvents` — **DONE (PRs 35 + 38)**
PR 35 extended `IHistoryProvider` with `ReadAtTimeAsync` + `ReadEventsAsync`
(default throwing implementations so existing impls keep compiling), added the
`HistoricalEvent` + `HistoricalEventsResult` records to
`Core.Abstractions`, and implemented both methods in `GalaxyProxyDriver` on top
of the PR 10 / PR 11 IPC messages. Wire-to-domain mapping (`ToHistoricalEvent`)
is unit-tested for field fidelity, null-preservation, and `DateTimeKind.Utc`.
`HistoricalEvent` + `HistoricalEventsResult` records to `Core.Abstractions`,
and implemented both methods in `GalaxyProxyDriver` on top of the PR 10 / PR 11
IPC messages.
**Remaining**:
- `DriverNodeManager` wires the new capability methods onto `HistoryRead`
`AtTime` + `Events` service handlers.
- Integration test: OPC UA client calls `HistoryReadAtTime` / `HistoryReadEvents`,
value flows through IPC to the Host's `HistorianDataSource`, back to the client.
PR 38 wired the OPC UA HistoryRead service-handler through
`DriverNodeManager` by overriding `CustomNodeManager2`'s four per-kind hooks —
`HistoryReadRawModified` / `HistoryReadProcessed` / `HistoryReadAtTime` /
`HistoryReadEvents`. Each walks `nodesToProcess`, resolves the driver-side
full reference from `NodeId.Identifier`, dispatches to the right
`IHistoryProvider` method, and populates the paired results + errors lists
(both must be set — the MasterNodeManager merges them and a Good result with
an unset error slot serializes as `BadHistoryOperationUnsupported` on the
wire). Historized variables gain `AccessLevels.HistoryRead` so the stack
dispatches; the driver root folder gains `EventNotifiers.HistoryRead` so
`HistoryReadEvents` can target it.
Aggregate translation uses a small `MapAggregate` helper that handles
`Average` / `Minimum` / `Maximum` / `Total` / `Count` (the enum surface the
driver exposes) and returns null for unsupported aggregates so the handler
can surface `BadAggregateNotSupported`. Raw+Processed+AtTime wrap driver
samples as `HistoryData` in an `ExtensionObject`; Events emits a
`HistoryEvent` with the standard BaseEventType field list (EventId /
SourceName / Message / Severity / Time / ReceiveTime) — custom
`SelectClause` evaluation is an explicit follow-up.
**Tests**:
- `DriverNodeManagerHistoryMappingTests` — 12 unit cases pinning
`MapAggregate`, `BuildHistoryData`, `BuildHistoryEvent`, `ToDataValue`.
- `HistoryReadIntegrationTests` — 5 end-to-end cases drive a real OPC UA
client (`Session.HistoryRead`) against a fake `IHistoryProvider` driver
through the running stack. Covers raw round-trip, processed with Average
aggregate, unsupported aggregate → `BadAggregateNotSupported`, at-time
timestamp forwarding, and events field-list shape.
**Deferred**:
- Continuation-point plumbing via `Session.Save/RestoreHistoryContinuationPoint`.
Driver returns null continuations today so the pass-through is fine.
- Per-`SelectClause` evaluation in HistoryReadEvents — clients that send a
custom field selection currently get the standard BaseEventType layout.
## 2. Write-gating by role — **DONE (PR 26)**

View File

@@ -5,6 +5,11 @@ using Opc.Ua.Server;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Server.Security;
using DriverWriteRequest = ZB.MOM.WW.OtOpcUa.Core.Abstractions.WriteRequest;
// Core.Abstractions defines a type-named HistoryReadResult (driver-side samples + continuation
// point) that collides with Opc.Ua.HistoryReadResult (service-layer per-node result). We
// assign driver-side results to an explicitly-aliased local and construct only the service
// type in the overrides below.
using OpcHistoryReadResult = Opc.Ua.HistoryReadResult;
namespace ZB.MOM.WW.OtOpcUa.Server.OpcUa;
@@ -71,7 +76,13 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
NodeId = new NodeId(_driver.DriverInstanceId, NamespaceIndex),
BrowseName = new QualifiedName(_driver.DriverInstanceId, NamespaceIndex),
DisplayName = new LocalizedText(_driver.DriverInstanceId),
EventNotifier = EventNotifiers.None,
// Driver root is the conventional event notifier for HistoryReadEvents — clients
// request alarm history by targeting it and the node manager routes through
// IHistoryProvider.ReadEventsAsync. SubscribeToEvents is also set so live-event
// subscriptions (Alarm & Conditions) can point here in a future PR; today the
// alarm events are emitted by per-variable AlarmConditionState siblings but a
// "subscribe to all events from this driver" path would use this notifier.
EventNotifier = (byte)(EventNotifiers.SubscribeToEvents | EventNotifiers.HistoryRead),
};
// Link under Objects folder so clients see the driver subtree at browse root.
@@ -122,8 +133,15 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
DisplayName = new LocalizedText(displayName),
DataType = MapDataType(attributeInfo.DriverDataType),
ValueRank = attributeInfo.IsArray ? ValueRanks.OneDimension : ValueRanks.Scalar,
AccessLevel = AccessLevels.CurrentReadOrWrite,
UserAccessLevel = AccessLevels.CurrentReadOrWrite,
// Historized attributes get the HistoryRead access bit so the stack dispatches
// incoming HistoryRead service calls to this node. Without it the base class
// returns BadHistoryOperationUnsupported before our per-kind hook ever runs.
// HistoryWrite isn't granted — history rewrite is a separate capability the
// driver doesn't support today.
AccessLevel = (byte)(AccessLevels.CurrentReadOrWrite
| (attributeInfo.IsHistorized ? AccessLevels.HistoryRead : 0)),
UserAccessLevel = (byte)(AccessLevels.CurrentReadOrWrite
| (attributeInfo.IsHistorized ? AccessLevels.HistoryRead : 0)),
Historizing = attributeInfo.IsHistorized,
};
_currentFolder.AddChild(v);
@@ -384,4 +402,379 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
internal int VariableCount => _variablesByFullRef.Count;
internal bool TryGetVariable(string fullRef, out BaseDataVariableState? v)
=> _variablesByFullRef.TryGetValue(fullRef, out v!);
// ===================== HistoryRead service handlers (LMX #1, PR 38) =====================
//
// Wires the driver's IHistoryProvider capability (PR 35 added ReadAtTimeAsync / ReadEventsAsync
// alongside the PR 19 ReadRawAsync / ReadProcessedAsync) to the OPC UA HistoryRead service.
// CustomNodeManager2 has four protected per-kind hooks; the base dispatches to the right one
// based on the concrete HistoryReadDetails subtype. Each hook is sync-returning-void — the
// per-driver async calls are bridged via GetAwaiter().GetResult(), matching the pattern
// OnReadValue / OnWriteValue already use in this class so HistoryRead doesn't introduce a
// different sync-over-async convention.
//
// Per-node routing: every HistoryReadValueId in nodesToRead has a NodeHandle in
// nodesToProcess; the NodeHandle's NodeId.Identifier is the driver-side full reference
// (set during Variable() registration) so we can dispatch straight to IHistoryProvider
// without a second lookup. Nodes without IHistoryProvider backing (drivers that don't
// implement the capability) surface BadHistoryOperationUnsupported per slot and the
// rest of the batch continues — same failure-isolation pattern as OnWriteValue.
//
// Continuation-point handling is pass-through only in this PR: the driver returns null
// from its ContinuationPoint field today so the outer result's ContinuationPoint stays
// empty. Full Session.SaveHistoryContinuationPoint plumbing is a follow-up when a driver
// actually needs paging — the dispatch shape doesn't change, only the result-population.
private IHistoryProvider? History => _driver as IHistoryProvider;
protected override void HistoryReadRawModified(
ServerSystemContext context, ReadRawModifiedDetails details, TimestampsToReturn timestamps,
IList<HistoryReadValueId> nodesToRead, IList<OpcHistoryReadResult> results,
IList<ServiceResult> errors, List<NodeHandle> nodesToProcess,
IDictionary<NodeId, NodeState> cache)
{
if (History is null)
{
MarkAllUnsupported(nodesToProcess, results, errors);
return;
}
// IsReadModified=true requests a "modifications" history (who changed the data, when
// it was re-written). The driver side has no modifications store — surface that
// explicitly rather than silently returning raw data, which would mislead the client.
if (details.IsReadModified)
{
MarkAllUnsupported(nodesToProcess, results, errors, StatusCodes.BadHistoryOperationUnsupported);
return;
}
for (var n = 0; n < nodesToProcess.Count; n++)
{
var handle = nodesToProcess[n];
// NodeHandle.Index points back to the slot in the outer results/errors/nodesToRead
// arrays. nodesToProcess is the filtered subset (just the nodes this manager
// claimed), so writing to results[n] lands in the wrong slot when N > 1 and nodes
// are interleaved across multiple node managers.
var i = handle.Index;
var fullRef = ResolveFullRef(handle);
if (fullRef is null)
{
WriteNodeIdUnknown(results, errors, i);
continue;
}
try
{
var driverResult = History.ReadRawAsync(
fullRef,
details.StartTime,
details.EndTime,
details.NumValuesPerNode,
CancellationToken.None).GetAwaiter().GetResult();
WriteResult(results, errors, i, StatusCodes.Good,
BuildHistoryData(driverResult.Samples), driverResult.ContinuationPoint);
}
catch (NotSupportedException)
{
WriteUnsupported(results, errors, i);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "HistoryReadRaw failed for {FullRef}", fullRef);
WriteInternalError(results, errors, i);
}
}
}
protected override void HistoryReadProcessed(
ServerSystemContext context, ReadProcessedDetails details, TimestampsToReturn timestamps,
IList<HistoryReadValueId> nodesToRead, IList<OpcHistoryReadResult> results,
IList<ServiceResult> errors, List<NodeHandle> nodesToProcess,
IDictionary<NodeId, NodeState> cache)
{
if (History is null)
{
MarkAllUnsupported(nodesToProcess, results, errors);
return;
}
// AggregateType is one NodeId shared across every item in the batch — map once.
var aggregate = MapAggregate(details.AggregateType?.FirstOrDefault());
if (aggregate is null)
{
MarkAllUnsupported(nodesToProcess, results, errors, StatusCodes.BadAggregateNotSupported);
return;
}
var interval = TimeSpan.FromMilliseconds(details.ProcessingInterval);
for (var n = 0; n < nodesToProcess.Count; n++)
{
var handle = nodesToProcess[n];
// NodeHandle.Index points back to the slot in the outer results/errors/nodesToRead
// arrays. nodesToProcess is the filtered subset (just the nodes this manager
// claimed), so writing to results[n] lands in the wrong slot when N > 1 and nodes
// are interleaved across multiple node managers.
var i = handle.Index;
var fullRef = ResolveFullRef(handle);
if (fullRef is null)
{
WriteNodeIdUnknown(results, errors, i);
continue;
}
try
{
var driverResult = History.ReadProcessedAsync(
fullRef,
details.StartTime,
details.EndTime,
interval,
aggregate.Value,
CancellationToken.None).GetAwaiter().GetResult();
WriteResult(results, errors, i, StatusCodes.Good,
BuildHistoryData(driverResult.Samples), driverResult.ContinuationPoint);
}
catch (NotSupportedException)
{
WriteUnsupported(results, errors, i);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "HistoryReadProcessed failed for {FullRef}", fullRef);
WriteInternalError(results, errors, i);
}
}
}
protected override void HistoryReadAtTime(
ServerSystemContext context, ReadAtTimeDetails details, TimestampsToReturn timestamps,
IList<HistoryReadValueId> nodesToRead, IList<OpcHistoryReadResult> results,
IList<ServiceResult> errors, List<NodeHandle> nodesToProcess,
IDictionary<NodeId, NodeState> cache)
{
if (History is null)
{
MarkAllUnsupported(nodesToProcess, results, errors);
return;
}
var requestedTimes = (IReadOnlyList<DateTime>)(details.ReqTimes?.ToArray() ?? Array.Empty<DateTime>());
for (var n = 0; n < nodesToProcess.Count; n++)
{
var handle = nodesToProcess[n];
// NodeHandle.Index points back to the slot in the outer results/errors/nodesToRead
// arrays. nodesToProcess is the filtered subset (just the nodes this manager
// claimed), so writing to results[n] lands in the wrong slot when N > 1 and nodes
// are interleaved across multiple node managers.
var i = handle.Index;
var fullRef = ResolveFullRef(handle);
if (fullRef is null)
{
WriteNodeIdUnknown(results, errors, i);
continue;
}
try
{
var driverResult = History.ReadAtTimeAsync(
fullRef, requestedTimes, CancellationToken.None).GetAwaiter().GetResult();
WriteResult(results, errors, i, StatusCodes.Good,
BuildHistoryData(driverResult.Samples), driverResult.ContinuationPoint);
}
catch (NotSupportedException)
{
WriteUnsupported(results, errors, i);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "HistoryReadAtTime failed for {FullRef}", fullRef);
WriteInternalError(results, errors, i);
}
}
}
protected override void HistoryReadEvents(
ServerSystemContext context, ReadEventDetails details, TimestampsToReturn timestamps,
IList<HistoryReadValueId> nodesToRead, IList<OpcHistoryReadResult> results,
IList<ServiceResult> errors, List<NodeHandle> nodesToProcess,
IDictionary<NodeId, NodeState> cache)
{
if (History is null)
{
MarkAllUnsupported(nodesToProcess, results, errors);
return;
}
// SourceName filter extraction is deferred — EventFilter SelectClauses + WhereClause
// handling is a dedicated concern (proper per-select-clause Variant population + where
// filter evaluation). This PR treats the event query as "all events in range for the
// node's source" and populates only the standard BaseEventType fields. Richer filter
// handling is a follow-up; clients issuing empty/default filters get the right answer
// today which covers the common alarm-history browse case.
var maxEvents = (int)details.NumValuesPerNode;
if (maxEvents <= 0) maxEvents = 1000;
for (var n = 0; n < nodesToProcess.Count; n++)
{
var handle = nodesToProcess[n];
// NodeHandle.Index points back to the slot in the outer results/errors/nodesToRead
// arrays. nodesToProcess is the filtered subset (just the nodes this manager
// claimed), so writing to results[n] lands in the wrong slot when N > 1 and nodes
// are interleaved across multiple node managers.
var i = handle.Index;
// Event history queries may target a notifier object (e.g. the driver-root folder)
// rather than a specific variable — in that case we pass sourceName=null to mean
// "all sources in the driver's namespace" per the IHistoryProvider contract.
var fullRef = ResolveFullRef(handle);
try
{
var driverResult = History.ReadEventsAsync(
sourceName: fullRef,
startUtc: details.StartTime,
endUtc: details.EndTime,
maxEvents: maxEvents,
cancellationToken: CancellationToken.None).GetAwaiter().GetResult();
WriteResult(results, errors, i, StatusCodes.Good,
BuildHistoryEvent(driverResult.Events), driverResult.ContinuationPoint);
}
catch (NotSupportedException)
{
WriteUnsupported(results, errors, i);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "HistoryReadEvents failed for {FullRef}", fullRef);
WriteInternalError(results, errors, i);
}
}
}
private string? ResolveFullRef(NodeHandle handle) => handle.NodeId?.Identifier as string;
// Both the results list AND the parallel errors list must be populated — MasterNodeManager
// merges them and the merged StatusCode is what the client sees. Leaving errors[i] at its
// default (BadHistoryOperationUnsupported) overrides a Good result with Unsupported, which
// masks a correctly-constructed HistoryData response. This was the subtle failure mode
// that cost most of PR 38's debugging budget.
private static void WriteResult(IList<OpcHistoryReadResult> results, IList<ServiceResult> errors,
int i, uint statusCode, ExtensionObject historyData, byte[]? continuationPoint)
{
results[i] = new OpcHistoryReadResult
{
StatusCode = statusCode,
HistoryData = historyData,
ContinuationPoint = continuationPoint,
};
errors[i] = statusCode == StatusCodes.Good
? ServiceResult.Good
: new ServiceResult(statusCode);
}
private static void WriteUnsupported(IList<OpcHistoryReadResult> results, IList<ServiceResult> errors, int i)
{
results[i] = new OpcHistoryReadResult { StatusCode = StatusCodes.BadHistoryOperationUnsupported };
errors[i] = StatusCodes.BadHistoryOperationUnsupported;
}
private static void WriteInternalError(IList<OpcHistoryReadResult> results, IList<ServiceResult> errors, int i)
{
results[i] = new OpcHistoryReadResult { StatusCode = StatusCodes.BadInternalError };
errors[i] = StatusCodes.BadInternalError;
}
private static void WriteNodeIdUnknown(IList<OpcHistoryReadResult> results, IList<ServiceResult> errors, int i)
{
WriteNodeIdUnknown(results, errors, i);
errors[i] = StatusCodes.BadNodeIdUnknown;
}
private static void MarkAllUnsupported(
List<NodeHandle> nodes, IList<OpcHistoryReadResult> results, IList<ServiceResult> errors,
uint statusCode = StatusCodes.BadHistoryOperationUnsupported)
{
foreach (var handle in nodes)
{
results[handle.Index] = new OpcHistoryReadResult { StatusCode = statusCode };
errors[handle.Index] = statusCode == StatusCodes.Good ? ServiceResult.Good : new ServiceResult(statusCode);
}
}
/// <summary>
/// Map the OPC UA Part 13 aggregate-function NodeId to the driver's
/// <see cref="HistoryAggregateType"/>. Internal so the test suite can pin the mapping
/// without exposing public API. Returns null for unsupported aggregates so the service
/// handler can surface <c>BadAggregateNotSupported</c> on the whole batch.
/// </summary>
internal static HistoryAggregateType? MapAggregate(NodeId? aggregateNodeId)
{
if (aggregateNodeId is null) return null;
// Every AggregateFunction_* identifier is a numeric uint on the Server (0) namespace.
// Comparing NodeIds by value handles all the cross-encoding cases (expanded vs plain).
if (aggregateNodeId == ObjectIds.AggregateFunction_Average) return HistoryAggregateType.Average;
if (aggregateNodeId == ObjectIds.AggregateFunction_Minimum) return HistoryAggregateType.Minimum;
if (aggregateNodeId == ObjectIds.AggregateFunction_Maximum) return HistoryAggregateType.Maximum;
if (aggregateNodeId == ObjectIds.AggregateFunction_Total) return HistoryAggregateType.Total;
if (aggregateNodeId == ObjectIds.AggregateFunction_Count) return HistoryAggregateType.Count;
return null;
}
/// <summary>
/// Wrap driver samples as <c>HistoryData</c> in an <c>ExtensionObject</c> — the on-wire
/// shape the OPC UA HistoryRead service expects for raw / processed / at-time reads.
/// </summary>
internal static ExtensionObject BuildHistoryData(IReadOnlyList<DataValueSnapshot> samples)
{
var values = new DataValueCollection(samples.Count);
foreach (var s in samples) values.Add(ToDataValue(s));
return new ExtensionObject(new HistoryData { DataValues = values });
}
/// <summary>
/// Wrap driver events as <c>HistoryEvent</c> in an <c>ExtensionObject</c>. Populates
/// the minimum BaseEventType field set (SourceName, Message, Severity, Time,
/// ReceiveTime, EventId) so clients that request the default
/// <c>SimpleAttributeOperand</c> select-clauses see useful data. Custom EventFilter
/// SelectClause evaluation is deferred — when a client sends a specific operand list,
/// they currently get the standard fields back and ignore the extras. Documented on the
/// public follow-up list.
/// </summary>
internal static ExtensionObject BuildHistoryEvent(IReadOnlyList<HistoricalEvent> events)
{
var fieldLists = new HistoryEventFieldListCollection(events.Count);
foreach (var e in events)
{
var fields = new VariantCollection
{
// Order must match BaseEventType's conventional field ordering so clients that
// didn't customize the SelectClauses still see recognizable columns. A future
// PR that respects the client's SelectClause list will drive this from the filter.
new Variant(e.EventId),
new Variant(e.SourceName ?? string.Empty),
new Variant(new LocalizedText(e.Message ?? string.Empty)),
new Variant(e.Severity),
new Variant(e.EventTimeUtc),
new Variant(e.ReceivedTimeUtc),
};
fieldLists.Add(new HistoryEventFieldList { EventFields = fields });
}
return new ExtensionObject(new HistoryEvent { Events = fieldLists });
}
internal static DataValue ToDataValue(DataValueSnapshot s)
{
var dv = new DataValue
{
Value = s.Value,
StatusCode = new StatusCode(s.StatusCode),
ServerTimestamp = s.ServerTimestampUtc,
};
if (s.SourceTimestampUtc.HasValue) dv.SourceTimestamp = s.SourceTimestampUtc.Value;
return dv;
}
}

View File

@@ -0,0 +1,160 @@
using System.Linq;
using Opc.Ua;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Server.OpcUa;
namespace ZB.MOM.WW.OtOpcUa.Server.Tests;
/// <summary>
/// Unit coverage for the static helpers <see cref="DriverNodeManager"/> exposes to bridge
/// driver-side history data (<see cref="HistoricalEvent"/> + <see cref="DataValueSnapshot"/>)
/// to the OPC UA on-wire shape (<c>HistoryData</c> / <c>HistoryEvent</c> wrapped in an
/// <see cref="ExtensionObject"/>). Fast, framework-only — no server fixture.
/// </summary>
[Trait("Category", "Unit")]
public sealed class DriverNodeManagerHistoryMappingTests
{
[Theory]
[InlineData(nameof(HistoryAggregateType.Average), HistoryAggregateType.Average)]
[InlineData(nameof(HistoryAggregateType.Minimum), HistoryAggregateType.Minimum)]
[InlineData(nameof(HistoryAggregateType.Maximum), HistoryAggregateType.Maximum)]
[InlineData(nameof(HistoryAggregateType.Total), HistoryAggregateType.Total)]
[InlineData(nameof(HistoryAggregateType.Count), HistoryAggregateType.Count)]
public void MapAggregate_translates_each_supported_OPC_UA_aggregate_NodeId(
string name, HistoryAggregateType expected)
{
// Resolve the ObjectIds.AggregateFunction_<name> constant via reflection so the test
// keeps working if the stack ever renames them — failure means the stack broke its
// naming convention, worth surfacing loudly.
var field = typeof(ObjectIds).GetField("AggregateFunction_" + name);
field.ShouldNotBeNull();
var nodeId = (NodeId)field!.GetValue(null)!;
DriverNodeManager.MapAggregate(nodeId).ShouldBe(expected);
}
[Fact]
public void MapAggregate_returns_null_for_unknown_aggregate()
{
// AggregateFunction_TimeAverage is a valid OPC UA aggregate but not one the driver
// surfaces. Null here means the service handler will translate to BadAggregateNotSupported
// — the right behavior per Part 13 when the requested aggregate isn't implemented.
DriverNodeManager.MapAggregate(ObjectIds.AggregateFunction_TimeAverage).ShouldBeNull();
}
[Fact]
public void MapAggregate_returns_null_for_null_input()
{
// Processed requests that omit the aggregate list (or pass a single null) must not crash.
DriverNodeManager.MapAggregate(null).ShouldBeNull();
}
[Fact]
public void BuildHistoryData_wraps_samples_as_HistoryData_extension_object()
{
var samples = new[]
{
new DataValueSnapshot(Value: 42, StatusCode: StatusCodes.Good,
SourceTimestampUtc: new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc),
ServerTimestampUtc: new DateTime(2024, 1, 1, 0, 0, 1, DateTimeKind.Utc)),
new DataValueSnapshot(Value: 99, StatusCode: StatusCodes.Good,
SourceTimestampUtc: new DateTime(2024, 1, 1, 0, 0, 5, DateTimeKind.Utc),
ServerTimestampUtc: new DateTime(2024, 1, 1, 0, 0, 6, DateTimeKind.Utc)),
};
var ext = DriverNodeManager.BuildHistoryData(samples);
ext.Body.ShouldBeOfType<HistoryData>();
var hd = (HistoryData)ext.Body;
hd.DataValues.Count.ShouldBe(2);
hd.DataValues[0].Value.ShouldBe(42);
hd.DataValues[1].Value.ShouldBe(99);
hd.DataValues[0].SourceTimestamp.ShouldBe(new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc));
}
[Fact]
public void BuildHistoryEvent_wraps_events_with_BaseEventType_field_ordering()
{
// BuildHistoryEvent populates a fixed field set in BaseEventType's conventional order:
// EventId, SourceName, Message, Severity, Time, ReceiveTime. Pinning this so a later
// "respect the client's SelectClauses" change can't silently break older clients that
// rely on the default layout.
var events = new[]
{
new HistoricalEvent(
EventId: "e-1",
SourceName: "Tank1.HiAlarm",
EventTimeUtc: new DateTime(2024, 1, 1, 12, 0, 0, DateTimeKind.Utc),
ReceivedTimeUtc: new DateTime(2024, 1, 1, 12, 0, 0, 5, DateTimeKind.Utc),
Message: "High level reached",
Severity: 750),
};
var ext = DriverNodeManager.BuildHistoryEvent(events);
ext.Body.ShouldBeOfType<HistoryEvent>();
var he = (HistoryEvent)ext.Body;
he.Events.Count.ShouldBe(1);
var fields = he.Events[0].EventFields;
fields.Count.ShouldBe(6);
fields[0].Value.ShouldBe("e-1"); // EventId
fields[1].Value.ShouldBe("Tank1.HiAlarm"); // SourceName
((LocalizedText)fields[2].Value).Text.ShouldBe("High level reached"); // Message
fields[3].Value.ShouldBe((ushort)750); // Severity
((DateTime)fields[4].Value).ShouldBe(new DateTime(2024, 1, 1, 12, 0, 0, DateTimeKind.Utc));
((DateTime)fields[5].Value).ShouldBe(new DateTime(2024, 1, 1, 12, 0, 0, 5, DateTimeKind.Utc));
}
[Fact]
public void BuildHistoryEvent_substitutes_empty_string_for_null_SourceName_and_Message()
{
// Driver-side nulls are preserved through the wire contract by design (distinguishes
// "system event with no source" from "source unknown"), but OPC UA Variants of type
// String must not carry null — the stack serializes null-string as empty. This test
// pins the choice so a nullable-Variant refactor doesn't break clients that display
// the field without a null check.
var events = new[]
{
new HistoricalEvent("sys", null, DateTime.UtcNow, DateTime.UtcNow, null, 1),
};
var ext = DriverNodeManager.BuildHistoryEvent(events);
var fields = ((HistoryEvent)ext.Body).Events[0].EventFields;
fields[1].Value.ShouldBe(string.Empty);
((LocalizedText)fields[2].Value).Text.ShouldBe(string.Empty);
}
[Fact]
public void ToDataValue_preserves_status_code_and_timestamps()
{
var snap = new DataValueSnapshot(
Value: 123.45,
StatusCode: StatusCodes.UncertainSubstituteValue,
SourceTimestampUtc: new DateTime(2024, 5, 1, 10, 0, 0, DateTimeKind.Utc),
ServerTimestampUtc: new DateTime(2024, 5, 1, 10, 0, 1, DateTimeKind.Utc));
var dv = DriverNodeManager.ToDataValue(snap);
dv.Value.ShouldBe(123.45);
dv.StatusCode.Code.ShouldBe(StatusCodes.UncertainSubstituteValue);
dv.SourceTimestamp.ShouldBe(new DateTime(2024, 5, 1, 10, 0, 0, DateTimeKind.Utc));
dv.ServerTimestamp.ShouldBe(new DateTime(2024, 5, 1, 10, 0, 1, DateTimeKind.Utc));
}
[Fact]
public void ToDataValue_leaves_SourceTimestamp_default_when_snapshot_has_no_source_time()
{
// Galaxy's raw-history rows often carry only a ServerTimestamp (the historian knows
// when it wrote the row, not when the process sampled it). The mapping must not
// synthesize a bogus SourceTimestamp from ServerTimestamp — that would lie to the
// client about the measurement's actual time.
var snap = new DataValueSnapshot(Value: 1, StatusCode: 0,
SourceTimestampUtc: null,
ServerTimestampUtc: new DateTime(2024, 5, 1, 10, 0, 1, DateTimeKind.Utc));
var dv = DriverNodeManager.ToDataValue(snap);
dv.SourceTimestamp.ShouldBe(default);
}
}

View File

@@ -0,0 +1,356 @@
using Microsoft.Extensions.Logging.Abstractions;
using Opc.Ua;
using Opc.Ua.Client;
using Opc.Ua.Configuration;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.Hosting;
using ZB.MOM.WW.OtOpcUa.Server.OpcUa;
using ZB.MOM.WW.OtOpcUa.Server.Security;
// Core.Abstractions.HistoryReadResult (driver-side samples) collides with Opc.Ua.HistoryReadResult
// (service-layer per-node result). Alias the driver type so the stub's interface implementations
// are unambiguous.
using DriverHistoryReadResult = ZB.MOM.WW.OtOpcUa.Core.Abstractions.HistoryReadResult;
namespace ZB.MOM.WW.OtOpcUa.Server.Tests;
/// <summary>
/// End-to-end test that a real OPC UA client's HistoryRead service reaches a fake driver's
/// <see cref="IHistoryProvider"/> via <see cref="DriverNodeManager"/>'s
/// <c>HistoryReadRawModified</c> / <c>HistoryReadProcessed</c> / <c>HistoryReadAtTime</c> /
/// <c>HistoryReadEvents</c> overrides. Boots the full OPC UA stack + a stub
/// <see cref="IHistoryProvider"/> driver, opens a client session, issues each HistoryRead
/// variant, and asserts the client receives the expected per-kind payload.
/// </summary>
[Trait("Category", "Integration")]
public sealed class HistoryReadIntegrationTests : IAsyncLifetime
{
private static readonly int Port = 48600 + Random.Shared.Next(0, 99);
private readonly string _endpoint = $"opc.tcp://localhost:{Port}/OtOpcUaHistoryTest";
private readonly string _pkiRoot = Path.Combine(Path.GetTempPath(), $"otopcua-history-test-{Guid.NewGuid():N}");
private DriverHost _driverHost = null!;
private OpcUaApplicationHost _server = null!;
private HistoryDriver _driver = null!;
public async ValueTask InitializeAsync()
{
_driverHost = new DriverHost();
_driver = new HistoryDriver();
await _driverHost.RegisterAsync(_driver, "{}", CancellationToken.None);
var options = new OpcUaServerOptions
{
EndpointUrl = _endpoint,
ApplicationName = "OtOpcUaHistoryTest",
ApplicationUri = "urn:OtOpcUa:Server:HistoryTest",
PkiStoreRoot = _pkiRoot,
AutoAcceptUntrustedClientCertificates = true,
};
_server = new OpcUaApplicationHost(options, _driverHost, new DenyAllUserAuthenticator(),
NullLoggerFactory.Instance, NullLogger<OpcUaApplicationHost>.Instance);
await _server.StartAsync(CancellationToken.None);
}
public async ValueTask DisposeAsync()
{
await _server.DisposeAsync();
await _driverHost.DisposeAsync();
try { Directory.Delete(_pkiRoot, recursive: true); } catch { /* best-effort */ }
}
[Fact]
public async Task HistoryReadRaw_round_trips_driver_samples_to_the_client()
{
using var session = await OpenSessionAsync();
var nsIndex = (ushort)session.NamespaceUris.GetIndex("urn:OtOpcUa:history-driver");
var nodeId = new NodeId("raw.var", nsIndex);
// The Opc.Ua client exposes HistoryRead via Session.HistoryRead. We construct a
// ReadRawModifiedDetails (IsReadModified=false → raw path) and a single
// HistoryReadValueId targeting the driver-backed variable.
var details = new ReadRawModifiedDetails
{
StartTime = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc),
EndTime = new DateTime(2024, 1, 1, 0, 0, 10, DateTimeKind.Utc),
NumValuesPerNode = 100,
IsReadModified = false,
ReturnBounds = false,
};
var extObj = new ExtensionObject(details);
var nodesToRead = new HistoryReadValueIdCollection { new() { NodeId = nodeId } };
session.HistoryRead(null, extObj, TimestampsToReturn.Both, false, nodesToRead,
out var results, out _);
results.Count.ShouldBe(1);
results[0].StatusCode.Code.ShouldBe(StatusCodes.Good, $"HistoryReadRaw returned {results[0].StatusCode}");
var hd = (HistoryData)ExtensionObject.ToEncodeable(results[0].HistoryData);
hd.DataValues.Count.ShouldBe(_driver.RawSamplesReturned, "one DataValue per driver sample");
hd.DataValues[0].Value.ShouldBe(_driver.FirstRawValue);
}
[Fact]
public async Task HistoryReadProcessed_maps_Average_aggregate_and_routes_to_ReadProcessedAsync()
{
using var session = await OpenSessionAsync();
var nsIndex = (ushort)session.NamespaceUris.GetIndex("urn:OtOpcUa:history-driver");
var nodeId = new NodeId("proc.var", nsIndex);
var details = new ReadProcessedDetails
{
StartTime = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc),
EndTime = new DateTime(2024, 1, 1, 0, 1, 0, DateTimeKind.Utc),
ProcessingInterval = 10_000, // 10s buckets
AggregateType = [ObjectIds.AggregateFunction_Average],
};
var extObj = new ExtensionObject(details);
var nodesToRead = new HistoryReadValueIdCollection { new() { NodeId = nodeId } };
session.HistoryRead(null, extObj, TimestampsToReturn.Both, false, nodesToRead,
out var results, out _);
results[0].StatusCode.Code.ShouldBe(StatusCodes.Good);
_driver.LastProcessedAggregate.ShouldBe(HistoryAggregateType.Average,
"MapAggregate must translate ObjectIds.AggregateFunction_Average → driver enum");
_driver.LastProcessedInterval.ShouldBe(TimeSpan.FromSeconds(10));
}
[Fact]
public async Task HistoryReadProcessed_returns_BadAggregateNotSupported_for_unmapped_aggregate()
{
using var session = await OpenSessionAsync();
var nsIndex = (ushort)session.NamespaceUris.GetIndex("urn:OtOpcUa:history-driver");
var nodeId = new NodeId("proc.var", nsIndex);
var details = new ReadProcessedDetails
{
StartTime = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc),
EndTime = new DateTime(2024, 1, 1, 0, 1, 0, DateTimeKind.Utc),
ProcessingInterval = 10_000,
// TimeAverage is a valid OPC UA aggregate NodeId but not one the driver implements —
// the override returns BadAggregateNotSupported per Part 13 rather than coercing.
AggregateType = [ObjectIds.AggregateFunction_TimeAverage],
};
var extObj = new ExtensionObject(details);
var nodesToRead = new HistoryReadValueIdCollection { new() { NodeId = nodeId } };
session.HistoryRead(null, extObj, TimestampsToReturn.Both, false, nodesToRead,
out var results, out _);
results[0].StatusCode.Code.ShouldBe(StatusCodes.BadAggregateNotSupported);
}
[Fact]
public async Task HistoryReadAtTime_forwards_timestamp_list_to_driver()
{
using var session = await OpenSessionAsync();
var nsIndex = (ushort)session.NamespaceUris.GetIndex("urn:OtOpcUa:history-driver");
var nodeId = new NodeId("atTime.var", nsIndex);
var t1 = new DateTime(2024, 3, 1, 10, 0, 0, DateTimeKind.Utc);
var t2 = new DateTime(2024, 3, 1, 10, 0, 30, DateTimeKind.Utc);
var details = new ReadAtTimeDetails { ReqTimes = new DateTimeCollection { t1, t2 } };
var extObj = new ExtensionObject(details);
var nodesToRead = new HistoryReadValueIdCollection { new() { NodeId = nodeId } };
session.HistoryRead(null, extObj, TimestampsToReturn.Both, false, nodesToRead,
out var results, out _);
results[0].StatusCode.Code.ShouldBe(StatusCodes.Good);
_driver.LastAtTimeRequestedTimes.ShouldNotBeNull();
_driver.LastAtTimeRequestedTimes!.Count.ShouldBe(2);
_driver.LastAtTimeRequestedTimes[0].ShouldBe(t1);
_driver.LastAtTimeRequestedTimes[1].ShouldBe(t2);
}
[Fact]
public async Task HistoryReadEvents_returns_HistoryEvent_with_BaseEventType_field_list()
{
using var session = await OpenSessionAsync();
// Events target the driver-root notifier (not a specific variable) which is the
// conventional pattern for alarm-history browse.
var nsIndex = (ushort)session.NamespaceUris.GetIndex("urn:OtOpcUa:history-driver");
var nodeId = new NodeId("history-driver", nsIndex);
// EventFilter must carry at least one SelectClause or the stack rejects it as
// BadEventFilterInvalid before our override runs — empty filters are spec-forbidden.
// We populate the standard BaseEventType selectors any real client would send; my
// override's BuildHistoryEvent ignores the specific clauses and emits the canonical
// field list anyway (the richer "respect exact SelectClauses" behavior is on the PR 38
// follow-up list).
var filter = new EventFilter();
filter.AddSelectClause(ObjectTypeIds.BaseEventType, BrowseNames.EventId);
filter.AddSelectClause(ObjectTypeIds.BaseEventType, BrowseNames.SourceName);
filter.AddSelectClause(ObjectTypeIds.BaseEventType, BrowseNames.Message);
filter.AddSelectClause(ObjectTypeIds.BaseEventType, BrowseNames.Severity);
filter.AddSelectClause(ObjectTypeIds.BaseEventType, BrowseNames.Time);
filter.AddSelectClause(ObjectTypeIds.BaseEventType, BrowseNames.ReceiveTime);
var details = new ReadEventDetails
{
StartTime = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc),
EndTime = new DateTime(2024, 12, 31, 0, 0, 0, DateTimeKind.Utc),
NumValuesPerNode = 10,
Filter = filter,
};
var extObj = new ExtensionObject(details);
var nodesToRead = new HistoryReadValueIdCollection { new() { NodeId = nodeId } };
session.HistoryRead(null, extObj, TimestampsToReturn.Both, false, nodesToRead,
out var results, out _);
results[0].StatusCode.Code.ShouldBe(StatusCodes.Good);
var he = (HistoryEvent)ExtensionObject.ToEncodeable(results[0].HistoryData);
he.Events.Count.ShouldBe(_driver.EventsReturned);
he.Events[0].EventFields.Count.ShouldBe(6, "BaseEventType default field layout is 6 entries");
}
private async Task<ISession> OpenSessionAsync()
{
var cfg = new ApplicationConfiguration
{
ApplicationName = "OtOpcUaHistoryTestClient",
ApplicationUri = "urn:OtOpcUa:HistoryTestClient",
ApplicationType = ApplicationType.Client,
SecurityConfiguration = new SecurityConfiguration
{
ApplicationCertificate = new CertificateIdentifier
{
StoreType = CertificateStoreType.Directory,
StorePath = Path.Combine(_pkiRoot, "client-own"),
SubjectName = "CN=OtOpcUaHistoryTestClient",
},
TrustedIssuerCertificates = new CertificateTrustList { StoreType = CertificateStoreType.Directory, StorePath = Path.Combine(_pkiRoot, "client-issuers") },
TrustedPeerCertificates = new CertificateTrustList { StoreType = CertificateStoreType.Directory, StorePath = Path.Combine(_pkiRoot, "client-trusted") },
RejectedCertificateStore = new CertificateTrustList { StoreType = CertificateStoreType.Directory, StorePath = Path.Combine(_pkiRoot, "client-rejected") },
AutoAcceptUntrustedCertificates = true,
AddAppCertToTrustedStore = true,
},
TransportConfigurations = new TransportConfigurationCollection(),
TransportQuotas = new TransportQuotas { OperationTimeout = 15000 },
ClientConfiguration = new ClientConfiguration { DefaultSessionTimeout = 60000 },
};
await cfg.Validate(ApplicationType.Client);
cfg.CertificateValidator.CertificateValidation += (_, e) => e.Accept = true;
var instance = new ApplicationInstance { ApplicationConfiguration = cfg, ApplicationType = ApplicationType.Client };
await instance.CheckApplicationInstanceCertificate(true, CertificateFactory.DefaultKeySize);
var selected = CoreClientUtils.SelectEndpoint(cfg, _endpoint, useSecurity: false);
var endpointConfig = EndpointConfiguration.Create(cfg);
var configuredEndpoint = new ConfiguredEndpoint(null, selected, endpointConfig);
return await Session.Create(cfg, configuredEndpoint, false, "OtOpcUaHistoryTestClientSession", 60000,
new UserIdentity(new AnonymousIdentityToken()), null);
}
/// <summary>
/// Stub driver that implements <see cref="IHistoryProvider"/> so the service dispatch
/// can be verified without bringing up a real Galaxy or Historian. Captures the last-
/// seen arguments so tests can assert what the service handler forwarded.
/// </summary>
private sealed class HistoryDriver : IDriver, ITagDiscovery, IReadable, IHistoryProvider
{
public string DriverInstanceId => "history-driver";
public string DriverType => "HistoryStub";
public int RawSamplesReturned => 3;
public int FirstRawValue => 100;
public int EventsReturned => 2;
public HistoryAggregateType? LastProcessedAggregate { get; private set; }
public TimeSpan? LastProcessedInterval { get; private set; }
public IReadOnlyList<DateTime>? LastAtTimeRequestedTimes { get; private set; }
public Task InitializeAsync(string driverConfigJson, CancellationToken ct) => Task.CompletedTask;
public Task ReinitializeAsync(string driverConfigJson, CancellationToken ct) => Task.CompletedTask;
public Task ShutdownAsync(CancellationToken ct) => Task.CompletedTask;
public DriverHealth GetHealth() => new(DriverState.Healthy, DateTime.UtcNow, null);
public long GetMemoryFootprint() => 0;
public Task FlushOptionalCachesAsync(CancellationToken ct) => Task.CompletedTask;
public Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken ct)
{
// Every variable must be Historized for HistoryRead to route — the node-manager's
// stack base class checks the bit before dispatching.
builder.Variable("raw", "raw",
new DriverAttributeInfo("raw.var", DriverDataType.Int32, false, null,
SecurityClassification.FreeAccess, IsHistorized: true, IsAlarm: false));
builder.Variable("proc", "proc",
new DriverAttributeInfo("proc.var", DriverDataType.Float64, false, null,
SecurityClassification.FreeAccess, IsHistorized: true, IsAlarm: false));
builder.Variable("atTime", "atTime",
new DriverAttributeInfo("atTime.var", DriverDataType.Int32, false, null,
SecurityClassification.FreeAccess, IsHistorized: true, IsAlarm: false));
return Task.CompletedTask;
}
public Task<IReadOnlyList<DataValueSnapshot>> ReadAsync(
IReadOnlyList<string> fullReferences, CancellationToken cancellationToken)
{
var now = DateTime.UtcNow;
IReadOnlyList<DataValueSnapshot> r =
[.. fullReferences.Select(_ => new DataValueSnapshot(0, 0u, now, now))];
return Task.FromResult(r);
}
public Task<DriverHistoryReadResult> ReadRawAsync(
string fullReference, DateTime startUtc, DateTime endUtc, uint maxValuesPerNode,
CancellationToken cancellationToken)
{
var samples = new List<DataValueSnapshot>();
for (var i = 0; i < RawSamplesReturned; i++)
{
samples.Add(new DataValueSnapshot(
Value: FirstRawValue + i,
StatusCode: StatusCodes.Good,
SourceTimestampUtc: startUtc.AddSeconds(i),
ServerTimestampUtc: startUtc.AddSeconds(i)));
}
return Task.FromResult(new DriverHistoryReadResult(samples, null));
}
public Task<DriverHistoryReadResult> ReadProcessedAsync(
string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval,
HistoryAggregateType aggregate, CancellationToken cancellationToken)
{
LastProcessedAggregate = aggregate;
LastProcessedInterval = interval;
return Task.FromResult(new DriverHistoryReadResult(
[new DataValueSnapshot(1.0, StatusCodes.Good, startUtc, startUtc)],
null));
}
public Task<DriverHistoryReadResult> ReadAtTimeAsync(
string fullReference, IReadOnlyList<DateTime> timestampsUtc,
CancellationToken cancellationToken)
{
LastAtTimeRequestedTimes = timestampsUtc;
var samples = timestampsUtc
.Select(t => new DataValueSnapshot(42, StatusCodes.Good, t, t))
.ToArray();
return Task.FromResult(new DriverHistoryReadResult(samples, null));
}
public Task<HistoricalEventsResult> ReadEventsAsync(
string? sourceName, DateTime startUtc, DateTime endUtc, int maxEvents,
CancellationToken cancellationToken)
{
var events = new List<HistoricalEvent>();
for (var i = 0; i < EventsReturned; i++)
{
events.Add(new HistoricalEvent(
EventId: $"e{i}",
SourceName: sourceName,
EventTimeUtc: startUtc.AddHours(i),
ReceivedTimeUtc: startUtc.AddHours(i).AddSeconds(1),
Message: $"Event {i}",
Severity: (ushort)(500 + i)));
}
return Task.FromResult(new HistoricalEventsResult(events, null));
}
}
}