using System.Buffers.Binary;
using System.IO.Pipes;
using Google.Protobuf.WellKnownTypes;
using ZB.MOM.WW.MxGateway.Contracts;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Server.Metrics;
using ZB.MOM.WW.MxGateway.Server.Workers;

namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Workers.Fakes;

/// <summary>
/// Test double that plays the worker end of the gateway/worker named-pipe protocol.
/// It owns the client (worker) end of the pipe and, when created through
/// <see cref="CreateConnectedPairAsync"/>, also the server (gateway) end, which can be
/// handed to a <see cref="WorkerClient"/> via <see cref="CreateClient"/>.
/// </summary>
public sealed class FakeWorkerHarness : IAsyncDisposable
{
    public const string DefaultSessionId = "session-fake-worker";
    public const string DefaultNonce = "nonce-fake-worker";
    public const int DefaultWorkerProcessId = 9321;

    // Shared by the default parameter values and the canned GetWorkerInfo reply so
    // the two cannot drift apart.
    private const string DefaultWorkerVersion = "fake-worker";
    private const string DefaultMxaccessProgid = "LMXProxy.LMXProxyServer.1";
    private const string DefaultMxaccessClsid = "{C30B52F5-2CB5-4760-AF0A-3A344A7EB5DC}";

    private readonly NamedPipeServerStream? _gatewayStream;
    private readonly NamedPipeClientStream _workerStream;
    private readonly WorkerFrameProtocolOptions _frameOptions;
    private readonly WorkerFrameReader _reader;
    private readonly WorkerFrameWriter _writer;
    private bool _workerSideDisposed;

    private FakeWorkerHarness(
        string sessionId,
        string nonce,
        NamedPipeServerStream? gatewayStream,
        NamedPipeClientStream workerStream,
        WorkerFrameProtocolOptions frameOptions)
    {
        SessionId = sessionId;
        Nonce = nonce;
        _gatewayStream = gatewayStream;
        _workerStream = workerStream;
        _frameOptions = frameOptions;
        _reader = new WorkerFrameReader(_workerStream, frameOptions);
        _writer = new WorkerFrameWriter(_workerStream, frameOptions);
    }

    /// <summary>Gets the session ID for the fake worker harness.</summary>
    public string SessionId { get; }

    /// <summary>Gets the nonce for the fake worker harness.</summary>
    public string Nonce { get; }

    /// <summary>
    /// Gets the sequence number most recently stamped on an outgoing envelope
    /// (zero until the first envelope is created). Each new envelope increments
    /// this value and uses the incremented number.
    /// </summary>
    public ulong NextWorkerSequence { get; private set; }

    /// <summary>Creates a connected pair of fake worker harness with gateway and worker pipes.</summary>
    /// <param name="sessionId">Identifier for the fake session.</param>
    /// <param name="nonce">Nonce for session validation.</param>
    /// <param name="protocolVersion">Protocol version for frame communication.</param>
    /// <param name="maxMessageBytes">Maximum message size in bytes.</param>
    /// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
    /// <returns>A harness that owns both ends of a freshly created, uniquely named pipe.</returns>
    public static async Task<FakeWorkerHarness> CreateConnectedPairAsync(
        string sessionId = DefaultSessionId,
        string nonce = DefaultNonce,
        uint protocolVersion = GatewayContractInfo.WorkerProtocolVersion,
        int maxMessageBytes = WorkerFrameProtocolOptions.DefaultMaxMessageBytes,
        CancellationToken cancellationToken = default)
    {
        string pipeName = $"mxaccessgw-fake-worker-{Guid.NewGuid():N}";
        NamedPipeServerStream gatewayStream = new(
            pipeName,
            PipeDirection.InOut,
            maxNumberOfServerInstances: 1,
            PipeTransmissionMode.Byte,
            PipeOptions.Asynchronous);
        NamedPipeClientStream workerStream = CreateWorkerStream(pipeName);

        try
        {
            // Start listening before connecting so both sides complete together.
            Task waitForConnectionTask = gatewayStream.WaitForConnectionAsync(cancellationToken);
            await workerStream.ConnectAsync(cancellationToken).ConfigureAwait(false);
            await waitForConnectionTask.ConfigureAwait(false);

            return new FakeWorkerHarness(
                sessionId,
                nonce,
                gatewayStream,
                workerStream,
                new WorkerFrameProtocolOptions(sessionId, protocolVersion, maxMessageBytes));
        }
        catch
        {
            // No harness was produced, so nobody else can dispose the pipe handles.
            await workerStream.DisposeAsync().ConfigureAwait(false);
            await gatewayStream.DisposeAsync().ConfigureAwait(false);
            throw;
        }
    }

