//! `mxgw` — the Rust test CLI for the MXAccess Gateway.
//!
//! The binary wraps [`mxgateway_client`] in a `clap`-driven command surface
//! used by the cross-language smoke matrix and by developers exercising the
//! gateway by hand. Every subcommand mirrors a single gateway/Galaxy RPC,
//! prints either a terse line or a JSON document with `--json`, and exits
//! non-zero on any failure.

#![warn(missing_docs)]

use std::env;
use std::io::{self, BufRead, Write};
use std::path::PathBuf;
use std::process::ExitCode;
use std::time::Duration;

use clap::{Args, Parser, Subcommand, ValueEnum};
use futures_util::StreamExt;
use mxgateway_client::generated::galaxy_repository::v1::DeployEvent;
use mxgateway_client::generated::mxaccess_gateway::v1::{
    alarm_feed_message, AcknowledgeAlarmRequest, AlarmFeedMessage, CloseSessionRequest, MxCommand,
    MxCommandKind, MxCommandRequest, MxEvent, MxEventFamily, MxValue as ProtoMxValue,
    OpenSessionRequest, PingCommand, StreamAlarmsRequest, StreamEventsRequest, Write2BulkEntry,
    WriteBulkEntry, WriteSecured2BulkEntry, WriteSecuredBulkEntry,
};
use mxgateway_client::{
    ApiKey, ClientOptions, Error, GalaxyClient, GatewayClient, MxValue, MxValueProjection,
    CLIENT_VERSION, GATEWAY_PROTOCOL_VERSION, WORKER_PROTOCOL_VERSION,
};
use serde_json::json;
use serde_json::Value;

/// Largest `--max-events` value accepted by `stream-events` and
/// `stream-alarms`; larger requests are rejected before connecting.
const MAX_AGGREGATE_EVENTS: usize = 10_000;

// Root parser for the binary.
//
// NOTE: on the clap-derived items below, new explanatory notes are written as
// plain `//` comments on purpose: clap turns `///` doc comments into `--help`
// text, so adding them would change the program's output.
#[derive(Debug, Parser)]
#[command(name = "mxgw")]
#[command(about = "MXAccess Gateway Rust test CLI")]
struct Cli {
    // The selected subcommand.
    #[command(subcommand)]
    command: Command,
}

// Top-level subcommands. Every variant except `Batch` is executed by
// `dispatch`; `Batch` is routed to `run_batch`.
#[derive(Debug, Subcommand)]
enum Command {
    // Print version information via `print_version`. `--jsonl` is accepted
    // but not read by the dispatcher.
    Version {
        #[arg(long)]
        json: bool,
        #[arg(long)]
        jsonl: bool,
    },
    // Send a `Ping` command carrying `--message` through the generic invoke
    // call.
    Ping {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long, default_value = "ping")]
        message: String,
        #[arg(long)]
        json: bool,
    },
    // Open a session and print its id (or the session details with `--json`).
    // The session is left open.
    OpenSession {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long, default_value = "mxgw-rust-cli")]
        client_name: String,
        #[arg(long)]
        json: bool,
    },
    // Close the session named by `--session-id`.
    CloseSession {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        session_id: String,
        #[arg(long)]
        json: bool,
    },
    // Register `--client-name` on an existing session and print the returned
    // server handle.
    Register {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        session_id: String,
        #[arg(long, default_value = "mxgw-rust-cli")]
        client_name: String,
        #[arg(long)]
        json: bool,
    },
    // Add `--item` under a server handle and print the returned item handle.
    AddItem {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        session_id: String,
        #[arg(long)]
        server_handle: i32,
        #[arg(long)]
        item: String,
        #[arg(long)]
        json: bool,
    },
    // Call `advise` for an existing item handle.
    Advise {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        session_id: String,
        #[arg(long)]
        server_handle: i32,
        #[arg(long)]
        item_handle: i32,
        #[arg(long)]
        json: bool,
    },
    // Bulk subscribe; `--items` is a comma-separated list.
    SubscribeBulk {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        session_id: String,
        #[arg(long)]
        server_handle: i32,
        #[arg(long, value_delimiter = ',')]
        items: Vec,
        #[arg(long)]
        json: bool,
    },
    // Bulk unsubscribe; `--item-handles` is a comma-separated list.
    UnsubscribeBulk {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        session_id: String,
        #[arg(long)]
        server_handle: i32,
        #[arg(long, value_delimiter = ',')]
        item_handles: Vec,
        #[arg(long)]
        json: bool,
    },
    /// Snapshot the current value for each requested tag. Cached
    /// OnDataChange values are returned for tags that are already advised
    /// without touching the existing subscription; otherwise the worker
    /// takes a one-shot AddItem + Advise + UnAdvise + RemoveItem lifecycle.
    ReadBulk {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        session_id: String,
        #[arg(long)]
        server_handle: i32,
        #[arg(long, value_delimiter = ',')]
        items: Vec,
        /// Per-tag snapshot timeout in milliseconds. `0` uses the worker default (1000 ms).
        #[arg(long, default_value_t = 0)]
        timeout_ms: u32,
        #[arg(long)]
        json: bool,
    },
    /// Bulk Write — one MXAccess Write per (item_handle, value) pair.
    WriteBulk {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        session_id: String,
        #[arg(long)]
        server_handle: i32,
        // Paired with `--values` by `build_write_bulk_entries`.
        #[arg(long, value_delimiter = ',')]
        item_handles: Vec,
        // A single value type applies to every entry in `--values`.
        #[arg(long, value_enum)]
        value_type: CliValueType,
        #[arg(long, value_delimiter = ',')]
        values: Vec,
        // The same user id is attached to every entry.
        #[arg(long, default_value_t = 0)]
        user_id: i32,
        #[arg(long)]
        json: bool,
    },
    /// Bulk Write2 — timestamped variant; the timestamp applies to all entries.
    Write2Bulk {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        session_id: String,
        #[arg(long)]
        server_handle: i32,
        #[arg(long, value_delimiter = ',')]
        item_handles: Vec,
        #[arg(long, value_enum)]
        value_type: CliValueType,
        #[arg(long, value_delimiter = ',')]
        values: Vec,
        // Forwarded to the gateway as a string-typed value.
        #[arg(long)]
        timestamp: String,
        #[arg(long, default_value_t = 0)]
        user_id: i32,
        #[arg(long)]
        json: bool,
    },
    /// Bulk WriteSecured.
    WriteSecuredBulk {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        session_id: String,
        #[arg(long)]
        server_handle: i32,
        #[arg(long, value_delimiter = ',')]
        item_handles: Vec,
        #[arg(long, value_enum)]
        value_type: CliValueType,
        #[arg(long, value_delimiter = ',')]
        values: Vec,
        #[arg(long, default_value_t = 0)]
        current_user_id: i32,
        #[arg(long, default_value_t = 0)]
        verifier_user_id: i32,
        #[arg(long)]
        json: bool,
    },
    /// Bulk WriteSecured2 — timestamped + verified.
    WriteSecured2Bulk {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        session_id: String,
        #[arg(long)]
        server_handle: i32,
        #[arg(long, value_delimiter = ',')]
        item_handles: Vec,
        #[arg(long, value_enum)]
        value_type: CliValueType,
        #[arg(long, value_delimiter = ',')]
        values: Vec,
        #[arg(long)]
        timestamp: String,
        #[arg(long, default_value_t = 0)]
        current_user_id: i32,
        #[arg(long, default_value_t = 0)]
        verifier_user_id: i32,
        #[arg(long)]
        json: bool,
    },
    /// Cross-language ReadBulk stress benchmark. Opens its own session,
    /// subscribes to bulk-size tags, then hammers ReadBulk in a tight loop
    /// for duration-seconds and emits a JSON stats record the
    /// scripts/bench-read-bulk.ps1 driver collates across all five clients.
    BenchReadBulk {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long, default_value = "mxgw-rust-bench")]
        client_name: String,
        #[arg(long, default_value_t = 30)]
        duration_seconds: u64,
        #[arg(long, default_value_t = 3)]
        warmup_seconds: u64,
        // Number of tags per ReadBulk call; `dispatch` rejects 0.
        #[arg(long, default_value_t = 6)]
        bulk_size: usize,
        // Tag names are `<tag_prefix><NNN>.<tag_attribute>`, numbered upward
        // from `tag_start` with a three-digit zero-padded index.
        #[arg(long, default_value_t = 1)]
        tag_start: usize,
        #[arg(long, default_value = "TestMachine_")]
        tag_prefix: String,
        #[arg(long, default_value = "TestChangingInt")]
        tag_attribute: String,
        #[arg(long, default_value_t = 1500)]
        timeout_ms: u32,
        #[arg(long)]
        json: bool,
    },
    // Read up to `--max-events` events from a session's event stream.
    // `--json` prints one aggregate document at the end; `--jsonl` prints one
    // JSON document per event as it arrives.
    StreamEvents {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        session_id: String,
        #[arg(long, default_value_t = 0)]
        after_worker_sequence: u64,
        // Capped at `MAX_AGGREGATE_EVENTS`.
        #[arg(long, default_value_t = 1)]
        max_events: usize,
        #[arg(long)]
        json: bool,
        #[arg(long)]
        jsonl: bool,
    },
    /// Attach to the gateway's session-less central alarm feed. The stream
    /// opens with one `active_alarm` per currently-active alarm, then a
    /// single `snapshot_complete`, then a `transition` for every subsequent
    /// raise / acknowledge / clear.
    StreamAlarms {
        #[command(flatten)]
        connection: ConnectionArgs,
        /// Optional alarm-reference prefix scoping the feed to an equipment
        /// sub-tree. Omit to stream every active alarm.
        #[arg(long)]
        filter_prefix: Option,
        // Capped at `MAX_AGGREGATE_EVENTS`.
        #[arg(long, default_value_t = 1)]
        max_events: usize,
        #[arg(long)]
        json: bool,
        #[arg(long)]
        jsonl: bool,
    },
    // Write one value, parsed from `--value` according to `--value-type`.
    Write {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        session_id: String,
        #[arg(long)]
        server_handle: i32,
        #[arg(long)]
        item_handle: i32,
        #[arg(long, value_enum)]
        value_type: CliValueType,
        #[arg(long)]
        value: String,
        #[arg(long, default_value_t = 0)]
        user_id: i32,
        #[arg(long)]
        json: bool,
    },
    // Timestamped single write; `--timestamp` is forwarded as a string value.
    Write2 {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        session_id: String,
        #[arg(long)]
        server_handle: i32,
        #[arg(long)]
        item_handle: i32,
        #[arg(long, value_enum)]
        value_type: CliValueType,
        #[arg(long)]
        value: String,
        #[arg(long)]
        timestamp: String,
        #[arg(long, default_value_t = 0)]
        user_id: i32,
        #[arg(long)]
        json: bool,
    },
    /// Acknowledge an active MXAccess alarm condition through the gateway's
    /// session-less AcknowledgeAlarm RPC.
    AcknowledgeAlarm {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        reference: String,
        #[arg(long, default_value = "")]
        comment: String,
        #[arg(long, default_value = "")]
        operator: String,
        #[arg(long)]
        json: bool,
    },
    // Open a session, register, add and advise `--item`, then close the
    // session again.
    Smoke {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        item: String,
        #[arg(long, default_value = "mxgw-rust-smoke")]
        client_name: String,
        #[arg(long)]
        json: bool,
    },
    /// Read commands from stdin, one per line, execute each in sequence, and
    /// write `__MXGW_BATCH_EOR__` to stdout after every result. Errors are
    /// written as `{"error":"…","type":"error"}` JSON to stdout (not stderr)
    /// so the harness can parse them without interleaving stderr. The loop
    /// never terminates on command error; only stdin EOF (or an empty line)
    /// ends the session.
    Batch,
    // Galaxy subcommands, executed by `run_galaxy`.
    #[command(subcommand)]
    Galaxy(GalaxyCommand),
}

