Compare commits
7 Commits
b2448510ac
...
92cc4688e6
| Author | SHA1 | Date | |
|---|---|---|---|
| 92cc4688e6 | |||
| a155554038 | |||
| 68f905a344 | |||
| 5abc222c72 | |||
| da3aa7b0b2 | |||
| f0ec068430 | |||
| 1a1d14a9fd |
+105
-18
@@ -18,6 +18,11 @@ import (
|
|||||||
// browseChildrenPageSize is the per-request page size used by the lazy walker.
|
// browseChildrenPageSize is the per-request page size used by the lazy walker.
|
||||||
const browseChildrenPageSize = 500
|
const browseChildrenPageSize = 500
|
||||||
|
|
||||||
|
// discoverHierarchyPageSize is the per-request page size used by DiscoverHierarchy.
|
||||||
|
// Mirrors the .NET client constant so large galaxies are not silently truncated
|
||||||
|
// by the server's default page cap.
|
||||||
|
const discoverHierarchyPageSize = 5000
|
||||||
|
|
||||||
// RawGalaxyRepositoryClient is the generated gRPC client interface for the
|
// RawGalaxyRepositoryClient is the generated gRPC client interface for the
|
||||||
// Galaxy Repository service exposed for callers that need direct contract
|
// Galaxy Repository service exposed for callers that need direct contract
|
||||||
// access.
|
// access.
|
||||||
@@ -155,16 +160,35 @@ func (c *GalaxyClient) GetLastDeployTime(ctx context.Context) (time.Time, bool,
|
|||||||
|
|
||||||
// DiscoverHierarchy returns the deployed Galaxy object hierarchy with each
|
// DiscoverHierarchy returns the deployed Galaxy object hierarchy with each
|
||||||
// object's dynamic attributes. The objects are returned in the order supplied
|
// object's dynamic attributes. The objects are returned in the order supplied
|
||||||
// by the server.
|
// by the server. The call pages over the server's NextPageToken until the
|
||||||
|
// server signals it has no more results, matching the .NET client.
|
||||||
func (c *GalaxyClient) DiscoverHierarchy(ctx context.Context) ([]*GalaxyObject, error) {
|
func (c *GalaxyClient) DiscoverHierarchy(ctx context.Context) ([]*GalaxyObject, error) {
|
||||||
|
var objects []*GalaxyObject
|
||||||
|
pageToken := ""
|
||||||
|
seen := map[string]struct{}{}
|
||||||
|
for {
|
||||||
callCtx, cancel := c.callContext(ctx)
|
callCtx, cancel := c.callContext(ctx)
|
||||||
defer cancel()
|
reply, err := c.raw.DiscoverHierarchy(callCtx, &pb.DiscoverHierarchyRequest{
|
||||||
|
PageSize: discoverHierarchyPageSize,
|
||||||
reply, err := c.raw.DiscoverHierarchy(callCtx, &pb.DiscoverHierarchyRequest{})
|
PageToken: pageToken,
|
||||||
|
})
|
||||||
|
cancel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, &GatewayError{Op: "galaxy discover hierarchy", Err: err}
|
return nil, &GatewayError{Op: "galaxy discover hierarchy", Err: err}
|
||||||
}
|
}
|
||||||
return reply.GetObjects(), nil
|
objects = append(objects, reply.GetObjects()...)
|
||||||
|
pageToken = reply.GetNextPageToken()
|
||||||
|
if pageToken == "" {
|
||||||
|
return objects, nil
|
||||||
|
}
|
||||||
|
if _, dup := seen[pageToken]; dup {
|
||||||
|
return nil, &GatewayError{
|
||||||
|
Op: "galaxy discover hierarchy",
|
||||||
|
Err: fmt.Errorf("repeated page token %q", pageToken),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
seen[pageToken] = struct{}{}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WatchDeployEventsRaw starts the generated WatchDeployEvents stream for callers
|
// WatchDeployEventsRaw starts the generated WatchDeployEvents stream for callers
|
||||||
@@ -249,15 +273,25 @@ func (c *GalaxyClient) Close() error {
|
|||||||
|
|
||||||
// LazyBrowseNode is one node in a lazy Galaxy hierarchy walk produced by
|
// LazyBrowseNode is one node in a lazy Galaxy hierarchy walk produced by
|
||||||
// (*GalaxyClient).Browse. Children are not fetched until Expand is called.
|
// (*GalaxyClient).Browse. Children are not fetched until Expand is called.
|
||||||
// The node is safe for concurrent use; concurrent Expand calls collapse to a
|
// The node is safe for concurrent use; concurrent Expand calls coalesce onto
|
||||||
// single RPC.
|
// a single in-flight RPC and do not block snapshot accessors.
|
||||||
type LazyBrowseNode struct {
|
type LazyBrowseNode struct {
|
||||||
client *GalaxyClient
|
client *GalaxyClient
|
||||||
object *pb.GalaxyObject
|
object *pb.GalaxyObject
|
||||||
hasChildrenHint bool
|
hasChildrenHint bool
|
||||||
options BrowseChildrenOptions
|
options BrowseChildrenOptions
|
||||||
|
|
||||||
mu sync.Mutex
|
// expandLock gates inspection and mutation of expand-coordination state
|
||||||
|
// (expanding, expandDone, expandErr). It is held only briefly; the BrowseChildren
|
||||||
|
// RPC itself runs outside this lock so concurrent readers and waiters are not blocked.
|
||||||
|
expandLock sync.Mutex
|
||||||
|
expanding bool
|
||||||
|
expandDone chan struct{}
|
||||||
|
expandErr error
|
||||||
|
|
||||||
|
// mu protects the children snapshot and isExpanded flag for concurrent
|
||||||
|
// Children() / IsExpanded() readers.
|
||||||
|
mu sync.RWMutex
|
||||||
children []*LazyBrowseNode
|
children []*LazyBrowseNode
|
||||||
isExpanded bool
|
isExpanded bool
|
||||||
}
|
}
|
||||||
@@ -272,8 +306,8 @@ func (n *LazyBrowseNode) HasChildrenHint() bool { return n.hasChildrenHint }
|
|||||||
// Children returns a snapshot copy of the currently-loaded child nodes. Returns
|
// Children returns a snapshot copy of the currently-loaded child nodes. Returns
|
||||||
// an empty slice when Expand has not yet been called.
|
// an empty slice when Expand has not yet been called.
|
||||||
func (n *LazyBrowseNode) Children() []*LazyBrowseNode {
|
func (n *LazyBrowseNode) Children() []*LazyBrowseNode {
|
||||||
n.mu.Lock()
|
n.mu.RLock()
|
||||||
defer n.mu.Unlock()
|
defer n.mu.RUnlock()
|
||||||
out := make([]*LazyBrowseNode, len(n.children))
|
out := make([]*LazyBrowseNode, len(n.children))
|
||||||
copy(out, n.children)
|
copy(out, n.children)
|
||||||
return out
|
return out
|
||||||
@@ -281,28 +315,81 @@ func (n *LazyBrowseNode) Children() []*LazyBrowseNode {
|
|||||||
|
|
||||||
// IsExpanded reports whether Expand has completed successfully on this node.
|
// IsExpanded reports whether Expand has completed successfully on this node.
|
||||||
func (n *LazyBrowseNode) IsExpanded() bool {
|
func (n *LazyBrowseNode) IsExpanded() bool {
|
||||||
n.mu.Lock()
|
n.mu.RLock()
|
||||||
defer n.mu.Unlock()
|
defer n.mu.RUnlock()
|
||||||
return n.isExpanded
|
return n.isExpanded
|
||||||
}
|
}
|
||||||
|
|
||||||
// Expand fetches this node's direct children via BrowseChildren when they have
|
// Expand fetches this node's direct children via BrowseChildren when they have
|
||||||
// not yet been loaded. Subsequent calls after a successful Expand are a no-op
|
// not yet been loaded. Subsequent calls after a successful Expand are a no-op
|
||||||
// and do not issue another RPC.
|
// and do not issue another RPC.
|
||||||
|
//
|
||||||
|
// Expand is safe to call concurrently from multiple goroutines: callers that
|
||||||
|
// arrive while an expansion is in flight wait on the active RPC and share its
|
||||||
|
// result instead of issuing a second RPC. The RPC itself runs without holding
|
||||||
|
// the snapshot mutex, so concurrent Children() and IsExpanded() callers are
|
||||||
|
// not blocked for the duration of the network round trip.
|
||||||
|
//
|
||||||
|
// Failure semantics: a failed expansion surfaces the same error to every
|
||||||
|
// in-flight waiter, but the node is left in its pre-call state (isExpanded =
|
||||||
|
// false, no in-flight expansion). The next Expand call therefore retries with
|
||||||
|
// a fresh RPC; failures are not sticky.
|
||||||
func (n *LazyBrowseNode) Expand(ctx context.Context) error {
|
func (n *LazyBrowseNode) Expand(ctx context.Context) error {
|
||||||
n.mu.Lock()
|
// Fast path: already expanded.
|
||||||
defer n.mu.Unlock()
|
n.mu.RLock()
|
||||||
if n.isExpanded {
|
if n.isExpanded {
|
||||||
|
n.mu.RUnlock()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
n.mu.RUnlock()
|
||||||
|
|
||||||
|
// Either start a new expansion or wait on an existing one.
|
||||||
|
n.expandLock.Lock()
|
||||||
|
n.mu.RLock()
|
||||||
|
alreadyExpanded := n.isExpanded
|
||||||
|
n.mu.RUnlock()
|
||||||
|
if alreadyExpanded {
|
||||||
|
n.expandLock.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if n.expanding {
|
||||||
|
done := n.expandDone
|
||||||
|
n.expandLock.Unlock()
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
n.expandLock.Lock()
|
||||||
|
err := n.expandErr
|
||||||
|
n.expandLock.Unlock()
|
||||||
|
return err
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
n.expanding = true
|
||||||
|
n.expandDone = make(chan struct{})
|
||||||
|
done := n.expandDone
|
||||||
|
n.expandLock.Unlock()
|
||||||
|
|
||||||
|
// Issue the RPC outside any lock so concurrent readers/waiters are not blocked.
|
||||||
parentID := n.object.GetGobjectId()
|
parentID := n.object.GetGobjectId()
|
||||||
children, err := n.client.browseChildrenInner(ctx, &parentID, n.options)
|
children, err := n.client.browseChildrenInner(ctx, &parentID, n.options)
|
||||||
if err != nil {
|
|
||||||
return err
|
if err == nil {
|
||||||
}
|
n.mu.Lock()
|
||||||
n.children = children
|
n.children = children
|
||||||
n.isExpanded = true
|
n.isExpanded = true
|
||||||
return nil
|
n.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Publish result to waiters and clear the in-flight marker so a failed
|
||||||
|
// expansion can be retried by the next Expand call.
|
||||||
|
n.expandLock.Lock()
|
||||||
|
n.expandErr = err
|
||||||
|
n.expanding = false
|
||||||
|
close(done)
|
||||||
|
n.expandLock.Unlock()
|
||||||
|
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Browse returns the root nodes of the Galaxy hierarchy. The returned nodes
|
// Browse returns the root nodes of the Galaxy hierarchy. The returned nodes
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"net"
|
"net"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -146,6 +147,47 @@ func TestGalaxyDiscoverHierarchyReturnsObjects(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestGalaxyDiscoverHierarchyPaginatesAcrossMultiplePages(t *testing.T) {
|
||||||
|
page1 := &pb.DiscoverHierarchyReply{
|
||||||
|
Objects: []*pb.GalaxyObject{
|
||||||
|
{GobjectId: 1, TagName: "A"},
|
||||||
|
{GobjectId: 2, TagName: "B"},
|
||||||
|
},
|
||||||
|
NextPageToken: "page-2",
|
||||||
|
TotalObjectCount: 3,
|
||||||
|
}
|
||||||
|
page2 := &pb.DiscoverHierarchyReply{
|
||||||
|
Objects: []*pb.GalaxyObject{
|
||||||
|
{GobjectId: 3, TagName: "C"},
|
||||||
|
},
|
||||||
|
TotalObjectCount: 3,
|
||||||
|
}
|
||||||
|
fake := &fakeGalaxyServer{
|
||||||
|
discoverHierarchyReplies: []*pb.DiscoverHierarchyReply{page1, page2},
|
||||||
|
}
|
||||||
|
client, cleanup := newGalaxyBufconnClient(t, fake)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
objs, err := client.DiscoverHierarchy(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("DiscoverHierarchy: %v", err)
|
||||||
|
}
|
||||||
|
if got, want := len(objs), 3; got != want {
|
||||||
|
t.Fatalf("len(objs) = %d, want %d", got, want)
|
||||||
|
}
|
||||||
|
if len(fake.discoverHierarchyCalls) != 2 {
|
||||||
|
t.Fatalf("expected 2 RPC calls, got %d", len(fake.discoverHierarchyCalls))
|
||||||
|
}
|
||||||
|
if fake.discoverHierarchyCalls[0].GetPageSize() != discoverHierarchyPageSize {
|
||||||
|
t.Fatalf("first call PageSize = %d, want %d",
|
||||||
|
fake.discoverHierarchyCalls[0].GetPageSize(), discoverHierarchyPageSize)
|
||||||
|
}
|
||||||
|
if fake.discoverHierarchyCalls[1].GetPageToken() != "page-2" {
|
||||||
|
t.Fatalf("second call page token = %q, want %q",
|
||||||
|
fake.discoverHierarchyCalls[1].GetPageToken(), "page-2")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestGalaxyDialReturnsGatewayErrorOnRpcFailure(t *testing.T) {
|
func TestGalaxyDialReturnsGatewayErrorOnRpcFailure(t *testing.T) {
|
||||||
fake := &fakeGalaxyServer{failTest: true}
|
fake := &fakeGalaxyServer{failTest: true}
|
||||||
client, cleanup := newGalaxyBufconnClient(t, fake)
|
client, cleanup := newGalaxyBufconnClient(t, fake)
|
||||||
@@ -377,6 +419,8 @@ type fakeGalaxyServer struct {
|
|||||||
failTest bool
|
failTest bool
|
||||||
deployReply *pb.GetLastDeployTimeReply
|
deployReply *pb.GetLastDeployTimeReply
|
||||||
discoverReply *pb.DiscoverHierarchyReply
|
discoverReply *pb.DiscoverHierarchyReply
|
||||||
|
discoverHierarchyCalls []*pb.DiscoverHierarchyRequest
|
||||||
|
discoverHierarchyReplies []*pb.DiscoverHierarchyReply
|
||||||
watchEvents []*pb.DeployEvent
|
watchEvents []*pb.DeployEvent
|
||||||
watchRequest *pb.WatchDeployEventsRequest
|
watchRequest *pb.WatchDeployEventsRequest
|
||||||
watchSendInterval time.Duration
|
watchSendInterval time.Duration
|
||||||
@@ -405,6 +449,12 @@ func (s *fakeGalaxyServer) GetLastDeployTime(ctx context.Context, req *pb.GetLas
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *fakeGalaxyServer) DiscoverHierarchy(ctx context.Context, req *pb.DiscoverHierarchyRequest) (*pb.DiscoverHierarchyReply, error) {
|
func (s *fakeGalaxyServer) DiscoverHierarchy(ctx context.Context, req *pb.DiscoverHierarchyRequest) (*pb.DiscoverHierarchyReply, error) {
|
||||||
|
s.discoverHierarchyCalls = append(s.discoverHierarchyCalls, req)
|
||||||
|
if len(s.discoverHierarchyReplies) > 0 {
|
||||||
|
reply := s.discoverHierarchyReplies[0]
|
||||||
|
s.discoverHierarchyReplies = s.discoverHierarchyReplies[1:]
|
||||||
|
return reply, nil
|
||||||
|
}
|
||||||
if s.discoverReply != nil {
|
if s.discoverReply != nil {
|
||||||
return s.discoverReply, nil
|
return s.discoverReply, nil
|
||||||
}
|
}
|
||||||
@@ -739,6 +789,53 @@ func TestGalaxyBrowseWithFilterForwardsToRequest(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestGalaxyBrowseExpandConcurrentCallersOnlyFireOneRpc(t *testing.T) {
|
||||||
|
fake := &fakeGalaxyServer{
|
||||||
|
browseChildrenReplies: []*pb.BrowseChildrenReply{
|
||||||
|
// roots
|
||||||
|
buildBrowseReply([]*pb.GalaxyObject{obj(1, "Plant", true)}, []bool{true}, 7),
|
||||||
|
// one expand: one child
|
||||||
|
buildBrowseReply([]*pb.GalaxyObject{obj(2, "Mixer", false)}, []bool{false}, 7),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
client, cleanup := newGalaxyBufconnClient(t, fake)
|
||||||
|
defer cleanup()
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
roots, err := client.Browse(ctx, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Browse: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
errs := make(chan error, 10)
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
errs <- roots[0].Expand(ctx)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
close(errs)
|
||||||
|
for err := range errs {
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("concurrent Expand: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !roots[0].IsExpanded() {
|
||||||
|
t.Fatal("IsExpanded() = false after 10 concurrent expands")
|
||||||
|
}
|
||||||
|
if got, want := len(roots[0].Children()), 1; got != want {
|
||||||
|
t.Fatalf("len(children) = %d, want %d", got, want)
|
||||||
|
}
|
||||||
|
// 1 roots fetch + exactly 1 expand fetch.
|
||||||
|
if got, want := len(fake.browseChildrenCalls), 2; got != want {
|
||||||
|
t.Fatalf("RPC count = %d, want %d", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestGalaxyBrowseChildrenRejectsRepeatedPageToken(t *testing.T) {
|
func TestGalaxyBrowseChildrenRejectsRepeatedPageToken(t *testing.T) {
|
||||||
// Build a reply that carries a non-empty NextPageToken so browseChildrenInner
|
// Build a reply that carries a non-empty NextPageToken so browseChildrenInner
|
||||||
// will request a second page. Queue the same reply twice so the second response
|
// will request a second page. Queue the same reply twice so the second response
|
||||||
|
|||||||
+81
-6
@@ -4,6 +4,9 @@ import galaxy_repository.v1.GalaxyRepositoryOuterClass.GalaxyObject;
|
|||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* One node in a lazy-loaded Galaxy browse tree. Holds the underlying
|
* One node in a lazy-loaded Galaxy browse tree. Holds the underlying
|
||||||
@@ -16,7 +19,14 @@ public final class LazyBrowseNode {
|
|||||||
private final GalaxyObject object;
|
private final GalaxyObject object;
|
||||||
private final boolean hasChildrenHint;
|
private final boolean hasChildrenHint;
|
||||||
private final BrowseChildrenOptions options;
|
private final BrowseChildrenOptions options;
|
||||||
private final Object lock = new Object();
|
|
||||||
|
// expandLock gates the start of a new expand AND the publish of the in-flight
|
||||||
|
// future. Readers (getChildren / isExpanded) use a separate read-write lock so
|
||||||
|
// they never block on the gRPC call.
|
||||||
|
private final Object expandLock = new Object();
|
||||||
|
private CompletableFuture<Void> inFlight;
|
||||||
|
|
||||||
|
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
|
||||||
private List<LazyBrowseNode> children = Collections.emptyList();
|
private List<LazyBrowseNode> children = Collections.emptyList();
|
||||||
private boolean isExpanded;
|
private boolean isExpanded;
|
||||||
|
|
||||||
@@ -43,15 +53,21 @@ public final class LazyBrowseNode {
|
|||||||
|
|
||||||
/** @return a snapshot of direct children loaded by {@link #expand()}; empty until then. */
|
/** @return a snapshot of direct children loaded by {@link #expand()}; empty until then. */
|
||||||
public List<LazyBrowseNode> getChildren() {
|
public List<LazyBrowseNode> getChildren() {
|
||||||
synchronized (lock) {
|
readWriteLock.readLock().lock();
|
||||||
|
try {
|
||||||
return List.copyOf(children);
|
return List.copyOf(children);
|
||||||
|
} finally {
|
||||||
|
readWriteLock.readLock().unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @return {@code true} after the first {@link #expand()} call completes. */
|
/** @return {@code true} after the first {@link #expand()} call completes. */
|
||||||
public boolean isExpanded() {
|
public boolean isExpanded() {
|
||||||
synchronized (lock) {
|
readWriteLock.readLock().lock();
|
||||||
|
try {
|
||||||
return isExpanded;
|
return isExpanded;
|
||||||
|
} finally {
|
||||||
|
readWriteLock.readLock().unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -59,17 +75,76 @@ public final class LazyBrowseNode {
|
|||||||
* Fetches direct children from the gateway and populates {@link #getChildren()}.
|
* Fetches direct children from the gateway and populates {@link #getChildren()}.
|
||||||
* Idempotent: subsequent calls are no-ops and do not issue a second RPC.
|
* Idempotent: subsequent calls are no-ops and do not issue a second RPC.
|
||||||
*
|
*
|
||||||
|
* <p>Concurrent callers coalesce onto a single in-flight RPC: the first caller
|
||||||
|
* (the "leader") issues the gRPC call, while any other thread that calls
|
||||||
|
* {@code expand()} during that window blocks on the leader's future and sees
|
||||||
|
* the same result (or the same exception). On failure the in-flight slot is
|
||||||
|
* cleared so a subsequent call can retry.
|
||||||
|
*
|
||||||
|
* <p>Readers ({@link #getChildren()} / {@link #isExpanded()}) take a separate
|
||||||
|
* read lock and are never blocked for the duration of the RPC.
|
||||||
|
*
|
||||||
* @throws MxGatewayException on transport or protocol failure
|
* @throws MxGatewayException on transport or protocol failure
|
||||||
*/
|
*/
|
||||||
public void expand() {
|
public void expand() {
|
||||||
synchronized (lock) {
|
if (isExpanded()) {
|
||||||
if (isExpanded) {
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
CompletableFuture<Void> future;
|
||||||
|
boolean iAmTheLeader;
|
||||||
|
synchronized (expandLock) {
|
||||||
|
if (isExpanded()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (inFlight != null) {
|
||||||
|
future = inFlight;
|
||||||
|
iAmTheLeader = false;
|
||||||
|
} else {
|
||||||
|
future = new CompletableFuture<>();
|
||||||
|
inFlight = future;
|
||||||
|
iAmTheLeader = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (iAmTheLeader) {
|
||||||
|
try {
|
||||||
List<LazyBrowseNode> loaded =
|
List<LazyBrowseNode> loaded =
|
||||||
client.browseChildrenInner(Integer.valueOf(object.getGobjectId()), options);
|
client.browseChildrenInner(object.getGobjectId(), options);
|
||||||
|
readWriteLock.writeLock().lock();
|
||||||
|
try {
|
||||||
this.children = loaded;
|
this.children = loaded;
|
||||||
this.isExpanded = true;
|
this.isExpanded = true;
|
||||||
|
} finally {
|
||||||
|
readWriteLock.writeLock().unlock();
|
||||||
|
}
|
||||||
|
synchronized (expandLock) {
|
||||||
|
inFlight = null;
|
||||||
|
}
|
||||||
|
future.complete(null);
|
||||||
|
} catch (RuntimeException ex) {
|
||||||
|
synchronized (expandLock) {
|
||||||
|
inFlight = null;
|
||||||
|
}
|
||||||
|
future.completeExceptionally(ex);
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
future.get();
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw new MxGatewayException("Interrupted waiting for browse-children expand.", ie);
|
||||||
|
} catch (ExecutionException ee) {
|
||||||
|
Throwable cause = ee.getCause();
|
||||||
|
if (cause instanceof MxGatewayException me) {
|
||||||
|
throw me;
|
||||||
|
}
|
||||||
|
if (cause instanceof RuntimeException re) {
|
||||||
|
throw re;
|
||||||
|
}
|
||||||
|
throw new MxGatewayException("BrowseChildren expand failed.", cause);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
+91
-1
@@ -40,9 +40,14 @@ import java.util.List;
|
|||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
@@ -466,6 +471,91 @@ final class GalaxyRepositoryClientTests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void browseExpandConcurrentCallersOnlyFireOneRpc() throws Exception {
|
||||||
|
// Verifies that concurrent expand() calls coalesce onto a single in-flight
|
||||||
|
// BrowseChildren RPC and that readers (isExpanded/getChildren) are not
|
||||||
|
// blocked for the full RPC duration.
|
||||||
|
BrowseChildrenReply rootsReply = browseReply(
|
||||||
|
List.of(obj(1, "Plant", true)),
|
||||||
|
List.of(true),
|
||||||
|
7L,
|
||||||
|
"");
|
||||||
|
BrowseChildrenReply childrenReply = browseReply(
|
||||||
|
List.of(obj(2, "Mixer_001", false)),
|
||||||
|
List.of(false),
|
||||||
|
7L,
|
||||||
|
"");
|
||||||
|
|
||||||
|
// Gate the child fetch behind a latch so multiple expanders can pile up.
|
||||||
|
CountDownLatch release = new CountDownLatch(1);
|
||||||
|
AtomicInteger childCalls = new AtomicInteger();
|
||||||
|
BrowseChildrenService service = new BrowseChildrenService() {
|
||||||
|
@Override
|
||||||
|
public void browseChildren(
|
||||||
|
BrowseChildrenRequest request, StreamObserver<BrowseChildrenReply> responseObserver) {
|
||||||
|
calls.add(request);
|
||||||
|
BrowseChildrenReply reply;
|
||||||
|
if (!request.hasParentGobjectId()) {
|
||||||
|
reply = rootsReply;
|
||||||
|
} else {
|
||||||
|
// Block the leader until the followers have arrived.
|
||||||
|
try {
|
||||||
|
assertTrue(release.await(5, TimeUnit.SECONDS), "release latch never tripped");
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
responseObserver.onError(Status.CANCELLED.asRuntimeException());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
childCalls.incrementAndGet();
|
||||||
|
reply = childrenReply;
|
||||||
|
}
|
||||||
|
responseObserver.onNext(reply);
|
||||||
|
responseObserver.onCompleted();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
try (InProcessGalaxy g = InProcessGalaxy.start(service, new AtomicReference<>());
|
||||||
|
GalaxyRepositoryClient client = g.client("")) {
|
||||||
|
List<LazyBrowseNode> roots = client.browse();
|
||||||
|
LazyBrowseNode root = roots.get(0);
|
||||||
|
|
||||||
|
int parallelism = 10;
|
||||||
|
ExecutorService pool = Executors.newFixedThreadPool(parallelism);
|
||||||
|
try {
|
||||||
|
CountDownLatch ready = new CountDownLatch(parallelism);
|
||||||
|
List<Future<Void>> futures = new ArrayList<>();
|
||||||
|
for (int i = 0; i < parallelism; i++) {
|
||||||
|
futures.add(pool.submit(() -> {
|
||||||
|
ready.countDown();
|
||||||
|
root.expand();
|
||||||
|
return null;
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
// Wait for all callers to be in flight, then release the leader.
|
||||||
|
assertTrue(ready.await(5, TimeUnit.SECONDS), "expander threads did not start");
|
||||||
|
// Readers must not be blocked by an in-flight expand; this should not deadlock
|
||||||
|
// and should return the pre-expand state.
|
||||||
|
assertFalse(root.isExpanded());
|
||||||
|
assertEquals(0, root.getChildren().size());
|
||||||
|
release.countDown();
|
||||||
|
|
||||||
|
for (Future<Void> f : futures) {
|
||||||
|
f.get(10, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
pool.shutdownNow();
|
||||||
|
}
|
||||||
|
|
||||||
|
assertTrue(root.isExpanded());
|
||||||
|
assertEquals(1, root.getChildren().size());
|
||||||
|
// Exactly one expand RPC was issued even though many callers raced.
|
||||||
|
assertEquals(1, childCalls.get());
|
||||||
|
// 1 roots fetch + exactly 1 expand fetch.
|
||||||
|
assertEquals(2, service.calls.size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void browseWithFilterForwardsToRequest() throws Exception {
|
void browseWithFilterForwardsToRequest() throws Exception {
|
||||||
BrowseChildrenService service = new BrowseChildrenService();
|
BrowseChildrenService service = new BrowseChildrenService();
|
||||||
@@ -507,7 +597,7 @@ final class GalaxyRepositoryClientTests {
|
|||||||
return b.build();
|
return b.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final class BrowseChildrenService extends TestService {
|
private static class BrowseChildrenService extends TestService {
|
||||||
final List<BrowseChildrenRequest> calls =
|
final List<BrowseChildrenRequest> calls =
|
||||||
Collections.synchronizedList(new CopyOnWriteArrayList<>());
|
Collections.synchronizedList(new CopyOnWriteArrayList<>());
|
||||||
final Queue<BrowseChildrenReply> replies = new ArrayDeque<>();
|
final Queue<BrowseChildrenReply> replies = new ArrayDeque<>();
|
||||||
|
|||||||
@@ -140,6 +140,22 @@ class GalaxyRepositoryClient:
|
|||||||
)
|
)
|
||||||
seen_page_tokens.add(page_token)
|
seen_page_tokens.add(page_token)
|
||||||
|
|
||||||
|
async def browse_children_raw(
|
||||||
|
self, request: galaxy_pb.BrowseChildrenRequest
|
||||||
|
) -> galaxy_pb.BrowseChildrenReply:
|
||||||
|
"""Issue one BrowseChildren RPC and return the raw reply.
|
||||||
|
|
||||||
|
Lower-level escape hatch for callers that need direct page-token control
|
||||||
|
or do not want LazyBrowseNode wrapping. Most callers should use
|
||||||
|
:py:meth:`browse` and :py:meth:`LazyBrowseNode.expand` instead.
|
||||||
|
"""
|
||||||
|
|
||||||
|
return await self._unary(
|
||||||
|
"browse children",
|
||||||
|
self.raw_stub.BrowseChildren,
|
||||||
|
request,
|
||||||
|
)
|
||||||
|
|
||||||
async def browse(
|
async def browse(
|
||||||
self,
|
self,
|
||||||
options: BrowseChildrenOptions | None = None,
|
options: BrowseChildrenOptions | None = None,
|
||||||
|
|||||||
@@ -507,6 +507,35 @@ async def test_browse_with_filter_forwards_to_request() -> None:
|
|||||||
assert request.historized_only is True
|
assert request.historized_only is True
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_browse_children_raw_returns_reply_unwrapped() -> None:
|
||||||
|
"""browse_children_raw forwards the request to the stub and returns the raw reply."""
|
||||||
|
stub = FakeGalaxyStub()
|
||||||
|
expected = _build_browse_reply(
|
||||||
|
children=[_obj(1, "Plant", is_area=True)],
|
||||||
|
child_has_children=[True],
|
||||||
|
cache_sequence=42,
|
||||||
|
)
|
||||||
|
stub.browse_children.replies = [expected]
|
||||||
|
|
||||||
|
async with await GalaxyRepositoryClient.connect(
|
||||||
|
endpoint="fake",
|
||||||
|
plaintext=True,
|
||||||
|
stub=stub,
|
||||||
|
) as client:
|
||||||
|
request = galaxy_pb.BrowseChildrenRequest(
|
||||||
|
page_size=10,
|
||||||
|
tag_name_glob="Plant*",
|
||||||
|
)
|
||||||
|
reply = await client.browse_children_raw(request)
|
||||||
|
|
||||||
|
assert reply.cache_sequence == 42
|
||||||
|
assert len(reply.children) == 1
|
||||||
|
assert reply.children[0].tag_name == "Plant"
|
||||||
|
assert len(stub.browse_children.requests) == 1
|
||||||
|
assert stub.browse_children.requests[0].tag_name_glob == "Plant*"
|
||||||
|
|
||||||
|
|
||||||
class FakeGalaxyStub:
|
class FakeGalaxyStub:
|
||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
self.test_connection = FakeUnary([galaxy_pb.TestConnectionReply(ok=False)])
|
self.test_connection = FakeUnary([galaxy_pb.TestConnectionReply(ok=False)])
|
||||||
|
|||||||
@@ -62,7 +62,16 @@ public static class GalaxyBrowseProjector
|
|||||||
return new GalaxyBrowseChildrenResult(page, hasChildren, filtered.Children.Count, filterSignature);
|
return new GalaxyBrowseChildrenResult(page, hasChildren, filtered.Children.Count, filterSignature);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static int ResolveParentId(GalaxyHierarchyCacheEntry entry, BrowseChildrenRequest request)
|
/// <summary>
|
||||||
|
/// Resolves the request's parent oneof to a gobject id, throwing
|
||||||
|
/// <see cref="RpcException"/> with <see cref="StatusCode.NotFound"/> when the
|
||||||
|
/// parent does not exist. Public so the gRPC handler can compute the same
|
||||||
|
/// parent id (needed for the page-token signature) without reimplementing the
|
||||||
|
/// resolution rules.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="entry">The Galaxy hierarchy cache entry to query.</param>
|
||||||
|
/// <param name="request">The browse-children request.</param>
|
||||||
|
public static int ResolveParentId(GalaxyHierarchyCacheEntry entry, BrowseChildrenRequest request)
|
||||||
{
|
{
|
||||||
switch (request.ParentCase)
|
switch (request.ParentCase)
|
||||||
{
|
{
|
||||||
@@ -80,9 +89,7 @@ public static class GalaxyBrowseProjector
|
|||||||
return request.ParentGobjectId;
|
return request.ParentGobjectId;
|
||||||
case BrowseChildrenRequest.ParentOneofCase.ParentTagName:
|
case BrowseChildrenRequest.ParentOneofCase.ParentTagName:
|
||||||
{
|
{
|
||||||
GalaxyObjectView? match = entry.Index.ObjectViews.FirstOrDefault(
|
if (!entry.Index.ObjectViewsByTagName.TryGetValue(request.ParentTagName, out GalaxyObjectView? match))
|
||||||
view => string.Equals(view.Object.TagName, request.ParentTagName, StringComparison.OrdinalIgnoreCase));
|
|
||||||
if (match is null)
|
|
||||||
{
|
{
|
||||||
throw new RpcException(new Status(StatusCode.NotFound, "BrowseChildren parent was not found."));
|
throw new RpcException(new Status(StatusCode.NotFound, "BrowseChildren parent was not found."));
|
||||||
}
|
}
|
||||||
@@ -90,9 +97,7 @@ public static class GalaxyBrowseProjector
|
|||||||
}
|
}
|
||||||
case BrowseChildrenRequest.ParentOneofCase.ParentContainedPath:
|
case BrowseChildrenRequest.ParentOneofCase.ParentContainedPath:
|
||||||
{
|
{
|
||||||
GalaxyObjectView? match = entry.Index.ObjectViews.FirstOrDefault(
|
if (!entry.Index.ObjectViewsByContainedPath.TryGetValue(request.ParentContainedPath, out GalaxyObjectView? match))
|
||||||
view => string.Equals(view.ContainedPath, request.ParentContainedPath, StringComparison.OrdinalIgnoreCase));
|
|
||||||
if (match is null)
|
|
||||||
{
|
{
|
||||||
throw new RpcException(new Status(StatusCode.NotFound, "BrowseChildren parent was not found."));
|
throw new RpcException(new Status(StatusCode.NotFound, "BrowseChildren parent was not found."));
|
||||||
}
|
}
|
||||||
@@ -163,11 +168,18 @@ public static class GalaxyBrowseProjector
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Defend against pathological cycles in Galaxy data (e.g. a corrupt A→B→A chain).
|
||||||
|
// BuildContainedPath uses the same visited-id pattern; mirror it so this walk
|
||||||
|
// terminates even when ChildrenByParent forms a cycle.
|
||||||
|
HashSet<int> visited = new() { parent.Object.GobjectId };
|
||||||
Stack<GalaxyObjectView> stack = new();
|
Stack<GalaxyObjectView> stack = new();
|
||||||
foreach (GalaxyObjectView child in children)
|
foreach (GalaxyObjectView child in children)
|
||||||
|
{
|
||||||
|
if (visited.Add(child.Object.GobjectId))
|
||||||
{
|
{
|
||||||
stack.Push(child);
|
stack.Push(child);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
while (stack.Count > 0)
|
while (stack.Count > 0)
|
||||||
{
|
{
|
||||||
GalaxyObjectView candidate = stack.Pop();
|
GalaxyObjectView candidate = stack.Pop();
|
||||||
@@ -179,11 +191,14 @@ public static class GalaxyBrowseProjector
|
|||||||
if (index.ChildrenByParent.TryGetValue(candidate.Object.GobjectId, out IReadOnlyList<GalaxyObjectView>? grandchildren))
|
if (index.ChildrenByParent.TryGetValue(candidate.Object.GobjectId, out IReadOnlyList<GalaxyObjectView>? grandchildren))
|
||||||
{
|
{
|
||||||
foreach (GalaxyObjectView grandchild in grandchildren)
|
foreach (GalaxyObjectView grandchild in grandchildren)
|
||||||
|
{
|
||||||
|
if (visited.Add(grandchild.Object.GobjectId))
|
||||||
{
|
{
|
||||||
stack.Push(grandchild);
|
stack.Push(grandchild);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -8,12 +8,16 @@ public sealed class GalaxyHierarchyIndex
|
|||||||
IReadOnlyList<GalaxyObjectView> objectViews,
|
IReadOnlyList<GalaxyObjectView> objectViews,
|
||||||
IReadOnlyDictionary<int, GalaxyObjectView> objectViewsById,
|
IReadOnlyDictionary<int, GalaxyObjectView> objectViewsById,
|
||||||
IReadOnlyDictionary<string, GalaxyTagLookup> tagsByAddress,
|
IReadOnlyDictionary<string, GalaxyTagLookup> tagsByAddress,
|
||||||
IReadOnlyDictionary<int, IReadOnlyList<GalaxyObjectView>> childrenByParent)
|
IReadOnlyDictionary<int, IReadOnlyList<GalaxyObjectView>> childrenByParent,
|
||||||
|
IReadOnlyDictionary<string, GalaxyObjectView> objectViewsByTagName,
|
||||||
|
IReadOnlyDictionary<string, GalaxyObjectView> objectViewsByContainedPath)
|
||||||
{
|
{
|
||||||
ObjectViews = objectViews;
|
ObjectViews = objectViews;
|
||||||
ObjectViewsById = objectViewsById;
|
ObjectViewsById = objectViewsById;
|
||||||
TagsByAddress = tagsByAddress;
|
TagsByAddress = tagsByAddress;
|
||||||
ChildrenByParent = childrenByParent;
|
ChildrenByParent = childrenByParent;
|
||||||
|
ObjectViewsByTagName = objectViewsByTagName;
|
||||||
|
ObjectViewsByContainedPath = objectViewsByContainedPath;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>Gets an empty Galaxy hierarchy index.</summary>
|
/// <summary>Gets an empty Galaxy hierarchy index.</summary>
|
||||||
@@ -21,7 +25,9 @@ public sealed class GalaxyHierarchyIndex
|
|||||||
Array.Empty<GalaxyObjectView>(),
|
Array.Empty<GalaxyObjectView>(),
|
||||||
new Dictionary<int, GalaxyObjectView>(),
|
new Dictionary<int, GalaxyObjectView>(),
|
||||||
new Dictionary<string, GalaxyTagLookup>(StringComparer.OrdinalIgnoreCase),
|
new Dictionary<string, GalaxyTagLookup>(StringComparer.OrdinalIgnoreCase),
|
||||||
new Dictionary<int, IReadOnlyList<GalaxyObjectView>>());
|
new Dictionary<int, IReadOnlyList<GalaxyObjectView>>(),
|
||||||
|
new Dictionary<string, GalaxyObjectView>(StringComparer.OrdinalIgnoreCase),
|
||||||
|
new Dictionary<string, GalaxyObjectView>(StringComparer.OrdinalIgnoreCase));
|
||||||
|
|
||||||
/// <summary>Gets the object views.</summary>
|
/// <summary>Gets the object views.</summary>
|
||||||
public IReadOnlyList<GalaxyObjectView> ObjectViews { get; }
|
public IReadOnlyList<GalaxyObjectView> ObjectViews { get; }
|
||||||
@@ -35,6 +41,12 @@ public sealed class GalaxyHierarchyIndex
|
|||||||
/// <summary>Gets direct children grouped by parent gobject id. Root objects (no parent, or self-parented) live under key 0. Each list is sorted areas-first, then by display name (OrdinalIgnoreCase).</summary>
|
/// <summary>Gets direct children grouped by parent gobject id. Root objects (no parent, or self-parented) live under key 0. Each list is sorted areas-first, then by display name (OrdinalIgnoreCase).</summary>
|
||||||
public IReadOnlyDictionary<int, IReadOnlyList<GalaxyObjectView>> ChildrenByParent { get; }
|
public IReadOnlyDictionary<int, IReadOnlyList<GalaxyObjectView>> ChildrenByParent { get; }
|
||||||
|
|
||||||
|
/// <summary>Gets object views indexed by <see cref="GalaxyObject.TagName"/> (OrdinalIgnoreCase). Lets browse/discover handlers resolve parents/roots by tag name in O(1) instead of scanning <see cref="ObjectViews"/>.</summary>
|
||||||
|
public IReadOnlyDictionary<string, GalaxyObjectView> ObjectViewsByTagName { get; }
|
||||||
|
|
||||||
|
/// <summary>Gets object views indexed by contained path (OrdinalIgnoreCase). Lets browse/discover handlers resolve parents/roots by path in O(1) instead of scanning <see cref="ObjectViews"/>.</summary>
|
||||||
|
public IReadOnlyDictionary<string, GalaxyObjectView> ObjectViewsByContainedPath { get; }
|
||||||
|
|
||||||
/// <summary>Builds a Galaxy hierarchy index from the given objects.</summary>
|
/// <summary>Builds a Galaxy hierarchy index from the given objects.</summary>
|
||||||
/// <param name="objects">The Galaxy objects to index.</param>
|
/// <param name="objects">The Galaxy objects to index.</param>
|
||||||
/// <returns>A new Galaxy hierarchy index.</returns>
|
/// <returns>A new Galaxy hierarchy index.</returns>
|
||||||
@@ -54,6 +66,8 @@ public sealed class GalaxyHierarchyIndex
|
|||||||
List<GalaxyObjectView> views = new(objects.Count);
|
List<GalaxyObjectView> views = new(objects.Count);
|
||||||
Dictionary<int, GalaxyObjectView> viewsById = new();
|
Dictionary<int, GalaxyObjectView> viewsById = new();
|
||||||
Dictionary<string, GalaxyTagLookup> tagsByAddress = new(StringComparer.OrdinalIgnoreCase);
|
Dictionary<string, GalaxyTagLookup> tagsByAddress = new(StringComparer.OrdinalIgnoreCase);
|
||||||
|
Dictionary<string, GalaxyObjectView> viewsByTagName = new(StringComparer.OrdinalIgnoreCase);
|
||||||
|
Dictionary<string, GalaxyObjectView> viewsByContainedPath = new(StringComparer.OrdinalIgnoreCase);
|
||||||
|
|
||||||
foreach (GalaxyObject obj in objects)
|
foreach (GalaxyObject obj in objects)
|
||||||
{
|
{
|
||||||
@@ -66,6 +80,12 @@ public sealed class GalaxyHierarchyIndex
|
|||||||
if (!string.IsNullOrWhiteSpace(obj.TagName))
|
if (!string.IsNullOrWhiteSpace(obj.TagName))
|
||||||
{
|
{
|
||||||
tagsByAddress.TryAdd(obj.TagName, new GalaxyTagLookup(obj, Attribute: null, path));
|
tagsByAddress.TryAdd(obj.TagName, new GalaxyTagLookup(obj, Attribute: null, path));
|
||||||
|
viewsByTagName.TryAdd(obj.TagName, view);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!string.IsNullOrWhiteSpace(path))
|
||||||
|
{
|
||||||
|
viewsByContainedPath.TryAdd(path, view);
|
||||||
}
|
}
|
||||||
|
|
||||||
foreach (GalaxyAttribute attribute in obj.Attributes)
|
foreach (GalaxyAttribute attribute in obj.Attributes)
|
||||||
@@ -109,7 +129,9 @@ public sealed class GalaxyHierarchyIndex
|
|||||||
views,
|
views,
|
||||||
viewsById,
|
viewsById,
|
||||||
tagsByAddress,
|
tagsByAddress,
|
||||||
readOnlyChildren);
|
readOnlyChildren,
|
||||||
|
viewsByTagName,
|
||||||
|
viewsByContainedPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static string BuildContainedPath(
|
private static string BuildContainedPath(
|
||||||
|
|||||||
@@ -103,7 +103,7 @@ public static class GalaxyHierarchyProjector
|
|||||||
// ResolveRoot can throw RpcException(NotFound); run it before consulting the
|
// ResolveRoot can throw RpcException(NotFound); run it before consulting the
|
||||||
// memo so a bad root surfaces consistently regardless of cache state.
|
// memo so a bad root surfaces consistently regardless of cache state.
|
||||||
IReadOnlyList<GalaxyObjectView> views = entry.Index.ObjectViews;
|
IReadOnlyList<GalaxyObjectView> views = entry.Index.ObjectViews;
|
||||||
GalaxyObjectView? root = ResolveRoot(request, views);
|
GalaxyObjectView? root = ResolveRoot(request, entry.Index);
|
||||||
|
|
||||||
ConcurrentDictionary<string, IReadOnlyList<GalaxyObjectView>> memo =
|
ConcurrentDictionary<string, IReadOnlyList<GalaxyObjectView>> memo =
|
||||||
FilteredViewCache.GetValue(entry, static _ => new ConcurrentDictionary<string, IReadOnlyList<GalaxyObjectView>>(StringComparer.Ordinal));
|
FilteredViewCache.GetValue(entry, static _ => new ConcurrentDictionary<string, IReadOnlyList<GalaxyObjectView>>(StringComparer.Ordinal));
|
||||||
@@ -176,17 +176,17 @@ public static class GalaxyHierarchyProjector
|
|||||||
|
|
||||||
private static GalaxyObjectView? ResolveRoot(
|
private static GalaxyObjectView? ResolveRoot(
|
||||||
DiscoverHierarchyRequest request,
|
DiscoverHierarchyRequest request,
|
||||||
IReadOnlyList<GalaxyObjectView> views)
|
GalaxyHierarchyIndex index)
|
||||||
{
|
{
|
||||||
GalaxyObjectView? root = request.RootCase switch
|
GalaxyObjectView? root = request.RootCase switch
|
||||||
{
|
{
|
||||||
DiscoverHierarchyRequest.RootOneofCase.None => null,
|
DiscoverHierarchyRequest.RootOneofCase.None => null,
|
||||||
DiscoverHierarchyRequest.RootOneofCase.RootGobjectId => views.FirstOrDefault(
|
DiscoverHierarchyRequest.RootOneofCase.RootGobjectId =>
|
||||||
view => view.Object.GobjectId == request.RootGobjectId),
|
index.ObjectViewsById.TryGetValue(request.RootGobjectId, out GalaxyObjectView? byId) ? byId : null,
|
||||||
DiscoverHierarchyRequest.RootOneofCase.RootTagName => views.FirstOrDefault(
|
DiscoverHierarchyRequest.RootOneofCase.RootTagName =>
|
||||||
view => string.Equals(view.Object.TagName, request.RootTagName, StringComparison.OrdinalIgnoreCase)),
|
index.ObjectViewsByTagName.TryGetValue(request.RootTagName, out GalaxyObjectView? byTag) ? byTag : null,
|
||||||
DiscoverHierarchyRequest.RootOneofCase.RootContainedPath => views.FirstOrDefault(
|
DiscoverHierarchyRequest.RootOneofCase.RootContainedPath =>
|
||||||
view => string.Equals(view.ContainedPath, request.RootContainedPath, StringComparison.OrdinalIgnoreCase)),
|
index.ObjectViewsByContainedPath.TryGetValue(request.RootContainedPath, out GalaxyObjectView? byPath) ? byPath : null,
|
||||||
_ => null,
|
_ => null,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -128,8 +128,11 @@ public sealed class GalaxyRepositoryGrpcService(
|
|||||||
IReadOnlyList<string> browseSubtrees = ResolveBrowseSubtrees();
|
IReadOnlyList<string> browseSubtrees = ResolveBrowseSubtrees();
|
||||||
|
|
||||||
// Resolve the parent id once so the page-token signature can include it
|
// Resolve the parent id once so the page-token signature can include it
|
||||||
// and the projector sees the same resolved id when memoizing.
|
// and the projector sees the same resolved id when memoizing. The projector
|
||||||
int parentId = ResolveParentIdForToken(entry, request);
|
// re-resolves internally; with the by-name/by-path indexes on
|
||||||
|
// GalaxyHierarchyIndex that second call is O(1), so the redundancy is cheap
|
||||||
|
// and keeps the projector self-contained.
|
||||||
|
int parentId = GalaxyDb.GalaxyBrowseProjector.ResolveParentId(entry, request);
|
||||||
string filterSignature = GalaxyDb.GalaxyBrowseProjector.ComputeFilterSignature(
|
string filterSignature = GalaxyDb.GalaxyBrowseProjector.ComputeFilterSignature(
|
||||||
request, browseSubtrees, parentId);
|
request, browseSubtrees, parentId);
|
||||||
PageToken pageToken = ParsePageToken(request.PageToken, entry.Sequence, filterSignature);
|
PageToken pageToken = ParsePageToken(request.PageToken, entry.Sequence, filterSignature);
|
||||||
@@ -283,32 +286,6 @@ public sealed class GalaxyRepositoryGrpcService(
|
|||||||
return Math.Min(pageSize, MaxDiscoverPageSize);
|
return Math.Min(pageSize, MaxDiscoverPageSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Lightweight parent resolver used only for signature computation. Re-throws
|
|
||||||
// NotFound consistently with the projector so the error surface matches.
|
|
||||||
private static int ResolveParentIdForToken(
|
|
||||||
GalaxyDb.GalaxyHierarchyCacheEntry entry,
|
|
||||||
BrowseChildrenRequest request)
|
|
||||||
{
|
|
||||||
return request.ParentCase switch
|
|
||||||
{
|
|
||||||
BrowseChildrenRequest.ParentOneofCase.None => 0,
|
|
||||||
BrowseChildrenRequest.ParentOneofCase.ParentGobjectId =>
|
|
||||||
request.ParentGobjectId == 0 ? 0
|
|
||||||
: entry.Index.ObjectViewsById.ContainsKey(request.ParentGobjectId)
|
|
||||||
? request.ParentGobjectId
|
|
||||||
: throw new RpcException(new Status(StatusCode.NotFound, "BrowseChildren parent was not found.")),
|
|
||||||
BrowseChildrenRequest.ParentOneofCase.ParentTagName =>
|
|
||||||
entry.Index.ObjectViews.FirstOrDefault(
|
|
||||||
v => string.Equals(v.Object.TagName, request.ParentTagName, StringComparison.OrdinalIgnoreCase))?.Object.GobjectId
|
|
||||||
?? throw new RpcException(new Status(StatusCode.NotFound, "BrowseChildren parent was not found.")),
|
|
||||||
BrowseChildrenRequest.ParentOneofCase.ParentContainedPath =>
|
|
||||||
entry.Index.ObjectViews.FirstOrDefault(
|
|
||||||
v => string.Equals(v.ContainedPath, request.ParentContainedPath, StringComparison.OrdinalIgnoreCase))?.Object.GobjectId
|
|
||||||
?? throw new RpcException(new Status(StatusCode.NotFound, "BrowseChildren parent was not found.")),
|
|
||||||
_ => 0,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
private IReadOnlyList<string> ResolveBrowseSubtrees()
|
private IReadOnlyList<string> ResolveBrowseSubtrees()
|
||||||
{
|
{
|
||||||
ApiKeyConstraints constraints = identityAccessor.Current?.EffectiveConstraints ?? ApiKeyConstraints.Empty;
|
ApiKeyConstraints constraints = identityAccessor.Current?.EffectiveConstraints ?? ApiKeyConstraints.Empty;
|
||||||
|
|||||||
@@ -223,6 +223,77 @@ public sealed class GalaxyBrowseProjectorTests
|
|||||||
Assert.Equal("Plant.Line_A", result.Children[0].TagName);
|
Assert.Equal("Plant.Line_A", result.Children[0].TagName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Verifies <see cref="GalaxyBrowseProjector"/> terminates when the Galaxy data
|
||||||
|
/// contains a cyclic parent chain (A→B→C→A). Without the visited-id guard in
|
||||||
|
/// <c>HasMatchingDescendant</c>, the depth-first walk would loop forever; the
|
||||||
|
/// 5-second xUnit timeout asserts termination.
|
||||||
|
/// </summary>
|
||||||
|
[Fact(Timeout = 5000)]
|
||||||
|
public async Task Project_CyclicDescendants_DoesNotInfiniteLoop()
|
||||||
|
{
|
||||||
|
await Task.Yield();
|
||||||
|
// Construct a 3-node cycle: A(10)→B(11)→C(12)→A. Each node's ParentGobjectId
|
||||||
|
// points to the next, so GalaxyHierarchyIndex.ChildrenByParent has
|
||||||
|
// [12] = [A], [10] = [B], [11] = [C].
|
||||||
|
// None of them are historized, so HistorizedOnly=true forces the projector to
|
||||||
|
// call HasMatchingDescendant on every direct child, exercising the cycle walk.
|
||||||
|
GalaxyObject a = new()
|
||||||
|
{
|
||||||
|
GobjectId = 10,
|
||||||
|
ParentGobjectId = 12,
|
||||||
|
ContainedName = "A",
|
||||||
|
BrowseName = "A",
|
||||||
|
TagName = "A",
|
||||||
|
};
|
||||||
|
GalaxyObject b = new()
|
||||||
|
{
|
||||||
|
GobjectId = 11,
|
||||||
|
ParentGobjectId = 10,
|
||||||
|
ContainedName = "B",
|
||||||
|
BrowseName = "B",
|
||||||
|
TagName = "B",
|
||||||
|
};
|
||||||
|
GalaxyObject c = new()
|
||||||
|
{
|
||||||
|
GobjectId = 12,
|
||||||
|
ParentGobjectId = 11,
|
||||||
|
ContainedName = "C",
|
||||||
|
BrowseName = "C",
|
||||||
|
TagName = "C",
|
||||||
|
};
|
||||||
|
|
||||||
|
IReadOnlyList<GalaxyObject> objects = new[] { a, b, c };
|
||||||
|
GalaxyHierarchyCacheEntry entry = GalaxyHierarchyCacheEntry.Empty with
|
||||||
|
{
|
||||||
|
Status = GalaxyCacheStatus.Healthy,
|
||||||
|
Sequence = 1,
|
||||||
|
LastSuccessAt = DateTimeOffset.UtcNow,
|
||||||
|
Objects = objects,
|
||||||
|
Index = GalaxyHierarchyIndex.Build(objects),
|
||||||
|
DashboardSummary = DashboardGalaxySummary.Unknown with
|
||||||
|
{
|
||||||
|
Status = DashboardGalaxyStatus.Healthy,
|
||||||
|
ObjectCount = objects.Count,
|
||||||
|
},
|
||||||
|
ObjectCount = objects.Count,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Browse children of A (id=10). Its direct child B fails HistorizedOnly, so the
|
||||||
|
// projector falls back to HasMatchingDescendant(B), which walks B→C→A→B…
|
||||||
|
// without the visited-id guard. With the guard, the walk terminates and returns
|
||||||
|
// an empty page (no historized descendants exist anywhere in the cycle).
|
||||||
|
GalaxyBrowseChildrenResult result = GalaxyBrowseProjector.ProjectChildren(
|
||||||
|
entry,
|
||||||
|
new BrowseChildrenRequest { ParentGobjectId = 10, HistorizedOnly = true },
|
||||||
|
browseSubtreeGlobs: null,
|
||||||
|
offset: 0,
|
||||||
|
pageSize: 10);
|
||||||
|
|
||||||
|
Assert.Empty(result.Children);
|
||||||
|
Assert.Equal(0, result.TotalChildCount);
|
||||||
|
}
|
||||||
|
|
||||||
private static GalaxyHierarchyCacheEntry CreateEntry()
|
private static GalaxyHierarchyCacheEntry CreateEntry()
|
||||||
{
|
{
|
||||||
IReadOnlyList<GalaxyObject> objects = CreateObjects();
|
IReadOnlyList<GalaxyObject> objects = CreateObjects();
|
||||||
|
|||||||
@@ -75,6 +75,47 @@ public sealed class GalaxyHierarchyIndexTests
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>Verifies <see cref="GalaxyHierarchyIndex.ObjectViewsByTagName"/> is OrdinalIgnoreCase and supports O(1) lookups.</summary>
|
||||||
|
[Fact]
|
||||||
|
public void ObjectViewsByTagName_IsCaseInsensitive_AndLookupsAreO1()
|
||||||
|
{
|
||||||
|
GalaxyObject root = new() { GobjectId = 1, ParentGobjectId = 0, IsArea = true, ContainedName = "Plant", BrowseName = "Plant", TagName = "Plant" };
|
||||||
|
GalaxyObject mixer = new() { GobjectId = 2, ParentGobjectId = 1, ContainedName = "Mixer_001", BrowseName = "Mixer_001", TagName = "Plant.Mixer_001" };
|
||||||
|
|
||||||
|
GalaxyHierarchyIndex index = GalaxyHierarchyIndex.Build([root, mixer]);
|
||||||
|
|
||||||
|
Assert.True(index.ObjectViewsByTagName.TryGetValue("Plant.Mixer_001", out GalaxyObjectView? exact));
|
||||||
|
Assert.NotNull(exact);
|
||||||
|
Assert.Equal(2, exact!.Object.GobjectId);
|
||||||
|
|
||||||
|
// Case-insensitive lookup must hit the same entry.
|
||||||
|
Assert.True(index.ObjectViewsByTagName.TryGetValue("plant.mixer_001", out GalaxyObjectView? lower));
|
||||||
|
Assert.NotNull(lower);
|
||||||
|
Assert.Same(exact, lower);
|
||||||
|
|
||||||
|
Assert.False(index.ObjectViewsByTagName.ContainsKey("Plant.Missing"));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Verifies <see cref="GalaxyHierarchyIndex.ObjectViewsByContainedPath"/> is OrdinalIgnoreCase.</summary>
|
||||||
|
[Fact]
|
||||||
|
public void ObjectViewsByContainedPath_IsCaseInsensitive()
|
||||||
|
{
|
||||||
|
GalaxyObject root = new() { GobjectId = 1, ParentGobjectId = 0, IsArea = true, ContainedName = "Plant", BrowseName = "Plant", TagName = "Plant" };
|
||||||
|
GalaxyObject lineA = new() { GobjectId = 2, ParentGobjectId = 1, IsArea = true, ContainedName = "Line_A", BrowseName = "Line_A", TagName = "Plant.Line_A" };
|
||||||
|
|
||||||
|
GalaxyHierarchyIndex index = GalaxyHierarchyIndex.Build([root, lineA]);
|
||||||
|
|
||||||
|
Assert.True(index.ObjectViewsByContainedPath.TryGetValue("Plant/Line_A", out GalaxyObjectView? exact));
|
||||||
|
Assert.NotNull(exact);
|
||||||
|
Assert.Equal(2, exact!.Object.GobjectId);
|
||||||
|
|
||||||
|
Assert.True(index.ObjectViewsByContainedPath.TryGetValue("plant/line_a", out GalaxyObjectView? lower));
|
||||||
|
Assert.NotNull(lower);
|
||||||
|
Assert.Same(exact, lower);
|
||||||
|
|
||||||
|
Assert.False(index.ObjectViewsByContainedPath.ContainsKey("Plant/Missing"));
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>Verifies children sort areas-first, then by display name (case-insensitive).</summary>
|
/// <summary>Verifies children sort areas-first, then by display name (case-insensitive).</summary>
|
||||||
[Fact]
|
[Fact]
|
||||||
public void ChildrenByParent_SortsAreasFirstThenByDisplayName()
|
public void ChildrenByParent_SortsAreasFirstThenByDisplayName()
|
||||||
|
|||||||
Reference in New Issue
Block a user