//! `mxgw` — the Rust test CLI for the MXAccess Gateway.
//!
//! The binary wraps [`zb_mom_ww_mxgateway_client`] in a `clap`-driven command surface
//! used by the cross-language smoke matrix and by developers exercising the
//! gateway by hand. Every subcommand mirrors a single gateway/Galaxy RPC,
//! prints either a terse line or a JSON document with `--json`, and exits
//! non-zero on any failure.

#![warn(missing_docs)]

use std::env;
use std::io::{self, BufRead, Write};
use std::path::PathBuf;
use std::process::ExitCode;
use std::time::{Duration, Instant};

use clap::{Args, Parser, Subcommand, ValueEnum};
use futures_util::StreamExt;
use serde_json::json;
use serde_json::Value;
use zb_mom_ww_mxgateway_client::generated::galaxy_repository::v1::DeployEvent;
use zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::{
    alarm_feed_message, AcknowledgeAlarmRequest, AlarmFeedMessage, CloseSessionRequest, MxCommand,
    MxCommandKind, MxCommandRequest, MxEvent, MxEventFamily, MxValue as ProtoMxValue,
    OpenSessionRequest, PingCommand, StreamAlarmsRequest, StreamEventsRequest, Write2BulkEntry,
    WriteBulkEntry, WriteSecured2BulkEntry, WriteSecuredBulkEntry,
};
use zb_mom_ww_mxgateway_client::{
    next_correlation_id, ApiKey, ClientOptions, Error, GalaxyClient, GatewayClient, MxValue,
    MxValueProjection, CLIENT_VERSION, GATEWAY_PROTOCOL_VERSION, WORKER_PROTOCOL_VERSION,
};

/// Largest `--max-events` value accepted by `stream-events` and
/// `stream-alarms`; larger requests are rejected before connecting.
const MAX_AGGREGATE_EVENTS: usize = 10_000;

// Top-level argument parser; it carries nothing but the selected subcommand.
#[derive(Debug, Parser)]
#[command(name = "mxgw")]
#[command(about = "MXAccess Gateway Rust test CLI")]
struct Cli {
    #[command(subcommand)]
    command: Command,
}

// Every subcommand the CLI understands. Each variant holds the parsed flags
// for one operation; `dispatch` maps a variant to the client call it performs.
#[derive(Debug, Subcommand)]
enum Command {
    // Print the client version and the gateway/worker protocol versions.
    // `jsonl` is accepted by the parser but is not read by `dispatch`.
    Version {
        #[arg(long)]
        json: bool,
        #[arg(long)]
        jsonl: bool,
    },
    // Send a Ping command carrying `message` through the generic invoke call.
    Ping {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long, default_value = "ping")]
        message: String,
        #[arg(long)]
        json: bool,
    },
    // Open a gateway session and print its id.
    OpenSession {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long, default_value = "mxgw-rust-cli")]
        client_name: String,
        #[arg(long)]
        json: bool,
    },
    // Close the session identified by `session_id`.
    CloseSession {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        session_id: String,
        #[arg(long)]
        json: bool,
    },
    // Register `client_name` on an existing session and print the server handle.
    Register {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        session_id: String,
        #[arg(long, default_value = "mxgw-rust-cli")]
        client_name: String,
        #[arg(long)]
        json: bool,
    },
    // Add `item` under `server_handle` and print the resulting item handle.
    AddItem {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        session_id: String,
        #[arg(long)]
        server_handle: i32,
        #[arg(long)]
        item: String,
        #[arg(long)]
        json: bool,
    },
    // Advise one previously added item.
    Advise {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        session_id: String,
        #[arg(long)]
        server_handle: i32,
        #[arg(long)]
        item_handle: i32,
        #[arg(long)]
        json: bool,
    },
    // Subscribe to a comma-separated list of items in one call.
    // NOTE(review): the element type of this `Vec` (and of the other bare
    // `Vec`/`Option`/`Result` types further down) is not visible in this
    // view — confirm against the original source.
    SubscribeBulk {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        session_id: String,
        #[arg(long)]
        server_handle: i32,
        #[arg(long, value_delimiter = ',')]
        items: Vec,
        #[arg(long)]
        json: bool,
    },
    // Unsubscribe a comma-separated list of item handles in one call.
    UnsubscribeBulk {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        session_id: String,
        #[arg(long)]
        server_handle: i32,
        #[arg(long, value_delimiter = ',')]
        item_handles: Vec,
        #[arg(long)]
        json: bool,
    },
    /// Snapshot the current value for each requested tag. Cached
    /// OnDataChange values are returned for tags that are already advised
    /// without touching the existing subscription; otherwise the worker
    /// takes a one-shot AddItem + Advise + UnAdvise + RemoveItem lifecycle.
    ReadBulk {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        session_id: String,
        #[arg(long)]
        server_handle: i32,
        #[arg(long, value_delimiter = ',')]
        items: Vec,
        /// Per-tag snapshot timeout in milliseconds. `0` uses the worker default (1000 ms).
        #[arg(long, default_value_t = 0)]
        timeout_ms: u32,
        #[arg(long)]
        json: bool,
    },
    /// Bulk Write — one MXAccess Write per (item_handle, value) pair.
    WriteBulk {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        session_id: String,
        #[arg(long)]
        server_handle: i32,
        #[arg(long, value_delimiter = ',')]
        item_handles: Vec,
        // One value type is shared by every entry in `values`.
        #[arg(long, value_enum)]
        value_type: CliValueType,
        #[arg(long, value_delimiter = ',')]
        values: Vec,
        #[arg(long, default_value_t = 0)]
        user_id: i32,
        #[arg(long)]
        json: bool,
    },
    /// Bulk Write2 — timestamped variant; the timestamp applies to all entries.
    Write2Bulk {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        session_id: String,
        #[arg(long)]
        server_handle: i32,
        #[arg(long, value_delimiter = ',')]
        item_handles: Vec,
        #[arg(long, value_enum)]
        value_type: CliValueType,
        #[arg(long, value_delimiter = ',')]
        values: Vec,
        // Passed to the gateway as a string value; it is not parsed locally.
        #[arg(long)]
        timestamp: String,
        #[arg(long, default_value_t = 0)]
        user_id: i32,
        #[arg(long)]
        json: bool,
    },
    /// Bulk WriteSecured.
    WriteSecuredBulk {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        session_id: String,
        #[arg(long)]
        server_handle: i32,
        #[arg(long, value_delimiter = ',')]
        item_handles: Vec,
        #[arg(long, value_enum)]
        value_type: CliValueType,
        #[arg(long, value_delimiter = ',')]
        values: Vec,
        #[arg(long, default_value_t = 0)]
        current_user_id: i32,
        #[arg(long, default_value_t = 0)]
        verifier_user_id: i32,
        #[arg(long)]
        json: bool,
    },
    /// Bulk WriteSecured2 — timestamped + verified.
    WriteSecured2Bulk {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        session_id: String,
        #[arg(long)]
        server_handle: i32,
        #[arg(long, value_delimiter = ',')]
        item_handles: Vec,
        #[arg(long, value_enum)]
        value_type: CliValueType,
        #[arg(long, value_delimiter = ',')]
        values: Vec,
        #[arg(long)]
        timestamp: String,
        #[arg(long, default_value_t = 0)]
        current_user_id: i32,
        #[arg(long, default_value_t = 0)]
        verifier_user_id: i32,
        #[arg(long)]
        json: bool,
    },
    /// Cross-language stress benchmark for ReadBulk: opens its own session,
    /// subscribes to `--bulk-size` tags so the worker's per-session value cache
    /// populates from real OnDataChange events, then hammers ReadBulk in a
    /// tight loop for `--duration-seconds` with per-call latency capture. Emits
    /// a single JSON object on stdout matching the schema the
    /// `scripts/bench-read-bulk.ps1` driver collates across all five clients.
    BenchReadBulk {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long, default_value = "mxgw-rust-bench")]
        client_name: String,
        #[arg(long, default_value_t = 30)]
        duration_seconds: u64,
        #[arg(long, default_value_t = 3)]
        warmup_seconds: u64,
        #[arg(long, default_value_t = 6)]
        bulk_size: usize,
        // First tag number; tags are numbered consecutively from here.
        #[arg(long, default_value_t = 1)]
        tag_start: i32,
        #[arg(long, default_value = "TestMachine_")]
        tag_prefix: String,
        #[arg(long, default_value = "TestChangingInt")]
        tag_attribute: String,
        #[arg(long, default_value_t = 1500)]
        timeout_ms: u32,
        #[arg(long)]
        json: bool,
    },
    // Read up to `max_events` events from a session's event stream.
    StreamEvents {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        session_id: String,
        #[arg(long, default_value_t = 0)]
        after_worker_sequence: u64,
        #[arg(long, default_value_t = 1)]
        max_events: usize,
        // `--json` prints one aggregate document after the loop;
        // `--jsonl` prints one JSON document per event as it arrives.
        #[arg(long)]
        json: bool,
        #[arg(long)]
        jsonl: bool,
    },
    /// Attach to the gateway's session-less central alarm feed. The stream
    /// opens with one `active_alarm` per currently-active alarm, then a
    /// single `snapshot_complete`, then a `transition` for every subsequent
    /// raise / acknowledge / clear.
    StreamAlarms {
        #[command(flatten)]
        connection: ConnectionArgs,
        /// Optional alarm-reference prefix scoping the feed to an equipment
        /// sub-tree. Omit to stream every active alarm.
        #[arg(long)]
        filter_prefix: Option,
        #[arg(long, default_value_t = 1)]
        max_events: usize,
        #[arg(long)]
        json: bool,
        #[arg(long)]
        jsonl: bool,
    },
    /// Acknowledge an active MXAccess alarm condition through the gateway's
    /// session-less AcknowledgeAlarm RPC.
    AcknowledgeAlarm {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        reference: String,
        #[arg(long, default_value = "")]
        comment: String,
        #[arg(long, default_value = "")]
        operator: String,
        #[arg(long)]
        json: bool,
    },
    // Write a single value to one item.
    Write {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        session_id: String,
        #[arg(long)]
        server_handle: i32,
        #[arg(long)]
        item_handle: i32,
        #[arg(long, value_enum)]
        value_type: CliValueType,
        #[arg(long)]
        value: String,
        #[arg(long, default_value_t = 0)]
        user_id: i32,
        #[arg(long)]
        json: bool,
    },
    // Write a single value together with a timestamp string.
    Write2 {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        session_id: String,
        #[arg(long)]
        server_handle: i32,
        #[arg(long)]
        item_handle: i32,
        #[arg(long, value_enum)]
        value_type: CliValueType,
        #[arg(long)]
        value: String,
        #[arg(long)]
        timestamp: String,
        #[arg(long, default_value_t = 0)]
        user_id: i32,
        #[arg(long)]
        json: bool,
    },
    // End-to-end check: open a session, register, add `item`, advise, close.
    Smoke {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        item: String,
        #[arg(long, default_value = "mxgw-rust-smoke")]
        client_name: String,
        #[arg(long)]
        json: bool,
    },
    /// Read commands from stdin, one per line, execute each in sequence, and
    /// write `__MXGW_BATCH_EOR__` to stdout after every result. Errors are
    /// written as `{"error":"…","type":"error"}` JSON to stdout (not stderr)
    /// so the harness can parse them without interleaving stderr. The loop
    /// never terminates on command error or accidental blank lines; only
    /// stdin EOF ends the session — empty lines log an empty-EOR-bracketed
    /// result and continue, matching the other four language CLIs.
    Batch,
    // Galaxy repository subcommands; see `GalaxyCommand`.
    #[command(subcommand)]
    Galaxy(GalaxyCommand),
}

