d19deb9b42
v2-ci / build (push) Failing after 44s
v2-ci / unit-tests (tests/Core/ZB.MOM.WW.OtOpcUa.Cluster.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.ControlPlane.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Runtime.Tests) (push) Has been skipped
v2-ci / unit-tests (tests/Server/ZB.MOM.WW.OtOpcUa.Security.Tests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.Host.IntegrationTests) (push) Has been skipped
v2-ci / integration (tests/Server/ZB.MOM.WW.OtOpcUa.OpcUaServer.IntegrationTests) (push) Has been skipped
Code-review refinement of the live-gw read-back helper: complete a TaskCompletionSource<double?> from the pump instead of a captured local (explicit cross-task visibility), pass bufferedUpdateIntervalMs:0 (Advise snapshot needs no SetBufferedUpdateInterval), and document the Advise->OnDataChange filter. Live re-verified 2/2.
193 lines
10 KiB
C#
193 lines
10 KiB
C#
using ZB.MOM.WW.MxGateway.Client;
|
|
using ZB.MOM.WW.MxGateway.Contracts.Proto;
|
|
using Shouldly;
|
|
using Xunit;
|
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Config;
|
|
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Runtime;
|
|
|
|
/// <summary>
|
|
/// Live-gateway smokes for the two seams that <em>cannot</em> be faked because the MXAccess
|
|
/// session types are sealed with internal ctors — so the unit suite can only exercise them in
|
|
/// isolation (see <see cref="GatewayGalaxyDataWriterTests"/>):
|
|
/// <list type="number">
|
|
/// <item>
|
|
/// <description>
|
|
/// <b>Reopen</b> — the exact primitive sequence <c>GalaxyDriver.ReopenAsync</c> runs on a
|
|
/// reconnect (<see cref="GalaxyMxSession.RecreateAsync"/> then
|
|
/// <see cref="GatewayGalaxyDataWriter.InvalidateHandleCaches"/>): a write seeds the
|
|
/// item-handle + supervisory-advise caches, the recreate tears the session down and
|
|
/// rebuilds it against the live gateway, the caches are dropped, and a follow-up write
|
|
/// re-AddItems + re-AdviseSupervisory against the fresh session and succeeds.
|
|
/// </description>
|
|
/// </item>
|
|
/// <item>
|
|
/// <description>
|
|
/// <b>Write commit + persist</b> — a no-login (<c>WriteUserId = 0</c>) write commits to
|
|
/// the galaxy only because the writer <c>AdviseSupervisory</c>'s the handle first; proven
|
|
/// by reading the value back through a <em>brand-new</em> session (so the value came from
|
|
/// the galaxy, not a client-side echo).
|
|
/// </description>
|
|
/// </item>
|
|
/// </list>
|
|
/// <para>
|
|
/// Skip-gated: runs only when <c>MXGW_ENDPOINT</c> + <c>GALAXY_MXGW_API_KEY</c> point at a
|
|
/// reachable gateway (so CI stays green). Captured 2026-06-14 against <c>10.100.0.48:5120</c>.
|
|
/// Writes only the dedicated test attribute <c>TestMachine_002.TestFloat</c>.
|
|
/// </para>
|
|
/// </summary>
|
|
[Trait("Category", "Integration")]
|
|
public sealed class GatewayGalaxyLiveReopenAndWriteTests
|
|
{
|
|
/// <summary>
/// The dedicated writable Galaxy test attribute (Float, Operate) used by both smokes.
/// Every write and read-back in this class targets this reference.
/// </summary>
|
|
private const string WriteRef = "TestMachine_002.TestFloat";
|
|
|
|
/// <summary>
/// Reopen smoke: seeds the writer's handle caches with a live write, runs the
/// recreate-then-invalidate sequence, and checks that a follow-up write rebuilds the
/// handles and returns Good against the recreated session.
/// </summary>
[Fact]
public async Task Live_reopen_recreates_session_and_re_establishes_write_handles()
{
    var gateway = RequireLiveGatewayOrSkip();
    var cancellation = TestContext.Current.CancellationToken;
    var gatewayOptions = BuildClientOptions(gateway.Endpoint, gateway.ApiKey);

    var sessionOptions = new GalaxyMxAccessOptions(ClientName: "OtOpcUaReopenSmoke");
    await using var mxSession = new GalaxyMxSession(sessionOptions);
    await mxSession.ConnectAsync(gatewayOptions, cancellation);
    var dataWriter = new GatewayGalaxyDataWriter(mxSession, writeUserId: 0);

    // Seed write: populates the item-handle cache and the supervisory-advise cache.
    var seedResults = await dataWriter.WriteAsync(
        [new WriteRequest(WriteRef, 4242.0f)], _ => SecurityClassification.FreeAccess, cancellation);
    var seedResult = seedResults.ShouldHaveSingleItem();
    seedResult.StatusCode.ShouldBe(0u, "the first live write should return Good (0)");
    dataWriter.CachedItemHandleCount.ShouldBe(1, "the write should have cached one item handle");
    dataWriter.CachedSupervisedHandleCount.ShouldBe(1, "the write should have supervisory-advised the handle once");

    // The reconnect sequence under test: rebuild the live session, then discard the cached handles.
    await mxSession.RecreateAsync(gatewayOptions, cancellation);
    dataWriter.InvalidateHandleCaches();
    dataWriter.CachedItemHandleCount.ShouldBe(0, "reopen must drop the stale item-handle cache");
    dataWriter.CachedSupervisedHandleCount.ShouldBe(0, "reopen must drop the stale supervisory-advise cache");

    // Post-reopen write: both caches were emptied above, so a Good result with the counts
    // back at 1 shows the handles were rebuilt against the recreated session.
    var postReopenResults = await dataWriter.WriteAsync(
        [new WriteRequest(WriteRef, 1313.0f)], _ => SecurityClassification.FreeAccess, cancellation);
    var postReopenResult = postReopenResults.ShouldHaveSingleItem();
    postReopenResult.StatusCode.ShouldBe(0u, "the post-reopen write should return Good (0)");
    dataWriter.CachedItemHandleCount.ShouldBe(1, "the post-reopen write should re-establish the item handle");
    dataWriter.CachedSupervisedHandleCount.ShouldBe(1, "the post-reopen write should re-advise the handle");

    var summary =
        $"reopen smoke: wrote {WriteRef}=4242 -> recreate+invalidate -> wrote 1313, both Good, handles re-established";
    TestContext.Current.SendDiagnosticMessage(summary);
}
|
|
|
|
/// <summary>
/// Write-persist smoke: reads a baseline through a fresh session, writes a value that
/// differs from it through the production writer with <c>writeUserId: 0</c>, then polls
/// with brand-new sessions until the written value is read back or the deadline passes.
/// </summary>
[Fact]
public async Task Live_supervisory_write_commits_and_persists_to_galaxy()
{
    var gateway = RequireLiveGatewayOrSkip();
    var cancellation = TestContext.Current.CancellationToken;
    var gatewayOptions = BuildClientOptions(gateway.Endpoint, gateway.ApiKey);

    // Baseline read through its own session. The target is chosen to differ from it, so
    // a write that silently did nothing cannot pass as "persisted".
    var baseline = await ReadbackFloatAsync(
        gatewayOptions, "OtOpcUaReadSmokeA", WriteRef, TimeSpan.FromSeconds(20), cancellation);
    var baselineIsFirstCandidate = baseline is { } observedBaseline && Math.Abs(observedBaseline - 4242.0) < 1.0;
    var target = baselineIsFirstCandidate ? 1313.0 : 4242.0;

    // Production write path with no login (writeUserId: 0). The session is scoped to this
    // block so it is disposed before any read-back session is opened.
    var writerOptions = new GalaxyMxAccessOptions(ClientName: "OtOpcUaWriteSmoke");
    await using (var writerSession = new GalaxyMxSession(writerOptions))
    {
        await writerSession.ConnectAsync(gatewayOptions, cancellation);
        var dataWriter = new GatewayGalaxyDataWriter(writerSession, writeUserId: 0);
        var writeResults = await dataWriter.WriteAsync(
            [new WriteRequest(WriteRef, (float)target)], _ => SecurityClassification.FreeAccess, cancellation);
        var writeResult = writeResults.ShouldHaveSingleItem();
        writeResult.StatusCode.ShouldBe(0u, "the supervisory write should return Good (0)");
    }

    // Persist proof: every poll opens a brand-new session. Keep polling until the value
    // lands or the deadline passes; the last observation is kept for the failure message.
    double? readBack = null;
    var deadline = DateTime.UtcNow.AddSeconds(25);
    while (DateTime.UtcNow < deadline)
    {
        readBack = await ReadbackFloatAsync(
            gatewayOptions, "OtOpcUaReadSmokeB", WriteRef, TimeSpan.FromSeconds(15), cancellation);

        var landed = readBack is { } observed && Math.Abs(observed - target) < 0.5;
        if (landed)
        {
            break;
        }

        await Task.Delay(TimeSpan.FromSeconds(1), cancellation);
    }

    readBack.ShouldNotBeNull("a fresh session should have read a value for the test attribute");
    readBack!.Value.ShouldBe(target, 0.5,
        $"the supervisory write must persist in the galaxy (fresh-session read-back; last observed {readBack})");

    var summary = $"write-persist smoke: {WriteRef} {baseline} -> wrote {target} -> fresh-session read-back {readBack}";
    TestContext.Current.SendDiagnosticMessage(summary);
}
|
|
|
|
/// <summary>
/// Reads the live-gateway coordinates from the <c>MXGW_ENDPOINT</c> and
/// <c>GALAXY_MXGW_API_KEY</c> environment variables, skipping the calling test when
/// either one is missing or blank.
/// </summary>
/// <returns>The endpoint and API key, both non-blank.</returns>
private static (string Endpoint, string ApiKey) RequireLiveGatewayOrSkip()
{
    var gatewayEndpoint = Environment.GetEnvironmentVariable("MXGW_ENDPOINT");
    var gatewayApiKey = Environment.GetEnvironmentVariable("GALAXY_MXGW_API_KEY");

    var configured = !string.IsNullOrWhiteSpace(gatewayEndpoint) && !string.IsNullOrWhiteSpace(gatewayApiKey);
    if (!configured)
    {
        Assert.Skip("Set MXGW_ENDPOINT + GALAXY_MXGW_API_KEY to run the live gateway reopen/write smokes.");
    }

    return (gatewayEndpoint!, gatewayApiKey!);
}
|
|
|
|
/// <summary>
/// Builds the gateway client options shared by every session in these smokes: TLS off,
/// a 10-second connect timeout, and 30-second call and stream timeouts.
/// </summary>
/// <param name="endpoint">Absolute gateway URI; a relative or malformed value makes the <see cref="Uri"/> constructor throw.</param>
/// <param name="apiKey">API key passed through unchanged.</param>
private static MxGatewayClientOptions BuildClientOptions(string endpoint, string apiKey)
{
    var gatewayUri = new Uri(endpoint, UriKind.Absolute);
    var rpcTimeout = TimeSpan.FromSeconds(30);

    return new MxGatewayClientOptions
    {
        Endpoint = gatewayUri,
        ApiKey = apiKey,
        UseTls = false,
        ConnectTimeout = TimeSpan.FromSeconds(10),
        DefaultCallTimeout = rpcTimeout,
        StreamTimeout = rpcTimeout,
    };
}
|
|
|
|
/// <summary>
/// Open a fresh session, subscribe to a single reference, and return the first decoded
/// data-change value (the subscribe snapshot reflects the galaxy's current value), or null
/// on timeout. A fresh session per call guarantees the value came from the galaxy.
/// </summary>
/// <param name="clientOptions">Gateway connection options for the fresh session.</param>
/// <param name="clientName">Client name the fresh session registers under.</param>
/// <param name="fullRef">The single reference to subscribe to.</param>
/// <param name="timeout">Upper bound on the wait for the first data-change event.</param>
/// <param name="ct">Caller cancellation; also ends the event stream.</param>
/// <returns>
/// The first decoded value converted to <see cref="double"/>, or <c>null</c> when the stream
/// was cancelled (timeout or <paramref name="ct"/>) or completed without a data event.
/// </returns>
/// <remarks>
/// A stream or decode failure that is not a cancellation is rethrown to the caller. Without
/// that, the pump would fault silently and this method would wait forever.
/// </remarks>
private static async Task<double?> ReadbackFloatAsync(
    MxGatewayClientOptions clientOptions, string clientName, string fullRef, TimeSpan timeout, CancellationToken ct)
{
    await using var session = new GalaxyMxSession(new GalaxyMxAccessOptions(ClientName: clientName));
    await session.ConnectAsync(clientOptions, ct);
    var subscriber = new GatewayGalaxySubscriber(session);

    // First decoded data-change value, completed by the pump (an explicit TCS rather than a
    // captured local, so the value's visibility across the Task boundary is unambiguous).
    var firstValue = new TaskCompletionSource<double?>(TaskCreationOptions.RunContinuationsAsynchronously);
    using var streamCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
    var streamToken = streamCts.Token; // captured once so the pump never touches the CTS object itself
    streamCts.CancelAfter(timeout);

    // Every pump exit path completes firstValue, so the await below cannot hang. Task.Run
    // gets no token on purpose: the pump must always run far enough to complete the TCS.
    var pump = Task.Run(async () =>
    {
        try
        {
            await foreach (var ev in subscriber.StreamEventsAsync(streamToken))
            {
                // SubscribeBulk uses Advise (not AdviseBuffered), so the initial galaxy-value
                // snapshot arrives as OnDataChange — never OnBufferedDataChange.
                if (ev.Family != MxEventFamily.OnDataChange) continue;
                if (MxValueDecoder.Decode(ev.Value) is not { } decoded) continue;

                // Invariant culture: a string-typed payload must not parse per the machine locale.
                firstValue.TrySetResult(
                    Convert.ToDouble(decoded, System.Globalization.CultureInfo.InvariantCulture));
                return;
            }

            firstValue.TrySetResult(null); // stream completed without a data event
        }
        catch (Exception ex) when (ex is OperationCanceledException || streamToken.IsCancellationRequested)
        {
            // Timeout or caller cancellation. The second filter clause covers a transport that
            // reports cancellation with its own exception type instead of
            // OperationCanceledException (NOTE(review): presumably gRPC here; confirm).
            firstValue.TrySetResult(null);
        }
        catch (Exception ex)
        {
            // Real stream or decode failure: hand it to the awaiting caller.
            firstValue.TrySetException(ex);
        }
    });

    try
    {
        // Let the StreamEvents RPC attach before adding the subscription so the initial snapshot isn't
        // missed. bufferedUpdateIntervalMs:0 skips the superfluous SetBufferedUpdateInterval RPC — the
        // Advise-based snapshot is delivered regardless.
        await Task.Delay(250, ct);
        await subscriber.SubscribeBulkAsync([fullRef], bufferedUpdateIntervalMs: 0, ct);
        return await firstValue.Task;
    }
    finally
    {
        // Runs on success and on failure (e.g. SubscribeBulkAsync throwing): stop the pump's
        // StreamEvents enumeration and wait for it to exit before the CTS and session are disposed.
        streamCts.Cancel();
        await pump;
    }
}
|
|
}
|