using System.IO.Pipes;
using MessagePack;
using Shouldly;
using Xunit;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Ipc;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared;
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Shared.Contracts;
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Proxy.Tests;
/// <summary>
/// Exercises the single-pending-slot router in <see cref="GalaxyIpcClient"/>: request/response
/// matching, <see cref="MessageKind.ErrorResponse"/> handling, and routing of unsolicited push
/// frames (e.g. <see cref="MessageKind.RuntimeStatusChange"/>) arriving between a request and
/// its response. Without the router, a push event interleaved with a call would be consumed
/// as the response and the next <c>CallAsync</c> would
/// fail with an "Expected X, got Y" mismatch — the bug that blocked task #112's live Galaxy
/// E2E on the dev box.
/// </summary>
[Trait("Category", "Unit")]
public sealed class GalaxyIpcClientRoutingTests
{
    private const string Secret = "routing-suite-secret";

    /// <summary>Timeout handed to <c>GalaxyIpcClient.ConnectAsync</c> by the harness.</summary>
    private static readonly TimeSpan ConnectTimeout = TimeSpan.FromSeconds(5);

    /// <summary>Upper bound for awaiting a pending call or an event-handler notification.</summary>
    private static readonly TimeSpan StepTimeout = TimeSpan.FromSeconds(2);

    /// <summary>
    /// Upper bound for the server-side frame reads/writes a test performs by hand, so a client
    /// that never sends (or never drains) a frame fails the test instead of hanging the run.
    /// </summary>
    private static readonly TimeSpan ServerIoTimeout = TimeSpan.FromSeconds(5);

    /// <summary>
    /// A response frame of the expected kind completes the pending call with the decoded body.
    /// </summary>
    [Fact]
    public async Task Response_matching_expected_kind_completes_the_call()
    {
        var (_, serverStream, clientTask) = await StartPairAsync();
        using (serverStream)
        await using (var client = await clientTask)
        {
            using var serverIo = new CancellationTokenSource(ServerIoTimeout);
            using var reader = new FrameReader(serverStream, leaveOpen: true);
            using var writer = new FrameWriter(serverStream, leaveOpen: true);

            var callTask = client.CallAsync<OpenSessionRequest, OpenSessionResponse>(
                MessageKind.OpenSessionRequest,
                new OpenSessionRequest { DriverInstanceId = "t", DriverConfigJson = "{}" },
                MessageKind.OpenSessionResponse,
                CancellationToken.None);

            var request = await reader.ReadFrameAsync(serverIo.Token);
            request!.Value.Kind.ShouldBe(MessageKind.OpenSessionRequest);

            await writer.WriteAsync(MessageKind.OpenSessionResponse,
                new OpenSessionResponse { Success = true, SessionId = 42 },
                serverIo.Token);

            var response = await callTask.WaitAsync(StepTimeout);
            response.Success.ShouldBeTrue();
            response.SessionId.ShouldBe(42);
        }
    }

    /// <summary>
    /// An <c>ErrorResponse</c> frame fails the pending call with a <c>GalaxyIpcException</c>
    /// carrying the peer's code and message, even though a different kind was expected.
    /// </summary>
    [Fact]
    public async Task ErrorResponse_throws_GalaxyIpcException_regardless_of_expected_kind()
    {
        var (_, serverStream, clientTask) = await StartPairAsync();
        using (serverStream)
        await using (var client = await clientTask)
        {
            using var serverIo = new CancellationTokenSource(ServerIoTimeout);
            using var reader = new FrameReader(serverStream, leaveOpen: true);
            using var writer = new FrameWriter(serverStream, leaveOpen: true);

            var callTask = client.CallAsync<OpenSessionRequest, OpenSessionResponse>(
                MessageKind.OpenSessionRequest,
                new OpenSessionRequest { DriverInstanceId = "t", DriverConfigJson = "{}" },
                MessageKind.OpenSessionResponse,
                CancellationToken.None);

            // Drain the request so the error frame is the reply to an in-flight call.
            await reader.ReadFrameAsync(serverIo.Token);

            await writer.WriteAsync(MessageKind.ErrorResponse,
                new ErrorResponse { Code = "bad-request", Message = "malformed" },
                serverIo.Token);

            var ex = await Should.ThrowAsync<GalaxyIpcException>(() => callTask.WaitAsync(StepTimeout));
            ex.Code.ShouldBe("bad-request");
            ex.Message.ShouldContain("malformed");
        }
    }

