diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.AbCip/AbCipDriver.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.AbCip/AbCipDriver.cs index a63d32b..ee494b5 100644 --- a/src/ZB.MOM.WW.OtOpcUa.Driver.AbCip/AbCipDriver.cs +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.AbCip/AbCipDriver.cs @@ -545,100 +545,184 @@ public sealed class AbCipDriver : IDriver, IReadable, IWritable, ITagDiscovery, // ---- IWritable ---- /// - /// Write each request in order. Writes are NOT auto-retried by the driver — per plan - /// decisions #44, #45, #143 the caller opts in via - /// and the resilience pipeline (layered above the driver) decides whether to replay. - /// Non-writable configurations surface as BadNotWritable; type-conversion failures - /// as BadTypeMismatch; transport errors as BadCommunicationError. + /// Write each request in the batch. Writes are NOT auto-retried by the driver — per + /// plan decisions #44, #45, #143 the caller opts in via + /// and the resilience pipeline (layered + /// above the driver) decides whether to replay. Non-writable configurations surface as + /// BadNotWritable; type-conversion failures as BadTypeMismatch; transport + /// errors as BadCommunicationError. /// + /// + /// PR abcip-1.4 — multi-tag write packing. Writes are grouped by device via + /// . Devices whose family + /// is true dispatch + /// their packable writes concurrently so libplctag's native scheduler can coalesce them + /// onto one CIP Multi-Service Packet (0x0A) per round-trip; Micro800 (no packing) still + /// issues writes one-at-a-time. BOOL-within-DINT writes always go through the RMW path + /// under a per-parent semaphore, regardless of the family flag, because two concurrent + /// RMWs on the same DINT could lose one another's update. Per-tag StatusCodes are + /// preserved in the caller's input order on partial failures. + /// public async Task> WriteAsync( IReadOnlyList writes, CancellationToken cancellationToken) { ArgumentNullException.ThrowIfNull(writes); var results = new WriteResult[writes.Count]; - var now = DateTime.UtcNow; - for (var i = 0; i < writes.Count; i++) + var plans = AbCipMultiWritePlanner.Build( + writes, _tagsByName, _devices, + reportPreflight: (idx, code) => results[idx] = new WriteResult(code)); + + foreach (var plan in plans) { - var w = writes[i]; - if (!_tagsByName.TryGetValue(w.FullReference, out var def)) + if (!_devices.TryGetValue(plan.DeviceHostAddress, out var device)) { - results[i] = new WriteResult(AbCipStatusMapper.BadNodeIdUnknown); - continue; - } - if (!def.Writable || def.SafetyTag) - { - results[i] = new WriteResult(AbCipStatusMapper.BadNotWritable); - continue; - } - if (!_devices.TryGetValue(def.DeviceHostAddress, out var device)) - { - results[i] = new WriteResult(AbCipStatusMapper.BadNodeIdUnknown); + foreach (var e in plan.Packable) results[e.OriginalIndex] = new WriteResult(AbCipStatusMapper.BadNodeIdUnknown); + foreach (var e in plan.BitRmw) results[e.OriginalIndex] = new WriteResult(AbCipStatusMapper.BadNodeIdUnknown); continue; } - try - { - var parsedPath = AbCipTagPath.TryParse(def.TagPath); + // Bit-RMW writes always serialise per-parent — never packed. + foreach (var entry in plan.BitRmw) + results[entry.OriginalIndex] = new WriteResult( + await ExecuteBitRmwWriteAsync(device, entry, cancellationToken).ConfigureAwait(false)); - // BOOL-within-DINT writes — per task #181, RMW against a parallel parent-DINT - // runtime. Dispatching here keeps the normal EncodeValue path clean; the - // per-parent lock prevents two concurrent bit writes to the same DINT from - // losing one another's update. - if (def.DataType == AbCipDataType.Bool && parsedPath?.BitIndex is int bit) + if (plan.Packable.Count == 0) continue; + + if (plan.Profile.SupportsRequestPacking && plan.Packable.Count > 1) + { + // Concurrent dispatch — libplctag's native scheduler packs same-connection writes + // into one Multi-Service Packet when the family supports it. + var tasks = new Task<(int idx, uint code)>[plan.Packable.Count]; + for (var i = 0; i < plan.Packable.Count; i++) { - results[i] = new WriteResult( - await WriteBitInDIntAsync(device, parsedPath, bit, w.Value, cancellationToken) - .ConfigureAwait(false)); - if (results[i].StatusCode == AbCipStatusMapper.Good) - _health = new DriverHealth(DriverState.Healthy, now, null); - continue; + var entry = plan.Packable[i]; + tasks[i] = ExecutePackableWriteAsync(device, entry, cancellationToken); } - - var runtime = await EnsureTagRuntimeAsync(device, def, cancellationToken).ConfigureAwait(false); - runtime.EncodeValue(def.DataType, parsedPath?.BitIndex, w.Value); - await runtime.WriteAsync(cancellationToken).ConfigureAwait(false); - - var status = runtime.GetStatus(); - results[i] = new WriteResult(status == 0 - ? AbCipStatusMapper.Good - : AbCipStatusMapper.MapLibplctagStatus(status)); - if (status == 0) _health = new DriverHealth(DriverState.Healthy, now, null); + var outcomes = await Task.WhenAll(tasks).ConfigureAwait(false); + foreach (var (idx, code) in outcomes) + results[idx] = new WriteResult(code); } - catch (OperationCanceledException) + else { - throw; - } - catch (NotSupportedException nse) - { - results[i] = new WriteResult(AbCipStatusMapper.BadNotSupported); - _health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, nse.Message); - } - catch (FormatException fe) - { - results[i] = new WriteResult(AbCipStatusMapper.BadTypeMismatch); - _health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, fe.Message); - } - catch (InvalidCastException ice) - { - results[i] = new WriteResult(AbCipStatusMapper.BadTypeMismatch); - _health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, ice.Message); - } - catch (OverflowException oe) - { - results[i] = new WriteResult(AbCipStatusMapper.BadOutOfRange); - _health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, oe.Message); - } - catch (Exception ex) - { - results[i] = new WriteResult(AbCipStatusMapper.BadCommunicationError); - _health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, ex.Message); + // Single-write groups + Micro800 (SupportsRequestPacking=false) — sequential. + foreach (var entry in plan.Packable) + { + var code = await ExecutePackableWriteAsync(device, entry, cancellationToken) + .ConfigureAwait(false); + results[entry.OriginalIndex] = new WriteResult(code.code); + } } } return results; } + /// + /// Execute one packable write — encode the value into the per-tag runtime, flush, and + /// map the resulting libplctag status. Exception-to-StatusCode mapping mirrors the + /// pre-1.4 per-tag loop so callers see no behaviour change for individual writes. + /// + private async Task<(int idx, uint code)> ExecutePackableWriteAsync( + DeviceState device, AbCipMultiWritePlanner.ClassifiedWrite entry, CancellationToken ct) + { + var def = entry.Definition; + var w = entry.Request; + var now = DateTime.UtcNow; + try + { + var runtime = await EnsureTagRuntimeAsync(device, def, ct).ConfigureAwait(false); + runtime.EncodeValue(def.DataType, entry.ParsedPath?.BitIndex, w.Value); + await runtime.WriteAsync(ct).ConfigureAwait(false); + + var status = runtime.GetStatus(); + if (status == 0) + { + _health = new DriverHealth(DriverState.Healthy, now, null); + return (entry.OriginalIndex, AbCipStatusMapper.Good); + } + return (entry.OriginalIndex, AbCipStatusMapper.MapLibplctagStatus(status)); + } + catch (OperationCanceledException) + { + throw; + } + catch (NotSupportedException nse) + { + _health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, nse.Message); + return (entry.OriginalIndex, AbCipStatusMapper.BadNotSupported); + } + catch (FormatException fe) + { + _health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, fe.Message); + return (entry.OriginalIndex, AbCipStatusMapper.BadTypeMismatch); + } + catch (InvalidCastException ice) + { + _health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, ice.Message); + return (entry.OriginalIndex, AbCipStatusMapper.BadTypeMismatch); + } + catch (OverflowException oe) + { + _health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, oe.Message); + return (entry.OriginalIndex, AbCipStatusMapper.BadOutOfRange); + } + catch (Exception ex) + { + _health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, ex.Message); + return (entry.OriginalIndex, AbCipStatusMapper.BadCommunicationError); + } + } + + /// + /// Execute one BOOL-within-DINT write through , with + /// the same exception-mapping fan-out as the pre-1.4 per-tag loop. Bit RMWs cannot be + /// packed because two concurrent writes against the same parent DINT would race their + /// read-modify-write windows. + /// + private async Task ExecuteBitRmwWriteAsync( + DeviceState device, AbCipMultiWritePlanner.ClassifiedWrite entry, CancellationToken ct) + { + try + { + var bit = entry.ParsedPath!.BitIndex!.Value; + var code = await WriteBitInDIntAsync(device, entry.ParsedPath, bit, entry.Request.Value, ct) + .ConfigureAwait(false); + if (code == AbCipStatusMapper.Good) + _health = new DriverHealth(DriverState.Healthy, DateTime.UtcNow, null); + return code; + } + catch (OperationCanceledException) + { + throw; + } + catch (NotSupportedException nse) + { + _health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, nse.Message); + return AbCipStatusMapper.BadNotSupported; + } + catch (FormatException fe) + { + _health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, fe.Message); + return AbCipStatusMapper.BadTypeMismatch; + } + catch (InvalidCastException ice) + { + _health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, ice.Message); + return AbCipStatusMapper.BadTypeMismatch; + } + catch (OverflowException oe) + { + _health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, oe.Message); + return AbCipStatusMapper.BadOutOfRange; + } + catch (Exception ex) + { + _health = new DriverHealth(DriverState.Degraded, _health.LastSuccessfulRead, ex.Message); + return AbCipStatusMapper.BadCommunicationError; + } + } + /// /// Read-modify-write one bit within a DINT parent. Creates / reuses a parallel /// parent-DINT runtime (distinct from the bit-selector handle) + serialises concurrent diff --git a/src/ZB.MOM.WW.OtOpcUa.Driver.AbCip/AbCipMultiWritePlanner.cs b/src/ZB.MOM.WW.OtOpcUa.Driver.AbCip/AbCipMultiWritePlanner.cs new file mode 100644 index 0000000..bf46923 --- /dev/null +++ b/src/ZB.MOM.WW.OtOpcUa.Driver.AbCip/AbCipMultiWritePlanner.cs @@ -0,0 +1,112 @@ +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Driver.AbCip.PlcFamilies; + +namespace ZB.MOM.WW.OtOpcUa.Driver.AbCip; + +/// +/// PR abcip-1.4 — multi-tag write planner. Groups a batch of s by +/// device so the driver can submit one round of writes per device instead of looping +/// strictly serially across the whole batch. Honours the per-family +/// flag: families that support +/// CIP request packing (ControlLogix / CompactLogix / GuardLogix) issue their writes in +/// parallel so libplctag's internal scheduler can coalesce them onto one Multi-Service +/// Packet (0x0A); Micro800 (no request packing) falls back to per-tag sequential writes. +/// +/// +/// The libplctag .NET wrapper exposes one CIP service per Tag instance and does +/// not surface Multi-Service Packet construction at the API surface — but the underlying +/// native library packs concurrent operations against the same connection automatically +/// when the family's protocol supports it. Issuing the writes concurrently per device +/// therefore gives us the round-trip reduction described in #228 without having to drop to +/// raw CIP, while still letting us short-circuit packing on Micro800 where it would be +/// unsafe. +/// +/// Bit-RMW writes (BOOL-with-bitIndex against a DINT parent) are excluded from +/// packing here because they need a serialised read-modify-write under the per-parent +/// SemaphoreSlim in . Packing two RMWs +/// on the same DINT would risk losing one another's update. +/// +internal static class AbCipMultiWritePlanner +{ + /// + /// One classified entry in the input batch. preserves the + /// caller's ordering so per-tag StatusCode fan-out lands at the right slot in + /// the result array. routes the entry through the RMW path even + /// when the device supports packing. + /// + internal readonly record struct ClassifiedWrite( + int OriginalIndex, + WriteRequest Request, + AbCipTagDefinition Definition, + AbCipTagPath? ParsedPath, + bool IsBitRmw); + + /// + /// One device's plan slice. entries can be issued concurrently; + /// entries must go through the RMW path one-at-a-time per parent + /// DINT. + /// + internal sealed class DevicePlan + { + public required string DeviceHostAddress { get; init; } + public required AbCipPlcFamilyProfile Profile { get; init; } + public List Packable { get; } = new(); + public List BitRmw { get; } = new(); + } + + /// + /// Build the per-device plan list. Entries are visited in input order so the resulting + /// plan's traversal preserves caller ordering within each device. Entries that fail + /// resolution (unknown reference, non-writable tag, unknown device) are reported via + /// with the appropriate StatusCode and excluded from + /// the plan. + /// + public static IReadOnlyList Build( + IReadOnlyList writes, + IReadOnlyDictionary tagsByName, + IReadOnlyDictionary devices, + Action reportPreflight) + { + var plans = new Dictionary(StringComparer.OrdinalIgnoreCase); + var order = new List(); + + for (var i = 0; i < writes.Count; i++) + { + var w = writes[i]; + if (!tagsByName.TryGetValue(w.FullReference, out var def)) + { + reportPreflight(i, AbCipStatusMapper.BadNodeIdUnknown); + continue; + } + if (!def.Writable || def.SafetyTag) + { + reportPreflight(i, AbCipStatusMapper.BadNotWritable); + continue; + } + if (!devices.TryGetValue(def.DeviceHostAddress, out var device)) + { + reportPreflight(i, AbCipStatusMapper.BadNodeIdUnknown); + continue; + } + + if (!plans.TryGetValue(def.DeviceHostAddress, out var plan)) + { + plan = new DevicePlan + { + DeviceHostAddress = def.DeviceHostAddress, + Profile = device.Profile, + }; + plans[def.DeviceHostAddress] = plan; + order.Add(plan); + } + + var parsed = AbCipTagPath.TryParse(def.TagPath); + var isBitRmw = def.DataType == AbCipDataType.Bool && parsed?.BitIndex is int; + var entry = new ClassifiedWrite(i, w, def, parsed, isBitRmw); + if (isBitRmw) plan.BitRmw.Add(entry); + else plan.Packable.Add(entry); + } + + return order; + } +} diff --git a/tests/ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests/AbCipMultiWritePackingTests.cs b/tests/ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests/AbCipMultiWritePackingTests.cs new file mode 100644 index 0000000..25fbdc8 --- /dev/null +++ b/tests/ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests/AbCipMultiWritePackingTests.cs @@ -0,0 +1,283 @@ +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Driver.AbCip; + +namespace ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests; + +/// +/// PR abcip-1.4 — multi-tag write packing. Validates that +/// groups writes by device, dispatches packable writes for request-packing-capable +/// families concurrently, falls back to sequential writes on Micro800, keeps BOOL-RMW +/// writes on the per-parent semaphore path, and fans per-tag StatusCodes out to the +/// correct positions on partial failures. +/// +[Trait("Category", "Unit")] +public sealed class AbCipMultiWritePackingTests +{ + [Fact] + public async Task Writes_get_grouped_by_device() + { + var factory = new FakeAbCipTagFactory(); + var drv = new AbCipDriver(new AbCipDriverOptions + { + Devices = + [ + new AbCipDeviceOptions("ab://10.0.0.5/1,0"), + new AbCipDeviceOptions("ab://10.0.0.6/1,0"), + ], + Tags = + [ + new AbCipTagDefinition("A1", "ab://10.0.0.5/1,0", "A1", AbCipDataType.DInt), + new AbCipTagDefinition("A2", "ab://10.0.0.5/1,0", "A2", AbCipDataType.DInt), + new AbCipTagDefinition("B1", "ab://10.0.0.6/1,0", "B1", AbCipDataType.DInt), + ], + }, "drv-1", factory); + await drv.InitializeAsync("{}", CancellationToken.None); + + var results = await drv.WriteAsync( + [ + new WriteRequest("A1", 1), + new WriteRequest("B1", 100), + new WriteRequest("A2", 2), + ], CancellationToken.None); + + results.Count.ShouldBe(3); + results[0].StatusCode.ShouldBe(AbCipStatusMapper.Good); + results[1].StatusCode.ShouldBe(AbCipStatusMapper.Good); + results[2].StatusCode.ShouldBe(AbCipStatusMapper.Good); + // Per-device handles materialised — A1/A2 share device A, B1 lives on device B. + factory.Tags["A1"].CreationParams.Gateway.ShouldBe("10.0.0.5"); + factory.Tags["A2"].CreationParams.Gateway.ShouldBe("10.0.0.5"); + factory.Tags["B1"].CreationParams.Gateway.ShouldBe("10.0.0.6"); + factory.Tags["A1"].WriteCount.ShouldBe(1); + factory.Tags["A2"].WriteCount.ShouldBe(1); + factory.Tags["B1"].WriteCount.ShouldBe(1); + } + + [Fact] + public async Task ControlLogix_packs_concurrently_within_a_device() + { + // ControlLogix has SupportsRequestPacking=true → a multi-write batch is dispatched in + // parallel. The fake's WriteAsync gates on a TaskCompletionSource so we can prove that + // both writes are in flight at the same time before either completes. + var gate = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var inFlight = 0; + var maxInFlight = 0; + var factory = new FakeAbCipTagFactory + { + Customise = p => new GatedWriteFake(p, gate, () => + { + var current = Interlocked.Increment(ref inFlight); + var observed = maxInFlight; + while (current > observed + && Interlocked.CompareExchange(ref maxInFlight, current, observed) != observed) + observed = maxInFlight; + }, () => Interlocked.Decrement(ref inFlight)), + }; + var drv = new AbCipDriver(new AbCipDriverOptions + { + Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0", AbCipPlcFamily.ControlLogix)], + Tags = + [ + new AbCipTagDefinition("A", "ab://10.0.0.5/1,0", "A", AbCipDataType.DInt), + new AbCipTagDefinition("B", "ab://10.0.0.5/1,0", "B", AbCipDataType.DInt), + new AbCipTagDefinition("C", "ab://10.0.0.5/1,0", "C", AbCipDataType.DInt), + ], + }, "drv-1", factory); + await drv.InitializeAsync("{}", CancellationToken.None); + + var writeTask = drv.WriteAsync( + [ + new WriteRequest("A", 1), + new WriteRequest("B", 2), + new WriteRequest("C", 3), + ], CancellationToken.None); + + // Wait until all three writes have entered WriteAsync simultaneously, then release. + await WaitForAsync(() => Volatile.Read(ref inFlight) >= 3, TimeSpan.FromSeconds(2)); + gate.SetResult(0); + + var results = await writeTask; + results.Count.ShouldBe(3); + results[0].StatusCode.ShouldBe(AbCipStatusMapper.Good); + results[1].StatusCode.ShouldBe(AbCipStatusMapper.Good); + results[2].StatusCode.ShouldBe(AbCipStatusMapper.Good); + maxInFlight.ShouldBeGreaterThanOrEqualTo(2, + "ControlLogix supports request packing — packable writes should run concurrently within the device."); + } + + [Fact] + public async Task Micro800_falls_back_to_sequential_writes() + { + // Micro800 has SupportsRequestPacking=false → writes go one-at-a-time; the gated fake + // never sees more than one in-flight at a time. + var gate = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + gate.SetResult(0); // No need to gate — we just observe concurrency. + var inFlight = 0; + var maxInFlight = 0; + var factory = new FakeAbCipTagFactory + { + Customise = p => new GatedWriteFake(p, gate, () => + { + var current = Interlocked.Increment(ref inFlight); + var observed = maxInFlight; + while (current > observed + && Interlocked.CompareExchange(ref maxInFlight, current, observed) != observed) + observed = maxInFlight; + }, () => Interlocked.Decrement(ref inFlight)), + }; + var drv = new AbCipDriver(new AbCipDriverOptions + { + Devices = [new AbCipDeviceOptions("ab://10.0.0.5/", AbCipPlcFamily.Micro800)], + Tags = + [ + new AbCipTagDefinition("A", "ab://10.0.0.5/", "A", AbCipDataType.DInt), + new AbCipTagDefinition("B", "ab://10.0.0.5/", "B", AbCipDataType.DInt), + new AbCipTagDefinition("C", "ab://10.0.0.5/", "C", AbCipDataType.DInt), + ], + }, "drv-1", factory); + await drv.InitializeAsync("{}", CancellationToken.None); + + var results = await drv.WriteAsync( + [ + new WriteRequest("A", 1), + new WriteRequest("B", 2), + new WriteRequest("C", 3), + ], CancellationToken.None); + + results.Count.ShouldBe(3); + results.ShouldAllBe(r => r.StatusCode == AbCipStatusMapper.Good); + maxInFlight.ShouldBe(1, + "Micro800 disables request packing — writes must execute sequentially."); + } + + [Fact] + public async Task Bit_in_dint_writes_still_route_through_RMW_path() + { + // BOOL-with-bitIndex must hit the per-parent RMW semaphore — it must NOT go through + // the packable per-tag runtime path. We prove this by checking that: + // (a) the per-tag "bit-selector" runtime is never created (it would throw via + // LibplctagTagRuntime's NotSupportedException had the bypass happened); + // (b) the parent-DINT runtime got both a Read and a Write. + var factory = new FakeAbCipTagFactory(); + var drv = new AbCipDriver(new AbCipDriverOptions + { + Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")], + Tags = + [ + new AbCipTagDefinition("Flag3", "ab://10.0.0.5/1,0", "Flags.3", AbCipDataType.Bool), + new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt), + ], + }, "drv-1", factory); + await drv.InitializeAsync("{}", CancellationToken.None); + + var results = await drv.WriteAsync( + [ + new WriteRequest("Flag3", true), + new WriteRequest("Speed", 99), + ], CancellationToken.None); + + results.Count.ShouldBe(2); + results[0].StatusCode.ShouldBe(AbCipStatusMapper.Good); + results[1].StatusCode.ShouldBe(AbCipStatusMapper.Good); + + // Parent runtime created lazily for Flags (no .3 suffix) — drove the RMW. + factory.Tags.ShouldContainKey("Flags"); + factory.Tags["Flags"].ReadCount.ShouldBe(1); + factory.Tags["Flags"].WriteCount.ShouldBe(1); + // Speed went through the packable path. + factory.Tags["Speed"].WriteCount.ShouldBe(1); + } + + [Fact] + public async Task Per_tag_status_code_fan_out_works_on_partial_failure() + { + // Mix Good + BadTimeout + BadNotWritable + BadNodeIdUnknown across two devices to + // exercise the original-index preservation through the per-device plan + concurrent + // dispatch. + var factory = new FakeAbCipTagFactory + { + Customise = p => p.TagName == "B" + ? new FakeAbCipTag(p) { Status = -5 /* timeout */ } + : new FakeAbCipTag(p), + }; + var drv = new AbCipDriver(new AbCipDriverOptions + { + Devices = + [ + new AbCipDeviceOptions("ab://10.0.0.5/1,0"), + new AbCipDeviceOptions("ab://10.0.0.6/1,0"), + ], + Tags = + [ + new AbCipTagDefinition("A", "ab://10.0.0.5/1,0", "A", AbCipDataType.DInt), + new AbCipTagDefinition("B", "ab://10.0.0.5/1,0", "B", AbCipDataType.DInt), + new AbCipTagDefinition("RO", "ab://10.0.0.5/1,0", "RO", AbCipDataType.DInt, Writable: false), + new AbCipTagDefinition("C", "ab://10.0.0.6/1,0", "C", AbCipDataType.DInt), + ], + }, "drv-1", factory); + await drv.InitializeAsync("{}", CancellationToken.None); + + var results = await drv.WriteAsync( + [ + new WriteRequest("A", 1), + new WriteRequest("B", 2), + new WriteRequest("RO", 3), + new WriteRequest("UnknownTag", 4), + new WriteRequest("C", 5), + ], CancellationToken.None); + + results.Count.ShouldBe(5); + results[0].StatusCode.ShouldBe(AbCipStatusMapper.Good); + results[1].StatusCode.ShouldBe(AbCipStatusMapper.BadTimeout); + results[2].StatusCode.ShouldBe(AbCipStatusMapper.BadNotWritable); + results[3].StatusCode.ShouldBe(AbCipStatusMapper.BadNodeIdUnknown); + results[4].StatusCode.ShouldBe(AbCipStatusMapper.Good); + } + + private static async Task WaitForAsync(Func predicate, TimeSpan timeout) + { + var deadline = DateTime.UtcNow + timeout; + while (!predicate()) + { + if (DateTime.UtcNow >= deadline) + throw new TimeoutException("predicate did not become true within timeout"); + await Task.Delay(10).ConfigureAwait(false); + } + } + + /// + /// Test fake whose blocks on a shared + /// so the test can observe how many writes are + /// simultaneously in flight inside the driver. + /// + private sealed class GatedWriteFake : FakeAbCipTag + { + private readonly TaskCompletionSource _gate; + private readonly Action _onEnter; + private readonly Action _onExit; + + public GatedWriteFake(AbCipTagCreateParams p, TaskCompletionSource gate, + Action onEnter, Action onExit) : base(p) + { + _gate = gate; + _onEnter = onEnter; + _onExit = onExit; + } + + public override async Task WriteAsync(CancellationToken ct) + { + _onEnter(); + try + { + await _gate.Task.ConfigureAwait(false); + await base.WriteAsync(ct).ConfigureAwait(false); + } + finally + { + _onExit(); + } + } + } +}