//! `AsbClient` — `IASBIDataV2` request/response loop over a transport
//! that implements `AsyncRead + AsyncWrite`.
//!
//! Wires together every M5 framing layer:
//!
//! ```text
//! Tokio AsyncRead+AsyncWrite (typically a TcpStream)
//!         │
//!         ▼
//! [MS-NMF] framing (F20: nmf::NmfRecord)
//!         │ preamble: Version → Mode(Duplex) → Via → KnownEncoding(BinaryWithDictionary) → PreambleEnd
//!         │ per-msg:  SizedEnvelope { Multibyte Int31 length + payload bytes }
//!         │ shutdown: End record
//!         ▼
//! [MC-NBFX]/[MC-NBFS] binary XML (F21 + F22)
//!         ▼
//! SOAP-1.2 envelope (F25 step 1: SoapEnvelope, encode_envelope/decode_envelope)
//!         ▼
//! IASBIDataV2 operation contract (F25 steps 2/3: per-op request/response)
//! ```
//!
//! The client is generic over the transport so the network-bound
//! request/response loop can be unit-tested against in-memory streams
//! (`tokio::io::DuplexStream`) without a live ASB endpoint.
//!
//! ## Scope of this iteration (F25 step 4)
//!
//! Implements:
//! * [`AsbClient::new`] — wraps a transport + authenticator into a
//!   ready-to-use client (assumes the preamble has already been sent
//!   or that `send_preamble` will be called next).
//! * [`AsbClient::send_preamble`] — writes the canonical preamble
//!   record sequence and waits for a `PreambleAck` from the peer.
//! * [`AsbClient::send_envelope`] — frames a `SoapEnvelope` in a
//!   `SizedEnvelope` record, writes it, reads the next record from
//!   the peer, decodes the response envelope.
//! * [`AsbClient::send_end`] — writes the NMF `End` record so the
//!   peer can drain cleanly.
//! * Per-operation thin wrappers: [`AsbClient::register_items`],
//!   [`AsbClient::unregister_items`].
//!
//! Originally listed as stubs for the next F25 iteration, and now also
//! implemented in this file:
//! * [`AsbClient::connect`] — the DH `Connect` + `AuthenticateMe`
//!   handshake. Uses the `ConnectRequest` / `ConnectResponse` body
//!   builders (regular WCF XML, not the IAsbCustomSerializableType
//!   fast-path) and authentication-data assembly off F23's
//!   `AsbAuthenticator::create_authentication_data`.
//! * Read / Write / Subscription operation wrappers
//!   ([`AsbClient::read`], [`AsbClient::write`],
//!   [`AsbClient::create_subscription`] and friends). These currently
//!   sign over the NBFX bytes rather than canonical XML — see
//!   [`AsbClient::send_signed_envelope`] for the consequences.

use mxaccess_asb_nettcp::auth::AsbAuthenticator;
use mxaccess_asb_nettcp::nbfx::{DynamicDictionary, NbfxError};
use mxaccess_asb_nettcp::nmf::{self, NmfError, NmfRecord, decode_multibyte_int31};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

use crate::contracts::{ItemIdentity, ItemStatus};
use crate::envelope::{ConnectionValidator, EnvelopeError, SoapEnvelope};
use crate::operations::{
    AddMonitoredItemsResponse, ConnectResponse, CreateSubscriptionResponse,
    DeleteMonitoredItemsResponse, DeleteSubscriptionResponse, MinimalMonitoredItem,
    MinimalWriteValue, OperationError, PublishResponse, PublishWriteCompleteResponse,
    ReadResponse, RegisterItemsResponse, UnregisterItemsResponse, WriteResponse,
    build_add_monitored_items_request_body, build_authenticate_me_request_body,
    build_connect_request_body, build_create_subscription_request_body,
    build_delete_monitored_items_request_body, build_delete_subscription_request_body,
    build_disconnect_request_body, build_keep_alive_request_body, build_publish_request_body,
    build_publish_write_complete_request_body, build_read_request_body,
    build_register_items_request_body, build_unregister_items_request_body,
    build_write_request_body, decode_add_monitored_items_response, decode_connect_response,
    decode_create_subscription_response, decode_delete_monitored_items_response,
    decode_publish_response, decode_publish_write_complete_response, decode_read_response,
    decode_register_items_response, decode_unregister_items_response, decode_write_response,
};
use crate::{actions, decode_envelope, encode_envelope};

/// `IASBIDataV2` request/response client over a Tokio
/// `AsyncRead + AsyncWrite` transport.
pub struct AsbClient<T> {
    /// Byte transport every NMF record is written to / read from.
    stream: T,
    /// Signs outgoing requests and holds the handshake key material.
    authenticator: AsbAuthenticator,
    /// URL sent in the preamble and used as the default `To` header.
    via_uri: String,
    /// Dynamic dictionary threaded through every outgoing envelope encode.
    write_dictionary: DynamicDictionary,
    /// Dynamic dictionary threaded through every incoming envelope decode.
    read_dictionary: DynamicDictionary,
    /// Set once the peer has answered the preamble with `PreambleAck`.
    preamble_sent: bool,
    /// Set by `send_end`; guards against sending after the `End` record.
    closed: bool,
}

// NOTE(review): the generic parameter list was lost in extraction; the
// bounds below are reconstructed from the module docs and the
// read/write/flush calls made on `stream` — confirm against the repo.
impl<T: AsyncRead + AsyncWrite + Unpin> AsbClient<T> {
    /// Build a client around an already-open transport.
    ///
    /// Opening the TCP connection (or test stream) is the caller's job.
    /// `via_uri` is the `net.tcp://host:port/path` URL the peer expects
    /// in the `ViaRecord`. No bytes are exchanged here; both dynamic
    /// dictionaries start empty and the client starts in the
    /// "preamble not sent, not closed" state.
    pub fn new(stream: T, authenticator: AsbAuthenticator, via_uri: impl Into<String>) -> Self {
        let via_uri = via_uri.into();
        Self {
            stream,
            authenticator,
            via_uri,
            write_dictionary: DynamicDictionary::new(),
            read_dictionary: DynamicDictionary::new(),
            preamble_sent: false,
            closed: false,
        }
    }

    /// Mutable access to the wrapped authenticator. Handy in tests and
    /// for flows that must call `accept_connect_response` directly.
    pub fn authenticator_mut(&mut self) -> &mut AsbAuthenticator {
        &mut self.authenticator
    }

    /// Exchange the NMF preamble with the peer.
    ///
    /// Writes the canonical record sequence (`Version 1.0` → `Duplex` →
    /// `Via` → `BinaryWithDictionary` → `PreambleEnd`) and then waits
    /// for one record in reply. Only `PreambleAck` counts as success; a
    /// `Fault` record becomes [`ClientError::Fault`] and anything else
    /// becomes [`ClientError::UnexpectedRecord`].
    ///
    /// Once a call has succeeded, later calls return `Ok(())` without
    /// touching the stream — the preamble is exchanged at most once
    /// per session.
    pub async fn send_preamble(&mut self) -> Result<(), ClientError> {
        if !self.preamble_sent {
            let mut handshake = Vec::new();
            nmf::encode_preamble(&self.via_uri, &mut handshake)?;
            self.stream.write_all(&handshake).await?;
            self.stream.flush().await?;

            match read_record(&mut self.stream).await? {
                NmfRecord::PreambleAck => self.preamble_sent = true,
                NmfRecord::Fault(message) => return Err(ClientError::Fault(message)),
                unexpected => {
                    return Err(ClientError::UnexpectedRecord(format!("{unexpected:?}")));
                }
            }
        }
        Ok(())
    }

