using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
using ZB.MOM.WW.OtOpcUa.Driver.AbCip;

namespace ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests;

/// <summary>
/// PR abcip-1.4 — multi-tag write packing. Validates that <see cref="AbCipDriver.WriteAsync"/>
/// groups writes by device, dispatches packable writes for request-packing-capable
/// families concurrently, falls back to sequential writes on Micro800, keeps BOOL-RMW
/// writes on the per-parent semaphore path, and fans per-tag StatusCodes out to the
/// correct positions on partial failures.
/// </summary>
[Trait("Category", "Unit")]
public sealed class AbCipMultiWritePackingTests
{
    /// <summary>
    /// A batch that interleaves tags from two devices yields one Good result per request,
    /// one write per tag, and tag handles created against the owning device's gateway.
    /// </summary>
    [Fact]
    public async Task Writes_get_grouped_by_device()
    {
        var factory = new FakeAbCipTagFactory();
        var drv = new AbCipDriver(new AbCipDriverOptions
        {
            Devices =
            [
                new AbCipDeviceOptions("ab://10.0.0.5/1,0"),
                new AbCipDeviceOptions("ab://10.0.0.6/1,0"),
            ],
            Tags =
            [
                new AbCipTagDefinition("A1", "ab://10.0.0.5/1,0", "A1", AbCipDataType.DInt),
                new AbCipTagDefinition("A2", "ab://10.0.0.5/1,0", "A2", AbCipDataType.DInt),
                new AbCipTagDefinition("B1", "ab://10.0.0.6/1,0", "B1", AbCipDataType.DInt),
            ],
        }, "drv-1", factory);
        await drv.InitializeAsync("{}", CancellationToken.None);

        // Requests deliberately interleave the two devices (A, B, A).
        var results = await drv.WriteAsync(
            [
                new WriteRequest("A1", 1),
                new WriteRequest("B1", 100),
                new WriteRequest("A2", 2),
            ],
            CancellationToken.None);

        results.Count.ShouldBe(3);
        results[0].StatusCode.ShouldBe(AbCipStatusMapper.Good);
        results[1].StatusCode.ShouldBe(AbCipStatusMapper.Good);
        results[2].StatusCode.ShouldBe(AbCipStatusMapper.Good);

        // Per-device handles materialised — A1/A2 share device A, B1 lives on device B.
        factory.Tags["A1"].CreationParams.Gateway.ShouldBe("10.0.0.5");
        factory.Tags["A2"].CreationParams.Gateway.ShouldBe("10.0.0.5");
        factory.Tags["B1"].CreationParams.Gateway.ShouldBe("10.0.0.6");
        factory.Tags["A1"].WriteCount.ShouldBe(1);
        factory.Tags["A2"].WriteCount.ShouldBe(1);
        factory.Tags["B1"].WriteCount.ShouldBe(1);
    }

    /// <summary>
    /// On a ControlLogix device, three writes to the same device are all inside the fake's
    /// WriteAsync at the same time before any of them is allowed to complete.
    /// </summary>
    [Fact]
    public async Task ControlLogix_packs_concurrently_within_a_device()
    {
        // ControlLogix has SupportsRequestPacking=true → a multi-write batch is dispatched in
        // parallel. The fake's WriteAsync gates on a TaskCompletionSource so we can prove that
        // both writes are in flight at the same time before either completes.
        var gate = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
        var tracker = new InFlightTracker();
        var factory = new FakeAbCipTagFactory
        {
            Customise = p => new GatedWriteFake(p, gate, tracker.Enter, tracker.Exit),
        };
        var drv = new AbCipDriver(new AbCipDriverOptions
        {
            Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0", AbCipPlcFamily.ControlLogix)],
            Tags =
            [
                new AbCipTagDefinition("A", "ab://10.0.0.5/1,0", "A", AbCipDataType.DInt),
                new AbCipTagDefinition("B", "ab://10.0.0.5/1,0", "B", AbCipDataType.DInt),
                new AbCipTagDefinition("C", "ab://10.0.0.5/1,0", "C", AbCipDataType.DInt),
            ],
        }, "drv-1", factory);
        await drv.InitializeAsync("{}", CancellationToken.None);

        // Not awaited yet: the writes stay parked on the gate until it is released below.
        var writeTask = drv.WriteAsync(
            [
                new WriteRequest("A", 1),
                new WriteRequest("B", 2),
                new WriteRequest("C", 3),
            ],
            CancellationToken.None);

        try
        {
            // Wait until all three writes have entered WriteAsync simultaneously, then release.
            await WaitForAsync(() => tracker.Current >= 3, TimeSpan.FromSeconds(2));
        }
        finally
        {
            // Release the gate even when the wait times out, so a failing run does not leave
            // the gated fake writes (and the driver's WriteAsync task) parked forever.
            gate.TrySetResult(0);
        }

        var results = await writeTask;

        results.Count.ShouldBe(3);
        results[0].StatusCode.ShouldBe(AbCipStatusMapper.Good);
        results[1].StatusCode.ShouldBe(AbCipStatusMapper.Good);
        results[2].StatusCode.ShouldBe(AbCipStatusMapper.Good);
        tracker.Max.ShouldBeGreaterThanOrEqualTo(2,
            "ControlLogix supports request packing — packable writes should run concurrently within the device.");
    }

