using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Workers;
using MxGateway.Tests.Gateway.Workers.Fakes;

namespace MxGateway.Tests.Gateway.Workers;

/// <summary>
/// Exercises <see cref="WorkerClient"/> against a scripted <see cref="FakeWorkerHarness"/>,
/// covering startup, command invocation, event streaming, heartbeats, faults and shutdown.
/// </summary>
public sealed class FakeWorkerHarnessTests
{
    /// <summary>Upper bound applied to every awaited step so a broken test fails instead of hanging.</summary>
    private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(5);

    /// <summary>
    /// Completing the scripted startup exchange leaves the client in the Ready state with the
    /// harness' default process id, and the gateway hello carries the harness' default nonce.
    /// </summary>
    [Fact]
    public async Task CompleteStartupAsync_WithHelloAndReady_TransitionsClientToReady()
    {
        await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync();
        await using WorkerClient client = fakeWorker.CreateClient();

        Task startTask = client.StartAsync(CancellationToken.None);
        WorkerEnvelope gatewayHello = await fakeWorker.CompleteStartupAsync();
        await startTask.WaitAsync(TestTimeout);

        Assert.Equal(WorkerEnvelope.BodyOneofCase.GatewayHello, gatewayHello.BodyCase);
        Assert.Equal(FakeWorkerHarness.DefaultNonce, gatewayHello.GatewayHello.Nonce);
        Assert.Equal(WorkerClientState.Ready, client.State);
        Assert.Equal(FakeWorkerHarness.DefaultWorkerProcessId, client.ProcessId);
    }

    /// <summary>
    /// A worker hello advertising a protocol version other than the gateway's makes startup
    /// fail with <see cref="WorkerClientErrorCode.ProtocolViolation"/>.
    /// </summary>
    [Fact]
    public async Task StartAsync_WithProtocolMismatch_FailsStartup()
    {
        await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync();
        await using WorkerClient client = fakeWorker.CreateClient();

        Task startTask = client.StartAsync(CancellationToken.None);
        WorkerEnvelope gatewayHello = await fakeWorker.ReadGatewayEnvelopeAsync();
        Assert.Equal(WorkerEnvelope.BodyOneofCase.GatewayHello, gatewayHello.BodyCase);

        // Reply with a version one higher than the gateway's own to force the mismatch.
        await fakeWorker.SendWorkerHelloAsync(
            workerProtocolVersion: GatewayContractInfo.WorkerProtocolVersion + 1);

        WorkerClientException exception = await Assert.ThrowsAsync<WorkerClientException>(
            async () => await startTask.WaitAsync(TestTimeout));
        Assert.Equal(WorkerClientErrorCode.ProtocolViolation, exception.ErrorCode);
    }

    /// <summary>
    /// A command answered by the fake worker completes with a reply that echoes the command's
    /// correlation id and kind and reports an Ok protocol status.
    /// </summary>
    [Fact]
    public async Task InvokeAsync_WithScriptedReply_CompletesCommand()
    {
        await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync();
        await using WorkerClient client = fakeWorker.CreateClient();
        await StartClientAsync(fakeWorker, client);

        Task<WorkerCommandReply> invokeTask = client.InvokeAsync(
            CreateCommand(MxCommandKind.Ping),
            TestTimeout,
            CancellationToken.None);
        WorkerEnvelope commandEnvelope = await fakeWorker.ReadCommandAsync();
        await fakeWorker.ReplyToCommandAsync(commandEnvelope);
        WorkerCommandReply reply = await invokeTask.WaitAsync(TestTimeout);

        Assert.Equal(commandEnvelope.CorrelationId, reply.Reply.CorrelationId);
        Assert.Equal(MxCommandKind.Ping, reply.Reply.Kind);
        Assert.Equal(ProtocolStatusCode.Ok, reply.Reply.ProtocolStatus.Code);
    }