    /// Encode a `SoapEnvelope` to NBFX bytes, wrap it in a
    /// `SizedEnvelope` NMF record, write it, then read the peer's reply
    /// (a single `SizedEnvelope` containing the response envelope).
///
/// Both per-session dynamic dictionaries are threaded through the
/// encode (`write_dictionary`) and decode (`read_dictionary`) calls so
/// that recurring strings can be compressed on later messages.
///
/// # Errors
///
/// * [`ClientError::PreambleNotSent`] / [`ClientError::AlreadyClosed`]
///   when called in the wrong session state (checked in that order).
/// * [`ClientError::Fault`] when the peer answers with an NMF `Fault`.
/// * [`ClientError::PeerClosed`] when the peer answers with `End`.
/// * [`ClientError::SoapFault`] when the reply envelope is a SOAP fault.
/// * [`ClientError::UnexpectedRecord`] for any other record type.
pub async fn send_envelope(
    &mut self,
    envelope: &SoapEnvelope,
) -> Result<crate::DecodedEnvelope, ClientError> {
    if !self.preamble_sent {
        return Err(ClientError::PreambleNotSent);
    }
    if self.closed {
        return Err(ClientError::AlreadyClosed);
    }

    // WCF dispatches on the WS-Addressing To header, matching it
    // against the registered service URL; a missing or wrong To yields
    // an AddressFilterMismatch fault. Default it to the Via URL when
    // the caller did not set one.
    let addressed = match envelope.to_uri {
        Some(_) => envelope.clone(),
        None => envelope.clone().with_to(self.via_uri.clone()),
    };

    let encoded = encode_envelope(&addressed, &mut self.write_dictionary)?;
    let mut wire = Vec::new();
    NmfRecord::SizedEnvelope(encoded).encode_into(&mut wire)?;
    self.stream.write_all(&wire).await?;
    self.stream.flush().await?;

    let reply_bytes = match read_record(&mut self.stream).await? {
        NmfRecord::SizedEnvelope(bytes) => bytes,
        NmfRecord::Fault(message) => return Err(ClientError::Fault(message)),
        NmfRecord::End => return Err(ClientError::PeerClosed),
        unexpected => {
            return Err(ClientError::UnexpectedRecord(format!("{unexpected:?}")));
        }
    };

    let decoded = decode_envelope(&reply_bytes, &mut self.read_dictionary)?;
    match detect_soap_fault(&decoded) {
        Some(fault) => Err(fault),
        None => Ok(decoded),
    }
}

/// Sign a request via the authenticator, then send via
/// [`Self::send_envelope`]. Mirrors the .NET pattern at
/// `MxAsbDataClient.cs:205-206` (`authenticator.Sign(request);
/// channel.RegisterItems(request);`).
///
/// **Canonical-XML path**: when `xml_for_signing` is `Some(bytes)`,
/// HMAC is computed over those bytes — the bytes the caller
/// produced via `xml_canonical::emit_*` to match what .NET's
/// `XmlSerializer.Serialize(...)` would emit
/// (`AsbSerialization.cs:12-48`). This is the production path; the
/// server's HMAC recomputation will match.
///
/// **Legacy NBFX-bytes path**: when `xml_for_signing` is `None`,
/// HMAC is computed over the NBFX-encoded SOAP envelope (encoded
/// against a throwaway dictionary so the session dictionary is not
/// disturbed by the probe). Used for operations that don't have an
/// XML emitter yet (Read, Write, Subscription ops). The server will
/// reject these with an `InternalServiceFault` until F28 expands
/// coverage.
///
/// The resulting signature is attached as a `ConnectionValidator`
/// header on a fresh envelope carrying `action` + `body_tokens`.
pub async fn send_signed_envelope(
    &mut self,
    action: &str,
    body_tokens: Vec<mxaccess_asb_nettcp::nbfx::NbfxToken>,
    xml_for_signing: Option<&[u8]>,
    force_hmac: bool,
) -> Result<crate::DecodedEnvelope, ClientError> {
    let signature = if let Some(canonical_xml) = xml_for_signing {
        self.authenticator.sign(canonical_xml, force_hmac)?
    } else {
        // No canonical XML available: sign the NBFX encoding of the
        // unsigned envelope instead.
        let probe = SoapEnvelope::new(action).with_body_tokens(body_tokens.clone());
        let mut scratch_dictionary = DynamicDictionary::new();
        let probe_bytes = encode_envelope(&probe, &mut scratch_dictionary)?;
        self.authenticator.sign(&probe_bytes, force_hmac)?
    };

    let request = SoapEnvelope::new(action)
        .with_body_tokens(body_tokens)
        .with_validator(ConnectionValidator::from_signed(&signature));
    self.send_envelope(&request).await
}

/// Run the full DH `Connect` + `AuthenticateMe` handshake. Mirrors
/// `MxAsbDataClient.cs:84-112`:
///
/// 1. Send `ConnectRequest` (unsigned — the authenticator hasn't
///    received the service key yet) carrying our connection ID +
///    public key.
/// 2. Receive `ConnectResponse` containing the service public key
///    + optional connection lifetime + optional service auth data.
/// 3. Call `authenticator.accept_connect_response(...)` so it can
///    derive the shared secret + decide on Apollo vs Baktun
///    encryption based on the `:V2` lifetime suffix.
/// 4. Build encrypted `ConsumerAuthenticationData` via
///    `authenticator.create_authentication_data()` (this is
///    `Encrypt(local_pub || remote_pub)` — see F23).
/// 5. Send signed `AuthenticateMeRequest` with `forceHmac=true`
///    (one-way, no response expected).
///
/// Caller must have called [`Self::send_preamble`] first. Returns
/// the `ConnectResponse` so callers can inspect the negotiated
/// connection lifetime.
pub async fn connect(&mut self) -> Result<ConnectResponse, ClientError> {
    use crate::xml_canonical::{base64_encode, emit_authenticate_me_xml};

    if !self.preamble_sent {
        return Err(ClientError::PreambleNotSent);
    }

    // Steps 1 + 2: the ConnectRequest goes out unsigned and the reply
    // is decoded into a ConnectResponse.
    let our_connection_id = self.authenticator.connection_id();
    let our_public_key = self.authenticator.local_public_key().to_vec();
    let hello = SoapEnvelope::new(actions::CONNECT)
        .with_body_tokens(build_connect_request_body(our_connection_id, &our_public_key));
    let hello_reply = self.send_envelope(&hello).await?;
    let connect_response =
        decode_connect_response(&hello_reply.body_tokens, &self.read_dictionary)?;

    // Step 3: hand the service public key + lifetime to the
    // authenticator so it can derive the shared secret.
    self.authenticator.accept_connect_response(
        &connect_response.service_public_key,
        connect_response.connection_lifetime.as_deref(),
    );

    // Step 4: encrypted authentication data (local_pub || remote_pub
    // under the derived AES key). Failures surface as ClientError::Auth.
    let auth_blob = self.authenticator.create_authentication_data()?;

    // Step 5: one-way AuthenticateMe with HMAC-SHA1 forced. The HMAC
    // has to cover .NET's `request.ToXml()` canonical form (see
    // `xml_canonical::emit_authenticate_me_xml`), so a validator with
    // empty MAC + IV and the *peeked* message number is rendered into
    // the canonical XML first; `sign()` then consumes that same
    // message number internally.
    let unsigned_validator = ConnectionValidator {
        connection_id: self.authenticator.connection_id(),
        message_number: self.authenticator.peek_next_message_number(),
        mac_base64: String::new(),
        iv_base64: String::new(),
    };
    let ciphertext_b64 = base64_encode(&auth_blob.ciphertext);
    let iv_b64 = base64_encode(&auth_blob.iv);
    let canonical_xml = emit_authenticate_me_xml(&unsigned_validator, &ciphertext_b64, &iv_b64);
    let request_body = build_authenticate_me_request_body(&auth_blob.ciphertext, &auth_blob.iv);

    self.send_signed_envelope_one_way(
        actions::AUTHENTICATE_ME,
        request_body,
        Some(&canonical_xml),
        true,
    )
    .await?;

    Ok(connect_response)
}

/// One-way send: encode + frame + write, but do **not** read a
/// response. Mirrors WCF's `[OperationContract(IsOneWay = true)]`
/// semantics — `KeepAlive`, `Disconnect`, and `AuthenticateMe` all
/// take this path on the .NET side.
///
/// Applies the same state guards and default `To` header as the
/// request/response variant; only the read half is omitted.
pub async fn send_envelope_one_way(
    &mut self,
    envelope: &SoapEnvelope,
) -> Result<(), ClientError> {
    if !self.preamble_sent {
        return Err(ClientError::PreambleNotSent);
    }
    if self.closed {
        return Err(ClientError::AlreadyClosed);
    }

    // Fall back to the Via URL when the caller left `To` unset.
    let addressed = match envelope.to_uri {
        Some(_) => envelope.clone(),
        None => envelope.clone().with_to(self.via_uri.clone()),
    };

    let encoded = encode_envelope(&addressed, &mut self.write_dictionary)?;
    let mut wire = Vec::new();
    NmfRecord::SizedEnvelope(encoded).encode_into(&mut wire)?;
    self.stream.write_all(&wire).await?;
    self.stream.flush().await?;
    Ok(())
}

/// One-way signed send for operations that need a
/// `ConnectionValidator` header. Mirrors `MxAsbDataClient` calls
/// like `authenticator.Sign(...)` followed by an `IsOneWay = true`
/// channel call.
pub async fn send_signed_envelope_one_way(
    &mut self,
    action: &str,
    body_tokens: Vec<mxaccess_asb_nettcp::nbfx::NbfxToken>,
    xml_for_signing: Option<&[u8]>,
    force_hmac: bool,
) -> Result<(), ClientError> {
    let signature = if let Some(canonical_xml) = xml_for_signing {
        // Opt-in diagnostics: when MX_ASB_TRACE_SIGN is set to a valid
        // Unicode value, dump exactly what is about to be signed.
        if std::env::var("MX_ASB_TRACE_SIGN").is_ok() {
            eprintln!("asb.sign.action={action}");
            eprintln!("asb.sign.xml-utf8-len={}", canonical_xml.len());
            eprintln!(
                "asb.sign.xml-text=\n{}",
                String::from_utf8_lossy(canonical_xml)
            );
        }
        self.authenticator.sign(canonical_xml, force_hmac)?
    } else {
        // No canonical XML: sign the NBFX encoding of the unsigned
        // envelope, produced against a throwaway dictionary.
        let probe = SoapEnvelope::new(action).with_body_tokens(body_tokens.clone());
        let mut scratch_dictionary = DynamicDictionary::new();
        let probe_bytes = encode_envelope(&probe, &mut scratch_dictionary)?;
        self.authenticator.sign(&probe_bytes, force_hmac)?
    };

    let request = SoapEnvelope::new(action)
        .with_body_tokens(body_tokens)
        .with_validator(ConnectionValidator::from_signed(&signature));
    self.send_envelope_one_way(&request).await
}

/// `Disconnect` operation — one-way signed envelope carrying a
/// fresh encrypted authentication-data blob, used to close the ASB
/// session cleanly. Mirrors `AsbContracts.cs:22` (one-way op) +
/// `MxAsbDataClient`'s graceful-close path.
///
/// Builds an `AuthenticationData` payload via F23's
/// `create_authentication_data()` (which encrypts `local_pub ||
/// remote_pub` under the derived AES key — same payload shape as
/// `AuthenticateMe` but using a fresh IV).
///
/// Caller should typically follow this with [`Self::send_end`],
/// which writes the NMF `End` record and then shuts the stream down.
pub async fn disconnect(&mut self) -> Result<(), ClientError> {
    use crate::xml_canonical::{base64_encode, emit_disconnect_xml};

    let auth_blob = self.authenticator.create_authentication_data()?;

    // Validator as it looks *before* signing: empty MAC/IV and the
    // message number the upcoming sign() call is expected to use.
    let unsigned_validator = ConnectionValidator {
        connection_id: self.authenticator.connection_id(),
        message_number: self.authenticator.peek_next_message_number(),
        mac_base64: String::new(),
        iv_base64: String::new(),
    };
    let ciphertext_b64 = base64_encode(&auth_blob.ciphertext);
    let iv_b64 = base64_encode(&auth_blob.iv);
    let canonical_xml = emit_disconnect_xml(&unsigned_validator, &ciphertext_b64, &iv_b64);
    let request_body = build_disconnect_request_body(&auth_blob.ciphertext, &auth_blob.iv);

    self.send_signed_envelope_one_way(
        actions::DISCONNECT,
        request_body,
        Some(&canonical_xml),
        false,
    )
    .await
}

/// `KeepAlive` operation — one-way signed envelope with an empty
/// `KeepAliveRequest` body. Used to keep the channel alive past
/// the WCF inactivity timeout (`MxAsbDataClient.cs:683`,
/// `ReliableSession.InactivityTimeout = 30s`).
pub async fn keep_alive(&mut self) -> Result<(), ClientError> {
    let unsigned_validator = ConnectionValidator {
        connection_id: self.authenticator.connection_id(),
        message_number: self.authenticator.peek_next_message_number(),
        mac_base64: String::new(),
        iv_base64: String::new(),
    };
    let canonical_xml = crate::xml_canonical::emit_keep_alive_xml(&unsigned_validator);
    let request_body = build_keep_alive_request_body();

    self.send_signed_envelope_one_way(
        actions::KEEP_ALIVE,
        request_body,
        Some(&canonical_xml),
        false,
    )
    .await
}

/// `Read` operation — sends a signed `ReadIn` SOAP envelope and
/// decodes the `ReadResponse` (Status array + Values array).
///
/// No canonical XML is supplied, so the request is signed over its
/// NBFX encoding (the legacy signing path).
pub async fn read(&mut self, items: &[ItemIdentity]) -> Result<ReadResponse, ClientError> {
    let request_body = build_read_request_body(items);
    let reply = self
        .send_signed_envelope(actions::READ, request_body, None, false)
        .await?;
    decode_read_response(&reply.body_tokens).map_err(ClientError::from)
}

/// `PublishWriteComplete` operation — long-poll the
/// write-completion-callback queue. Mirrors the
/// `[OperationContract(Action = "...:publishWriteCompleteIn")]`
/// at `AsbContracts.cs:42`.
Returns a count of completed writes /// (per-element decode is deferred to a later iteration once a /// live capture confirms the WCF DataContract XML shape). pub async fn publish_write_complete( &mut self, ) -> Result { let body = build_publish_write_complete_request_body(); let response = self .send_signed_envelope(actions::PUBLISH_WRITE_COMPLETE, body, None, false) .await?; Ok(decode_publish_write_complete_response( &response.body_tokens, )?) } /// `DeleteMonitoredItems` operation — removes items from a /// subscription. Returns the per-item Status array. pub async fn delete_monitored_items( &mut self, subscription_id: i64, items: &[MinimalMonitoredItem], ) -> Result { let body = build_delete_monitored_items_request_body(subscription_id, items); let response = self .send_signed_envelope(actions::DELETE_MONITORED_ITEMS, body, None, false) .await?; Ok(decode_delete_monitored_items_response( &response.body_tokens, )?) } /// `Write` operation — sends a signed `WriteIn` SOAP envelope and /// decodes the `WriteResponse` (per-item Status array). /// /// `items.len()` must equal `values.len()`; the .NET reference /// pairs them positionally per `MxAsbDataClient.cs` Write path. /// `write_handle` is an opaque correlation ID echoed in the /// PublishWriteComplete callback (irrelevant for fire-and-forget /// writes; pass `0`). pub async fn write( &mut self, items: &[ItemIdentity], values: &[MinimalWriteValue], write_handle: u32, ) -> Result { let body = build_write_request_body(items, values, write_handle); let response = self .send_signed_envelope(actions::WRITE, body, None, false) .await?; Ok(decode_write_response(&response.body_tokens)?) } /// `CreateSubscription` operation — allocates a server-side /// subscription and returns its ID. Caller threads the ID through /// subsequent `add_monitored_items` / `publish` / /// `delete_subscription` calls. 
pub async fn create_subscription(
    &mut self,
    max_queue_size: i64,
    sample_interval: u64,
) -> Result<CreateSubscriptionResponse, ClientError> {
    let request_body = build_create_subscription_request_body(max_queue_size, sample_interval);
    let reply = self
        .send_signed_envelope(actions::CREATE_SUBSCRIPTION, request_body, None, false)
        .await?;
    // This decoder additionally consults the session read dictionary.
    decode_create_subscription_response(&reply.body_tokens, &self.read_dictionary)
        .map_err(ClientError::from)
}

/// `AddMonitoredItems` operation — adds items to an existing
/// subscription. Uses [`MinimalMonitoredItem`] (Item +
/// SampleInterval + Buffered); optional fields are deferred to a
/// later F25 iteration.
pub async fn add_monitored_items(
    &mut self,
    subscription_id: i64,
    items: &[MinimalMonitoredItem],
    require_id: bool,
) -> Result<AddMonitoredItemsResponse, ClientError> {
    let request_body =
        build_add_monitored_items_request_body(subscription_id, items, require_id);
    let reply = self
        .send_signed_envelope(actions::ADD_MONITORED_ITEMS, request_body, None, false)
        .await?;
    decode_add_monitored_items_response(&reply.body_tokens).map_err(ClientError::from)
}

/// `Publish` operation — long-polls the subscription queue for
/// available samples. Typical pattern is to call this in a loop
/// with a small `tokio::time::timeout` per call.
pub async fn publish(&mut self, subscription_id: i64) -> Result<PublishResponse, ClientError> {
    let request_body = build_publish_request_body(subscription_id);
    let reply = self
        .send_signed_envelope(actions::PUBLISH, request_body, None, false)
        .await?;
    decode_publish_response(&reply.body_tokens).map_err(ClientError::from)
}

/// `DeleteSubscription` operation — releases a server-side
/// subscription. The response body is empty per
/// `AsbContracts.cs:239-240`, so the reply envelope is discarded once
/// it has been received without error.
pub async fn delete_subscription(
    &mut self,
    subscription_id: i64,
) -> Result<DeleteSubscriptionResponse, ClientError> {
    let request_body = build_delete_subscription_request_body(subscription_id);
    self.send_signed_envelope(actions::DELETE_SUBSCRIPTION, request_body, None, false)
        .await
        .map(|_reply| DeleteSubscriptionResponse)
}

/// `RegisterItems` operation — sends a signed `RegisterItemsIn`
/// SOAP envelope and decodes the `RegisterItemsResponse`.
pub async fn register_items(
    &mut self,
    items: &[ItemIdentity],
    require_id: bool,
    register_only: bool,
) -> Result<RegisterItemsResponse, ClientError> {
    // Pre-signing validator: empty MAC/IV plus the message number the
    // upcoming sign() call is expected to use, so the canonical XML
    // that gets signed carries the right header values.
    let unsigned_validator = ConnectionValidator {
        connection_id: self.authenticator.connection_id(),
        message_number: self.authenticator.peek_next_message_number(),
        mac_base64: String::new(),
        iv_base64: String::new(),
    };
    let canonical_xml = crate::xml_canonical::emit_register_items_request_xml(
        &unsigned_validator,
        items,
        require_id,
        register_only,
    );
    let request_body = build_register_items_request_body(items, require_id, register_only);
    let reply = self
        .send_signed_envelope(
            actions::REGISTER_ITEMS,
            request_body,
            Some(&canonical_xml),
            false,
        )
        .await?;
    decode_register_items_response(&reply.body_tokens).map_err(ClientError::from)
}

/// `UnregisterItems` operation — sends a signed `UnregisterItemsIn`
/// SOAP envelope and decodes the `UnregisterItemsResponse`.
pub async fn unregister_items(
    &mut self,
    items: &[ItemIdentity],
) -> Result<UnregisterItemsResponse, ClientError> {
    let unsigned_validator = ConnectionValidator {
        connection_id: self.authenticator.connection_id(),
        message_number: self.authenticator.peek_next_message_number(),
        mac_base64: String::new(),
        iv_base64: String::new(),
    };
    let canonical_xml =
        crate::xml_canonical::emit_unregister_items_request_xml(&unsigned_validator, items);
    let request_body = build_unregister_items_request_body(items);
    let reply = self
        .send_signed_envelope(
            actions::UNREGISTER_ITEMS,
            request_body,
            Some(&canonical_xml),
            false,
        )
        .await?;
    decode_unregister_items_response(&reply.body_tokens).map_err(ClientError::from)
}

/// Send the NMF `End` record so the peer can drain cleanly, then shut
/// down the write side of the transport.
///
/// The client is marked closed as soon as the `End` record has been
/// flushed: from that point the session is over on the wire, so
/// subsequent send attempts return `ClientError::AlreadyClosed` even
/// if the trailing `shutdown()` reports an error (that error is still
/// returned to the caller). Calling this again on a closed client is a
/// no-op that returns `Ok(())`.
pub async fn send_end(&mut self) -> Result<(), ClientError> {
    if self.closed {
        return Ok(());
    }

    let mut end_record = Vec::new();
    NmfRecord::End.encode_into(&mut end_record)?;
    self.stream.write_all(&end_record).await?;
    self.stream.flush().await?;

    // `End` is on the wire: record that before attempting shutdown so a
    // shutdown failure cannot leave the client looking open (which
    // would allow a second `End` or further envelopes to be written).
    self.closed = true;

    self.stream.shutdown().await?;
    Ok(())
}

/// Test-only: surface raw item-status arrays for assertions.
#[doc(hidden)]
pub fn _drain_status(response: &RegisterItemsResponse) -> &[ItemStatus] {
    &response.status
}
}

