diff --git a/clients/python/src/zb_mom_ww_mxgateway/client.py b/clients/python/src/zb_mom_ww_mxgateway/client.py index b08dc64..e271788 100644 --- a/clients/python/src/zb_mom_ww_mxgateway/client.py +++ b/clients/python/src/zb_mom_ww_mxgateway/client.py @@ -172,6 +172,28 @@ class GatewayClient: call = self.raw_stub.QueryActiveAlarms(request, **kwargs) return _canceling_active_alarms_iterator(call) + def stream_alarms( + self, + request: pb.StreamAlarmsRequest, + *, + metadata: Sequence[tuple[str, str]] | None = None, + ) -> AsyncIterator[pb.AlarmFeedMessage]: + """Attach to the gateway's central alarm feed. + + The stream opens with one ``AlarmFeedMessage`` per currently-active + alarm (the ConditionRefresh snapshot), then a single + ``snapshot_complete``, then a ``transition`` for every subsequent + raise / acknowledge / clear. Served by the gateway's always-on alarm + monitor — no worker session is opened — so any number of clients may + attach. Optionally scoped by alarm-reference prefix + (``request.alarm_filter_prefix``). + """ + kwargs: dict[str, Any] = {"metadata": merge_metadata(self.options.api_key, metadata)} + if self.options.stream_timeout is not None: + kwargs["timeout"] = self.options.stream_timeout + call = self.raw_stub.StreamAlarms(request, **kwargs) + return _canceling_alarm_feed_iterator(call) + async def _unary( self, operation: str, @@ -223,3 +245,15 @@ async def _canceling_active_alarms_iterator(call: Any) -> AsyncIterator[pb.Activ cancel = getattr(call, "cancel", None) if cancel is not None: cancel() + + +async def _canceling_alarm_feed_iterator(call: Any) -> AsyncIterator[pb.AlarmFeedMessage]: + try: + async for message in call: + yield message + except grpc.RpcError as error: + raise map_rpc_error("stream alarms", error) from error + finally: + cancel = getattr(call, "cancel", None) + if cancel is not None: + cancel() diff --git a/clients/python/src/zb_mom_ww_mxgateway_cli/commands.py b/clients/python/src/zb_mom_ww_mxgateway_cli/commands.py index fe05355..c486b69 100644 --- a/clients/python/src/zb_mom_ww_mxgateway_cli/commands.py +++ b/clients/python/src/zb_mom_ww_mxgateway_cli/commands.py @@ -386,6 +386,40 @@ def stream_events(**kwargs: Any) -> None: ) +@main.command("stream-alarms") +@gateway_options +@click.option("--filter-prefix", default="", help="Alarm-reference prefix filter.") +@click.option("--max-messages", default=1, type=int, show_default=True) +@click.option("--timeout", default=5.0, type=float, show_default=True) +@click.option("--correlation-id", default="", help="Client correlation id.") +@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") +def stream_alarms(**kwargs: Any) -> None: + """Stream a bounded number of messages from the gateway's central alarm feed.""" + + _run( + _stream_alarms(**kwargs), + output_json=kwargs["output_json"], + secrets=_secrets(kwargs), + ) + + +@main.command("acknowledge-alarm") +@gateway_options +@click.option("--reference", required=True, help="Alarm full reference to acknowledge.") +@click.option("--comment", default="", help="Acknowledgement comment.") +@click.option("--operator", default="", help="Operator user name.") +@click.option("--correlation-id", default="", help="Client correlation id.") +@click.option("--json", "output_json", is_flag=True, help="Emit JSON output.") +def acknowledge_alarm(**kwargs: Any) -> None: + """Acknowledge an active MXAccess alarm condition (session-less).""" + + _run( + _acknowledge_alarm(**kwargs), + output_json=kwargs["output_json"], + secrets=_secrets(kwargs), + ) + + @main.command() @gateway_options @click.option("--session-id", required=True, help="Gateway session id.") @@ -761,6 +795,34 @@ async def _stream_events(**kwargs: Any) -> dict[str, Any]: return {"events": [_message_dict(event) for event in events]} +async def _stream_alarms(**kwargs: Any) -> dict[str, Any]: + async with await _connect(kwargs) as client: + messages = await _collect_alarm_messages( + client.stream_alarms( + pb.StreamAlarmsRequest( + client_correlation_id=kwargs["correlation_id"], + alarm_filter_prefix=kwargs["filter_prefix"], + ), + ), + max_messages=kwargs["max_messages"], + timeout=kwargs["timeout"], + ) + return {"messages": [_message_dict(message) for message in messages]} + + +async def _acknowledge_alarm(**kwargs: Any) -> dict[str, Any]: + async with await _connect(kwargs) as client: + reply = await client.acknowledge_alarm( + pb.AcknowledgeAlarmRequest( + client_correlation_id=kwargs["correlation_id"], + alarm_full_reference=kwargs["reference"], + comment=kwargs["comment"], + operator_user=kwargs["operator"], + ), + ) + return _message_dict(reply) + + async def _write(**kwargs: Any) -> dict[str, Any]: value = _parse_value(kwargs["value"], kwargs["value_type"]) async with await _connect(kwargs) as client: @@ -912,6 +974,34 @@ async def _collect_events( return collected +async def _collect_alarm_messages( + messages: Any, + *, + max_messages: int, + timeout: float, +) -> list[pb.AlarmFeedMessage]: + if max_messages > MAX_AGGREGATE_EVENTS: + raise click.BadParameter( + f"must be less than or equal to {MAX_AGGREGATE_EVENTS}", + param_hint="--max-messages", + ) + + collected: list[pb.AlarmFeedMessage] = [] + iterator = messages.__aiter__() + try: + while len(collected) < max_messages: + collected.append(await asyncio.wait_for(iterator.__anext__(), timeout=timeout)) + except StopAsyncIteration: + pass + except asyncio.TimeoutError: + pass + finally: + close = getattr(iterator, "aclose", None) + if close is not None: + await close() + return collected + + def _parse_value(raw_value: str, value_type: str) -> MxValueInput: normalized = value_type.lower() if normalized == "bool": diff --git a/clients/python/tests/test_cli.py b/clients/python/tests/test_cli.py index 65e61e1..1e0e72e 100644 --- a/clients/python/tests/test_cli.py +++ b/clients/python/tests/test_cli.py @@ -50,6 +50,28 @@ def test_write_parser_rejects_unknown_value_type() -> None: assert "unsupported value type" in result.output +def test_stream_alarms_is_registered() -> None: + runner = CliRunner() + + result = runner.invoke(main, ["stream-alarms", "--help"]) + + assert result.exit_code == 0 + assert "--filter-prefix" in result.output + assert "--max-messages" in result.output + + +def test_acknowledge_alarm_requires_reference() -> None: + runner = CliRunner() + + result = runner.invoke( + main, + ["acknowledge-alarm", "--api-key", "mxgw_test_secret", "--json"], + ) + + assert result.exit_code != 0 + assert "--reference" in result.output + + def test_cli_error_output_redacts_api_key() -> None: runner = CliRunner()