diff --git a/ZB.MOM.WW.GalaxyRepository/src/ZB.MOM.WW.GalaxyRepository/DependencyInjection/GalaxyRepositoryServiceCollectionExtensions.cs b/ZB.MOM.WW.GalaxyRepository/src/ZB.MOM.WW.GalaxyRepository/DependencyInjection/GalaxyRepositoryServiceCollectionExtensions.cs new file mode 100644 index 0000000..b69895d --- /dev/null +++ b/ZB.MOM.WW.GalaxyRepository/src/ZB.MOM.WW.GalaxyRepository/DependencyInjection/GalaxyRepositoryServiceCollectionExtensions.cs @@ -0,0 +1,69 @@ +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Routing; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using ZB.MOM.WW.GalaxyRepository.Grpc; + +namespace ZB.MOM.WW.GalaxyRepository.DependencyInjection; + +/// +/// Dependency-injection and endpoint-routing extensions that register the reusable +/// Galaxy Repository services and map the canonical gRPC service. A consuming gateway +/// calls during service registration and +/// while building its endpoint pipeline. +/// +public static class GalaxyRepositoryServiceCollectionExtensions +{ + /// + /// Registers the Galaxy Repository SQL provider, shared hierarchy cache, deploy + /// notifier, on-disk snapshot store, and the background refresh service, binding + /// from the supplied configuration section. + /// + /// The service collection to add registrations to. + /// The application configuration root. + /// + /// The configuration section path to bind from + /// (for example MxGateway:Galaxy or HistorianGateway:Galaxy). + /// + /// The service collection for chaining. + public static IServiceCollection AddZbGalaxyRepository( + this IServiceCollection services, + IConfiguration configuration, + string sectionPath) + { + ArgumentNullException.ThrowIfNull(services); + ArgumentNullException.ThrowIfNull(configuration); + ArgumentException.ThrowIfNullOrWhiteSpace(sectionPath); + + services + .AddOptions() + .Bind(configuration.GetSection(sectionPath)) + .ValidateOnStart(); + + services.AddSingleton(sp => + new GalaxyRepository(sp.GetRequiredService>().Value)); + services.AddSingleton(sp => sp.GetRequiredService()); + + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddHostedService(); + + return services; + } + + /// + /// Maps the canonical onto the consuming + /// application's endpoint pipeline. Call after and + /// after gRPC has been added to the application's services. + /// + /// The endpoint route builder to map the gRPC service onto. + /// The endpoint route builder for chaining. + public static IEndpointRouteBuilder MapZbGalaxyRepository(this IEndpointRouteBuilder endpoints) + { + ArgumentNullException.ThrowIfNull(endpoints); + endpoints.MapGrpcService(); + return endpoints; + } +} diff --git a/ZB.MOM.WW.GalaxyRepository/src/ZB.MOM.WW.GalaxyRepository/Grpc/GalaxyRepositoryGrpcService.cs b/ZB.MOM.WW.GalaxyRepository/src/ZB.MOM.WW.GalaxyRepository/Grpc/GalaxyRepositoryGrpcService.cs new file mode 100644 index 0000000..aab17eb --- /dev/null +++ b/ZB.MOM.WW.GalaxyRepository/src/ZB.MOM.WW.GalaxyRepository/Grpc/GalaxyRepositoryGrpcService.cs @@ -0,0 +1,329 @@ +using Google.Protobuf.WellKnownTypes; +using Grpc.Core; +using ProtoGalaxyRepository = ZB.MOM.WW.GalaxyRepository.Grpc.GalaxyRepository; + +namespace ZB.MOM.WW.GalaxyRepository.Grpc; + +/// +/// Reusable gRPC surface that exposes the Galaxy Repository to clients. Hosted by any +/// consuming gateway (e.g. MxAccessGateway or the HistorianGateway sidecar) via +/// . +/// +/// DiscoverHierarchy and GetLastDeployTime serve from +/// so many clients share a single SQL pull. +/// WatchDeployEvents streams events from . +/// TestConnection remains a direct SQL probe since callers use it as a health check. +/// +/// +/// This service applies no per-identity browse-subtree filtering — the full +/// hierarchy is projected (null subtree globs). Authorization (including any +/// subtree scoping) is the responsibility of the hosting gateway's interceptor layer. +/// +/// +/// Direct SQL surface used by TestConnection. +/// Shared hierarchy cache that DiscoverHierarchy/BrowseChildren/GetLastDeployTime serve from. +/// Deploy-event source streamed by WatchDeployEvents. +public sealed class GalaxyRepositoryGrpcService( + IGalaxyRepository repository, + IGalaxyHierarchyCache cache, + IGalaxyDeployNotifier notifier) : ProtoGalaxyRepository.GalaxyRepositoryBase +{ + private static readonly TimeSpan FirstLoadWaitBudget = TimeSpan.FromSeconds(5); + private const int DefaultDiscoverPageSize = 1000; + private const int MaxDiscoverPageSize = 5000; + private const int DefaultBrowsePageSize = 500; + // MaxBrowsePageSize reuses MaxDiscoverPageSize (5000) — same cap. + + /// + public override async Task TestConnection( + TestConnectionRequest request, + ServerCallContext context) + { + bool ok = await repository.TestConnectionAsync(context.CancellationToken).ConfigureAwait(false); + return new TestConnectionReply { Ok = ok }; + } + + /// + public override async Task GetLastDeployTime( + GetLastDeployTimeRequest request, + ServerCallContext context) + { + await WaitForCacheBootstrap(context.CancellationToken).ConfigureAwait(false); + GalaxyHierarchyCacheEntry entry = cache.Current; + + if (!entry.HasData) + { + throw new RpcException(new Status( + StatusCode.Unavailable, + ResolveUnavailableMessage(entry))); + } + + GetLastDeployTimeReply reply = new() { Present = entry.LastDeployTime.HasValue }; + if (entry.LastDeployTime.HasValue) + { + reply.TimeOfLastDeploy = Timestamp.FromDateTimeOffset(entry.LastDeployTime.Value); + } + return reply; + } + + /// + public override async Task DiscoverHierarchy( + DiscoverHierarchyRequest request, + ServerCallContext context) + { + await WaitForCacheBootstrap(context.CancellationToken).ConfigureAwait(false); + GalaxyHierarchyCacheEntry entry = cache.Current; + + if (!entry.HasData) + { + throw new RpcException(new Status( + StatusCode.Unavailable, + ResolveUnavailableMessage(entry))); + } + + int pageSize = ResolvePageSize(request.PageSize); + // The shared library applies no per-identity subtree scoping; the hosting + // gateway enforces authorization at its interceptor layer. + string filterSignature = GalaxyHierarchyProjector.ComputeFilterSignature(request, browseSubtreeGlobs: null); + PageToken pageToken = ParsePageToken(request.PageToken, entry.Sequence, filterSignature); + GalaxyHierarchyQueryResult query = GalaxyHierarchyProjector.Project( + entry, + request, + browseSubtreeGlobs: null, + pageToken.Offset, + pageSize); + int offset = pageToken.Offset; + if (offset > query.TotalObjectCount) + { + throw new RpcException(new Status( + StatusCode.InvalidArgument, + "DiscoverHierarchy page_token is outside the current hierarchy.")); + } + + DiscoverHierarchyReply reply = new() + { + TotalObjectCount = query.TotalObjectCount, + }; + reply.Objects.Add(query.Objects); + + int nextOffset = offset + query.Objects.Count; + if (nextOffset < query.TotalObjectCount) + { + reply.NextPageToken = FormatPageToken(entry.Sequence, query.FilterSignature, nextOffset); + } + + return reply; + } + + /// + public override async Task BrowseChildren( + BrowseChildrenRequest request, + ServerCallContext context) + { + await WaitForCacheBootstrap(context.CancellationToken).ConfigureAwait(false); + GalaxyHierarchyCacheEntry entry = cache.Current; + + if (!entry.HasData) + { + throw new RpcException(new Status( + StatusCode.Unavailable, + ResolveUnavailableMessage(entry))); + } + + int pageSize = ResolveBrowsePageSize(request.PageSize); + + // Resolve the parent id once so the page-token signature can include it + // and the projector sees the same resolved id when memoizing. The projector + // re-resolves internally; with the by-name/by-path indexes on + // GalaxyHierarchyIndex that second call is O(1), so the redundancy is cheap + // and keeps the projector self-contained. + int parentId = GalaxyBrowseProjector.ResolveParentId(entry, request); + string filterSignature = GalaxyBrowseProjector.ComputeFilterSignature( + request, browseSubtreeGlobs: null, parentId); + PageToken pageToken = ParsePageToken(request.PageToken, entry.Sequence, filterSignature); + + GalaxyBrowseChildrenResult result = GalaxyBrowseProjector.ProjectChildren( + entry, + request, + browseSubtreeGlobs: null, + pageToken.Offset, + pageSize); + + if (pageToken.Offset > result.TotalChildCount) + { + throw new RpcException(new Status( + StatusCode.InvalidArgument, + "BrowseChildren page_token is outside the current children set.")); + } + + BrowseChildrenReply reply = new() + { + TotalChildCount = result.TotalChildCount, + CacheSequence = (ulong)entry.Sequence, + }; + reply.Children.Add(result.Children); + reply.ChildHasChildren.Add(result.ChildHasChildren); + + int nextOffset = pageToken.Offset + result.Children.Count; + if (nextOffset < result.TotalChildCount) + { + reply.NextPageToken = FormatPageToken(entry.Sequence, result.FilterSignature, nextOffset); + } + + return reply; + } + + /// + public override async Task WatchDeployEvents( + WatchDeployEventsRequest request, + IServerStreamWriter responseStream, + ServerCallContext context) + { + DateTimeOffset? lastSeen = request.LastSeenDeployTime?.ToDateTimeOffset(); + + await foreach (GalaxyDeployEventInfo info in notifier + .SubscribeAsync(context.CancellationToken) + .ConfigureAwait(false)) + { + // Suppress the initial bootstrap event when the client already knows about + // this deploy time. We only suppress the first one — subsequent events fire + // on actual changes, so they always pass. + if (lastSeen is { } seen && info.TimeOfLastDeploy == seen) + { + lastSeen = null; + continue; + } + lastSeen = null; + + await responseStream.WriteAsync(MapDeployEvent(info), context.CancellationToken).ConfigureAwait(false); + } + } + + private async Task WaitForCacheBootstrap(CancellationToken cancellationToken) + { + if (cache.Current.HasData || cache.Current.Status == GalaxyCacheStatus.Unavailable) + { + return; + } + + using CancellationTokenSource budget = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + budget.CancelAfter(FirstLoadWaitBudget); + try + { + await cache.WaitForFirstLoadAsync(budget.Token).ConfigureAwait(false); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + throw; + } + catch (OperationCanceledException) + { + // Budget elapsed; fall through and let the caller see the current + // (possibly Unknown/Unavailable) entry. + } + } + + private static DeployEvent MapDeployEvent(GalaxyDeployEventInfo info) + { + DeployEvent ev = new() + { + Sequence = (ulong)info.Sequence, + ObservedAt = Timestamp.FromDateTimeOffset(info.ObservedAt), + ObjectCount = info.ObjectCount, + AttributeCount = info.AttributeCount, + TimeOfLastDeployPresent = info.TimeOfLastDeploy.HasValue, + }; + if (info.TimeOfLastDeploy.HasValue) + { + ev.TimeOfLastDeploy = Timestamp.FromDateTimeOffset(info.TimeOfLastDeploy.Value); + } + return ev; + } + + private static string ResolveUnavailableMessage(GalaxyHierarchyCacheEntry entry) => entry.Status switch + { + GalaxyCacheStatus.Unknown => "Galaxy cache has not completed its initial load yet.", + GalaxyCacheStatus.Unavailable => "Galaxy repository is unavailable.", + _ => "Galaxy cache has no data available.", + }; + + private static int ResolvePageSize(int requestedPageSize) + { + if (requestedPageSize < 0) + { + throw new RpcException(new Status( + StatusCode.InvalidArgument, + "DiscoverHierarchy page_size must be greater than zero when provided.")); + } + + int pageSize = requestedPageSize == 0 ? DefaultDiscoverPageSize : requestedPageSize; + return Math.Min(pageSize, MaxDiscoverPageSize); + } + + private static int ResolveBrowsePageSize(int requested) + { + if (requested < 0) + { + throw new RpcException(new Status( + StatusCode.InvalidArgument, + "BrowseChildren page_size must be greater than zero when provided.")); + } + int pageSize = requested == 0 ? DefaultBrowsePageSize : requested; + return Math.Min(pageSize, MaxDiscoverPageSize); + } + + private static string FormatPageToken(long sequence, string filterSignature, int offset) + { + return string.Concat( + sequence.ToString(System.Globalization.CultureInfo.InvariantCulture), + ":", + filterSignature, + ":", + offset.ToString(System.Globalization.CultureInfo.InvariantCulture)); + } + + private static PageToken ParsePageToken(string pageToken, long currentSequence, string currentFilterSignature) + { + if (string.IsNullOrWhiteSpace(pageToken)) + { + return new PageToken(currentSequence, currentFilterSignature, Offset: 0); + } + + string[] parts = pageToken.Split(':', count: 3); + if (parts.Length != 3 + || !long.TryParse( + parts[0], + System.Globalization.NumberStyles.None, + System.Globalization.CultureInfo.InvariantCulture, + out long sequence) + || !int.TryParse( + parts[2], + System.Globalization.NumberStyles.None, + System.Globalization.CultureInfo.InvariantCulture, + out int offset) + || offset < 0) + { + throw new RpcException(new Status( + StatusCode.InvalidArgument, + "page_token is invalid.")); + } + + if (sequence != currentSequence) + { + throw new RpcException(new Status( + StatusCode.InvalidArgument, + "page_token is stale.")); + } + + if (!string.Equals(parts[1], currentFilterSignature, StringComparison.Ordinal)) + { + throw new RpcException(new Status( + StatusCode.InvalidArgument, + "page_token does not match the current filters.")); + } + + return new PageToken(sequence, parts[1], offset); + } + + private sealed record PageToken(long Sequence, string FilterSignature, int Offset); +}