Merge pull request (#78) - Phase 6.1 Stream A - Polly resilience + CapabilityInvoker + Read/Write/HistoryRead dispatch wrapping
This commit was merged in pull request #78.
This commit is contained in:
@@ -25,6 +25,14 @@ namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
/// OPC UA <c>AlarmConditionState</c> when true. Defaults to false so existing non-Galaxy
|
||||
/// drivers aren't forced to flow a flag they don't produce.
|
||||
/// </param>
|
||||
/// <param name="WriteIdempotent">
|
||||
/// True when a timed-out or failed write to this attribute is safe to replay. Per
|
||||
/// <c>docs/v2/plan.md</c> decisions #44, #45, #143 — writes are NOT auto-retried by default
|
||||
/// because replaying a pulse / alarm-ack / counter-increment / recipe-step advance can
|
||||
/// duplicate field actions. Drivers flag only tags whose semantics make retry safe
|
||||
/// (holding registers with level-set values, set-point writes to analog tags) — the
|
||||
/// capability invoker respects this flag when deciding whether to apply Polly retry.
|
||||
/// </param>
|
||||
public sealed record DriverAttributeInfo(
|
||||
string FullName,
|
||||
DriverDataType DriverDataType,
|
||||
@@ -32,4 +40,5 @@ public sealed record DriverAttributeInfo(
|
||||
uint? ArrayDim,
|
||||
SecurityClassification SecurityClass,
|
||||
bool IsHistorized,
|
||||
bool IsAlarm = false);
|
||||
bool IsAlarm = false,
|
||||
bool WriteIdempotent = false);
|
||||
|
||||
42
src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverCapability.cs
Normal file
42
src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverCapability.cs
Normal file
@@ -0,0 +1,42 @@
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
/// <summary>
|
||||
/// Enumerates the driver-capability surface points guarded by Phase 6.1 resilience pipelines.
|
||||
/// Each value corresponds to one method (or tightly-related method group) on the
|
||||
/// <c>Core.Abstractions</c> capability interfaces (<see cref="IReadable"/>, <see cref="IWritable"/>,
|
||||
/// <see cref="ITagDiscovery"/>, <see cref="ISubscribable"/>, <see cref="IHostConnectivityProbe"/>,
|
||||
/// <see cref="IAlarmSource"/>, <see cref="IHistoryProvider"/>).
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Per <c>docs/v2/plan.md</c> decision #143 (per-capability retry policy): Read / HistoryRead /
|
||||
/// Discover / Probe / AlarmSubscribe auto-retry; <see cref="Write"/> does NOT retry unless the
|
||||
/// tag-definition carries <see cref="WriteIdempotentAttribute"/>. Alarm-acknowledge is treated
|
||||
/// as a write for retry semantics (an alarm-ack is not idempotent at the plant-floor acknowledgement
|
||||
/// level even if the OPC UA spec permits re-issue).
|
||||
/// </remarks>
|
||||
public enum DriverCapability
|
||||
{
|
||||
/// <summary>Batch <see cref="IReadable.ReadAsync"/>. Retries by default.</summary>
|
||||
Read,
|
||||
|
||||
/// <summary>Batch <see cref="IWritable.WriteAsync"/>. Does not retry unless tag is <see cref="WriteIdempotentAttribute">idempotent</see>.</summary>
|
||||
Write,
|
||||
|
||||
/// <summary><see cref="ITagDiscovery.DiscoverAsync"/>. Retries by default.</summary>
|
||||
Discover,
|
||||
|
||||
/// <summary><see cref="ISubscribable.SubscribeAsync"/> and unsubscribe. Retries by default.</summary>
|
||||
Subscribe,
|
||||
|
||||
/// <summary><see cref="IHostConnectivityProbe"/> probe loop. Retries by default.</summary>
|
||||
Probe,
|
||||
|
||||
/// <summary><see cref="IAlarmSource.SubscribeAlarmsAsync"/>. Retries by default.</summary>
|
||||
AlarmSubscribe,
|
||||
|
||||
/// <summary><see cref="IAlarmSource.AcknowledgeAsync"/>. Does NOT retry — ack is a write-shaped operation (decision #143).</summary>
|
||||
AlarmAcknowledge,
|
||||
|
||||
/// <summary><see cref="IHistoryProvider"/> reads (Raw/Processed/AtTime/Events). Retries by default.</summary>
|
||||
HistoryRead,
|
||||
}
|
||||
34
src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverTier.cs
Normal file
34
src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverTier.cs
Normal file
@@ -0,0 +1,34 @@
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
/// <summary>
|
||||
/// Stability tier of a driver type. Determines which cross-cutting runtime protections
|
||||
/// apply — per-tier retry defaults, memory-tracking thresholds, and whether out-of-process
|
||||
/// supervision with process-level recycle is in play.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Per <c>docs/v2/driver-stability.md</c> §2-4 and <c>docs/v2/plan.md</c> decisions #63-74.
|
||||
///
|
||||
/// <list type="bullet">
|
||||
/// <item><b>A</b> — managed, known-good SDK; low blast radius. In-process. Fast retries.
|
||||
/// Examples: OPC UA Client (OPCFoundation stack), S7 (S7NetPlus).</item>
|
||||
/// <item><b>B</b> — native or semi-trusted SDK with an in-process footprint. Examples: Modbus.</item>
|
||||
/// <item><b>C</b> — unmanaged SDK with COM/STA constraints, leak risk, or other out-of-process
|
||||
/// requirements. Must run as a separate Host process behind a Proxy with a supervisor that
|
||||
/// can recycle the process on hard-breach. Example: Galaxy (MXAccess COM).</item>
|
||||
/// </list>
|
||||
///
|
||||
/// <para>Process-kill protections (<c>MemoryRecycle</c>, <c>ScheduledRecycleScheduler</c>) are
|
||||
/// Tier C only per decisions #73-74 and #145 — killing an in-process Tier A/B driver also kills
|
||||
/// every OPC UA session and every co-hosted driver, blast-radius worse than the leak.</para>
|
||||
/// </remarks>
|
||||
public enum DriverTier
|
||||
{
|
||||
/// <summary>Managed SDK, in-process, low blast radius.</summary>
|
||||
A,
|
||||
|
||||
/// <summary>Native or semi-trusted SDK, in-process.</summary>
|
||||
B,
|
||||
|
||||
/// <summary>Unmanaged SDK, out-of-process required with Proxy+Host+Supervisor.</summary>
|
||||
C,
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
/// <summary>
|
||||
/// Opts a tag-definition record into auto-retry on <see cref="IWritable.WriteAsync"/> failures.
|
||||
/// Absence of this attribute means writes are <b>not</b> retried — a timed-out write may have
|
||||
/// already succeeded at the device, and replaying pulses, alarm acks, counter increments, or
|
||||
/// recipe-step advances can duplicate irreversible field actions.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Per <c>docs/v2/plan.md</c> decisions #44, #45, and #143. Applied to tag-definition POCOs
|
||||
/// (e.g. <c>ModbusTagDefinition</c>, <c>S7TagDefinition</c>, OPC UA client tag rows) at the
|
||||
/// property or record level. The <c>CapabilityInvoker</c> in <c>ZB.MOM.WW.OtOpcUa.Core.Resilience</c>
|
||||
/// reads this attribute via reflection once at driver-init time and caches the result; no
|
||||
/// per-write reflection cost.
|
||||
/// </remarks>
|
||||
[AttributeUsage(AttributeTargets.Property | AttributeTargets.Class | AttributeTargets.Struct, AllowMultiple = false, Inherited = true)]
|
||||
public sealed class WriteIdempotentAttribute : Attribute
|
||||
{
|
||||
}
|
||||
106
src/ZB.MOM.WW.OtOpcUa.Core/Resilience/CapabilityInvoker.cs
Normal file
106
src/ZB.MOM.WW.OtOpcUa.Core/Resilience/CapabilityInvoker.cs
Normal file
@@ -0,0 +1,106 @@
|
||||
using Polly;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Resilience;
|
||||
|
||||
/// <summary>
|
||||
/// Executes driver-capability calls through a shared Polly pipeline. One invoker per
|
||||
/// <c>(DriverInstance, IDriver)</c> pair; the underlying <see cref="DriverResiliencePipelineBuilder"/>
|
||||
/// is process-singleton so all invokers share its cache.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Per <c>docs/v2/plan.md</c> decisions #143-144 and Phase 6.1 Stream A.3. The server's dispatch
|
||||
/// layer routes every capability call (<c>IReadable.ReadAsync</c>, <c>IWritable.WriteAsync</c>,
|
||||
/// <c>ITagDiscovery.DiscoverAsync</c>, <c>ISubscribable.SubscribeAsync/UnsubscribeAsync</c>,
|
||||
/// <c>IHostConnectivityProbe</c> probe loop, <c>IAlarmSource.SubscribeAlarmsAsync/AcknowledgeAsync</c>,
|
||||
/// and all four <c>IHistoryProvider</c> reads) through this invoker.
|
||||
/// </remarks>
|
||||
public sealed class CapabilityInvoker
|
||||
{
|
||||
private readonly DriverResiliencePipelineBuilder _builder;
|
||||
private readonly string _driverInstanceId;
|
||||
private readonly Func<DriverResilienceOptions> _optionsAccessor;
|
||||
|
||||
/// <summary>
|
||||
/// Construct an invoker for one driver instance.
|
||||
/// </summary>
|
||||
/// <param name="builder">Shared, process-singleton pipeline builder.</param>
|
||||
/// <param name="driverInstanceId">The <c>DriverInstance.Id</c> column value.</param>
|
||||
/// <param name="optionsAccessor">
|
||||
/// Snapshot accessor for the current resilience options. Invoked per call so Admin-edit +
|
||||
/// pipeline-invalidate can take effect without restarting the invoker.
|
||||
/// </param>
|
||||
public CapabilityInvoker(
|
||||
DriverResiliencePipelineBuilder builder,
|
||||
string driverInstanceId,
|
||||
Func<DriverResilienceOptions> optionsAccessor)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(builder);
|
||||
ArgumentNullException.ThrowIfNull(optionsAccessor);
|
||||
|
||||
_builder = builder;
|
||||
_driverInstanceId = driverInstanceId;
|
||||
_optionsAccessor = optionsAccessor;
|
||||
}
|
||||
|
||||
/// <summary>Execute a capability call returning a value, honoring the per-capability pipeline.</summary>
|
||||
/// <typeparam name="TResult">Return type of the underlying driver call.</typeparam>
|
||||
public async ValueTask<TResult> ExecuteAsync<TResult>(
|
||||
DriverCapability capability,
|
||||
string hostName,
|
||||
Func<CancellationToken, ValueTask<TResult>> callSite,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(callSite);
|
||||
|
||||
var pipeline = ResolvePipeline(capability, hostName);
|
||||
return await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>Execute a void-returning capability call, honoring the per-capability pipeline.</summary>
|
||||
public async ValueTask ExecuteAsync(
|
||||
DriverCapability capability,
|
||||
string hostName,
|
||||
Func<CancellationToken, ValueTask> callSite,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(callSite);
|
||||
|
||||
var pipeline = ResolvePipeline(capability, hostName);
|
||||
await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Execute a <see cref="DriverCapability.Write"/> call honoring <see cref="WriteIdempotentAttribute"/>
|
||||
/// semantics — if <paramref name="isIdempotent"/> is <c>false</c>, retries are disabled regardless
|
||||
/// of the tag-level configuration (the pipeline for a non-idempotent write never retries per
|
||||
/// decisions #44-45). If <c>true</c>, the call runs through the capability's pipeline which may
|
||||
/// retry when the tier configuration permits.
|
||||
/// </summary>
|
||||
public async ValueTask<TResult> ExecuteWriteAsync<TResult>(
|
||||
string hostName,
|
||||
bool isIdempotent,
|
||||
Func<CancellationToken, ValueTask<TResult>> callSite,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(callSite);
|
||||
|
||||
if (!isIdempotent)
|
||||
{
|
||||
var noRetryOptions = _optionsAccessor() with
|
||||
{
|
||||
CapabilityPolicies = new Dictionary<DriverCapability, CapabilityPolicy>
|
||||
{
|
||||
[DriverCapability.Write] = _optionsAccessor().Resolve(DriverCapability.Write) with { RetryCount = 0 },
|
||||
},
|
||||
};
|
||||
var pipeline = _builder.GetOrCreate(_driverInstanceId, $"{hostName}::non-idempotent", DriverCapability.Write, noRetryOptions);
|
||||
return await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
return await ExecuteAsync(DriverCapability.Write, hostName, callSite, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
private ResiliencePipeline ResolvePipeline(DriverCapability capability, string hostName) =>
|
||||
_builder.GetOrCreate(_driverInstanceId, hostName, capability, _optionsAccessor());
|
||||
}
|
||||
@@ -0,0 +1,96 @@
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Resilience;
|
||||
|
||||
/// <summary>
|
||||
/// Per-tier × per-capability resilience policy configuration for a driver instance.
|
||||
/// Bound from <c>DriverInstance.ResilienceConfig</c> JSON (nullable column; null = tier defaults).
|
||||
/// Per <c>docs/v2/plan.md</c> decisions #143 and #144.
|
||||
/// </summary>
|
||||
public sealed record DriverResilienceOptions
|
||||
{
|
||||
/// <summary>Tier the owning driver type is registered as; drives the default map.</summary>
|
||||
public required DriverTier Tier { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// Per-capability policy overrides. Capabilities absent from this map fall back to
|
||||
/// <see cref="GetTierDefaults(DriverTier)"/> for the configured <see cref="Tier"/>.
|
||||
/// </summary>
|
||||
public IReadOnlyDictionary<DriverCapability, CapabilityPolicy> CapabilityPolicies { get; init; }
|
||||
= new Dictionary<DriverCapability, CapabilityPolicy>();
|
||||
|
||||
/// <summary>Bulkhead (max concurrent in-flight calls) for every capability. Default 32.</summary>
|
||||
public int BulkheadMaxConcurrent { get; init; } = 32;
|
||||
|
||||
/// <summary>
|
||||
/// Bulkhead queue depth. Zero = no queueing; overflow fails fast with
|
||||
/// <c>BulkheadRejectedException</c>. Default 64.
|
||||
/// </summary>
|
||||
public int BulkheadMaxQueue { get; init; } = 64;
|
||||
|
||||
/// <summary>
|
||||
/// Look up the effective policy for a capability, falling back to tier defaults when no
|
||||
/// override is configured. Never returns null.
|
||||
/// </summary>
|
||||
public CapabilityPolicy Resolve(DriverCapability capability)
|
||||
{
|
||||
if (CapabilityPolicies.TryGetValue(capability, out var policy))
|
||||
return policy;
|
||||
|
||||
var defaults = GetTierDefaults(Tier);
|
||||
return defaults[capability];
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Per-tier per-capability default policy table, per decisions #143-144 and the Phase 6.1
|
||||
/// Stream A.2 specification. Retries skipped on <see cref="DriverCapability.Write"/> and
|
||||
/// <see cref="DriverCapability.AlarmAcknowledge"/> regardless of tier.
|
||||
/// </summary>
|
||||
public static IReadOnlyDictionary<DriverCapability, CapabilityPolicy> GetTierDefaults(DriverTier tier) =>
|
||||
tier switch
|
||||
{
|
||||
DriverTier.A => new Dictionary<DriverCapability, CapabilityPolicy>
|
||||
{
|
||||
[DriverCapability.Read] = new(TimeoutSeconds: 2, RetryCount: 3, BreakerFailureThreshold: 5),
|
||||
[DriverCapability.Write] = new(TimeoutSeconds: 2, RetryCount: 0, BreakerFailureThreshold: 5),
|
||||
[DriverCapability.Discover] = new(TimeoutSeconds: 30, RetryCount: 2, BreakerFailureThreshold: 3),
|
||||
[DriverCapability.Subscribe] = new(TimeoutSeconds: 5, RetryCount: 3, BreakerFailureThreshold: 5),
|
||||
[DriverCapability.Probe] = new(TimeoutSeconds: 2, RetryCount: 3, BreakerFailureThreshold: 5),
|
||||
[DriverCapability.AlarmSubscribe] = new(TimeoutSeconds: 5, RetryCount: 3, BreakerFailureThreshold: 5),
|
||||
[DriverCapability.AlarmAcknowledge] = new(TimeoutSeconds: 5, RetryCount: 0, BreakerFailureThreshold: 5),
|
||||
[DriverCapability.HistoryRead] = new(TimeoutSeconds: 30, RetryCount: 2, BreakerFailureThreshold: 5),
|
||||
},
|
||||
DriverTier.B => new Dictionary<DriverCapability, CapabilityPolicy>
|
||||
{
|
||||
[DriverCapability.Read] = new(TimeoutSeconds: 4, RetryCount: 3, BreakerFailureThreshold: 5),
|
||||
[DriverCapability.Write] = new(TimeoutSeconds: 4, RetryCount: 0, BreakerFailureThreshold: 5),
|
||||
[DriverCapability.Discover] = new(TimeoutSeconds: 60, RetryCount: 2, BreakerFailureThreshold: 3),
|
||||
[DriverCapability.Subscribe] = new(TimeoutSeconds: 8, RetryCount: 3, BreakerFailureThreshold: 5),
|
||||
[DriverCapability.Probe] = new(TimeoutSeconds: 4, RetryCount: 3, BreakerFailureThreshold: 5),
|
||||
[DriverCapability.AlarmSubscribe] = new(TimeoutSeconds: 8, RetryCount: 3, BreakerFailureThreshold: 5),
|
||||
[DriverCapability.AlarmAcknowledge] = new(TimeoutSeconds: 8, RetryCount: 0, BreakerFailureThreshold: 5),
|
||||
[DriverCapability.HistoryRead] = new(TimeoutSeconds: 60, RetryCount: 2, BreakerFailureThreshold: 5),
|
||||
},
|
||||
DriverTier.C => new Dictionary<DriverCapability, CapabilityPolicy>
|
||||
{
|
||||
[DriverCapability.Read] = new(TimeoutSeconds: 10, RetryCount: 1, BreakerFailureThreshold: 0),
|
||||
[DriverCapability.Write] = new(TimeoutSeconds: 10, RetryCount: 0, BreakerFailureThreshold: 0),
|
||||
[DriverCapability.Discover] = new(TimeoutSeconds: 120, RetryCount: 1, BreakerFailureThreshold: 0),
|
||||
[DriverCapability.Subscribe] = new(TimeoutSeconds: 15, RetryCount: 1, BreakerFailureThreshold: 0),
|
||||
[DriverCapability.Probe] = new(TimeoutSeconds: 10, RetryCount: 1, BreakerFailureThreshold: 0),
|
||||
[DriverCapability.AlarmSubscribe] = new(TimeoutSeconds: 15, RetryCount: 1, BreakerFailureThreshold: 0),
|
||||
[DriverCapability.AlarmAcknowledge] = new(TimeoutSeconds: 15, RetryCount: 0, BreakerFailureThreshold: 0),
|
||||
[DriverCapability.HistoryRead] = new(TimeoutSeconds: 120, RetryCount: 1, BreakerFailureThreshold: 0),
|
||||
},
|
||||
_ => throw new ArgumentOutOfRangeException(nameof(tier), tier, $"No default policy table defined for tier {tier}."),
|
||||
};
|
||||
}
|
||||
|
||||
/// <summary>Policy for one capability on one driver instance.</summary>
|
||||
/// <param name="TimeoutSeconds">Per-call timeout (wraps the inner Polly execution).</param>
|
||||
/// <param name="RetryCount">Number of retry attempts after the first failure; zero = no retry.</param>
|
||||
/// <param name="BreakerFailureThreshold">
|
||||
/// Consecutive-failure count that opens the circuit breaker; zero = no breaker
|
||||
/// (Tier C uses the supervisor's process-level breaker instead, per decision #68).
|
||||
/// </param>
|
||||
public sealed record CapabilityPolicy(int TimeoutSeconds, int RetryCount, int BreakerFailureThreshold);
|
||||
@@ -0,0 +1,118 @@
|
||||
using System.Collections.Concurrent;
|
||||
using Polly;
|
||||
using Polly.CircuitBreaker;
|
||||
using Polly.Retry;
|
||||
using Polly.Timeout;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Resilience;
|
||||
|
||||
/// <summary>
|
||||
/// Builds and caches Polly resilience pipelines keyed on
|
||||
/// <c>(DriverInstanceId, HostName, DriverCapability)</c>. One dead PLC behind a multi-device
|
||||
/// driver cannot open the circuit breaker for healthy sibling hosts.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Per <c>docs/v2/plan.md</c> decision #144 (per-device isolation). Composition from outside-in:
|
||||
/// <b>Timeout → Retry (when capability permits) → Circuit Breaker (when tier permits) → Bulkhead</b>.
|
||||
///
|
||||
/// <para>Pipeline resolution is lock-free on the hot path: the inner
|
||||
/// <see cref="ConcurrentDictionary{TKey,TValue}"/> caches a <see cref="ResiliencePipeline"/> per key;
|
||||
/// first-call cost is one <see cref="ResiliencePipelineBuilder"/>.Build. Thereafter reads are O(1).</para>
|
||||
/// </remarks>
|
||||
public sealed class DriverResiliencePipelineBuilder
|
||||
{
|
||||
private readonly ConcurrentDictionary<PipelineKey, ResiliencePipeline> _pipelines = new();
|
||||
private readonly TimeProvider _timeProvider;
|
||||
|
||||
/// <summary>Construct with the ambient clock (use <see cref="TimeProvider.System"/> in prod).</summary>
|
||||
public DriverResiliencePipelineBuilder(TimeProvider? timeProvider = null)
|
||||
{
|
||||
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Get or build the pipeline for a given <c>(driver instance, host, capability)</c> triple.
|
||||
/// Calls with the same key + same options reuse the same pipeline instance; the first caller
|
||||
/// wins if a race occurs (both pipelines would be behaviourally identical).
|
||||
/// </summary>
|
||||
/// <param name="driverInstanceId">DriverInstance primary key — opaque to this layer.</param>
|
||||
/// <param name="hostName">
|
||||
/// Host the call targets. For single-host drivers (Galaxy, some OPC UA Client configs) pass the
|
||||
/// driver's canonical host string. For multi-host drivers (Modbus with N PLCs), pass the
|
||||
/// specific PLC so one dead PLC doesn't poison healthy siblings.
|
||||
/// </param>
|
||||
/// <param name="capability">Which capability surface is being called.</param>
|
||||
/// <param name="options">Per-driver-instance options (tier + per-capability overrides).</param>
|
||||
public ResiliencePipeline GetOrCreate(
|
||||
string driverInstanceId,
|
||||
string hostName,
|
||||
DriverCapability capability,
|
||||
DriverResilienceOptions options)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(options);
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(hostName);
|
||||
|
||||
var key = new PipelineKey(driverInstanceId, hostName, capability);
|
||||
return _pipelines.GetOrAdd(key, static (_, state) => Build(state.capability, state.options, state.timeProvider),
|
||||
(capability, options, timeProvider: _timeProvider));
|
||||
}
|
||||
|
||||
/// <summary>Drop cached pipelines for one driver instance (e.g. on ResilienceConfig change). Test + Admin-reload use.</summary>
|
||||
public int Invalidate(string driverInstanceId)
|
||||
{
|
||||
var removed = 0;
|
||||
foreach (var key in _pipelines.Keys)
|
||||
{
|
||||
if (key.DriverInstanceId == driverInstanceId && _pipelines.TryRemove(key, out _))
|
||||
removed++;
|
||||
}
|
||||
return removed;
|
||||
}
|
||||
|
||||
/// <summary>Snapshot of the current number of cached pipelines. For diagnostics only.</summary>
|
||||
public int CachedPipelineCount => _pipelines.Count;
|
||||
|
||||
private static ResiliencePipeline Build(
|
||||
DriverCapability capability,
|
||||
DriverResilienceOptions options,
|
||||
TimeProvider timeProvider)
|
||||
{
|
||||
var policy = options.Resolve(capability);
|
||||
var builder = new ResiliencePipelineBuilder { TimeProvider = timeProvider };
|
||||
|
||||
builder.AddTimeout(new TimeoutStrategyOptions
|
||||
{
|
||||
Timeout = TimeSpan.FromSeconds(policy.TimeoutSeconds),
|
||||
});
|
||||
|
||||
if (policy.RetryCount > 0)
|
||||
{
|
||||
builder.AddRetry(new RetryStrategyOptions
|
||||
{
|
||||
MaxRetryAttempts = policy.RetryCount,
|
||||
BackoffType = DelayBackoffType.Exponential,
|
||||
UseJitter = true,
|
||||
Delay = TimeSpan.FromMilliseconds(100),
|
||||
MaxDelay = TimeSpan.FromSeconds(5),
|
||||
ShouldHandle = new PredicateBuilder().Handle<Exception>(ex => ex is not OperationCanceledException),
|
||||
});
|
||||
}
|
||||
|
||||
if (policy.BreakerFailureThreshold > 0)
|
||||
{
|
||||
builder.AddCircuitBreaker(new CircuitBreakerStrategyOptions
|
||||
{
|
||||
FailureRatio = 1.0,
|
||||
MinimumThroughput = policy.BreakerFailureThreshold,
|
||||
SamplingDuration = TimeSpan.FromSeconds(30),
|
||||
BreakDuration = TimeSpan.FromSeconds(15),
|
||||
ShouldHandle = new PredicateBuilder().Handle<Exception>(ex => ex is not OperationCanceledException),
|
||||
});
|
||||
}
|
||||
|
||||
return builder.Build();
|
||||
}
|
||||
|
||||
private readonly record struct PipelineKey(string DriverInstanceId, string HostName, DriverCapability Capability);
|
||||
}
|
||||
@@ -16,6 +16,10 @@
|
||||
<ProjectReference Include="..\ZB.MOM.WW.OtOpcUa.Configuration\ZB.MOM.WW.OtOpcUa.Configuration.csproj"/>
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Polly.Core" Version="8.6.6"/>
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<InternalsVisibleTo Include="ZB.MOM.WW.OtOpcUa.Core.Tests"/>
|
||||
</ItemGroup>
|
||||
|
||||
@@ -115,7 +115,8 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta
|
||||
ArrayDim: null,
|
||||
SecurityClass: t.Writable ? SecurityClassification.Operate : SecurityClassification.ViewOnly,
|
||||
IsHistorized: false,
|
||||
IsAlarm: false));
|
||||
IsAlarm: false,
|
||||
WriteIdempotent: t.WriteIdempotent));
|
||||
}
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
@@ -92,6 +92,14 @@ public sealed class ModbusProbeOptions
|
||||
/// AutomationDirect DirectLOGIC (DL205/DL260) and a few legacy families pack the first
|
||||
/// character in the low byte instead — see <c>docs/v2/dl205.md</c> §strings.
|
||||
/// </param>
|
||||
/// <param name="WriteIdempotent">
|
||||
/// Per <c>docs/v2/plan.md</c> decisions #44, #45, #143 — flag a tag as safe to replay on
|
||||
/// write timeout / failure. Default <c>false</c>; writes do not auto-retry. Safe candidates:
|
||||
/// holding-register set-points for analog values and configuration registers where the same
|
||||
/// value can be written again without side-effects. Unsafe: coils that drive edge-triggered
|
||||
/// actions (pulse outputs), counter-increment addresses on PLCs that treat writes as deltas,
|
||||
/// any BCD / counter register where repeat-writes advance state.
|
||||
/// </param>
|
||||
public sealed record ModbusTagDefinition(
|
||||
string Name,
|
||||
ModbusRegion Region,
|
||||
@@ -101,7 +109,8 @@ public sealed record ModbusTagDefinition(
|
||||
ModbusByteOrder ByteOrder = ModbusByteOrder.BigEndian,
|
||||
byte BitIndex = 0,
|
||||
ushort StringLength = 0,
|
||||
ModbusStringByteOrder StringByteOrder = ModbusStringByteOrder.HighByteFirst);
|
||||
ModbusStringByteOrder StringByteOrder = ModbusStringByteOrder.HighByteFirst,
|
||||
bool WriteIdempotent = false);
|
||||
|
||||
public enum ModbusRegion { Coils, DiscreteInputs, InputRegisters, HoldingRegisters }
|
||||
|
||||
|
||||
@@ -341,7 +341,8 @@ public sealed class S7Driver(S7DriverOptions options, string driverInstanceId)
|
||||
ArrayDim: null,
|
||||
SecurityClass: t.Writable ? SecurityClassification.Operate : SecurityClassification.ViewOnly,
|
||||
IsHistorized: false,
|
||||
IsAlarm: false));
|
||||
IsAlarm: false,
|
||||
WriteIdempotent: t.WriteIdempotent));
|
||||
}
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
@@ -88,12 +88,20 @@ public sealed class S7ProbeOptions
|
||||
/// <param name="DataType">Logical data type — drives the underlying S7.Net read/write width.</param>
|
||||
/// <param name="Writable">When true the driver accepts writes for this tag.</param>
|
||||
/// <param name="StringLength">For <c>DataType = String</c>: S7-string max length. Default 254 (S7 max).</param>
|
||||
/// <param name="WriteIdempotent">
|
||||
/// Per <c>docs/v2/plan.md</c> decisions #44, #45, #143 — flag a tag as safe to replay on
|
||||
/// write timeout / failure. Default <c>false</c>; writes do not auto-retry. Safe candidates
|
||||
/// on S7: DB word/dword set-points holding analog values, configuration DBs where the same
|
||||
/// value can be written again without side-effects. Unsafe: M (merker) bits or Q (output)
|
||||
/// coils that drive edge-triggered routines in the PLC program.
|
||||
/// </param>
|
||||
public sealed record S7TagDefinition(
|
||||
string Name,
|
||||
string Address,
|
||||
S7DataType DataType,
|
||||
bool Writable = true,
|
||||
int StringLength = 254);
|
||||
int StringLength = 254,
|
||||
bool WriteIdempotent = false);
|
||||
|
||||
public enum S7DataType
|
||||
{
|
||||
|
||||
@@ -3,6 +3,7 @@ using Microsoft.Extensions.Logging;
|
||||
using Opc.Ua;
|
||||
using Opc.Ua.Server;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
|
||||
using ZB.MOM.WW.OtOpcUa.Server.Security;
|
||||
using DriverWriteRequest = ZB.MOM.WW.OtOpcUa.Core.Abstractions.WriteRequest;
|
||||
// Core.Abstractions defines a type-named HistoryReadResult (driver-side samples + continuation
|
||||
@@ -33,8 +34,14 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
|
||||
private readonly IDriver _driver;
|
||||
private readonly IReadable? _readable;
|
||||
private readonly IWritable? _writable;
|
||||
private readonly CapabilityInvoker _invoker;
|
||||
private readonly ILogger<DriverNodeManager> _logger;
|
||||
|
||||
// Per-variable idempotency flag populated during Variable() registration from
|
||||
// DriverAttributeInfo.WriteIdempotent. Drives ExecuteWriteAsync's retry gating in
|
||||
// OnWriteValue; absent entries default to false (decisions #44, #45, #143).
|
||||
private readonly Dictionary<string, bool> _writeIdempotentByFullRef = new(StringComparer.OrdinalIgnoreCase);
|
||||
|
||||
/// <summary>The driver whose address space this node manager exposes.</summary>
|
||||
public IDriver Driver => _driver;
|
||||
|
||||
@@ -53,12 +60,13 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
|
||||
private FolderState _currentFolder = null!;
|
||||
|
||||
public DriverNodeManager(IServerInternal server, ApplicationConfiguration configuration,
|
||||
IDriver driver, ILogger<DriverNodeManager> logger)
|
||||
IDriver driver, CapabilityInvoker invoker, ILogger<DriverNodeManager> logger)
|
||||
: base(server, configuration, namespaceUris: $"urn:OtOpcUa:{driver.DriverInstanceId}")
|
||||
{
|
||||
_driver = driver;
|
||||
_readable = driver as IReadable;
|
||||
_writable = driver as IWritable;
|
||||
_invoker = invoker;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
@@ -148,6 +156,7 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
|
||||
AddPredefinedNode(SystemContext, v);
|
||||
_variablesByFullRef[attributeInfo.FullName] = v;
|
||||
_securityByFullRef[attributeInfo.FullName] = attributeInfo.SecurityClass;
|
||||
_writeIdempotentByFullRef[attributeInfo.FullName] = attributeInfo.WriteIdempotent;
|
||||
|
||||
v.OnReadValue = OnReadValue;
|
||||
v.OnWriteValue = OnWriteValue;
|
||||
@@ -188,7 +197,11 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
|
||||
try
|
||||
{
|
||||
var fullRef = node.NodeId.Identifier as string ?? "";
|
||||
var result = _readable.ReadAsync([fullRef], CancellationToken.None).GetAwaiter().GetResult();
|
||||
var result = _invoker.ExecuteAsync(
|
||||
DriverCapability.Read,
|
||||
_driver.DriverInstanceId,
|
||||
async ct => (IReadOnlyList<DataValueSnapshot>)await _readable.ReadAsync([fullRef], ct).ConfigureAwait(false),
|
||||
CancellationToken.None).AsTask().GetAwaiter().GetResult();
|
||||
if (result.Count == 0)
|
||||
{
|
||||
statusCode = StatusCodes.BadNoData;
|
||||
@@ -381,9 +394,15 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
|
||||
|
||||
try
|
||||
{
|
||||
var results = _writable.WriteAsync(
|
||||
[new DriverWriteRequest(fullRef!, value)],
|
||||
CancellationToken.None).GetAwaiter().GetResult();
|
||||
var isIdempotent = _writeIdempotentByFullRef.GetValueOrDefault(fullRef!, false);
|
||||
var capturedValue = value;
|
||||
var results = _invoker.ExecuteWriteAsync(
|
||||
_driver.DriverInstanceId,
|
||||
isIdempotent,
|
||||
async ct => (IReadOnlyList<WriteResult>)await _writable.WriteAsync(
|
||||
[new DriverWriteRequest(fullRef!, capturedValue)],
|
||||
ct).ConfigureAwait(false),
|
||||
CancellationToken.None).AsTask().GetAwaiter().GetResult();
|
||||
if (results.Count > 0 && results[0].StatusCode != 0)
|
||||
{
|
||||
statusCode = results[0].StatusCode;
|
||||
@@ -465,12 +484,16 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
|
||||
|
||||
try
|
||||
{
|
||||
var driverResult = History.ReadRawAsync(
|
||||
fullRef,
|
||||
details.StartTime,
|
||||
details.EndTime,
|
||||
details.NumValuesPerNode,
|
||||
CancellationToken.None).GetAwaiter().GetResult();
|
||||
var driverResult = _invoker.ExecuteAsync(
|
||||
DriverCapability.HistoryRead,
|
||||
_driver.DriverInstanceId,
|
||||
async ct => await History.ReadRawAsync(
|
||||
fullRef,
|
||||
details.StartTime,
|
||||
details.EndTime,
|
||||
details.NumValuesPerNode,
|
||||
ct).ConfigureAwait(false),
|
||||
CancellationToken.None).AsTask().GetAwaiter().GetResult();
|
||||
|
||||
WriteResult(results, errors, i, StatusCodes.Good,
|
||||
BuildHistoryData(driverResult.Samples), driverResult.ContinuationPoint);
|
||||
@@ -525,13 +548,17 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
|
||||
|
||||
try
|
||||
{
|
||||
var driverResult = History.ReadProcessedAsync(
|
||||
fullRef,
|
||||
details.StartTime,
|
||||
details.EndTime,
|
||||
interval,
|
||||
aggregate.Value,
|
||||
CancellationToken.None).GetAwaiter().GetResult();
|
||||
var driverResult = _invoker.ExecuteAsync(
|
||||
DriverCapability.HistoryRead,
|
||||
_driver.DriverInstanceId,
|
||||
async ct => await History.ReadProcessedAsync(
|
||||
fullRef,
|
||||
details.StartTime,
|
||||
details.EndTime,
|
||||
interval,
|
||||
aggregate.Value,
|
||||
ct).ConfigureAwait(false),
|
||||
CancellationToken.None).AsTask().GetAwaiter().GetResult();
|
||||
|
||||
WriteResult(results, errors, i, StatusCodes.Good,
|
||||
BuildHistoryData(driverResult.Samples), driverResult.ContinuationPoint);
|
||||
@@ -578,8 +605,11 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
|
||||
|
||||
try
|
||||
{
|
||||
var driverResult = History.ReadAtTimeAsync(
|
||||
fullRef, requestedTimes, CancellationToken.None).GetAwaiter().GetResult();
|
||||
var driverResult = _invoker.ExecuteAsync(
|
||||
DriverCapability.HistoryRead,
|
||||
_driver.DriverInstanceId,
|
||||
async ct => await History.ReadAtTimeAsync(fullRef, requestedTimes, ct).ConfigureAwait(false),
|
||||
CancellationToken.None).AsTask().GetAwaiter().GetResult();
|
||||
|
||||
WriteResult(results, errors, i, StatusCodes.Good,
|
||||
BuildHistoryData(driverResult.Samples), driverResult.ContinuationPoint);
|
||||
@@ -632,12 +662,16 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
|
||||
|
||||
try
|
||||
{
|
||||
var driverResult = History.ReadEventsAsync(
|
||||
sourceName: fullRef,
|
||||
startUtc: details.StartTime,
|
||||
endUtc: details.EndTime,
|
||||
maxEvents: maxEvents,
|
||||
cancellationToken: CancellationToken.None).GetAwaiter().GetResult();
|
||||
var driverResult = _invoker.ExecuteAsync(
|
||||
DriverCapability.HistoryRead,
|
||||
_driver.DriverInstanceId,
|
||||
async ct => await History.ReadEventsAsync(
|
||||
sourceName: fullRef,
|
||||
startUtc: details.StartTime,
|
||||
endUtc: details.EndTime,
|
||||
maxEvents: maxEvents,
|
||||
cancellationToken: ct).ConfigureAwait(false),
|
||||
CancellationToken.None).AsTask().GetAwaiter().GetResult();
|
||||
|
||||
WriteResult(results, errors, i, StatusCodes.Good,
|
||||
BuildHistoryEvent(driverResult.Events), driverResult.ContinuationPoint);
|
||||
|
||||
@@ -3,6 +3,7 @@ using Opc.Ua;
|
||||
using Opc.Ua.Configuration;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Hosting;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.OpcUa;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
|
||||
using ZB.MOM.WW.OtOpcUa.Server.Security;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Server.OpcUa;
|
||||
@@ -20,6 +21,7 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable
|
||||
private readonly OpcUaServerOptions _options;
|
||||
private readonly DriverHost _driverHost;
|
||||
private readonly IUserAuthenticator _authenticator;
|
||||
private readonly DriverResiliencePipelineBuilder _pipelineBuilder;
|
||||
private readonly ILoggerFactory _loggerFactory;
|
||||
private readonly ILogger<OpcUaApplicationHost> _logger;
|
||||
private ApplicationInstance? _application;
|
||||
@@ -27,11 +29,13 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable
|
||||
private bool _disposed;
|
||||
|
||||
public OpcUaApplicationHost(OpcUaServerOptions options, DriverHost driverHost,
|
||||
IUserAuthenticator authenticator, ILoggerFactory loggerFactory, ILogger<OpcUaApplicationHost> logger)
|
||||
IUserAuthenticator authenticator, ILoggerFactory loggerFactory, ILogger<OpcUaApplicationHost> logger,
|
||||
DriverResiliencePipelineBuilder? pipelineBuilder = null)
|
||||
{
|
||||
_options = options;
|
||||
_driverHost = driverHost;
|
||||
_authenticator = authenticator;
|
||||
_pipelineBuilder = pipelineBuilder ?? new DriverResiliencePipelineBuilder();
|
||||
_loggerFactory = loggerFactory;
|
||||
_logger = logger;
|
||||
}
|
||||
@@ -58,7 +62,7 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable
|
||||
throw new InvalidOperationException(
|
||||
$"OPC UA application certificate could not be validated or created in {_options.PkiStoreRoot}");
|
||||
|
||||
_server = new OtOpcUaServer(_driverHost, _authenticator, _loggerFactory);
|
||||
_server = new OtOpcUaServer(_driverHost, _authenticator, _pipelineBuilder, _loggerFactory);
|
||||
await _application.Start(_server).ConfigureAwait(false);
|
||||
|
||||
_logger.LogInformation("OPC UA server started — endpoint={Endpoint} driverCount={Count}",
|
||||
|
||||
@@ -5,6 +5,7 @@ using Opc.Ua.Server;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Hosting;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.OpcUa;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
|
||||
using ZB.MOM.WW.OtOpcUa.Server.Security;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Server.OpcUa;
|
||||
@@ -19,13 +20,19 @@ public sealed class OtOpcUaServer : StandardServer
|
||||
{
|
||||
private readonly DriverHost _driverHost;
|
||||
private readonly IUserAuthenticator _authenticator;
|
||||
private readonly DriverResiliencePipelineBuilder _pipelineBuilder;
|
||||
private readonly ILoggerFactory _loggerFactory;
|
||||
private readonly List<DriverNodeManager> _driverNodeManagers = new();
|
||||
|
||||
public OtOpcUaServer(DriverHost driverHost, IUserAuthenticator authenticator, ILoggerFactory loggerFactory)
|
||||
public OtOpcUaServer(
|
||||
DriverHost driverHost,
|
||||
IUserAuthenticator authenticator,
|
||||
DriverResiliencePipelineBuilder pipelineBuilder,
|
||||
ILoggerFactory loggerFactory)
|
||||
{
|
||||
_driverHost = driverHost;
|
||||
_authenticator = authenticator;
|
||||
_pipelineBuilder = pipelineBuilder;
|
||||
_loggerFactory = loggerFactory;
|
||||
}
|
||||
|
||||
@@ -46,7 +53,12 @@ public sealed class OtOpcUaServer : StandardServer
|
||||
if (driver is null) continue;
|
||||
|
||||
var logger = _loggerFactory.CreateLogger<DriverNodeManager>();
|
||||
var manager = new DriverNodeManager(server, configuration, driver, logger);
|
||||
// Per-driver resilience options: default Tier A pending Stream B.1 which wires
|
||||
// per-type tiers into DriverTypeRegistry. Read ResilienceConfig JSON from the
|
||||
// DriverInstance row in a follow-up PR; for now every driver gets Tier A defaults.
|
||||
var options = new DriverResilienceOptions { Tier = DriverTier.A };
|
||||
var invoker = new CapabilityInvoker(_pipelineBuilder, driver.DriverInstanceId, () => options);
|
||||
var manager = new DriverNodeManager(server, configuration, driver, invoker, logger);
|
||||
_driverNodeManagers.Add(manager);
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,151 @@
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience;
|
||||
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class CapabilityInvokerTests
|
||||
{
|
||||
private static CapabilityInvoker MakeInvoker(
|
||||
DriverResiliencePipelineBuilder builder,
|
||||
DriverResilienceOptions options) =>
|
||||
new(builder, "drv-test", () => options);
|
||||
|
||||
[Fact]
|
||||
public async Task Read_ReturnsValue_FromCallSite()
|
||||
{
|
||||
var invoker = MakeInvoker(new DriverResiliencePipelineBuilder(), new DriverResilienceOptions { Tier = DriverTier.A });
|
||||
|
||||
var result = await invoker.ExecuteAsync(
|
||||
DriverCapability.Read,
|
||||
"host-1",
|
||||
_ => ValueTask.FromResult(42),
|
||||
CancellationToken.None);
|
||||
|
||||
result.ShouldBe(42);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Read_Retries_OnTransientFailure()
|
||||
{
|
||||
var invoker = MakeInvoker(new DriverResiliencePipelineBuilder(), new DriverResilienceOptions { Tier = DriverTier.A });
|
||||
var attempts = 0;
|
||||
|
||||
var result = await invoker.ExecuteAsync(
|
||||
DriverCapability.Read,
|
||||
"host-1",
|
||||
async _ =>
|
||||
{
|
||||
attempts++;
|
||||
if (attempts < 2) throw new InvalidOperationException("transient");
|
||||
await Task.Yield();
|
||||
return "ok";
|
||||
},
|
||||
CancellationToken.None);
|
||||
|
||||
result.ShouldBe("ok");
|
||||
attempts.ShouldBe(2);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Write_NonIdempotent_DoesNotRetry_EvenWhenPolicyHasRetries()
|
||||
{
|
||||
var options = new DriverResilienceOptions
|
||||
{
|
||||
Tier = DriverTier.A,
|
||||
CapabilityPolicies = new Dictionary<DriverCapability, CapabilityPolicy>
|
||||
{
|
||||
[DriverCapability.Write] = new(TimeoutSeconds: 2, RetryCount: 3, BreakerFailureThreshold: 5),
|
||||
},
|
||||
};
|
||||
var invoker = MakeInvoker(new DriverResiliencePipelineBuilder(), options);
|
||||
var attempts = 0;
|
||||
|
||||
await Should.ThrowAsync<InvalidOperationException>(async () =>
|
||||
await invoker.ExecuteWriteAsync(
|
||||
"host-1",
|
||||
isIdempotent: false,
|
||||
async _ =>
|
||||
{
|
||||
attempts++;
|
||||
await Task.Yield();
|
||||
throw new InvalidOperationException("boom");
|
||||
#pragma warning disable CS0162
|
||||
return 0;
|
||||
#pragma warning restore CS0162
|
||||
},
|
||||
CancellationToken.None));
|
||||
|
||||
attempts.ShouldBe(1, "non-idempotent write must never replay");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Write_Idempotent_Retries_WhenPolicyHasRetries()
|
||||
{
|
||||
var options = new DriverResilienceOptions
|
||||
{
|
||||
Tier = DriverTier.A,
|
||||
CapabilityPolicies = new Dictionary<DriverCapability, CapabilityPolicy>
|
||||
{
|
||||
[DriverCapability.Write] = new(TimeoutSeconds: 2, RetryCount: 3, BreakerFailureThreshold: 5),
|
||||
},
|
||||
};
|
||||
var invoker = MakeInvoker(new DriverResiliencePipelineBuilder(), options);
|
||||
var attempts = 0;
|
||||
|
||||
var result = await invoker.ExecuteWriteAsync(
|
||||
"host-1",
|
||||
isIdempotent: true,
|
||||
async _ =>
|
||||
{
|
||||
attempts++;
|
||||
if (attempts < 2) throw new InvalidOperationException("transient");
|
||||
await Task.Yield();
|
||||
return "ok";
|
||||
},
|
||||
CancellationToken.None);
|
||||
|
||||
result.ShouldBe("ok");
|
||||
attempts.ShouldBe(2);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Write_Default_DoesNotRetry_WhenPolicyHasZeroRetries()
|
||||
{
|
||||
// Tier A Write default is RetryCount=0. Even isIdempotent=true shouldn't retry
|
||||
// because the policy says not to.
|
||||
var invoker = MakeInvoker(new DriverResiliencePipelineBuilder(), new DriverResilienceOptions { Tier = DriverTier.A });
|
||||
var attempts = 0;
|
||||
|
||||
await Should.ThrowAsync<InvalidOperationException>(async () =>
|
||||
await invoker.ExecuteWriteAsync(
|
||||
"host-1",
|
||||
isIdempotent: true,
|
||||
async _ =>
|
||||
{
|
||||
attempts++;
|
||||
await Task.Yield();
|
||||
throw new InvalidOperationException("boom");
|
||||
#pragma warning disable CS0162
|
||||
return 0;
|
||||
#pragma warning restore CS0162
|
||||
},
|
||||
CancellationToken.None));
|
||||
|
||||
attempts.ShouldBe(1, "tier-A default for Write is RetryCount=0");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Execute_HonorsDifferentHosts_Independently()
|
||||
{
|
||||
var builder = new DriverResiliencePipelineBuilder();
|
||||
var invoker = MakeInvoker(builder, new DriverResilienceOptions { Tier = DriverTier.A });
|
||||
|
||||
await invoker.ExecuteAsync(DriverCapability.Read, "host-a", _ => ValueTask.FromResult(1), CancellationToken.None);
|
||||
await invoker.ExecuteAsync(DriverCapability.Read, "host-b", _ => ValueTask.FromResult(2), CancellationToken.None);
|
||||
|
||||
builder.CachedPipelineCount.ShouldBe(2);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,102 @@
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience;
|
||||
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class DriverResilienceOptionsTests
|
||||
{
|
||||
[Theory]
|
||||
[InlineData(DriverTier.A)]
|
||||
[InlineData(DriverTier.B)]
|
||||
[InlineData(DriverTier.C)]
|
||||
public void TierDefaults_Cover_EveryCapability(DriverTier tier)
|
||||
{
|
||||
var defaults = DriverResilienceOptions.GetTierDefaults(tier);
|
||||
|
||||
foreach (var capability in Enum.GetValues<DriverCapability>())
|
||||
defaults.ShouldContainKey(capability);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData(DriverTier.A)]
|
||||
[InlineData(DriverTier.B)]
|
||||
[InlineData(DriverTier.C)]
|
||||
public void Write_NeverRetries_ByDefault(DriverTier tier)
|
||||
{
|
||||
var defaults = DriverResilienceOptions.GetTierDefaults(tier);
|
||||
defaults[DriverCapability.Write].RetryCount.ShouldBe(0);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData(DriverTier.A)]
|
||||
[InlineData(DriverTier.B)]
|
||||
[InlineData(DriverTier.C)]
|
||||
public void AlarmAcknowledge_NeverRetries_ByDefault(DriverTier tier)
|
||||
{
|
||||
var defaults = DriverResilienceOptions.GetTierDefaults(tier);
|
||||
defaults[DriverCapability.AlarmAcknowledge].RetryCount.ShouldBe(0);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData(DriverTier.A, DriverCapability.Read)]
|
||||
[InlineData(DriverTier.A, DriverCapability.HistoryRead)]
|
||||
[InlineData(DriverTier.B, DriverCapability.Discover)]
|
||||
[InlineData(DriverTier.B, DriverCapability.Probe)]
|
||||
[InlineData(DriverTier.C, DriverCapability.AlarmSubscribe)]
|
||||
public void IdempotentCapabilities_Retry_ByDefault(DriverTier tier, DriverCapability capability)
|
||||
{
|
||||
var defaults = DriverResilienceOptions.GetTierDefaults(tier);
|
||||
defaults[capability].RetryCount.ShouldBeGreaterThan(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void TierC_DisablesCircuitBreaker_DeferringToSupervisor()
|
||||
{
|
||||
var defaults = DriverResilienceOptions.GetTierDefaults(DriverTier.C);
|
||||
|
||||
foreach (var (_, policy) in defaults)
|
||||
policy.BreakerFailureThreshold.ShouldBe(0, "Tier C breaker is handled by the Proxy supervisor (decision #68)");
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData(DriverTier.A)]
|
||||
[InlineData(DriverTier.B)]
|
||||
public void TierAAndB_EnableCircuitBreaker(DriverTier tier)
|
||||
{
|
||||
var defaults = DriverResilienceOptions.GetTierDefaults(tier);
|
||||
|
||||
foreach (var (_, policy) in defaults)
|
||||
policy.BreakerFailureThreshold.ShouldBeGreaterThan(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Resolve_Uses_TierDefaults_When_NoOverride()
|
||||
{
|
||||
var options = new DriverResilienceOptions { Tier = DriverTier.A };
|
||||
|
||||
var resolved = options.Resolve(DriverCapability.Read);
|
||||
|
||||
resolved.ShouldBe(DriverResilienceOptions.GetTierDefaults(DriverTier.A)[DriverCapability.Read]);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Resolve_Uses_Override_When_Configured()
|
||||
{
|
||||
var custom = new CapabilityPolicy(TimeoutSeconds: 42, RetryCount: 7, BreakerFailureThreshold: 9);
|
||||
var options = new DriverResilienceOptions
|
||||
{
|
||||
Tier = DriverTier.A,
|
||||
CapabilityPolicies = new Dictionary<DriverCapability, CapabilityPolicy>
|
||||
{
|
||||
[DriverCapability.Read] = custom,
|
||||
},
|
||||
};
|
||||
|
||||
options.Resolve(DriverCapability.Read).ShouldBe(custom);
|
||||
options.Resolve(DriverCapability.Write).ShouldBe(
|
||||
DriverResilienceOptions.GetTierDefaults(DriverTier.A)[DriverCapability.Write]);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,222 @@
|
||||
using Polly.CircuitBreaker;
|
||||
using Polly.Timeout;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience;
|
||||
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class DriverResiliencePipelineBuilderTests
|
||||
{
|
||||
private static readonly DriverResilienceOptions TierAOptions = new() { Tier = DriverTier.A };
|
||||
|
||||
[Fact]
|
||||
public async Task Read_Retries_Transient_Failures()
|
||||
{
|
||||
var builder = new DriverResiliencePipelineBuilder();
|
||||
var pipeline = builder.GetOrCreate("drv-test", "host-1", DriverCapability.Read, TierAOptions);
|
||||
var attempts = 0;
|
||||
|
||||
await pipeline.ExecuteAsync(async _ =>
|
||||
{
|
||||
attempts++;
|
||||
if (attempts < 3) throw new InvalidOperationException("transient");
|
||||
await Task.Yield();
|
||||
});
|
||||
|
||||
attempts.ShouldBe(3);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Write_DoesNotRetry_OnFailure()
|
||||
{
|
||||
var builder = new DriverResiliencePipelineBuilder();
|
||||
var pipeline = builder.GetOrCreate("drv-test", "host-1", DriverCapability.Write, TierAOptions);
|
||||
var attempts = 0;
|
||||
|
||||
var ex = await Should.ThrowAsync<InvalidOperationException>(async () =>
|
||||
{
|
||||
await pipeline.ExecuteAsync(async _ =>
|
||||
{
|
||||
attempts++;
|
||||
await Task.Yield();
|
||||
throw new InvalidOperationException("boom");
|
||||
});
|
||||
});
|
||||
|
||||
attempts.ShouldBe(1);
|
||||
ex.Message.ShouldBe("boom");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task AlarmAcknowledge_DoesNotRetry_OnFailure()
|
||||
{
|
||||
var builder = new DriverResiliencePipelineBuilder();
|
||||
var pipeline = builder.GetOrCreate("drv-test", "host-1", DriverCapability.AlarmAcknowledge, TierAOptions);
|
||||
var attempts = 0;
|
||||
|
||||
await Should.ThrowAsync<InvalidOperationException>(async () =>
|
||||
{
|
||||
await pipeline.ExecuteAsync(async _ =>
|
||||
{
|
||||
attempts++;
|
||||
await Task.Yield();
|
||||
throw new InvalidOperationException("boom");
|
||||
});
|
||||
});
|
||||
|
||||
attempts.ShouldBe(1);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Pipeline_IsIsolated_PerHost()
|
||||
{
|
||||
var builder = new DriverResiliencePipelineBuilder();
|
||||
var driverId = "drv-test";
|
||||
|
||||
var hostA = builder.GetOrCreate(driverId, "host-a", DriverCapability.Read, TierAOptions);
|
||||
var hostB = builder.GetOrCreate(driverId, "host-b", DriverCapability.Read, TierAOptions);
|
||||
|
||||
hostA.ShouldNotBeSameAs(hostB);
|
||||
builder.CachedPipelineCount.ShouldBe(2);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Pipeline_IsReused_ForSameTriple()
|
||||
{
|
||||
var builder = new DriverResiliencePipelineBuilder();
|
||||
var driverId = "drv-test";
|
||||
|
||||
var first = builder.GetOrCreate(driverId, "host-a", DriverCapability.Read, TierAOptions);
|
||||
var second = builder.GetOrCreate(driverId, "host-a", DriverCapability.Read, TierAOptions);
|
||||
|
||||
first.ShouldBeSameAs(second);
|
||||
builder.CachedPipelineCount.ShouldBe(1);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Pipeline_IsIsolated_PerCapability()
|
||||
{
|
||||
var builder = new DriverResiliencePipelineBuilder();
|
||||
var driverId = "drv-test";
|
||||
|
||||
var read = builder.GetOrCreate(driverId, "host-a", DriverCapability.Read, TierAOptions);
|
||||
var write = builder.GetOrCreate(driverId, "host-a", DriverCapability.Write, TierAOptions);
|
||||
|
||||
read.ShouldNotBeSameAs(write);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task DeadHost_DoesNotOpenBreaker_ForSiblingHost()
|
||||
{
|
||||
var builder = new DriverResiliencePipelineBuilder();
|
||||
var driverId = "drv-test";
|
||||
|
||||
var deadHost = builder.GetOrCreate(driverId, "dead-plc", DriverCapability.Read, TierAOptions);
|
||||
var liveHost = builder.GetOrCreate(driverId, "live-plc", DriverCapability.Read, TierAOptions);
|
||||
|
||||
var threshold = TierAOptions.Resolve(DriverCapability.Read).BreakerFailureThreshold;
|
||||
for (var i = 0; i < threshold + 5; i++)
|
||||
{
|
||||
await Should.ThrowAsync<Exception>(async () =>
|
||||
await deadHost.ExecuteAsync(async _ =>
|
||||
{
|
||||
await Task.Yield();
|
||||
throw new InvalidOperationException("dead plc");
|
||||
}));
|
||||
}
|
||||
|
||||
var liveAttempts = 0;
|
||||
await liveHost.ExecuteAsync(async _ =>
|
||||
{
|
||||
liveAttempts++;
|
||||
await Task.Yield();
|
||||
});
|
||||
|
||||
liveAttempts.ShouldBe(1, "healthy sibling host must not be affected by dead peer");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CircuitBreaker_Opens_AfterFailureThreshold_OnTierA()
|
||||
{
|
||||
var builder = new DriverResiliencePipelineBuilder();
|
||||
var pipeline = builder.GetOrCreate("drv-test", "host-1", DriverCapability.Write, TierAOptions);
|
||||
|
||||
var threshold = TierAOptions.Resolve(DriverCapability.Write).BreakerFailureThreshold;
|
||||
for (var i = 0; i < threshold; i++)
|
||||
{
|
||||
await Should.ThrowAsync<InvalidOperationException>(async () =>
|
||||
await pipeline.ExecuteAsync(async _ =>
|
||||
{
|
||||
await Task.Yield();
|
||||
throw new InvalidOperationException("boom");
|
||||
}));
|
||||
}
|
||||
|
||||
await Should.ThrowAsync<BrokenCircuitException>(async () =>
|
||||
await pipeline.ExecuteAsync(async _ =>
|
||||
{
|
||||
await Task.Yield();
|
||||
}));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Timeout_Cancels_SlowOperation()
|
||||
{
|
||||
var tierAWithShortTimeout = new DriverResilienceOptions
|
||||
{
|
||||
Tier = DriverTier.A,
|
||||
CapabilityPolicies = new Dictionary<DriverCapability, CapabilityPolicy>
|
||||
{
|
||||
[DriverCapability.Read] = new(TimeoutSeconds: 1, RetryCount: 0, BreakerFailureThreshold: 5),
|
||||
},
|
||||
};
|
||||
var builder = new DriverResiliencePipelineBuilder();
|
||||
var pipeline = builder.GetOrCreate("drv-test", "host-1", DriverCapability.Read, tierAWithShortTimeout);
|
||||
|
||||
await Should.ThrowAsync<TimeoutRejectedException>(async () =>
|
||||
await pipeline.ExecuteAsync(async ct =>
|
||||
{
|
||||
await Task.Delay(TimeSpan.FromSeconds(5), ct);
|
||||
}));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Invalidate_Removes_OnlyMatchingInstance()
|
||||
{
|
||||
var builder = new DriverResiliencePipelineBuilder();
|
||||
var keepId = "drv-keep";
|
||||
var dropId = "drv-drop";
|
||||
|
||||
builder.GetOrCreate(keepId, "h", DriverCapability.Read, TierAOptions);
|
||||
builder.GetOrCreate(keepId, "h", DriverCapability.Write, TierAOptions);
|
||||
builder.GetOrCreate(dropId, "h", DriverCapability.Read, TierAOptions);
|
||||
|
||||
var removed = builder.Invalidate(dropId);
|
||||
|
||||
removed.ShouldBe(1);
|
||||
builder.CachedPipelineCount.ShouldBe(2);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Cancellation_IsNot_Retried()
|
||||
{
|
||||
var builder = new DriverResiliencePipelineBuilder();
|
||||
var pipeline = builder.GetOrCreate("drv-test", "host-1", DriverCapability.Read, TierAOptions);
|
||||
var attempts = 0;
|
||||
using var cts = new CancellationTokenSource();
|
||||
cts.Cancel();
|
||||
|
||||
await Should.ThrowAsync<OperationCanceledException>(async () =>
|
||||
await pipeline.ExecuteAsync(async ct =>
|
||||
{
|
||||
attempts++;
|
||||
ct.ThrowIfCancellationRequested();
|
||||
await Task.Yield();
|
||||
}, cts.Token));
|
||||
|
||||
attempts.ShouldBeLessThanOrEqualTo(1);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,160 @@
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience;
|
||||
|
||||
/// <summary>
|
||||
/// Integration tests for the Phase 6.1 Stream A.5 contract — wrapping a flaky
|
||||
/// <see cref="IReadable"/> / <see cref="IWritable"/> through the <see cref="CapabilityInvoker"/>.
|
||||
/// Exercises the three scenarios the plan enumerates: transient read succeeds after N
|
||||
/// retries; non-idempotent write fails after one attempt; idempotent write retries through.
|
||||
/// </summary>
|
||||
[Trait("Category", "Integration")]
|
||||
public sealed class FlakeyDriverIntegrationTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task Read_SurfacesSuccess_AfterTransientFailures()
|
||||
{
|
||||
var flaky = new FlakeyDriver(failReadsBeforeIndex: 5);
|
||||
var options = new DriverResilienceOptions
|
||||
{
|
||||
Tier = DriverTier.A,
|
||||
CapabilityPolicies = new Dictionary<DriverCapability, CapabilityPolicy>
|
||||
{
|
||||
// TimeoutSeconds=30 gives slack for 5 exponential-backoff retries under
|
||||
// parallel-test-execution CPU pressure; 10 retries at the default Delay=100ms
|
||||
// exponential can otherwise exceed a 2-second budget intermittently.
|
||||
[DriverCapability.Read] = new(TimeoutSeconds: 30, RetryCount: 10, BreakerFailureThreshold: 50),
|
||||
},
|
||||
};
|
||||
var invoker = new CapabilityInvoker(new DriverResiliencePipelineBuilder(), "drv-test", () => options);
|
||||
|
||||
var result = await invoker.ExecuteAsync(
|
||||
DriverCapability.Read,
|
||||
"host-1",
|
||||
async ct => await flaky.ReadAsync(["tag-a"], ct),
|
||||
CancellationToken.None);
|
||||
|
||||
flaky.ReadAttempts.ShouldBe(6);
|
||||
result[0].StatusCode.ShouldBe(0u);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Write_NonIdempotent_FailsOnFirstFailure_NoReplay()
|
||||
{
|
||||
var flaky = new FlakeyDriver(failWritesBeforeIndex: 3);
|
||||
var optionsWithAggressiveRetry = new DriverResilienceOptions
|
||||
{
|
||||
Tier = DriverTier.A,
|
||||
CapabilityPolicies = new Dictionary<DriverCapability, CapabilityPolicy>
|
||||
{
|
||||
[DriverCapability.Write] = new(TimeoutSeconds: 2, RetryCount: 5, BreakerFailureThreshold: 50),
|
||||
},
|
||||
};
|
||||
var invoker = new CapabilityInvoker(new DriverResiliencePipelineBuilder(), "drv-test", () => optionsWithAggressiveRetry);
|
||||
|
||||
await Should.ThrowAsync<InvalidOperationException>(async () =>
|
||||
await invoker.ExecuteWriteAsync(
|
||||
"host-1",
|
||||
isIdempotent: false,
|
||||
async ct => await flaky.WriteAsync([new WriteRequest("pulse-coil", true)], ct),
|
||||
CancellationToken.None));
|
||||
|
||||
flaky.WriteAttempts.ShouldBe(1, "non-idempotent write must never replay (decision #44)");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Write_Idempotent_RetriesUntilSuccess()
|
||||
{
|
||||
var flaky = new FlakeyDriver(failWritesBeforeIndex: 2);
|
||||
var optionsWithRetry = new DriverResilienceOptions
|
||||
{
|
||||
Tier = DriverTier.A,
|
||||
CapabilityPolicies = new Dictionary<DriverCapability, CapabilityPolicy>
|
||||
{
|
||||
[DriverCapability.Write] = new(TimeoutSeconds: 2, RetryCount: 5, BreakerFailureThreshold: 50),
|
||||
},
|
||||
};
|
||||
var invoker = new CapabilityInvoker(new DriverResiliencePipelineBuilder(), "drv-test", () => optionsWithRetry);
|
||||
|
||||
var results = await invoker.ExecuteWriteAsync(
|
||||
"host-1",
|
||||
isIdempotent: true,
|
||||
async ct => await flaky.WriteAsync([new WriteRequest("set-point", 42.0f)], ct),
|
||||
CancellationToken.None);
|
||||
|
||||
flaky.WriteAttempts.ShouldBe(3);
|
||||
results[0].StatusCode.ShouldBe(0u);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task MultipleHosts_OnOneDriver_HaveIndependentFailureCounts()
|
||||
{
|
||||
var flaky = new FlakeyDriver(failReadsBeforeIndex: 0);
|
||||
var options = new DriverResilienceOptions { Tier = DriverTier.A };
|
||||
var builder = new DriverResiliencePipelineBuilder();
|
||||
var invoker = new CapabilityInvoker(builder, "drv-test", () => options);
|
||||
|
||||
// host-dead: force many failures to exhaust retries + trip breaker
|
||||
var threshold = options.Resolve(DriverCapability.Read).BreakerFailureThreshold;
|
||||
for (var i = 0; i < threshold + 5; i++)
|
||||
{
|
||||
await Should.ThrowAsync<Exception>(async () =>
|
||||
await invoker.ExecuteAsync(DriverCapability.Read, "host-dead",
|
||||
_ => throw new InvalidOperationException("dead"),
|
||||
CancellationToken.None));
|
||||
}
|
||||
|
||||
// host-live: succeeds on first call — unaffected by the dead-host breaker
|
||||
var liveAttempts = 0;
|
||||
await invoker.ExecuteAsync(DriverCapability.Read, "host-live",
|
||||
_ => { liveAttempts++; return ValueTask.FromResult("ok"); },
|
||||
CancellationToken.None);
|
||||
|
||||
liveAttempts.ShouldBe(1);
|
||||
}
|
||||
|
||||
private sealed class FlakeyDriver : IReadable, IWritable
|
||||
{
|
||||
private readonly int _failReadsBeforeIndex;
|
||||
private readonly int _failWritesBeforeIndex;
|
||||
|
||||
public int ReadAttempts { get; private set; }
|
||||
public int WriteAttempts { get; private set; }
|
||||
|
||||
public FlakeyDriver(int failReadsBeforeIndex = 0, int failWritesBeforeIndex = 0)
|
||||
{
|
||||
_failReadsBeforeIndex = failReadsBeforeIndex;
|
||||
_failWritesBeforeIndex = failWritesBeforeIndex;
|
||||
}
|
||||
|
||||
public Task<IReadOnlyList<DataValueSnapshot>> ReadAsync(
|
||||
IReadOnlyList<string> fullReferences,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var attempt = ++ReadAttempts;
|
||||
if (attempt <= _failReadsBeforeIndex)
|
||||
throw new InvalidOperationException($"transient read failure #{attempt}");
|
||||
|
||||
var now = DateTime.UtcNow;
|
||||
IReadOnlyList<DataValueSnapshot> result = fullReferences
|
||||
.Select(_ => new DataValueSnapshot(Value: 0, StatusCode: 0u, SourceTimestampUtc: now, ServerTimestampUtc: now))
|
||||
.ToList();
|
||||
return Task.FromResult(result);
|
||||
}
|
||||
|
||||
public Task<IReadOnlyList<WriteResult>> WriteAsync(
|
||||
IReadOnlyList<WriteRequest> writes,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var attempt = ++WriteAttempts;
|
||||
if (attempt <= _failWritesBeforeIndex)
|
||||
throw new InvalidOperationException($"transient write failure #{attempt}");
|
||||
|
||||
IReadOnlyList<WriteResult> result = writes.Select(_ => new WriteResult(0u)).ToList();
|
||||
return Task.FromResult(result);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -220,6 +220,23 @@ public sealed class ModbusDriverTests
|
||||
builder.Variables.ShouldContain(v => v.BrowseName == "Run" && v.Info.DriverDataType == DriverDataType.Boolean);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Discover_propagates_WriteIdempotent_from_tag_to_attribute_info()
|
||||
{
|
||||
var (drv, _) = NewDriver(
|
||||
new ModbusTagDefinition("SetPoint", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Float32, WriteIdempotent: true),
|
||||
new ModbusTagDefinition("PulseCoil", ModbusRegion.Coils, 0, ModbusDataType.Bool));
|
||||
await drv.InitializeAsync("{}", CancellationToken.None);
|
||||
|
||||
var builder = new RecordingBuilder();
|
||||
await drv.DiscoverAsync(builder, CancellationToken.None);
|
||||
|
||||
var setPoint = builder.Variables.Single(v => v.BrowseName == "SetPoint");
|
||||
var pulse = builder.Variables.Single(v => v.BrowseName == "PulseCoil");
|
||||
setPoint.Info.WriteIdempotent.ShouldBeTrue();
|
||||
pulse.Info.WriteIdempotent.ShouldBeFalse("default is opt-in per decision #44");
|
||||
}
|
||||
|
||||
// --- helpers ---
|
||||
|
||||
private sealed class RecordingBuilder : IAddressSpaceBuilder
|
||||
|
||||
@@ -65,6 +65,27 @@ public sealed class S7DiscoveryAndSubscribeTests
|
||||
builder.Variables[2].Attr.DriverDataType.ShouldBe(DriverDataType.Float32);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task DiscoverAsync_propagates_WriteIdempotent_from_tag_to_attribute_info()
|
||||
{
|
||||
var opts = new S7DriverOptions
|
||||
{
|
||||
Host = "192.0.2.1",
|
||||
Tags =
|
||||
[
|
||||
new("SetPoint", "DB1.DBW0", S7DataType.Int16, WriteIdempotent: true),
|
||||
new("StartBit", "M0.0", S7DataType.Bool),
|
||||
],
|
||||
};
|
||||
using var drv = new S7Driver(opts, "s7-idem");
|
||||
|
||||
var builder = new RecordingAddressSpaceBuilder();
|
||||
await drv.DiscoverAsync(builder, TestContext.Current.CancellationToken);
|
||||
|
||||
builder.Variables.Single(v => v.Name == "SetPoint").Attr.WriteIdempotent.ShouldBeTrue();
|
||||
builder.Variables.Single(v => v.Name == "StartBit").Attr.WriteIdempotent.ShouldBeFalse("default is opt-in per decision #44");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void GetHostStatuses_returns_one_row_with_host_port_identity_pre_init()
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user