package mxgateway import ( "context" "errors" "fmt" "io" "sync" "time" pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/timestamppb" ) // browseChildrenPageSize is the per-request page size used by the lazy walker. const browseChildrenPageSize = 500 // discoverHierarchyPageSize is the per-request page size used by DiscoverHierarchy. // Mirrors the .NET client constant so large galaxies are not silently truncated // by the server's default page cap. const discoverHierarchyPageSize = 5000 // RawGalaxyRepositoryClient is the generated gRPC client interface for the // Galaxy Repository service exposed for callers that need direct contract // access. type RawGalaxyRepositoryClient = pb.GalaxyRepositoryClient // Generated protobuf aliases for Galaxy Repository messages. type ( // TestConnectionRequest is the request for Galaxy Repository TestConnection. TestConnectionRequest = pb.TestConnectionRequest // TestConnectionReply is the reply for Galaxy Repository TestConnection. TestConnectionReply = pb.TestConnectionReply // GetLastDeployTimeRequest is the request for GetLastDeployTime. GetLastDeployTimeRequest = pb.GetLastDeployTimeRequest // GetLastDeployTimeReply is the reply for GetLastDeployTime. GetLastDeployTimeReply = pb.GetLastDeployTimeReply // DiscoverHierarchyRequest is the request for DiscoverHierarchy. DiscoverHierarchyRequest = pb.DiscoverHierarchyRequest // DiscoverHierarchyReply is the reply for DiscoverHierarchy. DiscoverHierarchyReply = pb.DiscoverHierarchyReply // GalaxyObject describes one Galaxy object with its dynamic attributes. GalaxyObject = pb.GalaxyObject // GalaxyAttribute describes one dynamic attribute on a GalaxyObject. GalaxyAttribute = pb.GalaxyAttribute // WatchDeployEventsRequest is the request for WatchDeployEvents. 
WatchDeployEventsRequest = pb.WatchDeployEventsRequest // DeployEvent is one Galaxy Repository deploy event. DeployEvent = pb.DeployEvent // BrowseChildrenRequest is the request for BrowseChildren. BrowseChildrenRequest = pb.BrowseChildrenRequest // BrowseChildrenReply is the reply for BrowseChildren. BrowseChildrenReply = pb.BrowseChildrenReply ) // RawDeployEventStream is the generated WatchDeployEvents client stream. type RawDeployEventStream = grpc.ServerStreamingClient[pb.DeployEvent] // GalaxyClient owns a gateway gRPC connection and exposes Galaxy Repository // browse helpers. It mirrors the structure of Client and uses the same // connection-management conventions. type GalaxyClient struct { conn *grpc.ClientConn raw pb.GalaxyRepositoryClient opts Options } // DialGalaxy opens a gRPC connection to the gateway for the Galaxy Repository // service. It applies the same authentication metadata, transport security, // and dial-timeout behavior as Dial. func DialGalaxy(ctx context.Context, opts Options) (*GalaxyClient, error) { if opts.Endpoint == "" { return nil, errors.New("mxgateway: endpoint is required") } dialCtx := ctx cancel := func() {} if opts.DialTimeout > 0 { dialCtx, cancel = context.WithTimeout(ctx, opts.DialTimeout) } else if _, ok := ctx.Deadline(); !ok { dialCtx, cancel = context.WithTimeout(ctx, defaultDialTimeout) } defer cancel() transportCredentials, err := resolveTransportCredentials(opts) if err != nil { return nil, err } dialOptions := []grpc.DialOption{ grpc.WithTransportCredentials(transportCredentials), grpc.WithUnaryInterceptor(unaryAuthInterceptor(opts.APIKey)), grpc.WithStreamInterceptor(streamAuthInterceptor(opts.APIKey)), grpc.WithBlock(), } dialOptions = append(dialOptions, opts.DialOptions...) conn, err := grpc.DialContext(dialCtx, opts.Endpoint, dialOptions...) 
if err != nil { return nil, &GatewayError{Op: "dial", Err: err} } return NewGalaxyClient(conn, opts), nil } // NewGalaxyClient wraps an existing gRPC connection for Galaxy Repository // access. The caller owns closing conn unless it calls Close on the returned // GalaxyClient. func NewGalaxyClient(conn *grpc.ClientConn, opts Options) *GalaxyClient { return &GalaxyClient{ conn: conn, raw: pb.NewGalaxyRepositoryClient(conn), opts: opts, } } // RawClient returns the generated gRPC client for command-specific parity // tests. func (c *GalaxyClient) RawClient() RawGalaxyRepositoryClient { return c.raw } // TestConnection probes the Galaxy Repository service. It returns the server's // reported ok flag and a non-nil error only when the RPC itself fails. func (c *GalaxyClient) TestConnection(ctx context.Context) (bool, error) { callCtx, cancel := c.callContext(ctx) defer cancel() reply, err := c.raw.TestConnection(callCtx, &pb.TestConnectionRequest{}) if err != nil { return false, &GatewayError{Op: "galaxy test connection", Err: err} } return reply.GetOk(), nil } // GetLastDeployTime returns the Galaxy's last deploy timestamp. When the server // reports present=false (no deploy recorded yet) the call returns // (time.Time{}, false, nil). When present=true the timestamp is returned in // UTC with present=true. func (c *GalaxyClient) GetLastDeployTime(ctx context.Context) (time.Time, bool, error) { callCtx, cancel := c.callContext(ctx) defer cancel() reply, err := c.raw.GetLastDeployTime(callCtx, &pb.GetLastDeployTimeRequest{}) if err != nil { return time.Time{}, false, &GatewayError{Op: "galaxy get last deploy time", Err: err} } if !reply.GetPresent() { return time.Time{}, false, nil } ts := reply.GetTimeOfLastDeploy() if ts == nil { return time.Time{}, false, nil } return ts.AsTime(), true, nil } // DiscoverHierarchy returns the deployed Galaxy object hierarchy with each // object's dynamic attributes. The objects are returned in the order supplied // by the server. 
The call pages over the server's NextPageToken until the // server signals it has no more results, matching the .NET client. func (c *GalaxyClient) DiscoverHierarchy(ctx context.Context) ([]*GalaxyObject, error) { var objects []*GalaxyObject pageToken := "" seen := map[string]struct{}{} for { callCtx, cancel := c.callContext(ctx) reply, err := c.raw.DiscoverHierarchy(callCtx, &pb.DiscoverHierarchyRequest{ PageSize: discoverHierarchyPageSize, PageToken: pageToken, }) cancel() if err != nil { return nil, &GatewayError{Op: "galaxy discover hierarchy", Err: err} } objects = append(objects, reply.GetObjects()...) pageToken = reply.GetNextPageToken() if pageToken == "" { return objects, nil } if _, dup := seen[pageToken]; dup { return nil, &GatewayError{ Op: "galaxy discover hierarchy", Err: fmt.Errorf("repeated page token %q", pageToken), } } seen[pageToken] = struct{}{} } } // WatchDeployEventsRaw starts the generated WatchDeployEvents stream for callers // that want direct control over Recv. The caller owns the returned stream's // lifetime via ctx cancellation. func (c *GalaxyClient) WatchDeployEventsRaw(ctx context.Context, req *WatchDeployEventsRequest) (RawDeployEventStream, error) { if req == nil { req = &pb.WatchDeployEventsRequest{} } stream, err := c.raw.WatchDeployEvents(ctx, req) if err != nil { return nil, &GatewayError{Op: "galaxy watch deploy events", Err: err} } return stream, nil } // WatchDeployEvents subscribes to Galaxy deploy events. The server emits a // bootstrap event with the current state immediately on subscribe, then one // event per new deploy. When lastSeenDeployTime is non-nil it is forwarded to // the server to suppress the bootstrap event. // // The returned event channel is closed when the server completes the stream // (io.EOF), when ctx is cancelled, or after a terminal error has been // delivered on the error channel. The error channel is also closed once the // stream tears down. Surfaced errors are wrapped in *GatewayError. 
//
// Cancel ctx to tear the stream down cleanly.
//
// The third return value is non-nil only when the stream could not be opened;
// in that case both channels are nil.
func (c *GalaxyClient) WatchDeployEvents(
	ctx context.Context,
	lastSeenDeployTime *time.Time,
) (<-chan *DeployEvent, <-chan error, error) {
	req := &pb.WatchDeployEventsRequest{}
	if lastSeenDeployTime != nil {
		req.LastSeenDeployTime = timestamppb.New(*lastSeenDeployTime)
	}
	stream, err := c.WatchDeployEventsRaw(ctx, req)
	if err != nil {
		return nil, nil, err
	}
	events := make(chan *DeployEvent, 16)
	// Capacity 1: the goroutine sends at most one terminal error and then
	// returns, so that send never needs a ready receiver.
	errs := make(chan error, 1)
	go func() {
		// Deferred calls run in reverse order: errs is closed first, then events.
		defer close(events)
		defer close(errs)
		for {
			event, recvErr := stream.Recv()
			if recvErr == nil {
				// Hand the event over, but stop if the caller cancels while
				// the events buffer is full.
				select {
				case events <- event:
				case <-ctx.Done():
					return
				}
				continue
			}
			// io.EOF: the server completed the stream; not reported as an error.
			if recvErr == io.EOF {
				return
			}
			// Cancellation (reported by gRPC or visible on ctx) is a clean
			// shutdown and is not surfaced on errs.
			if status.Code(recvErr) == codes.Canceled || ctx.Err() != nil {
				return
			}
			select {
			case errs <- &GatewayError{Op: "galaxy watch deploy events", Err: recvErr}:
			case <-ctx.Done():
			}
			return
		}
	}()
	return events, errs, nil
}

