diff --git a/design/followups.md b/design/followups.md index d040e7e..f318eb7 100644 --- a/design/followups.md +++ b/design/followups.md @@ -46,7 +46,11 @@ move to `## Resolved` with a date + commit hash. **Resolves when:** F19-F26 are all closed and the four DoD bullets above pass. -**Cumulative execution log.** F19 + F23 (`ed17c07`); F24 (`7611d9e`); F20 (`9dfd193`); F22 (`43c10a1`); F21 (`5f98558`); F25 step 1 (`25dbd8d`); F25 step 2 (`a2b8989`); F25 step 3 (`c4bf0a0`); F25 step 4 landed in this commit: +**Cumulative execution log.** F19 + F23 (`ed17c07`); F24 (`7611d9e`); F20 (`9dfd193`); F22 (`43c10a1`); F21 (`5f98558`); F25 step 1 (`25dbd8d`); F25 step 2 (`a2b8989`); F25 step 3 (`c4bf0a0`); F25 step 4 (`1e59249`); F25 step 5 landed in this commit: +- F25 step 5: extends `AsbClient` with one-way operation support + `KeepAlive` + `Read` wrappers. New `send_envelope_one_way` / `send_signed_envelope_one_way` mirror WCF's `[OperationContract(IsOneWay = true)]` semantics — write the SizedEnvelope and return immediately. New `client::keep_alive` ports `MxAsbDataClient`'s channel inactivity-keepalive (`AsbContracts.cs:117` — empty wrapper element + ConnectionValidator header). New `client::read` + `decode_read_response` (in operations) decode `Status` (`Vec`) + `Values` (`Vec`) from the dual-``-payload `ReadResponse` body shape. RuntimeValue array decode mirrors `AsbContracts.cs:771-780` (4-byte int32 count + per-element `WriteToStream`). 5 new tests: keep_alive body shape (empty wrapper), ReadResponse round-trip with Status + Values, ReadResponse-with-no-Values graceful handling, plus two end-to-end client tests via `tokio::io::duplex` peer (keep_alive one-way send drains the SizedEnvelope but produces no response, read round-trips Status + Values from a synthetic ReadResponse). + +**Earlier slices:** +- F25 step 4 (commit `1e59249`): - F25 step 4: `mxaccess-asb::client::AsbClient` — async network loop generic over `AsyncRead + AsyncWrite + Unpin + Send`. Wraps the F19-F25.3 stack into a single struct with: `send_preamble` (writes the canonical NMF preamble + waits for PreambleAck; errors on Fault), `send_envelope` (frames in `SizedEnvelope`, writes, reads response, decodes back to `DecodedEnvelope`), `send_signed_envelope` (calls F23 authenticator's `sign` on the unsigned body bytes, attaches a `ConnectionValidator` header, sends), `register_items` / `unregister_items` thin wrappers, `send_end` (writes record `0x07` + shutdowns the stream), and `authenticator_mut` accessor for the future Connect/AuthenticateMe flow. Generic transport means tests use `tokio::io::duplex` for in-memory verification — no live ASB endpoint needed. 6 new tests cover preamble round-trip, fault propagation through preamble, full RegisterItems request → response round-trip via in-memory peer, send-before-preamble guard, send-end record byte (`0x07`), and `PreambleMode` re-export shape. **Note**: the signing path currently hashes the NBFX-encoded body; .NET hashes the XML-text `request.ToXml()`. Functionally present but byte-non-identical to .NET's MAC for the same payload. Live-probe iteration needs to reconcile this — flagged as `TODO` in the doc comment. **Earlier slices:** diff --git a/rust/crates/mxaccess-asb/src/client.rs b/rust/crates/mxaccess-asb/src/client.rs index 4de7e29..6bc71ec 100644 --- a/rust/crates/mxaccess-asb/src/client.rs +++ b/rust/crates/mxaccess-asb/src/client.rs @@ -55,9 +55,10 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use crate::contracts::{ItemIdentity, ItemStatus}; use crate::envelope::{ConnectionValidator, EnvelopeError, SoapEnvelope}; use crate::operations::{ - OperationError, RegisterItemsResponse, UnregisterItemsResponse, - build_register_items_request_body, build_unregister_items_request_body, - decode_register_items_response, decode_unregister_items_response, + OperationError, ReadResponse, RegisterItemsResponse, UnregisterItemsResponse, + build_keep_alive_request_body, build_read_request_body, build_register_items_request_body, + build_unregister_items_request_body, decode_read_response, decode_register_items_response, + decode_unregister_items_response, }; use crate::{actions, decode_envelope, encode_envelope}; @@ -195,6 +196,69 @@ impl AsbClient { self.send_envelope(&signed_env).await } + /// One-way send: encode + frame + write, but do **not** read a + /// response. Mirrors WCF's `[OperationContract(IsOneWay = true)]` + /// semantics — `KeepAlive`, `Disconnect`, and `AuthenticateMe` all + /// take this path on the .NET side. + pub async fn send_envelope_one_way( + &mut self, + envelope: &SoapEnvelope, + ) -> Result<(), ClientError> { + if !self.preamble_sent { + return Err(ClientError::PreambleNotSent); + } + if self.closed { + return Err(ClientError::AlreadyClosed); + } + let payload = encode_envelope(envelope, &mut self.write_dictionary)?; + let mut framed = Vec::new(); + NmfRecord::SizedEnvelope(payload).encode_into(&mut framed)?; + self.stream.write_all(&framed).await?; + self.stream.flush().await?; + Ok(()) + } + + /// One-way signed send for operations that need a + /// `ConnectionValidator` header. Mirrors `MxAsbDataClient` calls + /// like `authenticator.Sign(...)` followed by an `IsOneWay = true` + /// channel call. + pub async fn send_signed_envelope_one_way( + &mut self, + action: &str, + body_tokens: Vec, + force_hmac: bool, + ) -> Result<(), ClientError> { + let unsigned = SoapEnvelope::new(action).with_body_tokens(body_tokens.clone()); + let mut probe_dict = DynamicDictionary::new(); + let unsigned_bytes = encode_envelope(&unsigned, &mut probe_dict)?; + let signed = self.authenticator.sign(&unsigned_bytes, force_hmac)?; + let validator = ConnectionValidator::from_signed(&signed); + let signed_env = SoapEnvelope::new(action) + .with_body_tokens(body_tokens) + .with_validator(validator); + self.send_envelope_one_way(&signed_env).await + } + + /// `KeepAlive` operation — one-way signed envelope with an empty + /// `KeepAliveRequest` body. Used to keep the channel alive past + /// the WCF inactivity timeout (`MxAsbDataClient.cs:683`, + /// `ReliableSession.InactivityTimeout = 30s`). + pub async fn keep_alive(&mut self) -> Result<(), ClientError> { + let body = build_keep_alive_request_body(); + self.send_signed_envelope_one_way(actions::KEEP_ALIVE, body, false) + .await + } + + /// `Read` operation — sends a signed `ReadIn` SOAP envelope and + /// decodes the `ReadResponse` (Status array + Values array). + pub async fn read(&mut self, items: &[ItemIdentity]) -> Result { + let body = build_read_request_body(items); + let response = self + .send_signed_envelope(actions::READ, body, false) + .await?; + Ok(decode_read_response(&response.body_tokens)?) + } + /// `RegisterItems` operation — sends a signed `RegisterItemsIn` /// SOAP envelope and decodes the `RegisterItemsResponse`. pub async fn register_items( @@ -578,4 +642,159 @@ mod tests { fn preamble_mode_reexport_matches_upstream() { assert_eq!(PreambleMode::Duplex as u8, NmfMode::Duplex as u8); } + + #[tokio::test] + async fn keep_alive_writes_one_way_envelope_without_reading_response() { + let (client_end, peer_end) = tokio::io::duplex(8192); + let peer_task = spawn_peer(peer_end, |mut peer| async move { + // Drain preamble + send PreambleAck + let mut buf = vec![0u8; 256]; + let _n = peer.read(&mut buf).await.unwrap(); + peer.write_all(&[0x0Bu8]).await.unwrap(); + peer.flush().await.unwrap(); + // Drain the KeepAlive SizedEnvelope. Don't reply — one-way op. + let mut typebyte = [0u8; 1]; + peer.read_exact(&mut typebyte).await.unwrap(); + assert_eq!(typebyte[0], 0x06); + let mut lenbuf = Vec::new(); + for _ in 0..5 { + let mut b = [0u8; 1]; + peer.read_exact(&mut b).await.unwrap(); + lenbuf.push(b[0]); + if b[0] & 0x80 == 0 { + break; + } + } + let mut cursor = 0; + let len = mxaccess_asb_nettcp::nmf::decode_multibyte_int31(&lenbuf, &mut cursor) + .unwrap() as usize; + let _payload = read_n(&mut peer, len).await; + peer + }); + + let mut client = AsbClient::new(client_end, make_authenticator(), "test://h/p"); + client.send_preamble().await.unwrap(); + let bob = make_authenticator(); + client + .authenticator_mut() + .accept_connect_response(bob.local_public_key(), None); + + client.keep_alive().await.unwrap(); + let _ = peer_task.await.unwrap(); + } + + #[tokio::test] + async fn read_round_trips_through_in_memory_peer() { + use mxaccess_codec::{AsbStatus, AsbVariant, RuntimeValue}; + + let (client_end, peer_end) = tokio::io::duplex(8192); + let peer_task = spawn_peer(peer_end, |mut peer| async move { + // 1. Drain preamble + send ack + let mut buf = vec![0u8; 256]; + let _n = peer.read(&mut buf).await.unwrap(); + peer.write_all(&[0x0Bu8]).await.unwrap(); + peer.flush().await.unwrap(); + + // 2. Drain Read SizedEnvelope + let mut typebyte = [0u8; 1]; + peer.read_exact(&mut typebyte).await.unwrap(); + assert_eq!(typebyte[0], 0x06); + let mut lenbuf = Vec::new(); + for _ in 0..5 { + let mut b = [0u8; 1]; + peer.read_exact(&mut b).await.unwrap(); + lenbuf.push(b[0]); + if b[0] & 0x80 == 0 { + break; + } + } + let mut cursor = 0; + let len = mxaccess_asb_nettcp::nmf::decode_multibyte_int31(&lenbuf, &mut cursor) + .unwrap() as usize; + let _request_payload = read_n(&mut peer, len).await; + + // 3. Synthesize ReadResponse: Status + Values arrays + let status = vec![ItemStatus { + item: ItemIdentity::absolute_by_name("Tag.A"), + status: AsbStatus::default(), + error_code: 0, + error_code_specified: true, + }]; + let values = vec![RuntimeValue { + timestamp_binary: 1234, + timestamp_specified: true, + value: AsbVariant::from_i32(99), + status: AsbStatus::default(), + }]; + let status_payload = crate::contracts::encode_item_status_array(&status); + let mut values_payload = (values.len() as i32).to_le_bytes().to_vec(); + for v in &values { + v.encode_into(&mut values_payload); + } + let body = synthesise_read_response_body(status_payload, values_payload); + let envelope = SoapEnvelope::new(actions::READ).with_body_tokens(body); + let mut response_dict = DynamicDictionary::new(); + let envelope_bytes = encode_envelope(&envelope, &mut response_dict).unwrap(); + + let mut frame = vec![0x06u8]; + mxaccess_asb_nettcp::nmf::encode_multibyte_int31( + &mut frame, + envelope_bytes.len() as i32, + ) + .unwrap(); + frame.extend_from_slice(&envelope_bytes); + peer.write_all(&frame).await.unwrap(); + peer.flush().await.unwrap(); + peer + }); + + let mut client = AsbClient::new(client_end, make_authenticator(), "test://h/p"); + client.send_preamble().await.unwrap(); + let bob = make_authenticator(); + client + .authenticator_mut() + .accept_connect_response(bob.local_public_key(), None); + + let response = client + .read(&[ItemIdentity::absolute_by_name("Tag.A")]) + .await + .unwrap(); + assert_eq!(response.status.len(), 1); + assert_eq!(response.values.len(), 1); + assert_eq!(response.values[0].timestamp_binary, 1234); + + let _ = peer_task.await.unwrap(); + } + + fn synthesise_read_response_body( + status_payload: Vec, + values_payload: Vec, + ) -> Vec { + use mxaccess_asb_nettcp::nbfx::{NbfxName, NbfxText, NbfxToken}; + const IOM_NS: &str = "urn:msg.data.asb.iom:2"; + let mut tokens = vec![ + NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("ReadResponse".to_string()), + }, + NbfxToken::DefaultNamespace { + value: NbfxText::Chars(IOM_NS.to_string()), + }, + ]; + for (name, payload) in [("Status", status_payload), ("Values", values_payload)] { + tokens.push(NbfxToken::Element { + prefix: None, + name: NbfxName::Inline(name.to_string()), + }); + tokens.push(NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("ASBIData".to_string()), + }); + tokens.push(NbfxToken::Text(NbfxText::Bytes(payload))); + tokens.push(NbfxToken::EndElement); + tokens.push(NbfxToken::EndElement); + } + tokens.push(NbfxToken::EndElement); + tokens + } } diff --git a/rust/crates/mxaccess-asb/src/lib.rs b/rust/crates/mxaccess-asb/src/lib.rs index bd0f63b..ba2c012 100644 --- a/rust/crates/mxaccess-asb/src/lib.rs +++ b/rust/crates/mxaccess-asb/src/lib.rs @@ -25,7 +25,8 @@ pub use envelope::{ encode_envelope, }; pub use operations::{ - OperationError, RegisterItemsResponse, UnregisterItemsResponse, build_read_request_body, - build_register_items_request_body, build_unregister_items_request_body, - collect_asbidata_payloads, decode_register_items_response, decode_unregister_items_response, + OperationError, ReadResponse, RegisterItemsResponse, UnregisterItemsResponse, + build_keep_alive_request_body, build_read_request_body, build_register_items_request_body, + build_unregister_items_request_body, collect_asbidata_payloads, decode_read_response, + decode_register_items_response, decode_unregister_items_response, }; diff --git a/rust/crates/mxaccess-asb/src/operations.rs b/rust/crates/mxaccess-asb/src/operations.rs index c3cdef2..97d87d1 100644 --- a/rust/crates/mxaccess-asb/src/operations.rs +++ b/rust/crates/mxaccess-asb/src/operations.rs @@ -36,7 +36,7 @@ //! `InitializeArrayFromStream` shape. use mxaccess_asb_nettcp::nbfx::{NbfxName, NbfxText, NbfxToken}; -use mxaccess_codec::CodecError; +use mxaccess_codec::{CodecError, RuntimeValue}; use crate::contracts::{ ItemIdentity, ItemStatus, decode_item_status_array, encode_item_identity_array, @@ -90,6 +90,99 @@ pub fn build_read_request_body(items: &[ItemIdentity]) -> Vec { asbidata_request_body("ReadRequest", &[BodyField::asbidata("Items", payload)]) } +/// Build the NBFX token stream for a `KeepAliveIn` request body. The +/// `KeepAlive` contract has no body fields beyond the inherited +/// `ConnectionValidator` header, so the body is just the empty wrapper +/// element (`AsbContracts.cs:117` — `public sealed class KeepAlive : +/// ConnectedRequest;`). +/// +/// One-way op (`IsOneWay = true` at `AsbContracts.cs:26`) — caller +/// uses [`crate::AsbClient::send_envelope_one_way`]. +pub fn build_keep_alive_request_body() -> Vec { + vec![ + NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("KeepAliveRequest".to_string()), + }, + NbfxToken::DefaultNamespace { + value: NbfxText::Chars(MESSAGES_NS.to_string()), + }, + NbfxToken::EndElement, + ] +} + +const MESSAGES_NS: &str = "http://asb.contracts.messages/20111111"; + +/// Decode a `ReadResponse` SOAP body. Mirrors the decode path of +/// `MxAsbDataClient.DecodeVariant` (`MxAsbDataClient.cs:713-825`) +/// applied to each `` `` payload. +/// +/// `Values` are decoded as `RuntimeValue` (timestamp + variant + status +/// per `AsbContracts.cs:741-791`) using the F24 codec. `Status` is the +/// per-item operation status array. +#[derive(Debug, Clone, PartialEq)] +pub struct ReadResponse { + pub status: Vec, + pub values: Vec, +} + +/// Decode a `ReadResponse` SOAP body from the NBFX tokens returned by +/// [`crate::decode_envelope`]. Both `Status` and `Values` arrive as +/// `` payloads; we decode the binary form of each. +pub fn decode_read_response(body_tokens: &[NbfxToken]) -> Result { + let status_payload = collect_asbidata_payloads(body_tokens, "Status") + .into_iter() + .next() + .ok_or(OperationError::MissingField { field: "Status" })?; + let status = decode_item_status_array(&status_payload)?; + + let values = match collect_asbidata_payloads(body_tokens, "Values") + .into_iter() + .next() + { + Some(payload) => decode_runtime_value_array(&payload)?, + None => Vec::new(), + }; + + Ok(ReadResponse { status, values }) +} + +/// Decode a `RuntimeValue[]` array from the WCF custom-serializer +/// binary form (4-byte int32 count + each value's `WriteToStream`). +/// Mirrors `RuntimeValue.InitializeArrayFromStream` (`AsbContracts.cs:771-780`). +fn decode_runtime_value_array(input: &[u8]) -> Result, CodecError> { + if input.len() < 4 { + return Err(CodecError::ShortRead { + expected: 4, + actual: input.len(), + }); + } + let mut count_buf = [0u8; 4]; + if let Some(slice) = input.get(0..4) { + count_buf.copy_from_slice(slice); + } + let count = i32::from_le_bytes(count_buf); + if count < 0 { + return Err(CodecError::Decode { + offset: 0, + reason: "negative runtime-value array count", + buffer_len: input.len(), + }); + } + let mut cursor = 4usize; + let mut out = Vec::with_capacity(count as usize); + for _ in 0..count { + let tail = input.get(cursor..).ok_or(CodecError::ShortRead { + expected: 1, + actual: 0, + })?; + let (rv, consumed) = RuntimeValue::decode(tail)?; + cursor += consumed; + out.push(rv); + } + Ok(out) +} + /// Decoded `RegisterItemsResponse`. The `Status` array is binary-decoded /// via `decode_item_status_array`. The optional `ItemCapabilities` /// (`ItemRegistration[]`) field is **not** decoded here — that contract @@ -588,6 +681,82 @@ mod tests { )); } + #[test] + fn keep_alive_body_is_empty_wrapper_with_namespace() { + let body = build_keep_alive_request_body(); + assert_eq!(body.len(), 3); + assert!(matches!( + &body[0], + NbfxToken::Element { name: NbfxName::Inline(s), .. } if s == "KeepAliveRequest" + )); + assert!(matches!( + &body[1], + NbfxToken::DefaultNamespace { value: NbfxText::Chars(ns) } + if ns == "http://asb.contracts.messages/20111111" + )); + assert!(matches!(&body[2], NbfxToken::EndElement)); + } + + #[test] + fn read_response_decodes_status_and_values() { + use mxaccess_codec::{AsbStatus, AsbVariant, RuntimeValue}; + let status = vec![ItemStatus { + item: ItemIdentity::absolute_by_name("Tag.A"), + status: AsbStatus::default(), + error_code: 0, + error_code_specified: true, + }]; + let values = vec![RuntimeValue { + timestamp_binary: 0x0123_4567_89AB_CDEF, + timestamp_specified: true, + value: AsbVariant::from_i32(42), + status: AsbStatus { + count: 0, + payload: vec![], + }, + }]; + + // Encode the values array using the same int32-count + per-value + // shape that `RuntimeValue.WriteArrayToStream` emits. + let mut values_payload = i32::try_from(values.len()) + .unwrap_or(i32::MAX) + .to_le_bytes() + .to_vec(); + for v in &values { + v.encode_into(&mut values_payload); + } + + let status_payload = crate::contracts::encode_item_status_array(&status); + + let body = asbidata_request_body( + "ReadResponse", + &[ + BodyField::asbidata("Status", status_payload), + BodyField::asbidata("Values", values_payload), + ], + ); + + let decoded = decode_read_response(&body).unwrap(); + assert_eq!(decoded.status, status); + assert_eq!(decoded.values, values); + } + + #[test] + fn read_response_with_no_values_returns_empty_vec() { + use mxaccess_codec::AsbStatus; + let status = vec![ItemStatus { + item: ItemIdentity::absolute_by_name("X"), + status: AsbStatus::default(), + error_code: 0, + error_code_specified: true, + }]; + let payload = crate::contracts::encode_item_status_array(&status); + let body = asbidata_request_body("ReadResponse", &[BodyField::asbidata("Status", payload)]); + let decoded = decode_read_response(&body).unwrap(); + assert_eq!(decoded.status, status); + assert!(decoded.values.is_empty()); + } + #[test] fn empty_items_array_still_produces_valid_envelope() { let body = build_register_items_request_body(&[], false, false);