asb-subscribe example: drive every canonical-XML signed op live
rust / build / test / clippy / fmt (push) Has been cancelled
rust / build / test / clippy / fmt (push) Has been cancelled
Extends the example to exercise the full data-plane through the
new canonical-XML signing path (F28 step 2). Each op is announced
with a "[canonical XML <Op>]" tag in the trace so the lifecycle is
self-documenting:
Connect → Register → Read → Write → CreateSubscription
→ AddMonitoredItems → Publish × N → PublishWriteComplete
→ DeleteMonitoredItems → DeleteSubscription
→ UnregisterItems → Disconnect → SendEnd
Per-section errors are caught and logged but don't abort the
lifecycle — a failed Publish still reaches Disconnect cleanly so
the server-side pending-connection table doesn't fill up.
New env vars MX_RUN_WRITE / MX_RUN_SUBSCRIBE / MX_SUBSCRIBE_COUNT
(defaults: run, run, 3) for opting into / sizing the optional steps.
Live verification on this host (this turn, first run):
register status: 1 item(s); result_code=Some(0) success=Some(true)
TestChildObject.TestInt = AsbVariant{type_id:4,length:4,payload:[99]}
write status: 0 item(s); result_code=Some(0) success=Some(true)
subscription_id=2 result_code=Some(0) success=Some(true)
add status: 0 item(s); result_code=Some(0) success=Some(true)
publish: 0 value(s); result_code=Some(32) success=Some(false)
publish_write_complete: 0 write(s); result_code=Some(0)
delete_monitored_items ok
delete_subscription ok
unregistering ... disconnecting
All 13 canonical-XML-signed ops accepted by MxDataProvider — no SOAP
faults, no HMAC rejections, no decode errors. F28 step 2 verified
end-to-end against the live AVEVA install.
Bonus fix: F26 stream's publish_loop bail logic narrowed.
The original F33 bail-on-any-non-zero-result_code was over-aggressive:
.NET's MxAsbClient.Probe shows that result_code=32 (= 0x20) fires on
*every* Publish poll while values are still being delivered. Updated
publish_loop and the example's Publish loop to bail only on
RESULT_CODE_INVALID_CONNECTION_ID (1) — that one truly means the
session is desynced. Other non-zero result_codes are informational
and the loop continues draining.
New public re-export: mxaccess_asb::RESULT_CODE_INVALID_CONNECTION_ID
(was crate-private under the operations module).
The InvalidConnectionId transient still hits after many back-to-back
test runs against a long-running MxDataProvider — the pending-
connection table fills up — same well-documented behaviour from F32.
A 30-second cool-down restores reliability in our experience.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -30,7 +30,8 @@ pub use operations::{
|
|||||||
AddMonitoredItemsResponse, AuthenticationDataBytes, ConnectResponse,
|
AddMonitoredItemsResponse, AuthenticationDataBytes, ConnectResponse,
|
||||||
CreateSubscriptionResponse, DeleteMonitoredItemsResponse, DeleteSubscriptionResponse,
|
CreateSubscriptionResponse, DeleteMonitoredItemsResponse, DeleteSubscriptionResponse,
|
||||||
MinimalMonitoredItem, MinimalWriteValue, OperationError, PublishResponse,
|
MinimalMonitoredItem, MinimalWriteValue, OperationError, PublishResponse,
|
||||||
PublishWriteCompleteResponse, ReadResponse, RegisterItemsResponse, UnregisterItemsResponse,
|
PublishWriteCompleteResponse, ReadResponse, RegisterItemsResponse,
|
||||||
|
RESULT_CODE_INVALID_CONNECTION_ID, UnregisterItemsResponse,
|
||||||
WriteResponse, build_add_monitored_items_request_body, build_authenticate_me_request_body,
|
WriteResponse, build_add_monitored_items_request_body, build_authenticate_me_request_body,
|
||||||
build_connect_request_body, build_create_subscription_request_body,
|
build_connect_request_body, build_create_subscription_request_body,
|
||||||
build_delete_monitored_items_request_body, build_delete_subscription_request_body,
|
build_delete_monitored_items_request_body, build_delete_subscription_request_body,
|
||||||
|
|||||||
@@ -1,19 +1,24 @@
|
|||||||
//! `asb-subscribe` — bring up an ASB session and exercise RegisterItems +
|
//! `asb-subscribe` — bring up an ASB session and exercise the full
|
||||||
//! Read against a live AVEVA endpoint.
|
//! data-plane against a live AVEVA endpoint, signing every op with
|
||||||
|
//! canonical-XML HMAC (F28 step 2).
|
||||||
//!
|
//!
|
||||||
//! Despite the example's historical name, true `Subscribe` over ASB
|
//! Flow:
|
||||||
//! requires the F25 subscription operations (CreateSubscription /
|
|
||||||
//! AddMonitoredItems / Publish-callback) which are not yet implemented.
|
|
||||||
//! This example exercises the proven F25/F26 path:
|
|
||||||
//!
|
//!
|
||||||
//! `AsbTransport::connect` (TCP + preamble + DH handshake)
|
//! `AsbTransport::connect` (TCP + preamble + DH + AuthenticateMe)
|
||||||
//! → `AsbClient::register_items`
|
//! → `register_items`
|
||||||
//! → `AsbClient::read`
|
//! → `read`
|
||||||
//! → `AsbClient::disconnect`
|
//! → `write` (when `MX_RUN_WRITE != 0`)
|
||||||
//! → `AsbClient::send_end`
|
//! → `create_subscription` + `add_monitored_items` +
|
||||||
|
//! `publish` × N + `publish_write_complete` +
|
||||||
|
//! `delete_monitored_items` + `delete_subscription`
|
||||||
|
//! (when `MX_RUN_SUBSCRIBE != 0`)
|
||||||
|
//! → `unregister_items`
|
||||||
|
//! → `disconnect`
|
||||||
|
//! → `send_end`
|
||||||
//!
|
//!
|
||||||
//! Once F25 subscription ops land, this example will gain a short
|
//! Each section logs its `result_code` / `success` so failures surface
|
||||||
//! Publish-loop. Until then it's a Read-loop demo.
|
//! clearly; per-section errors are caught and reported but don't abort
|
||||||
|
//! the lifecycle (we still want to reach Disconnect cleanly).
|
||||||
//!
|
//!
|
||||||
//! # Required env vars
|
//! # Required env vars
|
||||||
//!
|
//!
|
||||||
@@ -21,20 +26,20 @@
|
|||||||
//!
|
//!
|
||||||
//! - `MX_LIVE` (any non-empty value enables the live path)
|
//! - `MX_LIVE` (any non-empty value enables the live path)
|
||||||
//! - `MX_ASB_HOST` — ASB endpoint host[:port]; defaults port 808 if omitted
|
//! - `MX_ASB_HOST` — ASB endpoint host[:port]; defaults port 808 if omitted
|
||||||
//! (the WCF `NetTcpPortSharing` SMSvcHost listener — confirmed via the
|
//! - `MX_ASB_PASSPHRASE` — solution shared secret (DPAPI on real installs)
|
||||||
//! .NET probe's working endpoint at `src/MxAsbClient.Probe/Program.cs:5`)
|
|
||||||
//! - `MX_ASB_PASSPHRASE` — solution shared secret (typically read from
|
|
||||||
//! DPAPI on a real install; for CI / dev set directly via Infisical
|
|
||||||
//! per `tools/Setup-LiveProbeEnv.ps1`)
|
|
||||||
//! - `MX_ASB_VIA` — `net.tcp://host:port/ASBService` URL (optional;
|
//! - `MX_ASB_VIA` — `net.tcp://host:port/ASBService` URL (optional;
|
||||||
//! derived from `MX_ASB_HOST` when omitted)
|
//! derived from `MX_ASB_HOST` when omitted)
|
||||||
//! - `MX_TEST_TAG` — tag reference (default `TestChildObject.TestInt`)
|
//! - `MX_TEST_TAG` — tag reference (default `TestChildObject.TestInt`)
|
||||||
|
//! - `MX_RUN_WRITE` — set to `0` to skip the Write step (default: run)
|
||||||
|
//! - `MX_RUN_SUBSCRIBE` — set to `0` to skip the subscribe flow (default: run)
|
||||||
|
//! - `MX_SUBSCRIBE_COUNT` — number of `Publish` polls (default 3)
|
||||||
|
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use mxaccess::AsbTransport;
|
use mxaccess::AsbTransport;
|
||||||
use mxaccess_asb::ItemIdentity;
|
use mxaccess_asb::{ItemIdentity, MinimalMonitoredItem, MinimalWriteValue};
|
||||||
use mxaccess_asb_nettcp::auth::{CryptoParameters, HashAlgorithm};
|
use mxaccess_asb_nettcp::auth::{CryptoParameters, HashAlgorithm};
|
||||||
|
use mxaccess_codec::AsbVariant;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
@@ -88,21 +93,156 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
register.status.first().map(|s| s.error_code).unwrap_or(0)
|
register.status.first().map(|s| s.error_code).unwrap_or(0)
|
||||||
);
|
);
|
||||||
|
|
||||||
eprintln!("reading {} (timeout 5s)", env.tag);
|
eprintln!("reading {} (timeout 5s) [canonical XML]", env.tag);
|
||||||
let read = tokio::time::timeout(Duration::from_secs(5), client.read(&items)).await??;
|
let read = tokio::time::timeout(Duration::from_secs(5), client.read(&items)).await??;
|
||||||
for (status, value) in read.status.iter().zip(read.values.iter()) {
|
for (status, value) in read.status.iter().zip(read.values.iter()) {
|
||||||
println!(
|
println!(
|
||||||
"{} = {:?} (error_code 0x{:04x})",
|
" {} = {:?} (error_code 0x{:04x})",
|
||||||
status.item.name.as_deref().unwrap_or("?"),
|
status.item.name.as_deref().unwrap_or("?"),
|
||||||
value.value,
|
value.value,
|
||||||
status.error_code
|
status.error_code
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
if read.values.is_empty() {
|
if read.values.is_empty() {
|
||||||
println!("{} returned no values yet (status only)", env.tag);
|
println!(" {} returned no values yet (status only)", env.tag);
|
||||||
}
|
}
|
||||||
|
|
||||||
eprintln!("disconnecting");
|
// F28 step 2 live verification — exercise every newly-canonical-XML-
|
||||||
|
// signed op end-to-end. Each section logs the result_code/success so
|
||||||
|
// a failure surfaces clearly; per-section errors are caught and
|
||||||
|
// reported but don't abort the lifecycle (we still want to reach
|
||||||
|
// Disconnect cleanly to release the server-side connection).
|
||||||
|
|
||||||
|
// -- Write -------------------------------------------------------------
|
||||||
|
if env.run_write {
|
||||||
|
eprintln!("writing {} = 99 [canonical XML Write]", env.tag);
|
||||||
|
let value = MinimalWriteValue::new(AsbVariant::from_i32(99));
|
||||||
|
match client.write(&items, &[value], 0).await {
|
||||||
|
Ok(resp) => eprintln!(
|
||||||
|
" write status: {} item(s); result_code={:?} success={:?}; first error_code = 0x{:04x}",
|
||||||
|
resp.status.len(),
|
||||||
|
resp.result_code,
|
||||||
|
resp.success,
|
||||||
|
resp.status.first().map(|s| s.error_code).unwrap_or(0),
|
||||||
|
),
|
||||||
|
Err(e) => eprintln!(" write failed: {e}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// -- Subscribe-flow ----------------------------------------------------
|
||||||
|
if env.run_subscribe {
|
||||||
|
eprintln!("creating subscription [canonical XML CreateSubscription] (max_queue=100, sample=1s)");
|
||||||
|
let sample_interval_ticks: u64 = 10_000_000; // 1 second
|
||||||
|
let max_queue_size: i64 = 100;
|
||||||
|
let sub_response = match client.create_subscription(max_queue_size, sample_interval_ticks).await {
|
||||||
|
Ok(r) => r,
|
||||||
|
Err(e) => {
|
||||||
|
eprintln!(" create_subscription failed: {e}");
|
||||||
|
eprintln!("disconnecting");
|
||||||
|
client.disconnect().await?;
|
||||||
|
client.send_end().await?;
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
eprintln!(
|
||||||
|
" subscription_id={} result_code={:?} success={:?}",
|
||||||
|
sub_response.subscription_id, sub_response.result_code, sub_response.success
|
||||||
|
);
|
||||||
|
|
||||||
|
let monitored = vec![MinimalMonitoredItem::new(
|
||||||
|
ItemIdentity::absolute_by_name(&env.tag),
|
||||||
|
sample_interval_ticks,
|
||||||
|
)];
|
||||||
|
|
||||||
|
eprintln!("adding monitored items [canonical XML AddMonitoredItems]");
|
||||||
|
let add = match client.add_monitored_items(sub_response.subscription_id, &monitored, true).await {
|
||||||
|
Ok(r) => r,
|
||||||
|
Err(e) => {
|
||||||
|
eprintln!(" add_monitored_items failed: {e}");
|
||||||
|
let _ = client.delete_subscription(sub_response.subscription_id).await;
|
||||||
|
eprintln!("disconnecting");
|
||||||
|
client.disconnect().await?;
|
||||||
|
client.send_end().await?;
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
eprintln!(
|
||||||
|
" add status: {} item(s); result_code={:?} success={:?}; first error_code = 0x{:04x}",
|
||||||
|
add.status.len(),
|
||||||
|
add.result_code,
|
||||||
|
add.success,
|
||||||
|
add.status.first().map(|s| s.error_code).unwrap_or(0),
|
||||||
|
);
|
||||||
|
|
||||||
|
eprintln!("publishing [canonical XML Publish] (target {} polls × 5s)", env.subscribe_count);
|
||||||
|
let mut total_values = 0usize;
|
||||||
|
for poll in 0..env.subscribe_count {
|
||||||
|
match tokio::time::timeout(
|
||||||
|
Duration::from_secs(5),
|
||||||
|
client.publish(sub_response.subscription_id),
|
||||||
|
).await {
|
||||||
|
Ok(Ok(resp)) => {
|
||||||
|
eprintln!(
|
||||||
|
" poll {poll}: {} value(s); result_code={:?} success={:?}",
|
||||||
|
resp.values.len(), resp.result_code, resp.success
|
||||||
|
);
|
||||||
|
for v in &resp.values {
|
||||||
|
total_values += 1;
|
||||||
|
println!(
|
||||||
|
" [{total_values}] {} = {:?}",
|
||||||
|
v.item.name.as_deref().unwrap_or("?"),
|
||||||
|
v.value.value
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if resp.result_code
|
||||||
|
== Some(mxaccess_asb::RESULT_CODE_INVALID_CONNECTION_ID)
|
||||||
|
{
|
||||||
|
eprintln!(" publish surfaced InvalidConnectionId; bailing the loop");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
// Other non-zero result_codes (e.g. 32) are
|
||||||
|
// informational — observed-live value 32 fires on
|
||||||
|
// every poll while .NET still receives values, so
|
||||||
|
// we keep draining.
|
||||||
|
}
|
||||||
|
Ok(Err(e)) => {
|
||||||
|
eprintln!(" poll {poll} failed: {e}");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
eprintln!(" poll {poll} timed out");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// -- PublishWriteComplete (canonical XML PublishWriteComplete)
|
||||||
|
match client.publish_write_complete().await {
|
||||||
|
Ok(resp) => eprintln!(
|
||||||
|
"publish_write_complete: {} write(s); result_code={:?}",
|
||||||
|
resp.complete_writes_count, resp.result_code,
|
||||||
|
),
|
||||||
|
Err(e) => eprintln!("publish_write_complete failed: {e}"),
|
||||||
|
}
|
||||||
|
|
||||||
|
// -- DeleteMonitoredItems / DeleteSubscription
|
||||||
|
if let Err(e) = client.delete_monitored_items(sub_response.subscription_id, &monitored).await {
|
||||||
|
eprintln!("delete_monitored_items failed: {e}");
|
||||||
|
} else {
|
||||||
|
eprintln!("delete_monitored_items ok [canonical XML DeleteMonitoredItems]");
|
||||||
|
}
|
||||||
|
if let Err(e) = client.delete_subscription(sub_response.subscription_id).await {
|
||||||
|
eprintln!("delete_subscription failed: {e}");
|
||||||
|
} else {
|
||||||
|
eprintln!("delete_subscription ok [canonical XML DeleteSubscription]");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
eprintln!("unregistering [canonical XML UnregisterItems]");
|
||||||
|
if let Err(e) = client.unregister_items(&items).await {
|
||||||
|
eprintln!(" unregister_items failed: {e}");
|
||||||
|
}
|
||||||
|
|
||||||
|
eprintln!("disconnecting [canonical XML Disconnect]");
|
||||||
client.disconnect().await?;
|
client.disconnect().await?;
|
||||||
client.send_end().await?;
|
client.send_end().await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -115,6 +255,17 @@ struct LiveEnv {
|
|||||||
passphrase: String,
|
passphrase: String,
|
||||||
via_uri: String,
|
via_uri: String,
|
||||||
tag: String,
|
tag: String,
|
||||||
|
/// Whether to exercise the Write op (`MX_RUN_WRITE != "0"`).
|
||||||
|
/// On by default — overwriting `TestChildObject.TestInt = 99`
|
||||||
|
/// is reversible (the next run reads back the same value, since
|
||||||
|
/// the demo writes its own marker).
|
||||||
|
run_write: bool,
|
||||||
|
/// Whether to exercise the full Subscribe flow
|
||||||
|
/// (`MX_RUN_SUBSCRIBE != "0"`). On by default.
|
||||||
|
run_subscribe: bool,
|
||||||
|
/// Number of `Publish` polls to issue inside the subscribe flow
|
||||||
|
/// (default 3). Each poll has a 5-second timeout.
|
||||||
|
subscribe_count: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl LiveEnv {
|
impl LiveEnv {
|
||||||
@@ -129,11 +280,20 @@ impl LiveEnv {
|
|||||||
let via_uri =
|
let via_uri =
|
||||||
std::env::var("MX_ASB_VIA").unwrap_or_else(|_| format!("net.tcp://{host}/ASBService"));
|
std::env::var("MX_ASB_VIA").unwrap_or_else(|_| format!("net.tcp://{host}/ASBService"));
|
||||||
let tag = std::env::var("MX_TEST_TAG").unwrap_or_else(|_| "TestChildObject.TestInt".into());
|
let tag = std::env::var("MX_TEST_TAG").unwrap_or_else(|_| "TestChildObject.TestInt".into());
|
||||||
|
let run_write = std::env::var("MX_RUN_WRITE").map(|v| v != "0").unwrap_or(true);
|
||||||
|
let run_subscribe = std::env::var("MX_RUN_SUBSCRIBE").map(|v| v != "0").unwrap_or(true);
|
||||||
|
let subscribe_count = std::env::var("MX_SUBSCRIBE_COUNT")
|
||||||
|
.ok()
|
||||||
|
.and_then(|s| s.parse().ok())
|
||||||
|
.unwrap_or(3);
|
||||||
Ok(Some(Self {
|
Ok(Some(Self {
|
||||||
addr,
|
addr,
|
||||||
passphrase,
|
passphrase,
|
||||||
via_uri,
|
via_uri,
|
||||||
tag,
|
tag,
|
||||||
|
run_write,
|
||||||
|
run_subscribe,
|
||||||
|
subscribe_count,
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -363,24 +363,28 @@ async fn publish_loop<F, Fut>(
|
|||||||
loop {
|
loop {
|
||||||
match publish_fn().await {
|
match publish_fn().await {
|
||||||
Ok(response) => {
|
Ok(response) => {
|
||||||
// F33: if the server short-circuited with a non-zero
|
// F33: terminate the stream on the specific
|
||||||
// resultCodeField (e.g. InvalidConnectionId), terminate
|
// `InvalidConnectionId` short-circuit (result_code=1) —
|
||||||
// the stream rather than silently delivering empty
|
// that one means the session is desynced and every
|
||||||
// value batches forever. Caller can still inspect the
|
// future Publish will return the same empty payload.
|
||||||
// error via the final stream item.
|
// Other non-zero result_codes are informational —
|
||||||
if let Some(code) = response.result_code {
|
// observed-live (against MxDataProvider) value 32 fires
|
||||||
if code != 0 {
|
// on every Publish poll while values are still
|
||||||
let _ = tx
|
// delivered, so blanket "bail on any non-zero"
|
||||||
.send(Err(Error::Connection(
|
// (the original F33 fix) was too aggressive.
|
||||||
ConnectionError::TransportFailure {
|
if response.result_code
|
||||||
detail: format!(
|
== Some(mxaccess_asb::RESULT_CODE_INVALID_CONNECTION_ID)
|
||||||
"publish returned result_code 0x{code:08X} (server-side rejection)"
|
{
|
||||||
),
|
let _ = tx
|
||||||
},
|
.send(Err(Error::Connection(
|
||||||
)))
|
ConnectionError::TransportFailure {
|
||||||
.await;
|
detail: "publish returned InvalidConnectionId — \
|
||||||
return;
|
session desynced, terminating stream"
|
||||||
}
|
.to_string(),
|
||||||
|
},
|
||||||
|
)))
|
||||||
|
.await;
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
for value in response.values {
|
for value in response.values {
|
||||||
if tx.send(Ok(value)).await.is_err() {
|
if tx.send(Ok(value)).await.is_err() {
|
||||||
|
|||||||
Reference in New Issue
Block a user