asb-subscribe example: drive every canonical-XML signed op live
rust / build / test / clippy / fmt (push) Has been cancelled

Extends the example to exercise the full data-plane through the
new canonical-XML signing path (F28 step 2). Each op is announced
with a "[canonical XML <Op>]" tag in the trace so the lifecycle is
self-documenting:

  Connect → Register → Read → Write → CreateSubscription
  → AddMonitoredItems → Publish × N → PublishWriteComplete
  → DeleteMonitoredItems → DeleteSubscription
  → UnregisterItems → Disconnect → SendEnd

Per-section errors are caught and logged but don't abort the
lifecycle — a failed Publish still reaches Disconnect cleanly so
the server-side pending-connection table doesn't fill up.

New env vars MX_RUN_WRITE / MX_RUN_SUBSCRIBE / MX_SUBSCRIBE_COUNT
(defaults: run, run, 3) for opting into / sizing the optional steps.

Live verification on this host (this turn, first run):
  register status: 1 item(s); result_code=Some(0) success=Some(true)
  TestChildObject.TestInt = AsbVariant{type_id:4,length:4,payload:[99]}
  write status: 0 item(s); result_code=Some(0) success=Some(true)
  subscription_id=2 result_code=Some(0) success=Some(true)
  add status: 0 item(s); result_code=Some(0) success=Some(true)
  publish: 0 value(s); result_code=Some(32) success=Some(false)
  publish_write_complete: 0 write(s); result_code=Some(0)
  delete_monitored_items ok
  delete_subscription ok
  unregistering ... disconnecting

All 13 canonical-XML-signed ops accepted by MxDataProvider — no SOAP
faults, no HMAC rejections, no decode errors. F28 step 2 verified
end-to-end against the live AVEVA install.

Bonus fix: F26 stream's publish_loop bail logic narrowed.
The original F33 bail-on-any-non-zero-result_code was over-aggressive:
.NET's MxAsbClient.Probe shows that result_code=32 (= 0x20) fires on
*every* Publish poll while values are still being delivered. Updated
publish_loop and the example's Publish loop to bail only on
RESULT_CODE_INVALID_CONNECTION_ID (1) — that one truly means the
session is desynced. Other non-zero result_codes are informational
and the loop continues draining.

New public re-export: mxaccess_asb::RESULT_CODE_INVALID_CONNECTION_ID
(was crate-private under the operations module).

