rename: prefix gateway projects/namespaces with ZB.MOM.WW + sln→slnx

Apply the ZB.MOM.WW. prefix to all gateway-side projects, folders,
.csproj/.sln contents, C# namespaces, using directives, generated proto
C# (csharp_namespace + checked-in generated files), InternalsVisibleTo
attributes, project-name string literals (LoadProject, .sln lookups,
worker exe paths, staticwebassets manifest), and the install/script/doc
references that point at any of the above. Migrate the solution from
.sln to .slnx via `dotnet sln migrate` and delete the old file.

External-runtime identifiers are intentionally NOT prefixed so external
configuration keeps working:
- GatewayMetrics.cs MeterName ("MxGateway.Server")
- DashboardAuthenticationDefaults Scheme/Policy ("MxGateway.Dashboard")
- GatewayRequestLoggingMiddleware logger category ("MxGateway.Request")
- StaRuntime thread name ("MxGateway.Worker.STA")
- appsettings.json root section "MxGateway" + env-var prefix
  MxGateway__... and secret-name MxGateway:ApiKeyPepper
- C:\ProgramData\MxGateway\ data dir paths

Also fixes two pre-existing test problems, unrelated to the rename, that
surfaced while validating it:

- WorkerLiveMxAccessSmokeTests.ShutDownAsync: cancellation that the
  gateway service correctly maps to RpcException(Cancelled) per gRPC
  convention was being misclassified as a stream fault. Added a sibling
  catch on RpcException with StatusCode.Cancelled.

- IntegrationTestEnvironment.ResolveRepositoryRoot: extracted IsRepositoryRoot
  and made it accept either a .git marker OR a .sln/.slnx next to src/
  so the worker-exe walker works in non-git working copies.

The protoRoot in clients/proto/proto-inputs.json now points at
src/ZB.MOM.WW.MxGateway.Contracts/Protos.

