Files
lmxopcua/src/ZB.MOM.WW.OtOpcUa.Core/Resilience/DriverResiliencePipelineBuilder.cs
Joseph Doherty b6d2803ff6 Phase 6.1 Stream A — switch pipeline keys from Guid to string to match IDriver.DriverInstanceId
IDriver.DriverInstanceId is declared as string in Core.Abstractions; keeping
the pipeline key as Guid meant every call site would need .ToString() / Guid.Parse
at the boundary. Switching the Resilience types to string removes that friction
and lets OtOpcUaServer pass driver.DriverInstanceId directly to the builder in
the upcoming server-dispatch wiring PR.

- DriverResiliencePipelineBuilder.GetOrCreate + Invalidate + PipelineKey
- CapabilityInvoker.ctor + _driverInstanceId field

Tests: all 48 Core.Tests still pass. The Invalidate test's keepId / dropId now
use distinct "drv-keep" / "drv-drop" literals (previously both were distinct
Guid.NewGuid() values, which the sed-driven refactor had collapsed to the same
literal — caught pre-commit).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-19 07:18:55 -04:00

119 lines
5.1 KiB
C#

using System.Collections.Concurrent;
using Polly;
using Polly.CircuitBreaker;
using Polly.Retry;
using Polly.Timeout;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
namespace ZB.MOM.WW.OtOpcUa.Core.Resilience;
/// <summary>
/// Builds and caches Polly resilience pipelines keyed on
/// <c>(DriverInstanceId, HostName, DriverCapability)</c>. One dead PLC behind a multi-device
/// driver cannot open the circuit breaker for healthy sibling hosts.
/// </summary>
/// <remarks>
/// Per <c>docs/v2/plan.md</c> decision #144 (per-device isolation). Composition from outside-in:
/// <b>Timeout → Retry (when capability permits) → Circuit Breaker (when tier permits) → Bulkhead</b>.
///
/// <para>Pipeline resolution is lock-free on the hot path: the inner
/// <see cref="ConcurrentDictionary{TKey,TValue}"/> caches a <see cref="ResiliencePipeline"/> per key;
/// first-call cost is one <see cref="ResiliencePipelineBuilder"/>.Build. Thereafter reads are O(1).</para>
/// </remarks>
public sealed class DriverResiliencePipelineBuilder
{
private readonly ConcurrentDictionary<PipelineKey, ResiliencePipeline> _pipelines = new();
private readonly TimeProvider _timeProvider;
/// <summary>Construct with the ambient clock (use <see cref="TimeProvider.System"/> in prod).</summary>
public DriverResiliencePipelineBuilder(TimeProvider? timeProvider = null)
{
_timeProvider = timeProvider ?? TimeProvider.System;
}
/// <summary>
/// Get or build the pipeline for a given <c>(driver instance, host, capability)</c> triple.
/// Calls with the same key + same options reuse the same pipeline instance; the first caller
/// wins if a race occurs (both pipelines would be behaviourally identical).
/// </summary>
/// <param name="driverInstanceId">DriverInstance primary key — opaque to this layer.</param>
/// <param name="hostName">
/// Host the call targets. For single-host drivers (Galaxy, some OPC UA Client configs) pass the
/// driver's canonical host string. For multi-host drivers (Modbus with N PLCs), pass the
/// specific PLC so one dead PLC doesn't poison healthy siblings.
/// </param>
/// <param name="capability">Which capability surface is being called.</param>
/// <param name="options">Per-driver-instance options (tier + per-capability overrides).</param>
public ResiliencePipeline GetOrCreate(
string driverInstanceId,
string hostName,
DriverCapability capability,
DriverResilienceOptions options)
{
ArgumentNullException.ThrowIfNull(options);
ArgumentException.ThrowIfNullOrWhiteSpace(hostName);
var key = new PipelineKey(driverInstanceId, hostName, capability);
return _pipelines.GetOrAdd(key, static (_, state) => Build(state.capability, state.options, state.timeProvider),
(capability, options, timeProvider: _timeProvider));
}
/// <summary>Drop cached pipelines for one driver instance (e.g. on ResilienceConfig change). Test + Admin-reload use.</summary>
public int Invalidate(string driverInstanceId)
{
var removed = 0;
foreach (var key in _pipelines.Keys)
{
if (key.DriverInstanceId == driverInstanceId && _pipelines.TryRemove(key, out _))
removed++;
}
return removed;
}
/// <summary>Snapshot of the current number of cached pipelines. For diagnostics only.</summary>
public int CachedPipelineCount => _pipelines.Count;
private static ResiliencePipeline Build(
DriverCapability capability,
DriverResilienceOptions options,
TimeProvider timeProvider)
{
var policy = options.Resolve(capability);
var builder = new ResiliencePipelineBuilder { TimeProvider = timeProvider };
builder.AddTimeout(new TimeoutStrategyOptions
{
Timeout = TimeSpan.FromSeconds(policy.TimeoutSeconds),
});
if (policy.RetryCount > 0)
{
builder.AddRetry(new RetryStrategyOptions
{
MaxRetryAttempts = policy.RetryCount,
BackoffType = DelayBackoffType.Exponential,
UseJitter = true,
Delay = TimeSpan.FromMilliseconds(100),
MaxDelay = TimeSpan.FromSeconds(5),
ShouldHandle = new PredicateBuilder().Handle<Exception>(ex => ex is not OperationCanceledException),
});
}
if (policy.BreakerFailureThreshold > 0)
{
builder.AddCircuitBreaker(new CircuitBreakerStrategyOptions
{
FailureRatio = 1.0,
MinimumThroughput = policy.BreakerFailureThreshold,
SamplingDuration = TimeSpan.FromSeconds(30),
BreakDuration = TimeSpan.FromSeconds(15),
ShouldHandle = new PredicateBuilder().Handle<Exception>(ex => ex is not OperationCanceledException),
});
}
return builder.Build();
}
private readonly record struct PipelineKey(string DriverInstanceId, string HostName, DriverCapability Capability);
}