    /// <summary>
    /// On a Micro800 device, the fake never observes more than one write in flight at a time.
    /// </summary>
    [Fact]
    public async Task Micro800_falls_back_to_sequential_writes()
    {
        // Micro800 has SupportsRequestPacking=false → writes go one-at-a-time; the gated fake
        // never sees more than one in-flight at a time.
        var gate = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
        gate.SetResult(0); // No need to gate — we just observe concurrency.
        var tracker = new InFlightTracker();
        var factory = new FakeAbCipTagFactory
        {
            Customise = p => new GatedWriteFake(p, gate, tracker.Enter, tracker.Exit),
        };
        var drv = new AbCipDriver(new AbCipDriverOptions
        {
            Devices = [new AbCipDeviceOptions("ab://10.0.0.5/", AbCipPlcFamily.Micro800)],
            Tags =
            [
                new AbCipTagDefinition("A", "ab://10.0.0.5/", "A", AbCipDataType.DInt),
                new AbCipTagDefinition("B", "ab://10.0.0.5/", "B", AbCipDataType.DInt),
                new AbCipTagDefinition("C", "ab://10.0.0.5/", "C", AbCipDataType.DInt),
            ],
        }, "drv-1", factory);
        await drv.InitializeAsync("{}", CancellationToken.None);

        var results = await drv.WriteAsync(
            [
                new WriteRequest("A", 1),
                new WriteRequest("B", 2),
                new WriteRequest("C", 3),
            ],
            CancellationToken.None);

        results.Count.ShouldBe(3);
        results.ShouldAllBe(r => r.StatusCode == AbCipStatusMapper.Good);
        tracker.Max.ShouldBe(1, "Micro800 disables request packing — writes must execute sequentially.");
    }

    /// <summary>
    /// A BOOL tag addressed as a bit of a DINT ("Flags.3") is written through the parent
    /// "Flags" runtime (one read + one write) while a plain DINT in the same batch is written
    /// directly.
    /// </summary>
    [Fact]
    public async Task Bit_in_dint_writes_still_route_through_RMW_path()
    {
        // BOOL-with-bitIndex must hit the per-parent RMW semaphore — it must NOT go through
        // the packable per-tag runtime path. We prove this by checking that:
        //   (a) the per-tag "bit-selector" runtime is never created (it would throw via
        //       LibplctagTagRuntime's NotSupportedException had the bypass happened);
        //   (b) the parent-DINT runtime got both a Read and a Write.
        // NOTE(review): (a) is only implied — no assertion below inspects factory.Tags for a
        // bit-selector entry. Consider adding one once the key the factory would use for such
        // a runtime is confirmed.
        var factory = new FakeAbCipTagFactory();
        var drv = new AbCipDriver(new AbCipDriverOptions
        {
            Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")],
            Tags =
            [
                new AbCipTagDefinition("Flag3", "ab://10.0.0.5/1,0", "Flags.3", AbCipDataType.Bool),
                new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt),
            ],
        }, "drv-1", factory);
        await drv.InitializeAsync("{}", CancellationToken.None);

        var results = await drv.WriteAsync(
            [
                new WriteRequest("Flag3", true),
                new WriteRequest("Speed", 99),
            ],
            CancellationToken.None);

        results.Count.ShouldBe(2);
        results[0].StatusCode.ShouldBe(AbCipStatusMapper.Good);
        results[1].StatusCode.ShouldBe(AbCipStatusMapper.Good);

        // Parent runtime created lazily for Flags (no .3 suffix) — drove the RMW.
        factory.Tags.ShouldContainKey("Flags");
        factory.Tags["Flags"].ReadCount.ShouldBe(1);
        factory.Tags["Flags"].WriteCount.ShouldBe(1);

