Fix all MxGateway.Client.Rust code-review findings

Resolves Client.Rust-001 through Client.Rust-011.

Build/test/clippy gate (Client.Rust-001/002/003):
- options.rs: doc comments on with_max_grpc_message_bytes /
  max_grpc_message_bytes (#![warn(missing_docs)])
- session.rs: rename BulkReplyKind variants to drop the shared `Bulk`
  suffix (clippy::enum_variant_names)
- galaxy.rs: deref instead of clone on Option<Timestamp>
  (clippy::clone_on_copy — an extra violation the gate also hit)
- mxgw-cli: assert version_json against GATEWAY/WORKER_PROTOCOL_VERSION
  constants instead of the stale literal 2
`cargo clippy --workspace --all-targets -- -D warnings` now passes.

Correctness / error handling:
- version.rs: CLIENT_VERSION = env!("CARGO_PKG_VERSION") (Client.Rust-004)
- session.rs: register/add_item/add_item2 handle extractors and
  bulk_results now return Err(Error::MalformedReply) instead of a
  silent 0 / empty vec on a shapeless OK reply (Client.Rust-005/006)
- error.rs: new Error::Unavailable classifies Code::Unavailable /
  ResourceExhausted as transient (Client.Rust-010)
- session.rs: per-call unique correlation ids via an atomic counter
  (Client.Rust-011)

Other:
- value.rs: MxValue/MxArrayValue compute the projection on demand
  instead of caching it, so a wire-only value pays no projection cost
  (Client.Rust-008)
- RustClientDesign.md: correct the crate layout, drop the unused
  `tracing` dependency (Client.Rust-007)
- client_behavior.rs: tests for the bulk-size cap, a mid-stream status
  fault, and the unreadable-CA-file path (Client.Rust-009)

cargo fmt / test --workspace (27 tests) / clippy all pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-18 17:08:55 -04:00
parent f0a4af62b9
commit 0d8a28d2fe
10 changed files with 223 additions and 86 deletions
+19 -14
View File
@@ -11,28 +11,34 @@ generated contract inputs.
## Crate Layout
Recommended layout:
Actual layout — the `mxgateway-client` library crate is the workspace root,
with the `mxgw` test CLI as a workspace member:
```text
clients/rust/
clients/rust/ # `mxgateway-client` library crate (workspace root)
Cargo.toml
build.rs
src/
lib.rs
client.rs
session.rs
galaxy.rs
options.rs
auth.rs
value.rs
version.rs
error.rs
generated.rs
crates/
mxgateway-client/
src/lib.rs
src/client.rs
src/session.rs
src/options.rs
src/auth.rs
src/value.rs
src/error.rs
src/generated/
mxgw-cli/
mxgw-cli/ # `mxgw` test CLI (workspace member)
Cargo.toml
src/main.rs
tests/
client_behavior.rs
proto_fixtures.rs
```
Expected dependencies:
Dependencies:
- `tonic`
- `prost`
@@ -43,7 +49,6 @@ Expected dependencies:
- `clap`
- `serde`
- `serde_json`
- `tracing`
## Library API
+8 -2
View File
@@ -1048,8 +1048,14 @@ mod tests {
fn version_json_output_has_protocol_versions() {
let value = super::version_json();
assert_eq!(value["gatewayProtocolVersion"], 2);
assert_eq!(value["workerProtocolVersion"], 1);
assert_eq!(
value["gatewayProtocolVersion"],
super::GATEWAY_PROTOCOL_VERSION
);
assert_eq!(
value["workerProtocolVersion"],
super::WORKER_PROTOCOL_VERSION
);
}
#[test]
+3 -1
View File
@@ -219,7 +219,9 @@ impl GatewayClient {
request: AcknowledgeAlarmRequest,
) -> Result<AcknowledgeAlarmReply, Error> {
let mut client = self.inner.clone();
let response = client.acknowledge_alarm(self.unary_request(request)).await?;
let response = client
.acknowledge_alarm(self.unary_request(request))
.await?;
let reply = response.into_inner();
ensure_protocol_success("acknowledge alarm", reply.protocol_status.as_ref())?;
Ok(reply)
+28 -4
View File
@@ -1,10 +1,10 @@
//! Error types surfaced by the Rust client.
//!
//! [`Error`] is the umbrella enum returned by every async wrapper. It
//! classifies `tonic::Status` codes (auth, timeout, cancellation) and folds
//! gateway protocol failures and command-level rejections into structured
//! variants. Credentials embedded in status messages are scrubbed before the
//! message reaches a caller.
//! classifies `tonic::Status` codes (auth, timeout, cancellation, transient
//! unavailability) and folds gateway protocol failures and command-level
//! rejections into structured variants. Credentials embedded in status
//! messages are scrubbed before the message reaches a caller.
use thiserror::Error as ThisError;
use tonic::Code;
@@ -85,6 +85,17 @@ pub enum Error {
status: Box<tonic::Status>,
},
/// Server returned `Unavailable` or `ResourceExhausted` — a transient
/// failure (gateway restart, overload) that a caller may reasonably retry.
#[error("gateway temporarily unavailable: {message}")]
Unavailable {
/// Redacted server-supplied detail message.
message: String,
/// Original `tonic::Status`.
#[source]
status: Box<tonic::Status>,
},
/// Any other `tonic::Status` that did not match a more specific variant.
#[error("gateway status error: {0}")]
Status(Box<tonic::Status>),
@@ -106,6 +117,15 @@ pub enum Error {
/// Detail message from the server.
message: String,
},
/// The gateway returned an OK reply whose payload did not carry the data
/// the command contract requires (for example, an `AddItem` reply with no
/// item handle and no `return_value`).
#[error("malformed gateway reply: {detail}")]
MalformedReply {
/// Human-readable description of what the reply was missing.
detail: String,
},
}
/// Wrapper around an [`MxCommandReply`] whose `protocol_status` reported a
@@ -174,6 +194,10 @@ impl From<tonic::Status> for Error {
message,
status: Box::new(status),
},
Code::Unavailable | Code::ResourceExhausted => Self::Unavailable {
message,
status: Box::new(status),
},
_ => Self::Status(Box::new(status)),
}
}
+1 -1
View File
@@ -279,7 +279,7 @@ mod tests {
_request: Request<GetLastDeployTimeRequest>,
) -> Result<Response<GetLastDeployTimeReply>, Status> {
let present = *self.state.present.lock().unwrap();
let time = self.state.last_deploy.lock().unwrap().clone();
let time = *self.state.last_deploy.lock().unwrap();
Ok(Response::new(GetLastDeployTimeReply {
present,
time_of_last_deploy: time,
+3
View File
@@ -95,6 +95,8 @@ impl ClientOptions {
self
}
/// Maximum encoded/decoded gRPC message size, in bytes, the transport
/// will accept. Defaults to 16 MiB.
pub fn with_max_grpc_message_bytes(mut self, max_grpc_message_bytes: usize) -> Self {
self.max_grpc_message_bytes = max_grpc_message_bytes;
self
@@ -140,6 +142,7 @@ impl ClientOptions {
self.stream_timeout
}
/// Configured maximum gRPC message size in bytes.
pub fn max_grpc_message_bytes(&self) -> usize {
self.max_grpc_message_bytes
}
+68 -44
View File
@@ -8,6 +8,8 @@
//! Bulk commands enforce a 1000-item cap before contacting the worker, in
//! line with the gateway's documented `MAX_BULK_ITEMS`.
use std::sync::atomic::{AtomicU64, Ordering};
use crate::client::{EventStream, GatewayClient};
use crate::error::{ensure_protocol_success, Error};
use crate::generated::mxaccess_gateway::v1::mx_command::Payload;
@@ -23,6 +25,16 @@ use crate::value::MxValue;
const MAX_BULK_ITEMS: usize = 1_000;
/// Process-wide monotonic counter that keeps client correlation ids unique.
static CORRELATION_SEQUENCE: AtomicU64 = AtomicU64::new(0);
/// Build a unique `client_correlation_id` for a request so concurrent or
/// repeated calls of the same command kind can be told apart in gateway logs.
fn next_correlation_id(label: &str) -> String {
let sequence = CORRELATION_SEQUENCE.fetch_add(1, Ordering::Relaxed);
format!("rust-client-{label}-{sequence}")
}
/// Handle to an opened gateway session.
///
/// `Session` carries the gateway-issued session id and a cloned
@@ -76,7 +88,7 @@ impl Session {
.client
.close_session_raw(CloseSessionRequest {
session_id: self.id.clone(),
client_correlation_id: "rust-client-close-session".to_owned(),
client_correlation_id: next_correlation_id("close-session"),
})
.await?;
ensure_protocol_success("close session", reply.protocol_status.as_ref())?;
@@ -99,7 +111,7 @@ impl Session {
)
.await?;
Ok(register_server_handle(&reply))
register_server_handle(&reply)
}
/// Run MXAccess `AddItem` against `server_handle` and return the
@@ -120,7 +132,7 @@ impl Session {
)
.await?;
Ok(add_item_handle(&reply))
add_item_handle(&reply)
}
/// Run MXAccess `AddItem2` (item with a caller-supplied context string)
@@ -146,7 +158,7 @@ impl Session {
)
.await?;
Ok(add_item2_handle(&reply))
add_item2_handle(&reply)
}
/// Run MXAccess `RemoveItem` for the given handle pair.
@@ -226,7 +238,7 @@ impl Session {
)
.await?;
Ok(bulk_results(reply, BulkReplyKind::AddItemBulk))
bulk_results(reply, BulkReplyKind::AddItem)
}
/// Bulk variant of [`Session::advise`].
@@ -250,7 +262,7 @@ impl Session {
)
.await?;
Ok(bulk_results(reply, BulkReplyKind::AdviseItemBulk))
bulk_results(reply, BulkReplyKind::AdviseItem)
}
/// Bulk variant of [`Session::remove_item`].
@@ -274,7 +286,7 @@ impl Session {
)
.await?;
Ok(bulk_results(reply, BulkReplyKind::RemoveItemBulk))
bulk_results(reply, BulkReplyKind::RemoveItem)
}
/// Bulk variant of [`Session::un_advise`].
@@ -298,7 +310,7 @@ impl Session {
)
.await?;
Ok(bulk_results(reply, BulkReplyKind::UnAdviseItemBulk))
bulk_results(reply, BulkReplyKind::UnAdviseItem)
}
/// Bulk `Subscribe` (atomic add-and-advise) for a list of tag addresses.
@@ -322,7 +334,7 @@ impl Session {
)
.await?;
Ok(bulk_results(reply, BulkReplyKind::SubscribeBulk))
bulk_results(reply, BulkReplyKind::Subscribe)
}
/// Bulk `Unsubscribe` (atomic un-advise-and-remove) for a list of
@@ -347,7 +359,7 @@ impl Session {
)
.await?;
Ok(bulk_results(reply, BulkReplyKind::UnsubscribeBulk))
bulk_results(reply, BulkReplyKind::Unsubscribe)
}
/// Run MXAccess `Write` (single-value, no caller-supplied timestamp).
@@ -466,7 +478,7 @@ impl Session {
fn command_request(&self, kind: MxCommandKind, payload: Payload) -> MxCommandRequest {
MxCommandRequest {
session_id: self.id.clone(),
client_correlation_id: format!("rust-client-{}", kind.as_str_name()),
client_correlation_id: next_correlation_id(kind.as_str_name()),
command: Some(MxCommand {
kind: kind as i32,
payload: Some(payload),
@@ -486,71 +498,83 @@ fn ensure_bulk_size(name: &'static str, len: usize) -> Result<(), Error> {
}
}
fn register_server_handle(reply: &MxCommandReply) -> i32 {
fn register_server_handle(reply: &MxCommandReply) -> Result<i32, Error> {
match reply.payload.as_ref() {
Some(mx_command_reply::Payload::Register(register)) => register.server_handle,
Some(mx_command_reply::Payload::Register(register)) => Ok(register.server_handle),
_ => reply
.return_value
.as_ref()
.and_then(int32_reply_value)
.unwrap_or_default(),
.ok_or_else(|| Error::MalformedReply {
detail: "Register reply carried neither a register payload nor an \
int32 return value"
.to_owned(),
}),
}
}
fn add_item_handle(reply: &MxCommandReply) -> i32 {
fn add_item_handle(reply: &MxCommandReply) -> Result<i32, Error> {
match reply.payload.as_ref() {
Some(mx_command_reply::Payload::AddItem(add_item)) => add_item.item_handle,
Some(mx_command_reply::Payload::AddItem(add_item)) => Ok(add_item.item_handle),
_ => reply
.return_value
.as_ref()
.and_then(int32_reply_value)
.unwrap_or_default(),
.ok_or_else(|| Error::MalformedReply {
detail: "AddItem reply carried neither an add_item payload nor an \
int32 return value"
.to_owned(),
}),
}
}
fn add_item2_handle(reply: &MxCommandReply) -> i32 {
fn add_item2_handle(reply: &MxCommandReply) -> Result<i32, Error> {
match reply.payload.as_ref() {
Some(mx_command_reply::Payload::AddItem2(add_item)) => add_item.item_handle,
Some(mx_command_reply::Payload::AddItem2(add_item)) => Ok(add_item.item_handle),
_ => reply
.return_value
.as_ref()
.and_then(int32_reply_value)
.unwrap_or_default(),
.ok_or_else(|| Error::MalformedReply {
detail: "AddItem2 reply carried neither an add_item2 payload nor an \
int32 return value"
.to_owned(),
}),
}
}
enum BulkReplyKind {
AddItemBulk,
AdviseItemBulk,
RemoveItemBulk,
UnAdviseItemBulk,
SubscribeBulk,
UnsubscribeBulk,
AddItem,
AdviseItem,
RemoveItem,
UnAdviseItem,
Subscribe,
Unsubscribe,
}
fn bulk_results(reply: MxCommandReply, kind: BulkReplyKind) -> Vec<SubscribeResult> {
fn bulk_results(reply: MxCommandReply, kind: BulkReplyKind) -> Result<Vec<SubscribeResult>, Error> {
match (reply.payload, kind) {
(Some(mx_command_reply::Payload::AddItemBulk(reply)), BulkReplyKind::AddItemBulk) => {
reply.results
(Some(mx_command_reply::Payload::AddItemBulk(reply)), BulkReplyKind::AddItem) => {
Ok(reply.results)
}
(Some(mx_command_reply::Payload::AdviseItemBulk(reply)), BulkReplyKind::AdviseItemBulk) => {
reply.results
(Some(mx_command_reply::Payload::AdviseItemBulk(reply)), BulkReplyKind::AdviseItem) => {
Ok(reply.results)
}
(Some(mx_command_reply::Payload::RemoveItemBulk(reply)), BulkReplyKind::RemoveItemBulk) => {
reply.results
(Some(mx_command_reply::Payload::RemoveItemBulk(reply)), BulkReplyKind::RemoveItem) => {
Ok(reply.results)
}
(
Some(mx_command_reply::Payload::UnAdviseItemBulk(reply)),
BulkReplyKind::UnAdviseItemBulk,
) => reply.results,
(Some(mx_command_reply::Payload::SubscribeBulk(reply)), BulkReplyKind::SubscribeBulk) => {
reply.results
(Some(mx_command_reply::Payload::UnAdviseItemBulk(reply)), BulkReplyKind::UnAdviseItem) => {
Ok(reply.results)
}
(
Some(mx_command_reply::Payload::UnsubscribeBulk(reply)),
BulkReplyKind::UnsubscribeBulk,
) => reply.results,
_ => Vec::new(),
(Some(mx_command_reply::Payload::SubscribeBulk(reply)), BulkReplyKind::Subscribe) => {
Ok(reply.results)
}
(Some(mx_command_reply::Payload::UnsubscribeBulk(reply)), BulkReplyKind::Unsubscribe) => {
Ok(reply.results)
}
_ => Err(Error::MalformedReply {
detail: "bulk command reply did not carry the expected bulk result payload".to_owned(),
}),
}
}
+17 -16
View File
@@ -25,15 +25,13 @@ use crate::generated::mxaccess_gateway::v1::{
#[derive(Clone, Debug, PartialEq)]
pub struct MxValue {
raw: ProtoMxValue,
projection: MxValueProjection,
}
impl MxValue {
/// Wrap a protobuf [`ProtoMxValue`] and compute its
/// [`MxValueProjection`].
/// Wrap a protobuf [`ProtoMxValue`]. The typed [`MxValueProjection`] is
/// computed on demand by [`MxValue::projection`].
pub fn from_proto(raw: ProtoMxValue) -> Self {
let projection = MxValueProjection::from_proto(&raw);
Self { raw, projection }
Self { raw }
}
/// Build a boolean `MxValue` (`MxDataType::Boolean`, `VT_BOOL`).
@@ -102,9 +100,13 @@ impl MxValue {
&self.raw
}
/// Borrow the typed projection.
pub fn projection(&self) -> &MxValueProjection {
&self.projection
/// Compute the typed projection of this value.
///
/// The projection is derived from the raw message on each call rather than
/// cached, so a value built only to be sent over the wire never pays the
/// projection's allocation cost.
pub fn projection(&self) -> MxValueProjection {
MxValueProjection::from_proto(&self.raw)
}
/// Consume the wrapper and return the underlying protobuf message.
@@ -183,15 +185,13 @@ impl MxValueProjection {
#[derive(Clone, Debug, PartialEq)]
pub struct MxArrayValue {
raw: MxArray,
projection: MxArrayProjection,
}
impl MxArrayValue {
/// Wrap a protobuf [`MxArray`] and compute its
/// [`MxArrayProjection`].
/// Wrap a protobuf [`MxArray`]. The typed [`MxArrayProjection`] is
/// computed on demand by [`MxArrayValue::projection`].
pub fn from_proto(raw: MxArray) -> Self {
let projection = MxArrayProjection::from_proto(&raw);
Self { raw, projection }
Self { raw }
}
/// Build a one-dimensional string array (`VT_ARRAY|VT_BSTR`).
@@ -210,9 +210,10 @@ impl MxArrayValue {
&self.raw
}
/// Borrow the typed projection of the array's elements.
pub fn projection(&self) -> &MxArrayProjection {
&self.projection
/// Compute the typed projection of the array's elements, derived from the
/// raw message on each call rather than cached.
pub fn projection(&self) -> MxArrayProjection {
MxArrayProjection::from_proto(&self.raw)
}
}
+3 -2
View File
@@ -3,8 +3,9 @@
//! The protocol versions track the values the gateway and worker negotiate on
//! `OpenSession` and let test harnesses cross-check the wire contract.
/// Semantic version of this Rust client crate. Mirrors `Cargo.toml`.
pub const CLIENT_VERSION: &str = "0.1.0-dev";
/// Semantic version of this Rust client crate, taken from `Cargo.toml` at
/// compile time so the two cannot drift.
pub const CLIENT_VERSION: &str = env!("CARGO_PKG_VERSION");
/// Public gateway gRPC protocol version this client targets.
pub const GATEWAY_PROTOCOL_VERSION: u32 = 3;
+73 -2
View File
@@ -203,7 +203,7 @@ fn value_conversion_fixtures_keep_typed_projection_and_raw_metadata() {
});
assert_eq!(
int64_value.projection(),
&MxValueProjection::Int64(9_223_372_036_854_770_000)
MxValueProjection::Int64(9_223_372_036_854_770_000)
);
let raw_case = case_by_id(cases, "raw-fallback.variant");
@@ -220,7 +220,7 @@ fn value_conversion_fixtures_keep_typed_projection_and_raw_metadata() {
});
assert_eq!(
raw_value.projection(),
&MxValueProjection::Raw(vec![1, 2, 3, 4, 5])
MxValueProjection::Raw(vec![1, 2, 3, 4, 5])
);
assert_eq!(raw_value.raw().raw_data_type, 32767);
assert!(raw_value.raw().raw_diagnostic.contains("No lossless"));
@@ -272,11 +272,76 @@ fn command_error_display_keeps_raw_reply_accessible() {
assert!(error.to_string().contains("MxaccessFailure"));
}
#[tokio::test]
async fn add_item_bulk_rejects_input_above_the_thousand_item_cap() {
let state = Arc::new(FakeState::default());
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let oversized: Vec<String> = (0..1001).map(|index| format!("Tag{index}")).collect();
let error = session.add_item_bulk(12, oversized).await.unwrap_err();
assert!(
matches!(&error, Error::InvalidArgument { name, .. } if name.as_str() == "tag_addresses"),
"expected InvalidArgument for tag_addresses, got {error:?}"
);
}
#[tokio::test]
async fn event_stream_surfaces_a_mid_stream_status_fault() {
let state = Arc::new(FakeState::default());
state.emit_stream_fault.store(true, Ordering::SeqCst);
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let mut stream = client
.stream_events(StreamEventsRequest {
session_id: "session-fixture".to_owned(),
after_worker_sequence: 0,
})
.await
.unwrap();
assert_eq!(stream.next().await.unwrap().unwrap().worker_sequence, 1);
assert_eq!(stream.next().await.unwrap().unwrap().worker_sequence, 2);
let fault = stream.next().await.unwrap().unwrap_err();
assert!(
matches!(fault, Error::Unavailable { .. }),
"expected Error::Unavailable, got {fault:?}"
);
}
#[tokio::test]
async fn connect_with_unreadable_ca_file_reports_invalid_endpoint() {
let options = ClientOptions::new("https://127.0.0.1:65000")
.with_plaintext(false)
.with_ca_file("definitely-not-a-real-ca-file.pem");
// GatewayClient is not Debug, so unwrap_err is unavailable here.
let error = match GatewayClient::connect(options).await {
Ok(_) => panic!("connect should fail when the CA file cannot be read"),
Err(error) => error,
};
assert!(
matches!(error, Error::InvalidEndpoint { .. }),
"expected Error::InvalidEndpoint, got {error:?}"
);
}
#[derive(Default)]
struct FakeState {
authorization: Mutex<Option<String>>,
last_command_kind: Mutex<Option<i32>>,
stream_dropped: Arc<AtomicBool>,
emit_stream_fault: AtomicBool,
}
#[derive(Clone)]
@@ -376,6 +441,12 @@ impl MxAccessGateway for FakeGateway {
let (sender, receiver) = mpsc::channel(4);
sender.send(Ok(event(1))).await.unwrap();
sender.send(Ok(event(2))).await.unwrap();
if self.state.emit_stream_fault.load(Ordering::SeqCst) {
sender
.send(Err(Status::unavailable("worker dropped the session")))
.await
.unwrap();
}
Ok(Response::new(DropAwareStream {
inner: ReceiverStream::new(receiver),