diff --git a/docs/WorkerFrameProtocol.md b/docs/WorkerFrameProtocol.md new file mode 100644 index 0000000..7a14a12 --- /dev/null +++ b/docs/WorkerFrameProtocol.md @@ -0,0 +1,54 @@ +# Worker Frame Protocol + +The gateway uses the worker frame protocol to move `WorkerEnvelope` protobuf +messages over a bidirectional named pipe. The frame layer is deliberately small: +it handles message boundaries, size limits, protobuf parsing, and envelope +validation before higher-level worker client code routes commands, replies, +events, and faults. + +## Frame Format + +Each frame starts with a four-byte little-endian unsigned payload length, +followed by the serialized `WorkerEnvelope` payload: + +```text +uint32 little-endian payload_length +payload_length bytes protobuf WorkerEnvelope +``` + +The reader rejects zero-length payloads and payloads larger than the configured +maximum before allocating the payload buffer. The default maximum is 16 MiB, +matching the gateway process design. + +## Envelope Validation + +`WorkerFrameReader` and `WorkerFrameWriter` validate each envelope against the +owning session before returning or writing it: + +- `protocol_version` must match the configured worker protocol version, +- `session_id` must match the owning gateway session, +- the envelope must contain one typed `body` value. + +Protocol violations throw `WorkerFrameProtocolException` with a +`WorkerFrameProtocolErrorCode` so callers can distinguish malformed frames, +oversized frames, protocol version mismatches, and session mismatches. + +## Verification + +Run the focused tests after changing the frame protocol: + +```bash +dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter WorkerFrameProtocolTests +``` + +Run the gateway build because the frame protocol is part of +`MxGateway.Server`: + +```bash +dotnet build src/MxGateway.Server/MxGateway.Server.csproj +``` + +## Related Documentation + +- [Gateway Process Detailed Design](./gateway-process-design.md) +- [Protobuf Contracts](./Contracts.md) diff --git a/gateway.md b/gateway.md index 54778b3..8498995 100644 --- a/gateway.md +++ b/gateway.md @@ -45,6 +45,8 @@ Detailed follow-up docs: - `docs/gateway-process-design.md` covers the .NET 10 gateway process, session manager, worker supervision, gRPC API, event streaming, fault model, security, observability, and test strategy. +- `docs/WorkerFrameProtocol.md` covers the gateway-side named-pipe frame + reader/writer and `WorkerEnvelope` validation rules. - `docs/mxaccess-worker-instance-design.md` covers each .NET Framework 4.8 x86 MXAccess worker instance, including STA ownership, message pumping, COM lifetime, command dispatch, event sinks, conversion, and shutdown. diff --git a/src/MxGateway.Server/Workers/WorkerEnvelopeValidator.cs b/src/MxGateway.Server/Workers/WorkerEnvelopeValidator.cs new file mode 100644 index 0000000..9b2b4e0 --- /dev/null +++ b/src/MxGateway.Server/Workers/WorkerEnvelopeValidator.cs @@ -0,0 +1,32 @@ +using MxGateway.Contracts.Proto; + +namespace MxGateway.Server.Workers; + +internal static class WorkerEnvelopeValidator +{ + public static void Validate( + WorkerEnvelope envelope, + WorkerFrameProtocolOptions options) + { + if (envelope.ProtocolVersion != options.ProtocolVersion) + { + throw new WorkerFrameProtocolException( + WorkerFrameProtocolErrorCode.ProtocolVersionMismatch, + $"Worker envelope protocol version {envelope.ProtocolVersion} does not match expected version {options.ProtocolVersion}."); + } + + if (!string.Equals(envelope.SessionId, options.SessionId, StringComparison.Ordinal)) + { + throw new WorkerFrameProtocolException( + WorkerFrameProtocolErrorCode.SessionMismatch, + "Worker envelope session id does not match the owning gateway session."); + } + + if (envelope.BodyCase == WorkerEnvelope.BodyOneofCase.None) + { + throw new WorkerFrameProtocolException( + WorkerFrameProtocolErrorCode.InvalidEnvelope, + "Worker envelope must include a typed body."); + } + } +} diff --git a/src/MxGateway.Server/Workers/WorkerFrameProtocolErrorCode.cs b/src/MxGateway.Server/Workers/WorkerFrameProtocolErrorCode.cs new file mode 100644 index 0000000..904dc29 --- /dev/null +++ b/src/MxGateway.Server/Workers/WorkerFrameProtocolErrorCode.cs @@ -0,0 +1,13 @@ +namespace MxGateway.Server.Workers; + +public enum WorkerFrameProtocolErrorCode +{ + Unknown = 0, + InvalidConfiguration = 1, + EndOfStream = 2, + MalformedLength = 3, + MessageTooLarge = 4, + InvalidEnvelope = 5, + ProtocolVersionMismatch = 6, + SessionMismatch = 7, +} diff --git a/src/MxGateway.Server/Workers/WorkerFrameProtocolException.cs b/src/MxGateway.Server/Workers/WorkerFrameProtocolException.cs new file mode 100644 index 0000000..0dbfece --- /dev/null +++ b/src/MxGateway.Server/Workers/WorkerFrameProtocolException.cs @@ -0,0 +1,23 @@ +namespace MxGateway.Server.Workers; + +public sealed class WorkerFrameProtocolException : Exception +{ + public WorkerFrameProtocolException( + WorkerFrameProtocolErrorCode errorCode, + string message) + : base(message) + { + ErrorCode = errorCode; + } + + public WorkerFrameProtocolException( + WorkerFrameProtocolErrorCode errorCode, + string message, + Exception innerException) + : base(message, innerException) + { + ErrorCode = errorCode; + } + + public WorkerFrameProtocolErrorCode ErrorCode { get; } +} diff --git a/src/MxGateway.Server/Workers/WorkerFrameProtocolOptions.cs b/src/MxGateway.Server/Workers/WorkerFrameProtocolOptions.cs new file mode 100644 index 0000000..f7034d2 --- /dev/null +++ b/src/MxGateway.Server/Workers/WorkerFrameProtocolOptions.cs @@ -0,0 +1,53 @@ +using MxGateway.Contracts; + +namespace MxGateway.Server.Workers; + +public sealed class WorkerFrameProtocolOptions +{ + public const int DefaultMaxMessageBytes = 16 * 1024 * 1024; + + public WorkerFrameProtocolOptions(string sessionId) + : this( + sessionId, + GatewayContractInfo.WorkerProtocolVersion, + DefaultMaxMessageBytes) + { + } + + public WorkerFrameProtocolOptions( + string sessionId, + uint protocolVersion, + int maxMessageBytes) + { + if (string.IsNullOrWhiteSpace(sessionId)) + { + throw new WorkerFrameProtocolException( + WorkerFrameProtocolErrorCode.InvalidConfiguration, + "Worker frame protocol requires a session id."); + } + + if (protocolVersion == 0) + { + throw new WorkerFrameProtocolException( + WorkerFrameProtocolErrorCode.InvalidConfiguration, + "Worker frame protocol requires a non-zero protocol version."); + } + + if (maxMessageBytes <= 0) + { + throw new WorkerFrameProtocolException( + WorkerFrameProtocolErrorCode.InvalidConfiguration, + "Worker frame protocol max message size must be greater than zero."); + } + + SessionId = sessionId; + ProtocolVersion = protocolVersion; + MaxMessageBytes = maxMessageBytes; + } + + public string SessionId { get; } + + public uint ProtocolVersion { get; } + + public int MaxMessageBytes { get; } +} diff --git a/src/MxGateway.Server/Workers/WorkerFrameReader.cs b/src/MxGateway.Server/Workers/WorkerFrameReader.cs new file mode 100644 index 0000000..360d515 --- /dev/null +++ b/src/MxGateway.Server/Workers/WorkerFrameReader.cs @@ -0,0 +1,77 @@ +using System.Buffers.Binary; +using Google.Protobuf; +using MxGateway.Contracts.Proto; + +namespace MxGateway.Server.Workers; + +public sealed class WorkerFrameReader +{ + private readonly WorkerFrameProtocolOptions _options; + private readonly Stream _stream; + + public WorkerFrameReader( + Stream stream, + WorkerFrameProtocolOptions options) + { + _stream = stream ?? throw new ArgumentNullException(nameof(stream)); + _options = options ?? throw new ArgumentNullException(nameof(options)); + } + + public async ValueTask ReadAsync(CancellationToken cancellationToken = default) + { + byte[] lengthPrefix = new byte[sizeof(uint)]; + await ReadExactlyOrThrowAsync(lengthPrefix, cancellationToken).ConfigureAwait(false); + + uint payloadLength = BinaryPrimitives.ReadUInt32LittleEndian(lengthPrefix); + if (payloadLength == 0) + { + throw new WorkerFrameProtocolException( + WorkerFrameProtocolErrorCode.MalformedLength, + "Worker frame payload length must be greater than zero."); + } + + if (payloadLength > _options.MaxMessageBytes) + { + throw new WorkerFrameProtocolException( + WorkerFrameProtocolErrorCode.MessageTooLarge, + $"Worker frame payload length {payloadLength} exceeds the configured maximum of {_options.MaxMessageBytes} bytes."); + } + + byte[] payload = new byte[payloadLength]; + await ReadExactlyOrThrowAsync(payload, cancellationToken).ConfigureAwait(false); + + WorkerEnvelope envelope; + try + { + envelope = WorkerEnvelope.Parser.ParseFrom(payload); + } + catch (InvalidProtocolBufferException exception) + { + throw new WorkerFrameProtocolException( + WorkerFrameProtocolErrorCode.InvalidEnvelope, + "Worker frame payload is not a valid WorkerEnvelope protobuf message.", + exception); + } + + WorkerEnvelopeValidator.Validate(envelope, _options); + + return envelope; + } + + private async ValueTask ReadExactlyOrThrowAsync( + Memory buffer, + CancellationToken cancellationToken) + { + try + { + await _stream.ReadExactlyAsync(buffer, cancellationToken).ConfigureAwait(false); + } + catch (EndOfStreamException exception) + { + throw new WorkerFrameProtocolException( + WorkerFrameProtocolErrorCode.EndOfStream, + "Worker frame ended before the expected number of bytes were read.", + exception); + } + } +} diff --git a/src/MxGateway.Server/Workers/WorkerFrameWriter.cs b/src/MxGateway.Server/Workers/WorkerFrameWriter.cs new file mode 100644 index 0000000..bf18959 --- /dev/null +++ b/src/MxGateway.Server/Workers/WorkerFrameWriter.cs @@ -0,0 +1,48 @@ +using System.Buffers.Binary; +using Google.Protobuf; +using MxGateway.Contracts.Proto; + +namespace MxGateway.Server.Workers; + +public sealed class WorkerFrameWriter +{ + private readonly WorkerFrameProtocolOptions _options; + private readonly Stream _stream; + + public WorkerFrameWriter( + Stream stream, + WorkerFrameProtocolOptions options) + { + _stream = stream ?? throw new ArgumentNullException(nameof(stream)); + _options = options ?? throw new ArgumentNullException(nameof(options)); + } + + public async ValueTask WriteAsync( + WorkerEnvelope envelope, + CancellationToken cancellationToken = default) + { + ArgumentNullException.ThrowIfNull(envelope); + WorkerEnvelopeValidator.Validate(envelope, _options); + + int payloadLength = envelope.CalculateSize(); + if (payloadLength == 0) + { + throw new WorkerFrameProtocolException( + WorkerFrameProtocolErrorCode.InvalidEnvelope, + "Worker envelope cannot serialize to an empty payload."); + } + + if (payloadLength > _options.MaxMessageBytes) + { + throw new WorkerFrameProtocolException( + WorkerFrameProtocolErrorCode.MessageTooLarge, + $"Worker envelope payload length {payloadLength} exceeds the configured maximum of {_options.MaxMessageBytes} bytes."); + } + + byte[] lengthPrefix = new byte[sizeof(uint)]; + BinaryPrimitives.WriteUInt32LittleEndian(lengthPrefix, (uint)payloadLength); + + await _stream.WriteAsync(lengthPrefix, cancellationToken).ConfigureAwait(false); + await _stream.WriteAsync(envelope.ToByteArray(), cancellationToken).ConfigureAwait(false); + } +} diff --git a/src/MxGateway.Tests/Gateway/Workers/WorkerFrameProtocolTests.cs b/src/MxGateway.Tests/Gateway/Workers/WorkerFrameProtocolTests.cs new file mode 100644 index 0000000..3122404 --- /dev/null +++ b/src/MxGateway.Tests/Gateway/Workers/WorkerFrameProtocolTests.cs @@ -0,0 +1,209 @@ +using System.Buffers.Binary; +using Google.Protobuf; +using MxGateway.Contracts; +using MxGateway.Contracts.Proto; +using MxGateway.Server.Workers; + +namespace MxGateway.Tests.Gateway.Workers; + +public sealed class WorkerFrameProtocolTests +{ + private const string SessionId = "session-1"; + + [Fact] + public async Task WriteAndReadAsync_WithValidEnvelope_RoundTripsFrame() + { + WorkerFrameProtocolOptions options = new(SessionId); + await using MemoryStream stream = new(); + WorkerEnvelope original = CreateEnvelope(); + + WorkerFrameWriter writer = new(stream, options); + await writer.WriteAsync(original); + stream.Position = 0; + + WorkerFrameReader reader = new(stream, options); + WorkerEnvelope parsed = await reader.ReadAsync(); + + Assert.Equal(original, parsed); + } + + [Fact] + public async Task ReadAsync_WithPartialReads_ReassemblesFrame() + { + WorkerFrameProtocolOptions options = new(SessionId); + WorkerEnvelope original = CreateEnvelope(); + byte[] frame = CreateFrame(original); + await using ChunkedReadStream stream = new(frame, chunkSize: 2); + + WorkerFrameReader reader = new(stream, options); + WorkerEnvelope parsed = await reader.ReadAsync(); + + Assert.Equal(original, parsed); + Assert.True(stream.ReadCallCount > 2); + } + + [Fact] + public async Task ReadAsync_WithZeroLengthFrame_ThrowsMalformedLength() + { + WorkerFrameProtocolOptions options = new(SessionId); + await using MemoryStream stream = new(new byte[sizeof(uint)]); + + WorkerFrameReader reader = new(stream, options); + WorkerFrameProtocolException exception = + await Assert.ThrowsAsync( + async () => await reader.ReadAsync()); + + Assert.Equal(WorkerFrameProtocolErrorCode.MalformedLength, exception.ErrorCode); + } + + [Fact] + public async Task ReadAsync_WithOversizedLength_ThrowsBeforePayloadAllocation() + { + WorkerFrameProtocolOptions options = new(SessionId, GatewayContractInfo.WorkerProtocolVersion, maxMessageBytes: 16); + byte[] lengthPrefix = new byte[sizeof(uint)]; + BinaryPrimitives.WriteUInt32LittleEndian(lengthPrefix, 17); + await using MemoryStream stream = new(lengthPrefix); + + WorkerFrameReader reader = new(stream, options); + WorkerFrameProtocolException exception = + await Assert.ThrowsAsync( + async () => await reader.ReadAsync()); + + Assert.Equal(WorkerFrameProtocolErrorCode.MessageTooLarge, exception.ErrorCode); + } + + [Fact] + public async Task ReadAsync_WithWrongProtocolVersion_ThrowsProtocolVersionMismatch() + { + WorkerFrameProtocolOptions options = new(SessionId); + WorkerEnvelope envelope = CreateEnvelope(); + envelope.ProtocolVersion++; + await using MemoryStream stream = new(CreateFrame(envelope)); + + WorkerFrameReader reader = new(stream, options); + WorkerFrameProtocolException exception = + await Assert.ThrowsAsync( + async () => await reader.ReadAsync()); + + Assert.Equal(WorkerFrameProtocolErrorCode.ProtocolVersionMismatch, exception.ErrorCode); + } + + [Fact] + public async Task ReadAsync_WithWrongSessionId_ThrowsSessionMismatch() + { + WorkerFrameProtocolOptions options = new(SessionId); + WorkerEnvelope envelope = CreateEnvelope(); + envelope.SessionId = "different-session"; + await using MemoryStream stream = new(CreateFrame(envelope)); + + WorkerFrameReader reader = new(stream, options); + WorkerFrameProtocolException exception = + await Assert.ThrowsAsync( + async () => await reader.ReadAsync()); + + Assert.Equal(WorkerFrameProtocolErrorCode.SessionMismatch, exception.ErrorCode); + } + + [Fact] + public async Task ReadAsync_WithMalformedPayload_ThrowsInvalidEnvelope() + { + WorkerFrameProtocolOptions options = new(SessionId); + byte[] frame = CreateFrame([0x80]); + await using MemoryStream stream = new(frame); + + WorkerFrameReader reader = new(stream, options); + WorkerFrameProtocolException exception = + await Assert.ThrowsAsync( + async () => await reader.ReadAsync()); + + Assert.Equal(WorkerFrameProtocolErrorCode.InvalidEnvelope, exception.ErrorCode); + } + + [Fact] + public async Task ReadAsync_WithMissingEnvelopeBody_ThrowsInvalidEnvelope() + { + WorkerFrameProtocolOptions options = new(SessionId); + WorkerEnvelope envelope = CreateEnvelope(); + envelope.ClearBody(); + await using MemoryStream stream = new(CreateFrame(envelope)); + + WorkerFrameReader reader = new(stream, options); + WorkerFrameProtocolException exception = + await Assert.ThrowsAsync( + async () => await reader.ReadAsync()); + + Assert.Equal(WorkerFrameProtocolErrorCode.InvalidEnvelope, exception.ErrorCode); + } + + [Fact] + public async Task WriteAsync_WithOversizedEnvelope_ThrowsMessageTooLarge() + { + WorkerFrameProtocolOptions options = new(SessionId, GatewayContractInfo.WorkerProtocolVersion, maxMessageBytes: 8); + await using MemoryStream stream = new(); + + WorkerFrameWriter writer = new(stream, options); + WorkerFrameProtocolException exception = + await Assert.ThrowsAsync( + async () => await writer.WriteAsync(CreateEnvelope())); + + Assert.Equal(WorkerFrameProtocolErrorCode.MessageTooLarge, exception.ErrorCode); + Assert.Equal(0, stream.Length); + } + + private static WorkerEnvelope CreateEnvelope() + { + return new WorkerEnvelope + { + ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion, + SessionId = SessionId, + Sequence = 1, + CorrelationId = "correlation-1", + WorkerHello = new WorkerHello + { + ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion, + Nonce = "nonce", + WorkerProcessId = 1234, + WorkerVersion = "test-worker", + }, + }; + } + + private static byte[] CreateFrame(IMessage message) + { + return CreateFrame(message.ToByteArray()); + } + + private static byte[] CreateFrame(byte[] payload) + { + byte[] frame = new byte[sizeof(uint) + payload.Length]; + BinaryPrimitives.WriteUInt32LittleEndian(frame.AsSpan(0, sizeof(uint)), (uint)payload.Length); + payload.CopyTo(frame.AsSpan(sizeof(uint))); + + return frame; + } + + private sealed class ChunkedReadStream : MemoryStream + { + private readonly int _chunkSize; + + public ChunkedReadStream( + byte[] buffer, + int chunkSize) + : base(buffer) + { + _chunkSize = chunkSize; + } + + public int ReadCallCount { get; private set; } + + public override ValueTask ReadAsync( + Memory buffer, + CancellationToken cancellationToken = default) + { + ReadCallCount++; + int requestedCount = Math.Min(buffer.Length, _chunkSize); + + return base.ReadAsync(buffer[..requestedCount], cancellationToken); + } + } +}