feat(galaxyrepo): reusable gRPC service + AddZbGalaxyRepository DI

This commit is contained in:
Joseph Doherty
2026-06-23 20:26:59 -04:00
parent afd0287f54
commit a30f8551e9
2 changed files with 398 additions and 0 deletions
@@ -0,0 +1,69 @@
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Routing;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using ZB.MOM.WW.GalaxyRepository.Grpc;
namespace ZB.MOM.WW.GalaxyRepository.DependencyInjection;
/// <summary>
/// Dependency-injection and endpoint-routing extensions that register the reusable
/// Galaxy Repository services and map the canonical gRPC service. A consuming gateway
/// calls <see cref="AddZbGalaxyRepository"/> during service registration and
/// <see cref="MapZbGalaxyRepository"/> while building its endpoint pipeline.
/// </summary>
public static class GalaxyRepositoryServiceCollectionExtensions
{
/// <summary>
/// Registers the Galaxy Repository SQL provider, shared hierarchy cache, deploy
/// notifier, on-disk snapshot store, and the background refresh service, binding
/// <see cref="GalaxyRepositoryOptions"/> from the supplied configuration section.
/// </summary>
/// <param name="services">The service collection to add registrations to.</param>
/// <param name="configuration">The application configuration root.</param>
/// <param name="sectionPath">
/// The configuration section path to bind <see cref="GalaxyRepositoryOptions"/> from
/// (for example <c>MxGateway:Galaxy</c> or <c>HistorianGateway:Galaxy</c>).
/// </param>
/// <returns>The service collection for chaining.</returns>
public static IServiceCollection AddZbGalaxyRepository(
this IServiceCollection services,
IConfiguration configuration,
string sectionPath)
{
ArgumentNullException.ThrowIfNull(services);
ArgumentNullException.ThrowIfNull(configuration);
ArgumentException.ThrowIfNullOrWhiteSpace(sectionPath);
services
.AddOptions<GalaxyRepositoryOptions>()
.Bind(configuration.GetSection(sectionPath))
.ValidateOnStart();
services.AddSingleton(sp =>
new GalaxyRepository(sp.GetRequiredService<IOptions<GalaxyRepositoryOptions>>().Value));
services.AddSingleton<IGalaxyRepository>(sp => sp.GetRequiredService<GalaxyRepository>());
services.AddSingleton<IGalaxyDeployNotifier, GalaxyDeployNotifier>();
services.AddSingleton<IGalaxyHierarchySnapshotStore, GalaxyHierarchySnapshotStore>();
services.AddSingleton<IGalaxyHierarchyCache, GalaxyHierarchyCache>();
services.AddHostedService<GalaxyHierarchyRefreshService>();
return services;
}
/// <summary>
/// Maps the canonical <see cref="GalaxyRepositoryGrpcService"/> onto the consuming
/// application's endpoint pipeline. Call after <see cref="AddZbGalaxyRepository"/> and
/// after gRPC has been added to the application's services.
/// </summary>
/// <param name="endpoints">The endpoint route builder to map the gRPC service onto.</param>
/// <returns>The endpoint route builder for chaining.</returns>
public static IEndpointRouteBuilder MapZbGalaxyRepository(this IEndpointRouteBuilder endpoints)
{
ArgumentNullException.ThrowIfNull(endpoints);
endpoints.MapGrpcService<GalaxyRepositoryGrpcService>();
return endpoints;
}
}
@@ -0,0 +1,329 @@
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using ProtoGalaxyRepository = ZB.MOM.WW.GalaxyRepository.Grpc.GalaxyRepository;
namespace ZB.MOM.WW.GalaxyRepository.Grpc;
/// <summary>
/// Reusable gRPC surface that exposes the Galaxy Repository to clients. Hosted by any
/// consuming gateway (e.g. MxAccessGateway or the HistorianGateway sidecar) via
/// <see cref="DependencyInjection.GalaxyRepositoryServiceCollectionExtensions.MapZbGalaxyRepository"/>.
/// <para>
/// <c>DiscoverHierarchy</c> and <c>GetLastDeployTime</c> serve from
/// <see cref="IGalaxyHierarchyCache"/> so many clients share a single SQL pull.
/// <c>WatchDeployEvents</c> streams events from <see cref="IGalaxyDeployNotifier"/>.
/// <c>TestConnection</c> remains a direct SQL probe since callers use it as a health check.
/// </para>
/// <para>
/// This service applies <b>no</b> per-identity browse-subtree filtering — the full
/// hierarchy is projected (<c>null</c> subtree globs). Authorization (including any
/// subtree scoping) is the responsibility of the hosting gateway's interceptor layer.
/// </para>
/// </summary>
/// <param name="repository">Direct SQL surface used by <c>TestConnection</c>.</param>
/// <param name="cache">Shared hierarchy cache that <c>DiscoverHierarchy</c>/<c>BrowseChildren</c>/<c>GetLastDeployTime</c> serve from.</param>
/// <param name="notifier">Deploy-event source streamed by <c>WatchDeployEvents</c>.</param>
public sealed class GalaxyRepositoryGrpcService(
IGalaxyRepository repository,
IGalaxyHierarchyCache cache,
IGalaxyDeployNotifier notifier) : ProtoGalaxyRepository.GalaxyRepositoryBase
{
private static readonly TimeSpan FirstLoadWaitBudget = TimeSpan.FromSeconds(5);
private const int DefaultDiscoverPageSize = 1000;
private const int MaxDiscoverPageSize = 5000;
private const int DefaultBrowsePageSize = 500;
// MaxBrowsePageSize reuses MaxDiscoverPageSize (5000) — same cap.
/// <inheritdoc />
public override async Task<TestConnectionReply> TestConnection(
TestConnectionRequest request,
ServerCallContext context)
{
bool ok = await repository.TestConnectionAsync(context.CancellationToken).ConfigureAwait(false);
return new TestConnectionReply { Ok = ok };
}
/// <inheritdoc />
public override async Task<GetLastDeployTimeReply> GetLastDeployTime(
GetLastDeployTimeRequest request,
ServerCallContext context)
{
await WaitForCacheBootstrap(context.CancellationToken).ConfigureAwait(false);
GalaxyHierarchyCacheEntry entry = cache.Current;
if (!entry.HasData)
{
throw new RpcException(new Status(
StatusCode.Unavailable,
ResolveUnavailableMessage(entry)));
}
GetLastDeployTimeReply reply = new() { Present = entry.LastDeployTime.HasValue };
if (entry.LastDeployTime.HasValue)
{
reply.TimeOfLastDeploy = Timestamp.FromDateTimeOffset(entry.LastDeployTime.Value);
}
return reply;
}
/// <inheritdoc />
public override async Task<DiscoverHierarchyReply> DiscoverHierarchy(
DiscoverHierarchyRequest request,
ServerCallContext context)
{
await WaitForCacheBootstrap(context.CancellationToken).ConfigureAwait(false);
GalaxyHierarchyCacheEntry entry = cache.Current;
if (!entry.HasData)
{
throw new RpcException(new Status(
StatusCode.Unavailable,
ResolveUnavailableMessage(entry)));
}
int pageSize = ResolvePageSize(request.PageSize);
// The shared library applies no per-identity subtree scoping; the hosting
// gateway enforces authorization at its interceptor layer.
string filterSignature = GalaxyHierarchyProjector.ComputeFilterSignature(request, browseSubtreeGlobs: null);
PageToken pageToken = ParsePageToken(request.PageToken, entry.Sequence, filterSignature);
GalaxyHierarchyQueryResult query = GalaxyHierarchyProjector.Project(
entry,
request,
browseSubtreeGlobs: null,
pageToken.Offset,
pageSize);
int offset = pageToken.Offset;
if (offset > query.TotalObjectCount)
{
throw new RpcException(new Status(
StatusCode.InvalidArgument,
"DiscoverHierarchy page_token is outside the current hierarchy."));
}
DiscoverHierarchyReply reply = new()
{
TotalObjectCount = query.TotalObjectCount,
};
reply.Objects.Add(query.Objects);
int nextOffset = offset + query.Objects.Count;
if (nextOffset < query.TotalObjectCount)
{
reply.NextPageToken = FormatPageToken(entry.Sequence, query.FilterSignature, nextOffset);
}
return reply;
}
/// <inheritdoc />
public override async Task<BrowseChildrenReply> BrowseChildren(
BrowseChildrenRequest request,
ServerCallContext context)
{
await WaitForCacheBootstrap(context.CancellationToken).ConfigureAwait(false);
GalaxyHierarchyCacheEntry entry = cache.Current;
if (!entry.HasData)
{
throw new RpcException(new Status(
StatusCode.Unavailable,
ResolveUnavailableMessage(entry)));
}
int pageSize = ResolveBrowsePageSize(request.PageSize);
// Resolve the parent id once so the page-token signature can include it
// and the projector sees the same resolved id when memoizing. The projector
// re-resolves internally; with the by-name/by-path indexes on
// GalaxyHierarchyIndex that second call is O(1), so the redundancy is cheap
// and keeps the projector self-contained.
int parentId = GalaxyBrowseProjector.ResolveParentId(entry, request);
string filterSignature = GalaxyBrowseProjector.ComputeFilterSignature(
request, browseSubtreeGlobs: null, parentId);
PageToken pageToken = ParsePageToken(request.PageToken, entry.Sequence, filterSignature);
GalaxyBrowseChildrenResult result = GalaxyBrowseProjector.ProjectChildren(
entry,
request,
browseSubtreeGlobs: null,
pageToken.Offset,
pageSize);
if (pageToken.Offset > result.TotalChildCount)
{
throw new RpcException(new Status(
StatusCode.InvalidArgument,
"BrowseChildren page_token is outside the current children set."));
}
BrowseChildrenReply reply = new()
{
TotalChildCount = result.TotalChildCount,
CacheSequence = (ulong)entry.Sequence,
};
reply.Children.Add(result.Children);
reply.ChildHasChildren.Add(result.ChildHasChildren);
int nextOffset = pageToken.Offset + result.Children.Count;
if (nextOffset < result.TotalChildCount)
{
reply.NextPageToken = FormatPageToken(entry.Sequence, result.FilterSignature, nextOffset);
}
return reply;
}
/// <inheritdoc />
public override async Task WatchDeployEvents(
WatchDeployEventsRequest request,
IServerStreamWriter<DeployEvent> responseStream,
ServerCallContext context)
{
DateTimeOffset? lastSeen = request.LastSeenDeployTime?.ToDateTimeOffset();
await foreach (GalaxyDeployEventInfo info in notifier
.SubscribeAsync(context.CancellationToken)
.ConfigureAwait(false))
{
// Suppress the initial bootstrap event when the client already knows about
// this deploy time. We only suppress the first one — subsequent events fire
// on actual changes, so they always pass.
if (lastSeen is { } seen && info.TimeOfLastDeploy == seen)
{
lastSeen = null;
continue;
}
lastSeen = null;
await responseStream.WriteAsync(MapDeployEvent(info), context.CancellationToken).ConfigureAwait(false);
}
}
private async Task WaitForCacheBootstrap(CancellationToken cancellationToken)
{
if (cache.Current.HasData || cache.Current.Status == GalaxyCacheStatus.Unavailable)
{
return;
}
using CancellationTokenSource budget = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
budget.CancelAfter(FirstLoadWaitBudget);
try
{
await cache.WaitForFirstLoadAsync(budget.Token).ConfigureAwait(false);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
throw;
}
catch (OperationCanceledException)
{
// Budget elapsed; fall through and let the caller see the current
// (possibly Unknown/Unavailable) entry.
}
}
private static DeployEvent MapDeployEvent(GalaxyDeployEventInfo info)
{
DeployEvent ev = new()
{
Sequence = (ulong)info.Sequence,
ObservedAt = Timestamp.FromDateTimeOffset(info.ObservedAt),
ObjectCount = info.ObjectCount,
AttributeCount = info.AttributeCount,
TimeOfLastDeployPresent = info.TimeOfLastDeploy.HasValue,
};
if (info.TimeOfLastDeploy.HasValue)
{
ev.TimeOfLastDeploy = Timestamp.FromDateTimeOffset(info.TimeOfLastDeploy.Value);
}
return ev;
}
private static string ResolveUnavailableMessage(GalaxyHierarchyCacheEntry entry) => entry.Status switch
{
GalaxyCacheStatus.Unknown => "Galaxy cache has not completed its initial load yet.",
GalaxyCacheStatus.Unavailable => "Galaxy repository is unavailable.",
_ => "Galaxy cache has no data available.",
};
private static int ResolvePageSize(int requestedPageSize)
{
if (requestedPageSize < 0)
{
throw new RpcException(new Status(
StatusCode.InvalidArgument,
"DiscoverHierarchy page_size must be greater than zero when provided."));
}
int pageSize = requestedPageSize == 0 ? DefaultDiscoverPageSize : requestedPageSize;
return Math.Min(pageSize, MaxDiscoverPageSize);
}
private static int ResolveBrowsePageSize(int requested)
{
if (requested < 0)
{
throw new RpcException(new Status(
StatusCode.InvalidArgument,
"BrowseChildren page_size must be greater than zero when provided."));
}
int pageSize = requested == 0 ? DefaultBrowsePageSize : requested;
return Math.Min(pageSize, MaxDiscoverPageSize);
}
private static string FormatPageToken(long sequence, string filterSignature, int offset)
{
return string.Concat(
sequence.ToString(System.Globalization.CultureInfo.InvariantCulture),
":",
filterSignature,
":",
offset.ToString(System.Globalization.CultureInfo.InvariantCulture));
}
private static PageToken ParsePageToken(string pageToken, long currentSequence, string currentFilterSignature)
{
if (string.IsNullOrWhiteSpace(pageToken))
{
return new PageToken(currentSequence, currentFilterSignature, Offset: 0);
}
string[] parts = pageToken.Split(':', count: 3);
if (parts.Length != 3
|| !long.TryParse(
parts[0],
System.Globalization.NumberStyles.None,
System.Globalization.CultureInfo.InvariantCulture,
out long sequence)
|| !int.TryParse(
parts[2],
System.Globalization.NumberStyles.None,
System.Globalization.CultureInfo.InvariantCulture,
out int offset)
|| offset < 0)
{
throw new RpcException(new Status(
StatusCode.InvalidArgument,
"page_token is invalid."));
}
if (sequence != currentSequence)
{
throw new RpcException(new Status(
StatusCode.InvalidArgument,
"page_token is stale."));
}
if (!string.Equals(parts[1], currentFilterSignature, StringComparison.Ordinal))
{
throw new RpcException(new Status(
StatusCode.InvalidArgument,
"page_token does not match the current filters."));
}
return new PageToken(sequence, parts[1], offset);
}
private sealed record PageToken(long Sequence, string FilterSignature, int Offset);
}