From fb425da00916498406d71d1e423e965d76d0fd81 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 26 Apr 2026 18:30:11 -0400 Subject: [PATCH] Add gateway fake worker end-to-end smoke --- docs/GatewayTesting.md | 7 + .../GatewayEndToEndFakeWorkerSmokeTests.cs | 439 ++++++++++++++++++ .../Workers/Fakes/FakeWorkerHarness.cs | 46 +- 3 files changed, 473 insertions(+), 19 deletions(-) create mode 100644 src/MxGateway.Tests/Gateway/GatewayEndToEndFakeWorkerSmokeTests.cs diff --git a/docs/GatewayTesting.md b/docs/GatewayTesting.md index 1e26fc7..6412a7d 100644 --- a/docs/GatewayTesting.md +++ b/docs/GatewayTesting.md @@ -28,6 +28,12 @@ Session-level tests can connect the harness to the pipe created by `WorkerClient` tests can use `CreateConnectedPairAsync` to create both pipe ends inside the test. +`GatewayEndToEndFakeWorkerSmokeTests` composes the real gRPC service, +`SessionManager`, `SessionWorkerClientFactory`, `WorkerClient`, and +`EventStreamService` with a scripted fake worker launcher. The smoke test covers +`OpenSession`, `Register`, `AddItem`, `Advise`, one streamed `OnDataChange` +event, and `CloseSession` without loading MXAccess COM. + ## Focused Commands Run the fake worker tests after changing gateway worker IPC, session startup, or @@ -36,6 +42,7 @@ event streaming behavior: ```bash dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter FullyQualifiedName~FakeWorkerHarnessTests dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter FullyQualifiedName~SessionWorkerClientFactoryFakeWorkerTests +dotnet test src/MxGateway.Tests/MxGateway.Tests.csproj --filter FullyQualifiedName~GatewayEndToEndFakeWorkerSmokeTests ``` Run the gateway test project after shared gateway test infrastructure changes: diff --git a/src/MxGateway.Tests/Gateway/GatewayEndToEndFakeWorkerSmokeTests.cs b/src/MxGateway.Tests/Gateway/GatewayEndToEndFakeWorkerSmokeTests.cs new file mode 100644 index 0000000..7374035 --- /dev/null +++ b/src/MxGateway.Tests/Gateway/GatewayEndToEndFakeWorkerSmokeTests.cs @@ -0,0 +1,439 @@ +using System.Collections.Concurrent; +using Google.Protobuf.WellKnownTypes; +using Grpc.Core; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using MxGateway.Contracts; +using MxGateway.Contracts.Proto; +using MxGateway.Server.Configuration; +using MxGateway.Server.Grpc; +using MxGateway.Server.Metrics; +using MxGateway.Server.Security.Authorization; +using MxGateway.Server.Sessions; +using MxGateway.Server.Workers; +using MxGateway.Tests.Gateway.Workers.Fakes; + +namespace MxGateway.Tests.Gateway; + +public sealed class GatewayEndToEndFakeWorkerSmokeTests +{ + private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(5); + private const int ServerHandle = 1001; + private const int ItemHandle = 2002; + + [Fact] + public async Task GatewayService_WithFakeWorker_CompletesSessionCommandEventAndClosePath() + { + ScriptedFakeWorkerProcessLauncher launcher = new(); + await using GatewayServiceFixture fixture = new(launcher); + + OpenSessionReply openReply = await fixture.Service.OpenSession( + new OpenSessionRequest + { + ClientSessionName = "fake-worker-e2e", + ClientCorrelationId = "open-correlation", + CommandTimeout = Duration.FromTimeSpan(TestTimeout), + }, + new TestServerCallContext()); + + RecordingServerStreamWriter eventWriter = new(); + Task streamTask = fixture.Service.StreamEvents( + new StreamEventsRequest { SessionId = openReply.SessionId }, + eventWriter, + new TestServerCallContext()); + + MxCommandReply registerReply = await fixture.Service.Invoke( + CreateRegisterRequest(openReply.SessionId), + new TestServerCallContext()); + MxCommandReply addItemReply = await fixture.Service.Invoke( + CreateAddItemRequest(openReply.SessionId, registerReply.Register.ServerHandle), + new TestServerCallContext()); + MxCommandReply adviseReply = await fixture.Service.Invoke( + CreateAdviseRequest(openReply.SessionId, registerReply.Register.ServerHandle, addItemReply.AddItem.ItemHandle), + new TestServerCallContext()); + + MxEvent dataChange = await eventWriter.WaitForFirstMessageAsync(TestTimeout); + + CloseSessionReply closeReply = await fixture.Service.CloseSession( + new CloseSessionRequest + { + SessionId = openReply.SessionId, + ClientCorrelationId = "close-correlation", + }, + new TestServerCallContext()); + + await streamTask.WaitAsync(TestTimeout); + await launcher.WorkerTask.WaitAsync(TestTimeout); + + Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code); + Assert.Equal(GatewayContractInfo.DefaultBackendName, openReply.BackendName); + Assert.Equal(ScriptedFakeWorkerProcessLauncher.ProcessId, openReply.WorkerProcessId); + Assert.Equal(ProtocolStatusCode.Ok, registerReply.ProtocolStatus.Code); + Assert.Equal(ServerHandle, registerReply.Register.ServerHandle); + Assert.Equal(ProtocolStatusCode.Ok, addItemReply.ProtocolStatus.Code); + Assert.Equal(ItemHandle, addItemReply.AddItem.ItemHandle); + Assert.Equal(ProtocolStatusCode.Ok, adviseReply.ProtocolStatus.Code); + Assert.Equal(MxEventFamily.OnDataChange, dataChange.Family); + Assert.Equal(openReply.SessionId, dataChange.SessionId); + Assert.Equal(ServerHandle, dataChange.ServerHandle); + Assert.Equal(ItemHandle, dataChange.ItemHandle); + Assert.Equal("scripted-value", dataChange.Value.StringValue); + Assert.Equal(ProtocolStatusCode.Ok, closeReply.ProtocolStatus.Code); + Assert.Equal(SessionState.Closed, closeReply.FinalState); + Assert.True(launcher.Process.HasExited); + Assert.Equal( + [MxCommandKind.Register, MxCommandKind.AddItem, MxCommandKind.Advise], + launcher.CommandKinds); + } + + private static MxCommandRequest CreateRegisterRequest(string sessionId) + { + return new MxCommandRequest + { + SessionId = sessionId, + ClientCorrelationId = "register-correlation", + Command = new MxCommand + { + Kind = MxCommandKind.Register, + Register = new RegisterCommand { ClientName = "fake-worker-e2e-client" }, + }, + }; + } + + private static MxCommandRequest CreateAddItemRequest( + string sessionId, + int serverHandle) + { + return new MxCommandRequest + { + SessionId = sessionId, + ClientCorrelationId = "add-item-correlation", + Command = new MxCommand + { + Kind = MxCommandKind.AddItem, + AddItem = new AddItemCommand + { + ServerHandle = serverHandle, + ItemDefinition = "Galaxy.Tag.Value", + }, + }, + }; + } + + private static MxCommandRequest CreateAdviseRequest( + string sessionId, + int serverHandle, + int itemHandle) + { + return new MxCommandRequest + { + SessionId = sessionId, + ClientCorrelationId = "advise-correlation", + Command = new MxCommand + { + Kind = MxCommandKind.Advise, + Advise = new AdviseCommand + { + ServerHandle = serverHandle, + ItemHandle = itemHandle, + }, + }, + }; + } + + private sealed class GatewayServiceFixture : IAsyncDisposable + { + private readonly GatewayMetrics _metrics = new(); + private readonly SessionRegistry _registry = new(); + + public GatewayServiceFixture(IWorkerProcessLauncher launcher) + { + IOptions options = Options.Create(CreateOptions()); + SessionWorkerClientFactory workerClientFactory = new( + launcher, + options, + _metrics, + NullLoggerFactory.Instance); + SessionManager sessionManager = new( + _registry, + workerClientFactory, + options, + _metrics, + logger: NullLogger.Instance); + MxAccessGrpcMapper mapper = new(); + EventStreamService eventStreamService = new( + sessionManager, + options, + mapper, + _metrics, + NullLogger.Instance); + + Service = new MxAccessGatewayService( + sessionManager, + new GatewayRequestIdentityAccessor(), + new MxAccessGrpcRequestValidator(), + mapper, + eventStreamService, + NullLogger.Instance); + } + + public MxAccessGatewayService Service { get; } + + public async ValueTask DisposeAsync() + { + foreach (GatewaySession session in _registry.Snapshot()) + { + await session.DisposeAsync(); + } + + _metrics.Dispose(); + } + + private static GatewayOptions CreateOptions() + { + return new GatewayOptions + { + Worker = new WorkerOptions + { + StartupTimeoutSeconds = 5, + ShutdownTimeoutSeconds = 5, + HeartbeatIntervalSeconds = 30, + HeartbeatGraceSeconds = 30, + MaxMessageBytes = WorkerFrameProtocolOptions.DefaultMaxMessageBytes, + }, + Sessions = new SessionOptions + { + DefaultCommandTimeoutSeconds = 5, + MaxSessions = 4, + }, + Events = new EventOptions + { + QueueCapacity = 16, + }, + }; + } + } + + private sealed class ScriptedFakeWorkerProcessLauncher : IWorkerProcessLauncher + { + public const int ProcessId = 4680; + private readonly ConcurrentQueue _commandKinds = new(); + + public FakeWorkerProcess Process { get; } = new(ProcessId); + + public IReadOnlyCollection CommandKinds => _commandKinds.ToArray(); + + public Task WorkerTask { get; private set; } = Task.CompletedTask; + + public Task LaunchAsync( + WorkerProcessLaunchRequest request, + CancellationToken cancellationToken = default) + { + WorkerTask = RunWorkerAsync(request, cancellationToken); + + return Task.FromResult(new WorkerProcessHandle( + Process, + new WorkerProcessCommandLine("fake-worker.exe", []), + DateTimeOffset.UtcNow)); + } + + private async Task RunWorkerAsync( + WorkerProcessLaunchRequest request, + CancellationToken cancellationToken) + { + await using FakeWorkerHarness harness = await FakeWorkerHarness.ConnectToGatewayPipeAsync( + request.SessionId, + request.Nonce, + request.PipeName, + request.ProtocolVersion, + cancellationToken: cancellationToken).ConfigureAwait(false); + await harness.CompleteStartupAsync(ProcessId, cancellationToken: cancellationToken).ConfigureAwait(false); + + while (!cancellationToken.IsCancellationRequested) + { + WorkerEnvelope envelope = await harness.ReadGatewayEnvelopeAsync(cancellationToken).ConfigureAwait(false); + if (envelope.BodyCase == WorkerEnvelope.BodyOneofCase.WorkerCommand) + { + await ReplyToCommandAsync(harness, envelope, cancellationToken).ConfigureAwait(false); + continue; + } + + if (envelope.BodyCase == WorkerEnvelope.BodyOneofCase.WorkerShutdown) + { + await harness.SendShutdownAckAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + Process.MarkExited(0); + return; + } + + throw new InvalidOperationException($"Unexpected gateway envelope {envelope.BodyCase}."); + } + } + + private async Task ReplyToCommandAsync( + FakeWorkerHarness harness, + WorkerEnvelope commandEnvelope, + CancellationToken cancellationToken) + { + MxCommand command = commandEnvelope.WorkerCommand.Command; + _commandKinds.Enqueue(command.Kind); + + await harness.ReplyToCommandAsync( + commandEnvelope, + configureReply: reply => ConfigureReply(reply, command.Kind), + cancellationToken: cancellationToken).ConfigureAwait(false); + + if (command.Kind == MxCommandKind.Advise) + { + await harness.EmitEventAsync( + MxEventFamily.OnDataChange, + cancellationToken, + mxEvent => + { + mxEvent.ServerHandle = command.Advise.ServerHandle; + mxEvent.ItemHandle = command.Advise.ItemHandle; + mxEvent.Quality = 192; + mxEvent.Value = new MxValue + { + DataType = MxDataType.String, + StringValue = "scripted-value", + }; + mxEvent.OnDataChange = new OnDataChangeEvent(); + }).ConfigureAwait(false); + } + } + + private static void ConfigureReply( + MxCommandReply reply, + MxCommandKind kind) + { + switch (kind) + { + case MxCommandKind.Register: + reply.Register = new RegisterReply { ServerHandle = ServerHandle }; + break; + case MxCommandKind.AddItem: + reply.AddItem = new AddItemReply { ItemHandle = ItemHandle }; + break; + } + } + } + + private sealed class FakeWorkerProcess(int processId) : IWorkerProcess + { + public int Id { get; } = processId; + + public bool HasExited { get; private set; } + + public int? ExitCode { get; private set; } + + public ValueTask WaitForExitAsync(CancellationToken cancellationToken) + { + HasExited = true; + ExitCode ??= 0; + return ValueTask.CompletedTask; + } + + public void Kill(bool entireProcessTree) + { + MarkExited(-1); + } + + public void Dispose() + { + } + + public void MarkExited(int exitCode) + { + HasExited = true; + ExitCode = exitCode; + } + } + + private sealed class RecordingServerStreamWriter : IServerStreamWriter + { + private readonly object _syncRoot = new(); + private readonly TaskCompletionSource _firstMessage = new(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly List _messages = []; + + public IReadOnlyList Messages + { + get + { + lock (_syncRoot) + { + return _messages.ToArray(); + } + } + } + + public WriteOptions? WriteOptions { get; set; } + + public Task WriteAsync(T message) + { + lock (_syncRoot) + { + _messages.Add(message); + } + + _firstMessage.TrySetResult(message); + return Task.CompletedTask; + } + + public async Task WaitForFirstMessageAsync(TimeSpan timeout) + { + return await _firstMessage.Task.WaitAsync(timeout).ConfigureAwait(false); + } + } + + private sealed class TestServerCallContext(CancellationToken cancellationToken = default) : ServerCallContext + { + private readonly Metadata _requestHeaders = []; + private readonly Metadata _responseTrailers = []; + private readonly Dictionary _userState = []; + private Status _status; + private WriteOptions? _writeOptions; + + protected override string MethodCore => "/mxaccess_gateway.v1.MxAccessGateway/Test"; + + protected override string HostCore => "localhost"; + + protected override string PeerCore => "ipv4:127.0.0.1:5000"; + + protected override DateTime DeadlineCore => DateTime.UtcNow.AddMinutes(1); + + protected override Metadata RequestHeadersCore => _requestHeaders; + + protected override CancellationToken CancellationTokenCore => cancellationToken; + + protected override Metadata ResponseTrailersCore => _responseTrailers; + + protected override Status StatusCore + { + get => _status; + set => _status = value; + } + + protected override WriteOptions? WriteOptionsCore + { + get => _writeOptions; + set => _writeOptions = value; + } + + protected override AuthContext AuthContextCore { get; } = new( + string.Empty, + new Dictionary>(StringComparer.Ordinal)); + + protected override IDictionary UserStateCore => _userState; + + protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders) + { + return Task.CompletedTask; + } + + protected override ContextPropagationToken CreatePropagationTokenCore( + ContextPropagationOptions? options) + { + throw new NotSupportedException(); + } + } +} diff --git a/src/MxGateway.Tests/Gateway/Workers/Fakes/FakeWorkerHarness.cs b/src/MxGateway.Tests/Gateway/Workers/Fakes/FakeWorkerHarness.cs index dd501f1..5981618 100644 --- a/src/MxGateway.Tests/Gateway/Workers/Fakes/FakeWorkerHarness.cs +++ b/src/MxGateway.Tests/Gateway/Workers/Fakes/FakeWorkerHarness.cs @@ -205,6 +205,7 @@ public sealed class FakeWorkerHarness : IAsyncDisposable WorkerEnvelope commandEnvelope, ProtocolStatusCode statusCode = ProtocolStatusCode.Ok, string statusMessage = "OK", + Action? configureReply = null, CancellationToken cancellationToken = default) { if (commandEnvelope.BodyCase != WorkerEnvelope.BodyOneofCase.WorkerCommand) @@ -213,22 +214,25 @@ public sealed class FakeWorkerHarness : IAsyncDisposable } MxCommandKind kind = commandEnvelope.WorkerCommand.Command?.Kind ?? MxCommandKind.Unspecified; + MxCommandReply reply = new() + { + SessionId = SessionId, + CorrelationId = commandEnvelope.CorrelationId, + Kind = kind, + ProtocolStatus = new ProtocolStatus + { + Code = statusCode, + Message = statusMessage, + }, + }; + configureReply?.Invoke(reply); + await _writer.WriteAsync( CreateEnvelope( commandEnvelope.CorrelationId, envelope => envelope.WorkerCommandReply = new WorkerCommandReply { - Reply = new MxCommandReply - { - SessionId = SessionId, - CorrelationId = commandEnvelope.CorrelationId, - Kind = kind, - ProtocolStatus = new ProtocolStatus - { - Code = statusCode, - Message = statusMessage, - }, - }, + Reply = reply, CompletedTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow), }), cancellationToken).ConfigureAwait(false); @@ -236,21 +240,25 @@ public sealed class FakeWorkerHarness : IAsyncDisposable public async Task EmitEventAsync( MxEventFamily family, - CancellationToken cancellationToken = default) + CancellationToken cancellationToken = default, + Action? configureEvent = null) { ulong sequence = NextWorkerSequence + 1; + MxEvent mxEvent = new() + { + SessionId = SessionId, + Family = family, + WorkerSequence = sequence, + WorkerTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow), + }; + configureEvent?.Invoke(mxEvent); + await _writer.WriteAsync( CreateEnvelope( correlationId: string.Empty, envelope => envelope.WorkerEvent = new WorkerEvent { - Event = new MxEvent - { - SessionId = SessionId, - Family = family, - WorkerSequence = sequence, - WorkerTimestamp = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow), - }, + Event = mxEvent, }), cancellationToken).ConfigureAwait(false); } -- 2.52.0