diff --git a/clients/python/src/zb_mom_ww_mxgateway/galaxy.py b/clients/python/src/zb_mom_ww_mxgateway/galaxy.py index 09069f7..42f29e5 100644 --- a/clients/python/src/zb_mom_ww_mxgateway/galaxy.py +++ b/clients/python/src/zb_mom_ww_mxgateway/galaxy.py @@ -21,9 +21,10 @@ from .auth import merge_metadata from .errors import MxGatewayError, map_rpc_error from .generated import galaxy_repository_pb2 as galaxy_pb from .generated import galaxy_repository_pb2_grpc as galaxy_pb_grpc -from .options import ClientOptions, create_channel +from .options import BrowseChildrenOptions, ClientOptions, create_channel _DISCOVER_HIERARCHY_PAGE_SIZE = 5000 +_BROWSE_CHILDREN_PAGE_SIZE = 500 class GalaxyRepositoryClient: @@ -139,6 +140,73 @@ class GalaxyRepositoryClient: ) seen_page_tokens.add(page_token) + async def browse( + self, + options: BrowseChildrenOptions | None = None, + ) -> list["LazyBrowseNode"]: + """Return the root browse nodes for lazy hierarchy traversal. + + Each returned ``LazyBrowseNode`` wraps a Galaxy object whose direct + children can be loaded on demand by ``await node.expand()``. + """ + + effective = options or BrowseChildrenOptions() + return [ + node + async for node in self._iter_browse_children( + parent_gobject_id=None, + options=effective, + ) + ] + + async def _iter_browse_children( + self, + *, + parent_gobject_id: int | None, + options: BrowseChildrenOptions, + ) -> AsyncIterator["LazyBrowseNode"]: + page_token = "" + seen_page_tokens: set[str] = set() + while True: + request = galaxy_pb.BrowseChildrenRequest( + page_size=_BROWSE_CHILDREN_PAGE_SIZE, + page_token=page_token, + alarm_bearing_only=options.alarm_bearing_only, + historized_only=options.historized_only, + ) + if parent_gobject_id is not None: + request.parent_gobject_id = parent_gobject_id + if options.category_ids: + request.category_ids.extend(options.category_ids) + if options.template_chain_contains: + request.template_chain_contains.extend(options.template_chain_contains) + if options.tag_name_glob: + request.tag_name_glob = options.tag_name_glob + if options.include_attributes is not None: + request.include_attributes = options.include_attributes + + reply = await self._unary( + "browse children", + self.raw_stub.BrowseChildren, + request, + ) + + for index, obj in enumerate(reply.children): + hint = ( + index < len(reply.child_has_children) + and bool(reply.child_has_children[index]) + ) + yield LazyBrowseNode(self, obj, hint, options) + + page_token = reply.next_page_token + if not page_token: + return + if page_token in seen_page_tokens: + raise MxGatewayError( + f"galaxy browse children returned repeated page token {page_token!r}" + ) + seen_page_tokens.add(page_token) + def watch_deploy_events( self, last_seen_deploy_time: datetime | None = None, @@ -202,6 +270,63 @@ class GalaxyRepositoryClient: raise map_rpc_error(operation, error) from error +class LazyBrowseNode: + """One node in a lazy-loaded Galaxy browse tree. + + Calling ``expand`` once fetches direct children (paginating as needed) + and populates ``children``. Subsequent calls are no-ops so callers can + drive UI expand toggles without de-duping. + """ + + def __init__( + self, + client: "GalaxyRepositoryClient", + obj: galaxy_pb.GalaxyObject, + has_children_hint: bool, + options: BrowseChildrenOptions, + ) -> None: + """Initialize a node bound to its owning client and filter set.""" + self._client = client + self._object = obj + self._has_children_hint = has_children_hint + self._options = options + self._children: list[LazyBrowseNode] = [] + self._is_expanded = False + + @property + def object(self) -> galaxy_pb.GalaxyObject: + """Return the underlying ``GalaxyObject`` proto for this node.""" + return self._object + + @property + def has_children_hint(self) -> bool: + """Return the server hint about whether this node has children.""" + return self._has_children_hint + + @property + def children(self) -> list["LazyBrowseNode"]: + """Return a copy of the loaded child nodes (empty until expanded).""" + return list(self._children) + + @property + def is_expanded(self) -> bool: + """Return whether ``expand`` has already populated ``children``.""" + return self._is_expanded + + async def expand(self) -> None: + """Fetch direct children of this node; no-op on subsequent calls.""" + if self._is_expanded: + return + new_children: list[LazyBrowseNode] = [] + async for child in self._client._iter_browse_children( + parent_gobject_id=self._object.gobject_id, + options=self._options, + ): + new_children.append(child) + self._children.extend(new_children) + self._is_expanded = True + + async def _canceling_iterator(call: Any) -> AsyncIterator[galaxy_pb.DeployEvent]: try: async for event in call: diff --git a/clients/python/src/zb_mom_ww_mxgateway/options.py b/clients/python/src/zb_mom_ww_mxgateway/options.py index 59fe092..060d50c 100644 --- a/clients/python/src/zb_mom_ww_mxgateway/options.py +++ b/clients/python/src/zb_mom_ww_mxgateway/options.py @@ -2,7 +2,8 @@ from __future__ import annotations -from dataclasses import dataclass +from collections.abc import Sequence +from dataclasses import dataclass, field from pathlib import Path import grpc @@ -51,6 +52,23 @@ class ClientOptions: ) +@dataclass(frozen=True) +class BrowseChildrenOptions: + """Filters and shape options for ``GalaxyRepositoryClient.browse``. + + Mirrors the AND-combined filter set on ``BrowseChildrenRequest`` so a + single instance can be re-used across an entire lazy browse session + (the filter set is part of the page-token contract). + """ + + category_ids: Sequence[int] = field(default_factory=tuple) + template_chain_contains: Sequence[str] = field(default_factory=tuple) + tag_name_glob: str | None = None + include_attributes: bool | None = None + alarm_bearing_only: bool = False + historized_only: bool = False + + def create_channel(options: ClientOptions) -> grpc.aio.Channel: """Create a plaintext or TLS `grpc.aio` channel from client options.""" diff --git a/clients/python/tests/test_galaxy.py b/clients/python/tests/test_galaxy.py index 6dcf194..f67ab34 100644 --- a/clients/python/tests/test_galaxy.py +++ b/clients/python/tests/test_galaxy.py @@ -6,12 +6,16 @@ import asyncio from datetime import datetime, timezone from typing import Any +import grpc import pytest from google.protobuf.timestamp_pb2 import Timestamp from zb_mom_ww_mxgateway import ClientOptions, DeployEvent, GalaxyRepositoryClient, WatchDeployEventsRequest +from zb_mom_ww_mxgateway.errors import MxGatewayError +from zb_mom_ww_mxgateway.galaxy import LazyBrowseNode from zb_mom_ww_mxgateway.generated import galaxy_repository_pb2 as galaxy_pb from zb_mom_ww_mxgateway.generated import galaxy_repository_pb2_grpc as galaxy_pb_grpc +from zb_mom_ww_mxgateway.options import BrowseChildrenOptions def test_galaxy_messages_import() -> None: @@ -268,15 +272,230 @@ async def test_close_marks_channel_closed_when_no_real_channel() -> None: await client.close() +def _obj(gid: int, tag: str, is_area: bool = False) -> galaxy_pb.GalaxyObject: + return galaxy_pb.GalaxyObject( + gobject_id=gid, tag_name=tag, browse_name=tag, is_area=is_area, + ) + + +def _build_browse_reply( + children: list[galaxy_pb.GalaxyObject], + child_has_children: list[bool], + cache_sequence: int, + next_page_token: str = "", +) -> galaxy_pb.BrowseChildrenReply: + reply = galaxy_pb.BrowseChildrenReply( + total_child_count=len(children), + cache_sequence=cache_sequence, + next_page_token=next_page_token, + ) + reply.children.extend(children) + reply.child_has_children.extend(child_has_children) + return reply + + +def _fake_aio_rpc_error(code: grpc.StatusCode, details: str) -> grpc.aio.AioRpcError: + return grpc.aio.AioRpcError( + code=code, + initial_metadata=grpc.aio.Metadata(), + trailing_metadata=grpc.aio.Metadata(), + details=details, + ) + + +@pytest.mark.asyncio +async def test_browse_no_parent_returns_roots() -> None: + stub = FakeGalaxyStub() + stub.browse_children.replies = [ + _build_browse_reply( + children=[_obj(1, "Area_A", is_area=True), _obj(2, "Area_B", is_area=True)], + child_has_children=[True, False], + cache_sequence=7, + ), + ] + client = await GalaxyRepositoryClient.connect( + ClientOptions(endpoint="fake", plaintext=True), + stub=stub, + ) + + roots = await client.browse() + + assert len(roots) == 2 + assert all(isinstance(node, LazyBrowseNode) for node in roots) + assert roots[0].object.tag_name == "Area_A" + assert roots[0].has_children_hint is True + assert roots[1].has_children_hint is False + assert roots[0].is_expanded is False + request = stub.browse_children.requests[0] + assert request.WhichOneof("parent") is None + assert request.page_size == 500 + assert request.page_token == "" + + +@pytest.mark.asyncio +async def test_browse_expand_populates_children_and_marks_expanded() -> None: + stub = FakeGalaxyStub() + stub.browse_children.replies = [ + _build_browse_reply( + children=[_obj(1, "Area_A", is_area=True)], + child_has_children=[True], + cache_sequence=1, + ), + _build_browse_reply( + children=[_obj(11, "Child_A"), _obj(12, "Child_B")], + child_has_children=[False, False], + cache_sequence=1, + ), + ] + client = await GalaxyRepositoryClient.connect( + ClientOptions(endpoint="fake", plaintext=True), + stub=stub, + ) + + roots = await client.browse() + await roots[0].expand() + + assert roots[0].is_expanded is True + assert [n.object.tag_name for n in roots[0].children] == ["Child_A", "Child_B"] + assert len(stub.browse_children.requests) == 2 + expand_request = stub.browse_children.requests[1] + assert expand_request.WhichOneof("parent") == "parent_gobject_id" + assert expand_request.parent_gobject_id == 1 + + +@pytest.mark.asyncio +async def test_browse_expand_idempotent_no_second_rpc() -> None: + stub = FakeGalaxyStub() + stub.browse_children.replies = [ + _build_browse_reply( + children=[_obj(1, "Area_A", is_area=True)], + child_has_children=[True], + cache_sequence=1, + ), + _build_browse_reply( + children=[_obj(11, "Child_A")], + child_has_children=[False], + cache_sequence=1, + ), + ] + client = await GalaxyRepositoryClient.connect( + ClientOptions(endpoint="fake", plaintext=True), + stub=stub, + ) + + roots = await client.browse() + await roots[0].expand() + await roots[0].expand() + + assert len(stub.browse_children.requests) == 2 + assert len(roots[0].children) == 1 + + +@pytest.mark.asyncio +async def test_browse_expand_unknown_parent_raises_mxgateway_error() -> None: + stub = FakeGalaxyStub() + stub.browse_children.replies = [ + _build_browse_reply( + children=[_obj(99, "Stale_Parent", is_area=True)], + child_has_children=[True], + cache_sequence=1, + ), + ] + stub.browse_children.exceptions = [ + None, + _fake_aio_rpc_error(grpc.StatusCode.NOT_FOUND, "parent not found"), + ] + client = await GalaxyRepositoryClient.connect( + ClientOptions(endpoint="fake", plaintext=True), + stub=stub, + ) + + roots = await client.browse() + with pytest.raises(MxGatewayError): + await roots[0].expand() + + +@pytest.mark.asyncio +async def test_browse_expand_multi_page_gathers_all_pages() -> None: + stub = FakeGalaxyStub() + stub.browse_children.replies = [ + _build_browse_reply( + children=[_obj(7, "Area_Big", is_area=True)], + child_has_children=[True], + cache_sequence=2, + ), + _build_browse_reply( + children=[_obj(71, "Child_1"), _obj(72, "Child_2")], + child_has_children=[False, False], + cache_sequence=2, + next_page_token="7:abc:2", + ), + _build_browse_reply( + children=[_obj(73, "Child_3")], + child_has_children=[False], + cache_sequence=2, + ), + ] + client = await GalaxyRepositoryClient.connect( + ClientOptions(endpoint="fake", plaintext=True), + stub=stub, + ) + + roots = await client.browse() + await roots[0].expand() + + assert [n.object.tag_name for n in roots[0].children] == ["Child_1", "Child_2", "Child_3"] + assert len(stub.browse_children.requests) == 3 + assert stub.browse_children.requests[2].page_token == "7:abc:2" + assert stub.browse_children.requests[2].parent_gobject_id == 7 + + +@pytest.mark.asyncio +async def test_browse_with_filter_forwards_to_request() -> None: + stub = FakeGalaxyStub() + stub.browse_children.replies = [ + _build_browse_reply( + children=[_obj(1, "Area_A", is_area=True)], + child_has_children=[False], + cache_sequence=3, + ), + ] + client = await GalaxyRepositoryClient.connect( + ClientOptions(endpoint="fake", plaintext=True), + stub=stub, + ) + options = BrowseChildrenOptions( + category_ids=(4, 5), + template_chain_contains=("$DelmiaReceiver",), + tag_name_glob="Area_*", + include_attributes=True, + alarm_bearing_only=True, + historized_only=True, + ) + + await client.browse(options) + + request = stub.browse_children.requests[0] + assert list(request.category_ids) == [4, 5] + assert list(request.template_chain_contains) == ["$DelmiaReceiver"] + assert request.tag_name_glob == "Area_*" + assert request.HasField("include_attributes") + assert request.include_attributes is True + assert request.alarm_bearing_only is True + assert request.historized_only is True + + class FakeGalaxyStub: def __init__(self) -> None: self.test_connection = FakeUnary([galaxy_pb.TestConnectionReply(ok=False)]) self.get_last_deploy_time = FakeUnary([galaxy_pb.GetLastDeployTimeReply(present=False)]) self.discover_hierarchy = FakeUnary([galaxy_pb.DiscoverHierarchyReply()]) + self.browse_children = FakeUnary([galaxy_pb.BrowseChildrenReply()]) self.watch_deploy_events = FakeStream([]) self.TestConnection = self.test_connection self.GetLastDeployTime = self.get_last_deploy_time self.DiscoverHierarchy = self.discover_hierarchy + self.BrowseChildren = self.browse_children @property def WatchDeployEvents(self) -> "FakeStream": # noqa: N802 — gRPC naming @@ -287,6 +506,7 @@ class FakeUnary: def __init__(self, replies: list[Any]) -> None: self.replies = replies self.requests: list[Any] = [] + self.exceptions: list[BaseException] = [] self.metadata: tuple[tuple[str, str], ...] | None = None async def __call__( @@ -298,6 +518,10 @@ class FakeUnary: ) -> Any: self.requests.append(request) self.metadata = metadata + if self.exceptions: + exc = self.exceptions.pop(0) + if exc is not None: + raise exc return self.replies.pop(0)