From 238748bc981e728719261c891c7b28134eb7c7c1 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 19 Apr 2026 01:13:34 -0400 Subject: [PATCH] Phase 3 PR 67 -- OPC UA Client IReadable + IWritable via Session.ReadAsync/WriteAsync. Adds IReadable + IWritable capabilities to OpcUaClientDriver, routing reads/writes through the session's non-obsolete ReadAsync(RequestHeader, maxAge, TimestampsToReturn, ReadValueIdCollection, ct) and WriteAsync(RequestHeader, WriteValueCollection, ct) overloads (the sync and BeginXxx/EndXxx patterns are all [Obsolete] in SDK 1.5.378). Serializes on the shared Gate from PR 66 so reads + writes + future subscribe + probe don't race on the single session. NodeId parsing: fullReferences use OPC UA's standard serialized NodeId form -- ns=2;s=Demo.Counter, i=2253, ns=4;g=... for GUID, ns=3;b=... for opaque. TryParseNodeId calls NodeId.Parse with the session's MessageContext which honours the server-negotiated namespace URI table. Malformed input surfaces as BadNodeIdInvalid (0x80330000) WITHOUT a wire round-trip -- saves a request for a fault the driver can detect locally. Cascading-quality implementation per driver-specs.md \u00A78: upstream StatusCode, SourceTimestamp, and ServerTimestamp pass through VERBATIM. Bad codes from the remote server stay as the same Bad code (not translated to generic BadInternalError) so downstream clients can distinguish 'upstream value unavailable' from 'local driver bug'. SourceTimestamp is preserved verbatim (null on MinValue guard) so staleness is visible; ServerTimestamp falls back to DateTime.UtcNow if the upstream omitted it, never overwriting a non-zero value. Wire-level exceptions in the Read batch -- transport / timeout / session-dropped -- fan out BadCommunicationError (0x80050000) across every tag in the batch, not BadInternalError, so operators distinguish network reachability from driver faults. Write-side same pattern: successful WriteAsync maps each upstream StatusCode.Code verbatim into the local WriteResult.StatusCode; transport-layer failure fans out BadCommunicationError across the whole batch. WriteValue carries AttributeId=Value + DataValue wrapping Variant(writeValue) -- the SDK handles the type-to-Variant mapping for common CLR types (bool, int, float, string, etc.) so the driver doesn't need a per-type switch. Name disambiguation: the SDK has its own Opc.Ua.WriteRequest type which collides with ZB.MOM.WW.OtOpcUa.Core.Abstractions.WriteRequest; method signature uses the fully-qualified Core.Abstractions.WriteRequest. Unit tests (OpcUaClientReadWriteTests, 2 facts): ReadAsync_without_initialize_throws_InvalidOperationException + WriteAsync_without_initialize_throws_InvalidOperationException -- pre-init calls hit RequireSession and fail uniformly. Wire-level round-trip coverage against a live remote server lands in a follow-up PR once we scaffold an in-process OPC UA server fixture (the existing Server project in the solution is a candidate host). 7/7 OpcUaClient.Tests pass (5 scaffold + 2 read/write). dotnet build clean. Scope: ITagDiscovery (browse) + ISubscribable + IHostConnectivityProbe remain deferred to PRs 68-69 which also need namespace-index remapping and reference-counted MonitoredItem forwarding per driver-specs.md \u00A78. --- .../OpcUaClientDriver.cs | 164 +++++++++++++++++- .../OpcUaClientReadWriteTests.cs | 32 ++++ 2 files changed, 195 insertions(+), 1 deletion(-) create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientReadWriteTests.cs diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs index 0decf84..52858a6 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs @@ -27,8 +27,15 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient; /// /// public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string driverInstanceId) - : IDriver, IDisposable, IAsyncDisposable + : IDriver, IReadable, IWritable, IDisposable, IAsyncDisposable { + // OPC UA StatusCode constants the driver surfaces for local-side faults. Upstream-server + // StatusCodes are passed through verbatim per driver-specs.md §8 "cascading quality" — + // downstream clients need to distinguish 'remote source down' from 'local driver failure'. + private const uint StatusBadNodeIdInvalid = 0x80330000u; + private const uint StatusBadInternalError = 0x80020000u; + private const uint StatusBadCommunicationError = 0x80050000u; + private readonly OpcUaClientDriverOptions _options = options; private readonly SemaphoreSlim _gate = new(1, 1); @@ -218,6 +225,161 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d public long GetMemoryFootprint() => 0; public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask; + // ---- IReadable ---- + + public async Task> ReadAsync( + IReadOnlyList fullReferences, CancellationToken cancellationToken) + { + var session = RequireSession(); + var results = new DataValueSnapshot[fullReferences.Count]; + var now = DateTime.UtcNow; + + // Parse NodeIds up-front. Tags whose reference doesn't parse get BadNodeIdInvalid + // and are omitted from the wire request — saves a round-trip against the upstream + // server for a fault the driver can detect locally. + var toSend = new ReadValueIdCollection(); + var indexMap = new List(fullReferences.Count); // maps wire-index -> results-index + for (var i = 0; i < fullReferences.Count; i++) + { + if (!TryParseNodeId(session, fullReferences[i], out var nodeId)) + { + results[i] = new DataValueSnapshot(null, StatusBadNodeIdInvalid, null, now); + continue; + } + toSend.Add(new ReadValueId { NodeId = nodeId, AttributeId = Attributes.Value }); + indexMap.Add(i); + } + + if (toSend.Count == 0) return results; + + await _gate.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + try + { + var resp = await session.ReadAsync( + requestHeader: null, + maxAge: 0, + timestampsToReturn: TimestampsToReturn.Both, + nodesToRead: toSend, + ct: cancellationToken).ConfigureAwait(false); + + var values = resp.Results; + for (var w = 0; w < values.Count; w++) + { + var r = indexMap[w]; + var dv = values[w]; + // Preserve the upstream StatusCode verbatim — including Bad codes per + // §8's cascading-quality rule. Also preserve SourceTimestamp so downstream + // clients can detect stale upstream data. + results[r] = new DataValueSnapshot( + Value: dv.Value, + StatusCode: dv.StatusCode.Code, + SourceTimestampUtc: dv.SourceTimestamp == DateTime.MinValue ? null : dv.SourceTimestamp, + ServerTimestampUtc: dv.ServerTimestamp == DateTime.MinValue ? now : dv.ServerTimestamp); + } + _health = new DriverHealth(DriverState.Healthy, now, null); + } + catch (Exception ex) + { + // Transport / timeout / session-dropped — fan out the same fault across every + // tag in this batch. Per-tag StatusCode stays BadCommunicationError (not + // BadInternalError) so operators distinguish "upstream unreachable" from + // "driver bug". + for (var w = 0; w < indexMap.Count; w++) + { + var r = indexMap[w]; + results[r] = new DataValueSnapshot(null, StatusBadCommunicationError, null, now); + } + _health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, ex.Message); + } + } + finally { _gate.Release(); } + return results; + } + + // ---- IWritable ---- + + public async Task> WriteAsync( + IReadOnlyList writes, CancellationToken cancellationToken) + { + var session = RequireSession(); + var results = new WriteResult[writes.Count]; + + var toSend = new WriteValueCollection(); + var indexMap = new List(writes.Count); + for (var i = 0; i < writes.Count; i++) + { + if (!TryParseNodeId(session, writes[i].FullReference, out var nodeId)) + { + results[i] = new WriteResult(StatusBadNodeIdInvalid); + continue; + } + toSend.Add(new WriteValue + { + NodeId = nodeId, + AttributeId = Attributes.Value, + Value = new DataValue(new Variant(writes[i].Value)), + }); + indexMap.Add(i); + } + + if (toSend.Count == 0) return results; + + await _gate.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + try + { + var resp = await session.WriteAsync( + requestHeader: null, + nodesToWrite: toSend, + ct: cancellationToken).ConfigureAwait(false); + + var codes = resp.Results; + for (var w = 0; w < codes.Count; w++) + { + var r = indexMap[w]; + // Pass upstream WriteResult StatusCode through verbatim. Success codes + // include Good (0) and any warning-level Good* variants; anything with + // the severity bits set is a Bad. + results[r] = new WriteResult(codes[w].Code); + } + } + catch (Exception) + { + for (var w = 0; w < indexMap.Count; w++) + results[indexMap[w]] = new WriteResult(StatusBadCommunicationError); + } + } + finally { _gate.Release(); } + return results; + } + + /// + /// Parse a tag's full-reference string as a NodeId. Accepts the standard OPC UA + /// serialized forms (ns=2;s=…, i=2253, ns=4;g=…, ns=3;b=…). + /// Empty + malformed strings return false; the driver surfaces that as + /// without a wire round-trip. + /// + internal static bool TryParseNodeId(ISession session, string fullReference, out NodeId nodeId) + { + nodeId = NodeId.Null; + if (string.IsNullOrWhiteSpace(fullReference)) return false; + try + { + nodeId = NodeId.Parse(session.MessageContext, fullReference); + return !NodeId.IsNull(nodeId); + } + catch + { + return false; + } + } + + private ISession RequireSession() => + Session ?? throw new InvalidOperationException("OpcUaClientDriver not initialized"); + public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult(); public async ValueTask DisposeAsync() diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientReadWriteTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientReadWriteTests.cs new file mode 100644 index 0000000..24241ed --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientReadWriteTests.cs @@ -0,0 +1,32 @@ +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests; + +/// +/// Unit tests for the IReadable/IWritable surface that don't need a live remote OPC UA +/// server. Wire-level round-trips against a local in-process server fixture land in a +/// follow-up PR once we have one scaffolded. +/// +[Trait("Category", "Unit")] +public sealed class OpcUaClientReadWriteTests +{ + [Fact] + public async Task ReadAsync_without_initialize_throws_InvalidOperationException() + { + using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-uninit"); + await Should.ThrowAsync(async () => + await drv.ReadAsync(["ns=2;s=Demo"], TestContext.Current.CancellationToken)); + } + + [Fact] + public async Task WriteAsync_without_initialize_throws_InvalidOperationException() + { + using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-uninit"); + await Should.ThrowAsync(async () => + await drv.WriteAsync( + [new WriteRequest("ns=2;s=Demo", 42)], + TestContext.Current.CancellationToken)); + } +} -- 2.49.1