1230 lines
44 KiB
Rust
1230 lines
44 KiB
Rust
//! Thin async wrapper for the `GalaxyRepository` gRPC service.
|
|
//!
|
|
//! The wrapper mirrors [`crate::client::GatewayClient`]: it owns a tonic
|
|
//! channel with the shared bearer-token interceptor and exposes the three
|
|
//! read-only RPCs as Rust async methods. Generated Galaxy proto types are
|
|
//! re-exported through [`crate::generated::galaxy_repository::v1`].
|
|
|
|
use std::collections::HashSet;
|
|
use std::fs;
|
|
use std::sync::Arc;
|
|
|
|
use prost_types::Timestamp;
|
|
use tokio::sync::Mutex as AsyncMutex;
|
|
use tonic::codegen::InterceptedService;
|
|
use tonic::transport::{Certificate, Channel, ClientTlsConfig};
|
|
use tonic::Request;
|
|
|
|
use crate::auth::AuthInterceptor;
|
|
use crate::error::Error;
|
|
use crate::generated::galaxy_repository::v1::galaxy_repository_client::GalaxyRepositoryClient;
|
|
use crate::generated::galaxy_repository::v1::{
|
|
browse_children_request, BrowseChildrenReply, BrowseChildrenRequest, DeployEvent,
|
|
DiscoverHierarchyRequest, GalaxyObject, GetLastDeployTimeRequest, TestConnectionRequest,
|
|
WatchDeployEventsRequest,
|
|
};
|
|
use crate::options::ClientOptions;
|
|
|
|
const DISCOVER_HIERARCHY_PAGE_SIZE: i32 = 5000;
|
|
const BROWSE_CHILDREN_PAGE_SIZE: i32 = 500;
|
|
|
|
/// Optional filter set forwarded to `GalaxyRepository.BrowseChildren`.
|
|
///
|
|
/// Mirrors the request-level filters on the wire: combined with AND so a child
|
|
/// only appears when it satisfies every populated criterion. Construct via
|
|
/// [`BrowseChildrenOptions::default`] and tweak the fields you care about.
|
|
#[derive(Debug, Clone, Default)]
|
|
pub struct BrowseChildrenOptions {
|
|
/// Restrict to objects whose `category_id` matches one of the supplied
|
|
/// Galaxy category identifiers. Empty means "no restriction".
|
|
pub category_ids: Vec<i32>,
|
|
/// Restrict to objects whose template chain contains every supplied
|
|
/// template name (case-sensitive substring match on each entry).
|
|
pub template_chain_contains: Vec<String>,
|
|
/// Restrict to objects whose tag name matches the supplied glob (SQL
|
|
/// `LIKE`-style on the server). `None` means "no glob filter".
|
|
pub tag_name_glob: Option<String>,
|
|
/// Optional tri-state hint for whether to populate `GalaxyObject.attributes`
|
|
/// on returned children. `None` falls back to the server default.
|
|
pub include_attributes: Option<bool>,
|
|
/// When `true`, only return children that own at least one alarm-bearing
|
|
/// attribute (matches `DiscoverHierarchy` semantics).
|
|
pub alarm_bearing_only: bool,
|
|
/// When `true`, only return children that own at least one historized
|
|
/// attribute (matches `DiscoverHierarchy` semantics).
|
|
pub historized_only: bool,
|
|
}
|
|
|
|
/// Lazy hierarchy node used by the walker built on top of `BrowseChildren`.
|
|
///
|
|
/// A node owns its [`GalaxyObject`], a hint as to whether the server believes
|
|
/// it has at least one matching descendant under the active filter set, and an
|
|
/// internal `expanded` flag protected by an async mutex. Calling [`expand`]
|
|
/// the first time issues a paged `BrowseChildren` RPC; subsequent calls are
|
|
/// no-ops so callers can poll without re-hitting the server.
|
|
///
|
|
/// `LazyBrowseNode` is cheap to clone — clones share state through an
|
|
/// internal `Arc`, so expanding one clone makes the children visible to every
|
|
/// other clone.
|
|
///
|
|
/// [`expand`]: LazyBrowseNode::expand
|
|
pub struct LazyBrowseNode {
|
|
inner: Arc<LazyBrowseNodeInner>,
|
|
}
|
|
|
|
impl Clone for LazyBrowseNode {
|
|
fn clone(&self) -> Self {
|
|
Self {
|
|
inner: Arc::clone(&self.inner),
|
|
}
|
|
}
|
|
}
|
|
|
|
struct LazyBrowseNodeInner {
|
|
client: GalaxyClient,
|
|
object: GalaxyObject,
|
|
has_children_hint: bool,
|
|
options: BrowseChildrenOptions,
|
|
state: AsyncMutex<LazyBrowseNodeState>,
|
|
}
|
|
|
|
struct LazyBrowseNodeState {
|
|
children: Vec<LazyBrowseNode>,
|
|
is_expanded: bool,
|
|
}
|
|
|
|
impl LazyBrowseNode {
|
|
/// Borrow the [`GalaxyObject`] returned by the server for this node.
|
|
pub fn object(&self) -> &GalaxyObject {
|
|
&self.inner.object
|
|
}
|
|
|
|
/// Server-supplied hint: `true` when the child likely has at least one
|
|
/// further matching descendant. Useful to decide whether a UI should draw
|
|
/// an expand triangle without issuing the RPC up front.
|
|
pub fn has_children_hint(&self) -> bool {
|
|
self.inner.has_children_hint
|
|
}
|
|
|
|
/// Snapshot of the currently-known children. Empty until [`expand`] has
|
|
/// run at least once.
|
|
///
|
|
/// [`expand`]: LazyBrowseNode::expand
|
|
pub async fn children(&self) -> Vec<LazyBrowseNode> {
|
|
self.inner.state.lock().await.children.clone()
|
|
}
|
|
|
|
/// Returns `true` once [`expand`] has populated this node's children.
|
|
///
|
|
/// [`expand`]: LazyBrowseNode::expand
|
|
pub async fn is_expanded(&self) -> bool {
|
|
self.inner.state.lock().await.is_expanded
|
|
}
|
|
|
|
/// Populate this node's children by issuing a paged `BrowseChildren` RPC.
|
|
/// Subsequent calls are no-ops — the cached children stay in place and no
|
|
/// additional RPC is issued.
|
|
pub async fn expand(&self) -> Result<(), Error> {
|
|
let mut state = self.inner.state.lock().await;
|
|
if state.is_expanded {
|
|
return Ok(());
|
|
}
|
|
|
|
let mut client = self.inner.client.clone();
|
|
let new_children = client
|
|
.browse_children_inner(
|
|
Some(self.inner.object.gobject_id),
|
|
self.inner.options.clone(),
|
|
)
|
|
.await?;
|
|
|
|
state.children = new_children;
|
|
state.is_expanded = true;
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
/// Convenience alias for the generated Galaxy client wrapped in the
|
|
/// authentication interceptor.
|
|
pub type RawGalaxyClient = GalaxyRepositoryClient<InterceptedService<Channel, AuthInterceptor>>;
|
|
|
|
/// Stream of `DeployEvent` values returned by
|
|
/// [`GalaxyClient::watch_deploy_events`]. Mirrors
|
|
/// [`crate::client::EventStream`]: a boxed `Stream` whose `tonic::Status`
|
|
/// errors have already been mapped onto [`Error`]. Dropping the stream
|
|
/// cancels the underlying gRPC call.
|
|
pub type DeployEventStream = std::pin::Pin<
|
|
Box<dyn futures_core::Stream<Item = Result<DeployEvent, Error>> + Send + 'static>,
|
|
>;
|
|
|
|
/// Thin async wrapper around the generated Galaxy Repository gRPC client.
|
|
///
|
|
/// Construct it with [`GalaxyClient::connect`] using the same
|
|
/// [`ClientOptions`] that drive [`crate::client::GatewayClient`]. The
|
|
/// service is metadata-only (no sessions) and requires the `metadata:read`
|
|
/// API-key scope on the server side.
|
|
#[derive(Clone)]
|
|
pub struct GalaxyClient {
|
|
inner: RawGalaxyClient,
|
|
call_timeout: std::time::Duration,
|
|
stream_timeout: Option<std::time::Duration>,
|
|
}
|
|
|
|
impl GalaxyClient {
|
|
/// Connect to the gateway endpoint and build a Galaxy client. Mirrors
|
|
/// the TLS / plaintext / API-key handling used by `GatewayClient`.
|
|
pub async fn connect(options: ClientOptions) -> Result<Self, Error> {
|
|
let mut endpoint =
|
|
Channel::from_shared(options.endpoint().to_owned()).map_err(|source| {
|
|
Error::InvalidEndpoint {
|
|
endpoint: options.endpoint().to_owned(),
|
|
detail: source.to_string(),
|
|
}
|
|
})?;
|
|
endpoint = endpoint.connect_timeout(options.connect_timeout());
|
|
|
|
if !options.plaintext() {
|
|
let mut tls = ClientTlsConfig::new();
|
|
if let Some(server_name) = options.server_name_override() {
|
|
tls = tls.domain_name(server_name.to_owned());
|
|
}
|
|
if let Some(ca_file) = options.ca_file() {
|
|
let certificate = fs::read(ca_file).map_err(|source| Error::InvalidEndpoint {
|
|
endpoint: options.endpoint().to_owned(),
|
|
detail: format!("failed to read CA file {}: {source}", ca_file.display()),
|
|
})?;
|
|
tls = tls.ca_certificate(Certificate::from_pem(certificate));
|
|
}
|
|
endpoint = endpoint.tls_config(tls)?;
|
|
}
|
|
|
|
let channel = endpoint.connect().await?;
|
|
let interceptor = AuthInterceptor::new(options.api_key().cloned());
|
|
let max_grpc_message_bytes = options.max_grpc_message_bytes();
|
|
|
|
Ok(Self {
|
|
inner: GalaxyRepositoryClient::with_interceptor(channel, interceptor)
|
|
.max_decoding_message_size(max_grpc_message_bytes)
|
|
.max_encoding_message_size(max_grpc_message_bytes),
|
|
call_timeout: options.call_timeout(),
|
|
stream_timeout: options.stream_timeout(),
|
|
})
|
|
}
|
|
|
|
/// Build a [`GalaxyClient`] that talks through an existing tonic
|
|
/// channel. Tests use this to wire up an in-memory transport.
|
|
pub fn from_channel(channel: Channel, options: &ClientOptions) -> Self {
|
|
let interceptor = AuthInterceptor::new(options.api_key().cloned());
|
|
let max_grpc_message_bytes = options.max_grpc_message_bytes();
|
|
Self {
|
|
inner: GalaxyRepositoryClient::with_interceptor(channel, interceptor)
|
|
.max_decoding_message_size(max_grpc_message_bytes)
|
|
.max_encoding_message_size(max_grpc_message_bytes),
|
|
call_timeout: options.call_timeout(),
|
|
stream_timeout: options.stream_timeout(),
|
|
}
|
|
}
|
|
|
|
/// Borrow the underlying generated client for advanced callers that need
|
|
/// access to features not surfaced by the wrapper.
|
|
pub fn raw_client(&mut self) -> &mut RawGalaxyClient {
|
|
&mut self.inner
|
|
}
|
|
|
|
/// Consume the wrapper and return the generated client.
|
|
pub fn into_inner(self) -> RawGalaxyClient {
|
|
self.inner
|
|
}
|
|
|
|
/// Probe the Galaxy Repository database connection. Returns the `ok`
|
|
/// flag from the server reply.
|
|
pub async fn test_connection(&mut self) -> Result<bool, Error> {
|
|
let response = self
|
|
.inner
|
|
.test_connection(self.unary_request(TestConnectionRequest {}))
|
|
.await?;
|
|
Ok(response.into_inner().ok)
|
|
}
|
|
|
|
/// Read the most recent Galaxy deployment timestamp. Returns `None`
|
|
/// when the server reports `present = false`.
|
|
pub async fn get_last_deploy_time(&mut self) -> Result<Option<Timestamp>, Error> {
|
|
let response = self
|
|
.inner
|
|
.get_last_deploy_time(self.unary_request(GetLastDeployTimeRequest {}))
|
|
.await?;
|
|
let reply = response.into_inner();
|
|
if reply.present {
|
|
Ok(reply.time_of_last_deploy)
|
|
} else {
|
|
Ok(None)
|
|
}
|
|
}
|
|
|
|
/// Walk the deployed object hierarchy. Each [`GalaxyObject`] contains
|
|
/// the object's identifying names plus its dynamic attributes.
|
|
pub async fn discover_hierarchy(&mut self) -> Result<Vec<GalaxyObject>, Error> {
|
|
let mut objects = Vec::new();
|
|
let mut seen_page_tokens = std::collections::HashSet::new();
|
|
let mut page_token = String::new();
|
|
loop {
|
|
let response = self
|
|
.inner
|
|
.discover_hierarchy(self.unary_request(DiscoverHierarchyRequest {
|
|
page_size: DISCOVER_HIERARCHY_PAGE_SIZE,
|
|
page_token,
|
|
..Default::default()
|
|
}))
|
|
.await?;
|
|
let reply = response.into_inner();
|
|
objects.extend(reply.objects);
|
|
page_token = reply.next_page_token;
|
|
if page_token.is_empty() {
|
|
return Ok(objects);
|
|
}
|
|
if !seen_page_tokens.insert(page_token.clone()) {
|
|
return Err(Error::InvalidArgument {
|
|
name: "page_token".to_owned(),
|
|
detail: format!(
|
|
"galaxy discover hierarchy returned repeated page token `{page_token}`"
|
|
),
|
|
});
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Browse the top-level (root) objects of the hierarchy as
|
|
/// [`LazyBrowseNode`] instances. Pass [`BrowseChildrenOptions`] to
|
|
/// restrict the result set; the same filter is reused when callers expand
|
|
/// any returned node.
|
|
pub async fn browse(
|
|
&mut self,
|
|
options: Option<BrowseChildrenOptions>,
|
|
) -> Result<Vec<LazyBrowseNode>, Error> {
|
|
let effective = options.unwrap_or_default();
|
|
self.browse_children_inner(None, effective).await
|
|
}
|
|
|
|
/// Issue a single `BrowseChildren` RPC and return the raw reply. Callers
|
|
/// that want to drive paging themselves (or inspect the cache sequence)
|
|
/// use this; high-level walking goes through [`browse`] and
|
|
/// [`LazyBrowseNode::expand`].
|
|
///
|
|
/// [`browse`]: GalaxyClient::browse
|
|
pub async fn browse_children_raw(
|
|
&mut self,
|
|
request: BrowseChildrenRequest,
|
|
) -> Result<BrowseChildrenReply, Error> {
|
|
let response = self
|
|
.inner
|
|
.browse_children(self.unary_request(request))
|
|
.await?;
|
|
Ok(response.into_inner())
|
|
}
|
|
|
|
pub(crate) async fn browse_children_inner(
|
|
&mut self,
|
|
parent_gobject_id: Option<i32>,
|
|
options: BrowseChildrenOptions,
|
|
) -> Result<Vec<LazyBrowseNode>, Error> {
|
|
let mut nodes = Vec::new();
|
|
let mut page_token = String::new();
|
|
let mut seen_page_tokens: HashSet<String> = HashSet::new();
|
|
loop {
|
|
let parent = parent_gobject_id.map(browse_children_request::Parent::ParentGobjectId);
|
|
let request = BrowseChildrenRequest {
|
|
page_size: BROWSE_CHILDREN_PAGE_SIZE,
|
|
page_token: page_token.clone(),
|
|
category_ids: options.category_ids.clone(),
|
|
template_chain_contains: options.template_chain_contains.clone(),
|
|
tag_name_glob: options.tag_name_glob.clone().unwrap_or_default(),
|
|
include_attributes: options.include_attributes,
|
|
alarm_bearing_only: options.alarm_bearing_only,
|
|
historized_only: options.historized_only,
|
|
parent,
|
|
};
|
|
|
|
let reply = self.browse_children_raw(request).await?;
|
|
let hints = reply.child_has_children;
|
|
for (index, object) in reply.children.into_iter().enumerate() {
|
|
let hint = hints.get(index).copied().unwrap_or(false);
|
|
nodes.push(self.make_lazy_node(object, hint, options.clone()));
|
|
}
|
|
|
|
page_token = reply.next_page_token;
|
|
if page_token.is_empty() {
|
|
return Ok(nodes);
|
|
}
|
|
if !seen_page_tokens.insert(page_token.clone()) {
|
|
return Err(Error::InvalidArgument {
|
|
name: "page_token".to_owned(),
|
|
detail: format!(
|
|
"galaxy browse children returned repeated page token `{page_token}`"
|
|
),
|
|
});
|
|
}
|
|
}
|
|
}
|
|
|
|
fn make_lazy_node(
|
|
&self,
|
|
object: GalaxyObject,
|
|
has_children_hint: bool,
|
|
options: BrowseChildrenOptions,
|
|
) -> LazyBrowseNode {
|
|
LazyBrowseNode {
|
|
inner: Arc::new(LazyBrowseNodeInner {
|
|
client: self.clone(),
|
|
object,
|
|
has_children_hint,
|
|
options,
|
|
state: AsyncMutex::new(LazyBrowseNodeState {
|
|
children: Vec::new(),
|
|
is_expanded: false,
|
|
}),
|
|
}),
|
|
}
|
|
}
|
|
|
|
/// Subscribe to the server-streamed deploy-event feed.
|
|
///
|
|
/// The server emits a bootstrap event describing the current cache state
|
|
/// immediately on subscribe, then one event per observed change to
|
|
/// `galaxy.time_of_last_deploy`. When `last_seen_deploy_time` matches the
|
|
/// current cache, the bootstrap event is suppressed and the stream stays
|
|
/// idle until the next deploy.
|
|
///
|
|
/// Cancellation is cooperative: dropping the returned
|
|
/// [`DeployEventStream`] tears down the underlying gRPC call. Callers
|
|
/// drive consumption with `StreamExt::next` (or any other `Stream`
|
|
/// adapter).
|
|
pub async fn watch_deploy_events(
|
|
&mut self,
|
|
last_seen_deploy_time: Option<Timestamp>,
|
|
) -> Result<DeployEventStream, Error> {
|
|
let request = WatchDeployEventsRequest {
|
|
last_seen_deploy_time,
|
|
};
|
|
let response = self
|
|
.inner
|
|
.watch_deploy_events(self.stream_request(request))
|
|
.await?;
|
|
let stream = futures_util::StreamExt::map(response.into_inner(), |result| {
|
|
result.map_err(Error::from)
|
|
});
|
|
Ok(Box::pin(stream))
|
|
}
|
|
|
|
fn unary_request<T>(&self, message: T) -> Request<T> {
|
|
let mut request = Request::new(message);
|
|
request.set_timeout(self.call_timeout);
|
|
request
|
|
}
|
|
|
|
fn stream_request<T>(&self, message: T) -> Request<T> {
|
|
let mut request = Request::new(message);
|
|
if let Some(timeout) = self.stream_timeout {
|
|
request.set_timeout(timeout);
|
|
}
|
|
request
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use std::pin::Pin;
|
|
use std::sync::{Arc, Mutex};
|
|
|
|
use futures_util::StreamExt;
|
|
use tokio::net::TcpListener;
|
|
use tokio::sync::mpsc;
|
|
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
|
|
use tonic::transport::Server;
|
|
use tonic::{Request, Response, Status};
|
|
|
|
use super::*;
|
|
use crate::auth::ApiKey;
|
|
use crate::generated::galaxy_repository::v1::galaxy_repository_server::{
|
|
GalaxyRepository, GalaxyRepositoryServer,
|
|
};
|
|
use crate::generated::galaxy_repository::v1::{
|
|
BrowseChildrenReply, BrowseChildrenRequest, DeployEvent, DiscoverHierarchyReply,
|
|
DiscoverHierarchyRequest, GalaxyAttribute, GalaxyObject, GetLastDeployTimeReply,
|
|
GetLastDeployTimeRequest, TestConnectionReply, TestConnectionRequest,
|
|
WatchDeployEventsRequest,
|
|
};
|
|
|
|
type DeployEventTx = mpsc::Sender<Result<DeployEvent, Status>>;
|
|
|
|
#[derive(Default)]
|
|
struct FakeState {
|
|
authorization: Mutex<Option<String>>,
|
|
present: Mutex<bool>,
|
|
last_deploy: Mutex<Option<Timestamp>>,
|
|
objects: Mutex<Vec<GalaxyObject>>,
|
|
discover_requests: Mutex<Vec<DiscoverHierarchyRequest>>,
|
|
discover_replies: Mutex<std::collections::VecDeque<DiscoverHierarchyReply>>,
|
|
browse_children_calls: Mutex<Vec<BrowseChildrenRequest>>,
|
|
browse_children_replies: Mutex<std::collections::VecDeque<BrowseChildrenReply>>,
|
|
browse_children_errors: Mutex<Vec<Status>>,
|
|
watch_requests: Mutex<Vec<WatchDeployEventsRequest>>,
|
|
watch_events: Mutex<Vec<DeployEvent>>,
|
|
watch_senders: Mutex<Vec<DeployEventTx>>,
|
|
watch_drop_signal: Mutex<Option<mpsc::UnboundedSender<()>>>,
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
struct FakeGalaxy {
|
|
state: Arc<FakeState>,
|
|
}
|
|
|
|
#[tonic::async_trait]
|
|
impl GalaxyRepository for FakeGalaxy {
|
|
async fn test_connection(
|
|
&self,
|
|
request: Request<TestConnectionRequest>,
|
|
) -> Result<Response<TestConnectionReply>, Status> {
|
|
*self.state.authorization.lock().unwrap() = request
|
|
.metadata()
|
|
.get("authorization")
|
|
.and_then(|value| value.to_str().ok())
|
|
.map(str::to_owned);
|
|
Ok(Response::new(TestConnectionReply { ok: true }))
|
|
}
|
|
|
|
async fn get_last_deploy_time(
|
|
&self,
|
|
_request: Request<GetLastDeployTimeRequest>,
|
|
) -> Result<Response<GetLastDeployTimeReply>, Status> {
|
|
let present = *self.state.present.lock().unwrap();
|
|
let time = *self.state.last_deploy.lock().unwrap();
|
|
Ok(Response::new(GetLastDeployTimeReply {
|
|
present,
|
|
time_of_last_deploy: time,
|
|
}))
|
|
}
|
|
|
|
async fn discover_hierarchy(
|
|
&self,
|
|
request: Request<DiscoverHierarchyRequest>,
|
|
) -> Result<Response<DiscoverHierarchyReply>, Status> {
|
|
self.state
|
|
.discover_requests
|
|
.lock()
|
|
.unwrap()
|
|
.push(request.into_inner());
|
|
if let Some(reply) = self.state.discover_replies.lock().unwrap().pop_front() {
|
|
return Ok(Response::new(reply));
|
|
}
|
|
|
|
Ok(Response::new(DiscoverHierarchyReply {
|
|
objects: self.state.objects.lock().unwrap().clone(),
|
|
next_page_token: String::new(),
|
|
total_object_count: self.state.objects.lock().unwrap().len() as i32,
|
|
}))
|
|
}
|
|
|
|
async fn browse_children(
|
|
&self,
|
|
request: Request<BrowseChildrenRequest>,
|
|
) -> Result<Response<BrowseChildrenReply>, Status> {
|
|
self.state
|
|
.browse_children_calls
|
|
.lock()
|
|
.unwrap()
|
|
.push(request.into_inner());
|
|
if let Some(error) = self.state.browse_children_errors.lock().unwrap().pop() {
|
|
return Err(error);
|
|
}
|
|
let reply = self
|
|
.state
|
|
.browse_children_replies
|
|
.lock()
|
|
.unwrap()
|
|
.pop_front()
|
|
.unwrap_or_default();
|
|
Ok(Response::new(reply))
|
|
}
|
|
|
|
type WatchDeployEventsStream =
|
|
Pin<Box<dyn tokio_stream::Stream<Item = Result<DeployEvent, Status>> + Send + 'static>>;
|
|
|
|
async fn watch_deploy_events(
|
|
&self,
|
|
request: Request<WatchDeployEventsRequest>,
|
|
) -> Result<Response<Self::WatchDeployEventsStream>, Status> {
|
|
self.state
|
|
.watch_requests
|
|
.lock()
|
|
.unwrap()
|
|
.push(request.into_inner());
|
|
|
|
let preset = self.state.watch_events.lock().unwrap().clone();
|
|
let (tx, rx) = mpsc::channel::<Result<DeployEvent, Status>>(16);
|
|
for event in preset {
|
|
tx.send(Ok(event))
|
|
.await
|
|
.map_err(|err| Status::internal(err.to_string()))?;
|
|
}
|
|
self.state.watch_senders.lock().unwrap().push(tx.clone());
|
|
|
|
let drop_signal = self.state.watch_drop_signal.lock().unwrap().clone();
|
|
let stream = ReceiverStream::new(rx);
|
|
let stream: Pin<Box<dyn tokio_stream::Stream<Item = _> + Send + 'static>> =
|
|
if let Some(signal) = drop_signal {
|
|
Box::pin(WatchStreamWithDropSignal {
|
|
inner: stream,
|
|
signal: Some(signal),
|
|
})
|
|
} else {
|
|
Box::pin(stream)
|
|
};
|
|
|
|
Ok(Response::new(stream))
|
|
}
|
|
}
|
|
|
|
/// Wraps the receiver stream so we can detect when the server-side stream
|
|
/// future is dropped (the client cancelled or dropped the stream). Used by
|
|
/// `watch_drop_tears_down_call`.
|
|
struct WatchStreamWithDropSignal<S> {
|
|
inner: S,
|
|
signal: Option<mpsc::UnboundedSender<()>>,
|
|
}
|
|
|
|
impl<S: tokio_stream::Stream + Unpin> tokio_stream::Stream for WatchStreamWithDropSignal<S> {
|
|
type Item = S::Item;
|
|
|
|
fn poll_next(
|
|
mut self: Pin<&mut Self>,
|
|
cx: &mut std::task::Context<'_>,
|
|
) -> std::task::Poll<Option<Self::Item>> {
|
|
Pin::new(&mut self.inner).poll_next(cx)
|
|
}
|
|
}
|
|
|
|
impl<S> Drop for WatchStreamWithDropSignal<S> {
|
|
fn drop(&mut self) {
|
|
if let Some(signal) = self.signal.take() {
|
|
let _ = signal.send(());
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn spawn_fake(state: Arc<FakeState>) -> String {
|
|
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
|
let address = listener.local_addr().unwrap();
|
|
let incoming = TcpListenerStream::new(listener);
|
|
let service = GalaxyRepositoryServer::new(FakeGalaxy { state });
|
|
tokio::spawn(async move {
|
|
Server::builder()
|
|
.add_service(service)
|
|
.serve_with_incoming(incoming)
|
|
.await
|
|
.unwrap();
|
|
});
|
|
format!("http://{address}")
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_connection_attaches_bearer_metadata_and_returns_ok() {
|
|
let state = Arc::new(FakeState::default());
|
|
let endpoint = spawn_fake(state.clone()).await;
|
|
|
|
let mut client = GalaxyClient::connect(
|
|
ClientOptions::new(endpoint).with_api_key(ApiKey::new("mxgw_galaxy_secret")),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
let ok = client.test_connection().await.unwrap();
|
|
|
|
assert!(ok);
|
|
assert_eq!(
|
|
state.authorization.lock().unwrap().as_deref(),
|
|
Some("Bearer mxgw_galaxy_secret")
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn get_last_deploy_time_returns_none_when_not_present() {
|
|
let state = Arc::new(FakeState::default());
|
|
*state.present.lock().unwrap() = false;
|
|
*state.last_deploy.lock().unwrap() = Some(Timestamp {
|
|
seconds: 1_700_000_000,
|
|
nanos: 0,
|
|
});
|
|
let endpoint = spawn_fake(state.clone()).await;
|
|
|
|
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
|
|
let result = client.get_last_deploy_time().await.unwrap();
|
|
|
|
assert!(
|
|
result.is_none(),
|
|
"present=false on the wire must surface as None, got {result:?}"
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn get_last_deploy_time_returns_timestamp_when_present() {
|
|
let state = Arc::new(FakeState::default());
|
|
*state.present.lock().unwrap() = true;
|
|
*state.last_deploy.lock().unwrap() = Some(Timestamp {
|
|
seconds: 1_700_000_000,
|
|
nanos: 250_000_000,
|
|
});
|
|
let endpoint = spawn_fake(state.clone()).await;
|
|
|
|
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
|
|
let result = client.get_last_deploy_time().await.unwrap();
|
|
|
|
let timestamp = result.expect("present=true should yield a timestamp");
|
|
assert_eq!(timestamp.seconds, 1_700_000_000);
|
|
assert_eq!(timestamp.nanos, 250_000_000);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn discover_hierarchy_returns_objects_with_attributes() {
|
|
let state = Arc::new(FakeState::default());
|
|
state
|
|
.discover_replies
|
|
.lock()
|
|
.unwrap()
|
|
.push_back(DiscoverHierarchyReply {
|
|
objects: vec![GalaxyObject {
|
|
gobject_id: 42,
|
|
tag_name: "DelmiaReceiver_001".to_owned(),
|
|
contained_name: "DelmiaReceiver".to_owned(),
|
|
browse_name: "TestMachine_001/DelmiaReceiver".to_owned(),
|
|
parent_gobject_id: 7,
|
|
is_area: false,
|
|
category_id: 3,
|
|
hosted_by_gobject_id: 1,
|
|
template_chain: vec!["$UserDefined".to_owned(), "$DelmiaReceiver".to_owned()],
|
|
attributes: vec![GalaxyAttribute {
|
|
attribute_name: "DownloadPath".to_owned(),
|
|
full_tag_reference: "DelmiaReceiver_001.DownloadPath".to_owned(),
|
|
mx_data_type: 8,
|
|
data_type_name: "MxString".to_owned(),
|
|
is_array: false,
|
|
array_dimension: 0,
|
|
array_dimension_present: false,
|
|
mx_attribute_category: 2,
|
|
security_classification: 1,
|
|
is_historized: false,
|
|
is_alarm: false,
|
|
}],
|
|
}],
|
|
next_page_token: "page-2".to_owned(),
|
|
total_object_count: 2,
|
|
});
|
|
state
|
|
.discover_replies
|
|
.lock()
|
|
.unwrap()
|
|
.push_back(DiscoverHierarchyReply {
|
|
objects: vec![GalaxyObject {
|
|
gobject_id: 43,
|
|
tag_name: "DelmiaReceiver_002".to_owned(),
|
|
contained_name: String::new(),
|
|
browse_name: String::new(),
|
|
parent_gobject_id: 0,
|
|
is_area: false,
|
|
category_id: 0,
|
|
hosted_by_gobject_id: 0,
|
|
template_chain: Vec::new(),
|
|
attributes: Vec::new(),
|
|
}],
|
|
next_page_token: String::new(),
|
|
total_object_count: 2,
|
|
});
|
|
let endpoint = spawn_fake(state.clone()).await;
|
|
|
|
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
|
|
let objects = client.discover_hierarchy().await.unwrap();
|
|
|
|
assert_eq!(objects.len(), 2);
|
|
let requests = state.discover_requests.lock().unwrap();
|
|
assert_eq!(requests.len(), 2);
|
|
assert_eq!(requests[0].page_size, 5000);
|
|
assert_eq!(requests[0].page_token, "");
|
|
assert_eq!(requests[1].page_token, "page-2");
|
|
assert_eq!(objects[0].tag_name, "DelmiaReceiver_001");
|
|
assert_eq!(objects[0].attributes.len(), 1);
|
|
assert_eq!(objects[0].attributes[0].attribute_name, "DownloadPath");
|
|
assert_eq!(
|
|
objects[0].attributes[0].full_tag_reference,
|
|
"DelmiaReceiver_001.DownloadPath"
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn discover_hierarchy_rejects_repeated_page_token() {
|
|
let state = Arc::new(FakeState::default());
|
|
state
|
|
.discover_replies
|
|
.lock()
|
|
.unwrap()
|
|
.push_back(DiscoverHierarchyReply {
|
|
objects: Vec::new(),
|
|
next_page_token: "7:1".to_owned(),
|
|
total_object_count: 1,
|
|
});
|
|
state
|
|
.discover_replies
|
|
.lock()
|
|
.unwrap()
|
|
.push_back(DiscoverHierarchyReply {
|
|
objects: Vec::new(),
|
|
next_page_token: "7:1".to_owned(),
|
|
total_object_count: 1,
|
|
});
|
|
let endpoint = spawn_fake(state).await;
|
|
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
|
|
let error = client.discover_hierarchy().await.unwrap_err();
|
|
|
|
assert!(error.to_string().contains("repeated page token"));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn watch_deploy_events_yields_events_in_order() {
|
|
let state = Arc::new(FakeState::default());
|
|
*state.watch_events.lock().unwrap() = vec![
|
|
DeployEvent {
|
|
sequence: 1,
|
|
observed_at: Some(Timestamp {
|
|
seconds: 1_700_000_000,
|
|
nanos: 0,
|
|
}),
|
|
time_of_last_deploy: Some(Timestamp {
|
|
seconds: 1_699_000_000,
|
|
nanos: 0,
|
|
}),
|
|
time_of_last_deploy_present: true,
|
|
object_count: 12,
|
|
attribute_count: 80,
|
|
},
|
|
DeployEvent {
|
|
sequence: 2,
|
|
observed_at: Some(Timestamp {
|
|
seconds: 1_700_000_500,
|
|
nanos: 0,
|
|
}),
|
|
time_of_last_deploy: Some(Timestamp {
|
|
seconds: 1_699_500_000,
|
|
nanos: 0,
|
|
}),
|
|
time_of_last_deploy_present: true,
|
|
object_count: 13,
|
|
attribute_count: 85,
|
|
},
|
|
];
|
|
let endpoint = spawn_fake(state.clone()).await;
|
|
|
|
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
|
|
let mut stream = client.watch_deploy_events(None).await.unwrap();
|
|
|
|
let first = stream
|
|
.next()
|
|
.await
|
|
.expect("bootstrap event")
|
|
.expect("ok deploy event");
|
|
let second = stream
|
|
.next()
|
|
.await
|
|
.expect("second event")
|
|
.expect("ok deploy event");
|
|
|
|
assert_eq!(first.sequence, 1);
|
|
assert_eq!(first.object_count, 12);
|
|
assert_eq!(second.sequence, 2);
|
|
assert_eq!(second.object_count, 13);
|
|
assert!(first.time_of_last_deploy_present);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn watch_deploy_events_propagates_last_seen_deploy_time() {
|
|
let state = Arc::new(FakeState::default());
|
|
let endpoint = spawn_fake(state.clone()).await;
|
|
|
|
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
|
|
let last_seen = Timestamp {
|
|
seconds: 1_699_999_999,
|
|
nanos: 123_456_789,
|
|
};
|
|
let stream = client.watch_deploy_events(Some(last_seen)).await.unwrap();
|
|
|
|
// Drop the stream right away — the test is solely about the request
|
|
// payload reaching the server.
|
|
drop(stream);
|
|
|
|
// Give the server task a moment to record the request.
|
|
for _ in 0..20 {
|
|
if !state.watch_requests.lock().unwrap().is_empty() {
|
|
break;
|
|
}
|
|
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
|
|
}
|
|
|
|
let requests = state.watch_requests.lock().unwrap().clone();
|
|
assert_eq!(requests.len(), 1);
|
|
let recorded = requests[0]
|
|
.last_seen_deploy_time
|
|
.as_ref()
|
|
.expect("last_seen_deploy_time forwarded");
|
|
assert_eq!(recorded.seconds, last_seen.seconds);
|
|
assert_eq!(recorded.nanos, last_seen.nanos);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn watch_deploy_events_drop_tears_down_call() {
|
|
let state = Arc::new(FakeState::default());
|
|
let (signal_tx, mut signal_rx) = mpsc::unbounded_channel();
|
|
*state.watch_drop_signal.lock().unwrap() = Some(signal_tx);
|
|
// Seed one event so the client gets something on the stream before we
|
|
// drop it; this proves the call is live.
|
|
*state.watch_events.lock().unwrap() = vec![DeployEvent {
|
|
sequence: 7,
|
|
observed_at: None,
|
|
time_of_last_deploy: None,
|
|
time_of_last_deploy_present: false,
|
|
object_count: 0,
|
|
attribute_count: 0,
|
|
}];
|
|
let endpoint = spawn_fake(state.clone()).await;
|
|
|
|
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
|
|
let mut stream = client.watch_deploy_events(None).await.unwrap();
|
|
let event = stream
|
|
.next()
|
|
.await
|
|
.expect("bootstrap event")
|
|
.expect("ok deploy event");
|
|
assert_eq!(event.sequence, 7);
|
|
|
|
// Dropping the client-side stream must trigger the server-side stream
|
|
// future to be dropped as well, signalling cancellation.
|
|
drop(stream);
|
|
|
|
let drop_seen = tokio::time::timeout(std::time::Duration::from_secs(2), signal_rx.recv())
|
|
.await
|
|
.expect("server-side stream future was not dropped within 2s");
|
|
assert!(
|
|
drop_seen.is_some(),
|
|
"drop signal channel closed unexpectedly"
|
|
);
|
|
}
|
|
|
|
fn browse_obj(gid: i32, tag: &str, is_area: bool) -> GalaxyObject {
|
|
GalaxyObject {
|
|
gobject_id: gid,
|
|
tag_name: tag.to_owned(),
|
|
contained_name: String::new(),
|
|
browse_name: tag.to_owned(),
|
|
parent_gobject_id: 0,
|
|
is_area,
|
|
category_id: 0,
|
|
hosted_by_gobject_id: 0,
|
|
template_chain: Vec::new(),
|
|
attributes: Vec::new(),
|
|
}
|
|
}
|
|
|
|
fn build_browse_reply(
|
|
children: Vec<GalaxyObject>,
|
|
child_has_children: Vec<bool>,
|
|
cache_sequence: u64,
|
|
) -> BrowseChildrenReply {
|
|
BrowseChildrenReply {
|
|
total_child_count: children.len() as i32,
|
|
cache_sequence,
|
|
children,
|
|
child_has_children,
|
|
next_page_token: String::new(),
|
|
}
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn browse_no_parent_returns_roots() {
|
|
let state = Arc::new(FakeState::default());
|
|
state
|
|
.browse_children_replies
|
|
.lock()
|
|
.unwrap()
|
|
.push_back(build_browse_reply(
|
|
vec![browse_obj(1, "Area_A", true), browse_obj(2, "Area_B", true)],
|
|
vec![true, false],
|
|
7,
|
|
));
|
|
let endpoint = spawn_fake(state.clone()).await;
|
|
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
|
|
let roots = client.browse(None).await.unwrap();
|
|
|
|
assert_eq!(roots.len(), 2);
|
|
assert_eq!(roots[0].object().tag_name, "Area_A");
|
|
assert!(roots[0].has_children_hint());
|
|
assert_eq!(roots[1].object().tag_name, "Area_B");
|
|
assert!(!roots[1].has_children_hint());
|
|
|
|
let calls = state.browse_children_calls.lock().unwrap();
|
|
assert_eq!(calls.len(), 1);
|
|
assert!(
|
|
calls[0].parent.is_none(),
|
|
"root browse must send an empty parent oneof, got {:?}",
|
|
calls[0].parent
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn browse_expand_populates_children_and_marks_expanded() {
|
|
let state = Arc::new(FakeState::default());
|
|
// First call: roots.
|
|
state
|
|
.browse_children_replies
|
|
.lock()
|
|
.unwrap()
|
|
.push_back(build_browse_reply(
|
|
vec![browse_obj(10, "Area_A", true)],
|
|
vec![true],
|
|
1,
|
|
));
|
|
// Second call: children of gobject 10.
|
|
state
|
|
.browse_children_replies
|
|
.lock()
|
|
.unwrap()
|
|
.push_back(build_browse_reply(
|
|
vec![browse_obj(11, "Receiver_1", false)],
|
|
vec![false],
|
|
1,
|
|
));
|
|
let endpoint = spawn_fake(state.clone()).await;
|
|
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
|
|
let roots = client.browse(None).await.unwrap();
|
|
let root = roots.into_iter().next().expect("at least one root");
|
|
assert!(!root.is_expanded().await);
|
|
|
|
root.expand().await.unwrap();
|
|
|
|
assert!(root.is_expanded().await);
|
|
let children = root.children().await;
|
|
assert_eq!(children.len(), 1);
|
|
assert_eq!(children[0].object().tag_name, "Receiver_1");
|
|
|
|
let calls = state.browse_children_calls.lock().unwrap();
|
|
assert_eq!(calls.len(), 2);
|
|
let expand_call = &calls[1];
|
|
match expand_call.parent.as_ref().expect("expand sends parent") {
|
|
browse_children_request::Parent::ParentGobjectId(id) => assert_eq!(*id, 10),
|
|
other => panic!("expected ParentGobjectId variant, got {other:?}"),
|
|
}
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn browse_expand_idempotent_no_second_rpc() {
|
|
let state = Arc::new(FakeState::default());
|
|
state
|
|
.browse_children_replies
|
|
.lock()
|
|
.unwrap()
|
|
.push_back(build_browse_reply(
|
|
vec![browse_obj(20, "Area_X", true)],
|
|
vec![true],
|
|
1,
|
|
));
|
|
state
|
|
.browse_children_replies
|
|
.lock()
|
|
.unwrap()
|
|
.push_back(build_browse_reply(
|
|
vec![browse_obj(21, "Leaf", false)],
|
|
vec![false],
|
|
1,
|
|
));
|
|
let endpoint = spawn_fake(state.clone()).await;
|
|
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
|
|
let roots = client.browse(None).await.unwrap();
|
|
let root = roots.into_iter().next().unwrap();
|
|
root.expand().await.unwrap();
|
|
let after_first = state.browse_children_calls.lock().unwrap().len();
|
|
|
|
// Calling expand a second time must NOT issue a new RPC.
|
|
root.expand().await.unwrap();
|
|
|
|
let after_second = state.browse_children_calls.lock().unwrap().len();
|
|
assert_eq!(
|
|
after_first, after_second,
|
|
"expand should be idempotent — no extra RPC the second time"
|
|
);
|
|
assert_eq!(root.children().await.len(), 1);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn browse_expand_unknown_parent_returns_not_found_error() {
|
|
let state = Arc::new(FakeState::default());
|
|
// Root browse succeeds.
|
|
state
|
|
.browse_children_replies
|
|
.lock()
|
|
.unwrap()
|
|
.push_back(build_browse_reply(
|
|
vec![browse_obj(99, "GhostArea", true)],
|
|
vec![true],
|
|
1,
|
|
));
|
|
let endpoint = spawn_fake(state.clone()).await;
|
|
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
|
|
let roots = client.browse(None).await.unwrap();
|
|
let root = roots.into_iter().next().unwrap();
|
|
|
|
// Seed the NotFound only AFTER the root call so the FakeGalaxy's
|
|
// error stack doesn't intercept the initial browse.
|
|
state
|
|
.browse_children_errors
|
|
.lock()
|
|
.unwrap()
|
|
.push(Status::not_found("parent gobject 99 not present in cache"));
|
|
|
|
let error = root.expand().await.unwrap_err();
|
|
|
|
match &error {
|
|
Error::Status(status) => {
|
|
assert_eq!(status.code(), tonic::Code::NotFound);
|
|
}
|
|
other => panic!("expected Error::Status(NotFound), got {other:?}"),
|
|
}
|
|
// Failed expand must NOT mark the node as expanded — caller can retry.
|
|
assert!(!root.is_expanded().await);
|
|
assert!(root.children().await.is_empty());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn browse_expand_multi_page_gathers_all_pages() {
|
|
let state = Arc::new(FakeState::default());
|
|
// First reply: roots.
|
|
state
|
|
.browse_children_replies
|
|
.lock()
|
|
.unwrap()
|
|
.push_back(build_browse_reply(
|
|
vec![browse_obj(30, "Plant", true)],
|
|
vec![true],
|
|
5,
|
|
));
|
|
// Second reply: page 1 of children, with a next_page_token.
|
|
let mut page_one = build_browse_reply(
|
|
vec![
|
|
browse_obj(31, "Child_A", false),
|
|
browse_obj(32, "Child_B", false),
|
|
],
|
|
vec![false, false],
|
|
5,
|
|
);
|
|
page_one.next_page_token = "cursor-2".to_owned();
|
|
page_one.total_child_count = 3;
|
|
state
|
|
.browse_children_replies
|
|
.lock()
|
|
.unwrap()
|
|
.push_back(page_one);
|
|
// Third reply: page 2 of children, with no next page.
|
|
state
|
|
.browse_children_replies
|
|
.lock()
|
|
.unwrap()
|
|
.push_back(build_browse_reply(
|
|
vec![browse_obj(33, "Child_C", false)],
|
|
vec![false],
|
|
5,
|
|
));
|
|
let endpoint = spawn_fake(state.clone()).await;
|
|
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
|
|
let roots = client.browse(None).await.unwrap();
|
|
let root = roots.into_iter().next().unwrap();
|
|
root.expand().await.unwrap();
|
|
|
|
let children = root.children().await;
|
|
assert_eq!(children.len(), 3);
|
|
assert_eq!(children[0].object().tag_name, "Child_A");
|
|
assert_eq!(children[1].object().tag_name, "Child_B");
|
|
assert_eq!(children[2].object().tag_name, "Child_C");
|
|
|
|
let calls = state.browse_children_calls.lock().unwrap();
|
|
// 1 root call + 2 paged expand calls = 3 total.
|
|
assert_eq!(calls.len(), 3);
|
|
assert_eq!(calls[1].page_token, "");
|
|
assert_eq!(calls[2].page_token, "cursor-2");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn browse_with_filter_forwards_to_request() {
|
|
let state = Arc::new(FakeState::default());
|
|
state
|
|
.browse_children_replies
|
|
.lock()
|
|
.unwrap()
|
|
.push_back(build_browse_reply(Vec::new(), Vec::new(), 1));
|
|
let endpoint = spawn_fake(state.clone()).await;
|
|
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
|
|
let options = BrowseChildrenOptions {
|
|
category_ids: vec![3, 5],
|
|
template_chain_contains: vec!["$DelmiaReceiver".to_owned()],
|
|
tag_name_glob: Some("Recv_*".to_owned()),
|
|
include_attributes: Some(true),
|
|
alarm_bearing_only: true,
|
|
historized_only: false,
|
|
};
|
|
|
|
let _ = client.browse(Some(options)).await.unwrap();
|
|
|
|
let calls = state.browse_children_calls.lock().unwrap();
|
|
assert_eq!(calls.len(), 1);
|
|
let req = &calls[0];
|
|
assert_eq!(req.category_ids, vec![3, 5]);
|
|
assert_eq!(req.template_chain_contains, vec!["$DelmiaReceiver"]);
|
|
assert_eq!(req.tag_name_glob, "Recv_*");
|
|
assert_eq!(req.include_attributes, Some(true));
|
|
assert!(req.alarm_bearing_only);
|
|
assert!(!req.historized_only);
|
|
}
|
|
}
|