//! Thin async wrapper for the `GalaxyRepository` gRPC service. //! //! The wrapper mirrors [`crate::client::GatewayClient`]: it owns a tonic //! channel with the shared bearer-token interceptor and exposes the three //! read-only RPCs as Rust async methods. Generated Galaxy proto types are //! re-exported through [`crate::generated::galaxy_repository::v1`]. use std::collections::HashSet; use std::sync::Arc; use prost_types::Timestamp; use tokio::sync::Mutex as AsyncMutex; use tonic::codegen::InterceptedService; use tonic::transport::Channel; use tonic::Request; use crate::auth::AuthInterceptor; use crate::error::Error; use crate::generated::galaxy_repository::v1::galaxy_repository_client::GalaxyRepositoryClient; use crate::generated::galaxy_repository::v1::{ browse_children_request, BrowseChildrenReply, BrowseChildrenRequest, DeployEvent, DiscoverHierarchyRequest, GalaxyObject, GetLastDeployTimeRequest, TestConnectionRequest, WatchDeployEventsRequest, }; use crate::options::{build_tls_config, ClientOptions}; const DISCOVER_HIERARCHY_PAGE_SIZE: i32 = 5000; const BROWSE_CHILDREN_PAGE_SIZE: i32 = 500; /// Optional filter set forwarded to `GalaxyRepository.BrowseChildren`. /// /// Mirrors the request-level filters on the wire: combined with AND so a child /// only appears when it satisfies every populated criterion. Construct via /// [`BrowseChildrenOptions::default`] and tweak the fields you care about. #[derive(Debug, Clone, Default)] pub struct BrowseChildrenOptions { /// Restrict to objects whose `category_id` matches one of the supplied /// Galaxy category identifiers. Empty means "no restriction". pub category_ids: Vec, /// Restrict to objects whose template chain contains every supplied /// template name (case-sensitive substring match on each entry). pub template_chain_contains: Vec, /// Restrict to objects whose tag name matches the supplied glob (SQL /// `LIKE`-style on the server). `None` means "no glob filter". pub tag_name_glob: Option, /// Optional tri-state hint for whether to populate `GalaxyObject.attributes` /// on returned children. `None` falls back to the server default. pub include_attributes: Option, /// When `true`, only return children that own at least one alarm-bearing /// attribute (matches `DiscoverHierarchy` semantics). pub alarm_bearing_only: bool, /// When `true`, only return children that own at least one historized /// attribute (matches `DiscoverHierarchy` semantics). pub historized_only: bool, } /// Lazy hierarchy node used by the walker built on top of `BrowseChildren`. /// /// A node owns its [`GalaxyObject`], a hint as to whether the server believes /// it has at least one matching descendant under the active filter set, and an /// internal `expanded` flag protected by an async mutex. Calling [`expand`] /// the first time issues a paged `BrowseChildren` RPC; subsequent calls are /// no-ops so callers can poll without re-hitting the server. /// /// `LazyBrowseNode` is cheap to clone — clones share state through an /// internal `Arc`, so expanding one clone makes the children visible to every /// other clone. /// /// [`expand`]: LazyBrowseNode::expand pub struct LazyBrowseNode { inner: Arc, } impl Clone for LazyBrowseNode { fn clone(&self) -> Self { Self { inner: Arc::clone(&self.inner), } } } struct LazyBrowseNodeInner { client: GalaxyClient, object: GalaxyObject, has_children_hint: bool, options: BrowseChildrenOptions, state: AsyncMutex, } struct LazyBrowseNodeState { children: Vec, is_expanded: bool, } impl LazyBrowseNode { /// Borrow the [`GalaxyObject`] returned by the server for this node. pub fn object(&self) -> &GalaxyObject { &self.inner.object } /// Server-supplied hint: `true` when the child likely has at least one /// further matching descendant. Useful to decide whether a UI should draw /// an expand triangle without issuing the RPC up front. pub fn has_children_hint(&self) -> bool { self.inner.has_children_hint } /// Snapshot of the currently-known children. Empty until [`expand`] has /// run at least once. /// /// [`expand`]: LazyBrowseNode::expand pub async fn children(&self) -> Vec { self.inner.state.lock().await.children.clone() } /// Returns `true` once [`expand`] has populated this node's children. /// /// [`expand`]: LazyBrowseNode::expand pub async fn is_expanded(&self) -> bool { self.inner.state.lock().await.is_expanded } /// Populate this node's children by issuing a paged `BrowseChildren` RPC. /// Subsequent calls are no-ops — the cached children stay in place and no /// additional RPC is issued. pub async fn expand(&self) -> Result<(), Error> { let mut state = self.inner.state.lock().await; if state.is_expanded { return Ok(()); } let mut client = self.inner.client.clone(); let new_children = client .browse_children_inner( Some(self.inner.object.gobject_id), self.inner.options.clone(), ) .await?; state.children = new_children; state.is_expanded = true; Ok(()) } } /// Convenience alias for the generated Galaxy client wrapped in the /// authentication interceptor. pub type RawGalaxyClient = GalaxyRepositoryClient>; /// Stream of `DeployEvent` values returned by /// [`GalaxyClient::watch_deploy_events`]. Mirrors /// [`crate::client::EventStream`]: a boxed `Stream` whose `tonic::Status` /// errors have already been mapped onto [`Error`]. Dropping the stream /// cancels the underlying gRPC call. pub type DeployEventStream = std::pin::Pin< Box> + Send + 'static>, >; /// Thin async wrapper around the generated Galaxy Repository gRPC client. /// /// Construct it with [`GalaxyClient::connect`] using the same /// [`ClientOptions`] that drive [`crate::client::GatewayClient`]. The /// service is metadata-only (no sessions) and requires the `metadata:read` /// API-key scope on the server side. #[derive(Clone)] pub struct GalaxyClient { inner: RawGalaxyClient, call_timeout: std::time::Duration, stream_timeout: Option, } impl GalaxyClient { /// Connect to the gateway endpoint and build a Galaxy client. Mirrors /// the TLS / plaintext / API-key handling used by `GatewayClient`. pub async fn connect(options: ClientOptions) -> Result { let mut endpoint = Channel::from_shared(options.endpoint().to_owned()).map_err(|source| { Error::InvalidEndpoint { endpoint: options.endpoint().to_owned(), detail: source.to_string(), } })?; endpoint = endpoint.connect_timeout(options.connect_timeout()); if let Some(tls) = build_tls_config(&options)? { endpoint = endpoint.tls_config(tls)?; } let channel = endpoint.connect().await?; let interceptor = AuthInterceptor::new(options.api_key().cloned()); let max_grpc_message_bytes = options.max_grpc_message_bytes(); Ok(Self { inner: GalaxyRepositoryClient::with_interceptor(channel, interceptor) .max_decoding_message_size(max_grpc_message_bytes) .max_encoding_message_size(max_grpc_message_bytes), call_timeout: options.call_timeout(), stream_timeout: options.stream_timeout(), }) } /// Build a [`GalaxyClient`] that talks through an existing tonic /// channel. Tests use this to wire up an in-memory transport. pub fn from_channel(channel: Channel, options: &ClientOptions) -> Self { let interceptor = AuthInterceptor::new(options.api_key().cloned()); let max_grpc_message_bytes = options.max_grpc_message_bytes(); Self { inner: GalaxyRepositoryClient::with_interceptor(channel, interceptor) .max_decoding_message_size(max_grpc_message_bytes) .max_encoding_message_size(max_grpc_message_bytes), call_timeout: options.call_timeout(), stream_timeout: options.stream_timeout(), } } /// Borrow the underlying generated client for advanced callers that need /// access to features not surfaced by the wrapper. pub fn raw_client(&mut self) -> &mut RawGalaxyClient { &mut self.inner } /// Consume the wrapper and return the generated client. pub fn into_inner(self) -> RawGalaxyClient { self.inner } /// Probe the Galaxy Repository database connection. Returns the `ok` /// flag from the server reply. pub async fn test_connection(&mut self) -> Result { let response = self .inner .test_connection(self.unary_request(TestConnectionRequest {})) .await?; Ok(response.into_inner().ok) } /// Read the most recent Galaxy deployment timestamp. Returns `None` /// when the server reports `present = false`. pub async fn get_last_deploy_time(&mut self) -> Result, Error> { let response = self .inner .get_last_deploy_time(self.unary_request(GetLastDeployTimeRequest {})) .await?; let reply = response.into_inner(); if reply.present { Ok(reply.time_of_last_deploy) } else { Ok(None) } } /// Walk the deployed object hierarchy. Each [`GalaxyObject`] contains /// the object's identifying names plus its dynamic attributes. pub async fn discover_hierarchy(&mut self) -> Result, Error> { let mut objects = Vec::new(); let mut seen_page_tokens = std::collections::HashSet::new(); let mut page_token = String::new(); loop { let response = self .inner .discover_hierarchy(self.unary_request(DiscoverHierarchyRequest { page_size: DISCOVER_HIERARCHY_PAGE_SIZE, page_token, ..Default::default() })) .await?; let reply = response.into_inner(); objects.extend(reply.objects); page_token = reply.next_page_token; if page_token.is_empty() { return Ok(objects); } if !seen_page_tokens.insert(page_token.clone()) { return Err(Error::InvalidArgument { name: "page_token".to_owned(), detail: format!( "galaxy discover hierarchy returned repeated page token `{page_token}`" ), }); } } } /// Browse the top-level (root) objects of the hierarchy as /// [`LazyBrowseNode`] instances. Pass [`BrowseChildrenOptions`] to /// restrict the result set; the same filter is reused when callers expand /// any returned node. pub async fn browse( &mut self, options: Option, ) -> Result, Error> { let effective = options.unwrap_or_default(); self.browse_children_inner(None, effective).await } /// Issue a single `BrowseChildren` RPC and return the raw reply. Callers /// that want to drive paging themselves (or inspect the cache sequence) /// use this; high-level walking goes through [`browse`] and /// [`LazyBrowseNode::expand`]. /// /// [`browse`]: GalaxyClient::browse pub async fn browse_children_raw( &mut self, request: BrowseChildrenRequest, ) -> Result { let response = self .inner .browse_children(self.unary_request(request)) .await?; Ok(response.into_inner()) } pub(crate) async fn browse_children_inner( &mut self, parent_gobject_id: Option, options: BrowseChildrenOptions, ) -> Result, Error> { let mut nodes = Vec::new(); let mut page_token = String::new(); let mut seen_page_tokens: HashSet = HashSet::new(); loop { let parent = parent_gobject_id.map(browse_children_request::Parent::ParentGobjectId); let request = BrowseChildrenRequest { page_size: BROWSE_CHILDREN_PAGE_SIZE, page_token: page_token.clone(), category_ids: options.category_ids.clone(), template_chain_contains: options.template_chain_contains.clone(), tag_name_glob: options.tag_name_glob.clone().unwrap_or_default(), include_attributes: options.include_attributes, alarm_bearing_only: options.alarm_bearing_only, historized_only: options.historized_only, parent, }; let reply = self.browse_children_raw(request).await?; let hints = reply.child_has_children; for (index, object) in reply.children.into_iter().enumerate() { let hint = hints.get(index).copied().unwrap_or(false); nodes.push(self.make_lazy_node(object, hint, options.clone())); } page_token = reply.next_page_token; if page_token.is_empty() { return Ok(nodes); } if !seen_page_tokens.insert(page_token.clone()) { return Err(Error::InvalidArgument { name: "page_token".to_owned(), detail: format!( "galaxy browse children returned repeated page token `{page_token}`" ), }); } } } fn make_lazy_node( &self, object: GalaxyObject, has_children_hint: bool, options: BrowseChildrenOptions, ) -> LazyBrowseNode { LazyBrowseNode { inner: Arc::new(LazyBrowseNodeInner { client: self.clone(), object, has_children_hint, options, state: AsyncMutex::new(LazyBrowseNodeState { children: Vec::new(), is_expanded: false, }), }), } } /// Subscribe to the server-streamed deploy-event feed. /// /// The server emits a bootstrap event describing the current cache state /// immediately on subscribe, then one event per observed change to /// `galaxy.time_of_last_deploy`. When `last_seen_deploy_time` matches the /// current cache, the bootstrap event is suppressed and the stream stays /// idle until the next deploy. /// /// Cancellation is cooperative: dropping the returned /// [`DeployEventStream`] tears down the underlying gRPC call. Callers /// drive consumption with `StreamExt::next` (or any other `Stream` /// adapter). pub async fn watch_deploy_events( &mut self, last_seen_deploy_time: Option, ) -> Result { let request = WatchDeployEventsRequest { last_seen_deploy_time, }; let response = self .inner .watch_deploy_events(self.stream_request(request)) .await?; let stream = futures_util::StreamExt::map(response.into_inner(), |result| { result.map_err(Error::from) }); Ok(Box::pin(stream)) } fn unary_request(&self, message: T) -> Request { let mut request = Request::new(message); request.set_timeout(self.call_timeout); request } fn stream_request(&self, message: T) -> Request { let mut request = Request::new(message); if let Some(timeout) = self.stream_timeout { request.set_timeout(timeout); } request } } #[cfg(test)] mod tests { use std::pin::Pin; use std::sync::{Arc, Mutex}; use futures_util::StreamExt; use tokio::net::TcpListener; use tokio::sync::mpsc; use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream}; use tonic::transport::Server; use tonic::{Request, Response, Status}; use super::*; use crate::auth::ApiKey; use crate::generated::galaxy_repository::v1::galaxy_repository_server::{ GalaxyRepository, GalaxyRepositoryServer, }; use crate::generated::galaxy_repository::v1::{ BrowseChildrenReply, BrowseChildrenRequest, DeployEvent, DiscoverHierarchyReply, DiscoverHierarchyRequest, GalaxyAttribute, GalaxyObject, GetLastDeployTimeReply, GetLastDeployTimeRequest, TestConnectionReply, TestConnectionRequest, WatchDeployEventsRequest, }; type DeployEventTx = mpsc::Sender>; #[derive(Default)] struct FakeState { authorization: Mutex>, present: Mutex, last_deploy: Mutex>, objects: Mutex>, discover_requests: Mutex>, discover_replies: Mutex>, browse_children_calls: Mutex>, browse_children_replies: Mutex>, browse_children_errors: Mutex>, watch_requests: Mutex>, watch_events: Mutex>, watch_senders: Mutex>, watch_drop_signal: Mutex>>, } #[derive(Clone)] struct FakeGalaxy { state: Arc, } #[tonic::async_trait] impl GalaxyRepository for FakeGalaxy { async fn test_connection( &self, request: Request, ) -> Result, Status> { *self.state.authorization.lock().unwrap() = request .metadata() .get("authorization") .and_then(|value| value.to_str().ok()) .map(str::to_owned); Ok(Response::new(TestConnectionReply { ok: true })) } async fn get_last_deploy_time( &self, _request: Request, ) -> Result, Status> { let present = *self.state.present.lock().unwrap(); let time = *self.state.last_deploy.lock().unwrap(); Ok(Response::new(GetLastDeployTimeReply { present, time_of_last_deploy: time, })) } async fn discover_hierarchy( &self, request: Request, ) -> Result, Status> { self.state .discover_requests .lock() .unwrap() .push(request.into_inner()); if let Some(reply) = self.state.discover_replies.lock().unwrap().pop_front() { return Ok(Response::new(reply)); } Ok(Response::new(DiscoverHierarchyReply { objects: self.state.objects.lock().unwrap().clone(), next_page_token: String::new(), total_object_count: self.state.objects.lock().unwrap().len() as i32, })) } async fn browse_children( &self, request: Request, ) -> Result, Status> { self.state .browse_children_calls .lock() .unwrap() .push(request.into_inner()); if let Some(error) = self.state.browse_children_errors.lock().unwrap().pop() { return Err(error); } let reply = self .state .browse_children_replies .lock() .unwrap() .pop_front() .unwrap_or_default(); Ok(Response::new(reply)) } type WatchDeployEventsStream = Pin> + Send + 'static>>; async fn watch_deploy_events( &self, request: Request, ) -> Result, Status> { self.state .watch_requests .lock() .unwrap() .push(request.into_inner()); let preset = self.state.watch_events.lock().unwrap().clone(); let (tx, rx) = mpsc::channel::>(16); for event in preset { tx.send(Ok(event)) .await .map_err(|err| Status::internal(err.to_string()))?; } self.state.watch_senders.lock().unwrap().push(tx.clone()); let drop_signal = self.state.watch_drop_signal.lock().unwrap().clone(); let stream = ReceiverStream::new(rx); let stream: Pin + Send + 'static>> = if let Some(signal) = drop_signal { Box::pin(WatchStreamWithDropSignal { inner: stream, signal: Some(signal), }) } else { Box::pin(stream) }; Ok(Response::new(stream)) } } /// Wraps the receiver stream so we can detect when the server-side stream /// future is dropped (the client cancelled or dropped the stream). Used by /// `watch_drop_tears_down_call`. struct WatchStreamWithDropSignal { inner: S, signal: Option>, } impl tokio_stream::Stream for WatchStreamWithDropSignal { type Item = S::Item; fn poll_next( mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { Pin::new(&mut self.inner).poll_next(cx) } } impl Drop for WatchStreamWithDropSignal { fn drop(&mut self) { if let Some(signal) = self.signal.take() { let _ = signal.send(()); } } } async fn spawn_fake(state: Arc) -> String { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let address = listener.local_addr().unwrap(); let incoming = TcpListenerStream::new(listener); let service = GalaxyRepositoryServer::new(FakeGalaxy { state }); tokio::spawn(async move { Server::builder() .add_service(service) .serve_with_incoming(incoming) .await .unwrap(); }); format!("http://{address}") } #[tokio::test] async fn test_connection_attaches_bearer_metadata_and_returns_ok() { let state = Arc::new(FakeState::default()); let endpoint = spawn_fake(state.clone()).await; let mut client = GalaxyClient::connect( ClientOptions::new(endpoint).with_api_key(ApiKey::new("mxgw_galaxy_secret")), ) .await .unwrap(); let ok = client.test_connection().await.unwrap(); assert!(ok); assert_eq!( state.authorization.lock().unwrap().as_deref(), Some("Bearer mxgw_galaxy_secret") ); } #[tokio::test] async fn get_last_deploy_time_returns_none_when_not_present() { let state = Arc::new(FakeState::default()); *state.present.lock().unwrap() = false; *state.last_deploy.lock().unwrap() = Some(Timestamp { seconds: 1_700_000_000, nanos: 0, }); let endpoint = spawn_fake(state.clone()).await; let mut client = GalaxyClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let result = client.get_last_deploy_time().await.unwrap(); assert!( result.is_none(), "present=false on the wire must surface as None, got {result:?}" ); } #[tokio::test] async fn get_last_deploy_time_returns_timestamp_when_present() { let state = Arc::new(FakeState::default()); *state.present.lock().unwrap() = true; *state.last_deploy.lock().unwrap() = Some(Timestamp { seconds: 1_700_000_000, nanos: 250_000_000, }); let endpoint = spawn_fake(state.clone()).await; let mut client = GalaxyClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let result = client.get_last_deploy_time().await.unwrap(); let timestamp = result.expect("present=true should yield a timestamp"); assert_eq!(timestamp.seconds, 1_700_000_000); assert_eq!(timestamp.nanos, 250_000_000); } #[tokio::test] async fn discover_hierarchy_returns_objects_with_attributes() { let state = Arc::new(FakeState::default()); state .discover_replies .lock() .unwrap() .push_back(DiscoverHierarchyReply { objects: vec![GalaxyObject { gobject_id: 42, tag_name: "DelmiaReceiver_001".to_owned(), contained_name: "DelmiaReceiver".to_owned(), browse_name: "TestMachine_001/DelmiaReceiver".to_owned(), parent_gobject_id: 7, is_area: false, category_id: 3, hosted_by_gobject_id: 1, template_chain: vec!["$UserDefined".to_owned(), "$DelmiaReceiver".to_owned()], attributes: vec![GalaxyAttribute { attribute_name: "DownloadPath".to_owned(), full_tag_reference: "DelmiaReceiver_001.DownloadPath".to_owned(), mx_data_type: 8, data_type_name: "MxString".to_owned(), is_array: false, array_dimension: 0, array_dimension_present: false, mx_attribute_category: 2, security_classification: 1, is_historized: false, is_alarm: false, }], }], next_page_token: "page-2".to_owned(), total_object_count: 2, }); state .discover_replies .lock() .unwrap() .push_back(DiscoverHierarchyReply { objects: vec![GalaxyObject { gobject_id: 43, tag_name: "DelmiaReceiver_002".to_owned(), contained_name: String::new(), browse_name: String::new(), parent_gobject_id: 0, is_area: false, category_id: 0, hosted_by_gobject_id: 0, template_chain: Vec::new(), attributes: Vec::new(), }], next_page_token: String::new(), total_object_count: 2, }); let endpoint = spawn_fake(state.clone()).await; let mut client = GalaxyClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let objects = client.discover_hierarchy().await.unwrap(); assert_eq!(objects.len(), 2); let requests = state.discover_requests.lock().unwrap(); assert_eq!(requests.len(), 2); assert_eq!(requests[0].page_size, 5000); assert_eq!(requests[0].page_token, ""); assert_eq!(requests[1].page_token, "page-2"); assert_eq!(objects[0].tag_name, "DelmiaReceiver_001"); assert_eq!(objects[0].attributes.len(), 1); assert_eq!(objects[0].attributes[0].attribute_name, "DownloadPath"); assert_eq!( objects[0].attributes[0].full_tag_reference, "DelmiaReceiver_001.DownloadPath" ); } #[tokio::test] async fn discover_hierarchy_rejects_repeated_page_token() { let state = Arc::new(FakeState::default()); state .discover_replies .lock() .unwrap() .push_back(DiscoverHierarchyReply { objects: Vec::new(), next_page_token: "7:1".to_owned(), total_object_count: 1, }); state .discover_replies .lock() .unwrap() .push_back(DiscoverHierarchyReply { objects: Vec::new(), next_page_token: "7:1".to_owned(), total_object_count: 1, }); let endpoint = spawn_fake(state).await; let mut client = GalaxyClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let error = client.discover_hierarchy().await.unwrap_err(); assert!(error.to_string().contains("repeated page token")); } #[tokio::test] async fn watch_deploy_events_yields_events_in_order() { let state = Arc::new(FakeState::default()); *state.watch_events.lock().unwrap() = vec![ DeployEvent { sequence: 1, observed_at: Some(Timestamp { seconds: 1_700_000_000, nanos: 0, }), time_of_last_deploy: Some(Timestamp { seconds: 1_699_000_000, nanos: 0, }), time_of_last_deploy_present: true, object_count: 12, attribute_count: 80, }, DeployEvent { sequence: 2, observed_at: Some(Timestamp { seconds: 1_700_000_500, nanos: 0, }), time_of_last_deploy: Some(Timestamp { seconds: 1_699_500_000, nanos: 0, }), time_of_last_deploy_present: true, object_count: 13, attribute_count: 85, }, ]; let endpoint = spawn_fake(state.clone()).await; let mut client = GalaxyClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let mut stream = client.watch_deploy_events(None).await.unwrap(); let first = stream .next() .await .expect("bootstrap event") .expect("ok deploy event"); let second = stream .next() .await .expect("second event") .expect("ok deploy event"); assert_eq!(first.sequence, 1); assert_eq!(first.object_count, 12); assert_eq!(second.sequence, 2); assert_eq!(second.object_count, 13); assert!(first.time_of_last_deploy_present); } #[tokio::test] async fn watch_deploy_events_propagates_last_seen_deploy_time() { let state = Arc::new(FakeState::default()); let endpoint = spawn_fake(state.clone()).await; let mut client = GalaxyClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let last_seen = Timestamp { seconds: 1_699_999_999, nanos: 123_456_789, }; let stream = client.watch_deploy_events(Some(last_seen)).await.unwrap(); // Drop the stream right away — the test is solely about the request // payload reaching the server. drop(stream); // Give the server task a moment to record the request. for _ in 0..20 { if !state.watch_requests.lock().unwrap().is_empty() { break; } tokio::time::sleep(std::time::Duration::from_millis(10)).await; } let requests = state.watch_requests.lock().unwrap().clone(); assert_eq!(requests.len(), 1); let recorded = requests[0] .last_seen_deploy_time .as_ref() .expect("last_seen_deploy_time forwarded"); assert_eq!(recorded.seconds, last_seen.seconds); assert_eq!(recorded.nanos, last_seen.nanos); } #[tokio::test] async fn watch_deploy_events_drop_tears_down_call() { let state = Arc::new(FakeState::default()); let (signal_tx, mut signal_rx) = mpsc::unbounded_channel(); *state.watch_drop_signal.lock().unwrap() = Some(signal_tx); // Seed one event so the client gets something on the stream before we // drop it; this proves the call is live. *state.watch_events.lock().unwrap() = vec![DeployEvent { sequence: 7, observed_at: None, time_of_last_deploy: None, time_of_last_deploy_present: false, object_count: 0, attribute_count: 0, }]; let endpoint = spawn_fake(state.clone()).await; let mut client = GalaxyClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let mut stream = client.watch_deploy_events(None).await.unwrap(); let event = stream .next() .await .expect("bootstrap event") .expect("ok deploy event"); assert_eq!(event.sequence, 7); // Dropping the client-side stream must trigger the server-side stream // future to be dropped as well, signalling cancellation. drop(stream); let drop_seen = tokio::time::timeout(std::time::Duration::from_secs(2), signal_rx.recv()) .await .expect("server-side stream future was not dropped within 2s"); assert!( drop_seen.is_some(), "drop signal channel closed unexpectedly" ); } fn browse_obj(gid: i32, tag: &str, is_area: bool) -> GalaxyObject { GalaxyObject { gobject_id: gid, tag_name: tag.to_owned(), contained_name: String::new(), browse_name: tag.to_owned(), parent_gobject_id: 0, is_area, category_id: 0, hosted_by_gobject_id: 0, template_chain: Vec::new(), attributes: Vec::new(), } } fn build_browse_reply( children: Vec, child_has_children: Vec, cache_sequence: u64, ) -> BrowseChildrenReply { BrowseChildrenReply { total_child_count: children.len() as i32, cache_sequence, children, child_has_children, next_page_token: String::new(), } } #[tokio::test] async fn browse_no_parent_returns_roots() { let state = Arc::new(FakeState::default()); state .browse_children_replies .lock() .unwrap() .push_back(build_browse_reply( vec![browse_obj(1, "Area_A", true), browse_obj(2, "Area_B", true)], vec![true, false], 7, )); let endpoint = spawn_fake(state.clone()).await; let mut client = GalaxyClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let roots = client.browse(None).await.unwrap(); assert_eq!(roots.len(), 2); assert_eq!(roots[0].object().tag_name, "Area_A"); assert!(roots[0].has_children_hint()); assert_eq!(roots[1].object().tag_name, "Area_B"); assert!(!roots[1].has_children_hint()); let calls = state.browse_children_calls.lock().unwrap(); assert_eq!(calls.len(), 1); assert!( calls[0].parent.is_none(), "root browse must send an empty parent oneof, got {:?}", calls[0].parent ); } #[tokio::test] async fn browse_expand_populates_children_and_marks_expanded() { let state = Arc::new(FakeState::default()); // First call: roots. state .browse_children_replies .lock() .unwrap() .push_back(build_browse_reply( vec![browse_obj(10, "Area_A", true)], vec![true], 1, )); // Second call: children of gobject 10. state .browse_children_replies .lock() .unwrap() .push_back(build_browse_reply( vec![browse_obj(11, "Receiver_1", false)], vec![false], 1, )); let endpoint = spawn_fake(state.clone()).await; let mut client = GalaxyClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let roots = client.browse(None).await.unwrap(); let root = roots.into_iter().next().expect("at least one root"); assert!(!root.is_expanded().await); root.expand().await.unwrap(); assert!(root.is_expanded().await); let children = root.children().await; assert_eq!(children.len(), 1); assert_eq!(children[0].object().tag_name, "Receiver_1"); let calls = state.browse_children_calls.lock().unwrap(); assert_eq!(calls.len(), 2); let expand_call = &calls[1]; match expand_call.parent.as_ref().expect("expand sends parent") { browse_children_request::Parent::ParentGobjectId(id) => assert_eq!(*id, 10), other => panic!("expected ParentGobjectId variant, got {other:?}"), } } #[tokio::test] async fn browse_expand_idempotent_no_second_rpc() { let state = Arc::new(FakeState::default()); state .browse_children_replies .lock() .unwrap() .push_back(build_browse_reply( vec![browse_obj(20, "Area_X", true)], vec![true], 1, )); state .browse_children_replies .lock() .unwrap() .push_back(build_browse_reply( vec![browse_obj(21, "Leaf", false)], vec![false], 1, )); let endpoint = spawn_fake(state.clone()).await; let mut client = GalaxyClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let roots = client.browse(None).await.unwrap(); let root = roots.into_iter().next().unwrap(); root.expand().await.unwrap(); let after_first = state.browse_children_calls.lock().unwrap().len(); // Calling expand a second time must NOT issue a new RPC. root.expand().await.unwrap(); let after_second = state.browse_children_calls.lock().unwrap().len(); assert_eq!( after_first, after_second, "expand should be idempotent — no extra RPC the second time" ); assert_eq!(root.children().await.len(), 1); } #[tokio::test] async fn browse_expand_unknown_parent_returns_not_found_error() { let state = Arc::new(FakeState::default()); // Root browse succeeds. state .browse_children_replies .lock() .unwrap() .push_back(build_browse_reply( vec![browse_obj(99, "GhostArea", true)], vec![true], 1, )); let endpoint = spawn_fake(state.clone()).await; let mut client = GalaxyClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let roots = client.browse(None).await.unwrap(); let root = roots.into_iter().next().unwrap(); // Seed the NotFound only AFTER the root call so the FakeGalaxy's // error stack doesn't intercept the initial browse. state .browse_children_errors .lock() .unwrap() .push(Status::not_found("parent gobject 99 not present in cache")); let error = root.expand().await.unwrap_err(); match &error { Error::Status(status) => { assert_eq!(status.code(), tonic::Code::NotFound); } other => panic!("expected Error::Status(NotFound), got {other:?}"), } // Failed expand must NOT mark the node as expanded — caller can retry. assert!(!root.is_expanded().await); assert!(root.children().await.is_empty()); } #[tokio::test] async fn browse_expand_multi_page_gathers_all_pages() { let state = Arc::new(FakeState::default()); // First reply: roots. state .browse_children_replies .lock() .unwrap() .push_back(build_browse_reply( vec![browse_obj(30, "Plant", true)], vec![true], 5, )); // Second reply: page 1 of children, with a next_page_token. let mut page_one = build_browse_reply( vec![ browse_obj(31, "Child_A", false), browse_obj(32, "Child_B", false), ], vec![false, false], 5, ); page_one.next_page_token = "cursor-2".to_owned(); page_one.total_child_count = 3; state .browse_children_replies .lock() .unwrap() .push_back(page_one); // Third reply: page 2 of children, with no next page. state .browse_children_replies .lock() .unwrap() .push_back(build_browse_reply( vec![browse_obj(33, "Child_C", false)], vec![false], 5, )); let endpoint = spawn_fake(state.clone()).await; let mut client = GalaxyClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let roots = client.browse(None).await.unwrap(); let root = roots.into_iter().next().unwrap(); root.expand().await.unwrap(); let children = root.children().await; assert_eq!(children.len(), 3); assert_eq!(children[0].object().tag_name, "Child_A"); assert_eq!(children[1].object().tag_name, "Child_B"); assert_eq!(children[2].object().tag_name, "Child_C"); let calls = state.browse_children_calls.lock().unwrap(); // 1 root call + 2 paged expand calls = 3 total. assert_eq!(calls.len(), 3); assert_eq!(calls[1].page_token, ""); assert_eq!(calls[2].page_token, "cursor-2"); } #[tokio::test] async fn browse_with_filter_forwards_to_request() { let state = Arc::new(FakeState::default()); state .browse_children_replies .lock() .unwrap() .push_back(build_browse_reply(Vec::new(), Vec::new(), 1)); let endpoint = spawn_fake(state.clone()).await; let mut client = GalaxyClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let options = BrowseChildrenOptions { category_ids: vec![3, 5], template_chain_contains: vec!["$DelmiaReceiver".to_owned()], tag_name_glob: Some("Recv_*".to_owned()), include_attributes: Some(true), alarm_bearing_only: true, historized_only: false, }; let _ = client.browse(Some(options)).await.unwrap(); let calls = state.browse_children_calls.lock().unwrap(); assert_eq!(calls.len(), 1); let req = &calls[0]; assert_eq!(req.category_ids, vec![3, 5]); assert_eq!(req.template_chain_contains, vec!["$DelmiaReceiver"]); assert_eq!(req.tag_name_glob, "Recv_*"); assert_eq!(req.include_attributes, Some(true)); assert!(req.alarm_bearing_only); assert!(!req.historized_only); } }