From 0d8a28d2fe130d732e36a19be50e1fda550ae57c Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 18 May 2026 17:08:55 -0400 Subject: [PATCH] Fix all MxGateway.Client.Rust code-review findings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Resolves Client.Rust-001 through Client.Rust-011. Build/test/clippy gate (Client.Rust-001/002/003): - options.rs: doc comments on with_max_grpc_message_bytes / max_grpc_message_bytes (#![warn(missing_docs)]) - session.rs: rename BulkReplyKind variants to drop the shared `Bulk` suffix (clippy::enum_variant_names) - galaxy.rs: deref instead of clone on Option (clippy::clone_on_copy — an extra violation the gate also hit) - mxgw-cli: assert version_json against GATEWAY/WORKER_PROTOCOL_VERSION constants instead of the stale literal 2 `cargo clippy --workspace --all-targets -- -D warnings` now passes. Correctness / error handling: - version.rs: CLIENT_VERSION = env!("CARGO_PKG_VERSION") (Client.Rust-004) - session.rs: register/add_item/add_item2 handle extractors and bulk_results now return Err(Error::MalformedReply) instead of a silent 0 / empty vec on a shapeless OK reply (Client.Rust-005/006) - error.rs: new Error::Unavailable classifies Code::Unavailable / ResourceExhausted as transient (Client.Rust-010) - session.rs: per-call unique correlation ids via an atomic counter (Client.Rust-011) Other: - value.rs: MxValue/MxArrayValue compute the projection on demand instead of caching it, so a wire-only value pays no projection cost (Client.Rust-008) - RustClientDesign.md: correct the crate layout, drop the unused `tracing` dependency (Client.Rust-007) - client_behavior.rs: tests for the bulk-size cap, a mid-stream status fault, and the unreadable-CA-file path (Client.Rust-009) cargo fmt / test --workspace (27 tests) / clippy all pass. Co-Authored-By: Claude Opus 4.7 (1M context) --- clients/rust/RustClientDesign.md | 33 ++++--- clients/rust/crates/mxgw-cli/src/main.rs | 10 +- clients/rust/src/client.rs | 4 +- clients/rust/src/error.rs | 32 ++++++- clients/rust/src/galaxy.rs | 2 +- clients/rust/src/options.rs | 3 + clients/rust/src/session.rs | 112 ++++++++++++++--------- clients/rust/src/value.rs | 33 +++---- clients/rust/src/version.rs | 5 +- clients/rust/tests/client_behavior.rs | 75 ++++++++++++++- 10 files changed, 223 insertions(+), 86 deletions(-) diff --git a/clients/rust/RustClientDesign.md b/clients/rust/RustClientDesign.md index 070ee11..2cd1a58 100644 --- a/clients/rust/RustClientDesign.md +++ b/clients/rust/RustClientDesign.md @@ -11,28 +11,34 @@ generated contract inputs. ## Crate Layout -Recommended layout: +Actual layout — the `mxgateway-client` library crate is the workspace root, +with the `mxgw` test CLI as a workspace member: ```text -clients/rust/ +clients/rust/ # `mxgateway-client` library crate (workspace root) Cargo.toml build.rs + src/ + lib.rs + client.rs + session.rs + galaxy.rs + options.rs + auth.rs + value.rs + version.rs + error.rs + generated.rs crates/ - mxgateway-client/ - src/lib.rs - src/client.rs - src/session.rs - src/options.rs - src/auth.rs - src/value.rs - src/error.rs - src/generated/ - mxgw-cli/ + mxgw-cli/ # `mxgw` test CLI (workspace member) + Cargo.toml src/main.rs tests/ + client_behavior.rs + proto_fixtures.rs ``` -Expected dependencies: +Dependencies: - `tonic` - `prost` @@ -43,7 +49,6 @@ Expected dependencies: - `clap` - `serde` - `serde_json` -- `tracing` ## Library API diff --git a/clients/rust/crates/mxgw-cli/src/main.rs b/clients/rust/crates/mxgw-cli/src/main.rs index 25815cf..994c91b 100644 --- a/clients/rust/crates/mxgw-cli/src/main.rs +++ b/clients/rust/crates/mxgw-cli/src/main.rs @@ -1048,8 +1048,14 @@ mod tests { fn version_json_output_has_protocol_versions() { let value = super::version_json(); - assert_eq!(value["gatewayProtocolVersion"], 2); - assert_eq!(value["workerProtocolVersion"], 1); + assert_eq!( + value["gatewayProtocolVersion"], + super::GATEWAY_PROTOCOL_VERSION + ); + assert_eq!( + value["workerProtocolVersion"], + super::WORKER_PROTOCOL_VERSION + ); } #[test] diff --git a/clients/rust/src/client.rs b/clients/rust/src/client.rs index 8490284..308c70f 100644 --- a/clients/rust/src/client.rs +++ b/clients/rust/src/client.rs @@ -219,7 +219,9 @@ impl GatewayClient { request: AcknowledgeAlarmRequest, ) -> Result { let mut client = self.inner.clone(); - let response = client.acknowledge_alarm(self.unary_request(request)).await?; + let response = client + .acknowledge_alarm(self.unary_request(request)) + .await?; let reply = response.into_inner(); ensure_protocol_success("acknowledge alarm", reply.protocol_status.as_ref())?; Ok(reply) diff --git a/clients/rust/src/error.rs b/clients/rust/src/error.rs index f4c8c9a..92c3c00 100644 --- a/clients/rust/src/error.rs +++ b/clients/rust/src/error.rs @@ -1,10 +1,10 @@ //! Error types surfaced by the Rust client. //! //! [`Error`] is the umbrella enum returned by every async wrapper. It -//! classifies `tonic::Status` codes (auth, timeout, cancellation) and folds -//! gateway protocol failures and command-level rejections into structured -//! variants. Credentials embedded in status messages are scrubbed before the -//! message reaches a caller. +//! classifies `tonic::Status` codes (auth, timeout, cancellation, transient +//! unavailability) and folds gateway protocol failures and command-level +//! rejections into structured variants. Credentials embedded in status +//! messages are scrubbed before the message reaches a caller. use thiserror::Error as ThisError; use tonic::Code; @@ -85,6 +85,17 @@ pub enum Error { status: Box, }, + /// Server returned `Unavailable` or `ResourceExhausted` — a transient + /// failure (gateway restart, overload) that a caller may reasonably retry. + #[error("gateway temporarily unavailable: {message}")] + Unavailable { + /// Redacted server-supplied detail message. + message: String, + /// Original `tonic::Status`. + #[source] + status: Box, + }, + /// Any other `tonic::Status` that did not match a more specific variant. #[error("gateway status error: {0}")] Status(Box), @@ -106,6 +117,15 @@ pub enum Error { /// Detail message from the server. message: String, }, + + /// The gateway returned an OK reply whose payload did not carry the data + /// the command contract requires (for example, an `AddItem` reply with no + /// item handle and no `return_value`). + #[error("malformed gateway reply: {detail}")] + MalformedReply { + /// Human-readable description of what the reply was missing. + detail: String, + }, } /// Wrapper around an [`MxCommandReply`] whose `protocol_status` reported a @@ -174,6 +194,10 @@ impl From for Error { message, status: Box::new(status), }, + Code::Unavailable | Code::ResourceExhausted => Self::Unavailable { + message, + status: Box::new(status), + }, _ => Self::Status(Box::new(status)), } } diff --git a/clients/rust/src/galaxy.rs b/clients/rust/src/galaxy.rs index 4349f03..6897c2b 100644 --- a/clients/rust/src/galaxy.rs +++ b/clients/rust/src/galaxy.rs @@ -279,7 +279,7 @@ mod tests { _request: Request, ) -> Result, Status> { let present = *self.state.present.lock().unwrap(); - let time = self.state.last_deploy.lock().unwrap().clone(); + let time = *self.state.last_deploy.lock().unwrap(); Ok(Response::new(GetLastDeployTimeReply { present, time_of_last_deploy: time, diff --git a/clients/rust/src/options.rs b/clients/rust/src/options.rs index e9c601c..63860ff 100644 --- a/clients/rust/src/options.rs +++ b/clients/rust/src/options.rs @@ -95,6 +95,8 @@ impl ClientOptions { self } + /// Maximum encoded/decoded gRPC message size, in bytes, the transport + /// will accept. Defaults to 16 MiB. pub fn with_max_grpc_message_bytes(mut self, max_grpc_message_bytes: usize) -> Self { self.max_grpc_message_bytes = max_grpc_message_bytes; self @@ -140,6 +142,7 @@ impl ClientOptions { self.stream_timeout } + /// Configured maximum gRPC message size in bytes. pub fn max_grpc_message_bytes(&self) -> usize { self.max_grpc_message_bytes } diff --git a/clients/rust/src/session.rs b/clients/rust/src/session.rs index cc8a78f..76a38e0 100644 --- a/clients/rust/src/session.rs +++ b/clients/rust/src/session.rs @@ -8,6 +8,8 @@ //! Bulk commands enforce a 1000-item cap before contacting the worker, in //! line with the gateway's documented `MAX_BULK_ITEMS`. +use std::sync::atomic::{AtomicU64, Ordering}; + use crate::client::{EventStream, GatewayClient}; use crate::error::{ensure_protocol_success, Error}; use crate::generated::mxaccess_gateway::v1::mx_command::Payload; @@ -23,6 +25,16 @@ use crate::value::MxValue; const MAX_BULK_ITEMS: usize = 1_000; +/// Process-wide monotonic counter that keeps client correlation ids unique. +static CORRELATION_SEQUENCE: AtomicU64 = AtomicU64::new(0); + +/// Build a unique `client_correlation_id` for a request so concurrent or +/// repeated calls of the same command kind can be told apart in gateway logs. +fn next_correlation_id(label: &str) -> String { + let sequence = CORRELATION_SEQUENCE.fetch_add(1, Ordering::Relaxed); + format!("rust-client-{label}-{sequence}") +} + /// Handle to an opened gateway session. /// /// `Session` carries the gateway-issued session id and a cloned @@ -76,7 +88,7 @@ impl Session { .client .close_session_raw(CloseSessionRequest { session_id: self.id.clone(), - client_correlation_id: "rust-client-close-session".to_owned(), + client_correlation_id: next_correlation_id("close-session"), }) .await?; ensure_protocol_success("close session", reply.protocol_status.as_ref())?; @@ -99,7 +111,7 @@ impl Session { ) .await?; - Ok(register_server_handle(&reply)) + register_server_handle(&reply) } /// Run MXAccess `AddItem` against `server_handle` and return the @@ -120,7 +132,7 @@ impl Session { ) .await?; - Ok(add_item_handle(&reply)) + add_item_handle(&reply) } /// Run MXAccess `AddItem2` (item with a caller-supplied context string) @@ -146,7 +158,7 @@ impl Session { ) .await?; - Ok(add_item2_handle(&reply)) + add_item2_handle(&reply) } /// Run MXAccess `RemoveItem` for the given handle pair. @@ -226,7 +238,7 @@ impl Session { ) .await?; - Ok(bulk_results(reply, BulkReplyKind::AddItemBulk)) + bulk_results(reply, BulkReplyKind::AddItem) } /// Bulk variant of [`Session::advise`]. @@ -250,7 +262,7 @@ impl Session { ) .await?; - Ok(bulk_results(reply, BulkReplyKind::AdviseItemBulk)) + bulk_results(reply, BulkReplyKind::AdviseItem) } /// Bulk variant of [`Session::remove_item`]. @@ -274,7 +286,7 @@ impl Session { ) .await?; - Ok(bulk_results(reply, BulkReplyKind::RemoveItemBulk)) + bulk_results(reply, BulkReplyKind::RemoveItem) } /// Bulk variant of [`Session::un_advise`]. @@ -298,7 +310,7 @@ impl Session { ) .await?; - Ok(bulk_results(reply, BulkReplyKind::UnAdviseItemBulk)) + bulk_results(reply, BulkReplyKind::UnAdviseItem) } /// Bulk `Subscribe` (atomic add-and-advise) for a list of tag addresses. @@ -322,7 +334,7 @@ impl Session { ) .await?; - Ok(bulk_results(reply, BulkReplyKind::SubscribeBulk)) + bulk_results(reply, BulkReplyKind::Subscribe) } /// Bulk `Unsubscribe` (atomic un-advise-and-remove) for a list of @@ -347,7 +359,7 @@ impl Session { ) .await?; - Ok(bulk_results(reply, BulkReplyKind::UnsubscribeBulk)) + bulk_results(reply, BulkReplyKind::Unsubscribe) } /// Run MXAccess `Write` (single-value, no caller-supplied timestamp). @@ -466,7 +478,7 @@ impl Session { fn command_request(&self, kind: MxCommandKind, payload: Payload) -> MxCommandRequest { MxCommandRequest { session_id: self.id.clone(), - client_correlation_id: format!("rust-client-{}", kind.as_str_name()), + client_correlation_id: next_correlation_id(kind.as_str_name()), command: Some(MxCommand { kind: kind as i32, payload: Some(payload), @@ -486,71 +498,83 @@ fn ensure_bulk_size(name: &'static str, len: usize) -> Result<(), Error> { } } -fn register_server_handle(reply: &MxCommandReply) -> i32 { +fn register_server_handle(reply: &MxCommandReply) -> Result { match reply.payload.as_ref() { - Some(mx_command_reply::Payload::Register(register)) => register.server_handle, + Some(mx_command_reply::Payload::Register(register)) => Ok(register.server_handle), _ => reply .return_value .as_ref() .and_then(int32_reply_value) - .unwrap_or_default(), + .ok_or_else(|| Error::MalformedReply { + detail: "Register reply carried neither a register payload nor an \ + int32 return value" + .to_owned(), + }), } } -fn add_item_handle(reply: &MxCommandReply) -> i32 { +fn add_item_handle(reply: &MxCommandReply) -> Result { match reply.payload.as_ref() { - Some(mx_command_reply::Payload::AddItem(add_item)) => add_item.item_handle, + Some(mx_command_reply::Payload::AddItem(add_item)) => Ok(add_item.item_handle), _ => reply .return_value .as_ref() .and_then(int32_reply_value) - .unwrap_or_default(), + .ok_or_else(|| Error::MalformedReply { + detail: "AddItem reply carried neither an add_item payload nor an \ + int32 return value" + .to_owned(), + }), } } -fn add_item2_handle(reply: &MxCommandReply) -> i32 { +fn add_item2_handle(reply: &MxCommandReply) -> Result { match reply.payload.as_ref() { - Some(mx_command_reply::Payload::AddItem2(add_item)) => add_item.item_handle, + Some(mx_command_reply::Payload::AddItem2(add_item)) => Ok(add_item.item_handle), _ => reply .return_value .as_ref() .and_then(int32_reply_value) - .unwrap_or_default(), + .ok_or_else(|| Error::MalformedReply { + detail: "AddItem2 reply carried neither an add_item2 payload nor an \ + int32 return value" + .to_owned(), + }), } } enum BulkReplyKind { - AddItemBulk, - AdviseItemBulk, - RemoveItemBulk, - UnAdviseItemBulk, - SubscribeBulk, - UnsubscribeBulk, + AddItem, + AdviseItem, + RemoveItem, + UnAdviseItem, + Subscribe, + Unsubscribe, } -fn bulk_results(reply: MxCommandReply, kind: BulkReplyKind) -> Vec { +fn bulk_results(reply: MxCommandReply, kind: BulkReplyKind) -> Result, Error> { match (reply.payload, kind) { - (Some(mx_command_reply::Payload::AddItemBulk(reply)), BulkReplyKind::AddItemBulk) => { - reply.results + (Some(mx_command_reply::Payload::AddItemBulk(reply)), BulkReplyKind::AddItem) => { + Ok(reply.results) } - (Some(mx_command_reply::Payload::AdviseItemBulk(reply)), BulkReplyKind::AdviseItemBulk) => { - reply.results + (Some(mx_command_reply::Payload::AdviseItemBulk(reply)), BulkReplyKind::AdviseItem) => { + Ok(reply.results) } - (Some(mx_command_reply::Payload::RemoveItemBulk(reply)), BulkReplyKind::RemoveItemBulk) => { - reply.results + (Some(mx_command_reply::Payload::RemoveItemBulk(reply)), BulkReplyKind::RemoveItem) => { + Ok(reply.results) } - ( - Some(mx_command_reply::Payload::UnAdviseItemBulk(reply)), - BulkReplyKind::UnAdviseItemBulk, - ) => reply.results, - (Some(mx_command_reply::Payload::SubscribeBulk(reply)), BulkReplyKind::SubscribeBulk) => { - reply.results + (Some(mx_command_reply::Payload::UnAdviseItemBulk(reply)), BulkReplyKind::UnAdviseItem) => { + Ok(reply.results) } - ( - Some(mx_command_reply::Payload::UnsubscribeBulk(reply)), - BulkReplyKind::UnsubscribeBulk, - ) => reply.results, - _ => Vec::new(), + (Some(mx_command_reply::Payload::SubscribeBulk(reply)), BulkReplyKind::Subscribe) => { + Ok(reply.results) + } + (Some(mx_command_reply::Payload::UnsubscribeBulk(reply)), BulkReplyKind::Unsubscribe) => { + Ok(reply.results) + } + _ => Err(Error::MalformedReply { + detail: "bulk command reply did not carry the expected bulk result payload".to_owned(), + }), } } diff --git a/clients/rust/src/value.rs b/clients/rust/src/value.rs index 4da694a..3df70d8 100644 --- a/clients/rust/src/value.rs +++ b/clients/rust/src/value.rs @@ -25,15 +25,13 @@ use crate::generated::mxaccess_gateway::v1::{ #[derive(Clone, Debug, PartialEq)] pub struct MxValue { raw: ProtoMxValue, - projection: MxValueProjection, } impl MxValue { - /// Wrap a protobuf [`ProtoMxValue`] and compute its - /// [`MxValueProjection`]. + /// Wrap a protobuf [`ProtoMxValue`]. The typed [`MxValueProjection`] is + /// computed on demand by [`MxValue::projection`]. pub fn from_proto(raw: ProtoMxValue) -> Self { - let projection = MxValueProjection::from_proto(&raw); - Self { raw, projection } + Self { raw } } /// Build a boolean `MxValue` (`MxDataType::Boolean`, `VT_BOOL`). @@ -102,9 +100,13 @@ impl MxValue { &self.raw } - /// Borrow the typed projection. - pub fn projection(&self) -> &MxValueProjection { - &self.projection + /// Compute the typed projection of this value. + /// + /// The projection is derived from the raw message on each call rather than + /// cached, so a value built only to be sent over the wire never pays the + /// projection's allocation cost. + pub fn projection(&self) -> MxValueProjection { + MxValueProjection::from_proto(&self.raw) } /// Consume the wrapper and return the underlying protobuf message. @@ -183,15 +185,13 @@ impl MxValueProjection { #[derive(Clone, Debug, PartialEq)] pub struct MxArrayValue { raw: MxArray, - projection: MxArrayProjection, } impl MxArrayValue { - /// Wrap a protobuf [`MxArray`] and compute its - /// [`MxArrayProjection`]. + /// Wrap a protobuf [`MxArray`]. The typed [`MxArrayProjection`] is + /// computed on demand by [`MxArrayValue::projection`]. pub fn from_proto(raw: MxArray) -> Self { - let projection = MxArrayProjection::from_proto(&raw); - Self { raw, projection } + Self { raw } } /// Build a one-dimensional string array (`VT_ARRAY|VT_BSTR`). @@ -210,9 +210,10 @@ impl MxArrayValue { &self.raw } - /// Borrow the typed projection of the array's elements. - pub fn projection(&self) -> &MxArrayProjection { - &self.projection + /// Compute the typed projection of the array's elements, derived from the + /// raw message on each call rather than cached. + pub fn projection(&self) -> MxArrayProjection { + MxArrayProjection::from_proto(&self.raw) } } diff --git a/clients/rust/src/version.rs b/clients/rust/src/version.rs index e27fc2e..5019aa1 100644 --- a/clients/rust/src/version.rs +++ b/clients/rust/src/version.rs @@ -3,8 +3,9 @@ //! The protocol versions track the values the gateway and worker negotiate on //! `OpenSession` and let test harnesses cross-check the wire contract. -/// Semantic version of this Rust client crate. Mirrors `Cargo.toml`. -pub const CLIENT_VERSION: &str = "0.1.0-dev"; +/// Semantic version of this Rust client crate, taken from `Cargo.toml` at +/// compile time so the two cannot drift. +pub const CLIENT_VERSION: &str = env!("CARGO_PKG_VERSION"); /// Public gateway gRPC protocol version this client targets. pub const GATEWAY_PROTOCOL_VERSION: u32 = 3; diff --git a/clients/rust/tests/client_behavior.rs b/clients/rust/tests/client_behavior.rs index 117d77b..199ce73 100644 --- a/clients/rust/tests/client_behavior.rs +++ b/clients/rust/tests/client_behavior.rs @@ -203,7 +203,7 @@ fn value_conversion_fixtures_keep_typed_projection_and_raw_metadata() { }); assert_eq!( int64_value.projection(), - &MxValueProjection::Int64(9_223_372_036_854_770_000) + MxValueProjection::Int64(9_223_372_036_854_770_000) ); let raw_case = case_by_id(cases, "raw-fallback.variant"); @@ -220,7 +220,7 @@ fn value_conversion_fixtures_keep_typed_projection_and_raw_metadata() { }); assert_eq!( raw_value.projection(), - &MxValueProjection::Raw(vec![1, 2, 3, 4, 5]) + MxValueProjection::Raw(vec![1, 2, 3, 4, 5]) ); assert_eq!(raw_value.raw().raw_data_type, 32767); assert!(raw_value.raw().raw_diagnostic.contains("No lossless")); @@ -272,11 +272,76 @@ fn command_error_display_keeps_raw_reply_accessible() { assert!(error.to_string().contains("MxaccessFailure")); } +#[tokio::test] +async fn add_item_bulk_rejects_input_above_the_thousand_item_cap() { + let state = Arc::new(FakeState::default()); + let endpoint = spawn_fake_gateway(state.clone()).await; + let client = GatewayClient::connect(ClientOptions::new(endpoint)) + .await + .unwrap(); + let session = client.session("session-fixture"); + + let oversized: Vec = (0..1001).map(|index| format!("Tag{index}")).collect(); + let error = session.add_item_bulk(12, oversized).await.unwrap_err(); + + assert!( + matches!(&error, Error::InvalidArgument { name, .. } if name.as_str() == "tag_addresses"), + "expected InvalidArgument for tag_addresses, got {error:?}" + ); +} + +#[tokio::test] +async fn event_stream_surfaces_a_mid_stream_status_fault() { + let state = Arc::new(FakeState::default()); + state.emit_stream_fault.store(true, Ordering::SeqCst); + let endpoint = spawn_fake_gateway(state.clone()).await; + let client = GatewayClient::connect(ClientOptions::new(endpoint)) + .await + .unwrap(); + + let mut stream = client + .stream_events(StreamEventsRequest { + session_id: "session-fixture".to_owned(), + after_worker_sequence: 0, + }) + .await + .unwrap(); + + assert_eq!(stream.next().await.unwrap().unwrap().worker_sequence, 1); + assert_eq!(stream.next().await.unwrap().unwrap().worker_sequence, 2); + + let fault = stream.next().await.unwrap().unwrap_err(); + + assert!( + matches!(fault, Error::Unavailable { .. }), + "expected Error::Unavailable, got {fault:?}" + ); +} + +#[tokio::test] +async fn connect_with_unreadable_ca_file_reports_invalid_endpoint() { + let options = ClientOptions::new("https://127.0.0.1:65000") + .with_plaintext(false) + .with_ca_file("definitely-not-a-real-ca-file.pem"); + + // GatewayClient is not Debug, so unwrap_err is unavailable here. + let error = match GatewayClient::connect(options).await { + Ok(_) => panic!("connect should fail when the CA file cannot be read"), + Err(error) => error, + }; + + assert!( + matches!(error, Error::InvalidEndpoint { .. }), + "expected Error::InvalidEndpoint, got {error:?}" + ); +} + #[derive(Default)] struct FakeState { authorization: Mutex>, last_command_kind: Mutex>, stream_dropped: Arc, + emit_stream_fault: AtomicBool, } #[derive(Clone)] @@ -376,6 +441,12 @@ impl MxAccessGateway for FakeGateway { let (sender, receiver) = mpsc::channel(4); sender.send(Ok(event(1))).await.unwrap(); sender.send(Ok(event(2))).await.unwrap(); + if self.state.emit_stream_fault.load(Ordering::SeqCst) { + sender + .send(Err(Status::unavailable("worker dropped the session"))) + .await + .unwrap(); + } Ok(Response::new(DropAwareStream { inner: ReceiverStream::new(receiver),