diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs
index fa977c3..cc89175 100644
--- a/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs
+++ b/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs
@@ -75,6 +75,31 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
///
private SessionReconnectHandler? _reconnectHandler;
+ ///
+ /// Cached server-advertised OperationLimits, fetched lazily on first batch op and
+ /// refreshed on reconnect. Null until the first successful fetch; null components
+ /// mean "fetch hasn't completed yet, fall through to single-call". Per spec, a 0
+ /// limit means "no limit" — we surface that as uint?=null too so the
+ /// chunking helper has a single sentinel for "don't chunk".
+ ///
+ private OperationLimitsCache? _operationLimits;
+ private readonly SemaphoreSlim _operationLimitsLock = new(1, 1);
+
+ ///
+ /// Snapshot of the four OperationLimits the driver chunks against. Stored as
+ /// uint? so callers can distinguish "not yet fetched" / "no limit"
+ /// (null) from "limit = N" (Some(N)). Spec sentinel 0 is normalized to null at
+ /// fetch time so the chunking helper has a single "don't chunk" sentinel.
+ ///
+ internal sealed record OperationLimitsCache(
+ uint? MaxNodesPerRead,
+ uint? MaxNodesPerWrite,
+ uint? MaxNodesPerBrowse,
+ uint? MaxNodesPerHistoryReadData);
+
+ /// Test seam — exposes the cached limits so unit tests can assert fetch behaviour.
+ internal OperationLimitsCache? OperationLimitsForTest => _operationLimits;
+
public string DriverInstanceId => driverInstanceId;
public string DriverType => "OpcUaClient";
@@ -427,6 +452,7 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
try { Session?.Dispose(); } catch { }
Session = null;
_connectedEndpointUrl = null;
+ _operationLimits = null;
TransitionTo(HostState.Unknown);
_health = new DriverHealth(DriverState.Unknown, _health.LastSuccessfulRead, null);
@@ -442,6 +468,7 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
IReadOnlyList fullReferences, CancellationToken cancellationToken)
{
var session = RequireSession();
+ await EnsureOperationLimitsFetchedAsync(cancellationToken).ConfigureAwait(false);
var results = new DataValueSnapshot[fullReferences.Count];
var now = DateTime.UtcNow;
@@ -463,31 +490,44 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
if (toSend.Count == 0) return results;
+ // Honor server's MaxNodesPerRead — chunk large batches so a single ReadAsync stays
+ // under the cap. cap=null means "no limit" (sentinel for both 0-from-server and
+ // not-yet-fetched), in which case ChunkBy yields the input as a single slice and
+ // the wire path collapses to one SDK call.
+ var readCap = _operationLimits?.MaxNodesPerRead;
+ var indexMapList = indexMap; // close over for catch
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
try
{
- var resp = await session.ReadAsync(
- requestHeader: null,
- maxAge: 0,
- timestampsToReturn: TimestampsToReturn.Both,
- nodesToRead: toSend,
- ct: cancellationToken).ConfigureAwait(false);
-
- var values = resp.Results;
- for (var w = 0; w < values.Count; w++)
+ var wireOffset = 0;
+ foreach (var chunk in ChunkBy(toSend, readCap))
{
- var r = indexMap[w];
- var dv = values[w];
- // Preserve the upstream StatusCode verbatim — including Bad codes per
- // §8's cascading-quality rule. Also preserve SourceTimestamp so downstream
- // clients can detect stale upstream data.
- results[r] = new DataValueSnapshot(
- Value: dv.Value,
- StatusCode: dv.StatusCode.Code,
- SourceTimestampUtc: dv.SourceTimestamp == DateTime.MinValue ? null : dv.SourceTimestamp,
- ServerTimestampUtc: dv.ServerTimestamp == DateTime.MinValue ? now : dv.ServerTimestamp);
+ var chunkColl = new ReadValueIdCollection(chunk.Count);
+ for (var i = 0; i < chunk.Count; i++) chunkColl.Add(chunk.Array![chunk.Offset + i]);
+ var resp = await session.ReadAsync(
+ requestHeader: null,
+ maxAge: 0,
+ timestampsToReturn: TimestampsToReturn.Both,
+ nodesToRead: chunkColl,
+ ct: cancellationToken).ConfigureAwait(false);
+
+ var values = resp.Results;
+ for (var w = 0; w < values.Count; w++)
+ {
+ var r = indexMapList[wireOffset + w];
+ var dv = values[w];
+ // Preserve the upstream StatusCode verbatim — including Bad codes per
+ // §8's cascading-quality rule. Also preserve SourceTimestamp so downstream
+ // clients can detect stale upstream data.
+ results[r] = new DataValueSnapshot(
+ Value: dv.Value,
+ StatusCode: dv.StatusCode.Code,
+ SourceTimestampUtc: dv.SourceTimestamp == DateTime.MinValue ? null : dv.SourceTimestamp,
+ ServerTimestampUtc: dv.ServerTimestamp == DateTime.MinValue ? now : dv.ServerTimestamp);
+ }
+ wireOffset += chunk.Count;
}
_health = new DriverHealth(DriverState.Healthy, now, null);
}
@@ -497,9 +537,9 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
// tag in this batch. Per-tag StatusCode stays BadCommunicationError (not
// BadInternalError) so operators distinguish "upstream unreachable" from
// "driver bug".
- for (var w = 0; w < indexMap.Count; w++)
+ for (var w = 0; w < indexMapList.Count; w++)
{
- var r = indexMap[w];
+ var r = indexMapList[w];
results[r] = new DataValueSnapshot(null, StatusBadCommunicationError, null, now);
}
_health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, ex.Message);
@@ -515,6 +555,7 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
IReadOnlyList writes, CancellationToken cancellationToken)
{
var session = RequireSession();
+ await EnsureOperationLimitsFetchedAsync(cancellationToken).ConfigureAwait(false);
var results = new WriteResult[writes.Count];
var toSend = new WriteValueCollection();
@@ -537,24 +578,34 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
if (toSend.Count == 0) return results;
+ // Honor server's MaxNodesPerWrite — same chunking pattern as ReadAsync. cap=null
+ // collapses to a single wire call.
+ var writeCap = _operationLimits?.MaxNodesPerWrite;
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
try
{
- var resp = await session.WriteAsync(
- requestHeader: null,
- nodesToWrite: toSend,
- ct: cancellationToken).ConfigureAwait(false);
-
- var codes = resp.Results;
- for (var w = 0; w < codes.Count; w++)
+ var wireOffset = 0;
+ foreach (var chunk in ChunkBy(toSend, writeCap))
{
- var r = indexMap[w];
- // Pass upstream WriteResult StatusCode through verbatim. Success codes
- // include Good (0) and any warning-level Good* variants; anything with
- // the severity bits set is a Bad.
- results[r] = new WriteResult(codes[w].Code);
+ var chunkColl = new WriteValueCollection(chunk.Count);
+ for (var i = 0; i < chunk.Count; i++) chunkColl.Add(chunk.Array![chunk.Offset + i]);
+ var resp = await session.WriteAsync(
+ requestHeader: null,
+ nodesToWrite: chunkColl,
+ ct: cancellationToken).ConfigureAwait(false);
+
+ var codes = resp.Results;
+ for (var w = 0; w < codes.Count; w++)
+ {
+ var r = indexMap[wireOffset + w];
+ // Pass upstream WriteResult StatusCode through verbatim. Success codes
+ // include Good (0) and any warning-level Good* variants; anything with
+ // the severity bits set is a Bad.
+ results[r] = new WriteResult(codes[w].Code);
+ }
+ wireOffset += chunk.Count;
}
}
catch (Exception)
@@ -591,6 +642,81 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
private ISession RequireSession() =>
Session ?? throw new InvalidOperationException("OpcUaClientDriver not initialized");
+ ///
+ /// Lazily fetch Server.ServerCapabilities.OperationLimits from the upstream
+ /// server and cache them on the driver. Idempotent — called from every batch op,
+ /// no-ops once a successful fetch has populated the cache. The cache is cleared on
+ /// reconnect (see ) so a server with redrawn
+ /// capabilities doesn't run forever with stale caps.
+ ///
+ ///
+ /// Uses when the
+ /// active session is a concrete (always true in production —
+ /// the SDK's session factory returns Session). Falls back gracefully on any fetch
+ /// failure: callers see remain null and fall through
+ /// to single-call behaviour. Per OPC UA Part 5, a server reporting 0 for any
+ /// OperationLimits attribute means "no limit"; we normalize that to null so
+ /// the chunking helper has a single sentinel.
+ ///
+ private async Task EnsureOperationLimitsFetchedAsync(CancellationToken ct)
+ {
+ if (_operationLimits is not null) return;
+
+ await _operationLimitsLock.WaitAsync(ct).ConfigureAwait(false);
+ try
+ {
+ if (_operationLimits is not null) return;
+ if (Session is not Session concrete) return;
+
+ try
+ {
+ await concrete.FetchOperationLimitsAsync(ct).ConfigureAwait(false);
+ var ol = concrete.OperationLimits;
+ if (ol is null) return;
+
+ _operationLimits = new OperationLimitsCache(
+ MaxNodesPerRead: NormalizeLimit(ol.MaxNodesPerRead),
+ MaxNodesPerWrite: NormalizeLimit(ol.MaxNodesPerWrite),
+ MaxNodesPerBrowse: NormalizeLimit(ol.MaxNodesPerBrowse),
+ MaxNodesPerHistoryReadData: NormalizeLimit(ol.MaxNodesPerHistoryReadData));
+ }
+ catch
+ {
+ // Fetch failed — leave cache null so we re-attempt on the next batch op.
+ // Single-call behaviour applies in the meantime; never block traffic on a
+ // capability discovery glitch.
+ }
+ }
+ finally { _operationLimitsLock.Release(); }
+ }
+
+ /// Spec sentinel: 0 = "no limit". Normalize to null for the chunking helper.
+ private static uint? NormalizeLimit(uint raw) => raw == 0 ? null : raw;
+
+ ///
+ /// Split into contiguous slices of at most
+ /// items. Returns the input as a single slice when the cap is null (no limit),
+ /// 0, or larger than the input — the spec sentinel + the no-cap path collapse onto
+ /// the same single-call branch so the wire path stays a single SDK invocation when
+ /// the server doesn't impose a limit.
+ ///
+ internal static IEnumerable> ChunkBy(IReadOnlyList source, uint? cap)
+ {
+ if (source.Count == 0) yield break;
+ var array = source as T[] ?? source.ToArray();
+ if (cap is null or 0 || (uint)array.Length <= cap.Value)
+ {
+ yield return new ArraySegment(array, 0, array.Length);
+ yield break;
+ }
+ var size = checked((int)cap.Value);
+ for (var offset = 0; offset < array.Length; offset += size)
+ {
+ var len = Math.Min(size, array.Length - offset);
+ yield return new ArraySegment(array, offset, len);
+ }
+ }
+
// ---- ITagDiscovery ----
public async Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken cancellationToken)
@@ -1437,6 +1563,10 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
}
Session = newSession;
+ // Drop cached OperationLimits so the next batch op refetches against the (potentially
+ // re-redeployed) upstream server. A zero-cost guard against a server whose published
+ // capabilities changed across the reconnect window.
+ _operationLimits = null;
_reconnectHandler?.Dispose();
_reconnectHandler = null;
@@ -1472,5 +1602,6 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
try { await ShutdownAsync(CancellationToken.None).ConfigureAwait(false); }
catch { /* disposal is best-effort */ }
_gate.Dispose();
+ _operationLimitsLock.Dispose();
}
}
diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientOperationLimitsTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientOperationLimitsTests.cs
new file mode 100644
index 0000000..fc0d114
--- /dev/null
+++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientOperationLimitsTests.cs
@@ -0,0 +1,139 @@
+using Shouldly;
+using Xunit;
+
+namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests;
+
+///
+/// Unit tests for the OperationLimits chunking surface (PR #275 / opcuaclient-3). Focused
+/// on the static helper + the
+/// sentinel semantics. Live
+/// end-to-end tests against an in-process server land in the integration suite.
+///
+[Trait("Category", "Unit")]
+public sealed class OpcUaClientOperationLimitsTests
+{
+ [Fact]
+ public void ChunkBy_with_cap_5_splits_12_items_into_3_slices_of_5_5_2()
+ {
+ // The PR-3 acceptance scenario: server advertises MaxNodesPerRead=5, client batches a
+ // 12-tag read; driver must issue exactly 3 wire calls of sizes 5/5/2 in order.
+ var input = Enumerable.Range(0, 12).ToArray();
+
+ var slices = OpcUaClientDriver.ChunkBy(input, cap: 5).ToArray();
+
+ slices.Length.ShouldBe(3);
+ slices[0].Count.ShouldBe(5);
+ slices[1].Count.ShouldBe(5);
+ slices[2].Count.ShouldBe(2);
+ // Order + offsets must reflect the original sequence — chunking must not reorder
+ // tags, otherwise the indexMap ↔ result-index alignment breaks.
+ slices[0].ShouldBe(new[] { 0, 1, 2, 3, 4 });
+ slices[1].ShouldBe(new[] { 5, 6, 7, 8, 9 });
+ slices[2].ShouldBe(new[] { 10, 11 });
+ }
+
+ [Fact]
+ public void ChunkBy_with_null_cap_yields_single_slice_no_chunking()
+ {
+ // cap=null is the "fetch hasn't completed" / "server reports 0 = no limit" sentinel.
+ // Both must collapse to a single SDK call so the wire path doesn't change when the
+ // server doesn't impose a cap.
+ var input = Enumerable.Range(0, 12).ToArray();
+
+ var slices = OpcUaClientDriver.ChunkBy(input, cap: null).ToArray();
+
+ slices.Length.ShouldBe(1, "null cap means no chunking — single SDK call");
+ slices[0].Count.ShouldBe(12);
+ }
+
+ [Fact]
+ public void ChunkBy_with_zero_cap_yields_single_slice_no_chunking()
+ {
+ // OPC UA Part 5: 0 is the wire-level "no limit" sentinel. NormalizeLimit folds it
+ // into null upstream of ChunkBy, but the chunker itself must also treat 0 as
+ // no-chunking — defence in depth in case a caller bypasses NormalizeLimit.
+ var input = Enumerable.Range(0, 7).ToArray();
+
+ var slices = OpcUaClientDriver.ChunkBy(input, cap: 0).ToArray();
+
+ slices.Length.ShouldBe(1);
+ slices[0].Count.ShouldBe(7);
+ }
+
+ [Fact]
+ public void ChunkBy_with_cap_larger_than_input_yields_single_slice()
+ {
+ var input = new[] { 1, 2, 3 };
+
+ var slices = OpcUaClientDriver.ChunkBy(input, cap: 100).ToArray();
+
+ slices.Length.ShouldBe(1);
+ slices[0].Count.ShouldBe(3);
+ }
+
+ [Fact]
+ public void ChunkBy_with_empty_input_yields_no_slices()
+ {
+ // Empty batch must short-circuit before the wire call — saves a round-trip and
+ // matches the !toSend.Count == 0 guard in the driver.
+ var input = Array.Empty();
+
+ var slices = OpcUaClientDriver.ChunkBy(input, cap: 5).ToArray();
+
+ slices.Length.ShouldBe(0);
+ }
+
+ [Fact]
+ public void ChunkBy_with_cap_equal_to_input_size_yields_single_slice()
+ {
+ // Edge case: exactly N items at cap N. Must NOT produce an extra empty slice.
+ var input = Enumerable.Range(0, 5).ToArray();
+
+ var slices = OpcUaClientDriver.ChunkBy(input, cap: 5).ToArray();
+
+ slices.Length.ShouldBe(1);
+ slices[0].Count.ShouldBe(5);
+ }
+
+ [Fact]
+ public void ChunkBy_with_cap_1_splits_each_item_into_its_own_slice()
+ {
+ // Pathological cap — degrades to N wire calls. Verifies the chunker handles the
+ // boundary cleanly without off-by-one.
+ var input = new[] { 10, 20, 30 };
+
+ var slices = OpcUaClientDriver.ChunkBy(input, cap: 1).ToArray();
+
+ slices.Length.ShouldBe(3);
+ slices[0].ShouldBe(new[] { 10 });
+ slices[1].ShouldBe(new[] { 20 });
+ slices[2].ShouldBe(new[] { 30 });
+ }
+
+ [Fact]
+ public void OperationLimitsCache_records_all_four_caps_as_nullable_uint()
+ {
+ // The cache surfaces the four limits the driver chunks against. Storing as uint?
+ // lets the chunker distinguish "not yet fetched" / "no limit" (null) from "limit=N".
+ var cache = new OpcUaClientDriver.OperationLimitsCache(
+ MaxNodesPerRead: 100u,
+ MaxNodesPerWrite: 50u,
+ MaxNodesPerBrowse: null,
+ MaxNodesPerHistoryReadData: 10u);
+
+ cache.MaxNodesPerRead.ShouldBe(100u);
+ cache.MaxNodesPerWrite.ShouldBe(50u);
+ cache.MaxNodesPerBrowse.ShouldBeNull();
+ cache.MaxNodesPerHistoryReadData.ShouldBe(10u);
+ }
+
+ [Fact]
+ public void Driver_starts_with_no_cached_OperationLimits()
+ {
+ // Pre-init / pre-first-batch state: cache is null so callers fall through to
+ // single-call behaviour. Lazy fetch happens on the first ReadAsync/WriteAsync.
+ using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-cache-init");
+
+ drv.OperationLimitsForTest.ShouldBeNull();
+ }
+}