// ---- async record reader -------------------------------------------------

/// Read exactly `len` payload bytes from `stream`.
///
/// `len` comes straight from a peer-supplied length prefix, so the
/// buffer is grown as bytes actually arrive instead of being allocated
/// up front; only a small, bounded capacity is reserved eagerly. A
/// stream that ends before `len` bytes were received yields an
/// `UnexpectedEof` I/O error.
async fn read_payload<T: AsyncRead + Unpin>(
    stream: &mut T,
    len: usize,
) -> Result<Vec<u8>, ClientError> {
    /// Largest capacity reserved before any payload byte has arrived.
    const PREALLOC_LIMIT: usize = 64 * 1024;

    let mut payload = Vec::with_capacity(len.min(PREALLOC_LIMIT));
    // usize → u64 cannot fail on targets with pointers of 64 bits or
    // fewer; the fallback only exists to avoid an `as` cast.
    let limit = u64::try_from(len).unwrap_or(u64::MAX);
    let received = (&mut *stream)
        .take(limit)
        .read_to_end(&mut payload)
        .await?;
    if received != len {
        return Err(ClientError::Io(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            "early eof",
        )));
    }
    Ok(payload)
}

/// Read one NMF record from `stream` and return the parsed
/// [`NmfRecord`].
///
/// Sized envelopes arrive as `0x06 + Multibyte Int31 length + payload`
/// and faults as `0x08 + Multibyte Int31 length + UTF-8 bytes`, so both
/// are read in a streaming fashion: type byte, then length, then
/// exactly that many payload bytes. The single-byte records (`End`,
/// `UpgradeResponse`, `PreambleAck`, `PreambleEnd`) are recognised from
/// the type byte alone.
///
/// A fault payload that is not valid UTF-8 is reported as
/// `NmfError::Truncated` with stage `"fault-utf8"`.
async fn read_record<T: AsyncRead + Unpin>(stream: &mut T) -> Result<NmfRecord, ClientError> {
    let mut type_byte = [0u8; 1];
    stream.read_exact(&mut type_byte).await?;

    match type_byte[0] {
        0x06 => {
            let len = read_multibyte_int31_async(stream).await?;
            let payload = read_payload(stream, len).await?;
            Ok(NmfRecord::SizedEnvelope(payload))
        }
        0x07 => Ok(NmfRecord::End),
        0x08 => {
            let len = read_multibyte_int31_async(stream).await?;
            let payload = read_payload(stream, len).await?;
            let message = String::from_utf8(payload).map_err(|_| {
                ClientError::Nmf(NmfError::Truncated {
                    need: 1,
                    have: 0,
                    stage: "fault-utf8",
                })
            })?;
            Ok(NmfRecord::Fault(message))
        }
        0x0A => Ok(NmfRecord::UpgradeResponse),
        0x0B => Ok(NmfRecord::PreambleAck),
        0x0C => Ok(NmfRecord::PreambleEnd),
        // Version / Mode / Via / KnownEncoding / Extensible /
        // UnsizedEnvelope / UpgradeRequest: hand off to the synchronous
        // bulk decoder. These are not expected on the server→client
        // path; the arm exists for completeness against unexpected
        // peers.
        other_byte => match NmfRecord::decode(&[other_byte]) {
            Ok((record, _)) => Ok(record),
            Err(_) => {
                // Best-effort: a single `read` of up to 64 bytes is
                // appended to the type byte and decoded as one record.
                // NOTE(review): the decoder's consumed-length is
                // discarded, so any bytes read here that belong to a
                // *following* record are lost, and a short read can
                // make a valid record look truncated. Acceptable only
                // because this path is not hit in normal operation.
                let mut tail = vec![0u8; 64];
                let n = stream.read(&mut tail).await?;
                let mut combined = vec![other_byte];
                if let Some(slice) = tail.get(..n) {
                    combined.extend_from_slice(slice);
                }
                let (record, _) = NmfRecord::decode(&combined)?;
                Ok(record)
            }
        },
    }
}