    /// <summary>Connects to an existing gateway pipe as a fake worker harness.</summary>
    /// <param name="sessionId">Identifier for the fake session.</param>
    /// <param name="nonce">Nonce for session validation.</param>
    /// <param name="pipeName">Name of the named pipe to connect to.</param>
    /// <param name="protocolVersion">Protocol version for frame communication.</param>
    /// <param name="maxMessageBytes">Maximum message size in bytes.</param>
    /// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
    /// <returns>A harness that owns only the worker end of the pipe.</returns>
    public static async Task<FakeWorkerHarness> ConnectToGatewayPipeAsync(
        string sessionId,
        string nonce,
        string pipeName,
        uint protocolVersion = GatewayContractInfo.WorkerProtocolVersion,
        int maxMessageBytes = WorkerFrameProtocolOptions.DefaultMaxMessageBytes,
        CancellationToken cancellationToken = default)
    {
        NamedPipeClientStream workerStream = CreateWorkerStream(pipeName);

        try
        {
            await workerStream.ConnectAsync(cancellationToken).ConfigureAwait(false);

            return new FakeWorkerHarness(
                sessionId,
                nonce,
                gatewayStream: null,
                workerStream,
                new WorkerFrameProtocolOptions(sessionId, protocolVersion, maxMessageBytes));
        }
        catch
        {
            // Connection or setup failed: release the client handle before propagating.
            await workerStream.DisposeAsync().ConfigureAwait(false);
            throw;
        }
    }

    /// <summary>Creates a worker client connected to the fake worker harness.</summary>
    /// <param name="options">Configuration options for the worker client.</param>
    /// <param name="metrics">Gateway metrics collector.</param>
    /// <param name="timeProvider">Time provider for timestamps.</param>
    /// <returns>A configured worker client connected to this harness.</returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown when this harness does not own the gateway end of the pipe, i.e. it was
    /// created through <see cref="ConnectToGatewayPipeAsync"/>.
    /// </exception>
    public WorkerClient CreateClient(
        WorkerClientOptions? options = null,
        GatewayMetrics? metrics = null,
        TimeProvider? timeProvider = null)
    {
        if (_gatewayStream is null)
        {
            throw new InvalidOperationException("This fake worker is connected to a gateway-owned pipe.");
        }

        WorkerClientConnection connection = new(
            SessionId,
            Nonce,
            _gatewayStream,
            _frameOptions);

        return new WorkerClient(connection, options, metrics, timeProvider);
    }

    /// <summary>
    /// Completes the worker startup handshake by reading the gateway hello and sending worker hello and ready.
    /// </summary>
    /// <param name="workerProcessId">Process ID of the fake worker.</param>
    /// <param name="workerVersion">Version string of the fake worker.</param>
    /// <param name="mxaccessProgid">MXAccess COM ProgID.</param>
    /// <param name="mxaccessClsid">MXAccess COM CLSID.</param>
    /// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
    /// <returns>The gateway hello envelope received during startup.</returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown when the first envelope read is not a GatewayHello.
    /// </exception>
    public async Task<WorkerEnvelope> CompleteStartupAsync(
        int workerProcessId = DefaultWorkerProcessId,
        string workerVersion = DefaultWorkerVersion,
        string mxaccessProgid = DefaultMxaccessProgid,
        string mxaccessClsid = DefaultMxaccessClsid,
        CancellationToken cancellationToken = default)
    {
        WorkerEnvelope gatewayHello = await ReadGatewayEnvelopeAsync(cancellationToken).ConfigureAwait(false);
        if (gatewayHello.BodyCase != WorkerEnvelope.BodyOneofCase.GatewayHello)
        {
            throw new InvalidOperationException($"Expected GatewayHello but received {gatewayHello.BodyCase}.");
        }

        await SendWorkerHelloAsync(
            workerProcessId,
            workerVersion,
            cancellationToken: cancellationToken).ConfigureAwait(false);
        await SendWorkerReadyAsync(
            workerProcessId,
            mxaccessProgid,
            mxaccessClsid,
            cancellationToken).ConfigureAwait(false);

        return gatewayHello;
    }

