test(gateway): fake worker responds to control commands (A6)

Add RespondToControlCommandAsync to FakeWorkerHarness so scripted fake
workers can auto-reply to the five control command kinds (Ping,
GetSessionState, GetWorkerInfo, DrainEvents, ShutdownWorker) with canned
replies whose shapes match the real WorkerPipeSession helpers.

Add five unit tests in FakeWorkerHarnessTests covering each control
command kind through the WorkerClient→pipe roundtrip, and one gateway
E2E test (GatewayService_WithFakeWorker_ControlCommandsRoundtripThroughGateway)
that exercises Ping, GetWorkerInfo, and DrainEvents through the full
gRPC→SessionManager→WorkerClient→named-pipe path using a scripted
ControlCommandFakeWorkerProcessLauncher.
This commit is contained in:
Joseph Doherty
2026-06-15 10:56:56 -04:00
parent dde9934e60
commit bb5139fec2
3 changed files with 466 additions and 8 deletions
@@ -94,6 +94,103 @@ public sealed class GatewayEndToEndFakeWorkerSmokeTests
launcher.CommandKinds);
}
/// <summary>
/// Verifies that the gateway forwards control commands (Ping, GetWorkerInfo, DrainEvents)
/// through the full gRPC→WorkerClient→pipe roundtrip when the fake worker responds
/// with canned replies via RespondToControlCommandAsync.
/// </summary>
[Fact]
public async Task GatewayService_WithFakeWorker_ControlCommandsRoundtripThroughGateway()
{
ControlCommandFakeWorkerProcessLauncher launcher = new();
await using GatewayServiceFixture fixture = new(launcher);
OpenSessionReply openReply = await fixture.Service.OpenSession(
new OpenSessionRequest
{
ClientSessionName = "control-cmd-e2e",
ClientCorrelationId = "control-open-correlation",
CommandTimeout = Duration.FromTimeSpan(TestTimeout),
},
new TestServerCallContext());
Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code);
string sessionId = openReply.SessionId;
// Ping — the scripted worker echoes back the message.
Task<MxCommandReply> pingTask = fixture.Service.Invoke(
new MxCommandRequest
{
SessionId = sessionId,
ClientCorrelationId = "ping-correlation",
Command = new MxCommand
{
Kind = MxCommandKind.Ping,
Ping = new PingCommand { Message = "e2e-ping" },
},
},
new TestServerCallContext());
await launcher.WaitForNextControlCommandAsync(TestTimeout);
MxCommandReply pingReply = await pingTask.WaitAsync(TestTimeout);
Assert.Equal(ProtocolStatusCode.Ok, pingReply.ProtocolStatus.Code);
Assert.Equal(MxCommandKind.Ping, pingReply.Kind);
Assert.Equal("e2e-ping", pingReply.DiagnosticMessage);
// GetWorkerInfo — the scripted worker returns canned info.
Task<MxCommandReply> infoTask = fixture.Service.Invoke(
new MxCommandRequest
{
SessionId = sessionId,
ClientCorrelationId = "info-correlation",
Command = new MxCommand
{
Kind = MxCommandKind.GetWorkerInfo,
GetWorkerInfo = new GetWorkerInfoCommand(),
},
},
new TestServerCallContext());
await launcher.WaitForNextControlCommandAsync(TestTimeout);
MxCommandReply infoReply = await infoTask.WaitAsync(TestTimeout);
Assert.Equal(ProtocolStatusCode.Ok, infoReply.ProtocolStatus.Code);
Assert.Equal(MxCommandKind.GetWorkerInfo, infoReply.Kind);
Assert.NotNull(infoReply.WorkerInfo);
Assert.Equal(ControlCommandFakeWorkerProcessLauncher.ProcessId, infoReply.WorkerInfo.WorkerProcessId);
Assert.False(string.IsNullOrEmpty(infoReply.WorkerInfo.MxaccessProgid));
// DrainEvents — the scripted worker returns an empty drain reply.
Task<MxCommandReply> drainTask = fixture.Service.Invoke(
new MxCommandRequest
{
SessionId = sessionId,
ClientCorrelationId = "drain-correlation",
Command = new MxCommand
{
Kind = MxCommandKind.DrainEvents,
DrainEvents = new DrainEventsCommand { MaxEvents = 16 },
},
},
new TestServerCallContext());
await launcher.WaitForNextControlCommandAsync(TestTimeout);
MxCommandReply drainReply = await drainTask.WaitAsync(TestTimeout);
Assert.Equal(ProtocolStatusCode.Ok, drainReply.ProtocolStatus.Code);
Assert.Equal(MxCommandKind.DrainEvents, drainReply.Kind);
Assert.NotNull(drainReply.DrainEvents);
Assert.Empty(drainReply.DrainEvents.Events);
// Tear down cleanly.
await fixture.Service.CloseSession(
new CloseSessionRequest
{
SessionId = sessionId,
ClientCorrelationId = "control-close-correlation",
},
new TestServerCallContext());
await launcher.WorkerTask.WaitAsync(TestTimeout);
}
private static MxCommandRequest CreateRegisterRequest(string sessionId)
{
return new MxCommandRequest
@@ -355,6 +452,155 @@ public sealed class GatewayEndToEndFakeWorkerSmokeTests
}
}
/// <summary>
/// A fake worker launcher whose scripted worker automatically responds to control
/// commands (Ping, GetWorkerInfo, DrainEvents) using <see cref="FakeWorkerHarness.RespondToControlCommandAsync"/>
/// and sends a shutdown ack when the gateway closes the session. Exposes
/// <see cref="WaitForNextControlCommandAsync"/> so the test can drive the interaction
/// one command at a time without races.
/// </summary>
private sealed class ControlCommandFakeWorkerProcessLauncher : IWorkerProcessLauncher
{
public const int ProcessId = 5590;
private readonly FakeWorkerProcess _process = new(ProcessId);
private readonly SemaphoreSlim _commandHandled = new(0);
/// <summary>Gets the task backing the scripted worker loop.</summary>
public Task WorkerTask { get; private set; } = Task.CompletedTask;
/// <inheritdoc />
public Task<WorkerProcessHandle> LaunchAsync(
WorkerProcessLaunchRequest request,
CancellationToken cancellationToken = default)
{
WorkerTask = RunWorkerAsync(request, cancellationToken);
return Task.FromResult(new WorkerProcessHandle(
_process,
new WorkerProcessCommandLine("fake-control-worker.exe", []),
DateTimeOffset.UtcNow));
}
/// <summary>Waits until the scripted worker has responded to one control command.</summary>
/// <param name="timeout">Maximum time to wait.</param>
public async Task WaitForNextControlCommandAsync(TimeSpan timeout)
{
using CancellationTokenSource cts = new(timeout);
await _commandHandled.WaitAsync(cts.Token).ConfigureAwait(false);
}
private async Task RunWorkerAsync(
WorkerProcessLaunchRequest request,
CancellationToken cancellationToken)
{
await using FakeWorkerHarness harness = await FakeWorkerHarness.ConnectToGatewayPipeAsync(
request.SessionId,
request.Nonce,
request.PipeName,
request.ProtocolVersion,
cancellationToken: cancellationToken).ConfigureAwait(false);
await harness.CompleteStartupAsync(ProcessId, cancellationToken: cancellationToken).ConfigureAwait(false);
while (!cancellationToken.IsCancellationRequested)
{
WorkerEnvelope envelope = await harness
.ReadGatewayEnvelopeAsync(cancellationToken)
.ConfigureAwait(false);
if (envelope.BodyCase == WorkerEnvelope.BodyOneofCase.WorkerShutdown)
{
await harness.SendShutdownAckAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
_process.MarkExited(0);
return;
}
if (envelope.BodyCase == WorkerEnvelope.BodyOneofCase.WorkerCommand)
{
MxCommandKind kind = envelope.WorkerCommand?.Command?.Kind ?? MxCommandKind.Unspecified;
if (kind is MxCommandKind.Ping or MxCommandKind.GetSessionState
or MxCommandKind.GetWorkerInfo or MxCommandKind.DrainEvents
or MxCommandKind.ShutdownWorker)
{
// Re-enter the harness to process the already-read envelope
// by replaying it through the control-command responder path.
await RespondToKnownControlCommandAsync(harness, envelope, cancellationToken)
.ConfigureAwait(false);
_commandHandled.Release();
continue;
}
}
throw new InvalidOperationException(
$"ControlCommandFakeWorkerProcessLauncher received unexpected envelope {envelope.BodyCase}.");
}
}
private static async Task RespondToKnownControlCommandAsync(
FakeWorkerHarness harness,
WorkerEnvelope commandEnvelope,
CancellationToken cancellationToken)
{
MxCommand command = commandEnvelope.WorkerCommand.Command;
switch (command.Kind)
{
case MxCommandKind.Ping:
await harness.ReplyToCommandAsync(
commandEnvelope,
configureReply: reply =>
{
string? message = command.Ping?.Message;
if (!string.IsNullOrEmpty(message))
{
reply.DiagnosticMessage = message;
}
},
cancellationToken: cancellationToken).ConfigureAwait(false);
break;
case MxCommandKind.GetSessionState:
await harness.ReplyToCommandAsync(
commandEnvelope,
configureReply: reply => reply.SessionState = new SessionStateReply
{
State = SessionState.Ready,
},
cancellationToken: cancellationToken).ConfigureAwait(false);
break;
case MxCommandKind.GetWorkerInfo:
await harness.ReplyToCommandAsync(
commandEnvelope,
configureReply: reply => reply.WorkerInfo = new WorkerInfoReply
{
WorkerProcessId = ControlCommandFakeWorkerProcessLauncher.ProcessId,
WorkerVersion = "fake-control-worker",
MxaccessProgid = "LMXProxy.LMXProxyServer.1",
MxaccessClsid = "{C30B52F5-2CB5-4760-AF0A-3A344A7EB5DC}",
},
cancellationToken: cancellationToken).ConfigureAwait(false);
break;
case MxCommandKind.DrainEvents:
await harness.ReplyToCommandAsync(
commandEnvelope,
configureReply: reply => reply.DrainEvents = new DrainEventsReply(),
cancellationToken: cancellationToken).ConfigureAwait(false);
break;
case MxCommandKind.ShutdownWorker:
await harness.ReplyToCommandAsync(commandEnvelope, cancellationToken: cancellationToken)
.ConfigureAwait(false);
await harness.SendShutdownAckAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
break;
default:
throw new InvalidOperationException(
$"Unexpected control command kind {command.Kind} in ControlCommandFakeWorkerProcessLauncher.");
}
}
}
private sealed class FakeWorkerProcess(int processId) : IWorkerProcess
{
private readonly TaskCompletionSource _exited = new(TaskCreationOptions.RunContinuationsAsynchronously);