using Grpc.Core;
using Grpc.Net.Client;
using Microsoft.Extensions.Logging;
using MxGateway.Contracts.Proto;
using Polly;
using System.Net.Http;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;

namespace MxGateway.Client;

/// <summary>
/// Provides the .NET client entry point for the public MXAccess Gateway gRPC API.
/// </summary>
public sealed class MxGatewayClient : IAsyncDisposable
{
    // Null when the client was built around an externally supplied transport
    // (internal constructor); DisposeAsync tolerates that.
    private readonly GrpcChannel _channel;
    private readonly IMxGatewayClientTransport _transport;
    private readonly ResiliencePipeline _safeUnaryRetryPipeline;

    // 0 = live, 1 = disposed. Written with Interlocked, read with Volatile.
    private int _disposed;

    /// <summary>
    /// Initializes a new instance of the <see cref="MxGatewayClient"/> with given options and transport.
    /// </summary>
    /// <param name="options">Client configuration options.</param>
    /// <param name="transport">Transport implementation for gateway communication.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="options"/> or <paramref name="transport"/> is <see langword="null"/>.
    /// </exception>
    internal MxGatewayClient(
        MxGatewayClientOptions options,
        IMxGatewayClientTransport transport)
    {
        ArgumentNullException.ThrowIfNull(options);
        options.Validate();

        Options = options;
        _transport = transport ?? throw new ArgumentNullException(nameof(transport));

        // NOTE(review): logger category type argument reconstructed — confirm.
        _safeUnaryRetryPipeline = MxGatewayClientRetryPolicy.Create(
            options.Retry,
            options.LoggerFactory?.CreateLogger<MxGatewayClient>());

        // No channel is owned on this path.
        _channel = null!;
    }

    private MxGatewayClient(
        GrpcChannel channel,
        IMxGatewayClientTransport transport)
    {
        _channel = channel;
        _transport = transport;
        Options = transport.Options;

        // NOTE(review): logger category type argument reconstructed — confirm.
        _safeUnaryRetryPipeline = MxGatewayClientRetryPolicy.Create(
            Options.Retry,
            Options.LoggerFactory?.CreateLogger<MxGatewayClient>());
    }

    /// <summary>
    /// Gets the client configuration options.
    /// </summary>
    public MxGatewayClientOptions Options { get; }

    /// <summary>
    /// Gets the underlying generated gRPC client.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The transport does not expose a generated client.
    /// </exception>
    public MxAccessGateway.MxAccessGatewayClient RawClient =>
        _transport.RawClient
        ?? throw new InvalidOperationException("The raw generated gRPC client is not available for this client instance.");

    /// <summary>
    /// Creates a new gateway client with the given options.
    /// </summary>
    /// <param name="options">Client configuration options.</param>
    /// <returns>A new gateway client instance.</returns>
    public static MxGatewayClient Create(MxGatewayClientOptions options)
    {
        ArgumentNullException.ThrowIfNull(options);
        options.Validate();

        HttpMessageHandler handler = CreateHttpHandler(options);
        GrpcChannel? channel = null;
        try
        {
            channel = GrpcChannel.ForAddress(
                options.Endpoint,
                new GrpcChannelOptions
                {
                    HttpHandler = handler,

                    // The handler is created here and never handed to the caller, so the
                    // channel must own it; the default (false) would leave the handler and
                    // its connection pool alive after the channel is disposed.
                    DisposeHttpClient = true,
                    LoggerFactory = options.LoggerFactory,
                    MaxReceiveMessageSize = options.MaxGrpcMessageBytes,
                    MaxSendMessageSize = options.MaxGrpcMessageBytes,
                });

            return new MxGatewayClient(
                channel,
                new GrpcMxGatewayClientTransport(
                    options,
                    new MxAccessGateway.MxAccessGatewayClient(channel)));
        }
        catch
        {
            // Construction failed part-way: release what was already created.
            channel?.Dispose();
            handler.Dispose();
            throw;
        }
    }

