Resolve Worker-004, -005, -006, -007, -008 code-review findings

Worker-004: post-watchdog-fault heartbeats reported a non-faulted state.
ReportWatchdogFaultIfNeededAsync now sets _state = Faulted before writing
the StaHung fault.

Worker-005 (re-triaged): the cited OnPoll site was removed by Worker-001;
the real silent-failure bug was in MxAccessStaSession.RunAlarmPollLoopAsync,
which caught only graceful-stop exceptions. A failing PollOnce now records a
WorkerFault on the event queue instead of vanishing on a non-awaited task.

Worker-006: RunAsync's finally skipped runtime disposal when shutdown timed
out, leaking the STA thread and COM object. It now always disposes
(MxAccessStaSession.Dispose is idempotent and bounded).

Worker-007 (re-triaged): replaced MxAccessComServer's Type.InvokeMember
reflection fallback with an IMxAccessServer fast path plus typed
ILMXProxyServer* casts; a non-conforming object now fails fast.

Worker-008: alarm consumer STA affinity was unenforced. MxAccessStaSession
records the alarm consumer's STA thread id and asserts every PollOnce runs
on it via a unit-testable guard.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-18 21:31:23 -04:00
parent 1d9e3afadd
commit 54325343bd
8 changed files with 519 additions and 68 deletions
@@ -313,6 +313,121 @@ public sealed class WorkerPipeSessionTests
await SendShutdownAndWaitAsync(pipePair, runTask, cancellation.Token);
}
/// <summary>
/// Worker-004 regression: once the watchdog reports an StaHung fault,
/// subsequent heartbeats must report <see cref="WorkerState.Faulted"/>
/// rather than a non-faulted state that contradicts the fault. The
/// snapshot uses an empty current-command correlation id so the
/// heartbeat State is derived from the session state, not forced to
/// ExecutingCommand.
/// </summary>
[Fact]
public async Task RunAsync_AfterWatchdogFault_HeartbeatReportsFaultedState()
{
using CancellationTokenSource cancellation = new(TimeSpan.FromSeconds(10));
using PipePair pipePair = await PipePair.CreateAsync(cancellation.Token);
FakeRuntimeSession runtime = new();
runtime.SetSnapshot(new WorkerRuntimeHeartbeatSnapshot(
DateTimeOffset.UtcNow - TimeSpan.FromSeconds(5),
pendingCommandCount: 0,
outboundEventQueueDepth: 0,
lastEventSequence: 0,
currentCommandCorrelationId: string.Empty));
WorkerPipeSession session = CreatePipeSession(
pipePair.WorkerStream,
runtime,
new WorkerPipeSessionOptions
{
HeartbeatInterval = TimeSpan.FromMilliseconds(20),
HeartbeatGrace = TimeSpan.FromMilliseconds(50),
});
Task runTask = session.RunAsync(cancellation.Token);
await CompleteGatewayHandshakeAsync(pipePair, cancellation.Token);
WorkerEnvelope fault = await ReadUntilAsync(
pipePair.GatewayReader,
WorkerEnvelope.BodyOneofCase.WorkerFault,
cancellation.Token);
Assert.Equal(WorkerFaultCategory.StaHung, fault.WorkerFault.Category);
// The next heartbeat after the fault must agree with it.
WorkerEnvelope heartbeat = await ReadUntilAsync(
pipePair.GatewayReader,
WorkerEnvelope.BodyOneofCase.WorkerHeartbeat,
cancellation.Token);
Assert.Equal(WorkerState.Faulted, heartbeat.WorkerHeartbeat.State);
await SendShutdownAndWaitAsync(pipePair, runTask, cancellation.Token);
}
/// <summary>
/// Worker-006 regression: when graceful shutdown times out, RunAsync
/// must still dispose the runtime session in its finally block.
/// Skipping disposal on the timed-out path leaked the STA thread and
/// the MXAccess COM object.
/// </summary>
[Fact]
public async Task RunAsync_WhenShutdownTimesOut_StillDisposesRuntimeSession()
{
using CancellationTokenSource cancellation = new(TimeSpan.FromSeconds(10));
using PipePair pipePair = await PipePair.CreateAsync(cancellation.Token);
FakeRuntimeSession runtime = new()
{
ThrowTimeoutOnShutdown = true,
};
WorkerPipeSession session = CreatePipeSession(
pipePair.WorkerStream,
runtime,
new WorkerPipeSessionOptions
{
HeartbeatInterval = TimeSpan.FromSeconds(1),
HeartbeatGrace = TimeSpan.FromSeconds(30),
});
Task runTask = session.RunAsync(cancellation.Token);
await CompleteGatewayHandshakeAsync(pipePair, cancellation.Token);
await pipePair.GatewayWriter
.WriteAsync(CreateShutdownEnvelope(), cancellation.Token);
// Drain the gateway-side pipe (heartbeats + the shutdown-timeout
// fault) so the worker's writes never block on a full pipe buffer.
Task drainTask = DrainReaderUntilFaultedAsync(pipePair.GatewayReader, cancellation.Token);
// RunAsync must rethrow the TimeoutException and still reach its
// finally block, which disposes the runtime session.
await Assert.ThrowsAsync<TimeoutException>(async () => await runTask);
Assert.True(
runtime.Disposed,
"RunAsync must dispose the runtime session even when shutdown times out.");
await drainTask;
}
private static async Task DrainReaderUntilFaultedAsync(
WorkerFrameReader reader,
CancellationToken cancellationToken)
{
try
{
while (!cancellationToken.IsCancellationRequested)
{
WorkerEnvelope envelope = await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
if (envelope.BodyCase == WorkerEnvelope.BodyOneofCase.WorkerFault
&& envelope.WorkerFault.Category == WorkerFaultCategory.ShutdownTimeout)
{
return;
}
}
}
catch (Exception exception) when (
exception is OperationCanceledException
|| exception is IOException
|| exception is WorkerFrameProtocolException)
{
// The worker pipe closed once RunAsync completed — expected.
}
}
/// <summary>Verifies that shutdown drops late replies and sends shutdown ack.</summary>
[Fact]
public async Task RunAsync_WhenShutdownArrivesDuringCommand_DropsLateReplyAndWritesShutdownAck()
@@ -813,6 +928,12 @@ public sealed class WorkerPipeSessionTests
/// <summary>Gets or sets whether to throw an exception after dispatch is released.</summary>
public bool ThrowAfterDispatchReleased { get; set; }
/// <summary>Gets or sets whether ShutdownGracefullyAsync throws a TimeoutException.</summary>
public bool ThrowTimeoutOnShutdown { get; set; }
/// <summary>Gets a value indicating whether Dispose was called.</summary>
public bool Disposed { get; private set; }
/// <summary>Starts the worker session with the given session ID and process ID.</summary>
/// <param name="sessionId">The session identifier.</param>
/// <param name="workerProcessId">The worker process ID.</param>
@@ -939,6 +1060,12 @@ public sealed class WorkerPipeSessionTests
CancellationToken cancellationToken = default)
{
releaseDispatch.Set();
if (ThrowTimeoutOnShutdown)
{
return Task.FromException<MxAccessShutdownResult>(
new TimeoutException("Simulated graceful shutdown timeout."));
}
return Task.FromResult(new MxAccessShutdownResult(Array.Empty<MxAccessShutdownFailure>()));
}
@@ -971,6 +1098,7 @@ public sealed class WorkerPipeSessionTests
/// <summary>Disposes resources.</summary>
public void Dispose()
{
Disposed = true;
releaseDispatch.Set();
releaseDispatch.Dispose();
DispatchStarted.Dispose();
@@ -0,0 +1,138 @@
using System;
using System.Collections.Generic;
using MxGateway.Worker.MxAccess;
namespace MxGateway.Worker.Tests.MxAccess;
/// <summary>
/// Worker-007 regression tests for <see cref="MxAccessComServer"/>. The
/// adapter no longer falls back to late-bound <c>Type.InvokeMember</c>
/// reflection: a COM object must implement either the typed
/// <c>ILMXProxyServer</c> COM interface family (production) or
/// <see cref="IMxAccessServer"/> directly (test fakes).
/// </summary>
public sealed class MxAccessComServerTests
{
/// <summary>
/// A COM object implementing <see cref="IMxAccessServer"/> is routed
/// through the typed interface — no reflection — preserving arguments
/// and return values.
/// </summary>
[Fact]
public void Methods_WithTypedServer_RouteThroughTypedInterface()
{
RecordingMxAccessServer typed = new(registerHandle: 77);
MxAccessComServer adapter = new(typed);
int serverHandle = adapter.Register("client-a");
adapter.Advise(serverHandle, itemHandle: 9);
adapter.Unregister(serverHandle);
Assert.Equal(77, serverHandle);
Assert.Equal("client-a", typed.RegisteredClientName);
Assert.Equal(new[] { "Register:client-a", "Advise:77:9", "Unregister:77" }, typed.Calls);
}
/// <summary>
/// A COM object that implements neither the typed COM interface family
/// nor <see cref="IMxAccessServer"/> fails fast with a clear
/// <see cref="InvalidOperationException"/> instead of a late-bound
/// reflection call.
/// </summary>
[Fact]
public void Methods_WithUntypedObject_ThrowInvalidOperation()
{
MxAccessComServer adapter = new(new object());
InvalidOperationException exception =
Assert.Throws<InvalidOperationException>(() => adapter.Register("client"));
Assert.Contains("does not implement", exception.Message, StringComparison.Ordinal);
Assert.Contains(nameof(IMxAccessServer), exception.Message, StringComparison.Ordinal);
}
/// <summary>
/// Exceptions thrown by the typed server propagate unchanged — no
/// <c>TargetInvocationException</c> wrapping (reflection is gone).
/// </summary>
[Fact]
public void Methods_WhenTypedServerThrows_PropagateOriginalException()
{
RecordingMxAccessServer typed = new(registerHandle: 1)
{
ThrowOnRegister = new InvalidOperationException("register failed"),
};
MxAccessComServer adapter = new(typed);
InvalidOperationException exception =
Assert.Throws<InvalidOperationException>(() => adapter.Register("client"));
Assert.Equal("register failed", exception.Message);
}
private sealed class RecordingMxAccessServer : IMxAccessServer
{
private readonly int registerHandle;
private readonly List<string> calls = new();
public RecordingMxAccessServer(int registerHandle)
{
this.registerHandle = registerHandle;
}
public string? RegisteredClientName { get; private set; }
public Exception? ThrowOnRegister { get; set; }
public IReadOnlyList<string> Calls => calls.ToArray();
public int Register(string clientName)
{
calls.Add($"Register:{clientName}");
RegisteredClientName = clientName;
if (ThrowOnRegister is not null)
{
throw ThrowOnRegister;
}
return registerHandle;
}
public void Unregister(int serverHandle)
{
calls.Add($"Unregister:{serverHandle}");
}
public int AddItem(int serverHandle, string itemDefinition)
{
calls.Add($"AddItem:{serverHandle}:{itemDefinition}");
return 0;
}
public int AddItem2(int serverHandle, string itemDefinition, string itemContext)
{
calls.Add($"AddItem2:{serverHandle}:{itemDefinition}:{itemContext}");
return 0;
}
public void RemoveItem(int serverHandle, int itemHandle)
{
calls.Add($"RemoveItem:{serverHandle}:{itemHandle}");
}
public void Advise(int serverHandle, int itemHandle)
{
calls.Add($"Advise:{serverHandle}:{itemHandle}");
}
public void UnAdvise(int serverHandle, int itemHandle)
{
calls.Add($"UnAdvise:{serverHandle}:{itemHandle}");
}
public void AdviseSupervisory(int serverHandle, int itemHandle)
{
calls.Add($"AdviseSupervisory:{serverHandle}:{itemHandle}");
}
}
}
@@ -816,7 +816,7 @@ public sealed class MxAccessCommandExecutorTests
TimeSpan.FromMilliseconds(25));
}
private sealed class FakeMxAccessComObject
private sealed class FakeMxAccessComObject : IMxAccessServer
{
private readonly int registerHandle;
private readonly int addItemHandle;
@@ -328,6 +328,77 @@ public sealed class MxAccessStaSessionTests
Assert.Equal(pollCountAtDispose, handler.PollCount);
}
/// <summary>
/// Worker-005 regression: when the alarm poll loop's PollOnce throws a
/// real failure (e.g. a COMException from GetXmlCurrentAlarms2), the
/// failure must be recorded as a fault on the event queue so a broken
/// alarm subscription becomes observable on the IPC fault path instead
/// of silently faulting the never-awaited poll task.
/// </summary>
[Fact]
public async Task RunAlarmPollLoop_WhenPollOnceThrows_RecordsFaultOnEventQueue()
{
FakeAlarmCommandHandler handler = new()
{
PollException = new System.Runtime.InteropServices.COMException(
"GetXmlCurrentAlarms2 failed.", unchecked((int)0x80004005)),
};
FakeMxAccessComObjectFactory factory = new();
FakeMxAccessEventSink eventSink = new();
using StaRuntime runtime = CreateRuntime();
MxAccessEventQueue eventQueue = new();
using MxAccessStaSession session = new(
runtime,
factory,
eventSink,
eventQueue,
_eq => handler);
await session.StartAsync("session-1", workerProcessId: 1);
// Wait up to 5s for the poll loop to fault the queue.
using CancellationTokenSource timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
while (!eventQueue.IsFaulted && !timeout.IsCancellationRequested)
{
await Task.Delay(50, CancellationToken.None);
}
Assert.True(eventQueue.IsFaulted, "Expected the alarm poll failure to fault the event queue.");
WorkerFault? fault = session.DrainFault();
Assert.NotNull(fault);
Assert.Equal(WorkerFaultCategory.MxaccessEventConversionFailed, fault!.Category);
Assert.Contains("alarm poll failed", fault.DiagnosticMessage, StringComparison.OrdinalIgnoreCase);
Assert.Equal(typeof(System.Runtime.InteropServices.COMException).FullName, fault.ExceptionType);
}
/// <summary>
/// Worker-008 regression: the STA-affinity guard throws when an
/// IMxAccessAlarmConsumer call is attempted off the thread that created
/// the consumer, mirroring the MxAccessSession.CreationThreadId invariant.
/// </summary>
[Fact]
public void AssertOnAlarmConsumerThread_WhenOffOwningThread_Throws()
{
const int owningThread = 7;
const int otherThread = 99;
InvalidOperationException exception = Assert.Throws<InvalidOperationException>(
() => MxAccessStaSession.AssertOnAlarmConsumerThread(owningThread, otherThread));
Assert.Contains("off its owning STA thread", exception.Message, StringComparison.Ordinal);
}
/// <summary>
/// Worker-008: the STA-affinity guard is a no-op on the owning thread and
/// when no alarm consumer is configured (expected thread id null).
/// </summary>
[Fact]
public void AssertOnAlarmConsumerThread_OnOwningThreadOrUnset_DoesNotThrow()
{
MxAccessStaSession.AssertOnAlarmConsumerThread(expectedThreadId: 42, actualThreadId: 42);
MxAccessStaSession.AssertOnAlarmConsumerThread(expectedThreadId: null, actualThreadId: 123);
}
/// <summary>
/// Noop STA COM apartment initializer for testing.
/// </summary>
@@ -360,6 +431,9 @@ public sealed class MxAccessStaSessionTests
public bool IsSubscribed { get; private set; }
public string? LastSubscription { get; private set; }
/// <summary>Exception thrown by PollOnce; null to succeed.</summary>
public Exception? PollException { get; set; }
public int PollCount
{
get { lock (gate) return pollCount; }
@@ -400,6 +474,11 @@ public sealed class MxAccessStaSessionTests
pollCount++;
lastPollThreadId = Thread.CurrentThread.ManagedThreadId;
}
if (PollException is not null)
{
throw PollException;
}
}
public void Dispose() { }
+14 -7
View File
@@ -36,7 +36,6 @@ public sealed class WorkerPipeSession
private volatile WorkerState _state = WorkerState.Starting;
private bool _acceptingCommands = true;
private bool _watchdogFaultSent;
private bool _shutdownTimedOut;
/// <summary>Initializes a new worker pipe session over the provided stream.</summary>
/// <param name="stream">Network stream for reading and writing frames.</param>
@@ -119,11 +118,14 @@ public sealed class WorkerPipeSession
}
finally
{
if (!_shutdownTimedOut)
{
_runtimeSession?.Dispose();
}
// Always dispose the runtime session, including after a
// shutdown timeout. MxAccessStaSession.Dispose is idempotent and
// bounded (each STA join is capped at 2s), so re-entering it on
// the normal path is a harmless no-op, while on the timed-out
// path it is the only thing that reclaims the STA thread and
// releases the MXAccess COM object — skipping it leaked both and
// left cleanup to rely solely on process exit.
_runtimeSession?.Dispose();
_runtimeSession = null;
_state = WorkerState.Stopped;
}
@@ -480,7 +482,6 @@ public sealed class WorkerPipeSession
}
catch (TimeoutException exception)
{
_shutdownTimedOut = true;
_state = WorkerState.Faulted;
await TryWriteFaultAsync(CreateShutdownTimeoutFault(exception), cancellationToken).ConfigureAwait(false);
throw;
@@ -615,6 +616,12 @@ public sealed class WorkerPipeSession
}
_watchdogFaultSent = true;
// The STA is hung — move the session to Faulted before the next
// heartbeat so the heartbeat's reported State stays consistent with
// the StaHung fault just sent. Without this the heartbeat loop keeps
// advertising a non-faulted state that contradicts the fault.
_state = WorkerState.Faulted;
await TryWriteFaultAsync(
CreateFault(
WorkerFaultCategory.StaHung,
@@ -1,13 +1,22 @@
using System;
using System.Reflection;
using System.Runtime.ExceptionServices;
using ArchestrA.MxAccess;
namespace MxGateway.Worker.MxAccess;
/// <summary>
/// Adapter exposing MXAccess COM object methods through the IMxAccessServer interface.
/// Adapter exposing MXAccess COM object methods through the <see cref="IMxAccessServer"/>
/// interface.
/// </summary>
/// <remarks>
/// The supplied object must implement the typed MXAccess COM interface contract.
/// In production it is the <c>LMXProxyServerClass</c> RCW, which implements
/// <see cref="ILMXProxyServer"/> / <see cref="ILMXProxyServer3"/> /
/// <see cref="ILMXProxyServer4"/>. Tests substitute a typed fake that
/// implements <see cref="IMxAccessServer"/> directly. The earlier late-bound
/// <c>Type.InvokeMember</c> reflection fallback was removed: it bypassed the
/// typed interface contract, boxed value-type handles on every call, and only
/// ever served test doubles — a typed fake is the supported test seam now.
/// </remarks>
public sealed class MxAccessComServer : IMxAccessServer
{
private readonly object mxAccessComObject;
@@ -15,7 +24,11 @@ public sealed class MxAccessComServer : IMxAccessServer
/// <summary>
/// Initializes the adapter with the MXAccess COM object.
/// </summary>
/// <param name="mxAccessComObject">MXAccess COM object instance.</param>
/// <param name="mxAccessComObject">
/// MXAccess COM object instance. Must implement either the typed
/// <see cref="ILMXProxyServer"/> COM interface family (production) or
/// <see cref="IMxAccessServer"/> directly (test fakes).
/// </param>
public MxAccessComServer(object mxAccessComObject)
{
this.mxAccessComObject = mxAccessComObject ?? throw new ArgumentNullException(nameof(mxAccessComObject));
@@ -24,24 +37,24 @@ public sealed class MxAccessComServer : IMxAccessServer
/// <inheritdoc />
public int Register(string clientName)
{
if (mxAccessComObject is ILMXProxyServer mxAccessServer)
if (mxAccessComObject is IMxAccessServer typedFake)
{
return mxAccessServer.Register(clientName);
return typedFake.Register(clientName);
}
return (int)Invoke(nameof(Register), clientName);
return AsProxyServer().Register(clientName);
}
/// <inheritdoc />
public void Unregister(int serverHandle)
{
if (mxAccessComObject is ILMXProxyServer mxAccessServer)
if (mxAccessComObject is IMxAccessServer typedFake)
{
mxAccessServer.Unregister(serverHandle);
typedFake.Unregister(serverHandle);
return;
}
Invoke(nameof(Unregister), serverHandle);
AsProxyServer().Unregister(serverHandle);
}
/// <inheritdoc />
@@ -49,12 +62,12 @@ public sealed class MxAccessComServer : IMxAccessServer
int serverHandle,
string itemDefinition)
{
if (mxAccessComObject is ILMXProxyServer mxAccessServer)
if (mxAccessComObject is IMxAccessServer typedFake)
{
return mxAccessServer.AddItem(serverHandle, itemDefinition);
return typedFake.AddItem(serverHandle, itemDefinition);
}
return (int)Invoke(nameof(AddItem), serverHandle, itemDefinition);
return AsProxyServer().AddItem(serverHandle, itemDefinition);
}
/// <inheritdoc />
@@ -63,12 +76,12 @@ public sealed class MxAccessComServer : IMxAccessServer
string itemDefinition,
string itemContext)
{
if (mxAccessComObject is ILMXProxyServer3 mxAccessServer)
if (mxAccessComObject is IMxAccessServer typedFake)
{
return mxAccessServer.AddItem2(serverHandle, itemDefinition, itemContext);
return typedFake.AddItem2(serverHandle, itemDefinition, itemContext);
}
return (int)Invoke(nameof(AddItem2), serverHandle, itemDefinition, itemContext);
return AsProxyServer3().AddItem2(serverHandle, itemDefinition, itemContext);
}
/// <inheritdoc />
@@ -76,13 +89,13 @@ public sealed class MxAccessComServer : IMxAccessServer
int serverHandle,
int itemHandle)
{
if (mxAccessComObject is ILMXProxyServer mxAccessServer)
if (mxAccessComObject is IMxAccessServer typedFake)
{
mxAccessServer.RemoveItem(serverHandle, itemHandle);
typedFake.RemoveItem(serverHandle, itemHandle);
return;
}
Invoke(nameof(RemoveItem), serverHandle, itemHandle);
AsProxyServer().RemoveItem(serverHandle, itemHandle);
}
/// <inheritdoc />
@@ -90,13 +103,13 @@ public sealed class MxAccessComServer : IMxAccessServer
int serverHandle,
int itemHandle)
{
if (mxAccessComObject is ILMXProxyServer mxAccessServer)
if (mxAccessComObject is IMxAccessServer typedFake)
{
mxAccessServer.Advise(serverHandle, itemHandle);
typedFake.Advise(serverHandle, itemHandle);
return;
}
Invoke(nameof(Advise), serverHandle, itemHandle);
AsProxyServer().Advise(serverHandle, itemHandle);
}
/// <inheritdoc />
@@ -104,13 +117,13 @@ public sealed class MxAccessComServer : IMxAccessServer
int serverHandle,
int itemHandle)
{
if (mxAccessComObject is ILMXProxyServer mxAccessServer)
if (mxAccessComObject is IMxAccessServer typedFake)
{
mxAccessServer.UnAdvise(serverHandle, itemHandle);
typedFake.UnAdvise(serverHandle, itemHandle);
return;
}
Invoke(nameof(UnAdvise), serverHandle, itemHandle);
AsProxyServer().UnAdvise(serverHandle, itemHandle);
}
/// <inheritdoc />
@@ -118,34 +131,36 @@ public sealed class MxAccessComServer : IMxAccessServer
int serverHandle,
int itemHandle)
{
if (mxAccessComObject is ILMXProxyServer4 mxAccessServer)
if (mxAccessComObject is IMxAccessServer typedFake)
{
mxAccessServer.AdviseSupervisory(serverHandle, itemHandle);
typedFake.AdviseSupervisory(serverHandle, itemHandle);
return;
}
Invoke(nameof(AdviseSupervisory), serverHandle, itemHandle);
AsProxyServer4().AdviseSupervisory(serverHandle, itemHandle);
}
private object Invoke(
string methodName,
params object[] arguments)
private ILMXProxyServer AsProxyServer()
{
try
{
return mxAccessComObject
.GetType()
.InvokeMember(
methodName,
BindingFlags.Instance | BindingFlags.Public | BindingFlags.InvokeMethod,
binder: null,
target: mxAccessComObject,
args: arguments);
}
catch (TargetInvocationException exception) when (exception.InnerException is not null)
{
ExceptionDispatchInfo.Capture(exception.InnerException).Throw();
throw;
}
return mxAccessComObject as ILMXProxyServer
?? throw new InvalidOperationException(
$"MXAccess COM object of type '{mxAccessComObject.GetType().FullName}' does not implement "
+ $"{nameof(ILMXProxyServer)} or {nameof(IMxAccessServer)}.");
}
private ILMXProxyServer3 AsProxyServer3()
{
return mxAccessComObject as ILMXProxyServer3
?? throw new InvalidOperationException(
$"MXAccess COM object of type '{mxAccessComObject.GetType().FullName}' does not implement "
+ $"{nameof(ILMXProxyServer3)} or {nameof(IMxAccessServer)}.");
}
private ILMXProxyServer4 AsProxyServer4()
{
return mxAccessComObject as ILMXProxyServer4
?? throw new InvalidOperationException(
$"MXAccess COM object of type '{mxAccessComObject.GetType().FullName}' does not implement "
+ $"{nameof(ILMXProxyServer4)} or {nameof(IMxAccessServer)}.");
}
}
@@ -23,6 +23,7 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
private IAlarmCommandHandler? alarmCommandHandler;
private CancellationTokenSource? alarmPollCts;
private Task? alarmPollTask;
private int? alarmConsumerThreadId;
private bool disposed;
/// <summary>
@@ -180,6 +181,14 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
session = MxAccessSession.Create(factory, eventSink, sessionId);
if (alarmCommandHandlerFactory is not null)
{
// STA-affinity invariant: the alarm consumer factory and
// every IMxAccessAlarmConsumer call must run on the STA
// thread, because the production wnwrap consumer holds an
// Apartment-threaded COM object. The factory runs here
// inside staRuntime.InvokeAsync, so this records the STA
// thread id; RunAlarmPollLoopAsync then asserts each
// PollOnce executes on the same thread.
alarmConsumerThreadId = Environment.CurrentManagedThreadId;
alarmCommandHandler = alarmCommandHandlerFactory(eventQueue);
}
commandDispatcher = new StaCommandDispatcher(
@@ -227,7 +236,11 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
try
{
await staRuntime.InvokeAsync(
() => handler.PollOnce(),
() =>
{
EnsureOnAlarmConsumerThread();
handler.PollOnce();
},
cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
@@ -244,10 +257,77 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
// STA runtime shutting down — stop the loop gracefully.
return;
}
catch (Exception exception)
{
// A real alarm-poll failure (COMException from
// GetXmlCurrentAlarms2, malformed-XML parse failure, etc.).
// Record it as a fault on the event queue so a broken
// alarm subscription becomes observable on the IPC fault
// path instead of silently faulting this never-awaited
// task. The loop then stops — the subscription is dead.
eventQueue.RecordFault(CreateAlarmPollFault(exception));
return;
}
}
}, CancellationToken.None);
}
private void EnsureOnAlarmConsumerThread()
{
AssertOnAlarmConsumerThread(alarmConsumerThreadId, Environment.CurrentManagedThreadId);
}
/// <summary>
/// Enforces the STA-affinity invariant for the alarm consumer: every
/// <see cref="IMxAccessAlarmConsumer"/> call (and the consumer factory)
/// must run on the same thread the consumer was created on (the worker's
/// STA). Throws <see cref="InvalidOperationException"/> when a caller
/// breaks affinity — a programming error that would otherwise risk a
/// cross-apartment COM deadlock in the production wnwrap consumer, since
/// its CLSID is registered <c>ThreadingModel=Apartment</c>. The check is
/// a no-op until the consumer thread has been recorded (no alarm handler
/// configured, or session not yet started).
/// </summary>
/// <param name="expectedThreadId">
/// The managed thread id the alarm consumer was created on, or
/// <c>null</c> if no alarm consumer is configured.
/// </param>
/// <param name="actualThreadId">The current managed thread id.</param>
internal static void AssertOnAlarmConsumerThread(int? expectedThreadId, int actualThreadId)
{
if (expectedThreadId is not null && actualThreadId != expectedThreadId.Value)
{
throw new InvalidOperationException(
$"Alarm consumer accessed off its owning STA thread. Expected thread {expectedThreadId.Value}, "
+ $"actual {actualThreadId}. All IMxAccessAlarmConsumer calls must run on the STA that "
+ "created the consumer.");
}
}
private static WorkerFault CreateAlarmPollFault(Exception exception)
{
string message =
$"MXAccess alarm poll failed: {exception.Message}";
WorkerFault fault = new()
{
Category = WorkerFaultCategory.MxaccessEventConversionFailed,
ExceptionType = exception.GetType().FullName ?? string.Empty,
DiagnosticMessage = message,
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.WorkerUnavailable,
Message = message,
},
};
if (exception is System.Runtime.InteropServices.COMException comException)
{
fault.Hresult = comException.HResult;
}
return fault;
}
/// <summary>
/// Dispatches a command to the STA thread for execution asynchronously.
/// </summary>