using System.Collections.Concurrent;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Configuration;
using MxGateway.Server.Grpc;
using MxGateway.Server.Metrics;
using MxGateway.Server.Security.Authentication;
using MxGateway.Server.Security.Authorization;
using MxGateway.Server.Sessions;
using MxGateway.Server.Workers;
using MxGateway.Tests.Gateway.Workers.Fakes;

namespace MxGateway.Tests.Gateway;

/// <summary>
/// Smoke test that drives <see cref="MxAccessGatewayService"/> against a scripted, in-process
/// fake worker: open session, stream events, invoke commands, close session.
/// </summary>
public sealed class GatewayEndToEndFakeWorkerSmokeTests
{
    /// <summary>Upper bound applied to every wait in the test so a hang fails quickly.</summary>
    private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(5);

    /// <summary>Server handle the scripted worker returns from Register.</summary>
    private const int ServerHandle = 1001;

    /// <summary>Item handle the scripted worker returns from AddItem.</summary>
    private const int ItemHandle = 2002;

    /// <summary>
    /// Opens a session, starts the event stream, invokes Register/AddItem/Advise, waits for the
    /// first streamed event, closes the session, and then asserts on every reply, on the event,
    /// and on the commands the fake worker observed.
    /// </summary>
    [Fact]
    public async Task GatewayService_WithFakeWorker_CompletesSessionCommandEventAndClosePath()
    {
        ScriptedFakeWorkerProcessLauncher launcher = new();
        await using GatewayServiceFixture fixture = new(launcher);

        OpenSessionReply openReply = await fixture.Service.OpenSession(
            new OpenSessionRequest
            {
                ClientSessionName = "fake-worker-e2e",
                ClientCorrelationId = "open-correlation",
                CommandTimeout = Duration.FromTimeSpan(TestTimeout),
            },
            new TestServerCallContext());

        // The stream call is started but not awaited here; it is awaited after CloseSession below.
        RecordingServerStreamWriter<MxEvent> eventWriter = new();
        Task streamTask = fixture.Service.StreamEvents(
            new StreamEventsRequest { SessionId = openReply.SessionId },
            eventWriter,
            new TestServerCallContext());

        MxCommandReply registerReply = await fixture.Service.Invoke(
            CreateRegisterRequest(openReply.SessionId),
            new TestServerCallContext());

        MxCommandReply addItemReply = await fixture.Service.Invoke(
            CreateAddItemRequest(openReply.SessionId, registerReply.Register.ServerHandle),
            new TestServerCallContext());

        MxCommandReply adviseReply = await fixture.Service.Invoke(
            CreateAdviseRequest(
                openReply.SessionId,
                registerReply.Register.ServerHandle,
                addItemReply.AddItem.ItemHandle),
            new TestServerCallContext());

        // The scripted worker emits one OnDataChange event right after replying to Advise.
        MxEvent dataChange = await eventWriter.WaitForFirstMessageAsync(TestTimeout);

        CloseSessionReply closeReply = await fixture.Service.CloseSession(
            new CloseSessionRequest
            {
                SessionId = openReply.SessionId,
                ClientCorrelationId = "close-correlation",
            },
            new TestServerCallContext());

        await streamTask.WaitAsync(TestTimeout);
        await launcher.WorkerTask.WaitAsync(TestTimeout);

        Assert.Equal(ProtocolStatusCode.Ok, openReply.ProtocolStatus.Code);
        Assert.Equal(GatewayContractInfo.DefaultBackendName, openReply.BackendName);
        Assert.Equal(ScriptedFakeWorkerProcessLauncher.ProcessId, openReply.WorkerProcessId);

        Assert.Equal(ProtocolStatusCode.Ok, registerReply.ProtocolStatus.Code);
        Assert.Equal(ServerHandle, registerReply.Register.ServerHandle);

        Assert.Equal(ProtocolStatusCode.Ok, addItemReply.ProtocolStatus.Code);
        Assert.Equal(ItemHandle, addItemReply.AddItem.ItemHandle);

        Assert.Equal(ProtocolStatusCode.Ok, adviseReply.ProtocolStatus.Code);

        Assert.Equal(MxEventFamily.OnDataChange, dataChange.Family);
        Assert.Equal(openReply.SessionId, dataChange.SessionId);
        Assert.Equal(ServerHandle, dataChange.ServerHandle);
        Assert.Equal(ItemHandle, dataChange.ItemHandle);
        Assert.Equal("scripted-value", dataChange.Value.StringValue);

        Assert.Equal(ProtocolStatusCode.Ok, closeReply.ProtocolStatus.Code);
        Assert.Equal(SessionState.Closed, closeReply.FinalState);
        Assert.True(launcher.Process.HasExited);

        Assert.Equal(
            [MxCommandKind.Register, MxCommandKind.AddItem, MxCommandKind.Advise],
            launcher.CommandKinds);
    }

