mxaccess-asb: extend F31 InvalidConnectionId tolerance to Read
rust / build / test / clippy / fmt (push) Has been cancelled
rust / build / test / clippy / fmt (push) Has been cancelled
Live MX_ASB_TRACE_REPLY capture against MxDataProvider during the
F33 investigation showed Read hitting the same InvalidConnectionId
race that F31 fixed for register_items: server replies with a
Result wrapper carrying resultCodeField=1 + successField=false plus
two empty <ASBIData /> payloads. The decoder bailed with
MissingField "Status" instead of surfacing result_code.
Two changes:
1. ReadResponse gains result_code: Option<u32> and success:
Option<bool> fields, matching the RegisterItemsResponse shape.
decode_read_response tolerates empty / missing <ASBIData />
payloads (returns empty status + values arrays) and surfaces
the wrapper's result_code / success via
find_text_in_named_element.
2. AsbClient::read gets a retry loop mirroring register_items:
MAX_ATTEMPTS=10, BACKOFF_BASE_MS=200, total worst-case ~11s.
Internal read_once helper does a single attempt; the public
read() walks the retry budget on
RESULT_CODE_INVALID_CONNECTION_ID.
Live verification: cargo run -p mxaccess --example asb-subscribe
returned `TestChildObject.TestInt = AsbVariant { type_id: 4,
length: 4, payload: [99, 0, 0, 0] }` after presumably one or more
transient retries (the previous run without the retry hit
"MissingField Status" against the same server state).
1 new test
(read_response_tolerates_empty_asbidata_when_invalid_connection_id)
plus a synthesise_invalid_connection_id_body helper that builds
the canonical wire shape captured live (Result wrapper +
resultCodeField=1 + successField=false + two empty <ASBIData />
elements). Workspace 718 → 722 tests... wait, mxaccess-asb went
79 → 80 (+1). Tests still all green; clippy clean on default and
windows-com features.
This is foundation for closing F33: the same tolerance pattern
needs to apply to the subscribe decoders
(decode_create_subscription_response,
decode_add_monitored_items_response, decode_publish_response)
once a similar live-trace capture confirms their wire shapes.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -433,7 +433,35 @@ impl<T: AsyncRead + AsyncWrite + Unpin + Send> AsbClient<T> {
|
|||||||
|
|
||||||
/// `Read` operation — sends a signed `ReadIn` SOAP envelope and
|
/// `Read` operation — sends a signed `ReadIn` SOAP envelope and
|
||||||
/// decodes the `ReadResponse` (Status array + Values array).
|
/// decodes the `ReadResponse` (Status array + Values array).
|
||||||
|
///
|
||||||
|
/// Retries up to 10 times with `200 * attempt` ms backoff on
|
||||||
|
/// `Result.resultCodeField == InvalidConnectionId (1)` — same
|
||||||
|
/// transient race that affects `register_items` (F31). Without
|
||||||
|
/// the retry, a Read issued shortly after `register_items`
|
||||||
|
/// exhausts its own retry budget can land in the same
|
||||||
|
/// pre-authenticated-state window and fail. Total worst-case
|
||||||
|
/// wait ~11s.
|
||||||
pub async fn read(&mut self, items: &[ItemIdentity]) -> Result<ReadResponse, ClientError> {
|
pub async fn read(&mut self, items: &[ItemIdentity]) -> Result<ReadResponse, ClientError> {
|
||||||
|
const MAX_ATTEMPTS: u32 = 10;
|
||||||
|
const BACKOFF_BASE_MS: u64 = 200;
|
||||||
|
|
||||||
|
let mut response = self.read_once(items).await?;
|
||||||
|
let mut attempt = 1u32;
|
||||||
|
while attempt < MAX_ATTEMPTS
|
||||||
|
&& response.result_code
|
||||||
|
== Some(crate::operations::RESULT_CODE_INVALID_CONNECTION_ID)
|
||||||
|
{
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(
|
||||||
|
BACKOFF_BASE_MS * u64::from(attempt),
|
||||||
|
))
|
||||||
|
.await;
|
||||||
|
response = self.read_once(items).await?;
|
||||||
|
attempt += 1;
|
||||||
|
}
|
||||||
|
Ok(response)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn read_once(&mut self, items: &[ItemIdentity]) -> Result<ReadResponse, ClientError> {
|
||||||
let body = build_read_request_body(items);
|
let body = build_read_request_body(items);
|
||||||
let response = self
|
let response = self
|
||||||
.send_signed_envelope(actions::READ, body, None, false)
|
.send_signed_envelope(actions::READ, body, None, false)
|
||||||
|
|||||||
@@ -1012,24 +1012,47 @@ const MESSAGES_NS: &str = "http://asb.contracts.messages/20111111";
|
|||||||
pub struct ReadResponse {
|
pub struct ReadResponse {
|
||||||
pub status: Vec<ItemStatus>,
|
pub status: Vec<ItemStatus>,
|
||||||
pub values: Vec<RuntimeValue>,
|
pub values: Vec<RuntimeValue>,
|
||||||
|
/// `Result.resultCodeField` from the response wrapper. `Some(1)` =
|
||||||
|
/// `InvalidConnectionId` (transient race — see [`RESULT_CODE_INVALID_CONNECTION_ID`]
|
||||||
|
/// and `AsbClient::read`'s retry loop). `None` if the field wasn't
|
||||||
|
/// present (e.g. the server wrapped Read differently).
|
||||||
|
pub result_code: Option<u32>,
|
||||||
|
/// `Result.successField` — `false` means the operation failed
|
||||||
|
/// server-side and the per-item Status / Values arrays are empty.
|
||||||
|
pub success: Option<bool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Decode a `ReadResponse` SOAP body from the NBFX tokens returned by
|
/// Decode a `ReadResponse` SOAP body from the NBFX tokens returned by
|
||||||
/// [`crate::decode_envelope`]. Both `Status` and `Values` arrive as
|
/// [`crate::decode_envelope`]. Both `Status` and `Values` arrive as
|
||||||
/// `<ASBIData>` payloads; we decode the binary form of each.
|
/// `<ASBIData>` payloads; we decode the binary form of each.
|
||||||
|
///
|
||||||
|
/// Tolerates empty / missing `<ASBIData>` payloads — that's how the
|
||||||
|
/// server signals an operation-level failure (`successField=false`
|
||||||
|
/// with a non-zero `resultCodeField`). Mirrors the tolerance pattern
|
||||||
|
/// applied to [`decode_register_items_response`] under F31. The
|
||||||
|
/// caller inspects `result_code` / `success` for transient failures
|
||||||
|
/// and retries.
|
||||||
pub fn decode_read_response(body_tokens: &[NbfxToken]) -> Result<ReadResponse, OperationError> {
|
pub fn decode_read_response(body_tokens: &[NbfxToken]) -> Result<ReadResponse, OperationError> {
|
||||||
let payloads = collect_asbidata_payloads(body_tokens);
|
let payloads = collect_asbidata_payloads(body_tokens);
|
||||||
let status_payload = payloads
|
let status = match payloads.first() {
|
||||||
.first()
|
Some(payload) if !payload.is_empty() => decode_item_status_array(payload)?,
|
||||||
.ok_or(OperationError::MissingField { field: "Status" })?;
|
_ => Vec::new(),
|
||||||
let status = decode_item_status_array(status_payload)?;
|
|
||||||
|
|
||||||
let values = match payloads.get(1) {
|
|
||||||
Some(payload) => decode_runtime_value_array(payload)?,
|
|
||||||
None => Vec::new(),
|
|
||||||
};
|
};
|
||||||
|
let values = match payloads.get(1) {
|
||||||
|
Some(payload) if !payload.is_empty() => decode_runtime_value_array(payload)?,
|
||||||
|
_ => Vec::new(),
|
||||||
|
};
|
||||||
|
let result_code = find_text_in_named_element(body_tokens, "resultCodeField")
|
||||||
|
.and_then(|s| s.parse().ok());
|
||||||
|
let success = find_text_in_named_element(body_tokens, "successField")
|
||||||
|
.map(|s| s.eq_ignore_ascii_case("true"));
|
||||||
|
|
||||||
Ok(ReadResponse { status, values })
|
Ok(ReadResponse {
|
||||||
|
status,
|
||||||
|
values,
|
||||||
|
result_code,
|
||||||
|
success,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Decode a `RuntimeValue[]` array from the WCF custom-serializer
|
/// Decode a `RuntimeValue[]` array from the WCF custom-serializer
|
||||||
@@ -1970,6 +1993,55 @@ mod tests {
|
|||||||
assert!(decoded.values.is_empty());
|
assert!(decoded.values.is_empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn read_response_tolerates_empty_asbidata_when_invalid_connection_id() {
|
||||||
|
// Mirrors the live wire capture from F33 — the server returns
|
||||||
|
// empty `<ASBIData />` Status + empty `<ASBIData />` Values
|
||||||
|
// when it short-circuits on InvalidConnectionId. Decode must
|
||||||
|
// surface result_code/success rather than erroring with
|
||||||
|
// MissingField "Status".
|
||||||
|
let body = synthesise_invalid_connection_id_body("ReadResponse");
|
||||||
|
let decoded = decode_read_response(&body).unwrap();
|
||||||
|
assert!(decoded.status.is_empty());
|
||||||
|
assert!(decoded.values.is_empty());
|
||||||
|
assert_eq!(decoded.result_code, Some(1));
|
||||||
|
assert_eq!(decoded.success, Some(false));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Build a body shaped like the live `InvalidConnectionId` response
|
||||||
|
/// captured via `MX_ASB_TRACE_REPLY` against MxDataProvider:
|
||||||
|
/// Result wrapper with `resultCodeField=1`, `successField=false`,
|
||||||
|
/// then two empty `<ASBIData />` payloads (Status + the second
|
||||||
|
/// payload, e.g. Values for Read or absent for plain Register).
|
||||||
|
fn synthesise_invalid_connection_id_body(wrapper: &str) -> Vec<NbfxToken> {
|
||||||
|
use mxaccess_asb_nettcp::nbfx::{NbfxName, NbfxText, NbfxToken};
|
||||||
|
fn elem(name: &str) -> NbfxToken {
|
||||||
|
NbfxToken::Element {
|
||||||
|
prefix: None,
|
||||||
|
name: NbfxName::Inline(name.to_string()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let mut tokens = vec![
|
||||||
|
elem(wrapper),
|
||||||
|
// Result wrapper
|
||||||
|
elem("Result"),
|
||||||
|
elem("resultCodeField"),
|
||||||
|
NbfxToken::Text(NbfxText::One),
|
||||||
|
NbfxToken::EndElement,
|
||||||
|
elem("successField"),
|
||||||
|
NbfxToken::Text(NbfxText::Bool(false)),
|
||||||
|
NbfxToken::EndElement,
|
||||||
|
NbfxToken::EndElement, // </Result>
|
||||||
|
];
|
||||||
|
// Two empty <ASBIData /> payloads.
|
||||||
|
for _ in 0..2 {
|
||||||
|
tokens.push(elem("ASBIData"));
|
||||||
|
tokens.push(NbfxToken::EndElement);
|
||||||
|
}
|
||||||
|
tokens.push(NbfxToken::EndElement); // </{wrapper}>
|
||||||
|
tokens
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn publish_write_complete_body_is_empty_wrapper() {
|
fn publish_write_complete_body_is_empty_wrapper() {
|
||||||
let body = build_publish_write_complete_request_body();
|
let body = build_publish_write_complete_request_body();
|
||||||
|
|||||||
Reference in New Issue
Block a user