Compare commits

...

11 Commits

Author SHA1 Message Date
Joseph Doherty 276288ad87 Merge remote-tracking branch 'origin/main' into agent-2/issue-31-implement-mxstatus-proxy-and-hresult-conversion 2026-04-26 17:39:48 -04:00
dohertj2 76bd3de5a2 Merge PR #69: Issue #24 create MXAccess COM object on STA
Verified with dotnet build src\\MxGateway.sln, dotnet test src\\MxGateway.Worker.Tests\\MxGateway.Worker.Tests.csproj -p:Platform=x86, and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 17:39:04 -04:00
Joseph Doherty 29455fc1f6 Issue #31: implement mxstatus proxy and hresult conversion 2026-04-26 17:35:30 -04:00
dohertj2 5511609880 Merge PR #68: Issue #12 implement session manager and registry
Verified with dotnet build src\\MxGateway.sln and dotnet test src\\MxGateway.sln.
2026-04-26 17:34:19 -04:00
Joseph Doherty 451dccf7e3 Issue #24: create mxaccess com object on sta 2026-04-26 17:34:12 -04:00
dohertj2 cde9c89386 Merge PR #67: Issue #30 implement value conversion
Verified with dotnet build src\\MxGateway.sln, dotnet test src\\MxGateway.Worker.Tests\\MxGateway.Worker.Tests.csproj -p:Platform=x86, and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 17:30:19 -04:00
Joseph Doherty d496f1fd75 Issue #12: implement session manager and registry 2026-04-26 17:29:47 -04:00
Joseph Doherty 6559672fc1 Issue #30: implement value conversion 2026-04-26 17:26:36 -04:00
dohertj2 97c30b9d00 Merge PR #66: Issue #23 implement STA runtime and message pump
Verified with dotnet build src\\MxGateway.sln, dotnet test src\\MxGateway.Worker.Tests\\MxGateway.Worker.Tests.csproj -p:Platform=x86, and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 17:23:02 -04:00
dohertj2 603aff7004 Merge PR #65: Issue #22 implement pipe client and frame protocol
Verified with dotnet build src\\MxGateway.sln, dotnet test src\\MxGateway.Worker.Tests\\MxGateway.Worker.Tests.csproj -p:Platform=x86, and dotnet test src\\MxGateway.sln --no-build.
2026-04-26 17:20:28 -04:00
Joseph Doherty e81682e367 Issue #23: implement sta runtime and message pump 2026-04-26 17:19:00 -04:00
44 changed files with 3634 additions and 13 deletions
+14
View File
@@ -330,6 +330,20 @@ The worker remains authoritative for MXAccess handles. The gateway may keep a
shadow state for diagnostics, but it must not invent, rewrite, or recycle
MXAccess handles.
`SessionManager` owns the current in-memory session registry. It allocates a
session id, creates the worker pipe name and nonce, registers the session before
worker startup, and removes the session if startup fails. A successful
`OpenSession` attaches the ready `IWorkerClient` and transitions the session to
`Ready`.
Only `Ready` sessions accept command and event operations. `CloseSession` is
idempotent for sessions still known to the registry: the first close shuts down
the worker, and later closes return the final `Closed` state. Lease handling is
exposed as a session hook so a monitor can close expired sessions without
embedding lease policy in the worker client. Gateway shutdown walks the
registry, closes each known session, and kills a worker if graceful shutdown
fails.
## Worker Launch
The gateway should launch the worker using explicit configuration:
+23
View File
@@ -250,6 +250,17 @@ The loop should update a heartbeat timestamp after:
- finishing a command,
- processing an MXAccess event.
`StaRuntime` implements this runtime boundary in the worker. It starts one
background thread named `MxGateway.Worker.STA`, sets it to `ApartmentState.STA`,
initializes COM through `StaComApartmentInitializer`, and runs
`StaMessagePump`. Commands are scheduled through `InvokeAsync`; the command
queue signals an `AutoResetEvent` so `MsgWaitForMultipleObjectsEx` can wake the
STA without busy-waiting. `LastActivityUtc` records pump, command, startup, and
shutdown activity so the future heartbeat/watchdog can report whether the STA
is still responsive. Shutdown marks the runtime as closing, wakes the pump,
rejects new commands, cancels queued work, uninitializes COM on the STA, and
waits for the thread to exit.
## COM Creation
The MXAccess analysis source at `C:\Users\dohertj2\Desktop\mxaccess` identifies
@@ -278,6 +289,13 @@ The worker should reference the interop assembly and instantiate
`LMXProxyServerClass` on the dedicated STA thread. Keep the ProgID and assembly
path configurable for diagnostics, but this COM class is the v1 default.
`MxAccessStaSession` owns the initial COM creation path. It starts `StaRuntime`,
creates `LMXProxyServerClass` through `MxAccessComObjectFactory` on the STA,
attaches `MxAccessBaseEventSink`, and returns `WorkerReady` only after those
steps succeed. `MxAccessSession` keeps the raw COM object private, records the
STA managed thread id that created it, detaches the base event sink during
disposal, and releases the COM reference on the STA.
Creation rules:
- Create COM object only on the STA.
@@ -295,6 +313,11 @@ If COM creation fails, the worker should send a structured fault with:
- worker process id,
- session id.
`WorkerPipeSession` maps startup exceptions from this path to
`WorkerFaultCategory.MxaccessCreationFailed`, includes the captured HRESULT
when the exception exposes one, and does not send `WorkerReady` after a failed
COM creation attempt.
## Event Sink
The worker must subscribe to every public MXAccess event family:
+6
View File
@@ -799,6 +799,12 @@ Core operations:
- track worker state,
- close or kill worker.
The gateway implementation keeps sessions in an in-memory `SessionRegistry`
keyed by session id. `SessionManager` owns the state machine, creates
per-session pipe names and nonces, starts the worker through the worker-client
factory, gates commands to `Ready` sessions, exposes lease-close hooks, and
cleans up workers during gateway shutdown.
State machine:
```text
@@ -4,6 +4,7 @@ using MxGateway.Server.Diagnostics;
using MxGateway.Server.Metrics;
using MxGateway.Server.Security.Authentication;
using MxGateway.Server.Security.Authorization;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
namespace MxGateway.Server;
@@ -31,6 +32,7 @@ public static class GatewayApplication
builder.Services.AddHealthChecks();
builder.Services.AddSingleton<GatewayMetrics>();
builder.Services.AddWorkerProcessLauncher();
builder.Services.AddGatewaySessions();
return builder;
}
@@ -0,0 +1,290 @@
using MxGateway.Contracts.Proto;
using MxGateway.Server.Workers;
namespace MxGateway.Server.Sessions;
public sealed class GatewaySession
{
private readonly object _syncRoot = new();
private readonly SemaphoreSlim _closeLock = new(1, 1);
private IWorkerClient? _workerClient;
private SessionState _state = SessionState.Creating;
private string? _finalFault;
private DateTimeOffset _lastClientActivityAt;
private DateTimeOffset? _leaseExpiresAt;
private bool _closeStarted;
public GatewaySession(
string sessionId,
string backendName,
string pipeName,
string nonce,
string? clientIdentity,
string? clientSessionName,
string? clientCorrelationId,
TimeSpan commandTimeout,
TimeSpan startupTimeout,
TimeSpan shutdownTimeout,
DateTimeOffset openedAt)
{
if (string.IsNullOrWhiteSpace(sessionId))
{
throw new ArgumentException("Session id is required.", nameof(sessionId));
}
if (string.IsNullOrWhiteSpace(backendName))
{
throw new ArgumentException("Backend name is required.", nameof(backendName));
}
if (string.IsNullOrWhiteSpace(pipeName))
{
throw new ArgumentException("Pipe name is required.", nameof(pipeName));
}
if (string.IsNullOrWhiteSpace(nonce))
{
throw new ArgumentException("Nonce is required.", nameof(nonce));
}
SessionId = sessionId;
BackendName = backendName;
PipeName = pipeName;
Nonce = nonce;
ClientIdentity = clientIdentity;
ClientSessionName = clientSessionName;
ClientCorrelationId = clientCorrelationId;
CommandTimeout = commandTimeout;
StartupTimeout = startupTimeout;
ShutdownTimeout = shutdownTimeout;
OpenedAt = openedAt;
_lastClientActivityAt = openedAt;
}
public string SessionId { get; }
public string BackendName { get; }
public string PipeName { get; }
public string Nonce { get; }
public string? ClientIdentity { get; }
public string? ClientSessionName { get; }
public string? ClientCorrelationId { get; }
public TimeSpan CommandTimeout { get; }
public TimeSpan StartupTimeout { get; }
public TimeSpan ShutdownTimeout { get; }
public DateTimeOffset OpenedAt { get; }
public int? WorkerProcessId => _workerClient?.ProcessId;
public IWorkerClient? WorkerClient => _workerClient;
public SessionState State
{
get
{
lock (_syncRoot)
{
return _state;
}
}
}
public DateTimeOffset LastClientActivityAt
{
get
{
lock (_syncRoot)
{
return _lastClientActivityAt;
}
}
}
public DateTimeOffset? LeaseExpiresAt
{
get
{
lock (_syncRoot)
{
return _leaseExpiresAt;
}
}
}
public string? FinalFault
{
get
{
lock (_syncRoot)
{
return _finalFault;
}
}
}
public void AttachWorkerClient(IWorkerClient workerClient)
{
ArgumentNullException.ThrowIfNull(workerClient);
lock (_syncRoot)
{
_workerClient = workerClient;
}
}
public void TransitionTo(SessionState nextState)
{
lock (_syncRoot)
{
if (_state is SessionState.Closed)
{
return;
}
if (_state is SessionState.Faulted && nextState is not SessionState.Closed)
{
return;
}
_state = nextState;
}
}
public void MarkReady()
{
TransitionTo(SessionState.Ready);
}
public void MarkFaulted(string reason)
{
lock (_syncRoot)
{
if (_state is SessionState.Closed)
{
return;
}
_finalFault = reason;
_state = SessionState.Faulted;
}
}
public void TouchClientActivity(DateTimeOffset activityAt)
{
lock (_syncRoot)
{
_lastClientActivityAt = activityAt;
}
}
public void ExtendLease(DateTimeOffset leaseExpiresAt)
{
lock (_syncRoot)
{
_leaseExpiresAt = leaseExpiresAt;
}
}
public bool IsLeaseExpired(DateTimeOffset now)
{
lock (_syncRoot)
{
return _leaseExpiresAt is not null && _leaseExpiresAt <= now;
}
}
public async Task<WorkerCommandReply> InvokeAsync(
WorkerCommand command,
CancellationToken cancellationToken)
{
IWorkerClient workerClient = GetReadyWorkerClient();
TouchClientActivity(DateTimeOffset.UtcNow);
return await workerClient.InvokeAsync(command, CommandTimeout, cancellationToken).ConfigureAwait(false);
}
public IAsyncEnumerable<WorkerEvent> ReadEventsAsync(CancellationToken cancellationToken)
{
IWorkerClient workerClient = GetReadyWorkerClient();
TouchClientActivity(DateTimeOffset.UtcNow);
return workerClient.ReadEventsAsync(cancellationToken);
}
public async Task<SessionCloseResult> CloseAsync(
string reason,
CancellationToken cancellationToken)
{
await _closeLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
if (_state is SessionState.Closed)
{
return new SessionCloseResult(SessionId, SessionState.Closed, AlreadyClosed: true);
}
bool alreadyClosing = _closeStarted;
_closeStarted = true;
_state = SessionState.Closing;
if (_workerClient is not null)
{
try
{
await _workerClient.ShutdownAsync(ShutdownTimeout, cancellationToken).ConfigureAwait(false);
}
catch
{
_workerClient.Kill(reason);
throw;
}
}
_state = SessionState.Closed;
return new SessionCloseResult(SessionId, SessionState.Closed, alreadyClosing);
}
finally
{
_closeLock.Release();
}
}
public void KillWorker(string reason)
{
_workerClient?.Kill(reason);
TransitionTo(SessionState.Closed);
}
public async ValueTask DisposeAsync()
{
_closeLock.Dispose();
if (_workerClient is not null)
{
await _workerClient.DisposeAsync().ConfigureAwait(false);
}
}
private IWorkerClient GetReadyWorkerClient()
{
lock (_syncRoot)
{
if (_state != SessionState.Ready || _workerClient?.State != WorkerClientState.Ready)
{
throw new SessionManagerException(
SessionManagerErrorCode.SessionNotReady,
$"Session {SessionId} is not ready. Current state is {_state}.");
}
return _workerClient;
}
}
}
@@ -0,0 +1,34 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Sessions;
public interface ISessionManager
{
Task<GatewaySession> OpenSessionAsync(
SessionOpenRequest request,
string? clientIdentity,
CancellationToken cancellationToken);
bool TryGetSession(
string sessionId,
out GatewaySession session);
Task<WorkerCommandReply> InvokeAsync(
string sessionId,
WorkerCommand command,
CancellationToken cancellationToken);
IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
string sessionId,
CancellationToken cancellationToken);
Task<SessionCloseResult> CloseSessionAsync(
string sessionId,
CancellationToken cancellationToken);
Task<int> CloseExpiredLeasesAsync(
DateTimeOffset now,
CancellationToken cancellationToken);
Task ShutdownAsync(CancellationToken cancellationToken);
}
@@ -0,0 +1,16 @@
namespace MxGateway.Server.Sessions;
public interface ISessionRegistry
{
int Count { get; }
int ActiveCount { get; }
bool TryAdd(GatewaySession session);
bool TryGet(string sessionId, out GatewaySession session);
bool TryRemove(string sessionId, out GatewaySession session);
IReadOnlyCollection<GatewaySession> Snapshot();
}
@@ -0,0 +1,8 @@
namespace MxGateway.Server.Sessions;
public interface ISessionWorkerClientFactory
{
Task<MxGateway.Server.Workers.IWorkerClient> CreateAsync(
GatewaySession session,
CancellationToken cancellationToken);
}
@@ -0,0 +1,8 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Sessions;
public sealed record SessionCloseResult(
string SessionId,
SessionState FinalState,
bool AlreadyClosed);
@@ -0,0 +1,287 @@
using System.Security.Cryptography;
using Google.Protobuf.WellKnownTypes;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Configuration;
using MxGateway.Server.Metrics;
using MxGateway.Server.Workers;
namespace MxGateway.Server.Sessions;
public sealed class SessionManager : ISessionManager
{
public const string DefaultCloseReason = "client-close";
public const string GatewayShutdownReason = "gateway-shutdown";
public const string LeaseExpiredReason = "lease-expired";
private readonly ISessionRegistry _registry;
private readonly ISessionWorkerClientFactory _workerClientFactory;
private readonly GatewayMetrics _metrics;
private readonly TimeProvider _timeProvider;
private readonly ILogger<SessionManager> _logger;
private readonly GatewayOptions _options;
public SessionManager(
ISessionRegistry registry,
ISessionWorkerClientFactory workerClientFactory,
IOptions<GatewayOptions> options,
GatewayMetrics metrics,
TimeProvider? timeProvider = null,
ILogger<SessionManager>? logger = null)
{
_registry = registry ?? throw new ArgumentNullException(nameof(registry));
_workerClientFactory = workerClientFactory ?? throw new ArgumentNullException(nameof(workerClientFactory));
ArgumentNullException.ThrowIfNull(options);
_metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
_timeProvider = timeProvider ?? TimeProvider.System;
_logger = logger ?? NullLogger<SessionManager>.Instance;
_options = options.Value;
}
public async Task<GatewaySession> OpenSessionAsync(
SessionOpenRequest request,
string? clientIdentity,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(request);
EnsureSessionCapacity();
GatewaySession session = CreateSession(request, clientIdentity);
if (!_registry.TryAdd(session))
{
throw new SessionManagerException(
SessionManagerErrorCode.OpenFailed,
$"Session id collision while opening session {session.SessionId}.");
}
try
{
session.TransitionTo(SessionState.StartingWorker);
IWorkerClient workerClient = await _workerClientFactory
.CreateAsync(session, cancellationToken)
.ConfigureAwait(false);
session.AttachWorkerClient(workerClient);
session.MarkReady();
_metrics.SessionOpened();
return session;
}
catch (Exception exception)
{
session.MarkFaulted(exception.Message);
_registry.TryRemove(session.SessionId, out _);
await session.DisposeAsync().ConfigureAwait(false);
_metrics.Fault(SessionManagerErrorCode.OpenFailed.ToString());
_logger.LogWarning(
exception,
"Failed to open gateway session {SessionId}.",
session.SessionId);
throw new SessionManagerException(
SessionManagerErrorCode.OpenFailed,
$"Failed to open session {session.SessionId}.",
exception);
}
}
public bool TryGetSession(
string sessionId,
out GatewaySession session)
{
return _registry.TryGet(sessionId, out session);
}
public async Task<WorkerCommandReply> InvokeAsync(
string sessionId,
WorkerCommand command,
CancellationToken cancellationToken)
{
GatewaySession session = GetRequiredSession(sessionId);
try
{
return await session.InvokeAsync(command, cancellationToken).ConfigureAwait(false);
}
catch (SessionManagerException)
{
throw;
}
catch (Exception exception)
{
if (session.WorkerClient?.State == WorkerClientState.Faulted)
{
session.MarkFaulted(exception.Message);
}
throw;
}
}
public IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
string sessionId,
CancellationToken cancellationToken)
{
GatewaySession session = GetRequiredSession(sessionId);
return session.ReadEventsAsync(cancellationToken);
}
public async Task<SessionCloseResult> CloseSessionAsync(
string sessionId,
CancellationToken cancellationToken)
{
GatewaySession session = GetRequiredSession(sessionId);
SessionCloseResult result = await CloseSessionCoreAsync(
session,
DefaultCloseReason,
cancellationToken).ConfigureAwait(false);
return result;
}
public async Task<int> CloseExpiredLeasesAsync(
DateTimeOffset now,
CancellationToken cancellationToken)
{
int closedCount = 0;
foreach (GatewaySession session in _registry.Snapshot())
{
if (!session.IsLeaseExpired(now))
{
continue;
}
await CloseSessionCoreAsync(session, LeaseExpiredReason, cancellationToken).ConfigureAwait(false);
closedCount++;
}
return closedCount;
}
public async Task ShutdownAsync(CancellationToken cancellationToken)
{
foreach (GatewaySession session in _registry.Snapshot())
{
try
{
await CloseSessionCoreAsync(session, GatewayShutdownReason, cancellationToken).ConfigureAwait(false);
}
catch (Exception exception)
{
_logger.LogWarning(
exception,
"Graceful shutdown failed for session {SessionId}; killing worker.",
session.SessionId);
session.KillWorker(GatewayShutdownReason);
}
}
}
private async Task<SessionCloseResult> CloseSessionCoreAsync(
GatewaySession session,
string reason,
CancellationToken cancellationToken)
{
bool wasClosed = session.State == SessionState.Closed;
try
{
SessionCloseResult result = await session.CloseAsync(reason, cancellationToken).ConfigureAwait(false);
if (!wasClosed && !result.AlreadyClosed)
{
_metrics.SessionClosed();
}
return result;
}
catch (Exception exception)
{
session.MarkFaulted(exception.Message);
_metrics.Fault(SessionManagerErrorCode.CloseFailed.ToString());
throw new SessionManagerException(
SessionManagerErrorCode.CloseFailed,
$"Failed to close session {session.SessionId}.",
exception);
}
}
private GatewaySession GetRequiredSession(string sessionId)
{
if (!_registry.TryGet(sessionId, out GatewaySession session))
{
throw new SessionManagerException(
SessionManagerErrorCode.SessionNotFound,
$"Session {sessionId} was not found.");
}
return session;
}
private void EnsureSessionCapacity()
{
if (_registry.ActiveCount >= _options.Sessions.MaxSessions)
{
throw new SessionManagerException(
SessionManagerErrorCode.SessionLimitExceeded,
$"Gateway session limit {_options.Sessions.MaxSessions} has been reached.");
}
}
private GatewaySession CreateSession(
SessionOpenRequest request,
string? clientIdentity)
{
string sessionId = CreateSessionId();
string backendName = string.IsNullOrWhiteSpace(request.RequestedBackend)
? GatewayContractInfo.DefaultBackendName
: request.RequestedBackend!;
TimeSpan commandTimeout = ResolveCommandTimeout(request.CommandTimeout);
TimeSpan startupTimeout = TimeSpan.FromSeconds(_options.Worker.StartupTimeoutSeconds);
TimeSpan shutdownTimeout = TimeSpan.FromSeconds(_options.Worker.ShutdownTimeoutSeconds);
string pipeName = $"mxaccess-gateway-{Environment.ProcessId}-{sessionId}";
string nonce = CreateNonce();
DateTimeOffset openedAt = _timeProvider.GetUtcNow();
return new GatewaySession(
sessionId,
backendName,
pipeName,
nonce,
clientIdentity,
request.ClientSessionName,
request.ClientCorrelationId,
commandTimeout,
startupTimeout,
shutdownTimeout,
openedAt);
}
private TimeSpan ResolveCommandTimeout(Duration? requestedTimeout)
{
if (requestedTimeout is null)
{
return TimeSpan.FromSeconds(_options.Sessions.DefaultCommandTimeoutSeconds);
}
TimeSpan timeout = requestedTimeout.ToTimeSpan();
return timeout <= TimeSpan.Zero
? TimeSpan.FromSeconds(_options.Sessions.DefaultCommandTimeoutSeconds)
: timeout;
}
private static string CreateSessionId()
{
return $"session-{Guid.NewGuid():N}";
}
private static string CreateNonce()
{
Span<byte> bytes = stackalloc byte[32];
RandomNumberGenerator.Fill(bytes);
return Convert.ToBase64String(bytes);
}
}
@@ -0,0 +1,10 @@
namespace MxGateway.Server.Sessions;
public enum SessionManagerErrorCode
{
SessionNotFound,
SessionNotReady,
SessionLimitExceeded,
OpenFailed,
CloseFailed,
}
@@ -0,0 +1,23 @@
namespace MxGateway.Server.Sessions;
public sealed class SessionManagerException : Exception
{
public SessionManagerException(
SessionManagerErrorCode errorCode,
string message)
: base(message)
{
ErrorCode = errorCode;
}
public SessionManagerException(
SessionManagerErrorCode errorCode,
string message,
Exception innerException)
: base(message, innerException)
{
ErrorCode = errorCode;
}
public SessionManagerErrorCode ErrorCode { get; }
}
@@ -0,0 +1,22 @@
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Sessions;
public sealed record SessionOpenRequest(
string? RequestedBackend,
string? ClientSessionName,
string? ClientCorrelationId,
Duration? CommandTimeout)
{
public static SessionOpenRequest FromContract(OpenSessionRequest request)
{
ArgumentNullException.ThrowIfNull(request);
return new SessionOpenRequest(
request.RequestedBackend,
request.ClientSessionName,
request.ClientCorrelationId,
request.CommandTimeout);
}
}
@@ -0,0 +1,39 @@
using System.Collections.Concurrent;
using MxGateway.Contracts.Proto;
namespace MxGateway.Server.Sessions;
public sealed class SessionRegistry : ISessionRegistry
{
private readonly ConcurrentDictionary<string, GatewaySession> _sessions = new(StringComparer.Ordinal);
public int Count => _sessions.Count;
public int ActiveCount => _sessions.Values.Count(session => session.State is not SessionState.Closed);
public bool TryAdd(GatewaySession session)
{
ArgumentNullException.ThrowIfNull(session);
return _sessions.TryAdd(session.SessionId, session);
}
public bool TryGet(
string sessionId,
out GatewaySession session)
{
return _sessions.TryGetValue(sessionId, out session!);
}
public bool TryRemove(
string sessionId,
out GatewaySession session)
{
return _sessions.TryRemove(sessionId, out session!);
}
public IReadOnlyCollection<GatewaySession> Snapshot()
{
return _sessions.Values.ToArray();
}
}
@@ -0,0 +1,13 @@
namespace MxGateway.Server.Sessions;
public static class SessionServiceCollectionExtensions
{
public static IServiceCollection AddGatewaySessions(this IServiceCollection services)
{
services.AddSingleton<ISessionRegistry, SessionRegistry>();
services.AddSingleton<ISessionWorkerClientFactory, SessionWorkerClientFactory>();
services.AddSingleton<ISessionManager, SessionManager>();
return services;
}
}
@@ -0,0 +1,144 @@
using System.IO.Pipes;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Configuration;
using MxGateway.Server.Metrics;
using MxGateway.Server.Workers;
namespace MxGateway.Server.Sessions;
public sealed class SessionWorkerClientFactory : ISessionWorkerClientFactory
{
private readonly IWorkerProcessLauncher _workerProcessLauncher;
private readonly GatewayMetrics _metrics;
private readonly TimeProvider _timeProvider;
private readonly ILoggerFactory _loggerFactory;
private readonly GatewayOptions _options;
public SessionWorkerClientFactory(
IWorkerProcessLauncher workerProcessLauncher,
IOptions<GatewayOptions> options,
GatewayMetrics metrics,
ILoggerFactory loggerFactory,
TimeProvider? timeProvider = null)
{
_workerProcessLauncher = workerProcessLauncher ?? throw new ArgumentNullException(nameof(workerProcessLauncher));
ArgumentNullException.ThrowIfNull(options);
_metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
_loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
_timeProvider = timeProvider ?? TimeProvider.System;
_options = options.Value;
}
public async Task<IWorkerClient> CreateAsync(
GatewaySession session,
CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(session);
NamedPipeServerStream? pipe = CreatePipe(session.PipeName);
WorkerProcessHandle? processHandle = null;
IWorkerClient? workerClient = null;
try
{
session.TransitionTo(SessionState.StartingWorker);
processHandle = await _workerProcessLauncher
.LaunchAsync(
new WorkerProcessLaunchRequest(
session.SessionId,
session.PipeName,
GatewayContractInfo.WorkerProtocolVersion,
session.Nonce,
pipe),
cancellationToken)
.ConfigureAwait(false);
session.TransitionTo(SessionState.WaitingForPipe);
await WaitForPipeConnectionAsync(pipe, session.StartupTimeout, cancellationToken).ConfigureAwait(false);
session.TransitionTo(SessionState.Handshaking);
WorkerFrameProtocolOptions frameOptions = new(
session.SessionId,
GatewayContractInfo.WorkerProtocolVersion,
_options.Worker.MaxMessageBytes);
WorkerClientConnection connection = new(
session.SessionId,
session.Nonce,
pipe,
frameOptions,
processHandle);
WorkerClientOptions clientOptions = new()
{
HeartbeatGrace = TimeSpan.FromSeconds(_options.Worker.HeartbeatGraceSeconds),
HeartbeatCheckInterval = TimeSpan.FromSeconds(_options.Worker.HeartbeatIntervalSeconds),
EventChannelCapacity = _options.Events.QueueCapacity,
};
workerClient = new WorkerClient(
connection,
clientOptions,
_metrics,
_timeProvider,
_loggerFactory.CreateLogger<WorkerClient>());
pipe = null;
processHandle = null;
session.TransitionTo(SessionState.InitializingWorker);
await workerClient.StartAsync(cancellationToken).ConfigureAwait(false);
return workerClient;
}
catch
{
if (workerClient is not null)
{
await workerClient.DisposeAsync().ConfigureAwait(false);
}
else
{
if (processHandle is not null)
{
try
{
if (!processHandle.Process.HasExited)
{
processHandle.Process.Kill(entireProcessTree: true);
_metrics.WorkerKilled("OpenSessionFailed");
}
}
finally
{
processHandle.Dispose();
}
}
pipe?.Dispose();
}
throw;
}
}
private static NamedPipeServerStream CreatePipe(string pipeName)
{
return new NamedPipeServerStream(
pipeName,
PipeDirection.InOut,
maxNumberOfServerInstances: 1,
PipeTransmissionMode.Byte,
PipeOptions.Asynchronous);
}
private static async Task WaitForPipeConnectionAsync(
NamedPipeServerStream pipe,
TimeSpan startupTimeout,
CancellationToken cancellationToken)
{
using CancellationTokenSource timeout = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
timeout.CancelAfter(startupTimeout);
await pipe.WaitForConnectionAsync(timeout.Token).ConfigureAwait(false);
}
}
@@ -0,0 +1,320 @@
using Google.Protobuf.WellKnownTypes;
using Microsoft.Extensions.Options;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Configuration;
using MxGateway.Server.Metrics;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
namespace MxGateway.Tests.Gateway.Sessions;
public sealed class SessionManagerTests
{
[Fact]
public async Task OpenSessionAsync_WithWorkerReady_RegistersReadySession()
{
FakeWorkerClient workerClient = new();
FakeSessionWorkerClientFactory factory = new(workerClient)
{
ApplyLifecycleTransitions = true,
};
using GatewayMetrics metrics = new();
SessionManager manager = CreateManager(factory, metrics: metrics);
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
Assert.True(manager.TryGetSession(session.SessionId, out GatewaySession registered));
Assert.Same(session, registered);
Assert.Equal(SessionState.Ready, session.State);
Assert.Equal("client-1", session.ClientIdentity);
Assert.Equal(["StartingWorker", "WaitingForPipe", "Handshaking", "InitializingWorker"], factory.ObservedStates);
Assert.Equal(1, metrics.GetSnapshot().OpenSessions);
Assert.Equal(1, metrics.GetSnapshot().SessionsOpened);
}
[Fact]
public async Task InvokeAsync_WhenSessionReady_ForwardsCommandToWorker()
{
FakeWorkerClient workerClient = new();
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient));
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
WorkerCommandReply reply = await manager.InvokeAsync(
session.SessionId,
CreateCommand(MxCommandKind.Ping),
CancellationToken.None);
Assert.Equal(1, workerClient.InvokeCount);
Assert.Equal(MxCommandKind.Ping, reply.Reply.Kind);
}
[Fact]
public async Task InvokeAsync_WhenSessionFaulted_RejectsCommand()
{
FakeWorkerClient workerClient = new();
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient));
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
session.MarkFaulted("test fault");
SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(
async () => await manager.InvokeAsync(
session.SessionId,
CreateCommand(MxCommandKind.Ping),
CancellationToken.None));
Assert.Equal(SessionManagerErrorCode.SessionNotReady, exception.ErrorCode);
Assert.Equal(0, workerClient.InvokeCount);
}
[Fact]
public async Task CloseSessionAsync_WhenCalledTwice_IsIdempotent()
{
FakeWorkerClient workerClient = new();
using GatewayMetrics metrics = new();
SessionManager manager = CreateManager(new FakeSessionWorkerClientFactory(workerClient), metrics: metrics);
GatewaySession session = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
SessionCloseResult firstClose = await manager.CloseSessionAsync(session.SessionId, CancellationToken.None);
SessionCloseResult secondClose = await manager.CloseSessionAsync(session.SessionId, CancellationToken.None);
Assert.False(firstClose.AlreadyClosed);
Assert.True(secondClose.AlreadyClosed);
Assert.Equal(SessionState.Closed, firstClose.FinalState);
Assert.Equal(SessionState.Closed, secondClose.FinalState);
Assert.Equal(1, workerClient.ShutdownCount);
Assert.Equal(1, metrics.GetSnapshot().SessionsClosed);
Assert.Equal(0, metrics.GetSnapshot().OpenSessions);
}
[Fact]
public async Task OpenSessionAsync_WhenWorkerCreationFails_RemovesSessionFromRegistry()
{
SessionRegistry registry = new();
using GatewayMetrics metrics = new();
SessionManager manager = CreateManager(
new FailingSessionWorkerClientFactory(),
registry,
metrics);
SessionManagerException exception = await Assert.ThrowsAsync<SessionManagerException>(
async () => await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None));
Assert.Equal(SessionManagerErrorCode.OpenFailed, exception.ErrorCode);
Assert.Equal(0, registry.Count);
Assert.Equal(0, metrics.GetSnapshot().SessionsOpened);
Assert.Equal(1, metrics.GetSnapshot().Faults);
}
[Fact]
public async Task CloseExpiredLeasesAsync_ClosesExpiredSessionsOnly()
{
FakeWorkerClient expiredClient = new();
FakeWorkerClient activeClient = new();
QueueingSessionWorkerClientFactory factory = new(expiredClient, activeClient);
SessionManager manager = CreateManager(factory);
GatewaySession expiredSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
GatewaySession activeSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-2", CancellationToken.None);
DateTimeOffset now = DateTimeOffset.UtcNow;
expiredSession.ExtendLease(now.AddSeconds(-1));
activeSession.ExtendLease(now.AddMinutes(5));
int closedCount = await manager.CloseExpiredLeasesAsync(now, CancellationToken.None);
Assert.Equal(1, closedCount);
Assert.Equal(SessionState.Closed, expiredSession.State);
Assert.Equal(SessionState.Ready, activeSession.State);
Assert.Equal(1, expiredClient.ShutdownCount);
Assert.Equal(0, activeClient.ShutdownCount);
}
[Fact]
public async Task ShutdownAsync_ClosesAllRegisteredSessions()
{
FakeWorkerClient firstClient = new();
FakeWorkerClient secondClient = new();
QueueingSessionWorkerClientFactory factory = new(firstClient, secondClient);
using GatewayMetrics metrics = new();
SessionManager manager = CreateManager(factory, metrics: metrics);
GatewaySession firstSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-1", CancellationToken.None);
GatewaySession secondSession = await manager.OpenSessionAsync(CreateOpenRequest(), "client-2", CancellationToken.None);
await manager.ShutdownAsync(CancellationToken.None);
Assert.Equal(SessionState.Closed, firstSession.State);
Assert.Equal(SessionState.Closed, secondSession.State);
Assert.Equal(1, firstClient.ShutdownCount);
Assert.Equal(1, secondClient.ShutdownCount);
Assert.Equal(2, metrics.GetSnapshot().SessionsClosed);
Assert.Equal(0, metrics.GetSnapshot().OpenSessions);
}
private static SessionManager CreateManager(
ISessionWorkerClientFactory factory,
ISessionRegistry? registry = null,
GatewayMetrics? metrics = null,
GatewayOptions? options = null)
{
return new SessionManager(
registry ?? new SessionRegistry(),
factory,
Options.Create(options ?? CreateOptions()),
metrics ?? new GatewayMetrics());
}
private static GatewayOptions CreateOptions()
{
return new GatewayOptions
{
Sessions = new SessionOptions
{
DefaultCommandTimeoutSeconds = 30,
MaxSessions = 64,
},
Worker = new WorkerOptions
{
StartupTimeoutSeconds = 30,
ShutdownTimeoutSeconds = 10,
},
};
}
private static SessionOpenRequest CreateOpenRequest()
{
return new SessionOpenRequest(
RequestedBackend: null,
ClientSessionName: "test-session",
ClientCorrelationId: "client-correlation-1",
CommandTimeout: Duration.FromTimeSpan(TimeSpan.FromSeconds(5)));
}
private static WorkerCommand CreateCommand(MxCommandKind kind)
{
return new WorkerCommand
{
Command = new MxCommand
{
Kind = kind,
},
};
}
private sealed class FakeSessionWorkerClientFactory(IWorkerClient workerClient) : ISessionWorkerClientFactory
{
public List<string> ObservedStates { get; } = [];
public bool ApplyLifecycleTransitions { get; init; }
public Task<IWorkerClient> CreateAsync(
GatewaySession session,
CancellationToken cancellationToken)
{
ObservedStates.Add(session.State.ToString());
if (ApplyLifecycleTransitions)
{
session.TransitionTo(SessionState.WaitingForPipe);
ObservedStates.Add(session.State.ToString());
session.TransitionTo(SessionState.Handshaking);
ObservedStates.Add(session.State.ToString());
session.TransitionTo(SessionState.InitializingWorker);
ObservedStates.Add(session.State.ToString());
}
return Task.FromResult(workerClient);
}
}
private sealed class QueueingSessionWorkerClientFactory : ISessionWorkerClientFactory
{
private readonly Queue<IWorkerClient> _workerClients;
public QueueingSessionWorkerClientFactory(params IWorkerClient[] workerClients)
{
_workerClients = new Queue<IWorkerClient>(workerClients);
}
public Task<IWorkerClient> CreateAsync(
GatewaySession session,
CancellationToken cancellationToken)
{
return Task.FromResult(_workerClients.Dequeue());
}
}
private sealed class FailingSessionWorkerClientFactory : ISessionWorkerClientFactory
{
public Task<IWorkerClient> CreateAsync(
GatewaySession session,
CancellationToken cancellationToken)
{
throw new InvalidOperationException("worker startup failed");
}
}
private sealed class FakeWorkerClient : IWorkerClient
{
public string SessionId { get; init; } = "session-1";
public int? ProcessId { get; init; } = 1234;
public WorkerClientState State { get; set; } = WorkerClientState.Ready;
public DateTimeOffset LastHeartbeatAt { get; init; } = DateTimeOffset.UtcNow;
public int InvokeCount { get; private set; }
public int ShutdownCount { get; private set; }
public int KillCount { get; private set; }
public Task StartAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
public Task<WorkerCommandReply> InvokeAsync(
WorkerCommand command,
TimeSpan timeout,
CancellationToken cancellationToken)
{
InvokeCount++;
MxCommandKind kind = command.Command?.Kind ?? MxCommandKind.Unspecified;
return Task.FromResult(new WorkerCommandReply
{
Reply = new MxCommandReply
{
SessionId = SessionId,
CorrelationId = "correlation-1",
Kind = kind,
},
});
}
public async IAsyncEnumerable<WorkerEvent> ReadEventsAsync(
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
{
await Task.CompletedTask;
yield break;
}
public Task ShutdownAsync(
TimeSpan timeout,
CancellationToken cancellationToken)
{
ShutdownCount++;
State = WorkerClientState.Closed;
return Task.CompletedTask;
}
public void Kill(string reason)
{
KillCount++;
State = WorkerClientState.Faulted;
}
public ValueTask DisposeAsync()
{
return ValueTask.CompletedTask;
}
}
}
@@ -0,0 +1,47 @@
using System;
using System.Runtime.InteropServices;
using MxGateway.Contracts.Proto;
using MxGateway.Worker.Conversion;
namespace MxGateway.Worker.Tests.Conversion;
public sealed class HResultConverterTests
{
private readonly HResultConverter _converter = new();
[Fact]
public void Convert_WithComException_CapturesExceptionHResult()
{
COMException exception = new("Sensitive provider text should not be copied.", unchecked((int)0x80070057));
HResultConversion converted = _converter.Convert(exception);
Assert.Equal(unchecked((int)0x80070057), converted.HResult);
Assert.Equal(ProtocolStatusCode.MxaccessFailure, converted.ProtocolStatus.Code);
Assert.Contains("0x80070057", converted.ProtocolStatus.Message);
Assert.Contains(typeof(COMException).FullName!, converted.DiagnosticMessage);
Assert.DoesNotContain("Sensitive provider text", converted.DiagnosticMessage);
}
[Fact]
public void CreateProtocolStatus_WithSuccessHResult_ReturnsOk()
{
ProtocolStatus status = _converter.CreateProtocolStatus(0);
Assert.Equal(ProtocolStatusCode.Ok, status.Code);
Assert.Equal("HRESULT 0x00000000", status.Message);
}
[Fact]
public void Convert_WithNonComException_CapturesExceptionHResult()
{
InvalidOperationException exception = new("do not include this");
HResultConversion converted = _converter.Convert(exception);
Assert.Equal(exception.HResult, converted.HResult);
Assert.Equal(ProtocolStatusCode.MxaccessFailure, converted.ProtocolStatus.Code);
Assert.Contains("0x", converted.DiagnosticMessage);
Assert.DoesNotContain("do not include this", converted.DiagnosticMessage);
}
}
@@ -0,0 +1,118 @@
using System;
using System.Collections.Generic;
using MxGateway.Contracts.Proto;
using MxGateway.Worker.Conversion;
namespace MxGateway.Worker.Tests.Conversion;
public sealed class MxStatusProxyConverterTests
{
private readonly MxStatusProxyConverter _converter = new();
[Fact]
public void Convert_WithStatusStruct_PreservesStatusFields()
{
FakeMxStatusProxy status = new()
{
success = 1,
category = 5,
detectedBy = 3,
detail = 21,
};
MxStatusProxy converted = _converter.Convert(status);
Assert.Equal(1, converted.Success);
Assert.Equal(MxStatusCategory.OperationalError, converted.Category);
Assert.Equal(MxStatusSource.RespondingNmx, converted.DetectedBy);
Assert.Equal(21, converted.Detail);
Assert.Equal(5, converted.RawCategory);
Assert.Equal(3, converted.RawDetectedBy);
Assert.Equal("Invalid reference", converted.DiagnosticText);
}
[Fact]
public void ConvertMany_WithStatusArray_DoesNotCollapseEntries()
{
FakeMxStatusProxy[] statuses =
[
new()
{
success = 1,
category = 0,
detectedBy = 0,
detail = 0,
},
new()
{
success = 0,
category = 6,
detectedBy = 5,
detail = 33,
},
];
IReadOnlyList<MxStatusProxy> converted = _converter.ConvertMany(statuses);
Assert.Equal(2, converted.Count);
Assert.Equal(MxStatusCategory.Ok, converted[0].Category);
Assert.Equal(MxStatusCategory.SecurityError, converted[1].Category);
Assert.Equal(MxStatusSource.RespondingAutomationObject, converted[1].DetectedBy);
Assert.Equal("Write access denied", converted[1].DiagnosticText);
}
[Fact]
public void Convert_WithUnknownCategoryAndSource_PreservesRawFields()
{
FakeMxStatusProxy status = new()
{
success = -1,
category = 99,
detectedBy = 42,
detail = 1234,
};
MxStatusProxy converted = _converter.Convert(status);
Assert.Equal(-1, converted.Success);
Assert.Equal(MxStatusCategory.Unknown, converted.Category);
Assert.Equal(MxStatusSource.Unknown, converted.DetectedBy);
Assert.Equal(99, converted.RawCategory);
Assert.Equal(42, converted.RawDetectedBy);
Assert.Equal(1234, converted.Detail);
Assert.Equal(string.Empty, converted.DiagnosticText);
}
[Fact]
public void PreserveCompletionOnlyStatusBytes_ReturnsRawHexMetadata()
{
string rawStatus = _converter.PreserveCompletionOnlyStatusBytes(
[0x00, 0x00, 0x50, 0x80, 0x00]);
Assert.Equal("completion_only_status_hex=0000508000", rawStatus);
}
[Fact]
public void Convert_WithMissingStatusField_ThrowsConversionException()
{
MxStatusConversionException exception =
Assert.Throws<MxStatusConversionException>(() => _converter.Convert(new MissingFields()));
Assert.Contains("success", exception.Message);
}
public struct FakeMxStatusProxy
{
public short success;
public int category;
public int detectedBy;
public short detail;
}
private sealed class MissingFields
{
}
}
@@ -0,0 +1,183 @@
using System;
using Google.Protobuf;
using MxGateway.Contracts.Proto;
using MxGateway.Worker.Bootstrap;
using MxGateway.Worker.Conversion;
using ProtobufTimestamp = Google.Protobuf.WellKnownTypes.Timestamp;
namespace MxGateway.Worker.Tests.Conversion;
public sealed class VariantConverterTests
{
private readonly VariantConverter _converter = new();
[Theory]
[InlineData(true, MxDataType.Boolean, MxValue.KindOneofCase.BoolValue)]
[InlineData(42, MxDataType.Integer, MxValue.KindOneofCase.Int32Value)]
[InlineData(42L, MxDataType.Integer, MxValue.KindOneofCase.Int64Value)]
[InlineData(1.25f, MxDataType.Float, MxValue.KindOneofCase.FloatValue)]
[InlineData(2.5d, MxDataType.Double, MxValue.KindOneofCase.DoubleValue)]
[InlineData("value", MxDataType.String, MxValue.KindOneofCase.StringValue)]
public void Convert_WithSupportedScalar_ProjectsTypedValue(
object value,
MxDataType expectedDataType,
MxValue.KindOneofCase expectedKind)
{
MxValue converted = _converter.Convert(value);
Assert.Equal(expectedDataType, converted.DataType);
Assert.Equal(expectedKind, converted.KindCase);
Assert.False(string.IsNullOrWhiteSpace(converted.VariantType));
}
[Fact]
public void Convert_WithDateTime_ProjectsTimestamp()
{
DateTime dateTime = new(2026, 4, 26, 17, 45, 0, DateTimeKind.Utc);
MxValue converted = _converter.Convert(dateTime);
Assert.Equal(MxDataType.Time, converted.DataType);
Assert.Equal(ProtobufTimestamp.FromDateTime(dateTime), converted.TimestampValue);
Assert.Equal("VT_DATE", converted.VariantType);
}
[Fact]
public void Convert_WithFileTimeAndExpectedTime_ProjectsTimestamp()
{
DateTime dateTime = new(2026, 4, 26, 17, 45, 0, DateTimeKind.Utc);
MxValue converted = _converter.Convert(dateTime.ToFileTimeUtc(), MxDataType.Time);
Assert.Equal(MxDataType.Time, converted.DataType);
Assert.Equal(ProtobufTimestamp.FromDateTime(dateTime), converted.TimestampValue);
Assert.Equal("VT_I8", converted.VariantType);
}
[Theory]
[InlineData(null, "VT_EMPTY")]
[InlineData(typeof(DBNull), "VT_NULL")]
public void Convert_WithNullLikeValue_PreservesNull(
object? value,
string expectedVariantType)
{
object? actualValue = value is System.Type ? DBNull.Value : value;
MxValue converted = _converter.Convert(actualValue);
Assert.True(converted.IsNull);
Assert.Equal(MxDataType.NoData, converted.DataType);
Assert.Equal(expectedVariantType, converted.VariantType);
Assert.Equal(MxValue.KindOneofCase.None, converted.KindCase);
}
[Fact]
public void ConvertArray_WithSupportedArrays_ProjectsTypedValuesAndDimensions()
{
MxValue bools = _converter.Convert(new[] { true, false });
MxValue ints = _converter.Convert(new[] { 1, 2, 3 });
MxValue floats = _converter.Convert(new[] { 1.25f, 2.5f });
MxValue doubles = _converter.Convert(new[] { 1.25d, 2.5d });
MxValue strings = _converter.Convert(new[] { "one", "two" });
MxValue times = _converter.Convert(new[]
{
new DateTime(2026, 4, 26, 17, 45, 0, DateTimeKind.Utc),
new DateTime(2026, 4, 26, 17, 46, 0, DateTimeKind.Utc),
});
Assert.Equal(new[] { true, false }, bools.ArrayValue.BoolValues.Values);
Assert.Equal(new[] { 1, 2, 3 }, ints.ArrayValue.Int32Values.Values);
Assert.Equal(new[] { 1.25f, 2.5f }, floats.ArrayValue.FloatValues.Values);
Assert.Equal(new[] { 1.25d, 2.5d }, doubles.ArrayValue.DoubleValues.Values);
Assert.Equal(new[] { "one", "two" }, strings.ArrayValue.StringValues.Values);
Assert.Equal(2, times.ArrayValue.TimestampValues.Values.Count);
Assert.Equal(new uint[] { 2 }, bools.ArrayValue.Dimensions);
Assert.Equal(MxDataType.Boolean, bools.ArrayValue.ElementDataType);
}
[Fact]
public void ConvertArray_WithMultidimensionalArray_PreservesRankAndDimensions()
{
int[,] values =
{
{ 1, 2, 3 },
{ 4, 5, 6 },
};
MxValue converted = _converter.Convert(values);
Assert.Equal(new uint[] { 2, 3 }, converted.ArrayValue.Dimensions);
Assert.Equal(new[] { 1, 2, 3, 4, 5, 6 }, converted.ArrayValue.Int32Values.Values);
}
[Fact]
public void ConvertArray_WithExpectedTimeAndFileTimeValues_ProjectsTimestampArray()
{
DateTime first = new(2026, 4, 26, 17, 45, 0, DateTimeKind.Utc);
DateTime second = new(2026, 4, 26, 17, 46, 0, DateTimeKind.Utc);
MxValue converted = _converter.Convert(
new[] { first.ToFileTimeUtc(), second.ToFileTimeUtc() },
MxDataType.Time);
Assert.Equal(MxDataType.Time, converted.ArrayValue.ElementDataType);
Assert.Equal(
new[] { ProtobufTimestamp.FromDateTime(first), ProtobufTimestamp.FromDateTime(second) },
converted.ArrayValue.TimestampValues.Values);
}
[Fact]
public void Convert_WithUnknownScalar_PreservesRawMetadata()
{
UnsupportedVariant value = new("opaque");
MxValue converted = _converter.Convert(value);
Assert.Equal(MxDataType.Unknown, converted.DataType);
Assert.Equal(MxValue.KindOneofCase.RawValue, converted.KindCase);
Assert.Contains(typeof(UnsupportedVariant).FullName!, converted.VariantType);
Assert.Contains(typeof(UnsupportedVariant).FullName!, converted.RawDiagnostic);
Assert.Equal(ByteString.CopyFromUtf8("opaque"), converted.RawValue);
}
[Fact]
public void ConvertArray_WithUnknownArray_PreservesRawMetadata()
{
UnsupportedVariant[] values =
[
new("first"),
new("second"),
];
MxValue converted = _converter.Convert(values);
Assert.Equal(MxDataType.Unknown, converted.ArrayValue.ElementDataType);
Assert.Equal(MxArray.ValuesOneofCase.RawValues, converted.ArrayValue.ValuesCase);
Assert.Equal(new uint[] { 2 }, converted.ArrayValue.Dimensions);
Assert.Equal("first", converted.ArrayValue.RawValues.Values[0].ToStringUtf8());
Assert.Contains(typeof(UnsupportedVariant).FullName!, converted.ArrayValue.RawDiagnostic);
}
[Fact]
public void Redactor_WithCredentialBearingValueFields_RedactsBeforeLogging()
{
Assert.Equal(WorkerLogRedactor.RedactedValue, WorkerLogRedactor.RedactValue("credential_value", "secret"));
Assert.Equal(WorkerLogRedactor.RedactedValue, WorkerLogRedactor.RedactValue("password_value", "secret"));
Assert.Equal(WorkerLogRedactor.RedactedValue, WorkerLogRedactor.RedactValue("secured_write_token", "secret"));
}
private sealed class UnsupportedVariant
{
private readonly string _value;
public UnsupportedVariant(string value)
{
_value = value;
}
public override string ToString()
{
return _value;
}
}
}
@@ -1,5 +1,6 @@
using System.Collections.Generic;
using System.IO;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using MxGateway.Contracts;
@@ -37,6 +38,10 @@ public sealed class WorkerPipeSessionTests
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerHello, written[0].BodyCase);
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerReady, written[1].BodyCase);
Assert.Equal(Nonce, written[0].WorkerHello.Nonce);
Assert.Equal(1234, written[1].WorkerReady.WorkerProcessId);
Assert.Equal(MxGateway.Worker.MxAccess.MxAccessInteropInfo.ProgId, written[1].WorkerReady.MxaccessProgid);
Assert.Equal(MxGateway.Worker.MxAccess.MxAccessInteropInfo.Clsid, written[1].WorkerReady.MxaccessClsid);
Assert.NotNull(written[1].WorkerReady.ReadyTimestamp);
}
[Fact]
@@ -117,6 +122,31 @@ public sealed class WorkerPipeSessionTests
Assert.Equal(WorkerFaultCategory.ProtocolViolation, fault.WorkerFault.Category);
}
[Fact]
public async Task CompleteStartupHandshakeAsync_WhenMxAccessCreationFails_WritesFaultInsteadOfReady()
{
const int hresult = unchecked((int)0x80040154);
WorkerFrameProtocolOptions options = CreateOptions();
MemoryStream inbound = new();
await new WorkerFrameWriter(inbound, options).WriteAsync(CreateGatewayHelloEnvelope());
inbound.Position = 0;
MemoryStream outbound = new();
WorkerPipeSession session = CreateSession(inbound, outbound, options);
await Assert.ThrowsAsync<COMException>(
async () => await session.CompleteStartupHandshakeAsync(
_ => Task.FromException<WorkerReady>(new COMException("Class not registered.", hresult))));
WorkerEnvelope[] written = ReadWrittenFrames(outbound, options);
Assert.Equal(2, written.Length);
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerHello, written[0].BodyCase);
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerFault, written[1].BodyCase);
Assert.Equal(WorkerFaultCategory.MxaccessCreationFailed, written[1].WorkerFault.Category);
Assert.Equal(hresult, written[1].WorkerFault.Hresult);
Assert.Equal(typeof(COMException).FullName, written[1].WorkerFault.ExceptionType);
Assert.Equal(ProtocolStatusCode.WorkerUnavailable, written[1].WorkerFault.ProtocolStatus.Code);
}
private static WorkerPipeSession CreateSession(
Stream inbound,
Stream outbound,
@@ -0,0 +1,24 @@
using System;
using System.Threading.Tasks;
using MxGateway.Worker.MxAccess;
namespace MxGateway.Worker.Tests.MxAccess;
public sealed class MxAccessLiveComCreationTests
{
[Fact]
public async Task StartAsync_WhenOptedIn_CreatesInstalledMxAccessComObjectOnSta()
{
if (!string.Equals(
Environment.GetEnvironmentVariable("MXGATEWAY_RUN_LIVE_MXACCESS_TESTS"),
"1",
StringComparison.Ordinal))
{
return;
}
using MxAccessStaSession session = new();
await session.StartAsync(workerProcessId: 1234);
}
}
@@ -0,0 +1,133 @@
using System;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using MxGateway.Contracts.Proto;
using MxGateway.Worker.MxAccess;
using MxGateway.Worker.Sta;
namespace MxGateway.Worker.Tests.MxAccess;
public sealed class MxAccessStaSessionTests
{
[Fact]
public async Task StartAsync_CreatesComObjectAndAttachesEventSinkOnStaThread()
{
FakeMxAccessComObjectFactory factory = new();
FakeMxAccessEventSink eventSink = new();
using StaRuntime runtime = CreateRuntime();
using MxAccessStaSession session = new(runtime, factory, eventSink);
WorkerReady ready = await session.StartAsync(workerProcessId: 1234);
Assert.Equal(1234, ready.WorkerProcessId);
Assert.Equal(MxAccessInteropInfo.ProgId, ready.MxaccessProgid);
Assert.Equal(MxAccessInteropInfo.Clsid, ready.MxaccessClsid);
Assert.NotNull(ready.ReadyTimestamp);
Assert.Equal(runtime.StaThreadId, factory.CreateThreadId);
Assert.Equal(runtime.StaThreadId, eventSink.AttachThreadId);
Assert.Equal(ApartmentState.STA, factory.CreateApartmentState);
Assert.Same(factory.CreatedObject, eventSink.AttachedObject);
}
[Fact]
public async Task StartAsync_WhenFactoryFails_MapsCreationExceptionWithHResult()
{
const int hresult = unchecked((int)0x80040154);
FakeMxAccessComObjectFactory factory = new(new COMException("Class not registered.", hresult));
FakeMxAccessEventSink eventSink = new();
using StaRuntime runtime = CreateRuntime();
using MxAccessStaSession session = new(runtime, factory, eventSink);
MxAccessCreationException exception = await Assert.ThrowsAsync<MxAccessCreationException>(
() => session.StartAsync(workerProcessId: 1234));
Assert.Equal(hresult, exception.CapturedHResult);
Assert.Equal(MxAccessInteropInfo.ProgId, exception.AttemptedProgId);
Assert.Equal(MxAccessInteropInfo.Clsid, exception.AttemptedClsid);
Assert.Null(eventSink.AttachedObject);
}
[Fact]
public async Task Dispose_DetachesEventSinkOnStaThread()
{
FakeMxAccessComObjectFactory factory = new();
FakeMxAccessEventSink eventSink = new();
using StaRuntime runtime = CreateRuntime();
MxAccessStaSession session = new(runtime, factory, eventSink);
await session.StartAsync(workerProcessId: 1234);
session.Dispose();
Assert.Equal(runtime.StaThreadId, eventSink.DetachThreadId);
}
private static StaRuntime CreateRuntime()
{
return new StaRuntime(
new NoopComApartmentInitializer(),
new StaMessagePump(),
TimeSpan.FromMilliseconds(25));
}
private sealed class FakeMxAccessComObjectFactory : IMxAccessComObjectFactory
{
private readonly Exception? exception;
public FakeMxAccessComObjectFactory(Exception? exception = null)
{
this.exception = exception;
}
public object CreatedObject { get; } = new();
public int? CreateThreadId { get; private set; }
public ApartmentState? CreateApartmentState { get; private set; }
public object Create()
{
CreateThreadId = Thread.CurrentThread.ManagedThreadId;
CreateApartmentState = Thread.CurrentThread.GetApartmentState();
if (exception is not null)
{
throw exception;
}
return CreatedObject;
}
}
private sealed class FakeMxAccessEventSink : IMxAccessEventSink
{
public object? AttachedObject { get; private set; }
public int? AttachThreadId { get; private set; }
public int? DetachThreadId { get; private set; }
public void Attach(object mxAccessComObject)
{
AttachedObject = mxAccessComObject;
AttachThreadId = Thread.CurrentThread.ManagedThreadId;
}
public void Detach()
{
DetachThreadId = Thread.CurrentThread.ManagedThreadId;
AttachedObject = null;
}
}
private sealed class NoopComApartmentInitializer : IStaComApartmentInitializer
{
public void Initialize()
{
}
public void Uninitialize()
{
}
}
}
@@ -0,0 +1,152 @@
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using MxGateway.Worker.Sta;
namespace MxGateway.Worker.Tests.Sta;
public sealed class StaRuntimeTests
{
[Fact]
public async Task InvokeAsync_ExecutesCommandOnStaThread()
{
RecordingComApartmentInitializer initializer = new();
using StaRuntime runtime = CreateRuntime(initializer);
runtime.Start();
StaCommandObservation observation = await runtime.InvokeAsync(
() => new StaCommandObservation(
Thread.CurrentThread.ManagedThreadId,
Thread.CurrentThread.GetApartmentState()));
Assert.Equal(runtime.StaThreadId, observation.ThreadId);
Assert.Equal(initializer.InitializeThreadId, observation.ThreadId);
Assert.Equal(ApartmentState.STA, observation.ApartmentState);
}
[Fact]
public async Task InvokeAsync_WakesIdlePumpForQueuedCommand()
{
RecordingComApartmentInitializer initializer = new();
using StaRuntime runtime = new(
initializer,
new StaMessagePump(),
TimeSpan.FromSeconds(30));
runtime.Start();
Stopwatch stopwatch = Stopwatch.StartNew();
int threadId = await runtime.InvokeAsync(() => Thread.CurrentThread.ManagedThreadId);
stopwatch.Stop();
Assert.Equal(runtime.StaThreadId, threadId);
Assert.True(
stopwatch.Elapsed < TimeSpan.FromSeconds(2),
$"Command took {stopwatch.Elapsed} to execute, so the command wake event did not wake the STA promptly.");
}
[Fact]
public void Shutdown_StopsThreadAndUninitializesComApartment()
{
RecordingComApartmentInitializer initializer = new();
using StaRuntime runtime = CreateRuntime(initializer);
runtime.Start();
bool stopped = runtime.Shutdown(TimeSpan.FromSeconds(2));
Assert.True(stopped);
Assert.False(runtime.IsRunning);
Assert.Equal(1, initializer.InitializeCount);
Assert.Equal(1, initializer.UninitializeCount);
Assert.Equal(initializer.InitializeThreadId, initializer.UninitializeThreadId);
}
[Fact]
public void LastActivityUtc_UpdatesWhilePumpIsIdle()
{
RecordingComApartmentInitializer initializer = new();
using StaRuntime runtime = CreateRuntime(initializer);
runtime.Start();
DateTimeOffset firstActivity = runtime.LastActivityUtc;
bool updated = SpinWait.SpinUntil(
() => runtime.LastActivityUtc > firstActivity,
TimeSpan.FromSeconds(2));
Assert.True(updated);
}
[Fact]
public async Task InvokeAsync_CommandException_FaultsReturnedTaskWithoutStoppingRuntime()
{
RecordingComApartmentInitializer initializer = new();
using StaRuntime runtime = CreateRuntime(initializer);
runtime.Start();
InvalidOperationException exception = await Assert.ThrowsAsync<InvalidOperationException>(
() => runtime.InvokeAsync<int>(() => throw new InvalidOperationException("command failed")));
int threadId = await runtime.InvokeAsync(() => Thread.CurrentThread.ManagedThreadId);
Assert.Equal("command failed", exception.Message);
Assert.Equal(runtime.StaThreadId, threadId);
}
[Fact]
public async Task InvokeAsync_AfterShutdown_ReturnsFaultedTask()
{
RecordingComApartmentInitializer initializer = new();
using StaRuntime runtime = CreateRuntime(initializer);
runtime.Start();
runtime.Shutdown(TimeSpan.FromSeconds(2));
InvalidOperationException exception = await Assert.ThrowsAsync<InvalidOperationException>(
() => runtime.InvokeAsync(() => Thread.CurrentThread.ManagedThreadId));
Assert.Contains("shutting down", exception.Message);
}
private static StaRuntime CreateRuntime(RecordingComApartmentInitializer initializer)
{
return new StaRuntime(
initializer,
new StaMessagePump(),
TimeSpan.FromMilliseconds(25));
}
private sealed class StaCommandObservation
{
public StaCommandObservation(int threadId, ApartmentState apartmentState)
{
ThreadId = threadId;
ApartmentState = apartmentState;
}
public int ThreadId { get; }
public ApartmentState ApartmentState { get; }
}
private sealed class RecordingComApartmentInitializer : IStaComApartmentInitializer
{
public int InitializeCount { get; private set; }
public int UninitializeCount { get; private set; }
public int? InitializeThreadId { get; private set; }
public int? UninitializeThreadId { get; private set; }
public void Initialize()
{
InitializeCount++;
InitializeThreadId = Thread.CurrentThread.ManagedThreadId;
}
public void Uninitialize()
{
UninitializeCount++;
UninitializeThreadId = Thread.CurrentThread.ManagedThreadId;
}
}
}
@@ -0,0 +1,22 @@
using MxGateway.Contracts.Proto;
namespace MxGateway.Worker.Conversion;
public sealed class HResultConversion
{
public HResultConversion(
int hresult,
ProtocolStatus protocolStatus,
string diagnosticMessage)
{
HResult = hresult;
ProtocolStatus = protocolStatus;
DiagnosticMessage = diagnosticMessage;
}
public int HResult { get; }
public ProtocolStatus ProtocolStatus { get; }
public string DiagnosticMessage { get; }
}
@@ -0,0 +1,48 @@
using System;
using System.Runtime.InteropServices;
using MxGateway.Contracts.Proto;
namespace MxGateway.Worker.Conversion;
public sealed class HResultConverter
{
public HResultConversion Convert(Exception exception)
{
if (exception is null)
{
throw new ArgumentNullException(nameof(exception));
}
int hresult = exception is COMException comException
? comException.ErrorCode
: exception.HResult;
return new HResultConversion(
hresult,
CreateProtocolStatus(hresult, exception),
CreateSafeDiagnosticMessage(exception));
}
public ProtocolStatus CreateProtocolStatus(
int hresult,
Exception? exception = null)
{
return new ProtocolStatus
{
Code = hresult == 0 ? ProtocolStatusCode.Ok : ProtocolStatusCode.MxaccessFailure,
Message = exception is null
? FormatHResult(hresult)
: $"{exception.GetType().Name}: {FormatHResult(hresult)}",
};
}
private static string CreateSafeDiagnosticMessage(Exception exception)
{
return $"{exception.GetType().FullName}: {FormatHResult(exception.HResult)}";
}
private static string FormatHResult(int hresult)
{
return $"HRESULT 0x{unchecked((uint)hresult):X8}";
}
}
@@ -0,0 +1,11 @@
using System;
namespace MxGateway.Worker.Conversion;
public sealed class MxStatusConversionException : Exception
{
public MxStatusConversionException(string message)
: base(message)
{
}
}
@@ -0,0 +1,54 @@
using System.Collections.Generic;
namespace MxGateway.Worker.Conversion;
internal static class MxStatusDetailText
{
private static readonly IReadOnlyDictionary<int, string> KnownDetails = new Dictionary<int, string>
{
[16] = "Request timed out",
[17] = "Platform communication error",
[18] = "Invalid platform ID",
[19] = "Invalid engine ID",
[20] = "Engine communication error",
[21] = "Invalid reference",
[22] = "No Galaxy Repository",
[23] = "Invalid object ID",
[24] = "Object signature mismatch",
[25] = "Invalid primitive ID",
[26] = "Invalid attribute ID",
[27] = "Invalid property ID",
[28] = "Index out of range",
[29] = "Data out of range",
[30] = "Incorrect data type",
[31] = "Attribute not readable",
[32] = "Attribute not writeable",
[33] = "Write access denied",
[34] = "Unknown error",
[36] = "Wrong data type",
[37] = "Wrong number of dimensions",
[38] = "Invalid index",
[39] = "Index out of order",
[40] = "Dimension does not exist",
[41] = "Conversion not supported",
[42] = "Unable to convert string",
[43] = "Overflow",
[44] = "Attribute signature mismatch",
[47] = "Nmx version mismatch",
[48] = "Nmx command not valid",
[49] = "Lmx version mismatch",
[50] = "Lmx command not valid",
[56] = "Secured Write",
[57] = "Verified Write",
[60] = "User did not have the necessary permissions to write",
[61] = "Verifier did not have the necessary permissions to verify",
[541] = "Conversion to intended data type is not supported",
[542] = "Unable to convert the input string to intended data type",
[8017] = "Object must be offscan to modify attributes that have an MxSecurityConfigure security classification",
};
public static string Lookup(int detail)
{
return KnownDetails.TryGetValue(detail, out string text) ? text : string.Empty;
}
}
@@ -0,0 +1,127 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Reflection;
using MxGateway.Contracts.Proto;
namespace MxGateway.Worker.Conversion;
public sealed class MxStatusProxyConverter
{
public MxStatusProxy Convert(object status)
{
if (status is null)
{
throw new ArgumentNullException(nameof(status));
}
Type statusType = status.GetType();
int success = ReadInt32Field(status, statusType, "success");
int rawCategory = ReadInt32Field(status, statusType, "category");
int rawDetectedBy = ReadInt32Field(status, statusType, "detectedBy");
int detail = ReadInt32Field(status, statusType, "detail");
return new MxStatusProxy
{
Success = success,
Category = MapCategory(rawCategory),
DetectedBy = MapSource(rawDetectedBy),
Detail = detail,
RawCategory = rawCategory,
RawDetectedBy = rawDetectedBy,
DiagnosticText = MxStatusDetailText.Lookup(detail),
};
}
public IReadOnlyList<MxStatusProxy> ConvertMany(Array? statuses)
{
if (statuses is null)
{
return Array.Empty<MxStatusProxy>();
}
List<MxStatusProxy> converted = new(statuses.Length);
foreach (object? status in statuses)
{
if (status is null)
{
converted.Add(new MxStatusProxy
{
Category = MxStatusCategory.Unknown,
DetectedBy = MxStatusSource.Unknown,
DiagnosticText = "Null MXSTATUS_PROXY entry.",
});
continue;
}
converted.Add(Convert(status));
}
return converted;
}
public string PreserveCompletionOnlyStatusBytes(byte[] statusBytes)
{
if (statusBytes is null)
{
throw new ArgumentNullException(nameof(statusBytes));
}
return $"completion_only_status_hex={BitConverter.ToString(statusBytes).Replace("-", string.Empty)}";
}
private static int ReadInt32Field(
object value,
Type valueType,
string fieldName)
{
FieldInfo? field = valueType.GetField(fieldName, BindingFlags.Instance | BindingFlags.Public);
if (field is null)
{
throw new MxStatusConversionException(
$"Status object type '{valueType.FullName}' does not expose required field '{fieldName}'.");
}
object? fieldValue = field.GetValue(value);
if (fieldValue is null)
{
throw new MxStatusConversionException(
$"Status object field '{fieldName}' on type '{valueType.FullName}' is null.");
}
return System.Convert.ToInt32(fieldValue, CultureInfo.InvariantCulture);
}
private static MxStatusCategory MapCategory(int rawCategory)
{
return rawCategory switch
{
-1 => MxStatusCategory.Unknown,
0 => MxStatusCategory.Ok,
1 => MxStatusCategory.Pending,
2 => MxStatusCategory.Warning,
3 => MxStatusCategory.CommunicationError,
4 => MxStatusCategory.ConfigurationError,
5 => MxStatusCategory.OperationalError,
6 => MxStatusCategory.SecurityError,
7 => MxStatusCategory.SoftwareError,
8 => MxStatusCategory.OtherError,
_ => MxStatusCategory.Unknown,
};
}
private static MxStatusSource MapSource(int rawDetectedBy)
{
return rawDetectedBy switch
{
-1 => MxStatusSource.Unknown,
0 => MxStatusSource.RequestingLmx,
1 => MxStatusSource.RespondingLmx,
2 => MxStatusSource.RequestingNmx,
3 => MxStatusSource.RespondingNmx,
4 => MxStatusSource.RequestingAutomationObject,
5 => MxStatusSource.RespondingAutomationObject,
_ => MxStatusSource.Unknown,
};
}
}
@@ -0,0 +1,522 @@
using System;
using System.Globalization;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
namespace MxGateway.Worker.Conversion;
public sealed class VariantConverter
{
public MxValue Convert(object? value)
{
return Convert(value, MxDataType.Unspecified);
}
public MxValue Convert(
object? value,
MxDataType expectedDataType)
{
if (value is null || value is DBNull)
{
return CreateNullValue(value, expectedDataType);
}
if (value is Array array)
{
return new MxValue
{
DataType = MxDataType.Unspecified,
VariantType = CreateArrayVariantType(array),
ArrayValue = ConvertArray(array, expectedDataType),
};
}
return ConvertScalar(value, expectedDataType);
}
public MxArray ConvertArray(
Array array,
MxDataType expectedElementDataType = MxDataType.Unspecified)
{
if (array is null)
{
throw new ArgumentNullException(nameof(array));
}
MxArray mxArray = new()
{
VariantType = CreateArrayVariantType(array),
};
for (int dimension = 0; dimension < array.Rank; dimension++)
{
mxArray.Dimensions.Add((uint)array.GetLength(dimension));
}
System.Type? elementType = array.GetType().GetElementType();
MxDataType elementDataType = ResolveArrayElementDataType(elementType, expectedElementDataType);
mxArray.ElementDataType = elementDataType;
switch (elementDataType)
{
case MxDataType.Boolean:
mxArray.BoolValues = ConvertBoolArray(array);
return mxArray;
case MxDataType.Integer:
if (elementType == typeof(long) || elementType == typeof(ulong))
{
mxArray.Int64Values = ConvertInt64Array(array);
}
else
{
mxArray.Int32Values = ConvertInt32Array(array);
}
return mxArray;
case MxDataType.Float:
mxArray.FloatValues = ConvertFloatArray(array);
return mxArray;
case MxDataType.Double:
mxArray.DoubleValues = ConvertDoubleArray(array);
return mxArray;
case MxDataType.String:
mxArray.StringValues = ConvertStringArray(array);
return mxArray;
case MxDataType.Time:
mxArray.TimestampValues = ConvertTimestampArray(array);
return mxArray;
default:
mxArray.ElementDataType = MxDataType.Unknown;
mxArray.RawElementDataType = (int)expectedElementDataType;
mxArray.RawDiagnostic = CreateRawDiagnostic(array);
mxArray.RawValues = ConvertRawArray(array);
return mxArray;
}
}
private static MxValue ConvertScalar(
object value,
MxDataType expectedDataType)
{
System.Type valueType = value.GetType();
string variantType = GetVariantTypeName(valueType);
switch (System.Type.GetTypeCode(valueType))
{
case TypeCode.Boolean:
return new MxValue
{
DataType = MxDataType.Boolean,
VariantType = variantType,
BoolValue = (bool)value,
};
case TypeCode.Byte:
case TypeCode.SByte:
case TypeCode.Int16:
case TypeCode.UInt16:
case TypeCode.Int32:
return new MxValue
{
DataType = MxDataType.Integer,
VariantType = variantType,
Int32Value = System.Convert.ToInt32(value, CultureInfo.InvariantCulture),
};
case TypeCode.UInt32:
case TypeCode.Int64:
return ConvertInt64Scalar(value, variantType, expectedDataType);
case TypeCode.UInt64:
return ConvertUInt64Scalar((ulong)value, variantType, expectedDataType);
case TypeCode.Single:
return new MxValue
{
DataType = MxDataType.Float,
VariantType = variantType,
FloatValue = (float)value,
};
case TypeCode.Double:
return new MxValue
{
DataType = MxDataType.Double,
VariantType = variantType,
DoubleValue = (double)value,
};
case TypeCode.Decimal:
return new MxValue
{
DataType = MxDataType.Double,
VariantType = variantType,
DoubleValue = System.Convert.ToDouble(value, CultureInfo.InvariantCulture),
RawDiagnostic = "Decimal value projected to double.",
};
case TypeCode.String:
case TypeCode.Char:
return new MxValue
{
DataType = MxDataType.String,
VariantType = variantType,
StringValue = System.Convert.ToString(value, CultureInfo.InvariantCulture) ?? string.Empty,
};
case TypeCode.DateTime:
return new MxValue
{
DataType = MxDataType.Time,
VariantType = variantType,
TimestampValue = ToTimestamp((DateTime)value),
};
default:
return CreateRawValue(value, expectedDataType);
}
}
private static MxValue ConvertInt64Scalar(
object value,
string variantType,
MxDataType expectedDataType)
{
long longValue = System.Convert.ToInt64(value, CultureInfo.InvariantCulture);
if (expectedDataType == MxDataType.Time)
{
return new MxValue
{
DataType = MxDataType.Time,
VariantType = variantType,
TimestampValue = Timestamp.FromDateTime(DateTime.FromFileTimeUtc(longValue)),
};
}
return new MxValue
{
DataType = MxDataType.Integer,
VariantType = variantType,
Int64Value = longValue,
};
}
private static MxValue ConvertUInt64Scalar(
ulong value,
string variantType,
MxDataType expectedDataType)
{
if (expectedDataType == MxDataType.Time && value <= long.MaxValue)
{
return new MxValue
{
DataType = MxDataType.Time,
VariantType = variantType,
TimestampValue = Timestamp.FromDateTime(DateTime.FromFileTimeUtc((long)value)),
};
}
if (value <= long.MaxValue)
{
return new MxValue
{
DataType = MxDataType.Integer,
VariantType = variantType,
Int64Value = (long)value,
};
}
return CreateRawValue(value, expectedDataType, "UInt64 value exceeds Int64 range.");
}
private static MxValue CreateNullValue(
object? value,
MxDataType expectedDataType)
{
return new MxValue
{
DataType = expectedDataType == MxDataType.Unspecified ? MxDataType.NoData : expectedDataType,
VariantType = value is DBNull ? "VT_NULL" : "VT_EMPTY",
IsNull = true,
};
}
private static MxValue CreateRawValue(
object value,
MxDataType expectedDataType,
string? diagnosticPrefix = null)
{
string diagnostic = CreateRawDiagnostic(value);
if (!string.IsNullOrWhiteSpace(diagnosticPrefix))
{
diagnostic = $"{diagnosticPrefix} {diagnostic}";
}
return new MxValue
{
DataType = MxDataType.Unknown,
VariantType = GetVariantTypeName(value.GetType()),
RawDataType = (int)expectedDataType,
RawDiagnostic = diagnostic,
RawValue = ByteString.CopyFromUtf8(System.Convert.ToString(value, CultureInfo.InvariantCulture) ?? string.Empty),
};
}
private static BoolArray ConvertBoolArray(Array array)
{
BoolArray values = new();
foreach (object? item in array)
{
values.Values.Add(item is not null && System.Convert.ToBoolean(item, CultureInfo.InvariantCulture));
}
return values;
}
private static Int32Array ConvertInt32Array(Array array)
{
Int32Array values = new();
foreach (object? item in array)
{
values.Values.Add(item is null ? 0 : System.Convert.ToInt32(item, CultureInfo.InvariantCulture));
}
return values;
}
private static Int64Array ConvertInt64Array(Array array)
{
Int64Array values = new();
foreach (object? item in array)
{
values.Values.Add(item is null ? 0 : System.Convert.ToInt64(item, CultureInfo.InvariantCulture));
}
return values;
}
private static FloatArray ConvertFloatArray(Array array)
{
FloatArray values = new();
foreach (object? item in array)
{
values.Values.Add(item is null ? 0 : System.Convert.ToSingle(item, CultureInfo.InvariantCulture));
}
return values;
}
private static DoubleArray ConvertDoubleArray(Array array)
{
DoubleArray values = new();
foreach (object? item in array)
{
values.Values.Add(item is null ? 0 : System.Convert.ToDouble(item, CultureInfo.InvariantCulture));
}
return values;
}
private static StringArray ConvertStringArray(Array array)
{
StringArray values = new();
foreach (object? item in array)
{
values.Values.Add(item is null ? string.Empty : System.Convert.ToString(item, CultureInfo.InvariantCulture) ?? string.Empty);
}
return values;
}
private static TimestampArray ConvertTimestampArray(Array array)
{
TimestampArray values = new();
foreach (object? item in array)
{
if (item is null)
{
values.Values.Add(Timestamp.FromDateTime(new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)));
}
else if (item is DateTime dateTime)
{
values.Values.Add(ToTimestamp(dateTime));
}
else
{
long fileTime = System.Convert.ToInt64(item, CultureInfo.InvariantCulture);
values.Values.Add(Timestamp.FromDateTime(DateTime.FromFileTimeUtc(fileTime)));
}
}
return values;
}
private static RawArray ConvertRawArray(Array array)
{
RawArray values = new();
foreach (object? item in array)
{
string rawValue = item is null
? string.Empty
: System.Convert.ToString(item, CultureInfo.InvariantCulture) ?? string.Empty;
values.Values.Add(ByteString.CopyFromUtf8(rawValue));
}
return values;
}
private static MxDataType ResolveArrayElementDataType(
System.Type? elementType,
MxDataType expectedElementDataType)
{
if (expectedElementDataType != MxDataType.Unspecified)
{
return expectedElementDataType;
}
if (elementType == typeof(bool))
{
return MxDataType.Boolean;
}
if (elementType == typeof(byte)
|| elementType == typeof(sbyte)
|| elementType == typeof(short)
|| elementType == typeof(ushort)
|| elementType == typeof(int)
|| elementType == typeof(uint)
|| elementType == typeof(long)
|| elementType == typeof(ulong))
{
return MxDataType.Integer;
}
if (elementType == typeof(float))
{
return MxDataType.Float;
}
if (elementType == typeof(double) || elementType == typeof(decimal))
{
return MxDataType.Double;
}
if (elementType == typeof(string) || elementType == typeof(char))
{
return MxDataType.String;
}
if (elementType == typeof(DateTime))
{
return MxDataType.Time;
}
return MxDataType.Unknown;
}
private static Timestamp ToTimestamp(DateTime dateTime)
{
DateTime utcDateTime = dateTime.Kind switch
{
DateTimeKind.Utc => dateTime,
DateTimeKind.Local => dateTime.ToUniversalTime(),
_ => DateTime.SpecifyKind(dateTime, DateTimeKind.Utc),
};
return Timestamp.FromDateTime(utcDateTime);
}
private static string CreateArrayVariantType(Array array)
{
System.Type? elementType = array.GetType().GetElementType();
return $"SAFEARRAY({GetVariantTypeName(elementType)})";
}
private static string GetVariantTypeName(System.Type? type)
{
if (type is null)
{
return "VT_EMPTY";
}
System.Type nonNullableType = Nullable.GetUnderlyingType(type) ?? type;
if (nonNullableType == typeof(bool))
{
return "VT_BOOL";
}
if (nonNullableType == typeof(byte))
{
return "VT_UI1";
}
if (nonNullableType == typeof(sbyte))
{
return "VT_I1";
}
if (nonNullableType == typeof(short))
{
return "VT_I2";
}
if (nonNullableType == typeof(ushort))
{
return "VT_UI2";
}
if (nonNullableType == typeof(int))
{
return "VT_I4";
}
if (nonNullableType == typeof(uint))
{
return "VT_UI4";
}
if (nonNullableType == typeof(long))
{
return "VT_I8";
}
if (nonNullableType == typeof(ulong))
{
return "VT_UI8";
}
if (nonNullableType == typeof(float))
{
return "VT_R4";
}
if (nonNullableType == typeof(double) || nonNullableType == typeof(decimal))
{
return "VT_R8";
}
if (nonNullableType == typeof(string) || nonNullableType == typeof(char))
{
return "VT_BSTR";
}
if (nonNullableType == typeof(DateTime))
{
return "VT_DATE";
}
return $"CLR:{nonNullableType.FullName}";
}
private static string CreateRawDiagnostic(object value)
{
return $"Unsupported variant projection for CLR type '{value.GetType().FullName}'.";
}
}
+102 -13
View File
@@ -15,6 +15,7 @@ public sealed class WorkerPipeSession
private readonly Func<int> _processIdProvider;
private readonly WorkerFrameReader _reader;
private readonly WorkerFrameWriter _writer;
private MxAccessStaSession? _mxAccessStaSession;
private long _nextSequence;
public WorkerPipeSession(
@@ -42,7 +43,7 @@ public sealed class WorkerPipeSession
public Task CompleteStartupHandshakeAsync(CancellationToken cancellationToken = default)
{
return CompleteStartupHandshakeAsync(_ => Task.CompletedTask, cancellationToken);
return CompleteStartupHandshakeAsync(InitializeMxAccessAsync, cancellationToken);
}
public async Task CompleteStartupHandshakeAsync(
@@ -54,20 +55,44 @@ public sealed class WorkerPipeSession
throw new ArgumentNullException(nameof(initializeMxAccessAsync));
}
await CompleteStartupHandshakeAsync(
async innerCancellationToken =>
{
await initializeMxAccessAsync(innerCancellationToken).ConfigureAwait(false);
return CreateWorkerReady();
},
cancellationToken).ConfigureAwait(false);
}
public async Task CompleteStartupHandshakeAsync(
Func<CancellationToken, Task<WorkerReady>> initializeMxAccessAsync,
CancellationToken cancellationToken = default)
{
if (initializeMxAccessAsync is null)
{
throw new ArgumentNullException(nameof(initializeMxAccessAsync));
}
try
{
WorkerEnvelope envelope = await _reader.ReadAsync(cancellationToken).ConfigureAwait(false);
ValidateGatewayHello(envelope);
await WriteWorkerHelloAsync(cancellationToken).ConfigureAwait(false);
await initializeMxAccessAsync(cancellationToken).ConfigureAwait(false);
await WriteWorkerReadyAsync(cancellationToken).ConfigureAwait(false);
WorkerReady ready = await initializeMxAccessAsync(cancellationToken).ConfigureAwait(false);
await WriteWorkerReadyAsync(ready, cancellationToken).ConfigureAwait(false);
}
catch (WorkerFrameProtocolException exception)
{
await TryWriteFaultAsync(exception, cancellationToken).ConfigureAwait(false);
throw;
}
catch (Exception exception) when (exception is not OperationCanceledException)
{
await TryWriteFaultAsync(MxAccessCreationException.From(exception), cancellationToken)
.ConfigureAwait(false);
throw;
}
}
private void ValidateGatewayHello(WorkerEnvelope envelope)
@@ -108,17 +133,11 @@ public sealed class WorkerPipeSession
cancellationToken);
}
private Task WriteWorkerReadyAsync(CancellationToken cancellationToken)
private Task WriteWorkerReadyAsync(
WorkerReady ready,
CancellationToken cancellationToken)
{
return _writer.WriteAsync(
CreateEnvelope(new WorkerReady
{
WorkerProcessId = _processIdProvider(),
MxaccessProgid = MxAccessInteropInfo.ProgId,
MxaccessClsid = MxAccessInteropInfo.Clsid,
ReadyTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
}),
cancellationToken);
return _writer.WriteAsync(CreateEnvelope(ready), cancellationToken);
}
private async Task TryWriteFaultAsync(
@@ -140,6 +159,25 @@ public sealed class WorkerPipeSession
}
}
private async Task TryWriteFaultAsync(
MxAccessCreationException exception,
CancellationToken cancellationToken)
{
try
{
await _writer
.WriteAsync(CreateEnvelope(CreateFault(exception)), cancellationToken)
.ConfigureAwait(false);
}
catch (Exception faultWriteException) when (
faultWriteException is IOException
|| faultWriteException is ObjectDisposedException
|| faultWriteException is WorkerFrameProtocolException)
{
// The MXAccess creation failure is the actionable error.
}
}
private WorkerEnvelope CreateEnvelope(WorkerHello hello)
{
return CreateBaseEnvelope(hello);
@@ -191,6 +229,34 @@ public sealed class WorkerPipeSession
return unchecked((ulong)Interlocked.Increment(ref _nextSequence));
}
private async Task<WorkerReady> InitializeMxAccessAsync(CancellationToken cancellationToken)
{
_mxAccessStaSession = new MxAccessStaSession();
try
{
return await _mxAccessStaSession
.StartAsync(_processIdProvider(), cancellationToken)
.ConfigureAwait(false);
}
catch
{
_mxAccessStaSession.Dispose();
_mxAccessStaSession = null;
throw;
}
}
private WorkerReady CreateWorkerReady()
{
return new WorkerReady
{
WorkerProcessId = _processIdProvider(),
MxaccessProgid = MxAccessInteropInfo.ProgId,
MxaccessClsid = MxAccessInteropInfo.Clsid,
ReadyTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
};
}
private static WorkerFault CreateFault(WorkerFrameProtocolException exception)
{
return new WorkerFault
@@ -206,6 +272,29 @@ public sealed class WorkerPipeSession
};
}
private static WorkerFault CreateFault(MxAccessCreationException exception)
{
WorkerFault fault = new()
{
Category = WorkerFaultCategory.MxaccessCreationFailed,
ExceptionType = exception.InnerException?.GetType().FullName ?? exception.GetType().FullName ?? string.Empty,
DiagnosticMessage = exception.Message,
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.WorkerUnavailable,
Message = exception.Message,
},
};
int? hresult = MxAccessCreationException.ExtractHResult(exception);
if (hresult.HasValue)
{
fault.Hresult = hresult.Value;
}
return fault;
}
private static WorkerFaultCategory MapFaultCategory(WorkerFrameProtocolErrorCode errorCode)
{
return errorCode switch
@@ -0,0 +1,6 @@
namespace MxGateway.Worker.MxAccess;
public interface IMxAccessComObjectFactory
{
object Create();
}
@@ -0,0 +1,8 @@
namespace MxGateway.Worker.MxAccess;
public interface IMxAccessEventSink
{
void Attach(object mxAccessComObject);
void Detach();
}
@@ -0,0 +1,66 @@
using ArchestrA.MxAccess;
namespace MxGateway.Worker.MxAccess;
public sealed class MxAccessBaseEventSink : IMxAccessEventSink
{
private LMXProxyServerClass? server;
public void Attach(object mxAccessComObject)
{
server = (LMXProxyServerClass)mxAccessComObject;
server.OnDataChange += OnDataChange;
server.OnWriteComplete += OnWriteComplete;
server.OperationComplete += OperationComplete;
server.OnBufferedDataChange += OnBufferedDataChange;
}
public void Detach()
{
if (server is null)
{
return;
}
server.OnDataChange -= OnDataChange;
server.OnWriteComplete -= OnWriteComplete;
server.OperationComplete -= OperationComplete;
server.OnBufferedDataChange -= OnBufferedDataChange;
server = null;
}
private static void OnDataChange(
int hLMXServerHandle,
int phItemHandle,
object pvItemValue,
int pwItemQuality,
object pftItemTimeStamp,
ref MXSTATUS_PROXY[] pVars)
{
}
private static void OnWriteComplete(
int hLMXServerHandle,
int phItemHandle,
ref MXSTATUS_PROXY[] pVars)
{
}
private static void OperationComplete(
int hLMXServerHandle,
int phItemHandle,
ref MXSTATUS_PROXY[] pVars)
{
}
private static void OnBufferedDataChange(
int hLMXServerHandle,
int phItemHandle,
MxDataType dtDataType,
object pvItemValue,
object pwItemQuality,
object pftItemTimeStamp,
ref MXSTATUS_PROXY[] pVars)
{
}
}
@@ -0,0 +1,11 @@
using ArchestrA.MxAccess;
namespace MxGateway.Worker.MxAccess;
public sealed class MxAccessComObjectFactory : IMxAccessComObjectFactory
{
public object Create()
{
return new LMXProxyServerClass();
}
}
@@ -0,0 +1,48 @@
using System;
using System.Runtime.InteropServices;
namespace MxGateway.Worker.MxAccess;
public sealed class MxAccessCreationException : Exception
{
public MxAccessCreationException(Exception innerException)
: base(
$"Failed to create MXAccess COM object {MxAccessInteropInfo.ComClassName} ({MxAccessInteropInfo.ProgId}).",
innerException)
{
AttemptedProgId = MxAccessInteropInfo.ProgId;
AttemptedClsid = MxAccessInteropInfo.Clsid;
AttemptedComClassName = MxAccessInteropInfo.ComClassName;
HResult = innerException.HResult;
}
public string AttemptedProgId { get; }
public string AttemptedClsid { get; }
public string AttemptedComClassName { get; }
public int? CapturedHResult => HResult == 0 ? null : HResult;
public static MxAccessCreationException From(Exception exception)
{
return exception is MxAccessCreationException creationException
? creationException
: new MxAccessCreationException(exception);
}
public static int? ExtractHResult(Exception exception)
{
if (exception is MxAccessCreationException creationException)
{
return creationException.CapturedHResult;
}
if (exception is COMException comException)
{
return comException.HResult;
}
return exception.HResult == 0 ? null : exception.HResult;
}
}
@@ -0,0 +1,97 @@
using System;
using System.Runtime.InteropServices;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
namespace MxGateway.Worker.MxAccess;
public sealed class MxAccessSession : IDisposable
{
private readonly object mxAccessComObject;
private readonly IMxAccessEventSink eventSink;
private bool disposed;
private MxAccessSession(
object mxAccessComObject,
IMxAccessEventSink eventSink,
int creationThreadId)
{
this.mxAccessComObject = mxAccessComObject ?? throw new ArgumentNullException(nameof(mxAccessComObject));
this.eventSink = eventSink ?? throw new ArgumentNullException(nameof(eventSink));
CreationThreadId = creationThreadId;
}
public int CreationThreadId { get; }
public WorkerReady CreateWorkerReady(int workerProcessId)
{
return new WorkerReady
{
WorkerProcessId = workerProcessId,
MxaccessProgid = MxAccessInteropInfo.ProgId,
MxaccessClsid = MxAccessInteropInfo.Clsid,
ReadyTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
};
}
public static MxAccessSession Create(
IMxAccessComObjectFactory factory,
IMxAccessEventSink eventSink)
{
if (factory is null)
{
throw new ArgumentNullException(nameof(factory));
}
if (eventSink is null)
{
throw new ArgumentNullException(nameof(eventSink));
}
object? mxAccessComObject = null;
try
{
mxAccessComObject = factory.Create();
if (mxAccessComObject is null)
{
throw new InvalidOperationException("MXAccess COM factory returned null.");
}
eventSink.Attach(mxAccessComObject);
return new MxAccessSession(
mxAccessComObject,
eventSink,
Environment.CurrentManagedThreadId);
}
catch (Exception exception)
{
eventSink.Detach();
if (mxAccessComObject is not null && Marshal.IsComObject(mxAccessComObject))
{
Marshal.FinalReleaseComObject(mxAccessComObject);
}
throw MxAccessCreationException.From(exception);
}
}
public void Dispose()
{
if (disposed)
{
return;
}
eventSink.Detach();
if (Marshal.IsComObject(mxAccessComObject))
{
Marshal.FinalReleaseComObject(mxAccessComObject);
}
disposed = true;
}
}
@@ -0,0 +1,70 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using MxGateway.Contracts.Proto;
using MxGateway.Worker.Sta;
namespace MxGateway.Worker.MxAccess;
public sealed class MxAccessStaSession : IDisposable
{
private readonly IMxAccessComObjectFactory factory;
private readonly IMxAccessEventSink eventSink;
private readonly StaRuntime staRuntime;
private MxAccessSession? session;
private bool disposed;
public MxAccessStaSession()
: this(
new StaRuntime(),
new MxAccessComObjectFactory(),
new MxAccessBaseEventSink())
{
}
public MxAccessStaSession(
StaRuntime staRuntime,
IMxAccessComObjectFactory factory,
IMxAccessEventSink eventSink)
{
this.staRuntime = staRuntime ?? throw new ArgumentNullException(nameof(staRuntime));
this.factory = factory ?? throw new ArgumentNullException(nameof(factory));
this.eventSink = eventSink ?? throw new ArgumentNullException(nameof(eventSink));
}
public Task<WorkerReady> StartAsync(
int workerProcessId,
CancellationToken cancellationToken = default)
{
staRuntime.Start();
return staRuntime.InvokeAsync(
() =>
{
if (session is not null)
{
throw new InvalidOperationException("MXAccess COM session has already been created.");
}
session = MxAccessSession.Create(factory, eventSink);
return session.CreateWorkerReady(workerProcessId);
},
cancellationToken);
}
public void Dispose()
{
if (disposed)
{
return;
}
if (session is not null)
{
staRuntime.InvokeAsync(() => session.Dispose()).GetAwaiter().GetResult();
}
staRuntime.Dispose();
disposed = true;
}
}
@@ -0,0 +1,8 @@
namespace MxGateway.Worker.Sta;
public interface IStaComApartmentInitializer
{
void Initialize();
void Uninitialize();
}
+8
View File
@@ -0,0 +1,8 @@
namespace MxGateway.Worker.Sta;
internal interface IStaWorkItem
{
void CancelBeforeExecution();
void Execute();
}
@@ -0,0 +1,31 @@
using System;
using System.Runtime.InteropServices;
namespace MxGateway.Worker.Sta;
public sealed class StaComApartmentInitializer : IStaComApartmentInitializer
{
private const uint CoInitializeApartmentThreaded = 0x2;
private const int SOk = 0;
private const int SFalse = 1;
public void Initialize()
{
int hresult = CoInitializeEx(IntPtr.Zero, CoInitializeApartmentThreaded);
if (hresult != SOk && hresult != SFalse)
{
throw new COMException("Failed to initialize the worker STA COM apartment.", hresult);
}
}
public void Uninitialize()
{
CoUninitialize();
}
[DllImport("ole32.dll")]
private static extern int CoInitializeEx(IntPtr reserved, uint coInit);
[DllImport("ole32.dll")]
private static extern void CoUninitialize();
}
+111
View File
@@ -0,0 +1,111 @@
using System;
using System.Runtime.InteropServices;
using System.Threading;
using Microsoft.Win32.SafeHandles;
namespace MxGateway.Worker.Sta;
public sealed class StaMessagePump
{
private const uint Infinite = 0xFFFFFFFF;
private const uint MsgWaitFailed = 0xFFFFFFFF;
private const uint MwmoInputAvailable = 0x0004;
private const uint PmRemove = 0x0001;
private const uint QsAllInput = 0x04FF;
public void WaitForWorkOrMessages(WaitHandle commandWakeEvent, TimeSpan timeout)
{
if (commandWakeEvent is null)
{
throw new ArgumentNullException(nameof(commandWakeEvent));
}
uint timeoutMilliseconds = ToTimeoutMilliseconds(timeout);
SafeWaitHandle safeHandle = commandWakeEvent.SafeWaitHandle;
IntPtr[] handles = [safeHandle.DangerousGetHandle()];
uint result = MsgWaitForMultipleObjectsEx(
(uint)handles.Length,
handles,
timeoutMilliseconds,
QsAllInput,
MwmoInputAvailable);
if (result == MsgWaitFailed)
{
throw new InvalidOperationException(
"The worker STA message pump failed while waiting for command work or Windows messages.");
}
}
public int PumpPendingMessages()
{
int pumpedMessages = 0;
while (PeekMessage(out NativeMessage message, IntPtr.Zero, 0, 0, PmRemove))
{
TranslateMessage(ref message);
DispatchMessage(ref message);
pumpedMessages++;
}
return pumpedMessages;
}
private static uint ToTimeoutMilliseconds(TimeSpan timeout)
{
if (timeout == Timeout.InfiniteTimeSpan)
{
return Infinite;
}
if (timeout <= TimeSpan.Zero)
{
return 0;
}
return timeout.TotalMilliseconds >= uint.MaxValue
? uint.MaxValue - 1
: (uint)Math.Ceiling(timeout.TotalMilliseconds);
}
[DllImport("user32.dll", SetLastError = true)]
private static extern uint MsgWaitForMultipleObjectsEx(
uint count,
IntPtr[] handles,
uint milliseconds,
uint wakeMask,
uint flags);
[DllImport("user32.dll", SetLastError = true)]
private static extern bool PeekMessage(
out NativeMessage message,
IntPtr windowHandle,
uint messageFilterMin,
uint messageFilterMax,
uint removeMessage);
[DllImport("user32.dll")]
private static extern bool TranslateMessage(ref NativeMessage message);
[DllImport("user32.dll")]
private static extern IntPtr DispatchMessage(ref NativeMessage message);
[StructLayout(LayoutKind.Sequential)]
private struct NativeMessage
{
public IntPtr WindowHandle;
public uint Message;
public UIntPtr WParam;
public IntPtr LParam;
public uint Time;
public NativePoint Point;
}
[StructLayout(LayoutKind.Sequential)]
private struct NativePoint
{
public int X;
public int Y;
}
}
+267
View File
@@ -0,0 +1,267 @@
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace MxGateway.Worker.Sta;
public sealed class StaRuntime : IDisposable
{
private readonly IStaComApartmentInitializer comApartmentInitializer;
private readonly StaMessagePump messagePump;
private readonly ConcurrentQueue<IStaWorkItem> commandQueue = new();
private readonly AutoResetEvent commandWakeEvent = new(false);
private readonly ManualResetEventSlim startedEvent = new(false);
private readonly ManualResetEventSlim stoppedEvent = new(false);
private readonly object gate = new();
private readonly Thread staThread;
private readonly TimeSpan idlePumpInterval;
private bool disposed;
private bool startRequested;
private bool shutdownRequested;
private Exception? startupException;
private long lastActivityUtcTicks;
private bool comInitialized;
public StaRuntime()
: this(new StaComApartmentInitializer(), new StaMessagePump(), TimeSpan.FromMilliseconds(50))
{
}
public StaRuntime(
IStaComApartmentInitializer comApartmentInitializer,
StaMessagePump messagePump,
TimeSpan idlePumpInterval)
{
this.comApartmentInitializer = comApartmentInitializer
?? throw new ArgumentNullException(nameof(comApartmentInitializer));
this.messagePump = messagePump ?? throw new ArgumentNullException(nameof(messagePump));
if (idlePumpInterval <= TimeSpan.Zero)
{
throw new ArgumentOutOfRangeException(
nameof(idlePumpInterval),
"The idle pump interval must be greater than zero.");
}
this.idlePumpInterval = idlePumpInterval;
lastActivityUtcTicks = DateTimeOffset.UtcNow.UtcTicks;
staThread = new Thread(ThreadMain)
{
IsBackground = true,
Name = "MxGateway.Worker.STA"
};
staThread.SetApartmentState(ApartmentState.STA);
}
public int? StaThreadId { get; private set; }
public DateTimeOffset LastActivityUtc =>
new(new DateTime(Volatile.Read(ref lastActivityUtcTicks), DateTimeKind.Utc));
public bool IsRunning => startedEvent.IsSet && !stoppedEvent.IsSet;
public void Start()
{
ThrowIfDisposed();
lock (gate)
{
if (shutdownRequested)
{
throw new InvalidOperationException("The worker STA runtime is shutting down.");
}
if (!startRequested)
{
startRequested = true;
staThread.Start();
}
}
startedEvent.Wait();
if (startupException is not null)
{
throw new InvalidOperationException(
"The worker STA runtime failed to initialize.",
startupException);
}
}
public Task InvokeAsync(Action command, CancellationToken cancellationToken = default)
{
if (command is null)
{
throw new ArgumentNullException(nameof(command));
}
return InvokeAsync(
() =>
{
command();
return true;
},
cancellationToken);
}
public Task<T> InvokeAsync<T>(Func<T> command, CancellationToken cancellationToken = default)
{
if (command is null)
{
throw new ArgumentNullException(nameof(command));
}
ThrowIfDisposed();
if (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled<T>(cancellationToken);
}
StaWorkItem<T> workItem = new(command, cancellationToken);
lock (gate)
{
if (shutdownRequested)
{
return Task.FromException<T>(
new InvalidOperationException("The worker STA runtime is shutting down."));
}
commandQueue.Enqueue(workItem);
}
commandWakeEvent.Set();
return workItem.Task;
}
public bool Shutdown(TimeSpan timeout)
{
if (timeout < TimeSpan.Zero && timeout != Timeout.InfiniteTimeSpan)
{
throw new ArgumentOutOfRangeException(nameof(timeout));
}
lock (gate)
{
shutdownRequested = true;
}
commandWakeEvent.Set();
if (!startedEvent.IsSet && !staThread.IsAlive)
{
CancelQueuedCommands();
stoppedEvent.Set();
return true;
}
bool stopped = stoppedEvent.Wait(timeout);
if (stopped)
{
CancelQueuedCommands();
}
return stopped;
}
public void Dispose()
{
if (disposed)
{
return;
}
bool stopped = Shutdown(TimeSpan.FromSeconds(5));
if (stopped)
{
commandWakeEvent.Dispose();
startedEvent.Dispose();
stoppedEvent.Dispose();
}
disposed = true;
}
private void ThreadMain()
{
try
{
StaThreadId = Thread.CurrentThread.ManagedThreadId;
comApartmentInitializer.Initialize();
comInitialized = true;
MarkActivity();
startedEvent.Set();
while (!IsShutdownRequested())
{
ProcessQueuedCommands();
messagePump.WaitForWorkOrMessages(commandWakeEvent, idlePumpInterval);
messagePump.PumpPendingMessages();
MarkActivity();
}
ProcessQueuedCommands();
}
catch (Exception exception)
{
startupException = exception;
startedEvent.Set();
}
finally
{
CancelQueuedCommands();
try
{
if (comInitialized)
{
comApartmentInitializer.Uninitialize();
}
}
finally
{
MarkActivity();
stoppedEvent.Set();
}
}
}
private void ProcessQueuedCommands()
{
while (commandQueue.TryDequeue(out IStaWorkItem? workItem))
{
MarkActivity();
workItem.Execute();
MarkActivity();
}
}
private void CancelQueuedCommands()
{
while (commandQueue.TryDequeue(out IStaWorkItem? workItem))
{
workItem.CancelBeforeExecution();
}
}
private bool IsShutdownRequested()
{
lock (gate)
{
return shutdownRequested;
}
}
private void MarkActivity()
{
Volatile.Write(ref lastActivityUtcTicks, DateTimeOffset.UtcNow.UtcTicks);
}
private void ThrowIfDisposed()
{
if (disposed)
{
throw new ObjectDisposedException(nameof(StaRuntime));
}
}
}
+71
View File
@@ -0,0 +1,71 @@
using System;
using System.Threading;
using System.Threading.Tasks;
namespace MxGateway.Worker.Sta;
internal sealed class StaWorkItem<T> : IStaWorkItem
{
private readonly Func<T> command;
private readonly CancellationToken cancellationToken;
private readonly CancellationTokenRegistration cancellationRegistration;
private int started;
public StaWorkItem(Func<T> command, CancellationToken cancellationToken)
{
this.command = command ?? throw new ArgumentNullException(nameof(command));
this.cancellationToken = cancellationToken;
Completion = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
if (cancellationToken.CanBeCanceled)
{
cancellationRegistration = cancellationToken.Register(
() =>
{
if (Interlocked.CompareExchange(ref started, 1, 0) == 0)
{
Completion.TrySetCanceled(cancellationToken);
}
});
}
}
public Task<T> Task => Completion.Task;
private TaskCompletionSource<T> Completion { get; }
public void CancelBeforeExecution()
{
if (Interlocked.CompareExchange(ref started, 1, 0) == 0)
{
Completion.TrySetCanceled(cancellationToken);
cancellationRegistration.Dispose();
}
}
public void Execute()
{
if (Interlocked.CompareExchange(ref started, 1, 0) != 0)
{
cancellationRegistration.Dispose();
return;
}
cancellationRegistration.Dispose();
if (cancellationToken.IsCancellationRequested)
{
Completion.TrySetCanceled(cancellationToken);
return;
}
try
{
Completion.TrySetResult(command());
}
catch (Exception exception)
{
Completion.TrySetException(exception);
}
}
}