    /// <summary>
    /// A push frame written between the request and its response is delivered to the event
    /// handler, and the call still completes with the real response.
    /// </summary>
    [Fact]
    public async Task Unsolicited_event_between_request_and_response_routes_to_handler_not_the_call()
    {
        var (_, serverStream, clientTask) = await StartPairAsync();
        using (serverStream)
        await using (var client = await clientTask)
        {
            var eventFrames = new List<(MessageKind Kind, byte[] Body)>();
            var eventReceived = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
            client.SetEventHandler((k, body) =>
            {
                eventFrames.Add((k, body));
                if (k == MessageKind.RuntimeStatusChange) eventReceived.TrySetResult(true);
                return Task.CompletedTask;
            });

            using var serverIo = new CancellationTokenSource(ServerIoTimeout);
            using var reader = new FrameReader(serverStream, leaveOpen: true);
            using var writer = new FrameWriter(serverStream, leaveOpen: true);

            var callTask = client.CallAsync<OpenSessionRequest, OpenSessionResponse>(
                MessageKind.OpenSessionRequest,
                new OpenSessionRequest { DriverInstanceId = "t", DriverConfigJson = "{}" },
                MessageKind.OpenSessionResponse,
                CancellationToken.None);

            await reader.ReadFrameAsync(serverIo.Token);

            // Push event lands first — the bug this test guards against is CallAsync consuming
            // this frame as the response and failing with "Expected X, got Y".
            await writer.WriteAsync(MessageKind.RuntimeStatusChange,
                new RuntimeStatusChangeNotification
                {
                    Status = new HostConnectivityStatus
                    {
                        HostName = "host-a", RuntimeStatus = "Running", LastObservedUtcUnixMs = 1,
                    },
                }, serverIo.Token);

            await writer.WriteAsync(MessageKind.OpenSessionResponse,
                new OpenSessionResponse { Success = true, SessionId = 7 },
                serverIo.Token);

            var response = await callTask.WaitAsync(StepTimeout);
            response.SessionId.ShouldBe(7);

            // Only inspect eventFrames once the handler has signalled that it recorded the frame.
            await eventReceived.Task.WaitAsync(StepTimeout);
            var runtime = eventFrames.ShouldHaveSingleItem();
            runtime.Kind.ShouldBe(MessageKind.RuntimeStatusChange);
            var decoded = MessagePackSerializer.Deserialize<RuntimeStatusChangeNotification>(runtime.Body);
            decoded.Status.HostName.ShouldBe("host-a");
        }
    }

    /// <summary>
    /// A push frame written while no call is pending is still delivered to the event handler.
    /// </summary>
    [Fact]
    public async Task Idle_push_event_with_no_pending_call_still_reaches_handler()
    {
        var (_, serverStream, clientTask) = await StartPairAsync();
        using (serverStream)
        await using (var client = await clientTask)
        {
            var received = new TaskCompletionSource<(MessageKind, byte[])>(TaskCreationOptions.RunContinuationsAsynchronously);
            client.SetEventHandler((k, body) => { received.TrySetResult((k, body)); return Task.CompletedTask; });

            using var serverIo = new CancellationTokenSource(ServerIoTimeout);
            using var writer = new FrameWriter(serverStream, leaveOpen: true);
            await writer.WriteAsync(MessageKind.HostConnectivityStatus,
                new HostConnectivityStatus { HostName = "h", RuntimeStatus = "Running", LastObservedUtcUnixMs = 1 },
                serverIo.Token);

            var (kind, _) = await received.Task.WaitAsync(StepTimeout);
            kind.ShouldBe(MessageKind.HostConnectivityStatus);
        }
    }

