diff --git a/clients/go/README.md b/clients/go/README.md index 497409a..5b0783d 100644 --- a/clients/go/README.md +++ b/clients/go/README.md @@ -90,6 +90,19 @@ events may be lost. Raw protobuf messages remain available through the `errors.As` for `GatewayError`, `CommandError`, and `MxAccessError`; command errors preserve the raw reply. +`Dial` and `DialGalaxy` create the connection lazily (`grpc.NewClient`): a +gateway that is briefly unavailable no longer turns into a hard error — the +connection recovers once the gateway comes up. To keep fail-fast behavior, +both run a readiness probe bounded by `DialTimeout` or the context deadline, +whichever is sooner (default 10s when neither is set), and return a +`*GatewayError` if the gateway cannot be reached in that window. + +For retry, timeout, and auth handling, `GatewayError.Code()` exposes the +wrapped gRPC `codes.Code`, and `mxgateway.IsTransient(err)` reports whether a +failure (`Unavailable`, `DeadlineExceeded`, `ResourceExhausted`, `Aborted`) +may succeed on retry — so callers do not have to unwrap the error and call +`status.Code` themselves. 
+ ## Galaxy Repository browse The `GalaxyRepository` service (proto package `galaxy_repository.v1`) is a diff --git a/clients/go/mxgateway/alarms_test.go b/clients/go/mxgateway/alarms_test.go index 46b0c1f..5c5ae1f 100644 --- a/clients/go/mxgateway/alarms_test.go +++ b/clients/go/mxgateway/alarms_test.go @@ -150,8 +150,8 @@ func TestQueryActiveAlarmsPassesFilterPrefix(t *testing.T) { defer cleanup() stream, err := client.QueryActiveAlarms(context.Background(), &pb.QueryActiveAlarmsRequest{ - SessionId: "session-1", - AlarmFilterPrefix: "Tank01.", + SessionId: "session-1", + AlarmFilterPrefix: "Tank01.", }) if err != nil { t.Fatalf("QueryActiveAlarms() error = %v", err) @@ -221,8 +221,10 @@ func newBufconnClientWithAlarms(t *testing.T, fake *fakeGatewayWithAlarms) (*Cli dialer := func(ctx context.Context, _ string) (net.Conn, error) { return listener.DialContext(ctx) } + // grpc.NewClient defaults to the dns scheme; use passthrough so the + // bufconn fake target reaches the context dialer unresolved. client, err := Dial(context.Background(), Options{ - Endpoint: "bufnet", + Endpoint: "passthrough:///bufnet", APIKey: "test-api-key", Plaintext: true, DialOptions: []grpc.DialOption{grpc.WithContextDialer(dialer)}, diff --git a/clients/go/mxgateway/client.go b/clients/go/mxgateway/client.go index 2aac029..8c00032 100644 --- a/clients/go/mxgateway/client.go +++ b/clients/go/mxgateway/client.go @@ -19,6 +19,7 @@ import ( pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated" "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/types/known/durationpb" @@ -36,22 +37,36 @@ type Client struct { opts Options } -// Dial opens a gRPC connection to the gateway and configures auth metadata, -// transport security, and blocking dial cancellation from ctx. 
+// Dial opens a gRPC connection to the gateway and configures auth metadata +// and transport security. +// +// The connection is created lazily with grpc.NewClient: the channel is not +// established until the first RPC (or the readiness probe below) needs it, so +// a gateway that is briefly unavailable at Dial time no longer turns into a +// hard error — the connection recovers when the gateway comes up. To preserve +// fail-fast behavior, Dial then runs an explicit readiness probe bounded by +// DialTimeout or ctx's deadline, whichever is sooner (10s when neither is set): it triggers the +// initial connect and waits for the channel to reach Ready, returning a +// *GatewayError if the gateway cannot be reached in that window. Cancelling +// ctx aborts the probe. func Dial(ctx context.Context, opts Options) (*Client, error) { + conn, err := dial(ctx, opts) + if err != nil { + return nil, err + } + + return NewClient(conn, opts), nil +} + +// dial builds the shared gRPC connection used by both Client and GalaxyClient: +// it resolves transport credentials, assembles dial options, creates a lazy +// connection with grpc.NewClient, and runs the DialTimeout-bounded readiness +// probe so callers still fail fast when the gateway is unreachable. 
+func dial(ctx context.Context, opts Options) (*grpc.ClientConn, error) { if opts.Endpoint == "" { return nil, errors.New("mxgateway: endpoint is required") } - dialCtx := ctx - cancel := func() {} - if opts.DialTimeout > 0 { - dialCtx, cancel = context.WithTimeout(ctx, opts.DialTimeout) - } else if _, ok := ctx.Deadline(); !ok { - dialCtx, cancel = context.WithTimeout(ctx, defaultDialTimeout) - } - defer cancel() - transportCredentials, err := resolveTransportCredentials(opts) if err != nil { return nil, err @@ -61,16 +76,46 @@ func Dial(ctx context.Context, opts Options) (*Client, error) { grpc.WithTransportCredentials(transportCredentials), grpc.WithUnaryInterceptor(unaryAuthInterceptor(opts.APIKey)), grpc.WithStreamInterceptor(streamAuthInterceptor(opts.APIKey)), - grpc.WithBlock(), } dialOptions = append(dialOptions, opts.DialOptions...) - conn, err := grpc.DialContext(dialCtx, opts.Endpoint, dialOptions...) + conn, err := grpc.NewClient(opts.Endpoint, dialOptions...) if err != nil { return nil, &GatewayError{Op: "dial", Err: err} } - return NewClient(conn, opts), nil + if err := waitForReady(ctx, conn, opts.DialTimeout); err != nil { + _ = conn.Close() + return nil, &GatewayError{Op: "dial", Err: err} + } + + return conn, nil +} + +// waitForReady triggers the initial connect on conn and blocks until the +// channel reaches connectivity.Ready, the timeout elapses, or ctx is +// cancelled. The wait is bounded by dialTimeout when positive, otherwise by +// ctx's existing deadline, otherwise by defaultDialTimeout. 
+func waitForReady(ctx context.Context, conn *grpc.ClientConn, dialTimeout time.Duration) error { + probeCtx := ctx + cancel := func() {} + if dialTimeout > 0 { + probeCtx, cancel = context.WithTimeout(ctx, dialTimeout) + } else if _, ok := ctx.Deadline(); !ok { + probeCtx, cancel = context.WithTimeout(ctx, defaultDialTimeout) + } + defer cancel() + + conn.Connect() + for { + state := conn.GetState() + if state == connectivity.Ready { + return nil + } + if !conn.WaitForStateChange(probeCtx, state) { + return probeCtx.Err() + } + } } // NewClient wraps an existing gRPC connection. The caller owns closing conn @@ -188,7 +233,15 @@ func (c *Client) Close() error { } func (c *Client) callContext(ctx context.Context) (context.Context, context.CancelFunc) { - timeout := c.opts.CallTimeout + return callContext(ctx, c.opts.CallTimeout) +} + +// callContext derives a per-RPC context from ctx, applying callTimeout: zero +// uses defaultCallTimeout, a negative value disables the bound entirely, and a +// caller-supplied deadline that is already sooner than the derived timeout is +// kept as-is rather than being lengthened. +func callContext(ctx context.Context, callTimeout time.Duration) (context.Context, context.CancelFunc) { + timeout := callTimeout if timeout == 0 { timeout = defaultCallTimeout } diff --git a/clients/go/mxgateway/client_session_test.go b/clients/go/mxgateway/client_session_test.go index 17c3ff5..016caa8 100644 --- a/clients/go/mxgateway/client_session_test.go +++ b/clients/go/mxgateway/client_session_test.go @@ -292,8 +292,11 @@ func newBufconnClient(t *testing.T, fake *fakeGatewayServer) (*Client, func()) { dialer := func(ctx context.Context, _ string) (net.Conn, error) { return listener.DialContext(ctx) } + // grpc.NewClient defaults the target scheme to dns; the bufconn fake name + // is not DNS-resolvable, so use the passthrough scheme to hand the target + // straight to the context dialer. 
client, err := Dial(context.Background(), Options{ - Endpoint: "bufnet", + Endpoint: "passthrough:///bufnet", APIKey: "test-api-key", Plaintext: true, DialOptions: []grpc.DialOption{ diff --git a/clients/go/mxgateway/coverage_test.go b/clients/go/mxgateway/coverage_test.go new file mode 100644 index 0000000..45ffe9a --- /dev/null +++ b/clients/go/mxgateway/coverage_test.go @@ -0,0 +1,401 @@ +package mxgateway + +import ( + "context" + "crypto/tls" + "errors" + "net" + "reflect" + "strings" + "testing" + "time" + + pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/timestamppb" +) + +// --- Client.Go-008: resolveTransportCredentials precedence ----------------- + +// TestResolveTransportCredentialsPrecedence covers every branch of +// resolveTransportCredentials, which previously only had the Plaintext path +// exercised. 
+func TestResolveTransportCredentialsPrecedence(t *testing.T) { + custom := insecure.NewCredentials() + + t.Run("TransportCredentialsWins", func(t *testing.T) { + creds, err := resolveTransportCredentials(Options{ + TransportCredentials: custom, + Plaintext: true, // must be ignored + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if creds != custom { + t.Fatal("expected the explicit TransportCredentials to be returned as-is") + } + }) + + t.Run("Plaintext", func(t *testing.T) { + creds, err := resolveTransportCredentials(Options{Plaintext: true}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got := creds.Info().SecurityProtocol; got != "insecure" { + t.Fatalf("expected insecure credentials, got security protocol %q", got) + } + }) + + t.Run("CACertFileMissingErrors", func(t *testing.T) { + _, err := resolveTransportCredentials(Options{CACertFile: "does-not-exist.pem"}) + if err == nil { + t.Fatal("expected an error for a missing CA cert file") + } + }) + + t.Run("TLSConfigWithServerNameOverride", func(t *testing.T) { + creds, err := resolveTransportCredentials(Options{ + TLSConfig: &tls.Config{MinVersion: tls.VersionTLS13}, + ServerNameOverride: "gateway.internal", + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got := creds.Info().ServerName; got != "gateway.internal" { + t.Fatalf("expected ServerName override to be applied, got %q", got) + } + }) + + t.Run("DefaultTLSFloor", func(t *testing.T) { + creds, err := resolveTransportCredentials(Options{ServerNameOverride: "host"}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got := creds.Info().SecurityProtocol; got != "tls" { + t.Fatalf("expected the default TLS credentials, got %q", got) + } + }) +} + +// TestResolveTransportCredentialsDoesNotMutateTLSConfig confirms the supplied +// TLSConfig is cloned, not mutated, when ServerNameOverride is applied. 
+func TestResolveTransportCredentialsDoesNotMutateTLSConfig(t *testing.T) { + cfg := &tls.Config{MinVersion: tls.VersionTLS12} + if _, err := resolveTransportCredentials(Options{ + TLSConfig: cfg, + ServerNameOverride: "override", + }); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if cfg.ServerName != "" { + t.Fatalf("resolveTransportCredentials mutated the caller's TLSConfig (ServerName=%q)", cfg.ServerName) + } +} + +// --- Client.Go-008: callContext deadline arithmetic ------------------------ + +// TestCallContextDeadlineArithmetic covers the shared callContext deadline +// logic, including the negative-timeout disable case and the +// caller-deadline-is-sooner case. +func TestCallContextDeadlineArithmetic(t *testing.T) { + t.Run("ZeroUsesDefault", func(t *testing.T) { + ctx, cancel := callContext(context.Background(), 0) + defer cancel() + deadline, ok := ctx.Deadline() + if !ok { + t.Fatal("expected a deadline for the default timeout") + } + remaining := time.Until(deadline) + if remaining <= 0 || remaining > defaultCallTimeout+time.Second { + t.Fatalf("default deadline out of range: %v", remaining) + } + }) + + t.Run("NegativeDisablesBound", func(t *testing.T) { + base := context.Background() + ctx, cancel := callContext(base, -1) + defer cancel() + if _, ok := ctx.Deadline(); ok { + t.Fatal("a negative timeout must disable the deadline entirely") + } + if ctx != base { + t.Fatal("a negative timeout must return the caller context unchanged") + } + }) + + t.Run("PositiveAppliesTimeout", func(t *testing.T) { + ctx, cancel := callContext(context.Background(), 5*time.Second) + defer cancel() + deadline, ok := ctx.Deadline() + if !ok { + t.Fatal("expected a deadline") + } + remaining := time.Until(deadline) + if remaining <= 0 || remaining > 5*time.Second+time.Second { + t.Fatalf("deadline out of range: %v", remaining) + } + }) + + t.Run("CallerDeadlineSoonerIsKept", func(t *testing.T) { + base, baseCancel := 
context.WithTimeout(context.Background(), 100*time.Millisecond) + defer baseCancel() + ctx, cancel := callContext(base, 30*time.Second) + defer cancel() + if ctx != base { + t.Fatal("a caller deadline sooner than the timeout must be kept as-is") + } + }) + + t.Run("CallerDeadlineLaterIsShortened", func(t *testing.T) { + base, baseCancel := context.WithTimeout(context.Background(), time.Hour) + defer baseCancel() + ctx, cancel := callContext(base, time.Second) + defer cancel() + deadline, ok := ctx.Deadline() + if !ok { + t.Fatal("expected a deadline") + } + if remaining := time.Until(deadline); remaining > 2*time.Second { + t.Fatalf("expected the shorter timeout to win, got %v remaining", remaining) + } + }) +} + +// --- Client.Go-008: NativeValue / NativeArray edge branches ---------------- + +// TestNativeValueEdgeKinds covers the array, raw-bytes, null, and +// nil-input branches of NativeValue. +func TestNativeValueEdgeKinds(t *testing.T) { + t.Run("NilInput", func(t *testing.T) { + got, err := NativeValue(nil) + if err != nil || got != nil { + t.Fatalf("NativeValue(nil) = (%v, %v), want (nil, nil)", got, err) + } + }) + + t.Run("ExplicitNull", func(t *testing.T) { + got, err := NativeValue(&pb.MxValue{IsNull: true}) + if err != nil || got != nil { + t.Fatalf("NativeValue(null) = (%v, %v), want (nil, nil)", got, err) + } + }) + + t.Run("RawBytes", func(t *testing.T) { + raw := []byte{0x01, 0x02, 0x03} + got, err := NativeValue(&pb.MxValue{Kind: &pb.MxValue_RawValue{RawValue: raw}}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + gotBytes, ok := got.([]byte) + if !ok || !reflect.DeepEqual(gotBytes, raw) { + t.Fatalf("NativeValue raw = %v, want %v", got, raw) + } + // The result must be a copy, not aliasing the protobuf field. 
+ gotBytes[0] = 0xFF + if raw[0] != 0x01 { + t.Fatal("NativeValue raw result aliases the protobuf backing array") + } + }) + + t.Run("ArrayValue", func(t *testing.T) { + value := &pb.MxValue{Kind: &pb.MxValue_ArrayValue{ + ArrayValue: &pb.MxArray{Values: &pb.MxArray_Int32Values{ + Int32Values: &pb.Int32Array{Values: []int32{7, 8}}, + }}, + }} + got, err := NativeValue(value) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !reflect.DeepEqual(got, []int32{7, 8}) { + t.Fatalf("NativeValue array = %v, want [7 8]", got) + } + }) +} + +// TestNativeArrayEdgeKinds covers the nil, raw-bytes, timestamp-with-nil, and +// unsupported-kind branches of NativeArray. +func TestNativeArrayEdgeKinds(t *testing.T) { + t.Run("NilInput", func(t *testing.T) { + got, err := NativeArray(nil) + if err != nil || got != nil { + t.Fatalf("NativeArray(nil) = (%v, %v), want (nil, nil)", got, err) + } + }) + + t.Run("RawValues", func(t *testing.T) { + got, err := NativeArray(&pb.MxArray{Values: &pb.MxArray_RawValues{ + RawValues: &pb.RawArray{Values: [][]byte{{0x0A}, {0x0B}}}, + }}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + want := [][]byte{{0x0A}, {0x0B}} + if !reflect.DeepEqual(got, want) { + t.Fatalf("NativeArray raw = %v, want %v", got, want) + } + }) + + t.Run("TimestampWithNilEntry", func(t *testing.T) { + got, err := NativeArray(&pb.MxArray{Values: &pb.MxArray_TimestampValues{ + TimestampValues: &pb.TimestampArray{Values: []*timestamppb.Timestamp{nil}}, + }}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + times, ok := got.([]time.Time) + if !ok || len(times) != 1 || !times[0].IsZero() { + t.Fatalf("NativeArray timestamp-with-nil = %v, want [zero-time]", got) + } + }) + + t.Run("UnsupportedKind", func(t *testing.T) { + // An MxArray with no oneof set hits the default branch. 
+ _, err := NativeArray(&pb.MxArray{}) + if err == nil { + t.Fatal("expected an error for an MxArray with no values set") + } + if !strings.Contains(err.Error(), "unsupported array value kind") { + t.Fatalf("unexpected error text: %v", err) + } + }) +} + +// TestNativeValueUnsupportedKind covers the default branch of NativeValue. +func TestNativeValueUnsupportedKind(t *testing.T) { + // An MxValue with no oneof Kind set and IsNull false hits the default. + _, err := NativeValue(&pb.MxValue{}) + if err == nil { + t.Fatal("expected an error for an MxValue with no kind set") + } + if !strings.Contains(err.Error(), "unsupported value kind") { + t.Fatalf("unexpected error text: %v", err) + } +} + +// --- Client.Go-005: dial migration ----------------------------------------- + +// TestDialFailsFastWhenGatewayUnreachable confirms that after the migration to +// grpc.NewClient the DialTimeout-bounded readiness probe still fails fast (and +// wraps the failure in *GatewayError) when the gateway cannot be reached. +func TestDialFailsFastWhenGatewayUnreachable(t *testing.T) { + dialer := func(ctx context.Context, _ string) (net.Conn, error) { + return nil, errors.New("connection refused") + } + start := time.Now() + client, err := Dial(context.Background(), Options{ + Endpoint: "passthrough:///unreachable", + APIKey: "k", + Plaintext: true, + DialTimeout: 500 * time.Millisecond, + DialOptions: []grpc.DialOption{grpc.WithContextDialer(dialer)}, + }) + elapsed := time.Since(start) + if err == nil { + client.Close() + t.Fatal("expected Dial to fail for an unreachable gateway") + } + var gwErr *GatewayError + if !errors.As(err, &gwErr) || gwErr.Op != "dial" { + t.Fatalf("expected a *GatewayError with Op=dial, got %#v", err) + } + if elapsed > 5*time.Second { + t.Fatalf("Dial did not honor DialTimeout: took %v", elapsed) + } +} + +// TestDialReadinessProbeReachesReady confirms the readiness probe succeeds +// against a live (bufconn) gateway, i.e. 
the lazy grpc.NewClient connection is +// driven to Ready before Dial returns. +func TestDialReadinessProbeReachesReady(t *testing.T) { + client, cleanup := newBufconnClient(t, &fakeGatewayServer{ + openReply: &pb.OpenSessionReply{}, + }) + defer cleanup() + if client == nil { + t.Fatal("expected a connected client") + } +} + +// --- Client.Go-006: error taxonomy ---------------------------------------- + +// TestGatewayErrorCode confirms GatewayError.Code surfaces the wrapped gRPC +// status code without the caller unwrapping it. +func TestGatewayErrorCode(t *testing.T) { + var nilErr *GatewayError + if got := nilErr.Code(); got != codes.OK { + t.Fatalf("nil GatewayError.Code() = %v, want OK", got) + } + + gwErr := &GatewayError{Op: "invoke", Err: status.Error(codes.Unavailable, "down")} + if got := gwErr.Code(); got != codes.Unavailable { + t.Fatalf("GatewayError.Code() = %v, want Unavailable", got) + } + + plain := &GatewayError{Op: "dial", Err: errors.New("boom")} + if got := plain.Code(); got != codes.Unknown { + t.Fatalf("GatewayError.Code() for a non-status error = %v, want Unknown", got) + } +} + +// TestIsTransient verifies the transient/permanent classification including +// the unwrap-through-GatewayError path. 
+func TestIsTransient(t *testing.T) { + tests := []struct { + name string + err error + want bool + }{ + {name: "nil", err: nil, want: false}, + {name: "unavailable wrapped", err: &GatewayError{Op: "invoke", Err: status.Error(codes.Unavailable, "x")}, want: true}, + {name: "deadline wrapped", err: &GatewayError{Op: "invoke", Err: status.Error(codes.DeadlineExceeded, "x")}, want: true}, + {name: "resource exhausted", err: &GatewayError{Err: status.Error(codes.ResourceExhausted, "x")}, want: true}, + {name: "unauthenticated permanent", err: &GatewayError{Err: status.Error(codes.Unauthenticated, "x")}, want: false}, + {name: "invalid argument permanent", err: &GatewayError{Err: status.Error(codes.InvalidArgument, "x")}, want: false}, + {name: "bare status unavailable", err: status.Error(codes.Unavailable, "x"), want: true}, + {name: "plain error", err: errors.New("nope"), want: false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := IsTransient(tt.err); got != tt.want { + t.Fatalf("IsTransient(%v) = %v, want %v", tt.err, got, tt.want) + } + }) + } +} + +// --- Client.Go-007: correlation id fallback -------------------------------- + +// TestNewCorrelationIDUsesRandEntropy confirms the happy path yields a +// 32-hex-character id. +func TestNewCorrelationIDUsesRandEntropy(t *testing.T) { + id := newCorrelationID() + if len(id) != 32 { + t.Fatalf("expected a 32-char hex id, got %q (len %d)", id, len(id)) + } +} + +// TestNewCorrelationIDFallsBackOnRandFailure reproduces Client.Go-007: when +// crypto/rand fails, newCorrelationID must not return an empty string but a +// unique, non-empty fallback id so the command stays traceable. 
+func TestNewCorrelationIDFallsBackOnRandFailure(t *testing.T) { + original := randRead + randRead = func([]byte) (int, error) { return 0, errors.New("entropy unavailable") } + defer func() { randRead = original }() + + first := newCorrelationID() + second := newCorrelationID() + + if first == "" || second == "" { + t.Fatal("newCorrelationID returned an empty id on rand failure") + } + if !strings.HasPrefix(first, "fallback-") { + t.Fatalf("expected a fallback- prefixed id, got %q", first) + } + if first == second { + t.Fatalf("fallback correlation ids must be unique, got %q twice", first) + } +} diff --git a/clients/go/mxgateway/errors.go b/clients/go/mxgateway/errors.go index 4ef4f7e..1773ec3 100644 --- a/clients/go/mxgateway/errors.go +++ b/clients/go/mxgateway/errors.go @@ -5,6 +5,8 @@ import ( "fmt" pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // ErrEventBufferOverflow is the terminal error delivered on the compatibility @@ -42,6 +44,45 @@ func (e *GatewayError) Unwrap() error { return e.Err } +// Code returns the gRPC status code of the wrapped transport error. It returns +// codes.OK when the error is nil and codes.Unknown when the wrapped error does +// not carry a gRPC status. Callers can use it to write retry, timeout, and +// auth handling without manually unwrapping and re-parsing the error. +func (e *GatewayError) Code() codes.Code { + if e == nil || e.Err == nil { + return codes.OK + } + return status.Code(e.Err) +} + +// IsTransient reports whether err is a transport failure that may succeed on +// retry — for example a gateway that is briefly Unavailable or a call that +// hit a DeadlineExceeded. Permanent failures (Unauthenticated, PermissionDenied, +// InvalidArgument, NotFound, and similar) return false. It unwraps through +// *GatewayError and any other error chain carrying a gRPC status, so callers +// do not need to call status.Code themselves. 
+func IsTransient(err error) bool { + if err == nil { + return false + } + switch transientCode(err) { + case codes.Unavailable, codes.DeadlineExceeded, codes.ResourceExhausted, codes.Aborted: + return true + default: + return false + } +} + +// transientCode extracts a gRPC status code from err, preferring a wrapped +// *GatewayError's Code and otherwise falling back to status.Code on the chain. +func transientCode(err error) codes.Code { + var gatewayErr *GatewayError + if errors.As(err, &gatewayErr) { + return gatewayErr.Code() + } + return status.Code(err) +} + // CommandError reports a non-OK gateway protocol status and keeps the raw // command reply when one exists. type CommandError struct { diff --git a/clients/go/mxgateway/galaxy.go b/clients/go/mxgateway/galaxy.go index a5da3d4..892a6dc 100644 --- a/clients/go/mxgateway/galaxy.go +++ b/clients/go/mxgateway/galaxy.go @@ -2,7 +2,6 @@ package mxgateway import ( "context" - "errors" "io" "time" @@ -56,39 +55,13 @@ type GalaxyClient struct { // DialGalaxy opens a gRPC connection to the gateway for the Galaxy Repository // service. It applies the same authentication metadata, transport security, -// and dial-timeout behavior as Dial. +// lazy connection, and DialTimeout-bounded readiness probe as Dial. 
func DialGalaxy(ctx context.Context, opts Options) (*GalaxyClient, error) { - if opts.Endpoint == "" { - return nil, errors.New("mxgateway: endpoint is required") - } - - dialCtx := ctx - cancel := func() {} - if opts.DialTimeout > 0 { - dialCtx, cancel = context.WithTimeout(ctx, opts.DialTimeout) - } else if _, ok := ctx.Deadline(); !ok { - dialCtx, cancel = context.WithTimeout(ctx, defaultDialTimeout) - } - defer cancel() - - transportCredentials, err := resolveTransportCredentials(opts) + conn, err := dial(ctx, opts) if err != nil { return nil, err } - dialOptions := []grpc.DialOption{ - grpc.WithTransportCredentials(transportCredentials), - grpc.WithUnaryInterceptor(unaryAuthInterceptor(opts.APIKey)), - grpc.WithStreamInterceptor(streamAuthInterceptor(opts.APIKey)), - grpc.WithBlock(), - } - dialOptions = append(dialOptions, opts.DialOptions...) - - conn, err := grpc.DialContext(dialCtx, opts.Endpoint, dialOptions...) - if err != nil { - return nil, &GatewayError{Op: "dial", Err: err} - } - return NewGalaxyClient(conn, opts), nil } @@ -239,18 +212,5 @@ func (c *GalaxyClient) Close() error { } func (c *GalaxyClient) callContext(ctx context.Context) (context.Context, context.CancelFunc) { - timeout := c.opts.CallTimeout - if timeout == 0 { - timeout = defaultCallTimeout - } - if timeout < 0 { - return ctx, func() {} - } - if deadline, ok := ctx.Deadline(); ok { - timeoutDeadline := time.Now().Add(timeout) - if deadline.Before(timeoutDeadline) { - return ctx, func() {} - } - } - return context.WithTimeout(ctx, timeout) + return callContext(ctx, c.opts.CallTimeout) } diff --git a/clients/go/mxgateway/galaxy_test.go b/clients/go/mxgateway/galaxy_test.go index bffe014..185db1b 100644 --- a/clients/go/mxgateway/galaxy_test.go +++ b/clients/go/mxgateway/galaxy_test.go @@ -55,8 +55,8 @@ func TestGalaxyGetLastDeployTimeReturnsTimestampWhenPresent(t *testing.T) { want := time.Date(2026, 4, 28, 12, 34, 56, 0, time.UTC) fake := &fakeGalaxyServer{ deployReply: 
&pb.GetLastDeployTimeReply{ - Present: true, - TimeOfLastDeploy: timestamppb.New(want), + Present: true, + TimeOfLastDeploy: timestamppb.New(want), }, } client, cleanup := newGalaxyBufconnClient(t, fake) @@ -348,8 +348,10 @@ func newGalaxyBufconnClient(t *testing.T, fake *fakeGalaxyServer) (*GalaxyClient dialer := func(ctx context.Context, _ string) (net.Conn, error) { return listener.DialContext(ctx) } + // grpc.NewClient defaults to the dns scheme; use passthrough so the + // bufconn fake target reaches the context dialer unresolved. client, err := DialGalaxy(context.Background(), Options{ - Endpoint: "bufnet", + Endpoint: "passthrough:///bufnet", APIKey: "test-api-key", Plaintext: true, DialOptions: []grpc.DialOption{ diff --git a/clients/go/mxgateway/session.go b/clients/go/mxgateway/session.go index 4959f78..81cc2fd 100644 --- a/clients/go/mxgateway/session.go +++ b/clients/go/mxgateway/session.go @@ -8,6 +8,8 @@ import ( "fmt" "io" "sync" + "sync/atomic" + "time" pb "gitea.dohertylan.com/dohertj2/mxaccessgw/clients/go/internal/generated" "google.golang.org/grpc/codes" @@ -547,10 +549,25 @@ func (s *Session) invokeCommand(ctx context.Context, command *MxCommand) (*MxCom }) } +// correlationIDCounter backs the deterministic fallback id used when +// crypto/rand is unavailable, so every command still carries a unique, +// traceable correlation id. +var correlationIDCounter atomic.Uint64 + +// randRead is the entropy source for newCorrelationID. It is a package +// variable solely so tests can simulate a crypto/rand failure. +var randRead = rand.Read + +// newCorrelationID returns a unique correlation id for an MxCommandRequest. +// It prefers 16 bytes of crypto/rand entropy; if rand.Read fails (rare) it +// falls back to a "fallback-" prefixed id built from the current time and a +// process-wide monotonic counter rather than returning an empty string, which +// would leave the command untraceable in gateway logs. 
func newCorrelationID() string { var buffer [16]byte - if _, err := rand.Read(buffer[:]); err != nil { - return "" + if _, err := randRead(buffer[:]); err != nil { + return fmt.Sprintf("fallback-%x-%x", + time.Now().UnixNano(), correlationIDCounter.Add(1)) } return hex.EncodeToString(buffer[:]) } diff --git a/code-reviews/Client.Go/findings.md b/code-reviews/Client.Go/findings.md index a158f4e..3f36292 100644 --- a/code-reviews/Client.Go/findings.md +++ b/code-reviews/Client.Go/findings.md @@ -7,7 +7,7 @@ | Review date | 2026-05-18 | | Commit reviewed | `3cc53a8` | | Status | Reviewed | -| Open findings | 7 | +| Open findings | 0 | ## Checklist coverage @@ -78,13 +78,13 @@ | Severity | Low | | Category | mxaccessgw conventions | | Location | `clients/go/mxgateway/alarms_test.go:153-154`, `clients/go/mxgateway/galaxy_test.go:58-59` | -| Status | Open | +| Status | Resolved | **Description:** `gofmt -l` flags `alarms_test.go` and `galaxy_test.go` for misaligned struct-literal field padding. The Go client README lists `gofmt` as part of the workflow and the repo enforces style; unformatted committed code breaks `gofmt`-gated checks and CI. **Recommendation:** Run `gofmt -w mxgateway/alarms_test.go mxgateway/galaxy_test.go`. -**Resolution:** _(open)_ +**Resolution:** Resolved 2026-05-18: confirmed `gofmt -l .` flagged both files for misaligned struct-literal padding. Ran `gofmt -w` on `mxgateway/alarms_test.go` and `mxgateway/galaxy_test.go`; `gofmt -l .` is now clean for the whole module. ### Client.Go-005 @@ -93,13 +93,13 @@ | Severity | Low | | Category | Design-document adherence | | Location | `clients/go/mxgateway/client.go:64,68`, `clients/go/mxgateway/galaxy.go:83,87` | -| Status | Open | +| Status | Resolved | **Description:** The client uses `grpc.DialContext` with `grpc.WithBlock()`. In current grpc-go both are deprecated in favour of `grpc.NewClient` (lazy connection). 
`WithBlock` also changes failure semantics: a transient gateway-unavailable at dial time becomes a hard `Dial` error rather than a connection that recovers when the gateway comes up, working against the design doc's resilience intent. **Recommendation:** Migrate to `grpc.NewClient`; if a fail-fast connect probe is still wanted, do an explicit readiness wait bounded by `DialTimeout`, and update the doc comment. -**Resolution:** _(open)_ +**Resolution:** Resolved 2026-05-18: confirmed `Dial`/`DialGalaxy` used the deprecated `grpc.DialContext` + `grpc.WithBlock` pair. Migrated both to the shared `dial(ctx, opts)` helper, which now builds a lazy connection with `grpc.NewClient` and runs an explicit `waitForReady` readiness probe (`Connect` + `WaitForStateChange` until `connectivity.Ready`) bounded by `DialTimeout` — preserving fail-fast behavior while letting an otherwise lazy connection recover when the gateway is briefly down. Note: `grpc.NewClient` defaults the target scheme to `dns`, so the bufconn test harnesses (`client_session_test.go`, `alarms_test.go`, `galaxy_test.go`) were updated to use `passthrough:///bufnet` so the fake target reaches the context dialer. New tests `TestDialFailsFastWhenGatewayUnreachable` and `TestDialReadinessProbeReachesReady` cover the probe; `go vet` reports no deprecation. `clients/go/README.md` documents the lazy-connect + readiness-probe semantics. ### Client.Go-006 @@ -108,13 +108,13 @@ | Severity | Low | | Category | Error handling & resilience | | Location | `clients/go/mxgateway/errors.go:9-130` | -| Status | Open | +| Status | Resolved | **Description:** `docs/ClientLibrariesDesign.md` recommends a high-level error taxonomy (`TransportError`, `AuthenticationError`, `TimeoutError`, etc.). 
The Go client collapses all transport/gRPC failures into a single `GatewayError` with no way to classify transient (`Unavailable`, `DeadlineExceeded`) vs permanent (`Unauthenticated`, `InvalidArgument`) without manually unwrapping and calling `status.Code`. **Recommendation:** Add a helper (e.g. `IsTransient(err) bool`) or expose the gRPC `codes.Code` on `GatewayError`, so retry/timeout/auth handling can be written without re-parsing the wrapped error. -**Resolution:** _(open)_ +**Resolution:** Resolved 2026-05-18: implemented the recommended classification surface in `errors.go` rather than a full parallel type hierarchy (the existing `GatewayError`/`CommandError`/`MxAccessError` chain already separates transport from protocol from MXAccess failures). Added `GatewayError.Code()` (returns the wrapped gRPC `codes.Code`, `OK` for nil, `Unknown` for a non-status error) and the free function `IsTransient(err error) bool`, which unwraps through `*GatewayError` and any gRPC-status chain and reports `true` for `Unavailable`, `DeadlineExceeded`, `ResourceExhausted`, and `Aborted`. Tests `TestGatewayErrorCode` and `TestIsTransient` cover the matrix; `clients/go/README.md` documents both for retry/timeout/auth handling. ### Client.Go-007 @@ -123,13 +123,13 @@ | Severity | Low | | Category | Correctness & logic bugs | | Location | `clients/go/mxgateway/session.go:526-532` | -| Status | Open | +| Status | Resolved | **Description:** `newCorrelationID` returns an empty string when `crypto/rand.Read` fails, silently producing an `MxCommandRequest` with no correlation id. `rand.Read` failure is rare, but the failure mode (untraceable command, no error surfaced) is worse than failing loud, and the empty-id path is untested. **Recommendation:** Either propagate the error up through `invokeCommand`, or fall back to a time/counter-based id rather than an empty string. 
-**Resolution:** _(open)_ +**Resolution:** Resolved 2026-05-18: confirmed `newCorrelationID` returned `""` on a `rand.Read` failure. It now falls back to a non-empty `"fallback-<nanos>-<counter>"` id built from `time.Now().UnixNano()` and a process-wide `atomic.Uint64` monotonic counter, so every command stays traceable even without entropy. The `crypto/rand` call was routed through a `randRead` package variable so the failure path is testable; `TestNewCorrelationIDFallsBackOnRandFailure` simulates a `rand.Read` failure and asserts the fallback id is non-empty, `fallback-` prefixed, and unique, and `TestNewCorrelationIDUsesRandEntropy` pins the happy path. ### Client.Go-008 @@ -138,13 +138,13 @@ | Severity | Low | | Category | Testing coverage | | Location | `clients/go/mxgateway/` (test files) | -| Status | Open | +| Status | Resolved | **Description:** Several critical paths are untested: TLS credential resolution in `resolveTransportCredentials` (only the `Plaintext` path is exercised); the `callContext` deadline-shortening logic (`client.go:198-204`) including the negative-timeout disable case; and `NativeValue`/`NativeArray` for the array, raw-bytes, null, and unsupported-kind branches. **Recommendation:** Add unit tests for `resolveTransportCredentials` precedence, `callContext` deadline arithmetic, and `NativeValue`/`NativeArray` round-trips for every kind. -**Resolution:** _(open)_ +**Resolution:** Resolved 2026-05-18: added `clients/go/mxgateway/coverage_test.go`. `TestResolveTransportCredentialsPrecedence` exercises every branch (explicit `TransportCredentials`, `Plaintext`, missing `CACertFile` error, `TLSConfig` + `ServerNameOverride`, default TLS floor) and `TestResolveTransportCredentialsDoesNotMutateTLSConfig` confirms the supplied `*tls.Config` is cloned. `TestCallContextDeadlineArithmetic` covers zero/default, negative-disable, positive timeout, caller-deadline-sooner-kept, and caller-deadline-later-shortened.
`TestNativeValueEdgeKinds`, `TestNativeArrayEdgeKinds`, and `TestNativeValueUnsupportedKind` cover the null, raw-bytes (including the no-alias copy), array, timestamp-with-nil, and unsupported-kind branches. ### Client.Go-009 @@ -153,13 +153,13 @@ | Severity | Low | | Category | Code organization & conventions | | Location | `clients/go/mxgateway/galaxy.go:60-93,241-256`, `clients/go/mxgateway/client.go:41-74,190-205` | -| Status | Open | +| Status | Resolved | **Description:** `DialGalaxy`/`Dial` and `GalaxyClient.callContext`/`Client.callContext` are near-identical duplicates (dial-context setup, credential resolution, dial-option assembly, deadline arithmetic). A fix to one (e.g. the Client.Go-005 dial migration) must be applied twice and can drift. **Recommendation:** Extract a shared unexported `dial(ctx, opts)` and a free `callContext(opts, ctx)` function, and have both client constructors call them. -**Resolution:** _(open)_ +**Resolution:** Resolved 2026-05-18: extracted the shared unexported `dial(ctx, opts) (*grpc.ClientConn, error)` (credential resolution, dial-option assembly, `grpc.NewClient`, readiness probe) and the free `callContext(ctx, callTimeout) (context.Context, context.CancelFunc)` into `client.go`. `Dial`/`DialGalaxy` and both `(*Client).callContext`/`(*GalaxyClient).callContext` methods now delegate to them; the duplicated dial and deadline code in `galaxy.go` was removed (its now-unused `errors` import dropped). This was done together with the Client.Go-005 migration so the `grpc.NewClient` change lives in exactly one place. ### Client.Go-010 @@ -168,10 +168,10 @@ | Severity | Low | | Category | Documentation & comments | | Location | `clients/go/mxgateway/client.go:39-40` | -| Status | Open | +| Status | Resolved | **Description:** The `Dial` doc comment states it configures "blocking dial cancellation from ctx." 
This describes the deprecated `WithBlock` behaviour; once Client.Go-005 is addressed the comment is misleading about how connection establishment and cancellation work. **Recommendation:** Reword to describe the actual connect/timeout semantics after resolving Client.Go-005, and clarify that `DialTimeout` bounds the initial connect attempt. -**Resolution:** _(open)_ +**Resolution:** Resolved 2026-05-18: alongside the Client.Go-005 migration, the `Dial` doc comment was rewritten to describe the lazy `grpc.NewClient` connection, the `DialTimeout`-bounded (default 10s, or ctx deadline when sooner) readiness probe, that a briefly-unavailable gateway recovers instead of producing a hard error, and that cancelling `ctx` aborts the probe. `DialGalaxy` and the new `dial`/`waitForReady`/`callContext` helpers carry matching doc comments.