From 90f7792c9239f25fbd04fbd626cdad322b5d0cc9 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 19 Apr 2026 04:09:26 -0400 Subject: [PATCH] =?UTF-8?q?Phase=206.1=20Stream=20A.3=20=E2=80=94=20Capabi?= =?UTF-8?q?lityInvoker=20wraps=20driver-capability=20calls=20through=20the?= =?UTF-8?q?=20shared=20pipeline?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit One invoker per (DriverInstance, IDriver) pair; calls ExecuteAsync(capability, host, callSite) and the invoker resolves the correct pipeline from the shared DriverResiliencePipelineBuilder. The options accessor is a Func so Admin-edit + pipeline-invalidate takes effect without restarting the invoker or the driver host. ExecuteWriteAsync(isIdempotent) is the explicit write-safety surface: - isIdempotent=false routes through a side pipeline with RetryCount=0 regardless of what the caller configured. The cache key carries a "::non-idempotent" suffix so it never collides with the retry-enabled write pipeline. - isIdempotent=true routes through the normal Write pipeline. If the user has configured Write retries (opt-in), the idempotent tag gets them; otherwise default-0 still wins. The server dispatch layer (next PR) reads WriteIdempotentAttribute on each tag definition once at driver-init time and feeds the boolean into ExecuteWriteAsync. Tests (6 new): - Read retries on transient failure; returns value from call site. - Write non-idempotent does NOT retry even when policy has 3 retries configured (the explicit decision-#44 guard at the dispatch surface). - Write idempotent retries when policy allows. - Write with default tier-A policy (RetryCount=0) never retries regardless of idempotency flag. - Different hosts get independent pipelines. Core.Tests now 44 passing (was 38). Invoker doc-refs completed (the XML comment on WriteIdempotentAttribute no longer references a non-existent type). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../Resilience/CapabilityInvoker.cs | 106 ++++++++++++ .../Resilience/CapabilityInvokerTests.cs | 151 ++++++++++++++++++ 2 files changed, 257 insertions(+) create mode 100644 src/ZB.MOM.WW.OtOpcUa.Core/Resilience/CapabilityInvoker.cs create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/CapabilityInvokerTests.cs diff --git a/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/CapabilityInvoker.cs b/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/CapabilityInvoker.cs new file mode 100644 index 0000000..e881dc7 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/CapabilityInvoker.cs @@ -0,0 +1,106 @@ +using Polly; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Core.Resilience; + +/// +/// Executes driver-capability calls through a shared Polly pipeline. One invoker per +/// (DriverInstance, IDriver) pair; the underlying +/// is process-singleton so all invokers share its cache. +/// +/// +/// Per docs/v2/plan.md decisions #143-144 and Phase 6.1 Stream A.3. The server's dispatch +/// layer routes every capability call (IReadable.ReadAsync, IWritable.WriteAsync, +/// ITagDiscovery.DiscoverAsync, ISubscribable.SubscribeAsync/UnsubscribeAsync, +/// IHostConnectivityProbe probe loop, IAlarmSource.SubscribeAlarmsAsync/AcknowledgeAsync, +/// and all four IHistoryProvider reads) through this invoker. +/// +public sealed class CapabilityInvoker +{ + private readonly DriverResiliencePipelineBuilder _builder; + private readonly Guid _driverInstanceId; + private readonly Func _optionsAccessor; + + /// + /// Construct an invoker for one driver instance. + /// + /// Shared, process-singleton pipeline builder. + /// The DriverInstance.Id column value. + /// + /// Snapshot accessor for the current resilience options. Invoked per call so Admin-edit + + /// pipeline-invalidate can take effect without restarting the invoker. + /// + public CapabilityInvoker( + DriverResiliencePipelineBuilder builder, + Guid driverInstanceId, + Func optionsAccessor) + { + ArgumentNullException.ThrowIfNull(builder); + ArgumentNullException.ThrowIfNull(optionsAccessor); + + _builder = builder; + _driverInstanceId = driverInstanceId; + _optionsAccessor = optionsAccessor; + } + + /// Execute a capability call returning a value, honoring the per-capability pipeline. + /// Return type of the underlying driver call. + public async ValueTask ExecuteAsync( + DriverCapability capability, + string hostName, + Func> callSite, + CancellationToken cancellationToken) + { + ArgumentNullException.ThrowIfNull(callSite); + + var pipeline = ResolvePipeline(capability, hostName); + return await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false); + } + + /// Execute a void-returning capability call, honoring the per-capability pipeline. + public async ValueTask ExecuteAsync( + DriverCapability capability, + string hostName, + Func callSite, + CancellationToken cancellationToken) + { + ArgumentNullException.ThrowIfNull(callSite); + + var pipeline = ResolvePipeline(capability, hostName); + await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false); + } + + /// + /// Execute a call honoring + /// semantics — if is false, retries are disabled regardless + /// of the tag-level configuration (the pipeline for a non-idempotent write never retries per + /// decisions #44-45). If true, the call runs through the capability's pipeline which may + /// retry when the tier configuration permits. + /// + public async ValueTask ExecuteWriteAsync( + string hostName, + bool isIdempotent, + Func> callSite, + CancellationToken cancellationToken) + { + ArgumentNullException.ThrowIfNull(callSite); + + if (!isIdempotent) + { + var noRetryOptions = _optionsAccessor() with + { + CapabilityPolicies = new Dictionary + { + [DriverCapability.Write] = _optionsAccessor().Resolve(DriverCapability.Write) with { RetryCount = 0 }, + }, + }; + var pipeline = _builder.GetOrCreate(_driverInstanceId, $"{hostName}::non-idempotent", DriverCapability.Write, noRetryOptions); + return await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false); + } + + return await ExecuteAsync(DriverCapability.Write, hostName, callSite, cancellationToken).ConfigureAwait(false); + } + + private ResiliencePipeline ResolvePipeline(DriverCapability capability, string hostName) => + _builder.GetOrCreate(_driverInstanceId, hostName, capability, _optionsAccessor()); +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/CapabilityInvokerTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/CapabilityInvokerTests.cs new file mode 100644 index 0000000..aa82652 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/CapabilityInvokerTests.cs @@ -0,0 +1,151 @@ +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.Resilience; + +namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience; + +[Trait("Category", "Unit")] +public sealed class CapabilityInvokerTests +{ + private static CapabilityInvoker MakeInvoker( + DriverResiliencePipelineBuilder builder, + DriverResilienceOptions options) => + new(builder, Guid.NewGuid(), () => options); + + [Fact] + public async Task Read_ReturnsValue_FromCallSite() + { + var invoker = MakeInvoker(new DriverResiliencePipelineBuilder(), new DriverResilienceOptions { Tier = DriverTier.A }); + + var result = await invoker.ExecuteAsync( + DriverCapability.Read, + "host-1", + _ => ValueTask.FromResult(42), + CancellationToken.None); + + result.ShouldBe(42); + } + + [Fact] + public async Task Read_Retries_OnTransientFailure() + { + var invoker = MakeInvoker(new DriverResiliencePipelineBuilder(), new DriverResilienceOptions { Tier = DriverTier.A }); + var attempts = 0; + + var result = await invoker.ExecuteAsync( + DriverCapability.Read, + "host-1", + async _ => + { + attempts++; + if (attempts < 2) throw new InvalidOperationException("transient"); + await Task.Yield(); + return "ok"; + }, + CancellationToken.None); + + result.ShouldBe("ok"); + attempts.ShouldBe(2); + } + + [Fact] + public async Task Write_NonIdempotent_DoesNotRetry_EvenWhenPolicyHasRetries() + { + var options = new DriverResilienceOptions + { + Tier = DriverTier.A, + CapabilityPolicies = new Dictionary + { + [DriverCapability.Write] = new(TimeoutSeconds: 2, RetryCount: 3, BreakerFailureThreshold: 5), + }, + }; + var invoker = MakeInvoker(new DriverResiliencePipelineBuilder(), options); + var attempts = 0; + + await Should.ThrowAsync(async () => + await invoker.ExecuteWriteAsync( + "host-1", + isIdempotent: false, + async _ => + { + attempts++; + await Task.Yield(); + throw new InvalidOperationException("boom"); +#pragma warning disable CS0162 + return 0; +#pragma warning restore CS0162 + }, + CancellationToken.None)); + + attempts.ShouldBe(1, "non-idempotent write must never replay"); + } + + [Fact] + public async Task Write_Idempotent_Retries_WhenPolicyHasRetries() + { + var options = new DriverResilienceOptions + { + Tier = DriverTier.A, + CapabilityPolicies = new Dictionary + { + [DriverCapability.Write] = new(TimeoutSeconds: 2, RetryCount: 3, BreakerFailureThreshold: 5), + }, + }; + var invoker = MakeInvoker(new DriverResiliencePipelineBuilder(), options); + var attempts = 0; + + var result = await invoker.ExecuteWriteAsync( + "host-1", + isIdempotent: true, + async _ => + { + attempts++; + if (attempts < 2) throw new InvalidOperationException("transient"); + await Task.Yield(); + return "ok"; + }, + CancellationToken.None); + + result.ShouldBe("ok"); + attempts.ShouldBe(2); + } + + [Fact] + public async Task Write_Default_DoesNotRetry_WhenPolicyHasZeroRetries() + { + // Tier A Write default is RetryCount=0. Even isIdempotent=true shouldn't retry + // because the policy says not to. + var invoker = MakeInvoker(new DriverResiliencePipelineBuilder(), new DriverResilienceOptions { Tier = DriverTier.A }); + var attempts = 0; + + await Should.ThrowAsync(async () => + await invoker.ExecuteWriteAsync( + "host-1", + isIdempotent: true, + async _ => + { + attempts++; + await Task.Yield(); + throw new InvalidOperationException("boom"); +#pragma warning disable CS0162 + return 0; +#pragma warning restore CS0162 + }, + CancellationToken.None)); + + attempts.ShouldBe(1, "tier-A default for Write is RetryCount=0"); + } + + [Fact] + public async Task Execute_HonorsDifferentHosts_Independently() + { + var builder = new DriverResiliencePipelineBuilder(); + var invoker = MakeInvoker(builder, new DriverResilienceOptions { Tier = DriverTier.A }); + + await invoker.ExecuteAsync(DriverCapability.Read, "host-a", _ => ValueTask.FromResult(1), CancellationToken.None); + await invoker.ExecuteAsync(DriverCapability.Read, "host-b", _ => ValueTask.FromResult(2), CancellationToken.None); + + builder.CachedPipelineCount.ShouldBe(2); + } +}