    /// <summary>Reads the next gateway envelope from the worker stream.</summary>
    /// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
    /// <returns>The gateway envelope read from the stream.</returns>
    public async Task<WorkerEnvelope> ReadGatewayEnvelopeAsync(CancellationToken cancellationToken = default)
    {
        return await _reader.ReadAsync(cancellationToken).ConfigureAwait(false);
    }

    /// <summary>Reads the next command from the worker stream.</summary>
    /// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
    /// <returns>The command envelope read from the stream.</returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown when the next envelope is not a WorkerCommand.
    /// </exception>
    public async Task<WorkerEnvelope> ReadCommandAsync(CancellationToken cancellationToken = default)
    {
        WorkerEnvelope envelope = await ReadGatewayEnvelopeAsync(cancellationToken).ConfigureAwait(false);
        if (envelope.BodyCase != WorkerEnvelope.BodyOneofCase.WorkerCommand)
        {
            throw new InvalidOperationException($"Expected WorkerCommand but received {envelope.BodyCase}.");
        }

        return envelope;
    }

    /// <summary>Reads the next shutdown request from the worker stream.</summary>
    /// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
    /// <returns>The shutdown envelope read from the stream.</returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown when the next envelope is not a WorkerShutdown.
    /// </exception>
    public async Task<WorkerEnvelope> ReadShutdownAsync(CancellationToken cancellationToken = default)
    {
        WorkerEnvelope envelope = await ReadGatewayEnvelopeAsync(cancellationToken).ConfigureAwait(false);
        if (envelope.BodyCase != WorkerEnvelope.BodyOneofCase.WorkerShutdown)
        {
            throw new InvalidOperationException($"Expected WorkerShutdown but received {envelope.BodyCase}.");
        }

        return envelope;
    }

    /// <summary>Sends a worker hello message to the gateway.</summary>
    /// <param name="workerProcessId">Process ID of the fake worker.</param>
    /// <param name="workerVersion">Version string of the fake worker.</param>
    /// <param name="workerProtocolVersion">
    /// Protocol version override; when null the harness's configured protocol version is sent.
    /// </param>
    /// <param name="nonce">Nonce override; when null the harness's <see cref="Nonce"/> is sent.</param>
    /// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
    public async Task SendWorkerHelloAsync(
        int workerProcessId = DefaultWorkerProcessId,
        string workerVersion = DefaultWorkerVersion,
        uint? workerProtocolVersion = null,
        string? nonce = null,
        CancellationToken cancellationToken = default)
    {
        await _writer.WriteAsync(
            CreateEnvelope(
                correlationId: string.Empty,
                envelope => envelope.WorkerHello = new WorkerHello
                {
                    ProtocolVersion = workerProtocolVersion ?? _frameOptions.ProtocolVersion,
                    Nonce = nonce ?? Nonce,
                    WorkerProcessId = workerProcessId,
                    WorkerVersion = workerVersion,
                }),
            cancellationToken).ConfigureAwait(false);
    }

    /// <summary>Sends a worker ready message to the gateway.</summary>
    /// <param name="workerProcessId">Process ID of the fake worker.</param>
    /// <param name="mxaccessProgid">MXAccess COM ProgID.</param>
    /// <param name="mxaccessClsid">MXAccess COM CLSID.</param>
    /// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
    public async Task SendWorkerReadyAsync(
        int workerProcessId = DefaultWorkerProcessId,
        string mxaccessProgid = DefaultMxaccessProgid,
        string mxaccessClsid = DefaultMxaccessClsid,
        CancellationToken cancellationToken = default)
    {
        await _writer.WriteAsync(
            CreateEnvelope(
                correlationId: string.Empty,
                envelope => envelope.WorkerReady = new WorkerReady
                {
                    WorkerProcessId = workerProcessId,
                    MxaccessProgid = mxaccessProgid,
                    MxaccessClsid = mxaccessClsid,
                    ReadyTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
                }),
            cancellationToken).ConfigureAwait(false);
    }