// Subcommands served through `GalaxyClient` rather than `GatewayClient`.
#[derive(Debug, Subcommand)]
enum GalaxyCommand {
    // Call `test_connection` and report the boolean result.
    TestConnection {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        json: bool,
    },
    // Print the last deploy time, or report that none is present.
    LastDeployTime {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        json: bool,
    },
    // Fetch the object hierarchy and print each object with its attributes.
    DiscoverHierarchy {
        #[command(flatten)]
        connection: ConnectionArgs,
        #[arg(long)]
        json: bool,
    },
    /// Subscribe to the WatchDeployEvents server stream.
    ///
    /// Prints one line per received event (or one JSON object with `--json`).
    /// Runs until the stream ends, the server fails the call, or the
    /// process is interrupted (Ctrl+C).
    #[command(alias = "watch-deploy-events")]
    Watch {
        #[command(flatten)]
        connection: ConnectionArgs,
        /// Optional RFC3339 timestamp (e.g. `2026-04-28T15:30:00Z`). When
        /// supplied, the server suppresses the bootstrap event if its
        /// cached deploy time matches this value.
        #[arg(long)]
        last_seen_deploy_time: Option,
        /// Optional cap on the number of events to print before exiting.
        /// 0 (the default) means run until cancelled or the stream ends.
        #[arg(long, default_value_t = 0)]
        max_events: usize,
        #[arg(long)]
        json: bool,
    },
}

// Connection flags shared by every networked subcommand (flattened into each).
#[derive(Debug, Args, Clone)]
struct ConnectionArgs {
    #[arg(long, default_value = "http://127.0.0.1:5000")]
    endpoint: String,
    // Explicit API key; when absent, the environment variable named by
    // `api_key_env` is consulted instead (see `options`).
    #[arg(long)]
    api_key: Option,
    #[arg(long, default_value = "MXGATEWAY_API_KEY")]
    api_key_env: String,
    #[arg(long)]
    plaintext: bool,
    #[arg(long)]
    tls: bool,
    #[arg(long)]
    ca_file: Option,
    #[arg(long)]
    server_name_override: Option,
    /// Verify the server certificate against the system trust roots even
    /// without a pinned CA. The Rust client's default is to require a CA
    /// file (see `--ca-file`); set this flag to use system roots instead.
    #[arg(long)]
    require_certificate_validation: bool,
    #[arg(long, default_value_t = 10)]
    connect_timeout_seconds: u64,
    #[arg(long, default_value_t = 30)]
    call_timeout_seconds: u64,
}

impl ConnectionArgs {
    /// Translate the parsed connection flags into [`ClientOptions`].
    ///
    /// Optional settings (API key, CA file, server-name override, certificate
    /// validation) are applied only when the corresponding flag was supplied.
    fn options(&self) -> ClientOptions {
        // Plaintext is the default: it is on unless `--tls` is given, and
        // `--plaintext` keeps it on even when `--tls` is also present.
        // NOTE(review): `--tls --plaintext` therefore silently resolves to
        // plaintext — confirm that precedence is intended.
        let mut options = ClientOptions::new(self.endpoint.clone())
            .with_plaintext(!self.tls || self.plaintext)
            .with_connect_timeout(Duration::from_secs(self.connect_timeout_seconds))
            .with_call_timeout(Duration::from_secs(self.call_timeout_seconds));
        // `--api-key` wins over the environment variable; an empty result
        // from either source is treated as "no key".
        if let Some(api_key) = self
            .api_key
            .clone()
            .or_else(|| env::var(&self.api_key_env).ok())
            .filter(|value| !value.is_empty())
        {
            options = options.with_api_key(ApiKey::new(api_key));
        }
        if let Some(ca_file) = &self.ca_file {
            options = options.with_ca_file(ca_file);
        }
        if let Some(server_name_override) = &self.server_name_override {
            options = options.with_server_name_override(server_name_override);
        }
        if self.require_certificate_validation {
            options = options.with_require_certificate_validation(true);
        }
        options
    }
}

// Value types selectable with `--value-type`; `parse_value` maps each one to
// the matching `MxValue` constructor.
#[derive(Clone, Copy, Debug, ValueEnum)]
enum CliValueType {
    Bool,
    Int32,
    Int64,
    Float,
    Double,
    String,
}

/// Process entry point: parse the command line, run the selected command, and
/// map the outcome to an exit code. Errors are printed to stderr and yield
/// [`ExitCode::FAILURE`].
#[tokio::main]
async fn main() -> ExitCode {
    let cli = Cli::parse();
    // `Batch` is routed here rather than through `dispatch`; every other
    // command goes through `dispatch`.
    let result = match cli.command {
        Command::Batch => run_batch().await,
        command => dispatch(command).await,
    };
    match result {
        Ok(()) => ExitCode::SUCCESS,
        Err(error) => {
            eprintln!("{error}");
            ExitCode::FAILURE
        }
    }
}

