using System; using System.Buffers; using System.IO; using System.Threading; using System.Threading.Tasks; using Google.Protobuf; using MxGateway.Contracts.Proto; namespace MxGateway.Worker.Ipc; /// Reads length-prefixed WorkerEnvelope protobuf frames from a stream. public sealed class WorkerFrameReader { private readonly WorkerFrameProtocolOptions _options; private readonly Stream _stream; /// Initializes the reader with a stream and protocol options. /// Stream to read frames from. /// Protocol options for frame validation. public WorkerFrameReader( Stream stream, WorkerFrameProtocolOptions options) { _stream = stream ?? throw new ArgumentNullException(nameof(stream)); _options = options ?? throw new ArgumentNullException(nameof(options)); } /// Reads and validates a single length-prefixed frame from the stream. /// Token to cancel the asynchronous operation. public async Task ReadAsync(CancellationToken cancellationToken = default) { byte[] lengthPrefix = new byte[sizeof(uint)]; await ReadExactlyOrThrowAsync(lengthPrefix, lengthPrefix.Length, cancellationToken).ConfigureAwait(false); uint payloadLength = ReadUInt32LittleEndian(lengthPrefix); if (payloadLength == 0) { throw new WorkerFrameProtocolException( WorkerFrameProtocolErrorCode.MalformedLength, "Worker frame payload length must be greater than zero."); } if (payloadLength > _options.MaxMessageBytes) { throw new WorkerFrameProtocolException( WorkerFrameProtocolErrorCode.MessageTooLarge, $"Worker frame payload length {payloadLength} exceeds the configured maximum of {_options.MaxMessageBytes} bytes."); } // Rent the payload buffer from the shared pool rather than allocating // a fresh byte[] per frame. ParseFrom copies whatever it needs into // the parsed message, so the rented buffer can be returned as soon as // parsing completes. int length = checked((int)payloadLength); byte[] payload = ArrayPool.Shared.Rent(length); WorkerEnvelope envelope; try { await ReadExactlyOrThrowAsync(payload, length, cancellationToken).ConfigureAwait(false); try { envelope = WorkerEnvelope.Parser.ParseFrom(payload, 0, length); } catch (InvalidProtocolBufferException exception) { throw new WorkerFrameProtocolException( WorkerFrameProtocolErrorCode.InvalidEnvelope, "Worker frame payload is not a valid WorkerEnvelope protobuf message.", exception); } } finally { ArrayPool.Shared.Return(payload); } WorkerEnvelopeValidator.Validate(envelope, _options); return envelope; } private static uint ReadUInt32LittleEndian(byte[] buffer) { return (uint)buffer[0] | ((uint)buffer[1] << 8) | ((uint)buffer[2] << 16) | ((uint)buffer[3] << 24); } private async Task ReadExactlyOrThrowAsync( byte[] buffer, int count, CancellationToken cancellationToken) { int offset = 0; while (offset < count) { int bytesRead = await _stream .ReadAsync(buffer, offset, count - offset, cancellationToken) .ConfigureAwait(false); if (bytesRead == 0) { throw new WorkerFrameProtocolException( WorkerFrameProtocolErrorCode.EndOfStream, "Worker frame ended before the expected number of bytes were read."); } offset += bytesRead; } } }