client/python: make LazyBrowseNode.expand concurrency-safe
This commit is contained in:
@@ -292,6 +292,7 @@ class LazyBrowseNode:
|
|||||||
self._options = options
|
self._options = options
|
||||||
self._children: list[LazyBrowseNode] = []
|
self._children: list[LazyBrowseNode] = []
|
||||||
self._is_expanded = False
|
self._is_expanded = False
|
||||||
|
self._expand_lock = asyncio.Lock()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def object(self) -> galaxy_pb.GalaxyObject:
|
def object(self) -> galaxy_pb.GalaxyObject:
|
||||||
@@ -317,14 +318,17 @@ class LazyBrowseNode:
|
|||||||
"""Fetch direct children of this node; no-op on subsequent calls."""
|
"""Fetch direct children of this node; no-op on subsequent calls."""
|
||||||
if self._is_expanded:
|
if self._is_expanded:
|
||||||
return
|
return
|
||||||
new_children: list[LazyBrowseNode] = []
|
async with self._expand_lock:
|
||||||
async for child in self._client._iter_browse_children(
|
if self._is_expanded:
|
||||||
parent_gobject_id=self._object.gobject_id,
|
return
|
||||||
options=self._options,
|
new_children: list[LazyBrowseNode] = []
|
||||||
):
|
async for child in self._client._iter_browse_children(
|
||||||
new_children.append(child)
|
parent_gobject_id=self._object.gobject_id,
|
||||||
self._children.extend(new_children)
|
options=self._options,
|
||||||
self._is_expanded = True
|
):
|
||||||
|
new_children.append(child)
|
||||||
|
self._children.extend(new_children)
|
||||||
|
self._is_expanded = True
|
||||||
|
|
||||||
|
|
||||||
async def _canceling_iterator(call: Any) -> AsyncIterator[galaxy_pb.DeployEvent]:
|
async def _canceling_iterator(call: Any) -> AsyncIterator[galaxy_pb.DeployEvent]:
|
||||||
|
|||||||
@@ -391,6 +391,28 @@ async def test_browse_expand_idempotent_no_second_rpc() -> None:
|
|||||||
assert len(roots[0].children) == 1
|
assert len(roots[0].children) == 1
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_browse_expand_concurrent_callers_only_fire_one_rpc() -> None:
|
||||||
|
stub = FakeGalaxyStub()
|
||||||
|
stub.browse_children.replies = [
|
||||||
|
_build_browse_reply([_obj(1, "Plant", is_area=True)], [True], 7),
|
||||||
|
_build_browse_reply([_obj(2, "Mixer_001")], [False], 7),
|
||||||
|
]
|
||||||
|
client = await GalaxyRepositoryClient.connect(
|
||||||
|
ClientOptions(endpoint="fake", plaintext=True),
|
||||||
|
stub=stub,
|
||||||
|
)
|
||||||
|
|
||||||
|
roots = await client.browse()
|
||||||
|
# Ten concurrent expand calls on the same node should issue exactly one RPC.
|
||||||
|
await asyncio.gather(*(roots[0].expand() for _ in range(10)))
|
||||||
|
|
||||||
|
assert roots[0].is_expanded
|
||||||
|
assert len(roots[0].children) == 1
|
||||||
|
# 1 roots fetch + exactly 1 expand fetch = 2 total
|
||||||
|
assert len(stub.browse_children.requests) == 2
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_browse_expand_unknown_parent_raises_mxgateway_error() -> None:
|
async def test_browse_expand_unknown_parent_raises_mxgateway_error() -> None:
|
||||||
stub = FakeGalaxyStub()
|
stub = FakeGalaxyStub()
|
||||||
@@ -506,7 +528,8 @@ class FakeUnary:
|
|||||||
def __init__(self, replies: list[Any]) -> None:
|
def __init__(self, replies: list[Any]) -> None:
|
||||||
self.replies = replies
|
self.replies = replies
|
||||||
self.requests: list[Any] = []
|
self.requests: list[Any] = []
|
||||||
self.exceptions: list[BaseException] = []
|
# None entries mean "no exception on this call"; aligns with the replies queue index-by-index.
|
||||||
|
self.exceptions: list[BaseException | None] = []
|
||||||
self.metadata: tuple[tuple[str, str], ...] | None = None
|
self.metadata: tuple[tuple[str, str], ...] | None = None
|
||||||
|
|
||||||
async def __call__(
|
async def __call__(
|
||||||
|
|||||||
Reference in New Issue
Block a user