/// Dispatch a parsed [`Command`] to its handler. All subcommands except
/// [`Command::Batch`] are handled here; `Batch` is handled separately in
/// `main` to avoid mutual recursion between `dispatch` and `run_batch`.
async fn dispatch(command: Command) -> Result<(), Error> {
    match command {
        // Reached only when a batch input line itself says `batch`.
        Command::Batch => {
            return Err(Error::InvalidArgument {
                name: "batch".to_owned(),
                detail: "batch cannot be nested inside another batch session".to_owned(),
            });
        }
        // The `jsonl` flag is deliberately not bound here.
        Command::Version { json, .. } => print_version(json),
        Command::Ping {
            connection,
            message,
            json,
        } => {
            let client = connect(connection).await?;
            let reply = client
                .invoke(MxCommandRequest {
                    client_correlation_id: next_correlation_id("cli-ping"),
                    command: Some(MxCommand {
                        kind: MxCommandKind::Ping as i32,
                        payload: Some(zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::mx_command::Payload::Ping(
                            PingCommand { message },
                        )),
                    }),
                    ..MxCommandRequest::default()
                })
                .await?;
            print_command_reply("ping", &reply, json);
        }
        Command::OpenSession {
            connection,
            client_name,
            json,
        } => {
            let client = connect(connection).await?;
            let reply = client
                .open_session_raw(OpenSessionRequest {
                    client_session_name: client_name,
                    ..OpenSessionRequest::default()
                })
                .await?;
            if json {
                println!(
                    "{}",
                    json!({
                        "sessionId": reply.session_id,
                        "backendName": reply.backend_name,
                        "gatewayProtocolVersion": reply.gateway_protocol_version,
                        "workerProtocolVersion": reply.worker_protocol_version,
                    })
                );
            } else {
                // Plain mode prints only the session id.
                println!("{}", reply.session_id);
            }
        }
        Command::CloseSession {
            connection,
            session_id,
            json,
        } => {
            let client = connect(connection).await?;
            let reply = client
                .close_session_raw(CloseSessionRequest {
                    session_id,
                    client_correlation_id: next_correlation_id("cli-close-session"),
                })
                .await?;
            if json {
                println!("{}", json!({ "sessionId": reply.session_id }));
            } else {
                println!("closed {}", reply.session_id);
            }
        }
        Command::Register {
            connection,
            session_id,
            client_name,
            json,
        } => {
            let session = session_for(connection, session_id).await?;
            let server_handle = session.register(&client_name).await?;
            print_handle("serverHandle", server_handle, json);
        }
        Command::AddItem {
            connection,
            session_id,
            server_handle,
            item,
            json,
        } => {
            let session = session_for(connection, session_id).await?;
            let item_handle = session.add_item(server_handle, &item).await?;
            print_handle("itemHandle", item_handle, json);
        }
        Command::Advise {
            connection,
            session_id,
            server_handle,
            item_handle,
            json,
        } => {
            let session = session_for(connection, session_id).await?;
            session.advise(server_handle, item_handle).await?;
            print_ok("advise", json);
        }
        Command::SubscribeBulk {
            connection,
            session_id,
            server_handle,
            items,
            json,
        } => {
            let session = session_for(connection, session_id).await?;
            let results = session.subscribe_bulk(server_handle, items).await?;
            print_bulk_results("subscribe-bulk", &results, json);
        }
        Command::UnsubscribeBulk {
            connection,
            session_id,
            server_handle,
            item_handles,
            json,
        } => {
            let session = session_for(connection, session_id).await?;
            let results = session
                .unsubscribe_bulk(server_handle, item_handles)
                .await?;
            print_bulk_results("unsubscribe-bulk", &results, json);
        }
        Command::ReadBulk {
            connection,
            session_id,
            server_handle,
            items,
            timeout_ms,
            json,
        } => {
            let session = session_for(connection, session_id).await?;
            let results = session.read_bulk(server_handle, &items, timeout_ms).await?;
            print_read_bulk_results("read-bulk", &results, json);
        }
        Command::WriteBulk {
            connection,
            session_id,
            server_handle,
            item_handles,
            value_type,
            values,
            user_id,
            json,
        } => {
            // Entries are built before connecting, so a failure here never
            // opens a connection.
            let entries = build_write_bulk_entries(&item_handles, value_type, &values)?;
            let session = session_for(connection, session_id).await?;
            let results = session
                .write_bulk(
                    server_handle,
                    entries
                        .into_iter()
                        .map(|(item_handle, value)| WriteBulkEntry {
                            item_handle,
                            value: Some(value),
                            user_id,
                        })
                        .collect(),
                )
                .await?;
            print_write_bulk_results("write-bulk", &results, json);
        }
        Command::Write2Bulk {
            connection,
            session_id,
            server_handle,
            item_handles,
            value_type,
            values,
            timestamp,
            user_id,
            json,
        } => {
            let entries = build_write_bulk_entries(&item_handles, value_type, &values)?;
            // One timestamp value is built once and cloned into every entry.
            let timestamp_value: ProtoMxValue = MxValue::string(timestamp).into_proto();
            let session = session_for(connection, session_id).await?;
            let results = session
                .write2_bulk(
                    server_handle,
                    entries
                        .into_iter()
                        .map(|(item_handle, value)| Write2BulkEntry {
                            item_handle,
                            value: Some(value),
                            timestamp_value: Some(timestamp_value.clone()),
                            user_id,
                        })
                        .collect(),
                )
                .await?;
            print_write_bulk_results("write2-bulk", &results, json);
        }
        Command::WriteSecuredBulk {
            connection,
            session_id,
            server_handle,
            item_handles,
            value_type,
            values,
            current_user_id,
            verifier_user_id,
            json,
        } => {
            let entries = build_write_bulk_entries(&item_handles, value_type, &values)?;
            let session = session_for(connection, session_id).await?;
            let results = session
                .write_secured_bulk(
                    server_handle,
                    entries
                        .into_iter()
                        .map(|(item_handle, value)| WriteSecuredBulkEntry {
                            item_handle,
                            value: Some(value),
                            current_user_id,
                            verifier_user_id,
                        })
                        .collect(),
                )
                .await?;
            print_write_bulk_results("write-secured-bulk", &results, json);
        }
        Command::WriteSecured2Bulk {
            connection,
            session_id,
            server_handle,
            item_handles,
            value_type,
            values,
            timestamp,
            current_user_id,
            verifier_user_id,
            json,
        } => {
            let entries = build_write_bulk_entries(&item_handles, value_type, &values)?;
            let timestamp_value: ProtoMxValue = MxValue::string(timestamp).into_proto();
            let session = session_for(connection, session_id).await?;
            let results = session
                .write_secured2_bulk(
                    server_handle,
                    entries
                        .into_iter()
                        .map(|(item_handle, value)| WriteSecured2BulkEntry {
                            item_handle,
                            value: Some(value),
                            timestamp_value: Some(timestamp_value.clone()),
                            current_user_id,
                            verifier_user_id,
                        })
                        .collect(),
                )
                .await?;
            print_write_bulk_results("write-secured2-bulk", &results, json);
        }
        Command::BenchReadBulk {
            connection,
            client_name,
            duration_seconds,
            warmup_seconds,
            bulk_size,
            tag_start,
            tag_prefix,
            tag_attribute,
            timeout_ms,
            json,
        } => {
            run_bench_read_bulk(
                connection,
                client_name,
                duration_seconds,
                warmup_seconds,
                bulk_size,
                tag_start,
                tag_prefix,
                tag_attribute,
                timeout_ms,
                json,
            )
            .await?;
        }
        Command::StreamEvents {
            connection,
            session_id,
            after_worker_sequence,
            max_events,
            json,
            jsonl,
        } => {
            // The cap is checked before connecting, in every output mode.
            if max_events > MAX_AGGREGATE_EVENTS {
                return Err(Error::InvalidArgument {
                    name: "max-events".to_owned(),
                    detail: format!("must be less than or equal to {MAX_AGGREGATE_EVENTS}"),
                });
            }
            let client = connect(connection).await?;
            let mut stream = client
                .stream_events(StreamEventsRequest {
                    session_id,
                    after_worker_sequence,
                })
                .await?;
            let mut events: Vec = Vec::new();
            let mut event_count = 0usize;
            // Stop at `max_events` or when the stream ends, whichever is
            // first; a stream error aborts the command.
            while event_count < max_events {
                let Some(event) = stream.next().await else {
                    break;
                };
                let event = event?;
                event_count += 1;
                if jsonl {
                    println!("{}", event_to_json(&event));
                } else if json {
                    events.push(event_to_json(&event));
                } else {
                    println!("{} {}", event.worker_sequence, event.family);
                }
            }
            // NOTE(review): with both `--json` and `--jsonl`, the per-event
            // lines above are printed and this aggregate is still emitted
            // with an empty `events` array — confirm that is intended.
            if json {
                // `eventCount` is preserved for back-compat; `events` carries
                // the per-event detail the cross-language e2e matrix compares.
                println!("{}", json!({ "eventCount": event_count, "events": events }));
            }
        }
        Command::StreamAlarms {
            connection,
            filter_prefix,
            max_events,
            json,
            jsonl,
        } => {
            if max_events > MAX_AGGREGATE_EVENTS {
                return Err(Error::InvalidArgument {
                    name: "max-events".to_owned(),
                    detail: format!("must be less than or equal to {MAX_AGGREGATE_EVENTS}"),
                });
            }
            let client = connect(connection).await?;
            // An omitted `--filter-prefix` is sent as an empty prefix.
            let mut stream = client
                .stream_alarms(StreamAlarmsRequest {
                    client_correlation_id: next_correlation_id("cli-stream-alarms"),
                    alarm_filter_prefix: filter_prefix.unwrap_or_default(),
                })
                .await?;
            let mut messages: Vec = Vec::new();
            let mut message_count = 0usize;
            while message_count < max_events {
                let Some(message) = stream.next().await else {
                    break;
                };
                let message = message?;
                message_count += 1;
                if jsonl {
                    println!("{}", alarm_feed_message_to_json(&message));
                } else if json {
                    messages.push(alarm_feed_message_to_json(&message));
                } else {
                    println!("{}", alarm_feed_message_summary(&message));
                }
            }
            if json {
                println!(
                    "{}",
                    json!({ "messageCount": message_count, "messages": messages })
                );
            }
        }
        Command::AcknowledgeAlarm {
            connection,
            reference,
            comment,
            operator,
            json,
        } => {
            let client = connect(connection).await?;
            let reply = client
                .acknowledge_alarm(AcknowledgeAlarmRequest {
                    client_correlation_id: next_correlation_id("cli-acknowledge-alarm"),
                    alarm_full_reference: reference,
                    comment,
                    operator_user: operator,
                })
                .await?;
            print_acknowledge_alarm_reply(&reply, json);
        }
        Command::Write {
            connection,
            session_id,
            server_handle,
            item_handle,
            value_type,
            value,
            user_id,
            json,
        } => {
            let session = session_for(connection, session_id).await?;
            session
                .write(
                    server_handle,
                    item_handle,
                    parse_value(value_type, &value)?,
                    user_id,
                )
                .await?;
            print_ok("write", json);
        }
        Command::Write2 {
            connection,
            session_id,
            server_handle,
            item_handle,
            value_type,
            value,
            timestamp,
            user_id,
            json,
        } => {
            let session = session_for(connection, session_id).await?;
            session
                .write2(
                    server_handle,
                    item_handle,
                    parse_value(value_type, &value)?,
                    MxValue::string(timestamp),
                    user_id,
                )
                .await?;
            print_ok("write2", json);
        }
        Command::Galaxy(galaxy_command) => run_galaxy(galaxy_command).await?,
        Command::Smoke {
            connection,
            item,
            client_name,
            json,
        } => {
            let client = connect(connection).await?;
            let session = client
                .open_session(OpenSessionRequest {
                    client_session_name: client_name.clone(),
                    ..OpenSessionRequest::default()
                })
                .await?;
            // Run the register/add/advise steps inside one future so the
            // session is closed below even when one of them fails.
            let result = async {
                let server_handle = session.register(&client_name).await?;
                let item_handle = session.add_item(server_handle, &item).await?;
                session.advise(server_handle, item_handle).await?;
                Ok::<_, Error>((server_handle, item_handle))
            }
            .await;
            let close_result = session.close().await;
            // A step failure is reported in preference to a close failure.
            let (server_handle, item_handle) = result?;
            close_result?;
            if json {
                println!(
                    "{}",
                    json!({
                        "sessionId": session.id(),
                        "serverHandle": server_handle,
                        "itemHandle": item_handle,
                        "closed": true,
                    })
                );
            } else {
                println!(
                    "session {} registered server {server_handle}, item {item_handle}, closed",
                    session.id()
                );
            }
        }
    }
    Ok(())
}

