From f3850f89140ddf567fb0df2e1209eaf80fe6712e Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 19 Apr 2026 07:16:21 -0400 Subject: [PATCH] =?UTF-8?q?Phase=206.1=20Stream=20A.5/A.6=20=E2=80=94=20Wr?= =?UTF-8?q?iteIdempotent=20flag=20on=20DriverAttributeInfo=20+=20Modbus/S7?= =?UTF-8?q?=20tag=20records=20+=20FlakeyDriver=20integration=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per-tag opt-in for write-retry per docs/v2/plan.md decisions #44, #45, #143. Default is false — writes never auto-retry unless the driver author has marked the tag as safe to replay. Core.Abstractions: - DriverAttributeInfo gains `bool WriteIdempotent = false` at the end of the positional record (back-compatible; every existing call site uses the default). Driver.Modbus: - ModbusTagDefinition gains `bool WriteIdempotent = false`. Safe candidates documented in the param XML: holding-register set-points, configuration registers. Unsafe: edge-triggered coils, counter-increment addresses. - ModbusDriver.DiscoverAsync propagates t.WriteIdempotent into DriverAttributeInfo.WriteIdempotent. Driver.S7: - S7TagDefinition gains `bool WriteIdempotent = false`. Safe candidates: DB word/dword set-points, configuration DBs. Unsafe: M/Q bits that drive edge-triggered program routines. - S7Driver.DiscoverAsync propagates the flag. Stream A.5 integration tests (FlakeyDriverIntegrationTests, 4 new) exercise the invoker + flaky-driver contract the plan enumerates: - Read with 5 transient failures succeeds on the 6th attempt (RetryCount=10). - Non-idempotent write with RetryCount=5 configured still fails on the first failure — no replay (decision #44 guard at the ExecuteWriteAsync surface). - Idempotent write with 2 transient failures succeeds on the 3rd attempt. - Two hosts on the same driver have independent breakers — dead-host trips its breaker but live-host's first call still succeeds. 
Propagation tests: - ModbusDriverTests: SetPoint WriteIdempotent=true flows into DriverAttributeInfo; PulseCoil default=false. - S7DiscoveryAndSubscribeTests: same pattern for DBx SetPoint vs M-bit. Full solution dotnet test: 947 passing (baseline 906, +41 net across Stream A so far). Pre-existing Client.CLI Subscribe flake unchanged. Stream A's remaining work (wiring CapabilityInvoker into DriverNodeManager's OnReadValue / OnWriteValue / History / Subscribe dispatch paths) is the server-side integration piece + needs DI wiring for the pipeline builder — lands in the next PR on this branch. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../DriverAttributeInfo.cs | 11 +- .../ModbusDriver.cs | 3 +- .../ModbusDriverOptions.cs | 11 +- src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7Driver.cs | 3 +- .../S7DriverOptions.cs | 10 +- .../FlakeyDriverIntegrationTests.cs | 157 ++++++++++++++++++ .../ModbusDriverTests.cs | 17 ++ .../S7DiscoveryAndSubscribeTests.cs | 21 +++ 8 files changed, 228 insertions(+), 5 deletions(-) create mode 100644 tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/FlakeyDriverIntegrationTests.cs diff --git a/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverAttributeInfo.cs b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverAttributeInfo.cs index 7071770..1c24020 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverAttributeInfo.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/DriverAttributeInfo.cs @@ -25,6 +25,14 @@ namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions; /// OPC UA AlarmConditionState when true. Defaults to false so existing non-Galaxy /// drivers aren't forced to flow a flag they don't produce. /// +/// +/// True when a timed-out or failed write to this attribute is safe to replay. Per +/// docs/v2/plan.md decisions #44, #45, #143 — writes are NOT auto-retried by default +/// because replaying a pulse / alarm-ack / counter-increment / recipe-step advance can +/// duplicate field actions. 
Drivers flag only tags whose semantics make retry safe +/// (holding registers with level-set values, set-point writes to analog tags) — the +/// capability invoker respects this flag when deciding whether to apply Polly retry. +/// public sealed record DriverAttributeInfo( string FullName, DriverDataType DriverDataType, @@ -32,4 +40,5 @@ public sealed record DriverAttributeInfo( uint? ArrayDim, SecurityClassification SecurityClass, bool IsHistorized, - bool IsAlarm = false); + bool IsAlarm = false, + bool WriteIdempotent = false); diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs index 2d2eec9..cbc7bf9 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriver.cs @@ -115,7 +115,8 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta ArrayDim: null, SecurityClass: t.Writable ? SecurityClassification.Operate : SecurityClassification.ViewOnly, IsHistorized: false, - IsAlarm: false)); + IsAlarm: false, + WriteIdempotent: t.WriteIdempotent)); } return Task.CompletedTask; } diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverOptions.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverOptions.cs index e05c44d..c119d4e 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverOptions.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.Modbus/ModbusDriverOptions.cs @@ -92,6 +92,14 @@ public sealed class ModbusProbeOptions /// AutomationDirect DirectLOGIC (DL205/DL260) and a few legacy families pack the first /// character in the low byte instead — see docs/v2/dl205.md §strings. /// +/// +/// Per docs/v2/plan.md decisions #44, #45, #143 — flag a tag as safe to replay on +/// write timeout / failure. Default false; writes do not auto-retry. Safe candidates: +/// holding-register set-points for analog values and configuration registers where the same +/// value can be written again without side-effects. 
Unsafe: coils that drive edge-triggered +/// actions (pulse outputs), counter-increment addresses on PLCs that treat writes as deltas, +/// any BCD / counter register where repeat-writes advance state. +/// public sealed record ModbusTagDefinition( string Name, ModbusRegion Region, @@ -101,7 +109,8 @@ public sealed record ModbusTagDefinition( ModbusByteOrder ByteOrder = ModbusByteOrder.BigEndian, byte BitIndex = 0, ushort StringLength = 0, - ModbusStringByteOrder StringByteOrder = ModbusStringByteOrder.HighByteFirst); + ModbusStringByteOrder StringByteOrder = ModbusStringByteOrder.HighByteFirst, + bool WriteIdempotent = false); public enum ModbusRegion { Coils, DiscreteInputs, InputRegisters, HoldingRegisters } diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7Driver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7Driver.cs index 708ad33..b7bb365 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7Driver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7Driver.cs @@ -341,7 +341,8 @@ public sealed class S7Driver(S7DriverOptions options, string driverInstanceId) ArrayDim: null, SecurityClass: t.Writable ? SecurityClassification.Operate : SecurityClassification.ViewOnly, IsHistorized: false, - IsAlarm: false)); + IsAlarm: false, + WriteIdempotent: t.WriteIdempotent)); } return Task.CompletedTask; } diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7DriverOptions.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7DriverOptions.cs index 8f0e4ca..c3cc172 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7DriverOptions.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.S7/S7DriverOptions.cs @@ -88,12 +88,20 @@ public sealed class S7ProbeOptions /// Logical data type — drives the underlying S7.Net read/write width. /// When true the driver accepts writes for this tag. /// For DataType = String: S7-string max length. Default 254 (S7 max). +/// +/// Per docs/v2/plan.md decisions #44, #45, #143 — flag a tag as safe to replay on +/// write timeout / failure. Default false; writes do not auto-retry. 
Safe candidates +/// on S7: DB word/dword set-points holding analog values, configuration DBs where the same +/// value can be written again without side-effects. Unsafe: M (merker) bits or Q (output) +/// coils that drive edge-triggered routines in the PLC program. +/// public sealed record S7TagDefinition( string Name, string Address, S7DataType DataType, bool Writable = true, - int StringLength = 254); + int StringLength = 254, + bool WriteIdempotent = false); public enum S7DataType { diff --git a/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/FlakeyDriverIntegrationTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/FlakeyDriverIntegrationTests.cs new file mode 100644 index 0000000..0622cdf --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Core.Tests/Resilience/FlakeyDriverIntegrationTests.cs @@ -0,0 +1,157 @@ +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Core.Resilience; + +namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience; + +/// +/// Integration tests for the Phase 6.1 Stream A.5 contract — wrapping a flaky +/// <see cref="IReadable"/> / <see cref="IWritable"/> through the <see cref="CapabilityInvoker"/>. +/// Exercises the four scenarios the plan enumerates: transient read succeeds after N retries; +/// non-idempotent write fails after one attempt; idempotent write retries through; hosts on one driver fail independently.
+/// +[Trait("Category", "Integration")] +public sealed class FlakeyDriverIntegrationTests +{ + [Fact] + public async Task Read_SurfacesSuccess_AfterTransientFailures() + { + var flaky = new FlakeyDriver(failReadsBeforeIndex: 5); + var options = new DriverResilienceOptions + { + Tier = DriverTier.A, + CapabilityPolicies = new Dictionary + { + [DriverCapability.Read] = new(TimeoutSeconds: 2, RetryCount: 10, BreakerFailureThreshold: 50), + }, + }; + var invoker = new CapabilityInvoker(new DriverResiliencePipelineBuilder(), Guid.NewGuid(), () => options); + + var result = await invoker.ExecuteAsync( + DriverCapability.Read, + "host-1", + async ct => await flaky.ReadAsync(["tag-a"], ct), + CancellationToken.None); + + flaky.ReadAttempts.ShouldBe(6); + result[0].StatusCode.ShouldBe(0u); + } + + [Fact] + public async Task Write_NonIdempotent_FailsOnFirstFailure_NoReplay() + { + var flaky = new FlakeyDriver(failWritesBeforeIndex: 3); + var optionsWithAggressiveRetry = new DriverResilienceOptions + { + Tier = DriverTier.A, + CapabilityPolicies = new Dictionary + { + [DriverCapability.Write] = new(TimeoutSeconds: 2, RetryCount: 5, BreakerFailureThreshold: 50), + }, + }; + var invoker = new CapabilityInvoker(new DriverResiliencePipelineBuilder(), Guid.NewGuid(), () => optionsWithAggressiveRetry); + + await Should.ThrowAsync(async () => + await invoker.ExecuteWriteAsync( + "host-1", + isIdempotent: false, + async ct => await flaky.WriteAsync([new WriteRequest("pulse-coil", true)], ct), + CancellationToken.None)); + + flaky.WriteAttempts.ShouldBe(1, "non-idempotent write must never replay (decision #44)"); + } + + [Fact] + public async Task Write_Idempotent_RetriesUntilSuccess() + { + var flaky = new FlakeyDriver(failWritesBeforeIndex: 2); + var optionsWithRetry = new DriverResilienceOptions + { + Tier = DriverTier.A, + CapabilityPolicies = new Dictionary + { + [DriverCapability.Write] = new(TimeoutSeconds: 2, RetryCount: 5, BreakerFailureThreshold: 50), + }, + }; + var 
invoker = new CapabilityInvoker(new DriverResiliencePipelineBuilder(), Guid.NewGuid(), () => optionsWithRetry); + + var results = await invoker.ExecuteWriteAsync( + "host-1", + isIdempotent: true, + async ct => await flaky.WriteAsync([new WriteRequest("set-point", 42.0f)], ct), + CancellationToken.None); + + flaky.WriteAttempts.ShouldBe(3); + results[0].StatusCode.ShouldBe(0u); + } + + [Fact] + public async Task MultipleHosts_OnOneDriver_HaveIndependentFailureCounts() + { + var flaky = new FlakeyDriver(failReadsBeforeIndex: 0); + var options = new DriverResilienceOptions { Tier = DriverTier.A }; + var builder = new DriverResiliencePipelineBuilder(); + var invoker = new CapabilityInvoker(builder, Guid.NewGuid(), () => options); + + // host-dead: force many failures to exhaust retries + trip breaker + var threshold = options.Resolve(DriverCapability.Read).BreakerFailureThreshold; + for (var i = 0; i < threshold + 5; i++) + { + await Should.ThrowAsync(async () => + await invoker.ExecuteAsync(DriverCapability.Read, "host-dead", + _ => throw new InvalidOperationException("dead"), + CancellationToken.None)); + } + + // host-live: succeeds on first call — unaffected by the dead-host breaker + var liveAttempts = 0; + await invoker.ExecuteAsync(DriverCapability.Read, "host-live", + _ => { liveAttempts++; return ValueTask.FromResult("ok"); }, + CancellationToken.None); + + liveAttempts.ShouldBe(1); + } + + private sealed class FlakeyDriver : IReadable, IWritable + { + private readonly int _failReadsBeforeIndex; + private readonly int _failWritesBeforeIndex; + + public int ReadAttempts { get; private set; } + public int WriteAttempts { get; private set; } + + public FlakeyDriver(int failReadsBeforeIndex = 0, int failWritesBeforeIndex = 0) + { + _failReadsBeforeIndex = failReadsBeforeIndex; + _failWritesBeforeIndex = failWritesBeforeIndex; + } + + public Task> ReadAsync( + IReadOnlyList fullReferences, + CancellationToken cancellationToken) + { + var attempt = ++ReadAttempts; 
+ if (attempt <= _failReadsBeforeIndex) + throw new InvalidOperationException($"transient read failure #{attempt}"); + + var now = DateTime.UtcNow; + IReadOnlyList result = fullReferences + .Select(_ => new DataValueSnapshot(Value: 0, StatusCode: 0u, SourceTimestampUtc: now, ServerTimestampUtc: now)) + .ToList(); + return Task.FromResult(result); + } + + public Task> WriteAsync( + IReadOnlyList writes, + CancellationToken cancellationToken) + { + var attempt = ++WriteAttempts; + if (attempt <= _failWritesBeforeIndex) + throw new InvalidOperationException($"transient write failure #{attempt}"); + + IReadOnlyList result = writes.Select(_ => new WriteResult(0u)).ToList(); + return Task.FromResult(result); + } + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusDriverTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusDriverTests.cs index 0b31fd2..48bb565 100644 --- a/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusDriverTests.cs +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.Modbus.Tests/ModbusDriverTests.cs @@ -220,6 +220,23 @@ public sealed class ModbusDriverTests builder.Variables.ShouldContain(v => v.BrowseName == "Run" && v.Info.DriverDataType == DriverDataType.Boolean); } + [Fact] + public async Task Discover_propagates_WriteIdempotent_from_tag_to_attribute_info() + { + var (drv, _) = NewDriver( + new ModbusTagDefinition("SetPoint", ModbusRegion.HoldingRegisters, 0, ModbusDataType.Float32, WriteIdempotent: true), + new ModbusTagDefinition("PulseCoil", ModbusRegion.Coils, 0, ModbusDataType.Bool)); + await drv.InitializeAsync("{}", CancellationToken.None); + + var builder = new RecordingBuilder(); + await drv.DiscoverAsync(builder, CancellationToken.None); + + var setPoint = builder.Variables.Single(v => v.BrowseName == "SetPoint"); + var pulse = builder.Variables.Single(v => v.BrowseName == "PulseCoil"); + setPoint.Info.WriteIdempotent.ShouldBeTrue(); + pulse.Info.WriteIdempotent.ShouldBeFalse("default is opt-in per decision #44"); + } + // --- 
helpers --- private sealed class RecordingBuilder : IAddressSpaceBuilder diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.S7.Tests/S7DiscoveryAndSubscribeTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.S7.Tests/S7DiscoveryAndSubscribeTests.cs index 339369b..dc9d7dc 100644 --- a/tests/ZB.MOM.WW.OtOpcUa.Driver.S7.Tests/S7DiscoveryAndSubscribeTests.cs +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.S7.Tests/S7DiscoveryAndSubscribeTests.cs @@ -65,6 +65,27 @@ public sealed class S7DiscoveryAndSubscribeTests builder.Variables[2].Attr.DriverDataType.ShouldBe(DriverDataType.Float32); } + [Fact] + public async Task DiscoverAsync_propagates_WriteIdempotent_from_tag_to_attribute_info() + { + var opts = new S7DriverOptions + { + Host = "192.0.2.1", + Tags = + [ + new("SetPoint", "DB1.DBW0", S7DataType.Int16, WriteIdempotent: true), + new("StartBit", "M0.0", S7DataType.Bool), + ], + }; + using var drv = new S7Driver(opts, "s7-idem"); + + var builder = new RecordingAddressSpaceBuilder(); + await drv.DiscoverAsync(builder, TestContext.Current.CancellationToken); + + builder.Variables.Single(v => v.Name == "SetPoint").Attr.WriteIdempotent.ShouldBeTrue(); + builder.Variables.Single(v => v.Name == "StartBit").Attr.WriteIdempotent.ShouldBeFalse("default is opt-in per decision #44"); + } + [Fact] public void GetHostStatuses_returns_one_row_with_host_port_identity_pre_init() {