555fe4c0ba
Client.Go-004: ran gofmt on alarms_test.go and galaxy_test.go; the tree is now gofmt-clean. Client.Go-005/009/010: migrated Dial/DialGalaxy off the deprecated grpc.DialContext/WithBlock to grpc.NewClient via a shared dial helper, with a DialTimeout-bounded readiness probe to keep fail-fast semantics; shared callContext deadline arithmetic; updated the stale Dial doc comment. Test harnesses use passthrough:///bufnet for the NewClient default-scheme change. Client.Go-006: added GatewayError.Code() and an IsTransient(err) helper so callers can classify transient gRPC failures. Client.Go-007: newCorrelationID no longer returns an empty id when crypto/rand fails — it falls back to a non-empty time+counter id. Client.Go-008: added coverage_test.go for transport-credential resolution, callContext deadline arithmetic, and native value/array edge kinds. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
309 lines
9.6 KiB
Go
309 lines
9.6 KiB
Go
// Package mxgateway is the Go client for the MXAccess Gateway gRPC service.
|
|
//
|
|
// The package wraps the generated gRPC contract with session-oriented helpers
|
|
// for invoking MXAccess commands, streaming events, and browsing the Galaxy
|
|
// Repository. Authentication uses an API-key bearer token attached as gRPC
|
|
// metadata on every call.
|
|
//
|
|
// Typical use opens a Client with Dial, opens a Session, invokes commands such
|
|
// as Register, AddItem, Advise, and Write, and consumes events via
|
|
// SubscribeEvents. Galaxy Repository browse RPCs are exposed through
|
|
// GalaxyClient.
|
|
package mxgateway
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"errors"
|
|
"time"
|
|
|
|
pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/connectivity"
|
|
"google.golang.org/grpc/credentials"
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
"google.golang.org/protobuf/types/known/durationpb"
|
|
)
|
|
|
|
const (
|
|
defaultDialTimeout = 10 * time.Second
|
|
defaultCallTimeout = 30 * time.Second
|
|
)
|
|
|
|
// Client owns a gateway gRPC connection and exposes session-oriented helpers.
|
|
type Client struct {
|
|
conn *grpc.ClientConn
|
|
raw pb.MxAccessGatewayClient
|
|
opts Options
|
|
}
|
|
|
|
// Dial opens a gRPC connection to the gateway and configures auth metadata
|
|
// and transport security.
|
|
//
|
|
// The connection is created lazily with grpc.NewClient: the channel is not
|
|
// established until the first RPC (or the readiness probe below) needs it, so
|
|
// a gateway that is briefly unavailable at Dial time no longer turns into a
|
|
// hard error — the connection recovers when the gateway comes up. To preserve
|
|
// fail-fast behavior, Dial then runs an explicit readiness probe bounded by
|
|
// DialTimeout (default 10s, or ctx's deadline when sooner): it triggers the
|
|
// initial connect and waits for the channel to reach Ready, returning a
|
|
// *GatewayError if the gateway cannot be reached in that window. Cancelling
|
|
// ctx aborts the probe.
|
|
func Dial(ctx context.Context, opts Options) (*Client, error) {
|
|
conn, err := dial(ctx, opts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return NewClient(conn, opts), nil
|
|
}
|
|
|
|
// dial builds the shared gRPC connection used by both Client and GalaxyClient:
|
|
// it resolves transport credentials, assembles dial options, creates a lazy
|
|
// connection with grpc.NewClient, and runs the DialTimeout-bounded readiness
|
|
// probe so callers still fail fast when the gateway is unreachable.
|
|
func dial(ctx context.Context, opts Options) (*grpc.ClientConn, error) {
|
|
if opts.Endpoint == "" {
|
|
return nil, errors.New("mxgateway: endpoint is required")
|
|
}
|
|
|
|
transportCredentials, err := resolveTransportCredentials(opts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
dialOptions := []grpc.DialOption{
|
|
grpc.WithTransportCredentials(transportCredentials),
|
|
grpc.WithUnaryInterceptor(unaryAuthInterceptor(opts.APIKey)),
|
|
grpc.WithStreamInterceptor(streamAuthInterceptor(opts.APIKey)),
|
|
}
|
|
dialOptions = append(dialOptions, opts.DialOptions...)
|
|
|
|
conn, err := grpc.NewClient(opts.Endpoint, dialOptions...)
|
|
if err != nil {
|
|
return nil, &GatewayError{Op: "dial", Err: err}
|
|
}
|
|
|
|
if err := waitForReady(ctx, conn, opts.DialTimeout); err != nil {
|
|
_ = conn.Close()
|
|
return nil, &GatewayError{Op: "dial", Err: err}
|
|
}
|
|
|
|
return conn, nil
|
|
}
|
|
|
|
// waitForReady triggers the initial connect on conn and blocks until the
|
|
// channel reaches connectivity.Ready, the timeout elapses, or ctx is
|
|
// cancelled. The wait is bounded by dialTimeout when positive, otherwise by
|
|
// ctx's existing deadline, otherwise by defaultDialTimeout.
|
|
func waitForReady(ctx context.Context, conn *grpc.ClientConn, dialTimeout time.Duration) error {
|
|
probeCtx := ctx
|
|
cancel := func() {}
|
|
if dialTimeout > 0 {
|
|
probeCtx, cancel = context.WithTimeout(ctx, dialTimeout)
|
|
} else if _, ok := ctx.Deadline(); !ok {
|
|
probeCtx, cancel = context.WithTimeout(ctx, defaultDialTimeout)
|
|
}
|
|
defer cancel()
|
|
|
|
conn.Connect()
|
|
for {
|
|
state := conn.GetState()
|
|
if state == connectivity.Ready {
|
|
return nil
|
|
}
|
|
if !conn.WaitForStateChange(probeCtx, state) {
|
|
return probeCtx.Err()
|
|
}
|
|
}
|
|
}
|
|
|
|
// NewClient wraps an existing gRPC connection. The caller owns closing conn
|
|
// unless it calls Close on the returned Client.
|
|
func NewClient(conn *grpc.ClientConn, opts Options) *Client {
|
|
return &Client{
|
|
conn: conn,
|
|
raw: pb.NewMxAccessGatewayClient(conn),
|
|
opts: opts,
|
|
}
|
|
}
|
|
|
|
// RawClient returns the generated gRPC client for command-specific parity tests.
|
|
func (c *Client) RawClient() RawGatewayClient {
|
|
return c.raw
|
|
}
|
|
|
|
// OpenSession creates a gateway-backed MXAccess session.
|
|
func (c *Client) OpenSession(ctx context.Context, opts OpenSessionOptions) (*Session, error) {
|
|
reply, err := c.OpenSessionRaw(ctx, opts.Request())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return newSession(c, reply), nil
|
|
}
|
|
|
|
// OpenSessionRaw sends a raw OpenSession request and validates protocol status.
|
|
func (c *Client) OpenSessionRaw(ctx context.Context, req *OpenSessionRequest) (*OpenSessionReply, error) {
|
|
if req == nil {
|
|
return nil, errors.New("mxgateway: open session request is required")
|
|
}
|
|
|
|
callCtx, cancel := c.callContext(ctx)
|
|
defer cancel()
|
|
|
|
reply, err := c.raw.OpenSession(callCtx, req)
|
|
if err != nil {
|
|
return nil, &GatewayError{Op: "open session", Err: err}
|
|
}
|
|
if err := EnsureProtocolSuccess("open session", reply.GetProtocolStatus(), nil); err != nil {
|
|
return reply, err
|
|
}
|
|
|
|
return reply, nil
|
|
}
|
|
|
|
// Invoke sends a raw MXAccess command request and validates protocol and
|
|
// MXAccess status fields while preserving the raw reply on typed errors.
|
|
func (c *Client) Invoke(ctx context.Context, req *MxCommandRequest) (*MxCommandReply, error) {
|
|
if req == nil {
|
|
return nil, errors.New("mxgateway: command request is required")
|
|
}
|
|
|
|
callCtx, cancel := c.callContext(ctx)
|
|
defer cancel()
|
|
|
|
reply, err := c.raw.Invoke(callCtx, req)
|
|
if err != nil {
|
|
return nil, &GatewayError{Op: "invoke", Err: err}
|
|
}
|
|
if err := EnsureProtocolSuccess("invoke", reply.GetProtocolStatus(), reply); err != nil {
|
|
return reply, err
|
|
}
|
|
if err := EnsureMxAccessSuccess("invoke", reply); err != nil {
|
|
return reply, err
|
|
}
|
|
|
|
return reply, nil
|
|
}
|
|
|
|
// CloseSessionRaw sends a raw CloseSession request and validates protocol
|
|
// status.
|
|
func (c *Client) CloseSessionRaw(ctx context.Context, req *CloseSessionRequest) (*CloseSessionReply, error) {
|
|
if req == nil {
|
|
return nil, errors.New("mxgateway: close session request is required")
|
|
}
|
|
|
|
callCtx, cancel := c.callContext(ctx)
|
|
defer cancel()
|
|
|
|
reply, err := c.raw.CloseSession(callCtx, req)
|
|
if err != nil {
|
|
return nil, &GatewayError{Op: "close session", Err: err}
|
|
}
|
|
if err := EnsureProtocolSuccess("close session", reply.GetProtocolStatus(), nil); err != nil {
|
|
return reply, err
|
|
}
|
|
|
|
return reply, nil
|
|
}
|
|
|
|
// StreamEventsRaw starts the generated event stream for callers that need direct
|
|
// control over Recv.
|
|
func (c *Client) StreamEventsRaw(ctx context.Context, req *StreamEventsRequest) (RawEventStream, error) {
|
|
if req == nil {
|
|
return nil, errors.New("mxgateway: stream events request is required")
|
|
}
|
|
|
|
stream, err := c.raw.StreamEvents(ctx, req)
|
|
if err != nil {
|
|
return nil, &GatewayError{Op: "stream events", Err: err}
|
|
}
|
|
|
|
return stream, nil
|
|
}
|
|
|
|
// Close closes the underlying gRPC connection.
|
|
func (c *Client) Close() error {
|
|
if c == nil || c.conn == nil {
|
|
return nil
|
|
}
|
|
|
|
return c.conn.Close()
|
|
}
|
|
|
|
func (c *Client) callContext(ctx context.Context) (context.Context, context.CancelFunc) {
|
|
return callContext(ctx, c.opts.CallTimeout)
|
|
}
|
|
|
|
// callContext derives a per-RPC context from ctx, applying callTimeout: zero
|
|
// uses defaultCallTimeout, a negative value disables the bound entirely, and a
|
|
// caller-supplied deadline that is already sooner than the derived timeout is
|
|
// kept as-is rather than being lengthened.
|
|
func callContext(ctx context.Context, callTimeout time.Duration) (context.Context, context.CancelFunc) {
|
|
timeout := callTimeout
|
|
if timeout == 0 {
|
|
timeout = defaultCallTimeout
|
|
}
|
|
if timeout < 0 {
|
|
return ctx, func() {}
|
|
}
|
|
if deadline, ok := ctx.Deadline(); ok {
|
|
timeoutDeadline := time.Now().Add(timeout)
|
|
if deadline.Before(timeoutDeadline) {
|
|
return ctx, func() {}
|
|
}
|
|
}
|
|
return context.WithTimeout(ctx, timeout)
|
|
}
|
|
|
|
func resolveTransportCredentials(opts Options) (credentials.TransportCredentials, error) {
|
|
if opts.TransportCredentials != nil {
|
|
return opts.TransportCredentials, nil
|
|
}
|
|
if opts.Plaintext {
|
|
return insecure.NewCredentials(), nil
|
|
}
|
|
if opts.CACertFile != "" {
|
|
return credentials.NewClientTLSFromFile(opts.CACertFile, opts.ServerNameOverride)
|
|
}
|
|
if opts.TLSConfig != nil {
|
|
cfg := opts.TLSConfig.Clone()
|
|
if opts.ServerNameOverride != "" {
|
|
cfg.ServerName = opts.ServerNameOverride
|
|
}
|
|
return credentials.NewTLS(cfg), nil
|
|
}
|
|
|
|
return credentials.NewTLS(&tls.Config{
|
|
MinVersion: tls.VersionTLS12,
|
|
ServerName: opts.ServerNameOverride,
|
|
}), nil
|
|
}
|
|
|
|
// OpenSessionOptions describes fields used to create an OpenSessionRequest.
|
|
type OpenSessionOptions struct {
|
|
// RequestedBackend selects the gateway worker backend (empty for default).
|
|
RequestedBackend string
|
|
// ClientSessionName is a human-readable name recorded on the session.
|
|
ClientSessionName string
|
|
// ClientCorrelationID echoes through gateway logs and replies for tracing.
|
|
ClientCorrelationID string
|
|
// CommandTimeout sets the per-command timeout the gateway forwards to the
|
|
// worker; zero leaves the gateway default in place.
|
|
CommandTimeout time.Duration
|
|
}
|
|
|
|
// Request returns the raw protobuf OpenSessionRequest for these options.
|
|
func (o OpenSessionOptions) Request() *OpenSessionRequest {
|
|
req := &OpenSessionRequest{
|
|
RequestedBackend: o.RequestedBackend,
|
|
ClientSessionName: o.ClientSessionName,
|
|
ClientCorrelationId: o.ClientCorrelationID,
|
|
}
|
|
if o.CommandTimeout > 0 {
|
|
req.CommandTimeout = durationpb.New(o.CommandTimeout)
|
|
}
|
|
return req
|
|
}
|