From c04b13f436da9de7f4934e3e38620e195b558f81 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 19 Apr 2026 04:07:27 -0400 Subject: [PATCH 1/6] =?UTF-8?q?Phase=206.1=20Stream=20A.1/A.2/A.6=20?= =?UTF-8?q?=E2=80=94=20Polly=20resilience=20foundation:=20pipeline=20build?= =?UTF-8?q?er=20+=20per-tier=20policy=20defaults=20+=20WriteIdempotent=20a?= =?UTF-8?q?ttribute?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Lands the first chunk of the Phase 6.1 Stream A resilience layer per docs/v2/implementation/phase-6-1-resilience-and-observability.md §Stream A. Downstream CapabilityInvoker (A.3) + driver-dispatch wiring land in follow-up PRs on the same branch. Core.Abstractions additions: - WriteIdempotentAttribute — marker for tag-definition records that opt into auto-retry on IWritable.WriteAsync. Absence = no retry per decisions #44, #45, #143. Read once via reflection at driver-init time; no per-write cost. - DriverCapability enum — enumerates the 8 capability surface points (Read / Write / Discover / Subscribe / Probe / AlarmSubscribe / AlarmAcknowledge / HistoryRead). AlarmAcknowledge is write-shaped (no retry by default). - DriverTier enum — A/B/C per driver-stability.md §2-4. Stream B.1 wires this into DriverTypeMetadata; surfaced here because the resilience policy defaults key on it. Core.Resilience new namespace: - DriverResilienceOptions — per-tier × per-capability policy defaults. GetTierDefaults(tier) is the source of truth: * Tier A: Read 2s/3 retries, Write 2s/0 retries, breaker threshold 5 * Tier B: Read 4s/3, Write 4s/0, breaker threshold 5 * Tier C: Read 10s/1, Write 10s/0, breaker threshold 0 (supervisor handles process-level breaker per decision #68) Resolve(capability) overlays CapabilityPolicies on top of the defaults. - DriverResiliencePipelineBuilder — composes Timeout → Retry (capability- permitting, never on cancellation) → CircuitBreaker (tier-permitting) → Bulkhead. Pipelines cached in a lock-free ConcurrentDictionary keyed on (DriverInstanceId, HostName, DriverCapability) per decision #144 — one dead PLC behind a multi-device driver does not open the breaker for healthy siblings. Invalidate(driverInstanceId) supports Admin-triggered reload. Tests (30 new, all pass): - DriverResilienceOptionsTests: tier-default coverage for every capability, Write + AlarmAcknowledge never retry at any tier, Tier C disables breaker, resolve-with-override layering. - DriverResiliencePipelineBuilderTests: Read retries transients, Write does NOT retry on failure (decision #44 guard), dead-host isolation from sibling hosts, pipeline reuse for same triple, per-capability isolation, breaker opens after threshold on Tier A, timeout fires, cancellation is not retried, invalidation scoped to matching instance. Polly.Core 8.6.6 added to Core.csproj. Full solution dotnet test: 936 passing (baseline 906 + 30 new). One pre-existing Client.CLI Subscribe flake unchanged by this PR. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../DriverCapability.cs | 42 ++++ .../DriverTier.cs | 34 +++ .../WriteIdempotentAttribute.cs | 19 ++ .../Resilience/DriverResilienceOptions.cs | 96 ++++++++ .../DriverResiliencePipelineBuilder.cs | 118 ++++++++++ .../ZB.MOM.WW.OtOpcUa.Core.csproj | 4 + .../DriverResilienceOptionsTests.cs | 102 ++++++++ .../DriverResiliencePipelineBuilderTests.cs | 222 ++++++++++++++++++ 8 files changed, 637 insertions(+) create mode 100644 src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverCapability.cs create mode 100644 src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverTier.cs create mode 100644 src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/WriteIdempotentAttribute.cs create mode 100644 src/ZB.MOM.WW.OtOpcUa.Core/Resilience/DriverResilienceOptions.cs create mode 100644 src/ZB.MOM.WW.OtOpcUa.Core/Resilience/DriverResiliencePipelineBuilder.cs create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/DriverResilienceOptionsTests.cs create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/DriverResiliencePipelineBuilderTests.cs diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverCapability.cs b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverCapability.cs new file mode 100644 index 0000000..79dfeac --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverCapability.cs @@ -0,0 +1,42 @@ +namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +/// +/// Enumerates the driver-capability surface points guarded by Phase 6.1 resilience pipelines. +/// Each value corresponds to one method (or tightly-related method group) on the +/// Core.Abstractions capability interfaces (, , +/// , , , +/// , ). +/// +/// +/// Per docs/v2/plan.md decision #143 (per-capability retry policy): Read / HistoryRead / +/// Discover / Probe / AlarmSubscribe auto-retry; does NOT retry unless the +/// tag-definition carries . Alarm-acknowledge is treated +/// as a write for retry semantics (an alarm-ack is not idempotent at the plant-floor acknowledgement +/// level even if the OPC UA spec permits re-issue). +/// +public enum DriverCapability +{ + /// Batch . Retries by default. + Read, + + /// Batch . Does not retry unless tag is idempotent. + Write, + + /// . Retries by default. + Discover, + + /// and unsubscribe. Retries by default. + Subscribe, + + /// probe loop. Retries by default. + Probe, + + /// . Retries by default. + AlarmSubscribe, + + /// . Does NOT retry — ack is a write-shaped operation (decision #143). + AlarmAcknowledge, + + /// reads (Raw/Processed/AtTime/Events). Retries by default. + HistoryRead, +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverTier.cs b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverTier.cs new file mode 100644 index 0000000..92d72da --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverTier.cs @@ -0,0 +1,34 @@ +namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +/// +/// Stability tier of a driver type. Determines which cross-cutting runtime protections +/// apply — per-tier retry defaults, memory-tracking thresholds, and whether out-of-process +/// supervision with process-level recycle is in play. +/// +/// +/// Per docs/v2/driver-stability.md §2-4 and docs/v2/plan.md decisions #63-74. +/// +/// +/// A — managed, known-good SDK; low blast radius. In-process. Fast retries. +/// Examples: OPC UA Client (OPCFoundation stack), S7 (S7NetPlus). +/// B — native or semi-trusted SDK with an in-process footprint. Examples: Modbus. +/// C — unmanaged SDK with COM/STA constraints, leak risk, or other out-of-process +/// requirements. Must run as a separate Host process behind a Proxy with a supervisor that +/// can recycle the process on hard-breach. Example: Galaxy (MXAccess COM). +/// +/// +/// Process-kill protections (MemoryRecycle, ScheduledRecycleScheduler) are +/// Tier C only per decisions #73-74 and #145 — killing an in-process Tier A/B driver also kills +/// every OPC UA session and every co-hosted driver, blast-radius worse than the leak. +/// +public enum DriverTier +{ + /// Managed SDK, in-process, low blast radius. + A, + + /// Native or semi-trusted SDK, in-process. + B, + + /// Unmanaged SDK, out-of-process required with Proxy+Host+Supervisor. + C, +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/WriteIdempotentAttribute.cs b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/WriteIdempotentAttribute.cs new file mode 100644 index 0000000..1c0c0e6 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/WriteIdempotentAttribute.cs @@ -0,0 +1,19 @@ +namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +/// +/// Opts a tag-definition record into auto-retry on failures. +/// Absence of this attribute means writes are not retried — a timed-out write may have +/// already succeeded at the device, and replaying pulses, alarm acks, counter increments, or +/// recipe-step advances can duplicate irreversible field actions. +/// +/// +/// Per docs/v2/plan.md decisions #44, #45, and #143. Applied to tag-definition POCOs +/// (e.g. ModbusTagDefinition, S7TagDefinition, OPC UA client tag rows) at the +/// property or record level. The CapabilityInvoker in ZB.MOM.WW.OtOpcUa.Core.Resilience +/// reads this attribute via reflection once at driver-init time and caches the result; no +/// per-write reflection cost. +/// +[AttributeUsage(AttributeTargets.Property | AttributeTargets.Class | AttributeTargets.Struct, AllowMultiple = false, Inherited = true)] +public sealed class WriteIdempotentAttribute : Attribute +{ +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/DriverResilienceOptions.cs b/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/DriverResilienceOptions.cs new file mode 100644 index 0000000..8fe1497 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/DriverResilienceOptions.cs @@ -0,0 +1,96 @@ +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Core.Resilience; + +/// +/// Per-tier × per-capability resilience policy configuration for a driver instance. +/// Bound from DriverInstance.ResilienceConfig JSON (nullable column; null = tier defaults). +/// Per docs/v2/plan.md decisions #143 and #144. +/// +public sealed record DriverResilienceOptions +{ + /// Tier the owning driver type is registered as; drives the default map. + public required DriverTier Tier { get; init; } + + /// + /// Per-capability policy overrides. Capabilities absent from this map fall back to + /// for the configured . + /// + public IReadOnlyDictionary CapabilityPolicies { get; init; } + = new Dictionary(); + + /// Bulkhead (max concurrent in-flight calls) for every capability. Default 32. + public int BulkheadMaxConcurrent { get; init; } = 32; + + /// + /// Bulkhead queue depth. Zero = no queueing; overflow fails fast with + /// BulkheadRejectedException. Default 64. + /// + public int BulkheadMaxQueue { get; init; } = 64; + + /// + /// Look up the effective policy for a capability, falling back to tier defaults when no + /// override is configured. Never returns null. + /// + public CapabilityPolicy Resolve(DriverCapability capability) + { + if (CapabilityPolicies.TryGetValue(capability, out var policy)) + return policy; + + var defaults = GetTierDefaults(Tier); + return defaults[capability]; + } + + /// + /// Per-tier per-capability default policy table, per decisions #143-144 and the Phase 6.1 + /// Stream A.2 specification. Retries skipped on and + /// regardless of tier. + /// + public static IReadOnlyDictionary GetTierDefaults(DriverTier tier) => + tier switch + { + DriverTier.A => new Dictionary + { + [DriverCapability.Read] = new(TimeoutSeconds: 2, RetryCount: 3, BreakerFailureThreshold: 5), + [DriverCapability.Write] = new(TimeoutSeconds: 2, RetryCount: 0, BreakerFailureThreshold: 5), + [DriverCapability.Discover] = new(TimeoutSeconds: 30, RetryCount: 2, BreakerFailureThreshold: 3), + [DriverCapability.Subscribe] = new(TimeoutSeconds: 5, RetryCount: 3, BreakerFailureThreshold: 5), + [DriverCapability.Probe] = new(TimeoutSeconds: 2, RetryCount: 3, BreakerFailureThreshold: 5), + [DriverCapability.AlarmSubscribe] = new(TimeoutSeconds: 5, RetryCount: 3, BreakerFailureThreshold: 5), + [DriverCapability.AlarmAcknowledge] = new(TimeoutSeconds: 5, RetryCount: 0, BreakerFailureThreshold: 5), + [DriverCapability.HistoryRead] = new(TimeoutSeconds: 30, RetryCount: 2, BreakerFailureThreshold: 5), + }, + DriverTier.B => new Dictionary + { + [DriverCapability.Read] = new(TimeoutSeconds: 4, RetryCount: 3, BreakerFailureThreshold: 5), + [DriverCapability.Write] = new(TimeoutSeconds: 4, RetryCount: 0, BreakerFailureThreshold: 5), + [DriverCapability.Discover] = new(TimeoutSeconds: 60, RetryCount: 2, BreakerFailureThreshold: 3), + [DriverCapability.Subscribe] = new(TimeoutSeconds: 8, RetryCount: 3, BreakerFailureThreshold: 5), + [DriverCapability.Probe] = new(TimeoutSeconds: 4, RetryCount: 3, BreakerFailureThreshold: 5), + [DriverCapability.AlarmSubscribe] = new(TimeoutSeconds: 8, RetryCount: 3, BreakerFailureThreshold: 5), + [DriverCapability.AlarmAcknowledge] = new(TimeoutSeconds: 8, RetryCount: 0, BreakerFailureThreshold: 5), + [DriverCapability.HistoryRead] = new(TimeoutSeconds: 60, RetryCount: 2, BreakerFailureThreshold: 5), + }, + DriverTier.C => new Dictionary + { + [DriverCapability.Read] = new(TimeoutSeconds: 10, RetryCount: 1, BreakerFailureThreshold: 0), + [DriverCapability.Write] = new(TimeoutSeconds: 10, RetryCount: 0, BreakerFailureThreshold: 0), + [DriverCapability.Discover] = new(TimeoutSeconds: 120, RetryCount: 1, BreakerFailureThreshold: 0), + [DriverCapability.Subscribe] = new(TimeoutSeconds: 15, RetryCount: 1, BreakerFailureThreshold: 0), + [DriverCapability.Probe] = new(TimeoutSeconds: 10, RetryCount: 1, BreakerFailureThreshold: 0), + [DriverCapability.AlarmSubscribe] = new(TimeoutSeconds: 15, RetryCount: 1, BreakerFailureThreshold: 0), + [DriverCapability.AlarmAcknowledge] = new(TimeoutSeconds: 15, RetryCount: 0, BreakerFailureThreshold: 0), + [DriverCapability.HistoryRead] = new(TimeoutSeconds: 120, RetryCount: 1, BreakerFailureThreshold: 0), + }, + _ => throw new ArgumentOutOfRangeException(nameof(tier), tier, $"No default policy table defined for tier {tier}."), + }; +} + +/// Policy for one capability on one driver instance. +/// Per-call timeout (wraps the inner Polly execution). +/// Number of retry attempts after the first failure; zero = no retry. +/// +/// Consecutive-failure count that opens the circuit breaker; zero = no breaker +/// (Tier C uses the supervisor's process-level breaker instead, per decision #68). +/// +public sealed record CapabilityPolicy(int TimeoutSeconds, int RetryCount, int BreakerFailureThreshold); diff --git a/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/DriverResiliencePipelineBuilder.cs b/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/DriverResiliencePipelineBuilder.cs new file mode 100644 index 0000000..b3ad043 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/DriverResiliencePipelineBuilder.cs @@ -0,0 +1,118 @@ +using System.Collections.Concurrent; +using Polly; +using Polly.CircuitBreaker; +using Polly.Retry; +using Polly.Timeout; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Core.Resilience; + +/// +/// Builds and caches Polly resilience pipelines keyed on +/// (DriverInstanceId, HostName, DriverCapability). One dead PLC behind a multi-device +/// driver cannot open the circuit breaker for healthy sibling hosts. +/// +/// +/// Per docs/v2/plan.md decision #144 (per-device isolation). Composition from outside-in: +/// Timeout → Retry (when capability permits) → Circuit Breaker (when tier permits) → Bulkhead. +/// +/// Pipeline resolution is lock-free on the hot path: the inner +/// caches a per key; +/// first-call cost is one .Build. Thereafter reads are O(1). +/// +public sealed class DriverResiliencePipelineBuilder +{ + private readonly ConcurrentDictionary _pipelines = new(); + private readonly TimeProvider _timeProvider; + + /// Construct with the ambient clock (use in prod). + public DriverResiliencePipelineBuilder(TimeProvider? timeProvider = null) + { + _timeProvider = timeProvider ?? TimeProvider.System; + } + + /// + /// Get or build the pipeline for a given (driver instance, host, capability) triple. + /// Calls with the same key + same options reuse the same pipeline instance; the first caller + /// wins if a race occurs (both pipelines would be behaviourally identical). + /// + /// DriverInstance primary key — opaque to this layer. + /// + /// Host the call targets. For single-host drivers (Galaxy, some OPC UA Client configs) pass the + /// driver's canonical host string. For multi-host drivers (Modbus with N PLCs), pass the + /// specific PLC so one dead PLC doesn't poison healthy siblings. + /// + /// Which capability surface is being called. + /// Per-driver-instance options (tier + per-capability overrides). + public ResiliencePipeline GetOrCreate( + Guid driverInstanceId, + string hostName, + DriverCapability capability, + DriverResilienceOptions options) + { + ArgumentNullException.ThrowIfNull(options); + ArgumentException.ThrowIfNullOrWhiteSpace(hostName); + + var key = new PipelineKey(driverInstanceId, hostName, capability); + return _pipelines.GetOrAdd(key, static (_, state) => Build(state.capability, state.options, state.timeProvider), + (capability, options, timeProvider: _timeProvider)); + } + + /// Drop cached pipelines for one driver instance (e.g. on ResilienceConfig change). Test + Admin-reload use. + public int Invalidate(Guid driverInstanceId) + { + var removed = 0; + foreach (var key in _pipelines.Keys) + { + if (key.DriverInstanceId == driverInstanceId && _pipelines.TryRemove(key, out _)) + removed++; + } + return removed; + } + + /// Snapshot of the current number of cached pipelines. For diagnostics only. + public int CachedPipelineCount => _pipelines.Count; + + private static ResiliencePipeline Build( + DriverCapability capability, + DriverResilienceOptions options, + TimeProvider timeProvider) + { + var policy = options.Resolve(capability); + var builder = new ResiliencePipelineBuilder { TimeProvider = timeProvider }; + + builder.AddTimeout(new TimeoutStrategyOptions + { + Timeout = TimeSpan.FromSeconds(policy.TimeoutSeconds), + }); + + if (policy.RetryCount > 0) + { + builder.AddRetry(new RetryStrategyOptions + { + MaxRetryAttempts = policy.RetryCount, + BackoffType = DelayBackoffType.Exponential, + UseJitter = true, + Delay = TimeSpan.FromMilliseconds(100), + MaxDelay = TimeSpan.FromSeconds(5), + ShouldHandle = new PredicateBuilder().Handle(ex => ex is not OperationCanceledException), + }); + } + + if (policy.BreakerFailureThreshold > 0) + { + builder.AddCircuitBreaker(new CircuitBreakerStrategyOptions + { + FailureRatio = 1.0, + MinimumThroughput = policy.BreakerFailureThreshold, + SamplingDuration = TimeSpan.FromSeconds(30), + BreakDuration = TimeSpan.FromSeconds(15), + ShouldHandle = new PredicateBuilder().Handle(ex => ex is not OperationCanceledException), + }); + } + + return builder.Build(); + } + + private readonly record struct PipelineKey(Guid DriverInstanceId, string HostName, DriverCapability Capability); +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Core/ZB.MOM.WW.OtOpcUa.Core.csproj b/src/ZB.MOM.WW.OtOpcUa.Core/ZB.MOM.WW.OtOpcUa.Core.csproj index 8e48171..805bcff 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Core/ZB.MOM.WW.OtOpcUa.Core.csproj +++ b/src/ZB.MOM.WW.OtOpcUa.Core/ZB.MOM.WW.OtOpcUa.Core.csproj @@ -16,6 +16,10 @@ + + + + diff --git a/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/DriverResilienceOptionsTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/DriverResilienceOptionsTests.cs new file mode 100644 index 0000000..6d23681 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/DriverResilienceOptionsTests.cs @@ -0,0 +1,102 @@ +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.Resilience; + +namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience; + +[Trait("Category", "Unit")] +public sealed class DriverResilienceOptionsTests +{ + [Theory] + [InlineData(DriverTier.A)] + [InlineData(DriverTier.B)] + [InlineData(DriverTier.C)] + public void TierDefaults_Cover_EveryCapability(DriverTier tier) + { + var defaults = DriverResilienceOptions.GetTierDefaults(tier); + + foreach (var capability in Enum.GetValues()) + defaults.ShouldContainKey(capability); + } + + [Theory] + [InlineData(DriverTier.A)] + [InlineData(DriverTier.B)] + [InlineData(DriverTier.C)] + public void Write_NeverRetries_ByDefault(DriverTier tier) + { + var defaults = DriverResilienceOptions.GetTierDefaults(tier); + defaults[DriverCapability.Write].RetryCount.ShouldBe(0); + } + + [Theory] + [InlineData(DriverTier.A)] + [InlineData(DriverTier.B)] + [InlineData(DriverTier.C)] + public void AlarmAcknowledge_NeverRetries_ByDefault(DriverTier tier) + { + var defaults = DriverResilienceOptions.GetTierDefaults(tier); + defaults[DriverCapability.AlarmAcknowledge].RetryCount.ShouldBe(0); + } + + [Theory] + [InlineData(DriverTier.A, DriverCapability.Read)] + [InlineData(DriverTier.A, DriverCapability.HistoryRead)] + [InlineData(DriverTier.B, DriverCapability.Discover)] + [InlineData(DriverTier.B, DriverCapability.Probe)] + [InlineData(DriverTier.C, DriverCapability.AlarmSubscribe)] + public void IdempotentCapabilities_Retry_ByDefault(DriverTier tier, DriverCapability capability) + { + var defaults = DriverResilienceOptions.GetTierDefaults(tier); + defaults[capability].RetryCount.ShouldBeGreaterThan(0); + } + + [Fact] + public void TierC_DisablesCircuitBreaker_DeferringToSupervisor() + { + var defaults = DriverResilienceOptions.GetTierDefaults(DriverTier.C); + + foreach (var (_, policy) in defaults) + policy.BreakerFailureThreshold.ShouldBe(0, "Tier C breaker is handled by the Proxy supervisor (decision #68)"); + } + + [Theory] + [InlineData(DriverTier.A)] + [InlineData(DriverTier.B)] + public void TierAAndB_EnableCircuitBreaker(DriverTier tier) + { + var defaults = DriverResilienceOptions.GetTierDefaults(tier); + + foreach (var (_, policy) in defaults) + policy.BreakerFailureThreshold.ShouldBeGreaterThan(0); + } + + [Fact] + public void Resolve_Uses_TierDefaults_When_NoOverride() + { + var options = new DriverResilienceOptions { Tier = DriverTier.A }; + + var resolved = options.Resolve(DriverCapability.Read); + + resolved.ShouldBe(DriverResilienceOptions.GetTierDefaults(DriverTier.A)[DriverCapability.Read]); + } + + [Fact] + public void Resolve_Uses_Override_When_Configured() + { + var custom = new CapabilityPolicy(TimeoutSeconds: 42, RetryCount: 7, BreakerFailureThreshold: 9); + var options = new DriverResilienceOptions + { + Tier = DriverTier.A, + CapabilityPolicies = new Dictionary + { + [DriverCapability.Read] = custom, + }, + }; + + options.Resolve(DriverCapability.Read).ShouldBe(custom); + options.Resolve(DriverCapability.Write).ShouldBe( + DriverResilienceOptions.GetTierDefaults(DriverTier.A)[DriverCapability.Write]); + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/DriverResiliencePipelineBuilderTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/DriverResiliencePipelineBuilderTests.cs new file mode 100644 index 0000000..67a3d1d --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/DriverResiliencePipelineBuilderTests.cs @@ -0,0 +1,222 @@ +using Polly.CircuitBreaker; +using Polly.Timeout; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.Resilience; + +namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience; + +[Trait("Category", "Unit")] +public sealed class DriverResiliencePipelineBuilderTests +{ + private static readonly DriverResilienceOptions TierAOptions = new() { Tier = DriverTier.A }; + + [Fact] + public async Task Read_Retries_Transient_Failures() + { + var builder = new DriverResiliencePipelineBuilder(); + var pipeline = builder.GetOrCreate(Guid.NewGuid(), "host-1", DriverCapability.Read, TierAOptions); + var attempts = 0; + + await pipeline.ExecuteAsync(async _ => + { + attempts++; + if (attempts < 3) throw new InvalidOperationException("transient"); + await Task.Yield(); + }); + + attempts.ShouldBe(3); + } + + [Fact] + public async Task Write_DoesNotRetry_OnFailure() + { + var builder = new DriverResiliencePipelineBuilder(); + var pipeline = builder.GetOrCreate(Guid.NewGuid(), "host-1", DriverCapability.Write, TierAOptions); + var attempts = 0; + + var ex = await Should.ThrowAsync(async () => + { + await pipeline.ExecuteAsync(async _ => + { + attempts++; + await Task.Yield(); + throw new InvalidOperationException("boom"); + }); + }); + + attempts.ShouldBe(1); + ex.Message.ShouldBe("boom"); + } + + [Fact] + public async Task AlarmAcknowledge_DoesNotRetry_OnFailure() + { + var builder = new DriverResiliencePipelineBuilder(); + var pipeline = builder.GetOrCreate(Guid.NewGuid(), "host-1", DriverCapability.AlarmAcknowledge, TierAOptions); + var attempts = 0; + + await Should.ThrowAsync(async () => + { + await pipeline.ExecuteAsync(async _ => + { + attempts++; + await Task.Yield(); + throw new InvalidOperationException("boom"); + }); + }); + + attempts.ShouldBe(1); + } + + [Fact] + public void Pipeline_IsIsolated_PerHost() + { + var builder = new DriverResiliencePipelineBuilder(); + var driverId = Guid.NewGuid(); + + var hostA = builder.GetOrCreate(driverId, "host-a", DriverCapability.Read, TierAOptions); + var hostB = builder.GetOrCreate(driverId, "host-b", DriverCapability.Read, TierAOptions); + + hostA.ShouldNotBeSameAs(hostB); + builder.CachedPipelineCount.ShouldBe(2); + } + + [Fact] + public void Pipeline_IsReused_ForSameTriple() + { + var builder = new DriverResiliencePipelineBuilder(); + var driverId = Guid.NewGuid(); + + var first = builder.GetOrCreate(driverId, "host-a", DriverCapability.Read, TierAOptions); + var second = builder.GetOrCreate(driverId, "host-a", DriverCapability.Read, TierAOptions); + + first.ShouldBeSameAs(second); + builder.CachedPipelineCount.ShouldBe(1); + } + + [Fact] + public void Pipeline_IsIsolated_PerCapability() + { + var builder = new DriverResiliencePipelineBuilder(); + var driverId = Guid.NewGuid(); + + var read = builder.GetOrCreate(driverId, "host-a", DriverCapability.Read, TierAOptions); + var write = builder.GetOrCreate(driverId, "host-a", DriverCapability.Write, TierAOptions); + + read.ShouldNotBeSameAs(write); + } + + [Fact] + public async Task DeadHost_DoesNotOpenBreaker_ForSiblingHost() + { + var builder = new DriverResiliencePipelineBuilder(); + var driverId = Guid.NewGuid(); + + var deadHost = builder.GetOrCreate(driverId, "dead-plc", DriverCapability.Read, TierAOptions); + var liveHost = builder.GetOrCreate(driverId, "live-plc", DriverCapability.Read, TierAOptions); + + var threshold = TierAOptions.Resolve(DriverCapability.Read).BreakerFailureThreshold; + for (var i = 0; i < threshold + 5; i++) + { + await Should.ThrowAsync(async () => + await deadHost.ExecuteAsync(async _ => + { + await Task.Yield(); + throw new InvalidOperationException("dead plc"); + })); + } + + var liveAttempts = 0; + await liveHost.ExecuteAsync(async _ => + { + liveAttempts++; + await Task.Yield(); + }); + + liveAttempts.ShouldBe(1, "healthy sibling host must not be affected by dead peer"); + } + + [Fact] + public async Task CircuitBreaker_Opens_AfterFailureThreshold_OnTierA() + { + var builder = new DriverResiliencePipelineBuilder(); + var pipeline = builder.GetOrCreate(Guid.NewGuid(), "host-1", DriverCapability.Write, TierAOptions); + + var threshold = TierAOptions.Resolve(DriverCapability.Write).BreakerFailureThreshold; + for (var i = 0; i < threshold; i++) + { + await Should.ThrowAsync(async () => + await pipeline.ExecuteAsync(async _ => + { + await Task.Yield(); + throw new InvalidOperationException("boom"); + })); + } + + await Should.ThrowAsync(async () => + await pipeline.ExecuteAsync(async _ => + { + await Task.Yield(); + })); + } + + [Fact] + public async Task Timeout_Cancels_SlowOperation() + { + var tierAWithShortTimeout = new DriverResilienceOptions + { + Tier = DriverTier.A, + CapabilityPolicies = new Dictionary + { + [DriverCapability.Read] = new(TimeoutSeconds: 1, RetryCount: 0, BreakerFailureThreshold: 5), + }, + }; + var builder = new DriverResiliencePipelineBuilder(); + var pipeline = builder.GetOrCreate(Guid.NewGuid(), "host-1", DriverCapability.Read, tierAWithShortTimeout); + + await Should.ThrowAsync(async () => + await pipeline.ExecuteAsync(async ct => + { + await Task.Delay(TimeSpan.FromSeconds(5), ct); + })); + } + + [Fact] + public void Invalidate_Removes_OnlyMatchingInstance() + { + var builder = new DriverResiliencePipelineBuilder(); + var keepId = Guid.NewGuid(); + var dropId = Guid.NewGuid(); + + builder.GetOrCreate(keepId, "h", DriverCapability.Read, TierAOptions); + builder.GetOrCreate(keepId, "h", DriverCapability.Write, TierAOptions); + builder.GetOrCreate(dropId, "h", DriverCapability.Read, TierAOptions); + + var removed = builder.Invalidate(dropId); + + removed.ShouldBe(1); + builder.CachedPipelineCount.ShouldBe(2); + } + + [Fact] + public async Task Cancellation_IsNot_Retried() + { + var builder = new DriverResiliencePipelineBuilder(); + var pipeline = builder.GetOrCreate(Guid.NewGuid(), "host-1", DriverCapability.Read, TierAOptions); + var attempts = 0; + using var cts = new CancellationTokenSource(); + cts.Cancel(); + + await Should.ThrowAsync(async () => + await pipeline.ExecuteAsync(async ct => + { + attempts++; + ct.ThrowIfCancellationRequested(); + await Task.Yield(); + }, cts.Token)); + + attempts.ShouldBeLessThanOrEqualTo(1); + } +} -- 2.49.1 From 90f7792c9239f25fbd04fbd626cdad322b5d0cc9 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 19 Apr 2026 04:09:26 -0400 Subject: [PATCH 2/6] =?UTF-8?q?Phase=206.1=20Stream=20A.3=20=E2=80=94=20Ca?= =?UTF-8?q?pabilityInvoker=20wraps=20driver-capability=20calls=20through?= =?UTF-8?q?=20the=20shared=20pipeline?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit One invoker per (DriverInstance, IDriver) pair; calls ExecuteAsync(capability, host, callSite) and the invoker resolves the correct pipeline from the shared DriverResiliencePipelineBuilder. The options accessor is a Func so Admin-edit + pipeline-invalidate takes effect without restarting the invoker or the driver host. ExecuteWriteAsync(isIdempotent) is the explicit write-safety surface: - isIdempotent=false routes through a side pipeline with RetryCount=0 regardless of what the caller configured. The cache key carries a "::non-idempotent" suffix so it never collides with the retry-enabled write pipeline. - isIdempotent=true routes through the normal Write pipeline. If the user has configured Write retries (opt-in), the idempotent tag gets them; otherwise default-0 still wins. The server dispatch layer (next PR) reads WriteIdempotentAttribute on each tag definition once at driver-init time and feeds the boolean into ExecuteWriteAsync. Tests (6 new): - Read retries on transient failure; returns value from call site. - Write non-idempotent does NOT retry even when policy has 3 retries configured (the explicit decision-#44 guard at the dispatch surface). - Write idempotent retries when policy allows. - Write with default tier-A policy (RetryCount=0) never retries regardless of idempotency flag. - Different hosts get independent pipelines. Core.Tests now 44 passing (was 38). Invoker doc-refs completed (the XML comment on WriteIdempotentAttribute no longer references a non-existent type). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../Resilience/CapabilityInvoker.cs | 106 ++++++++++++ .../Resilience/CapabilityInvokerTests.cs | 151 ++++++++++++++++++ 2 files changed, 257 insertions(+) create mode 100644 src/ZB.MOM.WW.OtOpcUa.Core/Resilience/CapabilityInvoker.cs create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/CapabilityInvokerTests.cs diff --git a/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/CapabilityInvoker.cs b/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/CapabilityInvoker.cs new file mode 100644 index 0000000..e881dc7 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/CapabilityInvoker.cs @@ -0,0 +1,106 @@ +using Polly; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Core.Resilience; + +/// +/// Executes driver-capability calls through a shared Polly pipeline. One invoker per +/// (DriverInstance, IDriver) pair; the underlying +/// is process-singleton so all invokers share its cache. +/// +/// +/// Per docs/v2/plan.md decisions #143-144 and Phase 6.1 Stream A.3. The server's dispatch +/// layer routes every capability call (IReadable.ReadAsync, IWritable.WriteAsync, +/// ITagDiscovery.DiscoverAsync, ISubscribable.SubscribeAsync/UnsubscribeAsync, +/// IHostConnectivityProbe probe loop, IAlarmSource.SubscribeAlarmsAsync/AcknowledgeAsync, +/// and all four IHistoryProvider reads) through this invoker. +/// +public sealed class CapabilityInvoker +{ + private readonly DriverResiliencePipelineBuilder _builder; + private readonly Guid _driverInstanceId; + private readonly Func _optionsAccessor; + + /// + /// Construct an invoker for one driver instance. + /// + /// Shared, process-singleton pipeline builder. + /// The DriverInstance.Id column value. + /// + /// Snapshot accessor for the current resilience options. Invoked per call so Admin-edit + + /// pipeline-invalidate can take effect without restarting the invoker. + /// + public CapabilityInvoker( + DriverResiliencePipelineBuilder builder, + Guid driverInstanceId, + Func optionsAccessor) + { + ArgumentNullException.ThrowIfNull(builder); + ArgumentNullException.ThrowIfNull(optionsAccessor); + + _builder = builder; + _driverInstanceId = driverInstanceId; + _optionsAccessor = optionsAccessor; + } + + /// Execute a capability call returning a value, honoring the per-capability pipeline. + /// Return type of the underlying driver call. + public async ValueTask ExecuteAsync( + DriverCapability capability, + string hostName, + Func> callSite, + CancellationToken cancellationToken) + { + ArgumentNullException.ThrowIfNull(callSite); + + var pipeline = ResolvePipeline(capability, hostName); + return await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false); + } + + /// Execute a void-returning capability call, honoring the per-capability pipeline. + public async ValueTask ExecuteAsync( + DriverCapability capability, + string hostName, + Func callSite, + CancellationToken cancellationToken) + { + ArgumentNullException.ThrowIfNull(callSite); + + var pipeline = ResolvePipeline(capability, hostName); + await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false); + } + + /// + /// Execute a call honoring + /// semantics — if is false, retries are disabled regardless + /// of the tag-level configuration (the pipeline for a non-idempotent write never retries per + /// decisions #44-45). If true, the call runs through the capability's pipeline which may + /// retry when the tier configuration permits. + /// + public async ValueTask ExecuteWriteAsync( + string hostName, + bool isIdempotent, + Func> callSite, + CancellationToken cancellationToken) + { + ArgumentNullException.ThrowIfNull(callSite); + + if (!isIdempotent) + { + var noRetryOptions = _optionsAccessor() with + { + CapabilityPolicies = new Dictionary + { + [DriverCapability.Write] = _optionsAccessor().Resolve(DriverCapability.Write) with { RetryCount = 0 }, + }, + }; + var pipeline = _builder.GetOrCreate(_driverInstanceId, $"{hostName}::non-idempotent", DriverCapability.Write, noRetryOptions); + return await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false); + } + + return await ExecuteAsync(DriverCapability.Write, hostName, callSite, cancellationToken).ConfigureAwait(false); + } + + private ResiliencePipeline ResolvePipeline(DriverCapability capability, string hostName) => + _builder.GetOrCreate(_driverInstanceId, hostName, capability, _optionsAccessor()); +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/CapabilityInvokerTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/CapabilityInvokerTests.cs new file mode 100644 index 0000000..aa82652 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/CapabilityInvokerTests.cs @@ -0,0 +1,151 @@ +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.Resilience; + +namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience; + +[Trait("Category", "Unit")] +public sealed class CapabilityInvokerTests +{ + private static CapabilityInvoker MakeInvoker( + DriverResiliencePipelineBuilder builder, + DriverResilienceOptions options) => + new(builder, Guid.NewGuid(), () => options); + + [Fact] + public async Task Read_ReturnsValue_FromCallSite() + { + var invoker = MakeInvoker(new DriverResiliencePipelineBuilder(), new DriverResilienceOptions { Tier = DriverTier.A }); + + var result = await invoker.ExecuteAsync( + DriverCapability.Read, + "host-1", + _ => ValueTask.FromResult(42), + CancellationToken.None); + + result.ShouldBe(42); + } + + [Fact] + public async Task Read_Retries_OnTransientFailure() + { + var invoker = MakeInvoker(new DriverResiliencePipelineBuilder(), new DriverResilienceOptions { Tier = DriverTier.A }); + var attempts = 0; + + var result = await invoker.ExecuteAsync( + DriverCapability.Read, + "host-1", + async _ => + { + attempts++; + if (attempts < 2) throw new InvalidOperationException("transient"); + await Task.Yield(); + return "ok"; + }, + CancellationToken.None); + + result.ShouldBe("ok"); + attempts.ShouldBe(2); + } + + [Fact] + public async Task Write_NonIdempotent_DoesNotRetry_EvenWhenPolicyHasRetries() + { + var options = new DriverResilienceOptions + { + Tier = DriverTier.A, + CapabilityPolicies = new Dictionary + { + [DriverCapability.Write] = new(TimeoutSeconds: 2, RetryCount: 3, BreakerFailureThreshold: 5), + }, + }; + var invoker = MakeInvoker(new DriverResiliencePipelineBuilder(), options); + var attempts = 0; + + await Should.ThrowAsync(async () => + await invoker.ExecuteWriteAsync( + "host-1", + isIdempotent: false, + async _ => + { + attempts++; + await Task.Yield(); + throw new InvalidOperationException("boom"); +#pragma warning disable CS0162 + return 0; +#pragma warning restore CS0162 + }, + CancellationToken.None)); + + attempts.ShouldBe(1, "non-idempotent write must never replay"); + } + + [Fact] + public async Task Write_Idempotent_Retries_WhenPolicyHasRetries() + { + var options = new DriverResilienceOptions + { + Tier = DriverTier.A, + CapabilityPolicies = new Dictionary + { + [DriverCapability.Write] = new(TimeoutSeconds: 2, RetryCount: 3, BreakerFailureThreshold: 5), + }, + }; + var invoker = MakeInvoker(new DriverResiliencePipelineBuilder(), options); + var attempts = 0; + + var result = await invoker.ExecuteWriteAsync( + "host-1", + isIdempotent: true, + async _ => + { + attempts++; + if (attempts < 2) throw new InvalidOperationException("transient"); + await Task.Yield(); + return "ok"; + }, + CancellationToken.None); + + result.ShouldBe("ok"); + attempts.ShouldBe(2); + } + + [Fact] + public async Task Write_Default_DoesNotRetry_WhenPolicyHasZeroRetries() + { + // Tier A Write default is RetryCount=0. Even isIdempotent=true shouldn't retry + // because the policy says not to. + var invoker = MakeInvoker(new DriverResiliencePipelineBuilder(), new DriverResilienceOptions { Tier = DriverTier.A }); + var attempts = 0; + + await Should.ThrowAsync(async () => + await invoker.ExecuteWriteAsync( + "host-1", + isIdempotent: true, + async _ => + { + attempts++; + await Task.Yield(); + throw new InvalidOperationException("boom"); +#pragma warning disable CS0162 + return 0; +#pragma warning restore CS0162 + }, + CancellationToken.None)); + + attempts.ShouldBe(1, "tier-A default for Write is RetryCount=0"); + } + + [Fact] + public async Task Execute_HonorsDifferentHosts_Independently() + { + var builder = new DriverResiliencePipelineBuilder(); + var invoker = MakeInvoker(builder, new DriverResilienceOptions { Tier = DriverTier.A }); + + await invoker.ExecuteAsync(DriverCapability.Read, "host-a", _ => ValueTask.FromResult(1), CancellationToken.None); + await invoker.ExecuteAsync(DriverCapability.Read, "host-b", _ => ValueTask.FromResult(2), CancellationToken.None); + + builder.CachedPipelineCount.ShouldBe(2); + } +} -- 2.49.1 From f3850f89140ddf567fb0df2e1209eaf80fe6712e Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 19 Apr 2026 07:16:21 -0400 Subject: [PATCH 3/6] =?UTF-8?q?Phase=206.1=20Stream=20A.5/A.6=20=E2=80=94?= =?UTF-8?q?=20WriteIdempotent=20flag=20on=20DriverAttributeInfo=20+=20Modb?= =?UTF-8?q?us/S7=20tag=20records=20+=20FlakeyDriver=20integration=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per-tag opt-in for write-retry per docs/v2/plan.md decisions #44, #45, #143. Default is false — writes never auto-retry unless the driver author has marked the tag as safe to replay. Core.Abstractions: - DriverAttributeInfo gains `bool WriteIdempotent = false` at the end of the positional record (back-compatible; every existing call site uses the default). Driver.Modbus: - ModbusTagDefinition gains `bool WriteIdempotent = false`. Safe candidates documented in the param XML: holding-register set-points, configuration registers. Unsafe: edge-triggered coils, counter-increment addresses. - ModbusDriver.DiscoverAsync propagates t.WriteIdempotent into DriverAttributeInfo.WriteIdempotent. Driver.S7: - S7TagDefinition gains `bool WriteIdempotent = false`. Safe candidates: DB word/dword set-points, configuration DBs. Unsafe: M/Q bits that drive edge-triggered program routines. - S7Driver.DiscoverAsync propagates the flag. Stream A.5 integration tests (FlakeyDriverIntegrationTests, 4 new) exercise the invoker + flaky-driver contract the plan enumerates: - Read with 5 transient failures succeeds on the 6th attempt (RetryCount=10). - Non-idempotent write with RetryCount=5 configured still fails on the first failure — no replay (decision #44 guard at the ExecuteWriteAsync surface). - Idempotent write with 2 transient failures succeeds on the 3rd attempt. - Two hosts on the same driver have independent breakers — dead-host trips its breaker but live-host's first call still succeeds. Propagation tests: - ModbusDriverTests: SetPoint WriteIdempotent=true flows into DriverAttributeInfo; PulseCoil default=false. - S7DiscoveryAndSubscribeTests: same pattern for DBx SetPoint vs M-bit. Full solution dotnet test: 947 passing (baseline 906, +41 net across Stream A so far). Pre-existing Client.CLI Subscribe flake unchanged. Stream A's remaining work (wiring CapabilityInvoker into DriverNodeManager's OnReadValue / OnWriteValue / History / Subscribe dispatch paths) is the server-side integration piece + needs DI wiring for the pipeline builder — lands in the next PR on this branch. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../DriverAttributeInfo.cs | 11 +- .../ModbusDriver.cs | 3 +- .../ModbusDriverOptions.cs | 11 +- src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7Driver.cs | 3 +- .../S7DriverOptions.cs | 10 +- .../FlakeyDriverIntegrationTests.cs | 157 ++++++++++++++++++ .../ModbusDriverTests.cs | 17 ++ .../S7DiscoveryAndSubscribeTests.cs | 21 +++ 8 files changed, 228 insertions(+), 5 deletions(-) create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/FlakeyDriverIntegrationTests.cs diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverAttributeInfo.cs b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverAttributeInfo.cs index 7071770..1c24020 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverAttributeInfo.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverAttributeInfo.cs @@ -25,6 +25,14 @@ namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions; /// OPC UA AlarmConditionState when true. Defaults to false so existing non-Galaxy /// drivers aren't forced to flow a flag they don't produce. /// +/// +/// True when a timed-out or failed write to this attribute is safe to replay. Per +/// docs/v2/plan.md decisions #44, #45, #143 — writes are NOT auto-retried by default +/// because replaying a pulse / alarm-ack / counter-increment / recipe-step advance can +/// duplicate field actions. Drivers flag only tags whose semantics make retry safe +/// (holding registers with level-set values, set-point writes to analog tags) — the +/// capability invoker respects this flag when deciding whether to apply Polly retry. +/// public sealed record DriverAttributeInfo( string FullName, DriverDataType DriverDataType, @@ -32,4 +40,5 @@ public sealed record DriverAttributeInfo( uint? ArrayDim, SecurityClassification SecurityClass, bool IsHistorized, - bool IsAlarm = false); + bool IsAlarm = false, + bool WriteIdempotent = false); diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs index 2d2eec9..cbc7bf9 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs @@ -115,7 +115,8 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta ArrayDim: null, SecurityClass: t.Writable ? SecurityClassification.Operate : SecurityClassification.ViewOnly, IsHistorized: false, - IsAlarm: false)); + IsAlarm: false, + WriteIdempotent: t.WriteIdempotent)); } return Task.CompletedTask; } diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverOptions.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverOptions.cs index e05c44d..c119d4e 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverOptions.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverOptions.cs @@ -92,6 +92,14 @@ public sealed class ModbusProbeOptions /// AutomationDirect DirectLOGIC (DL205/DL260) and a few legacy families pack the first /// character in the low byte instead — see docs/v2/dl205.md §strings. /// +/// +/// Per docs/v2/plan.md decisions #44, #45, #143 — flag a tag as safe to replay on +/// write timeout / failure. Default false; writes do not auto-retry. Safe candidates: +/// holding-register set-points for analog values and configuration registers where the same +/// value can be written again without side-effects. Unsafe: coils that drive edge-triggered +/// actions (pulse outputs), counter-increment addresses on PLCs that treat writes as deltas, +/// any BCD / counter register where repeat-writes advance state. +/// public sealed record ModbusTagDefinition( string Name, ModbusRegion Region, @@ -101,7 +109,8 @@ public sealed record ModbusTagDefinition( ModbusByteOrder ByteOrder = ModbusByteOrder.BigEndian, byte BitIndex = 0, ushort StringLength = 0, - ModbusStringByteOrder StringByteOrder = ModbusStringByteOrder.HighByteFirst); + ModbusStringByteOrder StringByteOrder = ModbusStringByteOrder.HighByteFirst, + bool WriteIdempotent = false); public enum ModbusRegion { Coils, DiscreteInputs, InputRegisters, HoldingRegisters } diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7Driver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7Driver.cs index 708ad33..b7bb365 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7Driver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7Driver.cs @@ -341,7 +341,8 @@ public sealed class S7Driver(S7DriverOptions options, string driverInstanceId) ArrayDim: null, SecurityClass: t.Writable ? SecurityClassification.Operate : SecurityClassification.ViewOnly, IsHistorized: false, - IsAlarm: false)); + IsAlarm: false, + WriteIdempotent: t.WriteIdempotent)); } return Task.CompletedTask; } diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7DriverOptions.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7DriverOptions.cs index 8f0e4ca..c3cc172 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7DriverOptions.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7DriverOptions.cs @@ -88,12 +88,20 @@ public sealed class S7ProbeOptions /// Logical data type — drives the underlying S7.Net read/write width. /// When true the driver accepts writes for this tag. /// For DataType = String: S7-string max length. Default 254 (S7 max). +/// +/// Per docs/v2/plan.md decisions #44, #45, #143 — flag a tag as safe to replay on +/// write timeout / failure. Default false; writes do not auto-retry. Safe candidates +/// on S7: DB word/dword set-points holding analog values, configuration DBs where the same +/// value can be written again without side-effects. Unsafe: M (merker) bits or Q (output) +/// coils that drive edge-triggered routines in the PLC program. +/// public sealed record S7TagDefinition( string Name, string Address, S7DataType DataType, bool Writable = true, - int StringLength = 254); + int StringLength = 254, + bool WriteIdempotent = false); public enum S7DataType { diff --git a/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/FlakeyDriverIntegrationTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/FlakeyDriverIntegrationTests.cs new file mode 100644 index 0000000..0622cdf --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/FlakeyDriverIntegrationTests.cs @@ -0,0 +1,157 @@ +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.Resilience; + +namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience; + +/// +/// Integration tests for the Phase 6.1 Stream A.5 contract — wrapping a flaky +/// / through the . +/// Exercises the three scenarios the plan enumerates: transient read succeeds after N +/// retries; non-idempotent write fails after one attempt; idempotent write retries through. +/// +[Trait("Category", "Integration")] +public sealed class FlakeyDriverIntegrationTests +{ + [Fact] + public async Task Read_SurfacesSuccess_AfterTransientFailures() + { + var flaky = new FlakeyDriver(failReadsBeforeIndex: 5); + var options = new DriverResilienceOptions + { + Tier = DriverTier.A, + CapabilityPolicies = new Dictionary + { + [DriverCapability.Read] = new(TimeoutSeconds: 2, RetryCount: 10, BreakerFailureThreshold: 50), + }, + }; + var invoker = new CapabilityInvoker(new DriverResiliencePipelineBuilder(), Guid.NewGuid(), () => options); + + var result = await invoker.ExecuteAsync( + DriverCapability.Read, + "host-1", + async ct => await flaky.ReadAsync(["tag-a"], ct), + CancellationToken.None); + + flaky.ReadAttempts.ShouldBe(6); + result[0].StatusCode.ShouldBe(0u); + } + + [Fact] + public async Task Write_NonIdempotent_FailsOnFirstFailure_NoReplay() + { + var flaky = new FlakeyDriver(failWritesBeforeIndex: 3); + var optionsWithAggressiveRetry = new DriverResilienceOptions + { + Tier = DriverTier.A, + CapabilityPolicies = new Dictionary + { + [DriverCapability.Write] = new(TimeoutSeconds: 2, RetryCount: 5, BreakerFailureThreshold: 50), + }, + }; + var invoker = new CapabilityInvoker(new DriverResiliencePipelineBuilder(), Guid.NewGuid(), () => optionsWithAggressiveRetry); + + await Should.ThrowAsync(async () => + await invoker.ExecuteWriteAsync( + "host-1", + isIdempotent: false, + async ct => await flaky.WriteAsync([new WriteRequest("pulse-coil", true)], ct), + CancellationToken.None)); + + flaky.WriteAttempts.ShouldBe(1, "non-idempotent write must never replay (decision #44)"); + } + + [Fact] + public async Task Write_Idempotent_RetriesUntilSuccess() + { + var flaky = new FlakeyDriver(failWritesBeforeIndex: 2); + var optionsWithRetry = new DriverResilienceOptions + { + Tier = DriverTier.A, + CapabilityPolicies = new Dictionary + { + [DriverCapability.Write] = new(TimeoutSeconds: 2, RetryCount: 5, BreakerFailureThreshold: 50), + }, + }; + var invoker = new CapabilityInvoker(new DriverResiliencePipelineBuilder(), Guid.NewGuid(), () => optionsWithRetry); + + var results = await invoker.ExecuteWriteAsync( + "host-1", + isIdempotent: true, + async ct => await flaky.WriteAsync([new WriteRequest("set-point", 42.0f)], ct), + CancellationToken.None); + + flaky.WriteAttempts.ShouldBe(3); + results[0].StatusCode.ShouldBe(0u); + } + + [Fact] + public async Task MultipleHosts_OnOneDriver_HaveIndependentFailureCounts() + { + var flaky = new FlakeyDriver(failReadsBeforeIndex: 0); + var options = new DriverResilienceOptions { Tier = DriverTier.A }; + var builder = new DriverResiliencePipelineBuilder(); + var invoker = new CapabilityInvoker(builder, Guid.NewGuid(), () => options); + + // host-dead: force many failures to exhaust retries + trip breaker + var threshold = options.Resolve(DriverCapability.Read).BreakerFailureThreshold; + for (var i = 0; i < threshold + 5; i++) + { + await Should.ThrowAsync(async () => + await invoker.ExecuteAsync(DriverCapability.Read, "host-dead", + _ => throw new InvalidOperationException("dead"), + CancellationToken.None)); + } + + // host-live: succeeds on first call — unaffected by the dead-host breaker + var liveAttempts = 0; + await invoker.ExecuteAsync(DriverCapability.Read, "host-live", + _ => { liveAttempts++; return ValueTask.FromResult("ok"); }, + CancellationToken.None); + + liveAttempts.ShouldBe(1); + } + + private sealed class FlakeyDriver : IReadable, IWritable + { + private readonly int _failReadsBeforeIndex; + private readonly int _failWritesBeforeIndex; + + public int ReadAttempts { get; private set; } + public int WriteAttempts { get; private set; } + + public FlakeyDriver(int failReadsBeforeIndex = 0, int failWritesBeforeIndex = 0) + { + _failReadsBeforeIndex = failReadsBeforeIndex; + _failWritesBeforeIndex = failWritesBeforeIndex; + } + + public Task> ReadAsync( + IReadOnlyList fullReferences, + CancellationToken cancellationToken) + { + var attempt = ++ReadAttempts; + if (attempt <= _failReadsBeforeIndex) + throw new InvalidOperationException($"transient read failure #{attempt}"); + + var now = DateTime.UtcNow; + IReadOnlyList result = fullReferences + .Select(_ => new DataValueSnapshot(Value: 0, StatusCode: 0u, SourceTimestampUtc: now, ServerTimestampUtc: now)) + .ToList(); + return Task.FromResult(result); + } + + public Task> WriteAsync( + IReadOnlyList writes, + CancellationToken cancellationToken) + { + var attempt = ++WriteAttempts; + if (attempt <= _failWritesBeforeIndex) + throw new InvalidOperationException($"transient write failure #{attempt}"); + + IReadOnlyList result = writes.Select(_ => new WriteResult(0u)).ToList(); + return Task.FromResult(result); + } + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusDriverTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusDriverTests.cs index 0b31fd2..48bb565 100644 --- a/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusDriverTests.cs +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusDriverTests.cs @@ -220,6 +220,23 @@ public sealed class ModbusDriverTests builder.Variables.ShouldContain(v => v.BrowseName == "Run" && v.Info.DriverDataType == DriverDataType.Boolean); } + [Fact] + public async Task Discover_propagates_WriteIdempotent_from_tag_to_attribute_info() + { + var (drv, _) = NewDriver( + new ModbusTagDefinition("SetPoint", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Float32, WriteIdempotent: true), + new ModbusTagDefinition("PulseCoil", ModbusRegion.Coils, 0, ModbusDataType.Bool)); + await drv.InitializeAsync("{}", CancellationToken.None); + + var builder = new RecordingBuilder(); + await drv.DiscoverAsync(builder, CancellationToken.None); + + var setPoint = builder.Variables.Single(v => v.BrowseName == "SetPoint"); + var pulse = builder.Variables.Single(v => v.BrowseName == "PulseCoil"); + setPoint.Info.WriteIdempotent.ShouldBeTrue(); + pulse.Info.WriteIdempotent.ShouldBeFalse("default is opt-in per decision #44"); + } + // --- helpers --- private sealed class RecordingBuilder : IAddressSpaceBuilder diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.S7.Tests/S7DiscoveryAndSubscribeTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.S7.Tests/S7DiscoveryAndSubscribeTests.cs index 339369b..dc9d7dc 100644 --- a/tests/ZB.MOM.WW.OtOpcUa.Driver.S7.Tests/S7DiscoveryAndSubscribeTests.cs +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.S7.Tests/S7DiscoveryAndSubscribeTests.cs @@ -65,6 +65,27 @@ public sealed class S7DiscoveryAndSubscribeTests builder.Variables[2].Attr.DriverDataType.ShouldBe(DriverDataType.Float32); } + [Fact] + public async Task DiscoverAsync_propagates_WriteIdempotent_from_tag_to_attribute_info() + { + var opts = new S7DriverOptions + { + Host = "192.0.2.1", + Tags = + [ + new("SetPoint", "DB1.DBW0", S7DataType.Int16, WriteIdempotent: true), + new("StartBit", "M0.0", S7DataType.Bool), + ], + }; + using var drv = new S7Driver(opts, "s7-idem"); + + var builder = new RecordingAddressSpaceBuilder(); + await drv.DiscoverAsync(builder, TestContext.Current.CancellationToken); + + builder.Variables.Single(v => v.Name == "SetPoint").Attr.WriteIdempotent.ShouldBeTrue(); + builder.Variables.Single(v => v.Name == "StartBit").Attr.WriteIdempotent.ShouldBeFalse("default is opt-in per decision #44"); + } + [Fact] public void GetHostStatuses_returns_one_row_with_host_port_identity_pre_init() { -- 2.49.1 From b6d2803ff6b700ec54f331aa7c03a00a4fb2b7af Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 19 Apr 2026 07:18:55 -0400 Subject: [PATCH 4/6] =?UTF-8?q?Phase=206.1=20Stream=20A=20=E2=80=94=20swit?= =?UTF-8?q?ch=20pipeline=20keys=20from=20Guid=20to=20string=20to=20match?= =?UTF-8?q?=20IDriver.DriverInstanceId?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit IDriver.DriverInstanceId is declared as string in Core.Abstractions; keeping the pipeline key as Guid meant every call site would need .ToString() / Guid.Parse at the boundary. Switching the Resilience types to string removes that friction and lets OtOpcUaServer pass driver.DriverInstanceId directly to the builder in the upcoming server-dispatch wiring PR. - DriverResiliencePipelineBuilder.GetOrCreate + Invalidate + PipelineKey - CapabilityInvoker.ctor + _driverInstanceId field Tests: all 48 Core.Tests still pass. The Invalidate test's keepId / dropId now use distinct "drv-keep" / "drv-drop" literals (previously both were distinct Guid.NewGuid() values, which the sed-driven refactor had collapsed to the same literal — caught pre-commit). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../Resilience/CapabilityInvoker.cs | 4 ++-- .../DriverResiliencePipelineBuilder.cs | 6 ++--- .../Resilience/CapabilityInvokerTests.cs | 2 +- .../DriverResiliencePipelineBuilderTests.cs | 24 +++++++++---------- .../FlakeyDriverIntegrationTests.cs | 8 +++---- 5 files changed, 22 insertions(+), 22 deletions(-) diff --git a/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/CapabilityInvoker.cs b/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/CapabilityInvoker.cs index e881dc7..3c06eb6 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/CapabilityInvoker.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/CapabilityInvoker.cs @@ -18,7 +18,7 @@ namespace ZB.MOM.WW.OtOpcUa.Core.Resilience; public sealed class CapabilityInvoker { private readonly DriverResiliencePipelineBuilder _builder; - private readonly Guid _driverInstanceId; + private readonly string _driverInstanceId; private readonly Func _optionsAccessor; /// @@ -32,7 +32,7 @@ public sealed class CapabilityInvoker /// public CapabilityInvoker( DriverResiliencePipelineBuilder builder, - Guid driverInstanceId, + string driverInstanceId, Func optionsAccessor) { ArgumentNullException.ThrowIfNull(builder); diff --git a/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/DriverResiliencePipelineBuilder.cs b/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/DriverResiliencePipelineBuilder.cs index b3ad043..d7e25af 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/DriverResiliencePipelineBuilder.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/DriverResiliencePipelineBuilder.cs @@ -45,7 +45,7 @@ public sealed class DriverResiliencePipelineBuilder /// Which capability surface is being called. /// Per-driver-instance options (tier + per-capability overrides). public ResiliencePipeline GetOrCreate( - Guid driverInstanceId, + string driverInstanceId, string hostName, DriverCapability capability, DriverResilienceOptions options) @@ -59,7 +59,7 @@ public sealed class DriverResiliencePipelineBuilder } /// Drop cached pipelines for one driver instance (e.g. on ResilienceConfig change). Test + Admin-reload use. - public int Invalidate(Guid driverInstanceId) + public int Invalidate(string driverInstanceId) { var removed = 0; foreach (var key in _pipelines.Keys) @@ -114,5 +114,5 @@ public sealed class DriverResiliencePipelineBuilder return builder.Build(); } - private readonly record struct PipelineKey(Guid DriverInstanceId, string HostName, DriverCapability Capability); + private readonly record struct PipelineKey(string DriverInstanceId, string HostName, DriverCapability Capability); } diff --git a/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/CapabilityInvokerTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/CapabilityInvokerTests.cs index aa82652..9085524 100644 --- a/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/CapabilityInvokerTests.cs +++ b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/CapabilityInvokerTests.cs @@ -11,7 +11,7 @@ public sealed class CapabilityInvokerTests private static CapabilityInvoker MakeInvoker( DriverResiliencePipelineBuilder builder, DriverResilienceOptions options) => - new(builder, Guid.NewGuid(), () => options); + new(builder, "drv-test", () => options); [Fact] public async Task Read_ReturnsValue_FromCallSite() diff --git a/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/DriverResiliencePipelineBuilderTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/DriverResiliencePipelineBuilderTests.cs index 67a3d1d..1167c5b 100644 --- a/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/DriverResiliencePipelineBuilderTests.cs +++ b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/DriverResiliencePipelineBuilderTests.cs @@ -16,7 +16,7 @@ public sealed class DriverResiliencePipelineBuilderTests public async Task Read_Retries_Transient_Failures() { var builder = new DriverResiliencePipelineBuilder(); - var pipeline = builder.GetOrCreate(Guid.NewGuid(), "host-1", DriverCapability.Read, TierAOptions); + var pipeline = builder.GetOrCreate("drv-test", "host-1", DriverCapability.Read, TierAOptions); var attempts = 0; await pipeline.ExecuteAsync(async _ => @@ -33,7 +33,7 @@ public sealed class DriverResiliencePipelineBuilderTests public async Task Write_DoesNotRetry_OnFailure() { var builder = new DriverResiliencePipelineBuilder(); - var pipeline = builder.GetOrCreate(Guid.NewGuid(), "host-1", DriverCapability.Write, TierAOptions); + var pipeline = builder.GetOrCreate("drv-test", "host-1", DriverCapability.Write, TierAOptions); var attempts = 0; var ex = await Should.ThrowAsync(async () => @@ -54,7 +54,7 @@ public sealed class DriverResiliencePipelineBuilderTests public async Task AlarmAcknowledge_DoesNotRetry_OnFailure() { var builder = new DriverResiliencePipelineBuilder(); - var pipeline = builder.GetOrCreate(Guid.NewGuid(), "host-1", DriverCapability.AlarmAcknowledge, TierAOptions); + var pipeline = builder.GetOrCreate("drv-test", "host-1", DriverCapability.AlarmAcknowledge, TierAOptions); var attempts = 0; await Should.ThrowAsync(async () => @@ -74,7 +74,7 @@ public sealed class DriverResiliencePipelineBuilderTests public void Pipeline_IsIsolated_PerHost() { var builder = new DriverResiliencePipelineBuilder(); - var driverId = Guid.NewGuid(); + var driverId = "drv-test"; var hostA = builder.GetOrCreate(driverId, "host-a", DriverCapability.Read, TierAOptions); var hostB = builder.GetOrCreate(driverId, "host-b", DriverCapability.Read, TierAOptions); @@ -87,7 +87,7 @@ public sealed class DriverResiliencePipelineBuilderTests public void Pipeline_IsReused_ForSameTriple() { var builder = new DriverResiliencePipelineBuilder(); - var driverId = Guid.NewGuid(); + var driverId = "drv-test"; var first = builder.GetOrCreate(driverId, "host-a", DriverCapability.Read, TierAOptions); var second = builder.GetOrCreate(driverId, "host-a", DriverCapability.Read, TierAOptions); @@ -100,7 +100,7 @@ public sealed class DriverResiliencePipelineBuilderTests public void Pipeline_IsIsolated_PerCapability() { var builder = new DriverResiliencePipelineBuilder(); - var driverId = Guid.NewGuid(); + var driverId = "drv-test"; var read = builder.GetOrCreate(driverId, "host-a", DriverCapability.Read, TierAOptions); var write = builder.GetOrCreate(driverId, "host-a", DriverCapability.Write, TierAOptions); @@ -112,7 +112,7 @@ public sealed class DriverResiliencePipelineBuilderTests public async Task DeadHost_DoesNotOpenBreaker_ForSiblingHost() { var builder = new DriverResiliencePipelineBuilder(); - var driverId = Guid.NewGuid(); + var driverId = "drv-test"; var deadHost = builder.GetOrCreate(driverId, "dead-plc", DriverCapability.Read, TierAOptions); var liveHost = builder.GetOrCreate(driverId, "live-plc", DriverCapability.Read, TierAOptions); @@ -142,7 +142,7 @@ public sealed class DriverResiliencePipelineBuilderTests public async Task CircuitBreaker_Opens_AfterFailureThreshold_OnTierA() { var builder = new DriverResiliencePipelineBuilder(); - var pipeline = builder.GetOrCreate(Guid.NewGuid(), "host-1", DriverCapability.Write, TierAOptions); + var pipeline = builder.GetOrCreate("drv-test", "host-1", DriverCapability.Write, TierAOptions); var threshold = TierAOptions.Resolve(DriverCapability.Write).BreakerFailureThreshold; for (var i = 0; i < threshold; i++) @@ -174,7 +174,7 @@ public sealed class DriverResiliencePipelineBuilderTests }, }; var builder = new DriverResiliencePipelineBuilder(); - var pipeline = builder.GetOrCreate(Guid.NewGuid(), "host-1", DriverCapability.Read, tierAWithShortTimeout); + var pipeline = builder.GetOrCreate("drv-test", "host-1", DriverCapability.Read, tierAWithShortTimeout); await Should.ThrowAsync(async () => await pipeline.ExecuteAsync(async ct => @@ -187,8 +187,8 @@ public sealed class DriverResiliencePipelineBuilderTests public void Invalidate_Removes_OnlyMatchingInstance() { var builder = new DriverResiliencePipelineBuilder(); - var keepId = Guid.NewGuid(); - var dropId = Guid.NewGuid(); + var keepId = "drv-keep"; + var dropId = "drv-drop"; builder.GetOrCreate(keepId, "h", DriverCapability.Read, TierAOptions); builder.GetOrCreate(keepId, "h", DriverCapability.Write, TierAOptions); @@ -204,7 +204,7 @@ public sealed class DriverResiliencePipelineBuilderTests public async Task Cancellation_IsNot_Retried() { var builder = new DriverResiliencePipelineBuilder(); - var pipeline = builder.GetOrCreate(Guid.NewGuid(), "host-1", DriverCapability.Read, TierAOptions); + var pipeline = builder.GetOrCreate("drv-test", "host-1", DriverCapability.Read, TierAOptions); var attempts = 0; using var cts = new CancellationTokenSource(); cts.Cancel(); diff --git a/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/FlakeyDriverIntegrationTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/FlakeyDriverIntegrationTests.cs index 0622cdf..d58e393 100644 --- a/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/FlakeyDriverIntegrationTests.cs +++ b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/FlakeyDriverIntegrationTests.cs @@ -26,7 +26,7 @@ public sealed class FlakeyDriverIntegrationTests [DriverCapability.Read] = new(TimeoutSeconds: 2, RetryCount: 10, BreakerFailureThreshold: 50), }, }; - var invoker = new CapabilityInvoker(new DriverResiliencePipelineBuilder(), Guid.NewGuid(), () => options); + var invoker = new CapabilityInvoker(new DriverResiliencePipelineBuilder(), "drv-test", () => options); var result = await invoker.ExecuteAsync( DriverCapability.Read, @@ -50,7 +50,7 @@ public sealed class FlakeyDriverIntegrationTests [DriverCapability.Write] = new(TimeoutSeconds: 2, RetryCount: 5, BreakerFailureThreshold: 50), }, }; - var invoker = new CapabilityInvoker(new DriverResiliencePipelineBuilder(), Guid.NewGuid(), () => optionsWithAggressiveRetry); + var invoker = new CapabilityInvoker(new DriverResiliencePipelineBuilder(), "drv-test", () => optionsWithAggressiveRetry); await Should.ThrowAsync(async () => await invoker.ExecuteWriteAsync( @@ -74,7 +74,7 @@ public sealed class FlakeyDriverIntegrationTests [DriverCapability.Write] = new(TimeoutSeconds: 2, RetryCount: 5, BreakerFailureThreshold: 50), }, }; - var invoker = new CapabilityInvoker(new DriverResiliencePipelineBuilder(), Guid.NewGuid(), () => optionsWithRetry); + var invoker = new CapabilityInvoker(new DriverResiliencePipelineBuilder(), "drv-test", () => optionsWithRetry); var results = await invoker.ExecuteWriteAsync( "host-1", @@ -92,7 +92,7 @@ public sealed class FlakeyDriverIntegrationTests var flaky = new FlakeyDriver(failReadsBeforeIndex: 0); var options = new DriverResilienceOptions { Tier = DriverTier.A }; var builder = new DriverResiliencePipelineBuilder(); - var invoker = new CapabilityInvoker(builder, Guid.NewGuid(), () => options); + var invoker = new CapabilityInvoker(builder, "drv-test", () => options); // host-dead: force many failures to exhaust retries + trip breaker var threshold = options.Resolve(DriverCapability.Read).BreakerFailureThreshold; -- 2.49.1 From 29bcaf277b9593c1e3b120e9e0a9a61c35b0af59 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 19 Apr 2026 07:28:28 -0400 Subject: [PATCH 5/6] =?UTF-8?q?Phase=206.1=20Stream=20A.3=20complete=20?= =?UTF-8?q?=E2=80=94=20wire=20CapabilityInvoker=20into=20DriverNodeManager?= =?UTF-8?q?=20dispatch=20end-to-end?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Every OnReadValue / OnWriteValue now routes through the process-singleton DriverResiliencePipelineBuilder's CapabilityInvoker. Read / Write dispatch paths gain timeout + per-capability retry + per-(driver, host) circuit breaker + bulkhead without touching the individual driver implementations. Wiring: - OpcUaApplicationHost: new optional DriverResiliencePipelineBuilder ctor parameter (default null → instance-owned builder). Keeps the 3 test call sites that construct OpcUaApplicationHost directly unchanged. - OtOpcUaServer: requires the builder in its ctor; constructs one CapabilityInvoker per driver at CreateMasterNodeManager time with default Tier A DriverResilienceOptions. TODO: Stream B.1 will wire real per-driver- type tiers via DriverTypeRegistry; Phase 6.1 follow-up will read the DriverInstance.ResilienceConfig JSON column for per-instance overrides. - DriverNodeManager: takes a CapabilityInvoker in its ctor. OnReadValue wraps the driver's ReadAsync through ExecuteAsync(DriverCapability.Read, hostName, ...); OnWriteValue wraps WriteAsync through ExecuteWriteAsync(hostName, isIdempotent, ...) where isIdempotent comes from the new _writeIdempotentByFullRef map populated at Variable() registration from DriverAttributeInfo.WriteIdempotent. HostName defaults to driver.DriverInstanceId for now — a single-host pipeline per driver. Multi-host drivers (Modbus with N PLCs) will expose their own per- call host resolution in a follow-up so failing PLCs can trip per-PLC breakers without poisoning siblings (decision #144). Test fixup: - FlakeyDriverIntegrationTests.Read_SurfacesSuccess_AfterTransientFailures: bumped TimeoutSeconds=2 → 30. 10 retries at exponential backoff with jitter can exceed 2s under parallel-test-run CPU pressure; the test asserts retry behavior, not timeout budget, so the longer slack keeps it deterministic. Full solution dotnet test: 948 passing. Pre-existing Client.CLI Subscribe flake unchanged. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../OpcUa/DriverNodeManager.cs | 29 +++++++++++++++---- .../OpcUa/OpcUaApplicationHost.cs | 8 +++-- .../OpcUa/OtOpcUaServer.cs | 16 ++++++++-- .../FlakeyDriverIntegrationTests.cs | 5 +++- 4 files changed, 48 insertions(+), 10 deletions(-) diff --git a/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/DriverNodeManager.cs b/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/DriverNodeManager.cs index 4857adb..06ab659 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/DriverNodeManager.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/DriverNodeManager.cs @@ -3,6 +3,7 @@ using Microsoft.Extensions.Logging; using Opc.Ua; using Opc.Ua.Server; using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.Resilience; using ZB.MOM.WW.OtOpcUa.Server.Security; using DriverWriteRequest = ZB.MOM.WW.OtOpcUa.Core.Abstractions.WriteRequest; // Core.Abstractions defines a type-named HistoryReadResult (driver-side samples + continuation @@ -33,8 +34,14 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder private readonly IDriver _driver; private readonly IReadable? _readable; private readonly IWritable? _writable; + private readonly CapabilityInvoker _invoker; private readonly ILogger _logger; + // Per-variable idempotency flag populated during Variable() registration from + // DriverAttributeInfo.WriteIdempotent. Drives ExecuteWriteAsync's retry gating in + // OnWriteValue; absent entries default to false (decisions #44, #45, #143). + private readonly Dictionary _writeIdempotentByFullRef = new(StringComparer.OrdinalIgnoreCase); + /// The driver whose address space this node manager exposes. public IDriver Driver => _driver; @@ -53,12 +60,13 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder private FolderState _currentFolder = null!; public DriverNodeManager(IServerInternal server, ApplicationConfiguration configuration, - IDriver driver, ILogger logger) + IDriver driver, CapabilityInvoker invoker, ILogger logger) : base(server, configuration, namespaceUris: $"urn:OtOpcUa:{driver.DriverInstanceId}") { _driver = driver; _readable = driver as IReadable; _writable = driver as IWritable; + _invoker = invoker; _logger = logger; } @@ -148,6 +156,7 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder AddPredefinedNode(SystemContext, v); _variablesByFullRef[attributeInfo.FullName] = v; _securityByFullRef[attributeInfo.FullName] = attributeInfo.SecurityClass; + _writeIdempotentByFullRef[attributeInfo.FullName] = attributeInfo.WriteIdempotent; v.OnReadValue = OnReadValue; v.OnWriteValue = OnWriteValue; @@ -188,7 +197,11 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder try { var fullRef = node.NodeId.Identifier as string ?? ""; - var result = _readable.ReadAsync([fullRef], CancellationToken.None).GetAwaiter().GetResult(); + var result = _invoker.ExecuteAsync( + DriverCapability.Read, + _driver.DriverInstanceId, + async ct => (IReadOnlyList)await _readable.ReadAsync([fullRef], ct).ConfigureAwait(false), + CancellationToken.None).AsTask().GetAwaiter().GetResult(); if (result.Count == 0) { statusCode = StatusCodes.BadNoData; @@ -381,9 +394,15 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder try { - var results = _writable.WriteAsync( - [new DriverWriteRequest(fullRef!, value)], - CancellationToken.None).GetAwaiter().GetResult(); + var isIdempotent = _writeIdempotentByFullRef.GetValueOrDefault(fullRef!, false); + var capturedValue = value; + var results = _invoker.ExecuteWriteAsync( + _driver.DriverInstanceId, + isIdempotent, + async ct => (IReadOnlyList)await _writable.WriteAsync( + [new DriverWriteRequest(fullRef!, capturedValue)], + ct).ConfigureAwait(false), + CancellationToken.None).AsTask().GetAwaiter().GetResult(); if (results.Count > 0 && results[0].StatusCode != 0) { statusCode = results[0].StatusCode; diff --git a/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OpcUaApplicationHost.cs b/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OpcUaApplicationHost.cs index 7616012..e64e672 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OpcUaApplicationHost.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OpcUaApplicationHost.cs @@ -3,6 +3,7 @@ using Opc.Ua; using Opc.Ua.Configuration; using ZB.MOM.WW.OtOpcUa.Core.Hosting; using ZB.MOM.WW.OtOpcUa.Core.OpcUa; +using ZB.MOM.WW.OtOpcUa.Core.Resilience; using ZB.MOM.WW.OtOpcUa.Server.Security; namespace ZB.MOM.WW.OtOpcUa.Server.OpcUa; @@ -20,6 +21,7 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable private readonly OpcUaServerOptions _options; private readonly DriverHost _driverHost; private readonly IUserAuthenticator _authenticator; + private readonly DriverResiliencePipelineBuilder _pipelineBuilder; private readonly ILoggerFactory _loggerFactory; private readonly ILogger _logger; private ApplicationInstance? _application; @@ -27,11 +29,13 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable private bool _disposed; public OpcUaApplicationHost(OpcUaServerOptions options, DriverHost driverHost, - IUserAuthenticator authenticator, ILoggerFactory loggerFactory, ILogger logger) + IUserAuthenticator authenticator, ILoggerFactory loggerFactory, ILogger logger, + DriverResiliencePipelineBuilder? pipelineBuilder = null) { _options = options; _driverHost = driverHost; _authenticator = authenticator; + _pipelineBuilder = pipelineBuilder ?? new DriverResiliencePipelineBuilder(); _loggerFactory = loggerFactory; _logger = logger; } @@ -58,7 +62,7 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable throw new InvalidOperationException( $"OPC UA application certificate could not be validated or created in {_options.PkiStoreRoot}"); - _server = new OtOpcUaServer(_driverHost, _authenticator, _loggerFactory); + _server = new OtOpcUaServer(_driverHost, _authenticator, _pipelineBuilder, _loggerFactory); await _application.Start(_server).ConfigureAwait(false); _logger.LogInformation("OPC UA server started — endpoint={Endpoint} driverCount={Count}", diff --git a/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OtOpcUaServer.cs b/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OtOpcUaServer.cs index 8ccb660..1fd231a 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OtOpcUaServer.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OtOpcUaServer.cs @@ -5,6 +5,7 @@ using Opc.Ua.Server; using ZB.MOM.WW.OtOpcUa.Core.Abstractions; using ZB.MOM.WW.OtOpcUa.Core.Hosting; using ZB.MOM.WW.OtOpcUa.Core.OpcUa; +using ZB.MOM.WW.OtOpcUa.Core.Resilience; using ZB.MOM.WW.OtOpcUa.Server.Security; namespace ZB.MOM.WW.OtOpcUa.Server.OpcUa; @@ -19,13 +20,19 @@ public sealed class OtOpcUaServer : StandardServer { private readonly DriverHost _driverHost; private readonly IUserAuthenticator _authenticator; + private readonly DriverResiliencePipelineBuilder _pipelineBuilder; private readonly ILoggerFactory _loggerFactory; private readonly List _driverNodeManagers = new(); - public OtOpcUaServer(DriverHost driverHost, IUserAuthenticator authenticator, ILoggerFactory loggerFactory) + public OtOpcUaServer( + DriverHost driverHost, + IUserAuthenticator authenticator, + DriverResiliencePipelineBuilder pipelineBuilder, + ILoggerFactory loggerFactory) { _driverHost = driverHost; _authenticator = authenticator; + _pipelineBuilder = pipelineBuilder; _loggerFactory = loggerFactory; } @@ -46,7 +53,12 @@ public sealed class OtOpcUaServer : StandardServer if (driver is null) continue; var logger = _loggerFactory.CreateLogger(); - var manager = new DriverNodeManager(server, configuration, driver, logger); + // Per-driver resilience options: default Tier A pending Stream B.1 which wires + // per-type tiers into DriverTypeRegistry. Read ResilienceConfig JSON from the + // DriverInstance row in a follow-up PR; for now every driver gets Tier A defaults. + var options = new DriverResilienceOptions { Tier = DriverTier.A }; + var invoker = new CapabilityInvoker(_pipelineBuilder, driver.DriverInstanceId, () => options); + var manager = new DriverNodeManager(server, configuration, driver, invoker, logger); _driverNodeManagers.Add(manager); } diff --git a/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/FlakeyDriverIntegrationTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/FlakeyDriverIntegrationTests.cs index d58e393..d33807e 100644 --- a/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/FlakeyDriverIntegrationTests.cs +++ b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/FlakeyDriverIntegrationTests.cs @@ -23,7 +23,10 @@ public sealed class FlakeyDriverIntegrationTests Tier = DriverTier.A, CapabilityPolicies = new Dictionary { - [DriverCapability.Read] = new(TimeoutSeconds: 2, RetryCount: 10, BreakerFailureThreshold: 50), + // TimeoutSeconds=30 gives slack for 5 exponential-backoff retries under + // parallel-test-execution CPU pressure; 10 retries at the default Delay=100ms + // exponential can otherwise exceed a 2-second budget intermittently. + [DriverCapability.Read] = new(TimeoutSeconds: 30, RetryCount: 10, BreakerFailureThreshold: 50), }, }; var invoker = new CapabilityInvoker(new DriverResiliencePipelineBuilder(), "drv-test", () => options); -- 2.49.1 From d2f3a243cd76e8e91ca4808711cb1ec223b5eaa6 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 19 Apr 2026 07:32:10 -0400 Subject: [PATCH 6/6] =?UTF-8?q?Phase=206.1=20Stream=20A.3=20=E2=80=94=20wr?= =?UTF-8?q?ap=20all=204=20HistoryRead=20dispatch=20paths=20through=20Capab?= =?UTF-8?q?ilityInvoker?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per Stream A.3 coverage goal, every IHistoryProvider method on the server dispatch surface routes through the invoker with DriverCapability.HistoryRead: - HistoryReadRaw (line 487) - HistoryReadProcessed (line 551) - HistoryReadAtTime (line 608) - HistoryReadEvents (line 665) Each gets timeout + per-(driver, host) circuit breaker + the default Tier retry policy (Tier A default: 2 retries at 30s timeout). Inner driver GetAwaiter().GetResult() pattern preserved because the OPC UA stack's HistoryRead hook is sync-returning-void — see CustomNodeManager2. With Read, Write, and HistoryRead wrapped, Stream A's invoker-coverage compliance check passes for the dispatch surfaces that live in DriverNodeManager. Subscribe / AlarmSubscribe / AlarmAcknowledge sit behind push-based subscription plumbing (driver → OPC UA event layer) rather than server-pull dispatch, so they're wrapped in the driver-to-server glue rather than in DriverNodeManager — deferred to the follow-up PR that wires the remaining capability surfaces per the final Roslyn-analyzer-enforced coverage map. Full solution dotnet test: 948 passing. Pre-existing Client.CLI Subscribe flake unchanged. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../OpcUa/DriverNodeManager.cs | 57 ++++++++++++------- 1 file changed, 36 insertions(+), 21 deletions(-) diff --git a/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/DriverNodeManager.cs b/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/DriverNodeManager.cs index 06ab659..8aa2d32 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/DriverNodeManager.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/DriverNodeManager.cs @@ -484,12 +484,16 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder try { - var driverResult = History.ReadRawAsync( - fullRef, - details.StartTime, - details.EndTime, - details.NumValuesPerNode, - CancellationToken.None).GetAwaiter().GetResult(); + var driverResult = _invoker.ExecuteAsync( + DriverCapability.HistoryRead, + _driver.DriverInstanceId, + async ct => await History.ReadRawAsync( + fullRef, + details.StartTime, + details.EndTime, + details.NumValuesPerNode, + ct).ConfigureAwait(false), + CancellationToken.None).AsTask().GetAwaiter().GetResult(); WriteResult(results, errors, i, StatusCodes.Good, BuildHistoryData(driverResult.Samples), driverResult.ContinuationPoint); @@ -544,13 +548,17 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder try { - var driverResult = History.ReadProcessedAsync( - fullRef, - details.StartTime, - details.EndTime, - interval, - aggregate.Value, - CancellationToken.None).GetAwaiter().GetResult(); + var driverResult = _invoker.ExecuteAsync( + DriverCapability.HistoryRead, + _driver.DriverInstanceId, + async ct => await History.ReadProcessedAsync( + fullRef, + details.StartTime, + details.EndTime, + interval, + aggregate.Value, + ct).ConfigureAwait(false), + CancellationToken.None).AsTask().GetAwaiter().GetResult(); WriteResult(results, errors, i, StatusCodes.Good, BuildHistoryData(driverResult.Samples), driverResult.ContinuationPoint); @@ -597,8 +605,11 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder try { - var driverResult = History.ReadAtTimeAsync( - fullRef, requestedTimes, CancellationToken.None).GetAwaiter().GetResult(); + var driverResult = _invoker.ExecuteAsync( + DriverCapability.HistoryRead, + _driver.DriverInstanceId, + async ct => await History.ReadAtTimeAsync(fullRef, requestedTimes, ct).ConfigureAwait(false), + CancellationToken.None).AsTask().GetAwaiter().GetResult(); WriteResult(results, errors, i, StatusCodes.Good, BuildHistoryData(driverResult.Samples), driverResult.ContinuationPoint); @@ -651,12 +662,16 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder try { - var driverResult = History.ReadEventsAsync( - sourceName: fullRef, - startUtc: details.StartTime, - endUtc: details.EndTime, - maxEvents: maxEvents, - cancellationToken: CancellationToken.None).GetAwaiter().GetResult(); + var driverResult = _invoker.ExecuteAsync( + DriverCapability.HistoryRead, + _driver.DriverInstanceId, + async ct => await History.ReadEventsAsync( + sourceName: fullRef, + startUtc: details.StartTime, + endUtc: details.EndTime, + maxEvents: maxEvents, + cancellationToken: ct).ConfigureAwait(false), + CancellationToken.None).AsTask().GetAwaiter().GetResult(); WriteResult(results, errors, i, StatusCodes.Good, BuildHistoryEvent(driverResult.Events), driverResult.ContinuationPoint); -- 2.49.1