    /// <summary>
    /// Opens a new gateway session.
    /// </summary>
    /// <param name="request">Session open request; defaults to empty request if null.</param>
    /// <param name="cancellationToken">Cancellation token for the operation.</param>
    /// <returns>A wrapped gateway session.</returns>
    public async Task<MxGatewaySession> OpenSessionAsync(
        OpenSessionRequest? request = null,
        CancellationToken cancellationToken = default)
    {
        OpenSessionReply reply = await OpenSessionRawAsync(
                request ?? new OpenSessionRequest(),
                cancellationToken)
            .ConfigureAwait(false);

        return new MxGatewaySession(this, reply);
    }

    /// <summary>
    /// Opens a new gateway session and returns the raw protobuf reply.
    /// </summary>
    /// <param name="request">Session open request.</param>
    /// <param name="cancellationToken">Cancellation token for the operation.</param>
    /// <returns>The raw gateway session open reply.</returns>
    public Task<OpenSessionReply> OpenSessionRawAsync(
        OpenSessionRequest request,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(request);
        ThrowIfDisposed();

        // Sent once, outside the retry pipeline.
        return _transport.OpenSessionAsync(request, CreateCallOptions(cancellationToken));
    }

    /// <summary>
    /// Closes an open gateway session.
    /// </summary>
    /// <param name="request">Session close request.</param>
    /// <param name="cancellationToken">Cancellation token for the operation.</param>
    /// <returns>The session close reply.</returns>
    // NOTE(review): reply type name inferred from the Request/Reply naming convention — confirm.
    public Task<CloseSessionReply> CloseSessionRawAsync(
        CloseSessionRequest request,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(request);
        ThrowIfDisposed();

        return ExecuteSafeUnaryAsync(
            token => _transport.CloseSessionAsync(request, CreateCallOptions(token)),
            cancellationToken);
    }

    /// <summary>
    /// Invokes an MXAccess command on the open session.
    /// </summary>
    /// <param name="request">The command request.</param>
    /// <param name="cancellationToken">Cancellation token for the operation.</param>
    /// <returns>The command reply.</returns>
    // NOTE(review): reply type name inferred from the Request/Reply naming convention — confirm.
    public Task<MxCommandReply> InvokeAsync(
        MxCommandRequest request,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(request);
        ThrowIfDisposed();

        MxCommandKind kind = request.Command?.Kind ?? MxCommandKind.Unspecified;
        if (!MxGatewayClientRetryPolicy.IsRetryableCommand(kind))
        {
            // Commands the policy does not classify as retryable are sent exactly once.
            return _transport.InvokeAsync(request, CreateCallOptions(cancellationToken));
        }

        return ExecuteSafeUnaryAsync(
            token => _transport.InvokeAsync(request, CreateCallOptions(token)),
            cancellationToken);
    }

    /// <summary>
    /// Streams events from the gateway session.
    /// </summary>
    /// <param name="request">The stream events request.</param>
    /// <param name="cancellationToken">Cancellation token for the operation.</param>
    /// <returns>An async enumerable of events.</returns>
    // NOTE(review): element type name is a reconstruction — confirm against the transport interface.
    public IAsyncEnumerable<MxEvent> StreamEventsAsync(
        StreamEventsRequest request,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(request);
        ThrowIfDisposed();

        return _transport.StreamEventsAsync(request, CreateStreamCallOptions(cancellationToken));
    }

    /// <summary>
    /// Acknowledges an active MXAccess alarm condition through the gateway. The
    /// gateway authorizes against the API key's admin scope (there is no
    /// finer-grained alarm-ack sub-scope) and forwards the acknowledge to the
    /// worker's MXAccess session; the resulting status is returned in the reply.
    /// </summary>
    /// <param name="request">The acknowledge request — alarm reference, comment, operator user.</param>
    /// <param name="cancellationToken">Cancellation token for the operation.</param>
    /// <returns>The acknowledge reply with protocol + native MxStatus.</returns>
    // NOTE(review): reply type name inferred from the Request/Reply naming convention — confirm.
    public Task<AcknowledgeAlarmReply> AcknowledgeAlarmAsync(
        AcknowledgeAlarmRequest request,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(request);
        ThrowIfDisposed();

        return ExecuteSafeUnaryAsync(
            token => _transport.AcknowledgeAlarmAsync(request, CreateCallOptions(token)),
            cancellationToken);
    }

