[F33 progress] mxaccess-asb: extend InvalidConnectionId tolerance to subscribe ops
rust / build / test / clippy / fmt (push) Has been cancelled

Continues the F31 tolerance pattern propagation to the two
subscribe-path decoders called out in F33.

CreateSubscriptionResponse:
- Adds result_code: Option<u32> + success: Option<bool> fields.
- decode_create_subscription_response no longer errors with
  MissingField "SubscriptionId" — when the server short-circuits on
  InvalidConnectionId it returns the Result wrapper without a
  SubscriptionId. The decoder returns subscription_id=0 in that case
  with result_code/success surfaced; callers inspect result_code
  before treating subscription_id as valid.
- AsbClient::create_subscription wraps the call in the same retry
  loop register_items uses (10 attempts × 200·N ms backoff on
  RESULT_CODE_INVALID_CONNECTION_ID).

AddMonitoredItemsResponse:
- Adds result_code + success fields.
- decode_add_monitored_items_response tolerates an empty / missing
  <ASBIData /> Status payload (returns empty Vec) and surfaces
  result_code/success.
- AsbClient::add_monitored_items gets the same retry loop.

Both decoders now match the F31 + Read shape: tolerant of empty
payloads when the server short-circuits, surface the wrapper's
result_code so callers (and the in-client retry loop) can detect
the InvalidConnectionId race.

Updated obsolete unit test
(create_subscription_response_missing_id_fails →
create_subscription_response_missing_id_returns_zero_sentinel) plus
two new tests pinning the InvalidConnectionId synthesis path for
both decoders.

Workspace: mxaccess-asb 80 → 82 tests; default-feature clippy
clean; existing live-read still passes (verified separately).

