// File: lmxopcua/tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests/NodeManagerHistoryReadPagingTests.cs
// (C#; 485 lines; 22 KiB)

using Microsoft.Extensions.Logging.Abstractions;
using Opc.Ua;
using Opc.Ua.Server;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using HistorianRead = ZB.MOM.WW.OtOpcUa.Core.Abstractions.HistoryReadResult;
using SdkHistoryReadResult = Opc.Ua.HistoryReadResult;
namespace ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests;
/// <summary>
/// Phase-D server-side HistoryRead continuation-point paging for the Raw arm. Boots a real
/// <see cref="OtOpcUaSdkServer"/> (as <see cref="NodeManagerHistoryReadTests"/> does), injects an
/// <see cref="InMemoryHistoryContinuationStore"/> (the in-process harness uses a SESSION-LESS
/// <c>OperationContext</c>, so the production session-backed store has no session to bind to — the
/// in-memory store exercises the SAME dispatch path), and a series-backed fake historian, then drives
/// the full multi-page round trip through the node manager's public HistoryRead.
/// </summary>
public sealed class NodeManagerHistoryReadPagingTests : IDisposable
{
private static CancellationToken Ct => TestContext.Current.CancellationToken;
private readonly string _pkiRoot = Path.Combine(
Path.GetTempPath(), $"otopcua-historypaging-{Guid.NewGuid():N}");
private static readonly DateTime Epoch = new(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc);
/// <summary>A full first page emits a continuation point; resuming returns subsequent pages; the
/// final short page clears the continuation point — and the union of pages is the complete series with
/// no duplicate or skipped sample. (Resumed pages net slightly fewer than the cap because each resume
/// re-reads + trims the boundary sample to stay tie-safe — see the design; the union is what matters.)</summary>
[Fact]
public async Task Raw_pages_full_series_across_continuation_points()
{
    var (host, server) = await BootAsync();
    try
    {
        var nm = server.NodeManager!;
        nm.HistoryContinuationStore = new InMemoryHistoryContinuationStore();

        // 250 distinct-timestamp samples; client caps each page at 100.
        var series = MakeSeries(count: 250, stepSeconds: 1);
        nm.HistorianDataSource = new SeriesHistorianDataSource(series);
        nm.EnsureVariable("eq-1/pv", parentFolderNodeId: null, displayName: "PV", dataType: "Double",
            writable: false, historianTagname: "WW.PV");
        var nodeId = nm.TryGetVariable("eq-1/pv")!.NodeId;

        var collected = new List<double>();
        byte[]? cp = null;
        var pageCount = 0;
        var sawCp = false;
        do
        {
            var (values, error, nextCp) = ReadRaw(nm, nodeId, start: Epoch, end: Epoch.AddHours(1),
                max: 100, inboundCp: cp);
            error.StatusCode.Code.ShouldBe(StatusCodes.Good);
            collected.AddRange(values);
            cp = nextCp;
            if (cp is not null) sawCp = true;
            pageCount++;

            // Hard stop so a cursor that never advances fails the test instead of hanging it.
            pageCount.ShouldBeLessThan(20, "paging must terminate, not loop");
        }
        while (cp is not null);

        sawCp.ShouldBeTrue("a 250-sample series capped at 100 must page");
        pageCount.ShouldBeGreaterThan(1);

        // The union is the exact series — no duplicates, no skips, in order.
        collected.Count.ShouldBe(250);
        collected.ShouldBe(Enumerable.Range(0, 250).Select(i => (double)i));
    }
    finally
    {
        // Dispose even when an assertion above throws, so a failing test does not leak the booted host.
        await host.DisposeAsync();
    }
}
/// <summary>An exactly-full series whose total is a multiple of the cap pages to a trailing
/// EMPTY page (GoodNoData, no CP) — the server can't know the prior page was the last until it reads
/// past the end.</summary>
[Fact]
public async Task Raw_exact_multiple_terminates_with_empty_final_page()
{
    var (host, server) = await BootAsync();
    try
    {
        var nm = server.NodeManager!;
        nm.HistoryContinuationStore = new InMemoryHistoryContinuationStore();
        nm.HistorianDataSource = new SeriesHistorianDataSource(MakeSeries(count: 100, stepSeconds: 1));
        nm.EnsureVariable("eq-1/exact", parentFolderNodeId: null, displayName: "Exact", dataType: "Double",
            writable: false, historianTagname: "WW.Exact");
        var nodeId = nm.TryGetVariable("eq-1/exact")!.NodeId;

        // Page 1: exactly cap-many samples, so the server still hands out a continuation point.
        var (r1, _, cp1) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 100, inboundCp: null);
        r1.Count.ShouldBe(100);
        cp1.ShouldNotBeNull("a full page emits a CP even when it happens to be the last");

        // Page 2: nothing left — empty values and no continuation point.
        var (r2, e2, cp2) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 100, inboundCp: cp1);
        r2.Count.ShouldBe(0);
        // NOTE(review): the summary says GoodNoData but the per-node error asserted here is Good;
        // presumably GoodNoData is carried on the result's StatusCode, which ReadRaw does not surface — confirm.
        e2.StatusCode.Code.ShouldBe(StatusCodes.Good);
        cp2.ShouldBeNull("the empty trailing page terminates paging");
    }
    finally
    {
        // Dispose even when an assertion above throws, so a failing test does not leak the booted host.
        await host.DisposeAsync();
    }
}
/// <summary>Boundary timestamps that TIE across the page split are neither duplicated nor skipped:
/// the resume cursor drops the already-emitted ties and re-reads the un-emitted ones at the same
/// timestamp.</summary>
[Fact]
public async Task Raw_paging_dedups_tied_boundary_timestamps()
{
    var (host, server) = await BootAsync();
    try
    {
        var nm = server.NodeManager!;
        nm.HistoryContinuationStore = new InMemoryHistoryContinuationStore();

        // 5 samples: indices 2,3,4 all share the SAME timestamp (Epoch+2s). With cap 4, page 1 returns
        // [0,1,2,3] (cutting through the tie cluster); page 2 must return [4] (the un-emitted tie) only.
        var ts = new[]
        {
            Epoch,               // 0
            Epoch.AddSeconds(1), // 1
            Epoch.AddSeconds(2), // 2 ┐
            Epoch.AddSeconds(2), // 3 │ tied at Epoch+2s
            Epoch.AddSeconds(2), // 4 ┘
        };
        var series = ts.Select((t, i) => new DataValueSnapshot((double)i, StatusCodes.Good, t, t)).ToArray();
        nm.HistorianDataSource = new SeriesHistorianDataSource(series);
        nm.EnsureVariable("eq-1/tie", parentFolderNodeId: null, displayName: "Tie", dataType: "Double",
            writable: false, historianTagname: "WW.Tie");
        var nodeId = nm.TryGetVariable("eq-1/tie")!.NodeId;

        var collected = new List<double>();

        var (r1, _, cp1) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 4, inboundCp: null);
        r1.Count.ShouldBe(4);
        r1.ShouldBe(new[] { 0.0, 1.0, 2.0, 3.0 });
        cp1.ShouldNotBeNull();
        collected.AddRange(r1);

        var (r2, _, cp2) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 4, inboundCp: cp1);
        // Only the un-emitted tie (index 4) — NOT the already-emitted 2 and 3.
        r2.ShouldBe(new[] { 4.0 });
        cp2.ShouldBeNull();
        collected.AddRange(r2);

        // Exactly the 5 distinct samples, once each.
        collected.ShouldBe(new[] { 0.0, 1.0, 2.0, 3.0, 4.0 });
    }
    finally
    {
        // Dispose even when an assertion above throws, so a failing test does not leak the booted host.
        await host.DisposeAsync();
    }
}
/// <summary>Oversized tie cluster (more ties at one timestamp than the page cap): the (timestamp, skip)
/// resume cursor alone cannot advance past it — the fixed-(start,end,cap) backend keeps returning the
/// same first <c>cap</c> ties. The paging now over-fetches the WHOLE cluster (a <c>start == end</c> read,
/// bounded by <c>MaxTieClusterOverfetch</c>) and pages WITHIN the timestamp, so the read drains the
/// cluster (and any data after it) across continuation points with no dup/skip. Here a tie cluster of 5
/// (indices 3..7 share one timestamp) sits between distinct samples; with cap 2 the union must be all 10
/// values in order.</summary>
[Fact]
public async Task Raw_oversized_tie_cluster_pages_within_the_timestamp()
{
    var (host, server) = await BootAsync();
    try
    {
        var nm = server.NodeManager!;
        nm.HistoryContinuationStore = new InMemoryHistoryContinuationStore();

        // 3 distinct, then a tie cluster of 5 (all at Epoch+3s), then 2 more distinct — 10 total. The tie
        // cluster (5) is larger than the page cap (2), the case that used to stall the cursor.
        var series = MakeSeriesWithTieCluster(distinctBefore: 3, tieCount: 5, distinctAfter: 2);
        series.Length.ShouldBe(10);
        nm.HistorianDataSource = new SeriesHistorianDataSource(series);
        nm.EnsureVariable("eq-1/burst", parentFolderNodeId: null, displayName: "Burst", dataType: "Double",
            writable: false, historianTagname: "WW.Burst");
        var nodeId = nm.TryGetVariable("eq-1/burst")!.NodeId;

        var collected = new List<double>();
        byte[]? cp = null;
        var pageCount = 0;
        do
        {
            var (values, error, nextCp) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1),
                max: 2, inboundCp: cp);
            error.StatusCode.Code.ShouldBe(StatusCodes.Good);
            collected.AddRange(values);
            cp = nextCp;
            pageCount++;

            // Hard stop so a cursor stuck inside the cluster fails the test instead of hanging it.
            pageCount.ShouldBeLessThan(20, "paging must terminate, not loop, even through a tie cluster");
        }
        while (cp is not null);

        // Every value, once, in order — the cluster was paged through, not dropped or duplicated.
        collected.Count.ShouldBe(10);
        collected.ShouldBe(Enumerable.Range(0, 10).Select(i => (double)i));
    }
    finally
    {
        // Dispose even when an assertion above throws, so a failing test does not leak the booted host.
        await host.DisposeAsync();
    }
}
/// <summary>The absurd-burst backstop is preserved: a tie cluster STRICTLY larger than the configured
/// <see cref="OtOpcUaNodeManager.MaxTieClusterOverfetch"/> still surfaces
/// <c>BadHistoryOperationUnsupported</c> for that node rather than buffering an unbounded burst. With
/// the bound set to 3 and a 5-way tie cluster, the resume read that hits the cluster fails loudly.</summary>
[Fact]
public async Task Raw_tie_cluster_beyond_overfetch_bound_fails_loudly()
{
    var (host, server) = await BootAsync();
    try
    {
        var nm = server.NodeManager!;
        nm.HistoryContinuationStore = new InMemoryHistoryContinuationStore();
        nm.MaxTieClusterOverfetch = 3; // cluster of 5 will exceed this ⇒ backstop.

        // 5 samples ALL sharing one timestamp (Epoch+2s) — a tie cluster of 5 with a page cap of 2.
        var t = Epoch.AddSeconds(2);
        var series = Enumerable.Range(0, 5)
            .Select(i => new DataValueSnapshot((double)i, StatusCodes.Good, t, t)).ToArray();
        nm.HistorianDataSource = new SeriesHistorianDataSource(series);
        nm.EnsureVariable("eq-1/absurd", parentFolderNodeId: null, displayName: "Absurd", dataType: "Double",
            writable: false, historianTagname: "WW.Absurd");
        var nodeId = nm.TryGetVariable("eq-1/absurd")!.NodeId;

        // Page 1: a full page of the first 2 ties, with a continuation point.
        var (r1, e1, cp1) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 2, inboundCp: null);
        e1.StatusCode.Code.ShouldBe(StatusCodes.Good);
        r1.ShouldBe(new[] { 0.0, 1.0 });
        cp1.ShouldNotBeNull();

        // Page 2: the resume read hits the cluster; over-fetch finds 5 > bound 3 ⇒ a clear error, NOT a
        // silent GoodNoData that would drop the un-emitted ties.
        var (r2, e2, cp2) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 2, inboundCp: cp1);
        e2.StatusCode.Code.ShouldBe(StatusCodes.BadHistoryOperationUnsupported);
        r2.ShouldBeEmpty();
        cp2.ShouldBeNull();
    }
    finally
    {
        // Dispose even when an assertion above throws, so a failing test does not leak the booted host.
        await host.DisposeAsync();
    }
}
/// <summary>NumValuesPerNode == 0 ("all values") never pages — the whole series returns in one shot
/// with a null continuation point.</summary>
[Fact]
public async Task Raw_unlimited_request_returns_everything_without_a_cp()
{
    var (host, server) = await BootAsync();
    try
    {
        var nm = server.NodeManager!;
        nm.HistoryContinuationStore = new InMemoryHistoryContinuationStore();
        nm.HistorianDataSource = new SeriesHistorianDataSource(MakeSeries(count: 250, stepSeconds: 1));
        nm.EnsureVariable("eq-1/all", parentFolderNodeId: null, displayName: "All", dataType: "Double",
            writable: false, historianTagname: "WW.All");
        var nodeId = nm.TryGetVariable("eq-1/all")!.NodeId;

        // max: 0 is the "no cap" request; all 250 samples must come back in this single call.
        var (r, e, cp) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 0, inboundCp: null);
        e.StatusCode.Code.ShouldBe(StatusCodes.Good);
        r.Count.ShouldBe(250);
        cp.ShouldBeNull("an unlimited (max==0) request must not page");
    }
    finally
    {
        // Dispose even when an assertion above throws, so a failing test does not leak the booted host.
        await host.DisposeAsync();
    }
}
/// <summary>An inbound continuation point the store doesn't know (released / never issued / from
/// another node) yields BadContinuationPointInvalid for that node, and the source is NOT read.</summary>
[Fact]
public async Task Raw_unknown_continuation_point_yields_BadContinuationPointInvalid()
{
    var (host, server) = await BootAsync();
    try
    {
        var nm = server.NodeManager!;
        nm.HistoryContinuationStore = new InMemoryHistoryContinuationStore();
        var fake = new SeriesHistorianDataSource(MakeSeries(count: 10, stepSeconds: 1));
        nm.HistorianDataSource = fake;
        nm.EnsureVariable("eq-1/bad-cp", parentFolderNodeId: null, displayName: "BadCp", dataType: "Double",
            writable: false, historianTagname: "WW.BadCp");
        var nodeId = nm.TryGetVariable("eq-1/bad-cp")!.NodeId;

        // Zero the counter right before the call so the final assertion measures only this read.
        fake.ResetReadCount();

        // A fresh GUID's bytes: a continuation point this store never issued.
        var bogus = Guid.NewGuid().ToByteArray();
        var (_, e, cp) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 100, inboundCp: bogus);
        e.StatusCode.Code.ShouldBe(StatusCodes.BadContinuationPointInvalid);
        cp.ShouldBeNull();
        fake.ReadCount.ShouldBe(0); // never reached the backend.
    }
    finally
    {
        // Dispose even when an assertion above throws, so a failing test does not leak the booted host.
        await host.DisposeAsync();
    }
}
/// <summary>releaseContinuationPoints drops the stored cursor WITHOUT reading data: a subsequent
/// resume of the released point then misses (BadContinuationPointInvalid).</summary>
[Fact]
public async Task Release_drops_the_cursor_without_reading()
{
    var (host, server) = await BootAsync();
    try
    {
        var nm = server.NodeManager!;
        nm.HistoryContinuationStore = new InMemoryHistoryContinuationStore();
        var fake = new SeriesHistorianDataSource(MakeSeries(count: 250, stepSeconds: 1));
        nm.HistorianDataSource = fake;
        nm.EnsureVariable("eq-1/rel", parentFolderNodeId: null, displayName: "Rel", dataType: "Double",
            writable: false, historianTagname: "WW.Rel");
        var nodeId = nm.TryGetVariable("eq-1/rel")!.NodeId;

        // Page 1 — get a CP.
        var (_, _, cp1) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 100, inboundCp: null);
        cp1.ShouldNotBeNull();

        // Release it — the dispatcher routes releaseContinuationPoints=true to HistoryReleaseContinuationPoints,
        // which never reaches the Raw arm. No data is read.
        fake.ResetReadCount();
        InvokeRelease(nm, nodeId, cp1!);
        fake.ReadCount.ShouldBe(0);

        // Resuming the released point now misses.
        var (_, e, _) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 100, inboundCp: cp1);
        e.StatusCode.Code.ShouldBe(StatusCodes.BadContinuationPointInvalid);
    }
    finally
    {
        // Dispose even when an assertion above throws, so a failing test does not leak the booted host.
        await host.DisposeAsync();
    }
}
// --- helpers --------------------------------------------------------------------------------
/// <summary>Runs one Raw HistoryRead for a single node through the node manager and unpacks the
/// outcome for that node: the sample values (as doubles), the per-node error, and the continuation
/// point the server handed back (null when none).</summary>
/// <param name="nm">Node manager under test.</param>
/// <param name="nodeId">Node whose history is read.</param>
/// <param name="start">Start of the requested time range.</param>
/// <param name="end">End of the requested time range.</param>
/// <param name="max">Value sent as <c>NumValuesPerNode</c>.</param>
/// <param name="inboundCp">Continuation point to resume from, or null for a first read.</param>
private static (IReadOnlyList<double> Values, ServiceResult Error, byte[]? Cp) ReadRaw(
    OtOpcUaNodeManager nm, NodeId nodeId, DateTime start, DateTime end, uint max, byte[]? inboundCp)
{
    // One request node, so one pre-sized slot in each output list for the node manager to fill.
    var results = new List<SdkHistoryReadResult> { null! };
    var errors = new List<ServiceResult> { null! };
    var nodesToRead = new List<HistoryReadValueId>
    {
        new() { NodeId = nodeId, ContinuationPoint = inboundCp },
    };
    var details = new ReadRawModifiedDetails
    {
        StartTime = start,
        EndTime = end,
        NumValuesPerNode = max,
        IsReadModified = false,
    };
    var context = new OperationContext(
        new RequestHeader(), secureChannelContext: null, RequestType.HistoryRead, identity: null);

    nm.HistoryRead(context, details, TimestampsToReturn.Both,
        releaseContinuationPoints: false, nodesToRead, results, errors);

    var nodeResult = results[0];
    var samples = new List<double>();

    // The slot may be unfilled or carry a null extension object; in both cases there are no values.
    if (nodeResult?.HistoryData is { } ho && !ExtensionObject.IsNull(ho))
    {
        var history = (HistoryData)ExtensionObject.ToEncodeable(ho);
        foreach (var dv in history.DataValues)
        {
            samples.Add((double)dv.Value);
        }
    }

    return (samples, errors[0], nodeResult?.ContinuationPoint);
}
/// <summary>Sends a HistoryRead with <c>releaseContinuationPoints: true</c> for one node and one
/// continuation point. The results and errors the node manager writes are discarded.</summary>
/// <param name="nm">Node manager under test.</param>
/// <param name="nodeId">Node the continuation point belongs to.</param>
/// <param name="cp">Continuation point to release.</param>
private static void InvokeRelease(OtOpcUaNodeManager nm, NodeId nodeId, byte[] cp)
{
    // One request node, so one pre-sized slot in each output list.
    var results = new List<SdkHistoryReadResult> { null! };
    var errors = new List<ServiceResult> { null! };
    var request = new HistoryReadValueId { NodeId = nodeId, ContinuationPoint = cp };
    var nodesToRead = new List<HistoryReadValueId> { request };

    var details = new ReadRawModifiedDetails
    {
        StartTime = Epoch,
        EndTime = Epoch.AddHours(1),
        NumValuesPerNode = 100,
        IsReadModified = false,
    };
    var context = new OperationContext(
        new RequestHeader(), secureChannelContext: null, RequestType.HistoryRead, identity: null);

    nm.HistoryRead(context, details, TimestampsToReturn.Both,
        releaseContinuationPoints: true, nodesToRead, results, errors);
}
/// <summary>Builds <paramref name="count"/> Good-status samples spaced <paramref name="stepSeconds"/>
/// apart starting at <c>Epoch</c>. Sample <c>i</c> has value <c>(double)i</c> and the same timestamp
/// passed for both timestamp arguments.</summary>
private static DataValueSnapshot[] MakeSeries(int count, int stepSeconds)
{
    // Widen to long before multiplying so the offset is computed in 64-bit arithmetic.
    DataValueSnapshot SampleAt(int index)
    {
        var stamp = Epoch.AddSeconds((long)index * stepSeconds);
        return new DataValueSnapshot((double)index, StatusCodes.Good, stamp, stamp);
    }

    return Enumerable.Range(0, count).Select(SampleAt).ToArray();
}
/// <summary>Builds a time-ordered series in three runs: <paramref name="distinctBefore"/> samples one
/// second apart starting at <c>Epoch</c>, then <paramref name="tieCount"/> samples that all share the
/// next second (the tie cluster), then <paramref name="distinctAfter"/> samples one second apart after
/// the cluster. Values count up 0..N-1 in series order so a union assertion can be exact.</summary>
private static DataValueSnapshot[] MakeSeriesWithTieCluster(int distinctBefore, int tieCount, int distinctAfter)
{
    var samples = new List<DataValueSnapshot>(distinctBefore + tieCount + distinctAfter);
    var nextValue = 0;
    var nextSecond = 0;

    // Appends one Good sample at the given timestamp, carrying the next sequential value.
    void Append(DateTime stamp) =>
        samples.Add(new DataValueSnapshot((double)nextValue++, StatusCodes.Good, stamp, stamp));

    for (var n = 0; n < distinctBefore; n++)
    {
        Append(Epoch.AddSeconds(nextSecond++));
    }

    // One shared timestamp for the whole cluster; the second counter advances only once.
    var clusterStamp = Epoch.AddSeconds(nextSecond++);
    for (var n = 0; n < tieCount; n++)
    {
        Append(clusterStamp);
    }

    for (var n = 0; n < distinctAfter; n++)
    {
        Append(Epoch.AddSeconds(nextSecond++));
    }

    return samples.ToArray();
}
/// <summary>A series-backed fake historian: holds a full series and serves a Raw read by returning
/// the samples whose source timestamp lies in [start, end] (both ends inclusive — start-inclusive is
/// exactly the resume-cursor contract), capped at maxValuesPerNode. Samples are returned in the order
/// supplied; the fake does not sort, so callers pass an already-sorted series. Processed / AtTime /
/// Events return empty results and are not exercised here.</summary>
private sealed class SeriesHistorianDataSource(IReadOnlyList<DataValueSnapshot> series) : IHistorianDataSource
{
    private int _readCount;

