// Package mxgateway is the Go client for the MXAccess Gateway gRPC service.
//
// The package wraps the generated gRPC contract with session-oriented helpers
// for invoking MXAccess commands, streaming events, and browsing the Galaxy
// Repository. Authentication uses an API-key bearer token attached as gRPC
// metadata on every call.
//
// Typical use opens a Client with Dial, opens a Session, invokes commands such
// as Register, AddItem, Advise, and Write, and consumes events via
// SubscribeEvents. Galaxy Repository browse RPCs are exposed through
// GalaxyClient.
package mxgateway

import (
	"context"
	"crypto/tls"
	"errors"
	"time"

	pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated"
	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/protobuf/types/known/durationpb"
)

const (
	// defaultDialTimeout bounds the Dial readiness probe when no DialTimeout
	// is configured and the caller's context carries no deadline.
	defaultDialTimeout = 10 * time.Second
	// defaultCallTimeout bounds each unary RPC when CallTimeout is zero.
	defaultCallTimeout = 30 * time.Second
)

// Client owns a gateway gRPC connection and exposes session-oriented helpers.
type Client struct {
	// conn is the underlying gRPC connection; Close closes it.
	conn *grpc.ClientConn
	// raw is the generated gateway client bound to conn.
	raw pb.MxAccessGatewayClient
	// opts holds the options supplied at construction; its CallTimeout
	// drives the per-call deadline.
	opts Options
}

