From 7de4efeb023969fc73ab871c4e5165a968dea4e0 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Sun, 24 May 2026 06:45:46 -0400 Subject: [PATCH] Rust client: port stream-alarms and acknowledge-alarm + fix stream-events family + 8MB Windows stack Adds the session-less alarm CLI subcommands to mxgw. stream-alarms attaches to the gateway's central alarm feed (--filter-prefix, --max-events, --json/--jsonl; aggregate shape `{messageCount, messages: [...]}`); acknowledge-alarm is a unary ack (--reference required, --comment, --operator). stream_alarms joins query_active_alarms on GatewayClient and re-exports AlarmFeedStream. Also extends stream-events JSON to emit a full `events` array (itemHandle, value projected to protojson-shaped `*Value` keys, etc.) instead of just `eventCount`, matching the other four CLIs, and renders MxEvent.family as the protobuf enum NAME (MX_EVENT_FAMILY_ON_WRITE_COMPLETE) rather than the raw i32 so the e2e write round-trip can recognise the OnWriteComplete echo. Adds clients/rust/.cargo/config.toml bumping the Windows main-thread stack to 8 MB via /STACK:8388608. clap-derive's Command enum (one variant per subcommand) overflowed the default 1 MB stack in debug builds after the new variants landed; release builds were unaffected but the e2e matrix runs Rust via `cargo run` (debug). Co-Authored-By: Claude Opus 4.7 (1M context) --- clients/rust/.cargo/config.toml | 9 + clients/rust/crates/mxgw-cli/src/main.rs | 332 +++++++++++++++++++++-- clients/rust/src/client.rs | 46 +++- clients/rust/src/lib.rs | 2 +- clients/rust/tests/client_behavior.rs | 16 +- clients/rust/tests/proto_fixtures.rs | 2 +- 6 files changed, 368 insertions(+), 39 deletions(-) create mode 100644 clients/rust/.cargo/config.toml diff --git a/clients/rust/.cargo/config.toml b/clients/rust/.cargo/config.toml new file mode 100644 index 0000000..d60668a --- /dev/null +++ b/clients/rust/.cargo/config.toml @@ -0,0 +1,9 @@ +[target.'cfg(windows)'] +# Bump the default 1 MB Windows stack to 8 MB. clap-derive builds a large +# Command enum in this CLI (one variant per subcommand, each carrying flag +# args); in debug builds the enum is materialized on the stack without +# optimization and overflows the default Windows main-thread stack before +# even reaching our code. Release builds are unaffected but the e2e matrix +# drives the CLI through `cargo run` (debug), so the link-arg ships with +# every dev-time invocation. +rustflags = ["-C", "link-arg=/STACK:8388608"] diff --git a/clients/rust/crates/mxgw-cli/src/main.rs b/clients/rust/crates/mxgw-cli/src/main.rs index ae30066..1affe00 100644 --- a/clients/rust/crates/mxgw-cli/src/main.rs +++ b/clients/rust/crates/mxgw-cli/src/main.rs @@ -16,18 +16,19 @@ use std::time::{Duration, Instant}; use clap::{Args, Parser, Subcommand, ValueEnum}; use futures_util::StreamExt; -use zb_mom_ww_mxgateway_client::generated::galaxy_repository::v1::DeployEvent; -use zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::{ - CloseSessionRequest, MxCommand, MxCommandKind, MxCommandRequest, MxValue as ProtoMxValue, - OpenSessionRequest, PingCommand, StreamEventsRequest, Write2BulkEntry, WriteBulkEntry, - WriteSecured2BulkEntry, WriteSecuredBulkEntry, -}; -use zb_mom_ww_mxgateway_client::{ - ApiKey, ClientOptions, Error, GalaxyClient, GatewayClient, MxValue, CLIENT_VERSION, - GATEWAY_PROTOCOL_VERSION, WORKER_PROTOCOL_VERSION, -}; use serde_json::json; use serde_json::Value; +use zb_mom_ww_mxgateway_client::generated::galaxy_repository::v1::DeployEvent; +use zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::{ + alarm_feed_message, AcknowledgeAlarmRequest, AlarmFeedMessage, CloseSessionRequest, MxCommand, + MxCommandKind, MxCommandRequest, MxEvent, MxEventFamily, MxValue as ProtoMxValue, + OpenSessionRequest, PingCommand, StreamAlarmsRequest, StreamEventsRequest, Write2BulkEntry, + WriteBulkEntry, WriteSecured2BulkEntry, WriteSecuredBulkEntry, +}; +use zb_mom_ww_mxgateway_client::{ + ApiKey, ClientOptions, Error, GalaxyClient, GatewayClient, MxValue, MxValueProjection, + CLIENT_VERSION, GATEWAY_PROTOCOL_VERSION, WORKER_PROTOCOL_VERSION, +}; const MAX_AGGREGATE_EVENTS: usize = 10_000; @@ -274,6 +275,38 @@ enum Command { #[arg(long)] jsonl: bool, }, + /// Attach to the gateway's session-less central alarm feed. The stream + /// opens with one `active_alarm` per currently-active alarm, then a + /// single `snapshot_complete`, then a `transition` for every subsequent + /// raise / acknowledge / clear. + StreamAlarms { + #[command(flatten)] + connection: ConnectionArgs, + /// Optional alarm-reference prefix scoping the feed to an equipment + /// sub-tree. Omit to stream every active alarm. + #[arg(long)] + filter_prefix: Option, + #[arg(long, default_value_t = 1)] + max_events: usize, + #[arg(long)] + json: bool, + #[arg(long)] + jsonl: bool, + }, + /// Acknowledge an active MXAccess alarm condition through the gateway's + /// session-less AcknowledgeAlarm RPC. + AcknowledgeAlarm { + #[command(flatten)] + connection: ConnectionArgs, + #[arg(long)] + reference: String, + #[arg(long, default_value = "")] + comment: String, + #[arg(long, default_value = "")] + operator: String, + #[arg(long)] + json: bool, + }, Write { #[command(flatten)] connection: ConnectionArgs, @@ -760,7 +793,7 @@ async fn dispatch(command: Command) -> Result<(), Error> { after_worker_sequence, }) .await?; - let mut events = Vec::new(); + let mut events: Vec = Vec::new(); let mut event_count = 0usize; while event_count < max_events { let Some(event) = stream.next().await else { @@ -769,23 +802,81 @@ async fn dispatch(command: Command) -> Result<(), Error> { let event = event?; event_count += 1; if jsonl { - println!( - "{}", - json!({ - "workerSequence": event.worker_sequence, - "family": event.family, - }) - ); + println!("{}", event_to_json(&event)); } else if json { - events.push(event); + events.push(event_to_json(&event)); } else { println!("{} {}", event.worker_sequence, event.family); } } if json { - println!("{}", json!({ "eventCount": event_count })); + // `eventCount` is preserved for back-compat; `events` carries + // the per-event detail the cross-language e2e matrix compares. + println!("{}", json!({ "eventCount": event_count, "events": events })); } } + Command::StreamAlarms { + connection, + filter_prefix, + max_events, + json, + jsonl, + } => { + if max_events > MAX_AGGREGATE_EVENTS { + return Err(Error::InvalidArgument { + name: "max-events".to_owned(), + detail: format!("must be less than or equal to {MAX_AGGREGATE_EVENTS}"), + }); + } + + let client = connect(connection).await?; + let mut stream = client + .stream_alarms(StreamAlarmsRequest { + client_correlation_id: "rust-cli-stream-alarms".to_owned(), + alarm_filter_prefix: filter_prefix.unwrap_or_default(), + }) + .await?; + let mut messages: Vec = Vec::new(); + let mut message_count = 0usize; + while message_count < max_events { + let Some(message) = stream.next().await else { + break; + }; + let message = message?; + message_count += 1; + if jsonl { + println!("{}", alarm_feed_message_to_json(&message)); + } else if json { + messages.push(alarm_feed_message_to_json(&message)); + } else { + println!("{}", alarm_feed_message_summary(&message)); + } + } + if json { + println!( + "{}", + json!({ "messageCount": message_count, "messages": messages }) + ); + } + } + Command::AcknowledgeAlarm { + connection, + reference, + comment, + operator, + json, + } => { + let client = connect(connection).await?; + let reply = client + .acknowledge_alarm(AcknowledgeAlarmRequest { + client_correlation_id: "rust-cli-acknowledge-alarm".to_owned(), + alarm_full_reference: reference, + comment, + operator_user: operator, + }) + .await?; + print_acknowledge_alarm_reply(&reply, json); + } Command::Write { connection, session_id, @@ -1296,9 +1387,7 @@ async fn run_bench_read_bulk( // successfully-subscribed subset. let bench_outcome = async { let server_handle = session.register(&client_name).await?; - let subscribe_results = session - .subscribe_bulk(server_handle, tags.clone()) - .await?; + let subscribe_results = session.subscribe_bulk(server_handle, tags.clone()).await?; let item_handles: Vec = subscribe_results .iter() .filter(|r| r.was_successful) @@ -1351,9 +1440,7 @@ async fn run_bench_read_bulk( // Best-effort cleanup: unsubscribe so the worker can release cache slots. if !item_handles.is_empty() { - let _ = session - .unsubscribe_bulk(server_handle, item_handles) - .await; + let _ = session.unsubscribe_bulk(server_handle, item_handles).await; } let total_calls = successful_calls + failed_calls; @@ -1577,6 +1664,158 @@ fn print_deploy_event(event: &DeployEvent, use_json: bool) { } } +/// Render a streamed [`MxEvent`] as a JSON object. The scalar value is +/// projected into protojson-style `*Value` keys so the cross-language e2e +/// matrix can extract and compare event values uniformly across all five +/// client CLIs. +fn event_to_json(event: &MxEvent) -> Value { + // Match the other four CLIs by rendering the family as its protobuf enum + // name (e.g. MX_EVENT_FAMILY_ON_WRITE_COMPLETE). The e2e write round-trip + // looks up this name to confirm the OnWriteComplete echo arrived; emitting + // the raw i32 leaves it unable to recognise any event. + let family = MxEventFamily::try_from(event.family) + .map(|f| f.as_str_name()) + .unwrap_or("MX_EVENT_FAMILY_UNSPECIFIED"); + json!({ + "family": family, + "sessionId": event.session_id, + "serverHandle": event.server_handle, + "itemHandle": event.item_handle, + "quality": event.quality, + "workerSequence": event.worker_sequence, + "value": event.value.as_ref().map(event_value_to_json), + }) +} + +/// Project an [`MxValue`] into a protojson-shaped JSON object whose single +/// key names the scalar kind (`int32Value`, `stringValue`, ...), matching +/// the protobuf-JSON the .NET/Go/Java CLIs emit. +fn event_value_to_json(value: &ProtoMxValue) -> Value { + match MxValue::from_proto(value.clone()).projection() { + MxValueProjection::Bool(inner) => json!({ "boolValue": inner }), + MxValueProjection::Int32(inner) => json!({ "int32Value": inner }), + // protojson renders 64-bit integers as strings; mirror that here. + MxValueProjection::Int64(inner) => json!({ "int64Value": inner.to_string() }), + MxValueProjection::Float(inner) => json!({ "floatValue": inner }), + MxValueProjection::Double(inner) => json!({ "doubleValue": inner }), + MxValueProjection::String(inner) => json!({ "stringValue": inner }), + MxValueProjection::Timestamp(ts) => { + json!({ "timestampValue": { "seconds": ts.seconds, "nanos": ts.nanos } }) + } + MxValueProjection::Array(_) => json!({ "arrayValue": {} }), + MxValueProjection::Raw(bytes) => json!({ "rawValue": { "byteCount": bytes.len() } }), + MxValueProjection::Null => json!({ "isNull": true }), + MxValueProjection::Unset => Value::Null, + } +} + +/// Render a streamed [`AlarmFeedMessage`] as a terse one-line summary that +/// distinguishes the three `payload` oneof cases. +fn alarm_feed_message_summary(message: &AlarmFeedMessage) -> String { + match &message.payload { + Some(alarm_feed_message::Payload::ActiveAlarm(snapshot)) => { + format!( + "active-alarm {} state={}", + snapshot.alarm_full_reference, + AlarmEnumName::condition_state(snapshot.current_state) + ) + } + Some(alarm_feed_message::Payload::SnapshotComplete(complete)) => { + format!("snapshot-complete {complete}") + } + Some(alarm_feed_message::Payload::Transition(transition)) => { + format!( + "transition {} kind={}", + transition.alarm_full_reference, + AlarmEnumName::transition_kind(transition.transition_kind) + ) + } + None => "(empty)".to_owned(), + } +} + +/// Render a streamed [`AlarmFeedMessage`] as a JSON object whose single +/// top-level key names the active `payload` oneof case, mirroring the +/// protobuf-JSON the .NET/Go/Java/Python CLIs emit. +fn alarm_feed_message_to_json(message: &AlarmFeedMessage) -> Value { + match &message.payload { + Some(alarm_feed_message::Payload::ActiveAlarm(snapshot)) => json!({ + "activeAlarm": { + "alarmFullReference": snapshot.alarm_full_reference, + "sourceObjectReference": snapshot.source_object_reference, + "alarmTypeName": snapshot.alarm_type_name, + "severity": snapshot.severity, + "currentState": AlarmEnumName::condition_state(snapshot.current_state), + "category": snapshot.category, + "description": snapshot.description, + "operatorUser": snapshot.operator_user, + "operatorComment": snapshot.operator_comment, + } + }), + Some(alarm_feed_message::Payload::SnapshotComplete(complete)) => json!({ + "snapshotComplete": complete, + }), + Some(alarm_feed_message::Payload::Transition(transition)) => json!({ + "transition": { + "alarmFullReference": transition.alarm_full_reference, + "sourceObjectReference": transition.source_object_reference, + "alarmTypeName": transition.alarm_type_name, + "transitionKind": AlarmEnumName::transition_kind(transition.transition_kind), + "severity": transition.severity, + "operatorUser": transition.operator_user, + "operatorComment": transition.operator_comment, + "category": transition.category, + "description": transition.description, + } + }), + None => Value::Null, + } +} + +/// Tiny namespace for alarm-enum name lookups used by the alarm-feed +/// renderers; keeps the proto-enum imports off the `main.rs` top level. +struct AlarmEnumName; + +impl AlarmEnumName { + fn condition_state(value: i32) -> String { + use zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::AlarmConditionState; + AlarmConditionState::try_from(value) + .map(|state| state.as_str_name().to_owned()) + .unwrap_or_else(|_| value.to_string()) + } + + fn transition_kind(value: i32) -> String { + use zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::AlarmTransitionKind; + AlarmTransitionKind::try_from(value) + .map(|kind| kind.as_str_name().to_owned()) + .unwrap_or_else(|_| value.to_string()) + } +} + +/// Render an [`AcknowledgeAlarmReply`] as a terse line or a JSON document. +fn print_acknowledge_alarm_reply( + reply: &zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::AcknowledgeAlarmReply, + use_json: bool, +) { + if use_json { + println!( + "{}", + json!({ + "operation": "acknowledge-alarm", + "correlationId": reply.correlation_id, + "protocolStatus": reply.protocol_status.as_ref().map(|status| json!({ + "code": status.code, + "message": status.message, + })), + "hresult": reply.hresult, + "diagnosticMessage": reply.diagnostic_message, + }) + ); + } else { + println!("acknowledge-alarm completed"); + } +} + /// Parse a small but practically-complete subset of RFC3339: /// `YYYY-MM-DDTHH:MM:SS[.fffffffff][Z|+HH:MM|-HH:MM]`. Returns the /// corresponding `prost_types::Timestamp` (Unix seconds + nanoseconds). @@ -1788,6 +2027,47 @@ mod tests { assert_eq!(value["workerProtocolVersion"], 1); } + #[test] + fn parses_stream_alarms_command() { + let parsed = Cli::try_parse_from([ + "mxgw", + "stream-alarms", + "--filter-prefix", + "Tank01", + "--max-events", + "3", + "--json", + ]); + assert!(parsed.is_ok(), "parse failed: {parsed:?}"); + } + + #[test] + fn parses_stream_alarms_command_without_filter_prefix() { + let parsed = Cli::try_parse_from(["mxgw", "stream-alarms"]); + assert!(parsed.is_ok(), "parse failed: {parsed:?}"); + } + + #[test] + fn parses_acknowledge_alarm_command() { + let parsed = Cli::try_parse_from([ + "mxgw", + "acknowledge-alarm", + "--reference", + "Tank01.Level.HiHi", + "--comment", + "ack from cli", + "--operator", + "operator1", + ]); + assert!(parsed.is_ok(), "parse failed: {parsed:?}"); + } + + #[test] + fn acknowledge_alarm_requires_reference() { + let parsed = Cli::try_parse_from(["mxgw", "acknowledge-alarm"]); + assert!(parsed.is_err()); + } + #[test] fn parses_galaxy_watch_command_with_last_seen_and_max_events() { let parsed = Cli::try_parse_from([ diff --git a/clients/rust/src/client.rs b/clients/rust/src/client.rs index 8490284..1078262 100644 --- a/clients/rust/src/client.rs +++ b/clients/rust/src/client.rs @@ -16,9 +16,10 @@ use crate::auth::AuthInterceptor; use crate::error::{ensure_command_success, ensure_protocol_success, Error}; use crate::generated::mxaccess_gateway::v1::mx_access_gateway_client::MxAccessGatewayClient; use crate::generated::mxaccess_gateway::v1::{ - AcknowledgeAlarmReply, AcknowledgeAlarmRequest, ActiveAlarmSnapshot, CloseSessionReply, - CloseSessionRequest, MxCommandReply, MxCommandRequest, MxEvent, OpenSessionReply, - OpenSessionRequest, QueryActiveAlarmsRequest, StreamEventsRequest, + AcknowledgeAlarmReply, AcknowledgeAlarmRequest, ActiveAlarmSnapshot, AlarmFeedMessage, + CloseSessionReply, CloseSessionRequest, MxCommandReply, MxCommandRequest, MxEvent, + OpenSessionReply, OpenSessionRequest, QueryActiveAlarmsRequest, StreamAlarmsRequest, + StreamEventsRequest, }; use crate::options::ClientOptions; use crate::session::Session; @@ -40,6 +41,13 @@ pub type ActiveAlarmStream = std::pin::Pin< Box> + Send + 'static>, >; +/// Pinned, boxed [`AlarmFeedMessage`] stream returned by +/// [`GatewayClient::stream_alarms`]. Errors are pre-mapped from +/// `tonic::Status` to [`Error`]; dropping the stream cancels the call. +pub type AlarmFeedStream = std::pin::Pin< + Box> + Send + 'static>, +>; + /// Thin async wrapper around the generated gateway client. /// /// The wrapper is `Clone`: every clone shares the underlying tonic channel @@ -219,7 +227,9 @@ impl GatewayClient { request: AcknowledgeAlarmRequest, ) -> Result { let mut client = self.inner.clone(); - let response = client.acknowledge_alarm(self.unary_request(request)).await?; + let response = client + .acknowledge_alarm(self.unary_request(request)) + .await?; let reply = response.into_inner(); ensure_protocol_success("acknowledge alarm", reply.protocol_status.as_ref())?; Ok(reply) @@ -252,6 +262,34 @@ impl GatewayClient { Ok(Box::pin(stream)) } + /// Attach to the gateway's central `StreamAlarms` feed. + /// + /// The returned [`AlarmFeedStream`] opens with one [`AlarmFeedMessage`] + /// per currently-active alarm (the ConditionRefresh snapshot), then a + /// single `snapshot_complete`, then a `transition` for every subsequent + /// raise / acknowledge / clear. It is served by the gateway's always-on + /// alarm monitor — no worker session is opened — so any number of clients + /// may attach. Dropping the stream cancels the gRPC call cooperatively. + /// Optional alarm-reference prefix scoping (`request.alarm_filter_prefix`) + /// limits the stream to a sub-tree. + /// + /// # Errors + /// + /// Returns the `tonic::Status` mapped through [`Error::from`] if the + /// server rejects the request. + pub async fn stream_alarms( + &self, + request: StreamAlarmsRequest, + ) -> Result { + let mut client = self.inner.clone(); + let response = client.stream_alarms(self.stream_request(request)).await?; + let stream = futures_util::StreamExt::map(response.into_inner(), |result| { + result.map_err(Error::from) + }); + + Ok(Box::pin(stream)) + } + fn unary_request(&self, message: T) -> Request { let mut request = Request::new(message); request.set_timeout(self.call_timeout); diff --git a/clients/rust/src/lib.rs b/clients/rust/src/lib.rs index 9d3c9b7..a8f1f4d 100644 --- a/clients/rust/src/lib.rs +++ b/clients/rust/src/lib.rs @@ -24,7 +24,7 @@ pub mod version; #[doc(inline)] pub use auth::{ApiKey, AuthInterceptor}; #[doc(inline)] -pub use client::{EventStream, GatewayClient}; +pub use client::{AlarmFeedStream, EventStream, GatewayClient}; #[doc(inline)] pub use error::{CommandError, Error}; #[doc(inline)] diff --git a/clients/rust/tests/client_behavior.rs b/clients/rust/tests/client_behavior.rs index cef4ac2..cb1e546 100644 --- a/clients/rust/tests/client_behavior.rs +++ b/clients/rust/tests/client_behavior.rs @@ -8,6 +8,12 @@ use std::time::Duration; use futures_core::Stream; use futures_util::StreamExt; +use serde_json::Value; +use tokio::net::TcpListener; +use tokio::sync::{mpsc, Mutex}; +use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream}; +use tonic::transport::Server; +use tonic::{Request, Response, Status}; use zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::mx_access_gateway_server::{ MxAccessGateway, MxAccessGatewayServer, }; @@ -25,12 +31,6 @@ use zb_mom_ww_mxgateway_client::{ ApiKey, ClientOptions, CommandError, Error, GatewayClient, MxStatus, MxValue as ClientMxValue, MxValueProjection, }; -use serde_json::Value; -use tokio::net::TcpListener; -use tokio::sync::{mpsc, Mutex}; -use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream}; -use tonic::transport::Server; -use tonic::{Request, Response, Status}; #[tokio::test] async fn fake_server_receives_bearer_metadata_and_raw_client_is_reachable() { @@ -320,7 +320,9 @@ impl MxAccessGateway for FakeGateway { async fn invoke( &self, - request: Request, + request: Request< + zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::MxCommandRequest, + >, ) -> Result, Status> { let request = request.into_inner(); let kind = request diff --git a/clients/rust/tests/proto_fixtures.rs b/clients/rust/tests/proto_fixtures.rs index 0bd0c45..41b906f 100644 --- a/clients/rust/tests/proto_fixtures.rs +++ b/clients/rust/tests/proto_fixtures.rs @@ -1,12 +1,12 @@ use std::fs; use std::path::PathBuf; +use serde_json::Value; use zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::{ mx_command, mx_value, MxCommand, MxCommandKind, MxCommandRequest, MxDataType, MxEvent, MxEventFamily, MxValue, OpenSessionReply, ProtocolStatusCode, RegisterCommand, }; use zb_mom_ww_mxgateway_client::{GATEWAY_PROTOCOL_VERSION, WORKER_PROTOCOL_VERSION}; -use serde_json::Value; #[test] fn generated_golden_fixtures_are_available() {