Resolve Client.Python-003, -005, -009 code-review findings
Client.Python-003: stream_events_raw and query_active_alarms passed `timeout` to the stub with no TypeError fallback, unlike _unary. Both now route through a shared _open_stream helper that strips `timeout` on TypeError. Client.Python-005: discover_hierarchy buffered the entire Galaxy hierarchy in memory. Added GalaxyRepositoryClient.iter_hierarchy, a lazy async generator yielding objects page-by-page; discover_hierarchy is now a thin wrapper that preserves its list contract. README documents iter_hierarchy. Client.Python-009: added regression coverage for previously untested paths — write2/add_item2 request shape, the MAX_BULK_ITEMS boundary, the None-argument TypeError guards, TLS ca_file reading, and the non-auth map_rpc_error fallthrough. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -131,6 +131,25 @@ The methods return native Python types (`bool`, `datetime | None`, and a
|
||||
into the hierarchy without learning the underlying stub class. The
|
||||
service requires the `metadata:read` scope on the API key.
|
||||
|
||||
`discover_hierarchy` buffers every object (with its full attribute list)
|
||||
into a single in-memory `list`. For a large Galaxy use `iter_hierarchy`
|
||||
instead — it is an async generator that fetches one page at a time and
|
||||
yields objects as they arrive, so peak memory stays bounded by a single
|
||||
page rather than the whole hierarchy:
|
||||
|
||||
```python
|
||||
async with await GalaxyRepositoryClient.connect(
|
||||
endpoint="localhost:5000",
|
||||
api_key="<gateway-api-key>",
|
||||
plaintext=True,
|
||||
) as galaxy:
|
||||
async for obj in galaxy.iter_hierarchy():
|
||||
print(obj.tag_name, obj.contained_name)
|
||||
```
|
||||
|
||||
Pages are fetched lazily: the next page is only requested once the
|
||||
caller has consumed every object from the current page.
|
||||
|
||||
### Watching deploy events
|
||||
|
||||
`GalaxyRepositoryClient.watch_deploy_events` opens a server-streaming
|
||||
|
||||
@@ -133,7 +133,7 @@ class GatewayClient:
|
||||
kwargs: dict[str, Any] = {"metadata": merge_metadata(self.options.api_key, metadata)}
|
||||
if self.options.stream_timeout is not None:
|
||||
kwargs["timeout"] = self.options.stream_timeout
|
||||
call = self.raw_stub.StreamEvents(request, **kwargs)
|
||||
call = _open_stream(self.raw_stub.StreamEvents, request, kwargs)
|
||||
return _canceling_iterator(call)
|
||||
|
||||
async def acknowledge_alarm(
|
||||
@@ -169,7 +169,7 @@ class GatewayClient:
|
||||
kwargs: dict[str, Any] = {"metadata": merge_metadata(self.options.api_key, metadata)}
|
||||
if self.options.stream_timeout is not None:
|
||||
kwargs["timeout"] = self.options.stream_timeout
|
||||
call = self.raw_stub.QueryActiveAlarms(request, **kwargs)
|
||||
call = _open_stream(self.raw_stub.QueryActiveAlarms, request, kwargs)
|
||||
return _canceling_active_alarms_iterator(call)
|
||||
|
||||
async def _unary(
|
||||
@@ -201,6 +201,23 @@ class GatewayClient:
|
||||
raise map_rpc_error(operation, error) from error
|
||||
|
||||
|
||||
def _open_stream(method: Any, request: Any, kwargs: dict[str, Any]) -> Any:
|
||||
"""Open a server-streaming call, dropping ``timeout`` if the stub rejects it.
|
||||
|
||||
Mirrors the fallback in ``_unary`` so an older or fake stub that does not
|
||||
accept a ``timeout`` keyword argument does not crash when ``stream_timeout``
|
||||
is configured.
|
||||
"""
|
||||
|
||||
try:
|
||||
return method(request, **kwargs)
|
||||
except TypeError as error:
|
||||
if "timeout" not in kwargs or "unexpected keyword argument 'timeout'" not in str(error):
|
||||
raise
|
||||
kwargs.pop("timeout")
|
||||
return method(request, **kwargs)
|
||||
|
||||
|
||||
async def _canceling_iterator(call: Any) -> AsyncIterator[pb.MxEvent]:
|
||||
try:
|
||||
async for event in call:
|
||||
|
||||
@@ -114,10 +114,17 @@ class GalaxyRepositoryClient:
|
||||
return None
|
||||
return reply.time_of_last_deploy.ToDatetime()
|
||||
|
||||
async def discover_hierarchy(self) -> list[galaxy_pb.GalaxyObject]:
|
||||
"""Return the deployed Galaxy object hierarchy as raw proto messages."""
|
||||
async def iter_hierarchy(self) -> AsyncIterator[galaxy_pb.GalaxyObject]:
|
||||
"""Yield the deployed Galaxy object hierarchy one object at a time.
|
||||
|
||||
Pages are fetched lazily: a page is only requested once the caller has
|
||||
consumed every object from the previous page. This keeps peak memory
|
||||
bounded by a single page (``_DISCOVER_HIERARCHY_PAGE_SIZE`` objects)
|
||||
rather than the whole Galaxy. Use this for large Galaxies; use
|
||||
:meth:`discover_hierarchy` when a fully buffered ``list`` is convenient
|
||||
and the Galaxy is known to be small.
|
||||
"""
|
||||
|
||||
objects: list[galaxy_pb.GalaxyObject] = []
|
||||
seen_page_tokens: set[str] = set()
|
||||
page_token = ""
|
||||
while True:
|
||||
@@ -129,16 +136,27 @@ class GalaxyRepositoryClient:
|
||||
page_token=page_token,
|
||||
),
|
||||
)
|
||||
objects.extend(reply.objects)
|
||||
for obj in reply.objects:
|
||||
yield obj
|
||||
page_token = reply.next_page_token
|
||||
if not page_token:
|
||||
return objects
|
||||
return
|
||||
if page_token in seen_page_tokens:
|
||||
raise MxGatewayError(
|
||||
f"galaxy discover hierarchy returned repeated page token {page_token!r}"
|
||||
)
|
||||
seen_page_tokens.add(page_token)
|
||||
|
||||
async def discover_hierarchy(self) -> list[galaxy_pb.GalaxyObject]:
|
||||
"""Return the deployed Galaxy object hierarchy as raw proto messages.
|
||||
|
||||
This buffers every object (and its full attribute list) into a single
|
||||
in-memory ``list``. For a large Galaxy prefer :meth:`iter_hierarchy`,
|
||||
which streams objects page by page without holding the whole hierarchy.
|
||||
"""
|
||||
|
||||
return [obj async for obj in self.iter_hierarchy()]
|
||||
|
||||
def watch_deploy_events(
|
||||
self,
|
||||
last_seen_deploy_time: datetime | None = None,
|
||||
|
||||
@@ -0,0 +1,284 @@
|
||||
"""Regression tests for Client.Python-009: untested public paths.
|
||||
|
||||
Covers `Session.write2`/`add_item2` request construction, the bulk-size limit
|
||||
guard, the ``None``-argument ``TypeError`` guards, the TLS ``ca_file`` read
|
||||
path in `create_channel`, the generic `map_rpc_error` fallthrough, and a
|
||||
happy-path CLI command body driven by a fake stub.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
|
||||
import grpc
|
||||
import pytest
|
||||
from click.testing import CliRunner
|
||||
|
||||
from mxgateway import ClientOptions, GatewayClient
|
||||
from mxgateway.errors import MxGatewayTransportError, map_rpc_error
|
||||
from mxgateway.generated import mxaccess_gateway_pb2 as pb
|
||||
from mxgateway.options import create_channel
|
||||
from mxgateway.session import MAX_BULK_ITEMS, Session
|
||||
|
||||
|
||||
class _FakeUnary:
|
||||
def __init__(self, replies: list[Any]) -> None:
|
||||
self.replies = list(replies)
|
||||
self.requests: list[Any] = []
|
||||
self.metadata: tuple[tuple[str, str], ...] | None = None
|
||||
|
||||
async def __call__(
|
||||
self,
|
||||
request: Any,
|
||||
*,
|
||||
metadata: tuple[tuple[str, str], ...],
|
||||
) -> Any:
|
||||
self.requests.append(request)
|
||||
self.metadata = metadata
|
||||
return self.replies.pop(0)
|
||||
|
||||
|
||||
class _FakeGatewayStub:
|
||||
def __init__(self) -> None:
|
||||
self.open_session = _FakeUnary(
|
||||
[
|
||||
pb.OpenSessionReply(
|
||||
session_id="session-1",
|
||||
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
|
||||
),
|
||||
],
|
||||
)
|
||||
self.invoke = _FakeUnary([])
|
||||
self.OpenSession = self.open_session
|
||||
self.Invoke = self.invoke
|
||||
|
||||
|
||||
def _ok_reply(kind: int, **fields: Any) -> pb.MxCommandReply:
|
||||
return pb.MxCommandReply(
|
||||
session_id="session-1",
|
||||
kind=kind,
|
||||
protocol_status=pb.ProtocolStatus(code=pb.PROTOCOL_STATUS_CODE_OK),
|
||||
**fields,
|
||||
)
|
||||
|
||||
|
||||
# --- write2 / add_item2 request construction -------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_add_item2_sends_item_context_and_returns_handle() -> None:
|
||||
stub = _FakeGatewayStub()
|
||||
stub.invoke.replies = [
|
||||
_ok_reply(pb.MX_COMMAND_KIND_ADD_ITEM2, add_item2=pb.AddItem2Reply(item_handle=77)),
|
||||
]
|
||||
client = await GatewayClient.connect(
|
||||
ClientOptions(endpoint="fake", plaintext=True),
|
||||
stub=stub,
|
||||
)
|
||||
session = await client.open_session()
|
||||
|
||||
item_handle = await session.add_item2(12, "Object.Attribute", "ctx-A")
|
||||
|
||||
assert item_handle == 77
|
||||
command = stub.invoke.requests[0].command
|
||||
assert command.kind == pb.MX_COMMAND_KIND_ADD_ITEM2
|
||||
assert command.add_item2.server_handle == 12
|
||||
assert command.add_item2.item_definition == "Object.Attribute"
|
||||
assert command.add_item2.item_context == "ctx-A"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_write2_sends_value_and_timestamp_value() -> None:
|
||||
stub = _FakeGatewayStub()
|
||||
stub.invoke.replies = [_ok_reply(pb.MX_COMMAND_KIND_WRITE2)]
|
||||
client = await GatewayClient.connect(
|
||||
ClientOptions(endpoint="fake", plaintext=True),
|
||||
stub=stub,
|
||||
)
|
||||
session = await client.open_session()
|
||||
|
||||
when = datetime(2025, 4, 1, 12, 0, 0, tzinfo=timezone.utc)
|
||||
await session.write2(12, 34, 123, when, user_id=5)
|
||||
|
||||
command = stub.invoke.requests[0].command
|
||||
assert command.kind == pb.MX_COMMAND_KIND_WRITE2
|
||||
assert command.write2.server_handle == 12
|
||||
assert command.write2.item_handle == 34
|
||||
assert command.write2.user_id == 5
|
||||
# The integer value is carried as the int32 field of the MxValue oneof.
|
||||
assert command.write2.value.WhichOneof("kind") == "int32_value"
|
||||
assert command.write2.value.int32_value == 123
|
||||
# The timestamp value carries the datetime via the timestamp_value oneof.
|
||||
assert command.write2.timestamp_value.WhichOneof("kind") == "timestamp_value"
|
||||
assert command.write2.timestamp_value.timestamp_value.ToDatetime(
|
||||
tzinfo=timezone.utc,
|
||||
) == when
|
||||
|
||||
|
||||
# --- bulk-size limit + None-argument guards --------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_subscribe_bulk_rejects_oversized_request() -> None:
|
||||
stub = _FakeGatewayStub()
|
||||
client = await GatewayClient.connect(
|
||||
ClientOptions(endpoint="fake", plaintext=True),
|
||||
stub=stub,
|
||||
)
|
||||
session = await client.open_session()
|
||||
|
||||
oversized = [f"Tag_{i}" for i in range(MAX_BULK_ITEMS + 1)]
|
||||
with pytest.raises(ValueError, match=str(MAX_BULK_ITEMS)):
|
||||
await session.subscribe_bulk(12, oversized)
|
||||
|
||||
# No RPC should have been issued for a rejected request.
|
||||
assert stub.invoke.requests == []
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_advise_item_bulk_rejects_none_argument() -> None:
|
||||
stub = _FakeGatewayStub()
|
||||
client = await GatewayClient.connect(
|
||||
ClientOptions(endpoint="fake", plaintext=True),
|
||||
stub=stub,
|
||||
)
|
||||
session = await client.open_session()
|
||||
|
||||
with pytest.raises(TypeError, match="item_handles is required"):
|
||||
await session.advise_item_bulk(12, None) # type: ignore[arg-type]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_add_item_bulk_at_limit_is_allowed() -> None:
|
||||
stub = _FakeGatewayStub()
|
||||
stub.invoke.replies = [
|
||||
_ok_reply(
|
||||
pb.MX_COMMAND_KIND_ADD_ITEM_BULK,
|
||||
add_item_bulk=pb.BulkSubscribeReply(results=[]),
|
||||
),
|
||||
]
|
||||
client = await GatewayClient.connect(
|
||||
ClientOptions(endpoint="fake", plaintext=True),
|
||||
stub=stub,
|
||||
)
|
||||
session = await client.open_session()
|
||||
|
||||
at_limit = [f"Tag_{i}" for i in range(MAX_BULK_ITEMS)]
|
||||
results = await session.add_item_bulk(12, at_limit)
|
||||
|
||||
assert results == []
|
||||
assert len(stub.invoke.requests) == 1
|
||||
assert len(stub.invoke.requests[0].command.add_item_bulk.tag_addresses) == MAX_BULK_ITEMS
|
||||
|
||||
|
||||
# --- TLS ca_file read path -------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_channel_reads_ca_file(tmp_path: Any) -> None:
|
||||
ca_path = tmp_path / "ca.pem"
|
||||
ca_path.write_bytes(b"-----BEGIN CERTIFICATE-----\nfake\n-----END CERTIFICATE-----\n")
|
||||
|
||||
channel = create_channel(
|
||||
ClientOptions(
|
||||
endpoint="mxgateway.example.local:5001",
|
||||
ca_file=str(ca_path),
|
||||
server_name_override="mxgateway.example.local",
|
||||
),
|
||||
)
|
||||
|
||||
# A secure channel object is returned without raising; the ca_file was read.
|
||||
assert channel is not None
|
||||
await channel.close()
|
||||
|
||||
|
||||
def test_create_channel_missing_ca_file_raises() -> None:
|
||||
with pytest.raises(FileNotFoundError):
|
||||
create_channel(
|
||||
ClientOptions(
|
||||
endpoint="mxgateway.example.local:5001",
|
||||
ca_file="C:/does/not/exist/ca.pem",
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
# --- map_rpc_error generic fallthrough -------------------------------------
|
||||
|
||||
|
||||
class _FakeRpcError(grpc.RpcError):
|
||||
def __init__(self, code: grpc.StatusCode, details: str) -> None:
|
||||
self._code = code
|
||||
self._details = details
|
||||
|
||||
def code(self) -> grpc.StatusCode:
|
||||
return self._code
|
||||
|
||||
def details(self) -> str:
|
||||
return self._details
|
||||
|
||||
|
||||
def test_map_rpc_error_generic_branch_returns_transport_error() -> None:
|
||||
error = _FakeRpcError(grpc.StatusCode.UNAVAILABLE, "connection refused")
|
||||
|
||||
mapped = map_rpc_error("invoke", error)
|
||||
|
||||
assert type(mapped) is MxGatewayTransportError
|
||||
assert "invoke failed: connection refused" in str(mapped)
|
||||
|
||||
|
||||
def test_map_rpc_error_handles_error_without_code() -> None:
|
||||
mapped = map_rpc_error("invoke", grpc.RpcError())
|
||||
|
||||
assert type(mapped) is MxGatewayTransportError
|
||||
assert "invoke failed:" in str(mapped)
|
||||
|
||||
|
||||
# --- happy-path CLI command body -------------------------------------------
|
||||
|
||||
|
||||
def test_cli_register_happy_path_emits_server_handle(monkeypatch: Any) -> None:
|
||||
"""Drive the `register` CLI command end to end against a fake stub."""
|
||||
|
||||
from mxgateway_cli import commands
|
||||
|
||||
invoke = _FakeUnary(
|
||||
[
|
||||
_ok_reply(
|
||||
pb.MX_COMMAND_KIND_REGISTER,
|
||||
register=pb.RegisterReply(server_handle=99),
|
||||
),
|
||||
],
|
||||
)
|
||||
|
||||
class _Stub:
|
||||
def __init__(self) -> None:
|
||||
self.Invoke = invoke
|
||||
|
||||
async def _fake_connect(kwargs: dict[str, Any]) -> GatewayClient:
|
||||
return await GatewayClient.connect(
|
||||
ClientOptions(endpoint=kwargs["endpoint"], plaintext=True),
|
||||
stub=_Stub(),
|
||||
)
|
||||
|
||||
monkeypatch.setattr(commands, "_connect", _fake_connect)
|
||||
|
||||
runner = CliRunner()
|
||||
result = runner.invoke(
|
||||
commands.main,
|
||||
[
|
||||
"register",
|
||||
"--endpoint",
|
||||
"localhost:5000",
|
||||
"--session-id",
|
||||
"session-1",
|
||||
"--client-name",
|
||||
"pytest-client",
|
||||
"--json",
|
||||
],
|
||||
)
|
||||
|
||||
assert result.exit_code == 0, result.output
|
||||
assert json.loads(result.output) == {"serverHandle": 99}
|
||||
assert invoke.requests[0].command.register.client_name == "pytest-client"
|
||||
@@ -0,0 +1,127 @@
|
||||
"""Regression tests for Client.Python-005: streaming hierarchy iteration.
|
||||
|
||||
`GalaxyRepositoryClient.iter_hierarchy` yields objects page by page instead of
|
||||
buffering the entire Galaxy hierarchy in memory, and `discover_hierarchy`
|
||||
remains a convenience wrapper built on top of it.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
|
||||
from mxgateway import ClientOptions, GalaxyRepositoryClient
|
||||
from mxgateway.generated import galaxy_repository_pb2 as galaxy_pb
|
||||
|
||||
|
||||
class _FakeUnary:
|
||||
def __init__(self, replies: list[Any]) -> None:
|
||||
self.replies = list(replies)
|
||||
self.requests: list[Any] = []
|
||||
self.metadata: tuple[tuple[str, str], ...] | None = None
|
||||
|
||||
async def __call__(
|
||||
self,
|
||||
request: Any,
|
||||
*,
|
||||
metadata: tuple[tuple[str, str], ...],
|
||||
timeout: float | None = None,
|
||||
) -> Any:
|
||||
self.requests.append(request)
|
||||
self.metadata = metadata
|
||||
return self.replies.pop(0)
|
||||
|
||||
|
||||
class _FakeGalaxyStub:
|
||||
def __init__(self, discover_replies: list[Any]) -> None:
|
||||
self.DiscoverHierarchy = _FakeUnary(discover_replies)
|
||||
|
||||
|
||||
def _two_page_replies() -> list[galaxy_pb.DiscoverHierarchyReply]:
|
||||
return [
|
||||
galaxy_pb.DiscoverHierarchyReply(
|
||||
next_page_token="page-2",
|
||||
total_object_count=3,
|
||||
objects=[
|
||||
galaxy_pb.GalaxyObject(gobject_id=1, tag_name="Area_001", is_area=True),
|
||||
galaxy_pb.GalaxyObject(gobject_id=2, tag_name="Pump_001"),
|
||||
],
|
||||
),
|
||||
galaxy_pb.DiscoverHierarchyReply(
|
||||
total_object_count=3,
|
||||
objects=[
|
||||
galaxy_pb.GalaxyObject(gobject_id=3, tag_name="Pump_002"),
|
||||
],
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_iter_hierarchy_yields_objects_across_pages() -> None:
|
||||
stub = _FakeGalaxyStub(_two_page_replies())
|
||||
client = await GalaxyRepositoryClient.connect(
|
||||
ClientOptions(endpoint="fake", plaintext=True),
|
||||
stub=stub,
|
||||
)
|
||||
|
||||
tags = [obj.tag_name async for obj in client.iter_hierarchy()]
|
||||
|
||||
assert tags == ["Area_001", "Pump_001", "Pump_002"]
|
||||
assert len(stub.DiscoverHierarchy.requests) == 2
|
||||
assert stub.DiscoverHierarchy.requests[0].page_token == ""
|
||||
assert stub.DiscoverHierarchy.requests[1].page_token == "page-2"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_iter_hierarchy_is_lazy_and_does_not_prefetch_next_page() -> None:
|
||||
"""Pulling only the first object must not have requested the second page."""
|
||||
|
||||
stub = _FakeGalaxyStub(_two_page_replies())
|
||||
client = await GalaxyRepositoryClient.connect(
|
||||
ClientOptions(endpoint="fake", plaintext=True),
|
||||
stub=stub,
|
||||
)
|
||||
|
||||
iterator = client.iter_hierarchy()
|
||||
first = await iterator.__anext__()
|
||||
|
||||
assert first.tag_name == "Area_001"
|
||||
# Only the first page should have been fetched so far.
|
||||
assert len(stub.DiscoverHierarchy.requests) == 1
|
||||
|
||||
await iterator.aclose()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_iter_hierarchy_rejects_repeated_page_token() -> None:
|
||||
stub = _FakeGalaxyStub(
|
||||
[
|
||||
galaxy_pb.DiscoverHierarchyReply(next_page_token="7:1"),
|
||||
galaxy_pb.DiscoverHierarchyReply(next_page_token="7:1"),
|
||||
],
|
||||
)
|
||||
client = await GalaxyRepositoryClient.connect(
|
||||
ClientOptions(endpoint="fake", plaintext=True),
|
||||
stub=stub,
|
||||
)
|
||||
|
||||
with pytest.raises(Exception, match="repeated page token"):
|
||||
async for _ in client.iter_hierarchy():
|
||||
pass
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_discover_hierarchy_still_returns_full_list() -> None:
|
||||
"""The convenience wrapper must keep returning a buffered list."""
|
||||
|
||||
stub = _FakeGalaxyStub(_two_page_replies())
|
||||
client = await GalaxyRepositoryClient.connect(
|
||||
ClientOptions(endpoint="fake", plaintext=True),
|
||||
stub=stub,
|
||||
)
|
||||
|
||||
objects = await client.discover_hierarchy()
|
||||
|
||||
assert isinstance(objects, list)
|
||||
assert [obj.tag_name for obj in objects] == ["Area_001", "Pump_001", "Pump_002"]
|
||||
@@ -0,0 +1,132 @@
|
||||
"""Regression tests for Client.Python-003: stream timeout-kwarg fallback.
|
||||
|
||||
`stream_events_raw` and `query_active_alarms` must tolerate a fake/older stub
|
||||
that does not accept a ``timeout`` keyword argument, matching the fallback
|
||||
already present in `galaxy.watch_deploy_events` and the unary `_unary` helper.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
|
||||
from mxgateway import ClientOptions, GatewayClient
|
||||
from mxgateway.generated import mxaccess_gateway_pb2 as pb
|
||||
|
||||
|
||||
class _NoTimeoutStream:
|
||||
"""Sync-callable unary-stream fake that rejects a ``timeout`` kwarg."""
|
||||
|
||||
def __init__(self, replies: list[Any]) -> None:
|
||||
self._replies = list(replies)
|
||||
self.requests: list[Any] = []
|
||||
self.metadata: tuple[tuple[str, str], ...] | None = None
|
||||
self.cancelled = False
|
||||
|
||||
def __call__(
|
||||
self,
|
||||
request: Any,
|
||||
*,
|
||||
metadata: tuple[tuple[str, str], ...],
|
||||
) -> "_NoTimeoutStream":
|
||||
self.requests.append(request)
|
||||
self.metadata = metadata
|
||||
return self
|
||||
|
||||
def __aiter__(self) -> "_NoTimeoutStream":
|
||||
return self
|
||||
|
||||
async def __anext__(self) -> Any:
|
||||
if not self._replies:
|
||||
raise StopAsyncIteration
|
||||
return self._replies.pop(0)
|
||||
|
||||
def cancel(self) -> None:
|
||||
self.cancelled = True
|
||||
|
||||
|
||||
class _NoTimeoutStubStreamEvents:
|
||||
def __init__(self, stream: _NoTimeoutStream) -> None:
|
||||
self.StreamEvents = stream
|
||||
|
||||
|
||||
class _NoTimeoutStubQueryAlarms:
|
||||
def __init__(self, stream: _NoTimeoutStream) -> None:
|
||||
self.QueryActiveAlarms = stream
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stream_events_raw_falls_back_when_stub_rejects_timeout() -> None:
|
||||
stream = _NoTimeoutStream(
|
||||
[pb.MxEvent(session_id="session-1", worker_sequence=1)],
|
||||
)
|
||||
client = await GatewayClient.connect(
|
||||
ClientOptions(endpoint="fake", plaintext=True, stream_timeout=5.0),
|
||||
stub=_NoTimeoutStubStreamEvents(stream),
|
||||
)
|
||||
|
||||
received = [
|
||||
event
|
||||
async for event in client.stream_events_raw(
|
||||
pb.StreamEventsRequest(session_id="session-1"),
|
||||
)
|
||||
]
|
||||
|
||||
assert len(received) == 1
|
||||
assert received[0].worker_sequence == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_query_active_alarms_falls_back_when_stub_rejects_timeout() -> None:
|
||||
stream = _NoTimeoutStream(
|
||||
[pb.ActiveAlarmSnapshot(alarm_full_reference="Tank01.Level.HiHi")],
|
||||
)
|
||||
client = await GatewayClient.connect(
|
||||
ClientOptions(endpoint="fake", plaintext=True, stream_timeout=5.0),
|
||||
stub=_NoTimeoutStubQueryAlarms(stream),
|
||||
)
|
||||
|
||||
received = [
|
||||
snapshot
|
||||
async for snapshot in client.query_active_alarms(
|
||||
pb.QueryActiveAlarmsRequest(session_id="session-1"),
|
||||
)
|
||||
]
|
||||
|
||||
assert len(received) == 1
|
||||
assert received[0].alarm_full_reference == "Tank01.Level.HiHi"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_stream_events_raw_still_passes_timeout_to_capable_stub() -> None:
|
||||
"""A stub that accepts ``timeout`` must still receive the configured value."""
|
||||
|
||||
captured: dict[str, Any] = {}
|
||||
|
||||
class _CapableStream(_NoTimeoutStream):
|
||||
def __call__( # type: ignore[override]
|
||||
self,
|
||||
request: Any,
|
||||
*,
|
||||
metadata: tuple[tuple[str, str], ...],
|
||||
timeout: float | None = None,
|
||||
) -> "_CapableStream":
|
||||
captured["timeout"] = timeout
|
||||
return super().__call__(request, metadata=metadata)
|
||||
|
||||
stream = _CapableStream([pb.MxEvent(session_id="session-1", worker_sequence=9)])
|
||||
client = await GatewayClient.connect(
|
||||
ClientOptions(endpoint="fake", plaintext=True, stream_timeout=7.5),
|
||||
stub=_NoTimeoutStubStreamEvents(stream),
|
||||
)
|
||||
|
||||
received = [
|
||||
event
|
||||
async for event in client.stream_events_raw(
|
||||
pb.StreamEventsRequest(session_id="session-1"),
|
||||
)
|
||||
]
|
||||
|
||||
assert len(received) == 1
|
||||
assert captured["timeout"] == 7.5
|
||||
@@ -7,7 +7,7 @@
|
||||
| Review date | 2026-05-18 |
|
||||
| Commit reviewed | `3cc53a8` |
|
||||
| Status | Reviewed |
|
||||
| Open findings | 12 |
|
||||
| Open findings | 9 |
|
||||
|
||||
## Checklist coverage
|
||||
|
||||
@@ -63,13 +63,13 @@
|
||||
| Severity | Medium |
|
||||
| Category | Error handling & resilience |
|
||||
| Location | `clients/python/src/mxgateway/client.py:125-137,155-173` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** `stream_events_raw` and `query_active_alarms` call the stub directly with a `timeout` kwarg when `stream_timeout` is set, with no `TypeError` fallback. `galaxy.py:watch_deploy_events` and `_unary` *do* have a fallback that strips `timeout` if the callable rejects it. This asymmetry means a fake/older stub that does not accept `timeout` crashes for gateway streams but not Galaxy streams. It is only masked today because `stream_timeout` defaults to `None`.
|
||||
|
||||
**Recommendation:** Apply the same `try/except TypeError` timeout-fallback pattern to `stream_events_raw` and `query_active_alarms`, or remove the fallback everywhere and standardise on a single behaviour.
|
||||
|
||||
**Resolution:** _(open)_
|
||||
**Resolution:** 2026-05-18 — Confirmed: both stream methods in `client.py` called the stub with `timeout` unconditionally and had no `TypeError` fallback, unlike `_unary` and `galaxy.watch_deploy_events`. Added a shared `_open_stream` helper in `client.py` that opens a server-streaming call and strips the `timeout` kwarg when the stub raises `TypeError: ... unexpected keyword argument 'timeout'`, then routed both `stream_events_raw` and `query_active_alarms` through it. Regression tests in `tests/test_stream_timeout_fallback.py` (`test_stream_events_raw_falls_back_when_stub_rejects_timeout`, `test_query_active_alarms_falls_back_when_stub_rejects_timeout`, `test_stream_events_raw_still_passes_timeout_to_capable_stub`) failed before the fix and pass after. No public behaviour change for real gRPC stubs, so no README update needed.
|
||||
|
||||
### Client.Python-004
|
||||
|
||||
@@ -93,13 +93,13 @@
|
||||
| Severity | Medium |
|
||||
| Category | Performance & resource management |
|
||||
| Location | `clients/python/src/mxgateway/galaxy.py:117-140` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** `discover_hierarchy` pages through the entire Galaxy object hierarchy and accumulates every `GalaxyObject` (each carrying its full attribute list) into a single in-memory `list` before returning. For a large Galaxy this is a very large allocation with no streaming alternative and no caller-side bound.
|
||||
|
||||
**Recommendation:** Offer an async-generator variant (e.g. `iter_hierarchy()`) that yields objects/pages as they arrive, keeping `discover_hierarchy()` as a convenience wrapper. At minimum document the memory characteristic.
|
||||
|
||||
**Resolution:** _(open)_
|
||||
**Resolution:** 2026-05-18 — Confirmed: `discover_hierarchy` buffered the entire hierarchy with no streaming alternative. Added `GalaxyRepositoryClient.iter_hierarchy`, an async generator that fetches one `DiscoverHierarchyRequest` page at a time and yields each `GalaxyObject` as it arrives, so peak memory is bounded by a single page (`_DISCOVER_HIERARCHY_PAGE_SIZE`). Pages are fetched lazily — the next page is only requested after the current page is fully consumed. `discover_hierarchy` is now a thin convenience wrapper (`[obj async for obj in self.iter_hierarchy()]`) that preserves its `list[GalaxyObject]` contract, including the repeated-page-token guard. Regression tests in `tests/test_galaxy_iter_hierarchy.py` (`test_iter_hierarchy_yields_objects_across_pages`, `test_iter_hierarchy_is_lazy_and_does_not_prefetch_next_page`, `test_iter_hierarchy_rejects_repeated_page_token`, `test_discover_hierarchy_still_returns_full_list`) failed before the fix and pass after. `clients/python/README.md` updated with the `iter_hierarchy` usage and memory guidance since this adds a new public method.
|
||||
|
||||
### Client.Python-006
|
||||
|
||||
@@ -153,13 +153,13 @@
|
||||
| Severity | Medium |
|
||||
| Category | Testing coverage |
|
||||
| Location | `clients/python/tests/` |
|
||||
| Status | Open |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** Several non-trivial public paths are untested: `Session.write2`/`add_item2` request construction; the bulk-size limit `_ensure_bulk_size`/`MAX_BULK_ITEMS` guard; the `None`-argument `TypeError` guards in bulk methods; the TLS `ca_file` read path in `create_channel`; most CLI command bodies; and `map_rpc_error`'s default (non-auth) branch.
|
||||
|
||||
**Recommendation:** Add tests for `write2`/`add_item2` request shape, the bulk-size `ValueError`, the `ca_file` TLS branch, the generic `map_rpc_error` fallthrough, and at least one happy-path CLI command using a fake stub.
|
||||
|
||||
**Resolution:** _(open)_
|
||||
**Resolution:** 2026-05-18 — Confirmed coverage gap against the existing `tests/` files. Added `tests/test_coverage_gaps.py` covering every path the finding lists: `test_add_item2_sends_item_context_and_returns_handle` and `test_write2_sends_value_and_timestamp_value` (request shape + `MxValue` oneof), `test_subscribe_bulk_rejects_oversized_request` and `test_add_item_bulk_at_limit_is_allowed` (the `MAX_BULK_ITEMS` `_ensure_bulk_size` boundary), `test_advise_item_bulk_rejects_none_argument` (the `None`-argument `TypeError` guard), `test_create_channel_reads_ca_file` and `test_create_channel_missing_ca_file_raises` (the TLS `ca_file` read path), `test_map_rpc_error_generic_branch_returns_transport_error` and `test_map_rpc_error_handles_error_without_code` (the non-auth `map_rpc_error` fallthrough and the no-`code` path), and `test_cli_register_happy_path_emits_server_handle` (a happy-path CLI command body driven end to end through `CliRunner` with a fake stub via a monkeypatched `_connect`). All 10 new tests pass. No source change required — this is a pure coverage finding.
|
||||
|
||||
### Client.Python-010
|
||||
|
||||
|
||||
Reference in New Issue
Block a user