using System.IO.Pipes;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Workers;

namespace MxGateway.Tests.Gateway.Workers;

/// <summary>
/// Tests for <see cref="WorkerClient"/> that play the worker side of the protocol by hand
/// over an in-process named pipe pair.
/// </summary>
public sealed class WorkerClientTests
{
    private const string SessionId = "session-worker-client";
    private const string Nonce = "nonce-worker-client";
    private const int WorkerProcessId = 4321;

    // Upper bound for every individual wait so a broken client fails the test instead of hanging it.
    private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(5);

    /// <summary>
    /// After the fake worker answers the gateway hello with WorkerHello and WorkerReady,
    /// the client reports <see cref="WorkerClientState.Ready"/> and the worker's process id.
    /// </summary>
    [Fact]
    public async Task StartAsync_WithWorkerHelloAndReady_EntersReadyState()
    {
        await using PipePair pipePair = await PipePair.CreateAsync();
        await using WorkerClient client = CreateClient(pipePair);

        await CompleteHandshakeAsync(client, pipePair);

        Assert.Equal(WorkerClientState.Ready, client.State);
        Assert.Equal(WorkerProcessId, client.ProcessId);
    }

    /// <summary>
    /// A reply carrying the correlation id of the outgoing command completes the pending invoke.
    /// </summary>
    [Fact]
    public async Task InvokeAsync_WithMatchingReply_CompletesPendingCommand()
    {
        await using PipePair pipePair = await PipePair.CreateAsync();
        await using WorkerClient client = CreateClient(pipePair);
        await CompleteHandshakeAsync(client, pipePair);

        Task<WorkerCommandReply> invokeTask = client.InvokeAsync(
            CreateCommand(MxCommandKind.Ping),
            TestTimeout,
            CancellationToken.None);

        WorkerEnvelope commandEnvelope = await pipePair.WorkerReader.ReadAsync().AsTask().WaitAsync(TestTimeout);
        Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerCommand, commandEnvelope.BodyCase);
        Assert.False(string.IsNullOrWhiteSpace(commandEnvelope.CorrelationId));

        await pipePair.WorkerWriter.WriteAsync(
            CreateCommandReplyEnvelope(commandEnvelope.CorrelationId, MxCommandKind.Ping));

