Compare commits

...

11 Commits

13 changed files with 669 additions and 96 deletions
@@ -106,6 +106,8 @@ public sealed class LazyBrowseNodeTests
new RpcException(new Status(StatusCode.NotFound, "Parent not found"))));
await Assert.ThrowsAsync<MxGatewayException>(async () => await roots[0].ExpandAsync());
Assert.False(roots[0].IsExpanded);
Assert.Empty(roots[0].Children);
}
/// <summary>
+113 -23
View File
@@ -18,6 +18,11 @@ import (
// browseChildrenPageSize is the per-request page size used by the lazy walker.
const browseChildrenPageSize = 500
// discoverHierarchyPageSize is the per-request page size used by DiscoverHierarchy.
// Mirrors the .NET client constant so large galaxies are not silently truncated
// by the server's default page cap.
const discoverHierarchyPageSize = 5000
// RawGalaxyRepositoryClient is the generated gRPC client interface for the
// Galaxy Repository service exposed for callers that need direct contract
// access.
@@ -155,16 +160,35 @@ func (c *GalaxyClient) GetLastDeployTime(ctx context.Context) (time.Time, bool,
// DiscoverHierarchy returns the deployed Galaxy object hierarchy with each
// object's dynamic attributes. The objects are returned in the order supplied
// by the server.
// by the server. The call pages over the server's NextPageToken until the
// server signals it has no more results, matching the .NET client.
func (c *GalaxyClient) DiscoverHierarchy(ctx context.Context) ([]*GalaxyObject, error) {
callCtx, cancel := c.callContext(ctx)
defer cancel()
reply, err := c.raw.DiscoverHierarchy(callCtx, &pb.DiscoverHierarchyRequest{})
if err != nil {
return nil, &GatewayError{Op: "galaxy discover hierarchy", Err: err}
var objects []*GalaxyObject
pageToken := ""
seen := map[string]struct{}{}
for {
callCtx, cancel := c.callContext(ctx)
reply, err := c.raw.DiscoverHierarchy(callCtx, &pb.DiscoverHierarchyRequest{
PageSize: discoverHierarchyPageSize,
PageToken: pageToken,
})
cancel()
if err != nil {
return nil, &GatewayError{Op: "galaxy discover hierarchy", Err: err}
}
objects = append(objects, reply.GetObjects()...)
pageToken = reply.GetNextPageToken()
if pageToken == "" {
return objects, nil
}
if _, dup := seen[pageToken]; dup {
return nil, &GatewayError{
Op: "galaxy discover hierarchy",
Err: fmt.Errorf("repeated page token %q", pageToken),
}
}
seen[pageToken] = struct{}{}
}
return reply.GetObjects(), nil
}
// WatchDeployEventsRaw starts the generated WatchDeployEvents stream for callers
@@ -249,15 +273,25 @@ func (c *GalaxyClient) Close() error {
// LazyBrowseNode is one node in a lazy Galaxy hierarchy walk produced by
// (*GalaxyClient).Browse. Children are not fetched until Expand is called.
// The node is safe for concurrent use; concurrent Expand calls collapse to a
// single RPC.
// The node is safe for concurrent use; concurrent Expand calls coalesce onto
// a single in-flight RPC and do not block snapshot accessors.
type LazyBrowseNode struct {
client *GalaxyClient
object *pb.GalaxyObject
hasChildrenHint bool
options BrowseChildrenOptions
mu sync.Mutex
// expandLock gates inspection and mutation of expand-coordination state
// (expanding, expandDone, expandErr). It is held only briefly; the BrowseChildren
// RPC itself runs outside this lock so concurrent readers and waiters are not blocked.
expandLock sync.Mutex
expanding bool
expandDone chan struct{}
expandErr error
// mu protects the children snapshot and isExpanded flag for concurrent
// Children() / IsExpanded() readers.
mu sync.RWMutex
children []*LazyBrowseNode
isExpanded bool
}
@@ -272,8 +306,8 @@ func (n *LazyBrowseNode) HasChildrenHint() bool { return n.hasChildrenHint }
// Children returns a snapshot copy of the currently-loaded child nodes. Returns
// an empty slice when Expand has not yet been called.
func (n *LazyBrowseNode) Children() []*LazyBrowseNode {
n.mu.Lock()
defer n.mu.Unlock()
n.mu.RLock()
defer n.mu.RUnlock()
out := make([]*LazyBrowseNode, len(n.children))
copy(out, n.children)
return out
@@ -281,28 +315,81 @@ func (n *LazyBrowseNode) Children() []*LazyBrowseNode {
// IsExpanded reports whether Expand has completed successfully on this node.
func (n *LazyBrowseNode) IsExpanded() bool {
n.mu.Lock()
defer n.mu.Unlock()
n.mu.RLock()
defer n.mu.RUnlock()
return n.isExpanded
}
// Expand fetches this node's direct children via BrowseChildren when they have
// not yet been loaded. Subsequent calls after a successful Expand are a no-op
// and do not issue another RPC.
//
// Expand is safe to call concurrently from multiple goroutines: callers that
// arrive while an expansion is in flight wait on the active RPC and share its
// result instead of issuing a second RPC. The RPC itself runs without holding
// the snapshot mutex, so concurrent Children() and IsExpanded() callers are
// not blocked for the duration of the network round trip.
//
// Failure semantics: a failed expansion surfaces the same error to every
// in-flight waiter, but the node is left in its pre-call state (isExpanded =
// false, no in-flight expansion). The next Expand call therefore retries with
// a fresh RPC; failures are not sticky.
func (n *LazyBrowseNode) Expand(ctx context.Context) error {
n.mu.Lock()
defer n.mu.Unlock()
// Fast path: already expanded.
n.mu.RLock()
if n.isExpanded {
n.mu.RUnlock()
return nil
}
n.mu.RUnlock()
// Either start a new expansion or wait on an existing one.
n.expandLock.Lock()
n.mu.RLock()
alreadyExpanded := n.isExpanded
n.mu.RUnlock()
if alreadyExpanded {
n.expandLock.Unlock()
return nil
}
if n.expanding {
done := n.expandDone
n.expandLock.Unlock()
select {
case <-done:
n.expandLock.Lock()
err := n.expandErr
n.expandLock.Unlock()
return err
case <-ctx.Done():
return ctx.Err()
}
}
n.expanding = true
n.expandDone = make(chan struct{})
done := n.expandDone
n.expandLock.Unlock()
// Issue the RPC outside any lock so concurrent readers/waiters are not blocked.
parentID := n.object.GetGobjectId()
children, err := n.client.browseChildrenInner(ctx, &parentID, n.options)
if err != nil {
return err
if err == nil {
n.mu.Lock()
n.children = children
n.isExpanded = true
n.mu.Unlock()
}
n.children = children
n.isExpanded = true
return nil
// Publish result to waiters and clear the in-flight marker so a failed
// expansion can be retried by the next Expand call.
n.expandLock.Lock()
n.expandErr = err
n.expanding = false
close(done)
n.expandLock.Unlock()
return err
}
// Browse returns the root nodes of the Galaxy hierarchy. The returned nodes
@@ -375,7 +462,10 @@ func (c *GalaxyClient) browseChildrenInner(
return nodes, nil
}
if _, dup := seen[pageToken]; dup {
return nil, fmt.Errorf("mxgateway: galaxy browse children returned repeated page token %q", pageToken)
return nil, &GatewayError{
Op: "galaxy browse children",
Err: fmt.Errorf("repeated page token %q", pageToken),
}
}
seen[pageToken] = struct{}{}
}
+136 -12
View File
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"net"
"sync"
"testing"
"time"
@@ -146,6 +147,47 @@ func TestGalaxyDiscoverHierarchyReturnsObjects(t *testing.T) {
}
}
func TestGalaxyDiscoverHierarchyPaginatesAcrossMultiplePages(t *testing.T) {
page1 := &pb.DiscoverHierarchyReply{
Objects: []*pb.GalaxyObject{
{GobjectId: 1, TagName: "A"},
{GobjectId: 2, TagName: "B"},
},
NextPageToken: "page-2",
TotalObjectCount: 3,
}
page2 := &pb.DiscoverHierarchyReply{
Objects: []*pb.GalaxyObject{
{GobjectId: 3, TagName: "C"},
},
TotalObjectCount: 3,
}
fake := &fakeGalaxyServer{
discoverHierarchyReplies: []*pb.DiscoverHierarchyReply{page1, page2},
}
client, cleanup := newGalaxyBufconnClient(t, fake)
defer cleanup()
objs, err := client.DiscoverHierarchy(context.Background())
if err != nil {
t.Fatalf("DiscoverHierarchy: %v", err)
}
if got, want := len(objs), 3; got != want {
t.Fatalf("len(objs) = %d, want %d", got, want)
}
if len(fake.discoverHierarchyCalls) != 2 {
t.Fatalf("expected 2 RPC calls, got %d", len(fake.discoverHierarchyCalls))
}
if fake.discoverHierarchyCalls[0].GetPageSize() != discoverHierarchyPageSize {
t.Fatalf("first call PageSize = %d, want %d",
fake.discoverHierarchyCalls[0].GetPageSize(), discoverHierarchyPageSize)
}
if fake.discoverHierarchyCalls[1].GetPageToken() != "page-2" {
t.Fatalf("second call page token = %q, want %q",
fake.discoverHierarchyCalls[1].GetPageToken(), "page-2")
}
}
func TestGalaxyDialReturnsGatewayErrorOnRpcFailure(t *testing.T) {
fake := &fakeGalaxyServer{failTest: true}
client, cleanup := newGalaxyBufconnClient(t, fake)
@@ -372,18 +414,20 @@ func newGalaxyBufconnClient(t *testing.T, fake *fakeGalaxyServer) (*GalaxyClient
type fakeGalaxyServer struct {
pb.UnimplementedGalaxyRepositoryServer
testReply *pb.TestConnectionReply
testAuth string
failTest bool
deployReply *pb.GetLastDeployTimeReply
discoverReply *pb.DiscoverHierarchyReply
watchEvents []*pb.DeployEvent
watchRequest *pb.WatchDeployEventsRequest
watchSendInterval time.Duration
watchHoldOpen bool
browseChildrenCalls []*pb.BrowseChildrenRequest
browseChildrenReplies []*pb.BrowseChildrenReply
browseChildrenError error
testReply *pb.TestConnectionReply
testAuth string
failTest bool
deployReply *pb.GetLastDeployTimeReply
discoverReply *pb.DiscoverHierarchyReply
discoverHierarchyCalls []*pb.DiscoverHierarchyRequest
discoverHierarchyReplies []*pb.DiscoverHierarchyReply
watchEvents []*pb.DeployEvent
watchRequest *pb.WatchDeployEventsRequest
watchSendInterval time.Duration
watchHoldOpen bool
browseChildrenCalls []*pb.BrowseChildrenRequest
browseChildrenReplies []*pb.BrowseChildrenReply
browseChildrenError error
}
func (s *fakeGalaxyServer) TestConnection(ctx context.Context, req *pb.TestConnectionRequest) (*pb.TestConnectionReply, error) {
@@ -405,6 +449,12 @@ func (s *fakeGalaxyServer) GetLastDeployTime(ctx context.Context, req *pb.GetLas
}
func (s *fakeGalaxyServer) DiscoverHierarchy(ctx context.Context, req *pb.DiscoverHierarchyRequest) (*pb.DiscoverHierarchyReply, error) {
s.discoverHierarchyCalls = append(s.discoverHierarchyCalls, req)
if len(s.discoverHierarchyReplies) > 0 {
reply := s.discoverHierarchyReplies[0]
s.discoverHierarchyReplies = s.discoverHierarchyReplies[1:]
return reply, nil
}
if s.discoverReply != nil {
return s.discoverReply, nil
}
@@ -738,3 +788,77 @@ func TestGalaxyBrowseWithFilterForwardsToRequest(t *testing.T) {
t.Fatal("HistorizedOnly = false, want true")
}
}
func TestGalaxyBrowseExpandConcurrentCallersOnlyFireOneRpc(t *testing.T) {
fake := &fakeGalaxyServer{
browseChildrenReplies: []*pb.BrowseChildrenReply{
// roots
buildBrowseReply([]*pb.GalaxyObject{obj(1, "Plant", true)}, []bool{true}, 7),
// one expand: one child
buildBrowseReply([]*pb.GalaxyObject{obj(2, "Mixer", false)}, []bool{false}, 7),
},
}
client, cleanup := newGalaxyBufconnClient(t, fake)
defer cleanup()
ctx := context.Background()
roots, err := client.Browse(ctx, nil)
if err != nil {
t.Fatalf("Browse: %v", err)
}
var wg sync.WaitGroup
errs := make(chan error, 10)
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
errs <- roots[0].Expand(ctx)
}()
}
wg.Wait()
close(errs)
for err := range errs {
if err != nil {
t.Fatalf("concurrent Expand: %v", err)
}
}
if !roots[0].IsExpanded() {
t.Fatal("IsExpanded() = false after 10 concurrent expands")
}
if got, want := len(roots[0].Children()), 1; got != want {
t.Fatalf("len(children) = %d, want %d", got, want)
}
// 1 roots fetch + exactly 1 expand fetch.
if got, want := len(fake.browseChildrenCalls), 2; got != want {
t.Fatalf("RPC count = %d, want %d", got, want)
}
}
func TestGalaxyBrowseChildrenRejectsRepeatedPageToken(t *testing.T) {
// Build a reply that carries a non-empty NextPageToken so browseChildrenInner
// will request a second page. Queue the same reply twice so the second response
// returns the same page token, triggering the duplicate-token guard.
page := buildBrowseReply(
[]*pb.GalaxyObject{obj(1, "Plant", true)},
[]bool{true},
1,
)
page.NextPageToken = "1:abc:1"
fake := &fakeGalaxyServer{
browseChildrenReplies: []*pb.BrowseChildrenReply{page, page},
}
client, cleanup := newGalaxyBufconnClient(t, fake)
defer cleanup()
_, err := client.Browse(context.Background(), nil)
if err == nil {
t.Fatal("Browse: error = nil, want repeated-page-token error")
}
var gwErr *GatewayError
if !errors.As(err, &gwErr) {
t.Fatalf("error type = %T, want *GatewayError; err = %v", err, err)
}
}
@@ -4,6 +4,9 @@ import galaxy_repository.v1.GalaxyRepositoryOuterClass.GalaxyObject;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* One node in a lazy-loaded Galaxy browse tree. Holds the underlying
@@ -16,7 +19,14 @@ public final class LazyBrowseNode {
private final GalaxyObject object;
private final boolean hasChildrenHint;
private final BrowseChildrenOptions options;
private final Object lock = new Object();
// expandLock gates the start of a new expand AND the publish of the in-flight
// future. Readers (getChildren / isExpanded) use a separate read-write lock so
// they never block on the gRPC call.
private final Object expandLock = new Object();
private CompletableFuture<Void> inFlight;
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private List<LazyBrowseNode> children = Collections.emptyList();
private boolean isExpanded;
@@ -43,15 +53,21 @@ public final class LazyBrowseNode {
/** @return a snapshot of direct children loaded by {@link #expand()}; empty until then. */
public List<LazyBrowseNode> getChildren() {
synchronized (lock) {
readWriteLock.readLock().lock();
try {
return List.copyOf(children);
} finally {
readWriteLock.readLock().unlock();
}
}
/** @return {@code true} after the first {@link #expand()} call completes. */
public boolean isExpanded() {
synchronized (lock) {
readWriteLock.readLock().lock();
try {
return isExpanded;
} finally {
readWriteLock.readLock().unlock();
}
}
@@ -59,17 +75,76 @@ public final class LazyBrowseNode {
* Fetches direct children from the gateway and populates {@link #getChildren()}.
* Idempotent: subsequent calls are no-ops and do not issue a second RPC.
*
* <p>Concurrent callers coalesce onto a single in-flight RPC: the first caller
* (the "leader") issues the gRPC call, while any other thread that calls
* {@code expand()} during that window blocks on the leader's future and sees
* the same result (or the same exception). On failure the in-flight slot is
* cleared so a subsequent call can retry.
*
* <p>Readers ({@link #getChildren()} / {@link #isExpanded()}) take a separate
* read lock and are never blocked for the duration of the RPC.
*
* @throws MxGatewayException on transport or protocol failure
*/
public void expand() {
synchronized (lock) {
if (isExpanded) {
if (isExpanded()) {
return;
}
CompletableFuture<Void> future;
boolean iAmTheLeader;
synchronized (expandLock) {
if (isExpanded()) {
return;
}
List<LazyBrowseNode> loaded =
client.browseChildrenInner(Integer.valueOf(object.getGobjectId()), options);
this.children = loaded;
this.isExpanded = true;
if (inFlight != null) {
future = inFlight;
iAmTheLeader = false;
} else {
future = new CompletableFuture<>();
inFlight = future;
iAmTheLeader = true;
}
}
if (iAmTheLeader) {
try {
List<LazyBrowseNode> loaded =
client.browseChildrenInner(object.getGobjectId(), options);
readWriteLock.writeLock().lock();
try {
this.children = loaded;
this.isExpanded = true;
} finally {
readWriteLock.writeLock().unlock();
}
synchronized (expandLock) {
inFlight = null;
}
future.complete(null);
} catch (RuntimeException ex) {
synchronized (expandLock) {
inFlight = null;
}
future.completeExceptionally(ex);
throw ex;
}
} else {
try {
future.get();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new MxGatewayException("Interrupted waiting for browse-children expand.", ie);
} catch (ExecutionException ee) {
Throwable cause = ee.getCause();
if (cause instanceof MxGatewayException me) {
throw me;
}
if (cause instanceof RuntimeException re) {
throw re;
}
throw new MxGatewayException("BrowseChildren expand failed.", cause);
}
}
}
}
@@ -40,9 +40,14 @@ import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.ArrayList;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test;
@@ -203,6 +208,27 @@ final class GalaxyRepositoryClientTests {
}
}
@Test
void browseChildrenRejectsRepeatedPageToken() throws Exception {
// Queue the same BrowseChildrenReply twice with a non-empty NextPageToken.
// The client will request a second page and detect that the token repeats.
BrowseChildrenService service = new BrowseChildrenService();
BrowseChildrenReply repeatedReply = browseReply(
List.of(obj(1, "Plant", true)),
List.of(true),
1L,
"1:abc:1");
service.replies.add(repeatedReply);
service.replies.add(repeatedReply);
try (InProcessGalaxy g = InProcessGalaxy.start(service, new AtomicReference<>());
GalaxyRepositoryClient client = g.client("")) {
MxGatewayException error = assertThrows(MxGatewayException.class, client::browse);
assertTrue(error.getMessage().contains("repeated page token"));
}
}
@Test
void watchDeployEventsReceivesEventsInOrder() throws Exception {
DeployEvent first = DeployEvent.newBuilder()
@@ -445,6 +471,91 @@ final class GalaxyRepositoryClientTests {
}
}
@Test
void browseExpandConcurrentCallersOnlyFireOneRpc() throws Exception {
// Verifies that concurrent expand() calls coalesce onto a single in-flight
// BrowseChildren RPC and that readers (isExpanded/getChildren) are not
// blocked for the full RPC duration.
BrowseChildrenReply rootsReply = browseReply(
List.of(obj(1, "Plant", true)),
List.of(true),
7L,
"");
BrowseChildrenReply childrenReply = browseReply(
List.of(obj(2, "Mixer_001", false)),
List.of(false),
7L,
"");
// Gate the child fetch behind a latch so multiple expanders can pile up.
CountDownLatch release = new CountDownLatch(1);
AtomicInteger childCalls = new AtomicInteger();
BrowseChildrenService service = new BrowseChildrenService() {
@Override
public void browseChildren(
BrowseChildrenRequest request, StreamObserver<BrowseChildrenReply> responseObserver) {
calls.add(request);
BrowseChildrenReply reply;
if (!request.hasParentGobjectId()) {
reply = rootsReply;
} else {
// Block the leader until the followers have arrived.
try {
assertTrue(release.await(5, TimeUnit.SECONDS), "release latch never tripped");
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
responseObserver.onError(Status.CANCELLED.asRuntimeException());
return;
}
childCalls.incrementAndGet();
reply = childrenReply;
}
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
};
try (InProcessGalaxy g = InProcessGalaxy.start(service, new AtomicReference<>());
GalaxyRepositoryClient client = g.client("")) {
List<LazyBrowseNode> roots = client.browse();
LazyBrowseNode root = roots.get(0);
int parallelism = 10;
ExecutorService pool = Executors.newFixedThreadPool(parallelism);
try {
CountDownLatch ready = new CountDownLatch(parallelism);
List<Future<Void>> futures = new ArrayList<>();
for (int i = 0; i < parallelism; i++) {
futures.add(pool.submit(() -> {
ready.countDown();
root.expand();
return null;
}));
}
// Wait for all callers to be in flight, then release the leader.
assertTrue(ready.await(5, TimeUnit.SECONDS), "expander threads did not start");
// Readers must not be blocked by an in-flight expand; this should not deadlock
// and should return the pre-expand state.
assertFalse(root.isExpanded());
assertEquals(0, root.getChildren().size());
release.countDown();
for (Future<Void> f : futures) {
f.get(10, TimeUnit.SECONDS);
}
} finally {
pool.shutdownNow();
}
assertTrue(root.isExpanded());
assertEquals(1, root.getChildren().size());
// Exactly one expand RPC was issued even though many callers raced.
assertEquals(1, childCalls.get());
// 1 roots fetch + exactly 1 expand fetch.
assertEquals(2, service.calls.size());
}
}
@Test
void browseWithFilterForwardsToRequest() throws Exception {
BrowseChildrenService service = new BrowseChildrenService();
@@ -486,7 +597,7 @@ final class GalaxyRepositoryClientTests {
return b.build();
}
private static final class BrowseChildrenService extends TestService {
private static class BrowseChildrenService extends TestService {
final List<BrowseChildrenRequest> calls =
Collections.synchronizedList(new CopyOnWriteArrayList<>());
final Queue<BrowseChildrenReply> replies = new ArrayDeque<>();
@@ -140,6 +140,22 @@ class GalaxyRepositoryClient:
)
seen_page_tokens.add(page_token)
async def browse_children_raw(
self, request: galaxy_pb.BrowseChildrenRequest
) -> galaxy_pb.BrowseChildrenReply:
"""Issue one BrowseChildren RPC and return the raw reply.
Lower-level escape hatch for callers that need direct page-token control
or do not want LazyBrowseNode wrapping. Most callers should use
:py:meth:`browse` and :py:meth:`LazyBrowseNode.expand` instead.
"""
return await self._unary(
"browse children",
self.raw_stub.BrowseChildren,
request,
)
async def browse(
self,
options: BrowseChildrenOptions | None = None,
+29
View File
@@ -507,6 +507,35 @@ async def test_browse_with_filter_forwards_to_request() -> None:
assert request.historized_only is True
@pytest.mark.asyncio
async def test_browse_children_raw_returns_reply_unwrapped() -> None:
"""browse_children_raw forwards the request to the stub and returns the raw reply."""
stub = FakeGalaxyStub()
expected = _build_browse_reply(
children=[_obj(1, "Plant", is_area=True)],
child_has_children=[True],
cache_sequence=42,
)
stub.browse_children.replies = [expected]
async with await GalaxyRepositoryClient.connect(
endpoint="fake",
plaintext=True,
stub=stub,
) as client:
request = galaxy_pb.BrowseChildrenRequest(
page_size=10,
tag_name_glob="Plant*",
)
reply = await client.browse_children_raw(request)
assert reply.cache_sequence == 42
assert len(reply.children) == 1
assert reply.children[0].tag_name == "Plant"
assert len(stub.browse_children.requests) == 1
assert stub.browse_children.requests[0].tag_name_glob == "Plant*"
class FakeGalaxyStub:
def __init__(self) -> None:
self.test_connection = FakeUnary([galaxy_pb.TestConnectionReply(ok=False)])
@@ -62,7 +62,16 @@ public static class GalaxyBrowseProjector
return new GalaxyBrowseChildrenResult(page, hasChildren, filtered.Children.Count, filterSignature);
}
private static int ResolveParentId(GalaxyHierarchyCacheEntry entry, BrowseChildrenRequest request)
/// <summary>
/// Resolves the request's parent oneof to a gobject id, throwing
/// <see cref="RpcException"/> with <see cref="StatusCode.NotFound"/> when the
/// parent does not exist. Public so the gRPC handler can compute the same
/// parent id (needed for the page-token signature) without reimplementing the
/// resolution rules.
/// </summary>
/// <param name="entry">The Galaxy hierarchy cache entry to query.</param>
/// <param name="request">The browse-children request.</param>
public static int ResolveParentId(GalaxyHierarchyCacheEntry entry, BrowseChildrenRequest request)
{
switch (request.ParentCase)
{
@@ -80,9 +89,7 @@ public static class GalaxyBrowseProjector
return request.ParentGobjectId;
case BrowseChildrenRequest.ParentOneofCase.ParentTagName:
{
GalaxyObjectView? match = entry.Index.ObjectViews.FirstOrDefault(
view => string.Equals(view.Object.TagName, request.ParentTagName, StringComparison.OrdinalIgnoreCase));
if (match is null)
if (!entry.Index.ObjectViewsByTagName.TryGetValue(request.ParentTagName, out GalaxyObjectView? match))
{
throw new RpcException(new Status(StatusCode.NotFound, "BrowseChildren parent was not found."));
}
@@ -90,9 +97,7 @@ public static class GalaxyBrowseProjector
}
case BrowseChildrenRequest.ParentOneofCase.ParentContainedPath:
{
GalaxyObjectView? match = entry.Index.ObjectViews.FirstOrDefault(
view => string.Equals(view.ContainedPath, request.ParentContainedPath, StringComparison.OrdinalIgnoreCase));
if (match is null)
if (!entry.Index.ObjectViewsByContainedPath.TryGetValue(request.ParentContainedPath, out GalaxyObjectView? match))
{
throw new RpcException(new Status(StatusCode.NotFound, "BrowseChildren parent was not found."));
}
@@ -163,10 +168,17 @@ public static class GalaxyBrowseProjector
return false;
}
// Defend against pathological cycles in Galaxy data (e.g. a corrupt A→B→A chain).
// BuildContainedPath uses the same visited-id pattern; mirror it so this walk
// terminates even when ChildrenByParent forms a cycle.
HashSet<int> visited = new() { parent.Object.GobjectId };
Stack<GalaxyObjectView> stack = new();
foreach (GalaxyObjectView child in children)
{
stack.Push(child);
if (visited.Add(child.Object.GobjectId))
{
stack.Push(child);
}
}
while (stack.Count > 0)
{
@@ -180,7 +192,10 @@ public static class GalaxyBrowseProjector
{
foreach (GalaxyObjectView grandchild in grandchildren)
{
stack.Push(grandchild);
if (visited.Add(grandchild.Object.GobjectId))
{
stack.Push(grandchild);
}
}
}
}
@@ -8,12 +8,16 @@ public sealed class GalaxyHierarchyIndex
IReadOnlyList<GalaxyObjectView> objectViews,
IReadOnlyDictionary<int, GalaxyObjectView> objectViewsById,
IReadOnlyDictionary<string, GalaxyTagLookup> tagsByAddress,
IReadOnlyDictionary<int, IReadOnlyList<GalaxyObjectView>> childrenByParent)
IReadOnlyDictionary<int, IReadOnlyList<GalaxyObjectView>> childrenByParent,
IReadOnlyDictionary<string, GalaxyObjectView> objectViewsByTagName,
IReadOnlyDictionary<string, GalaxyObjectView> objectViewsByContainedPath)
{
ObjectViews = objectViews;
ObjectViewsById = objectViewsById;
TagsByAddress = tagsByAddress;
ChildrenByParent = childrenByParent;
ObjectViewsByTagName = objectViewsByTagName;
ObjectViewsByContainedPath = objectViewsByContainedPath;
}
/// <summary>Gets an empty Galaxy hierarchy index.</summary>
@@ -21,7 +25,9 @@ public sealed class GalaxyHierarchyIndex
Array.Empty<GalaxyObjectView>(),
new Dictionary<int, GalaxyObjectView>(),
new Dictionary<string, GalaxyTagLookup>(StringComparer.OrdinalIgnoreCase),
new Dictionary<int, IReadOnlyList<GalaxyObjectView>>());
new Dictionary<int, IReadOnlyList<GalaxyObjectView>>(),
new Dictionary<string, GalaxyObjectView>(StringComparer.OrdinalIgnoreCase),
new Dictionary<string, GalaxyObjectView>(StringComparer.OrdinalIgnoreCase));
/// <summary>Gets the object views.</summary>
public IReadOnlyList<GalaxyObjectView> ObjectViews { get; }
@@ -35,6 +41,12 @@ public sealed class GalaxyHierarchyIndex
/// <summary>Gets direct children grouped by parent gobject id. Root objects (no parent, or self-parented) live under key 0. Each list is sorted areas-first, then by display name (OrdinalIgnoreCase).</summary>
public IReadOnlyDictionary<int, IReadOnlyList<GalaxyObjectView>> ChildrenByParent { get; }
/// <summary>Gets object views indexed by <see cref="GalaxyObject.TagName"/> (OrdinalIgnoreCase). Lets browse/discover handlers resolve parents/roots by tag name in O(1) instead of scanning <see cref="ObjectViews"/>.</summary>
public IReadOnlyDictionary<string, GalaxyObjectView> ObjectViewsByTagName { get; }
/// <summary>Gets object views indexed by contained path (OrdinalIgnoreCase). Lets browse/discover handlers resolve parents/roots by path in O(1) instead of scanning <see cref="ObjectViews"/>.</summary>
public IReadOnlyDictionary<string, GalaxyObjectView> ObjectViewsByContainedPath { get; }
/// <summary>Builds a Galaxy hierarchy index from the given objects.</summary>
/// <param name="objects">The Galaxy objects to index.</param>
/// <returns>A new Galaxy hierarchy index.</returns>
@@ -54,6 +66,8 @@ public sealed class GalaxyHierarchyIndex
List<GalaxyObjectView> views = new(objects.Count);
Dictionary<int, GalaxyObjectView> viewsById = new();
Dictionary<string, GalaxyTagLookup> tagsByAddress = new(StringComparer.OrdinalIgnoreCase);
Dictionary<string, GalaxyObjectView> viewsByTagName = new(StringComparer.OrdinalIgnoreCase);
Dictionary<string, GalaxyObjectView> viewsByContainedPath = new(StringComparer.OrdinalIgnoreCase);
foreach (GalaxyObject obj in objects)
{
@@ -66,6 +80,12 @@ public sealed class GalaxyHierarchyIndex
if (!string.IsNullOrWhiteSpace(obj.TagName))
{
tagsByAddress.TryAdd(obj.TagName, new GalaxyTagLookup(obj, Attribute: null, path));
viewsByTagName.TryAdd(obj.TagName, view);
}
if (!string.IsNullOrWhiteSpace(path))
{
viewsByContainedPath.TryAdd(path, view);
}
foreach (GalaxyAttribute attribute in obj.Attributes)
@@ -109,7 +129,9 @@ public sealed class GalaxyHierarchyIndex
views,
viewsById,
tagsByAddress,
readOnlyChildren);
readOnlyChildren,
viewsByTagName,
viewsByContainedPath);
}
private static string BuildContainedPath(
@@ -103,7 +103,7 @@ public static class GalaxyHierarchyProjector
// ResolveRoot can throw RpcException(NotFound); run it before consulting the
// memo so a bad root surfaces consistently regardless of cache state.
IReadOnlyList<GalaxyObjectView> views = entry.Index.ObjectViews;
GalaxyObjectView? root = ResolveRoot(request, views);
GalaxyObjectView? root = ResolveRoot(request, entry.Index);
ConcurrentDictionary<string, IReadOnlyList<GalaxyObjectView>> memo =
FilteredViewCache.GetValue(entry, static _ => new ConcurrentDictionary<string, IReadOnlyList<GalaxyObjectView>>(StringComparer.Ordinal));
@@ -176,17 +176,17 @@ public static class GalaxyHierarchyProjector
private static GalaxyObjectView? ResolveRoot(
DiscoverHierarchyRequest request,
IReadOnlyList<GalaxyObjectView> views)
GalaxyHierarchyIndex index)
{
GalaxyObjectView? root = request.RootCase switch
{
DiscoverHierarchyRequest.RootOneofCase.None => null,
DiscoverHierarchyRequest.RootOneofCase.RootGobjectId => views.FirstOrDefault(
view => view.Object.GobjectId == request.RootGobjectId),
DiscoverHierarchyRequest.RootOneofCase.RootTagName => views.FirstOrDefault(
view => string.Equals(view.Object.TagName, request.RootTagName, StringComparison.OrdinalIgnoreCase)),
DiscoverHierarchyRequest.RootOneofCase.RootContainedPath => views.FirstOrDefault(
view => string.Equals(view.ContainedPath, request.RootContainedPath, StringComparison.OrdinalIgnoreCase)),
DiscoverHierarchyRequest.RootOneofCase.RootGobjectId =>
index.ObjectViewsById.TryGetValue(request.RootGobjectId, out GalaxyObjectView? byId) ? byId : null,
DiscoverHierarchyRequest.RootOneofCase.RootTagName =>
index.ObjectViewsByTagName.TryGetValue(request.RootTagName, out GalaxyObjectView? byTag) ? byTag : null,
DiscoverHierarchyRequest.RootOneofCase.RootContainedPath =>
index.ObjectViewsByContainedPath.TryGetValue(request.RootContainedPath, out GalaxyObjectView? byPath) ? byPath : null,
_ => null,
};
@@ -128,8 +128,11 @@ public sealed class GalaxyRepositoryGrpcService(
IReadOnlyList<string> browseSubtrees = ResolveBrowseSubtrees();
// Resolve the parent id once so the page-token signature can include it
// and the projector sees the same resolved id when memoizing.
int parentId = ResolveParentIdForToken(entry, request);
// and the projector sees the same resolved id when memoizing. The projector
// re-resolves internally; with the by-name/by-path indexes on
// GalaxyHierarchyIndex that second call is O(1), so the redundancy is cheap
// and keeps the projector self-contained.
int parentId = GalaxyDb.GalaxyBrowseProjector.ResolveParentId(entry, request);
string filterSignature = GalaxyDb.GalaxyBrowseProjector.ComputeFilterSignature(
request, browseSubtrees, parentId);
PageToken pageToken = ParsePageToken(request.PageToken, entry.Sequence, filterSignature);
@@ -283,32 +286,6 @@ public sealed class GalaxyRepositoryGrpcService(
return Math.Min(pageSize, MaxDiscoverPageSize);
}
// Lightweight parent resolver used only for signature computation. Re-throws
// NotFound consistently with the projector so the error surface matches.
private static int ResolveParentIdForToken(
GalaxyDb.GalaxyHierarchyCacheEntry entry,
BrowseChildrenRequest request)
{
return request.ParentCase switch
{
BrowseChildrenRequest.ParentOneofCase.None => 0,
BrowseChildrenRequest.ParentOneofCase.ParentGobjectId =>
request.ParentGobjectId == 0 ? 0
: entry.Index.ObjectViewsById.ContainsKey(request.ParentGobjectId)
? request.ParentGobjectId
: throw new RpcException(new Status(StatusCode.NotFound, "BrowseChildren parent was not found.")),
BrowseChildrenRequest.ParentOneofCase.ParentTagName =>
entry.Index.ObjectViews.FirstOrDefault(
v => string.Equals(v.Object.TagName, request.ParentTagName, StringComparison.OrdinalIgnoreCase))?.Object.GobjectId
?? throw new RpcException(new Status(StatusCode.NotFound, "BrowseChildren parent was not found.")),
BrowseChildrenRequest.ParentOneofCase.ParentContainedPath =>
entry.Index.ObjectViews.FirstOrDefault(
v => string.Equals(v.ContainedPath, request.ParentContainedPath, StringComparison.OrdinalIgnoreCase))?.Object.GobjectId
?? throw new RpcException(new Status(StatusCode.NotFound, "BrowseChildren parent was not found.")),
_ => 0,
};
}
private IReadOnlyList<string> ResolveBrowseSubtrees()
{
ApiKeyConstraints constraints = identityAccessor.Current?.EffectiveConstraints ?? ApiKeyConstraints.Empty;
@@ -348,21 +325,21 @@ public sealed class GalaxyRepositoryGrpcService(
{
throw new RpcException(new Status(
StatusCode.InvalidArgument,
"DiscoverHierarchy page_token is invalid."));
"page_token is invalid."));
}
if (sequence != currentSequence)
{
throw new RpcException(new Status(
StatusCode.InvalidArgument,
"DiscoverHierarchy page_token is stale."));
"page_token is stale."));
}
if (!string.Equals(parts[1], currentFilterSignature, StringComparison.Ordinal))
{
throw new RpcException(new Status(
StatusCode.InvalidArgument,
"DiscoverHierarchy page_token does not match the current filters."));
"page_token does not match the current filters."));
}
return new PageToken(sequence, parts[1], offset);
@@ -223,6 +223,77 @@ public sealed class GalaxyBrowseProjectorTests
Assert.Equal("Plant.Line_A", result.Children[0].TagName);
}
/// <summary>
/// Verifies <see cref="GalaxyBrowseProjector"/> terminates when the Galaxy data
/// contains a cyclic parent chain (A→B→C→A). Without the visited-id guard in
/// <c>HasMatchingDescendant</c>, the depth-first walk would loop forever; the
/// 5-second xUnit timeout asserts termination.
/// </summary>
[Fact(Timeout = 5000)]
public async Task Project_CyclicDescendants_DoesNotInfiniteLoop()
{
await Task.Yield();
// Construct a 3-node cycle: A(10)→B(11)→C(12)→A. Each node's ParentGobjectId
// points to the next, so GalaxyHierarchyIndex.ChildrenByParent has
// [12] = [A], [10] = [B], [11] = [C].
// None of them are historized, so HistorizedOnly=true forces the projector to
// call HasMatchingDescendant on every direct child, exercising the cycle walk.
GalaxyObject a = new()
{
GobjectId = 10,
ParentGobjectId = 12,
ContainedName = "A",
BrowseName = "A",
TagName = "A",
};
GalaxyObject b = new()
{
GobjectId = 11,
ParentGobjectId = 10,
ContainedName = "B",
BrowseName = "B",
TagName = "B",
};
GalaxyObject c = new()
{
GobjectId = 12,
ParentGobjectId = 11,
ContainedName = "C",
BrowseName = "C",
TagName = "C",
};
IReadOnlyList<GalaxyObject> objects = new[] { a, b, c };
GalaxyHierarchyCacheEntry entry = GalaxyHierarchyCacheEntry.Empty with
{
Status = GalaxyCacheStatus.Healthy,
Sequence = 1,
LastSuccessAt = DateTimeOffset.UtcNow,
Objects = objects,
Index = GalaxyHierarchyIndex.Build(objects),
DashboardSummary = DashboardGalaxySummary.Unknown with
{
Status = DashboardGalaxyStatus.Healthy,
ObjectCount = objects.Count,
},
ObjectCount = objects.Count,
};
// Browse children of A (id=10). Its direct child B fails HistorizedOnly, so the
// projector falls back to HasMatchingDescendant(B), which walks B→C→A→B…
// without the visited-id guard. With the guard, the walk terminates and returns
// an empty page (no historized descendants exist anywhere in the cycle).
GalaxyBrowseChildrenResult result = GalaxyBrowseProjector.ProjectChildren(
entry,
new BrowseChildrenRequest { ParentGobjectId = 10, HistorizedOnly = true },
browseSubtreeGlobs: null,
offset: 0,
pageSize: 10);
Assert.Empty(result.Children);
Assert.Equal(0, result.TotalChildCount);
}
private static GalaxyHierarchyCacheEntry CreateEntry()
{
IReadOnlyList<GalaxyObject> objects = CreateObjects();
@@ -75,6 +75,47 @@ public sealed class GalaxyHierarchyIndexTests
}
}
/// <summary>Verifies <see cref="GalaxyHierarchyIndex.ObjectViewsByTagName"/> is OrdinalIgnoreCase and supports O(1) lookups.</summary>
[Fact]
public void ObjectViewsByTagName_IsCaseInsensitive_AndLookupsAreO1()
{
GalaxyObject root = new() { GobjectId = 1, ParentGobjectId = 0, IsArea = true, ContainedName = "Plant", BrowseName = "Plant", TagName = "Plant" };
GalaxyObject mixer = new() { GobjectId = 2, ParentGobjectId = 1, ContainedName = "Mixer_001", BrowseName = "Mixer_001", TagName = "Plant.Mixer_001" };
GalaxyHierarchyIndex index = GalaxyHierarchyIndex.Build([root, mixer]);
Assert.True(index.ObjectViewsByTagName.TryGetValue("Plant.Mixer_001", out GalaxyObjectView? exact));
Assert.NotNull(exact);
Assert.Equal(2, exact!.Object.GobjectId);
// Case-insensitive lookup must hit the same entry.
Assert.True(index.ObjectViewsByTagName.TryGetValue("plant.mixer_001", out GalaxyObjectView? lower));
Assert.NotNull(lower);
Assert.Same(exact, lower);
Assert.False(index.ObjectViewsByTagName.ContainsKey("Plant.Missing"));
}
/// <summary>Verifies <see cref="GalaxyHierarchyIndex.ObjectViewsByContainedPath"/> is OrdinalIgnoreCase.</summary>
[Fact]
public void ObjectViewsByContainedPath_IsCaseInsensitive()
{
GalaxyObject root = new() { GobjectId = 1, ParentGobjectId = 0, IsArea = true, ContainedName = "Plant", BrowseName = "Plant", TagName = "Plant" };
GalaxyObject lineA = new() { GobjectId = 2, ParentGobjectId = 1, IsArea = true, ContainedName = "Line_A", BrowseName = "Line_A", TagName = "Plant.Line_A" };
GalaxyHierarchyIndex index = GalaxyHierarchyIndex.Build([root, lineA]);
Assert.True(index.ObjectViewsByContainedPath.TryGetValue("Plant/Line_A", out GalaxyObjectView? exact));
Assert.NotNull(exact);
Assert.Equal(2, exact!.Object.GobjectId);
Assert.True(index.ObjectViewsByContainedPath.TryGetValue("plant/line_a", out GalaxyObjectView? lower));
Assert.NotNull(lower);
Assert.Same(exact, lower);
Assert.False(index.ObjectViewsByContainedPath.ContainsKey("Plant/Missing"));
}
/// <summary>Verifies children sort areas-first, then by display name (case-insensitive).</summary>
[Fact]
public void ChildrenByParent_SortsAreasFirstThenByDisplayName()