Compare commits

...

4 Commits

Author SHA1 Message Date
Joseph Doherty
db56a95819 Phase 3 PR 68 -- OPC UA Client ITagDiscovery via recursive browse (Full strategy). Adds ITagDiscovery to OpcUaClientDriver. DiscoverAsync opens a single Remote folder on the IAddressSpaceBuilder and recursively browses from the configured root (default: ObjectsFolder i=85; override via OpcUaClientDriverOptions.BrowseRoot for scoped discovery). Browse uses non-obsolete Session.BrowseAsync(RequestHeader, ViewDescription, uint maxReferences, BrowseDescriptionCollection, ct) with HierarchicalReferences forward, subtypes included, NodeClassMask Object+Variable, ResultMask pulling BrowseName + DisplayName + NodeClass + TypeDefinition. Objects become sub-folders via builder.Folder; Variables become builder.Variable entries with FullName set to the NodeId.ToString() serialization so IReadable/IWritable can round-trip without re-resolving. Three safety caps added to OpcUaClientDriverOptions to bound runaway discovery: (1) MaxBrowseDepth default 10 -- deep enough for realistic OPC UA information models, shallow enough that cyclic graphs can't spin the browse forever. (2) MaxDiscoveredNodes default 10_000 -- caps memory on pathological remote servers. Once the cap is hit, recursion short-circuits and the partially-discovered tree is still projected into the local address space (graceful degradation rather than all-or-nothing). (3) BrowseRoot as an opt-in scope restriction string per driver-specs.md \u00A78 -- defaults to ObjectsFolder but operators with 100k-node servers can point it at a single subtree. Visited-set tracks NodeIds already visited to prevent infinite cycles on graphs with non-strict hierarchy (OPC UA models can have back-references). Transient browse failures on a subtree are swallowed -- the sub-branch stops but the rest of discovery continues, matching the Modbus driver's 'transient poll errors don't kill the loop' pattern. The driver's health surface reflects the network-level cascade via the probe loop (PR 69). Deferred to a follow-up PR: DataType resolution via a batch Session.ReadAsync(Attributes.DataType) after the browse so DriverAttributeInfo.DriverDataType is accurate instead of the current conservative DriverDataType.Int32 default; AccessLevel-derived SecurityClass instead of the current ViewOnly default; array-type detection via Attributes.ValueRank + ArrayDimensions. These need an extra wire round-trip per batch of variables + a NodeId -> DriverDataType mapping table; out of scope for PR 68 to keep browse path landable. Unit tests (OpcUaClientDiscoveryTests, 3 facts): DiscoverAsync_without_initialize_throws_InvalidOperationException (pre-init hits RequireSession); DiscoverAsync_rejects_null_builder (ArgumentNullException); Discovery_caps_are_sensible_defaults (asserts 10000 / 10 / null defaults documented above). NullAddressSpaceBuilder stub implements the full IAddressSpaceBuilder shape including IVariableHandle.MarkAsAlarmCondition (throws NotSupportedException since this PR doesn't wire alarms). Live-browse coverage against a real remote server is deferred to the in-process-server-fixture PR. 10/10 OpcUaClient.Tests pass. dotnet build clean. 2026-04-19 01:17:21 -04:00
89bd726fa8 Merge pull request 'Phase 3 PR 67 -- OPC UA Client IReadable + IWritable' (#66) from phase-3-pr67-opcua-client-read-write into v2 2026-04-19 01:15:42 -04:00
Joseph Doherty
238748bc98 Phase 3 PR 67 -- OPC UA Client IReadable + IWritable via Session.ReadAsync/WriteAsync. Adds IReadable + IWritable capabilities to OpcUaClientDriver, routing reads/writes through the session's non-obsolete ReadAsync(RequestHeader, maxAge, TimestampsToReturn, ReadValueIdCollection, ct) and WriteAsync(RequestHeader, WriteValueCollection, ct) overloads (the sync and BeginXxx/EndXxx patterns are all [Obsolete] in SDK 1.5.378). Serializes on the shared Gate from PR 66 so reads + writes + future subscribe + probe don't race on the single session. NodeId parsing: fullReferences use OPC UA's standard serialized NodeId form -- ns=2;s=Demo.Counter, i=2253, ns=4;g=... for GUID, ns=3;b=... for opaque. TryParseNodeId calls NodeId.Parse with the session's MessageContext which honours the server-negotiated namespace URI table. Malformed input surfaces as BadNodeIdInvalid (0x80330000) WITHOUT a wire round-trip -- saves a request for a fault the driver can detect locally. Cascading-quality implementation per driver-specs.md \u00A78: upstream StatusCode, SourceTimestamp, and ServerTimestamp pass through VERBATIM. Bad codes from the remote server stay as the same Bad code (not translated to generic BadInternalError) so downstream clients can distinguish 'upstream value unavailable' from 'local driver bug'. SourceTimestamp is preserved verbatim (null on MinValue guard) so staleness is visible; ServerTimestamp falls back to DateTime.UtcNow if the upstream omitted it, never overwriting a non-zero value. Wire-level exceptions in the Read batch -- transport / timeout / session-dropped -- fan out BadCommunicationError (0x80050000) across every tag in the batch, not BadInternalError, so operators distinguish network reachability from driver faults. Write-side same pattern: successful WriteAsync maps each upstream StatusCode.Code verbatim into the local WriteResult.StatusCode; transport-layer failure fans out BadCommunicationError across the whole batch. WriteValue carries AttributeId=Value + DataValue wrapping Variant(writeValue) -- the SDK handles the type-to-Variant mapping for common CLR types (bool, int, float, string, etc.) so the driver doesn't need a per-type switch. Name disambiguation: the SDK has its own Opc.Ua.WriteRequest type which collides with ZB.MOM.WW.OtOpcUa.Core.Abstractions.WriteRequest; method signature uses the fully-qualified Core.Abstractions.WriteRequest. Unit tests (OpcUaClientReadWriteTests, 2 facts): ReadAsync_without_initialize_throws_InvalidOperationException + WriteAsync_without_initialize_throws_InvalidOperationException -- pre-init calls hit RequireSession and fail uniformly. Wire-level round-trip coverage against a live remote server lands in a follow-up PR once we scaffold an in-process OPC UA server fixture (the existing Server project in the solution is a candidate host). 7/7 OpcUaClient.Tests pass (5 scaffold + 2 read/write). dotnet build clean. Scope: ITagDiscovery (browse) + ISubscribable + IHostConnectivityProbe remain deferred to PRs 68-69 which also need namespace-index remapping and reference-counted MonitoredItem forwarding per driver-specs.md \u00A78. 2026-04-19 01:13:34 -04:00
b21d550836 Merge pull request 'Phase 3 PR 66 -- OPC UA Client (gateway) driver scaffold' (#65) from phase-3-pr66-opcua-client-scaffold into v2 2026-04-19 01:10:07 -04:00
4 changed files with 378 additions and 1 deletions

View File

@@ -27,8 +27,15 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient;
/// </para>
/// </remarks>
public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string driverInstanceId)
: IDriver, IDisposable, IAsyncDisposable
: IDriver, ITagDiscovery, IReadable, IWritable, IDisposable, IAsyncDisposable
{
// OPC UA StatusCode constants the driver surfaces for local-side faults. Upstream-server
// StatusCodes are passed through verbatim per driver-specs.md §8 "cascading quality" —
// downstream clients need to distinguish 'remote source down' from 'local driver failure'.
private const uint StatusBadNodeIdInvalid = 0x80330000u;
private const uint StatusBadInternalError = 0x80020000u;
private const uint StatusBadCommunicationError = 0x80050000u;
private readonly OpcUaClientDriverOptions _options = options;
private readonly SemaphoreSlim _gate = new(1, 1);
@@ -218,6 +225,265 @@ public sealed class OpcUaClientDriver(OpcUaClientDriverOptions options, string d
public long GetMemoryFootprint() => 0;
public Task FlushOptionalCachesAsync(CancellationToken cancellationToken) => Task.CompletedTask;
// ---- IReadable ----
public async Task<IReadOnlyList<DataValueSnapshot>> ReadAsync(
IReadOnlyList<string> fullReferences, CancellationToken cancellationToken)
{
var session = RequireSession();
var results = new DataValueSnapshot[fullReferences.Count];
var now = DateTime.UtcNow;
// Parse NodeIds up-front. Tags whose reference doesn't parse get BadNodeIdInvalid
// and are omitted from the wire request — saves a round-trip against the upstream
// server for a fault the driver can detect locally.
var toSend = new ReadValueIdCollection();
var indexMap = new List<int>(fullReferences.Count); // maps wire-index -> results-index
for (var i = 0; i < fullReferences.Count; i++)
{
if (!TryParseNodeId(session, fullReferences[i], out var nodeId))
{
results[i] = new DataValueSnapshot(null, StatusBadNodeIdInvalid, null, now);
continue;
}
toSend.Add(new ReadValueId { NodeId = nodeId, AttributeId = Attributes.Value });
indexMap.Add(i);
}
if (toSend.Count == 0) return results;
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
try
{
var resp = await session.ReadAsync(
requestHeader: null,
maxAge: 0,
timestampsToReturn: TimestampsToReturn.Both,
nodesToRead: toSend,
ct: cancellationToken).ConfigureAwait(false);
var values = resp.Results;
for (var w = 0; w < values.Count; w++)
{
var r = indexMap[w];
var dv = values[w];
// Preserve the upstream StatusCode verbatim — including Bad codes per
// §8's cascading-quality rule. Also preserve SourceTimestamp so downstream
// clients can detect stale upstream data.
results[r] = new DataValueSnapshot(
Value: dv.Value,
StatusCode: dv.StatusCode.Code,
SourceTimestampUtc: dv.SourceTimestamp == DateTime.MinValue ? null : dv.SourceTimestamp,
ServerTimestampUtc: dv.ServerTimestamp == DateTime.MinValue ? now : dv.ServerTimestamp);
}
_health = new DriverHealth(DriverState.Healthy, now, null);
}
catch (Exception ex)
{
// Transport / timeout / session-dropped — fan out the same fault across every
// tag in this batch. Per-tag StatusCode stays BadCommunicationError (not
// BadInternalError) so operators distinguish "upstream unreachable" from
// "driver bug".
for (var w = 0; w < indexMap.Count; w++)
{
var r = indexMap[w];
results[r] = new DataValueSnapshot(null, StatusBadCommunicationError, null, now);
}
_health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, ex.Message);
}
}
finally { _gate.Release(); }
return results;
}
// ---- IWritable ----
public async Task<IReadOnlyList<WriteResult>> WriteAsync(
IReadOnlyList<Core.Abstractions.WriteRequest> writes, CancellationToken cancellationToken)
{
var session = RequireSession();
var results = new WriteResult[writes.Count];
var toSend = new WriteValueCollection();
var indexMap = new List<int>(writes.Count);
for (var i = 0; i < writes.Count; i++)
{
if (!TryParseNodeId(session, writes[i].FullReference, out var nodeId))
{
results[i] = new WriteResult(StatusBadNodeIdInvalid);
continue;
}
toSend.Add(new WriteValue
{
NodeId = nodeId,
AttributeId = Attributes.Value,
Value = new DataValue(new Variant(writes[i].Value)),
});
indexMap.Add(i);
}
if (toSend.Count == 0) return results;
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
try
{
var resp = await session.WriteAsync(
requestHeader: null,
nodesToWrite: toSend,
ct: cancellationToken).ConfigureAwait(false);
var codes = resp.Results;
for (var w = 0; w < codes.Count; w++)
{
var r = indexMap[w];
// Pass upstream WriteResult StatusCode through verbatim. Success codes
// include Good (0) and any warning-level Good* variants; anything with
// the severity bits set is a Bad.
results[r] = new WriteResult(codes[w].Code);
}
}
catch (Exception)
{
for (var w = 0; w < indexMap.Count; w++)
results[indexMap[w]] = new WriteResult(StatusBadCommunicationError);
}
}
finally { _gate.Release(); }
return results;
}
/// <summary>
/// Parse a tag's full-reference string as a NodeId. Accepts the standard OPC UA
/// serialized forms (<c>ns=2;s=…</c>, <c>i=2253</c>, <c>ns=4;g=…</c>, <c>ns=3;b=…</c>).
/// Empty + malformed strings return false; the driver surfaces that as
/// <see cref="StatusBadNodeIdInvalid"/> without a wire round-trip.
/// </summary>
internal static bool TryParseNodeId(ISession session, string fullReference, out NodeId nodeId)
{
nodeId = NodeId.Null;
if (string.IsNullOrWhiteSpace(fullReference)) return false;
try
{
nodeId = NodeId.Parse(session.MessageContext, fullReference);
return !NodeId.IsNull(nodeId);
}
catch
{
return false;
}
}
private ISession RequireSession() =>
Session ?? throw new InvalidOperationException("OpcUaClientDriver not initialized");
// ---- ITagDiscovery ----
public async Task DiscoverAsync(IAddressSpaceBuilder builder, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(builder);
var session = RequireSession();
var root = !string.IsNullOrEmpty(_options.BrowseRoot)
? NodeId.Parse(session.MessageContext, _options.BrowseRoot)
: ObjectIds.ObjectsFolder;
var rootFolder = builder.Folder("Remote", "Remote");
var visited = new HashSet<NodeId>();
var discovered = 0;
await _gate.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
await BrowseRecursiveAsync(session, root, rootFolder, visited,
depth: 0, discovered: () => discovered, increment: () => discovered++,
ct: cancellationToken).ConfigureAwait(false);
}
finally { _gate.Release(); }
}
private async Task BrowseRecursiveAsync(
ISession session, NodeId node, IAddressSpaceBuilder folder, HashSet<NodeId> visited,
int depth, Func<int> discovered, Action increment, CancellationToken ct)
{
if (depth >= _options.MaxBrowseDepth) return;
if (discovered() >= _options.MaxDiscoveredNodes) return;
if (!visited.Add(node)) return;
var browseDescriptions = new BrowseDescriptionCollection
{
new()
{
NodeId = node,
BrowseDirection = BrowseDirection.Forward,
ReferenceTypeId = ReferenceTypeIds.HierarchicalReferences,
IncludeSubtypes = true,
NodeClassMask = (uint)(NodeClass.Object | NodeClass.Variable),
ResultMask = (uint)(BrowseResultMask.BrowseName | BrowseResultMask.DisplayName
| BrowseResultMask.NodeClass | BrowseResultMask.TypeDefinition),
}
};
BrowseResponse resp;
try
{
resp = await session.BrowseAsync(
requestHeader: null,
view: null,
requestedMaxReferencesPerNode: 0,
nodesToBrowse: browseDescriptions,
ct: ct).ConfigureAwait(false);
}
catch
{
// Transient browse failure on a sub-tree — don't kill the whole discovery, just
// skip this branch. The driver's health surface will reflect the cascade via the
// probe loop (PR 69).
return;
}
if (resp.Results.Count == 0) return;
var refs = resp.Results[0].References;
foreach (var rf in refs)
{
if (discovered() >= _options.MaxDiscoveredNodes) break;
var childId = ExpandedNodeId.ToNodeId(rf.NodeId, session.NamespaceUris);
if (NodeId.IsNull(childId)) continue;
var browseName = rf.BrowseName?.Name ?? childId.ToString();
var displayName = rf.DisplayName?.Text ?? browseName;
if (rf.NodeClass == NodeClass.Object)
{
var subFolder = folder.Folder(browseName, displayName);
increment();
await BrowseRecursiveAsync(session, childId, subFolder, visited,
depth + 1, discovered, increment, ct).ConfigureAwait(false);
}
else if (rf.NodeClass == NodeClass.Variable)
{
// Serialize the NodeId so the IReadable/IWritable surface receives a
// round-trippable string. Deferring the DataType + AccessLevel fetch to a
// follow-up PR — initial browse uses a conservative ViewOnly + Int32 default.
var nodeIdString = childId.ToString() ?? string.Empty;
folder.Variable(browseName, displayName, new DriverAttributeInfo(
FullName: nodeIdString,
DriverDataType: DriverDataType.Int32,
IsArray: false,
ArrayDim: null,
SecurityClass: SecurityClassification.ViewOnly,
IsHistorized: false,
IsAlarm: false));
increment();
}
}
}
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();
public async ValueTask DisposeAsync()

