using System.IO.Pipes;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Metrics;
using MxGateway.Server.Workers;
using MxGateway.Tests.TestSupport;

namespace MxGateway.Tests.Gateway.Workers;

/// <summary>
/// Tests for <see cref="WorkerClient"/>. Each test connects the client to the gateway end of
/// an in-process named pipe and plays the worker role by reading and writing
/// <see cref="WorkerEnvelope"/> frames on the other end.
/// </summary>
public sealed class WorkerClientTests
{
    private const string SessionId = "session-worker-client";
    private const string Nonce = "nonce-worker-client";
    private const int WorkerProcessId = 4321;

    // Upper bound for every individual wait so a broken client fails the test instead of hanging it.
    private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(5);

    /// <summary>Verifies that StartAsync enters ready state after receiving worker hello and ready messages.</summary>
    [Fact]
    public async Task StartAsync_WithWorkerHelloAndReady_EntersReadyState()
    {
        await using PipePair pipePair = await PipePair.CreateAsync();
        await using WorkerClient client = CreateClient(pipePair);

        await CompleteHandshakeAsync(client, pipePair);

        Assert.Equal(WorkerClientState.Ready, client.State);
        Assert.Equal(WorkerProcessId, client.ProcessId);
    }

    /// <summary>Verifies that InvokeAsync completes a pending command when a matching reply arrives.</summary>
    [Fact]
    public async Task InvokeAsync_WithMatchingReply_CompletesPendingCommand()
    {
        await using PipePair pipePair = await PipePair.CreateAsync();
        await using WorkerClient client = CreateClient(pipePair);
        await CompleteHandshakeAsync(client, pipePair);

        Task<WorkerCommandReply> invokeTask = client.InvokeAsync(
            CreateCommand(MxCommandKind.Ping),
            TestTimeout,
            CancellationToken.None);

        WorkerEnvelope commandEnvelope = await pipePair.WorkerReader.ReadAsync().AsTask().WaitAsync(TestTimeout);
        Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerCommand, commandEnvelope.BodyCase);
        Assert.False(string.IsNullOrWhiteSpace(commandEnvelope.CorrelationId));

        await pipePair.WorkerWriter.WriteAsync(
            CreateCommandReplyEnvelope(commandEnvelope.CorrelationId, MxCommandKind.Ping));