// Subcommands of `galaxy`; each one connects through `connect_galaxy`.
#[derive(Debug, Subcommand)]
enum GalaxyCommand {
    // Print whether the client's `test_connection` call reported success.
    TestConnection {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        json: bool,
    },
    // Print the last deploy time, or `(absent)` / `"present": false` when the
    // server returns none.
    LastDeployTime {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        json: bool,
    },
    // Print the discovered objects: a count plus one line per object, or a
    // full JSON document with `--json`.
    DiscoverHierarchy {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        json: bool,
    },
    /// Subscribe to the WatchDeployEvents server stream.
    ///
    /// Prints one line per received event (or one JSON object with `--json`).
    /// Runs until the stream ends, the server fails the call, or the
    /// process is interrupted (Ctrl+C).
    #[command(alias = "watch-deploy-events")]
    Watch {
        #[command(flatten)]
        connection: ConnectionArgs,
        /// Optional RFC3339 timestamp (e.g. `2026-04-28T15:30:00Z`). When
        /// supplied, the server suppresses the bootstrap event if its
        /// cached deploy time matches this value.
        #[arg(long)]
        last_seen_deploy_time: Option,
        /// Optional cap on the number of events to print before exiting.
        /// 0 (the default) means run until cancelled or the stream ends.
        #[arg(long, default_value_t = 0)]
        max_events: usize,
        #[arg(long)]
        json: bool,
    },
}

// Connection flags shared (flattened) by every subcommand that talks to the
// gateway; converted into `ClientOptions` by `ConnectionArgs::options`.
#[derive(Debug, Args, Clone)]
struct ConnectionArgs {
    #[arg(long, default_value = "http://127.0.0.1:5000")]
    endpoint: String,
    // API key given directly; consulted before the environment variable.
    #[arg(long)]
    api_key: Option,
    // Name of the environment variable read when `--api-key` is absent.
    #[arg(long, default_value = "MXGATEWAY_API_KEY")]
    api_key_env: String,
    // Forces plaintext even when `--tls` is also given.
    #[arg(long)]
    plaintext: bool,
    // Requests TLS; without it the client is configured for plaintext.
    #[arg(long)]
    tls: bool,
    #[arg(long)]
    ca_file: Option,
    #[arg(long)]
    server_name_override: Option,
    #[arg(long, default_value_t = 10)]
    connect_timeout_seconds: u64,
    #[arg(long, default_value_t = 30)]
    call_timeout_seconds: u64,
}

impl ConnectionArgs {
    /// Build the [`ClientOptions`] described by these flags.
    ///
    /// Plaintext is selected unless `--tls` is given without `--plaintext`.
    /// The API key comes from `--api-key`, falling back to the environment
    /// variable named by `--api-key-env` only when the flag is absent. The CA
    /// file and server-name override are applied only when supplied.
    fn options(&self) -> ClientOptions {
        let mut options = ClientOptions::new(self.endpoint.clone())
            .with_plaintext(!self.tls || self.plaintext)
            .with_connect_timeout(Duration::from_secs(self.connect_timeout_seconds))
            .with_call_timeout(Duration::from_secs(self.call_timeout_seconds));
        // The emptiness filter runs after the fallback, so an explicitly
        // empty `--api-key` yields no key at all rather than falling back to
        // the environment.
        if let Some(api_key) = self
            .api_key
            .clone()
            .or_else(|| env::var(&self.api_key_env).ok())
            .filter(|value| !value.is_empty())
        {
            options = options.with_api_key(ApiKey::new(api_key));
        }
        if let Some(ca_file) = &self.ca_file {
            options = options.with_ca_file(ca_file);
        }
        if let Some(server_name_override) = &self.server_name_override {
            options = options.with_server_name_override(server_name_override);
        }
        options
    }
}

// Value types selectable with `--value-type`; interpreted by `parse_value`
// and `build_write_bulk_entries`. Plain comment on purpose: clap would show
// `///` text on these variants as help for the possible values.
#[derive(Clone, Copy, Debug, ValueEnum)]
enum CliValueType {
    Bool,
    Int32,
    Int64,
    Float,
    Double,
    String,
}

/// Entry point. The real work runs on a dedicated thread with a large stack:
/// clap's derive-generated argument parser is deeply recursive, and in debug
/// builds (no inlining) parsing the `Command` enum can exhaust the default
/// 8 MiB main-thread stack as the enum grows. A 32 MiB worker stack keeps the
/// CLI robust regardless of build profile or future subcommand growth.
fn main() -> ExitCode {
    // Run the CLI on a named thread with a 32 MiB stack and hand its exit
    // code back to the process. Both `expect`s abort with a message if the
    // thread cannot be spawned or if it panicked.
    let worker = std::thread::Builder::new()
        .name("mxgw-cli".to_owned())
        .stack_size(32 * 1024 * 1024)
        .spawn(run)
        .expect("failed to spawn the CLI worker thread");
    worker.join().expect("the CLI worker thread panicked")
}

/// Parse the command line, run the selected command to completion on a
/// current-thread Tokio runtime, and turn the outcome into an exit code.
///
/// `batch` is routed to [`run_batch`]; every other subcommand goes through
/// [`dispatch`]. An error is printed to stderr and reported as
/// [`ExitCode::FAILURE`].
fn run() -> ExitCode {
    let cli = Cli::parse();
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("failed to build the Tokio runtime");
    let result = runtime.block_on(async {
        match cli.command {
            Command::Batch => run_batch().await,
            command => dispatch(command).await,
        }
    });
    match result {
        Ok(()) => ExitCode::SUCCESS,
        Err(error) => {
            eprintln!("{error}");
            ExitCode::FAILURE
        }
    }
}

