mxaccessgw/clients/rust/RustClientDesign.md
Joseph Doherty 4a0f88b17d Resolve Client.Rust-022..029: MalformedReply, correlation ids, clippy
Client.Rust-022  Restored Error::MalformedReply for register / add_item /
                 add_item2 and the bulk-subscribe / read-bulk / write-bulk
                 dispatch arms so malformed-but-OK replies fail loudly
                 instead of returning Vec::new().
Client.Rust-023  Restored next_correlation_id and routed every CLI close /
                 stream-alarms / acknowledge-alarm / bench-read-bulk call
                 through it so each call carries a unique opaque token.
Client.Rust-024  Added round-trip tests for read_bulk / write_bulk /
                 write2_bulk / write_secured_bulk / write_secured2_bulk
                 plus stream_alarms and percentile_summary unit tests.
Client.Rust-025  RustClientDesign.md re-synced — new bulk SDK, alarms
                 surface, Error variants, CLI command list, and the
                 Windows stack workaround.
Client.Rust-026  Session::read_bulk now borrows a tag slice; bench-read-
                 bulk binds tags once outside the warm-up / steady-state
                 loops.
Client.Rust-027  .cargo/config.toml selector tightened to
                 cfg(all(windows, target_env = "msvc")) and comment
                 rewritten to match reality (release + debug ship the
                 8 MB reservation).
Client.Rust-028  run_batch removed the empty-line break; stdin EOF is
                 the only terminator.
Client.Rust-029  Re-applied Client.Rust-001 / 002 / 012 — added the
                 missing doc comments, renamed BulkReplyKind variants,
                 and replaced the clone-on-copy with a deref under lock
                 so cargo clippy -D warnings is clean.

All resolved at 2026-05-24; cargo fmt + check + clippy + test all green
(55 tests).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-24 08:50:15 -04:00

Rust Client Detailed Design

Purpose

Provide an async Rust client crate for MXAccess Gateway, plus a test CLI and unit tests. The Rust client should use tonic and tokio.

Follow the Rust Style Guide for handwritten code and the Protobuf Style Guide for generated contract inputs.

Crate Layout

The workspace is rooted at clients/rust/. The top-level crate zb-mom-ww-mxgateway-client is declared by clients/rust/Cargo.toml itself (flat layout: its src/ sits directly under the workspace root, not nested inside crates/). The only entry in the [workspace] members list is the mxgw-cli binary subcrate under crates/mxgw-cli/.

clients/rust/
  Cargo.toml          # workspace root + top-level crate `zb-mom-ww-mxgateway-client`
  Cargo.lock
  build.rs            # tonic-build proto generation
  README.md
  RustClientDesign.md
  src/
    lib.rs
    client.rs
    session.rs
    galaxy.rs
    options.rs
    auth.rs
    error.rs
    value.rs
    version.rs
    generated.rs      # `pub mod` wrappers around files under `src/generated/`
    generated/        # tonic-build output (not hand-edited)
  tests/
    client_behavior.rs
    proto_fixtures.rs
  crates/
    mxgw-cli/         # sole workspace member — binary `mxgw`
      Cargo.toml
      src/main.rs

Expected dependencies:

  • tonic
  • prost
  • prost-types
  • tokio
  • tokio-stream
  • thiserror
  • clap
  • serde
  • serde_json

Windows Build Notes

clients/rust/.cargo/config.toml carries an MSVC-scoped rustflag that raises the default 1 MB Windows main-thread stack to 8 MB (-C link-arg=/STACK:8388608, under cfg(all(windows, target_env = "msvc"))). The setting is required because, in debug builds, clap-derive materialises a large Command enum (one variant per CLI subcommand, each carrying its flag args) on the main thread's stack before any user code runs, and the default 1 MB stack overflows during enum construction. The /STACK: link-arg writes IMAGE_OPTIONAL_HEADER.SizeOfStackReserve in the PE header at link time, so debug and release artifacts ship with the same 8 MB stack reservation. Release builds would not overflow without it (the optimizer elides the enum from the stack frame), but the setting stays on for release so that both build flavours produce binaries with identical stack metadata. The MSVC-only selector leaves x86_64-pc-windows-gnu (mingw) builds unaffected, since the GNU linker rejects /STACK:.
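As a quick check on the numbers above, the sketch below spells out the arithmetic behind the /STACK: value. The config fragment in the leading comment is an assumption about the file's shape, reconstructed from the selector and flag quoted in the text, and stack_link_arg is a hypothetical helper that exists only for illustration.

```rust
// Assumed shape of the .cargo/config.toml entry described above (comment only):
//
//   [target.'cfg(all(windows, target_env = "msvc"))']
//   rustflags = ["-C", "link-arg=/STACK:8388608"]

/// Default Windows main-thread stack reservation named in the text: 1 MB.
pub const DEFAULT_STACK_RESERVE_BYTES: u64 = 1024 * 1024;

/// Reservation requested from the MSVC linker: 8 MB.
pub const STACK_RESERVE_BYTES: u64 = 8 * 1024 * 1024;

/// Formats the linker argument for a reservation size (hypothetical helper).
pub fn stack_link_arg(bytes: u64) -> String {
    format!("/STACK:{bytes}")
}
```

Passing STACK_RESERVE_BYTES to stack_link_arg yields the same /STACK:8388608 argument that the rustflag carries.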

Library API

Suggested API:

pub struct GatewayClient { /* tonic channel + generated client */ }

pub struct ClientOptions {
    pub endpoint: String,
    pub api_key: String,
    pub plaintext: bool,
    pub ca_file: Option<PathBuf>,
    pub server_name_override: Option<String>,
    pub connect_timeout: Duration,
    pub call_timeout: Duration,
}

impl GatewayClient {
    pub async fn connect(options: ClientOptions) -> Result<Self, Error>;
    pub async fn open_session(&self, options: OpenSessionOptions) -> Result<Session, Error>;
    pub async fn invoke(&self, request: MxCommandRequest) -> Result<MxCommandReply, Error>;
}

Session:

pub struct Session {
    pub id: String,
}

impl Session {
    pub async fn register(&self, client_name: &str) -> Result<i32, Error>;
    pub async fn add_item(&self, server_handle: i32, item: &str) -> Result<i32, Error>;
    pub async fn add_item2(&self, server_handle: i32, item: &str, context: &str) -> Result<i32, Error>;
    pub async fn advise(&self, server_handle: i32, item_handle: i32) -> Result<(), Error>;
    pub async fn un_advise(&self, server_handle: i32, item_handle: i32) -> Result<(), Error>;
    pub async fn remove_item(&self, server_handle: i32, item_handle: i32) -> Result<(), Error>;
    pub async fn add_item_bulk(&self, server_handle: i32, tag_addresses: Vec<String>) -> Result<Vec<SubscribeResult>, Error>;
    pub async fn advise_item_bulk(&self, server_handle: i32, item_handles: Vec<i32>) -> Result<Vec<SubscribeResult>, Error>;
    pub async fn remove_item_bulk(&self, server_handle: i32, item_handles: Vec<i32>) -> Result<Vec<SubscribeResult>, Error>;
    pub async fn un_advise_item_bulk(&self, server_handle: i32, item_handles: Vec<i32>) -> Result<Vec<SubscribeResult>, Error>;
    pub async fn subscribe_bulk(&self, server_handle: i32, tag_addresses: Vec<String>) -> Result<Vec<SubscribeResult>, Error>;
    pub async fn unsubscribe_bulk(&self, server_handle: i32, item_handles: Vec<i32>) -> Result<Vec<SubscribeResult>, Error>;
    pub async fn read_bulk<S: AsRef<str>>(&self, server_handle: i32, tag_addresses: &[S], timeout_ms: u32) -> Result<Vec<BulkReadResult>, Error>;
    pub async fn write(&self, server_handle: i32, item_handle: i32, value: MxValue, user_id: i32) -> Result<(), Error>;
    pub async fn write2(&self, server_handle: i32, item_handle: i32, value: MxValue, timestamp_value: MxValue, user_id: i32) -> Result<(), Error>;
    pub async fn write_bulk(&self, server_handle: i32, entries: Vec<WriteBulkEntry>) -> Result<Vec<BulkWriteResult>, Error>;
    pub async fn write2_bulk(&self, server_handle: i32, entries: Vec<Write2BulkEntry>) -> Result<Vec<BulkWriteResult>, Error>;
    pub async fn write_secured_bulk(&self, server_handle: i32, entries: Vec<WriteSecuredBulkEntry>) -> Result<Vec<BulkWriteResult>, Error>;
    pub async fn write_secured2_bulk(&self, server_handle: i32, entries: Vec<WriteSecured2BulkEntry>) -> Result<Vec<BulkWriteResult>, Error>;
    pub async fn events(&self) -> Result<impl Stream<Item = Result<MxEvent, Error>>, Error>;
    pub async fn close(&self) -> Result<(), Error>;
}

The per-entry credentials and timestamps (user_id, timestamp_value, current_user_id, verifier_user_id) live on the WriteBulkEntry / Write2BulkEntry / WriteSecuredBulkEntry / WriteSecured2BulkEntry structs rather than as trailing positional arguments on the bulk-write helpers, matching the protobuf shapes in mxaccess_gateway.proto. read_bulk is generic over AsRef<str> so callers can pass &[String] or &[&str] without cloning at the call site (the cross-language bench-read-bulk hot loop relies on this).
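The AsRef<str> bound can be illustrated in isolation. The following is a minimal std-only sketch, not the crate's code: collect_tag_addresses is a hypothetical stand-in for the request-building step inside read_bulk, and it only shows that one generic signature accepts both owned and borrowed tag slices.

```rust
/// Hypothetical stand-in for the request-building step of `read_bulk`.
/// Accepts any slice whose elements can be viewed as `&str` and copies the
/// names once, at the point where an owned request would be assembled.
pub fn collect_tag_addresses<S: AsRef<str>>(tag_addresses: &[S]) -> Vec<String> {
    tag_addresses
        .iter()
        .map(|tag| tag.as_ref().to_owned())
        .collect()
}

/// Example caller: binds the tag list once, then reuses the same borrow on
/// every iteration, as a benchmark hot loop might.
pub fn total_names_over(iterations: usize, tags: &[String]) -> usize {
    let mut total = 0;
    for _ in 0..iterations {
        total += collect_tag_addresses(tags).len();
    }
    total
}
```

A caller holding a Vec<String> passes &tags, and a caller holding string literals passes &["TestChildObject.TestInt"]; neither has to build a temporary Vec<String> first.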

The session::next_correlation_id helper is pub and re-exported at the crate root (zb_mom_ww_mxgateway_client::next_correlation_id). Raw-RPC consumers, such as the mxgw CLI's Ping, CloseSession, StreamAlarms, AcknowledgeAlarm, and BenchReadBulk paths, call it so that every request carries a unique correlation id, which lets gateway logs tell one call apart from concurrent CLI smoke runs. The textual format is intentionally not part of the public contract.
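One way such a helper could be built is sketched below. This is an assumption for illustration, not the crate's implementation: the name sketch_correlation_id, the process-id-plus-counter scheme, and the "rust-" prefix are all hypothetical, and the real format is deliberately unspecified.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Process-wide counter; each call takes the next value.
static NEXT_ID: AtomicU64 = AtomicU64::new(1);

/// Returns an opaque token that is unique within this process and is
/// unlikely to collide with tokens from a concurrently running process,
/// because the OS process id is part of it. The format is hypothetical.
pub fn sketch_correlation_id() -> String {
    let sequence = NEXT_ID.fetch_add(1, Ordering::Relaxed);
    format!("rust-{}-{}", std::process::id(), sequence)
}
```

Callers should treat the result as an opaque string and never parse it, which matches the statement that the textual format is outside the public contract.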

Alarms

GatewayClient exposes the gateway's session-less central alarm surface:

pub type AlarmFeedStream = Pin<Box<dyn Stream<Item = Result<AlarmFeedMessage, Error>> + Send + 'static>>;

impl GatewayClient {
    pub async fn stream_alarms(&self, request: StreamAlarmsRequest) -> Result<AlarmFeedStream, Error>;
    pub async fn acknowledge_alarm(&self, request: AcknowledgeAlarmRequest) -> Result<AcknowledgeAlarmReply, Error>;
}

stream_alarms opens with one active_alarm per currently-active alarm (the ConditionRefresh snapshot), then a single snapshot_complete, then a transition for every subsequent raise / acknowledge / clear. The feed is served by the gateway's always-on alarm monitor — no worker session is opened — so any number of clients may attach. Dropping the stream cancels the gRPC call cooperatively. acknowledge_alarm is idempotent at the MxAccess layer; the returned AcknowledgeAlarmReply carries the native MxStatus from the worker.
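The ordering contract described above (snapshot entries, one snapshot_complete, then transitions) can be checked by a small consumer. The sketch below is std-only and uses a hypothetical FeedItem enum in place of the generated AlarmFeedMessage type; the String payloads stand in for alarm references.

```rust
/// Hypothetical stand-in for the payload kinds carried by the alarm feed.
#[derive(Debug, Clone, PartialEq)]
pub enum FeedItem {
    ActiveAlarm(String),
    SnapshotComplete,
    Transition(String),
}

/// Feed contents split into the initial snapshot and the live phase.
#[derive(Debug, Default, PartialEq)]
pub struct SplitFeed {
    pub snapshot: Vec<String>,
    pub transitions: Vec<String>,
}

/// Consumes feed items in order and rejects any sequence that violates
/// "active_alarm*, exactly one snapshot_complete, transition*".
pub fn split_feed<I>(items: I) -> Result<SplitFeed, String>
where
    I: IntoIterator<Item = FeedItem>,
{
    let mut out = SplitFeed::default();
    let mut snapshot_done = false;
    for item in items {
        match (item, snapshot_done) {
            (FeedItem::ActiveAlarm(reference), false) => out.snapshot.push(reference),
            (FeedItem::SnapshotComplete, false) => snapshot_done = true,
            (FeedItem::Transition(reference), true) => out.transitions.push(reference),
            (other, _) => {
                return Err(format!(
                    "unexpected {other:?} (snapshot_done = {snapshot_done})"
                ))
            }
        }
    }
    Ok(out)
}
```

A real consumer would apply the same two-phase logic to the async stream returned by stream_alarms rather than to an in-memory iterator.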

Authentication

Use a tonic interceptor or request extension layer to add:

authorization: Bearer <api key>

Use SecretString or equivalent if a dependency is acceptable. Always redact API keys in Debug output.
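If no secret-handling dependency is adopted, a hand-written wrapper gives the same redaction guarantee. The sketch below is a std-only illustration under that assumption; ApiKey and OptionsSketch are hypothetical names, not types from the crate.

```rust
use std::fmt;

/// Hypothetical wrapper that keeps the raw key out of `Debug` output.
#[derive(Clone)]
pub struct ApiKey(String);

impl ApiKey {
    pub fn new(raw: impl Into<String>) -> Self {
        Self(raw.into())
    }

    /// Value for the `authorization` metadata entry.
    pub fn bearer_header(&self) -> String {
        format!("Bearer {}", self.0)
    }
}

impl fmt::Debug for ApiKey {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Never print the key itself, only a fixed placeholder.
        f.write_str("ApiKey(<redacted>)")
    }
}

/// Deriving `Debug` on a containing struct is safe because the field's
/// own `Debug` impl does the redaction.
#[derive(Debug)]
pub struct OptionsSketch {
    pub endpoint: String,
    pub api_key: ApiKey,
}
```

Because the redaction lives on the field type, a derived Debug on any struct that embeds the key cannot leak it by accident.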

TLS

Support:

  • plaintext channel for local development,
  • native or rustls TLS depending on project preference,
  • custom CA file,
  • domain override.
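One possible way to keep these options coherent is to resolve the three ClientOptions fields into a single enum before building the channel. The sketch below is an assumption, not the crate's behaviour: Security and resolve_security are hypothetical, and rejecting TLS-only settings on a plaintext channel is a design choice the document does not mandate.

```rust
use std::path::PathBuf;

/// Hypothetical resolved transport-security mode.
#[derive(Debug, PartialEq)]
pub enum Security {
    Plaintext,
    Tls {
        ca_file: Option<PathBuf>,
        server_name_override: Option<String>,
    },
}

/// Resolves the `plaintext`, `ca_file`, and `server_name_override` options
/// into one mode, rejecting combinations that cannot take effect.
pub fn resolve_security(
    plaintext: bool,
    ca_file: Option<PathBuf>,
    server_name_override: Option<String>,
) -> Result<Security, String> {
    if plaintext {
        if ca_file.is_some() || server_name_override.is_some() {
            return Err(
                "ca_file / server_name_override have no effect on a plaintext channel"
                    .to_string(),
            );
        }
        return Ok(Security::Plaintext);
    }
    Ok(Security::Tls {
        ca_file,
        server_name_override,
    })
}
```

In the real client a failure here would more naturally surface as an InvalidEndpoint or InvalidArgument error than as a bare String.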

Streaming

Expose event streams as a Stream<Item = Result<MxEvent, Error>>. Dropping the stream should cancel the underlying gRPC stream.

Do not buffer unboundedly in the client. If a helper channel is used, make it bounded.
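The bounded-buffer rule can be demonstrated with the standard library's synchronous channel. This is only an illustration of the backpressure behaviour: the async client would presumably use a bounded tokio channel instead, and fill_bounded is a hypothetical helper.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Pushes `events` into a channel of fixed `capacity` without ever draining
/// it, and reports how many were accepted and how many were refused because
/// the buffer was full.
pub fn fill_bounded(capacity: usize, events: &[u32]) -> (usize, usize) {
    let (tx, rx) = sync_channel::<u32>(capacity);
    let mut accepted = 0;
    let mut rejected = 0;
    for &event in events {
        match tx.try_send(event) {
            Ok(()) => accepted += 1,
            // A full buffer is the backpressure signal: the producer must
            // wait, drop, or fail rather than grow memory without limit.
            Err(TrySendError::Full(_)) => rejected += 1,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    drop(rx); // the receiver stays alive for the whole loop
    (accepted, rejected)
}
```

With a capacity of 2 and five events, two sends succeed and three are refused, which is the point at which a real helper task would await capacity instead of allocating.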

Error Handling

Use thiserror:

pub enum Error {
    InvalidEndpoint { endpoint: String, detail: String },
    InvalidArgument { name: String, detail: String },
    Transport(tonic::transport::Error),
    Authentication { message: String, status: Box<tonic::Status> },
    Authorization { message: String, status: Box<tonic::Status> },
    Timeout { message: String, status: Box<tonic::Status> },
    Cancelled { message: String, status: Box<tonic::Status> },
    Unavailable { message: String, status: Box<tonic::Status> },
    Status(Box<tonic::Status>),
    Command(Box<CommandError>),
    ProtocolStatus { operation: &'static str, code: ProtocolStatusCode, message: String },
    MalformedReply { detail: String },
}

  • Unavailable classifies Code::Unavailable / Code::ResourceExhausted so callers can distinguish transient failures from permanent ones.
  • MalformedReply surfaces the rare case where the gateway returned a protocol-level Ok envelope but the typed payload arm was missing or did not match the command kind (e.g. an AddItemReply body on a WriteBulk reply). This is distinct from ProtocolStatus because the protocol-level envelope itself succeeded; the corruption is in the payload shape.
  • InvalidEndpoint is raised before any RPC dispatch when the endpoint URL or CA file fails to parse / load.
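The status-to-variant mapping might look like the sketch below. Only the Unavailable / ResourceExhausted grouping is stated in the text; the other pairings are assumptions inferred from the variant names. Code and ErrorKind are local stand-ins so the example compiles without tonic, with Code mirroring a subset of the gRPC status codes.

```rust
/// Local stand-in mirroring a subset of the gRPC status codes.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Code {
    Ok,
    Cancelled,
    DeadlineExceeded,
    PermissionDenied,
    ResourceExhausted,
    Unavailable,
    Unauthenticated,
    Internal,
}

/// Local stand-in for the status-derived `Error` variants.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ErrorKind {
    Authentication,
    Authorization,
    Timeout,
    Cancelled,
    Unavailable,
    Status,
}

/// Maps a status code to an error kind; `None` means the call succeeded.
pub fn classify(code: Code) -> Option<ErrorKind> {
    match code {
        Code::Ok => None,
        Code::Unauthenticated => Some(ErrorKind::Authentication), // assumed
        Code::PermissionDenied => Some(ErrorKind::Authorization), // assumed
        Code::DeadlineExceeded => Some(ErrorKind::Timeout),       // assumed
        Code::Cancelled => Some(ErrorKind::Cancelled),            // assumed
        // Stated in the text: both codes are treated as transient.
        Code::Unavailable | Code::ResourceExhausted => Some(ErrorKind::Unavailable),
        Code::Internal => Some(ErrorKind::Status),
    }
}

/// Hypothetical retry hint built on the classification.
pub fn is_transient(kind: ErrorKind) -> bool {
    matches!(kind, ErrorKind::Unavailable)
}
```

Whether Timeout should also count as transient is a policy question this sketch leaves open.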

Preserve raw command replies in CommandError where applicable.
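The difference between a failed command (raw reply preserved) and a malformed-but-OK reply can be sketched as follows. Reply, Payload, and SketchError are simplified, hypothetical stand-ins for MxCommandReply, its payload arms, and the crate's Error; the real types carry more fields.

```rust
/// Hypothetical typed payload arms of a command reply.
#[derive(Debug, Clone, PartialEq)]
pub enum Payload {
    AddItem { item_handle: i32 },
    WriteBulk { results: Vec<i32> },
}

/// Hypothetical reply envelope: an OK flag plus an optional payload arm.
#[derive(Debug, Clone, PartialEq)]
pub struct Reply {
    pub ok: bool,
    pub payload: Option<Payload>,
}

#[derive(Debug, PartialEq)]
pub enum SketchError {
    /// The command failed; the raw reply is kept for the caller to inspect.
    Command(Box<Reply>),
    /// The envelope said OK but the payload arm was missing or mismatched.
    MalformedReply { detail: String },
}

/// Extracts the item handle from an AddItem reply, failing loudly instead
/// of returning a default when the payload shape is wrong.
pub fn expect_add_item(reply: Reply) -> Result<i32, SketchError> {
    if !reply.ok {
        return Err(SketchError::Command(Box::new(reply)));
    }
    match reply.payload {
        Some(Payload::AddItem { item_handle }) => Ok(item_handle),
        Some(other) => Err(SketchError::MalformedReply {
            detail: format!("expected AddItem payload, got {other:?}"),
        }),
        None => Err(SketchError::MalformedReply {
            detail: "payload missing".to_string(),
        }),
    }
}
```

Boxing the preserved reply keeps the error type small, in the same spirit as the Box<CommandError> and Box<tonic::Status> fields in the enum above.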

Test CLI

Binary: mxgw.

Use clap derive.

Commands:

mxgw version
mxgw ping
mxgw open-session
mxgw close-session --session-id <id>
mxgw register --session-id <id>
mxgw add-item --session-id <id> --server-handle <h> --item <tag>
mxgw advise --session-id <id> --server-handle <h> --item-handle <h>
mxgw subscribe-bulk --session-id <id> --server-handle <h> --items <csv>
mxgw unsubscribe-bulk --session-id <id> --server-handle <h> --item-handles <csv>
mxgw read-bulk --session-id <id> --server-handle <h> --items <csv> [--timeout-ms <ms>]
mxgw write --session-id <id> --server-handle 1 --item-handle 1 --value-type int32 --value 123
mxgw write2 --session-id <id> --server-handle 1 --item-handle 1 --value-type int32 --value 123 --timestamp <iso>
mxgw write-bulk --session-id <id> --server-handle <h> --item-handles <csv> --value-type <t> --values <csv>
mxgw write2-bulk ...
mxgw write-secured-bulk ...
mxgw write-secured2-bulk ...
mxgw stream-events --session-id <id> --json
mxgw stream-alarms [--filter-prefix <prefix>] [--max-events <n>]
mxgw acknowledge-alarm --reference <full-ref> [--comment <txt>] [--operator <user>]
mxgw bench-read-bulk [--duration-seconds <n>] [--warmup-seconds <n>] [--bulk-size <n>]
mxgw smoke --endpoint http://localhost:5000 --api-key-env MXGATEWAY_API_KEY --item TestChildObject.TestInt
mxgw batch
mxgw galaxy {test-connection,last-deploy-time,discover-hierarchy,watch}

batch reads commands from stdin, one per line, and dispatches each through the normal subcommand path. The loop terminates only on stdin EOF; a blank line logs an empty-EOR-bracketed result and the loop continues, so accidental empty lines from the PowerShell e2e harness do not silently end the session. bench-read-bulk opens its own session and subscribes to --bulk-size tags so that the worker's value cache populates from OnDataChange events. It then calls read_bulk in a tight loop for --duration-seconds and emits the cross-language JSON shape that scripts/bench-read-bulk.ps1 collates.
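The EOF-only termination rule for batch can be sketched over any buffered reader. This is a simplified illustration, not the CLI's run_batch: BatchLine and read_batch are hypothetical, the whitespace split ignores quoting, and dispatch is replaced by collecting the parsed lines.

```rust
use std::io::BufRead;

/// One line of batch input after tokenising.
#[derive(Debug, PartialEq)]
pub enum BatchLine {
    /// An empty or whitespace-only line: reported, but does not end the loop.
    Blank,
    /// A command line split into arguments.
    Command(Vec<String>),
}

/// Reads every line until EOF. Blank lines are recorded and skipped over;
/// only the end of input terminates the loop.
pub fn read_batch<R: BufRead>(input: R) -> std::io::Result<Vec<BatchLine>> {
    let mut out = Vec::new();
    for line in input.lines() {
        let line = line?;
        let args: Vec<String> = line.split_whitespace().map(str::to_owned).collect();
        if args.is_empty() {
            out.push(BatchLine::Blank);
        } else {
            out.push(BatchLine::Command(args));
        }
    }
    Ok(out)
}
```

Taking a generic BufRead rather than stdin directly lets a unit test feed the loop from an in-memory byte slice.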

JSON output should use serde_json.

Unit Tests

Use a fake tonic server started on a local ephemeral port, or abstract the generated client behind a trait for unit tests.
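The trait-abstraction option could take roughly the following shape. The sketch is synchronous and string-based purely so it runs without tokio or tonic; CommandTransport, FakeTransport, and the "Register <name>" wire text are hypothetical, and the real seam would be async and carry MxCommandRequest / MxCommandReply.

```rust
/// Hypothetical seam between the session helpers and the generated client.
pub trait CommandTransport {
    fn invoke(&mut self, command: &str) -> Result<String, String>;
}

/// Test double that records every command and returns a canned reply.
pub struct FakeTransport {
    pub seen: Vec<String>,
    pub canned: Result<String, String>,
}

impl CommandTransport for FakeTransport {
    fn invoke(&mut self, command: &str) -> Result<String, String> {
        self.seen.push(command.to_owned());
        self.canned.clone()
    }
}

/// Helper under test: builds the request, sends it, and parses the handle.
pub fn register<T: CommandTransport>(transport: &mut T, client_name: &str) -> Result<i32, String> {
    let reply = transport.invoke(&format!("Register {client_name}"))?;
    reply
        .trim()
        .parse::<i32>()
        .map_err(|err| format!("malformed reply: {err}"))
}
```

A fake like this covers request construction and reply parsing cheaply; the fake tonic server remains the better fit for auth metadata and stream cancellation tests, which depend on real transport behaviour.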

Required tests:

  • generated client compiles from proto,
  • auth metadata injection,
  • TLS/plaintext endpoint construction,
  • value conversion,
  • command request construction,
  • error mapping from tonic::Status,
  • event stream order,
  • stream cancellation,
  • CLI parsing,
  • JSON redaction.

Integration Tests

Skip unless:

MXGATEWAY_INTEGRATION=1

Use tokio::test. Run a bounded smoke flow and ensure CloseSession is attempted explicitly. Document the Drop fallback, but do not rely on Drop for the async close.
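The opt-in gate can be kept as a small pure function so the skip logic itself is unit-testable. This is a minimal sketch; the function names are hypothetical, and only the MXGATEWAY_INTEGRATION=1 convention comes from the text.

```rust
/// Returns true only when the variable is set to exactly "1".
pub fn integration_enabled(value: Option<&str>) -> bool {
    value == Some("1")
}

/// Reads the gate from the process environment.
pub fn integration_enabled_from_env() -> bool {
    integration_enabled(std::env::var("MXGATEWAY_INTEGRATION").ok().as_deref())
}
```

An integration test would call integration_enabled_from_env() first and return early when it is false, so a plain cargo test run never needs a live gateway.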