fix(focas): serialize per-device wire I/O + bound reads; tolerate AdminUI config formats

Equipment tags were stuck at Bad_WaitingForInitialData on the deployed driver: the equipment poll, fixed-tree loop, probe and recycle shared one FOCAS/2 socket with no serialization, and the steady-state read had no timeout — concurrent reads collided and a stalled read hung forever, never overwriting the node's initial-data seed.

- SynchronizedFocasClient: per-device SemaphoreSlim gate + per-call timeout around every wire op (Connect/Probe are gated but not double-bounded, since they carry their own budgets); wired in EnsureConnectedAsync. The decorator itself surfaces a per-call timeout as OperationCanceledException; FocasDriver.ReadAsync/WriteAsync map it to a per-tag BadCommunicationError instead of rethrowing.
- FlexibleStringConverter on FOCAS config Series: the AdminUI persists the enum as a bare JSON number ("series":6); accept number-or-string instead of throwing and falling back to a stub driver.
- FocasHostAddress.TryParse tolerates a scheme-less {ip}[:{port}] (AdminUI hostAddress form); canonical focas:// unchanged, malformed schemes still rejected.

247 FOCAS tests green; each fix has a regression test. Live-validated on wonder-app-vd03 (tags read Good).
This commit is contained in:
Joseph Doherty
2026-06-26 05:59:54 -04:00
parent 20b2df9241
commit 235b8b8e6d
9 changed files with 484 additions and 11 deletions
@@ -70,9 +70,10 @@ public sealed class FocasDriverProbeTests
[Fact]
public async Task MalformedHostAddress_Returns_OkFalse_WithNoHostPortMessage()
{
// "not-a-focas-url" is not a focas:// URL — TryParse returns null.
// A foreign URI scheme ("http://…") is rejected by TryParse → null. (A bare
// "{ip}[:{port}]" without a scheme is now tolerated, so it can't be the malformed case.)
var result = await Probe.ProbeAsync(
"{\"devices\":[{\"hostAddress\":\"not-a-focas-url\"}]}",
"{\"devices\":[{\"hostAddress\":\"http://10.0.0.5/\"}]}",
TimeSpan.FromSeconds(3),
TestContext.Current.CancellationToken);
@@ -38,6 +38,25 @@ public sealed class FocasFactoryConfigTests
drv.Options.FixedTree.TimerPollInterval.ShouldBe(TimeSpan.FromSeconds(30));
}
/// <summary>
/// Regression for the 2026-06-26 wonder data-plane deploy, where the driver stubbed on
/// "Cannot get the value of a token type 'Number' as a string". The AdminUI writes
/// FocasCncSeries as its integer value — a bare JSON number such as <c>"series":6</c>
/// (Thirty_i) — so the factory has to accept that form (via FlexibleStringConverter) and
/// produce the real driver rather than throwing and falling back to a stub.
/// </summary>
[Fact]
public void CreateInstance_accepts_numeric_Series_from_AdminUI_serialization()
{
    // Series is given as the number 6 at both the driver level and the device level.
    const string adminUiJson = """
        {"Backend":"wire","series":6,"devices":[{"hostAddress":"10.0.0.5:8193","deviceName":"Makino","series":6,"positionDecimalPlaces":0}]}
        """;

    var driver = FocasDriverFactoryExtensions.CreateInstance("drv-1", adminUiJson);

    var devices = driver.Options.Devices;
    devices.ShouldHaveSingleItem();
    devices[0].Series.ShouldBe(FocasCncSeries.Thirty_i);
}
/// <summary>Verifies that the AlarmProjection configuration section is mapped to driver options.</summary>
[Fact]
public void CreateInstance_maps_AlarmProjection_section_onto_options()
@@ -0,0 +1,207 @@
using System.Diagnostics;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Driver.FOCAS;
namespace ZB.MOM.WW.OtOpcUa.Driver.FOCAS.Tests;
/// <summary>
/// Unit tests for the FOCAS data-plane fix (2026-06-25 equipment-tag investigation). Two
/// guarantees are pinned here: wire I/O on a device's single FOCAS/2 socket is serialized, so
/// one request→response exchange can never interleave with another; and every steady-state
/// read/write is time-bounded, so a stalled CNC read turns into a recoverable error rather than
/// hanging forever at BadWaitingForInitialData. Background:
/// <c>docs/plans/2026-06-25-otopcua-equipment-dataplane-investigation.md</c>.
/// </summary>
[Trait("Category", "Unit")]
public sealed class FocasIoSerializationTests
{
    /// <summary>Macro address 500 — the single address every read in this class targets.</summary>
    private static readonly FocasAddress Macro500 = new FocasAddress(FocasAreaKind.Macro, null, 500, null);

