Phase 6.1 Stream A (partial) - Polly resilience foundation: pipeline builder + CapabilityInvoker + per-tier defaults #78

Merged
dohertj2 merged 6 commits from phase-6-1-stream-a-resilience into v2 2026-04-19 07:33:55 -04:00
8 changed files with 228 additions and 5 deletions
Showing only changes of commit f3850f8914 - Show all commits

View File

@@ -25,6 +25,14 @@ namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions;
/// OPC UA <c>AlarmConditionState</c> when true. Defaults to false so existing non-Galaxy
/// drivers aren't forced to flow a flag they don't produce.
/// </param>
/// <param name="WriteIdempotent">
/// True when a timed-out or failed write to this attribute is safe to replay. Per
/// <c>docs/v2/plan.md</c> decisions #44, #45, #143 — writes are NOT auto-retried by default
/// because replaying a pulse / alarm-ack / counter-increment / recipe-step advance can
/// duplicate field actions. Drivers flag only tags whose semantics make retry safe
/// (holding registers with level-set values, set-point writes to analog tags) — the
/// capability invoker respects this flag when deciding whether to apply Polly retry.
/// </param>
public sealed record DriverAttributeInfo(
string FullName,
DriverDataType DriverDataType,
@@ -32,4 +40,5 @@ public sealed record DriverAttributeInfo(
uint? ArrayDim,
SecurityClassification SecurityClass,
bool IsHistorized,
bool IsAlarm = false);
bool IsAlarm = false,
bool WriteIdempotent = false);

View File

@@ -115,7 +115,8 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta
ArrayDim: null,
SecurityClass: t.Writable ? SecurityClassification.Operate : SecurityClassification.ViewOnly,
IsHistorized: false,
IsAlarm: false));
IsAlarm: false,
WriteIdempotent: t.WriteIdempotent));
}
return Task.CompletedTask;
}

View File

@@ -92,6 +92,14 @@ public sealed class ModbusProbeOptions
/// AutomationDirect DirectLOGIC (DL205/DL260) and a few legacy families pack the first
/// character in the low byte instead — see <c>docs/v2/dl205.md</c> §strings.
/// </param>
/// <param name="WriteIdempotent">
/// Per <c>docs/v2/plan.md</c> decisions #44, #45, #143 — flag a tag as safe to replay on
/// write timeout / failure. Default <c>false</c>; writes do not auto-retry. Safe candidates:
/// holding-register set-points for analog values and configuration registers where the same
/// value can be written again without side-effects. Unsafe: coils that drive edge-triggered
/// actions (pulse outputs), counter-increment addresses on PLCs that treat writes as deltas,
/// any BCD / counter register where repeat-writes advance state.
/// </param>
public sealed record ModbusTagDefinition(
string Name,
ModbusRegion Region,
@@ -101,7 +109,8 @@ public sealed record ModbusTagDefinition(
ModbusByteOrder ByteOrder = ModbusByteOrder.BigEndian,
byte BitIndex = 0,
ushort StringLength = 0,
ModbusStringByteOrder StringByteOrder = ModbusStringByteOrder.HighByteFirst);
ModbusStringByteOrder StringByteOrder = ModbusStringByteOrder.HighByteFirst,
bool WriteIdempotent = false);
public enum ModbusRegion { Coils, DiscreteInputs, InputRegisters, HoldingRegisters }

View File

@@ -341,7 +341,8 @@ public sealed class S7Driver(S7DriverOptions options, string driverInstanceId)
ArrayDim: null,
SecurityClass: t.Writable ? SecurityClassification.Operate : SecurityClassification.ViewOnly,
IsHistorized: false,
IsAlarm: false));
IsAlarm: false,
WriteIdempotent: t.WriteIdempotent));
}
return Task.CompletedTask;
}

View File

