diff --git a/design/followups.md b/design/followups.md
index b3a1040..d040e7e 100644
--- a/design/followups.md
+++ b/design/followups.md
@@ -46,7 +46,11 @@ move to `## Resolved` with a date + commit hash.
**Resolves when:** F19-F26 are all closed and the four DoD bullets above pass.
-**Cumulative execution log.** F19 + F23 (`ed17c07`); F24 (`7611d9e`); F20 (`9dfd193`); F22 (`43c10a1`); F21 (`5f98558`); F25 step 1 (`25dbd8d`); F25 step 2 (`a2b8989`); F25 step 3 landed in this commit:
+**Cumulative execution log.** F19 + F23 (`ed17c07`); F24 (`7611d9e`); F20 (`9dfd193`); F22 (`43c10a1`); F21 (`5f98558`); F25 step 1 (`25dbd8d`); F25 step 2 (`a2b8989`); F25 step 3 (`c4bf0a0`); F25 step 4 landed in this commit:
+- F25 step 4: `mxaccess-asb::client::AsbClient` — async network loop generic over `AsyncRead + AsyncWrite + Unpin + Send`. Wraps the F19-F25.3 stack into a single struct with: `send_preamble` (writes the canonical NMF preamble + waits for PreambleAck; errors on Fault), `send_envelope` (frames in `SizedEnvelope`, writes, reads response, decodes back to `DecodedEnvelope`), `send_signed_envelope` (calls F23 authenticator's `sign` on the unsigned body bytes, attaches a `ConnectionValidator` header, sends), `register_items` / `unregister_items` thin wrappers, `send_end` (writes record `0x07` + shutdowns the stream), and `authenticator_mut` accessor for the future Connect/AuthenticateMe flow. Generic transport means tests use `tokio::io::duplex` for in-memory verification — no live ASB endpoint needed. 6 new tests cover preamble round-trip, fault propagation through preamble, full RegisterItems request → response round-trip via in-memory peer, send-before-preamble guard, send-end record byte (`0x07`), and `PreambleMode` re-export shape. **Note**: the signing path currently hashes the NBFX-encoded body; .NET hashes the XML-text `request.ToXml()`. Functionally present but byte-non-identical to .NET's MAC for the same payload. Live-probe iteration needs to reconcile this — flagged as `TODO` in the doc comment.
+
+**Earlier slices:**
+- F25 step 3 (commit `c4bf0a0`):
- F25 step 3: response decoder foundation. New `mxaccess-asb::contracts::ItemStatus` ports `AsbContracts.cs:639-722` — Item (ItemIdentity) + Status (AsbStatus, F24) + ErrorCode u16 + ErrorCodeSpecified bool, in the .NET-WriteToStream order (Item / Status / ErrorCode / ErrorCodeSpecified — NOT the DataMember declaration order). `encode_item_status_array` / `decode_item_status_array` follow the same int32-count + per-element pattern. New `mxaccess-asb::operations::collect_asbidata_payloads(tokens, field_name)` walks an NBFX token stream and pulls out the `<{field_name}>{Bytes}{field_name}>` payload bytes — handles multiple payloads (e.g. ReadResponse has both Status and Values). New `decode_register_items_response` / `decode_unregister_items_response` parse SOAP bodies into typed responses. New `build_read_request_body` adds the simplest unary IASBIDataV2 request shape. Plus a typed `OperationError` for response-decode failures (missing fields, codec errors). 9 new tests cover ItemStatus round-trip + array round-trip, RegisterItems response with status array, RegisterItems response detecting ItemCapabilities presence, UnregisterItems response, multi-payload extraction (`ReadResponse`-style with Status + Values), Read request body shape (no RegisterItems-only fields), and graceful MissingField error when Status is absent.
**Earlier slices:**
diff --git a/rust/crates/mxaccess-asb/src/client.rs b/rust/crates/mxaccess-asb/src/client.rs
new file mode 100644
index 0000000..4de7e29
--- /dev/null
+++ b/rust/crates/mxaccess-asb/src/client.rs
@@ -0,0 +1,581 @@
+//! `AsbClient` — `IASBIDataV2` request/response loop over a transport
+//! that implements `AsyncRead + AsyncWrite`.
+//!
+//! Wires together every M5 framing layer:
+//!
+//! ```text
+//! Tokio AsyncRead+AsyncWrite (typically a TcpStream)
+//! │
+//! ▼
+//! [MS-NMF] framing (F20: nmf::NmfRecord)
+//! │ preamble: Version → Mode(Duplex) → Via → KnownEncoding(BinaryWithDictionary) → PreambleEnd
+//! │ per-msg: SizedEnvelope { Multibyte Int31 length + payload bytes }
+//! │ shutdown: End record
+//! ▼
+//! [MC-NBFX]/[MC-NBFS] binary XML (F21 + F22)
+//! ▼
+//! SOAP-1.2 envelope (F25 step 1: SoapEnvelope, encode_envelope/decode_envelope)
+//! ▼
+//! IASBIDataV2 operation contract (F25 steps 2/3: per-op request/response)
+//! ```
+//!
+//! The client is generic over the transport so the network-bound
+//! request/response loop can be unit-tested against in-memory streams
+//! (`tokio::io::DuplexStream`) without a live ASB endpoint.
+//!
+//! ## Scope of this iteration (F25 step 4)
+//!
+//! Implements:
+//! * [`AsbClient::new`] — wraps a transport + authenticator into a
+//! ready-to-use client (assumes the preamble has already been sent
+//! or that `send_preamble` will be called next).
+//! * [`AsbClient::send_preamble`] — writes the canonical preamble
+//! record sequence and waits for a `PreambleAck` from the peer.
+//! * [`AsbClient::send_envelope`] — frames a `SoapEnvelope` in a
+//! `SizedEnvelope` record, writes it, reads the next record from
+//! the peer, decodes the response envelope.
+//! * [`AsbClient::send_end`] — writes the NMF `End` record so the
+//! peer can drain cleanly.
+//! * Per-operation thin wrappers: [`AsbClient::register_items`],
+//! [`AsbClient::unregister_items`].
+//!
+//! Stubbed for next F25 iteration:
+//! * `AsbClient::connect` — the DH `Connect` + `AuthenticateMe`
+//! handshake. Needs the `ConnectRequest` / `ConnectResponse` body
+//! builders (regular WCF XML, not the IAsbCustomSerializableType
+//! fast-path) and authentication-data assembly off F23's
+//! `AsbAuthenticator::create_authentication_data`.
+//! * Read / Write / Subscription operation wrappers.
+
+use mxaccess_asb_nettcp::auth::AsbAuthenticator;
+use mxaccess_asb_nettcp::nbfx::{DynamicDictionary, NbfxError};
+use mxaccess_asb_nettcp::nmf::{self, NmfError, NmfRecord, decode_multibyte_int31};
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
+
+use crate::contracts::{ItemIdentity, ItemStatus};
+use crate::envelope::{ConnectionValidator, EnvelopeError, SoapEnvelope};
+use crate::operations::{
+ OperationError, RegisterItemsResponse, UnregisterItemsResponse,
+ build_register_items_request_body, build_unregister_items_request_body,
+ decode_register_items_response, decode_unregister_items_response,
+};
+use crate::{actions, decode_envelope, encode_envelope};
+
+/// `IASBIDataV2` request/response client over a Tokio
+/// `AsyncRead + AsyncWrite` transport.
+pub struct AsbClient {
+ stream: T,
+ authenticator: AsbAuthenticator,
+ via_uri: String,
+ write_dictionary: DynamicDictionary,
+ read_dictionary: DynamicDictionary,
+ preamble_sent: bool,
+ closed: bool,
+}
+
+impl AsbClient {
+ /// Wrap a fresh transport. The caller is responsible for opening
+ /// the underlying TCP connection (or test stream) before passing
+ /// it in. `via_uri` is the `net.tcp://host:port/path` URL the peer
+ /// expects in the `ViaRecord`.
+ pub fn new(stream: T, authenticator: AsbAuthenticator, via_uri: impl Into) -> Self {
+ Self {
+ stream,
+ authenticator,
+ via_uri: via_uri.into(),
+ write_dictionary: DynamicDictionary::new(),
+ read_dictionary: DynamicDictionary::new(),
+ preamble_sent: false,
+ closed: false,
+ }
+ }
+
+ /// Borrow the inner authenticator. Useful for tests + for the F25
+ /// `connect` flow that needs to call `accept_connect_response`.
+ pub fn authenticator_mut(&mut self) -> &mut AsbAuthenticator {
+ &mut self.authenticator
+ }
+
+ /// Write the canonical NMF preamble (`Version 1.0` → `Duplex` →
+ /// `Via` → `BinaryWithDictionary` → `PreambleEnd`) and read the
+ /// peer's `PreambleAck` reply. Records other than `PreambleAck` —
+ /// notably `Fault` — surface as a typed error.
+ ///
+ /// Idempotent in the sense that a second call does nothing; the
+ /// preamble is only ever exchanged once per session.
+ pub async fn send_preamble(&mut self) -> Result<(), ClientError> {
+ if self.preamble_sent {
+ return Ok(());
+ }
+ let mut buf = Vec::new();
+ nmf::encode_preamble(&self.via_uri, &mut buf)?;
+ self.stream.write_all(&buf).await?;
+ self.stream.flush().await?;
+
+ let record = read_record(&mut self.stream).await?;
+ match record {
+ NmfRecord::PreambleAck => {
+ self.preamble_sent = true;
+ Ok(())
+ }
+ NmfRecord::Fault(message) => Err(ClientError::Fault(message)),
+ other => Err(ClientError::UnexpectedRecord(format!("{other:?}"))),
+ }
+ }
+
+ /// Encode a `SoapEnvelope` to NBFX bytes, wrap in a `SizedEnvelope`
+ /// NMF record, write, then read the peer's reply (a single
+ /// `SizedEnvelope` containing the response envelope).
+ ///
+ /// Updates the per-session `write_dictionary` + `read_dictionary`
+ /// so subsequent calls compress recurring strings via the dynamic
+ /// dictionary.
+ pub async fn send_envelope(
+ &mut self,
+ envelope: &SoapEnvelope,
+ ) -> Result {
+ if !self.preamble_sent {
+ return Err(ClientError::PreambleNotSent);
+ }
+ if self.closed {
+ return Err(ClientError::AlreadyClosed);
+ }
+
+ let payload = encode_envelope(envelope, &mut self.write_dictionary)?;
+ let mut framed = Vec::new();
+ NmfRecord::SizedEnvelope(payload).encode_into(&mut framed)?;
+ self.stream.write_all(&framed).await?;
+ self.stream.flush().await?;
+
+ let record = read_record(&mut self.stream).await?;
+ match record {
+ NmfRecord::SizedEnvelope(reply_bytes) => {
+ let decoded = decode_envelope(&reply_bytes, &mut self.read_dictionary)?;
+ Ok(decoded)
+ }
+ NmfRecord::Fault(message) => Err(ClientError::Fault(message)),
+ NmfRecord::End => Err(ClientError::PeerClosed),
+ other => Err(ClientError::UnexpectedRecord(format!("{other:?}"))),
+ }
+ }
+
+ /// Sign a request via the authenticator, then send via
+ /// [`Self::send_envelope`]. Mirrors the .NET pattern at
+ /// `MxAsbDataClient.cs:205-206` (`authenticator.Sign(request);
+ /// channel.RegisterItems(request);`).
+ pub async fn send_signed_envelope(
+ &mut self,
+ action: &str,
+ body_tokens: Vec,
+ force_hmac: bool,
+ ) -> Result {
+ // The .NET `AsbSystemAuthenticator.Sign` hashes the
+ // serialised request XML — `request.ToXml()` — and embeds the
+ // resulting MAC in the ConnectionValidator header. We
+ // approximate that here by signing the SOAP body's UTF-8
+ // representation: caller supplies `body_tokens`, we encode an
+ // unsigned envelope to bytes, hash those bytes, then re-encode
+ // with the validator inserted.
+ //
+ // This isn't byte-identical to .NET's hash because we sign the
+ // NBFX-encoded body rather than the canonical-XML form. F25's
+ // live-probe iteration needs to reconcile this; until then,
+ // the signing is functionally present (validator is built and
+ // attached) but the MAC bytes won't match the .NET MAC for the
+ // same payload.
+
+ let unsigned = SoapEnvelope::new(action).with_body_tokens(body_tokens.clone());
+ let mut probe_dict = DynamicDictionary::new();
+ let unsigned_bytes = encode_envelope(&unsigned, &mut probe_dict)?;
+ let signed = self.authenticator.sign(&unsigned_bytes, force_hmac)?;
+ let validator = ConnectionValidator::from_signed(&signed);
+ let signed_env = SoapEnvelope::new(action)
+ .with_body_tokens(body_tokens)
+ .with_validator(validator);
+ self.send_envelope(&signed_env).await
+ }
+
+ /// `RegisterItems` operation — sends a signed `RegisterItemsIn`
+ /// SOAP envelope and decodes the `RegisterItemsResponse`.
+ pub async fn register_items(
+ &mut self,
+ items: &[ItemIdentity],
+ require_id: bool,
+ register_only: bool,
+ ) -> Result {
+ let body = build_register_items_request_body(items, require_id, register_only);
+ let response = self
+ .send_signed_envelope(actions::REGISTER_ITEMS, body, false)
+ .await?;
+ Ok(decode_register_items_response(&response.body_tokens)?)
+ }
+
+ /// `UnregisterItems` operation — sends a signed `UnregisterItemsIn`
+ /// SOAP envelope and decodes the `UnregisterItemsResponse`.
+ pub async fn unregister_items(
+ &mut self,
+ items: &[ItemIdentity],
+ ) -> Result {
+ let body = build_unregister_items_request_body(items);
+ let response = self
+ .send_signed_envelope(actions::UNREGISTER_ITEMS, body, false)
+ .await?;
+ Ok(decode_unregister_items_response(&response.body_tokens)?)
+ }
+
+ /// Send the NMF `End` record so the peer can drain cleanly. Marks
+ /// the client as closed; subsequent send attempts return
+ /// `ClientError::AlreadyClosed`.
+ pub async fn send_end(&mut self) -> Result<(), ClientError> {
+ if self.closed {
+ return Ok(());
+ }
+ let mut buf = Vec::new();
+ NmfRecord::End.encode_into(&mut buf)?;
+ self.stream.write_all(&buf).await?;
+ self.stream.flush().await?;
+ self.stream.shutdown().await?;
+ self.closed = true;
+ Ok(())
+ }
+
+ /// Test-only: surface raw item-status arrays for assertions.
+ #[doc(hidden)]
+ pub fn _drain_status(response: &RegisterItemsResponse) -> &[ItemStatus] {
+ &response.status
+ }
+}
+
+// ---- async record reader -------------------------------------------------
+
+/// Read one NMF record from `stream`. Returns the parsed
+/// [`NmfRecord`]; the encoder peers shape sized envelopes as `0x06 +
+/// Multibyte Int31 length + payload`, so we need streaming reads
+/// rather than bulk-decode.
+async fn read_record(stream: &mut T) -> Result {
+ let mut type_byte = [0u8; 1];
+ stream.read_exact(&mut type_byte).await?;
+ match type_byte[0] {
+ 0x06 => {
+ let len = read_multibyte_int31_async(stream).await?;
+ let mut payload = vec![0u8; len];
+ stream.read_exact(&mut payload).await?;
+ Ok(NmfRecord::SizedEnvelope(payload))
+ }
+ 0x07 => Ok(NmfRecord::End),
+ 0x08 => {
+ let len = read_multibyte_int31_async(stream).await?;
+ let mut payload = vec![0u8; len];
+ stream.read_exact(&mut payload).await?;
+ let message = String::from_utf8(payload).map_err(|_| {
+ ClientError::Nmf(NmfError::Truncated {
+ need: 1,
+ have: 0,
+ stage: "fault-utf8",
+ })
+ })?;
+ Ok(NmfRecord::Fault(message))
+ }
+ 0x0A => Ok(NmfRecord::UpgradeResponse),
+ 0x0B => Ok(NmfRecord::PreambleAck),
+ 0x0C => Ok(NmfRecord::PreambleEnd),
+ // For Version / Mode / Via / KnownEncoding / Extensible /
+ // UnsizedEnvelope / UpgradeRequest, fall back to the bulk
+ // decoder by buffering a single record's worth of bytes. This
+ // path is rarely needed for client→server traffic but exists
+ // for completeness against unexpected peers.
+ other_byte => {
+ // Best-effort: read a small lookahead and dispatch to the
+ // synchronous record decoder. We read at most 2 bytes
+ // since Version (3-byte total), Mode (2), KnownEncoding
+ // (2), UpgradeResponse (1) all fit comfortably.
+ match NmfRecord::decode(&[other_byte]) {
+ Ok((record, _)) => Ok(record),
+ Err(_) => {
+ // Fall through to the multi-byte families. Buffer
+ // up to 64 bytes for the rare paths.
+ let mut tail = vec![0u8; 64];
+ let n = stream.read(&mut tail).await?;
+ let mut combined = vec![other_byte];
+ if let Some(slice) = tail.get(..n) {
+ combined.extend_from_slice(slice);
+ }
+ let (record, _) = NmfRecord::decode(&combined)?;
+ Ok(record)
+ }
+ }
+ }
+ }
+}
+
+async fn read_multibyte_int31_async(
+ stream: &mut T,
+) -> Result {
+ let mut buf = Vec::with_capacity(5);
+ let mut byte = [0u8; 1];
+ for _ in 0..5 {
+ stream.read_exact(&mut byte).await?;
+ buf.push(byte[0]);
+ if byte[0] & 0x80 == 0 {
+ break;
+ }
+ }
+ let mut cursor = 0usize;
+ let value = decode_multibyte_int31(&buf, &mut cursor)?;
+ usize::try_from(value).map_err(|_| ClientError::Nmf(NmfError::NegativeLength(value)))
+}
+
+// ---- error type ----------------------------------------------------------
+
+#[derive(Debug, thiserror::Error)]
+#[non_exhaustive]
+pub enum ClientError {
+ #[error("I/O error: {0}")]
+ Io(#[from] std::io::Error),
+ #[error("NMF framing error: {0}")]
+ Nmf(#[from] NmfError),
+ #[error("NBFX codec error: {0}")]
+ Nbfx(#[from] NbfxError),
+ #[error("envelope error: {0}")]
+ Envelope(#[from] EnvelopeError),
+ #[error("operation error: {0}")]
+ Operation(#[from] OperationError),
+ #[error("auth error: {0}")]
+ Auth(#[from] mxaccess_asb_nettcp::auth::AuthError),
+ #[error("preamble has not been sent yet — call send_preamble() first")]
+ PreambleNotSent,
+ #[error("client has already been closed via send_end()")]
+ AlreadyClosed,
+ #[error("peer reported NMF fault: {0}")]
+ Fault(String),
+ #[error("peer closed the channel before sending a response")]
+ PeerClosed,
+ #[error("unexpected NMF record on response path: {0}")]
+ UnexpectedRecord(String),
+}
+
+/// Convenience: `NmfMode` re-export so callers don't have to pull
+/// `mxaccess-asb-nettcp` directly to specify the preamble mode.
+/// (Currently fixed to `Duplex` per the canonical preamble.)
+pub use mxaccess_asb_nettcp::nmf::NmfMode as PreambleMode;
+
+#[cfg(test)]
+#[allow(
+ clippy::unwrap_used,
+ clippy::expect_used,
+ clippy::panic,
+ clippy::indexing_slicing
+)]
+mod tests {
+ use super::*;
+ use mxaccess_asb_nettcp::auth::CryptoParameters;
+ use mxaccess_asb_nettcp::nmf::NmfMode;
+ use tokio::io::DuplexStream;
+
+ fn make_authenticator() -> AsbAuthenticator {
+ AsbAuthenticator::new("test-passphrase", &CryptoParameters::defaults(), [0u8; 16]).unwrap()
+ }
+
+ /// Spawn a "fake server" task that runs a closure against the peer
+ /// end of an in-memory duplex stream.
+ fn spawn_peer(peer: DuplexStream, work: F) -> tokio::task::JoinHandle
+ where
+ F: FnOnce(DuplexStream) -> Fut + Send + 'static,
+ Fut: std::future::Future
+ ]
+ }
+
+ /// Sanity-check that NmfMode re-export matches the upstream type.
+ #[test]
+ fn preamble_mode_reexport_matches_upstream() {
+ assert_eq!(PreambleMode::Duplex as u8, NmfMode::Duplex as u8);
+ }
+}
diff --git a/rust/crates/mxaccess-asb/src/lib.rs b/rust/crates/mxaccess-asb/src/lib.rs
index 80ec157..bd0f63b 100644
--- a/rust/crates/mxaccess-asb/src/lib.rs
+++ b/rust/crates/mxaccess-asb/src/lib.rs
@@ -9,10 +9,13 @@
#![forbid(unsafe_code)]
+pub mod client;
pub mod contracts;
pub mod envelope;
pub mod operations;
+pub use client::{AsbClient, ClientError, PreambleMode};
+
pub use contracts::{
ItemIdentity, ItemIdentityType, ItemReferenceType, ItemStatus, decode_item_identity_array,
decode_item_status_array, encode_item_identity_array, encode_item_status_array,