From d19deb9b420f637b8baab52f9da4439908110dd3 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 14 Jun 2026 23:32:00 -0400 Subject: [PATCH] test(galaxy): readback via explicit TCS + skip unused buffered-interval RPC (review) Code-review refinement of the live-gw read-back helper: complete a TaskCompletionSource from the pump instead of a captured local (explicit cross-task visibility), pass bufferedUpdateIntervalMs:0 (Advise snapshot needs no SetBufferedUpdateInterval), and document the Advise->OnDataChange filter. Live re-verified 2/2. --- .../GatewayGalaxyLiveReopenAndWriteTests.cs | 24 ++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/GatewayGalaxyLiveReopenAndWriteTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/GatewayGalaxyLiveReopenAndWriteTests.cs index f441be43..e0e7208f 100644 --- a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/GatewayGalaxyLiveReopenAndWriteTests.cs +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/GatewayGalaxyLiveReopenAndWriteTests.cs @@ -155,7 +155,9 @@ public sealed class GatewayGalaxyLiveReopenAndWriteTests await session.ConnectAsync(clientOptions, ct); var subscriber = new GatewayGalaxySubscriber(session); - double? captured = null; + // First decoded data-change value, completed by the pump (an explicit TCS rather than a + // captured local, so the value's visibility across the Task boundary is unambiguous). + var firstValue = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); using var streamCts = CancellationTokenSource.CreateLinkedTokenSource(ct); streamCts.CancelAfter(timeout); @@ -165,20 +167,26 @@ public sealed class GatewayGalaxyLiveReopenAndWriteTests { await foreach (var ev in subscriber.StreamEventsAsync(streamCts.Token)) { + // SubscribeBulk uses Advise (not AdviseBuffered), so the initial galaxy-value + // snapshot arrives as OnDataChange — never OnBufferedDataChange. if (ev.Family != MxEventFamily.OnDataChange) continue; if (MxValueDecoder.Decode(ev.Value) is not { } decoded) continue; - captured = Convert.ToDouble(decoded); - streamCts.Cancel(); + firstValue.TrySetResult(Convert.ToDouble(decoded)); return; } } - catch (OperationCanceledException) { /* timeout, or we got the value and cancelled */ } + catch (OperationCanceledException) { /* timeout, or stream ended */ } + firstValue.TrySetResult(null); // timed out / stream completed without a data event }, ct); - // Let the StreamEvents RPC attach before adding the subscription so the initial snapshot isn't missed. + // Let the StreamEvents RPC attach before adding the subscription so the initial snapshot isn't + // missed. bufferedUpdateIntervalMs:0 skips the superfluous SetBufferedUpdateInterval RPC — the + // Advise-based snapshot is delivered regardless. await Task.Delay(250, ct); - await subscriber.SubscribeBulkAsync([fullRef], bufferedUpdateIntervalMs: 500, ct); - await pump; - return captured; + await subscriber.SubscribeBulkAsync([fullRef], bufferedUpdateIntervalMs: 0, ct); + var result = await firstValue.Task; + streamCts.Cancel(); // stop the pump's StreamEvents enumeration + await pump; // observe any pump exception + ensure teardown before the session disposes + return result; } }