    /// <summary>Builds a Register command request for the given session.</summary>
    private static MxCommandRequest CreateRegisterRequest(string sessionId)
    {
        return new MxCommandRequest
        {
            SessionId = sessionId,
            ClientCorrelationId = "register-correlation",
            Command = new MxCommand
            {
                Kind = MxCommandKind.Register,
                Register = new RegisterCommand { ClientName = "fake-worker-e2e-client" },
            },
        };
    }

    /// <summary>Builds an AddItem command request for the given session and server handle.</summary>
    private static MxCommandRequest CreateAddItemRequest(
        string sessionId,
        int serverHandle)
    {
        return new MxCommandRequest
        {
            SessionId = sessionId,
            ClientCorrelationId = "add-item-correlation",
            Command = new MxCommand
            {
                Kind = MxCommandKind.AddItem,
                AddItem = new AddItemCommand
                {
                    ServerHandle = serverHandle,
                    ItemDefinition = "Galaxy.Tag.Value",
                },
            },
        };
    }

    /// <summary>Builds an Advise command request for the given session, server handle and item handle.</summary>
    private static MxCommandRequest CreateAdviseRequest(
        string sessionId,
        int serverHandle,
        int itemHandle)
    {
        return new MxCommandRequest
        {
            SessionId = sessionId,
            ClientCorrelationId = "advise-correlation",
            Command = new MxCommand
            {
                Kind = MxCommandKind.Advise,
                Advise = new AdviseCommand
                {
                    ServerHandle = serverHandle,
                    ItemHandle = itemHandle,
                },
            },
        };
    }

    /// <summary>
    /// Wires a <see cref="MxAccessGatewayService"/> from real gateway components on top of the
    /// supplied worker launcher, and disposes any registered sessions and the metrics on teardown.
    /// </summary>
    private sealed class GatewayServiceFixture : IAsyncDisposable
    {
        private readonly GatewayMetrics _metrics = new();
        private readonly SessionRegistry _registry = new();

        /// <summary>Builds the service graph using <paramref name="launcher"/> to start workers.</summary>
        /// <param name="launcher">Launcher handed to the session worker client factory.</param>
        public GatewayServiceFixture(IWorkerProcessLauncher launcher)
        {
            IOptions<GatewayOptions> options = Options.Create(CreateOptions());

            SessionWorkerClientFactory workerClientFactory = new(
                launcher,
                options,
                _metrics,
                NullLoggerFactory.Instance);

            // NOTE(review): logger category types below follow the ILogger<TService> convention;
            // confirm against the constructors of SessionManager, EventStreamService and
            // MxAccessGatewayService.
            SessionManager sessionManager = new(
                _registry,
                workerClientFactory,
                options,
                _metrics,
                logger: NullLogger<SessionManager>.Instance);

            MxAccessGrpcMapper mapper = new();

            EventStreamService eventStreamService = new(
                sessionManager,
                options,
                mapper,
                _metrics,
                NullLogger<EventStreamService>.Instance);

            Service = new MxAccessGatewayService(
                sessionManager,
                new GatewayRequestIdentityAccessor(),
                new AllowAllConstraintEnforcer(),
                new MxAccessGrpcRequestValidator(),
                mapper,
                eventStreamService,
                _metrics,
                NullLogger<MxAccessGatewayService>.Instance);
        }

