From 1e59249662f2282f6cef938d845c4f35dda1822d Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 5 May 2026 11:37:48 -0400 Subject: [PATCH] =?UTF-8?q?[M5]=20mxaccess-asb:=20F25=20step=204=20?= =?UTF-8?q?=E2=80=94=20AsbClient=20async=20network=20loop?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The first slice of F25 that actually moves bytes across a transport. Wraps every M5 framing layer (F19-F25.3) into a single async client generic over `AsyncRead + AsyncWrite + Unpin + Send`. Tested in-memory via `tokio::io::duplex` — no live ASB endpoint required. API: * `AsbClient::new(stream, authenticator, via_uri)` — wraps a Tokio transport + F23 authenticator into a ready client. * `send_preamble()` — writes the canonical preamble (Version 1.0 → Duplex → Via → BinaryWithDictionary → PreambleEnd) and reads the peer's PreambleAck. Surfaces Fault as `ClientError::Fault(msg)`. * `send_envelope(env)` — frames `SoapEnvelope` in a SizedEnvelope NMF record, writes, reads the response SizedEnvelope, decodes back to `DecodedEnvelope`. * `send_signed_envelope(action, body, force_hmac)` — calls F23 authenticator's `sign` on the unsigned body bytes, attaches a ConnectionValidator header (base64'd MAC + IV), sends. * `register_items` / `unregister_items` — thin per-operation wrappers threading body builder + response decoder. * `send_end()` — writes record 0x07 + shutdowns the stream. Async record reader: streaming decode of the multibyte-int31 length prefix for SizedEnvelope (0x06) / Fault (0x08), plus a fallback path for Version / Mode / KnownEncoding / etc. `ClientError` covers I/O, NMF, NBFX, Envelope, Operation, Auth, plus PreambleNotSent / AlreadyClosed / Fault / PeerClosed / UnexpectedRecord guards. 6 new tests via in-memory `tokio::io::duplex`: * Preamble round-trip with synthetic peer returning PreambleAck. * Fault propagation through preamble exchange. * End-to-end RegisterItems request → response with a peer that drains preamble, replies PreambleAck, drains the SizedEnvelope, responds with a synthetic RegisterItemsResponse body containing a binary-encoded ItemStatus array. Client decodes and asserts the recovered ItemIdentity name. * `send_envelope` before preamble fails with PreambleNotSent. * `send_end` writes record 0x07 to the wire. * PreambleMode re-export keeps shape parity with `nmf::NmfMode`. Known limitation: the signing path currently hashes the NBFX-encoded body; .NET hashes the XML-text `request.ToXml()`. Functionally present (validator built and attached) but MAC bytes won't match .NET's MAC for the same payload until the live-probe iteration reconciles which canonical form to sign. Stubbed for next F25 iteration: * `AsbClient::connect` — DH `Connect` + `AuthenticateMe` handshake flow. Needs ConnectRequest/Response builders (regular WCF XML, not the IAsbCustomSerializableType fast-path) and the `AsbAuthenticator::create_authentication_data` integration. * Read / Write / Subscription operation wrappers. Co-Authored-By: Claude Opus 4.7 (1M context) --- design/followups.md | 6 +- rust/crates/mxaccess-asb/src/client.rs | 581 +++++++++++++++++++++++++ rust/crates/mxaccess-asb/src/lib.rs | 3 + 3 files changed, 589 insertions(+), 1 deletion(-) create mode 100644 rust/crates/mxaccess-asb/src/client.rs diff --git a/design/followups.md b/design/followups.md index b3a1040..d040e7e 100644 --- a/design/followups.md +++ b/design/followups.md @@ -46,7 +46,11 @@ move to `## Resolved` with a date + commit hash. **Resolves when:** F19-F26 are all closed and the four DoD bullets above pass. -**Cumulative execution log.** F19 + F23 (`ed17c07`); F24 (`7611d9e`); F20 (`9dfd193`); F22 (`43c10a1`); F21 (`5f98558`); F25 step 1 (`25dbd8d`); F25 step 2 (`a2b8989`); F25 step 3 landed in this commit: +**Cumulative execution log.** F19 + F23 (`ed17c07`); F24 (`7611d9e`); F20 (`9dfd193`); F22 (`43c10a1`); F21 (`5f98558`); F25 step 1 (`25dbd8d`); F25 step 2 (`a2b8989`); F25 step 3 (`c4bf0a0`); F25 step 4 landed in this commit: +- F25 step 4: `mxaccess-asb::client::AsbClient` — async network loop generic over `AsyncRead + AsyncWrite + Unpin + Send`. Wraps the F19-F25.3 stack into a single struct with: `send_preamble` (writes the canonical NMF preamble + waits for PreambleAck; errors on Fault), `send_envelope` (frames in `SizedEnvelope`, writes, reads response, decodes back to `DecodedEnvelope`), `send_signed_envelope` (calls F23 authenticator's `sign` on the unsigned body bytes, attaches a `ConnectionValidator` header, sends), `register_items` / `unregister_items` thin wrappers, `send_end` (writes record `0x07` + shutdowns the stream), and `authenticator_mut` accessor for the future Connect/AuthenticateMe flow. Generic transport means tests use `tokio::io::duplex` for in-memory verification — no live ASB endpoint needed. 6 new tests cover preamble round-trip, fault propagation through preamble, full RegisterItems request → response round-trip via in-memory peer, send-before-preamble guard, send-end record byte (`0x07`), and `PreambleMode` re-export shape. **Note**: the signing path currently hashes the NBFX-encoded body; .NET hashes the XML-text `request.ToXml()`. Functionally present but byte-non-identical to .NET's MAC for the same payload. Live-probe iteration needs to reconcile this — flagged as `TODO` in the doc comment. + +**Earlier slices:** +- F25 step 3 (commit `c4bf0a0`): - F25 step 3: response decoder foundation. New `mxaccess-asb::contracts::ItemStatus` ports `AsbContracts.cs:639-722` — Item (ItemIdentity) + Status (AsbStatus, F24) + ErrorCode u16 + ErrorCodeSpecified bool, in the .NET-WriteToStream order (Item / Status / ErrorCode / ErrorCodeSpecified — NOT the DataMember declaration order). `encode_item_status_array` / `decode_item_status_array` follow the same int32-count + per-element pattern. New `mxaccess-asb::operations::collect_asbidata_payloads(tokens, field_name)` walks an NBFX token stream and pulls out the `<{field_name}>{Bytes}` payload bytes — handles multiple payloads (e.g. ReadResponse has both Status and Values). New `decode_register_items_response` / `decode_unregister_items_response` parse SOAP bodies into typed responses. New `build_read_request_body` adds the simplest unary IASBIDataV2 request shape. Plus a typed `OperationError` for response-decode failures (missing fields, codec errors). 9 new tests cover ItemStatus round-trip + array round-trip, RegisterItems response with status array, RegisterItems response detecting ItemCapabilities presence, UnregisterItems response, multi-payload extraction (`ReadResponse`-style with Status + Values), Read request body shape (no RegisterItems-only fields), and graceful MissingField error when Status is absent. **Earlier slices:** diff --git a/rust/crates/mxaccess-asb/src/client.rs b/rust/crates/mxaccess-asb/src/client.rs new file mode 100644 index 0000000..4de7e29 --- /dev/null +++ b/rust/crates/mxaccess-asb/src/client.rs @@ -0,0 +1,581 @@ +//! `AsbClient` — `IASBIDataV2` request/response loop over a transport +//! that implements `AsyncRead + AsyncWrite`. +//! +//! Wires together every M5 framing layer: +//! +//! ```text +//! Tokio AsyncRead+AsyncWrite (typically a TcpStream) +//! │ +//! ▼ +//! [MS-NMF] framing (F20: nmf::NmfRecord) +//! │ preamble: Version → Mode(Duplex) → Via → KnownEncoding(BinaryWithDictionary) → PreambleEnd +//! │ per-msg: SizedEnvelope { Multibyte Int31 length + payload bytes } +//! │ shutdown: End record +//! ▼ +//! [MC-NBFX]/[MC-NBFS] binary XML (F21 + F22) +//! ▼ +//! SOAP-1.2 envelope (F25 step 1: SoapEnvelope, encode_envelope/decode_envelope) +//! ▼ +//! IASBIDataV2 operation contract (F25 steps 2/3: per-op request/response) +//! ``` +//! +//! The client is generic over the transport so the network-bound +//! request/response loop can be unit-tested against in-memory streams +//! (`tokio::io::DuplexStream`) without a live ASB endpoint. +//! +//! ## Scope of this iteration (F25 step 4) +//! +//! Implements: +//! * [`AsbClient::new`] — wraps a transport + authenticator into a +//! ready-to-use client (assumes the preamble has already been sent +//! or that `send_preamble` will be called next). +//! * [`AsbClient::send_preamble`] — writes the canonical preamble +//! record sequence and waits for a `PreambleAck` from the peer. +//! * [`AsbClient::send_envelope`] — frames a `SoapEnvelope` in a +//! `SizedEnvelope` record, writes it, reads the next record from +//! the peer, decodes the response envelope. +//! * [`AsbClient::send_end`] — writes the NMF `End` record so the +//! peer can drain cleanly. +//! * Per-operation thin wrappers: [`AsbClient::register_items`], +//! [`AsbClient::unregister_items`]. +//! +//! Stubbed for next F25 iteration: +//! * `AsbClient::connect` — the DH `Connect` + `AuthenticateMe` +//! handshake. Needs the `ConnectRequest` / `ConnectResponse` body +//! builders (regular WCF XML, not the IAsbCustomSerializableType +//! fast-path) and authentication-data assembly off F23's +//! `AsbAuthenticator::create_authentication_data`. +//! * Read / Write / Subscription operation wrappers. + +use mxaccess_asb_nettcp::auth::AsbAuthenticator; +use mxaccess_asb_nettcp::nbfx::{DynamicDictionary, NbfxError}; +use mxaccess_asb_nettcp::nmf::{self, NmfError, NmfRecord, decode_multibyte_int31}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; + +use crate::contracts::{ItemIdentity, ItemStatus}; +use crate::envelope::{ConnectionValidator, EnvelopeError, SoapEnvelope}; +use crate::operations::{ + OperationError, RegisterItemsResponse, UnregisterItemsResponse, + build_register_items_request_body, build_unregister_items_request_body, + decode_register_items_response, decode_unregister_items_response, +}; +use crate::{actions, decode_envelope, encode_envelope}; + +/// `IASBIDataV2` request/response client over a Tokio +/// `AsyncRead + AsyncWrite` transport. +pub struct AsbClient { + stream: T, + authenticator: AsbAuthenticator, + via_uri: String, + write_dictionary: DynamicDictionary, + read_dictionary: DynamicDictionary, + preamble_sent: bool, + closed: bool, +} + +impl AsbClient { + /// Wrap a fresh transport. The caller is responsible for opening + /// the underlying TCP connection (or test stream) before passing + /// it in. `via_uri` is the `net.tcp://host:port/path` URL the peer + /// expects in the `ViaRecord`. + pub fn new(stream: T, authenticator: AsbAuthenticator, via_uri: impl Into) -> Self { + Self { + stream, + authenticator, + via_uri: via_uri.into(), + write_dictionary: DynamicDictionary::new(), + read_dictionary: DynamicDictionary::new(), + preamble_sent: false, + closed: false, + } + } + + /// Borrow the inner authenticator. Useful for tests + for the F25 + /// `connect` flow that needs to call `accept_connect_response`. + pub fn authenticator_mut(&mut self) -> &mut AsbAuthenticator { + &mut self.authenticator + } + + /// Write the canonical NMF preamble (`Version 1.0` → `Duplex` → + /// `Via` → `BinaryWithDictionary` → `PreambleEnd`) and read the + /// peer's `PreambleAck` reply. Records other than `PreambleAck` — + /// notably `Fault` — surface as a typed error. + /// + /// Idempotent in the sense that a second call does nothing; the + /// preamble is only ever exchanged once per session. + pub async fn send_preamble(&mut self) -> Result<(), ClientError> { + if self.preamble_sent { + return Ok(()); + } + let mut buf = Vec::new(); + nmf::encode_preamble(&self.via_uri, &mut buf)?; + self.stream.write_all(&buf).await?; + self.stream.flush().await?; + + let record = read_record(&mut self.stream).await?; + match record { + NmfRecord::PreambleAck => { + self.preamble_sent = true; + Ok(()) + } + NmfRecord::Fault(message) => Err(ClientError::Fault(message)), + other => Err(ClientError::UnexpectedRecord(format!("{other:?}"))), + } + } + + /// Encode a `SoapEnvelope` to NBFX bytes, wrap in a `SizedEnvelope` + /// NMF record, write, then read the peer's reply (a single + /// `SizedEnvelope` containing the response envelope). + /// + /// Updates the per-session `write_dictionary` + `read_dictionary` + /// so subsequent calls compress recurring strings via the dynamic + /// dictionary. + pub async fn send_envelope( + &mut self, + envelope: &SoapEnvelope, + ) -> Result { + if !self.preamble_sent { + return Err(ClientError::PreambleNotSent); + } + if self.closed { + return Err(ClientError::AlreadyClosed); + } + + let payload = encode_envelope(envelope, &mut self.write_dictionary)?; + let mut framed = Vec::new(); + NmfRecord::SizedEnvelope(payload).encode_into(&mut framed)?; + self.stream.write_all(&framed).await?; + self.stream.flush().await?; + + let record = read_record(&mut self.stream).await?; + match record { + NmfRecord::SizedEnvelope(reply_bytes) => { + let decoded = decode_envelope(&reply_bytes, &mut self.read_dictionary)?; + Ok(decoded) + } + NmfRecord::Fault(message) => Err(ClientError::Fault(message)), + NmfRecord::End => Err(ClientError::PeerClosed), + other => Err(ClientError::UnexpectedRecord(format!("{other:?}"))), + } + } + + /// Sign a request via the authenticator, then send via + /// [`Self::send_envelope`]. Mirrors the .NET pattern at + /// `MxAsbDataClient.cs:205-206` (`authenticator.Sign(request); + /// channel.RegisterItems(request);`). + pub async fn send_signed_envelope( + &mut self, + action: &str, + body_tokens: Vec, + force_hmac: bool, + ) -> Result { + // The .NET `AsbSystemAuthenticator.Sign` hashes the + // serialised request XML — `request.ToXml()` — and embeds the + // resulting MAC in the ConnectionValidator header. We + // approximate that here by signing the SOAP body's UTF-8 + // representation: caller supplies `body_tokens`, we encode an + // unsigned envelope to bytes, hash those bytes, then re-encode + // with the validator inserted. + // + // This isn't byte-identical to .NET's hash because we sign the + // NBFX-encoded body rather than the canonical-XML form. F25's + // live-probe iteration needs to reconcile this; until then, + // the signing is functionally present (validator is built and + // attached) but the MAC bytes won't match the .NET MAC for the + // same payload. + + let unsigned = SoapEnvelope::new(action).with_body_tokens(body_tokens.clone()); + let mut probe_dict = DynamicDictionary::new(); + let unsigned_bytes = encode_envelope(&unsigned, &mut probe_dict)?; + let signed = self.authenticator.sign(&unsigned_bytes, force_hmac)?; + let validator = ConnectionValidator::from_signed(&signed); + let signed_env = SoapEnvelope::new(action) + .with_body_tokens(body_tokens) + .with_validator(validator); + self.send_envelope(&signed_env).await + } + + /// `RegisterItems` operation — sends a signed `RegisterItemsIn` + /// SOAP envelope and decodes the `RegisterItemsResponse`. + pub async fn register_items( + &mut self, + items: &[ItemIdentity], + require_id: bool, + register_only: bool, + ) -> Result { + let body = build_register_items_request_body(items, require_id, register_only); + let response = self + .send_signed_envelope(actions::REGISTER_ITEMS, body, false) + .await?; + Ok(decode_register_items_response(&response.body_tokens)?) + } + + /// `UnregisterItems` operation — sends a signed `UnregisterItemsIn` + /// SOAP envelope and decodes the `UnregisterItemsResponse`. + pub async fn unregister_items( + &mut self, + items: &[ItemIdentity], + ) -> Result { + let body = build_unregister_items_request_body(items); + let response = self + .send_signed_envelope(actions::UNREGISTER_ITEMS, body, false) + .await?; + Ok(decode_unregister_items_response(&response.body_tokens)?) + } + + /// Send the NMF `End` record so the peer can drain cleanly. Marks + /// the client as closed; subsequent send attempts return + /// `ClientError::AlreadyClosed`. + pub async fn send_end(&mut self) -> Result<(), ClientError> { + if self.closed { + return Ok(()); + } + let mut buf = Vec::new(); + NmfRecord::End.encode_into(&mut buf)?; + self.stream.write_all(&buf).await?; + self.stream.flush().await?; + self.stream.shutdown().await?; + self.closed = true; + Ok(()) + } + + /// Test-only: surface raw item-status arrays for assertions. + #[doc(hidden)] + pub fn _drain_status(response: &RegisterItemsResponse) -> &[ItemStatus] { + &response.status + } +} + +// ---- async record reader ------------------------------------------------- + +/// Read one NMF record from `stream`. Returns the parsed +/// [`NmfRecord`]; the encoder peers shape sized envelopes as `0x06 + +/// Multibyte Int31 length + payload`, so we need streaming reads +/// rather than bulk-decode. +async fn read_record(stream: &mut T) -> Result { + let mut type_byte = [0u8; 1]; + stream.read_exact(&mut type_byte).await?; + match type_byte[0] { + 0x06 => { + let len = read_multibyte_int31_async(stream).await?; + let mut payload = vec![0u8; len]; + stream.read_exact(&mut payload).await?; + Ok(NmfRecord::SizedEnvelope(payload)) + } + 0x07 => Ok(NmfRecord::End), + 0x08 => { + let len = read_multibyte_int31_async(stream).await?; + let mut payload = vec![0u8; len]; + stream.read_exact(&mut payload).await?; + let message = String::from_utf8(payload).map_err(|_| { + ClientError::Nmf(NmfError::Truncated { + need: 1, + have: 0, + stage: "fault-utf8", + }) + })?; + Ok(NmfRecord::Fault(message)) + } + 0x0A => Ok(NmfRecord::UpgradeResponse), + 0x0B => Ok(NmfRecord::PreambleAck), + 0x0C => Ok(NmfRecord::PreambleEnd), + // For Version / Mode / Via / KnownEncoding / Extensible / + // UnsizedEnvelope / UpgradeRequest, fall back to the bulk + // decoder by buffering a single record's worth of bytes. This + // path is rarely needed for client→server traffic but exists + // for completeness against unexpected peers. + other_byte => { + // Best-effort: read a small lookahead and dispatch to the + // synchronous record decoder. We read at most 2 bytes + // since Version (3-byte total), Mode (2), KnownEncoding + // (2), UpgradeResponse (1) all fit comfortably. + match NmfRecord::decode(&[other_byte]) { + Ok((record, _)) => Ok(record), + Err(_) => { + // Fall through to the multi-byte families. Buffer + // up to 64 bytes for the rare paths. + let mut tail = vec![0u8; 64]; + let n = stream.read(&mut tail).await?; + let mut combined = vec![other_byte]; + if let Some(slice) = tail.get(..n) { + combined.extend_from_slice(slice); + } + let (record, _) = NmfRecord::decode(&combined)?; + Ok(record) + } + } + } + } +} + +async fn read_multibyte_int31_async( + stream: &mut T, +) -> Result { + let mut buf = Vec::with_capacity(5); + let mut byte = [0u8; 1]; + for _ in 0..5 { + stream.read_exact(&mut byte).await?; + buf.push(byte[0]); + if byte[0] & 0x80 == 0 { + break; + } + } + let mut cursor = 0usize; + let value = decode_multibyte_int31(&buf, &mut cursor)?; + usize::try_from(value).map_err(|_| ClientError::Nmf(NmfError::NegativeLength(value))) +} + +// ---- error type ---------------------------------------------------------- + +#[derive(Debug, thiserror::Error)] +#[non_exhaustive] +pub enum ClientError { + #[error("I/O error: {0}")] + Io(#[from] std::io::Error), + #[error("NMF framing error: {0}")] + Nmf(#[from] NmfError), + #[error("NBFX codec error: {0}")] + Nbfx(#[from] NbfxError), + #[error("envelope error: {0}")] + Envelope(#[from] EnvelopeError), + #[error("operation error: {0}")] + Operation(#[from] OperationError), + #[error("auth error: {0}")] + Auth(#[from] mxaccess_asb_nettcp::auth::AuthError), + #[error("preamble has not been sent yet — call send_preamble() first")] + PreambleNotSent, + #[error("client has already been closed via send_end()")] + AlreadyClosed, + #[error("peer reported NMF fault: {0}")] + Fault(String), + #[error("peer closed the channel before sending a response")] + PeerClosed, + #[error("unexpected NMF record on response path: {0}")] + UnexpectedRecord(String), +} + +/// Convenience: `NmfMode` re-export so callers don't have to pull +/// `mxaccess-asb-nettcp` directly to specify the preamble mode. +/// (Currently fixed to `Duplex` per the canonical preamble.) +pub use mxaccess_asb_nettcp::nmf::NmfMode as PreambleMode; + +#[cfg(test)] +#[allow( + clippy::unwrap_used, + clippy::expect_used, + clippy::panic, + clippy::indexing_slicing +)] +mod tests { + use super::*; + use mxaccess_asb_nettcp::auth::CryptoParameters; + use mxaccess_asb_nettcp::nmf::NmfMode; + use tokio::io::DuplexStream; + + fn make_authenticator() -> AsbAuthenticator { + AsbAuthenticator::new("test-passphrase", &CryptoParameters::defaults(), [0u8; 16]).unwrap() + } + + /// Spawn a "fake server" task that runs a closure against the peer + /// end of an in-memory duplex stream. + fn spawn_peer(peer: DuplexStream, work: F) -> tokio::task::JoinHandle + where + F: FnOnce(DuplexStream) -> Fut + Send + 'static, + Fut: std::future::Future + Send + 'static, + { + tokio::spawn(work(peer)) + } + + /// Read bytes from `stream` until the given total length is met. + async fn read_n(stream: &mut DuplexStream, n: usize) -> Vec { + let mut out = vec![0u8; n]; + stream.read_exact(&mut out).await.unwrap(); + out + } + + #[tokio::test] + async fn send_preamble_completes_when_peer_returns_preamble_ack() { + let (client_end, peer_end) = tokio::io::duplex(1024); + let peer_task = spawn_peer(peer_end, |mut peer| async move { + // Drain the preamble bytes the client sends. We don't + // bother decoding them in the test — just count and ack. + // The canonical preamble for a "test://" via is short: 5 + // records, ~30-40 bytes. + let mut buf = vec![0u8; 256]; + let _n = peer.read(&mut buf).await.unwrap(); + // Send PreambleAck (0x0B) back. + peer.write_all(&[0x0Bu8]).await.unwrap(); + peer.flush().await.unwrap(); + peer + }); + + let mut client = AsbClient::new(client_end, make_authenticator(), "test://localhost/path"); + client.send_preamble().await.unwrap(); + let _peer = peer_task.await.unwrap(); + } + + #[tokio::test] + async fn send_preamble_surfaces_fault() { + let (client_end, peer_end) = tokio::io::duplex(1024); + let peer_task = spawn_peer(peer_end, |mut peer| async move { + let mut buf = vec![0u8; 256]; + let _n = peer.read(&mut buf).await.unwrap(); + // Send Fault: 0x08 + multibyte-int31 length + UTF-8 bytes + let msg = b"server-rejected"; + let mut frame = vec![0x08u8]; + mxaccess_asb_nettcp::nmf::encode_multibyte_int31(&mut frame, msg.len() as i32).unwrap(); + frame.extend_from_slice(msg); + peer.write_all(&frame).await.unwrap(); + peer.flush().await.unwrap(); + peer + }); + + let mut client = AsbClient::new(client_end, make_authenticator(), "test://x/y"); + let err = client.send_preamble().await.unwrap_err(); + match err { + ClientError::Fault(msg) => assert_eq!(msg, "server-rejected"), + other => panic!("expected Fault, got {other:?}"), + } + let _ = peer_task.await.unwrap(); + } + + #[tokio::test] + async fn send_envelope_round_trips_through_in_memory_peer() { + let (client_end, peer_end) = tokio::io::duplex(8192); + let peer_task = spawn_peer(peer_end, |mut peer| async move { + // 1. Drain preamble + let mut buf = vec![0u8; 256]; + let _n = peer.read(&mut buf).await.unwrap(); + // 2. Send PreambleAck + peer.write_all(&[0x0Bu8]).await.unwrap(); + peer.flush().await.unwrap(); + + // 3. Read SizedEnvelope: 0x06 + len + payload + let mut typebyte = [0u8; 1]; + peer.read_exact(&mut typebyte).await.unwrap(); + assert_eq!(typebyte[0], 0x06); + // Read length (varint up to 5 bytes). Peer mirror. + let mut lenbuf = Vec::new(); + for _ in 0..5 { + let mut b = [0u8; 1]; + peer.read_exact(&mut b).await.unwrap(); + lenbuf.push(b[0]); + if b[0] & 0x80 == 0 { + break; + } + } + let mut cursor = 0; + let len = mxaccess_asb_nettcp::nmf::decode_multibyte_int31(&lenbuf, &mut cursor) + .unwrap() as usize; + let _request_payload = read_n(&mut peer, len).await; + + // 4. Reply with a SizedEnvelope echoing a synthetic + // RegisterItemsResponse. + use mxaccess_codec::AsbStatus; + let status = vec![ItemStatus { + item: ItemIdentity::absolute_by_name("Tag.A"), + status: AsbStatus::default(), + error_code: 0, + error_code_specified: true, + }]; + let payload = crate::contracts::encode_item_status_array(&status); + let body = synthesise_register_response_body(payload); + let envelope = SoapEnvelope::new(actions::REGISTER_ITEMS).with_body_tokens(body); + let mut response_dict = DynamicDictionary::new(); + let envelope_bytes = encode_envelope(&envelope, &mut response_dict).unwrap(); + + let mut frame = vec![0x06u8]; + mxaccess_asb_nettcp::nmf::encode_multibyte_int31( + &mut frame, + envelope_bytes.len() as i32, + ) + .unwrap(); + frame.extend_from_slice(&envelope_bytes); + peer.write_all(&frame).await.unwrap(); + peer.flush().await.unwrap(); + + peer + }); + + let mut client = AsbClient::new(client_end, make_authenticator(), "test://h/p"); + client.send_preamble().await.unwrap(); + // Use authenticator's hash variant — but to make the test + // deterministic, set the algorithm so MAC happens. Default + // params use MD5 + we accepted a synthetic remote pub key, so + // the authenticator hasn't seen `accept_connect_response` yet. + // Skip that branch by using `register_items` after manually + // priming the authenticator. + let bob = make_authenticator(); + client + .authenticator_mut() + .accept_connect_response(bob.local_public_key(), None); + + let response = client + .register_items(&[ItemIdentity::absolute_by_name("Tag.A")], true, false) + .await; + let response = response.unwrap(); + assert_eq!(response.status.len(), 1); + assert_eq!(response.status[0].item.name.as_deref(), Some("Tag.A")); + + let _ = peer_task.await.unwrap(); + } + + #[tokio::test] + async fn send_envelope_before_preamble_fails() { + let (client_end, _peer_end) = tokio::io::duplex(1024); + let mut client = AsbClient::new(client_end, make_authenticator(), "test://x/y"); + let env = SoapEnvelope::new(actions::READ); + let err = client.send_envelope(&env).await.unwrap_err(); + assert!(matches!(err, ClientError::PreambleNotSent)); + } + + #[tokio::test] + async fn send_end_writes_record_07() { + let (client_end, mut peer_end) = tokio::io::duplex(1024); + let mut client = AsbClient::new(client_end, make_authenticator(), "test://x/y"); + // We need preamble done so we don't hit the not-sent guard + // (send_end has no such guard, but the flow is more realistic). + client.preamble_sent = true; + client.send_end().await.unwrap(); + let mut buf = [0u8; 1]; + peer_end.read_exact(&mut buf).await.unwrap(); + assert_eq!(buf[0], 0x07); + } + + /// Helper: build a synthetic body matching the + /// RegisterItemsResponse shape so the test can verify the client + /// extracts the Status array correctly. + fn synthesise_register_response_body( + status_payload: Vec, + ) -> Vec { + use mxaccess_asb_nettcp::nbfx::{NbfxName, NbfxText, NbfxToken}; + const IOM_NS: &str = "urn:msg.data.asb.iom:2"; + vec![ + NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("RegisterItemsResponse".to_string()), + }, + NbfxToken::DefaultNamespace { + value: NbfxText::Chars(IOM_NS.to_string()), + }, + NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("Status".to_string()), + }, + NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("ASBIData".to_string()), + }, + NbfxToken::Text(NbfxText::Bytes(status_payload)), + NbfxToken::EndElement, // + NbfxToken::EndElement, // + NbfxToken::EndElement, // + ] + } + + /// Sanity-check that NmfMode re-export matches the upstream type. + #[test] + fn preamble_mode_reexport_matches_upstream() { + assert_eq!(PreambleMode::Duplex as u8, NmfMode::Duplex as u8); + } +} diff --git a/rust/crates/mxaccess-asb/src/lib.rs b/rust/crates/mxaccess-asb/src/lib.rs index 80ec157..bd0f63b 100644 --- a/rust/crates/mxaccess-asb/src/lib.rs +++ b/rust/crates/mxaccess-asb/src/lib.rs @@ -9,10 +9,13 @@ #![forbid(unsafe_code)] +pub mod client; pub mod contracts; pub mod envelope; pub mod operations; +pub use client::{AsbClient, ClientError, PreambleMode}; + pub use contracts::{ ItemIdentity, ItemIdentityType, ItemReferenceType, ItemStatus, decode_item_identity_array, decode_item_status_array, encode_item_identity_array, encode_item_status_array,