diff --git a/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/DriverNodeManager.cs b/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/DriverNodeManager.cs index 4857adb..06ab659 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/DriverNodeManager.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/DriverNodeManager.cs @@ -3,6 +3,7 @@ using Microsoft.Extensions.Logging; using Opc.Ua; using Opc.Ua.Server; using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.Resilience; using ZB.MOM.WW.OtOpcUa.Server.Security; using DriverWriteRequest = ZB.MOM.WW.OtOpcUa.Core.Abstractions.WriteRequest; // Core.Abstractions defines a type-named HistoryReadResult (driver-side samples + continuation @@ -33,8 +34,14 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder private readonly IDriver _driver; private readonly IReadable? _readable; private readonly IWritable? _writable; + private readonly CapabilityInvoker _invoker; private readonly ILogger _logger; + // Per-variable idempotency flag populated during Variable() registration from + // DriverAttributeInfo.WriteIdempotent. Drives ExecuteWriteAsync's retry gating in + // OnWriteValue; absent entries default to false (decisions #44, #45, #143). + private readonly Dictionary _writeIdempotentByFullRef = new(StringComparer.OrdinalIgnoreCase); + /// The driver whose address space this node manager exposes. public IDriver Driver => _driver; @@ -53,12 +60,13 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder private FolderState _currentFolder = null!; public DriverNodeManager(IServerInternal server, ApplicationConfiguration configuration, - IDriver driver, ILogger logger) + IDriver driver, CapabilityInvoker invoker, ILogger logger) : base(server, configuration, namespaceUris: $"urn:OtOpcUa:{driver.DriverInstanceId}") { _driver = driver; _readable = driver as IReadable; _writable = driver as IWritable; + _invoker = invoker; _logger = logger; } @@ -148,6 +156,7 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder AddPredefinedNode(SystemContext, v); _variablesByFullRef[attributeInfo.FullName] = v; _securityByFullRef[attributeInfo.FullName] = attributeInfo.SecurityClass; + _writeIdempotentByFullRef[attributeInfo.FullName] = attributeInfo.WriteIdempotent; v.OnReadValue = OnReadValue; v.OnWriteValue = OnWriteValue; @@ -188,7 +197,11 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder try { var fullRef = node.NodeId.Identifier as string ?? ""; - var result = _readable.ReadAsync([fullRef], CancellationToken.None).GetAwaiter().GetResult(); + var result = _invoker.ExecuteAsync( + DriverCapability.Read, + _driver.DriverInstanceId, + async ct => (IReadOnlyList)await _readable.ReadAsync([fullRef], ct).ConfigureAwait(false), + CancellationToken.None).AsTask().GetAwaiter().GetResult(); if (result.Count == 0) { statusCode = StatusCodes.BadNoData; @@ -381,9 +394,15 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder try { - var results = _writable.WriteAsync( - [new DriverWriteRequest(fullRef!, value)], - CancellationToken.None).GetAwaiter().GetResult(); + var isIdempotent = _writeIdempotentByFullRef.GetValueOrDefault(fullRef!, false); + var capturedValue = value; + var results = _invoker.ExecuteWriteAsync( + _driver.DriverInstanceId, + isIdempotent, + async ct => (IReadOnlyList)await _writable.WriteAsync( + [new DriverWriteRequest(fullRef!, capturedValue)], + ct).ConfigureAwait(false), + CancellationToken.None).AsTask().GetAwaiter().GetResult(); if (results.Count > 0 && results[0].StatusCode != 0) { statusCode = results[0].StatusCode; diff --git a/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OpcUaApplicationHost.cs b/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OpcUaApplicationHost.cs index 7616012..e64e672 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OpcUaApplicationHost.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OpcUaApplicationHost.cs @@ -3,6 +3,7 @@ using Opc.Ua; using Opc.Ua.Configuration; using ZB.MOM.WW.OtOpcUa.Core.Hosting; using ZB.MOM.WW.OtOpcUa.Core.OpcUa; +using ZB.MOM.WW.OtOpcUa.Core.Resilience; using ZB.MOM.WW.OtOpcUa.Server.Security; namespace ZB.MOM.WW.OtOpcUa.Server.OpcUa; @@ -20,6 +21,7 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable private readonly OpcUaServerOptions _options; private readonly DriverHost _driverHost; private readonly IUserAuthenticator _authenticator; + private readonly DriverResiliencePipelineBuilder _pipelineBuilder; private readonly ILoggerFactory _loggerFactory; private readonly ILogger _logger; private ApplicationInstance? _application; @@ -27,11 +29,13 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable private bool _disposed; public OpcUaApplicationHost(OpcUaServerOptions options, DriverHost driverHost, - IUserAuthenticator authenticator, ILoggerFactory loggerFactory, ILogger logger) + IUserAuthenticator authenticator, ILoggerFactory loggerFactory, ILogger logger, + DriverResiliencePipelineBuilder? pipelineBuilder = null) { _options = options; _driverHost = driverHost; _authenticator = authenticator; + _pipelineBuilder = pipelineBuilder ?? new DriverResiliencePipelineBuilder(); _loggerFactory = loggerFactory; _logger = logger; } @@ -58,7 +62,7 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable throw new InvalidOperationException( $"OPC UA application certificate could not be validated or created in {_options.PkiStoreRoot}"); - _server = new OtOpcUaServer(_driverHost, _authenticator, _loggerFactory); + _server = new OtOpcUaServer(_driverHost, _authenticator, _pipelineBuilder, _loggerFactory); await _application.Start(_server).ConfigureAwait(false); _logger.LogInformation("OPC UA server started — endpoint={Endpoint} driverCount={Count}", diff --git a/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OtOpcUaServer.cs b/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OtOpcUaServer.cs index 8ccb660..1fd231a 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OtOpcUaServer.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OtOpcUaServer.cs @@ -5,6 +5,7 @@ using Opc.Ua.Server; using ZB.MOM.WW.OtOpcUa.Core.Abstractions; using ZB.MOM.WW.OtOpcUa.Core.Hosting; using ZB.MOM.WW.OtOpcUa.Core.OpcUa; +using ZB.MOM.WW.OtOpcUa.Core.Resilience; using ZB.MOM.WW.OtOpcUa.Server.Security; namespace ZB.MOM.WW.OtOpcUa.Server.OpcUa; @@ -19,13 +20,19 @@ public sealed class OtOpcUaServer : StandardServer { private readonly DriverHost _driverHost; private readonly IUserAuthenticator _authenticator; + private readonly DriverResiliencePipelineBuilder _pipelineBuilder; private readonly ILoggerFactory _loggerFactory; private readonly List _driverNodeManagers = new(); - public OtOpcUaServer(DriverHost driverHost, IUserAuthenticator authenticator, ILoggerFactory loggerFactory) + public OtOpcUaServer( + DriverHost driverHost, + IUserAuthenticator authenticator, + DriverResiliencePipelineBuilder pipelineBuilder, + ILoggerFactory loggerFactory) { _driverHost = driverHost; _authenticator = authenticator; + _pipelineBuilder = pipelineBuilder; _loggerFactory = loggerFactory; } @@ -46,7 +53,12 @@ public sealed class OtOpcUaServer : StandardServer if (driver is null) continue; var logger = _loggerFactory.CreateLogger(); - var manager = new DriverNodeManager(server, configuration, driver, logger); + // Per-driver resilience options: default Tier A pending Stream B.1 which wires + // per-type tiers into DriverTypeRegistry. Read ResilienceConfig JSON from the + // DriverInstance row in a follow-up PR; for now every driver gets Tier A defaults. + var options = new DriverResilienceOptions { Tier = DriverTier.A }; + var invoker = new CapabilityInvoker(_pipelineBuilder, driver.DriverInstanceId, () => options); + var manager = new DriverNodeManager(server, configuration, driver, invoker, logger); _driverNodeManagers.Add(manager); } diff --git a/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/FlakeyDriverIntegrationTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/FlakeyDriverIntegrationTests.cs index d58e393..d33807e 100644 --- a/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/FlakeyDriverIntegrationTests.cs +++ b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/FlakeyDriverIntegrationTests.cs @@ -23,7 +23,10 @@ public sealed class FlakeyDriverIntegrationTests Tier = DriverTier.A, CapabilityPolicies = new Dictionary { - [DriverCapability.Read] = new(TimeoutSeconds: 2, RetryCount: 10, BreakerFailureThreshold: 50), + // TimeoutSeconds=30 gives slack for 5 exponential-backoff retries under + // parallel-test-execution CPU pressure; 10 retries at the default Delay=100ms + // exponential can otherwise exceed a 2-second budget intermittently. + [DriverCapability.Read] = new(TimeoutSeconds: 30, RetryCount: 10, BreakerFailureThreshold: 50), }, }; var invoker = new CapabilityInvoker(new DriverResiliencePipelineBuilder(), "drv-test", () => options);