fix(historian): address code review on Raw HistoryRead paging
C1 (critical): a boundary tie cluster larger than NumValuesPerNode could silently truncate a resumed read to GoodNoData, permanently dropping the un-emitted ties — the (timestamp, skip) cursor cannot advance past a single timestamp the fixed-(start,end,cap) backend keeps re-returning. Now detected and failed LOUDLY per node with BadHistoryOperationUnsupported + a log naming the tag/timestamp/cap; documented in Historian.md with the larger-cap remedy. Regression test Raw_tie_cluster_larger_than_page_fails_loudly_not_silently. I3: build HistoryData before Save() so a projection failure can never orphan a stored continuation cursor. N1 (YAGNI): drop the never-produced HistoryReadKind enum + Processed-only Aggregate/IntervalTicks fields from HistoryContinuationState — only Raw pages. N3: ComputeResumeCursor guards its documented non-empty precondition. I1: document InMemoryHistoryContinuationStore's eventual-consistency (test double). Build clean, 182/182 OpcUaServer tests pass.
This commit is contained in:
@@ -137,6 +137,17 @@ paging time-based:
|
|||||||
SourceTimestamp *inclusive* and drops the boundary samples already emitted, so samples sharing
|
SourceTimestamp *inclusive* and drops the boundary samples already emitted, so samples sharing
|
||||||
the boundary timestamp are neither duplicated nor skipped.
|
the boundary timestamp are neither duplicated nor skipped.
|
||||||
|
|
||||||
|
> **Paging limitation — oversized tie clusters.** The tie-safe cursor is a `(timestamp, skip)`
|
||||||
|
> pair, and the single-shot backend only accepts `(start, end, cap)` — it cannot skip. So if **more
|
||||||
|
> samples share one `SourceTimestamp` than `NumValuesPerNode`** (a tie cluster larger than the page
|
||||||
|
> cap), the cursor cannot advance past that timestamp: every resume re-reads the same first `cap`
|
||||||
|
> ties. Rather than silently truncate the read to `GoodNoData` (which would permanently drop the
|
||||||
|
> un-emitted ties), the resume read fails that node **loudly** with
|
||||||
|
> `BadHistoryOperationUnsupported` and logs the tag + timestamp + cap. The operator's remedy is to
|
||||||
|
> re-issue the read with a larger `NumValuesPerNode`. For a single tag's raw history this is a data
|
||||||
|
> anomaly (raw samples normally carry strictly increasing distinct timestamps); a fully cursor-based
|
||||||
|
> fix that pages *within* a single timestamp is a possible follow-up.
|
||||||
|
|
||||||
Continuation points are bound to the OPC UA session (the SDK's
|
Continuation points are bound to the OPC UA session (the SDK's
|
||||||
`ServerConfiguration.MaxHistoryContinuationPoints` cap, default 100, with oldest-eviction; points
|
`ServerConfiguration.MaxHistoryContinuationPoints` cap, default 100, with oldest-eviction; points
|
||||||
are disposed when the session closes). Resuming an unknown / evicted / released point returns
|
are disposed when the session closes). Resuming an unknown / evicted / released point returns
|
||||||
|
|||||||
@@ -105,7 +105,11 @@ internal sealed class InMemoryHistoryContinuationStore(int capacity = 100) : IHi
|
|||||||
{
|
{
|
||||||
private readonly object _gate = new();
|
private readonly object _gate = new();
|
||||||
private readonly Dictionary<Guid, HistoryContinuationState> _states = new();
|
private readonly Dictionary<Guid, HistoryContinuationState> _states = new();
|
||||||
// Insertion order, so we can evict the OLDEST when over capacity (matches the SDK store).
|
// Insertion order, so we can evict the OLDEST when over capacity (matches the SDK store). Eventually
|
||||||
|
// consistent: a Guid taken/released stays in _order until the eviction loop reaches it and finds it
|
||||||
|
// already gone from _states (a harmless no-op dequeue). _states is the source of truth for liveness;
|
||||||
|
// _order only ever over-approximates it, so eviction never drops a LIVE entry early. Fine for a
|
||||||
|
// bounded test double — production uses the SDK's own per-session store, not this class.
|
||||||
private readonly Queue<Guid> _order = new();
|
private readonly Queue<Guid> _order = new();
|
||||||
private readonly int _capacity = capacity < 1 ? 1 : capacity;
|
private readonly int _capacity = capacity < 1 ? 1 : capacity;
|
||||||
|
|
||||||
|
|||||||
@@ -3,25 +3,12 @@ using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
|||||||
namespace ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
namespace ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// The kind of variable-history read a continuation point resumes. Only the two count-capped,
|
/// The server-side resume state stored behind an opaque continuation point for a single paged
|
||||||
/// time-range arms page server-side (see <see cref="HistoryPaging"/>); AtTime is single-shot
|
/// HistoryRead-Raw read. Captures exactly enough to continue the SAME logical read from where the
|
||||||
/// (no client count cap, so there is never a "full page" signal to page on) and never produces a
|
/// previous page stopped: the tagname, the original (inclusive) end of the window, the next start of
|
||||||
/// continuation point, so it has no entry here.
|
/// the window, and the tie-safe boundary skip. (Only Raw pages server-side — Processed and AtTime
|
||||||
/// </summary>
|
/// carry no client count cap, so they are single-shot and never produce a continuation point; see
|
||||||
internal enum HistoryReadKind
|
/// <see cref="HistoryPaging"/>.)
|
||||||
{
|
|
||||||
/// <summary>HistoryRead-Raw — resumes via <see cref="IHistorianDataSource.ReadRawAsync"/>.</summary>
|
|
||||||
Raw,
|
|
||||||
|
|
||||||
/// <summary>HistoryRead-Processed — resumes via <see cref="IHistorianDataSource.ReadProcessedAsync"/>.</summary>
|
|
||||||
Processed,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// The server-side resume state stored behind an opaque continuation point for a single
|
|
||||||
/// paged variable-history read. Captures exactly enough to continue the SAME logical read from
|
|
||||||
/// where the previous page stopped: the read kind + tagname, the original (inclusive) end of the
|
|
||||||
/// window, the next start of the window, and — for Processed — the aggregate + interval.
|
|
||||||
/// <para>
|
/// <para>
|
||||||
/// The boundary fields (<see cref="NextStartUtc"/> + <see cref="BoundarySkipCount"/>) encode a
|
/// The boundary fields (<see cref="NextStartUtc"/> + <see cref="BoundarySkipCount"/>) encode a
|
||||||
/// tie-safe resume cursor: the next page reads from <see cref="NextStartUtc"/> INCLUSIVE and
|
/// tie-safe resume cursor: the next page reads from <see cref="NextStartUtc"/> INCLUSIVE and
|
||||||
@@ -34,7 +21,6 @@ internal enum HistoryReadKind
|
|||||||
/// cheap value that unit tests can drive directly.
|
/// cheap value that unit tests can drive directly.
|
||||||
/// </para>
|
/// </para>
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <param name="Kind">Which variable-history arm this state resumes.</param>
|
|
||||||
/// <param name="Tagname">The resolved historian tagname (NOT the NodeId) to read from.</param>
|
/// <param name="Tagname">The resolved historian tagname (NOT the NodeId) to read from.</param>
|
||||||
/// <param name="NextStartUtc">
|
/// <param name="NextStartUtc">
|
||||||
/// Inclusive lower bound for the next page — the boundary timestamp the previous page stopped on.
|
/// Inclusive lower bound for the next page — the boundary timestamp the previous page stopped on.
|
||||||
@@ -45,17 +31,12 @@ internal enum HistoryReadKind
|
|||||||
/// prior pages and must be dropped from the head of the next page (tie de-dup).
|
/// prior pages and must be dropped from the head of the next page (tie de-dup).
|
||||||
/// </param>
|
/// </param>
|
||||||
/// <param name="NumValuesPerNode">The client's per-page cap; re-applied to every resumed page.</param>
|
/// <param name="NumValuesPerNode">The client's per-page cap; re-applied to every resumed page.</param>
|
||||||
/// <param name="Aggregate">The aggregate for a Processed read; ignored for Raw.</param>
|
|
||||||
/// <param name="IntervalTicks">The Processed bucketing interval in ticks; ignored for Raw.</param>
|
|
||||||
internal sealed record HistoryContinuationState(
|
internal sealed record HistoryContinuationState(
|
||||||
HistoryReadKind Kind,
|
|
||||||
string Tagname,
|
string Tagname,
|
||||||
DateTime NextStartUtc,
|
DateTime NextStartUtc,
|
||||||
DateTime EndUtc,
|
DateTime EndUtc,
|
||||||
int BoundarySkipCount,
|
int BoundarySkipCount,
|
||||||
uint NumValuesPerNode,
|
uint NumValuesPerNode);
|
||||||
HistoryAggregateType Aggregate,
|
|
||||||
long IntervalTicks);
|
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Pure server-side continuation-point paging decisions for the count-capped variable-history arms
|
/// Pure server-side continuation-point paging decisions for the count-capped variable-history arms
|
||||||
@@ -111,6 +92,11 @@ internal static class HistoryPaging
|
|||||||
out DateTime nextStartUtc,
|
out DateTime nextStartUtc,
|
||||||
out int boundarySkipCount)
|
out int boundarySkipCount)
|
||||||
{
|
{
|
||||||
|
// Enforce the documented precondition at the API boundary rather than relying on the caller's guard
|
||||||
|
// (the only caller only pages a full, non-empty page, but this is a public static helper).
|
||||||
|
if (page.Count == 0)
|
||||||
|
throw new ArgumentOutOfRangeException(nameof(page), "ComputeResumeCursor requires a non-empty page.");
|
||||||
|
|
||||||
// The boundary is the last returned sample's SourceTimestamp. A sample whose SourceTimestamp is
|
// The boundary is the last returned sample's SourceTimestamp. A sample whose SourceTimestamp is
|
||||||
// null (Bad/unset) cannot anchor a time cursor; fall back to MinValue so the next read covers the
|
// null (Bad/unset) cannot anchor a time cursor; fall back to MinValue so the next read covers the
|
||||||
// whole remaining window rather than silently dropping data — duplicates are then de-duped by the
|
// whole remaining window rather than silently dropping data — duplicates are then de-duped by the
|
||||||
|
|||||||
@@ -1751,28 +1751,53 @@ public sealed class OtOpcUaNodeManager : CustomNodeManager2
|
|||||||
.ReadRawAsync(tagname, startUtc, endUtc, numValuesPerNode, CancellationToken.None)
|
.ReadRawAsync(tagname, startUtc, endUtc, numValuesPerNode, CancellationToken.None)
|
||||||
.GetAwaiter().GetResult();
|
.GetAwaiter().GetResult();
|
||||||
|
|
||||||
|
var backendFull = HistoryPaging.IsFullPage(sourceResult.Samples.Count, numValuesPerNode);
|
||||||
|
|
||||||
// On a resume read, drop the boundary ties already returned on the prior page.
|
// On a resume read, drop the boundary ties already returned on the prior page.
|
||||||
var samples = inboundCp is { Length: > 0 }
|
var samples = inboundCp is { Length: > 0 }
|
||||||
? HistoryPaging.TrimBoundaryDuplicates(sourceResult.Samples, startUtc, boundarySkip)
|
? HistoryPaging.TrimBoundaryDuplicates(sourceResult.Samples, startUtc, boundarySkip)
|
||||||
: sourceResult.Samples;
|
: sourceResult.Samples;
|
||||||
|
|
||||||
|
// Degenerate tie cluster: a resume read returned a FULL backend page that the boundary-tie trim
|
||||||
|
// emptied entirely. That can only happen when more than NumValuesPerNode samples share the resume
|
||||||
|
// boundary timestamp — a tie cluster larger than the page cap. The fixed-(start,end,cap) backend
|
||||||
|
// can only ever return the first `cap` of those ties, so a (timestamp, skip) cursor can never
|
||||||
|
// advance past the cluster. Fail LOUDLY for this node rather than silently truncate to GoodNoData
|
||||||
|
// (which would permanently drop the un-emitted ties). The operator's remedy is a larger
|
||||||
|
// NumValuesPerNode; see docs/Historian.md "Paging limitation".
|
||||||
|
if (inboundCp is { Length: > 0 } && backendFull && samples.Count == 0)
|
||||||
|
{
|
||||||
|
#pragma warning disable CS0618 // Type or member is obsolete
|
||||||
|
Utils.LogError(
|
||||||
|
"OtOpcUaNodeManager: HistoryReadRaw paging stalled — tie cluster at {0:O} for tag '{1}' " +
|
||||||
|
"exceeds NumValuesPerNode={2}; cannot page past it. Increase NumValuesPerNode.",
|
||||||
|
startUtc, tagname, numValuesPerNode);
|
||||||
|
#pragma warning restore CS0618
|
||||||
|
errors[handle.Index] = StatusCodes.BadHistoryOperationUnsupported;
|
||||||
|
results[handle.Index] = new SdkHistoryReadResult { StatusCode = StatusCodes.BadHistoryOperationUnsupported };
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// The "full page" test is against the RAW backend count (before trimming): the backend honoured
|
// The "full page" test is against the RAW backend count (before trimming): the backend honoured
|
||||||
// the cap, so a full backend page ⇒ there may be more even if we trimmed some boundary ties.
|
// the cap, so a full backend page ⇒ there may be more even if we trimmed some boundary ties.
|
||||||
|
var historyData = ToHistoryDataFromSamples(samples);
|
||||||
|
|
||||||
byte[]? outboundCp = null;
|
byte[]? outboundCp = null;
|
||||||
if (HistoryPaging.IsFullPage(sourceResult.Samples.Count, numValuesPerNode) && samples.Count > 0)
|
if (backendFull && samples.Count > 0)
|
||||||
{
|
{
|
||||||
HistoryPaging.ComputeResumeCursor(samples, out var nextStart, out var skip);
|
HistoryPaging.ComputeResumeCursor(samples, out var nextStart, out var skip);
|
||||||
var nextState = new HistoryContinuationState(
|
var nextState = new HistoryContinuationState(
|
||||||
HistoryReadKind.Raw, tagname, nextStart, endUtc, skip, numValuesPerNode,
|
tagname, nextStart, endUtc, skip, numValuesPerNode);
|
||||||
Aggregate: default, IntervalTicks: 0);
|
|
||||||
// Save may return null (no session on this request) ⇒ degrade to single-shot for this node.
|
// Save may return null (no session on this request) ⇒ degrade to single-shot for this node.
|
||||||
|
// Built AFTER historyData so a failure projecting samples can never orphan a stored cursor.
|
||||||
outboundCp = _historyContinuationStore.Save(session, nextState);
|
outboundCp = _historyContinuationStore.Save(session, nextState);
|
||||||
}
|
}
|
||||||
|
|
||||||
var historyData = ToHistoryDataFromSamples(samples);
|
|
||||||
results[handle.Index] = new SdkHistoryReadResult
|
results[handle.Index] = new SdkHistoryReadResult
|
||||||
{
|
{
|
||||||
// No samples ⇒ GoodNoData (the node is historized, the window just held no data).
|
// No samples ⇒ GoodNoData (the node is historized, the window just held no data). With the
|
||||||
|
// degenerate-cluster guard above, a resumed empty page now only means the window/cluster is
|
||||||
|
// genuinely drained — never silent data loss.
|
||||||
StatusCode = samples.Count == 0 ? StatusCodes.GoodNoData : StatusCodes.Good,
|
StatusCode = samples.Count == 0 ? StatusCodes.GoodNoData : StatusCodes.Good,
|
||||||
HistoryData = new ExtensionObject(historyData),
|
HistoryData = new ExtensionObject(historyData),
|
||||||
ContinuationPoint = outboundCp,
|
ContinuationPoint = outboundCp,
|
||||||
|
|||||||
@@ -153,9 +153,9 @@ public sealed class HistoryPagingTests
|
|||||||
{
|
{
|
||||||
var store = new InMemoryHistoryContinuationStore();
|
var store = new InMemoryHistoryContinuationStore();
|
||||||
var state = new HistoryContinuationState(
|
var state = new HistoryContinuationState(
|
||||||
HistoryReadKind.Raw, "WW.Tag", new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc),
|
"WW.Tag", new DateTime(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc),
|
||||||
new DateTime(2026, 1, 2, 0, 0, 0, DateTimeKind.Utc), BoundarySkipCount: 2,
|
new DateTime(2026, 1, 2, 0, 0, 0, DateTimeKind.Utc), BoundarySkipCount: 2,
|
||||||
NumValuesPerNode: 100, Aggregate: default, IntervalTicks: 0);
|
NumValuesPerNode: 100);
|
||||||
|
|
||||||
var cp = store.Save(session: null, state);
|
var cp = store.Save(session: null, state);
|
||||||
|
|
||||||
@@ -209,6 +209,6 @@ public sealed class HistoryPagingTests
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static HistoryContinuationState RawState() => new(
|
private static HistoryContinuationState RawState() => new(
|
||||||
HistoryReadKind.Raw, "WW.Tag", DateTime.UtcNow, DateTime.UtcNow.AddHours(1),
|
"WW.Tag", DateTime.UtcNow, DateTime.UtcNow.AddHours(1),
|
||||||
BoundarySkipCount: 1, NumValuesPerNode: 50, Aggregate: default, IntervalTicks: 0);
|
BoundarySkipCount: 1, NumValuesPerNode: 50);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -146,6 +146,45 @@ public sealed class NodeManagerHistoryReadPagingTests : IDisposable
|
|||||||
await host.DisposeAsync();
|
await host.DisposeAsync();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>Degenerate tie cluster larger than the page cap: a single timestamp carrying MORE ties
|
||||||
|
/// than <c>NumValuesPerNode</c> cannot be paged past by a (timestamp, skip) cursor — the fixed-(start,
|
||||||
|
/// end,cap) backend keeps returning the same first <c>cap</c> ties. Rather than silently truncate to
|
||||||
|
/// GoodNoData (permanently dropping the un-emitted ties), the resume read FAILS LOUDLY for that node
|
||||||
|
/// with <c>BadHistoryOperationUnsupported</c>. (Regression for the data-loss path the carry-offset
|
||||||
|
/// cursor cannot resolve; the operator's remedy is a larger NumValuesPerNode.)</summary>
|
||||||
|
[Fact]
|
||||||
|
public async Task Raw_tie_cluster_larger_than_page_fails_loudly_not_silently()
|
||||||
|
{
|
||||||
|
var (host, server) = await BootAsync();
|
||||||
|
var nm = server.NodeManager!;
|
||||||
|
nm.HistoryContinuationStore = new InMemoryHistoryContinuationStore();
|
||||||
|
|
||||||
|
// 6 samples ALL sharing one timestamp (Epoch+2s) — a tie cluster of 6 with a page cap of 4.
|
||||||
|
var t = Epoch.AddSeconds(2);
|
||||||
|
var series = Enumerable.Range(0, 6)
|
||||||
|
.Select(i => new DataValueSnapshot((double)i, StatusCodes.Good, t, t)).ToArray();
|
||||||
|
nm.HistorianDataSource = new SeriesHistorianDataSource(series);
|
||||||
|
|
||||||
|
nm.EnsureVariable("eq-1/burst", parentFolderNodeId: null, displayName: "Burst", dataType: "Double",
|
||||||
|
writable: false, historianTagname: "WW.Burst");
|
||||||
|
var nodeId = nm.TryGetVariable("eq-1/burst")!.NodeId;
|
||||||
|
|
||||||
|
// Page 1: a full page of the first 4 ties, with a continuation point.
|
||||||
|
var (r1, e1, cp1) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 4, inboundCp: null);
|
||||||
|
e1.StatusCode.Code.ShouldBe(StatusCodes.Good);
|
||||||
|
r1.ShouldBe(new[] { 0.0, 1.0, 2.0, 3.0 });
|
||||||
|
cp1.ShouldNotBeNull();
|
||||||
|
|
||||||
|
// Page 2: the cursor cannot advance past the oversized cluster ⇒ a clear error, NOT a silent
|
||||||
|
// GoodNoData that would drop samples 4 and 5.
|
||||||
|
var (r2, e2, cp2) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 4, inboundCp: cp1);
|
||||||
|
e2.StatusCode.Code.ShouldBe(StatusCodes.BadHistoryOperationUnsupported);
|
||||||
|
r2.ShouldBeEmpty();
|
||||||
|
cp2.ShouldBeNull();
|
||||||
|
|
||||||
|
await host.DisposeAsync();
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>NumValuesPerNode == 0 ("all values") never pages — the whole series returns in one shot
|
/// <summary>NumValuesPerNode == 0 ("all values") never pages — the whole series returns in one shot
|
||||||
/// with a null continuation point.</summary>
|
/// with a null continuation point.</summary>
|
||||||
[Fact]
|
[Fact]
|
||||||
|
|||||||
Reference in New Issue
Block a user