using System.IO.Pipes;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MxGateway.Contracts;
using MxGateway.Contracts.Proto;
using MxGateway.Server.Configuration;
using MxGateway.Server.Metrics;
using MxGateway.Server.Workers;

namespace MxGateway.Server.Sessions;

/// <summary>
/// Creates worker clients for gateway sessions: opens the session's named pipe,
/// launches the worker process, waits for the pipe connection and starts a
/// <see cref="WorkerClient"/> on top of it.
/// </summary>
public sealed class SessionWorkerClientFactory : ISessionWorkerClientFactory
{
    private readonly IWorkerProcessLauncher _workerProcessLauncher;
    private readonly GatewayMetrics _metrics;
    private readonly TimeProvider _timeProvider;
    private readonly ILoggerFactory _loggerFactory;
    private readonly GatewayOptions _options;

    /// <summary>
    /// Initializes the factory with its collaborators.
    /// </summary>
    /// <param name="workerProcessLauncher">Launcher used to start the worker process for a session.</param>
    /// <param name="options">Gateway options; the value is read once, at construction time.</param>
    /// <param name="metrics">Metrics sink passed to created clients and used to record worker kills.</param>
    /// <param name="loggerFactory">Factory used to create the logger handed to each worker client.</param>
    /// <param name="timeProvider">Time source for created clients; <see cref="TimeProvider.System"/> when <see langword="null"/>.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="workerProcessLauncher"/>, <paramref name="options"/>, <paramref name="metrics"/>
    /// or <paramref name="loggerFactory"/> is <see langword="null"/>.
    /// </exception>
    public SessionWorkerClientFactory(
        IWorkerProcessLauncher workerProcessLauncher,
        IOptions<GatewayOptions> options,
        GatewayMetrics metrics,
        ILoggerFactory loggerFactory,
        TimeProvider? timeProvider = null)
    {
        _workerProcessLauncher = workerProcessLauncher ?? throw new ArgumentNullException(nameof(workerProcessLauncher));
        ArgumentNullException.ThrowIfNull(options);
        _metrics = metrics ?? throw new ArgumentNullException(nameof(metrics));
        _loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
        _timeProvider = timeProvider ?? TimeProvider.System;
        _options = options.Value;
    }

    /// <summary>
    /// Launches a worker process for <paramref name="session"/>, waits for it to connect to the
    /// session's named pipe, and returns a started worker client bound to that connection.
    /// </summary>
    /// <param name="session">Session supplying the id, pipe name, nonce and startup timeout.</param>
    /// <param name="cancellationToken">Token observed while launching, waiting for the pipe and starting the client.</param>
    /// <returns>The started worker client.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="session"/> is <see langword="null"/>.</exception>
    /// <remarks>
    /// On any failure the resources created so far are released and the exception is rethrown:
    /// a constructed client is disposed; otherwise a still-running worker process is killed and
    /// the process handle and pipe are disposed.
    /// </remarks>
    public async Task<IWorkerClient> CreateAsync(
        GatewaySession session,
        CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(session);

        // The server end of the pipe is created before the worker is launched.
        NamedPipeServerStream? pipe = CreatePipe(session.PipeName);
        WorkerProcessHandle? processHandle = null;
        IWorkerClient? workerClient = null;

        try
        {
            session.TransitionTo(SessionState.StartingWorker);
            processHandle = await _workerProcessLauncher
                .LaunchAsync(
                    new WorkerProcessLaunchRequest(
                        session.SessionId,
                        session.PipeName,
                        GatewayContractInfo.WorkerProtocolVersion,
                        session.Nonce,
                        pipe),
                    cancellationToken)
                .ConfigureAwait(false);

            session.TransitionTo(SessionState.WaitingForPipe);
            await WaitForPipeConnectionAsync(pipe, session.StartupTimeout, cancellationToken).ConfigureAwait(false);

            session.TransitionTo(SessionState.Handshaking);
            WorkerFrameProtocolOptions frameOptions = new(
                session.SessionId,
                GatewayContractInfo.WorkerProtocolVersion,
                _options.Worker.MaxMessageBytes);
            WorkerClientConnection connection = new(
                session.SessionId,
                session.Nonce,
                pipe,
                frameOptions,
                processHandle);
            WorkerClientOptions clientOptions = new()
            {
                HeartbeatGrace = TimeSpan.FromSeconds(_options.Worker.HeartbeatGraceSeconds),
                HeartbeatCheckInterval = TimeSpan.FromSeconds(_options.Worker.HeartbeatIntervalSeconds),
                EventChannelCapacity = _options.Events.QueueCapacity,
            };

            // NOTE(review): logger category assumed to be WorkerClient — confirm against the WorkerClient constructor.
            workerClient = new WorkerClient(
                connection,
                clientOptions,
                _metrics,
                _timeProvider,
                _loggerFactory.CreateLogger<WorkerClient>());

            // The client (through its connection) now holds the pipe and the process handle, so the
            // failure path must dispose the client instead of touching these two directly.
            pipe = null;
            processHandle = null;

            session.TransitionTo(SessionState.InitializingWorker);
            await workerClient.StartAsync(cancellationToken).ConfigureAwait(false);
            return workerClient;
        }
        catch
        {
            await CleanUpFailedStartAsync(workerClient, processHandle, pipe).ConfigureAwait(false);
            throw;
        }
    }

