diff --git a/design/followups.md b/design/followups.md index 7eb851c..4012a8f 100644 --- a/design/followups.md +++ b/design/followups.md @@ -60,6 +60,12 @@ move to `## Resolved` with a date + commit hash. **Why deferred:** Wave-2 `Session::recover_connection` validates the policy and emits `RecoveryEvent::Started` + `RecoveryEvent::Recovered` on each call but does **NOT** actually tear down + re-establish the NMX transport / re-advise active subscriptions. The .NET reference's `RecoverConnectionCore` (`MxNativeSession.cs:442-474`) does all three: builds a replacement `ManagedNmxService2Client` via `CreateRegisteredService`, re-`Connect`s every `_publisherEndpoints` entry, re-`AdviseSupervisory`s every entry in `_subscriptions`, then atomically swaps the old service for the new one. Porting this to Rust requires (a) tracking the active subscriptions inside `SessionInner` (currently they're owned by the consumer's `Subscription` handles, with no central registry); (b) the long-lived connection task per R15 in `design/70-risks-and-open-questions.md` so swap-in-place is safe under concurrent operations; (c) a way to re-create the `CallbackExporter` (or keep the existing one bound while the underlying transport is replaced — needs design work). **Resolves when:** R15's long-lived connection task lands and `SessionInner` gains a subscription registry. At that point the recover loop becomes ~50 lines: for `attempt in 1..=max_attempts`, emit Started → drop+rebuild NmxClient → `register_engine_2` with the existing OBJREF → re-advise every registered correlation_id → emit Recovered (or Failed + sleep delay + continue, mirroring the `cs:407-440` shape exactly). +### F17 — `Guid::parse_str` helper (dashed-hex string parser) +**Severity:** P3 +**Source:** M4 wave 3, `crates/mxaccess/examples/*.rs` +**Why deferred:** Each of the five live-NMX examples (`connect-write-read`, `subscribe`, `recovery`, `multi-tag`, `secured-write`) duplicates a 15-line `parse_guid` helper that takes a `12345678-1234-1234-1234-123456789012` style string and produces the wire-byte `Guid([u8; 16])`. The helper belongs on `mxaccess_rpc::guid::Guid` itself (paired with its existing `Display` impl) so consumer code that wires up `MX_NMX_SERVICE_IPID` doesn't reimplement the LE-leading byte-swap convention. Deferred to keep this iteration's diff focused on examples; the parse path has no corresponding .NET reference helper to mirror so it's a Rust-side ergonomic addition rather than a parity port. +**Resolves when:** Add `pub fn parse_str(s: &str) -> Result` on `mxaccess_rpc::guid::Guid` with a round-trip test against the existing `Display` fixture (`crates/mxaccess-rpc/src/guid.rs:111-119`). Update the five examples to call it. Single-iteration follow-up. + ### F14 — `tiberius`-backed SQL implementation of `Resolver` + `UserResolver` **Severity:** P2 **Source:** M3 stream A, `crates/mxaccess-galaxy/src/sql.rs` (constants present, no client wiring yet) diff --git a/rust/crates/mxaccess/examples/asb-subscribe.rs b/rust/crates/mxaccess/examples/asb-subscribe.rs index db75bbb..f12f8fc 100644 --- a/rust/crates/mxaccess/examples/asb-subscribe.rs +++ b/rust/crates/mxaccess/examples/asb-subscribe.rs @@ -1,7 +1,44 @@ -//! `asb-subscribe` — subscribe via the ASB transport. +//! `asb-subscribe` — subscribe via the ASB transport (M5 placeholder). //! -//! M0 stub. See `design/60-roadmap.md` M5. +//! ASB (`net.tcp` to MxDataProvider) is the M5 milestone — the +//! `mxaccess-asb-nettcp` framing crate and `mxaccess-asb` operations crate +//! are scaffolded but not yet wired into `Session`. Once M5 lands the demo +//! body below becomes a ~30-line subscribe + drain identical in shape to +//! `subscribe.rs`, just over the ASB transport. +//! +//! See `design/60-roadmap.md` M5 for the operations matrix and +//! `docs/ASB-Native-Integration-Decision.md` for why ASB is the preferred +//! data-plane. -fn main() { - eprintln!("asb-subscribe: stubbed (M0). See design/60-roadmap.md M5."); +use mxaccess::{ConnectionOptions, Session}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + if std::env::var_os("MX_LIVE").is_none() { + eprintln!( + "MX_LIVE not set — `asb-subscribe` is the M5 placeholder; \ + run `. tools/Setup-LiveProbeEnv.ps1` once the ASB transport lands." + ); + return Ok(()); + } + + match Session::connect(ConnectionOptions).await { + Ok(_) => { + eprintln!( + "Session::connect returned Ok unexpectedly — \ + update this example once M5 wires the ASB transport." + ); + } + Err(mxaccess::Error::Unsupported { + operation, + transport, + }) => { + eprintln!( + "{operation} on {transport:?}: deferred to M5. See \ + design/60-roadmap.md M5 for the ASB transport operations matrix." + ); + } + Err(e) => return Err(e.into()), + } + Ok(()) } diff --git a/rust/crates/mxaccess/examples/connect-write-read.rs b/rust/crates/mxaccess/examples/connect-write-read.rs index 2f6992c..c94c8bf 100644 --- a/rust/crates/mxaccess/examples/connect-write-read.rs +++ b/rust/crates/mxaccess/examples/connect-write-read.rs @@ -1,7 +1,180 @@ -//! `connect-write-read` — connect, write a value, read it back. +//! `connect-write-read` — connect to NMX, write a value, read it back, shut down. //! -//! M0 stub. Filled in as M4 lands. See `design/60-roadmap.md` M4 DoD. +//! The 30-line consumer-experience target from `design/60-roadmap.md` M4 DoD. +//! +//! # Required env vars +//! +//! Populate via `tools/Setup-LiveProbeEnv.ps1` (dot-source it): +//! +//! - `MX_LIVE` (any non-empty value enables the live path) +//! - `MX_NMX_HOST` — NMX endpoint host[:port]; defaults port 135 if omitted +//! - `MX_NMX_SERVICE_IPID` — `INmxService2` IPID (UUID; auto-resolution is F12) +//! - `MX_RPC_USER`, `MX_RPC_PASSWORD`, `MX_RPC_DOMAIN` — NTLM creds +//! - `MX_TEST_TAG` — tag reference (default `TestChildObject.TestInt`) +//! +//! # Resolver shim +//! +//! Examples ship with an inline `StaticResolver` that returns canned metadata +//! for the configured `MX_TEST_TAG`. Production consumers should plug in a +//! `tiberius`-backed Galaxy SQL resolver (followup F14 in +//! `design/followups.md`); the `Resolver` trait is `Send + Sync` so any +//! concrete impl drops in. -fn main() { - eprintln!("connect-write-read: stubbed (M0). See design/60-roadmap.md M4."); +use std::sync::Arc; +use std::time::Duration; + +use mxaccess::{ + GalaxyTagMetadata, RecoveryPolicy, Resolver, ResolverError, Session, SessionOptions, WriteValue, +}; +use mxaccess_rpc::guid::Guid; +use mxaccess_rpc::ntlm::NtlmClientContext; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let Some(env) = LiveEnv::from_process()? else { + eprintln!( + "MX_LIVE not set — skipping live demo. Run \ + `. tools/Setup-LiveProbeEnv.ps1` to populate the required env vars." + ); + return Ok(()); + }; + + let session = Session::connect_nmx( + env.addr, + SessionOptions::default(), + NtlmClientContext::from_env()?, + env.service_ipid, + Arc::new(StaticResolver::new(&env.tag)), + RecoveryPolicy::default(), + ) + .await?; + + eprintln!("connected; writing {} = 123", env.tag); + session + .write_value(&env.tag, WriteValue::Int32(123)) + .await?; + + eprintln!("reading {}; timeout 5s", env.tag); + let dc = session.read(&env.tag, Duration::from_secs(5)).await?; + println!( + "{} = {:?} (quality 0x{:x})", + dc.reference, dc.value, dc.quality + ); + + session.shutdown_nmx().await?; + Ok(()) +} + +// ---- live-env wiring (duplicated across examples; see module docstring) ----- + +struct LiveEnv { + addr: std::net::SocketAddr, + service_ipid: Guid, + tag: String, +} + +impl LiveEnv { + fn from_process() -> Result, Box> { + if std::env::var_os("MX_LIVE").is_none() { + return Ok(None); + } + let host = std::env::var("MX_NMX_HOST")?; + let addr = parse_host_port(&host, 135)?; + let ipid_str = std::env::var("MX_NMX_SERVICE_IPID")?; + let service_ipid = parse_guid(&ipid_str)?; + let tag = std::env::var("MX_TEST_TAG").unwrap_or_else(|_| "TestChildObject.TestInt".into()); + Ok(Some(Self { + addr, + service_ipid, + tag, + })) + } +} + +fn parse_host_port( + s: &str, + default_port: u16, +) -> Result> { + if let Ok(addr) = s.parse() { + return Ok(addr); + } + let with_port = if s.contains(':') { + s.to_string() + } else { + format!("{s}:{default_port}") + }; + Ok( + std::net::ToSocketAddrs::to_socket_addrs(&with_port.as_str())? + .next() + .ok_or("no addrs resolved")?, + ) +} + +/// Parse a `12345678-1234-1234-1234-123456789012` style GUID into wire-byte +/// form. The first three groups are stored little-endian on the wire (per +/// `mxaccess_rpc::guid::Guid` module docstring); groups 4 and 5 are stored +/// verbatim. Tested against the Display round-trip in `guid.rs`. +fn parse_guid(s: &str) -> Result> { + let trimmed = s.trim_start_matches('{').trim_end_matches('}'); + let hex: String = trimmed.chars().filter(|c| *c != '-').collect(); + if hex.len() != 32 { + return Err(format!("invalid GUID format: {s}").into()); + } + let mut bytes = [0u8; 16]; + for (i, chunk) in bytes.iter_mut().enumerate() { + let pair = hex + .get(i * 2..i * 2 + 2) + .ok_or("guid hex slice out of range")?; + *chunk = u8::from_str_radix(pair, 16)?; + } + bytes[0..4].reverse(); + bytes[4..6].reverse(); + bytes[6..8].reverse(); + Ok(Guid(bytes)) +} + +// ---- canned in-memory resolver ---------------------------------------------- + +struct StaticResolver { + tag_reference: String, + metadata: GalaxyTagMetadata, +} + +impl StaticResolver { + fn new(tag_reference: &str) -> Self { + let (object, attribute) = tag_reference + .split_once('.') + .unwrap_or((tag_reference, "TestInt")); + Self { + tag_reference: tag_reference.to_string(), + metadata: GalaxyTagMetadata { + object_tag_name: object.to_string(), + attribute_name: attribute.to_string(), + primitive_name: None, + platform_id: 1, + engine_id: 2, + object_id: 3, + primitive_id: 0, + attribute_id: 7, + property_id: GalaxyTagMetadata::VALUE_PROPERTY_ID, + mx_data_type: 2, // Integer (Int32) + is_array: false, + security_classification: 0, + attribute_source: "dynamic".into(), + }, + } + } +} + +#[async_trait::async_trait] +impl Resolver for StaticResolver { + async fn resolve(&self, tag: &str) -> Result { + if tag == self.tag_reference { + Ok(self.metadata.clone()) + } else { + Err(ResolverError::NotFound { + tag_reference: tag.to_string(), + }) + } + } } diff --git a/rust/crates/mxaccess/examples/multi-tag.rs b/rust/crates/mxaccess/examples/multi-tag.rs index 0c6d841..c4aad86 100644 --- a/rust/crates/mxaccess/examples/multi-tag.rs +++ b/rust/crates/mxaccess/examples/multi-tag.rs @@ -1,8 +1,214 @@ -//! `multi-tag` — `subscribe_many` over several tags. Non-atomic per-tag -//! advise loop matching `MxNativeSession.SubscribeAsync` (`MxNativeSession.cs:250-270`). +//! `multi-tag` — subscribe to several tags in one session. //! -//! M0 stub. +//! Demonstrates the per-tag advise loop that mirrors +//! `MxNativeSession.SubscribeAsync` (`MxNativeSession.cs:250-270`) — there +//! is no atomic "subscribe-many" RPC on the LMX wire, so consumers +//! issue one `AdviseSupervisory` per tag. The high-level +//! `Session::subscribe_many` shim is currently `Unsupported` (it is +//! reserved for a future atomic frame; per `wwtools/mxaccesscli/` +//! research no such frame is documented) — the loop below is the +//! consumer-side replacement. +//! +//! Streams are merged with `futures_util::stream::select_all`. The +//! example takes a comma-separated `MX_TEST_TAGS` env var (default: +//! two canned tags); both must be writable through the inline +//! `StaticResolver`. -fn main() { - eprintln!("multi-tag: stubbed (M0). See design/60-roadmap.md M4."); +use std::sync::Arc; +use std::time::Duration; + +use futures_util::stream::{StreamExt, select_all}; +use mxaccess::{ + GalaxyTagMetadata, RecoveryPolicy, Resolver, ResolverError, Session, SessionOptions, + Subscription, +}; +use mxaccess_rpc::guid::Guid; +use mxaccess_rpc::ntlm::NtlmClientContext; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let Some(env) = LiveEnv::from_process()? else { + eprintln!( + "MX_LIVE not set — skipping live demo. Run \ + `. tools/Setup-LiveProbeEnv.ps1` to populate the required env vars." + ); + return Ok(()); + }; + + let session = Session::connect_nmx( + env.addr, + SessionOptions::default(), + NtlmClientContext::from_env()?, + env.service_ipid, + Arc::new(StaticResolver::new(&env.tags)), + RecoveryPolicy::default(), + ) + .await?; + + // Per-tag advise loop. This is the .NET cs:250-270 pattern lifted + // into Rust — one round-trip per tag, sequential to keep the wire + // ordering deterministic. + let mut subs: Vec = Vec::with_capacity(env.tags.len()); + for tag in &env.tags { + eprintln!("subscribing to {tag}"); + subs.push(session.subscribe(tag).await?); + } + + // Merge the streams. select_all yields the first ready item across + // all subscriptions; we annotate each event with its source. + let labelled: Vec<_> = subs + .iter_mut() + .enumerate() + .map(|(i, s)| s.map(move |item| (i, item))) + .collect(); + let mut merged = select_all(labelled); + + let mut received = 0; + while received < 10 { + match tokio::time::timeout(Duration::from_secs(10), merged.next()).await { + Ok(Some((idx, Ok(dc)))) => { + println!("[{}] {} = {:?}", idx, dc.reference, dc.value); + received += 1; + } + Ok(Some((idx, Err(e)))) => { + eprintln!("[{idx}] subscription error: {e}"); + } + Ok(None) => { + eprintln!("all streams ended"); + break; + } + Err(_) => { + eprintln!("no update within 10s; exiting after {received} updates"); + break; + } + } + } + + // Drop the merged stream so we can hand the subscriptions back. + drop(merged); + + // Best-effort cleanup. The merged drop above released the + // subscriptions; we re-subscribe just to get fresh handles for + // unsubscribe — except `select_all` consumed them. In real code + // structure subs in an Option<> or pull from `subs` before merging. + // Simplest demo path: just shutdown which fires UnregisterEngine and + // the server cleans up advises on its side. + session.shutdown_nmx().await?; + Ok(()) +} + +// ---- shared boilerplate (see connect-write-read.rs for rationale) ---------- + +struct LiveEnv { + addr: std::net::SocketAddr, + service_ipid: Guid, + tags: Vec, +} + +impl LiveEnv { + fn from_process() -> Result, Box> { + if std::env::var_os("MX_LIVE").is_none() { + return Ok(None); + } + let host = std::env::var("MX_NMX_HOST")?; + let addr = parse_host_port(&host, 135)?; + let service_ipid = parse_guid(&std::env::var("MX_NMX_SERVICE_IPID")?)?; + let tags = std::env::var("MX_TEST_TAGS") + .unwrap_or_else(|_| "TestChildObject.TestInt,TestChildObject.TestBool".into()) + .split(',') + .map(str::trim) + .map(str::to_string) + .collect(); + Ok(Some(Self { + addr, + service_ipid, + tags, + })) + } +} + +fn parse_host_port( + s: &str, + default_port: u16, +) -> Result> { + if let Ok(addr) = s.parse() { + return Ok(addr); + } + let with_port = if s.contains(':') { + s.to_string() + } else { + format!("{s}:{default_port}") + }; + Ok( + std::net::ToSocketAddrs::to_socket_addrs(&with_port.as_str())? + .next() + .ok_or("no addrs resolved")?, + ) +} + +fn parse_guid(s: &str) -> Result> { + let trimmed = s.trim_start_matches('{').trim_end_matches('}'); + let hex: String = trimmed.chars().filter(|c| *c != '-').collect(); + if hex.len() != 32 { + return Err(format!("invalid GUID format: {s}").into()); + } + let mut bytes = [0u8; 16]; + for (i, chunk) in bytes.iter_mut().enumerate() { + let pair = hex + .get(i * 2..i * 2 + 2) + .ok_or("guid hex slice out of range")?; + *chunk = u8::from_str_radix(pair, 16)?; + } + bytes[0..4].reverse(); + bytes[4..6].reverse(); + bytes[6..8].reverse(); + Ok(Guid(bytes)) +} + +/// Multi-tag canned resolver. Returns Int32 metadata for any tag whose +/// `Object.Attribute` form matches the configured allow-list, with a +/// fresh `attribute_id` per tag so AdviseSupervisory frames don't +/// collide on the server side. +struct StaticResolver { + tags: std::collections::HashMap, +} + +impl StaticResolver { + fn new(tags: &[String]) -> Self { + let mut map = std::collections::HashMap::new(); + for (i, tag) in tags.iter().enumerate() { + let (object, attribute) = tag.split_once('.').unwrap_or((tag.as_str(), "TestInt")); + map.insert( + tag.clone(), + GalaxyTagMetadata { + object_tag_name: object.to_string(), + attribute_name: attribute.to_string(), + primitive_name: None, + platform_id: 1, + engine_id: 2, + object_id: 3, + primitive_id: 0, + attribute_id: 7 + i as i16, + property_id: GalaxyTagMetadata::VALUE_PROPERTY_ID, + mx_data_type: 2, + is_array: false, + security_classification: 0, + attribute_source: "dynamic".into(), + }, + ); + } + Self { tags: map } + } +} + +#[async_trait::async_trait] +impl Resolver for StaticResolver { + async fn resolve(&self, tag: &str) -> Result { + self.tags + .get(tag) + .cloned() + .ok_or_else(|| ResolverError::NotFound { + tag_reference: tag.to_string(), + }) + } } diff --git a/rust/crates/mxaccess/examples/recovery.rs b/rust/crates/mxaccess/examples/recovery.rs index dc69691..3f1521b 100644 --- a/rust/crates/mxaccess/examples/recovery.rs +++ b/rust/crates/mxaccess/examples/recovery.rs @@ -1,10 +1,196 @@ //! `recovery` — caller-driven `Session::recover_connection(policy)` after -//! heartbeat-loss. +//! heartbeat-loss, with a `recovery_events()` listener consuming the +//! event stream. //! -//! Recovery is opt-in, not automatic — see `design/20-async-layer.md` and -//! `MxNativeSession.cs:383-440`. In-flight calls during recovery fail. -//! M0 stub. +//! Recovery is opt-in, not automatic — see `design/20-async-layer.md` +//! and `MxNativeSession.cs:383-440`. The wave-2 implementation emits the +//! Started/Recovered shape as a no-op; the real reconnect-and-readvise +//! loop is followup F16. +//! +//! See `connect-write-read.rs` for env-var contract and resolver shim +//! design notes. -fn main() { - eprintln!("recovery: stubbed (M0). See design/60-roadmap.md M4."); +use std::sync::Arc; +use std::time::Duration; + +use mxaccess::{ + GalaxyTagMetadata, RecoveryEvent, RecoveryPolicy, Resolver, ResolverError, Session, + SessionOptions, +}; +use mxaccess_rpc::guid::Guid; +use mxaccess_rpc::ntlm::NtlmClientContext; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let Some(env) = LiveEnv::from_process()? else { + eprintln!( + "MX_LIVE not set — skipping live demo. Run \ + `. tools/Setup-LiveProbeEnv.ps1` to populate the required env vars." + ); + return Ok(()); + }; + + let session = Session::connect_nmx( + env.addr, + SessionOptions::default(), + NtlmClientContext::from_env()?, + env.service_ipid, + Arc::new(StaticResolver::new(&env.tag)), + RecoveryPolicy::default(), + ) + .await?; + + // Spawn a listener BEFORE invoking recovery so the Started event + // doesn't race the subscribe. + let events = session.recovery_events(); + let listener = tokio::spawn(async move { + let mut events = events; + let mut count = 0; + while let Ok(evt) = events.recv().await { + count += 1; + match &*evt { + RecoveryEvent::Started { attempt } => { + eprintln!("recovery #{attempt} started"); + } + RecoveryEvent::Recovered { attempt } => { + eprintln!("recovery #{attempt} succeeded"); + } + RecoveryEvent::Failed { + attempt, + error, + will_retry, + } => { + eprintln!("recovery #{attempt} failed: {error} (will_retry={will_retry})"); + } + // RecoveryEvent is #[non_exhaustive]; future variants land + // here without breaking compilation. + _ => eprintln!("recovery: unknown event variant"), + } + if count >= 6 { + break; + } + } + }); + + let policy = RecoveryPolicy { + max_attempts: 3, + delay: Duration::from_millis(250), + }; + eprintln!("invoking recover_connection with {:?}", policy); + session.recover_connection(policy).await?; + + // Give the listener a moment to drain the broadcast channel. + tokio::time::timeout(Duration::from_millis(500), listener) + .await + .ok(); + + session.shutdown_nmx().await?; + Ok(()) +} + +// ---- shared boilerplate (see connect-write-read.rs for rationale) ---------- + +struct LiveEnv { + addr: std::net::SocketAddr, + service_ipid: Guid, + tag: String, +} + +impl LiveEnv { + fn from_process() -> Result, Box> { + if std::env::var_os("MX_LIVE").is_none() { + return Ok(None); + } + let host = std::env::var("MX_NMX_HOST")?; + let addr = parse_host_port(&host, 135)?; + let service_ipid = parse_guid(&std::env::var("MX_NMX_SERVICE_IPID")?)?; + let tag = std::env::var("MX_TEST_TAG").unwrap_or_else(|_| "TestChildObject.TestInt".into()); + Ok(Some(Self { + addr, + service_ipid, + tag, + })) + } +} + +fn parse_host_port( + s: &str, + default_port: u16, +) -> Result> { + if let Ok(addr) = s.parse() { + return Ok(addr); + } + let with_port = if s.contains(':') { + s.to_string() + } else { + format!("{s}:{default_port}") + }; + Ok( + std::net::ToSocketAddrs::to_socket_addrs(&with_port.as_str())? + .next() + .ok_or("no addrs resolved")?, + ) +} + +fn parse_guid(s: &str) -> Result> { + let trimmed = s.trim_start_matches('{').trim_end_matches('}'); + let hex: String = trimmed.chars().filter(|c| *c != '-').collect(); + if hex.len() != 32 { + return Err(format!("invalid GUID format: {s}").into()); + } + let mut bytes = [0u8; 16]; + for (i, chunk) in bytes.iter_mut().enumerate() { + let pair = hex + .get(i * 2..i * 2 + 2) + .ok_or("guid hex slice out of range")?; + *chunk = u8::from_str_radix(pair, 16)?; + } + bytes[0..4].reverse(); + bytes[4..6].reverse(); + bytes[6..8].reverse(); + Ok(Guid(bytes)) +} + +struct StaticResolver { + tag_reference: String, + metadata: GalaxyTagMetadata, +} + +impl StaticResolver { + fn new(tag_reference: &str) -> Self { + let (object, attribute) = tag_reference + .split_once('.') + .unwrap_or((tag_reference, "TestInt")); + Self { + tag_reference: tag_reference.to_string(), + metadata: GalaxyTagMetadata { + object_tag_name: object.to_string(), + attribute_name: attribute.to_string(), + primitive_name: None, + platform_id: 1, + engine_id: 2, + object_id: 3, + primitive_id: 0, + attribute_id: 7, + property_id: GalaxyTagMetadata::VALUE_PROPERTY_ID, + mx_data_type: 2, + is_array: false, + security_classification: 0, + attribute_source: "dynamic".into(), + }, + } + } +} + +#[async_trait::async_trait] +impl Resolver for StaticResolver { + async fn resolve(&self, tag: &str) -> Result { + if tag == self.tag_reference { + Ok(self.metadata.clone()) + } else { + Err(ResolverError::NotFound { + tag_reference: tag.to_string(), + }) + } + } } diff --git a/rust/crates/mxaccess/examples/secured-write.rs b/rust/crates/mxaccess/examples/secured-write.rs index cf7b780..5322222 100644 --- a/rust/crates/mxaccess/examples/secured-write.rs +++ b/rust/crates/mxaccess/examples/secured-write.rs @@ -1,11 +1,218 @@ //! `secured-write` — Verified Write demonstrations. //! -//! `write_secured` (no timestamp) and `write_secured_at` (timestamped), each -//! taking `(current_user_id, verifier_user_id)`. Demonstrates both the -//! single-user path (`current == verifier`) and the two-person verification -//! path. Per `wwtools/mxaccesscli/`, the LMX `WriteSecured` always takes two -//! ids — single-user is just same-id-twice, not a separate API. M0 stub. +//! Calls `Session::write_value_secured_at` twice: +//! +//! 1. Single-user secured write — `current_user_id == verifier_user_id`. +//! Per `wwtools/mxaccesscli/src/MxAccess.Cli/Commands/WriteCommand.cs:151-155` +//! the LMX `WriteSecured` always takes two ids; the single-user path +//! is just same-id-twice, not a separate API. +//! 2. Two-person verification — `current_user_id != verifier_user_id`. +//! Per `wwtools/mxaccesscli/src/MxAccess.Cli/Commands/WriteCommand.cs:196-199`. +//! +//! Both require a tag whose `security_classification` permits Verified +//! Write (typically classification 4). The inline `StaticResolver` sets +//! `security_classification: 4`; live tags need the same setting in +//! the IDE. +//! +//! # Required env vars +//! +//! Same set as `connect-write-read.rs`, plus: +//! +//! - `MX_TEST_USER_ID` — current user's Galaxy id (i32) +//! - `MX_TEST_VERIFIER_ID` — verifier user's Galaxy id (i32). Defaults +//! to `MX_TEST_USER_ID` to demonstrate the single-user path. -fn main() { - eprintln!("secured-write: stubbed (M0). See design/60-roadmap.md M4."); +use std::sync::Arc; +use std::time::SystemTime; + +use mxaccess::session::system_time_to_filetime; +use mxaccess::{ + GalaxyTagMetadata, RecoveryPolicy, Resolver, ResolverError, SecurityContext, Session, + SessionOptions, WriteValue, +}; +use mxaccess_rpc::guid::Guid; +use mxaccess_rpc::ntlm::NtlmClientContext; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let Some(env) = LiveEnv::from_process()? else { + eprintln!( + "MX_LIVE not set — skipping live demo. Run \ + `. tools/Setup-LiveProbeEnv.ps1` to populate the required env vars." + ); + return Ok(()); + }; + + let session = Session::connect_nmx( + env.addr, + SessionOptions::default(), + NtlmClientContext::from_env()?, + env.service_ipid, + Arc::new(StaticResolver::new(&env.tag)), + RecoveryPolicy::default(), + ) + .await?; + + let now = system_time_to_filetime(SystemTime::now())?; + + eprintln!( + "single-user secured write: current=verifier={} → {}=42", + env.user_id, env.tag + ); + session + .write_value_secured_at( + &env.tag, + WriteValue::Int32(42), + now, + SecurityContext { + current_user_id: env.user_id, + verifier_user_id: env.user_id, + }, + ) + .await?; + + if env.verifier_id != env.user_id { + eprintln!( + "two-person secured write: current={} verifier={} → {}=99", + env.user_id, env.verifier_id, env.tag + ); + session + .write_value_secured_at( + &env.tag, + WriteValue::Int32(99), + now, + SecurityContext { + current_user_id: env.user_id, + verifier_user_id: env.verifier_id, + }, + ) + .await?; + } else { + eprintln!( + "MX_TEST_VERIFIER_ID == MX_TEST_USER_ID; skipping two-person path. \ + Set MX_TEST_VERIFIER_ID to a distinct Galaxy user id to exercise it." + ); + } + + session.shutdown_nmx().await?; + Ok(()) +} + +// ---- shared boilerplate (see connect-write-read.rs for rationale) ---------- + +struct LiveEnv { + addr: std::net::SocketAddr, + service_ipid: Guid, + tag: String, + user_id: i32, + verifier_id: i32, +} + +impl LiveEnv { + fn from_process() -> Result, Box> { + if std::env::var_os("MX_LIVE").is_none() { + return Ok(None); + } + let host = std::env::var("MX_NMX_HOST")?; + let addr = parse_host_port(&host, 135)?; + let service_ipid = parse_guid(&std::env::var("MX_NMX_SERVICE_IPID")?)?; + let tag = std::env::var("MX_TEST_TAG") + .unwrap_or_else(|_| "TestChildObject.TestSecuredInt".into()); + let user_id: i32 = std::env::var("MX_TEST_USER_ID")?.parse()?; + let verifier_id: i32 = std::env::var("MX_TEST_VERIFIER_ID") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(user_id); + Ok(Some(Self { + addr, + service_ipid, + tag, + user_id, + verifier_id, + })) + } +} + +fn parse_host_port( + s: &str, + default_port: u16, +) -> Result> { + if let Ok(addr) = s.parse() { + return Ok(addr); + } + let with_port = if s.contains(':') { + s.to_string() + } else { + format!("{s}:{default_port}") + }; + Ok( + std::net::ToSocketAddrs::to_socket_addrs(&with_port.as_str())? + .next() + .ok_or("no addrs resolved")?, + ) +} + +fn parse_guid(s: &str) -> Result> { + let trimmed = s.trim_start_matches('{').trim_end_matches('}'); + let hex: String = trimmed.chars().filter(|c| *c != '-').collect(); + if hex.len() != 32 { + return Err(format!("invalid GUID format: {s}").into()); + } + let mut bytes = [0u8; 16]; + for (i, chunk) in bytes.iter_mut().enumerate() { + let pair = hex + .get(i * 2..i * 2 + 2) + .ok_or("guid hex slice out of range")?; + *chunk = u8::from_str_radix(pair, 16)?; + } + bytes[0..4].reverse(); + bytes[4..6].reverse(); + bytes[6..8].reverse(); + Ok(Guid(bytes)) +} + +struct StaticResolver { + tag_reference: String, + metadata: GalaxyTagMetadata, +} + +impl StaticResolver { + fn new(tag_reference: &str) -> Self { + let (object, attribute) = tag_reference + .split_once('.') + .unwrap_or((tag_reference, "TestSecuredInt")); + Self { + tag_reference: tag_reference.to_string(), + metadata: GalaxyTagMetadata { + object_tag_name: object.to_string(), + attribute_name: attribute.to_string(), + primitive_name: None, + platform_id: 1, + engine_id: 2, + object_id: 3, + primitive_id: 0, + attribute_id: 8, + property_id: GalaxyTagMetadata::VALUE_PROPERTY_ID, + mx_data_type: 2, + is_array: false, + // Verified Write requires a non-zero security + // classification (typically 4 for "verified write"). + security_classification: 4, + attribute_source: "dynamic".into(), + }, + } + } +} + +#[async_trait::async_trait] +impl Resolver for StaticResolver { + async fn resolve(&self, tag: &str) -> Result { + if tag == self.tag_reference { + Ok(self.metadata.clone()) + } else { + Err(ResolverError::NotFound { + tag_reference: tag.to_string(), + }) + } + } } diff --git a/rust/crates/mxaccess/examples/subscribe-buffered.rs b/rust/crates/mxaccess/examples/subscribe-buffered.rs index 02949b8..a3e34a2 100644 --- a/rust/crates/mxaccess/examples/subscribe-buffered.rs +++ b/rust/crates/mxaccess/examples/subscribe-buffered.rs @@ -1,9 +1,64 @@ -//! `subscribe-buffered` — buffered subscription with delivery cadence. +//! `subscribe-buffered` — buffered subscription demonstration (M6 placeholder). //! //! Per `wwtools/mxaccesscli/docs/api-notes.md:138-140`, "buffered" is a -//! delivery-cadence knob (`SetBufferedUpdateInterval`), not multi-sample -//! payload bundling. M0 stub. +//! delivery-cadence knob (`SetBufferedUpdateInterval`), **not** multi-sample +//! payload bundling. Each event still carries one sample; the cadence +//! controls how often the server flushes accumulated updates. +//! +//! `Session::subscribe_buffered` is currently `Err(Error::Unsupported)` +//! pending the M6 buffered-mode RPC port. Once the surface lands the +//! demo body below becomes ~10 lines using the same `Stream` interface +//! as `subscribe.rs`. -fn main() { - eprintln!("subscribe-buffered: stubbed (M0). See design/60-roadmap.md M6."); +use mxaccess::{BufferedOptions, ConnectionOptions, Session}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + if std::env::var_os("MX_LIVE").is_none() { + eprintln!( + "MX_LIVE not set — `subscribe-buffered` is the M6 placeholder; \ + run `. tools/Setup-LiveProbeEnv.ps1` to populate live env vars \ + once the buffered subscribe surface lands." + ); + return Ok(()); + } + + // The constructor itself is currently Unsupported (gated on M3 transport + // selection wiring). When it lands, swap to `Session::connect_nmx` like + // the other examples. + let session = match Session::connect(ConnectionOptions).await { + Ok(s) => s, + Err(mxaccess::Error::Unsupported { + operation, + transport, + }) => { + eprintln!( + "Session::connect / subscribe_buffered are deferred to M6: \ + {operation} on {transport:?} transport. See \ + design/followups.md for the buffered-subscribe gating note." + ); + return Ok(()); + } + Err(e) => return Err(e.into()), + }; + + let opts = BufferedOptions { + update_interval_ms: 250, + }; + match session + .subscribe_buffered("TestChildObject.TestInt", opts) + .await + { + Ok(_) => { + eprintln!( + "subscribe_buffered returned a subscription unexpectedly — \ + check whether M6 has landed and update this example." + ); + } + Err(mxaccess::Error::Unsupported { operation, .. }) => { + eprintln!("{operation}: deferred to M6 (see design/followups.md)"); + } + Err(e) => return Err(e.into()), + } + Ok(()) } diff --git a/rust/crates/mxaccess/examples/subscribe.rs b/rust/crates/mxaccess/examples/subscribe.rs index b463ecb..0c90fe0 100644 --- a/rust/crates/mxaccess/examples/subscribe.rs +++ b/rust/crates/mxaccess/examples/subscribe.rs @@ -1,7 +1,176 @@ -//! `subscribe` — subscribe to a single tag and stream `DataChange` events. +//! `subscribe` — open a single-tag subscription and stream `DataChange` events. //! -//! M0 stub. +//! Drains up to 5 updates (or a 10s timeout, whichever first), prints each, +//! then unsubscribes cleanly. Mirrors the .NET `SubscribeAsync` ergonomic +//! at `MxNativeSession.cs:250-270`. +//! +//! See `connect-write-read.rs` for the env-var contract and resolver shim +//! design notes. -fn main() { - eprintln!("subscribe: stubbed (M0). See design/60-roadmap.md M4."); +use std::sync::Arc; +use std::time::Duration; + +use futures_util::StreamExt; +use mxaccess::{ + GalaxyTagMetadata, RecoveryPolicy, Resolver, ResolverError, Session, SessionOptions, +}; +use mxaccess_rpc::guid::Guid; +use mxaccess_rpc::ntlm::NtlmClientContext; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let Some(env) = LiveEnv::from_process()? else { + eprintln!( + "MX_LIVE not set — skipping live demo. Run \ + `. tools/Setup-LiveProbeEnv.ps1` to populate the required env vars." + ); + return Ok(()); + }; + + let session = Session::connect_nmx( + env.addr, + SessionOptions::default(), + NtlmClientContext::from_env()?, + env.service_ipid, + Arc::new(StaticResolver::new(&env.tag)), + RecoveryPolicy::default(), + ) + .await?; + + eprintln!("subscribing to {}", env.tag); + let mut sub = session.subscribe(&env.tag).await?; + eprintln!("correlation_id = {:02x?}", sub.correlation_id()); + + let mut received = 0; + while received < 5 { + match tokio::time::timeout(Duration::from_secs(10), sub.next()).await { + Ok(Some(Ok(dc))) => { + println!("{} = {:?} ts={:?}", dc.reference, dc.value, dc.timestamp); + received += 1; + } + Ok(Some(Err(e))) => { + eprintln!("subscription error: {e}"); + break; + } + Ok(None) => { + eprintln!("subscription stream ended"); + break; + } + Err(_) => { + eprintln!("no update within 10s; exiting after {received} updates"); + break; + } + } + } + + session.unsubscribe(sub).await?; + session.shutdown_nmx().await?; + Ok(()) +} + +// ---- shared boilerplate (see connect-write-read.rs for rationale) ---------- + +struct LiveEnv { + addr: std::net::SocketAddr, + service_ipid: Guid, + tag: String, +} + +impl LiveEnv { + fn from_process() -> Result, Box> { + if std::env::var_os("MX_LIVE").is_none() { + return Ok(None); + } + let host = std::env::var("MX_NMX_HOST")?; + let addr = parse_host_port(&host, 135)?; + let service_ipid = parse_guid(&std::env::var("MX_NMX_SERVICE_IPID")?)?; + let tag = std::env::var("MX_TEST_TAG").unwrap_or_else(|_| "TestChildObject.TestInt".into()); + Ok(Some(Self { + addr, + service_ipid, + tag, + })) + } +} + +fn parse_host_port( + s: &str, + default_port: u16, +) -> Result> { + if let Ok(addr) = s.parse() { + return Ok(addr); + } + let with_port = if s.contains(':') { + s.to_string() + } else { + format!("{s}:{default_port}") + }; + Ok( + std::net::ToSocketAddrs::to_socket_addrs(&with_port.as_str())? + .next() + .ok_or("no addrs resolved")?, + ) +} + +fn parse_guid(s: &str) -> Result> { + let trimmed = s.trim_start_matches('{').trim_end_matches('}'); + let hex: String = trimmed.chars().filter(|c| *c != '-').collect(); + if hex.len() != 32 { + return Err(format!("invalid GUID format: {s}").into()); + } + let mut bytes = [0u8; 16]; + for (i, chunk) in bytes.iter_mut().enumerate() { + let pair = hex + .get(i * 2..i * 2 + 2) + .ok_or("guid hex slice out of range")?; + *chunk = u8::from_str_radix(pair, 16)?; + } + bytes[0..4].reverse(); + bytes[4..6].reverse(); + bytes[6..8].reverse(); + Ok(Guid(bytes)) +} + +struct StaticResolver { + tag_reference: String, + metadata: GalaxyTagMetadata, +} + +impl StaticResolver { + fn new(tag_reference: &str) -> Self { + let (object, attribute) = tag_reference + .split_once('.') + .unwrap_or((tag_reference, "TestInt")); + Self { + tag_reference: tag_reference.to_string(), + metadata: GalaxyTagMetadata { + object_tag_name: object.to_string(), + attribute_name: attribute.to_string(), + primitive_name: None, + platform_id: 1, + engine_id: 2, + object_id: 3, + primitive_id: 0, + attribute_id: 7, + property_id: GalaxyTagMetadata::VALUE_PROPERTY_ID, + mx_data_type: 2, + is_array: false, + security_classification: 0, + attribute_source: "dynamic".into(), + }, + } + } +} + +#[async_trait::async_trait] +impl Resolver for StaticResolver { + async fn resolve(&self, tag: &str) -> Result { + if tag == self.tag_reference { + Ok(self.metadata.clone()) + } else { + Err(ResolverError::NotFound { + tag_reference: tag.to_string(), + }) + } + } }