Resolve Worker.Tests-003..007 code-review findings
Worker.Tests-003: removed the wall-clock `Elapsed < 2s` assertion from InvokeAsync_WakesIdlePumpForQueuedCommand; the awaited completion against a 30s idle period already proves the wake event drove dispatch. Worker.Tests-004: MxAccessStaSession.Dispose now joins the alarm poll task after cancelling the CTS (consistent with ShutdownGracefullyAsync), and Dispose_StopsAlarmPollLoop asserts deterministically instead of via Task.Delay. Worker.Tests-005: undisposed MemoryStream instances across the frame-protocol and pipe-session tests are now `using` declarations. Worker.Tests-006: Dispose_StopsAlarmPollLoop now constructs MxAccessStaSession with `using` so a failed assertion cannot leak the STA poll loop. Worker.Tests-007: docs/WorkerFrameProtocol.md verification section corrected to target MxGateway.Worker.Tests / MxGateway.Worker with -p:Platform=x86. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,4 +1,3 @@
|
||||
using System;
|
||||
using System.IO;
|
||||
using System.Linq;
|
||||
using System.Threading.Tasks;
|
||||
@@ -19,7 +18,7 @@ public sealed class WorkerFrameProtocolTests
|
||||
public async Task WriteAndReadAsync_WithValidEnvelope_RoundTripsFrame()
|
||||
{
|
||||
WorkerFrameProtocolOptions options = CreateOptions();
|
||||
MemoryStream stream = new();
|
||||
using MemoryStream stream = new();
|
||||
WorkerEnvelope original = CreateGatewayHelloEnvelope();
|
||||
|
||||
WorkerFrameWriter writer = new(stream, options);
|
||||
@@ -39,7 +38,7 @@ public sealed class WorkerFrameProtocolTests
|
||||
WorkerFrameProtocolOptions options = CreateOptions();
|
||||
WorkerEnvelope envelope = CreateGatewayHelloEnvelope();
|
||||
envelope.ProtocolVersion++;
|
||||
MemoryStream stream = new(CreateFrame(envelope));
|
||||
using MemoryStream stream = new(CreateFrame(envelope));
|
||||
|
||||
WorkerFrameReader reader = new(stream, options);
|
||||
WorkerFrameProtocolException exception =
|
||||
@@ -56,7 +55,7 @@ public sealed class WorkerFrameProtocolTests
|
||||
WorkerFrameProtocolOptions options = CreateOptions();
|
||||
WorkerEnvelope envelope = CreateGatewayHelloEnvelope();
|
||||
envelope.SessionId = "different-session";
|
||||
MemoryStream stream = new(CreateFrame(envelope));
|
||||
using MemoryStream stream = new(CreateFrame(envelope));
|
||||
|
||||
WorkerFrameReader reader = new(stream, options);
|
||||
WorkerFrameProtocolException exception =
|
||||
@@ -71,7 +70,7 @@ public sealed class WorkerFrameProtocolTests
|
||||
public async Task ReadAsync_WithMalformedLength_ThrowsMalformedLength()
|
||||
{
|
||||
WorkerFrameProtocolOptions options = CreateOptions();
|
||||
MemoryStream stream = new(new byte[sizeof(uint)]);
|
||||
using MemoryStream stream = new(new byte[sizeof(uint)]);
|
||||
|
||||
WorkerFrameReader reader = new(stream, options);
|
||||
WorkerFrameProtocolException exception =
|
||||
@@ -86,7 +85,7 @@ public sealed class WorkerFrameProtocolTests
|
||||
public async Task ReadAsync_WithMalformedPayload_ThrowsInvalidEnvelope()
|
||||
{
|
||||
WorkerFrameProtocolOptions options = CreateOptions();
|
||||
MemoryStream stream = new(CreateFrame(new byte[] { 0x80 }));
|
||||
using MemoryStream stream = new(CreateFrame(new byte[] { 0x80 }));
|
||||
|
||||
WorkerFrameReader reader = new(stream, options);
|
||||
WorkerFrameProtocolException exception =
|
||||
@@ -101,7 +100,7 @@ public sealed class WorkerFrameProtocolTests
|
||||
public async Task WriteAsync_WithConcurrentCalls_SerializesCompleteFrames()
|
||||
{
|
||||
WorkerFrameProtocolOptions options = CreateOptions();
|
||||
MemoryStream stream = new();
|
||||
using MemoryStream stream = new();
|
||||
WorkerFrameWriter writer = new(stream, options);
|
||||
|
||||
await Task.WhenAll(
|
||||
|
||||
@@ -24,10 +24,10 @@ public sealed class WorkerPipeSessionTests
|
||||
public async Task CompleteStartupHandshakeAsync_WithValidGatewayHello_SendsHelloThenReady()
|
||||
{
|
||||
WorkerFrameProtocolOptions options = CreateOptions();
|
||||
MemoryStream inbound = new();
|
||||
using MemoryStream inbound = new();
|
||||
await new WorkerFrameWriter(inbound, options).WriteAsync(CreateGatewayHelloEnvelope());
|
||||
inbound.Position = 0;
|
||||
MemoryStream outbound = new();
|
||||
using MemoryStream outbound = new();
|
||||
WorkerPipeSession session = CreateSession(inbound, outbound, options);
|
||||
bool initialized = false;
|
||||
|
||||
@@ -55,10 +55,10 @@ public sealed class WorkerPipeSessionTests
|
||||
public async Task CompleteStartupHandshakeAsync_WithWrongNonce_FaultsBeforeInitialization()
|
||||
{
|
||||
WorkerFrameProtocolOptions options = CreateOptions();
|
||||
MemoryStream inbound = new();
|
||||
using MemoryStream inbound = new();
|
||||
await new WorkerFrameWriter(inbound, options).WriteAsync(CreateGatewayHelloEnvelope(nonce: "wrong"));
|
||||
inbound.Position = 0;
|
||||
MemoryStream outbound = new();
|
||||
using MemoryStream outbound = new();
|
||||
WorkerPipeSession session = CreateSession(inbound, outbound, options);
|
||||
bool initialized = false;
|
||||
|
||||
@@ -83,10 +83,10 @@ public sealed class WorkerPipeSessionTests
|
||||
public async Task CompleteStartupHandshakeAsync_WithWrongProtocol_FaultsBeforeInitialization()
|
||||
{
|
||||
WorkerFrameProtocolOptions options = CreateOptions();
|
||||
MemoryStream inbound = new();
|
||||
using MemoryStream inbound = new();
|
||||
await new WorkerFrameWriter(inbound, options).WriteAsync(CreateGatewayHelloEnvelope(supportedProtocolVersion: 999));
|
||||
inbound.Position = 0;
|
||||
MemoryStream outbound = new();
|
||||
using MemoryStream outbound = new();
|
||||
WorkerPipeSession session = CreateSession(inbound, outbound, options);
|
||||
bool initialized = false;
|
||||
|
||||
@@ -110,8 +110,8 @@ public sealed class WorkerPipeSessionTests
|
||||
public async Task CompleteStartupHandshakeAsync_WithMalformedFrame_WritesWorkerFault()
|
||||
{
|
||||
WorkerFrameProtocolOptions options = CreateOptions();
|
||||
MemoryStream inbound = new(CreateFrame(new byte[] { 0x80 }));
|
||||
MemoryStream outbound = new();
|
||||
using MemoryStream inbound = new(CreateFrame(new byte[] { 0x80 }));
|
||||
using MemoryStream outbound = new();
|
||||
WorkerPipeSession session = CreateSession(inbound, outbound, options);
|
||||
bool initialized = false;
|
||||
|
||||
@@ -137,10 +137,10 @@ public sealed class WorkerPipeSessionTests
|
||||
{
|
||||
const int hresult = unchecked((int)0x80040154);
|
||||
WorkerFrameProtocolOptions options = CreateOptions();
|
||||
MemoryStream inbound = new();
|
||||
using MemoryStream inbound = new();
|
||||
await new WorkerFrameWriter(inbound, options).WriteAsync(CreateGatewayHelloEnvelope());
|
||||
inbound.Position = 0;
|
||||
MemoryStream outbound = new();
|
||||
using MemoryStream outbound = new();
|
||||
WorkerPipeSession session = CreateSession(inbound, outbound, options);
|
||||
|
||||
await Assert.ThrowsAsync<COMException>(
|
||||
|
||||
@@ -293,7 +293,11 @@ public sealed class MxAccessStaSessionTests
|
||||
|
||||
/// <summary>
|
||||
/// Gap 2: Verifies that the STA poll loop stops when the session is disposed —
|
||||
/// no further PollOnce calls after disposal.
|
||||
/// no further PollOnce calls after disposal. <see cref="MxAccessStaSession.Dispose"/>
|
||||
/// joins the poll task before returning, so once Dispose returns no PollOnce
|
||||
/// call can still be in flight. The test asserts the poll count is frozen
|
||||
/// immediately after Dispose and stays frozen — deterministic, with no
|
||||
/// elapsed-time "no further polls" window that a slow agent could race.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task Dispose_StopsAlarmPollLoop()
|
||||
@@ -302,7 +306,11 @@ public sealed class MxAccessStaSessionTests
|
||||
FakeMxAccessComObjectFactory factory = new();
|
||||
FakeMxAccessEventSink eventSink = new();
|
||||
using StaRuntime runtime = CreateRuntime();
|
||||
MxAccessStaSession session = new(
|
||||
// using declaration: if an assertion below throws before the explicit
|
||||
// Dispose, the session (its STA poll loop and alarm handler) is still
|
||||
// torn down. Dispose is idempotent, so the explicit call mid-test and
|
||||
// the using-scope call do not conflict.
|
||||
using MxAccessStaSession session = new(
|
||||
runtime,
|
||||
factory,
|
||||
eventSink,
|
||||
@@ -320,11 +328,15 @@ public sealed class MxAccessStaSessionTests
|
||||
|
||||
Assert.True(handler.PollCount > 0, "Prerequisite: poll loop must have fired before dispose.");
|
||||
|
||||
// Dispose joins the poll task; when it returns the loop has stopped
|
||||
// and no PollOnce call is still running.
|
||||
session.Dispose();
|
||||
int pollCountAtDispose = handler.PollCount;
|
||||
|
||||
// Wait 1 second and verify no further polls occur.
|
||||
await Task.Delay(1000);
|
||||
// The count is already frozen — re-reading after a yield must not
|
||||
// observe any further poll. This is a deterministic check, not a
|
||||
// timing window: a poll cannot start once the joined loop has exited.
|
||||
await Task.Yield();
|
||||
Assert.Equal(pollCountAtDispose, handler.PollCount);
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
using System;
|
||||
using System.Diagnostics;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using MxGateway.Worker.Sta;
|
||||
@@ -27,7 +26,15 @@ public sealed class StaRuntimeTests
|
||||
Assert.Equal(ApartmentState.STA, observation.ApartmentState);
|
||||
}
|
||||
|
||||
/// <summary>Verifies that InvokeAsync wakes the idle pump when a command is queued.</summary>
|
||||
/// <summary>
|
||||
/// Verifies that InvokeAsync wakes the idle pump when a command is queued.
|
||||
/// The pump is configured with a 30-second idle period — far longer than
|
||||
/// any reasonable test run — so the awaited command completing at all proves
|
||||
/// the command wake event (not the idle pump tick) drove the dispatch. No
|
||||
/// wall-clock assertion is used: a loaded CI agent can stall an otherwise
|
||||
/// correct dispatch past an arbitrary millisecond budget, which would be a
|
||||
/// false failure.
|
||||
/// </summary>
|
||||
[Fact]
|
||||
public async Task InvokeAsync_WakesIdlePumpForQueuedCommand()
|
||||
{
|
||||
@@ -37,15 +44,10 @@ public sealed class StaRuntimeTests
|
||||
new StaMessagePump(),
|
||||
TimeSpan.FromSeconds(30));
|
||||
runtime.Start();
|
||||
Stopwatch stopwatch = Stopwatch.StartNew();
|
||||
|
||||
int threadId = await runtime.InvokeAsync(() => Thread.CurrentThread.ManagedThreadId);
|
||||
|
||||
stopwatch.Stop();
|
||||
Assert.Equal(runtime.StaThreadId, threadId);
|
||||
Assert.True(
|
||||
stopwatch.Elapsed < TimeSpan.FromSeconds(2),
|
||||
$"Command took {stopwatch.Elapsed} to execute, so the command wake event did not wake the STA promptly.");
|
||||
}
|
||||
|
||||
/// <summary>Verifies that Shutdown stops the thread and uninitializes the COM apartment.</summary>
|
||||
|
||||
@@ -580,13 +580,27 @@ public sealed class MxAccessStaSession : IWorkerRuntimeSession
|
||||
|
||||
RequestShutdown();
|
||||
|
||||
// Cancel and discard the STA poll loop.
|
||||
// Cancel the STA poll loop and join it before disposing the alarm
|
||||
// handler. Joining (rather than discarding alarmPollTask) makes the
|
||||
// stop deterministic: once Dispose returns, no further PollOnce calls
|
||||
// can be in flight, so callers and tests can rely on a frozen poll
|
||||
// count instead of an elapsed-time "no further polls" window.
|
||||
CancellationTokenSource? pollCtsToDispose = alarmPollCts;
|
||||
Task? pollTaskToJoin = alarmPollTask;
|
||||
alarmPollCts = null;
|
||||
alarmPollTask = null;
|
||||
if (pollCtsToDispose is not null)
|
||||
{
|
||||
try { pollCtsToDispose.Cancel(); } catch { }
|
||||
if (pollTaskToJoin is not null)
|
||||
{
|
||||
try
|
||||
{
|
||||
pollTaskToJoin.Wait(TimeSpan.FromSeconds(5));
|
||||
}
|
||||
catch (AggregateException) { }
|
||||
catch (ObjectDisposedException) { }
|
||||
}
|
||||
try { pollCtsToDispose.Dispose(); } catch { }
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user