        WorkerCommandReply reply = await invokeTask.WaitAsync(TestTimeout);
        Assert.Equal(commandEnvelope.CorrelationId, reply.Reply.CorrelationId);
        Assert.Equal(MxCommandKind.Ping, reply.Reply.Kind);
    }

    /// <summary>
    /// A command with a 50 ms timeout is left unanswered until it fails with
    /// <see cref="WorkerClientErrorCode.CommandTimeout"/>; its reply is then sent anyway.
    /// A second command must still complete normally and the client must remain Ready.
    /// </summary>
    [Fact]
    public async Task InvokeAsync_WithLateReply_IgnoresLateReplyAndKeepsClientReady()
    {
        await using PipePair pipePair = await PipePair.CreateAsync();
        await using WorkerClient client = CreateClient(pipePair);
        await CompleteHandshakeAsync(client, pipePair);

        Task<WorkerCommandReply> timedOutInvokeTask = client.InvokeAsync(
            CreateCommand(MxCommandKind.Ping),
            TimeSpan.FromMilliseconds(50),
            CancellationToken.None);

        // Consume the command on the worker side but deliberately do not answer it yet.
        WorkerEnvelope timedOutCommand = await pipePair.WorkerReader.ReadAsync().AsTask().WaitAsync(TestTimeout);

        WorkerClientException exception = await Assert.ThrowsAsync<WorkerClientException>(
            async () => await timedOutInvokeTask);
        Assert.Equal(WorkerClientErrorCode.CommandTimeout, exception.ErrorCode);

        // The late reply arrives only after the invoke has already failed.
        await pipePair.WorkerWriter.WriteAsync(
            CreateCommandReplyEnvelope(timedOutCommand.CorrelationId, MxCommandKind.Ping));

        // Give the client a moment to read the late reply before issuing the next command.
        await Task.Delay(TimeSpan.FromMilliseconds(50));

        Task<WorkerCommandReply> secondInvokeTask = client.InvokeAsync(
            CreateCommand(MxCommandKind.GetWorkerInfo),
            TestTimeout,
            CancellationToken.None);

        WorkerEnvelope secondCommand = await pipePair.WorkerReader.ReadAsync().AsTask().WaitAsync(TestTimeout);
        await pipePair.WorkerWriter.WriteAsync(
            CreateCommandReplyEnvelope(secondCommand.CorrelationId, MxCommandKind.GetWorkerInfo));

        WorkerCommandReply reply = await secondInvokeTask.WaitAsync(TestTimeout);
        Assert.Equal(WorkerClientState.Ready, client.State);
        Assert.Equal(MxCommandKind.GetWorkerInfo, reply.Reply.Kind);
    }

    /// <summary>
    /// Two events written by the worker are yielded by <c>ReadEventsAsync</c> in the order they were written.
    /// </summary>
    [Fact]
    public async Task ReadEventsAsync_WithWorkerEvents_YieldsEventsInPipeOrder()
    {
        await using PipePair pipePair = await PipePair.CreateAsync();
        await using WorkerClient client = CreateClient(pipePair);
        await CompleteHandshakeAsync(client, pipePair);

        // The token bounds MoveNextAsync so a missing event fails the test instead of hanging it.
        using CancellationTokenSource cancellationTokenSource = new(TestTimeout);
        await using var events = client
            .ReadEventsAsync(cancellationTokenSource.Token)
            .GetAsyncEnumerator(cancellationTokenSource.Token);

        await pipePair.WorkerWriter.WriteAsync(
            CreateEventEnvelope(sequence: 11, MxEventFamily.OnDataChange));
        await pipePair.WorkerWriter.WriteAsync(
            CreateEventEnvelope(sequence: 12, MxEventFamily.OperationComplete));

        Assert.True(await events.MoveNextAsync());
        Assert.Equal((ulong)11, events.Current.Event.WorkerSequence);
        Assert.Equal(MxEventFamily.OnDataChange, events.Current.Event.Family);

        Assert.True(await events.MoveNextAsync());
        Assert.Equal((ulong)12, events.Current.Event.WorkerSequence);
        Assert.Equal(MxEventFamily.OperationComplete, events.Current.Event.Family);
    }

    /// <summary>
    /// With an event channel capacity of one and nobody reading events, writing two events
    /// must move the client to <see cref="WorkerClientState.Faulted"/>.
    /// </summary>
    [Fact]
    public async Task ReadLoop_WhenEventQueueOverflows_FaultsClient()
    {
        await using PipePair pipePair = await PipePair.CreateAsync();
        await using WorkerClient client = CreateClient(
            pipePair,
            new WorkerClientOptions
            {
                EventChannelCapacity = 1,

                // Long heartbeat windows, presumably so heartbeat expiry cannot be what faults the client here.
                HeartbeatGrace = TimeSpan.FromSeconds(30),
                HeartbeatCheckInterval = TimeSpan.FromSeconds(30),
            });
        await CompleteHandshakeAsync(client, pipePair);

        await pipePair.WorkerWriter.WriteAsync(
            CreateEventEnvelope(sequence: 11, MxEventFamily.OnDataChange));
        await pipePair.WorkerWriter.WriteAsync(
            CreateEventEnvelope(sequence: 12, MxEventFamily.OnDataChange));

        await WaitUntilAsync(
            () => client.State == WorkerClientState.Faulted,
            TestTimeout);

        Assert.Equal(WorkerClientState.Faulted, client.State);
    }

    /// <summary>
    /// Disposing the worker end of the pipe after the handshake must fault the client.
    /// </summary>
    [Fact]
    public async Task ReadLoop_WhenPipeDisconnects_FaultsClient()
    {
        await using PipePair pipePair = await PipePair.CreateAsync();
        await using WorkerClient client = CreateClient(pipePair);
        await CompleteHandshakeAsync(client, pipePair);

        await pipePair.DisposeWorkerSideAsync();

        await WaitUntilAsync(
            () => client.State == WorkerClientState.Faulted,
            TestTimeout);

        Assert.Equal(WorkerClientState.Faulted, client.State);
    }

    /// <summary>
    /// With an 80 ms heartbeat grace and a worker that sends nothing after the handshake,
    /// the client must fault on its own.
    /// </summary>
    [Fact]
    public async Task HeartbeatMonitor_WhenHeartbeatExpires_FaultsClient()
    {
        await using PipePair pipePair = await PipePair.CreateAsync();
        await using WorkerClient client = CreateClient(
            pipePair,
            new WorkerClientOptions
            {
                HeartbeatGrace = TimeSpan.FromMilliseconds(80),
                HeartbeatCheckInterval = TimeSpan.FromMilliseconds(20),
                EventChannelCapacity = 8,
            });
        await CompleteHandshakeAsync(client, pipePair);

        await WaitUntilAsync(
            () => client.State == WorkerClientState.Faulted,
            TestTimeout);

        Assert.Equal(WorkerClientState.Faulted, client.State);
    }

    /// <summary>
    /// Builds a <see cref="WorkerClient"/> bound to the gateway end of <paramref name="pipePair"/>.
    /// </summary>
    /// <param name="pipePair">Pipe pair whose gateway stream the client will own.</param>
    /// <param name="options">Optional client options; <c>null</c> is passed through to the client unchanged.</param>
    private static WorkerClient CreateClient(
        PipePair pipePair,
        WorkerClientOptions? options = null)
    {
        WorkerFrameProtocolOptions frameOptions = new(SessionId);
        WorkerClientConnection connection = new(
            SessionId,
            Nonce,
            pipePair.GatewayStream,
            frameOptions);

        return new WorkerClient(connection, options);
    }

    /// <summary>
    /// Starts <paramref name="client"/> and plays the worker side of the handshake:
    /// reads and validates the gateway hello, then sends WorkerHello followed by WorkerReady.
    /// </summary>
    private static async Task CompleteHandshakeAsync(
        WorkerClient client,
        PipePair pipePair)
    {
        // Not awaited yet: StartAsync only finishes once the fake worker below has answered.
        Task startTask = client.StartAsync(CancellationToken.None);

        WorkerEnvelope gatewayHello = await pipePair.WorkerReader.ReadAsync().AsTask().WaitAsync(TestTimeout);
        Assert.Equal(WorkerEnvelope.BodyOneofCase.GatewayHello, gatewayHello.BodyCase);
        Assert.Equal(Nonce, gatewayHello.GatewayHello.Nonce);
        Assert.Equal(GatewayContractInfo.WorkerProtocolVersion, gatewayHello.GatewayHello.SupportedProtocolVersion);

        await pipePair.WorkerWriter.WriteAsync(CreateWorkerHelloEnvelope());
        await pipePair.WorkerWriter.WriteAsync(CreateWorkerReadyEnvelope());

        await startTask.WaitAsync(TestTimeout);
    }

    /// <summary>Creates a worker command wrapping an <see cref="MxCommand"/> of the given kind.</summary>
    private static WorkerCommand CreateCommand(MxCommandKind kind)
    {
        return new WorkerCommand
        {
            Command = new MxCommand
            {
                Kind = kind,
            },
        };
    }

    /// <summary>Creates the WorkerHello envelope (sequence 1) echoing the test nonce and protocol version.</summary>
    private static WorkerEnvelope CreateWorkerHelloEnvelope()
    {
        return CreateWorkerEnvelope(
            correlationId: string.Empty,
            sequence: 1,
            envelope => envelope.WorkerHello = new WorkerHello
            {
                ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
                Nonce = Nonce,
                WorkerProcessId = WorkerProcessId,
                WorkerVersion = "fake-worker",
            });
    }

    /// <summary>Creates the WorkerReady envelope (sequence 2) that follows the hello.</summary>
    private static WorkerEnvelope CreateWorkerReadyEnvelope()
    {
        return CreateWorkerEnvelope(
            correlationId: string.Empty,
            sequence: 2,
            envelope => envelope.WorkerReady = new WorkerReady
            {
                WorkerProcessId = WorkerProcessId,
                MxaccessProgid = "LMXProxy.LMXProxyServer.1",
                MxaccessClsid = "{C30B52F5-2CB5-4760-AF0A-3A344A7EB5DC}",
            });
    }

    /// <summary>
    /// Creates a command reply envelope whose envelope and inner reply both carry
    /// <paramref name="correlationId"/>. The envelope sequence is fixed at 10.
    /// </summary>
    private static WorkerEnvelope CreateCommandReplyEnvelope(
        string correlationId,
        MxCommandKind kind)
    {
        return CreateWorkerEnvelope(
            correlationId,
            sequence: 10,
            envelope => envelope.WorkerCommandReply = new WorkerCommandReply
            {
                Reply = new MxCommandReply
                {
                    SessionId = SessionId,
                    CorrelationId = correlationId,
                    Kind = kind,
                },
            });
    }

    /// <summary>
    /// Creates an event envelope; <paramref name="sequence"/> is used both as the envelope
    /// sequence and as the event's worker sequence.
    /// </summary>
    private static WorkerEnvelope CreateEventEnvelope(
        ulong sequence,
        MxEventFamily family)
    {
        return CreateWorkerEnvelope(
            correlationId: string.Empty,
            sequence,
            envelope => envelope.WorkerEvent = new WorkerEvent
            {
                Event = new MxEvent
                {
                    SessionId = SessionId,
                    Family = family,
                    WorkerSequence = sequence,
                },
            });
    }

    /// <summary>
    /// Creates an envelope with the shared header fields populated and lets
    /// <paramref name="setBody"/> assign the body.
    /// </summary>
    private static WorkerEnvelope CreateWorkerEnvelope(
        string correlationId,
        ulong sequence,
        Action<WorkerEnvelope> setBody)
    {
        WorkerEnvelope envelope = new()
        {
            ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
            SessionId = SessionId,
            Sequence = sequence,
            CorrelationId = correlationId,
        };

        setBody(envelope);
        return envelope;
    }

    /// <summary>
    /// Polls <paramref name="predicate"/> every 10 ms until it returns <c>true</c> or
    /// <paramref name="timeout"/> elapses.
    /// </summary>
    /// <remarks>
    /// Returns normally on timeout instead of throwing, so the assertion the caller makes
    /// right after this call fails with the expected and actual values rather than a bare
    /// <see cref="TaskCanceledException"/>.
    /// </remarks>
    private static async Task WaitUntilAsync(
        Func<bool> predicate,
        TimeSpan timeout)
    {
        using CancellationTokenSource cancellationTokenSource = new(timeout);

        try
        {
            while (!predicate())
            {
                await Task.Delay(TimeSpan.FromMilliseconds(10), cancellationTokenSource.Token);
            }
        }
        catch (OperationCanceledException) when (cancellationTokenSource.IsCancellationRequested)
        {
            // Timed out: fall through and let the caller's assertion report the observed state.
        }
    }

    /// <summary>
    /// A connected named pipe pair: the server end is handed to the client under test, and the
    /// client end is wrapped in a frame reader and writer so the test can act as the worker.
    /// </summary>
    private sealed class PipePair : IAsyncDisposable
    {
        private readonly NamedPipeClientStream _workerStream;
        private bool _workerSideDisposed;

        private PipePair(
            NamedPipeServerStream gatewayStream,
            NamedPipeClientStream workerStream)
        {
            GatewayStream = gatewayStream;
            _workerStream = workerStream;
            WorkerReader = new WorkerFrameReader(_workerStream, new WorkerFrameProtocolOptions(SessionId));
            WorkerWriter = new WorkerFrameWriter(_workerStream, new WorkerFrameProtocolOptions(SessionId));
        }

        /// <summary>Server end of the pipe, used by the gateway-side client.</summary>
        public NamedPipeServerStream GatewayStream { get; }

        /// <summary>Reads frames the gateway wrote, from the worker end of the pipe.</summary>
        public WorkerFrameReader WorkerReader { get; }

        /// <summary>Writes frames to the gateway, from the worker end of the pipe.</summary>
        public WorkerFrameWriter WorkerWriter { get; }

        /// <summary>
        /// Creates both ends of a uniquely named pipe and waits until they are connected.
        /// Both streams are disposed if connecting fails.
        /// </summary>
        public static async Task<PipePair> CreateAsync()
        {
            // A fresh GUID per pair keeps pipe names from colliding across tests.
            string pipeName = $"mxaccessgw-workerclient-tests-{Guid.NewGuid():N}";

            NamedPipeServerStream gatewayStream = new(
                pipeName,
                PipeDirection.InOut,
                maxNumberOfServerInstances: 1,
                PipeTransmissionMode.Byte,
                PipeOptions.Asynchronous);
            NamedPipeClientStream workerStream = new(
                ".",
                pipeName,
                PipeDirection.InOut,
                PipeOptions.Asynchronous);

            try
            {
                // Start listening before connecting so the two ends can complete together.
                Task waitForConnectionTask = gatewayStream.WaitForConnectionAsync();
                await workerStream.ConnectAsync();
                await waitForConnectionTask;

                return new PipePair(gatewayStream, workerStream);
            }
            catch
            {
                // No PipePair owns the streams yet, so release them here before rethrowing.
                await workerStream.DisposeAsync();
                await gatewayStream.DisposeAsync();
                throw;
            }
        }

        /// <summary>
        /// Disposes only the worker end of the pipe. Calls made after a completed disposal do nothing.
        /// </summary>
        public async ValueTask DisposeWorkerSideAsync()
        {
            if (_workerSideDisposed)
            {
                return;
            }

            await _workerStream.DisposeAsync();
            _workerSideDisposed = true;
        }

        /// <summary>Disposes the worker end (if not already disposed) and then the gateway end.</summary>
        public async ValueTask DisposeAsync()
        {
            await DisposeWorkerSideAsync();
            await GatewayStream.DisposeAsync();
        }
    }
}