From c6570dcd06630ef241895ea09f5d1dee9c4e1956 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 5 May 2026 12:34:24 -0400 Subject: [PATCH] [M5] mxaccess: asb-subscribe example exercises full F25+F26 stack MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces the M5 placeholder with an actual end-to-end demo: AsbTransport::connect (TCP + preamble + DH handshake) → register_items → read → disconnect → send_end Until F25 subscription ops (CreateSubscription / AddMonitoredItems / Publish-callback) land, the example is a Read-loop demo. Once subscription ops arrive, it gains a Publish-loop and lives up to its name. Env vars (analogous to the NMX `connect-write-read` example): MX_LIVE — non-empty enables the live path MX_ASB_HOST — endpoint host[:port]; defaults port 5074 MX_ASB_PASSPHRASE — solution shared secret MX_ASB_VIA — `net.tcp://...` URI (optional; derived from MX_ASB_HOST when omitted) MX_TEST_TAG — tag reference (default `TestChildObject.TestInt`) Without MX_LIVE: prints the `Setup-LiveProbeEnv.ps1` hint and exits cleanly with status 0 — the same pattern every other live example follows. Connection-id is a fresh 16-byte random buffer (matches .NET's `Guid.NewGuid()` at `MxAsbDataClient.cs:36`). Co-Authored-By: Claude Opus 4.7 (1M context) --- design/followups.md | 6 +- .../crates/mxaccess/examples/asb-subscribe.rs | 175 ++++++++++++++---- 2 files changed, 149 insertions(+), 32 deletions(-) diff --git a/design/followups.md b/design/followups.md index 694f76e..766e5d2 100644 --- a/design/followups.md +++ b/design/followups.md @@ -46,7 +46,11 @@ move to `## Resolved` with a date + commit hash. **Resolves when:** F19-F26 are all closed and the four DoD bullets above pass. -**Cumulative execution log.** F19 + F23 (`ed17c07`); F24 (`7611d9e`); F20 (`9dfd193`); F22 (`43c10a1`); F21 (`5f98558`); F25 step 1 (`25dbd8d`); F25 step 2 (`a2b8989`); F25 step 3 (`c4bf0a0`); F25 step 4 (`1e59249`); F25 step 5 (`9b8133f`); F25 step 6 (`321b796`); F25 step 7 (`1b1ee1e`); F26 step 1 (`8a0f92b`); F26 step 2 landed in this commit: +**Cumulative execution log.** F19 + F23 (`ed17c07`); F24 (`7611d9e`); F20 (`9dfd193`); F22 (`43c10a1`); F21 (`5f98558`); F25 step 1 (`25dbd8d`); F25 step 2 (`a2b8989`); F25 step 3 (`c4bf0a0`); F25 step 4 (`1e59249`); F25 step 5 (`9b8133f`); F25 step 6 (`321b796`); F25 step 7 (`1b1ee1e`); F26 step 1 (`8a0f92b`); F26 step 2 (`14bb529`); `examples/asb-subscribe.rs` rewrite landed in this commit: +- `examples/asb-subscribe.rs` rewrite: replaces the M5 placeholder with an actual end-to-end demo that exercises the F25 + F26 stack: `AsbTransport::connect` (TCP + preamble + DH handshake) → `register_items` → `read` → `disconnect` → `send_end`. Reads endpoint config from `MX_ASB_HOST`, `MX_ASB_PASSPHRASE`, `MX_ASB_VIA`, `MX_TEST_TAG` env vars (analogous to the NMX `connect-write-read` example's pattern). Defaults port 5074 when host omits one; defaults via URI to `net.tcp://{host}/ASBService` when `MX_ASB_VIA` is unset. Without `MX_LIVE` set, prints the `Setup-LiveProbeEnv.ps1` hint and exits cleanly. Connection-id is a fresh 16-byte random buffer (matches .NET's `Guid.NewGuid()` at `MxAsbDataClient.cs:36`). The example is a Read-loop until F25 subscription ops land — at that point the example will gain a Publish-loop and live up to its name. + +**Earlier slices:** +- F26 step 2 (commit `14bb529`): - F26 step 2: `AsbTransport::connect(endpoint, passphrase, crypto_parameters, via_uri, connection_id)` — `tokio::net::TcpStream`-specialised async constructor that owns the full transport-bring-up sequence: TCP connect → NMF preamble exchange → DH Connect handshake → AuthenticateMe one-way (signed). Returns `(AsbTransport, ConnectResponse)` so callers can inspect the negotiated lifetime / Apollo-vs-Baktun flag from the response. New `ConnectionError::TransportFailure { detail }` variant carries the underlying error message (NMF / NBFX / auth / I/O) without exploding the public taxonomy. Errors are mapped at the AsbClient/Auth boundary via `map_client_error` / `map_auth_error` helpers. 1 new test confirms a connect to an unreachable endpoint (127.0.0.1:1, TCPMUX-reserved) surfaces an `Err` cleanly without panicking. **Stubbed for F26 step 3:** `Session::connect_asb` constructor (the SessionInner refactor needed to host both NMX + ASB transports under one struct is heavier than this iteration's scope), plus the operation-routing layer that maps ASB result types (`ItemStatus`, `RuntimeValue`) back to `mxaccess` types (`MxStatus`, `DataChange`, `MxValue`). **Earlier slices:** diff --git a/rust/crates/mxaccess/examples/asb-subscribe.rs b/rust/crates/mxaccess/examples/asb-subscribe.rs index f12f8fc..fb1a557 100644 --- a/rust/crates/mxaccess/examples/asb-subscribe.rs +++ b/rust/crates/mxaccess/examples/asb-subscribe.rs @@ -1,44 +1,157 @@ -//! `asb-subscribe` — subscribe via the ASB transport (M5 placeholder). +//! `asb-subscribe` — bring up an ASB session and exercise RegisterItems + +//! Read against a live AVEVA endpoint. //! -//! ASB (`net.tcp` to MxDataProvider) is the M5 milestone — the -//! `mxaccess-asb-nettcp` framing crate and `mxaccess-asb` operations crate -//! are scaffolded but not yet wired into `Session`. Once M5 lands the demo -//! body below becomes a ~30-line subscribe + drain identical in shape to -//! `subscribe.rs`, just over the ASB transport. +//! Despite the example's historical name, true `Subscribe` over ASB +//! requires the F25 subscription operations (CreateSubscription / +//! AddMonitoredItems / Publish-callback) which are not yet implemented. +//! This example exercises the proven F25/F26 path: //! -//! See `design/60-roadmap.md` M5 for the operations matrix and -//! `docs/ASB-Native-Integration-Decision.md` for why ASB is the preferred -//! data-plane. +//! `AsbTransport::connect` (TCP + preamble + DH handshake) +//! → `AsbClient::register_items` +//! → `AsbClient::read` +//! → `AsbClient::disconnect` +//! → `AsbClient::send_end` +//! +//! Once F25 subscription ops land, this example will gain a short +//! Publish-loop. Until then it's a Read-loop demo. +//! +//! # Required env vars +//! +//! Populate via `tools/Setup-LiveProbeEnv.ps1` (dot-source it): +//! +//! - `MX_LIVE` (any non-empty value enables the live path) +//! - `MX_ASB_HOST` — ASB endpoint host[:port]; defaults port 5074 if omitted +//! - `MX_ASB_PASSPHRASE` — solution shared secret (typically read from +//! DPAPI on a real install; for CI / dev set directly via Infisical +//! per `tools/Setup-LiveProbeEnv.ps1`) +//! - `MX_ASB_VIA` — `net.tcp://host:port/ASBService` URL (optional; +//! derived from `MX_ASB_HOST` when omitted) +//! - `MX_TEST_TAG` — tag reference (default `TestChildObject.TestInt`) -use mxaccess::{ConnectionOptions, Session}; +use std::time::Duration; + +use mxaccess::AsbTransport; +use mxaccess_asb::ItemIdentity; +use mxaccess_asb_nettcp::auth::CryptoParameters; #[tokio::main] async fn main() -> Result<(), Box> { - if std::env::var_os("MX_LIVE").is_none() { + let Some(env) = LiveEnv::from_process()? else { eprintln!( - "MX_LIVE not set — `asb-subscribe` is the M5 placeholder; \ - run `. tools/Setup-LiveProbeEnv.ps1` once the ASB transport lands." + "MX_LIVE not set — skipping live demo. Run \ + `. tools/Setup-LiveProbeEnv.ps1` to populate the required env vars." ); return Ok(()); + }; + + eprintln!("connecting ASB at {} via {} ...", env.addr, env.via_uri); + let connection_id = generate_connection_id(); + let (mut transport, response) = AsbTransport::connect( + env.addr, + &env.passphrase, + &CryptoParameters::defaults(), + &env.via_uri, + connection_id, + ) + .await?; + eprintln!( + "connected; lifetime={:?} apollo={}", + response.connection_lifetime, + transport + .client_mut() + .authenticator_mut() + .use_apollo_signing() + ); + + let client = transport.client_mut(); + let items = vec![ItemIdentity::absolute_by_name(&env.tag)]; + + eprintln!("registering {}", env.tag); + let register = client.register_items(&items, true, false).await?; + eprintln!( + "register status: {} item(s); first error_code = 0x{:04x}", + register.status.len(), + register.status.first().map(|s| s.error_code).unwrap_or(0) + ); + + eprintln!("reading {} (timeout 5s)", env.tag); + let read = tokio::time::timeout(Duration::from_secs(5), client.read(&items)).await??; + for (status, value) in read.status.iter().zip(read.values.iter()) { + println!( + "{} = {:?} (error_code 0x{:04x})", + status.item.name.as_deref().unwrap_or("?"), + value.value, + status.error_code + ); + } + if read.values.is_empty() { + println!("{} returned no values yet (status only)", env.tag); } - match Session::connect(ConnectionOptions).await { - Ok(_) => { - eprintln!( - "Session::connect returned Ok unexpectedly — \ - update this example once M5 wires the ASB transport." - ); - } - Err(mxaccess::Error::Unsupported { - operation, - transport, - }) => { - eprintln!( - "{operation} on {transport:?}: deferred to M5. See \ - design/60-roadmap.md M5 for the ASB transport operations matrix." - ); - } - Err(e) => return Err(e.into()), - } + eprintln!("disconnecting"); + client.disconnect().await?; + client.send_end().await?; Ok(()) } + +// ---- live-env wiring -------------------------------------------------------- + +struct LiveEnv { + addr: std::net::SocketAddr, + passphrase: String, + via_uri: String, + tag: String, +} + +impl LiveEnv { + fn from_process() -> Result, Box> { + if std::env::var_os("MX_LIVE").is_none() { + return Ok(None); + } + let host = std::env::var("MX_ASB_HOST")?; + let addr = parse_host_port(&host, 5074)?; + let passphrase = std::env::var("MX_ASB_PASSPHRASE") + .map_err(|_| "MX_ASB_PASSPHRASE not set — ASB requires the solution shared secret")?; + let via_uri = + std::env::var("MX_ASB_VIA").unwrap_or_else(|_| format!("net.tcp://{host}/ASBService")); + let tag = std::env::var("MX_TEST_TAG").unwrap_or_else(|_| "TestChildObject.TestInt".into()); + Ok(Some(Self { + addr, + passphrase, + via_uri, + tag, + })) + } +} + +fn parse_host_port( + s: &str, + default_port: u16, +) -> Result> { + if let Ok(addr) = s.parse() { + return Ok(addr); + } + let with_port = if s.contains(':') { + s.to_string() + } else { + format!("{s}:{default_port}") + }; + Ok( + std::net::ToSocketAddrs::to_socket_addrs(&with_port.as_str())? + .next() + .ok_or("no addrs resolved")?, + ) +} + +/// Generate a fresh 16-byte connection-id GUID for this session. We +/// could pull `uuid::Uuid::new_v4()` for a real RFC 4122 v4, but the +/// example deliberately stays dep-light — `rand::random::<[u8; 16]>()` +/// is sufficient since the field is opaque to the service (the .NET +/// reference at `MxAsbDataClient.cs:36` uses `Guid.NewGuid()` which +/// is also a v4 random GUID). +fn generate_connection_id() -> [u8; 16] { + use rand::RngCore; + let mut bytes = [0u8; 16]; + rand::thread_rng().fill_bytes(&mut bytes); + bytes +}