diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverCapability.cs b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverCapability.cs
new file mode 100644
index 0000000..79dfeac
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverCapability.cs
@@ -0,0 +1,42 @@
+namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions;
+
+/// <summary>
+/// Enumerates the driver-capability surface points guarded by Phase 6.1 resilience pipelines.
+/// Each value corresponds to one method (or tightly-related method group) on the
+/// Core.Abstractions capability interfaces: those covering read, write, discovery,
+/// subscription, probing, alarm subscription/acknowledgement, and
+/// history read.
+/// </summary>
+/// <remarks>
+/// Per <c>docs/v2/plan.md</c> decision #143 (per-capability retry policy): Read / HistoryRead /
+/// Discover / Probe / AlarmSubscribe auto-retry; <see cref="DriverCapability.Write"/> does NOT retry unless the
+/// tag-definition carries <see cref="WriteIdempotentAttribute"/>. Alarm-acknowledge is treated
+/// as a write for retry semantics (an alarm-ack is not idempotent at the plant-floor acknowledgement
+/// level even if the OPC UA spec permits re-issue).
+/// </remarks>
+public enum DriverCapability
+{
+    /// <summary>Batch read. Retries by default.</summary>
+    Read,
+
+    /// <summary>Batch write. Does not retry unless tag is idempotent.</summary>
+    Write,
+
+    /// <summary>Discovery. Retries by default.</summary>
+    Discover,
+
+    /// <summary>Subscribe and unsubscribe. Retries by default.</summary>
+    Subscribe,
+
+    /// <summary>Host probe loop. Retries by default.</summary>
+    Probe,
+
+    /// <summary>Alarm subscription. Retries by default.</summary>
+    AlarmSubscribe,
+
+    /// <summary>Alarm acknowledgement. Does NOT retry — ack is a write-shaped operation (decision #143).</summary>
+    AlarmAcknowledge,
+
+    /// <summary>History reads (Raw/Processed/AtTime/Events). Retries by default.</summary>
+    HistoryRead,
+}
diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverTier.cs b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverTier.cs
new file mode 100644
index 0000000..92d72da
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverTier.cs
@@ -0,0 +1,34 @@
+namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions;
+
+/// <summary>
+/// Stability tier of a driver type. Determines which cross-cutting runtime protections
+/// apply — per-tier retry defaults, memory-tracking thresholds, and whether out-of-process
+/// supervision with process-level recycle is in play.
+/// </summary>
+/// <remarks>
+/// <para>Per <c>docs/v2/driver-stability.md</c> §2-4 and <c>docs/v2/plan.md</c> decisions #63-74.</para>
+///
+/// <list type="bullet">
+///   <item><b>A</b> — managed, known-good SDK; low blast radius. In-process. Fast retries.
+///   Examples: OPC UA Client (OPCFoundation stack), S7 (S7NetPlus).</item>
+///   <item><b>B</b> — native or semi-trusted SDK with an in-process footprint. Examples: Modbus.</item>
+///   <item><b>C</b> — unmanaged SDK with COM/STA constraints, leak risk, or other out-of-process
+///   requirements. Must run as a separate Host process behind a Proxy with a supervisor that
+///   can recycle the process on hard-breach. Example: Galaxy (MXAccess COM).</item>
+/// </list>
+///
+/// <para>Process-kill protections (<c>MemoryRecycle</c>, <c>ScheduledRecycleScheduler</c>) are
+/// Tier C only per decisions #73-74 and #145 — killing an in-process Tier A/B driver also kills
+/// every OPC UA session and every co-hosted driver, blast-radius worse than the leak.</para>
+/// </remarks>
+public enum DriverTier
+{
+    /// <summary>Managed SDK, in-process, low blast radius.</summary>
+    A,
+
+    /// <summary>Native or semi-trusted SDK, in-process.</summary>
+    B,
+
+    /// <summary>Unmanaged SDK, out-of-process required with Proxy+Host+Supervisor.</summary>
+    C,
+}
diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/WriteIdempotentAttribute.cs b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/WriteIdempotentAttribute.cs
new file mode 100644
index 0000000..1c0c0e6
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/WriteIdempotentAttribute.cs
@@ -0,0 +1,19 @@
+namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions;
+
+/// <summary>
+/// Opts a tag-definition record into auto-retry on write failures.
+/// Absence of this attribute means writes are not retried — a timed-out write may have
+/// already succeeded at the device, and replaying pulses, alarm acks, counter increments, or
+/// recipe-step advances can duplicate irreversible field actions.
+/// </summary>
+/// <remarks>
+/// Per <c>docs/v2/plan.md</c> decisions #44, #45, and #143. Applied to tag-definition POCOs
+/// (e.g. <c>ModbusTagDefinition</c>, <c>S7TagDefinition</c>, OPC UA client tag rows) at the
+/// property or record level. The <c>CapabilityInvoker</c> in <c>ZB.MOM.WW.OtOpcUa.Core.Resilience</c>
+/// reads this attribute via reflection once at driver-init time and caches the result; no
+/// per-write reflection cost.
+/// </remarks>
+[AttributeUsage(AttributeTargets.Property | AttributeTargets.Class | AttributeTargets.Struct, AllowMultiple = false, Inherited = true)]
+public sealed class WriteIdempotentAttribute : Attribute
+{
+}
diff --git a/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/DriverResilienceOptions.cs b/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/DriverResilienceOptions.cs
new file mode 100644
index 0000000..8fe1497
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/DriverResilienceOptions.cs
@@ -0,0 +1,96 @@
+using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
+
+namespace ZB.MOM.WW.OtOpcUa.Core.Resilience;
+
+/// <summary>
+/// Per-tier × per-capability resilience policy configuration for a driver instance.
+/// Bound from <c>DriverInstance.ResilienceConfig</c> JSON (nullable column; null = tier defaults).
+/// Per <c>docs/v2/plan.md</c> decisions #143 and #144.
+/// </summary>
+public sealed record DriverResilienceOptions
+{
+    /// <summary>Tier the owning driver type is registered as; drives the default map.</summary>
+    public required DriverTier Tier { get; init; }
+
+    /// <summary>
+    /// Per-capability policy overrides. Capabilities absent from this map fall back to
+    /// <see cref="GetTierDefaults"/> for the configured <see cref="Tier"/>.
+    /// </summary>
+    public IReadOnlyDictionary<DriverCapability, CapabilityPolicy> CapabilityPolicies { get; init; }
+        = new Dictionary<DriverCapability, CapabilityPolicy>();
+
+    /// <summary>Bulkhead (max concurrent in-flight calls) for every capability. Default 32.</summary>
+    public int BulkheadMaxConcurrent { get; init; } = 32;
+
+    /// <summary>
+    /// Bulkhead queue depth. Zero = no queueing; overflow fails fast with
+    /// <c>BulkheadRejectedException</c>. Default 64.
+    /// </summary>
+    public int BulkheadMaxQueue { get; init; } = 64;
+
+    /// <summary>
+    /// Look up the effective policy for a capability, falling back to tier defaults when no
+    /// override is configured. Never returns null.
+    /// </summary>
+    public CapabilityPolicy Resolve(DriverCapability capability)
+    {
+        if (CapabilityPolicies.TryGetValue(capability, out var policy))
+            return policy;
+
+        var defaults = GetTierDefaults(Tier);
+        return defaults[capability];
+    }
+
+    /// <summary>
+    /// Per-tier per-capability default policy table, per decisions #143-144 and the Phase 6.1
+    /// Stream A.2 specification. Retries skipped on <see cref="DriverCapability.Write"/> and
+    /// <see cref="DriverCapability.AlarmAcknowledge"/> regardless of tier.
+    /// </summary>
+    public static IReadOnlyDictionary<DriverCapability, CapabilityPolicy> GetTierDefaults(DriverTier tier) =>
+        tier switch
+        {
+            DriverTier.A => new Dictionary<DriverCapability, CapabilityPolicy>
+            {
+                [DriverCapability.Read] = new(TimeoutSeconds: 2, RetryCount: 3, BreakerFailureThreshold: 5),
+                [DriverCapability.Write] = new(TimeoutSeconds: 2, RetryCount: 0, BreakerFailureThreshold: 5),
+                [DriverCapability.Discover] = new(TimeoutSeconds: 30, RetryCount: 2, BreakerFailureThreshold: 3),
+                [DriverCapability.Subscribe] = new(TimeoutSeconds: 5, RetryCount: 3, BreakerFailureThreshold: 5),
+                [DriverCapability.Probe] = new(TimeoutSeconds: 2, RetryCount: 3, BreakerFailureThreshold: 5),
+                [DriverCapability.AlarmSubscribe] = new(TimeoutSeconds: 5, RetryCount: 3, BreakerFailureThreshold: 5),
+                [DriverCapability.AlarmAcknowledge] = new(TimeoutSeconds: 5, RetryCount: 0, BreakerFailureThreshold: 5),
+                [DriverCapability.HistoryRead] = new(TimeoutSeconds: 30, RetryCount: 2, BreakerFailureThreshold: 5),
+            },
+            DriverTier.B => new Dictionary<DriverCapability, CapabilityPolicy>
+            {
+                [DriverCapability.Read] = new(TimeoutSeconds: 4, RetryCount: 3, BreakerFailureThreshold: 5),
+                [DriverCapability.Write] = new(TimeoutSeconds: 4, RetryCount: 0, BreakerFailureThreshold: 5),
+                [DriverCapability.Discover] = new(TimeoutSeconds: 60, RetryCount: 2, BreakerFailureThreshold: 3),
+                [DriverCapability.Subscribe] = new(TimeoutSeconds: 8, RetryCount: 3, BreakerFailureThreshold: 5),
+                [DriverCapability.Probe] = new(TimeoutSeconds: 4, RetryCount: 3, BreakerFailureThreshold: 5),
+                [DriverCapability.AlarmSubscribe] = new(TimeoutSeconds: 8, RetryCount: 3, BreakerFailureThreshold: 5),
+                [DriverCapability.AlarmAcknowledge] = new(TimeoutSeconds: 8, RetryCount: 0, BreakerFailureThreshold: 5),
+                [DriverCapability.HistoryRead] = new(TimeoutSeconds: 60, RetryCount: 2, BreakerFailureThreshold: 5),
+            },
+            DriverTier.C => new Dictionary<DriverCapability, CapabilityPolicy>
+            {
+                [DriverCapability.Read] = new(TimeoutSeconds: 10, RetryCount: 1, BreakerFailureThreshold: 0),
+                [DriverCapability.Write] = new(TimeoutSeconds: 10, RetryCount: 0, BreakerFailureThreshold: 0),
+                [DriverCapability.Discover] = new(TimeoutSeconds: 120, RetryCount: 1, BreakerFailureThreshold: 0),
+                [DriverCapability.Subscribe] = new(TimeoutSeconds: 15, RetryCount: 1, BreakerFailureThreshold: 0),
+                [DriverCapability.Probe] = new(TimeoutSeconds: 10, RetryCount: 1, BreakerFailureThreshold: 0),
+                [DriverCapability.AlarmSubscribe] = new(TimeoutSeconds: 15, RetryCount: 1, BreakerFailureThreshold: 0),
+                [DriverCapability.AlarmAcknowledge] = new(TimeoutSeconds: 15, RetryCount: 0, BreakerFailureThreshold: 0),
+                [DriverCapability.HistoryRead] = new(TimeoutSeconds: 120, RetryCount: 1, BreakerFailureThreshold: 0),
+            },
+            _ => throw new ArgumentOutOfRangeException(nameof(tier), tier, $"No default policy table defined for tier {tier}."),
+        };
+}
+
+/// <summary>Policy for one capability on one driver instance.</summary>
+/// <param name="TimeoutSeconds">Per-call timeout (wraps the inner Polly execution).</param>
+/// <param name="RetryCount">Number of retry attempts after the first failure; zero = no retry.</param>
+/// <param name="BreakerFailureThreshold">
+/// Consecutive-failure count that opens the circuit breaker (1 is treated as 2); zero = no breaker
+/// (Tier C uses the supervisor's process-level breaker instead, per decision #68).
+/// </param>
+public sealed record CapabilityPolicy(int TimeoutSeconds, int RetryCount, int BreakerFailureThreshold);
diff --git a/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/DriverResiliencePipelineBuilder.cs b/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/DriverResiliencePipelineBuilder.cs
new file mode 100644
index 0000000..b3ad043
--- /dev/null
+++ b/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/DriverResiliencePipelineBuilder.cs
@@ -0,0 +1,118 @@
+using System.Collections.Concurrent;
+using Polly;
+using Polly.CircuitBreaker;
+using Polly.Retry;
+using Polly.Timeout;
+using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
+
+namespace ZB.MOM.WW.OtOpcUa.Core.Resilience;
+
+/// <summary>
+/// Builds and caches Polly resilience pipelines keyed on
+/// <c>(DriverInstanceId, HostName, DriverCapability)</c>. One dead PLC behind a multi-device
+/// driver cannot open the circuit breaker for healthy sibling hosts.
+/// </summary>
+/// <remarks>
+/// Per <c>docs/v2/plan.md</c> decision #144 (per-device isolation). Composition from outside-in:
+/// <b>Timeout → Retry (when capability permits) → Circuit Breaker (when tier permits)</b>; no bulkhead stage is added here.
+/// NOTE(review): with the timeout outermost, retry/breaker likely see a timed-out attempt only as a cancellation, which both ignore — confirm intended.
+/// <para>Pipeline resolution is lock-free on the hot path: the inner
+/// <see cref="ConcurrentDictionary{TKey,TValue}"/> caches a <see cref="ResiliencePipeline"/> per key;
+/// first-call cost is one <see cref="ResiliencePipelineBuilder"/>.Build. Thereafter reads are O(1).</para>
+/// </remarks>
+public sealed class DriverResiliencePipelineBuilder
+{
+    private readonly ConcurrentDictionary<PipelineKey, ResiliencePipeline> _pipelines = new();
+    private readonly TimeProvider _timeProvider;
+
+    /// <summary>Construct with the ambient clock (use in prod).</summary>
+    public DriverResiliencePipelineBuilder(TimeProvider? timeProvider = null)
+    {
+        _timeProvider = timeProvider ?? TimeProvider.System;
+    }
+
+    /// <summary>
+    /// Get or build the pipeline for a given (driver instance, host, capability) triple.
+    /// Calls with the same key reuse the cached pipeline; <paramref name="options"/> is read only when the
+    /// pipeline is first built, so call <see cref="Invalidate"/> after a config change. One pipeline is kept if callers race.
+    /// </summary>
+    /// <param name="driverInstanceId">DriverInstance primary key — opaque to this layer.</param>
+    /// <param name="hostName">
+    /// Host the call targets. For single-host drivers (Galaxy, some OPC UA Client configs) pass the
+    /// driver's canonical host string. For multi-host drivers (Modbus with N PLCs), pass the
+    /// specific PLC so one dead PLC doesn't poison healthy siblings.
+    /// </param>
+    /// <param name="capability">Which capability surface is being called.</param>
+    /// <param name="options">Per-driver-instance options (tier + per-capability overrides).</param>
+    public ResiliencePipeline GetOrCreate(
+        Guid driverInstanceId,
+        string hostName,
+        DriverCapability capability,
+        DriverResilienceOptions options)
+    {
+        ArgumentNullException.ThrowIfNull(options);
+        ArgumentException.ThrowIfNullOrWhiteSpace(hostName);
+
+        var key = new PipelineKey(driverInstanceId, hostName, capability);
+        return _pipelines.GetOrAdd(key, static (_, state) => Build(state.capability, state.options, state.timeProvider),
+            (capability, options, timeProvider: _timeProvider));
+    }
+
+    /// <summary>Drop cached pipelines for one driver instance (e.g. on ResilienceConfig change). Test + Admin-reload use.</summary>
+    public int Invalidate(Guid driverInstanceId)
+    {
+        var removed = 0;
+        foreach (var key in _pipelines.Keys)
+        {
+            if (key.DriverInstanceId == driverInstanceId && _pipelines.TryRemove(key, out _))
+                removed++;
+        }
+        return removed;
+    }
+
+    /// <summary>Snapshot of the current number of cached pipelines. For diagnostics only.</summary>
+    public int CachedPipelineCount => _pipelines.Count;
+
+    private static ResiliencePipeline Build(
+        DriverCapability capability,
+        DriverResilienceOptions options,
+        TimeProvider timeProvider)
+    {
+        var policy = options.Resolve(capability);
+        var builder = new ResiliencePipelineBuilder { TimeProvider = timeProvider };
+
+        builder.AddTimeout(new TimeoutStrategyOptions
+        {
+            Timeout = TimeSpan.FromSeconds(policy.TimeoutSeconds),
+        });
+
+        if (policy.RetryCount > 0)
+        {
+            builder.AddRetry(new RetryStrategyOptions
+            {
+                MaxRetryAttempts = policy.RetryCount,
+                BackoffType = DelayBackoffType.Exponential,
+                UseJitter = true,
+                Delay = TimeSpan.FromMilliseconds(100),
+                MaxDelay = TimeSpan.FromSeconds(5),
+                ShouldHandle = new PredicateBuilder().Handle<Exception>(ex => ex is not OperationCanceledException),
+            });
+        }
+
+        if (policy.BreakerFailureThreshold > 0)
+        {
+            builder.AddCircuitBreaker(new CircuitBreakerStrategyOptions
+            {
+                FailureRatio = 1.0,
+                MinimumThroughput = Math.Max(2, policy.BreakerFailureThreshold), // Polly rejects MinimumThroughput below 2
+                SamplingDuration = TimeSpan.FromSeconds(30),
+                BreakDuration = TimeSpan.FromSeconds(15),
+                ShouldHandle = new PredicateBuilder().Handle<Exception>(ex => ex is not OperationCanceledException),
+            });
+        }
+
+        return builder.Build();
+    }
+
+    private readonly record struct PipelineKey(Guid DriverInstanceId, string HostName, DriverCapability Capability);
+}
diff --git a/src/ZB.MOM.WW.OtOpcUa.Core/ZB.MOM.WW.OtOpcUa.Core.csproj b/src/ZB.MOM.WW.OtOpcUa.Core/ZB.MOM.WW.OtOpcUa.Core.csproj
index 8e48171..805bcff 100644
--- a/src/ZB.MOM.WW.OtOpcUa.Core/ZB.MOM.WW.OtOpcUa.Core.csproj
+++ b/src/ZB.MOM.WW.OtOpcUa.Core/ZB.MOM.WW.OtOpcUa.Core.csproj
@@ -16,6 +16,10 @@
+
+
+
+
diff --git a/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/DriverResilienceOptionsTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/DriverResilienceOptionsTests.cs
new file mode 100644
index 0000000..6d23681
--- /dev/null
+++ b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/DriverResilienceOptionsTests.cs
@@ -0,0 +1,102 @@
+using Shouldly;
+using Xunit;
+using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
+using ZB.MOM.WW.OtOpcUa.Core.Resilience;
+
+namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience;
+
+[Trait("Category", "Unit")]
+public sealed class DriverResilienceOptionsTests
+{
+    [Theory]
+    [InlineData(DriverTier.A)]
+    [InlineData(DriverTier.B)]
+    [InlineData(DriverTier.C)]
+    public void TierDefaults_Cover_EveryCapability(DriverTier tier)
+    {
+        var defaults = DriverResilienceOptions.GetTierDefaults(tier);
+
+        foreach (var capability in Enum.GetValues<DriverCapability>())
+            defaults.ShouldContainKey(capability);
+    }
+
+    [Theory]
+    [InlineData(DriverTier.A)]
+    [InlineData(DriverTier.B)]
+    [InlineData(DriverTier.C)]
+    public void Write_NeverRetries_ByDefault(DriverTier tier)
+    {
+        var defaults = DriverResilienceOptions.GetTierDefaults(tier);
+        defaults[DriverCapability.Write].RetryCount.ShouldBe(0);
+    }
+
+    [Theory]
+    [InlineData(DriverTier.A)]
+    [InlineData(DriverTier.B)]
+    [InlineData(DriverTier.C)]
+    public void AlarmAcknowledge_NeverRetries_ByDefault(DriverTier tier)
+    {
+        var defaults = DriverResilienceOptions.GetTierDefaults(tier);
+        defaults[DriverCapability.AlarmAcknowledge].RetryCount.ShouldBe(0);
+    }
+
+    [Theory]
+    [InlineData(DriverTier.A, DriverCapability.Read)]
+    [InlineData(DriverTier.A, DriverCapability.HistoryRead)]
+    [InlineData(DriverTier.B, DriverCapability.Discover)]
+    [InlineData(DriverTier.B, DriverCapability.Probe)]
+    [InlineData(DriverTier.C, DriverCapability.AlarmSubscribe)]
+    public void IdempotentCapabilities_Retry_ByDefault(DriverTier tier, DriverCapability capability)
+    {
+        var defaults = DriverResilienceOptions.GetTierDefaults(tier);
+        defaults[capability].RetryCount.ShouldBeGreaterThan(0);
+    }
+
+    [Fact]
+    public void TierC_DisablesCircuitBreaker_DeferringToSupervisor()
+    {
+        var defaults = DriverResilienceOptions.GetTierDefaults(DriverTier.C);
+
+        foreach (var (_, policy) in defaults)
+            policy.BreakerFailureThreshold.ShouldBe(0, "Tier C breaker is handled by the Proxy supervisor (decision #68)");
+    }
+
+    [Theory]
+    [InlineData(DriverTier.A)]
+    [InlineData(DriverTier.B)]
+    public void TierAAndB_EnableCircuitBreaker(DriverTier tier)
+    {
+        var defaults = DriverResilienceOptions.GetTierDefaults(tier);
+
+        foreach (var (_, policy) in defaults)
+            policy.BreakerFailureThreshold.ShouldBeGreaterThan(0);
+    }
+
+    [Fact]
+    public void Resolve_Uses_TierDefaults_When_NoOverride()
+    {
+        var options = new DriverResilienceOptions { Tier = DriverTier.A };
+
+        var resolved = options.Resolve(DriverCapability.Read);
+
+        resolved.ShouldBe(DriverResilienceOptions.GetTierDefaults(DriverTier.A)[DriverCapability.Read]);
+    }
+
+    [Fact]
+    public void Resolve_Uses_Override_When_Configured()
+    {
+        var custom = new CapabilityPolicy(TimeoutSeconds: 42, RetryCount: 7, BreakerFailureThreshold: 9);
+        var options = new DriverResilienceOptions
+        {
+            Tier = DriverTier.A,
+            CapabilityPolicies = new Dictionary<DriverCapability, CapabilityPolicy>
+            {
+                [DriverCapability.Read] = custom,
+            },
+        };
+
+        options.Resolve(DriverCapability.Read).ShouldBe(custom);
+        options.Resolve(DriverCapability.Write).ShouldBe(
+            DriverResilienceOptions.GetTierDefaults(DriverTier.A)[DriverCapability.Write]);
+    }
+}
diff --git a/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/DriverResiliencePipelineBuilderTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/DriverResiliencePipelineBuilderTests.cs
new file mode 100644
index 0000000..67a3d1d
--- /dev/null
+++ b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/DriverResiliencePipelineBuilderTests.cs
@@ -0,0 +1,222 @@
+using Polly.CircuitBreaker;
+using Polly.Timeout;
+using Shouldly;
+using Xunit;
+using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
+using ZB.MOM.WW.OtOpcUa.Core.Resilience;
+
+namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience;
+
+[Trait("Category", "Unit")]
+public sealed class DriverResiliencePipelineBuilderTests
+{
+    private static readonly DriverResilienceOptions TierAOptions = new() { Tier = DriverTier.A };
+
+    [Fact]
+    public async Task Read_Retries_Transient_Failures()
+    {
+        var builder = new DriverResiliencePipelineBuilder();
+        var pipeline = builder.GetOrCreate(Guid.NewGuid(), "host-1", DriverCapability.Read, TierAOptions);
+        var attempts = 0;
+
+        await pipeline.ExecuteAsync(async _ =>
+        {
+            attempts++;
+            if (attempts < 3) throw new InvalidOperationException("transient");
+            await Task.Yield();
+        });
+
+        attempts.ShouldBe(3);
+    }
+
+    [Fact]
+    public async Task Write_DoesNotRetry_OnFailure()
+    {
+        var builder = new DriverResiliencePipelineBuilder();
+        var pipeline = builder.GetOrCreate(Guid.NewGuid(), "host-1", DriverCapability.Write, TierAOptions);
+        var attempts = 0;
+
+        var ex = await Should.ThrowAsync<InvalidOperationException>(async () =>
+        {
+            await pipeline.ExecuteAsync(async _ =>
+            {
+                attempts++;
+                await Task.Yield();
+                throw new InvalidOperationException("boom");
+            });
+        });
+
+        attempts.ShouldBe(1);
+        ex.Message.ShouldBe("boom");
+    }
+
+    [Fact]
+    public async Task AlarmAcknowledge_DoesNotRetry_OnFailure()
+    {
+        var builder = new DriverResiliencePipelineBuilder();
+        var pipeline = builder.GetOrCreate(Guid.NewGuid(), "host-1", DriverCapability.AlarmAcknowledge, TierAOptions);
+        var attempts = 0;
+
+        await Should.ThrowAsync<InvalidOperationException>(async () =>
+        {
+            await pipeline.ExecuteAsync(async _ =>
+            {
+                attempts++;
+                await Task.Yield();
+                throw new InvalidOperationException("boom");
+            });
+        });
+
+        attempts.ShouldBe(1);
+    }
+
+    [Fact]
+    public void Pipeline_IsIsolated_PerHost()
+    {
+        var builder = new DriverResiliencePipelineBuilder();
+        var driverId = Guid.NewGuid();
+
+        var hostA = builder.GetOrCreate(driverId, "host-a", DriverCapability.Read, TierAOptions);
+        var hostB = builder.GetOrCreate(driverId, "host-b", DriverCapability.Read, TierAOptions);
+
+        hostA.ShouldNotBeSameAs(hostB);
+        builder.CachedPipelineCount.ShouldBe(2);
+    }
+
+    [Fact]
+    public void Pipeline_IsReused_ForSameTriple()
+    {
+        var builder = new DriverResiliencePipelineBuilder();
+        var driverId = Guid.NewGuid();
+
+        var first = builder.GetOrCreate(driverId, "host-a", DriverCapability.Read, TierAOptions);
+        var second = builder.GetOrCreate(driverId, "host-a", DriverCapability.Read, TierAOptions);
+
+        first.ShouldBeSameAs(second);
+        builder.CachedPipelineCount.ShouldBe(1);
+    }
+
+    [Fact]
+    public void Pipeline_IsIsolated_PerCapability()
+    {
+        var builder = new DriverResiliencePipelineBuilder();
+        var driverId = Guid.NewGuid();
+
+        var read = builder.GetOrCreate(driverId, "host-a", DriverCapability.Read, TierAOptions);
+        var write = builder.GetOrCreate(driverId, "host-a", DriverCapability.Write, TierAOptions);
+
+        read.ShouldNotBeSameAs(write);
+    }
+
+    [Fact]
+    public async Task DeadHost_DoesNotOpenBreaker_ForSiblingHost()
+    {
+        var builder = new DriverResiliencePipelineBuilder();
+        var driverId = Guid.NewGuid();
+
+        var deadHost = builder.GetOrCreate(driverId, "dead-plc", DriverCapability.Read, TierAOptions);
+        var liveHost = builder.GetOrCreate(driverId, "live-plc", DriverCapability.Read, TierAOptions);
+
+        var threshold = TierAOptions.Resolve(DriverCapability.Read).BreakerFailureThreshold;
+        for (var i = 0; i < threshold + 5; i++)
+        {
+            await Should.ThrowAsync<Exception>(async () =>
+                await deadHost.ExecuteAsync(async _ =>
+                {
+                    await Task.Yield();
+                    throw new InvalidOperationException("dead plc");
+                }));
+        }
+
+        var liveAttempts = 0;
+        await liveHost.ExecuteAsync(async _ =>
+        {
+            liveAttempts++;
+            await Task.Yield();
+        });
+
+        liveAttempts.ShouldBe(1, "healthy sibling host must not be affected by dead peer");
+    }
+
+    [Fact]
+    public async Task CircuitBreaker_Opens_AfterFailureThreshold_OnTierA()
+    {
+        var builder = new DriverResiliencePipelineBuilder();
+        var pipeline = builder.GetOrCreate(Guid.NewGuid(), "host-1", DriverCapability.Write, TierAOptions);
+
+        var threshold = TierAOptions.Resolve(DriverCapability.Write).BreakerFailureThreshold;
+        for (var i = 0; i < threshold; i++)
+        {
+            await Should.ThrowAsync<InvalidOperationException>(async () =>
+                await pipeline.ExecuteAsync(async _ =>
+                {
+                    await Task.Yield();
+                    throw new InvalidOperationException("boom");
+                }));
+        }
+
+        await Should.ThrowAsync<BrokenCircuitException>(async () =>
+            await pipeline.ExecuteAsync(async _ =>
+            {
+                await Task.Yield();
+            }));
+    }
+
+    [Fact]
+    public async Task Timeout_Cancels_SlowOperation()
+    {
+        var tierAWithShortTimeout = new DriverResilienceOptions
+        {
+            Tier = DriverTier.A,
+            CapabilityPolicies = new Dictionary<DriverCapability, CapabilityPolicy>
+            {
+                [DriverCapability.Read] = new(TimeoutSeconds: 1, RetryCount: 0, BreakerFailureThreshold: 5),
+            },
+        };
+        var builder = new DriverResiliencePipelineBuilder();
+        var pipeline = builder.GetOrCreate(Guid.NewGuid(), "host-1", DriverCapability.Read, tierAWithShortTimeout);
+
+        await Should.ThrowAsync<TimeoutRejectedException>(async () =>
+            await pipeline.ExecuteAsync(async ct =>
+            {
+                await Task.Delay(TimeSpan.FromSeconds(5), ct);
+            }));
+    }
+
+    [Fact]
+    public void Invalidate_Removes_OnlyMatchingInstance()
+    {
+        var builder = new DriverResiliencePipelineBuilder();
+        var keepId = Guid.NewGuid();
+        var dropId = Guid.NewGuid();
+
+        builder.GetOrCreate(keepId, "h", DriverCapability.Read, TierAOptions);
+        builder.GetOrCreate(keepId, "h", DriverCapability.Write, TierAOptions);
+        builder.GetOrCreate(dropId, "h", DriverCapability.Read, TierAOptions);
+
+        var removed = builder.Invalidate(dropId);
+
+        removed.ShouldBe(1);
+        builder.CachedPipelineCount.ShouldBe(2);
+    }
+
+    [Fact]
+    public async Task Cancellation_IsNot_Retried()
+    {
+        var builder = new DriverResiliencePipelineBuilder();
+        var pipeline = builder.GetOrCreate(Guid.NewGuid(), "host-1", DriverCapability.Read, TierAOptions);
+        var attempts = 0;
+        using var cts = new CancellationTokenSource();
+        cts.Cancel();
+
+        await Should.ThrowAsync<OperationCanceledException>(async () =>
+            await pipeline.ExecuteAsync(async ct =>
+            {
+                attempts++;
+                ct.ThrowIfCancellationRequested();
+                await Task.Yield();
+            }, cts.Token));
+
+        attempts.ShouldBeLessThanOrEqualTo(1);
+    }
+}