/// Dispatch a parsed [`Command`] to its handler. All subcommands except
/// [`Command::Batch`] are handled here; `Batch` is handled separately in
/// [`run`] to avoid mutual recursion between `dispatch` and `run_batch`.
///
/// # Errors
///
/// Returns the first error produced by argument validation, connecting, or
/// the underlying client call. A `Batch` command reaching this function
/// (i.e. `batch` issued from inside a batch session) is rejected with
/// [`Error::InvalidArgument`].
async fn dispatch(command: Command) -> Result<(), Error> {
    match command {
        Command::Batch => {
            return Err(Error::InvalidArgument {
                name: "batch".to_owned(),
                detail: "batch cannot be nested inside another batch session".to_owned(),
            });
        }
        Command::Version { json, .. } => print_version(json),
        Command::Ping {
            connection,
            message,
            json,
        } => {
            let client = connect(connection).await?;
            let reply = client
                .invoke(MxCommandRequest {
                    client_correlation_id: mxgateway_client::next_correlation_id("cli-ping"),
                    command: Some(MxCommand {
                        kind: MxCommandKind::Ping as i32,
                        payload: Some(mxgateway_client::generated::mxaccess_gateway::v1::mx_command::Payload::Ping(
                            PingCommand { message },
                        )),
                    }),
                    ..MxCommandRequest::default()
                })
                .await?;
            print_command_reply("ping", &reply, json);
        }
        Command::OpenSession {
            connection,
            client_name,
            json,
        } => {
            let client = connect(connection).await?;
            let reply = client
                .open_session_raw(OpenSessionRequest {
                    client_session_name: client_name,
                    ..OpenSessionRequest::default()
                })
                .await?;
            if json {
                println!(
                    "{}",
                    json!({
                        "sessionId": reply.session_id,
                        "backendName": reply.backend_name,
                        "gatewayProtocolVersion": reply.gateway_protocol_version,
                        "workerProtocolVersion": reply.worker_protocol_version,
                    })
                );
            } else {
                // Terse mode prints only the id.
                println!("{}", reply.session_id);
            }
        }
        Command::CloseSession {
            connection,
            session_id,
            json,
        } => {
            let client = connect(connection).await?;
            let reply = client
                .close_session_raw(CloseSessionRequest {
                    session_id,
                    client_correlation_id: mxgateway_client::next_correlation_id(
                        "cli-close-session",
                    ),
                })
                .await?;
            if json {
                println!("{}", json!({ "sessionId": reply.session_id }));
            } else {
                println!("closed {}", reply.session_id);
            }
        }
        Command::Register {
            connection,
            session_id,
            client_name,
            json,
        } => {
            let session = session_for(connection, session_id).await?;
            let server_handle = session.register(&client_name).await?;
            print_handle("serverHandle", server_handle, json);
        }
        Command::AddItem {
            connection,
            session_id,
            server_handle,
            item,
            json,
        } => {
            let session = session_for(connection, session_id).await?;
            let item_handle = session.add_item(server_handle, &item).await?;
            print_handle("itemHandle", item_handle, json);
        }
        Command::Advise {
            connection,
            session_id,
            server_handle,
            item_handle,
            json,
        } => {
            let session = session_for(connection, session_id).await?;
            session.advise(server_handle, item_handle).await?;
            print_ok("advise", json);
        }
        Command::SubscribeBulk {
            connection,
            session_id,
            server_handle,
            items,
            json,
        } => {
            let session = session_for(connection, session_id).await?;
            let results = session.subscribe_bulk(server_handle, items).await?;
            print_bulk_results("subscribe-bulk", &results, json);
        }
        Command::UnsubscribeBulk {
            connection,
            session_id,
            server_handle,
            item_handles,
            json,
        } => {
            let session = session_for(connection, session_id).await?;
            let results = session
                .unsubscribe_bulk(server_handle, item_handles)
                .await?;
            print_bulk_results("unsubscribe-bulk", &results, json);
        }
        Command::ReadBulk {
            connection,
            session_id,
            server_handle,
            items,
            timeout_ms,
            json,
        } => {
            let session = session_for(connection, session_id).await?;
            let results = session.read_bulk(server_handle, &items, timeout_ms).await?;
            print_read_bulk_results("read-bulk", &results, json);
        }
        // The four bulk-write arms share one shape: the entries are built
        // (and can fail) before any connection is made, then mapped onto the
        // RPC-specific entry type with the per-call ids/timestamp applied to
        // every entry.
        Command::WriteBulk {
            connection,
            session_id,
            server_handle,
            item_handles,
            value_type,
            values,
            user_id,
            json,
        } => {
            let entries = build_write_bulk_entries(&item_handles, value_type, &values)?;
            let session = session_for(connection, session_id).await?;
            let results = session
                .write_bulk(
                    server_handle,
                    entries
                        .into_iter()
                        .map(|(item_handle, value)| WriteBulkEntry {
                            item_handle,
                            value: Some(value),
                            user_id,
                        })
                        .collect(),
                )
                .await?;
            print_write_bulk_results("write-bulk", &results, json);
        }
        Command::Write2Bulk {
            connection,
            session_id,
            server_handle,
            item_handles,
            value_type,
            values,
            timestamp,
            user_id,
            json,
        } => {
            let entries = build_write_bulk_entries(&item_handles, value_type, &values)?;
            // The timestamp is sent as a string value, cloned into each entry.
            let timestamp_value: ProtoMxValue = MxValue::string(timestamp).into_proto();
            let session = session_for(connection, session_id).await?;
            let results = session
                .write2_bulk(
                    server_handle,
                    entries
                        .into_iter()
                        .map(|(item_handle, value)| Write2BulkEntry {
                            item_handle,
                            value: Some(value),
                            timestamp_value: Some(timestamp_value.clone()),
                            user_id,
                        })
                        .collect(),
                )
                .await?;
            print_write_bulk_results("write2-bulk", &results, json);
        }
        Command::WriteSecuredBulk {
            connection,
            session_id,
            server_handle,
            item_handles,
            value_type,
            values,
            current_user_id,
            verifier_user_id,
            json,
        } => {
            let entries = build_write_bulk_entries(&item_handles, value_type, &values)?;
            let session = session_for(connection, session_id).await?;
            let results = session
                .write_secured_bulk(
                    server_handle,
                    entries
                        .into_iter()
                        .map(|(item_handle, value)| WriteSecuredBulkEntry {
                            item_handle,
                            value: Some(value),
                            current_user_id,
                            verifier_user_id,
                        })
                        .collect(),
                )
                .await?;
            print_write_bulk_results("write-secured-bulk", &results, json);
        }
        Command::WriteSecured2Bulk {
            connection,
            session_id,
            server_handle,
            item_handles,
            value_type,
            values,
            timestamp,
            current_user_id,
            verifier_user_id,
            json,
        } => {
            let entries = build_write_bulk_entries(&item_handles, value_type, &values)?;
            let timestamp_value: ProtoMxValue = MxValue::string(timestamp).into_proto();
            let session = session_for(connection, session_id).await?;
            let results = session
                .write_secured2_bulk(
                    server_handle,
                    entries
                        .into_iter()
                        .map(|(item_handle, value)| WriteSecured2BulkEntry {
                            item_handle,
                            value: Some(value),
                            timestamp_value: Some(timestamp_value.clone()),
                            current_user_id,
                            verifier_user_id,
                        })
                        .collect(),
                )
                .await?;
            print_write_bulk_results("write-secured2-bulk", &results, json);
        }
        Command::BenchReadBulk {
            connection,
            client_name,
            duration_seconds,
            warmup_seconds,
            bulk_size,
            tag_start,
            tag_prefix,
            tag_attribute,
            timeout_ms,
            json,
        } => {
            // An empty tag list would make the benchmark meaningless; reject
            // it before opening a session.
            if bulk_size == 0 {
                return Err(Error::InvalidArgument {
                    name: "bulk-size".to_owned(),
                    detail: "must be positive".to_owned(),
                });
            }
            run_bench_read_bulk(
                connection,
                client_name,
                duration_seconds,
                warmup_seconds,
                bulk_size,
                tag_start,
                tag_prefix,
                tag_attribute,
                timeout_ms,
                json,
            )
            .await?;
        }
        Command::StreamEvents {
            connection,
            session_id,
            after_worker_sequence,
            max_events,
            json,
            jsonl,
        } => {
            // The cap is enforced in every output mode, not only when the
            // events are buffered for the aggregate `--json` document.
            if max_events > MAX_AGGREGATE_EVENTS {
                return Err(Error::InvalidArgument {
                    name: "max-events".to_owned(),
                    detail: format!("must be less than or equal to {MAX_AGGREGATE_EVENTS}"),
                });
            }
            let client = connect(connection).await?;
            let mut stream = client
                .stream_events(StreamEventsRequest {
                    session_id,
                    after_worker_sequence,
                })
                .await?;
            let mut events: Vec = Vec::new();
            let mut event_count = 0usize;
            // Stop at `max_events`, at end of stream, or on the first stream
            // error (propagated by `?`).
            while event_count < max_events {
                let Some(event) = stream.next().await else {
                    break;
                };
                let event = event?;
                event_count += 1;
                // `--jsonl` takes precedence: events are printed as they
                // arrive and nothing is buffered.
                if jsonl {
                    println!("{}", event_to_json(&event));
                } else if json {
                    events.push(event_to_json(&event));
                } else {
                    println!("{} {}", event.worker_sequence, event.family);
                }
            }
            // With both `--json` and `--jsonl`, this aggregate is still
            // printed, with an empty `events` array.
            if json {
                // `eventCount` is preserved for back-compat; `events` carries
                // the per-event detail the cross-language e2e matrix compares.
                println!("{}", json!({ "eventCount": event_count, "events": events }));
            }
        }
        Command::StreamAlarms {
            connection,
            filter_prefix,
            max_events,
            json,
            jsonl,
        } => {
            if max_events > MAX_AGGREGATE_EVENTS {
                return Err(Error::InvalidArgument {
                    name: "max-events".to_owned(),
                    detail: format!("must be less than or equal to {MAX_AGGREGATE_EVENTS}"),
                });
            }
            let client = connect(connection).await?;
            let mut stream = client
                .stream_alarms(StreamAlarmsRequest {
                    client_correlation_id: mxgateway_client::next_correlation_id(
                        "cli-stream-alarms",
                    ),
                    // An omitted prefix is sent as the empty string.
                    alarm_filter_prefix: filter_prefix.unwrap_or_default(),
                })
                .await?;
            let mut messages: Vec = Vec::new();
            let mut message_count = 0usize;
            // Same loop shape and output-mode precedence as `StreamEvents`.
            while message_count < max_events {
                let Some(message) = stream.next().await else {
                    break;
                };
                let message = message?;
                message_count += 1;
                if jsonl {
                    println!("{}", alarm_feed_message_to_json(&message));
                } else if json {
                    messages.push(alarm_feed_message_to_json(&message));
                } else {
                    println!("{}", alarm_feed_message_summary(&message));
                }
            }
            if json {
                println!(
                    "{}",
                    json!({ "messageCount": message_count, "messages": messages })
                );
            }
        }
        Command::Write {
            connection,
            session_id,
            server_handle,
            item_handle,
            value_type,
            value,
            user_id,
            json,
        } => {
            // Note: unlike the bulk arms, the value is parsed after the
            // connection has been established.
            let session = session_for(connection, session_id).await?;
            session
                .write(
                    server_handle,
                    item_handle,
                    parse_value(value_type, &value)?,
                    user_id,
                )
                .await?;
            print_ok("write", json);
        }
        Command::Write2 {
            connection,
            session_id,
            server_handle,
            item_handle,
            value_type,
            value,
            timestamp,
            user_id,
            json,
        } => {
            let session = session_for(connection, session_id).await?;
            session
                .write2(
                    server_handle,
                    item_handle,
                    parse_value(value_type, &value)?,
                    MxValue::string(timestamp),
                    user_id,
                )
                .await?;
            print_ok("write2", json);
        }
        Command::AcknowledgeAlarm {
            connection,
            reference,
            comment,
            operator,
            json,
        } => {
            let client = connect(connection).await?;
            let reply = client
                .acknowledge_alarm(AcknowledgeAlarmRequest {
                    client_correlation_id: mxgateway_client::next_correlation_id(
                        "cli-acknowledge-alarm",
                    ),
                    alarm_full_reference: reference,
                    comment,
                    operator_user: operator,
                })
                .await?;
            print_acknowledge_alarm_reply(&reply, json);
        }
        Command::Galaxy(galaxy_command) => run_galaxy(galaxy_command).await?,
        Command::Smoke {
            connection,
            item,
            client_name,
            json,
        } => {
            let client = connect(connection).await?;
            let session = client
                .open_session(OpenSessionRequest {
                    client_session_name: client_name.clone(),
                    ..OpenSessionRequest::default()
                })
                .await?;
            // Run the steps inside an async block so a failure does not skip
            // the close below.
            let result = async {
                let server_handle = session.register(&client_name).await?;
                let item_handle = session.add_item(server_handle, &item).await?;
                session.advise(server_handle, item_handle).await?;
                Ok::<_, Error>((server_handle, item_handle))
            }
            .await;
            // Always attempt the close; a step failure is reported in
            // preference to a close failure.
            let close_result = session.close().await;
            let (server_handle, item_handle) = result?;
            close_result?;
            if json {
                println!(
                    "{}",
                    json!({
                        "sessionId": session.id(),
                        "serverHandle": server_handle,
                        "itemHandle": item_handle,
                        "closed": true,
                    })
                );
            } else {
                println!(
                    "session {} registered server {server_handle}, item {item_handle}, closed",
                    session.id()
                );
            }
        }
    }
    Ok(())
}

