client/go: avoid holding mutex across BrowseChildren RPC in Expand
This commit is contained in:
@@ -273,15 +273,25 @@ func (c *GalaxyClient) Close() error {
|
||||
|
||||
// LazyBrowseNode is one node in a lazy Galaxy hierarchy walk produced by
|
||||
// (*GalaxyClient).Browse. Children are not fetched until Expand is called.
|
||||
// The node is safe for concurrent use; concurrent Expand calls collapse to a
|
||||
// single RPC.
|
||||
// The node is safe for concurrent use; concurrent Expand calls coalesce onto
|
||||
// a single in-flight RPC and do not block snapshot accessors.
|
||||
type LazyBrowseNode struct {
|
||||
client *GalaxyClient
|
||||
object *pb.GalaxyObject
|
||||
hasChildrenHint bool
|
||||
options BrowseChildrenOptions
|
||||
|
||||
mu sync.Mutex
|
||||
// expandLock gates inspection and mutation of expand-coordination state
|
||||
// (expanding, expandDone, expandErr). It is held only briefly; the BrowseChildren
|
||||
// RPC itself runs outside this lock so concurrent readers and waiters are not blocked.
|
||||
expandLock sync.Mutex
|
||||
expanding bool
|
||||
expandDone chan struct{}
|
||||
expandErr error
|
||||
|
||||
// mu protects the children snapshot and isExpanded flag for concurrent
|
||||
// Children() / IsExpanded() readers.
|
||||
mu sync.RWMutex
|
||||
children []*LazyBrowseNode
|
||||
isExpanded bool
|
||||
}
|
||||
@@ -296,8 +306,8 @@ func (n *LazyBrowseNode) HasChildrenHint() bool { return n.hasChildrenHint }
|
||||
// Children returns a snapshot copy of the currently-loaded child nodes. Returns
|
||||
// an empty slice when Expand has not yet been called.
|
||||
func (n *LazyBrowseNode) Children() []*LazyBrowseNode {
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
n.mu.RLock()
|
||||
defer n.mu.RUnlock()
|
||||
out := make([]*LazyBrowseNode, len(n.children))
|
||||
copy(out, n.children)
|
||||
return out
|
||||
@@ -305,28 +315,81 @@ func (n *LazyBrowseNode) Children() []*LazyBrowseNode {
|
||||
|
||||
// IsExpanded reports whether Expand has completed successfully on this node.
|
||||
func (n *LazyBrowseNode) IsExpanded() bool {
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
n.mu.RLock()
|
||||
defer n.mu.RUnlock()
|
||||
return n.isExpanded
|
||||
}
|
||||
|
||||
// Expand fetches this node's direct children via BrowseChildren when they have
|
||||
// not yet been loaded. Subsequent calls after a successful Expand are a no-op
|
||||
// and do not issue another RPC.
|
||||
//
|
||||
// Expand is safe to call concurrently from multiple goroutines: callers that
|
||||
// arrive while an expansion is in flight wait on the active RPC and share its
|
||||
// result instead of issuing a second RPC. The RPC itself runs without holding
|
||||
// the snapshot mutex, so concurrent Children() and IsExpanded() callers are
|
||||
// not blocked for the duration of the network round trip.
|
||||
//
|
||||
// Failure semantics: a failed expansion surfaces the same error to every
|
||||
// in-flight waiter, but the node is left in its pre-call state (isExpanded =
|
||||
// false, no in-flight expansion). The next Expand call therefore retries with
|
||||
// a fresh RPC; failures are not sticky.
|
||||
func (n *LazyBrowseNode) Expand(ctx context.Context) error {
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
// Fast path: already expanded.
|
||||
n.mu.RLock()
|
||||
if n.isExpanded {
|
||||
n.mu.RUnlock()
|
||||
return nil
|
||||
}
|
||||
n.mu.RUnlock()
|
||||
|
||||
// Either start a new expansion or wait on an existing one.
|
||||
n.expandLock.Lock()
|
||||
n.mu.RLock()
|
||||
alreadyExpanded := n.isExpanded
|
||||
n.mu.RUnlock()
|
||||
if alreadyExpanded {
|
||||
n.expandLock.Unlock()
|
||||
return nil
|
||||
}
|
||||
if n.expanding {
|
||||
done := n.expandDone
|
||||
n.expandLock.Unlock()
|
||||
select {
|
||||
case <-done:
|
||||
n.expandLock.Lock()
|
||||
err := n.expandErr
|
||||
n.expandLock.Unlock()
|
||||
return err
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
n.expanding = true
|
||||
n.expandDone = make(chan struct{})
|
||||
done := n.expandDone
|
||||
n.expandLock.Unlock()
|
||||
|
||||
// Issue the RPC outside any lock so concurrent readers/waiters are not blocked.
|
||||
parentID := n.object.GetGobjectId()
|
||||
children, err := n.client.browseChildrenInner(ctx, &parentID, n.options)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
if err == nil {
|
||||
n.mu.Lock()
|
||||
n.children = children
|
||||
n.isExpanded = true
|
||||
n.mu.Unlock()
|
||||
}
|
||||
n.children = children
|
||||
n.isExpanded = true
|
||||
return nil
|
||||
|
||||
// Publish result to waiters and clear the in-flight marker so a failed
|
||||
// expansion can be retried by the next Expand call.
|
||||
n.expandLock.Lock()
|
||||
n.expandErr = err
|
||||
n.expanding = false
|
||||
close(done)
|
||||
n.expandLock.Unlock()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// Browse returns the root nodes of the Galaxy hierarchy. The returned nodes
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"net"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -788,6 +789,53 @@ func TestGalaxyBrowseWithFilterForwardsToRequest(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestGalaxyBrowseExpandConcurrentCallersOnlyFireOneRpc(t *testing.T) {
|
||||
fake := &fakeGalaxyServer{
|
||||
browseChildrenReplies: []*pb.BrowseChildrenReply{
|
||||
// roots
|
||||
buildBrowseReply([]*pb.GalaxyObject{obj(1, "Plant", true)}, []bool{true}, 7),
|
||||
// one expand: one child
|
||||
buildBrowseReply([]*pb.GalaxyObject{obj(2, "Mixer", false)}, []bool{false}, 7),
|
||||
},
|
||||
}
|
||||
client, cleanup := newGalaxyBufconnClient(t, fake)
|
||||
defer cleanup()
|
||||
|
||||
ctx := context.Background()
|
||||
roots, err := client.Browse(ctx, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("Browse: %v", err)
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
errs := make(chan error, 10)
|
||||
for i := 0; i < 10; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
errs <- roots[0].Expand(ctx)
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
close(errs)
|
||||
for err := range errs {
|
||||
if err != nil {
|
||||
t.Fatalf("concurrent Expand: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if !roots[0].IsExpanded() {
|
||||
t.Fatal("IsExpanded() = false after 10 concurrent expands")
|
||||
}
|
||||
if got, want := len(roots[0].Children()), 1; got != want {
|
||||
t.Fatalf("len(children) = %d, want %d", got, want)
|
||||
}
|
||||
// 1 roots fetch + exactly 1 expand fetch.
|
||||
if got, want := len(fake.browseChildrenCalls), 2; got != want {
|
||||
t.Fatalf("RPC count = %d, want %d", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGalaxyBrowseChildrenRejectsRepeatedPageToken(t *testing.T) {
|
||||
// Build a reply that carries a non-empty NextPageToken so browseChildrenInner
|
||||
// will request a second page. Queue the same reply twice so the second response
|
||||
|
||||
Reference in New Issue
Block a user