From 29bcaf277b9593c1e3b120e9e0a9a61c35b0af59 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 19 Apr 2026 07:28:28 -0400 Subject: [PATCH] =?UTF-8?q?Phase=206.1=20Stream=20A.3=20complete=20?= =?UTF-8?q?=E2=80=94=20wire=20CapabilityInvoker=20into=20DriverNodeManager?= =?UTF-8?q?=20dispatch=20end-to-end?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Every OnReadValue / OnWriteValue now routes through the process-singleton DriverResiliencePipelineBuilder's CapabilityInvoker. Read / Write dispatch paths gain timeout + per-capability retry + per-(driver, host) circuit breaker + bulkhead without touching the individual driver implementations. Wiring: - OpcUaApplicationHost: new optional DriverResiliencePipelineBuilder ctor parameter (default null → instance-owned builder). Keeps the 3 test call sites that construct OpcUaApplicationHost directly unchanged. - OtOpcUaServer: requires the builder in its ctor; constructs one CapabilityInvoker per driver at CreateMasterNodeManager time with default Tier A DriverResilienceOptions. TODO: Stream B.1 will wire real per-driver-type tiers via DriverTypeRegistry; Phase 6.1 follow-up will read the DriverInstance.ResilienceConfig JSON column for per-instance overrides. - DriverNodeManager: takes a CapabilityInvoker in its ctor. OnReadValue wraps the driver's ReadAsync through ExecuteAsync(DriverCapability.Read, hostName, ...); OnWriteValue wraps WriteAsync through ExecuteWriteAsync(hostName, isIdempotent, ...) where isIdempotent comes from the new _writeIdempotentByFullRef map populated at Variable() registration from DriverAttributeInfo.WriteIdempotent. HostName defaults to driver.DriverInstanceId for now — a single-host pipeline per driver. Multi-host drivers (Modbus with N PLCs) will expose their own per-call host resolution in a follow-up so failing PLCs can trip per-PLC breakers without poisoning siblings (decision #144). 
Test fixup: - FlakeyDriverIntegrationTests.Read_SurfacesSuccess_AfterTransientFailures: bumped TimeoutSeconds=2 → 30. 10 retries at exponential backoff with jitter can exceed 2s under parallel-test-run CPU pressure; the test asserts retry behavior, not timeout budget, so the longer slack keeps it deterministic. Full solution dotnet test: 948 passing. Pre-existing Client.CLI Subscribe flake unchanged. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../OpcUa/DriverNodeManager.cs | 29 +++++++++++++++---- .../OpcUa/OpcUaApplicationHost.cs | 8 +++-- .../OpcUa/OtOpcUaServer.cs | 16 ++++++++-- .../FlakeyDriverIntegrationTests.cs | 5 +++- 4 files changed, 48 insertions(+), 10 deletions(-) diff --git a/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/DriverNodeManager.cs b/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/DriverNodeManager.cs index 4857adb..06ab659 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/DriverNodeManager.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/DriverNodeManager.cs @@ -3,6 +3,7 @@ using Microsoft.Extensions.Logging; using Opc.Ua; using Opc.Ua.Server; using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.Resilience; using ZB.MOM.WW.OtOpcUa.Server.Security; using DriverWriteRequest = ZB.MOM.WW.OtOpcUa.Core.Abstractions.WriteRequest; // Core.Abstractions defines a type-named HistoryReadResult (driver-side samples + continuation @@ -33,8 +34,14 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder private readonly IDriver _driver; private readonly IReadable? _readable; private readonly IWritable? _writable; + private readonly CapabilityInvoker _invoker; private readonly ILogger _logger; + // Per-variable idempotency flag populated during Variable() registration from + // DriverAttributeInfo.WriteIdempotent. Drives ExecuteWriteAsync's retry gating in + // OnWriteValue; absent entries default to false (decisions #44, #45, #143). 
+ private readonly Dictionary _writeIdempotentByFullRef = new(StringComparer.OrdinalIgnoreCase); + /// The driver whose address space this node manager exposes. public IDriver Driver => _driver; @@ -53,12 +60,13 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder private FolderState _currentFolder = null!; public DriverNodeManager(IServerInternal server, ApplicationConfiguration configuration, - IDriver driver, ILogger logger) + IDriver driver, CapabilityInvoker invoker, ILogger logger) : base(server, configuration, namespaceUris: $"urn:OtOpcUa:{driver.DriverInstanceId}") { _driver = driver; _readable = driver as IReadable; _writable = driver as IWritable; + _invoker = invoker; _logger = logger; } @@ -148,6 +156,7 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder AddPredefinedNode(SystemContext, v); _variablesByFullRef[attributeInfo.FullName] = v; _securityByFullRef[attributeInfo.FullName] = attributeInfo.SecurityClass; + _writeIdempotentByFullRef[attributeInfo.FullName] = attributeInfo.WriteIdempotent; v.OnReadValue = OnReadValue; v.OnWriteValue = OnWriteValue; @@ -188,7 +197,11 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder try { var fullRef = node.NodeId.Identifier as string ?? 
""; - var result = _readable.ReadAsync([fullRef], CancellationToken.None).GetAwaiter().GetResult(); + var result = _invoker.ExecuteAsync( + DriverCapability.Read, + _driver.DriverInstanceId, + async ct => (IReadOnlyList)await _readable.ReadAsync([fullRef], ct).ConfigureAwait(false), + CancellationToken.None).AsTask().GetAwaiter().GetResult(); if (result.Count == 0) { statusCode = StatusCodes.BadNoData; @@ -381,9 +394,15 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder try { - var results = _writable.WriteAsync( - [new DriverWriteRequest(fullRef!, value)], - CancellationToken.None).GetAwaiter().GetResult(); + var isIdempotent = _writeIdempotentByFullRef.GetValueOrDefault(fullRef!, false); + var capturedValue = value; + var results = _invoker.ExecuteWriteAsync( + _driver.DriverInstanceId, + isIdempotent, + async ct => (IReadOnlyList)await _writable.WriteAsync( + [new DriverWriteRequest(fullRef!, capturedValue)], + ct).ConfigureAwait(false), + CancellationToken.None).AsTask().GetAwaiter().GetResult(); if (results.Count > 0 && results[0].StatusCode != 0) { statusCode = results[0].StatusCode; diff --git a/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OpcUaApplicationHost.cs b/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OpcUaApplicationHost.cs index 7616012..e64e672 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OpcUaApplicationHost.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OpcUaApplicationHost.cs @@ -3,6 +3,7 @@ using Opc.Ua; using Opc.Ua.Configuration; using ZB.MOM.WW.OtOpcUa.Core.Hosting; using ZB.MOM.WW.OtOpcUa.Core.OpcUa; +using ZB.MOM.WW.OtOpcUa.Core.Resilience; using ZB.MOM.WW.OtOpcUa.Server.Security; namespace ZB.MOM.WW.OtOpcUa.Server.OpcUa; @@ -20,6 +21,7 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable private readonly OpcUaServerOptions _options; private readonly DriverHost _driverHost; private readonly IUserAuthenticator _authenticator; + private readonly DriverResiliencePipelineBuilder _pipelineBuilder; private readonly 
ILoggerFactory _loggerFactory; private readonly ILogger _logger; private ApplicationInstance? _application; @@ -27,11 +29,13 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable private bool _disposed; public OpcUaApplicationHost(OpcUaServerOptions options, DriverHost driverHost, - IUserAuthenticator authenticator, ILoggerFactory loggerFactory, ILogger logger) + IUserAuthenticator authenticator, ILoggerFactory loggerFactory, ILogger logger, + DriverResiliencePipelineBuilder? pipelineBuilder = null) { _options = options; _driverHost = driverHost; _authenticator = authenticator; + _pipelineBuilder = pipelineBuilder ?? new DriverResiliencePipelineBuilder(); _loggerFactory = loggerFactory; _logger = logger; } @@ -58,7 +62,7 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable throw new InvalidOperationException( $"OPC UA application certificate could not be validated or created in {_options.PkiStoreRoot}"); - _server = new OtOpcUaServer(_driverHost, _authenticator, _loggerFactory); + _server = new OtOpcUaServer(_driverHost, _authenticator, _pipelineBuilder, _loggerFactory); await _application.Start(_server).ConfigureAwait(false); _logger.LogInformation("OPC UA server started — endpoint={Endpoint} driverCount={Count}", diff --git a/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OtOpcUaServer.cs b/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OtOpcUaServer.cs index 8ccb660..1fd231a 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OtOpcUaServer.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OtOpcUaServer.cs @@ -5,6 +5,7 @@ using Opc.Ua.Server; using ZB.MOM.WW.OtOpcUa.Core.Abstractions; using ZB.MOM.WW.OtOpcUa.Core.Hosting; using ZB.MOM.WW.OtOpcUa.Core.OpcUa; +using ZB.MOM.WW.OtOpcUa.Core.Resilience; using ZB.MOM.WW.OtOpcUa.Server.Security; namespace ZB.MOM.WW.OtOpcUa.Server.OpcUa; @@ -19,13 +20,19 @@ public sealed class OtOpcUaServer : StandardServer { private readonly DriverHost _driverHost; private readonly IUserAuthenticator _authenticator; + private readonly 
DriverResiliencePipelineBuilder _pipelineBuilder; private readonly ILoggerFactory _loggerFactory; private readonly List _driverNodeManagers = new(); - public OtOpcUaServer(DriverHost driverHost, IUserAuthenticator authenticator, ILoggerFactory loggerFactory) + public OtOpcUaServer( + DriverHost driverHost, + IUserAuthenticator authenticator, + DriverResiliencePipelineBuilder pipelineBuilder, + ILoggerFactory loggerFactory) { _driverHost = driverHost; _authenticator = authenticator; + _pipelineBuilder = pipelineBuilder; _loggerFactory = loggerFactory; } @@ -46,7 +53,12 @@ public sealed class OtOpcUaServer : StandardServer if (driver is null) continue; var logger = _loggerFactory.CreateLogger(); - var manager = new DriverNodeManager(server, configuration, driver, logger); + // Per-driver resilience options: default Tier A pending Stream B.1 which wires + // per-type tiers into DriverTypeRegistry. Read ResilienceConfig JSON from the + // DriverInstance row in a follow-up PR; for now every driver gets Tier A defaults. 
+ var options = new DriverResilienceOptions { Tier = DriverTier.A }; + var invoker = new CapabilityInvoker(_pipelineBuilder, driver.DriverInstanceId, () => options); + var manager = new DriverNodeManager(server, configuration, driver, invoker, logger); _driverNodeManagers.Add(manager); } diff --git a/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/FlakeyDriverIntegrationTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/FlakeyDriverIntegrationTests.cs index d58e393..d33807e 100644 --- a/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/FlakeyDriverIntegrationTests.cs +++ b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/FlakeyDriverIntegrationTests.cs @@ -23,7 +23,10 @@ public sealed class FlakeyDriverIntegrationTests Tier = DriverTier.A, CapabilityPolicies = new Dictionary { - [DriverCapability.Read] = new(TimeoutSeconds: 2, RetryCount: 10, BreakerFailureThreshold: 50), + // TimeoutSeconds=30 gives slack for 5 exponential-backoff retries under + // parallel-test-execution CPU pressure; 10 retries at the default Delay=100ms + // exponential can otherwise exceed a 2-second budget intermittently. + [DriverCapability.Read] = new(TimeoutSeconds: 30, RetryCount: 10, BreakerFailureThreshold: 50), }, }; var invoker = new CapabilityInvoker(new DriverResiliencePipelineBuilder(), "drv-test", () => options);