Point the Python client at the StreamAlarms alarm feed

Regenerate the Python protobuf stubs and replace query_active_alarms
with stream_alarms, an AsyncIterator over AlarmFeedMessage served by
the gateway's central alarm monitor (snapshot, snapshot_complete, then
live transitions). Drops session_id from the acknowledge surface.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-21 16:45:53 -04:00
parent 9328c4f657
commit 1ad0be8276
5 changed files with 165 additions and 152 deletions
+13 -11
View File
@@ -166,25 +166,27 @@ class GatewayClient:
ensure_protocol_success("acknowledge alarm", reply.protocol_status, reply) ensure_protocol_success("acknowledge alarm", reply.protocol_status, reply)
return reply return reply
def query_active_alarms( def stream_alarms(
self, self,
request: pb.QueryActiveAlarmsRequest, request: pb.StreamAlarmsRequest,
*, *,
metadata: Sequence[tuple[str, str]] | None = None, metadata: Sequence[tuple[str, str]] | None = None,
) -> AsyncIterator[pb.ActiveAlarmSnapshot]: ) -> AsyncIterator[pb.AlarmFeedMessage]:
"""Stream a snapshot of all alarms currently Active or ActiveAcked. """Attach to the gateway's central alarm feed.
The gateway's ConditionRefresh equivalent. Use after reconnect to seed The stream opens with one ``AlarmFeedMessage`` per currently-active
local Part 9 state, or to reconcile alarms that may have been missed alarm (the ConditionRefresh snapshot), then a single
during a transport blip. Optionally scoped by alarm-reference prefix ``snapshot_complete``, then a ``transition`` for every subsequent
(``request.alarm_filter_prefix``) so a partial refresh can target an raise / acknowledge / clear. Served by the gateway's always-on alarm
equipment sub-tree. monitor — no worker session is opened — so any number of clients may
attach. Optionally scoped by alarm-reference prefix
(``request.alarm_filter_prefix``).
""" """
kwargs: dict[str, Any] = {"metadata": merge_metadata(self.options.api_key, metadata)} kwargs: dict[str, Any] = {"metadata": merge_metadata(self.options.api_key, metadata)}
if self.options.stream_timeout is not None: if self.options.stream_timeout is not None:
kwargs["timeout"] = self.options.stream_timeout kwargs["timeout"] = self.options.stream_timeout
call = _open_stream(self.raw_stub.QueryActiveAlarms, request, kwargs) call = _open_stream(self.raw_stub.StreamAlarms, request, kwargs)
return _canceling_iterator(call, "query active alarms") return _canceling_iterator(call, "stream alarms")
async def _unary( async def _unary(
self, self,
File diff suppressed because one or more lines are too long
@@ -30,8 +30,7 @@ class MxAccessGatewayStub(object):
additively only. Never renumber or repurpose an existing field number or additively only. Never renumber or repurpose an existing field number or
enum value. When a field or enum value is removed, add a `reserved` range enum value. When a field or enum value is removed, add a `reserved` range
(and `reserved` name) covering it in the same change so a future editor (and `reserved` name) covering it in the same change so a future editor
cannot accidentally reuse the retired tag. There are no `reserved` cannot accidentally reuse the retired tag.
declarations today because no field or enum value has ever been removed.
Public client API for MXAccess sessions hosted by the gateway. Public client API for MXAccess sessions hosted by the gateway.
""" """
@@ -67,10 +66,10 @@ class MxAccessGatewayStub(object):
request_serializer=mxaccess__gateway__pb2.AcknowledgeAlarmRequest.SerializeToString, request_serializer=mxaccess__gateway__pb2.AcknowledgeAlarmRequest.SerializeToString,
response_deserializer=mxaccess__gateway__pb2.AcknowledgeAlarmReply.FromString, response_deserializer=mxaccess__gateway__pb2.AcknowledgeAlarmReply.FromString,
_registered_method=True) _registered_method=True)
self.QueryActiveAlarms = channel.unary_stream( self.StreamAlarms = channel.unary_stream(
'/mxaccess_gateway.v1.MxAccessGateway/QueryActiveAlarms', '/mxaccess_gateway.v1.MxAccessGateway/StreamAlarms',
request_serializer=mxaccess__gateway__pb2.QueryActiveAlarmsRequest.SerializeToString, request_serializer=mxaccess__gateway__pb2.StreamAlarmsRequest.SerializeToString,
response_deserializer=mxaccess__gateway__pb2.ActiveAlarmSnapshot.FromString, response_deserializer=mxaccess__gateway__pb2.AlarmFeedMessage.FromString,
_registered_method=True) _registered_method=True)
@@ -79,8 +78,7 @@ class MxAccessGatewayServicer(object):
additively only. Never renumber or repurpose an existing field number or additively only. Never renumber or repurpose an existing field number or
enum value. When a field or enum value is removed, add a `reserved` range enum value. When a field or enum value is removed, add a `reserved` range
(and `reserved` name) covering it in the same change so a future editor (and `reserved` name) covering it in the same change so a future editor
cannot accidentally reuse the retired tag. There are no `reserved` cannot accidentally reuse the retired tag.
declarations today because no field or enum value has ever been removed.
Public client API for MXAccess sessions hosted by the gateway. Public client API for MXAccess sessions hosted by the gateway.
""" """
@@ -115,8 +113,13 @@ class MxAccessGatewayServicer(object):
context.set_details('Method not implemented!') context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!') raise NotImplementedError('Method not implemented!')
def QueryActiveAlarms(self, request, context): def StreamAlarms(self, request, context):
"""Missing associated documentation comment in .proto file.""" """Session-less central alarm feed. The stream opens with the current
active-alarm snapshot (one `active_alarm` per alarm), then a single
`snapshot_complete`, then a `transition` for every subsequent change.
Served by the gateway's always-on alarm monitor; any number of clients
fan out from the single monitor without opening a worker session.
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!') context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!') raise NotImplementedError('Method not implemented!')
@@ -149,10 +152,10 @@ def add_MxAccessGatewayServicer_to_server(servicer, server):
request_deserializer=mxaccess__gateway__pb2.AcknowledgeAlarmRequest.FromString, request_deserializer=mxaccess__gateway__pb2.AcknowledgeAlarmRequest.FromString,
response_serializer=mxaccess__gateway__pb2.AcknowledgeAlarmReply.SerializeToString, response_serializer=mxaccess__gateway__pb2.AcknowledgeAlarmReply.SerializeToString,
), ),
'QueryActiveAlarms': grpc.unary_stream_rpc_method_handler( 'StreamAlarms': grpc.unary_stream_rpc_method_handler(
servicer.QueryActiveAlarms, servicer.StreamAlarms,
request_deserializer=mxaccess__gateway__pb2.QueryActiveAlarmsRequest.FromString, request_deserializer=mxaccess__gateway__pb2.StreamAlarmsRequest.FromString,
response_serializer=mxaccess__gateway__pb2.ActiveAlarmSnapshot.SerializeToString, response_serializer=mxaccess__gateway__pb2.AlarmFeedMessage.SerializeToString,
), ),
} }
generic_handler = grpc.method_handlers_generic_handler( generic_handler = grpc.method_handlers_generic_handler(
@@ -167,8 +170,7 @@ class MxAccessGateway(object):
additively only. Never renumber or repurpose an existing field number or additively only. Never renumber or repurpose an existing field number or
enum value. When a field or enum value is removed, add a `reserved` range enum value. When a field or enum value is removed, add a `reserved` range
(and `reserved` name) covering it in the same change so a future editor (and `reserved` name) covering it in the same change so a future editor
cannot accidentally reuse the retired tag. There are no `reserved` cannot accidentally reuse the retired tag.
declarations today because no field or enum value has ever been removed.
Public client API for MXAccess sessions hosted by the gateway. Public client API for MXAccess sessions hosted by the gateway.
""" """
@@ -309,7 +311,7 @@ class MxAccessGateway(object):
_registered_method=True) _registered_method=True)
@staticmethod @staticmethod
def QueryActiveAlarms(request, def StreamAlarms(request,
target, target,
options=(), options=(),
channel_credentials=None, channel_credentials=None,
@@ -322,9 +324,9 @@ class MxAccessGateway(object):
return grpc.experimental.unary_stream( return grpc.experimental.unary_stream(
request, request,
target, target,
'/mxaccess_gateway.v1.MxAccessGateway/QueryActiveAlarms', '/mxaccess_gateway.v1.MxAccessGateway/StreamAlarms',
mxaccess__gateway__pb2.QueryActiveAlarmsRequest.SerializeToString, mxaccess__gateway__pb2.StreamAlarmsRequest.SerializeToString,
mxaccess__gateway__pb2.ActiveAlarmSnapshot.FromString, mxaccess__gateway__pb2.AlarmFeedMessage.FromString,
options, options,
channel_credentials, channel_credentials,
insecure, insecure,
+62 -61
View File
@@ -1,8 +1,7 @@
"""Tests for the AcknowledgeAlarm + QueryActiveAlarms client surface (PR E.3).""" """Tests for the AcknowledgeAlarm + StreamAlarms client surface."""
from __future__ import annotations from __future__ import annotations
import asyncio
from typing import Any from typing import Any
import grpc import grpc
@@ -18,7 +17,6 @@ async def test_acknowledge_alarm_sends_request_and_returns_reply() -> None:
stub = FakeGatewayStub() stub = FakeGatewayStub()
stub.acknowledge_alarm.replies = [ stub.acknowledge_alarm.replies = [
pb.AcknowledgeAlarmReply( pb.AcknowledgeAlarmReply(
session_id="session-1",
correlation_id="corr-7", correlation_id="corr-7",
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK), protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
status=pb.MxStatusProxy(success=1, category=pb.MX_STATUS_CATEGORY_OK), status=pb.MxStatusProxy(success=1, category=pb.MX_STATUS_CATEGORY_OK),
@@ -31,7 +29,6 @@ async def test_acknowledge_alarm_sends_request_and_returns_reply() -> None:
reply = await client.acknowledge_alarm( reply = await client.acknowledge_alarm(
pb.AcknowledgeAlarmRequest( pb.AcknowledgeAlarmRequest(
session_id="session-1",
client_correlation_id="corr-7", client_correlation_id="corr-7",
alarm_full_reference="Tank01.Level.HiHi", alarm_full_reference="Tank01.Level.HiHi",
comment="investigating", comment="investigating",
@@ -61,7 +58,6 @@ async def test_acknowledge_alarm_unauthenticated_raises_typed_error() -> None:
with pytest.raises(MxGatewayAuthenticationError): with pytest.raises(MxGatewayAuthenticationError):
await client.acknowledge_alarm( await client.acknowledge_alarm(
pb.AcknowledgeAlarmRequest( pb.AcknowledgeAlarmRequest(
session_id="session-1",
alarm_full_reference="Tank01.Level.HiHi", alarm_full_reference="Tank01.Level.HiHi",
comment="", comment="",
operator_user="alice", operator_user="alice",
@@ -81,7 +77,6 @@ async def test_acknowledge_alarm_permission_denied_raises_typed_error() -> None:
with pytest.raises(MxGatewayAuthorizationError): with pytest.raises(MxGatewayAuthorizationError):
await client.acknowledge_alarm( await client.acknowledge_alarm(
pb.AcknowledgeAlarmRequest( pb.AcknowledgeAlarmRequest(
session_id="session-1",
alarm_full_reference="Tank01.Level.HiHi", alarm_full_reference="Tank01.Level.HiHi",
comment="", comment="",
operator_user="alice", operator_user="alice",
@@ -90,84 +85,90 @@ async def test_acknowledge_alarm_permission_denied_raises_typed_error() -> None:
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_query_active_alarms_streams_snapshots() -> None: async def test_stream_alarms_streams_snapshot_then_snapshot_complete() -> None:
snapshots = [ messages = [
pb.ActiveAlarmSnapshot( pb.AlarmFeedMessage(
alarm_full_reference="Tank01.Level.HiHi", active_alarm=pb.ActiveAlarmSnapshot(
current_state=pb.ALARM_CONDITION_STATE_ACTIVE, alarm_full_reference="Tank01.Level.HiHi",
severity=750, current_state=pb.ALARM_CONDITION_STATE_ACTIVE,
severity=750,
),
), ),
pb.ActiveAlarmSnapshot( pb.AlarmFeedMessage(
alarm_full_reference="Tank02.Level.HiHi", active_alarm=pb.ActiveAlarmSnapshot(
current_state=pb.ALARM_CONDITION_STATE_ACTIVE_ACKED, alarm_full_reference="Tank02.Level.HiHi",
severity=750, current_state=pb.ALARM_CONDITION_STATE_ACTIVE_ACKED,
severity=750,
),
), ),
pb.AlarmFeedMessage(snapshot_complete=True),
] ]
stream = FakeSnapshotStream(snapshots) stream = FakeAlarmFeedStream(messages)
stub = FakeGatewayStub(snapshot_stream=stream) stub = FakeGatewayStub(alarm_feed_stream=stream)
client = await GatewayClient.connect( client = await GatewayClient.connect(
ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True), ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True),
stub=stub, stub=stub,
) )
received: list[pb.ActiveAlarmSnapshot] = [] received: list[pb.AlarmFeedMessage] = []
async for snapshot in client.query_active_alarms( async for message in client.stream_alarms(pb.StreamAlarmsRequest()):
pb.QueryActiveAlarmsRequest(session_id="session-1"), received.append(message)
):
received.append(snapshot)
assert len(received) == 2 assert len(received) == 3
assert received[0].alarm_full_reference == "Tank01.Level.HiHi" assert received[0].active_alarm.alarm_full_reference == "Tank01.Level.HiHi"
assert received[0].current_state == pb.ALARM_CONDITION_STATE_ACTIVE assert received[0].active_alarm.current_state == pb.ALARM_CONDITION_STATE_ACTIVE
assert received[1].current_state == pb.ALARM_CONDITION_STATE_ACTIVE_ACKED assert received[1].active_alarm.current_state == pb.ALARM_CONDITION_STATE_ACTIVE_ACKED
assert stub.query_metadata == (("authorization", "Bearer mxgw_test_secret"),) assert received[2].snapshot_complete is True
assert stub.stream_metadata == (("authorization", "Bearer mxgw_test_secret"),)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_query_active_alarms_passes_filter_prefix() -> None: async def test_stream_alarms_passes_filter_prefix() -> None:
stream = FakeSnapshotStream([]) stream = FakeAlarmFeedStream([])
stub = FakeGatewayStub(snapshot_stream=stream) stub = FakeGatewayStub(alarm_feed_stream=stream)
client = await GatewayClient.connect( client = await GatewayClient.connect(
ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True), ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True),
stub=stub, stub=stub,
) )
iterator = client.query_active_alarms( iterator = client.stream_alarms(
pb.QueryActiveAlarmsRequest(session_id="session-1", alarm_filter_prefix="Tank01."), pb.StreamAlarmsRequest(alarm_filter_prefix="Tank01."),
) )
# Drain to trigger the stub call. # Drain to trigger the stub call.
async for _ in iterator: async for _ in iterator:
pass pass
assert stub.query_request is not None assert stub.stream_request is not None
assert stub.query_request.alarm_filter_prefix == "Tank01." assert stub.stream_request.alarm_filter_prefix == "Tank01."
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_query_active_alarms_cancels_underlying_stream_on_close() -> None: async def test_stream_alarms_cancels_underlying_stream_on_close() -> None:
snapshots = [ messages = [
pb.ActiveAlarmSnapshot( pb.AlarmFeedMessage(
alarm_full_reference="Tank01.Level.HiHi", active_alarm=pb.ActiveAlarmSnapshot(
current_state=pb.ALARM_CONDITION_STATE_ACTIVE, alarm_full_reference="Tank01.Level.HiHi",
current_state=pb.ALARM_CONDITION_STATE_ACTIVE,
),
), ),
] ]
stream = FakeSnapshotStream(snapshots) stream = FakeAlarmFeedStream(messages)
stub = FakeGatewayStub(snapshot_stream=stream) stub = FakeGatewayStub(alarm_feed_stream=stream)
client = await GatewayClient.connect( client = await GatewayClient.connect(
ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True), ClientOptions(endpoint="fake", api_key="mxgw_test_secret", plaintext=True),
stub=stub, stub=stub,
) )
iterator = client.query_active_alarms(pb.QueryActiveAlarmsRequest(session_id="session-1")) iterator = client.stream_alarms(pb.StreamAlarmsRequest())
first = await anext(iterator) first = await anext(iterator)
await iterator.aclose() await iterator.aclose()
assert first.alarm_full_reference == "Tank01.Level.HiHi" assert first.active_alarm.alarm_full_reference == "Tank01.Level.HiHi"
assert stream.cancelled assert stream.cancelled
class FakeGatewayStub: class FakeGatewayStub:
def __init__(self, snapshot_stream: "FakeSnapshotStream | None" = None) -> None: def __init__(self, alarm_feed_stream: "FakeAlarmFeedStream | None" = None) -> None:
self.open_session = FakeUnary( self.open_session = FakeUnary(
[ [
pb.OpenSessionReply( pb.OpenSessionReply(
@@ -179,19 +180,19 @@ class FakeGatewayStub:
self.acknowledge_alarm = FakeUnary([]) self.acknowledge_alarm = FakeUnary([])
self.OpenSession = self.open_session self.OpenSession = self.open_session
self.AcknowledgeAlarm = self.acknowledge_alarm self.AcknowledgeAlarm = self.acknowledge_alarm
self._snapshot_stream = snapshot_stream or FakeSnapshotStream([]) self._alarm_feed_stream = alarm_feed_stream or FakeAlarmFeedStream([])
self.query_request: pb.QueryActiveAlarmsRequest | None = None self.stream_request: pb.StreamAlarmsRequest | None = None
self.query_metadata: tuple[tuple[str, str], ...] | None = None self.stream_metadata: tuple[tuple[str, str], ...] | None = None
def QueryActiveAlarms( def StreamAlarms(
self, self,
request: pb.QueryActiveAlarmsRequest, request: pb.StreamAlarmsRequest,
*, *,
metadata: tuple[tuple[str, str], ...], metadata: tuple[tuple[str, str], ...],
) -> "FakeSnapshotStream": ) -> "FakeAlarmFeedStream":
self.query_request = request self.stream_request = request
self.query_metadata = metadata self.stream_metadata = metadata
return self._snapshot_stream return self._alarm_feed_stream
class FakeUnary: class FakeUnary:
@@ -214,18 +215,18 @@ class FakeUnary:
return self.replies.pop(0) return self.replies.pop(0)
class FakeSnapshotStream: class FakeAlarmFeedStream:
def __init__(self, snapshots: list[pb.ActiveAlarmSnapshot]) -> None: def __init__(self, messages: list[pb.AlarmFeedMessage]) -> None:
self._snapshots = list(snapshots) self._messages = list(messages)
self.cancelled = False self.cancelled = False
def __aiter__(self) -> "FakeSnapshotStream": def __aiter__(self) -> "FakeAlarmFeedStream":
return self return self
async def __anext__(self) -> pb.ActiveAlarmSnapshot: async def __anext__(self) -> pb.AlarmFeedMessage:
if not self._snapshots: if not self._messages:
raise StopAsyncIteration raise StopAsyncIteration
return self._snapshots.pop(0) return self._messages.pop(0)
def cancel(self) -> None: def cancel(self) -> None:
self.cancelled = True self.cancelled = True
@@ -1,6 +1,6 @@
"""Regression tests for Client.Python-003: stream timeout-kwarg fallback. """Regression tests for Client.Python-003: stream timeout-kwarg fallback.
`stream_events_raw` and `query_active_alarms` must tolerate a fake/older stub `stream_events_raw` and `stream_alarms` must tolerate a fake/older stub
that does not accept a ``timeout`` keyword argument, matching the fallback that does not accept a ``timeout`` keyword argument, matching the fallback
already present in `galaxy.watch_deploy_events` and the unary `_unary` helper. already present in `galaxy.watch_deploy_events` and the unary `_unary` helper.
""" """
@@ -51,9 +51,9 @@ class _NoTimeoutStubStreamEvents:
self.StreamEvents = stream self.StreamEvents = stream
class _NoTimeoutStubQueryAlarms: class _NoTimeoutStubStreamAlarms:
def __init__(self, stream: _NoTimeoutStream) -> None: def __init__(self, stream: _NoTimeoutStream) -> None:
self.QueryActiveAlarms = stream self.StreamAlarms = stream
@pytest.mark.asyncio @pytest.mark.asyncio
@@ -78,24 +78,30 @@ async def test_stream_events_raw_falls_back_when_stub_rejects_timeout() -> None:
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_query_active_alarms_falls_back_when_stub_rejects_timeout() -> None: async def test_stream_alarms_falls_back_when_stub_rejects_timeout() -> None:
stream = _NoTimeoutStream( stream = _NoTimeoutStream(
[pb.ActiveAlarmSnapshot(alarm_full_reference="Tank01.Level.HiHi")], [
pb.AlarmFeedMessage(
active_alarm=pb.ActiveAlarmSnapshot(
alarm_full_reference="Tank01.Level.HiHi",
),
),
],
) )
client = await GatewayClient.connect( client = await GatewayClient.connect(
ClientOptions(endpoint="fake", plaintext=True, stream_timeout=5.0), ClientOptions(endpoint="fake", plaintext=True, stream_timeout=5.0),
stub=_NoTimeoutStubQueryAlarms(stream), stub=_NoTimeoutStubStreamAlarms(stream),
) )
received = [ received = [
snapshot message
async for snapshot in client.query_active_alarms( async for message in client.stream_alarms(
pb.QueryActiveAlarmsRequest(session_id="session-1"), pb.StreamAlarmsRequest(),
) )
] ]
assert len(received) == 1 assert len(received) == 1
assert received[0].alarm_full_reference == "Tank01.Level.HiHi" assert received[0].active_alarm.alarm_full_reference == "Tank01.Level.HiHi"
@pytest.mark.asyncio @pytest.mark.asyncio