Phase 6.1 Stream A.3 complete — wire CapabilityInvoker into DriverNodeManager dispatch end-to-end

Every OnReadValue / OnWriteValue now routes through the process-singleton
DriverResiliencePipelineBuilder's CapabilityInvoker. Read / Write dispatch
paths gain timeout + per-capability retry + per-(driver, host) circuit breaker
+ bulkhead without touching the individual driver implementations.

Wiring:
- OpcUaApplicationHost: new optional DriverResiliencePipelineBuilder ctor
  parameter (default null → instance-owned builder). Keeps the 3 test call
  sites that construct OpcUaApplicationHost directly unchanged.
- OtOpcUaServer: requires the builder in its ctor; constructs one
  CapabilityInvoker per driver at CreateMasterNodeManager time with default
  Tier A DriverResilienceOptions. TODO: Stream B.1 will wire real per-driver-
  type tiers via DriverTypeRegistry; Phase 6.1 follow-up will read the
  DriverInstance.ResilienceConfig JSON column for per-instance overrides.
- DriverNodeManager: takes a CapabilityInvoker in its ctor. OnReadValue wraps
  the driver's ReadAsync through ExecuteAsync(DriverCapability.Read, hostName,
  ...); OnWriteValue wraps WriteAsync through ExecuteWriteAsync(hostName,
  isIdempotent, ...) where isIdempotent comes from the new
  _writeIdempotentByFullRef map populated at Variable() registration from
  DriverAttributeInfo.WriteIdempotent.

