feat(historian): page within oversized tie clusters (#400) instead of loud-failing
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
using Akka.Actor;
|
||||
using Akka.Cluster.Tools.PublishSubscribe;
|
||||
using Akka.Hosting;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
@@ -10,6 +11,7 @@ using ZB.MOM.WW.OtOpcUa.OpcUaServer;
|
||||
using ZB.MOM.WW.OtOpcUa.OpcUaServer.Security;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.Drivers;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.Historian;
|
||||
using ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Host.OpcUa;
|
||||
@@ -32,6 +34,7 @@ public sealed class OtOpcUaServerHostedService : IHostedService, IAsyncDisposabl
|
||||
private readonly DeferredServiceLevelPublisher _deferredServiceLevel;
|
||||
private readonly IOpcUaUserAuthenticator _userAuthenticator;
|
||||
private readonly IHistorianDataSource _historianDataSource;
|
||||
private readonly ServerHistorianOptions _serverHistorianOptions;
|
||||
private readonly Func<ActorSystem> _actorSystemAccessor;
|
||||
private readonly ActorRegistry _actorRegistry;
|
||||
private readonly ILoggerFactory _loggerFactory;
|
||||
@@ -52,6 +55,11 @@ public sealed class OtOpcUaServerHostedService : IHostedService, IAsyncDisposabl
|
||||
/// node, the same source the address-space sink + node-write gateway come from), or the configured
|
||||
/// Wonderware read client when <c>AddServerHistorian</c> enabled it. Wired onto the node manager in
|
||||
/// <see cref="StartAsync"/>.</param>
|
||||
/// <param name="configuration">App configuration; the <c>ServerHistorian</c> section is bound here to
|
||||
/// read <see cref="ServerHistorianOptions.MaxTieClusterOverfetch"/> for the node manager. Bound directly
|
||||
/// (not via <c>IOptions</c>) so the wiring is self-contained in this hosted service — the option's
|
||||
/// default survives when the section is absent. The companion <c>AddServerHistorian</c> registration in
|
||||
/// <c>Program.cs</c> reads the same section for the read-client gating; this is the read-path only.</param>
|
||||
/// <param name="actorSystemAccessor">Lazy accessor for the running <see cref="ActorSystem"/>, used to
|
||||
/// resolve the DistributedPubSub mediator the inbound alarm-command router publishes through. Resolved
|
||||
/// lazily (mirroring <c>DpsScriptLogPublisher</c>) so construction never races Akka startup.</param>
|
||||
@@ -65,6 +73,7 @@ public sealed class OtOpcUaServerHostedService : IHostedService, IAsyncDisposabl
|
||||
DeferredServiceLevelPublisher deferredServiceLevel,
|
||||
IOpcUaUserAuthenticator userAuthenticator,
|
||||
IHistorianDataSource historianDataSource,
|
||||
IConfiguration configuration,
|
||||
Func<ActorSystem> actorSystemAccessor,
|
||||
ActorRegistry actorRegistry,
|
||||
ILoggerFactory loggerFactory)
|
||||
@@ -74,6 +83,11 @@ public sealed class OtOpcUaServerHostedService : IHostedService, IAsyncDisposabl
|
||||
_deferredServiceLevel = deferredServiceLevel;
|
||||
_userAuthenticator = userAuthenticator;
|
||||
_historianDataSource = historianDataSource;
|
||||
// Bind the ServerHistorian section directly (defaults survive when it's absent) — we only need
|
||||
// MaxTieClusterOverfetch here, and binding it ourselves keeps the wiring in this one file.
|
||||
_serverHistorianOptions =
|
||||
configuration.GetSection(ServerHistorianOptions.SectionName).Get<ServerHistorianOptions>()
|
||||
?? new ServerHistorianOptions();
|
||||
_actorSystemAccessor = actorSystemAccessor;
|
||||
_actorRegistry = actorRegistry;
|
||||
_loggerFactory = loggerFactory;
|
||||
@@ -201,6 +215,12 @@ public sealed class OtOpcUaServerHostedService : IHostedService, IAsyncDisposabl
|
||||
// The node manager's HistoryRead overrides block-bridge to whatever source is set here.
|
||||
_server.SetHistorianDataSource(_historianDataSource);
|
||||
|
||||
// Propagate the configured tie-cluster over-fetch bound onto the node manager so HistoryRead-Raw
|
||||
// can page WITHIN an oversized tie cluster (more samples sharing one SourceTimestamp than the page
|
||||
// cap) rather than failing the read. The option's default (65536) survives when the ServerHistorian
|
||||
// section is absent; NodeManager is non-null here (guarded above).
|
||||
_server.NodeManager.MaxTieClusterOverfetch = _serverHistorianOptions.MaxTieClusterOverfetch;
|
||||
|
||||
// ServiceLevel publisher needs IServerInternal — only available after Start.
|
||||
if (_server.CurrentInstance is { } serverInternal)
|
||||
{
|
||||
|
||||
@@ -150,4 +150,68 @@ internal static class HistoryPaging
|
||||
for (var j = i; j < resumedPage.Count; j++) trimmed.Add(resumedPage[j]);
|
||||
return trimmed;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Page WITHIN a single oversized "tie cluster" — a set of raw samples that all share one
|
||||
/// SourceTimestamp <paramref name="boundaryT"/> and is larger than the client's per-page cap.
|
||||
/// <para>
|
||||
/// The fixed-<c>(start, end, cap)</c> historian backend cannot skip/offset, so an oversized
|
||||
/// tie cluster defeats the (timestamp, skip) resume cursor that <see cref="ComputeResumeCursor"/>
|
||||
/// builds: every resume re-reads the first <c>cap</c> ties, the boundary-tie trim empties the
|
||||
/// page, and the cursor never advances. To page past it, the caller over-fetches the WHOLE
|
||||
/// cluster (a bounded <c>start == end == T</c> read) and hands it here. We then carve out the
|
||||
/// next <paramref name="cap"/> ties starting at <paramref name="skip"/> and compute a cursor
|
||||
/// that advances within the cluster, then steps off it when it is drained.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// <b>Advance is lossless.</b> When the slice drains the cluster (<c>emitted == clusterCount</c>)
|
||||
/// we resume from <c>T + 1 tick</c> with a fresh skip of <c>0</c>. This skips no data because no
|
||||
/// <see cref="DateTime"/> value exists strictly between <c>T</c> and <c>T + 1 tick</c> — a tick
|
||||
/// (100 ns) is the type's resolution — so every remaining sample in the window has a timestamp
|
||||
/// of at least <c>T + 1 tick</c>. We do NOT resume inclusively at <c>T</c> here (the way
|
||||
/// <see cref="ComputeResumeCursor"/> does for cross-boundary ties): we have already over-fetched
|
||||
/// and emitted the entire <c>T</c> cluster, so resuming at <c>T</c> would needlessly re-read it.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// <b>Short-page-with-CP exception.</b> A page that fully drains the cluster but is SHORTER than
|
||||
/// <paramref name="cap"/> (the cluster had fewer than <c>cap</c> remaining ties) STILL emits a
|
||||
/// continuation point when the window extends past <c>T</c> (<c>T + 1 tick <= endUtc</c>). This
|
||||
/// deliberately violates the usual "short page ⇒ terminal" rule (<see cref="IsFullPage"/>):
|
||||
/// the page is short only because the cluster ran out, not because the window did, so there may
|
||||
/// still be data after <c>T</c> that the next page must read.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// <b>Self-heal.</b> If <paramref name="skip"/> meets or exceeds <paramref name="clusterCount"/>
|
||||
/// (e.g. a stale cursor against a cluster that shrank between reads), <c>sliceStart</c> clamps to
|
||||
/// <paramref name="clusterCount"/> and <c>sliceCount</c> is <c>0</c>; since <c>emitted</c> then
|
||||
/// equals <paramref name="clusterCount"/>, the cursor advances/terminates rather than looping.
|
||||
/// </para>
|
||||
/// </summary>
|
||||
/// <param name="clusterCount">The total number of ties at <paramref name="boundaryT"/> (the over-fetched cluster size).</param>
|
||||
/// <param name="skip">How many ties at <paramref name="boundaryT"/> were already emitted on prior pages.</param>
|
||||
/// <param name="cap">The client's per-page cap (<c>NumValuesPerNode</c>); must be > 0 to make progress.</param>
|
||||
/// <param name="boundaryT">The single SourceTimestamp every tie in the cluster shares.</param>
|
||||
/// <param name="endUtc">The (inclusive) upper bound of the read window; unchanged across pages.</param>
|
||||
/// <param name="sliceStart">The index into the cluster the emitted slice starts at (clamped to the count).</param>
|
||||
/// <param name="sliceCount">How many ties the emitted slice contains (may be <c>0</c> on a stale-skip self-heal).</param>
|
||||
/// <param name="nextStartUtc">The next page's inclusive start: <paramref name="boundaryT"/> while the
|
||||
/// cluster still has un-emitted ties, <c>boundaryT + 1 tick</c> once it is drained and the window
|
||||
/// remains, or <c>null</c> when the window is exhausted (terminal — no continuation point).</param>
|
||||
/// <param name="nextSkip">The next page's boundary skip count: the running emitted-tie total while the
|
||||
/// cluster drains, else <c>0</c> after advancing past it.</param>
|
||||
public static void SliceTieCluster(
|
||||
int clusterCount, int skip, uint cap, DateTime boundaryT, DateTime endUtc,
|
||||
out int sliceStart, out int sliceCount, out DateTime? nextStartUtc, out int nextSkip)
|
||||
{
|
||||
sliceStart = Math.Min(skip, clusterCount);
|
||||
sliceCount = Math.Min((int)cap, clusterCount - sliceStart);
|
||||
var emitted = sliceStart + sliceCount;
|
||||
if (emitted < clusterCount) { nextStartUtc = boundaryT; nextSkip = emitted; }
|
||||
else
|
||||
{
|
||||
var next = boundaryT.AddTicks(1);
|
||||
if (next <= endUtc) { nextStartUtc = next; nextSkip = 0; }
|
||||
else { nextStartUtc = null; nextSkip = 0; }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -179,6 +179,19 @@ public sealed class OtOpcUaNodeManager : CustomNodeManager2
|
||||
set => _historianDataSource = value ?? NullHistorianDataSource.Instance;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// The upper bound on the bounded over-fetch <see cref="ServeRawPaged"/> uses to page WITHIN an
|
||||
/// oversized "tie cluster" — more raw samples sharing one SourceTimestamp than the client's per-page
|
||||
/// cap. When a resume read stalls on such a cluster (the boundary-tie trim empties the page), the
|
||||
/// paging over-fetches up to <c>MaxTieClusterOverfetch + 1</c> ties at that single timestamp (a
|
||||
/// <c>start == end</c> read) and slices through them via <see cref="HistoryPaging.SliceTieCluster"/>.
|
||||
/// A cluster strictly larger than this still surfaces <c>BadHistoryOperationUnsupported</c> for that
|
||||
/// node (the absurd-burst backstop). Mirrors the configured
|
||||
/// <c>ServerHistorianOptions.MaxTieClusterOverfetch</c>; the Host sets it at <c>StartAsync</c>. The
|
||||
/// default (65536) survives when the historian section is absent.
|
||||
/// </summary>
|
||||
public int MaxTieClusterOverfetch { get; set; } = 65536;
|
||||
|
||||
private volatile IHistoryContinuationStore _historyContinuationStore = new SessionHistoryContinuationStore();
|
||||
|
||||
/// <summary>
|
||||
@@ -1871,23 +1884,70 @@ public sealed class OtOpcUaNodeManager : CustomNodeManager2
|
||||
? HistoryPaging.TrimBoundaryDuplicates(sourceResult.Samples, startUtc, boundarySkip)
|
||||
: sourceResult.Samples;
|
||||
|
||||
// Degenerate tie cluster: a resume read returned a FULL backend page that the boundary-tie trim
|
||||
// emptied entirely. That can only happen when more than NumValuesPerNode samples share the resume
|
||||
// boundary timestamp — a tie cluster larger than the page cap. The fixed-(start,end,cap) backend
|
||||
// can only ever return the first `cap` of those ties, so a (timestamp, skip) cursor can never
|
||||
// advance past the cluster. Fail LOUDLY for this node rather than silently truncate to GoodNoData
|
||||
// (which would permanently drop the un-emitted ties). The operator's remedy is a larger
|
||||
// NumValuesPerNode; see docs/Historian.md "Paging limitation".
|
||||
if (inboundCp is { Length: > 0 } && backendFull && samples.Count == 0)
|
||||
// Oversized tie cluster: a resume read whose FULL backend page does not advance past the resume
|
||||
// boundary timestamp `startUtc`. That happens when more samples share `startUtc` than the page
|
||||
// cap — a tie cluster larger than NumValuesPerNode. The fixed-(start,end,cap) backend can only
|
||||
// ever return the first `cap` of those ties, so the (timestamp, skip) cursor alone can never page
|
||||
// past (or even reliably within) the cluster: either the boundary-tie trim empties the page, or
|
||||
// it re-emits the same head ties forever. Detect BOTH stall shapes — an empty trimmed page, or a
|
||||
// non-empty page whose LAST sample is still at `startUtc` (no forward progress) — and, rather
|
||||
// than fail the read, over-fetch the WHOLE cluster with an explicit, BOUNDED cap (a start==end
|
||||
// read at the boundary timestamp) and page WITHIN the timestamp via SliceTieCluster.
|
||||
var stalledOnTieCluster = inboundCp is { Length: > 0 }
|
||||
&& backendFull
|
||||
&& (samples.Count == 0
|
||||
|| (samples[^1].SourceTimestampUtc ?? DateTime.MinValue) == startUtc);
|
||||
if (stalledOnTieCluster)
|
||||
{
|
||||
// The over-fetch cap MUST be explicit and non-zero: a cap of 0 falls back to the backend's
|
||||
// MaxValuesPerRead, which would re-introduce the very stall we're escaping. +1 over the bound
|
||||
// lets us DETECT a cluster strictly larger than the bound (the absurd-burst backstop below).
|
||||
var overfetchCap = (uint)(MaxTieClusterOverfetch + 1);
|
||||
var cluster = HistorianDataSource
|
||||
.ReadRawAsync(tagname, startUtc, startUtc, overfetchCap, CancellationToken.None)
|
||||
.GetAwaiter().GetResult().Samples;
|
||||
|
||||
// Absurd burst: more ties than we're willing to buffer in memory. Preserve today's loud-fail
|
||||
// for that node rather than over-fetch an unbounded cluster; the operator's remedy is a
|
||||
// larger ServerHistorian:MaxTieClusterOverfetch (or NumValuesPerNode).
|
||||
if (cluster.Count > MaxTieClusterOverfetch)
|
||||
{
|
||||
#pragma warning disable CS0618 // Type or member is obsolete
|
||||
Utils.LogError(
|
||||
"OtOpcUaNodeManager: HistoryReadRaw paging stalled — tie cluster at {0:O} for tag '{1}' " +
|
||||
"exceeds NumValuesPerNode={2}; cannot page past it. Increase NumValuesPerNode.",
|
||||
startUtc, tagname, numValuesPerNode);
|
||||
Utils.LogError(
|
||||
"OtOpcUaNodeManager: HistoryReadRaw tie cluster at {0:O} for tag '{1}' has {2} samples, " +
|
||||
"exceeding MaxTieClusterOverfetch={3}; cannot page within it. Increase MaxTieClusterOverfetch.",
|
||||
startUtc, tagname, cluster.Count, MaxTieClusterOverfetch);
|
||||
#pragma warning restore CS0618
|
||||
errors[handle.Index] = StatusCodes.BadHistoryOperationUnsupported;
|
||||
results[handle.Index] = new SdkHistoryReadResult { StatusCode = StatusCodes.BadHistoryOperationUnsupported };
|
||||
errors[handle.Index] = StatusCodes.BadHistoryOperationUnsupported;
|
||||
results[handle.Index] = new SdkHistoryReadResult { StatusCode = StatusCodes.BadHistoryOperationUnsupported };
|
||||
return;
|
||||
}
|
||||
|
||||
HistoryPaging.SliceTieCluster(
|
||||
cluster.Count, boundarySkip, numValuesPerNode, startUtc, endUtc,
|
||||
out var sliceStart, out var sliceCount, out var nextStartUtc, out var nextSkip);
|
||||
|
||||
var slice = new List<DataValueSnapshot>(sliceCount);
|
||||
for (var i = sliceStart; i < sliceStart + sliceCount; i++) slice.Add(cluster[i]);
|
||||
|
||||
// Emit a continuation point only when SliceTieCluster says the read continues (within the
|
||||
// cluster, or past it while the window remains). nextSkip is the boundary skip for the next
|
||||
// page — within the cluster it counts the ties already emitted at startUtc; past it it's 0.
|
||||
byte[]? clusterCp = null;
|
||||
if (nextStartUtc is { } resumeAt)
|
||||
{
|
||||
var clusterState = new HistoryContinuationState(
|
||||
tagname, resumeAt, endUtc, nextSkip, numValuesPerNode);
|
||||
clusterCp = _historyContinuationStore.Save(session, clusterState);
|
||||
}
|
||||
|
||||
results[handle.Index] = new SdkHistoryReadResult
|
||||
{
|
||||
StatusCode = slice.Count == 0 ? StatusCodes.GoodNoData : StatusCodes.Good,
|
||||
HistoryData = new ExtensionObject(ToHistoryDataFromSamples(slice)),
|
||||
ContinuationPoint = clusterCp,
|
||||
};
|
||||
errors[handle.Index] = ServiceResult.Good;
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@@ -41,6 +41,17 @@ public sealed class ServerHistorianOptions
|
||||
/// <summary>Per-process shared secret the sidecar verifies in the Hello frame.</summary>
|
||||
public string SharedSecret { get; init; } = "";
|
||||
|
||||
/// <summary>
|
||||
/// The upper bound on the bounded over-fetch the HistoryRead-Raw paging uses to page WITHIN an
|
||||
/// oversized "tie cluster" (more raw samples sharing one SourceTimestamp than the client's per-page
|
||||
/// cap). When a resume read stalls on such a cluster, the node manager over-fetches up to
|
||||
/// <c>MaxTieClusterOverfetch + 1</c> ties at that single timestamp (a <c>start == end</c> read) and
|
||||
/// pages through them; this bounds how large a single-timestamp burst the server will buffer in
|
||||
/// memory before it gives up and surfaces <c>BadHistoryOperationUnsupported</c> for that node. The
|
||||
/// default (65536) comfortably covers any realistic same-millisecond burst.
|
||||
/// </summary>
|
||||
public int MaxTieClusterOverfetch { get; init; } = 65536;
|
||||
|
||||
/// <summary>Returns operator-facing misconfiguration warnings for an <c>Enabled</c> historian
|
||||
/// (empty when disabled or correctly configured). Pure — the registration logs each entry.</summary>
|
||||
/// <returns>Zero or more human-readable warning messages.</returns>
|
||||
@@ -52,6 +63,8 @@ public sealed class ServerHistorianOptions
|
||||
warnings.Add("ServerHistorian:SharedSecret is empty while the historian is enabled — the Wonderware sidecar Hello frame will carry an empty secret.");
|
||||
if (Port <= 0)
|
||||
warnings.Add($"ServerHistorian:Port is {Port} — must be > 0; the read client cannot dial the sidecar.");
|
||||
if (MaxTieClusterOverfetch <= 0)
|
||||
warnings.Add($"ServerHistorian:MaxTieClusterOverfetch is {MaxTieClusterOverfetch} — must be > 0; HistoryRead-Raw cannot page within an oversized tie cluster and will surface BadHistoryOperationUnsupported for those reads.");
|
||||
return warnings;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -146,6 +146,112 @@ public sealed class HistoryPagingTests
|
||||
trimmed[0].Value.ShouldBe(11.0);
|
||||
}
|
||||
|
||||
// --- SliceTieCluster ------------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
public void SliceTieCluster_mid_cluster_emits_cp_at_same_timestamp()
|
||||
{
|
||||
// 10 ties at T; already emitted 2; cap 3. Slice [2,5) → next page resumes AT T, skip 5.
|
||||
var t = new DateTime(2026, 1, 1, 0, 0, 5, DateTimeKind.Utc);
|
||||
var end = t.AddHours(1);
|
||||
|
||||
HistoryPaging.SliceTieCluster(
|
||||
clusterCount: 10, skip: 2, cap: 3, boundaryT: t, endUtc: end,
|
||||
out var sliceStart, out var sliceCount, out var nextStartUtc, out var nextSkip);
|
||||
|
||||
sliceStart.ShouldBe(2);
|
||||
sliceCount.ShouldBe(3);
|
||||
nextStartUtc.ShouldBe(t); // still draining the cluster ⇒ resume AT T
|
||||
nextSkip.ShouldBe(5); // 2 already emitted + 3 just emitted
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void SliceTieCluster_exact_drain_advances_one_tick_when_window_remains()
|
||||
{
|
||||
// 6 ties at T; skip 3; cap 3. Slice [3,6) drains the cluster exactly ⇒ advance to T+1tick.
|
||||
var t = new DateTime(2026, 1, 1, 0, 0, 5, DateTimeKind.Utc);
|
||||
var end = t.AddHours(1);
|
||||
|
||||
HistoryPaging.SliceTieCluster(
|
||||
clusterCount: 6, skip: 3, cap: 3, boundaryT: t, endUtc: end,
|
||||
out var sliceStart, out var sliceCount, out var nextStartUtc, out var nextSkip);
|
||||
|
||||
sliceStart.ShouldBe(3);
|
||||
sliceCount.ShouldBe(3);
|
||||
nextStartUtc.ShouldBe(t.AddTicks(1)); // cluster drained, window remains ⇒ next tick, fresh skip
|
||||
nextSkip.ShouldBe(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void SliceTieCluster_short_final_slice_still_emits_cp_when_window_remains()
|
||||
{
|
||||
// 5 ties at T; skip 3; cap 10. Slice [3,5) is SHORT (2 < cap) but it fully drains the cluster
|
||||
// and the window still extends past T ⇒ we MUST emit a CP to read the rest of the window even
|
||||
// though this page is short of the cap.
|
||||
var t = new DateTime(2026, 1, 1, 0, 0, 5, DateTimeKind.Utc);
|
||||
var end = t.AddHours(1);
|
||||
|
||||
HistoryPaging.SliceTieCluster(
|
||||
clusterCount: 5, skip: 3, cap: 10, boundaryT: t, endUtc: end,
|
||||
out var sliceStart, out var sliceCount, out var nextStartUtc, out var nextSkip);
|
||||
|
||||
sliceStart.ShouldBe(3);
|
||||
sliceCount.ShouldBe(2); // short slice
|
||||
nextStartUtc.ShouldBe(t.AddTicks(1)); // but CP emitted so the rest of the window is read
|
||||
nextSkip.ShouldBe(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void SliceTieCluster_drained_at_window_end_terminates()
|
||||
{
|
||||
// The cluster ends exactly AT the window end (endUtc == T). Draining it cannot advance past the
|
||||
// window ⇒ terminal (no CP).
|
||||
var t = new DateTime(2026, 1, 1, 0, 0, 5, DateTimeKind.Utc);
|
||||
|
||||
HistoryPaging.SliceTieCluster(
|
||||
clusterCount: 4, skip: 0, cap: 10, boundaryT: t, endUtc: t,
|
||||
out var sliceStart, out var sliceCount, out var nextStartUtc, out var nextSkip);
|
||||
|
||||
sliceStart.ShouldBe(0);
|
||||
sliceCount.ShouldBe(4);
|
||||
nextStartUtc.ShouldBeNull(); // T+1tick > endUtc ⇒ window exhausted ⇒ terminal
|
||||
nextSkip.ShouldBe(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void SliceTieCluster_self_heals_when_skip_exceeds_cluster()
|
||||
{
|
||||
// Defensive: a stale skip points past the (re-read, possibly shrunk) cluster. The slice is empty
|
||||
// (count 0) and, since emitted == clusterCount, the cursor advances/terminates rather than looping.
|
||||
var t = new DateTime(2026, 1, 1, 0, 0, 5, DateTimeKind.Utc);
|
||||
var end = t.AddHours(1);
|
||||
|
||||
HistoryPaging.SliceTieCluster(
|
||||
clusterCount: 4, skip: 4, cap: 10, boundaryT: t, endUtc: end,
|
||||
out var sliceStart, out var sliceCount, out var nextStartUtc, out var nextSkip);
|
||||
|
||||
sliceStart.ShouldBe(4);
|
||||
sliceCount.ShouldBe(0); // nothing left in the cluster to emit
|
||||
nextStartUtc.ShouldBe(t.AddTicks(1)); // emitted (4) == clusterCount (4) ⇒ advance, don't loop
|
||||
nextSkip.ShouldBe(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void SliceTieCluster_self_heals_to_terminal_at_window_end()
|
||||
{
|
||||
// Same stale-skip self-heal, but the window ends at T ⇒ advancing terminates instead of looping.
|
||||
var t = new DateTime(2026, 1, 1, 0, 0, 5, DateTimeKind.Utc);
|
||||
|
||||
HistoryPaging.SliceTieCluster(
|
||||
clusterCount: 4, skip: 9, cap: 10, boundaryT: t, endUtc: t,
|
||||
out var sliceStart, out var sliceCount, out var nextStartUtc, out var nextSkip);
|
||||
|
||||
sliceStart.ShouldBe(4); // clamped to clusterCount
|
||||
sliceCount.ShouldBe(0);
|
||||
nextStartUtc.ShouldBeNull();
|
||||
nextSkip.ShouldBe(0);
|
||||
}
|
||||
|
||||
// --- InMemoryHistoryContinuationStore (mirrors the production session store contract) --------
|
||||
|
||||
[Fact]
|
||||
|
||||
+91
-17
@@ -146,38 +146,83 @@ public sealed class NodeManagerHistoryReadPagingTests : IDisposable
|
||||
await host.DisposeAsync();
|
||||
}
|
||||
|
||||
/// <summary>Degenerate tie cluster larger than the page cap: a single timestamp carrying MORE ties
|
||||
/// than <c>NumValuesPerNode</c> cannot be paged past by a (timestamp, skip) cursor — the fixed-(start,
|
||||
/// end,cap) backend keeps returning the same first <c>cap</c> ties. Rather than silently truncate to
|
||||
/// GoodNoData (permanently dropping the un-emitted ties), the resume read FAILS LOUDLY for that node
|
||||
/// with <c>BadHistoryOperationUnsupported</c>. (Regression for the data-loss path the carry-offset
|
||||
/// cursor cannot resolve; the operator's remedy is a larger NumValuesPerNode.)</summary>
|
||||
/// <summary>Oversized tie cluster (more ties at one timestamp than the page cap): the (timestamp, skip)
|
||||
/// resume cursor alone cannot advance past it — the fixed-(start,end,cap) backend keeps returning the
|
||||
/// same first <c>cap</c> ties. The paging now over-fetches the WHOLE cluster (a <c>start == end</c> read,
|
||||
/// bounded by <c>MaxTieClusterOverfetch</c>) and pages WITHIN the timestamp, so the read drains the
|
||||
/// cluster (and any data after it) across continuation points with no dup/skip. Here a tie cluster of 3
|
||||
/// (indices 3..7 share one timestamp) sits between distinct samples; with cap 2 the union must be all 10
|
||||
/// values in order.</summary>
|
||||
[Fact]
|
||||
public async Task Raw_tie_cluster_larger_than_page_fails_loudly_not_silently()
|
||||
public async Task Raw_oversized_tie_cluster_pages_within_the_timestamp()
|
||||
{
|
||||
var (host, server) = await BootAsync();
|
||||
var nm = server.NodeManager!;
|
||||
nm.HistoryContinuationStore = new InMemoryHistoryContinuationStore();
|
||||
|
||||
// 6 samples ALL sharing one timestamp (Epoch+2s) — a tie cluster of 6 with a page cap of 4.
|
||||
var t = Epoch.AddSeconds(2);
|
||||
var series = Enumerable.Range(0, 6)
|
||||
.Select(i => new DataValueSnapshot((double)i, StatusCodes.Good, t, t)).ToArray();
|
||||
// 3 distinct, then a tie cluster of 5 (all at Epoch+3s), then 2 more distinct — 10 total. The tie
|
||||
// cluster (5) is larger than the page cap (2), the case that used to stall the cursor.
|
||||
var series = MakeSeriesWithTieCluster(distinctBefore: 3, tieCount: 5, distinctAfter: 2);
|
||||
series.Length.ShouldBe(10);
|
||||
nm.HistorianDataSource = new SeriesHistorianDataSource(series);
|
||||
|
||||
nm.EnsureVariable("eq-1/burst", parentFolderNodeId: null, displayName: "Burst", dataType: "Double",
|
||||
writable: false, historianTagname: "WW.Burst");
|
||||
var nodeId = nm.TryGetVariable("eq-1/burst")!.NodeId;
|
||||
|
||||
// Page 1: a full page of the first 4 ties, with a continuation point.
|
||||
var (r1, e1, cp1) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 4, inboundCp: null);
|
||||
var collected = new List<double>();
|
||||
byte[]? cp = null;
|
||||
var pageCount = 0;
|
||||
do
|
||||
{
|
||||
var (values, error, nextCp) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1),
|
||||
max: 2, inboundCp: cp);
|
||||
error.StatusCode.Code.ShouldBe(StatusCodes.Good);
|
||||
collected.AddRange(values);
|
||||
cp = nextCp;
|
||||
pageCount++;
|
||||
pageCount.ShouldBeLessThan(20, "paging must terminate, not loop, even through a tie cluster");
|
||||
}
|
||||
while (cp is not null);
|
||||
|
||||
// Every value, once, in order — the cluster was paged through, not dropped or duplicated.
|
||||
collected.Count.ShouldBe(10);
|
||||
collected.ShouldBe(Enumerable.Range(0, 10).Select(i => (double)i));
|
||||
|
||||
await host.DisposeAsync();
|
||||
}
|
||||
|
||||
/// <summary>The absurd-burst backstop is preserved: a tie cluster STRICTLY larger than the configured
|
||||
/// <see cref="OtOpcUaNodeManager.MaxTieClusterOverfetch"/> still surfaces
|
||||
/// <c>BadHistoryOperationUnsupported</c> for that node rather than buffering an unbounded burst. With
|
||||
/// the bound set to 3 and a 5-way tie cluster, the resume read that hits the cluster fails loudly.</summary>
|
||||
[Fact]
|
||||
public async Task Raw_tie_cluster_beyond_overfetch_bound_fails_loudly()
|
||||
{
|
||||
var (host, server) = await BootAsync();
|
||||
var nm = server.NodeManager!;
|
||||
nm.HistoryContinuationStore = new InMemoryHistoryContinuationStore();
|
||||
nm.MaxTieClusterOverfetch = 3; // cluster of 5 will exceed this ⇒ backstop.
|
||||
|
||||
// 5 samples ALL sharing one timestamp (Epoch+2s) — a tie cluster of 5 with a page cap of 2.
|
||||
var t = Epoch.AddSeconds(2);
|
||||
var series = Enumerable.Range(0, 5)
|
||||
.Select(i => new DataValueSnapshot((double)i, StatusCodes.Good, t, t)).ToArray();
|
||||
nm.HistorianDataSource = new SeriesHistorianDataSource(series);
|
||||
|
||||
nm.EnsureVariable("eq-1/absurd", parentFolderNodeId: null, displayName: "Absurd", dataType: "Double",
|
||||
writable: false, historianTagname: "WW.Absurd");
|
||||
var nodeId = nm.TryGetVariable("eq-1/absurd")!.NodeId;
|
||||
|
||||
// Page 1: a full page of the first 2 ties, with a continuation point.
|
||||
var (r1, e1, cp1) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 2, inboundCp: null);
|
||||
e1.StatusCode.Code.ShouldBe(StatusCodes.Good);
|
||||
r1.ShouldBe(new[] { 0.0, 1.0, 2.0, 3.0 });
|
||||
r1.ShouldBe(new[] { 0.0, 1.0 });
|
||||
cp1.ShouldNotBeNull();
|
||||
|
||||
// Page 2: the cursor cannot advance past the oversized cluster ⇒ a clear error, NOT a silent
|
||||
// GoodNoData that would drop samples 4 and 5.
|
||||
var (r2, e2, cp2) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 4, inboundCp: cp1);
|
||||
// Page 2: the resume read hits the cluster; over-fetch finds 5 > bound 3 ⇒ a clear error, NOT a
|
||||
// silent GoodNoData that would drop the un-emitted ties.
|
||||
var (r2, e2, cp2) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 2, inboundCp: cp1);
|
||||
e2.StatusCode.Code.ShouldBe(StatusCodes.BadHistoryOperationUnsupported);
|
||||
r2.ShouldBeEmpty();
|
||||
cp2.ShouldBeNull();
|
||||
@@ -330,6 +375,35 @@ public sealed class NodeManagerHistoryReadPagingTests : IDisposable
|
||||
})
|
||||
.ToArray();
|
||||
|
||||
/// <summary>Build a sorted series: <paramref name="distinctBefore"/> distinct-second samples, then a
|
||||
/// run of <paramref name="tieCount"/> samples ALL at the same timestamp (the next second after the
|
||||
/// last distinct one — the oversized tie cluster), then <paramref name="distinctAfter"/> distinct
|
||||
/// samples each a second later. Values are 0..N-1 in series order so the union assertion is exact.</summary>
|
||||
private static DataValueSnapshot[] MakeSeriesWithTieCluster(int distinctBefore, int tieCount, int distinctAfter)
|
||||
{
|
||||
var samples = new List<DataValueSnapshot>(distinctBefore + tieCount + distinctAfter);
|
||||
var value = 0;
|
||||
var second = 0;
|
||||
|
||||
for (var i = 0; i < distinctBefore; i++, second++)
|
||||
{
|
||||
var t = Epoch.AddSeconds(second);
|
||||
samples.Add(new DataValueSnapshot((double)value++, StatusCodes.Good, t, t));
|
||||
}
|
||||
|
||||
var tieT = Epoch.AddSeconds(second++); // the single shared timestamp for the whole cluster
|
||||
for (var i = 0; i < tieCount; i++)
|
||||
samples.Add(new DataValueSnapshot((double)value++, StatusCodes.Good, tieT, tieT));
|
||||
|
||||
for (var i = 0; i < distinctAfter; i++, second++)
|
||||
{
|
||||
var t = Epoch.AddSeconds(second);
|
||||
samples.Add(new DataValueSnapshot((double)value++, StatusCodes.Good, t, t));
|
||||
}
|
||||
|
||||
return samples.ToArray();
|
||||
}
|
||||
|
||||
/// <summary>A series-backed fake historian: holds a full sorted series and serves a Raw read by
|
||||
/// returning the samples in [start, end] capped at maxValuesPerNode (start inclusive — exactly the
|
||||
/// resume-cursor contract). Processed / AtTime / Events are not exercised here.</summary>
|
||||
|
||||
Reference in New Issue
Block a user