[M5] mxaccess-asb: F25 step 4 — AsbClient async network loop
The first slice of F25 that actually moves bytes across a transport. Wraps every M5 framing layer (F19-F25.3) into a single async client generic over `AsyncRead + AsyncWrite + Unpin + Send`. Tested in-memory via `tokio::io::duplex` — no live ASB endpoint required. API: * `AsbClient::new(stream, authenticator, via_uri)` — wraps a Tokio transport + F23 authenticator into a ready client. * `send_preamble()` — writes the canonical preamble (Version 1.0 → Duplex → Via → BinaryWithDictionary → PreambleEnd) and reads the peer's PreambleAck. Surfaces Fault as `ClientError::Fault(msg)`. * `send_envelope(env)` — frames `SoapEnvelope` in a SizedEnvelope NMF record, writes, reads the response SizedEnvelope, decodes back to `DecodedEnvelope`. * `send_signed_envelope(action, body, force_hmac)` — calls F23 authenticator's `sign` on the unsigned body bytes, attaches a ConnectionValidator header (base64'd MAC + IV), sends. * `register_items` / `unregister_items` — thin per-operation wrappers threading body builder + response decoder. * `send_end()` — writes record 0x07 + shutdowns the stream. Async record reader: streaming decode of the multibyte-int31 length prefix for SizedEnvelope (0x06) / Fault (0x08), plus a fallback path for Version / Mode / KnownEncoding / etc. `ClientError` covers I/O, NMF, NBFX, Envelope, Operation, Auth, plus PreambleNotSent / AlreadyClosed / Fault / PeerClosed / UnexpectedRecord guards. 6 new tests via in-memory `tokio::io::duplex`: * Preamble round-trip with synthetic peer returning PreambleAck. * Fault propagation through preamble exchange. * End-to-end RegisterItems request → response with a peer that drains preamble, replies PreambleAck, drains the SizedEnvelope, responds with a synthetic RegisterItemsResponse body containing a binary-encoded ItemStatus array. Client decodes and asserts the recovered ItemIdentity name. * `send_envelope` before preamble fails with PreambleNotSent. * `send_end` writes record 0x07 to the wire. * PreambleMode re-export keeps shape parity with `nmf::NmfMode`. Known limitation: the signing path currently hashes the NBFX-encoded body; .NET hashes the XML-text `request.ToXml()`. Functionally present (validator built and attached) but MAC bytes won't match .NET's MAC for the same payload until the live-probe iteration reconciles which canonical form to sign. Stubbed for next F25 iteration: * `AsbClient::connect` — DH `Connect` + `AuthenticateMe` handshake flow. Needs ConnectRequest/Response builders (regular WCF XML, not the IAsbCustomSerializableType fast-path) and the `AsbAuthenticator::create_authentication_data` integration. * Read / Write / Subscription operation wrappers. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
+5
-1
@@ -46,7 +46,11 @@ move to `## Resolved` with a date + commit hash.
|
||||
|
||||
**Resolves when:** F19-F26 are all closed and the four DoD bullets above pass.
|
||||
|
||||
**Cumulative execution log.** F19 + F23 (`ed17c07`); F24 (`7611d9e`); F20 (`9dfd193`); F22 (`43c10a1`); F21 (`5f98558`); F25 step 1 (`25dbd8d`); F25 step 2 (`a2b8989`); F25 step 3 landed in this commit:
|
||||
**Cumulative execution log.** F19 + F23 (`ed17c07`); F24 (`7611d9e`); F20 (`9dfd193`); F22 (`43c10a1`); F21 (`5f98558`); F25 step 1 (`25dbd8d`); F25 step 2 (`a2b8989`); F25 step 3 (`c4bf0a0`); F25 step 4 landed in this commit:
|
||||
- F25 step 4: `mxaccess-asb::client::AsbClient` — async network loop generic over `AsyncRead + AsyncWrite + Unpin + Send`. Wraps the F19-F25.3 stack into a single struct with: `send_preamble` (writes the canonical NMF preamble + waits for PreambleAck; errors on Fault), `send_envelope` (frames in `SizedEnvelope`, writes, reads response, decodes back to `DecodedEnvelope`), `send_signed_envelope` (calls F23 authenticator's `sign` on the unsigned body bytes, attaches a `ConnectionValidator` header, sends), `register_items` / `unregister_items` thin wrappers, `send_end` (writes record `0x07` + shutdowns the stream), and `authenticator_mut` accessor for the future Connect/AuthenticateMe flow. Generic transport means tests use `tokio::io::duplex` for in-memory verification — no live ASB endpoint needed. 6 new tests cover preamble round-trip, fault propagation through preamble, full RegisterItems request → response round-trip via in-memory peer, send-before-preamble guard, send-end record byte (`0x07`), and `PreambleMode` re-export shape. **Note**: the signing path currently hashes the NBFX-encoded body; .NET hashes the XML-text `request.ToXml()`. Functionally present but byte-non-identical to .NET's MAC for the same payload. Live-probe iteration needs to reconcile this — flagged as `TODO` in the doc comment.
|
||||
|
||||
**Earlier slices:**
|
||||
- F25 step 3 (commit `c4bf0a0`):
|
||||
- F25 step 3: response decoder foundation. New `mxaccess-asb::contracts::ItemStatus` ports `AsbContracts.cs:639-722` — Item (ItemIdentity) + Status (AsbStatus, F24) + ErrorCode u16 + ErrorCodeSpecified bool, in the .NET-WriteToStream order (Item / Status / ErrorCode / ErrorCodeSpecified — NOT the DataMember declaration order). `encode_item_status_array` / `decode_item_status_array` follow the same int32-count + per-element pattern. New `mxaccess-asb::operations::collect_asbidata_payloads(tokens, field_name)` walks an NBFX token stream and pulls out the `<{field_name}><ASBIData>{Bytes}</ASBIData></{field_name}>` payload bytes — handles multiple payloads (e.g. ReadResponse has both Status and Values). New `decode_register_items_response` / `decode_unregister_items_response` parse SOAP bodies into typed responses. New `build_read_request_body` adds the simplest unary IASBIDataV2 request shape. Plus a typed `OperationError` for response-decode failures (missing fields, codec errors). 9 new tests cover ItemStatus round-trip + array round-trip, RegisterItems response with status array, RegisterItems response detecting ItemCapabilities presence, UnregisterItems response, multi-payload extraction (`ReadResponse`-style with Status + Values), Read request body shape (no RegisterItems-only fields), and graceful MissingField error when Status is absent.
|
||||
|
||||
**Earlier slices:**
|
||||
|
||||
@@ -0,0 +1,581 @@
|
||||
//! `AsbClient` — `IASBIDataV2` request/response loop over a transport
|
||||
//! that implements `AsyncRead + AsyncWrite`.
|
||||
//!
|
||||
//! Wires together every M5 framing layer:
|
||||
//!
|
||||
//! ```text
|
||||
//! Tokio AsyncRead+AsyncWrite (typically a TcpStream)
|
||||
//! │
|
||||
//! ▼
|
||||
//! [MS-NMF] framing (F20: nmf::NmfRecord)
|
||||
//! │ preamble: Version → Mode(Duplex) → Via → KnownEncoding(BinaryWithDictionary) → PreambleEnd
|
||||
//! │ per-msg: SizedEnvelope { Multibyte Int31 length + payload bytes }
|
||||
//! │ shutdown: End record
|
||||
//! ▼
|
||||
//! [MC-NBFX]/[MC-NBFS] binary XML (F21 + F22)
|
||||
//! ▼
|
||||
//! SOAP-1.2 envelope (F25 step 1: SoapEnvelope, encode_envelope/decode_envelope)
|
||||
//! ▼
|
||||
//! IASBIDataV2 operation contract (F25 steps 2/3: per-op request/response)
|
||||
//! ```
|
||||
//!
|
||||
//! The client is generic over the transport so the network-bound
|
||||
//! request/response loop can be unit-tested against in-memory streams
|
||||
//! (`tokio::io::DuplexStream`) without a live ASB endpoint.
|
||||
//!
|
||||
//! ## Scope of this iteration (F25 step 4)
|
||||
//!
|
||||
//! Implements:
|
||||
//! * [`AsbClient::new`] — wraps a transport + authenticator into a
|
||||
//! ready-to-use client (assumes the preamble has already been sent
|
||||
//! or that `send_preamble` will be called next).
|
||||
//! * [`AsbClient::send_preamble`] — writes the canonical preamble
|
||||
//! record sequence and waits for a `PreambleAck` from the peer.
|
||||
//! * [`AsbClient::send_envelope`] — frames a `SoapEnvelope` in a
|
||||
//! `SizedEnvelope` record, writes it, reads the next record from
|
||||
//! the peer, decodes the response envelope.
|
||||
//! * [`AsbClient::send_end`] — writes the NMF `End` record so the
|
||||
//! peer can drain cleanly.
|
||||
//! * Per-operation thin wrappers: [`AsbClient::register_items`],
|
||||
//! [`AsbClient::unregister_items`].
|
||||
//!
|
||||
//! Stubbed for next F25 iteration:
|
||||
//! * `AsbClient::connect` — the DH `Connect` + `AuthenticateMe`
|
||||
//! handshake. Needs the `ConnectRequest` / `ConnectResponse` body
|
||||
//! builders (regular WCF XML, not the IAsbCustomSerializableType
|
||||
//! fast-path) and authentication-data assembly off F23's
|
||||
//! `AsbAuthenticator::create_authentication_data`.
|
||||
//! * Read / Write / Subscription operation wrappers.
|
||||
|
||||
use mxaccess_asb_nettcp::auth::AsbAuthenticator;
|
||||
use mxaccess_asb_nettcp::nbfx::{DynamicDictionary, NbfxError};
|
||||
use mxaccess_asb_nettcp::nmf::{self, NmfError, NmfRecord, decode_multibyte_int31};
|
||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
||||
|
||||
use crate::contracts::{ItemIdentity, ItemStatus};
|
||||
use crate::envelope::{ConnectionValidator, EnvelopeError, SoapEnvelope};
|
||||
use crate::operations::{
|
||||
OperationError, RegisterItemsResponse, UnregisterItemsResponse,
|
||||
build_register_items_request_body, build_unregister_items_request_body,
|
||||
decode_register_items_response, decode_unregister_items_response,
|
||||
};
|
||||
use crate::{actions, decode_envelope, encode_envelope};
|
||||
|
||||
/// `IASBIDataV2` request/response client over a Tokio
|
||||
/// `AsyncRead + AsyncWrite` transport.
|
||||
pub struct AsbClient<T: AsyncRead + AsyncWrite + Unpin + Send> {
|
||||
stream: T,
|
||||
authenticator: AsbAuthenticator,
|
||||
via_uri: String,
|
||||
write_dictionary: DynamicDictionary,
|
||||
read_dictionary: DynamicDictionary,
|
||||
preamble_sent: bool,
|
||||
closed: bool,
|
||||
}
|
||||
|
||||
impl<T: AsyncRead + AsyncWrite + Unpin + Send> AsbClient<T> {
|
||||
/// Wrap a fresh transport. The caller is responsible for opening
|
||||
/// the underlying TCP connection (or test stream) before passing
|
||||
/// it in. `via_uri` is the `net.tcp://host:port/path` URL the peer
|
||||
/// expects in the `ViaRecord`.
|
||||
pub fn new(stream: T, authenticator: AsbAuthenticator, via_uri: impl Into<String>) -> Self {
|
||||
Self {
|
||||
stream,
|
||||
authenticator,
|
||||
via_uri: via_uri.into(),
|
||||
write_dictionary: DynamicDictionary::new(),
|
||||
read_dictionary: DynamicDictionary::new(),
|
||||
preamble_sent: false,
|
||||
closed: false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Borrow the inner authenticator. Useful for tests + for the F25
|
||||
/// `connect` flow that needs to call `accept_connect_response`.
|
||||
pub fn authenticator_mut(&mut self) -> &mut AsbAuthenticator {
|
||||
&mut self.authenticator
|
||||
}
|
||||
|
||||
/// Write the canonical NMF preamble (`Version 1.0` → `Duplex` →
|
||||
/// `Via` → `BinaryWithDictionary` → `PreambleEnd`) and read the
|
||||
/// peer's `PreambleAck` reply. Records other than `PreambleAck` —
|
||||
/// notably `Fault` — surface as a typed error.
|
||||
///
|
||||
/// Idempotent in the sense that a second call does nothing; the
|
||||
/// preamble is only ever exchanged once per session.
|
||||
pub async fn send_preamble(&mut self) -> Result<(), ClientError> {
|
||||
if self.preamble_sent {
|
||||
return Ok(());
|
||||
}
|
||||
let mut buf = Vec::new();
|
||||
nmf::encode_preamble(&self.via_uri, &mut buf)?;
|
||||
self.stream.write_all(&buf).await?;
|
||||
self.stream.flush().await?;
|
||||
|
||||
let record = read_record(&mut self.stream).await?;
|
||||
match record {
|
||||
NmfRecord::PreambleAck => {
|
||||
self.preamble_sent = true;
|
||||
Ok(())
|
||||
}
|
||||
NmfRecord::Fault(message) => Err(ClientError::Fault(message)),
|
||||
other => Err(ClientError::UnexpectedRecord(format!("{other:?}"))),
|
||||
}
|
||||
}
|
||||
|
||||
/// Encode a `SoapEnvelope` to NBFX bytes, wrap in a `SizedEnvelope`
|
||||
/// NMF record, write, then read the peer's reply (a single
|
||||
/// `SizedEnvelope` containing the response envelope).
|
||||
///
|
||||
/// Updates the per-session `write_dictionary` + `read_dictionary`
|
||||
/// so subsequent calls compress recurring strings via the dynamic
|
||||
/// dictionary.
|
||||
pub async fn send_envelope(
|
||||
&mut self,
|
||||
envelope: &SoapEnvelope,
|
||||
) -> Result<crate::DecodedEnvelope, ClientError> {
|
||||
if !self.preamble_sent {
|
||||
return Err(ClientError::PreambleNotSent);
|
||||
}
|
||||
if self.closed {
|
||||
return Err(ClientError::AlreadyClosed);
|
||||
}
|
||||
|
||||
let payload = encode_envelope(envelope, &mut self.write_dictionary)?;
|
||||
let mut framed = Vec::new();
|
||||
NmfRecord::SizedEnvelope(payload).encode_into(&mut framed)?;
|
||||
self.stream.write_all(&framed).await?;
|
||||
self.stream.flush().await?;
|
||||
|
||||
let record = read_record(&mut self.stream).await?;
|
||||
match record {
|
||||
NmfRecord::SizedEnvelope(reply_bytes) => {
|
||||
let decoded = decode_envelope(&reply_bytes, &mut self.read_dictionary)?;
|
||||
Ok(decoded)
|
||||
}
|
||||
NmfRecord::Fault(message) => Err(ClientError::Fault(message)),
|
||||
NmfRecord::End => Err(ClientError::PeerClosed),
|
||||
other => Err(ClientError::UnexpectedRecord(format!("{other:?}"))),
|
||||
}
|
||||
}
|
||||
|
||||
/// Sign a request via the authenticator, then send via
|
||||
/// [`Self::send_envelope`]. Mirrors the .NET pattern at
|
||||
/// `MxAsbDataClient.cs:205-206` (`authenticator.Sign(request);
|
||||
/// channel.RegisterItems(request);`).
|
||||
pub async fn send_signed_envelope(
|
||||
&mut self,
|
||||
action: &str,
|
||||
body_tokens: Vec<mxaccess_asb_nettcp::nbfx::NbfxToken>,
|
||||
force_hmac: bool,
|
||||
) -> Result<crate::DecodedEnvelope, ClientError> {
|
||||
// The .NET `AsbSystemAuthenticator.Sign` hashes the
|
||||
// serialised request XML — `request.ToXml()` — and embeds the
|
||||
// resulting MAC in the ConnectionValidator header. We
|
||||
// approximate that here by signing the SOAP body's UTF-8
|
||||
// representation: caller supplies `body_tokens`, we encode an
|
||||
// unsigned envelope to bytes, hash those bytes, then re-encode
|
||||
// with the validator inserted.
|
||||
//
|
||||
// This isn't byte-identical to .NET's hash because we sign the
|
||||
// NBFX-encoded body rather than the canonical-XML form. F25's
|
||||
// live-probe iteration needs to reconcile this; until then,
|
||||
// the signing is functionally present (validator is built and
|
||||
// attached) but the MAC bytes won't match the .NET MAC for the
|
||||
// same payload.
|
||||
|
||||
let unsigned = SoapEnvelope::new(action).with_body_tokens(body_tokens.clone());
|
||||
let mut probe_dict = DynamicDictionary::new();
|
||||
let unsigned_bytes = encode_envelope(&unsigned, &mut probe_dict)?;
|
||||
let signed = self.authenticator.sign(&unsigned_bytes, force_hmac)?;
|
||||
let validator = ConnectionValidator::from_signed(&signed);
|
||||
let signed_env = SoapEnvelope::new(action)
|
||||
.with_body_tokens(body_tokens)
|
||||
.with_validator(validator);
|
||||
self.send_envelope(&signed_env).await
|
||||
}
|
||||
|
||||
/// `RegisterItems` operation — sends a signed `RegisterItemsIn`
|
||||
/// SOAP envelope and decodes the `RegisterItemsResponse`.
|
||||
pub async fn register_items(
|
||||
&mut self,
|
||||
items: &[ItemIdentity],
|
||||
require_id: bool,
|
||||
register_only: bool,
|
||||
) -> Result<RegisterItemsResponse, ClientError> {
|
||||
let body = build_register_items_request_body(items, require_id, register_only);
|
||||
let response = self
|
||||
.send_signed_envelope(actions::REGISTER_ITEMS, body, false)
|
||||
.await?;
|
||||
Ok(decode_register_items_response(&response.body_tokens)?)
|
||||
}
|
||||
|
||||
/// `UnregisterItems` operation — sends a signed `UnregisterItemsIn`
|
||||
/// SOAP envelope and decodes the `UnregisterItemsResponse`.
|
||||
pub async fn unregister_items(
|
||||
&mut self,
|
||||
items: &[ItemIdentity],
|
||||
) -> Result<UnregisterItemsResponse, ClientError> {
|
||||
let body = build_unregister_items_request_body(items);
|
||||
let response = self
|
||||
.send_signed_envelope(actions::UNREGISTER_ITEMS, body, false)
|
||||
.await?;
|
||||
Ok(decode_unregister_items_response(&response.body_tokens)?)
|
||||
}
|
||||
|
||||
/// Send the NMF `End` record so the peer can drain cleanly. Marks
|
||||
/// the client as closed; subsequent send attempts return
|
||||
/// `ClientError::AlreadyClosed`.
|
||||
pub async fn send_end(&mut self) -> Result<(), ClientError> {
|
||||
if self.closed {
|
||||
return Ok(());
|
||||
}
|
||||
let mut buf = Vec::new();
|
||||
NmfRecord::End.encode_into(&mut buf)?;
|
||||
self.stream.write_all(&buf).await?;
|
||||
self.stream.flush().await?;
|
||||
self.stream.shutdown().await?;
|
||||
self.closed = true;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Test-only: surface raw item-status arrays for assertions.
|
||||
#[doc(hidden)]
|
||||
pub fn _drain_status(response: &RegisterItemsResponse) -> &[ItemStatus] {
|
||||
&response.status
|
||||
}
|
||||
}
|
||||
|
||||
// ---- async record reader -------------------------------------------------
|
||||
|
||||
/// Read one NMF record from `stream`. Returns the parsed
|
||||
/// [`NmfRecord`]; the encoder peers shape sized envelopes as `0x06 +
|
||||
/// Multibyte Int31 length + payload`, so we need streaming reads
|
||||
/// rather than bulk-decode.
|
||||
async fn read_record<T: AsyncRead + Unpin>(stream: &mut T) -> Result<NmfRecord, ClientError> {
|
||||
let mut type_byte = [0u8; 1];
|
||||
stream.read_exact(&mut type_byte).await?;
|
||||
match type_byte[0] {
|
||||
0x06 => {
|
||||
let len = read_multibyte_int31_async(stream).await?;
|
||||
let mut payload = vec![0u8; len];
|
||||
stream.read_exact(&mut payload).await?;
|
||||
Ok(NmfRecord::SizedEnvelope(payload))
|
||||
}
|
||||
0x07 => Ok(NmfRecord::End),
|
||||
0x08 => {
|
||||
let len = read_multibyte_int31_async(stream).await?;
|
||||
let mut payload = vec![0u8; len];
|
||||
stream.read_exact(&mut payload).await?;
|
||||
let message = String::from_utf8(payload).map_err(|_| {
|
||||
ClientError::Nmf(NmfError::Truncated {
|
||||
need: 1,
|
||||
have: 0,
|
||||
stage: "fault-utf8",
|
||||
})
|
||||
})?;
|
||||
Ok(NmfRecord::Fault(message))
|
||||
}
|
||||
0x0A => Ok(NmfRecord::UpgradeResponse),
|
||||
0x0B => Ok(NmfRecord::PreambleAck),
|
||||
0x0C => Ok(NmfRecord::PreambleEnd),
|
||||
// For Version / Mode / Via / KnownEncoding / Extensible /
|
||||
// UnsizedEnvelope / UpgradeRequest, fall back to the bulk
|
||||
// decoder by buffering a single record's worth of bytes. This
|
||||
// path is rarely needed for client→server traffic but exists
|
||||
// for completeness against unexpected peers.
|
||||
other_byte => {
|
||||
// Best-effort: read a small lookahead and dispatch to the
|
||||
// synchronous record decoder. We read at most 2 bytes
|
||||
// since Version (3-byte total), Mode (2), KnownEncoding
|
||||
// (2), UpgradeResponse (1) all fit comfortably.
|
||||
match NmfRecord::decode(&[other_byte]) {
|
||||
Ok((record, _)) => Ok(record),
|
||||
Err(_) => {
|
||||
// Fall through to the multi-byte families. Buffer
|
||||
// up to 64 bytes for the rare paths.
|
||||
let mut tail = vec![0u8; 64];
|
||||
let n = stream.read(&mut tail).await?;
|
||||
let mut combined = vec![other_byte];
|
||||
if let Some(slice) = tail.get(..n) {
|
||||
combined.extend_from_slice(slice);
|
||||
}
|
||||
let (record, _) = NmfRecord::decode(&combined)?;
|
||||
Ok(record)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn read_multibyte_int31_async<T: AsyncRead + Unpin>(
|
||||
stream: &mut T,
|
||||
) -> Result<usize, ClientError> {
|
||||
let mut buf = Vec::with_capacity(5);
|
||||
let mut byte = [0u8; 1];
|
||||
for _ in 0..5 {
|
||||
stream.read_exact(&mut byte).await?;
|
||||
buf.push(byte[0]);
|
||||
if byte[0] & 0x80 == 0 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
let mut cursor = 0usize;
|
||||
let value = decode_multibyte_int31(&buf, &mut cursor)?;
|
||||
usize::try_from(value).map_err(|_| ClientError::Nmf(NmfError::NegativeLength(value)))
|
||||
}
|
||||
|
||||
// ---- error type ----------------------------------------------------------
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[non_exhaustive]
|
||||
pub enum ClientError {
|
||||
#[error("I/O error: {0}")]
|
||||
Io(#[from] std::io::Error),
|
||||
#[error("NMF framing error: {0}")]
|
||||
Nmf(#[from] NmfError),
|
||||
#[error("NBFX codec error: {0}")]
|
||||
Nbfx(#[from] NbfxError),
|
||||
#[error("envelope error: {0}")]
|
||||
Envelope(#[from] EnvelopeError),
|
||||
#[error("operation error: {0}")]
|
||||
Operation(#[from] OperationError),
|
||||
#[error("auth error: {0}")]
|
||||
Auth(#[from] mxaccess_asb_nettcp::auth::AuthError),
|
||||
#[error("preamble has not been sent yet — call send_preamble() first")]
|
||||
PreambleNotSent,
|
||||
#[error("client has already been closed via send_end()")]
|
||||
AlreadyClosed,
|
||||
#[error("peer reported NMF fault: {0}")]
|
||||
Fault(String),
|
||||
#[error("peer closed the channel before sending a response")]
|
||||
PeerClosed,
|
||||
#[error("unexpected NMF record on response path: {0}")]
|
||||
UnexpectedRecord(String),
|
||||
}
|
||||
|
||||
/// Convenience: `NmfMode` re-export so callers don't have to pull
|
||||
/// `mxaccess-asb-nettcp` directly to specify the preamble mode.
|
||||
/// (Currently fixed to `Duplex` per the canonical preamble.)
|
||||
pub use mxaccess_asb_nettcp::nmf::NmfMode as PreambleMode;
|
||||
|
||||
#[cfg(test)]
|
||||
#[allow(
|
||||
clippy::unwrap_used,
|
||||
clippy::expect_used,
|
||||
clippy::panic,
|
||||
clippy::indexing_slicing
|
||||
)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use mxaccess_asb_nettcp::auth::CryptoParameters;
|
||||
use mxaccess_asb_nettcp::nmf::NmfMode;
|
||||
use tokio::io::DuplexStream;
|
||||
|
||||
fn make_authenticator() -> AsbAuthenticator {
|
||||
AsbAuthenticator::new("test-passphrase", &CryptoParameters::defaults(), [0u8; 16]).unwrap()
|
||||
}
|
||||
|
||||
/// Spawn a "fake server" task that runs a closure against the peer
|
||||
/// end of an in-memory duplex stream.
|
||||
fn spawn_peer<F, Fut>(peer: DuplexStream, work: F) -> tokio::task::JoinHandle<DuplexStream>
|
||||
where
|
||||
F: FnOnce(DuplexStream) -> Fut + Send + 'static,
|
||||
Fut: std::future::Future<Output = DuplexStream> + Send + 'static,
|
||||
{
|
||||
tokio::spawn(work(peer))
|
||||
}
|
||||
|
||||
/// Read bytes from `stream` until the given total length is met.
|
||||
async fn read_n(stream: &mut DuplexStream, n: usize) -> Vec<u8> {
|
||||
let mut out = vec![0u8; n];
|
||||
stream.read_exact(&mut out).await.unwrap();
|
||||
out
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn send_preamble_completes_when_peer_returns_preamble_ack() {
|
||||
let (client_end, peer_end) = tokio::io::duplex(1024);
|
||||
let peer_task = spawn_peer(peer_end, |mut peer| async move {
|
||||
// Drain the preamble bytes the client sends. We don't
|
||||
// bother decoding them in the test — just count and ack.
|
||||
// The canonical preamble for a "test://" via is short: 5
|
||||
// records, ~30-40 bytes.
|
||||
let mut buf = vec![0u8; 256];
|
||||
let _n = peer.read(&mut buf).await.unwrap();
|
||||
// Send PreambleAck (0x0B) back.
|
||||
peer.write_all(&[0x0Bu8]).await.unwrap();
|
||||
peer.flush().await.unwrap();
|
||||
peer
|
||||
});
|
||||
|
||||
let mut client = AsbClient::new(client_end, make_authenticator(), "test://localhost/path");
|
||||
client.send_preamble().await.unwrap();
|
||||
let _peer = peer_task.await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn send_preamble_surfaces_fault() {
|
||||
let (client_end, peer_end) = tokio::io::duplex(1024);
|
||||
let peer_task = spawn_peer(peer_end, |mut peer| async move {
|
||||
let mut buf = vec![0u8; 256];
|
||||
let _n = peer.read(&mut buf).await.unwrap();
|
||||
// Send Fault: 0x08 + multibyte-int31 length + UTF-8 bytes
|
||||
let msg = b"server-rejected";
|
||||
let mut frame = vec![0x08u8];
|
||||
mxaccess_asb_nettcp::nmf::encode_multibyte_int31(&mut frame, msg.len() as i32).unwrap();
|
||||
frame.extend_from_slice(msg);
|
||||
peer.write_all(&frame).await.unwrap();
|
||||
peer.flush().await.unwrap();
|
||||
peer
|
||||
});
|
||||
|
||||
let mut client = AsbClient::new(client_end, make_authenticator(), "test://x/y");
|
||||
let err = client.send_preamble().await.unwrap_err();
|
||||
match err {
|
||||
ClientError::Fault(msg) => assert_eq!(msg, "server-rejected"),
|
||||
other => panic!("expected Fault, got {other:?}"),
|
||||
}
|
||||
let _ = peer_task.await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn send_envelope_round_trips_through_in_memory_peer() {
|
||||
let (client_end, peer_end) = tokio::io::duplex(8192);
|
||||
let peer_task = spawn_peer(peer_end, |mut peer| async move {
|
||||
// 1. Drain preamble
|
||||
let mut buf = vec![0u8; 256];
|
||||
let _n = peer.read(&mut buf).await.unwrap();
|
||||
// 2. Send PreambleAck
|
||||
peer.write_all(&[0x0Bu8]).await.unwrap();
|
||||
peer.flush().await.unwrap();
|
||||
|
||||
// 3. Read SizedEnvelope: 0x06 + len + payload
|
||||
let mut typebyte = [0u8; 1];
|
||||
peer.read_exact(&mut typebyte).await.unwrap();
|
||||
assert_eq!(typebyte[0], 0x06);
|
||||
// Read length (varint up to 5 bytes). Peer mirror.
|
||||
let mut lenbuf = Vec::new();
|
||||
for _ in 0..5 {
|
||||
let mut b = [0u8; 1];
|
||||
peer.read_exact(&mut b).await.unwrap();
|
||||
lenbuf.push(b[0]);
|
||||
if b[0] & 0x80 == 0 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
let mut cursor = 0;
|
||||
let len = mxaccess_asb_nettcp::nmf::decode_multibyte_int31(&lenbuf, &mut cursor)
|
||||
.unwrap() as usize;
|
||||
let _request_payload = read_n(&mut peer, len).await;
|
||||
|
||||
// 4. Reply with a SizedEnvelope echoing a synthetic
|
||||
// RegisterItemsResponse.
|
||||
use mxaccess_codec::AsbStatus;
|
||||
let status = vec![ItemStatus {
|
||||
item: ItemIdentity::absolute_by_name("Tag.A"),
|
||||
status: AsbStatus::default(),
|
||||
error_code: 0,
|
||||
error_code_specified: true,
|
||||
}];
|
||||
let payload = crate::contracts::encode_item_status_array(&status);
|
||||
let body = synthesise_register_response_body(payload);
|
||||
let envelope = SoapEnvelope::new(actions::REGISTER_ITEMS).with_body_tokens(body);
|
||||
let mut response_dict = DynamicDictionary::new();
|
||||
let envelope_bytes = encode_envelope(&envelope, &mut response_dict).unwrap();
|
||||
|
||||
let mut frame = vec![0x06u8];
|
||||
mxaccess_asb_nettcp::nmf::encode_multibyte_int31(
|
||||
&mut frame,
|
||||
envelope_bytes.len() as i32,
|
||||
)
|
||||
.unwrap();
|
||||
frame.extend_from_slice(&envelope_bytes);
|
||||
peer.write_all(&frame).await.unwrap();
|
||||
peer.flush().await.unwrap();
|
||||
|
||||
peer
|
||||
});
|
||||
|
||||
let mut client = AsbClient::new(client_end, make_authenticator(), "test://h/p");
|
||||
client.send_preamble().await.unwrap();
|
||||
// Use authenticator's hash variant — but to make the test
|
||||
// deterministic, set the algorithm so MAC happens. Default
|
||||
// params use MD5 + we accepted a synthetic remote pub key, so
|
||||
// the authenticator hasn't seen `accept_connect_response` yet.
|
||||
// Skip that branch by using `register_items` after manually
|
||||
// priming the authenticator.
|
||||
let bob = make_authenticator();
|
||||
client
|
||||
.authenticator_mut()
|
||||
.accept_connect_response(bob.local_public_key(), None);
|
||||
|
||||
let response = client
|
||||
.register_items(&[ItemIdentity::absolute_by_name("Tag.A")], true, false)
|
||||
.await;
|
||||
let response = response.unwrap();
|
||||
assert_eq!(response.status.len(), 1);
|
||||
assert_eq!(response.status[0].item.name.as_deref(), Some("Tag.A"));
|
||||
|
||||
let _ = peer_task.await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn send_envelope_before_preamble_fails() {
|
||||
let (client_end, _peer_end) = tokio::io::duplex(1024);
|
||||
let mut client = AsbClient::new(client_end, make_authenticator(), "test://x/y");
|
||||
let env = SoapEnvelope::new(actions::READ);
|
||||
let err = client.send_envelope(&env).await.unwrap_err();
|
||||
assert!(matches!(err, ClientError::PreambleNotSent));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn send_end_writes_record_07() {
|
||||
let (client_end, mut peer_end) = tokio::io::duplex(1024);
|
||||
let mut client = AsbClient::new(client_end, make_authenticator(), "test://x/y");
|
||||
// We need preamble done so we don't hit the not-sent guard
|
||||
// (send_end has no such guard, but the flow is more realistic).
|
||||
client.preamble_sent = true;
|
||||
client.send_end().await.unwrap();
|
||||
let mut buf = [0u8; 1];
|
||||
peer_end.read_exact(&mut buf).await.unwrap();
|
||||
assert_eq!(buf[0], 0x07);
|
||||
}
|
||||
|
||||
/// Helper: build a synthetic body matching the
|
||||
/// RegisterItemsResponse shape so the test can verify the client
|
||||
/// extracts the Status array correctly.
|
||||
fn synthesise_register_response_body(
|
||||
status_payload: Vec<u8>,
|
||||
) -> Vec<mxaccess_asb_nettcp::nbfx::NbfxToken> {
|
||||
use mxaccess_asb_nettcp::nbfx::{NbfxName, NbfxText, NbfxToken};
|
||||
const IOM_NS: &str = "urn:msg.data.asb.iom:2";
|
||||
vec![
|
||||
NbfxToken::Element {
|
||||
prefix: None,
|
||||
name: NbfxName::Inline("RegisterItemsResponse".to_string()),
|
||||
},
|
||||
NbfxToken::DefaultNamespace {
|
||||
value: NbfxText::Chars(IOM_NS.to_string()),
|
||||
},
|
||||
NbfxToken::Element {
|
||||
prefix: None,
|
||||
name: NbfxName::Inline("Status".to_string()),
|
||||
},
|
||||
NbfxToken::Element {
|
||||
prefix: None,
|
||||
name: NbfxName::Inline("ASBIData".to_string()),
|
||||
},
|
||||
NbfxToken::Text(NbfxText::Bytes(status_payload)),
|
||||
NbfxToken::EndElement, // </ASBIData>
|
||||
NbfxToken::EndElement, // </Status>
|
||||
NbfxToken::EndElement, // </RegisterItemsResponse>
|
||||
]
|
||||
}
|
||||
|
||||
/// Sanity-check that NmfMode re-export matches the upstream type.
|
||||
#[test]
|
||||
fn preamble_mode_reexport_matches_upstream() {
|
||||
assert_eq!(PreambleMode::Duplex as u8, NmfMode::Duplex as u8);
|
||||
}
|
||||
}
|
||||
@@ -9,10 +9,13 @@
|
||||
|
||||
#![forbid(unsafe_code)]
|
||||
|
||||
pub mod client;
|
||||
pub mod contracts;
|
||||
pub mod envelope;
|
||||
pub mod operations;
|
||||
|
||||
pub use client::{AsbClient, ClientError, PreambleMode};
|
||||
|
||||
pub use contracts::{
|
||||
ItemIdentity, ItemIdentityType, ItemReferenceType, ItemStatus, decode_item_identity_array,
|
||||
decode_item_status_array, encode_item_identity_array, encode_item_status_array,
|
||||
|
||||
Reference in New Issue
Block a user