    /// <summary>
    /// Attaches to the gateway's central alarm feed. The stream opens with one
    /// message per currently-active alarm (the ConditionRefresh snapshot), then a
    /// single snapshot_complete, then a transition for every subsequent
    /// raise / acknowledge / clear. Served by the gateway's always-on alarm
    /// monitor — no worker session is opened, so any number of clients may
    /// attach. Optionally scoped by alarm-reference prefix.
    /// </summary>
    /// <param name="request">The stream request, optionally scoped by alarm-reference prefix.</param>
    /// <param name="cancellationToken">Cancellation token for the stream.</param>
    /// <returns>An async enumerable of alarm feed messages.</returns>
    // NOTE(review): element type name is a reconstruction — confirm against the transport interface.
    public IAsyncEnumerable<AlarmFeedMessage> StreamAlarmsAsync(
        StreamAlarmsRequest request,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(request);
        ThrowIfDisposed();

        return _transport.StreamAlarmsAsync(request, CreateStreamCallOptions(cancellationToken));
    }

    /// <summary>
    /// Disposes the client and releases all resources.
    /// </summary>
    /// <returns>A completed <see cref="ValueTask"/>; disposal is synchronous.</returns>
    public ValueTask DisposeAsync()
    {
        // Only the first caller disposes; later calls are no-ops.
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return ValueTask.CompletedTask;
        }

