Resolve Worker.Tests-008..015 code-review findings

Worker.Tests-008: moved the misplaced WorkerLogRedactor test out of
VariantConverterTests into Bootstrap/WorkerLogRedactorTests.

Worker.Tests-009: renamed 46 snake_case alarm-test methods to PascalCase
Method_Scenario_Expectation.

Worker.Tests-010: replaced a weak Assert.Contains with an exact assertion
against the real diagnostic message and corrected the XML doc.

Worker.Tests-011: renamed and re-documented a cancellation test that
overstated what it proved.

Worker.Tests-012: added an oversized-frame (MessageTooLarge) test; renamed
the mislabeled zero-length-payload test.

Worker.Tests-013: removed the fixed-100ms ThrowIfCompletedAsync helper; the
caller now races runTask deterministically.

Worker.Tests-014: consolidated duplicated test fakes/helpers
(FakeRuntimeSession, NoopComApartmentInitializer, NoopEventSink, frame
helpers) into a shared TestSupport namespace.

Worker.Tests-015: added MxAccessEventQueue coverage for drain-all (maxEvents
0), empty-queue drain, and enqueue-after-fault.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-18 22:59:07 -04:00
parent 9582de077b
commit 371bcb3f91
19 changed files with 507 additions and 512 deletions
@@ -0,0 +1,217 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf.WellKnownTypes;
using MxGateway.Contracts.Proto;
using MxGateway.Worker.Ipc;
using MxGateway.Worker.MxAccess;
using MxGateway.Worker.Sta;
namespace MxGateway.Worker.Tests.TestSupport;
/// <summary>
/// Single configurable <see cref="IWorkerRuntimeSession"/> test double shared by
/// the IPC tests. Replaces the two independent (and previously diverged)
/// <c>FakeRuntimeSession</c> copies in WorkerPipeSessionTests and
/// WorkerPipeClientTests: one supported dispatch blocking and event enqueue, the
/// other did not. This consolidated double supports every configuration both
/// call sites needed, so a minimal caller simply leaves the options unset.
/// </summary>
internal sealed class FakeRuntimeSession : IWorkerRuntimeSession
{
private readonly ManualResetEventSlim releaseDispatch = new(false);
private readonly object gate = new();
private readonly Queue<WorkerEvent> events = new();
private WorkerRuntimeHeartbeatSnapshot snapshot = new(
DateTimeOffset.UtcNow,
pendingCommandCount: 0,
outboundEventQueueDepth: 0,
lastEventSequence: 0,
currentCommandCorrelationId: string.Empty);
/// <summary>Gets the event signaled when dispatch begins.</summary>
public ManualResetEventSlim DispatchStarted { get; } = new(false);
/// <summary>Blocks dispatch execution until explicitly released.</summary>
public bool BlockDispatch { get; set; }
/// <summary>Gets or sets whether to throw an exception after dispatch is released.</summary>
public bool ThrowAfterDispatchReleased { get; set; }
/// <summary>Gets or sets whether ShutdownGracefullyAsync throws a TimeoutException.</summary>
public bool ThrowTimeoutOnShutdown { get; set; }
/// <summary>Gets a value indicating whether Dispose was called.</summary>
public bool Disposed { get; private set; }
/// <summary>Starts the worker session with the given session ID and process ID.</summary>
/// <param name="sessionId">The session identifier.</param>
/// <param name="workerProcessId">The worker process ID.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Worker ready response.</returns>
public Task<WorkerReady> StartAsync(
string sessionId,
int workerProcessId,
CancellationToken cancellationToken = default)
{
return Task.FromResult(new WorkerReady
{
WorkerProcessId = workerProcessId,
MxaccessProgid = MxAccessInteropInfo.ProgId,
MxaccessClsid = MxAccessInteropInfo.Clsid,
ReadyTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
});
}
/// <summary>Dispatches a command to the STA thread.</summary>
/// <param name="command">The command to dispatch.</param>
/// <returns>The command reply.</returns>
public Task<MxCommandReply> DispatchAsync(StaCommand command)
{
return Task.Run(
() =>
{
SetSnapshot(new WorkerRuntimeHeartbeatSnapshot(
DateTimeOffset.UtcNow,
pendingCommandCount: 0,
outboundEventQueueDepth: 0,
lastEventSequence: 0,
command.CorrelationId));
DispatchStarted.Set();
if (BlockDispatch)
{
releaseDispatch.Wait(TimeSpan.FromSeconds(5));
}
SetSnapshot(new WorkerRuntimeHeartbeatSnapshot(
DateTimeOffset.UtcNow,
pendingCommandCount: 0,
outboundEventQueueDepth: 0,
lastEventSequence: 0,
currentCommandCorrelationId: string.Empty));
if (ThrowAfterDispatchReleased)
{
throw new InvalidOperationException("Command failed after shutdown started.");
}
return new MxCommandReply
{
SessionId = command.SessionId,
CorrelationId = command.CorrelationId,
Kind = command.Kind,
ProtocolStatus = new ProtocolStatus
{
Code = ProtocolStatusCode.Ok,
Message = "OK",
},
};
});
}
/// <summary>Captures current heartbeat snapshot.</summary>
/// <returns>Current runtime heartbeat snapshot.</returns>
public WorkerRuntimeHeartbeatSnapshot CaptureHeartbeat()
{
lock (gate)
{
return snapshot;
}
}
/// <summary>Drains queued events up to the specified limit.</summary>
/// <param name="maxEvents">Maximum events to drain; 0 drains all.</param>
/// <returns>The drained events.</returns>
public IReadOnlyList<WorkerEvent> DrainEvents(uint maxEvents)
{
lock (gate)
{
int drainCount = maxEvents == 0
? events.Count
: Math.Min(events.Count, checked((int)Math.Min(maxEvents, int.MaxValue)));
List<WorkerEvent> drained = new(drainCount);
for (int index = 0; index < drainCount; index++)
{
drained.Add(events.Dequeue());
}
return drained;
}
}
/// <summary>Drains a pending fault if any.</summary>
/// <returns>Pending fault or null.</returns>
public WorkerFault? DrainFault()
{
return null;
}
/// <summary>Cancels command by correlation ID.</summary>
/// <param name="correlationId">The command correlation ID.</param>
/// <returns>True if cancelled; false otherwise.</returns>
public bool CancelCommand(string correlationId)
{
return false;
}
/// <summary>Requests graceful shutdown.</summary>
public void RequestShutdown()
{
releaseDispatch.Set();
}
/// <summary>Shuts down gracefully within the specified timeout.</summary>
/// <param name="timeout">Shutdown timeout period.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Shutdown result.</returns>
public Task<MxAccessShutdownResult> ShutdownGracefullyAsync(
TimeSpan timeout,
CancellationToken cancellationToken = default)
{
releaseDispatch.Set();
if (ThrowTimeoutOnShutdown)
{
return Task.FromException<MxAccessShutdownResult>(
new TimeoutException("Simulated graceful shutdown timeout."));
}
return Task.FromResult(new MxAccessShutdownResult(Array.Empty<MxAccessShutdownFailure>()));
}
/// <summary>Releases a blocked dispatch.</summary>
public void ReleaseDispatch()
{
releaseDispatch.Set();
}
/// <summary>Sets the current heartbeat snapshot.</summary>
/// <param name="value">The snapshot to set.</param>
public void SetSnapshot(WorkerRuntimeHeartbeatSnapshot value)
{
lock (gate)
{
snapshot = value;
}
}
/// <summary>Enqueues a worker event to be drained.</summary>
/// <param name="workerEvent">The event to enqueue.</param>
public void EnqueueEvent(WorkerEvent workerEvent)
{
lock (gate)
{
events.Enqueue(workerEvent);
}
}
/// <summary>Disposes resources.</summary>
public void Dispose()
{
Disposed = true;
releaseDispatch.Set();
releaseDispatch.Dispose();
DispatchStarted.Dispose();
}
}