Auto: opcuaclient-3 — honor server OperationLimits

Closes #275
This commit is contained in:
Joseph Doherty
2026-04-25 15:38:55 -04:00
parent 9f1e033e83
commit 494fdf2358
2 changed files with 303 additions and 33 deletions

View File

@@ -75,6 +75,31 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
/// </summary>
private SessionReconnectHandler? _reconnectHandler;
/// <summary>
/// Cached server-advertised OperationLimits, fetched lazily on first batch op and
/// refreshed on reconnect. Null until the first successful fetch; null components
/// mean "fetch hasn't completed yet, fall through to single-call". Per spec, a 0
/// limit means "no limit" — we surface that as <c>uint?</c>=null too so the
/// chunking helper has a single sentinel for "don't chunk".
/// </summary>
private OperationLimitsCache? _operationLimits;
private readonly SemaphoreSlim _operationLimitsLock = new(1, 1);
/// <summary>
/// Snapshot of the four OperationLimits the driver chunks against. Stored as
/// <c>uint?</c> so callers can distinguish "not yet fetched" / "no limit"
/// (null) from "limit = N" (Some(N)). Spec sentinel 0 is normalized to null at
/// fetch time so the chunking helper has a single "don't chunk" sentinel.
/// </summary>
internal sealed record OperationLimitsCache(
uint? MaxNodesPerRead,
uint? MaxNodesPerWrite,
uint? MaxNodesPerBrowse,
uint? MaxNodesPerHistoryReadData);
/// <summary>Test seam — exposes the cached limits so unit tests can assert fetch behaviour.</summary>
internal OperationLimitsCache? OperationLimitsForTest => _operationLimits;
public string DriverInstanceId => driverInstanceId;
public string DriverType => "OpcUaClient";
@@ -427,6 +452,7 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
try { Session?.Dispose(); } catch { }
Session = null;
_connectedEndpointUrl = null;
_operationLimits = null;
TransitionTo(HostState.Unknown);
_health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null);
@@ -442,6 +468,7 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
IReadOnlyList<string> fullReferences, CancellationToken cancellationToken)
{
var session = RequireSession();
await EnsureOperationLimitsFetchedAsync(cancellationToken).ConfigureAwait(false);
var results = new DataValueSnapshot[fullReferences.Count];
var now = DateTime.UtcNow;
@@ -463,31 +490,44 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
if (toSend.Count == 0) return results;
// Honor server's MaxNodesPerRead — chunk large batches so a single ReadAsync stays
// under the cap. cap=null means "no limit" (sentinel for both 0-from-server and
// not-yet-fetched), in which case ChunkBy yields the input as a single slice and
// the wire path collapses to one SDK call.
var readCap = _operationLimits?.MaxNodesPerRead;
var indexMapList = indexMap; // close over for catch
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
try
{
var resp = await session.ReadAsync(
requestHeader: null,
maxAge: 0,
timestampsToReturn: TimestampsToReturn.Both,
nodesToRead: toSend,
ct: cancellationToken).ConfigureAwait(false);
var values = resp.Results;
for (var w = 0; w < values.Count; w++)
var wireOffset = 0;
foreach (var chunk in ChunkBy(toSend, readCap))
{
var r = indexMap[w];
var dv = values[w];
// Preserve the upstream StatusCode verbatim — including Bad codes per
// §8's cascading-quality rule. Also preserve SourceTimestamp so downstream
// clients can detect stale upstream data.
results[r] = new DataValueSnapshot(
Value: dv.Value,
StatusCode: dv.StatusCode.Code,
SourceTimestampUtc: dv.SourceTimestamp == DateTime.MinValue ? null : dv.SourceTimestamp,
ServerTimestampUtc: dv.ServerTimestamp == DateTime.MinValue ? now : dv.ServerTimestamp);
var chunkColl = new ReadValueIdCollection(chunk.Count);
for (var i = 0; i < chunk.Count; i++) chunkColl.Add(chunk.Array![chunk.Offset + i]);
var resp = await session.ReadAsync(
requestHeader: null,
maxAge: 0,
timestampsToReturn: TimestampsToReturn.Both,
nodesToRead: chunkColl,
ct: cancellationToken).ConfigureAwait(false);
var values = resp.Results;
for (var w = 0; w < values.Count; w++)
{
var r = indexMapList[wireOffset + w];
var dv = values[w];
// Preserve the upstream StatusCode verbatim — including Bad codes per
// §8's cascading-quality rule. Also preserve SourceTimestamp so downstream
// clients can detect stale upstream data.
results[r] = new DataValueSnapshot(
Value: dv.Value,
StatusCode: dv.StatusCode.Code,
SourceTimestampUtc: dv.SourceTimestamp == DateTime.MinValue ? null : dv.SourceTimestamp,
ServerTimestampUtc: dv.ServerTimestamp == DateTime.MinValue ? now : dv.ServerTimestamp);
}
wireOffset += chunk.Count;
}
_health = new DriverHealth(DriverState.Healthy, now, null);
}
@@ -497,9 +537,9 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
// tag in this batch. Per-tag StatusCode stays BadCommunicationError (not
// BadInternalError) so operators distinguish "upstream unreachable" from
// "driver bug".
for (var w = 0; w < indexMap.Count; w++)
for (var w = 0; w < indexMapList.Count; w++)
{
var r = indexMap[w];
var r = indexMapList[w];
results[r] = new DataValueSnapshot(null, StatusBadCommunicationError, null, now);
}
_health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, ex.Message);
@@ -515,6 +555,7 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
IReadOnlyList<Core.Abstractions.WriteRequest> writes, CancellationToken cancellationToken)
{
var session = RequireSession();
await EnsureOperationLimitsFetchedAsync(cancellationToken).ConfigureAwait(false);
var results = new WriteResult[writes.Count];
var toSend = new WriteValueCollection();
@@ -537,24 +578,34 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
if (toSend.Count == 0) return results;
// Honor server's MaxNodesPerWrite — same chunking pattern as ReadAsync. cap=null
// collapses to a single wire call.
var writeCap = _operationLimits?.MaxNodesPerWrite;
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
try
{
var resp = await session.WriteAsync(
requestHeader: null,
nodesToWrite: toSend,
ct: cancellationToken).ConfigureAwait(false);
var codes = resp.Results;
for (var w = 0; w < codes.Count; w++)
var wireOffset = 0;
foreach (var chunk in ChunkBy(toSend, writeCap))
{
var r = indexMap[w];
// Pass upstream WriteResult StatusCode through verbatim. Success codes
// include Good (0) and any warning-level Good* variants; anything with
// the severity bits set is a Bad.
results[r] = new WriteResult(codes[w].Code);
var chunkColl = new WriteValueCollection(chunk.Count);
for (var i = 0; i < chunk.Count; i++) chunkColl.Add(chunk.Array![chunk.Offset + i]);
var resp = await session.WriteAsync(
requestHeader: null,
nodesToWrite: chunkColl,
ct: cancellationToken).ConfigureAwait(false);
var codes = resp.Results;
for (var w = 0; w < codes.Count; w++)
{
var r = indexMap[wireOffset + w];
// Pass upstream WriteResult StatusCode through verbatim. Success codes
// include Good (0) and any warning-level Good* variants; anything with
// the severity bits set is a Bad.
results[r] = new WriteResult(codes[w].Code);
}
wireOffset += chunk.Count;
}
}
catch (Exception)
@@ -591,6 +642,81 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
private ISession RequireSession() =>
Session ?? throw new InvalidOperationException("OpcUaClientDriver not initialized");
/// <summary>
/// Lazily fetch <c>Server.ServerCapabilities.OperationLimits</c> from the upstream
/// server and cache them on the driver. Idempotent — called from every batch op,
/// no-ops once a successful fetch has populated the cache. The cache is cleared on
/// reconnect (see <see cref="OnReconnectComplete"/>) so a server with redrawn
/// capabilities doesn't run forever with stale caps.
/// </summary>
/// <remarks>
/// Uses <see cref="Session.FetchOperationLimitsAsync(CancellationToken)"/> when the
/// active session is a concrete <see cref="Session"/> (always true in production —
/// the SDK's session factory returns Session). Falls back gracefully on any fetch
/// failure: callers see <see cref="_operationLimits"/> remain null and fall through
/// to single-call behaviour. Per OPC UA Part 5, a server reporting 0 for any
/// OperationLimits attribute means "no limit"; we normalize that to <c>null</c> so
/// the chunking helper has a single sentinel.
/// </remarks>
private async Task EnsureOperationLimitsFetchedAsync(CancellationToken ct)
{
if (_operationLimits is not null) return;
await _operationLimitsLock.WaitAsync(ct).ConfigureAwait(false);
try
{
if (_operationLimits is not null) return;
if (Session is not Session concrete) return;
try
{
await concrete.FetchOperationLimitsAsync(ct).ConfigureAwait(false);
var ol = concrete.OperationLimits;
if (ol is null) return;
_operationLimits = new OperationLimitsCache(
MaxNodesPerRead: NormalizeLimit(ol.MaxNodesPerRead),
MaxNodesPerWrite: NormalizeLimit(ol.MaxNodesPerWrite),
MaxNodesPerBrowse: NormalizeLimit(ol.MaxNodesPerBrowse),
MaxNodesPerHistoryReadData: NormalizeLimit(ol.MaxNodesPerHistoryReadData));
}
catch
{
// Fetch failed — leave cache null so we re-attempt on the next batch op.
// Single-call behaviour applies in the meantime; never block traffic on a
// capability discovery glitch.
}
}
finally { _operationLimitsLock.Release(); }
}
/// <summary>Spec sentinel: 0 = "no limit". Normalize to null for the chunking helper.</summary>
private static uint? NormalizeLimit(uint raw) => raw == 0 ? null : raw;
/// <summary>
/// Split <paramref name="source"/> into contiguous slices of at most <paramref name="cap"/>
/// items. Returns the input as a single slice when the cap is null (no limit),
/// 0, or larger than the input — the spec sentinel + the no-cap path collapse onto
/// the same single-call branch so the wire path stays a single SDK invocation when
/// the server doesn't impose a limit.
/// </summary>
internal static IEnumerable<ArraySegment<T>> ChunkBy<T>(IReadOnlyList<T> source, uint? cap)
{
if (source.Count == 0) yield break;
var array = source as T[] ?? source.ToArray();
if (cap is null or 0 || (uint)array.Length <= cap.Value)
{
yield return new ArraySegment<T>(array, 0, array.Length);
yield break;
}
var size = checked((int)cap.Value);
for (var offset = 0; offset < array.Length; offset += size)
{
var len = Math.Min(size, array.Length - offset);
yield return new ArraySegment<T>(array, offset, len);
}
}
// ---- ITagDiscovery ----
public async Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken cancellationToken)
@@ -1437,6 +1563,10 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
}
Session = newSession;
// Drop cached OperationLimits so the next batch op refetches against the (potentially
// re-redeployed) upstream server. A zero-cost guard against a server whose published
// capabilities changed across the reconnect window.
_operationLimits = null;
_reconnectHandler?.Dispose();
_reconnectHandler = null;
@@ -1472,5 +1602,6 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
try { await ShutdownAsync(CancellationToken.None).ConfigureAwait(false); }
catch { /* disposal is best-effort */ }
_gate.Dispose();
_operationLimitsLock.Dispose();
}
}

