Issue #22: implement pipe client and frame protocol

This commit is contained in:
Joseph Doherty
2026-04-26 17:16:49 -04:00
parent 0b0be7098e
commit d5a982152b
15 changed files with 1148 additions and 3 deletions
@@ -7,4 +7,6 @@ public enum WorkerExitCode
InvalidArguments = 2,
InvalidProtocolVersion = 3,
MissingNonce = 4,
PipeConnectionFailed = 5,
ProtocolViolation = 6,
}
@@ -0,0 +1,12 @@
using System.Threading;
using System.Threading.Tasks;
using MxGateway.Worker.Bootstrap;
namespace MxGateway.Worker.Ipc;
public interface IWorkerPipeClient
{
Task RunAsync(
WorkerOptions options,
CancellationToken cancellationToken = default);
}
@@ -0,0 +1,33 @@
using System;
using MxGateway.Contracts.Proto;
namespace MxGateway.Worker.Ipc;
internal static class WorkerEnvelopeValidator
{
public static void Validate(
WorkerEnvelope envelope,
WorkerFrameProtocolOptions options)
{
if (envelope.ProtocolVersion != options.ProtocolVersion)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.ProtocolVersionMismatch,
$"Worker envelope protocol version {envelope.ProtocolVersion} does not match expected version {options.ProtocolVersion}.");
}
if (!string.Equals(envelope.SessionId, options.SessionId, StringComparison.Ordinal))
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.SessionMismatch,
"Worker envelope session id does not match the owning worker session.");
}
if (envelope.BodyCase == WorkerEnvelope.BodyOneofCase.None)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.InvalidEnvelope,
"Worker envelope must include a typed body.");
}
}
}
@@ -0,0 +1,15 @@
namespace MxGateway.Worker.Ipc;
public enum WorkerFrameProtocolErrorCode
{
Unknown = 0,
InvalidConfiguration = 1,
EndOfStream = 2,
MalformedLength = 3,
MessageTooLarge = 4,
InvalidEnvelope = 5,
ProtocolVersionMismatch = 6,
SessionMismatch = 7,
NonceMismatch = 8,
UnexpectedEnvelopeBody = 9,
}
@@ -0,0 +1,25 @@
using System;
namespace MxGateway.Worker.Ipc;
public sealed class WorkerFrameProtocolException : Exception
{
public WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode errorCode,
string message)
: base(message)
{
ErrorCode = errorCode;
}
public WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode errorCode,
string message,
Exception innerException)
: base(message, innerException)
{
ErrorCode = errorCode;
}
public WorkerFrameProtocolErrorCode ErrorCode { get; }
}
@@ -0,0 +1,86 @@
using System;
using MxGateway.Contracts;
using MxGateway.Worker.Bootstrap;
namespace MxGateway.Worker.Ipc;
public sealed class WorkerFrameProtocolOptions
{
public const int DefaultMaxMessageBytes = 16 * 1024 * 1024;
public WorkerFrameProtocolOptions(WorkerOptions options)
: this(
options?.SessionId ?? throw new ArgumentNullException(nameof(options)),
options.ProtocolVersion,
options.Nonce,
DefaultMaxMessageBytes)
{
}
public WorkerFrameProtocolOptions(
string sessionId,
uint protocolVersion,
string nonce)
: this(
sessionId,
protocolVersion,
nonce,
DefaultMaxMessageBytes)
{
}
public WorkerFrameProtocolOptions(
string sessionId,
uint protocolVersion,
string nonce,
int maxMessageBytes)
{
if (string.IsNullOrWhiteSpace(sessionId))
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.InvalidConfiguration,
"Worker frame protocol requires a session id.");
}
if (protocolVersion == 0)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.InvalidConfiguration,
"Worker frame protocol requires a non-zero protocol version.");
}
if (protocolVersion != GatewayContractInfo.WorkerProtocolVersion)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.ProtocolVersionMismatch,
$"Worker frame protocol version {protocolVersion} is not supported.");
}
if (string.IsNullOrWhiteSpace(nonce))
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.InvalidConfiguration,
"Worker frame protocol requires a nonce.");
}
if (maxMessageBytes <= 0)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.InvalidConfiguration,
"Worker frame protocol max message size must be greater than zero.");
}
SessionId = sessionId;
ProtocolVersion = protocolVersion;
Nonce = nonce;
MaxMessageBytes = maxMessageBytes;
}
public string SessionId { get; }
public uint ProtocolVersion { get; }
public string Nonce { get; }
public int MaxMessageBytes { get; }
}
@@ -0,0 +1,93 @@
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf;
using MxGateway.Contracts.Proto;
namespace MxGateway.Worker.Ipc;
public sealed class WorkerFrameReader
{
private readonly WorkerFrameProtocolOptions _options;
private readonly Stream _stream;
public WorkerFrameReader(
Stream stream,
WorkerFrameProtocolOptions options)
{
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
_options = options ?? throw new ArgumentNullException(nameof(options));
}
public async Task<WorkerEnvelope> ReadAsync(CancellationToken cancellationToken = default)
{
byte[] lengthPrefix = new byte[sizeof(uint)];
await ReadExactlyOrThrowAsync(lengthPrefix, cancellationToken).ConfigureAwait(false);
uint payloadLength = ReadUInt32LittleEndian(lengthPrefix);
if (payloadLength == 0)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.MalformedLength,
"Worker frame payload length must be greater than zero.");
}
if (payloadLength > _options.MaxMessageBytes)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.MessageTooLarge,
$"Worker frame payload length {payloadLength} exceeds the configured maximum of {_options.MaxMessageBytes} bytes.");
}
byte[] payload = new byte[payloadLength];
await ReadExactlyOrThrowAsync(payload, cancellationToken).ConfigureAwait(false);
WorkerEnvelope envelope;
try
{
envelope = WorkerEnvelope.Parser.ParseFrom(payload);
}
catch (InvalidProtocolBufferException exception)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.InvalidEnvelope,
"Worker frame payload is not a valid WorkerEnvelope protobuf message.",
exception);
}
WorkerEnvelopeValidator.Validate(envelope, _options);
return envelope;
}
private static uint ReadUInt32LittleEndian(byte[] buffer)
{
return (uint)buffer[0]
| ((uint)buffer[1] << 8)
| ((uint)buffer[2] << 16)
| ((uint)buffer[3] << 24);
}
private async Task ReadExactlyOrThrowAsync(
byte[] buffer,
CancellationToken cancellationToken)
{
int offset = 0;
while (offset < buffer.Length)
{
int bytesRead = await _stream
.ReadAsync(buffer, offset, buffer.Length - offset, cancellationToken)
.ConfigureAwait(false);
if (bytesRead == 0)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.EndOfStream,
"Worker frame ended before the expected number of bytes were read.");
}
offset += bytesRead;
}
}
}
@@ -0,0 +1,76 @@
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf;
using MxGateway.Contracts.Proto;
namespace MxGateway.Worker.Ipc;
public sealed class WorkerFrameWriter
{
private readonly WorkerFrameProtocolOptions _options;
private readonly SemaphoreSlim _writeLock = new(1, 1);
private readonly Stream _stream;
public WorkerFrameWriter(
Stream stream,
WorkerFrameProtocolOptions options)
{
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
_options = options ?? throw new ArgumentNullException(nameof(options));
}
public async Task WriteAsync(
WorkerEnvelope envelope,
CancellationToken cancellationToken = default)
{
if (envelope is null)
{
throw new ArgumentNullException(nameof(envelope));
}
WorkerEnvelopeValidator.Validate(envelope, _options);
int payloadLength = envelope.CalculateSize();
if (payloadLength == 0)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.InvalidEnvelope,
"Worker envelope cannot serialize to an empty payload.");
}
if (payloadLength > _options.MaxMessageBytes)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.MessageTooLarge,
$"Worker envelope payload length {payloadLength} exceeds the configured maximum of {_options.MaxMessageBytes} bytes.");
}
byte[] payload = envelope.ToByteArray();
byte[] lengthPrefix = new byte[sizeof(uint)];
WriteUInt32LittleEndian(lengthPrefix, (uint)payloadLength);
await _writeLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
await _stream.WriteAsync(lengthPrefix, 0, lengthPrefix.Length, cancellationToken).ConfigureAwait(false);
await _stream.WriteAsync(payload, 0, payload.Length, cancellationToken).ConfigureAwait(false);
await _stream.FlushAsync(cancellationToken).ConfigureAwait(false);
}
finally
{
_writeLock.Release();
}
}
private static void WriteUInt32LittleEndian(
byte[] buffer,
uint value)
{
buffer[0] = (byte)value;
buffer[1] = (byte)(value >> 8);
buffer[2] = (byte)(value >> 16);
buffer[3] = (byte)(value >> 24);
}
}
@@ -0,0 +1,67 @@
using System;
using System.IO.Pipes;
using System.Threading;
using System.Threading.Tasks;
using MxGateway.Worker.Bootstrap;
namespace MxGateway.Worker.Ipc;
public sealed class WorkerPipeClient : IWorkerPipeClient
{
public const int DefaultConnectTimeoutMilliseconds = 30000;
private readonly int _connectTimeoutMilliseconds;
public WorkerPipeClient()
: this(DefaultConnectTimeoutMilliseconds)
{
}
public WorkerPipeClient(int connectTimeoutMilliseconds)
{
if (connectTimeoutMilliseconds <= 0)
{
throw new ArgumentOutOfRangeException(
nameof(connectTimeoutMilliseconds),
"Worker pipe connect timeout must be greater than zero.");
}
_connectTimeoutMilliseconds = connectTimeoutMilliseconds;
}
public async Task RunAsync(
WorkerOptions options,
CancellationToken cancellationToken = default)
{
if (options is null)
{
throw new ArgumentNullException(nameof(options));
}
WorkerFrameProtocolOptions frameOptions = new(options);
using NamedPipeClientStream pipe = new(
".",
options.PipeName,
PipeDirection.InOut,
PipeOptions.Asynchronous);
await ConnectAsync(pipe, cancellationToken).ConfigureAwait(false);
WorkerPipeSession session = new(pipe, frameOptions);
await session.CompleteStartupHandshakeAsync(cancellationToken).ConfigureAwait(false);
}
private Task ConnectAsync(
NamedPipeClientStream pipe,
CancellationToken cancellationToken)
{
return Task.Run(
() =>
{
cancellationToken.ThrowIfCancellationRequested();
pipe.Connect(_connectTimeoutMilliseconds);
},
cancellationToken);
}
}
@@ -0,0 +1,218 @@
using System;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
using MxGateway.Worker.MxAccess;
namespace MxGateway.Worker.Ipc;
public sealed class WorkerPipeSession
{
private readonly WorkerFrameProtocolOptions _options;
private readonly Func<int> _processIdProvider;
private readonly WorkerFrameReader _reader;
private readonly WorkerFrameWriter _writer;
private long _nextSequence;
public WorkerPipeSession(
Stream stream,
WorkerFrameProtocolOptions options)
: this(
new WorkerFrameReader(stream, options),
new WorkerFrameWriter(stream, options),
options,
() => Process.GetCurrentProcess().Id)
{
}
public WorkerPipeSession(
WorkerFrameReader reader,
WorkerFrameWriter writer,
WorkerFrameProtocolOptions options,
Func<int> processIdProvider)
{
_reader = reader ?? throw new ArgumentNullException(nameof(reader));
_writer = writer ?? throw new ArgumentNullException(nameof(writer));
_options = options ?? throw new ArgumentNullException(nameof(options));
_processIdProvider = processIdProvider ?? throw new ArgumentNullException(nameof(processIdProvider));
}
public Task CompleteStartupHandshakeAsync(CancellationToken cancellationToken = default)
{
return CompleteStartupHandshakeAsync(_ => Task.CompletedTask, cancellationToken);
}
public async Task CompleteStartupHandshakeAsync(
Func<CancellationToken, Task> initializeMxAccessAsync,
CancellationToken cancellationToken = default)
{
if (initializeMxAccessAsync is null)
{
throw new ArgumentNullException(nameof(initializeMxAccessAsync));
}
try
{
WorkerEnvelope envelope = await _reader.ReadAsync(cancellationToken).ConfigureAwait(false);
ValidateGatewayHello(envelope);
await WriteWorkerHelloAsync(cancellationToken).ConfigureAwait(false);
await initializeMxAccessAsync(cancellationToken).ConfigureAwait(false);
await WriteWorkerReadyAsync(cancellationToken).ConfigureAwait(false);
}
catch (WorkerFrameProtocolException exception)
{
await TryWriteFaultAsync(exception, cancellationToken).ConfigureAwait(false);
throw;
}
}
private void ValidateGatewayHello(WorkerEnvelope envelope)
{
if (envelope.BodyCase != WorkerEnvelope.BodyOneofCase.GatewayHello)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.UnexpectedEnvelopeBody,
"Worker expected GatewayHello during startup handshake.");
}
GatewayHello gatewayHello = envelope.GatewayHello;
if (gatewayHello.SupportedProtocolVersion != _options.ProtocolVersion)
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.ProtocolVersionMismatch,
$"GatewayHello supported protocol version {gatewayHello.SupportedProtocolVersion} does not match expected version {_options.ProtocolVersion}.");
}
if (!string.Equals(gatewayHello.Nonce, _options.Nonce, StringComparison.Ordinal))
{
throw new WorkerFrameProtocolException(
WorkerFrameProtocolErrorCode.NonceMismatch,
"GatewayHello nonce does not match the worker launch nonce.");
}
}
private Task WriteWorkerHelloAsync(CancellationToken cancellationToken)
{
return _writer.WriteAsync(
CreateEnvelope(new WorkerHello
{
ProtocolVersion = _options.ProtocolVersion,
Nonce = _options.Nonce,
WorkerProcessId = _processIdProvider(),
WorkerVersion = typeof(WorkerPipeSession).Assembly.GetName().Version?.ToString() ?? string.Empty,
}),
cancellationToken);
}
private Task WriteWorkerReadyAsync(CancellationToken cancellationToken)
{
return _writer.WriteAsync(
CreateEnvelope(new WorkerReady
{
WorkerProcessId = _processIdProvider(),
MxaccessProgid = MxAccessInteropInfo.ProgId,
MxaccessClsid = MxAccessInteropInfo.Clsid,
ReadyTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
}),
cancellationToken);
}
private async Task TryWriteFaultAsync(
WorkerFrameProtocolException exception,
CancellationToken cancellationToken)
{
try
{
await _writer
.WriteAsync(CreateEnvelope(CreateFault(exception)), cancellationToken)
.ConfigureAwait(false);
}
catch (Exception faultWriteException) when (
faultWriteException is IOException
|| faultWriteException is ObjectDisposedException
|| faultWriteException is WorkerFrameProtocolException)
{
// The original protocol failure is the actionable error.
}
}
private WorkerEnvelope CreateEnvelope(WorkerHello hello)
{
return CreateBaseEnvelope(hello);
}
private WorkerEnvelope CreateEnvelope(WorkerReady ready)
{
return CreateBaseEnvelope(ready);
}
private WorkerEnvelope CreateEnvelope(WorkerFault fault)
{
return CreateBaseEnvelope(fault);
}
private WorkerEnvelope CreateBaseEnvelope(WorkerHello body)
{
WorkerEnvelope envelope = CreateBaseEnvelope();
envelope.WorkerHello = body;
return envelope;
}
private WorkerEnvelope CreateBaseEnvelope(WorkerReady body)
{
WorkerEnvelope envelope = CreateBaseEnvelope();
envelope.WorkerReady = body;
return envelope;
}
private WorkerEnvelope CreateBaseEnvelope(WorkerFault body)
{
WorkerEnvelope envelope = CreateBaseEnvelope();
envelope.WorkerFault = body;
return envelope;
}
private WorkerEnvelope CreateBaseEnvelope()
{
return new WorkerEnvelope
{
ProtocolVersion = _options.ProtocolVersion,
SessionId = _options.SessionId,
Sequence = NextSequence(),
};
}
private ulong NextSequence()
{
return unchecked((ulong)Interlocked.Increment(ref _nextSequence));
}
private static WorkerFault CreateFault(WorkerFrameProtocolException exception)
{
return new WorkerFault
{
Category = MapFaultCategory(exception.ErrorCode),
ExceptionType = exception.GetType().FullName ?? string.Empty,
DiagnosticMessage = exception.Message,
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.ProtocolViolation,
Message = exception.Message,
},
};
}
private static WorkerFaultCategory MapFaultCategory(WorkerFrameProtocolErrorCode errorCode)
{
return errorCode switch
{
WorkerFrameProtocolErrorCode.ProtocolVersionMismatch => WorkerFaultCategory.ProtocolMismatch,
WorkerFrameProtocolErrorCode.EndOfStream => WorkerFaultCategory.PipeDisconnected,
_ => WorkerFaultCategory.ProtocolViolation,
};
}
}
+52 -1
View File
@@ -1,6 +1,8 @@
using System;
using System.Collections.Generic;
using System.IO;
using MxGateway.Worker.Bootstrap;
using MxGateway.Worker.Ipc;
namespace MxGateway.Worker;
@@ -11,13 +13,27 @@ public static class WorkerApplication
return Run(
args,
new EnvironmentVariableWorkerEnvironment(),
new WorkerConsoleLogger(Console.Error));
new WorkerConsoleLogger(Console.Error),
new WorkerPipeClient());
}
public static int Run(
string[] args,
IWorkerEnvironment environment,
IWorkerLogger logger)
{
return Run(
args,
environment,
logger,
new WorkerPipeClient());
}
public static int Run(
string[] args,
IWorkerEnvironment environment,
IWorkerLogger logger,
IWorkerPipeClient pipeClient)
{
if (args is null)
{
@@ -34,6 +50,11 @@ public static class WorkerApplication
throw new ArgumentNullException(nameof(logger));
}
if (pipeClient is null)
{
throw new ArgumentNullException(nameof(pipeClient));
}
try
{
WorkerOptionsParser parser = new(environment);
@@ -61,8 +82,38 @@ public static class WorkerApplication
["nonce"] = options.Nonce,
});
pipeClient.RunAsync(options).GetAwaiter().GetResult();
logger.Information("WorkerPipeHandshakeSucceeded", new Dictionary<string, object?>
{
["session_id"] = options.SessionId,
["pipe_name"] = options.PipeName,
["protocol_version"] = options.ProtocolVersion,
});
return (int)WorkerExitCode.Success;
}
catch (WorkerFrameProtocolException exception)
{
logger.Error("WorkerPipeProtocolFailure", new Dictionary<string, object?>
{
["exit_code"] = WorkerExitCode.ProtocolViolation,
["error_code"] = exception.ErrorCode,
["exception_type"] = exception.GetType().FullName,
});
return (int)WorkerExitCode.ProtocolViolation;
}
catch (Exception exception) when (exception is IOException or TimeoutException)
{
logger.Error("WorkerPipeConnectionFailed", new Dictionary<string, object?>
{
["exit_code"] = WorkerExitCode.PipeConnectionFailed,
["exception_type"] = exception.GetType().FullName,
});
return (int)WorkerExitCode.PipeConnectionFailed;
}
catch (Exception exception)
{
logger.Error("WorkerBootstrapUnexpectedFailure", new Dictionary<string, object?>