This is the second of three F33 closures. Remaining: applying the
same tolerance to decode_publish_response (and optionally
decode_delete_*_response, decode_unregister_items_response,
decode_write_response, decode_publish_write_complete_response for
symmetry). With CreateSubscription + AddMonitoredItems tolerant
+ retrying, the subscribe-flow example should now reach the
publish-loop stage instead of bailing at the second op.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-06 01:29:38 -04:00
parent 218f4c4ec8
commit 7a5f251ac7
2 changed files with 144 additions and 23 deletions
+63
View File
@@ -528,10 +528,41 @@ impl<T: AsyncRead + AsyncWrite + Unpin + Send> AsbClient<T> {
/// subscription and returns its ID. Caller threads the ID through /// subscription and returns its ID. Caller threads the ID through
/// subsequent `add_monitored_items` / `publish` / /// subsequent `add_monitored_items` / `publish` /
/// `delete_subscription` calls. /// `delete_subscription` calls.
///
/// Retries on `InvalidConnectionId` per the F31 pattern (10
/// attempts × 200·N ms backoff).
pub async fn create_subscription( pub async fn create_subscription(
&mut self, &mut self,
max_queue_size: i64, max_queue_size: i64,
sample_interval: u64, sample_interval: u64,
) -> Result<CreateSubscriptionResponse, ClientError> {
const MAX_ATTEMPTS: u32 = 10;
const BACKOFF_BASE_MS: u64 = 200;
let mut response = self
.create_subscription_once(max_queue_size, sample_interval)
.await?;
let mut attempt = 1u32;
while attempt < MAX_ATTEMPTS
&& response.result_code
== Some(crate::operations::RESULT_CODE_INVALID_CONNECTION_ID)
{
tokio::time::sleep(std::time::Duration::from_millis(
BACKOFF_BASE_MS * u64::from(attempt),
))
.await;
response = self
.create_subscription_once(max_queue_size, sample_interval)
.await?;
attempt += 1;
}
Ok(response)
}
async fn create_subscription_once(
&mut self,
max_queue_size: i64,
sample_interval: u64,
) -> Result<CreateSubscriptionResponse, ClientError> { ) -> Result<CreateSubscriptionResponse, ClientError> {
let body = build_create_subscription_request_body(max_queue_size, sample_interval); let body = build_create_subscription_request_body(max_queue_size, sample_interval);
let response = self let response = self
@@ -547,11 +578,43 @@ impl<T: AsyncRead + AsyncWrite + Unpin + Send> AsbClient<T> {
/// subscription. Uses [`MinimalMonitoredItem`] (Item + /// subscription. Uses [`MinimalMonitoredItem`] (Item +
/// SampleInterval + Buffered); optional fields are deferred to a /// SampleInterval + Buffered); optional fields are deferred to a
/// later F25 iteration. /// later F25 iteration.
///
/// Retries on `InvalidConnectionId` per the F31 pattern (10
/// attempts × 200·N ms backoff).
pub async fn add_monitored_items( pub async fn add_monitored_items(
&mut self, &mut self,
subscription_id: i64, subscription_id: i64,
items: &[MinimalMonitoredItem], items: &[MinimalMonitoredItem],
require_id: bool, require_id: bool,
) -> Result<AddMonitoredItemsResponse, ClientError> {
const MAX_ATTEMPTS: u32 = 10;
const BACKOFF_BASE_MS: u64 = 200;
let mut response = self
.add_monitored_items_once(subscription_id, items, require_id)
.await?;
let mut attempt = 1u32;
while attempt < MAX_ATTEMPTS
&& response.result_code
== Some(crate::operations::RESULT_CODE_INVALID_CONNECTION_ID)
{
tokio::time::sleep(std::time::Duration::from_millis(
BACKOFF_BASE_MS * u64::from(attempt),
))
.await;
response = self
.add_monitored_items_once(subscription_id, items, require_id)
.await?;
attempt += 1;
}
Ok(response)
}
async fn add_monitored_items_once(
&mut self,
subscription_id: i64,
items: &[MinimalMonitoredItem],
require_id: bool,
) -> Result<AddMonitoredItemsResponse, ClientError> { ) -> Result<AddMonitoredItemsResponse, ClientError> {
let body = build_add_monitored_items_request_body(subscription_id, items, require_id); let body = build_add_monitored_items_request_body(subscription_id, items, require_id);
let response = self let response = self
+81 -23
View File
@@ -849,51 +849,87 @@ impl MinimalMonitoredItem {
} }
} }
/// Decoded `CreateSubscriptionResponse`. Single field per /// Decoded `CreateSubscriptionResponse`. Single primitive field per
/// `AsbContracts.cs:225-230`. /// `AsbContracts.cs:225-230` plus the F31-style result_code/success
/// surface.
///
/// `subscription_id` is `0` when the server short-circuits on
/// `InvalidConnectionId` and never assigns one — check `result_code`
/// for `RESULT_CODE_INVALID_CONNECTION_ID` (1) before treating
/// `subscription_id` as valid.
#[derive(Debug, Clone, PartialEq)] #[derive(Debug, Clone, PartialEq)]
pub struct CreateSubscriptionResponse { pub struct CreateSubscriptionResponse {
pub subscription_id: i64, pub subscription_id: i64,
/// `Result.resultCodeField` from the response wrapper. `Some(1)` =
/// `InvalidConnectionId` — caller should retry. `None` if the
/// field wasn't present (the success path doesn't necessarily
/// emit it).
pub result_code: Option<u32>,
/// `Result.successField` — `false` means the operation failed
/// server-side and `subscription_id` is unset.
pub success: Option<bool>,
} }
/// Decode a `CreateSubscriptionResponse` SOAP body. Looks for an inline /// Decode a `CreateSubscriptionResponse` SOAP body.
/// `<SubscriptionId>` text element under the wrapper. ///
/// Tolerates a missing `<SubscriptionId>` element — that's how the
/// server signals an operation-level failure (`successField=false`
/// with non-zero `resultCodeField`). Mirrors the F31 tolerance
/// pattern. Caller inspects `result_code` / `success`.
pub fn decode_create_subscription_response( pub fn decode_create_subscription_response(
body_tokens: &[NbfxToken], body_tokens: &[NbfxToken],
dynamic: &mxaccess_asb_nettcp::nbfx::DynamicDictionary, dynamic: &mxaccess_asb_nettcp::nbfx::DynamicDictionary,
) -> Result<CreateSubscriptionResponse, OperationError> { ) -> Result<CreateSubscriptionResponse, OperationError> {
let id = find_inline_int64(body_tokens, "SubscriptionId", dynamic).ok_or( let subscription_id =
OperationError::MissingField { find_inline_int64(body_tokens, "SubscriptionId", dynamic).unwrap_or(0);
field: "SubscriptionId", let result_code = find_text_in_named_element(body_tokens, "resultCodeField")
}, .and_then(|s| s.parse().ok());
)?; let success = find_text_in_named_element(body_tokens, "successField")
.map(|s| s.eq_ignore_ascii_case("true"));
Ok(CreateSubscriptionResponse { Ok(CreateSubscriptionResponse {
subscription_id: id, subscription_id,
result_code,
success,
}) })
} }
/// Decoded `AddMonitoredItemsResponse`. `ItemCapabilities` is regular /// Decoded `AddMonitoredItemsResponse`. `ItemCapabilities` is regular
/// WCF XML (not the binary fast-path) — currently surfaces as a presence /// WCF XML (not the binary fast-path) — currently surfaces as a presence
/// flag, mirroring `RegisterItemsResponse`. /// flag, mirroring `RegisterItemsResponse`.
///
/// Tolerates empty / missing `<ASBIData />` Status payloads under the
/// F31 pattern; surfaces `result_code` / `success` so callers can
/// retry on `InvalidConnectionId`.
#[derive(Debug, Clone, PartialEq)] #[derive(Debug, Clone, PartialEq)]
pub struct AddMonitoredItemsResponse { pub struct AddMonitoredItemsResponse {
pub status: Vec<ItemStatus>, pub status: Vec<ItemStatus>,
pub item_capabilities_present: bool, pub item_capabilities_present: bool,
/// `Result.resultCodeField` from the response wrapper. `Some(1)` =
/// `InvalidConnectionId`.
pub result_code: Option<u32>,
/// `Result.successField` — `false` means the per-item Status array
/// is empty.
pub success: Option<bool>,
} }
pub fn decode_add_monitored_items_response( pub fn decode_add_monitored_items_response(
body_tokens: &[NbfxToken], body_tokens: &[NbfxToken],
) -> Result<AddMonitoredItemsResponse, OperationError> { ) -> Result<AddMonitoredItemsResponse, OperationError> {
let payloads = collect_asbidata_payloads(body_tokens); let payloads = collect_asbidata_payloads(body_tokens);
let status_payload = payloads let status = match payloads.into_iter().next() {
.into_iter() Some(payload) if !payload.is_empty() => decode_item_status_array(&payload)?,
.next() _ => Vec::new(),
.ok_or(OperationError::MissingField { field: "Status" })?; };
let status = decode_item_status_array(&status_payload)?;
let item_capabilities_present = find_element_named(body_tokens, "ItemCapabilities").is_some(); let item_capabilities_present = find_element_named(body_tokens, "ItemCapabilities").is_some();
let result_code = find_text_in_named_element(body_tokens, "resultCodeField")
.and_then(|s| s.parse().ok());
let success = find_text_in_named_element(body_tokens, "successField")
.map(|s| s.eq_ignore_ascii_case("true"));
Ok(AddMonitoredItemsResponse { Ok(AddMonitoredItemsResponse {
status, status,
item_capabilities_present, item_capabilities_present,
result_code,
success,
}) })
} }
@@ -2310,7 +2346,12 @@ mod tests {
} }
#[test] #[test]
fn create_subscription_response_missing_id_fails() { fn create_subscription_response_missing_id_returns_zero_sentinel() {
// Post-F33 the decoder is tolerant of missing SubscriptionId
// (that's how the server signals an InvalidConnectionId
// failure). It returns subscription_id=0 with result_code/
// success unset; callers inspect those for a real success
// signal.
use mxaccess_asb_nettcp::nbfx::DynamicDictionary; use mxaccess_asb_nettcp::nbfx::DynamicDictionary;
let body = vec![ let body = vec![
NbfxToken::Element { NbfxToken::Element {
@@ -2320,13 +2361,30 @@ mod tests {
NbfxToken::EndElement, NbfxToken::EndElement,
]; ];
let dict = DynamicDictionary::new(); let dict = DynamicDictionary::new();
let err = decode_create_subscription_response(&body, &dict).unwrap_err(); let response = decode_create_subscription_response(&body, &dict).unwrap();
assert!(matches!( assert_eq!(response.subscription_id, 0);
err, assert_eq!(response.result_code, None);
OperationError::MissingField { assert_eq!(response.success, None);
field: "SubscriptionId" }
}
)); #[test]
fn create_subscription_response_surfaces_invalid_connection_id() {
use mxaccess_asb_nettcp::nbfx::DynamicDictionary;
let body = synthesise_invalid_connection_id_body("CreateSubscriptionResponse");
let dict = DynamicDictionary::new();
let response = decode_create_subscription_response(&body, &dict).unwrap();
assert_eq!(response.subscription_id, 0);
assert_eq!(response.result_code, Some(1));
assert_eq!(response.success, Some(false));
}
#[test]
fn add_monitored_items_response_surfaces_invalid_connection_id() {
let body = synthesise_invalid_connection_id_body("AddMonitoredItemsResponse");
let response = decode_add_monitored_items_response(&body).unwrap();
assert!(response.status.is_empty());
assert_eq!(response.result_code, Some(1));
assert_eq!(response.success, Some(false));
} }
#[test] #[test]