test(galaxy): readback via explicit TCS + skip unused buffered-interval RPC (review)
v2-ci / build (push) Failing after 44s
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.IntegrationTests) (push) Has been skipped

Code-review refinement of the live-gw read-back helper: complete a
TaskCompletionSource<double?> from the pump instead of a captured local (explicit
cross-task visibility), pass bufferedUpdateIntervalMs:0 (Advise snapshot needs no
SetBufferedUpdateInterval), and document the Advise->OnDataChange filter. Live re-verified 2/2.
This commit is contained in:
Joseph Doherty
2026-06-14 23:32:00 -04:00
parent 622bfda27d
commit d19deb9b42
@@ -155,7 +155,9 @@ public sealed class GatewayGalaxyLiveReopenAndWriteTests
await session.ConnectAsync(clientOptions, ct);
var subscriber = new GatewayGalaxySubscriber(session);
double? captured = null;
// First decoded data-change value, completed by the pump (an explicit TCS rather than a
// captured local, so the value's visibility across the Task boundary is unambiguous).
var firstValue = new TaskCompletionSource<double?>(TaskCreationOptions.RunContinuationsAsynchronously);
using var streamCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
streamCts.CancelAfter(timeout);
@@ -165,20 +167,26 @@ public sealed class GatewayGalaxyLiveReopenAndWriteTests
{
await foreach (var ev in subscriber.StreamEventsAsync(streamCts.Token))
{
// SubscribeBulk uses Advise (not AdviseBuffered), so the initial galaxy-value
// snapshot arrives as OnDataChange — never OnBufferedDataChange.
if (ev.Family != MxEventFamily.OnDataChange) continue;
if (MxValueDecoder.Decode(ev.Value) is not { } decoded) continue;
captured = Convert.ToDouble(decoded);
streamCts.Cancel();
firstValue.TrySetResult(Convert.ToDouble(decoded));
return;
}
}
catch (OperationCanceledException) { /* timeout, or we got the value and cancelled */ }
catch (OperationCanceledException) { /* timeout, or stream ended */ }
firstValue.TrySetResult(null); // timed out / stream completed without a data event
}, ct);
// Let the StreamEvents RPC attach before adding the subscription so the initial snapshot isn't missed.
// Let the StreamEvents RPC attach before adding the subscription so the initial snapshot isn't
// missed. bufferedUpdateIntervalMs:0 skips the superfluous SetBufferedUpdateInterval RPC — the
// Advise-based snapshot is delivered regardless.
await Task.Delay(250, ct);
await subscriber.SubscribeBulkAsync([fullRef], bufferedUpdateIntervalMs: 500, ct);
await pump;
return captured;
await subscriber.SubscribeBulkAsync([fullRef], bufferedUpdateIntervalMs: 0, ct);
var result = await firstValue.Task;
streamCts.Cancel(); // stop the pump's StreamEvents enumeration
await pump; // observe any pump exception + ensure teardown before the session disposes
return result;
}
}