Resolve Worker.Tests-001 and Worker.Tests-002 code-review findings

Worker.Tests-001: StaMessagePump had no direct unit test. Added
Sta/StaMessagePumpTests.cs — 8 STA-thread facts covering WaitForWorkOrMessages
(wake-event signalled before/during the wait, timeout expiry, zero-timeout
fast path, the QS_ALLINPUT posted-message wake path) and PumpPendingMessages
drain counting.

Worker.Tests-002: no test drove a COM event through the integrated
sink -> mapper -> queue path. Added MxAccess/MxAccessBaseEventSinkTests.cs —
5 facts driving OnDataChange, OnWriteComplete, OperationComplete and
OnBufferedDataChange through a real MxAccessBaseEventSink + mapper + queue and
asserting the converted WorkerEvent lands in MxAccessEventQueue. The four COM
event handlers were widened private -> internal and InternalsVisibleTo for
MxGateway.Worker.Tests was added, mirroring MxAccessAlarmEventSink's existing
test seam; no worker behavior changes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-18 21:07:48 -04:00
parent 53e3973209
commit 1b4dcf32d5
5 changed files with 459 additions and 9 deletions
@@ -0,0 +1,167 @@
using System;
using ArchestrA.MxAccess;
using MxGateway.Contracts.Proto;
using MxGateway.Worker.MxAccess;
using ComMxDataType = ArchestrA.MxAccess.MxDataType;
namespace MxGateway.Worker.Tests.MxAccess;
/// <summary>
/// Integrated tests for <see cref="MxAccessBaseEventSink"/>: drive an MXAccess COM
/// event through the real sink → <see cref="MxAccessEventMapper"/> →
/// <see cref="MxAccessEventQueue"/> pipeline and assert a correctly-converted
/// protobuf <see cref="WorkerEvent"/> lands in the queue.
/// </summary>
/// <remarks>
/// Boundary: the COM-side <c>+=</c> subscription performed in
/// <see cref="MxAccessBaseEventSink.Attach"/> casts the supplied object to the
/// sealed <c>LMXProxyServerClass</c> RCW and cannot run without a live MXAccess COM
/// object, so <c>Attach</c>/<c>Detach</c> are not exercised here. The event
/// handlers themselves (<c>OnDataChange</c>, <c>OnWriteComplete</c>,
/// <c>OperationComplete</c>, <c>OnBufferedDataChange</c>) are the exact delegate
/// targets the COM runtime invokes; calling them directly reproduces an STA-thread
/// COM callback and exercises the genuine conversion + enqueue path. The
/// <c>sessionId</c> normally set by <c>Attach</c> defaults to empty here, which the
/// assertions account for. The COM-event-conversion fault branch is left to
/// <see cref="MxAccessEventMapperTests"/> and the queue's own fault tests.
/// </remarks>
public sealed class MxAccessBaseEventSinkTests
{
/// <summary>
/// Verifies that an OnDataChange COM callback converts to a protobuf event and lands in the queue.
/// </summary>
[Fact]
public void OnDataChange_ComCallback_ConvertedEventLandsInQueue()
{
MxAccessEventQueue queue = new();
MxAccessBaseEventSink sink = new(queue, new MxAccessEventMapper());
DateTime timestamp = new(2026, 5, 18, 9, 15, 0, DateTimeKind.Utc);
MXSTATUS_PROXY[] statuses = Array.Empty<MXSTATUS_PROXY>();
sink.OnDataChange(
hLMXServerHandle: 7,
phItemHandle: 21,
pvItemValue: 1234,
pwItemQuality: 192,
pftItemTimeStamp: timestamp,
ref statuses);
Assert.Equal(1, queue.Count);
Assert.Equal(1UL, queue.LastEventSequence);
Assert.True(queue.TryDequeue(out WorkerEvent? workerEvent));
Assert.NotNull(workerEvent);
MxEvent mxEvent = workerEvent!.Event;
Assert.Equal(MxEventFamily.OnDataChange, mxEvent.Family);
Assert.Equal(MxEvent.BodyOneofCase.OnDataChange, mxEvent.BodyCase);
Assert.Equal(7, mxEvent.ServerHandle);
Assert.Equal(21, mxEvent.ItemHandle);
Assert.Equal(1234, mxEvent.Value.Int32Value);
Assert.Equal(192, mxEvent.Quality);
Assert.Equal(timestamp, mxEvent.SourceTimestamp.ToDateTime());
Assert.Equal(1UL, mxEvent.WorkerSequence);
Assert.NotNull(mxEvent.WorkerTimestamp);
}
/// <summary>
/// Verifies that consecutive OnDataChange callbacks land in the queue with monotonic sequences.
/// </summary>
[Fact]
public void OnDataChange_MultipleComCallbacks_QueueAssignsMonotonicSequences()
{
MxAccessEventQueue queue = new();
MxAccessBaseEventSink sink = new(queue, new MxAccessEventMapper());
MXSTATUS_PROXY[] statuses = Array.Empty<MXSTATUS_PROXY>();
sink.OnDataChange(1, 10, 100, 192, DateTime.UtcNow, ref statuses);
sink.OnDataChange(1, 11, 200, 192, DateTime.UtcNow, ref statuses);
sink.OnDataChange(1, 12, 300, 192, DateTime.UtcNow, ref statuses);
Assert.Equal(3, queue.Count);
Assert.Equal(3UL, queue.LastEventSequence);
Assert.True(queue.TryDequeue(out WorkerEvent? first));
Assert.True(queue.TryDequeue(out WorkerEvent? second));
Assert.True(queue.TryDequeue(out WorkerEvent? third));
Assert.Equal(1UL, first!.Event.WorkerSequence);
Assert.Equal(2UL, second!.Event.WorkerSequence);
Assert.Equal(3UL, third!.Event.WorkerSequence);
Assert.Equal(10, first.Event.ItemHandle);
Assert.Equal(12, third.Event.ItemHandle);
}
/// <summary>
/// Verifies that an OnWriteComplete COM callback lands in the queue with the correct family.
/// </summary>
[Fact]
public void OnWriteComplete_ComCallback_ConvertedEventLandsInQueue()
{
MxAccessEventQueue queue = new();
MxAccessBaseEventSink sink = new(queue, new MxAccessEventMapper());
MXSTATUS_PROXY[] statuses = Array.Empty<MXSTATUS_PROXY>();
sink.OnWriteComplete(hLMXServerHandle: 3, phItemHandle: 9, ref statuses);
Assert.Equal(1, queue.Count);
Assert.True(queue.TryDequeue(out WorkerEvent? workerEvent));
MxEvent mxEvent = workerEvent!.Event;
Assert.Equal(MxEventFamily.OnWriteComplete, mxEvent.Family);
Assert.Equal(MxEvent.BodyOneofCase.OnWriteComplete, mxEvent.BodyCase);
Assert.Equal(3, mxEvent.ServerHandle);
Assert.Equal(9, mxEvent.ItemHandle);
Assert.Equal(1UL, mxEvent.WorkerSequence);
}
/// <summary>
/// Verifies that an OperationComplete COM callback lands in the queue with the correct family.
/// </summary>
[Fact]
public void OperationComplete_ComCallback_ConvertedEventLandsInQueue()
{
MxAccessEventQueue queue = new();
MxAccessBaseEventSink sink = new(queue, new MxAccessEventMapper());
MXSTATUS_PROXY[] statuses = Array.Empty<MXSTATUS_PROXY>();
sink.OperationComplete(hLMXServerHandle: 4, phItemHandle: 8, ref statuses);
Assert.Equal(1, queue.Count);
Assert.True(queue.TryDequeue(out WorkerEvent? workerEvent));
MxEvent mxEvent = workerEvent!.Event;
Assert.Equal(MxEventFamily.OperationComplete, mxEvent.Family);
Assert.Equal(MxEvent.BodyOneofCase.OperationComplete, mxEvent.BodyCase);
Assert.Equal(4, mxEvent.ServerHandle);
Assert.Equal(8, mxEvent.ItemHandle);
}
/// <summary>
/// Verifies that an OnBufferedDataChange COM callback converts the value and lands in the queue.
/// </summary>
[Fact]
public void OnBufferedDataChange_ComCallback_ConvertedEventLandsInQueue()
{
MxAccessEventQueue queue = new();
MxAccessBaseEventSink sink = new(queue, new MxAccessEventMapper());
MXSTATUS_PROXY[] statuses = Array.Empty<MXSTATUS_PROXY>();
// Raw MXAccess data-type code 2 == Integer (see MxAccessEventMapper.MapMxDataType).
const int integerDataTypeCode = 2;
sink.OnBufferedDataChange(
hLMXServerHandle: 5,
phItemHandle: 13,
dtDataType: (ComMxDataType)integerDataTypeCode,
pvItemValue: 77,
pwItemQuality: 192,
pftItemTimeStamp: DateTime.UtcNow,
ref statuses);
Assert.Equal(1, queue.Count);
Assert.True(queue.TryDequeue(out WorkerEvent? workerEvent));
MxEvent mxEvent = workerEvent!.Event;
Assert.Equal(MxEventFamily.OnBufferedDataChange, mxEvent.Family);
Assert.Equal(MxEvent.BodyOneofCase.OnBufferedDataChange, mxEvent.BodyCase);
Assert.Equal(5, mxEvent.ServerHandle);
Assert.Equal(13, mxEvent.ItemHandle);
Assert.Equal(integerDataTypeCode, mxEvent.OnBufferedDataChange.RawDataType);
}
}
@@ -0,0 +1,260 @@
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using MxGateway.Worker.Sta;
namespace MxGateway.Worker.Tests.Sta;
/// <summary>
/// Tests for <see cref="StaMessagePump"/>.
/// </summary>
/// <remarks>
/// Boundary: the <c>MsgWaitFailed</c> failure branch of <c>WaitForWorkOrMessages</c>
/// is not exercised. Forcing <c>MsgWaitForMultipleObjectsEx</c> to fail requires
/// passing a deliberately invalid native handle, which is unsafe to construct in a
/// managed test and can corrupt the thread's wait state. The other behavior — null
/// argument validation, waking on a signalled event, returning on timeout, the
/// timeout conversion edge cases observable through wait latency, and the
/// pump's drain count — is covered directly here.
/// </remarks>
public sealed class StaMessagePumpTests
{
/// <summary>
/// Verifies that WaitForWorkOrMessages throws ArgumentNullException for a null wake event.
/// </summary>
[Fact]
public void WaitForWorkOrMessages_NullWakeEvent_ThrowsArgumentNullException()
{
StaMessagePump pump = new();
ArgumentNullException exception = Assert.Throws<ArgumentNullException>(
() => pump.WaitForWorkOrMessages(null!, TimeSpan.FromMilliseconds(10)));
Assert.Equal("commandWakeEvent", exception.ParamName);
}
/// <summary>
/// Verifies that WaitForWorkOrMessages returns promptly when the wake event is already signalled.
/// </summary>
[Fact]
public async Task WaitForWorkOrMessages_WakeEventAlreadySignalled_ReturnsImmediately()
{
StaMessagePump pump = new();
using ManualResetEventSlim wakeEvent = new(initialState: true);
await RunOnStaThreadAsync(() =>
{
Stopwatch stopwatch = Stopwatch.StartNew();
pump.WaitForWorkOrMessages(wakeEvent.WaitHandle, TimeSpan.FromSeconds(30));
stopwatch.Stop();
// A 30s timeout was supplied; returning quickly proves the signalled
// wake handle — not the timeout — ended the wait.
Assert.True(
stopwatch.Elapsed < TimeSpan.FromSeconds(5),
$"Wait took {stopwatch.Elapsed}; a pre-signalled wake event should return immediately.");
});
}
/// <summary>
/// Verifies that WaitForWorkOrMessages wakes when the wake event is signalled from another thread.
/// </summary>
[Fact]
public async Task WaitForWorkOrMessages_WakeEventSignalledDuringWait_Returns()
{
StaMessagePump pump = new();
using ManualResetEventSlim wakeEvent = new(initialState: false);
Task signalTask = Task.Run(async () =>
{
await Task.Delay(150, CancellationToken.None);
wakeEvent.Set();
});
await RunOnStaThreadAsync(() =>
{
Stopwatch stopwatch = Stopwatch.StartNew();
pump.WaitForWorkOrMessages(wakeEvent.WaitHandle, TimeSpan.FromSeconds(30));
stopwatch.Stop();
Assert.True(
stopwatch.Elapsed < TimeSpan.FromSeconds(10),
$"Wait took {stopwatch.Elapsed}; signalling the wake event should end the 30s wait early.");
});
await signalTask;
}
/// <summary>
/// Verifies that WaitForWorkOrMessages returns on timeout when the wake event is never signalled.
/// </summary>
[Fact]
public async Task WaitForWorkOrMessages_WakeEventNeverSignalled_ReturnsAfterTimeout()
{
StaMessagePump pump = new();
using ManualResetEventSlim wakeEvent = new(initialState: false);
await RunOnStaThreadAsync(() =>
{
Stopwatch stopwatch = Stopwatch.StartNew();
pump.WaitForWorkOrMessages(wakeEvent.WaitHandle, TimeSpan.FromMilliseconds(150));
stopwatch.Stop();
// The wait must end of its own accord (timeout). Lower bound is loose
// because the timeout converts via Math.Ceiling and the OS scheduler
// adds slack; upper bound proves it is not waiting indefinitely.
Assert.True(
stopwatch.Elapsed < TimeSpan.FromSeconds(10),
$"Wait took {stopwatch.Elapsed}; a 150ms timeout should end the wait without a signal.");
});
}
/// <summary>
/// Verifies that a zero timeout (the TimeSpan.Zero conversion branch) returns without blocking.
/// </summary>
[Fact]
public async Task WaitForWorkOrMessages_ZeroTimeout_ReturnsWithoutBlocking()
{
StaMessagePump pump = new();
using ManualResetEventSlim wakeEvent = new(initialState: false);
await RunOnStaThreadAsync(() =>
{
Stopwatch stopwatch = Stopwatch.StartNew();
// TimeSpan.Zero exercises the "<= Zero -> 0 ms" conversion branch:
// MsgWaitForMultipleObjectsEx polls and returns immediately.
pump.WaitForWorkOrMessages(wakeEvent.WaitHandle, TimeSpan.Zero);
stopwatch.Stop();
Assert.True(
stopwatch.Elapsed < TimeSpan.FromSeconds(2),
$"Wait took {stopwatch.Elapsed}; a zero timeout must not block.");
});
}
/// <summary>
/// Verifies that PumpPendingMessages returns zero when the STA thread message queue is empty.
/// </summary>
[Fact]
public async Task PumpPendingMessages_NoMessagesPosted_ReturnsZero()
{
StaMessagePump pump = new();
int pumped = await RunOnStaThreadAsync(() =>
{
// Drain anything the apartment/thread start posted, then measure a clean queue.
pump.PumpPendingMessages();
return pump.PumpPendingMessages();
});
Assert.Equal(0, pumped);
}
/// <summary>
/// Verifies that PumpPendingMessages dispatches and counts messages posted to the STA thread.
/// </summary>
[Fact]
public async Task PumpPendingMessages_MessagesPostedToStaThread_ReturnsCountProcessed()
{
StaMessagePump pump = new();
int pumped = await RunOnStaThreadAsync(() =>
{
// Clear any startup messages so the count reflects only what we post.
pump.PumpPendingMessages();
uint threadId = GetCurrentThreadId();
Assert.True(PostThreadMessage(threadId, WmNull, UIntPtr.Zero, IntPtr.Zero));
Assert.True(PostThreadMessage(threadId, WmNull, UIntPtr.Zero, IntPtr.Zero));
Assert.True(PostThreadMessage(threadId, WmNull, UIntPtr.Zero, IntPtr.Zero));
return pump.PumpPendingMessages();
});
Assert.Equal(3, pumped);
}
/// <summary>
/// Verifies that WaitForWorkOrMessages returns once a Windows message is posted to the STA thread.
/// </summary>
[Fact]
public async Task WaitForWorkOrMessages_WindowsMessagePosted_ReturnsForInputAvailable()
{
StaMessagePump pump = new();
using ManualResetEventSlim wakeEvent = new(initialState: false);
using ManualResetEventSlim threadReady = new(initialState: false);
uint staThreadId = 0;
Task staTask = RunOnStaThreadAsync(() =>
{
staThreadId = GetCurrentThreadId();
pump.PumpPendingMessages();
threadReady.Set();
Stopwatch stopwatch = Stopwatch.StartNew();
// The wake event is never signalled. Only the posted Windows message
// (QS_ALLINPUT wake mask) can end this 30s wait early.
pump.WaitForWorkOrMessages(wakeEvent.WaitHandle, TimeSpan.FromSeconds(30));
stopwatch.Stop();
Assert.True(
stopwatch.Elapsed < TimeSpan.FromSeconds(10),
$"Wait took {stopwatch.Elapsed}; a posted Windows message should wake the pump.");
});
Assert.True(threadReady.Wait(TimeSpan.FromSeconds(5)), "STA thread did not start.");
await Task.Delay(100, CancellationToken.None);
Assert.True(
PostThreadMessage(staThreadId, WmNull, UIntPtr.Zero, IntPtr.Zero),
"Failed to post a Windows message to the STA thread.");
await staTask;
}
private const uint WmNull = 0x0000;
/// <summary>Runs an action on a dedicated STA thread and returns when it completes.</summary>
private static Task RunOnStaThreadAsync(Action action)
{
return RunOnStaThreadAsync(() =>
{
action();
return 0;
});
}
/// <summary>Runs a function on a dedicated STA thread and returns its result.</summary>
private static Task<T> RunOnStaThreadAsync<T>(Func<T> function)
{
TaskCompletionSource<T> completion = new();
Thread thread = new(() =>
{
try
{
completion.SetResult(function());
}
catch (Exception exception)
{
completion.SetException(exception);
}
})
{
IsBackground = true,
};
thread.SetApartmentState(ApartmentState.STA);
thread.Start();
return completion.Task;
}
[System.Runtime.InteropServices.DllImport("kernel32.dll")]
private static extern uint GetCurrentThreadId();
[System.Runtime.InteropServices.DllImport("user32.dll", SetLastError = true)]
private static extern bool PostThreadMessage(
uint threadId,
uint message,
UIntPtr wParam,
IntPtr lParam);
}
@@ -65,7 +65,14 @@ public sealed class MxAccessBaseEventSink : IMxAccessEventSink
sessionId = string.Empty;
}
private void OnDataChange(
/// <summary>
/// Handles the MXAccess <c>OnDataChange</c> COM event: converts the
/// event arguments to a protobuf <see cref="Proto.MxEvent"/> and enqueues
/// it. Subscribed to the COM object's event in <see cref="Attach"/>.
/// Exposed <c>internal</c> so unit tests can drive the integrated
/// sink → mapper → queue path without a live MXAccess COM event source.
/// </summary>
internal void OnDataChange(
int hLMXServerHandle,
int phItemHandle,
object pvItemValue,
@@ -84,7 +91,11 @@ public sealed class MxAccessBaseEventSink : IMxAccessEventSink
statuses));
}
private void OnWriteComplete(
/// <summary>
/// Handles the MXAccess <c>OnWriteComplete</c> COM event. Exposed
/// <c>internal</c> as a unit-test seam; see <see cref="OnDataChange"/>.
/// </summary>
internal void OnWriteComplete(
int hLMXServerHandle,
int phItemHandle,
ref MXSTATUS_PROXY[] pVars)
@@ -97,7 +108,11 @@ public sealed class MxAccessBaseEventSink : IMxAccessEventSink
statuses));
}
private void OperationComplete(
/// <summary>
/// Handles the MXAccess <c>OperationComplete</c> COM event. Exposed
/// <c>internal</c> as a unit-test seam; see <see cref="OnDataChange"/>.
/// </summary>
internal void OperationComplete(
int hLMXServerHandle,
int phItemHandle,
ref MXSTATUS_PROXY[] pVars)
@@ -110,7 +125,11 @@ public sealed class MxAccessBaseEventSink : IMxAccessEventSink
statuses));
}
private void OnBufferedDataChange(
/// <summary>
/// Handles the MXAccess <c>OnBufferedDataChange</c> COM event. Exposed
/// <c>internal</c> as a unit-test seam; see <see cref="OnDataChange"/>.
/// </summary>
internal void OnBufferedDataChange(
int hLMXServerHandle,
int phItemHandle,
MxDataType dtDataType,
@@ -14,6 +14,10 @@
<PackageReference Include="Polly.Core" Version="8.6.6" />
</ItemGroup>
<ItemGroup>
<InternalsVisibleTo Include="MxGateway.Worker.Tests" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\MxGateway.Contracts\MxGateway.Contracts.csproj" />
</ItemGroup>