diff --git a/rust/crates/mxaccess-asb/src/client.rs b/rust/crates/mxaccess-asb/src/client.rs index e273935..a0eeae6 100644 --- a/rust/crates/mxaccess-asb/src/client.rs +++ b/rust/crates/mxaccess-asb/src/client.rs @@ -528,10 +528,41 @@ impl AsbClient { /// subscription and returns its ID. Caller threads the ID through /// subsequent `add_monitored_items` / `publish` / /// `delete_subscription` calls. + /// + /// Retries on `InvalidConnectionId` per the F31 pattern (10 + /// attempts × 200·N ms backoff). pub async fn create_subscription( &mut self, max_queue_size: i64, sample_interval: u64, + ) -> Result { + const MAX_ATTEMPTS: u32 = 10; + const BACKOFF_BASE_MS: u64 = 200; + + let mut response = self + .create_subscription_once(max_queue_size, sample_interval) + .await?; + let mut attempt = 1u32; + while attempt < MAX_ATTEMPTS + && response.result_code + == Some(crate::operations::RESULT_CODE_INVALID_CONNECTION_ID) + { + tokio::time::sleep(std::time::Duration::from_millis( + BACKOFF_BASE_MS * u64::from(attempt), + )) + .await; + response = self + .create_subscription_once(max_queue_size, sample_interval) + .await?; + attempt += 1; + } + Ok(response) + } + + async fn create_subscription_once( + &mut self, + max_queue_size: i64, + sample_interval: u64, ) -> Result { let body = build_create_subscription_request_body(max_queue_size, sample_interval); let response = self @@ -547,11 +578,43 @@ impl AsbClient { /// subscription. Uses [`MinimalMonitoredItem`] (Item + /// SampleInterval + Buffered); optional fields are deferred to a /// later F25 iteration. + /// + /// Retries on `InvalidConnectionId` per the F31 pattern (10 + /// attempts × 200·N ms backoff). pub async fn add_monitored_items( &mut self, subscription_id: i64, items: &[MinimalMonitoredItem], require_id: bool, + ) -> Result { + const MAX_ATTEMPTS: u32 = 10; + const BACKOFF_BASE_MS: u64 = 200; + + let mut response = self + .add_monitored_items_once(subscription_id, items, require_id) + .await?; + let mut attempt = 1u32; + while attempt < MAX_ATTEMPTS + && response.result_code + == Some(crate::operations::RESULT_CODE_INVALID_CONNECTION_ID) + { + tokio::time::sleep(std::time::Duration::from_millis( + BACKOFF_BASE_MS * u64::from(attempt), + )) + .await; + response = self + .add_monitored_items_once(subscription_id, items, require_id) + .await?; + attempt += 1; + } + Ok(response) + } + + async fn add_monitored_items_once( + &mut self, + subscription_id: i64, + items: &[MinimalMonitoredItem], + require_id: bool, ) -> Result { let body = build_add_monitored_items_request_body(subscription_id, items, require_id); let response = self diff --git a/rust/crates/mxaccess-asb/src/operations.rs b/rust/crates/mxaccess-asb/src/operations.rs index cd89c4c..6e8df7d 100644 --- a/rust/crates/mxaccess-asb/src/operations.rs +++ b/rust/crates/mxaccess-asb/src/operations.rs @@ -849,51 +849,87 @@ impl MinimalMonitoredItem { } } -/// Decoded `CreateSubscriptionResponse`. Single field per -/// `AsbContracts.cs:225-230`. +/// Decoded `CreateSubscriptionResponse`. Single primitive field per +/// `AsbContracts.cs:225-230` plus the F31-style result_code/success +/// surface. +/// +/// `subscription_id` is `0` when the server short-circuits on +/// `InvalidConnectionId` and never assigns one — check `result_code` +/// for `RESULT_CODE_INVALID_CONNECTION_ID` (1) before treating +/// `subscription_id` as valid. #[derive(Debug, Clone, PartialEq)] pub struct CreateSubscriptionResponse { pub subscription_id: i64, + /// `Result.resultCodeField` from the response wrapper. `Some(1)` = + /// `InvalidConnectionId` — caller should retry. `None` if the + /// field wasn't present (the success path doesn't necessarily + /// emit it). + pub result_code: Option, + /// `Result.successField` — `false` means the operation failed + /// server-side and `subscription_id` is unset. + pub success: Option, } -/// Decode a `CreateSubscriptionResponse` SOAP body. Looks for an inline -/// `` text element under the wrapper. +/// Decode a `CreateSubscriptionResponse` SOAP body. +/// +/// Tolerates a missing `` element — that's how the +/// server signals an operation-level failure (`successField=false` +/// with non-zero `resultCodeField`). Mirrors the F31 tolerance +/// pattern. Caller inspects `result_code` / `success`. pub fn decode_create_subscription_response( body_tokens: &[NbfxToken], dynamic: &mxaccess_asb_nettcp::nbfx::DynamicDictionary, ) -> Result { - let id = find_inline_int64(body_tokens, "SubscriptionId", dynamic).ok_or( - OperationError::MissingField { - field: "SubscriptionId", - }, - )?; + let subscription_id = + find_inline_int64(body_tokens, "SubscriptionId", dynamic).unwrap_or(0); + let result_code = find_text_in_named_element(body_tokens, "resultCodeField") + .and_then(|s| s.parse().ok()); + let success = find_text_in_named_element(body_tokens, "successField") + .map(|s| s.eq_ignore_ascii_case("true")); Ok(CreateSubscriptionResponse { - subscription_id: id, + subscription_id, + result_code, + success, }) } /// Decoded `AddMonitoredItemsResponse`. `ItemCapabilities` is regular /// WCF XML (not the binary fast-path) — currently surfaces as a presence /// flag, mirroring `RegisterItemsResponse`. +/// +/// Tolerates empty / missing `` Status payloads under the +/// F31 pattern; surfaces `result_code` / `success` so callers can +/// retry on `InvalidConnectionId`. #[derive(Debug, Clone, PartialEq)] pub struct AddMonitoredItemsResponse { pub status: Vec, pub item_capabilities_present: bool, + /// `Result.resultCodeField` from the response wrapper. `Some(1)` = + /// `InvalidConnectionId`. + pub result_code: Option, + /// `Result.successField` — `false` means the per-item Status array + /// is empty. + pub success: Option, } pub fn decode_add_monitored_items_response( body_tokens: &[NbfxToken], ) -> Result { let payloads = collect_asbidata_payloads(body_tokens); - let status_payload = payloads - .into_iter() - .next() - .ok_or(OperationError::MissingField { field: "Status" })?; - let status = decode_item_status_array(&status_payload)?; + let status = match payloads.into_iter().next() { + Some(payload) if !payload.is_empty() => decode_item_status_array(&payload)?, + _ => Vec::new(), + }; let item_capabilities_present = find_element_named(body_tokens, "ItemCapabilities").is_some(); + let result_code = find_text_in_named_element(body_tokens, "resultCodeField") + .and_then(|s| s.parse().ok()); + let success = find_text_in_named_element(body_tokens, "successField") + .map(|s| s.eq_ignore_ascii_case("true")); Ok(AddMonitoredItemsResponse { status, item_capabilities_present, + result_code, + success, }) } @@ -2310,7 +2346,12 @@ mod tests { } #[test] - fn create_subscription_response_missing_id_fails() { + fn create_subscription_response_missing_id_returns_zero_sentinel() { + // Post-F33 the decoder is tolerant of missing SubscriptionId + // (that's how the server signals an InvalidConnectionId + // failure). It returns subscription_id=0 with result_code/ + // success unset; callers inspect those for a real success + // signal. use mxaccess_asb_nettcp::nbfx::DynamicDictionary; let body = vec![ NbfxToken::Element { @@ -2320,13 +2361,30 @@ mod tests { NbfxToken::EndElement, ]; let dict = DynamicDictionary::new(); - let err = decode_create_subscription_response(&body, &dict).unwrap_err(); - assert!(matches!( - err, - OperationError::MissingField { - field: "SubscriptionId" - } - )); + let response = decode_create_subscription_response(&body, &dict).unwrap(); + assert_eq!(response.subscription_id, 0); + assert_eq!(response.result_code, None); + assert_eq!(response.success, None); + } + + #[test] + fn create_subscription_response_surfaces_invalid_connection_id() { + use mxaccess_asb_nettcp::nbfx::DynamicDictionary; + let body = synthesise_invalid_connection_id_body("CreateSubscriptionResponse"); + let dict = DynamicDictionary::new(); + let response = decode_create_subscription_response(&body, &dict).unwrap(); + assert_eq!(response.subscription_id, 0); + assert_eq!(response.result_code, Some(1)); + assert_eq!(response.success, Some(false)); + } + + #[test] + fn add_monitored_items_response_surfaces_invalid_connection_id() { + let body = synthesise_invalid_connection_id_body("AddMonitoredItemsResponse"); + let response = decode_add_monitored_items_response(&body).unwrap(); + assert!(response.status.is_empty()); + assert_eq!(response.result_code, Some(1)); + assert_eq!(response.success, Some(false)); } #[test]