// Close closes the underlying gRPC connection. It is a no-op returning nil
// when the receiver or its connection is nil.
func (c *GalaxyClient) Close() error {
	if c == nil || c.conn == nil {
		return nil
	}
	return c.conn.Close()
}

// LazyBrowseNode is one node in a lazy Galaxy hierarchy walk produced by
// (*GalaxyClient).Browse. Children are not fetched until Expand is called.
// The node is safe for concurrent use; concurrent Expand calls coalesce onto
// a single in-flight RPC and do not block snapshot accessors.
type LazyBrowseNode struct {
	// client issues the BrowseChildren RPCs when the node is expanded.
	client *GalaxyClient
	// object is the Galaxy object this node represents.
	object *pb.GalaxyObject
	// hasChildrenHint is the per-child flag taken from the BrowseChildren reply.
	hasChildrenHint bool
	// options holds the filters the node was produced with; Expand reuses
	// them for the node's own children.
	options BrowseChildrenOptions

	// expandLock gates inspection and mutation of expand-coordination state
	// (expanding, expandDone, expandErr). It is held only briefly; the BrowseChildren
	// RPC itself runs outside this lock so concurrent readers and waiters are not blocked.
	expandLock sync.Mutex
	// expanding is true while an Expand call owns the in-flight RPC.
	expanding bool
	// expandDone is closed by the owning Expand call once its result is published.
	expandDone chan struct{}
	// expandErr is the outcome of the most recently completed expansion.
	expandErr error

	// mu protects the children snapshot and isExpanded flag for concurrent
	// Children() / IsExpanded() readers.
	mu         sync.RWMutex
	children   []*LazyBrowseNode
	isExpanded bool
}

