Add stream-alarms and acknowledge-alarm to the Rust CLI
stream-alarms attaches to the gateway's central alarm feed (mirrors stream-events: --max-events cap, --json/--jsonl, --filter-prefix); acknowledge-alarm is a unary session-less ack (--reference required, --comment, --operator). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -18,8 +18,9 @@ use clap::{Args, Parser, Subcommand, ValueEnum};
|
|||||||
use futures_util::StreamExt;
|
use futures_util::StreamExt;
|
||||||
use mxgateway_client::generated::galaxy_repository::v1::DeployEvent;
|
use mxgateway_client::generated::galaxy_repository::v1::DeployEvent;
|
||||||
use mxgateway_client::generated::mxaccess_gateway::v1::{
|
use mxgateway_client::generated::mxaccess_gateway::v1::{
|
||||||
CloseSessionRequest, MxCommand, MxCommandKind, MxCommandRequest, MxEvent, MxEventFamily,
|
alarm_feed_message, AcknowledgeAlarmRequest, AlarmFeedMessage, CloseSessionRequest, MxCommand,
|
||||||
MxValue as ProtoMxValue, OpenSessionRequest, PingCommand, StreamEventsRequest, Write2BulkEntry,
|
MxCommandKind, MxCommandRequest, MxEvent, MxEventFamily, MxValue as ProtoMxValue,
|
||||||
|
OpenSessionRequest, PingCommand, StreamAlarmsRequest, StreamEventsRequest, Write2BulkEntry,
|
||||||
WriteBulkEntry, WriteSecured2BulkEntry, WriteSecuredBulkEntry,
|
WriteBulkEntry, WriteSecured2BulkEntry, WriteSecuredBulkEntry,
|
||||||
};
|
};
|
||||||
use mxgateway_client::{
|
use mxgateway_client::{
|
||||||
@@ -272,6 +273,24 @@ enum Command {
|
|||||||
#[arg(long)]
|
#[arg(long)]
|
||||||
jsonl: bool,
|
jsonl: bool,
|
||||||
},
|
},
|
||||||
|
/// Attach to the gateway's session-less central alarm feed. The stream
|
||||||
|
/// opens with one `active_alarm` per currently-active alarm, then a
|
||||||
|
/// single `snapshot_complete`, then a `transition` for every subsequent
|
||||||
|
/// raise / acknowledge / clear.
|
||||||
|
StreamAlarms {
|
||||||
|
#[command(flatten)]
|
||||||
|
connection: ConnectionArgs,
|
||||||
|
/// Optional alarm-reference prefix scoping the feed to an equipment
|
||||||
|
/// sub-tree. Omit to stream every active alarm.
|
||||||
|
#[arg(long)]
|
||||||
|
filter_prefix: Option<String>,
|
||||||
|
#[arg(long, default_value_t = 1)]
|
||||||
|
max_events: usize,
|
||||||
|
#[arg(long)]
|
||||||
|
json: bool,
|
||||||
|
#[arg(long)]
|
||||||
|
jsonl: bool,
|
||||||
|
},
|
||||||
Write {
|
Write {
|
||||||
#[command(flatten)]
|
#[command(flatten)]
|
||||||
connection: ConnectionArgs,
|
connection: ConnectionArgs,
|
||||||
@@ -310,6 +329,20 @@ enum Command {
|
|||||||
#[arg(long)]
|
#[arg(long)]
|
||||||
json: bool,
|
json: bool,
|
||||||
},
|
},
|
||||||
|
/// Acknowledge an active MXAccess alarm condition through the gateway's
|
||||||
|
/// session-less AcknowledgeAlarm RPC.
|
||||||
|
AcknowledgeAlarm {
|
||||||
|
#[command(flatten)]
|
||||||
|
connection: ConnectionArgs,
|
||||||
|
#[arg(long)]
|
||||||
|
reference: String,
|
||||||
|
#[arg(long, default_value = "")]
|
||||||
|
comment: String,
|
||||||
|
#[arg(long, default_value = "")]
|
||||||
|
operator: String,
|
||||||
|
#[arg(long)]
|
||||||
|
json: bool,
|
||||||
|
},
|
||||||
Smoke {
|
Smoke {
|
||||||
#[command(flatten)]
|
#[command(flatten)]
|
||||||
connection: ConnectionArgs,
|
connection: ConnectionArgs,
|
||||||
@@ -788,6 +821,52 @@ async fn dispatch(command: Command) -> Result<(), Error> {
|
|||||||
println!("{}", json!({ "eventCount": event_count, "events": events }));
|
println!("{}", json!({ "eventCount": event_count, "events": events }));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Command::StreamAlarms {
|
||||||
|
connection,
|
||||||
|
filter_prefix,
|
||||||
|
max_events,
|
||||||
|
json,
|
||||||
|
jsonl,
|
||||||
|
} => {
|
||||||
|
if max_events > MAX_AGGREGATE_EVENTS {
|
||||||
|
return Err(Error::InvalidArgument {
|
||||||
|
name: "max-events".to_owned(),
|
||||||
|
detail: format!("must be less than or equal to {MAX_AGGREGATE_EVENTS}"),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
let client = connect(connection).await?;
|
||||||
|
let mut stream = client
|
||||||
|
.stream_alarms(StreamAlarmsRequest {
|
||||||
|
client_correlation_id: mxgateway_client::next_correlation_id(
|
||||||
|
"cli-stream-alarms",
|
||||||
|
),
|
||||||
|
alarm_filter_prefix: filter_prefix.unwrap_or_default(),
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
let mut messages: Vec<Value> = Vec::new();
|
||||||
|
let mut message_count = 0usize;
|
||||||
|
while message_count < max_events {
|
||||||
|
let Some(message) = stream.next().await else {
|
||||||
|
break;
|
||||||
|
};
|
||||||
|
let message = message?;
|
||||||
|
message_count += 1;
|
||||||
|
if jsonl {
|
||||||
|
println!("{}", alarm_feed_message_to_json(&message));
|
||||||
|
} else if json {
|
||||||
|
messages.push(alarm_feed_message_to_json(&message));
|
||||||
|
} else {
|
||||||
|
println!("{}", alarm_feed_message_summary(&message));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if json {
|
||||||
|
println!(
|
||||||
|
"{}",
|
||||||
|
json!({ "messageCount": message_count, "messages": messages })
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
Command::Write {
|
Command::Write {
|
||||||
connection,
|
connection,
|
||||||
session_id,
|
session_id,
|
||||||
@@ -832,6 +911,26 @@ async fn dispatch(command: Command) -> Result<(), Error> {
|
|||||||
.await?;
|
.await?;
|
||||||
print_ok("write2", json);
|
print_ok("write2", json);
|
||||||
}
|
}
|
||||||
|
Command::AcknowledgeAlarm {
|
||||||
|
connection,
|
||||||
|
reference,
|
||||||
|
comment,
|
||||||
|
operator,
|
||||||
|
json,
|
||||||
|
} => {
|
||||||
|
let client = connect(connection).await?;
|
||||||
|
let reply = client
|
||||||
|
.acknowledge_alarm(AcknowledgeAlarmRequest {
|
||||||
|
client_correlation_id: mxgateway_client::next_correlation_id(
|
||||||
|
"cli-acknowledge-alarm",
|
||||||
|
),
|
||||||
|
alarm_full_reference: reference,
|
||||||
|
comment,
|
||||||
|
operator_user: operator,
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
print_acknowledge_alarm_reply(&reply, json);
|
||||||
|
}
|
||||||
Command::Galaxy(galaxy_command) => run_galaxy(galaxy_command).await?,
|
Command::Galaxy(galaxy_command) => run_galaxy(galaxy_command).await?,
|
||||||
Command::Smoke {
|
Command::Smoke {
|
||||||
connection,
|
connection,
|
||||||
@@ -1533,6 +1632,113 @@ fn print_deploy_event(event: &DeployEvent, use_json: bool) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Render a streamed [`AlarmFeedMessage`] as a terse one-line summary that
|
||||||
|
/// distinguishes the three `payload` oneof cases.
|
||||||
|
fn alarm_feed_message_summary(message: &AlarmFeedMessage) -> String {
|
||||||
|
match &message.payload {
|
||||||
|
Some(alarm_feed_message::Payload::ActiveAlarm(snapshot)) => {
|
||||||
|
format!(
|
||||||
|
"active-alarm {} state={}",
|
||||||
|
snapshot.alarm_full_reference,
|
||||||
|
AlarmEnumName::condition_state(snapshot.current_state)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
Some(alarm_feed_message::Payload::SnapshotComplete(complete)) => {
|
||||||
|
format!("snapshot-complete {complete}")
|
||||||
|
}
|
||||||
|
Some(alarm_feed_message::Payload::Transition(transition)) => {
|
||||||
|
format!(
|
||||||
|
"transition {} kind={}",
|
||||||
|
transition.alarm_full_reference,
|
||||||
|
AlarmEnumName::transition_kind(transition.transition_kind)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
None => "(empty)".to_owned(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Render a streamed [`AlarmFeedMessage`] as a JSON object whose single
|
||||||
|
/// top-level key names the active `payload` oneof case, mirroring the
|
||||||
|
/// protobuf-JSON the .NET/Go/Java/Python CLIs emit.
|
||||||
|
fn alarm_feed_message_to_json(message: &AlarmFeedMessage) -> Value {
|
||||||
|
match &message.payload {
|
||||||
|
Some(alarm_feed_message::Payload::ActiveAlarm(snapshot)) => json!({
|
||||||
|
"activeAlarm": {
|
||||||
|
"alarmFullReference": snapshot.alarm_full_reference,
|
||||||
|
"sourceObjectReference": snapshot.source_object_reference,
|
||||||
|
"alarmTypeName": snapshot.alarm_type_name,
|
||||||
|
"severity": snapshot.severity,
|
||||||
|
"currentState": AlarmEnumName::condition_state(snapshot.current_state),
|
||||||
|
"category": snapshot.category,
|
||||||
|
"description": snapshot.description,
|
||||||
|
"operatorUser": snapshot.operator_user,
|
||||||
|
"operatorComment": snapshot.operator_comment,
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
Some(alarm_feed_message::Payload::SnapshotComplete(complete)) => json!({
|
||||||
|
"snapshotComplete": complete,
|
||||||
|
}),
|
||||||
|
Some(alarm_feed_message::Payload::Transition(transition)) => json!({
|
||||||
|
"transition": {
|
||||||
|
"alarmFullReference": transition.alarm_full_reference,
|
||||||
|
"sourceObjectReference": transition.source_object_reference,
|
||||||
|
"alarmTypeName": transition.alarm_type_name,
|
||||||
|
"transitionKind": AlarmEnumName::transition_kind(transition.transition_kind),
|
||||||
|
"severity": transition.severity,
|
||||||
|
"operatorUser": transition.operator_user,
|
||||||
|
"operatorComment": transition.operator_comment,
|
||||||
|
"category": transition.category,
|
||||||
|
"description": transition.description,
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
None => Value::Null,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Tiny namespace for alarm-enum name lookups used by the alarm-feed
|
||||||
|
/// renderers; keeps the proto-enum imports off the `main.rs` top level.
|
||||||
|
struct AlarmEnumName;
|
||||||
|
|
||||||
|
impl AlarmEnumName {
|
||||||
|
fn condition_state(value: i32) -> String {
|
||||||
|
use mxgateway_client::generated::mxaccess_gateway::v1::AlarmConditionState;
|
||||||
|
AlarmConditionState::try_from(value)
|
||||||
|
.map(|state| state.as_str_name().to_owned())
|
||||||
|
.unwrap_or_else(|_| value.to_string())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn transition_kind(value: i32) -> String {
|
||||||
|
use mxgateway_client::generated::mxaccess_gateway::v1::AlarmTransitionKind;
|
||||||
|
AlarmTransitionKind::try_from(value)
|
||||||
|
.map(|kind| kind.as_str_name().to_owned())
|
||||||
|
.unwrap_or_else(|_| value.to_string())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Render an [`AcknowledgeAlarmReply`] as a terse line or a JSON document.
|
||||||
|
fn print_acknowledge_alarm_reply(
|
||||||
|
reply: &mxgateway_client::generated::mxaccess_gateway::v1::AcknowledgeAlarmReply,
|
||||||
|
use_json: bool,
|
||||||
|
) {
|
||||||
|
if use_json {
|
||||||
|
println!(
|
||||||
|
"{}",
|
||||||
|
json!({
|
||||||
|
"operation": "acknowledge-alarm",
|
||||||
|
"correlationId": reply.correlation_id,
|
||||||
|
"protocolStatus": reply.protocol_status.as_ref().map(|status| json!({
|
||||||
|
"code": status.code,
|
||||||
|
"message": status.message,
|
||||||
|
})),
|
||||||
|
"hresult": reply.hresult,
|
||||||
|
"diagnosticMessage": reply.diagnostic_message,
|
||||||
|
})
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
println!("acknowledge-alarm completed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Render a streamed [`MxEvent`] as a JSON object. The scalar value is
|
/// Render a streamed [`MxEvent`] as a JSON object. The scalar value is
|
||||||
/// projected into protojson-style `*Value` keys so the cross-language e2e
|
/// projected into protojson-style `*Value` keys so the cross-language e2e
|
||||||
/// matrix can extract and compare event values uniformly across all five
|
/// matrix can extract and compare event values uniformly across all five
|
||||||
@@ -1793,6 +1999,47 @@ mod tests {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parses_stream_alarms_command() {
|
||||||
|
let parsed = Cli::try_parse_from([
|
||||||
|
"mxgw",
|
||||||
|
"stream-alarms",
|
||||||
|
"--filter-prefix",
|
||||||
|
"Tank01",
|
||||||
|
"--max-events",
|
||||||
|
"3",
|
||||||
|
"--json",
|
||||||
|
]);
|
||||||
|
assert!(parsed.is_ok(), "parse failed: {parsed:?}");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parses_stream_alarms_command_without_filter_prefix() {
|
||||||
|
let parsed = Cli::try_parse_from(["mxgw", "stream-alarms"]);
|
||||||
|
assert!(parsed.is_ok(), "parse failed: {parsed:?}");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parses_acknowledge_alarm_command() {
|
||||||
|
let parsed = Cli::try_parse_from([
|
||||||
|
"mxgw",
|
||||||
|
"acknowledge-alarm",
|
||||||
|
"--reference",
|
||||||
|
"Tank01.Level.HiHi",
|
||||||
|
"--comment",
|
||||||
|
"ack from cli",
|
||||||
|
"--operator",
|
||||||
|
"operator1",
|
||||||
|
]);
|
||||||
|
assert!(parsed.is_ok(), "parse failed: {parsed:?}");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn acknowledge_alarm_requires_reference() {
|
||||||
|
let parsed = Cli::try_parse_from(["mxgw", "acknowledge-alarm"]);
|
||||||
|
assert!(parsed.is_err());
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn parses_galaxy_watch_command_with_last_seen_and_max_events() {
|
fn parses_galaxy_watch_command_with_last_seen_and_max_events() {
|
||||||
let parsed = Cli::try_parse_from([
|
let parsed = Cli::try_parse_from([
|
||||||
|
|||||||
Reference in New Issue
Block a user