Files
mxaccess/rust/crates/mxaccess/examples/recovery.rs
T
Joseph Doherty af939730b1 [M4] mxaccess: examples wave 3 — 7 example programs (M4 wave 3)
Replaces the M0 stub bodies in `crates/mxaccess/examples/` with real
consumer-facing demos against the M4 NMX `Session` surface. Each example
gates on `MX_LIVE` and prints a friendly bypass message when the live
env vars aren't populated, so `cargo build --workspace --all-targets`
stays green in CI without an AVEVA install.

Five examples target the proven NMX path (build + connect + demo +
shutdown):
- `connect-write-read` — `Session::write_value` + `read` round-trip; the
  30-line consumer-experience target from `design/60-roadmap.md` M4 DoD.
- `subscribe` — single-tag `Subscription` stream; drains 5 updates or
  10s timeout, then `unsubscribe` cleanly.
- `recovery` — `RecoveryPolicy { max_attempts: 3, delay: 250ms }`
  + spawned `recovery_events()` listener consuming the broadcast.
- `multi-tag` — per-tag `subscribe` loop merged via
  `futures_util::stream::select_all`; matches the .NET cs:250-270 shape
  (no atomic subscribe-many RPC on the wire).
- `secured-write` — `write_value_secured_at` exercising both single-user
  (`current_user_id == verifier_user_id`) and two-person paths per
  `wwtools/mxaccesscli/src/MxAccess.Cli/Commands/WriteCommand.cs:151-155,196-199`.

Two examples hold the place for downstream milestones:
- `subscribe-buffered` — pattern-matches on `Error::Unsupported` from
  `Session::subscribe_buffered` (M6) and exits 0 with an explanation.
- `asb-subscribe` — same shape against `Session::connect` (M5 ASB).

All five live examples share an inline `LiveEnv::from_process` helper,
a dashed-hex `parse_guid`, and a `StaticResolver` that returns canned
metadata for the configured `MX_TEST_TAG`. The duplication is
intentional — Cargo examples are meant to be self-contained and read
top-to-bottom; consumers swap `StaticResolver` for a tiberius-backed
Galaxy resolver (followup F14) without touching any other example.

Test count delta: 524 → 524 (+0; examples are demos, not tests)
Open followups touched: F17 logged (Guid::parse_str helper to dedupe
the per-example dashed-hex parser).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 10:14:15 -04:00

197 lines
6.0 KiB
Rust

