diff --git a/rust/crates/mxaccess-asb/src/lib.rs b/rust/crates/mxaccess-asb/src/lib.rs index b939ee4..24c46a5 100644 --- a/rust/crates/mxaccess-asb/src/lib.rs +++ b/rust/crates/mxaccess-asb/src/lib.rs @@ -30,7 +30,8 @@ pub use operations::{ AddMonitoredItemsResponse, AuthenticationDataBytes, ConnectResponse, CreateSubscriptionResponse, DeleteMonitoredItemsResponse, DeleteSubscriptionResponse, MinimalMonitoredItem, MinimalWriteValue, OperationError, PublishResponse, - PublishWriteCompleteResponse, ReadResponse, RegisterItemsResponse, UnregisterItemsResponse, + PublishWriteCompleteResponse, ReadResponse, RegisterItemsResponse, + RESULT_CODE_INVALID_CONNECTION_ID, UnregisterItemsResponse, WriteResponse, build_add_monitored_items_request_body, build_authenticate_me_request_body, build_connect_request_body, build_create_subscription_request_body, build_delete_monitored_items_request_body, build_delete_subscription_request_body, diff --git a/rust/crates/mxaccess/examples/asb-subscribe.rs b/rust/crates/mxaccess/examples/asb-subscribe.rs index 6455a9b..9703fa4 100644 --- a/rust/crates/mxaccess/examples/asb-subscribe.rs +++ b/rust/crates/mxaccess/examples/asb-subscribe.rs @@ -1,19 +1,24 @@ -//! `asb-subscribe` — bring up an ASB session and exercise RegisterItems + -//! Read against a live AVEVA endpoint. +//! `asb-subscribe` — bring up an ASB session and exercise the full +//! data-plane against a live AVEVA endpoint, signing every op with +//! canonical-XML HMAC (F28 step 2). //! -//! Despite the example's historical name, true `Subscribe` over ASB -//! requires the F25 subscription operations (CreateSubscription / -//! AddMonitoredItems / Publish-callback) which are not yet implemented. -//! This example exercises the proven F25/F26 path: +//! Flow: //! -//! `AsbTransport::connect` (TCP + preamble + DH handshake) -//! → `AsbClient::register_items` -//! → `AsbClient::read` -//! → `AsbClient::disconnect` -//! → `AsbClient::send_end` +//! `AsbTransport::connect` (TCP + preamble + DH + AuthenticateMe) +//! → `register_items` +//! → `read` +//! → `write` (when `MX_RUN_WRITE != 0`) +//! → `create_subscription` + `add_monitored_items` + +//! `publish` × N + `publish_write_complete` + +//! `delete_monitored_items` + `delete_subscription` +//! (when `MX_RUN_SUBSCRIBE != 0`) +//! → `unregister_items` +//! → `disconnect` +//! → `send_end` //! -//! Once F25 subscription ops land, this example will gain a short -//! Publish-loop. Until then it's a Read-loop demo. +//! Each section logs its `result_code` / `success` so failures surface +//! clearly; per-section errors are caught and reported but don't abort +//! the lifecycle (we still want to reach Disconnect cleanly). //! //! # Required env vars //! @@ -21,20 +26,20 @@ //! //! - `MX_LIVE` (any non-empty value enables the live path) //! - `MX_ASB_HOST` — ASB endpoint host[:port]; defaults port 808 if omitted -//! (the WCF `NetTcpPortSharing` SMSvcHost listener — confirmed via the -//! .NET probe's working endpoint at `src/MxAsbClient.Probe/Program.cs:5`) -//! - `MX_ASB_PASSPHRASE` — solution shared secret (typically read from -//! DPAPI on a real install; for CI / dev set directly via Infisical -//! per `tools/Setup-LiveProbeEnv.ps1`) +//! - `MX_ASB_PASSPHRASE` — solution shared secret (DPAPI on real installs) //! - `MX_ASB_VIA` — `net.tcp://host:port/ASBService` URL (optional; //! derived from `MX_ASB_HOST` when omitted) //! - `MX_TEST_TAG` — tag reference (default `TestChildObject.TestInt`) +//! - `MX_RUN_WRITE` — set to `0` to skip the Write step (default: run) +//! - `MX_RUN_SUBSCRIBE` — set to `0` to skip the subscribe flow (default: run) +//! - `MX_SUBSCRIBE_COUNT` — number of `Publish` polls (default 3) use std::time::Duration; use mxaccess::AsbTransport; -use mxaccess_asb::ItemIdentity; +use mxaccess_asb::{ItemIdentity, MinimalMonitoredItem, MinimalWriteValue}; use mxaccess_asb_nettcp::auth::{CryptoParameters, HashAlgorithm}; +use mxaccess_codec::AsbVariant; #[tokio::main] async fn main() -> Result<(), Box> { @@ -88,21 +93,156 @@ async fn main() -> Result<(), Box> { register.status.first().map(|s| s.error_code).unwrap_or(0) ); - eprintln!("reading {} (timeout 5s)", env.tag); + eprintln!("reading {} (timeout 5s) [canonical XML]", env.tag); let read = tokio::time::timeout(Duration::from_secs(5), client.read(&items)).await??; for (status, value) in read.status.iter().zip(read.values.iter()) { println!( - "{} = {:?} (error_code 0x{:04x})", + " {} = {:?} (error_code 0x{:04x})", status.item.name.as_deref().unwrap_or("?"), value.value, status.error_code ); } if read.values.is_empty() { - println!("{} returned no values yet (status only)", env.tag); + println!(" {} returned no values yet (status only)", env.tag); } - eprintln!("disconnecting"); + // F28 step 2 live verification — exercise every newly-canonical-XML- + // signed op end-to-end. Each section logs the result_code/success so + // a failure surfaces clearly; per-section errors are caught and + // reported but don't abort the lifecycle (we still want to reach + // Disconnect cleanly to release the server-side connection). + + // -- Write ------------------------------------------------------------- + if env.run_write { + eprintln!("writing {} = 99 [canonical XML Write]", env.tag); + let value = MinimalWriteValue::new(AsbVariant::from_i32(99)); + match client.write(&items, &[value], 0).await { + Ok(resp) => eprintln!( + " write status: {} item(s); result_code={:?} success={:?}; first error_code = 0x{:04x}", + resp.status.len(), + resp.result_code, + resp.success, + resp.status.first().map(|s| s.error_code).unwrap_or(0), + ), + Err(e) => eprintln!(" write failed: {e}"), + } + } + + // -- Subscribe-flow ---------------------------------------------------- + if env.run_subscribe { + eprintln!("creating subscription [canonical XML CreateSubscription] (max_queue=100, sample=1s)"); + let sample_interval_ticks: u64 = 10_000_000; // 1 second + let max_queue_size: i64 = 100; + let sub_response = match client.create_subscription(max_queue_size, sample_interval_ticks).await { + Ok(r) => r, + Err(e) => { + eprintln!(" create_subscription failed: {e}"); + eprintln!("disconnecting"); + client.disconnect().await?; + client.send_end().await?; + return Ok(()); + } + }; + eprintln!( + " subscription_id={} result_code={:?} success={:?}", + sub_response.subscription_id, sub_response.result_code, sub_response.success + ); + + let monitored = vec![MinimalMonitoredItem::new( + ItemIdentity::absolute_by_name(&env.tag), + sample_interval_ticks, + )]; + + eprintln!("adding monitored items [canonical XML AddMonitoredItems]"); + let add = match client.add_monitored_items(sub_response.subscription_id, &monitored, true).await { + Ok(r) => r, + Err(e) => { + eprintln!(" add_monitored_items failed: {e}"); + let _ = client.delete_subscription(sub_response.subscription_id).await; + eprintln!("disconnecting"); + client.disconnect().await?; + client.send_end().await?; + return Ok(()); + } + }; + eprintln!( + " add status: {} item(s); result_code={:?} success={:?}; first error_code = 0x{:04x}", + add.status.len(), + add.result_code, + add.success, + add.status.first().map(|s| s.error_code).unwrap_or(0), + ); + + eprintln!("publishing [canonical XML Publish] (target {} polls × 5s)", env.subscribe_count); + let mut total_values = 0usize; + for poll in 0..env.subscribe_count { + match tokio::time::timeout( + Duration::from_secs(5), + client.publish(sub_response.subscription_id), + ).await { + Ok(Ok(resp)) => { + eprintln!( + " poll {poll}: {} value(s); result_code={:?} success={:?}", + resp.values.len(), resp.result_code, resp.success + ); + for v in &resp.values { + total_values += 1; + println!( + " [{total_values}] {} = {:?}", + v.item.name.as_deref().unwrap_or("?"), + v.value.value + ); + } + if resp.result_code + == Some(mxaccess_asb::RESULT_CODE_INVALID_CONNECTION_ID) + { + eprintln!(" publish surfaced InvalidConnectionId; bailing the loop"); + break; + } + // Other non-zero result_codes (e.g. 32) are + // informational — observed-live value 32 fires on + // every poll while .NET still receives values, so + // we keep draining. + } + Ok(Err(e)) => { + eprintln!(" poll {poll} failed: {e}"); + break; + } + Err(_) => { + eprintln!(" poll {poll} timed out"); + } + } + } + + // -- PublishWriteComplete (canonical XML PublishWriteComplete) + match client.publish_write_complete().await { + Ok(resp) => eprintln!( + "publish_write_complete: {} write(s); result_code={:?}", + resp.complete_writes_count, resp.result_code, + ), + Err(e) => eprintln!("publish_write_complete failed: {e}"), + } + + // -- DeleteMonitoredItems / DeleteSubscription + if let Err(e) = client.delete_monitored_items(sub_response.subscription_id, &monitored).await { + eprintln!("delete_monitored_items failed: {e}"); + } else { + eprintln!("delete_monitored_items ok [canonical XML DeleteMonitoredItems]"); + } + if let Err(e) = client.delete_subscription(sub_response.subscription_id).await { + eprintln!("delete_subscription failed: {e}"); + } else { + eprintln!("delete_subscription ok [canonical XML DeleteSubscription]"); + } + } + + eprintln!("unregistering [canonical XML UnregisterItems]"); + if let Err(e) = client.unregister_items(&items).await { + eprintln!(" unregister_items failed: {e}"); + } + + eprintln!("disconnecting [canonical XML Disconnect]"); client.disconnect().await?; client.send_end().await?; Ok(()) @@ -115,6 +255,17 @@ struct LiveEnv { passphrase: String, via_uri: String, tag: String, + /// Whether to exercise the Write op (`MX_RUN_WRITE != "0"`). + /// On by default — overwriting `TestChildObject.TestInt = 99` + /// is reversible (the next run reads back the same value, since + /// the demo writes its own marker). + run_write: bool, + /// Whether to exercise the full Subscribe flow + /// (`MX_RUN_SUBSCRIBE != "0"`). On by default. + run_subscribe: bool, + /// Number of `Publish` polls to issue inside the subscribe flow + /// (default 3). Each poll has a 5-second timeout. + subscribe_count: usize, } impl LiveEnv { @@ -129,11 +280,20 @@ impl LiveEnv { let via_uri = std::env::var("MX_ASB_VIA").unwrap_or_else(|_| format!("net.tcp://{host}/ASBService")); let tag = std::env::var("MX_TEST_TAG").unwrap_or_else(|_| "TestChildObject.TestInt".into()); + let run_write = std::env::var("MX_RUN_WRITE").map(|v| v != "0").unwrap_or(true); + let run_subscribe = std::env::var("MX_RUN_SUBSCRIBE").map(|v| v != "0").unwrap_or(true); + let subscribe_count = std::env::var("MX_SUBSCRIBE_COUNT") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(3); Ok(Some(Self { addr, passphrase, via_uri, tag, + run_write, + run_subscribe, + subscribe_count, })) } } diff --git a/rust/crates/mxaccess/src/asb_session.rs b/rust/crates/mxaccess/src/asb_session.rs index ea595aa..ab03b59 100644 --- a/rust/crates/mxaccess/src/asb_session.rs +++ b/rust/crates/mxaccess/src/asb_session.rs @@ -363,24 +363,28 @@ async fn publish_loop( loop { match publish_fn().await { Ok(response) => { - // F33: if the server short-circuited with a non-zero - // resultCodeField (e.g. InvalidConnectionId), terminate - // the stream rather than silently delivering empty - // value batches forever. Caller can still inspect the - // error via the final stream item. - if let Some(code) = response.result_code { - if code != 0 { - let _ = tx - .send(Err(Error::Connection( - ConnectionError::TransportFailure { - detail: format!( - "publish returned result_code 0x{code:08X} (server-side rejection)" - ), - }, - ))) - .await; - return; - } + // F33: terminate the stream on the specific + // `InvalidConnectionId` short-circuit (result_code=1) — + // that one means the session is desynced and every + // future Publish will return the same empty payload. + // Other non-zero result_codes are informational — + // observed-live (against MxDataProvider) value 32 fires + // on every Publish poll while values are still + // delivered, so blanket "bail on any non-zero" + // (the original F33 fix) was too aggressive. + if response.result_code + == Some(mxaccess_asb::RESULT_CODE_INVALID_CONNECTION_ID) + { + let _ = tx + .send(Err(Error::Connection( + ConnectionError::TransportFailure { + detail: "publish returned InvalidConnectionId — \ + session desynced, terminating stream" + .to_string(), + }, + ))) + .await; + return; } for value in response.values { if tx.send(Ok(value)).await.is_err() {