//! High-level wrapper around the generated `MxAccessGateway` gRPC client. //! //! [`GatewayClient::connect`] builds an authenticated `tonic` channel using //! the supplied [`ClientOptions`], applies the bearer-token interceptor, and //! exposes typed methods for the unary and streaming RPCs. Most application //! code should prefer [`GatewayClient::open_session`] and the [`Session`] //! handle it returns, rather than the `*_raw` methods. use std::fs; use tonic::codegen::InterceptedService; use tonic::transport::{Certificate, Channel, ClientTlsConfig}; use tonic::Request; use crate::auth::AuthInterceptor; use crate::error::{ensure_command_success, ensure_protocol_success, Error}; use crate::generated::mxaccess_gateway::v1::mx_access_gateway_client::MxAccessGatewayClient; use crate::generated::mxaccess_gateway::v1::{ AcknowledgeAlarmReply, AcknowledgeAlarmRequest, ActiveAlarmSnapshot, CloseSessionReply, CloseSessionRequest, MxCommandReply, MxCommandRequest, MxEvent, OpenSessionReply, OpenSessionRequest, QueryActiveAlarmsRequest, StreamEventsRequest, }; use crate::options::ClientOptions; use crate::session::Session; /// Generated gateway client wrapped in the auth interceptor that /// [`GatewayClient`] uses internally. pub type RawGatewayClient = MxAccessGatewayClient>; /// Pinned, boxed [`MxEvent`] stream returned by /// [`GatewayClient::stream_events`]. Errors are pre-mapped from /// `tonic::Status` to [`Error`]; dropping the stream cancels the call. pub type EventStream = std::pin::Pin> + Send + 'static>>; /// Pinned, boxed [`ActiveAlarmSnapshot`] stream returned by /// [`GatewayClient::query_active_alarms`]. Errors are pre-mapped from /// `tonic::Status` to [`Error`]; dropping the stream cancels the call. pub type ActiveAlarmStream = std::pin::Pin< Box> + Send + 'static>, >; /// Thin async wrapper around the generated gateway client. /// /// The wrapper is `Clone`: every clone shares the underlying tonic channel /// (cheap, reference-counted) and the same call/stream timeouts. It is /// designed to be cheap enough to clone per request handler. #[derive(Clone)] pub struct GatewayClient { inner: RawGatewayClient, call_timeout: std::time::Duration, stream_timeout: Option, } impl GatewayClient { /// Connect to the gateway endpoint described by `options` and return a /// ready-to-use client. /// /// # Errors /// /// Returns [`Error::InvalidEndpoint`] if the endpoint URL or CA file is /// malformed, and [`Error::Transport`] if the TCP/TLS handshake fails. pub async fn connect(options: ClientOptions) -> Result { let mut endpoint = Channel::from_shared(options.endpoint().to_owned()).map_err(|source| { Error::InvalidEndpoint { endpoint: options.endpoint().to_owned(), detail: source.to_string(), } })?; endpoint = endpoint.connect_timeout(options.connect_timeout()); if !options.plaintext() { let mut tls = ClientTlsConfig::new(); if let Some(server_name) = options.server_name_override() { tls = tls.domain_name(server_name.to_owned()); } if let Some(ca_file) = options.ca_file() { let certificate = fs::read(ca_file).map_err(|source| Error::InvalidEndpoint { endpoint: options.endpoint().to_owned(), detail: format!("failed to read CA file {}: {source}", ca_file.display()), })?; tls = tls.ca_certificate(Certificate::from_pem(certificate)); } endpoint = endpoint.tls_config(tls)?; } let channel = endpoint.connect().await?; let interceptor = AuthInterceptor::new(options.api_key().cloned()); let max_grpc_message_bytes = options.max_grpc_message_bytes(); Ok(Self { inner: MxAccessGatewayClient::with_interceptor(channel, interceptor) .max_decoding_message_size(max_grpc_message_bytes) .max_encoding_message_size(max_grpc_message_bytes), call_timeout: options.call_timeout(), stream_timeout: options.stream_timeout(), }) } /// Borrow the underlying generated client. Use this only when you need /// access to RPCs not surfaced by the wrapper. pub fn raw_client(&mut self) -> &mut RawGatewayClient { &mut self.inner } /// Consume the wrapper and return the underlying generated client. pub fn into_inner(self) -> RawGatewayClient { self.inner } /// Build a [`Session`] handle from a previously opened session id. No /// RPC is performed — this is the cheap counterpart to /// [`GatewayClient::open_session`] for callers that already own the id. pub fn session(&self, session_id: impl Into) -> Session { Session::new(session_id, self.clone()) } /// Issue an `OpenSession` RPC and return the raw reply without /// validating its `protocol_status`. /// /// # Errors /// /// Returns the `tonic::Status` mapped through [`Error::from`]. pub async fn open_session_raw( &self, request: OpenSessionRequest, ) -> Result { let mut client = self.inner.clone(); let response = client.open_session(self.unary_request(request)).await?; Ok(response.into_inner()) } /// Open a session, validate its `protocol_status`, and return a typed /// [`Session`] handle bound to this client. /// /// # Errors /// /// Returns [`Error::ProtocolStatus`] if the gateway accepts the call /// but reports a non-OK protocol status, plus any of the /// [`Error`] variants produced by [`open_session_raw`](Self::open_session_raw). pub async fn open_session(&self, request: OpenSessionRequest) -> Result { let reply = self.open_session_raw(request).await?; ensure_protocol_success("open session", reply.protocol_status.as_ref())?; Ok(Session::new(reply.session_id, self.clone())) } /// Issue a `CloseSession` RPC and return the raw reply. /// /// # Errors /// /// Returns the `tonic::Status` mapped through [`Error::from`]. pub async fn close_session_raw( &self, request: CloseSessionRequest, ) -> Result { let mut client = self.inner.clone(); let response = client.close_session(self.unary_request(request)).await?; Ok(response.into_inner()) } /// Issue an `Invoke` RPC and return the raw reply, even when the /// command-level protocol status is non-OK. /// /// # Errors /// /// Returns the `tonic::Status` mapped through [`Error::from`]. pub async fn invoke_raw(&self, request: MxCommandRequest) -> Result { let mut client = self.inner.clone(); let response = client.invoke(self.unary_request(request)).await?; Ok(response.into_inner()) } /// Issue an `Invoke` RPC and surface a non-OK reply as /// [`Error::Command`]. /// /// # Errors /// /// Returns [`Error::Command`] when the reply's `protocol_status` is not /// `Ok`, plus any errors propagated by /// [`invoke_raw`](Self::invoke_raw). pub async fn invoke(&self, request: MxCommandRequest) -> Result { ensure_command_success(self.invoke_raw(request).await?) } /// Open the server-streaming `StreamEvents` RPC. /// /// The returned [`EventStream`] yields `MxEvent` messages as the worker /// produces them. Dropping the stream cancels the gRPC call cooperatively. /// /// # Errors /// /// Returns the `tonic::Status` mapped through [`Error::from`] if the /// server rejects the subscription. pub async fn stream_events(&self, request: StreamEventsRequest) -> Result { let mut client = self.inner.clone(); let response = client.stream_events(self.stream_request(request)).await?; let stream = futures_util::StreamExt::map(response.into_inner(), |result| { result.map_err(Error::from) }); Ok(Box::pin(stream)) } /// Acknowledge an active MXAccess alarm condition through the gateway. /// /// The gateway authenticates the request against the API key's /// `invoke:alarm-ack` scope and forwards the acknowledge to the worker's /// MXAccess session; the resulting native MxStatus is returned in the /// reply. Acks are idempotent at the MxAccess layer. /// /// # Errors /// /// Returns [`Error::ProtocolStatus`] when the gateway accepts the call but /// reports a non-OK protocol status, plus any of the [`Error`] variants /// produced by transport failures. pub async fn acknowledge_alarm( &self, request: AcknowledgeAlarmRequest, ) -> Result { let mut client = self.inner.clone(); let response = client .acknowledge_alarm(self.unary_request(request)) .await?; let reply = response.into_inner(); ensure_protocol_success("acknowledge alarm", reply.protocol_status.as_ref())?; Ok(reply) } /// Open the server-streaming `QueryActiveAlarms` RPC — the gateway's /// ConditionRefresh equivalent. /// /// The returned [`ActiveAlarmStream`] yields one [`ActiveAlarmSnapshot`] /// per currently-active alarm. Dropping the stream cancels the gRPC call /// cooperatively. Optional alarm-reference prefix scoping /// (`request.alarm_filter_prefix`) limits the stream to a sub-tree. /// /// # Errors /// /// Returns the `tonic::Status` mapped through [`Error::from`] if the /// server rejects the request. pub async fn query_active_alarms( &self, request: QueryActiveAlarmsRequest, ) -> Result { let mut client = self.inner.clone(); let response = client .query_active_alarms(self.stream_request(request)) .await?; let stream = futures_util::StreamExt::map(response.into_inner(), |result| { result.map_err(Error::from) }); Ok(Box::pin(stream)) } fn unary_request(&self, message: T) -> Request { let mut request = Request::new(message); request.set_timeout(self.call_timeout); request } fn stream_request(&self, message: T) -> Request { let mut request = Request::new(message); if let Some(timeout) = self.stream_timeout { request.set_timeout(timeout); } request } }