/// Connect a [`GatewayClient`] using the options derived from `connection`.
async fn connect(connection: ConnectionArgs) -> Result {
    GatewayClient::connect(connection.options()).await
}

/// Connect a [`GalaxyClient`] using the options derived from `connection`.
async fn connect_galaxy(connection: ConnectionArgs) -> Result {
    GalaxyClient::connect(connection.options()).await
}

/// Execute one `galaxy` subcommand and print its result.
///
/// # Errors
///
/// Returns connection errors, RPC/stream errors, a timestamp parse failure
/// for `watch --last-seen-deploy-time`, or a failure to install the Ctrl+C
/// handler (reported as [`Error::InvalidArgument`] named `ctrl_c`).
async fn run_galaxy(command: GalaxyCommand) -> Result<(), Error> {
    match command {
        GalaxyCommand::TestConnection { connection, json } => {
            let mut client = connect_galaxy(connection).await?;
            let ok = client.test_connection().await?;
            // A `false` result is printed, not turned into an error, so the
            // command still succeeds.
            if json {
                println!("{}", json!({ "ok": ok }));
            } else if ok {
                println!("ok");
            } else {
                println!("not ok");
            }
        }
        GalaxyCommand::LastDeployTime { connection, json } => {
            let mut client = connect_galaxy(connection).await?;
            let timestamp = client.get_last_deploy_time().await?;
            match (json, timestamp) {
                (true, Some(ts)) => {
                    println!(
                        "{}",
                        json!({
                            "present": true,
                            "seconds": ts.seconds,
                            "nanos": ts.nanos,
                        })
                    );
                }
                (true, None) => {
                    println!("{}", json!({ "present": false }));
                }
                // Terse form: `<seconds>.<nanos>` with nanos zero-padded to
                // nine digits.
                (false, Some(ts)) => println!("{}.{:09}", ts.seconds, ts.nanos),
                (false, None) => println!("(absent)"),
            }
        }
        GalaxyCommand::Watch {
            connection,
            last_seen_deploy_time,
            max_events,
            json,
        } => {
            let mut client = connect_galaxy(connection).await?;
            // Parse the optional timestamp; a malformed value fails the
            // command before the stream is opened.
            let last_seen = last_seen_deploy_time
                .as_deref()
                .map(parse_rfc3339_timestamp)
                .transpose()?;
            let mut stream = client.watch_deploy_events(last_seen).await?;
            let mut count = 0usize;
            loop {
                tokio::select! {
                    // `biased` polls the branches in declaration order, so a
                    // pending Ctrl+C is noticed before the next event.
                    biased;
                    signal = tokio::signal::ctrl_c() => {
                        signal.map_err(|err| Error::InvalidArgument {
                            name: "ctrl_c".to_owned(),
                            detail: err.to_string(),
                        })?;
                        // Drop the stream below by breaking; tonic tears the
                        // gRPC call down cooperatively.
                        break;
                    }
                    next = stream.next() => {
                        let Some(event) = next else {
                            break;
                        };
                        let event = event?;
                        count += 1;
                        print_deploy_event(&event, json);
                        // `max_events == 0` means "no cap".
                        if max_events != 0 && count >= max_events {
                            break;
                        }
                    }
                }
            }
        }
        GalaxyCommand::DiscoverHierarchy { connection, json } => {
            let mut client = connect_galaxy(connection).await?;
            let objects = client.discover_hierarchy().await?;
            if json {
                // One JSON object per discovered object, each with a nested
                // array of its attributes.
                let payload: Vec<_> = objects
                    .iter()
                    .map(|object| {
                        json!({
                            "gobjectId": object.gobject_id,
                            "tagName": object.tag_name,
                            "containedName": object.contained_name,
                            "browseName": object.browse_name,
                            "parentGobjectId": object.parent_gobject_id,
                            "isArea": object.is_area,
                            "categoryId": object.category_id,
                            "hostedByGobjectId": object.hosted_by_gobject_id,
                            "templateChain": object.template_chain,
                            "attributes": object.attributes.iter().map(|attribute| json!({
                                "attributeName": attribute.attribute_name,
                                "fullTagReference": attribute.full_tag_reference,
                                "mxDataType": attribute.mx_data_type,
                                "dataTypeName": attribute.data_type_name,
                                "isArray": attribute.is_array,
                                "arrayDimension": attribute.array_dimension,
                                "arrayDimensionPresent": attribute.array_dimension_present,
                                "mxAttributeCategory": attribute.mx_attribute_category,
                                "securityClassification": attribute.security_classification,
                                "isHistorized": attribute.is_historized,
                                "isAlarm": attribute.is_alarm,
                            })).collect::>(),
                        })
                    })
                    .collect();
                println!("{}", json!({ "objects": payload }));
            } else {
                // Terse form: the object count, then one summary line each.
                println!("{}", objects.len());
                for object in &objects {
                    println!(
                        "{} {} {} ({} attribute(s))",
                        object.gobject_id,
                        object.tag_name,
                        object.browse_name,
                        object.attributes.len()
                    );
                }
            }
        }
    }
    Ok(())
}