    /// <summary>
    /// Events emitted by the fake worker are surfaced by <c>ReadEventsAsync</c> in emission order.
    /// </summary>
    [Fact]
    public async Task ReadEventsAsync_WithScriptedEvents_YieldsOrderedEvents()
    {
        await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync();
        await using WorkerClient client = fakeWorker.CreateClient();
        await StartClientAsync(fakeWorker, client);

        // The token bounds both the emit calls and the enumeration to TestTimeout.
        using CancellationTokenSource cancellationTokenSource = new(TestTimeout);

        // `var` lets the compiler infer the project-declared element type of the event stream.
        await using var events = client
            .ReadEventsAsync(cancellationTokenSource.Token)
            .GetAsyncEnumerator(cancellationTokenSource.Token);

        await fakeWorker.EmitEventAsync(MxEventFamily.OnDataChange, cancellationTokenSource.Token);
        await fakeWorker.EmitEventAsync(MxEventFamily.OperationComplete, cancellationTokenSource.Token);

        // NOTE(review): sequences 3 and 4 presumably follow the envelopes the fake worker sent
        // during startup - confirm against FakeWorkerHarness.
        Assert.True(await events.MoveNextAsync());
        Assert.Equal((ulong)3, events.Current.Event.WorkerSequence);
        Assert.Equal(MxEventFamily.OnDataChange, events.Current.Event.Family);

        Assert.True(await events.MoveNextAsync());
        Assert.Equal((ulong)4, events.Current.Event.WorkerSequence);
        Assert.Equal(MxEventFamily.OperationComplete, events.Current.Event.Family);
    }

    /// <summary>
    /// A fault envelope emitted by the fake worker moves the client into the Faulted state.
    /// </summary>
    [Fact]
    public async Task ReadLoop_WithScriptedFault_FaultsClient()
    {
        await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync();
        await using WorkerClient client = fakeWorker.CreateClient();
        await StartClientAsync(fakeWorker, client);

        await fakeWorker.EmitFaultAsync(
            WorkerFaultCategory.MxaccessCommandFailed,
            "scripted MXAccess command fault");

        // The state change is observed by polling rather than through an awaitable.
        await WaitUntilAsync(
            () => client.State == WorkerClientState.Faulted,
            TestTimeout);
        Assert.Equal(WorkerClientState.Faulted, client.State);
    }

    /// <summary>
    /// A heartbeat from the fake worker updates the client's process id and last-heartbeat
    /// timestamp while the client stays Ready.
    /// </summary>
    [Fact]
    public async Task SendHeartbeatAsync_UpdatesClientHeartbeatState()
    {
        await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync();
        await using WorkerClient client = fakeWorker.CreateClient();
        await StartClientAsync(fakeWorker, client);

        DateTimeOffset previousHeartbeat = client.LastHeartbeatAt;

        // Short pause so the next heartbeat timestamp can be strictly later than the previous one.
        await Task.Delay(TimeSpan.FromMilliseconds(20));
        await fakeWorker.SendHeartbeatAsync(
            configureHeartbeat: heartbeat => heartbeat.WorkerProcessId = 2468);

        await WaitUntilAsync(
            () => client.ProcessId == 2468 && client.LastHeartbeatAt > previousHeartbeat,
            TestTimeout);
        Assert.Equal(WorkerClientState.Ready, client.State);
    }