View File

@@ -62,6 +62,30 @@ public sealed class OpcUaClientDriverOptions
/// <summary>Connect + per-operation timeout.</summary>
public TimeSpan Timeout { get; init; } = TimeSpan.FromSeconds(10);
/// <summary>
/// Root NodeId to mirror. Default <c>null</c> = <c>ObjectsFolder</c> (i=85). Set to
/// a scoped root to restrict the address space the driver exposes locally — useful
/// when the remote server has tens of thousands of nodes and only a subset is
/// needed downstream.
/// </summary>
public string? BrowseRoot { get; init; }
/// <summary>
/// Cap on total nodes discovered during <c>DiscoverAsync</c>. Default 10_000 —
/// bounds memory on runaway remote servers without being so low that normal
/// deployments hit it. When the cap is reached discovery stops and a warning is
/// written to the driver health surface; the partially-discovered tree is still
/// projected into the local address space.
/// </summary>
public int MaxDiscoveredNodes { get; init; } = 10_000;
/// <summary>
/// Max hierarchical depth of the browse. Default 10 — deep enough for realistic
/// OPC UA information models, shallow enough that cyclic graphs can't spin the
/// browse forever.
/// </summary>
public int MaxBrowseDepth { get; init; } = 10;
}
/// <summary>OPC UA message security mode.</summary>