        _channel?.Dispose();
        return ValueTask.CompletedTask;
    }

    /// <summary>
    /// Creates gRPC call options with default timeout and authorization.
    /// </summary>
    /// <param name="cancellationToken">Cancellation token for the call.</param>
    /// <returns>Configured call options.</returns>
    internal CallOptions CreateCallOptions(CancellationToken cancellationToken)
    {
        return CreateCallOptions(cancellationToken, Options.DefaultCallTimeout);
    }

    /// <summary>
    /// Creates gRPC call options for streaming with stream timeout and authorization.
    /// </summary>
    /// <param name="cancellationToken">Cancellation token for the call.</param>
    /// <returns>Configured call options.</returns>
    internal CallOptions CreateStreamCallOptions(CancellationToken cancellationToken)
    {
        return CreateCallOptions(cancellationToken, Options.StreamTimeout);
    }

    /// <summary>
    /// Creates gRPC call options with specified timeout and authorization.
    /// </summary>
    /// <param name="cancellationToken">Cancellation token for the call.</param>
    /// <param name="timeout">Optional timeout duration; null means no timeout.</param>
    /// <returns>Configured call options.</returns>
    internal CallOptions CreateCallOptions(
        CancellationToken cancellationToken,
        TimeSpan? timeout)
    {
        Metadata headers = new()
        {
            { "authorization", $"Bearer {Options.ApiKey}" },
        };

        // The deadline is absolute, so it is computed at call-creation time.
        return new CallOptions(
            headers,
            timeout is null ? null : DateTime.UtcNow.Add(timeout.Value),
            cancellationToken);
    }

    /// <summary>
    /// Runs a unary call through the retry pipeline, bounding the whole attempt
    /// sequence by <c>Options.DefaultCallTimeout</c>.
    /// </summary>
    /// <typeparam name="TReply">Reply type produced by the call.</typeparam>
    /// <param name="call">Factory invoked once per attempt with the attempt's token.</param>
    /// <param name="cancellationToken">Caller-supplied cancellation token.</param>
    /// <returns>The reply of the first successful attempt.</returns>
    private async Task<TReply> ExecuteSafeUnaryAsync<TReply>(
        Func<CancellationToken, Task<TReply>> call,
        CancellationToken cancellationToken)
    {
        // Linked source: cancels on caller cancellation or when the overall budget elapses.
        using CancellationTokenSource timeout = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        timeout.CancelAfter(Options.DefaultCallTimeout);

        return await _safeUnaryRetryPipeline.ExecuteAsync(
                async token => await call(token).ConfigureAwait(false),
                timeout.Token)
            .ConfigureAwait(false);
    }

    /// <summary>
    /// Builds the HTTP handler for the channel, applying connect timeout and,
    /// when TLS is enabled, server-name override and custom-CA validation.
    /// </summary>
    /// <param name="options">Client configuration options.</param>
    /// <returns>A configured <see cref="SocketsHttpHandler"/>.</returns>
    private static HttpMessageHandler CreateHttpHandler(MxGatewayClientOptions options)
    {
        // TLS options are built first so that a failure to load the CA certificate
        // cannot strand an already-constructed handler.
        SslClientAuthenticationOptions? sslOptions = options.UseTls
            ? CreateSslOptions(options)
            : null;

        SocketsHttpHandler handler = new()
        {
            ConnectTimeout = options.ConnectTimeout,
        };

        if (sslOptions is not null)
        {
            handler.SslOptions = sslOptions;
        }

        return handler;
    }

    /// <summary>
    /// Builds the TLS client options: optional target-host override and optional
    /// validation against a caller-supplied root certificate.
    /// </summary>
    private static SslClientAuthenticationOptions CreateSslOptions(MxGatewayClientOptions options)
    {
        SslClientAuthenticationOptions sslOptions = new();

        string? serverNameOverride = string.IsNullOrWhiteSpace(options.ServerNameOverride)
            ? null
            : options.ServerNameOverride;
        if (serverNameOverride is not null)
        {
            sslOptions.TargetHost = serverNameOverride;
        }

        if (!string.IsNullOrWhiteSpace(options.CaCertificatePath))
        {
            // Captured by the callback, so it lives as long as the handler does.
            X509Certificate2 trustedRoot = X509CertificateLoader.LoadCertificateFromFile(options.CaCertificatePath);
            sslOptions.RemoteCertificateValidationCallback = (_, certificate, _, errors) =>
                ValidateAgainstCustomRoot(certificate, errors, trustedRoot, serverNameOverride);
        }

        return sslOptions;
    }

    /// <summary>
    /// Validates the server certificate: its name must match the expected host and
    /// its chain must terminate at <paramref name="trustedRoot"/>.
    /// </summary>
    private static bool ValidateAgainstCustomRoot(
        X509Certificate? certificate,
        SslPolicyErrors errors,
        X509Certificate2 trustedRoot,
        string? expectedHostName)
    {
        if (certificate is null)
        {
            return false;
        }

        // Only a copy created here is owned (and disposed) by this method.
        X509Certificate2? ownedCopy = null;
        if (certificate is not X509Certificate2 certificateToValidate)
        {
            ownedCopy = X509CertificateLoader.LoadCertificate(certificate.Export(X509ContentType.Cert));
            certificateToValidate = ownedCopy;
        }

        try
        {
            // Chain trust alone is not enough: without a name check, any certificate
            // issued by the trusted root would be accepted for any host. With an
            // override configured the name is checked against it directly; otherwise
            // the handshake's own name-mismatch verdict is honoured.
            bool hostNameMatches = expectedHostName is null
                ? (errors & SslPolicyErrors.RemoteCertificateNameMismatch) == 0
                : certificateToValidate.MatchesHostname(expectedHostName);
            if (!hostNameMatches)
            {
                return false;
            }

            using X509Chain customChain = new();
            customChain.ChainPolicy.TrustMode = X509ChainTrustMode.CustomRootTrust;
            customChain.ChainPolicy.CustomTrustStore.Add(trustedRoot);
            customChain.ChainPolicy.RevocationMode = X509RevocationMode.NoCheck;
            customChain.ChainPolicy.VerificationFlags = X509VerificationFlags.NoFlag;

            return customChain.Build(certificateToValidate);
        }
        finally
        {
            ownedCopy?.Dispose();
        }
    }

    /// <summary>
    /// Throws <see cref="ObjectDisposedException"/> once the client has been disposed.
    /// </summary>
    private void ThrowIfDisposed()
    {
        ObjectDisposedException.ThrowIf(Volatile.Read(ref _disposed) != 0, this);
    }
}