Issue #24: create mxaccess com object on sta

This commit is contained in:
Joseph Doherty
2026-04-26 17:34:01 -04:00
parent 5511609880
commit 451dccf7e3
12 changed files with 607 additions and 13 deletions
@@ -1,5 +1,6 @@
using System.Collections.Generic;
using System.IO;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using MxGateway.Contracts;
@@ -37,6 +38,10 @@ public sealed class WorkerPipeSessionTests
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerHello, written[0].BodyCase);
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerReady, written[1].BodyCase);
Assert.Equal(Nonce, written[0].WorkerHello.Nonce);
Assert.Equal(1234, written[1].WorkerReady.WorkerProcessId);
Assert.Equal(MxGateway.Worker.MxAccess.MxAccessInteropInfo.ProgId, written[1].WorkerReady.MxaccessProgid);
Assert.Equal(MxGateway.Worker.MxAccess.MxAccessInteropInfo.Clsid, written[1].WorkerReady.MxaccessClsid);
Assert.NotNull(written[1].WorkerReady.ReadyTimestamp);
}
[Fact]
@@ -117,6 +122,31 @@ public sealed class WorkerPipeSessionTests
Assert.Equal(WorkerFaultCategory.ProtocolViolation, fault.WorkerFault.Category);
}
[Fact]
public async Task CompleteStartupHandshakeAsync_WhenMxAccessCreationFails_WritesFaultInsteadOfReady()
{
const int hresult = unchecked((int)0x80040154);
WorkerFrameProtocolOptions options = CreateOptions();
MemoryStream inbound = new();
await new WorkerFrameWriter(inbound, options).WriteAsync(CreateGatewayHelloEnvelope());
inbound.Position = 0;
MemoryStream outbound = new();
WorkerPipeSession session = CreateSession(inbound, outbound, options);
await Assert.ThrowsAsync<COMException>(
async () => await session.CompleteStartupHandshakeAsync(
_ => Task.FromException<WorkerReady>(new COMException("Class not registered.", hresult))));
WorkerEnvelope[] written = ReadWrittenFrames(outbound, options);
Assert.Equal(2, written.Length);
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerHello, written[0].BodyCase);
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerFault, written[1].BodyCase);
Assert.Equal(WorkerFaultCategory.MxaccessCreationFailed, written[1].WorkerFault.Category);
Assert.Equal(hresult, written[1].WorkerFault.Hresult);
Assert.Equal(typeof(COMException).FullName, written[1].WorkerFault.ExceptionType);
Assert.Equal(ProtocolStatusCode.WorkerUnavailable, written[1].WorkerFault.ProtocolStatus.Code);
}
private static WorkerPipeSession CreateSession(
Stream inbound,
Stream outbound,
@@ -0,0 +1,24 @@
using System;
using System.Threading.Tasks;
using MxGateway.Worker.MxAccess;
namespace MxGateway.Worker.Tests.MxAccess;
public sealed class MxAccessLiveComCreationTests
{
[Fact]
public async Task StartAsync_WhenOptedIn_CreatesInstalledMxAccessComObjectOnSta()
{
if (!string.Equals(
Environment.GetEnvironmentVariable("MXGATEWAY_RUN_LIVE_MXACCESS_TESTS"),
"1",
StringComparison.Ordinal))
{
return;
}
using MxAccessStaSession session = new();
await session.StartAsync(workerProcessId: 1234);
}
}
@@ -0,0 +1,133 @@
using System;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using MxGateway.Contracts.Proto;
using MxGateway.Worker.MxAccess;
using MxGateway.Worker.Sta;
namespace MxGateway.Worker.Tests.MxAccess;
public sealed class MxAccessStaSessionTests
{
[Fact]
public async Task StartAsync_CreatesComObjectAndAttachesEventSinkOnStaThread()
{
FakeMxAccessComObjectFactory factory = new();
FakeMxAccessEventSink eventSink = new();
using StaRuntime runtime = CreateRuntime();
using MxAccessStaSession session = new(runtime, factory, eventSink);
WorkerReady ready = await session.StartAsync(workerProcessId: 1234);
Assert.Equal(1234, ready.WorkerProcessId);
Assert.Equal(MxAccessInteropInfo.ProgId, ready.MxaccessProgid);
Assert.Equal(MxAccessInteropInfo.Clsid, ready.MxaccessClsid);
Assert.NotNull(ready.ReadyTimestamp);
Assert.Equal(runtime.StaThreadId, factory.CreateThreadId);
Assert.Equal(runtime.StaThreadId, eventSink.AttachThreadId);
Assert.Equal(ApartmentState.STA, factory.CreateApartmentState);
Assert.Same(factory.CreatedObject, eventSink.AttachedObject);
}
[Fact]
public async Task StartAsync_WhenFactoryFails_MapsCreationExceptionWithHResult()
{
const int hresult = unchecked((int)0x80040154);
FakeMxAccessComObjectFactory factory = new(new COMException("Class not registered.", hresult));
FakeMxAccessEventSink eventSink = new();
using StaRuntime runtime = CreateRuntime();
using MxAccessStaSession session = new(runtime, factory, eventSink);
MxAccessCreationException exception = await Assert.ThrowsAsync<MxAccessCreationException>(
() => session.StartAsync(workerProcessId: 1234));
Assert.Equal(hresult, exception.CapturedHResult);
Assert.Equal(MxAccessInteropInfo.ProgId, exception.AttemptedProgId);
Assert.Equal(MxAccessInteropInfo.Clsid, exception.AttemptedClsid);
Assert.Null(eventSink.AttachedObject);
}
[Fact]
public async Task Dispose_DetachesEventSinkOnStaThread()
{
FakeMxAccessComObjectFactory factory = new();
FakeMxAccessEventSink eventSink = new();
using StaRuntime runtime = CreateRuntime();
MxAccessStaSession session = new(runtime, factory, eventSink);
await session.StartAsync(workerProcessId: 1234);
session.Dispose();
Assert.Equal(runtime.StaThreadId, eventSink.DetachThreadId);
}
private static StaRuntime CreateRuntime()
{
return new StaRuntime(
new NoopComApartmentInitializer(),
new StaMessagePump(),
TimeSpan.FromMilliseconds(25));
}
private sealed class FakeMxAccessComObjectFactory : IMxAccessComObjectFactory
{
private readonly Exception? exception;
public FakeMxAccessComObjectFactory(Exception? exception = null)
{
this.exception = exception;
}
public object CreatedObject { get; } = new();
public int? CreateThreadId { get; private set; }
public ApartmentState? CreateApartmentState { get; private set; }
public object Create()
{
CreateThreadId = Thread.CurrentThread.ManagedThreadId;
CreateApartmentState = Thread.CurrentThread.GetApartmentState();
if (exception is not null)
{
throw exception;
}
return CreatedObject;
}
}
private sealed class FakeMxAccessEventSink : IMxAccessEventSink
{
public object? AttachedObject { get; private set; }
public int? AttachThreadId { get; private set; }
public int? DetachThreadId { get; private set; }
public void Attach(object mxAccessComObject)
{
AttachedObject = mxAccessComObject;
AttachThreadId = Thread.CurrentThread.ManagedThreadId;
}
public void Detach()
{
DetachThreadId = Thread.CurrentThread.ManagedThreadId;
AttachedObject = null;
}
}
private sealed class NoopComApartmentInitializer : IStaComApartmentInitializer
{
public void Initialize()
{
}
public void Uninitialize()
{
}
}
}
+102 -13
View File
@@ -15,6 +15,7 @@ public sealed class WorkerPipeSession
private readonly Func<int> _processIdProvider;
private readonly WorkerFrameReader _reader;
private readonly WorkerFrameWriter _writer;
private MxAccessStaSession? _mxAccessStaSession;
private long _nextSequence;
public WorkerPipeSession(
@@ -42,7 +43,7 @@ public sealed class WorkerPipeSession
public Task CompleteStartupHandshakeAsync(CancellationToken cancellationToken = default)
{
return CompleteStartupHandshakeAsync(_ => Task.CompletedTask, cancellationToken);
return CompleteStartupHandshakeAsync(InitializeMxAccessAsync, cancellationToken);
}
public async Task CompleteStartupHandshakeAsync(
@@ -54,20 +55,44 @@ public sealed class WorkerPipeSession
throw new ArgumentNullException(nameof(initializeMxAccessAsync));
}
await CompleteStartupHandshakeAsync(
async innerCancellationToken =>
{
await initializeMxAccessAsync(innerCancellationToken).ConfigureAwait(false);
return CreateWorkerReady();
},
cancellationToken).ConfigureAwait(false);
}
public async Task CompleteStartupHandshakeAsync(
Func<CancellationToken, Task<WorkerReady>> initializeMxAccessAsync,
CancellationToken cancellationToken = default)
{
if (initializeMxAccessAsync is null)
{
throw new ArgumentNullException(nameof(initializeMxAccessAsync));
}
try
{
WorkerEnvelope envelope = await _reader.ReadAsync(cancellationToken).ConfigureAwait(false);
ValidateGatewayHello(envelope);
await WriteWorkerHelloAsync(cancellationToken).ConfigureAwait(false);
await initializeMxAccessAsync(cancellationToken).ConfigureAwait(false);
await WriteWorkerReadyAsync(cancellationToken).ConfigureAwait(false);
WorkerReady ready = await initializeMxAccessAsync(cancellationToken).ConfigureAwait(false);
await WriteWorkerReadyAsync(ready, cancellationToken).ConfigureAwait(false);
}
catch (WorkerFrameProtocolException exception)
{
await TryWriteFaultAsync(exception, cancellationToken).ConfigureAwait(false);
throw;
}
catch (Exception exception) when (exception is not OperationCanceledException)
{
await TryWriteFaultAsync(MxAccessCreationException.From(exception), cancellationToken)
.ConfigureAwait(false);
throw;
}
}
private void ValidateGatewayHello(WorkerEnvelope envelope)
@@ -108,17 +133,11 @@ public sealed class WorkerPipeSession
cancellationToken);
}
private Task WriteWorkerReadyAsync(CancellationToken cancellationToken)
private Task WriteWorkerReadyAsync(
WorkerReady ready,
CancellationToken cancellationToken)
{
return _writer.WriteAsync(
CreateEnvelope(new WorkerReady
{
WorkerProcessId = _processIdProvider(),
MxaccessProgid = MxAccessInteropInfo.ProgId,
MxaccessClsid = MxAccessInteropInfo.Clsid,
ReadyTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
}),
cancellationToken);
return _writer.WriteAsync(CreateEnvelope(ready), cancellationToken);
}
private async Task TryWriteFaultAsync(
@@ -140,6 +159,25 @@ public sealed class WorkerPipeSession
}
}
private async Task TryWriteFaultAsync(
MxAccessCreationException exception,
CancellationToken cancellationToken)
{
try
{
await _writer
.WriteAsync(CreateEnvelope(CreateFault(exception)), cancellationToken)
.ConfigureAwait(false);
}
catch (Exception faultWriteException) when (
faultWriteException is IOException
|| faultWriteException is ObjectDisposedException
|| faultWriteException is WorkerFrameProtocolException)
{
// The MXAccess creation failure is the actionable error.
}
}
private WorkerEnvelope CreateEnvelope(WorkerHello hello)
{
return CreateBaseEnvelope(hello);
@@ -191,6 +229,34 @@ public sealed class WorkerPipeSession
return unchecked((ulong)Interlocked.Increment(ref _nextSequence));
}
private async Task<WorkerReady> InitializeMxAccessAsync(CancellationToken cancellationToken)
{
_mxAccessStaSession = new MxAccessStaSession();
try
{
return await _mxAccessStaSession
.StartAsync(_processIdProvider(), cancellationToken)
.ConfigureAwait(false);
}
catch
{
_mxAccessStaSession.Dispose();
_mxAccessStaSession = null;
throw;
}
}
private WorkerReady CreateWorkerReady()
{
return new WorkerReady
{
WorkerProcessId = _processIdProvider(),
MxaccessProgid = MxAccessInteropInfo.ProgId,
MxaccessClsid = MxAccessInteropInfo.Clsid,
ReadyTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
};
}
private static WorkerFault CreateFault(WorkerFrameProtocolException exception)
{
return new WorkerFault
@@ -206,6 +272,29 @@ public sealed class WorkerPipeSession
};
}
private static WorkerFault CreateFault(MxAccessCreationException exception)
{
WorkerFault fault = new()
{
Category = WorkerFaultCategory.MxaccessCreationFailed,
ExceptionType = exception.InnerException?.GetType().FullName ?? exception.GetType().FullName ?? string.Empty,
DiagnosticMessage = exception.Message,
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.WorkerUnavailable,
Message = exception.Message,
},
};
int? hresult = MxAccessCreationException.ExtractHResult(exception);
if (hresult.HasValue)
{
fault.Hresult = hresult.Value;
}
return fault;
}
private static WorkerFaultCategory MapFaultCategory(WorkerFrameProtocolErrorCode errorCode)
{
return errorCode switch
@@ -0,0 +1,6 @@
namespace MxGateway.Worker.MxAccess;
public interface IMxAccessComObjectFactory
{
object Create();
}
@@ -0,0 +1,8 @@
namespace MxGateway.Worker.MxAccess;
public interface IMxAccessEventSink
{
void Attach(object mxAccessComObject);
void Detach();
}
@@ -0,0 +1,66 @@
using ArchestrA.MxAccess;
namespace MxGateway.Worker.MxAccess;
public sealed class MxAccessBaseEventSink : IMxAccessEventSink
{
private LMXProxyServerClass? server;
public void Attach(object mxAccessComObject)
{
server = (LMXProxyServerClass)mxAccessComObject;
server.OnDataChange += OnDataChange;
server.OnWriteComplete += OnWriteComplete;
server.OperationComplete += OperationComplete;
server.OnBufferedDataChange += OnBufferedDataChange;
}
public void Detach()
{
if (server is null)
{
return;
}
server.OnDataChange -= OnDataChange;
server.OnWriteComplete -= OnWriteComplete;
server.OperationComplete -= OperationComplete;
server.OnBufferedDataChange -= OnBufferedDataChange;
server = null;
}
private static void OnDataChange(
int hLMXServerHandle,
int phItemHandle,
object pvItemValue,
int pwItemQuality,
object pftItemTimeStamp,
ref MXSTATUS_PROXY[] pVars)
{
}
private static void OnWriteComplete(
int hLMXServerHandle,
int phItemHandle,
ref MXSTATUS_PROXY[] pVars)
{
}
private static void OperationComplete(
int hLMXServerHandle,
int phItemHandle,
ref MXSTATUS_PROXY[] pVars)
{
}
private static void OnBufferedDataChange(
int hLMXServerHandle,
int phItemHandle,
MxDataType dtDataType,
object pvItemValue,
object pwItemQuality,
object pftItemTimeStamp,
ref MXSTATUS_PROXY[] pVars)
{
}
}
@@ -0,0 +1,11 @@
using ArchestrA.MxAccess;
namespace MxGateway.Worker.MxAccess;
public sealed class MxAccessComObjectFactory : IMxAccessComObjectFactory
{
public object Create()
{
return new LMXProxyServerClass();
}
}
@@ -0,0 +1,48 @@
using System;
using System.Runtime.InteropServices;
namespace MxGateway.Worker.MxAccess;
public sealed class MxAccessCreationException : Exception
{
public MxAccessCreationException(Exception innerException)
: base(
$"Failed to create MXAccess COM object {MxAccessInteropInfo.ComClassName} ({MxAccessInteropInfo.ProgId}).",
innerException)
{
AttemptedProgId = MxAccessInteropInfo.ProgId;
AttemptedClsid = MxAccessInteropInfo.Clsid;
AttemptedComClassName = MxAccessInteropInfo.ComClassName;
HResult = innerException.HResult;
}
public string AttemptedProgId { get; }
public string AttemptedClsid { get; }
public string AttemptedComClassName { get; }
public int? CapturedHResult => HResult == 0 ? null : HResult;
public static MxAccessCreationException From(Exception exception)
{
return exception is MxAccessCreationException creationException
? creationException
: new MxAccessCreationException(exception);
}
public static int? ExtractHResult(Exception exception)
{
if (exception is MxAccessCreationException creationException)
{
return creationException.CapturedHResult;
}
if (exception is COMException comException)
{
return comException.HResult;
}
return exception.HResult == 0 ? null : exception.HResult;
}
}
@@ -0,0 +1,97 @@
using System;
using System.Runtime.InteropServices;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
namespace MxGateway.Worker.MxAccess;
public sealed class MxAccessSession : IDisposable
{
private readonly object mxAccessComObject;
private readonly IMxAccessEventSink eventSink;
private bool disposed;
private MxAccessSession(
object mxAccessComObject,
IMxAccessEventSink eventSink,
int creationThreadId)
{
this.mxAccessComObject = mxAccessComObject ?? throw new ArgumentNullException(nameof(mxAccessComObject));
this.eventSink = eventSink ?? throw new ArgumentNullException(nameof(eventSink));
CreationThreadId = creationThreadId;
}
public int CreationThreadId { get; }
public WorkerReady CreateWorkerReady(int workerProcessId)
{
return new WorkerReady
{
WorkerProcessId = workerProcessId,
MxaccessProgid = MxAccessInteropInfo.ProgId,
MxaccessClsid = MxAccessInteropInfo.Clsid,
ReadyTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
};
}
public static MxAccessSession Create(
IMxAccessComObjectFactory factory,
IMxAccessEventSink eventSink)
{
if (factory is null)
{
throw new ArgumentNullException(nameof(factory));
}
if (eventSink is null)
{
throw new ArgumentNullException(nameof(eventSink));
}
object? mxAccessComObject = null;
try
{
mxAccessComObject = factory.Create();
if (mxAccessComObject is null)
{
throw new InvalidOperationException("MXAccess COM factory returned null.");
}
eventSink.Attach(mxAccessComObject);
return new MxAccessSession(
mxAccessComObject,
eventSink,
Environment.CurrentManagedThreadId);
}
catch (Exception exception)
{
eventSink.Detach();
if (mxAccessComObject is not null && Marshal.IsComObject(mxAccessComObject))
{
Marshal.FinalReleaseComObject(mxAccessComObject);
}
throw MxAccessCreationException.From(exception);
}
}
public void Dispose()
{
if (disposed)
{
return;
}
eventSink.Detach();
if (Marshal.IsComObject(mxAccessComObject))
{
Marshal.FinalReleaseComObject(mxAccessComObject);
}
disposed = true;
}
}
@@ -0,0 +1,70 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using MxGateway.Contracts.Proto;
using MxGateway.Worker.Sta;
namespace MxGateway.Worker.MxAccess;
public sealed class MxAccessStaSession : IDisposable
{
private readonly IMxAccessComObjectFactory factory;
private readonly IMxAccessEventSink eventSink;
private readonly StaRuntime staRuntime;
private MxAccessSession? session;
private bool disposed;
public MxAccessStaSession()
: this(
new StaRuntime(),
new MxAccessComObjectFactory(),
new MxAccessBaseEventSink())
{
}
public MxAccessStaSession(
StaRuntime staRuntime,
IMxAccessComObjectFactory factory,
IMxAccessEventSink eventSink)
{
this.staRuntime = staRuntime ?? throw new ArgumentNullException(nameof(staRuntime));
this.factory = factory ?? throw new ArgumentNullException(nameof(factory));
this.eventSink = eventSink ?? throw new ArgumentNullException(nameof(eventSink));
}
public Task<WorkerReady> StartAsync(
int workerProcessId,
CancellationToken cancellationToken = default)
{
staRuntime.Start();
return staRuntime.InvokeAsync(
() =>
{
if (session is not null)
{
throw new InvalidOperationException("MXAccess COM session has already been created.");
}
session = MxAccessSession.Create(factory, eventSink);
return session.CreateWorkerReady(workerProcessId);
},
cancellationToken);
}
public void Dispose()
{
if (disposed)
{
return;
}
if (session is not null)
{
staRuntime.InvokeAsync(() => session.Dispose()).GetAwaiter().GetResult();
}
staRuntime.Dispose();
disposed = true;
}
}