client/rust: LazyBrowseNode walker for lazy hierarchy browse

This commit is contained in:
Joseph Doherty
2026-05-28 14:26:05 -04:00
parent 1d8c0d83c4
commit 555e4be51f
+527 -4
View File
@@ -5,9 +5,12 @@
//! read-only RPCs as Rust async methods. Generated Galaxy proto types are
//! re-exported through [`crate::generated::galaxy_repository::v1`].
use std::collections::HashSet;
use std::fs;
use std::sync::Arc;
use prost_types::Timestamp;
use tokio::sync::Mutex as AsyncMutex;
use tonic::codegen::InterceptedService;
use tonic::transport::{Certificate, Channel, ClientTlsConfig};
use tonic::Request;
@@ -16,12 +19,130 @@ use crate::auth::AuthInterceptor;
use crate::error::Error;
use crate::generated::galaxy_repository::v1::galaxy_repository_client::GalaxyRepositoryClient;
use crate::generated::galaxy_repository::v1::{
DeployEvent, DiscoverHierarchyRequest, GalaxyObject, GetLastDeployTimeRequest,
TestConnectionRequest, WatchDeployEventsRequest,
browse_children_request, BrowseChildrenReply, BrowseChildrenRequest, DeployEvent,
DiscoverHierarchyRequest, GalaxyObject, GetLastDeployTimeRequest, TestConnectionRequest,
WatchDeployEventsRequest,
};
use crate::options::ClientOptions;
const DISCOVER_HIERARCHY_PAGE_SIZE: i32 = 5000;
const BROWSE_CHILDREN_PAGE_SIZE: i32 = 500;
/// Optional filter set forwarded to `GalaxyRepository.BrowseChildren`.
///
/// Mirrors the request-level filters on the wire: combined with AND so a child
/// only appears when it satisfies every populated criterion. Construct via
/// [`BrowseChildrenOptions::default`] and tweak the fields you care about.
#[derive(Debug, Clone, Default)]
pub struct BrowseChildrenOptions {
/// Restrict to objects whose `category_id` matches one of the supplied
/// Galaxy category identifiers. Empty means "no restriction".
pub category_ids: Vec<i32>,
/// Restrict to objects whose template chain contains every supplied
/// template name (case-sensitive substring match on each entry).
pub template_chain_contains: Vec<String>,
/// Restrict to objects whose tag name matches the supplied glob (SQL
/// `LIKE`-style on the server). `None` means "no glob filter".
pub tag_name_glob: Option<String>,
/// Optional tri-state hint for whether to populate `GalaxyObject.attributes`
/// on returned children. `None` falls back to the server default.
pub include_attributes: Option<bool>,
/// When `true`, only return children that own at least one alarm-bearing
/// attribute (matches `DiscoverHierarchy` semantics).
pub alarm_bearing_only: bool,
/// When `true`, only return children that own at least one historized
/// attribute (matches `DiscoverHierarchy` semantics).
pub historized_only: bool,
}
/// Lazy hierarchy node used by the walker built on top of `BrowseChildren`.
///
/// A node owns its [`GalaxyObject`], a hint as to whether the server believes
/// it has at least one matching descendant under the active filter set, and an
/// internal `expanded` flag protected by an async mutex. Calling [`expand`]
/// the first time issues a paged `BrowseChildren` RPC; subsequent calls are
/// no-ops so callers can poll without re-hitting the server.
///
/// `LazyBrowseNode` is cheap to clone — clones share state through an
/// internal `Arc`, so expanding one clone makes the children visible to every
/// other clone.
///
/// [`expand`]: LazyBrowseNode::expand
pub struct LazyBrowseNode {
inner: Arc<LazyBrowseNodeInner>,
}
impl Clone for LazyBrowseNode {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}
struct LazyBrowseNodeInner {
client: GalaxyClient,
object: GalaxyObject,
has_children_hint: bool,
options: BrowseChildrenOptions,
state: AsyncMutex<LazyBrowseNodeState>,
}
struct LazyBrowseNodeState {
children: Vec<LazyBrowseNode>,
is_expanded: bool,
}
impl LazyBrowseNode {
/// Borrow the [`GalaxyObject`] returned by the server for this node.
pub fn object(&self) -> &GalaxyObject {
&self.inner.object
}
/// Server-supplied hint: `true` when the child likely has at least one
/// further matching descendant. Useful to decide whether a UI should draw
/// an expand triangle without issuing the RPC up front.
pub fn has_children_hint(&self) -> bool {
self.inner.has_children_hint
}
/// Snapshot of the currently-known children. Empty until [`expand`] has
/// run at least once.
///
/// [`expand`]: LazyBrowseNode::expand
pub async fn children(&self) -> Vec<LazyBrowseNode> {
self.inner.state.lock().await.children.clone()
}
/// Returns `true` once [`expand`] has populated this node's children.
///
/// [`expand`]: LazyBrowseNode::expand
pub async fn is_expanded(&self) -> bool {
self.inner.state.lock().await.is_expanded
}
/// Populate this node's children by issuing a paged `BrowseChildren` RPC.
/// Subsequent calls are no-ops — the cached children stay in place and no
/// additional RPC is issued.
pub async fn expand(&self) -> Result<(), Error> {
let mut state = self.inner.state.lock().await;
if state.is_expanded {
return Ok(());
}
let mut client = self.inner.client.clone();
let new_children = client
.browse_children_inner(
Some(self.inner.object.gobject_id),
self.inner.options.clone(),
)
.await?;
state.children = new_children;
state.is_expanded = true;
Ok(())
}
}
/// Convenience alias for the generated Galaxy client wrapped in the
/// authentication interceptor.
@@ -172,6 +293,99 @@ impl GalaxyClient {
}
}
/// Browse the top-level (root) objects of the hierarchy as
/// [`LazyBrowseNode`] instances. Pass [`BrowseChildrenOptions`] to
/// restrict the result set; the same filter is reused when callers expand
/// any returned node.
pub async fn browse(
&mut self,
options: Option<BrowseChildrenOptions>,
) -> Result<Vec<LazyBrowseNode>, Error> {
let effective = options.unwrap_or_default();
self.browse_children_inner(None, effective).await
}
/// Issue a single `BrowseChildren` RPC and return the raw reply. Callers
/// that want to drive paging themselves (or inspect the cache sequence)
/// use this; high-level walking goes through [`browse`] and
/// [`LazyBrowseNode::expand`].
///
/// [`browse`]: GalaxyClient::browse
pub async fn browse_children_raw(
&mut self,
request: BrowseChildrenRequest,
) -> Result<BrowseChildrenReply, Error> {
let response = self
.inner
.browse_children(self.unary_request(request))
.await?;
Ok(response.into_inner())
}
pub(crate) async fn browse_children_inner(
&mut self,
parent_gobject_id: Option<i32>,
options: BrowseChildrenOptions,
) -> Result<Vec<LazyBrowseNode>, Error> {
let mut nodes = Vec::new();
let mut page_token = String::new();
let mut seen_page_tokens: HashSet<String> = HashSet::new();
loop {
let parent = parent_gobject_id.map(browse_children_request::Parent::ParentGobjectId);
let request = BrowseChildrenRequest {
page_size: BROWSE_CHILDREN_PAGE_SIZE,
page_token: page_token.clone(),
category_ids: options.category_ids.clone(),
template_chain_contains: options.template_chain_contains.clone(),
tag_name_glob: options.tag_name_glob.clone().unwrap_or_default(),
include_attributes: options.include_attributes,
alarm_bearing_only: options.alarm_bearing_only,
historized_only: options.historized_only,
parent,
};
let reply = self.browse_children_raw(request).await?;
let hints = reply.child_has_children;
for (index, object) in reply.children.into_iter().enumerate() {
let hint = hints.get(index).copied().unwrap_or(false);
nodes.push(self.make_lazy_node(object, hint, options.clone()));
}
page_token = reply.next_page_token;
if page_token.is_empty() {
return Ok(nodes);
}
if !seen_page_tokens.insert(page_token.clone()) {
return Err(Error::InvalidArgument {
name: "page_token".to_owned(),
detail: format!(
"galaxy browse children returned repeated page token `{page_token}`"
),
});
}
}
}
fn make_lazy_node(
&self,
object: GalaxyObject,
has_children_hint: bool,
options: BrowseChildrenOptions,
) -> LazyBrowseNode {
LazyBrowseNode {
inner: Arc::new(LazyBrowseNodeInner {
client: self.clone(),
object,
has_children_hint,
options,
state: AsyncMutex::new(LazyBrowseNodeState {
children: Vec::new(),
is_expanded: false,
}),
}),
}
}
/// Subscribe to the server-streamed deploy-event feed.
///
/// The server emits a bootstrap event describing the current cache state
@@ -250,6 +464,9 @@ mod tests {
objects: Mutex<Vec<GalaxyObject>>,
discover_requests: Mutex<Vec<DiscoverHierarchyRequest>>,
discover_replies: Mutex<std::collections::VecDeque<DiscoverHierarchyReply>>,
browse_children_calls: Mutex<Vec<BrowseChildrenRequest>>,
browse_children_replies: Mutex<std::collections::VecDeque<BrowseChildrenReply>>,
browse_children_errors: Mutex<Vec<Status>>,
watch_requests: Mutex<Vec<WatchDeployEventsRequest>>,
watch_events: Mutex<Vec<DeployEvent>>,
watch_senders: Mutex<Vec<DeployEventTx>>,
@@ -309,9 +526,24 @@ mod tests {
async fn browse_children(
&self,
_request: Request<BrowseChildrenRequest>,
request: Request<BrowseChildrenRequest>,
) -> Result<Response<BrowseChildrenReply>, Status> {
Err(Status::unimplemented("browse_children not implemented in FakeGalaxy"))
self.state
.browse_children_calls
.lock()
.unwrap()
.push(request.into_inner());
if let Some(error) = self.state.browse_children_errors.lock().unwrap().pop() {
return Err(error);
}
let reply = self
.state
.browse_children_replies
.lock()
.unwrap()
.pop_front()
.unwrap_or_default();
Ok(Response::new(reply))
}
type WatchDeployEventsStream =
@@ -703,4 +935,295 @@ mod tests {
"drop signal channel closed unexpectedly"
);
}
fn browse_obj(gid: i32, tag: &str, is_area: bool) -> GalaxyObject {
GalaxyObject {
gobject_id: gid,
tag_name: tag.to_owned(),
contained_name: String::new(),
browse_name: tag.to_owned(),
parent_gobject_id: 0,
is_area,
category_id: 0,
hosted_by_gobject_id: 0,
template_chain: Vec::new(),
attributes: Vec::new(),
}
}
fn build_browse_reply(
children: Vec<GalaxyObject>,
child_has_children: Vec<bool>,
cache_sequence: u64,
) -> BrowseChildrenReply {
BrowseChildrenReply {
total_child_count: children.len() as i32,
cache_sequence,
children,
child_has_children,
next_page_token: String::new(),
}
}
#[tokio::test]
async fn browse_no_parent_returns_roots() {
let state = Arc::new(FakeState::default());
state
.browse_children_replies
.lock()
.unwrap()
.push_back(build_browse_reply(
vec![browse_obj(1, "Area_A", true), browse_obj(2, "Area_B", true)],
vec![true, false],
7,
));
let endpoint = spawn_fake(state.clone()).await;
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let roots = client.browse(None).await.unwrap();
assert_eq!(roots.len(), 2);
assert_eq!(roots[0].object().tag_name, "Area_A");
assert!(roots[0].has_children_hint());
assert_eq!(roots[1].object().tag_name, "Area_B");
assert!(!roots[1].has_children_hint());
let calls = state.browse_children_calls.lock().unwrap();
assert_eq!(calls.len(), 1);
assert!(
calls[0].parent.is_none(),
"root browse must send an empty parent oneof, got {:?}",
calls[0].parent
);
}
#[tokio::test]
async fn browse_expand_populates_children_and_marks_expanded() {
let state = Arc::new(FakeState::default());
// First call: roots.
state
.browse_children_replies
.lock()
.unwrap()
.push_back(build_browse_reply(
vec![browse_obj(10, "Area_A", true)],
vec![true],
1,
));
// Second call: children of gobject 10.
state
.browse_children_replies
.lock()
.unwrap()
.push_back(build_browse_reply(
vec![browse_obj(11, "Receiver_1", false)],
vec![false],
1,
));
let endpoint = spawn_fake(state.clone()).await;
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let roots = client.browse(None).await.unwrap();
let root = roots.into_iter().next().expect("at least one root");
assert!(!root.is_expanded().await);
root.expand().await.unwrap();
assert!(root.is_expanded().await);
let children = root.children().await;
assert_eq!(children.len(), 1);
assert_eq!(children[0].object().tag_name, "Receiver_1");
let calls = state.browse_children_calls.lock().unwrap();
assert_eq!(calls.len(), 2);
let expand_call = &calls[1];
match expand_call.parent.as_ref().expect("expand sends parent") {
browse_children_request::Parent::ParentGobjectId(id) => assert_eq!(*id, 10),
other => panic!("expected ParentGobjectId variant, got {other:?}"),
}
}
#[tokio::test]
async fn browse_expand_idempotent_no_second_rpc() {
let state = Arc::new(FakeState::default());
state
.browse_children_replies
.lock()
.unwrap()
.push_back(build_browse_reply(
vec![browse_obj(20, "Area_X", true)],
vec![true],
1,
));
state
.browse_children_replies
.lock()
.unwrap()
.push_back(build_browse_reply(
vec![browse_obj(21, "Leaf", false)],
vec![false],
1,
));
let endpoint = spawn_fake(state.clone()).await;
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let roots = client.browse(None).await.unwrap();
let root = roots.into_iter().next().unwrap();
root.expand().await.unwrap();
let after_first = state.browse_children_calls.lock().unwrap().len();
// Calling expand a second time must NOT issue a new RPC.
root.expand().await.unwrap();
let after_second = state.browse_children_calls.lock().unwrap().len();
assert_eq!(
after_first, after_second,
"expand should be idempotent — no extra RPC the second time"
);
assert_eq!(root.children().await.len(), 1);
}
#[tokio::test]
async fn browse_expand_unknown_parent_returns_not_found_error() {
let state = Arc::new(FakeState::default());
// Root browse succeeds.
state
.browse_children_replies
.lock()
.unwrap()
.push_back(build_browse_reply(
vec![browse_obj(99, "GhostArea", true)],
vec![true],
1,
));
let endpoint = spawn_fake(state.clone()).await;
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let roots = client.browse(None).await.unwrap();
let root = roots.into_iter().next().unwrap();
// Seed the NotFound only AFTER the root call so the FakeGalaxy's
// error stack doesn't intercept the initial browse.
state
.browse_children_errors
.lock()
.unwrap()
.push(Status::not_found("parent gobject 99 not present in cache"));
let error = root.expand().await.unwrap_err();
match &error {
Error::Status(status) => {
assert_eq!(status.code(), tonic::Code::NotFound);
}
other => panic!("expected Error::Status(NotFound), got {other:?}"),
}
// Failed expand must NOT mark the node as expanded — caller can retry.
assert!(!root.is_expanded().await);
assert!(root.children().await.is_empty());
}
#[tokio::test]
async fn browse_expand_multi_page_gathers_all_pages() {
let state = Arc::new(FakeState::default());
// First reply: roots.
state
.browse_children_replies
.lock()
.unwrap()
.push_back(build_browse_reply(
vec![browse_obj(30, "Plant", true)],
vec![true],
5,
));
// Second reply: page 1 of children, with a next_page_token.
let mut page_one = build_browse_reply(
vec![
browse_obj(31, "Child_A", false),
browse_obj(32, "Child_B", false),
],
vec![false, false],
5,
);
page_one.next_page_token = "cursor-2".to_owned();
page_one.total_child_count = 3;
state
.browse_children_replies
.lock()
.unwrap()
.push_back(page_one);
// Third reply: page 2 of children, with no next page.
state
.browse_children_replies
.lock()
.unwrap()
.push_back(build_browse_reply(
vec![browse_obj(33, "Child_C", false)],
vec![false],
5,
));
let endpoint = spawn_fake(state.clone()).await;
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let roots = client.browse(None).await.unwrap();
let root = roots.into_iter().next().unwrap();
root.expand().await.unwrap();
let children = root.children().await;
assert_eq!(children.len(), 3);
assert_eq!(children[0].object().tag_name, "Child_A");
assert_eq!(children[1].object().tag_name, "Child_B");
assert_eq!(children[2].object().tag_name, "Child_C");
let calls = state.browse_children_calls.lock().unwrap();
// 1 root call + 2 paged expand calls = 3 total.
assert_eq!(calls.len(), 3);
assert_eq!(calls[1].page_token, "");
assert_eq!(calls[2].page_token, "cursor-2");
}
#[tokio::test]
async fn browse_with_filter_forwards_to_request() {
let state = Arc::new(FakeState::default());
state
.browse_children_replies
.lock()
.unwrap()
.push_back(build_browse_reply(Vec::new(), Vec::new(), 1));
let endpoint = spawn_fake(state.clone()).await;
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let options = BrowseChildrenOptions {
category_ids: vec![3, 5],
template_chain_contains: vec!["$DelmiaReceiver".to_owned()],
tag_name_glob: Some("Recv_*".to_owned()),
include_attributes: Some(true),
alarm_bearing_only: true,
historized_only: false,
};
let _ = client.browse(Some(options)).await.unwrap();
let calls = state.browse_children_calls.lock().unwrap();
assert_eq!(calls.len(), 1);
let req = &calls[0];
assert_eq!(req.category_ids, vec![3, 5]);
assert_eq!(req.template_chain_contains, vec!["$DelmiaReceiver"]);
assert_eq!(req.tag_name_glob, "Recv_*");
assert_eq!(req.include_attributes, Some(true));
assert!(req.alarm_bearing_only);
assert!(!req.historized_only);
}
}