        WorkerCommandReply reply = await invokeTask.WaitAsync(TestTimeout);
        Assert.Equal(commandEnvelope.CorrelationId, reply.Reply.CorrelationId);
        Assert.Equal(MxCommandKind.Ping, reply.Reply.Kind);
    }

    /// <summary>Verifies that InvokeAsync ignores late replies and keeps the client ready.</summary>
    [Fact]
    public async Task InvokeAsync_WithLateReply_IgnoresLateReplyAndKeepsClientReady()
    {
        await using PipePair pipePair = await PipePair.CreateAsync();
        await using WorkerClient client = CreateClient(pipePair);
        await CompleteHandshakeAsync(client, pipePair);

        Task<WorkerCommandReply> timedOutInvokeTask = client.InvokeAsync(
            CreateCommand(MxCommandKind.Ping),
            TimeSpan.FromMilliseconds(50),
            CancellationToken.None);

        WorkerEnvelope timedOutCommand = await pipePair.WorkerReader.ReadAsync().AsTask().WaitAsync(TestTimeout);

        WorkerClientException exception = await Assert.ThrowsAsync<WorkerClientException>(
            async () => await timedOutInvokeTask);
        Assert.Equal(WorkerClientErrorCode.CommandTimeout, exception.ErrorCode);

        // Send the stale reply for the already-timed-out command, then the second
        // command's reply. The pipe is FIFO, so the read loop processes (and discards)
        // the stale reply before the second reply — no fixed Task.Delay needed.
        await pipePair.WorkerWriter.WriteAsync(
            CreateCommandReplyEnvelope(timedOutCommand.CorrelationId, MxCommandKind.Ping));

        Task<WorkerCommandReply> secondInvokeTask = client.InvokeAsync(
            CreateCommand(MxCommandKind.GetWorkerInfo),
            TestTimeout,
            CancellationToken.None);

        WorkerEnvelope secondCommand = await pipePair.WorkerReader.ReadAsync().AsTask().WaitAsync(TestTimeout);
        await pipePair.WorkerWriter.WriteAsync(
            CreateCommandReplyEnvelope(secondCommand.CorrelationId, MxCommandKind.GetWorkerInfo));

        WorkerCommandReply reply = await secondInvokeTask.WaitAsync(TestTimeout);
        Assert.Equal(WorkerClientState.Ready, client.State);
        Assert.Equal(MxCommandKind.GetWorkerInfo, reply.Reply.Kind);
    }

    /// <summary>Verifies that ReadEventsAsync yields events in pipe order from the worker.</summary>
    [Fact]
    public async Task ReadEventsAsync_WithWorkerEvents_YieldsEventsInPipeOrder()
    {
        await using PipePair pipePair = await PipePair.CreateAsync();
        await using WorkerClient client = CreateClient(pipePair);
        await CompleteHandshakeAsync(client, pipePair);

        // The token bounds both the enumeration and each MoveNextAsync call.
        using CancellationTokenSource cancellationTokenSource = new(TestTimeout);
        await using IAsyncEnumerator<WorkerEvent> events =
            client.ReadEventsAsync(cancellationTokenSource.Token).GetAsyncEnumerator(cancellationTokenSource.Token);

        await pipePair.WorkerWriter.WriteAsync(
            CreateEventEnvelope(sequence: 11, MxEventFamily.OnDataChange));
        await pipePair.WorkerWriter.WriteAsync(
            CreateEventEnvelope(sequence: 12, MxEventFamily.OperationComplete));

        Assert.True(await events.MoveNextAsync());
        Assert.Equal((ulong)11, events.Current.Event.WorkerSequence);
        Assert.Equal(MxEventFamily.OnDataChange, events.Current.Event.Family);

        Assert.True(await events.MoveNextAsync());
        Assert.Equal((ulong)12, events.Current.Event.WorkerSequence);
        Assert.Equal(MxEventFamily.OperationComplete, events.Current.Event.Family);
    }

    /// <summary>Verifies that the read loop faults the client when the event queue overflows.</summary>
    [Fact]
    public async Task ReadLoop_WhenEventQueueOverflows_FaultsClient()
    {
        await using PipePair pipePair = await PipePair.CreateAsync();
        await using WorkerClient client = CreateClient(
            pipePair,
            new WorkerClientOptions
            {
                // Capacity 1 with nobody reading events: the second event overflows the queue.
                EventChannelCapacity = 1,
                // Long heartbeat settings keep the heartbeat monitor from faulting the client first.
                HeartbeatGrace = TimeSpan.FromSeconds(30),
                HeartbeatCheckInterval = TimeSpan.FromSeconds(30),
            });
        await CompleteHandshakeAsync(client, pipePair);

        await pipePair.WorkerWriter.WriteAsync(
            CreateEventEnvelope(sequence: 11, MxEventFamily.OnDataChange));
        await pipePair.WorkerWriter.WriteAsync(
            CreateEventEnvelope(sequence: 12, MxEventFamily.OnDataChange));

        await WaitUntilAsync(
            () => client.State == WorkerClientState.Faulted,
            TestTimeout);

        Assert.Equal(WorkerClientState.Faulted, client.State);
    }

    /// <summary>
    /// Verifies that when the client faults it kills the owned worker process.
    /// The assertion waits on <see cref="FakeWorkerProcess.WaitForExitAsync"/>, which
    /// completes exactly when <c>Kill</c> runs, instead of polling <c>client.State</c>.
    /// Polling state is racy: the client publishes the
    /// <c>Faulted</c> state before it calls <c>KillOwnedProcess</c>, so a state-based
    /// wait can observe <c>Faulted</c> while <c>KillCount</c> is still 0.
    /// </summary>
    [Fact]
    public async Task ReadLoop_WhenClientFaults_KillsOwnedWorkerProcess()
    {
        await using PipePair pipePair = await PipePair.CreateAsync();
        FakeWorkerProcess process = new();
        await using WorkerClient client = CreateClient(
            pipePair,
            new WorkerClientOptions
            {
                EventChannelCapacity = 1,
                HeartbeatGrace = TimeSpan.FromSeconds(30),
                HeartbeatCheckInterval = TimeSpan.FromSeconds(30),
            },
            processHandle: CreateProcessHandle(process));
        await CompleteHandshakeAsync(client, pipePair);

        // Two events against a capacity-1 queue with no reader force the fault.
        await pipePair.WorkerWriter.WriteAsync(
            CreateEventEnvelope(sequence: 11, MxEventFamily.OnDataChange));
        await pipePair.WorkerWriter.WriteAsync(
            CreateEventEnvelope(sequence: 12, MxEventFamily.OnDataChange));

        // Deterministic: this completes the instant Kill() runs, with no timing window.
        using CancellationTokenSource exitTimeout = new(TestTimeout);
        await process.WaitForExitAsync(exitTimeout.Token);

        Assert.Equal(WorkerClientState.Faulted, client.State);
        Assert.Equal(1, process.KillCount);
        Assert.True(process.KillEntireProcessTree);
        Assert.True(process.HasExited);
    }

    /// <summary>
    /// Verifies that a worker faulting mid-command — the pipe dropping while an
    /// <see cref="WorkerClient.InvokeAsync"/> is still pending — completes the pending
    /// invoke task with a <see cref="WorkerClientException"/> carrying the
    /// pipe-disconnected error code rather than hanging until the command timeout.
    /// </summary>
    [Fact]
    public async Task InvokeAsync_WhenPipeDisconnectsMidCommand_FailsPendingInvokeWithPipeDisconnected()
    {
        await using PipePair pipePair = await PipePair.CreateAsync();
        await using WorkerClient client = CreateClient(pipePair);
        await CompleteHandshakeAsync(client, pipePair);

        Task<WorkerCommandReply> invokeTask = client.InvokeAsync(
            CreateCommand(MxCommandKind.Ping),
            TestTimeout,
            CancellationToken.None);

        // The worker received the command but disconnects before replying.
        WorkerEnvelope commandEnvelope = await pipePair.WorkerReader.ReadAsync().AsTask().WaitAsync(TestTimeout);
        Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerCommand, commandEnvelope.BodyCase);
        await pipePair.DisposeWorkerSideAsync();

        WorkerClientException exception = await Assert.ThrowsAsync<WorkerClientException>(
            async () => await invokeTask.WaitAsync(TestTimeout));
        Assert.Equal(WorkerClientErrorCode.PipeDisconnected, exception.ErrorCode);

        await WaitUntilAsync(() => client.State == WorkerClientState.Faulted, TestTimeout);
        Assert.Equal(WorkerClientState.Faulted, client.State);
    }

    /// <summary>
    /// Verifies that a worker emitting a <c>WorkerFault</c> envelope while an
    /// <see cref="WorkerClient.InvokeAsync"/> is pending completes the pending invoke
    /// task with a <see cref="WorkerClientException"/> carrying the worker-faulted
    /// error code.
    /// </summary>
    [Fact]
    public async Task InvokeAsync_WhenWorkerFaultsMidCommand_FailsPendingInvokeWithWorkerFaulted()
    {
        await using PipePair pipePair = await PipePair.CreateAsync();
        await using WorkerClient client = CreateClient(pipePair);
        await CompleteHandshakeAsync(client, pipePair);

        Task<WorkerCommandReply> invokeTask = client.InvokeAsync(
            CreateCommand(MxCommandKind.Ping),
            TestTimeout,
            CancellationToken.None);

        WorkerEnvelope commandEnvelope = await pipePair.WorkerReader.ReadAsync().AsTask().WaitAsync(TestTimeout);
        Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerCommand, commandEnvelope.BodyCase);
        await pipePair.WorkerWriter.WriteAsync(CreateWorkerFaultEnvelope("scripted mid-command fault"));

        WorkerClientException exception = await Assert.ThrowsAsync<WorkerClientException>(
            async () => await invokeTask.WaitAsync(TestTimeout));
        Assert.Equal(WorkerClientErrorCode.WorkerFaulted, exception.ErrorCode);

        await WaitUntilAsync(() => client.State == WorkerClientState.Faulted, TestTimeout);
        Assert.Equal(WorkerClientState.Faulted, client.State);
    }

    /// <summary>Verifies that the read loop faults the client when the worker side of the pipe disconnects.</summary>
    [Fact]
    public async Task ReadLoop_WhenPipeDisconnects_FaultsClient()
    {
        await using PipePair pipePair = await PipePair.CreateAsync();
        await using WorkerClient client = CreateClient(pipePair);
        await CompleteHandshakeAsync(client, pipePair);

        await pipePair.DisposeWorkerSideAsync();

        await WaitUntilAsync(
            () => client.State == WorkerClientState.Faulted,
            TestTimeout);

        Assert.Equal(WorkerClientState.Faulted, client.State);
    }

    /// <summary>Verifies that the read loop stops the running worker metric when the pipe disconnects.</summary>
    [Fact]
    public async Task ReadLoop_WhenPipeDisconnects_StopsRunningWorkerMetric()
    {
        await using PipePair pipePair = await PipePair.CreateAsync();
        using GatewayMetrics metrics = new();
        await using WorkerClient client = CreateClient(pipePair, metrics: metrics);
        await CompleteHandshakeAsync(client, pipePair);
        Assert.Equal(1, metrics.GetSnapshot().WorkersRunning);

        await pipePair.DisposeWorkerSideAsync();

        // Wait on both conditions: observing Faulted alone does not guarantee the metric was updated yet.
        await WaitUntilAsync(
            () => client.State == WorkerClientState.Faulted && metrics.GetSnapshot().WorkersRunning == 0,
            TestTimeout);

        GatewayMetricsSnapshot snapshot = metrics.GetSnapshot();
        Assert.Equal(0, snapshot.WorkersRunning);
        Assert.Equal(1, snapshot.WorkerExits);
    }

    /// <summary>Verifies that DisposeAsync returns within a bounded timeout when the pipe read is blocked.</summary>
    [Fact]
    public async Task DisposeAsync_WhenPipeReadIsBlocked_ReturnsWithinBoundedTimeout()
    {
        await using PipePair pipePair = await PipePair.CreateAsync();
        // Not "await using": the explicit DisposeAsync call below is the operation under test.
        WorkerClient client = CreateClient(pipePair);
        await CompleteHandshakeAsync(client, pipePair);

        // Stopwatch is monotonic, so a system clock adjustment cannot skew the measurement.
        System.Diagnostics.Stopwatch stopwatch = System.Diagnostics.Stopwatch.StartNew();
        await client.DisposeAsync().AsTask().WaitAsync(TestTimeout);
        TimeSpan elapsed = stopwatch.Elapsed;

        Assert.True(
            elapsed < TimeSpan.FromSeconds(4),
            $"DisposeAsync took {elapsed.TotalMilliseconds:N0}ms.");
    }

    /// <summary>
    /// Verifies that DisposeAsync kills the owned worker process (including its process tree)
    /// and disposes the process handle.
    /// </summary>
    [Fact]
    public async Task DisposeAsync_WhenOwnedWorkerStillRuns_KillsProcessBeforeDisposing()
    {
        await using PipePair pipePair = await PipePair.CreateAsync();
        FakeWorkerProcess process = new();
        // Not "await using": the explicit DisposeAsync call below is the operation under test.
        WorkerClient client = CreateClient(pipePair, processHandle: CreateProcessHandle(process));

        await client.DisposeAsync().AsTask().WaitAsync(TestTimeout);

        Assert.Equal(1, process.KillCount);
        Assert.True(process.KillEntireProcessTree);
        Assert.True(process.Disposed);
    }

    /// <summary>
    /// Verifies that a heartbeat envelope updates the last-heartbeat timestamp and worker
    /// process id. Uses a <see cref="ManualTimeProvider"/> so the timestamp advance is
    /// deterministic instead of relying on a wall-clock <c>Task.Delay</c> exceeding
    /// the clock's timestamp resolution.
    /// </summary>
    [Fact]
    public async Task ReadLoop_WhenHeartbeatArrives_UpdatesLastHeartbeatAndWorkerProcess()
    {
        ManualTimeProvider clock = new(DateTimeOffset.Parse("2026-05-18T12:00:00Z", System.Globalization.CultureInfo.InvariantCulture));
        await using PipePair pipePair = await PipePair.CreateAsync();
        await using WorkerClient client = CreateClient(pipePair, timeProvider: clock);
        await CompleteHandshakeAsync(client, pipePair);

        DateTimeOffset previousHeartbeat = client.LastHeartbeatAt;
        clock.Advance(TimeSpan.FromSeconds(1));
        await pipePair.WorkerWriter.WriteAsync(CreateHeartbeatEnvelope(workerProcessId: 9876));

        await WaitUntilAsync(
            () => client.ProcessId == 9876 && client.LastHeartbeatAt > previousHeartbeat,
            TestTimeout);

        Assert.Equal(WorkerClientState.Ready, client.State);
        Assert.Equal(previousHeartbeat + TimeSpan.FromSeconds(1), client.LastHeartbeatAt);
    }

    /// <summary>
    /// Verifies that the heartbeat monitor faults the client when the heartbeat expires.
    /// Uses an injected <see cref="ManualTimeProvider"/> so the grace comparison is deterministic
    /// instead of depending on real wall-clock advance; the monitor's
    /// timer stays on the real clock and
    /// observes the manually-advanced grace on its next tick.
    /// </summary>
    [Fact]
    public async Task HeartbeatMonitor_WhenHeartbeatExpires_FaultsClient()
    {
        ManualTimeProvider clock = new(DateTimeOffset.Parse("2026-05-20T12:00:00Z", System.Globalization.CultureInfo.InvariantCulture));
        await using PipePair pipePair = await PipePair.CreateAsync();
        await using WorkerClient client = CreateClient(
            pipePair,
            new WorkerClientOptions
            {
                HeartbeatGrace = TimeSpan.FromMilliseconds(80),
                HeartbeatCheckInterval = TimeSpan.FromMilliseconds(20),
                EventChannelCapacity = 8,
            },
            timeProvider: clock);
        await CompleteHandshakeAsync(client, pipePair);

        // Jump the manual clock well past the 80 ms grace without sending any heartbeat.
        clock.Advance(TimeSpan.FromSeconds(2));

        await WaitUntilAsync(
            () => client.State == WorkerClientState.Faulted,
            TestTimeout);

        Assert.Equal(WorkerClientState.Faulted, client.State);
    }

    /// <summary>
    /// Builds a <see cref="WorkerClient"/> attached to the gateway end of <paramref name="pipePair"/>,
    /// using the test session id and nonce. Optional arguments are passed through unchanged.
    /// </summary>
    private static WorkerClient CreateClient(
        PipePair pipePair,
        WorkerClientOptions? options = null,
        GatewayMetrics? metrics = null,
        WorkerProcessHandle? processHandle = null,
        TimeProvider? timeProvider = null)
    {
        WorkerFrameProtocolOptions frameOptions = new(SessionId);
        WorkerClientConnection connection = new(
            SessionId,
            Nonce,
            pipePair.GatewayStream,
            frameOptions,
            processHandle);

        return new WorkerClient(connection, options, metrics, timeProvider);
    }

    /// <summary>Wraps a fake process in a <see cref="WorkerProcessHandle"/> so the client can own it.</summary>
    private static WorkerProcessHandle CreateProcessHandle(FakeWorkerProcess process)
    {
        return new WorkerProcessHandle(
            process,
            new WorkerProcessCommandLine("MxGateway.Worker.exe", []),
            DateTimeOffset.UtcNow);
    }

    /// <summary>
    /// Starts the client and plays the worker side of the handshake: reads and checks the
    /// gateway hello, replies with worker hello and worker ready, then awaits StartAsync.
    /// </summary>
    private static async Task CompleteHandshakeAsync(
        WorkerClient client,
        PipePair pipePair)
    {
        Task startTask = client.StartAsync(CancellationToken.None);

        WorkerEnvelope gatewayHello = await pipePair.WorkerReader.ReadAsync().AsTask().WaitAsync(TestTimeout);
        Assert.Equal(WorkerEnvelope.BodyOneofCase.GatewayHello, gatewayHello.BodyCase);
        Assert.Equal(Nonce, gatewayHello.GatewayHello.Nonce);
        Assert.Equal(GatewayContractInfo.WorkerProtocolVersion, gatewayHello.GatewayHello.SupportedProtocolVersion);

        await pipePair.WorkerWriter.WriteAsync(CreateWorkerHelloEnvelope());
        await pipePair.WorkerWriter.WriteAsync(CreateWorkerReadyEnvelope());
        await startTask.WaitAsync(TestTimeout);
    }

    /// <summary>Creates a worker command whose only populated field is the command kind.</summary>
    private static WorkerCommand CreateCommand(MxCommandKind kind)
    {
        return new WorkerCommand
        {
            Command = new MxCommand
            {
                Kind = kind,
            },
        };
    }

    /// <summary>Creates the worker hello envelope (sequence 1) echoing the test nonce.</summary>
    private static WorkerEnvelope CreateWorkerHelloEnvelope()
    {
        return CreateWorkerEnvelope(
            correlationId: string.Empty,
            sequence: 1,
            envelope => envelope.WorkerHello = new WorkerHello
            {
                ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
                Nonce = Nonce,
                WorkerProcessId = WorkerProcessId,
                WorkerVersion = "fake-worker",
            });
    }

    /// <summary>Creates the worker ready envelope (sequence 2).</summary>
    private static WorkerEnvelope CreateWorkerReadyEnvelope()
    {
        return CreateWorkerEnvelope(
            correlationId: string.Empty,
            sequence: 2,
            envelope => envelope.WorkerReady = new WorkerReady
            {
                WorkerProcessId = WorkerProcessId,
                MxaccessProgid = "LMXProxy.LMXProxyServer.1",
                MxaccessClsid = "{C30B52F5-2CB5-4760-AF0A-3A344A7EB5DC}",
            });
    }

    /// <summary>
    /// Creates a command reply envelope (sequence 10) carrying <paramref name="correlationId"/>
    /// on both the envelope and the inner reply.
    /// </summary>
    private static WorkerEnvelope CreateCommandReplyEnvelope(
        string correlationId,
        MxCommandKind kind)
    {
        return CreateWorkerEnvelope(
            correlationId,
            sequence: 10,
            envelope => envelope.WorkerCommandReply = new WorkerCommandReply
            {
                Reply = new MxCommandReply
                {
                    SessionId = SessionId,
                    CorrelationId = correlationId,
                    Kind = kind,
                },
            });
    }

    /// <summary>
    /// Creates an event envelope; <paramref name="sequence"/> is used for both the envelope
    /// sequence and the event's worker sequence.
    /// </summary>
    private static WorkerEnvelope CreateEventEnvelope(
        ulong sequence,
        MxEventFamily family)
    {
        return CreateWorkerEnvelope(
            correlationId: string.Empty,
            sequence,
            envelope => envelope.WorkerEvent = new WorkerEvent
            {
                Event = new MxEvent
                {
                    SessionId = SessionId,
                    Family = family,
                    WorkerSequence = sequence,
                },
            });
    }

    /// <summary>
    /// Creates a worker fault envelope (sequence 30) that uses <paramref name="diagnosticMessage"/>
    /// for both the diagnostic message and the protocol status message.
    /// </summary>
    private static WorkerEnvelope CreateWorkerFaultEnvelope(string diagnosticMessage)
    {
        return CreateWorkerEnvelope(
            correlationId: string.Empty,
            sequence: 30,
            envelope => envelope.WorkerFault = new WorkerFault
            {
                Category = WorkerFaultCategory.MxaccessCommandFailed,
                DiagnosticMessage = diagnosticMessage,
                ProtocolStatus = new ProtocolStatus
                {
                    Code = ProtocolStatusCode.WorkerUnavailable,
                    Message = diagnosticMessage,
                },
            });
    }

    /// <summary>Creates a heartbeat envelope (sequence 20) reporting the given worker process id.</summary>
    private static WorkerEnvelope CreateHeartbeatEnvelope(int workerProcessId)
    {
        return CreateWorkerEnvelope(
            correlationId: string.Empty,
            sequence: 20,
            envelope => envelope.WorkerHeartbeat = new WorkerHeartbeat
            {
                WorkerProcessId = workerProcessId,
                State = WorkerState.Ready,
                LastStaActivityTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
                PendingCommandCount = 0,
                OutboundEventQueueDepth = 0,
            });
    }

    /// <summary>
    /// Creates an envelope with the common header fields set, then lets
    /// <paramref name="setBody"/> assign the body.
    /// </summary>
    private static WorkerEnvelope CreateWorkerEnvelope(
        string correlationId,
        ulong sequence,
        Action<WorkerEnvelope> setBody)
    {
        WorkerEnvelope envelope = new()
        {
            ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
            SessionId = SessionId,
            Sequence = sequence,
            CorrelationId = correlationId,
        };
        setBody(envelope);
        return envelope;
    }

    /// <summary>
    /// Polls <paramref name="predicate"/> every 10 ms until it returns true. If
    /// <paramref name="timeout"/> elapses first, the pending delay is cancelled and the
    /// resulting <see cref="OperationCanceledException"/> propagates to the caller.
    /// </summary>
    private static async Task WaitUntilAsync(
        Func<bool> predicate,
        TimeSpan timeout)
    {
        using CancellationTokenSource cancellationTokenSource = new(timeout);
        while (!predicate())
        {
            await Task.Delay(TimeSpan.FromMilliseconds(10), cancellationTokenSource.Token);
        }
    }

    /// <summary>
    /// A connected named pipe pair. The server end is handed to the client under test; the
    /// client end is wrapped in a frame reader and writer so the test can act as the worker.
    /// </summary>
    private sealed class PipePair : IAsyncDisposable
    {
        private readonly NamedPipeClientStream _workerStream;
        private bool _workerSideDisposed;

        private PipePair(
            NamedPipeServerStream gatewayStream,
            NamedPipeClientStream workerStream)
        {
            GatewayStream = gatewayStream;
            _workerStream = workerStream;
            WorkerReader = new WorkerFrameReader(_workerStream, new WorkerFrameProtocolOptions(SessionId));
            WorkerWriter = new WorkerFrameWriter(_workerStream, new WorkerFrameProtocolOptions(SessionId));
        }

        /// <summary>The gateway side of the named pipe connection.</summary>
        public NamedPipeServerStream GatewayStream { get; }

        /// <summary>Frame reader for worker messages.</summary>
        public WorkerFrameReader WorkerReader { get; }

        /// <summary>Frame writer for worker messages.</summary>
        public WorkerFrameWriter WorkerWriter { get; }

        /// <summary>Creates a connected pipe pair for testing.</summary>
        public static async Task<PipePair> CreateAsync()
        {
            // A unique name per pair keeps tests from colliding on the same pipe.
            string pipeName = $"mxaccessgw-workerclient-tests-{Guid.NewGuid():N}";
            NamedPipeServerStream gatewayStream = new(
                pipeName,
                PipeDirection.InOut,
                maxNumberOfServerInstances: 1,
                PipeTransmissionMode.Byte,
                PipeOptions.Asynchronous);
            NamedPipeClientStream workerStream = new(
                ".",
                pipeName,
                PipeDirection.InOut,
                PipeOptions.Asynchronous);

            try
            {
                // Start listening before connecting so both halves complete together.
                Task waitForConnectionTask = gatewayStream.WaitForConnectionAsync();
                await workerStream.ConnectAsync();
                await waitForConnectionTask;
            }
            catch
            {
                // No PipePair exists yet to own the streams, so release them here before rethrowing.
                await workerStream.DisposeAsync();
                await gatewayStream.DisposeAsync();
                throw;
            }

            return new PipePair(gatewayStream, workerStream);
        }

        /// <summary>Disposes the worker side of the pipe. Calls after the first are no-ops.</summary>
        public async ValueTask DisposeWorkerSideAsync()
        {
            if (_workerSideDisposed)
            {
                return;
            }

            await _workerStream.DisposeAsync();
            _workerSideDisposed = true;
        }

        /// <summary>Disposes the worker side (if not already disposed) and then the gateway side.</summary>
        public async ValueTask DisposeAsync()
        {
            await DisposeWorkerSideAsync();
            await GatewayStream.DisposeAsync();
        }
    }

    /// <summary>
    /// In-memory <see cref="IWorkerProcess"/> that records how it was killed and disposed.
    /// <see cref="WaitForExitAsync"/> completes once <see cref="Kill"/> has been called.
    /// </summary>
    private sealed class FakeWorkerProcess : IWorkerProcess
    {
        // Continuations run asynchronously so the thread calling Kill does not run awaiting test code inline.
        private readonly TaskCompletionSource _exited = new(TaskCreationOptions.RunContinuationsAsynchronously);

        public int Id { get; } = WorkerProcessId;

        public bool HasExited { get; private set; }

        public int? ExitCode { get; private set; }

        /// <summary>Number of times <see cref="Kill"/> has been called.</summary>
        public int KillCount { get; private set; }

        /// <summary>The <c>entireProcessTree</c> argument passed to the most recent <see cref="Kill"/> call.</summary>
        public bool KillEntireProcessTree { get; private set; }

        /// <summary>True once <see cref="Dispose"/> has been called.</summary>
        public bool Disposed { get; private set; }

        /// <summary>Completes when <see cref="Kill"/> is called, or is cancelled by <paramref name="cancellationToken"/>.</summary>
        public ValueTask WaitForExitAsync(CancellationToken cancellationToken)
        {
            return new ValueTask(_exited.Task.WaitAsync(cancellationToken));
        }

        /// <summary>Records the kill request, marks the process as exited with code -1 and signals exit.</summary>
        public void Kill(bool entireProcessTree)
        {
            KillCount++;
            KillEntireProcessTree = entireProcessTree;
            HasExited = true;
            ExitCode = -1;
            _exited.TrySetResult();
        }

        public void Dispose()
        {
            Disposed = true;
        }
    }
}