[M5] mxaccess-asb: F25 step 5 — KeepAlive + Read + one-way client ops

Extends AsbClient with one-way operation support (`IsOneWay = true`
in IASBIDataV2) plus the KeepAlive and Read operations.

Client API additions:
* `send_envelope_one_way(env)` — frames in SizedEnvelope, writes,
  returns immediately. No response read. Mirrors WCF's IsOneWay
  semantics for KeepAlive / Disconnect / AuthenticateMe.
* `send_signed_envelope_one_way(action, body, force_hmac)` —
  one-way variant that runs the body through F23's authenticator
  signing path so the ConnectionValidator header is attached.
* `keep_alive()` — sends an empty `KeepAliveRequest` with default
  signing. Used to keep the channel alive past the WCF inactivity
  timeout (30s default at `MxAsbDataClient.cs:683`).
* `read(items)` — sends a signed Read envelope, decodes
  ReadResponse with both Status and Values arrays.

Operations API additions:
* `build_keep_alive_request_body()` — empty wrapper element +
  asb.contracts.messages namespace. Mirror of `AsbContracts.cs:117`
  (`public sealed class KeepAlive : ConnectedRequest;`).
* `ReadResponse { status: Vec<ItemStatus>, values: Vec<RuntimeValue> }`
  per `AsbContracts.cs:169-179`.
* `decode_read_response(body_tokens)` — pulls both ASBIData
  payloads, decodes Status as ItemStatus[], decodes Values via
  `decode_runtime_value_array` (4-byte int32 count + per-element
  `RuntimeValue::decode` from F24).

5 new tests:
* KeepAlive body shape (empty wrapper, correct namespace).
* ReadResponse decoder round-trip with both Status and Values.
* ReadResponse decoder graceful handling when Values is absent
  (returns empty vec).
* End-to-end client::keep_alive — peer drains SizedEnvelope but
  doesn't respond; client returns Ok().
* End-to-end client::read — peer responds with synthetic
  ReadResponse, client recovers Values[0].timestamp_binary == 1234
  and Values[0].status round-trip.

Stubbed for next F25 iterations:
* AsbClient::connect — DH Connect + AuthenticateMe handshake. Needs
  ConnectRequest / ConnectResponse builders (regular WCF XML, not
  the IAsbCustomSerializableType fast-path).
* Write / PublishWriteComplete / CreateSubscription /
  AddMonitoredItems / Publish / Disconnect operation wrappers.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-05 11:42:39 -04:00
