210 lines
8.5 KiB
C#
210 lines
8.5 KiB
C#
using MxGateway.Contracts;
|
|
using MxGateway.Contracts.Proto;
|
|
using MxGateway.Server.Workers;
|
|
using MxGateway.Tests.Gateway.Workers.Fakes;
|
|
|
|
namespace MxGateway.Tests.Gateway.Workers;
|
|
|
|
public sealed class FakeWorkerHarnessTests
|
|
{
|
|
private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(5);
|
|
|
|
[Fact]
|
|
public async Task CompleteStartupAsync_WithHelloAndReady_TransitionsClientToReady()
|
|
{
|
|
await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync();
|
|
await using WorkerClient client = fakeWorker.CreateClient();
|
|
|
|
Task startTask = client.StartAsync(CancellationToken.None);
|
|
WorkerEnvelope gatewayHello = await fakeWorker.CompleteStartupAsync();
|
|
await startTask.WaitAsync(TestTimeout);
|
|
|
|
Assert.Equal(WorkerEnvelope.BodyOneofCase.GatewayHello, gatewayHello.BodyCase);
|
|
Assert.Equal(FakeWorkerHarness.DefaultNonce, gatewayHello.GatewayHello.Nonce);
|
|
Assert.Equal(WorkerClientState.Ready, client.State);
|
|
Assert.Equal(FakeWorkerHarness.DefaultWorkerProcessId, client.ProcessId);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task StartAsync_WithProtocolMismatch_FailsStartup()
|
|
{
|
|
await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync();
|
|
await using WorkerClient client = fakeWorker.CreateClient();
|
|
|
|
Task startTask = client.StartAsync(CancellationToken.None);
|
|
WorkerEnvelope gatewayHello = await fakeWorker.ReadGatewayEnvelopeAsync();
|
|
Assert.Equal(WorkerEnvelope.BodyOneofCase.GatewayHello, gatewayHello.BodyCase);
|
|
await fakeWorker.SendWorkerHelloAsync(
|
|
workerProtocolVersion: GatewayContractInfo.WorkerProtocolVersion + 1);
|
|
|
|
WorkerClientException exception = await Assert.ThrowsAsync<WorkerClientException>(
|
|
async () => await startTask.WaitAsync(TestTimeout));
|
|
|
|
Assert.Equal(WorkerClientErrorCode.ProtocolViolation, exception.ErrorCode);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task InvokeAsync_WithScriptedReply_CompletesCommand()
|
|
{
|
|
await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync();
|
|
await using WorkerClient client = fakeWorker.CreateClient();
|
|
await StartClientAsync(fakeWorker, client);
|
|
|
|
Task<WorkerCommandReply> invokeTask = client.InvokeAsync(
|
|
CreateCommand(MxCommandKind.Ping),
|
|
TestTimeout,
|
|
CancellationToken.None);
|
|
WorkerEnvelope commandEnvelope = await fakeWorker.ReadCommandAsync();
|
|
await fakeWorker.ReplyToCommandAsync(commandEnvelope);
|
|
|
|
WorkerCommandReply reply = await invokeTask.WaitAsync(TestTimeout);
|
|
|
|
Assert.Equal(commandEnvelope.CorrelationId, reply.Reply.CorrelationId);
|
|
Assert.Equal(MxCommandKind.Ping, reply.Reply.Kind);
|
|
Assert.Equal(ProtocolStatusCode.Ok, reply.Reply.ProtocolStatus.Code);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task ReadEventsAsync_WithScriptedEvents_YieldsOrderedEvents()
|
|
{
|
|
await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync();
|
|
await using WorkerClient client = fakeWorker.CreateClient();
|
|
await StartClientAsync(fakeWorker, client);
|
|
using CancellationTokenSource cancellationTokenSource = new(TestTimeout);
|
|
|
|
await using IAsyncEnumerator<WorkerEvent> events =
|
|
client.ReadEventsAsync(cancellationTokenSource.Token).GetAsyncEnumerator(cancellationTokenSource.Token);
|
|
|
|
await fakeWorker.EmitEventAsync(MxEventFamily.OnDataChange, cancellationTokenSource.Token);
|
|
await fakeWorker.EmitEventAsync(MxEventFamily.OperationComplete, cancellationTokenSource.Token);
|
|
|
|
Assert.True(await events.MoveNextAsync());
|
|
Assert.Equal((ulong)3, events.Current.Event.WorkerSequence);
|
|
Assert.Equal(MxEventFamily.OnDataChange, events.Current.Event.Family);
|
|
|
|
Assert.True(await events.MoveNextAsync());
|
|
Assert.Equal((ulong)4, events.Current.Event.WorkerSequence);
|
|
Assert.Equal(MxEventFamily.OperationComplete, events.Current.Event.Family);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task ReadLoop_WithScriptedFault_FaultsClient()
|
|
{
|
|
await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync();
|
|
await using WorkerClient client = fakeWorker.CreateClient();
|
|
await StartClientAsync(fakeWorker, client);
|
|
|
|
await fakeWorker.EmitFaultAsync(
|
|
WorkerFaultCategory.MxaccessCommandFailed,
|
|
"scripted MXAccess command fault");
|
|
|
|
await WaitUntilAsync(
|
|
() => client.State == WorkerClientState.Faulted,
|
|
TestTimeout);
|
|
|
|
Assert.Equal(WorkerClientState.Faulted, client.State);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task SendHeartbeatAsync_UpdatesClientHeartbeatState()
|
|
{
|
|
await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync();
|
|
await using WorkerClient client = fakeWorker.CreateClient();
|
|
await StartClientAsync(fakeWorker, client);
|
|
DateTimeOffset previousHeartbeat = client.LastHeartbeatAt;
|
|
|
|
await Task.Delay(TimeSpan.FromMilliseconds(20));
|
|
await fakeWorker.SendHeartbeatAsync(
|
|
configureHeartbeat: heartbeat => heartbeat.WorkerProcessId = 2468);
|
|
|
|
await WaitUntilAsync(
|
|
() => client.ProcessId == 2468 && client.LastHeartbeatAt > previousHeartbeat,
|
|
TestTimeout);
|
|
|
|
Assert.Equal(WorkerClientState.Ready, client.State);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task InvokeAsync_WithHungWorker_TimesOutPendingCommand()
|
|
{
|
|
await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync();
|
|
await using WorkerClient client = fakeWorker.CreateClient();
|
|
await StartClientAsync(fakeWorker, client);
|
|
|
|
Task<WorkerCommandReply> invokeTask = client.InvokeAsync(
|
|
CreateCommand(MxCommandKind.Ping),
|
|
TimeSpan.FromMilliseconds(50),
|
|
CancellationToken.None);
|
|
WorkerEnvelope commandEnvelope = await fakeWorker.ReadCommandAsync();
|
|
|
|
WorkerClientException exception = await Assert.ThrowsAsync<WorkerClientException>(
|
|
async () => await invokeTask.WaitAsync(TestTimeout));
|
|
|
|
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerCommand, commandEnvelope.BodyCase);
|
|
Assert.Equal(WorkerClientErrorCode.CommandTimeout, exception.ErrorCode);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task ReadLoop_WithMalformedFrame_FaultsClient()
|
|
{
|
|
await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync();
|
|
await using WorkerClient client = fakeWorker.CreateClient();
|
|
await StartClientAsync(fakeWorker, client);
|
|
|
|
await fakeWorker.WriteMalformedPayloadAsync(new byte[] { 0x08, 0x96, 0x01 });
|
|
|
|
await WaitUntilAsync(
|
|
() => client.State == WorkerClientState.Faulted,
|
|
TestTimeout);
|
|
|
|
Assert.Equal(WorkerClientState.Faulted, client.State);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task ShutdownAsync_WithShutdownAck_ClosesClient()
|
|
{
|
|
await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync();
|
|
await using WorkerClient client = fakeWorker.CreateClient();
|
|
await StartClientAsync(fakeWorker, client);
|
|
|
|
Task shutdownTask = client.ShutdownAsync(TestTimeout, CancellationToken.None);
|
|
WorkerEnvelope shutdownEnvelope = await fakeWorker.ReadShutdownAsync();
|
|
await fakeWorker.SendShutdownAckAsync();
|
|
await shutdownTask.WaitAsync(TestTimeout);
|
|
|
|
Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerShutdown, shutdownEnvelope.BodyCase);
|
|
Assert.Equal(WorkerClientState.Closed, client.State);
|
|
}
|
|
|
|
private static async Task StartClientAsync(
|
|
FakeWorkerHarness fakeWorker,
|
|
WorkerClient client)
|
|
{
|
|
Task startTask = client.StartAsync(CancellationToken.None);
|
|
await fakeWorker.CompleteStartupAsync().ConfigureAwait(false);
|
|
await startTask.WaitAsync(TestTimeout).ConfigureAwait(false);
|
|
}
|
|
|
|
private static WorkerCommand CreateCommand(MxCommandKind kind)
|
|
{
|
|
return new WorkerCommand
|
|
{
|
|
Command = new MxCommand
|
|
{
|
|
Kind = kind,
|
|
},
|
|
};
|
|
}
|
|
|
|
private static async Task WaitUntilAsync(
|
|
Func<bool> predicate,
|
|
TimeSpan timeout)
|
|
{
|
|
using CancellationTokenSource cancellationTokenSource = new(timeout);
|
|
while (!predicate())
|
|
{
|
|
await Task.Delay(TimeSpan.FromMilliseconds(10), cancellationTokenSource.Token);
|
|
}
|
|
}
|
|
}
|