[M5] mxaccess-asb: F25 step 3 — response decoders + Read request body

Foundation for response decoding. Adds:

* `contracts::ItemStatus` — ports `AsbContracts.cs:639-722`. Wire
  layout matches `WriteToStream` exactly: Item (ItemIdentity binary)
  → Status (AsbStatus binary, from F24) → ErrorCode (u16) →
  ErrorCodeSpecified (u8 bool). Note this is NOT the DataMember
  declaration order — the binary serialiser hand-picks Item-first.

* `encode_item_status_array` / `decode_item_status_array` — same
  4-byte int32 count + per-element WriteToStream pattern as the
  ItemIdentity array codec.

* `operations::collect_asbidata_payloads(tokens, field_name)` — walks
  an NBFX token stream and pulls out `<{field}><ASBIData>{Bytes}
  </ASBIData></{field}>` payload bytes. Returns Vec<Vec<u8>> because
  some response shapes (ReadResponse) carry multiple ASBIData
  payloads (Status + Values).

* `decode_register_items_response` / `decode_unregister_items_response`
  — parse SOAP body NBFX tokens into typed RegisterItemsResponse /
  UnregisterItemsResponse. The optional ItemCapabilities array (XML-
  serialised, not binary) is recorded as a presence flag for now;
  decoding the individual ItemRegistration records is a follow-up.

* `build_read_request_body(items)` — simplest unary IASBIDataV2
  request, just `<ReadRequest xmlns="..."><Items><ASBIData>...
  </ASBIData></Items></ReadRequest>`.

* `OperationError` — typed error for response-decode failures
  (`MissingField { field }` and codec wraps).

9 new tests: ItemStatus round-trip (default + with id + with status
payload), ItemStatus array round-trip, RegisterItemsResponse
round-trip via synthetic body, ItemCapabilities presence detection,
UnregisterItemsResponse round-trip, multi-payload extraction (ReadResponse-
shape Status + Values), Read body shape correctness, MissingField
error when Status is absent.