    /// <summary>
    /// Releases whatever was created before a failed <see cref="CreateAsync"/> attempt.
    /// </summary>
    /// <param name="workerClient">The client, if it was constructed; disposing it is the only cleanup performed in that case.</param>
    /// <param name="processHandle">The launched worker process, if the client was not yet constructed.</param>
    /// <param name="pipe">The named pipe, if the client was not yet constructed.</param>
    private async Task CleanUpFailedStartAsync(
        IWorkerClient? workerClient,
        WorkerProcessHandle? processHandle,
        NamedPipeServerStream? pipe)
    {
        if (workerClient is not null)
        {
            await workerClient.DisposeAsync().ConfigureAwait(false);
            return;
        }

        try
        {
            if (processHandle is not null)
            {
                try
                {
                    if (!processHandle.Process.HasExited)
                    {
                        processHandle.Process.Kill(entireProcessTree: true);
                        _metrics.WorkerKilled("OpenSessionFailed");
                    }
                }
                finally
                {
                    processHandle.Dispose();
                }
            }
        }
        finally
        {
            // Dispose the pipe even when killing the worker or recording the metric throws;
            // otherwise the pipe would leak on that path.
            pipe?.Dispose();
        }
    }

    /// <summary>
    /// Creates the server end of the session pipe: bidirectional, byte mode, asynchronous,
    /// limited to a single server instance.
    /// </summary>
    /// <param name="pipeName">Name of the pipe to create.</param>
    /// <returns>The new, not yet connected, pipe server stream.</returns>
    private static NamedPipeServerStream CreatePipe(string pipeName)
    {
        return new NamedPipeServerStream(
            pipeName,
            PipeDirection.InOut,
            maxNumberOfServerInstances: 1,
            PipeTransmissionMode.Byte,
            PipeOptions.Asynchronous);
    }

    /// <summary>
    /// Waits for a client to connect to <paramref name="pipe"/>, giving up when
    /// <paramref name="startupTimeout"/> elapses or <paramref name="cancellationToken"/> is canceled.
    /// </summary>
    /// <param name="pipe">Pipe server stream to wait on.</param>
    /// <param name="startupTimeout">Maximum time to wait for the connection.</param>
    /// <param name="cancellationToken">Caller's cancellation token, linked with the timeout.</param>
    private static async Task WaitForPipeConnectionAsync(
        NamedPipeServerStream pipe,
        TimeSpan startupTimeout,
        CancellationToken cancellationToken)
    {
        // A linked source lets one token express both "caller canceled" and "startup timed out".
        using CancellationTokenSource timeout = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        timeout.CancelAfter(startupTimeout);
        await pipe.WaitForConnectionAsync(timeout.Token).ConfigureAwait(false);
    }
}