/// Connect to the gateway and return the client's session handle for the
/// given `session_id`. Only the connection step can fail here.
async fn session_for(
    connection: ConnectionArgs,
    session_id: String,
) -> Result {
    let client = connect(connection).await?;
    Ok(client.session(session_id))
}

/// End-of-result sentinel written to stdout after every batch command.
const BATCH_EOR: &str = "__MXGW_BATCH_EOR__";

/// Run the batch loop: read one command line at a time from stdin, dispatch
/// each through the normal [`dispatch`] path, and write [`BATCH_EOR`] to
/// stdout after every result. Errors are serialised as JSON to stdout so the
/// harness can parse them without interleaving stderr. The loop never
/// terminates on command error; only stdin EOF or an empty line ends the
/// session.
///
/// Lines are tokenised on ASCII whitespace only — there is no quoting, so an
/// argument value cannot contain spaces. A whitespace-only line produces just
/// the sentinel.
///
/// # Errors
///
/// Returns [`Error::InvalidArgument`] (named `stdin`) if reading a line from
/// stdin fails; this is the only error that ends the loop.
async fn run_batch() -> Result<(), Error> {
    let stdin = io::stdin();
    let stdout = io::stdout();
    for line in stdin.lock().lines() {
        let line = line.map_err(|e| Error::InvalidArgument {
            name: "stdin".to_owned(),
            detail: e.to_string(),
        })?;
        // A completely empty line is the explicit "end of session" marker.
        if line.is_empty() {
            break;
        }
        let parts: Vec<&str> = line.split_ascii_whitespace().collect();
        // Whitespace-only line: nothing to run, but still frame a result so
        // the harness stays in step.
        if parts.is_empty() {
            println!("{BATCH_EOR}");
            stdout.lock().flush().ok();
            continue;
        }
        // Re-parse the split arguments under a fresh Cli, prepending the
        // program-name placeholder so clap sees a complete argv[].
        let parse_result =
            Cli::try_parse_from(std::iter::once("mxgw-cli").chain(parts.iter().copied()));
        let outcome: Result<(), Error> = match parse_result {
            Ok(cli) => {
                // Spawn on a new tokio task so each command runs with a fresh
                // stack, avoiding stack overflow from the large dispatch future.
                // NOTE(review): presumably the benefit is that the spawned
                // future is stored by the runtime instead of inside this
                // function's own future — confirm against the Tokio docs.
                // A panic or cancellation of the task surfaces as a `task`
                // error rather than aborting the batch.
                tokio::task::spawn(dispatch(cli.command))
                    .await
                    .unwrap_or_else(|join_err| {
                        Err(Error::InvalidArgument {
                            name: "task".to_owned(),
                            detail: join_err.to_string(),
                        })
                    })
            }
            // Parse failures (including clap's help/version output) are
            // reported like any other command error.
            Err(clap_err) => Err(Error::InvalidArgument {
                name: "args".to_owned(),
                detail: clap_err.to_string(),
            }),
        };
        if let Err(err) = outcome {
            // Write error as JSON to stdout so the harness sees it in the
            // same stream as normal output, framed by the EOR sentinel.
            println!(
                "{}",
                serde_json::json!({ "error": err.to_string(), "type": "error" })
            );
        }
        println!("{BATCH_EOR}");
        // Flush so the harness sees the sentinel promptly; a flush failure is
        // deliberately ignored.
        stdout.lock().flush().ok();
    }
    Ok(())
}

/// Cross-language ReadBulk stress benchmark — mirrors the .NET / Go / Python /
/// Java implementations so the PS driver collates one JSON schema across all
/// five clients.
///
/// Opens its own session, registers `client_name`, bulk-subscribes to
/// `bulk_size` tags named `<tag_prefix><NNN>.<tag_attribute>` (index starting
/// at `tag_start`, zero-padded to three digits), then calls `read_bulk` in a
/// loop: first for `warmup_seconds` with results discarded, then for
/// `duration_seconds` with every call timed and recorded. Afterwards the
/// successfully subscribed handles are unsubscribed and the session is closed;
/// failures of those two cleanup steps are ignored.
///
/// With `use_json` the full stats document is printed; otherwise only the
/// calls-per-second figure.
///
/// # Errors
///
/// Returns an error if connecting, opening the session, registering, or
/// subscribing fails. Individual `read_bulk` failures are counted in the
/// stats rather than returned.
#[allow(clippy::too_many_arguments)]
async fn run_bench_read_bulk(
    connection: ConnectionArgs,
    client_name: String,
    duration_seconds: u64,
    warmup_seconds: u64,
    bulk_size: usize,
    tag_start: usize,
    tag_prefix: String,
    tag_attribute: String,
    timeout_ms: u32,
    use_json: bool,
) -> Result<(), Error> {
    // Keep a copy of the endpoint for the report; `connection` is consumed by
    // `connect`.
    let endpoint = connection.endpoint.clone();
    let client = connect(connection).await?;
    let session = client
        .open_session(OpenSessionRequest {
            client_session_name: client_name.clone(),
            ..OpenSessionRequest::default()
        })
        .await?;
    let tags: Vec = (0..bulk_size)
        .map(|i| format!("{tag_prefix}{:03}.{tag_attribute}", tag_start + i))
        .collect();
    // Bench body in its own block so the trailing session.close() always
    // runs, even on the early returns the loop body never hits today.
    let bench_outcome = async {
        let server_handle = session.register(&client_name).await?;
        let subscribe_results = session.subscribe_bulk(server_handle, tags.clone()).await?;
        // Remember only the handles that were actually subscribed, for the
        // cleanup at the end.
        let item_handles: Vec = subscribe_results
            .iter()
            .filter(|r| r.was_successful)
            .map(|r| r.item_handle)
            .collect();
        // Warm-up phase: same call as the measured loop, outcome ignored.
        let warmup_deadline =
            std::time::Instant::now() + std::time::Duration::from_secs(warmup_seconds);
        while std::time::Instant::now() < warmup_deadline {
            let _ = session.read_bulk(server_handle, &tags, timeout_ms).await;
        }
        // Measured phase: every call is timed, whether it succeeds or fails.
        let mut stats = BenchReadBulkStats::default();
        let steady_start = std::time::Instant::now();
        let steady_deadline = steady_start + std::time::Duration::from_secs(duration_seconds);
        while std::time::Instant::now() < steady_deadline {
            let call_start = std::time::Instant::now();
            let outcome = session.read_bulk(server_handle, &tags, timeout_ms).await;
            let elapsed_ms = call_start.elapsed().as_secs_f64() * 1000.0;
            match outcome {
                Ok(results) => stats.record_success(elapsed_ms, &results),
                Err(error) => stats.record_failure(elapsed_ms, &error),
            }
        }
        // Use the real elapsed time (the last call may overrun the deadline)
        // for the throughput figures.
        let steady_elapsed = steady_start.elapsed();
        // Best-effort cleanup of the subscriptions created above.
        if !item_handles.is_empty() {
            let _ = session.unsubscribe_bulk(server_handle, item_handles).await;
        }
        let context = BenchReadBulkContext {
            endpoint: &endpoint,
            client_name: &client_name,
            bulk_size,
            duration_seconds,
            warmup_seconds,
            steady_elapsed,
            tags: &tags,
        };
        let json_stats = stats.to_json(&context);
        if use_json {
            println!("{}", json_stats);
        } else {
            println!("{}", stats.calls_per_second(steady_elapsed));
        }
        Ok::<(), Error>(())
    }
    .await;
    // Close regardless of the bench outcome; a close failure is ignored so it
    // cannot mask the bench result.
    let _ = session.close().await;
    bench_outcome
}