//! `recovery` — caller-driven `Session::recover_connection(policy)` after
//! heartbeat-loss, with a `recovery_events()` listener consuming the
//! event stream.
//!
//! Recovery is opt-in, not automatic — see `design/20-async-layer.md`
//! and `MxNativeSession.cs:383-440`. The wave-2 implementation emits the
//! Started/Recovered shape as a no-op; the real reconnect-and-readvise
//! loop is followup F16.
//!
//! See `connect-write-read.rs` for env-var contract and resolver shim
//! design notes.
use std::sync::Arc;
use std::time::Duration;
use mxaccess::{
GalaxyTagMetadata, RecoveryEvent, RecoveryPolicy, Resolver, ResolverError, Session,
SessionOptions,
};
use mxaccess_rpc::guid::Guid;
use mxaccess_rpc::ntlm::NtlmClientContext;
/// Entry point for the `recovery` demo.
///
/// Flow: read the live environment, connect an NMX session, subscribe to
/// recovery events, invoke `recover_connection` with an explicit policy,
/// let the listener drain for a short while, then shut the session down.
/// When `LiveEnv::from_process` yields `None` the demo prints a bypass
/// message and returns `Ok(())` without touching the network.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let env = match LiveEnv::from_process()? {
        Some(env) => env,
        None => {
            eprintln!(
                "MX_LIVE not set — skipping live demo. Run \
                 `. tools/Setup-LiveProbeEnv.ps1` to populate the required env vars."
            );
            return Ok(());
        }
    };

    let session = Session::connect_nmx(
        env.addr,
        SessionOptions::default(),
        NtlmClientContext::from_env()?,
        env.service_ipid,
        Arc::new(StaticResolver::new(&env.tag)),
        RecoveryPolicy::default(),
    )
    .await?;

    // Obtain the event receiver BEFORE invoking recovery so the Started
    // event cannot be emitted ahead of the subscription.
    let mut events = session.recovery_events();
    let listener = tokio::spawn(async move {
        // Handle at most six events; stop earlier as soon as `recv` errors.
        for _ in 0..6 {
            let Ok(evt) = events.recv().await else {
                break;
            };
            match &*evt {
                RecoveryEvent::Started { attempt } => {
                    eprintln!("recovery #{attempt} started");
                }
                RecoveryEvent::Recovered { attempt } => {
                    eprintln!("recovery #{attempt} succeeded");
                }
                RecoveryEvent::Failed {
                    attempt,
                    error,
                    will_retry,
                } => {
                    eprintln!("recovery #{attempt} failed: {error} (will_retry={will_retry})");
                }
                // RecoveryEvent is #[non_exhaustive]; variants added later
                // fall through to this arm instead of breaking the build.
                _ => eprintln!("recovery: unknown event variant"),
            }
        }
    });

    let policy = RecoveryPolicy {
        max_attempts: 3,
        delay: Duration::from_millis(250),
    };
    eprintln!("invoking recover_connection with {:?}", policy);
    session.recover_connection(policy).await?;

    // Wait up to 500 ms for the listener to finish; both a timeout and a
    // join error are deliberately ignored here.
    let _ = tokio::time::timeout(Duration::from_millis(500), listener).await;

    session.shutdown_nmx().await?;
    Ok(())
}
// ---- shared boilerplate (see connect-write-read.rs for rationale) ----------
/// Connection parameters for the live demo, read from process environment
/// variables.
struct LiveEnv {
    /// Endpoint parsed from `MX_NMX_HOST` (port 135 is used when none is given).
    addr: std::net::SocketAddr,
    /// GUID parsed from `MX_NMX_SERVICE_IPID`.
    service_ipid: Guid,
    /// Tag reference from `MX_TEST_TAG`, or `TestChildObject.TestInt` when
    /// that variable cannot be read.
    tag: String,
}

