using Grpc.Core;
using Grpc.Net.Client;
using Microsoft.Extensions.Logging;
using MxGateway.Contracts.Proto;
using Polly;
using System.Net.Http;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;
namespace MxGateway.Client;
/// <summary>
/// Provides the .NET client entry point for the public MXAccess Gateway gRPC API.
/// </summary>
public sealed class MxGatewayClient : IAsyncDisposable
{
// gRPC channel supplied by the private constructor; the internal constructor assigns null! instead,
// which is why DisposeAsync uses a null-conditional call on it.
private readonly GrpcChannel _channel;
// Transport through which every gateway call in this class is issued.
private readonly IMxGatewayClientTransport _transport;
// Pipeline built by MxGatewayClientRetryPolicy.Create and used by ExecuteSafeUnaryAsync.
private readonly ResiliencePipeline _safeUnaryRetryPipeline;
// 0 until DisposeAsync runs, 1 afterwards; written with Interlocked.Exchange and read with Volatile.Read.
private int _disposed;
/// <summary>
/// Initializes a new instance of <see cref="MxGatewayClient"/> on top of a caller-supplied transport.
/// </summary>
/// <param name="options">Client configuration options; validated before they are stored.</param>
/// <param name="transport">Transport implementation for gateway communication.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="options"/> or <paramref name="transport"/> is <see langword="null"/>.
/// </exception>
internal MxGatewayClient(
    MxGatewayClientOptions options,
    IMxGatewayClientTransport transport)
{
    ArgumentNullException.ThrowIfNull(options);
    options.Validate();
    Options = options;

    ArgumentNullException.ThrowIfNull(transport);
    _transport = transport;

    // NOTE(review): CreateLogger() is called without a category or type argument here; it looks
    // like a generic argument was lost — confirm the intended logger category.
    _safeUnaryRetryPipeline = MxGatewayClientRetryPolicy.Create(
        options.Retry,
        options.LoggerFactory?.CreateLogger());

    // This path creates no channel, so DisposeAsync has nothing to release.
    _channel = null!;
}
/// <summary>
/// Initializes a client around an already-created channel, taking its options from the transport.
/// </summary>
/// <param name="channel">Channel that <see cref="DisposeAsync"/> disposes.</param>
/// <param name="transport">Transport bound to <paramref name="channel"/>; also supplies the options.</param>
private MxGatewayClient(
    GrpcChannel channel,
    IMxGatewayClientTransport transport)
{
    _transport = transport;
    _channel = channel;
    Options = transport.Options;

    // NOTE(review): CreateLogger() is called without a category or type argument here; it looks
    // like a generic argument was lost — confirm the intended logger category.
    _safeUnaryRetryPipeline = MxGatewayClientRetryPolicy.Create(
        Options.Retry,
        Options.LoggerFactory?.CreateLogger());
}
/// <summary>
/// Gets the client configuration options.
/// </summary>
public MxGatewayClientOptions Options { get; }
/// <summary>
/// Gets the underlying generated gRPC client.
/// </summary>
/// <exception cref="InvalidOperationException">
/// The transport does not expose a generated client.
/// </exception>
public MxAccessGateway.MxAccessGatewayClient RawClient
{
    get
    {
        var generatedClient = _transport.RawClient;
        if (generatedClient is null)
        {
            throw new InvalidOperationException("The raw generated gRPC client is not available for this client instance.");
        }

        return generatedClient;
    }
}
/// <summary>
/// Creates a new gateway client with the given options.
/// </summary>
/// <param name="options">Client configuration options; validated before the channel is created.</param>
/// <returns>A new gateway client instance that owns its gRPC channel and HTTP handler.</returns>
/// <exception cref="ArgumentNullException"><paramref name="options"/> is <see langword="null"/>.</exception>
public static MxGatewayClient Create(MxGatewayClientOptions options)
{
    ArgumentNullException.ThrowIfNull(options);
    options.Validate();

    HttpMessageHandler handler = CreateHttpHandler(options);
    GrpcChannel? channel = null;
    try
    {
        channel = GrpcChannel.ForAddress(
            options.Endpoint,
            new GrpcChannelOptions
            {
                HttpHandler = handler,
                // The handler is created above and never exposed to the caller. A channel does not
                // dispose a supplied handler unless told to, so hand ownership to the channel;
                // otherwise disposing the client would leave the handler's connections open.
                DisposeHttpClient = true,
                LoggerFactory = options.LoggerFactory,
                MaxReceiveMessageSize = options.MaxGrpcMessageBytes,
                MaxSendMessageSize = options.MaxGrpcMessageBytes,
            });

        return new MxGatewayClient(
            channel,
            new GrpcMxGatewayClientTransport(
                options,
                new MxAccessGateway.MxAccessGatewayClient(channel)));
    }
    catch
    {
        // Construction failed part-way: release whatever was already created before rethrowing.
        // Disposing the handler again after the channel did so is harmless.
        channel?.Dispose();
        handler.Dispose();
        throw;
    }
}
/// <summary>
/// Opens a new gateway session.
/// </summary>
/// <param name="request">Session open request; an empty request is sent when <see langword="null"/>.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
/// <returns>A wrapped gateway session built from this client and the open-session reply.</returns>
public async Task<MxGatewaySession> OpenSessionAsync(
    OpenSessionRequest? request = null,
    CancellationToken cancellationToken = default)
{
    // Argument and disposed-state checks happen inside OpenSessionRawAsync.
    OpenSessionReply reply = await OpenSessionRawAsync(
            request ?? new OpenSessionRequest(),
            cancellationToken)
        .ConfigureAwait(false);

    return new MxGatewaySession(this, reply);
}
/// <summary>
/// Opens a new gateway session and returns the raw protobuf reply.
/// </summary>
/// <param name="request">Session open request.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
/// <returns>The raw gateway session open reply.</returns>
/// <exception cref="ArgumentNullException"><paramref name="request"/> is <see langword="null"/>.</exception>
/// <exception cref="ObjectDisposedException">The client has been disposed.</exception>
public Task<OpenSessionReply> OpenSessionRawAsync(
    OpenSessionRequest request,
    CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(request);
    ThrowIfDisposed();

    // Sent straight to the transport with the default call timeout; this call does not go
    // through ExecuteSafeUnaryAsync.
    return _transport.OpenSessionAsync(request, CreateCallOptions(cancellationToken));
}
/// <summary>
/// Closes an open gateway session.
/// </summary>
/// <param name="request">Session close request.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
/// <returns>The session close reply.</returns>
/// <exception cref="ArgumentNullException"><paramref name="request"/> is <see langword="null"/>.</exception>
/// <exception cref="ObjectDisposedException">The client has been disposed.</exception>
public Task CloseSessionRawAsync(
    CloseSessionRequest request,
    CancellationToken cancellationToken = default)
{
    // NOTE(review): the return type has no generic argument although the docs promise a reply
    // value — confirm the intended Task<...> type.
    ArgumentNullException.ThrowIfNull(request);
    ThrowIfDisposed();

    // Each attempt gets fresh call options built from the token handed out by the pipeline.
    return ExecuteSafeUnaryAsync(
        attemptToken => _transport.CloseSessionAsync(request, CreateCallOptions(attemptToken)),
        cancellationToken);
}
/// <summary>
/// Invokes an MXAccess command on the open session.
/// </summary>
/// <param name="request">The command request.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
/// <returns>The command reply.</returns>
/// <exception cref="ArgumentNullException"><paramref name="request"/> is <see langword="null"/>.</exception>
/// <exception cref="ObjectDisposedException">The client has been disposed.</exception>
public Task InvokeAsync(
    MxCommandRequest request,
    CancellationToken cancellationToken = default)
{
    // NOTE(review): the return type has no generic argument although the docs promise a reply
    // value — confirm the intended Task<...> type.
    ArgumentNullException.ThrowIfNull(request);
    ThrowIfDisposed();

    // A request without a command is classified as Unspecified.
    var commandKind = request.Command?.Kind ?? MxCommandKind.Unspecified;
    if (!MxGatewayClientRetryPolicy.IsRetryableCommand(commandKind))
    {
        // Not classified as retryable: a single direct call on the transport.
        return _transport.InvokeAsync(request, CreateCallOptions(cancellationToken));
    }

    return ExecuteSafeUnaryAsync(
        attemptToken => _transport.InvokeAsync(request, CreateCallOptions(attemptToken)),
        cancellationToken);
}
/// <summary>
/// Streams events from the gateway session.
/// </summary>
/// <param name="request">The stream events request.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
/// <returns>An async enumerable of events.</returns>
/// <exception cref="ArgumentNullException"><paramref name="request"/> is <see langword="null"/>.</exception>
/// <exception cref="ObjectDisposedException">The client has been disposed.</exception>
public IAsyncEnumerable StreamEventsAsync(
    StreamEventsRequest request,
    CancellationToken cancellationToken = default)
{
    // NOTE(review): IAsyncEnumerable is declared without an element type here — confirm the
    // intended IAsyncEnumerable<...> against the transport interface.
    ArgumentNullException.ThrowIfNull(request);
    ThrowIfDisposed();

    // Streams use the stream timeout from the options rather than the default call timeout.
    CallOptions streamCallOptions = CreateStreamCallOptions(cancellationToken);
    return _transport.StreamEventsAsync(request, streamCallOptions);
}
/// <summary>
/// Acknowledges an active MXAccess alarm condition through the gateway. The
/// gateway authorizes against the API key's admin scope (there is no
/// finer-grained alarm-ack sub-scope) and forwards the acknowledge to the
/// worker's MXAccess session; the resulting status is returned in the reply.
/// </summary>
/// <param name="request">The acknowledge request — alarm reference, comment, operator user.</param>
/// <param name="cancellationToken">Cancellation token for the operation.</param>
/// <returns>The acknowledge reply with protocol + native MxStatus.</returns>
/// <exception cref="ArgumentNullException"><paramref name="request"/> is <see langword="null"/>.</exception>
/// <exception cref="ObjectDisposedException">The client has been disposed.</exception>
public Task AcknowledgeAlarmAsync(
    AcknowledgeAlarmRequest request,
    CancellationToken cancellationToken = default)
{
    // NOTE(review): the return type has no generic argument although the docs promise a reply
    // value — confirm the intended Task<...> type.
    ArgumentNullException.ThrowIfNull(request);
    ThrowIfDisposed();

    // Each attempt gets fresh call options built from the token handed out by the pipeline.
    return ExecuteSafeUnaryAsync(
        attemptToken => _transport.AcknowledgeAlarmAsync(request, CreateCallOptions(attemptToken)),
        cancellationToken);
}
/// <summary>
/// Attaches to the gateway's central alarm feed. The stream opens with one
/// message per currently-active alarm (the ConditionRefresh snapshot), then a
/// single snapshot_complete, then a transition for every subsequent
/// raise / acknowledge / clear. Served by the gateway's always-on alarm
/// monitor — no worker session is opened, so any number of clients may attach.
/// Optionally scoped by alarm-reference prefix.
/// </summary>
/// <param name="request">The stream request, optionally scoped by alarm-reference prefix.</param>
/// <param name="cancellationToken">Cancellation token for the stream.</param>
/// <returns>An async enumerable of alarm feed messages.</returns>
/// <exception cref="ArgumentNullException"><paramref name="request"/> is <see langword="null"/>.</exception>
/// <exception cref="ObjectDisposedException">The client has been disposed.</exception>
public IAsyncEnumerable StreamAlarmsAsync(
    StreamAlarmsRequest request,
    CancellationToken cancellationToken = default)
{
    // NOTE(review): IAsyncEnumerable is declared without an element type here — confirm the
    // intended IAsyncEnumerable<...> against the transport interface.
    ArgumentNullException.ThrowIfNull(request);
    ThrowIfDisposed();

    // Streams use the stream timeout from the options rather than the default call timeout.
    CallOptions streamCallOptions = CreateStreamCallOptions(cancellationToken);
    return _transport.StreamAlarmsAsync(request, streamCallOptions);
}
/// <summary>
/// Disposes the client: marks it disposed and disposes the gRPC channel when one is present.
/// Only the first call does any work; later calls return immediately.
/// </summary>
/// <returns>An already-completed <see cref="ValueTask"/>.</returns>
public ValueTask DisposeAsync()
{
    // The exchange returns the previous flag value, so exactly one caller observes 0.
    bool firstDispose = Interlocked.Exchange(ref _disposed, 1) == 0;
    if (firstDispose)
    {
        _channel?.Dispose();
    }

    return ValueTask.CompletedTask;
}
/// <summary>
/// Creates gRPC call options with the default call timeout and the authorization header.
/// </summary>
/// <param name="cancellationToken">Cancellation token for the call.</param>
/// <returns>Configured call options.</returns>
internal CallOptions CreateCallOptions(CancellationToken cancellationToken)
    => CreateCallOptions(cancellationToken, Options.DefaultCallTimeout);
/// <summary>
/// Creates gRPC call options for streaming calls, using the stream timeout and the authorization header.
/// </summary>
/// <param name="cancellationToken">Cancellation token for the call.</param>
/// <returns>Configured call options.</returns>
internal CallOptions CreateStreamCallOptions(CancellationToken cancellationToken)
    => CreateCallOptions(cancellationToken, Options.StreamTimeout);
/// <summary>
/// Creates gRPC call options with the specified timeout and the authorization header.
/// </summary>
/// <param name="cancellationToken">Cancellation token for the call.</param>
/// <param name="timeout">Optional timeout duration; <see langword="null"/> means no deadline is set.</param>
/// <returns>Configured call options.</returns>
internal CallOptions CreateCallOptions(
    CancellationToken cancellationToken,
    TimeSpan? timeout)
{
    // Every call carries the API key as a bearer token.
    var headers = new Metadata();
    headers.Add("authorization", $"Bearer {Options.ApiKey}");

    // The deadline is absolute: current UTC time plus the timeout.
    DateTime? deadline = null;
    if (timeout.HasValue)
    {
        deadline = DateTime.UtcNow.Add(timeout.Value);
    }

    return new CallOptions(headers, deadline, cancellationToken);
}
/// <summary>
/// Runs a unary call through the pipeline built by <c>MxGatewayClientRetryPolicy.Create</c>,
/// bounding the whole execution by <c>Options.DefaultCallTimeout</c>.
/// </summary>
/// <typeparam name="TReply">Reply type produced by the call.</typeparam>
/// <param name="call">
/// Delegate that issues one attempt; it receives the token the pipeline passes to that attempt.
/// </param>
/// <param name="cancellationToken">Caller's cancellation token.</param>
/// <returns>The reply produced by <paramref name="call"/>.</returns>
private async Task<TReply> ExecuteSafeUnaryAsync<TReply>(
    Func<CancellationToken, Task<TReply>> call,
    CancellationToken cancellationToken)
{
    // Linked source: cancels on the caller's token or once the default call timeout elapses.
    // It wraps the entire pipeline execution, not an individual attempt.
    using CancellationTokenSource timeout = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
    timeout.CancelAfter(Options.DefaultCallTimeout);

    return await _safeUnaryRetryPipeline.ExecuteAsync(
            async token => await call(token).ConfigureAwait(false),
            timeout.Token)
        .ConfigureAwait(false);
}
/// <summary>
/// Builds the HTTP handler for the gRPC channel: applies the connect timeout and, when TLS is
/// enabled, the optional server-name override and custom CA trust from <paramref name="options"/>.
/// </summary>
/// <param name="options">
/// Client options; reads ConnectTimeout, UseTls, ServerNameOverride and CaCertificatePath.
/// </param>
/// <returns>A configured <see cref="SocketsHttpHandler"/>.</returns>
private static HttpMessageHandler CreateHttpHandler(MxGatewayClientOptions options)
{
    SocketsHttpHandler handler = new()
    {
        ConnectTimeout = options.ConnectTimeout,
    };

    if (!options.UseTls)
    {
        return handler;
    }

    handler.SslOptions = new SslClientAuthenticationOptions();
    if (!string.IsNullOrWhiteSpace(options.ServerNameOverride))
    {
        handler.SslOptions.TargetHost = options.ServerNameOverride;
    }

    if (string.IsNullOrWhiteSpace(options.CaCertificatePath))
    {
        // No custom CA: the platform's default certificate validation applies.
        return handler;
    }

    // Captured by the callback below, so it stays alive as long as the handler does.
    X509Certificate2 trustedRoot = X509CertificateLoader.LoadCertificateFromFile(options.CaCertificatePath);
    handler.SslOptions.RemoteCertificateValidationCallback = (_, certificate, chain, errors) =>
    {
        if (certificate is null)
        {
            return false;
        }

        // Trusting a custom root must only replace the chain-trust decision. A name mismatch
        // means the certificate was issued for a different host, so accepting it would let any
        // certificate from the same CA impersonate the gateway. Chain errors are re-evaluated
        // against the custom root below.
        const SslPolicyErrors NonChainErrors =
            SslPolicyErrors.RemoteCertificateNameMismatch | SslPolicyErrors.RemoteCertificateNotAvailable;
        if ((errors & NonChainErrors) != SslPolicyErrors.None)
        {
            return false;
        }

        using X509Chain customChain = new();
        customChain.ChainPolicy.TrustMode = X509ChainTrustMode.CustomRootTrust;
        customChain.ChainPolicy.CustomTrustStore.Add(trustedRoot);
        customChain.ChainPolicy.RevocationMode = X509RevocationMode.NoCheck;
        customChain.ChainPolicy.VerificationFlags = X509VerificationFlags.NoFlag;

        if (chain is not null)
        {
            // Make the certificates supplied with the handshake available as intermediates.
            // ExtraStore entries are only chain-building candidates; they are never trusted.
            customChain.ChainPolicy.ExtraStore.AddRange(chain.ChainPolicy.ExtraStore);
        }

        // Only a copy created here is ours to dispose; the callback's certificate is not.
        X509Certificate2? convertedCopy = null;
        try
        {
            X509Certificate2 certificateToValidate = certificate as X509Certificate2
                ?? (convertedCopy = X509CertificateLoader.LoadCertificate(certificate.Export(X509ContentType.Cert)));
            return customChain.Build(certificateToValidate);
        }
        finally
        {
            convertedCopy?.Dispose();
        }
    };

    return handler;
}
/// <summary>
/// Throws <see cref="ObjectDisposedException"/> when the disposed flag has been set.
/// </summary>
private void ThrowIfDisposed()
{
    bool alreadyDisposed = Volatile.Read(ref _disposed) != 0;
    ObjectDisposedException.ThrowIf(alreadyDisposed, this);
}
}