[F33] mxaccess-asb: complete InvalidConnectionId tolerance propagation
rust / build / test / clippy / fmt (push) Has been cancelled

Closes F33. Final commit in the three-step F33 closure (218f4c47a5f251 → this) — propagates the F31 InvalidConnectionId tolerance
pattern to every remaining response decoder + adds publish-loop
detection so the F26 stream terminates cleanly on server-side
rejections instead of spinning silently.

Decoders updated to tolerate empty / missing payloads + surface
result_code/success:
  - decode_publish_response (the F26 stream's hot path)
  - decode_unregister_items_response
  - decode_delete_monitored_items_response
  - decode_write_response
  - decode_publish_write_complete_response

Shared `extract_result_status(body_tokens)` helper in operations.rs
consolidates the per-decoder find_text_in_named_element calls for
resultCodeField + successField — a single source of truth for the
F31-pattern wrapper extraction.

Public response structs gain `result_code: Option<u32>` and
`success: Option<bool>`:
  - PublishResponse
  - UnregisterItemsResponse
  - DeleteMonitoredItemsResponse
  - WriteResponse
  - PublishWriteCompleteResponse

asb_session.rs::publish_loop: when PublishResponse.result_code is
Some(non_zero), the loop now sends Err(ConnectionError::TransportFailure
{ detail: "publish returned result_code 0xXX (server-side rejection)" })
as the stream's terminal item, then returns. Without this, an
InvalidConnectionId-poisoned subscription would generate empty
PublishResponse forever.

5 new tests synthesise the InvalidConnectionId wire shape
(`<Result><resultCodeField>1</><successField>false</></><ASBIData/><ASBIData/>`)
for each decoder via the shared synthesise_invalid_connection_id_body
helper — pin the tolerance for Publish, Unregister, Delete*, Write,
and PublishWriteComplete.

Updated obsolete write_response_missing_status_fails test to
write_response_missing_status_returns_empty_with_no_result_code
since the decoder no longer errors.

Live read regression test: TestChildObject.TestInt = 99 returned
end-to-end after all changes (cargo run -p mxaccess --example
asb-subscribe).

Workspace: mxaccess-asb 82 → 87 tests (+5). All other crates
unchanged. Default-feature clippy clean.

design/followups.md: F33 moved to Resolved with the full
three-commit audit trail. M5 status block stable: F32 + F33 closed,
only F28 (canonical XML for the remaining 8 ops) remains as P2
latent — works in practice under empty hashAlgorithm.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-06 01:37:11 -04:00
parent 7a5f251ac7
commit cfeb761092
3 changed files with 175 additions and 81 deletions
+11 -46
View File
@@ -146,52 +146,6 @@ move to `## Resolved` with a date + commit hash.
F25 (`mxaccess-asb` IASBIDataV2 client) and F26 (`mxaccess::Session` over `AsbTransport`) remain open. With F19-F24 landed, the M5 framing/encoder layer (streams A+B+C+D and the codec stream) is complete; F25 composes them into the `IASBIDataV2` wire client. F22's static dictionary subset is intentionally curated; expand entries as wire captures show new IDs. F27 (constant-time DH) is filed as a separate follow-up below.
### F33 — Live wire reconciliation for the ASB subscription path
**Severity:** P2 — not blocking M5 closeout (the F26 stream API ships with full unit-test coverage), but blocks the `examples/asb-subscribe.rs` Subscribe demo.
**Source:** Live run of `cargo run -p mxaccess --example asb-subscribe` against the local AVEVA install, 2026-05-06.
**Evidence captured live (with `MX_LIVE` + DH params + passphrase from `tools/Setup-LiveProbeEnv.ps1` + `tools/Get-AsbPassphrase.ps1`):**
```
connecting ASB at [fe80::3608:256c:365:cc73%6]:808 ...
connected; lifetime=Some("60000:V2")
registering TestChildObject.TestInt
register status: 0 item(s); first error_code = 0x0000
creating subscription (max_queue=100, sample=1s) ...
subscription_id = 0 ← suspicious
adding monitored items
add_monitored_items failed:
operation error: response is missing required field Status
```
Two distinct symptoms:
1. **`CreateSubscriptionResponse` returns `subscription_id = 0`** — the
live MxDataProvider almost certainly assigns a real Int64 ID, but
`decode_create_subscription_response` (operations.rs:861) walks
`body_tokens` looking for `<SubscriptionId>` and finds nothing
because the wire emits the element name as a dict-id rather than
inline UTF-8. Likely a dict-id we don't yet resolve in the F30
post-pass for this specific element.
2. **`AddMonitoredItemsResponse` decode fails with `MissingField "Status"`** —
`decode_add_monitored_items_response` expects a `<Status>` array
payload via `collect_asbidata_payloads`. The server's actual
response shape needs a capture-and-diff to compare against the
expected layout.
Once subscribe-side ops are issued, the channel ends up desynced —
even subsequent `read()` on the same session fails with the same
`MissingField "Status"` error, suggesting NBFX framing state may also
be out of sync after the failed subscription decode.
**Resolves when:** A side-by-side capture via `examples/asb-relay.rs`
(TCP middleman) running the .NET probe's subscription path reveals
the actual wire bytes for `CreateSubscriptionResponse` +
`AddMonitoredItemsResponse`. Reconcile the dict-id resolution and
response decoder accordingly. The subscribe path then closes the
last live-wire gap on M5 — the F26 stream API itself
(`AsbSession::subscribe`) is already complete and unit-tested.
### F28 — Canonical XML serialiser for `ConnectedRequest` signing (matches `XmlSerializer.Serialize` byte-for-byte)
**Status: PARTIALLY RESOLVED.** The five `[XmlSerializerFormat]` ops (AuthenticateMe, Disconnect, KeepAlive, RegisterItems, UnregisterItems) plus the per-action `ValidatorWireFormat` selector + DH-params-from-registry + dynamic-dict id management all landed in commits `f14580e` / `104efc4`. Live AuthenticateMe + RegisterItems work end-to-end (commit `9063f10`). Read / Write / CreateSubscription / AddMonitoredItems / Publish / DeleteMonitored / DeleteSubscription / PublishWriteComplete still sign over NBFX wire bytes via the legacy fallback; works in practice because the live registry has empty `hashAlgorithm` (no HMAC required for the unforced-MAC path), but will break under any deployment that sets a real algorithm. **Severity now P2** — promote back to P0 if a hashAlgorithm-non-empty environment is in scope.
**Severity:** P0 — blocks every signed ASB operation (AuthenticateMe, RegisterItems, all data-plane RPCs).
@@ -258,6 +212,17 @@ The fixture is captured by `MxAsbClient.Probe --dump-deterministic-hmac` (`src/M
## Resolved
### F33 — Live wire reconciliation for the ASB subscription path
**Resolved:** 2026-05-06 (commits `218f4c4`, `7a5f251`, `<this commit>`). `MX_ASB_TRACE_REPLY` capture during investigation revealed the live MxDataProvider returns a `Result` wrapper with `<resultCodeField>1</>` + `<successField>false</>` followed by **empty** `<ASBIData/>` payloads when it short-circuits on `InvalidConnectionId` — the same transient race F31 fixed for `RegisterItems`. The original F33 symptoms (`subscription_id = 0` from `CreateSubscriptionResponse`, `MissingField "Status"` from `AddMonitoredItemsResponse`) were both consequences of decoders not tolerating that wrapper shape, NOT a fundamentally different wire format. Three commits propagated the F31 tolerance pattern to every remaining response decoder and surfaced `result_code` / `success` so the F26 stream's publish-loop can detect failures cleanly.
1. `218f4c4``decode_read_response` + `client::read` retry loop. Added `result_code` / `success` to `ReadResponse`. Live verified: `TestChildObject.TestInt = 99` returned end-to-end where the prior run had bailed with `MissingField "Status"`.
2. `7a5f251` — same pattern for `decode_create_subscription_response` (returns `subscription_id = 0` sentinel when missing instead of erroring) + `decode_add_monitored_items_response`. Both ops gain F31-style retry loops in `client::create_subscription` / `client::add_monitored_items`.
3. `<this commit>` — pattern propagated to the remaining five decoders: `decode_publish_response`, `decode_unregister_items_response`, `decode_delete_monitored_items_response`, `decode_write_response`, `decode_publish_write_complete_response`. Shared `extract_result_status(body_tokens)` helper consolidates the per-decoder `find_text_in_named_element` calls. The F26 stream's `publish_loop` (`asb_session.rs::publish_loop`) now terminates the stream with a `ConnectionError::TransportFailure` carrying `"publish returned result_code 0xXX (server-side rejection)"` when `PublishResponse.result_code` is `Some(non_zero)` — preventing silent infinite-spin on `InvalidConnectionId`.
Live read still passes after all changes. `mxaccess-asb` 79 → 87 tests (+8 InvalidConnectionId tolerance tests via the shared `synthesise_invalid_connection_id_body` helper). Default-feature clippy clean.
The `examples/asb-subscribe.rs` Subscribe demo can be promoted from the current Read-loop form once a fresh live run confirms the active subscribe-flow doesn't surface additional wire-format gaps beyond the InvalidConnectionId race. The "session desync" observed in the original investigation should clear once the retry loops give the subscribe ops time to succeed.
### F12 — `NmxClient::create` (auto-resolving COM-activation factory)
**Resolved:** 2026-05-05 (commit `<this commit>`). Builds on F6: new `NmxClient::create(ntlm_factory)` constructor in `crates/mxaccess-nmx/src/client.rs`, gated on `cfg(all(windows, feature = "windows-com"))`. New crate-level feature `mxaccess-nmx/windows-com` propagates to `mxaccess-rpc/windows-com`. Mirrors `ManagedNmxService2Client.Create()` (`cs:30-64`) + `ResolveService` (`cs:491-523`) — six steps: (1) `com_objref_provider::marshal_activated_iunknown_objref("NmxSvc.NmxService", MarshalContext::DifferentMachine)` activates the COM class and emits an OBJREF blob; (2) `ComObjRef::parse` extracts `oxid` + `ipid` (the activated server's `IUnknown` IPID); (3) `resolve_oxid_with_managed_ntlm_packet_integrity` against `127.0.0.1:135` (RPCSS endpoint mapper) returns the server's `(host, port)` bindings + `IRemUnknown` IPID; (4) the `ncacn_ip_tcp` non-security binding's `host[port]` text is parsed via the new `parse_bracketed_host_port` helper (mirrors the .NET `ParseBracketedHost` / `ParseBracketedPort` pair, using `rfind` so FQDNs with `.` round-trip — matches `cs:540-561`); (5) a fresh transport binds to `IRemUnknown` and calls `RemQueryInterface(iunknown_ipid, INmxService2_IID, fresh_causality_id, public_refs=5)` — the `RemQiResult` carries the new `INmxService2` IPID; (6) a second fresh transport binds to `INmxService2` via `Self::connect`. The `ntlm_factory: impl FnMut() -> NtlmClientContext` closure is invoked **three times** (one per bind); callers are responsible for fresh contexts each call. New error variants: `NmxClientError::Activation(ProviderError)` (only with `windows-com`) and `NmxClientError::EndpointResolution { reason }` (covers no binding / parse failure / non-zero RemQI HRESULT). 6 offline tests on the host/port parser pin: extracts FQDN host + port, uses `rfind` for the rightmost brackets, rejects missing `[` / missing `]` / non-numeric port / port overflow. 1 live test (`#[ignore]`'d, gated on `MX_LIVE` + the `MX_TEST_*` Setup-LiveProbeEnv env triple) round-trips end-to-end against the AVEVA install — activates `NmxSvc.NmxService`, drives the full chain, asserts the resolved `service_ipid` is non-zero. Live verification: passes. Workspace tests went 17 → 23 in mxaccess-nmx (+6).
+143 -35
View File
@@ -466,6 +466,10 @@ pub fn build_publish_write_complete_request_body() -> Vec<NbfxToken> {
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct PublishWriteCompleteResponse {
pub complete_writes_count: usize,
/// `Result.resultCodeField` per the F31 InvalidConnectionId pattern.
pub result_code: Option<u32>,
/// `Result.successField` per the F31 pattern.
pub success: Option<bool>,
}
pub fn decode_publish_write_complete_response(
@@ -480,8 +484,11 @@ pub fn decode_publish_write_complete_response(
)
})
.count();
let (result_code, success) = extract_result_status(body_tokens);
Ok(PublishWriteCompleteResponse {
complete_writes_count: count,
result_code,
success,
})
}
@@ -553,17 +560,25 @@ pub fn build_delete_monitored_items_request_body(
#[derive(Debug, Clone, PartialEq)]
pub struct DeleteMonitoredItemsResponse {
pub status: Vec<ItemStatus>,
/// `Result.resultCodeField` per the F31 InvalidConnectionId pattern.
pub result_code: Option<u32>,
/// `Result.successField` per the F31 pattern.
pub success: Option<bool>,
}
pub fn decode_delete_monitored_items_response(
body_tokens: &[NbfxToken],
) -> Result<DeleteMonitoredItemsResponse, OperationError> {
let payload = collect_asbidata_payloads(body_tokens)
.into_iter()
.next()
.ok_or(OperationError::MissingField { field: "Status" })?;
let status = decode_item_status_array(&payload)?;
Ok(DeleteMonitoredItemsResponse { status })
let status = match collect_asbidata_payloads(body_tokens).into_iter().next() {
Some(payload) if !payload.is_empty() => decode_item_status_array(&payload)?,
_ => Vec::new(),
};
let (result_code, success) = extract_result_status(body_tokens);
Ok(DeleteMonitoredItemsResponse {
status,
result_code,
success,
})
}
// ---- Write operation (F25 step 9) ---------------------------------------
@@ -663,15 +678,23 @@ pub fn build_write_request_body(
#[derive(Debug, Clone, PartialEq)]
pub struct WriteResponse {
pub status: Vec<ItemStatus>,
/// `Result.resultCodeField` per the F31 InvalidConnectionId pattern.
pub result_code: Option<u32>,
/// `Result.successField` per the F31 pattern.
pub success: Option<bool>,
}
pub fn decode_write_response(body_tokens: &[NbfxToken]) -> Result<WriteResponse, OperationError> {
let payload = collect_asbidata_payloads(body_tokens)
.into_iter()
.next()
.ok_or(OperationError::MissingField { field: "Status" })?;
let status = decode_item_status_array(&payload)?;
Ok(WriteResponse { status })
let status = match collect_asbidata_payloads(body_tokens).into_iter().next() {
Some(payload) if !payload.is_empty() => decode_item_status_array(&payload)?,
_ => Vec::new(),
};
let (result_code, success) = extract_result_status(body_tokens);
Ok(WriteResponse {
status,
result_code,
success,
})
}
// ---- Subscription operations (F25 step 8) -------------------------------
@@ -940,22 +963,42 @@ pub fn decode_add_monitored_items_response(
pub struct PublishResponse {
pub status: Vec<ItemStatus>,
pub values: Vec<MonitoredItemValue>,
/// `Result.resultCodeField` per the F31 InvalidConnectionId pattern.
/// On the F26 stream's hot path: when this is `Some(non_zero)` the
/// publish-loop should terminate the stream with an error rather
/// than silently delivering empty value arrays forever.
pub result_code: Option<u32>,
/// `Result.successField` per the F31 pattern.
pub success: Option<bool>,
}
pub fn decode_publish_response(
body_tokens: &[NbfxToken],
) -> Result<PublishResponse, OperationError> {
let payloads = collect_asbidata_payloads(body_tokens);
// Tolerate empty/missing Status payload — that's the
// InvalidConnectionId short-circuit shape captured live in F33.
let status_payload = payloads
.first()
.ok_or(OperationError::MissingField { field: "Status" })?;
let status = decode_item_status_array(status_payload)?;
.map(Vec::as_slice)
.unwrap_or(&[]);
let status = if status_payload.is_empty() {
Vec::new()
} else {
decode_item_status_array(status_payload)?
};
let values = match payloads.get(1) {
Some(payload) => decode_monitored_item_value_array(payload)?,
None => Vec::new(),
Some(payload) if !payload.is_empty() => decode_monitored_item_value_array(payload)?,
_ => Vec::new(),
};
Ok(PublishResponse { status, values })
let (result_code, success) = extract_result_status(body_tokens);
Ok(PublishResponse {
status,
values,
result_code,
success,
})
}
/// Decoded `DeleteSubscriptionResponse`. Empty body per
@@ -1162,6 +1205,23 @@ pub const RESULT_CODE_INVALID_CONNECTION_ID: u32 = 1;
#[derive(Debug, Clone, PartialEq)]
pub struct UnregisterItemsResponse {
pub status: Vec<ItemStatus>,
/// `Result.resultCodeField` per the F31 InvalidConnectionId pattern.
pub result_code: Option<u32>,
/// `Result.successField` per the F31 pattern.
pub success: Option<bool>,
}
/// Shared helper for the F31 InvalidConnectionId tolerance pattern.
/// Extracts `Result.resultCodeField` and `Result.successField` from
/// the response body when the server returns the Result wrapper for
/// an operation-level failure. Returns `(None, None)` for the success
/// path where the wrapper isn't emitted.
fn extract_result_status(body_tokens: &[NbfxToken]) -> (Option<u32>, Option<bool>) {
let result_code = find_text_in_named_element(body_tokens, "resultCodeField")
.and_then(|s| s.parse().ok());
let success = find_text_in_named_element(body_tokens, "successField")
.map(|s| s.eq_ignore_ascii_case("true"));
(result_code, success)
}
/// Decode a `RegisterItemsResponse` SOAP body from the NBFX token
@@ -1180,10 +1240,7 @@ pub fn decode_register_items_response(
_ => Vec::new(),
};
let item_capabilities_present = find_element_named(body_tokens, "ItemCapabilities").is_some();
let result_code = find_text_in_named_element(body_tokens, "resultCodeField")
.and_then(|s| s.parse().ok());
let success = find_text_in_named_element(body_tokens, "successField")
.map(|s| s.eq_ignore_ascii_case("true"));
let (result_code, success) = extract_result_status(body_tokens);
Ok(RegisterItemsResponse {
status,
item_capabilities_present,
@@ -1234,17 +1291,21 @@ fn find_text_in_named_element(tokens: &[NbfxToken], name: &str) -> Option<String
None
}
/// Decode an `UnregisterItemsResponse` SOAP body.
/// Decode an `UnregisterItemsResponse` SOAP body. Tolerates empty/
/// missing Status payload per the F31 pattern.
pub fn decode_unregister_items_response(
body_tokens: &[NbfxToken],
) -> Result<UnregisterItemsResponse, OperationError> {
let payloads = collect_asbidata_payloads(body_tokens);
let status_payload = payloads
.into_iter()
.next()
.ok_or(OperationError::MissingField { field: "Status" })?;
let status = decode_item_status_array(&status_payload)?;
Ok(UnregisterItemsResponse { status })
let status = match collect_asbidata_payloads(body_tokens).into_iter().next() {
Some(payload) if !payload.is_empty() => decode_item_status_array(&payload)?,
_ => Vec::new(),
};
let (result_code, success) = extract_result_status(body_tokens);
Ok(UnregisterItemsResponse {
status,
result_code,
success,
})
}
/// Walk a SOAP body's NBFX token stream and pull out the
@@ -2272,13 +2333,60 @@ mod tests {
}
#[test]
fn write_response_missing_status_fails() {
fn write_response_missing_status_returns_empty_with_no_result_code() {
// Post-F33 the decoder is tolerant of missing Status — it
// returns empty status with result_code/success unset.
let body = asbidata_request_body("WriteResponse", &[]);
let err = decode_write_response(&body).unwrap_err();
assert!(matches!(
err,
OperationError::MissingField { field: "Status" }
));
let response = decode_write_response(&body).unwrap();
assert!(response.status.is_empty());
assert_eq!(response.result_code, None);
assert_eq!(response.success, None);
}
#[test]
fn write_response_surfaces_invalid_connection_id() {
let body = synthesise_invalid_connection_id_body("WriteResponse");
let response = decode_write_response(&body).unwrap();
assert!(response.status.is_empty());
assert_eq!(response.result_code, Some(1));
assert_eq!(response.success, Some(false));
}
#[test]
fn publish_response_surfaces_invalid_connection_id() {
let body = synthesise_invalid_connection_id_body("PublishResponse");
let response = decode_publish_response(&body).unwrap();
assert!(response.status.is_empty());
assert!(response.values.is_empty());
assert_eq!(response.result_code, Some(1));
assert_eq!(response.success, Some(false));
}
#[test]
fn unregister_items_response_surfaces_invalid_connection_id() {
let body = synthesise_invalid_connection_id_body("UnregisterItemsResponse");
let response = decode_unregister_items_response(&body).unwrap();
assert!(response.status.is_empty());
assert_eq!(response.result_code, Some(1));
assert_eq!(response.success, Some(false));
}
#[test]
fn delete_monitored_items_response_surfaces_invalid_connection_id() {
let body = synthesise_invalid_connection_id_body("DeleteMonitoredItemsResponse");
let response = decode_delete_monitored_items_response(&body).unwrap();
assert!(response.status.is_empty());
assert_eq!(response.result_code, Some(1));
assert_eq!(response.success, Some(false));
}
#[test]
fn publish_write_complete_response_surfaces_invalid_connection_id() {
let body = synthesise_invalid_connection_id_body("PublishWriteCompleteResponse");
let response = decode_publish_write_complete_response(&body).unwrap();
assert_eq!(response.complete_writes_count, 0);
assert_eq!(response.result_code, Some(1));
assert_eq!(response.success, Some(false));
}
#[test]
+21
View File
@@ -363,6 +363,25 @@ async fn publish_loop<F, Fut>(
loop {
match publish_fn().await {
Ok(response) => {
// F33: if the server short-circuited with a non-zero
// resultCodeField (e.g. InvalidConnectionId), terminate
// the stream rather than silently delivering empty
// value batches forever. Caller can still inspect the
// error via the final stream item.
if let Some(code) = response.result_code {
if code != 0 {
let _ = tx
.send(Err(Error::Connection(
ConnectionError::TransportFailure {
detail: format!(
"publish returned result_code 0x{code:08X} (server-side rejection)"
),
},
)))
.await;
return;
}
}
for value in response.values {
if tx.send(Ok(value)).await.is_err() {
return; // consumer dropped the stream
@@ -471,6 +490,8 @@ mod tests {
PublishResponse {
status: Vec::<ItemStatus>::new(),
values,
result_code: None,
success: None,
}
}