diff --git a/docs/Historian.md b/docs/Historian.md index a867cb84..c2882f2c 100644 --- a/docs/Historian.md +++ b/docs/Historian.md @@ -121,11 +121,36 @@ A historized node with no historian configured never returns an error status — empty. This means a deployment can author and publish historized tags before the historian sidecar is provisioned, without producing error spikes in connected clients. +### Continuation-point paging (Raw) + +`HistoryRead-Raw` is paged server-side. The historian backend is single-shot (it returns up to +`NumValuesPerNode` samples with no continuation point of its own), so the server synthesises +paging time-based: + +- A page that returns **exactly** `NumValuesPerNode` samples (with `NumValuesPerNode > 0`) MAY + have more behind it, so the server stores a resume cursor and returns an opaque continuation + point (16 bytes). The client hands it back to fetch the next page. +- A short page (fewer than the cap) is the last page — no continuation point. +- `NumValuesPerNode == 0` ("all values, no limit") is never paged; the whole window returns in one + shot. +- The resume cursor is **tie-safe**: the next page resumes from the last returned sample's + SourceTimestamp *inclusive* and drops the boundary samples already emitted, so samples sharing + the boundary timestamp are neither duplicated nor skipped. + +Continuation points are bound to the OPC UA session (the SDK's +`ServerConfiguration.MaxHistoryContinuationPoints` cap, default 100, with oldest-eviction; points +are disposed when the session closes). Resuming an unknown / evicted / released point returns +`BadContinuationPointInvalid`. `releaseContinuationPoints` drops the stored cursors without reading +data. + ### Known limitations -- **No server-managed continuation points.** Each HistoryRead call is single-shot. The server - honors the client's `NumValuesPerNode` limit but does not issue continuation points for - large result sets. Paging across multiple calls is a documented follow-up. +- **Processed and AtTime are single-shot** (no continuation points). Unlike Raw, neither + `ReadProcessedDetails` nor `ReadAtTimeDetails` carries a client count cap (`NumValuesPerNode`): + the Processed bucket count is deterministic (window / interval) and AtTime returns exactly one + sample per requested timestamp, so the single-shot backend returns the complete result in one + read and there is no "full page ⇒ maybe more" signal to page on. Returning the full result with + no continuation point is spec-conformant. - **No modified-value history** (`HistoryReadModified`). Requests for modified values return `BadHistoryOperationUnsupported`. This is a follow-up. diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/HistoryContinuationStore.cs b/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/HistoryContinuationStore.cs new file mode 100644 index 00000000..60c90473 --- /dev/null +++ b/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/HistoryContinuationStore.cs @@ -0,0 +1,154 @@ +using Opc.Ua.Server; + +namespace ZB.MOM.WW.OtOpcUa.OpcUaServer; + +/// +/// Stores the server-side resume state behind an opaque OPC UA HistoryRead continuation point. +/// A continuation point is 16 opaque bytes (a fresh ); the store maps it to a +/// . The seam exists so the node manager can page against the +/// SDK's per-session store in production (lifecycle + cap + cleanup owned by the SDK) while tests +/// drive a session-less in-memory store through the same code path. +/// +internal interface IHistoryContinuationStore +{ + /// + /// Persist and return the opaque continuation-point bytes a client + /// hands back to resume. Returns null when the state cannot be stored (e.g. the + /// session-backed store has no session on this request) — the caller then returns the page with + /// NO continuation point, which is spec-safe (a server may always return what it has in one shot). + /// + /// The session the read runs under, or null for a session-less call. + /// The resume state to store. + /// The opaque continuation-point bytes, or null when storage is unavailable. + byte[]? Save(ISession? session, HistoryContinuationState state); + + /// + /// Look up and REMOVE the resume state for an inbound continuation point (a continuation point is + /// single-use: taking it frees the slot; a fresh point is emitted if the resumed page is also full). + /// + /// The session the read runs under, or null for a session-less call. + /// The opaque bytes the client handed back. + /// The stored state, or null when the point is unknown / expired / malformed. + HistoryContinuationState? TryTake(ISession? session, byte[] continuationPoint); + + /// + /// Drop the resume state for a continuation point the client asked to release + /// (releaseContinuationPoints) WITHOUT reading any data. Idempotent — releasing an unknown + /// point is a no-op. + /// + /// The session the read runs under, or null for a session-less call. + /// The opaque bytes to release. + void Release(ISession? session, byte[] continuationPoint); +} + +/// +/// Production backed by the OPC UA SDK's per-session +/// history-continuation store ( / +/// ). Using the SDK store gives us, for free: +/// +/// per-session lifecycle — points are disposed when the session closes, so a client that +/// disconnects mid-page can never leak resume state; +/// a bounded capacity with oldest-eviction — the cap is +/// ServerConfiguration.MaxHistoryContinuationPoints (SDK default 100); when a session +/// exceeds it the SDK silently drops its OLDEST point, so a misbehaving client cannot grow the +/// store unboundedly. A subsequent resume of an evicted point returns +/// BadContinuationPointInvalid (a miss); +/// thread-safety — the SDK session locks internally. +/// +/// The continuation-point bytes are a fresh 16-byte ; the SDK keys its slot by that +/// Guid and round-trips the opaque bytes through the wire untouched (verified against the +/// MasterNodeManager HistoryRead path — it does not register or cap history points itself). +/// +/// When a HistoryRead arrives with no session (only the in-process test path does this), there is +/// nowhere session-bound to durably store resume state across calls, so returns +/// null and the read degrades to single-shot. Tests that exercise multi-page paging inject +/// the in-memory instead. +/// +/// +internal sealed class SessionHistoryContinuationStore : IHistoryContinuationStore +{ + /// + public byte[]? Save(ISession? session, HistoryContinuationState state) + { + if (session is null) return null; + + // A fresh Guid is the opaque point: 16 bytes, collision-free, and the SDK keys its session slot by + // it. The SDK enforces ServerConfiguration.MaxHistoryContinuationPoints with oldest-eviction. + var id = Guid.NewGuid(); + session.SaveHistoryContinuationPoint(id, state); + return id.ToByteArray(); + } + + /// + public HistoryContinuationState? TryTake(ISession? session, byte[] continuationPoint) + { + if (session is null) return null; + // RestoreHistoryContinuationPoint REMOVES the slot (single-use) and returns null for an unknown / + // malformed (non-16-byte) point — exactly the "miss ⇒ BadContinuationPointInvalid" contract. + return session.RestoreHistoryContinuationPoint(continuationPoint) as HistoryContinuationState; + } + + /// + public void Release(ISession? session, byte[] continuationPoint) => + // Restoring removes the slot; we discard the value. Null session / unknown point ⇒ no-op. + session?.RestoreHistoryContinuationPoint(continuationPoint); +} + +/// +/// In-memory independent of any OPC UA session — for the +/// session-less in-process test path, which boots a real server but invokes HistoryRead with a +/// session-less OperationContext. Mirrors the production store's contract: 16-byte Guid points, +/// single-use take, idempotent release, and a bounded capacity with oldest-eviction so the same cap +/// semantics are exercised. +/// +internal sealed class InMemoryHistoryContinuationStore(int capacity = 100) : IHistoryContinuationStore +{ + private readonly object _gate = new(); + private readonly Dictionary _states = new(); + // Insertion order, so we can evict the OLDEST when over capacity (matches the SDK store). + private readonly Queue _order = new(); + private readonly int _capacity = capacity < 1 ? 1 : capacity; + + /// + public byte[]? Save(ISession? session, HistoryContinuationState state) + { + var id = Guid.NewGuid(); + lock (_gate) + { + while (_states.Count >= _capacity && _order.Count > 0) + { + var oldest = _order.Dequeue(); + _states.Remove(oldest); + } + + _states[id] = state; + _order.Enqueue(id); + } + + return id.ToByteArray(); + } + + /// + public HistoryContinuationState? TryTake(ISession? session, byte[] continuationPoint) + { + if (continuationPoint is null || continuationPoint.Length != 16) return null; + var id = new Guid(continuationPoint); + lock (_gate) + { + if (_states.Remove(id, out var state)) return state; + } + + return null; + } + + /// + public void Release(ISession? session, byte[] continuationPoint) + { + if (continuationPoint is null || continuationPoint.Length != 16) return; + var id = new Guid(continuationPoint); + lock (_gate) + { + _states.Remove(id); + } + } +} diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/HistoryPaging.cs b/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/HistoryPaging.cs new file mode 100644 index 00000000..805b6356 --- /dev/null +++ b/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/HistoryPaging.cs @@ -0,0 +1,167 @@ +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.OpcUaServer; + +/// +/// The kind of variable-history read a continuation point resumes. Only the two count-capped, +/// time-range arms page server-side (see ); AtTime is single-shot +/// (no client count cap, so there is never a "full page" signal to page on) and never produces a +/// continuation point, so it has no entry here. +/// +internal enum HistoryReadKind +{ + /// HistoryRead-Raw — resumes via . + Raw, + + /// HistoryRead-Processed — resumes via . + Processed, +} + +/// +/// The server-side resume state stored behind an opaque continuation point for a single +/// paged variable-history read. Captures exactly enough to continue the SAME logical read from +/// where the previous page stopped: the read kind + tagname, the original (inclusive) end of the +/// window, the next start of the window, and — for Processed — the aggregate + interval. +/// +/// The boundary fields ( + ) encode a +/// tie-safe resume cursor: the next page reads from INCLUSIVE and +/// then drops the first samples whose SourceTimestamp equals +/// . This guarantees that samples sharing the page-boundary timestamp +/// are neither re-returned (duplicate) nor skipped — see . +/// +/// +/// This record carries no SDK types so the whole paging decision surface is a pure, allocation- +/// cheap value that unit tests can drive directly. +/// +/// +/// Which variable-history arm this state resumes. +/// The resolved historian tagname (NOT the NodeId) to read from. +/// +/// Inclusive lower bound for the next page — the boundary timestamp the previous page stopped on. +/// +/// The original (inclusive) upper bound of the read window; unchanged across pages. +/// +/// How many samples whose SourceTimestamp equals were already returned on +/// prior pages and must be dropped from the head of the next page (tie de-dup). +/// +/// The client's per-page cap; re-applied to every resumed page. +/// The aggregate for a Processed read; ignored for Raw. +/// The Processed bucketing interval in ticks; ignored for Raw. +internal sealed record HistoryContinuationState( + HistoryReadKind Kind, + string Tagname, + DateTime NextStartUtc, + DateTime EndUtc, + int BoundarySkipCount, + uint NumValuesPerNode, + HistoryAggregateType Aggregate, + long IntervalTicks); + +/// +/// Pure server-side continuation-point paging decisions for the count-capped variable-history arms +/// (Raw / Processed). The backend (Wonderware sidecar) does NOT page — it returns up to +/// NumValuesPerNode samples with a null continuation point — so paging is synthesised here, +/// time-based: +/// +/// A page that returns EXACTLY the requested cap (NumValuesPerNode > 0) MAY have +/// more behind it ⇒ emit a continuation point. +/// A short page (fewer than the cap) is the last page ⇒ no continuation point. +/// NumValuesPerNode == 0 means "all values, no limit" (OPC UA Part 11) ⇒ never page; +/// return everything in one shot. +/// +/// All methods are static + pure so they unit-test without a server, a session, or the SDK. +/// +internal static class HistoryPaging +{ + /// + /// Decide whether a just-returned page is "full" and therefore MAY be followed by more data — + /// the signal to emit a continuation point. A page is full when the client asked for a finite + /// cap ( > 0) and the backend returned exactly that many + /// samples. A short page (or an unlimited 0 request) is terminal. + /// + /// The number of samples the backend returned for this page. + /// The client's per-page cap; 0 means "no limit". + /// true when a continuation point should be emitted; otherwise false. + public static bool IsFullPage(int returnedCount, uint numValuesPerNode) => + numValuesPerNode > 0 && returnedCount >= numValuesPerNode; + + /// + /// Build the resume cursor (next-start + boundary skip count) from the last sample of a full + /// page, tie-safe against samples that share the boundary SourceTimestamp. + /// + /// The next page resumes from the LAST returned sample's SourceTimestamp inclusive + /// (NOT advanced by a tick), and the returned counts how + /// many samples in the page already carry that exact boundary timestamp. Resuming inclusively + /// + dropping that many head samples guarantees: + /// + /// no sample is re-returned (the ones already emitted at the boundary are skipped), and + /// no sample is skipped (any un-emitted ties at the boundary are still read because we + /// start AT the boundary, not after it). + /// + /// A naive "+1 tick" advance would skip un-emitted ties; this carry-offset strategy does not. + /// + /// + /// The page just returned (chronological, non-empty — guaranteed by the caller, + /// which only pages a full page and a full page implies NumValuesPerNode > 0 samples). + /// The boundary timestamp the next page resumes from (inclusive). + /// How many head samples at the next + /// page must drop (samples already emitted at the boundary timestamp). + public static void ComputeResumeCursor( + IReadOnlyList page, + out DateTime nextStartUtc, + out int boundarySkipCount) + { + // The boundary is the last returned sample's SourceTimestamp. A sample whose SourceTimestamp is + // null (Bad/unset) cannot anchor a time cursor; fall back to MinValue so the next read covers the + // whole remaining window rather than silently dropping data — duplicates are then de-duped by the + // skip count below. + var last = page[^1]; + nextStartUtc = last.SourceTimestampUtc ?? DateTime.MinValue; + + // Count how many trailing samples in THIS page share the boundary timestamp — those are the ties + // already emitted at the boundary that the next page must drop from its head. + var skip = 0; + for (var i = page.Count - 1; i >= 0; i--) + { + if ((page[i].SourceTimestampUtc ?? DateTime.MinValue) == nextStartUtc) skip++; + else break; + } + + boundarySkipCount = skip; + } + + /// + /// Drop the first samples of a freshly-read resumed page + /// whose SourceTimestamp equals the boundary — the ties already + /// emitted on the previous page. Samples past the boundary timestamp are always kept (only the + /// exact-boundary head is trimmed), so a backend that returns fewer boundary ties than expected + /// (data pruned between pages) still yields a correct, monotonic result. + /// + /// The page returned by the resumed backend read (chronological). + /// The boundary timestamp the resume read started at (inclusive). + /// How many head samples at to drop. + /// The page with the already-emitted boundary ties trimmed from the head. + public static IReadOnlyList TrimBoundaryDuplicates( + IReadOnlyList resumedPage, + DateTime boundaryUtc, + int boundarySkipCount) + { + if (boundarySkipCount <= 0 || resumedPage.Count == 0) return resumedPage; + + var dropped = 0; + var i = 0; + while (i < resumedPage.Count + && dropped < boundarySkipCount + && (resumedPage[i].SourceTimestampUtc ?? DateTime.MinValue) == boundaryUtc) + { + i++; + dropped++; + } + + if (i == 0) return resumedPage; + // Slice off the trimmed head; return a copy so the caller owns a plain list (no SDK coupling). + var trimmed = new List(resumedPage.Count - i); + for (var j = i; j < resumedPage.Count; j++) trimmed.Add(resumedPage[j]); + return trimmed; + } +} diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/OtOpcUaNodeManager.cs b/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/OtOpcUaNodeManager.cs index 747b3d43..15abc256 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/OtOpcUaNodeManager.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/OtOpcUaNodeManager.cs @@ -152,6 +152,23 @@ public sealed class OtOpcUaNodeManager : CustomNodeManager2 set => _historianDataSource = value ?? NullHistorianDataSource.Instance; } + private volatile IHistoryContinuationStore _historyContinuationStore = new SessionHistoryContinuationStore(); + + /// + /// The store that holds the server-side resume state behind an opaque HistoryRead continuation + /// point for the count-capped variable-history arms (Raw / Processed). The default + /// binds points to the OPC UA session — so they are + /// capped (ServerConfiguration.MaxHistoryContinuationPoints, SDK default 100, oldest-evicted) + /// and disposed when the session closes. Exposed (internal) so the session-less in-process tests can + /// inject an and exercise the full multi-page round + /// trip through the same dispatch path. Assigning null restores the session-backed default. + /// + internal IHistoryContinuationStore HistoryContinuationStore + { + get => _historyContinuationStore; + set => _historyContinuationStore = value ?? new SessionHistoryContinuationStore(); + } + /// Look up a materialised Part 9 alarm-condition node by its alarm node id (the /// ScriptedAlarmId), or null if not yet materialised. Exposed for tests + diagnostics. /// The alarm node identifier (== ScriptedAlarmId). @@ -1328,6 +1345,13 @@ public sealed class OtOpcUaNodeManager : CustomNodeManager2 /// Serve a HistoryRead-Raw request over the pre-filtered historized variable handles, dispatching /// each to . Modified-history reads /// (IsReadModified) are unsupported — we don't serve a modified-value history surface. + /// + /// Raw is the only arm that pages server-side: ReadRawModifiedDetails carries a client + /// count cap (NumValuesPerNode), so a page that returns exactly that many samples MAY + /// have more behind it ⇒ a time-based continuation point is emitted (see + /// ). An inbound continuation point on a node resumes its stored + /// read. NumValuesPerNode == 0 ("all values") never pages. + /// /// protected override void HistoryReadRawModified( ServerSystemContext context, @@ -1339,6 +1363,7 @@ public sealed class OtOpcUaNodeManager : CustomNodeManager2 List nodesToProcess, IDictionary cache) { + var session = context.OperationContext?.Session; foreach (var handle in nodesToProcess) { if (details.IsReadModified) @@ -1348,12 +1373,9 @@ public sealed class OtOpcUaNodeManager : CustomNodeManager2 continue; } - ServeNode(handle, results, errors, (source, tagname) => source.ReadRawAsync( - tagname, - details.StartTime, - details.EndTime, - details.NumValuesPerNode, - CancellationToken.None)); + ServeRawPaged( + handle, session, nodesToRead, results, errors, + details.StartTime, details.EndTime, details.NumValuesPerNode); } } @@ -1362,7 +1384,9 @@ public sealed class OtOpcUaNodeManager : CustomNodeManager2 /// parallel AggregateType collection — the base guarantees it is the same length as /// nodesToRead) to a and dispatching to /// . An unknown aggregate yields - /// BadAggregateNotSupported for that node. + /// BadAggregateNotSupported for that node. Single-shot (no continuation point): + /// ReadProcessedDetails carries no client count cap — the bucket count is deterministic + /// (window / interval) — so there is no "full page" signal to page on. /// protected override void HistoryReadProcessed( ServerSystemContext context, @@ -1374,6 +1398,8 @@ public sealed class OtOpcUaNodeManager : CustomNodeManager2 List nodesToProcess, IDictionary cache) { + // OPC UA ProcessingInterval is a Duration in milliseconds — convert once per batch. + var interval = TimeSpan.FromMilliseconds(details.ProcessingInterval); foreach (var handle in nodesToProcess) { // AggregateType is a per-node parallel collection (same length as nodesToRead, enforced by @@ -1386,12 +1412,16 @@ public sealed class OtOpcUaNodeManager : CustomNodeManager2 continue; } + // Processed is SINGLE-SHOT (no continuation point). Unlike Raw, ReadProcessedDetails carries + // NO client count cap (NumValuesPerNode) — the bucket count is deterministic (window / interval) + // and the single-shot backend returns every bucket in one read, so there is no "full page ⇒ + // maybe more" signal to page on. Returning the complete aggregate result with a null CP is + // spec-conformant (OPC UA Part 11 lets a server return all available data in one response). ServeNode(handle, results, errors, (source, tagname) => source.ReadProcessedAsync( tagname, details.StartTime, details.EndTime, - // OPC UA ProcessingInterval is a Duration in milliseconds. - TimeSpan.FromMilliseconds(details.ProcessingInterval), + interval, aggregate.Value, CancellationToken.None)); } @@ -1399,7 +1429,9 @@ public sealed class OtOpcUaNodeManager : CustomNodeManager2 /// /// Serve a HistoryRead-AtTime request, dispatching the requested timestamps to - /// . + /// . Single-shot (no continuation point): + /// AtTime carries no client count cap — the request IS the timestamp list and the result is + /// exactly one sample per requested timestamp — so there is no "full page" signal to page on. /// protected override void HistoryReadAtTime( ServerSystemContext context, @@ -1613,7 +1645,9 @@ public sealed class OtOpcUaNodeManager : CustomNodeManager2 // No source samples ⇒ GoodNoData (the node is historized, the window just held no data). StatusCode = historyData.DataValues.Count == 0 ? StatusCodes.GoodNoData : StatusCodes.Good, HistoryData = new ExtensionObject(historyData), - // We never issue continuation points — every read returns the full window in one shot. + // Single-shot arms (Processed / AtTime) never page — the backend returns the complete + // result in one read (no client count cap to detect a "full page" against), so no + // continuation point. Raw pages via ServeRawPaged, not this helper. ContinuationPoint = null, }; errors[handle.Index] = ServiceResult.Good; @@ -1632,6 +1666,168 @@ public sealed class OtOpcUaNodeManager : CustomNodeManager2 } } + /// + /// Serve one historized variable handle for a HistoryRead-Raw request WITH server-side + /// continuation-point paging. The single-shot Wonderware backend does not page, so paging is + /// synthesised time-based: + /// + /// Fresh read (no inbound continuation point): read the window from + /// details.StartTime to capped at + /// . If the page comes back FULL (exactly the cap, and the + /// cap is > 0), store a resume cursor and emit a continuation point. + /// Resume read (inbound continuation point present): take the stored cursor, read + /// the next page from the boundary forward, trim already-emitted boundary ties, and emit a + /// FRESH continuation point only if THIS page is also full — else null (done). + /// + /// The resume cursor is tie-safe (see / + /// ): the next page resumes from the boundary + /// timestamp INCLUSIVE and drops the head ties already returned, so samples sharing the boundary + /// SourceTimestamp are neither duplicated nor skipped. Continuation points live in + /// — session-bound + capped in production. Per-node error + /// isolation matches : a backend throw / an unknown continuation point + /// becomes a Bad status for THIS node only and never throws out of the batch. + /// + /// The pre-filtered node handle; handle.Index indexes results/errors. + /// The session the read runs under (null on the session-less in-process path). + /// The per-node read list; nodesToRead[handle.Index].ContinuationPoint + /// carries the inbound continuation point (non-null ⇒ a resume read). + /// The service-level results list to fill at handle.Index. + /// The service-level errors list to fill at handle.Index. + /// The request window's (inclusive) lower bound, used for a fresh read. + /// The (inclusive) upper bound of the read window; unchanged across pages. + /// The client's per-page cap; 0 means "all values, no paging". + private void ServeRawPaged( + NodeHandle handle, + ISession? session, + IList nodesToRead, + IList results, + IList errors, + DateTime startTimeUtc, + DateTime endUtc, + uint numValuesPerNode) + { + var inboundCp = nodesToRead[handle.Index].ContinuationPoint; + + try + { + DateTime startUtc; + var boundarySkip = 0; + + string tagname; + if (inboundCp is { Length: > 0 }) + { + // Resume read: take the stored cursor. A miss (unknown / evicted / malformed point) ⇒ + // BadContinuationPointInvalid for THIS node. + var state = _historyContinuationStore.TryTake(session, inboundCp); + if (state is null) + { + errors[handle.Index] = StatusCodes.BadContinuationPointInvalid; + results[handle.Index] = new SdkHistoryReadResult { StatusCode = StatusCodes.BadContinuationPointInvalid }; + return; + } + + tagname = state.Tagname; + startUtc = state.NextStartUtc; + boundarySkip = state.BoundarySkipCount; + endUtc = state.EndUtc; + numValuesPerNode = state.NumValuesPerNode; + } + else + { + // Fresh read: resolve the node's historian tagname (as ServeNode does). + var idString = handle.NodeId.Identifier?.ToString(); + if (idString is null || !TryGetHistorizedTagname(idString, out var resolved) || resolved is null) + { + errors[handle.Index] = StatusCodes.BadHistoryOperationUnsupported; + return; + } + + tagname = resolved; + startUtc = startTimeUtc; + } + + // HistoryRead is NOT under the node-manager Lock — block-bridging the async source is safe. + var sourceResult = HistorianDataSource + .ReadRawAsync(tagname, startUtc, endUtc, numValuesPerNode, CancellationToken.None) + .GetAwaiter().GetResult(); + + // On a resume read, drop the boundary ties already returned on the prior page. + var samples = inboundCp is { Length: > 0 } + ? HistoryPaging.TrimBoundaryDuplicates(sourceResult.Samples, startUtc, boundarySkip) + : sourceResult.Samples; + + // The "full page" test is against the RAW backend count (before trimming): the backend honoured + // the cap, so a full backend page ⇒ there may be more even if we trimmed some boundary ties. + byte[]? outboundCp = null; + if (HistoryPaging.IsFullPage(sourceResult.Samples.Count, numValuesPerNode) && samples.Count > 0) + { + HistoryPaging.ComputeResumeCursor(samples, out var nextStart, out var skip); + var nextState = new HistoryContinuationState( + HistoryReadKind.Raw, tagname, nextStart, endUtc, skip, numValuesPerNode, + Aggregate: default, IntervalTicks: 0); + // Save may return null (no session on this request) ⇒ degrade to single-shot for this node. + outboundCp = _historyContinuationStore.Save(session, nextState); + } + + var historyData = ToHistoryDataFromSamples(samples); + results[handle.Index] = new SdkHistoryReadResult + { + // No samples ⇒ GoodNoData (the node is historized, the window just held no data). + StatusCode = samples.Count == 0 ? StatusCodes.GoodNoData : StatusCodes.Good, + HistoryData = new ExtensionObject(historyData), + ContinuationPoint = outboundCp, + }; + errors[handle.Index] = ServiceResult.Good; + } + catch (Exception ex) + { + // One node's backend failure must not throw out of the batch — Bad for THIS node only. +#pragma warning disable CS0618 // Type or member is obsolete + Utils.LogError(ex, "OtOpcUaNodeManager: HistoryReadRaw (paged) failed for node {0}", handle.NodeId); +#pragma warning restore CS0618 + errors[handle.Index] = StatusCodes.BadHistoryOperationUnsupported; + } + } + + /// + /// Drop the resume state for any continuation points the client asked to release + /// (releaseContinuationPoints == true) and return WITHOUT reading data, per OPC UA Part 4. + /// The base dispatcher routes a release-only HistoryRead here (it never reaches the per-details + /// arms), so this is the single place that must free Raw's stored cursors. Each handle's released + /// point is nodesToRead[handle.Index].ContinuationPoint; releasing an unknown / null point + /// is a harmless no-op. Errors are left Good (the base pre-seeds them) — a release does not fail. + /// + protected override void HistoryReleaseContinuationPoints( + ServerSystemContext context, + IList nodesToRead, + IList errors, + List nodesToProcess, + IDictionary cache) + { + var session = context.OperationContext?.Session; + foreach (var handle in nodesToProcess) + { + var cp = nodesToRead[handle.Index].ContinuationPoint; + if (cp is { Length: > 0 }) + { + _historyContinuationStore.Release(session, cp); + } + + errors[handle.Index] = ServiceResult.Good; + } + } + + /// Project a plain sample list into an SDK (the paged Raw path + /// works on a trimmed rather than a whole ). + /// The samples to project (already trimmed of boundary duplicates). + /// The populated SDK . + private static HistoryData ToHistoryDataFromSamples(IReadOnlyList samples) + { + var values = new DataValueCollection(samples.Count); + foreach (var sample in samples) values.Add(ToSdkDataValue(sample)); + return new HistoryData { DataValues = values }; + } + /// /// Map an OPC UA Part 13 standard-aggregate function NodeId to our /// . Returns null for any aggregate we don't serve so diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests/HistoryPagingTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests/HistoryPagingTests.cs new file mode 100644 index 00000000..5ac84381 --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests/HistoryPagingTests.cs @@ -0,0 +1,214 @@ +using Opc.Ua; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests; + +/// +/// Pure unit tests for the server-side HistoryRead paging decisions +/// () and the continuation-point store +/// ( / round-trip + cap). No server, no session, +/// no SDK — these drive the decision surface directly so the boundary / tie / cap logic is pinned +/// independent of the (operator-driven) live wire-level round trip. +/// +public sealed class HistoryPagingTests +{ + private static DataValueSnapshot Sample(double v, DateTime src) => + new(v, StatusCodes.Good, src, src); + + // --- IsFullPage ----------------------------------------------------------------------------- + + [Fact] + public void IsFullPage_full_count_with_finite_cap_emits() + { + // Backend returned exactly the cap ⇒ may be more ⇒ page. + HistoryPaging.IsFullPage(returnedCount: 100, numValuesPerNode: 100).ShouldBeTrue(); + } + + [Fact] + public void IsFullPage_short_page_does_not_emit() + { + // Fewer than the cap ⇒ last page ⇒ no continuation point. + HistoryPaging.IsFullPage(returnedCount: 37, numValuesPerNode: 100).ShouldBeFalse(); + } + + [Fact] + public void IsFullPage_unlimited_request_never_emits() + { + // NumValuesPerNode == 0 means "all values, no limit" (OPC UA Part 11) ⇒ never page. + HistoryPaging.IsFullPage(returnedCount: 5000, numValuesPerNode: 0).ShouldBeFalse(); + } + + [Fact] + public void IsFullPage_over_cap_still_emits() + { + // A backend that ignores the cap and over-returns is still treated as "maybe more". + HistoryPaging.IsFullPage(returnedCount: 101, numValuesPerNode: 100).ShouldBeTrue(); + } + + // --- ComputeResumeCursor -------------------------------------------------------------------- + + [Fact] + public void ComputeResumeCursor_distinct_timestamps_resumes_at_last_with_single_skip() + { + var t0 = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc); + var page = new[] + { + Sample(1, t0), + Sample(2, t0.AddSeconds(1)), + Sample(3, t0.AddSeconds(2)), + }; + + HistoryPaging.ComputeResumeCursor(page, out var nextStart, out var skip); + + // Resume from the LAST sample's timestamp, inclusive; exactly one tie at the boundary (itself). + nextStart.ShouldBe(t0.AddSeconds(2)); + skip.ShouldBe(1); + } + + [Fact] + public void ComputeResumeCursor_tied_boundary_counts_all_ties() + { + var t0 = new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc); + var boundary = t0.AddSeconds(5); + var page = new[] + { + Sample(1, t0), + Sample(2, boundary), // tie 1 + Sample(3, boundary), // tie 2 + Sample(4, boundary), // tie 3 (last) + }; + + HistoryPaging.ComputeResumeCursor(page, out var nextStart, out var skip); + + nextStart.ShouldBe(boundary); + skip.ShouldBe(3); // all three boundary ties were emitted ⇒ next page must drop them. + } + + [Fact] + public void ComputeResumeCursor_null_source_timestamp_falls_back_to_min_value() + { + var page = new[] { new DataValueSnapshot(1.0, StatusCodes.Bad, null, DateTime.UtcNow) }; + + HistoryPaging.ComputeResumeCursor(page, out var nextStart, out var skip); + + nextStart.ShouldBe(DateTime.MinValue); + skip.ShouldBe(1); + } + + // --- TrimBoundaryDuplicates ----------------------------------------------------------------- + + [Fact] + public void TrimBoundaryDuplicates_drops_emitted_ties_keeps_the_rest() + { + var b = new DateTime(2026, 1, 1, 0, 0, 5, DateTimeKind.Utc); + // Resumed read started AT the boundary; the first 2 ties were already emitted last page. + var resumed = new[] + { + Sample(10, b), // already-emitted tie (drop) + Sample(11, b), // already-emitted tie (drop) + Sample(12, b), // NEW un-emitted tie at the boundary (keep) + Sample(13, b.AddSeconds(1)), // past the boundary (keep) + }; + + var trimmed = HistoryPaging.TrimBoundaryDuplicates(resumed, b, boundarySkipCount: 2); + + trimmed.Count.ShouldBe(2); + trimmed[0].Value.ShouldBe(12.0); // the previously-skipped tie is NOT lost. + trimmed[1].Value.ShouldBe(13.0); + } + + [Fact] + public void TrimBoundaryDuplicates_zero_skip_is_identity() + { + var b = DateTime.UtcNow; + var resumed = new[] { Sample(1, b), Sample(2, b.AddSeconds(1)) }; + HistoryPaging.TrimBoundaryDuplicates(resumed, b, boundarySkipCount: 0) + .ShouldBeSameAs(resumed); + } + + [Fact] + public void TrimBoundaryDuplicates_only_trims_matching_boundary_timestamps() + { + var b = new DateTime(2026, 1, 1, 0, 0, 5, DateTimeKind.Utc); + // Skip count says 3, but only the first sample matches the boundary ⇒ trim stops at the mismatch. + var resumed = new[] + { + Sample(10, b), + Sample(11, b.AddSeconds(1)), + Sample(12, b.AddSeconds(2)), + }; + + var trimmed = HistoryPaging.TrimBoundaryDuplicates(resumed, b, boundarySkipCount: 3); + + trimmed.Count.ShouldBe(2); + trimmed[0].Value.ShouldBe(11.0); + } + + // --- InMemoryHistoryContinuationStore (mirrors the production session store contract) -------- + + [Fact] + public void Store_round_trips_state_through_opaque_bytes() + { + var store = new InMemoryHistoryContinuationStore(); + var state = new HistoryContinuationState( + HistoryReadKind.Raw, "WW.Tag", new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc), + new DateTime(2026, 1, 2, 0, 0, 0, DateTimeKind.Utc), BoundarySkipCount: 2, + NumValuesPerNode: 100, Aggregate: default, IntervalTicks: 0); + + var cp = store.Save(session: null, state); + + cp.ShouldNotBeNull(); + cp!.Length.ShouldBe(16); // a 16-byte Guid, matching the production store's encoding. + + var taken = store.TryTake(session: null, cp); + taken.ShouldBe(state); + } + + [Fact] + public void Store_take_is_single_use() + { + var store = new InMemoryHistoryContinuationStore(); + var cp = store.Save(null, RawState())!; + + store.TryTake(null, cp).ShouldNotBeNull(); + // Second take of the SAME point misses ⇒ caller surfaces BadContinuationPointInvalid. + store.TryTake(null, cp).ShouldBeNull(); + } + + [Fact] + public void Store_release_drops_the_point() + { + var store = new InMemoryHistoryContinuationStore(); + var cp = store.Save(null, RawState())!; + + store.Release(null, cp); + store.TryTake(null, cp).ShouldBeNull(); + } + + [Fact] + public void Store_malformed_point_is_a_miss() + { + var store = new InMemoryHistoryContinuationStore(); + store.TryTake(null, new byte[] { 1, 2, 3 }).ShouldBeNull(); // not 16 bytes + store.TryTake(null, Array.Empty()).ShouldBeNull(); + } + + [Fact] + public void Store_evicts_oldest_over_capacity() + { + var store = new InMemoryHistoryContinuationStore(capacity: 2); + var cp1 = store.Save(null, RawState())!; + var cp2 = store.Save(null, RawState())!; + var cp3 = store.Save(null, RawState())!; // pushes cp1 out (oldest-eviction) + + store.TryTake(null, cp1).ShouldBeNull(); // evicted + store.TryTake(null, cp2).ShouldNotBeNull(); // retained + store.TryTake(null, cp3).ShouldNotBeNull(); // retained + } + + private static HistoryContinuationState RawState() => new( + HistoryReadKind.Raw, "WW.Tag", DateTime.UtcNow, DateTime.UtcNow.AddHours(1), + BoundarySkipCount: 1, NumValuesPerNode: 50, Aggregate: default, IntervalTicks: 0); +} diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests/NodeManagerHistoryReadPagingTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests/NodeManagerHistoryReadPagingTests.cs new file mode 100644 index 00000000..97fb3ef4 --- /dev/null +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests/NodeManagerHistoryReadPagingTests.cs @@ -0,0 +1,371 @@ +using Microsoft.Extensions.Logging.Abstractions; +using Opc.Ua; +using Opc.Ua.Server; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using HistorianRead = ZB.MOM.WW.OtOpcUa.Core.Abstractions.HistoryReadResult; +using SdkHistoryReadResult = Opc.Ua.HistoryReadResult; + +namespace ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests; + +/// +/// Phase-D server-side HistoryRead continuation-point paging for the Raw arm. Boots a real +/// (as does), injects an +/// (the in-process harness uses a SESSION-LESS +/// OperationContext, so the production session-backed store has no session to bind to — the +/// in-memory store exercises the SAME dispatch path), and a series-backed fake historian, then drives +/// the full multi-page round trip through the node manager's public HistoryRead. +/// +public sealed class NodeManagerHistoryReadPagingTests : IDisposable +{ + private static CancellationToken Ct => TestContext.Current.CancellationToken; + + private readonly string _pkiRoot = Path.Combine( + Path.GetTempPath(), $"otopcua-historypaging-{Guid.NewGuid():N}"); + + private static readonly DateTime Epoch = new(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc); + + /// A full first page emits a continuation point; resuming returns subsequent pages; the + /// final short page clears the continuation point — and the union of pages is the complete series with + /// no duplicate or skipped sample. (Resumed pages net slightly fewer than the cap because each resume + /// re-reads + trims the boundary sample to stay tie-safe — see the design; the union is what matters.) + [Fact] + public async Task Raw_pages_full_series_across_continuation_points() + { + var (host, server) = await BootAsync(); + var nm = server.NodeManager!; + nm.HistoryContinuationStore = new InMemoryHistoryContinuationStore(); + + // 250 distinct-timestamp samples; client caps each page at 100. + var series = MakeSeries(count: 250, stepSeconds: 1); + nm.HistorianDataSource = new SeriesHistorianDataSource(series); + + nm.EnsureVariable("eq-1/pv", parentFolderNodeId: null, displayName: "PV", dataType: "Double", + writable: false, historianTagname: "WW.PV"); + var nodeId = nm.TryGetVariable("eq-1/pv")!.NodeId; + + var collected = new List(); + byte[]? cp = null; + var pageCount = 0; + var sawCp = false; + do + { + var (values, error, nextCp) = ReadRaw(nm, nodeId, start: Epoch, end: Epoch.AddHours(1), + max: 100, inboundCp: cp); + error.StatusCode.Code.ShouldBe(StatusCodes.Good); + collected.AddRange(values); + cp = nextCp; + if (cp is not null) sawCp = true; + pageCount++; + pageCount.ShouldBeLessThan(20, "paging must terminate, not loop"); + } + while (cp is not null); + + sawCp.ShouldBeTrue("a 250-sample series capped at 100 must page"); + pageCount.ShouldBeGreaterThan(1); + + // The union is the exact series — no duplicates, no skips, in order. + collected.Count.ShouldBe(250); + collected.ShouldBe(Enumerable.Range(0, 250).Select(i => (double)i)); + + await host.DisposeAsync(); + } + + /// An exactly-full series whose total is a multiple of the cap pages to a trailing + /// EMPTY page (GoodNoData, no CP) — the server can't know the prior page was the last until it reads + /// past the end. + [Fact] + public async Task Raw_exact_multiple_terminates_with_empty_final_page() + { + var (host, server) = await BootAsync(); + var nm = server.NodeManager!; + nm.HistoryContinuationStore = new InMemoryHistoryContinuationStore(); + nm.HistorianDataSource = new SeriesHistorianDataSource(MakeSeries(count: 100, stepSeconds: 1)); + + nm.EnsureVariable("eq-1/exact", parentFolderNodeId: null, displayName: "Exact", dataType: "Double", + writable: false, historianTagname: "WW.Exact"); + var nodeId = nm.TryGetVariable("eq-1/exact")!.NodeId; + + var (r1, _, cp1) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 100, inboundCp: null); + r1.Count.ShouldBe(100); + cp1.ShouldNotBeNull("a full page emits a CP even when it happens to be the last"); + + var (r2, e2, cp2) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 100, inboundCp: cp1); + r2.Count.ShouldBe(0); + e2.StatusCode.Code.ShouldBe(StatusCodes.Good); + cp2.ShouldBeNull("the empty trailing page terminates paging"); + + await host.DisposeAsync(); + } + + /// Boundary timestamps that TIE across the page split are neither duplicated nor skipped: + /// the resume cursor drops the already-emitted ties and re-reads the un-emitted ones at the same + /// timestamp. + [Fact] + public async Task Raw_paging_dedups_tied_boundary_timestamps() + { + var (host, server) = await BootAsync(); + var nm = server.NodeManager!; + nm.HistoryContinuationStore = new InMemoryHistoryContinuationStore(); + + // 5 samples: indices 2,3,4 all share the SAME timestamp (Epoch+2s). With cap 4, page 1 returns + // [0,1,2,3] (cutting through the tie cluster); page 2 must return [4] (the un-emitted tie) only. + var ts = new[] + { + Epoch, // 0 + Epoch.AddSeconds(1), // 1 + Epoch.AddSeconds(2), // 2 ┐ + Epoch.AddSeconds(2), // 3 │ tied at Epoch+2s + Epoch.AddSeconds(2), // 4 ┘ + }; + var series = ts.Select((t, i) => new DataValueSnapshot((double)i, StatusCodes.Good, t, t)).ToArray(); + nm.HistorianDataSource = new SeriesHistorianDataSource(series); + + nm.EnsureVariable("eq-1/tie", parentFolderNodeId: null, displayName: "Tie", dataType: "Double", + writable: false, historianTagname: "WW.Tie"); + var nodeId = nm.TryGetVariable("eq-1/tie")!.NodeId; + + var collected = new List(); + + var (r1, _, cp1) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 4, inboundCp: null); + r1.Count.ShouldBe(4); + r1.ShouldBe(new[] { 0.0, 1.0, 2.0, 3.0 }); + cp1.ShouldNotBeNull(); + collected.AddRange(r1); + + var (r2, _, cp2) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 4, inboundCp: cp1); + // Only the un-emitted tie (index 4) — NOT the already-emitted 2 and 3. + r2.ShouldBe(new[] { 4.0 }); + cp2.ShouldBeNull(); + collected.AddRange(r2); + + // Exactly the 5 distinct samples, once each. + collected.ShouldBe(new[] { 0.0, 1.0, 2.0, 3.0, 4.0 }); + + await host.DisposeAsync(); + } + + /// NumValuesPerNode == 0 ("all values") never pages — the whole series returns in one shot + /// with a null continuation point. + [Fact] + public async Task Raw_unlimited_request_returns_everything_without_a_cp() + { + var (host, server) = await BootAsync(); + var nm = server.NodeManager!; + nm.HistoryContinuationStore = new InMemoryHistoryContinuationStore(); + nm.HistorianDataSource = new SeriesHistorianDataSource(MakeSeries(count: 250, stepSeconds: 1)); + + nm.EnsureVariable("eq-1/all", parentFolderNodeId: null, displayName: "All", dataType: "Double", + writable: false, historianTagname: "WW.All"); + var nodeId = nm.TryGetVariable("eq-1/all")!.NodeId; + + var (r, e, cp) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 0, inboundCp: null); + + e.StatusCode.Code.ShouldBe(StatusCodes.Good); + r.Count.ShouldBe(250); + cp.ShouldBeNull("an unlimited (max==0) request must not page"); + + await host.DisposeAsync(); + } + + /// An inbound continuation point the store doesn't know (released / never issued / from + /// another node) yields BadContinuationPointInvalid for that node, and the source is NOT read. + [Fact] + public async Task Raw_unknown_continuation_point_yields_BadContinuationPointInvalid() + { + var (host, server) = await BootAsync(); + var nm = server.NodeManager!; + nm.HistoryContinuationStore = new InMemoryHistoryContinuationStore(); + var fake = new SeriesHistorianDataSource(MakeSeries(count: 10, stepSeconds: 1)); + nm.HistorianDataSource = fake; + + nm.EnsureVariable("eq-1/bad-cp", parentFolderNodeId: null, displayName: "BadCp", dataType: "Double", + writable: false, historianTagname: "WW.BadCp"); + var nodeId = nm.TryGetVariable("eq-1/bad-cp")!.NodeId; + + fake.ResetReadCount(); + var bogus = Guid.NewGuid().ToByteArray(); + var (_, e, cp) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 100, inboundCp: bogus); + + e.StatusCode.Code.ShouldBe(StatusCodes.BadContinuationPointInvalid); + cp.ShouldBeNull(); + fake.ReadCount.ShouldBe(0); // never reached the backend. + + await host.DisposeAsync(); + } + + /// releaseContinuationPoints drops the stored cursor WITHOUT reading data: a subsequent + /// resume of the released point then misses (BadContinuationPointInvalid). + [Fact] + public async Task Release_drops_the_cursor_without_reading() + { + var (host, server) = await BootAsync(); + var nm = server.NodeManager!; + nm.HistoryContinuationStore = new InMemoryHistoryContinuationStore(); + var fake = new SeriesHistorianDataSource(MakeSeries(count: 250, stepSeconds: 1)); + nm.HistorianDataSource = fake; + + nm.EnsureVariable("eq-1/rel", parentFolderNodeId: null, displayName: "Rel", dataType: "Double", + writable: false, historianTagname: "WW.Rel"); + var nodeId = nm.TryGetVariable("eq-1/rel")!.NodeId; + + // Page 1 — get a CP. + var (_, _, cp1) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 100, inboundCp: null); + cp1.ShouldNotBeNull(); + + // Release it — the dispatcher routes releaseContinuationPoints=true to HistoryReleaseContinuationPoints, + // which never reaches the Raw arm. No data is read. + fake.ResetReadCount(); + InvokeRelease(nm, nodeId, cp1!); + fake.ReadCount.ShouldBe(0); + + // Resuming the released point now misses. + var (_, e, _) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 100, inboundCp: cp1); + e.StatusCode.Code.ShouldBe(StatusCodes.BadContinuationPointInvalid); + + await host.DisposeAsync(); + } + + // --- helpers -------------------------------------------------------------------------------- + + /// Issue a single-node Raw HistoryRead and return that node's decoded sample values, error, + /// and outbound continuation point. + private static (IReadOnlyList Values, ServiceResult Error, byte[]? Cp) ReadRaw( + OtOpcUaNodeManager nm, NodeId nodeId, DateTime start, DateTime end, uint max, byte[]? inboundCp) + { + var context = new OperationContext( + new RequestHeader(), secureChannelContext: null, RequestType.HistoryRead, identity: null); + + var details = new ReadRawModifiedDetails + { + StartTime = start, + EndTime = end, + NumValuesPerNode = max, + IsReadModified = false, + }; + + var nodesToRead = new List + { + new() { NodeId = nodeId, ContinuationPoint = inboundCp }, + }; + var results = new List { null! }; + var errors = new List { null! }; + + nm.HistoryRead(context, details, TimestampsToReturn.Both, + releaseContinuationPoints: false, nodesToRead, results, errors); + + var values = new List(); + if (results[0]?.HistoryData is { } ho && !ExtensionObject.IsNull(ho)) + { + var data = (HistoryData)ExtensionObject.ToEncodeable(ho); + foreach (var dv in data.DataValues) values.Add((double)dv.Value); + } + + return (values, errors[0], results[0]?.ContinuationPoint); + } + + /// Issue a release-only HistoryRead (releaseContinuationPoints=true) for the node + point. + private static void InvokeRelease(OtOpcUaNodeManager nm, NodeId nodeId, byte[] cp) + { + var context = new OperationContext( + new RequestHeader(), secureChannelContext: null, RequestType.HistoryRead, identity: null); + var details = new ReadRawModifiedDetails + { + StartTime = Epoch, EndTime = Epoch.AddHours(1), NumValuesPerNode = 100, IsReadModified = false, + }; + var nodesToRead = new List { new() { NodeId = nodeId, ContinuationPoint = cp } }; + var results = new List { null! }; + var errors = new List { null! }; + + nm.HistoryRead(context, details, TimestampsToReturn.Both, + releaseContinuationPoints: true, nodesToRead, results, errors); + } + + private static DataValueSnapshot[] MakeSeries(int count, int stepSeconds) => + Enumerable.Range(0, count) + .Select(i => + { + var t = Epoch.AddSeconds((long)i * stepSeconds); + return new DataValueSnapshot((double)i, StatusCodes.Good, t, t); + }) + .ToArray(); + + /// A series-backed fake historian: holds a full sorted series and serves a Raw read by + /// returning the samples in [start, end] capped at maxValuesPerNode (start inclusive — exactly the + /// resume-cursor contract). Processed / AtTime / Events are not exercised here. + private sealed class SeriesHistorianDataSource(IReadOnlyList series) : IHistorianDataSource + { + private int _readCount; + public int ReadCount => _readCount; + public void ResetReadCount() => _readCount = 0; + + public Task ReadRawAsync( + string fullReference, DateTime startUtc, DateTime endUtc, uint maxValuesPerNode, + CancellationToken cancellationToken) + { + Interlocked.Increment(ref _readCount); + var window = series + .Where(s => (s.SourceTimestampUtc ?? DateTime.MinValue) >= startUtc + && (s.SourceTimestampUtc ?? DateTime.MinValue) <= endUtc) + .ToList(); + // maxValuesPerNode == 0 means "no limit". + var page = maxValuesPerNode == 0 ? window : window.Take((int)maxValuesPerNode).ToList(); + return Task.FromResult(new HistorianRead(page, null)); + } + + public Task ReadProcessedAsync( + string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval, + HistoryAggregateType aggregate, CancellationToken cancellationToken) => + Task.FromResult(new HistorianRead(Array.Empty(), null)); + + public Task ReadAtTimeAsync( + string fullReference, IReadOnlyList timestampsUtc, CancellationToken cancellationToken) => + Task.FromResult(new HistorianRead(Array.Empty(), null)); + + public Task ReadEventsAsync( + string? sourceName, DateTime startUtc, DateTime endUtc, int maxEvents, + CancellationToken cancellationToken) => + Task.FromResult(new HistoricalEventsResult(Array.Empty(), null)); + + public HistorianHealthSnapshot GetHealthSnapshot() => NullHistorianDataSource.Instance.GetHealthSnapshot(); + + public void Dispose() { } + } + + private async Task<(OpcUaApplicationHost Host, OtOpcUaSdkServer Server)> BootAsync() + { + var host = new OpcUaApplicationHost( + new OpcUaApplicationHostOptions + { + ApplicationName = "OtOpcUa.HistoryPagingTest", + ApplicationUri = $"urn:OtOpcUa.HistoryPagingTest:{Guid.NewGuid():N}", + OpcUaPort = AllocateFreePort(), + PublicHostname = "localhost", + PkiStoreRoot = _pkiRoot, + }, + NullLogger.Instance); + + var server = new OtOpcUaSdkServer(); + await host.StartAsync(server, Ct); + return (host, server); + } + + private static int AllocateFreePort() + { + using var listener = new System.Net.Sockets.TcpListener(System.Net.IPAddress.Loopback, 0); + listener.Start(); + var port = ((System.Net.IPEndPoint)listener.LocalEndpoint).Port; + listener.Stop(); + return port; + } + + public void Dispose() + { + if (Directory.Exists(_pkiRoot)) + { + try { Directory.Delete(_pkiRoot, recursive: true); } + catch { /* best-effort cleanup */ } + } + } +}