HostName defaults to driver.DriverInstanceId for now — a single-host pipeline
per driver. Multi-host drivers (Modbus with N PLCs) will expose their own per-
call host resolution in a follow-up so failing PLCs can trip per-PLC breakers
without poisoning siblings (decision #144).

Test fixup:
- FlakeyDriverIntegrationTests.Read_SurfacesSuccess_AfterTransientFailures:
  bumped TimeoutSeconds=2 → 30. 10 retries at exponential backoff with jitter
  can exceed 2s under parallel-test-run CPU pressure; the test asserts retry
  behavior, not timeout budget, so the longer slack keeps it deterministic.

Full solution dotnet test: 948 passing. Pre-existing Client.CLI Subscribe
flake unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-04-19 07:28:28 -04:00
parent b6d2803ff6
commit 29bcaf277b
4 changed files with 48 additions and 10 deletions

View File

@@ -3,6 +3,7 @@ using Microsoft.Extensions.Logging;
using Opc.Ua; using Opc.Ua;
using Opc.Ua.Server; using Opc.Ua.Server;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions; using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
using ZB.MOM.WW.OtOpcUa.Server.Security; using ZB.MOM.WW.OtOpcUa.Server.Security;
using DriverWriteRequest = ZB.MOM.WW.OtOpcUa.Core.Abstractions.WriteRequest; using DriverWriteRequest = ZB.MOM.WW.OtOpcUa.Core.Abstractions.WriteRequest;
// Core.Abstractions defines a type-named HistoryReadResult (driver-side samples + continuation // Core.Abstractions defines a type-named HistoryReadResult (driver-side samples + continuation
@@ -33,8 +34,14 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
private readonly IDriver _driver; private readonly IDriver _driver;
private readonly IReadable? _readable; private readonly IReadable? _readable;
private readonly IWritable? _writable; private readonly IWritable? _writable;
private readonly CapabilityInvoker _invoker;
private readonly ILogger<DriverNodeManager> _logger; private readonly ILogger<DriverNodeManager> _logger;
// Per-variable idempotency flag populated during Variable() registration from
// DriverAttributeInfo.WriteIdempotent. Drives ExecuteWriteAsync's retry gating in
// OnWriteValue; absent entries default to false (decisions #44, #45, #143).
private readonly Dictionary<string, bool> _writeIdempotentByFullRef = new(StringComparer.OrdinalIgnoreCase);
/// <summary>The driver whose address space this node manager exposes.</summary> /// <summary>The driver whose address space this node manager exposes.</summary>
public IDriver Driver => _driver; public IDriver Driver => _driver;
@@ -53,12 +60,13 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
private FolderState _currentFolder = null!; private FolderState _currentFolder = null!;
public DriverNodeManager(IServerInternal server, ApplicationConfiguration configuration, public DriverNodeManager(IServerInternal server, ApplicationConfiguration configuration,
IDriver driver, ILogger<DriverNodeManager> logger) IDriver driver, CapabilityInvoker invoker, ILogger<DriverNodeManager> logger)
: base(server, configuration, namespaceUris: $"urn:OtOpcUa:{driver.DriverInstanceId}") : base(server, configuration, namespaceUris: $"urn:OtOpcUa:{driver.DriverInstanceId}")
{ {
_driver = driver; _driver = driver;
_readable = driver as IReadable; _readable = driver as IReadable;
_writable = driver as IWritable; _writable = driver as IWritable;
_invoker = invoker;
_logger = logger; _logger = logger;
} }
@@ -148,6 +156,7 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
AddPredefinedNode(SystemContext, v); AddPredefinedNode(SystemContext, v);
_variablesByFullRef[attributeInfo.FullName] = v; _variablesByFullRef[attributeInfo.FullName] = v;
_securityByFullRef[attributeInfo.FullName] = attributeInfo.SecurityClass; _securityByFullRef[attributeInfo.FullName] = attributeInfo.SecurityClass;
_writeIdempotentByFullRef[attributeInfo.FullName] = attributeInfo.WriteIdempotent;
v.OnReadValue = OnReadValue; v.OnReadValue = OnReadValue;
v.OnWriteValue = OnWriteValue; v.OnWriteValue = OnWriteValue;
@@ -188,7 +197,11 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
try try
{ {
var fullRef = node.NodeId.Identifier as string ?? ""; var fullRef = node.NodeId.Identifier as string ?? "";
var result = _readable.ReadAsync([fullRef], CancellationToken.None).GetAwaiter().GetResult(); var result = _invoker.ExecuteAsync(
DriverCapability.Read,
_driver.DriverInstanceId,
async ct => (IReadOnlyList<DataValueSnapshot>)await _readable.ReadAsync([fullRef], ct).ConfigureAwait(false),
CancellationToken.None).AsTask().GetAwaiter().GetResult();
if (result.Count == 0) if (result.Count == 0)
{ {
statusCode = StatusCodes.BadNoData; statusCode = StatusCodes.BadNoData;
@@ -381,9 +394,15 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
try try
{ {
var results = _writable.WriteAsync( var isIdempotent = _writeIdempotentByFullRef.GetValueOrDefault(fullRef!, false);
[new DriverWriteRequest(fullRef!, value)], var capturedValue = value;
CancellationToken.None).GetAwaiter().GetResult(); var results = _invoker.ExecuteWriteAsync(
_driver.DriverInstanceId,
isIdempotent,
async ct => (IReadOnlyList<WriteResult>)await _writable.WriteAsync(
[new DriverWriteRequest(fullRef!, capturedValue)],
ct).ConfigureAwait(false),
CancellationToken.None).AsTask().GetAwaiter().GetResult();
if (results.Count > 0 && results[0].StatusCode != 0) if (results.Count > 0 && results[0].StatusCode != 0)
{ {
statusCode = results[0].StatusCode; statusCode = results[0].StatusCode;

View File

@@ -3,6 +3,7 @@ using Opc.Ua;
using Opc.Ua.Configuration; using Opc.Ua.Configuration;
using ZB.MOM.WW.OtOpcUa.Core.Hosting; using ZB.MOM.WW.OtOpcUa.Core.Hosting;
using ZB.MOM.WW.OtOpcUa.Core.OpcUa; using ZB.MOM.WW.OtOpcUa.Core.OpcUa;
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
using ZB.MOM.WW.OtOpcUa.Server.Security; using ZB.MOM.WW.OtOpcUa.Server.Security;
namespace ZB.MOM.WW.OtOpcUa.Server.OpcUa; namespace ZB.MOM.WW.OtOpcUa.Server.OpcUa;
@@ -20,6 +21,7 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable
private readonly OpcUaServerOptions _options; private readonly OpcUaServerOptions _options;
private readonly DriverHost _driverHost; private readonly DriverHost _driverHost;
private readonly IUserAuthenticator _authenticator; private readonly IUserAuthenticator _authenticator;
private readonly DriverResiliencePipelineBuilder _pipelineBuilder;
private readonly ILoggerFactory _loggerFactory; private readonly ILoggerFactory _loggerFactory;
private readonly ILogger<OpcUaApplicationHost> _logger; private readonly ILogger<OpcUaApplicationHost> _logger;
private ApplicationInstance? _application; private ApplicationInstance? _application;
@@ -27,11 +29,13 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable
private bool _disposed; private bool _disposed;
public OpcUaApplicationHost(OpcUaServerOptions options, DriverHost driverHost, public OpcUaApplicationHost(OpcUaServerOptions options, DriverHost driverHost,
IUserAuthenticator authenticator, ILoggerFactory loggerFactory, ILogger<OpcUaApplicationHost> logger) IUserAuthenticator authenticator, ILoggerFactory loggerFactory, ILogger<OpcUaApplicationHost> logger,
DriverResiliencePipelineBuilder? pipelineBuilder = null)
{ {
_options = options; _options = options;
_driverHost = driverHost; _driverHost = driverHost;
_authenticator = authenticator; _authenticator = authenticator;
_pipelineBuilder = pipelineBuilder ?? new DriverResiliencePipelineBuilder();
_loggerFactory = loggerFactory; _loggerFactory = loggerFactory;
_logger = logger; _logger = logger;
} }
@@ -58,7 +62,7 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable
throw new InvalidOperationException( throw new InvalidOperationException(
$"OPC UA application certificate could not be validated or created in {_options.PkiStoreRoot}"); $"OPC UA application certificate could not be validated or created in {_options.PkiStoreRoot}");
_server = new OtOpcUaServer(_driverHost, _authenticator, _loggerFactory); _server = new OtOpcUaServer(_driverHost, _authenticator, _pipelineBuilder, _loggerFactory);
await _application.Start(_server).ConfigureAwait(false); await _application.Start(_server).ConfigureAwait(false);
_logger.LogInformation("OPC UA server started — endpoint={Endpoint} driverCount={Count}", _logger.LogInformation("OPC UA server started — endpoint={Endpoint} driverCount={Count}",

View File

@@ -5,6 +5,7 @@ using Opc.Ua.Server;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions; using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.Hosting; using ZB.MOM.WW.OtOpcUa.Core.Hosting;
using ZB.MOM.WW.OtOpcUa.Core.OpcUa; using ZB.MOM.WW.OtOpcUa.Core.OpcUa;
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
using ZB.MOM.WW.OtOpcUa.Server.Security; using ZB.MOM.WW.OtOpcUa.Server.Security;
namespace ZB.MOM.WW.OtOpcUa.Server.OpcUa; namespace ZB.MOM.WW.OtOpcUa.Server.OpcUa;
@@ -19,13 +20,19 @@ public sealed class OtOpcUaServer : StandardServer
{ {
private readonly DriverHost _driverHost; private readonly DriverHost _driverHost;
private readonly IUserAuthenticator _authenticator; private readonly IUserAuthenticator _authenticator;
private readonly DriverResiliencePipelineBuilder _pipelineBuilder;
private readonly ILoggerFactory _loggerFactory; private readonly ILoggerFactory _loggerFactory;
private readonly List<DriverNodeManager> _driverNodeManagers = new(); private readonly List<DriverNodeManager> _driverNodeManagers = new();
public OtOpcUaServer(DriverHost driverHost, IUserAuthenticator authenticator, ILoggerFactory loggerFactory) public OtOpcUaServer(
DriverHost driverHost,
IUserAuthenticator authenticator,
DriverResiliencePipelineBuilder pipelineBuilder,
ILoggerFactory loggerFactory)
{ {
_driverHost = driverHost; _driverHost = driverHost;
_authenticator = authenticator; _authenticator = authenticator;
_pipelineBuilder = pipelineBuilder;
_loggerFactory = loggerFactory; _loggerFactory = loggerFactory;
} }
@@ -46,7 +53,12 @@ public sealed class OtOpcUaServer : StandardServer
if (driver is null) continue; if (driver is null) continue;
var logger = _loggerFactory.CreateLogger<DriverNodeManager>(); var logger = _loggerFactory.CreateLogger<DriverNodeManager>();
var manager = new DriverNodeManager(server, configuration, driver, logger); // Per-driver resilience options: default Tier A pending Stream B.1 which wires
// per-type tiers into DriverTypeRegistry. Read ResilienceConfig JSON from the
// DriverInstance row in a follow-up PR; for now every driver gets Tier A defaults.
var options = new DriverResilienceOptions { Tier = DriverTier.A };
var invoker = new CapabilityInvoker(_pipelineBuilder, driver.DriverInstanceId, () => options);
var manager = new DriverNodeManager(server, configuration, driver, invoker, logger);
_driverNodeManagers.Add(manager); _driverNodeManagers.Add(manager);
} }

View File

@@ -23,7 +23,10 @@ public sealed class FlakeyDriverIntegrationTests
Tier = DriverTier.A, Tier = DriverTier.A,
CapabilityPolicies = new Dictionary<DriverCapability, CapabilityPolicy> CapabilityPolicies = new Dictionary<DriverCapability, CapabilityPolicy>
{ {
[DriverCapability.Read] = new(TimeoutSeconds: 2, RetryCount: 10, BreakerFailureThreshold: 50), // TimeoutSeconds=30 gives slack for 5 exponential-backoff retries under
// parallel-test-execution CPU pressure; 10 retries at the default Delay=100ms
// exponential can otherwise exceed a 2-second budget intermittently.
[DriverCapability.Read] = new(TimeoutSeconds: 30, RetryCount: 10, BreakerFailureThreshold: 50),
}, },
}; };
var invoker = new CapabilityInvoker(new DriverResiliencePipelineBuilder(), "drv-test", () => options); var invoker = new CapabilityInvoker(new DriverResiliencePipelineBuilder(), "drv-test", () => options);