        /// <summary>The gateway service under test.</summary>
        public MxAccessGatewayService Service { get; }

        /// <summary>Disposes every session still present in the registry, then the metrics.</summary>
        public async ValueTask DisposeAsync()
        {
            foreach (GatewaySession session in _registry.Snapshot())
            {
                await session.DisposeAsync();
            }

            _metrics.Dispose();
        }

        /// <summary>Creates the gateway options used by the fixture.</summary>
        private static GatewayOptions CreateOptions()
        {
            return new GatewayOptions
            {
                Worker = new WorkerOptions
                {
                    StartupTimeoutSeconds = 5,
                    ShutdownTimeoutSeconds = 5,
                    HeartbeatIntervalSeconds = 30,
                    HeartbeatGraceSeconds = 30,
                    MaxMessageBytes = WorkerFrameProtocolOptions.DefaultMaxMessageBytes,
                },
                Sessions = new SessionOptions
                {
                    DefaultCommandTimeoutSeconds = 5,
                    MaxSessions = 4,
                },
                Events = new EventOptions
                {
                    QueueCapacity = 16,
                },
            };
        }
    }

    /// <summary>
    /// Worker launcher that, instead of starting a process, runs a scripted worker loop on a
    /// background task. The loop connects through <see cref="FakeWorkerHarness"/>, replies to
    /// commands, emits one OnDataChange event after each Advise, and acknowledges shutdown.
    /// </summary>
    private sealed class ScriptedFakeWorkerProcessLauncher : IWorkerProcessLauncher
    {
        /// <summary>Process id reported by the fake worker.</summary>
        public const int ProcessId = 4680;

        private readonly ConcurrentQueue<MxCommandKind> _commandKinds = new();

        /// <summary>The fake process handed back to the gateway.</summary>
        public FakeWorkerProcess Process { get; } = new(ProcessId);

        /// <summary>Snapshot of the command kinds received so far, in arrival order.</summary>
        public IReadOnlyCollection<MxCommandKind> CommandKinds => _commandKinds.ToArray();

        /// <summary>The scripted worker loop; a completed task until <see cref="LaunchAsync"/> is called.</summary>
        public Task WorkerTask { get; private set; } = Task.CompletedTask;

        /// <summary>
        /// Starts the scripted worker loop without awaiting it and immediately returns a handle
        /// wrapping <see cref="Process"/>.
        /// </summary>
        public Task<WorkerProcessHandle> LaunchAsync(
            WorkerProcessLaunchRequest request,
            CancellationToken cancellationToken = default)
        {
            WorkerTask = RunWorkerAsync(request, cancellationToken);

            return Task.FromResult(new WorkerProcessHandle(
                Process,
                new WorkerProcessCommandLine("fake-worker.exe", []),
                DateTimeOffset.UtcNow));
        }

        /// <summary>
        /// Connects to the gateway pipe, completes startup, then services gateway envelopes until
        /// a shutdown envelope arrives or cancellation is requested.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// An envelope that is neither a worker command nor a worker shutdown was received.
        /// </exception>
        private async Task RunWorkerAsync(
            WorkerProcessLaunchRequest request,
            CancellationToken cancellationToken)
        {
            await using FakeWorkerHarness harness = await FakeWorkerHarness.ConnectToGatewayPipeAsync(
                request.SessionId,
                request.Nonce,
                request.PipeName,
                request.ProtocolVersion,
                cancellationToken: cancellationToken).ConfigureAwait(false);

            await harness.CompleteStartupAsync(ProcessId, cancellationToken: cancellationToken).ConfigureAwait(false);

            while (!cancellationToken.IsCancellationRequested)
            {
                WorkerEnvelope envelope = await harness.ReadGatewayEnvelopeAsync(cancellationToken).ConfigureAwait(false);

                if (envelope.BodyCase == WorkerEnvelope.BodyOneofCase.WorkerCommand)
                {
                    await ReplyToCommandAsync(harness, envelope, cancellationToken).ConfigureAwait(false);
                    continue;
                }

                if (envelope.BodyCase == WorkerEnvelope.BodyOneofCase.WorkerShutdown)
                {
                    await harness.SendShutdownAckAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
                    Process.MarkExited(0);
                    return;
                }

                throw new InvalidOperationException($"Unexpected gateway envelope {envelope.BodyCase}.");
            }
        }