parent 1e59249662
commit 9b8133f725
4 changed files with 401 additions and 8 deletions
+222 -3
View File
@@ -55,9 +55,10 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use crate::contracts::{ItemIdentity, ItemStatus};
use crate::envelope::{ConnectionValidator, EnvelopeError, SoapEnvelope};
use crate::operations::{
OperationError, RegisterItemsResponse, UnregisterItemsResponse,
build_register_items_request_body, build_unregister_items_request_body,
decode_register_items_response, decode_unregister_items_response,
OperationError, ReadResponse, RegisterItemsResponse, UnregisterItemsResponse,
build_keep_alive_request_body, build_read_request_body, build_register_items_request_body,
build_unregister_items_request_body, decode_read_response, decode_register_items_response,
decode_unregister_items_response,
};
use crate::{actions, decode_envelope, encode_envelope};
@@ -195,6 +196,69 @@ impl<T: AsyncRead + AsyncWrite + Unpin + Send> AsbClient<T> {
self.send_envelope(&signed_env).await
}
/// One-way send: encode + frame + write, but do **not** read a
/// response. Mirrors WCF's `[OperationContract(IsOneWay = true)]`
/// semantics — `KeepAlive`, `Disconnect`, and `AuthenticateMe` all
/// take this path on the .NET side.
pub async fn send_envelope_one_way(
&mut self,
envelope: &SoapEnvelope,
) -> Result<(), ClientError> {
if !self.preamble_sent {
return Err(ClientError::PreambleNotSent);
}
if self.closed {
return Err(ClientError::AlreadyClosed);
}
let payload = encode_envelope(envelope, &mut self.write_dictionary)?;
let mut framed = Vec::new();
NmfRecord::SizedEnvelope(payload).encode_into(&mut framed)?;
self.stream.write_all(&framed).await?;
self.stream.flush().await?;
Ok(())
}
/// One-way signed send for operations that need a
/// `ConnectionValidator` header. Mirrors `MxAsbDataClient` calls
/// like `authenticator.Sign(...)` followed by an `IsOneWay = true`
/// channel call.
pub async fn send_signed_envelope_one_way(
&mut self,
action: &str,
body_tokens: Vec<mxaccess_asb_nettcp::nbfx::NbfxToken>,
force_hmac: bool,
) -> Result<(), ClientError> {
let unsigned = SoapEnvelope::new(action).with_body_tokens(body_tokens.clone());
let mut probe_dict = DynamicDictionary::new();
let unsigned_bytes = encode_envelope(&unsigned, &mut probe_dict)?;
let signed = self.authenticator.sign(&unsigned_bytes, force_hmac)?;
let validator = ConnectionValidator::from_signed(&signed);
let signed_env = SoapEnvelope::new(action)
.with_body_tokens(body_tokens)
.with_validator(validator);
self.send_envelope_one_way(&signed_env).await
}
/// `KeepAlive` operation — one-way signed envelope with an empty
/// `KeepAliveRequest` body. Used to keep the channel alive past
/// the WCF inactivity timeout (`MxAsbDataClient.cs:683`,
/// `ReliableSession.InactivityTimeout = 30s`).
pub async fn keep_alive(&mut self) -> Result<(), ClientError> {
let body = build_keep_alive_request_body();
self.send_signed_envelope_one_way(actions::KEEP_ALIVE, body, false)
.await
}
/// `Read` operation — sends a signed `ReadIn` SOAP envelope and
/// decodes the `ReadResponse` (Status array + Values array).
pub async fn read(&mut self, items: &[ItemIdentity]) -> Result<ReadResponse, ClientError> {
let body = build_read_request_body(items);
let response = self
.send_signed_envelope(actions::READ, body, false)
.await?;
Ok(decode_read_response(&response.body_tokens)?)
}
/// `RegisterItems` operation — sends a signed `RegisterItemsIn`
/// SOAP envelope and decodes the `RegisterItemsResponse`.
pub async fn register_items(
@@ -578,4 +642,159 @@ mod tests {
fn preamble_mode_reexport_matches_upstream() {
assert_eq!(PreambleMode::Duplex as u8, NmfMode::Duplex as u8);
}
#[tokio::test]
async fn keep_alive_writes_one_way_envelope_without_reading_response() {
let (client_end, peer_end) = tokio::io::duplex(8192);
let peer_task = spawn_peer(peer_end, |mut peer| async move {
// Drain preamble + send PreambleAck
let mut buf = vec![0u8; 256];
let _n = peer.read(&mut buf).await.unwrap();
peer.write_all(&[0x0Bu8]).await.unwrap();
peer.flush().await.unwrap();
// Drain the KeepAlive SizedEnvelope. Don't reply — one-way op.
let mut typebyte = [0u8; 1];
peer.read_exact(&mut typebyte).await.unwrap();
assert_eq!(typebyte[0], 0x06);
let mut lenbuf = Vec::new();
for _ in 0..5 {
let mut b = [0u8; 1];
peer.read_exact(&mut b).await.unwrap();
lenbuf.push(b[0]);
if b[0] & 0x80 == 0 {
break;
}
}
let mut cursor = 0;
let len = mxaccess_asb_nettcp::nmf::decode_multibyte_int31(&lenbuf, &mut cursor)
.unwrap() as usize;
let _payload = read_n(&mut peer, len).await;
peer
});
let mut client = AsbClient::new(client_end, make_authenticator(), "test://h/p");
client.send_preamble().await.unwrap();
let bob = make_authenticator();
client
.authenticator_mut()
.accept_connect_response(bob.local_public_key(), None);
client.keep_alive().await.unwrap();
let _ = peer_task.await.unwrap();
}
#[tokio::test]
async fn read_round_trips_through_in_memory_peer() {
use mxaccess_codec::{AsbStatus, AsbVariant, RuntimeValue};
let (client_end, peer_end) = tokio::io::duplex(8192);
let peer_task = spawn_peer(peer_end, |mut peer| async move {
// 1. Drain preamble + send ack
let mut buf = vec![0u8; 256];
let _n = peer.read(&mut buf).await.unwrap();
peer.write_all(&[0x0Bu8]).await.unwrap();
peer.flush().await.unwrap();
// 2. Drain Read SizedEnvelope
let mut typebyte = [0u8; 1];
peer.read_exact(&mut typebyte).await.unwrap();
assert_eq!(typebyte[0], 0x06);
let mut lenbuf = Vec::new();
for _ in 0..5 {
let mut b = [0u8; 1];
peer.read_exact(&mut b).await.unwrap();
lenbuf.push(b[0]);
if b[0] & 0x80 == 0 {
break;
}
}
let mut cursor = 0;
let len = mxaccess_asb_nettcp::nmf::decode_multibyte_int31(&lenbuf, &mut cursor)
.unwrap() as usize;
let _request_payload = read_n(&mut peer, len).await;
// 3. Synthesize ReadResponse: Status + Values arrays
let status = vec![ItemStatus {
item: ItemIdentity::absolute_by_name("Tag.A"),
status: AsbStatus::default(),
error_code: 0,
error_code_specified: true,
}];
let values = vec![RuntimeValue {
timestamp_binary: 1234,
timestamp_specified: true,
value: AsbVariant::from_i32(99),
status: AsbStatus::default(),
}];
let status_payload = crate::contracts::encode_item_status_array(&status);
let mut values_payload = (values.len() as i32).to_le_bytes().to_vec();
for v in &values {
v.encode_into(&mut values_payload);
}
let body = synthesise_read_response_body(status_payload, values_payload);
let envelope = SoapEnvelope::new(actions::READ).with_body_tokens(body);
let mut response_dict = DynamicDictionary::new();
let envelope_bytes = encode_envelope(&envelope, &mut response_dict).unwrap();
let mut frame = vec![0x06u8];
mxaccess_asb_nettcp::nmf::encode_multibyte_int31(
&mut frame,
envelope_bytes.len() as i32,
)
.unwrap();
frame.extend_from_slice(&envelope_bytes);
peer.write_all(&frame).await.unwrap();
peer.flush().await.unwrap();
peer
});
let mut client = AsbClient::new(client_end, make_authenticator(), "test://h/p");
client.send_preamble().await.unwrap();
let bob = make_authenticator();
client
.authenticator_mut()
.accept_connect_response(bob.local_public_key(), None);
let response = client
.read(&[ItemIdentity::absolute_by_name("Tag.A")])
.await
.unwrap();
assert_eq!(response.status.len(), 1);
assert_eq!(response.values.len(), 1);
assert_eq!(response.values[0].timestamp_binary, 1234);
let _ = peer_task.await.unwrap();
}
fn synthesise_read_response_body(
status_payload: Vec<u8>,
values_payload: Vec<u8>,
) -> Vec<mxaccess_asb_nettcp::nbfx::NbfxToken> {
use mxaccess_asb_nettcp::nbfx::{NbfxName, NbfxText, NbfxToken};
const IOM_NS: &str = "urn:msg.data.asb.iom:2";
let mut tokens = vec![
NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("ReadResponse".to_string()),
},
NbfxToken::DefaultNamespace {
value: NbfxText::Chars(IOM_NS.to_string()),
},
];
for (name, payload) in [("Status", status_payload), ("Values", values_payload)] {
tokens.push(NbfxToken::Element {
prefix: None,
name: NbfxName::Inline(name.to_string()),
});
tokens.push(NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("ASBIData".to_string()),
});
tokens.push(NbfxToken::Text(NbfxText::Bytes(payload)));
tokens.push(NbfxToken::EndElement);
tokens.push(NbfxToken::EndElement);
}
tokens.push(NbfxToken::EndElement);
tokens
}
}