From bb5139fec26aab4da1b23182bdebc58272ee2448 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 15 Jun 2026 10:56:56 -0400 Subject: [PATCH] test(gateway): fake worker responds to control commands (A6) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add RespondToControlCommandAsync to FakeWorkerHarness so scripted fake workers can auto-reply to the five control command kinds (Ping, GetSessionState, GetWorkerInfo, DrainEvents, ShutdownWorker) with canned replies whose shapes match the real WorkerPipeSession helpers. Add five unit tests in FakeWorkerHarnessTests covering each control command kind through the WorkerClient→pipe roundtrip, and one gateway E2E test (GatewayService_WithFakeWorker_ControlCommandsRoundtripThroughGateway) that exercises Ping, GetWorkerInfo, and DrainEvents through the full gRPC→SessionManager→WorkerClient→named-pipe path using a scripted ControlCommandFakeWorkerProcessLauncher. --- .../GatewayEndToEndFakeWorkerSmokeTests.cs | 246 ++++++++++++++++++ .../Gateway/Workers/FakeWorkerHarnessTests.cs | 147 ++++++++++- .../Workers/Fakes/FakeWorkerHarness.cs | 81 ++++++ 3 files changed, 466 insertions(+), 8 deletions(-) diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndFakeWorkerSmokeTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndFakeWorkerSmokeTests.cs index 2185f29..0796638 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndFakeWorkerSmokeTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndFakeWorkerSmokeTests.cs @@ -94,6 +94,103 @@ public sealed class GatewayEndToEndFakeWorkerSmokeTests launcher.CommandKinds); } + /// + /// Verifies that the gateway forwards control commands (Ping, GetWorkerInfo, DrainEvents) + /// through the full gRPC→WorkerClient→pipe roundtrip when the fake worker responds + /// with canned replies via RespondToControlCommandAsync. + /// + [Fact] + public async Task GatewayService_WithFakeWorker_ControlCommandsRoundtripThroughGateway() + { + ControlCommandFakeWorkerProcessLauncher launcher = new(); + await using GatewayServiceFixture fixture = new(launcher); + + OpenSessionReply openReply = await fixture.Service.OpenSession( + new OpenSessionRequest + { + ClientSessionName = "control-cmd-e2e", + ClientCorrelationId = "control-open-correlation", + CommandTimeout = Duration.FromTimeSpan(TestTimeout), + }, + new TestServerCallContext()); + + Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code); + string sessionId = openReply.SessionId; + + // Ping — the scripted worker echoes back the message. + Task pingTask = fixture.Service.Invoke( + new MxCommandRequest + { + SessionId = sessionId, + ClientCorrelationId = "ping-correlation", + Command = new MxCommand + { + Kind = MxCommandKind.Ping, + Ping = new PingCommand { Message = "e2e-ping" }, + }, + }, + new TestServerCallContext()); + await launcher.WaitForNextControlCommandAsync(TestTimeout); + MxCommandReply pingReply = await pingTask.WaitAsync(TestTimeout); + + Assert.Equal(ProtocolStatusCode.Ok, pingReply.ProtocolStatus.Code); + Assert.Equal(MxCommandKind.Ping, pingReply.Kind); + Assert.Equal("e2e-ping", pingReply.DiagnosticMessage); + + // GetWorkerInfo — the scripted worker returns canned info. + Task infoTask = fixture.Service.Invoke( + new MxCommandRequest + { + SessionId = sessionId, + ClientCorrelationId = "info-correlation", + Command = new MxCommand + { + Kind = MxCommandKind.GetWorkerInfo, + GetWorkerInfo = new GetWorkerInfoCommand(), + }, + }, + new TestServerCallContext()); + await launcher.WaitForNextControlCommandAsync(TestTimeout); + MxCommandReply infoReply = await infoTask.WaitAsync(TestTimeout); + + Assert.Equal(ProtocolStatusCode.Ok, infoReply.ProtocolStatus.Code); + Assert.Equal(MxCommandKind.GetWorkerInfo, infoReply.Kind); + Assert.NotNull(infoReply.WorkerInfo); + Assert.Equal(ControlCommandFakeWorkerProcessLauncher.ProcessId, infoReply.WorkerInfo.WorkerProcessId); + Assert.False(string.IsNullOrEmpty(infoReply.WorkerInfo.MxaccessProgid)); + + // DrainEvents — the scripted worker returns an empty drain reply. + Task drainTask = fixture.Service.Invoke( + new MxCommandRequest + { + SessionId = sessionId, + ClientCorrelationId = "drain-correlation", + Command = new MxCommand + { + Kind = MxCommandKind.DrainEvents, + DrainEvents = new DrainEventsCommand { MaxEvents = 16 }, + }, + }, + new TestServerCallContext()); + await launcher.WaitForNextControlCommandAsync(TestTimeout); + MxCommandReply drainReply = await drainTask.WaitAsync(TestTimeout); + + Assert.Equal(ProtocolStatusCode.Ok, drainReply.ProtocolStatus.Code); + Assert.Equal(MxCommandKind.DrainEvents, drainReply.Kind); + Assert.NotNull(drainReply.DrainEvents); + Assert.Empty(drainReply.DrainEvents.Events); + + // Tear down cleanly. + await fixture.Service.CloseSession( + new CloseSessionRequest + { + SessionId = sessionId, + ClientCorrelationId = "control-close-correlation", + }, + new TestServerCallContext()); + await launcher.WorkerTask.WaitAsync(TestTimeout); + } + private static MxCommandRequest CreateRegisterRequest(string sessionId) { return new MxCommandRequest @@ -355,6 +452,155 @@ public sealed class GatewayEndToEndFakeWorkerSmokeTests } } + /// + /// A fake worker launcher whose scripted worker automatically responds to control + /// commands (Ping, GetWorkerInfo, DrainEvents) using + /// and sends a shutdown ack when the gateway closes the session. Exposes + /// so the test can drive the interaction + /// one command at a time without races. + /// + private sealed class ControlCommandFakeWorkerProcessLauncher : IWorkerProcessLauncher + { + public const int ProcessId = 5590; + + private readonly FakeWorkerProcess _process = new(ProcessId); + private readonly SemaphoreSlim _commandHandled = new(0); + + /// Gets the task backing the scripted worker loop. + public Task WorkerTask { get; private set; } = Task.CompletedTask; + + /// + public Task LaunchAsync( + WorkerProcessLaunchRequest request, + CancellationToken cancellationToken = default) + { + WorkerTask = RunWorkerAsync(request, cancellationToken); + + return Task.FromResult(new WorkerProcessHandle( + _process, + new WorkerProcessCommandLine("fake-control-worker.exe", []), + DateTimeOffset.UtcNow)); + } + + /// Waits until the scripted worker has responded to one control command. + /// Maximum time to wait. + public async Task WaitForNextControlCommandAsync(TimeSpan timeout) + { + using CancellationTokenSource cts = new(timeout); + await _commandHandled.WaitAsync(cts.Token).ConfigureAwait(false); + } + + private async Task RunWorkerAsync( + WorkerProcessLaunchRequest request, + CancellationToken cancellationToken) + { + await using FakeWorkerHarness harness = await FakeWorkerHarness.ConnectToGatewayPipeAsync( + request.SessionId, + request.Nonce, + request.PipeName, + request.ProtocolVersion, + cancellationToken: cancellationToken).ConfigureAwait(false); + await harness.CompleteStartupAsync(ProcessId, cancellationToken: cancellationToken).ConfigureAwait(false); + + while (!cancellationToken.IsCancellationRequested) + { + WorkerEnvelope envelope = await harness + .ReadGatewayEnvelopeAsync(cancellationToken) + .ConfigureAwait(false); + + if (envelope.BodyCase == WorkerEnvelope.BodyOneofCase.WorkerShutdown) + { + await harness.SendShutdownAckAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + _process.MarkExited(0); + return; + } + + if (envelope.BodyCase == WorkerEnvelope.BodyOneofCase.WorkerCommand) + { + MxCommandKind kind = envelope.WorkerCommand?.Command?.Kind ?? MxCommandKind.Unspecified; + if (kind is MxCommandKind.Ping or MxCommandKind.GetSessionState + or MxCommandKind.GetWorkerInfo or MxCommandKind.DrainEvents + or MxCommandKind.ShutdownWorker) + { + // Re-enter the harness to process the already-read envelope + // by replaying it through the control-command responder path. + await RespondToKnownControlCommandAsync(harness, envelope, cancellationToken) + .ConfigureAwait(false); + _commandHandled.Release(); + continue; + } + } + + throw new InvalidOperationException( + $"ControlCommandFakeWorkerProcessLauncher received unexpected envelope {envelope.BodyCase}."); + } + } + + private static async Task RespondToKnownControlCommandAsync( + FakeWorkerHarness harness, + WorkerEnvelope commandEnvelope, + CancellationToken cancellationToken) + { + MxCommand command = commandEnvelope.WorkerCommand.Command; + switch (command.Kind) + { + case MxCommandKind.Ping: + await harness.ReplyToCommandAsync( + commandEnvelope, + configureReply: reply => + { + string? message = command.Ping?.Message; + if (!string.IsNullOrEmpty(message)) + { + reply.DiagnosticMessage = message; + } + }, + cancellationToken: cancellationToken).ConfigureAwait(false); + break; + + case MxCommandKind.GetSessionState: + await harness.ReplyToCommandAsync( + commandEnvelope, + configureReply: reply => reply.SessionState = new SessionStateReply + { + State = SessionState.Ready, + }, + cancellationToken: cancellationToken).ConfigureAwait(false); + break; + + case MxCommandKind.GetWorkerInfo: + await harness.ReplyToCommandAsync( + commandEnvelope, + configureReply: reply => reply.WorkerInfo = new WorkerInfoReply + { + WorkerProcessId = ControlCommandFakeWorkerProcessLauncher.ProcessId, + WorkerVersion = "fake-control-worker", + MxaccessProgid = "LMXProxy.LMXProxyServer.1", + MxaccessClsid = "{C30B52F5-2CB5-4760-AF0A-3A344A7EB5DC}", + }, + cancellationToken: cancellationToken).ConfigureAwait(false); + break; + + case MxCommandKind.DrainEvents: + await harness.ReplyToCommandAsync( + commandEnvelope, + configureReply: reply => reply.DrainEvents = new DrainEventsReply(), + cancellationToken: cancellationToken).ConfigureAwait(false); + break; + + case MxCommandKind.ShutdownWorker: + await harness.ReplyToCommandAsync(commandEnvelope, cancellationToken: cancellationToken) + .ConfigureAwait(false); + await harness.SendShutdownAckAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + break; + + default: + throw new InvalidOperationException( + $"Unexpected control command kind {command.Kind} in ControlCommandFakeWorkerProcessLauncher."); + } + } + } + private sealed class FakeWorkerProcess(int processId) : IWorkerProcess { private readonly TaskCompletionSource _exited = new(TaskCreationOptions.RunContinuationsAsynchronously); diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Workers/FakeWorkerHarnessTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Workers/FakeWorkerHarnessTests.cs index a4edd57..f5a3902 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Workers/FakeWorkerHarnessTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Workers/FakeWorkerHarnessTests.cs @@ -192,6 +192,139 @@ public sealed class FakeWorkerHarnessTests Assert.Equal(WorkerClientState.Closed, client.State); } + /// + /// Verifies that RespondToControlCommandAsync echoes the Ping message back + /// in the DiagnosticMessage field, matching the real worker's ping reply shape. + /// + [Fact] + public async Task RespondToControlCommandAsync_Ping_EchoesMessageInDiagnostic() + { + await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync(); + await using WorkerClient client = fakeWorker.CreateClient(); + await StartClientAsync(fakeWorker, client); + + Task invokeTask = client.InvokeAsync( + CreateCommand(MxCommandKind.Ping, cmd => cmd.Ping = new PingCommand { Message = "hello-ping" }), + TestTimeout, + CancellationToken.None); + await fakeWorker.RespondToControlCommandAsync(); + + WorkerCommandReply reply = await invokeTask.WaitAsync(TestTimeout); + + Assert.Equal(MxCommandKind.Ping, reply.Reply.Kind); + Assert.Equal(ProtocolStatusCode.Ok, reply.Reply.ProtocolStatus.Code); + Assert.Equal("hello-ping", reply.Reply.DiagnosticMessage); + } + + /// + /// Verifies that RespondToControlCommandAsync returns a SessionStateReply + /// with state Ready for a GetSessionState command. + /// + [Fact] + public async Task RespondToControlCommandAsync_GetSessionState_ReturnsReadyState() + { + await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync(); + await using WorkerClient client = fakeWorker.CreateClient(); + await StartClientAsync(fakeWorker, client); + + Task invokeTask = client.InvokeAsync( + CreateCommand(MxCommandKind.GetSessionState), + TestTimeout, + CancellationToken.None); + await fakeWorker.RespondToControlCommandAsync(); + + WorkerCommandReply reply = await invokeTask.WaitAsync(TestTimeout); + + Assert.Equal(MxCommandKind.GetSessionState, reply.Reply.Kind); + Assert.Equal(ProtocolStatusCode.Ok, reply.Reply.ProtocolStatus.Code); + Assert.NotNull(reply.Reply.SessionState); + Assert.Equal(SessionState.Ready, reply.Reply.SessionState.State); + } + + /// + /// Verifies that RespondToControlCommandAsync returns a WorkerInfoReply + /// with the fake worker's process ID, version, and MXAccess identifiers. + /// + [Fact] + public async Task RespondToControlCommandAsync_GetWorkerInfo_ReturnsFakeWorkerInfo() + { + await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync(); + await using WorkerClient client = fakeWorker.CreateClient(); + await StartClientAsync(fakeWorker, client); + + Task invokeTask = client.InvokeAsync( + CreateCommand(MxCommandKind.GetWorkerInfo), + TestTimeout, + CancellationToken.None); + await fakeWorker.RespondToControlCommandAsync(); + + WorkerCommandReply reply = await invokeTask.WaitAsync(TestTimeout); + + Assert.Equal(MxCommandKind.GetWorkerInfo, reply.Reply.Kind); + Assert.Equal(ProtocolStatusCode.Ok, reply.Reply.ProtocolStatus.Code); + Assert.NotNull(reply.Reply.WorkerInfo); + Assert.Equal(FakeWorkerHarness.DefaultWorkerProcessId, reply.Reply.WorkerInfo.WorkerProcessId); + Assert.Equal("LMXProxy.LMXProxyServer.1", reply.Reply.WorkerInfo.MxaccessProgid); + Assert.False(string.IsNullOrEmpty(reply.Reply.WorkerInfo.MxaccessClsid)); + } + + /// + /// Verifies that RespondToControlCommandAsync returns an empty DrainEventsReply + /// for a DrainEvents command (the fake harness has no queued events). + /// + [Fact] + public async Task RespondToControlCommandAsync_DrainEvents_ReturnsEmptyReply() + { + await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync(); + await using WorkerClient client = fakeWorker.CreateClient(); + await StartClientAsync(fakeWorker, client); + + Task invokeTask = client.InvokeAsync( + CreateCommand(MxCommandKind.DrainEvents, cmd => cmd.DrainEvents = new DrainEventsCommand { MaxEvents = 32 }), + TestTimeout, + CancellationToken.None); + await fakeWorker.RespondToControlCommandAsync(); + + WorkerCommandReply reply = await invokeTask.WaitAsync(TestTimeout); + + Assert.Equal(MxCommandKind.DrainEvents, reply.Reply.Kind); + Assert.Equal(ProtocolStatusCode.Ok, reply.Reply.ProtocolStatus.Code); + Assert.NotNull(reply.Reply.DrainEvents); + Assert.Empty(reply.Reply.DrainEvents.Events); + } + + /// + /// Verifies that RespondToControlCommandAsync for ShutdownWorker sends an OK + /// reply followed by a WorkerShutdownAck, which closes the client. + /// + [Fact] + public async Task RespondToControlCommandAsync_ShutdownWorker_SendsReplyThenAck() + { + await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync(); + await using WorkerClient client = fakeWorker.CreateClient(); + await StartClientAsync(fakeWorker, client); + + // ShutdownAsync triggers a WorkerShutdown envelope (not WorkerCommand), + // so we directly invoke ShutdownWorker as a control command via InvokeAsync. + Task invokeTask = client.InvokeAsync( + CreateCommand(MxCommandKind.ShutdownWorker, cmd => cmd.ShutdownWorker = new ShutdownWorkerCommand()), + TestTimeout, + CancellationToken.None); + + // The harness reads the ShutdownWorker WorkerCommand and replies with + // OK + ShutdownAck — the WorkerClient's read loop processes the ack and + // transitions to Closed. + await fakeWorker.RespondToControlCommandAsync(); + + WorkerCommandReply reply = await invokeTask.WaitAsync(TestTimeout); + + Assert.Equal(MxCommandKind.ShutdownWorker, reply.Reply.Kind); + Assert.Equal(ProtocolStatusCode.Ok, reply.Reply.ProtocolStatus.Code); + + await WaitUntilAsync(() => client.State == WorkerClientState.Closed, TestTimeout); + Assert.Equal(WorkerClientState.Closed, client.State); + } + private static async Task StartClientAsync( FakeWorkerHarness fakeWorker, WorkerClient client) @@ -201,15 +334,13 @@ public sealed class FakeWorkerHarnessTests await startTask.WaitAsync(TestTimeout).ConfigureAwait(false); } - private static WorkerCommand CreateCommand(MxCommandKind kind) + private static WorkerCommand CreateCommand( + MxCommandKind kind, + Action? configure = null) { - return new WorkerCommand - { - Command = new MxCommand - { - Kind = kind, - }, - }; + MxCommand command = new() { Kind = kind }; + configure?.Invoke(command); + return new WorkerCommand { Command = command }; } private static async Task WaitUntilAsync( diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Workers/Fakes/FakeWorkerHarness.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Workers/Fakes/FakeWorkerHarness.cs index 9d03b6d..2277497 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Workers/Fakes/FakeWorkerHarness.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Workers/Fakes/FakeWorkerHarness.cs @@ -391,6 +391,87 @@ public sealed class FakeWorkerHarness : IAsyncDisposable cancellationToken).ConfigureAwait(false); } + /// + /// Reads one incoming command envelope and, if it is one of the five + /// control command kinds (Ping, GetSessionState, GetWorkerInfo, DrainEvents, + /// ShutdownWorker), writes a canned reply that mirrors the real worker's + /// reply shape. For ShutdownWorker the method additionally sends a + /// after the OK reply, matching the real + /// worker's shutdown flow. + /// + /// Token to cancel the asynchronous operation. + /// The command envelope that was handled. + /// + /// Thrown when the next envelope is not a WorkerCommand or contains a + /// non-control command kind. + /// + public async Task RespondToControlCommandAsync( + CancellationToken cancellationToken = default) + { + WorkerEnvelope commandEnvelope = await ReadCommandAsync(cancellationToken).ConfigureAwait(false); + MxCommand command = commandEnvelope.WorkerCommand.Command; + + switch (command.Kind) + { + case MxCommandKind.Ping: + await ReplyToCommandAsync( + commandEnvelope, + configureReply: reply => + { + string? message = command.Ping?.Message; + if (!string.IsNullOrEmpty(message)) + { + reply.DiagnosticMessage = message; + } + }, + cancellationToken: cancellationToken).ConfigureAwait(false); + break; + + case MxCommandKind.GetSessionState: + await ReplyToCommandAsync( + commandEnvelope, + configureReply: reply => reply.SessionState = new SessionStateReply + { + State = SessionState.Ready, + }, + cancellationToken: cancellationToken).ConfigureAwait(false); + break; + + case MxCommandKind.GetWorkerInfo: + await ReplyToCommandAsync( + commandEnvelope, + configureReply: reply => reply.WorkerInfo = new WorkerInfoReply + { + WorkerProcessId = DefaultWorkerProcessId, + WorkerVersion = "fake-worker", + MxaccessProgid = "LMXProxy.LMXProxyServer.1", + MxaccessClsid = "{C30B52F5-2CB5-4760-AF0A-3A344A7EB5DC}", + }, + cancellationToken: cancellationToken).ConfigureAwait(false); + break; + + case MxCommandKind.DrainEvents: + await ReplyToCommandAsync( + commandEnvelope, + configureReply: reply => reply.DrainEvents = new DrainEventsReply(), + cancellationToken: cancellationToken).ConfigureAwait(false); + break; + + case MxCommandKind.ShutdownWorker: + await ReplyToCommandAsync( + commandEnvelope, + cancellationToken: cancellationToken).ConfigureAwait(false); + await SendShutdownAckAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + break; + + default: + throw new InvalidOperationException( + $"RespondToControlCommandAsync only handles control command kinds; received {command.Kind}."); + } + + return commandEnvelope; + } + /// Writes a malformed payload directly to the worker stream. /// Malformed payload bytes to write. /// Token to cancel the asynchronous operation.