Issue #9: implement worker frame protocol
This commit is contained in:
@@ -0,0 +1,54 @@
|
|||||||
|
# Worker Frame Protocol
|
||||||
|
|
||||||
|
The gateway uses the worker frame protocol to move `WorkerEnvelope` protobuf
|
||||||
|
messages over a bidirectional named pipe. The frame layer is deliberately small:
|
||||||
|
it handles message boundaries, size limits, protobuf parsing, and envelope
|
||||||
|
validation before higher-level worker client code routes commands, replies,
|
||||||
|
events, and faults.
|
||||||
|
|
||||||
|
## Frame Format
|
||||||
|
|
||||||
|
Each frame starts with a four-byte little-endian unsigned payload length,
|
||||||
|
followed by the serialized `WorkerEnvelope` payload:
|
||||||
|
|
||||||
|
```text
|
||||||
|
uint32 little-endian payload_length
|
||||||
|
payload_length bytes protobuf WorkerEnvelope
|
||||||
|
```
|
||||||
|
|
||||||
|
The reader rejects zero-length payloads and payloads larger than the configured
|
||||||
|
maximum before allocating the payload buffer. The default maximum is 16 MiB,
|
||||||
|
matching the gateway process design.
|
||||||
|
|
||||||
|
## Envelope Validation
|
||||||
|
|
||||||
|
`WorkerFrameReader` and `WorkerFrameWriter` validate each envelope against the
|
||||||
|
owning session before returning or writing it:
|
||||||
|
|
||||||
|
- `protocol_version` must match the configured worker protocol version,
|
||||||
|
- `session_id` must match the owning gateway session,
|
||||||
|
- the envelope must contain one typed `body` value.
|
||||||
|
|
||||||
|
Protocol violations throw `WorkerFrameProtocolException` with a
|
||||||
|
`WorkerFrameProtocolErrorCode` so callers can distinguish malformed frames,
|
||||||
|
oversized frames, protocol version mismatches, and session mismatches.
|
||||||
|
|
||||||
|
## Verification
|
||||||
|
|
||||||
|
Run the focused tests after changing the frame protocol:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter WorkerFrameProtocolTests
|
||||||
|
```
|
||||||
|
|
||||||
|
Run the gateway build because the frame protocol is part of
|
||||||
|
`MxGateway.Server`:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
dotnet build src/MxGateway.Server/MxGateway.Server.csproj
|
||||||
|
```
|
||||||
|
|
||||||
|
## Related Documentation
|
||||||
|
|
||||||
|
- [Gateway Process Detailed Design](./gateway-process-design.md)
|
||||||
|
- [Protobuf Contracts](./Contracts.md)
|
||||||
@@ -45,6 +45,8 @@ Detailed follow-up docs:
|
|||||||
- `docs/gateway-process-design.md` covers the .NET 10 gateway process,
|
- `docs/gateway-process-design.md` covers the .NET 10 gateway process,
|
||||||
session manager, worker supervision, gRPC API, event streaming, fault model,
|
session manager, worker supervision, gRPC API, event streaming, fault model,
|
||||||
security, observability, and test strategy.
|
security, observability, and test strategy.
|
||||||
|
- `docs/WorkerFrameProtocol.md` covers the gateway-side named-pipe frame
|
||||||
|
reader/writer and `WorkerEnvelope` validation rules.
|
||||||
- `docs/mxaccess-worker-instance-design.md` covers each .NET Framework 4.8 x86
|
- `docs/mxaccess-worker-instance-design.md` covers each .NET Framework 4.8 x86
|
||||||
MXAccess worker instance, including STA ownership, message pumping, COM
|
MXAccess worker instance, including STA ownership, message pumping, COM
|
||||||
lifetime, command dispatch, event sinks, conversion, and shutdown.
|
lifetime, command dispatch, event sinks, conversion, and shutdown.
|
||||||
|
|||||||
@@ -0,0 +1,32 @@
|
|||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Workers;
|
||||||
|
|
||||||
|
internal static class WorkerEnvelopeValidator
|
||||||
|
{
|
||||||
|
public static void Validate(
|
||||||
|
WorkerEnvelope envelope,
|
||||||
|
WorkerFrameProtocolOptions options)
|
||||||
|
{
|
||||||
|
if (envelope.ProtocolVersion != options.ProtocolVersion)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.ProtocolVersionMismatch,
|
||||||
|
$"Worker envelope protocol version {envelope.ProtocolVersion} does not match expected version {options.ProtocolVersion}.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!string.Equals(envelope.SessionId, options.SessionId, StringComparison.Ordinal))
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.SessionMismatch,
|
||||||
|
"Worker envelope session id does not match the owning gateway session.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (envelope.BodyCase == WorkerEnvelope.BodyOneofCase.None)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.InvalidEnvelope,
|
||||||
|
"Worker envelope must include a typed body.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,13 @@
|
|||||||
|
namespace MxGateway.Server.Workers;
|
||||||
|
|
||||||
|
public enum WorkerFrameProtocolErrorCode
|
||||||
|
{
|
||||||
|
Unknown = 0,
|
||||||
|
InvalidConfiguration = 1,
|
||||||
|
EndOfStream = 2,
|
||||||
|
MalformedLength = 3,
|
||||||
|
MessageTooLarge = 4,
|
||||||
|
InvalidEnvelope = 5,
|
||||||
|
ProtocolVersionMismatch = 6,
|
||||||
|
SessionMismatch = 7,
|
||||||
|
}
|
||||||
@@ -0,0 +1,23 @@
|
|||||||
|
namespace MxGateway.Server.Workers;
|
||||||
|
|
||||||
|
public sealed class WorkerFrameProtocolException : Exception
|
||||||
|
{
|
||||||
|
public WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode errorCode,
|
||||||
|
string message)
|
||||||
|
: base(message)
|
||||||
|
{
|
||||||
|
ErrorCode = errorCode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode errorCode,
|
||||||
|
string message,
|
||||||
|
Exception innerException)
|
||||||
|
: base(message, innerException)
|
||||||
|
{
|
||||||
|
ErrorCode = errorCode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public WorkerFrameProtocolErrorCode ErrorCode { get; }
|
||||||
|
}
|
||||||
@@ -0,0 +1,53 @@
|
|||||||
|
using MxGateway.Contracts;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Workers;
|
||||||
|
|
||||||
|
public sealed class WorkerFrameProtocolOptions
|
||||||
|
{
|
||||||
|
public const int DefaultMaxMessageBytes = 16 * 1024 * 1024;
|
||||||
|
|
||||||
|
public WorkerFrameProtocolOptions(string sessionId)
|
||||||
|
: this(
|
||||||
|
sessionId,
|
||||||
|
GatewayContractInfo.WorkerProtocolVersion,
|
||||||
|
DefaultMaxMessageBytes)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
public WorkerFrameProtocolOptions(
|
||||||
|
string sessionId,
|
||||||
|
uint protocolVersion,
|
||||||
|
int maxMessageBytes)
|
||||||
|
{
|
||||||
|
if (string.IsNullOrWhiteSpace(sessionId))
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.InvalidConfiguration,
|
||||||
|
"Worker frame protocol requires a session id.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (protocolVersion == 0)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.InvalidConfiguration,
|
||||||
|
"Worker frame protocol requires a non-zero protocol version.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (maxMessageBytes <= 0)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.InvalidConfiguration,
|
||||||
|
"Worker frame protocol max message size must be greater than zero.");
|
||||||
|
}
|
||||||
|
|
||||||
|
SessionId = sessionId;
|
||||||
|
ProtocolVersion = protocolVersion;
|
||||||
|
MaxMessageBytes = maxMessageBytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public string SessionId { get; }
|
||||||
|
|
||||||
|
public uint ProtocolVersion { get; }
|
||||||
|
|
||||||
|
public int MaxMessageBytes { get; }
|
||||||
|
}
|
||||||
@@ -0,0 +1,77 @@
|
|||||||
|
using System.Buffers.Binary;
|
||||||
|
using Google.Protobuf;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Workers;
|
||||||
|
|
||||||
|
public sealed class WorkerFrameReader
|
||||||
|
{
|
||||||
|
private readonly WorkerFrameProtocolOptions _options;
|
||||||
|
private readonly Stream _stream;
|
||||||
|
|
||||||
|
public WorkerFrameReader(
|
||||||
|
Stream stream,
|
||||||
|
WorkerFrameProtocolOptions options)
|
||||||
|
{
|
||||||
|
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
|
||||||
|
_options = options ?? throw new ArgumentNullException(nameof(options));
|
||||||
|
}
|
||||||
|
|
||||||
|
public async ValueTask<WorkerEnvelope> ReadAsync(CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
byte[] lengthPrefix = new byte[sizeof(uint)];
|
||||||
|
await ReadExactlyOrThrowAsync(lengthPrefix, cancellationToken).ConfigureAwait(false);
|
||||||
|
|
||||||
|
uint payloadLength = BinaryPrimitives.ReadUInt32LittleEndian(lengthPrefix);
|
||||||
|
if (payloadLength == 0)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.MalformedLength,
|
||||||
|
"Worker frame payload length must be greater than zero.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (payloadLength > _options.MaxMessageBytes)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.MessageTooLarge,
|
||||||
|
$"Worker frame payload length {payloadLength} exceeds the configured maximum of {_options.MaxMessageBytes} bytes.");
|
||||||
|
}
|
||||||
|
|
||||||
|
byte[] payload = new byte[payloadLength];
|
||||||
|
await ReadExactlyOrThrowAsync(payload, cancellationToken).ConfigureAwait(false);
|
||||||
|
|
||||||
|
WorkerEnvelope envelope;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
envelope = WorkerEnvelope.Parser.ParseFrom(payload);
|
||||||
|
}
|
||||||
|
catch (InvalidProtocolBufferException exception)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.InvalidEnvelope,
|
||||||
|
"Worker frame payload is not a valid WorkerEnvelope protobuf message.",
|
||||||
|
exception);
|
||||||
|
}
|
||||||
|
|
||||||
|
WorkerEnvelopeValidator.Validate(envelope, _options);
|
||||||
|
|
||||||
|
return envelope;
|
||||||
|
}
|
||||||
|
|
||||||
|
private async ValueTask ReadExactlyOrThrowAsync(
|
||||||
|
Memory<byte> buffer,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await _stream.ReadExactlyAsync(buffer, cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
catch (EndOfStreamException exception)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.EndOfStream,
|
||||||
|
"Worker frame ended before the expected number of bytes were read.",
|
||||||
|
exception);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,48 @@
|
|||||||
|
using System.Buffers.Binary;
|
||||||
|
using Google.Protobuf;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
|
||||||
|
namespace MxGateway.Server.Workers;
|
||||||
|
|
||||||
|
public sealed class WorkerFrameWriter
|
||||||
|
{
|
||||||
|
private readonly WorkerFrameProtocolOptions _options;
|
||||||
|
private readonly Stream _stream;
|
||||||
|
|
||||||
|
public WorkerFrameWriter(
|
||||||
|
Stream stream,
|
||||||
|
WorkerFrameProtocolOptions options)
|
||||||
|
{
|
||||||
|
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
|
||||||
|
_options = options ?? throw new ArgumentNullException(nameof(options));
|
||||||
|
}
|
||||||
|
|
||||||
|
public async ValueTask WriteAsync(
|
||||||
|
WorkerEnvelope envelope,
|
||||||
|
CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(envelope);
|
||||||
|
WorkerEnvelopeValidator.Validate(envelope, _options);
|
||||||
|
|
||||||
|
int payloadLength = envelope.CalculateSize();
|
||||||
|
if (payloadLength == 0)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.InvalidEnvelope,
|
||||||
|
"Worker envelope cannot serialize to an empty payload.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (payloadLength > _options.MaxMessageBytes)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.MessageTooLarge,
|
||||||
|
$"Worker envelope payload length {payloadLength} exceeds the configured maximum of {_options.MaxMessageBytes} bytes.");
|
||||||
|
}
|
||||||
|
|
||||||
|
byte[] lengthPrefix = new byte[sizeof(uint)];
|
||||||
|
BinaryPrimitives.WriteUInt32LittleEndian(lengthPrefix, (uint)payloadLength);
|
||||||
|
|
||||||
|
await _stream.WriteAsync(lengthPrefix, cancellationToken).ConfigureAwait(false);
|
||||||
|
await _stream.WriteAsync(envelope.ToByteArray(), cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,209 @@
|
|||||||
|
using System.Buffers.Binary;
|
||||||
|
using Google.Protobuf;
|
||||||
|
using MxGateway.Contracts;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
using MxGateway.Server.Workers;
|
||||||
|
|
||||||
|
namespace MxGateway.Tests.Gateway.Workers;
|
||||||
|
|
||||||
|
public sealed class WorkerFrameProtocolTests
|
||||||
|
{
|
||||||
|
private const string SessionId = "session-1";
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task WriteAndReadAsync_WithValidEnvelope_RoundTripsFrame()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = new(SessionId);
|
||||||
|
await using MemoryStream stream = new();
|
||||||
|
WorkerEnvelope original = CreateEnvelope();
|
||||||
|
|
||||||
|
WorkerFrameWriter writer = new(stream, options);
|
||||||
|
await writer.WriteAsync(original);
|
||||||
|
stream.Position = 0;
|
||||||
|
|
||||||
|
WorkerFrameReader reader = new(stream, options);
|
||||||
|
WorkerEnvelope parsed = await reader.ReadAsync();
|
||||||
|
|
||||||
|
Assert.Equal(original, parsed);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadAsync_WithPartialReads_ReassemblesFrame()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = new(SessionId);
|
||||||
|
WorkerEnvelope original = CreateEnvelope();
|
||||||
|
byte[] frame = CreateFrame(original);
|
||||||
|
await using ChunkedReadStream stream = new(frame, chunkSize: 2);
|
||||||
|
|
||||||
|
WorkerFrameReader reader = new(stream, options);
|
||||||
|
WorkerEnvelope parsed = await reader.ReadAsync();
|
||||||
|
|
||||||
|
Assert.Equal(original, parsed);
|
||||||
|
Assert.True(stream.ReadCallCount > 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadAsync_WithZeroLengthFrame_ThrowsMalformedLength()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = new(SessionId);
|
||||||
|
await using MemoryStream stream = new(new byte[sizeof(uint)]);
|
||||||
|
|
||||||
|
WorkerFrameReader reader = new(stream, options);
|
||||||
|
WorkerFrameProtocolException exception =
|
||||||
|
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||||
|
async () => await reader.ReadAsync());
|
||||||
|
|
||||||
|
Assert.Equal(WorkerFrameProtocolErrorCode.MalformedLength, exception.ErrorCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadAsync_WithOversizedLength_ThrowsBeforePayloadAllocation()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = new(SessionId, GatewayContractInfo.WorkerProtocolVersion, maxMessageBytes: 16);
|
||||||
|
byte[] lengthPrefix = new byte[sizeof(uint)];
|
||||||
|
BinaryPrimitives.WriteUInt32LittleEndian(lengthPrefix, 17);
|
||||||
|
await using MemoryStream stream = new(lengthPrefix);
|
||||||
|
|
||||||
|
WorkerFrameReader reader = new(stream, options);
|
||||||
|
WorkerFrameProtocolException exception =
|
||||||
|
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||||
|
async () => await reader.ReadAsync());
|
||||||
|
|
||||||
|
Assert.Equal(WorkerFrameProtocolErrorCode.MessageTooLarge, exception.ErrorCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadAsync_WithWrongProtocolVersion_ThrowsProtocolVersionMismatch()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = new(SessionId);
|
||||||
|
WorkerEnvelope envelope = CreateEnvelope();
|
||||||
|
envelope.ProtocolVersion++;
|
||||||
|
await using MemoryStream stream = new(CreateFrame(envelope));
|
||||||
|
|
||||||
|
WorkerFrameReader reader = new(stream, options);
|
||||||
|
WorkerFrameProtocolException exception =
|
||||||
|
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||||
|
async () => await reader.ReadAsync());
|
||||||
|
|
||||||
|
Assert.Equal(WorkerFrameProtocolErrorCode.ProtocolVersionMismatch, exception.ErrorCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadAsync_WithWrongSessionId_ThrowsSessionMismatch()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = new(SessionId);
|
||||||
|
WorkerEnvelope envelope = CreateEnvelope();
|
||||||
|
envelope.SessionId = "different-session";
|
||||||
|
await using MemoryStream stream = new(CreateFrame(envelope));
|
||||||
|
|
||||||
|
WorkerFrameReader reader = new(stream, options);
|
||||||
|
WorkerFrameProtocolException exception =
|
||||||
|
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||||
|
async () => await reader.ReadAsync());
|
||||||
|
|
||||||
|
Assert.Equal(WorkerFrameProtocolErrorCode.SessionMismatch, exception.ErrorCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadAsync_WithMalformedPayload_ThrowsInvalidEnvelope()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = new(SessionId);
|
||||||
|
byte[] frame = CreateFrame([0x80]);
|
||||||
|
await using MemoryStream stream = new(frame);
|
||||||
|
|
||||||
|
WorkerFrameReader reader = new(stream, options);
|
||||||
|
WorkerFrameProtocolException exception =
|
||||||
|
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||||
|
async () => await reader.ReadAsync());
|
||||||
|
|
||||||
|
Assert.Equal(WorkerFrameProtocolErrorCode.InvalidEnvelope, exception.ErrorCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadAsync_WithMissingEnvelopeBody_ThrowsInvalidEnvelope()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = new(SessionId);
|
||||||
|
WorkerEnvelope envelope = CreateEnvelope();
|
||||||
|
envelope.ClearBody();
|
||||||
|
await using MemoryStream stream = new(CreateFrame(envelope));
|
||||||
|
|
||||||
|
WorkerFrameReader reader = new(stream, options);
|
||||||
|
WorkerFrameProtocolException exception =
|
||||||
|
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||||
|
async () => await reader.ReadAsync());
|
||||||
|
|
||||||
|
Assert.Equal(WorkerFrameProtocolErrorCode.InvalidEnvelope, exception.ErrorCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task WriteAsync_WithOversizedEnvelope_ThrowsMessageTooLarge()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = new(SessionId, GatewayContractInfo.WorkerProtocolVersion, maxMessageBytes: 8);
|
||||||
|
await using MemoryStream stream = new();
|
||||||
|
|
||||||
|
WorkerFrameWriter writer = new(stream, options);
|
||||||
|
WorkerFrameProtocolException exception =
|
||||||
|
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||||
|
async () => await writer.WriteAsync(CreateEnvelope()));
|
||||||
|
|
||||||
|
Assert.Equal(WorkerFrameProtocolErrorCode.MessageTooLarge, exception.ErrorCode);
|
||||||
|
Assert.Equal(0, stream.Length);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static WorkerEnvelope CreateEnvelope()
|
||||||
|
{
|
||||||
|
return new WorkerEnvelope
|
||||||
|
{
|
||||||
|
ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
|
||||||
|
SessionId = SessionId,
|
||||||
|
Sequence = 1,
|
||||||
|
CorrelationId = "correlation-1",
|
||||||
|
WorkerHello = new WorkerHello
|
||||||
|
{
|
||||||
|
ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
|
||||||
|
Nonce = "nonce",
|
||||||
|
WorkerProcessId = 1234,
|
||||||
|
WorkerVersion = "test-worker",
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static byte[] CreateFrame(IMessage message)
|
||||||
|
{
|
||||||
|
return CreateFrame(message.ToByteArray());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static byte[] CreateFrame(byte[] payload)
|
||||||
|
{
|
||||||
|
byte[] frame = new byte[sizeof(uint) + payload.Length];
|
||||||
|
BinaryPrimitives.WriteUInt32LittleEndian(frame.AsSpan(0, sizeof(uint)), (uint)payload.Length);
|
||||||
|
payload.CopyTo(frame.AsSpan(sizeof(uint)));
|
||||||
|
|
||||||
|
return frame;
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class ChunkedReadStream : MemoryStream
|
||||||
|
{
|
||||||
|
private readonly int _chunkSize;
|
||||||
|
|
||||||
|
public ChunkedReadStream(
|
||||||
|
byte[] buffer,
|
||||||
|
int chunkSize)
|
||||||
|
: base(buffer)
|
||||||
|
{
|
||||||
|
_chunkSize = chunkSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int ReadCallCount { get; private set; }
|
||||||
|
|
||||||
|
public override ValueTask<int> ReadAsync(
|
||||||
|
Memory<byte> buffer,
|
||||||
|
CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
ReadCallCount++;
|
||||||
|
int requestedCount = Math.Min(buffer.Length, _chunkSize);
|
||||||
|
|
||||||
|
return base.ReadAsync(buffer[..requestedCount], cancellationToken);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user