    /// <summary>Number of <c>ReadRawAsync</c> calls since construction or the last
    /// <see cref="ResetReadCount"/>.</summary>
    public int ReadCount => Volatile.Read(ref _readCount);

    /// <summary>Zeroes the Raw-read counter. Uses an interlocked write to match the interlocked
    /// increment in <c>ReadRawAsync</c>.</summary>
    public void ResetReadCount() => Interlocked.Exchange(ref _readCount, 0);

    /// <summary>Counts the call, then returns the in-window samples, capped when
    /// <paramref name="maxValuesPerNode"/> is non-zero. The reference and cancellation token are ignored.</summary>
    public Task<HistorianRead> ReadRawAsync(
        string fullReference, DateTime startUtc, DateTime endUtc, uint maxValuesPerNode,
        CancellationToken cancellationToken)
    {
        Interlocked.Increment(ref _readCount);

        // A sample without a source timestamp is compared as DateTime.MinValue.
        IEnumerable<DataValueSnapshot> window = series
            .Where(s => (s.SourceTimestampUtc ?? DateTime.MinValue) >= startUtc
                        && (s.SourceTimestampUtc ?? DateTime.MinValue) <= endUtc);

        // maxValuesPerNode == 0 means "no limit".
        if (maxValuesPerNode != 0)
        {
            // Clamp before narrowing: a cap above int.MaxValue would otherwise wrap negative and
            // make Take() return nothing.
            var cap = (int)Math.Min(maxValuesPerNode, (uint)int.MaxValue);
            window = window.Take(cap);
        }

        return Task.FromResult(new HistorianRead(window.ToList(), null));
    }

