diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverAttributeInfo.cs b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverAttributeInfo.cs index 7071770..1c24020 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverAttributeInfo.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverAttributeInfo.cs @@ -25,6 +25,14 @@ namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions; /// OPC UA AlarmConditionState when true. Defaults to false so existing non-Galaxy /// drivers aren't forced to flow a flag they don't produce. /// +/// +/// True when a timed-out or failed write to this attribute is safe to replay. Per +/// docs/v2/plan.md decisions #44, #45, #143 — writes are NOT auto-retried by default +/// because replaying a pulse / alarm-ack / counter-increment / recipe-step advance can +/// duplicate field actions. Drivers flag only tags whose semantics make retry safe +/// (holding registers with level-set values, set-point writes to analog tags) — the +/// capability invoker respects this flag when deciding whether to apply Polly retry. +/// public sealed record DriverAttributeInfo( string FullName, DriverDataType DriverDataType, @@ -32,4 +40,5 @@ public sealed record DriverAttributeInfo( uint? ArrayDim, SecurityClassification SecurityClass, bool IsHistorized, - bool IsAlarm = false); + bool IsAlarm = false, + bool WriteIdempotent = false); diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverCapability.cs b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverCapability.cs new file mode 100644 index 0000000..79dfeac --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverCapability.cs @@ -0,0 +1,42 @@ +namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +/// +/// Enumerates the driver-capability surface points guarded by Phase 6.1 resilience pipelines. +/// Each value corresponds to one method (or tightly-related method group) on the +/// Core.Abstractions capability interfaces (, , +/// , , , +/// , ). 
+/// +/// +/// Per docs/v2/plan.md decision #143 (per-capability retry policy): Read / HistoryRead / +/// Discover / Probe / AlarmSubscribe auto-retry; does NOT retry unless the +/// tag-definition carries . Alarm-acknowledge is treated +/// as a write for retry semantics (an alarm-ack is not idempotent at the plant-floor acknowledgement +/// level even if the OPC UA spec permits re-issue). +/// +public enum DriverCapability +{ + /// Batch . Retries by default. + Read, + + /// Batch . Does not retry unless tag is idempotent. + Write, + + /// . Retries by default. + Discover, + + /// and unsubscribe. Retries by default. + Subscribe, + + /// probe loop. Retries by default. + Probe, + + /// . Retries by default. + AlarmSubscribe, + + /// . Does NOT retry — ack is a write-shaped operation (decision #143). + AlarmAcknowledge, + + /// reads (Raw/Processed/AtTime/Events). Retries by default. + HistoryRead, +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverTier.cs b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverTier.cs new file mode 100644 index 0000000..92d72da --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverTier.cs @@ -0,0 +1,34 @@ +namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +/// +/// Stability tier of a driver type. Determines which cross-cutting runtime protections +/// apply — per-tier retry defaults, memory-tracking thresholds, and whether out-of-process +/// supervision with process-level recycle is in play. +/// +/// +/// Per docs/v2/driver-stability.md §2-4 and docs/v2/plan.md decisions #63-74. +/// +/// +/// A — managed, known-good SDK; low blast radius. In-process. Fast retries. +/// Examples: OPC UA Client (OPCFoundation stack), S7 (S7NetPlus). +/// B — native or semi-trusted SDK with an in-process footprint. Examples: Modbus. +/// C — unmanaged SDK with COM/STA constraints, leak risk, or other out-of-process +/// requirements. 
Must run as a separate Host process behind a Proxy with a supervisor that +/// can recycle the process on hard-breach. Example: Galaxy (MXAccess COM). +/// +/// +/// Process-kill protections (MemoryRecycle, ScheduledRecycleScheduler) are +/// Tier C only per decisions #73-74 and #145 — killing an in-process Tier A/B driver also kills +/// every OPC UA session and every co-hosted driver, blast-radius worse than the leak. +/// +public enum DriverTier +{ + /// Managed SDK, in-process, low blast radius. + A, + + /// Native or semi-trusted SDK, in-process. + B, + + /// Unmanaged SDK, out-of-process required with Proxy+Host+Supervisor. + C, +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/WriteIdempotentAttribute.cs b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/WriteIdempotentAttribute.cs new file mode 100644 index 0000000..1c0c0e6 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/WriteIdempotentAttribute.cs @@ -0,0 +1,19 @@ +namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +/// +/// Opts a tag-definition record into auto-retry on write failures. +/// Absence of this attribute means writes are not retried — a timed-out write may have +/// already succeeded at the device, and replaying pulses, alarm acks, counter increments, or +/// recipe-step advances can duplicate irreversible field actions. +/// +/// +/// Per docs/v2/plan.md decisions #44, #45, and #143. Applied to tag-definition POCOs +/// (e.g. ModbusTagDefinition, S7TagDefinition, OPC UA client tag rows) at the +/// property or record level. NOTE: the CapabilityInvoker in ZB.MOM.WW.OtOpcUa.Core.Resilience +/// does not reflect over this attribute; callers pass the already-resolved flag (surfaced as +/// DriverAttributeInfo.WriteIdempotent) to its ExecuteWriteAsync method.
+/// +[AttributeUsage(AttributeTargets.Property | AttributeTargets.Class | AttributeTargets.Struct, AllowMultiple = false, Inherited = true)] +public sealed class WriteIdempotentAttribute : Attribute +{ +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/CapabilityInvoker.cs b/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/CapabilityInvoker.cs new file mode 100644 index 0000000..3c06eb6 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/CapabilityInvoker.cs @@ -0,0 +1,106 @@ +using Polly; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Core.Resilience; + +/// +/// Executes driver-capability calls through a shared Polly pipeline. One invoker per +/// (DriverInstance, IDriver) pair; the underlying +/// is process-singleton so all invokers share its cache. +/// +/// +/// Per docs/v2/plan.md decisions #143-144 and Phase 6.1 Stream A.3. The server's dispatch +/// layer routes every capability call (IReadable.ReadAsync, IWritable.WriteAsync, +/// ITagDiscovery.DiscoverAsync, ISubscribable.SubscribeAsync/UnsubscribeAsync, +/// IHostConnectivityProbe probe loop, IAlarmSource.SubscribeAlarmsAsync/AcknowledgeAsync, +/// and all four IHistoryProvider reads) through this invoker. +/// +public sealed class CapabilityInvoker +{ + private readonly DriverResiliencePipelineBuilder _builder; + private readonly string _driverInstanceId; + private readonly Func _optionsAccessor; + + /// + /// Construct an invoker for one driver instance. + /// + /// Shared, process-singleton pipeline builder. + /// The DriverInstance.Id column value. + /// + /// Snapshot accessor for the current resilience options. Invoked per call so Admin-edit + + /// pipeline-invalidate can take effect without restarting the invoker. 
+ /// + public CapabilityInvoker( + DriverResiliencePipelineBuilder builder, + string driverInstanceId, + Func optionsAccessor) + { + ArgumentNullException.ThrowIfNull(builder); + ArgumentNullException.ThrowIfNull(optionsAccessor); + + _builder = builder; + _driverInstanceId = driverInstanceId; + _optionsAccessor = optionsAccessor; + } + + /// Execute a capability call returning a value, honoring the per-capability pipeline. + /// Return type of the underlying driver call. + public async ValueTask ExecuteAsync( + DriverCapability capability, + string hostName, + Func> callSite, + CancellationToken cancellationToken) + { + ArgumentNullException.ThrowIfNull(callSite); + + var pipeline = ResolvePipeline(capability, hostName); + return await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false); + } + + /// Execute a void-returning capability call, honoring the per-capability pipeline. + public async ValueTask ExecuteAsync( + DriverCapability capability, + string hostName, + Func callSite, + CancellationToken cancellationToken) + { + ArgumentNullException.ThrowIfNull(callSite); + + var pipeline = ResolvePipeline(capability, hostName); + await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false); + } + + /// + /// Execute a call honoring + /// semantics — if is false, retries are disabled regardless + /// of the tag-level configuration (the pipeline for a non-idempotent write never retries per + /// decisions #44-45). If true, the call runs through the capability's pipeline which may + /// retry when the tier configuration permits. 
+ /// + public async ValueTask ExecuteWriteAsync( + string hostName, + bool isIdempotent, + Func> callSite, + CancellationToken cancellationToken) + { + ArgumentNullException.ThrowIfNull(callSite); + + if (!isIdempotent) + { + var noRetryOptions = _optionsAccessor() with + { + CapabilityPolicies = new Dictionary + { + [DriverCapability.Write] = _optionsAccessor().Resolve(DriverCapability.Write) with { RetryCount = 0 }, + }, + }; + var pipeline = _builder.GetOrCreate(_driverInstanceId, $"{hostName}::non-idempotent", DriverCapability.Write, noRetryOptions); + return await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false); + } + + return await ExecuteAsync(DriverCapability.Write, hostName, callSite, cancellationToken).ConfigureAwait(false); + } + + private ResiliencePipeline ResolvePipeline(DriverCapability capability, string hostName) => + _builder.GetOrCreate(_driverInstanceId, hostName, capability, _optionsAccessor()); +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/DriverResilienceOptions.cs b/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/DriverResilienceOptions.cs new file mode 100644 index 0000000..8fe1497 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/DriverResilienceOptions.cs @@ -0,0 +1,96 @@ +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Core.Resilience; + +/// +/// Per-tier × per-capability resilience policy configuration for a driver instance. +/// Bound from DriverInstance.ResilienceConfig JSON (nullable column; null = tier defaults). +/// Per docs/v2/plan.md decisions #143 and #144. +/// +public sealed record DriverResilienceOptions +{ + /// Tier the owning driver type is registered as; drives the default map. + public required DriverTier Tier { get; init; } + + /// + /// Per-capability policy overrides. Capabilities absent from this map fall back to + /// for the configured . 
+ /// + public IReadOnlyDictionary CapabilityPolicies { get; init; } + = new Dictionary(); + + /// Bulkhead (max concurrent in-flight calls) for every capability. Default 32. + public int BulkheadMaxConcurrent { get; init; } = 32; + + /// + /// Bulkhead queue depth. Zero = no queueing; overflow fails fast with + /// BulkheadRejectedException. Default 64. + /// + public int BulkheadMaxQueue { get; init; } = 64; + + /// + /// Look up the effective policy for a capability, falling back to tier defaults when no + /// override is configured. Never returns null. + /// + public CapabilityPolicy Resolve(DriverCapability capability) + { + if (CapabilityPolicies.TryGetValue(capability, out var policy)) + return policy; + + var defaults = GetTierDefaults(Tier); + return defaults[capability]; + } + + /// + /// Per-tier per-capability default policy table, per decisions #143-144 and the Phase 6.1 + /// Stream A.2 specification. Retries skipped on and + /// regardless of tier. + /// + public static IReadOnlyDictionary GetTierDefaults(DriverTier tier) => + tier switch + { + DriverTier.A => new Dictionary + { + [DriverCapability.Read] = new(TimeoutSeconds: 2, RetryCount: 3, BreakerFailureThreshold: 5), + [DriverCapability.Write] = new(TimeoutSeconds: 2, RetryCount: 0, BreakerFailureThreshold: 5), + [DriverCapability.Discover] = new(TimeoutSeconds: 30, RetryCount: 2, BreakerFailureThreshold: 3), + [DriverCapability.Subscribe] = new(TimeoutSeconds: 5, RetryCount: 3, BreakerFailureThreshold: 5), + [DriverCapability.Probe] = new(TimeoutSeconds: 2, RetryCount: 3, BreakerFailureThreshold: 5), + [DriverCapability.AlarmSubscribe] = new(TimeoutSeconds: 5, RetryCount: 3, BreakerFailureThreshold: 5), + [DriverCapability.AlarmAcknowledge] = new(TimeoutSeconds: 5, RetryCount: 0, BreakerFailureThreshold: 5), + [DriverCapability.HistoryRead] = new(TimeoutSeconds: 30, RetryCount: 2, BreakerFailureThreshold: 5), + }, + DriverTier.B => new Dictionary + { + [DriverCapability.Read] = 
new(TimeoutSeconds: 4, RetryCount: 3, BreakerFailureThreshold: 5), + [DriverCapability.Write] = new(TimeoutSeconds: 4, RetryCount: 0, BreakerFailureThreshold: 5), + [DriverCapability.Discover] = new(TimeoutSeconds: 60, RetryCount: 2, BreakerFailureThreshold: 3), + [DriverCapability.Subscribe] = new(TimeoutSeconds: 8, RetryCount: 3, BreakerFailureThreshold: 5), + [DriverCapability.Probe] = new(TimeoutSeconds: 4, RetryCount: 3, BreakerFailureThreshold: 5), + [DriverCapability.AlarmSubscribe] = new(TimeoutSeconds: 8, RetryCount: 3, BreakerFailureThreshold: 5), + [DriverCapability.AlarmAcknowledge] = new(TimeoutSeconds: 8, RetryCount: 0, BreakerFailureThreshold: 5), + [DriverCapability.HistoryRead] = new(TimeoutSeconds: 60, RetryCount: 2, BreakerFailureThreshold: 5), + }, + DriverTier.C => new Dictionary + { + [DriverCapability.Read] = new(TimeoutSeconds: 10, RetryCount: 1, BreakerFailureThreshold: 0), + [DriverCapability.Write] = new(TimeoutSeconds: 10, RetryCount: 0, BreakerFailureThreshold: 0), + [DriverCapability.Discover] = new(TimeoutSeconds: 120, RetryCount: 1, BreakerFailureThreshold: 0), + [DriverCapability.Subscribe] = new(TimeoutSeconds: 15, RetryCount: 1, BreakerFailureThreshold: 0), + [DriverCapability.Probe] = new(TimeoutSeconds: 10, RetryCount: 1, BreakerFailureThreshold: 0), + [DriverCapability.AlarmSubscribe] = new(TimeoutSeconds: 15, RetryCount: 1, BreakerFailureThreshold: 0), + [DriverCapability.AlarmAcknowledge] = new(TimeoutSeconds: 15, RetryCount: 0, BreakerFailureThreshold: 0), + [DriverCapability.HistoryRead] = new(TimeoutSeconds: 120, RetryCount: 1, BreakerFailureThreshold: 0), + }, + _ => throw new ArgumentOutOfRangeException(nameof(tier), tier, $"No default policy table defined for tier {tier}."), + }; +} + +/// Policy for one capability on one driver instance. +/// Per-call timeout (wraps the inner Polly execution). +/// Number of retry attempts after the first failure; zero = no retry. 
+/// +/// Consecutive-failure count that opens the circuit breaker; zero = no breaker +/// (Tier C uses the supervisor's process-level breaker instead, per decision #68). +/// +public sealed record CapabilityPolicy(int TimeoutSeconds, int RetryCount, int BreakerFailureThreshold); diff --git a/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/DriverResiliencePipelineBuilder.cs b/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/DriverResiliencePipelineBuilder.cs new file mode 100644 index 0000000..d7e25af --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/DriverResiliencePipelineBuilder.cs @@ -0,0 +1,118 @@ +using System.Collections.Concurrent; +using Polly; +using Polly.CircuitBreaker; +using Polly.Retry; +using Polly.Timeout; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; + +namespace ZB.MOM.WW.OtOpcUa.Core.Resilience; + +/// +/// Builds and caches Polly resilience pipelines keyed on +/// (DriverInstanceId, HostName, DriverCapability). One dead PLC behind a multi-device +/// driver cannot open the circuit breaker for healthy sibling hosts. +/// +/// +/// Per docs/v2/plan.md decision #144 (per-device isolation). Composition from outside-in: +/// Timeout → Retry (when capability permits) → Circuit Breaker (when tier permits); no bulkhead stage is added yet. +/// +/// Pipeline resolution is lock-free on the hot path: the inner ConcurrentDictionary +/// caches a ResiliencePipeline per key; +/// first-call cost is one ResiliencePipelineBuilder.Build. Thereafter reads are O(1). +/// +public sealed class DriverResiliencePipelineBuilder +{ + private readonly ConcurrentDictionary _pipelines = new(); + private readonly TimeProvider _timeProvider; + + /// Construct with the ambient clock (use in prod). + public DriverResiliencePipelineBuilder(TimeProvider? timeProvider = null) + { + _timeProvider = timeProvider ?? TimeProvider.System; + } + + /// + /// Get or build the pipeline for a given (driver instance, host, capability) triple.
+ /// Calls with the same key reuse the cached pipeline; options are read only on first build (see Invalidate). + /// The first caller wins if a race occurs (both pipelines would be behaviourally identical). + /// + /// DriverInstance primary key — opaque to this layer. + /// + /// Host the call targets. For single-host drivers (Galaxy, some OPC UA Client configs) pass the + /// driver's canonical host string. For multi-host drivers (Modbus with N PLCs), pass the + /// specific PLC so one dead PLC doesn't poison healthy siblings. + /// + /// Which capability surface is being called. + /// Per-driver-instance options (tier + per-capability overrides). + public ResiliencePipeline GetOrCreate( + string driverInstanceId, + string hostName, + DriverCapability capability, + DriverResilienceOptions options) + { + ArgumentNullException.ThrowIfNull(options); + ArgumentException.ThrowIfNullOrWhiteSpace(hostName); + + var key = new PipelineKey(driverInstanceId, hostName, capability); + return _pipelines.GetOrAdd(key, static (_, state) => Build(state.capability, state.options, state.timeProvider), + (capability, options, timeProvider: _timeProvider)); + } + + /// Drop cached pipelines for one driver instance (e.g. on ResilienceConfig change). Test + Admin-reload use. + public int Invalidate(string driverInstanceId) + { + var removed = 0; + foreach (var key in _pipelines.Keys) + { + if (key.DriverInstanceId == driverInstanceId && _pipelines.TryRemove(key, out _)) + removed++; + } + return removed; + } + + /// Snapshot of the current number of cached pipelines. For diagnostics only.
+ public int CachedPipelineCount => _pipelines.Count; + + private static ResiliencePipeline Build( + DriverCapability capability, + DriverResilienceOptions options, + TimeProvider timeProvider) + { + var policy = options.Resolve(capability); + var builder = new ResiliencePipelineBuilder { TimeProvider = timeProvider }; + + builder.AddTimeout(new TimeoutStrategyOptions + { + Timeout = TimeSpan.FromSeconds(policy.TimeoutSeconds), + }); + + if (policy.RetryCount > 0) + { + builder.AddRetry(new RetryStrategyOptions + { + MaxRetryAttempts = policy.RetryCount, + BackoffType = DelayBackoffType.Exponential, + UseJitter = true, + Delay = TimeSpan.FromMilliseconds(100), + MaxDelay = TimeSpan.FromSeconds(5), + ShouldHandle = new PredicateBuilder().Handle(ex => ex is not OperationCanceledException), + }); + } + + if (policy.BreakerFailureThreshold > 0) + { + builder.AddCircuitBreaker(new CircuitBreakerStrategyOptions + { + FailureRatio = 1.0, + MinimumThroughput = policy.BreakerFailureThreshold, + SamplingDuration = TimeSpan.FromSeconds(30), + BreakDuration = TimeSpan.FromSeconds(15), + ShouldHandle = new PredicateBuilder().Handle(ex => ex is not OperationCanceledException), + }); + } + + return builder.Build(); + } + + private readonly record struct PipelineKey(string DriverInstanceId, string HostName, DriverCapability Capability); +} diff --git a/src/ZB.MOM.WW.OtOpcUa.Core/ZB.MOM.WW.OtOpcUa.Core.csproj b/src/ZB.MOM.WW.OtOpcUa.Core/ZB.MOM.WW.OtOpcUa.Core.csproj index 8e48171..805bcff 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Core/ZB.MOM.WW.OtOpcUa.Core.csproj +++ b/src/ZB.MOM.WW.OtOpcUa.Core/ZB.MOM.WW.OtOpcUa.Core.csproj @@ -16,6 +16,10 @@ + + + + diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs index 2d2eec9..cbc7bf9 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs @@ -115,7 +115,8 @@ public sealed class ModbusDriver(ModbusDriverOptions 
options, string driverInsta ArrayDim: null, SecurityClass: t.Writable ? SecurityClassification.Operate : SecurityClassification.ViewOnly, IsHistorized: false, - IsAlarm: false)); + IsAlarm: false, + WriteIdempotent: t.WriteIdempotent)); } return Task.CompletedTask; } diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverOptions.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverOptions.cs index e05c44d..c119d4e 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverOptions.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverOptions.cs @@ -92,6 +92,14 @@ public sealed class ModbusProbeOptions /// AutomationDirect DirectLOGIC (DL205/DL260) and a few legacy families pack the first /// character in the low byte instead — see docs/v2/dl205.md §strings. /// +/// +/// Per docs/v2/plan.md decisions #44, #45, #143 — flag a tag as safe to replay on +/// write timeout / failure. Default false; writes do not auto-retry. Safe candidates: +/// holding-register set-points for analog values and configuration registers where the same +/// value can be written again without side-effects. Unsafe: coils that drive edge-triggered +/// actions (pulse outputs), counter-increment addresses on PLCs that treat writes as deltas, +/// any BCD / counter register where repeat-writes advance state. 
+/// public sealed record ModbusTagDefinition( string Name, ModbusRegion Region, @@ -101,7 +109,8 @@ public sealed record ModbusTagDefinition( ModbusByteOrder ByteOrder = ModbusByteOrder.BigEndian, byte BitIndex = 0, ushort StringLength = 0, - ModbusStringByteOrder StringByteOrder = ModbusStringByteOrder.HighByteFirst); + ModbusStringByteOrder StringByteOrder = ModbusStringByteOrder.HighByteFirst, + bool WriteIdempotent = false); public enum ModbusRegion { Coils, DiscreteInputs, InputRegisters, HoldingRegisters } diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7Driver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7Driver.cs index 708ad33..b7bb365 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7Driver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7Driver.cs @@ -341,7 +341,8 @@ public sealed class S7Driver(S7DriverOptions options, string driverInstanceId) ArrayDim: null, SecurityClass: t.Writable ? SecurityClassification.Operate : SecurityClassification.ViewOnly, IsHistorized: false, - IsAlarm: false)); + IsAlarm: false, + WriteIdempotent: t.WriteIdempotent)); } return Task.CompletedTask; } diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7DriverOptions.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7DriverOptions.cs index 8f0e4ca..c3cc172 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7DriverOptions.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7DriverOptions.cs @@ -88,12 +88,20 @@ public sealed class S7ProbeOptions /// Logical data type — drives the underlying S7.Net read/write width. /// When true the driver accepts writes for this tag. /// For DataType = String: S7-string max length. Default 254 (S7 max). +/// +/// Per docs/v2/plan.md decisions #44, #45, #143 — flag a tag as safe to replay on +/// write timeout / failure. Default false; writes do not auto-retry. Safe candidates +/// on S7: DB word/dword set-points holding analog values, configuration DBs where the same +/// value can be written again without side-effects. 
Unsafe: M (merker) bits or Q (output) +/// coils that drive edge-triggered routines in the PLC program. +/// public sealed record S7TagDefinition( string Name, string Address, S7DataType DataType, bool Writable = true, - int StringLength = 254); + int StringLength = 254, + bool WriteIdempotent = false); public enum S7DataType { diff --git a/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/DriverNodeManager.cs b/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/DriverNodeManager.cs index 4857adb..8aa2d32 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/DriverNodeManager.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/DriverNodeManager.cs @@ -3,6 +3,7 @@ using Microsoft.Extensions.Logging; using Opc.Ua; using Opc.Ua.Server; using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.Resilience; using ZB.MOM.WW.OtOpcUa.Server.Security; using DriverWriteRequest = ZB.MOM.WW.OtOpcUa.Core.Abstractions.WriteRequest; // Core.Abstractions defines a type-named HistoryReadResult (driver-side samples + continuation @@ -33,8 +34,14 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder private readonly IDriver _driver; private readonly IReadable? _readable; private readonly IWritable? _writable; + private readonly CapabilityInvoker _invoker; private readonly ILogger _logger; + // Per-variable idempotency flag populated during Variable() registration from + // DriverAttributeInfo.WriteIdempotent. Drives ExecuteWriteAsync's retry gating in + // OnWriteValue; absent entries default to false (decisions #44, #45, #143). + private readonly Dictionary _writeIdempotentByFullRef = new(StringComparer.OrdinalIgnoreCase); + /// The driver whose address space this node manager exposes. 
public IDriver Driver => _driver; @@ -53,12 +60,13 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder private FolderState _currentFolder = null!; public DriverNodeManager(IServerInternal server, ApplicationConfiguration configuration, - IDriver driver, ILogger logger) + IDriver driver, CapabilityInvoker invoker, ILogger logger) : base(server, configuration, namespaceUris: $"urn:OtOpcUa:{driver.DriverInstanceId}") { _driver = driver; _readable = driver as IReadable; _writable = driver as IWritable; + _invoker = invoker; _logger = logger; } @@ -148,6 +156,7 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder AddPredefinedNode(SystemContext, v); _variablesByFullRef[attributeInfo.FullName] = v; _securityByFullRef[attributeInfo.FullName] = attributeInfo.SecurityClass; + _writeIdempotentByFullRef[attributeInfo.FullName] = attributeInfo.WriteIdempotent; v.OnReadValue = OnReadValue; v.OnWriteValue = OnWriteValue; @@ -188,7 +197,11 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder try { var fullRef = node.NodeId.Identifier as string ?? 
""; - var result = _readable.ReadAsync([fullRef], CancellationToken.None).GetAwaiter().GetResult(); + var result = _invoker.ExecuteAsync( + DriverCapability.Read, + _driver.DriverInstanceId, + async ct => (IReadOnlyList)await _readable.ReadAsync([fullRef], ct).ConfigureAwait(false), + CancellationToken.None).AsTask().GetAwaiter().GetResult(); if (result.Count == 0) { statusCode = StatusCodes.BadNoData; @@ -381,9 +394,15 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder try { - var results = _writable.WriteAsync( - [new DriverWriteRequest(fullRef!, value)], - CancellationToken.None).GetAwaiter().GetResult(); + var isIdempotent = _writeIdempotentByFullRef.GetValueOrDefault(fullRef!, false); + var capturedValue = value; + var results = _invoker.ExecuteWriteAsync( + _driver.DriverInstanceId, + isIdempotent, + async ct => (IReadOnlyList)await _writable.WriteAsync( + [new DriverWriteRequest(fullRef!, capturedValue)], + ct).ConfigureAwait(false), + CancellationToken.None).AsTask().GetAwaiter().GetResult(); if (results.Count > 0 && results[0].StatusCode != 0) { statusCode = results[0].StatusCode; @@ -465,12 +484,16 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder try { - var driverResult = History.ReadRawAsync( - fullRef, - details.StartTime, - details.EndTime, - details.NumValuesPerNode, - CancellationToken.None).GetAwaiter().GetResult(); + var driverResult = _invoker.ExecuteAsync( + DriverCapability.HistoryRead, + _driver.DriverInstanceId, + async ct => await History.ReadRawAsync( + fullRef, + details.StartTime, + details.EndTime, + details.NumValuesPerNode, + ct).ConfigureAwait(false), + CancellationToken.None).AsTask().GetAwaiter().GetResult(); WriteResult(results, errors, i, StatusCodes.Good, BuildHistoryData(driverResult.Samples), driverResult.ContinuationPoint); @@ -525,13 +548,17 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder try { - var driverResult = 
History.ReadProcessedAsync( - fullRef, - details.StartTime, - details.EndTime, - interval, - aggregate.Value, - CancellationToken.None).GetAwaiter().GetResult(); + var driverResult = _invoker.ExecuteAsync( + DriverCapability.HistoryRead, + _driver.DriverInstanceId, + async ct => await History.ReadProcessedAsync( + fullRef, + details.StartTime, + details.EndTime, + interval, + aggregate.Value, + ct).ConfigureAwait(false), + CancellationToken.None).AsTask().GetAwaiter().GetResult(); WriteResult(results, errors, i, StatusCodes.Good, BuildHistoryData(driverResult.Samples), driverResult.ContinuationPoint); @@ -578,8 +605,11 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder try { - var driverResult = History.ReadAtTimeAsync( - fullRef, requestedTimes, CancellationToken.None).GetAwaiter().GetResult(); + var driverResult = _invoker.ExecuteAsync( + DriverCapability.HistoryRead, + _driver.DriverInstanceId, + async ct => await History.ReadAtTimeAsync(fullRef, requestedTimes, ct).ConfigureAwait(false), + CancellationToken.None).AsTask().GetAwaiter().GetResult(); WriteResult(results, errors, i, StatusCodes.Good, BuildHistoryData(driverResult.Samples), driverResult.ContinuationPoint); @@ -632,12 +662,16 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder try { - var driverResult = History.ReadEventsAsync( - sourceName: fullRef, - startUtc: details.StartTime, - endUtc: details.EndTime, - maxEvents: maxEvents, - cancellationToken: CancellationToken.None).GetAwaiter().GetResult(); + var driverResult = _invoker.ExecuteAsync( + DriverCapability.HistoryRead, + _driver.DriverInstanceId, + async ct => await History.ReadEventsAsync( + sourceName: fullRef, + startUtc: details.StartTime, + endUtc: details.EndTime, + maxEvents: maxEvents, + cancellationToken: ct).ConfigureAwait(false), + CancellationToken.None).AsTask().GetAwaiter().GetResult(); WriteResult(results, errors, i, StatusCodes.Good, 
BuildHistoryEvent(driverResult.Events), driverResult.ContinuationPoint); diff --git a/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OpcUaApplicationHost.cs b/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OpcUaApplicationHost.cs index 7616012..e64e672 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OpcUaApplicationHost.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OpcUaApplicationHost.cs @@ -3,6 +3,7 @@ using Opc.Ua; using Opc.Ua.Configuration; using ZB.MOM.WW.OtOpcUa.Core.Hosting; using ZB.MOM.WW.OtOpcUa.Core.OpcUa; +using ZB.MOM.WW.OtOpcUa.Core.Resilience; using ZB.MOM.WW.OtOpcUa.Server.Security; namespace ZB.MOM.WW.OtOpcUa.Server.OpcUa; @@ -20,6 +21,7 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable private readonly OpcUaServerOptions _options; private readonly DriverHost _driverHost; private readonly IUserAuthenticator _authenticator; + private readonly DriverResiliencePipelineBuilder _pipelineBuilder; private readonly ILoggerFactory _loggerFactory; private readonly ILogger _logger; private ApplicationInstance? _application; @@ -27,11 +29,13 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable private bool _disposed; public OpcUaApplicationHost(OpcUaServerOptions options, DriverHost driverHost, - IUserAuthenticator authenticator, ILoggerFactory loggerFactory, ILogger logger) + IUserAuthenticator authenticator, ILoggerFactory loggerFactory, ILogger logger, + DriverResiliencePipelineBuilder? pipelineBuilder = null) { _options = options; _driverHost = driverHost; _authenticator = authenticator; + _pipelineBuilder = pipelineBuilder ?? 
new DriverResiliencePipelineBuilder(); _loggerFactory = loggerFactory; _logger = logger; } @@ -58,7 +62,7 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable throw new InvalidOperationException( $"OPC UA application certificate could not be validated or created in {_options.PkiStoreRoot}"); - _server = new OtOpcUaServer(_driverHost, _authenticator, _loggerFactory); + _server = new OtOpcUaServer(_driverHost, _authenticator, _pipelineBuilder, _loggerFactory); await _application.Start(_server).ConfigureAwait(false); _logger.LogInformation("OPC UA server started — endpoint={Endpoint} driverCount={Count}", diff --git a/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OtOpcUaServer.cs b/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OtOpcUaServer.cs index 8ccb660..1fd231a 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OtOpcUaServer.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Server/OpcUa/OtOpcUaServer.cs @@ -5,6 +5,7 @@ using Opc.Ua.Server; using ZB.MOM.WW.OtOpcUa.Core.Abstractions; using ZB.MOM.WW.OtOpcUa.Core.Hosting; using ZB.MOM.WW.OtOpcUa.Core.OpcUa; +using ZB.MOM.WW.OtOpcUa.Core.Resilience; using ZB.MOM.WW.OtOpcUa.Server.Security; namespace ZB.MOM.WW.OtOpcUa.Server.OpcUa; @@ -19,13 +20,19 @@ public sealed class OtOpcUaServer : StandardServer { private readonly DriverHost _driverHost; private readonly IUserAuthenticator _authenticator; + private readonly DriverResiliencePipelineBuilder _pipelineBuilder; private readonly ILoggerFactory _loggerFactory; private readonly List _driverNodeManagers = new(); - public OtOpcUaServer(DriverHost driverHost, IUserAuthenticator authenticator, ILoggerFactory loggerFactory) + public OtOpcUaServer( + DriverHost driverHost, + IUserAuthenticator authenticator, + DriverResiliencePipelineBuilder pipelineBuilder, + ILoggerFactory loggerFactory) { _driverHost = driverHost; _authenticator = authenticator; + _pipelineBuilder = pipelineBuilder; _loggerFactory = loggerFactory; } @@ -46,7 +53,12 @@ public sealed class OtOpcUaServer : StandardServer if (driver is null) 
continue; var logger = _loggerFactory.CreateLogger(); - var manager = new DriverNodeManager(server, configuration, driver, logger); + // Per-driver resilience options: default Tier A pending Stream B.1 which wires + // per-type tiers into DriverTypeRegistry. Read ResilienceConfig JSON from the + // DriverInstance row in a follow-up PR; for now every driver gets Tier A defaults. + var options = new DriverResilienceOptions { Tier = DriverTier.A }; + var invoker = new CapabilityInvoker(_pipelineBuilder, driver.DriverInstanceId, () => options); + var manager = new DriverNodeManager(server, configuration, driver, invoker, logger); _driverNodeManagers.Add(manager); } diff --git a/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/CapabilityInvokerTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/CapabilityInvokerTests.cs new file mode 100644 index 0000000..9085524 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/CapabilityInvokerTests.cs @@ -0,0 +1,151 @@ +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.Resilience; + +namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience; + +[Trait("Category", "Unit")] +public sealed class CapabilityInvokerTests +{ + private static CapabilityInvoker MakeInvoker( + DriverResiliencePipelineBuilder builder, + DriverResilienceOptions options) => + new(builder, "drv-test", () => options); + + [Fact] + public async Task Read_ReturnsValue_FromCallSite() + { + var invoker = MakeInvoker(new DriverResiliencePipelineBuilder(), new DriverResilienceOptions { Tier = DriverTier.A }); + + var result = await invoker.ExecuteAsync( + DriverCapability.Read, + "host-1", + _ => ValueTask.FromResult(42), + CancellationToken.None); + + result.ShouldBe(42); + } + + [Fact] + public async Task Read_Retries_OnTransientFailure() + { + var invoker = MakeInvoker(new DriverResiliencePipelineBuilder(), new DriverResilienceOptions { Tier = DriverTier.A }); + var attempts = 0; + + var result = await 
invoker.ExecuteAsync( + DriverCapability.Read, + "host-1", + async _ => + { + attempts++; + if (attempts < 2) throw new InvalidOperationException("transient"); + await Task.Yield(); + return "ok"; + }, + CancellationToken.None); + + result.ShouldBe("ok"); + attempts.ShouldBe(2); + } + + [Fact] + public async Task Write_NonIdempotent_DoesNotRetry_EvenWhenPolicyHasRetries() + { + var options = new DriverResilienceOptions + { + Tier = DriverTier.A, + CapabilityPolicies = new Dictionary + { + [DriverCapability.Write] = new(TimeoutSeconds: 2, RetryCount: 3, BreakerFailureThreshold: 5), + }, + }; + var invoker = MakeInvoker(new DriverResiliencePipelineBuilder(), options); + var attempts = 0; + + await Should.ThrowAsync(async () => + await invoker.ExecuteWriteAsync( + "host-1", + isIdempotent: false, + async _ => + { + attempts++; + await Task.Yield(); + throw new InvalidOperationException("boom"); +#pragma warning disable CS0162 + return 0; +#pragma warning restore CS0162 + }, + CancellationToken.None)); + + attempts.ShouldBe(1, "non-idempotent write must never replay"); + } + + [Fact] + public async Task Write_Idempotent_Retries_WhenPolicyHasRetries() + { + var options = new DriverResilienceOptions + { + Tier = DriverTier.A, + CapabilityPolicies = new Dictionary + { + [DriverCapability.Write] = new(TimeoutSeconds: 2, RetryCount: 3, BreakerFailureThreshold: 5), + }, + }; + var invoker = MakeInvoker(new DriverResiliencePipelineBuilder(), options); + var attempts = 0; + + var result = await invoker.ExecuteWriteAsync( + "host-1", + isIdempotent: true, + async _ => + { + attempts++; + if (attempts < 2) throw new InvalidOperationException("transient"); + await Task.Yield(); + return "ok"; + }, + CancellationToken.None); + + result.ShouldBe("ok"); + attempts.ShouldBe(2); + } + + [Fact] + public async Task Write_Default_DoesNotRetry_WhenPolicyHasZeroRetries() + { + // Tier A Write default is RetryCount=0. 
Even isIdempotent=true shouldn't retry + // because the policy says not to. + var invoker = MakeInvoker(new DriverResiliencePipelineBuilder(), new DriverResilienceOptions { Tier = DriverTier.A }); + var attempts = 0; + + await Should.ThrowAsync(async () => + await invoker.ExecuteWriteAsync( + "host-1", + isIdempotent: true, + async _ => + { + attempts++; + await Task.Yield(); + throw new InvalidOperationException("boom"); +#pragma warning disable CS0162 + return 0; +#pragma warning restore CS0162 + }, + CancellationToken.None)); + + attempts.ShouldBe(1, "tier-A default for Write is RetryCount=0"); + } + + [Fact] + public async Task Execute_HonorsDifferentHosts_Independently() + { + var builder = new DriverResiliencePipelineBuilder(); + var invoker = MakeInvoker(builder, new DriverResilienceOptions { Tier = DriverTier.A }); + + await invoker.ExecuteAsync(DriverCapability.Read, "host-a", _ => ValueTask.FromResult(1), CancellationToken.None); + await invoker.ExecuteAsync(DriverCapability.Read, "host-b", _ => ValueTask.FromResult(2), CancellationToken.None); + + builder.CachedPipelineCount.ShouldBe(2); + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/DriverResilienceOptionsTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/DriverResilienceOptionsTests.cs new file mode 100644 index 0000000..6d23681 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/DriverResilienceOptionsTests.cs @@ -0,0 +1,102 @@ +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.Resilience; + +namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience; + +[Trait("Category", "Unit")] +public sealed class DriverResilienceOptionsTests +{ + [Theory] + [InlineData(DriverTier.A)] + [InlineData(DriverTier.B)] + [InlineData(DriverTier.C)] + public void TierDefaults_Cover_EveryCapability(DriverTier tier) + { + var defaults = DriverResilienceOptions.GetTierDefaults(tier); + + foreach (var capability in Enum.GetValues()) + 
defaults.ShouldContainKey(capability); + } + + [Theory] + [InlineData(DriverTier.A)] + [InlineData(DriverTier.B)] + [InlineData(DriverTier.C)] + public void Write_NeverRetries_ByDefault(DriverTier tier) + { + var defaults = DriverResilienceOptions.GetTierDefaults(tier); + defaults[DriverCapability.Write].RetryCount.ShouldBe(0); + } + + [Theory] + [InlineData(DriverTier.A)] + [InlineData(DriverTier.B)] + [InlineData(DriverTier.C)] + public void AlarmAcknowledge_NeverRetries_ByDefault(DriverTier tier) + { + var defaults = DriverResilienceOptions.GetTierDefaults(tier); + defaults[DriverCapability.AlarmAcknowledge].RetryCount.ShouldBe(0); + } + + [Theory] + [InlineData(DriverTier.A, DriverCapability.Read)] + [InlineData(DriverTier.A, DriverCapability.HistoryRead)] + [InlineData(DriverTier.B, DriverCapability.Discover)] + [InlineData(DriverTier.B, DriverCapability.Probe)] + [InlineData(DriverTier.C, DriverCapability.AlarmSubscribe)] + public void IdempotentCapabilities_Retry_ByDefault(DriverTier tier, DriverCapability capability) + { + var defaults = DriverResilienceOptions.GetTierDefaults(tier); + defaults[capability].RetryCount.ShouldBeGreaterThan(0); + } + + [Fact] + public void TierC_DisablesCircuitBreaker_DeferringToSupervisor() + { + var defaults = DriverResilienceOptions.GetTierDefaults(DriverTier.C); + + foreach (var (_, policy) in defaults) + policy.BreakerFailureThreshold.ShouldBe(0, "Tier C breaker is handled by the Proxy supervisor (decision #68)"); + } + + [Theory] + [InlineData(DriverTier.A)] + [InlineData(DriverTier.B)] + public void TierAAndB_EnableCircuitBreaker(DriverTier tier) + { + var defaults = DriverResilienceOptions.GetTierDefaults(tier); + + foreach (var (_, policy) in defaults) + policy.BreakerFailureThreshold.ShouldBeGreaterThan(0); + } + + [Fact] + public void Resolve_Uses_TierDefaults_When_NoOverride() + { + var options = new DriverResilienceOptions { Tier = DriverTier.A }; + + var resolved = options.Resolve(DriverCapability.Read); + + 
resolved.ShouldBe(DriverResilienceOptions.GetTierDefaults(DriverTier.A)[DriverCapability.Read]); + } + + [Fact] + public void Resolve_Uses_Override_When_Configured() + { + var custom = new CapabilityPolicy(TimeoutSeconds: 42, RetryCount: 7, BreakerFailureThreshold: 9); + var options = new DriverResilienceOptions + { + Tier = DriverTier.A, + CapabilityPolicies = new Dictionary + { + [DriverCapability.Read] = custom, + }, + }; + + options.Resolve(DriverCapability.Read).ShouldBe(custom); + options.Resolve(DriverCapability.Write).ShouldBe( + DriverResilienceOptions.GetTierDefaults(DriverTier.A)[DriverCapability.Write]); + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/DriverResiliencePipelineBuilderTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/DriverResiliencePipelineBuilderTests.cs new file mode 100644 index 0000000..1167c5b --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/DriverResiliencePipelineBuilderTests.cs @@ -0,0 +1,222 @@ +using Polly.CircuitBreaker; +using Polly.Timeout; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.Resilience; + +namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience; + +[Trait("Category", "Unit")] +public sealed class DriverResiliencePipelineBuilderTests +{ + private static readonly DriverResilienceOptions TierAOptions = new() { Tier = DriverTier.A }; + + [Fact] + public async Task Read_Retries_Transient_Failures() + { + var builder = new DriverResiliencePipelineBuilder(); + var pipeline = builder.GetOrCreate("drv-test", "host-1", DriverCapability.Read, TierAOptions); + var attempts = 0; + + await pipeline.ExecuteAsync(async _ => + { + attempts++; + if (attempts < 3) throw new InvalidOperationException("transient"); + await Task.Yield(); + }); + + attempts.ShouldBe(3); + } + + [Fact] + public async Task Write_DoesNotRetry_OnFailure() + { + var builder = new DriverResiliencePipelineBuilder(); + var pipeline = builder.GetOrCreate("drv-test", 
"host-1", DriverCapability.Write, TierAOptions); + var attempts = 0; + + var ex = await Should.ThrowAsync(async () => + { + await pipeline.ExecuteAsync(async _ => + { + attempts++; + await Task.Yield(); + throw new InvalidOperationException("boom"); + }); + }); + + attempts.ShouldBe(1); + ex.Message.ShouldBe("boom"); + } + + [Fact] + public async Task AlarmAcknowledge_DoesNotRetry_OnFailure() + { + var builder = new DriverResiliencePipelineBuilder(); + var pipeline = builder.GetOrCreate("drv-test", "host-1", DriverCapability.AlarmAcknowledge, TierAOptions); + var attempts = 0; + + await Should.ThrowAsync(async () => + { + await pipeline.ExecuteAsync(async _ => + { + attempts++; + await Task.Yield(); + throw new InvalidOperationException("boom"); + }); + }); + + attempts.ShouldBe(1); + } + + [Fact] + public void Pipeline_IsIsolated_PerHost() + { + var builder = new DriverResiliencePipelineBuilder(); + var driverId = "drv-test"; + + var hostA = builder.GetOrCreate(driverId, "host-a", DriverCapability.Read, TierAOptions); + var hostB = builder.GetOrCreate(driverId, "host-b", DriverCapability.Read, TierAOptions); + + hostA.ShouldNotBeSameAs(hostB); + builder.CachedPipelineCount.ShouldBe(2); + } + + [Fact] + public void Pipeline_IsReused_ForSameTriple() + { + var builder = new DriverResiliencePipelineBuilder(); + var driverId = "drv-test"; + + var first = builder.GetOrCreate(driverId, "host-a", DriverCapability.Read, TierAOptions); + var second = builder.GetOrCreate(driverId, "host-a", DriverCapability.Read, TierAOptions); + + first.ShouldBeSameAs(second); + builder.CachedPipelineCount.ShouldBe(1); + } + + [Fact] + public void Pipeline_IsIsolated_PerCapability() + { + var builder = new DriverResiliencePipelineBuilder(); + var driverId = "drv-test"; + + var read = builder.GetOrCreate(driverId, "host-a", DriverCapability.Read, TierAOptions); + var write = builder.GetOrCreate(driverId, "host-a", DriverCapability.Write, TierAOptions); + + read.ShouldNotBeSameAs(write); + } 
+ + [Fact] + public async Task DeadHost_DoesNotOpenBreaker_ForSiblingHost() + { + var builder = new DriverResiliencePipelineBuilder(); + var driverId = "drv-test"; + + var deadHost = builder.GetOrCreate(driverId, "dead-plc", DriverCapability.Read, TierAOptions); + var liveHost = builder.GetOrCreate(driverId, "live-plc", DriverCapability.Read, TierAOptions); + + var threshold = TierAOptions.Resolve(DriverCapability.Read).BreakerFailureThreshold; + for (var i = 0; i < threshold + 5; i++) + { + await Should.ThrowAsync(async () => + await deadHost.ExecuteAsync(async _ => + { + await Task.Yield(); + throw new InvalidOperationException("dead plc"); + })); + } + + var liveAttempts = 0; + await liveHost.ExecuteAsync(async _ => + { + liveAttempts++; + await Task.Yield(); + }); + + liveAttempts.ShouldBe(1, "healthy sibling host must not be affected by dead peer"); + } + + [Fact] + public async Task CircuitBreaker_Opens_AfterFailureThreshold_OnTierA() + { + var builder = new DriverResiliencePipelineBuilder(); + var pipeline = builder.GetOrCreate("drv-test", "host-1", DriverCapability.Write, TierAOptions); + + var threshold = TierAOptions.Resolve(DriverCapability.Write).BreakerFailureThreshold; + for (var i = 0; i < threshold; i++) + { + await Should.ThrowAsync(async () => + await pipeline.ExecuteAsync(async _ => + { + await Task.Yield(); + throw new InvalidOperationException("boom"); + })); + } + + await Should.ThrowAsync(async () => + await pipeline.ExecuteAsync(async _ => + { + await Task.Yield(); + })); + } + + [Fact] + public async Task Timeout_Cancels_SlowOperation() + { + var tierAWithShortTimeout = new DriverResilienceOptions + { + Tier = DriverTier.A, + CapabilityPolicies = new Dictionary + { + [DriverCapability.Read] = new(TimeoutSeconds: 1, RetryCount: 0, BreakerFailureThreshold: 5), + }, + }; + var builder = new DriverResiliencePipelineBuilder(); + var pipeline = builder.GetOrCreate("drv-test", "host-1", DriverCapability.Read, tierAWithShortTimeout); + + await 
Should.ThrowAsync(async () => + await pipeline.ExecuteAsync(async ct => + { + await Task.Delay(TimeSpan.FromSeconds(5), ct); + })); + } + + [Fact] + public void Invalidate_Removes_OnlyMatchingInstance() + { + var builder = new DriverResiliencePipelineBuilder(); + var keepId = "drv-keep"; + var dropId = "drv-drop"; + + builder.GetOrCreate(keepId, "h", DriverCapability.Read, TierAOptions); + builder.GetOrCreate(keepId, "h", DriverCapability.Write, TierAOptions); + builder.GetOrCreate(dropId, "h", DriverCapability.Read, TierAOptions); + + var removed = builder.Invalidate(dropId); + + removed.ShouldBe(1); + builder.CachedPipelineCount.ShouldBe(2); + } + + [Fact] + public async Task Cancellation_IsNot_Retried() + { + var builder = new DriverResiliencePipelineBuilder(); + var pipeline = builder.GetOrCreate("drv-test", "host-1", DriverCapability.Read, TierAOptions); + var attempts = 0; + using var cts = new CancellationTokenSource(); + cts.Cancel(); + + await Should.ThrowAsync(async () => + await pipeline.ExecuteAsync(async ct => + { + attempts++; + ct.ThrowIfCancellationRequested(); + await Task.Yield(); + }, cts.Token)); + + attempts.ShouldBeLessThanOrEqualTo(1); + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/FlakeyDriverIntegrationTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/FlakeyDriverIntegrationTests.cs new file mode 100644 index 0000000..d33807e --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/FlakeyDriverIntegrationTests.cs @@ -0,0 +1,160 @@ +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.Resilience; + +namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience; + +/// +/// Integration tests for the Phase 6.1 Stream A.5 contract — wrapping a flaky +/// / through the . +/// Exercises the three scenarios the plan enumerates: transient read succeeds after N +/// retries; non-idempotent write fails after one attempt; idempotent write retries through. 
+/// +[Trait("Category", "Integration")] +public sealed class FlakeyDriverIntegrationTests +{ + [Fact] + public async Task Read_SurfacesSuccess_AfterTransientFailures() + { + var flaky = new FlakeyDriver(failReadsBeforeIndex: 5); + var options = new DriverResilienceOptions + { + Tier = DriverTier.A, + CapabilityPolicies = new Dictionary + { + // TimeoutSeconds=30 gives slack for 5 exponential-backoff retries under + // parallel-test-execution CPU pressure; 10 retries at the default Delay=100ms + // exponential can otherwise exceed a 2-second budget intermittently. + [DriverCapability.Read] = new(TimeoutSeconds: 30, RetryCount: 10, BreakerFailureThreshold: 50), + }, + }; + var invoker = new CapabilityInvoker(new DriverResiliencePipelineBuilder(), "drv-test", () => options); + + var result = await invoker.ExecuteAsync( + DriverCapability.Read, + "host-1", + async ct => await flaky.ReadAsync(["tag-a"], ct), + CancellationToken.None); + + flaky.ReadAttempts.ShouldBe(6); + result[0].StatusCode.ShouldBe(0u); + } + + [Fact] + public async Task Write_NonIdempotent_FailsOnFirstFailure_NoReplay() + { + var flaky = new FlakeyDriver(failWritesBeforeIndex: 3); + var optionsWithAggressiveRetry = new DriverResilienceOptions + { + Tier = DriverTier.A, + CapabilityPolicies = new Dictionary + { + [DriverCapability.Write] = new(TimeoutSeconds: 2, RetryCount: 5, BreakerFailureThreshold: 50), + }, + }; + var invoker = new CapabilityInvoker(new DriverResiliencePipelineBuilder(), "drv-test", () => optionsWithAggressiveRetry); + + await Should.ThrowAsync(async () => + await invoker.ExecuteWriteAsync( + "host-1", + isIdempotent: false, + async ct => await flaky.WriteAsync([new WriteRequest("pulse-coil", true)], ct), + CancellationToken.None)); + + flaky.WriteAttempts.ShouldBe(1, "non-idempotent write must never replay (decision #44)"); + } + + [Fact] + public async Task Write_Idempotent_RetriesUntilSuccess() + { + var flaky = new FlakeyDriver(failWritesBeforeIndex: 2); + var 
optionsWithRetry = new DriverResilienceOptions + { + Tier = DriverTier.A, + CapabilityPolicies = new Dictionary + { + [DriverCapability.Write] = new(TimeoutSeconds: 2, RetryCount: 5, BreakerFailureThreshold: 50), + }, + }; + var invoker = new CapabilityInvoker(new DriverResiliencePipelineBuilder(), "drv-test", () => optionsWithRetry); + + var results = await invoker.ExecuteWriteAsync( + "host-1", + isIdempotent: true, + async ct => await flaky.WriteAsync([new WriteRequest("set-point", 42.0f)], ct), + CancellationToken.None); + + flaky.WriteAttempts.ShouldBe(3); + results[0].StatusCode.ShouldBe(0u); + } + + [Fact] + public async Task MultipleHosts_OnOneDriver_HaveIndependentFailureCounts() + { + var flaky = new FlakeyDriver(failReadsBeforeIndex: 0); + var options = new DriverResilienceOptions { Tier = DriverTier.A }; + var builder = new DriverResiliencePipelineBuilder(); + var invoker = new CapabilityInvoker(builder, "drv-test", () => options); + + // host-dead: force many failures to exhaust retries + trip breaker + var threshold = options.Resolve(DriverCapability.Read).BreakerFailureThreshold; + for (var i = 0; i < threshold + 5; i++) + { + await Should.ThrowAsync(async () => + await invoker.ExecuteAsync(DriverCapability.Read, "host-dead", + _ => throw new InvalidOperationException("dead"), + CancellationToken.None)); + } + + // host-live: succeeds on first call — unaffected by the dead-host breaker + var liveAttempts = 0; + await invoker.ExecuteAsync(DriverCapability.Read, "host-live", + _ => { liveAttempts++; return ValueTask.FromResult("ok"); }, + CancellationToken.None); + + liveAttempts.ShouldBe(1); + } + + private sealed class FlakeyDriver : IReadable, IWritable + { + private readonly int _failReadsBeforeIndex; + private readonly int _failWritesBeforeIndex; + + public int ReadAttempts { get; private set; } + public int WriteAttempts { get; private set; } + + public FlakeyDriver(int failReadsBeforeIndex = 0, int failWritesBeforeIndex = 0) + { + 
_failReadsBeforeIndex = failReadsBeforeIndex; + _failWritesBeforeIndex = failWritesBeforeIndex; + } + + public Task> ReadAsync( + IReadOnlyList fullReferences, + CancellationToken cancellationToken) + { + var attempt = ++ReadAttempts; + if (attempt <= _failReadsBeforeIndex) + throw new InvalidOperationException($"transient read failure #{attempt}"); + + var now = DateTime.UtcNow; + IReadOnlyList result = fullReferences + .Select(_ => new DataValueSnapshot(Value: 0, StatusCode: 0u, SourceTimestampUtc: now, ServerTimestampUtc: now)) + .ToList(); + return Task.FromResult(result); + } + + public Task> WriteAsync( + IReadOnlyList writes, + CancellationToken cancellationToken) + { + var attempt = ++WriteAttempts; + if (attempt <= _failWritesBeforeIndex) + throw new InvalidOperationException($"transient write failure #{attempt}"); + + IReadOnlyList result = writes.Select(_ => new WriteResult(0u)).ToList(); + return Task.FromResult(result); + } + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusDriverTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusDriverTests.cs index 0b31fd2..48bb565 100644 --- a/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusDriverTests.cs +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusDriverTests.cs @@ -220,6 +220,23 @@ public sealed class ModbusDriverTests builder.Variables.ShouldContain(v => v.BrowseName == "Run" && v.Info.DriverDataType == DriverDataType.Boolean); } + [Fact] + public async Task Discover_propagates_WriteIdempotent_from_tag_to_attribute_info() + { + var (drv, _) = NewDriver( + new ModbusTagDefinition("SetPoint", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Float32, WriteIdempotent: true), + new ModbusTagDefinition("PulseCoil", ModbusRegion.Coils, 0, ModbusDataType.Bool)); + await drv.InitializeAsync("{}", CancellationToken.None); + + var builder = new RecordingBuilder(); + await drv.DiscoverAsync(builder, CancellationToken.None); + + var setPoint = builder.Variables.Single(v => 
v.BrowseName == "SetPoint"); + var pulse = builder.Variables.Single(v => v.BrowseName == "PulseCoil"); + setPoint.Info.WriteIdempotent.ShouldBeTrue(); + pulse.Info.WriteIdempotent.ShouldBeFalse("default is opt-in per decision #44"); + } + // --- helpers --- private sealed class RecordingBuilder : IAddressSpaceBuilder diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.S7.Tests/S7DiscoveryAndSubscribeTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.S7.Tests/S7DiscoveryAndSubscribeTests.cs index 339369b..dc9d7dc 100644 --- a/tests/ZB.MOM.WW.OtOpcUa.Driver.S7.Tests/S7DiscoveryAndSubscribeTests.cs +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.S7.Tests/S7DiscoveryAndSubscribeTests.cs @@ -65,6 +65,27 @@ public sealed class S7DiscoveryAndSubscribeTests builder.Variables[2].Attr.DriverDataType.ShouldBe(DriverDataType.Float32); } + [Fact] + public async Task DiscoverAsync_propagates_WriteIdempotent_from_tag_to_attribute_info() + { + var opts = new S7DriverOptions + { + Host = "192.0.2.1", + Tags = + [ + new("SetPoint", "DB1.DBW0", S7DataType.Int16, WriteIdempotent: true), + new("StartBit", "M0.0", S7DataType.Bool), + ], + }; + using var drv = new S7Driver(opts, "s7-idem"); + + var builder = new RecordingAddressSpaceBuilder(); + await drv.DiscoverAsync(builder, TestContext.Current.CancellationToken); + + builder.Variables.Single(v => v.Name == "SetPoint").Attr.WriteIdempotent.ShouldBeTrue(); + builder.Variables.Single(v => v.Name == "StartBit").Attr.WriteIdempotent.ShouldBeFalse("default is opt-in per decision #44"); + } + [Fact] public void GetHostStatuses_returns_one_row_with_host_port_identity_pre_init() {