// Dial opens a gRPC connection to the gateway and configures auth metadata
// and transport security.
//
// The connection is created lazily with grpc.NewClient: the channel is not
// established until the first RPC (or the readiness probe below) needs it.
// To preserve fail-fast behavior, Dial then runs an explicit readiness probe:
// it triggers the initial connect and waits for the channel to reach Ready,
// returning a *GatewayError if the gateway cannot be reached in time. The
// probe is bounded by DialTimeout when it is positive (or by ctx's deadline
// when that is sooner); when DialTimeout is unset it is bounded by ctx's
// deadline if ctx has one, and by a 10s default otherwise. Cancelling ctx
// aborts the probe.
func Dial(ctx context.Context, opts Options) (*Client, error) { conn, err := dial(ctx, opts) if err != nil { return nil, err } return NewClient(conn, opts), nil } // dial builds the shared gRPC connection used by both Client and GalaxyClient: // it resolves transport credentials, assembles dial options, creates a lazy // connection with grpc.NewClient, and runs the DialTimeout-bounded readiness // probe so callers still fail fast when the gateway is unreachable. func dial(ctx context.Context, opts Options) (*grpc.ClientConn, error) { if opts.Endpoint == "" { return nil, errors.New("mxgateway: endpoint is required") } transportCredentials, err := resolveTransportCredentials(opts) if err != nil { return nil, err } dialOptions := []grpc.DialOption{ grpc.WithTransportCredentials(transportCredentials), grpc.WithUnaryInterceptor(unaryAuthInterceptor(opts.APIKey)), grpc.WithStreamInterceptor(streamAuthInterceptor(opts.APIKey)), } dialOptions = append(dialOptions, opts.DialOptions...) conn, err := grpc.NewClient(opts.Endpoint, dialOptions...) if err != nil { return nil, &GatewayError{Op: "dial", Err: err} } if err := waitForReady(ctx, conn, opts.DialTimeout); err != nil { _ = conn.Close() return nil, &GatewayError{Op: "dial", Err: err} } return conn, nil } // waitForReady triggers the initial connect on conn and blocks until the // channel reaches connectivity.Ready, the timeout elapses, or ctx is // cancelled. The wait is bounded by dialTimeout when positive, otherwise by // ctx's existing deadline, otherwise by defaultDialTimeout. 
func waitForReady(ctx context.Context, conn *grpc.ClientConn, dialTimeout time.Duration) error { probeCtx := ctx cancel := func() {} if dialTimeout > 0 { probeCtx, cancel = context.WithTimeout(ctx, dialTimeout) } else if _, ok := ctx.Deadline(); !ok { probeCtx, cancel = context.WithTimeout(ctx, defaultDialTimeout) } defer cancel() conn.Connect() for { state := conn.GetState() if state == connectivity.Ready { return nil } if !conn.WaitForStateChange(probeCtx, state) { return probeCtx.Err() } } } // NewClient wraps an existing gRPC connection. The caller owns closing conn // unless it calls Close on the returned Client. func NewClient(conn *grpc.ClientConn, opts Options) *Client { return &Client{ conn: conn, raw: pb.NewMxAccessGatewayClient(conn), opts: opts, } } // RawClient returns the generated gRPC client for command-specific parity tests. func (c *Client) RawClient() RawGatewayClient { return c.raw } // OpenSession creates a gateway-backed MXAccess session. func (c *Client) OpenSession(ctx context.Context, opts OpenSessionOptions) (*Session, error) { reply, err := c.OpenSessionRaw(ctx, opts.Request()) if err != nil { return nil, err } return newSession(c, reply), nil } // OpenSessionRaw sends a raw OpenSession request and validates protocol status. func (c *Client) OpenSessionRaw(ctx context.Context, req *OpenSessionRequest) (*OpenSessionReply, error) { if req == nil { return nil, errors.New("mxgateway: open session request is required") } callCtx, cancel := c.callContext(ctx) defer cancel() reply, err := c.raw.OpenSession(callCtx, req) if err != nil { return nil, &GatewayError{Op: "open session", Err: err} } if err := EnsureProtocolSuccess("open session", reply.GetProtocolStatus(), nil); err != nil { return reply, err } return reply, nil } // Invoke sends a raw MXAccess command request and validates protocol and // MXAccess status fields while preserving the raw reply on typed errors. 
func (c *Client) Invoke(ctx context.Context, req *MxCommandRequest) (*MxCommandReply, error) { if req == nil { return nil, errors.New("mxgateway: command request is required") } callCtx, cancel := c.callContext(ctx) defer cancel() reply, err := c.raw.Invoke(callCtx, req) if err != nil { return nil, &GatewayError{Op: "invoke", Err: err} } if err := EnsureProtocolSuccess("invoke", reply.GetProtocolStatus(), reply); err != nil { return reply, err } if err := EnsureMxAccessSuccess("invoke", reply); err != nil { return reply, err } return reply, nil } // CloseSessionRaw sends a raw CloseSession request and validates protocol // status. func (c *Client) CloseSessionRaw(ctx context.Context, req *CloseSessionRequest) (*CloseSessionReply, error) { if req == nil { return nil, errors.New("mxgateway: close session request is required") } callCtx, cancel := c.callContext(ctx) defer cancel() reply, err := c.raw.CloseSession(callCtx, req) if err != nil { return nil, &GatewayError{Op: "close session", Err: err} } if err := EnsureProtocolSuccess("close session", reply.GetProtocolStatus(), nil); err != nil { return reply, err } return reply, nil } // StreamEventsRaw starts the generated event stream for callers that need direct // control over Recv. func (c *Client) StreamEventsRaw(ctx context.Context, req *StreamEventsRequest) (RawEventStream, error) { if req == nil { return nil, errors.New("mxgateway: stream events request is required") } stream, err := c.raw.StreamEvents(ctx, req) if err != nil { return nil, &GatewayError{Op: "stream events", Err: err} } return stream, nil } // Close closes the underlying gRPC connection. 
func (c *Client) Close() error { if c == nil || c.conn == nil { return nil } return c.conn.Close() } func (c *Client) callContext(ctx context.Context) (context.Context, context.CancelFunc) { return callContext(ctx, c.opts.CallTimeout) } // callContext derives a per-RPC context from ctx, applying callTimeout: zero // uses defaultCallTimeout, a negative value disables the bound entirely, and a // caller-supplied deadline that is already sooner than the derived timeout is // kept as-is rather than being lengthened. func callContext(ctx context.Context, callTimeout time.Duration) (context.Context, context.CancelFunc) { timeout := callTimeout if timeout == 0 { timeout = defaultCallTimeout } if timeout < 0 { return ctx, func() {} } if deadline, ok := ctx.Deadline(); ok { timeoutDeadline := time.Now().Add(timeout) if deadline.Before(timeoutDeadline) { return ctx, func() {} } } return context.WithTimeout(ctx, timeout) } func resolveTransportCredentials(opts Options) (credentials.TransportCredentials, error) { if opts.TransportCredentials != nil { return opts.TransportCredentials, nil } if opts.Plaintext { return insecure.NewCredentials(), nil } if opts.CACertFile != "" { return credentials.NewClientTLSFromFile(opts.CACertFile, opts.ServerNameOverride) } if opts.TLSConfig != nil { cfg := opts.TLSConfig.Clone() if opts.ServerNameOverride != "" { cfg.ServerName = opts.ServerNameOverride } return credentials.NewTLS(cfg), nil } return credentials.NewTLS(&tls.Config{ MinVersion: tls.VersionTLS12, ServerName: opts.ServerNameOverride, }), nil } // OpenSessionOptions describes fields used to create an OpenSessionRequest. type OpenSessionOptions struct { // RequestedBackend selects the gateway worker backend (empty for default). RequestedBackend string // ClientSessionName is a human-readable name recorded on the session. ClientSessionName string // ClientCorrelationID echoes through gateway logs and replies for tracing. 
ClientCorrelationID string // CommandTimeout sets the per-command timeout the gateway forwards to the // worker; zero leaves the gateway default in place. CommandTimeout time.Duration } // Request returns the raw protobuf OpenSessionRequest for these options. func (o OpenSessionOptions) Request() *OpenSessionRequest { req := &OpenSessionRequest{ RequestedBackend: o.RequestedBackend, ClientSessionName: o.ClientSessionName, ClientCorrelationId: o.ClientCorrelationID, } if o.CommandTimeout > 0 { req.CommandTimeout = durationpb.New(o.CommandTimeout) } return req }