        /// <summary>
        /// Records the command kind, sends the scripted reply, and for Advise commands emits an
        /// OnDataChange event carrying the advised handles and the value "scripted-value".
        /// </summary>
        private async Task ReplyToCommandAsync(
            FakeWorkerHarness harness,
            WorkerEnvelope commandEnvelope,
            CancellationToken cancellationToken)
        {
            MxCommand command = commandEnvelope.WorkerCommand.Command;
            _commandKinds.Enqueue(command.Kind);

            await harness.ReplyToCommandAsync(
                commandEnvelope,
                configureReply: reply => ConfigureReply(reply, command.Kind),
                cancellationToken: cancellationToken).ConfigureAwait(false);

            if (command.Kind == MxCommandKind.Advise)
            {
                await harness.EmitEventAsync(
                    MxEventFamily.OnDataChange,
                    cancellationToken,
                    mxEvent =>
                    {
                        mxEvent.ServerHandle = command.Advise.ServerHandle;
                        mxEvent.ItemHandle = command.Advise.ItemHandle;
                        mxEvent.Quality = 192;
                        mxEvent.Value = new MxValue
                        {
                            DataType = MxDataType.String,
                            StringValue = "scripted-value",
                        };
                        mxEvent.OnDataChange = new OnDataChangeEvent();
                    }).ConfigureAwait(false);
            }
        }

        /// <summary>
        /// Fills in the scripted payload for Register and AddItem replies; other kinds are left
        /// as provided by the harness.
        /// </summary>
        private static void ConfigureReply(
            MxCommandReply reply,
            MxCommandKind kind)
        {
            switch (kind)
            {
                case MxCommandKind.Register:
                    reply.Register = new RegisterReply { ServerHandle = ServerHandle };
                    break;
                case MxCommandKind.AddItem:
                    reply.AddItem = new AddItemReply { ItemHandle = ItemHandle };
                    break;
            }
        }
    }

    /// <summary>In-memory <see cref="IWorkerProcess"/> whose exit state is set by the test code.</summary>
    private sealed class FakeWorkerProcess(int processId) : IWorkerProcess
    {
        public int Id { get; } = processId;

        public bool HasExited { get; private set; }

        public int? ExitCode { get; private set; }

        /// <summary>
        /// Completes synchronously, marking the process as exited and defaulting the exit code to
        /// 0 when none has been recorded yet.
        /// </summary>
        public ValueTask WaitForExitAsync(CancellationToken cancellationToken)
        {
            HasExited = true;
            ExitCode ??= 0;
            return ValueTask.CompletedTask;
        }

        /// <summary>Marks the process as exited with exit code -1.</summary>
        public void Kill(bool entireProcessTree)
        {
            MarkExited(-1);
        }

        public void Dispose()
        {
        }

        /// <summary>Marks the process as exited with the given exit code.</summary>
        public void MarkExited(int exitCode)
        {
            HasExited = true;
            ExitCode = exitCode;
        }
    }

    /// <summary>
    /// Stream writer that stores every written message and exposes a task that completes with
    /// the first one.
    /// </summary>
    private sealed class RecordingServerStreamWriter<T> : IServerStreamWriter<T>
    {
        private readonly object _syncRoot = new();
        private readonly TaskCompletionSource<T> _firstMessage = new(TaskCreationOptions.RunContinuationsAsynchronously);
        private readonly List<T> _messages = [];