/// Read a Multibyte Int31 length prefix one byte at a time.
///
/// At most five bytes are consumed; reading stops at the first byte
/// whose continuation bit (`0x80`) is clear. The collected bytes are
/// decoded with `decode_multibyte_int31`, and a value that does not fit
/// in `usize` is reported as `NmfError::NegativeLength`.
async fn read_multibyte_int31_async<T: AsyncRead + Unpin>(
    stream: &mut T,
) -> Result<usize, ClientError> {
    let mut encoded = Vec::with_capacity(5);
    let mut byte = [0u8; 1];
    for _ in 0..5 {
        stream.read_exact(&mut byte).await?;
        encoded.push(byte[0]);
        if byte[0] & 0x80 == 0 {
            break;
        }
    }
    let mut cursor = 0usize;
    let value = decode_multibyte_int31(&encoded, &mut cursor)?;
    usize::try_from(value).map_err(|_| ClientError::Nmf(NmfError::NegativeLength(value)))
}

/// Inspect a `DecodedEnvelope` for a SOAP-1.2 `<Fault>` body and
/// return a typed `ClientError::SoapFault` if found. Returns `None`
/// for non-fault responses so the normal decode path runs.
///
/// WCF surfaces server-side exceptions as a `dispatcher/fault` action
/// envelope wrapping the fault element. The fault structure uses
/// static dict ids (Reason=144, Text=146, Value=154 per `[MC-NBFS]`)
/// which our `nbfs.rs` static table partially mismatches; rather than
/// relying on element-name lookup, any envelope whose Action header
/// contains `/fault` or ends with `:fault` is treated as a fault.
///
/// Heuristics applied to the body's `Chars` text records:
/// * `reason` — the longest `Chars` value (the last one on a tie), or
///   `"(no reason text)"` when the body has none.
/// * `code` — the first `Chars` value that contains a colon or ends
///   with `"Fault"` (typically `s:Receiver` or `s:Sender`). No length
///   check is applied, so a colon-bearing reason that precedes every
///   code value would be picked instead.
fn detect_soap_fault(decoded: &crate::DecodedEnvelope) -> Option<ClientError> {
    use mxaccess_asb_nettcp::nbfx::{NbfxText, NbfxToken};

    let is_fault_action = matches!(
        decoded.action.as_deref(),
        Some(a) if a.contains("/fault") || a.ends_with(":fault")
    );
    if !is_fault_action {
        return None;
    }

    // Collect every free-form Chars record in body order. The Reason
    // text is by far the longest of them; Code/Subcode values are
    // short qname-style strings.
    let chars: Vec<&str> = decoded
        .body_tokens
        .iter()
        .filter_map(|tok| match tok {
            NbfxToken::Text(NbfxText::Chars(s)) => Some(s.as_str()),
            _ => None,
        })
        .collect();

    let reason = chars
        .iter()
        .copied()
        .max_by_key(|s| s.len())
        .map_or_else(|| "(no reason text)".to_string(), |s| s.to_string());

    let code = chars
        .iter()
        .copied()
        .find(|s| s.contains(':') || s.ends_with("Fault"))
        .map(str::to_owned);

    Some(ClientError::SoapFault {
        action: decoded.action.clone().unwrap_or_default(),
        code,
        reason,
    })
}

