clients/python: SDK methods for AcknowledgeAlarm + QueryActiveAlarms (PR E.3)
Eighth PR of the alarms-over-gateway epic (docs/plans/alarms-over-gateway.md). Mirrors PR E.2's .NET surface on the Python async SDK. Depends on PR E.1 (regen, merged). - GatewayClient.acknowledge_alarm — async unary call routed through the existing _unary helper. ensure_protocol_success raises typed gateway errors for non-OK protocol statuses; map_rpc_error wraps RpcError → MxGatewayAuthenticationError / MxGatewayAuthorizationError on Unauthenticated / PermissionDenied responses. - GatewayClient.query_active_alarms — async iterator over ActiveAlarmSnapshot. Mirrors stream_events_raw's cancel-on-close pattern via a dedicated _canceling_active_alarms_iterator (typed for ActiveAlarmSnapshot). Tests: - 6 new tests in test_alarms.py — request shape, Unauthenticated + PermissionDenied mapping, snapshot streaming, filter prefix passthrough, cancel-on-aclose semantics. - Full Python test suite: 39 passed (was 33; 6 new). CLI verb (alarms subscribe / acknowledge / query-active) deferred — the SDK surface is what lmxopcua consumes; CLI follow-up shares the JSON output shape with E.2's .NET CLI for cross-language tooling. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -136,6 +136,42 @@ class GatewayClient:
|
||||
call = self.raw_stub.StreamEvents(request, **kwargs)
|
||||
return _canceling_iterator(call)
|
||||
|
||||
async def acknowledge_alarm(
|
||||
self,
|
||||
request: pb.AcknowledgeAlarmRequest,
|
||||
) -> pb.AcknowledgeAlarmReply:
|
||||
"""Acknowledge an active MXAccess alarm condition through the gateway.
|
||||
|
||||
The gateway authenticates the request against the API key's
|
||||
``invoke:alarm-ack`` scope and forwards the acknowledge to the worker's
|
||||
MXAccess session; the resulting native ``MxStatus`` is returned in the
|
||||
reply. Acks are idempotent — re-acking an already-acked condition is a
|
||||
no-op at the MxAccess layer.
|
||||
"""
|
||||
reply = await self._unary("acknowledge alarm", self.raw_stub.AcknowledgeAlarm, request)
|
||||
ensure_protocol_success("acknowledge alarm", reply.protocol_status, reply)
|
||||
return reply
|
||||
|
||||
def query_active_alarms(
|
||||
self,
|
||||
request: pb.QueryActiveAlarmsRequest,
|
||||
*,
|
||||
metadata: Sequence[tuple[str, str]] | None = None,
|
||||
) -> AsyncIterator[pb.ActiveAlarmSnapshot]:
|
||||
"""Stream a snapshot of all alarms currently Active or ActiveAcked.
|
||||
|
||||
The gateway's ConditionRefresh equivalent. Use after reconnect to seed
|
||||
local Part 9 state, or to reconcile alarms that may have been missed
|
||||
during a transport blip. Optionally scoped by alarm-reference prefix
|
||||
(``request.alarm_filter_prefix``) so a partial refresh can target an
|
||||
equipment sub-tree.
|
||||
"""
|
||||
kwargs: dict[str, Any] = {"metadata": merge_metadata(self.options.api_key, metadata)}
|
||||
if self.options.stream_timeout is not None:
|
||||
kwargs["timeout"] = self.options.stream_timeout
|
||||
call = self.raw_stub.QueryActiveAlarms(request, **kwargs)
|
||||
return _canceling_active_alarms_iterator(call)
|
||||
|
||||
async def _unary(
|
||||
self,
|
||||
operation: str,
|
||||
@@ -175,3 +211,15 @@ async def _canceling_iterator(call: Any) -> AsyncIterator[pb.MxEvent]:
|
||||
cancel = getattr(call, "cancel", None)
|
||||
if cancel is not None:
|
||||
cancel()
|
||||
|
||||
|
||||
async def _canceling_active_alarms_iterator(call: Any) -> AsyncIterator[pb.ActiveAlarmSnapshot]:
|
||||
try:
|
||||
async for snapshot in call:
|
||||
yield snapshot
|
||||
except grpc.RpcError as error:
|
||||
raise map_rpc_error("query active alarms", error) from error
|
||||
finally:
|
||||
cancel = getattr(call, "cancel", None)
|
||||
if cancel is not None:
|
||||
cancel()
|
||||
|
||||
Reference in New Issue
Block a user