Group writes by device through new AbCipMultiWritePlanner; for families that support CIP request packing (ControlLogix / CompactLogix / GuardLogix) the packable writes for one device are dispatched concurrently so libplctag's native scheduler can coalesce them onto one Multi-Service Packet (0x0A). Micro800 keeps SupportsRequestPacking=false and falls back to per-tag sequential writes. BOOL-within-DINT writes are excluded from packing and continue to go through the per-parent RMW semaphore so two concurrent bit writes against the same DINT cannot lose one another's update. The libplctag .NET wrapper does not expose a Multi-Service Packet construction API at the per-Tag surface (each Tag is one CIP service), so this PR uses client-side coalescing — concurrent Task.WhenAll dispatch per device — rather than building raw CIP frames. The native libplctag scheduler does pack concurrent same-connection writes when the family allows it, which gives the round-trip reduction #228 calls for without ballooning the diff. Per-tag StatusCodes preserve caller order across success, transport failure, non-writable tags, unknown references, and unknown devices, including in mixed concurrent batches. Closes #228
284 lines
12 KiB
C#
284 lines
12 KiB
C#
using Shouldly;
|
|
using Xunit;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
|
using ZB.MOM.WW.OtOpcUa.Driver.AbCip;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Driver.AbCip.Tests;
|
|
|
|
/// <summary>
|
|
/// PR abcip-1.4 — multi-tag write packing. Validates that <see cref="AbCipDriver.WriteAsync"/>
|
|
/// groups writes by device, dispatches packable writes for request-packing-capable
|
|
/// families concurrently, falls back to sequential writes on Micro800, keeps BOOL-RMW
|
|
/// writes on the per-parent semaphore path, and fans per-tag StatusCodes out to the
|
|
/// correct positions on partial failures.
|
|
/// </summary>
|
|
[Trait("Category", "Unit")]
|
|
public sealed class AbCipMultiWritePackingTests
|
|
{
|
|
[Fact]
|
|
public async Task Writes_get_grouped_by_device()
|
|
{
|
|
var factory = new FakeAbCipTagFactory();
|
|
var drv = new AbCipDriver(new AbCipDriverOptions
|
|
{
|
|
Devices =
|
|
[
|
|
new AbCipDeviceOptions("ab://10.0.0.5/1,0"),
|
|
new AbCipDeviceOptions("ab://10.0.0.6/1,0"),
|
|
],
|
|
Tags =
|
|
[
|
|
new AbCipTagDefinition("A1", "ab://10.0.0.5/1,0", "A1", AbCipDataType.DInt),
|
|
new AbCipTagDefinition("A2", "ab://10.0.0.5/1,0", "A2", AbCipDataType.DInt),
|
|
new AbCipTagDefinition("B1", "ab://10.0.0.6/1,0", "B1", AbCipDataType.DInt),
|
|
],
|
|
}, "drv-1", factory);
|
|
await drv.InitializeAsync("{}", CancellationToken.None);
|
|
|
|
var results = await drv.WriteAsync(
|
|
[
|
|
new WriteRequest("A1", 1),
|
|
new WriteRequest("B1", 100),
|
|
new WriteRequest("A2", 2),
|
|
], CancellationToken.None);
|
|
|
|
results.Count.ShouldBe(3);
|
|
results[0].StatusCode.ShouldBe(AbCipStatusMapper.Good);
|
|
results[1].StatusCode.ShouldBe(AbCipStatusMapper.Good);
|
|
results[2].StatusCode.ShouldBe(AbCipStatusMapper.Good);
|
|
// Per-device handles materialised — A1/A2 share device A, B1 lives on device B.
|
|
factory.Tags["A1"].CreationParams.Gateway.ShouldBe("10.0.0.5");
|
|
factory.Tags["A2"].CreationParams.Gateway.ShouldBe("10.0.0.5");
|
|
factory.Tags["B1"].CreationParams.Gateway.ShouldBe("10.0.0.6");
|
|
factory.Tags["A1"].WriteCount.ShouldBe(1);
|
|
factory.Tags["A2"].WriteCount.ShouldBe(1);
|
|
factory.Tags["B1"].WriteCount.ShouldBe(1);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task ControlLogix_packs_concurrently_within_a_device()
|
|
{
|
|
// ControlLogix has SupportsRequestPacking=true → a multi-write batch is dispatched in
|
|
// parallel. The fake's WriteAsync gates on a TaskCompletionSource so we can prove that
|
|
// both writes are in flight at the same time before either completes.
|
|
var gate = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
|
|
var inFlight = 0;
|
|
var maxInFlight = 0;
|
|
var factory = new FakeAbCipTagFactory
|
|
{
|
|
Customise = p => new GatedWriteFake(p, gate, () =>
|
|
{
|
|
var current = Interlocked.Increment(ref inFlight);
|
|
var observed = maxInFlight;
|
|
while (current > observed
|
|
&& Interlocked.CompareExchange(ref maxInFlight, current, observed) != observed)
|
|
observed = maxInFlight;
|
|
}, () => Interlocked.Decrement(ref inFlight)),
|
|
};
|
|
var drv = new AbCipDriver(new AbCipDriverOptions
|
|
{
|
|
Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0", AbCipPlcFamily.ControlLogix)],
|
|
Tags =
|
|
[
|
|
new AbCipTagDefinition("A", "ab://10.0.0.5/1,0", "A", AbCipDataType.DInt),
|
|
new AbCipTagDefinition("B", "ab://10.0.0.5/1,0", "B", AbCipDataType.DInt),
|
|
new AbCipTagDefinition("C", "ab://10.0.0.5/1,0", "C", AbCipDataType.DInt),
|
|
],
|
|
}, "drv-1", factory);
|
|
await drv.InitializeAsync("{}", CancellationToken.None);
|
|
|
|
var writeTask = drv.WriteAsync(
|
|
[
|
|
new WriteRequest("A", 1),
|
|
new WriteRequest("B", 2),
|
|
new WriteRequest("C", 3),
|
|
], CancellationToken.None);
|
|
|
|
// Wait until all three writes have entered WriteAsync simultaneously, then release.
|
|
await WaitForAsync(() => Volatile.Read(ref inFlight) >= 3, TimeSpan.FromSeconds(2));
|
|
gate.SetResult(0);
|
|
|
|
var results = await writeTask;
|
|
results.Count.ShouldBe(3);
|
|
results[0].StatusCode.ShouldBe(AbCipStatusMapper.Good);
|
|
results[1].StatusCode.ShouldBe(AbCipStatusMapper.Good);
|
|
results[2].StatusCode.ShouldBe(AbCipStatusMapper.Good);
|
|
maxInFlight.ShouldBeGreaterThanOrEqualTo(2,
|
|
"ControlLogix supports request packing — packable writes should run concurrently within the device.");
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Micro800_falls_back_to_sequential_writes()
|
|
{
|
|
// Micro800 has SupportsRequestPacking=false → writes go one-at-a-time; the gated fake
|
|
// never sees more than one in-flight at a time.
|
|
var gate = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
|
|
gate.SetResult(0); // No need to gate — we just observe concurrency.
|
|
var inFlight = 0;
|
|
var maxInFlight = 0;
|
|
var factory = new FakeAbCipTagFactory
|
|
{
|
|
Customise = p => new GatedWriteFake(p, gate, () =>
|
|
{
|
|
var current = Interlocked.Increment(ref inFlight);
|
|
var observed = maxInFlight;
|
|
while (current > observed
|
|
&& Interlocked.CompareExchange(ref maxInFlight, current, observed) != observed)
|
|
observed = maxInFlight;
|
|
}, () => Interlocked.Decrement(ref inFlight)),
|
|
};
|
|
var drv = new AbCipDriver(new AbCipDriverOptions
|
|
{
|
|
Devices = [new AbCipDeviceOptions("ab://10.0.0.5/", AbCipPlcFamily.Micro800)],
|
|
Tags =
|
|
[
|
|
new AbCipTagDefinition("A", "ab://10.0.0.5/", "A", AbCipDataType.DInt),
|
|
new AbCipTagDefinition("B", "ab://10.0.0.5/", "B", AbCipDataType.DInt),
|
|
new AbCipTagDefinition("C", "ab://10.0.0.5/", "C", AbCipDataType.DInt),
|
|
],
|
|
}, "drv-1", factory);
|
|
await drv.InitializeAsync("{}", CancellationToken.None);
|
|
|
|
var results = await drv.WriteAsync(
|
|
[
|
|
new WriteRequest("A", 1),
|
|
new WriteRequest("B", 2),
|
|
new WriteRequest("C", 3),
|
|
], CancellationToken.None);
|
|
|
|
results.Count.ShouldBe(3);
|
|
results.ShouldAllBe(r => r.StatusCode == AbCipStatusMapper.Good);
|
|
maxInFlight.ShouldBe(1,
|
|
"Micro800 disables request packing — writes must execute sequentially.");
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Bit_in_dint_writes_still_route_through_RMW_path()
|
|
{
|
|
// BOOL-with-bitIndex must hit the per-parent RMW semaphore — it must NOT go through
|
|
// the packable per-tag runtime path. We prove this by checking that:
|
|
// (a) the per-tag "bit-selector" runtime is never created (it would throw via
|
|
// LibplctagTagRuntime's NotSupportedException had the bypass happened);
|
|
// (b) the parent-DINT runtime got both a Read and a Write.
|
|
var factory = new FakeAbCipTagFactory();
|
|
var drv = new AbCipDriver(new AbCipDriverOptions
|
|
{
|
|
Devices = [new AbCipDeviceOptions("ab://10.0.0.5/1,0")],
|
|
Tags =
|
|
[
|
|
new AbCipTagDefinition("Flag3", "ab://10.0.0.5/1,0", "Flags.3", AbCipDataType.Bool),
|
|
new AbCipTagDefinition("Speed", "ab://10.0.0.5/1,0", "Speed", AbCipDataType.DInt),
|
|
],
|
|
}, "drv-1", factory);
|
|
await drv.InitializeAsync("{}", CancellationToken.None);
|
|
|
|
var results = await drv.WriteAsync(
|
|
[
|
|
new WriteRequest("Flag3", true),
|
|
new WriteRequest("Speed", 99),
|
|
], CancellationToken.None);
|
|
|
|
results.Count.ShouldBe(2);
|
|
results[0].StatusCode.ShouldBe(AbCipStatusMapper.Good);
|
|
results[1].StatusCode.ShouldBe(AbCipStatusMapper.Good);
|
|
|
|
// Parent runtime created lazily for Flags (no .3 suffix) — drove the RMW.
|
|
factory.Tags.ShouldContainKey("Flags");
|
|
factory.Tags["Flags"].ReadCount.ShouldBe(1);
|
|
factory.Tags["Flags"].WriteCount.ShouldBe(1);
|
|
// Speed went through the packable path.
|
|
factory.Tags["Speed"].WriteCount.ShouldBe(1);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task Per_tag_status_code_fan_out_works_on_partial_failure()
|
|
{
|
|
// Mix Good + BadTimeout + BadNotWritable + BadNodeIdUnknown across two devices to
|
|
// exercise the original-index preservation through the per-device plan + concurrent
|
|
// dispatch.
|
|
var factory = new FakeAbCipTagFactory
|
|
{
|
|
Customise = p => p.TagName == "B"
|
|
? new FakeAbCipTag(p) { Status = -5 /* timeout */ }
|
|
: new FakeAbCipTag(p),
|
|
};
|
|
var drv = new AbCipDriver(new AbCipDriverOptions
|
|
{
|
|
Devices =
|
|
[
|
|
new AbCipDeviceOptions("ab://10.0.0.5/1,0"),
|
|
new AbCipDeviceOptions("ab://10.0.0.6/1,0"),
|
|
],
|
|
Tags =
|
|
[
|
|
new AbCipTagDefinition("A", "ab://10.0.0.5/1,0", "A", AbCipDataType.DInt),
|
|
new AbCipTagDefinition("B", "ab://10.0.0.5/1,0", "B", AbCipDataType.DInt),
|
|
new AbCipTagDefinition("RO", "ab://10.0.0.5/1,0", "RO", AbCipDataType.DInt, Writable: false),
|
|
new AbCipTagDefinition("C", "ab://10.0.0.6/1,0", "C", AbCipDataType.DInt),
|
|
],
|
|
}, "drv-1", factory);
|
|
await drv.InitializeAsync("{}", CancellationToken.None);
|
|
|
|
var results = await drv.WriteAsync(
|
|
[
|
|
new WriteRequest("A", 1),
|
|
new WriteRequest("B", 2),
|
|
new WriteRequest("RO", 3),
|
|
new WriteRequest("UnknownTag", 4),
|
|
new WriteRequest("C", 5),
|
|
], CancellationToken.None);
|
|
|
|
results.Count.ShouldBe(5);
|
|
results[0].StatusCode.ShouldBe(AbCipStatusMapper.Good);
|
|
results[1].StatusCode.ShouldBe(AbCipStatusMapper.BadTimeout);
|
|
results[2].StatusCode.ShouldBe(AbCipStatusMapper.BadNotWritable);
|
|
results[3].StatusCode.ShouldBe(AbCipStatusMapper.BadNodeIdUnknown);
|
|
results[4].StatusCode.ShouldBe(AbCipStatusMapper.Good);
|
|
}
|
|
|
|
private static async Task WaitForAsync(Func<bool> predicate, TimeSpan timeout)
|
|
{
|
|
var deadline = DateTime.UtcNow + timeout;
|
|
while (!predicate())
|
|
{
|
|
if (DateTime.UtcNow >= deadline)
|
|
throw new TimeoutException("predicate did not become true within timeout");
|
|
await Task.Delay(10).ConfigureAwait(false);
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Test fake whose <see cref="WriteAsync"/> blocks on a shared
|
|
/// <see cref="TaskCompletionSource"/> so the test can observe how many writes are
|
|
/// simultaneously in flight inside the driver.
|
|
/// </summary>
|
|
private sealed class GatedWriteFake : FakeAbCipTag
|
|
{
|
|
private readonly TaskCompletionSource<int> _gate;
|
|
private readonly Action _onEnter;
|
|
private readonly Action _onExit;
|
|
|
|
public GatedWriteFake(AbCipTagCreateParams p, TaskCompletionSource<int> gate,
|
|
Action onEnter, Action onExit) : base(p)
|
|
{
|
|
_gate = gate;
|
|
_onEnter = onEnter;
|
|
_onExit = onExit;
|
|
}
|
|
|
|
public override async Task WriteAsync(CancellationToken ct)
|
|
{
|
|
_onEnter();
|
|
try
|
|
{
|
|
await _gate.Task.ConfigureAwait(false);
|
|
await base.WriteAsync(ct).ConfigureAwait(false);
|
|
}
|
|
finally
|
|
{
|
|
_onExit();
|
|
}
|
|
}
|
|
}
|
|
}
|