mxaccessgw/clients/rust/RustClientDesign.md
Joseph Doherty 1aafd6bde4 Code-review 2026-05-20 sweep #2: re-review at a020350, resolve 48 findings
Second re-review pass at commit a020350 caught 48 new findings — including
one High-severity regression I introduced in the prior sweep — and fixed
them all in one parallel wave.

High (1)
- Client.Python-018: prior sweep set `license = "Proprietary"` in
  pyproject.toml. setuptools >= 77 enforces PEP 639 and rejects the
  string (it must be a valid SPDX expression), so `pip wheel .` and
  `pip install -e .` both fail before any source compiles. Tests
  still pass because pytest bypasses the build backend via
  `pythonpath`. Dropped the invalid license string, kept the
  `License :: Other/Proprietary License` classifier, and added
  `tests/test_packaging.py` so a future regression of the same shape
  is caught in CI.

Mediums (6)
- Worker-023: `HeartbeatStuckCeiling` (default 75s = 5x HeartbeatGrace)
  on WorkerPipeSessionOptions bounds the in-flight-command watchdog
  suppression so a truly stuck COM call still triggers StaHung
  instead of permanently defeating the watchdog.
- Client.Rust-018: reverted Rust's `latencyMs` split so the
  cross-language bench comparison is apples-to-apples again;
  `failureLatencyMs` kept as Rust-only enrichment.
- Client.Java-021: applied Client.Java-002's terminal-state
  serialisation pattern to DeployEventStream so close() arriving
  after queue-overflow can't erase the overflow exception.
- IntegrationTests-017: teardown-parity test now uses a two-window
  stability check after UnAdvise instead of strict equality against
  the pre-UnAdvise count (which raced against in-flight events).
- IntegrationTests-019: new RecordingTestOutputHelper wraps every
  log sink the WriteSecured live test owns (worker stdout/stderr,
  gateway logs, direct WriteLine) so the credential is proven
  absent from the full output buffer, not just the diagnostic
  message.
- Tests-020: added MxAccessGatewayServiceConstraintTests coverage
  for the previously-uncovered Write2Bulk and WriteSecured2Bulk
  arms of WriteBulkConstraintPlan.SetPayload.

Lows (41 — highlights)
- Server: Galaxy glob cache eviction is race-free (Server-024);
  GalaxyRepositoryGrpcService takes IGalaxyRepository (Server-025);
  AlarmsOptions validated at startup (Server-026); Authorization.md
  Constraint Enforcement snippet/prose enumerate the bulk write/read
  family (Server-027); bulk-read-commands and bulk-write-commands
  capability tokens added to OpenSession (Server-029);
  NotWiredAlarmRpcDispatcher XML doc and missing scope-resolver and
  state-machine tests cleaned up (023, 028).
- Worker: AlarmCommandHandler now invokes the same STA-affinity
  guard the poll path uses, at every command entry (Worker-024);
  RunAsync null-checks the runtime-session factory result
  (Worker-025).
- Worker.Tests: shared LiveMxAccessOptInVariableName lives on
  GatewayContractInfo (Worker.Tests-025); MxAccessSession.CreateForTesting
  rejects production sinks (Worker.Tests-026); FakeRuntimeSession's
  CancelCommandReturnValue serialised under lock (Worker.Tests-027);
  Probes namespace lifted to MxGateway.Worker.Tests.Probes
  (Worker.Tests-029); cancel-envelope sequence numbers monotonised
  (Worker.Tests-030); docs/GatewayTesting.md gains a "Dev-rig Probes"
  section (Worker.Tests-028).
- Tests: ManualTimeProvider consolidated into one TestSupport/ copy
  (Tests-021); SessionManagerBulkTests adds a mid-flight cancellation
  test backed by a TaskCompletionSource fake (Tests-022); companion
  FakeWorkerProcess.WaitForExitAsync no longer fakes its exit signal
  (Tests-023); constraint plan reply-count divergence pinned
  (Tests-024).
- IntegrationTests: TryGetSession chain carries [MaybeNullWhen(false)]
  end-to-end (IntegrationTests-018); abnormal-exit keyword set
  tightened to pipe-disconnected/end-of-stream and the test now
  asserts streamTask.IsFaulted (020, 021).
- Client.Dotnet: bench commands added to isLongRunning so the
  default 30s wall-clock budget doesn't kill them (015);
  BenchStreamEventsAsync observes the inner stream task on every
  exit path (016).
- Client.Go: parseValue wraps strconv errors with flag context and
  %w (017); bench loops honour ctx.Done() (018); galaxy-watch parses
  RFC3339Nano with fractional seconds (019); runStreamEvents installs
  signal.NotifyContext like runGalaxyWatch (020); five new CLI-level
  table-driven tests cover the bulk/bench subcommands (021).
- Client.Java: toCompletable Javadoc rewritten to match the actual
  cancellation contract Client.Java-015 established (022); stream-events
  text path uses Long.toUnsignedString for worker_sequence (023);
  bench-read-bulk no longer pollutes success-latency histogram with
  failure durations (024); --shutdown-timeout CLI option propagates
  through to ClientOptions (025); seven new MxGatewayCliTests cover
  the bulk and bench commands (026).
- Client.Python: mxgateway_cli ships its own py.typed marker (019);
  wheel-build smoke test added under tests/test_packaging.py (020);
  README documents the Galaxy CLI parity gap explicitly (021).
- Client.Rust: RustClientDesign.md signatures match session.rs and
  document the AsRef<str> read_bulk genericism (019);
  next_correlation_id re-exported at the crate root, with a
  property-style doc contract and an explicit disclaimer that the
  literal textual format is not part of the contract (020).
- Contracts: BulkWriteResult comment names the actual
  IConstraintEnforcer mechanism instead of "tag-allowlist filter"
  (014); BulkReadResult gains explicit per-arm payload-population
  documentation for the success vs failure cases (015).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 10:28:54 -04:00


Rust Client Detailed Design

Purpose

Provide an async Rust client crate for MXAccess Gateway, plus a test CLI and unit tests. The Rust client should use tonic and tokio.

Follow the Rust Style Guide for handwritten code and the Protobuf Style Guide for generated contract inputs.

Crate Layout

Actual layout: the mxgateway-client library crate is the workspace root, and the mxgw test CLI is a workspace member:

clients/rust/                 # `mxgateway-client` library crate (workspace root)
  Cargo.toml
  build.rs
  src/
    lib.rs
    client.rs
    session.rs
    galaxy.rs
    options.rs
    auth.rs
    value.rs
    version.rs
    error.rs
    generated.rs
  crates/
    mxgw-cli/                 # `mxgw` test CLI (workspace member)
      Cargo.toml
      src/main.rs
  tests/
    client_behavior.rs
    proto_fixtures.rs

Dependencies:

  • tonic
  • prost
  • prost-types
  • tokio
  • tokio-stream
  • thiserror
  • clap
  • serde
  • serde_json

Library API

Suggested API:

pub struct GatewayClient { /* tonic channel + generated client */ }

pub struct ClientOptions {
    pub endpoint: String,
    pub api_key: String,
    pub plaintext: bool,
    pub ca_file: Option<PathBuf>,
    pub server_name_override: Option<String>,
    pub connect_timeout: Duration,
    pub call_timeout: Duration,
}

impl GatewayClient {
    pub async fn connect(options: ClientOptions) -> Result<Self, Error>;
    pub async fn open_session(&self, options: OpenSessionOptions) -> Result<Session, Error>;
    pub async fn invoke(&self, request: MxCommandRequest) -> Result<MxCommandReply, Error>;
}

Session:

pub struct Session {
    pub id: String,
}

impl Session {
    pub async fn register(&self, client_name: &str) -> Result<i32, Error>;
    pub async fn add_item(&self, server_handle: i32, item: &str) -> Result<i32, Error>;
    pub async fn add_item2(&self, server_handle: i32, item: &str, context: &str) -> Result<i32, Error>;
    pub async fn advise(&self, server_handle: i32, item_handle: i32) -> Result<(), Error>;
    pub async fn add_item_bulk(&self, server_handle: i32, tag_addresses: Vec<String>) -> Result<Vec<SubscribeResult>, Error>;
    pub async fn advise_item_bulk(&self, server_handle: i32, item_handles: Vec<i32>) -> Result<Vec<SubscribeResult>, Error>;
    pub async fn remove_item_bulk(&self, server_handle: i32, item_handles: Vec<i32>) -> Result<Vec<SubscribeResult>, Error>;
    pub async fn un_advise_item_bulk(&self, server_handle: i32, item_handles: Vec<i32>) -> Result<Vec<SubscribeResult>, Error>;
    pub async fn subscribe_bulk(&self, server_handle: i32, tag_addresses: Vec<String>) -> Result<Vec<SubscribeResult>, Error>;
    pub async fn unsubscribe_bulk(&self, server_handle: i32, item_handles: Vec<i32>) -> Result<Vec<SubscribeResult>, Error>;
    pub async fn write(&self, server_handle: i32, item_handle: i32, value: MxValue, user_id: i32) -> Result<(), Error>;
    pub async fn write_bulk(&self, server_handle: i32, entries: Vec<WriteBulkEntry>) -> Result<Vec<BulkWriteResult>, Error>;
    pub async fn write2_bulk(&self, server_handle: i32, entries: Vec<Write2BulkEntry>) -> Result<Vec<BulkWriteResult>, Error>;
    pub async fn write_secured_bulk(&self, server_handle: i32, entries: Vec<WriteSecuredBulkEntry>) -> Result<Vec<BulkWriteResult>, Error>;
    pub async fn write_secured2_bulk(&self, server_handle: i32, entries: Vec<WriteSecured2BulkEntry>) -> Result<Vec<BulkWriteResult>, Error>;
    pub async fn read_bulk<S: AsRef<str>>(&self, server_handle: i32, tag_addresses: &[S], timeout_ms: u32) -> Result<Vec<BulkReadResult>, Error>;
    pub async fn events(&self) -> Result<impl Stream<Item = Result<MxEvent, Error>>, Error>;
    pub async fn close(&self) -> Result<(), Error>;
}

The four bulk-write helpers (write_bulk, write2_bulk, write_secured_bulk, write_secured2_bulk) and read_bulk mirror the worker's bulk command shapes in mxaccess_gateway.proto. They use the same correlation-id discipline as the unary helpers. next_correlation_id is part of the public SDK surface and is re-exported at the crate root (mxgateway_client::next_correlation_id). Consumers that build raw MxCommandRequest or CloseSessionRequest payloads outside the Session helpers therefore share the same id generation. The main example is the mxgw test CLI's ping and close-session subcommands. The returned id is documented as an opaque token with three guaranteed properties:

  • it embeds the caller's label,
  • it is unique within a process,
  • it carries no secret.

Its textual format is intentionally not part of the contract.
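The sketch below shows one way to meet the three documented properties using only the standard library. The function name example_correlation_id and its "label-pid-counter" format are hypothetical. This is not the crate's next_correlation_id, and callers must not parse the format.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Process-wide counter; every call takes a fresh value, which gives
/// uniqueness within the process.
static NEXT_ID: AtomicU64 = AtomicU64::new(1);

/// Hypothetical stand-in, illustrating the contract only:
/// 1. embeds the caller's label,
/// 2. unique within the process (atomic counter),
/// 3. carries no secret (only label, process id, and counter).
/// The "label-pid-counter" layout is an assumption, not the real format.
fn example_correlation_id(label: &str) -> String {
    let n = NEXT_ID.fetch_add(1, Ordering::Relaxed);
    format!("{label}-{}-{n}", std::process::id())
}
```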

The per-entry fields that the matching MXAccess COM calls accept once per batch live on the entry structs themselves, not as trailing positional arguments on the helper. This matches the protobuf shapes in mxaccess_gateway.proto (WriteBulkCommand, Write2BulkCommand, WriteSecuredBulkCommand, WriteSecured2BulkCommand). Those fields are:

  • user_id (WriteBulkEntry, Write2BulkEntry),
  • timestamp_value (Write2BulkEntry, WriteSecured2BulkEntry),
  • current_user_id / verifier_user_id (WriteSecuredBulkEntry, WriteSecured2BulkEntry).

read_bulk is generic over AsRef<str>, so callers can pass &[String] or &[&str] without cloning at the call site.
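The function below shows the AsRef<str> parameter pattern that read_bulk uses. collect_tag_addresses is a hypothetical helper, not part of the crate. It accepts both owned and borrowed tag lists. The one set of owned copies that a protobuf repeated string field needs is made inside the helper, so the caller does not have to clone.

```rust
/// Hypothetical helper with the same parameter shape as read_bulk's
/// `tag_addresses: &[S]` where `S: AsRef<str>`.
/// Accepts &[String] and &[&str] alike; the owned Strings a protobuf
/// `repeated string` field needs are produced here, once.
fn collect_tag_addresses<S: AsRef<str>>(tag_addresses: &[S]) -> Vec<String> {
    tag_addresses
        .iter()
        .map(|tag| tag.as_ref().to_owned())
        .collect()
}
```

Callers holding either a Vec<String> from configuration or string literals can call the same helper directly.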

Authentication

Use a tonic interceptor or request extension layer to add:

authorization: Bearer <api key>

Use SecretString or equivalent if a dependency is acceptable. Always redact API keys in Debug output.
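The sketch below shows the redaction rule using only the standard library. ApiKey is a hypothetical newtype standing in for SecretString or an equivalent type. Options is a trimmed, hypothetical mirror of ClientOptions. Because the key field formats itself as a placeholder, deriving Debug on the options struct stays safe. bearer_header builds the string that an interceptor would attach as the authorization metadata value. The tonic wiring itself is not shown.

```rust
use std::fmt;

/// Hypothetical newtype standing in for SecretString or similar.
struct ApiKey(String);

impl ApiKey {
    /// Value for the `authorization` metadata entry an interceptor would add.
    fn bearer_header(&self) -> String {
        format!("Bearer {}", self.0)
    }
}

impl fmt::Debug for ApiKey {
    /// Never print the key itself.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("ApiKey(<redacted>)")
    }
}

/// Trimmed, hypothetical mirror of ClientOptions: the derived Debug is safe
/// because the api_key field redacts itself.
#[derive(Debug)]
struct Options {
    endpoint: String,
    api_key: ApiKey,
}
```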

TLS

Support:

  • plaintext channel for local development,
  • native or rustls TLS depending on project preference,
  • custom CA file,
  • domain override.
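This sketch covers the domain-override rule. tls_domain is a hypothetical helper that chooses the name used for certificate verification. An explicit server_name_override wins. Otherwise the host comes from the endpoint URL. The parsing is deliberately simplified: it requires an http:// or https:// scheme and does not handle IPv6 literals. Real code should use a proper URI parser.

```rust
/// Hypothetical helper: the name checked against the server certificate.
/// server_name_override (as in ClientOptions) wins; otherwise use the URL host.
/// Simplified parsing: explicit scheme required, no IPv6 literals.
fn tls_domain(endpoint: &str, server_name_override: Option<&str>) -> Option<String> {
    if let Some(name) = server_name_override {
        return Some(name.to_string());
    }
    // Require a scheme so "localhost:5000" is not misread as scheme "localhost".
    let rest = endpoint
        .strip_prefix("https://")
        .or_else(|| endpoint.strip_prefix("http://"))?;
    // The authority ends at the first '/', if any.
    let authority = rest.split('/').next().unwrap_or(rest);
    // Drop a trailing ":port" when the suffix is numeric.
    let host = match authority.rsplit_once(':') {
        Some((host, port)) if port.chars().all(|c| c.is_ascii_digit()) => host,
        _ => authority,
    };
    if host.is_empty() {
        None
    } else {
        Some(host.to_string())
    }
}
```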

Streaming

Expose event streams as a Stream<Item = Result<MxEvent, Error>>. Dropping the stream should cancel the underlying gRPC stream.

Do not buffer unboundedly in the client. If a helper channel is used, make it bounded.
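The sketch below shows the bounded-buffer rule and drop-to-cancel using std::sync::mpsc::sync_channel, so it runs without dependencies. A real async client would more likely use tokio's bounded mpsc channel. forward and Forward are hypothetical names. A full channel hands the event back instead of growing a buffer. A dropped receiver, meaning the consumer dropped its stream, tells the pump to stop and cancel the gRPC call.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Result of pushing one event into the bounded helper channel.
#[derive(Debug, PartialEq)]
enum Forward<T> {
    Delivered,
    /// Channel at capacity: the event is handed back so the pump can pause
    /// reading the gRPC stream and retry, rather than buffering without bound.
    Backpressure(T),
    /// Consumer dropped its end: stop pumping and cancel the gRPC call.
    ConsumerGone,
}

fn forward<T>(tx: &SyncSender<T>, event: T) -> Forward<T> {
    match tx.try_send(event) {
        Ok(()) => Forward::Delivered,
        Err(TrySendError::Full(event)) => Forward::Backpressure(event),
        Err(TrySendError::Disconnected(_)) => Forward::ConsumerGone,
    }
}

/// Bounded by construction; never an unbounded queue.
fn bounded_event_channel<T>(capacity: usize) -> (SyncSender<T>, Receiver<T>) {
    sync_channel(capacity)
}
```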

Error Handling

Use thiserror:

pub enum Error {
    InvalidEndpoint { endpoint: String, detail: String },
    InvalidArgument { name: String, detail: String },
    Transport(tonic::transport::Error),
    Authentication { message: String, status: Box<tonic::Status> },
    Authorization { message: String, status: Box<tonic::Status> },
    Timeout { message: String, status: Box<tonic::Status> },
    Cancelled { message: String, status: Box<tonic::Status> },
    Unavailable { message: String, status: Box<tonic::Status> },
    Status(Box<tonic::Status>),
    Command(Box<CommandError>),
    ProtocolStatus { operation: &'static str, code: ProtocolStatusCode, message: String },
    MalformedReply { detail: String },
}

Unavailable classifies the transient Code::Unavailable / Code::ResourceExhausted statuses so callers can decide whether to retry without unwrapping the raw status. MalformedReply surfaces OK replies whose payload does not carry the data the command contract requires (for example, an AddItem reply missing the item handle, or a WriteBulk reply carrying the wrong payload arm). InvalidEndpoint is returned when the endpoint URL fails to parse or its TLS material cannot be loaded.

Preserve raw command replies in CommandError where applicable.
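The sketch below shows how status codes could map onto the Error variants above. Code is a local stand-in for the few tonic::Code values it needs, so the block has no dependencies. Only the Unavailable / ResourceExhausted grouping is stated in this document. The other arms are assumptions inferred from the variant names. The retry policy in is_retryable is likewise an assumed example of a caller decision.

```rust
/// Local stand-in for the tonic::Code values this sketch inspects.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Code {
    Cancelled,
    DeadlineExceeded,
    PermissionDenied,
    ResourceExhausted,
    Unavailable,
    Unauthenticated,
    Internal,
}

/// Which Error variant a non-OK status would land in.
#[derive(Debug, PartialEq)]
enum Class {
    Authentication,
    Authorization,
    Timeout,
    Cancelled,
    Unavailable,
    Status,
}

fn classify(code: Code) -> Class {
    match code {
        Code::Unauthenticated => Class::Authentication, // assumed mapping
        Code::PermissionDenied => Class::Authorization, // assumed mapping
        Code::DeadlineExceeded => Class::Timeout,       // assumed mapping
        Code::Cancelled => Class::Cancelled,            // assumed mapping
        // Documented: both transient codes share the Unavailable variant.
        Code::Unavailable | Code::ResourceExhausted => Class::Unavailable,
        // Everything else keeps the raw status.
        _ => Class::Status,
    }
}

/// Example caller policy: retry only the transient class.
fn is_retryable(class: &Class) -> bool {
    matches!(class, Class::Unavailable)
}
```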

Test CLI

Binary: mxgw.

Use clap derive.

Commands (see clients/rust/README.md for full argument lists):

mxgw version
mxgw ping
mxgw open-session
mxgw close-session --session-id <id>
mxgw register --session-id <id> --client-name <name>
mxgw add-item --session-id <id> --server-handle <h> --item <tag>
mxgw advise --session-id <id> --server-handle <h> --item-handle <h>
mxgw subscribe-bulk --session-id <id> --server-handle <h> --items <a,b,c>
mxgw unsubscribe-bulk --session-id <id> --server-handle <h> --item-handles <1,2,3>
mxgw read-bulk --session-id <id> --server-handle <h> --items <a,b,c> --timeout-ms 1500
mxgw write --session-id <id> --server-handle 1 --item-handle 1 --value-type int32 --value 123
mxgw write2 --session-id <id> --server-handle 1 --item-handle 1 --value-type int32 --value 123 --timestamp <rfc3339>
mxgw write-bulk --session-id <id> --server-handle <h> --item-handles <1,2> --value-type int32 --values <1,2>
mxgw write2-bulk --session-id <id> --server-handle <h> --item-handles <1,2> --value-type int32 --values <1,2> --timestamp <rfc3339>
mxgw write-secured-bulk --session-id <id> --server-handle <h> --item-handles <1,2> --value-type int32 --values <1,2>
mxgw write-secured2-bulk --session-id <id> --server-handle <h> --item-handles <1,2> --value-type int32 --values <1,2> --timestamp <rfc3339>
mxgw stream-events --session-id <id> --json
mxgw bench-read-bulk --duration-seconds 30 --bulk-size 6 --json
mxgw smoke --endpoint http://localhost:5000 --api-key-env MXGATEWAY_API_KEY --item TestChildObject.TestInt
mxgw galaxy test-connection
mxgw galaxy last-deploy-time
mxgw galaxy discover-hierarchy
mxgw galaxy watch [--last-seen-deploy-time <rfc3339>] [--max-events N]
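Several commands take comma-separated lists (--items <a,b,c>, --item-handles <1,2,3>). The sketch below shows one plausible parsing rule, with hypothetical function names. It trims whitespace, rejects empty tag addresses, and names the offending flag in error messages. The real CLI uses clap derive, and its exact behaviour may differ.

```rust
/// Hypothetical parser for `--item-handles <1,2,3>`.
fn parse_item_handles(raw: &str) -> Result<Vec<i32>, String> {
    raw.split(',')
        .map(str::trim)
        .map(|part| {
            part.parse::<i32>()
                .map_err(|e| format!("--item-handles: invalid handle {part:?}: {e}"))
        })
        .collect()
}

/// Hypothetical parser for `--items <a,b,c>`: empty tag addresses are rejected.
fn parse_items(raw: &str) -> Result<Vec<String>, String> {
    raw.split(',')
        .map(str::trim)
        .map(|part| {
            if part.is_empty() {
                Err("--items: empty tag address".to_string())
            } else {
                Ok(part.to_string())
            }
        })
        .collect()
}
```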

JSON output should use serde_json.

Unit Tests

Use a fake tonic server started on a local ephemeral port, or abstract the generated client behind a trait for unit tests.
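The trait-based option can be sketched as follows. CommandTransport, FakeTransport, and register are hypothetical. Requests and replies are simplified to strings in place of MxCommandRequest / MxCommandReply. The code is synchronous for brevity, whereas the real helpers are async. The fake records every request and replays canned replies in order, so tests can assert on both.

```rust
use std::cell::RefCell;

/// Hypothetical seam between Session helpers and the generated client.
trait CommandTransport {
    fn invoke(&self, request: String) -> Result<String, String>;
}

/// Records requests; replays canned replies in the order given.
struct FakeTransport {
    sent: RefCell<Vec<String>>,
    replies: RefCell<Vec<Result<String, String>>>,
}

impl FakeTransport {
    fn new(mut replies: Vec<Result<String, String>>) -> Self {
        replies.reverse(); // so pop() yields replies in original order
        Self {
            sent: RefCell::new(Vec::new()),
            replies: RefCell::new(replies),
        }
    }
}

impl CommandTransport for FakeTransport {
    fn invoke(&self, request: String) -> Result<String, String> {
        self.sent.borrow_mut().push(request);
        self.replies
            .borrow_mut()
            .pop()
            .unwrap_or_else(|| Err("no canned reply".to_string()))
    }
}

/// Code under test depends only on the trait, so the fake can stand in.
fn register<T: CommandTransport>(transport: &T, client_name: &str) -> Result<String, String> {
    transport.invoke(format!("Register:{client_name}"))
}
```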

Required tests:

  • generated client compiles from proto,
  • auth metadata injection,
  • TLS/plaintext endpoint construction,
  • value conversion,
  • command request construction,
  • error mapping from tonic::Status,
  • event stream order,
  • stream cancellation,
  • CLI parsing,
  • JSON redaction.

Integration Tests

Skip unless:

MXGATEWAY_INTEGRATION=1

Use tokio::test. Run a bounded smoke flow and make sure CloseSession is attempted explicitly on every exit path. Document the Drop fallback, but do not rely on Drop for async close.
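The sketch below shows the opt-in gate and the "don't rely on Drop" rule. integration_enabled_from keeps the MXGATEWAY_INTEGRATION=1 check testable without touching the process environment. SessionGuard is a hypothetical test helper. Its Drop impl cannot await an async CloseSession, so it only records and reports the leak. The test must call close (in the real test, await Session::close) on every path.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Opt-in rule: run only when MXGATEWAY_INTEGRATION is exactly "1".
fn integration_enabled_from(value: Option<&str>) -> bool {
    value == Some("1")
}

/// Reads the real environment variable.
fn integration_enabled() -> bool {
    integration_enabled_from(std::env::var("MXGATEWAY_INTEGRATION").ok().as_deref())
}

/// Sessions that reached Drop without an explicit close.
static LEAKED_SESSIONS: AtomicUsize = AtomicUsize::new(0);

/// Hypothetical test guard: Drop is a diagnostic fallback, not the close path.
struct SessionGuard {
    id: String,
    closed: bool,
}

impl SessionGuard {
    fn new(id: &str) -> Self {
        Self { id: id.to_string(), closed: false }
    }

    /// Stands in for awaiting Session::close in the real test.
    fn close(mut self) {
        self.closed = true;
    }
}

impl Drop for SessionGuard {
    fn drop(&mut self) {
        if !self.closed {
            LEAKED_SESSIONS.fetch_add(1, Ordering::SeqCst);
            eprintln!("session {} dropped without CloseSession", self.id);
        }
    }
}
```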