    /// <summary>Sends a reply to a command received from the gateway.</summary>
    /// <param name="commandEnvelope">The command envelope to reply to.</param>
    /// <param name="statusCode">Protocol status code for the reply.</param>
    /// <param name="statusMessage">Human-readable status message.</param>
    /// <param name="configureReply">Optional callback to customize the reply.</param>
    /// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
    /// <exception cref="ArgumentNullException">
    /// Thrown when <paramref name="commandEnvelope"/> is null.
    /// </exception>
    /// <exception cref="ArgumentException">
    /// Thrown when <paramref name="commandEnvelope"/> does not contain a WorkerCommand.
    /// </exception>
    public async Task ReplyToCommandAsync(
        WorkerEnvelope commandEnvelope,
        ProtocolStatusCode statusCode = ProtocolStatusCode.Ok,
        string statusMessage = "OK",
        Action<MxCommandReply>? configureReply = null,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(commandEnvelope);

        if (commandEnvelope.BodyCase != WorkerEnvelope.BodyOneofCase.WorkerCommand)
        {
            throw new ArgumentException("Command envelope must contain WorkerCommand.", nameof(commandEnvelope));
        }

        // A WorkerCommand without an inner command is echoed back as Unspecified.
        MxCommandKind kind = commandEnvelope.WorkerCommand.Command?.Kind ?? MxCommandKind.Unspecified;
        MxCommandReply reply = new()
        {
            SessionId = SessionId,
            CorrelationId = commandEnvelope.CorrelationId,
            Kind = kind,
            ProtocolStatus = new ProtocolStatus
            {
                Code = statusCode,
                Message = statusMessage,
            },
        };
        configureReply?.Invoke(reply);

        await _writer.WriteAsync(
            CreateEnvelope(
                commandEnvelope.CorrelationId,
                envelope => envelope.WorkerCommandReply = new WorkerCommandReply
                {
                    Reply = reply,
                    CompletedTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
                }),
            cancellationToken).ConfigureAwait(false);
    }

    /// <summary>Emits an event to the gateway.</summary>
    /// <param name="family">Family of the event to emit.</param>
    /// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
    /// <param name="configureEvent">Optional callback to customize the event.</param>
    public async Task EmitEventAsync(
        MxEventFamily family,
        CancellationToken cancellationToken = default,
        Action<MxEvent>? configureEvent = null)
    {
        // CreateEnvelope below increments NextWorkerSequence by one, so this value
        // matches the Sequence stamped on the enclosing envelope.
        ulong sequence = NextWorkerSequence + 1;
        MxEvent mxEvent = new()
        {
            SessionId = SessionId,
            Family = family,
            WorkerSequence = sequence,
            WorkerTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
        };
        configureEvent?.Invoke(mxEvent);

        await _writer.WriteAsync(
            CreateEnvelope(
                correlationId: string.Empty,
                envelope => envelope.WorkerEvent = new WorkerEvent
                {
                    Event = mxEvent,
                }),
            cancellationToken).ConfigureAwait(false);
    }

    /// <summary>Emits a fault message to the gateway.</summary>
    /// <param name="category">Category of the fault.</param>
    /// <param name="diagnosticMessage">
    /// Diagnostic message describing the fault; also used as the protocol status message.
    /// </param>
    /// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
    public async Task EmitFaultAsync(
        WorkerFaultCategory category,
        string diagnosticMessage,
        CancellationToken cancellationToken = default)
    {
        await _writer.WriteAsync(
            CreateEnvelope(
                correlationId: string.Empty,
                envelope => envelope.WorkerFault = new WorkerFault
                {
                    Category = category,
                    DiagnosticMessage = diagnosticMessage,
                    ProtocolStatus = new ProtocolStatus
                    {
                        Code = ProtocolStatusCode.WorkerUnavailable,
                        Message = diagnosticMessage,
                    },
                }),
            cancellationToken).ConfigureAwait(false);
    }

    /// <summary>Sends a heartbeat message to the gateway.</summary>
    /// <param name="state">Current worker state.</param>
    /// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
    /// <param name="configureHeartbeat">
    /// Optional callback to customize the heartbeat, e.g. to replace the default
    /// <see cref="DefaultWorkerProcessId"/> process ID.
    /// </param>
    public async Task SendHeartbeatAsync(
        WorkerState state = WorkerState.Ready,
        CancellationToken cancellationToken = default,
        Action<WorkerHeartbeat>? configureHeartbeat = null)
    {
        WorkerHeartbeat heartbeat = new()
        {
            WorkerProcessId = DefaultWorkerProcessId,
            State = state,
            LastStaActivityTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
        };
        configureHeartbeat?.Invoke(heartbeat);

        await _writer.WriteAsync(
            CreateEnvelope(
                correlationId: string.Empty,
                envelope => envelope.WorkerHeartbeat = heartbeat),
            cancellationToken).ConfigureAwait(false);
    }

