using System;
using System.IO;
using System.IO.Pipes;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Worker.Bootstrap;
using MxGateway.Worker.Ipc;
using MxGateway.Worker.Tests.TestSupport;
namespace MxGateway.Worker.Tests.Ipc;
/// <summary>
/// Exercises <see cref="WorkerPipeClient"/> against a real in-process named pipe server,
/// with the test playing the gateway side of the handshake and shutdown exchange.
/// </summary>
public sealed class WorkerPipeClientTests
{
    // Values shared between the worker options and the gateway-side envelopes the tests send.
    private const string TestSessionId = "session-1";
    private const string TestNonce = "nonce-secret";

    /// <summary>Verifies that worker client connects and completes handshake.</summary>
    [Fact]
    public async Task RunAsync_ConnectsToPipeAndCompletesHandshake()
    {
        string pipeName = CreateUniquePipeName();
        WorkerOptions workerOptions = CreateWorkerOptions(pipeName);
        WorkerFrameProtocolOptions frameOptions = new(workerOptions);

        // The server exists before the client starts, so the first connect attempt can succeed.
        using NamedPipeServerStream server = CreateServer(pipeName);
        WorkerPipeClient client = new(
            connectTimeoutMilliseconds: 5000,
            (stream, options) => CreateSession(stream, options));

        Task clientTask = client.RunAsync(workerOptions);
        await Task.Factory.FromAsync(server.BeginWaitForConnection, server.EndWaitForConnection, null);

        WorkerFrameReader reader = new(server, frameOptions);
        WorkerFrameWriter writer = new(server, frameOptions);

        await writer.WriteAsync(CreateGatewayHello());

        WorkerEnvelope hello = await reader.ReadAsync();
        Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerHello, hello.BodyCase);
        Assert.Equal(TestNonce, hello.WorkerHello.Nonce);

