Files
mxaccessgw/clients/rust/src/client.rs
T

124 lines
4.4 KiB
Rust

use std::fs;
use tonic::codegen::InterceptedService;
use tonic::transport::{Certificate, Channel, ClientTlsConfig};
use tonic::Request;
use crate::auth::AuthInterceptor;
use crate::error::{ensure_command_success, Error};
use crate::generated::mxaccess_gateway::v1::mx_access_gateway_client::MxAccessGatewayClient;
use crate::generated::mxaccess_gateway::v1::{
CloseSessionReply, CloseSessionRequest, MxCommandReply, MxCommandRequest, MxEvent,
OpenSessionReply, OpenSessionRequest, StreamEventsRequest,
};
use crate::options::ClientOptions;
use crate::session::Session;
pub type RawGatewayClient = MxAccessGatewayClient<InterceptedService<Channel, AuthInterceptor>>;
pub type EventStream =
std::pin::Pin<Box<dyn futures_core::Stream<Item = Result<MxEvent, Error>> + Send + 'static>>;
/// Thin owner for the generated gateway client.
#[derive(Clone)]
pub struct GatewayClient {
inner: RawGatewayClient,
call_timeout: std::time::Duration,
}
impl GatewayClient {
pub async fn connect(options: ClientOptions) -> Result<Self, Error> {
let mut endpoint =
Channel::from_shared(options.endpoint().to_owned()).map_err(|source| {
Error::InvalidEndpoint {
endpoint: options.endpoint().to_owned(),
detail: source.to_string(),
}
})?;
endpoint = endpoint.connect_timeout(options.connect_timeout());
if !options.plaintext() {
let mut tls = ClientTlsConfig::new();
if let Some(server_name) = options.server_name_override() {
tls = tls.domain_name(server_name.to_owned());
}
if let Some(ca_file) = options.ca_file() {
let certificate = fs::read(ca_file).map_err(|source| Error::InvalidEndpoint {
endpoint: options.endpoint().to_owned(),
detail: format!("failed to read CA file {}: {source}", ca_file.display()),
})?;
tls = tls.ca_certificate(Certificate::from_pem(certificate));
}
endpoint = endpoint.tls_config(tls)?;
}
let channel = endpoint.connect().await?;
let interceptor = AuthInterceptor::new(options.api_key().cloned());
Ok(Self {
inner: MxAccessGatewayClient::with_interceptor(channel, interceptor),
call_timeout: options.call_timeout(),
})
}
pub fn raw_client(&mut self) -> &mut RawGatewayClient {
&mut self.inner
}
pub fn into_inner(self) -> RawGatewayClient {
self.inner
}
pub fn session(&self, session_id: impl Into<String>) -> Session {
Session::new(session_id, self.clone())
}
pub async fn open_session_raw(
&self,
request: OpenSessionRequest,
) -> Result<OpenSessionReply, Error> {
let mut client = self.inner.clone();
let response = client.open_session(self.unary_request(request)).await?;
Ok(response.into_inner())
}
pub async fn open_session(&self, request: OpenSessionRequest) -> Result<Session, Error> {
let reply = self.open_session_raw(request).await?;
Ok(Session::new(reply.session_id, self.clone()))
}
pub async fn close_session_raw(
&self,
request: CloseSessionRequest,
) -> Result<CloseSessionReply, Error> {
let mut client = self.inner.clone();
let response = client.close_session(self.unary_request(request)).await?;
Ok(response.into_inner())
}
pub async fn invoke_raw(&self, request: MxCommandRequest) -> Result<MxCommandReply, Error> {
let mut client = self.inner.clone();
let response = client.invoke(self.unary_request(request)).await?;
Ok(response.into_inner())
}
pub async fn invoke(&self, request: MxCommandRequest) -> Result<MxCommandReply, Error> {
ensure_command_success(self.invoke_raw(request).await?)
}
pub async fn stream_events(&self, request: StreamEventsRequest) -> Result<EventStream, Error> {
let mut client = self.inner.clone();
let response = client.stream_events(self.unary_request(request)).await?;
let stream = futures_util::StreamExt::map(response.into_inner(), |result| {
result.map_err(Error::from)
});
Ok(Box::pin(stream))
}
fn unary_request<T>(&self, message: T) -> Request<T> {
let mut request = Request::new(message);
request.set_timeout(self.call_timeout);
request
}
}