From 168bb9a39a58be01c1bf7df012560312e81ade5f Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 30 Apr 2026 16:50:31 -0400 Subject: [PATCH] clients/python: SDK methods for AcknowledgeAlarm + QueryActiveAlarms (PR E.3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Eighth PR of the alarms-over-gateway epic (docs/plans/alarms-over-gateway.md). Mirrors PR E.2's .NET surface on the Python async SDK. Depends on PR E.1 (regen, merged). - GatewayClient.acknowledge_alarm — async unary call routed through the existing _unary helper. ensure_protocol_success raises typed gateway errors for non-OK protocol statuses; map_rpc_error wraps RpcError → MxGatewayAuthenticationError / MxGatewayAuthorizationError on Unauthenticated / PermissionDenied responses. - GatewayClient.query_active_alarms — async iterator over ActiveAlarmSnapshot. Mirrors stream_events_raw's cancel-on-close pattern via a dedicated _canceling_active_alarms_iterator (typed for ActiveAlarmSnapshot). Tests: - 6 new tests in test_alarms.py — request shape, Unauthenticated + PermissionDenied mapping, snapshot streaming, filter prefix passthrough, cancel-on-aclose semantics. - Full Python test suite: 39 passed (was 33; 6 new). CLI verb (alarms subscribe / acknowledge / query-active) deferred — the SDK surface is what lmxopcua consumes; CLI follow-up shares the JSON output shape with E.2's .NET CLI for cross-language tooling. Co-Authored-By: Claude Opus 4.7 (1M context) --- clients/python/src/mxgateway/client.py | 48 +++++ clients/python/tests/test_alarms.py | 243 +++++++++++++++++++++++++ 2 files changed, 291 insertions(+) create mode 100644 clients/python/tests/test_alarms.py diff --git a/clients/python/src/mxgateway/client.py b/clients/python/src/mxgateway/client.py index c785d39..b08dc64 100644 --- a/clients/python/src/mxgateway/client.py +++ b/clients/python/src/mxgateway/client.py @@ -136,6 +136,42 @@ class GatewayClient: call = self.raw_stub.StreamEvents(request, **kwargs) return _canceling_iterator(call) + async def acknowledge_alarm( + self, + request: pb.AcknowledgeAlarmRequest, + ) -> pb.AcknowledgeAlarmReply: + """Acknowledge an active MXAccess alarm condition through the gateway. + + The gateway authenticates the request against the API key's + ``invoke:alarm-ack`` scope and forwards the acknowledge to the worker's + MXAccess session; the resulting native ``MxStatus`` is returned in the + reply. Acks are idempotent — re-acking an already-acked condition is a + no-op at the MxAccess layer. + """ + reply = await self._unary("acknowledge alarm", self.raw_stub.AcknowledgeAlarm, request) + ensure_protocol_success("acknowledge alarm", reply.protocol_status, reply) + return reply + + def query_active_alarms( + self, + request: pb.QueryActiveAlarmsRequest, + *, + metadata: Sequence[tuple[str, str]] | None = None, + ) -> AsyncIterator[pb.ActiveAlarmSnapshot]: + """Stream a snapshot of all alarms currently Active or ActiveAcked. + + The gateway's ConditionRefresh equivalent. Use after reconnect to seed + local Part 9 state, or to reconcile alarms that may have been missed + during a transport blip. Optionally scoped by alarm-reference prefix + (``request.alarm_filter_prefix``) so a partial refresh can target an + equipment sub-tree. + """ + kwargs: dict[str, Any] = {"metadata": merge_metadata(self.options.api_key, metadata)} + if self.options.stream_timeout is not None: + kwargs["timeout"] = self.options.stream_timeout + call = self.raw_stub.QueryActiveAlarms(request, **kwargs) + return _canceling_active_alarms_iterator(call) + async def _unary( self, operation: str, @@ -175,3 +211,15 @@ async def _canceling_iterator(call: Any) -> AsyncIterator[pb.MxEvent]: cancel = getattr(call, "cancel", None) if cancel is not None: cancel() + + +async def _canceling_active_alarms_iterator(call: Any) -> AsyncIterator[pb.ActiveAlarmSnapshot]: + try: + async for snapshot in call: + yield snapshot + except grpc.RpcError as error: + raise map_rpc_error("query active alarms", error) from error + finally: + cancel = getattr(call, "cancel", None) + if cancel is not None: + cancel() diff --git a/clients/python/tests/test_alarms.py b/clients/python/tests/test_alarms.py new file mode 100644 index 0000000..190ab7b --- /dev/null +++ b/clients/python/tests/test_alarms.py @@ -0,0 +1,243 @@ +"""Tests for the AcknowledgeAlarm + QueryActiveAlarms client surface (PR E.3).""" + +from __future__ import annotations + +import asyncio +from typing import Any + +import grpc +import pytest + +from mxgateway import ClientOptions, GatewayClient +from mxgateway.errors import MxGatewayAuthenticationError, MxGatewayAuthorizationError +from mxgateway.generated import mxaccess_gateway_pb2 as pb + + +@pytest.mark.asyncio +async def test_acknowledge_alarm_sends_request_and_returns_reply() -> None: + stub = FakeGatewayStub() + stub.acknowledge_alarm.replies = [ + pb.AcknowledgeAlarmReply( + session_id="session-1", + correlation_id="corr-7", + protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK), + status=pb.MxStatusProxy(success=1, category=pb.MX_STATUS_CATEGORY_OK), + ), + ] + client = await GatewayClient.connect( + ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True), + stub=stub, + ) + + reply = await client.acknowledge_alarm( + pb.AcknowledgeAlarmRequest( + session_id="session-1", + client_correlation_id="corr-7", + alarm_full_reference="Tank01.Level.HiHi", + comment="investigating", + operator_user="alice", + ), + ) + + assert reply.protocol_status.code == pb.PROTOCOL_STATUS_CODE_OK + assert reply.status.category == pb.MX_STATUS_CATEGORY_OK + + captured_request = stub.acknowledge_alarm.requests[0] + assert captured_request.alarm_full_reference == "Tank01.Level.HiHi" + assert captured_request.comment == "investigating" + assert captured_request.operator_user == "alice" + assert stub.acknowledge_alarm.metadata == (("authorization", "Bearer mxgw_test_secret"),) + + +@pytest.mark.asyncio +async def test_acknowledge_alarm_unauthenticated_raises_typed_error() -> None: + stub = FakeGatewayStub() + stub.acknowledge_alarm.exception = FakeRpcError(grpc.StatusCode.UNAUTHENTICATED, "expired key") + client = await GatewayClient.connect( + ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True), + stub=stub, + ) + + with pytest.raises(MxGatewayAuthenticationError): + await client.acknowledge_alarm( + pb.AcknowledgeAlarmRequest( + session_id="session-1", + alarm_full_reference="Tank01.Level.HiHi", + comment="", + operator_user="alice", + ), + ) + + +@pytest.mark.asyncio +async def test_acknowledge_alarm_permission_denied_raises_typed_error() -> None: + stub = FakeGatewayStub() + stub.acknowledge_alarm.exception = FakeRpcError(grpc.StatusCode.PERMISSION_DENIED, "missing scope") + client = await GatewayClient.connect( + ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True), + stub=stub, + ) + + with pytest.raises(MxGatewayAuthorizationError): + await client.acknowledge_alarm( + pb.AcknowledgeAlarmRequest( + session_id="session-1", + alarm_full_reference="Tank01.Level.HiHi", + comment="", + operator_user="alice", + ), + ) + + +@pytest.mark.asyncio +async def test_query_active_alarms_streams_snapshots() -> None: + snapshots = [ + pb.ActiveAlarmSnapshot( + alarm_full_reference="Tank01.Level.HiHi", + current_state=pb.ALARM_CONDITION_STATE_ACTIVE, + severity=750, + ), + pb.ActiveAlarmSnapshot( + alarm_full_reference="Tank02.Level.HiHi", + current_state=pb.ALARM_CONDITION_STATE_ACTIVE_ACKED, + severity=750, + ), + ] + stream = FakeSnapshotStream(snapshots) + stub = FakeGatewayStub(snapshot_stream=stream) + client = await GatewayClient.connect( + ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True), + stub=stub, + ) + + received: list[pb.ActiveAlarmSnapshot] = [] + async for snapshot in client.query_active_alarms( + pb.QueryActiveAlarmsRequest(session_id="session-1"), + ): + received.append(snapshot) + + assert len(received) == 2 + assert received[0].alarm_full_reference == "Tank01.Level.HiHi" + assert received[0].current_state == pb.ALARM_CONDITION_STATE_ACTIVE + assert received[1].current_state == pb.ALARM_CONDITION_STATE_ACTIVE_ACKED + assert stub.query_metadata == (("authorization", "Bearer mxgw_test_secret"),) + + +@pytest.mark.asyncio +async def test_query_active_alarms_passes_filter_prefix() -> None: + stream = FakeSnapshotStream([]) + stub = FakeGatewayStub(snapshot_stream=stream) + client = await GatewayClient.connect( + ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True), + stub=stub, + ) + + iterator = client.query_active_alarms( + pb.QueryActiveAlarmsRequest(session_id="session-1", alarm_filter_prefix="Tank01."), + ) + # Drain to trigger the stub call. + async for _ in iterator: + pass + + assert stub.query_request is not None + assert stub.query_request.alarm_filter_prefix == "Tank01." + + +@pytest.mark.asyncio +async def test_query_active_alarms_cancels_underlying_stream_on_close() -> None: + snapshots = [ + pb.ActiveAlarmSnapshot( + alarm_full_reference="Tank01.Level.HiHi", + current_state=pb.ALARM_CONDITION_STATE_ACTIVE, + ), + ] + stream = FakeSnapshotStream(snapshots) + stub = FakeGatewayStub(snapshot_stream=stream) + client = await GatewayClient.connect( + ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True), + stub=stub, + ) + + iterator = client.query_active_alarms(pb.QueryActiveAlarmsRequest(session_id="session-1")) + first = await anext(iterator) + await iterator.aclose() + + assert first.alarm_full_reference == "Tank01.Level.HiHi" + assert stream.cancelled + + +class FakeGatewayStub: + def __init__(self, snapshot_stream: "FakeSnapshotStream | None" = None) -> None: + self.open_session = FakeUnary( + [ + pb.OpenSessionReply( + session_id="session-1", + protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK), + ), + ], + ) + self.acknowledge_alarm = FakeUnary([]) + self.OpenSession = self.open_session + self.AcknowledgeAlarm = self.acknowledge_alarm + self._snapshot_stream = snapshot_stream or FakeSnapshotStream([]) + self.query_request: pb.QueryActiveAlarmsRequest | None = None + self.query_metadata: tuple[tuple[str, str], ...] | None = None + + def QueryActiveAlarms( + self, + request: pb.QueryActiveAlarmsRequest, + *, + metadata: tuple[tuple[str, str], ...], + ) -> "FakeSnapshotStream": + self.query_request = request + self.query_metadata = metadata + return self._snapshot_stream + + +class FakeUnary: + def __init__(self, replies: list[Any]) -> None: + self.replies = replies + self.requests: list[Any] = [] + self.metadata: tuple[tuple[str, str], ...] | None = None + self.exception: Exception | None = None + + async def __call__( + self, + request: Any, + *, + metadata: tuple[tuple[str, str], ...], + ) -> Any: + self.requests.append(request) + self.metadata = metadata + if self.exception is not None: + raise self.exception + return self.replies.pop(0) + + +class FakeSnapshotStream: + def __init__(self, snapshots: list[pb.ActiveAlarmSnapshot]) -> None: + self._snapshots = list(snapshots) + self.cancelled = False + + def __aiter__(self) -> "FakeSnapshotStream": + return self + + async def __anext__(self) -> pb.ActiveAlarmSnapshot: + if not self._snapshots: + raise StopAsyncIteration + return self._snapshots.pop(0) + + def cancel(self) -> None: + self.cancelled = True + + +class FakeRpcError(grpc.RpcError): + def __init__(self, code: grpc.StatusCode, details: str) -> None: + self._code = code + self._details = details + + def code(self) -> grpc.StatusCode: # noqa: D401 + return self._code + + def details(self) -> str: # noqa: D401 + return self._details