        // Speed went through the packable path.
        factory.Tags["Speed"].WriteCount.ShouldBe(1);
    }

    /// <summary>
    /// Each result lands at the index of the request that produced it, even when the batch
    /// mixes successes, a device-level failure, a non-writable tag and an unknown tag.
    /// </summary>
    [Fact]
    public async Task Per_tag_status_code_fan_out_works_on_partial_failure()
    {
        // Mix Good + BadTimeout + BadNotWritable + BadNodeIdUnknown across two devices to
        // exercise the original-index preservation through the per-device plan + concurrent
        // dispatch.
        var factory = new FakeAbCipTagFactory
        {
            Customise = p => p.TagName == "B"
                ? new FakeAbCipTag(p) { Status = -5 /* timeout */ }
                : new FakeAbCipTag(p),
        };
        var drv = new AbCipDriver(new AbCipDriverOptions
        {
            Devices =
            [
                new AbCipDeviceOptions("ab://10.0.0.5/1,0"),
                new AbCipDeviceOptions("ab://10.0.0.6/1,0"),
            ],
            Tags =
            [
                new AbCipTagDefinition("A", "ab://10.0.0.5/1,0", "A", AbCipDataType.DInt),
                new AbCipTagDefinition("B", "ab://10.0.0.5/1,0", "B", AbCipDataType.DInt),
                new AbCipTagDefinition("RO", "ab://10.0.0.5/1,0", "RO", AbCipDataType.DInt, Writable: false),
                new AbCipTagDefinition("C", "ab://10.0.0.6/1,0", "C", AbCipDataType.DInt),
            ],
        }, "drv-1", factory);
        await drv.InitializeAsync("{}", CancellationToken.None);

        // "UnknownTag" has no tag definition above.
        var results = await drv.WriteAsync(
            [
                new WriteRequest("A", 1),
                new WriteRequest("B", 2),
                new WriteRequest("RO", 3),
                new WriteRequest("UnknownTag", 4),
                new WriteRequest("C", 5),
            ],
            CancellationToken.None);

        results.Count.ShouldBe(5);
        results[0].StatusCode.ShouldBe(AbCipStatusMapper.Good);
        results[1].StatusCode.ShouldBe(AbCipStatusMapper.BadTimeout);
        results[2].StatusCode.ShouldBe(AbCipStatusMapper.BadNotWritable);
        results[3].StatusCode.ShouldBe(AbCipStatusMapper.BadNodeIdUnknown);
        results[4].StatusCode.ShouldBe(AbCipStatusMapper.Good);
    }

    /// <summary>
    /// Polls <paramref name="predicate"/> every 10 ms until it returns <c>true</c>.
    /// </summary>
    /// <param name="predicate">Condition to wait for.</param>
    /// <param name="timeout">Maximum time to keep polling.</param>
    /// <exception cref="TimeoutException">
    /// The predicate was still <c>false</c> once <paramref name="timeout"/> had elapsed.
    /// </exception>
    private static async Task WaitForAsync(Func<bool> predicate, TimeSpan timeout)
    {
        // Stopwatch rather than DateTime.UtcNow so a wall-clock adjustment during the wait
        // cannot shorten or stretch the deadline.
        var elapsed = System.Diagnostics.Stopwatch.StartNew();
        while (!predicate())
        {
            if (elapsed.Elapsed >= timeout)
            {
                throw new TimeoutException("predicate did not become true within timeout");
            }

            await Task.Delay(10).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Counts how many operations are currently between <see cref="Enter"/> and
    /// <see cref="Exit"/>, and remembers the highest count ever reached.
    /// </summary>
    private sealed class InFlightTracker
    {
        private int _current;
        private int _max;

        /// <summary>Number of operations currently in flight.</summary>
        public int Current => Volatile.Read(ref _current);

        /// <summary>Highest in-flight count recorded so far.</summary>
        public int Max => Volatile.Read(ref _max);

        /// <summary>Records an operation starting and raises the high-water mark if needed.</summary>
        public void Enter()
        {
            var current = Interlocked.Increment(ref _current);

            // CAS loop: only replace the recorded maximum while ours is still larger; retry
            // when another caller changed it between the read and the exchange.
            var observed = Volatile.Read(ref _max);
            while (current > observed
                   && Interlocked.CompareExchange(ref _max, current, observed) != observed)
            {
                observed = Volatile.Read(ref _max);
            }
        }

        /// <summary>Records an operation finishing.</summary>
        public void Exit() => Interlocked.Decrement(ref _current);
    }

    /// <summary>
    /// Test fake whose <see cref="WriteAsync"/> blocks on a shared
    /// <see cref="TaskCompletionSource{TResult}"/> so the test can observe how many writes are
    /// simultaneously in flight inside the driver.
    /// </summary>
    private sealed class GatedWriteFake : FakeAbCipTag
    {
        private readonly TaskCompletionSource<int> _gate;
        private readonly Action _onEnter;
        private readonly Action _onExit;

        /// <param name="p">Creation parameters forwarded to the base fake.</param>
        /// <param name="gate">Every write awaits this gate before delegating to the base fake.</param>
        /// <param name="onEnter">Invoked as soon as a write enters <see cref="WriteAsync"/>.</param>
        /// <param name="onExit">Invoked when a write leaves <see cref="WriteAsync"/>, success or failure.</param>
        public GatedWriteFake(AbCipTagCreateParams p, TaskCompletionSource<int> gate, Action onEnter, Action onExit)
            : base(p)
        {
            _gate = gate;
            _onEnter = onEnter;
            _onExit = onExit;
        }

        /// <summary>
        /// Signals entry, waits for the shared gate, performs the base write, and always
        /// signals exit.
        /// </summary>
        public override async Task WriteAsync(CancellationToken ct)
        {
            _onEnter();
            try
            {
                await _gate.Task.ConfigureAwait(false);
                await base.WriteAsync(ct).ConfigureAwait(false);
            }
            finally
            {
                // Runs on failure too, so the in-flight count cannot leak.
                _onExit();
            }
        }
    }
}