feat(historian): server-side continuation-point paging for HistoryRead-Raw
The Wonderware historian backend is single-shot — it returns up to NumValuesPerNode samples with a null continuation point — so paging is synthesised server-side, time-based, for the only count-capped arm (Raw): - A full page (count == NumValuesPerNode, NumValuesPerNode > 0) emits an opaque 16-byte continuation point and stores a resume cursor; a short page (or NumValuesPerNode == 0 "all values") emits none. - A resume read takes the stored cursor, reads the next page from the boundary forward, and emits a fresh CP only if that page is also full. - The resume cursor is tie-safe (HistoryPaging.ComputeResumeCursor / TrimBoundaryDuplicates): the next page resumes from the boundary timestamp INCLUSIVE and drops the head ties already returned, so samples sharing the boundary SourceTimestamp are neither duplicated nor skipped. Continuation points are bound to the OPC UA session via the SDK's ISession.SaveHistoryContinuationPoint / RestoreHistoryContinuationPoint store (SessionHistoryContinuationStore) — capped by ServerConfiguration. MaxHistoryContinuationPoints (default 100, oldest-evicted) and disposed on session close. releaseContinuationPoints is honoured via an override of HistoryReleaseContinuationPoints (the base dispatcher routes release-only reads there, never to the per-details arms). An unknown / evicted / released point resumes to BadContinuationPointInvalid. Processed and AtTime stay single-shot: neither details type carries a client count cap, so the single-shot backend returns the complete result in one read and there is no "full page" signal to page on (spec-conformant). Modified-value history remains out of scope. The pure paging decisions + CP store contract are unit-tested via HistoryPaging + InMemoryHistoryContinuationStore; the full multi-page round trip is driven end-to-end through the node manager with an in-memory store + a series-backed fake historian (the in-process harness is session-less).
This commit is contained in:
@@ -0,0 +1,154 @@
|
||||
using Opc.Ua.Server;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
||||
|
||||
/// <summary>
|
||||
/// Stores the server-side resume state behind an opaque OPC UA HistoryRead continuation point.
|
||||
/// A continuation point is 16 opaque bytes (a fresh <see cref="Guid"/>); the store maps it to a
|
||||
/// <see cref="HistoryContinuationState"/>. The seam exists so the node manager can page against the
|
||||
/// SDK's per-session store in production (lifecycle + cap + cleanup owned by the SDK) while tests
|
||||
/// drive a session-less in-memory store through the same code path.
|
||||
/// </summary>
|
||||
internal interface IHistoryContinuationStore
|
||||
{
|
||||
/// <summary>
|
||||
/// Persist <paramref name="state"/> and return the opaque continuation-point bytes a client
|
||||
/// hands back to resume. Returns <c>null</c> when the state cannot be stored (e.g. the
|
||||
/// session-backed store has no session on this request) — the caller then returns the page with
|
||||
/// NO continuation point, which is spec-safe (a server may always return what it has in one shot).
|
||||
/// </summary>
|
||||
/// <param name="session">The session the read runs under, or <c>null</c> for a session-less call.</param>
|
||||
/// <param name="state">The resume state to store.</param>
|
||||
/// <returns>The opaque continuation-point bytes, or <c>null</c> when storage is unavailable.</returns>
|
||||
byte[]? Save(ISession? session, HistoryContinuationState state);
|
||||
|
||||
/// <summary>
|
||||
/// Look up and REMOVE the resume state for an inbound continuation point (a continuation point is
|
||||
/// single-use: taking it frees the slot; a fresh point is emitted if the resumed page is also full).
|
||||
/// </summary>
|
||||
/// <param name="session">The session the read runs under, or <c>null</c> for a session-less call.</param>
|
||||
/// <param name="continuationPoint">The opaque bytes the client handed back.</param>
|
||||
/// <returns>The stored state, or <c>null</c> when the point is unknown / expired / malformed.</returns>
|
||||
HistoryContinuationState? TryTake(ISession? session, byte[] continuationPoint);
|
||||
|
||||
/// <summary>
|
||||
/// Drop the resume state for a continuation point the client asked to release
|
||||
/// (<c>releaseContinuationPoints</c>) WITHOUT reading any data. Idempotent — releasing an unknown
|
||||
/// point is a no-op.
|
||||
/// </summary>
|
||||
/// <param name="session">The session the read runs under, or <c>null</c> for a session-less call.</param>
|
||||
/// <param name="continuationPoint">The opaque bytes to release.</param>
|
||||
void Release(ISession? session, byte[] continuationPoint);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Production <see cref="IHistoryContinuationStore"/> backed by the OPC UA SDK's per-session
|
||||
/// history-continuation store (<see cref="ISession.SaveHistoryContinuationPoint"/> /
|
||||
/// <see cref="ISession.RestoreHistoryContinuationPoint"/>). Using the SDK store gives us, for free:
|
||||
/// <list type="bullet">
|
||||
/// <item>per-session lifecycle — points are disposed when the session closes, so a client that
|
||||
/// disconnects mid-page can never leak resume state;</item>
|
||||
/// <item>a bounded capacity with oldest-eviction — the cap is
|
||||
/// <c>ServerConfiguration.MaxHistoryContinuationPoints</c> (SDK default 100); when a session
|
||||
/// exceeds it the SDK silently drops its OLDEST point, so a misbehaving client cannot grow the
|
||||
/// store unboundedly. A subsequent resume of an evicted point returns
|
||||
/// <c>BadContinuationPointInvalid</c> (a <see cref="TryTake"/> miss);</item>
|
||||
/// <item>thread-safety — the SDK session locks internally.</item>
|
||||
/// </list>
|
||||
/// The continuation-point bytes are a fresh 16-byte <see cref="Guid"/>; the SDK keys its slot by that
|
||||
/// Guid and round-trips the opaque bytes through the wire untouched (verified against the
|
||||
/// MasterNodeManager HistoryRead path — it does not register or cap history points itself).
|
||||
/// <para>
|
||||
/// When a HistoryRead arrives with no session (only the in-process test path does this), there is
|
||||
/// nowhere session-bound to durably store resume state across calls, so <see cref="Save"/> returns
|
||||
/// <c>null</c> and the read degrades to single-shot. Tests that exercise multi-page paging inject
|
||||
/// the in-memory <see cref="InMemoryHistoryContinuationStore"/> instead.
|
||||
/// </para>
|
||||
/// </summary>
|
||||
internal sealed class SessionHistoryContinuationStore : IHistoryContinuationStore
|
||||
{
|
||||
/// <inheritdoc />
|
||||
public byte[]? Save(ISession? session, HistoryContinuationState state)
|
||||
{
|
||||
if (session is null) return null;
|
||||
|
||||
// A fresh Guid is the opaque point: 16 bytes, collision-free, and the SDK keys its session slot by
|
||||
// it. The SDK enforces ServerConfiguration.MaxHistoryContinuationPoints with oldest-eviction.
|
||||
var id = Guid.NewGuid();
|
||||
session.SaveHistoryContinuationPoint(id, state);
|
||||
return id.ToByteArray();
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public HistoryContinuationState? TryTake(ISession? session, byte[] continuationPoint)
|
||||
{
|
||||
if (session is null) return null;
|
||||
// RestoreHistoryContinuationPoint REMOVES the slot (single-use) and returns null for an unknown /
|
||||
// malformed (non-16-byte) point — exactly the "miss ⇒ BadContinuationPointInvalid" contract.
|
||||
return session.RestoreHistoryContinuationPoint(continuationPoint) as HistoryContinuationState;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public void Release(ISession? session, byte[] continuationPoint) =>
|
||||
// Restoring removes the slot; we discard the value. Null session / unknown point ⇒ no-op.
|
||||
session?.RestoreHistoryContinuationPoint(continuationPoint);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// In-memory <see cref="IHistoryContinuationStore"/> independent of any OPC UA session — for the
|
||||
/// session-less in-process test path, which boots a real server but invokes HistoryRead with a
|
||||
/// session-less <c>OperationContext</c>. Mirrors the production store's contract: 16-byte Guid points,
|
||||
/// single-use take, idempotent release, and a bounded capacity with oldest-eviction so the same cap
|
||||
/// semantics are exercised.
|
||||
/// </summary>
|
||||
internal sealed class InMemoryHistoryContinuationStore(int capacity = 100) : IHistoryContinuationStore
|
||||
{
|
||||
private readonly object _gate = new();
|
||||
private readonly Dictionary<Guid, HistoryContinuationState> _states = new();
|
||||
// Insertion order, so we can evict the OLDEST when over capacity (matches the SDK store).
|
||||
private readonly Queue<Guid> _order = new();
|
||||
private readonly int _capacity = capacity < 1 ? 1 : capacity;
|
||||
|
||||
/// <inheritdoc />
|
||||
public byte[]? Save(ISession? session, HistoryContinuationState state)
|
||||
{
|
||||
var id = Guid.NewGuid();
|
||||
lock (_gate)
|
||||
{
|
||||
while (_states.Count >= _capacity && _order.Count > 0)
|
||||
{
|
||||
var oldest = _order.Dequeue();
|
||||
_states.Remove(oldest);
|
||||
}
|
||||
|
||||
_states[id] = state;
|
||||
_order.Enqueue(id);
|
||||
}
|
||||
|
||||
return id.ToByteArray();
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public HistoryContinuationState? TryTake(ISession? session, byte[] continuationPoint)
|
||||
{
|
||||
if (continuationPoint is null || continuationPoint.Length != 16) return null;
|
||||
var id = new Guid(continuationPoint);
|
||||
lock (_gate)
|
||||
{
|
||||
if (_states.Remove(id, out var state)) return state;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public void Release(ISession? session, byte[] continuationPoint)
|
||||
{
|
||||
if (continuationPoint is null || continuationPoint.Length != 16) return;
|
||||
var id = new Guid(continuationPoint);
|
||||
lock (_gate)
|
||||
{
|
||||
_states.Remove(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,167 @@
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
||||
|
||||
/// <summary>
|
||||
/// The kind of variable-history read a continuation point resumes. Only the two count-capped,
|
||||
/// time-range arms page server-side (see <see cref="HistoryPaging"/>); AtTime is single-shot
|
||||
/// (no client count cap, so there is never a "full page" signal to page on) and never produces a
|
||||
/// continuation point, so it has no entry here.
|
||||
/// </summary>
|
||||
internal enum HistoryReadKind
|
||||
{
|
||||
/// <summary>HistoryRead-Raw — resumes via <see cref="IHistorianDataSource.ReadRawAsync"/>.</summary>
|
||||
Raw,
|
||||
|
||||
/// <summary>HistoryRead-Processed — resumes via <see cref="IHistorianDataSource.ReadProcessedAsync"/>.</summary>
|
||||
Processed,
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// The server-side resume state stored behind an opaque continuation point for a single
|
||||
/// paged variable-history read. Captures exactly enough to continue the SAME logical read from
|
||||
/// where the previous page stopped: the read kind + tagname, the original (inclusive) end of the
|
||||
/// window, the next start of the window, and — for Processed — the aggregate + interval.
|
||||
/// <para>
|
||||
/// The boundary fields (<see cref="NextStartUtc"/> + <see cref="BoundarySkipCount"/>) encode a
|
||||
/// tie-safe resume cursor: the next page reads from <see cref="NextStartUtc"/> INCLUSIVE and
|
||||
/// then drops the first <see cref="BoundarySkipCount"/> samples whose SourceTimestamp equals
|
||||
/// <see cref="NextStartUtc"/>. This guarantees that samples sharing the page-boundary timestamp
|
||||
/// are neither re-returned (duplicate) nor skipped — see <see cref="HistoryPaging"/>.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// This record carries no SDK types so the whole paging decision surface is a pure, allocation-
|
||||
/// cheap value that unit tests can drive directly.
|
||||
/// </para>
|
||||
/// </summary>
|
||||
/// <param name="Kind">Which variable-history arm this state resumes.</param>
|
||||
/// <param name="Tagname">The resolved historian tagname (NOT the NodeId) to read from.</param>
|
||||
/// <param name="NextStartUtc">
|
||||
/// Inclusive lower bound for the next page — the boundary timestamp the previous page stopped on.
|
||||
/// </param>
|
||||
/// <param name="EndUtc">The original (inclusive) upper bound of the read window; unchanged across pages.</param>
|
||||
/// <param name="BoundarySkipCount">
|
||||
/// How many samples whose SourceTimestamp equals <see cref="NextStartUtc"/> were already returned on
|
||||
/// prior pages and must be dropped from the head of the next page (tie de-dup).
|
||||
/// </param>
|
||||
/// <param name="NumValuesPerNode">The client's per-page cap; re-applied to every resumed page.</param>
|
||||
/// <param name="Aggregate">The aggregate for a Processed read; ignored for Raw.</param>
|
||||
/// <param name="IntervalTicks">The Processed bucketing interval in ticks; ignored for Raw.</param>
|
||||
internal sealed record HistoryContinuationState(
|
||||
HistoryReadKind Kind,
|
||||
string Tagname,
|
||||
DateTime NextStartUtc,
|
||||
DateTime EndUtc,
|
||||
int BoundarySkipCount,
|
||||
uint NumValuesPerNode,
|
||||
HistoryAggregateType Aggregate,
|
||||
long IntervalTicks);
|
||||
|
||||
/// <summary>
|
||||
/// Pure server-side continuation-point paging decisions for the count-capped variable-history arms
|
||||
/// (Raw / Processed). The backend (Wonderware sidecar) does NOT page — it returns up to
|
||||
/// <c>NumValuesPerNode</c> samples with a null continuation point — so paging is synthesised here,
|
||||
/// time-based:
|
||||
/// <list type="bullet">
|
||||
/// <item>A page that returns EXACTLY the requested cap (<c>NumValuesPerNode > 0</c>) MAY have
|
||||
/// more behind it ⇒ emit a continuation point.</item>
|
||||
/// <item>A short page (fewer than the cap) is the last page ⇒ no continuation point.</item>
|
||||
/// <item><c>NumValuesPerNode == 0</c> means "all values, no limit" (OPC UA Part 11) ⇒ never page;
|
||||
/// return everything in one shot.</item>
|
||||
/// </list>
|
||||
/// All methods are static + pure so they unit-test without a server, a session, or the SDK.
|
||||
/// </summary>
|
||||
internal static class HistoryPaging
|
||||
{
|
||||
/// <summary>
|
||||
/// Decide whether a just-returned page is "full" and therefore MAY be followed by more data —
|
||||
/// the signal to emit a continuation point. A page is full when the client asked for a finite
|
||||
/// cap (<paramref name="numValuesPerNode"/> > 0) and the backend returned exactly that many
|
||||
/// samples. A short page (or an unlimited <c>0</c> request) is terminal.
|
||||
/// </summary>
|
||||
/// <param name="returnedCount">The number of samples the backend returned for this page.</param>
|
||||
/// <param name="numValuesPerNode">The client's per-page cap; <c>0</c> means "no limit".</param>
|
||||
/// <returns><c>true</c> when a continuation point should be emitted; otherwise <c>false</c>.</returns>
|
||||
public static bool IsFullPage(int returnedCount, uint numValuesPerNode) =>
|
||||
numValuesPerNode > 0 && returnedCount >= numValuesPerNode;
|
||||
|
||||
/// <summary>
|
||||
/// Build the resume cursor (next-start + boundary skip count) from the last sample of a full
|
||||
/// page, tie-safe against samples that share the boundary SourceTimestamp.
|
||||
/// <para>
|
||||
/// The next page resumes from the LAST returned sample's SourceTimestamp <em>inclusive</em>
|
||||
/// (NOT advanced by a tick), and the returned <paramref name="boundarySkipCount"/> counts how
|
||||
/// many samples in the page already carry that exact boundary timestamp. Resuming inclusively
|
||||
/// + dropping that many head samples guarantees:
|
||||
/// <list type="bullet">
|
||||
/// <item>no sample is re-returned (the ones already emitted at the boundary are skipped), and</item>
|
||||
/// <item>no sample is skipped (any un-emitted ties at the boundary are still read because we
|
||||
/// start AT the boundary, not after it).</item>
|
||||
/// </list>
|
||||
/// A naive "+1 tick" advance would skip un-emitted ties; this carry-offset strategy does not.
|
||||
/// </para>
|
||||
/// </summary>
|
||||
/// <param name="page">The page just returned (chronological, non-empty — guaranteed by the caller,
|
||||
/// which only pages a full page and a full page implies <c>NumValuesPerNode > 0</c> samples).</param>
|
||||
/// <param name="nextStartUtc">The boundary timestamp the next page resumes from (inclusive).</param>
|
||||
/// <param name="boundarySkipCount">How many head samples at <paramref name="nextStartUtc"/> the next
|
||||
/// page must drop (samples already emitted at the boundary timestamp).</param>
|
||||
public static void ComputeResumeCursor(
|
||||
IReadOnlyList<DataValueSnapshot> page,
|
||||
out DateTime nextStartUtc,
|
||||
out int boundarySkipCount)
|
||||
{
|
||||
// The boundary is the last returned sample's SourceTimestamp. A sample whose SourceTimestamp is
|
||||
// null (Bad/unset) cannot anchor a time cursor; fall back to MinValue so the next read covers the
|
||||
// whole remaining window rather than silently dropping data — duplicates are then de-duped by the
|
||||
// skip count below.
|
||||
var last = page[^1];
|
||||
nextStartUtc = last.SourceTimestampUtc ?? DateTime.MinValue;
|
||||
|
||||
// Count how many trailing samples in THIS page share the boundary timestamp — those are the ties
|
||||
// already emitted at the boundary that the next page must drop from its head.
|
||||
var skip = 0;
|
||||
for (var i = page.Count - 1; i >= 0; i--)
|
||||
{
|
||||
if ((page[i].SourceTimestampUtc ?? DateTime.MinValue) == nextStartUtc) skip++;
|
||||
else break;
|
||||
}
|
||||
|
||||
boundarySkipCount = skip;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Drop the first <paramref name="boundarySkipCount"/> samples of a freshly-read resumed page
|
||||
/// whose SourceTimestamp equals the boundary <paramref name="boundaryUtc"/> — the ties already
|
||||
/// emitted on the previous page. Samples past the boundary timestamp are always kept (only the
|
||||
/// exact-boundary head is trimmed), so a backend that returns fewer boundary ties than expected
|
||||
/// (data pruned between pages) still yields a correct, monotonic result.
|
||||
/// </summary>
|
||||
/// <param name="resumedPage">The page returned by the resumed backend read (chronological).</param>
|
||||
/// <param name="boundaryUtc">The boundary timestamp the resume read started at (inclusive).</param>
|
||||
/// <param name="boundarySkipCount">How many head samples at <paramref name="boundaryUtc"/> to drop.</param>
|
||||
/// <returns>The page with the already-emitted boundary ties trimmed from the head.</returns>
|
||||
public static IReadOnlyList<DataValueSnapshot> TrimBoundaryDuplicates(
|
||||
IReadOnlyList<DataValueSnapshot> resumedPage,
|
||||
DateTime boundaryUtc,
|
||||
int boundarySkipCount)
|
||||
{
|
||||
if (boundarySkipCount <= 0 || resumedPage.Count == 0) return resumedPage;
|
||||
|
||||
var dropped = 0;
|
||||
var i = 0;
|
||||
while (i < resumedPage.Count
|
||||
&& dropped < boundarySkipCount
|
||||
&& (resumedPage[i].SourceTimestampUtc ?? DateTime.MinValue) == boundaryUtc)
|
||||
{
|
||||
i++;
|
||||
dropped++;
|
||||
}
|
||||
|
||||
if (i == 0) return resumedPage;
|
||||
// Slice off the trimmed head; return a copy so the caller owns a plain list (no SDK coupling).
|
||||
var trimmed = new List<DataValueSnapshot>(resumedPage.Count - i);
|
||||
for (var j = i; j < resumedPage.Count; j++) trimmed.Add(resumedPage[j]);
|
||||
return trimmed;
|
||||
}
|
||||
}
|
||||
@@ -152,6 +152,23 @@ public sealed class OtOpcUaNodeManager : CustomNodeManager2
|
||||
set => _historianDataSource = value ?? NullHistorianDataSource.Instance;
|
||||
}
|
||||
|
||||
private volatile IHistoryContinuationStore _historyContinuationStore = new SessionHistoryContinuationStore();
|
||||
|
||||
/// <summary>
|
||||
/// The store that holds the server-side resume state behind an opaque HistoryRead continuation
|
||||
/// point for the count-capped variable-history arms (Raw / Processed). The default
|
||||
/// <see cref="SessionHistoryContinuationStore"/> binds points to the OPC UA session — so they are
|
||||
/// capped (<c>ServerConfiguration.MaxHistoryContinuationPoints</c>, SDK default 100, oldest-evicted)
|
||||
/// and disposed when the session closes. Exposed (internal) so the session-less in-process tests can
|
||||
/// inject an <see cref="InMemoryHistoryContinuationStore"/> and exercise the full multi-page round
|
||||
/// trip through the same dispatch path. Assigning <c>null</c> restores the session-backed default.
|
||||
/// </summary>
|
||||
internal IHistoryContinuationStore HistoryContinuationStore
|
||||
{
|
||||
get => _historyContinuationStore;
|
||||
set => _historyContinuationStore = value ?? new SessionHistoryContinuationStore();
|
||||
}
|
||||
|
||||
/// <summary>Look up a materialised Part 9 alarm-condition node by its alarm node id (the
|
||||
/// ScriptedAlarmId), or null if not yet materialised. Exposed for tests + diagnostics.</summary>
|
||||
/// <param name="alarmNodeId">The alarm node identifier (== ScriptedAlarmId).</param>
|
||||
@@ -1328,6 +1345,13 @@ public sealed class OtOpcUaNodeManager : CustomNodeManager2
|
||||
/// Serve a HistoryRead-Raw request over the pre-filtered historized variable handles, dispatching
|
||||
/// each to <see cref="IHistorianDataSource.ReadRawAsync"/>. Modified-history reads
|
||||
/// (<c>IsReadModified</c>) are unsupported — we don't serve a modified-value history surface.
|
||||
/// <para>
|
||||
/// Raw is the only arm that pages server-side: <c>ReadRawModifiedDetails</c> carries a client
|
||||
/// count cap (<c>NumValuesPerNode</c>), so a page that returns exactly that many samples MAY
|
||||
/// have more behind it ⇒ a time-based continuation point is emitted (see
|
||||
/// <see cref="ServeRawPaged"/>). An inbound continuation point on a node resumes its stored
|
||||
/// read. <c>NumValuesPerNode == 0</c> ("all values") never pages.
|
||||
/// </para>
|
||||
/// </summary>
|
||||
protected override void HistoryReadRawModified(
|
||||
ServerSystemContext context,
|
||||
@@ -1339,6 +1363,7 @@ public sealed class OtOpcUaNodeManager : CustomNodeManager2
|
||||
List<NodeHandle> nodesToProcess,
|
||||
IDictionary<NodeId, NodeState> cache)
|
||||
{
|
||||
var session = context.OperationContext?.Session;
|
||||
foreach (var handle in nodesToProcess)
|
||||
{
|
||||
if (details.IsReadModified)
|
||||
@@ -1348,12 +1373,9 @@ public sealed class OtOpcUaNodeManager : CustomNodeManager2
|
||||
continue;
|
||||
}
|
||||
|
||||
ServeNode(handle, results, errors, (source, tagname) => source.ReadRawAsync(
|
||||
tagname,
|
||||
details.StartTime,
|
||||
details.EndTime,
|
||||
details.NumValuesPerNode,
|
||||
CancellationToken.None));
|
||||
ServeRawPaged(
|
||||
handle, session, nodesToRead, results, errors,
|
||||
details.StartTime, details.EndTime, details.NumValuesPerNode);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1362,7 +1384,9 @@ public sealed class OtOpcUaNodeManager : CustomNodeManager2
|
||||
/// parallel <c>AggregateType</c> collection — the base guarantees it is the same length as
|
||||
/// <c>nodesToRead</c>) to a <see cref="HistoryAggregateType"/> and dispatching to
|
||||
/// <see cref="IHistorianDataSource.ReadProcessedAsync"/>. An unknown aggregate yields
|
||||
/// <c>BadAggregateNotSupported</c> for that node.
|
||||
/// <c>BadAggregateNotSupported</c> for that node. Single-shot (no continuation point):
|
||||
/// <c>ReadProcessedDetails</c> carries no client count cap — the bucket count is deterministic
|
||||
/// (window / interval) — so there is no "full page" signal to page on.
|
||||
/// </summary>
|
||||
protected override void HistoryReadProcessed(
|
||||
ServerSystemContext context,
|
||||
@@ -1374,6 +1398,8 @@ public sealed class OtOpcUaNodeManager : CustomNodeManager2
|
||||
List<NodeHandle> nodesToProcess,
|
||||
IDictionary<NodeId, NodeState> cache)
|
||||
{
|
||||
// OPC UA ProcessingInterval is a Duration in milliseconds — convert once per batch.
|
||||
var interval = TimeSpan.FromMilliseconds(details.ProcessingInterval);
|
||||
foreach (var handle in nodesToProcess)
|
||||
{
|
||||
// AggregateType is a per-node parallel collection (same length as nodesToRead, enforced by
|
||||
@@ -1386,12 +1412,16 @@ public sealed class OtOpcUaNodeManager : CustomNodeManager2
|
||||
continue;
|
||||
}
|
||||
|
||||
// Processed is SINGLE-SHOT (no continuation point). Unlike Raw, ReadProcessedDetails carries
|
||||
// NO client count cap (NumValuesPerNode) — the bucket count is deterministic (window / interval)
|
||||
// and the single-shot backend returns every bucket in one read, so there is no "full page ⇒
|
||||
// maybe more" signal to page on. Returning the complete aggregate result with a null CP is
|
||||
// spec-conformant (OPC UA Part 11 lets a server return all available data in one response).
|
||||
ServeNode(handle, results, errors, (source, tagname) => source.ReadProcessedAsync(
|
||||
tagname,
|
||||
details.StartTime,
|
||||
details.EndTime,
|
||||
// OPC UA ProcessingInterval is a Duration in milliseconds.
|
||||
TimeSpan.FromMilliseconds(details.ProcessingInterval),
|
||||
interval,
|
||||
aggregate.Value,
|
||||
CancellationToken.None));
|
||||
}
|
||||
@@ -1399,7 +1429,9 @@ public sealed class OtOpcUaNodeManager : CustomNodeManager2
|
||||
|
||||
/// <summary>
|
||||
/// Serve a HistoryRead-AtTime request, dispatching the requested timestamps to
|
||||
/// <see cref="IHistorianDataSource.ReadAtTimeAsync"/>.
|
||||
/// <see cref="IHistorianDataSource.ReadAtTimeAsync"/>. Single-shot (no continuation point):
|
||||
/// AtTime carries no client count cap — the request IS the timestamp list and the result is
|
||||
/// exactly one sample per requested timestamp — so there is no "full page" signal to page on.
|
||||
/// </summary>
|
||||
protected override void HistoryReadAtTime(
|
||||
ServerSystemContext context,
|
||||
@@ -1613,7 +1645,9 @@ public sealed class OtOpcUaNodeManager : CustomNodeManager2
|
||||
// No source samples ⇒ GoodNoData (the node is historized, the window just held no data).
|
||||
StatusCode = historyData.DataValues.Count == 0 ? StatusCodes.GoodNoData : StatusCodes.Good,
|
||||
HistoryData = new ExtensionObject(historyData),
|
||||
// We never issue continuation points — every read returns the full window in one shot.
|
||||
// Single-shot arms (Processed / AtTime) never page — the backend returns the complete
|
||||
// result in one read (no client count cap to detect a "full page" against), so no
|
||||
// continuation point. Raw pages via ServeRawPaged, not this helper.
|
||||
ContinuationPoint = null,
|
||||
};
|
||||
errors[handle.Index] = ServiceResult.Good;
|
||||
@@ -1632,6 +1666,168 @@ public sealed class OtOpcUaNodeManager : CustomNodeManager2
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Serve one historized variable handle for a HistoryRead-Raw request WITH server-side
|
||||
/// continuation-point paging. The single-shot Wonderware backend does not page, so paging is
|
||||
/// synthesised time-based:
|
||||
/// <list type="bullet">
|
||||
/// <item><b>Fresh read</b> (no inbound continuation point): read the window from
|
||||
/// <c>details.StartTime</c> to <paramref name="endUtc"/> capped at
|
||||
/// <paramref name="numValuesPerNode"/>. If the page comes back FULL (exactly the cap, and the
|
||||
/// cap is > 0), store a resume cursor and emit a continuation point.</item>
|
||||
/// <item><b>Resume read</b> (inbound continuation point present): take the stored cursor, read
|
||||
/// the next page from the boundary forward, trim already-emitted boundary ties, and emit a
|
||||
/// FRESH continuation point only if THIS page is also full — else null (done).</item>
|
||||
/// </list>
|
||||
/// The resume cursor is tie-safe (see <see cref="HistoryPaging.ComputeResumeCursor"/> /
|
||||
/// <see cref="HistoryPaging.TrimBoundaryDuplicates"/>): the next page resumes from the boundary
|
||||
/// timestamp INCLUSIVE and drops the head ties already returned, so samples sharing the boundary
|
||||
/// SourceTimestamp are neither duplicated nor skipped. Continuation points live in
|
||||
/// <see cref="HistoryContinuationStore"/> — session-bound + capped in production. Per-node error
|
||||
/// isolation matches <see cref="ServeNode"/>: a backend throw / an unknown continuation point
|
||||
/// becomes a Bad status for THIS node only and never throws out of the batch.
|
||||
/// </summary>
|
||||
/// <param name="handle">The pre-filtered node handle; <c>handle.Index</c> indexes results/errors.</param>
|
||||
/// <param name="session">The session the read runs under (null on the session-less in-process path).</param>
|
||||
/// <param name="nodesToRead">The per-node read list; <c>nodesToRead[handle.Index].ContinuationPoint</c>
|
||||
/// carries the inbound continuation point (non-null ⇒ a resume read).</param>
|
||||
/// <param name="results">The service-level results list to fill at <c>handle.Index</c>.</param>
|
||||
/// <param name="errors">The service-level errors list to fill at <c>handle.Index</c>.</param>
|
||||
/// <param name="startTimeUtc">The request window's (inclusive) lower bound, used for a fresh read.</param>
|
||||
/// <param name="endUtc">The (inclusive) upper bound of the read window; unchanged across pages.</param>
|
||||
/// <param name="numValuesPerNode">The client's per-page cap; <c>0</c> means "all values, no paging".</param>
|
||||
private void ServeRawPaged(
|
||||
NodeHandle handle,
|
||||
ISession? session,
|
||||
IList<HistoryReadValueId> nodesToRead,
|
||||
IList<SdkHistoryReadResult> results,
|
||||
IList<ServiceResult> errors,
|
||||
DateTime startTimeUtc,
|
||||
DateTime endUtc,
|
||||
uint numValuesPerNode)
|
||||
{
|
||||
var inboundCp = nodesToRead[handle.Index].ContinuationPoint;
|
||||
|
||||
try
|
||||
{
|
||||
DateTime startUtc;
|
||||
var boundarySkip = 0;
|
||||
|
||||
string tagname;
|
||||
if (inboundCp is { Length: > 0 })
|
||||
{
|
||||
// Resume read: take the stored cursor. A miss (unknown / evicted / malformed point) ⇒
|
||||
// BadContinuationPointInvalid for THIS node.
|
||||
var state = _historyContinuationStore.TryTake(session, inboundCp);
|
||||
if (state is null)
|
||||
{
|
||||
errors[handle.Index] = StatusCodes.BadContinuationPointInvalid;
|
||||
results[handle.Index] = new SdkHistoryReadResult { StatusCode = StatusCodes.BadContinuationPointInvalid };
|
||||
return;
|
||||
}
|
||||
|
||||
tagname = state.Tagname;
|
||||
startUtc = state.NextStartUtc;
|
||||
boundarySkip = state.BoundarySkipCount;
|
||||
endUtc = state.EndUtc;
|
||||
numValuesPerNode = state.NumValuesPerNode;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Fresh read: resolve the node's historian tagname (as ServeNode does).
|
||||
var idString = handle.NodeId.Identifier?.ToString();
|
||||
if (idString is null || !TryGetHistorizedTagname(idString, out var resolved) || resolved is null)
|
||||
{
|
||||
errors[handle.Index] = StatusCodes.BadHistoryOperationUnsupported;
|
||||
return;
|
||||
}
|
||||
|
||||
tagname = resolved;
|
||||
startUtc = startTimeUtc;
|
||||
}
|
||||
|
||||
// HistoryRead is NOT under the node-manager Lock — block-bridging the async source is safe.
|
||||
var sourceResult = HistorianDataSource
|
||||
.ReadRawAsync(tagname, startUtc, endUtc, numValuesPerNode, CancellationToken.None)
|
||||
.GetAwaiter().GetResult();
|
||||
|
||||
// On a resume read, drop the boundary ties already returned on the prior page.
|
||||
var samples = inboundCp is { Length: > 0 }
|
||||
? HistoryPaging.TrimBoundaryDuplicates(sourceResult.Samples, startUtc, boundarySkip)
|
||||
: sourceResult.Samples;
|
||||
|
||||
// The "full page" test is against the RAW backend count (before trimming): the backend honoured
|
||||
// the cap, so a full backend page ⇒ there may be more even if we trimmed some boundary ties.
|
||||
byte[]? outboundCp = null;
|
||||
if (HistoryPaging.IsFullPage(sourceResult.Samples.Count, numValuesPerNode) && samples.Count > 0)
|
||||
{
|
||||
HistoryPaging.ComputeResumeCursor(samples, out var nextStart, out var skip);
|
||||
var nextState = new HistoryContinuationState(
|
||||
HistoryReadKind.Raw, tagname, nextStart, endUtc, skip, numValuesPerNode,
|
||||
Aggregate: default, IntervalTicks: 0);
|
||||
// Save may return null (no session on this request) ⇒ degrade to single-shot for this node.
|
||||
outboundCp = _historyContinuationStore.Save(session, nextState);
|
||||
}
|
||||
|
||||
var historyData = ToHistoryDataFromSamples(samples);
|
||||
results[handle.Index] = new SdkHistoryReadResult
|
||||
{
|
||||
// No samples ⇒ GoodNoData (the node is historized, the window just held no data).
|
||||
StatusCode = samples.Count == 0 ? StatusCodes.GoodNoData : StatusCodes.Good,
|
||||
HistoryData = new ExtensionObject(historyData),
|
||||
ContinuationPoint = outboundCp,
|
||||
};
|
||||
errors[handle.Index] = ServiceResult.Good;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// One node's backend failure must not throw out of the batch — Bad for THIS node only.
|
||||
#pragma warning disable CS0618 // Type or member is obsolete
|
||||
Utils.LogError(ex, "OtOpcUaNodeManager: HistoryReadRaw (paged) failed for node {0}", handle.NodeId);
|
||||
#pragma warning restore CS0618
|
||||
errors[handle.Index] = StatusCodes.BadHistoryOperationUnsupported;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Drop the resume state for any continuation points the client asked to release
|
||||
/// (<c>releaseContinuationPoints == true</c>) and return WITHOUT reading data, per OPC UA Part 4.
|
||||
/// The base dispatcher routes a release-only HistoryRead here (it never reaches the per-details
|
||||
/// arms), so this is the single place that must free Raw's stored cursors. Each handle's released
|
||||
/// point is <c>nodesToRead[handle.Index].ContinuationPoint</c>; releasing an unknown / null point
|
||||
/// is a harmless no-op. Errors are left Good (the base pre-seeds them) — a release does not fail.
|
||||
/// </summary>
|
||||
protected override void HistoryReleaseContinuationPoints(
|
||||
ServerSystemContext context,
|
||||
IList<HistoryReadValueId> nodesToRead,
|
||||
IList<ServiceResult> errors,
|
||||
List<NodeHandle> nodesToProcess,
|
||||
IDictionary<NodeId, NodeState> cache)
|
||||
{
|
||||
var session = context.OperationContext?.Session;
|
||||
foreach (var handle in nodesToProcess)
|
||||
{
|
||||
var cp = nodesToRead[handle.Index].ContinuationPoint;
|
||||
if (cp is { Length: > 0 })
|
||||
{
|
||||
_historyContinuationStore.Release(session, cp);
|
||||
}
|
||||
|
||||
errors[handle.Index] = ServiceResult.Good;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Project a plain sample list into an SDK <see cref="HistoryData"/> (the paged Raw path
|
||||
/// works on a trimmed <see cref="IReadOnlyList{T}"/> rather than a whole <see cref="HistorianRead"/>).</summary>
|
||||
/// <param name="samples">The samples to project (already trimmed of boundary duplicates).</param>
|
||||
/// <returns>The populated SDK <see cref="HistoryData"/>.</returns>
|
||||
private static HistoryData ToHistoryDataFromSamples(IReadOnlyList<DataValueSnapshot> samples)
|
||||
{
|
||||
var values = new DataValueCollection(samples.Count);
|
||||
foreach (var sample in samples) values.Add(ToSdkDataValue(sample));
|
||||
return new HistoryData { DataValues = values };
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Map an OPC UA Part 13 standard-aggregate function NodeId to our
|
||||
/// <see cref="HistoryAggregateType"/>. Returns <c>null</c> for any aggregate we don't serve so
|
||||
|
||||
Reference in New Issue
Block a user