Add Galaxy repository API and clients

This commit is contained in:
Joseph Doherty
2026-04-29 07:27:00 -04:00
parent 047d875fe6
commit 133c83029b
103 changed files with 22788 additions and 39 deletions
+12
View File
@@ -595,6 +595,7 @@ dependencies = [
"clap",
"futures-util",
"mxgateway-client",
"prost-types",
"serde",
"serde_json",
"tokio",
@@ -886,6 +887,16 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
name = "signal-hook-registry"
version = "1.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b"
dependencies = [
"errno",
"libc",
]
[[package]]
name = "slab"
version = "0.4.12"
@@ -990,6 +1001,7 @@ dependencies = [
"libc",
"mio",
"pin-project-lite",
"signal-hook-registry",
"socket2 0.6.3",
"tokio-macros",
"windows-sys 0.61.2",
+70
View File
@@ -99,6 +99,76 @@ preserving the raw message for parity diagnostics. Command replies whose
protocol status is not `PROTOCOL_STATUS_CODE_OK` become `Error::Command` and
retain the raw `MxCommandReply`.
## Galaxy Repository browse
The Galaxy Repository service exposes a read-only browse over the AVEVA System
Platform Galaxy Repository (ZB SQL database). It uses the same API-key auth as
the gateway service but requires the `metadata:read` scope on the server.
[`GalaxyClient`](src/galaxy.rs) wraps the generated Galaxy bindings the same
way [`GatewayClient`](src/client.rs) wraps the gateway bindings:
```rust
let mut galaxy = GalaxyClient::connect(
ClientOptions::new("http://localhost:5000")
.with_api_key(ApiKey::new(api_key)),
).await?;
let ok = galaxy.test_connection().await?;
let last_deploy = galaxy.get_last_deploy_time().await?; // Option<prost_types::Timestamp>
let objects = galaxy.discover_hierarchy().await?; // Vec<GalaxyObject>
```
`get_last_deploy_time` returns `None` when the server reports
`present = false`. `discover_hierarchy` returns the generated
`GalaxyObject` proto type (re-exported via
`mxgateway_client::generated::galaxy_repository::v1`) with all attributes
attached.
The CLI ships matching subcommands under `galaxy`:
```powershell
cargo run -p mxgw-cli -- galaxy test-connection --endpoint http://localhost:5000 --api-key-env MXGATEWAY_API_KEY --json
cargo run -p mxgw-cli -- galaxy last-deploy-time --endpoint http://localhost:5000 --api-key-env MXGATEWAY_API_KEY --json
cargo run -p mxgw-cli -- galaxy discover-hierarchy --endpoint http://localhost:5000 --api-key-env MXGATEWAY_API_KEY --json
```
### Watching deploy events
`watch_deploy_events` opens the `WatchDeployEvents` server stream. The
server emits a bootstrap [`DeployEvent`](src/galaxy.rs) describing the
current cache state on subscribe, then one event each time the cached
`galaxy.time_of_last_deploy` changes. `sequence` is monotonic per server
start; gaps signal that the per-subscriber buffer dropped older events.
Pass `last_seen_deploy_time` to suppress the bootstrap event when the
client's cached deploy time matches the server's.
```rust
use futures_util::StreamExt;
let mut stream = galaxy.watch_deploy_events(None).await?;
while let Some(event) = stream.next().await {
let event = event?;
println!(
"seq={} objects={} attributes={}",
event.sequence, event.object_count, event.attribute_count,
);
}
// Drop the stream to cancel the gRPC call.
```
The matching CLI subcommand prints one line per event (`--json` switches to
one JSON object per event). `--last-seen-deploy-time` accepts an RFC3339
timestamp and is forwarded to the server. `--max-events` (default 0 = no
cap) lets you stop after a fixed number of events; otherwise the command
runs until the stream ends or `Ctrl+C` is pressed.
```powershell
cargo run -p mxgw-cli -- galaxy watch --endpoint http://localhost:5000 --api-key-env MXGATEWAY_API_KEY
cargo run -p mxgw-cli -- galaxy watch --endpoint http://localhost:5000 --api-key-env MXGATEWAY_API_KEY --json
cargo run -p mxgw-cli -- galaxy watch --endpoint http://localhost:5000 --api-key-env MXGATEWAY_API_KEY --last-seen-deploy-time 2026-04-28T15:30:00Z
```
## Integration Checks
Run live checks only when a gateway and MXAccess-backed worker are available:
+7 -1
View File
@@ -13,17 +13,23 @@ fn main() -> Result<(), Box<dyn Error>> {
let proto_root = repo_root.join("src/MxGateway.Contracts/Protos");
let gateway_proto = proto_root.join("mxaccess_gateway.proto");
let worker_proto = proto_root.join("mxaccess_worker.proto");
let galaxy_proto = proto_root.join("galaxy_repository.proto");
let descriptor_path = PathBuf::from(env::var("OUT_DIR")?).join("mxaccessgw-client-v1.protoset");
println!("cargo:rerun-if-changed={}", gateway_proto.display());
println!("cargo:rerun-if-changed={}", worker_proto.display());
println!("cargo:rerun-if-changed={}", galaxy_proto.display());
tonic_build::configure()
.build_server(true)
.build_client(true)
.file_descriptor_set_path(descriptor_path)
.compile_protos(
&[gateway_proto.as_path(), worker_proto.as_path()],
&[
gateway_proto.as_path(),
worker_proto.as_path(),
galaxy_proto.as_path(),
],
&[proto_root.as_path()],
)?;
+2 -1
View File
@@ -12,6 +12,7 @@ path = "src/main.rs"
clap = { workspace = true }
futures-util = { workspace = true }
mxgateway-client = { path = "../.." }
prost-types = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal", "sync", "time"] }
+412 -2
View File
@@ -5,13 +5,14 @@ use std::time::Duration;
use clap::{Args, Parser, Subcommand, ValueEnum};
use futures_util::StreamExt;
use mxgateway_client::generated::galaxy_repository::v1::DeployEvent;
use mxgateway_client::generated::mxaccess_gateway::v1::{
CloseSessionRequest, MxCommand, MxCommandKind, MxCommandRequest, OpenSessionRequest,
PingCommand, StreamEventsRequest,
};
use mxgateway_client::{
ApiKey, ClientOptions, Error, GatewayClient, MxValue, CLIENT_VERSION, GATEWAY_PROTOCOL_VERSION,
WORKER_PROTOCOL_VERSION,
ApiKey, ClientOptions, Error, GalaxyClient, GatewayClient, MxValue, CLIENT_VERSION,
GATEWAY_PROTOCOL_VERSION, WORKER_PROTOCOL_VERSION,
};
use serde_json::json;
use serde_json::Value;
@@ -178,6 +179,51 @@ enum Command {
#[arg(long)]
json: bool,
},
#[command(subcommand)]
Galaxy(GalaxyCommand),
}
#[derive(Debug, Subcommand)]
enum GalaxyCommand {
TestConnection {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long)]
json: bool,
},
LastDeployTime {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long)]
json: bool,
},
DiscoverHierarchy {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long)]
json: bool,
},
/// Subscribe to the WatchDeployEvents server stream.
///
/// Prints one line per received event (or one JSON object with `--json`).
/// Runs until the stream ends, the server fails the call, or the
/// process is interrupted (Ctrl+C).
#[command(alias = "watch-deploy-events")]
Watch {
#[command(flatten)]
connection: ConnectionArgs,
/// Optional RFC3339 timestamp (e.g. `2026-04-28T15:30:00Z`). When
/// supplied, the server suppresses the bootstrap event if its
/// cached deploy time matches this value.
#[arg(long)]
last_seen_deploy_time: Option<String>,
/// Optional cap on the number of events to print before exiting.
/// 0 (the default) means run until cancelled or the stream ends.
#[arg(long, default_value_t = 0)]
max_events: usize,
#[arg(long)]
json: bool,
},
}
#[derive(Debug, Args, Clone)]
@@ -465,6 +511,7 @@ async fn run(cli: Cli) -> Result<(), Error> {
.await?;
print_ok("write2", json);
}
Command::Galaxy(galaxy_command) => run_galaxy(galaxy_command).await?,
Command::Smoke {
connection,
item,
@@ -514,6 +561,133 @@ async fn connect(connection: ConnectionArgs) -> Result<GatewayClient, Error> {
GatewayClient::connect(connection.options()).await
}
async fn connect_galaxy(connection: ConnectionArgs) -> Result<GalaxyClient, Error> {
GalaxyClient::connect(connection.options()).await
}
async fn run_galaxy(command: GalaxyCommand) -> Result<(), Error> {
match command {
GalaxyCommand::TestConnection { connection, json } => {
let mut client = connect_galaxy(connection).await?;
let ok = client.test_connection().await?;
if json {
println!("{}", json!({ "ok": ok }));
} else if ok {
println!("ok");
} else {
println!("not ok");
}
}
GalaxyCommand::LastDeployTime { connection, json } => {
let mut client = connect_galaxy(connection).await?;
let timestamp = client.get_last_deploy_time().await?;
match (json, timestamp) {
(true, Some(ts)) => {
println!(
"{}",
json!({
"present": true,
"seconds": ts.seconds,
"nanos": ts.nanos,
})
);
}
(true, None) => {
println!("{}", json!({ "present": false }));
}
(false, Some(ts)) => println!("{}.{:09}", ts.seconds, ts.nanos),
(false, None) => println!("(absent)"),
}
}
GalaxyCommand::Watch {
connection,
last_seen_deploy_time,
max_events,
json,
} => {
let mut client = connect_galaxy(connection).await?;
let last_seen = last_seen_deploy_time
.as_deref()
.map(parse_rfc3339_timestamp)
.transpose()?;
let mut stream = client.watch_deploy_events(last_seen).await?;
let mut count = 0usize;
loop {
tokio::select! {
biased;
signal = tokio::signal::ctrl_c() => {
signal.map_err(|err| Error::InvalidArgument {
name: "ctrl_c".to_owned(),
detail: err.to_string(),
})?;
// Drop the stream below by breaking; tonic tears the
// gRPC call down cooperatively.
break;
}
next = stream.next() => {
let Some(event) = next else { break; };
let event = event?;
count += 1;
print_deploy_event(&event, json);
if max_events != 0 && count >= max_events {
break;
}
}
}
}
}
GalaxyCommand::DiscoverHierarchy { connection, json } => {
let mut client = connect_galaxy(connection).await?;
let objects = client.discover_hierarchy().await?;
if json {
let payload: Vec<_> = objects
.iter()
.map(|object| {
json!({
"gobjectId": object.gobject_id,
"tagName": object.tag_name,
"containedName": object.contained_name,
"browseName": object.browse_name,
"parentGobjectId": object.parent_gobject_id,
"isArea": object.is_area,
"categoryId": object.category_id,
"hostedByGobjectId": object.hosted_by_gobject_id,
"templateChain": object.template_chain,
"attributes": object.attributes.iter().map(|attribute| json!({
"attributeName": attribute.attribute_name,
"fullTagReference": attribute.full_tag_reference,
"mxDataType": attribute.mx_data_type,
"dataTypeName": attribute.data_type_name,
"isArray": attribute.is_array,
"arrayDimension": attribute.array_dimension,
"arrayDimensionPresent": attribute.array_dimension_present,
"mxAttributeCategory": attribute.mx_attribute_category,
"securityClassification": attribute.security_classification,
"isHistorized": attribute.is_historized,
"isAlarm": attribute.is_alarm,
})).collect::<Vec<_>>(),
})
})
.collect();
println!("{}", json!({ "objects": payload }));
} else {
println!("{}", objects.len());
for object in &objects {
println!(
"{} {} {} ({} attribute(s))",
object.gobject_id,
object.tag_name,
object.browse_name,
object.attributes.len()
);
}
}
}
}
Ok(())
}
async fn session_for(
connection: ConnectionArgs,
session_id: String,
@@ -616,6 +790,208 @@ fn parse_value(value_type: CliValueType, value: &str) -> Result<MxValue, Error>
Ok(parsed)
}
fn print_deploy_event(event: &DeployEvent, use_json: bool) {
if use_json {
println!(
"{}",
json!({
"sequence": event.sequence,
"observedAt": event.observed_at.as_ref().map(|ts| json!({
"seconds": ts.seconds,
"nanos": ts.nanos,
})),
"timeOfLastDeploy": event.time_of_last_deploy.as_ref().map(|ts| json!({
"seconds": ts.seconds,
"nanos": ts.nanos,
})),
"timeOfLastDeployPresent": event.time_of_last_deploy_present,
"objectCount": event.object_count,
"attributeCount": event.attribute_count,
})
);
} else {
let observed = event
.observed_at
.as_ref()
.map(|ts| format!("{}.{:09}", ts.seconds, ts.nanos))
.unwrap_or_else(|| "(absent)".to_owned());
let last_deploy = if event.time_of_last_deploy_present {
event
.time_of_last_deploy
.as_ref()
.map(|ts| format!("{}.{:09}", ts.seconds, ts.nanos))
.unwrap_or_else(|| "(absent)".to_owned())
} else {
"(absent)".to_owned()
};
println!(
"seq={} observed={} lastDeploy={} objects={} attributes={}",
event.sequence, observed, last_deploy, event.object_count, event.attribute_count,
);
}
}
/// Parse a small but practically-complete subset of RFC3339:
/// `YYYY-MM-DDTHH:MM:SS[.fffffffff][Z|+HH:MM|-HH:MM]`. Returns the
/// corresponding `prost_types::Timestamp` (Unix seconds + nanoseconds).
fn parse_rfc3339_timestamp(input: &str) -> Result<prost_types::Timestamp, Error> {
fn invalid(detail: impl Into<String>) -> Error {
Error::InvalidArgument {
name: "last-seen-deploy-time".to_owned(),
detail: detail.into(),
}
}
let bytes = input.as_bytes();
if bytes.len() < 20 || (bytes[10] != b'T' && bytes[10] != b't') {
return Err(invalid(format!(
"expected RFC3339 timestamp like 2026-04-28T15:30:00Z, got {input:?}"
)));
}
let read_u32 = |start: usize, len: usize| -> Result<u32, Error> {
std::str::from_utf8(&bytes[start..start + len])
.ok()
.and_then(|slice| slice.parse::<u32>().ok())
.ok_or_else(|| invalid(format!("non-numeric digits at byte {start}")))
};
let year = read_u32(0, 4)? as i32;
if bytes[4] != b'-' {
return Err(invalid("expected '-' after year"));
}
let month = read_u32(5, 2)?;
if bytes[7] != b'-' {
return Err(invalid("expected '-' after month"));
}
let day = read_u32(8, 2)?;
let hour = read_u32(11, 2)?;
if bytes[13] != b':' {
return Err(invalid("expected ':' after hour"));
}
let minute = read_u32(14, 2)?;
if bytes[16] != b':' {
return Err(invalid("expected ':' after minute"));
}
let second = read_u32(17, 2)?;
let mut cursor = 19usize;
let mut nanos: u32 = 0;
if cursor < bytes.len() && bytes[cursor] == b'.' {
cursor += 1;
let frac_start = cursor;
while cursor < bytes.len() && bytes[cursor].is_ascii_digit() {
cursor += 1;
}
let frac_len = cursor - frac_start;
if frac_len == 0 {
return Err(invalid("expected fractional digits after '.'"));
}
let take = frac_len.min(9);
let frac = std::str::from_utf8(&bytes[frac_start..frac_start + take])
.ok()
.and_then(|slice| slice.parse::<u32>().ok())
.ok_or_else(|| invalid("invalid fractional digits"))?;
nanos = frac * 10u32.pow(9u32.saturating_sub(take as u32));
}
let mut offset_seconds: i64 = 0;
if cursor >= bytes.len() {
return Err(invalid("missing timezone designator (Z or +HH:MM)"));
}
match bytes[cursor] {
b'Z' | b'z' => cursor += 1,
sign @ (b'+' | b'-') => {
cursor += 1;
if cursor + 5 > bytes.len() {
return Err(invalid("offset must be ±HH:MM"));
}
let oh = std::str::from_utf8(&bytes[cursor..cursor + 2])
.ok()
.and_then(|slice| slice.parse::<i64>().ok())
.ok_or_else(|| invalid("invalid offset hour"))?;
if bytes[cursor + 2] != b':' {
return Err(invalid("offset must contain ':' between HH and MM"));
}
let om = std::str::from_utf8(&bytes[cursor + 3..cursor + 5])
.ok()
.and_then(|slice| slice.parse::<i64>().ok())
.ok_or_else(|| invalid("invalid offset minute"))?;
cursor += 5;
let signed = if sign == b'-' { -1 } else { 1 };
offset_seconds = signed * (oh * 3600 + om * 60);
}
other => {
return Err(invalid(format!(
"unexpected timezone designator byte {other:?}"
)));
}
}
if cursor != bytes.len() {
return Err(invalid("trailing characters after timezone"));
}
let unix = ymdhms_to_unix(year, month, day, hour, minute, second)?;
let seconds = unix - offset_seconds;
Ok(prost_types::Timestamp {
seconds,
nanos: nanos as i32,
})
}
fn ymdhms_to_unix(
year: i32,
month: u32,
day: u32,
hour: u32,
minute: u32,
second: u32,
) -> Result<i64, Error> {
if !(1..=12).contains(&month) || day < 1 || hour > 23 || minute > 59 || second > 60 {
return Err(Error::InvalidArgument {
name: "last-seen-deploy-time".to_owned(),
detail: "calendar component out of range".to_owned(),
});
}
fn is_leap(year: i32) -> bool {
(year % 4 == 0 && year % 100 != 0) || year % 400 == 0
}
const DAYS: [u32; 12] = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31];
let mut max = DAYS[(month - 1) as usize];
if month == 2 && is_leap(year) {
max = 29;
}
if day > max {
return Err(Error::InvalidArgument {
name: "last-seen-deploy-time".to_owned(),
detail: format!("day {day} out of range for month {month}/{year}"),
});
}
// Days from 1970-01-01 to year-month-day.
let mut total_days: i64 = 0;
if year >= 1970 {
for y in 1970..year {
total_days += if is_leap(y) { 366 } else { 365 };
}
} else {
for y in year..1970 {
total_days -= if is_leap(y) { 366 } else { 365 };
}
}
for m in 1..month {
let mut dim = DAYS[(m - 1) as usize];
if m == 2 && is_leap(year) {
dim = 29;
}
total_days += dim as i64;
}
total_days += (day - 1) as i64;
Ok(total_days * 86_400 + hour as i64 * 3600 + minute as i64 * 60 + second as i64)
}
fn parse_cli_value<T>(value: &str) -> Result<T, Error>
where
T: std::str::FromStr,
@@ -665,4 +1041,38 @@ mod tests {
assert_eq!(value["gatewayProtocolVersion"], 1);
assert_eq!(value["workerProtocolVersion"], 1);
}
#[test]
fn parses_galaxy_watch_command_with_last_seen_and_max_events() {
let parsed = Cli::try_parse_from([
"mxgw",
"galaxy",
"watch",
"--last-seen-deploy-time",
"2026-04-28T15:30:00Z",
"--max-events",
"5",
"--json",
]);
assert!(parsed.is_ok(), "parse failed: {parsed:?}");
}
#[test]
fn parses_galaxy_watch_deploy_events_alias() {
let parsed = Cli::try_parse_from(["mxgw", "galaxy", "watch-deploy-events"]);
assert!(parsed.is_ok(), "parse failed: {parsed:?}");
}
#[test]
fn rfc3339_parser_round_trips_z_and_offset_inputs() {
// 2026-04-28T15:30:00Z = 1_777_995_000 (sanity-checked once below)
let utc = super::parse_rfc3339_timestamp("2026-04-28T15:30:00Z").unwrap();
let plus = super::parse_rfc3339_timestamp("2026-04-28T16:30:00+01:00").unwrap();
let frac = super::parse_rfc3339_timestamp("2026-04-28T15:30:00.250Z").unwrap();
assert_eq!(utc.seconds, plus.seconds);
assert_eq!(utc.nanos, 0);
assert_eq!(frac.seconds, utc.seconds);
assert_eq!(frac.nanos, 250_000_000);
}
}
+591
View File
@@ -0,0 +1,591 @@
//! Thin async wrapper for the `GalaxyRepository` gRPC service.
//!
//! The wrapper mirrors [`crate::client::GatewayClient`]: it owns a tonic
//! channel with the shared bearer-token interceptor and exposes the three
//! read-only RPCs as Rust async methods. Generated Galaxy proto types are
//! re-exported through [`crate::generated::galaxy_repository::v1`].
use std::fs;
use prost_types::Timestamp;
use tonic::codegen::InterceptedService;
use tonic::transport::{Certificate, Channel, ClientTlsConfig};
use tonic::Request;
use crate::auth::AuthInterceptor;
use crate::error::Error;
use crate::generated::galaxy_repository::v1::galaxy_repository_client::GalaxyRepositoryClient;
use crate::generated::galaxy_repository::v1::{
DeployEvent, DiscoverHierarchyRequest, GalaxyObject, GetLastDeployTimeRequest,
TestConnectionRequest, WatchDeployEventsRequest,
};
use crate::options::ClientOptions;
/// Convenience alias for the generated Galaxy client wrapped in the
/// authentication interceptor.
pub type RawGalaxyClient = GalaxyRepositoryClient<InterceptedService<Channel, AuthInterceptor>>;
/// Stream of `DeployEvent` values returned by
/// [`GalaxyClient::watch_deploy_events`]. Mirrors
/// [`crate::client::EventStream`]: a boxed `Stream` whose `tonic::Status`
/// errors have already been mapped onto [`Error`]. Dropping the stream
/// cancels the underlying gRPC call.
pub type DeployEventStream = std::pin::Pin<
Box<dyn futures_core::Stream<Item = Result<DeployEvent, Error>> + Send + 'static>,
>;
/// Thin async wrapper around the generated Galaxy Repository gRPC client.
///
/// Construct it with [`GalaxyClient::connect`] using the same
/// [`ClientOptions`] that drive [`crate::client::GatewayClient`]. The
/// service is metadata-only (no sessions) and requires the `metadata:read`
/// API-key scope on the server side.
#[derive(Clone)]
pub struct GalaxyClient {
inner: RawGalaxyClient,
call_timeout: std::time::Duration,
stream_timeout: Option<std::time::Duration>,
}
impl GalaxyClient {
/// Connect to the gateway endpoint and build a Galaxy client. Mirrors
/// the TLS / plaintext / API-key handling used by `GatewayClient`.
pub async fn connect(options: ClientOptions) -> Result<Self, Error> {
let mut endpoint =
Channel::from_shared(options.endpoint().to_owned()).map_err(|source| {
Error::InvalidEndpoint {
endpoint: options.endpoint().to_owned(),
detail: source.to_string(),
}
})?;
endpoint = endpoint.connect_timeout(options.connect_timeout());
if !options.plaintext() {
let mut tls = ClientTlsConfig::new();
if let Some(server_name) = options.server_name_override() {
tls = tls.domain_name(server_name.to_owned());
}
if let Some(ca_file) = options.ca_file() {
let certificate = fs::read(ca_file).map_err(|source| Error::InvalidEndpoint {
endpoint: options.endpoint().to_owned(),
detail: format!("failed to read CA file {}: {source}", ca_file.display()),
})?;
tls = tls.ca_certificate(Certificate::from_pem(certificate));
}
endpoint = endpoint.tls_config(tls)?;
}
let channel = endpoint.connect().await?;
let interceptor = AuthInterceptor::new(options.api_key().cloned());
Ok(Self {
inner: GalaxyRepositoryClient::with_interceptor(channel, interceptor),
call_timeout: options.call_timeout(),
stream_timeout: options.stream_timeout(),
})
}
/// Build a [`GalaxyClient`] that talks through an existing tonic
/// channel. Tests use this to wire up an in-memory transport.
pub fn from_channel(channel: Channel, options: &ClientOptions) -> Self {
let interceptor = AuthInterceptor::new(options.api_key().cloned());
Self {
inner: GalaxyRepositoryClient::with_interceptor(channel, interceptor),
call_timeout: options.call_timeout(),
stream_timeout: options.stream_timeout(),
}
}
/// Borrow the underlying generated client for advanced callers that need
/// access to features not surfaced by the wrapper.
pub fn raw_client(&mut self) -> &mut RawGalaxyClient {
&mut self.inner
}
/// Consume the wrapper and return the generated client.
pub fn into_inner(self) -> RawGalaxyClient {
self.inner
}
/// Probe the Galaxy Repository database connection. Returns the `ok`
/// flag from the server reply.
pub async fn test_connection(&mut self) -> Result<bool, Error> {
let response = self
.inner
.test_connection(self.unary_request(TestConnectionRequest {}))
.await?;
Ok(response.into_inner().ok)
}
/// Read the most recent Galaxy deployment timestamp. Returns `None`
/// when the server reports `present = false`.
pub async fn get_last_deploy_time(&mut self) -> Result<Option<Timestamp>, Error> {
let response = self
.inner
.get_last_deploy_time(self.unary_request(GetLastDeployTimeRequest {}))
.await?;
let reply = response.into_inner();
if reply.present {
Ok(reply.time_of_last_deploy)
} else {
Ok(None)
}
}
/// Walk the deployed object hierarchy. Each [`GalaxyObject`] contains
/// the object's identifying names plus its dynamic attributes.
pub async fn discover_hierarchy(&mut self) -> Result<Vec<GalaxyObject>, Error> {
let response = self
.inner
.discover_hierarchy(self.unary_request(DiscoverHierarchyRequest {}))
.await?;
Ok(response.into_inner().objects)
}
/// Subscribe to the server-streamed deploy-event feed.
///
/// The server emits a bootstrap event describing the current cache state
/// immediately on subscribe, then one event per observed change to
/// `galaxy.time_of_last_deploy`. When `last_seen_deploy_time` matches the
/// current cache, the bootstrap event is suppressed and the stream stays
/// idle until the next deploy.
///
/// Cancellation is cooperative: dropping the returned
/// [`DeployEventStream`] tears down the underlying gRPC call. Callers
/// drive consumption with `StreamExt::next` (or any other `Stream`
/// adapter).
pub async fn watch_deploy_events(
&mut self,
last_seen_deploy_time: Option<Timestamp>,
) -> Result<DeployEventStream, Error> {
let request = WatchDeployEventsRequest {
last_seen_deploy_time,
};
let response = self
.inner
.watch_deploy_events(self.stream_request(request))
.await?;
let stream = futures_util::StreamExt::map(response.into_inner(), |result| {
result.map_err(Error::from)
});
Ok(Box::pin(stream))
}
fn unary_request<T>(&self, message: T) -> Request<T> {
let mut request = Request::new(message);
request.set_timeout(self.call_timeout);
request
}
fn stream_request<T>(&self, message: T) -> Request<T> {
let mut request = Request::new(message);
if let Some(timeout) = self.stream_timeout {
request.set_timeout(timeout);
}
request
}
}
#[cfg(test)]
mod tests {
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use futures_util::StreamExt;
use tokio::net::TcpListener;
use tokio::sync::mpsc;
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
use tonic::transport::Server;
use tonic::{Request, Response, Status};
use super::*;
use crate::auth::ApiKey;
use crate::generated::galaxy_repository::v1::galaxy_repository_server::{
GalaxyRepository, GalaxyRepositoryServer,
};
use crate::generated::galaxy_repository::v1::{
DeployEvent, DiscoverHierarchyReply, DiscoverHierarchyRequest, GalaxyAttribute,
GalaxyObject, GetLastDeployTimeReply, GetLastDeployTimeRequest, TestConnectionReply,
TestConnectionRequest, WatchDeployEventsRequest,
};
type DeployEventTx = mpsc::Sender<Result<DeployEvent, Status>>;
#[derive(Default)]
struct FakeState {
authorization: Mutex<Option<String>>,
present: Mutex<bool>,
last_deploy: Mutex<Option<Timestamp>>,
objects: Mutex<Vec<GalaxyObject>>,
watch_requests: Mutex<Vec<WatchDeployEventsRequest>>,
watch_events: Mutex<Vec<DeployEvent>>,
watch_senders: Mutex<Vec<DeployEventTx>>,
watch_drop_signal: Mutex<Option<mpsc::UnboundedSender<()>>>,
}
#[derive(Clone)]
struct FakeGalaxy {
state: Arc<FakeState>,
}
#[tonic::async_trait]
impl GalaxyRepository for FakeGalaxy {
async fn test_connection(
&self,
request: Request<TestConnectionRequest>,
) -> Result<Response<TestConnectionReply>, Status> {
*self.state.authorization.lock().unwrap() = request
.metadata()
.get("authorization")
.and_then(|value| value.to_str().ok())
.map(str::to_owned);
Ok(Response::new(TestConnectionReply { ok: true }))
}
async fn get_last_deploy_time(
&self,
_request: Request<GetLastDeployTimeRequest>,
) -> Result<Response<GetLastDeployTimeReply>, Status> {
let present = *self.state.present.lock().unwrap();
let time = self.state.last_deploy.lock().unwrap().clone();
Ok(Response::new(GetLastDeployTimeReply {
present,
time_of_last_deploy: time,
}))
}
async fn discover_hierarchy(
&self,
_request: Request<DiscoverHierarchyRequest>,
) -> Result<Response<DiscoverHierarchyReply>, Status> {
Ok(Response::new(DiscoverHierarchyReply {
objects: self.state.objects.lock().unwrap().clone(),
}))
}
type WatchDeployEventsStream =
Pin<Box<dyn tokio_stream::Stream<Item = Result<DeployEvent, Status>> + Send + 'static>>;
async fn watch_deploy_events(
&self,
request: Request<WatchDeployEventsRequest>,
) -> Result<Response<Self::WatchDeployEventsStream>, Status> {
self.state
.watch_requests
.lock()
.unwrap()
.push(request.into_inner());
let preset = self.state.watch_events.lock().unwrap().clone();
let (tx, rx) = mpsc::channel::<Result<DeployEvent, Status>>(16);
for event in preset {
tx.send(Ok(event))
.await
.map_err(|err| Status::internal(err.to_string()))?;
}
self.state.watch_senders.lock().unwrap().push(tx.clone());
let drop_signal = self.state.watch_drop_signal.lock().unwrap().clone();
let stream = ReceiverStream::new(rx);
let stream: Pin<Box<dyn tokio_stream::Stream<Item = _> + Send + 'static>> =
if let Some(signal) = drop_signal {
Box::pin(WatchStreamWithDropSignal {
inner: stream,
signal: Some(signal),
})
} else {
Box::pin(stream)
};
Ok(Response::new(stream))
}
}
/// Wraps the receiver stream so we can detect when the server-side stream
/// future is dropped (the client cancelled or dropped the stream). Used by
/// `watch_drop_tears_down_call`.
struct WatchStreamWithDropSignal<S> {
inner: S,
signal: Option<mpsc::UnboundedSender<()>>,
}
impl<S: tokio_stream::Stream + Unpin> tokio_stream::Stream for WatchStreamWithDropSignal<S> {
type Item = S::Item;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
Pin::new(&mut self.inner).poll_next(cx)
}
}
impl<S> Drop for WatchStreamWithDropSignal<S> {
fn drop(&mut self) {
if let Some(signal) = self.signal.take() {
let _ = signal.send(());
}
}
}
async fn spawn_fake(state: Arc<FakeState>) -> String {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let address = listener.local_addr().unwrap();
let incoming = TcpListenerStream::new(listener);
let service = GalaxyRepositoryServer::new(FakeGalaxy { state });
tokio::spawn(async move {
Server::builder()
.add_service(service)
.serve_with_incoming(incoming)
.await
.unwrap();
});
format!("http://{address}")
}
#[tokio::test]
async fn test_connection_attaches_bearer_metadata_and_returns_ok() {
let state = Arc::new(FakeState::default());
let endpoint = spawn_fake(state.clone()).await;
let mut client = GalaxyClient::connect(
ClientOptions::new(endpoint).with_api_key(ApiKey::new("mxgw_galaxy_secret")),
)
.await
.unwrap();
let ok = client.test_connection().await.unwrap();
assert!(ok);
assert_eq!(
state.authorization.lock().unwrap().as_deref(),
Some("Bearer mxgw_galaxy_secret")
);
}
#[tokio::test]
async fn get_last_deploy_time_returns_none_when_not_present() {
let state = Arc::new(FakeState::default());
*state.present.lock().unwrap() = false;
*state.last_deploy.lock().unwrap() = Some(Timestamp {
seconds: 1_700_000_000,
nanos: 0,
});
let endpoint = spawn_fake(state.clone()).await;
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let result = client.get_last_deploy_time().await.unwrap();
assert!(
result.is_none(),
"present=false on the wire must surface as None, got {result:?}"
);
}
#[tokio::test]
async fn get_last_deploy_time_returns_timestamp_when_present() {
let state = Arc::new(FakeState::default());
*state.present.lock().unwrap() = true;
*state.last_deploy.lock().unwrap() = Some(Timestamp {
seconds: 1_700_000_000,
nanos: 250_000_000,
});
let endpoint = spawn_fake(state.clone()).await;
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let result = client.get_last_deploy_time().await.unwrap();
let timestamp = result.expect("present=true should yield a timestamp");
assert_eq!(timestamp.seconds, 1_700_000_000);
assert_eq!(timestamp.nanos, 250_000_000);
}
#[tokio::test]
async fn discover_hierarchy_returns_objects_with_attributes() {
let state = Arc::new(FakeState::default());
*state.objects.lock().unwrap() = vec![GalaxyObject {
gobject_id: 42,
tag_name: "DelmiaReceiver_001".to_owned(),
contained_name: "DelmiaReceiver".to_owned(),
browse_name: "TestMachine_001/DelmiaReceiver".to_owned(),
parent_gobject_id: 7,
is_area: false,
category_id: 3,
hosted_by_gobject_id: 1,
template_chain: vec!["$UserDefined".to_owned(), "$DelmiaReceiver".to_owned()],
attributes: vec![GalaxyAttribute {
attribute_name: "DownloadPath".to_owned(),
full_tag_reference: "DelmiaReceiver_001.DownloadPath".to_owned(),
mx_data_type: 8,
data_type_name: "MxString".to_owned(),
is_array: false,
array_dimension: 0,
array_dimension_present: false,
mx_attribute_category: 2,
security_classification: 1,
is_historized: false,
is_alarm: false,
}],
}];
let endpoint = spawn_fake(state.clone()).await;
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let objects = client.discover_hierarchy().await.unwrap();
assert_eq!(objects.len(), 1);
assert_eq!(objects[0].tag_name, "DelmiaReceiver_001");
assert_eq!(objects[0].attributes.len(), 1);
assert_eq!(objects[0].attributes[0].attribute_name, "DownloadPath");
assert_eq!(
objects[0].attributes[0].full_tag_reference,
"DelmiaReceiver_001.DownloadPath"
);
}
#[tokio::test]
async fn watch_deploy_events_yields_events_in_order() {
let state = Arc::new(FakeState::default());
*state.watch_events.lock().unwrap() = vec![
DeployEvent {
sequence: 1,
observed_at: Some(Timestamp {
seconds: 1_700_000_000,
nanos: 0,
}),
time_of_last_deploy: Some(Timestamp {
seconds: 1_699_000_000,
nanos: 0,
}),
time_of_last_deploy_present: true,
object_count: 12,
attribute_count: 80,
},
DeployEvent {
sequence: 2,
observed_at: Some(Timestamp {
seconds: 1_700_000_500,
nanos: 0,
}),
time_of_last_deploy: Some(Timestamp {
seconds: 1_699_500_000,
nanos: 0,
}),
time_of_last_deploy_present: true,
object_count: 13,
attribute_count: 85,
},
];
let endpoint = spawn_fake(state.clone()).await;
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let mut stream = client.watch_deploy_events(None).await.unwrap();
let first = stream
.next()
.await
.expect("bootstrap event")
.expect("ok deploy event");
let second = stream
.next()
.await
.expect("second event")
.expect("ok deploy event");
assert_eq!(first.sequence, 1);
assert_eq!(first.object_count, 12);
assert_eq!(second.sequence, 2);
assert_eq!(second.object_count, 13);
assert!(first.time_of_last_deploy_present);
}
#[tokio::test]
async fn watch_deploy_events_propagates_last_seen_deploy_time() {
let state = Arc::new(FakeState::default());
let endpoint = spawn_fake(state.clone()).await;
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let last_seen = Timestamp {
seconds: 1_699_999_999,
nanos: 123_456_789,
};
let stream = client.watch_deploy_events(Some(last_seen)).await.unwrap();
// Drop the stream right away — the test is solely about the request
// payload reaching the server.
drop(stream);
// Give the server task a moment to record the request.
for _ in 0..20 {
if !state.watch_requests.lock().unwrap().is_empty() {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
let requests = state.watch_requests.lock().unwrap().clone();
assert_eq!(requests.len(), 1);
let recorded = requests[0]
.last_seen_deploy_time
.as_ref()
.expect("last_seen_deploy_time forwarded");
assert_eq!(recorded.seconds, last_seen.seconds);
assert_eq!(recorded.nanos, last_seen.nanos);
}
#[tokio::test]
async fn watch_deploy_events_drop_tears_down_call() {
let state = Arc::new(FakeState::default());
let (signal_tx, mut signal_rx) = mpsc::unbounded_channel();
*state.watch_drop_signal.lock().unwrap() = Some(signal_tx);
// Seed one event so the client gets something on the stream before we
// drop it; this proves the call is live.
*state.watch_events.lock().unwrap() = vec![DeployEvent {
sequence: 7,
observed_at: None,
time_of_last_deploy: None,
time_of_last_deploy_present: false,
object_count: 0,
attribute_count: 0,
}];
let endpoint = spawn_fake(state.clone()).await;
let mut client = GalaxyClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let mut stream = client.watch_deploy_events(None).await.unwrap();
let event = stream
.next()
.await
.expect("bootstrap event")
.expect("ok deploy event");
assert_eq!(event.sequence, 7);
// Dropping the client-side stream must trigger the server-side stream
// future to be dropped as well, signalling cancellation.
drop(stream);
let drop_seen = tokio::time::timeout(std::time::Duration::from_secs(2), signal_rx.recv())
.await
.expect("server-side stream future was not dropped within 2s");
assert!(
drop_seen.is_some(),
"drop signal channel closed unexpectedly"
);
}
}
+8
View File
@@ -13,3 +13,11 @@ pub mod mxaccess_worker {
tonic::include_proto!("mxaccess_worker.v1");
}
}
pub mod galaxy_repository {
pub mod v1 {
#![allow(clippy::large_enum_variant)]
tonic::include_proto!("galaxy_repository.v1");
}
}
+2
View File
@@ -7,6 +7,7 @@
pub mod auth;
pub mod client;
pub mod error;
pub mod galaxy;
pub mod generated;
pub mod options;
pub mod session;
@@ -16,6 +17,7 @@ pub mod version;
pub use auth::{ApiKey, AuthInterceptor};
pub use client::{EventStream, GatewayClient};
pub use error::{CommandError, Error};
pub use galaxy::{DeployEventStream, GalaxyClient};
pub use options::ClientOptions;
pub use session::Session;
pub use value::{MxArrayProjection, MxArrayValue, MxStatus, MxValue, MxValueProjection};