diff --git a/docs/GatewayTesting.md b/docs/GatewayTesting.md new file mode 100644 index 0000000..1e26fc7 --- /dev/null +++ b/docs/GatewayTesting.md @@ -0,0 +1,51 @@ +# Gateway Testing + +Gateway tests run without installed MXAccess by using fake workers, fake +transports, and in-process gRPC service fakes. Live MXAccess verification belongs +in opt-in integration tests because it depends on installed COM components and +provider state. + +## Fake Worker Harness + +`FakeWorkerHarness` in `src/MxGateway.Tests/Gateway/Workers/Fakes/` provides an +in-process worker side for named-pipe IPC tests. It uses the same +`WorkerFrameReader`, `WorkerFrameWriter`, and `WorkerEnvelope` contract as the +gateway so tests exercise real frame validation and worker-client state changes. + +Use the harness when a gateway or session test needs worker behavior without +starting `MxGateway.Worker.exe` or loading MXAccess COM. The harness scripts: + +- `WorkerHello` and `WorkerReady` startup, +- command replies with matching correlation ids, +- ordered `WorkerEvent` frames, +- `WorkerFault` frames, +- shutdown acknowledgements, +- malformed protobuf payloads and oversized frame headers, +- slow or hung workers by withholding a reply. + +Session-level tests can connect the harness to the pipe created by +`SessionWorkerClientFactory` with `ConnectToGatewayPipeAsync`. Lower-level +`WorkerClient` tests can use `CreateConnectedPairAsync` to create both pipe ends +inside the test. + +## Focused Commands + +Run the fake worker tests after changing gateway worker IPC, session startup, or +event streaming behavior: + +```bash +dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter FullyQualifiedName~FakeWorkerHarnessTests +dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter FullyQualifiedName~SessionWorkerClientFactoryFakeWorkerTests +``` + +Run the gateway test project after shared gateway test infrastructure changes: + +```bash +dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj +``` + +## Related Documentation + +- [Gateway Process Design](./gateway-process-design.md) +- [Worker Frame Protocol](./WorkerFrameProtocol.md) +- [MXAccess Worker Instance Detailed Design](./mxaccess-worker-instance-design.md) diff --git a/docs/gateway-process-design.md b/docs/gateway-process-design.md index 8c0036f..8bcaef8 100644 --- a/docs/gateway-process-design.md +++ b/docs/gateway-process-design.md @@ -891,6 +891,11 @@ behavior unless an explicit non-parity backend is designed. Gateway tests should be able to run without installed MXAccess by using fake workers and fake transports. +Use `FakeWorkerHarness` for tests that need real gateway-to-worker framing, +handshake, command, event, fault, or malformed-protocol behavior without loading +MXAccess COM. See [Gateway Testing](./GatewayTesting.md) for the harness scope +and focused test commands. + Focused tests: - session state transitions, diff --git a/src/MxGateway.Tests/Gateway/Sessions/SessionWorkerClientFactoryFakeWorkerTests.cs b/src/MxGateway.Tests/Gateway/Sessions/SessionWorkerClientFactoryFakeWorkerTests.cs new file mode 100644 index 0000000..625dd38 --- /dev/null +++ b/src/MxGateway.Tests/Gateway/Sessions/SessionWorkerClientFactoryFakeWorkerTests.cs @@ -0,0 +1,216 @@ +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using MxGateway.Contracts; +using MxGateway.Contracts.Proto; +using MxGateway.Server.Configuration; +using MxGateway.Server.Metrics; +using MxGateway.Server.Sessions; +using MxGateway.Server.Workers; +using MxGateway.Tests.Gateway.Workers.Fakes; + +namespace MxGateway.Tests.Gateway.Sessions; + +public sealed class SessionWorkerClientFactoryFakeWorkerTests +{ + private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(5); + + [Fact] + public async Task CreateAsync_WithScriptedFakeWorker_ReturnsReadyClient() + { + ScriptedFakeWorkerProcessLauncher launcher = new(); + using GatewayMetrics metrics = new(); + SessionWorkerClientFactory factory = new( + launcher, + Options.Create(CreateOptions()), + metrics, + NullLoggerFactory.Instance); + GatewaySession session = CreateSession(); + + await using IWorkerClient workerClient = await factory.CreateAsync( + session, + CancellationToken.None); + + Assert.Equal(WorkerClientState.Ready, workerClient.State); + Assert.Equal(ScriptedFakeWorkerProcessLauncher.ProcessId, workerClient.ProcessId); + Assert.NotNull(launcher.Harness); + + Task invokeTask = workerClient.InvokeAsync( + CreateCommand(MxCommandKind.Ping), + TestTimeout, + CancellationToken.None); + WorkerEnvelope commandEnvelope = await launcher.Harness.ReadCommandAsync(); + await launcher.Harness.ReplyToCommandAsync(commandEnvelope); + WorkerCommandReply reply = await invokeTask.WaitAsync(TestTimeout); + + Assert.Equal(MxCommandKind.Ping, reply.Reply.Kind); + Assert.Equal(ProtocolStatusCode.Ok, reply.Reply.ProtocolStatus.Code); + } + + [Fact] + public async Task CreateAsync_WhenFakeWorkerStartupFails_ThrowsWorkerClientException() + { + FailingStartupWorkerProcessLauncher launcher = new(); + using GatewayMetrics metrics = new(); + SessionWorkerClientFactory factory = new( + launcher, + Options.Create(CreateOptions()), + metrics, + NullLoggerFactory.Instance); + GatewaySession session = CreateSession(); + + WorkerClientException exception = await Assert.ThrowsAsync( + async () => await factory.CreateAsync(session, CancellationToken.None).WaitAsync(TestTimeout)); + + Assert.Equal(WorkerClientErrorCode.ProtocolViolation, exception.ErrorCode); + Assert.True(launcher.Process.IsDisposed); + } + + private static GatewayOptions CreateOptions() + { + return new GatewayOptions + { + Worker = new WorkerOptions + { + StartupTimeoutSeconds = 5, + ShutdownTimeoutSeconds = 5, + HeartbeatIntervalSeconds = 30, + HeartbeatGraceSeconds = 30, + MaxMessageBytes = WorkerFrameProtocolOptions.DefaultMaxMessageBytes, + }, + Events = new EventOptions + { + QueueCapacity = 16, + }, + }; + } + + private static GatewaySession CreateSession() + { + return new GatewaySession( + FakeWorkerHarness.DefaultSessionId, + GatewayContractInfo.DefaultBackendName, + $"mxaccessgw-session-fake-worker-{Guid.NewGuid():N}", + FakeWorkerHarness.DefaultNonce, + "test-client", + "fake-worker-session-test", + "client-correlation-1", + TestTimeout, + TestTimeout, + TestTimeout, + DateTimeOffset.UtcNow); + } + + private static WorkerCommand CreateCommand(MxCommandKind kind) + { + return new WorkerCommand + { + Command = new MxCommand + { + Kind = kind, + }, + }; + } + + private sealed class ScriptedFakeWorkerProcessLauncher : IWorkerProcessLauncher + { + public const int ProcessId = 2468; + private readonly FakeWorkerProcess _process = new(ProcessId); + + public FakeWorkerHarness? Harness { get; private set; } + + public Task LaunchAsync( + WorkerProcessLaunchRequest request, + CancellationToken cancellationToken = default) + { + _ = RunWorkerAsync(request, cancellationToken); + + return Task.FromResult(CreateHandle(_process)); + } + + private async Task RunWorkerAsync( + WorkerProcessLaunchRequest request, + CancellationToken cancellationToken) + { + Harness = await FakeWorkerHarness.ConnectToGatewayPipeAsync( + request.SessionId, + request.Nonce, + request.PipeName, + request.ProtocolVersion, + cancellationToken: cancellationToken).ConfigureAwait(false); + await Harness.CompleteStartupAsync(ProcessId, cancellationToken: cancellationToken).ConfigureAwait(false); + } + } + + private sealed class FailingStartupWorkerProcessLauncher : IWorkerProcessLauncher + { + public FakeWorkerProcess Process { get; } = new(processId: 3579); + + public Task LaunchAsync( + WorkerProcessLaunchRequest request, + CancellationToken cancellationToken = default) + { + _ = RunWorkerAsync(request, cancellationToken); + + return Task.FromResult(CreateHandle(Process)); + } + + private async Task RunWorkerAsync( + WorkerProcessLaunchRequest request, + CancellationToken cancellationToken) + { + await using FakeWorkerHarness harness = await FakeWorkerHarness.ConnectToGatewayPipeAsync( + request.SessionId, + request.Nonce, + request.PipeName, + request.ProtocolVersion, + cancellationToken: cancellationToken).ConfigureAwait(false); + _ = await harness.ReadGatewayEnvelopeAsync(cancellationToken).ConfigureAwait(false); + await harness.SendWorkerHelloAsync( + workerProcessId: Process.Id, + workerProtocolVersion: request.ProtocolVersion + 1, + cancellationToken: cancellationToken).ConfigureAwait(false); + } + } + + private static WorkerProcessHandle CreateHandle(IWorkerProcess process) + { + return new WorkerProcessHandle( + process, + new WorkerProcessCommandLine("fake-worker.exe", []), + DateTimeOffset.UtcNow); + } + + private sealed class FakeWorkerProcess(int processId) : IWorkerProcess + { + private bool _disposed; + + public int Id { get; } = processId; + + public bool HasExited { get; private set; } + + public int? ExitCode { get; private set; } + + public int KillCount { get; private set; } + + public ValueTask WaitForExitAsync(CancellationToken cancellationToken) + { + HasExited = true; + ExitCode = 0; + return ValueTask.CompletedTask; + } + + public void Kill(bool entireProcessTree) + { + KillCount++; + HasExited = true; + ExitCode = -1; + } + + public void Dispose() + { + _disposed = true; + } + + public bool IsDisposed => _disposed; + } +} diff --git a/src/MxGateway.Tests/Gateway/Workers/FakeWorkerHarnessTests.cs b/src/MxGateway.Tests/Gateway/Workers/FakeWorkerHarnessTests.cs new file mode 100644 index 0000000..b5daed4 --- /dev/null +++ b/src/MxGateway.Tests/Gateway/Workers/FakeWorkerHarnessTests.cs @@ -0,0 +1,190 @@ +using MxGateway.Contracts; +using MxGateway.Contracts.Proto; +using MxGateway.Server.Workers; +using MxGateway.Tests.Gateway.Workers.Fakes; + +namespace MxGateway.Tests.Gateway.Workers; + +public sealed class FakeWorkerHarnessTests +{ + private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(5); + + [Fact] + public async Task CompleteStartupAsync_WithHelloAndReady_TransitionsClientToReady() + { + await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync(); + await using WorkerClient client = fakeWorker.CreateClient(); + + Task startTask = client.StartAsync(CancellationToken.None); + WorkerEnvelope gatewayHello = await fakeWorker.CompleteStartupAsync(); + await startTask.WaitAsync(TestTimeout); + + Assert.Equal(WorkerEnvelope.BodyOneofCase.GatewayHello, gatewayHello.BodyCase); + Assert.Equal(FakeWorkerHarness.DefaultNonce, gatewayHello.GatewayHello.Nonce); + Assert.Equal(WorkerClientState.Ready, client.State); + Assert.Equal(FakeWorkerHarness.DefaultWorkerProcessId, client.ProcessId); + } + + [Fact] + public async Task StartAsync_WithProtocolMismatch_FailsStartup() + { + await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync(); + await using WorkerClient client = fakeWorker.CreateClient(); + + Task startTask = client.StartAsync(CancellationToken.None); + WorkerEnvelope gatewayHello = await fakeWorker.ReadGatewayEnvelopeAsync(); + Assert.Equal(WorkerEnvelope.BodyOneofCase.GatewayHello, gatewayHello.BodyCase); + await fakeWorker.SendWorkerHelloAsync( + workerProtocolVersion: GatewayContractInfo.WorkerProtocolVersion + 1); + + WorkerClientException exception = await Assert.ThrowsAsync( + async () => await startTask.WaitAsync(TestTimeout)); + + Assert.Equal(WorkerClientErrorCode.ProtocolViolation, exception.ErrorCode); + } + + [Fact] + public async Task InvokeAsync_WithScriptedReply_CompletesCommand() + { + await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync(); + await using WorkerClient client = fakeWorker.CreateClient(); + await StartClientAsync(fakeWorker, client); + + Task invokeTask = client.InvokeAsync( + CreateCommand(MxCommandKind.Ping), + TestTimeout, + CancellationToken.None); + WorkerEnvelope commandEnvelope = await fakeWorker.ReadCommandAsync(); + await fakeWorker.ReplyToCommandAsync(commandEnvelope); + + WorkerCommandReply reply = await invokeTask.WaitAsync(TestTimeout); + + Assert.Equal(commandEnvelope.CorrelationId, reply.Reply.CorrelationId); + Assert.Equal(MxCommandKind.Ping, reply.Reply.Kind); + Assert.Equal(ProtocolStatusCode.Ok, reply.Reply.ProtocolStatus.Code); + } + + [Fact] + public async Task ReadEventsAsync_WithScriptedEvents_YieldsOrderedEvents() + { + await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync(); + await using WorkerClient client = fakeWorker.CreateClient(); + await StartClientAsync(fakeWorker, client); + using CancellationTokenSource cancellationTokenSource = new(TestTimeout); + + await using IAsyncEnumerator events = + client.ReadEventsAsync(cancellationTokenSource.Token).GetAsyncEnumerator(cancellationTokenSource.Token); + + await fakeWorker.EmitEventAsync(MxEventFamily.OnDataChange, cancellationTokenSource.Token); + await fakeWorker.EmitEventAsync(MxEventFamily.OperationComplete, cancellationTokenSource.Token); + + Assert.True(await events.MoveNextAsync()); + Assert.Equal((ulong)3, events.Current.Event.WorkerSequence); + Assert.Equal(MxEventFamily.OnDataChange, events.Current.Event.Family); + + Assert.True(await events.MoveNextAsync()); + Assert.Equal((ulong)4, events.Current.Event.WorkerSequence); + Assert.Equal(MxEventFamily.OperationComplete, events.Current.Event.Family); + } + + [Fact] + public async Task ReadLoop_WithScriptedFault_FaultsClient() + { + await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync(); + await using WorkerClient client = fakeWorker.CreateClient(); + await StartClientAsync(fakeWorker, client); + + await fakeWorker.EmitFaultAsync( + WorkerFaultCategory.MxaccessCommandFailed, + "scripted MXAccess command fault"); + + await WaitUntilAsync( + () => client.State == WorkerClientState.Faulted, + TestTimeout); + + Assert.Equal(WorkerClientState.Faulted, client.State); + } + + [Fact] + public async Task InvokeAsync_WithHungWorker_TimesOutPendingCommand() + { + await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync(); + await using WorkerClient client = fakeWorker.CreateClient(); + await StartClientAsync(fakeWorker, client); + + Task invokeTask = client.InvokeAsync( + CreateCommand(MxCommandKind.Ping), + TimeSpan.FromMilliseconds(50), + CancellationToken.None); + WorkerEnvelope commandEnvelope = await fakeWorker.ReadCommandAsync(); + + WorkerClientException exception = await Assert.ThrowsAsync( + async () => await invokeTask.WaitAsync(TestTimeout)); + + Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerCommand, commandEnvelope.BodyCase); + Assert.Equal(WorkerClientErrorCode.CommandTimeout, exception.ErrorCode); + } + + [Fact] + public async Task ReadLoop_WithMalformedFrame_FaultsClient() + { + await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync(); + await using WorkerClient client = fakeWorker.CreateClient(); + await StartClientAsync(fakeWorker, client); + + await fakeWorker.WriteMalformedPayloadAsync(new byte[] { 0x08, 0x96, 0x01 }); + + await WaitUntilAsync( + () => client.State == WorkerClientState.Faulted, + TestTimeout); + + Assert.Equal(WorkerClientState.Faulted, client.State); + } + + [Fact] + public async Task ShutdownAsync_WithShutdownAck_ClosesClient() + { + await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync(); + await using WorkerClient client = fakeWorker.CreateClient(); + await StartClientAsync(fakeWorker, client); + + Task shutdownTask = client.ShutdownAsync(TestTimeout, CancellationToken.None); + WorkerEnvelope shutdownEnvelope = await fakeWorker.ReadShutdownAsync(); + await fakeWorker.SendShutdownAckAsync(); + await shutdownTask.WaitAsync(TestTimeout); + + Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerShutdown, shutdownEnvelope.BodyCase); + Assert.Equal(WorkerClientState.Closed, client.State); + } + + private static async Task StartClientAsync( + FakeWorkerHarness fakeWorker, + WorkerClient client) + { + Task startTask = client.StartAsync(CancellationToken.None); + await fakeWorker.CompleteStartupAsync().ConfigureAwait(false); + await startTask.WaitAsync(TestTimeout).ConfigureAwait(false); + } + + private static WorkerCommand CreateCommand(MxCommandKind kind) + { + return new WorkerCommand + { + Command = new MxCommand + { + Kind = kind, + }, + }; + } + + private static async Task WaitUntilAsync( + Func predicate, + TimeSpan timeout) + { + using CancellationTokenSource cancellationTokenSource = new(timeout); + while (!predicate()) + { + await Task.Delay(TimeSpan.FromMilliseconds(10), cancellationTokenSource.Token); + } + } +} diff --git a/src/MxGateway.Tests/Gateway/Workers/Fakes/FakeWorkerHarness.cs b/src/MxGateway.Tests/Gateway/Workers/Fakes/FakeWorkerHarness.cs new file mode 100644 index 0000000..dd501f1 --- /dev/null +++ b/src/MxGateway.Tests/Gateway/Workers/Fakes/FakeWorkerHarness.cs @@ -0,0 +1,378 @@ +using System.Buffers.Binary; +using System.IO.Pipes; +using Google.Protobuf.WellKnownTypes; +using MxGateway.Contracts; +using MxGateway.Contracts.Proto; +using MxGateway.Server.Metrics; +using MxGateway.Server.Workers; + +namespace MxGateway.Tests.Gateway.Workers.Fakes; + +public sealed class FakeWorkerHarness : IAsyncDisposable +{ + public const string DefaultSessionId = "session-fake-worker"; + public const string DefaultNonce = "nonce-fake-worker"; + public const int DefaultWorkerProcessId = 9321; + + private readonly NamedPipeServerStream? _gatewayStream; + private readonly NamedPipeClientStream _workerStream; + private readonly WorkerFrameProtocolOptions _frameOptions; + private readonly WorkerFrameReader _reader; + private readonly WorkerFrameWriter _writer; + private bool _workerSideDisposed; + + private FakeWorkerHarness( + string sessionId, + string nonce, + NamedPipeServerStream? gatewayStream, + NamedPipeClientStream workerStream, + WorkerFrameProtocolOptions frameOptions) + { + SessionId = sessionId; + Nonce = nonce; + _gatewayStream = gatewayStream; + _workerStream = workerStream; + _frameOptions = frameOptions; + _reader = new WorkerFrameReader(_workerStream, frameOptions); + _writer = new WorkerFrameWriter(_workerStream, frameOptions); + } + + public string SessionId { get; } + + public string Nonce { get; } + + public ulong NextWorkerSequence { get; private set; } + + public static async Task CreateConnectedPairAsync( + string sessionId = DefaultSessionId, + string nonce = DefaultNonce, + uint protocolVersion = GatewayContractInfo.WorkerProtocolVersion, + int maxMessageBytes = WorkerFrameProtocolOptions.DefaultMaxMessageBytes, + CancellationToken cancellationToken = default) + { + string pipeName = $"mxaccessgw-fake-worker-{Guid.NewGuid():N}"; + NamedPipeServerStream gatewayStream = new( + pipeName, + PipeDirection.InOut, + maxNumberOfServerInstances: 1, + PipeTransmissionMode.Byte, + PipeOptions.Asynchronous); + NamedPipeClientStream workerStream = CreateWorkerStream(pipeName); + + Task waitForConnectionTask = gatewayStream.WaitForConnectionAsync(cancellationToken); + await workerStream.ConnectAsync(cancellationToken).ConfigureAwait(false); + await waitForConnectionTask.ConfigureAwait(false); + + return new FakeWorkerHarness( + sessionId, + nonce, + gatewayStream, + workerStream, + new WorkerFrameProtocolOptions(sessionId, protocolVersion, maxMessageBytes)); + } + + public static async Task ConnectToGatewayPipeAsync( + string sessionId, + string nonce, + string pipeName, + uint protocolVersion = GatewayContractInfo.WorkerProtocolVersion, + int maxMessageBytes = WorkerFrameProtocolOptions.DefaultMaxMessageBytes, + CancellationToken cancellationToken = default) + { + NamedPipeClientStream workerStream = CreateWorkerStream(pipeName); + await workerStream.ConnectAsync(cancellationToken).ConfigureAwait(false); + + return new FakeWorkerHarness( + sessionId, + nonce, + gatewayStream: null, + workerStream, + new WorkerFrameProtocolOptions(sessionId, protocolVersion, maxMessageBytes)); + } + + public WorkerClient CreateClient( + WorkerClientOptions? options = null, + GatewayMetrics? metrics = null, + TimeProvider? timeProvider = null) + { + if (_gatewayStream is null) + { + throw new InvalidOperationException("This fake worker is connected to a gateway-owned pipe."); + } + + WorkerClientConnection connection = new( + SessionId, + Nonce, + _gatewayStream, + _frameOptions); + + return new WorkerClient(connection, options, metrics, timeProvider); + } + + public async Task CompleteStartupAsync( + int workerProcessId = DefaultWorkerProcessId, + string workerVersion = "fake-worker", + string mxaccessProgid = "LMXProxy.LMXProxyServer.1", + string mxaccessClsid = "{C30B52F5-2CB5-4760-AF0A-3A344A7EB5DC}", + CancellationToken cancellationToken = default) + { + WorkerEnvelope gatewayHello = await ReadGatewayEnvelopeAsync(cancellationToken).ConfigureAwait(false); + if (gatewayHello.BodyCase != WorkerEnvelope.BodyOneofCase.GatewayHello) + { + throw new InvalidOperationException($"Expected GatewayHello but received {gatewayHello.BodyCase}."); + } + + await SendWorkerHelloAsync( + workerProcessId, + workerVersion, + cancellationToken: cancellationToken).ConfigureAwait(false); + await SendWorkerReadyAsync( + workerProcessId, + mxaccessProgid, + mxaccessClsid, + cancellationToken).ConfigureAwait(false); + + return gatewayHello; + } + + public async Task ReadGatewayEnvelopeAsync(CancellationToken cancellationToken = default) + { + return await _reader.ReadAsync(cancellationToken).ConfigureAwait(false); + } + + public async Task ReadCommandAsync(CancellationToken cancellationToken = default) + { + WorkerEnvelope envelope = await ReadGatewayEnvelopeAsync(cancellationToken).ConfigureAwait(false); + if (envelope.BodyCase != WorkerEnvelope.BodyOneofCase.WorkerCommand) + { + throw new InvalidOperationException($"Expected WorkerCommand but received {envelope.BodyCase}."); + } + + return envelope; + } + + public async Task ReadShutdownAsync(CancellationToken cancellationToken = default) + { + WorkerEnvelope envelope = await ReadGatewayEnvelopeAsync(cancellationToken).ConfigureAwait(false); + if (envelope.BodyCase != WorkerEnvelope.BodyOneofCase.WorkerShutdown) + { + throw new InvalidOperationException($"Expected WorkerShutdown but received {envelope.BodyCase}."); + } + + return envelope; + } + + public async Task SendWorkerHelloAsync( + int workerProcessId = DefaultWorkerProcessId, + string workerVersion = "fake-worker", + uint? workerProtocolVersion = null, + string? nonce = null, + CancellationToken cancellationToken = default) + { + await _writer.WriteAsync( + CreateEnvelope( + correlationId: string.Empty, + envelope => envelope.WorkerHello = new WorkerHello + { + ProtocolVersion = workerProtocolVersion ?? _frameOptions.ProtocolVersion, + Nonce = nonce ?? Nonce, + WorkerProcessId = workerProcessId, + WorkerVersion = workerVersion, + }), + cancellationToken).ConfigureAwait(false); + } + + public async Task SendWorkerReadyAsync( + int workerProcessId = DefaultWorkerProcessId, + string mxaccessProgid = "LMXProxy.LMXProxyServer.1", + string mxaccessClsid = "{C30B52F5-2CB5-4760-AF0A-3A344A7EB5DC}", + CancellationToken cancellationToken = default) + { + await _writer.WriteAsync( + CreateEnvelope( + correlationId: string.Empty, + envelope => envelope.WorkerReady = new WorkerReady + { + WorkerProcessId = workerProcessId, + MxaccessProgid = mxaccessProgid, + MxaccessClsid = mxaccessClsid, + ReadyTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow), + }), + cancellationToken).ConfigureAwait(false); + } + + public async Task ReplyToCommandAsync( + WorkerEnvelope commandEnvelope, + ProtocolStatusCode statusCode = ProtocolStatusCode.Ok, + string statusMessage = "OK", + CancellationToken cancellationToken = default) + { + if (commandEnvelope.BodyCase != WorkerEnvelope.BodyOneofCase.WorkerCommand) + { + throw new ArgumentException("Command envelope must contain WorkerCommand.", nameof(commandEnvelope)); + } + + MxCommandKind kind = commandEnvelope.WorkerCommand.Command?.Kind ?? MxCommandKind.Unspecified; + await _writer.WriteAsync( + CreateEnvelope( + commandEnvelope.CorrelationId, + envelope => envelope.WorkerCommandReply = new WorkerCommandReply + { + Reply = new MxCommandReply + { + SessionId = SessionId, + CorrelationId = commandEnvelope.CorrelationId, + Kind = kind, + ProtocolStatus = new ProtocolStatus + { + Code = statusCode, + Message = statusMessage, + }, + }, + CompletedTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow), + }), + cancellationToken).ConfigureAwait(false); + } + + public async Task EmitEventAsync( + MxEventFamily family, + CancellationToken cancellationToken = default) + { + ulong sequence = NextWorkerSequence + 1; + await _writer.WriteAsync( + CreateEnvelope( + correlationId: string.Empty, + envelope => envelope.WorkerEvent = new WorkerEvent + { + Event = new MxEvent + { + SessionId = SessionId, + Family = family, + WorkerSequence = sequence, + WorkerTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow), + }, + }), + cancellationToken).ConfigureAwait(false); + } + + public async Task EmitFaultAsync( + WorkerFaultCategory category, + string diagnosticMessage, + CancellationToken cancellationToken = default) + { + await _writer.WriteAsync( + CreateEnvelope( + correlationId: string.Empty, + envelope => envelope.WorkerFault = new WorkerFault + { + Category = category, + DiagnosticMessage = diagnosticMessage, + ProtocolStatus = new ProtocolStatus + { + Code = ProtocolStatusCode.WorkerUnavailable, + Message = diagnosticMessage, + }, + }), + cancellationToken).ConfigureAwait(false); + } + + public async Task SendShutdownAckAsync( + ProtocolStatusCode statusCode = ProtocolStatusCode.Ok, + CancellationToken cancellationToken = default) + { + await _writer.WriteAsync( + CreateEnvelope( + correlationId: string.Empty, + envelope => envelope.WorkerShutdownAck = new WorkerShutdownAck + { + Status = new ProtocolStatus + { + Code = statusCode, + Message = statusCode.ToString(), + }, + }), + cancellationToken).ConfigureAwait(false); + } + + public async Task WriteMalformedPayloadAsync( + ReadOnlyMemory payload, + CancellationToken cancellationToken = default) + { + if (payload.IsEmpty) + { + throw new ArgumentException("Malformed payload must include at least one byte.", nameof(payload)); + } + + byte[] lengthPrefix = new byte[sizeof(uint)]; + BinaryPrimitives.WriteUInt32LittleEndian(lengthPrefix, (uint)payload.Length); + await _workerStream.WriteAsync(lengthPrefix, cancellationToken).ConfigureAwait(false); + await _workerStream.WriteAsync(payload, cancellationToken).ConfigureAwait(false); + } + + public async Task WriteOversizedFrameHeaderAsync( + uint payloadLength, + CancellationToken cancellationToken = default) + { + if (payloadLength <= _frameOptions.MaxMessageBytes) + { + throw new ArgumentOutOfRangeException( + nameof(payloadLength), + payloadLength, + "Payload length must exceed the configured maximum."); + } + + byte[] lengthPrefix = new byte[sizeof(uint)]; + BinaryPrimitives.WriteUInt32LittleEndian(lengthPrefix, payloadLength); + await _workerStream.WriteAsync(lengthPrefix, cancellationToken).ConfigureAwait(false); + } + + public async ValueTask DisposeWorkerSideAsync() + { + if (_workerSideDisposed) + { + return; + } + + await _workerStream.DisposeAsync().ConfigureAwait(false); + _workerSideDisposed = true; + } + + public async ValueTask DisposeAsync() + { + await DisposeWorkerSideAsync().ConfigureAwait(false); + if (_gatewayStream is not null) + { + await _gatewayStream.DisposeAsync().ConfigureAwait(false); + } + } + + private WorkerEnvelope CreateEnvelope( + string correlationId, + Action setBody) + { + WorkerEnvelope envelope = new() + { + ProtocolVersion = _frameOptions.ProtocolVersion, + SessionId = SessionId, + Sequence = AdvanceSequence(), + CorrelationId = correlationId, + }; + setBody(envelope); + + return envelope; + } + + private ulong AdvanceSequence() + { + return ++NextWorkerSequence; + } + + private static NamedPipeClientStream CreateWorkerStream(string pipeName) + { + return new NamedPipeClientStream( + ".", + pipeName, + PipeDirection.InOut, + PipeOptions.Asynchronous); + } +}