/// Per-iteration accounting for `bench-read-bulk`.
///
/// Every `read_bulk` call's elapsed time contributes to the all-calls
/// histogram (`latencies_ms`), matching the .NET/Go/Python/Java bench
/// implementations whose `latencyMs` field is the cross-language comparison
/// contract collated by `scripts/bench-read-bulk.ps1`.
Failures additionally /// land in `failure_latencies_ms` and the first failure's redacted error /// string is stashed in `first_failure`, both surfaced through the JSON as /// Rust-only enrichment so a partial-failure run is still visible at the /// report layer without breaking the side-by-side comparison. #[derive(Default)] struct BenchReadBulkStats { latencies_ms: Vec, failure_latencies_ms: Vec, total_read_results: u64, cached_read_results: u64, successful_calls: u64, failed_calls: u64, first_failure: Option, } impl BenchReadBulkStats { fn record_success( &mut self, elapsed_ms: f64, results: &[mxgateway_client::generated::mxaccess_gateway::v1::BulkReadResult], ) { self.latencies_ms.push(elapsed_ms); self.successful_calls += 1; for result in results { self.total_read_results += 1; if result.was_cached { self.cached_read_results += 1; } } } fn record_failure(&mut self, elapsed_ms: f64, error: &Error) { self.latencies_ms.push(elapsed_ms); self.failure_latencies_ms.push(elapsed_ms); self.failed_calls += 1; if self.first_failure.is_none() { self.first_failure = Some(error.to_string()); } } fn total_calls(&self) -> u64 { self.successful_calls + self.failed_calls } fn calls_per_second(&self, elapsed: std::time::Duration) -> f64 { let seconds = elapsed.as_secs_f64(); if seconds > 0.0 { self.total_calls() as f64 / seconds } else { 0.0 } } fn to_json(&self, context: &BenchReadBulkContext<'_>) -> serde_json::Value { let calls_per_second = self.calls_per_second(context.steady_elapsed); let latency_summary = percentile_summary(&self.latencies_ms); let failure_summary = percentile_summary(&self.failure_latencies_ms); serde_json::json!({ "language": "rust", "command": "bench-read-bulk", "endpoint": context.endpoint, "clientName": context.client_name, "bulkSize": context.bulk_size, "durationSeconds": context.duration_seconds, "warmupSeconds": context.warmup_seconds, "durationMs": context.steady_elapsed.as_millis() as u64, "tags": context.tags, "totalCalls": self.total_calls(), 
"successfulCalls": self.successful_calls, "failedCalls": self.failed_calls, "totalReadResults": self.total_read_results, "cachedReadResults": self.cached_read_results, "callsPerSecond": round_to(calls_per_second, 2), "latencyMs": latency_summary, "failureLatencyMs": failure_summary, "firstFailure": self.first_failure, }) } } /// Static configuration for one `bench-read-bulk` run, packaged so the /// JSON serialiser can quote it back without taking eight positional args. struct BenchReadBulkContext<'a> { endpoint: &'a str, client_name: &'a str, bulk_size: usize, duration_seconds: u64, warmup_seconds: u64, steady_elapsed: std::time::Duration, tags: &'a [String], } fn percentile_summary(sample: &[f64]) -> serde_json::Value { if sample.is_empty() { return serde_json::json!({ "p50": 0.0, "p95": 0.0, "p99": 0.0, "max": 0.0, "mean": 0.0 }); } let mut sorted = sample.to_vec(); sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)); let max = sorted[sorted.len() - 1]; let mean = sample.iter().sum::() / sample.len() as f64; serde_json::json!({ "p50": round_to(percentile(&sorted, 0.50), 3), "p95": round_to(percentile(&sorted, 0.95), 3), "p99": round_to(percentile(&sorted, 0.99), 3), "max": round_to(max, 3), "mean": round_to(mean, 3), }) } fn percentile(sorted: &[f64], quantile: f64) -> f64 { if sorted.is_empty() { return 0.0; } if sorted.len() == 1 { return sorted[0]; } let rank = quantile * (sorted.len() - 1) as f64; let lower = rank.floor() as usize; let upper = (lower + 1).min(sorted.len() - 1); let fraction = rank - lower as f64; sorted[lower] + (sorted[upper] - sorted[lower]) * fraction } fn round_to(value: f64, digits: u32) -> f64 { let shift = 10f64.powi(digits as i32); (value * shift).round() / shift } fn print_version(use_json: bool) { if use_json { println!("{}", version_json()); return; } println!("mxgw {CLIENT_VERSION}"); println!("gateway protocol {GATEWAY_PROTOCOL_VERSION}"); println!("worker protocol {WORKER_PROTOCOL_VERSION}"); } fn 
version_json() -> Value { json!({ "clientVersion": CLIENT_VERSION, "gatewayProtocolVersion": GATEWAY_PROTOCOL_VERSION, "workerProtocolVersion": WORKER_PROTOCOL_VERSION, }) } fn print_command_reply( operation: &str, reply: &mxgateway_client::generated::mxaccess_gateway::v1::MxCommandReply, use_json: bool, ) { if use_json { println!( "{}", json!({ "operation": operation, "sessionId": reply.session_id, "correlationId": reply.correlation_id, "kind": reply.kind, }) ); } else { println!("{operation} completed"); } } fn print_handle(name: &str, handle: i32, use_json: bool) { if use_json { println!("{}", json!({ name: handle })); } else { println!("{handle}"); } } fn print_ok(operation: &str, use_json: bool) { if use_json { println!("{}", json!({ "operation": operation, "ok": true })); } else { println!("{operation} completed"); } } fn print_bulk_results( operation: &str, results: &[mxgateway_client::generated::mxaccess_gateway::v1::SubscribeResult], use_json: bool, ) { if use_json { let results_json: Vec<_> = results .iter() .map(|result| { json!({ "serverHandle": result.server_handle, "tagAddress": result.tag_address, "itemHandle": result.item_handle, "wasSuccessful": result.was_successful, "errorMessage": result.error_message, }) }) .collect(); println!( "{}", json!({ "operation": operation, "results": results_json }) ); } else { println!("{}", results.len()); } } fn print_write_bulk_results( operation: &str, results: &[mxgateway_client::generated::mxaccess_gateway::v1::BulkWriteResult], use_json: bool, ) { if use_json { let results_json: Vec<_> = results .iter() .map(|result| { json!({ "serverHandle": result.server_handle, "itemHandle": result.item_handle, "wasSuccessful": result.was_successful, "hresult": result.hresult, "errorMessage": result.error_message, }) }) .collect(); println!( "{}", json!({ "operation": operation, "results": results_json }) ); } else { println!("{}", results.len()); } } fn print_read_bulk_results( operation: &str, results: 
&[mxgateway_client::generated::mxaccess_gateway::v1::BulkReadResult], use_json: bool, ) { if use_json { let results_json: Vec<_> = results .iter() .map(|result| { json!({ "serverHandle": result.server_handle, "tagAddress": result.tag_address, "itemHandle": result.item_handle, "wasSuccessful": result.was_successful, "wasCached": result.was_cached, "quality": result.quality, "errorMessage": result.error_message, }) }) .collect(); println!( "{}", json!({ "operation": operation, "results": results_json }) ); } else { println!("{}", results.len()); } } /// Pairs each parsed item handle with its parsed MxValue (proto form) so a /// single helper can build the four bulk-write families without each branch /// repeating the length check and per-value parsing. fn build_write_bulk_entries( item_handles: &[i32], value_type: CliValueType, values: &[String], ) -> Result< Vec<( i32, mxgateway_client::generated::mxaccess_gateway::v1::MxValue, )>, Error, > { if item_handles.len() != values.len() { return Err(Error::InvalidArgument { name: "values".to_owned(), detail: format!( "item-handles count ({}) does not match values count ({})", item_handles.len(), values.len() ), }); } item_handles .iter() .zip(values.iter()) .map(|(handle, value)| { parse_value(value_type, value).map(|wrapper| (*handle, wrapper.into_proto())) }) .collect() } fn parse_value(value_type: CliValueType, value: &str) -> Result { let parsed = match value_type { CliValueType::Bool => MxValue::bool(parse_cli_value(value)?), CliValueType::Int32 => MxValue::int32(parse_cli_value(value)?), CliValueType::Int64 => MxValue::int64(parse_cli_value(value)?), CliValueType::Float => MxValue::float(parse_cli_value(value)?), CliValueType::Double => MxValue::double(parse_cli_value(value)?), CliValueType::String => MxValue::string(value), }; Ok(parsed) } fn print_deploy_event(event: &DeployEvent, use_json: bool) { if use_json { println!( "{}", json!({ "sequence": event.sequence, "observedAt": event.observed_at.as_ref().map(|ts| 
json!({ "seconds": ts.seconds, "nanos": ts.nanos, })), "timeOfLastDeploy": event.time_of_last_deploy.as_ref().map(|ts| json!({ "seconds": ts.seconds, "nanos": ts.nanos, })), "timeOfLastDeployPresent": event.time_of_last_deploy_present, "objectCount": event.object_count, "attributeCount": event.attribute_count, }) ); } else { let observed = event .observed_at .as_ref() .map(|ts| format!("{}.{:09}", ts.seconds, ts.nanos)) .unwrap_or_else(|| "(absent)".to_owned()); let last_deploy = if event.time_of_last_deploy_present { event .time_of_last_deploy .as_ref() .map(|ts| format!("{}.{:09}", ts.seconds, ts.nanos)) .unwrap_or_else(|| "(absent)".to_owned()) } else { "(absent)".to_owned() }; println!( "seq={} observed={} lastDeploy={} objects={} attributes={}", event.sequence, observed, last_deploy, event.object_count, event.attribute_count, ); } } /// Render a streamed [`AlarmFeedMessage`] as a terse one-line summary that /// distinguishes the three `payload` oneof cases. fn alarm_feed_message_summary(message: &AlarmFeedMessage) -> String { match &message.payload { Some(alarm_feed_message::Payload::ActiveAlarm(snapshot)) => { format!( "active-alarm {} state={}", snapshot.alarm_full_reference, AlarmEnumName::condition_state(snapshot.current_state) ) } Some(alarm_feed_message::Payload::SnapshotComplete(complete)) => { format!("snapshot-complete {complete}") } Some(alarm_feed_message::Payload::Transition(transition)) => { format!( "transition {} kind={}", transition.alarm_full_reference, AlarmEnumName::transition_kind(transition.transition_kind) ) } None => "(empty)".to_owned(), } } /// Render a streamed [`AlarmFeedMessage`] as a JSON object whose single /// top-level key names the active `payload` oneof case, mirroring the /// protobuf-JSON the .NET/Go/Java/Python CLIs emit. 
fn alarm_feed_message_to_json(message: &AlarmFeedMessage) -> Value {
    use mxgateway_client::generated::mxaccess_gateway::v1::alarm_feed_message::Payload;

    // An unset oneof has no case to name, so it renders as JSON null.
    let payload = match message.payload.as_ref() {
        Some(payload) => payload,
        None => return Value::Null,
    };

    match payload {
        Payload::ActiveAlarm(alarm) => {
            let fields = json!({
                "alarmFullReference": alarm.alarm_full_reference,
                "sourceObjectReference": alarm.source_object_reference,
                "alarmTypeName": alarm.alarm_type_name,
                "severity": alarm.severity,
                "currentState": AlarmEnumName::condition_state(alarm.current_state),
                "category": alarm.category,
                "description": alarm.description,
                "operatorUser": alarm.operator_user,
                "operatorComment": alarm.operator_comment,
            });
            json!({ "activeAlarm": fields })
        }
        Payload::SnapshotComplete(marker) => json!({ "snapshotComplete": marker }),
        Payload::Transition(change) => {
            let fields = json!({
                "alarmFullReference": change.alarm_full_reference,
                "sourceObjectReference": change.source_object_reference,
                "alarmTypeName": change.alarm_type_name,
                "transitionKind": AlarmEnumName::transition_kind(change.transition_kind),
                "severity": change.severity,
                "operatorUser": change.operator_user,
                "operatorComment": change.operator_comment,
                "category": change.category,
                "description": change.description,
            });
            json!({ "transition": fields })
        }
    }
}