    // ---- SynchronizedFocasClient: serialization ----

    /// <summary>Eight overlapping reads must reach the inner client strictly one at a time.</summary>
    [Fact]
    public async Task Concurrent_reads_are_serialized_onto_the_inner_client()
    {
        var recorder = new RecordingClient { ReadDelay = TimeSpan.FromMilliseconds(20) };
        // Dispose the inner fake when the test scope ends, whether or not the assertions pass.
        using IDisposable recorderLifetime = recorder;
        var gated = new SynchronizedFocasClient(recorder, TimeSpan.FromSeconds(5));

        var inFlight = Enumerable.Range(0, 8)
            .Select(_ => gated.ReadAsync(Macro500, FocasDataType.Float64, CancellationToken.None))
            .ToArray();
        await Task.WhenAll(inFlight);

        // Never more than one wire op on the socket at a time.
        recorder.MaxConcurrency.ShouldBe(1);
        recorder.ReadCount.ShouldBe(8);
    }

    // ---- SynchronizedFocasClient: per-call timeout ----

    /// <summary>A read that never completes must end in cancellation once the call timeout elapses.</summary>
    [Fact]
    public async Task A_hung_read_is_bounded_by_the_call_timeout()
    {
        var stalled = new RecordingClient { BlockReadUntilCancelled = true };
        var gated = new SynchronizedFocasClient(stalled, TimeSpan.FromMilliseconds(100));

        var timer = Stopwatch.StartNew();
        await Should.ThrowAsync<OperationCanceledException>(
            () => gated.ReadAsync(Macro500, FocasDataType.Float64, CancellationToken.None));
        timer.Stop();

        // Bounded, not the indefinite OS TCP wait.
        timer.Elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(2));
    }

    /// <summary>
    /// A timed-out call has to release the gate; otherwise a single stall would wedge every later
    /// op on the device. The first read hangs and times out, the second must still go through.
    /// </summary>
    [Fact]
    public async Task A_hung_read_does_not_hold_the_socket_for_the_next_call()
    {
        var flaky = new TimeoutThenServeClient { FirstCallBlocks = true };
        var gated = new SynchronizedFocasClient(flaky, TimeSpan.FromMilliseconds(100));

        await Should.ThrowAsync<OperationCanceledException>(
            () => gated.ReadAsync(Macro500, FocasDataType.Float64, CancellationToken.None));

        var (payload, code) = await gated.ReadAsync(Macro500, FocasDataType.Float64, CancellationToken.None);
        code.ShouldBe(FocasStatusMapper.Good);
        payload.ShouldBe(42);
    }

    /// <summary>
    /// Connect/Probe carry their own budgets, so the decorator must not shrink them to its read
    /// budget: a 200 ms probe succeeds even though the call timeout is only 50 ms.
    /// </summary>
    [Fact]
    public async Task Probe_is_not_bounded_by_the_call_timeout()
    {
        var slowProbe = new RecordingClient { ProbeDelay = TimeSpan.FromMilliseconds(200) };
        var gated = new SynchronizedFocasClient(slowProbe, TimeSpan.FromMilliseconds(50));

        var reachable = await gated.ProbeAsync(CancellationToken.None);

        reachable.ShouldBeTrue();
    }

    /// <summary>With a zero call timeout, a 120 ms read is allowed to run to completion.</summary>
    [Fact]
    public async Task Zero_call_timeout_disables_the_per_call_bound()
    {
        var slowRead = new RecordingClient { ReadDelay = TimeSpan.FromMilliseconds(120) };
        var gated = new SynchronizedFocasClient(slowRead, TimeSpan.Zero);

        var (payload, code) = await gated.ReadAsync(Macro500, FocasDataType.Float64, CancellationToken.None);

        code.ShouldBe(FocasStatusMapper.Good);
        payload.ShouldBe(42);
    }

    /// <summary>Disposing the decorator disposes the wrapped client exactly once.</summary>
    [Fact]
    public void Dispose_disposes_the_inner_client()
    {
        var recorder = new RecordingClient();
        var gated = new SynchronizedFocasClient(recorder, TimeSpan.FromSeconds(1));

        gated.Dispose();

        recorder.DisposeCount.ShouldBe(1);
    }

    // ---- Driver level: a timed-out read overwrites the seed with a recoverable status ----

    /// <summary>A driver read against a hung client comes back promptly with BadCommunicationError.</summary>
    [Fact]
    public async Task Driver_read_that_times_out_returns_BadCommunicationError_not_a_hang()
    {
        var driver = await StartDriverWithHungReadsAsync(TimeSpan.FromMilliseconds(150));

        var timer = Stopwatch.StartNew();
        var snapshots = await driver.ReadAsync(["CustomVar"], CancellationToken.None);
        var snapshot = snapshots.Single();
        timer.Stop();

        snapshot.StatusCode.ShouldBe(FocasStatusMapper.BadCommunicationError);
        // Bounded by Timeout, not hung.
        timer.Elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(2));
    }

    /// <summary>
    /// The per-call timeout must NOT escape the driver's ReadAsync as OperationCanceledException —
    /// that would abort the whole poll batch. It has to be caught and reported as a per-tag Bad status.
    /// </summary>
    [Fact]
    public async Task Driver_read_does_not_propagate_a_call_timeout_as_cancellation()
    {
        var driver = await StartDriverWithHungReadsAsync(TimeSpan.FromMilliseconds(120));

        // Completes (does not throw) with a Bad snapshot although the caller's token is never cancelled.
        var snapshot = (await driver.ReadAsync(["CustomVar"], CancellationToken.None)).Single();

        snapshot.StatusCode.ShouldBe(FocasStatusMapper.BadCommunicationError);
    }

    /// <summary>
    /// Builds and initializes a single-device, single-tag driver ("CustomVar" at MACRO:500) with
    /// probing disabled, whose fake clients block every read until cancelled.
    /// </summary>
    /// <param name="callTimeout">Value assigned to the driver options' Timeout.</param>
    /// <returns>The driver, after InitializeAsync has completed.</returns>
    private static async Task<FocasDriver> StartDriverWithHungReadsAsync(TimeSpan callTimeout)
    {
        var clientFactory = new FakeFocasClientFactory
        {
            Customise = () => new RecordingClient { BlockReadUntilCancelled = true },
        };
        var options = new FocasDriverOptions
        {
            Devices = [new FocasDeviceOptions("focas://10.0.0.5:8193")],
            Tags = [new FocasTagDefinition("CustomVar", "focas://10.0.0.5:8193", "MACRO:500", FocasDataType.Float64)],
            Probe = new FocasProbeOptions { Enabled = false },
            Timeout = callTimeout,
        };

        var driver = new FocasDriver(options, "drv-1", clientFactory);
        await driver.InitializeAsync("{}", CancellationToken.None);
        return driver;
    }

    /// <summary>
    /// Fake client that counts reads, tracks the peak number of overlapping reads, and can delay or
    /// block reads and delay probes on request.
    /// </summary>
    private class RecordingClient : FakeFocasClient
    {
        // Number of ReadAsync calls currently between entry and exit.
        private int _inFlight;

        // Highest value _inFlight has reached.
        public int MaxConcurrency;

        // Total ReadAsync calls observed.
        public int ReadCount;

        public TimeSpan ReadDelay = TimeSpan.Zero;
        public bool BlockReadUntilCancelled;
        public TimeSpan ProbeDelay = TimeSpan.Zero;

        /// <summary>
        /// Records the call, then either waits until <paramref name="ct"/> is cancelled, waits for
        /// <see cref="ReadDelay"/>, or returns straight away — always yielding 42 with Good status.
        /// </summary>
        public override async Task<(object? value, uint status)> ReadAsync(
            FocasAddress address, FocasDataType type, CancellationToken ct)
        {
            Interlocked.Increment(ref ReadCount);
            RaiseTo(ref MaxConcurrency, Interlocked.Increment(ref _inFlight));
            try
            {
                if (BlockReadUntilCancelled)
                {
                    await Task.Delay(Timeout.Infinite, ct).ConfigureAwait(false);
                }
                else if (ReadDelay > TimeSpan.Zero)
                {
                    await Task.Delay(ReadDelay, ct).ConfigureAwait(false);
                }

                return ((object?)42, FocasStatusMapper.Good);
            }
            finally
            {
                Interlocked.Decrement(ref _inFlight);
            }
        }

        /// <summary>Reports success, after waiting <see cref="ProbeDelay"/> when it is positive.</summary>
        public override async Task<bool> ProbeAsync(CancellationToken ct)
        {
            if (ProbeDelay <= TimeSpan.Zero)
            {
                return true;
            }

            await Task.Delay(ProbeDelay, ct).ConfigureAwait(false);
            return true;
        }

        /// <summary>Atomically lifts <paramref name="target"/> to <paramref name="value"/> if it is currently lower.</summary>
        private static void RaiseTo(ref int target, int value)
        {
            var seen = Volatile.Read(ref target);
            while (value > seen)
            {
                var prior = Interlocked.CompareExchange(ref target, value, seen);
                if (prior == seen)
                {
                    return;
                }

                // Another writer got in first; retry against the value it left behind.
                seen = prior;
            }
        }
    }

    /// <summary>
    /// Fake client whose first read waits until cancelled (when <see cref="FirstCallBlocks"/> is set);
    /// every other read returns 42 with Good status immediately.
    /// </summary>
    private sealed class TimeoutThenServeClient : FakeFocasClient
    {
        public bool FirstCallBlocks;
        private int _readsSeen;

        public override async Task<(object? value, uint status)> ReadAsync(
            FocasAddress address, FocasDataType type, CancellationToken ct)
        {
            var isFirstRead = Interlocked.Increment(ref _readsSeen) == 1;
            if (isFirstRead && FirstCallBlocks)
            {
                await Task.Delay(Timeout.Infinite, ct).ConfigureAwait(false);
            }

            return ((object?)42, FocasStatusMapper.Good);
        }
    }
}
@@ -245,6 +245,32 @@ public sealed class FocasReadWriteTests
/// <summary>Verifies that cancelling the caller's token aborts the read with an OperationCanceledException.</summary>
[Fact]
public async Task Cancellation_propagates()
{
    var (driver, clientFactory) = NewDriver(
        new FocasTagDefinition("X", "focas://10.0.0.5:8193", "R100", FocasDataType.Byte));
    await driver.InitializeAsync("{}", CancellationToken.None);

    // The caller's token is already cancelled before the read is issued.
    using var callerCts = new CancellationTokenSource();
    callerCts.Cancel();
    var callerToken = callerCts.Token;

    clientFactory.Customise = () => new FakeFocasClient
    {
        ThrowOnRead = true,
        Exception = new OperationCanceledException(callerToken),
    };

    // Cancelling the CALLER'S token must propagate and abort the read. A per-call timeout is a
    // different case: an OCE raised while the caller's token is still live is swallowed into a
    // per-tag BadCommunicationError (see Swallows_a_spurious_read_OCE_when_caller_not_cancelled).
    await Should.ThrowAsync<OperationCanceledException>(
        () => driver.ReadAsync(["X"], callerToken));
}
/// <summary>
/// An OperationCanceledException from the wire read while the CALLER'S token is NOT cancelled
/// (e.g. a per-call timeout firing) must be turned into a per-tag BadCommunicationError, not
/// propagated — otherwise one stalled tag would abort the whole poll batch.
/// </summary>
[Fact]
public async Task Swallows_a_spurious_read_OCE_when_caller_not_cancelled()
{
var (drv, factory) = NewDriver(
new FocasTagDefinition("X", "focas://10.0.0.5:8193", "R100", FocasDataType.Byte));
@@ -255,8 +281,8 @@ public sealed class FocasReadWriteTests
Exception = new OperationCanceledException(),
};
await Should.ThrowAsync<OperationCanceledException>(
() => drv.ReadAsync(["X"], CancellationToken.None));
var snap = (await drv.ReadAsync(["X"], CancellationToken.None)).Single();
snap.StatusCode.ShouldBe(FocasStatusMapper.BadCommunicationError);
}
/// <summary>Verifies that ShutdownAsync disposes the client.</summary>
@@ -20,6 +20,9 @@ public sealed class FocasScaffoldingTests
[InlineData("focas://cnc-01.factory.internal:8193", "cnc-01.factory.internal", 8193)]
[InlineData("focas://10.0.0.5:12345", "10.0.0.5", 12345)]
[InlineData("FOCAS://10.0.0.5:8193", "10.0.0.5", 8193)] // case-insensitive scheme
[InlineData("10.201.31.5:8193", "10.201.31.5", 8193)] // scheme-less (AdminUI-persisted form)
[InlineData("10.0.0.5", "10.0.0.5", 8193)] // scheme-less, default port
[InlineData("cnc-01.factory.internal:8193", "cnc-01.factory.internal", 8193)] // scheme-less hostname
public void HostAddress_parses_valid(string input, string host, int port)
{
var parsed = FocasHostAddress.TryParse(input);
@@ -224,9 +227,11 @@ public sealed class FocasScaffoldingTests
[Fact]
public async Task InitializeAsync_malformed_address_faults()
{
// A non-focas:// URI scheme is rejected by TryParse (a bare "{ip}[:{port}]" is now
// tolerated, so the malformed case must carry a foreign scheme).
var drv = new FocasDriver(new FocasDriverOptions
{
Devices = [new FocasDeviceOptions("not-an-address")],
Devices = [new FocasDeviceOptions("http://10.0.0.5/")],
}, "drv-1");
await Should.ThrowAsync<InvalidOperationException>(