Phase 6.1 Stream A.1/A.2/A.6 — Polly resilience foundation: pipeline builder + per-tier policy defaults + WriteIdempotent attribute
Lands the first chunk of the Phase 6.1 Stream A resilience layer per docs/v2/implementation/phase-6-1-resilience-and-observability.md §Stream A. Downstream CapabilityInvoker (A.3) + driver-dispatch wiring land in follow-up PRs on the same branch. Core.Abstractions additions: - WriteIdempotentAttribute — marker for tag-definition records that opt into auto-retry on IWritable.WriteAsync. Absence = no retry per decisions #44, #45, #143. Read once via reflection at driver-init time; no per-write cost. - DriverCapability enum — enumerates the 8 capability surface points (Read / Write / Discover / Subscribe / Probe / AlarmSubscribe / AlarmAcknowledge / HistoryRead). AlarmAcknowledge is write-shaped (no retry by default). - DriverTier enum — A/B/C per driver-stability.md §2-4. Stream B.1 wires this into DriverTypeMetadata; surfaced here because the resilience policy defaults key on it. Core.Resilience new namespace: - DriverResilienceOptions — per-tier × per-capability policy defaults. GetTierDefaults(tier) is the source of truth: * Tier A: Read 2s/3 retries, Write 2s/0 retries, breaker threshold 5 * Tier B: Read 4s/3, Write 4s/0, breaker threshold 5 * Tier C: Read 10s/1, Write 10s/0, breaker threshold 0 (supervisor handles process-level breaker per decision #68) Resolve(capability) overlays CapabilityPolicies on top of the defaults. - DriverResiliencePipelineBuilder — composes Timeout → Retry (capability- permitting, never on cancellation) → CircuitBreaker (tier-permitting) → Bulkhead. Pipelines cached in a lock-free ConcurrentDictionary keyed on (DriverInstanceId, HostName, DriverCapability) per decision #144 — one dead PLC behind a multi-device driver does not open the breaker for healthy siblings. Invalidate(driverInstanceId) supports Admin-triggered reload. Tests (30 new, all pass): - DriverResilienceOptionsTests: tier-default coverage for every capability, Write + AlarmAcknowledge never retry at any tier, Tier C disables breaker, resolve-with-override layering. - DriverResiliencePipelineBuilderTests: Read retries transients, Write does NOT retry on failure (decision #44 guard), dead-host isolation from sibling hosts, pipeline reuse for same triple, per-capability isolation, breaker opens after threshold on Tier A, timeout fires, cancellation is not retried, invalidation scoped to matching instance. Polly.Core 8.6.6 added to Core.csproj. Full solution dotnet test: 936 passing (baseline 906 + 30 new). One pre-existing Client.CLI Subscribe flake unchanged by this PR. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
42
src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverCapability.cs
Normal file
42
src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverCapability.cs
Normal file
@@ -0,0 +1,42 @@
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
/// <summary>
|
||||
/// Enumerates the driver-capability surface points guarded by Phase 6.1 resilience pipelines.
|
||||
/// Each value corresponds to one method (or tightly-related method group) on the
|
||||
/// <c>Core.Abstractions</c> capability interfaces (<see cref="IReadable"/>, <see cref="IWritable"/>,
|
||||
/// <see cref="ITagDiscovery"/>, <see cref="ISubscribable"/>, <see cref="IHostConnectivityProbe"/>,
|
||||
/// <see cref="IAlarmSource"/>, <see cref="IHistoryProvider"/>).
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Per <c>docs/v2/plan.md</c> decision #143 (per-capability retry policy): Read / HistoryRead /
|
||||
/// Discover / Probe / AlarmSubscribe auto-retry; <see cref="Write"/> does NOT retry unless the
|
||||
/// tag-definition carries <see cref="WriteIdempotentAttribute"/>. Alarm-acknowledge is treated
|
||||
/// as a write for retry semantics (an alarm-ack is not idempotent at the plant-floor acknowledgement
|
||||
/// level even if the OPC UA spec permits re-issue).
|
||||
/// </remarks>
|
||||
public enum DriverCapability
|
||||
{
|
||||
/// <summary>Batch <see cref="IReadable.ReadAsync"/>. Retries by default.</summary>
|
||||
Read,
|
||||
|
||||
/// <summary>Batch <see cref="IWritable.WriteAsync"/>. Does not retry unless tag is <see cref="WriteIdempotentAttribute">idempotent</see>.</summary>
|
||||
Write,
|
||||
|
||||
/// <summary><see cref="ITagDiscovery.DiscoverAsync"/>. Retries by default.</summary>
|
||||
Discover,
|
||||
|
||||
/// <summary><see cref="ISubscribable.SubscribeAsync"/> and unsubscribe. Retries by default.</summary>
|
||||
Subscribe,
|
||||
|
||||
/// <summary><see cref="IHostConnectivityProbe"/> probe loop. Retries by default.</summary>
|
||||
Probe,
|
||||
|
||||
/// <summary><see cref="IAlarmSource.SubscribeAlarmsAsync"/>. Retries by default.</summary>
|
||||
AlarmSubscribe,
|
||||
|
||||
/// <summary><see cref="IAlarmSource.AcknowledgeAsync"/>. Does NOT retry — ack is a write-shaped operation (decision #143).</summary>
|
||||
AlarmAcknowledge,
|
||||
|
||||
/// <summary><see cref="IHistoryProvider"/> reads (Raw/Processed/AtTime/Events). Retries by default.</summary>
|
||||
HistoryRead,
|
||||
}
|
||||
34
src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverTier.cs
Normal file
34
src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverTier.cs
Normal file
@@ -0,0 +1,34 @@
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
/// <summary>
|
||||
/// Stability tier of a driver type. Determines which cross-cutting runtime protections
|
||||
/// apply — per-tier retry defaults, memory-tracking thresholds, and whether out-of-process
|
||||
/// supervision with process-level recycle is in play.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Per <c>docs/v2/driver-stability.md</c> §2-4 and <c>docs/v2/plan.md</c> decisions #63-74.
|
||||
///
|
||||
/// <list type="bullet">
|
||||
/// <item><b>A</b> — managed, known-good SDK; low blast radius. In-process. Fast retries.
|
||||
/// Examples: OPC UA Client (OPCFoundation stack), S7 (S7NetPlus).</item>
|
||||
/// <item><b>B</b> — native or semi-trusted SDK with an in-process footprint. Examples: Modbus.</item>
|
||||
/// <item><b>C</b> — unmanaged SDK with COM/STA constraints, leak risk, or other out-of-process
|
||||
/// requirements. Must run as a separate Host process behind a Proxy with a supervisor that
|
||||
/// can recycle the process on hard-breach. Example: Galaxy (MXAccess COM).</item>
|
||||
/// </list>
|
||||
///
|
||||
/// <para>Process-kill protections (<c>MemoryRecycle</c>, <c>ScheduledRecycleScheduler</c>) are
|
||||
/// Tier C only per decisions #73-74 and #145 — killing an in-process Tier A/B driver also kills
|
||||
/// every OPC UA session and every co-hosted driver, blast-radius worse than the leak.</para>
|
||||
/// </remarks>
|
||||
public enum DriverTier
|
||||
{
|
||||
/// <summary>Managed SDK, in-process, low blast radius.</summary>
|
||||
A,
|
||||
|
||||
/// <summary>Native or semi-trusted SDK, in-process.</summary>
|
||||
B,
|
||||
|
||||
/// <summary>Unmanaged SDK, out-of-process required with Proxy+Host+Supervisor.</summary>
|
||||
C,
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
/// <summary>
|
||||
/// Opts a tag-definition record into auto-retry on <see cref="IWritable.WriteAsync"/> failures.
|
||||
/// Absence of this attribute means writes are <b>not</b> retried — a timed-out write may have
|
||||
/// already succeeded at the device, and replaying pulses, alarm acks, counter increments, or
|
||||
/// recipe-step advances can duplicate irreversible field actions.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Per <c>docs/v2/plan.md</c> decisions #44, #45, and #143. Applied to tag-definition POCOs
|
||||
/// (e.g. <c>ModbusTagDefinition</c>, <c>S7TagDefinition</c>, OPC UA client tag rows) at the
|
||||
/// property or record level. The <c>CapabilityInvoker</c> in <c>ZB.MOM.WW.OtOpcUa.Core.Resilience</c>
|
||||
/// reads this attribute via reflection once at driver-init time and caches the result; no
|
||||
/// per-write reflection cost.
|
||||
/// </remarks>
|
||||
[AttributeUsage(AttributeTargets.Property | AttributeTargets.Class | AttributeTargets.Struct, AllowMultiple = false, Inherited = true)]
|
||||
public sealed class WriteIdempotentAttribute : Attribute
|
||||
{
|
||||
}
|
||||
@@ -0,0 +1,96 @@
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Resilience;
|
||||
|
||||
/// <summary>
|
||||
/// Per-tier × per-capability resilience policy configuration for a driver instance.
|
||||
/// Bound from <c>DriverInstance.ResilienceConfig</c> JSON (nullable column; null = tier defaults).
|
||||
/// Per <c>docs/v2/plan.md</c> decisions #143 and #144.
|
||||
/// </summary>
|
||||
public sealed record DriverResilienceOptions
|
||||
{
|
||||
/// <summary>Tier the owning driver type is registered as; drives the default map.</summary>
|
||||
public required DriverTier Tier { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// Per-capability policy overrides. Capabilities absent from this map fall back to
|
||||
/// <see cref="GetTierDefaults(DriverTier)"/> for the configured <see cref="Tier"/>.
|
||||
/// </summary>
|
||||
public IReadOnlyDictionary<DriverCapability, CapabilityPolicy> CapabilityPolicies { get; init; }
|
||||
= new Dictionary<DriverCapability, CapabilityPolicy>();
|
||||
|
||||
/// <summary>Bulkhead (max concurrent in-flight calls) for every capability. Default 32.</summary>
|
||||
public int BulkheadMaxConcurrent { get; init; } = 32;
|
||||
|
||||
/// <summary>
|
||||
/// Bulkhead queue depth. Zero = no queueing; overflow fails fast with
|
||||
/// <c>BulkheadRejectedException</c>. Default 64.
|
||||
/// </summary>
|
||||
public int BulkheadMaxQueue { get; init; } = 64;
|
||||
|
||||
/// <summary>
|
||||
/// Look up the effective policy for a capability, falling back to tier defaults when no
|
||||
/// override is configured. Never returns null.
|
||||
/// </summary>
|
||||
public CapabilityPolicy Resolve(DriverCapability capability)
|
||||
{
|
||||
if (CapabilityPolicies.TryGetValue(capability, out var policy))
|
||||
return policy;
|
||||
|
||||
var defaults = GetTierDefaults(Tier);
|
||||
return defaults[capability];
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Per-tier per-capability default policy table, per decisions #143-144 and the Phase 6.1
|
||||
/// Stream A.2 specification. Retries skipped on <see cref="DriverCapability.Write"/> and
|
||||
/// <see cref="DriverCapability.AlarmAcknowledge"/> regardless of tier.
|
||||
/// </summary>
|
||||
public static IReadOnlyDictionary<DriverCapability, CapabilityPolicy> GetTierDefaults(DriverTier tier) =>
|
||||
tier switch
|
||||
{
|
||||
DriverTier.A => new Dictionary<DriverCapability, CapabilityPolicy>
|
||||
{
|
||||
[DriverCapability.Read] = new(TimeoutSeconds: 2, RetryCount: 3, BreakerFailureThreshold: 5),
|
||||
[DriverCapability.Write] = new(TimeoutSeconds: 2, RetryCount: 0, BreakerFailureThreshold: 5),
|
||||
[DriverCapability.Discover] = new(TimeoutSeconds: 30, RetryCount: 2, BreakerFailureThreshold: 3),
|
||||
[DriverCapability.Subscribe] = new(TimeoutSeconds: 5, RetryCount: 3, BreakerFailureThreshold: 5),
|
||||
[DriverCapability.Probe] = new(TimeoutSeconds: 2, RetryCount: 3, BreakerFailureThreshold: 5),
|
||||
[DriverCapability.AlarmSubscribe] = new(TimeoutSeconds: 5, RetryCount: 3, BreakerFailureThreshold: 5),
|
||||
[DriverCapability.AlarmAcknowledge] = new(TimeoutSeconds: 5, RetryCount: 0, BreakerFailureThreshold: 5),
|
||||
[DriverCapability.HistoryRead] = new(TimeoutSeconds: 30, RetryCount: 2, BreakerFailureThreshold: 5),
|
||||
},
|
||||
DriverTier.B => new Dictionary<DriverCapability, CapabilityPolicy>
|
||||
{
|
||||
[DriverCapability.Read] = new(TimeoutSeconds: 4, RetryCount: 3, BreakerFailureThreshold: 5),
|
||||
[DriverCapability.Write] = new(TimeoutSeconds: 4, RetryCount: 0, BreakerFailureThreshold: 5),
|
||||
[DriverCapability.Discover] = new(TimeoutSeconds: 60, RetryCount: 2, BreakerFailureThreshold: 3),
|
||||
[DriverCapability.Subscribe] = new(TimeoutSeconds: 8, RetryCount: 3, BreakerFailureThreshold: 5),
|
||||
[DriverCapability.Probe] = new(TimeoutSeconds: 4, RetryCount: 3, BreakerFailureThreshold: 5),
|
||||
[DriverCapability.AlarmSubscribe] = new(TimeoutSeconds: 8, RetryCount: 3, BreakerFailureThreshold: 5),
|
||||
[DriverCapability.AlarmAcknowledge] = new(TimeoutSeconds: 8, RetryCount: 0, BreakerFailureThreshold: 5),
|
||||
[DriverCapability.HistoryRead] = new(TimeoutSeconds: 60, RetryCount: 2, BreakerFailureThreshold: 5),
|
||||
},
|
||||
DriverTier.C => new Dictionary<DriverCapability, CapabilityPolicy>
|
||||
{
|
||||
[DriverCapability.Read] = new(TimeoutSeconds: 10, RetryCount: 1, BreakerFailureThreshold: 0),
|
||||
[DriverCapability.Write] = new(TimeoutSeconds: 10, RetryCount: 0, BreakerFailureThreshold: 0),
|
||||
[DriverCapability.Discover] = new(TimeoutSeconds: 120, RetryCount: 1, BreakerFailureThreshold: 0),
|
||||
[DriverCapability.Subscribe] = new(TimeoutSeconds: 15, RetryCount: 1, BreakerFailureThreshold: 0),
|
||||
[DriverCapability.Probe] = new(TimeoutSeconds: 10, RetryCount: 1, BreakerFailureThreshold: 0),
|
||||
[DriverCapability.AlarmSubscribe] = new(TimeoutSeconds: 15, RetryCount: 1, BreakerFailureThreshold: 0),
|
||||
[DriverCapability.AlarmAcknowledge] = new(TimeoutSeconds: 15, RetryCount: 0, BreakerFailureThreshold: 0),
|
||||
[DriverCapability.HistoryRead] = new(TimeoutSeconds: 120, RetryCount: 1, BreakerFailureThreshold: 0),
|
||||
},
|
||||
_ => throw new ArgumentOutOfRangeException(nameof(tier), tier, $"No default policy table defined for tier {tier}."),
|
||||
};
|
||||
}
|
||||
|
||||
/// <summary>Policy for one capability on one driver instance.</summary>
|
||||
/// <param name="TimeoutSeconds">Per-call timeout (wraps the inner Polly execution).</param>
|
||||
/// <param name="RetryCount">Number of retry attempts after the first failure; zero = no retry.</param>
|
||||
/// <param name="BreakerFailureThreshold">
|
||||
/// Consecutive-failure count that opens the circuit breaker; zero = no breaker
|
||||
/// (Tier C uses the supervisor's process-level breaker instead, per decision #68).
|
||||
/// </param>
|
||||
public sealed record CapabilityPolicy(int TimeoutSeconds, int RetryCount, int BreakerFailureThreshold);
|
||||
@@ -0,0 +1,118 @@
|
||||
using System.Collections.Concurrent;
|
||||
using Polly;
|
||||
using Polly.CircuitBreaker;
|
||||
using Polly.Retry;
|
||||
using Polly.Timeout;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Resilience;
|
||||
|
||||
/// <summary>
|
||||
/// Builds and caches Polly resilience pipelines keyed on
|
||||
/// <c>(DriverInstanceId, HostName, DriverCapability)</c>. One dead PLC behind a multi-device
|
||||
/// driver cannot open the circuit breaker for healthy sibling hosts.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Per <c>docs/v2/plan.md</c> decision #144 (per-device isolation). Composition from outside-in:
|
||||
/// <b>Timeout → Retry (when capability permits) → Circuit Breaker (when tier permits) → Bulkhead</b>.
|
||||
///
|
||||
/// <para>Pipeline resolution is lock-free on the hot path: the inner
|
||||
/// <see cref="ConcurrentDictionary{TKey,TValue}"/> caches a <see cref="ResiliencePipeline"/> per key;
|
||||
/// first-call cost is one <see cref="ResiliencePipelineBuilder"/>.Build. Thereafter reads are O(1).</para>
|
||||
/// </remarks>
|
||||
public sealed class DriverResiliencePipelineBuilder
|
||||
{
|
||||
private readonly ConcurrentDictionary<PipelineKey, ResiliencePipeline> _pipelines = new();
|
||||
private readonly TimeProvider _timeProvider;
|
||||
|
||||
/// <summary>Construct with the ambient clock (use <see cref="TimeProvider.System"/> in prod).</summary>
|
||||
public DriverResiliencePipelineBuilder(TimeProvider? timeProvider = null)
|
||||
{
|
||||
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Get or build the pipeline for a given <c>(driver instance, host, capability)</c> triple.
|
||||
/// Calls with the same key + same options reuse the same pipeline instance; the first caller
|
||||
/// wins if a race occurs (both pipelines would be behaviourally identical).
|
||||
/// </summary>
|
||||
/// <param name="driverInstanceId">DriverInstance primary key — opaque to this layer.</param>
|
||||
/// <param name="hostName">
|
||||
/// Host the call targets. For single-host drivers (Galaxy, some OPC UA Client configs) pass the
|
||||
/// driver's canonical host string. For multi-host drivers (Modbus with N PLCs), pass the
|
||||
/// specific PLC so one dead PLC doesn't poison healthy siblings.
|
||||
/// </param>
|
||||
/// <param name="capability">Which capability surface is being called.</param>
|
||||
/// <param name="options">Per-driver-instance options (tier + per-capability overrides).</param>
|
||||
public ResiliencePipeline GetOrCreate(
|
||||
Guid driverInstanceId,
|
||||
string hostName,
|
||||
DriverCapability capability,
|
||||
DriverResilienceOptions options)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(options);
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(hostName);
|
||||
|
||||
var key = new PipelineKey(driverInstanceId, hostName, capability);
|
||||
return _pipelines.GetOrAdd(key, static (_, state) => Build(state.capability, state.options, state.timeProvider),
|
||||
(capability, options, timeProvider: _timeProvider));
|
||||
}
|
||||
|
||||
/// <summary>Drop cached pipelines for one driver instance (e.g. on ResilienceConfig change). Test + Admin-reload use.</summary>
|
||||
public int Invalidate(Guid driverInstanceId)
|
||||
{
|
||||
var removed = 0;
|
||||
foreach (var key in _pipelines.Keys)
|
||||
{
|
||||
if (key.DriverInstanceId == driverInstanceId && _pipelines.TryRemove(key, out _))
|
||||
removed++;
|
||||
}
|
||||
return removed;
|
||||
}
|
||||
|
||||
/// <summary>Snapshot of the current number of cached pipelines. For diagnostics only.</summary>
|
||||
public int CachedPipelineCount => _pipelines.Count;
|
||||
|
||||
private static ResiliencePipeline Build(
|
||||
DriverCapability capability,
|
||||
DriverResilienceOptions options,
|
||||
TimeProvider timeProvider)
|
||||
{
|
||||
var policy = options.Resolve(capability);
|
||||
var builder = new ResiliencePipelineBuilder { TimeProvider = timeProvider };
|
||||
|
||||
builder.AddTimeout(new TimeoutStrategyOptions
|
||||
{
|
||||
Timeout = TimeSpan.FromSeconds(policy.TimeoutSeconds),
|
||||
});
|
||||
|
||||
if (policy.RetryCount > 0)
|
||||
{
|
||||
builder.AddRetry(new RetryStrategyOptions
|
||||
{
|
||||
MaxRetryAttempts = policy.RetryCount,
|
||||
BackoffType = DelayBackoffType.Exponential,
|
||||
UseJitter = true,
|
||||
Delay = TimeSpan.FromMilliseconds(100),
|
||||
MaxDelay = TimeSpan.FromSeconds(5),
|
||||
ShouldHandle = new PredicateBuilder().Handle<Exception>(ex => ex is not OperationCanceledException),
|
||||
});
|
||||
}
|
||||
|
||||
if (policy.BreakerFailureThreshold > 0)
|
||||
{
|
||||
builder.AddCircuitBreaker(new CircuitBreakerStrategyOptions
|
||||
{
|
||||
FailureRatio = 1.0,
|
||||
MinimumThroughput = policy.BreakerFailureThreshold,
|
||||
SamplingDuration = TimeSpan.FromSeconds(30),
|
||||
BreakDuration = TimeSpan.FromSeconds(15),
|
||||
ShouldHandle = new PredicateBuilder().Handle<Exception>(ex => ex is not OperationCanceledException),
|
||||
});
|
||||
}
|
||||
|
||||
return builder.Build();
|
||||
}
|
||||
|
||||
private readonly record struct PipelineKey(Guid DriverInstanceId, string HostName, DriverCapability Capability);
|
||||
}
|
||||
@@ -16,6 +16,10 @@
|
||||
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Configuration\ZB.MOM.WW.OtOpcUa.Configuration.csproj"/>
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Polly.Core" Version="8.6.6"/>
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<InternalsVisibleTo Include="ZB.MOM.WW.OtOpcUa.Core.Tests"/>
|
||||
</ItemGroup>
|
||||
|
||||
Reference in New Issue
Block a user