Merge pull request 'Phase 3 PR 67 -- OPC UA Client IReadable + IWritable' (#66) from phase-3-pr67-opcua-client-read-write into v2

This commit was merged in pull request #66.
This commit is contained in:
2026-04-19 01:15:42 -04:00
2 changed files with 195 additions and 1 deletions

View File

@@ -27,8 +27,15 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient;
/// </para>
/// </remarks>
public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string driverInstanceId)
: IDriver, IDisposable, IAsyncDisposable
: IDriver, IReadable, IWritable, IDisposable, IAsyncDisposable
{
// OPC UA StatusCode constants the driver surfaces for local-side faults. Upstream-server
// StatusCodes are passed through verbatim per driver-specs.md §8 "cascading quality" —
// downstream clients need to distinguish 'remote source down' from 'local driver failure'.
private const uint StatusBadNodeIdInvalid = 0x80330000u;
private const uint StatusBadInternalError = 0x80020000u;
private const uint StatusBadCommunicationError = 0x80050000u;
private readonly OpcUaClientDriverOptions _options = options;
private readonly SemaphoreSlim _gate = new(1, 1);
@@ -218,6 +225,161 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
public long GetMemoryFootprint() => 0;
public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask;
// ---- IReadable ----
public async Task<IReadOnlyList<DataValueSnapshot>> ReadAsync(
IReadOnlyList<string> fullReferences, CancellationToken cancellationToken)
{
var session = RequireSession();
var results = new DataValueSnapshot[fullReferences.Count];
var now = DateTime.UtcNow;
// Parse NodeIds up-front. Tags whose reference doesn't parse get BadNodeIdInvalid
// and are omitted from the wire request — saves a round-trip against the upstream
// server for a fault the driver can detect locally.
var toSend = new ReadValueIdCollection();
var indexMap = new List<int>(fullReferences.Count); // maps wire-index -> results-index
for (var i = 0; i < fullReferences.Count; i++)
{
if (!TryParseNodeId(session, fullReferences[i], out var nodeId))
{
results[i] = new DataValueSnapshot(null, StatusBadNodeIdInvalid, null, now);
continue;
}
toSend.Add(new ReadValueId { NodeId = nodeId, AttributeId = Attributes.Value });
indexMap.Add(i);
}
if (toSend.Count == 0) return results;
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
try
{
var resp = await session.ReadAsync(
requestHeader: null,
maxAge: 0,
timestampsToReturn: TimestampsToReturn.Both,
nodesToRead: toSend,
ct: cancellationToken).ConfigureAwait(false);
var values = resp.Results;
for (var w = 0; w < values.Count; w++)
{
var r = indexMap[w];
var dv = values[w];
// Preserve the upstream StatusCode verbatim — including Bad codes per
// §8's cascading-quality rule. Also preserve SourceTimestamp so downstream
// clients can detect stale upstream data.
results[r] = new DataValueSnapshot(
Value: dv.Value,
StatusCode: dv.StatusCode.Code,
SourceTimestampUtc: dv.SourceTimestamp == DateTime.MinValue ? null : dv.SourceTimestamp,
ServerTimestampUtc: dv.ServerTimestamp == DateTime.MinValue ? now : dv.ServerTimestamp);
}
_health = new DriverHealth(DriverState.Healthy, now, null);
}
catch (Exception ex)
{
// Transport / timeout / session-dropped — fan out the same fault across every
// tag in this batch. Per-tag StatusCode stays BadCommunicationError (not
// BadInternalError) so operators distinguish "upstream unreachable" from
// "driver bug".
for (var w = 0; w < indexMap.Count; w++)
{
var r = indexMap[w];
results[r] = new DataValueSnapshot(null, StatusBadCommunicationError, null, now);
}
_health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, ex.Message);
}
}
finally { _gate.Release(); }
return results;
}
// ---- IWritable ----
public async Task<IReadOnlyList<WriteResult>> WriteAsync(
IReadOnlyList<Core.Abstractions.WriteRequest> writes, CancellationToken cancellationToken)
{
var session = RequireSession();
var results = new WriteResult[writes.Count];
var toSend = new WriteValueCollection();
var indexMap = new List<int>(writes.Count);
for (var i = 0; i < writes.Count; i++)
{
if (!TryParseNodeId(session, writes[i].FullReference, out var nodeId))
{
results[i] = new WriteResult(StatusBadNodeIdInvalid);
continue;
}
toSend.Add(new WriteValue
{
NodeId = nodeId,
AttributeId = Attributes.Value,
Value = new DataValue(new Variant(writes[i].Value)),
});
indexMap.Add(i);
}
if (toSend.Count == 0) return results;
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
try
{
var resp = await session.WriteAsync(
requestHeader: null,
nodesToWrite: toSend,
ct: cancellationToken).ConfigureAwait(false);
var codes = resp.Results;
for (var w = 0; w < codes.Count; w++)
{
var r = indexMap[w];
// Pass upstream WriteResult StatusCode through verbatim. Success codes
// include Good (0) and any warning-level Good* variants; anything with
// the severity bits set is a Bad.
results[r] = new WriteResult(codes[w].Code);
}
}
catch (Exception)
{
for (var w = 0; w < indexMap.Count; w++)
results[indexMap[w]] = new WriteResult(StatusBadCommunicationError);
}
}
finally { _gate.Release(); }
return results;
}
/// <summary>
/// Parse a tag's full-reference string as a NodeId. Accepts the standard OPC UA
/// serialized forms (<c>ns=2;s=…</c>, <c>i=2253</c>, <c>ns=4;g=…</c>, <c>ns=3;b=…</c>).
/// Empty + malformed strings return false; the driver surfaces that as
/// <see cref="StatusBadNodeIdInvalid"/> without a wire round-trip.
/// </summary>
internal static bool TryParseNodeId(ISession session, string fullReference, out NodeId nodeId)
{
nodeId = NodeId.Null;
if (string.IsNullOrWhiteSpace(fullReference)) return false;
try
{
nodeId = NodeId.Parse(session.MessageContext, fullReference);
return !NodeId.IsNull(nodeId);
}
catch
{
return false;
}
}
private ISession RequireSession() =>
Session ?? throw new InvalidOperationException("OpcUaClientDriver not initialized");
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();
public async ValueTask DisposeAsync()

View File

@@ -0,0 +1,32 @@
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests;
/// <summary>
/// Unit tests for the IReadable/IWritable surface that don't need a live remote OPC UA
/// server. Wire-level round-trips against a local in-process server fixture land in a
/// follow-up PR once we have one scaffolded.
/// </summary>
[Trait("Category", "Unit")]
public sealed class OpcUaClientReadWriteTests
{
[Fact]
public async Task ReadAsync_without_initialize_throws_InvalidOperationException()
{
using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-uninit");
await Should.ThrowAsync<InvalidOperationException>(async () =>
await drv.ReadAsync(["ns=2;s=Demo"], TestContext.Current.CancellationToken));
}
[Fact]
public async Task WriteAsync_without_initialize_throws_InvalidOperationException()
{
using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-uninit");
await Should.ThrowAsync<InvalidOperationException>(async () =>
await drv.WriteAsync(
[new WriteRequest("ns=2;s=Demo", 42)],
TestContext.Current.CancellationToken));
}
}