@@ -88,12 +88,20 @@ public sealed class S7ProbeOptions
/// <param name="DataType">Logical data type — drives the underlying S7.Net read/write width.</param>
/// <param name="Writable">When true the driver accepts writes for this tag.</param>
/// <param name="StringLength">For <c>DataType = String</c>: S7-string max length. Default 254 (S7 max).</param>
/// <param name="WriteIdempotent">
/// Per <c>docs/v2/plan.md</c> decisions #44, #45, #143 — flag a tag as safe to replay on
/// write timeout / failure. Default <c>false</c>; writes do not auto-retry. Safe candidates
/// on S7: DB word/dword set-points holding analog values, configuration DBs where the same
/// value can be written again without side-effects. Unsafe: M (merker) bits or Q (output)
/// coils that drive edge-triggered routines in the PLC program.
/// </param>
public sealed record S7TagDefinition(
string Name,
string Address,
S7DataType DataType,
bool Writable = true,
int StringLength = 254);
int StringLength = 254,
bool WriteIdempotent = false);
public enum S7DataType
{

View File

@@ -0,0 +1,157 @@
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience;
/// <summary>
/// Integration tests for the Phase 6.1 Stream A.5 contract — wrapping a flaky
/// <see cref="IReadable"/> / <see cref="IWritable"/> through the <see cref="CapabilityInvoker"/>.
/// Exercises the three scenarios the plan enumerates: transient read succeeds after N
/// retries; non-idempotent write fails after one attempt; idempotent write retries through.
/// </summary>
[Trait("Category", "Integration")]
public sealed class FlakeyDriverIntegrationTests
{
[Fact]
public async Task Read_SurfacesSuccess_AfterTransientFailures()
{
var flaky = new FlakeyDriver(failReadsBeforeIndex: 5);
var options = new DriverResilienceOptions
{
Tier = DriverTier.A,
CapabilityPolicies = new Dictionary<DriverCapability, CapabilityPolicy>
{
[DriverCapability.Read] = new(TimeoutSeconds: 2, RetryCount: 10, BreakerFailureThreshold: 50),
},
};
var invoker = new CapabilityInvoker(new DriverResiliencePipelineBuilder(), Guid.NewGuid(), () => options);
var result = await invoker.ExecuteAsync(
DriverCapability.Read,
"host-1",
async ct => await flaky.ReadAsync(["tag-a"], ct),
CancellationToken.None);
flaky.ReadAttempts.ShouldBe(6);
result[0].StatusCode.ShouldBe(0u);
}
[Fact]
public async Task Write_NonIdempotent_FailsOnFirstFailure_NoReplay()
{
var flaky = new FlakeyDriver(failWritesBeforeIndex: 3);
var optionsWithAggressiveRetry = new DriverResilienceOptions
{
Tier = DriverTier.A,
CapabilityPolicies = new Dictionary<DriverCapability, CapabilityPolicy>
{
[DriverCapability.Write] = new(TimeoutSeconds: 2, RetryCount: 5, BreakerFailureThreshold: 50),
},
};
var invoker = new CapabilityInvoker(new DriverResiliencePipelineBuilder(), Guid.NewGuid(), () => optionsWithAggressiveRetry);
await Should.ThrowAsync<InvalidOperationException>(async () =>
await invoker.ExecuteWriteAsync(
"host-1",
isIdempotent: false,
async ct => await flaky.WriteAsync([new WriteRequest("pulse-coil", true)], ct),
CancellationToken.None));
flaky.WriteAttempts.ShouldBe(1, "non-idempotent write must never replay (decision #44)");
}
[Fact]
public async Task Write_Idempotent_RetriesUntilSuccess()
{
var flaky = new FlakeyDriver(failWritesBeforeIndex: 2);
var optionsWithRetry = new DriverResilienceOptions
{
Tier = DriverTier.A,
CapabilityPolicies = new Dictionary<DriverCapability, CapabilityPolicy>
{
[DriverCapability.Write] = new(TimeoutSeconds: 2, RetryCount: 5, BreakerFailureThreshold: 50),
},
};
var invoker = new CapabilityInvoker(new DriverResiliencePipelineBuilder(), Guid.NewGuid(), () => optionsWithRetry);
var results = await invoker.ExecuteWriteAsync(
"host-1",
isIdempotent: true,
async ct => await flaky.WriteAsync([new WriteRequest("set-point", 42.0f)], ct),
CancellationToken.None);
flaky.WriteAttempts.ShouldBe(3);
results[0].StatusCode.ShouldBe(0u);
}
[Fact]
public async Task MultipleHosts_OnOneDriver_HaveIndependentFailureCounts()
{
var flaky = new FlakeyDriver(failReadsBeforeIndex: 0);
var options = new DriverResilienceOptions { Tier = DriverTier.A };
var builder = new DriverResiliencePipelineBuilder();
var invoker = new CapabilityInvoker(builder, Guid.NewGuid(), () => options);
// host-dead: force many failures to exhaust retries + trip breaker
var threshold = options.Resolve(DriverCapability.Read).BreakerFailureThreshold;
for (var i = 0; i < threshold + 5; i++)
{
await Should.ThrowAsync<Exception>(async () =>
await invoker.ExecuteAsync(DriverCapability.Read, "host-dead",
_ => throw new InvalidOperationException("dead"),
CancellationToken.None));
}
// host-live: succeeds on first call — unaffected by the dead-host breaker
var liveAttempts = 0;
await invoker.ExecuteAsync(DriverCapability.Read, "host-live",
_ => { liveAttempts++; return ValueTask.FromResult("ok"); },
CancellationToken.None);
liveAttempts.ShouldBe(1);
}
private sealed class FlakeyDriver : IReadable, IWritable
{
private readonly int _failReadsBeforeIndex;
private readonly int _failWritesBeforeIndex;
public int ReadAttempts { get; private set; }
public int WriteAttempts { get; private set; }
public FlakeyDriver(int failReadsBeforeIndex = 0, int failWritesBeforeIndex = 0)
{
_failReadsBeforeIndex = failReadsBeforeIndex;
_failWritesBeforeIndex = failWritesBeforeIndex;
}
public Task<IReadOnlyList<DataValueSnapshot>> ReadAsync(
IReadOnlyList<string> fullReferences,
CancellationToken cancellationToken)
{
var attempt = ++ReadAttempts;
if (attempt <= _failReadsBeforeIndex)
throw new InvalidOperationException($"transient read failure #{attempt}");
var now = DateTime.UtcNow;
IReadOnlyList<DataValueSnapshot> result = fullReferences
.Select(_ => new DataValueSnapshot(Value: 0, StatusCode: 0u, SourceTimestampUtc: now, ServerTimestampUtc: now))
.ToList();
return Task.FromResult(result);
}
public Task<IReadOnlyList<WriteResult>> WriteAsync(
IReadOnlyList<WriteRequest> writes,
CancellationToken cancellationToken)
{
var attempt = ++WriteAttempts;
if (attempt <= _failWritesBeforeIndex)
throw new InvalidOperationException($"transient write failure #{attempt}");
IReadOnlyList<WriteResult> result = writes.Select(_ => new WriteResult(0u)).ToList();
return Task.FromResult(result);
}
}
}

View File

@@ -220,6 +220,23 @@ public sealed class ModbusDriverTests
builder.Variables.ShouldContain(v => v.BrowseName == "Run" && v.Info.DriverDataType == DriverDataType.Boolean);
}
[Fact]
public async Task Discover_propagates_WriteIdempotent_from_tag_to_attribute_info()
{
var (drv, _) = NewDriver(
new ModbusTagDefinition("SetPoint", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Float32, WriteIdempotent: true),
new ModbusTagDefinition("PulseCoil", ModbusRegion.Coils, 0, ModbusDataType.Bool));
await drv.InitializeAsync("{}", CancellationToken.None);
var builder = new RecordingBuilder();
await drv.DiscoverAsync(builder, CancellationToken.None);
var setPoint = builder.Variables.Single(v => v.BrowseName == "SetPoint");
var pulse = builder.Variables.Single(v => v.BrowseName == "PulseCoil");
setPoint.Info.WriteIdempotent.ShouldBeTrue();
pulse.Info.WriteIdempotent.ShouldBeFalse("default is opt-in per decision #44");
}
// --- helpers ---
private sealed class RecordingBuilder : IAddressSpaceBuilder

View File

@@ -65,6 +65,27 @@ public sealed class S7DiscoveryAndSubscribeTests
builder.Variables[2].Attr.DriverDataType.ShouldBe(DriverDataType.Float32);
}
[Fact]
public async Task DiscoverAsync_propagates_WriteIdempotent_from_tag_to_attribute_info()
{
var opts = new S7DriverOptions
{
Host = "192.0.2.1",
Tags =
[
new("SetPoint", "DB1.DBW0", S7DataType.Int16, WriteIdempotent: true),
new("StartBit", "M0.0", S7DataType.Bool),
],
};
using var drv = new S7Driver(opts, "s7-idem");
var builder = new RecordingAddressSpaceBuilder();
await drv.DiscoverAsync(builder, TestContext.Current.CancellationToken);
builder.Variables.Single(v => v.Name == "SetPoint").Attr.WriteIdempotent.ShouldBeTrue();
builder.Variables.Single(v => v.Name == "StartBit").Attr.WriteIdempotent.ShouldBeFalse("default is opt-in per decision #44");
}
[Fact]
public void GetHostStatuses_returns_one_row_with_host_port_identity_pre_init()
{