Merge pull request '[opcuaclient] OpcUaClient — Honor server OperationLimits' (#333) from auto/opcuaclient/3 into auto/driver-gaps
This commit was merged in pull request #333.
This commit is contained in:
@@ -75,6 +75,31 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
|
||||
/// </summary>
|
||||
private SessionReconnectHandler? _reconnectHandler;

/// <summary>
/// Cached server-advertised OperationLimits, fetched lazily on first batch op and
/// refreshed on reconnect. Null until the first successful fetch; null components
/// mean "fetch hasn't completed yet, fall through to single-call". Per spec, a 0
/// limit means "no limit" — we surface that as <c>uint?</c>=null too so the
/// chunking helper has a single sentinel for "don't chunk".
/// </summary>
private OperationLimitsCache? _operationLimits;

/// <summary>
/// Serializes the lazy OperationLimits fetch so concurrent batch ops trigger at
/// most one wire read of Server.ServerCapabilities.OperationLimits.
/// </summary>
private readonly SemaphoreSlim _operationLimitsLock = new(1, 1);
|
||||
|
||||
/// <summary>
/// Immutable snapshot of the four server OperationLimits this driver chunks its
/// batch calls against. Each component is <c>uint?</c> so consumers can tell
/// "not yet fetched" / "no limit" (<c>null</c>) apart from "limit = N". The spec's
/// wire sentinel 0 is normalized to <c>null</c> at fetch time, giving the chunking
/// helper a single "don't chunk" value to test.
/// </summary>
internal sealed record OperationLimitsCache(
    uint? MaxNodesPerRead,
    uint? MaxNodesPerWrite,
    uint? MaxNodesPerBrowse,
    uint? MaxNodesPerHistoryReadData);
|
||||
|
||||
/// <summary>Test seam — lets unit tests observe the cached limits without reflection.</summary>
internal OperationLimitsCache? OperationLimitsForTest
{
    get { return _operationLimits; }
}
|
||||
|
||||
/// <summary>Stable identifier for this driver instance (supplied by the host at construction).</summary>
public string DriverInstanceId
{
    get { return driverInstanceId; }
}

/// <summary>Constant driver-type discriminator used by the host registry.</summary>
public string DriverType
{
    get { return "OpcUaClient"; }
}
|
||||
|
||||
@@ -427,6 +452,7 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
|
||||
try { Session?.Dispose(); } catch { }
|
||||
Session = null;
|
||||
_connectedEndpointUrl = null;
|
||||
_operationLimits = null;
|
||||
|
||||
TransitionTo(HostState.Unknown);
|
||||
_health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null);
|
||||
@@ -442,6 +468,7 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
|
||||
IReadOnlyList<string> fullReferences, CancellationToken cancellationToken)
|
||||
{
|
||||
var session = RequireSession();
|
||||
await EnsureOperationLimitsFetchedAsync(cancellationToken).ConfigureAwait(false);
|
||||
var results = new DataValueSnapshot[fullReferences.Count];
|
||||
var now = DateTime.UtcNow;
|
||||
|
||||
@@ -463,31 +490,44 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
|
||||
|
||||
if (toSend.Count == 0) return results;
|
||||
|
||||
// Honor server's MaxNodesPerRead — chunk large batches so a single ReadAsync stays
|
||||
// under the cap. cap=null means "no limit" (sentinel for both 0-from-server and
|
||||
// not-yet-fetched), in which case ChunkBy yields the input as a single slice and
|
||||
// the wire path collapses to one SDK call.
|
||||
var readCap = _operationLimits?.MaxNodesPerRead;
|
||||
var indexMapList = indexMap; // close over for catch
|
||||
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
try
|
||||
{
|
||||
var resp = await session.ReadAsync(
|
||||
requestHeader: null,
|
||||
maxAge: 0,
|
||||
timestampsToReturn: TimestampsToReturn.Both,
|
||||
nodesToRead: toSend,
|
||||
ct: cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var values = resp.Results;
|
||||
for (var w = 0; w < values.Count; w++)
|
||||
var wireOffset = 0;
|
||||
foreach (var chunk in ChunkBy(toSend, readCap))
|
||||
{
|
||||
var r = indexMap[w];
|
||||
var dv = values[w];
|
||||
// Preserve the upstream StatusCode verbatim — including Bad codes per
|
||||
// §8's cascading-quality rule. Also preserve SourceTimestamp so downstream
|
||||
// clients can detect stale upstream data.
|
||||
results[r] = new DataValueSnapshot(
|
||||
Value: dv.Value,
|
||||
StatusCode: dv.StatusCode.Code,
|
||||
SourceTimestampUtc: dv.SourceTimestamp == DateTime.MinValue ? null : dv.SourceTimestamp,
|
||||
ServerTimestampUtc: dv.ServerTimestamp == DateTime.MinValue ? now : dv.ServerTimestamp);
|
||||
var chunkColl = new ReadValueIdCollection(chunk.Count);
|
||||
for (var i = 0; i < chunk.Count; i++) chunkColl.Add(chunk.Array![chunk.Offset + i]);
|
||||
var resp = await session.ReadAsync(
|
||||
requestHeader: null,
|
||||
maxAge: 0,
|
||||
timestampsToReturn: TimestampsToReturn.Both,
|
||||
nodesToRead: chunkColl,
|
||||
ct: cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var values = resp.Results;
|
||||
for (var w = 0; w < values.Count; w++)
|
||||
{
|
||||
var r = indexMapList[wireOffset + w];
|
||||
var dv = values[w];
|
||||
// Preserve the upstream StatusCode verbatim — including Bad codes per
|
||||
// §8's cascading-quality rule. Also preserve SourceTimestamp so downstream
|
||||
// clients can detect stale upstream data.
|
||||
results[r] = new DataValueSnapshot(
|
||||
Value: dv.Value,
|
||||
StatusCode: dv.StatusCode.Code,
|
||||
SourceTimestampUtc: dv.SourceTimestamp == DateTime.MinValue ? null : dv.SourceTimestamp,
|
||||
ServerTimestampUtc: dv.ServerTimestamp == DateTime.MinValue ? now : dv.ServerTimestamp);
|
||||
}
|
||||
wireOffset += chunk.Count;
|
||||
}
|
||||
_health = new DriverHealth(DriverState.Healthy, now, null);
|
||||
}
|
||||
@@ -497,9 +537,9 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
|
||||
// tag in this batch. Per-tag StatusCode stays BadCommunicationError (not
|
||||
// BadInternalError) so operators distinguish "upstream unreachable" from
|
||||
// "driver bug".
|
||||
for (var w = 0; w < indexMap.Count; w++)
|
||||
for (var w = 0; w < indexMapList.Count; w++)
|
||||
{
|
||||
var r = indexMap[w];
|
||||
var r = indexMapList[w];
|
||||
results[r] = new DataValueSnapshot(null, StatusBadCommunicationError, null, now);
|
||||
}
|
||||
_health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, ex.Message);
|
||||
@@ -515,6 +555,7 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
|
||||
IReadOnlyList<Core.Abstractions.WriteRequest> writes, CancellationToken cancellationToken)
|
||||
{
|
||||
var session = RequireSession();
|
||||
await EnsureOperationLimitsFetchedAsync(cancellationToken).ConfigureAwait(false);
|
||||
var results = new WriteResult[writes.Count];
|
||||
|
||||
var toSend = new WriteValueCollection();
|
||||
@@ -537,24 +578,34 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
|
||||
|
||||
if (toSend.Count == 0) return results;
|
||||
|
||||
// Honor server's MaxNodesPerWrite — same chunking pattern as ReadAsync. cap=null
|
||||
// collapses to a single wire call.
|
||||
var writeCap = _operationLimits?.MaxNodesPerWrite;
|
||||
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||
try
|
||||
{
|
||||
try
|
||||
{
|
||||
var resp = await session.WriteAsync(
|
||||
requestHeader: null,
|
||||
nodesToWrite: toSend,
|
||||
ct: cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var codes = resp.Results;
|
||||
for (var w = 0; w < codes.Count; w++)
|
||||
var wireOffset = 0;
|
||||
foreach (var chunk in ChunkBy(toSend, writeCap))
|
||||
{
|
||||
var r = indexMap[w];
|
||||
// Pass upstream WriteResult StatusCode through verbatim. Success codes
|
||||
// include Good (0) and any warning-level Good* variants; anything with
|
||||
// the severity bits set is a Bad.
|
||||
results[r] = new WriteResult(codes[w].Code);
|
||||
var chunkColl = new WriteValueCollection(chunk.Count);
|
||||
for (var i = 0; i < chunk.Count; i++) chunkColl.Add(chunk.Array![chunk.Offset + i]);
|
||||
var resp = await session.WriteAsync(
|
||||
requestHeader: null,
|
||||
nodesToWrite: chunkColl,
|
||||
ct: cancellationToken).ConfigureAwait(false);
|
||||
|
||||
var codes = resp.Results;
|
||||
for (var w = 0; w < codes.Count; w++)
|
||||
{
|
||||
var r = indexMap[wireOffset + w];
|
||||
// Pass upstream WriteResult StatusCode through verbatim. Success codes
|
||||
// include Good (0) and any warning-level Good* variants; anything with
|
||||
// the severity bits set is a Bad.
|
||||
results[r] = new WriteResult(codes[w].Code);
|
||||
}
|
||||
wireOffset += chunk.Count;
|
||||
}
|
||||
}
|
||||
catch (Exception)
|
||||
@@ -591,6 +642,81 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
|
||||
/// <summary>
/// Returns the active session, or throws when the driver has not been initialized
/// (no session established yet).
/// </summary>
private ISession RequireSession()
{
    var session = Session;
    if (session is null)
    {
        throw new InvalidOperationException("OpcUaClientDriver not initialized");
    }
    return session;
}
|
||||
|
||||
/// <summary>
/// Lazily fetch <c>Server.ServerCapabilities.OperationLimits</c> from the upstream
/// server and cache the snapshot on the driver. Idempotent: invoked from every batch
/// op, it no-ops once a successful fetch has populated the cache. The cache is
/// dropped on reconnect so a server whose published capabilities changed across the
/// reconnect window is re-read rather than served stale caps.
/// </summary>
/// <remarks>
/// The fetch is only attempted when the active session is a concrete
/// <see cref="Session"/> (always the case in production — the SDK's session factory
/// returns Session); any other <c>ISession</c> keeps single-call behaviour. Any
/// fetch failure is swallowed deliberately: the cache stays null, callers fall
/// through to unchunked calls, and the next batch op retries. Per OPC UA Part 5 a
/// reported limit of 0 means "no limit"; <see cref="NormalizeLimit"/> folds that
/// into <c>null</c> so the chunking helper has a single sentinel.
/// </remarks>
private async Task EnsureOperationLimitsFetchedAsync(CancellationToken ct)
{
    // Fast path: a previous call already populated the cache.
    if (_operationLimits is not null) return;

    await _operationLimitsLock.WaitAsync(ct).ConfigureAwait(false);
    try
    {
        // Double-checked: another caller may have completed the fetch while we waited.
        if (_operationLimits is not null) return;

        // FetchOperationLimitsAsync lives on the concrete SDK Session type; a test
        // double implementing only ISession simply keeps single-call behaviour.
        if (Session is not Session sdkSession) return;

        try
        {
            await sdkSession.FetchOperationLimitsAsync(ct).ConfigureAwait(false);
            var limits = sdkSession.OperationLimits;
            if (limits is null) return;

            _operationLimits = new OperationLimitsCache(
                MaxNodesPerRead: NormalizeLimit(limits.MaxNodesPerRead),
                MaxNodesPerWrite: NormalizeLimit(limits.MaxNodesPerWrite),
                MaxNodesPerBrowse: NormalizeLimit(limits.MaxNodesPerBrowse),
                MaxNodesPerHistoryReadData: NormalizeLimit(limits.MaxNodesPerHistoryReadData));
        }
        catch
        {
            // Fetch failed — leave the cache null so the next batch op re-attempts.
            // Single-call behaviour applies meanwhile; a capability-discovery glitch
            // must never block traffic.
        }
    }
    finally
    {
        _operationLimitsLock.Release();
    }
}
|
||||
|
||||
/// <summary>
/// OPC UA Part 5 sentinel handling: a raw limit of 0 means "no limit", which we
/// normalize to <c>null</c> so the chunking helper only checks one sentinel.
/// </summary>
private static uint? NormalizeLimit(uint raw)
{
    if (raw == 0)
    {
        return null;
    }
    return raw;
}
|
||||
|
||||
/// <summary>
/// Partition <paramref name="source"/> into in-order contiguous slices holding at
/// most <paramref name="cap"/> items each. A null or zero cap (the "no limit"
/// sentinels), or a cap that already covers the whole input, yields the input as a
/// single slice — so the wire path collapses to one SDK call whenever the server
/// imposes no usable limit. An empty input yields no slices at all.
/// </summary>
internal static IEnumerable<ArraySegment<T>> ChunkBy<T>(IReadOnlyList<T> source, uint? cap)
{
    if (source.Count == 0) yield break;

    // Reuse the backing array when the caller already handed us one.
    var items = source as T[] ?? source.ToArray();

    // Effective step: the whole input when there is no usable cap, otherwise the cap.
    // The checked cast is safe here — chunking only happens when cap < items.Length,
    // and an array length always fits in int.
    var unlimited = cap is null or 0 || cap.Value >= (uint)items.Length;
    var step = unlimited ? items.Length : checked((int)cap.Value);

    for (var start = 0; start < items.Length; start += step)
    {
        yield return new ArraySegment<T>(items, start, Math.Min(step, items.Length - start));
    }
}
|
||||
|
||||
// ---- ITagDiscovery ----
|
||||
|
||||
public async Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken cancellationToken)
|
||||
@@ -1437,6 +1563,10 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
|
||||
}
|
||||
|
||||
Session = newSession;
|
||||
// Drop cached OperationLimits so the next batch op refetches against the (potentially
|
||||
// re-redeployed) upstream server. A zero-cost guard against a server whose published
|
||||
// capabilities changed across the reconnect window.
|
||||
_operationLimits = null;
|
||||
_reconnectHandler?.Dispose();
|
||||
_reconnectHandler = null;
|
||||
|
||||
@@ -1472,5 +1602,6 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
|
||||
try { await ShutdownAsync(CancellationToken.None).ConfigureAwait(false); }
|
||||
catch { /* disposal is best-effort */ }
|
||||
_gate.Dispose();
|
||||
_operationLimitsLock.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,139 @@
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests;
|
||||
|
||||
/// <summary>
/// Unit coverage for the OperationLimits chunking surface: the static
/// <see cref="OpcUaClientDriver.ChunkBy{T}"/> helper plus the sentinel semantics of
/// <see cref="OpcUaClientDriver.OperationLimitsCache"/>. Live end-to-end behaviour
/// against an in-process server is exercised by the integration suite instead.
/// </summary>
[Trait("Category", "Unit")]
public sealed class OpcUaClientOperationLimitsTests
{
    [Fact]
    public void ChunkBy_with_cap_5_splits_12_items_into_3_slices_of_5_5_2()
    {
        // Acceptance scenario: server advertises MaxNodesPerRead=5 and the client
        // batches a 12-tag read — the driver must issue exactly three in-order
        // wire calls sized 5/5/2.
        var source = Enumerable.Range(0, 12).ToArray();

        var chunks = OpcUaClientDriver.ChunkBy<int>(source, cap: 5).ToArray();

        chunks.Length.ShouldBe(3);
        chunks[0].Count.ShouldBe(5);
        chunks[1].Count.ShouldBe(5);
        chunks[2].Count.ShouldBe(2);
        // Chunking must preserve the original sequence — any reordering would
        // break the indexMap ↔ result-index alignment in the driver.
        chunks[0].ShouldBe(new[] { 0, 1, 2, 3, 4 });
        chunks[1].ShouldBe(new[] { 5, 6, 7, 8, 9 });
        chunks[2].ShouldBe(new[] { 10, 11 });
    }

    [Fact]
    public void ChunkBy_with_null_cap_yields_single_slice_no_chunking()
    {
        // cap=null covers both "fetch hasn't completed" and "server reports 0 = no
        // limit"; either way the wire path must collapse to a single SDK call.
        var source = Enumerable.Range(0, 12).ToArray();

        var chunks = OpcUaClientDriver.ChunkBy<int>(source, cap: null).ToArray();

        chunks.Length.ShouldBe(1, "null cap means no chunking — single SDK call");
        chunks[0].Count.ShouldBe(12);
    }

    [Fact]
    public void ChunkBy_with_zero_cap_yields_single_slice_no_chunking()
    {
        // OPC UA Part 5 uses 0 as the wire-level "no limit" sentinel. NormalizeLimit
        // already folds 0 into null upstream, but the chunker must treat a raw 0 the
        // same way — defence in depth for callers that bypass NormalizeLimit.
        var source = Enumerable.Range(0, 7).ToArray();

        var chunks = OpcUaClientDriver.ChunkBy<int>(source, cap: 0).ToArray();

        chunks.Length.ShouldBe(1);
        chunks[0].Count.ShouldBe(7);
    }

    [Fact]
    public void ChunkBy_with_cap_larger_than_input_yields_single_slice()
    {
        var source = new[] { 1, 2, 3 };

        var chunks = OpcUaClientDriver.ChunkBy<int>(source, cap: 100).ToArray();

        chunks.Length.ShouldBe(1);
        chunks[0].Count.ShouldBe(3);
    }

    [Fact]
    public void ChunkBy_with_empty_input_yields_no_slices()
    {
        // An empty batch must short-circuit before any wire call — mirrors the
        // driver's early return when there is nothing to send.
        var source = Array.Empty<int>();

        var chunks = OpcUaClientDriver.ChunkBy<int>(source, cap: 5).ToArray();

        chunks.Length.ShouldBe(0);
    }

    [Fact]
    public void ChunkBy_with_cap_equal_to_input_size_yields_single_slice()
    {
        // Boundary: exactly N items at cap N must NOT produce a trailing empty slice.
        var source = Enumerable.Range(0, 5).ToArray();

        var chunks = OpcUaClientDriver.ChunkBy<int>(source, cap: 5).ToArray();

        chunks.Length.ShouldBe(1);
        chunks[0].Count.ShouldBe(5);
    }

    [Fact]
    public void ChunkBy_with_cap_1_splits_each_item_into_its_own_slice()
    {
        // Pathological cap — degrades to one wire call per item. Verifies the
        // chunker handles the lower boundary without off-by-one errors.
        var source = new[] { 10, 20, 30 };

        var chunks = OpcUaClientDriver.ChunkBy<int>(source, cap: 1).ToArray();

        chunks.Length.ShouldBe(3);
        chunks[0].ShouldBe(new[] { 10 });
        chunks[1].ShouldBe(new[] { 20 });
        chunks[2].ShouldBe(new[] { 30 });
    }

    [Fact]
    public void OperationLimitsCache_records_all_four_caps_as_nullable_uint()
    {
        // uint? storage lets the chunker tell "not yet fetched" / "no limit" (null)
        // apart from "limit = N" for each of the four caps the driver honors.
        var snapshot = new OpcUaClientDriver.OperationLimitsCache(
            MaxNodesPerRead: 100u,
            MaxNodesPerWrite: 50u,
            MaxNodesPerBrowse: null,
            MaxNodesPerHistoryReadData: 10u);

        snapshot.MaxNodesPerRead.ShouldBe(100u);
        snapshot.MaxNodesPerWrite.ShouldBe(50u);
        snapshot.MaxNodesPerBrowse.ShouldBeNull();
        snapshot.MaxNodesPerHistoryReadData.ShouldBe(10u);
    }

    [Fact]
    public void Driver_starts_with_no_cached_OperationLimits()
    {
        // Pre-init / pre-first-batch state: the cache is null, so callers fall
        // through to single-call behaviour until the first ReadAsync/WriteAsync
        // triggers the lazy fetch.
        using var driver = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-cache-init");

        driver.OperationLimitsForTest.ShouldBeNull();
    }
}
|
||||
Reference in New Issue
Block a user