/// Name lookups for the alarm proto enums, grouped on a unit struct so the
/// alarm-feed renderers can call them without importing the enums at the
/// top of `main.rs`.
struct AlarmEnumName;

impl AlarmEnumName {
    /// Proto name of an `AlarmConditionState` value, or the decimal number
    /// when the value is not a known variant.
    fn condition_state(value: i32) -> String {
        use mxgateway_client::generated::mxaccess_gateway::v1::AlarmConditionState;
        match AlarmConditionState::try_from(value) {
            Ok(known) => known.as_str_name().to_owned(),
            Err(_) => value.to_string(),
        }
    }

    /// Proto name of an `AlarmTransitionKind` value, or the decimal number
    /// when the value is not a known variant.
    fn transition_kind(value: i32) -> String {
        use mxgateway_client::generated::mxaccess_gateway::v1::AlarmTransitionKind;
        match AlarmTransitionKind::try_from(value) {
            Ok(known) => known.as_str_name().to_owned(),
            Err(_) => value.to_string(),
        }
    }
}

/// Render an [`AcknowledgeAlarmReply`] as a terse line or a JSON document.
fn print_acknowledge_alarm_reply( reply: &mxgateway_client::generated::mxaccess_gateway::v1::AcknowledgeAlarmReply, use_json: bool, ) { if use_json { println!( "{}", json!({ "operation": "acknowledge-alarm", "correlationId": reply.correlation_id, "protocolStatus": reply.protocol_status.as_ref().map(|status| json!({ "code": status.code, "message": status.message, })), "hresult": reply.hresult, "diagnosticMessage": reply.diagnostic_message, }) ); } else { println!("acknowledge-alarm completed"); } } /// Render a streamed [`MxEvent`] as a JSON object. The scalar value is /// projected into protojson-style `*Value` keys so the cross-language e2e /// matrix can extract and compare event values uniformly across all five /// client CLIs. fn event_to_json(event: &MxEvent) -> Value { // Render the family as the proto enum name (e.g. MX_EVENT_FAMILY_ON_WRITE_COMPLETE) // so it matches the protobuf-JSON the .NET/Go/Java/Python CLIs emit. let family = MxEventFamily::try_from(event.family) .map(|family| family.as_str_name().to_owned()) .unwrap_or_else(|_| event.family.to_string()); json!({ "family": family, "sessionId": event.session_id, "serverHandle": event.server_handle, "itemHandle": event.item_handle, "quality": event.quality, "workerSequence": event.worker_sequence, "value": event.value.as_ref().map(event_value_to_json), }) } /// Project an [`MxValue`] into a protojson-shaped JSON object whose single /// key names the scalar kind (`int32Value`, `stringValue`, ...), matching /// the protobuf-JSON the .NET/Go/Java CLIs emit. fn event_value_to_json(value: &ProtoMxValue) -> Value { match MxValue::from_proto(value.clone()).projection() { MxValueProjection::Bool(inner) => json!({ "boolValue": inner }), MxValueProjection::Int32(inner) => json!({ "int32Value": inner }), // protojson renders 64-bit integers as strings; mirror that here. 
MxValueProjection::Int64(inner) => json!({ "int64Value": inner.to_string() }), MxValueProjection::Float(inner) => json!({ "floatValue": inner }), MxValueProjection::Double(inner) => json!({ "doubleValue": inner }), MxValueProjection::String(inner) => json!({ "stringValue": inner }), MxValueProjection::Timestamp(ts) => { json!({ "timestampValue": { "seconds": ts.seconds, "nanos": ts.nanos } }) } MxValueProjection::Array(_) => json!({ "arrayValue": {} }), MxValueProjection::Raw(bytes) => json!({ "rawValue": { "byteCount": bytes.len() } }), MxValueProjection::Null => json!({ "isNull": true }), MxValueProjection::Unset => Value::Null, } } /// Parse a small but practically-complete subset of RFC3339: /// `YYYY-MM-DDTHH:MM:SS[.fffffffff][Z|+HH:MM|-HH:MM]`. Returns the /// corresponding `prost_types::Timestamp` (Unix seconds + nanoseconds). fn parse_rfc3339_timestamp(input: &str) -> Result { fn invalid(detail: impl Into) -> Error { Error::InvalidArgument { name: "last-seen-deploy-time".to_owned(), detail: detail.into(), } } let bytes = input.as_bytes(); if bytes.len() < 20 || (bytes[10] != b'T' && bytes[10] != b't') { return Err(invalid(format!( "expected RFC3339 timestamp like 2026-04-28T15:30:00Z, got {input:?}" ))); } let read_u32 = |start: usize, len: usize| -> Result { std::str::from_utf8(&bytes[start..start + len]) .ok() .and_then(|slice| slice.parse::().ok()) .ok_or_else(|| invalid(format!("non-numeric digits at byte {start}"))) }; let year = read_u32(0, 4)? 
as i32; if bytes[4] != b'-' { return Err(invalid("expected '-' after year")); } let month = read_u32(5, 2)?; if bytes[7] != b'-' { return Err(invalid("expected '-' after month")); } let day = read_u32(8, 2)?; let hour = read_u32(11, 2)?; if bytes[13] != b':' { return Err(invalid("expected ':' after hour")); } let minute = read_u32(14, 2)?; if bytes[16] != b':' { return Err(invalid("expected ':' after minute")); } let second = read_u32(17, 2)?; let mut cursor = 19usize; let mut nanos: u32 = 0; if cursor < bytes.len() && bytes[cursor] == b'.' { cursor += 1; let frac_start = cursor; while cursor < bytes.len() && bytes[cursor].is_ascii_digit() { cursor += 1; } let frac_len = cursor - frac_start; if frac_len == 0 { return Err(invalid("expected fractional digits after '.'")); } let take = frac_len.min(9); let frac = std::str::from_utf8(&bytes[frac_start..frac_start + take]) .ok() .and_then(|slice| slice.parse::().ok()) .ok_or_else(|| invalid("invalid fractional digits"))?; nanos = frac * 10u32.pow(9u32.saturating_sub(take as u32)); } let mut offset_seconds: i64 = 0; if cursor >= bytes.len() { return Err(invalid("missing timezone designator (Z or +HH:MM)")); } match bytes[cursor] { b'Z' | b'z' => cursor += 1, sign @ (b'+' | b'-') => { cursor += 1; if cursor + 5 > bytes.len() { return Err(invalid("offset must be ±HH:MM")); } let oh = std::str::from_utf8(&bytes[cursor..cursor + 2]) .ok() .and_then(|slice| slice.parse::().ok()) .ok_or_else(|| invalid("invalid offset hour"))?; if bytes[cursor + 2] != b':' { return Err(invalid("offset must contain ':' between HH and MM")); } let om = std::str::from_utf8(&bytes[cursor + 3..cursor + 5]) .ok() .and_then(|slice| slice.parse::().ok()) .ok_or_else(|| invalid("invalid offset minute"))?; cursor += 5; let signed = if sign == b'-' { -1 } else { 1 }; offset_seconds = signed * (oh * 3600 + om * 60); } other => { return Err(invalid(format!( "unexpected timezone designator byte {other:?}" ))); } } if cursor != bytes.len() { return 
Err(invalid("trailing characters after timezone")); } let unix = ymdhms_to_unix(year, month, day, hour, minute, second)?; let seconds = unix - offset_seconds; Ok(prost_types::Timestamp { seconds, nanos: nanos as i32, }) } fn ymdhms_to_unix( year: i32, month: u32, day: u32, hour: u32, minute: u32, second: u32, ) -> Result { if !(1..=12).contains(&month) || day < 1 || hour > 23 || minute > 59 || second > 60 { return Err(Error::InvalidArgument { name: "last-seen-deploy-time".to_owned(), detail: "calendar component out of range".to_owned(), }); } fn is_leap(year: i32) -> bool { (year % 4 == 0 && year % 100 != 0) || year % 400 == 0 } const DAYS: [u32; 12] = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]; let mut max = DAYS[(month - 1) as usize]; if month == 2 && is_leap(year) { max = 29; } if day > max { return Err(Error::InvalidArgument { name: "last-seen-deploy-time".to_owned(), detail: format!("day {day} out of range for month {month}/{year}"), }); } // Days from 1970-01-01 to year-month-day. 
let mut total_days: i64 = 0; if year >= 1970 { for y in 1970..year { total_days += if is_leap(y) { 366 } else { 365 }; } } else { for y in year..1970 { total_days -= if is_leap(y) { 366 } else { 365 }; } } for m in 1..month { let mut dim = DAYS[(m - 1) as usize]; if m == 2 && is_leap(year) { dim = 29; } total_days += dim as i64; } total_days += (day - 1) as i64; Ok(total_days * 86_400 + hour as i64 * 3600 + minute as i64 * 60 + second as i64) } fn parse_cli_value(value: &str) -> Result where T: std::str::FromStr, T::Err: std::fmt::Display, { value.parse::().map_err(|source| Error::InvalidArgument { name: "value".to_owned(), detail: source.to_string(), }) } #[cfg(test)] mod tests { use clap::Parser; use super::Cli; #[test] fn parses_version_json_command() { let parsed = Cli::try_parse_from(["mxgw", "version", "--json"]); assert!(parsed.is_ok()); } #[test] fn parses_write_command() { let parsed = Cli::try_parse_from([ "mxgw", "write", "--session-id", "session-1", "--server-handle", "12", "--item-handle", "34", "--value-type", "int32", "--value", "123", ]); assert!(parsed.is_ok()); } #[test] fn version_json_output_has_protocol_versions() { let value = super::version_json(); assert_eq!( value["gatewayProtocolVersion"], super::GATEWAY_PROTOCOL_VERSION ); assert_eq!( value["workerProtocolVersion"], super::WORKER_PROTOCOL_VERSION ); } #[test] fn parses_stream_alarms_command() { let parsed = Cli::try_parse_from([ "mxgw", "stream-alarms", "--filter-prefix", "Tank01", "--max-events", "3", "--json", ]); assert!(parsed.is_ok(), "parse failed: {parsed:?}"); } #[test] fn parses_stream_alarms_command_without_filter_prefix() { let parsed = Cli::try_parse_from(["mxgw", "stream-alarms"]); assert!(parsed.is_ok(), "parse failed: {parsed:?}"); } #[test] fn parses_acknowledge_alarm_command() { let parsed = Cli::try_parse_from([ "mxgw", "acknowledge-alarm", "--reference", "Tank01.Level.HiHi", "--comment", "ack from cli", "--operator", "operator1", ]); assert!(parsed.is_ok(), "parse 
failed: {parsed:?}"); } #[test] fn acknowledge_alarm_requires_reference() { let parsed = Cli::try_parse_from(["mxgw", "acknowledge-alarm"]); assert!(parsed.is_err()); } #[test] fn parses_galaxy_watch_command_with_last_seen_and_max_events() { let parsed = Cli::try_parse_from([ "mxgw", "galaxy", "watch", "--last-seen-deploy-time", "2026-04-28T15:30:00Z", "--max-events", "5", "--json", ]); assert!(parsed.is_ok(), "parse failed: {parsed:?}"); } #[test] fn parses_galaxy_watch_deploy_events_alias() { let parsed = Cli::try_parse_from(["mxgw", "galaxy", "watch-deploy-events"]); assert!(parsed.is_ok(), "parse failed: {parsed:?}"); } #[test] fn rfc3339_parser_round_trips_z_and_offset_inputs() { // 2026-04-28T15:30:00Z = 1_777_995_000 (sanity-checked once below) let utc = super::parse_rfc3339_timestamp("2026-04-28T15:30:00Z").unwrap(); let plus = super::parse_rfc3339_timestamp("2026-04-28T16:30:00+01:00").unwrap(); let frac = super::parse_rfc3339_timestamp("2026-04-28T15:30:00.250Z").unwrap(); assert_eq!(utc.seconds, plus.seconds); assert_eq!(utc.nanos, 0); assert_eq!(frac.seconds, utc.seconds); assert_eq!(frac.nanos, 250_000_000); } #[test] fn bench_read_bulk_stats_tracks_all_calls_in_latency_and_failures_separately() { use mxgateway_client::generated::mxaccess_gateway::v1::BulkReadResult; use mxgateway_client::Error; let mut stats = super::BenchReadBulkStats::default(); let cached = BulkReadResult { was_cached: true, was_successful: true, ..BulkReadResult::default() }; let uncached = BulkReadResult { was_cached: false, was_successful: true, ..BulkReadResult::default() }; // Two fast successes and one slow failure: every call lands in the // all-calls histogram (the cross-language `latencyMs` contract) and // the failure additionally surfaces through `failureLatencyMs` plus // `firstFailure` as Rust-only enrichment. 
stats.record_success(1.5, std::slice::from_ref(&cached)); stats.record_success(2.0, std::slice::from_ref(&uncached)); let failure = Error::MalformedReply { detail: "synthetic failure for the bench test".to_owned(), }; stats.record_failure(1_500.0, &failure); assert_eq!(stats.latencies_ms, vec![1.5, 2.0, 1_500.0]); assert_eq!(stats.failure_latencies_ms, vec![1_500.0]); assert_eq!(stats.successful_calls, 2); assert_eq!(stats.failed_calls, 1); assert_eq!(stats.total_calls(), 3); assert_eq!(stats.total_read_results, 2); assert_eq!(stats.cached_read_results, 1); assert!(stats .first_failure .as_deref() .unwrap() .contains("synthetic failure")); let elapsed = std::time::Duration::from_secs(1); let context = super::BenchReadBulkContext { endpoint: "http://fake", client_name: "client", bulk_size: 2, duration_seconds: 1, warmup_seconds: 0, steady_elapsed: elapsed, tags: &[], }; let payload = stats.to_json(&context); // The all-calls histogram (cross-language `latencyMs` contract) // includes the failure latency so the side-by-side comparison with // .NET/Go/Python/Java stays apples-to-apples. assert_eq!(payload["latencyMs"]["max"].as_f64().unwrap(), 1_500.0); // The Rust-only `failureLatencyMs` enrichment surfaces failures // separately for partial-failure diagnostics. assert_eq!( payload["failureLatencyMs"]["max"].as_f64().unwrap(), 1_500.0 ); assert_eq!(payload["failedCalls"].as_u64().unwrap(), 1); assert_eq!(payload["successfulCalls"].as_u64().unwrap(), 2); assert!(payload["firstFailure"] .as_str() .unwrap() .contains("synthetic failure")); } #[test] fn bench_read_bulk_stats_calls_per_second_handles_zero_duration() { let stats = super::BenchReadBulkStats::default(); assert_eq!(stats.calls_per_second(std::time::Duration::ZERO), 0.0); } }