//! Thin async wrapper for the `GalaxyRepository` gRPC service. //! //! The wrapper mirrors [`crate::client::GatewayClient`]: it owns a tonic //! channel with the shared bearer-token interceptor and exposes the three //! read-only RPCs as Rust async methods. Generated Galaxy proto types are //! re-exported through [`crate::generated::galaxy_repository::v1`]. use std::fs; use prost_types::Timestamp; use tonic::codegen::InterceptedService; use tonic::transport::{Certificate, Channel, ClientTlsConfig}; use tonic::Request; use crate::auth::AuthInterceptor; use crate::error::Error; use crate::generated::galaxy_repository::v1::galaxy_repository_client::GalaxyRepositoryClient; use crate::generated::galaxy_repository::v1::{ DeployEvent, DiscoverHierarchyRequest, GalaxyObject, GetLastDeployTimeRequest, TestConnectionRequest, WatchDeployEventsRequest, }; use crate::options::ClientOptions; const DISCOVER_HIERARCHY_PAGE_SIZE: i32 = 5000; /// Convenience alias for the generated Galaxy client wrapped in the /// authentication interceptor. pub type RawGalaxyClient = GalaxyRepositoryClient>; /// Stream of `DeployEvent` values returned by /// [`GalaxyClient::watch_deploy_events`]. Mirrors /// [`crate::client::EventStream`]: a boxed `Stream` whose `tonic::Status` /// errors have already been mapped onto [`Error`]. Dropping the stream /// cancels the underlying gRPC call. pub type DeployEventStream = std::pin::Pin< Box> + Send + 'static>, >; /// Thin async wrapper around the generated Galaxy Repository gRPC client. /// /// Construct it with [`GalaxyClient::connect`] using the same /// [`ClientOptions`] that drive [`crate::client::GatewayClient`]. The /// service is metadata-only (no sessions) and requires the `metadata:read` /// API-key scope on the server side. #[derive(Clone)] pub struct GalaxyClient { inner: RawGalaxyClient, call_timeout: std::time::Duration, stream_timeout: Option, } impl GalaxyClient { /// Connect to the gateway endpoint and build a Galaxy client. Mirrors /// the TLS / plaintext / API-key handling used by `GatewayClient`. pub async fn connect(options: ClientOptions) -> Result { let mut endpoint = Channel::from_shared(options.endpoint().to_owned()).map_err(|source| { Error::InvalidEndpoint { endpoint: options.endpoint().to_owned(), detail: source.to_string(), } })?; endpoint = endpoint.connect_timeout(options.connect_timeout()); if !options.plaintext() { let mut tls = ClientTlsConfig::new(); if let Some(server_name) = options.server_name_override() { tls = tls.domain_name(server_name.to_owned()); } if let Some(ca_file) = options.ca_file() { let certificate = fs::read(ca_file).map_err(|source| Error::InvalidEndpoint { endpoint: options.endpoint().to_owned(), detail: format!("failed to read CA file {}: {source}", ca_file.display()), })?; tls = tls.ca_certificate(Certificate::from_pem(certificate)); } endpoint = endpoint.tls_config(tls)?; } let channel = endpoint.connect().await?; let interceptor = AuthInterceptor::new(options.api_key().cloned()); let max_grpc_message_bytes = options.max_grpc_message_bytes(); Ok(Self { inner: GalaxyRepositoryClient::with_interceptor(channel, interceptor) .max_decoding_message_size(max_grpc_message_bytes) .max_encoding_message_size(max_grpc_message_bytes), call_timeout: options.call_timeout(), stream_timeout: options.stream_timeout(), }) } /// Build a [`GalaxyClient`] that talks through an existing tonic /// channel. Tests use this to wire up an in-memory transport. pub fn from_channel(channel: Channel, options: &ClientOptions) -> Self { let interceptor = AuthInterceptor::new(options.api_key().cloned()); let max_grpc_message_bytes = options.max_grpc_message_bytes(); Self { inner: GalaxyRepositoryClient::with_interceptor(channel, interceptor) .max_decoding_message_size(max_grpc_message_bytes) .max_encoding_message_size(max_grpc_message_bytes), call_timeout: options.call_timeout(), stream_timeout: options.stream_timeout(), } } /// Borrow the underlying generated client for advanced callers that need /// access to features not surfaced by the wrapper. pub fn raw_client(&mut self) -> &mut RawGalaxyClient { &mut self.inner } /// Consume the wrapper and return the generated client. pub fn into_inner(self) -> RawGalaxyClient { self.inner } /// Probe the Galaxy Repository database connection. Returns the `ok` /// flag from the server reply. pub async fn test_connection(&mut self) -> Result { let response = self .inner .test_connection(self.unary_request(TestConnectionRequest {})) .await?; Ok(response.into_inner().ok) } /// Read the most recent Galaxy deployment timestamp. Returns `None` /// when the server reports `present = false`. pub async fn get_last_deploy_time(&mut self) -> Result, Error> { let response = self .inner .get_last_deploy_time(self.unary_request(GetLastDeployTimeRequest {})) .await?; let reply = response.into_inner(); if reply.present { Ok(reply.time_of_last_deploy) } else { Ok(None) } } /// Walk the deployed object hierarchy. Each [`GalaxyObject`] contains /// the object's identifying names plus its dynamic attributes. pub async fn discover_hierarchy(&mut self) -> Result, Error> { let mut objects = Vec::new(); let mut seen_page_tokens = std::collections::HashSet::new(); let mut page_token = String::new(); loop { let response = self .inner .discover_hierarchy(self.unary_request(DiscoverHierarchyRequest { page_size: DISCOVER_HIERARCHY_PAGE_SIZE, page_token, ..Default::default() })) .await?; let reply = response.into_inner(); objects.extend(reply.objects); page_token = reply.next_page_token; if page_token.is_empty() { return Ok(objects); } if !seen_page_tokens.insert(page_token.clone()) { return Err(Error::InvalidArgument { name: "page_token".to_owned(), detail: format!( "galaxy discover hierarchy returned repeated page token `{page_token}`" ), }); } } } /// Subscribe to the server-streamed deploy-event feed. /// /// The server emits a bootstrap event describing the current cache state /// immediately on subscribe, then one event per observed change to /// `galaxy.time_of_last_deploy`. When `last_seen_deploy_time` matches the /// current cache, the bootstrap event is suppressed and the stream stays /// idle until the next deploy. /// /// Cancellation is cooperative: dropping the returned /// [`DeployEventStream`] tears down the underlying gRPC call. Callers /// drive consumption with `StreamExt::next` (or any other `Stream` /// adapter). pub async fn watch_deploy_events( &mut self, last_seen_deploy_time: Option, ) -> Result { let request = WatchDeployEventsRequest { last_seen_deploy_time, }; let response = self .inner .watch_deploy_events(self.stream_request(request)) .await?; let stream = futures_util::StreamExt::map(response.into_inner(), |result| { result.map_err(Error::from) }); Ok(Box::pin(stream)) } fn unary_request(&self, message: T) -> Request { let mut request = Request::new(message); request.set_timeout(self.call_timeout); request } fn stream_request(&self, message: T) -> Request { let mut request = Request::new(message); if let Some(timeout) = self.stream_timeout { request.set_timeout(timeout); } request } } #[cfg(test)] mod tests { use std::pin::Pin; use std::sync::{Arc, Mutex}; use futures_util::StreamExt; use tokio::net::TcpListener; use tokio::sync::mpsc; use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream}; use tonic::transport::Server; use tonic::{Request, Response, Status}; use super::*; use crate::auth::ApiKey; use crate::generated::galaxy_repository::v1::galaxy_repository_server::{ GalaxyRepository, GalaxyRepositoryServer, }; use crate::generated::galaxy_repository::v1::{ DeployEvent, DiscoverHierarchyReply, DiscoverHierarchyRequest, GalaxyAttribute, GalaxyObject, GetLastDeployTimeReply, GetLastDeployTimeRequest, TestConnectionReply, TestConnectionRequest, WatchDeployEventsRequest, }; type DeployEventTx = mpsc::Sender>; #[derive(Default)] struct FakeState { authorization: Mutex>, present: Mutex, last_deploy: Mutex>, objects: Mutex>, discover_requests: Mutex>, discover_replies: Mutex>, watch_requests: Mutex>, watch_events: Mutex>, watch_senders: Mutex>, watch_drop_signal: Mutex>>, } #[derive(Clone)] struct FakeGalaxy { state: Arc, } #[tonic::async_trait] impl GalaxyRepository for FakeGalaxy { async fn test_connection( &self, request: Request, ) -> Result, Status> { *self.state.authorization.lock().unwrap() = request .metadata() .get("authorization") .and_then(|value| value.to_str().ok()) .map(str::to_owned); Ok(Response::new(TestConnectionReply { ok: true })) } async fn get_last_deploy_time( &self, _request: Request, ) -> Result, Status> { let present = *self.state.present.lock().unwrap(); let time = self.state.last_deploy.lock().unwrap().clone(); Ok(Response::new(GetLastDeployTimeReply { present, time_of_last_deploy: time, })) } async fn discover_hierarchy( &self, request: Request, ) -> Result, Status> { self.state .discover_requests .lock() .unwrap() .push(request.into_inner()); if let Some(reply) = self.state.discover_replies.lock().unwrap().pop_front() { return Ok(Response::new(reply)); } Ok(Response::new(DiscoverHierarchyReply { objects: self.state.objects.lock().unwrap().clone(), next_page_token: String::new(), total_object_count: self.state.objects.lock().unwrap().len() as i32, })) } type WatchDeployEventsStream = Pin> + Send + 'static>>; async fn watch_deploy_events( &self, request: Request, ) -> Result, Status> { self.state .watch_requests .lock() .unwrap() .push(request.into_inner()); let preset = self.state.watch_events.lock().unwrap().clone(); let (tx, rx) = mpsc::channel::>(16); for event in preset { tx.send(Ok(event)) .await .map_err(|err| Status::internal(err.to_string()))?; } self.state.watch_senders.lock().unwrap().push(tx.clone()); let drop_signal = self.state.watch_drop_signal.lock().unwrap().clone(); let stream = ReceiverStream::new(rx); let stream: Pin + Send + 'static>> = if let Some(signal) = drop_signal { Box::pin(WatchStreamWithDropSignal { inner: stream, signal: Some(signal), }) } else { Box::pin(stream) }; Ok(Response::new(stream)) } } /// Wraps the receiver stream so we can detect when the server-side stream /// future is dropped (the client cancelled or dropped the stream). Used by /// `watch_drop_tears_down_call`. struct WatchStreamWithDropSignal { inner: S, signal: Option>, } impl tokio_stream::Stream for WatchStreamWithDropSignal { type Item = S::Item; fn poll_next( mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { Pin::new(&mut self.inner).poll_next(cx) } } impl Drop for WatchStreamWithDropSignal { fn drop(&mut self) { if let Some(signal) = self.signal.take() { let _ = signal.send(()); } } } async fn spawn_fake(state: Arc) -> String { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let address = listener.local_addr().unwrap(); let incoming = TcpListenerStream::new(listener); let service = GalaxyRepositoryServer::new(FakeGalaxy { state }); tokio::spawn(async move { Server::builder() .add_service(service) .serve_with_incoming(incoming) .await .unwrap(); }); format!("http://{address}") } #[tokio::test] async fn test_connection_attaches_bearer_metadata_and_returns_ok() { let state = Arc::new(FakeState::default()); let endpoint = spawn_fake(state.clone()).await; let mut client = GalaxyClient::connect( ClientOptions::new(endpoint).with_api_key(ApiKey::new("mxgw_galaxy_secret")), ) .await .unwrap(); let ok = client.test_connection().await.unwrap(); assert!(ok); assert_eq!( state.authorization.lock().unwrap().as_deref(), Some("Bearer mxgw_galaxy_secret") ); } #[tokio::test] async fn get_last_deploy_time_returns_none_when_not_present() { let state = Arc::new(FakeState::default()); *state.present.lock().unwrap() = false; *state.last_deploy.lock().unwrap() = Some(Timestamp { seconds: 1_700_000_000, nanos: 0, }); let endpoint = spawn_fake(state.clone()).await; let mut client = GalaxyClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let result = client.get_last_deploy_time().await.unwrap(); assert!( result.is_none(), "present=false on the wire must surface as None, got {result:?}" ); } #[tokio::test] async fn get_last_deploy_time_returns_timestamp_when_present() { let state = Arc::new(FakeState::default()); *state.present.lock().unwrap() = true; *state.last_deploy.lock().unwrap() = Some(Timestamp { seconds: 1_700_000_000, nanos: 250_000_000, }); let endpoint = spawn_fake(state.clone()).await; let mut client = GalaxyClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let result = client.get_last_deploy_time().await.unwrap(); let timestamp = result.expect("present=true should yield a timestamp"); assert_eq!(timestamp.seconds, 1_700_000_000); assert_eq!(timestamp.nanos, 250_000_000); } #[tokio::test] async fn discover_hierarchy_returns_objects_with_attributes() { let state = Arc::new(FakeState::default()); state .discover_replies .lock() .unwrap() .push_back(DiscoverHierarchyReply { objects: vec![GalaxyObject { gobject_id: 42, tag_name: "DelmiaReceiver_001".to_owned(), contained_name: "DelmiaReceiver".to_owned(), browse_name: "TestMachine_001/DelmiaReceiver".to_owned(), parent_gobject_id: 7, is_area: false, category_id: 3, hosted_by_gobject_id: 1, template_chain: vec!["$UserDefined".to_owned(), "$DelmiaReceiver".to_owned()], attributes: vec![GalaxyAttribute { attribute_name: "DownloadPath".to_owned(), full_tag_reference: "DelmiaReceiver_001.DownloadPath".to_owned(), mx_data_type: 8, data_type_name: "MxString".to_owned(), is_array: false, array_dimension: 0, array_dimension_present: false, mx_attribute_category: 2, security_classification: 1, is_historized: false, is_alarm: false, }], }], next_page_token: "page-2".to_owned(), total_object_count: 2, }); state .discover_replies .lock() .unwrap() .push_back(DiscoverHierarchyReply { objects: vec![GalaxyObject { gobject_id: 43, tag_name: "DelmiaReceiver_002".to_owned(), contained_name: String::new(), browse_name: String::new(), parent_gobject_id: 0, is_area: false, category_id: 0, hosted_by_gobject_id: 0, template_chain: Vec::new(), attributes: Vec::new(), }], next_page_token: String::new(), total_object_count: 2, }); let endpoint = spawn_fake(state.clone()).await; let mut client = GalaxyClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let objects = client.discover_hierarchy().await.unwrap(); assert_eq!(objects.len(), 2); let requests = state.discover_requests.lock().unwrap(); assert_eq!(requests.len(), 2); assert_eq!(requests[0].page_size, 5000); assert_eq!(requests[0].page_token, ""); assert_eq!(requests[1].page_token, "page-2"); assert_eq!(objects[0].tag_name, "DelmiaReceiver_001"); assert_eq!(objects[0].attributes.len(), 1); assert_eq!(objects[0].attributes[0].attribute_name, "DownloadPath"); assert_eq!( objects[0].attributes[0].full_tag_reference, "DelmiaReceiver_001.DownloadPath" ); } #[tokio::test] async fn discover_hierarchy_rejects_repeated_page_token() { let state = Arc::new(FakeState::default()); state .discover_replies .lock() .unwrap() .push_back(DiscoverHierarchyReply { objects: Vec::new(), next_page_token: "7:1".to_owned(), total_object_count: 1, }); state .discover_replies .lock() .unwrap() .push_back(DiscoverHierarchyReply { objects: Vec::new(), next_page_token: "7:1".to_owned(), total_object_count: 1, }); let endpoint = spawn_fake(state).await; let mut client = GalaxyClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let error = client.discover_hierarchy().await.unwrap_err(); assert!(error.to_string().contains("repeated page token")); } #[tokio::test] async fn watch_deploy_events_yields_events_in_order() { let state = Arc::new(FakeState::default()); *state.watch_events.lock().unwrap() = vec![ DeployEvent { sequence: 1, observed_at: Some(Timestamp { seconds: 1_700_000_000, nanos: 0, }), time_of_last_deploy: Some(Timestamp { seconds: 1_699_000_000, nanos: 0, }), time_of_last_deploy_present: true, object_count: 12, attribute_count: 80, }, DeployEvent { sequence: 2, observed_at: Some(Timestamp { seconds: 1_700_000_500, nanos: 0, }), time_of_last_deploy: Some(Timestamp { seconds: 1_699_500_000, nanos: 0, }), time_of_last_deploy_present: true, object_count: 13, attribute_count: 85, }, ]; let endpoint = spawn_fake(state.clone()).await; let mut client = GalaxyClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let mut stream = client.watch_deploy_events(None).await.unwrap(); let first = stream .next() .await .expect("bootstrap event") .expect("ok deploy event"); let second = stream .next() .await .expect("second event") .expect("ok deploy event"); assert_eq!(first.sequence, 1); assert_eq!(first.object_count, 12); assert_eq!(second.sequence, 2); assert_eq!(second.object_count, 13); assert!(first.time_of_last_deploy_present); } #[tokio::test] async fn watch_deploy_events_propagates_last_seen_deploy_time() { let state = Arc::new(FakeState::default()); let endpoint = spawn_fake(state.clone()).await; let mut client = GalaxyClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let last_seen = Timestamp { seconds: 1_699_999_999, nanos: 123_456_789, }; let stream = client.watch_deploy_events(Some(last_seen)).await.unwrap(); // Drop the stream right away — the test is solely about the request // payload reaching the server. drop(stream); // Give the server task a moment to record the request. for _ in 0..20 { if !state.watch_requests.lock().unwrap().is_empty() { break; } tokio::time::sleep(std::time::Duration::from_millis(10)).await; } let requests = state.watch_requests.lock().unwrap().clone(); assert_eq!(requests.len(), 1); let recorded = requests[0] .last_seen_deploy_time .as_ref() .expect("last_seen_deploy_time forwarded"); assert_eq!(recorded.seconds, last_seen.seconds); assert_eq!(recorded.nanos, last_seen.nanos); } #[tokio::test] async fn watch_deploy_events_drop_tears_down_call() { let state = Arc::new(FakeState::default()); let (signal_tx, mut signal_rx) = mpsc::unbounded_channel(); *state.watch_drop_signal.lock().unwrap() = Some(signal_tx); // Seed one event so the client gets something on the stream before we // drop it; this proves the call is live. *state.watch_events.lock().unwrap() = vec![DeployEvent { sequence: 7, observed_at: None, time_of_last_deploy: None, time_of_last_deploy_present: false, object_count: 0, attribute_count: 0, }]; let endpoint = spawn_fake(state.clone()).await; let mut client = GalaxyClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let mut stream = client.watch_deploy_events(None).await.unwrap(); let event = stream .next() .await .expect("bootstrap event") .expect("ok deploy event"); assert_eq!(event.sequence, 7); // Dropping the client-side stream must trigger the server-side stream // future to be dropped as well, signalling cancellation. drop(stream); let drop_seen = tokio::time::timeout(std::time::Duration::from_secs(2), signal_rx.recv()) .await .expect("server-side stream future was not dropped within 2s"); assert!( drop_seen.is_some(), "drop signal channel closed unexpectedly" ); } }