382 lines
13 KiB
C#
382 lines
13 KiB
C#
using Google.Protobuf.WellKnownTypes;
|
|
using Grpc.Core;
|
|
using Grpc.Net.Client;
|
|
using Microsoft.Extensions.Logging;
|
|
using MxGateway.Contracts.Proto.Galaxy;
|
|
using Polly;
|
|
using System.Net.Http;
|
|
using System.Net.Security;
|
|
using System.Runtime.CompilerServices;
|
|
using System.Security.Cryptography.X509Certificates;
|
|
|
|
namespace MxGateway.Client;
|
|
|
|
/// <summary>
|
|
/// Provides the .NET client entry point for the public Galaxy Repository gRPC API.
|
|
/// All RPCs are read-only metadata calls that share the gateway's API-key auth
|
|
/// interceptor and require the <c>metadata:read</c> scope server-side.
|
|
/// </summary>
|
|
public sealed class GalaxyRepositoryClient : IAsyncDisposable
|
|
{
|
|
// Page size requested on every DiscoverHierarchy call issued by the paging loop
// in DiscoverHierarchyAsync.
private const int DiscoverHierarchyPageSize = 5000;
|
|
|
|
// gRPC channel owned by this instance when it was built through Create; null when
// the client was constructed over an injected transport. Disposed in DisposeAsync.
private readonly GrpcChannel? _channel;
|
|
// Transport every RPC in this class is delegated to.
private readonly IGalaxyRepositoryClientTransport _transport;
|
|
// Resilience pipeline built from Options.Retry; ExecuteSafeUnaryAsync runs every
// unary call through it. Streaming calls do not use it.
private readonly ResiliencePipeline _safeUnaryRetryPipeline;
|
|
// Set by DisposeAsync; checked by ThrowIfDisposed before public calls proceed.
private bool _disposed;
|
|
|
|
/// <summary>
/// Builds a client over a caller-supplied transport. No gRPC channel is created,
/// so disposing the resulting instance releases no channel.
/// </summary>
/// <param name="options">Client options; validated before anything is stored.</param>
/// <param name="transport">Transport that carries out the RPCs.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="options"/> or <paramref name="transport"/> is <see langword="null"/>.
/// </exception>
internal GalaxyRepositoryClient(
    MxGatewayClientOptions options,
    IGalaxyRepositoryClientTransport transport)
{
    ArgumentNullException.ThrowIfNull(options);
    options.Validate();
    ArgumentNullException.ThrowIfNull(transport);

    var retryLogger = options.LoggerFactory?.CreateLogger<GalaxyRepositoryClient>();

    _channel = null;
    _transport = transport;
    Options = options;
    _safeUnaryRetryPipeline = MxGatewayClientRetryPolicy.Create(options.Retry, retryLogger);
}
|
|
|
|
/// <summary>
/// Builds a client that owns <paramref name="channel"/>. The options are read from
/// <paramref name="transport"/> rather than passed separately.
/// </summary>
/// <param name="channel">Channel disposed together with this client.</param>
/// <param name="transport">Transport bound to <paramref name="channel"/>.</param>
private GalaxyRepositoryClient(
    GrpcChannel channel,
    IGalaxyRepositoryClientTransport transport)
{
    MxGatewayClientOptions transportOptions = transport.Options;
    var retryLogger = transportOptions.LoggerFactory?.CreateLogger<GalaxyRepositoryClient>();

    Options = transportOptions;
    _transport = transport;
    _channel = channel;
    _safeUnaryRetryPipeline = MxGatewayClientRetryPolicy.Create(transportOptions.Retry, retryLogger);
}
|
|
|
|
/// <summary>
/// Gets the options this client was constructed with. Call timeouts and the API key
/// used for each request are read from here.
/// </summary>
public MxGatewayClientOptions Options { get; }
|
|
|
|
/// <summary>
/// Gets the generated gRPC client exposed by the underlying transport.
/// </summary>
/// <exception cref="InvalidOperationException">
/// The transport does not expose a generated client.
/// </exception>
public GalaxyRepository.GalaxyRepositoryClient RawClient
{
    get
    {
        var generated = _transport.RawClient;
        if (generated is null)
        {
            throw new InvalidOperationException("The raw generated gRPC client is not available for this client instance.");
        }

        return generated;
    }
}
|
|
|
|
/// <summary>
/// Creates a client that owns a new gRPC channel to <see cref="MxGatewayClientOptions.Endpoint"/>.
/// Disposing the returned client disposes the channel and the HTTP handler built for it.
/// </summary>
/// <param name="options">Client options; validated before the channel is created.</param>
/// <returns>A client bound to a freshly created channel.</returns>
/// <exception cref="ArgumentNullException"><paramref name="options"/> is <see langword="null"/>.</exception>
public static GalaxyRepositoryClient Create(MxGatewayClientOptions options)
{
    ArgumentNullException.ThrowIfNull(options);
    options.Validate();

    HttpMessageHandler handler = CreateHttpHandler(options);
    GrpcChannel channel;
    try
    {
        channel = GrpcChannel.ForAddress(
            options.Endpoint,
            new GrpcChannelOptions
            {
                HttpHandler = handler,

                // The handler is created here and never exposed to the caller, so the
                // channel has to own it. A caller-supplied handler is left undisposed
                // by default, which would leak the handler's connection pool.
                DisposeHttpClient = true,
                LoggerFactory = options.LoggerFactory,
                MaxReceiveMessageSize = options.MaxGrpcMessageBytes,
                MaxSendMessageSize = options.MaxGrpcMessageBytes,
            });
    }
    catch
    {
        // Channel creation failed, so nothing else will ever dispose the handler.
        handler.Dispose();
        throw;
    }

    return new GalaxyRepositoryClient(
        channel,
        new GrpcGalaxyRepositoryClientTransport(
            options,
            new GalaxyRepository.GalaxyRepositoryClient(channel)));
}
|
|
|
|
/// <summary>
/// Probes the gateway's Galaxy Repository database connection.
/// </summary>
/// <param name="cancellationToken">Token that cancels the call.</param>
/// <returns>
/// <see langword="true"/> when the gateway reports it can reach the configured
/// ZB SQL Server; otherwise <see langword="false"/>.
/// </returns>
public async Task<bool> TestConnectionAsync(CancellationToken cancellationToken = default)
{
    TestConnectionRequest probe = new();
    TestConnectionReply outcome = await TestConnectionRawAsync(probe, cancellationToken).ConfigureAwait(false);
    return outcome.Ok;
}
|
|
|
|
/// <summary>
/// Sends <paramref name="request"/> through the unary retry pipeline and returns
/// the unmodified reply message.
/// </summary>
/// <param name="request">Request message to send.</param>
/// <param name="cancellationToken">Token that cancels the call.</param>
/// <exception cref="ArgumentNullException"><paramref name="request"/> is <see langword="null"/>.</exception>
/// <exception cref="ObjectDisposedException">The client has been disposed.</exception>
public Task<TestConnectionReply> TestConnectionRawAsync(
    TestConnectionRequest request,
    CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(request);
    ThrowIfDisposed();

    return ExecuteSafeUnaryAsync<TestConnectionReply>(SendAsync, cancellationToken);

    // One attempt; call options are rebuilt per attempt from the token it is handed.
    Task<TestConnectionReply> SendAsync(CancellationToken attemptToken) =>
        _transport.TestConnectionAsync(request, CreateCallOptions(attemptToken));
}
|
|
|
|
/// <summary>
/// Fetches the timestamp of the most recent Galaxy deployment.
/// </summary>
/// <param name="cancellationToken">Token that cancels the call.</param>
/// <returns>
/// The deploy time converted with <c>Timestamp.ToDateTime()</c>, or
/// <see langword="null"/> when the reply is not marked present or carries no timestamp.
/// </returns>
public async Task<DateTime?> GetLastDeployTimeAsync(CancellationToken cancellationToken = default)
{
    GetLastDeployTimeRequest query = new();
    GetLastDeployTimeReply answer = await GetLastDeployTimeRawAsync(query, cancellationToken).ConfigureAwait(false);

    if (answer.Present && answer.TimeOfLastDeploy is { } deployedAt)
    {
        return deployedAt.ToDateTime();
    }

    return null;
}
|
|
|
|
/// <summary>
/// Sends <paramref name="request"/> through the unary retry pipeline and returns
/// the unmodified reply message.
/// </summary>
/// <param name="request">Request message to send.</param>
/// <param name="cancellationToken">Token that cancels the call.</param>
/// <exception cref="ArgumentNullException"><paramref name="request"/> is <see langword="null"/>.</exception>
/// <exception cref="ObjectDisposedException">The client has been disposed.</exception>
public Task<GetLastDeployTimeReply> GetLastDeployTimeRawAsync(
    GetLastDeployTimeRequest request,
    CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(request);
    ThrowIfDisposed();

    return ExecuteSafeUnaryAsync<GetLastDeployTimeReply>(SendAsync, cancellationToken);

    // One attempt; call options are rebuilt per attempt from the token it is handed.
    Task<GetLastDeployTimeReply> SendAsync(CancellationToken attemptToken) =>
        _transport.GetLastDeployTimeAsync(request, CreateCallOptions(attemptToken));
}
|
|
|
|
/// <summary>
/// Enumerates the deployed Galaxy object hierarchy using a default-constructed
/// <see cref="DiscoverHierarchyOptions"/>. Each <see cref="GalaxyObject"/> includes
/// its dynamic attributes so callers can determine which tag references they may
/// subscribe to via the MxAccessGateway service.
/// </summary>
/// <param name="cancellationToken">Token that cancels the enumeration.</param>
/// <returns>All objects returned across every page.</returns>
public async Task<IReadOnlyList<GalaxyObject>> DiscoverHierarchyAsync(CancellationToken cancellationToken = default)
{
    DiscoverHierarchyOptions defaults = new();
    IReadOnlyList<GalaxyObject> hierarchy = await DiscoverHierarchyAsync(defaults, cancellationToken).ConfigureAwait(false);
    return hierarchy;
}
|
|
|
|
/// <summary>
/// Enumerates the Galaxy object hierarchy selected by <paramref name="options"/>,
/// following page tokens until the server returns an empty one.
/// </summary>
/// <param name="options">Filters and root selection applied to every page request.</param>
/// <param name="cancellationToken">Token that cancels the enumeration.</param>
/// <returns>The objects of all pages, in the order they were received.</returns>
/// <exception cref="MxGatewayException">
/// The server returned a page token it had already returned earlier in this enumeration.
/// </exception>
public async Task<IReadOnlyList<GalaxyObject>> DiscoverHierarchyAsync(
    DiscoverHierarchyOptions options,
    CancellationToken cancellationToken = default)
{
    List<GalaxyObject> collected = [];
    HashSet<string> issuedTokens = new(StringComparer.Ordinal);
    string cursor = string.Empty;

    while (true)
    {
        DiscoverHierarchyRequest pageRequest = CreateDiscoverHierarchyRequest(options);
        pageRequest.PageSize = DiscoverHierarchyPageSize;
        pageRequest.PageToken = cursor;

        DiscoverHierarchyReply page = await DiscoverHierarchyRawAsync(pageRequest, cancellationToken).ConfigureAwait(false);
        collected.AddRange(page.Objects);

        cursor = page.NextPageToken;
        if (string.IsNullOrWhiteSpace(cursor))
        {
            // A blank token marks the last page.
            return collected;
        }

        // A token seen before would make the loop cycle forever, so fail instead.
        if (!issuedTokens.Add(cursor))
        {
            throw new MxGatewayException(
                $"Galaxy DiscoverHierarchy returned a repeated page token '{cursor}'.");
        }
    }
}
|
|
|
|
/// <summary>
/// Translates <paramref name="options"/> into a <see cref="DiscoverHierarchyRequest"/>.
/// Paging fields are left untouched; the caller sets them.
/// </summary>
/// <param name="options">Options to copy into the request.</param>
/// <returns>A new request carrying the filters from <paramref name="options"/>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="options"/> is <see langword="null"/>.</exception>
private static DiscoverHierarchyRequest CreateDiscoverHierarchyRequest(DiscoverHierarchyOptions options)
{
    ArgumentNullException.ThrowIfNull(options);

    DiscoverHierarchyRequest built = new();
    built.AlarmBearingOnly = options.AlarmBearingOnly;
    built.HistorizedOnly = options.HistorizedOnly;

    // At most one root selector is copied: object id wins over tag name,
    // which wins over contained path.
    if (options.RootGobjectId is { } rootId)
    {
        built.RootGobjectId = rootId;
    }
    else if (!string.IsNullOrWhiteSpace(options.RootTagName))
    {
        built.RootTagName = options.RootTagName;
    }
    else if (!string.IsNullOrWhiteSpace(options.RootContainedPath))
    {
        built.RootContainedPath = options.RootContainedPath;
    }

    if (options.MaxDepth is { } depthLimit)
    {
        built.MaxDepth = depthLimit;
    }

    built.CategoryIds.Add(options.CategoryIds);
    built.TemplateChainContains.Add(options.TemplateChainContains);

    // Optional fields are only assigned when the caller supplied a value.
    if (!string.IsNullOrWhiteSpace(options.TagNameGlob))
    {
        built.TagNameGlob = options.TagNameGlob;
    }

    if (options.IncludeAttributes is { } includeAttributes)
    {
        built.IncludeAttributes = includeAttributes;
    }

    return built;
}
|
|
|
|
/// <summary>
/// Sends a single <paramref name="request"/> page through the unary retry pipeline
/// and returns the unmodified reply message. No paging is performed here.
/// </summary>
/// <param name="request">Request message to send.</param>
/// <param name="cancellationToken">Token that cancels the call.</param>
/// <exception cref="ArgumentNullException"><paramref name="request"/> is <see langword="null"/>.</exception>
/// <exception cref="ObjectDisposedException">The client has been disposed.</exception>
public Task<DiscoverHierarchyReply> DiscoverHierarchyRawAsync(
    DiscoverHierarchyRequest request,
    CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(request);
    ThrowIfDisposed();

    return ExecuteSafeUnaryAsync<DiscoverHierarchyReply>(SendAsync, cancellationToken);

    // One attempt; call options are rebuilt per attempt from the token it is handed.
    Task<DiscoverHierarchyReply> SendAsync(CancellationToken attemptToken) =>
        _transport.DiscoverHierarchyAsync(request, CreateCallOptions(attemptToken));
}
|
|
|
|
/// <summary>
/// Opens a stream of Galaxy deploy events. On subscribe the server emits a bootstrap
/// event describing the current state so callers can prime their cache, followed by
/// one event per new <c>time_of_last_deploy</c>. Supplying
/// <paramref name="lastSeenDeployTime"/> suppresses the bootstrap when the caller
/// already holds the current deploy time.
/// </summary>
/// <param name="lastSeenDeployTime">Deploy time the caller already knows, if any.</param>
/// <param name="cancellationToken">Token that cancels the stream.</param>
/// <returns>An asynchronous sequence of deploy events.</returns>
/// <remarks>
/// The unary safe-read retry pipeline does not wrap streaming RPCs. When the stream
/// is interrupted the caller has to reopen it; the server does not guarantee
/// at-least-once delivery beyond the per-subscriber buffer (gaps in
/// <see cref="DeployEvent.Sequence"/> indicate dropped events).
/// </remarks>
/// <exception cref="ObjectDisposedException">The client has been disposed.</exception>
public IAsyncEnumerable<DeployEvent> WatchDeployEventsAsync(
    DateTimeOffset? lastSeenDeployTime = null,
    CancellationToken cancellationToken = default)
{
    ThrowIfDisposed();

    WatchDeployEventsRequest subscription = new();
    if (lastSeenDeployTime.HasValue)
    {
        subscription.LastSeenDeployTime = Timestamp.FromDateTimeOffset(lastSeenDeployTime.Value);
    }

    return WatchDeployEventsRawAsync(subscription, cancellationToken);
}
|
|
|
|
/// <summary>
/// Opens the deploy-event stream for a caller-built <paramref name="request"/>.
/// Arguments and disposal state are checked when this method is called, not when
/// enumeration starts.
/// </summary>
/// <param name="request">Request message to send.</param>
/// <param name="cancellationToken">Token that cancels the stream.</param>
/// <exception cref="ArgumentNullException"><paramref name="request"/> is <see langword="null"/>.</exception>
/// <exception cref="ObjectDisposedException">The client has been disposed.</exception>
public IAsyncEnumerable<DeployEvent> WatchDeployEventsRawAsync(
    WatchDeployEventsRequest request,
    CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(request);
    ThrowIfDisposed();

    IAsyncEnumerable<DeployEvent> events = WatchDeployEventsCoreAsync(request, cancellationToken);
    return events;
}
|
|
|
|
/// <summary>
/// Iterator behind the public watch methods: forwards every event produced by the
/// transport stream. The same token is placed in the call options and applied to
/// the enumeration.
/// </summary>
/// <param name="request">Request message to send.</param>
/// <param name="cancellationToken">Token that cancels the stream.</param>
private async IAsyncEnumerable<DeployEvent> WatchDeployEventsCoreAsync(
    WatchDeployEventsRequest request,
    [EnumeratorCancellation] CancellationToken cancellationToken)
{
    CallOptions streamOptions = CreateStreamCallOptions(cancellationToken);
    var source = _transport.WatchDeployEventsAsync(request, streamOptions);

    await foreach (DeployEvent next in source.WithCancellation(cancellationToken).ConfigureAwait(false))
    {
        yield return next;
    }
}
|
|
|
|
/// <summary>
/// Marks the client as disposed and disposes the owned channel, if there is one.
/// Calling it again has no further effect.
/// </summary>
/// <returns>An already completed task; disposal runs synchronously.</returns>
public ValueTask DisposeAsync()
{
    if (!_disposed)
    {
        _disposed = true;
        _channel?.Dispose();
    }

    return ValueTask.CompletedTask;
}
|
|
|
|
/// <summary>
/// Builds call options for a unary call, using <c>Options.DefaultCallTimeout</c>
/// as the timeout.
/// </summary>
/// <param name="cancellationToken">Token attached to the call.</param>
internal CallOptions CreateCallOptions(CancellationToken cancellationToken) =>
    CreateCallOptions(cancellationToken, Options.DefaultCallTimeout);
|
|
|
|
/// <summary>
/// Builds call options for a streaming call, using <c>Options.StreamTimeout</c>
/// as the timeout.
/// </summary>
/// <param name="cancellationToken">Token attached to the call.</param>
internal CallOptions CreateStreamCallOptions(CancellationToken cancellationToken) =>
    CreateCallOptions(cancellationToken, Options.StreamTimeout);
|
|
|
|
/// <summary>
/// Builds call options carrying the API key as a bearer <c>authorization</c> header,
/// an optional deadline and the cancellation token.
/// </summary>
/// <param name="cancellationToken">Token attached to the call.</param>
/// <param name="timeout">
/// Time allowed from now until the deadline; <see langword="null"/> leaves the call
/// without a deadline.
/// </param>
internal CallOptions CreateCallOptions(
    CancellationToken cancellationToken,
    TimeSpan? timeout)
{
    Metadata authHeaders = new();
    authHeaders.Add("authorization", $"Bearer {Options.ApiKey}");

    // The deadline is absolute, so it is computed from the current UTC time.
    DateTime? deadline = null;
    if (timeout.HasValue)
    {
        deadline = DateTime.UtcNow.Add(timeout.Value);
    }

    return new CallOptions(
        headers: authHeaders,
        deadline: deadline,
        cancellationToken: cancellationToken);
}
|
|
|
|
/// <summary>
/// Runs <paramref name="call"/> through the unary retry pipeline. A linked token that
/// is cancelled after <c>Options.DefaultCallTimeout</c> is passed to the pipeline, so
/// that single time budget covers the whole pipeline execution.
/// </summary>
/// <typeparam name="T">Reply type produced by the call.</typeparam>
/// <param name="call">Performs one attempt with the token it is handed.</param>
/// <param name="cancellationToken">Caller token linked into the time budget.</param>
/// <returns>The reply of the attempt that completed.</returns>
private async Task<T> ExecuteSafeUnaryAsync<T>(
    Func<CancellationToken, Task<T>> call,
    CancellationToken cancellationToken)
{
    CancellationTokenSource budget = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
    try
    {
        budget.CancelAfter(Options.DefaultCallTimeout);

        T reply = await _safeUnaryRetryPipeline
            .ExecuteAsync(
                async attemptToken => await call(attemptToken).ConfigureAwait(false),
                budget.Token)
            .ConfigureAwait(false);

        return reply;
    }
    finally
    {
        budget.Dispose();
    }
}
|
|
|
|
/// <summary>
/// Builds the HTTP handler for the gRPC channel. With TLS enabled it applies the
/// optional server-name override and, when a CA certificate path is configured,
/// validates the server certificate against that CA only.
/// </summary>
/// <param name="options">Client options supplying the connect timeout and TLS settings.</param>
/// <returns>A configured handler; the caller is responsible for disposing it.</returns>
private static HttpMessageHandler CreateHttpHandler(MxGatewayClientOptions options)
{
    SocketsHttpHandler handler = new()
    {
        ConnectTimeout = options.ConnectTimeout,
    };

    try
    {
        if (options.UseTls)
        {
            handler.SslOptions = new SslClientAuthenticationOptions();
            if (!string.IsNullOrWhiteSpace(options.ServerNameOverride))
            {
                handler.SslOptions.TargetHost = options.ServerNameOverride;
            }

            if (!string.IsNullOrWhiteSpace(options.CaCertificatePath))
            {
                X509Certificate2 trustedRoot = X509CertificateLoader.LoadCertificateFromFile(options.CaCertificatePath);
                handler.SslOptions.RemoteCertificateValidationCallback = (_, certificate, chain, errors) =>
                    ValidateAgainstTrustedRoot(trustedRoot, certificate, chain, errors);
            }
        }
    }
    catch
    {
        // Configuration failed (for example the CA file could not be loaded);
        // release the handler, because it is never returned to a caller.
        handler.Dispose();
        throw;
    }

    return handler;
}

/// <summary>
/// Decides whether a server certificate is acceptable when a private CA is configured:
/// the host name must match and the certificate must chain to <paramref name="trustedRoot"/>.
/// </summary>
/// <param name="trustedRoot">The only root certificate trusted for the chain build.</param>
/// <param name="certificate">Certificate presented by the server, if any.</param>
/// <param name="chain">Chain built by the platform; used only as a source of intermediates.</param>
/// <param name="errors">Policy errors the platform detected.</param>
/// <returns><see langword="true"/> when the certificate is accepted.</returns>
private static bool ValidateAgainstTrustedRoot(
    X509Certificate2 trustedRoot,
    X509Certificate? certificate,
    X509Chain? chain,
    SslPolicyErrors errors)
{
    if (certificate is null)
    {
        return false;
    }

    // Replacing the chain check must not switch off host-name verification:
    // otherwise any certificate issued by the CA would be accepted for any host.
    if ((errors & SslPolicyErrors.RemoteCertificateNameMismatch) != 0)
    {
        return false;
    }

    using X509Chain customChain = new();
    customChain.ChainPolicy.TrustMode = X509ChainTrustMode.CustomRootTrust;
    customChain.ChainPolicy.CustomTrustStore.Add(trustedRoot);
    customChain.ChainPolicy.RevocationMode = X509RevocationMode.NoCheck;
    customChain.ChainPolicy.VerificationFlags = X509VerificationFlags.NoFlag;

    // Offer the certificates from the platform-built chain as candidate intermediates
    // so a leaf issued by a sub-CA can still be linked to the trusted root. Extra-store
    // entries are never trusted by themselves.
    if (chain is not null)
    {
        foreach (X509ChainElement element in chain.ChainElements)
        {
            customChain.ChainPolicy.ExtraStore.Add(element.Certificate);
        }
    }

    if (certificate is X509Certificate2 presented)
    {
        return customChain.Build(presented);
    }

    // The converted copy is created here, so it is disposed here.
    using X509Certificate2 converted = X509CertificateLoader.LoadCertificate(certificate.Export(X509ContentType.Cert));
    return customChain.Build(converted);
}
|
|
|
|
/// <summary>
/// Throws <see cref="ObjectDisposedException"/> once <see cref="DisposeAsync"/> has run.
/// </summary>
private void ThrowIfDisposed() => ObjectDisposedException.ThrowIf(_disposed, this);
|
|
}
|