/// Connect a [`GatewayClient`] using the options derived from `connection`.
async fn connect(connection: ConnectionArgs) -> Result {
    GatewayClient::connect(connection.options()).await
}

/// Connect a [`GalaxyClient`] using the options derived from `connection`.
async fn connect_galaxy(connection: ConnectionArgs) -> Result {
    GalaxyClient::connect(connection.options()).await
}

/// Execute one [`GalaxyCommand`] and print its result to stdout, as JSON when
/// the command's `json` flag is set and as plain text otherwise.
async fn run_galaxy(command: GalaxyCommand) -> Result<(), Error> {
    match command {
        GalaxyCommand::TestConnection { connection, json } => {
            let mut client = connect_galaxy(connection).await?;
            let ok = client.test_connection().await?;
            if json {
                println!("{}", json!({ "ok": ok }));
            } else if ok {
                println!("ok");
            } else {
                println!("not ok");
            }
        }
        GalaxyCommand::LastDeployTime { connection, json } => {
            let mut client = connect_galaxy(connection).await?;
            let timestamp = client.get_last_deploy_time().await?;
            // Four cases: output mode crossed with presence of a timestamp.
            match (json, timestamp) {
                (true, Some(ts)) => {
                    println!(
                        "{}",
                        json!({
                            "present": true,
                            "seconds": ts.seconds,
                            "nanos": ts.nanos,
                        })
                    );
                }
                (true, None) => {
                    println!("{}", json!({ "present": false }));
                }
                // Plain mode prints `seconds.nanos` with nanos zero-padded to
                // nine digits.
                (false, Some(ts)) => println!("{}.{:09}", ts.seconds, ts.nanos),
                (false, None) => println!("(absent)"),
            }
        }
        GalaxyCommand::Watch {
            connection,
            last_seen_deploy_time,
            max_events,
            json,
        } => {
            let mut client = connect_galaxy(connection).await?;
            // Parse the optional timestamp; a parse failure aborts the command.
            let last_seen = last_seen_deploy_time
                .as_deref()
                .map(parse_rfc3339_timestamp)
                .transpose()?;
            let mut stream = client.watch_deploy_events(last_seen).await?;
            let mut count = 0usize;
            loop {
                // `biased` polls the Ctrl+C branch before the stream branch.
                // NOTE(review): a fresh `ctrl_c()` future is created on every
                // loop iteration; consider creating it once outside the loop
                // and pinning it so an interrupt arriving between iterations
                // cannot be missed — confirm against tokio's signal docs.
                tokio::select! {
                    biased;
                    signal = tokio::signal::ctrl_c() => {
                        signal.map_err(|err| Error::InvalidArgument {
                            name: "ctrl_c".to_owned(),
                            detail: err.to_string(),
                        })?;
                        // Drop the stream below by breaking; tonic tears the
                        // gRPC call down cooperatively.
                        break;
                    }
                    next = stream.next() => {
                        let Some(event) = next else {
                            break;
                        };
                        let event = event?;
                        count += 1;
                        print_deploy_event(&event, json);
                        // `max_events == 0` means unlimited.
                        if max_events != 0 && count >= max_events {
                            break;
                        }
                    }
                }
            }
        }
        GalaxyCommand::DiscoverHierarchy { connection, json } => {
            let mut client = connect_galaxy(connection).await?;
            let objects = client.discover_hierarchy().await?;
            if json {
                // One JSON object per hierarchy object, each with a nested
                // `attributes` array.
                let payload: Vec<_> = objects
                    .iter()
                    .map(|object| {
                        json!({
                            "gobjectId": object.gobject_id,
                            "tagName": object.tag_name,
                            "containedName": object.contained_name,
                            "browseName": object.browse_name,
                            "parentGobjectId": object.parent_gobject_id,
                            "isArea": object.is_area,
                            "categoryId": object.category_id,
                            "hostedByGobjectId": object.hosted_by_gobject_id,
                            "templateChain": object.template_chain,
                            "attributes": object.attributes.iter().map(|attribute| json!({
                                "attributeName": attribute.attribute_name,
                                "fullTagReference": attribute.full_tag_reference,
                                "mxDataType": attribute.mx_data_type,
                                "dataTypeName": attribute.data_type_name,
                                "isArray": attribute.is_array,
                                "arrayDimension": attribute.array_dimension,
                                "arrayDimensionPresent": attribute.array_dimension_present,
                                "mxAttributeCategory": attribute.mx_attribute_category,
                                "securityClassification": attribute.security_classification,
                                "isHistorized": attribute.is_historized,
                                "isAlarm": attribute.is_alarm,
                            })).collect::>(),
                        })
                    })
                    .collect();
                println!("{}", json!({ "objects": payload }));
            } else {
                // Plain mode: the object count first, then one line per object.
                println!("{}", objects.len());
                for object in &objects {
                    println!(
                        "{} {} {} ({} attribute(s))",
                        object.gobject_id,
                        object.tag_name,
                        object.browse_name,
                        object.attributes.len()
                    );
                }
            }
        }
    }
    Ok(())
}

