package mxgateway import ( "context" "errors" "io" "time" pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/timestamppb" ) // RawGalaxyRepositoryClient is the generated gRPC client interface for the // Galaxy Repository service exposed for callers that need direct contract // access. type RawGalaxyRepositoryClient = pb.GalaxyRepositoryClient // Generated protobuf aliases for Galaxy Repository messages. type ( // TestConnectionRequest is the request for Galaxy Repository TestConnection. TestConnectionRequest = pb.TestConnectionRequest // TestConnectionReply is the reply for Galaxy Repository TestConnection. TestConnectionReply = pb.TestConnectionReply // GetLastDeployTimeRequest is the request for GetLastDeployTime. GetLastDeployTimeRequest = pb.GetLastDeployTimeRequest // GetLastDeployTimeReply is the reply for GetLastDeployTime. GetLastDeployTimeReply = pb.GetLastDeployTimeReply // DiscoverHierarchyRequest is the request for DiscoverHierarchy. DiscoverHierarchyRequest = pb.DiscoverHierarchyRequest // DiscoverHierarchyReply is the reply for DiscoverHierarchy. DiscoverHierarchyReply = pb.DiscoverHierarchyReply // GalaxyObject describes one Galaxy object with its dynamic attributes. GalaxyObject = pb.GalaxyObject // GalaxyAttribute describes one dynamic attribute on a GalaxyObject. GalaxyAttribute = pb.GalaxyAttribute // WatchDeployEventsRequest is the request for WatchDeployEvents. WatchDeployEventsRequest = pb.WatchDeployEventsRequest // DeployEvent is one Galaxy Repository deploy event. DeployEvent = pb.DeployEvent ) // RawDeployEventStream is the generated WatchDeployEvents client stream. type RawDeployEventStream = grpc.ServerStreamingClient[pb.DeployEvent] // GalaxyClient owns a gateway gRPC connection and exposes Galaxy Repository // browse helpers. It mirrors the structure of Client and uses the same // connection-management conventions. type GalaxyClient struct { conn *grpc.ClientConn raw pb.GalaxyRepositoryClient opts Options } // DialGalaxy opens a gRPC connection to the gateway for the Galaxy Repository // service. It applies the same authentication metadata, transport security, // lazy connection, and DialTimeout-bounded readiness probe as Dial. func DialGalaxy(ctx context.Context, opts Options) (*GalaxyClient, error) { conn, err := dial(ctx, opts) if err != nil { return nil, err } return NewGalaxyClient(conn, opts), nil } // NewGalaxyClient wraps an existing gRPC connection for Galaxy Repository // access. The caller owns closing conn unless it calls Close on the returned // GalaxyClient. func NewGalaxyClient(conn *grpc.ClientConn, opts Options) *GalaxyClient { return &GalaxyClient{ conn: conn, raw: pb.NewGalaxyRepositoryClient(conn), opts: opts, } } // RawClient returns the generated gRPC client for command-specific parity // tests. func (c *GalaxyClient) RawClient() RawGalaxyRepositoryClient { return c.raw } // TestConnection probes the Galaxy Repository service. It returns the server's // reported ok flag and a non-nil error only when the RPC itself fails. func (c *GalaxyClient) TestConnection(ctx context.Context) (bool, error) { callCtx, cancel := c.callContext(ctx) defer cancel() reply, err := c.raw.TestConnection(callCtx, &pb.TestConnectionRequest{}) if err != nil { return false, &GatewayError{Op: "galaxy test connection", Err: err} } return reply.GetOk(), nil } // GetLastDeployTime returns the Galaxy's last deploy timestamp. When the server // reports present=false (no deploy recorded yet) the call returns // (time.Time{}, false, nil). When present=true the timestamp is returned in // UTC with present=true. func (c *GalaxyClient) GetLastDeployTime(ctx context.Context) (time.Time, bool, error) { callCtx, cancel := c.callContext(ctx) defer cancel() reply, err := c.raw.GetLastDeployTime(callCtx, &pb.GetLastDeployTimeRequest{}) if err != nil { return time.Time{}, false, &GatewayError{Op: "galaxy get last deploy time", Err: err} } if !reply.GetPresent() { return time.Time{}, false, nil } ts := reply.GetTimeOfLastDeploy() if ts == nil { return time.Time{}, false, nil } return ts.AsTime(), true, nil } // DiscoverHierarchy returns the deployed Galaxy object hierarchy with each // object's dynamic attributes. The objects are returned in the order supplied // by the server. func (c *GalaxyClient) DiscoverHierarchy(ctx context.Context) ([]*GalaxyObject, error) { callCtx, cancel := c.callContext(ctx) defer cancel() reply, err := c.raw.DiscoverHierarchy(callCtx, &pb.DiscoverHierarchyRequest{}) if err != nil { return nil, &GatewayError{Op: "galaxy discover hierarchy", Err: err} } return reply.GetObjects(), nil } // WatchDeployEventsRaw starts the generated WatchDeployEvents stream for callers // that want direct control over Recv. The caller owns the returned stream's // lifetime via ctx cancellation. func (c *GalaxyClient) WatchDeployEventsRaw(ctx context.Context, req *WatchDeployEventsRequest) (RawDeployEventStream, error) { if req == nil { req = &pb.WatchDeployEventsRequest{} } stream, err := c.raw.WatchDeployEvents(ctx, req) if err != nil { return nil, &GatewayError{Op: "galaxy watch deploy events", Err: err} } return stream, nil } // WatchDeployEvents subscribes to Galaxy deploy events. The server emits a // bootstrap event with the current state immediately on subscribe, then one // event per new deploy. When lastSeenDeployTime is non-nil it is forwarded to // the server to suppress the bootstrap event. // // The returned event channel is closed when the server completes the stream // (io.EOF), when ctx is cancelled, or after a terminal error has been // delivered on the error channel. The error channel is also closed once the // stream tears down. Surfaced errors are wrapped in *GatewayError. // // Cancel ctx to tear the stream down cleanly. func (c *GalaxyClient) WatchDeployEvents( ctx context.Context, lastSeenDeployTime *time.Time, ) (<-chan *DeployEvent, <-chan error, error) { req := &pb.WatchDeployEventsRequest{} if lastSeenDeployTime != nil { req.LastSeenDeployTime = timestamppb.New(*lastSeenDeployTime) } stream, err := c.WatchDeployEventsRaw(ctx, req) if err != nil { return nil, nil, err } events := make(chan *DeployEvent, 16) errs := make(chan error, 1) go func() { defer close(events) defer close(errs) for { event, recvErr := stream.Recv() if recvErr == nil { select { case events <- event: case <-ctx.Done(): return } continue } if errors.Is(recvErr, io.EOF) { return } if status.Code(recvErr) == codes.Canceled || ctx.Err() != nil { return } select { case errs <- &GatewayError{Op: "galaxy watch deploy events", Err: recvErr}: case <-ctx.Done(): } return } }() return events, errs, nil } // Close closes the underlying gRPC connection. func (c *GalaxyClient) Close() error { if c == nil || c.conn == nil { return nil } return c.conn.Close() } func (c *GalaxyClient) callContext(ctx context.Context) (context.Context, context.CancelFunc) { return callContext(ctx, c.opts.CallTimeout) }