"""Regression tests for Client.Python-005: streaming hierarchy iteration. `GalaxyRepositoryClient.iter_hierarchy` yields objects page by page instead of buffering the entire Galaxy hierarchy in memory, and `discover_hierarchy` remains a convenience wrapper built on top of it. """ from __future__ import annotations from typing import Any import pytest from mxgateway import ClientOptions, GalaxyRepositoryClient from mxgateway.generated import galaxy_repository_pb2 as galaxy_pb class _FakeUnary: def __init__(self, replies: list[Any]) -> None: self.replies = list(replies) self.requests: list[Any] = [] self.metadata: tuple[tuple[str, str], ...] | None = None async def __call__( self, request: Any, *, metadata: tuple[tuple[str, str], ...], timeout: float | None = None, ) -> Any: self.requests.append(request) self.metadata = metadata return self.replies.pop(0) class _FakeGalaxyStub: def __init__(self, discover_replies: list[Any]) -> None: self.DiscoverHierarchy = _FakeUnary(discover_replies) def _two_page_replies() -> list[galaxy_pb.DiscoverHierarchyReply]: return [ galaxy_pb.DiscoverHierarchyReply( next_page_token="page-2", total_object_count=3, objects=[ galaxy_pb.GalaxyObject(gobject_id=1, tag_name="Area_001", is_area=True), galaxy_pb.GalaxyObject(gobject_id=2, tag_name="Pump_001"), ], ), galaxy_pb.DiscoverHierarchyReply( total_object_count=3, objects=[ galaxy_pb.GalaxyObject(gobject_id=3, tag_name="Pump_002"), ], ), ] @pytest.mark.asyncio async def test_iter_hierarchy_yields_objects_across_pages() -> None: stub = _FakeGalaxyStub(_two_page_replies()) client = await GalaxyRepositoryClient.connect( ClientOptions(endpoint="fake", plaintext=True), stub=stub, ) tags = [obj.tag_name async for obj in client.iter_hierarchy()] assert tags == ["Area_001", "Pump_001", "Pump_002"] assert len(stub.DiscoverHierarchy.requests) == 2 assert stub.DiscoverHierarchy.requests[0].page_token == "" assert stub.DiscoverHierarchy.requests[1].page_token == "page-2" @pytest.mark.asyncio async def test_iter_hierarchy_is_lazy_and_does_not_prefetch_next_page() -> None: """Pulling only the first object must not have requested the second page.""" stub = _FakeGalaxyStub(_two_page_replies()) client = await GalaxyRepositoryClient.connect( ClientOptions(endpoint="fake", plaintext=True), stub=stub, ) iterator = client.iter_hierarchy() first = await iterator.__anext__() assert first.tag_name == "Area_001" # Only the first page should have been fetched so far. assert len(stub.DiscoverHierarchy.requests) == 1 await iterator.aclose() @pytest.mark.asyncio async def test_iter_hierarchy_rejects_repeated_page_token() -> None: stub = _FakeGalaxyStub( [ galaxy_pb.DiscoverHierarchyReply(next_page_token="7:1"), galaxy_pb.DiscoverHierarchyReply(next_page_token="7:1"), ], ) client = await GalaxyRepositoryClient.connect( ClientOptions(endpoint="fake", plaintext=True), stub=stub, ) with pytest.raises(Exception, match="repeated page token"): async for _ in client.iter_hierarchy(): pass @pytest.mark.asyncio async def test_discover_hierarchy_still_returns_full_list() -> None: """The convenience wrapper must keep returning a buffered list.""" stub = _FakeGalaxyStub(_two_page_replies()) client = await GalaxyRepositoryClient.connect( ClientOptions(endpoint="fake", plaintext=True), stub=stub, ) objects = await client.discover_hierarchy() assert isinstance(objects, list) assert [obj.tag_name for obj in objects] == ["Area_001", "Pump_001", "Pump_002"]