using System.IO.Pipes;
using MessagePack;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Ipc;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;

namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests;

/// <summary>
/// Exercises the single-pending-slot router in <see cref="GalaxyIpcClient"/>: request/response
/// matching, <see cref="MessageKind.ErrorResponse"/> handling, and routing of unsolicited push
/// frames (e.g. <see cref="MessageKind.RuntimeStatusChange"/>) arriving between a request and
/// its response. Without the router, a push event interleaved with a call would be consumed
/// as the response and the next <c>CallAsync</c> would
/// fail with an "Expected X, got Y" mismatch — the bug that blocked task #112's live Galaxy
/// E2E on the dev box.
/// </summary>
[Trait("Category", "Unit")]
public sealed class GalaxyIpcClientRoutingTests
{
    private const string Secret = "routing-suite-secret";

    /// <summary>Upper bound for a pending call (or routed event) to complete once the server has replied.</summary>
    private static readonly TimeSpan CallTimeout = TimeSpan.FromSeconds(2);

    /// <summary>
    /// Upper bound for server-side reads and the Hello/HelloAck handshake, so a client that never
    /// connects or never sends fails the test instead of hanging the run.
    /// </summary>
    private static readonly TimeSpan IoTimeout = TimeSpan.FromSeconds(5);

    /// <summary>A response frame of the expected kind completes the pending call with the decoded body.</summary>
    [Fact]
    public async Task Response_matching_expected_kind_completes_the_call()
    {
        var (_, serverStream, clientTask) = await StartPairAsync();
        using (serverStream)
        await using (var client = await clientTask)
        {
            using var ioCts = new CancellationTokenSource(IoTimeout);
            using var reader = new FrameReader(serverStream, leaveOpen: true);
            using var writer = new FrameWriter(serverStream, leaveOpen: true);

            var callTask = BeginOpenSession(client);

            var request = await reader.ReadFrameAsync(ioCts.Token);
            request!.Value.Kind.ShouldBe(MessageKind.OpenSessionRequest);

            await writer.WriteAsync(
                MessageKind.OpenSessionResponse,
                new OpenSessionResponse { Success = true, SessionId = 42 },
                CancellationToken.None);

            var response = await callTask.WaitAsync(CallTimeout);
            response.Success.ShouldBeTrue();
            response.SessionId.ShouldBe(42);
        }
    }

    /// <summary>An ErrorResponse frame faults the pending call even though a different kind was expected.</summary>
    [Fact]
    public async Task ErrorResponse_throws_GalaxyIpcException_regardless_of_expected_kind()
    {
        var (_, serverStream, clientTask) = await StartPairAsync();
        using (serverStream)
        await using (var client = await clientTask)
        {
            using var ioCts = new CancellationTokenSource(IoTimeout);
            using var reader = new FrameReader(serverStream, leaveOpen: true);
            using var writer = new FrameWriter(serverStream, leaveOpen: true);

            var callTask = BeginOpenSession(client);

            // Drain the request so the error is sent in reply to it.
            await reader.ReadFrameAsync(ioCts.Token);

            await writer.WriteAsync(
                MessageKind.ErrorResponse,
                new ErrorResponse { Code = "bad-request", Message = "malformed" },
                CancellationToken.None);

            var ex = await Should.ThrowAsync<GalaxyIpcException>(() => callTask.WaitAsync(CallTimeout));
            ex.Code.ShouldBe("bad-request");
            ex.Message.ShouldContain("malformed");
        }
    }

    /// <summary>
    /// A push frame written between the request and its response goes to the event handler,
    /// and the call still completes with the real response.
    /// </summary>
    [Fact]
    public async Task Unsolicited_event_between_request_and_response_routes_to_handler_not_the_call()
    {
        var (_, serverStream, clientTask) = await StartPairAsync();
        using (serverStream)
        await using (var client = await clientTask)
        {
            var eventFrames = new List<(MessageKind Kind, byte[] Body)>();
            var eventReceived = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
            client.SetEventHandler((k, body) =>
            {
                eventFrames.Add((k, body));
                if (k == MessageKind.RuntimeStatusChange)
                {
                    eventReceived.TrySetResult(true);
                }

                return Task.CompletedTask;
            });

            using var ioCts = new CancellationTokenSource(IoTimeout);
            using var reader = new FrameReader(serverStream, leaveOpen: true);
            using var writer = new FrameWriter(serverStream, leaveOpen: true);

            var callTask = BeginOpenSession(client);

            await reader.ReadFrameAsync(ioCts.Token);

            // Push event lands first — the bug this test guards against is CallAsync consuming
            // this frame as the response and failing with "Expected X, got Y".
            await writer.WriteAsync(
                MessageKind.RuntimeStatusChange,
                new RuntimeStatusChangeNotification
                {
                    Status = new HostConnectivityStatus
                    {
                        HostName = "host-a",
                        RuntimeStatus = "Running",
                        LastObservedUtcUnixMs = 1,
                    },
                },
                CancellationToken.None);
            await writer.WriteAsync(
                MessageKind.OpenSessionResponse,
                new OpenSessionResponse { Success = true, SessionId = 7 },
                CancellationToken.None);

            var response = await callTask.WaitAsync(CallTimeout);
            response.SessionId.ShouldBe(7);

            // Only inspect the list after the handler has signalled, so the read is ordered after the Add.
            await eventReceived.Task.WaitAsync(CallTimeout);
            var runtime = eventFrames.ShouldHaveSingleItem();
            runtime.Kind.ShouldBe(MessageKind.RuntimeStatusChange);
            var decoded = MessagePackSerializer.Deserialize<RuntimeStatusChangeNotification>(runtime.Body);
            decoded.Status.HostName.ShouldBe("host-a");
        }
    }

