Files
mxaccess/rust/crates/mxaccess/examples/recovery.rs
T
Joseph Doherty 48d3a9d6da [M2/M4] mxaccess-rpc: Guid::parse_str + dedupe examples (resolves F17)
Adds `Guid::parse_str(&str) -> Result<Guid, RpcError>` to
`crates/mxaccess-rpc/src/guid.rs` as the inverse of the existing
`Display` impl. Accepts the canonical dashed-hex form, optionally
braced (.NET `B` format), case-insensitive, and tolerant of bare
32-char hex without dashes. Single-pass char-by-char nibble accumulator
avoids per-byte string allocation; applies the same byte-swap of
groups 1-3 that the `Display` impl reads.

Eight new tests cover round-trip against the existing `Display`
fixture (`crates/mxaccess-rpc/src/guid.rs:111-119`,
`b49f92f7-c748-4169-8eca-a0670b012746`), braces, uppercase, no-dashes,
zero-GUID, too-short, too-long, and non-hex rejection.

The five live-NMX examples (`connect-write-read`, `subscribe`,
`recovery`, `multi-tag`, `secured-write`) lose their per-file 15-line
`parse_guid` helpers in favour of the canonical implementation.
`asb-subscribe` and `subscribe-buffered` are unaffected — they don't
parse GUIDs.

Test count delta: 524 → 532 (+8)
Open followups touched: F17 resolved.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 10:18:21 -04:00

178 lines
5.4 KiB
Rust

//! `recovery` — caller-driven `Session::recover_connection(policy)` after
//! heartbeat-loss, with a `recovery_events()` listener consuming the
//! event stream.
//!
//! Recovery is opt-in, not automatic — see `design/20-async-layer.md`
//! and `MxNativeSession.cs:383-440`. The wave-2 implementation emits the
//! Started/Recovered shape as a no-op; the real reconnect-and-readvise
//! loop is followup F16.
//!
//! See `connect-write-read.rs` for env-var contract and resolver shim
//! design notes.
use std::sync::Arc;
use std::time::Duration;
use mxaccess::{
GalaxyTagMetadata, RecoveryEvent, RecoveryPolicy, Resolver, ResolverError, Session,
SessionOptions,
};
use mxaccess_rpc::guid::Guid;
use mxaccess_rpc::ntlm::NtlmClientContext;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let Some(env) = LiveEnv::from_process()? else {
eprintln!(
"MX_LIVE not set — skipping live demo. Run \
`. tools/Setup-LiveProbeEnv.ps1` to populate the required env vars."
);
return Ok(());
};
let session = Session::connect_nmx(
env.addr,
SessionOptions::default(),
NtlmClientContext::from_env()?,
env.service_ipid,
Arc::new(StaticResolver::new(&env.tag)),
RecoveryPolicy::default(),
)
.await?;
// Spawn a listener BEFORE invoking recovery so the Started event
// doesn't race the subscribe.
let events = session.recovery_events();
let listener = tokio::spawn(async move {
let mut events = events;
let mut count = 0;
while let Ok(evt) = events.recv().await {
count += 1;
match &*evt {
RecoveryEvent::Started { attempt } => {
eprintln!("recovery #{attempt} started");
}
RecoveryEvent::Recovered { attempt } => {
eprintln!("recovery #{attempt} succeeded");
}
RecoveryEvent::Failed {
attempt,
error,
will_retry,
} => {
eprintln!("recovery #{attempt} failed: {error} (will_retry={will_retry})");
}
// RecoveryEvent is #[non_exhaustive]; future variants land
// here without breaking compilation.
_ => eprintln!("recovery: unknown event variant"),
}
if count >= 6 {
break;
}
}
});
let policy = RecoveryPolicy {
max_attempts: 3,
delay: Duration::from_millis(250),
};
eprintln!("invoking recover_connection with {:?}", policy);
session.recover_connection(policy).await?;
// Give the listener a moment to drain the broadcast channel.
tokio::time::timeout(Duration::from_millis(500), listener)
.await
.ok();
session.shutdown_nmx().await?;
Ok(())
}
// ---- shared boilerplate (see connect-write-read.rs for rationale) ----------
struct LiveEnv {
addr: std::net::SocketAddr,
service_ipid: Guid,
tag: String,
}
impl LiveEnv {
fn from_process() -> Result<Option<Self>, Box<dyn std::error::Error>> {
if std::env::var_os("MX_LIVE").is_none() {
return Ok(None);
}
let host = std::env::var("MX_NMX_HOST")?;
let addr = parse_host_port(&host, 135)?;
let service_ipid = Guid::parse_str(&std::env::var("MX_NMX_SERVICE_IPID")?)?;
let tag = std::env::var("MX_TEST_TAG").unwrap_or_else(|_| "TestChildObject.TestInt".into());
Ok(Some(Self {
addr,
service_ipid,
tag,
}))
}
}
fn parse_host_port(
s: &str,
default_port: u16,
) -> Result<std::net::SocketAddr, Box<dyn std::error::Error>> {
if let Ok(addr) = s.parse() {
return Ok(addr);
}
let with_port = if s.contains(':') {
s.to_string()
} else {
format!("{s}:{default_port}")
};
Ok(
std::net::ToSocketAddrs::to_socket_addrs(&with_port.as_str())?
.next()
.ok_or("no addrs resolved")?,
)
}
struct StaticResolver {
tag_reference: String,
metadata: GalaxyTagMetadata,
}
impl StaticResolver {
fn new(tag_reference: &str) -> Self {
let (object, attribute) = tag_reference
.split_once('.')
.unwrap_or((tag_reference, "TestInt"));
Self {
tag_reference: tag_reference.to_string(),
metadata: GalaxyTagMetadata {
object_tag_name: object.to_string(),
attribute_name: attribute.to_string(),
primitive_name: None,
platform_id: 1,
engine_id: 2,
object_id: 3,
primitive_id: 0,
attribute_id: 7,
property_id: GalaxyTagMetadata::VALUE_PROPERTY_ID,
mx_data_type: 2,
is_array: false,
security_classification: 0,
attribute_source: "dynamic".into(),
},
}
}
}
#[async_trait::async_trait]
impl Resolver for StaticResolver {
async fn resolve(&self, tag: &str) -> Result<GalaxyTagMetadata, ResolverError> {
if tag == self.tag_reference {
Ok(self.metadata.clone())
} else {
Err(ResolverError::NotFound {
tag_reference: tag.to_string(),
})
}
}
}