    /// <summary>Not exercised by these tests; always returns an empty result.</summary>
    public Task<HistorianRead> ReadProcessedAsync(
        string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval,
        HistoryAggregateType aggregate, CancellationToken cancellationToken) =>
        Task.FromResult(new HistorianRead(Array.Empty<DataValueSnapshot>(), null));

    /// <summary>Not exercised by these tests; always returns an empty result.</summary>
    public Task<HistorianRead> ReadAtTimeAsync(
        string fullReference, IReadOnlyList<DateTime> timestampsUtc, CancellationToken cancellationToken) =>
        Task.FromResult(new HistorianRead(Array.Empty<DataValueSnapshot>(), null));

    /// <summary>Not exercised by these tests; always returns an empty result.</summary>
    public Task<HistoricalEventsResult> ReadEventsAsync(
        string? sourceName, DateTime startUtc, DateTime endUtc, int maxEvents,
        CancellationToken cancellationToken) =>
        Task.FromResult(new HistoricalEventsResult(Array.Empty<HistoricalEvent>(), null));

    /// <summary>Delegates to the null historian's health snapshot.</summary>
    public HistorianHealthSnapshot GetHealthSnapshot() => NullHistorianDataSource.Instance.GetHealthSnapshot();

    /// <summary>Nothing to release; the fake owns no resources.</summary>
    public void Dispose() { }
}
/// <summary>Creates an <c>OpcUaApplicationHost</c> with a per-call unique application URI, a freshly
/// allocated loopback port, and this test instance's PKI root, then starts a new
/// <c>OtOpcUaSdkServer</c> on it. The caller owns the returned host and must dispose it.</summary>
private async Task<(OpcUaApplicationHost Host, OtOpcUaSdkServer Server)> BootAsync()
{
    var options = new OpcUaApplicationHostOptions
    {
        ApplicationName = "OtOpcUa.HistoryPagingTest",
        ApplicationUri = $"urn:OtOpcUa.HistoryPagingTest:{Guid.NewGuid():N}",
        OpcUaPort = AllocateFreePort(),
        PublicHostname = "localhost",
        PkiStoreRoot = _pkiRoot,
    };
    var host = new OpcUaApplicationHost(options, NullLogger<OpcUaApplicationHost>.Instance);

    var server = new OtOpcUaSdkServer();
    await host.StartAsync(server, Ct);

    return (host, server);
}
/// <summary>Asks the OS for an unused loopback TCP port by binding a listener to port 0 and reading
/// back the port it was given. The listener is stopped before returning, so the port is only
/// momentarily reserved — another process could claim it before the caller binds.</summary>
private static int AllocateFreePort()
{
    using (var probe = new System.Net.Sockets.TcpListener(System.Net.IPAddress.Loopback, 0))
    {
        probe.Start();

        // Read the assigned port while the listener is still bound.
        var boundEndpoint = (System.Net.IPEndPoint)probe.LocalEndpoint;
        var assignedPort = boundEndpoint.Port;

        probe.Stop();
        return assignedPort;
    }
}
/// <summary>Removes this test instance's temporary PKI directory, if it exists. Deletion failures
/// are deliberately ignored so cleanup never fails a test.</summary>
public void Dispose()
{
    if (!Directory.Exists(_pkiRoot))
    {
        return;
    }

    try
    {
        Directory.Delete(_pkiRoot, recursive: true);
    }
    catch
    {
        /* best-effort cleanup */
    }
}
}