// Object returns the underlying GalaxyObject describing this node.
func (n *LazyBrowseNode) Object() *pb.GalaxyObject {
	return n.object
}

// HasChildrenHint reports the server-supplied hint on whether this node has
// matching descendants under the current filter set.
func (n *LazyBrowseNode) HasChildrenHint() bool {
	return n.hasChildrenHint
}

// Children returns a snapshot copy of the currently-loaded child nodes. Returns
// an empty slice when Expand has not yet been called. The returned slice is
// freshly allocated on every call, so callers may modify it freely.
func (n *LazyBrowseNode) Children() []*LazyBrowseNode {
	n.mu.RLock()
	defer n.mu.RUnlock()
	out := make([]*LazyBrowseNode, len(n.children))
	copy(out, n.children)
	return out
}

// IsExpanded reports whether Expand has completed successfully on this node.
func (n *LazyBrowseNode) IsExpanded() bool {
	n.mu.RLock()
	defer n.mu.RUnlock()
	return n.isExpanded
}

// Expand fetches this node's direct children via BrowseChildren when they have
// not yet been loaded. Subsequent calls after a successful Expand are a no-op
// and do not issue another RPC.
//
// Expand is safe to call concurrently from multiple goroutines: callers that
// arrive while an expansion is in flight wait on the active RPC and share its
// result instead of issuing a second RPC. The RPC itself runs without holding
// the snapshot mutex, so concurrent Children() and IsExpanded() callers are
// not blocked for the duration of the network round trip.
//
// Failure semantics: a failed expansion surfaces the same error to every
// in-flight waiter, but the node is left in its pre-call state (isExpanded =
// false, no in-flight expansion). The next Expand call therefore retries with
// a fresh RPC; failures are not sticky.
//
// A waiter whose own ctx is cancelled stops waiting and returns ctx.Err();
// the in-flight RPC, which runs under the owning caller's ctx, is unaffected.
func (n *LazyBrowseNode) Expand(ctx context.Context) error {
	// Fast path: already expanded.
	n.mu.RLock()
	if n.isExpanded {
		n.mu.RUnlock()
		return nil
	}
	n.mu.RUnlock()

	// Either start a new expansion or wait on an existing one.
	n.expandLock.Lock()
	// Re-check under expandLock: an expansion may have completed between the
	// fast-path check above and acquiring the lock.
	n.mu.RLock()
	alreadyExpanded := n.isExpanded
	n.mu.RUnlock()
	if alreadyExpanded {
		n.expandLock.Unlock()
		return nil
	}
	if n.expanding {
		// Capture the channel while holding the lock, then wait without it.
		done := n.expandDone
		n.expandLock.Unlock()
		select {
		case <-done:
			// NOTE(review): expandErr is read after re-acquiring expandLock,
			// so if another Expand started and finished in between, this
			// waiter observes that later outcome rather than the one it
			// waited on.
			n.expandLock.Lock()
			err := n.expandErr
			n.expandLock.Unlock()
			return err
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	// This caller becomes the owner of the in-flight expansion.
	n.expanding = true
	n.expandDone = make(chan struct{})
	done := n.expandDone
	n.expandLock.Unlock()

	// Issue the RPC outside any lock so concurrent readers/waiters are not blocked.
	parentID := n.object.GetGobjectId()
	children, err := n.client.browseChildrenInner(ctx, &parentID, n.options)
	if err == nil {
		// Install the snapshot before waiters are released below.
		n.mu.Lock()
		n.children = children
		n.isExpanded = true
		n.mu.Unlock()
	}

	// Publish result to waiters and clear the in-flight marker so a failed
	// expansion can be retried by the next Expand call.
	n.expandLock.Lock()
	n.expandErr = err
	n.expanding = false
	close(done)
	n.expandLock.Unlock()
	return err
}

// Browse returns the root nodes of the Galaxy hierarchy. The returned nodes
// have only their server-supplied hints populated; call Expand on each node to
// fetch its direct children. When opts is nil the server defaults apply
// (a zero-value BrowseChildrenOptions is used). A non-nil opts is copied, and
// the copy is stored on every returned node.
func (c *GalaxyClient) Browse(ctx context.Context, opts *BrowseChildrenOptions) ([]*LazyBrowseNode, error) {
	effective := BrowseChildrenOptions{}
	if opts != nil {
		effective = *opts
	}
	// A nil parent ID leaves the request's Parent field unset.
	return c.browseChildrenInner(ctx, nil, effective)
}

// BrowseChildrenRaw issues a single BrowseChildren RPC and returns the raw
// reply for callers that need direct page-token control. Transport-level
// failures are wrapped in *GatewayError to match the rest of the client.
func (c *GalaxyClient) BrowseChildrenRaw(ctx context.Context, req *pb.BrowseChildrenRequest) (*pb.BrowseChildrenReply, error) { callCtx, cancel := c.callContext(ctx) defer cancel() reply, err := c.raw.BrowseChildren(callCtx, req) if err != nil { return nil, &GatewayError{Op: "galaxy browse children", Err: err} } return reply, nil } func (c *GalaxyClient) browseChildrenInner( ctx context.Context, parentGobjectID *int32, opts BrowseChildrenOptions, ) ([]*LazyBrowseNode, error) { var nodes []*LazyBrowseNode pageToken := "" seen := map[string]struct{}{} for { req := &pb.BrowseChildrenRequest{ PageSize: browseChildrenPageSize, PageToken: pageToken, CategoryIds: opts.CategoryIds, TemplateChainContains: opts.TemplateChainContains, TagNameGlob: opts.TagNameGlob, AlarmBearingOnly: opts.AlarmBearingOnly, HistorizedOnly: opts.HistorizedOnly, } if parentGobjectID != nil { req.Parent = &pb.BrowseChildrenRequest_ParentGobjectId{ParentGobjectId: *parentGobjectID} } if opts.IncludeAttributes != nil { req.IncludeAttributes = opts.IncludeAttributes } reply, err := c.BrowseChildrenRaw(ctx, req) if err != nil { return nil, err } for i, child := range reply.GetChildren() { hasChildren := reply.GetChildHasChildren() hint := i < len(hasChildren) && hasChildren[i] nodes = append(nodes, &LazyBrowseNode{ client: c, object: child, hasChildrenHint: hint, options: opts, }) } pageToken = reply.GetNextPageToken() if pageToken == "" { return nodes, nil } if _, dup := seen[pageToken]; dup { return nil, &GatewayError{ Op: "galaxy browse children", Err: fmt.Errorf("repeated page token %q", pageToken), } } seen[pageToken] = struct{}{} } } func (c *GalaxyClient) callContext(ctx context.Context) (context.Context, context.CancelFunc) { timeout := c.opts.CallTimeout if timeout == 0 { timeout = defaultCallTimeout } if timeout < 0 { return ctx, func() {} } if deadline, ok := ctx.Deadline(); ok { timeoutDeadline := time.Now().Add(timeout) if deadline.Before(timeoutDeadline) { return ctx, func() {} } } 
return context.WithTimeout(ctx, timeout) }