        WorkerEnvelope ready = await reader.ReadAsync();
        Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerReady, ready.BodyCase);

        await writer.WriteAsync(CreateShutdown());

        WorkerEnvelope shutdownAck = await ReadUntilAsync(
            reader,
            WorkerEnvelope.BodyOneofCase.WorkerShutdownAck);
        Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerShutdownAck, shutdownAck.BodyCase);

        // Awaiting the client task surfaces any exception thrown on the worker side.
        await clientTask;
    }

    /// <summary>Verifies that worker client retries until pipe server becomes available.</summary>
    [Fact]
    public async Task RunAsync_RetriesUntilPipeServerAppears()
    {
        string pipeName = CreateUniquePipeName();
        WorkerOptions workerOptions = CreateWorkerOptions(pipeName);
        WorkerFrameProtocolOptions frameOptions = new(workerOptions);
        WorkerPipeClient client = new(
            logger: null,
            connectTimeoutMilliseconds: 1000,
            connectAttemptTimeoutMilliseconds: 50,
            (stream, options, _) => CreateSession(stream, options));

        Task clientTask = client.RunAsync(workerOptions);

        // Start the client first and create the server only after a delay longer than
        // the 50 ms per-attempt timeout, so the pipe does not exist when the client starts.
        await Task.Delay(150);
        using NamedPipeServerStream server = CreateServer(pipeName);
        await Task.Factory.FromAsync(server.BeginWaitForConnection, server.EndWaitForConnection, null);

        WorkerFrameReader reader = new(server, frameOptions);
        WorkerFrameWriter writer = new(server, frameOptions);

        await writer.WriteAsync(CreateGatewayHello());
        Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerHello, (await reader.ReadAsync()).BodyCase);
        Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerReady, (await reader.ReadAsync()).BodyCase);

        await writer.WriteAsync(CreateShutdown());
        Assert.Equal(
            WorkerEnvelope.BodyOneofCase.WorkerShutdownAck,
            (await ReadUntilAsync(reader, WorkerEnvelope.BodyOneofCase.WorkerShutdownAck)).BodyCase);

        await clientTask;
    }

    /// <summary>Verifies that worker client throws timeout if pipe never appears.</summary>
    [Fact]
    public async Task RunAsync_WhenPipeNeverAppears_ThrowsTimeoutException()
    {
        // No server is ever created for this pipe name.
        WorkerOptions workerOptions = CreateWorkerOptions(CreateUniquePipeName());
        WorkerPipeClient client = new(
            logger: null,
            connectTimeoutMilliseconds: 100,
            connectAttemptTimeoutMilliseconds: 50,
            (stream, options, _) => CreateSession(stream, options));

        await Assert.ThrowsAsync<TimeoutException>(async () => await client.RunAsync(workerOptions));
    }

    /// <summary>
    /// Reads frames until one matching the expected body case is found,
    /// skipping interleaved heartbeats (the first heartbeat is emitted
    /// immediately on entering the heartbeat loop — see Worker-002).
    /// </summary>
    /// <param name="reader">Frame reader positioned on the gateway side of the pipe.</param>
    /// <param name="expectedBody">Body case of the envelope to wait for.</param>
    /// <returns>The first envelope read whose body case equals <paramref name="expectedBody"/>.</returns>
    private static async Task<WorkerEnvelope> ReadUntilAsync(
        WorkerFrameReader reader,
        WorkerEnvelope.BodyOneofCase expectedBody)
    {
        // NOTE(review): the loop has no timeout of its own; it ends only on a matching
        // frame or when ReadAsync throws. Presumably ReadAsync fails once the pipe
        // closes — confirm against WorkerFrameReader.
        while (true)
        {
            WorkerEnvelope envelope = await reader.ReadAsync();
            if (envelope.BodyCase == expectedBody)
            {
                return envelope;
            }
        }
    }

    /// <summary>Builds a pipe name that is unique per test run so tests cannot collide.</summary>
    private static string CreateUniquePipeName()
    {
        return $"mxaccess-gateway-test-{Guid.NewGuid():N}";
    }

    /// <summary>Builds worker options for the given pipe using the shared test session id and nonce.</summary>
    /// <param name="pipeName">Name of the pipe the worker client should connect to.</param>
    private static WorkerOptions CreateWorkerOptions(string pipeName)
    {
        return new WorkerOptions(
            TestSessionId,
            pipeName,
            GatewayContractInfo.WorkerProtocolVersion,
            TestNonce);
    }

    /// <summary>Creates the single-instance, asynchronous, byte-mode duplex server for a test pipe.</summary>
    /// <param name="pipeName">Name of the pipe to listen on.</param>
    private static NamedPipeServerStream CreateServer(string pipeName)
    {
        return new NamedPipeServerStream(
            pipeName,
            PipeDirection.InOut,
            1,
            PipeTransmissionMode.Byte,
            PipeOptions.Asynchronous);
    }

    /// <summary>
    /// Builds the worker-side session over the connected stream, backed by a
    /// <see cref="FakeRuntimeSession"/>.
    /// </summary>
    /// <param name="stream">Connected pipe stream supplied by the client.</param>
    /// <param name="options">Frame protocol options supplied by the client.</param>
    private static WorkerPipeSession CreateSession(
        Stream stream,
        WorkerFrameProtocolOptions options)
    {
        return new WorkerPipeSession(
            new WorkerFrameReader(stream, options),
            new WorkerFrameWriter(stream, options),
            options,
            () => 1234, // fixed stand-in value; presumably the reported process id — confirm
            new WorkerPipeSessionOptions
            {
                HeartbeatInterval = TimeSpan.FromSeconds(30),
                HeartbeatGrace = TimeSpan.FromSeconds(30),
            },
            () => new FakeRuntimeSession());
    }

    /// <summary>Builds the gateway hello envelope (sequence 1) that opens the handshake.</summary>
    private static WorkerEnvelope CreateGatewayHello()
    {
        return new WorkerEnvelope
        {
            ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
            SessionId = TestSessionId,
            Sequence = 1,
            GatewayHello = new GatewayHello
            {
                SupportedProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
                Nonce = TestNonce,
                GatewayVersion = "test-gateway",
            },
        };
    }

    /// <summary>Builds the shutdown envelope (sequence 2) with a one-second grace period.</summary>
    private static WorkerEnvelope CreateShutdown()
    {
        return new WorkerEnvelope
        {
            ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
            SessionId = TestSessionId,
            Sequence = 2,
            WorkerShutdown = new WorkerShutdown
            {
                GracePeriod = Duration.FromTimeSpan(TimeSpan.FromSeconds(1)),
                Reason = "test-complete",
            },
        };
    }
}