using Microsoft.Extensions.Logging.Abstractions;
using Opc.Ua;
using Opc.Ua.Server;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using HistorianRead = ZB.MOM.WW.OtOpcUa.Core.Abstractions.HistoryReadResult;
using SdkHistoryReadResult = Opc.Ua.HistoryReadResult;
namespace ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests;
///
/// Phase-D server-side HistoryRead continuation-point paging for the Raw arm. Boots a real
/// (as does), injects an
/// (the in-process harness uses a SESSION-LESS
/// OperationContext, so the production session-backed store has no session to bind to — the
/// in-memory store exercises the SAME dispatch path), and a series-backed fake historian, then drives
/// the full multi-page round trip through the node manager's public HistoryRead.
///
public sealed class NodeManagerHistoryReadPagingTests : IDisposable
{
private static CancellationToken Ct => TestContext.Current.CancellationToken;
private readonly string _pkiRoot = Path.Combine(
Path.GetTempPath(), $"otopcua-historypaging-{Guid.NewGuid():N}");
private static readonly DateTime Epoch = new(2026, 1, 1, 0, 0, 0, DateTimeKind.Utc);
/// A full first page emits a continuation point; resuming returns subsequent pages; the
/// final short page clears the continuation point — and the union of pages is the complete series with
/// no duplicate or skipped sample. (Resumed pages net slightly fewer than the cap because each resume
/// re-reads + trims the boundary sample to stay tie-safe — see the design; the union is what matters.)
[Fact]
public async Task Raw_pages_full_series_across_continuation_points()
{
var (host, server) = await BootAsync();
var nm = server.NodeManager!;
nm.HistoryContinuationStore = new InMemoryHistoryContinuationStore();
// 250 distinct-timestamp samples; client caps each page at 100.
var series = MakeSeries(count: 250, stepSeconds: 1);
nm.HistorianDataSource = new SeriesHistorianDataSource(series);
nm.EnsureVariable("eq-1/pv", parentFolderNodeId: null, displayName: "PV", dataType: "Double",
writable: false, historianTagname: "WW.PV");
var nodeId = nm.TryGetVariable("eq-1/pv")!.NodeId;
var collected = new List();
byte[]? cp = null;
var pageCount = 0;
var sawCp = false;
do
{
var (values, error, nextCp) = ReadRaw(nm, nodeId, start: Epoch, end: Epoch.AddHours(1),
max: 100, inboundCp: cp);
error.StatusCode.Code.ShouldBe(StatusCodes.Good);
collected.AddRange(values);
cp = nextCp;
if (cp is not null) sawCp = true;
pageCount++;
pageCount.ShouldBeLessThan(20, "paging must terminate, not loop");
}
while (cp is not null);
sawCp.ShouldBeTrue("a 250-sample series capped at 100 must page");
pageCount.ShouldBeGreaterThan(1);
// The union is the exact series — no duplicates, no skips, in order.
collected.Count.ShouldBe(250);
collected.ShouldBe(Enumerable.Range(0, 250).Select(i => (double)i));
await host.DisposeAsync();
}
/// An exactly-full series whose total is a multiple of the cap pages to a trailing
/// EMPTY page (GoodNoData, no CP) — the server can't know the prior page was the last until it reads
/// past the end.
[Fact]
public async Task Raw_exact_multiple_terminates_with_empty_final_page()
{
var (host, server) = await BootAsync();
var nm = server.NodeManager!;
nm.HistoryContinuationStore = new InMemoryHistoryContinuationStore();
nm.HistorianDataSource = new SeriesHistorianDataSource(MakeSeries(count: 100, stepSeconds: 1));
nm.EnsureVariable("eq-1/exact", parentFolderNodeId: null, displayName: "Exact", dataType: "Double",
writable: false, historianTagname: "WW.Exact");
var nodeId = nm.TryGetVariable("eq-1/exact")!.NodeId;
var (r1, _, cp1) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 100, inboundCp: null);
r1.Count.ShouldBe(100);
cp1.ShouldNotBeNull("a full page emits a CP even when it happens to be the last");
var (r2, e2, cp2) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 100, inboundCp: cp1);
r2.Count.ShouldBe(0);
e2.StatusCode.Code.ShouldBe(StatusCodes.Good);
cp2.ShouldBeNull("the empty trailing page terminates paging");
await host.DisposeAsync();
}
/// Boundary timestamps that TIE across the page split are neither duplicated nor skipped:
/// the resume cursor drops the already-emitted ties and re-reads the un-emitted ones at the same
/// timestamp.
[Fact]
public async Task Raw_paging_dedups_tied_boundary_timestamps()
{
var (host, server) = await BootAsync();
var nm = server.NodeManager!;
nm.HistoryContinuationStore = new InMemoryHistoryContinuationStore();
// 5 samples: indices 2,3,4 all share the SAME timestamp (Epoch+2s). With cap 4, page 1 returns
// [0,1,2,3] (cutting through the tie cluster); page 2 must return [4] (the un-emitted tie) only.
var ts = new[]
{
Epoch, // 0
Epoch.AddSeconds(1), // 1
Epoch.AddSeconds(2), // 2 ┐
Epoch.AddSeconds(2), // 3 │ tied at Epoch+2s
Epoch.AddSeconds(2), // 4 ┘
};
var series = ts.Select((t, i) => new DataValueSnapshot((double)i, StatusCodes.Good, t, t)).ToArray();
nm.HistorianDataSource = new SeriesHistorianDataSource(series);
nm.EnsureVariable("eq-1/tie", parentFolderNodeId: null, displayName: "Tie", dataType: "Double",
writable: false, historianTagname: "WW.Tie");
var nodeId = nm.TryGetVariable("eq-1/tie")!.NodeId;
var collected = new List();
var (r1, _, cp1) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 4, inboundCp: null);
r1.Count.ShouldBe(4);
r1.ShouldBe(new[] { 0.0, 1.0, 2.0, 3.0 });
cp1.ShouldNotBeNull();
collected.AddRange(r1);
var (r2, _, cp2) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 4, inboundCp: cp1);
// Only the un-emitted tie (index 4) — NOT the already-emitted 2 and 3.
r2.ShouldBe(new[] { 4.0 });
cp2.ShouldBeNull();
collected.AddRange(r2);
// Exactly the 5 distinct samples, once each.
collected.ShouldBe(new[] { 0.0, 1.0, 2.0, 3.0, 4.0 });
await host.DisposeAsync();
}
/// Degenerate tie cluster larger than the page cap: a single timestamp carrying MORE ties
/// than NumValuesPerNode cannot be paged past by a (timestamp, skip) cursor — the fixed-(start,
/// end,cap) backend keeps returning the same first cap ties. Rather than silently truncate to
/// GoodNoData (permanently dropping the un-emitted ties), the resume read FAILS LOUDLY for that node
/// with BadHistoryOperationUnsupported. (Regression for the data-loss path the carry-offset
/// cursor cannot resolve; the operator's remedy is a larger NumValuesPerNode.)
[Fact]
public async Task Raw_tie_cluster_larger_than_page_fails_loudly_not_silently()
{
var (host, server) = await BootAsync();
var nm = server.NodeManager!;
nm.HistoryContinuationStore = new InMemoryHistoryContinuationStore();
// 6 samples ALL sharing one timestamp (Epoch+2s) — a tie cluster of 6 with a page cap of 4.
var t = Epoch.AddSeconds(2);
var series = Enumerable.Range(0, 6)
.Select(i => new DataValueSnapshot((double)i, StatusCodes.Good, t, t)).ToArray();
nm.HistorianDataSource = new SeriesHistorianDataSource(series);
nm.EnsureVariable("eq-1/burst", parentFolderNodeId: null, displayName: "Burst", dataType: "Double",
writable: false, historianTagname: "WW.Burst");
var nodeId = nm.TryGetVariable("eq-1/burst")!.NodeId;
// Page 1: a full page of the first 4 ties, with a continuation point.
var (r1, e1, cp1) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 4, inboundCp: null);
e1.StatusCode.Code.ShouldBe(StatusCodes.Good);
r1.ShouldBe(new[] { 0.0, 1.0, 2.0, 3.0 });
cp1.ShouldNotBeNull();
// Page 2: the cursor cannot advance past the oversized cluster ⇒ a clear error, NOT a silent
// GoodNoData that would drop samples 4 and 5.
var (r2, e2, cp2) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 4, inboundCp: cp1);
e2.StatusCode.Code.ShouldBe(StatusCodes.BadHistoryOperationUnsupported);
r2.ShouldBeEmpty();
cp2.ShouldBeNull();
await host.DisposeAsync();
}
/// NumValuesPerNode == 0 ("all values") never pages — the whole series returns in one shot
/// with a null continuation point.
[Fact]
public async Task Raw_unlimited_request_returns_everything_without_a_cp()
{
var (host, server) = await BootAsync();
var nm = server.NodeManager!;
nm.HistoryContinuationStore = new InMemoryHistoryContinuationStore();
nm.HistorianDataSource = new SeriesHistorianDataSource(MakeSeries(count: 250, stepSeconds: 1));
nm.EnsureVariable("eq-1/all", parentFolderNodeId: null, displayName: "All", dataType: "Double",
writable: false, historianTagname: "WW.All");
var nodeId = nm.TryGetVariable("eq-1/all")!.NodeId;
var (r, e, cp) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 0, inboundCp: null);
e.StatusCode.Code.ShouldBe(StatusCodes.Good);
r.Count.ShouldBe(250);
cp.ShouldBeNull("an unlimited (max==0) request must not page");
await host.DisposeAsync();
}
/// An inbound continuation point the store doesn't know (released / never issued / from
/// another node) yields BadContinuationPointInvalid for that node, and the source is NOT read.
[Fact]
public async Task Raw_unknown_continuation_point_yields_BadContinuationPointInvalid()
{
var (host, server) = await BootAsync();
var nm = server.NodeManager!;
nm.HistoryContinuationStore = new InMemoryHistoryContinuationStore();
var fake = new SeriesHistorianDataSource(MakeSeries(count: 10, stepSeconds: 1));
nm.HistorianDataSource = fake;
nm.EnsureVariable("eq-1/bad-cp", parentFolderNodeId: null, displayName: "BadCp", dataType: "Double",
writable: false, historianTagname: "WW.BadCp");
var nodeId = nm.TryGetVariable("eq-1/bad-cp")!.NodeId;
fake.ResetReadCount();
var bogus = Guid.NewGuid().ToByteArray();
var (_, e, cp) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 100, inboundCp: bogus);
e.StatusCode.Code.ShouldBe(StatusCodes.BadContinuationPointInvalid);
cp.ShouldBeNull();
fake.ReadCount.ShouldBe(0); // never reached the backend.
await host.DisposeAsync();
}
/// releaseContinuationPoints drops the stored cursor WITHOUT reading data: a subsequent
/// resume of the released point then misses (BadContinuationPointInvalid).
[Fact]
public async Task Release_drops_the_cursor_without_reading()
{
var (host, server) = await BootAsync();
var nm = server.NodeManager!;
nm.HistoryContinuationStore = new InMemoryHistoryContinuationStore();
var fake = new SeriesHistorianDataSource(MakeSeries(count: 250, stepSeconds: 1));
nm.HistorianDataSource = fake;
nm.EnsureVariable("eq-1/rel", parentFolderNodeId: null, displayName: "Rel", dataType: "Double",
writable: false, historianTagname: "WW.Rel");
var nodeId = nm.TryGetVariable("eq-1/rel")!.NodeId;
// Page 1 — get a CP.
var (_, _, cp1) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 100, inboundCp: null);
cp1.ShouldNotBeNull();
// Release it — the dispatcher routes releaseContinuationPoints=true to HistoryReleaseContinuationPoints,
// which never reaches the Raw arm. No data is read.
fake.ResetReadCount();
InvokeRelease(nm, nodeId, cp1!);
fake.ReadCount.ShouldBe(0);
// Resuming the released point now misses.
var (_, e, _) = ReadRaw(nm, nodeId, Epoch, Epoch.AddHours(1), max: 100, inboundCp: cp1);
e.StatusCode.Code.ShouldBe(StatusCodes.BadContinuationPointInvalid);
await host.DisposeAsync();
}
// --- helpers --------------------------------------------------------------------------------
/// Issue a single-node Raw HistoryRead and return that node's decoded sample values, error,
/// and outbound continuation point.
private static (IReadOnlyList Values, ServiceResult Error, byte[]? Cp) ReadRaw(
OtOpcUaNodeManager nm, NodeId nodeId, DateTime start, DateTime end, uint max, byte[]? inboundCp)
{
var context = new OperationContext(
new RequestHeader(), secureChannelContext: null, RequestType.HistoryRead, identity: null);
var details = new ReadRawModifiedDetails
{
StartTime = start,
EndTime = end,
NumValuesPerNode = max,
IsReadModified = false,
};
var nodesToRead = new List
{
new() { NodeId = nodeId, ContinuationPoint = inboundCp },
};
var results = new List { null! };
var errors = new List { null! };
nm.HistoryRead(context, details, TimestampsToReturn.Both,
releaseContinuationPoints: false, nodesToRead, results, errors);
var values = new List();
if (results[0]?.HistoryData is { } ho && !ExtensionObject.IsNull(ho))
{
var data = (HistoryData)ExtensionObject.ToEncodeable(ho);
foreach (var dv in data.DataValues) values.Add((double)dv.Value);
}
return (values, errors[0], results[0]?.ContinuationPoint);
}
/// Issue a release-only HistoryRead (releaseContinuationPoints=true) for the node + point.
private static void InvokeRelease(OtOpcUaNodeManager nm, NodeId nodeId, byte[] cp)
{
var context = new OperationContext(
new RequestHeader(), secureChannelContext: null, RequestType.HistoryRead, identity: null);
var details = new ReadRawModifiedDetails
{
StartTime = Epoch, EndTime = Epoch.AddHours(1), NumValuesPerNode = 100, IsReadModified = false,
};
var nodesToRead = new List { new() { NodeId = nodeId, ContinuationPoint = cp } };
var results = new List { null! };
var errors = new List { null! };
nm.HistoryRead(context, details, TimestampsToReturn.Both,
releaseContinuationPoints: true, nodesToRead, results, errors);
}
private static DataValueSnapshot[] MakeSeries(int count, int stepSeconds) =>
Enumerable.Range(0, count)
.Select(i =>
{
var t = Epoch.AddSeconds((long)i * stepSeconds);
return new DataValueSnapshot((double)i, StatusCodes.Good, t, t);
})
.ToArray();
/// A series-backed fake historian: holds a full sorted series and serves a Raw read by
/// returning the samples in [start, end] capped at maxValuesPerNode (start inclusive — exactly the
/// resume-cursor contract). Processed / AtTime / Events are not exercised here.
private sealed class SeriesHistorianDataSource(IReadOnlyList series) : IHistorianDataSource
{
private int _readCount;
public int ReadCount => _readCount;
public void ResetReadCount() => _readCount = 0;
public Task ReadRawAsync(
string fullReference, DateTime startUtc, DateTime endUtc, uint maxValuesPerNode,
CancellationToken cancellationToken)
{
Interlocked.Increment(ref _readCount);
var window = series
.Where(s => (s.SourceTimestampUtc ?? DateTime.MinValue) >= startUtc
&& (s.SourceTimestampUtc ?? DateTime.MinValue) <= endUtc)
.ToList();
// maxValuesPerNode == 0 means "no limit".
var page = maxValuesPerNode == 0 ? window : window.Take((int)maxValuesPerNode).ToList();
return Task.FromResult(new HistorianRead(page, null));
}
public Task ReadProcessedAsync(
string fullReference, DateTime startUtc, DateTime endUtc, TimeSpan interval,
HistoryAggregateType aggregate, CancellationToken cancellationToken) =>
Task.FromResult(new HistorianRead(Array.Empty(), null));
public Task ReadAtTimeAsync(
string fullReference, IReadOnlyList timestampsUtc, CancellationToken cancellationToken) =>
Task.FromResult(new HistorianRead(Array.Empty(), null));
public Task ReadEventsAsync(
string? sourceName, DateTime startUtc, DateTime endUtc, int maxEvents,
CancellationToken cancellationToken) =>
Task.FromResult(new HistoricalEventsResult(Array.Empty(), null));
public HistorianHealthSnapshot GetHealthSnapshot() => NullHistorianDataSource.Instance.GetHealthSnapshot();
public void Dispose() { }
}
private async Task<(OpcUaApplicationHost Host, OtOpcUaSdkServer Server)> BootAsync()
{
var host = new OpcUaApplicationHost(
new OpcUaApplicationHostOptions
{
ApplicationName = "OtOpcUa.HistoryPagingTest",
ApplicationUri = $"urn:OtOpcUa.HistoryPagingTest:{Guid.NewGuid():N}",
OpcUaPort = AllocateFreePort(),
PublicHostname = "localhost",
PkiStoreRoot = _pkiRoot,
},
NullLogger.Instance);
var server = new OtOpcUaSdkServer();
await host.StartAsync(server, Ct);
return (host, server);
}
private static int AllocateFreePort()
{
using var listener = new System.Net.Sockets.TcpListener(System.Net.IPAddress.Loopback, 0);
listener.Start();
var port = ((System.Net.IPEndPoint)listener.LocalEndpoint).Port;
listener.Stop();
return port;
}
public void Dispose()
{
if (Directory.Exists(_pkiRoot))
{
try { Directory.Delete(_pkiRoot, recursive: true); }
catch { /* best-effort cleanup */ }
}
}
}