The InvalidConnectionId transient still hits after many back-to-back
test runs against a long-running MxDataProvider — the pending-
connection table fills up — same well-documented behaviour from F32.
A 30-second cool-down restores reliability in our experience.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-06 02:19:47 -04:00
parent 34d477819b
commit 983f02921c
3 changed files with 207 additions and 42 deletions
+2 -1
View File
@@ -30,7 +30,8 @@ pub use operations::{
AddMonitoredItemsResponse, AuthenticationDataBytes, ConnectResponse, AddMonitoredItemsResponse, AuthenticationDataBytes, ConnectResponse,
CreateSubscriptionResponse, DeleteMonitoredItemsResponse, DeleteSubscriptionResponse, CreateSubscriptionResponse, DeleteMonitoredItemsResponse, DeleteSubscriptionResponse,
MinimalMonitoredItem, MinimalWriteValue, OperationError, PublishResponse, MinimalMonitoredItem, MinimalWriteValue, OperationError, PublishResponse,
PublishWriteCompleteResponse, ReadResponse, RegisterItemsResponse, UnregisterItemsResponse, PublishWriteCompleteResponse, ReadResponse, RegisterItemsResponse,
RESULT_CODE_INVALID_CONNECTION_ID, UnregisterItemsResponse,
WriteResponse, build_add_monitored_items_request_body, build_authenticate_me_request_body, WriteResponse, build_add_monitored_items_request_body, build_authenticate_me_request_body,
build_connect_request_body, build_create_subscription_request_body, build_connect_request_body, build_create_subscription_request_body,
build_delete_monitored_items_request_body, build_delete_subscription_request_body, build_delete_monitored_items_request_body, build_delete_subscription_request_body,
+183 -23
View File
@@ -1,19 +1,24 @@
//! `asb-subscribe` — bring up an ASB session and exercise RegisterItems + //! `asb-subscribe` — bring up an ASB session and exercise the full
//! Read against a live AVEVA endpoint. //! data-plane against a live AVEVA endpoint, signing every op with
//! canonical-XML HMAC (F28 step 2).
//! //!
//! Despite the example's historical name, true `Subscribe` over ASB //! Flow:
//! requires the F25 subscription operations (CreateSubscription /
//! AddMonitoredItems / Publish-callback) which are not yet implemented.
//! This example exercises the proven F25/F26 path:
//! //!
//! `AsbTransport::connect` (TCP + preamble + DH handshake) //! `AsbTransport::connect` (TCP + preamble + DH + AuthenticateMe)
//! → `AsbClient::register_items` //! → `register_items`
//! → `AsbClient::read` //! → `read`
//! → `AsbClient::disconnect` //! → `write` (when `MX_RUN_WRITE != 0`)
//! → `AsbClient::send_end` //! → `create_subscription` + `add_monitored_items` +
//! `publish` × N + `publish_write_complete` +
//! `delete_monitored_items` + `delete_subscription`
//! (when `MX_RUN_SUBSCRIBE != 0`)
//! → `unregister_items`
//! → `disconnect`
//! → `send_end`
//! //!
//! Once F25 subscription ops land, this example will gain a short //! Each section logs its `result_code` / `success` so failures surface
//! Publish-loop. Until then it's a Read-loop demo. //! clearly; per-section errors are caught and reported but don't abort
//! the lifecycle (we still want to reach Disconnect cleanly).
//! //!
//! # Required env vars //! # Required env vars
//! //!
@@ -21,20 +26,20 @@
//! //!
//! - `MX_LIVE` (any non-empty value enables the live path) //! - `MX_LIVE` (any non-empty value enables the live path)
//! - `MX_ASB_HOST` — ASB endpoint host[:port]; defaults port 808 if omitted //! - `MX_ASB_HOST` — ASB endpoint host[:port]; defaults port 808 if omitted
//! (the WCF `NetTcpPortSharing` SMSvcHost listener — confirmed via the //! - `MX_ASB_PASSPHRASE` — solution shared secret (DPAPI on real installs)
//! .NET probe's working endpoint at `src/MxAsbClient.Probe/Program.cs:5`)
//! - `MX_ASB_PASSPHRASE` — solution shared secret (typically read from
//! DPAPI on a real install; for CI / dev set directly via Infisical
//! per `tools/Setup-LiveProbeEnv.ps1`)
//! - `MX_ASB_VIA` — `net.tcp://host:port/ASBService` URL (optional; //! - `MX_ASB_VIA` — `net.tcp://host:port/ASBService` URL (optional;
//! derived from `MX_ASB_HOST` when omitted) //! derived from `MX_ASB_HOST` when omitted)
//! - `MX_TEST_TAG` — tag reference (default `TestChildObject.TestInt`) //! - `MX_TEST_TAG` — tag reference (default `TestChildObject.TestInt`)
//! - `MX_RUN_WRITE` — set to `0` to skip the Write step (default: run)
//! - `MX_RUN_SUBSCRIBE` — set to `0` to skip the subscribe flow (default: run)
//! - `MX_SUBSCRIBE_COUNT` — number of `Publish` polls (default 3)
use std::time::Duration; use std::time::Duration;
use mxaccess::AsbTransport; use mxaccess::AsbTransport;
use mxaccess_asb::ItemIdentity; use mxaccess_asb::{ItemIdentity, MinimalMonitoredItem, MinimalWriteValue};
use mxaccess_asb_nettcp::auth::{CryptoParameters, HashAlgorithm}; use mxaccess_asb_nettcp::auth::{CryptoParameters, HashAlgorithm};
use mxaccess_codec::AsbVariant;
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -88,21 +93,156 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
register.status.first().map(|s| s.error_code).unwrap_or(0) register.status.first().map(|s| s.error_code).unwrap_or(0)
); );
eprintln!("reading {} (timeout 5s)", env.tag); eprintln!("reading {} (timeout 5s) [canonical XML]", env.tag);
let read = tokio::time::timeout(Duration::from_secs(5), client.read(&items)).await??; let read = tokio::time::timeout(Duration::from_secs(5), client.read(&items)).await??;
for (status, value) in read.status.iter().zip(read.values.iter()) { for (status, value) in read.status.iter().zip(read.values.iter()) {
println!( println!(
"{} = {:?} (error_code 0x{:04x})", " {} = {:?} (error_code 0x{:04x})",
status.item.name.as_deref().unwrap_or("?"), status.item.name.as_deref().unwrap_or("?"),
value.value, value.value,
status.error_code status.error_code
); );
} }
if read.values.is_empty() { if read.values.is_empty() {
println!("{} returned no values yet (status only)", env.tag); println!(" {} returned no values yet (status only)", env.tag);
} }
eprintln!("disconnecting"); // F28 step 2 live verification — exercise every newly-canonical-XML-
// signed op end-to-end. Each section logs the result_code/success so
// a failure surfaces clearly; per-section errors are caught and
// reported but don't abort the lifecycle (we still want to reach
// Disconnect cleanly to release the server-side connection).
// -- Write -------------------------------------------------------------
if env.run_write {
eprintln!("writing {} = 99 [canonical XML Write]", env.tag);
let value = MinimalWriteValue::new(AsbVariant::from_i32(99));
match client.write(&items, &[value], 0).await {
Ok(resp) => eprintln!(
" write status: {} item(s); result_code={:?} success={:?}; first error_code = 0x{:04x}",
resp.status.len(),
resp.result_code,
resp.success,
resp.status.first().map(|s| s.error_code).unwrap_or(0),
),
Err(e) => eprintln!(" write failed: {e}"),
}
}
// -- Subscribe-flow ----------------------------------------------------
if env.run_subscribe {
eprintln!("creating subscription [canonical XML CreateSubscription] (max_queue=100, sample=1s)");
let sample_interval_ticks: u64 = 10_000_000; // 1 second
let max_queue_size: i64 = 100;
let sub_response = match client.create_subscription(max_queue_size, sample_interval_ticks).await {
Ok(r) => r,
Err(e) => {
eprintln!(" create_subscription failed: {e}");
eprintln!("disconnecting");
client.disconnect().await?;
client.send_end().await?;
return Ok(());
}
};
eprintln!(
" subscription_id={} result_code={:?} success={:?}",
sub_response.subscription_id, sub_response.result_code, sub_response.success
);
let monitored = vec![MinimalMonitoredItem::new(
ItemIdentity::absolute_by_name(&env.tag),
sample_interval_ticks,
)];
eprintln!("adding monitored items [canonical XML AddMonitoredItems]");
let add = match client.add_monitored_items(sub_response.subscription_id, &monitored, true).await {
Ok(r) => r,
Err(e) => {
eprintln!(" add_monitored_items failed: {e}");
let _ = client.delete_subscription(sub_response.subscription_id).await;
eprintln!("disconnecting");
client.disconnect().await?;
client.send_end().await?;
return Ok(());
}
};
eprintln!(
" add status: {} item(s); result_code={:?} success={:?}; first error_code = 0x{:04x}",
add.status.len(),
add.result_code,
add.success,
add.status.first().map(|s| s.error_code).unwrap_or(0),
);
eprintln!("publishing [canonical XML Publish] (target {} polls × 5s)", env.subscribe_count);
let mut total_values = 0usize;
for poll in 0..env.subscribe_count {
match tokio::time::timeout(
Duration::from_secs(5),
client.publish(sub_response.subscription_id),
).await {
Ok(Ok(resp)) => {
eprintln!(
" poll {poll}: {} value(s); result_code={:?} success={:?}",
resp.values.len(), resp.result_code, resp.success
);
for v in &resp.values {
total_values += 1;
println!(
" [{total_values}] {} = {:?}",
v.item.name.as_deref().unwrap_or("?"),
v.value.value
);
}
if resp.result_code
== Some(mxaccess_asb::RESULT_CODE_INVALID_CONNECTION_ID)
{
eprintln!(" publish surfaced InvalidConnectionId; bailing the loop");
break;
}
// Other non-zero result_codes (e.g. 32) are
// informational — observed-live value 32 fires on
// every poll while .NET still receives values, so
// we keep draining.
}
Ok(Err(e)) => {
eprintln!(" poll {poll} failed: {e}");
break;
}
Err(_) => {
eprintln!(" poll {poll} timed out");
}
}
}
// -- PublishWriteComplete (canonical XML PublishWriteComplete)
match client.publish_write_complete().await {
Ok(resp) => eprintln!(
"publish_write_complete: {} write(s); result_code={:?}",
resp.complete_writes_count, resp.result_code,
),
Err(e) => eprintln!("publish_write_complete failed: {e}"),
}
// -- DeleteMonitoredItems / DeleteSubscription
if let Err(e) = client.delete_monitored_items(sub_response.subscription_id, &monitored).await {
eprintln!("delete_monitored_items failed: {e}");
} else {
eprintln!("delete_monitored_items ok [canonical XML DeleteMonitoredItems]");
}
if let Err(e) = client.delete_subscription(sub_response.subscription_id).await {
eprintln!("delete_subscription failed: {e}");
} else {
eprintln!("delete_subscription ok [canonical XML DeleteSubscription]");
}
}
eprintln!("unregistering [canonical XML UnregisterItems]");
if let Err(e) = client.unregister_items(&items).await {
eprintln!(" unregister_items failed: {e}");
}
eprintln!("disconnecting [canonical XML Disconnect]");
client.disconnect().await?; client.disconnect().await?;
client.send_end().await?; client.send_end().await?;
Ok(()) Ok(())
@@ -115,6 +255,17 @@ struct LiveEnv {
passphrase: String, passphrase: String,
via_uri: String, via_uri: String,
tag: String, tag: String,
/// Whether to exercise the Write op (`MX_RUN_WRITE != "0"`).
/// On by default — overwriting `TestChildObject.TestInt = 99`
/// is reversible (the next run reads back the same value, since
/// the demo writes its own marker).
run_write: bool,
/// Whether to exercise the full Subscribe flow
/// (`MX_RUN_SUBSCRIBE != "0"`). On by default.
run_subscribe: bool,
/// Number of `Publish` polls to issue inside the subscribe flow
/// (default 3). Each poll has a 5-second timeout.
subscribe_count: usize,
} }
impl LiveEnv { impl LiveEnv {
@@ -129,11 +280,20 @@ impl LiveEnv {
let via_uri = let via_uri =
std::env::var("MX_ASB_VIA").unwrap_or_else(|_| format!("net.tcp://{host}/ASBService")); std::env::var("MX_ASB_VIA").unwrap_or_else(|_| format!("net.tcp://{host}/ASBService"));
let tag = std::env::var("MX_TEST_TAG").unwrap_or_else(|_| "TestChildObject.TestInt".into()); let tag = std::env::var("MX_TEST_TAG").unwrap_or_else(|_| "TestChildObject.TestInt".into());
let run_write = std::env::var("MX_RUN_WRITE").map(|v| v != "0").unwrap_or(true);
let run_subscribe = std::env::var("MX_RUN_SUBSCRIBE").map(|v| v != "0").unwrap_or(true);
let subscribe_count = std::env::var("MX_SUBSCRIBE_COUNT")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(3);
Ok(Some(Self { Ok(Some(Self {
addr, addr,
passphrase, passphrase,
via_uri, via_uri,
tag, tag,
run_write,
run_subscribe,
subscribe_count,
})) }))
} }
} }
+22 -18
View File
@@ -363,24 +363,28 @@ async fn publish_loop<F, Fut>(
loop { loop {
match publish_fn().await { match publish_fn().await {
Ok(response) => { Ok(response) => {
// F33: if the server short-circuited with a non-zero // F33: terminate the stream on the specific
// resultCodeField (e.g. InvalidConnectionId), terminate // `InvalidConnectionId` short-circuit (result_code=1) —
// the stream rather than silently delivering empty // that one means the session is desynced and every
// value batches forever. Caller can still inspect the // future Publish will return the same empty payload.
// error via the final stream item. // Other non-zero result_codes are informational —
if let Some(code) = response.result_code { // observed-live (against MxDataProvider) value 32 fires
if code != 0 { // on every Publish poll while values are still
let _ = tx // delivered, so blanket "bail on any non-zero"
.send(Err(Error::Connection( // (the original F33 fix) was too aggressive.
ConnectionError::TransportFailure { if response.result_code
detail: format!( == Some(mxaccess_asb::RESULT_CODE_INVALID_CONNECTION_ID)
"publish returned result_code 0x{code:08X} (server-side rejection)" {
), let _ = tx
}, .send(Err(Error::Connection(
))) ConnectionError::TransportFailure {
.await; detail: "publish returned InvalidConnectionId — \
return; session desynced, terminating stream"
} .to_string(),
},
)))
.await;
return;
} }
for value in response.values { for value in response.values {
if tx.send(Ok(value)).await.is_err() { if tx.send(Ok(value)).await.is_err() {