impl LiveEnv {
    /// Builds the live environment from process environment variables.
    ///
    /// Returns `Ok(None)` when `MX_LIVE` is not set at all, which callers use
    /// as the signal to skip the live demo.
    ///
    /// # Errors
    ///
    /// Fails when `MX_NMX_HOST` or `MX_NMX_SERVICE_IPID` is missing or not
    /// valid Unicode (the message names the offending variable), or when
    /// either value fails to parse.
    fn from_process() -> Result<Option<Self>, Box<dyn std::error::Error>> {
        if std::env::var_os("MX_LIVE").is_none() {
            return Ok(None);
        }

        // A bare `VarError` does not say which variable was the problem, so
        // prefix the variable name to make the failure actionable.
        let required = |name: &str| -> Result<String, Box<dyn std::error::Error>> {
            std::env::var(name).map_err(|err| format!("{name}: {err}").into())
        };

        let addr = parse_host_port(&required("MX_NMX_HOST")?, 135)?;
        let service_ipid = parse_guid(&required("MX_NMX_SERVICE_IPID")?)?;
        // The tag is optional: any read failure falls back to the default.
        let tag = std::env::var("MX_TEST_TAG").unwrap_or_else(|_| "TestChildObject.TestInt".into());

        Ok(Some(Self {
            addr,
            service_ipid,
            tag,
        }))
    }
}
/// Turns a host specification into a socket address.
///
/// Accepted forms, tried in this order:
/// 1. a complete socket address literal (`10.0.0.1:135`, `[::1]:135`);
/// 2. a bare IP literal (`10.0.0.1`, `::1`, `[::1]`), which gets `default_port`;
/// 3. `host:port` or a bare `host`, handed to `ToSocketAddrs` (bare hosts
///    get `default_port`); the first resolved address is returned.
///
/// # Errors
///
/// Returns the underlying I/O error when the lookup fails, or
/// `"no addrs resolved"` when it succeeds but yields no address.
fn parse_host_port(
    s: &str,
    default_port: u16,
) -> Result<std::net::SocketAddr, Box<dyn std::error::Error>> {
    use std::net::{IpAddr, Ipv6Addr, SocketAddr, ToSocketAddrs};

    if let Ok(addr) = s.parse::<SocketAddr>() {
        return Ok(addr);
    }

    // A bare IPv6 literal contains ':' and would otherwise be mistaken for
    // `host:port`, so recognise IP literals before the hostname path.
    let bare_ip = match s.strip_prefix('[').and_then(|rest| rest.strip_suffix(']')) {
        Some(inner) => inner.parse::<Ipv6Addr>().ok().map(IpAddr::V6),
        None => s.parse::<IpAddr>().ok(),
    };
    if let Some(ip) = bare_ip {
        return Ok(SocketAddr::new(ip, default_port));
    }

    let with_port = if s.contains(':') {
        s.to_string()
    } else {
        format!("{s}:{default_port}")
    };
    Ok(with_port
        .to_socket_addrs()?
        .next()
        .ok_or("no addrs resolved")?)
}
/// Parses a textual GUID (`xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx`, optionally
/// wrapped in braces) into a `Guid`.
///
/// Dashes are ignored wherever they appear. The first three groups (4, 2 and
/// 2 bytes) are byte-reversed relative to the text; the remaining eight bytes
/// are stored in textual order.
///
/// # Errors
///
/// Returns `invalid GUID format: …` unless exactly 32 ASCII hex digits remain
/// after removing braces and dashes.
fn parse_guid(s: &str) -> Result<Guid, Box<dyn std::error::Error>> {
    let trimmed = s.trim_start_matches('{').trim_end_matches('}');
    let hex: String = trimmed.chars().filter(|c| *c != '-').collect();

    // Validate every character up front: `u8::from_str_radix` tolerates a
    // leading '+', so a pair such as "+0" would otherwise be accepted.
    if hex.len() != 32 || !hex.bytes().all(|b| b.is_ascii_hexdigit()) {
        return Err(format!("invalid GUID format: {s}").into());
    }

    let mut bytes = [0u8; 16];
    for (i, slot) in bytes.iter_mut().enumerate() {
        let pair = hex
            .get(i * 2..i * 2 + 2)
            .ok_or("guid hex slice out of range")?;
        *slot = u8::from_str_radix(pair, 16)?;
    }

    // Reverse the first three groups in place.
    bytes[0..4].reverse();
    bytes[4..6].reverse();
    bytes[6..8].reverse();
    Ok(Guid(bytes))
}
/// Resolver shim that knows exactly one tag and answers with fixed metadata.
struct StaticResolver {
    /// The only tag reference this resolver recognises.
    tag_reference: String,
    /// Metadata returned for `tag_reference`.
    metadata: GalaxyTagMetadata,
}

impl StaticResolver {
    /// Builds a resolver for `tag_reference`.
    ///
    /// The reference is split at its first `.` into object and attribute
    /// names; when there is no `.`, the whole reference is the object name
    /// and the attribute defaults to `TestInt`. All numeric ids are
    /// hard-coded values.
    fn new(tag_reference: &str) -> Self {
        let (object_name, attribute_name) = match tag_reference.split_once('.') {
            Some(parts) => parts,
            None => (tag_reference, "TestInt"),
        };

        let metadata = GalaxyTagMetadata {
            object_tag_name: object_name.to_owned(),
            attribute_name: attribute_name.to_owned(),
            primitive_name: None,
            platform_id: 1,
            engine_id: 2,
            object_id: 3,
            primitive_id: 0,
            attribute_id: 7,
            property_id: GalaxyTagMetadata::VALUE_PROPERTY_ID,
            mx_data_type: 2,
            is_array: false,
            security_classification: 0,
            attribute_source: "dynamic".into(),
        };

        Self {
            tag_reference: tag_reference.to_owned(),
            metadata,
        }
    }
}
// `Resolver` implementation: exact-match lookup against the single tag the
// resolver was constructed with.
#[async_trait::async_trait]
impl Resolver for StaticResolver {
    /// Returns a clone of the stored metadata when `tag` equals the
    /// configured tag reference, and `ResolverError::NotFound` carrying the
    /// requested tag otherwise.
    async fn resolve(&self, tag: &str) -> Result<GalaxyTagMetadata, ResolverError> {
        if tag != self.tag_reference {
            return Err(ResolverError::NotFound {
                tag_reference: tag.to_owned(),
            });
        }
        Ok(self.metadata.clone())
    }
}