[F33 progress] mxaccess-asb: extend InvalidConnectionId tolerance to subscribe ops
rust / build / test / clippy / fmt (push) Has been cancelled
rust / build / test / clippy / fmt (push) Has been cancelled
Continues the F31 tolerance pattern propagation to the two subscribe-path decoders called out in F33. CreateSubscriptionResponse: - Adds result_code: Option<u32> + success: Option<bool> fields. - decode_create_subscription_response no longer errors with MissingField "SubscriptionId" — when the server short-circuits on InvalidConnectionId it returns the Result wrapper without a SubscriptionId. The decoder returns subscription_id=0 in that case with result_code/success surfaced; callers inspect result_code before treating subscription_id as valid. - AsbClient::create_subscription wraps the call in the same retry loop register_items uses (10 attempts × 200·N ms backoff on RESULT_CODE_INVALID_CONNECTION_ID). AddMonitoredItemsResponse: - Adds result_code + success fields. - decode_add_monitored_items_response tolerates an empty / missing <ASBIData /> Status payload (returns empty Vec) and surfaces result_code/success. - AsbClient::add_monitored_items gets the same retry loop. Both decoders now match the F31 + Read shape: tolerant of empty payloads when the server short-circuits, surface the wrapper's result_code so callers (and the in-client retry loop) can detect the InvalidConnectionId race. Updated obsolete unit test (create_subscription_response_missing_id_fails → create_subscription_response_missing_id_returns_zero_sentinel) plus two new tests pinning the InvalidConnectionId synthesis path for both decoders. Workspace: mxaccess-asb 80 → 82 tests; default-feature clippy clean; existing live-read still passes (verified separately). This is the second of three F33 closures. Remaining: applying the same tolerance to decode_publish_response (and optionally decode_delete_*_response, decode_unregister_items_response, decode_write_response, decode_publish_write_complete_response for symmetry). With CreateSubscription + AddMonitoredItems tolerant + retrying, the subscribe-flow example should now reach the publish-loop stage instead of bailing at the second op. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -528,10 +528,41 @@ impl<T: AsyncRead + AsyncWrite + Unpin + Send> AsbClient<T> {
|
||||
/// subscription and returns its ID. Caller threads the ID through
|
||||
/// subsequent `add_monitored_items` / `publish` /
|
||||
/// `delete_subscription` calls.
|
||||
///
|
||||
/// Retries on `InvalidConnectionId` per the F31 pattern (10
|
||||
/// attempts × 200·N ms backoff).
|
||||
pub async fn create_subscription(
|
||||
&mut self,
|
||||
max_queue_size: i64,
|
||||
sample_interval: u64,
|
||||
) -> Result<CreateSubscriptionResponse, ClientError> {
|
||||
const MAX_ATTEMPTS: u32 = 10;
|
||||
const BACKOFF_BASE_MS: u64 = 200;
|
||||
|
||||
let mut response = self
|
||||
.create_subscription_once(max_queue_size, sample_interval)
|
||||
.await?;
|
||||
let mut attempt = 1u32;
|
||||
while attempt < MAX_ATTEMPTS
|
||||
&& response.result_code
|
||||
== Some(crate::operations::RESULT_CODE_INVALID_CONNECTION_ID)
|
||||
{
|
||||
tokio::time::sleep(std::time::Duration::from_millis(
|
||||
BACKOFF_BASE_MS * u64::from(attempt),
|
||||
))
|
||||
.await;
|
||||
response = self
|
||||
.create_subscription_once(max_queue_size, sample_interval)
|
||||
.await?;
|
||||
attempt += 1;
|
||||
}
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
async fn create_subscription_once(
|
||||
&mut self,
|
||||
max_queue_size: i64,
|
||||
sample_interval: u64,
|
||||
) -> Result<CreateSubscriptionResponse, ClientError> {
|
||||
let body = build_create_subscription_request_body(max_queue_size, sample_interval);
|
||||
let response = self
|
||||
@@ -547,11 +578,43 @@ impl<T: AsyncRead + AsyncWrite + Unpin + Send> AsbClient<T> {
|
||||
/// subscription. Uses [`MinimalMonitoredItem`] (Item +
|
||||
/// SampleInterval + Buffered); optional fields are deferred to a
|
||||
/// later F25 iteration.
|
||||
///
|
||||
/// Retries on `InvalidConnectionId` per the F31 pattern (10
|
||||
/// attempts × 200·N ms backoff).
|
||||
pub async fn add_monitored_items(
|
||||
&mut self,
|
||||
subscription_id: i64,
|
||||
items: &[MinimalMonitoredItem],
|
||||
require_id: bool,
|
||||
) -> Result<AddMonitoredItemsResponse, ClientError> {
|
||||
const MAX_ATTEMPTS: u32 = 10;
|
||||
const BACKOFF_BASE_MS: u64 = 200;
|
||||
|
||||
let mut response = self
|
||||
.add_monitored_items_once(subscription_id, items, require_id)
|
||||
.await?;
|
||||
let mut attempt = 1u32;
|
||||
while attempt < MAX_ATTEMPTS
|
||||
&& response.result_code
|
||||
== Some(crate::operations::RESULT_CODE_INVALID_CONNECTION_ID)
|
||||
{
|
||||
tokio::time::sleep(std::time::Duration::from_millis(
|
||||
BACKOFF_BASE_MS * u64::from(attempt),
|
||||
))
|
||||
.await;
|
||||
response = self
|
||||
.add_monitored_items_once(subscription_id, items, require_id)
|
||||
.await?;
|
||||
attempt += 1;
|
||||
}
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
async fn add_monitored_items_once(
|
||||
&mut self,
|
||||
subscription_id: i64,
|
||||
items: &[MinimalMonitoredItem],
|
||||
require_id: bool,
|
||||
) -> Result<AddMonitoredItemsResponse, ClientError> {
|
||||
let body = build_add_monitored_items_request_body(subscription_id, items, require_id);
|
||||
let response = self
|
||||
|
||||
@@ -849,51 +849,87 @@ impl MinimalMonitoredItem {
|
||||
}
|
||||
}
|
||||
|
||||
/// Decoded `CreateSubscriptionResponse`. Single field per
|
||||
/// `AsbContracts.cs:225-230`.
|
||||
/// Decoded `CreateSubscriptionResponse`. Single primitive field per
|
||||
/// `AsbContracts.cs:225-230` plus the F31-style result_code/success
|
||||
/// surface.
|
||||
///
|
||||
/// `subscription_id` is `0` when the server short-circuits on
|
||||
/// `InvalidConnectionId` and never assigns one — check `result_code`
|
||||
/// for `RESULT_CODE_INVALID_CONNECTION_ID` (1) before treating
|
||||
/// `subscription_id` as valid.
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct CreateSubscriptionResponse {
|
||||
pub subscription_id: i64,
|
||||
/// `Result.resultCodeField` from the response wrapper. `Some(1)` =
|
||||
/// `InvalidConnectionId` — caller should retry. `None` if the
|
||||
/// field wasn't present (the success path doesn't necessarily
|
||||
/// emit it).
|
||||
pub result_code: Option<u32>,
|
||||
/// `Result.successField` — `false` means the operation failed
|
||||
/// server-side and `subscription_id` is unset.
|
||||
pub success: Option<bool>,
|
||||
}
|
||||
|
||||
/// Decode a `CreateSubscriptionResponse` SOAP body. Looks for an inline
|
||||
/// `<SubscriptionId>` text element under the wrapper.
|
||||
/// Decode a `CreateSubscriptionResponse` SOAP body.
|
||||
///
|
||||
/// Tolerates a missing `<SubscriptionId>` element — that's how the
|
||||
/// server signals an operation-level failure (`successField=false`
|
||||
/// with non-zero `resultCodeField`). Mirrors the F31 tolerance
|
||||
/// pattern. Caller inspects `result_code` / `success`.
|
||||
pub fn decode_create_subscription_response(
|
||||
body_tokens: &[NbfxToken],
|
||||
dynamic: &mxaccess_asb_nettcp::nbfx::DynamicDictionary,
|
||||
) -> Result<CreateSubscriptionResponse, OperationError> {
|
||||
let id = find_inline_int64(body_tokens, "SubscriptionId", dynamic).ok_or(
|
||||
OperationError::MissingField {
|
||||
field: "SubscriptionId",
|
||||
},
|
||||
)?;
|
||||
let subscription_id =
|
||||
find_inline_int64(body_tokens, "SubscriptionId", dynamic).unwrap_or(0);
|
||||
let result_code = find_text_in_named_element(body_tokens, "resultCodeField")
|
||||
.and_then(|s| s.parse().ok());
|
||||
let success = find_text_in_named_element(body_tokens, "successField")
|
||||
.map(|s| s.eq_ignore_ascii_case("true"));
|
||||
Ok(CreateSubscriptionResponse {
|
||||
subscription_id: id,
|
||||
subscription_id,
|
||||
result_code,
|
||||
success,
|
||||
})
|
||||
}
|
||||
|
||||
/// Decoded `AddMonitoredItemsResponse`. `ItemCapabilities` is regular
|
||||
/// WCF XML (not the binary fast-path) — currently surfaces as a presence
|
||||
/// flag, mirroring `RegisterItemsResponse`.
|
||||
///
|
||||
/// Tolerates empty / missing `<ASBIData />` Status payloads under the
|
||||
/// F31 pattern; surfaces `result_code` / `success` so callers can
|
||||
/// retry on `InvalidConnectionId`.
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct AddMonitoredItemsResponse {
|
||||
pub status: Vec<ItemStatus>,
|
||||
pub item_capabilities_present: bool,
|
||||
/// `Result.resultCodeField` from the response wrapper. `Some(1)` =
|
||||
/// `InvalidConnectionId`.
|
||||
pub result_code: Option<u32>,
|
||||
/// `Result.successField` — `false` means the per-item Status array
|
||||
/// is empty.
|
||||
pub success: Option<bool>,
|
||||
}
|
||||
|
||||
pub fn decode_add_monitored_items_response(
|
||||
body_tokens: &[NbfxToken],
|
||||
) -> Result<AddMonitoredItemsResponse, OperationError> {
|
||||
let payloads = collect_asbidata_payloads(body_tokens);
|
||||
let status_payload = payloads
|
||||
.into_iter()
|
||||
.next()
|
||||
.ok_or(OperationError::MissingField { field: "Status" })?;
|
||||
let status = decode_item_status_array(&status_payload)?;
|
||||
let status = match payloads.into_iter().next() {
|
||||
Some(payload) if !payload.is_empty() => decode_item_status_array(&payload)?,
|
||||
_ => Vec::new(),
|
||||
};
|
||||
let item_capabilities_present = find_element_named(body_tokens, "ItemCapabilities").is_some();
|
||||
let result_code = find_text_in_named_element(body_tokens, "resultCodeField")
|
||||
.and_then(|s| s.parse().ok());
|
||||
let success = find_text_in_named_element(body_tokens, "successField")
|
||||
.map(|s| s.eq_ignore_ascii_case("true"));
|
||||
Ok(AddMonitoredItemsResponse {
|
||||
status,
|
||||
item_capabilities_present,
|
||||
result_code,
|
||||
success,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -2310,7 +2346,12 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn create_subscription_response_missing_id_fails() {
|
||||
fn create_subscription_response_missing_id_returns_zero_sentinel() {
|
||||
// Post-F33 the decoder is tolerant of missing SubscriptionId
|
||||
// (that's how the server signals an InvalidConnectionId
|
||||
// failure). It returns subscription_id=0 with result_code/
|
||||
// success unset; callers inspect those for a real success
|
||||
// signal.
|
||||
use mxaccess_asb_nettcp::nbfx::DynamicDictionary;
|
||||
let body = vec![
|
||||
NbfxToken::Element {
|
||||
@@ -2320,13 +2361,30 @@ mod tests {
|
||||
NbfxToken::EndElement,
|
||||
];
|
||||
let dict = DynamicDictionary::new();
|
||||
let err = decode_create_subscription_response(&body, &dict).unwrap_err();
|
||||
assert!(matches!(
|
||||
err,
|
||||
OperationError::MissingField {
|
||||
field: "SubscriptionId"
|
||||
}
|
||||
));
|
||||
let response = decode_create_subscription_response(&body, &dict).unwrap();
|
||||
assert_eq!(response.subscription_id, 0);
|
||||
assert_eq!(response.result_code, None);
|
||||
assert_eq!(response.success, None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn create_subscription_response_surfaces_invalid_connection_id() {
|
||||
use mxaccess_asb_nettcp::nbfx::DynamicDictionary;
|
||||
let body = synthesise_invalid_connection_id_body("CreateSubscriptionResponse");
|
||||
let dict = DynamicDictionary::new();
|
||||
let response = decode_create_subscription_response(&body, &dict).unwrap();
|
||||
assert_eq!(response.subscription_id, 0);
|
||||
assert_eq!(response.result_code, Some(1));
|
||||
assert_eq!(response.success, Some(false));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn add_monitored_items_response_surfaces_invalid_connection_id() {
|
||||
let body = synthesise_invalid_connection_id_body("AddMonitoredItemsResponse");
|
||||
let response = decode_add_monitored_items_response(&body).unwrap();
|
||||
assert!(response.status.is_empty());
|
||||
assert_eq!(response.result_code, Some(1));
|
||||
assert_eq!(response.success, Some(false));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
Reference in New Issue
Block a user