using Google.Protobuf.WellKnownTypes; using Grpc.Core; using Microsoft.Data.SqlClient; using ZB.MOM.WW.MxGateway.Contracts.Proto.Galaxy; using GalaxyDb = ZB.MOM.WW.MxGateway.Server.Galaxy; using ZB.MOM.WW.MxGateway.Server.Security.Authentication; using ZB.MOM.WW.MxGateway.Server.Security.Authorization; using ProtoGalaxyRepository = ZB.MOM.WW.MxGateway.Contracts.Proto.Galaxy.GalaxyRepository; namespace ZB.MOM.WW.MxGateway.Server.Grpc; /// /// gRPC surface that exposes the Galaxy Repository to clients. DiscoverHierarchy /// and GetLastDeployTime serve from /// so many clients share a single SQL pull. WatchDeployEvents streams events /// from . TestConnection remains a /// direct SQL probe since callers use it as a health check. /// public sealed class GalaxyRepositoryGrpcService( GalaxyDb.IGalaxyRepository repository, GalaxyDb.IGalaxyHierarchyCache cache, GalaxyDb.IGalaxyDeployNotifier notifier, IGatewayRequestIdentityAccessor identityAccessor, ILogger logger) : ProtoGalaxyRepository.GalaxyRepositoryBase { private static readonly TimeSpan FirstLoadWaitBudget = TimeSpan.FromSeconds(5); private const int DefaultDiscoverPageSize = 1000; private const int MaxDiscoverPageSize = 5000; private const int DefaultBrowsePageSize = 500; // MaxBrowsePageSize reuses MaxDiscoverPageSize (5000) — same cap. /// public override async Task TestConnection( TestConnectionRequest request, ServerCallContext context) { bool ok = await repository.TestConnectionAsync(context.CancellationToken).ConfigureAwait(false); return new TestConnectionReply { Ok = ok }; } /// public override async Task GetLastDeployTime( GetLastDeployTimeRequest request, ServerCallContext context) { await WaitForCacheBootstrap(context.CancellationToken).ConfigureAwait(false); GalaxyDb.GalaxyHierarchyCacheEntry entry = cache.Current; if (!entry.HasData) { throw new RpcException(new Status( StatusCode.Unavailable, ResolveUnavailableMessage(entry))); } GetLastDeployTimeReply reply = new() { Present = entry.LastDeployTime.HasValue }; if (entry.LastDeployTime.HasValue) { reply.TimeOfLastDeploy = Timestamp.FromDateTimeOffset(entry.LastDeployTime.Value); } return reply; } /// public override async Task DiscoverHierarchy( DiscoverHierarchyRequest request, ServerCallContext context) { await WaitForCacheBootstrap(context.CancellationToken).ConfigureAwait(false); GalaxyDb.GalaxyHierarchyCacheEntry entry = cache.Current; if (!entry.HasData) { throw new RpcException(new Status( StatusCode.Unavailable, ResolveUnavailableMessage(entry))); } int pageSize = ResolvePageSize(request.PageSize); IReadOnlyList browseSubtrees = ResolveBrowseSubtrees(); string filterSignature = GalaxyDb.GalaxyHierarchyProjector.ComputeFilterSignature(request, browseSubtrees); PageToken pageToken = ParsePageToken(request.PageToken, entry.Sequence, filterSignature); GalaxyDb.GalaxyHierarchyQueryResult query = GalaxyDb.GalaxyHierarchyProjector.Project( entry, request, browseSubtrees, pageToken.Offset, pageSize); int offset = pageToken.Offset; if (offset > query.TotalObjectCount) { throw new RpcException(new Status( StatusCode.InvalidArgument, "DiscoverHierarchy page_token is outside the current hierarchy.")); } DiscoverHierarchyReply reply = new() { TotalObjectCount = query.TotalObjectCount, }; reply.Objects.Add(query.Objects); int nextOffset = offset + query.Objects.Count; if (nextOffset < query.TotalObjectCount) { reply.NextPageToken = FormatPageToken(entry.Sequence, query.FilterSignature, nextOffset); } return reply; } /// public override async Task BrowseChildren( BrowseChildrenRequest request, ServerCallContext context) { await WaitForCacheBootstrap(context.CancellationToken).ConfigureAwait(false); GalaxyDb.GalaxyHierarchyCacheEntry entry = cache.Current; if (!entry.HasData) { throw new RpcException(new Status( StatusCode.Unavailable, ResolveUnavailableMessage(entry))); } int pageSize = ResolveBrowsePageSize(request.PageSize); IReadOnlyList browseSubtrees = ResolveBrowseSubtrees(); // Resolve the parent id once so the page-token signature can include it // and the projector sees the same resolved id when memoizing. The projector // re-resolves internally; with the by-name/by-path indexes on // GalaxyHierarchyIndex that second call is O(1), so the redundancy is cheap // and keeps the projector self-contained. int parentId = GalaxyDb.GalaxyBrowseProjector.ResolveParentId(entry, request); string filterSignature = GalaxyDb.GalaxyBrowseProjector.ComputeFilterSignature( request, browseSubtrees, parentId); PageToken pageToken = ParsePageToken(request.PageToken, entry.Sequence, filterSignature); GalaxyDb.GalaxyBrowseChildrenResult result = GalaxyDb.GalaxyBrowseProjector.ProjectChildren( entry, request, browseSubtrees, pageToken.Offset, pageSize); if (pageToken.Offset > result.TotalChildCount) { throw new RpcException(new Status( StatusCode.InvalidArgument, "BrowseChildren page_token is outside the current children set.")); } BrowseChildrenReply reply = new() { TotalChildCount = result.TotalChildCount, CacheSequence = (ulong)entry.Sequence, }; reply.Children.Add(result.Children); reply.ChildHasChildren.Add(result.ChildHasChildren); int nextOffset = pageToken.Offset + result.Children.Count; if (nextOffset < result.TotalChildCount) { reply.NextPageToken = FormatPageToken(entry.Sequence, result.FilterSignature, nextOffset); } return reply; } /// public override async Task WatchDeployEvents( WatchDeployEventsRequest request, IServerStreamWriter responseStream, ServerCallContext context) { DateTimeOffset? lastSeen = request.LastSeenDeployTime?.ToDateTimeOffset(); // The caller's identity (and therefore its browse-subtree constraints) is fixed // for the lifetime of the stream, so resolve the subtrees once rather than per // streamed event. IReadOnlyList browseSubtrees = ResolveBrowseSubtrees(); await foreach (GalaxyDb.GalaxyDeployEventInfo info in notifier .SubscribeAsync(context.CancellationToken) .ConfigureAwait(false)) { // Suppress the initial bootstrap event when the client already knows about // this deploy time. We only suppress the first one — subsequent events fire // on actual changes, so they always pass. if (lastSeen is { } seen && info.TimeOfLastDeploy == seen) { lastSeen = null; continue; } lastSeen = null; await responseStream.WriteAsync(MapDeployEvent(info, browseSubtrees), context.CancellationToken).ConfigureAwait(false); } } private async Task WaitForCacheBootstrap(CancellationToken cancellationToken) { if (cache.Current.HasData || cache.Current.Status == GalaxyDb.GalaxyCacheStatus.Unavailable) { return; } using CancellationTokenSource budget = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); budget.CancelAfter(FirstLoadWaitBudget); try { await cache.WaitForFirstLoadAsync(budget.Token).ConfigureAwait(false); } catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { throw; } catch (OperationCanceledException) { // Budget elapsed; fall through and let the caller see the current // (possibly Unknown/Unavailable) entry. } } private DeployEvent MapDeployEvent( GalaxyDb.GalaxyDeployEventInfo info, IReadOnlyList browseSubtrees) { int objectCount = info.ObjectCount; int attributeCount = info.AttributeCount; if (browseSubtrees.Count > 0 && cache.Current.HasData) { GalaxyDb.GalaxyHierarchyQueryResult scoped = GalaxyDb.GalaxyHierarchyProjector.Project( cache.Current, new DiscoverHierarchyRequest(), browseSubtrees); objectCount = scoped.TotalObjectCount; attributeCount = scoped.Objects.Sum(obj => obj.Attributes.Count); } DeployEvent ev = new() { Sequence = (ulong)info.Sequence, ObservedAt = Timestamp.FromDateTimeOffset(info.ObservedAt), ObjectCount = objectCount, AttributeCount = attributeCount, TimeOfLastDeployPresent = info.TimeOfLastDeploy.HasValue, }; if (info.TimeOfLastDeploy.HasValue) { ev.TimeOfLastDeploy = Timestamp.FromDateTimeOffset(info.TimeOfLastDeploy.Value); } return ev; } private static string ResolveUnavailableMessage(GalaxyDb.GalaxyHierarchyCacheEntry entry) => entry.Status switch { GalaxyDb.GalaxyCacheStatus.Unknown => "Galaxy cache has not completed its initial load yet.", GalaxyDb.GalaxyCacheStatus.Unavailable => "Galaxy repository is unavailable.", _ => "Galaxy cache has no data available.", }; private static int ResolvePageSize(int requestedPageSize) { if (requestedPageSize < 0) { throw new RpcException(new Status( StatusCode.InvalidArgument, "DiscoverHierarchy page_size must be greater than zero when provided.")); } int pageSize = requestedPageSize == 0 ? DefaultDiscoverPageSize : requestedPageSize; return Math.Min(pageSize, MaxDiscoverPageSize); } private static int ResolveBrowsePageSize(int requested) { if (requested < 0) { throw new RpcException(new Status( StatusCode.InvalidArgument, "BrowseChildren page_size must be greater than zero when provided.")); } int pageSize = requested == 0 ? DefaultBrowsePageSize : requested; return Math.Min(pageSize, MaxDiscoverPageSize); } private IReadOnlyList ResolveBrowseSubtrees() { ApiKeyConstraints constraints = identityAccessor.Current?.EffectiveConstraints ?? ApiKeyConstraints.Empty; return constraints.BrowseSubtrees; } private static string FormatPageToken(long sequence, string filterSignature, int offset) { return string.Concat( sequence.ToString(System.Globalization.CultureInfo.InvariantCulture), ":", filterSignature, ":", offset.ToString(System.Globalization.CultureInfo.InvariantCulture)); } private static PageToken ParsePageToken(string pageToken, long currentSequence, string currentFilterSignature) { if (string.IsNullOrWhiteSpace(pageToken)) { return new PageToken(currentSequence, currentFilterSignature, Offset: 0); } string[] parts = pageToken.Split(':', count: 3); if (parts.Length != 3 || !long.TryParse( parts[0], System.Globalization.NumberStyles.None, System.Globalization.CultureInfo.InvariantCulture, out long sequence) || !int.TryParse( parts[2], System.Globalization.NumberStyles.None, System.Globalization.CultureInfo.InvariantCulture, out int offset) || offset < 0) { throw new RpcException(new Status( StatusCode.InvalidArgument, "page_token is invalid.")); } if (sequence != currentSequence) { throw new RpcException(new Status( StatusCode.InvalidArgument, "page_token is stale.")); } if (!string.Equals(parts[1], currentFilterSignature, StringComparison.Ordinal)) { throw new RpcException(new Status( StatusCode.InvalidArgument, "page_token does not match the current filters.")); } return new PageToken(sequence, parts[1], offset); } private sealed record PageToken(long Sequence, string FilterSignature, int Offset); [System.Diagnostics.CodeAnalysis.SuppressMessage( "Style", "IDE0051:Remove unused private members", Justification = "Kept for parity with prior SQL exception mapping; future direct-SQL paths reuse it.")] private RpcException MapSqlException(SqlException exception) { logger.LogWarning(exception, "Galaxy repository query failed."); return new RpcException(new Status( StatusCode.Unavailable, "Galaxy repository is unavailable.")); } }