328 lines
14 KiB
Rust
328 lines
14 KiB
Rust
//! High-level wrapper around the generated `MxAccessGateway` gRPC client.
|
|
//!
|
|
//! [`GatewayClient::connect`] builds an authenticated `tonic` channel using
|
|
//! the supplied [`ClientOptions`], applies the bearer-token interceptor, and
|
|
//! exposes typed methods for the unary and streaming RPCs. Most application
|
|
//! code should prefer [`GatewayClient::open_session`] and the [`Session`]
|
|
//! handle it returns, rather than the `*_raw` methods.
|
|
|
|
use std::fs;
|
|
|
|
use tonic::codegen::InterceptedService;
|
|
use tonic::transport::{Certificate, Channel, ClientTlsConfig};
|
|
use tonic::Request;
|
|
|
|
use crate::auth::AuthInterceptor;
|
|
use crate::error::{ensure_command_success, ensure_protocol_success, Error};
|
|
use crate::generated::mxaccess_gateway::v1::mx_access_gateway_client::MxAccessGatewayClient;
|
|
use crate::generated::mxaccess_gateway::v1::{
|
|
AcknowledgeAlarmReply, AcknowledgeAlarmRequest, ActiveAlarmSnapshot, AlarmFeedMessage,
|
|
CloseSessionReply, CloseSessionRequest, MxCommandReply, MxCommandRequest, MxEvent,
|
|
OpenSessionReply, OpenSessionRequest, QueryActiveAlarmsRequest, StreamAlarmsRequest,
|
|
StreamEventsRequest,
|
|
};
|
|
use crate::options::ClientOptions;
|
|
use crate::session::Session;
|
|
|
|
/// Generated gateway client wrapped in the auth interceptor that
|
|
/// [`GatewayClient`] uses internally.
|
|
pub type RawGatewayClient = MxAccessGatewayClient<InterceptedService<Channel, AuthInterceptor>>;
|
|
|
|
/// Pinned, boxed [`MxEvent`] stream returned by
|
|
/// [`GatewayClient::stream_events`]. Errors are pre-mapped from
|
|
/// `tonic::Status` to [`Error`]; dropping the stream cancels the call.
|
|
pub type EventStream =
|
|
std::pin::Pin<Box<dyn futures_core::Stream<Item = Result<MxEvent, Error>> + Send + 'static>>;
|
|
|
|
/// Pinned, boxed [`ActiveAlarmSnapshot`] stream returned by
|
|
/// [`GatewayClient::query_active_alarms`]. Errors are pre-mapped from
|
|
/// `tonic::Status` to [`Error`]; dropping the stream cancels the call.
|
|
pub type ActiveAlarmStream = std::pin::Pin<
|
|
Box<dyn futures_core::Stream<Item = Result<ActiveAlarmSnapshot, Error>> + Send + 'static>,
|
|
>;
|
|
|
|
/// Pinned, boxed [`AlarmFeedMessage`] stream returned by
|
|
/// [`GatewayClient::stream_alarms`]. Errors are pre-mapped from
|
|
/// `tonic::Status` to [`Error`]; dropping the stream cancels the call.
|
|
pub type AlarmFeedStream = std::pin::Pin<
|
|
Box<dyn futures_core::Stream<Item = Result<AlarmFeedMessage, Error>> + Send + 'static>,
|
|
>;
|
|
|
|
/// Thin async wrapper around the generated gateway client.
|
|
///
|
|
/// The wrapper is `Clone`: every clone shares the underlying tonic channel
|
|
/// (cheap, reference-counted) and the same call/stream timeouts. It is
|
|
/// designed to be cheap enough to clone per request handler.
|
|
#[derive(Clone)]
|
|
pub struct GatewayClient {
|
|
inner: RawGatewayClient,
|
|
call_timeout: std::time::Duration,
|
|
stream_timeout: Option<std::time::Duration>,
|
|
}
|
|
|
|
impl GatewayClient {
|
|
/// Connect to the gateway endpoint described by `options` and return a
|
|
/// ready-to-use client.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Returns [`Error::InvalidEndpoint`] if the endpoint URL or CA file is
|
|
/// malformed, and [`Error::Transport`] if the TCP/TLS handshake fails.
|
|
pub async fn connect(options: ClientOptions) -> Result<Self, Error> {
|
|
let mut endpoint =
|
|
Channel::from_shared(options.endpoint().to_owned()).map_err(|source| {
|
|
Error::InvalidEndpoint {
|
|
endpoint: options.endpoint().to_owned(),
|
|
detail: source.to_string(),
|
|
}
|
|
})?;
|
|
endpoint = endpoint.connect_timeout(options.connect_timeout());
|
|
|
|
if !options.plaintext() {
|
|
let mut tls = ClientTlsConfig::new();
|
|
if let Some(server_name) = options.server_name_override() {
|
|
tls = tls.domain_name(server_name.to_owned());
|
|
}
|
|
if let Some(ca_file) = options.ca_file() {
|
|
let certificate = fs::read(ca_file).map_err(|source| Error::InvalidEndpoint {
|
|
endpoint: options.endpoint().to_owned(),
|
|
detail: format!("failed to read CA file {}: {source}", ca_file.display()),
|
|
})?;
|
|
tls = tls.ca_certificate(Certificate::from_pem(certificate));
|
|
} else if !options.require_certificate_validation() {
|
|
// Lenient-default fallback (Rust pin-only exception): tonic
|
|
// 0.13's `ClientTlsConfig` builds its rustls verifier inside a
|
|
// crate-private connector and exposes no hook for a custom
|
|
// `ServerCertVerifier`, so — unlike the other clients — the
|
|
// Rust client cannot accept an arbitrary self-signed cert. Pin
|
|
// the gateway's CA instead, or opt into strict verification
|
|
// against the system trust roots. We reject here rather than
|
|
// silently verifying against system roots (which would fail a
|
|
// self-signed gateway with a confusing handshake error).
|
|
return Err(Error::InvalidEndpoint {
|
|
endpoint: options.endpoint().to_owned(),
|
|
detail: "TLS requested without a pinned CA. The Rust client cannot accept an \
|
|
arbitrary self-signed certificate (tonic 0.13 exposes no custom \
|
|
rustls verifier). Pin the gateway certificate with \
|
|
ClientOptions::with_ca_file, or call \
|
|
ClientOptions::with_require_certificate_validation(true) to verify \
|
|
against the system trust roots."
|
|
.to_owned(),
|
|
});
|
|
}
|
|
endpoint = endpoint.tls_config(tls)?;
|
|
}
|
|
|
|
let channel = endpoint.connect().await?;
|
|
let interceptor = AuthInterceptor::new(options.api_key().cloned());
|
|
let max_grpc_message_bytes = options.max_grpc_message_bytes();
|
|
|
|
Ok(Self {
|
|
inner: MxAccessGatewayClient::with_interceptor(channel, interceptor)
|
|
.max_decoding_message_size(max_grpc_message_bytes)
|
|
.max_encoding_message_size(max_grpc_message_bytes),
|
|
call_timeout: options.call_timeout(),
|
|
stream_timeout: options.stream_timeout(),
|
|
})
|
|
}
|
|
|
|
/// Borrow the underlying generated client. Use this only when you need
|
|
/// access to RPCs not surfaced by the wrapper.
|
|
pub fn raw_client(&mut self) -> &mut RawGatewayClient {
|
|
&mut self.inner
|
|
}
|
|
|
|
/// Consume the wrapper and return the underlying generated client.
|
|
pub fn into_inner(self) -> RawGatewayClient {
|
|
self.inner
|
|
}
|
|
|
|
/// Build a [`Session`] handle from a previously opened session id. No
|
|
/// RPC is performed — this is the cheap counterpart to
|
|
/// [`GatewayClient::open_session`] for callers that already own the id.
|
|
pub fn session(&self, session_id: impl Into<String>) -> Session {
|
|
Session::new(session_id, self.clone())
|
|
}
|
|
|
|
/// Issue an `OpenSession` RPC and return the raw reply without
|
|
/// validating its `protocol_status`.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Returns the `tonic::Status` mapped through [`Error::from`].
|
|
pub async fn open_session_raw(
|
|
&self,
|
|
request: OpenSessionRequest,
|
|
) -> Result<OpenSessionReply, Error> {
|
|
let mut client = self.inner.clone();
|
|
let response = client.open_session(self.unary_request(request)).await?;
|
|
Ok(response.into_inner())
|
|
}
|
|
|
|
/// Open a session, validate its `protocol_status`, and return a typed
|
|
/// [`Session`] handle bound to this client.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Returns [`Error::ProtocolStatus`] if the gateway accepts the call
|
|
/// but reports a non-OK protocol status, plus any of the
|
|
/// [`Error`] variants produced by [`open_session_raw`](Self::open_session_raw).
|
|
pub async fn open_session(&self, request: OpenSessionRequest) -> Result<Session, Error> {
|
|
let reply = self.open_session_raw(request).await?;
|
|
ensure_protocol_success("open session", reply.protocol_status.as_ref())?;
|
|
Ok(Session::new(reply.session_id, self.clone()))
|
|
}
|
|
|
|
/// Issue a `CloseSession` RPC and return the raw reply.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Returns the `tonic::Status` mapped through [`Error::from`].
|
|
pub async fn close_session_raw(
|
|
&self,
|
|
request: CloseSessionRequest,
|
|
) -> Result<CloseSessionReply, Error> {
|
|
let mut client = self.inner.clone();
|
|
let response = client.close_session(self.unary_request(request)).await?;
|
|
Ok(response.into_inner())
|
|
}
|
|
|
|
/// Issue an `Invoke` RPC and return the raw reply, even when the
|
|
/// command-level protocol status is non-OK.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Returns the `tonic::Status` mapped through [`Error::from`].
|
|
pub async fn invoke_raw(&self, request: MxCommandRequest) -> Result<MxCommandReply, Error> {
|
|
let mut client = self.inner.clone();
|
|
let response = client.invoke(self.unary_request(request)).await?;
|
|
Ok(response.into_inner())
|
|
}
|
|
|
|
/// Issue an `Invoke` RPC and surface a non-OK reply as
|
|
/// [`Error::Command`].
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Returns [`Error::Command`] when the reply's `protocol_status` is not
|
|
/// `Ok`, plus any errors propagated by
|
|
/// [`invoke_raw`](Self::invoke_raw).
|
|
pub async fn invoke(&self, request: MxCommandRequest) -> Result<MxCommandReply, Error> {
|
|
ensure_command_success(self.invoke_raw(request).await?)
|
|
}
|
|
|
|
/// Open the server-streaming `StreamEvents` RPC.
|
|
///
|
|
/// The returned [`EventStream`] yields `MxEvent` messages as the worker
|
|
/// produces them. Dropping the stream cancels the gRPC call cooperatively.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Returns the `tonic::Status` mapped through [`Error::from`] if the
|
|
/// server rejects the subscription.
|
|
pub async fn stream_events(&self, request: StreamEventsRequest) -> Result<EventStream, Error> {
|
|
let mut client = self.inner.clone();
|
|
let response = client.stream_events(self.stream_request(request)).await?;
|
|
let stream = futures_util::StreamExt::map(response.into_inner(), |result| {
|
|
result.map_err(Error::from)
|
|
});
|
|
|
|
Ok(Box::pin(stream))
|
|
}
|
|
|
|
/// Acknowledge an active MXAccess alarm condition through the gateway.
|
|
///
|
|
/// The gateway authenticates the request against the API key's
|
|
/// `invoke:alarm-ack` scope and forwards the acknowledge to the worker's
|
|
/// MXAccess session; the resulting native MxStatus is returned in the
|
|
/// reply. Acks are idempotent at the MxAccess layer.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Returns [`Error::ProtocolStatus`] when the gateway accepts the call but
|
|
/// reports a non-OK protocol status, plus any of the [`Error`] variants
|
|
/// produced by transport failures.
|
|
pub async fn acknowledge_alarm(
|
|
&self,
|
|
request: AcknowledgeAlarmRequest,
|
|
) -> Result<AcknowledgeAlarmReply, Error> {
|
|
let mut client = self.inner.clone();
|
|
let response = client
|
|
.acknowledge_alarm(self.unary_request(request))
|
|
.await?;
|
|
let reply = response.into_inner();
|
|
ensure_protocol_success("acknowledge alarm", reply.protocol_status.as_ref())?;
|
|
Ok(reply)
|
|
}
|
|
|
|
/// Open the server-streaming `QueryActiveAlarms` RPC — the gateway's
|
|
/// ConditionRefresh equivalent.
|
|
///
|
|
/// The returned [`ActiveAlarmStream`] yields one [`ActiveAlarmSnapshot`]
|
|
/// per currently-active alarm. Dropping the stream cancels the gRPC call
|
|
/// cooperatively. Optional alarm-reference prefix scoping
|
|
/// (`request.alarm_filter_prefix`) limits the stream to a sub-tree.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Returns the `tonic::Status` mapped through [`Error::from`] if the
|
|
/// server rejects the request.
|
|
pub async fn query_active_alarms(
|
|
&self,
|
|
request: QueryActiveAlarmsRequest,
|
|
) -> Result<ActiveAlarmStream, Error> {
|
|
let mut client = self.inner.clone();
|
|
let response = client
|
|
.query_active_alarms(self.stream_request(request))
|
|
.await?;
|
|
let stream = futures_util::StreamExt::map(response.into_inner(), |result| {
|
|
result.map_err(Error::from)
|
|
});
|
|
|
|
Ok(Box::pin(stream))
|
|
}
|
|
|
|
/// Attach to the gateway's central `StreamAlarms` feed.
|
|
///
|
|
/// The returned [`AlarmFeedStream`] opens with one [`AlarmFeedMessage`]
|
|
/// per currently-active alarm (the ConditionRefresh snapshot), then a
|
|
/// single `snapshot_complete`, then a `transition` for every subsequent
|
|
/// raise / acknowledge / clear. It is served by the gateway's always-on
|
|
/// alarm monitor — no worker session is opened — so any number of clients
|
|
/// may attach. Dropping the stream cancels the gRPC call cooperatively.
|
|
/// Optional alarm-reference prefix scoping (`request.alarm_filter_prefix`)
|
|
/// limits the stream to a sub-tree.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Returns the `tonic::Status` mapped through [`Error::from`] if the
|
|
/// server rejects the request.
|
|
pub async fn stream_alarms(
|
|
&self,
|
|
request: StreamAlarmsRequest,
|
|
) -> Result<AlarmFeedStream, Error> {
|
|
let mut client = self.inner.clone();
|
|
let response = client.stream_alarms(self.stream_request(request)).await?;
|
|
let stream = futures_util::StreamExt::map(response.into_inner(), |result| {
|
|
result.map_err(Error::from)
|
|
});
|
|
|
|
Ok(Box::pin(stream))
|
|
}
|
|
|
|
fn unary_request<T>(&self, message: T) -> Request<T> {
|
|
let mut request = Request::new(message);
|
|
request.set_timeout(self.call_timeout);
|
|
request
|
|
}
|
|
|
|
fn stream_request<T>(&self, message: T) -> Request<T> {
|
|
let mut request = Request::new(message);
|
|
if let Some(timeout) = self.stream_timeout {
|
|
request.set_timeout(timeout);
|
|
}
|
|
|
|
request
|
|
}
|
|
}
|