Verified by `dotnet build` and a full `dotnet test` of the .slnx with
MXGATEWAY_RUN_LIVE_{MXACCESS,LDAP,GALAXY}_TESTS=1:
  Tests: 472/472 pass
  Worker.Tests: 280/280 pass (4 dev-rig [Fact(Skip=...)] skipped)
  IntegrationTests: 18/18 pass

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-23 16:22:23 -04:00
parent 867bf18116
commit dc9c0c950c
491 changed files with 32854 additions and 8414 deletions
@@ -0,0 +1,226 @@
using ZB.MOM.WW.MxGateway.Contracts;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Server.Workers;
using ZB.MOM.WW.MxGateway.Tests.Gateway.Workers.Fakes;
using ZB.MOM.WW.MxGateway.Tests.TestSupport;
namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Workers;
public sealed class FakeWorkerHarnessTests
{
private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(5);
/// <summary>
/// Drives the whole startup handshake through the fake worker and checks that the client
/// ends up ready and reports the fake worker's default process id.
/// </summary>
[Fact]
public async Task CompleteStartupAsync_WithHelloAndReady_TransitionsClientToReady()
{
    // Arrange
    await using FakeWorkerHarness harness = await FakeWorkerHarness.CreateConnectedPairAsync();
    await using WorkerClient workerClient = harness.CreateClient();

    // Act: start the client, then let the fake worker answer the gateway hello.
    Task startup = workerClient.StartAsync(CancellationToken.None);
    WorkerEnvelope helloFromGateway = await harness.CompleteStartupAsync();
    await startup.WaitAsync(TestTimeout);

    // Assert
    Assert.Equal(WorkerEnvelope.BodyOneofCase.GatewayHello, helloFromGateway.BodyCase);
    Assert.Equal(FakeWorkerHarness.DefaultNonce, helloFromGateway.GatewayHello.Nonce);
    Assert.Equal(WorkerClientState.Ready, workerClient.State);
    Assert.Equal(FakeWorkerHarness.DefaultWorkerProcessId, workerClient.ProcessId);
}
/// <summary>
/// Answers the gateway hello with a worker hello whose protocol version is one above
/// <c>GatewayContractInfo.WorkerProtocolVersion</c> and checks that startup fails with a
/// protocol violation.
/// </summary>
[Fact]
public async Task StartAsync_WithProtocolMismatch_FailsStartup()
{
    // Arrange
    await using FakeWorkerHarness harness = await FakeWorkerHarness.CreateConnectedPairAsync();
    await using WorkerClient workerClient = harness.CreateClient();

    // Act: consume the gateway hello, then reply with a mismatching version.
    Task startup = workerClient.StartAsync(CancellationToken.None);
    WorkerEnvelope helloFromGateway = await harness.ReadGatewayEnvelopeAsync();
    Assert.Equal(WorkerEnvelope.BodyOneofCase.GatewayHello, helloFromGateway.BodyCase);
    await harness.SendWorkerHelloAsync(
        workerProtocolVersion: GatewayContractInfo.WorkerProtocolVersion + 1);

    // Assert
    WorkerClientException startupFailure = await Assert.ThrowsAsync<WorkerClientException>(
        async () => await startup.WaitAsync(TestTimeout));
    Assert.Equal(WorkerClientErrorCode.ProtocolViolation, startupFailure.ErrorCode);
}
/// <summary>
/// Sends a ping command, has the fake worker reply to it, and checks that the pending
/// invocation completes with the matching correlation id, kind, and an OK status.
/// </summary>
[Fact]
public async Task InvokeAsync_WithScriptedReply_CompletesCommand()
{
    // Arrange
    await using FakeWorkerHarness harness = await FakeWorkerHarness.CreateConnectedPairAsync();
    await using WorkerClient workerClient = harness.CreateClient();
    await StartClientAsync(harness, workerClient);
    WorkerCommand ping = CreateCommand(MxCommandKind.Ping);

    // Act
    Task<WorkerCommandReply> pendingInvoke = workerClient.InvokeAsync(ping, TestTimeout, CancellationToken.None);
    WorkerEnvelope receivedCommand = await harness.ReadCommandAsync();
    await harness.ReplyToCommandAsync(receivedCommand);
    WorkerCommandReply commandReply = await pendingInvoke.WaitAsync(TestTimeout);

    // Assert
    Assert.Equal(receivedCommand.CorrelationId, commandReply.Reply.CorrelationId);
    Assert.Equal(MxCommandKind.Ping, commandReply.Reply.Kind);
    Assert.Equal(ProtocolStatusCode.Ok, commandReply.Reply.ProtocolStatus.Code);
}
/// <summary>
/// Emits two events from the fake worker and checks that the client's event stream yields
/// them in emission order with consecutive worker sequence numbers.
/// </summary>
[Fact]
public async Task ReadEventsAsync_WithScriptedEvents_YieldsOrderedEvents()
{
    // Arrange
    await using FakeWorkerHarness harness = await FakeWorkerHarness.CreateConnectedPairAsync();
    await using WorkerClient workerClient = harness.CreateClient();
    await StartClientAsync(harness, workerClient);
    using CancellationTokenSource timeoutSource = new(TestTimeout);
    CancellationToken token = timeoutSource.Token;
    await using IAsyncEnumerator<WorkerEvent> eventStream =
        workerClient.ReadEventsAsync(token).GetAsyncEnumerator(token);

    // Act
    await harness.EmitEventAsync(MxEventFamily.OnDataChange, token);
    await harness.EmitEventAsync(MxEventFamily.OperationComplete, token);

    // Assert: the handshake's hello and ready envelopes already used sequences 1 and 2,
    // so the two events are expected to carry 3 and 4.
    Assert.True(await eventStream.MoveNextAsync());
    Assert.Equal(3UL, eventStream.Current.Event.WorkerSequence);
    Assert.Equal(MxEventFamily.OnDataChange, eventStream.Current.Event.Family);

    Assert.True(await eventStream.MoveNextAsync());
    Assert.Equal(4UL, eventStream.Current.Event.WorkerSequence);
    Assert.Equal(MxEventFamily.OperationComplete, eventStream.Current.Event.Family);
}
/// <summary>
/// Emits a worker fault envelope after startup and checks that the client moves to the
/// faulted state within the test timeout.
/// </summary>
[Fact]
public async Task ReadLoop_WithScriptedFault_FaultsClient()
{
    // Arrange
    await using FakeWorkerHarness harness = await FakeWorkerHarness.CreateConnectedPairAsync();
    await using WorkerClient workerClient = harness.CreateClient();
    await StartClientAsync(harness, workerClient);

    // Act
    await harness.EmitFaultAsync(WorkerFaultCategory.MxaccessCommandFailed, "scripted MXAccess command fault");
    await WaitUntilAsync(() => workerClient.State == WorkerClientState.Faulted, TestTimeout);

    // Assert
    Assert.Equal(WorkerClientState.Faulted, workerClient.State);
}
/// <summary>
/// Checks that a heartbeat from the worker updates the client's process id and
/// last-heartbeat timestamp while leaving it ready. The client is given a
/// <see cref="ManualTimeProvider"/> so the expected timestamp is exact and the test does
/// not depend on a wall-clock <c>Task.Delay</c> outlasting the clock resolution.
/// </summary>
[Fact]
public async Task SendHeartbeatAsync_UpdatesClientHeartbeatState()
{
    // Arrange
    ManualTimeProvider manualClock = new(DateTimeOffset.Parse("2026-05-18T12:00:00Z", System.Globalization.CultureInfo.InvariantCulture));
    await using FakeWorkerHarness harness = await FakeWorkerHarness.CreateConnectedPairAsync();
    await using WorkerClient workerClient = harness.CreateClient(timeProvider: manualClock);
    await StartClientAsync(harness, workerClient);
    DateTimeOffset heartbeatBefore = workerClient.LastHeartbeatAt;

    // Act: move the clock forward one second, then deliver the heartbeat.
    manualClock.Advance(TimeSpan.FromSeconds(1));
    await harness.SendHeartbeatAsync(
        configureHeartbeat: beat => beat.WorkerProcessId = 2468);
    await WaitUntilAsync(
        () => workerClient.ProcessId == 2468 && workerClient.LastHeartbeatAt > heartbeatBefore,
        TestTimeout);

    // Assert
    Assert.Equal(WorkerClientState.Ready, workerClient.State);
    Assert.Equal(heartbeatBefore + TimeSpan.FromSeconds(1), workerClient.LastHeartbeatAt);
}
/// <summary>
/// Invokes a ping with a 50 ms command timeout, never replies to it, and checks that the
/// invocation fails with a command-timeout error.
/// </summary>
[Fact]
public async Task InvokeAsync_WithHungWorker_TimesOutPendingCommand()
{
    // Arrange
    await using FakeWorkerHarness harness = await FakeWorkerHarness.CreateConnectedPairAsync();
    await using WorkerClient workerClient = harness.CreateClient();
    await StartClientAsync(harness, workerClient);
    TimeSpan commandTimeout = TimeSpan.FromMilliseconds(50);

    // Act: the worker reads the command but deliberately sends no reply.
    Task<WorkerCommandReply> pendingInvoke = workerClient.InvokeAsync(
        CreateCommand(MxCommandKind.Ping),
        commandTimeout,
        CancellationToken.None);
    WorkerEnvelope receivedCommand = await harness.ReadCommandAsync();
    WorkerClientException timeoutFailure = await Assert.ThrowsAsync<WorkerClientException>(
        async () => await pendingInvoke.WaitAsync(TestTimeout));

    // Assert
    Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerCommand, receivedCommand.BodyCase);
    Assert.Equal(WorkerClientErrorCode.CommandTimeout, timeoutFailure.ErrorCode);
}
/// <summary>
/// Writes a length-prefixed frame with a hand-crafted payload straight onto the pipe and
/// checks that the client moves to the faulted state within the test timeout.
/// </summary>
[Fact]
public async Task ReadLoop_WithMalformedFrame_FaultsClient()
{
    // Arrange
    await using FakeWorkerHarness harness = await FakeWorkerHarness.CreateConnectedPairAsync();
    await using WorkerClient workerClient = harness.CreateClient();
    await StartClientAsync(harness, workerClient);
    byte[] malformedPayload = { 0x08, 0x96, 0x01 };

    // Act
    await harness.WriteMalformedPayloadAsync(malformedPayload);
    await WaitUntilAsync(() => workerClient.State == WorkerClientState.Faulted, TestTimeout);

    // Assert
    Assert.Equal(WorkerClientState.Faulted, workerClient.State);
}
/// <summary>
/// Requests a shutdown, has the fake worker acknowledge it, and checks that the client
/// ends in the closed state.
/// </summary>
[Fact]
public async Task ShutdownAsync_WithShutdownAck_ClosesClient()
{
    // Arrange
    await using FakeWorkerHarness harness = await FakeWorkerHarness.CreateConnectedPairAsync();
    await using WorkerClient workerClient = harness.CreateClient();
    await StartClientAsync(harness, workerClient);

    // Act
    Task pendingShutdown = workerClient.ShutdownAsync(TestTimeout, CancellationToken.None);
    WorkerEnvelope shutdownRequest = await harness.ReadShutdownAsync();
    await harness.SendShutdownAckAsync();
    await pendingShutdown.WaitAsync(TestTimeout);

    // Assert
    Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerShutdown, shutdownRequest.BodyCase);
    Assert.Equal(WorkerClientState.Closed, workerClient.State);
}
/// <summary>
/// Starts <paramref name="client"/> and completes the startup handshake from the fake
/// worker side, waiting at most <c>TestTimeout</c> for the start to finish.
/// </summary>
/// <param name="fakeWorker">Harness that plays the worker end of the handshake.</param>
/// <param name="client">Client to start.</param>
private static async Task StartClientAsync(
    FakeWorkerHarness fakeWorker,
    WorkerClient client)
{
    // Start first without awaiting: the handshake can only complete once the fake
    // worker has answered.
    Task pendingStart = client.StartAsync(CancellationToken.None);

    await fakeWorker.CompleteStartupAsync().ConfigureAwait(false);
    await pendingStart.WaitAsync(TestTimeout).ConfigureAwait(false);
}
/// <summary>Wraps a bare <see cref="MxCommand"/> of the given kind in a <see cref="WorkerCommand"/>.</summary>
/// <param name="kind">Command kind to place on the inner command.</param>
/// <returns>A new worker command carrying only the kind.</returns>
private static WorkerCommand CreateCommand(MxCommandKind kind)
{
    MxCommand innerCommand = new MxCommand { Kind = kind };
    return new WorkerCommand { Command = innerCommand };
}
/// <summary>
/// Polls <paramref name="predicate"/> every 10 ms until it returns <see langword="true"/>.
/// </summary>
/// <param name="predicate">Condition to poll; evaluated once before any delay.</param>
/// <param name="timeout">Time after which the polling delay is cancelled.</param>
/// <remarks>
/// The delay is tied to a token that is cancelled after <paramref name="timeout"/>, so a
/// condition that never becomes true surfaces as a cancellation exception from
/// <c>Task.Delay</c> instead of hanging the test.
/// </remarks>
private static async Task WaitUntilAsync(
    Func<bool> predicate,
    TimeSpan timeout)
{
    using CancellationTokenSource deadline = new(timeout);
    while (true)
    {
        if (predicate())
        {
            return;
        }

        await Task.Delay(TimeSpan.FromMilliseconds(10), deadline.Token);
    }
}
}
@@ -0,0 +1,483 @@
using System.Buffers.Binary;
using System.IO.Pipes;
using Google.Protobuf.WellKnownTypes;
using ZB.MOM.WW.MxGateway.Contracts;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Server.Metrics;
using ZB.MOM.WW.MxGateway.Server.Workers;
namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Workers.Fakes;
public sealed class FakeWorkerHarness : IAsyncDisposable
{
public const string DefaultSessionId = "session-fake-worker";
public const string DefaultNonce = "nonce-fake-worker";
public const int DefaultWorkerProcessId = 9321;
private readonly NamedPipeServerStream? _gatewayStream;
private readonly NamedPipeClientStream _workerStream;
private readonly WorkerFrameProtocolOptions _frameOptions;
private readonly WorkerFrameReader _reader;
private readonly WorkerFrameWriter _writer;
private bool _workerSideDisposed;
/// <summary>
/// Stores the session identity and pipe endpoints, and builds the frame reader and writer
/// over the worker-side stream.
/// </summary>
/// <param name="sessionId">Session id exposed through <see cref="SessionId"/>.</param>
/// <param name="nonce">Nonce exposed through <see cref="Nonce"/>.</param>
/// <param name="gatewayStream">Gateway end of the pipe, or <see langword="null"/> when the harness does not own it.</param>
/// <param name="workerStream">Worker end of the pipe that the reader and writer operate on.</param>
/// <param name="frameOptions">Frame protocol options shared by the reader and writer.</param>
private FakeWorkerHarness(
    string sessionId,
    string nonce,
    NamedPipeServerStream? gatewayStream,
    NamedPipeClientStream workerStream,
    WorkerFrameProtocolOptions frameOptions)
{
    _gatewayStream = gatewayStream;
    _workerStream = workerStream;
    _frameOptions = frameOptions;

    _reader = new WorkerFrameReader(workerStream, frameOptions);
    _writer = new WorkerFrameWriter(workerStream, frameOptions);

    SessionId = sessionId;
    Nonce = nonce;
}
/// <summary>Gets the session ID this harness stamps on the envelopes, replies, and events it builds.</summary>
public string SessionId { get; }
/// <summary>Gets the nonce this harness sends in its worker hello unless the caller supplies an override.</summary>
public string Nonce { get; }
/// <summary>
/// Gets the sequence number of the most recently created envelope; it is <c>0</c> until the
/// first envelope is built. Despite the name, this is the last value issued rather than the
/// next one: <c>AdvanceSequence</c> pre-increments it and returns the result.
/// </summary>
public ulong NextWorkerSequence { get; private set; }
/// <summary>
/// Creates a uniquely named pipe, connects its gateway and worker ends in-process, and
/// returns a harness that owns both ends.
/// </summary>
/// <param name="sessionId">Identifier for the fake session.</param>
/// <param name="nonce">Nonce for session validation.</param>
/// <param name="protocolVersion">Protocol version for frame communication.</param>
/// <param name="maxMessageBytes">Maximum message size in bytes.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
/// <returns>A harness whose gateway and worker streams are connected to each other.</returns>
/// <remarks>
/// If connecting is cancelled or fails, or building the harness throws, both pipe streams
/// are disposed before the exception propagates so a failed setup does not leak handles.
/// </remarks>
public static async Task<FakeWorkerHarness> CreateConnectedPairAsync(
    string sessionId = DefaultSessionId,
    string nonce = DefaultNonce,
    uint protocolVersion = GatewayContractInfo.WorkerProtocolVersion,
    int maxMessageBytes = WorkerFrameProtocolOptions.DefaultMaxMessageBytes,
    CancellationToken cancellationToken = default)
{
    // A fresh GUID per pair keeps concurrently running tests from colliding on a pipe name.
    string pipeName = $"mxaccessgw-fake-worker-{Guid.NewGuid():N}";
    NamedPipeServerStream gatewayStream = new(
        pipeName,
        PipeDirection.InOut,
        maxNumberOfServerInstances: 1,
        PipeTransmissionMode.Byte,
        PipeOptions.Asynchronous);
    NamedPipeClientStream? workerStream = null;
    try
    {
        workerStream = CreateWorkerStream(pipeName);

        // Begin listening before connecting so the two ends can meet within this method.
        Task waitForConnectionTask = gatewayStream.WaitForConnectionAsync(cancellationToken);
        await workerStream.ConnectAsync(cancellationToken).ConfigureAwait(false);
        await waitForConnectionTask.ConfigureAwait(false);

        return new FakeWorkerHarness(
            sessionId,
            nonce,
            gatewayStream,
            workerStream,
            new WorkerFrameProtocolOptions(sessionId, protocolVersion, maxMessageBytes));
    }
    catch
    {
        // Ownership was never handed to a harness, so release both ends here.
        if (workerStream is not null)
        {
            await workerStream.DisposeAsync().ConfigureAwait(false);
        }

        await gatewayStream.DisposeAsync().ConfigureAwait(false);
        throw;
    }
}
/// <summary>
/// Connects to an existing named pipe as the worker end and returns a harness that owns
/// only that worker-side stream.
/// </summary>
/// <param name="sessionId">Identifier for the fake session.</param>
/// <param name="nonce">Nonce for session validation.</param>
/// <param name="pipeName">Name of the named pipe to connect to.</param>
/// <param name="protocolVersion">Protocol version for frame communication.</param>
/// <param name="maxMessageBytes">Maximum message size in bytes.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
/// <returns>A harness with no gateway stream; <see cref="CreateClient"/> throws on it.</returns>
/// <remarks>
/// If connecting is cancelled or fails, or building the harness throws, the client stream
/// is disposed before the exception propagates so a failed setup does not leak a handle.
/// </remarks>
public static async Task<FakeWorkerHarness> ConnectToGatewayPipeAsync(
    string sessionId,
    string nonce,
    string pipeName,
    uint protocolVersion = GatewayContractInfo.WorkerProtocolVersion,
    int maxMessageBytes = WorkerFrameProtocolOptions.DefaultMaxMessageBytes,
    CancellationToken cancellationToken = default)
{
    NamedPipeClientStream workerStream = CreateWorkerStream(pipeName);
    try
    {
        await workerStream.ConnectAsync(cancellationToken).ConfigureAwait(false);
        return new FakeWorkerHarness(
            sessionId,
            nonce,
            gatewayStream: null,
            workerStream,
            new WorkerFrameProtocolOptions(sessionId, protocolVersion, maxMessageBytes));
    }
    catch
    {
        // Ownership was never handed to a harness, so release the stream here.
        await workerStream.DisposeAsync().ConfigureAwait(false);
        throw;
    }
}
/// <summary>Builds a <see cref="WorkerClient"/> over the gateway end of this harness's pipe.</summary>
/// <param name="options">Worker client options passed through to the client; may be <see langword="null"/>.</param>
/// <param name="metrics">Gateway metrics passed through to the client; may be <see langword="null"/>.</param>
/// <param name="timeProvider">Time provider passed through to the client; may be <see langword="null"/>.</param>
/// <returns>A worker client bound to this harness's session id, nonce, and frame options.</returns>
/// <exception cref="InvalidOperationException">The harness holds no gateway stream.</exception>
public WorkerClient CreateClient(
    WorkerClientOptions? options = null,
    GatewayMetrics? metrics = null,
    TimeProvider? timeProvider = null)
{
    // Only harnesses that own the gateway end of the pipe can hand it to a client.
    if (_gatewayStream is not { } gatewayStream)
    {
        throw new InvalidOperationException("This fake worker is connected to a gateway-owned pipe.");
    }

    return new WorkerClient(
        new WorkerClientConnection(SessionId, Nonce, gatewayStream, _frameOptions),
        options,
        metrics,
        timeProvider);
}
/// <summary>
/// Plays the worker side of the startup handshake: reads the gateway hello, then sends a
/// worker hello followed by a worker ready.
/// </summary>
/// <param name="workerProcessId">Process ID reported in both the hello and the ready message.</param>
/// <param name="workerVersion">Version string reported in the worker hello.</param>
/// <param name="mxaccessProgid">MXAccess COM ProgID reported in the worker ready.</param>
/// <param name="mxaccessClsid">MXAccess COM CLSID reported in the worker ready.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
/// <returns>The gateway hello envelope received during startup.</returns>
/// <exception cref="InvalidOperationException">The first envelope read is not a gateway hello.</exception>
public async Task<WorkerEnvelope> CompleteStartupAsync(
    int workerProcessId = DefaultWorkerProcessId,
    string workerVersion = "fake-worker",
    string mxaccessProgid = "LMXProxy.LMXProxyServer.1",
    string mxaccessClsid = "{C30B52F5-2CB5-4760-AF0A-3A344A7EB5DC}",
    CancellationToken cancellationToken = default)
{
    WorkerEnvelope helloEnvelope = await ReadGatewayEnvelopeAsync(cancellationToken).ConfigureAwait(false);
    if (helloEnvelope.BodyCase is not WorkerEnvelope.BodyOneofCase.GatewayHello)
    {
        throw new InvalidOperationException($"Expected GatewayHello but received {helloEnvelope.BodyCase}.");
    }

    // Protocol version and nonce are left at the harness defaults.
    await SendWorkerHelloAsync(workerProcessId, workerVersion, cancellationToken: cancellationToken)
        .ConfigureAwait(false);
    await SendWorkerReadyAsync(workerProcessId, mxaccessProgid, mxaccessClsid, cancellationToken)
        .ConfigureAwait(false);

    return helloEnvelope;
}
/// <summary>Reads the next envelope the gateway wrote to the pipe, whatever its body.</summary>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
/// <returns>The envelope produced by the harness's frame reader.</returns>
public async Task<WorkerEnvelope> ReadGatewayEnvelopeAsync(CancellationToken cancellationToken = default)
{
    WorkerEnvelope nextEnvelope = await _reader.ReadAsync(cancellationToken).ConfigureAwait(false);
    return nextEnvelope;
}
/// <summary>Reads the next gateway envelope and requires it to carry a worker command.</summary>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
/// <returns>The command envelope read from the stream.</returns>
/// <exception cref="InvalidOperationException">The envelope read has a different body case.</exception>
public async Task<WorkerEnvelope> ReadCommandAsync(CancellationToken cancellationToken = default)
{
    WorkerEnvelope received = await ReadGatewayEnvelopeAsync(cancellationToken).ConfigureAwait(false);

    return received.BodyCase == WorkerEnvelope.BodyOneofCase.WorkerCommand
        ? received
        : throw new InvalidOperationException($"Expected WorkerCommand but received {received.BodyCase}.");
}
/// <summary>Reads the next gateway envelope and requires it to carry a shutdown request.</summary>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
/// <returns>The shutdown envelope read from the stream.</returns>
/// <exception cref="InvalidOperationException">The envelope read has a different body case.</exception>
public async Task<WorkerEnvelope> ReadShutdownAsync(CancellationToken cancellationToken = default)
{
    WorkerEnvelope received = await ReadGatewayEnvelopeAsync(cancellationToken).ConfigureAwait(false);

    return received.BodyCase == WorkerEnvelope.BodyOneofCase.WorkerShutdown
        ? received
        : throw new InvalidOperationException($"Expected WorkerShutdown but received {received.BodyCase}.");
}
/// <summary>Writes a worker hello envelope to the gateway.</summary>
/// <param name="workerProcessId">Process ID reported in the hello.</param>
/// <param name="workerVersion">Version string reported in the hello.</param>
/// <param name="workerProtocolVersion">Protocol version to report; when <see langword="null"/>, the harness's frame-options version is used.</param>
/// <param name="nonce">Nonce to report; when <see langword="null"/>, the harness's <see cref="Nonce"/> is used.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
public async Task SendWorkerHelloAsync(
    int workerProcessId = DefaultWorkerProcessId,
    string workerVersion = "fake-worker",
    uint? workerProtocolVersion = null,
    string? nonce = null,
    CancellationToken cancellationToken = default)
{
    WorkerEnvelope helloEnvelope = CreateEnvelope(
        correlationId: string.Empty,
        envelope =>
        {
            envelope.WorkerHello = new WorkerHello
            {
                ProtocolVersion = workerProtocolVersion ?? _frameOptions.ProtocolVersion,
                Nonce = nonce ?? Nonce,
                WorkerProcessId = workerProcessId,
                WorkerVersion = workerVersion,
            };
        });

    await _writer.WriteAsync(helloEnvelope, cancellationToken).ConfigureAwait(false);
}
/// <summary>Writes a worker ready envelope, timestamped with the current UTC time, to the gateway.</summary>
/// <param name="workerProcessId">Process ID reported in the ready message.</param>
/// <param name="mxaccessProgid">MXAccess COM ProgID reported in the ready message.</param>
/// <param name="mxaccessClsid">MXAccess COM CLSID reported in the ready message.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
public async Task SendWorkerReadyAsync(
    int workerProcessId = DefaultWorkerProcessId,
    string mxaccessProgid = "LMXProxy.LMXProxyServer.1",
    string mxaccessClsid = "{C30B52F5-2CB5-4760-AF0A-3A344A7EB5DC}",
    CancellationToken cancellationToken = default)
{
    WorkerEnvelope readyEnvelope = CreateEnvelope(
        correlationId: string.Empty,
        envelope =>
        {
            envelope.WorkerReady = new WorkerReady
            {
                WorkerProcessId = workerProcessId,
                MxaccessProgid = mxaccessProgid,
                MxaccessClsid = mxaccessClsid,
                ReadyTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
            };
        });

    await _writer.WriteAsync(readyEnvelope, cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Writes a command reply that echoes the correlation id and command kind of
/// <paramref name="commandEnvelope"/> back to the gateway.
/// </summary>
/// <param name="commandEnvelope">The command envelope to reply to.</param>
/// <param name="statusCode">Protocol status code for the reply.</param>
/// <param name="statusMessage">Human-readable status message.</param>
/// <param name="configureReply">Optional callback to customize the reply before it is sent.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
/// <exception cref="ArgumentNullException"><paramref name="commandEnvelope"/> is <see langword="null"/>.</exception>
/// <exception cref="ArgumentException"><paramref name="commandEnvelope"/> does not carry a worker command.</exception>
public async Task ReplyToCommandAsync(
    WorkerEnvelope commandEnvelope,
    ProtocolStatusCode statusCode = ProtocolStatusCode.Ok,
    string statusMessage = "OK",
    Action<MxCommandReply>? configureReply = null,
    CancellationToken cancellationToken = default)
{
    // Fail with a descriptive argument error rather than a NullReferenceException below.
    ArgumentNullException.ThrowIfNull(commandEnvelope);
    if (commandEnvelope.BodyCase != WorkerEnvelope.BodyOneofCase.WorkerCommand)
    {
        throw new ArgumentException("Command envelope must contain WorkerCommand.", nameof(commandEnvelope));
    }

    // A worker command without an inner command is answered with the Unspecified kind.
    MxCommandKind kind = commandEnvelope.WorkerCommand.Command?.Kind ?? MxCommandKind.Unspecified;
    MxCommandReply reply = new()
    {
        SessionId = SessionId,
        CorrelationId = commandEnvelope.CorrelationId,
        Kind = kind,
        ProtocolStatus = new ProtocolStatus
        {
            Code = statusCode,
            Message = statusMessage,
        },
    };

    // Let the caller adjust the reply after the defaults are in place.
    configureReply?.Invoke(reply);

    await _writer.WriteAsync(
        CreateEnvelope(
            commandEnvelope.CorrelationId,
            envelope => envelope.WorkerCommandReply = new WorkerCommandReply
            {
                Reply = reply,
                CompletedTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
            }),
        cancellationToken).ConfigureAwait(false);
}
/// <summary>Writes a worker event envelope of the given family to the gateway.</summary>
/// <param name="family">Family of the event to emit.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
/// <param name="configureEvent">Optional callback to customize the event before it is sent.</param>
public async Task EmitEventAsync(
    MxEventFamily family,
    CancellationToken cancellationToken = default,
    Action<MxEvent>? configureEvent = null)
{
    // Peek at the number CreateEnvelope will assign below, so that by default the event
    // and the envelope carrying it share the same sequence.
    ulong upcomingSequence = NextWorkerSequence + 1;

    MxEvent eventBody = new()
    {
        SessionId = SessionId,
        Family = family,
        WorkerSequence = upcomingSequence,
        WorkerTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
    };
    configureEvent?.Invoke(eventBody);

    WorkerEnvelope eventEnvelope = CreateEnvelope(
        correlationId: string.Empty,
        envelope =>
        {
            envelope.WorkerEvent = new WorkerEvent { Event = eventBody };
        });

    await _writer.WriteAsync(eventEnvelope, cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Writes a worker fault envelope to the gateway. The fault's protocol status is always
/// <c>WorkerUnavailable</c> and repeats the diagnostic message.
/// </summary>
/// <param name="category">Category of the fault.</param>
/// <param name="diagnosticMessage">Diagnostic message describing the fault.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
public async Task EmitFaultAsync(
    WorkerFaultCategory category,
    string diagnosticMessage,
    CancellationToken cancellationToken = default)
{
    WorkerEnvelope faultEnvelope = CreateEnvelope(
        correlationId: string.Empty,
        envelope =>
        {
            envelope.WorkerFault = new WorkerFault
            {
                Category = category,
                DiagnosticMessage = diagnosticMessage,
                ProtocolStatus = new ProtocolStatus
                {
                    Code = ProtocolStatusCode.WorkerUnavailable,
                    Message = diagnosticMessage,
                },
            };
        });

    await _writer.WriteAsync(faultEnvelope, cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Writes a worker heartbeat envelope to the gateway. The heartbeat starts with the default
/// worker process id and the current UTC time as its last STA activity timestamp.
/// </summary>
/// <param name="state">Worker state reported in the heartbeat.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
/// <param name="configureHeartbeat">Optional callback to customize the heartbeat before it is sent.</param>
public async Task SendHeartbeatAsync(
    WorkerState state = WorkerState.Ready,
    CancellationToken cancellationToken = default,
    Action<WorkerHeartbeat>? configureHeartbeat = null)
{
    WorkerHeartbeat beat = new()
    {
        WorkerProcessId = DefaultWorkerProcessId,
        State = state,
        LastStaActivityTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
    };
    configureHeartbeat?.Invoke(beat);

    WorkerEnvelope heartbeatEnvelope = CreateEnvelope(
        correlationId: string.Empty,
        envelope =>
        {
            envelope.WorkerHeartbeat = beat;
        });

    await _writer.WriteAsync(heartbeatEnvelope, cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Writes a shutdown acknowledgment envelope to the gateway; the status message is the
/// status code's name.
/// </summary>
/// <param name="statusCode">Protocol status code for the acknowledgment.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
public async Task SendShutdownAckAsync(
    ProtocolStatusCode statusCode = ProtocolStatusCode.Ok,
    CancellationToken cancellationToken = default)
{
    WorkerEnvelope ackEnvelope = CreateEnvelope(
        correlationId: string.Empty,
        envelope =>
        {
            envelope.WorkerShutdownAck = new WorkerShutdownAck
            {
                Status = new ProtocolStatus
                {
                    Code = statusCode,
                    Message = statusCode.ToString(),
                },
            };
        });

    await _writer.WriteAsync(ackEnvelope, cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Bypasses the frame writer and writes a raw frame to the worker stream: a 4-byte
/// little-endian length prefix followed by the caller's bytes unchanged.
/// </summary>
/// <param name="payload">Payload bytes to write after the length prefix.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
/// <exception cref="ArgumentException"><paramref name="payload"/> is empty.</exception>
public async Task WriteMalformedPayloadAsync(
    ReadOnlyMemory<byte> payload,
    CancellationToken cancellationToken = default)
{
    if (payload.Length == 0)
    {
        throw new ArgumentException("Malformed payload must include at least one byte.", nameof(payload));
    }

    byte[] header = new byte[sizeof(uint)];
    BinaryPrimitives.WriteUInt32LittleEndian(header, (uint)payload.Length);

    await _workerStream.WriteAsync(header, cancellationToken).ConfigureAwait(false);
    await _workerStream.WriteAsync(payload, cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Writes only a 4-byte little-endian length prefix, with no payload after it, announcing a
/// frame larger than the configured maximum message size.
/// </summary>
/// <param name="payloadLength">Length to announce; must exceed the configured maximum.</param>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="payloadLength"/> does not exceed the configured maximum.</exception>
public async Task WriteOversizedFrameHeaderAsync(
    uint payloadLength,
    CancellationToken cancellationToken = default)
{
    bool exceedsMaximum = payloadLength > _frameOptions.MaxMessageBytes;
    if (!exceedsMaximum)
    {
        throw new ArgumentOutOfRangeException(
            nameof(payloadLength),
            payloadLength,
            "Payload length must exceed the configured maximum.");
    }

    byte[] header = new byte[sizeof(uint)];
    BinaryPrimitives.WriteUInt32LittleEndian(header, payloadLength);
    await _workerStream.WriteAsync(header, cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Disposes the worker-side stream only, leaving any gateway stream open. Calls after the
/// first successful one do nothing.
/// </summary>
public async ValueTask DisposeWorkerSideAsync()
{
    if (!_workerSideDisposed)
    {
        await _workerStream.DisposeAsync().ConfigureAwait(false);
        _workerSideDisposed = true;
    }
}
/// <inheritdoc />
public async ValueTask DisposeAsync()
{
    try
    {
        await DisposeWorkerSideAsync().ConfigureAwait(false);
    }
    finally
    {
        // Release the gateway end even if disposing the worker end threw; it is null
        // when the harness does not own a gateway stream.
        if (_gatewayStream is not null)
        {
            await _gatewayStream.DisposeAsync().ConfigureAwait(false);
        }
    }
}
/// <summary>
/// Builds an envelope stamped with the harness's protocol version and session id and the
/// next sequence number, then lets the caller fill in the body.
/// </summary>
/// <param name="correlationId">Correlation id to place on the envelope.</param>
/// <param name="setBody">Callback that assigns the envelope's body.</param>
/// <returns>The populated envelope.</returns>
private WorkerEnvelope CreateEnvelope(
    string correlationId,
    Action<WorkerEnvelope> setBody)
{
    WorkerEnvelope result = new();
    result.ProtocolVersion = _frameOptions.ProtocolVersion;
    result.SessionId = SessionId;
    result.Sequence = AdvanceSequence();
    result.CorrelationId = correlationId;

    setBody(result);
    return result;
}
/// <summary>Increments <see cref="NextWorkerSequence"/> and returns the incremented value.</summary>
private ulong AdvanceSequence()
{
    NextWorkerSequence += 1;
    return NextWorkerSequence;
}
/// <summary>Creates an unconnected, asynchronous, bidirectional client stream for a pipe on the local machine.</summary>
/// <param name="pipeName">Name of the pipe the stream will connect to.</param>
/// <returns>A client stream on which connect has not yet been called.</returns>
private static NamedPipeClientStream CreateWorkerStream(string pipeName) =>
    new NamedPipeClientStream(
        serverName: ".",
        pipeName: pipeName,
        direction: PipeDirection.InOut,
        options: PipeOptions.Asynchronous);
}
@@ -0,0 +1,137 @@
using Microsoft.Extensions.Options;
using ZB.MOM.WW.MxGateway.Server.Configuration;
using ZB.MOM.WW.MxGateway.Server.Metrics;
using ZB.MOM.WW.MxGateway.Server.Workers;
namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Workers;
/// <summary>
/// Server-002 regression: per <c>gateway.md</c> the gateway must terminate
/// orphaned worker processes on startup. These tests pin that the terminator
/// kills leftover workers (matched by executable path, or by image name when
/// the path is unreadable) without touching unrelated processes or itself.
/// </summary>
public sealed class OrphanWorkerTerminatorTests
{
private const string WorkerExecutablePath = @"C:\app\src\ZB.MOM.WW.MxGateway.Worker\bin\x86\Release\ZB.MOM.WW.MxGateway.Worker.exe";
/// <summary>Checks that every listed process whose path equals the configured worker executable is killed.</summary>
[Fact]
public void TerminateOrphans_KillsWorkerProcessesMatchingConfiguredExecutablePath()
{
    // Arrange
    IReadOnlyList<RunningProcessInfo> running =
    [
        new RunningProcessInfo(101, WorkerExecutablePath),
        new RunningProcessInfo(102, WorkerExecutablePath),
    ];
    FakeProcessInspector processInspector = new(running);
    OrphanWorkerTerminator sut = CreateTerminator(processInspector);

    // Act
    int terminatedCount = sut.TerminateOrphans();

    // Assert
    Assert.Equal(2, terminatedCount);
    Assert.Equal([101, 102], processInspector.KilledProcessIds.Order());
}
/// <summary>Checks that a listed process with no readable executable path is still killed.</summary>
[Fact]
public void TerminateOrphans_KillsImageNameMatchWhenExecutablePathUnreadable()
{
    // Arrange
    // The x64 gateway cannot introspect the x86 worker's main module, so the path comes
    // back null. The image-name match is then the only signal available, and it is exactly
    // the orphan worker case, so the process must still be killed.
    IReadOnlyList<RunningProcessInfo> running =
    [
        new RunningProcessInfo(201, ExecutablePath: null),
    ];
    FakeProcessInspector processInspector = new(running);
    OrphanWorkerTerminator sut = CreateTerminator(processInspector);

    // Act
    int terminatedCount = sut.TerminateOrphans();

    // Assert
    Assert.Equal(1, terminatedCount);
    Assert.Equal([201], processInspector.KilledProcessIds);
}
/// <summary>Checks that a process with the worker's image name but a different path is left running.</summary>
[Fact]
public void TerminateOrphans_DoesNotKillUnrelatedProcessSharingImageName()
{
    // Arrange
    // Same image name, different executable path: this is not our worker and must be
    // left alone.
    IReadOnlyList<RunningProcessInfo> running =
    [
        new RunningProcessInfo(301, @"C:\other\place\ZB.MOM.WW.MxGateway.Worker.exe"),
    ];
    FakeProcessInspector processInspector = new(running);
    OrphanWorkerTerminator sut = CreateTerminator(processInspector);

    // Act
    int terminatedCount = sut.TerminateOrphans();

    // Assert
    Assert.Equal(0, terminatedCount);
    Assert.Empty(processInspector.KilledProcessIds);
}
/// <summary>Checks that a matching entry carrying the test process's own id is never killed.</summary>
[Fact]
public void TerminateOrphans_DoesNotKillCurrentProcess()
{
    // Arrange
    IReadOnlyList<RunningProcessInfo> running =
    [
        new RunningProcessInfo(Environment.ProcessId, WorkerExecutablePath),
    ];
    FakeProcessInspector processInspector = new(running);
    OrphanWorkerTerminator sut = CreateTerminator(processInspector);

    // Act
    int terminatedCount = sut.TerminateOrphans();

    // Assert
    Assert.Equal(0, terminatedCount);
    Assert.Empty(processInspector.KilledProcessIds);
}
/// <summary>Checks that a kill failure for one process does not stop the remaining matches from being killed.</summary>
[Fact]
public void TerminateOrphans_ContinuesWhenOneKillThrows()
{
    // Arrange: killing process 401 throws, killing 402 succeeds.
    IReadOnlyList<RunningProcessInfo> running =
    [
        new RunningProcessInfo(401, WorkerExecutablePath),
        new RunningProcessInfo(402, WorkerExecutablePath),
    ];
    FakeProcessInspector processInspector = new(running) { ThrowOnKillProcessId = 401 };
    OrphanWorkerTerminator sut = CreateTerminator(processInspector);

    // Act
    int terminatedCount = sut.TerminateOrphans();

    // Assert
    Assert.Equal(1, terminatedCount);
    Assert.Contains(402, processInspector.KilledProcessIds);
}
/// <summary>
/// Builds a terminator configured with <c>WorkerExecutablePath</c> as the worker executable,
/// the supplied inspector, and a fresh metrics instance.
/// </summary>
/// <param name="inspector">Process inspector the terminator queries and kills through.</param>
/// <returns>The terminator under test.</returns>
private static OrphanWorkerTerminator CreateTerminator(IRunningProcessInspector inspector)
{
    GatewayOptions gatewayOptions = new()
    {
        Worker = new WorkerOptions { ExecutablePath = WorkerExecutablePath },
    };
    IOptions<GatewayOptions> wrappedOptions = Options.Create(gatewayOptions);

    return new OrphanWorkerTerminator(wrappedOptions, inspector, new GatewayMetrics());
}
/// <summary>
/// In-memory <see cref="IRunningProcessInspector"/> that returns a fixed process list and
/// records kill requests instead of touching real processes.
/// </summary>
private sealed class FakeProcessInspector(IReadOnlyList<RunningProcessInfo> processes)
    : IRunningProcessInspector
{
    /// <summary>Gets the ids for which <see cref="Kill"/> completed without throwing, in call order.</summary>
    public List<int> KilledProcessIds { get; } = [];

    /// <summary>Gets the process id for which <see cref="Kill"/> throws; <see langword="null"/> means no kill throws.</summary>
    public int? ThrowOnKillProcessId { get; init; }

    /// <summary>Returns the configured process list regardless of <paramref name="processName"/>.</summary>
    public IReadOnlyList<RunningProcessInfo> GetProcessesByName(string processName)
    {
        return processes;
    }

    /// <summary>Records the kill, or throws if the id is the one configured to fail.</summary>
    public void Kill(int processId)
    {
        if (ThrowOnKillProcessId is int failingId && failingId == processId)
        {
            throw new InvalidOperationException("Process has already exited.");
        }

        KilledProcessIds.Add(processId);
    }
}
}
@@ -0,0 +1,801 @@
using System.IO.Pipes;
using Google.Protobuf.WellKnownTypes;
using ZB.MOM.WW.MxGateway.Contracts;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Server.Metrics;
using ZB.MOM.WW.MxGateway.Server.Workers;
using ZB.MOM.WW.MxGateway.Tests.TestSupport;
namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Workers;
public sealed class WorkerClientTests
{
// Session id shared by the client connection, the frame protocol options, and every scripted envelope.
private const string SessionId = "session-worker-client";
// Nonce handed to the client connection; the handshake helper asserts the gateway hello carries it.
private const string Nonce = "nonce-worker-client";
// Process id reported by the scripted worker hello/ready envelopes and by FakeWorkerProcess.Id.
private const int WorkerProcessId = 4321;
// Upper bound applied to pipe reads, task waits, and polling loops in these tests.
private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(5);
/// <summary>
/// Verifies that completing the hello/ready handshake leaves the client in the ready
/// state and reporting the process id carried by the scripted worker envelopes.
/// </summary>
[Fact]
public async Task StartAsync_WithWorkerHelloAndReady_EntersReadyState()
{
    await using PipePair pipes = await PipePair.CreateAsync();
    await using WorkerClient workerClient = CreateClient(pipes);

    await CompleteHandshakeAsync(workerClient, pipes);

    Assert.Equal(WorkerClientState.Ready, workerClient.State);
    Assert.Equal(WorkerProcessId, workerClient.ProcessId);
}
/// <summary>
/// Verifies that a reply carrying the command's correlation id completes the pending
/// <c>InvokeAsync</c> call with that reply.
/// </summary>
[Fact]
public async Task InvokeAsync_WithMatchingReply_CompletesPendingCommand()
{
    await using PipePair pipes = await PipePair.CreateAsync();
    await using WorkerClient workerClient = CreateClient(pipes);
    await CompleteHandshakeAsync(workerClient, pipes);

    Task<WorkerCommandReply> pendingReply = workerClient.InvokeAsync(
        CreateCommand(MxCommandKind.Ping),
        TestTimeout,
        CancellationToken.None);

    // Worker side: receive the command and check it was tagged with a correlation id.
    WorkerEnvelope sentCommand = await pipes.WorkerReader.ReadAsync().AsTask().WaitAsync(TestTimeout);
    Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerCommand, sentCommand.BodyCase);
    string correlationId = sentCommand.CorrelationId;
    Assert.False(string.IsNullOrWhiteSpace(correlationId));

    // Answer with the same correlation id so the client can match it.
    await pipes.WorkerWriter.WriteAsync(
        CreateCommandReplyEnvelope(correlationId, MxCommandKind.Ping));

    WorkerCommandReply matchedReply = await pendingReply.WaitAsync(TestTimeout);
    Assert.Equal(correlationId, matchedReply.Reply.CorrelationId);
    Assert.Equal(MxCommandKind.Ping, matchedReply.Reply.Kind);
}
/// <summary>
/// Verifies that a reply arriving after its command already timed out is discarded,
/// and that the client stays ready and still completes a subsequent command.
/// </summary>
[Fact]
public async Task InvokeAsync_WithLateReply_IgnoresLateReplyAndKeepsClientReady()
{
    await using PipePair pipes = await PipePair.CreateAsync();
    await using WorkerClient workerClient = CreateClient(pipes);
    await CompleteHandshakeAsync(workerClient, pipes);

    // First command: only 50 ms to complete, and the worker does not answer in time.
    Task<WorkerCommandReply> expiredInvoke = workerClient.InvokeAsync(
        CreateCommand(MxCommandKind.Ping),
        TimeSpan.FromMilliseconds(50),
        CancellationToken.None);
    WorkerEnvelope expiredCommand = await pipes.WorkerReader.ReadAsync().AsTask().WaitAsync(TestTimeout);
    WorkerClientException timeoutFailure = await Assert.ThrowsAsync<WorkerClientException>(
        async () => await expiredInvoke);
    Assert.Equal(WorkerClientErrorCode.CommandTimeout, timeoutFailure.ErrorCode);

    // The stale reply goes onto the pipe first and the second command's reply after
    // it. Because the pipe is FIFO, the read loop has to process (and drop) the stale
    // reply before it sees the second one, so no fixed Task.Delay is required.
    await pipes.WorkerWriter.WriteAsync(
        CreateCommandReplyEnvelope(expiredCommand.CorrelationId, MxCommandKind.Ping));

    Task<WorkerCommandReply> followUpInvoke = workerClient.InvokeAsync(
        CreateCommand(MxCommandKind.GetWorkerInfo),
        TestTimeout,
        CancellationToken.None);
    WorkerEnvelope followUpCommand = await pipes.WorkerReader.ReadAsync().AsTask().WaitAsync(TestTimeout);
    await pipes.WorkerWriter.WriteAsync(
        CreateCommandReplyEnvelope(followUpCommand.CorrelationId, MxCommandKind.GetWorkerInfo));

    WorkerCommandReply followUpReply = await followUpInvoke.WaitAsync(TestTimeout);
    Assert.Equal(WorkerClientState.Ready, workerClient.State);
    Assert.Equal(MxCommandKind.GetWorkerInfo, followUpReply.Reply.Kind);
}
/// <summary>
/// Verifies that events written by the worker are yielded by <c>ReadEventsAsync</c>
/// in the same order they were written to the pipe.
/// </summary>
[Fact]
public async Task ReadEventsAsync_WithWorkerEvents_YieldsEventsInPipeOrder()
{
    await using PipePair pipes = await PipePair.CreateAsync();
    await using WorkerClient workerClient = CreateClient(pipes);
    await CompleteHandshakeAsync(workerClient, pipes);

    // The token bounds both the enumeration and each MoveNextAsync call.
    using CancellationTokenSource readTimeout = new(TestTimeout);
    await using IAsyncEnumerator<WorkerEvent> eventStream =
        workerClient.ReadEventsAsync(readTimeout.Token).GetAsyncEnumerator(readTimeout.Token);

    await pipes.WorkerWriter.WriteAsync(CreateEventEnvelope(11, MxEventFamily.OnDataChange));
    await pipes.WorkerWriter.WriteAsync(CreateEventEnvelope(12, MxEventFamily.OperationComplete));

    Assert.True(await eventStream.MoveNextAsync());
    Assert.Equal(11UL, eventStream.Current.Event.WorkerSequence);
    Assert.Equal(MxEventFamily.OnDataChange, eventStream.Current.Event.Family);

    Assert.True(await eventStream.MoveNextAsync());
    Assert.Equal(12UL, eventStream.Current.Event.WorkerSequence);
    Assert.Equal(MxEventFamily.OperationComplete, eventStream.Current.Event.Family);
}
/// <summary>
/// Verifies that the client faults when more events arrive than the event channel
/// can hold and nobody drains them.
/// </summary>
[Fact]
public async Task ReadLoop_WhenEventQueueOverflows_FaultsClient()
{
    await using PipePair pipes = await PipePair.CreateAsync();

    // One-slot channel with a short full-mode timeout; heartbeat timings are set to
    // 30 seconds, well beyond the 5-second test timeout.
    WorkerClientOptions singleSlotOptions = new()
    {
        EventChannelCapacity = 1,
        EventChannelFullModeTimeout = TimeSpan.FromMilliseconds(50),
        HeartbeatGrace = TimeSpan.FromSeconds(30),
        HeartbeatCheckInterval = TimeSpan.FromSeconds(30),
    };
    await using WorkerClient workerClient = CreateClient(pipes, singleSlotOptions);
    await CompleteHandshakeAsync(workerClient, pipes);

    // Two events against a capacity of one, with no consumer reading them.
    await pipes.WorkerWriter.WriteAsync(CreateEventEnvelope(11, MxEventFamily.OnDataChange));
    await pipes.WorkerWriter.WriteAsync(CreateEventEnvelope(12, MxEventFamily.OnDataChange));

    await WaitUntilAsync(() => workerClient.State == WorkerClientState.Faulted, TestTimeout);
    Assert.Equal(WorkerClientState.Faulted, workerClient.State);
}
/// <summary>
/// Verifies that a faulting client kills the worker process it owns.
/// The test awaits <see cref="FakeWorkerProcess.WaitForExitAsync"/>, which completes
/// exactly when <c>Kill</c> runs, rather than polling <c>client.State</c>.
/// State polling would be racy: <see cref="WorkerClient.SetFaulted"/> publishes the
/// <c>Faulted</c> state before it calls <c>KillOwnedProcess</c>, so a state-based wait
/// could observe <c>Faulted</c> while <c>KillCount</c> is still 0.
/// </summary>
[Fact]
public async Task ReadLoop_WhenClientFaults_KillsOwnedWorkerProcess()
{
    await using PipePair pipes = await PipePair.CreateAsync();
    FakeWorkerProcess ownedProcess = new();
    WorkerClientOptions singleSlotOptions = new()
    {
        EventChannelCapacity = 1,
        EventChannelFullModeTimeout = TimeSpan.FromMilliseconds(50),
        HeartbeatGrace = TimeSpan.FromSeconds(30),
        HeartbeatCheckInterval = TimeSpan.FromSeconds(30),
    };
    await using WorkerClient workerClient = CreateClient(
        pipes,
        singleSlotOptions,
        processHandle: CreateProcessHandle(ownedProcess));
    await CompleteHandshakeAsync(workerClient, pipes);

    // Overflow the one-slot event channel to provoke the fault.
    await pipes.WorkerWriter.WriteAsync(CreateEventEnvelope(11, MxEventFamily.OnDataChange));
    await pipes.WorkerWriter.WriteAsync(CreateEventEnvelope(12, MxEventFamily.OnDataChange));

    // Deterministic: the wait finishes the moment Kill() runs, with no timing window.
    using CancellationTokenSource killTimeout = new(TestTimeout);
    await ownedProcess.WaitForExitAsync(killTimeout.Token);

    Assert.Equal(WorkerClientState.Faulted, workerClient.State);
    Assert.Equal(1, ownedProcess.KillCount);
    Assert.True(ownedProcess.KillEntireProcessTree);
    Assert.True(ownedProcess.HasExited);
}
/// <summary>
/// Verifies that when the pipe drops while an <see cref="WorkerClient.InvokeAsync"/>
/// call is still pending, the pending task fails with a
/// <see cref="WorkerClientException"/> carrying the pipe-disconnected error code
/// instead of waiting for the command timeout, and the client ends up faulted.
/// </summary>
[Fact]
public async Task InvokeAsync_WhenPipeDisconnectsMidCommand_FailsPendingInvokeWithPipeDisconnected()
{
    await using PipePair pipes = await PipePair.CreateAsync();
    await using WorkerClient workerClient = CreateClient(pipes);
    await CompleteHandshakeAsync(workerClient, pipes);

    Task<WorkerCommandReply> pendingInvoke = workerClient.InvokeAsync(
        CreateCommand(MxCommandKind.Ping),
        TestTimeout,
        CancellationToken.None);

    // The command reaches the worker, which then disconnects without answering.
    WorkerEnvelope receivedCommand = await pipes.WorkerReader.ReadAsync().AsTask().WaitAsync(TestTimeout);
    Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerCommand, receivedCommand.BodyCase);
    await pipes.DisposeWorkerSideAsync();

    WorkerClientException disconnectFailure = await Assert.ThrowsAsync<WorkerClientException>(
        async () => await pendingInvoke.WaitAsync(TestTimeout));
    Assert.Equal(WorkerClientErrorCode.PipeDisconnected, disconnectFailure.ErrorCode);

    await WaitUntilAsync(() => workerClient.State == WorkerClientState.Faulted, TestTimeout);
    Assert.Equal(WorkerClientState.Faulted, workerClient.State);
}
/// <summary>
/// Verifies that a <c>WorkerFault</c> envelope received while an
/// <see cref="WorkerClient.InvokeAsync"/> call is pending fails that call with a
/// <see cref="WorkerClientException"/> carrying the worker-faulted error code, and
/// that the client ends up faulted.
/// </summary>
[Fact]
public async Task InvokeAsync_WhenWorkerFaultsMidCommand_FailsPendingInvokeWithWorkerFaulted()
{
    await using PipePair pipes = await PipePair.CreateAsync();
    await using WorkerClient workerClient = CreateClient(pipes);
    await CompleteHandshakeAsync(workerClient, pipes);

    Task<WorkerCommandReply> pendingInvoke = workerClient.InvokeAsync(
        CreateCommand(MxCommandKind.Ping),
        TestTimeout,
        CancellationToken.None);

    // Once the command has arrived, the worker reports a fault instead of replying.
    WorkerEnvelope receivedCommand = await pipes.WorkerReader.ReadAsync().AsTask().WaitAsync(TestTimeout);
    Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerCommand, receivedCommand.BodyCase);
    await pipes.WorkerWriter.WriteAsync(CreateWorkerFaultEnvelope("scripted mid-command fault"));

    WorkerClientException faultFailure = await Assert.ThrowsAsync<WorkerClientException>(
        async () => await pendingInvoke.WaitAsync(TestTimeout));
    Assert.Equal(WorkerClientErrorCode.WorkerFaulted, faultFailure.ErrorCode);

    await WaitUntilAsync(() => workerClient.State == WorkerClientState.Faulted, TestTimeout);
    Assert.Equal(WorkerClientState.Faulted, workerClient.State);
}
/// <summary>Verifies that the client faults when the worker side of the pipe is disposed after the handshake.</summary>
[Fact]
public async Task ReadLoop_WhenPipeDisconnects_FaultsClient()
{
    await using PipePair pipes = await PipePair.CreateAsync();
    await using WorkerClient workerClient = CreateClient(pipes);
    await CompleteHandshakeAsync(workerClient, pipes);

    // Drop the worker end; no command is in flight at this point.
    await pipes.DisposeWorkerSideAsync();

    await WaitUntilAsync(() => workerClient.State == WorkerClientState.Faulted, TestTimeout);
    Assert.Equal(WorkerClientState.Faulted, workerClient.State);
}
/// <summary>
/// Verifies that a pipe disconnect drops the running-worker metric back to zero and
/// records one worker exit.
/// </summary>
[Fact]
public async Task ReadLoop_WhenPipeDisconnects_StopsRunningWorkerMetric()
{
    await using PipePair pipes = await PipePair.CreateAsync();
    using GatewayMetrics gatewayMetrics = new();
    await using WorkerClient workerClient = CreateClient(pipes, metrics: gatewayMetrics);
    await CompleteHandshakeAsync(workerClient, pipes);

    // After the handshake the worker is counted as running.
    Assert.Equal(1, gatewayMetrics.GetSnapshot().WorkersRunning);

    await pipes.DisposeWorkerSideAsync();

    // Wait for both the state change and the metric update before taking the snapshot.
    await WaitUntilAsync(
        () => workerClient.State == WorkerClientState.Faulted
            && gatewayMetrics.GetSnapshot().WorkersRunning == 0,
        TestTimeout);

    GatewayMetricsSnapshot afterDisconnect = gatewayMetrics.GetSnapshot();
    Assert.Equal(0, afterDisconnect.WorkersRunning);
    Assert.Equal(1, afterDisconnect.WorkerExits);
}
/// <summary>
/// Verifies that <c>DisposeAsync</c> returns within a bounded time when the client is
/// disposed after the handshake while the worker side writes nothing further.
/// </summary>
[Fact]
public async Task DisposeAsync_WhenPipeReadIsBlocked_ReturnsWithinBoundedTimeout()
{
    await using PipePair pipePair = await PipePair.CreateAsync();

    // Not declared with "await using": the explicit DisposeAsync call below is the
    // operation under test.
    WorkerClient client = CreateClient(pipePair);
    await CompleteHandshakeAsync(client, pipePair);

    // Stopwatch is monotonic; DateTimeOffset.UtcNow differences can be skewed by
    // system clock adjustments and produce a bogus elapsed value.
    System.Diagnostics.Stopwatch stopwatch = System.Diagnostics.Stopwatch.StartNew();
    await client.DisposeAsync().AsTask().WaitAsync(TestTimeout);
    TimeSpan elapsed = stopwatch.Elapsed;

    Assert.True(
        elapsed < TimeSpan.FromSeconds(4),
        $"DisposeAsync took {elapsed.TotalMilliseconds:N0}ms.");
}
/// <summary>
/// Verifies that disposing a client that owns a worker process kills that process
/// (including its process tree) and disposes the process object.
/// </summary>
[Fact]
public async Task DisposeAsync_WhenOwnedWorkerStillRuns_KillsProcessBeforeDisposing()
{
    await using PipePair pipes = await PipePair.CreateAsync();
    FakeWorkerProcess ownedProcess = new();

    // The client is disposed explicitly and is never started in this test.
    WorkerClient workerClient = CreateClient(pipes, processHandle: CreateProcessHandle(ownedProcess));

    await workerClient.DisposeAsync().AsTask().WaitAsync(TestTimeout);

    Assert.Equal(1, ownedProcess.KillCount);
    Assert.True(ownedProcess.KillEntireProcessTree);
    Assert.True(ownedProcess.Disposed);
}
/// <summary>
/// Verifies that a heartbeat envelope refreshes the client's last-heartbeat timestamp
/// and worker process id. A <see cref="ManualTimeProvider"/> drives the clock so the
/// timestamp advance is deterministic rather than depending on a wall-clock
/// <c>Task.Delay</c> outlasting <see cref="DateTimeOffset.UtcNow"/> resolution.
/// </summary>
[Fact]
public async Task ReadLoop_WhenHeartbeatArrives_UpdatesLastHeartbeatAndWorkerProcess()
{
    const int heartbeatProcessId = 9876;
    TimeSpan clockStep = TimeSpan.FromSeconds(1);

    ManualTimeProvider manualClock = new(DateTimeOffset.Parse("2026-05-18T12:00:00Z", System.Globalization.CultureInfo.InvariantCulture));
    await using PipePair pipes = await PipePair.CreateAsync();
    await using WorkerClient workerClient = CreateClient(pipes, timeProvider: manualClock);
    await CompleteHandshakeAsync(workerClient, pipes);

    DateTimeOffset heartbeatBefore = workerClient.LastHeartbeatAt;

    // Move the manual clock first, then deliver the heartbeat that should pick it up.
    manualClock.Advance(clockStep);
    await pipes.WorkerWriter.WriteAsync(CreateHeartbeatEnvelope(workerProcessId: heartbeatProcessId));

    await WaitUntilAsync(
        () => workerClient.ProcessId == heartbeatProcessId && workerClient.LastHeartbeatAt > heartbeatBefore,
        TestTimeout);

    Assert.Equal(WorkerClientState.Ready, workerClient.State);
    Assert.Equal(heartbeatBefore + clockStep, workerClient.LastHeartbeatAt);
}
/// <summary>
/// Verifies that the heartbeat monitor faults the client once the heartbeat has expired.
/// The grace comparison runs against an injected <see cref="ManualTimeProvider"/> so it
/// is deterministic rather than tied to real wall-clock advance; the monitor's
/// <see cref="WorkerClientOptions.HeartbeatCheckInterval"/> timer stays on the real clock
/// and sees the manually-advanced gap on its next tick.
/// </summary>
[Fact]
public async Task HeartbeatMonitor_WhenHeartbeatExpires_FaultsClient()
{
    ManualTimeProvider manualClock = new(DateTimeOffset.Parse("2026-05-20T12:00:00Z", System.Globalization.CultureInfo.InvariantCulture));
    await using PipePair pipes = await PipePair.CreateAsync();
    WorkerClientOptions shortGraceOptions = new()
    {
        HeartbeatGrace = TimeSpan.FromMilliseconds(80),
        HeartbeatCheckInterval = TimeSpan.FromMilliseconds(20),
        EventChannelCapacity = 8,
    };
    await using WorkerClient workerClient = CreateClient(
        pipes,
        shortGraceOptions,
        timeProvider: manualClock);
    await CompleteHandshakeAsync(workerClient, pipes);

    // Two seconds is far beyond the 80 ms grace, and no heartbeat is ever sent.
    manualClock.Advance(TimeSpan.FromSeconds(2));

    await WaitUntilAsync(() => workerClient.State == WorkerClientState.Faulted, TestTimeout);
    Assert.Equal(WorkerClientState.Faulted, workerClient.State);
}
/// <summary>
/// Server-031 regression: while a command is in flight on the gateway↔worker pipe
/// and the oldest pending command is younger than
/// <see cref="WorkerClientOptions.HeartbeatStuckCeiling"/>, the heartbeat watchdog
/// must NOT fault on an expired heartbeat alone — the gap is more likely caused by
/// pipe-write contention than by a hung worker. Mirrors Worker-023 on the worker side.
/// </summary>
[Fact]
public async Task HeartbeatMonitor_WhenCommandInFlightWithinCeiling_DoesNotFaultOnExpiredHeartbeat()
{
    ManualTimeProvider manualClock = new(DateTimeOffset.Parse("2026-05-20T13:00:00Z", System.Globalization.CultureInfo.InvariantCulture));
    await using PipePair pipes = await PipePair.CreateAsync();
    WorkerClientOptions generousCeilingOptions = new()
    {
        HeartbeatGrace = TimeSpan.FromMilliseconds(80),
        HeartbeatCheckInterval = TimeSpan.FromMilliseconds(20),
        EventChannelCapacity = 8,
        HeartbeatStuckCeiling = TimeSpan.FromSeconds(30),
    };
    await using WorkerClient workerClient = CreateClient(
        pipes,
        generousCeilingOptions,
        timeProvider: manualClock);
    await CompleteHandshakeAsync(workerClient, pipes);

    // Start a command the test never answers, so its PendingCommand stays in
    // `_pendingCommands` for the whole test.
    Task<WorkerCommandReply> unansweredInvoke = workerClient.InvokeAsync(
        CreateCommand(MxCommandKind.Ping),
        TestTimeout,
        CancellationToken.None);
    WorkerEnvelope receivedCommand = await pipes.WorkerReader.ReadAsync().AsTask().WaitAsync(TestTimeout);
    Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerCommand, receivedCommand.BodyCase);

    // 2 s is far past HeartbeatGrace (80 ms) yet far inside HeartbeatStuckCeiling (30 s).
    manualClock.Advance(TimeSpan.FromSeconds(2));

    // Let several real check intervals (20 ms each) elapse so the monitor sees the gap.
    await Task.Delay(TimeSpan.FromMilliseconds(150));

    Assert.Equal(WorkerClientState.Ready, workerClient.State);
    Assert.False(unansweredInvoke.IsCompleted);
}
/// <summary>
/// Server-031 regression: once the oldest pending command is older than
/// <see cref="WorkerClientOptions.HeartbeatStuckCeiling"/>, the heartbeat watchdog
/// fires anyway — a truly stuck COM call must not keep the watchdog suppressed
/// indefinitely.
/// </summary>
[Fact]
public async Task HeartbeatMonitor_WhenPendingCommandExceedsStuckCeiling_FaultsClient()
{
    ManualTimeProvider manualClock = new(DateTimeOffset.Parse("2026-05-20T13:00:00Z", System.Globalization.CultureInfo.InvariantCulture));
    await using PipePair pipes = await PipePair.CreateAsync();
    WorkerClientOptions tightCeilingOptions = new()
    {
        HeartbeatGrace = TimeSpan.FromMilliseconds(80),
        HeartbeatCheckInterval = TimeSpan.FromMilliseconds(20),
        EventChannelCapacity = 8,
        HeartbeatStuckCeiling = TimeSpan.FromMilliseconds(200),
    };
    await using WorkerClient workerClient = CreateClient(
        pipes,
        tightCeilingOptions,
        timeProvider: manualClock);
    await CompleteHandshakeAsync(workerClient, pipes);

    // The command is consumed on the worker side but deliberately never answered.
    Task<WorkerCommandReply> unansweredInvoke = workerClient.InvokeAsync(
        CreateCommand(MxCommandKind.Ping),
        TestTimeout,
        CancellationToken.None);
    await pipes.WorkerReader.ReadAsync().AsTask().WaitAsync(TestTimeout);

    // Push the clock beyond HeartbeatStuckCeiling. The worker pipe's
    // PendingCommand.StartTimestamp uses TimeProvider.GetTimestamp(), so the
    // ManualTimeProvider's GetElapsedTime reflects the advanced gap.
    manualClock.Advance(TimeSpan.FromSeconds(2));

    await WaitUntilAsync(() => workerClient.State == WorkerClientState.Faulted, TestTimeout);
    Assert.Equal(WorkerClientState.Faulted, workerClient.State);
}
/// <summary>
/// Server-032 regression: a transient burst larger than
/// <see cref="WorkerClientOptions.EventChannelCapacity"/> must be absorbed for up to
/// <see cref="WorkerClientOptions.EventChannelFullModeTimeout"/> (the channel is
/// configured for <c>BoundedChannelFullMode.Wait</c>). Only when that wait elapses
/// without progress is the worker faulted, and the diagnostic must name the channel
/// capacity, the depth, and an actionable remediation.
/// </summary>
[Fact]
public async Task EnqueueWorkerEvent_WhenChannelFullPastTimeout_FaultsWithRichDiagnostic()
{
    // One more event than the four-slot channel configured below can hold.
    const ulong eventsToWrite = 5;

    await using PipePair pipes = await PipePair.CreateAsync();
    WorkerClientOptions fourSlotOptions = new()
    {
        EventChannelCapacity = 4,
        EventChannelFullModeTimeout = TimeSpan.FromMilliseconds(100),
        HeartbeatGrace = TimeSpan.FromSeconds(30),
        HeartbeatCheckInterval = TimeSpan.FromSeconds(1),
    };
    await using WorkerClient workerClient = CreateClient(pipes, fourSlotOptions);
    await CompleteHandshakeAsync(workerClient, pipes);

    // Fill the four slots and write exactly one extra event to hit the overflow path.
    // No StreamEvents consumer is ever opened, so the events remain buffered. The
    // count is exactly five on purpose: the client faults while reading the fifth and
    // its read loop then stops — a sixth event would never be drained and its pipe
    // write would block forever on a full OS pipe buffer.
    for (ulong sequence = 1; sequence <= eventsToWrite; sequence++)
    {
        await pipes.WorkerWriter.WriteAsync(CreateEventEnvelope(sequence, MxEventFamily.OnDataChange));
    }

    await WaitUntilAsync(() => workerClient.State == WorkerClientState.Faulted, TestTimeout);
    Assert.Equal(WorkerClientState.Faulted, workerClient.State);

    // Draining the event stream after the fault surfaces the propagated
    // WorkerClientException with the rich diagnostic. The drain is bounded by
    // TestTimeout so that a regression leaving the channel uncompleted fails the
    // test rather than hanging it.
    using CancellationTokenSource drainTimeout = new(TestTimeout);
    WorkerClientException overflowFault = await Assert.ThrowsAsync<WorkerClientException>(async () =>
    {
        await foreach (WorkerEvent _ in workerClient.ReadEventsAsync(drainTimeout.Token))
        {
        }
    });

    Assert.Contains("Worker event channel rejected", overflowFault.Message);
    Assert.Contains("of 4 capacity", overflowFault.Message);
    Assert.Contains("StreamEvents", overflowFault.Message);
    Assert.Contains("MxGateway:Events:QueueCapacity", overflowFault.Message);
}
/// <summary>
/// Creates a <see cref="WorkerClient"/> bound to the gateway end of
/// <paramref name="pipePair"/>, using the test session id and nonce.
/// </summary>
/// <param name="pipePair">Connected pipe pair whose gateway stream backs the connection.</param>
/// <param name="options">Optional client options, passed through to the client as-is.</param>
/// <param name="metrics">Optional metrics instance, passed through to the client as-is.</param>
/// <param name="processHandle">Optional owned worker process handle attached to the connection.</param>
/// <param name="timeProvider">Optional time provider, passed through to the client as-is.</param>
/// <returns>The constructed, not yet started client.</returns>
private static WorkerClient CreateClient(
    PipePair pipePair,
    WorkerClientOptions? options = null,
    GatewayMetrics? metrics = null,
    WorkerProcessHandle? processHandle = null,
    TimeProvider? timeProvider = null)
{
    WorkerClientConnection gatewayConnection = new(
        SessionId,
        Nonce,
        pipePair.GatewayStream,
        new WorkerFrameProtocolOptions(SessionId),
        processHandle);

    return new WorkerClient(gatewayConnection, options, metrics, timeProvider);
}
/// <summary>
/// Wraps <paramref name="process"/> in a <see cref="WorkerProcessHandle"/> with a
/// placeholder command line (no arguments) and the current UTC time.
/// </summary>
private static WorkerProcessHandle CreateProcessHandle(FakeWorkerProcess process) =>
    new(
        process,
        new WorkerProcessCommandLine("ZB.MOM.WW.MxGateway.Worker.exe", []),
        DateTimeOffset.UtcNow);
/// <summary>
/// Starts the client and plays the worker's part of the handshake: reads and checks
/// the gateway hello, answers with worker hello and worker ready, then waits for
/// <c>StartAsync</c> to finish.
/// </summary>
/// <param name="client">Client to start.</param>
/// <param name="pipePair">Pipe pair whose worker side scripts the handshake.</param>
private static async Task CompleteHandshakeAsync(
    WorkerClient client,
    PipePair pipePair)
{
    Task pendingStart = client.StartAsync(CancellationToken.None);

    // The gateway speaks first; validate what it sent before answering.
    WorkerEnvelope helloFromGateway = await pipePair.WorkerReader.ReadAsync().AsTask().WaitAsync(TestTimeout);
    Assert.Equal(WorkerEnvelope.BodyOneofCase.GatewayHello, helloFromGateway.BodyCase);
    Assert.Equal(Nonce, helloFromGateway.GatewayHello.Nonce);
    Assert.Equal(GatewayContractInfo.WorkerProtocolVersion, helloFromGateway.GatewayHello.SupportedProtocolVersion);

    await pipePair.WorkerWriter.WriteAsync(CreateWorkerHelloEnvelope());
    await pipePair.WorkerWriter.WriteAsync(CreateWorkerReadyEnvelope());

    await pendingStart.WaitAsync(TestTimeout);
}
/// <summary>Creates a <see cref="WorkerCommand"/> whose inner command has only its kind set.</summary>
private static WorkerCommand CreateCommand(MxCommandKind kind) =>
    new()
    {
        Command = new MxCommand { Kind = kind },
    };
/// <summary>
/// Creates the scripted worker-hello envelope (sequence 1, no correlation id) carrying
/// the test nonce, protocol version, and worker process id.
/// </summary>
private static WorkerEnvelope CreateWorkerHelloEnvelope()
{
    WorkerHello hello = new()
    {
        ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
        Nonce = Nonce,
        WorkerProcessId = WorkerProcessId,
        WorkerVersion = "fake-worker",
    };

    return CreateWorkerEnvelope(string.Empty, 1, envelope => envelope.WorkerHello = hello);
}
/// <summary>
/// Creates the scripted worker-ready envelope (sequence 2, no correlation id) carrying
/// the worker process id and fixed MXAccess ProgID/CLSID strings.
/// </summary>
private static WorkerEnvelope CreateWorkerReadyEnvelope()
{
    WorkerReady ready = new()
    {
        WorkerProcessId = WorkerProcessId,
        MxaccessProgid = "LMXProxy.LMXProxyServer.1",
        MxaccessClsid = "{C30B52F5-2CB5-4760-AF0A-3A344A7EB5DC}",
    };

    return CreateWorkerEnvelope(string.Empty, 2, envelope => envelope.WorkerReady = ready);
}
/// <summary>
/// Creates a command-reply envelope (sequence 10) whose envelope and inner reply both
/// carry <paramref name="correlationId"/>.
/// </summary>
/// <param name="correlationId">Correlation id of the command being answered.</param>
/// <param name="kind">Command kind echoed in the reply.</param>
private static WorkerEnvelope CreateCommandReplyEnvelope(
    string correlationId,
    MxCommandKind kind)
{
    MxCommandReply innerReply = new()
    {
        SessionId = SessionId,
        CorrelationId = correlationId,
        Kind = kind,
    };

    return CreateWorkerEnvelope(
        correlationId,
        10,
        envelope => envelope.WorkerCommandReply = new WorkerCommandReply { Reply = innerReply });
}
/// <summary>
/// Creates a worker-event envelope with no correlation id; <paramref name="sequence"/>
/// is used both as the envelope sequence and as the event's worker sequence.
/// </summary>
/// <param name="sequence">Sequence number stamped on the envelope and the event.</param>
/// <param name="family">Event family of the inner event.</param>
private static WorkerEnvelope CreateEventEnvelope(
    ulong sequence,
    MxEventFamily family)
{
    MxEvent innerEvent = new()
    {
        SessionId = SessionId,
        Family = family,
        WorkerSequence = sequence,
    };

    return CreateWorkerEnvelope(
        string.Empty,
        sequence,
        envelope => envelope.WorkerEvent = new WorkerEvent { Event = innerEvent });
}
/// <summary>
/// Creates a worker-fault envelope (sequence 30, no correlation id) in the
/// <c>MxaccessCommandFailed</c> category with a <c>WorkerUnavailable</c> protocol status.
/// </summary>
/// <param name="diagnosticMessage">Text used for both the fault diagnostic and the status message.</param>
private static WorkerEnvelope CreateWorkerFaultEnvelope(string diagnosticMessage)
{
    ProtocolStatus unavailableStatus = new()
    {
        Code = ProtocolStatusCode.WorkerUnavailable,
        Message = diagnosticMessage,
    };
    WorkerFault fault = new()
    {
        Category = WorkerFaultCategory.MxaccessCommandFailed,
        DiagnosticMessage = diagnosticMessage,
        ProtocolStatus = unavailableStatus,
    };

    return CreateWorkerEnvelope(string.Empty, 30, envelope => envelope.WorkerFault = fault);
}
/// <summary>
/// Creates a heartbeat envelope (sequence 20, no correlation id) reporting the ready
/// state, zero pending commands, an empty outbound event queue, and the current UTC
/// time as the last STA activity timestamp.
/// </summary>
/// <param name="workerProcessId">Process id reported by the heartbeat.</param>
private static WorkerEnvelope CreateHeartbeatEnvelope(int workerProcessId)
{
    WorkerHeartbeat heartbeat = new()
    {
        WorkerProcessId = workerProcessId,
        State = WorkerState.Ready,
        LastStaActivityTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
        PendingCommandCount = 0,
        OutboundEventQueueDepth = 0,
    };

    return CreateWorkerEnvelope(string.Empty, 20, envelope => envelope.WorkerHeartbeat = heartbeat);
}
/// <summary>
/// Creates an envelope with the common header fields (protocol version, session id,
/// sequence, correlation id) and lets <paramref name="setBody"/> fill in the body.
/// </summary>
/// <param name="correlationId">Correlation id stamped on the envelope.</param>
/// <param name="sequence">Sequence number stamped on the envelope.</param>
/// <param name="setBody">Callback invoked once with the new envelope to assign its body.</param>
/// <returns>The populated envelope.</returns>
private static WorkerEnvelope CreateWorkerEnvelope(
    string correlationId,
    ulong sequence,
    Action<WorkerEnvelope> setBody)
{
    WorkerEnvelope result = new();
    result.ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion;
    result.SessionId = SessionId;
    result.Sequence = sequence;
    result.CorrelationId = correlationId;

    setBody(result);
    return result;
}
/// <summary>
/// Polls <paramref name="predicate"/> every 10 ms until it returns <c>true</c> or
/// <paramref name="timeout"/> elapses.
/// </summary>
/// <param name="predicate">Condition to wait for; evaluated before each delay.</param>
/// <param name="timeout">Maximum time to keep polling.</param>
/// <exception cref="TimeoutException">
/// The condition was still false when the timeout elapsed. Thrown instead of letting
/// the bare <see cref="TaskCanceledException"/> from <c>Task.Delay</c> escape, so a
/// failing test reports what actually went wrong.
/// </exception>
private static async Task WaitUntilAsync(
    Func<bool> predicate,
    TimeSpan timeout)
{
    using CancellationTokenSource cancellationTokenSource = new(timeout);
    while (!predicate())
    {
        try
        {
            await Task.Delay(TimeSpan.FromMilliseconds(10), cancellationTokenSource.Token);
        }
        catch (OperationCanceledException)
        {
            // The condition may have become true during the final delay; give it one
            // last look before declaring a timeout.
            if (predicate())
            {
                return;
            }

            throw new TimeoutException(
                $"Condition was not met within {timeout.TotalMilliseconds:N0}ms.");
        }
    }
}
/// <summary>
/// A connected named-pipe pair for tests: the server end is exposed as the gateway
/// stream, and the client end is wrapped in a frame reader and writer that play the
/// worker.
/// </summary>
private sealed class PipePair : IAsyncDisposable
{
    private readonly NamedPipeClientStream _workerStream;
    private bool _workerSideDisposed;

    private PipePair(
        NamedPipeServerStream gatewayStream,
        NamedPipeClientStream workerStream)
    {
        _workerStream = workerStream;
        GatewayStream = gatewayStream;
        WorkerReader = new WorkerFrameReader(workerStream, new WorkerFrameProtocolOptions(SessionId));
        WorkerWriter = new WorkerFrameWriter(workerStream, new WorkerFrameProtocolOptions(SessionId));
    }

    /// <summary>Server end of the pipe, handed to the client under test.</summary>
    public NamedPipeServerStream GatewayStream { get; }

    /// <summary>Reads frames the gateway wrote, from the worker end of the pipe.</summary>
    public WorkerFrameReader WorkerReader { get; }

    /// <summary>Writes frames to the gateway, from the worker end of the pipe.</summary>
    public WorkerFrameWriter WorkerWriter { get; }

    /// <summary>Creates both pipe ends under a unique name and connects them.</summary>
    public static async Task<PipePair> CreateAsync()
    {
        string uniquePipeName = $"mxaccessgw-workerclient-tests-{Guid.NewGuid():N}";

        NamedPipeServerStream serverEnd = new(
            uniquePipeName,
            PipeDirection.InOut,
            maxNumberOfServerInstances: 1,
            PipeTransmissionMode.Byte,
            PipeOptions.Asynchronous);
        NamedPipeClientStream clientEnd = new(
            ".",
            uniquePipeName,
            PipeDirection.InOut,
            PipeOptions.Asynchronous);

        // Begin accepting before the client connects, then wait for both sides.
        Task serverAccept = serverEnd.WaitForConnectionAsync();
        await clientEnd.ConnectAsync();
        await serverAccept;

        return new PipePair(serverEnd, clientEnd);
    }

    /// <summary>Disposes the worker end of the pipe; repeated calls do nothing.</summary>
    public async ValueTask DisposeWorkerSideAsync()
    {
        if (!_workerSideDisposed)
        {
            await _workerStream.DisposeAsync();
            _workerSideDisposed = true;
        }
    }

    /// <summary>Disposes the worker end (if still open) and then the gateway end.</summary>
    public async ValueTask DisposeAsync()
    {
        await DisposeWorkerSideAsync();
        await GatewayStream.DisposeAsync();
    }
}
/// <summary>
/// In-memory <see cref="IWorkerProcess"/> that records kill and dispose calls and
/// signals exit as soon as <see cref="Kill"/> is invoked.
/// </summary>
private sealed class FakeWorkerProcess : IWorkerProcess
{
    // Completed by Kill; continuations run asynchronously rather than inline in Kill.
    private readonly TaskCompletionSource _exitSignal = new(TaskCreationOptions.RunContinuationsAsynchronously);

    /// <summary>Always the test's worker process id.</summary>
    public int Id => WorkerProcessId;

    /// <summary><c>true</c> once <see cref="Kill"/> has been called.</summary>
    public bool HasExited { get; private set; }

    /// <summary><c>null</c> until killed, then <c>-1</c>.</summary>
    public int? ExitCode { get; private set; }

    /// <summary>Number of <see cref="Kill"/> calls received.</summary>
    public int KillCount { get; private set; }

    /// <summary>The <c>entireProcessTree</c> argument of the most recent <see cref="Kill"/> call.</summary>
    public bool KillEntireProcessTree { get; private set; }

    /// <summary><c>true</c> once <see cref="Dispose"/> has been called.</summary>
    public bool Disposed { get; private set; }

    /// <summary>Completes when <see cref="Kill"/> runs, or is cancelled via <paramref name="cancellationToken"/>.</summary>
    public ValueTask WaitForExitAsync(CancellationToken cancellationToken) =>
        new(_exitSignal.Task.WaitAsync(cancellationToken));

    /// <summary>Records the kill, marks the process exited with code -1, and releases exit waiters.</summary>
    public void Kill(bool entireProcessTree)
    {
        KillCount += 1;
        KillEntireProcessTree = entireProcessTree;
        HasExited = true;
        ExitCode = -1;
        _exitSignal.TrySetResult();
    }

    /// <summary>Records that the process object was disposed.</summary>
    public void Dispose() => Disposed = true;
}
}
@@ -0,0 +1,141 @@
using System.Buffers.Binary;
using ZB.MOM.WW.MxGateway.Server.Configuration;
using ZB.MOM.WW.MxGateway.Server.Workers;
namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Workers;
/// <summary>
/// Coverage for <see cref="WorkerExecutableValidator"/> PE-header architecture parsing
/// (finding Server-013). The validator reads the DOS <c>MZ</c> stub, follows the PE
/// header offset at <c>0x3c</c>, checks the <c>PE\0\0</c> signature, and compares the
/// machine field against the required <see cref="WorkerArchitecture"/>.
/// </summary>
public sealed class WorkerExecutableValidatorTests : IDisposable
{
private const ushort ImageFileMachineI386 = 0x014c;
private const ushort ImageFileMachineAmd64 = 0x8664;
private readonly List<string> _tempFiles = [];
[Fact]
public void Validate_X86ExecutableMatchingRequiredArchitecture_DoesNotThrow()
{
string path = WritePeFile(ImageFileMachineI386);
WorkerExecutableValidator.Validate(path, WorkerArchitecture.X86);
}
[Fact]
public void Validate_X64ExecutableMatchingRequiredArchitecture_DoesNotThrow()
{
string path = WritePeFile(ImageFileMachineAmd64);
WorkerExecutableValidator.Validate(path, WorkerArchitecture.X64);
}
[Fact]
public void Validate_X64ExecutableWhenX86Required_ThrowsInvalidExecutable()
{
string path = WritePeFile(ImageFileMachineAmd64);
WorkerProcessLaunchException exception = Assert.Throws<WorkerProcessLaunchException>(
() => WorkerExecutableValidator.Validate(path, WorkerArchitecture.X86));
Assert.Equal(WorkerProcessLaunchErrorCode.InvalidExecutable, exception.ErrorCode);
Assert.Contains("architecture", exception.Message, StringComparison.OrdinalIgnoreCase);
}
[Fact]
public void Validate_X86ExecutableWhenX64Required_ThrowsInvalidExecutable()
{
string path = WritePeFile(ImageFileMachineI386);
WorkerProcessLaunchException exception = Assert.Throws<WorkerProcessLaunchException>(
() => WorkerExecutableValidator.Validate(path, WorkerArchitecture.X64));
Assert.Equal(WorkerProcessLaunchErrorCode.InvalidExecutable, exception.ErrorCode);
}
[Fact]
public void Validate_FileWithoutMzHeader_ThrowsInvalidExecutable()
{
byte[] bytes = new byte[0x80];
// Leave the first two bytes as zero so the MZ signature check fails.
string path = WriteTempFile(bytes);
WorkerProcessLaunchException exception = Assert.Throws<WorkerProcessLaunchException>(
() => WorkerExecutableValidator.Validate(path, WorkerArchitecture.X86));
Assert.Equal(WorkerProcessLaunchErrorCode.InvalidExecutable, exception.ErrorCode);
Assert.Contains("MZ", exception.Message, StringComparison.Ordinal);
}
[Fact]
public void Validate_FileTooSmallForPeHeader_ThrowsInvalidExecutable()
{
string path = WriteTempFile([(byte)'M', (byte)'Z']);
WorkerProcessLaunchException exception = Assert.Throws<WorkerProcessLaunchException>(
() => WorkerExecutableValidator.Validate(path, WorkerArchitecture.X86));
Assert.Equal(WorkerProcessLaunchErrorCode.InvalidExecutable, exception.ErrorCode);
}
[Fact]
public void Validate_FileWithoutPeSignature_ThrowsInvalidExecutable()
{
// Build a valid MZ header pointing at a PE offset that holds a wrong signature.
byte[] bytes = new byte[0x100];
bytes[0] = (byte)'M';
bytes[1] = (byte)'Z';
BinaryPrimitives.WriteInt32LittleEndian(bytes.AsSpan(0x3c, sizeof(int)), 0x80);
// PE region left as zeros — the "PE\0\0" signature check fails.
string path = WriteTempFile(bytes);
WorkerProcessLaunchException exception = Assert.Throws<WorkerProcessLaunchException>(
() => WorkerExecutableValidator.Validate(path, WorkerArchitecture.X86));
Assert.Equal(WorkerProcessLaunchErrorCode.InvalidExecutable, exception.ErrorCode);
Assert.Contains("PE", exception.Message, StringComparison.Ordinal);
}
private string WritePeFile(ushort machine)
{
const int peHeaderOffset = 0x80;
byte[] bytes = new byte[peHeaderOffset + 6];
bytes[0] = (byte)'M';
bytes[1] = (byte)'Z';
BinaryPrimitives.WriteInt32LittleEndian(bytes.AsSpan(0x3c, sizeof(int)), peHeaderOffset);
bytes[peHeaderOffset] = (byte)'P';
bytes[peHeaderOffset + 1] = (byte)'E';
bytes[peHeaderOffset + 2] = 0;
bytes[peHeaderOffset + 3] = 0;
BinaryPrimitives.WriteUInt16LittleEndian(bytes.AsSpan(peHeaderOffset + 4, sizeof(ushort)), machine);
return WriteTempFile(bytes);
}
/// <summary>
/// Writes <paramref name="bytes"/> to a uniquely named "mxgw-pe-*.bin" file under the system temp
/// directory and records the path in <c>_tempFiles</c>.
/// </summary>
/// <param name="bytes">Exact file content to write.</param>
/// <returns>Full path of the file that was written.</returns>
private string WriteTempFile(byte[] bytes)
{
    string fileName = $"mxgw-pe-{Guid.NewGuid():N}.bin";
    string fullPath = Path.Combine(Path.GetTempPath(), fileName);

    File.WriteAllBytes(fullPath, bytes);
    _tempFiles.Add(fullPath);

    return fullPath;
}
/// <summary>
/// Deletes every temp file recorded in <c>_tempFiles</c> and then empties the list.
/// Cleanup is best-effort: a file that cannot be removed is skipped rather than failing teardown.
/// </summary>
public void Dispose()
{
    foreach (string path in _tempFiles)
    {
        try
        {
            File.Delete(path);
        }
        catch (Exception ex) when (ex is IOException or UnauthorizedAccessException)
        {
            // Best-effort cleanup of the temp PE fixtures. File.Delete reports a file that is
            // still in use as IOException and an access-denied / read-only file as
            // UnauthorizedAccessException; neither should surface from teardown.
        }
    }

    _tempFiles.Clear();
}
}
@@ -0,0 +1,223 @@
using System.Buffers.Binary;
using Google.Protobuf;
using ZB.MOM.WW.MxGateway.Contracts;
using ZB.MOM.WW.MxGateway.Contracts.Proto;
using ZB.MOM.WW.MxGateway.Server.Workers;
namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Workers;
public sealed class WorkerFrameProtocolTests
{
private const string SessionId = "session-1";
/// <summary>
/// Writes an envelope with <see cref="WorkerFrameWriter"/>, rewinds the stream, reads it back with
/// <see cref="WorkerFrameReader"/>, and expects the parsed envelope to equal the one written.
/// </summary>
[Fact]
public async Task WriteAndReadAsync_WithValidEnvelope_RoundTripsFrame()
{
    var protocolOptions = new WorkerFrameProtocolOptions(SessionId);
    await using var pipe = new MemoryStream();
    WorkerEnvelope sent = CreateEnvelope();

    var frameWriter = new WorkerFrameWriter(pipe, protocolOptions);
    await frameWriter.WriteAsync(sent);

    // Rewind so the reader starts at the frame that was just written.
    pipe.Position = 0;
    var frameReader = new WorkerFrameReader(pipe, protocolOptions);
    WorkerEnvelope received = await frameReader.ReadAsync();

    Assert.Equal(sent, received);
}
/// <summary>
/// Serves a complete frame through a stream that returns at most two bytes per read and expects the
/// reader to still produce the original envelope, using more than two read calls to do so.
/// </summary>
[Fact]
public async Task ReadAsync_WithPartialReads_ReassemblesFrame()
{
    const int bytesPerRead = 2;

    var protocolOptions = new WorkerFrameProtocolOptions(SessionId);
    WorkerEnvelope expected = CreateEnvelope();
    byte[] encodedFrame = CreateFrame(expected);
    await using var trickle = new ChunkedReadStream(encodedFrame, chunkSize: bytesPerRead);
    var frameReader = new WorkerFrameReader(trickle, protocolOptions);

    WorkerEnvelope actual = await frameReader.ReadAsync();

    Assert.Equal(expected, actual);
    Assert.True(trickle.ReadCallCount > 2);
}
/// <summary>
/// A stream holding only a four-byte length prefix of zero must make the reader fail with
/// <see cref="WorkerFrameProtocolErrorCode.MalformedLength"/>.
/// </summary>
[Fact]
public async Task ReadAsync_WithZeroLengthFrame_ThrowsMalformedLength()
{
    var protocolOptions = new WorkerFrameProtocolOptions(SessionId);

    // A freshly allocated array is all zeros, i.e. a length prefix announcing zero payload bytes.
    byte[] zeroLengthPrefix = new byte[sizeof(uint)];
    await using var input = new MemoryStream(zeroLengthPrefix);
    var frameReader = new WorkerFrameReader(input, protocolOptions);

    var failure = await Assert.ThrowsAsync<WorkerFrameProtocolException>(
        async () => await frameReader.ReadAsync());

    Assert.Equal(WorkerFrameProtocolErrorCode.MalformedLength, failure.ErrorCode);
}
/// <summary>
/// With the maximum message size set to 16 bytes, a length prefix announcing 17 bytes must be rejected
/// with <see cref="WorkerFrameProtocolErrorCode.MessageTooLarge"/>. The stream contains only the prefix,
/// so the rejection has to be based on the declared length alone.
/// </summary>
[Fact]
public async Task ReadAsync_WithOversizedLength_ThrowsBeforePayloadAllocation()
{
    var protocolOptions = new WorkerFrameProtocolOptions(
        SessionId,
        GatewayContractInfo.WorkerProtocolVersion,
        maxMessageBytes: 16);

    // One byte over the configured limit; no payload bytes follow the prefix.
    var oversizedPrefix = new byte[sizeof(uint)];
    BinaryPrimitives.WriteUInt32LittleEndian(oversizedPrefix, 17);

    await using var input = new MemoryStream(oversizedPrefix);
    var frameReader = new WorkerFrameReader(input, protocolOptions);

    var failure = await Assert.ThrowsAsync<WorkerFrameProtocolException>(
        async () => await frameReader.ReadAsync());

    Assert.Equal(WorkerFrameProtocolErrorCode.MessageTooLarge, failure.ErrorCode);
}
/// <summary>
/// An otherwise valid envelope whose protocol version is one higher than the default must be rejected
/// with <see cref="WorkerFrameProtocolErrorCode.ProtocolVersionMismatch"/>.
/// </summary>
[Fact]
public async Task ReadAsync_WithWrongProtocolVersion_ThrowsProtocolVersionMismatch()
{
    var protocolOptions = new WorkerFrameProtocolOptions(SessionId);

    WorkerEnvelope mismatched = CreateEnvelope();
    mismatched.ProtocolVersion++;

    byte[] encodedFrame = CreateFrame(mismatched);
    await using var input = new MemoryStream(encodedFrame);
    var frameReader = new WorkerFrameReader(input, protocolOptions);

    var failure = await Assert.ThrowsAsync<WorkerFrameProtocolException>(
        async () => await frameReader.ReadAsync());

    Assert.Equal(WorkerFrameProtocolErrorCode.ProtocolVersionMismatch, failure.ErrorCode);
}
/// <summary>
/// An envelope stamped with "different-session" read by a reader configured for the test session id
/// must be rejected with <see cref="WorkerFrameProtocolErrorCode.SessionMismatch"/>.
/// </summary>
[Fact]
public async Task ReadAsync_WithWrongSessionId_ThrowsSessionMismatch()
{
    var protocolOptions = new WorkerFrameProtocolOptions(SessionId);

    WorkerEnvelope foreignEnvelope = CreateEnvelope();
    foreignEnvelope.SessionId = "different-session";

    byte[] encodedFrame = CreateFrame(foreignEnvelope);
    await using var input = new MemoryStream(encodedFrame);
    var frameReader = new WorkerFrameReader(input, protocolOptions);

    var failure = await Assert.ThrowsAsync<WorkerFrameProtocolException>(
        async () => await frameReader.ReadAsync());

    Assert.Equal(WorkerFrameProtocolErrorCode.SessionMismatch, failure.ErrorCode);
}
/// <summary>
/// A correctly length-prefixed frame whose one-byte payload (0x80) is not a parseable envelope must be
/// rejected with <see cref="WorkerFrameProtocolErrorCode.InvalidEnvelope"/>.
/// </summary>
[Fact]
public async Task ReadAsync_WithMalformedPayload_ThrowsInvalidEnvelope()
{
    var protocolOptions = new WorkerFrameProtocolOptions(SessionId);

    // The length prefix is valid; only the payload content is garbage.
    byte[] garbagePayload = new byte[] { 0x80 };
    byte[] encodedFrame = CreateFrame(garbagePayload);

    await using var input = new MemoryStream(encodedFrame);
    var frameReader = new WorkerFrameReader(input, protocolOptions);

    var failure = await Assert.ThrowsAsync<WorkerFrameProtocolException>(
        async () => await frameReader.ReadAsync());

    Assert.Equal(WorkerFrameProtocolErrorCode.InvalidEnvelope, failure.ErrorCode);
}
/// <summary>
/// An envelope whose body has been cleared before framing must be rejected with
/// <see cref="WorkerFrameProtocolErrorCode.InvalidEnvelope"/>.
/// </summary>
[Fact]
public async Task ReadAsync_WithMissingEnvelopeBody_ThrowsInvalidEnvelope()
{
    var protocolOptions = new WorkerFrameProtocolOptions(SessionId);

    WorkerEnvelope bodiless = CreateEnvelope();
    bodiless.ClearBody();

    byte[] encodedFrame = CreateFrame(bodiless);
    await using var input = new MemoryStream(encodedFrame);
    var frameReader = new WorkerFrameReader(input, protocolOptions);

    var failure = await Assert.ThrowsAsync<WorkerFrameProtocolException>(
        async () => await frameReader.ReadAsync());

    Assert.Equal(WorkerFrameProtocolErrorCode.InvalidEnvelope, failure.ErrorCode);
}
/// <summary>
/// With the maximum message size set to 8 bytes, writing the standard test envelope must fail with
/// <see cref="WorkerFrameProtocolErrorCode.MessageTooLarge"/> and leave the target stream empty.
/// </summary>
[Fact]
public async Task WriteAsync_WithOversizedEnvelope_ThrowsMessageTooLarge()
{
    var protocolOptions = new WorkerFrameProtocolOptions(
        SessionId,
        GatewayContractInfo.WorkerProtocolVersion,
        maxMessageBytes: 8);
    await using var output = new MemoryStream();
    var frameWriter = new WorkerFrameWriter(output, protocolOptions);

    var failure = await Assert.ThrowsAsync<WorkerFrameProtocolException>(
        async () => await frameWriter.WriteAsync(CreateEnvelope()));

    Assert.Equal(WorkerFrameProtocolErrorCode.MessageTooLarge, failure.ErrorCode);

    // Nothing, not even a length prefix, may have reached the stream.
    Assert.Equal(0, output.Length);
}
/// <summary>
/// Builds the envelope shared by these tests: current worker protocol version, the test session id,
/// sequence 1, a fixed correlation id, and a <see cref="WorkerHello"/> body.
/// </summary>
/// <returns>A new envelope instance on every call.</returns>
private static WorkerEnvelope CreateEnvelope()
{
    var hello = new WorkerHello
    {
        ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
        Nonce = "nonce",
        WorkerProcessId = 1234,
        WorkerVersion = "test-worker",
    };

    var envelope = new WorkerEnvelope
    {
        ProtocolVersion = GatewayContractInfo.WorkerProtocolVersion,
        SessionId = SessionId,
        Sequence = 1,
        CorrelationId = "correlation-1",
    };
    envelope.WorkerHello = hello;

    return envelope;
}
/// <summary>Serializes <paramref name="message"/> to bytes and wraps them in a length-prefixed frame.</summary>
/// <param name="message">Protobuf message to serialize.</param>
/// <returns>The frame produced by the <c>byte[]</c> overload for the serialized message.</returns>
private static byte[] CreateFrame(IMessage message)
{
    byte[] serialized = message.ToByteArray();
    return CreateFrame(serialized);
}
/// <summary>
/// Builds a frame consisting of a four-byte little-endian length prefix followed by
/// <paramref name="payload"/> verbatim.
/// </summary>
/// <param name="payload">Bytes to place after the prefix; the prefix stores its length.</param>
/// <returns>A new array of <c>sizeof(uint) + payload.Length</c> bytes.</returns>
private static byte[] CreateFrame(byte[] payload)
{
    const int prefixLength = sizeof(uint);

    var framed = new byte[prefixLength + payload.Length];
    BinaryPrimitives.WriteUInt32LittleEndian(
        new Span<byte>(framed, 0, prefixLength),
        (uint)payload.Length);
    Array.Copy(payload, 0, framed, prefixLength, payload.Length);

    return framed;
}
/// <summary>
/// <see cref="MemoryStream"/> over a fixed buffer whose memory-based <c>ReadAsync</c> overload never
/// requests more than a configured number of bytes per call and counts how often it is invoked.
/// Only that overload is overridden; every other read method is inherited unchanged.
/// </summary>
private sealed class ChunkedReadStream : MemoryStream
{
    private readonly int _chunkSize;

    /// <summary>Initializes a new instance of the <see cref="ChunkedReadStream"/> class with chunked reads.</summary>
    /// <param name="buffer">The buffer containing data to read.</param>
    /// <param name="chunkSize">The maximum number of bytes to read per operation; must be positive.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="chunkSize"/> is zero or negative.</exception>
    public ChunkedReadStream(
        byte[] buffer,
        int chunkSize)
        : base(buffer)
    {
        // A chunk size of zero would make every read return 0 bytes, which a caller cannot tell
        // apart from end-of-stream; a negative one would only fail later inside the range slice.
        if (chunkSize <= 0)
        {
            throw new ArgumentOutOfRangeException(
                nameof(chunkSize),
                chunkSize,
                "Chunk size must be greater than zero.");
        }

        _chunkSize = chunkSize;
    }

    /// <summary>Gets the number of read calls made to the stream.</summary>
    public int ReadCallCount { get; private set; }

    /// <inheritdoc />
    public override ValueTask<int> ReadAsync(
        Memory<byte> buffer,
        CancellationToken cancellationToken = default)
    {
        ReadCallCount++;

        // Shrink the destination so the base stream cannot hand back more than one chunk.
        int requestedCount = Math.Min(buffer.Length, _chunkSize);
        return base.ReadAsync(buffer[..requestedCount], cancellationToken);
    }
}
}
@@ -0,0 +1,400 @@
using System.Diagnostics;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.MxGateway.Contracts;
using ZB.MOM.WW.MxGateway.Server.Configuration;
using ZB.MOM.WW.MxGateway.Server.Metrics;
using ZB.MOM.WW.MxGateway.Server.Workers;
namespace ZB.MOM.WW.MxGateway.Tests.Gateway.Workers;
public sealed class WorkerProcessLauncherTests
{
private const string SessionId = "session-1";
private const string PipeName = "mxaccess-gateway-123-session-1";
private const string Nonce = "super-secret-nonce";
/// <summary>
/// Launches against a fake x86 worker image with a probe that succeeds immediately and inspects the
/// captured <see cref="ProcessStartInfo"/>: full executable path, no shell, no window, the
/// session/pipe/protocol arguments, and the nonce plus pipe-connect timeout supplied through
/// environment variables. The nonce must not appear anywhere in the handle's command line.
/// </summary>
[Fact]
public async Task LaunchAsync_WithValidWorker_StartsProcessWithBootstrapArgumentsAndNonceEnvironment()
{
    using TestDirectory workspace = TestDirectory.Create();
    string workerPath = workspace.CreateWorkerExecutable(machine: 0x014c);
    var fakeProcess = new FakeWorkerProcess(processId: 1234);
    var reservation = new FakePipeReservation();
    var factory = new FakeWorkerProcessFactory(fakeProcess);
    var gatewayMetrics = new GatewayMetrics();
    WorkerProcessLauncher launcher = CreateLauncher(
        workerPath,
        factory,
        new SucceedingStartupProbe(),
        gatewayMetrics);

    using WorkerProcessHandle handle = await launcher.LaunchAsync(CreateRequest(reservation));

    Assert.Equal(1234, handle.ProcessId);
    Assert.Same(fakeProcess, handle.Process);

    // What the launcher handed to the process factory.
    ProcessStartInfo? startInfo = factory.LastStartInfo;
    Assert.NotNull(startInfo);
    Assert.Equal(Path.GetFullPath(workerPath), startInfo.FileName);
    Assert.False(startInfo.UseShellExecute);
    Assert.True(startInfo.CreateNoWindow);
    Assert.Equal(
        ["--session-id", SessionId, "--pipe-name", PipeName, "--protocol-version", "1"],
        startInfo.ArgumentList);
    Assert.Equal(Nonce, startInfo.Environment[WorkerProcessLauncher.WorkerNonceEnvironmentVariableName]);
    Assert.Equal(
        "2000",
        startInfo.Environment[WorkerProcessLauncher.WorkerPipeConnectAttemptTimeoutEnvironmentVariableName]);

    // The secret travels by environment only; it must never show up in the command line.
    Assert.DoesNotContain(Nonce, handle.CommandLine.ToString(), StringComparison.Ordinal);
    Assert.DoesNotContain(Nonce, string.Join(" ", handle.CommandLine.Arguments), StringComparison.Ordinal);

    Assert.False(reservation.DisposeCalled);
    Assert.Equal(0, gatewayMetrics.GetSnapshot().WorkersRunning);
}
/// <summary>
/// When the startup probe throws, the launch must fail with
/// <see cref="WorkerProcessLaunchErrorCode.StartupFailed"/>, the worker must be killed and disposed,
/// the pipe reservation must be disposed, and one worker kill must be recorded in the metrics.
/// </summary>
[Fact]
public async Task LaunchAsync_WhenStartupProbeFails_KillsAndDisposesWorker()
{
    using TestDirectory workspace = TestDirectory.Create();
    string workerPath = workspace.CreateWorkerExecutable(machine: 0x014c);
    var fakeProcess = new FakeWorkerProcess(processId: 1234);
    var reservation = new FakePipeReservation();
    var gatewayMetrics = new GatewayMetrics();
    var factory = new FakeWorkerProcessFactory(fakeProcess);
    WorkerProcessLauncher launcher = CreateLauncher(workerPath, factory, new FailingStartupProbe(), gatewayMetrics);

    var failure = await Assert.ThrowsAsync<WorkerProcessLaunchException>(
        async () => await launcher.LaunchAsync(CreateRequest(reservation)));

    Assert.Equal(WorkerProcessLaunchErrorCode.StartupFailed, failure.ErrorCode);
    Assert.True(fakeProcess.KillCalled);
    Assert.True(fakeProcess.DisposeCalled);
    Assert.True(reservation.DisposeCalled);
    Assert.Equal(1, gatewayMetrics.GetSnapshot().WorkerKills);
}
/// <summary>
/// With a probe that fails once and then succeeds, and two retry attempts allowed, the launch must
/// succeed on the same process: exactly one factory start, no kill, and a single retry recorded
/// under the "worker_startup" area.
/// </summary>
[Fact]
public async Task LaunchAsync_WhenStartupProbeFailsTransiently_RetriesWithoutRespawningWorker()
{
    using TestDirectory workspace = TestDirectory.Create();
    string workerPath = workspace.CreateWorkerExecutable(machine: 0x014c);
    var fakeProcess = new FakeWorkerProcess(processId: 1234);
    var factory = new FakeWorkerProcessFactory(fakeProcess);
    var gatewayMetrics = new GatewayMetrics();
    var flakyProbe = new TransientStartupProbe(failuresBeforeSuccess: 1);
    WorkerProcessLauncher launcher = CreateLauncher(
        workerPath,
        factory,
        flakyProbe,
        gatewayMetrics,
        startupProbeRetryAttempts: 2,
        startupProbeRetryDelayMilliseconds: 1);

    using WorkerProcessHandle handle = await launcher.LaunchAsync(CreateRequest());

    Assert.Same(fakeProcess, handle.Process);
    Assert.Equal(1, factory.StartCount);
    Assert.False(fakeProcess.KillCalled);

    GatewayMetricsSnapshot recorded = gatewayMetrics.GetSnapshot();
    Assert.Equal(1, recorded.RetryAttempts);
    Assert.Equal(1, recorded.RetryAttemptsByArea["worker_startup"]);
}
/// <summary>
/// With a one-second startup timeout and a probe that only returns when cancelled, the launch must
/// fail with <see cref="WorkerProcessLaunchErrorCode.StartupTimeout"/>, kill and dispose the worker,
/// and record one worker kill.
/// </summary>
[Fact]
public async Task LaunchAsync_WhenStartupTimesOut_KillsAndDisposesWorker()
{
    using TestDirectory workspace = TestDirectory.Create();
    string workerPath = workspace.CreateWorkerExecutable(machine: 0x014c);
    var fakeProcess = new FakeWorkerProcess(processId: 1234);
    var gatewayMetrics = new GatewayMetrics();
    var factory = new FakeWorkerProcessFactory(fakeProcess);
    WorkerProcessLauncher launcher = CreateLauncher(
        workerPath,
        factory,
        new WaitingStartupProbe(),
        gatewayMetrics,
        startupTimeoutSeconds: 1);

    var failure = await Assert.ThrowsAsync<WorkerProcessLaunchException>(
        async () => await launcher.LaunchAsync(CreateRequest()));

    Assert.Equal(WorkerProcessLaunchErrorCode.StartupTimeout, failure.ErrorCode);
    Assert.True(fakeProcess.KillCalled);
    Assert.True(fakeProcess.DisposeCalled);
    Assert.Equal(1, gatewayMetrics.GetSnapshot().WorkerKills);
}
/// <summary>
/// Pointing the launcher at a path that was never created must fail with
/// <see cref="WorkerProcessLaunchErrorCode.ExecutableNotFound"/> without the process factory
/// ever being asked to start anything.
/// </summary>
[Fact]
public async Task LaunchAsync_WhenExecutableDoesNotExist_FailsBeforeStartingProcess()
{
    using TestDirectory workspace = TestDirectory.Create();
    string absentWorkerPath = Path.Combine(workspace.Path, "missing-worker.exe");
    var factory = new FakeWorkerProcessFactory(new FakeWorkerProcess(processId: 1234));
    WorkerProcessLauncher launcher = CreateLauncher(absentWorkerPath, factory, new SucceedingStartupProbe());

    var failure = await Assert.ThrowsAsync<WorkerProcessLaunchException>(
        async () => await launcher.LaunchAsync(CreateRequest()));

    Assert.Equal(WorkerProcessLaunchErrorCode.ExecutableNotFound, failure.ErrorCode);
    Assert.Null(factory.LastStartInfo);
}
/// <summary>
/// A worker image stamped with machine 0x8664 while the launcher requires X86 must fail with
/// <see cref="WorkerProcessLaunchErrorCode.InvalidExecutable"/> without the process factory
/// ever being asked to start anything.
/// </summary>
[Fact]
public async Task LaunchAsync_WhenExecutableArchitectureDoesNotMatch_FailsBeforeStartingProcess()
{
    using TestDirectory workspace = TestDirectory.Create();
    string wrongArchWorkerPath = workspace.CreateWorkerExecutable(machine: 0x8664);
    var factory = new FakeWorkerProcessFactory(new FakeWorkerProcess(processId: 1234));
    WorkerProcessLauncher launcher = CreateLauncher(wrongArchWorkerPath, factory, new SucceedingStartupProbe());

    var failure = await Assert.ThrowsAsync<WorkerProcessLaunchException>(
        async () => await launcher.LaunchAsync(CreateRequest()));

    Assert.Equal(WorkerProcessLaunchErrorCode.InvalidExecutable, failure.ErrorCode);
    Assert.Null(factory.LastStartInfo);
}
/// <summary>
/// When the started process already reports <c>HasExited</c> (exit code 42) and the real
/// <see cref="WorkerProcessStartedProbe"/> is used, the launch must fail with
/// <see cref="WorkerProcessLaunchErrorCode.StartupFailed"/>, dispose the process, and not call Kill.
/// </summary>
[Fact]
public async Task LaunchAsync_WhenWorkerAlreadyExited_FailsAndDisposesWorkerWithoutKill()
{
    using TestDirectory workspace = TestDirectory.Create();
    string workerPath = workspace.CreateWorkerExecutable(machine: 0x014c);
    var deadProcess = new FakeWorkerProcess(processId: 1234)
    {
        HasExited = true,
        ExitCode = 42,
    };
    var factory = new FakeWorkerProcessFactory(deadProcess);
    WorkerProcessLauncher launcher = CreateLauncher(workerPath, factory, new WorkerProcessStartedProbe());

    var failure = await Assert.ThrowsAsync<WorkerProcessLaunchException>(
        async () => await launcher.LaunchAsync(CreateRequest()));

    Assert.Equal(WorkerProcessLaunchErrorCode.StartupFailed, failure.ErrorCode);
    Assert.False(deadProcess.KillCalled);
    Assert.True(deadProcess.DisposeCalled);
}
/// <summary>
/// Builds a <see cref="WorkerProcessLauncher"/> whose worker options point at
/// <paramref name="executablePath"/>, require the X86 architecture, and use the given timeout and
/// retry settings.
/// </summary>
/// <param name="executablePath">Worker executable path placed in the worker options.</param>
/// <param name="processFactory">Factory the launcher uses to start the worker process.</param>
/// <param name="startupProbe">Probe the launcher uses to wait for worker readiness.</param>
/// <param name="metrics">Metrics sink; a fresh <see cref="GatewayMetrics"/> is used when null.</param>
/// <param name="startupTimeoutSeconds">Value for <c>StartupTimeoutSeconds</c>.</param>
/// <param name="startupProbeRetryAttempts">Value for <c>StartupProbeRetryAttempts</c>.</param>
/// <param name="startupProbeRetryDelayMilliseconds">Value for <c>StartupProbeRetryDelayMilliseconds</c>.</param>
/// <returns>The configured launcher.</returns>
private static WorkerProcessLauncher CreateLauncher(
    string executablePath,
    IWorkerProcessFactory processFactory,
    IWorkerStartupProbe startupProbe,
    GatewayMetrics? metrics = null,
    int startupTimeoutSeconds = 30,
    int startupProbeRetryAttempts = 3,
    int startupProbeRetryDelayMilliseconds = 250)
{
    var workerOptions = new WorkerOptions
    {
        ExecutablePath = executablePath,
        RequiredArchitecture = WorkerArchitecture.X86,
        StartupTimeoutSeconds = startupTimeoutSeconds,
        StartupProbeRetryAttempts = startupProbeRetryAttempts,
        StartupProbeRetryDelayMilliseconds = startupProbeRetryDelayMilliseconds,
    };
    var gatewayOptions = new GatewayOptions { Worker = workerOptions };

    return new WorkerProcessLauncher(
        Options.Create(gatewayOptions),
        processFactory,
        startupProbe,
        metrics ?? new GatewayMetrics());
}
/// <summary>
/// Builds a launch request from the test constants (session id, pipe name, nonce) and the current
/// worker protocol version.
/// </summary>
/// <param name="pipeReservation">Optional reservation passed through to the request.</param>
/// <returns>A new launch request.</returns>
private static WorkerProcessLaunchRequest CreateRequest(IDisposable? pipeReservation = null) =>
    new WorkerProcessLaunchRequest(
        SessionId,
        PipeName,
        GatewayContractInfo.WorkerProtocolVersion,
        Nonce,
        pipeReservation);
/// <summary>
/// Process factory double that always returns the process it was constructed with and remembers
/// how often, and with which start info, it was asked to start.
/// </summary>
private sealed class FakeWorkerProcessFactory : IWorkerProcessFactory
{
    private readonly IWorkerProcess _process;

    /// <summary>Creates a factory that hands out <paramref name="process"/> on every start.</summary>
    /// <param name="process">The process instance returned by <see cref="Start"/>.</param>
    public FakeWorkerProcessFactory(IWorkerProcess process)
    {
        _process = process;
    }

    /// <summary>Gets the start info from the latest <see cref="Start"/> call, or null if never called.</summary>
    public ProcessStartInfo? LastStartInfo { get; private set; }

    /// <summary>Gets how many times <see cref="Start"/> has been called.</summary>
    public int StartCount { get; private set; }

    /// <inheritdoc />
    public IWorkerProcess Start(ProcessStartInfo startInfo)
    {
        StartCount += 1;
        LastStartInfo = startInfo;
        return _process;
    }
}
/// <summary>
/// Worker process double with a fixed id that records whether Kill and Dispose were called and
/// lets tests preset the exited state and exit code.
/// </summary>
private sealed class FakeWorkerProcess : IWorkerProcess
{
    /// <summary>Creates a fake process that reports <paramref name="processId"/> as its id.</summary>
    /// <param name="processId">Value exposed through <see cref="Id"/>.</param>
    public FakeWorkerProcess(int processId)
    {
        Id = processId;
    }

    /// <inheritdoc />
    public int Id { get; }

    /// <summary>Gets or sets whether the process reports itself as exited.</summary>
    public bool HasExited { get; set; }

    /// <summary>Gets or sets the exit code the process reports.</summary>
    public int? ExitCode { get; set; }

    /// <summary>Gets whether <see cref="Dispose"/> has been called.</summary>
    public bool DisposeCalled { get; private set; }

    /// <summary>Gets whether <see cref="Kill"/> has been called.</summary>
    public bool KillCalled { get; private set; }

    /// <inheritdoc />
    public ValueTask WaitForExitAsync(CancellationToken cancellationToken) => ValueTask.CompletedTask;

    /// <inheritdoc />
    public void Kill(bool entireProcessTree)
    {
        // Fail the test if the caller does not ask for the whole process tree.
        Assert.True(entireProcessTree);

        KillCalled = true;
        HasExited = true;
    }

    /// <inheritdoc />
    public void Dispose() => DisposeCalled = true;
}
/// <summary>Startup probe double that reports readiness at once by returning a completed task.</summary>
private sealed class SucceedingStartupProbe : IWorkerStartupProbe
{
    /// <inheritdoc />
    public Task WaitUntilReadyAsync(
        IWorkerProcess process,
        WorkerProcessLaunchRequest request,
        CancellationToken cancellationToken) => Task.CompletedTask;
}
/// <summary>
/// Startup probe double that fails on every call by throwing
/// <see cref="InvalidOperationException"/> directly from the (non-async) method.
/// </summary>
private sealed class FailingStartupProbe : IWorkerStartupProbe
{
    /// <inheritdoc />
    public Task WaitUntilReadyAsync(
        IWorkerProcess process,
        WorkerProcessLaunchRequest request,
        CancellationToken cancellationToken) =>
        throw new InvalidOperationException("Fake worker startup failed.");
}
/// <summary>
/// Startup probe double that never reports readiness: it awaits an infinite delay that can only
/// end through <c>cancellationToken</c>.
/// </summary>
private sealed class WaitingStartupProbe : IWorkerStartupProbe
{
    /// <inheritdoc />
    public async Task WaitUntilReadyAsync(
        IWorkerProcess process,
        WorkerProcessLaunchRequest request,
        CancellationToken cancellationToken)
    {
        Task untilCancelled = Task.Delay(Timeout.Infinite, cancellationToken);
        await untilCancelled;
    }
}
/// <summary>
/// Startup probe double that throws <see cref="IOException"/> for the first N calls and returns a
/// completed task afterwards. Calls are counted with <see cref="Interlocked.Increment(ref int)"/>.
/// </summary>
private sealed class TransientStartupProbe : IWorkerStartupProbe
{
    private readonly int _failuresBeforeSuccess;
    private int _attempts;

    /// <summary>Creates a probe that fails the given number of times before succeeding.</summary>
    /// <param name="failuresBeforeSuccess">How many initial calls throw.</param>
    public TransientStartupProbe(int failuresBeforeSuccess)
    {
        _failuresBeforeSuccess = failuresBeforeSuccess;
    }

    /// <inheritdoc />
    public Task WaitUntilReadyAsync(
        IWorkerProcess process,
        WorkerProcessLaunchRequest request,
        CancellationToken cancellationToken)
    {
        int attemptNumber = Interlocked.Increment(ref _attempts);
        bool pastFailureWindow = attemptNumber > _failuresBeforeSuccess;
        if (pastFailureWindow)
        {
            return Task.CompletedTask;
        }

        throw new IOException("The worker pipe was not ready yet.");
    }
}
/// <summary>Pipe reservation double whose only job is to remember that it was disposed.</summary>
private sealed class FakePipeReservation : IDisposable
{
    private bool _disposeCalled;

    /// <summary>Gets whether <see cref="Dispose"/> has been called at least once.</summary>
    public bool DisposeCalled => _disposeCalled;

    /// <inheritdoc />
    public void Dispose() => _disposeCalled = true;
}
/// <summary>
/// Test helper that creates a uniquely named directory under the system temp path, can place a fake
/// worker executable in it, and removes the directory again on dispose (best-effort).
/// </summary>
private sealed class TestDirectory : IDisposable
{
    private TestDirectory(string path)
    {
        Path = path;
    }

    /// <summary>Gets the path to the temporary test directory.</summary>
    public string Path { get; }

    /// <summary>Creates a new temporary directory for testing.</summary>
    /// <returns>A helper owning the newly created directory.</returns>
    public static TestDirectory Create()
    {
        string path = System.IO.Path.Combine(System.IO.Path.GetTempPath(), $"mxgateway-tests-{Guid.NewGuid():N}");
        Directory.CreateDirectory(path);
        return new TestDirectory(path);
    }

    /// <summary>Creates a fake PE executable with the specified machine architecture for testing.</summary>
    /// <param name="machine">PE machine type constant (0x014c for x86, 0x8664 for x64).</param>
    /// <returns>Full path to the created executable file.</returns>
    public string CreateWorkerExecutable(ushort machine)
    {
        const int peHeaderOffset = 0x80;

        string path = System.IO.Path.Combine(Path, "ZB.MOM.WW.MxGateway.Worker.exe");
        byte[] bytes = new byte[0x100];
        bytes[0] = (byte)'M';
        bytes[1] = (byte)'Z';

        // PE fields are little-endian on disk; write them explicitly so the fixture does not
        // depend on the byte order of the machine running the tests.
        System.Buffers.Binary.BinaryPrimitives.WriteInt32LittleEndian(
            bytes.AsSpan(0x3c, sizeof(int)),
            peHeaderOffset);
        bytes[peHeaderOffset] = (byte)'P';
        bytes[peHeaderOffset + 1] = (byte)'E';
        bytes[peHeaderOffset + 2] = 0;
        bytes[peHeaderOffset + 3] = 0;
        System.Buffers.Binary.BinaryPrimitives.WriteUInt16LittleEndian(
            bytes.AsSpan(peHeaderOffset + 4, sizeof(ushort)),
            machine);

        File.WriteAllBytes(path, bytes);
        return path;
    }

    /// <inheritdoc />
    public void Dispose()
    {
        try
        {
            Directory.Delete(Path, recursive: true);
        }
        catch (Exception ex) when (ex is IOException or UnauthorizedAccessException)
        {
            // Best-effort cleanup. An exception thrown here while a `using` scope unwinds would
            // replace the assertion failure that is the real test result. IOException also covers
            // DirectoryNotFoundException, so disposing twice is harmless.
        }
    }
}
}