feat(historian): HistoryRead override (Raw/Processed/AtTime) over IHistorianDataSource
This commit is contained in:
@@ -2,7 +2,13 @@ using System.Collections.Concurrent;
|
||||
using Opc.Ua;
|
||||
using Opc.Ua.Server;
|
||||
using ZB.MOM.WW.OtOpcUa.Commons.OpcUa;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.OpcUaServer.Security;
|
||||
// The SDK's HistoryRead service result (the value the override fills + hands back) and the historian
|
||||
// data source's read DTO are both named HistoryReadResult. Alias each to keep the two unambiguous:
|
||||
// the SDK result stays unqualified as the dominant name in the override; the source DTO is HistorianRead.
|
||||
using HistorianRead = ZB.MOM.WW.OtOpcUa.Core.Abstractions.HistoryReadResult;
|
||||
using SdkHistoryReadResult = Opc.Ua.HistoryReadResult;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
||||
|
||||
@@ -111,6 +117,32 @@ public sealed class OtOpcUaNodeManager : CustomNodeManager2
|
||||
set => _nodeWriteGateway = value ?? NullOpcUaNodeWriteGateway.Instance;
|
||||
}
|
||||
|
||||
private volatile IHistorianDataSource _historianDataSource = NullHistorianDataSource.Instance;
|
||||
|
||||
/// <summary>
|
||||
/// Server-side read backend for the OPC UA HistoryRead service over historized variable nodes.
|
||||
/// When a client issues a HistoryRead (Raw / Processed / AtTime) against a node materialised
|
||||
/// <c>Historizing</c> (a tag with <see cref="TryGetHistorizedTagname"/> registered), the
|
||||
/// HistoryRead override resolves the node's NodeId to its historian tagname and dispatches to
|
||||
/// this source — so a single registered historian (e.g. Wonderware) serves many drivers' nodes,
|
||||
/// independent of any driver's lifecycle.
|
||||
/// <para>
|
||||
/// Set by the Host at <c>StartAsync</c> (Task 5). The <see cref="NullHistorianDataSource"/>
|
||||
/// default (assigning <c>null</c> restores it) means "no historian wired" → every read
|
||||
/// returns empty, so a historized node's HistoryRead surfaces <c>GoodNoData</c> rather than
|
||||
/// faulting. Backed by a <c>volatile</c> field (auto-properties can't be volatile) to make
|
||||
/// the startup-write / SDK-read-thread handoff explicit: the Host assigns it once at boot on
|
||||
/// the start thread and the SDK reads it on HistoryRead request threads. Unlike
|
||||
/// <see cref="NodeWriteGateway"/>, the HistoryRead override does NOT run under the
|
||||
/// node-manager <c>Lock</c>, so the override may block-bridge to this (async) source.
|
||||
/// </para>
|
||||
/// </summary>
|
||||
public IHistorianDataSource HistorianDataSource
|
||||
{
|
||||
get => _historianDataSource;
|
||||
set => _historianDataSource = value ?? NullHistorianDataSource.Instance;
|
||||
}
|
||||
|
||||
/// <summary>Look up a materialised Part 9 alarm-condition node by its alarm node id (the
|
||||
/// ScriptedAlarmId), or null if not yet materialised. Exposed for tests + diagnostics.</summary>
|
||||
/// <param name="alarmNodeId">The alarm node identifier (== ScriptedAlarmId).</param>
|
||||
@@ -975,6 +1007,239 @@ public sealed class OtOpcUaNodeManager : CustomNodeManager2
|
||||
return _folders.TryGetValue(parentNodeId, out var existing) ? existing : _root!;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------------------------
|
||||
// Phase C — OPC UA HistoryRead over historized variable nodes.
|
||||
//
|
||||
// The base CustomNodeManager2.HistoryRead (public + protected dispatcher) does the heavy lifting:
|
||||
// it validates handles under Lock, builds `nodesToProcess` (a NodeHandle list for nodes WE own
|
||||
// that carry the HistoryRead access bit), validates the timestamp args, handles
|
||||
// `releaseContinuationPoints`, and dispatches by `details` runtime type to the per-details
|
||||
// protected virtuals below. We override the three variable-history virtuals; HistoryReadEvents is
|
||||
// left to the base (Task 4 adds it). Each override receives the pre-filtered handles and fills
|
||||
// results[handle.Index] / errors[handle.Index] — handle.Index is the original index into the
|
||||
// service-level results/errors lists, seeded by the base. The base pre-seeds every handle's error
|
||||
// to BadHistoryOperationUnsupported, so a handle we don't recognise stays "unsupported" by default.
|
||||
//
|
||||
// NOTE: unlike OnWriteValue, the SDK does NOT hold the node-manager Lock while invoking these, so
|
||||
// block-bridging the async data source (GetAwaiter().GetResult()) is safe — it can't freeze the
|
||||
// address space. Each handle is served in isolation under try/catch so one node's failure (timeout,
|
||||
// backend throw) never throws out of the batch.
|
||||
// ---------------------------------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Serve a HistoryRead-Raw request over the pre-filtered historized variable handles, dispatching
|
||||
/// each to <see cref="IHistorianDataSource.ReadRawAsync"/>. Modified-history reads
|
||||
/// (<c>IsReadModified</c>) are unsupported — we don't serve a modified-value history surface.
|
||||
/// </summary>
|
||||
protected override void HistoryReadRawModified(
|
||||
ServerSystemContext context,
|
||||
ReadRawModifiedDetails details,
|
||||
TimestampsToReturn timestampsToReturn,
|
||||
IList<HistoryReadValueId> nodesToRead,
|
||||
IList<SdkHistoryReadResult> results,
|
||||
IList<ServiceResult> errors,
|
||||
List<NodeHandle> nodesToProcess,
|
||||
IDictionary<NodeId, NodeState> cache)
|
||||
{
|
||||
foreach (var handle in nodesToProcess)
|
||||
{
|
||||
if (details.IsReadModified)
|
||||
{
|
||||
// We never serve modified-value history; mark this node unsupported and move on.
|
||||
errors[handle.Index] = StatusCodes.BadHistoryOperationUnsupported;
|
||||
continue;
|
||||
}
|
||||
|
||||
ServeNode(handle, results, errors, source => source.ReadRawAsync(
|
||||
ResolveTagname(handle),
|
||||
details.StartTime,
|
||||
details.EndTime,
|
||||
details.NumValuesPerNode,
|
||||
CancellationToken.None));
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Serve a HistoryRead-Processed request, mapping each node's per-node aggregate NodeId (from the
|
||||
/// parallel <c>AggregateType</c> collection — the base guarantees it is the same length as
|
||||
/// <c>nodesToRead</c>) to a <see cref="HistoryAggregateType"/> and dispatching to
|
||||
/// <see cref="IHistorianDataSource.ReadProcessedAsync"/>. An unknown aggregate yields
|
||||
/// <c>BadAggregateNotSupported</c> for that node.
|
||||
/// </summary>
|
||||
protected override void HistoryReadProcessed(
|
||||
ServerSystemContext context,
|
||||
ReadProcessedDetails details,
|
||||
TimestampsToReturn timestampsToReturn,
|
||||
IList<HistoryReadValueId> nodesToRead,
|
||||
IList<SdkHistoryReadResult> results,
|
||||
IList<ServiceResult> errors,
|
||||
List<NodeHandle> nodesToProcess,
|
||||
IDictionary<NodeId, NodeState> cache)
|
||||
{
|
||||
foreach (var handle in nodesToProcess)
|
||||
{
|
||||
// AggregateType is a per-node parallel collection (same length as nodesToRead, enforced by
|
||||
// the base dispatcher). handle.Index is the node's position in that collection.
|
||||
var aggregateNodeId = details.AggregateType[handle.Index];
|
||||
var aggregate = MapAggregate(aggregateNodeId);
|
||||
if (aggregate is null)
|
||||
{
|
||||
errors[handle.Index] = StatusCodes.BadAggregateNotSupported;
|
||||
continue;
|
||||
}
|
||||
|
||||
ServeNode(handle, results, errors, source => source.ReadProcessedAsync(
|
||||
ResolveTagname(handle),
|
||||
details.StartTime,
|
||||
details.EndTime,
|
||||
// OPC UA ProcessingInterval is a Duration in milliseconds.
|
||||
TimeSpan.FromMilliseconds(details.ProcessingInterval),
|
||||
aggregate.Value,
|
||||
CancellationToken.None));
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Serve a HistoryRead-AtTime request, dispatching the requested timestamps to
|
||||
/// <see cref="IHistorianDataSource.ReadAtTimeAsync"/>.
|
||||
/// </summary>
|
||||
protected override void HistoryReadAtTime(
|
||||
ServerSystemContext context,
|
||||
ReadAtTimeDetails details,
|
||||
TimestampsToReturn timestampsToReturn,
|
||||
IList<HistoryReadValueId> nodesToRead,
|
||||
IList<SdkHistoryReadResult> results,
|
||||
IList<ServiceResult> errors,
|
||||
List<NodeHandle> nodesToProcess,
|
||||
IDictionary<NodeId, NodeState> cache)
|
||||
{
|
||||
// Snapshot the requested timestamps once — the same list is read for every node.
|
||||
var timestamps = details.ReqTimes?.ToList() ?? new List<DateTime>();
|
||||
foreach (var handle in nodesToProcess)
|
||||
{
|
||||
ServeNode(handle, results, errors, source => source.ReadAtTimeAsync(
|
||||
ResolveTagname(handle),
|
||||
timestamps,
|
||||
CancellationToken.None));
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Block-bridge to the historian source for one node handle and project the result onto the
|
||||
/// service-level results/errors slots. Resolves the node's registered historian tagname first
|
||||
/// (a node we don't recognise as historized — which shouldn't reach us, since the base only
|
||||
/// hands us nodes with the HistoryRead access bit — maps to <c>BadHistoryOperationUnsupported</c>).
|
||||
/// The <paramref name="read"/> callback is invoked only AFTER the tagname is confirmed present;
|
||||
/// it is wrapped in try/catch so a backend throw / timeout becomes a Bad status for THIS node
|
||||
/// without throwing out of the batch.
|
||||
/// </summary>
|
||||
/// <param name="handle">The pre-filtered node handle to serve; <c>handle.Index</c> indexes results/errors.</param>
|
||||
/// <param name="results">The service-level results list to fill at <c>handle.Index</c>.</param>
|
||||
/// <param name="errors">The service-level errors list to fill at <c>handle.Index</c>.</param>
|
||||
/// <param name="read">Invokes the resolved data-source read; only called once the tagname is known.</param>
|
||||
private void ServeNode(
|
||||
NodeHandle handle,
|
||||
IList<SdkHistoryReadResult> results,
|
||||
IList<ServiceResult> errors,
|
||||
Func<IHistorianDataSource, Task<HistorianRead>> read)
|
||||
{
|
||||
var idString = handle.NodeId.Identifier?.ToString();
|
||||
if (idString is null || !TryGetHistorizedTagname(idString, out _))
|
||||
{
|
||||
// Not a historized node we own a tagname for — unsupported. (The base pre-seeds this same
|
||||
// status, but set it explicitly so the contract is local + obvious.)
|
||||
errors[handle.Index] = StatusCodes.BadHistoryOperationUnsupported;
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
// HistoryRead is NOT invoked under the node-manager Lock (unlike OnWriteValue), so blocking
|
||||
// on the async source here is safe and won't freeze the address space.
|
||||
var sourceResult = read(HistorianDataSource).GetAwaiter().GetResult();
|
||||
var historyData = ToHistoryData(sourceResult);
|
||||
|
||||
results[handle.Index] = new SdkHistoryReadResult
|
||||
{
|
||||
// No source samples ⇒ GoodNoData (the node is historized, the window just held no data).
|
||||
StatusCode = historyData.DataValues.Count == 0 ? StatusCodes.GoodNoData : StatusCodes.Good,
|
||||
HistoryData = new ExtensionObject(historyData),
|
||||
// We never issue continuation points — every read returns the full window in one shot.
|
||||
ContinuationPoint = null,
|
||||
};
|
||||
errors[handle.Index] = ServiceResult.Good;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// One node's backend failure (throw / timeout / cancellation) must not throw out of the
|
||||
// batch — surface a Bad status for THIS node only. This CustomNodeManager2 carries no
|
||||
// ILogger (see ReportConditionEvent), so log through the SDK's static trace rather than
|
||||
// swallowing silently. Utils.LogError is [Obsolete] in 1.5.378 (favours an ITelemetryContext
|
||||
// this manager doesn't wire) — suppress the deprecation, matching the existing pattern.
|
||||
#pragma warning disable CS0618 // Type or member is obsolete
|
||||
Utils.LogError(ex, "OtOpcUaNodeManager: HistoryRead failed for node {0}", handle.NodeId);
|
||||
#pragma warning restore CS0618
|
||||
errors[handle.Index] = StatusCodes.BadHistoryOperationUnsupported;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Resolve a handle's registered historian tagname. The caller (<see cref="ServeNode"/>)
|
||||
/// has already confirmed the node is historized before invoking the read callback, so this is a
|
||||
/// guaranteed hit; the null-coalesce is a defensive fallback to the bare NodeId string.</summary>
|
||||
private string ResolveTagname(NodeHandle handle)
|
||||
{
|
||||
var idString = handle.NodeId.Identifier?.ToString() ?? string.Empty;
|
||||
return TryGetHistorizedTagname(idString, out var tagname) ? tagname! : idString;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Map an OPC UA Part 13 standard-aggregate function NodeId to our
|
||||
/// <see cref="HistoryAggregateType"/>. Returns <c>null</c> for any aggregate we don't serve so
|
||||
/// the caller can surface <c>BadAggregateNotSupported</c>.
|
||||
/// </summary>
|
||||
/// <param name="aggregateNodeId">The per-node aggregate-function NodeId from the request.</param>
|
||||
/// <returns>The mapped aggregate, or <c>null</c> when unsupported.</returns>
|
||||
private static HistoryAggregateType? MapAggregate(NodeId aggregateNodeId)
|
||||
{
|
||||
if (aggregateNodeId == ObjectIds.AggregateFunction_Average) return HistoryAggregateType.Average;
|
||||
if (aggregateNodeId == ObjectIds.AggregateFunction_Minimum) return HistoryAggregateType.Minimum;
|
||||
if (aggregateNodeId == ObjectIds.AggregateFunction_Maximum) return HistoryAggregateType.Maximum;
|
||||
if (aggregateNodeId == ObjectIds.AggregateFunction_Total) return HistoryAggregateType.Total;
|
||||
if (aggregateNodeId == ObjectIds.AggregateFunction_Count) return HistoryAggregateType.Count;
|
||||
return null;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Project the historian source's <see cref="HistorianRead"/> (Core.Abstractions DTO) into an
|
||||
/// SDK <see cref="HistoryData"/> — one <see cref="DataValue"/> per <see cref="DataValueSnapshot"/>,
|
||||
/// carrying value / status / source+server timestamps. A null SourceTimestamp maps to
|
||||
/// <c>DateTime.MinValue</c> (the SDK's "unset" sentinel for that field).
|
||||
/// </summary>
|
||||
/// <param name="sourceResult">The data source's read result.</param>
|
||||
/// <returns>The populated SDK <see cref="HistoryData"/>.</returns>
|
||||
private static HistoryData ToHistoryData(HistorianRead sourceResult)
|
||||
{
|
||||
var values = new DataValueCollection(sourceResult.Samples.Count);
|
||||
foreach (var sample in sourceResult.Samples)
|
||||
{
|
||||
values.Add(ToSdkDataValue(sample));
|
||||
}
|
||||
|
||||
return new HistoryData { DataValues = values };
|
||||
}
|
||||
|
||||
/// <summary>Convert one driver-agnostic <see cref="DataValueSnapshot"/> to an SDK
|
||||
/// <see cref="DataValue"/>, mirroring value / status code / source + server timestamps.</summary>
|
||||
/// <param name="snapshot">The source sample.</param>
|
||||
/// <returns>The equivalent SDK data value.</returns>
|
||||
private static DataValue ToSdkDataValue(DataValueSnapshot snapshot) => new()
|
||||
{
|
||||
WrappedValue = new Variant(snapshot.Value),
|
||||
StatusCode = new StatusCode(snapshot.StatusCode),
|
||||
SourceTimestamp = snapshot.SourceTimestampUtc ?? DateTime.MinValue,
|
||||
ServerTimestamp = snapshot.ServerTimestampUtc,
|
||||
};
|
||||
|
||||
/// <inheritdoc />
|
||||
public override void CreateAddressSpace(IDictionary<NodeId, IList<IReference>> externalReferences)
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user