    /// <summary>
    /// A command the fake worker reads but never answers fails with
    /// <see cref="WorkerClientErrorCode.CommandTimeout"/>.
    /// </summary>
    [Fact]
    public async Task InvokeAsync_WithHungWorker_TimesOutPendingCommand()
    {
        await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync();
        await using WorkerClient client = fakeWorker.CreateClient();
        await StartClientAsync(fakeWorker, client);

        // The command timeout (50 ms) is far shorter than TestTimeout, so the command's own
        // timeout fires well before the outer WaitAsync guard.
        Task<WorkerCommandReply> invokeTask = client.InvokeAsync(
            CreateCommand(MxCommandKind.Ping),
            TimeSpan.FromMilliseconds(50),
            CancellationToken.None);
        WorkerEnvelope commandEnvelope = await fakeWorker.ReadCommandAsync();

        WorkerClientException exception = await Assert.ThrowsAsync<WorkerClientException>(
            async () => await invokeTask.WaitAsync(TestTimeout));

        Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerCommand, commandEnvelope.BodyCase);
        Assert.Equal(WorkerClientErrorCode.CommandTimeout, exception.ErrorCode);
    }

    /// <summary>
    /// A malformed payload written by the fake worker moves the client into the Faulted state.
    /// </summary>
    [Fact]
    public async Task ReadLoop_WithMalformedFrame_FaultsClient()
    {
        await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync();
        await using WorkerClient client = fakeWorker.CreateClient();
        await StartClientAsync(fakeWorker, client);

        // NOTE(review): these bytes are presumably not decodable as a valid WorkerEnvelope -
        // confirm against the envelope schema.
        await fakeWorker.WriteMalformedPayloadAsync(new byte[] { 0x08, 0x96, 0x01 });

        await WaitUntilAsync(
            () => client.State == WorkerClientState.Faulted,
            TestTimeout);
        Assert.Equal(WorkerClientState.Faulted, client.State);
    }

    /// <summary>
    /// A shutdown request acknowledged by the fake worker leaves the client in the Closed state.
    /// </summary>
    [Fact]
    public async Task ShutdownAsync_WithShutdownAck_ClosesClient()
    {
        await using FakeWorkerHarness fakeWorker = await FakeWorkerHarness.CreateConnectedPairAsync();
        await using WorkerClient client = fakeWorker.CreateClient();
        await StartClientAsync(fakeWorker, client);

        Task shutdownTask = client.ShutdownAsync(TestTimeout, CancellationToken.None);
        WorkerEnvelope shutdownEnvelope = await fakeWorker.ReadShutdownAsync();
        await fakeWorker.SendShutdownAckAsync();
        await shutdownTask.WaitAsync(TestTimeout);

        Assert.Equal(WorkerEnvelope.BodyOneofCase.WorkerShutdown, shutdownEnvelope.BodyCase);
        Assert.Equal(WorkerClientState.Closed, client.State);
    }

    /// <summary>
    /// Starts <paramref name="client"/> and drives the fake worker through its startup exchange,
    /// waiting at most <see cref="TestTimeout"/> for the client's start task to finish.
    /// </summary>
    /// <param name="fakeWorker">Harness playing the worker side of the connection.</param>
    /// <param name="client">Client under test; must not have been started yet.</param>
    private static async Task StartClientAsync(
        FakeWorkerHarness fakeWorker,
        WorkerClient client)
    {
        // StartAsync cannot finish until the worker side answers, so it is started first and
        // awaited only after the harness has completed its half of the exchange.
        Task startTask = client.StartAsync(CancellationToken.None);
        await fakeWorker.CompleteStartupAsync().ConfigureAwait(false);
        await startTask.WaitAsync(TestTimeout).ConfigureAwait(false);
    }

    /// <summary>Builds a <see cref="WorkerCommand"/> wrapping an <see cref="MxCommand"/> of the given kind.</summary>
    /// <param name="kind">Kind assigned to the inner command.</param>
    /// <returns>A new command with only the kind populated.</returns>
    private static WorkerCommand CreateCommand(MxCommandKind kind)
    {
        return new WorkerCommand
        {
            Command = new MxCommand
            {
                Kind = kind,
            },
        };
    }

    /// <summary>
    /// Polls <paramref name="predicate"/> every 10 ms until it returns <see langword="true"/>
    /// or <paramref name="timeout"/> elapses.
    /// </summary>
    /// <param name="predicate">Condition to wait for.</param>
    /// <param name="timeout">Maximum time to keep polling.</param>
    /// <exception cref="TimeoutException">
    /// The condition was still false after <paramref name="timeout"/> elapsed.
    /// </exception>
    private static async Task WaitUntilAsync(
        Func<bool> predicate,
        TimeSpan timeout)
    {
        using CancellationTokenSource cancellationTokenSource = new(timeout);

        while (!predicate())
        {
            try
            {
                await Task.Delay(TimeSpan.FromMilliseconds(10), cancellationTokenSource.Token);
            }
            catch (OperationCanceledException) when (cancellationTokenSource.IsCancellationRequested)
            {
                // The condition may have become true while the final delay was being cancelled;
                // check once more so that race does not produce a spurious failure.
                if (predicate())
                {
                    return;
                }

                // Report a descriptive failure instead of a bare cancellation exception.
                throw new TimeoutException(
                    $"The expected condition was not met within {timeout}.");
            }
        }
    }
}