/// Connect to the gateway and return a session handle for the existing
/// `session_id`, obtained from `client.session(..)`.
async fn session_for(
    connection: ConnectionArgs,
    session_id: String,
) -> Result {
    let client = connect(connection).await?;
    Ok(client.session(session_id))
}

/// End-of-result sentinel written to stdout after every batch command.
const BATCH_EOR: &str = "__MXGW_BATCH_EOR__"; /// Run the batch loop: read one command line at a time from stdin, dispatch /// each through the normal [`dispatch`] path, and write [`BATCH_EOR`] to /// stdout after every result. Errors are serialised as JSON to stdout so /// the harness can parse them without interleaving stderr. The loop never /// terminates on command error or accidental blank lines; only stdin EOF /// ends the session — empty lines log an empty-EOR-bracketed result and /// continue. /// /// `std::io::Stdin::lock().lines()` is a blocking iterator and the dispatch /// future is spawned on a separate tokio task so the runtime's main worker /// stays free. When the runtime is multi-threaded the blocking read keeps /// one worker parked on `ReadFile`; that is acceptable here because no other /// future on the main task needs to run while we wait for the next command. async fn run_batch() -> Result<(), Error> { let stdin = io::stdin(); let stdout = io::stdout(); for line in stdin.lock().lines() { let line = line.map_err(|e| Error::InvalidArgument { name: "stdin".to_owned(), detail: e.to_string(), })?; let parts: Vec<&str> = line.split_ascii_whitespace().collect(); if parts.is_empty() { // Empty / whitespace-only line: log an empty-EOR-bracketed // result and continue so accidental blank lines from the // PowerShell e2e harness do not silently end the session. println!("{BATCH_EOR}"); stdout.lock().flush().ok(); continue; } // Re-parse the split arguments under a fresh Cli, prepending the // program-name placeholder so clap sees a complete argv[]. let parse_result = Cli::try_parse_from(std::iter::once("mxgw-cli").chain(parts.iter().copied())); let outcome: Result<(), Error> = match parse_result { Ok(cli) => { // Spawn on a new tokio task so each command runs with a fresh // stack, avoiding stack overflow from the large dispatch future. 
tokio::task::spawn(dispatch(cli.command)) .await .unwrap_or_else(|join_err| { Err(Error::InvalidArgument { name: "task".to_owned(), detail: join_err.to_string(), }) }) } Err(clap_err) => Err(Error::InvalidArgument { name: "args".to_owned(), detail: clap_err.to_string(), }), }; if let Err(err) = outcome { // Write error as JSON to stdout so the harness sees it in the // same stream as normal output, framed by the EOR sentinel. println!( "{}", serde_json::json!({ "error": err.to_string(), "type": "error" }) ); } println!("{BATCH_EOR}"); stdout.lock().flush().ok(); } Ok(()) } fn print_version(use_json: bool) { if use_json { println!("{}", version_json()); return; } println!("mxgw {CLIENT_VERSION}"); println!("gateway protocol {GATEWAY_PROTOCOL_VERSION}"); println!("worker protocol {WORKER_PROTOCOL_VERSION}"); } fn version_json() -> Value { json!({ "clientVersion": CLIENT_VERSION, "gatewayProtocolVersion": GATEWAY_PROTOCOL_VERSION, "workerProtocolVersion": WORKER_PROTOCOL_VERSION, }) } fn print_command_reply( operation: &str, reply: &zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::MxCommandReply, use_json: bool, ) { if use_json { println!( "{}", json!({ "operation": operation, "sessionId": reply.session_id, "correlationId": reply.correlation_id, "kind": reply.kind, }) ); } else { println!("{operation} completed"); } } fn print_handle(name: &str, handle: i32, use_json: bool) { if use_json { println!("{}", json!({ name: handle })); } else { println!("{handle}"); } } fn print_ok(operation: &str, use_json: bool) { if use_json { println!("{}", json!({ "operation": operation, "ok": true })); } else { println!("{operation} completed"); } } fn print_bulk_results( operation: &str, results: &[zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::SubscribeResult], use_json: bool, ) { if use_json { let results_json: Vec<_> = results .iter() .map(|result| { json!({ "serverHandle": result.server_handle, "tagAddress": result.tag_address, "itemHandle": 
result.item_handle, "wasSuccessful": result.was_successful, "errorMessage": result.error_message, }) }) .collect(); println!( "{}", json!({ "operation": operation, "results": results_json }) ); } else { println!("{}", results.len()); } } fn parse_value(value_type: CliValueType, value: &str) -> Result { let parsed = match value_type { CliValueType::Bool => MxValue::bool(parse_cli_value(value)?), CliValueType::Int32 => MxValue::int32(parse_cli_value(value)?), CliValueType::Int64 => MxValue::int64(parse_cli_value(value)?), CliValueType::Float => MxValue::float(parse_cli_value(value)?), CliValueType::Double => MxValue::double(parse_cli_value(value)?), CliValueType::String => MxValue::string(value), }; Ok(parsed) } fn print_write_bulk_results( operation: &str, results: &[zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::BulkWriteResult], use_json: bool, ) { if use_json { let results_json: Vec<_> = results .iter() .map(|result| { json!({ "serverHandle": result.server_handle, "itemHandle": result.item_handle, "wasSuccessful": result.was_successful, "hresult": result.hresult, "errorMessage": result.error_message, }) }) .collect(); println!( "{}", json!({ "operation": operation, "results": results_json }) ); } else { println!("{}", results.len()); } } fn print_read_bulk_results( operation: &str, results: &[zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::BulkReadResult], use_json: bool, ) { if use_json { let results_json: Vec<_> = results .iter() .map(|result| { json!({ "serverHandle": result.server_handle, "tagAddress": result.tag_address, "itemHandle": result.item_handle, "wasSuccessful": result.was_successful, "wasCached": result.was_cached, "quality": result.quality, "errorMessage": result.error_message, }) }) .collect(); println!( "{}", json!({ "operation": operation, "results": results_json }) ); } else { println!("{}", results.len()); } } /// Drive the cross-language ReadBulk stress benchmark from Rust: opens its /// own session, subscribes to 
`bulk_size` tags so the worker's per-session /// value cache populates from real OnDataChange events, hammers ReadBulk in /// a tight loop for `duration_seconds` with per-call latency capture, and /// emits the shared JSON stats schema the `scripts/bench-read-bulk.ps1` /// driver collates across all five clients. #[allow(clippy::too_many_arguments)] async fn run_bench_read_bulk( connection: ConnectionArgs, client_name: String, duration_seconds: u64, warmup_seconds: u64, bulk_size: usize, tag_start: i32, tag_prefix: String, tag_attribute: String, timeout_ms: u32, json: bool, ) -> Result<(), Error> { if bulk_size == 0 { return Err(Error::InvalidArgument { name: "bulk-size".to_owned(), detail: "bulk-size must be positive".to_owned(), }); } if duration_seconds == 0 { return Err(Error::InvalidArgument { name: "duration-seconds".to_owned(), detail: "duration-seconds must be positive".to_owned(), }); } // Build TestMachine_NNN. tags with three-digit machine numbers // matching the existing cross-language tag-discovery convention. let tags: Vec = (0..bulk_size) .map(|index| { format!( "{prefix}{number:03}.{attr}", prefix = tag_prefix, number = tag_start + index as i32, attr = tag_attribute, ) }) .collect(); let endpoint = connection.endpoint.clone(); let client = connect(connection).await?; let session = client .open_session(OpenSessionRequest { client_session_name: client_name.clone(), ..OpenSessionRequest::default() }) .await?; let session_id = session.id().to_owned(); // Subscribe so the worker's MxAccessValueCache populates from real // OnDataChange events before the measurement window opens. Any per-tag // failures fall through silently; the bench is still meaningful for the // successfully-subscribed subset. 
let bench_outcome = async { let server_handle = session.register(&client_name).await?; let subscribe_results = session.subscribe_bulk(server_handle, tags.clone()).await?; let tags_ref: &[String] = &tags; let item_handles: Vec = subscribe_results .iter() .filter(|r| r.was_successful) .map(|r| r.item_handle) .collect(); let timeout_ms_param = timeout_ms; // Warm-up: drive identical calls so any connection-pool / channel // setup is amortised before the measurement window opens. let warmup_deadline = Instant::now() + Duration::from_secs(warmup_seconds); while Instant::now() < warmup_deadline { let _ = session .read_bulk(server_handle, tags_ref, timeout_ms_param) .await; } // Steady-state measurement window: capture per-call latency as // sub-millisecond f64 deltas from Instant::now() so the histogram // resolution matches the .NET Stopwatch / Go time.Now path. let mut latencies_ms: Vec = Vec::with_capacity(65_536); let mut total_read_results: i64 = 0; let mut cached_read_results: i64 = 0; let mut successful_calls: u64 = 0; let mut failed_calls: u64 = 0; let steady_start = Instant::now(); let steady_deadline = steady_start + Duration::from_secs(duration_seconds); while Instant::now() < steady_deadline { let call_start = Instant::now(); let result = session .read_bulk(server_handle, tags_ref, timeout_ms_param) .await; let elapsed = call_start.elapsed(); latencies_ms.push(elapsed.as_secs_f64() * 1000.0); match result { Ok(results) => { successful_calls += 1; for r in &results { total_read_results += 1; if r.was_cached { cached_read_results += 1; } } } Err(_) => failed_calls += 1, } } let steady_elapsed = steady_start.elapsed(); // Best-effort cleanup: unsubscribe so the worker can release cache slots. 
if !item_handles.is_empty() { let _ = session.unsubscribe_bulk(server_handle, item_handles).await; } let total_calls = successful_calls + failed_calls; let calls_per_second = if steady_elapsed.as_secs_f64() > 0.0 { total_calls as f64 / steady_elapsed.as_secs_f64() } else { 0.0 }; Ok::<_, Error>(BenchStats { endpoint, client_name, bulk_size, duration_seconds, warmup_seconds, duration_ms: steady_elapsed.as_millis() as u64, tags: tags.clone(), total_calls, successful_calls, failed_calls, total_read_results, cached_read_results, calls_per_second, latencies_ms, }) } .await; // Always close the session, even if the bench loop returned an error. let close_result = client .close_session_raw(CloseSessionRequest { session_id: session_id.clone(), client_correlation_id: next_correlation_id("cli-bench-read-bulk-close"), }) .await; let stats = bench_outcome?; // Closing the session is best-effort; never let it mask a real bench error. let _ = close_result; if json { let latency = percentile_summary(&stats.latencies_ms); let payload = json!({ "language": "rust", "command": "bench-read-bulk", "endpoint": stats.endpoint, "clientName": stats.client_name, "bulkSize": stats.bulk_size, "durationSeconds": stats.duration_seconds, "warmupSeconds": stats.warmup_seconds, "durationMs": stats.duration_ms, "tags": stats.tags, "totalCalls": stats.total_calls, "successfulCalls": stats.successful_calls, "failedCalls": stats.failed_calls, "totalReadResults": stats.total_read_results, "cachedReadResults": stats.cached_read_results, "callsPerSecond": round_to(stats.calls_per_second, 2), "latencyMs": { "p50": round_to(latency.p50, 3), "p95": round_to(latency.p95, 3), "p99": round_to(latency.p99, 3), "max": round_to(latency.max, 3), "mean": round_to(latency.mean, 3), }, }); println!("{payload}"); } else { println!("{}", stats.calls_per_second); } Ok(()) } /// Collected bench-read-bulk measurements; carried in one struct so the /// async block can finish cleanup (unsubscribe, close-session) before the 
/// caller renders the JSON / plain output.
struct BenchStats {
    endpoint: String,
    client_name: String,
    bulk_size: usize,
    duration_seconds: u64,
    warmup_seconds: u64,
    // Measured length of the steady-state window, in milliseconds.
    duration_ms: u64,
    tags: Vec<String>,
    total_calls: u64,
    successful_calls: u64,
    failed_calls: u64,
    total_read_results: i64,
    cached_read_results: i64,
    calls_per_second: f64,
    // One entry per measured call (successful or failed), in milliseconds.
    latencies_ms: Vec<f64>,
}

