From 1d3c8edb444ed21f569784c3585d07c846238c27 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 21 May 2026 19:01:49 -0400 Subject: [PATCH] Add stream-alarms and acknowledge-alarm to the Rust CLI stream-alarms attaches to the gateway's central alarm feed (mirrors stream-events: --max-events cap, --json/--jsonl, --filter-prefix); acknowledge-alarm is a unary session-less ack (--reference required, --comment, --operator). Co-Authored-By: Claude Opus 4.7 (1M context) --- clients/rust/crates/mxgw-cli/src/main.rs | 251 ++++++++++++++++++++++- 1 file changed, 249 insertions(+), 2 deletions(-) diff --git a/clients/rust/crates/mxgw-cli/src/main.rs b/clients/rust/crates/mxgw-cli/src/main.rs index 429fe70..f510851 100644 --- a/clients/rust/crates/mxgw-cli/src/main.rs +++ b/clients/rust/crates/mxgw-cli/src/main.rs @@ -18,8 +18,9 @@ use clap::{Args, Parser, Subcommand, ValueEnum}; use futures_util::StreamExt; use mxgateway_client::generated::galaxy_repository::v1::DeployEvent; use mxgateway_client::generated::mxaccess_gateway::v1::{ - CloseSessionRequest, MxCommand, MxCommandKind, MxCommandRequest, MxEvent, MxEventFamily, - MxValue as ProtoMxValue, OpenSessionRequest, PingCommand, StreamEventsRequest, Write2BulkEntry, + alarm_feed_message, AcknowledgeAlarmRequest, AlarmFeedMessage, CloseSessionRequest, MxCommand, + MxCommandKind, MxCommandRequest, MxEvent, MxEventFamily, MxValue as ProtoMxValue, + OpenSessionRequest, PingCommand, StreamAlarmsRequest, StreamEventsRequest, Write2BulkEntry, WriteBulkEntry, WriteSecured2BulkEntry, WriteSecuredBulkEntry, }; use mxgateway_client::{ @@ -272,6 +273,24 @@ enum Command { #[arg(long)] jsonl: bool, }, + /// Attach to the gateway's session-less central alarm feed. The stream + /// opens with one `active_alarm` per currently-active alarm, then a + /// single `snapshot_complete`, then a `transition` for every subsequent + /// raise / acknowledge / clear. + StreamAlarms { + #[command(flatten)] + connection: ConnectionArgs, + /// Optional alarm-reference prefix scoping the feed to an equipment + /// sub-tree. Omit to stream every active alarm. + #[arg(long)] + filter_prefix: Option, + #[arg(long, default_value_t = 1)] + max_events: usize, + #[arg(long)] + json: bool, + #[arg(long)] + jsonl: bool, + }, Write { #[command(flatten)] connection: ConnectionArgs, @@ -310,6 +329,20 @@ enum Command { #[arg(long)] json: bool, }, + /// Acknowledge an active MXAccess alarm condition through the gateway's + /// session-less AcknowledgeAlarm RPC. + AcknowledgeAlarm { + #[command(flatten)] + connection: ConnectionArgs, + #[arg(long)] + reference: String, + #[arg(long, default_value = "")] + comment: String, + #[arg(long, default_value = "")] + operator: String, + #[arg(long)] + json: bool, + }, Smoke { #[command(flatten)] connection: ConnectionArgs, @@ -788,6 +821,52 @@ async fn dispatch(command: Command) -> Result<(), Error> { println!("{}", json!({ "eventCount": event_count, "events": events })); } } + Command::StreamAlarms { + connection, + filter_prefix, + max_events, + json, + jsonl, + } => { + if max_events > MAX_AGGREGATE_EVENTS { + return Err(Error::InvalidArgument { + name: "max-events".to_owned(), + detail: format!("must be less than or equal to {MAX_AGGREGATE_EVENTS}"), + }); + } + + let client = connect(connection).await?; + let mut stream = client + .stream_alarms(StreamAlarmsRequest { + client_correlation_id: mxgateway_client::next_correlation_id( + "cli-stream-alarms", + ), + alarm_filter_prefix: filter_prefix.unwrap_or_default(), + }) + .await?; + let mut messages: Vec = Vec::new(); + let mut message_count = 0usize; + while message_count < max_events { + let Some(message) = stream.next().await else { + break; + }; + let message = message?; + message_count += 1; + if jsonl { + println!("{}", alarm_feed_message_to_json(&message)); + } else if json { + messages.push(alarm_feed_message_to_json(&message)); + } else { + println!("{}", alarm_feed_message_summary(&message)); + } + } + if json { + println!( + "{}", + json!({ "messageCount": message_count, "messages": messages }) + ); + } + } Command::Write { connection, session_id, @@ -832,6 +911,26 @@ async fn dispatch(command: Command) -> Result<(), Error> { .await?; print_ok("write2", json); } + Command::AcknowledgeAlarm { + connection, + reference, + comment, + operator, + json, + } => { + let client = connect(connection).await?; + let reply = client + .acknowledge_alarm(AcknowledgeAlarmRequest { + client_correlation_id: mxgateway_client::next_correlation_id( + "cli-acknowledge-alarm", + ), + alarm_full_reference: reference, + comment, + operator_user: operator, + }) + .await?; + print_acknowledge_alarm_reply(&reply, json); + } Command::Galaxy(galaxy_command) => run_galaxy(galaxy_command).await?, Command::Smoke { connection, @@ -1533,6 +1632,113 @@ fn print_deploy_event(event: &DeployEvent, use_json: bool) { } } +/// Render a streamed [`AlarmFeedMessage`] as a terse one-line summary that +/// distinguishes the three `payload` oneof cases. +fn alarm_feed_message_summary(message: &AlarmFeedMessage) -> String { + match &message.payload { + Some(alarm_feed_message::Payload::ActiveAlarm(snapshot)) => { + format!( + "active-alarm {} state={}", + snapshot.alarm_full_reference, + AlarmEnumName::condition_state(snapshot.current_state) + ) + } + Some(alarm_feed_message::Payload::SnapshotComplete(complete)) => { + format!("snapshot-complete {complete}") + } + Some(alarm_feed_message::Payload::Transition(transition)) => { + format!( + "transition {} kind={}", + transition.alarm_full_reference, + AlarmEnumName::transition_kind(transition.transition_kind) + ) + } + None => "(empty)".to_owned(), + } +} + +/// Render a streamed [`AlarmFeedMessage`] as a JSON object whose single +/// top-level key names the active `payload` oneof case, mirroring the +/// protobuf-JSON the .NET/Go/Java/Python CLIs emit. +fn alarm_feed_message_to_json(message: &AlarmFeedMessage) -> Value { + match &message.payload { + Some(alarm_feed_message::Payload::ActiveAlarm(snapshot)) => json!({ + "activeAlarm": { + "alarmFullReference": snapshot.alarm_full_reference, + "sourceObjectReference": snapshot.source_object_reference, + "alarmTypeName": snapshot.alarm_type_name, + "severity": snapshot.severity, + "currentState": AlarmEnumName::condition_state(snapshot.current_state), + "category": snapshot.category, + "description": snapshot.description, + "operatorUser": snapshot.operator_user, + "operatorComment": snapshot.operator_comment, + } + }), + Some(alarm_feed_message::Payload::SnapshotComplete(complete)) => json!({ + "snapshotComplete": complete, + }), + Some(alarm_feed_message::Payload::Transition(transition)) => json!({ + "transition": { + "alarmFullReference": transition.alarm_full_reference, + "sourceObjectReference": transition.source_object_reference, + "alarmTypeName": transition.alarm_type_name, + "transitionKind": AlarmEnumName::transition_kind(transition.transition_kind), + "severity": transition.severity, + "operatorUser": transition.operator_user, + "operatorComment": transition.operator_comment, + "category": transition.category, + "description": transition.description, + } + }), + None => Value::Null, + } +} + +/// Tiny namespace for alarm-enum name lookups used by the alarm-feed +/// renderers; keeps the proto-enum imports off the `main.rs` top level. +struct AlarmEnumName; + +impl AlarmEnumName { + fn condition_state(value: i32) -> String { + use mxgateway_client::generated::mxaccess_gateway::v1::AlarmConditionState; + AlarmConditionState::try_from(value) + .map(|state| state.as_str_name().to_owned()) + .unwrap_or_else(|_| value.to_string()) + } + + fn transition_kind(value: i32) -> String { + use mxgateway_client::generated::mxaccess_gateway::v1::AlarmTransitionKind; + AlarmTransitionKind::try_from(value) + .map(|kind| kind.as_str_name().to_owned()) + .unwrap_or_else(|_| value.to_string()) + } +} + +/// Render an [`AcknowledgeAlarmReply`] as a terse line or a JSON document. +fn print_acknowledge_alarm_reply( + reply: &mxgateway_client::generated::mxaccess_gateway::v1::AcknowledgeAlarmReply, + use_json: bool, +) { + if use_json { + println!( + "{}", + json!({ + "operation": "acknowledge-alarm", + "correlationId": reply.correlation_id, + "protocolStatus": reply.protocol_status.as_ref().map(|status| json!({ + "code": status.code, + "message": status.message, + })), + "hresult": reply.hresult, + "diagnosticMessage": reply.diagnostic_message, + }) + ); + } else { + println!("acknowledge-alarm completed"); + } +} + /// Render a streamed [`MxEvent`] as a JSON object. The scalar value is /// projected into protojson-style `*Value` keys so the cross-language e2e /// matrix can extract and compare event values uniformly across all five @@ -1793,6 +1999,47 @@ mod tests { ); } + #[test] + fn parses_stream_alarms_command() { + let parsed = Cli::try_parse_from([ + "mxgw", + "stream-alarms", + "--filter-prefix", + "Tank01", + "--max-events", + "3", + "--json", + ]); + assert!(parsed.is_ok(), "parse failed: {parsed:?}"); + } + + #[test] + fn parses_stream_alarms_command_without_filter_prefix() { + let parsed = Cli::try_parse_from(["mxgw", "stream-alarms"]); + assert!(parsed.is_ok(), "parse failed: {parsed:?}"); + } + + #[test] + fn parses_acknowledge_alarm_command() { + let parsed = Cli::try_parse_from([ + "mxgw", + "acknowledge-alarm", + "--reference", + "Tank01.Level.HiHi", + "--comment", + "ack from cli", + "--operator", + "operator1", + ]); + assert!(parsed.is_ok(), "parse failed: {parsed:?}"); + } + + #[test] + fn acknowledge_alarm_requires_reference() { + let parsed = Cli::try_parse_from(["mxgw", "acknowledge-alarm"]); + assert!(parsed.is_err()); + } + #[test] fn parses_galaxy_watch_command_with_last_seen_and_max_events() { let parsed = Cli::try_parse_from([