    /// <summary>A push frame written while no call is pending is still delivered to the event handler.</summary>
    [Fact]
    public async Task Idle_push_event_with_no_pending_call_still_reaches_handler()
    {
        var (_, serverStream, clientTask) = await StartPairAsync();
        using (serverStream)
        await using (var client = await clientTask)
        {
            var received = new TaskCompletionSource<(MessageKind, byte[])>(TaskCreationOptions.RunContinuationsAsynchronously);
            client.SetEventHandler((k, body) =>
            {
                received.TrySetResult((k, body));
                return Task.CompletedTask;
            });

            using var writer = new FrameWriter(serverStream, leaveOpen: true);
            await writer.WriteAsync(
                MessageKind.HostConnectivityStatus,
                new HostConnectivityStatus { HostName = "h", RuntimeStatus = "Running", LastObservedUtcUnixMs = 1 },
                CancellationToken.None);

            var (kind, _) = await received.Task.WaitAsync(CallTimeout);
            kind.ShouldBe(MessageKind.HostConnectivityStatus);
        }
    }

    /// <summary>Disposing the server end while a call is pending faults that call instead of leaving it hanging.</summary>
    [Fact]
    public async Task Peer_closing_pipe_during_pending_call_surfaces_as_EndOfStream()
    {
        var (_, serverStream, clientTask) = await StartPairAsync();

        // The using guarantees the pipe is released even if the test fails before the explicit
        // Dispose below; disposing the stream a second time is a no-op.
        using (serverStream)
        await using (var client = await clientTask)
        {
            using var ioCts = new CancellationTokenSource(IoTimeout);
            using var reader = new FrameReader(serverStream, leaveOpen: true);

            var callTask = BeginOpenSession(client);

            await reader.ReadFrameAsync(ioCts.Token);
            serverStream.Dispose();

            await Should.ThrowAsync<EndOfStreamException>(() => callTask.WaitAsync(CallTimeout));
        }
    }

    // ---- test harness ----------------------------------------------------

    /// <summary>
    /// Starts the OpenSession call every routing test uses and returns it un-awaited, so the
    /// test can play the server side of the exchange while the call is pending.
    /// </summary>
    /// <param name="client">Connected client under test.</param>
    /// <returns>The pending call task.</returns>
    private static Task<OpenSessionResponse> BeginOpenSession(GalaxyIpcClient client) =>
        client.CallAsync<OpenSessionRequest, OpenSessionResponse>(
            MessageKind.OpenSessionRequest,
            new OpenSessionRequest { DriverInstanceId = "t", DriverConfigJson = "{}" },
            MessageKind.OpenSessionResponse,
            CancellationToken.None);

    /// <summary>
    /// Creates a uniquely named pipe server, starts a client connecting to it, and answers the
    /// client's Hello with an accepting HelloAck.
    /// </summary>
    /// <returns>
    /// The pipe name, the server stream (owned and disposed by the caller), and the client's
    /// connect task.
    /// </returns>
    /// <exception cref="InvalidOperationException">The first frame from the client was not a Hello.</exception>
    /// <exception cref="OperationCanceledException">The handshake did not finish within <see cref="IoTimeout"/>.</exception>
    private static async Task<(string PipeName, NamedPipeServerStream Server, Task<GalaxyIpcClient> Client)> StartPairAsync()
    {
        var pipeName = $"GalaxyIpcRouting-{Guid.NewGuid():N}";
        var serverStream = new NamedPipeServerStream(
            pipeName,
            PipeDirection.InOut,
            maxNumberOfServerInstances: 1,
            PipeTransmissionMode.Byte,
            PipeOptions.Asynchronous);

        try
        {
            using var handshakeCts = new CancellationTokenSource(IoTimeout);
            var handshakeToken = handshakeCts.Token;

            // Drive a Hello/HelloAck handshake on a background task so the client's ConnectAsync
            // can complete. After the handshake the test owns the stream for manual framing.
            var acceptTask = Task.Run(async () =>
            {
                await serverStream.WaitForConnectionAsync(handshakeToken);

                using var reader = new FrameReader(serverStream, leaveOpen: true);
                using var writer = new FrameWriter(serverStream, leaveOpen: true);

                var hello = await reader.ReadFrameAsync(handshakeToken);
                if (hello is null || hello.Value.Kind != MessageKind.Hello)
                {
                    throw new InvalidOperationException("expected Hello first");
                }

                await writer.WriteAsync(
                    MessageKind.HelloAck,
                    new HelloAck { Accepted = true, HostName = "test-host" },
                    handshakeToken);
            });

            var clientTask = GalaxyIpcClient.ConnectAsync(pipeName, Secret, TimeSpan.FromSeconds(5), CancellationToken.None);
            await acceptTask;

            return (pipeName, serverStream, clientTask);
        }
        catch
        {
            // The caller never receives the stream on failure, so release the pipe here.
            serverStream.Dispose();
            throw;
        }
    }
}