Python client: port stream-alarms and acknowledge-alarm

Adds the session-less alarm CLI subcommands to mxgw-py. stream-alarms reads a
bounded slice of the gateway's central alarm feed (--filter-prefix,
--max-messages, --timeout, --json; aggregate `{messages: [...]}`);
acknowledge-alarm is a unary ack (--reference required, --comment, --operator).
GatewayClient.stream_alarms joins query_active_alarms via a
_canceling_alarm_feed_iterator helper mirroring the existing
_canceling_active_alarms_iterator pattern.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-24 06:45:54 -04:00
parent 7de4efeb02
commit 828e3e6cf6
3 changed files with 146 additions and 0 deletions
@@ -172,6 +172,28 @@ class GatewayClient:
call = self.raw_stub.QueryActiveAlarms(request, **kwargs)
return _canceling_active_alarms_iterator(call)
def stream_alarms(
self,
request: pb.StreamAlarmsRequest,
*,
metadata: Sequence[tuple[str, str]] | None = None,
) -> AsyncIterator[pb.AlarmFeedMessage]:
"""Attach to the gateway's central alarm feed.
The stream opens with one ``AlarmFeedMessage`` per currently-active
alarm (the ConditionRefresh snapshot), then a single
``snapshot_complete``, then a ``transition`` for every subsequent
raise / acknowledge / clear. Served by the gateway's always-on alarm
monitor — no worker session is opened — so any number of clients may
attach. Optionally scoped by alarm-reference prefix
(``request.alarm_filter_prefix``).
"""
kwargs: dict[str, Any] = {"metadata": merge_metadata(self.options.api_key, metadata)}
if self.options.stream_timeout is not None:
kwargs["timeout"] = self.options.stream_timeout
call = self.raw_stub.StreamAlarms(request, **kwargs)
return _canceling_alarm_feed_iterator(call)
async def _unary(
self,
operation: str,
@@ -223,3 +245,15 @@ async def _canceling_active_alarms_iterator(call: Any) -> AsyncIterator[pb.Activ
cancel = getattr(call, "cancel", None)
if cancel is not None:
cancel()
async def _canceling_alarm_feed_iterator(call: Any) -> AsyncIterator[pb.AlarmFeedMessage]:
try:
async for message in call:
yield message
except grpc.RpcError as error:
raise map_rpc_error("stream alarms", error) from error
finally:
cancel = getattr(call, "cancel", None)
if cancel is not None:
cancel()
@@ -386,6 +386,40 @@ def stream_events(**kwargs: Any) -> None:
)
@main.command("stream-alarms")
@gateway_options
@click.option("--filter-prefix", default="", help="Alarm-reference prefix filter.")
@click.option("--max-messages", default=1, type=int, show_default=True)
@click.option("--timeout", default=5.0, type=float, show_default=True)
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def stream_alarms(**kwargs: Any) -> None:
"""Stream a bounded number of messages from the gateway's central alarm feed."""
_run(
_stream_alarms(**kwargs),
output_json=kwargs["output_json"],
secrets=_secrets(kwargs),
)
@main.command("acknowledge-alarm")
@gateway_options
@click.option("--reference", required=True, help="Alarm full reference to acknowledge.")
@click.option("--comment", default="", help="Acknowledgement comment.")
@click.option("--operator", default="", help="Operator user name.")
@click.option("--correlation-id", default="", help="Client correlation id.")
@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.")
def acknowledge_alarm(**kwargs: Any) -> None:
"""Acknowledge an active MXAccess alarm condition (session-less)."""
_run(
_acknowledge_alarm(**kwargs),
output_json=kwargs["output_json"],
secrets=_secrets(kwargs),
)
@main.command()
@gateway_options
@click.option("--session-id", required=True, help="Gateway session id.")
@@ -761,6 +795,34 @@ async def _stream_events(**kwargs: Any) -> dict[str, Any]:
return {"events": [_message_dict(event) for event in events]}
async def _stream_alarms(**kwargs: Any) -> dict[str, Any]:
async with await _connect(kwargs) as client:
messages = await _collect_alarm_messages(
client.stream_alarms(
pb.StreamAlarmsRequest(
client_correlation_id=kwargs["correlation_id"],
alarm_filter_prefix=kwargs["filter_prefix"],
),
),
max_messages=kwargs["max_messages"],
timeout=kwargs["timeout"],
)
return {"messages": [_message_dict(message) for message in messages]}
async def _acknowledge_alarm(**kwargs: Any) -> dict[str, Any]:
async with await _connect(kwargs) as client:
reply = await client.acknowledge_alarm(
pb.AcknowledgeAlarmRequest(
client_correlation_id=kwargs["correlation_id"],
alarm_full_reference=kwargs["reference"],
comment=kwargs["comment"],
operator_user=kwargs["operator"],
),
)
return _message_dict(reply)
async def _write(**kwargs: Any) -> dict[str, Any]:
value = _parse_value(kwargs["value"], kwargs["value_type"])
async with await _connect(kwargs) as client:
@@ -912,6 +974,34 @@ async def _collect_events(
return collected
async def _collect_alarm_messages(
messages: Any,
*,
max_messages: int,
timeout: float,
) -> list[pb.AlarmFeedMessage]:
if max_messages > MAX_AGGREGATE_EVENTS:
raise click.BadParameter(
f"must be less than or equal to {MAX_AGGREGATE_EVENTS}",
param_hint="--max-messages",
)
collected: list[pb.AlarmFeedMessage] = []
iterator = messages.__aiter__()
try:
while len(collected) < max_messages:
collected.append(await asyncio.wait_for(iterator.__anext__(), timeout=timeout))
except StopAsyncIteration:
pass
except asyncio.TimeoutError:
pass
finally:
close = getattr(iterator, "aclose", None)
if close is not None:
await close()
return collected
def _parse_value(raw_value: str, value_type: str) -> MxValueInput:
normalized = value_type.lower()
if normalized == "bool":
+22
View File
@@ -50,6 +50,28 @@ def test_write_parser_rejects_unknown_value_type() -> None:
assert "unsupported value type" in result.output
def test_stream_alarms_is_registered() -> None:
runner = CliRunner()
result = runner.invoke(main, ["stream-alarms", "--help"])
assert result.exit_code == 0
assert "--filter-prefix" in result.output
assert "--max-messages" in result.output
def test_acknowledge_alarm_requires_reference() -> None:
runner = CliRunner()
result = runner.invoke(
main,
["acknowledge-alarm", "--api-key", "mxgw_test_secret", "--json"],
)
assert result.exit_code != 0
assert "--reference" in result.output
def test_cli_error_output_redacts_api_key() -> None:
runner = CliRunner()