Stubbed for next F25 iteration: Write / PublishWriteComplete /
CreateSubscription / AddMonitoredItems / DeleteMonitoredItems /
Publish builders, ReadResponse + WriteResponse decoders (need
WriteValue / RuntimeValue contract codecs), and the AsbClient
network loop.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-05 11:32:36 -04:00
parent a2b8989cbf
commit c4bf0a0a04
4 changed files with 429 additions and 6 deletions
+5 -1
View File
@@ -46,7 +46,11 @@ move to `## Resolved` with a date + commit hash.
**Resolves when:** F19-F26 are all closed and the four DoD bullets above pass.
**Cumulative execution log.** F19 + F23 (`ed17c07`); F24 (`7611d9e`); F20 (`9dfd193`); F22 (`43c10a1`); F21 (`5f98558`); F25 step 1 (`25dbd8d`); F25 step 2 landed in this commit:
**Cumulative execution log.** F19 + F23 (`ed17c07`); F24 (`7611d9e`); F20 (`9dfd193`); F22 (`43c10a1`); F21 (`5f98558`); F25 step 1 (`25dbd8d`); F25 step 2 (`a2b8989`); F25 step 3 landed in this commit:
- F25 step 3: response decoder foundation. New `mxaccess-asb::contracts::ItemStatus` ports `AsbContracts.cs:639-722` — Item (ItemIdentity) + Status (AsbStatus, F24) + ErrorCode u16 + ErrorCodeSpecified bool, in the .NET-WriteToStream order (Item / Status / ErrorCode / ErrorCodeSpecified — NOT the DataMember declaration order). `encode_item_status_array` / `decode_item_status_array` follow the same int32-count + per-element pattern. New `mxaccess-asb::operations::collect_asbidata_payloads(tokens, field_name)` walks an NBFX token stream and pulls out the `<{field_name}><ASBIData>{Bytes}</ASBIData></{field_name}>` payload bytes — handles multiple payloads (e.g. ReadResponse has both Status and Values). New `decode_register_items_response` / `decode_unregister_items_response` parse SOAP bodies into typed responses. New `build_read_request_body` adds the simplest unary IASBIDataV2 request shape. Plus a typed `OperationError` for response-decode failures (missing fields, codec errors). 9 new tests cover ItemStatus round-trip + array round-trip, RegisterItems response with status array, RegisterItems response detecting ItemCapabilities presence, UnregisterItems response, multi-payload extraction (`ReadResponse`-style with Status + Values), Read request body shape (no RegisterItems-only fields), and graceful MissingField error when Status is absent.
**Earlier slices:**
- F25 step 2 (commit `a2b8989`):
- F25 step 2: per-operation request-body builders + `IAsbCustomSerializableType` binary fast-path. F21 NBFX gains `Bytes8/16/32` text records (used by `XmlDictionaryWriter.WriteBase64` for the `<ASBIData>` content). New `mxaccess-asb::contracts::ItemIdentity` ports the binary `WriteToStream` shape from `AsbContracts.cs:594-611`: u16 kind + u16 reference_type + `AsbBinary.WriteUnicodeString` Name + ContextName + u64 Id + u8 IdSpecified. Plus `encode_item_identity_array` / `decode_item_identity_array` mirroring `WriteArrayToStream` (4-byte int32 count + items). New `mxaccess-asb::operations` builds the SOAP body NBFX token streams: `build_register_items_request_body(items, require_id, register_only)` and `build_unregister_items_request_body(items)`. The `<ASBIData>` element is wrapped with raw NBFX `Bytes` records (the binary form of WCF's `WriteBase64`). 14 new tests cover ItemIdentity round-trip (default, with id, unicode), ItemIdentity array round-trip, AsbBinary unicode-string null/empty/value semantics, byte-layout pinning (21-byte minimum for default ItemIdentity, le-int32 array count), and the full RegisterItems → SoapEnvelope → encode → decode → recover-ItemIdentity-array round-trip through the entire stack.
**Earlier slices:**
+135 -1
View File
@@ -21,7 +21,7 @@
//! round-trip — so the per-type cost is small once the
//! [`ItemIdentity`] reference establishes it.
use mxaccess_codec::CodecError;
use mxaccess_codec::{AsbStatus, CodecError};
/// `ItemIdentity` per `AsbContracts.cs:533-633`. Wire layout:
///
@@ -121,6 +121,104 @@ impl ItemIdentity {
}
}
/// `ItemStatus` per `AsbContracts.cs:639-722`. Wire layout (from the
/// `WriteToStream` method at `cs:682-688`):
///
/// | Field | Codec |
/// |----------------|-----------------------------|
/// | `Item` | [`ItemIdentity`] binary form |
/// | `Status` | [`AsbStatus`] binary form |
/// | `ErrorCode` | u16 |
/// | `ErrorCodeSpecified` | u8 (bool) |
///
/// Note the field order on the wire (`Item` then `Status`) is **NOT**
/// the `[DataMember(Order = …)]` declared order — `WriteToStream`
/// hand-picks Item-first, Status-second, then the trailing pair.
/// We mirror that exactly.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ItemStatus {
pub item: ItemIdentity,
pub status: AsbStatus,
pub error_code: u16,
pub error_code_specified: bool,
}
impl ItemStatus {
pub fn encode_into(&self, out: &mut Vec<u8>) {
self.item.encode_into(out);
self.status.encode_into(out);
out.extend_from_slice(&self.error_code.to_le_bytes());
out.push(if self.error_code_specified { 1 } else { 0 });
}
pub fn encode(&self) -> Vec<u8> {
let mut out = Vec::new();
self.encode_into(&mut out);
out
}
pub fn decode(input: &[u8]) -> Result<(Self, usize), CodecError> {
let (item, item_consumed) = ItemIdentity::decode(input)?;
let mut cursor = item_consumed;
let status_tail = input.get(cursor..).ok_or(CodecError::ShortRead {
expected: 5,
actual: 0,
})?;
let (status, status_consumed) = AsbStatus::decode(status_tail)?;
cursor += status_consumed;
let error_code = read_u16_le(input, &mut cursor)?;
let error_code_specified = read_u8(input, &mut cursor)? != 0;
Ok((
Self {
item,
status,
error_code,
error_code_specified,
},
cursor,
))
}
}
/// Decode an array of `ItemStatus`es from the WCF custom-serializer
/// binary form (4-byte int32 count + each item's `WriteToStream`
/// output). Mirrors `ItemStatus.InitializeArrayFromStream`
/// (`cs:702-711`).
pub fn decode_item_status_array(input: &[u8]) -> Result<Vec<ItemStatus>, CodecError> {
let mut cursor = 0usize;
let count = read_i32_le(input, &mut cursor)?;
if count < 0 {
return Err(CodecError::Decode {
offset: 0,
reason: "negative item-status array count",
buffer_len: input.len(),
});
}
let mut out = Vec::with_capacity(count as usize);
for _ in 0..count {
let tail = input.get(cursor..).ok_or(CodecError::ShortRead {
expected: 1,
actual: 0,
})?;
let (item, consumed) = ItemStatus::decode(tail)?;
cursor += consumed;
out.push(item);
}
Ok(out)
}
/// Encode an array of `ItemStatus`es. Mirrors `ItemStatus.WriteArrayToStream`
/// (`cs:713-721`) — 4-byte int32 count + each element's `WriteToStream`.
pub fn encode_item_status_array(items: &[ItemStatus]) -> Vec<u8> {
let mut out = Vec::new();
let count = i32::try_from(items.len()).unwrap_or(i32::MAX);
out.extend_from_slice(&count.to_le_bytes());
for item in items {
item.encode_into(&mut out);
}
out
}
/// Encode an array of `IAsbCustomSerializableType` items per
/// `AsbDataCustomSerializer.WriteObjectContent` array branch
/// (`AsbContracts.cs:1583-1591` — calls `WriteArrayToStream` which
@@ -360,6 +458,42 @@ mod tests {
);
}
#[test]
fn item_status_round_trip() {
let s = ItemStatus {
item: ItemIdentity::absolute_by_name("Tag.X"),
status: AsbStatus {
count: -1,
payload: vec![0xC0],
},
error_code: 0x1234,
error_code_specified: true,
};
let bytes = s.encode();
let (decoded, consumed) = ItemStatus::decode(&bytes).unwrap();
assert_eq!(consumed, bytes.len());
assert_eq!(decoded, s);
}
#[test]
fn item_status_array_round_trip() {
let arr = vec![
ItemStatus::default(),
ItemStatus {
item: ItemIdentity::absolute_by_name("Tag.A"),
status: AsbStatus {
count: 1,
payload: vec![0x01, 0x02],
},
error_code: 42,
error_code_specified: true,
},
];
let bytes = encode_item_status_array(&arr);
let decoded = decode_item_status_array(&bytes).unwrap();
assert_eq!(decoded, arr);
}
#[test]
fn item_identity_array_count_is_le_int32() {
let items = vec![ItemIdentity::default(); 7];
+7 -3
View File
@@ -14,11 +14,15 @@ pub mod envelope;
pub mod operations;
pub use contracts::{
ItemIdentity, ItemIdentityType, ItemReferenceType, decode_item_identity_array,
encode_item_identity_array,
ItemIdentity, ItemIdentityType, ItemReferenceType, ItemStatus, decode_item_identity_array,
decode_item_status_array, encode_item_identity_array, encode_item_status_array,
};
pub use envelope::{
ConnectionValidator, DecodedEnvelope, EnvelopeError, SoapEnvelope, actions, decode_envelope,
encode_envelope,
};
pub use operations::{build_register_items_request_body, build_unregister_items_request_body};
pub use operations::{
OperationError, RegisterItemsResponse, UnregisterItemsResponse, build_read_request_body,
build_register_items_request_body, build_unregister_items_request_body,
collect_asbidata_payloads, decode_register_items_response, decode_unregister_items_response,
};
+282 -1
View File
@@ -36,8 +36,11 @@
//! `InitializeArrayFromStream` shape.
use mxaccess_asb_nettcp::nbfx::{NbfxName, NbfxText, NbfxToken};
use mxaccess_codec::CodecError;
use crate::contracts::{ItemIdentity, encode_item_identity_array};
use crate::contracts::{
ItemIdentity, ItemStatus, decode_item_status_array, encode_item_identity_array,
};
/// Build the NBFX token stream for the body of a `RegisterItemsIn`
/// SOAP envelope. The caller wraps it via [`crate::SoapEnvelope`] +
@@ -75,6 +78,135 @@ pub fn build_register_items_request_body(
)
}
/// Build the NBFX token stream for `ReadIn`. Mirror of
/// `AsbContracts.cs:161-167`:
/// ```xml
/// <ReadRequest xmlns="urn:msg.data.asb.iom:2">
/// <Items><ASBIData>{int32 count + each ItemIdentity}</ASBIData></Items>
/// </ReadRequest>
/// ```
pub fn build_read_request_body(items: &[ItemIdentity]) -> Vec<NbfxToken> {
let payload = encode_item_identity_array(items);
asbidata_request_body("ReadRequest", &[BodyField::asbidata("Items", payload)])
}
/// Decoded `RegisterItemsResponse`. The `Status` array is binary-decoded
/// via `decode_item_status_array`. The optional `ItemCapabilities`
/// (`ItemRegistration[]`) field is **not** decoded here — that contract
/// is regular WCF XML serialization rather than the binary
/// `IAsbCustomSerializableType` fast-path, so it's deferred. Today we
/// just count whether it appeared in the body. See follow-up F28.
#[derive(Debug, Clone, PartialEq)]
pub struct RegisterItemsResponse {
pub status: Vec<ItemStatus>,
/// Whether the `<ItemCapabilities>` element appeared. Decoding the
/// individual `ItemRegistration` records is a future iteration.
pub item_capabilities_present: bool,
}
/// Decoded `UnregisterItemsResponse`. Single field: the per-item
/// `Status` array (`AsbContracts.cs:153-159`).
#[derive(Debug, Clone, PartialEq)]
pub struct UnregisterItemsResponse {
pub status: Vec<ItemStatus>,
}
/// Decode a `RegisterItemsResponse` SOAP body from the NBFX token
/// stream returned by [`crate::decode_envelope`].
pub fn decode_register_items_response(
body_tokens: &[NbfxToken],
) -> Result<RegisterItemsResponse, OperationError> {
let payloads = collect_asbidata_payloads(body_tokens, "Status");
let status_payload = payloads
.into_iter()
.next()
.ok_or(OperationError::MissingField { field: "Status" })?;
let status = decode_item_status_array(&status_payload)?;
let item_capabilities_present = find_element_named(body_tokens, "ItemCapabilities").is_some();
Ok(RegisterItemsResponse {
status,
item_capabilities_present,
})
}
/// Decode an `UnregisterItemsResponse` SOAP body.
pub fn decode_unregister_items_response(
body_tokens: &[NbfxToken],
) -> Result<UnregisterItemsResponse, OperationError> {
let payloads = collect_asbidata_payloads(body_tokens, "Status");
let status_payload = payloads
.into_iter()
.next()
.ok_or(OperationError::MissingField { field: "Status" })?;
let status = decode_item_status_array(&status_payload)?;
Ok(UnregisterItemsResponse { status })
}
/// Walk a SOAP body's NBFX token stream and pull out the
/// `<ASBIData>{Bytes}</ASBIData>` payload bytes for any element named
/// `field_name`. Returns `Vec<Vec<u8>>` because some response shapes
/// have multiple ASBIData payloads (e.g. `ReadResponse` has both
/// `Status` and `Values`).
///
/// Operates on token windows rather than tracking element depth — the
/// response shapes are shallow enough that name-keyed scanning is
/// reliable. Returns whichever payloads it finds; missing fields
/// surface as an empty `Vec`.
pub fn collect_asbidata_payloads(tokens: &[NbfxToken], field_name: &str) -> Vec<Vec<u8>> {
let mut out = Vec::new();
let mut idx = 0;
while idx < tokens.len() {
if let Some(NbfxToken::Element {
name: NbfxName::Inline(local),
..
}) = tokens.get(idx)
{
if local == field_name {
// Skip attributes / namespace decls.
let mut inner = idx + 1;
while matches!(
tokens.get(inner),
Some(NbfxToken::Attribute { .. })
| Some(NbfxToken::DefaultNamespace { .. })
| Some(NbfxToken::NamespaceDeclaration { .. })
) {
inner += 1;
}
if let Some(NbfxToken::Element {
name: NbfxName::Inline(asbidata),
..
}) = tokens.get(inner)
{
if asbidata == "ASBIData" {
if let Some(NbfxToken::Text(NbfxText::Bytes(payload))) =
tokens.get(inner + 1)
{
out.push(payload.clone());
}
}
}
}
}
idx += 1;
}
out
}
fn find_element_named<'a>(tokens: &'a [NbfxToken], name: &str) -> Option<&'a NbfxToken> {
tokens.iter().find(|tok| {
matches!(tok, NbfxToken::Element { name: NbfxName::Inline(local), .. } if local == name)
})
}
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum OperationError {
#[error("response is missing required field {field}")]
MissingField { field: &'static str },
#[error("codec error decoding response: {0}")]
Codec(#[from] CodecError),
}
/// Build the NBFX token stream for `UnregisterItemsIn`. Mirror of
/// `AsbContracts.cs:145-159`:
/// ```xml
@@ -307,6 +439,155 @@ mod tests {
}
}
#[test]
fn read_request_body_uses_correct_outer_element_and_no_register_fields() {
let body = build_read_request_body(&[ItemIdentity::absolute_by_name("Tag.X")]);
assert!(matches!(
&body[0],
NbfxToken::Element { name: NbfxName::Inline(s), .. } if s == "ReadRequest"
));
// The Read contract has only `Items`. RequireId / RegisterOnly /
// Values are NOT present.
for tok in &body {
if let NbfxToken::Element {
name: NbfxName::Inline(local),
..
} = tok
{
assert!(local != "RequireId");
assert!(local != "RegisterOnly");
assert!(local != "Values");
}
}
}
#[test]
fn register_items_response_round_trips_status_array() {
use mxaccess_codec::AsbStatus;
let status = vec![
ItemStatus {
item: ItemIdentity::absolute_by_name("Tag.A"),
status: AsbStatus {
count: 0,
payload: vec![],
},
error_code: 0,
error_code_specified: true,
},
ItemStatus {
item: ItemIdentity::absolute_by_name("Tag.B"),
status: AsbStatus {
count: -1,
payload: vec![0xC0],
},
error_code: 7,
error_code_specified: true,
},
];
let payload = crate::contracts::encode_item_status_array(&status);
// Build a synthetic response body matching the wire shape.
let body = asbidata_request_body(
"RegisterItemsResponse",
&[BodyField::asbidata("Status", payload)],
);
let decoded = decode_register_items_response(&body).unwrap();
assert_eq!(decoded.status, status);
assert!(!decoded.item_capabilities_present);
}
#[test]
fn register_items_response_records_when_item_capabilities_appears() {
use mxaccess_codec::AsbStatus;
let status = vec![ItemStatus {
item: ItemIdentity::absolute_by_name("X"),
status: AsbStatus::default(),
error_code: 0,
error_code_specified: false,
}];
let status_payload = crate::contracts::encode_item_status_array(&status);
// Synthesise a body with both Status and ItemCapabilities elements.
let mut body = asbidata_request_body(
"RegisterItemsResponse",
&[BodyField::asbidata("Status", status_payload)],
);
// Splice in a synthetic ItemCapabilities element before the
// outer EndElement.
let close_idx = body.len() - 1;
body.insert(
close_idx,
NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("ItemCapabilities".to_string()),
},
);
body.insert(close_idx + 1, NbfxToken::EndElement);
let decoded = decode_register_items_response(&body).unwrap();
assert_eq!(decoded.status, status);
assert!(decoded.item_capabilities_present);
}
#[test]
fn unregister_items_response_round_trips() {
use mxaccess_codec::AsbStatus;
let status = vec![ItemStatus {
item: ItemIdentity::absolute_by_name("Tag.Y"),
status: AsbStatus {
count: 1,
payload: vec![0x40],
},
error_code: 0,
error_code_specified: false,
}];
let payload = crate::contracts::encode_item_status_array(&status);
let body = asbidata_request_body(
"UnregisterItemsResponse",
&[BodyField::asbidata("Status", payload)],
);
let decoded = decode_unregister_items_response(&body).unwrap();
assert_eq!(decoded.status, status);
}
#[test]
fn collect_asbidata_payloads_returns_empty_when_field_missing() {
let body = vec![
NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("Empty".to_string()),
},
NbfxToken::EndElement,
];
assert!(collect_asbidata_payloads(&body, "Status").is_empty());
}
#[test]
fn collect_asbidata_payloads_handles_multiple_fields() {
let body = asbidata_request_body(
"ReadResponse",
&[
BodyField::asbidata("Status", vec![1, 2, 3]),
BodyField::asbidata("Values", vec![4, 5, 6, 7]),
],
);
let status = collect_asbidata_payloads(&body, "Status");
let values = collect_asbidata_payloads(&body, "Values");
assert_eq!(status, vec![vec![1u8, 2, 3]]);
assert_eq!(values, vec![vec![4u8, 5, 6, 7]]);
}
#[test]
fn decode_register_items_response_returns_missing_field_when_status_absent() {
let body = asbidata_request_body("RegisterItemsResponse", &[]);
let err = decode_register_items_response(&body).unwrap_err();
assert!(matches!(
err,
OperationError::MissingField { field: "Status" }
));
}
#[test]
fn empty_items_array_still_produces_valid_envelope() {
let body = build_register_items_request_body(&[], false, false);