    /// <summary>Sends a shutdown acknowledgment message to the gateway.</summary>
    /// <param name="statusCode">
    /// Protocol status code for the acknowledgment; its name is used as the status message.
    /// </param>
    /// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
    public async Task SendShutdownAckAsync(
        ProtocolStatusCode statusCode = ProtocolStatusCode.Ok,
        CancellationToken cancellationToken = default)
    {
        await _writer.WriteAsync(
            CreateEnvelope(
                correlationId: string.Empty,
                envelope => envelope.WorkerShutdownAck = new WorkerShutdownAck
                {
                    Status = new ProtocolStatus
                    {
                        Code = statusCode,
                        Message = statusCode.ToString(),
                    },
                }),
            cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Reads one incoming command envelope and, if it is one of the five
    /// control command kinds (Ping, GetSessionState, GetWorkerInfo, DrainEvents,
    /// ShutdownWorker), writes a canned reply that mirrors the real worker's
    /// reply shape. For ShutdownWorker the method additionally sends a
    /// <see cref="WorkerShutdownAck"/> after the OK reply, matching the real
    /// worker's shutdown flow.
    /// </summary>
    /// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
    /// <returns>The command envelope that was handled.</returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown when the next envelope is not a WorkerCommand or contains a
    /// non-control command kind.
    /// </exception>
    public async Task<WorkerEnvelope> RespondToControlCommandAsync(
        CancellationToken cancellationToken = default)
    {
        WorkerEnvelope commandEnvelope = await ReadCommandAsync(cancellationToken).ConfigureAwait(false);
        return await RespondToControlCommandAsync(commandEnvelope, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Accepts an already-read command envelope and, if it is one of the five control
    /// command kinds (Ping, GetSessionState, GetWorkerInfo, DrainEvents, ShutdownWorker),
    /// writes a canned reply that mirrors the real worker's reply shape. For ShutdownWorker
    /// the method additionally sends a <see cref="WorkerShutdownAck"/> after the OK reply.
    /// Use this overload when the caller has already consumed the envelope from the pipe
    /// (e.g., to inspect the kind before routing) to avoid re-reading.
    /// </summary>
    /// <param name="commandEnvelope">The already-read command envelope to respond to.</param>
    /// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
    /// <returns>The command envelope that was handled.</returns>
    /// <exception cref="ArgumentNullException">
    /// Thrown when <paramref name="commandEnvelope"/> is null.
    /// </exception>
    /// <exception cref="ArgumentException">
    /// Thrown when <paramref name="commandEnvelope"/> does not contain a WorkerCommand.
    /// </exception>
    /// <exception cref="InvalidOperationException">
    /// Thrown when the command kind is not one of the five control command kinds,
    /// including when the WorkerCommand carries no inner command.
    /// </exception>
    public async Task<WorkerEnvelope> RespondToControlCommandAsync(
        WorkerEnvelope commandEnvelope,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(commandEnvelope);

        if (commandEnvelope.BodyCase != WorkerEnvelope.BodyOneofCase.WorkerCommand)
        {
            throw new ArgumentException(
                $"Expected WorkerCommand envelope but received {commandEnvelope.BodyCase}.",
                nameof(commandEnvelope));
        }

        // The inner command may be unset; treat that as Unspecified (same as
        // ReplyToCommandAsync) so it is rejected below instead of dereferencing null.
        MxCommand? command = commandEnvelope.WorkerCommand.Command;
        MxCommandKind kind = command?.Kind ?? MxCommandKind.Unspecified;

        switch (kind)
        {
            case MxCommandKind.Ping:
                await ReplyToCommandAsync(
                    commandEnvelope,
                    configureReply: reply =>
                    {
                        // Echo the ping text back only when the caller supplied one.
                        string? message = command?.Ping?.Message;
                        if (!string.IsNullOrEmpty(message))
                        {
                            reply.DiagnosticMessage = message;
                        }
                    },
                    cancellationToken: cancellationToken).ConfigureAwait(false);
                break;

            case MxCommandKind.GetSessionState:
                await ReplyToCommandAsync(
                    commandEnvelope,
                    configureReply: reply => reply.SessionState = new SessionStateReply
                    {
                        State = SessionState.Ready,
                    },
                    cancellationToken: cancellationToken).ConfigureAwait(false);
                break;

            case MxCommandKind.GetWorkerInfo:
                await ReplyToCommandAsync(
                    commandEnvelope,
                    configureReply: reply => reply.WorkerInfo = new WorkerInfoReply
                    {
                        WorkerProcessId = DefaultWorkerProcessId,
                        WorkerVersion = DefaultWorkerVersion,
                        MxaccessProgid = DefaultMxaccessProgid,
                        MxaccessClsid = DefaultMxaccessClsid,
                    },
                    cancellationToken: cancellationToken).ConfigureAwait(false);
                break;

            case MxCommandKind.DrainEvents:
                await ReplyToCommandAsync(
                    commandEnvelope,
                    configureReply: reply => reply.DrainEvents = new DrainEventsReply(),
                    cancellationToken: cancellationToken).ConfigureAwait(false);
                break;

            case MxCommandKind.ShutdownWorker:
                // OK reply first, then the shutdown acknowledgment.
                await ReplyToCommandAsync(
                    commandEnvelope,
                    cancellationToken: cancellationToken).ConfigureAwait(false);
                await SendShutdownAckAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
                break;

            default:
                throw new InvalidOperationException(
                    $"RespondToControlCommandAsync only handles control command kinds; received {kind}.");
        }

        return commandEnvelope;
    }

    /// <summary>
    /// Writes a malformed payload directly to the worker stream, bypassing the frame
    /// writer: a 4-byte little-endian length prefix followed by the raw bytes.
    /// </summary>
    /// <param name="payload">Malformed payload bytes to write.</param>
    /// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
    /// <exception cref="ArgumentException">
    /// Thrown when <paramref name="payload"/> is empty.
    /// </exception>
    public async Task WriteMalformedPayloadAsync(
        ReadOnlyMemory<byte> payload,
        CancellationToken cancellationToken = default)
    {
        if (payload.IsEmpty)
        {
            throw new ArgumentException("Malformed payload must include at least one byte.", nameof(payload));
        }

        byte[] lengthPrefix = new byte[sizeof(uint)];
        BinaryPrimitives.WriteUInt32LittleEndian(lengthPrefix, (uint)payload.Length);
        await _workerStream.WriteAsync(lengthPrefix, cancellationToken).ConfigureAwait(false);
        await _workerStream.WriteAsync(payload, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Writes an oversized frame header to the worker stream for testing frame size limits.
    /// Only the 4-byte little-endian length prefix is written; no payload follows.
    /// </summary>
    /// <param name="payloadLength">Length of the oversized payload in bytes.</param>
    /// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// Thrown when <paramref name="payloadLength"/> does not exceed the configured maximum message size.
    /// </exception>
    public async Task WriteOversizedFrameHeaderAsync(
        uint payloadLength,
        CancellationToken cancellationToken = default)
    {
        if (payloadLength <= _frameOptions.MaxMessageBytes)
        {
            throw new ArgumentOutOfRangeException(
                nameof(payloadLength),
                payloadLength,
                "Payload length must exceed the configured maximum.");
        }

        byte[] lengthPrefix = new byte[sizeof(uint)];
        BinaryPrimitives.WriteUInt32LittleEndian(lengthPrefix, payloadLength);
        await _workerStream.WriteAsync(lengthPrefix, cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Disposes the worker-side stream. Calls after the first successful one are no-ops.
    /// </summary>
    public async ValueTask DisposeWorkerSideAsync()
    {
        if (_workerSideDisposed)
        {
            return;
        }

        await _workerStream.DisposeAsync().ConfigureAwait(false);
        _workerSideDisposed = true;
    }

    /// <inheritdoc />
    public async ValueTask DisposeAsync()
    {
        try
        {
            await DisposeWorkerSideAsync().ConfigureAwait(false);
        }
        finally
        {
            // Release the gateway end even if closing the worker end failed.
            if (_gatewayStream is not null)
            {
                await _gatewayStream.DisposeAsync().ConfigureAwait(false);
            }
        }
    }

    // Builds an envelope stamped with the protocol version, session ID and the next
    // sequence number, then lets the caller populate the body.
    private WorkerEnvelope CreateEnvelope(
        string correlationId,
        Action<WorkerEnvelope> setBody)
    {
        WorkerEnvelope envelope = new()
        {
            ProtocolVersion = _frameOptions.ProtocolVersion,
            SessionId = SessionId,
            Sequence = AdvanceSequence(),
            CorrelationId = correlationId,
        };
        setBody(envelope);
        return envelope;
    }

    // Pre-increments the counter and returns the new value, so the first envelope gets 1.
    private ulong AdvanceSequence()
    {
        return ++NextWorkerSequence;
    }

    // Creates an unconnected duplex, asynchronous client for a pipe on the local machine.
    private static NamedPipeClientStream CreateWorkerStream(string pipeName)
    {
        return new NamedPipeClientStream(
            ".",
            pipeName,
            PipeDirection.InOut,
            PipeOptions.Asynchronous);
    }
}