using Grpc.Core;
using MxGateway.Contracts.Proto;

namespace MxGateway.Client;

/// <summary>
/// <see cref="IMxGatewayClientTransport"/> implementation that forwards every operation to the
/// supplied <see cref="MxAccessGateway.MxAccessGatewayClient"/> and rethrows any
/// <see cref="RpcException"/> as an <see cref="MxGatewayException"/> (or one of its more
/// specific authentication/authorization variants).
/// </summary>
internal sealed class GrpcMxGatewayClientTransport(
    MxGatewayClientOptions options,
    MxAccessGateway.MxAccessGatewayClient rawClient)
    : IMxGatewayClientTransport
{
    /// <summary>Client options this transport was constructed with.</summary>
    public MxGatewayClientOptions Options { get; } = options;

    /// <summary>The underlying gRPC client that all calls are forwarded to.</summary>
    public MxAccessGateway.MxAccessGatewayClient RawClient { get; } = rawClient;

    /// <summary>Interface view of <see cref="RawClient"/>; returns the same instance.</summary>
    MxAccessGateway.MxAccessGatewayClient? IMxGatewayClientTransport.RawClient => RawClient;

    /// <summary>
    /// Issues the OpenSession unary call and awaits its response.
    /// </summary>
    /// <param name="request">Request message passed to the gRPC client unchanged.</param>
    /// <param name="callOptions">gRPC call options passed to the gRPC client unchanged.</param>
    /// <returns>The response produced by the call's <c>ResponseAsync</c>.</returns>
    /// <exception cref="MxGatewayException">
    /// Thrown in place of any <see cref="RpcException"/> raised by the call.
    /// </exception>
    public async Task OpenSessionAsync(
        OpenSessionRequest request,
        CallOptions callOptions)
    {
        try
        {
            var pendingCall = RawClient.OpenSessionAsync(request, callOptions);
            return await pendingCall.ResponseAsync.ConfigureAwait(false);
        }
        catch (RpcException rpcFailure)
        {
            throw MapRpcException(rpcFailure);
        }
    }

    /// <summary>
    /// Issues the CloseSession unary call and awaits its response.
    /// </summary>
    /// <param name="request">Request message passed to the gRPC client unchanged.</param>
    /// <param name="callOptions">gRPC call options passed to the gRPC client unchanged.</param>
    /// <returns>The response produced by the call's <c>ResponseAsync</c>.</returns>
    /// <exception cref="MxGatewayException">
    /// Thrown in place of any <see cref="RpcException"/> raised by the call.
    /// </exception>
    public async Task CloseSessionAsync(
        CloseSessionRequest request,
        CallOptions callOptions)
    {
        try
        {
            var pendingCall = RawClient.CloseSessionAsync(request, callOptions);
            return await pendingCall.ResponseAsync.ConfigureAwait(false);
        }
        catch (RpcException rpcFailure)
        {
            throw MapRpcException(rpcFailure);
        }
    }

    /// <summary>
    /// Issues the Invoke unary call and awaits its response.
    /// </summary>
    /// <param name="request">Command request passed to the gRPC client unchanged.</param>
    /// <param name="callOptions">gRPC call options passed to the gRPC client unchanged.</param>
    /// <returns>The response produced by the call's <c>ResponseAsync</c>.</returns>
    /// <exception cref="MxGatewayException">
    /// Thrown in place of any <see cref="RpcException"/> raised by the call.
    /// </exception>
    public async Task InvokeAsync(
        MxCommandRequest request,
        CallOptions callOptions)
    {
        try
        {
            var pendingCall = RawClient.InvokeAsync(request, callOptions);
            return await pendingCall.ResponseAsync.ConfigureAwait(false);
        }
        catch (RpcException rpcFailure)
        {
            throw MapRpcException(rpcFailure);
        }
    }

    /// <summary>
    /// Starts the StreamEvents server-streaming call and yields each event as it is read.
    /// </summary>
    /// <param name="request">Stream request passed to the gRPC client unchanged.</param>
    /// <param name="callOptions">gRPC call options used to start the call.</param>
    /// <param name="cancellationToken">
    /// Token used when reading from the response stream. When it cannot be canceled, the token
    /// carried by <paramref name="callOptions"/> is used for reads instead.
    /// </param>
    /// <returns>An asynchronous sequence of the events read from the response stream.</returns>
    /// <exception cref="MxGatewayException">
    /// Thrown in place of any <see cref="RpcException"/> raised while advancing the stream.
    /// </exception>
    public async IAsyncEnumerable StreamEventsAsync(
        StreamEventsRequest request,
        CallOptions callOptions,
        [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Prefer the enumerator's token; only fall back to the call options' token when the
        // enumerator token can never fire.
        CancellationToken readToken;
        if (cancellationToken.CanBeCanceled)
        {
            readToken = cancellationToken;
        }
        else
        {
            readToken = callOptions.CancellationToken;
        }

        using var streamingCall = RawClient.StreamEvents(request, callOptions);
        var reader = streamingCall.ResponseStream;

        while (true)
        {
            // C# does not allow 'yield return' inside a try block that has a catch clause, so
            // the event is captured here and yielded after the try/catch.
            MxEvent? nextEvent;
            try
            {
                bool advanced = await reader.MoveNext(readToken).ConfigureAwait(false);
                if (!advanced)
                {
                    break;
                }

                nextEvent = reader.Current;
            }
            catch (RpcException rpcFailure)
            {
                throw MapRpcException(rpcFailure);
            }

            yield return nextEvent;
        }
    }

    /// <summary>
    /// Interface entry point for event streaming; forwards to the public overload without an
    /// explicit enumerator cancellation token.
    /// </summary>
    IAsyncEnumerable IMxGatewayClientTransport.StreamEventsAsync(
        StreamEventsRequest request,
        CallOptions callOptions)
        => StreamEventsAsync(request, callOptions);

    /// <summary>
    /// Converts a gRPC failure into the gateway exception that callers receive.
    /// </summary>
    /// <param name="exception">The gRPC exception to translate.</param>
    /// <returns>
    /// <see cref="MxGatewayAuthenticationException"/> for <see cref="StatusCode.Unauthenticated"/>,
    /// <see cref="MxGatewayAuthorizationException"/> for <see cref="StatusCode.PermissionDenied"/>,
    /// and <see cref="MxGatewayException"/> for every other status code. Each is built from the
    /// status detail text and the original <paramref name="exception"/>.
    /// </returns>
    private static MxGatewayException MapRpcException(RpcException exception)
    {
        string detail = exception.Status.Detail;

        switch (exception.StatusCode)
        {
            case StatusCode.Unauthenticated:
                return new MxGatewayAuthenticationException(detail, innerException: exception);

            case StatusCode.PermissionDenied:
                return new MxGatewayAuthorizationException(detail, innerException: exception);

            default:
                return new MxGatewayException(detail, exception);
        }
    }
}