Files
mxaccessgw/clients/rust/src/client.rs
T
Joseph Doherty 0d8a28d2fe Fix all MxGateway.Client.Rust code-review findings
Resolves Client.Rust-001 through Client.Rust-011.

Build/test/clippy gate (Client.Rust-001/002/003):
- options.rs: doc comments on with_max_grpc_message_bytes /
  max_grpc_message_bytes (#![warn(missing_docs)])
- session.rs: rename BulkReplyKind variants to drop the shared `Bulk`
  suffix (clippy::enum_variant_names)
- galaxy.rs: deref instead of clone on Option<Timestamp>
  (clippy::clone_on_copy — an extra violation the gate also hit)
- mxgw-cli: assert version_json against GATEWAY/WORKER_PROTOCOL_VERSION
  constants instead of the stale literal 2
`cargo clippy --workspace --all-targets -- -D warnings` now passes.

Correctness / error handling:
- version.rs: CLIENT_VERSION = env!("CARGO_PKG_VERSION") (Client.Rust-004)
- session.rs: register/add_item/add_item2 handle extractors and
  bulk_results now return Err(Error::MalformedReply) instead of a
  silent 0 / empty vec on a shapeless OK reply (Client.Rust-005/006)
- error.rs: new Error::Unavailable classifies Code::Unavailable /
  ResourceExhausted as transient (Client.Rust-010)
- session.rs: per-call unique correlation ids via an atomic counter
  (Client.Rust-011)

Other:
- value.rs: MxValue/MxArrayValue compute the projection on demand
  instead of caching it, so a wire-only value pays no projection cost
  (Client.Rust-008)
- RustClientDesign.md: correct the crate layout, drop the unused
  `tracing` dependency (Client.Rust-007)
- client_behavior.rs: tests for the bulk-size cap, a mid-stream status
  fault, and the unreadable-CA-file path (Client.Rust-009)

cargo fmt / test --workspace (27 tests) / clippy all pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 17:08:55 -04:00

272 lines
11 KiB
Rust

//! High-level wrapper around the generated `MxAccessGateway` gRPC client.
//!
//! [`GatewayClient::connect`] builds an authenticated `tonic` channel using
//! the supplied [`ClientOptions`], applies the bearer-token interceptor, and
//! exposes typed methods for the unary and streaming RPCs. Most application
//! code should prefer [`GatewayClient::open_session`] and the [`Session`]
//! handle it returns, rather than the `*_raw` methods.
use std::fs;
use tonic::codegen::InterceptedService;
use tonic::transport::{Certificate, Channel, ClientTlsConfig};
use tonic::Request;
use crate::auth::AuthInterceptor;
use crate::error::{ensure_command_success, ensure_protocol_success, Error};
use crate::generated::mxaccess_gateway::v1::mx_access_gateway_client::MxAccessGatewayClient;
use crate::generated::mxaccess_gateway::v1::{
AcknowledgeAlarmReply, AcknowledgeAlarmRequest, ActiveAlarmSnapshot, CloseSessionReply,
CloseSessionRequest, MxCommandReply, MxCommandRequest, MxEvent, OpenSessionReply,
OpenSessionRequest, QueryActiveAlarmsRequest, StreamEventsRequest,
};
use crate::options::ClientOptions;
use crate::session::Session;
/// Generated gateway client wrapped in the auth interceptor that
/// [`GatewayClient`] uses internally.
pub type RawGatewayClient = MxAccessGatewayClient<InterceptedService<Channel, AuthInterceptor>>;
/// Pinned, boxed [`MxEvent`] stream returned by
/// [`GatewayClient::stream_events`]. Errors are pre-mapped from
/// `tonic::Status` to [`Error`]; dropping the stream cancels the call.
pub type EventStream =
std::pin::Pin<Box<dyn futures_core::Stream<Item = Result<MxEvent, Error>> + Send + 'static>>;
/// Pinned, boxed [`ActiveAlarmSnapshot`] stream returned by
/// [`GatewayClient::query_active_alarms`]. Errors are pre-mapped from
/// `tonic::Status` to [`Error`]; dropping the stream cancels the call.
pub type ActiveAlarmStream = std::pin::Pin<
Box<dyn futures_core::Stream<Item = Result<ActiveAlarmSnapshot, Error>> + Send + 'static>,
>;
/// Thin async wrapper around the generated gateway client.
///
/// The wrapper is `Clone`: every clone shares the underlying tonic channel
/// (cheap, reference-counted) and the same call/stream timeouts. It is
/// designed to be cheap enough to clone per request handler.
#[derive(Clone)]
pub struct GatewayClient {
inner: RawGatewayClient,
call_timeout: std::time::Duration,
stream_timeout: Option<std::time::Duration>,
}
impl GatewayClient {
/// Connect to the gateway endpoint described by `options` and return a
/// ready-to-use client.
///
/// # Errors
///
/// Returns [`Error::InvalidEndpoint`] if the endpoint URL or CA file is
/// malformed, and [`Error::Transport`] if the TCP/TLS handshake fails.
pub async fn connect(options: ClientOptions) -> Result<Self, Error> {
let mut endpoint =
Channel::from_shared(options.endpoint().to_owned()).map_err(|source| {
Error::InvalidEndpoint {
endpoint: options.endpoint().to_owned(),
detail: source.to_string(),
}
})?;
endpoint = endpoint.connect_timeout(options.connect_timeout());
if !options.plaintext() {
let mut tls = ClientTlsConfig::new();
if let Some(server_name) = options.server_name_override() {
tls = tls.domain_name(server_name.to_owned());
}
if let Some(ca_file) = options.ca_file() {
let certificate = fs::read(ca_file).map_err(|source| Error::InvalidEndpoint {
endpoint: options.endpoint().to_owned(),
detail: format!("failed to read CA file {}: {source}", ca_file.display()),
})?;
tls = tls.ca_certificate(Certificate::from_pem(certificate));
}
endpoint = endpoint.tls_config(tls)?;
}
let channel = endpoint.connect().await?;
let interceptor = AuthInterceptor::new(options.api_key().cloned());
let max_grpc_message_bytes = options.max_grpc_message_bytes();
Ok(Self {
inner: MxAccessGatewayClient::with_interceptor(channel, interceptor)
.max_decoding_message_size(max_grpc_message_bytes)
.max_encoding_message_size(max_grpc_message_bytes),
call_timeout: options.call_timeout(),
stream_timeout: options.stream_timeout(),
})
}
/// Borrow the underlying generated client. Use this only when you need
/// access to RPCs not surfaced by the wrapper.
pub fn raw_client(&mut self) -> &mut RawGatewayClient {
&mut self.inner
}
/// Consume the wrapper and return the underlying generated client.
pub fn into_inner(self) -> RawGatewayClient {
self.inner
}
/// Build a [`Session`] handle from a previously opened session id. No
/// RPC is performed — this is the cheap counterpart to
/// [`GatewayClient::open_session`] for callers that already own the id.
pub fn session(&self, session_id: impl Into<String>) -> Session {
Session::new(session_id, self.clone())
}
/// Issue an `OpenSession` RPC and return the raw reply without
/// validating its `protocol_status`.
///
/// # Errors
///
/// Returns the `tonic::Status` mapped through [`Error::from`].
pub async fn open_session_raw(
&self,
request: OpenSessionRequest,
) -> Result<OpenSessionReply, Error> {
let mut client = self.inner.clone();
let response = client.open_session(self.unary_request(request)).await?;
Ok(response.into_inner())
}
/// Open a session, validate its `protocol_status`, and return a typed
/// [`Session`] handle bound to this client.
///
/// # Errors
///
/// Returns [`Error::ProtocolStatus`] if the gateway accepts the call
/// but reports a non-OK protocol status, plus any of the
/// [`Error`] variants produced by [`open_session_raw`](Self::open_session_raw).
pub async fn open_session(&self, request: OpenSessionRequest) -> Result<Session, Error> {
let reply = self.open_session_raw(request).await?;
ensure_protocol_success("open session", reply.protocol_status.as_ref())?;
Ok(Session::new(reply.session_id, self.clone()))
}
/// Issue a `CloseSession` RPC and return the raw reply.
///
/// # Errors
///
/// Returns the `tonic::Status` mapped through [`Error::from`].
pub async fn close_session_raw(
&self,
request: CloseSessionRequest,
) -> Result<CloseSessionReply, Error> {
let mut client = self.inner.clone();
let response = client.close_session(self.unary_request(request)).await?;
Ok(response.into_inner())
}
/// Issue an `Invoke` RPC and return the raw reply, even when the
/// command-level protocol status is non-OK.
///
/// # Errors
///
/// Returns the `tonic::Status` mapped through [`Error::from`].
pub async fn invoke_raw(&self, request: MxCommandRequest) -> Result<MxCommandReply, Error> {
let mut client = self.inner.clone();
let response = client.invoke(self.unary_request(request)).await?;
Ok(response.into_inner())
}
/// Issue an `Invoke` RPC and surface a non-OK reply as
/// [`Error::Command`].
///
/// # Errors
///
/// Returns [`Error::Command`] when the reply's `protocol_status` is not
/// `Ok`, plus any errors propagated by
/// [`invoke_raw`](Self::invoke_raw).
pub async fn invoke(&self, request: MxCommandRequest) -> Result<MxCommandReply, Error> {
ensure_command_success(self.invoke_raw(request).await?)
}
/// Open the server-streaming `StreamEvents` RPC.
///
/// The returned [`EventStream`] yields `MxEvent` messages as the worker
/// produces them. Dropping the stream cancels the gRPC call cooperatively.
///
/// # Errors
///
/// Returns the `tonic::Status` mapped through [`Error::from`] if the
/// server rejects the subscription.
pub async fn stream_events(&self, request: StreamEventsRequest) -> Result<EventStream, Error> {
let mut client = self.inner.clone();
let response = client.stream_events(self.stream_request(request)).await?;
let stream = futures_util::StreamExt::map(response.into_inner(), |result| {
result.map_err(Error::from)
});
Ok(Box::pin(stream))
}
/// Acknowledge an active MXAccess alarm condition through the gateway.
///
/// The gateway authenticates the request against the API key's
/// `invoke:alarm-ack` scope and forwards the acknowledge to the worker's
/// MXAccess session; the resulting native MxStatus is returned in the
/// reply. Acks are idempotent at the MxAccess layer.
///
/// # Errors
///
/// Returns [`Error::ProtocolStatus`] when the gateway accepts the call but
/// reports a non-OK protocol status, plus any of the [`Error`] variants
/// produced by transport failures.
pub async fn acknowledge_alarm(
&self,
request: AcknowledgeAlarmRequest,
) -> Result<AcknowledgeAlarmReply, Error> {
let mut client = self.inner.clone();
let response = client
.acknowledge_alarm(self.unary_request(request))
.await?;
let reply = response.into_inner();
ensure_protocol_success("acknowledge alarm", reply.protocol_status.as_ref())?;
Ok(reply)
}
/// Open the server-streaming `QueryActiveAlarms` RPC — the gateway's
/// ConditionRefresh equivalent.
///
/// The returned [`ActiveAlarmStream`] yields one [`ActiveAlarmSnapshot`]
/// per currently-active alarm. Dropping the stream cancels the gRPC call
/// cooperatively. Optional alarm-reference prefix scoping
/// (`request.alarm_filter_prefix`) limits the stream to a sub-tree.
///
/// # Errors
///
/// Returns the `tonic::Status` mapped through [`Error::from`] if the
/// server rejects the request.
pub async fn query_active_alarms(
&self,
request: QueryActiveAlarmsRequest,
) -> Result<ActiveAlarmStream, Error> {
let mut client = self.inner.clone();
let response = client
.query_active_alarms(self.stream_request(request))
.await?;
let stream = futures_util::StreamExt::map(response.into_inner(), |result| {
result.map_err(Error::from)
});
Ok(Box::pin(stream))
}
fn unary_request<T>(&self, message: T) -> Request<T> {
let mut request = Request::new(message);
request.set_timeout(self.call_timeout);
request
}
fn stream_request<T>(&self, message: T) -> Request<T> {
let mut request = Request::new(message);
if let Some(timeout) = self.stream_timeout {
request.set_timeout(timeout);
}
request
}
}