From cfeb76109216864c0b82d6a7a9a220cd5bf4cb6a Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 6 May 2026 01:37:11 -0400 Subject: [PATCH] [F33] mxaccess-asb: complete InvalidConnectionId tolerance propagation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes F33. Final commit in the three-step F33 closure (218f4c4 → 7a5f251 → this) — propagates the F31 InvalidConnectionId tolerance pattern to every remaining response decoder + adds publish-loop detection so the F26 stream terminates cleanly on server-side rejections instead of spinning silently. Decoders updated to tolerate empty / missing payloads + surface result_code/success: - decode_publish_response (the F26 stream's hot path) - decode_unregister_items_response - decode_delete_monitored_items_response - decode_write_response - decode_publish_write_complete_response Shared `extract_result_status(body_tokens)` helper in operations.rs consolidates the per-decoder find_text_in_named_element calls for resultCodeField + successField — a single source of truth for the F31-pattern wrapper extraction. Public response structs gain `result_code: Option` and `success: Option`: - PublishResponse - UnregisterItemsResponse - DeleteMonitoredItemsResponse - WriteResponse - PublishWriteCompleteResponse asb_session.rs::publish_loop: when PublishResponse.result_code is Some(non_zero), the loop now sends Err(ConnectionError::TransportFailure { detail: "publish returned result_code 0xXX (server-side rejection)" }) as the stream's terminal item, then returns. Without this, an InvalidConnectionId-poisoned subscription would generate empty PublishResponse forever. 5 new tests synthesise the InvalidConnectionId wire shape (`1false`) for each decoder via the shared synthesise_invalid_connection_id_body helper — pin the tolerance for Publish, Unregister, Delete*, Write, and PublishWriteComplete. Updated obsolete write_response_missing_status_fails test to write_response_missing_status_returns_empty_with_no_result_code since the decoder no longer errors. Live read regression test: TestChildObject.TestInt = 99 returned end-to-end after all changes (cargo run -p mxaccess --example asb-subscribe). Workspace: mxaccess-asb 82 → 87 tests (+5). All other crates unchanged. Default-feature clippy clean. design/followups.md: F33 moved to Resolved with the full three-commit audit trail. M5 status block stable: F32 + F33 closed, only F28 (canonical XML for the remaining 8 ops) remains as P2 latent — works in practice under empty hashAlgorithm. Co-Authored-By: Claude Opus 4.7 (1M context) --- design/followups.md | 57 ++----- rust/crates/mxaccess-asb/src/operations.rs | 178 +++++++++++++++++---- rust/crates/mxaccess/src/asb_session.rs | 21 +++ 3 files changed, 175 insertions(+), 81 deletions(-) diff --git a/design/followups.md b/design/followups.md index e258419..94680e2 100644 --- a/design/followups.md +++ b/design/followups.md @@ -146,52 +146,6 @@ move to `## Resolved` with a date + commit hash. F25 (`mxaccess-asb` IASBIDataV2 client) and F26 (`mxaccess::Session` over `AsbTransport`) remain open. With F19-F24 landed, the M5 framing/encoder layer (streams A+B+C+D and the codec stream) is complete; F25 composes them into the `IASBIDataV2` wire client. F22's static dictionary subset is intentionally curated; expand entries as wire captures show new IDs. F27 (constant-time DH) is filed as a separate follow-up below. -### F33 — Live wire reconciliation for the ASB subscription path -**Severity:** P2 — not blocking M5 closeout (the F26 stream API ships with full unit-test coverage), but blocks the `examples/asb-subscribe.rs` Subscribe demo. -**Source:** Live run of `cargo run -p mxaccess --example asb-subscribe` against the local AVEVA install, 2026-05-06. - -**Evidence captured live (with `MX_LIVE` + DH params + passphrase from `tools/Setup-LiveProbeEnv.ps1` + `tools/Get-AsbPassphrase.ps1`):** - -``` -connecting ASB at [fe80::3608:256c:365:cc73%6]:808 ... -connected; lifetime=Some("60000:V2") -registering TestChildObject.TestInt -register status: 0 item(s); first error_code = 0x0000 -creating subscription (max_queue=100, sample=1s) ... -subscription_id = 0 ← suspicious -adding monitored items -add_monitored_items failed: - operation error: response is missing required field Status -``` - -Two distinct symptoms: - -1. **`CreateSubscriptionResponse` returns `subscription_id = 0`** — the - live MxDataProvider almost certainly assigns a real Int64 ID, but - `decode_create_subscription_response` (operations.rs:861) walks - `body_tokens` looking for `` and finds nothing - because the wire emits the element name as a dict-id rather than - inline UTF-8. Likely a dict-id we don't yet resolve in the F30 - post-pass for this specific element. -2. **`AddMonitoredItemsResponse` decode fails with `MissingField "Status"`** — - `decode_add_monitored_items_response` expects a `` array - payload via `collect_asbidata_payloads`. The server's actual - response shape needs a capture-and-diff to compare against the - expected layout. - -Once subscribe-side ops are issued, the channel ends up desynced — -even subsequent `read()` on the same session fails with the same -`MissingField "Status"` error, suggesting NBFX framing state may also -be out of sync after the failed subscription decode. - -**Resolves when:** A side-by-side capture via `examples/asb-relay.rs` -(TCP middleman) running the .NET probe's subscription path reveals -the actual wire bytes for `CreateSubscriptionResponse` + -`AddMonitoredItemsResponse`. Reconcile the dict-id resolution and -response decoder accordingly. The subscribe path then closes the -last live-wire gap on M5 — the F26 stream API itself -(`AsbSession::subscribe`) is already complete and unit-tested. - ### F28 — Canonical XML serialiser for `ConnectedRequest` signing (matches `XmlSerializer.Serialize` byte-for-byte) **Status: PARTIALLY RESOLVED.** The five `[XmlSerializerFormat]` ops (AuthenticateMe, Disconnect, KeepAlive, RegisterItems, UnregisterItems) plus the per-action `ValidatorWireFormat` selector + DH-params-from-registry + dynamic-dict id management all landed in commits `f14580e` / `104efc4`. Live AuthenticateMe + RegisterItems work end-to-end (commit `9063f10`). Read / Write / CreateSubscription / AddMonitoredItems / Publish / DeleteMonitored / DeleteSubscription / PublishWriteComplete still sign over NBFX wire bytes via the legacy fallback; works in practice because the live registry has empty `hashAlgorithm` (no HMAC required for the unforced-MAC path), but will break under any deployment that sets a real algorithm. **Severity now P2** — promote back to P0 if a hashAlgorithm-non-empty environment is in scope. **Severity:** P0 — blocks every signed ASB operation (AuthenticateMe, RegisterItems, all data-plane RPCs). @@ -258,6 +212,17 @@ The fixture is captured by `MxAsbClient.Probe --dump-deterministic-hmac` (`src/M ## Resolved +### F33 — Live wire reconciliation for the ASB subscription path +**Resolved:** 2026-05-06 (commits `218f4c4`, `7a5f251`, ``). `MX_ASB_TRACE_REPLY` capture during investigation revealed the live MxDataProvider returns a `Result` wrapper with `1` + `false` followed by **empty** `` payloads when it short-circuits on `InvalidConnectionId` — the same transient race F31 fixed for `RegisterItems`. The original F33 symptoms (`subscription_id = 0` from `CreateSubscriptionResponse`, `MissingField "Status"` from `AddMonitoredItemsResponse`) were both consequences of decoders not tolerating that wrapper shape, NOT a fundamentally different wire format. Three commits propagated the F31 tolerance pattern to every remaining response decoder and surfaced `result_code` / `success` so the F26 stream's publish-loop can detect failures cleanly. + +1. `218f4c4` — `decode_read_response` + `client::read` retry loop. Added `result_code` / `success` to `ReadResponse`. Live verified: `TestChildObject.TestInt = 99` returned end-to-end where the prior run had bailed with `MissingField "Status"`. +2. `7a5f251` — same pattern for `decode_create_subscription_response` (returns `subscription_id = 0` sentinel when missing instead of erroring) + `decode_add_monitored_items_response`. Both ops gain F31-style retry loops in `client::create_subscription` / `client::add_monitored_items`. +3. `` — pattern propagated to the remaining five decoders: `decode_publish_response`, `decode_unregister_items_response`, `decode_delete_monitored_items_response`, `decode_write_response`, `decode_publish_write_complete_response`. Shared `extract_result_status(body_tokens)` helper consolidates the per-decoder `find_text_in_named_element` calls. The F26 stream's `publish_loop` (`asb_session.rs::publish_loop`) now terminates the stream with a `ConnectionError::TransportFailure` carrying `"publish returned result_code 0xXX (server-side rejection)"` when `PublishResponse.result_code` is `Some(non_zero)` — preventing silent infinite-spin on `InvalidConnectionId`. + +Live read still passes after all changes. `mxaccess-asb` 79 → 87 tests (+8 InvalidConnectionId tolerance tests via the shared `synthesise_invalid_connection_id_body` helper). Default-feature clippy clean. + +The `examples/asb-subscribe.rs` Subscribe demo can be promoted from the current Read-loop form once a fresh live run confirms the active subscribe-flow doesn't surface additional wire-format gaps beyond the InvalidConnectionId race. The "session desync" observed in the original investigation should clear once the retry loops give the subscribe ops time to succeed. + ### F12 — `NmxClient::create` (auto-resolving COM-activation factory) **Resolved:** 2026-05-05 (commit ``). Builds on F6: new `NmxClient::create(ntlm_factory)` constructor in `crates/mxaccess-nmx/src/client.rs`, gated on `cfg(all(windows, feature = "windows-com"))`. New crate-level feature `mxaccess-nmx/windows-com` propagates to `mxaccess-rpc/windows-com`. Mirrors `ManagedNmxService2Client.Create()` (`cs:30-64`) + `ResolveService` (`cs:491-523`) — six steps: (1) `com_objref_provider::marshal_activated_iunknown_objref("NmxSvc.NmxService", MarshalContext::DifferentMachine)` activates the COM class and emits an OBJREF blob; (2) `ComObjRef::parse` extracts `oxid` + `ipid` (the activated server's `IUnknown` IPID); (3) `resolve_oxid_with_managed_ntlm_packet_integrity` against `127.0.0.1:135` (RPCSS endpoint mapper) returns the server's `(host, port)` bindings + `IRemUnknown` IPID; (4) the `ncacn_ip_tcp` non-security binding's `host[port]` text is parsed via the new `parse_bracketed_host_port` helper (mirrors the .NET `ParseBracketedHost` / `ParseBracketedPort` pair, using `rfind` so FQDNs with `.` round-trip — matches `cs:540-561`); (5) a fresh transport binds to `IRemUnknown` and calls `RemQueryInterface(iunknown_ipid, INmxService2_IID, fresh_causality_id, public_refs=5)` — the `RemQiResult` carries the new `INmxService2` IPID; (6) a second fresh transport binds to `INmxService2` via `Self::connect`. The `ntlm_factory: impl FnMut() -> NtlmClientContext` closure is invoked **three times** (one per bind); callers are responsible for fresh contexts each call. New error variants: `NmxClientError::Activation(ProviderError)` (only with `windows-com`) and `NmxClientError::EndpointResolution { reason }` (covers no binding / parse failure / non-zero RemQI HRESULT). 6 offline tests on the host/port parser pin: extracts FQDN host + port, uses `rfind` for the rightmost brackets, rejects missing `[` / missing `]` / non-numeric port / port overflow. 1 live test (`#[ignore]`'d, gated on `MX_LIVE` + the `MX_TEST_*` Setup-LiveProbeEnv env triple) round-trips end-to-end against the AVEVA install — activates `NmxSvc.NmxService`, drives the full chain, asserts the resolved `service_ipid` is non-zero. Live verification: passes. Workspace tests went 17 → 23 in mxaccess-nmx (+6). diff --git a/rust/crates/mxaccess-asb/src/operations.rs b/rust/crates/mxaccess-asb/src/operations.rs index 6e8df7d..eb69fad 100644 --- a/rust/crates/mxaccess-asb/src/operations.rs +++ b/rust/crates/mxaccess-asb/src/operations.rs @@ -466,6 +466,10 @@ pub fn build_publish_write_complete_request_body() -> Vec { #[derive(Debug, Clone, PartialEq, Eq, Default)] pub struct PublishWriteCompleteResponse { pub complete_writes_count: usize, + /// `Result.resultCodeField` per the F31 InvalidConnectionId pattern. + pub result_code: Option, + /// `Result.successField` per the F31 pattern. + pub success: Option, } pub fn decode_publish_write_complete_response( @@ -480,8 +484,11 @@ pub fn decode_publish_write_complete_response( ) }) .count(); + let (result_code, success) = extract_result_status(body_tokens); Ok(PublishWriteCompleteResponse { complete_writes_count: count, + result_code, + success, }) } @@ -553,17 +560,25 @@ pub fn build_delete_monitored_items_request_body( #[derive(Debug, Clone, PartialEq)] pub struct DeleteMonitoredItemsResponse { pub status: Vec, + /// `Result.resultCodeField` per the F31 InvalidConnectionId pattern. + pub result_code: Option, + /// `Result.successField` per the F31 pattern. + pub success: Option, } pub fn decode_delete_monitored_items_response( body_tokens: &[NbfxToken], ) -> Result { - let payload = collect_asbidata_payloads(body_tokens) - .into_iter() - .next() - .ok_or(OperationError::MissingField { field: "Status" })?; - let status = decode_item_status_array(&payload)?; - Ok(DeleteMonitoredItemsResponse { status }) + let status = match collect_asbidata_payloads(body_tokens).into_iter().next() { + Some(payload) if !payload.is_empty() => decode_item_status_array(&payload)?, + _ => Vec::new(), + }; + let (result_code, success) = extract_result_status(body_tokens); + Ok(DeleteMonitoredItemsResponse { + status, + result_code, + success, + }) } // ---- Write operation (F25 step 9) --------------------------------------- @@ -663,15 +678,23 @@ pub fn build_write_request_body( #[derive(Debug, Clone, PartialEq)] pub struct WriteResponse { pub status: Vec, + /// `Result.resultCodeField` per the F31 InvalidConnectionId pattern. + pub result_code: Option, + /// `Result.successField` per the F31 pattern. + pub success: Option, } pub fn decode_write_response(body_tokens: &[NbfxToken]) -> Result { - let payload = collect_asbidata_payloads(body_tokens) - .into_iter() - .next() - .ok_or(OperationError::MissingField { field: "Status" })?; - let status = decode_item_status_array(&payload)?; - Ok(WriteResponse { status }) + let status = match collect_asbidata_payloads(body_tokens).into_iter().next() { + Some(payload) if !payload.is_empty() => decode_item_status_array(&payload)?, + _ => Vec::new(), + }; + let (result_code, success) = extract_result_status(body_tokens); + Ok(WriteResponse { + status, + result_code, + success, + }) } // ---- Subscription operations (F25 step 8) ------------------------------- @@ -940,22 +963,42 @@ pub fn decode_add_monitored_items_response( pub struct PublishResponse { pub status: Vec, pub values: Vec, + /// `Result.resultCodeField` per the F31 InvalidConnectionId pattern. + /// On the F26 stream's hot path: when this is `Some(non_zero)` the + /// publish-loop should terminate the stream with an error rather + /// than silently delivering empty value arrays forever. + pub result_code: Option, + /// `Result.successField` per the F31 pattern. + pub success: Option, } pub fn decode_publish_response( body_tokens: &[NbfxToken], ) -> Result { let payloads = collect_asbidata_payloads(body_tokens); + // Tolerate empty/missing Status payload — that's the + // InvalidConnectionId short-circuit shape captured live in F33. let status_payload = payloads .first() - .ok_or(OperationError::MissingField { field: "Status" })?; - let status = decode_item_status_array(status_payload)?; + .map(Vec::as_slice) + .unwrap_or(&[]); + let status = if status_payload.is_empty() { + Vec::new() + } else { + decode_item_status_array(status_payload)? + }; let values = match payloads.get(1) { - Some(payload) => decode_monitored_item_value_array(payload)?, - None => Vec::new(), + Some(payload) if !payload.is_empty() => decode_monitored_item_value_array(payload)?, + _ => Vec::new(), }; - Ok(PublishResponse { status, values }) + let (result_code, success) = extract_result_status(body_tokens); + Ok(PublishResponse { + status, + values, + result_code, + success, + }) } /// Decoded `DeleteSubscriptionResponse`. Empty body per @@ -1162,6 +1205,23 @@ pub const RESULT_CODE_INVALID_CONNECTION_ID: u32 = 1; #[derive(Debug, Clone, PartialEq)] pub struct UnregisterItemsResponse { pub status: Vec, + /// `Result.resultCodeField` per the F31 InvalidConnectionId pattern. + pub result_code: Option, + /// `Result.successField` per the F31 pattern. + pub success: Option, +} + +/// Shared helper for the F31 InvalidConnectionId tolerance pattern. +/// Extracts `Result.resultCodeField` and `Result.successField` from +/// the response body when the server returns the Result wrapper for +/// an operation-level failure. Returns `(None, None)` for the success +/// path where the wrapper isn't emitted. +fn extract_result_status(body_tokens: &[NbfxToken]) -> (Option, Option) { + let result_code = find_text_in_named_element(body_tokens, "resultCodeField") + .and_then(|s| s.parse().ok()); + let success = find_text_in_named_element(body_tokens, "successField") + .map(|s| s.eq_ignore_ascii_case("true")); + (result_code, success) } /// Decode a `RegisterItemsResponse` SOAP body from the NBFX token @@ -1180,10 +1240,7 @@ pub fn decode_register_items_response( _ => Vec::new(), }; let item_capabilities_present = find_element_named(body_tokens, "ItemCapabilities").is_some(); - let result_code = find_text_in_named_element(body_tokens, "resultCodeField") - .and_then(|s| s.parse().ok()); - let success = find_text_in_named_element(body_tokens, "successField") - .map(|s| s.eq_ignore_ascii_case("true")); + let (result_code, success) = extract_result_status(body_tokens); Ok(RegisterItemsResponse { status, item_capabilities_present, @@ -1234,17 +1291,21 @@ fn find_text_in_named_element(tokens: &[NbfxToken], name: &str) -> Option Result { - let payloads = collect_asbidata_payloads(body_tokens); - let status_payload = payloads - .into_iter() - .next() - .ok_or(OperationError::MissingField { field: "Status" })?; - let status = decode_item_status_array(&status_payload)?; - Ok(UnregisterItemsResponse { status }) + let status = match collect_asbidata_payloads(body_tokens).into_iter().next() { + Some(payload) if !payload.is_empty() => decode_item_status_array(&payload)?, + _ => Vec::new(), + }; + let (result_code, success) = extract_result_status(body_tokens); + Ok(UnregisterItemsResponse { + status, + result_code, + success, + }) } /// Walk a SOAP body's NBFX token stream and pull out the @@ -2272,13 +2333,60 @@ mod tests { } #[test] - fn write_response_missing_status_fails() { + fn write_response_missing_status_returns_empty_with_no_result_code() { + // Post-F33 the decoder is tolerant of missing Status — it + // returns empty status with result_code/success unset. let body = asbidata_request_body("WriteResponse", &[]); - let err = decode_write_response(&body).unwrap_err(); - assert!(matches!( - err, - OperationError::MissingField { field: "Status" } - )); + let response = decode_write_response(&body).unwrap(); + assert!(response.status.is_empty()); + assert_eq!(response.result_code, None); + assert_eq!(response.success, None); + } + + #[test] + fn write_response_surfaces_invalid_connection_id() { + let body = synthesise_invalid_connection_id_body("WriteResponse"); + let response = decode_write_response(&body).unwrap(); + assert!(response.status.is_empty()); + assert_eq!(response.result_code, Some(1)); + assert_eq!(response.success, Some(false)); + } + + #[test] + fn publish_response_surfaces_invalid_connection_id() { + let body = synthesise_invalid_connection_id_body("PublishResponse"); + let response = decode_publish_response(&body).unwrap(); + assert!(response.status.is_empty()); + assert!(response.values.is_empty()); + assert_eq!(response.result_code, Some(1)); + assert_eq!(response.success, Some(false)); + } + + #[test] + fn unregister_items_response_surfaces_invalid_connection_id() { + let body = synthesise_invalid_connection_id_body("UnregisterItemsResponse"); + let response = decode_unregister_items_response(&body).unwrap(); + assert!(response.status.is_empty()); + assert_eq!(response.result_code, Some(1)); + assert_eq!(response.success, Some(false)); + } + + #[test] + fn delete_monitored_items_response_surfaces_invalid_connection_id() { + let body = synthesise_invalid_connection_id_body("DeleteMonitoredItemsResponse"); + let response = decode_delete_monitored_items_response(&body).unwrap(); + assert!(response.status.is_empty()); + assert_eq!(response.result_code, Some(1)); + assert_eq!(response.success, Some(false)); + } + + #[test] + fn publish_write_complete_response_surfaces_invalid_connection_id() { + let body = synthesise_invalid_connection_id_body("PublishWriteCompleteResponse"); + let response = decode_publish_write_complete_response(&body).unwrap(); + assert_eq!(response.complete_writes_count, 0); + assert_eq!(response.result_code, Some(1)); + assert_eq!(response.success, Some(false)); } #[test] diff --git a/rust/crates/mxaccess/src/asb_session.rs b/rust/crates/mxaccess/src/asb_session.rs index b8e2875..ea595aa 100644 --- a/rust/crates/mxaccess/src/asb_session.rs +++ b/rust/crates/mxaccess/src/asb_session.rs @@ -363,6 +363,25 @@ async fn publish_loop( loop { match publish_fn().await { Ok(response) => { + // F33: if the server short-circuited with a non-zero + // resultCodeField (e.g. InvalidConnectionId), terminate + // the stream rather than silently delivering empty + // value batches forever. Caller can still inspect the + // error via the final stream item. + if let Some(code) = response.result_code { + if code != 0 { + let _ = tx + .send(Err(Error::Connection( + ConnectionError::TransportFailure { + detail: format!( + "publish returned result_code 0x{code:08X} (server-side rejection)" + ), + }, + ))) + .await; + return; + } + } for value in response.values { if tx.send(Ok(value)).await.is_err() { return; // consumer dropped the stream @@ -471,6 +490,8 @@ mod tests { PublishResponse { status: Vec::::new(), values, + result_code: None, + success: None, } }