Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e81682e367 | |||
| 603aff7004 | |||
| d5a982152b | |||
| 0b0be7098e |
@@ -250,6 +250,17 @@ The loop should update a heartbeat timestamp after:
|
|||||||
- finishing a command,
|
- finishing a command,
|
||||||
- processing an MXAccess event.
|
- processing an MXAccess event.
|
||||||
|
|
||||||
|
`StaRuntime` implements this runtime boundary in the worker. It starts one
|
||||||
|
background thread named `MxGateway.Worker.STA`, sets it to `ApartmentState.STA`,
|
||||||
|
initializes COM through `StaComApartmentInitializer`, and runs
|
||||||
|
`StaMessagePump`. Commands are scheduled through `InvokeAsync`; the command
|
||||||
|
queue signals an `AutoResetEvent` so `MsgWaitForMultipleObjectsEx` can wake the
|
||||||
|
STA without busy-waiting. `LastActivityUtc` records pump, command, startup, and
|
||||||
|
shutdown activity so the future heartbeat/watchdog can report whether the STA
|
||||||
|
is still responsive. Shutdown marks the runtime as closing, wakes the pump,
|
||||||
|
rejects new commands, cancels queued work, uninitializes COM on the STA, and
|
||||||
|
waits for the thread to exit.
|
||||||
|
|
||||||
## COM Creation
|
## COM Creation
|
||||||
|
|
||||||
The MXAccess analysis source at `C:\Users\dohertj2\Desktop\mxaccess` identifies
|
The MXAccess analysis source at `C:\Users\dohertj2\Desktop\mxaccess` identifies
|
||||||
|
|||||||
@@ -1,6 +1,9 @@
|
|||||||
using System;
|
using System;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
using MxGateway.Contracts;
|
using MxGateway.Contracts;
|
||||||
using MxGateway.Worker.Bootstrap;
|
using MxGateway.Worker.Bootstrap;
|
||||||
|
using MxGateway.Worker.Ipc;
|
||||||
|
|
||||||
namespace MxGateway.Worker.Tests.Bootstrap;
|
namespace MxGateway.Worker.Tests.Bootstrap;
|
||||||
|
|
||||||
@@ -15,16 +18,19 @@ public sealed class WorkerApplicationTests
|
|||||||
int exitCode = MxGateway.Worker.WorkerApplication.Run(
|
int exitCode = MxGateway.Worker.WorkerApplication.Run(
|
||||||
ValidArgs(),
|
ValidArgs(),
|
||||||
environment,
|
environment,
|
||||||
logger);
|
logger,
|
||||||
|
new SucceedingPipeClient());
|
||||||
|
|
||||||
Assert.Equal((int)WorkerExitCode.Success, exitCode);
|
Assert.Equal((int)WorkerExitCode.Success, exitCode);
|
||||||
MemoryWorkerLogEntry entry = Assert.Single(logger.Entries);
|
Assert.Equal(2, logger.Entries.Count);
|
||||||
|
MemoryWorkerLogEntry entry = logger.Entries[0];
|
||||||
Assert.Equal("Information", entry.Level);
|
Assert.Equal("Information", entry.Level);
|
||||||
Assert.Equal("WorkerBootstrapSucceeded", entry.EventName);
|
Assert.Equal("WorkerBootstrapSucceeded", entry.EventName);
|
||||||
Assert.Equal("session-1", entry.Fields["session_id"]);
|
Assert.Equal("session-1", entry.Fields["session_id"]);
|
||||||
Assert.Equal("mxaccess-gateway-123-session-1", entry.Fields["pipe_name"]);
|
Assert.Equal("mxaccess-gateway-123-session-1", entry.Fields["pipe_name"]);
|
||||||
Assert.Equal(GatewayContractInfo.WorkerProtocolVersion, entry.Fields["protocol_version"]);
|
Assert.Equal(GatewayContractInfo.WorkerProtocolVersion, entry.Fields["protocol_version"]);
|
||||||
Assert.Equal("[redacted]", entry.Fields["nonce"]);
|
Assert.Equal("[redacted]", entry.Fields["nonce"]);
|
||||||
|
Assert.Equal("WorkerPipeHandshakeSucceeded", logger.Entries[1].EventName);
|
||||||
}
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
@@ -73,6 +79,24 @@ public sealed class WorkerApplicationTests
|
|||||||
Assert.Equal((int)WorkerExitCode.MissingNonce, exitCode);
|
Assert.Equal((int)WorkerExitCode.MissingNonce, exitCode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Run_WithPipeProtocolFailure_ReturnsProtocolViolation()
|
||||||
|
{
|
||||||
|
MemoryWorkerEnvironment environment = CreateEnvironment("nonce-secret");
|
||||||
|
MemoryWorkerLogger logger = new();
|
||||||
|
|
||||||
|
int exitCode = MxGateway.Worker.WorkerApplication.Run(
|
||||||
|
ValidArgs(),
|
||||||
|
environment,
|
||||||
|
logger,
|
||||||
|
new ThrowingPipeClient(new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.NonceMismatch,
|
||||||
|
"Bad nonce.")));
|
||||||
|
|
||||||
|
Assert.Equal((int)WorkerExitCode.ProtocolViolation, exitCode);
|
||||||
|
Assert.Equal("WorkerPipeProtocolFailure", logger.Entries[1].EventName);
|
||||||
|
}
|
||||||
|
|
||||||
[Fact]
|
[Fact]
|
||||||
public void Run_WithUnexpectedBootstrapFailure_ReturnsUnexpectedFailure()
|
public void Run_WithUnexpectedBootstrapFailure_ReturnsUnexpectedFailure()
|
||||||
{
|
{
|
||||||
@@ -110,4 +134,31 @@ public sealed class WorkerApplicationTests
|
|||||||
environment.Set(WorkerOptions.NonceEnvironmentVariableName, nonce);
|
environment.Set(WorkerOptions.NonceEnvironmentVariableName, nonce);
|
||||||
return environment;
|
return environment;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private sealed class SucceedingPipeClient : IWorkerPipeClient
|
||||||
|
{
|
||||||
|
public Task RunAsync(
|
||||||
|
WorkerOptions options,
|
||||||
|
CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class ThrowingPipeClient : IWorkerPipeClient
|
||||||
|
{
|
||||||
|
private readonly Exception _exception;
|
||||||
|
|
||||||
|
public ThrowingPipeClient(Exception exception)
|
||||||
|
{
|
||||||
|
_exception = exception;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task RunAsync(
|
||||||
|
WorkerOptions options,
|
||||||
|
CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
throw _exception;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,163 @@
|
|||||||
|
using System;
|
||||||
|
using System.IO;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using Google.Protobuf;
|
||||||
|
using MxGateway.Contracts;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
using MxGateway.Worker.Ipc;
|
||||||
|
|
||||||
|
namespace MxGateway.Worker.Tests.Ipc;
|
||||||
|
|
||||||
|
public sealed class WorkerFrameProtocolTests
|
||||||
|
{
|
||||||
|
private const string SessionId = "session-1";
|
||||||
|
private const string Nonce = "nonce-secret";
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task WriteAndReadAsync_WithValidEnvelope_RoundTripsFrame()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = CreateOptions();
|
||||||
|
MemoryStream stream = new();
|
||||||
|
WorkerEnvelope original = CreateGatewayHelloEnvelope();
|
||||||
|
|
||||||
|
WorkerFrameWriter writer = new(stream, options);
|
||||||
|
await writer.WriteAsync(original);
|
||||||
|
stream.Position = 0;
|
||||||
|
|
||||||
|
WorkerFrameReader reader = new(stream, options);
|
||||||
|
WorkerEnvelope parsed = await reader.ReadAsync();
|
||||||
|
|
||||||
|
Assert.Equal(original, parsed);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadAsync_WithWrongProtocolVersion_ThrowsProtocolVersionMismatch()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = CreateOptions();
|
||||||
|
WorkerEnvelope envelope = CreateGatewayHelloEnvelope();
|
||||||
|
envelope.ProtocolVersion++;
|
||||||
|
MemoryStream stream = new(CreateFrame(envelope));
|
||||||
|
|
||||||
|
WorkerFrameReader reader = new(stream, options);
|
||||||
|
WorkerFrameProtocolException exception =
|
||||||
|
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||||
|
async () => await reader.ReadAsync());
|
||||||
|
|
||||||
|
Assert.Equal(WorkerFrameProtocolErrorCode.ProtocolVersionMismatch, exception.ErrorCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadAsync_WithWrongSessionId_ThrowsSessionMismatch()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = CreateOptions();
|
||||||
|
WorkerEnvelope envelope = CreateGatewayHelloEnvelope();
|
||||||
|
envelope.SessionId = "different-session";
|
||||||
|
MemoryStream stream = new(CreateFrame(envelope));
|
||||||
|
|
||||||
|
WorkerFrameReader reader = new(stream, options);
|
||||||
|
WorkerFrameProtocolException exception =
|
||||||
|
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||||
|
async () => await reader.ReadAsync());
|
||||||
|
|
||||||
|
Assert.Equal(WorkerFrameProtocolErrorCode.SessionMismatch, exception.ErrorCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadAsync_WithMalformedLength_ThrowsMalformedLength()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = CreateOptions();
|
||||||
|
MemoryStream stream = new(new byte[sizeof(uint)]);
|
||||||
|
|
||||||
|
WorkerFrameReader reader = new(stream, options);
|
||||||
|
WorkerFrameProtocolException exception =
|
||||||
|
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||||
|
async () => await reader.ReadAsync());
|
||||||
|
|
||||||
|
Assert.Equal(WorkerFrameProtocolErrorCode.MalformedLength, exception.ErrorCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ReadAsync_WithMalformedPayload_ThrowsInvalidEnvelope()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = CreateOptions();
|
||||||
|
MemoryStream stream = new(CreateFrame(new byte[] { 0x80 }));
|
||||||
|
|
||||||
|
WorkerFrameReader reader = new(stream, options);
|
||||||
|
WorkerFrameProtocolException exception =
|
||||||
|
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||||
|
async () => await reader.ReadAsync());
|
||||||
|
|
||||||
|
Assert.Equal(WorkerFrameProtocolErrorCode.InvalidEnvelope, exception.ErrorCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task WriteAsync_WithConcurrentCalls_SerializesCompleteFrames()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = CreateOptions();
|
||||||
|
MemoryStream stream = new();
|
||||||
|
WorkerFrameWriter writer = new(stream, options);
|
||||||
|
|
||||||
|
await Task.WhenAll(
|
||||||
|
writer.WriteAsync(CreateGatewayHelloEnvelope(sequence: 1)),
|
||||||
|
writer.WriteAsync(CreateGatewayHelloEnvelope(sequence: 2)),
|
||||||
|
writer.WriteAsync(CreateGatewayHelloEnvelope(sequence: 3)));
|
||||||
|
|
||||||
|
stream.Position = 0;
|
||||||
|
WorkerFrameReader reader = new(stream, options);
|
||||||
|
|
||||||
|
WorkerEnvelope first = await reader.ReadAsync();
|
||||||
|
WorkerEnvelope second = await reader.ReadAsync();
|
||||||
|
WorkerEnvelope third = await reader.ReadAsync();
|
||||||
|
|
||||||
|
Assert.Equal(new ulong[] { 1, 2, 3 }, new[] { first.Sequence, second.Sequence, third.Sequence }.OrderBy(sequence => sequence));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static WorkerFrameProtocolOptions CreateOptions()
|
||||||
|
{
|
||||||
|
return new WorkerFrameProtocolOptions(
|
||||||
|
SessionId,
|
||||||
|
GatewayContractInfo.WorkerProtocolVersion,
|
||||||
|
Nonce);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static WorkerEnvelope CreateGatewayHelloEnvelope(ulong sequence = 1)
|
||||||
|
{
|
||||||
|
return new WorkerEnvelope
|
||||||
|
{
|
||||||
|
ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
|
||||||
|
SessionId = SessionId,
|
||||||
|
Sequence = sequence,
|
||||||
|
GatewayHello = new GatewayHello
|
||||||
|
{
|
||||||
|
SupportedProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
|
||||||
|
Nonce = Nonce,
|
||||||
|
GatewayVersion = "test-gateway",
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static byte[] CreateFrame(IMessage message)
|
||||||
|
{
|
||||||
|
return CreateFrame(message.ToByteArray());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static byte[] CreateFrame(byte[] payload)
|
||||||
|
{
|
||||||
|
byte[] frame = new byte[sizeof(uint) + payload.Length];
|
||||||
|
WriteUInt32LittleEndian(frame, (uint)payload.Length);
|
||||||
|
payload.CopyTo(frame, sizeof(uint));
|
||||||
|
|
||||||
|
return frame;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void WriteUInt32LittleEndian(
|
||||||
|
byte[] buffer,
|
||||||
|
uint value)
|
||||||
|
{
|
||||||
|
buffer[0] = (byte)value;
|
||||||
|
buffer[1] = (byte)(value >> 8);
|
||||||
|
buffer[2] = (byte)(value >> 16);
|
||||||
|
buffer[3] = (byte)(value >> 24);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,61 @@
|
|||||||
|
using System;
|
||||||
|
using System.IO.Pipes;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using MxGateway.Contracts;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
using MxGateway.Worker.Bootstrap;
|
||||||
|
using MxGateway.Worker.Ipc;
|
||||||
|
|
||||||
|
namespace MxGateway.Worker.Tests.Ipc;
|
||||||
|
|
||||||
|
public sealed class WorkerPipeClientTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public async Task RunAsync_ConnectsToPipeAndCompletesHandshake()
|
||||||
|
{
|
||||||
|
string pipeName = $"mxaccess-gateway-test-{Guid.NewGuid():N}";
|
||||||
|
WorkerOptions workerOptions = new(
|
||||||
|
"session-1",
|
||||||
|
pipeName,
|
||||||
|
GatewayContractInfo.WorkerProtocolVersion,
|
||||||
|
"nonce-secret");
|
||||||
|
WorkerFrameProtocolOptions frameOptions = new(workerOptions);
|
||||||
|
|
||||||
|
using NamedPipeServerStream server = new(
|
||||||
|
pipeName,
|
||||||
|
PipeDirection.InOut,
|
||||||
|
1,
|
||||||
|
PipeTransmissionMode.Byte,
|
||||||
|
PipeOptions.Asynchronous);
|
||||||
|
|
||||||
|
WorkerPipeClient client = new(connectTimeoutMilliseconds: 5000);
|
||||||
|
Task clientTask = client.RunAsync(workerOptions);
|
||||||
|
|
||||||
|
await Task.Factory.FromAsync(server.BeginWaitForConnection, server.EndWaitForConnection, null);
|
||||||
|
|
||||||
|
WorkerFrameReader reader = new(server, frameOptions);
|
||||||
|
WorkerFrameWriter writer = new(server, frameOptions);
|
||||||
|
|
||||||
|
await writer.WriteAsync(new WorkerEnvelope
|
||||||
|
{
|
||||||
|
ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
|
||||||
|
SessionId = "session-1",
|
||||||
|
Sequence = 1,
|
||||||
|
GatewayHello = new GatewayHello
|
||||||
|
{
|
||||||
|
SupportedProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
|
||||||
|
Nonce = "nonce-secret",
|
||||||
|
GatewayVersion = "test-gateway",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
WorkerEnvelope hello = await reader.ReadAsync();
|
||||||
|
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerHello, hello.BodyCase);
|
||||||
|
Assert.Equal("nonce-secret", hello.WorkerHello.Nonce);
|
||||||
|
|
||||||
|
WorkerEnvelope ready = await reader.ReadAsync();
|
||||||
|
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerReady, ready.BodyCase);
|
||||||
|
|
||||||
|
await clientTask;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,192 @@
|
|||||||
|
using System.Collections.Generic;
|
||||||
|
using System.IO;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using MxGateway.Contracts;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
using MxGateway.Worker.Ipc;
|
||||||
|
|
||||||
|
namespace MxGateway.Worker.Tests.Ipc;
|
||||||
|
|
||||||
|
public sealed class WorkerPipeSessionTests
|
||||||
|
{
|
||||||
|
private const string SessionId = "session-1";
|
||||||
|
private const string Nonce = "nonce-secret";
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task CompleteStartupHandshakeAsync_WithValidGatewayHello_SendsHelloThenReady()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = CreateOptions();
|
||||||
|
MemoryStream inbound = new();
|
||||||
|
await new WorkerFrameWriter(inbound, options).WriteAsync(CreateGatewayHelloEnvelope());
|
||||||
|
inbound.Position = 0;
|
||||||
|
MemoryStream outbound = new();
|
||||||
|
WorkerPipeSession session = CreateSession(inbound, outbound, options);
|
||||||
|
bool initialized = false;
|
||||||
|
|
||||||
|
await session.CompleteStartupHandshakeAsync(
|
||||||
|
_ =>
|
||||||
|
{
|
||||||
|
initialized = true;
|
||||||
|
return Task.CompletedTask;
|
||||||
|
});
|
||||||
|
|
||||||
|
Assert.True(initialized);
|
||||||
|
WorkerEnvelope[] written = ReadWrittenFrames(outbound, options);
|
||||||
|
Assert.Equal(2, written.Length);
|
||||||
|
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerHello, written[0].BodyCase);
|
||||||
|
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerReady, written[1].BodyCase);
|
||||||
|
Assert.Equal(Nonce, written[0].WorkerHello.Nonce);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task CompleteStartupHandshakeAsync_WithWrongNonce_FaultsBeforeInitialization()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = CreateOptions();
|
||||||
|
MemoryStream inbound = new();
|
||||||
|
await new WorkerFrameWriter(inbound, options).WriteAsync(CreateGatewayHelloEnvelope(nonce: "wrong"));
|
||||||
|
inbound.Position = 0;
|
||||||
|
MemoryStream outbound = new();
|
||||||
|
WorkerPipeSession session = CreateSession(inbound, outbound, options);
|
||||||
|
bool initialized = false;
|
||||||
|
|
||||||
|
WorkerFrameProtocolException exception =
|
||||||
|
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||||
|
async () => await session.CompleteStartupHandshakeAsync(
|
||||||
|
_ =>
|
||||||
|
{
|
||||||
|
initialized = true;
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}));
|
||||||
|
|
||||||
|
Assert.False(initialized);
|
||||||
|
Assert.Equal(WorkerFrameProtocolErrorCode.NonceMismatch, exception.ErrorCode);
|
||||||
|
WorkerEnvelope fault = Assert.Single(ReadWrittenFrames(outbound, options));
|
||||||
|
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerFault, fault.BodyCase);
|
||||||
|
Assert.Equal(WorkerFaultCategory.ProtocolViolation, fault.WorkerFault.Category);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task CompleteStartupHandshakeAsync_WithWrongProtocol_FaultsBeforeInitialization()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = CreateOptions();
|
||||||
|
MemoryStream inbound = new();
|
||||||
|
await new WorkerFrameWriter(inbound, options).WriteAsync(CreateGatewayHelloEnvelope(supportedProtocolVersion: 999));
|
||||||
|
inbound.Position = 0;
|
||||||
|
MemoryStream outbound = new();
|
||||||
|
WorkerPipeSession session = CreateSession(inbound, outbound, options);
|
||||||
|
bool initialized = false;
|
||||||
|
|
||||||
|
WorkerFrameProtocolException exception =
|
||||||
|
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||||
|
async () => await session.CompleteStartupHandshakeAsync(
|
||||||
|
_ =>
|
||||||
|
{
|
||||||
|
initialized = true;
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}));
|
||||||
|
|
||||||
|
Assert.False(initialized);
|
||||||
|
Assert.Equal(WorkerFrameProtocolErrorCode.ProtocolVersionMismatch, exception.ErrorCode);
|
||||||
|
WorkerEnvelope fault = Assert.Single(ReadWrittenFrames(outbound, options));
|
||||||
|
Assert.Equal(WorkerFaultCategory.ProtocolMismatch, fault.WorkerFault.Category);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task CompleteStartupHandshakeAsync_WithMalformedFrame_WritesWorkerFault()
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolOptions options = CreateOptions();
|
||||||
|
MemoryStream inbound = new(CreateFrame(new byte[] { 0x80 }));
|
||||||
|
MemoryStream outbound = new();
|
||||||
|
WorkerPipeSession session = CreateSession(inbound, outbound, options);
|
||||||
|
bool initialized = false;
|
||||||
|
|
||||||
|
WorkerFrameProtocolException exception =
|
||||||
|
await Assert.ThrowsAsync<WorkerFrameProtocolException>(
|
||||||
|
async () => await session.CompleteStartupHandshakeAsync(
|
||||||
|
_ =>
|
||||||
|
{
|
||||||
|
initialized = true;
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}));
|
||||||
|
|
||||||
|
Assert.False(initialized);
|
||||||
|
Assert.Equal(WorkerFrameProtocolErrorCode.InvalidEnvelope, exception.ErrorCode);
|
||||||
|
WorkerEnvelope fault = Assert.Single(ReadWrittenFrames(outbound, options));
|
||||||
|
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerFault, fault.BodyCase);
|
||||||
|
Assert.Equal(WorkerFaultCategory.ProtocolViolation, fault.WorkerFault.Category);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static WorkerPipeSession CreateSession(
|
||||||
|
Stream inbound,
|
||||||
|
Stream outbound,
|
||||||
|
WorkerFrameProtocolOptions options)
|
||||||
|
{
|
||||||
|
return new WorkerPipeSession(
|
||||||
|
new WorkerFrameReader(inbound, options),
|
||||||
|
new WorkerFrameWriter(outbound, options),
|
||||||
|
options,
|
||||||
|
() => 1234);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static WorkerFrameProtocolOptions CreateOptions()
|
||||||
|
{
|
||||||
|
return new WorkerFrameProtocolOptions(
|
||||||
|
SessionId,
|
||||||
|
GatewayContractInfo.WorkerProtocolVersion,
|
||||||
|
Nonce);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static WorkerEnvelope CreateGatewayHelloEnvelope(
|
||||||
|
string nonce = Nonce,
|
||||||
|
uint supportedProtocolVersion = GatewayContractInfo.WorkerProtocolVersion)
|
||||||
|
{
|
||||||
|
return new WorkerEnvelope
|
||||||
|
{
|
||||||
|
ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
|
||||||
|
SessionId = SessionId,
|
||||||
|
Sequence = 1,
|
||||||
|
GatewayHello = new GatewayHello
|
||||||
|
{
|
||||||
|
SupportedProtocolVersion = supportedProtocolVersion,
|
||||||
|
Nonce = nonce,
|
||||||
|
GatewayVersion = "test-gateway",
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static WorkerEnvelope[] ReadWrittenFrames(
|
||||||
|
MemoryStream stream,
|
||||||
|
WorkerFrameProtocolOptions options)
|
||||||
|
{
|
||||||
|
stream.Position = 0;
|
||||||
|
WorkerFrameReader reader = new(stream, options);
|
||||||
|
List<WorkerEnvelope> envelopes = new();
|
||||||
|
|
||||||
|
while (stream.Position < stream.Length)
|
||||||
|
{
|
||||||
|
envelopes.Add(reader.ReadAsync(CancellationToken.None).GetAwaiter().GetResult());
|
||||||
|
}
|
||||||
|
|
||||||
|
return envelopes.ToArray();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static byte[] CreateFrame(byte[] payload)
|
||||||
|
{
|
||||||
|
byte[] frame = new byte[sizeof(uint) + payload.Length];
|
||||||
|
WriteUInt32LittleEndian(frame, (uint)payload.Length);
|
||||||
|
payload.CopyTo(frame, sizeof(uint));
|
||||||
|
|
||||||
|
return frame;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void WriteUInt32LittleEndian(
|
||||||
|
byte[] buffer,
|
||||||
|
uint value)
|
||||||
|
{
|
||||||
|
buffer[0] = (byte)value;
|
||||||
|
buffer[1] = (byte)(value >> 8);
|
||||||
|
buffer[2] = (byte)(value >> 16);
|
||||||
|
buffer[3] = (byte)(value >> 24);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,152 @@
|
|||||||
|
using System;
|
||||||
|
using System.Diagnostics;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using MxGateway.Worker.Sta;
|
||||||
|
|
||||||
|
namespace MxGateway.Worker.Tests.Sta;
|
||||||
|
|
||||||
|
public sealed class StaRuntimeTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public async Task InvokeAsync_ExecutesCommandOnStaThread()
|
||||||
|
{
|
||||||
|
RecordingComApartmentInitializer initializer = new();
|
||||||
|
using StaRuntime runtime = CreateRuntime(initializer);
|
||||||
|
|
||||||
|
runtime.Start();
|
||||||
|
|
||||||
|
StaCommandObservation observation = await runtime.InvokeAsync(
|
||||||
|
() => new StaCommandObservation(
|
||||||
|
Thread.CurrentThread.ManagedThreadId,
|
||||||
|
Thread.CurrentThread.GetApartmentState()));
|
||||||
|
|
||||||
|
Assert.Equal(runtime.StaThreadId, observation.ThreadId);
|
||||||
|
Assert.Equal(initializer.InitializeThreadId, observation.ThreadId);
|
||||||
|
Assert.Equal(ApartmentState.STA, observation.ApartmentState);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task InvokeAsync_WakesIdlePumpForQueuedCommand()
|
||||||
|
{
|
||||||
|
RecordingComApartmentInitializer initializer = new();
|
||||||
|
using StaRuntime runtime = new(
|
||||||
|
initializer,
|
||||||
|
new StaMessagePump(),
|
||||||
|
TimeSpan.FromSeconds(30));
|
||||||
|
runtime.Start();
|
||||||
|
Stopwatch stopwatch = Stopwatch.StartNew();
|
||||||
|
|
||||||
|
int threadId = await runtime.InvokeAsync(() => Thread.CurrentThread.ManagedThreadId);
|
||||||
|
|
||||||
|
stopwatch.Stop();
|
||||||
|
Assert.Equal(runtime.StaThreadId, threadId);
|
||||||
|
Assert.True(
|
||||||
|
stopwatch.Elapsed < TimeSpan.FromSeconds(2),
|
||||||
|
$"Command took {stopwatch.Elapsed} to execute, so the command wake event did not wake the STA promptly.");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void Shutdown_StopsThreadAndUninitializesComApartment()
|
||||||
|
{
|
||||||
|
RecordingComApartmentInitializer initializer = new();
|
||||||
|
using StaRuntime runtime = CreateRuntime(initializer);
|
||||||
|
runtime.Start();
|
||||||
|
|
||||||
|
bool stopped = runtime.Shutdown(TimeSpan.FromSeconds(2));
|
||||||
|
|
||||||
|
Assert.True(stopped);
|
||||||
|
Assert.False(runtime.IsRunning);
|
||||||
|
Assert.Equal(1, initializer.InitializeCount);
|
||||||
|
Assert.Equal(1, initializer.UninitializeCount);
|
||||||
|
Assert.Equal(initializer.InitializeThreadId, initializer.UninitializeThreadId);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void LastActivityUtc_UpdatesWhilePumpIsIdle()
|
||||||
|
{
|
||||||
|
RecordingComApartmentInitializer initializer = new();
|
||||||
|
using StaRuntime runtime = CreateRuntime(initializer);
|
||||||
|
runtime.Start();
|
||||||
|
DateTimeOffset firstActivity = runtime.LastActivityUtc;
|
||||||
|
|
||||||
|
bool updated = SpinWait.SpinUntil(
|
||||||
|
() => runtime.LastActivityUtc > firstActivity,
|
||||||
|
TimeSpan.FromSeconds(2));
|
||||||
|
|
||||||
|
Assert.True(updated);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task InvokeAsync_CommandException_FaultsReturnedTaskWithoutStoppingRuntime()
|
||||||
|
{
|
||||||
|
RecordingComApartmentInitializer initializer = new();
|
||||||
|
using StaRuntime runtime = CreateRuntime(initializer);
|
||||||
|
runtime.Start();
|
||||||
|
|
||||||
|
InvalidOperationException exception = await Assert.ThrowsAsync<InvalidOperationException>(
|
||||||
|
() => runtime.InvokeAsync<int>(() => throw new InvalidOperationException("command failed")));
|
||||||
|
|
||||||
|
int threadId = await runtime.InvokeAsync(() => Thread.CurrentThread.ManagedThreadId);
|
||||||
|
Assert.Equal("command failed", exception.Message);
|
||||||
|
Assert.Equal(runtime.StaThreadId, threadId);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task InvokeAsync_AfterShutdown_ReturnsFaultedTask()
|
||||||
|
{
|
||||||
|
RecordingComApartmentInitializer initializer = new();
|
||||||
|
using StaRuntime runtime = CreateRuntime(initializer);
|
||||||
|
runtime.Start();
|
||||||
|
runtime.Shutdown(TimeSpan.FromSeconds(2));
|
||||||
|
|
||||||
|
InvalidOperationException exception = await Assert.ThrowsAsync<InvalidOperationException>(
|
||||||
|
() => runtime.InvokeAsync(() => Thread.CurrentThread.ManagedThreadId));
|
||||||
|
|
||||||
|
Assert.Contains("shutting down", exception.Message);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static StaRuntime CreateRuntime(RecordingComApartmentInitializer initializer)
|
||||||
|
{
|
||||||
|
return new StaRuntime(
|
||||||
|
initializer,
|
||||||
|
new StaMessagePump(),
|
||||||
|
TimeSpan.FromMilliseconds(25));
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class StaCommandObservation
|
||||||
|
{
|
||||||
|
public StaCommandObservation(int threadId, ApartmentState apartmentState)
|
||||||
|
{
|
||||||
|
ThreadId = threadId;
|
||||||
|
ApartmentState = apartmentState;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int ThreadId { get; }
|
||||||
|
|
||||||
|
public ApartmentState ApartmentState { get; }
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class RecordingComApartmentInitializer : IStaComApartmentInitializer
|
||||||
|
{
|
||||||
|
public int InitializeCount { get; private set; }
|
||||||
|
|
||||||
|
public int UninitializeCount { get; private set; }
|
||||||
|
|
||||||
|
public int? InitializeThreadId { get; private set; }
|
||||||
|
|
||||||
|
public int? UninitializeThreadId { get; private set; }
|
||||||
|
|
||||||
|
public void Initialize()
|
||||||
|
{
|
||||||
|
InitializeCount++;
|
||||||
|
InitializeThreadId = Thread.CurrentThread.ManagedThreadId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Uninitialize()
|
||||||
|
{
|
||||||
|
UninitializeCount++;
|
||||||
|
UninitializeThreadId = Thread.CurrentThread.ManagedThreadId;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -7,4 +7,6 @@ public enum WorkerExitCode
|
|||||||
InvalidArguments = 2,
|
InvalidArguments = 2,
|
||||||
InvalidProtocolVersion = 3,
|
InvalidProtocolVersion = 3,
|
||||||
MissingNonce = 4,
|
MissingNonce = 4,
|
||||||
|
PipeConnectionFailed = 5,
|
||||||
|
ProtocolViolation = 6,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,12 @@
|
|||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using MxGateway.Worker.Bootstrap;
|
||||||
|
|
||||||
|
namespace MxGateway.Worker.Ipc;
|
||||||
|
|
||||||
|
public interface IWorkerPipeClient
|
||||||
|
{
|
||||||
|
Task RunAsync(
|
||||||
|
WorkerOptions options,
|
||||||
|
CancellationToken cancellationToken = default);
|
||||||
|
}
|
||||||
@@ -0,0 +1,33 @@
|
|||||||
|
using System;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
|
||||||
|
namespace MxGateway.Worker.Ipc;
|
||||||
|
|
||||||
|
internal static class WorkerEnvelopeValidator
|
||||||
|
{
|
||||||
|
public static void Validate(
|
||||||
|
WorkerEnvelope envelope,
|
||||||
|
WorkerFrameProtocolOptions options)
|
||||||
|
{
|
||||||
|
if (envelope.ProtocolVersion != options.ProtocolVersion)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.ProtocolVersionMismatch,
|
||||||
|
$"Worker envelope protocol version {envelope.ProtocolVersion} does not match expected version {options.ProtocolVersion}.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!string.Equals(envelope.SessionId, options.SessionId, StringComparison.Ordinal))
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.SessionMismatch,
|
||||||
|
"Worker envelope session id does not match the owning worker session.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (envelope.BodyCase == WorkerEnvelope.BodyOneofCase.None)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.InvalidEnvelope,
|
||||||
|
"Worker envelope must include a typed body.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,15 @@
|
|||||||
|
namespace MxGateway.Worker.Ipc;
|
||||||
|
|
||||||
|
public enum WorkerFrameProtocolErrorCode
|
||||||
|
{
|
||||||
|
Unknown = 0,
|
||||||
|
InvalidConfiguration = 1,
|
||||||
|
EndOfStream = 2,
|
||||||
|
MalformedLength = 3,
|
||||||
|
MessageTooLarge = 4,
|
||||||
|
InvalidEnvelope = 5,
|
||||||
|
ProtocolVersionMismatch = 6,
|
||||||
|
SessionMismatch = 7,
|
||||||
|
NonceMismatch = 8,
|
||||||
|
UnexpectedEnvelopeBody = 9,
|
||||||
|
}
|
||||||
@@ -0,0 +1,25 @@
|
|||||||
|
using System;
|
||||||
|
|
||||||
|
namespace MxGateway.Worker.Ipc;
|
||||||
|
|
||||||
|
public sealed class WorkerFrameProtocolException : Exception
|
||||||
|
{
|
||||||
|
public WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode errorCode,
|
||||||
|
string message)
|
||||||
|
: base(message)
|
||||||
|
{
|
||||||
|
ErrorCode = errorCode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode errorCode,
|
||||||
|
string message,
|
||||||
|
Exception innerException)
|
||||||
|
: base(message, innerException)
|
||||||
|
{
|
||||||
|
ErrorCode = errorCode;
|
||||||
|
}
|
||||||
|
|
||||||
|
public WorkerFrameProtocolErrorCode ErrorCode { get; }
|
||||||
|
}
|
||||||
@@ -0,0 +1,86 @@
|
|||||||
|
using System;
|
||||||
|
using MxGateway.Contracts;
|
||||||
|
using MxGateway.Worker.Bootstrap;
|
||||||
|
|
||||||
|
namespace MxGateway.Worker.Ipc;
|
||||||
|
|
||||||
|
public sealed class WorkerFrameProtocolOptions
|
||||||
|
{
|
||||||
|
public const int DefaultMaxMessageBytes = 16 * 1024 * 1024;
|
||||||
|
|
||||||
|
public WorkerFrameProtocolOptions(WorkerOptions options)
|
||||||
|
: this(
|
||||||
|
options?.SessionId ?? throw new ArgumentNullException(nameof(options)),
|
||||||
|
options.ProtocolVersion,
|
||||||
|
options.Nonce,
|
||||||
|
DefaultMaxMessageBytes)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
public WorkerFrameProtocolOptions(
|
||||||
|
string sessionId,
|
||||||
|
uint protocolVersion,
|
||||||
|
string nonce)
|
||||||
|
: this(
|
||||||
|
sessionId,
|
||||||
|
protocolVersion,
|
||||||
|
nonce,
|
||||||
|
DefaultMaxMessageBytes)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
public WorkerFrameProtocolOptions(
|
||||||
|
string sessionId,
|
||||||
|
uint protocolVersion,
|
||||||
|
string nonce,
|
||||||
|
int maxMessageBytes)
|
||||||
|
{
|
||||||
|
if (string.IsNullOrWhiteSpace(sessionId))
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.InvalidConfiguration,
|
||||||
|
"Worker frame protocol requires a session id.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (protocolVersion == 0)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.InvalidConfiguration,
|
||||||
|
"Worker frame protocol requires a non-zero protocol version.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (protocolVersion != GatewayContractInfo.WorkerProtocolVersion)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.ProtocolVersionMismatch,
|
||||||
|
$"Worker frame protocol version {protocolVersion} is not supported.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (string.IsNullOrWhiteSpace(nonce))
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.InvalidConfiguration,
|
||||||
|
"Worker frame protocol requires a nonce.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (maxMessageBytes <= 0)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.InvalidConfiguration,
|
||||||
|
"Worker frame protocol max message size must be greater than zero.");
|
||||||
|
}
|
||||||
|
|
||||||
|
SessionId = sessionId;
|
||||||
|
ProtocolVersion = protocolVersion;
|
||||||
|
Nonce = nonce;
|
||||||
|
MaxMessageBytes = maxMessageBytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
public string SessionId { get; }
|
||||||
|
|
||||||
|
public uint ProtocolVersion { get; }
|
||||||
|
|
||||||
|
public string Nonce { get; }
|
||||||
|
|
||||||
|
public int MaxMessageBytes { get; }
|
||||||
|
}
|
||||||
@@ -0,0 +1,93 @@
|
|||||||
|
using System;
|
||||||
|
using System.IO;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using Google.Protobuf;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
|
||||||
|
namespace MxGateway.Worker.Ipc;
|
||||||
|
|
||||||
|
public sealed class WorkerFrameReader
|
||||||
|
{
|
||||||
|
private readonly WorkerFrameProtocolOptions _options;
|
||||||
|
private readonly Stream _stream;
|
||||||
|
|
||||||
|
public WorkerFrameReader(
|
||||||
|
Stream stream,
|
||||||
|
WorkerFrameProtocolOptions options)
|
||||||
|
{
|
||||||
|
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
|
||||||
|
_options = options ?? throw new ArgumentNullException(nameof(options));
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<WorkerEnvelope> ReadAsync(CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
byte[] lengthPrefix = new byte[sizeof(uint)];
|
||||||
|
await ReadExactlyOrThrowAsync(lengthPrefix, cancellationToken).ConfigureAwait(false);
|
||||||
|
|
||||||
|
uint payloadLength = ReadUInt32LittleEndian(lengthPrefix);
|
||||||
|
if (payloadLength == 0)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.MalformedLength,
|
||||||
|
"Worker frame payload length must be greater than zero.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (payloadLength > _options.MaxMessageBytes)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.MessageTooLarge,
|
||||||
|
$"Worker frame payload length {payloadLength} exceeds the configured maximum of {_options.MaxMessageBytes} bytes.");
|
||||||
|
}
|
||||||
|
|
||||||
|
byte[] payload = new byte[payloadLength];
|
||||||
|
await ReadExactlyOrThrowAsync(payload, cancellationToken).ConfigureAwait(false);
|
||||||
|
|
||||||
|
WorkerEnvelope envelope;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
envelope = WorkerEnvelope.Parser.ParseFrom(payload);
|
||||||
|
}
|
||||||
|
catch (InvalidProtocolBufferException exception)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.InvalidEnvelope,
|
||||||
|
"Worker frame payload is not a valid WorkerEnvelope protobuf message.",
|
||||||
|
exception);
|
||||||
|
}
|
||||||
|
|
||||||
|
WorkerEnvelopeValidator.Validate(envelope, _options);
|
||||||
|
|
||||||
|
return envelope;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static uint ReadUInt32LittleEndian(byte[] buffer)
|
||||||
|
{
|
||||||
|
return (uint)buffer[0]
|
||||||
|
| ((uint)buffer[1] << 8)
|
||||||
|
| ((uint)buffer[2] << 16)
|
||||||
|
| ((uint)buffer[3] << 24);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task ReadExactlyOrThrowAsync(
|
||||||
|
byte[] buffer,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
int offset = 0;
|
||||||
|
while (offset < buffer.Length)
|
||||||
|
{
|
||||||
|
int bytesRead = await _stream
|
||||||
|
.ReadAsync(buffer, offset, buffer.Length - offset, cancellationToken)
|
||||||
|
.ConfigureAwait(false);
|
||||||
|
|
||||||
|
if (bytesRead == 0)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.EndOfStream,
|
||||||
|
"Worker frame ended before the expected number of bytes were read.");
|
||||||
|
}
|
||||||
|
|
||||||
|
offset += bytesRead;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,76 @@
|
|||||||
|
using System;
|
||||||
|
using System.IO;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using Google.Protobuf;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
|
||||||
|
namespace MxGateway.Worker.Ipc;
|
||||||
|
|
||||||
|
public sealed class WorkerFrameWriter
|
||||||
|
{
|
||||||
|
private readonly WorkerFrameProtocolOptions _options;
|
||||||
|
private readonly SemaphoreSlim _writeLock = new(1, 1);
|
||||||
|
private readonly Stream _stream;
|
||||||
|
|
||||||
|
public WorkerFrameWriter(
|
||||||
|
Stream stream,
|
||||||
|
WorkerFrameProtocolOptions options)
|
||||||
|
{
|
||||||
|
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
|
||||||
|
_options = options ?? throw new ArgumentNullException(nameof(options));
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task WriteAsync(
|
||||||
|
WorkerEnvelope envelope,
|
||||||
|
CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
if (envelope is null)
|
||||||
|
{
|
||||||
|
throw new ArgumentNullException(nameof(envelope));
|
||||||
|
}
|
||||||
|
|
||||||
|
WorkerEnvelopeValidator.Validate(envelope, _options);
|
||||||
|
|
||||||
|
int payloadLength = envelope.CalculateSize();
|
||||||
|
if (payloadLength == 0)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.InvalidEnvelope,
|
||||||
|
"Worker envelope cannot serialize to an empty payload.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (payloadLength > _options.MaxMessageBytes)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.MessageTooLarge,
|
||||||
|
$"Worker envelope payload length {payloadLength} exceeds the configured maximum of {_options.MaxMessageBytes} bytes.");
|
||||||
|
}
|
||||||
|
|
||||||
|
byte[] payload = envelope.ToByteArray();
|
||||||
|
byte[] lengthPrefix = new byte[sizeof(uint)];
|
||||||
|
WriteUInt32LittleEndian(lengthPrefix, (uint)payloadLength);
|
||||||
|
|
||||||
|
await _writeLock.WaitAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await _stream.WriteAsync(lengthPrefix, 0, lengthPrefix.Length, cancellationToken).ConfigureAwait(false);
|
||||||
|
await _stream.WriteAsync(payload, 0, payload.Length, cancellationToken).ConfigureAwait(false);
|
||||||
|
await _stream.FlushAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_writeLock.Release();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void WriteUInt32LittleEndian(
|
||||||
|
byte[] buffer,
|
||||||
|
uint value)
|
||||||
|
{
|
||||||
|
buffer[0] = (byte)value;
|
||||||
|
buffer[1] = (byte)(value >> 8);
|
||||||
|
buffer[2] = (byte)(value >> 16);
|
||||||
|
buffer[3] = (byte)(value >> 24);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,67 @@
|
|||||||
|
using System;
|
||||||
|
using System.IO.Pipes;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using MxGateway.Worker.Bootstrap;
|
||||||
|
|
||||||
|
namespace MxGateway.Worker.Ipc;
|
||||||
|
|
||||||
|
public sealed class WorkerPipeClient : IWorkerPipeClient
|
||||||
|
{
|
||||||
|
public const int DefaultConnectTimeoutMilliseconds = 30000;
|
||||||
|
|
||||||
|
private readonly int _connectTimeoutMilliseconds;
|
||||||
|
|
||||||
|
public WorkerPipeClient()
|
||||||
|
: this(DefaultConnectTimeoutMilliseconds)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
public WorkerPipeClient(int connectTimeoutMilliseconds)
|
||||||
|
{
|
||||||
|
if (connectTimeoutMilliseconds <= 0)
|
||||||
|
{
|
||||||
|
throw new ArgumentOutOfRangeException(
|
||||||
|
nameof(connectTimeoutMilliseconds),
|
||||||
|
"Worker pipe connect timeout must be greater than zero.");
|
||||||
|
}
|
||||||
|
|
||||||
|
_connectTimeoutMilliseconds = connectTimeoutMilliseconds;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task RunAsync(
|
||||||
|
WorkerOptions options,
|
||||||
|
CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
if (options is null)
|
||||||
|
{
|
||||||
|
throw new ArgumentNullException(nameof(options));
|
||||||
|
}
|
||||||
|
|
||||||
|
WorkerFrameProtocolOptions frameOptions = new(options);
|
||||||
|
|
||||||
|
using NamedPipeClientStream pipe = new(
|
||||||
|
".",
|
||||||
|
options.PipeName,
|
||||||
|
PipeDirection.InOut,
|
||||||
|
PipeOptions.Asynchronous);
|
||||||
|
|
||||||
|
await ConnectAsync(pipe, cancellationToken).ConfigureAwait(false);
|
||||||
|
|
||||||
|
WorkerPipeSession session = new(pipe, frameOptions);
|
||||||
|
await session.CompleteStartupHandshakeAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Task ConnectAsync(
|
||||||
|
NamedPipeClientStream pipe,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
return Task.Run(
|
||||||
|
() =>
|
||||||
|
{
|
||||||
|
cancellationToken.ThrowIfCancellationRequested();
|
||||||
|
pipe.Connect(_connectTimeoutMilliseconds);
|
||||||
|
},
|
||||||
|
cancellationToken);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,218 @@
|
|||||||
|
using System;
|
||||||
|
using System.Diagnostics;
|
||||||
|
using System.IO;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using Google.Protobuf.WellKnownTypes;
|
||||||
|
using MxGateway.Contracts.Proto;
|
||||||
|
using MxGateway.Worker.MxAccess;
|
||||||
|
|
||||||
|
namespace MxGateway.Worker.Ipc;
|
||||||
|
|
||||||
|
public sealed class WorkerPipeSession
|
||||||
|
{
|
||||||
|
private readonly WorkerFrameProtocolOptions _options;
|
||||||
|
private readonly Func<int> _processIdProvider;
|
||||||
|
private readonly WorkerFrameReader _reader;
|
||||||
|
private readonly WorkerFrameWriter _writer;
|
||||||
|
private long _nextSequence;
|
||||||
|
|
||||||
|
public WorkerPipeSession(
|
||||||
|
Stream stream,
|
||||||
|
WorkerFrameProtocolOptions options)
|
||||||
|
: this(
|
||||||
|
new WorkerFrameReader(stream, options),
|
||||||
|
new WorkerFrameWriter(stream, options),
|
||||||
|
options,
|
||||||
|
() => Process.GetCurrentProcess().Id)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
public WorkerPipeSession(
|
||||||
|
WorkerFrameReader reader,
|
||||||
|
WorkerFrameWriter writer,
|
||||||
|
WorkerFrameProtocolOptions options,
|
||||||
|
Func<int> processIdProvider)
|
||||||
|
{
|
||||||
|
_reader = reader ?? throw new ArgumentNullException(nameof(reader));
|
||||||
|
_writer = writer ?? throw new ArgumentNullException(nameof(writer));
|
||||||
|
_options = options ?? throw new ArgumentNullException(nameof(options));
|
||||||
|
_processIdProvider = processIdProvider ?? throw new ArgumentNullException(nameof(processIdProvider));
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task CompleteStartupHandshakeAsync(CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
return CompleteStartupHandshakeAsync(_ => Task.CompletedTask, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task CompleteStartupHandshakeAsync(
|
||||||
|
Func<CancellationToken, Task> initializeMxAccessAsync,
|
||||||
|
CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
if (initializeMxAccessAsync is null)
|
||||||
|
{
|
||||||
|
throw new ArgumentNullException(nameof(initializeMxAccessAsync));
|
||||||
|
}
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
WorkerEnvelope envelope = await _reader.ReadAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
ValidateGatewayHello(envelope);
|
||||||
|
|
||||||
|
await WriteWorkerHelloAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
await initializeMxAccessAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
await WriteWorkerReadyAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
catch (WorkerFrameProtocolException exception)
|
||||||
|
{
|
||||||
|
await TryWriteFaultAsync(exception, cancellationToken).ConfigureAwait(false);
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void ValidateGatewayHello(WorkerEnvelope envelope)
|
||||||
|
{
|
||||||
|
if (envelope.BodyCase != WorkerEnvelope.BodyOneofCase.GatewayHello)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.UnexpectedEnvelopeBody,
|
||||||
|
"Worker expected GatewayHello during startup handshake.");
|
||||||
|
}
|
||||||
|
|
||||||
|
GatewayHello gatewayHello = envelope.GatewayHello;
|
||||||
|
if (gatewayHello.SupportedProtocolVersion != _options.ProtocolVersion)
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.ProtocolVersionMismatch,
|
||||||
|
$"GatewayHello supported protocol version {gatewayHello.SupportedProtocolVersion} does not match expected version {_options.ProtocolVersion}.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!string.Equals(gatewayHello.Nonce, _options.Nonce, StringComparison.Ordinal))
|
||||||
|
{
|
||||||
|
throw new WorkerFrameProtocolException(
|
||||||
|
WorkerFrameProtocolErrorCode.NonceMismatch,
|
||||||
|
"GatewayHello nonce does not match the worker launch nonce.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Task WriteWorkerHelloAsync(CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
return _writer.WriteAsync(
|
||||||
|
CreateEnvelope(new WorkerHello
|
||||||
|
{
|
||||||
|
ProtocolVersion = _options.ProtocolVersion,
|
||||||
|
Nonce = _options.Nonce,
|
||||||
|
WorkerProcessId = _processIdProvider(),
|
||||||
|
WorkerVersion = typeof(WorkerPipeSession).Assembly.GetName().Version?.ToString() ?? string.Empty,
|
||||||
|
}),
|
||||||
|
cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Task WriteWorkerReadyAsync(CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
return _writer.WriteAsync(
|
||||||
|
CreateEnvelope(new WorkerReady
|
||||||
|
{
|
||||||
|
WorkerProcessId = _processIdProvider(),
|
||||||
|
MxaccessProgid = MxAccessInteropInfo.ProgId,
|
||||||
|
MxaccessClsid = MxAccessInteropInfo.Clsid,
|
||||||
|
ReadyTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
|
||||||
|
}),
|
||||||
|
cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task TryWriteFaultAsync(
|
||||||
|
WorkerFrameProtocolException exception,
|
||||||
|
CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await _writer
|
||||||
|
.WriteAsync(CreateEnvelope(CreateFault(exception)), cancellationToken)
|
||||||
|
.ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
catch (Exception faultWriteException) when (
|
||||||
|
faultWriteException is IOException
|
||||||
|
|| faultWriteException is ObjectDisposedException
|
||||||
|
|| faultWriteException is WorkerFrameProtocolException)
|
||||||
|
{
|
||||||
|
// The original protocol failure is the actionable error.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private WorkerEnvelope CreateEnvelope(WorkerHello hello)
|
||||||
|
{
|
||||||
|
return CreateBaseEnvelope(hello);
|
||||||
|
}
|
||||||
|
|
||||||
|
private WorkerEnvelope CreateEnvelope(WorkerReady ready)
|
||||||
|
{
|
||||||
|
return CreateBaseEnvelope(ready);
|
||||||
|
}
|
||||||
|
|
||||||
|
private WorkerEnvelope CreateEnvelope(WorkerFault fault)
|
||||||
|
{
|
||||||
|
return CreateBaseEnvelope(fault);
|
||||||
|
}
|
||||||
|
|
||||||
|
private WorkerEnvelope CreateBaseEnvelope(WorkerHello body)
|
||||||
|
{
|
||||||
|
WorkerEnvelope envelope = CreateBaseEnvelope();
|
||||||
|
envelope.WorkerHello = body;
|
||||||
|
return envelope;
|
||||||
|
}
|
||||||
|
|
||||||
|
private WorkerEnvelope CreateBaseEnvelope(WorkerReady body)
|
||||||
|
{
|
||||||
|
WorkerEnvelope envelope = CreateBaseEnvelope();
|
||||||
|
envelope.WorkerReady = body;
|
||||||
|
return envelope;
|
||||||
|
}
|
||||||
|
|
||||||
|
private WorkerEnvelope CreateBaseEnvelope(WorkerFault body)
|
||||||
|
{
|
||||||
|
WorkerEnvelope envelope = CreateBaseEnvelope();
|
||||||
|
envelope.WorkerFault = body;
|
||||||
|
return envelope;
|
||||||
|
}
|
||||||
|
|
||||||
|
private WorkerEnvelope CreateBaseEnvelope()
|
||||||
|
{
|
||||||
|
return new WorkerEnvelope
|
||||||
|
{
|
||||||
|
ProtocolVersion = _options.ProtocolVersion,
|
||||||
|
SessionId = _options.SessionId,
|
||||||
|
Sequence = NextSequence(),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private ulong NextSequence()
|
||||||
|
{
|
||||||
|
return unchecked((ulong)Interlocked.Increment(ref _nextSequence));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static WorkerFault CreateFault(WorkerFrameProtocolException exception)
|
||||||
|
{
|
||||||
|
return new WorkerFault
|
||||||
|
{
|
||||||
|
Category = MapFaultCategory(exception.ErrorCode),
|
||||||
|
ExceptionType = exception.GetType().FullName ?? string.Empty,
|
||||||
|
DiagnosticMessage = exception.Message,
|
||||||
|
ProtocolStatus = new ProtocolStatus
|
||||||
|
{
|
||||||
|
Code = ProtocolStatusCode.ProtocolViolation,
|
||||||
|
Message = exception.Message,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private static WorkerFaultCategory MapFaultCategory(WorkerFrameProtocolErrorCode errorCode)
|
||||||
|
{
|
||||||
|
return errorCode switch
|
||||||
|
{
|
||||||
|
WorkerFrameProtocolErrorCode.ProtocolVersionMismatch => WorkerFaultCategory.ProtocolMismatch,
|
||||||
|
WorkerFrameProtocolErrorCode.EndOfStream => WorkerFaultCategory.PipeDisconnected,
|
||||||
|
_ => WorkerFaultCategory.ProtocolViolation,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,8 @@
|
|||||||
|
namespace MxGateway.Worker.Sta;
|
||||||
|
|
||||||
|
public interface IStaComApartmentInitializer
|
||||||
|
{
|
||||||
|
void Initialize();
|
||||||
|
|
||||||
|
void Uninitialize();
|
||||||
|
}
|
||||||
@@ -0,0 +1,8 @@
|
|||||||
|
namespace MxGateway.Worker.Sta;
|
||||||
|
|
||||||
|
internal interface IStaWorkItem
|
||||||
|
{
|
||||||
|
void CancelBeforeExecution();
|
||||||
|
|
||||||
|
void Execute();
|
||||||
|
}
|
||||||
@@ -0,0 +1,31 @@
|
|||||||
|
using System;
|
||||||
|
using System.Runtime.InteropServices;
|
||||||
|
|
||||||
|
namespace MxGateway.Worker.Sta;
|
||||||
|
|
||||||
|
public sealed class StaComApartmentInitializer : IStaComApartmentInitializer
|
||||||
|
{
|
||||||
|
private const uint CoInitializeApartmentThreaded = 0x2;
|
||||||
|
private const int SOk = 0;
|
||||||
|
private const int SFalse = 1;
|
||||||
|
|
||||||
|
public void Initialize()
|
||||||
|
{
|
||||||
|
int hresult = CoInitializeEx(IntPtr.Zero, CoInitializeApartmentThreaded);
|
||||||
|
if (hresult != SOk && hresult != SFalse)
|
||||||
|
{
|
||||||
|
throw new COMException("Failed to initialize the worker STA COM apartment.", hresult);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Uninitialize()
|
||||||
|
{
|
||||||
|
CoUninitialize();
|
||||||
|
}
|
||||||
|
|
||||||
|
[DllImport("ole32.dll")]
|
||||||
|
private static extern int CoInitializeEx(IntPtr reserved, uint coInit);
|
||||||
|
|
||||||
|
[DllImport("ole32.dll")]
|
||||||
|
private static extern void CoUninitialize();
|
||||||
|
}
|
||||||
@@ -0,0 +1,111 @@
|
|||||||
|
using System;
|
||||||
|
using System.Runtime.InteropServices;
|
||||||
|
using System.Threading;
|
||||||
|
using Microsoft.Win32.SafeHandles;
|
||||||
|
|
||||||
|
namespace MxGateway.Worker.Sta;
|
||||||
|
|
||||||
|
public sealed class StaMessagePump
|
||||||
|
{
|
||||||
|
private const uint Infinite = 0xFFFFFFFF;
|
||||||
|
private const uint MsgWaitFailed = 0xFFFFFFFF;
|
||||||
|
private const uint MwmoInputAvailable = 0x0004;
|
||||||
|
private const uint PmRemove = 0x0001;
|
||||||
|
private const uint QsAllInput = 0x04FF;
|
||||||
|
|
||||||
|
public void WaitForWorkOrMessages(WaitHandle commandWakeEvent, TimeSpan timeout)
|
||||||
|
{
|
||||||
|
if (commandWakeEvent is null)
|
||||||
|
{
|
||||||
|
throw new ArgumentNullException(nameof(commandWakeEvent));
|
||||||
|
}
|
||||||
|
|
||||||
|
uint timeoutMilliseconds = ToTimeoutMilliseconds(timeout);
|
||||||
|
|
||||||
|
SafeWaitHandle safeHandle = commandWakeEvent.SafeWaitHandle;
|
||||||
|
IntPtr[] handles = [safeHandle.DangerousGetHandle()];
|
||||||
|
uint result = MsgWaitForMultipleObjectsEx(
|
||||||
|
(uint)handles.Length,
|
||||||
|
handles,
|
||||||
|
timeoutMilliseconds,
|
||||||
|
QsAllInput,
|
||||||
|
MwmoInputAvailable);
|
||||||
|
|
||||||
|
if (result == MsgWaitFailed)
|
||||||
|
{
|
||||||
|
throw new InvalidOperationException(
|
||||||
|
"The worker STA message pump failed while waiting for command work or Windows messages.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public int PumpPendingMessages()
|
||||||
|
{
|
||||||
|
int pumpedMessages = 0;
|
||||||
|
|
||||||
|
while (PeekMessage(out NativeMessage message, IntPtr.Zero, 0, 0, PmRemove))
|
||||||
|
{
|
||||||
|
TranslateMessage(ref message);
|
||||||
|
DispatchMessage(ref message);
|
||||||
|
pumpedMessages++;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pumpedMessages;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static uint ToTimeoutMilliseconds(TimeSpan timeout)
|
||||||
|
{
|
||||||
|
if (timeout == Timeout.InfiniteTimeSpan)
|
||||||
|
{
|
||||||
|
return Infinite;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (timeout <= TimeSpan.Zero)
|
||||||
|
{
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
return timeout.TotalMilliseconds >= uint.MaxValue
|
||||||
|
? uint.MaxValue - 1
|
||||||
|
: (uint)Math.Ceiling(timeout.TotalMilliseconds);
|
||||||
|
}
|
||||||
|
|
||||||
|
[DllImport("user32.dll", SetLastError = true)]
|
||||||
|
private static extern uint MsgWaitForMultipleObjectsEx(
|
||||||
|
uint count,
|
||||||
|
IntPtr[] handles,
|
||||||
|
uint milliseconds,
|
||||||
|
uint wakeMask,
|
||||||
|
uint flags);
|
||||||
|
|
||||||
|
[DllImport("user32.dll", SetLastError = true)]
|
||||||
|
private static extern bool PeekMessage(
|
||||||
|
out NativeMessage message,
|
||||||
|
IntPtr windowHandle,
|
||||||
|
uint messageFilterMin,
|
||||||
|
uint messageFilterMax,
|
||||||
|
uint removeMessage);
|
||||||
|
|
||||||
|
[DllImport("user32.dll")]
|
||||||
|
private static extern bool TranslateMessage(ref NativeMessage message);
|
||||||
|
|
||||||
|
[DllImport("user32.dll")]
|
||||||
|
private static extern IntPtr DispatchMessage(ref NativeMessage message);
|
||||||
|
|
||||||
|
[StructLayout(LayoutKind.Sequential)]
|
||||||
|
private struct NativeMessage
|
||||||
|
{
|
||||||
|
public IntPtr WindowHandle;
|
||||||
|
public uint Message;
|
||||||
|
public UIntPtr WParam;
|
||||||
|
public IntPtr LParam;
|
||||||
|
public uint Time;
|
||||||
|
public NativePoint Point;
|
||||||
|
}
|
||||||
|
|
||||||
|
[StructLayout(LayoutKind.Sequential)]
|
||||||
|
private struct NativePoint
|
||||||
|
{
|
||||||
|
public int X;
|
||||||
|
public int Y;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,267 @@
|
|||||||
|
using System;
|
||||||
|
using System.Collections.Concurrent;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace MxGateway.Worker.Sta;
|
||||||
|
|
||||||
|
public sealed class StaRuntime : IDisposable
|
||||||
|
{
|
||||||
|
private readonly IStaComApartmentInitializer comApartmentInitializer;
|
||||||
|
private readonly StaMessagePump messagePump;
|
||||||
|
private readonly ConcurrentQueue<IStaWorkItem> commandQueue = new();
|
||||||
|
private readonly AutoResetEvent commandWakeEvent = new(false);
|
||||||
|
private readonly ManualResetEventSlim startedEvent = new(false);
|
||||||
|
private readonly ManualResetEventSlim stoppedEvent = new(false);
|
||||||
|
private readonly object gate = new();
|
||||||
|
private readonly Thread staThread;
|
||||||
|
private readonly TimeSpan idlePumpInterval;
|
||||||
|
private bool disposed;
|
||||||
|
private bool startRequested;
|
||||||
|
private bool shutdownRequested;
|
||||||
|
private Exception? startupException;
|
||||||
|
private long lastActivityUtcTicks;
|
||||||
|
private bool comInitialized;
|
||||||
|
|
||||||
|
public StaRuntime()
|
||||||
|
: this(new StaComApartmentInitializer(), new StaMessagePump(), TimeSpan.FromMilliseconds(50))
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
public StaRuntime(
|
||||||
|
IStaComApartmentInitializer comApartmentInitializer,
|
||||||
|
StaMessagePump messagePump,
|
||||||
|
TimeSpan idlePumpInterval)
|
||||||
|
{
|
||||||
|
this.comApartmentInitializer = comApartmentInitializer
|
||||||
|
?? throw new ArgumentNullException(nameof(comApartmentInitializer));
|
||||||
|
this.messagePump = messagePump ?? throw new ArgumentNullException(nameof(messagePump));
|
||||||
|
|
||||||
|
if (idlePumpInterval <= TimeSpan.Zero)
|
||||||
|
{
|
||||||
|
throw new ArgumentOutOfRangeException(
|
||||||
|
nameof(idlePumpInterval),
|
||||||
|
"The idle pump interval must be greater than zero.");
|
||||||
|
}
|
||||||
|
|
||||||
|
this.idlePumpInterval = idlePumpInterval;
|
||||||
|
lastActivityUtcTicks = DateTimeOffset.UtcNow.UtcTicks;
|
||||||
|
staThread = new Thread(ThreadMain)
|
||||||
|
{
|
||||||
|
IsBackground = true,
|
||||||
|
Name = "MxGateway.Worker.STA"
|
||||||
|
};
|
||||||
|
staThread.SetApartmentState(ApartmentState.STA);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int? StaThreadId { get; private set; }
|
||||||
|
|
||||||
|
public DateTimeOffset LastActivityUtc =>
|
||||||
|
new(new DateTime(Volatile.Read(ref lastActivityUtcTicks), DateTimeKind.Utc));
|
||||||
|
|
||||||
|
public bool IsRunning => startedEvent.IsSet && !stoppedEvent.IsSet;
|
||||||
|
|
||||||
|
public void Start()
|
||||||
|
{
|
||||||
|
ThrowIfDisposed();
|
||||||
|
|
||||||
|
lock (gate)
|
||||||
|
{
|
||||||
|
if (shutdownRequested)
|
||||||
|
{
|
||||||
|
throw new InvalidOperationException("The worker STA runtime is shutting down.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!startRequested)
|
||||||
|
{
|
||||||
|
startRequested = true;
|
||||||
|
staThread.Start();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
startedEvent.Wait();
|
||||||
|
if (startupException is not null)
|
||||||
|
{
|
||||||
|
throw new InvalidOperationException(
|
||||||
|
"The worker STA runtime failed to initialize.",
|
||||||
|
startupException);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task InvokeAsync(Action command, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
if (command is null)
|
||||||
|
{
|
||||||
|
throw new ArgumentNullException(nameof(command));
|
||||||
|
}
|
||||||
|
|
||||||
|
return InvokeAsync(
|
||||||
|
() =>
|
||||||
|
{
|
||||||
|
command();
|
||||||
|
return true;
|
||||||
|
},
|
||||||
|
cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<T> InvokeAsync<T>(Func<T> command, CancellationToken cancellationToken = default)
|
||||||
|
{
|
||||||
|
if (command is null)
|
||||||
|
{
|
||||||
|
throw new ArgumentNullException(nameof(command));
|
||||||
|
}
|
||||||
|
|
||||||
|
ThrowIfDisposed();
|
||||||
|
|
||||||
|
if (cancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
return Task.FromCanceled<T>(cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
StaWorkItem<T> workItem = new(command, cancellationToken);
|
||||||
|
|
||||||
|
lock (gate)
|
||||||
|
{
|
||||||
|
if (shutdownRequested)
|
||||||
|
{
|
||||||
|
return Task.FromException<T>(
|
||||||
|
new InvalidOperationException("The worker STA runtime is shutting down."));
|
||||||
|
}
|
||||||
|
|
||||||
|
commandQueue.Enqueue(workItem);
|
||||||
|
}
|
||||||
|
|
||||||
|
commandWakeEvent.Set();
|
||||||
|
return workItem.Task;
|
||||||
|
}
|
||||||
|
|
||||||
|
public bool Shutdown(TimeSpan timeout)
|
||||||
|
{
|
||||||
|
if (timeout < TimeSpan.Zero && timeout != Timeout.InfiniteTimeSpan)
|
||||||
|
{
|
||||||
|
throw new ArgumentOutOfRangeException(nameof(timeout));
|
||||||
|
}
|
||||||
|
|
||||||
|
lock (gate)
|
||||||
|
{
|
||||||
|
shutdownRequested = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
commandWakeEvent.Set();
|
||||||
|
|
||||||
|
if (!startedEvent.IsSet && !staThread.IsAlive)
|
||||||
|
{
|
||||||
|
CancelQueuedCommands();
|
||||||
|
stoppedEvent.Set();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool stopped = stoppedEvent.Wait(timeout);
|
||||||
|
if (stopped)
|
||||||
|
{
|
||||||
|
CancelQueuedCommands();
|
||||||
|
}
|
||||||
|
|
||||||
|
return stopped;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Dispose()
|
||||||
|
{
|
||||||
|
if (disposed)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool stopped = Shutdown(TimeSpan.FromSeconds(5));
|
||||||
|
if (stopped)
|
||||||
|
{
|
||||||
|
commandWakeEvent.Dispose();
|
||||||
|
startedEvent.Dispose();
|
||||||
|
stoppedEvent.Dispose();
|
||||||
|
}
|
||||||
|
|
||||||
|
disposed = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void ThreadMain()
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
StaThreadId = Thread.CurrentThread.ManagedThreadId;
|
||||||
|
comApartmentInitializer.Initialize();
|
||||||
|
comInitialized = true;
|
||||||
|
MarkActivity();
|
||||||
|
startedEvent.Set();
|
||||||
|
|
||||||
|
while (!IsShutdownRequested())
|
||||||
|
{
|
||||||
|
ProcessQueuedCommands();
|
||||||
|
messagePump.WaitForWorkOrMessages(commandWakeEvent, idlePumpInterval);
|
||||||
|
messagePump.PumpPendingMessages();
|
||||||
|
MarkActivity();
|
||||||
|
}
|
||||||
|
|
||||||
|
ProcessQueuedCommands();
|
||||||
|
}
|
||||||
|
catch (Exception exception)
|
||||||
|
{
|
||||||
|
startupException = exception;
|
||||||
|
startedEvent.Set();
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
CancelQueuedCommands();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (comInitialized)
|
||||||
|
{
|
||||||
|
comApartmentInitializer.Uninitialize();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
MarkActivity();
|
||||||
|
stoppedEvent.Set();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void ProcessQueuedCommands()
|
||||||
|
{
|
||||||
|
while (commandQueue.TryDequeue(out IStaWorkItem? workItem))
|
||||||
|
{
|
||||||
|
MarkActivity();
|
||||||
|
workItem.Execute();
|
||||||
|
MarkActivity();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void CancelQueuedCommands()
|
||||||
|
{
|
||||||
|
while (commandQueue.TryDequeue(out IStaWorkItem? workItem))
|
||||||
|
{
|
||||||
|
workItem.CancelBeforeExecution();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private bool IsShutdownRequested()
|
||||||
|
{
|
||||||
|
lock (gate)
|
||||||
|
{
|
||||||
|
return shutdownRequested;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void MarkActivity()
|
||||||
|
{
|
||||||
|
Volatile.Write(ref lastActivityUtcTicks, DateTimeOffset.UtcNow.UtcTicks);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void ThrowIfDisposed()
|
||||||
|
{
|
||||||
|
if (disposed)
|
||||||
|
{
|
||||||
|
throw new ObjectDisposedException(nameof(StaRuntime));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,71 @@
|
|||||||
|
using System;
|
||||||
|
using System.Threading;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace MxGateway.Worker.Sta;
|
||||||
|
|
||||||
|
internal sealed class StaWorkItem<T> : IStaWorkItem
|
||||||
|
{
|
||||||
|
private readonly Func<T> command;
|
||||||
|
private readonly CancellationToken cancellationToken;
|
||||||
|
private readonly CancellationTokenRegistration cancellationRegistration;
|
||||||
|
private int started;
|
||||||
|
|
||||||
|
public StaWorkItem(Func<T> command, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
this.command = command ?? throw new ArgumentNullException(nameof(command));
|
||||||
|
this.cancellationToken = cancellationToken;
|
||||||
|
Completion = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||||
|
|
||||||
|
if (cancellationToken.CanBeCanceled)
|
||||||
|
{
|
||||||
|
cancellationRegistration = cancellationToken.Register(
|
||||||
|
() =>
|
||||||
|
{
|
||||||
|
if (Interlocked.CompareExchange(ref started, 1, 0) == 0)
|
||||||
|
{
|
||||||
|
Completion.TrySetCanceled(cancellationToken);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task<T> Task => Completion.Task;
|
||||||
|
|
||||||
|
private TaskCompletionSource<T> Completion { get; }
|
||||||
|
|
||||||
|
public void CancelBeforeExecution()
|
||||||
|
{
|
||||||
|
if (Interlocked.CompareExchange(ref started, 1, 0) == 0)
|
||||||
|
{
|
||||||
|
Completion.TrySetCanceled(cancellationToken);
|
||||||
|
cancellationRegistration.Dispose();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Execute()
|
||||||
|
{
|
||||||
|
if (Interlocked.CompareExchange(ref started, 1, 0) != 0)
|
||||||
|
{
|
||||||
|
cancellationRegistration.Dispose();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
cancellationRegistration.Dispose();
|
||||||
|
|
||||||
|
if (cancellationToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
Completion.TrySetCanceled(cancellationToken);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
Completion.TrySetResult(command());
|
||||||
|
}
|
||||||
|
catch (Exception exception)
|
||||||
|
{
|
||||||
|
Completion.TrySetException(exception);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,6 +1,8 @@
|
|||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
|
using System.IO;
|
||||||
using MxGateway.Worker.Bootstrap;
|
using MxGateway.Worker.Bootstrap;
|
||||||
|
using MxGateway.Worker.Ipc;
|
||||||
|
|
||||||
namespace MxGateway.Worker;
|
namespace MxGateway.Worker;
|
||||||
|
|
||||||
@@ -11,13 +13,27 @@ public static class WorkerApplication
|
|||||||
return Run(
|
return Run(
|
||||||
args,
|
args,
|
||||||
new EnvironmentVariableWorkerEnvironment(),
|
new EnvironmentVariableWorkerEnvironment(),
|
||||||
new WorkerConsoleLogger(Console.Error));
|
new WorkerConsoleLogger(Console.Error),
|
||||||
|
new WorkerPipeClient());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static int Run(
|
public static int Run(
|
||||||
string[] args,
|
string[] args,
|
||||||
IWorkerEnvironment environment,
|
IWorkerEnvironment environment,
|
||||||
IWorkerLogger logger)
|
IWorkerLogger logger)
|
||||||
|
{
|
||||||
|
return Run(
|
||||||
|
args,
|
||||||
|
environment,
|
||||||
|
logger,
|
||||||
|
new WorkerPipeClient());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static int Run(
|
||||||
|
string[] args,
|
||||||
|
IWorkerEnvironment environment,
|
||||||
|
IWorkerLogger logger,
|
||||||
|
IWorkerPipeClient pipeClient)
|
||||||
{
|
{
|
||||||
if (args is null)
|
if (args is null)
|
||||||
{
|
{
|
||||||
@@ -34,6 +50,11 @@ public static class WorkerApplication
|
|||||||
throw new ArgumentNullException(nameof(logger));
|
throw new ArgumentNullException(nameof(logger));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pipeClient is null)
|
||||||
|
{
|
||||||
|
throw new ArgumentNullException(nameof(pipeClient));
|
||||||
|
}
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
WorkerOptionsParser parser = new(environment);
|
WorkerOptionsParser parser = new(environment);
|
||||||
@@ -61,8 +82,38 @@ public static class WorkerApplication
|
|||||||
["nonce"] = options.Nonce,
|
["nonce"] = options.Nonce,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
pipeClient.RunAsync(options).GetAwaiter().GetResult();
|
||||||
|
|
||||||
|
logger.Information("WorkerPipeHandshakeSucceeded", new Dictionary<string, object?>
|
||||||
|
{
|
||||||
|
["session_id"] = options.SessionId,
|
||||||
|
["pipe_name"] = options.PipeName,
|
||||||
|
["protocol_version"] = options.ProtocolVersion,
|
||||||
|
});
|
||||||
|
|
||||||
return (int)WorkerExitCode.Success;
|
return (int)WorkerExitCode.Success;
|
||||||
}
|
}
|
||||||
|
catch (WorkerFrameProtocolException exception)
|
||||||
|
{
|
||||||
|
logger.Error("WorkerPipeProtocolFailure", new Dictionary<string, object?>
|
||||||
|
{
|
||||||
|
["exit_code"] = WorkerExitCode.ProtocolViolation,
|
||||||
|
["error_code"] = exception.ErrorCode,
|
||||||
|
["exception_type"] = exception.GetType().FullName,
|
||||||
|
});
|
||||||
|
|
||||||
|
return (int)WorkerExitCode.ProtocolViolation;
|
||||||
|
}
|
||||||
|
catch (Exception exception) when (exception is IOException or TimeoutException)
|
||||||
|
{
|
||||||
|
logger.Error("WorkerPipeConnectionFailed", new Dictionary<string, object?>
|
||||||
|
{
|
||||||
|
["exit_code"] = WorkerExitCode.PipeConnectionFailed,
|
||||||
|
["exception_type"] = exception.GetType().FullName,
|
||||||
|
});
|
||||||
|
|
||||||
|
return (int)WorkerExitCode.PipeConnectionFailed;
|
||||||
|
}
|
||||||
catch (Exception exception)
|
catch (Exception exception)
|
||||||
{
|
{
|
||||||
logger.Error("WorkerBootstrapUnexpectedFailure", new Dictionary<string, object?>
|
logger.Error("WorkerBootstrapUnexpectedFailure", new Dictionary<string, object?>
|
||||||
|
|||||||
Reference in New Issue
Block a user