diff --git a/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/GatewayGalaxyLiveReopenAndWriteTests.cs b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/GatewayGalaxyLiveReopenAndWriteTests.cs new file mode 100644 index 00000000..f441be43 --- /dev/null +++ b/tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/Runtime/GatewayGalaxyLiveReopenAndWriteTests.cs @@ -0,0 +1,184 @@ +using ZB.MOM.WW.MxGateway.Client; +using ZB.MOM.WW.MxGateway.Contracts.Proto; +using Shouldly; +using Xunit; +using ZB.MOM.WW.OtOpcUa.Core.Abstractions; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Config; +using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime; + +namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Runtime; + +/// +/// Live-gateway smokes for the two seams that cannot be faked because the MXAccess +/// session types are sealed with internal ctors — so the unit suite can only exercise them in +/// isolation (see ): +/// +/// +/// +/// Reopen — the exact primitive sequence GalaxyDriver.ReopenAsync runs on a +/// reconnect ( then +/// ): a write seeds the +/// item-handle + supervisory-advise caches, the recreate tears the session down and +/// rebuilds it against the live gateway, the caches are dropped, and a follow-up write +/// re-AddItems + re-AdviseSupervisory against the fresh session and succeeds. +/// +/// +/// +/// +/// Write commit + persist — a no-login (WriteUserId = 0) write commits to +/// the galaxy only because the writer AdviseSupervisory's the handle first; proven +/// by reading the value back through a brand-new session (so the value came from +/// the galaxy, not a client-side echo). +/// +/// +/// +/// +/// Skip-gated: runs only when MXGW_ENDPOINT + GALAXY_MXGW_API_KEY point at a +/// reachable gateway (so CI stays green). Captured 2026-06-14 against 10.100.0.48:5120. +/// Writes only the dedicated test attribute TestMachine_002.TestFloat. +/// +/// +[Trait("Category", "Integration")] +public sealed class GatewayGalaxyLiveReopenAndWriteTests +{ + /// The dedicated writable Galaxy test attribute (Float, Operate) used by both smokes. + private const string WriteRef = "TestMachine_002.TestFloat"; + + [Fact] + public async Task Live_reopen_recreates_session_and_re_establishes_write_handles() + { + var (endpoint, apiKey) = RequireLiveGatewayOrSkip(); + var ct = TestContext.Current.CancellationToken; + var clientOptions = BuildClientOptions(endpoint, apiKey); + + await using var session = new GalaxyMxSession(new GalaxyMxAccessOptions(ClientName: "OtOpcUaReopenSmoke")); + await session.ConnectAsync(clientOptions, ct); + var writer = new GatewayGalaxyDataWriter(session, writeUserId: 0); + + // First write seeds both caches (AddItem -> item handle, AdviseSupervisory -> supervised handle). + var first = await writer.WriteAsync( + [new WriteRequest(WriteRef, 4242.0f)], _ => SecurityClassification.FreeAccess, ct); + first.ShouldHaveSingleItem().StatusCode.ShouldBe(0u, "the first live write should return Good (0)"); + writer.CachedItemHandleCount.ShouldBe(1, "the write should have cached one item handle"); + writer.CachedSupervisedHandleCount.ShouldBe(1, "the write should have supervisory-advised the handle once"); + + // Exactly what GalaxyDriver.ReopenAsync does on a reconnect: recreate the live session, drop the caches. + await session.RecreateAsync(clientOptions, ct); + writer.InvalidateHandleCaches(); + writer.CachedItemHandleCount.ShouldBe(0, "reopen must drop the stale item-handle cache"); + writer.CachedSupervisedHandleCount.ShouldBe(0, "reopen must drop the stale supervisory-advise cache"); + + // A write after the reopen must re-AddItem + re-AdviseSupervisory against the fresh session and succeed — + // proving RecreateAsync produced a working session and the writer rebuilt its handles. + var second = await writer.WriteAsync( + [new WriteRequest(WriteRef, 1313.0f)], _ => SecurityClassification.FreeAccess, ct); + second.ShouldHaveSingleItem().StatusCode.ShouldBe(0u, "the post-reopen write should return Good (0)"); + writer.CachedItemHandleCount.ShouldBe(1, "the post-reopen write should re-establish the item handle"); + writer.CachedSupervisedHandleCount.ShouldBe(1, "the post-reopen write should re-advise the handle"); + + TestContext.Current.SendDiagnosticMessage( + $"reopen smoke: wrote {WriteRef}=4242 -> recreate+invalidate -> wrote 1313, both Good, handles re-established"); + } + + [Fact] + public async Task Live_supervisory_write_commits_and_persists_to_galaxy() + { + var (endpoint, apiKey) = RequireLiveGatewayOrSkip(); + var ct = TestContext.Current.CancellationToken; + var clientOptions = BuildClientOptions(endpoint, apiKey); + + // Baseline: read the current value through a fresh session so we can pick a target that + // genuinely CHANGES it (otherwise a no-op write could falsely "persist"). + var current = await ReadbackFloatAsync(clientOptions, "OtOpcUaReadSmokeA", WriteRef, + TimeSpan.FromSeconds(20), ct); + var target = current is { } c && Math.Abs(c - 4242.0) < 1.0 ? 1313.0 : 4242.0; + + // Write via the production writer: no-login write that only commits because the writer + // AdviseSupervisory's the handle first. + await using (var writerSession = new GalaxyMxSession(new GalaxyMxAccessOptions(ClientName: "OtOpcUaWriteSmoke"))) + { + await writerSession.ConnectAsync(clientOptions, ct); + var writer = new GatewayGalaxyDataWriter(writerSession, writeUserId: 0); + var results = await writer.WriteAsync( + [new WriteRequest(WriteRef, (float)target)], _ => SecurityClassification.FreeAccess, ct); + results.ShouldHaveSingleItem().StatusCode.ShouldBe(0u, "the supervisory write should return Good (0)"); + } + + // Persist proof: a BRAND-NEW session must read the written value back from the galaxy. + // Poll (the worker's commit is async) — fail with the last observed value if it never lands. + double? persisted = null; + var deadline = DateTime.UtcNow + TimeSpan.FromSeconds(25); + while (DateTime.UtcNow < deadline) + { + persisted = await ReadbackFloatAsync(clientOptions, "OtOpcUaReadSmokeB", WriteRef, + TimeSpan.FromSeconds(15), ct); + if (persisted is { } p && Math.Abs(p - target) < 0.5) break; + await Task.Delay(1000, ct); + } + + persisted.ShouldNotBeNull("a fresh session should have read a value for the test attribute"); + persisted!.Value.ShouldBe(target, 0.5, + $"the supervisory write must persist in the galaxy (fresh-session read-back; last observed {persisted})"); + + TestContext.Current.SendDiagnosticMessage( + $"write-persist smoke: {WriteRef} {current} -> wrote {target} -> fresh-session read-back {persisted}"); + } + + private static (string Endpoint, string ApiKey) RequireLiveGatewayOrSkip() + { + var endpoint = Environment.GetEnvironmentVariable("MXGW_ENDPOINT"); + var apiKey = Environment.GetEnvironmentVariable("GALAXY_MXGW_API_KEY"); + if (string.IsNullOrWhiteSpace(endpoint) || string.IsNullOrWhiteSpace(apiKey)) + Assert.Skip("Set MXGW_ENDPOINT + GALAXY_MXGW_API_KEY to run the live gateway reopen/write smokes."); + return (endpoint!, apiKey!); + } + + private static MxGatewayClientOptions BuildClientOptions(string endpoint, string apiKey) => new() + { + Endpoint = new Uri(endpoint, UriKind.Absolute), + ApiKey = apiKey, + UseTls = false, + ConnectTimeout = TimeSpan.FromSeconds(10), + DefaultCallTimeout = TimeSpan.FromSeconds(30), + StreamTimeout = TimeSpan.FromSeconds(30), + }; + + /// + /// Open a fresh session, subscribe to a single reference, and return the first decoded + /// data-change value (the subscribe snapshot reflects the galaxy's current value), or null + /// on timeout. A fresh session per call guarantees the value came from the galaxy. + /// + private static async Task ReadbackFloatAsync( + MxGatewayClientOptions clientOptions, string clientName, string fullRef, TimeSpan timeout, CancellationToken ct) + { + await using var session = new GalaxyMxSession(new GalaxyMxAccessOptions(ClientName: clientName)); + await session.ConnectAsync(clientOptions, ct); + var subscriber = new GatewayGalaxySubscriber(session); + + double? captured = null; + using var streamCts = CancellationTokenSource.CreateLinkedTokenSource(ct); + streamCts.CancelAfter(timeout); + + var pump = Task.Run(async () => + { + try + { + await foreach (var ev in subscriber.StreamEventsAsync(streamCts.Token)) + { + if (ev.Family != MxEventFamily.OnDataChange) continue; + if (MxValueDecoder.Decode(ev.Value) is not { } decoded) continue; + captured = Convert.ToDouble(decoded); + streamCts.Cancel(); + return; + } + } + catch (OperationCanceledException) { /* timeout, or we got the value and cancelled */ } + }, ct); + + // Let the StreamEvents RPC attach before adding the subscription so the initial snapshot isn't missed. + await Task.Delay(250, ct); + await subscriber.SubscribeBulkAsync([fullRef], bufferedUpdateIntervalMs: 500, ct); + await pump; + return captured; + } +}