From 494fdf2358910cae51af7139418b90f977333db8 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sat, 25 Apr 2026 15:38:55 -0400 Subject: [PATCH] =?UTF-8?q?Auto:=20opcuaclient-3=20=E2=80=94=20honor=20ser?= =?UTF-8?q?ver=20OperationLimits?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #275 --- .../OpcUaClientDriver.cs | 197 +++++++++++++++--- .../OpcUaClientOperationLimitsTests.cs | 139 ++++++++++++ 2 files changed, 303 insertions(+), 33 deletions(-) create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientOperationLimitsTests.cs diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs index fa977c3..cc89175 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs @@ -75,6 +75,31 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d /// private SessionReconnectHandler? _reconnectHandler; + /// + /// Cached server-advertised OperationLimits, fetched lazily on first batch op and + /// refreshed on reconnect. Null until the first successful fetch; null components + /// mean "fetch hasn't completed yet, fall through to single-call". Per spec, a 0 + /// limit means "no limit" — we surface that as uint?=null too so the + /// chunking helper has a single sentinel for "don't chunk". + /// + private OperationLimitsCache? _operationLimits; + private readonly SemaphoreSlim _operationLimitsLock = new(1, 1); + + /// + /// Snapshot of the four OperationLimits the driver chunks against. Stored as + /// uint? so callers can distinguish "not yet fetched" / "no limit" + /// (null) from "limit = N" (Some(N)). Spec sentinel 0 is normalized to null at + /// fetch time so the chunking helper has a single "don't chunk" sentinel. + /// + internal sealed record OperationLimitsCache( + uint? MaxNodesPerRead, + uint? MaxNodesPerWrite, + uint? MaxNodesPerBrowse, + uint? MaxNodesPerHistoryReadData); + + /// Test seam — exposes the cached limits so unit tests can assert fetch behaviour. + internal OperationLimitsCache? OperationLimitsForTest => _operationLimits; + public string DriverInstanceId => driverInstanceId; public string DriverType => "OpcUaClient"; @@ -427,6 +452,7 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d try { Session?.Dispose(); } catch { } Session = null; _connectedEndpointUrl = null; + _operationLimits = null; TransitionTo(HostState.Unknown); _health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null); @@ -442,6 +468,7 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d IReadOnlyList fullReferences, CancellationToken cancellationToken) { var session = RequireSession(); + await EnsureOperationLimitsFetchedAsync(cancellationToken).ConfigureAwait(false); var results = new DataValueSnapshot[fullReferences.Count]; var now = DateTime.UtcNow; @@ -463,31 +490,44 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d if (toSend.Count == 0) return results; + // Honor server's MaxNodesPerRead — chunk large batches so a single ReadAsync stays + // under the cap. cap=null means "no limit" (sentinel for both 0-from-server and + // not-yet-fetched), in which case ChunkBy yields the input as a single slice and + // the wire path collapses to one SDK call. + var readCap = _operationLimits?.MaxNodesPerRead; + var indexMapList = indexMap; // close over for catch await _gate.WaitAsync(cancellationToken).ConfigureAwait(false); try { try { - var resp = await session.ReadAsync( - requestHeader: null, - maxAge: 0, - timestampsToReturn: TimestampsToReturn.Both, - nodesToRead: toSend, - ct: cancellationToken).ConfigureAwait(false); - - var values = resp.Results; - for (var w = 0; w < values.Count; w++) + var wireOffset = 0; + foreach (var chunk in ChunkBy(toSend, readCap)) { - var r = indexMap[w]; - var dv = values[w]; - // Preserve the upstream StatusCode verbatim — including Bad codes per - // §8's cascading-quality rule. Also preserve SourceTimestamp so downstream - // clients can detect stale upstream data. - results[r] = new DataValueSnapshot( - Value: dv.Value, - StatusCode: dv.StatusCode.Code, - SourceTimestampUtc: dv.SourceTimestamp == DateTime.MinValue ? null : dv.SourceTimestamp, - ServerTimestampUtc: dv.ServerTimestamp == DateTime.MinValue ? now : dv.ServerTimestamp); + var chunkColl = new ReadValueIdCollection(chunk.Count); + for (var i = 0; i < chunk.Count; i++) chunkColl.Add(chunk.Array![chunk.Offset + i]); + var resp = await session.ReadAsync( + requestHeader: null, + maxAge: 0, + timestampsToReturn: TimestampsToReturn.Both, + nodesToRead: chunkColl, + ct: cancellationToken).ConfigureAwait(false); + + var values = resp.Results; + for (var w = 0; w < values.Count; w++) + { + var r = indexMapList[wireOffset + w]; + var dv = values[w]; + // Preserve the upstream StatusCode verbatim — including Bad codes per + // §8's cascading-quality rule. Also preserve SourceTimestamp so downstream + // clients can detect stale upstream data. + results[r] = new DataValueSnapshot( + Value: dv.Value, + StatusCode: dv.StatusCode.Code, + SourceTimestampUtc: dv.SourceTimestamp == DateTime.MinValue ? null : dv.SourceTimestamp, + ServerTimestampUtc: dv.ServerTimestamp == DateTime.MinValue ? now : dv.ServerTimestamp); + } + wireOffset += chunk.Count; } _health = new DriverHealth(DriverState.Healthy, now, null); } @@ -497,9 +537,9 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d // tag in this batch. Per-tag StatusCode stays BadCommunicationError (not // BadInternalError) so operators distinguish "upstream unreachable" from // "driver bug". - for (var w = 0; w < indexMap.Count; w++) + for (var w = 0; w < indexMapList.Count; w++) { - var r = indexMap[w]; + var r = indexMapList[w]; results[r] = new DataValueSnapshot(null, StatusBadCommunicationError, null, now); } _health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, ex.Message); @@ -515,6 +555,7 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d IReadOnlyList writes, CancellationToken cancellationToken) { var session = RequireSession(); + await EnsureOperationLimitsFetchedAsync(cancellationToken).ConfigureAwait(false); var results = new WriteResult[writes.Count]; var toSend = new WriteValueCollection(); @@ -537,24 +578,34 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d if (toSend.Count == 0) return results; + // Honor server's MaxNodesPerWrite — same chunking pattern as ReadAsync. cap=null + // collapses to a single wire call. + var writeCap = _operationLimits?.MaxNodesPerWrite; await _gate.WaitAsync(cancellationToken).ConfigureAwait(false); try { try { - var resp = await session.WriteAsync( - requestHeader: null, - nodesToWrite: toSend, - ct: cancellationToken).ConfigureAwait(false); - - var codes = resp.Results; - for (var w = 0; w < codes.Count; w++) + var wireOffset = 0; + foreach (var chunk in ChunkBy(toSend, writeCap)) { - var r = indexMap[w]; - // Pass upstream WriteResult StatusCode through verbatim. Success codes - // include Good (0) and any warning-level Good* variants; anything with - // the severity bits set is a Bad. - results[r] = new WriteResult(codes[w].Code); + var chunkColl = new WriteValueCollection(chunk.Count); + for (var i = 0; i < chunk.Count; i++) chunkColl.Add(chunk.Array![chunk.Offset + i]); + var resp = await session.WriteAsync( + requestHeader: null, + nodesToWrite: chunkColl, + ct: cancellationToken).ConfigureAwait(false); + + var codes = resp.Results; + for (var w = 0; w < codes.Count; w++) + { + var r = indexMap[wireOffset + w]; + // Pass upstream WriteResult StatusCode through verbatim. Success codes + // include Good (0) and any warning-level Good* variants; anything with + // the severity bits set is a Bad. + results[r] = new WriteResult(codes[w].Code); + } + wireOffset += chunk.Count; } } catch (Exception) @@ -591,6 +642,81 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d private ISession RequireSession() => Session ?? throw new InvalidOperationException("OpcUaClientDriver not initialized"); + /// + /// Lazily fetch Server.ServerCapabilities.OperationLimits from the upstream + /// server and cache them on the driver. Idempotent — called from every batch op, + /// no-ops once a successful fetch has populated the cache. The cache is cleared on + /// reconnect (see ) so a server with redrawn + /// capabilities doesn't run forever with stale caps. + /// + /// + /// Uses when the + /// active session is a concrete (always true in production — + /// the SDK's session factory returns Session). Falls back gracefully on any fetch + /// failure: callers see remain null and fall through + /// to single-call behaviour. Per OPC UA Part 5, a server reporting 0 for any + /// OperationLimits attribute means "no limit"; we normalize that to null so + /// the chunking helper has a single sentinel. + /// + private async Task EnsureOperationLimitsFetchedAsync(CancellationToken ct) + { + if (_operationLimits is not null) return; + + await _operationLimitsLock.WaitAsync(ct).ConfigureAwait(false); + try + { + if (_operationLimits is not null) return; + if (Session is not Session concrete) return; + + try + { + await concrete.FetchOperationLimitsAsync(ct).ConfigureAwait(false); + var ol = concrete.OperationLimits; + if (ol is null) return; + + _operationLimits = new OperationLimitsCache( + MaxNodesPerRead: NormalizeLimit(ol.MaxNodesPerRead), + MaxNodesPerWrite: NormalizeLimit(ol.MaxNodesPerWrite), + MaxNodesPerBrowse: NormalizeLimit(ol.MaxNodesPerBrowse), + MaxNodesPerHistoryReadData: NormalizeLimit(ol.MaxNodesPerHistoryReadData)); + } + catch + { + // Fetch failed — leave cache null so we re-attempt on the next batch op. + // Single-call behaviour applies in the meantime; never block traffic on a + // capability discovery glitch. + } + } + finally { _operationLimitsLock.Release(); } + } + + /// Spec sentinel: 0 = "no limit". Normalize to null for the chunking helper. + private static uint? NormalizeLimit(uint raw) => raw == 0 ? null : raw; + + /// + /// Split into contiguous slices of at most + /// items. Returns the input as a single slice when the cap is null (no limit), + /// 0, or larger than the input — the spec sentinel + the no-cap path collapse onto + /// the same single-call branch so the wire path stays a single SDK invocation when + /// the server doesn't impose a limit. + /// + internal static IEnumerable> ChunkBy(IReadOnlyList source, uint? cap) + { + if (source.Count == 0) yield break; + var array = source as T[] ?? source.ToArray(); + if (cap is null or 0 || (uint)array.Length <= cap.Value) + { + yield return new ArraySegment(array, 0, array.Length); + yield break; + } + var size = checked((int)cap.Value); + for (var offset = 0; offset < array.Length; offset += size) + { + var len = Math.Min(size, array.Length - offset); + yield return new ArraySegment(array, offset, len); + } + } + // ---- ITagDiscovery ---- public async Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken cancellationToken) @@ -1437,6 +1563,10 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d } Session = newSession; + // Drop cached OperationLimits so the next batch op refetches against the (potentially + // re-redeployed) upstream server. A zero-cost guard against a server whose published + // capabilities changed across the reconnect window. + _operationLimits = null; _reconnectHandler?.Dispose(); _reconnectHandler = null; @@ -1472,5 +1602,6 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d try { await ShutdownAsync(CancellationToken.None).ConfigureAwait(false); } catch { /* disposal is best-effort */ } _gate.Dispose(); + _operationLimitsLock.Dispose(); } } diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientOperationLimitsTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientOperationLimitsTests.cs new file mode 100644 index 0000000..fc0d114 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientOperationLimitsTests.cs @@ -0,0 +1,139 @@ +using Shouldly; +using Xunit; + +namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests; + +/// +/// Unit tests for the OperationLimits chunking surface (PR #275 / opcuaclient-3). Focused +/// on the static helper + the +/// sentinel semantics. Live +/// end-to-end tests against an in-process server land in the integration suite. +/// +[Trait("Category", "Unit")] +public sealed class OpcUaClientOperationLimitsTests +{ + [Fact] + public void ChunkBy_with_cap_5_splits_12_items_into_3_slices_of_5_5_2() + { + // The PR-3 acceptance scenario: server advertises MaxNodesPerRead=5, client batches a + // 12-tag read; driver must issue exactly 3 wire calls of sizes 5/5/2 in order. + var input = Enumerable.Range(0, 12).ToArray(); + + var slices = OpcUaClientDriver.ChunkBy(input, cap: 5).ToArray(); + + slices.Length.ShouldBe(3); + slices[0].Count.ShouldBe(5); + slices[1].Count.ShouldBe(5); + slices[2].Count.ShouldBe(2); + // Order + offsets must reflect the original sequence — chunking must not reorder + // tags, otherwise the indexMap ↔ result-index alignment breaks. + slices[0].ShouldBe(new[] { 0, 1, 2, 3, 4 }); + slices[1].ShouldBe(new[] { 5, 6, 7, 8, 9 }); + slices[2].ShouldBe(new[] { 10, 11 }); + } + + [Fact] + public void ChunkBy_with_null_cap_yields_single_slice_no_chunking() + { + // cap=null is the "fetch hasn't completed" / "server reports 0 = no limit" sentinel. + // Both must collapse to a single SDK call so the wire path doesn't change when the + // server doesn't impose a cap. + var input = Enumerable.Range(0, 12).ToArray(); + + var slices = OpcUaClientDriver.ChunkBy(input, cap: null).ToArray(); + + slices.Length.ShouldBe(1, "null cap means no chunking — single SDK call"); + slices[0].Count.ShouldBe(12); + } + + [Fact] + public void ChunkBy_with_zero_cap_yields_single_slice_no_chunking() + { + // OPC UA Part 5: 0 is the wire-level "no limit" sentinel. NormalizeLimit folds it + // into null upstream of ChunkBy, but the chunker itself must also treat 0 as + // no-chunking — defence in depth in case a caller bypasses NormalizeLimit. + var input = Enumerable.Range(0, 7).ToArray(); + + var slices = OpcUaClientDriver.ChunkBy(input, cap: 0).ToArray(); + + slices.Length.ShouldBe(1); + slices[0].Count.ShouldBe(7); + } + + [Fact] + public void ChunkBy_with_cap_larger_than_input_yields_single_slice() + { + var input = new[] { 1, 2, 3 }; + + var slices = OpcUaClientDriver.ChunkBy(input, cap: 100).ToArray(); + + slices.Length.ShouldBe(1); + slices[0].Count.ShouldBe(3); + } + + [Fact] + public void ChunkBy_with_empty_input_yields_no_slices() + { + // Empty batch must short-circuit before the wire call — saves a round-trip and + // matches the !toSend.Count == 0 guard in the driver. + var input = Array.Empty(); + + var slices = OpcUaClientDriver.ChunkBy(input, cap: 5).ToArray(); + + slices.Length.ShouldBe(0); + } + + [Fact] + public void ChunkBy_with_cap_equal_to_input_size_yields_single_slice() + { + // Edge case: exactly N items at cap N. Must NOT produce an extra empty slice. + var input = Enumerable.Range(0, 5).ToArray(); + + var slices = OpcUaClientDriver.ChunkBy(input, cap: 5).ToArray(); + + slices.Length.ShouldBe(1); + slices[0].Count.ShouldBe(5); + } + + [Fact] + public void ChunkBy_with_cap_1_splits_each_item_into_its_own_slice() + { + // Pathological cap — degrades to N wire calls. Verifies the chunker handles the + // boundary cleanly without off-by-one. + var input = new[] { 10, 20, 30 }; + + var slices = OpcUaClientDriver.ChunkBy(input, cap: 1).ToArray(); + + slices.Length.ShouldBe(3); + slices[0].ShouldBe(new[] { 10 }); + slices[1].ShouldBe(new[] { 20 }); + slices[2].ShouldBe(new[] { 30 }); + } + + [Fact] + public void OperationLimitsCache_records_all_four_caps_as_nullable_uint() + { + // The cache surfaces the four limits the driver chunks against. Storing as uint? + // lets the chunker distinguish "not yet fetched" / "no limit" (null) from "limit=N". + var cache = new OpcUaClientDriver.OperationLimitsCache( + MaxNodesPerRead: 100u, + MaxNodesPerWrite: 50u, + MaxNodesPerBrowse: null, + MaxNodesPerHistoryReadData: 10u); + + cache.MaxNodesPerRead.ShouldBe(100u); + cache.MaxNodesPerWrite.ShouldBe(50u); + cache.MaxNodesPerBrowse.ShouldBeNull(); + cache.MaxNodesPerHistoryReadData.ShouldBe(10u); + } + + [Fact] + public void Driver_starts_with_no_cached_OperationLimits() + { + // Pre-init / pre-first-batch state: cache is null so callers fall through to + // single-call behaviour. Lazy fetch happens on the first ReadAsync/WriteAsync. + using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-cache-init"); + + drv.OperationLimitsForTest.ShouldBeNull(); + } +} -- 2.49.1