"""Typed exception model for MXAccess Gateway Python clients.""" from __future__ import annotations from typing import Any import grpc from .generated import mxaccess_gateway_pb2 as pb class MxGatewayError(Exception): """Base class for client wrapper errors.""" def __init__( self, message: str, *, protocol_status: pb.ProtocolStatus | None = None, raw_reply: Any | None = None, ) -> None: super().__init__(message) self.protocol_status = protocol_status self.raw_reply = raw_reply class MxGatewayTransportError(MxGatewayError): """Transport-level gRPC failure.""" class MxGatewayAuthenticationError(MxGatewayTransportError): """Authentication failure reported by gRPC.""" class MxGatewayAuthorizationError(MxGatewayTransportError): """Authorization failure reported by gRPC.""" class MxGatewaySessionError(MxGatewayError): """Gateway session failure.""" class MxGatewayWorkerError(MxGatewayError): """Gateway worker process or protocol failure.""" class MxGatewayCommandError(MxGatewayError): """Command failure that preserves the raw protobuf reply.""" class MxAccessError(MxGatewayCommandError): """MXAccess HRESULT or status failure.""" def map_rpc_error(operation: str, error: grpc.RpcError) -> MxGatewayTransportError: """Map a generated gRPC exception to the client exception hierarchy.""" code = error.code() if hasattr(error, "code") else None details = error.details() if hasattr(error, "details") else str(error) message = f"{operation} failed: {details}" if code == grpc.StatusCode.UNAUTHENTICATED: return MxGatewayAuthenticationError(message) if code == grpc.StatusCode.PERMISSION_DENIED: return MxGatewayAuthorizationError(message) return MxGatewayTransportError(message) def ensure_protocol_success( operation: str, protocol_status: pb.ProtocolStatus | None, raw_reply: Any | None = None, ) -> Any | None: """Raise typed gateway errors for non-OK protocol statuses.""" code = ( protocol_status.code if protocol_status is not None else pb.PROTOCOL_STATUS_CODE_UNSPECIFIED ) if code in ( pb.PROTOCOL_STATUS_CODE_OK, pb.PROTOCOL_STATUS_CODE_MXACCESS_FAILURE, ): return raw_reply message_text = protocol_status.message if protocol_status else "" message = f"{operation} failed: {message_text or pb.ProtocolStatusCode.Name(code)}" if code in ( pb.PROTOCOL_STATUS_CODE_SESSION_NOT_FOUND, pb.PROTOCOL_STATUS_CODE_SESSION_NOT_READY, ): raise MxGatewaySessionError( message, protocol_status=protocol_status, raw_reply=raw_reply, ) if code in ( pb.PROTOCOL_STATUS_CODE_WORKER_UNAVAILABLE, pb.PROTOCOL_STATUS_CODE_TIMEOUT, pb.PROTOCOL_STATUS_CODE_CANCELED, pb.PROTOCOL_STATUS_CODE_PROTOCOL_VIOLATION, ): raise MxGatewayWorkerError( message, protocol_status=protocol_status, raw_reply=raw_reply, ) raise MxGatewayCommandError( message, protocol_status=protocol_status, raw_reply=raw_reply, ) def ensure_mxaccess_success(operation: str, reply: pb.MxCommandReply) -> pb.MxCommandReply: """Raise `MxAccessError` when MXAccess returned HRESULT or status failure.""" status = reply.protocol_status if status.code == pb.PROTOCOL_STATUS_CODE_MXACCESS_FAILURE: raise MxAccessError( _mxaccess_message(operation, reply), protocol_status=status, raw_reply=reply, ) if reply.HasField("hresult") and reply.hresult < 0: raise MxAccessError( _mxaccess_message(operation, reply), protocol_status=status, raw_reply=reply, ) for mx_status in reply.statuses: if mx_status.success == 0: raise MxAccessError( _mxaccess_message(operation, reply), protocol_status=status, raw_reply=reply, ) return reply def _mxaccess_message(operation: str, reply: pb.MxCommandReply) -> str: status_text = reply.protocol_status.message or "MXAccess command failed" hresult = reply.hresult if reply.HasField("hresult") else None return ( f"{operation} failed: {status_text}; " f"session={reply.session_id}; correlation={reply.correlation_id}; " f"hresult={hresult}; statuses={len(reply.statuses)}" )