/// The same `{ p50, p95, p99, max, mean }` shape every language bench emits.
/// `p50`/`p95`/`p99` use nearest-rank with linear interpolation, matching the
/// .NET / Go implementations so cross-language comparisons are apples-to-apples.
struct LatencySummary {
    p50: f64,
    p95: f64,
    p99: f64,
    max: f64,
    mean: f64,
}

/// Summarise `sample` (any order) into percentiles, maximum and mean.
///
/// An empty sample yields an all-zero summary. Sorting uses
/// [`f64::total_cmp`], a genuine total order, so a stray NaN cannot hand the
/// sort an inconsistent comparator.
fn percentile_summary(sample: &[f64]) -> LatencySummary {
    if sample.is_empty() {
        return LatencySummary {
            p50: 0.0,
            p95: 0.0,
            p99: 0.0,
            max: 0.0,
            mean: 0.0,
        };
    }

    let mut sorted: Vec<f64> = sample.to_vec();
    sorted.sort_unstable_by(|a, b| a.total_cmp(b));

    let max = sorted[sorted.len() - 1];
    let mean = sample.iter().copied().sum::<f64>() / sample.len() as f64;
    LatencySummary {
        p50: percentile(&sorted, 0.50),
        p95: percentile(&sorted, 0.95),
        p99: percentile(&sorted, 0.99),
        max,
        mean,
    }
}

/// Linearly-interpolated percentile of an ascending-sorted slice.
///
/// `quantile` is clamped to `[0.0, 1.0]`: values above 1 used to index past
/// the end of the slice and negative values extrapolated below the minimum.
/// Returns `0.0` for an empty slice and the sole element for a single-value
/// slice.
fn percentile(sorted: &[f64], quantile: f64) -> f64 {
    let last_index = match sorted.len() {
        0 => return 0.0,
        1 => return sorted[0],
        len => len - 1,
    };

    let rank = quantile.clamp(0.0, 1.0) * last_index as f64;
    let lower = rank.floor() as usize;
    if lower >= last_index {
        return sorted[last_index];
    }
    let fraction = rank - lower as f64;
    sorted[lower] + (sorted[lower + 1] - sorted[lower]) * fraction
}

/// Round `value` to `digits` decimal places (half away from zero).
fn round_to(value: f64, digits: u32) -> f64 {
    let shift = 10f64.powi(digits as i32);
    (value * shift).round() / shift
}