    /// <summary>
    /// Disposing the server end of the pipe while a call is pending fails that call with
    /// <see cref="EndOfStreamException"/> rather than leaving it waiting.
    /// </summary>
    [Fact]
    public async Task Peer_closing_pipe_during_pending_call_surfaces_as_EndOfStream()
    {
        var (_, serverStream, clientTask) = await StartPairAsync();
        // The using guarantees cleanup if the test fails before the explicit Dispose below;
        // the second Dispose at scope exit is a no-op.
        using (serverStream)
        await using (var client = await clientTask)
        {
            using var serverIo = new CancellationTokenSource(ServerIoTimeout);
            using var reader = new FrameReader(serverStream, leaveOpen: true);

            var callTask = client.CallAsync<OpenSessionRequest, OpenSessionResponse>(
                MessageKind.OpenSessionRequest,
                new OpenSessionRequest { DriverInstanceId = "t", DriverConfigJson = "{}" },
                MessageKind.OpenSessionResponse,
                CancellationToken.None);

            // Make sure the request has been sent before the peer goes away.
            await reader.ReadFrameAsync(serverIo.Token);
            serverStream.Dispose();

            await Should.ThrowAsync<EndOfStreamException>(() => callTask.WaitAsync(StepTimeout));
        }
    }

    // ---- test harness ----------------------------------------------------

    /// <summary>
    /// Creates a uniquely named pipe server, starts a <see cref="GalaxyIpcClient"/> connecting to
    /// it, and answers the client's Hello with an accepting HelloAck.
    /// </summary>
    /// <returns>
    /// The pipe name, the connected server stream (owned by the caller, who must dispose it), and
    /// the still-to-be-awaited client connect task.
    /// </returns>
    /// <exception cref="InvalidOperationException">The first frame from the client was not a Hello.</exception>
    /// <exception cref="OperationCanceledException">The handshake did not finish within the allotted time.</exception>
    private static async Task<(string PipeName, NamedPipeServerStream Server, Task<GalaxyIpcClient> Client)> StartPairAsync()
    {
        var pipeName = $"GalaxyIpcRouting-{Guid.NewGuid():N}";
        var serverStream = new NamedPipeServerStream(
            pipeName, PipeDirection.InOut, maxNumberOfServerInstances: 1,
            PipeTransmissionMode.Byte, PipeOptions.Asynchronous);

        try
        {
            // Drive a Hello/HelloAck handshake on a background task so the client's ConnectAsync
            // can complete. After the handshake the test owns the stream for manual framing.
            var acceptTask = Task.Run(async () =>
            {
                // Bounded so a client that never connects fails the test instead of hanging it;
                // the margin over ConnectTimeout lets the client give up first.
                using var handshake = new CancellationTokenSource(ConnectTimeout + StepTimeout);
                await serverStream.WaitForConnectionAsync(handshake.Token);

                using var reader = new FrameReader(serverStream, leaveOpen: true);
                using var writer = new FrameWriter(serverStream, leaveOpen: true);

                var hello = await reader.ReadFrameAsync(handshake.Token);
                if (hello is null || hello.Value.Kind != MessageKind.Hello)
                    throw new InvalidOperationException("expected Hello first");

                await writer.WriteAsync(MessageKind.HelloAck,
                    new HelloAck { Accepted = true, HostName = "test-host" },
                    handshake.Token);
            });

            var clientTask = GalaxyIpcClient.ConnectAsync(pipeName, Secret, ConnectTimeout, CancellationToken.None);
            await acceptTask;
            return (pipeName, serverStream, clientTask);
        }
        catch
        {
            // The caller never receives the stream on failure, so release it here.
            serverStream.Dispose();
            throw;
        }
    }
}