From 120cd0b1b673405b0c30cc4b2ec5cef995b3d6f2 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 21 May 2026 19:47:19 -0400 Subject: [PATCH] Add stream-alarms and acknowledge-alarm to the Python CLI Brings the Python mxgateway_cli in line with the other four client CLIs: stream-alarms reads a bounded slice of the gateway's central alarm feed (--filter-prefix, --max-messages, --timeout); acknowledge-alarm is a unary session-less ack (--reference required, --comment, --operator). Co-Authored-By: Claude Opus 4.7 (1M context) --- clients/python/src/mxgateway_cli/commands.py | 90 ++++++++++++++++++++ clients/python/tests/test_cli.py | 22 +++++ 2 files changed, 112 insertions(+) diff --git a/clients/python/src/mxgateway_cli/commands.py b/clients/python/src/mxgateway_cli/commands.py index 0187753..7f6db05 100644 --- a/clients/python/src/mxgateway_cli/commands.py +++ b/clients/python/src/mxgateway_cli/commands.py @@ -404,6 +404,40 @@ def stream_events(**kwargs: Any) -> None: ) +@main.command("stream-alarms") +@gateway_options +@click.option("--filter-prefix", default="", help="Alarm-reference prefix filter.") +@click.option("--max-messages", default=1, type=int, show_default=True) +@click.option("--timeout", default=5.0, type=float, show_default=True) +@click.option("--correlation-id", default="", help="Client correlation id.") +@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") +def stream_alarms(**kwargs: Any) -> None: + """Stream a bounded number of messages from the gateway's central alarm feed.""" + + _run( + _stream_alarms(**kwargs), + output_json=kwargs["output_json"], + secrets=_secrets(kwargs), + ) + + +@main.command("acknowledge-alarm") +@gateway_options +@click.option("--reference", required=True, help="Alarm full reference to acknowledge.") +@click.option("--comment", default="", help="Acknowledgement comment.") +@click.option("--operator", default="", help="Operator user name.") +@click.option("--correlation-id", default="", help="Client correlation id.") +@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") +def acknowledge_alarm(**kwargs: Any) -> None: + """Acknowledge an active MXAccess alarm condition (session-less).""" + + _run( + _acknowledge_alarm(**kwargs), + output_json=kwargs["output_json"], + secrets=_secrets(kwargs), + ) + + @main.command() @gateway_options @click.option("--session-id", required=True, help="Gateway session id.") @@ -779,6 +813,34 @@ async def _stream_events(**kwargs: Any) -> dict[str, Any]: return {"events": [_message_dict(event) for event in events]} +async def _stream_alarms(**kwargs: Any) -> dict[str, Any]: + async with await _connect(kwargs) as client: + messages = await _collect_alarm_messages( + client.stream_alarms( + pb.StreamAlarmsRequest( + client_correlation_id=kwargs["correlation_id"], + alarm_filter_prefix=kwargs["filter_prefix"], + ), + ), + max_messages=kwargs["max_messages"], + timeout=kwargs["timeout"], + ) + return {"messages": [_message_dict(message) for message in messages]} + + +async def _acknowledge_alarm(**kwargs: Any) -> dict[str, Any]: + async with await _connect(kwargs) as client: + reply = await client.acknowledge_alarm( + pb.AcknowledgeAlarmRequest( + client_correlation_id=kwargs["correlation_id"], + alarm_full_reference=kwargs["reference"], + comment=kwargs["comment"], + operator_user=kwargs["operator"], + ), + ) + return {"rawReply": _message_dict(reply)} + + async def _write(**kwargs: Any) -> dict[str, Any]: value = _parse_value(kwargs["value"], kwargs["value_type"]) async with await _connect(kwargs) as client: @@ -936,6 +998,34 @@ async def _collect_events( return collected +async def _collect_alarm_messages( + messages: Any, + *, + max_messages: int, + timeout: float, +) -> list[pb.AlarmFeedMessage]: + if max_messages > MAX_AGGREGATE_EVENTS: + raise click.BadParameter( + f"must be less than or equal to {MAX_AGGREGATE_EVENTS}", + param_hint="--max-messages", + ) + + collected: list[pb.AlarmFeedMessage] = [] + iterator = messages.__aiter__() + try: + while len(collected) < max_messages: + collected.append(await asyncio.wait_for(iterator.__anext__(), timeout=timeout)) + except StopAsyncIteration: + pass + except asyncio.TimeoutError: + pass + finally: + close = getattr(iterator, "aclose", None) + if close is not None: + await close() + return collected + + def _parse_value(raw_value: str, value_type: str) -> MxValueInput: normalized = value_type.lower() if normalized == "bool": diff --git a/clients/python/tests/test_cli.py b/clients/python/tests/test_cli.py index b1e411e..2f1d3d8 100644 --- a/clients/python/tests/test_cli.py +++ b/clients/python/tests/test_cli.py @@ -52,6 +52,28 @@ def test_write_parser_rejects_unknown_value_type() -> None: assert "unsupported value type" in result.output +def test_stream_alarms_is_registered() -> None: + runner = CliRunner() + + result = runner.invoke(main, ["stream-alarms", "--help"]) + + assert result.exit_code == 0 + assert "--filter-prefix" in result.output + assert "--max-messages" in result.output + + +def test_acknowledge_alarm_requires_reference() -> None: + runner = CliRunner() + + result = runner.invoke( + main, + ["acknowledge-alarm", "--api-key", "mxgw_test_secret", "--json"], + ) + + assert result.exit_code != 0 + assert "--reference" in result.output + + def test_cli_error_output_redacts_api_key() -> None: runner = CliRunner()