/// Pairs each parsed item handle with its parsed MxValue (proto form) so a
/// single helper can build the four bulk-write families without each branch
/// repeating the length check and per-value parsing.
fn build_write_bulk_entries( item_handles: &[i32], value_type: CliValueType, values: &[String], ) -> Result< Vec<( i32, zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::MxValue, )>, Error, > { if item_handles.len() != values.len() { return Err(Error::InvalidArgument { name: "values".to_owned(), detail: format!( "item-handles count ({}) does not match values count ({})", item_handles.len(), values.len() ), }); } item_handles .iter() .zip(values.iter()) .map(|(handle, value)| { parse_value(value_type, value).map(|wrapper| (*handle, wrapper.into_proto())) }) .collect() } fn print_deploy_event(event: &DeployEvent, use_json: bool) { if use_json { println!( "{}", json!({ "sequence": event.sequence, "observedAt": event.observed_at.as_ref().map(|ts| json!({ "seconds": ts.seconds, "nanos": ts.nanos, })), "timeOfLastDeploy": event.time_of_last_deploy.as_ref().map(|ts| json!({ "seconds": ts.seconds, "nanos": ts.nanos, })), "timeOfLastDeployPresent": event.time_of_last_deploy_present, "objectCount": event.object_count, "attributeCount": event.attribute_count, }) ); } else { let observed = event .observed_at .as_ref() .map(|ts| format!("{}.{:09}", ts.seconds, ts.nanos)) .unwrap_or_else(|| "(absent)".to_owned()); let last_deploy = if event.time_of_last_deploy_present { event .time_of_last_deploy .as_ref() .map(|ts| format!("{}.{:09}", ts.seconds, ts.nanos)) .unwrap_or_else(|| "(absent)".to_owned()) } else { "(absent)".to_owned() }; println!( "seq={} observed={} lastDeploy={} objects={} attributes={}", event.sequence, observed, last_deploy, event.object_count, event.attribute_count, ); } } /// Render a streamed [`MxEvent`] as a JSON object. The scalar value is /// projected into protojson-style `*Value` keys so the cross-language e2e /// matrix can extract and compare event values uniformly across all five /// client CLIs. fn event_to_json(event: &MxEvent) -> Value { // Match the other four CLIs by rendering the family as its protobuf enum // name (e.g. 
MX_EVENT_FAMILY_ON_WRITE_COMPLETE). The e2e write round-trip // looks up this name to confirm the OnWriteComplete echo arrived; emitting // the raw i32 leaves it unable to recognise any event. let family = MxEventFamily::try_from(event.family) .map(|f| f.as_str_name()) .unwrap_or("MX_EVENT_FAMILY_UNSPECIFIED"); json!({ "family": family, "sessionId": event.session_id, "serverHandle": event.server_handle, "itemHandle": event.item_handle, "quality": event.quality, "workerSequence": event.worker_sequence, "value": event.value.as_ref().map(event_value_to_json), }) } /// Project an [`MxValue`] into a protojson-shaped JSON object whose single /// key names the scalar kind (`int32Value`, `stringValue`, ...), matching /// the protobuf-JSON the .NET/Go/Java CLIs emit. fn event_value_to_json(value: &ProtoMxValue) -> Value { match MxValue::from_proto(value.clone()).projection() { MxValueProjection::Bool(inner) => json!({ "boolValue": inner }), MxValueProjection::Int32(inner) => json!({ "int32Value": inner }), // protojson renders 64-bit integers as strings; mirror that here. MxValueProjection::Int64(inner) => json!({ "int64Value": inner.to_string() }), MxValueProjection::Float(inner) => json!({ "floatValue": inner }), MxValueProjection::Double(inner) => json!({ "doubleValue": inner }), MxValueProjection::String(inner) => json!({ "stringValue": inner }), MxValueProjection::Timestamp(ts) => { json!({ "timestampValue": { "seconds": ts.seconds, "nanos": ts.nanos } }) } MxValueProjection::Array(_) => json!({ "arrayValue": {} }), MxValueProjection::Raw(bytes) => json!({ "rawValue": { "byteCount": bytes.len() } }), MxValueProjection::Null => json!({ "isNull": true }), MxValueProjection::Unset => Value::Null, } } /// Render a streamed [`AlarmFeedMessage`] as a terse one-line summary that /// distinguishes the three `payload` oneof cases. 
fn alarm_feed_message_summary(message: &AlarmFeedMessage) -> String { match &message.payload { Some(alarm_feed_message::Payload::ActiveAlarm(snapshot)) => { format!( "active-alarm {} state={}", snapshot.alarm_full_reference, AlarmEnumName::condition_state(snapshot.current_state) ) } Some(alarm_feed_message::Payload::SnapshotComplete(complete)) => { format!("snapshot-complete {complete}") } Some(alarm_feed_message::Payload::Transition(transition)) => { format!( "transition {} kind={}", transition.alarm_full_reference, AlarmEnumName::transition_kind(transition.transition_kind) ) } None => "(empty)".to_owned(), } } /// Render a streamed [`AlarmFeedMessage`] as a JSON object whose single /// top-level key names the active `payload` oneof case, mirroring the /// protobuf-JSON the .NET/Go/Java/Python CLIs emit. fn alarm_feed_message_to_json(message: &AlarmFeedMessage) -> Value { match &message.payload { Some(alarm_feed_message::Payload::ActiveAlarm(snapshot)) => json!({ "activeAlarm": { "alarmFullReference": snapshot.alarm_full_reference, "sourceObjectReference": snapshot.source_object_reference, "alarmTypeName": snapshot.alarm_type_name, "severity": snapshot.severity, "currentState": AlarmEnumName::condition_state(snapshot.current_state), "category": snapshot.category, "description": snapshot.description, "operatorUser": snapshot.operator_user, "operatorComment": snapshot.operator_comment, } }), Some(alarm_feed_message::Payload::SnapshotComplete(complete)) => json!({ "snapshotComplete": complete, }), Some(alarm_feed_message::Payload::Transition(transition)) => json!({ "transition": { "alarmFullReference": transition.alarm_full_reference, "sourceObjectReference": transition.source_object_reference, "alarmTypeName": transition.alarm_type_name, "transitionKind": AlarmEnumName::transition_kind(transition.transition_kind), "severity": transition.severity, "operatorUser": transition.operator_user, "operatorComment": transition.operator_comment, "category": 
transition.category, "description": transition.description, } }), None => Value::Null, } } /// Tiny namespace for alarm-enum name lookups used by the alarm-feed /// renderers; keeps the proto-enum imports off the `main.rs` top level. struct AlarmEnumName; impl AlarmEnumName { fn condition_state(value: i32) -> String { use zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::AlarmConditionState; AlarmConditionState::try_from(value) .map(|state| state.as_str_name().to_owned()) .unwrap_or_else(|_| value.to_string()) } fn transition_kind(value: i32) -> String { use zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::AlarmTransitionKind; AlarmTransitionKind::try_from(value) .map(|kind| kind.as_str_name().to_owned()) .unwrap_or_else(|_| value.to_string()) } } /// Render an [`AcknowledgeAlarmReply`] as a terse line or a JSON document. fn print_acknowledge_alarm_reply( reply: &zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::AcknowledgeAlarmReply, use_json: bool, ) { if use_json { println!( "{}", json!({ "operation": "acknowledge-alarm", "correlationId": reply.correlation_id, "protocolStatus": reply.protocol_status.as_ref().map(|status| json!({ "code": status.code, "message": status.message, })), "hresult": reply.hresult, "diagnosticMessage": reply.diagnostic_message, }) ); } else { println!("acknowledge-alarm completed"); } } /// Parse a small but practically-complete subset of RFC3339: /// `YYYY-MM-DDTHH:MM:SS[.fffffffff][Z|+HH:MM|-HH:MM]`. Returns the /// corresponding `prost_types::Timestamp` (Unix seconds + nanoseconds). 
fn parse_rfc3339_timestamp(input: &str) -> Result { fn invalid(detail: impl Into) -> Error { Error::InvalidArgument { name: "last-seen-deploy-time".to_owned(), detail: detail.into(), } } let bytes = input.as_bytes(); if bytes.len() < 20 || (bytes[10] != b'T' && bytes[10] != b't') { return Err(invalid(format!( "expected RFC3339 timestamp like 2026-04-28T15:30:00Z, got {input:?}" ))); } let read_u32 = |start: usize, len: usize| -> Result { std::str::from_utf8(&bytes[start..start + len]) .ok() .and_then(|slice| slice.parse::().ok()) .ok_or_else(|| invalid(format!("non-numeric digits at byte {start}"))) }; let year = read_u32(0, 4)? as i32; if bytes[4] != b'-' { return Err(invalid("expected '-' after year")); } let month = read_u32(5, 2)?; if bytes[7] != b'-' { return Err(invalid("expected '-' after month")); } let day = read_u32(8, 2)?; let hour = read_u32(11, 2)?; if bytes[13] != b':' { return Err(invalid("expected ':' after hour")); } let minute = read_u32(14, 2)?; if bytes[16] != b':' { return Err(invalid("expected ':' after minute")); } let second = read_u32(17, 2)?; let mut cursor = 19usize; let mut nanos: u32 = 0; if cursor < bytes.len() && bytes[cursor] == b'.' 
{ cursor += 1; let frac_start = cursor; while cursor < bytes.len() && bytes[cursor].is_ascii_digit() { cursor += 1; } let frac_len = cursor - frac_start; if frac_len == 0 { return Err(invalid("expected fractional digits after '.'")); } let take = frac_len.min(9); let frac = std::str::from_utf8(&bytes[frac_start..frac_start + take]) .ok() .and_then(|slice| slice.parse::().ok()) .ok_or_else(|| invalid("invalid fractional digits"))?; nanos = frac * 10u32.pow(9u32.saturating_sub(take as u32)); } let mut offset_seconds: i64 = 0; if cursor >= bytes.len() { return Err(invalid("missing timezone designator (Z or +HH:MM)")); } match bytes[cursor] { b'Z' | b'z' => cursor += 1, sign @ (b'+' | b'-') => { cursor += 1; if cursor + 5 > bytes.len() { return Err(invalid("offset must be ±HH:MM")); } let oh = std::str::from_utf8(&bytes[cursor..cursor + 2]) .ok() .and_then(|slice| slice.parse::().ok()) .ok_or_else(|| invalid("invalid offset hour"))?; if bytes[cursor + 2] != b':' { return Err(invalid("offset must contain ':' between HH and MM")); } let om = std::str::from_utf8(&bytes[cursor + 3..cursor + 5]) .ok() .and_then(|slice| slice.parse::().ok()) .ok_or_else(|| invalid("invalid offset minute"))?; cursor += 5; let signed = if sign == b'-' { -1 } else { 1 }; offset_seconds = signed * (oh * 3600 + om * 60); } other => { return Err(invalid(format!( "unexpected timezone designator byte {other:?}" ))); } } if cursor != bytes.len() { return Err(invalid("trailing characters after timezone")); } let unix = ymdhms_to_unix(year, month, day, hour, minute, second)?; let seconds = unix - offset_seconds; Ok(prost_types::Timestamp { seconds, nanos: nanos as i32, }) } fn ymdhms_to_unix( year: i32, month: u32, day: u32, hour: u32, minute: u32, second: u32, ) -> Result { if !(1..=12).contains(&month) || day < 1 || hour > 23 || minute > 59 || second > 60 { return Err(Error::InvalidArgument { name: "last-seen-deploy-time".to_owned(), detail: "calendar component out of range".to_owned(), }); } fn 
is_leap(year: i32) -> bool { (year % 4 == 0 && year % 100 != 0) || year % 400 == 0 } const DAYS: [u32; 12] = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]; let mut max = DAYS[(month - 1) as usize]; if month == 2 && is_leap(year) { max = 29; } if day > max { return Err(Error::InvalidArgument { name: "last-seen-deploy-time".to_owned(), detail: format!("day {day} out of range for month {month}/{year}"), }); } // Days from 1970-01-01 to year-month-day. let mut total_days: i64 = 0; if year >= 1970 { for y in 1970..year { total_days += if is_leap(y) { 366 } else { 365 }; } } else { for y in year..1970 { total_days -= if is_leap(y) { 366 } else { 365 }; } } for m in 1..month { let mut dim = DAYS[(m - 1) as usize]; if m == 2 && is_leap(year) { dim = 29; } total_days += dim as i64; } total_days += (day - 1) as i64; Ok(total_days * 86_400 + hour as i64 * 3600 + minute as i64 * 60 + second as i64) } fn parse_cli_value(value: &str) -> Result where T: std::str::FromStr, T::Err: std::fmt::Display, { value.parse::().map_err(|source| Error::InvalidArgument { name: "value".to_owned(), detail: source.to_string(), }) } #[cfg(test)] mod tests { use clap::Parser; use super::Cli; #[test] fn parses_version_json_command() { let parsed = Cli::try_parse_from(["mxgw", "version", "--json"]); assert!(parsed.is_ok()); } #[test] fn parses_write_command() { let parsed = Cli::try_parse_from([ "mxgw", "write", "--session-id", "session-1", "--server-handle", "12", "--item-handle", "34", "--value-type", "int32", "--value", "123", ]); assert!(parsed.is_ok()); } #[test] fn version_json_output_has_protocol_versions() { let value = super::version_json(); assert_eq!(value["gatewayProtocolVersion"], 3); assert_eq!(value["workerProtocolVersion"], 1); } #[test] fn parses_stream_alarms_command() { let parsed = Cli::try_parse_from([ "mxgw", "stream-alarms", "--filter-prefix", "Tank01", "--max-events", "3", "--json", ]); assert!(parsed.is_ok(), "parse failed: {parsed:?}"); } #[test] fn 
parses_stream_alarms_command_without_filter_prefix() { let parsed = Cli::try_parse_from(["mxgw", "stream-alarms"]); assert!(parsed.is_ok(), "parse failed: {parsed:?}"); } #[test] fn parses_acknowledge_alarm_command() { let parsed = Cli::try_parse_from([ "mxgw", "acknowledge-alarm", "--reference", "Tank01.Level.HiHi", "--comment", "ack from cli", "--operator", "operator1", ]); assert!(parsed.is_ok(), "parse failed: {parsed:?}"); } #[test] fn acknowledge_alarm_requires_reference() { let parsed = Cli::try_parse_from(["mxgw", "acknowledge-alarm"]); assert!(parsed.is_err()); } #[test] fn parses_galaxy_watch_command_with_last_seen_and_max_events() { let parsed = Cli::try_parse_from([ "mxgw", "galaxy", "watch", "--last-seen-deploy-time", "2026-04-28T15:30:00Z", "--max-events", "5", "--json", ]); assert!(parsed.is_ok(), "parse failed: {parsed:?}"); } #[test] fn parses_galaxy_watch_deploy_events_alias() { let parsed = Cli::try_parse_from(["mxgw", "galaxy", "watch-deploy-events"]); assert!(parsed.is_ok(), "parse failed: {parsed:?}"); } #[test] fn parses_batch_command() { let parsed = Cli::try_parse_from(["mxgw", "batch"]); assert!(parsed.is_ok(), "parse failed: {parsed:?}"); } #[test] fn batch_eor_marker_is_stable() { assert_eq!(super::BATCH_EOR, "__MXGW_BATCH_EOR__"); } #[test] fn bench_percentile_summary_matches_hand_built_sample() { // Hand-built sample with 5 values: 1, 2, 3, 4, 5. 
let sample: Vec = vec![1.0, 2.0, 3.0, 4.0, 5.0]; let summary = super::percentile_summary(&sample); assert_eq!(summary.max, 5.0); // Mean = 15/5 = 3.0 assert!((summary.mean - 3.0).abs() < f64::EPSILON); // p50: rank = 0.5 * 4 = 2 -> sorted[2] = 3.0 assert!((summary.p50 - 3.0).abs() < f64::EPSILON); // p95: rank = 0.95 * 4 = 3.8 -> 4.0 + 0.8 * (5.0 - 4.0) = 4.8 assert!((summary.p95 - 4.8).abs() < f64::EPSILON); // p99: rank = 0.99 * 4 = 3.96 -> 4.0 + 0.96 * 1.0 = 4.96 assert!((summary.p99 - 4.96).abs() < f64::EPSILON); } #[test] fn bench_percentile_summary_handles_empty_sample() { let summary = super::percentile_summary(&[]); assert_eq!(summary.p50, 0.0); assert_eq!(summary.p95, 0.0); assert_eq!(summary.p99, 0.0); assert_eq!(summary.max, 0.0); assert_eq!(summary.mean, 0.0); } #[test] fn bench_percentile_summary_handles_single_value_sample() { let summary = super::percentile_summary(&[42.0]); assert_eq!(summary.p50, 42.0); assert_eq!(summary.p95, 42.0); assert_eq!(summary.p99, 42.0); assert_eq!(summary.max, 42.0); assert_eq!(summary.mean, 42.0); } #[test] fn rfc3339_parser_round_trips_z_and_offset_inputs() { // 2026-04-28T15:30:00Z = 1_777_995_000 (sanity-checked once below) let utc = super::parse_rfc3339_timestamp("2026-04-28T15:30:00Z").unwrap(); let plus = super::parse_rfc3339_timestamp("2026-04-28T16:30:00+01:00").unwrap(); let frac = super::parse_rfc3339_timestamp("2026-04-28T15:30:00.250Z").unwrap(); assert_eq!(utc.seconds, plus.seconds); assert_eq!(utc.nanos, 0); assert_eq!(frac.seconds, utc.seconds); assert_eq!(frac.nanos, 250_000_000); } }