Issue #9: implement worker frame protocol

This commit is contained in:
Joseph Doherty
2026-04-26 16:20:16 -04:00
parent 03ab36c4d5
commit 7a7a9e4d11
9 changed files with 511 additions and 0 deletions
+54
View File
@@ -0,0 +1,54 @@
# Worker Frame Protocol
The gateway uses the worker frame protocol to move `WorkerEnvelope` protobuf
messages over a bidirectional named pipe. The frame layer is deliberately small:
it handles message boundaries, size limits, protobuf parsing, and envelope
validation before higher-level worker client code routes commands, replies,
events, and faults.
## Frame Format
Each frame starts with a four-byte little-endian unsigned payload length,
followed by the serialized `WorkerEnvelope` payload:
```text
uint32 little-endian payload_length
payload_length bytes protobuf WorkerEnvelope
```
The reader rejects zero-length payloads and payloads larger than the configured
maximum before allocating the payload buffer. The default maximum is 16 MiB,
matching the gateway process design.
## Envelope Validation
`WorkerFrameReader` and `WorkerFrameWriter` validate each envelope against the
owning session before returning or writing it:
- `protocol_version` must match the configured worker protocol version,
- `session_id` must match the owning gateway session,
- the envelope must contain one typed `body` value.
Protocol violations throw `WorkerFrameProtocolException` with a
`WorkerFrameProtocolErrorCode` so callers can distinguish malformed frames,
oversized frames, protocol version mismatches, and session mismatches.
## Verification
Run the focused tests after changing the frame protocol:
```bash
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter WorkerFrameProtocolTests
```
Run the gateway build because the frame protocol is part of
`MxGateway.Server`:
```bash
dotnet build src/MxGateway.Server/MxGateway.Server.csproj
```
## Related Documentation
- [Gateway Process Detailed Design](./gateway-process-design.md)
- [Protobuf Contracts](./Contracts.md)
+2
View File
@@ -45,6 +45,8 @@ Detailed follow-up docs:
- `docs/gateway-process-design.md` covers the .NET 10 gateway process,
session manager, worker supervision, gRPC API, event streaming, fault model,
security, observability, and test strategy.
- `docs/WorkerFrameProtocol.md` covers the gateway-side named-pipe frame
reader/writer and `WorkerEnvelope` validation rules.
- `docs/mxaccess-worker-instance-design.md` covers each .NET Framework 4.8 x86
MXAccess worker instance, including STA ownership, message pumping, COM
lifetime, command dispatch, event sinks, conversion, and shutdown.
@@ -0,0 +1,32 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Workers;
internal static class WorkerEnvelopeValidator
{
public static void Validate(
WorkerEnvelope envelope,
WorkerFrameProtocolOptions options)
{
if (envelope.ProtocolVersion != options.ProtocolVersion)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.ProtocolVersionMismatch,
$"Worker envelope protocol version {envelope.ProtocolVersion} does not match expected version {options.ProtocolVersion}.");
}
if (!string.Equals(envelope.SessionId, options.SessionId, StringComparison.Ordinal))
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.SessionMismatch,
"Worker envelope session id does not match the owning gateway session.");
}
if (envelope.BodyCase == WorkerEnvelope.BodyOneofCase.None)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.InvalidEnvelope,
"Worker envelope must include a typed body.");
}
}
}
@@ -0,0 +1,13 @@
namespace MxGateway.Server.Workers;
public enum WorkerFrameProtocolErrorCode
{
Unknown = 0,
InvalidConfiguration = 1,
EndOfStream = 2,
MalformedLength = 3,
MessageTooLarge = 4,
InvalidEnvelope = 5,
ProtocolVersionMismatch = 6,
SessionMismatch = 7,
}
@@ -0,0 +1,23 @@
namespace MxGateway.Server.Workers;
public sealed class WorkerFrameProtocolException : Exception
{
public WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode errorCode,
string message)
: base(message)
{
ErrorCode = errorCode;
}
public WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode errorCode,
string message,
Exception innerException)
: base(message, innerException)
{
ErrorCode = errorCode;
}
public WorkerFrameProtocolErrorCode ErrorCode { get; }
}
@@ -0,0 +1,53 @@
using MxGateway.Contracts;
namespace MxGateway.Server.Workers;
public sealed class WorkerFrameProtocolOptions
{
public const int DefaultMaxMessageBytes = 16 * 1024 * 1024;
public WorkerFrameProtocolOptions(string sessionId)
: this(
sessionId,
GatewayContractInfo.WorkerProtocolVersion,
DefaultMaxMessageBytes)
{
}
public WorkerFrameProtocolOptions(
string sessionId,
uint protocolVersion,
int maxMessageBytes)
{
if (string.IsNullOrWhiteSpace(sessionId))
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.InvalidConfiguration,
"Worker frame protocol requires a session id.");
}
if (protocolVersion == 0)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.InvalidConfiguration,
"Worker frame protocol requires a non-zero protocol version.");
}
if (maxMessageBytes <= 0)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.InvalidConfiguration,
"Worker frame protocol max message size must be greater than zero.");
}
SessionId = sessionId;
ProtocolVersion = protocolVersion;
MaxMessageBytes = maxMessageBytes;
}
public string SessionId { get; }
public uint ProtocolVersion { get; }
public int MaxMessageBytes { get; }
}
@@ -0,0 +1,77 @@
using System.Buffers.Binary;
using Google.Protobuf;
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Workers;
public sealed class WorkerFrameReader
{
private readonly WorkerFrameProtocolOptions _options;
private readonly Stream _stream;
public WorkerFrameReader(
Stream stream,
WorkerFrameProtocolOptions options)
{
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
_options = options ?? throw new ArgumentNullException(nameof(options));
}
public async ValueTask<WorkerEnvelope> ReadAsync(CancellationToken cancellationToken = default)
{
byte[] lengthPrefix = new byte[sizeof(uint)];
await ReadExactlyOrThrowAsync(lengthPrefix, cancellationToken).ConfigureAwait(false);
uint payloadLength = BinaryPrimitives.ReadUInt32LittleEndian(lengthPrefix);
if (payloadLength == 0)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.MalformedLength,
"Worker frame payload length must be greater than zero.");
}
if (payloadLength > _options.MaxMessageBytes)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.MessageTooLarge,
$"Worker frame payload length {payloadLength} exceeds the configured maximum of {_options.MaxMessageBytes} bytes.");
}
byte[] payload = new byte[payloadLength];
await ReadExactlyOrThrowAsync(payload, cancellationToken).ConfigureAwait(false);
WorkerEnvelope envelope;
try
{
envelope = WorkerEnvelope.Parser.ParseFrom(payload);
}
catch (InvalidProtocolBufferException exception)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.InvalidEnvelope,
"Worker frame payload is not a valid WorkerEnvelope protobuf message.",
exception);
}
WorkerEnvelopeValidator.Validate(envelope, _options);
return envelope;
}
private async ValueTask ReadExactlyOrThrowAsync(
Memory<byte> buffer,
CancellationToken cancellationToken)
{
try
{
await _stream.ReadExactlyAsync(buffer, cancellationToken).ConfigureAwait(false);
}
catch (EndOfStreamException exception)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.EndOfStream,
"Worker frame ended before the expected number of bytes were read.",
exception);
}
}
}
@@ -0,0 +1,48 @@
using System.Buffers.Binary;
using Google.Protobuf;
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Workers;
public sealed class WorkerFrameWriter
{
private readonly WorkerFrameProtocolOptions _options;
private readonly Stream _stream;
public WorkerFrameWriter(
Stream stream,
WorkerFrameProtocolOptions options)
{
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
_options = options ?? throw new ArgumentNullException(nameof(options));
}
public async ValueTask WriteAsync(
WorkerEnvelope envelope,
CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(envelope);
WorkerEnvelopeValidator.Validate(envelope, _options);
int payloadLength = envelope.CalculateSize();
if (payloadLength == 0)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.InvalidEnvelope,
"Worker envelope cannot serialize to an empty payload.");
}
if (payloadLength > _options.MaxMessageBytes)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.MessageTooLarge,
$"Worker envelope payload length {payloadLength} exceeds the configured maximum of {_options.MaxMessageBytes} bytes.");
}
byte[] lengthPrefix = new byte[sizeof(uint)];
BinaryPrimitives.WriteUInt32LittleEndian(lengthPrefix, (uint)payloadLength);
await _stream.WriteAsync(lengthPrefix, cancellationToken).ConfigureAwait(false);
await _stream.WriteAsync(envelope.ToByteArray(), cancellationToken).ConfigureAwait(false);
}
}
@@ -0,0 +1,209 @@
using System.Buffers.Binary;
using Google.Protobuf;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Workers;
namespace MxGateway.Tests.Gateway.Workers;
public sealed class WorkerFrameProtocolTests
{
private const string SessionId = "session-1";
[Fact]
public async Task WriteAndReadAsync_WithValidEnvelope_RoundTripsFrame()
{
WorkerFrameProtocolOptions options = new(SessionId);
await using MemoryStream stream = new();
WorkerEnvelope original = CreateEnvelope();
WorkerFrameWriter writer = new(stream, options);
await writer.WriteAsync(original);
stream.Position = 0;
WorkerFrameReader reader = new(stream, options);
WorkerEnvelope parsed = await reader.ReadAsync();
Assert.Equal(original, parsed);
}
[Fact]
public async Task ReadAsync_WithPartialReads_ReassemblesFrame()
{
WorkerFrameProtocolOptions options = new(SessionId);
WorkerEnvelope original = CreateEnvelope();
byte[] frame = CreateFrame(original);
await using ChunkedReadStream stream = new(frame, chunkSize: 2);
WorkerFrameReader reader = new(stream, options);
WorkerEnvelope parsed = await reader.ReadAsync();
Assert.Equal(original, parsed);
Assert.True(stream.ReadCallCount > 2);
}
[Fact]
public async Task ReadAsync_WithZeroLengthFrame_ThrowsMalformedLength()
{
WorkerFrameProtocolOptions options = new(SessionId);
await using MemoryStream stream = new(new byte[sizeof(uint)]);
WorkerFrameReader reader = new(stream, options);
WorkerFrameProtocolException exception =
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
async () => await reader.ReadAsync());
Assert.Equal(WorkerFrameProtocolErrorCode.MalformedLength, exception.ErrorCode);
}
[Fact]
public async Task ReadAsync_WithOversizedLength_ThrowsBeforePayloadAllocation()
{
WorkerFrameProtocolOptions options = new(SessionId, GatewayContractInfo.WorkerProtocolVersion, maxMessageBytes: 16);
byte[] lengthPrefix = new byte[sizeof(uint)];
BinaryPrimitives.WriteUInt32LittleEndian(lengthPrefix, 17);
await using MemoryStream stream = new(lengthPrefix);
WorkerFrameReader reader = new(stream, options);
WorkerFrameProtocolException exception =
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
async () => await reader.ReadAsync());
Assert.Equal(WorkerFrameProtocolErrorCode.MessageTooLarge, exception.ErrorCode);
}
[Fact]
public async Task ReadAsync_WithWrongProtocolVersion_ThrowsProtocolVersionMismatch()
{
WorkerFrameProtocolOptions options = new(SessionId);
WorkerEnvelope envelope = CreateEnvelope();
envelope.ProtocolVersion++;
await using MemoryStream stream = new(CreateFrame(envelope));
WorkerFrameReader reader = new(stream, options);
WorkerFrameProtocolException exception =
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
async () => await reader.ReadAsync());
Assert.Equal(WorkerFrameProtocolErrorCode.ProtocolVersionMismatch, exception.ErrorCode);
}
[Fact]
public async Task ReadAsync_WithWrongSessionId_ThrowsSessionMismatch()
{
WorkerFrameProtocolOptions options = new(SessionId);
WorkerEnvelope envelope = CreateEnvelope();
envelope.SessionId = "different-session";
await using MemoryStream stream = new(CreateFrame(envelope));
WorkerFrameReader reader = new(stream, options);
WorkerFrameProtocolException exception =
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
async () => await reader.ReadAsync());
Assert.Equal(WorkerFrameProtocolErrorCode.SessionMismatch, exception.ErrorCode);
}
[Fact]
public async Task ReadAsync_WithMalformedPayload_ThrowsInvalidEnvelope()
{
WorkerFrameProtocolOptions options = new(SessionId);
byte[] frame = CreateFrame([0x80]);
await using MemoryStream stream = new(frame);
WorkerFrameReader reader = new(stream, options);
WorkerFrameProtocolException exception =
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
async () => await reader.ReadAsync());
Assert.Equal(WorkerFrameProtocolErrorCode.InvalidEnvelope, exception.ErrorCode);
}
[Fact]
public async Task ReadAsync_WithMissingEnvelopeBody_ThrowsInvalidEnvelope()
{
WorkerFrameProtocolOptions options = new(SessionId);
WorkerEnvelope envelope = CreateEnvelope();
envelope.ClearBody();
await using MemoryStream stream = new(CreateFrame(envelope));
WorkerFrameReader reader = new(stream, options);
WorkerFrameProtocolException exception =
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
async () => await reader.ReadAsync());
Assert.Equal(WorkerFrameProtocolErrorCode.InvalidEnvelope, exception.ErrorCode);
}
[Fact]
public async Task WriteAsync_WithOversizedEnvelope_ThrowsMessageTooLarge()
{
WorkerFrameProtocolOptions options = new(SessionId, GatewayContractInfo.WorkerProtocolVersion, maxMessageBytes: 8);
await using MemoryStream stream = new();
WorkerFrameWriter writer = new(stream, options);
WorkerFrameProtocolException exception =
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
async () => await writer.WriteAsync(CreateEnvelope()));
Assert.Equal(WorkerFrameProtocolErrorCode.MessageTooLarge, exception.ErrorCode);
Assert.Equal(0, stream.Length);
}
private static WorkerEnvelope CreateEnvelope()
{
return new WorkerEnvelope
{
ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
SessionId = SessionId,
Sequence = 1,
CorrelationId = "correlation-1",
WorkerHello = new WorkerHello
{
ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
Nonce = "nonce",
WorkerProcessId = 1234,
WorkerVersion = "test-worker",
},
};
}
private static byte[] CreateFrame(IMessage message)
{
return CreateFrame(message.ToByteArray());
}
private static byte[] CreateFrame(byte[] payload)
{
byte[] frame = new byte[sizeof(uint) + payload.Length];
BinaryPrimitives.WriteUInt32LittleEndian(frame.AsSpan(0, sizeof(uint)), (uint)payload.Length);
payload.CopyTo(frame.AsSpan(sizeof(uint)));
return frame;
}
private sealed class ChunkedReadStream : MemoryStream
{
private readonly int _chunkSize;
public ChunkedReadStream(
byte[] buffer,
int chunkSize)
: base(buffer)
{
_chunkSize = chunkSize;
}
public int ReadCallCount { get; private set; }
public override ValueTask<int> ReadAsync(
Memory<byte> buffer,
CancellationToken cancellationToken = default)
{
ReadCallCount++;
int requestedCount = Math.Min(buffer.Length, _chunkSize);
return base.ReadAsync(buffer[..requestedCount], cancellationToken);
}
}
}