View File

@@ -0,0 +1,55 @@
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests;
/// <summary>
/// Scaffold tests for <see cref="OpcUaClientDriver"/>'s <see cref="ITagDiscovery"/>
/// surface that don't require a live remote server. Live-browse coverage lands in a
/// follow-up PR once the in-process OPC UA server fixture is scaffolded.
/// </summary>
[Trait("Category", "Unit")]
public sealed class OpcUaClientDiscoveryTests
{
[Fact]
public async Task DiscoverAsync_without_initialize_throws_InvalidOperationException()
{
using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-disco");
var builder = new NullAddressSpaceBuilder();
await Should.ThrowAsync<InvalidOperationException>(async () =>
await drv.DiscoverAsync(builder, TestContext.Current.CancellationToken));
}
[Fact]
public void DiscoverAsync_rejects_null_builder()
{
using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-disco");
Should.ThrowAsync<ArgumentNullException>(async () =>
await drv.DiscoverAsync(null!, TestContext.Current.CancellationToken));
}
[Fact]
public void Discovery_caps_are_sensible_defaults()
{
var opts = new OpcUaClientDriverOptions();
opts.MaxDiscoveredNodes.ShouldBe(10_000, "bounds memory on runaway servers without clipping normal models");
opts.MaxBrowseDepth.ShouldBe(10, "deep enough for realistic info models; shallow enough for cycle safety");
opts.BrowseRoot.ShouldBeNull("null = default to ObjectsFolder i=85");
}
private sealed class NullAddressSpaceBuilder : IAddressSpaceBuilder
{
public IAddressSpaceBuilder Folder(string browseName, string displayName) => this;
public IVariableHandle Variable(string browseName, string displayName, DriverAttributeInfo attributeInfo)
=> new StubHandle();
public void AddProperty(string browseName, DriverDataType dataType, object? value) { }
public void AttachAlarmCondition(IVariableHandle sourceVariable, string alarmName, DriverAttributeInfo alarmInfo) { }
private sealed class StubHandle : IVariableHandle
{
public string FullReference => "stub";
public IAlarmConditionSink MarkAsAlarmCondition(AlarmConditionInfo info) => throw new NotSupportedException();
}
}
}

View File

@@ -0,0 +1,32 @@
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Driver.OpcUaClient.Tests;
/// <summary>
/// Unit tests for the IReadable/IWritable surface that don't need a live remote OPC UA
/// server. Wire-level round-trips against a local in-process server fixture land in a
/// follow-up PR once we have one scaffolded.
/// </summary>
[Trait("Category", "Unit")]
public sealed class OpcUaClientReadWriteTests
{
[Fact]
public async Task ReadAsync_without_initialize_throws_InvalidOperationException()
{
using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-uninit");
await Should.ThrowAsync<InvalidOperationException>(async () =>
await drv.ReadAsync(["ns=2;s=Demo"], TestContext.Current.CancellationToken));
}
[Fact]
public async Task WriteAsync_without_initialize_throws_InvalidOperationException()
{
using var drv = new OpcUaClientDriver(new OpcUaClientDriverOptions(), "opcua-uninit");
await Should.ThrowAsync<InvalidOperationException>(async () =>
await drv.WriteAsync(
[new WriteRequest("ns=2;s=Demo", 42)],
TestContext.Current.CancellationToken));
}
}