Files
mxaccessgw/clients/python/tests/test_stream_timeout_fallback.py
T
Joseph Doherty 1ad0be8276 Point the Python client at the StreamAlarms alarm feed
Regenerate the Python protobuf stubs and replace query_active_alarms
with stream_alarms, an AsyncIterator over AlarmFeedMessage served by
the gateway's central alarm monitor (snapshot, snapshot_complete, then
live transitions). Drops session_id from the acknowledge surface.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 16:45:53 -04:00

139 lines
4.0 KiB
Python

"""Regression tests for Client.Python-003: stream timeout-kwarg fallback.
`stream_events_raw` and `stream_alarms` must tolerate a fake/older stub
that does not accept a ``timeout`` keyword argument, matching the fallback
already present in `galaxy.watch_deploy_events` and the unary `_unary` helper.
"""
from __future__ import annotations
from typing import Any
import pytest
from mxgateway import ClientOptions, GatewayClient
from mxgateway.generated import mxaccess_gateway_pb2 as pb
class _NoTimeoutStream:
    """Fake unary-stream callable whose call signature has no ``timeout``.

    Calling the instance records the request and metadata and hands back the
    instance itself, which then serves as the async iterator over the canned
    replies. Because ``__call__`` declares no ``timeout`` parameter, invoking
    it with ``timeout=...`` raises ``TypeError``.
    """

    def __init__(self, replies: list[Any]) -> None:
        # Copy so that draining the stream never mutates the caller's list.
        self._replies = list(replies)
        self.requests: list[Any] = []
        self.metadata: tuple[tuple[str, str], ...] | None = None
        self.cancelled = False

    def __call__(
        self,
        request: Any,
        *,
        metadata: tuple[tuple[str, str], ...],
    ) -> "_NoTimeoutStream":
        """Record the call arguments and return this object as the stream."""
        self.metadata = metadata
        self.requests.append(request)
        return self

    def __aiter__(self) -> "_NoTimeoutStream":
        """Return ``self``; the fake is its own async iterator."""
        return self

    async def __anext__(self) -> Any:
        """Yield the next canned reply in order, then stop."""
        if self._replies:
            return self._replies.pop(0)
        raise StopAsyncIteration

    def cancel(self) -> None:
        """Flag the stream as cancelled so tests can inspect it."""
        self.cancelled = True
class _NoTimeoutStubStreamEvents:
    """Minimal stub object exposing only a ``StreamEvents`` attribute."""

    def __init__(self, stream: _NoTimeoutStream) -> None:
        # The attribute name mirrors the RPC name looked up on the stub.
        self.StreamEvents = stream
class _NoTimeoutStubStreamAlarms:
    """Minimal stub object exposing only a ``StreamAlarms`` attribute."""

    def __init__(self, stream: _NoTimeoutStream) -> None:
        # The attribute name mirrors the RPC name looked up on the stub.
        self.StreamAlarms = stream
@pytest.mark.asyncio
async def test_stream_events_raw_falls_back_when_stub_rejects_timeout() -> None:
    """``stream_events_raw`` still yields events when the stub lacks ``timeout``."""
    canned_event = pb.MxEvent(session_id="session-1", worker_sequence=1)
    stream = _NoTimeoutStream([canned_event])
    options = ClientOptions(endpoint="fake", plaintext=True, stream_timeout=5.0)
    client = await GatewayClient.connect(
        options,
        stub=_NoTimeoutStubStreamEvents(stream),
    )

    # stream_timeout is set, yet the fake's __call__ has no ``timeout``
    # parameter; the single canned event must still come through.
    request = pb.StreamEventsRequest(session_id="session-1")
    received = []
    async for event in client.stream_events_raw(request):
        received.append(event)

    assert len(received) == 1
    assert received[0].worker_sequence == 1
@pytest.mark.asyncio
async def test_stream_alarms_falls_back_when_stub_rejects_timeout() -> None:
    """``stream_alarms`` still yields messages when the stub lacks ``timeout``."""
    snapshot = pb.ActiveAlarmSnapshot(alarm_full_reference="Tank01.Level.HiHi")
    feed_message = pb.AlarmFeedMessage(active_alarm=snapshot)
    stream = _NoTimeoutStream([feed_message])
    options = ClientOptions(endpoint="fake", plaintext=True, stream_timeout=5.0)
    client = await GatewayClient.connect(
        options,
        stub=_NoTimeoutStubStreamAlarms(stream),
    )

    # stream_timeout is set, yet the fake's __call__ has no ``timeout``
    # parameter; the single canned message must still come through.
    request = pb.StreamAlarmsRequest()
    received = []
    async for message in client.stream_alarms(request):
        received.append(message)

    assert len(received) == 1
    assert received[0].active_alarm.alarm_full_reference == "Tank01.Level.HiHi"
@pytest.mark.asyncio
async def test_stream_events_raw_still_passes_timeout_to_capable_stub() -> None:
    """A stub whose call accepts ``timeout`` must be handed the configured value."""
    captured: dict[str, Any] = {}

    class _CapableStream(_NoTimeoutStream):
        """Variant of the fake that accepts ``timeout`` and records it."""

        def __call__(  # type: ignore[override]
            self,
            request: Any,
            *,
            metadata: tuple[tuple[str, str], ...],
            timeout: float | None = None,
        ) -> "_CapableStream":
            # Remember what the client passed, then defer to the base fake
            # for request/metadata bookkeeping.
            captured["timeout"] = timeout
            return super().__call__(request, metadata=metadata)

    canned_event = pb.MxEvent(session_id="session-1", worker_sequence=9)
    stream = _CapableStream([canned_event])
    options = ClientOptions(endpoint="fake", plaintext=True, stream_timeout=7.5)
    client = await GatewayClient.connect(
        options,
        # The stub wrapper only exposes StreamEvents; here it carries the
        # timeout-capable stream despite the wrapper's name.
        stub=_NoTimeoutStubStreamEvents(stream),
    )

    request = pb.StreamEventsRequest(session_id="session-1")
    received = []
    async for event in client.stream_events_raw(request):
        received.append(event)

    assert len(received) == 1
    assert captured["timeout"] == 7.5