bea0b482d4
C1 (critical): a boundary tie cluster larger than NumValuesPerNode could silently truncate a resumed read to GoodNoData, permanently dropping the un-emitted ties — the (timestamp, skip) cursor cannot advance past a single timestamp the fixed-(start,end,cap) backend keeps re-returning. Now detected and failed LOUDLY per node with BadHistoryOperationUnsupported + a log naming the tag/timestamp/cap; documented in Historian.md with the larger-cap remedy. Regression test Raw_tie_cluster_larger_than_page_fails_loudly_not_silently. I3: build HistoryData before Save() so a projection failure can never orphan a stored continuation cursor. N1 (YAGNI): drop the never-produced HistoryReadKind enum + Processed-only Aggregate/IntervalTicks fields from HistoryContinuationState — only Raw pages. N3: ComputeResumeCursor guards its documented non-empty precondition. I1: document InMemoryHistoryContinuationStore's eventual-consistency (test double). Build clean, 182/182 OpcUaServer tests pass.
411 lines
19 KiB
C#
411 lines
19 KiB
C#
using Microsoft.Extensions.Logging.Abstractions;
|
|
using Opc.Ua;
|
|
using Opc.Ua.Server;
|
|
using Shouldly;
|
|
using Xunit;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
|
using HistorianRead = ZB.MOM.WW.OtOpcUa.Core.Abstractions.HistoryReadResult;
|
|
using SdkHistoryReadResult = Opc.Ua.HistoryReadResult;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests;
|
|
|
|
/// <summary>
|
|
/// Phase-D server-side HistoryRead continuation-point paging for the Raw arm. Boots a real
|
|
/// <see cref="OtOpcUaSdkServer"/> (as <see cref="NodeManagerHistoryReadTests"/> does), injects an
|
|
/// <see cref="InMemoryHistoryContinuationStore"/> (the in-process harness uses a SESSION-LESS
|
|
/// <c>OperationContext</c>, so the production session-backed store has no session to bind to — the
|
|
/// in-memory store exercises the SAME dispatch path), and a series-backed fake historian, then drives
|
|
/// the full multi-page round trip through the node manager's public HistoryRead.
|
|
/// </summary>
|
|
public sealed class NodeManagerHistoryReadPagingTests : IDisposable
|
|
{
|
|
private static CancellationToken Ct => TestContext.Current.CancellationToken;
|
|
|
|
private readonly string _pkiRoot = Path.Combine(
|
|
Path.GetTempPath(), $"otopcua-historypaging-{Guid.NewGuid():N}");
|
|
|
|
private static readonly DateTime Epoch = new(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc);
|
|
|
|
/// <summary>A full first page emits a continuation point; resuming returns subsequent pages; the
|
|
/// final short page clears the continuation point — and the union of pages is the complete series with
|
|
/// no duplicate or skipped sample. (Resumed pages net slightly fewer than the cap because each resume
|
|
/// re-reads + trims the boundary sample to stay tie-safe — see the design; the union is what matters.)</summary>
|
|
[Fact]
|
|
public async Task Raw_pages_full_series_across_continuation_points()
|
|
{
|
|
var (host, server) = await BootAsync();
|
|
var nm = server.NodeManager!;
|
|
nm.HistoryContinuationStore = new InMemoryHistoryContinuationStore();
|
|
|
|
// 250 distinct-timestamp samples; client caps each page at 100.
|
|
var series = MakeSeries(count: 250, stepSeconds: 1);
|
|
nm.HistorianDataSource = new SeriesHistorianDataSource(series);
|
|
|
|
nm.EnsureVariable("eq-1/pv", parentFolderNodeId: null, displayName: "PV", dataType: "Double",
|
|
writable: false, historianTagname: "WW.PV");
|
|
var nodeId = nm.TryGetVariable("eq-1/pv")!.NodeId;
|
|
|
|
var collected = new List<double>();
|
|
byte[]? cp = null;
|
|
var pageCount = 0;
|
|
var sawCp = false;
|
|
do
|
|
{
|
|
var (values, error, nextCp) = ReadRaw(nm, nodeId, start: Epoch, end: Epoch.AddHours(1),
|
|
max: 100, inboundCp: cp);
|
|
error.StatusCode.Code.ShouldBe(StatusCodes.Good);
|
|
collected.AddRange(values);
|
|
cp = nextCp;
|
|
if (cp is not null) sawCp = true;
|
|
pageCount++;
|
|
pageCount.ShouldBeLessThan(20, "paging must terminate, not loop");
|
|
}
|
|
while (cp is not null);
|
|
|
|
sawCp.ShouldBeTrue("a 250-sample series capped at 100 must page");
|
|
pageCount.ShouldBeGreaterThan(1);
|
|
|
|
// The union is the exact series — no duplicates, no skips, in order.
|
|
collected.Count.ShouldBe(250);
|
|
collected.ShouldBe(Enumerable.Range(0, 250).Select(i => (double)i));
|
|
|
|
await host.DisposeAsync();
|
|
}
|
|
|
|
/// <summary>An exactly-full series whose total is a multiple of the cap pages to a trailing
|
|
/// EMPTY page (GoodNoData, no CP) — the server can't know the prior page was the last until it reads
|
|
/// past the end.</summary>
|
|
[Fact]
|
|
public async Task Raw_exact_multiple_terminates_with_empty_final_page()
|
|
{
|
|
var (host, server) = await BootAsync();
|
|
var nm = server.NodeManager!;
|
|
nm.HistoryContinuationStore = new InMemoryHistoryContinuationStore();
|
|
nm.HistorianDataSource = new SeriesHistorianDataSource(MakeSeries(count: 100, stepSeconds: 1));
|
|
|
|
nm.EnsureVariable("eq-1/exact", parentFolderNodeId: null, displayName: "Exact", dataType: "Double",
|
|
writable: false, historianTagname: "WW.Exact");
|
|
var nodeId = nm.TryGetVariable("eq-1/exact")!.NodeId;
|
|
|
|
var (r1, _, cp1) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 100, inboundCp: null);
|
|
r1.Count.ShouldBe(100);
|
|
cp1.ShouldNotBeNull("a full page emits a CP even when it happens to be the last");
|
|
|
|
var (r2, e2, cp2) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 100, inboundCp: cp1);
|
|
r2.Count.ShouldBe(0);
|
|
e2.StatusCode.Code.ShouldBe(StatusCodes.Good);
|
|
cp2.ShouldBeNull("the empty trailing page terminates paging");
|
|
|
|
await host.DisposeAsync();
|
|
}
|
|
|
|
/// <summary>Boundary timestamps that TIE across the page split are neither duplicated nor skipped:
|
|
/// the resume cursor drops the already-emitted ties and re-reads the un-emitted ones at the same
|
|
/// timestamp.</summary>
|
|
[Fact]
|
|
public async Task Raw_paging_dedups_tied_boundary_timestamps()
|
|
{
|
|
var (host, server) = await BootAsync();
|
|
var nm = server.NodeManager!;
|
|
nm.HistoryContinuationStore = new InMemoryHistoryContinuationStore();
|
|
|
|
// 5 samples: indices 2,3,4 all share the SAME timestamp (Epoch+2s). With cap 4, page 1 returns
|
|
// [0,1,2,3] (cutting through the tie cluster); page 2 must return [4] (the un-emitted tie) only.
|
|
var ts = new[]
|
|
{
|
|
Epoch, // 0
|
|
Epoch.AddSeconds(1), // 1
|
|
Epoch.AddSeconds(2), // 2 ┐
|
|
Epoch.AddSeconds(2), // 3 │ tied at Epoch+2s
|
|
Epoch.AddSeconds(2), // 4 ┘
|
|
};
|
|
var series = ts.Select((t, i) => new DataValueSnapshot((double)i, StatusCodes.Good, t, t)).ToArray();
|
|
nm.HistorianDataSource = new SeriesHistorianDataSource(series);
|
|
|
|
nm.EnsureVariable("eq-1/tie", parentFolderNodeId: null, displayName: "Tie", dataType: "Double",
|
|
writable: false, historianTagname: "WW.Tie");
|
|
var nodeId = nm.TryGetVariable("eq-1/tie")!.NodeId;
|
|
|
|
var collected = new List<double>();
|
|
|
|
var (r1, _, cp1) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 4, inboundCp: null);
|
|
r1.Count.ShouldBe(4);
|
|
r1.ShouldBe(new[] { 0.0, 1.0, 2.0, 3.0 });
|
|
cp1.ShouldNotBeNull();
|
|
collected.AddRange(r1);
|
|
|
|
var (r2, _, cp2) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 4, inboundCp: cp1);
|
|
// Only the un-emitted tie (index 4) — NOT the already-emitted 2 and 3.
|
|
r2.ShouldBe(new[] { 4.0 });
|
|
cp2.ShouldBeNull();
|
|
collected.AddRange(r2);
|
|
|
|
// Exactly the 5 distinct samples, once each.
|
|
collected.ShouldBe(new[] { 0.0, 1.0, 2.0, 3.0, 4.0 });
|
|
|
|
await host.DisposeAsync();
|
|
}
|
|
|
|
/// <summary>Degenerate tie cluster larger than the page cap: a single timestamp carrying MORE ties
|
|
/// than <c>NumValuesPerNode</c> cannot be paged past by a (timestamp, skip) cursor — the fixed-(start,
|
|
/// end,cap) backend keeps returning the same first <c>cap</c> ties. Rather than silently truncate to
|
|
/// GoodNoData (permanently dropping the un-emitted ties), the resume read FAILS LOUDLY for that node
|
|
/// with <c>BadHistoryOperationUnsupported</c>. (Regression for the data-loss path the carry-offset
|
|
/// cursor cannot resolve; the operator's remedy is a larger NumValuesPerNode.)</summary>
|
|
[Fact]
|
|
public async Task Raw_tie_cluster_larger_than_page_fails_loudly_not_silently()
|
|
{
|
|
var (host, server) = await BootAsync();
|
|
var nm = server.NodeManager!;
|
|
nm.HistoryContinuationStore = new InMemoryHistoryContinuationStore();
|
|
|
|
// 6 samples ALL sharing one timestamp (Epoch+2s) — a tie cluster of 6 with a page cap of 4.
|
|
var t = Epoch.AddSeconds(2);
|
|
var series = Enumerable.Range(0, 6)
|
|
.Select(i => new DataValueSnapshot((double)i, StatusCodes.Good, t, t)).ToArray();
|
|
nm.HistorianDataSource = new SeriesHistorianDataSource(series);
|
|
|
|
nm.EnsureVariable("eq-1/burst", parentFolderNodeId: null, displayName: "Burst", dataType: "Double",
|
|
writable: false, historianTagname: "WW.Burst");
|
|
var nodeId = nm.TryGetVariable("eq-1/burst")!.NodeId;
|
|
|
|
// Page 1: a full page of the first 4 ties, with a continuation point.
|
|
var (r1, e1, cp1) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 4, inboundCp: null);
|
|
e1.StatusCode.Code.ShouldBe(StatusCodes.Good);
|
|
r1.ShouldBe(new[] { 0.0, 1.0, 2.0, 3.0 });
|
|
cp1.ShouldNotBeNull();
|
|
|
|
// Page 2: the cursor cannot advance past the oversized cluster ⇒ a clear error, NOT a silent
|
|
// GoodNoData that would drop samples 4 and 5.
|
|
var (r2, e2, cp2) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 4, inboundCp: cp1);
|
|
e2.StatusCode.Code.ShouldBe(StatusCodes.BadHistoryOperationUnsupported);
|
|
r2.ShouldBeEmpty();
|
|
cp2.ShouldBeNull();
|
|
|
|
await host.DisposeAsync();
|
|
}
|
|
|
|
/// <summary>NumValuesPerNode == 0 ("all values") never pages — the whole series returns in one shot
|
|
/// with a null continuation point.</summary>
|
|
[Fact]
|
|
public async Task Raw_unlimited_request_returns_everything_without_a_cp()
|
|
{
|
|
var (host, server) = await BootAsync();
|
|
var nm = server.NodeManager!;
|
|
nm.HistoryContinuationStore = new InMemoryHistoryContinuationStore();
|
|
nm.HistorianDataSource = new SeriesHistorianDataSource(MakeSeries(count: 250, stepSeconds: 1));
|
|
|
|
nm.EnsureVariable("eq-1/all", parentFolderNodeId: null, displayName: "All", dataType: "Double",
|
|
writable: false, historianTagname: "WW.All");
|
|
var nodeId = nm.TryGetVariable("eq-1/all")!.NodeId;
|
|
|
|
var (r, e, cp) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 0, inboundCp: null);
|
|
|
|
e.StatusCode.Code.ShouldBe(StatusCodes.Good);
|
|
r.Count.ShouldBe(250);
|
|
cp.ShouldBeNull("an unlimited (max==0) request must not page");
|
|
|
|
await host.DisposeAsync();
|
|
}
|
|
|
|
/// <summary>An inbound continuation point the store doesn't know (released / never issued / from
|
|
/// another node) yields BadContinuationPointInvalid for that node, and the source is NOT read.</summary>
|
|
[Fact]
|
|
public async Task Raw_unknown_continuation_point_yields_BadContinuationPointInvalid()
|
|
{
|
|
var (host, server) = await BootAsync();
|
|
var nm = server.NodeManager!;
|
|
nm.HistoryContinuationStore = new InMemoryHistoryContinuationStore();
|
|
var fake = new SeriesHistorianDataSource(MakeSeries(count: 10, stepSeconds: 1));
|
|
nm.HistorianDataSource = fake;
|
|
|
|
nm.EnsureVariable("eq-1/bad-cp", parentFolderNodeId: null, displayName: "BadCp", dataType: "Double",
|
|
writable: false, historianTagname: "WW.BadCp");
|
|
var nodeId = nm.TryGetVariable("eq-1/bad-cp")!.NodeId;
|
|
|
|
fake.ResetReadCount();
|
|
var bogus = Guid.NewGuid().ToByteArray();
|
|
var (_, e, cp) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 100, inboundCp: bogus);
|
|
|
|
e.StatusCode.Code.ShouldBe(StatusCodes.BadContinuationPointInvalid);
|
|
cp.ShouldBeNull();
|
|
fake.ReadCount.ShouldBe(0); // never reached the backend.
|
|
|
|
await host.DisposeAsync();
|
|
}
|
|
|
|
/// <summary>releaseContinuationPoints drops the stored cursor WITHOUT reading data: a subsequent
|
|
/// resume of the released point then misses (BadContinuationPointInvalid).</summary>
|
|
[Fact]
|
|
public async Task Release_drops_the_cursor_without_reading()
|
|
{
|
|
var (host, server) = await BootAsync();
|
|
var nm = server.NodeManager!;
|
|
nm.HistoryContinuationStore = new InMemoryHistoryContinuationStore();
|
|
var fake = new SeriesHistorianDataSource(MakeSeries(count: 250, stepSeconds: 1));
|
|
nm.HistorianDataSource = fake;
|
|
|
|
nm.EnsureVariable("eq-1/rel", parentFolderNodeId: null, displayName: "Rel", dataType: "Double",
|
|
writable: false, historianTagname: "WW.Rel");
|
|
var nodeId = nm.TryGetVariable("eq-1/rel")!.NodeId;
|
|
|
|
// Page 1 — get a CP.
|
|
var (_, _, cp1) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 100, inboundCp: null);
|
|
cp1.ShouldNotBeNull();
|
|
|
|
// Release it — the dispatcher routes releaseContinuationPoints=true to HistoryReleaseContinuationPoints,
|
|
// which never reaches the Raw arm. No data is read.
|
|
fake.ResetReadCount();
|
|
InvokeRelease(nm, nodeId, cp1!);
|
|
fake.ReadCount.ShouldBe(0);
|
|
|
|
// Resuming the released point now misses.
|
|
var (_, e, _) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 100, inboundCp: cp1);
|
|
e.StatusCode.Code.ShouldBe(StatusCodes.BadContinuationPointInvalid);
|
|
|
|
await host.DisposeAsync();
|
|
}
|
|
|
|
// --- helpers --------------------------------------------------------------------------------
|
|
|
|
/// <summary>Issue a single-node Raw HistoryRead and return that node's decoded sample values, error,
|
|
/// and outbound continuation point.</summary>
|
|
private static (IReadOnlyList<double> Values, ServiceResult Error, byte[]? Cp) ReadRaw(
|
|
OtOpcUaNodeManager nm, NodeId nodeId, DateTime start, DateTime end, uint max, byte[]? inboundCp)
|
|
{
|
|
var context = new OperationContext(
|
|
new RequestHeader(), secureChannelContext: null, RequestType.HistoryRead, identity: null);
|
|
|
|
var details = new ReadRawModifiedDetails
|
|
{
|
|
StartTime = start,
|
|
EndTime = end,
|
|
NumValuesPerNode = max,
|
|
IsReadModified = false,
|
|
};
|
|
|
|
var nodesToRead = new List<HistoryReadValueId>
|
|
{
|
|
new() { NodeId = nodeId, ContinuationPoint = inboundCp },
|
|
};
|
|
var results = new List<SdkHistoryReadResult> { null! };
|
|
var errors = new List<ServiceResult> { null! };
|
|
|
|
nm.HistoryRead(context, details, TimestampsToReturn.Both,
|
|
releaseContinuationPoints: false, nodesToRead, results, errors);
|
|
|
|
var values = new List<double>();
|
|
if (results[0]?.HistoryData is { } ho && !ExtensionObject.IsNull(ho))
|
|
{
|
|
var data = (HistoryData)ExtensionObject.ToEncodeable(ho);
|
|
foreach (var dv in data.DataValues) values.Add((double)dv.Value);
|
|
}
|
|
|
|
return (values, errors[0], results[0]?.ContinuationPoint);
|
|
}
|
|
|
|
/// <summary>Issue a release-only HistoryRead (releaseContinuationPoints=true) for the node + point.</summary>
|
|
private static void InvokeRelease(OtOpcUaNodeManager nm, NodeId nodeId, byte[] cp)
|
|
{
|
|
var context = new OperationContext(
|
|
new RequestHeader(), secureChannelContext: null, RequestType.HistoryRead, identity: null);
|
|
var details = new ReadRawModifiedDetails
|
|
{
|
|
StartTime = Epoch, EndTime = Epoch.AddHours(1), NumValuesPerNode = 100, IsReadModified = false,
|
|
};
|
|
var nodesToRead = new List<HistoryReadValueId> { new() { NodeId = nodeId, ContinuationPoint = cp } };
|
|
var results = new List<SdkHistoryReadResult> { null! };
|
|
var errors = new List<ServiceResult> { null! };
|
|
|
|
nm.HistoryRead(context, details, TimestampsToReturn.Both,
|
|
releaseContinuationPoints: true, nodesToRead, results, errors);
|
|
}
|
|
|
|
private static DataValueSnapshot[] MakeSeries(int count, int stepSeconds) =>
|
|
Enumerable.Range(0, count)
|
|
.Select(i =>
|
|
{
|
|
var t = Epoch.AddSeconds((long)i * stepSeconds);
|
|
return new DataValueSnapshot((double)i, StatusCodes.Good, t, t);
|
|
})
|
|
.ToArray();
|
|
|
|
/// <summary>A series-backed fake historian: holds a full sorted series and serves a Raw read by
|
|
/// returning the samples in [start, end] capped at maxValuesPerNode (start inclusive — exactly the
|
|
/// resume-cursor contract). Processed / AtTime / Events are not exercised here.</summary>
|
|
private sealed class SeriesHistorianDataSource(IReadOnlyList<DataValueSnapshot> series) : IHistorianDataSource
|
|
{
|
|
private int _readCount;
|
|
public int ReadCount => _readCount;
|
|
public void ResetReadCount() => _readCount = 0;
|
|
|
|
public Task<HistorianRead> ReadRawAsync(
|
|
string fullReference, DateTime startUtc, DateTime endUtc, uint maxValuesPerNode,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
Interlocked.Increment(ref _readCount);
|
|
var window = series
|
|
.Where(s => (s.SourceTimestampUtc ?? DateTime.MinValue) >= startUtc
|
|
&& (s.SourceTimestampUtc ?? DateTime.MinValue) <= endUtc)
|
|
.ToList();
|
|
// maxValuesPerNode == 0 means "no limit".
|
|
var page = maxValuesPerNode == 0 ? window : window.Take((int)maxValuesPerNode).ToList();
|
|
return Task.FromResult(new HistorianRead(page, null));
|
|
}
|
|
|
|
public Task<HistorianRead> ReadProcessedAsync(
|
|
string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval,
|
|
HistoryAggregateType aggregate, CancellationToken cancellationToken) =>
|
|
Task.FromResult(new HistorianRead(Array.Empty<DataValueSnapshot>(), null));
|
|
|
|
public Task<HistorianRead> ReadAtTimeAsync(
|
|
string fullReference, IReadOnlyList<DateTime> timestampsUtc, CancellationToken cancellationToken) =>
|
|
Task.FromResult(new HistorianRead(Array.Empty<DataValueSnapshot>(), null));
|
|
|
|
public Task<HistoricalEventsResult> ReadEventsAsync(
|
|
string? sourceName, DateTime startUtc, DateTime endUtc, int maxEvents,
|
|
CancellationToken cancellationToken) =>
|
|
Task.FromResult(new HistoricalEventsResult(Array.Empty<HistoricalEvent>(), null));
|
|
|
|
public HistorianHealthSnapshot GetHealthSnapshot() => NullHistorianDataSource.Instance.GetHealthSnapshot();
|
|
|
|
public void Dispose() { }
|
|
}
|
|
|
|
private async Task<(OpcUaApplicationHost Host, OtOpcUaSdkServer Server)> BootAsync()
|
|
{
|
|
var host = new OpcUaApplicationHost(
|
|
new OpcUaApplicationHostOptions
|
|
{
|
|
ApplicationName = "OtOpcUa.HistoryPagingTest",
|
|
ApplicationUri = $"urn:OtOpcUa.HistoryPagingTest:{Guid.NewGuid():N}",
|
|
OpcUaPort = AllocateFreePort(),
|
|
PublicHostname = "localhost",
|
|
PkiStoreRoot = _pkiRoot,
|
|
},
|
|
NullLogger<OpcUaApplicationHost>.Instance);
|
|
|
|
var server = new OtOpcUaSdkServer();
|
|
await host.StartAsync(server, Ct);
|
|
return (host, server);
|
|
}
|
|
|
|
private static int AllocateFreePort()
|
|
{
|
|
using var listener = new System.Net.Sockets.TcpListener(System.Net.IPAddress.Loopback, 0);
|
|
listener.Start();
|
|
var port = ((System.Net.IPEndPoint)listener.LocalEndpoint).Port;
|
|
listener.Stop();
|
|
return port;
|
|
}
|
|
|
|
public void Dispose()
|
|
{
|
|
if (Directory.Exists(_pkiRoot))
|
|
{
|
|
try { Directory.Delete(_pkiRoot, recursive: true); }
|
|
catch { /* best-effort cleanup */ }
|
|
}
|
|
}
|
|
}
|