490 lines
16 KiB
Go
490 lines
16 KiB
Go
package mxgateway
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
"time"
|
|
|
|
pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
"google.golang.org/protobuf/types/known/timestamppb"
|
|
)
|
|
|
|
// Page sizes requested from the Galaxy Repository service by the paging helpers.
const (
	// browseChildrenPageSize is the number of children requested per
	// BrowseChildren call issued by the lazy walker.
	browseChildrenPageSize = 500

	// discoverHierarchyPageSize is the number of objects requested per
	// DiscoverHierarchy call. It mirrors the .NET client constant so large
	// galaxies are not silently truncated by the server's default page cap.
	discoverHierarchyPageSize = 5000
)
|
|
|
|
// RawGalaxyRepositoryClient is an alias for the generated gRPC client
// interface of the Galaxy Repository service. It is exported so callers that
// need direct contract access can name the type without importing the
// internal generated package.
type RawGalaxyRepositoryClient = pb.GalaxyRepositoryClient
|
|
|
|
// Generated protobuf aliases for Galaxy Repository messages. Each name is a
// type alias (not a new type), so values are interchangeable with the
// generated pb types.
type (
	// TestConnectionRequest is the request message for the Galaxy Repository
	// TestConnection RPC.
	TestConnectionRequest = pb.TestConnectionRequest
	// TestConnectionReply is the reply message for the Galaxy Repository
	// TestConnection RPC.
	TestConnectionReply = pb.TestConnectionReply
	// GetLastDeployTimeRequest is the request message for GetLastDeployTime.
	GetLastDeployTimeRequest = pb.GetLastDeployTimeRequest
	// GetLastDeployTimeReply is the reply message for GetLastDeployTime.
	GetLastDeployTimeReply = pb.GetLastDeployTimeReply
	// DiscoverHierarchyRequest is the request message for DiscoverHierarchy.
	DiscoverHierarchyRequest = pb.DiscoverHierarchyRequest
	// DiscoverHierarchyReply is the reply message for DiscoverHierarchy.
	DiscoverHierarchyReply = pb.DiscoverHierarchyReply
	// GalaxyObject describes one Galaxy object with its dynamic attributes.
	GalaxyObject = pb.GalaxyObject
	// GalaxyAttribute describes one dynamic attribute on a GalaxyObject.
	GalaxyAttribute = pb.GalaxyAttribute
	// WatchDeployEventsRequest is the request message for WatchDeployEvents.
	WatchDeployEventsRequest = pb.WatchDeployEventsRequest
	// DeployEvent is one Galaxy Repository deploy event as delivered on the
	// WatchDeployEvents stream.
	DeployEvent = pb.DeployEvent
	// BrowseChildrenRequest is the request message for BrowseChildren.
	BrowseChildrenRequest = pb.BrowseChildrenRequest
	// BrowseChildrenReply is the reply message for BrowseChildren.
	BrowseChildrenReply = pb.BrowseChildrenReply
)
|
|
|
|
// RawDeployEventStream is the generated server-streaming client for
// WatchDeployEvents, as returned by WatchDeployEventsRaw. Callers drive it by
// calling Recv themselves.
type RawDeployEventStream = grpc.ServerStreamingClient[pb.DeployEvent]
|
|
|
|
// GalaxyClient owns a gateway gRPC connection and exposes Galaxy Repository
// browse helpers. It mirrors the structure of Client and uses the same
// connection-management conventions.
type GalaxyClient struct {
	// conn is the underlying gRPC connection; Close closes it.
	conn *grpc.ClientConn
	// raw is the generated Galaxy Repository client bound to conn.
	raw pb.GalaxyRepositoryClient
	// opts holds the options supplied at construction; callContext reads
	// CallTimeout from it to bound unary calls.
	opts Options
}
|
|
|
|
// DialGalaxy opens a gRPC connection to the gateway for the Galaxy Repository
|
|
// service. It applies the same authentication metadata, transport security,
|
|
// and dial-timeout behavior as Dial.
|
|
func DialGalaxy(ctx context.Context, opts Options) (*GalaxyClient, error) {
|
|
if opts.Endpoint == "" {
|
|
return nil, errors.New("mxgateway: endpoint is required")
|
|
}
|
|
|
|
dialCtx := ctx
|
|
cancel := func() {}
|
|
if opts.DialTimeout > 0 {
|
|
dialCtx, cancel = context.WithTimeout(ctx, opts.DialTimeout)
|
|
} else if _, ok := ctx.Deadline(); !ok {
|
|
dialCtx, cancel = context.WithTimeout(ctx, defaultDialTimeout)
|
|
}
|
|
defer cancel()
|
|
|
|
transportCredentials, err := resolveTransportCredentials(opts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
dialOptions := []grpc.DialOption{
|
|
grpc.WithTransportCredentials(transportCredentials),
|
|
grpc.WithUnaryInterceptor(unaryAuthInterceptor(opts.APIKey)),
|
|
grpc.WithStreamInterceptor(streamAuthInterceptor(opts.APIKey)),
|
|
grpc.WithBlock(),
|
|
}
|
|
dialOptions = append(dialOptions, opts.DialOptions...)
|
|
|
|
conn, err := grpc.DialContext(dialCtx, opts.Endpoint, dialOptions...)
|
|
if err != nil {
|
|
return nil, &GatewayError{Op: "dial", Err: err}
|
|
}
|
|
|
|
return NewGalaxyClient(conn, opts), nil
|
|
}
|
|
|
|
// NewGalaxyClient wraps an existing gRPC connection for Galaxy Repository
|
|
// access. The caller owns closing conn unless it calls Close on the returned
|
|
// GalaxyClient.
|
|
func NewGalaxyClient(conn *grpc.ClientConn, opts Options) *GalaxyClient {
|
|
return &GalaxyClient{
|
|
conn: conn,
|
|
raw: pb.NewGalaxyRepositoryClient(conn),
|
|
opts: opts,
|
|
}
|
|
}
|
|
|
|
// RawClient returns the generated gRPC client for command-specific parity
|
|
// tests.
|
|
func (c *GalaxyClient) RawClient() RawGalaxyRepositoryClient {
|
|
return c.raw
|
|
}
|
|
|
|
// TestConnection probes the Galaxy Repository service. It returns the server's
|
|
// reported ok flag and a non-nil error only when the RPC itself fails.
|
|
func (c *GalaxyClient) TestConnection(ctx context.Context) (bool, error) {
|
|
callCtx, cancel := c.callContext(ctx)
|
|
defer cancel()
|
|
|
|
reply, err := c.raw.TestConnection(callCtx, &pb.TestConnectionRequest{})
|
|
if err != nil {
|
|
return false, &GatewayError{Op: "galaxy test connection", Err: err}
|
|
}
|
|
return reply.GetOk(), nil
|
|
}
|
|
|
|
// GetLastDeployTime returns the Galaxy's last deploy timestamp. When the server
|
|
// reports present=false (no deploy recorded yet) the call returns
|
|
// (time.Time{}, false, nil). When present=true the timestamp is returned in
|
|
// UTC with present=true.
|
|
func (c *GalaxyClient) GetLastDeployTime(ctx context.Context) (time.Time, bool, error) {
|
|
callCtx, cancel := c.callContext(ctx)
|
|
defer cancel()
|
|
|
|
reply, err := c.raw.GetLastDeployTime(callCtx, &pb.GetLastDeployTimeRequest{})
|
|
if err != nil {
|
|
return time.Time{}, false, &GatewayError{Op: "galaxy get last deploy time", Err: err}
|
|
}
|
|
if !reply.GetPresent() {
|
|
return time.Time{}, false, nil
|
|
}
|
|
ts := reply.GetTimeOfLastDeploy()
|
|
if ts == nil {
|
|
return time.Time{}, false, nil
|
|
}
|
|
return ts.AsTime(), true, nil
|
|
}
|
|
|
|
// DiscoverHierarchy returns the deployed Galaxy object hierarchy with each
|
|
// object's dynamic attributes. The objects are returned in the order supplied
|
|
// by the server. The call pages over the server's NextPageToken until the
|
|
// server signals it has no more results, matching the .NET client.
|
|
func (c *GalaxyClient) DiscoverHierarchy(ctx context.Context) ([]*GalaxyObject, error) {
|
|
var objects []*GalaxyObject
|
|
pageToken := ""
|
|
seen := map[string]struct{}{}
|
|
for {
|
|
callCtx, cancel := c.callContext(ctx)
|
|
reply, err := c.raw.DiscoverHierarchy(callCtx, &pb.DiscoverHierarchyRequest{
|
|
PageSize: discoverHierarchyPageSize,
|
|
PageToken: pageToken,
|
|
})
|
|
cancel()
|
|
if err != nil {
|
|
return nil, &GatewayError{Op: "galaxy discover hierarchy", Err: err}
|
|
}
|
|
objects = append(objects, reply.GetObjects()...)
|
|
pageToken = reply.GetNextPageToken()
|
|
if pageToken == "" {
|
|
return objects, nil
|
|
}
|
|
if _, dup := seen[pageToken]; dup {
|
|
return nil, &GatewayError{
|
|
Op: "galaxy discover hierarchy",
|
|
Err: fmt.Errorf("repeated page token %q", pageToken),
|
|
}
|
|
}
|
|
seen[pageToken] = struct{}{}
|
|
}
|
|
}
|
|
|
|
// WatchDeployEventsRaw starts the generated WatchDeployEvents stream for callers
|
|
// that want direct control over Recv. The caller owns the returned stream's
|
|
// lifetime via ctx cancellation.
|
|
func (c *GalaxyClient) WatchDeployEventsRaw(ctx context.Context, req *WatchDeployEventsRequest) (RawDeployEventStream, error) {
|
|
if req == nil {
|
|
req = &pb.WatchDeployEventsRequest{}
|
|
}
|
|
|
|
stream, err := c.raw.WatchDeployEvents(ctx, req)
|
|
if err != nil {
|
|
return nil, &GatewayError{Op: "galaxy watch deploy events", Err: err}
|
|
}
|
|
return stream, nil
|
|
}
|
|
|
|
// WatchDeployEvents subscribes to Galaxy deploy events. The server emits a
|
|
// bootstrap event with the current state immediately on subscribe, then one
|
|
// event per new deploy. When lastSeenDeployTime is non-nil it is forwarded to
|
|
// the server to suppress the bootstrap event.
|
|
//
|
|
// The returned event channel is closed when the server completes the stream
|
|
// (io.EOF), when ctx is cancelled, or after a terminal error has been
|
|
// delivered on the error channel. The error channel is also closed once the
|
|
// stream tears down. Surfaced errors are wrapped in *GatewayError.
|
|
//
|
|
// Cancel ctx to tear the stream down cleanly.
|
|
func (c *GalaxyClient) WatchDeployEvents(
|
|
ctx context.Context,
|
|
lastSeenDeployTime *time.Time,
|
|
) (<-chan *DeployEvent, <-chan error, error) {
|
|
req := &pb.WatchDeployEventsRequest{}
|
|
if lastSeenDeployTime != nil {
|
|
req.LastSeenDeployTime = timestamppb.New(*lastSeenDeployTime)
|
|
}
|
|
|
|
stream, err := c.WatchDeployEventsRaw(ctx, req)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
events := make(chan *DeployEvent, 16)
|
|
errs := make(chan error, 1)
|
|
go func() {
|
|
defer close(events)
|
|
defer close(errs)
|
|
for {
|
|
event, recvErr := stream.Recv()
|
|
if recvErr == nil {
|
|
select {
|
|
case events <- event:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
continue
|
|
}
|
|
if recvErr == io.EOF {
|
|
return
|
|
}
|
|
if status.Code(recvErr) == codes.Canceled || ctx.Err() != nil {
|
|
return
|
|
}
|
|
select {
|
|
case errs <- &GatewayError{Op: "galaxy watch deploy events", Err: recvErr}:
|
|
case <-ctx.Done():
|
|
}
|
|
return
|
|
}
|
|
}()
|
|
|
|
return events, errs, nil
|
|
}
|
|
|
|
// Close closes the underlying gRPC connection.
|
|
func (c *GalaxyClient) Close() error {
|
|
if c == nil || c.conn == nil {
|
|
return nil
|
|
}
|
|
return c.conn.Close()
|
|
}
|
|
|
|
// LazyBrowseNode is one node in a lazy Galaxy hierarchy walk produced by
// (*GalaxyClient).Browse. Children are not fetched until Expand is called.
// The node is safe for concurrent use; concurrent Expand calls coalesce onto
// a single in-flight RPC and do not block snapshot accessors.
type LazyBrowseNode struct {
	// client is the GalaxyClient used by Expand to fetch this node's children.
	client *GalaxyClient
	// object is the GalaxyObject this node represents; its gobject id is the
	// parent id sent when expanding.
	object *pb.GalaxyObject
	// hasChildrenHint is the server-supplied hint recorded when the node was
	// listed.
	hasChildrenHint bool
	// options are the browse filters the node was listed with; Expand reuses
	// them for the children request.
	options BrowseChildrenOptions

	// expandLock gates inspection and mutation of expand-coordination state
	// (expanding, expandDone, expandErr). It is held only briefly; the BrowseChildren
	// RPC itself runs outside this lock so concurrent readers and waiters are not blocked.
	expandLock sync.Mutex
	// expanding is true while an Expand call is running the RPC.
	expanding bool
	// expandDone is closed by the in-flight Expand once it has published its
	// result; waiters block on it.
	expandDone chan struct{}
	// expandErr is the result of the most recently finished expansion (nil on
	// success).
	expandErr error

	// mu protects the children snapshot and isExpanded flag for concurrent
	// Children() / IsExpanded() readers.
	mu sync.RWMutex
	// children holds the loaded child nodes; set by a successful Expand.
	children []*LazyBrowseNode
	// isExpanded is set to true by a successful Expand.
	isExpanded bool
}
|
|
|
|
// Object returns the underlying GalaxyObject describing this node.
|
|
func (n *LazyBrowseNode) Object() *pb.GalaxyObject { return n.object }
|
|
|
|
// HasChildrenHint reports the server-supplied hint on whether this node has
|
|
// matching descendants under the current filter set.
|
|
func (n *LazyBrowseNode) HasChildrenHint() bool { return n.hasChildrenHint }
|
|
|
|
// Children returns a snapshot copy of the currently-loaded child nodes. Returns
|
|
// an empty slice when Expand has not yet been called.
|
|
func (n *LazyBrowseNode) Children() []*LazyBrowseNode {
|
|
n.mu.RLock()
|
|
defer n.mu.RUnlock()
|
|
out := make([]*LazyBrowseNode, len(n.children))
|
|
copy(out, n.children)
|
|
return out
|
|
}
|
|
|
|
// IsExpanded reports whether Expand has completed successfully on this node.
|
|
func (n *LazyBrowseNode) IsExpanded() bool {
|
|
n.mu.RLock()
|
|
defer n.mu.RUnlock()
|
|
return n.isExpanded
|
|
}
|
|
|
|
// Expand fetches this node's direct children via BrowseChildren when they have
|
|
// not yet been loaded. Subsequent calls after a successful Expand are a no-op
|
|
// and do not issue another RPC.
|
|
//
|
|
// Expand is safe to call concurrently from multiple goroutines: callers that
|
|
// arrive while an expansion is in flight wait on the active RPC and share its
|
|
// result instead of issuing a second RPC. The RPC itself runs without holding
|
|
// the snapshot mutex, so concurrent Children() and IsExpanded() callers are
|
|
// not blocked for the duration of the network round trip.
|
|
//
|
|
// Failure semantics: a failed expansion surfaces the same error to every
|
|
// in-flight waiter, but the node is left in its pre-call state (isExpanded =
|
|
// false, no in-flight expansion). The next Expand call therefore retries with
|
|
// a fresh RPC; failures are not sticky.
|
|
func (n *LazyBrowseNode) Expand(ctx context.Context) error {
|
|
// Fast path: already expanded.
|
|
n.mu.RLock()
|
|
if n.isExpanded {
|
|
n.mu.RUnlock()
|
|
return nil
|
|
}
|
|
n.mu.RUnlock()
|
|
|
|
// Either start a new expansion or wait on an existing one.
|
|
n.expandLock.Lock()
|
|
n.mu.RLock()
|
|
alreadyExpanded := n.isExpanded
|
|
n.mu.RUnlock()
|
|
if alreadyExpanded {
|
|
n.expandLock.Unlock()
|
|
return nil
|
|
}
|
|
if n.expanding {
|
|
done := n.expandDone
|
|
n.expandLock.Unlock()
|
|
select {
|
|
case <-done:
|
|
n.expandLock.Lock()
|
|
err := n.expandErr
|
|
n.expandLock.Unlock()
|
|
return err
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
n.expanding = true
|
|
n.expandDone = make(chan struct{})
|
|
done := n.expandDone
|
|
n.expandLock.Unlock()
|
|
|
|
// Issue the RPC outside any lock so concurrent readers/waiters are not blocked.
|
|
parentID := n.object.GetGobjectId()
|
|
children, err := n.client.browseChildrenInner(ctx, &parentID, n.options)
|
|
|
|
if err == nil {
|
|
n.mu.Lock()
|
|
n.children = children
|
|
n.isExpanded = true
|
|
n.mu.Unlock()
|
|
}
|
|
|
|
// Publish result to waiters and clear the in-flight marker so a failed
|
|
// expansion can be retried by the next Expand call.
|
|
n.expandLock.Lock()
|
|
n.expandErr = err
|
|
n.expanding = false
|
|
close(done)
|
|
n.expandLock.Unlock()
|
|
|
|
return err
|
|
}
|
|
|
|
// Browse returns the root nodes of the Galaxy hierarchy. The returned nodes
|
|
// have only their server-supplied hints populated; call Expand on each node to
|
|
// fetch its direct children. When opts is nil the server defaults apply.
|
|
func (c *GalaxyClient) Browse(ctx context.Context, opts *BrowseChildrenOptions) ([]*LazyBrowseNode, error) {
|
|
effective := BrowseChildrenOptions{}
|
|
if opts != nil {
|
|
effective = *opts
|
|
}
|
|
return c.browseChildrenInner(ctx, nil, effective)
|
|
}
|
|
|
|
// BrowseChildrenRaw issues a single BrowseChildren RPC and returns the raw
|
|
// reply for callers that need direct page-token control. Transport-level
|
|
// failures are wrapped in *GatewayError to match the rest of the client.
|
|
func (c *GalaxyClient) BrowseChildrenRaw(ctx context.Context, req *pb.BrowseChildrenRequest) (*pb.BrowseChildrenReply, error) {
|
|
callCtx, cancel := c.callContext(ctx)
|
|
defer cancel()
|
|
reply, err := c.raw.BrowseChildren(callCtx, req)
|
|
if err != nil {
|
|
return nil, &GatewayError{Op: "galaxy browse children", Err: err}
|
|
}
|
|
return reply, nil
|
|
}
|
|
|
|
func (c *GalaxyClient) browseChildrenInner(
|
|
ctx context.Context,
|
|
parentGobjectID *int32,
|
|
opts BrowseChildrenOptions,
|
|
) ([]*LazyBrowseNode, error) {
|
|
var nodes []*LazyBrowseNode
|
|
pageToken := ""
|
|
seen := map[string]struct{}{}
|
|
for {
|
|
req := &pb.BrowseChildrenRequest{
|
|
PageSize: browseChildrenPageSize,
|
|
PageToken: pageToken,
|
|
CategoryIds: opts.CategoryIds,
|
|
TemplateChainContains: opts.TemplateChainContains,
|
|
TagNameGlob: opts.TagNameGlob,
|
|
AlarmBearingOnly: opts.AlarmBearingOnly,
|
|
HistorizedOnly: opts.HistorizedOnly,
|
|
}
|
|
if parentGobjectID != nil {
|
|
req.Parent = &pb.BrowseChildrenRequest_ParentGobjectId{ParentGobjectId: *parentGobjectID}
|
|
}
|
|
if opts.IncludeAttributes != nil {
|
|
req.IncludeAttributes = opts.IncludeAttributes
|
|
}
|
|
|
|
reply, err := c.BrowseChildrenRaw(ctx, req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for i, child := range reply.GetChildren() {
|
|
hasChildren := reply.GetChildHasChildren()
|
|
hint := i < len(hasChildren) && hasChildren[i]
|
|
nodes = append(nodes, &LazyBrowseNode{
|
|
client: c,
|
|
object: child,
|
|
hasChildrenHint: hint,
|
|
options: opts,
|
|
})
|
|
}
|
|
|
|
pageToken = reply.GetNextPageToken()
|
|
if pageToken == "" {
|
|
return nodes, nil
|
|
}
|
|
if _, dup := seen[pageToken]; dup {
|
|
return nil, &GatewayError{
|
|
Op: "galaxy browse children",
|
|
Err: fmt.Errorf("repeated page token %q", pageToken),
|
|
}
|
|
}
|
|
seen[pageToken] = struct{}{}
|
|
}
|
|
}
|
|
|
|
func (c *GalaxyClient) callContext(ctx context.Context) (context.Context, context.CancelFunc) {
|
|
timeout := c.opts.CallTimeout
|
|
if timeout == 0 {
|
|
timeout = defaultCallTimeout
|
|
}
|
|
if timeout < 0 {
|
|
return ctx, func() {}
|
|
}
|
|
if deadline, ok := ctx.Deadline(); ok {
|
|
timeoutDeadline := time.Now().Add(timeout)
|
|
if deadline.Before(timeoutDeadline) {
|
|
return ctx, func() {}
|
|
}
|
|
}
|
|
return context.WithTimeout(ctx, timeout)
|
|
}
|