        /// <summary>Snapshot copy of the messages written so far.</summary>
        public IReadOnlyList<T> Messages
        {
            get
            {
                lock (_syncRoot)
                {
                    return _messages.ToArray();
                }
            }
        }

        public WriteOptions? WriteOptions { get; set; }

        /// <summary>Records the message and completes the first-message task if it is still pending.</summary>
        public Task WriteAsync(T message)
        {
            lock (_syncRoot)
            {
                _messages.Add(message);
            }

            _firstMessage.TrySetResult(message);
            return Task.CompletedTask;
        }

        /// <summary>Waits up to <paramref name="timeout"/> for the first written message.</summary>
        public async Task<T> WaitForFirstMessageAsync(TimeSpan timeout)
        {
            return await _firstMessage.Task.WaitAsync(timeout).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Minimal <see cref="ServerCallContext"/> with fixed method, host and peer values and a
    /// caller-supplied cancellation token.
    /// </summary>
    private sealed class TestServerCallContext(CancellationToken cancellationToken = default) : ServerCallContext
    {
        private readonly Metadata _requestHeaders = [];
        private readonly Metadata _responseTrailers = [];
        private readonly Dictionary<object, object> _userState = [];
        private Status _status;
        private WriteOptions? _writeOptions;

        protected override string MethodCore => "/mxaccess_gateway.v1.MxAccessGateway/Test";

        protected override string HostCore => "localhost";

        protected override string PeerCore => "ipv4:127.0.0.1:5000";

        protected override DateTime DeadlineCore => DateTime.UtcNow.AddMinutes(1);

        protected override Metadata RequestHeadersCore => _requestHeaders;

        protected override CancellationToken CancellationTokenCore => cancellationToken;

        protected override Metadata ResponseTrailersCore => _responseTrailers;

        protected override Status StatusCore
        {
            get => _status;
            set => _status = value;
        }

        protected override WriteOptions? WriteOptionsCore
        {
            get => _writeOptions;
            set => _writeOptions = value;
        }

        protected override AuthContext AuthContextCore { get; } = new(
            string.Empty,
            new Dictionary<string, List<AuthProperty>>(StringComparer.Ordinal));

        protected override IDictionary<object, object> UserStateCore => _userState;

        protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders)
        {
            return Task.CompletedTask;
        }

        protected override ContextPropagationToken CreatePropagationTokenCore(
            ContextPropagationOptions? options)
        {
            throw new NotSupportedException();
        }
    }

    /// <summary>
    /// Constraint enforcer whose checks always return <c>null</c> and whose denial recording
    /// does nothing.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the <c>Task&lt;ConstraintFailure?&gt;</c> return type is inferred from the
    /// <c>null</c> results and the <see cref="RecordDenialAsync"/> parameter; confirm against
    /// <see cref="IConstraintEnforcer"/>.
    /// </remarks>
    private sealed class AllowAllConstraintEnforcer : IConstraintEnforcer
    {
        public Task<ConstraintFailure?> CheckReadTagAsync(
            ApiKeyIdentity? identity,
            string tagAddress,
            CancellationToken cancellationToken) => Task.FromResult<ConstraintFailure?>(null);

        public Task<ConstraintFailure?> CheckReadHandleAsync(
            ApiKeyIdentity? identity,
            GatewaySession session,
            int serverHandle,
            int itemHandle,
            CancellationToken cancellationToken) => Task.FromResult<ConstraintFailure?>(null);

        public Task<ConstraintFailure?> CheckWriteHandleAsync(
            ApiKeyIdentity? identity,
            GatewaySession session,
            int serverHandle,
            int itemHandle,
            CancellationToken cancellationToken) => Task.FromResult<ConstraintFailure?>(null);

        public Task RecordDenialAsync(
            ApiKeyIdentity? identity,
            string commandKind,
            string target,
            ConstraintFailure failure,
            CancellationToken cancellationToken) => Task.CompletedTask;
    }
}