Files
mxaccessgw/src/MxGateway.Worker/Ipc/WorkerFrameReader.cs
T
2026-04-26 17:16:49 -04:00

94 lines
3.0 KiB
C#

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf;
using MxGateway.Contracts.Proto;
namespace MxGateway.Worker.Ipc;
/// <summary>
/// Reads length-prefixed <see cref="WorkerEnvelope"/> frames from a stream.
/// </summary>
/// <remarks>
/// A frame consists of a 4-byte little-endian unsigned payload length followed by
/// exactly that many bytes, which are parsed as a protobuf <see cref="WorkerEnvelope"/>.
/// </remarks>
public sealed class WorkerFrameReader
{
    /// <summary>Size, in bytes, of the length prefix that precedes every payload.</summary>
    private const int LengthPrefixSize = sizeof(uint);

    private readonly Stream _stream;
    private readonly WorkerFrameProtocolOptions _options;

    /// <summary>
    /// Creates a reader over <paramref name="stream"/> using the supplied protocol options.
    /// </summary>
    /// <param name="stream">Stream the frames are read from.</param>
    /// <param name="options">Protocol options; <c>MaxMessageBytes</c> caps the accepted payload length.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="stream"/> or <paramref name="options"/> is <c>null</c>.
    /// </exception>
    public WorkerFrameReader(
        Stream stream,
        WorkerFrameProtocolOptions options)
    {
        if (stream is null)
        {
            throw new ArgumentNullException(nameof(stream));
        }

        if (options is null)
        {
            throw new ArgumentNullException(nameof(options));
        }

        _stream = stream;
        _options = options;
    }

    /// <summary>
    /// Reads one complete frame from the stream and returns the envelope it carries.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to the underlying stream reads.</param>
    /// <returns>
    /// The parsed envelope, after it has been passed to <c>WorkerEnvelopeValidator.Validate</c>.
    /// </returns>
    /// <exception cref="WorkerFrameProtocolException">
    /// The declared length is zero (<c>MalformedLength</c>), the declared length exceeds the
    /// configured maximum (<c>MessageTooLarge</c>), the stream ends before the frame is
    /// complete (<c>EndOfStream</c>), or the payload is not a valid protobuf envelope
    /// (<c>InvalidEnvelope</c>).
    /// </exception>
    public async Task<WorkerEnvelope> ReadAsync(CancellationToken cancellationToken = default)
    {
        byte[] header = new byte[LengthPrefixSize];
        await FillOrThrowAsync(header, cancellationToken).ConfigureAwait(false);

        uint declaredLength = DecodeLittleEndianUInt32(header);

        // Reject bad lengths before allocating the payload buffer.
        EnsureLengthIsAcceptable(declaredLength);

        byte[] body = new byte[declaredLength];
        await FillOrThrowAsync(body, cancellationToken).ConfigureAwait(false);

        WorkerEnvelope parsed = ParseEnvelope(body);
        WorkerEnvelopeValidator.Validate(parsed, _options);
        return parsed;
    }

    /// <summary>
    /// Throws when the declared payload length is zero or larger than the configured maximum.
    /// </summary>
    private void EnsureLengthIsAcceptable(uint payloadLength)
    {
        if (payloadLength == 0)
        {
            throw new WorkerFrameProtocolException(
                WorkerFrameProtocolErrorCode.MalformedLength,
                "Worker frame payload length must be greater than zero.");
        }

        if (payloadLength > _options.MaxMessageBytes)
        {
            throw new WorkerFrameProtocolException(
                WorkerFrameProtocolErrorCode.MessageTooLarge,
                $"Worker frame payload length {payloadLength} exceeds the configured maximum of {_options.MaxMessageBytes} bytes.");
        }
    }

    /// <summary>
    /// Parses the payload as a <see cref="WorkerEnvelope"/>, wrapping protobuf parse
    /// failures in a <see cref="WorkerFrameProtocolException"/>.
    /// </summary>
    private static WorkerEnvelope ParseEnvelope(byte[] payload)
    {
        try
        {
            return WorkerEnvelope.Parser.ParseFrom(payload);
        }
        catch (InvalidProtocolBufferException parseFailure)
        {
            throw new WorkerFrameProtocolException(
                WorkerFrameProtocolErrorCode.InvalidEnvelope,
                "Worker frame payload is not a valid WorkerEnvelope protobuf message.",
                parseFailure);
        }
    }

    /// <summary>
    /// Decodes the first four bytes of <paramref name="buffer"/> as a little-endian
    /// unsigned 32-bit integer.
    /// </summary>
    private static uint DecodeLittleEndianUInt32(byte[] buffer)
    {
        uint value = 0;

        // Fold from the most significant byte (index 3) down to the least (index 0).
        for (int index = LengthPrefixSize - 1; index >= 0; index--)
        {
            value = (value << 8) | buffer[index];
        }

        return value;
    }

    /// <summary>
    /// Reads from the stream until <paramref name="buffer"/> is completely filled.
    /// </summary>
    /// <exception cref="WorkerFrameProtocolException">
    /// A read returned zero bytes before the buffer was full.
    /// </exception>
    private async Task FillOrThrowAsync(
        byte[] buffer,
        CancellationToken cancellationToken)
    {
        int remaining = buffer.Length;

        // A single read may return fewer bytes than requested, so keep going until done.
        while (remaining > 0)
        {
            int received = await _stream
                .ReadAsync(buffer, buffer.Length - remaining, remaining, cancellationToken)
                .ConfigureAwait(false);

            if (received == 0)
            {
                throw new WorkerFrameProtocolException(
                    WorkerFrameProtocolErrorCode.EndOfStream,
                    "Worker frame ended before the expected number of bytes were read.");
            }

            remaining -= received;
        }
    }
}