diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs index 0decf84..52858a6 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient/OpcUaClientDriver.cs @@ -27,8 +27,15 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient; /// /// public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string driverInstanceId) - : IDriver, IDisposable, IAsyncDisposable + : IDriver, IReadable, IWritable, IDisposable, IAsyncDisposable { + // OPC UA StatusCode constants the driver surfaces for local-side faults. Upstream-server + // StatusCodes are passed through verbatim per driver-specs.md §8 "cascading quality" — + // downstream clients need to distinguish 'remote source down' from 'local driver failure'. + private const uint StatusBadNodeIdInvalid = 0x80330000u; + private const uint StatusBadInternalError = 0x80020000u; + private const uint StatusBadCommunicationError = 0x80050000u; + private readonly OpcUaClientDriverOptions _options = options; private readonly SemaphoreSlim _gate = new(1, 1); @@ -218,6 +225,161 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d public long GetMemoryFootprint() => 0; public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask; + // ---- IReadable ---- + + public async Task> ReadAsync( + IReadOnlyList fullReferences, CancellationToken cancellationToken) + { + var session = RequireSession(); + var results = new DataValueSnapshot[fullReferences.Count]; + var now = DateTime.UtcNow; + + // Parse NodeIds up-front. Tags whose reference doesn't parse get BadNodeIdInvalid + // and are omitted from the wire request — saves a round-trip against the upstream + // server for a fault the driver can detect locally. + var toSend = new ReadValueIdCollection(); + var indexMap = new List(fullReferences.Count); // maps wire-index -> results-index + for (var i = 0; i < fullReferences.Count; i++) + { + if (!TryParseNodeId(session, fullReferences[i], out var nodeId)) + { + results[i] = new DataValueSnapshot(null, StatusBadNodeIdInvalid, null, now); + continue; + } + toSend.Add(new ReadValueId { NodeId = nodeId, AttributeId = Attributes.Value }); + indexMap.Add(i); + } + + if (toSend.Count == 0) return results; + + await _gate.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + try + { + var resp = await session.ReadAsync( + requestHeader: null, + maxAge: 0, + timestampsToReturn: TimestampsToReturn.Both, + nodesToRead: toSend, + ct: cancellationToken).ConfigureAwait(false); + + var values = resp.Results; + for (var w = 0; w < values.Count; w++) + { + var r = indexMap[w]; + var dv = values[w]; + // Preserve the upstream StatusCode verbatim — including Bad codes per + // §8's cascading-quality rule. Also preserve SourceTimestamp so downstream + // clients can detect stale upstream data. + results[r] = new DataValueSnapshot( + Value: dv.Value, + StatusCode: dv.StatusCode.Code, + SourceTimestampUtc: dv.SourceTimestamp == DateTime.MinValue ? null : dv.SourceTimestamp, + ServerTimestampUtc: dv.ServerTimestamp == DateTime.MinValue ? now : dv.ServerTimestamp); + } + _health = new DriverHealth(DriverState.Healthy, now, null); + } + catch (Exception ex) + { + // Transport / timeout / session-dropped — fan out the same fault across every + // tag in this batch. Per-tag StatusCode stays BadCommunicationError (not + // BadInternalError) so operators distinguish "upstream unreachable" from + // "driver bug". + for (var w = 0; w < indexMap.Count; w++) + { + var r = indexMap[w]; + results[r] = new DataValueSnapshot(null, StatusBadCommunicationError, null, now); + } + _health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, ex.Message); + } + } + finally { _gate.Release(); } + return results; + } + + // ---- IWritable ---- + + public async Task> WriteAsync( + IReadOnlyList writes, CancellationToken cancellationToken) + { + var session = RequireSession(); + var results = new WriteResult[writes.Count]; + + var toSend = new WriteValueCollection(); + var indexMap = new List(writes.Count); + for (var i = 0; i < writes.Count; i++) + { + if (!TryParseNodeId(session, writes[i].FullReference, out var nodeId)) + { + results[i] = new WriteResult(StatusBadNodeIdInvalid); + continue; + } + toSend.Add(new WriteValue + { + NodeId = nodeId, + AttributeId = Attributes.Value, + Value = new DataValue(new Variant(writes[i].Value)), + }); + indexMap.Add(i); + } + + if (toSend.Count == 0) return results; + + await _gate.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + try + { + var resp = await session.WriteAsync( + requestHeader: null, + nodesToWrite: toSend, + ct: cancellationToken).ConfigureAwait(false); + + var codes = resp.Results; + for (var w = 0; w < codes.Count; w++) + { + var r = indexMap[w]; + // Pass upstream WriteResult StatusCode through verbatim. Success codes + // include Good (0) and any warning-level Good* variants; anything with + // the severity bits set is a Bad. + results[r] = new WriteResult(codes[w].Code); + } + } + catch (Exception) + { + for (var w = 0; w < indexMap.Count; w++) + results[indexMap[w]] = new WriteResult(StatusBadCommunicationError); + } + } + finally { _gate.Release(); } + return results; + } + + /// + /// Parse a tag's full-reference string as a NodeId. Accepts the standard OPC UA + /// serialized forms (ns=2;s=…, i=2253, ns=4;g=…, ns=3;b=…). + /// Empty + malformed strings return false; the driver surfaces that as + /// without a wire round-trip. + /// + internal static bool TryParseNodeId(ISession session, string fullReference, out NodeId nodeId) + { + nodeId = NodeId.Null; + if (string.IsNullOrWhiteSpace(fullReference)) return false; + try + { + nodeId = NodeId.Parse(session.MessageContext, fullReference); + return !NodeId.IsNull(nodeId); + } + catch + { + return false; + } + } + + private ISession RequireSession() => + Session ?? throw new InvalidOperationException("OpcUaClientDriver not initialized"); + public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult(); public async ValueTask DisposeAsync() diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientReadWriteTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientReadWriteTests.cs new file mode 100644 index 0000000..24241ed --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests/OpcUaClientReadWriteTests.cs @@ -0,0 +1,32 @@ +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests; + +/// +/// Unit tests for the IReadable/IWritable surface that don't need a live remote OPC UA +/// server. Wire-level round-trips against a local in-process server fixture land in a +/// follow-up PR once we have one scaffolded. +/// +[Trait("Category", "Unit")] +public sealed class OpcUaClientReadWriteTests +{ + [Fact] + public async Task ReadAsync_without_initialize_throws_InvalidOperationException() + { + using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-uninit"); + await Should.ThrowAsync(async () => + await drv.ReadAsync(["ns=2;s=Demo"], TestContext.Current.CancellationToken)); + } + + [Fact] + public async Task WriteAsync_without_initialize_throws_InvalidOperationException() + { + using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-uninit"); + await Should.ThrowAsync(async () => + await drv.WriteAsync( + [new WriteRequest("ns=2;s=Demo", 42)], + TestContext.Current.CancellationToken)); + } +}