diff --git a/clients/go/mxgateway/galaxy.go b/clients/go/mxgateway/galaxy.go index 0ff658d..3df6d97 100644 --- a/clients/go/mxgateway/galaxy.go +++ b/clients/go/mxgateway/galaxy.go @@ -273,15 +273,25 @@ func (c *GalaxyClient) Close() error { // LazyBrowseNode is one node in a lazy Galaxy hierarchy walk produced by // (*GalaxyClient).Browse. Children are not fetched until Expand is called. -// The node is safe for concurrent use; concurrent Expand calls collapse to a -// single RPC. +// The node is safe for concurrent use; concurrent Expand calls coalesce onto +// a single in-flight RPC and do not block snapshot accessors. type LazyBrowseNode struct { client *GalaxyClient object *pb.GalaxyObject hasChildrenHint bool options BrowseChildrenOptions - mu sync.Mutex + // expandLock gates inspection and mutation of expand-coordination state + // (expanding, expandDone, expandErr). It is held only briefly; the BrowseChildren + // RPC itself runs outside this lock so concurrent readers and waiters are not blocked. + expandLock sync.Mutex + expanding bool + expandDone chan struct{} + expandErr error + + // mu protects the children snapshot and isExpanded flag for concurrent + // Children() / IsExpanded() readers. + mu sync.RWMutex children []*LazyBrowseNode isExpanded bool } @@ -296,8 +306,8 @@ func (n *LazyBrowseNode) HasChildrenHint() bool { return n.hasChildrenHint } // Children returns a snapshot copy of the currently-loaded child nodes. Returns // an empty slice when Expand has not yet been called. func (n *LazyBrowseNode) Children() []*LazyBrowseNode { - n.mu.Lock() - defer n.mu.Unlock() + n.mu.RLock() + defer n.mu.RUnlock() out := make([]*LazyBrowseNode, len(n.children)) copy(out, n.children) return out @@ -305,28 +315,81 @@ func (n *LazyBrowseNode) Children() []*LazyBrowseNode { // IsExpanded reports whether Expand has completed successfully on this node. func (n *LazyBrowseNode) IsExpanded() bool { - n.mu.Lock() - defer n.mu.Unlock() + n.mu.RLock() + defer n.mu.RUnlock() return n.isExpanded } // Expand fetches this node's direct children via BrowseChildren when they have // not yet been loaded. Subsequent calls after a successful Expand are a no-op // and do not issue another RPC. +// +// Expand is safe to call concurrently from multiple goroutines: callers that +// arrive while an expansion is in flight wait on the active RPC and share its +// result instead of issuing a second RPC. The RPC itself runs without holding +// the snapshot mutex, so concurrent Children() and IsExpanded() callers are +// not blocked for the duration of the network round trip. +// +// Failure semantics: a failed expansion surfaces the same error to every +// in-flight waiter, but the node is left in its pre-call state (isExpanded = +// false, no in-flight expansion). The next Expand call therefore retries with +// a fresh RPC; failures are not sticky. func (n *LazyBrowseNode) Expand(ctx context.Context) error { - n.mu.Lock() - defer n.mu.Unlock() + // Fast path: already expanded. + n.mu.RLock() if n.isExpanded { + n.mu.RUnlock() return nil } + n.mu.RUnlock() + + // Either start a new expansion or wait on an existing one. + n.expandLock.Lock() + n.mu.RLock() + alreadyExpanded := n.isExpanded + n.mu.RUnlock() + if alreadyExpanded { + n.expandLock.Unlock() + return nil + } + if n.expanding { + done := n.expandDone + n.expandLock.Unlock() + select { + case <-done: + n.expandLock.Lock() + err := n.expandErr + n.expandLock.Unlock() + return err + case <-ctx.Done(): + return ctx.Err() + } + } + n.expanding = true + n.expandDone = make(chan struct{}) + done := n.expandDone + n.expandLock.Unlock() + + // Issue the RPC outside any lock so concurrent readers/waiters are not blocked. parentID := n.object.GetGobjectId() children, err := n.client.browseChildrenInner(ctx, &parentID, n.options) - if err != nil { - return err + + if err == nil { + n.mu.Lock() + n.children = children + n.isExpanded = true + n.mu.Unlock() } - n.children = children - n.isExpanded = true - return nil + + // Publish result to waiters and clear the in-flight marker so a failed + // expansion can be retried by the next Expand call. + n.expandLock.Lock() + n.expandErr = err + n.expanding = false + close(done) + n.expandLock.Unlock() + + return err } // Browse returns the root nodes of the Galaxy hierarchy. The returned nodes diff --git a/clients/go/mxgateway/galaxy_test.go b/clients/go/mxgateway/galaxy_test.go index 629e812..ffa165d 100644 --- a/clients/go/mxgateway/galaxy_test.go +++ b/clients/go/mxgateway/galaxy_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "net" + "sync" "testing" "time" @@ -788,6 +789,53 @@ func TestGalaxyBrowseWithFilterForwardsToRequest(t *testing.T) { } } +func TestGalaxyBrowseExpandConcurrentCallersOnlyFireOneRpc(t *testing.T) { + fake := &fakeGalaxyServer{ + browseChildrenReplies: []*pb.BrowseChildrenReply{ + // roots + buildBrowseReply([]*pb.GalaxyObject{obj(1, "Plant", true)}, []bool{true}, 7), + // one expand: one child + buildBrowseReply([]*pb.GalaxyObject{obj(2, "Mixer", false)}, []bool{false}, 7), + }, + } + client, cleanup := newGalaxyBufconnClient(t, fake) + defer cleanup() + + ctx := context.Background() + roots, err := client.Browse(ctx, nil) + if err != nil { + t.Fatalf("Browse: %v", err) + } + + var wg sync.WaitGroup + errs := make(chan error, 10) + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + errs <- roots[0].Expand(ctx) + }() + } + wg.Wait() + close(errs) + for err := range errs { + if err != nil { + t.Fatalf("concurrent Expand: %v", err) + } + } + + if !roots[0].IsExpanded() { + t.Fatal("IsExpanded() = false after 10 concurrent expands") + } + if got, want := len(roots[0].Children()), 1; got != want { + t.Fatalf("len(children) = %d, want %d", got, want) + } + // 1 roots fetch + exactly 1 expand fetch. + if got, want := len(fake.browseChildrenCalls), 2; got != want { + t.Fatalf("RPC count = %d, want %d", got, want) + } +} + func TestGalaxyBrowseChildrenRejectsRepeatedPageToken(t *testing.T) { // Build a reply that carries a non-empty NextPageToken so browseChildrenInner // will request a second page. Queue the same reply twice so the second response