View File

@@ -0,0 +1,139 @@
using Shouldly;
using Xunit;
namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests;
/// <summary>
/// Unit tests for the OperationLimits chunking surface (PR #275 / opcuaclient-3). Focused
/// on the static <see cref="OpcUaClientDriver.ChunkBy{T}"/> helper + the
/// <see cref="OpcUaClientDriver.OperationLimitsCache"/> sentinel semantics. Live
/// end-to-end tests against an in-process server land in the integration suite.
/// </summary>
[Trait("Category", "Unit")]
public sealed class OpcUaClientOperationLimitsTests
{
[Fact]
public void ChunkBy_with_cap_5_splits_12_items_into_3_slices_of_5_5_2()
{
// The PR-3 acceptance scenario: server advertises MaxNodesPerRead=5, client batches a
// 12-tag read; driver must issue exactly 3 wire calls of sizes 5/5/2 in order.
var input = Enumerable.Range(0, 12).ToArray();
var slices = OpcUaClientDriver.ChunkBy<int>(input, cap: 5).ToArray();
slices.Length.ShouldBe(3);
slices[0].Count.ShouldBe(5);
slices[1].Count.ShouldBe(5);
slices[2].Count.ShouldBe(2);
// Order + offsets must reflect the original sequence — chunking must not reorder
// tags, otherwise the indexMap ↔ result-index alignment breaks.
slices[0].ShouldBe(new[] { 0, 1, 2, 3, 4 });
slices[1].ShouldBe(new[] { 5, 6, 7, 8, 9 });
slices[2].ShouldBe(new[] { 10, 11 });
}
[Fact]
public void ChunkBy_with_null_cap_yields_single_slice_no_chunking()
{
// cap=null is the "fetch hasn't completed" / "server reports 0 = no limit" sentinel.
// Both must collapse to a single SDK call so the wire path doesn't change when the
// server doesn't impose a cap.
var input = Enumerable.Range(0, 12).ToArray();
var slices = OpcUaClientDriver.ChunkBy<int>(input, cap: null).ToArray();
slices.Length.ShouldBe(1, "null cap means no chunking — single SDK call");
slices[0].Count.ShouldBe(12);
}
[Fact]
public void ChunkBy_with_zero_cap_yields_single_slice_no_chunking()
{
// OPC UA Part 5: 0 is the wire-level "no limit" sentinel. NormalizeLimit folds it
// into null upstream of ChunkBy, but the chunker itself must also treat 0 as
// no-chunking — defence in depth in case a caller bypasses NormalizeLimit.
var input = Enumerable.Range(0, 7).ToArray();
var slices = OpcUaClientDriver.ChunkBy<int>(input, cap: 0).ToArray();
slices.Length.ShouldBe(1);
slices[0].Count.ShouldBe(7);
}
[Fact]
public void ChunkBy_with_cap_larger_than_input_yields_single_slice()
{
var input = new[] { 1, 2, 3 };
var slices = OpcUaClientDriver.ChunkBy<int>(input, cap: 100).ToArray();
slices.Length.ShouldBe(1);
slices[0].Count.ShouldBe(3);
}
[Fact]
public void ChunkBy_with_empty_input_yields_no_slices()
{
// Empty batch must short-circuit before the wire call — saves a round-trip and
// matches the !toSend.Count == 0 guard in the driver.
var input = Array.Empty<int>();
var slices = OpcUaClientDriver.ChunkBy<int>(input, cap: 5).ToArray();
slices.Length.ShouldBe(0);
}
[Fact]
public void ChunkBy_with_cap_equal_to_input_size_yields_single_slice()
{
// Edge case: exactly N items at cap N. Must NOT produce an extra empty slice.
var input = Enumerable.Range(0, 5).ToArray();
var slices = OpcUaClientDriver.ChunkBy<int>(input, cap: 5).ToArray();
slices.Length.ShouldBe(1);
slices[0].Count.ShouldBe(5);
}
[Fact]
public void ChunkBy_with_cap_1_splits_each_item_into_its_own_slice()
{
// Pathological cap — degrades to N wire calls. Verifies the chunker handles the
// boundary cleanly without off-by-one.
var input = new[] { 10, 20, 30 };
var slices = OpcUaClientDriver.ChunkBy<int>(input, cap: 1).ToArray();
slices.Length.ShouldBe(3);
slices[0].ShouldBe(new[] { 10 });
slices[1].ShouldBe(new[] { 20 });
slices[2].ShouldBe(new[] { 30 });
}
[Fact]
public void OperationLimitsCache_records_all_four_caps_as_nullable_uint()
{
// The cache surfaces the four limits the driver chunks against. Storing as uint?
// lets the chunker distinguish "not yet fetched" / "no limit" (null) from "limit=N".
var cache = new OpcUaClientDriver.OperationLimitsCache(
MaxNodesPerRead: 100u,
MaxNodesPerWrite: 50u,
MaxNodesPerBrowse: null,
MaxNodesPerHistoryReadData: 10u);
cache.MaxNodesPerRead.ShouldBe(100u);
cache.MaxNodesPerWrite.ShouldBe(50u);
cache.MaxNodesPerBrowse.ShouldBeNull();
cache.MaxNodesPerHistoryReadData.ShouldBe(10u);
}
[Fact]
public void Driver_starts_with_no_cached_OperationLimits()
{
// Pre-init / pre-first-batch state: cache is null so callers fall through to
// single-call behaviour. Lazy fetch happens on the first ReadAsync/WriteAsync.
using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-cache-init");
drv.OperationLimitsForTest.ShouldBeNull();
}
}