client/go: LazyBrowseNode walker for lazy hierarchy browse
This commit is contained in:
@@ -3,7 +3,9 @@ package mxgateway
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated"
|
||||
@@ -13,6 +15,9 @@ import (
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
)
|
||||
|
||||
// browseChildrenPageSize is the per-request page size used by the lazy walker.
|
||||
const browseChildrenPageSize = 500
|
||||
|
||||
// RawGalaxyRepositoryClient is the generated gRPC client interface for the
|
||||
// Galaxy Repository service exposed for callers that need direct contract
|
||||
// access.
|
||||
@@ -40,6 +45,10 @@ type (
|
||||
WatchDeployEventsRequest = pb.WatchDeployEventsRequest
|
||||
// DeployEvent is one Galaxy Repository deploy event.
|
||||
DeployEvent = pb.DeployEvent
|
||||
// BrowseChildrenRequest is the request for BrowseChildren.
|
||||
BrowseChildrenRequest = pb.BrowseChildrenRequest
|
||||
// BrowseChildrenReply is the reply for BrowseChildren.
|
||||
BrowseChildrenReply = pb.BrowseChildrenReply
|
||||
)
|
||||
|
||||
// RawDeployEventStream is the generated WatchDeployEvents client stream.
|
||||
@@ -238,6 +247,140 @@ func (c *GalaxyClient) Close() error {
|
||||
return c.conn.Close()
|
||||
}
|
||||
|
||||
// LazyBrowseNode is one node in a lazy Galaxy hierarchy walk produced by
|
||||
// (*GalaxyClient).Browse. Children are not fetched until Expand is called.
|
||||
// The node is safe for concurrent use; concurrent Expand calls collapse to a
|
||||
// single RPC.
|
||||
type LazyBrowseNode struct {
|
||||
client *GalaxyClient
|
||||
object *pb.GalaxyObject
|
||||
hasChildrenHint bool
|
||||
options BrowseChildrenOptions
|
||||
|
||||
mu sync.Mutex
|
||||
children []*LazyBrowseNode
|
||||
isExpanded bool
|
||||
}
|
||||
|
||||
// Object returns the underlying GalaxyObject describing this node.
|
||||
func (n *LazyBrowseNode) Object() *pb.GalaxyObject { return n.object }
|
||||
|
||||
// HasChildrenHint reports the server-supplied hint on whether this node has
|
||||
// matching descendants under the current filter set.
|
||||
func (n *LazyBrowseNode) HasChildrenHint() bool { return n.hasChildrenHint }
|
||||
|
||||
// Children returns a snapshot copy of the currently-loaded child nodes. Returns
|
||||
// an empty slice when Expand has not yet been called.
|
||||
func (n *LazyBrowseNode) Children() []*LazyBrowseNode {
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
out := make([]*LazyBrowseNode, len(n.children))
|
||||
copy(out, n.children)
|
||||
return out
|
||||
}
|
||||
|
||||
// IsExpanded reports whether Expand has completed successfully on this node.
|
||||
func (n *LazyBrowseNode) IsExpanded() bool {
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
return n.isExpanded
|
||||
}
|
||||
|
||||
// Expand fetches this node's direct children via BrowseChildren when they have
|
||||
// not yet been loaded. Subsequent calls after a successful Expand are a no-op
|
||||
// and do not issue another RPC.
|
||||
func (n *LazyBrowseNode) Expand(ctx context.Context) error {
|
||||
n.mu.Lock()
|
||||
defer n.mu.Unlock()
|
||||
if n.isExpanded {
|
||||
return nil
|
||||
}
|
||||
parentID := n.object.GetGobjectId()
|
||||
children, err := n.client.browseChildrenInner(ctx, &parentID, n.options)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
n.children = children
|
||||
n.isExpanded = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// Browse returns the root nodes of the Galaxy hierarchy. The returned nodes
|
||||
// have only their server-supplied hints populated; call Expand on each node to
|
||||
// fetch its direct children. When opts is nil the server defaults apply.
|
||||
func (c *GalaxyClient) Browse(ctx context.Context, opts *BrowseChildrenOptions) ([]*LazyBrowseNode, error) {
|
||||
effective := BrowseChildrenOptions{}
|
||||
if opts != nil {
|
||||
effective = *opts
|
||||
}
|
||||
return c.browseChildrenInner(ctx, nil, effective)
|
||||
}
|
||||
|
||||
// BrowseChildrenRaw issues a single BrowseChildren RPC and returns the raw
|
||||
// reply for callers that need direct page-token control. Transport-level
|
||||
// failures are wrapped in *GatewayError to match the rest of the client.
|
||||
func (c *GalaxyClient) BrowseChildrenRaw(ctx context.Context, req *pb.BrowseChildrenRequest) (*pb.BrowseChildrenReply, error) {
|
||||
callCtx, cancel := c.callContext(ctx)
|
||||
defer cancel()
|
||||
reply, err := c.raw.BrowseChildren(callCtx, req)
|
||||
if err != nil {
|
||||
return nil, &GatewayError{Op: "galaxy browse children", Err: err}
|
||||
}
|
||||
return reply, nil
|
||||
}
|
||||
|
||||
func (c *GalaxyClient) browseChildrenInner(
|
||||
ctx context.Context,
|
||||
parentGobjectID *int32,
|
||||
opts BrowseChildrenOptions,
|
||||
) ([]*LazyBrowseNode, error) {
|
||||
var nodes []*LazyBrowseNode
|
||||
pageToken := ""
|
||||
seen := map[string]struct{}{}
|
||||
for {
|
||||
req := &pb.BrowseChildrenRequest{
|
||||
PageSize: browseChildrenPageSize,
|
||||
PageToken: pageToken,
|
||||
CategoryIds: opts.CategoryIds,
|
||||
TemplateChainContains: opts.TemplateChainContains,
|
||||
TagNameGlob: opts.TagNameGlob,
|
||||
AlarmBearingOnly: opts.AlarmBearingOnly,
|
||||
HistorizedOnly: opts.HistorizedOnly,
|
||||
}
|
||||
if parentGobjectID != nil {
|
||||
req.Parent = &pb.BrowseChildrenRequest_ParentGobjectId{ParentGobjectId: *parentGobjectID}
|
||||
}
|
||||
if opts.IncludeAttributes != nil {
|
||||
req.IncludeAttributes = opts.IncludeAttributes
|
||||
}
|
||||
|
||||
reply, err := c.BrowseChildrenRaw(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for i, child := range reply.GetChildren() {
|
||||
hasChildren := reply.GetChildHasChildren()
|
||||
hint := i < len(hasChildren) && hasChildren[i]
|
||||
nodes = append(nodes, &LazyBrowseNode{
|
||||
client: c,
|
||||
object: child,
|
||||
hasChildrenHint: hint,
|
||||
options: opts,
|
||||
})
|
||||
}
|
||||
|
||||
pageToken = reply.GetNextPageToken()
|
||||
if pageToken == "" {
|
||||
return nodes, nil
|
||||
}
|
||||
if _, dup := seen[pageToken]; dup {
|
||||
return nil, fmt.Errorf("mxgateway: galaxy browse children returned repeated page token %q", pageToken)
|
||||
}
|
||||
seen[pageToken] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *GalaxyClient) callContext(ctx context.Context) (context.Context, context.CancelFunc) {
|
||||
timeout := c.opts.CallTimeout
|
||||
if timeout == 0 {
|
||||
|
||||
Reference in New Issue
Block a user