diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndFakeWorkerSmokeTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndFakeWorkerSmokeTests.cs index 2185f29..0796638 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndFakeWorkerSmokeTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/GatewayEndToEndFakeWorkerSmokeTests.cs @@ -94,6 +94,103 @@ public sealed class GatewayEndToEndFakeWorkerSmokeTests launcher.CommandKinds); } + /// + /// Verifies that the gateway forwards control commands (Ping, GetWorkerInfo, DrainEvents) + /// through the full gRPC→WorkerClient→pipe roundtrip when the fake worker responds + /// with canned replies via RespondToControlCommandAsync. + /// + [Fact] + public async Task GatewayService_WithFakeWorker_ControlCommandsRoundtripThroughGateway() + { + ControlCommandFakeWorkerProcessLauncher launcher = new(); + await using GatewayServiceFixture fixture = new(launcher); + + OpenSessionReply openReply = await fixture.Service.OpenSession( + new OpenSessionRequest + { + ClientSessionName = "control-cmd-e2e", + ClientCorrelationId = "control-open-correlation", + CommandTimeout = Duration.FromTimeSpan(TestTimeout), + }, + new TestServerCallContext()); + + Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code); + string sessionId = openReply.SessionId; + + // Ping — the scripted worker echoes back the message. + Task pingTask = fixture.Service.Invoke( + new MxCommandRequest + { + SessionId = sessionId, + ClientCorrelationId = "ping-correlation", + Command = new MxCommand + { + Kind = MxCommandKind.Ping, + Ping = new PingCommand { Message = "e2e-ping" }, + }, + }, + new TestServerCallContext()); + await launcher.WaitForNextControlCommandAsync(TestTimeout); + MxCommandReply pingReply = await pingTask.WaitAsync(TestTimeout); + + Assert.Equal(ProtocolStatusCode.Ok, pingReply.ProtocolStatus.Code); + Assert.Equal(MxCommandKind.Ping, pingReply.Kind); + Assert.Equal("e2e-ping", pingReply.DiagnosticMessage); + + // GetWorkerInfo — the scripted worker returns canned info. + Task infoTask = fixture.Service.Invoke( + new MxCommandRequest + { + SessionId = sessionId, + ClientCorrelationId = "info-correlation", + Command = new MxCommand + { + Kind = MxCommandKind.GetWorkerInfo, + GetWorkerInfo = new GetWorkerInfoCommand(), + }, + }, + new TestServerCallContext()); + await launcher.WaitForNextControlCommandAsync(TestTimeout); + MxCommandReply infoReply = await infoTask.WaitAsync(TestTimeout); + + Assert.Equal(ProtocolStatusCode.Ok, infoReply.ProtocolStatus.Code); + Assert.Equal(MxCommandKind.GetWorkerInfo, infoReply.Kind); + Assert.NotNull(infoReply.WorkerInfo); + Assert.Equal(ControlCommandFakeWorkerProcessLauncher.ProcessId, infoReply.WorkerInfo.WorkerProcessId); + Assert.False(string.IsNullOrEmpty(infoReply.WorkerInfo.MxaccessProgid)); + + // DrainEvents — the scripted worker returns an empty drain reply. + Task drainTask = fixture.Service.Invoke( + new MxCommandRequest + { + SessionId = sessionId, + ClientCorrelationId = "drain-correlation", + Command = new MxCommand + { + Kind = MxCommandKind.DrainEvents, + DrainEvents = new DrainEventsCommand { MaxEvents = 16 }, + }, + }, + new TestServerCallContext()); + await launcher.WaitForNextControlCommandAsync(TestTimeout); + MxCommandReply drainReply = await drainTask.WaitAsync(TestTimeout); + + Assert.Equal(ProtocolStatusCode.Ok, drainReply.ProtocolStatus.Code); + Assert.Equal(MxCommandKind.DrainEvents, drainReply.Kind); + Assert.NotNull(drainReply.DrainEvents); + Assert.Empty(drainReply.DrainEvents.Events); + + // Tear down cleanly. + await fixture.Service.CloseSession( + new CloseSessionRequest + { + SessionId = sessionId, + ClientCorrelationId = "control-close-correlation", + }, + new TestServerCallContext()); + await launcher.WorkerTask.WaitAsync(TestTimeout); + } + private static MxCommandRequest CreateRegisterRequest(string sessionId) { return new MxCommandRequest @@ -355,6 +452,155 @@ public sealed class GatewayEndToEndFakeWorkerSmokeTests } } + /// + /// A fake worker launcher whose scripted worker automatically responds to control + /// commands (Ping, GetWorkerInfo, DrainEvents) using + /// and sends a shutdown ack when the gateway closes the session. Exposes + /// so the test can drive the interaction + /// one command at a time without races. + /// + private sealed class ControlCommandFakeWorkerProcessLauncher : IWorkerProcessLauncher + { + public const int ProcessId = 5590; + + private readonly FakeWorkerProcess _process = new(ProcessId); + private readonly SemaphoreSlim _commandHandled = new(0); + + /// Gets the task backing the scripted worker loop. + public Task WorkerTask { get; private set; } = Task.CompletedTask; + + /// + public Task LaunchAsync( + WorkerProcessLaunchRequest request, + CancellationToken cancellationToken = default) + { + WorkerTask = RunWorkerAsync(request, cancellationToken); + + return Task.FromResult(new WorkerProcessHandle( + _process, + new WorkerProcessCommandLine("fake-control-worker.exe", []), + DateTimeOffset.UtcNow)); + } + + /// Waits until the scripted worker has responded to one control command. + /// Maximum time to wait. + public async Task WaitForNextControlCommandAsync(TimeSpan timeout) + { + using CancellationTokenSource cts = new(timeout); + await _commandHandled.WaitAsync(cts.Token).ConfigureAwait(false); + } + + private async Task RunWorkerAsync( + WorkerProcessLaunchRequest request, + CancellationToken cancellationToken) + { + await using FakeWorkerHarness harness = await FakeWorkerHarness.ConnectToGatewayPipeAsync( + request.SessionId, + request.Nonce, + request.PipeName, + request.ProtocolVersion, + cancellationToken: cancellationToken).ConfigureAwait(false); + await harness.CompleteStartupAsync(ProcessId, cancellationToken: cancellationToken).ConfigureAwait(false); + + while (!cancellationToken.IsCancellationRequested) + { + WorkerEnvelope envelope = await harness + .ReadGatewayEnvelopeAsync(cancellationToken) + .ConfigureAwait(false); + + if (envelope.BodyCase == WorkerEnvelope.BodyOneofCase.WorkerShutdown) + { + await harness.SendShutdownAckAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + _process.MarkExited(0); + return; + } + + if (envelope.BodyCase == WorkerEnvelope.BodyOneofCase.WorkerCommand) + { + MxCommandKind kind = envelope.WorkerCommand?.Command?.Kind ?? MxCommandKind.Unspecified; + if (kind is MxCommandKind.Ping or MxCommandKind.GetSessionState + or MxCommandKind.GetWorkerInfo or MxCommandKind.DrainEvents + or MxCommandKind.ShutdownWorker) + { + // Re-enter the harness to process the already-read envelope + // by replaying it through the control-command responder path. + await RespondToKnownControlCommandAsync(harness, envelope, cancellationToken) + .ConfigureAwait(false); + _commandHandled.Release(); + continue; + } + } + + throw new InvalidOperationException( + $"ControlCommandFakeWorkerProcessLauncher received unexpected envelope {envelope.BodyCase}."); + } + } + + private static async Task RespondToKnownControlCommandAsync( + FakeWorkerHarness harness, + WorkerEnvelope commandEnvelope, + CancellationToken cancellationToken) + { + MxCommand command = commandEnvelope.WorkerCommand.Command; + switch (command.Kind) + { + case MxCommandKind.Ping: + await harness.ReplyToCommandAsync( + commandEnvelope, + configureReply: reply => + { + string? message = command.Ping?.Message; + if (!string.IsNullOrEmpty(message)) + { + reply.DiagnosticMessage = message; + } + }, + cancellationToken: cancellationToken).ConfigureAwait(false); + break; + + case MxCommandKind.GetSessionState: + await harness.ReplyToCommandAsync( + commandEnvelope, + configureReply: reply => reply.SessionState = new SessionStateReply + { + State = SessionState.Ready, + }, + cancellationToken: cancellationToken).ConfigureAwait(false); + break; + + case MxCommandKind.GetWorkerInfo: + await harness.ReplyToCommandAsync( + commandEnvelope, + configureReply: reply => reply.WorkerInfo = new WorkerInfoReply + { + WorkerProcessId = ControlCommandFakeWorkerProcessLauncher.ProcessId, + WorkerVersion = "fake-control-worker", + MxaccessProgid = "LMXProxy.LMXProxyServer.1", + MxaccessClsid = "{C30B52F5-2CB5-4760-AF0A-3A344A7EB5DC}", + }, + cancellationToken: cancellationToken).ConfigureAwait(false); + break; + + case MxCommandKind.DrainEvents: + await harness.ReplyToCommandAsync( + commandEnvelope, + configureReply: reply => reply.DrainEvents = new DrainEventsReply(), + cancellationToken: cancellationToken).ConfigureAwait(false); + break; + + case MxCommandKind.ShutdownWorker: + await harness.ReplyToCommandAsync(commandEnvelope, cancellationToken: cancellationToken) + .ConfigureAwait(false); + await harness.SendShutdownAckAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + break; + + default: + throw new InvalidOperationException( + $"Unexpected control command kind {command.Kind} in ControlCommandFakeWorkerProcessLauncher."); + } + } + } + private sealed class FakeWorkerProcess(int processId) : IWorkerProcess { private readonly TaskCompletionSource _exited = new(TaskCreationOptions.RunContinuationsAsynchronously); diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Workers/FakeWorkerHarnessTests.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Workers/FakeWorkerHarnessTests.cs index a4edd57..f5a3902 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Workers/FakeWorkerHarnessTests.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Workers/FakeWorkerHarnessTests.cs @@ -192,6 +192,139 @@ public sealed class FakeWorkerHarnessTests Assert.Equal(WorkerClientState.Closed, client.State); } + /// + /// Verifies that RespondToControlCommandAsync echoes the Ping message back + /// in the DiagnosticMessage field, matching the real worker's ping reply shape. + /// + [Fact] + public async Task RespondToControlCommandAsync_Ping_EchoesMessageInDiagnostic() + { + await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync(); + await using WorkerClient client = fakeWorker.CreateClient(); + await StartClientAsync(fakeWorker, client); + + Task invokeTask = client.InvokeAsync( + CreateCommand(MxCommandKind.Ping, cmd => cmd.Ping = new PingCommand { Message = "hello-ping" }), + TestTimeout, + CancellationToken.None); + await fakeWorker.RespondToControlCommandAsync(); + + WorkerCommandReply reply = await invokeTask.WaitAsync(TestTimeout); + + Assert.Equal(MxCommandKind.Ping, reply.Reply.Kind); + Assert.Equal(ProtocolStatusCode.Ok, reply.Reply.ProtocolStatus.Code); + Assert.Equal("hello-ping", reply.Reply.DiagnosticMessage); + } + + /// + /// Verifies that RespondToControlCommandAsync returns a SessionStateReply + /// with state Ready for a GetSessionState command. + /// + [Fact] + public async Task RespondToControlCommandAsync_GetSessionState_ReturnsReadyState() + { + await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync(); + await using WorkerClient client = fakeWorker.CreateClient(); + await StartClientAsync(fakeWorker, client); + + Task invokeTask = client.InvokeAsync( + CreateCommand(MxCommandKind.GetSessionState), + TestTimeout, + CancellationToken.None); + await fakeWorker.RespondToControlCommandAsync(); + + WorkerCommandReply reply = await invokeTask.WaitAsync(TestTimeout); + + Assert.Equal(MxCommandKind.GetSessionState, reply.Reply.Kind); + Assert.Equal(ProtocolStatusCode.Ok, reply.Reply.ProtocolStatus.Code); + Assert.NotNull(reply.Reply.SessionState); + Assert.Equal(SessionState.Ready, reply.Reply.SessionState.State); + } + + /// + /// Verifies that RespondToControlCommandAsync returns a WorkerInfoReply + /// with the fake worker's process ID, version, and MXAccess identifiers. + /// + [Fact] + public async Task RespondToControlCommandAsync_GetWorkerInfo_ReturnsFakeWorkerInfo() + { + await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync(); + await using WorkerClient client = fakeWorker.CreateClient(); + await StartClientAsync(fakeWorker, client); + + Task invokeTask = client.InvokeAsync( + CreateCommand(MxCommandKind.GetWorkerInfo), + TestTimeout, + CancellationToken.None); + await fakeWorker.RespondToControlCommandAsync(); + + WorkerCommandReply reply = await invokeTask.WaitAsync(TestTimeout); + + Assert.Equal(MxCommandKind.GetWorkerInfo, reply.Reply.Kind); + Assert.Equal(ProtocolStatusCode.Ok, reply.Reply.ProtocolStatus.Code); + Assert.NotNull(reply.Reply.WorkerInfo); + Assert.Equal(FakeWorkerHarness.DefaultWorkerProcessId, reply.Reply.WorkerInfo.WorkerProcessId); + Assert.Equal("LMXProxy.LMXProxyServer.1", reply.Reply.WorkerInfo.MxaccessProgid); + Assert.False(string.IsNullOrEmpty(reply.Reply.WorkerInfo.MxaccessClsid)); + } + + /// + /// Verifies that RespondToControlCommandAsync returns an empty DrainEventsReply + /// for a DrainEvents command (the fake harness has no queued events). + /// + [Fact] + public async Task RespondToControlCommandAsync_DrainEvents_ReturnsEmptyReply() + { + await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync(); + await using WorkerClient client = fakeWorker.CreateClient(); + await StartClientAsync(fakeWorker, client); + + Task invokeTask = client.InvokeAsync( + CreateCommand(MxCommandKind.DrainEvents, cmd => cmd.DrainEvents = new DrainEventsCommand { MaxEvents = 32 }), + TestTimeout, + CancellationToken.None); + await fakeWorker.RespondToControlCommandAsync(); + + WorkerCommandReply reply = await invokeTask.WaitAsync(TestTimeout); + + Assert.Equal(MxCommandKind.DrainEvents, reply.Reply.Kind); + Assert.Equal(ProtocolStatusCode.Ok, reply.Reply.ProtocolStatus.Code); + Assert.NotNull(reply.Reply.DrainEvents); + Assert.Empty(reply.Reply.DrainEvents.Events); + } + + /// + /// Verifies that RespondToControlCommandAsync for ShutdownWorker sends an OK + /// reply followed by a WorkerShutdownAck, which closes the client. + /// + [Fact] + public async Task RespondToControlCommandAsync_ShutdownWorker_SendsReplyThenAck() + { + await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync(); + await using WorkerClient client = fakeWorker.CreateClient(); + await StartClientAsync(fakeWorker, client); + + // ShutdownAsync triggers a WorkerShutdown envelope (not WorkerCommand), + // so we directly invoke ShutdownWorker as a control command via InvokeAsync. + Task invokeTask = client.InvokeAsync( + CreateCommand(MxCommandKind.ShutdownWorker, cmd => cmd.ShutdownWorker = new ShutdownWorkerCommand()), + TestTimeout, + CancellationToken.None); + + // The harness reads the ShutdownWorker WorkerCommand and replies with + // OK + ShutdownAck — the WorkerClient's read loop processes the ack and + // transitions to Closed. + await fakeWorker.RespondToControlCommandAsync(); + + WorkerCommandReply reply = await invokeTask.WaitAsync(TestTimeout); + + Assert.Equal(MxCommandKind.ShutdownWorker, reply.Reply.Kind); + Assert.Equal(ProtocolStatusCode.Ok, reply.Reply.ProtocolStatus.Code); + + await WaitUntilAsync(() => client.State == WorkerClientState.Closed, TestTimeout); + Assert.Equal(WorkerClientState.Closed, client.State); + } + private static async Task StartClientAsync( FakeWorkerHarness fakeWorker, WorkerClient client) @@ -201,15 +334,13 @@ public sealed class FakeWorkerHarnessTests await startTask.WaitAsync(TestTimeout).ConfigureAwait(false); } - private static WorkerCommand CreateCommand(MxCommandKind kind) + private static WorkerCommand CreateCommand( + MxCommandKind kind, + Action? configure = null) { - return new WorkerCommand - { - Command = new MxCommand - { - Kind = kind, - }, - }; + MxCommand command = new() { Kind = kind }; + configure?.Invoke(command); + return new WorkerCommand { Command = command }; } private static async Task WaitUntilAsync( diff --git a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Workers/Fakes/FakeWorkerHarness.cs b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Workers/Fakes/FakeWorkerHarness.cs index 9d03b6d..2277497 100644 --- a/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Workers/Fakes/FakeWorkerHarness.cs +++ b/src/ZB.MOM.WW.MxGateway.Tests/Gateway/Workers/Fakes/FakeWorkerHarness.cs @@ -391,6 +391,87 @@ public sealed class FakeWorkerHarness : IAsyncDisposable cancellationToken).ConfigureAwait(false); } + /// + /// Reads one incoming command envelope and, if it is one of the five + /// control command kinds (Ping, GetSessionState, GetWorkerInfo, DrainEvents, + /// ShutdownWorker), writes a canned reply that mirrors the real worker's + /// reply shape. For ShutdownWorker the method additionally sends a + /// after the OK reply, matching the real + /// worker's shutdown flow. + /// + /// Token to cancel the asynchronous operation. + /// The command envelope that was handled. + /// + /// Thrown when the next envelope is not a WorkerCommand or contains a + /// non-control command kind. + /// + public async Task RespondToControlCommandAsync( + CancellationToken cancellationToken = default) + { + WorkerEnvelope commandEnvelope = await ReadCommandAsync(cancellationToken).ConfigureAwait(false); + MxCommand command = commandEnvelope.WorkerCommand.Command; + + switch (command.Kind) + { + case MxCommandKind.Ping: + await ReplyToCommandAsync( + commandEnvelope, + configureReply: reply => + { + string? message = command.Ping?.Message; + if (!string.IsNullOrEmpty(message)) + { + reply.DiagnosticMessage = message; + } + }, + cancellationToken: cancellationToken).ConfigureAwait(false); + break; + + case MxCommandKind.GetSessionState: + await ReplyToCommandAsync( + commandEnvelope, + configureReply: reply => reply.SessionState = new SessionStateReply + { + State = SessionState.Ready, + }, + cancellationToken: cancellationToken).ConfigureAwait(false); + break; + + case MxCommandKind.GetWorkerInfo: + await ReplyToCommandAsync( + commandEnvelope, + configureReply: reply => reply.WorkerInfo = new WorkerInfoReply + { + WorkerProcessId = DefaultWorkerProcessId, + WorkerVersion = "fake-worker", + MxaccessProgid = "LMXProxy.LMXProxyServer.1", + MxaccessClsid = "{C30B52F5-2CB5-4760-AF0A-3A344A7EB5DC}", + }, + cancellationToken: cancellationToken).ConfigureAwait(false); + break; + + case MxCommandKind.DrainEvents: + await ReplyToCommandAsync( + commandEnvelope, + configureReply: reply => reply.DrainEvents = new DrainEventsReply(), + cancellationToken: cancellationToken).ConfigureAwait(false); + break; + + case MxCommandKind.ShutdownWorker: + await ReplyToCommandAsync( + commandEnvelope, + cancellationToken: cancellationToken).ConfigureAwait(false); + await SendShutdownAckAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + break; + + default: + throw new InvalidOperationException( + $"RespondToControlCommandAsync only handles control command kinds; received {command.Kind}."); + } + + return commandEnvelope; + } + /// Writes a malformed payload directly to the worker stream. /// Malformed payload bytes to write. /// Token to cancel the asynchronous operation.