Every OnReadValue / OnWriteValue now routes through the process-singleton DriverResiliencePipelineBuilder's CapabilityInvoker. Read / Write dispatch paths gain timeout + per-capability retry + per-(driver, host) circuit breaker + bulkhead without touching the individual driver implementations.

Wiring:
- OpcUaApplicationHost: new optional DriverResiliencePipelineBuilder ctor parameter (default null → instance-owned builder). Keeps the 3 test call sites that construct OpcUaApplicationHost directly unchanged.
- OtOpcUaServer: requires the builder in its ctor; constructs one CapabilityInvoker per driver at CreateMasterNodeManager time with default Tier A DriverResilienceOptions. TODO: Stream B.1 will wire real per-driver-type tiers via DriverTypeRegistry; Phase 6.1 follow-up will read the DriverInstance.ResilienceConfig JSON column for per-instance overrides.
- DriverNodeManager: takes a CapabilityInvoker in its ctor. OnReadValue wraps the driver's ReadAsync through ExecuteAsync(DriverCapability.Read, hostName, ...); OnWriteValue wraps WriteAsync through ExecuteWriteAsync(hostName, isIdempotent, ...) where isIdempotent comes from the new _writeIdempotentByFullRef map populated at Variable() registration from DriverAttributeInfo.WriteIdempotent.

