using System.Buffers.Binary; using System.IO.Pipes; using Google.Protobuf.WellKnownTypes; using MxGateway.Contracts; using MxGateway.Contracts.Proto; using MxGateway.Server.Metrics; using MxGateway.Server.Workers; namespace MxGateway.Tests.Gateway.Workers.Fakes; public sealed class FakeWorkerHarness : IAsyncDisposable { public const string DefaultSessionId = "session-fake-worker"; public const string DefaultNonce = "nonce-fake-worker"; public const int DefaultWorkerProcessId = 9321; private readonly NamedPipeServerStream? _gatewayStream; private readonly NamedPipeClientStream _workerStream; private readonly WorkerFrameProtocolOptions _frameOptions; private readonly WorkerFrameReader _reader; private readonly WorkerFrameWriter _writer; private bool _workerSideDisposed; private FakeWorkerHarness( string sessionId, string nonce, NamedPipeServerStream? gatewayStream, NamedPipeClientStream workerStream, WorkerFrameProtocolOptions frameOptions) { SessionId = sessionId; Nonce = nonce; _gatewayStream = gatewayStream; _workerStream = workerStream; _frameOptions = frameOptions; _reader = new WorkerFrameReader(_workerStream, frameOptions); _writer = new WorkerFrameWriter(_workerStream, frameOptions); } public string SessionId { get; } public string Nonce { get; } public ulong NextWorkerSequence { get; private set; } public static async Task CreateConnectedPairAsync( string sessionId = DefaultSessionId, string nonce = DefaultNonce, uint protocolVersion = GatewayContractInfo.WorkerProtocolVersion, int maxMessageBytes = WorkerFrameProtocolOptions.DefaultMaxMessageBytes, CancellationToken cancellationToken = default) { string pipeName = $"mxaccessgw-fake-worker-{Guid.NewGuid():N}"; NamedPipeServerStream gatewayStream = new( pipeName, PipeDirection.InOut, maxNumberOfServerInstances: 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous); NamedPipeClientStream workerStream = CreateWorkerStream(pipeName); Task waitForConnectionTask = gatewayStream.WaitForConnectionAsync(cancellationToken); await workerStream.ConnectAsync(cancellationToken).ConfigureAwait(false); await waitForConnectionTask.ConfigureAwait(false); return new FakeWorkerHarness( sessionId, nonce, gatewayStream, workerStream, new WorkerFrameProtocolOptions(sessionId, protocolVersion, maxMessageBytes)); } public static async Task ConnectToGatewayPipeAsync( string sessionId, string nonce, string pipeName, uint protocolVersion = GatewayContractInfo.WorkerProtocolVersion, int maxMessageBytes = WorkerFrameProtocolOptions.DefaultMaxMessageBytes, CancellationToken cancellationToken = default) { NamedPipeClientStream workerStream = CreateWorkerStream(pipeName); await workerStream.ConnectAsync(cancellationToken).ConfigureAwait(false); return new FakeWorkerHarness( sessionId, nonce, gatewayStream: null, workerStream, new WorkerFrameProtocolOptions(sessionId, protocolVersion, maxMessageBytes)); } public WorkerClient CreateClient( WorkerClientOptions? options = null, GatewayMetrics? metrics = null, TimeProvider? timeProvider = null) { if (_gatewayStream is null) { throw new InvalidOperationException("This fake worker is connected to a gateway-owned pipe."); } WorkerClientConnection connection = new( SessionId, Nonce, _gatewayStream, _frameOptions); return new WorkerClient(connection, options, metrics, timeProvider); } public async Task CompleteStartupAsync( int workerProcessId = DefaultWorkerProcessId, string workerVersion = "fake-worker", string mxaccessProgid = "LMXProxy.LMXProxyServer.1", string mxaccessClsid = "{C30B52F5-2CB5-4760-AF0A-3A344A7EB5DC}", CancellationToken cancellationToken = default) { WorkerEnvelope gatewayHello = await ReadGatewayEnvelopeAsync(cancellationToken).ConfigureAwait(false); if (gatewayHello.BodyCase != WorkerEnvelope.BodyOneofCase.GatewayHello) { throw new InvalidOperationException($"Expected GatewayHello but received {gatewayHello.BodyCase}."); } await SendWorkerHelloAsync( workerProcessId, workerVersion, cancellationToken: cancellationToken).ConfigureAwait(false); await SendWorkerReadyAsync( workerProcessId, mxaccessProgid, mxaccessClsid, cancellationToken).ConfigureAwait(false); return gatewayHello; } public async Task ReadGatewayEnvelopeAsync(CancellationToken cancellationToken = default) { return await _reader.ReadAsync(cancellationToken).ConfigureAwait(false); } public async Task ReadCommandAsync(CancellationToken cancellationToken = default) { WorkerEnvelope envelope = await ReadGatewayEnvelopeAsync(cancellationToken).ConfigureAwait(false); if (envelope.BodyCase != WorkerEnvelope.BodyOneofCase.WorkerCommand) { throw new InvalidOperationException($"Expected WorkerCommand but received {envelope.BodyCase}."); } return envelope; } public async Task ReadShutdownAsync(CancellationToken cancellationToken = default) { WorkerEnvelope envelope = await ReadGatewayEnvelopeAsync(cancellationToken).ConfigureAwait(false); if (envelope.BodyCase != WorkerEnvelope.BodyOneofCase.WorkerShutdown) { throw new InvalidOperationException($"Expected WorkerShutdown but received {envelope.BodyCase}."); } return envelope; } public async Task SendWorkerHelloAsync( int workerProcessId = DefaultWorkerProcessId, string workerVersion = "fake-worker", uint? workerProtocolVersion = null, string? nonce = null, CancellationToken cancellationToken = default) { await _writer.WriteAsync( CreateEnvelope( correlationId: string.Empty, envelope => envelope.WorkerHello = new WorkerHello { ProtocolVersion = workerProtocolVersion ?? _frameOptions.ProtocolVersion, Nonce = nonce ?? Nonce, WorkerProcessId = workerProcessId, WorkerVersion = workerVersion, }), cancellationToken).ConfigureAwait(false); } public async Task SendWorkerReadyAsync( int workerProcessId = DefaultWorkerProcessId, string mxaccessProgid = "LMXProxy.LMXProxyServer.1", string mxaccessClsid = "{C30B52F5-2CB5-4760-AF0A-3A344A7EB5DC}", CancellationToken cancellationToken = default) { await _writer.WriteAsync( CreateEnvelope( correlationId: string.Empty, envelope => envelope.WorkerReady = new WorkerReady { WorkerProcessId = workerProcessId, MxaccessProgid = mxaccessProgid, MxaccessClsid = mxaccessClsid, ReadyTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow), }), cancellationToken).ConfigureAwait(false); } public async Task ReplyToCommandAsync( WorkerEnvelope commandEnvelope, ProtocolStatusCode statusCode = ProtocolStatusCode.Ok, string statusMessage = "OK", Action? configureReply = null, CancellationToken cancellationToken = default) { if (commandEnvelope.BodyCase != WorkerEnvelope.BodyOneofCase.WorkerCommand) { throw new ArgumentException("Command envelope must contain WorkerCommand.", nameof(commandEnvelope)); } MxCommandKind kind = commandEnvelope.WorkerCommand.Command?.Kind ?? MxCommandKind.Unspecified; MxCommandReply reply = new() { SessionId = SessionId, CorrelationId = commandEnvelope.CorrelationId, Kind = kind, ProtocolStatus = new ProtocolStatus { Code = statusCode, Message = statusMessage, }, }; configureReply?.Invoke(reply); await _writer.WriteAsync( CreateEnvelope( commandEnvelope.CorrelationId, envelope => envelope.WorkerCommandReply = new WorkerCommandReply { Reply = reply, CompletedTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow), }), cancellationToken).ConfigureAwait(false); } public async Task EmitEventAsync( MxEventFamily family, CancellationToken cancellationToken = default, Action? configureEvent = null) { ulong sequence = NextWorkerSequence + 1; MxEvent mxEvent = new() { SessionId = SessionId, Family = family, WorkerSequence = sequence, WorkerTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow), }; configureEvent?.Invoke(mxEvent); await _writer.WriteAsync( CreateEnvelope( correlationId: string.Empty, envelope => envelope.WorkerEvent = new WorkerEvent { Event = mxEvent, }), cancellationToken).ConfigureAwait(false); } public async Task EmitFaultAsync( WorkerFaultCategory category, string diagnosticMessage, CancellationToken cancellationToken = default) { await _writer.WriteAsync( CreateEnvelope( correlationId: string.Empty, envelope => envelope.WorkerFault = new WorkerFault { Category = category, DiagnosticMessage = diagnosticMessage, ProtocolStatus = new ProtocolStatus { Code = ProtocolStatusCode.WorkerUnavailable, Message = diagnosticMessage, }, }), cancellationToken).ConfigureAwait(false); } public async Task SendHeartbeatAsync( WorkerState state = WorkerState.Ready, CancellationToken cancellationToken = default, Action? configureHeartbeat = null) { WorkerHeartbeat heartbeat = new() { WorkerProcessId = DefaultWorkerProcessId, State = state, LastStaActivityTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow), }; configureHeartbeat?.Invoke(heartbeat); await _writer.WriteAsync( CreateEnvelope( correlationId: string.Empty, envelope => envelope.WorkerHeartbeat = heartbeat), cancellationToken).ConfigureAwait(false); } public async Task SendShutdownAckAsync( ProtocolStatusCode statusCode = ProtocolStatusCode.Ok, CancellationToken cancellationToken = default) { await _writer.WriteAsync( CreateEnvelope( correlationId: string.Empty, envelope => envelope.WorkerShutdownAck = new WorkerShutdownAck { Status = new ProtocolStatus { Code = statusCode, Message = statusCode.ToString(), }, }), cancellationToken).ConfigureAwait(false); } public async Task WriteMalformedPayloadAsync( ReadOnlyMemory payload, CancellationToken cancellationToken = default) { if (payload.IsEmpty) { throw new ArgumentException("Malformed payload must include at least one byte.", nameof(payload)); } byte[] lengthPrefix = new byte[sizeof(uint)]; BinaryPrimitives.WriteUInt32LittleEndian(lengthPrefix, (uint)payload.Length); await _workerStream.WriteAsync(lengthPrefix, cancellationToken).ConfigureAwait(false); await _workerStream.WriteAsync(payload, cancellationToken).ConfigureAwait(false); } public async Task WriteOversizedFrameHeaderAsync( uint payloadLength, CancellationToken cancellationToken = default) { if (payloadLength <= _frameOptions.MaxMessageBytes) { throw new ArgumentOutOfRangeException( nameof(payloadLength), payloadLength, "Payload length must exceed the configured maximum."); } byte[] lengthPrefix = new byte[sizeof(uint)]; BinaryPrimitives.WriteUInt32LittleEndian(lengthPrefix, payloadLength); await _workerStream.WriteAsync(lengthPrefix, cancellationToken).ConfigureAwait(false); } public async ValueTask DisposeWorkerSideAsync() { if (_workerSideDisposed) { return; } await _workerStream.DisposeAsync().ConfigureAwait(false); _workerSideDisposed = true; } public async ValueTask DisposeAsync() { await DisposeWorkerSideAsync().ConfigureAwait(false); if (_gatewayStream is not null) { await _gatewayStream.DisposeAsync().ConfigureAwait(false); } } private WorkerEnvelope CreateEnvelope( string correlationId, Action setBody) { WorkerEnvelope envelope = new() { ProtocolVersion = _frameOptions.ProtocolVersion, SessionId = SessionId, Sequence = AdvanceSequence(), CorrelationId = correlationId, }; setBody(envelope); return envelope; } private ulong AdvanceSequence() { return ++NextWorkerSequence; } private static NamedPipeClientStream CreateWorkerStream(string pipeName) { return new NamedPipeClientStream( ".", pipeName, PipeDirection.InOut, PipeOptions.Asynchronous); } }