client/python: LazyBrowseNode walker for lazy hierarchy browse
This commit is contained in:
@@ -6,12 +6,16 @@ import asyncio
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
|
||||
import grpc
|
||||
import pytest
|
||||
from google.protobuf.timestamp_pb2 import Timestamp
|
||||
|
||||
from zb_mom_ww_mxgateway import ClientOptions, DeployEvent, GalaxyRepositoryClient, WatchDeployEventsRequest
|
||||
from zb_mom_ww_mxgateway.errors import MxGatewayError
|
||||
from zb_mom_ww_mxgateway.galaxy import LazyBrowseNode
|
||||
from zb_mom_ww_mxgateway.generated import galaxy_repository_pb2 as galaxy_pb
|
||||
from zb_mom_ww_mxgateway.generated import galaxy_repository_pb2_grpc as galaxy_pb_grpc
|
||||
from zb_mom_ww_mxgateway.options import BrowseChildrenOptions
|
||||
|
||||
|
||||
def test_galaxy_messages_import() -> None:
|
||||
@@ -268,15 +272,230 @@ async def test_close_marks_channel_closed_when_no_real_channel() -> None:
|
||||
await client.close()
|
||||
|
||||
|
||||
def _obj(gid: int, tag: str, is_area: bool = False) -> galaxy_pb.GalaxyObject:
|
||||
return galaxy_pb.GalaxyObject(
|
||||
gobject_id=gid, tag_name=tag, browse_name=tag, is_area=is_area,
|
||||
)
|
||||
|
||||
|
||||
def _build_browse_reply(
|
||||
children: list[galaxy_pb.GalaxyObject],
|
||||
child_has_children: list[bool],
|
||||
cache_sequence: int,
|
||||
next_page_token: str = "",
|
||||
) -> galaxy_pb.BrowseChildrenReply:
|
||||
reply = galaxy_pb.BrowseChildrenReply(
|
||||
total_child_count=len(children),
|
||||
cache_sequence=cache_sequence,
|
||||
next_page_token=next_page_token,
|
||||
)
|
||||
reply.children.extend(children)
|
||||
reply.child_has_children.extend(child_has_children)
|
||||
return reply
|
||||
|
||||
|
||||
def _fake_aio_rpc_error(code: grpc.StatusCode, details: str) -> grpc.aio.AioRpcError:
|
||||
return grpc.aio.AioRpcError(
|
||||
code=code,
|
||||
initial_metadata=grpc.aio.Metadata(),
|
||||
trailing_metadata=grpc.aio.Metadata(),
|
||||
details=details,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_browse_no_parent_returns_roots() -> None:
|
||||
stub = FakeGalaxyStub()
|
||||
stub.browse_children.replies = [
|
||||
_build_browse_reply(
|
||||
children=[_obj(1, "Area_A", is_area=True), _obj(2, "Area_B", is_area=True)],
|
||||
child_has_children=[True, False],
|
||||
cache_sequence=7,
|
||||
),
|
||||
]
|
||||
client = await GalaxyRepositoryClient.connect(
|
||||
ClientOptions(endpoint="fake", plaintext=True),
|
||||
stub=stub,
|
||||
)
|
||||
|
||||
roots = await client.browse()
|
||||
|
||||
assert len(roots) == 2
|
||||
assert all(isinstance(node, LazyBrowseNode) for node in roots)
|
||||
assert roots[0].object.tag_name == "Area_A"
|
||||
assert roots[0].has_children_hint is True
|
||||
assert roots[1].has_children_hint is False
|
||||
assert roots[0].is_expanded is False
|
||||
request = stub.browse_children.requests[0]
|
||||
assert request.WhichOneof("parent") is None
|
||||
assert request.page_size == 500
|
||||
assert request.page_token == ""
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_browse_expand_populates_children_and_marks_expanded() -> None:
|
||||
stub = FakeGalaxyStub()
|
||||
stub.browse_children.replies = [
|
||||
_build_browse_reply(
|
||||
children=[_obj(1, "Area_A", is_area=True)],
|
||||
child_has_children=[True],
|
||||
cache_sequence=1,
|
||||
),
|
||||
_build_browse_reply(
|
||||
children=[_obj(11, "Child_A"), _obj(12, "Child_B")],
|
||||
child_has_children=[False, False],
|
||||
cache_sequence=1,
|
||||
),
|
||||
]
|
||||
client = await GalaxyRepositoryClient.connect(
|
||||
ClientOptions(endpoint="fake", plaintext=True),
|
||||
stub=stub,
|
||||
)
|
||||
|
||||
roots = await client.browse()
|
||||
await roots[0].expand()
|
||||
|
||||
assert roots[0].is_expanded is True
|
||||
assert [n.object.tag_name for n in roots[0].children] == ["Child_A", "Child_B"]
|
||||
assert len(stub.browse_children.requests) == 2
|
||||
expand_request = stub.browse_children.requests[1]
|
||||
assert expand_request.WhichOneof("parent") == "parent_gobject_id"
|
||||
assert expand_request.parent_gobject_id == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_browse_expand_idempotent_no_second_rpc() -> None:
|
||||
stub = FakeGalaxyStub()
|
||||
stub.browse_children.replies = [
|
||||
_build_browse_reply(
|
||||
children=[_obj(1, "Area_A", is_area=True)],
|
||||
child_has_children=[True],
|
||||
cache_sequence=1,
|
||||
),
|
||||
_build_browse_reply(
|
||||
children=[_obj(11, "Child_A")],
|
||||
child_has_children=[False],
|
||||
cache_sequence=1,
|
||||
),
|
||||
]
|
||||
client = await GalaxyRepositoryClient.connect(
|
||||
ClientOptions(endpoint="fake", plaintext=True),
|
||||
stub=stub,
|
||||
)
|
||||
|
||||
roots = await client.browse()
|
||||
await roots[0].expand()
|
||||
await roots[0].expand()
|
||||
|
||||
assert len(stub.browse_children.requests) == 2
|
||||
assert len(roots[0].children) == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_browse_expand_unknown_parent_raises_mxgateway_error() -> None:
|
||||
stub = FakeGalaxyStub()
|
||||
stub.browse_children.replies = [
|
||||
_build_browse_reply(
|
||||
children=[_obj(99, "Stale_Parent", is_area=True)],
|
||||
child_has_children=[True],
|
||||
cache_sequence=1,
|
||||
),
|
||||
]
|
||||
stub.browse_children.exceptions = [
|
||||
None,
|
||||
_fake_aio_rpc_error(grpc.StatusCode.NOT_FOUND, "parent not found"),
|
||||
]
|
||||
client = await GalaxyRepositoryClient.connect(
|
||||
ClientOptions(endpoint="fake", plaintext=True),
|
||||
stub=stub,
|
||||
)
|
||||
|
||||
roots = await client.browse()
|
||||
with pytest.raises(MxGatewayError):
|
||||
await roots[0].expand()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_browse_expand_multi_page_gathers_all_pages() -> None:
|
||||
stub = FakeGalaxyStub()
|
||||
stub.browse_children.replies = [
|
||||
_build_browse_reply(
|
||||
children=[_obj(7, "Area_Big", is_area=True)],
|
||||
child_has_children=[True],
|
||||
cache_sequence=2,
|
||||
),
|
||||
_build_browse_reply(
|
||||
children=[_obj(71, "Child_1"), _obj(72, "Child_2")],
|
||||
child_has_children=[False, False],
|
||||
cache_sequence=2,
|
||||
next_page_token="7:abc:2",
|
||||
),
|
||||
_build_browse_reply(
|
||||
children=[_obj(73, "Child_3")],
|
||||
child_has_children=[False],
|
||||
cache_sequence=2,
|
||||
),
|
||||
]
|
||||
client = await GalaxyRepositoryClient.connect(
|
||||
ClientOptions(endpoint="fake", plaintext=True),
|
||||
stub=stub,
|
||||
)
|
||||
|
||||
roots = await client.browse()
|
||||
await roots[0].expand()
|
||||
|
||||
assert [n.object.tag_name for n in roots[0].children] == ["Child_1", "Child_2", "Child_3"]
|
||||
assert len(stub.browse_children.requests) == 3
|
||||
assert stub.browse_children.requests[2].page_token == "7:abc:2"
|
||||
assert stub.browse_children.requests[2].parent_gobject_id == 7
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_browse_with_filter_forwards_to_request() -> None:
|
||||
stub = FakeGalaxyStub()
|
||||
stub.browse_children.replies = [
|
||||
_build_browse_reply(
|
||||
children=[_obj(1, "Area_A", is_area=True)],
|
||||
child_has_children=[False],
|
||||
cache_sequence=3,
|
||||
),
|
||||
]
|
||||
client = await GalaxyRepositoryClient.connect(
|
||||
ClientOptions(endpoint="fake", plaintext=True),
|
||||
stub=stub,
|
||||
)
|
||||
options = BrowseChildrenOptions(
|
||||
category_ids=(4, 5),
|
||||
template_chain_contains=("$DelmiaReceiver",),
|
||||
tag_name_glob="Area_*",
|
||||
include_attributes=True,
|
||||
alarm_bearing_only=True,
|
||||
historized_only=True,
|
||||
)
|
||||
|
||||
await client.browse(options)
|
||||
|
||||
request = stub.browse_children.requests[0]
|
||||
assert list(request.category_ids) == [4, 5]
|
||||
assert list(request.template_chain_contains) == ["$DelmiaReceiver"]
|
||||
assert request.tag_name_glob == "Area_*"
|
||||
assert request.HasField("include_attributes")
|
||||
assert request.include_attributes is True
|
||||
assert request.alarm_bearing_only is True
|
||||
assert request.historized_only is True
|
||||
|
||||
|
||||
class FakeGalaxyStub:
|
||||
def __init__(self) -> None:
|
||||
self.test_connection = FakeUnary([galaxy_pb.TestConnectionReply(ok=False)])
|
||||
self.get_last_deploy_time = FakeUnary([galaxy_pb.GetLastDeployTimeReply(present=False)])
|
||||
self.discover_hierarchy = FakeUnary([galaxy_pb.DiscoverHierarchyReply()])
|
||||
self.browse_children = FakeUnary([galaxy_pb.BrowseChildrenReply()])
|
||||
self.watch_deploy_events = FakeStream([])
|
||||
self.TestConnection = self.test_connection
|
||||
self.GetLastDeployTime = self.get_last_deploy_time
|
||||
self.DiscoverHierarchy = self.discover_hierarchy
|
||||
self.BrowseChildren = self.browse_children
|
||||
|
||||
@property
|
||||
def WatchDeployEvents(self) -> "FakeStream": # noqa: N802 — gRPC naming
|
||||
@@ -287,6 +506,7 @@ class FakeUnary:
|
||||
def __init__(self, replies: list[Any]) -> None:
|
||||
self.replies = replies
|
||||
self.requests: list[Any] = []
|
||||
self.exceptions: list[BaseException] = []
|
||||
self.metadata: tuple[tuple[str, str], ...] | None = None
|
||||
|
||||
async def __call__(
|
||||
@@ -298,6 +518,10 @@ class FakeUnary:
|
||||
) -> Any:
|
||||
self.requests.append(request)
|
||||
self.metadata = metadata
|
||||
if self.exceptions:
|
||||
exc = self.exceptions.pop(0)
|
||||
if exc is not None:
|
||||
raise exc
|
||||
return self.replies.pop(0)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user