// ---- error type ----------------------------------------------------------

/// Everything that can go wrong while driving an [`AsbClient`] session:
/// failures bubbled up from the lower layers (I/O, NMF, NBFX, envelope,
/// operation codec, authenticator) plus session-state and peer-reported
/// conditions raised by the client itself.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum ClientError {
    /// Transport read/write/flush/shutdown failure.
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),
    /// NMF record encode/decode failure.
    #[error("NMF framing error: {0}")]
    Nmf(#[from] NmfError),
    /// Binary-XML codec failure.
    #[error("NBFX codec error: {0}")]
    Nbfx(#[from] NbfxError),
    /// SOAP envelope encode/decode failure.
    #[error("envelope error: {0}")]
    Envelope(#[from] EnvelopeError),
    /// Per-operation response decode failure.
    #[error("operation error: {0}")]
    Operation(#[from] OperationError),
    /// Signing or authentication-data failure from the authenticator.
    #[error("auth error: {0}")]
    Auth(#[from] mxaccess_asb_nettcp::auth::AuthError),
    /// An envelope was sent before the preamble exchange succeeded.
    #[error("preamble has not been sent yet — call send_preamble() first")]
    PreambleNotSent,
    /// An envelope was sent after `send_end()` closed the session.
    #[error("client has already been closed via send_end()")]
    AlreadyClosed,
    /// The peer answered with an NMF `Fault` record.
    #[error("peer reported NMF fault: {0}")]
    Fault(String),
    /// SOAP-level fault inside a SizedEnvelope. WCF's
    /// `dispatcher/fault` action wraps a SOAP 1.2 `<Fault>` body
    /// when the service throws an unhandled exception. The action is
    /// preserved so callers can correlate (e.g.
    /// `.../dispatcher/fault` is the generic catch-all;
    /// `.../addressing/fault` indicates AddressFilterMismatch). The
    /// `reason` is the human-readable fault text.
#[error("SOAP fault from peer (action={action}): {reason}")] SoapFault { action: String, code: Option, reason: String, }, #[error("peer closed the channel before sending a response")] PeerClosed, #[error("unexpected NMF record on response path: {0}")] UnexpectedRecord(String), } /// Convenience: `NmfMode` re-export so callers don't have to pull /// `mxaccess-asb-nettcp` directly to specify the preamble mode. /// (Currently fixed to `Duplex` per the canonical preamble.) pub use mxaccess_asb_nettcp::nmf::NmfMode as PreambleMode; #[cfg(test)] #[allow( clippy::unwrap_used, clippy::expect_used, clippy::panic, clippy::indexing_slicing )] mod tests { use super::*; use mxaccess_asb_nettcp::auth::CryptoParameters; use mxaccess_asb_nettcp::nmf::NmfMode; use tokio::io::DuplexStream; fn make_authenticator() -> AsbAuthenticator { AsbAuthenticator::new("test-passphrase", &CryptoParameters::defaults(), [0u8; 16]).unwrap() } /// Spawn a "fake server" task that runs a closure against the peer /// end of an in-memory duplex stream. fn spawn_peer(peer: DuplexStream, work: F) -> tokio::task::JoinHandle where F: FnOnce(DuplexStream) -> Fut + Send + 'static, Fut: std::future::Future + Send + 'static, { tokio::spawn(work(peer)) } /// Read bytes from `stream` until the given total length is met. async fn read_n(stream: &mut DuplexStream, n: usize) -> Vec { let mut out = vec![0u8; n]; stream.read_exact(&mut out).await.unwrap(); out } #[tokio::test] async fn send_preamble_completes_when_peer_returns_preamble_ack() { let (client_end, peer_end) = tokio::io::duplex(1024); let peer_task = spawn_peer(peer_end, |mut peer| async move { // Drain the preamble bytes the client sends. We don't // bother decoding them in the test — just count and ack. // The canonical preamble for a "test://" via is short: 5 // records, ~30-40 bytes. let mut buf = vec![0u8; 256]; let _n = peer.read(&mut buf).await.unwrap(); // Send PreambleAck (0x0B) back. 
peer.write_all(&[0x0Bu8]).await.unwrap();
        peer.flush().await.unwrap();
        peer
    });
    let mut client = AsbClient::new(client_end, make_authenticator(), "test://localhost/path");
    client.send_preamble().await.unwrap();
    let _peer = peer_task.await.unwrap();
}

/// A `Fault` record (0x08) sent in place of the `PreambleAck` must come
/// back from `send_preamble` as `ClientError::Fault` with the peer's text.
#[tokio::test]
async fn send_preamble_surfaces_fault() {
    let (client_end, peer_end) = tokio::io::duplex(1024);
    let peer_task = spawn_peer(peer_end, |mut peer| async move {
        // Consume the preamble bytes the client has written.
        let mut scratch = vec![0u8; 256];
        let _consumed = peer.read(&mut scratch).await.unwrap();

        // Fault record: 0x08 + multibyte-int31 length + UTF-8 bytes.
        let fault_text = b"server-rejected";
        let mut record = vec![0x08u8];
        mxaccess_asb_nettcp::nmf::encode_multibyte_int31(&mut record, fault_text.len() as i32)
            .unwrap();
        record.extend_from_slice(fault_text);
        peer.write_all(&record).await.unwrap();
        peer.flush().await.unwrap();
        peer
    });

    let mut client = AsbClient::new(client_end, make_authenticator(), "test://x/y");
    match client.send_preamble().await.unwrap_err() {
        ClientError::Fault(msg) => assert_eq!(msg, "server-rejected"),
        other => panic!("expected Fault, got {other:?}"),
    }
    let _ = peer_task.await.unwrap();
}

/// Full request/response cycle against an in-memory peer: preamble, ack,
/// one `SizedEnvelope` request, one `SizedEnvelope` reply that the client
/// decodes through `register_items`.
#[tokio::test]
async fn send_envelope_round_trips_through_in_memory_peer() {
    let (client_end, peer_end) = tokio::io::duplex(8192);
    let peer_task = spawn_peer(peer_end, |mut peer| async move {
        use mxaccess_codec::AsbStatus;

        // 1. Drain the preamble.
        let mut scratch = vec![0u8; 256];
        let _consumed = peer.read(&mut scratch).await.unwrap();

        // 2. Answer with PreambleAck.
        peer.write_all(&[0x0Bu8]).await.unwrap();
        peer.flush().await.unwrap();

        // 3. Expect a SizedEnvelope: 0x06 + len + payload.
        let mut record_type = [0u8; 1];
        peer.read_exact(&mut record_type).await.unwrap();
        assert_eq!(record_type[0], 0x06);

        // The length is a varint of at most 5 bytes; the final byte is
        // the first one without the continuation bit (0x80).
        let mut len_bytes = Vec::new();
        while len_bytes.len() < 5 {
            let mut next = [0u8; 1];
            peer.read_exact(&mut next).await.unwrap();
            len_bytes.push(next[0]);
            if next[0] & 0x80 == 0 {
                break;
            }
        }
        let mut offset = 0;
        let payload_len =
            mxaccess_asb_nettcp::nmf::decode_multibyte_int31(&len_bytes, &mut offset).unwrap()
                as usize;
        let _request_payload = read_n(&mut peer, payload_len).await;

        // 4. Reply with a SizedEnvelope carrying a synthetic
        //    RegisterItemsResponse.
        let statuses = vec![ItemStatus {
            item: ItemIdentity::absolute_by_name("Tag.A"),
            status: AsbStatus::default(),
            error_code: 0,
            error_code_specified: true,
        }];
        let status_bytes = crate::contracts::encode_item_status_array(&statuses);
        let reply_body = synthesise_register_response_body(status_bytes);
        let reply = SoapEnvelope::new(actions::REGISTER_ITEMS).with_body_tokens(reply_body);
        let mut reply_dict = DynamicDictionary::new();
        let reply_bytes = encode_envelope(&reply, &mut reply_dict).unwrap();

        let mut record = vec![0x06u8];
        mxaccess_asb_nettcp::nmf::encode_multibyte_int31(&mut record, reply_bytes.len() as i32)
            .unwrap();
        record.extend_from_slice(&reply_bytes);
        peer.write_all(&record).await.unwrap();
        peer.flush().await.unwrap();
        peer
    });

    let mut client = AsbClient::new(client_end, make_authenticator(), "test://h/p");
    client.send_preamble().await.unwrap();

    // No Connect handshake runs in this test, so the client's
    // authenticator has not yet been through `accept_connect_response`.
    // Prime it by hand with a second authenticator's public key before
    // calling `register_items`.
    let bob = make_authenticator();
    client
        .authenticator_mut()
        .accept_connect_response(bob.local_public_key(), None);

    let response = client
        .register_items(&[ItemIdentity::absolute_by_name("Tag.A")], true, false)
        .await
        .unwrap();
    assert_eq!(response.status.len(), 1);
    assert_eq!(response.status[0].item.name.as_deref(), Some("Tag.A"));
    let _ = peer_task.await.unwrap();
}

/// `send_envelope` must refuse to run while the preamble is still unsent.
#[tokio::test]
async fn send_envelope_before_preamble_fails() {
    let (client_end, _peer_end) = tokio::io::duplex(1024);
    let mut client = AsbClient::new(client_end, make_authenticator(), "test://x/y");
    let request = SoapEnvelope::new(actions::READ);
    let failure = client.send_envelope(&request).await.unwrap_err();
    assert!(matches!(failure, ClientError::PreambleNotSent));
}

/// `send_end` must put a single `End` record byte (0x07) on the wire.
#[tokio::test]
async fn send_end_writes_record_07() {
    let (client_end, mut peer_end) = tokio::io::duplex(1024);
    let mut client = AsbClient::new(client_end, make_authenticator(), "test://x/y");
    // `send_end` has no preamble-sent guard; the flag is set only so the
    // client state resembles a realistic session.
    client.preamble_sent = true;
    client.send_end().await.unwrap();

    let mut first_byte = [0u8; 1];
    peer_end.read_exact(&mut first_byte).await.unwrap();
    assert_eq!(first_byte[0], 0x07);
}

/// Helper: build a synthetic body matching the
/// RegisterItemsResponse shape so the test can verify the client
/// extracts the Status array correctly.
fn synthesise_register_response_body( status_payload: Vec, ) -> Vec { use mxaccess_asb_nettcp::nbfx::{NbfxName, NbfxText, NbfxToken}; const IOM_NS: &str = "urn:msg.data.asb.iom:2"; vec![ NbfxToken::Element { prefix: None, name: NbfxName::Inline("RegisterItemsResponse".to_string()), }, NbfxToken::DefaultNamespace { value: NbfxText::Chars(IOM_NS.to_string()), }, NbfxToken::Element { prefix: None, name: NbfxName::Inline("Status".to_string()), }, NbfxToken::Element { prefix: None, name: NbfxName::Inline("ASBIData".to_string()), }, NbfxToken::Text(NbfxText::Bytes(status_payload)), NbfxToken::EndElement, // NbfxToken::EndElement, // NbfxToken::EndElement, // ] } /// Sanity-check that NmfMode re-export matches the upstream type. #[test] fn preamble_mode_reexport_matches_upstream() { assert_eq!(PreambleMode::Duplex as u8, NmfMode::Duplex as u8); } #[tokio::test] async fn keep_alive_writes_one_way_envelope_without_reading_response() { let (client_end, peer_end) = tokio::io::duplex(8192); let peer_task = spawn_peer(peer_end, |mut peer| async move { // Drain preamble + send PreambleAck let mut buf = vec![0u8; 256]; let _n = peer.read(&mut buf).await.unwrap(); peer.write_all(&[0x0Bu8]).await.unwrap(); peer.flush().await.unwrap(); // Drain the KeepAlive SizedEnvelope. Don't reply — one-way op. 
let mut typebyte = [0u8; 1]; peer.read_exact(&mut typebyte).await.unwrap(); assert_eq!(typebyte[0], 0x06); let mut lenbuf = Vec::new(); for _ in 0..5 { let mut b = [0u8; 1]; peer.read_exact(&mut b).await.unwrap(); lenbuf.push(b[0]); if b[0] & 0x80 == 0 { break; } } let mut cursor = 0; let len = mxaccess_asb_nettcp::nmf::decode_multibyte_int31(&lenbuf, &mut cursor) .unwrap() as usize; let _payload = read_n(&mut peer, len).await; peer }); let mut client = AsbClient::new(client_end, make_authenticator(), "test://h/p"); client.send_preamble().await.unwrap(); let bob = make_authenticator(); client .authenticator_mut() .accept_connect_response(bob.local_public_key(), None); client.keep_alive().await.unwrap(); let _ = peer_task.await.unwrap(); } #[tokio::test] async fn read_round_trips_through_in_memory_peer() { use mxaccess_codec::{AsbStatus, AsbVariant, RuntimeValue}; let (client_end, peer_end) = tokio::io::duplex(8192); let peer_task = spawn_peer(peer_end, |mut peer| async move { // 1. Drain preamble + send ack let mut buf = vec![0u8; 256]; let _n = peer.read(&mut buf).await.unwrap(); peer.write_all(&[0x0Bu8]).await.unwrap(); peer.flush().await.unwrap(); // 2. Drain Read SizedEnvelope let mut typebyte = [0u8; 1]; peer.read_exact(&mut typebyte).await.unwrap(); assert_eq!(typebyte[0], 0x06); let mut lenbuf = Vec::new(); for _ in 0..5 { let mut b = [0u8; 1]; peer.read_exact(&mut b).await.unwrap(); lenbuf.push(b[0]); if b[0] & 0x80 == 0 { break; } } let mut cursor = 0; let len = mxaccess_asb_nettcp::nmf::decode_multibyte_int31(&lenbuf, &mut cursor) .unwrap() as usize; let _request_payload = read_n(&mut peer, len).await; // 3. 
Synthesize ReadResponse: Status + Values arrays let status = vec![ItemStatus { item: ItemIdentity::absolute_by_name("Tag.A"), status: AsbStatus::default(), error_code: 0, error_code_specified: true, }]; let values = vec![RuntimeValue { timestamp_binary: 1234, timestamp_specified: true, value: AsbVariant::from_i32(99), status: AsbStatus::default(), }]; let status_payload = crate::contracts::encode_item_status_array(&status); let mut values_payload = (values.len() as i32).to_le_bytes().to_vec(); for v in &values { v.encode_into(&mut values_payload); } let body = synthesise_read_response_body(status_payload, values_payload); let envelope = SoapEnvelope::new(actions::READ).with_body_tokens(body); let mut response_dict = DynamicDictionary::new(); let envelope_bytes = encode_envelope(&envelope, &mut response_dict).unwrap(); let mut frame = vec![0x06u8]; mxaccess_asb_nettcp::nmf::encode_multibyte_int31( &mut frame, envelope_bytes.len() as i32, ) .unwrap(); frame.extend_from_slice(&envelope_bytes); peer.write_all(&frame).await.unwrap(); peer.flush().await.unwrap(); peer }); let mut client = AsbClient::new(client_end, make_authenticator(), "test://h/p"); client.send_preamble().await.unwrap(); let bob = make_authenticator(); client .authenticator_mut() .accept_connect_response(bob.local_public_key(), None); let response = client .read(&[ItemIdentity::absolute_by_name("Tag.A")]) .await .unwrap(); assert_eq!(response.status.len(), 1); assert_eq!(response.values.len(), 1); assert_eq!(response.values[0].timestamp_binary, 1234); let _ = peer_task.await.unwrap(); } #[tokio::test] async fn disconnect_writes_signed_one_way_envelope() { let (client_end, peer_end) = tokio::io::duplex(8192); let peer_task = spawn_peer(peer_end, |mut peer| async move { // Drain preamble + ack let mut buf = vec![0u8; 256]; let _n = peer.read(&mut buf).await.unwrap(); peer.write_all(&[0x0Bu8]).await.unwrap(); peer.flush().await.unwrap(); // Drain Disconnect SizedEnvelope (one-way — no reply needed) let 
mut typebyte = [0u8; 1]; peer.read_exact(&mut typebyte).await.unwrap(); assert_eq!(typebyte[0], 0x06); let mut lenbuf = Vec::new(); for _ in 0..5 { let mut b = [0u8; 1]; peer.read_exact(&mut b).await.unwrap(); lenbuf.push(b[0]); if b[0] & 0x80 == 0 { break; } } let mut cursor = 0; let len = mxaccess_asb_nettcp::nmf::decode_multibyte_int31(&lenbuf, &mut cursor) .unwrap() as usize; let payload = read_n(&mut peer, len).await; // Sanity check: the Disconnect action string appears in the // (NBFX-encoded) envelope bytes. let action = b"http://asb.contracts/20111111:disconnectIn"; assert!(payload.windows(action.len()).any(|w| w == action)); peer }); let mut client = AsbClient::new(client_end, make_authenticator(), "test://h/p"); client.send_preamble().await.unwrap(); // Need a remote public key so create_authentication_data can run. let bob = make_authenticator(); client .authenticator_mut() .accept_connect_response(bob.local_public_key(), None); client.disconnect().await.unwrap(); let _ = peer_task.await.unwrap(); } #[tokio::test] async fn connect_handshake_round_trips_through_in_memory_peer() { let (client_end, peer_end) = tokio::io::duplex(8192); let peer_task = spawn_peer(peer_end, |mut peer| async move { // 1. Drain preamble + send PreambleAck let mut buf = vec![0u8; 256]; let _n = peer.read(&mut buf).await.unwrap(); peer.write_all(&[0x0Bu8]).await.unwrap(); peer.flush().await.unwrap(); // 2. Drain ConnectRequest SizedEnvelope let mut typebyte = [0u8; 1]; peer.read_exact(&mut typebyte).await.unwrap(); assert_eq!(typebyte[0], 0x06); let mut lenbuf = Vec::new(); for _ in 0..5 { let mut b = [0u8; 1]; peer.read_exact(&mut b).await.unwrap(); lenbuf.push(b[0]); if b[0] & 0x80 == 0 { break; } } let mut cursor = 0; let len = mxaccess_asb_nettcp::nmf::decode_multibyte_int31(&lenbuf, &mut cursor) .unwrap() as usize; let _connect_request = read_n(&mut peer, len).await; // 3. 
Build a synthetic ConnectResponse: service_public_key // = matching `bob` so the shared-secret derivation works. let bob = make_authenticator(); let svc_pubkey = bob.local_public_key().to_vec(); let body = synthesise_connect_response_body(svc_pubkey); let envelope = SoapEnvelope::new(actions::CONNECT).with_body_tokens(body); let mut response_dict = DynamicDictionary::new(); let envelope_bytes = encode_envelope(&envelope, &mut response_dict).unwrap(); let mut frame = vec![0x06u8]; mxaccess_asb_nettcp::nmf::encode_multibyte_int31( &mut frame, envelope_bytes.len() as i32, ) .unwrap(); frame.extend_from_slice(&envelope_bytes); peer.write_all(&frame).await.unwrap(); peer.flush().await.unwrap(); // 4. Drain AuthenticateMe one-way SizedEnvelope. let mut typebyte = [0u8; 1]; peer.read_exact(&mut typebyte).await.unwrap(); assert_eq!(typebyte[0], 0x06); let mut lenbuf = Vec::new(); for _ in 0..5 { let mut b = [0u8; 1]; peer.read_exact(&mut b).await.unwrap(); lenbuf.push(b[0]); if b[0] & 0x80 == 0 { break; } } let mut cursor = 0; let len = mxaccess_asb_nettcp::nmf::decode_multibyte_int31(&lenbuf, &mut cursor) .unwrap() as usize; let _authenticate_me = read_n(&mut peer, len).await; peer }); let mut client = AsbClient::new(client_end, make_authenticator(), "test://h/p"); client.send_preamble().await.unwrap(); let response = client.connect().await.unwrap(); // Smoke-check that the response carries our synthesized public // key bytes (length matches a real DH key, ~129 bytes). 
assert!(!response.service_public_key.is_empty()); assert!(response.connection_lifetime.is_none()); let _ = peer_task.await.unwrap(); } fn synthesise_connect_response_body( service_public_key: Vec, ) -> Vec { use mxaccess_asb_nettcp::nbfx::{NbfxName, NbfxText, NbfxToken}; const MESSAGES_NS: &str = "http://asb.contracts.messages/20111111"; vec![ NbfxToken::Element { prefix: None, name: NbfxName::Inline("ConnectResponse".to_string()), }, NbfxToken::DefaultNamespace { value: NbfxText::Chars(MESSAGES_NS.to_string()), }, NbfxToken::Element { prefix: None, name: NbfxName::Inline("ServicePublicKey".to_string()), }, NbfxToken::Element { prefix: None, name: NbfxName::Inline("Data".to_string()), }, NbfxToken::Text(NbfxText::Bytes(service_public_key)), NbfxToken::EndElement, // NbfxToken::EndElement, // NbfxToken::EndElement, // ] } fn synthesise_read_response_body( status_payload: Vec, values_payload: Vec, ) -> Vec { use mxaccess_asb_nettcp::nbfx::{NbfxName, NbfxText, NbfxToken}; const IOM_NS: &str = "urn:msg.data.asb.iom:2"; let mut tokens = vec![ NbfxToken::Element { prefix: None, name: NbfxName::Inline("ReadResponse".to_string()), }, NbfxToken::DefaultNamespace { value: NbfxText::Chars(IOM_NS.to_string()), }, ]; for (name, payload) in [("Status", status_payload), ("Values", values_payload)] { tokens.push(NbfxToken::Element { prefix: None, name: NbfxName::Inline(name.to_string()), }); tokens.push(NbfxToken::Element { prefix: None, name: NbfxName::Inline("ASBIData".to_string()), }); tokens.push(NbfxToken::Text(NbfxText::Bytes(payload))); tokens.push(NbfxToken::EndElement); tokens.push(NbfxToken::EndElement); } tokens.push(NbfxToken::EndElement); tokens } }