use std::env; use std::path::PathBuf; use std::process::ExitCode; use std::time::Duration; use clap::{Args, Parser, Subcommand, ValueEnum}; use futures_util::StreamExt; use mxgateway_client::generated::mxaccess_gateway::v1::{ CloseSessionRequest, MxCommand, MxCommandKind, MxCommandRequest, OpenSessionRequest, PingCommand, StreamEventsRequest, }; use mxgateway_client::{ ApiKey, ClientOptions, Error, GatewayClient, MxValue, CLIENT_VERSION, GATEWAY_PROTOCOL_VERSION, WORKER_PROTOCOL_VERSION, }; use serde_json::json; use serde_json::Value; #[derive(Debug, Parser)] #[command(name = "mxgw")] #[command(about = "MXAccess Gateway Rust test CLI")] struct Cli { #[command(subcommand)] command: Command, } #[derive(Debug, Subcommand)] enum Command { Version { #[arg(long)] json: bool, }, Ping { #[command(flatten)] connection: ConnectionArgs, #[arg(long, default_value = "ping")] message: String, #[arg(long)] json: bool, }, OpenSession { #[command(flatten)] connection: ConnectionArgs, #[arg(long, default_value = "mxgw-rust-cli")] client_name: String, #[arg(long)] json: bool, }, CloseSession { #[command(flatten)] connection: ConnectionArgs, #[arg(long)] session_id: String, #[arg(long)] json: bool, }, Register { #[command(flatten)] connection: ConnectionArgs, #[arg(long)] session_id: String, #[arg(long, default_value = "mxgw-rust-cli")] client_name: String, #[arg(long)] json: bool, }, AddItem { #[command(flatten)] connection: ConnectionArgs, #[arg(long)] session_id: String, #[arg(long)] server_handle: i32, #[arg(long)] item: String, #[arg(long)] json: bool, }, Advise { #[command(flatten)] connection: ConnectionArgs, #[arg(long)] session_id: String, #[arg(long)] server_handle: i32, #[arg(long)] item_handle: i32, #[arg(long)] json: bool, }, StreamEvents { #[command(flatten)] connection: ConnectionArgs, #[arg(long)] session_id: String, #[arg(long, default_value_t = 0)] after_worker_sequence: u64, #[arg(long, default_value_t = 1)] max_events: usize, #[arg(long)] json: bool, }, Write { #[command(flatten)] connection: ConnectionArgs, #[arg(long)] session_id: String, #[arg(long)] server_handle: i32, #[arg(long)] item_handle: i32, #[arg(long, value_enum)] value_type: CliValueType, #[arg(long)] value: String, #[arg(long, default_value_t = 0)] user_id: i32, #[arg(long)] json: bool, }, Write2 { #[command(flatten)] connection: ConnectionArgs, #[arg(long)] session_id: String, #[arg(long)] server_handle: i32, #[arg(long)] item_handle: i32, #[arg(long, value_enum)] value_type: CliValueType, #[arg(long)] value: String, #[arg(long)] timestamp: String, #[arg(long, default_value_t = 0)] user_id: i32, #[arg(long)] json: bool, }, Smoke { #[command(flatten)] connection: ConnectionArgs, #[arg(long)] item: String, #[arg(long, default_value = "mxgw-rust-smoke")] client_name: String, #[arg(long)] json: bool, }, } #[derive(Debug, Args, Clone)] struct ConnectionArgs { #[arg(long, default_value = "http://127.0.0.1:5000")] endpoint: String, #[arg(long)] api_key: Option, #[arg(long, default_value = "MXGATEWAY_API_KEY")] api_key_env: String, #[arg(long)] plaintext: bool, #[arg(long)] tls: bool, #[arg(long)] ca_file: Option, #[arg(long)] server_name_override: Option, #[arg(long, default_value_t = 10)] connect_timeout_seconds: u64, #[arg(long, default_value_t = 30)] call_timeout_seconds: u64, } impl ConnectionArgs { fn options(&self) -> ClientOptions { let mut options = ClientOptions::new(self.endpoint.clone()) .with_plaintext(!self.tls || self.plaintext) .with_connect_timeout(Duration::from_secs(self.connect_timeout_seconds)) .with_call_timeout(Duration::from_secs(self.call_timeout_seconds)); if let Some(api_key) = self .api_key .clone() .or_else(|| env::var(&self.api_key_env).ok()) .filter(|value| !value.is_empty()) { options = options.with_api_key(ApiKey::new(api_key)); } if let Some(ca_file) = &self.ca_file { options = options.with_ca_file(ca_file); } if let Some(server_name_override) = &self.server_name_override { options = options.with_server_name_override(server_name_override); } options } } #[derive(Clone, Copy, Debug, ValueEnum)] enum CliValueType { Bool, Int32, Int64, Float, Double, String, } #[tokio::main] async fn main() -> ExitCode { let cli = Cli::parse(); match run(cli).await { Ok(()) => ExitCode::SUCCESS, Err(error) => { eprintln!("{error}"); ExitCode::FAILURE } } } async fn run(cli: Cli) -> Result<(), Error> { match cli.command { Command::Version { json } => print_version(json), Command::Ping { connection, message, json, } => { let client = connect(connection).await?; let reply = client .invoke(MxCommandRequest { client_correlation_id: "rust-cli-ping".to_owned(), command: Some(MxCommand { kind: MxCommandKind::Ping as i32, payload: Some(mxgateway_client::generated::mxaccess_gateway::v1::mx_command::Payload::Ping( PingCommand { message }, )), }), ..MxCommandRequest::default() }) .await?; print_command_reply("ping", &reply, json); } Command::OpenSession { connection, client_name, json, } => { let client = connect(connection).await?; let reply = client .open_session_raw(OpenSessionRequest { client_session_name: client_name, ..OpenSessionRequest::default() }) .await?; if json { println!( "{}", json!({ "sessionId": reply.session_id, "backendName": reply.backend_name, "gatewayProtocolVersion": reply.gateway_protocol_version, "workerProtocolVersion": reply.worker_protocol_version, }) ); } else { println!("{}", reply.session_id); } } Command::CloseSession { connection, session_id, json, } => { let client = connect(connection).await?; let reply = client .close_session_raw(CloseSessionRequest { session_id, client_correlation_id: "rust-cli-close-session".to_owned(), }) .await?; if json { println!("{}", json!({ "sessionId": reply.session_id })); } else { println!("closed {}", reply.session_id); } } Command::Register { connection, session_id, client_name, json, } => { let session = session_for(connection, session_id).await?; let server_handle = session.register(&client_name).await?; print_handle("serverHandle", server_handle, json); } Command::AddItem { connection, session_id, server_handle, item, json, } => { let session = session_for(connection, session_id).await?; let item_handle = session.add_item(server_handle, &item).await?; print_handle("itemHandle", item_handle, json); } Command::Advise { connection, session_id, server_handle, item_handle, json, } => { let session = session_for(connection, session_id).await?; session.advise(server_handle, item_handle).await?; print_ok("advise", json); } Command::StreamEvents { connection, session_id, after_worker_sequence, max_events, json, } => { let client = connect(connection).await?; let mut stream = client .stream_events(StreamEventsRequest { session_id, after_worker_sequence, }) .await?; let mut events = Vec::new(); while events.len() < max_events { let Some(event) = stream.next().await else { break; }; events.push(event?); } if json { println!("{}", json!({ "eventCount": events.len() })); } else { for event in events { println!("{} {}", event.worker_sequence, event.family); } } } Command::Write { connection, session_id, server_handle, item_handle, value_type, value, user_id, json, } => { let session = session_for(connection, session_id).await?; session .write( server_handle, item_handle, parse_value(value_type, &value)?, user_id, ) .await?; print_ok("write", json); } Command::Write2 { connection, session_id, server_handle, item_handle, value_type, value, timestamp, user_id, json, } => { let session = session_for(connection, session_id).await?; session .write2( server_handle, item_handle, parse_value(value_type, &value)?, MxValue::string(timestamp), user_id, ) .await?; print_ok("write2", json); } Command::Smoke { connection, item, client_name, json, } => { let client = connect(connection).await?; let session = client .open_session(OpenSessionRequest { client_session_name: client_name.clone(), ..OpenSessionRequest::default() }) .await?; let result = async { let server_handle = session.register(&client_name).await?; let item_handle = session.add_item(server_handle, &item).await?; session.advise(server_handle, item_handle).await?; Ok::<_, Error>((server_handle, item_handle)) } .await; let close_result = session.close().await; let (server_handle, item_handle) = result?; close_result?; if json { println!( "{}", json!({ "sessionId": session.id(), "serverHandle": server_handle, "itemHandle": item_handle, "closed": true, }) ); } else { println!( "session {} registered server {server_handle}, item {item_handle}, closed", session.id() ); } } } Ok(()) } async fn connect(connection: ConnectionArgs) -> Result { GatewayClient::connect(connection.options()).await } async fn session_for( connection: ConnectionArgs, session_id: String, ) -> Result { let client = connect(connection).await?; Ok(client.session(session_id)) } fn print_version(use_json: bool) { if use_json { println!("{}", version_json()); return; } println!("mxgw {CLIENT_VERSION}"); println!("gateway protocol {GATEWAY_PROTOCOL_VERSION}"); println!("worker protocol {WORKER_PROTOCOL_VERSION}"); } fn version_json() -> Value { json!({ "clientVersion": CLIENT_VERSION, "gatewayProtocolVersion": GATEWAY_PROTOCOL_VERSION, "workerProtocolVersion": WORKER_PROTOCOL_VERSION, }) } fn print_command_reply( operation: &str, reply: &mxgateway_client::generated::mxaccess_gateway::v1::MxCommandReply, use_json: bool, ) { if use_json { println!( "{}", json!({ "operation": operation, "sessionId": reply.session_id, "correlationId": reply.correlation_id, "kind": reply.kind, }) ); } else { println!("{operation} completed"); } } fn print_handle(name: &str, handle: i32, use_json: bool) { if use_json { println!("{}", json!({ name: handle })); } else { println!("{handle}"); } } fn print_ok(operation: &str, use_json: bool) { if use_json { println!("{}", json!({ "operation": operation, "ok": true })); } else { println!("{operation} completed"); } } fn parse_value(value_type: CliValueType, value: &str) -> Result { let parsed = match value_type { CliValueType::Bool => MxValue::bool(parse_cli_value(value)?), CliValueType::Int32 => MxValue::int32(parse_cli_value(value)?), CliValueType::Int64 => MxValue::int64(parse_cli_value(value)?), CliValueType::Float => MxValue::float(parse_cli_value(value)?), CliValueType::Double => MxValue::double(parse_cli_value(value)?), CliValueType::String => MxValue::string(value), }; Ok(parsed) } fn parse_cli_value(value: &str) -> Result where T: std::str::FromStr, T::Err: std::fmt::Display, { value.parse::().map_err(|source| Error::InvalidArgument { name: "value".to_owned(), detail: source.to_string(), }) } #[cfg(test)] mod tests { use clap::Parser; use super::Cli; #[test] fn parses_version_json_command() { let parsed = Cli::try_parse_from(["mxgw", "version", "--json"]); assert!(parsed.is_ok()); } #[test] fn parses_write_command() { let parsed = Cli::try_parse_from([ "mxgw", "write", "--session-id", "session-1", "--server-handle", "12", "--item-handle", "34", "--value-type", "int32", "--value", "123", ]); assert!(parsed.is_ok()); } #[test] fn version_json_output_has_protocol_versions() { let value = super::version_json(); assert_eq!(value["gatewayProtocolVersion"], 1); assert_eq!(value["workerProtocolVersion"], 1); } }