diff --git a/clients/rust/src/galaxy.rs b/clients/rust/src/galaxy.rs index 3365884..8aee0dd 100644 --- a/clients/rust/src/galaxy.rs +++ b/clients/rust/src/galaxy.rs @@ -5,9 +5,12 @@ //! read-only RPCs as Rust async methods. Generated Galaxy proto types are //! re-exported through [`crate::generated::galaxy_repository::v1`]. +use std::collections::HashSet; use std::fs; +use std::sync::Arc; use prost_types::Timestamp; +use tokio::sync::Mutex as AsyncMutex; use tonic::codegen::InterceptedService; use tonic::transport::{Certificate, Channel, ClientTlsConfig}; use tonic::Request; @@ -16,12 +19,130 @@ use crate::auth::AuthInterceptor; use crate::error::Error; use crate::generated::galaxy_repository::v1::galaxy_repository_client::GalaxyRepositoryClient; use crate::generated::galaxy_repository::v1::{ - DeployEvent, DiscoverHierarchyRequest, GalaxyObject, GetLastDeployTimeRequest, - TestConnectionRequest, WatchDeployEventsRequest, + browse_children_request, BrowseChildrenReply, BrowseChildrenRequest, DeployEvent, + DiscoverHierarchyRequest, GalaxyObject, GetLastDeployTimeRequest, TestConnectionRequest, + WatchDeployEventsRequest, }; use crate::options::ClientOptions; const DISCOVER_HIERARCHY_PAGE_SIZE: i32 = 5000; +const BROWSE_CHILDREN_PAGE_SIZE: i32 = 500; + +/// Optional filter set forwarded to `GalaxyRepository.BrowseChildren`. +/// +/// Mirrors the request-level filters on the wire: combined with AND so a child +/// only appears when it satisfies every populated criterion. Construct via +/// [`BrowseChildrenOptions::default`] and tweak the fields you care about. +#[derive(Debug, Clone, Default)] +pub struct BrowseChildrenOptions { + /// Restrict to objects whose `category_id` matches one of the supplied + /// Galaxy category identifiers. Empty means "no restriction". + pub category_ids: Vec, + /// Restrict to objects whose template chain contains every supplied + /// template name (case-sensitive substring match on each entry). + pub template_chain_contains: Vec, + /// Restrict to objects whose tag name matches the supplied glob (SQL + /// `LIKE`-style on the server). `None` means "no glob filter". + pub tag_name_glob: Option, + /// Optional tri-state hint for whether to populate `GalaxyObject.attributes` + /// on returned children. `None` falls back to the server default. + pub include_attributes: Option, + /// When `true`, only return children that own at least one alarm-bearing + /// attribute (matches `DiscoverHierarchy` semantics). + pub alarm_bearing_only: bool, + /// When `true`, only return children that own at least one historized + /// attribute (matches `DiscoverHierarchy` semantics). + pub historized_only: bool, +} + +/// Lazy hierarchy node used by the walker built on top of `BrowseChildren`. +/// +/// A node owns its [`GalaxyObject`], a hint as to whether the server believes +/// it has at least one matching descendant under the active filter set, and an +/// internal `expanded` flag protected by an async mutex. Calling [`expand`] +/// the first time issues a paged `BrowseChildren` RPC; subsequent calls are +/// no-ops so callers can poll without re-hitting the server. +/// +/// `LazyBrowseNode` is cheap to clone — clones share state through an +/// internal `Arc`, so expanding one clone makes the children visible to every +/// other clone. +/// +/// [`expand`]: LazyBrowseNode::expand +pub struct LazyBrowseNode { + inner: Arc, +} + +impl Clone for LazyBrowseNode { + fn clone(&self) -> Self { + Self { + inner: Arc::clone(&self.inner), + } + } +} + +struct LazyBrowseNodeInner { + client: GalaxyClient, + object: GalaxyObject, + has_children_hint: bool, + options: BrowseChildrenOptions, + state: AsyncMutex, +} + +struct LazyBrowseNodeState { + children: Vec, + is_expanded: bool, +} + +impl LazyBrowseNode { + /// Borrow the [`GalaxyObject`] returned by the server for this node. + pub fn object(&self) -> &GalaxyObject { + &self.inner.object + } + + /// Server-supplied hint: `true` when the child likely has at least one + /// further matching descendant. Useful to decide whether a UI should draw + /// an expand triangle without issuing the RPC up front. + pub fn has_children_hint(&self) -> bool { + self.inner.has_children_hint + } + + /// Snapshot of the currently-known children. Empty until [`expand`] has + /// run at least once. + /// + /// [`expand`]: LazyBrowseNode::expand + pub async fn children(&self) -> Vec { + self.inner.state.lock().await.children.clone() + } + + /// Returns `true` once [`expand`] has populated this node's children. + /// + /// [`expand`]: LazyBrowseNode::expand + pub async fn is_expanded(&self) -> bool { + self.inner.state.lock().await.is_expanded + } + + /// Populate this node's children by issuing a paged `BrowseChildren` RPC. + /// Subsequent calls are no-ops — the cached children stay in place and no + /// additional RPC is issued. + pub async fn expand(&self) -> Result<(), Error> { + let mut state = self.inner.state.lock().await; + if state.is_expanded { + return Ok(()); + } + + let mut client = self.inner.client.clone(); + let new_children = client + .browse_children_inner( + Some(self.inner.object.gobject_id), + self.inner.options.clone(), + ) + .await?; + + state.children = new_children; + state.is_expanded = true; + Ok(()) + } +} /// Convenience alias for the generated Galaxy client wrapped in the /// authentication interceptor. @@ -172,6 +293,99 @@ impl GalaxyClient { } } + /// Browse the top-level (root) objects of the hierarchy as + /// [`LazyBrowseNode`] instances. Pass [`BrowseChildrenOptions`] to + /// restrict the result set; the same filter is reused when callers expand + /// any returned node. + pub async fn browse( + &mut self, + options: Option, + ) -> Result, Error> { + let effective = options.unwrap_or_default(); + self.browse_children_inner(None, effective).await + } + + /// Issue a single `BrowseChildren` RPC and return the raw reply. Callers + /// that want to drive paging themselves (or inspect the cache sequence) + /// use this; high-level walking goes through [`browse`] and + /// [`LazyBrowseNode::expand`]. + /// + /// [`browse`]: GalaxyClient::browse + pub async fn browse_children_raw( + &mut self, + request: BrowseChildrenRequest, + ) -> Result { + let response = self + .inner + .browse_children(self.unary_request(request)) + .await?; + Ok(response.into_inner()) + } + + pub(crate) async fn browse_children_inner( + &mut self, + parent_gobject_id: Option, + options: BrowseChildrenOptions, + ) -> Result, Error> { + let mut nodes = Vec::new(); + let mut page_token = String::new(); + let mut seen_page_tokens: HashSet = HashSet::new(); + loop { + let parent = parent_gobject_id.map(browse_children_request::Parent::ParentGobjectId); + let request = BrowseChildrenRequest { + page_size: BROWSE_CHILDREN_PAGE_SIZE, + page_token: page_token.clone(), + category_ids: options.category_ids.clone(), + template_chain_contains: options.template_chain_contains.clone(), + tag_name_glob: options.tag_name_glob.clone().unwrap_or_default(), + include_attributes: options.include_attributes, + alarm_bearing_only: options.alarm_bearing_only, + historized_only: options.historized_only, + parent, + }; + + let reply = self.browse_children_raw(request).await?; + let hints = reply.child_has_children; + for (index, object) in reply.children.into_iter().enumerate() { + let hint = hints.get(index).copied().unwrap_or(false); + nodes.push(self.make_lazy_node(object, hint, options.clone())); + } + + page_token = reply.next_page_token; + if page_token.is_empty() { + return Ok(nodes); + } + if !seen_page_tokens.insert(page_token.clone()) { + return Err(Error::InvalidArgument { + name: "page_token".to_owned(), + detail: format!( + "galaxy browse children returned repeated page token `{page_token}`" + ), + }); + } + } + } + + fn make_lazy_node( + &self, + object: GalaxyObject, + has_children_hint: bool, + options: BrowseChildrenOptions, + ) -> LazyBrowseNode { + LazyBrowseNode { + inner: Arc::new(LazyBrowseNodeInner { + client: self.clone(), + object, + has_children_hint, + options, + state: AsyncMutex::new(LazyBrowseNodeState { + children: Vec::new(), + is_expanded: false, + }), + }), + } + } + /// Subscribe to the server-streamed deploy-event feed. /// /// The server emits a bootstrap event describing the current cache state @@ -250,6 +464,9 @@ mod tests { objects: Mutex>, discover_requests: Mutex>, discover_replies: Mutex>, + browse_children_calls: Mutex>, + browse_children_replies: Mutex>, + browse_children_errors: Mutex>, watch_requests: Mutex>, watch_events: Mutex>, watch_senders: Mutex>, @@ -309,9 +526,24 @@ mod tests { async fn browse_children( &self, - _request: Request, + request: Request, ) -> Result, Status> { - Err(Status::unimplemented("browse_children not implemented in FakeGalaxy")) + self.state + .browse_children_calls + .lock() + .unwrap() + .push(request.into_inner()); + if let Some(error) = self.state.browse_children_errors.lock().unwrap().pop() { + return Err(error); + } + let reply = self + .state + .browse_children_replies + .lock() + .unwrap() + .pop_front() + .unwrap_or_default(); + Ok(Response::new(reply)) } type WatchDeployEventsStream = @@ -703,4 +935,295 @@ mod tests { "drop signal channel closed unexpectedly" ); } + + fn browse_obj(gid: i32, tag: &str, is_area: bool) -> GalaxyObject { + GalaxyObject { + gobject_id: gid, + tag_name: tag.to_owned(), + contained_name: String::new(), + browse_name: tag.to_owned(), + parent_gobject_id: 0, + is_area, + category_id: 0, + hosted_by_gobject_id: 0, + template_chain: Vec::new(), + attributes: Vec::new(), + } + } + + fn build_browse_reply( + children: Vec, + child_has_children: Vec, + cache_sequence: u64, + ) -> BrowseChildrenReply { + BrowseChildrenReply { + total_child_count: children.len() as i32, + cache_sequence, + children, + child_has_children, + next_page_token: String::new(), + } + } + + #[tokio::test] + async fn browse_no_parent_returns_roots() { + let state = Arc::new(FakeState::default()); + state + .browse_children_replies + .lock() + .unwrap() + .push_back(build_browse_reply( + vec![browse_obj(1, "Area_A", true), browse_obj(2, "Area_B", true)], + vec![true, false], + 7, + )); + let endpoint = spawn_fake(state.clone()).await; + let mut client = GalaxyClient::connect(ClientOptions::new(endpoint)) + .await + .unwrap(); + + let roots = client.browse(None).await.unwrap(); + + assert_eq!(roots.len(), 2); + assert_eq!(roots[0].object().tag_name, "Area_A"); + assert!(roots[0].has_children_hint()); + assert_eq!(roots[1].object().tag_name, "Area_B"); + assert!(!roots[1].has_children_hint()); + + let calls = state.browse_children_calls.lock().unwrap(); + assert_eq!(calls.len(), 1); + assert!( + calls[0].parent.is_none(), + "root browse must send an empty parent oneof, got {:?}", + calls[0].parent + ); + } + + #[tokio::test] + async fn browse_expand_populates_children_and_marks_expanded() { + let state = Arc::new(FakeState::default()); + // First call: roots. + state + .browse_children_replies + .lock() + .unwrap() + .push_back(build_browse_reply( + vec![browse_obj(10, "Area_A", true)], + vec![true], + 1, + )); + // Second call: children of gobject 10. + state + .browse_children_replies + .lock() + .unwrap() + .push_back(build_browse_reply( + vec![browse_obj(11, "Receiver_1", false)], + vec![false], + 1, + )); + let endpoint = spawn_fake(state.clone()).await; + let mut client = GalaxyClient::connect(ClientOptions::new(endpoint)) + .await + .unwrap(); + + let roots = client.browse(None).await.unwrap(); + let root = roots.into_iter().next().expect("at least one root"); + assert!(!root.is_expanded().await); + + root.expand().await.unwrap(); + + assert!(root.is_expanded().await); + let children = root.children().await; + assert_eq!(children.len(), 1); + assert_eq!(children[0].object().tag_name, "Receiver_1"); + + let calls = state.browse_children_calls.lock().unwrap(); + assert_eq!(calls.len(), 2); + let expand_call = &calls[1]; + match expand_call.parent.as_ref().expect("expand sends parent") { + browse_children_request::Parent::ParentGobjectId(id) => assert_eq!(*id, 10), + other => panic!("expected ParentGobjectId variant, got {other:?}"), + } + } + + #[tokio::test] + async fn browse_expand_idempotent_no_second_rpc() { + let state = Arc::new(FakeState::default()); + state + .browse_children_replies + .lock() + .unwrap() + .push_back(build_browse_reply( + vec![browse_obj(20, "Area_X", true)], + vec![true], + 1, + )); + state + .browse_children_replies + .lock() + .unwrap() + .push_back(build_browse_reply( + vec![browse_obj(21, "Leaf", false)], + vec![false], + 1, + )); + let endpoint = spawn_fake(state.clone()).await; + let mut client = GalaxyClient::connect(ClientOptions::new(endpoint)) + .await + .unwrap(); + + let roots = client.browse(None).await.unwrap(); + let root = roots.into_iter().next().unwrap(); + root.expand().await.unwrap(); + let after_first = state.browse_children_calls.lock().unwrap().len(); + + // Calling expand a second time must NOT issue a new RPC. + root.expand().await.unwrap(); + + let after_second = state.browse_children_calls.lock().unwrap().len(); + assert_eq!( + after_first, after_second, + "expand should be idempotent — no extra RPC the second time" + ); + assert_eq!(root.children().await.len(), 1); + } + + #[tokio::test] + async fn browse_expand_unknown_parent_returns_not_found_error() { + let state = Arc::new(FakeState::default()); + // Root browse succeeds. + state + .browse_children_replies + .lock() + .unwrap() + .push_back(build_browse_reply( + vec![browse_obj(99, "GhostArea", true)], + vec![true], + 1, + )); + let endpoint = spawn_fake(state.clone()).await; + let mut client = GalaxyClient::connect(ClientOptions::new(endpoint)) + .await + .unwrap(); + + let roots = client.browse(None).await.unwrap(); + let root = roots.into_iter().next().unwrap(); + + // Seed the NotFound only AFTER the root call so the FakeGalaxy's + // error stack doesn't intercept the initial browse. + state + .browse_children_errors + .lock() + .unwrap() + .push(Status::not_found("parent gobject 99 not present in cache")); + + let error = root.expand().await.unwrap_err(); + + match &error { + Error::Status(status) => { + assert_eq!(status.code(), tonic::Code::NotFound); + } + other => panic!("expected Error::Status(NotFound), got {other:?}"), + } + // Failed expand must NOT mark the node as expanded — caller can retry. + assert!(!root.is_expanded().await); + assert!(root.children().await.is_empty()); + } + + #[tokio::test] + async fn browse_expand_multi_page_gathers_all_pages() { + let state = Arc::new(FakeState::default()); + // First reply: roots. + state + .browse_children_replies + .lock() + .unwrap() + .push_back(build_browse_reply( + vec![browse_obj(30, "Plant", true)], + vec![true], + 5, + )); + // Second reply: page 1 of children, with a next_page_token. + let mut page_one = build_browse_reply( + vec![ + browse_obj(31, "Child_A", false), + browse_obj(32, "Child_B", false), + ], + vec![false, false], + 5, + ); + page_one.next_page_token = "cursor-2".to_owned(); + page_one.total_child_count = 3; + state + .browse_children_replies + .lock() + .unwrap() + .push_back(page_one); + // Third reply: page 2 of children, with no next page. + state + .browse_children_replies + .lock() + .unwrap() + .push_back(build_browse_reply( + vec![browse_obj(33, "Child_C", false)], + vec![false], + 5, + )); + let endpoint = spawn_fake(state.clone()).await; + let mut client = GalaxyClient::connect(ClientOptions::new(endpoint)) + .await + .unwrap(); + + let roots = client.browse(None).await.unwrap(); + let root = roots.into_iter().next().unwrap(); + root.expand().await.unwrap(); + + let children = root.children().await; + assert_eq!(children.len(), 3); + assert_eq!(children[0].object().tag_name, "Child_A"); + assert_eq!(children[1].object().tag_name, "Child_B"); + assert_eq!(children[2].object().tag_name, "Child_C"); + + let calls = state.browse_children_calls.lock().unwrap(); + // 1 root call + 2 paged expand calls = 3 total. + assert_eq!(calls.len(), 3); + assert_eq!(calls[1].page_token, ""); + assert_eq!(calls[2].page_token, "cursor-2"); + } + + #[tokio::test] + async fn browse_with_filter_forwards_to_request() { + let state = Arc::new(FakeState::default()); + state + .browse_children_replies + .lock() + .unwrap() + .push_back(build_browse_reply(Vec::new(), Vec::new(), 1)); + let endpoint = spawn_fake(state.clone()).await; + let mut client = GalaxyClient::connect(ClientOptions::new(endpoint)) + .await + .unwrap(); + + let options = BrowseChildrenOptions { + category_ids: vec![3, 5], + template_chain_contains: vec!["$DelmiaReceiver".to_owned()], + tag_name_glob: Some("Recv_*".to_owned()), + include_attributes: Some(true), + alarm_bearing_only: true, + historized_only: false, + }; + + let _ = client.browse(Some(options)).await.unwrap(); + + let calls = state.browse_children_calls.lock().unwrap(); + assert_eq!(calls.len(), 1); + let req = &calls[0]; + assert_eq!(req.category_ids, vec![3, 5]); + assert_eq!(req.template_chain_contains, vec!["$DelmiaReceiver"]); + assert_eq!(req.tag_name_glob, "Recv_*"); + assert_eq!(req.include_attributes, Some(true)); + assert!(req.alarm_bearing_only); + assert!(!req.historized_only); + } }