From 2e6c6d3ab62494289c229a488f6d3239d47de84a Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 17 Jun 2026 20:11:09 -0400 Subject: [PATCH] feat(historian): page within oversized tie clusters (#400) instead of loud-failing --- .../OpcUa/OtOpcUaServerHostedService.cs | 20 ++++ .../HistoryPaging.cs | 64 +++++++++++ .../OtOpcUaNodeManager.cs | 88 +++++++++++--- .../Historian/ServerHistorianOptions.cs | 13 +++ .../HistoryPagingTests.cs | 106 +++++++++++++++++ .../NodeManagerHistoryReadPagingTests.cs | 108 +++++++++++++++--- 6 files changed, 368 insertions(+), 31 deletions(-) diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Host/OpcUa/OtOpcUaServerHostedService.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Host/OpcUa/OtOpcUaServerHostedService.cs index cddf546c..53ae4c7c 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Host/OpcUa/OtOpcUaServerHostedService.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Host/OpcUa/OtOpcUaServerHostedService.cs @@ -1,6 +1,7 @@ using Akka.Actor; using Akka.Cluster.Tools.PublishSubscribe; using Akka.Hosting; +using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -10,6 +11,7 @@ using ZB.MOM.WW.OtOpcUa.OpcUaServer; using ZB.MOM.WW.OtOpcUa.OpcUaServer.Security; using ZB.MOM.WW.OtOpcUa.Runtime; using ZB.MOM.WW.OtOpcUa.Runtime.Drivers; +using ZB.MOM.WW.OtOpcUa.Runtime.Historian; using ZB.MOM.WW.OtOpcUa.Runtime.ScriptedAlarms; namespace ZB.MOM.WW.OtOpcUa.Host.OpcUa; @@ -32,6 +34,7 @@ public sealed class OtOpcUaServerHostedService : IHostedService, IAsyncDisposabl private readonly DeferredServiceLevelPublisher _deferredServiceLevel; private readonly IOpcUaUserAuthenticator _userAuthenticator; private readonly IHistorianDataSource _historianDataSource; + private readonly ServerHistorianOptions _serverHistorianOptions; private readonly Func _actorSystemAccessor; private readonly ActorRegistry _actorRegistry; private readonly ILoggerFactory _loggerFactory; @@ -52,6 +55,11 @@ public sealed class OtOpcUaServerHostedService : IHostedService, IAsyncDisposabl /// node, the same source the address-space sink + node-write gateway come from), or the configured /// Wonderware read client when AddServerHistorian enabled it. Wired onto the node manager in /// . + /// App configuration; the ServerHistorian section is bound here to + /// read for the node manager. Bound directly + /// (not via IOptions) so the wiring is self-contained in this hosted service — the option's + /// default survives when the section is absent. The companion AddServerHistorian registration in + /// Program.cs reads the same section for the read-client gating; this is the read-path only. /// Lazy accessor for the running , used to /// resolve the DistributedPubSub mediator the inbound alarm-command router publishes through. Resolved /// lazily (mirroring DpsScriptLogPublisher) so construction never races Akka startup. @@ -65,6 +73,7 @@ public sealed class OtOpcUaServerHostedService : IHostedService, IAsyncDisposabl DeferredServiceLevelPublisher deferredServiceLevel, IOpcUaUserAuthenticator userAuthenticator, IHistorianDataSource historianDataSource, + IConfiguration configuration, Func actorSystemAccessor, ActorRegistry actorRegistry, ILoggerFactory loggerFactory) @@ -74,6 +83,11 @@ public sealed class OtOpcUaServerHostedService : IHostedService, IAsyncDisposabl _deferredServiceLevel = deferredServiceLevel; _userAuthenticator = userAuthenticator; _historianDataSource = historianDataSource; + // Bind the ServerHistorian section directly (defaults survive when it's absent) — we only need + // MaxTieClusterOverfetch here, and binding it ourselves keeps the wiring in this one file. + _serverHistorianOptions = + configuration.GetSection(ServerHistorianOptions.SectionName).Get() + ?? new ServerHistorianOptions(); _actorSystemAccessor = actorSystemAccessor; _actorRegistry = actorRegistry; _loggerFactory = loggerFactory; @@ -201,6 +215,12 @@ public sealed class OtOpcUaServerHostedService : IHostedService, IAsyncDisposabl // The node manager's HistoryRead overrides block-bridge to whatever source is set here. _server.SetHistorianDataSource(_historianDataSource); + // Propagate the configured tie-cluster over-fetch bound onto the node manager so HistoryRead-Raw + // can page WITHIN an oversized tie cluster (more samples sharing one SourceTimestamp than the page + // cap) rather than failing the read. The option's default (65536) survives when the ServerHistorian + // section is absent; NodeManager is non-null here (guarded above). + _server.NodeManager.MaxTieClusterOverfetch = _serverHistorianOptions.MaxTieClusterOverfetch; + // ServiceLevel publisher needs IServerInternal — only available after Start. if (_server.CurrentInstance is { } serverInternal) { diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/HistoryPaging.cs b/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/HistoryPaging.cs index ec994260..83e37c23 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/HistoryPaging.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/HistoryPaging.cs @@ -150,4 +150,68 @@ internal static class HistoryPaging for (var j = i; j < resumedPage.Count; j++) trimmed.Add(resumedPage[j]); return trimmed; } + + /// + /// Page WITHIN a single oversized "tie cluster" — a set of raw samples that all share one + /// SourceTimestamp and is larger than the client's per-page cap. + /// + /// The fixed-(start, end, cap) historian backend cannot skip/offset, so an oversized + /// tie cluster defeats the (timestamp, skip) resume cursor that + /// builds: every resume re-reads the first cap ties, the boundary-tie trim empties the + /// page, and the cursor never advances. To page past it, the caller over-fetches the WHOLE + /// cluster (a bounded start == end == T read) and hands it here. We then carve out the + /// next ties starting at and compute a cursor + /// that advances within the cluster, then steps off it when it is drained. + /// + /// + /// Advance is lossless. When the slice drains the cluster (emitted == clusterCount) + /// we resume from T + 1 tick with a fresh skip of 0. This skips no data because no + /// value exists strictly between T and T + 1 tick — a tick + /// (100 ns) is the type's resolution — so every remaining sample in the window has a timestamp + /// of at least T + 1 tick. We do NOT resume inclusively at T here (the way + /// does for cross-boundary ties): we have already over-fetched + /// and emitted the entire T cluster, so resuming at T would needlessly re-read it. + /// + /// + /// Short-page-with-CP exception. A page that fully drains the cluster but is SHORTER than + /// (the cluster had fewer than cap remaining ties) STILL emits a + /// continuation point when the window extends past T (T + 1 tick <= endUtc). This + /// deliberately violates the usual "short page ⇒ terminal" rule (): + /// the page is short only because the cluster ran out, not because the window did, so there may + /// still be data after T that the next page must read. + /// + /// + /// Self-heal. If meets or exceeds + /// (e.g. a stale cursor against a cluster that shrank between reads), sliceStart clamps to + /// and sliceCount is 0; since emitted then + /// equals , the cursor advances/terminates rather than looping. + /// + /// + /// The total number of ties at (the over-fetched cluster size). + /// How many ties at were already emitted on prior pages. + /// The client's per-page cap (NumValuesPerNode); must be > 0 to make progress. + /// The single SourceTimestamp every tie in the cluster shares. + /// The (inclusive) upper bound of the read window; unchanged across pages. + /// The index into the cluster the emitted slice starts at (clamped to the count). + /// How many ties the emitted slice contains (may be 0 on a stale-skip self-heal). + /// The next page's inclusive start: while the + /// cluster still has un-emitted ties, boundaryT + 1 tick once it is drained and the window + /// remains, or null when the window is exhausted (terminal — no continuation point). + /// The next page's boundary skip count: the running emitted-tie total while the + /// cluster drains, else 0 after advancing past it. + public static void SliceTieCluster( + int clusterCount, int skip, uint cap, DateTime boundaryT, DateTime endUtc, + out int sliceStart, out int sliceCount, out DateTime? nextStartUtc, out int nextSkip) + { + sliceStart = Math.Min(skip, clusterCount); + sliceCount = Math.Min((int)cap, clusterCount - sliceStart); + var emitted = sliceStart + sliceCount; + if (emitted < clusterCount) { nextStartUtc = boundaryT; nextSkip = emitted; } + else + { + var next = boundaryT.AddTicks(1); + if (next <= endUtc) { nextStartUtc = next; nextSkip = 0; } + else { nextStartUtc = null; nextSkip = 0; } + } + } } diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/OtOpcUaNodeManager.cs b/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/OtOpcUaNodeManager.cs index 5d4f2e89..2edbadda 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/OtOpcUaNodeManager.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer/OtOpcUaNodeManager.cs @@ -179,6 +179,19 @@ public sealed class OtOpcUaNodeManager : CustomNodeManager2 set => _historianDataSource = value ?? NullHistorianDataSource.Instance; } + /// + /// The upper bound on the bounded over-fetch uses to page WITHIN an + /// oversized "tie cluster" — more raw samples sharing one SourceTimestamp than the client's per-page + /// cap. When a resume read stalls on such a cluster (the boundary-tie trim empties the page), the + /// paging over-fetches up to MaxTieClusterOverfetch + 1 ties at that single timestamp (a + /// start == end read) and slices through them via . + /// A cluster strictly larger than this still surfaces BadHistoryOperationUnsupported for that + /// node (the absurd-burst backstop). Mirrors the configured + /// ServerHistorianOptions.MaxTieClusterOverfetch; the Host sets it at StartAsync. The + /// default (65536) survives when the historian section is absent. + /// + public int MaxTieClusterOverfetch { get; set; } = 65536; + private volatile IHistoryContinuationStore _historyContinuationStore = new SessionHistoryContinuationStore(); /// @@ -1871,23 +1884,70 @@ public sealed class OtOpcUaNodeManager : CustomNodeManager2 ? HistoryPaging.TrimBoundaryDuplicates(sourceResult.Samples, startUtc, boundarySkip) : sourceResult.Samples; - // Degenerate tie cluster: a resume read returned a FULL backend page that the boundary-tie trim - // emptied entirely. That can only happen when more than NumValuesPerNode samples share the resume - // boundary timestamp — a tie cluster larger than the page cap. The fixed-(start,end,cap) backend - // can only ever return the first `cap` of those ties, so a (timestamp, skip) cursor can never - // advance past the cluster. Fail LOUDLY for this node rather than silently truncate to GoodNoData - // (which would permanently drop the un-emitted ties). The operator's remedy is a larger - // NumValuesPerNode; see docs/Historian.md "Paging limitation". - if (inboundCp is { Length: > 0 } && backendFull && samples.Count == 0) + // Oversized tie cluster: a resume read whose FULL backend page does not advance past the resume + // boundary timestamp `startUtc`. That happens when more samples share `startUtc` than the page + // cap — a tie cluster larger than NumValuesPerNode. The fixed-(start,end,cap) backend can only + // ever return the first `cap` of those ties, so the (timestamp, skip) cursor alone can never page + // past (or even reliably within) the cluster: either the boundary-tie trim empties the page, or + // it re-emits the same head ties forever. Detect BOTH stall shapes — an empty trimmed page, or a + // non-empty page whose LAST sample is still at `startUtc` (no forward progress) — and, rather + // than fail the read, over-fetch the WHOLE cluster with an explicit, BOUNDED cap (a start==end + // read at the boundary timestamp) and page WITHIN the timestamp via SliceTieCluster. + var stalledOnTieCluster = inboundCp is { Length: > 0 } + && backendFull + && (samples.Count == 0 + || (samples[^1].SourceTimestampUtc ?? DateTime.MinValue) == startUtc); + if (stalledOnTieCluster) { + // The over-fetch cap MUST be explicit and non-zero: a cap of 0 falls back to the backend's + // MaxValuesPerRead, which would re-introduce the very stall we're escaping. +1 over the bound + // lets us DETECT a cluster strictly larger than the bound (the absurd-burst backstop below). + var overfetchCap = (uint)(MaxTieClusterOverfetch + 1); + var cluster = HistorianDataSource + .ReadRawAsync(tagname, startUtc, startUtc, overfetchCap, CancellationToken.None) + .GetAwaiter().GetResult().Samples; + + // Absurd burst: more ties than we're willing to buffer in memory. Preserve today's loud-fail + // for that node rather than over-fetch an unbounded cluster; the operator's remedy is a + // larger ServerHistorian:MaxTieClusterOverfetch (or NumValuesPerNode). + if (cluster.Count > MaxTieClusterOverfetch) + { #pragma warning disable CS0618 // Type or member is obsolete - Utils.LogError( - "OtOpcUaNodeManager: HistoryReadRaw paging stalled — tie cluster at {0:O} for tag '{1}' " + - "exceeds NumValuesPerNode={2}; cannot page past it. Increase NumValuesPerNode.", - startUtc, tagname, numValuesPerNode); + Utils.LogError( + "OtOpcUaNodeManager: HistoryReadRaw tie cluster at {0:O} for tag '{1}' has {2} samples, " + + "exceeding MaxTieClusterOverfetch={3}; cannot page within it. Increase MaxTieClusterOverfetch.", + startUtc, tagname, cluster.Count, MaxTieClusterOverfetch); #pragma warning restore CS0618 - errors[handle.Index] = StatusCodes.BadHistoryOperationUnsupported; - results[handle.Index] = new SdkHistoryReadResult { StatusCode = StatusCodes.BadHistoryOperationUnsupported }; + errors[handle.Index] = StatusCodes.BadHistoryOperationUnsupported; + results[handle.Index] = new SdkHistoryReadResult { StatusCode = StatusCodes.BadHistoryOperationUnsupported }; + return; + } + + HistoryPaging.SliceTieCluster( + cluster.Count, boundarySkip, numValuesPerNode, startUtc, endUtc, + out var sliceStart, out var sliceCount, out var nextStartUtc, out var nextSkip); + + var slice = new List(sliceCount); + for (var i = sliceStart; i < sliceStart + sliceCount; i++) slice.Add(cluster[i]); + + // Emit a continuation point only when SliceTieCluster says the read continues (within the + // cluster, or past it while the window remains). nextSkip is the boundary skip for the next + // page — within the cluster it counts the ties already emitted at startUtc; past it it's 0. + byte[]? clusterCp = null; + if (nextStartUtc is { } resumeAt) + { + var clusterState = new HistoryContinuationState( + tagname, resumeAt, endUtc, nextSkip, numValuesPerNode); + clusterCp = _historyContinuationStore.Save(session, clusterState); + } + + results[handle.Index] = new SdkHistoryReadResult + { + StatusCode = slice.Count == 0 ? StatusCodes.GoodNoData : StatusCodes.Good, + HistoryData = new ExtensionObject(ToHistoryDataFromSamples(slice)), + ContinuationPoint = clusterCp, + }; + errors[handle.Index] = ServiceResult.Good; return; } diff --git a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/ServerHistorianOptions.cs b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/ServerHistorianOptions.cs index 254dd24a..96908f43 100644 --- a/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/ServerHistorianOptions.cs +++ b/src/Server/ZB.MOM.WW.OtOpcUa.Runtime/Historian/ServerHistorianOptions.cs @@ -41,6 +41,17 @@ public sealed class ServerHistorianOptions /// Per-process shared secret the sidecar verifies in the Hello frame. public string SharedSecret { get; init; } = ""; + /// + /// The upper bound on the bounded over-fetch the HistoryRead-Raw paging uses to page WITHIN an + /// oversized "tie cluster" (more raw samples sharing one SourceTimestamp than the client's per-page + /// cap). When a resume read stalls on such a cluster, the node manager over-fetches up to + /// MaxTieClusterOverfetch + 1 ties at that single timestamp (a start == end read) and + /// pages through them; this bounds how large a single-timestamp burst the server will buffer in + /// memory before it gives up and surfaces BadHistoryOperationUnsupported for that node. The + /// default (65536) comfortably covers any realistic same-millisecond burst. + /// + public int MaxTieClusterOverfetch { get; init; } = 65536; + /// Returns operator-facing misconfiguration warnings for an Enabled historian /// (empty when disabled or correctly configured). Pure — the registration logs each entry. /// Zero or more human-readable warning messages. @@ -52,6 +63,8 @@ public sealed class ServerHistorianOptions warnings.Add("ServerHistorian:SharedSecret is empty while the historian is enabled — the Wonderware sidecar Hello frame will carry an empty secret."); if (Port <= 0) warnings.Add($"ServerHistorian:Port is {Port} — must be > 0; the read client cannot dial the sidecar."); + if (MaxTieClusterOverfetch <= 0) + warnings.Add($"ServerHistorian:MaxTieClusterOverfetch is {MaxTieClusterOverfetch} — must be > 0; HistoryRead-Raw cannot page within an oversized tie cluster and will surface BadHistoryOperationUnsupported for those reads."); return warnings; } } diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests/HistoryPagingTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests/HistoryPagingTests.cs index 51893232..3efcc088 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests/HistoryPagingTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests/HistoryPagingTests.cs @@ -146,6 +146,112 @@ public sealed class HistoryPagingTests trimmed[0].Value.ShouldBe(11.0); } + // --- SliceTieCluster ------------------------------------------------------------------------ + + [Fact] + public void SliceTieCluster_mid_cluster_emits_cp_at_same_timestamp() + { + // 10 ties at T; already emitted 2; cap 3. Slice [2,5) → next page resumes AT T, skip 5. + var t = new DateTime(2026, 1, 1, 0, 0, 5, DateTimeKind.Utc); + var end = t.AddHours(1); + + HistoryPaging.SliceTieCluster( + clusterCount: 10, skip: 2, cap: 3, boundaryT: t, endUtc: end, + out var sliceStart, out var sliceCount, out var nextStartUtc, out var nextSkip); + + sliceStart.ShouldBe(2); + sliceCount.ShouldBe(3); + nextStartUtc.ShouldBe(t); // still draining the cluster ⇒ resume AT T + nextSkip.ShouldBe(5); // 2 already emitted + 3 just emitted + } + + [Fact] + public void SliceTieCluster_exact_drain_advances_one_tick_when_window_remains() + { + // 6 ties at T; skip 3; cap 3. Slice [3,6) drains the cluster exactly ⇒ advance to T+1tick. + var t = new DateTime(2026, 1, 1, 0, 0, 5, DateTimeKind.Utc); + var end = t.AddHours(1); + + HistoryPaging.SliceTieCluster( + clusterCount: 6, skip: 3, cap: 3, boundaryT: t, endUtc: end, + out var sliceStart, out var sliceCount, out var nextStartUtc, out var nextSkip); + + sliceStart.ShouldBe(3); + sliceCount.ShouldBe(3); + nextStartUtc.ShouldBe(t.AddTicks(1)); // cluster drained, window remains ⇒ next tick, fresh skip + nextSkip.ShouldBe(0); + } + + [Fact] + public void SliceTieCluster_short_final_slice_still_emits_cp_when_window_remains() + { + // 5 ties at T; skip 3; cap 10. Slice [3,5) is SHORT (2 < cap) but it fully drains the cluster + // and the window still extends past T ⇒ we MUST emit a CP to read the rest of the window even + // though this page is short of the cap. + var t = new DateTime(2026, 1, 1, 0, 0, 5, DateTimeKind.Utc); + var end = t.AddHours(1); + + HistoryPaging.SliceTieCluster( + clusterCount: 5, skip: 3, cap: 10, boundaryT: t, endUtc: end, + out var sliceStart, out var sliceCount, out var nextStartUtc, out var nextSkip); + + sliceStart.ShouldBe(3); + sliceCount.ShouldBe(2); // short slice + nextStartUtc.ShouldBe(t.AddTicks(1)); // but CP emitted so the rest of the window is read + nextSkip.ShouldBe(0); + } + + [Fact] + public void SliceTieCluster_drained_at_window_end_terminates() + { + // The cluster ends exactly AT the window end (endUtc == T). Draining it cannot advance past the + // window ⇒ terminal (no CP). + var t = new DateTime(2026, 1, 1, 0, 0, 5, DateTimeKind.Utc); + + HistoryPaging.SliceTieCluster( + clusterCount: 4, skip: 0, cap: 10, boundaryT: t, endUtc: t, + out var sliceStart, out var sliceCount, out var nextStartUtc, out var nextSkip); + + sliceStart.ShouldBe(0); + sliceCount.ShouldBe(4); + nextStartUtc.ShouldBeNull(); // T+1tick > endUtc ⇒ window exhausted ⇒ terminal + nextSkip.ShouldBe(0); + } + + [Fact] + public void SliceTieCluster_self_heals_when_skip_exceeds_cluster() + { + // Defensive: a stale skip points past the (re-read, possibly shrunk) cluster. The slice is empty + // (count 0) and, since emitted == clusterCount, the cursor advances/terminates rather than looping. + var t = new DateTime(2026, 1, 1, 0, 0, 5, DateTimeKind.Utc); + var end = t.AddHours(1); + + HistoryPaging.SliceTieCluster( + clusterCount: 4, skip: 4, cap: 10, boundaryT: t, endUtc: end, + out var sliceStart, out var sliceCount, out var nextStartUtc, out var nextSkip); + + sliceStart.ShouldBe(4); + sliceCount.ShouldBe(0); // nothing left in the cluster to emit + nextStartUtc.ShouldBe(t.AddTicks(1)); // emitted (4) == clusterCount (4) ⇒ advance, don't loop + nextSkip.ShouldBe(0); + } + + [Fact] + public void SliceTieCluster_self_heals_to_terminal_at_window_end() + { + // Same stale-skip self-heal, but the window ends at T ⇒ advancing terminates instead of looping. + var t = new DateTime(2026, 1, 1, 0, 0, 5, DateTimeKind.Utc); + + HistoryPaging.SliceTieCluster( + clusterCount: 4, skip: 9, cap: 10, boundaryT: t, endUtc: t, + out var sliceStart, out var sliceCount, out var nextStartUtc, out var nextSkip); + + sliceStart.ShouldBe(4); // clamped to clusterCount + sliceCount.ShouldBe(0); + nextStartUtc.ShouldBeNull(); + nextSkip.ShouldBe(0); + } + // --- InMemoryHistoryContinuationStore (mirrors the production session store contract) -------- [Fact] diff --git a/tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests/NodeManagerHistoryReadPagingTests.cs b/tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests/NodeManagerHistoryReadPagingTests.cs index ac130e5a..16ddb42e 100644 --- a/tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests/NodeManagerHistoryReadPagingTests.cs +++ b/tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests/NodeManagerHistoryReadPagingTests.cs @@ -146,38 +146,83 @@ public sealed class NodeManagerHistoryReadPagingTests : IDisposable await host.DisposeAsync(); } - /// Degenerate tie cluster larger than the page cap: a single timestamp carrying MORE ties - /// than NumValuesPerNode cannot be paged past by a (timestamp, skip) cursor — the fixed-(start, - /// end,cap) backend keeps returning the same first cap ties. Rather than silently truncate to - /// GoodNoData (permanently dropping the un-emitted ties), the resume read FAILS LOUDLY for that node - /// with BadHistoryOperationUnsupported. (Regression for the data-loss path the carry-offset - /// cursor cannot resolve; the operator's remedy is a larger NumValuesPerNode.) + /// Oversized tie cluster (more ties at one timestamp than the page cap): the (timestamp, skip) + /// resume cursor alone cannot advance past it — the fixed-(start,end,cap) backend keeps returning the + /// same first cap ties. The paging now over-fetches the WHOLE cluster (a start == end read, + /// bounded by MaxTieClusterOverfetch) and pages WITHIN the timestamp, so the read drains the + /// cluster (and any data after it) across continuation points with no dup/skip. Here a tie cluster of 3 + /// (indices 3..7 share one timestamp) sits between distinct samples; with cap 2 the union must be all 10 + /// values in order. [Fact] - public async Task Raw_tie_cluster_larger_than_page_fails_loudly_not_silently() + public async Task Raw_oversized_tie_cluster_pages_within_the_timestamp() { var (host, server) = await BootAsync(); var nm = server.NodeManager!; nm.HistoryContinuationStore = new InMemoryHistoryContinuationStore(); - // 6 samples ALL sharing one timestamp (Epoch+2s) — a tie cluster of 6 with a page cap of 4. - var t = Epoch.AddSeconds(2); - var series = Enumerable.Range(0, 6) - .Select(i => new DataValueSnapshot((double)i, StatusCodes.Good, t, t)).ToArray(); + // 3 distinct, then a tie cluster of 5 (all at Epoch+3s), then 2 more distinct — 10 total. The tie + // cluster (5) is larger than the page cap (2), the case that used to stall the cursor. + var series = MakeSeriesWithTieCluster(distinctBefore: 3, tieCount: 5, distinctAfter: 2); + series.Length.ShouldBe(10); nm.HistorianDataSource = new SeriesHistorianDataSource(series); nm.EnsureVariable("eq-1/burst", parentFolderNodeId: null, displayName: "Burst", dataType: "Double", writable: false, historianTagname: "WW.Burst"); var nodeId = nm.TryGetVariable("eq-1/burst")!.NodeId; - // Page 1: a full page of the first 4 ties, with a continuation point. - var (r1, e1, cp1) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 4, inboundCp: null); + var collected = new List(); + byte[]? cp = null; + var pageCount = 0; + do + { + var (values, error, nextCp) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), + max: 2, inboundCp: cp); + error.StatusCode.Code.ShouldBe(StatusCodes.Good); + collected.AddRange(values); + cp = nextCp; + pageCount++; + pageCount.ShouldBeLessThan(20, "paging must terminate, not loop, even through a tie cluster"); + } + while (cp is not null); + + // Every value, once, in order — the cluster was paged through, not dropped or duplicated. + collected.Count.ShouldBe(10); + collected.ShouldBe(Enumerable.Range(0, 10).Select(i => (double)i)); + + await host.DisposeAsync(); + } + + /// The absurd-burst backstop is preserved: a tie cluster STRICTLY larger than the configured + /// still surfaces + /// BadHistoryOperationUnsupported for that node rather than buffering an unbounded burst. With + /// the bound set to 3 and a 5-way tie cluster, the resume read that hits the cluster fails loudly. + [Fact] + public async Task Raw_tie_cluster_beyond_overfetch_bound_fails_loudly() + { + var (host, server) = await BootAsync(); + var nm = server.NodeManager!; + nm.HistoryContinuationStore = new InMemoryHistoryContinuationStore(); + nm.MaxTieClusterOverfetch = 3; // cluster of 5 will exceed this ⇒ backstop. + + // 5 samples ALL sharing one timestamp (Epoch+2s) — a tie cluster of 5 with a page cap of 2. + var t = Epoch.AddSeconds(2); + var series = Enumerable.Range(0, 5) + .Select(i => new DataValueSnapshot((double)i, StatusCodes.Good, t, t)).ToArray(); + nm.HistorianDataSource = new SeriesHistorianDataSource(series); + + nm.EnsureVariable("eq-1/absurd", parentFolderNodeId: null, displayName: "Absurd", dataType: "Double", + writable: false, historianTagname: "WW.Absurd"); + var nodeId = nm.TryGetVariable("eq-1/absurd")!.NodeId; + + // Page 1: a full page of the first 2 ties, with a continuation point. + var (r1, e1, cp1) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 2, inboundCp: null); e1.StatusCode.Code.ShouldBe(StatusCodes.Good); - r1.ShouldBe(new[] { 0.0, 1.0, 2.0, 3.0 }); + r1.ShouldBe(new[] { 0.0, 1.0 }); cp1.ShouldNotBeNull(); - // Page 2: the cursor cannot advance past the oversized cluster ⇒ a clear error, NOT a silent - // GoodNoData that would drop samples 4 and 5. - var (r2, e2, cp2) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 4, inboundCp: cp1); + // Page 2: the resume read hits the cluster; over-fetch finds 5 > bound 3 ⇒ a clear error, NOT a + // silent GoodNoData that would drop the un-emitted ties. + var (r2, e2, cp2) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 2, inboundCp: cp1); e2.StatusCode.Code.ShouldBe(StatusCodes.BadHistoryOperationUnsupported); r2.ShouldBeEmpty(); cp2.ShouldBeNull(); @@ -330,6 +375,35 @@ public sealed class NodeManagerHistoryReadPagingTests : IDisposable }) .ToArray(); + /// Build a sorted series: distinct-second samples, then a + /// run of samples ALL at the same timestamp (the next second after the + /// last distinct one — the oversized tie cluster), then distinct + /// samples each a second later. Values are 0..N-1 in series order so the union assertion is exact. + private static DataValueSnapshot[] MakeSeriesWithTieCluster(int distinctBefore, int tieCount, int distinctAfter) + { + var samples = new List(distinctBefore + tieCount + distinctAfter); + var value = 0; + var second = 0; + + for (var i = 0; i < distinctBefore; i++, second++) + { + var t = Epoch.AddSeconds(second); + samples.Add(new DataValueSnapshot((double)value++, StatusCodes.Good, t, t)); + } + + var tieT = Epoch.AddSeconds(second++); // the single shared timestamp for the whole cluster + for (var i = 0; i < tieCount; i++) + samples.Add(new DataValueSnapshot((double)value++, StatusCodes.Good, tieT, tieT)); + + for (var i = 0; i < distinctAfter; i++, second++) + { + var t = Epoch.AddSeconds(second); + samples.Add(new DataValueSnapshot((double)value++, StatusCodes.Good, t, t)); + } + + return samples.ToArray(); + } + /// A series-backed fake historian: holds a full sorted series and serves a Raw read by /// returning the samples in [start, end] capped at maxValuesPerNode (start inclusive — exactly the /// resume-cursor contract). Processed / AtTime / Events are not exercised here.