From a4467e23ef2fc0852fd427fe568bea4438d1c1dc Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 28 May 2026 14:30:39 -0400 Subject: [PATCH] client/python: make LazyBrowseNode.expand concurrency-safe --- .../python/src/zb_mom_ww_mxgateway/galaxy.py | 20 +++++++++------ clients/python/tests/test_galaxy.py | 25 ++++++++++++++++++- 2 files changed, 36 insertions(+), 9 deletions(-) diff --git a/clients/python/src/zb_mom_ww_mxgateway/galaxy.py b/clients/python/src/zb_mom_ww_mxgateway/galaxy.py index 42f29e5..8acd4ac 100644 --- a/clients/python/src/zb_mom_ww_mxgateway/galaxy.py +++ b/clients/python/src/zb_mom_ww_mxgateway/galaxy.py @@ -292,6 +292,7 @@ class LazyBrowseNode: self._options = options self._children: list[LazyBrowseNode] = [] self._is_expanded = False + self._expand_lock = asyncio.Lock() @property def object(self) -> galaxy_pb.GalaxyObject: @@ -317,14 +318,17 @@ class LazyBrowseNode: """Fetch direct children of this node; no-op on subsequent calls.""" if self._is_expanded: return - new_children: list[LazyBrowseNode] = [] - async for child in self._client._iter_browse_children( - parent_gobject_id=self._object.gobject_id, - options=self._options, - ): - new_children.append(child) - self._children.extend(new_children) - self._is_expanded = True + async with self._expand_lock: + if self._is_expanded: + return + new_children: list[LazyBrowseNode] = [] + async for child in self._client._iter_browse_children( + parent_gobject_id=self._object.gobject_id, + options=self._options, + ): + new_children.append(child) + self._children.extend(new_children) + self._is_expanded = True async def _canceling_iterator(call: Any) -> AsyncIterator[galaxy_pb.DeployEvent]: diff --git a/clients/python/tests/test_galaxy.py b/clients/python/tests/test_galaxy.py index f67ab34..b0bdd0a 100644 --- a/clients/python/tests/test_galaxy.py +++ b/clients/python/tests/test_galaxy.py @@ -391,6 +391,28 @@ async def test_browse_expand_idempotent_no_second_rpc() -> None: assert len(roots[0].children) == 1 +@pytest.mark.asyncio +async def test_browse_expand_concurrent_callers_only_fire_one_rpc() -> None: + stub = FakeGalaxyStub() + stub.browse_children.replies = [ + _build_browse_reply([_obj(1, "Plant", is_area=True)], [True], 7), + _build_browse_reply([_obj(2, "Mixer_001")], [False], 7), + ] + client = await GalaxyRepositoryClient.connect( + ClientOptions(endpoint="fake", plaintext=True), + stub=stub, + ) + + roots = await client.browse() + # Ten concurrent expand calls on the same node should issue exactly one RPC. + await asyncio.gather(*(roots[0].expand() for _ in range(10))) + + assert roots[0].is_expanded + assert len(roots[0].children) == 1 + # 1 roots fetch + exactly 1 expand fetch = 2 total + assert len(stub.browse_children.requests) == 2 + + @pytest.mark.asyncio async def test_browse_expand_unknown_parent_raises_mxgateway_error() -> None: stub = FakeGalaxyStub() @@ -506,7 +528,8 @@ class FakeUnary: def __init__(self, replies: list[Any]) -> None: self.replies = replies self.requests: list[Any] = [] - self.exceptions: list[BaseException] = [] + # None entries mean "no exception on this call"; aligns with the replies queue index-by-index. + self.exceptions: list[BaseException | None] = [] self.metadata: tuple[tuple[str, str], ...] | None = None async def __call__(