HostName defaults to driver.DriverInstanceId for now — a single-host pipeline per driver. Multi-host drivers (Modbus with N PLCs) will expose their own per-call host resolution in a follow-up so failing PLCs can trip per-PLC breakers without poisoning siblings (decision #144).

Test fixup:
- FlakeyDriverIntegrationTests.Read_SurfacesSuccess_AfterTransientFailures: bumped TimeoutSeconds=2 → 30. 10 retries at exponential backoff with jitter can exceed 2s under parallel-test-run CPU pressure; the test asserts retry behavior, not timeout budget, so the longer slack keeps it deterministic.

Full solution dotnet test: 948 passing. Pre-existing Client.CLI Subscribe flake unchanged.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
161 lines
6.5 KiB
C#
161 lines
6.5 KiB
C#
using Shouldly;
|
|
using Xunit;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience;
|
|
|
|
/// <summary>
/// Integration tests for the Phase 6.1 Stream A.5 contract — wrapping a flaky
/// <see cref="IReadable"/> / <see cref="IWritable"/> through the <see cref="CapabilityInvoker"/>.
/// Exercises the three scenarios the plan enumerates: transient read succeeds after N
/// retries; non-idempotent write fails after one attempt; idempotent write retries through.
/// A fourth test checks that repeated failures against one host name do not prevent a call
/// against a different host name on the same invoker from running.
/// </summary>
[Trait("Category", "Integration")]
public sealed class FlakeyDriverIntegrationTests
{
    /// <summary>
    /// A read whose first five attempts throw must surface the successful sixth attempt
    /// when the Read policy permits up to ten retries.
    /// </summary>
    [Fact]
    public async Task Read_SurfacesSuccess_AfterTransientFailures()
    {
        var flaky = new FlakeyDriver(failReadsBeforeIndex: 5);
        var options = new DriverResilienceOptions
        {
            Tier = DriverTier.A,
            CapabilityPolicies = new Dictionary<DriverCapability, CapabilityPolicy>
            {
                // The fake fails 5 times, so 5 of the 10 permitted retries actually run.
                // TimeoutSeconds=30 gives those exponential-backoff retries slack under
                // parallel-test-execution CPU pressure; at the default Delay=100ms the
                // backoff can otherwise exceed a 2-second budget intermittently. This test
                // asserts retry behavior, not the timeout budget.
                [DriverCapability.Read] = new(TimeoutSeconds: 30, RetryCount: 10, BreakerFailureThreshold: 50),
            },
        };
        var invoker = new CapabilityInvoker(new DriverResiliencePipelineBuilder(), "drv-test", () => options);

        var result = await invoker.ExecuteAsync(
            DriverCapability.Read,
            "host-1",
            async ct => await flaky.ReadAsync(["tag-a"], ct),
            CancellationToken.None);

        // 5 failing attempts + 1 successful attempt.
        flaky.ReadAttempts.ShouldBe(6);
        result[0].StatusCode.ShouldBe(0u);
    }

    /// <summary>
    /// A write flagged non-idempotent must fail on its first failure and must not be
    /// re-invoked, even though the Write policy is configured with RetryCount=5.
    /// </summary>
    [Fact]
    public async Task Write_NonIdempotent_FailsOnFirstFailure_NoReplay()
    {
        var flaky = new FlakeyDriver(failWritesBeforeIndex: 3);
        var optionsWithAggressiveRetry = new DriverResilienceOptions
        {
            Tier = DriverTier.A,
            CapabilityPolicies = new Dictionary<DriverCapability, CapabilityPolicy>
            {
                [DriverCapability.Write] = new(TimeoutSeconds: 2, RetryCount: 5, BreakerFailureThreshold: 50),
            },
        };
        var invoker = new CapabilityInvoker(new DriverResiliencePipelineBuilder(), "drv-test", () => optionsWithAggressiveRetry);

        await Should.ThrowAsync<InvalidOperationException>(async () =>
            await invoker.ExecuteWriteAsync(
                "host-1",
                isIdempotent: false,
                async ct => await flaky.WriteAsync([new WriteRequest("pulse-coil", true)], ct),
                CancellationToken.None));

        flaky.WriteAttempts.ShouldBe(1, "non-idempotent write must never replay (decision #44)");
    }

    /// <summary>
    /// A write flagged idempotent whose first two attempts throw must be retried and
    /// surface the successful third attempt.
    /// </summary>
    [Fact]
    public async Task Write_Idempotent_RetriesUntilSuccess()
    {
        var flaky = new FlakeyDriver(failWritesBeforeIndex: 2);
        var optionsWithRetry = new DriverResilienceOptions
        {
            Tier = DriverTier.A,
            CapabilityPolicies = new Dictionary<DriverCapability, CapabilityPolicy>
            {
                [DriverCapability.Write] = new(TimeoutSeconds: 2, RetryCount: 5, BreakerFailureThreshold: 50),
            },
        };
        var invoker = new CapabilityInvoker(new DriverResiliencePipelineBuilder(), "drv-test", () => optionsWithRetry);

        var results = await invoker.ExecuteWriteAsync(
            "host-1",
            isIdempotent: true,
            async ct => await flaky.WriteAsync([new WriteRequest("set-point", 42.0f)], ct),
            CancellationToken.None);

        // 2 failing attempts + 1 successful attempt.
        flaky.WriteAttempts.ShouldBe(3);
        results[0].StatusCode.ShouldBe(0u);
    }

    /// <summary>
    /// Drives more failing reads against "host-dead" than the resolved Read breaker
    /// failure threshold, then asserts that a read against "host-live" on the same
    /// invoker still executes its callback exactly once.
    /// </summary>
    [Fact]
    public async Task MultipleHosts_OnOneDriver_HaveIndependentFailureCounts()
    {
        var options = new DriverResilienceOptions { Tier = DriverTier.A };
        var builder = new DriverResiliencePipelineBuilder();
        var invoker = new CapabilityInvoker(builder, "drv-test", () => options);

        // host-dead: force many failures to exhaust retries + trip breaker
        var threshold = options.Resolve(DriverCapability.Read).BreakerFailureThreshold;
        for (var i = 0; i < threshold + 5; i++)
        {
            // Any exception type is accepted here: the call may fail with the callback's
            // own exception or with whatever the resilience pipeline throws instead.
            await Should.ThrowAsync<Exception>(async () =>
                await invoker.ExecuteAsync(DriverCapability.Read, "host-dead",
                    _ => throw new InvalidOperationException("dead"),
                    CancellationToken.None));
        }

        // host-live: succeeds on first call — unaffected by the dead-host breaker
        var liveAttempts = 0;
        await invoker.ExecuteAsync(DriverCapability.Read, "host-live",
            _ => { liveAttempts++; return ValueTask.FromResult("ok"); },
            CancellationToken.None);

        liveAttempts.ShouldBe(1);
    }

    /// <summary>
    /// In-memory <see cref="IReadable"/> / <see cref="IWritable"/> fake that throws
    /// <see cref="InvalidOperationException"/> for a configurable number of leading read
    /// and write attempts, then returns one success result (status code 0) per request.
    /// The failure is thrown synchronously from the method rather than carried by the
    /// returned task; the tests above call it from inside an async lambda.
    /// </summary>
    private sealed class FlakeyDriver : IReadable, IWritable
    {
        private readonly int _failReadsBeforeIndex;
        private readonly int _failWritesBeforeIndex;

        /// <summary>Total number of <see cref="ReadAsync"/> calls so far, including failed ones.</summary>
        public int ReadAttempts { get; private set; }

        /// <summary>Total number of <see cref="WriteAsync"/> calls so far, including failed ones.</summary>
        public int WriteAttempts { get; private set; }

        /// <summary>Creates the fake.</summary>
        /// <param name="failReadsBeforeIndex">How many leading read attempts throw before reads start succeeding.</param>
        /// <param name="failWritesBeforeIndex">How many leading write attempts throw before writes start succeeding.</param>
        public FlakeyDriver(int failReadsBeforeIndex = 0, int failWritesBeforeIndex = 0)
        {
            _failReadsBeforeIndex = failReadsBeforeIndex;
            _failWritesBeforeIndex = failWritesBeforeIndex;
        }

        /// <summary>
        /// Counts the attempt, throws while the attempt number is within the configured
        /// failure window, and otherwise returns one snapshot per requested reference.
        /// </summary>
        /// <param name="fullReferences">References to read; only their count is used.</param>
        /// <param name="cancellationToken">Not observed by this fake.</param>
        /// <returns>A completed task holding one snapshot (value 0, status code 0) per reference.</returns>
        /// <exception cref="InvalidOperationException">The attempt falls inside the failure window.</exception>
        public Task<IReadOnlyList<DataValueSnapshot>> ReadAsync(
            IReadOnlyList<string> fullReferences,
            CancellationToken cancellationToken)
        {
            // Attempts are 1-based: attempts 1.._failReadsBeforeIndex fail.
            var attempt = ++ReadAttempts;
            if (attempt <= _failReadsBeforeIndex)
                throw new InvalidOperationException($"transient read failure #{attempt}");

            var now = DateTime.UtcNow;
            IReadOnlyList<DataValueSnapshot> result = fullReferences
                .Select(_ => new DataValueSnapshot(Value: 0, StatusCode: 0u, SourceTimestampUtc: now, ServerTimestampUtc: now))
                .ToList();
            return Task.FromResult(result);
        }

        /// <summary>
        /// Counts the attempt, throws while the attempt number is within the configured
        /// failure window, and otherwise returns one result per requested write.
        /// </summary>
        /// <param name="writes">Writes to apply; only their count is used.</param>
        /// <param name="cancellationToken">Not observed by this fake.</param>
        /// <returns>A completed task holding one result (status code 0) per write.</returns>
        /// <exception cref="InvalidOperationException">The attempt falls inside the failure window.</exception>
        public Task<IReadOnlyList<WriteResult>> WriteAsync(
            IReadOnlyList<WriteRequest> writes,
            CancellationToken cancellationToken)
        {
            // Attempts are 1-based: attempts 1.._failWritesBeforeIndex fail.
            var attempt = ++WriteAttempts;
            if (attempt <= _failWritesBeforeIndex)
                throw new InvalidOperationException($"transient write failure #{attempt}");

            IReadOnlyList<WriteResult> result = writes.Select(_ => new WriteResult(0u)).ToList();
            return Task.FromResult(result);
        }
    }
}
|