[M5] mxaccess-asb: F25 step 9 — Write operation

Closes the highest-value remaining IASBIDataV2 op. With Write landed,
the read+write+subscribe path is functionally complete in-memory.

API additions:
* `MinimalWriteValue { value: AsbVariant }` — carries just the Value
  payload. Optional ArrayElementIndex / Comment / HasQT / Status /
  Timestamp fields are deferred to a later iteration once a live
  capture confirms the WCF DataContract XML form.
* `build_write_request_body(items, values, write_handle)` per
  `AsbContracts.cs:181-194`:
  ```xml
  <WriteBasicRequest xmlns="urn:msg.data.asb.iom:2">
    <Items><ASBIData>{ItemIdentity[] binary}</ASBIData></Items>
    <Values>
      <WriteValue><Value><ASBIData>{Variant binary}</ASBIData></Value></WriteValue>
      ...
    </Values>
    <WriteHandle>{i32}</WriteHandle>
  </WriteBasicRequest>
  ```
  Items array uses the IAsbCustomSerializableType binary fast-path;
  each Value's inner Variant also uses the fast-path. WriteHandle is
  an Int32 (opaque correlation echoed in PublishWriteComplete).
* `decode_write_response` — per-item Status array (mirrors the
  unregister/register pattern).
* `AsbClient::write(items, values, write_handle)` — thin wrapper.

4 new tests:
* `write_request_body_carries_items_values_and_write_handle` — body
  shape sanity (WriteHandle = 7 Int32, WriteValue element present).
* `write_request_body_pairs_items_and_values_arrays` — 2 items + 2
  values produces 2 WriteValue elements.
* `write_response_round_trips_status_array` — Status decode.
* `write_response_missing_status_fails` — graceful MissingField
  error.

Workspace: 695 tests pass (was 691, +4).

Stubbed for next F25 iterations:
* `PublishWriteComplete` — empty request, `ItemWriteComplete[]`
  response.
* `DeleteMonitoredItems` — mirrors AddMonitoredItems pattern.
* Optional WriteValue fields (Comment / Timestamp / etc.) once a
  live capture confirms the wire-byte layout.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-05 13:04:11 -04:00
parent b543eb1f84
commit 0441a2e693
4 changed files with 231 additions and 14 deletions
+25 -4
View File
@@ -56,16 +56,16 @@ use crate::contracts::{ItemIdentity, ItemStatus};
use crate::envelope::{ConnectionValidator, EnvelopeError, SoapEnvelope};
use crate::operations::{
AddMonitoredItemsResponse, ConnectResponse, CreateSubscriptionResponse,
DeleteSubscriptionResponse, MinimalMonitoredItem, OperationError, PublishResponse,
ReadResponse, RegisterItemsResponse, UnregisterItemsResponse,
DeleteSubscriptionResponse, MinimalMonitoredItem, MinimalWriteValue, OperationError,
PublishResponse, ReadResponse, RegisterItemsResponse, UnregisterItemsResponse, WriteResponse,
build_add_monitored_items_request_body, build_authenticate_me_request_body,
build_connect_request_body, build_create_subscription_request_body,
build_delete_subscription_request_body, build_disconnect_request_body,
build_keep_alive_request_body, build_publish_request_body, build_read_request_body,
build_register_items_request_body, build_unregister_items_request_body,
decode_add_monitored_items_response, decode_connect_response,
build_write_request_body, decode_add_monitored_items_response, decode_connect_response,
decode_create_subscription_response, decode_publish_response, decode_read_response,
decode_register_items_response, decode_unregister_items_response,
decode_register_items_response, decode_unregister_items_response, decode_write_response,
};
use crate::{actions, decode_envelope, encode_envelope};
@@ -341,6 +341,27 @@ impl<T: AsyncRead + AsyncWrite + Unpin + Send> AsbClient<T> {
Ok(decode_read_response(&response.body_tokens)?)
}
/// `Write` operation — sends a signed `WriteIn` SOAP envelope and
/// decodes the `WriteResponse` (per-item Status array).
///
/// `items.len()` must equal `values.len()`; the .NET reference
/// pairs them positionally per `MxAsbDataClient.cs` Write path.
/// `write_handle` is an opaque correlation ID echoed in the
/// PublishWriteComplete callback (irrelevant for fire-and-forget
/// writes; pass `0`).
pub async fn write(
&mut self,
items: &[ItemIdentity],
values: &[MinimalWriteValue],
write_handle: u32,
) -> Result<WriteResponse, ClientError> {
let body = build_write_request_body(items, values, write_handle);
let response = self
.send_signed_envelope(actions::WRITE, body, false)
.await?;
Ok(decode_write_response(&response.body_tokens)?)
}
/// `CreateSubscription` operation — allocates a server-side
/// subscription and returns its ID. Caller threads the ID through
/// subsequent `add_monitored_items` / `publish` /
+10 -9
View File
@@ -27,14 +27,15 @@ pub use envelope::{
};
pub use operations::{
AddMonitoredItemsResponse, AuthenticationDataBytes, ConnectResponse,
CreateSubscriptionResponse, DeleteSubscriptionResponse, MinimalMonitoredItem, OperationError,
PublishResponse, ReadResponse, RegisterItemsResponse, UnregisterItemsResponse,
build_add_monitored_items_request_body, build_authenticate_me_request_body,
build_connect_request_body, build_create_subscription_request_body,
build_delete_subscription_request_body, build_disconnect_request_body,
build_keep_alive_request_body, build_publish_request_body, build_read_request_body,
build_register_items_request_body, build_unregister_items_request_body,
collect_asbidata_payloads, decode_add_monitored_items_response, decode_connect_response,
CreateSubscriptionResponse, DeleteSubscriptionResponse, MinimalMonitoredItem,
MinimalWriteValue, OperationError, PublishResponse, ReadResponse, RegisterItemsResponse,
UnregisterItemsResponse, WriteResponse, build_add_monitored_items_request_body,
build_authenticate_me_request_body, build_connect_request_body,
build_create_subscription_request_body, build_delete_subscription_request_body,
build_disconnect_request_body, build_keep_alive_request_body, build_publish_request_body,
build_read_request_body, build_register_items_request_body,
build_unregister_items_request_body, build_write_request_body, collect_asbidata_payloads,
decode_add_monitored_items_response, decode_connect_response,
decode_create_subscription_response, decode_publish_response, decode_read_response,
decode_register_items_response, decode_unregister_items_response,
decode_register_items_response, decode_unregister_items_response, decode_write_response,
};
+191
View File
@@ -395,6 +395,114 @@ fn find_inline_text(
None
}
// ---- Write operation (F25 step 9) ---------------------------------------
/// Minimal `WriteValue` shape carrying just the AsbVariant payload. The
/// full .NET `WriteValue` (`AsbContracts.cs:793-894`) also has optional
/// ArrayElementIndex, Comment, HasQT, Status, and Timestamp fields.
/// Those are deferred to a later F25 iteration once a live capture
/// confirms the WCF DataContract XML wire form.
///
/// Note: the .NET `WriteValue` does NOT carry `Item` directly —
/// `WriteBasicRequest` carries `Items[]` + `Values[]` as parallel
/// arrays. We mirror that wire shape — see [`build_write_request_body`].
#[derive(Debug, Clone, PartialEq)]
pub struct MinimalWriteValue {
pub value: mxaccess_codec::AsbVariant,
}
impl MinimalWriteValue {
pub fn new(value: mxaccess_codec::AsbVariant) -> Self {
Self { value }
}
}
/// Build the NBFX token stream for a `WriteIn` request body. Mirrors
/// `AsbContracts.cs:181-194`. The Items array uses the
/// IAsbCustomSerializableType binary fast-path (`<ASBIData>` Bytes
/// record); the Values array is per-WriteValue regular XML — though
/// the Variant inside each WriteValue/Value field IS
/// IAsbCustomSerializableType so it gets `<ASBIData>` wrapping.
///
/// **Wire-byte caveat**: optional ArrayElementIndex / Comment / HasQT
/// / Status / Timestamp fields are not emitted. Live-probe iteration
/// will reconcile.
pub fn build_write_request_body(
items: &[ItemIdentity],
values: &[MinimalWriteValue],
write_handle: u32,
) -> Vec<NbfxToken> {
let items_payload = encode_item_identity_array(items);
let mut tokens = vec![
NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("WriteBasicRequest".to_string()),
},
NbfxToken::DefaultNamespace {
value: NbfxText::Chars(IOM_NS.to_string()),
},
NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("Items".to_string()),
},
NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("ASBIData".to_string()),
},
NbfxToken::Text(NbfxText::Bytes(items_payload)),
NbfxToken::EndElement, // </ASBIData>
NbfxToken::EndElement, // </Items>
NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("Values".to_string()),
},
];
for v in values {
tokens.push(NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("WriteValue".to_string()),
});
tokens.push(NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("Value".to_string()),
});
tokens.push(NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("ASBIData".to_string()),
});
tokens.push(NbfxToken::Text(NbfxText::Bytes(v.value.encode())));
tokens.push(NbfxToken::EndElement); // </ASBIData>
tokens.push(NbfxToken::EndElement); // </Value>
tokens.push(NbfxToken::EndElement); // </WriteValue>
}
tokens.push(NbfxToken::EndElement); // </Values>
tokens.push(NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("WriteHandle".to_string()),
});
tokens.push(NbfxToken::Text(NbfxText::Int32(write_handle as i32)));
tokens.push(NbfxToken::EndElement);
tokens.push(NbfxToken::EndElement); // </WriteBasicRequest>
tokens
}
/// Decoded `WriteResponse`. Mirrors `AsbContracts.cs:196-202` — just
/// the per-item Status array.
#[derive(Debug, Clone, PartialEq)]
pub struct WriteResponse {
pub status: Vec<ItemStatus>,
}
pub fn decode_write_response(body_tokens: &[NbfxToken]) -> Result<WriteResponse, OperationError> {
let payload = collect_asbidata_payloads(body_tokens, "Status")
.into_iter()
.next()
.ok_or(OperationError::MissingField { field: "Status" })?;
let status = decode_item_status_array(&payload)?;
Ok(WriteResponse { status })
}
// ---- Subscription operations (F25 step 8) -------------------------------
/// Build the NBFX token stream for a `CreateSubscriptionIn` request
@@ -1601,6 +1709,89 @@ mod tests {
assert!(decoded.values.is_empty());
}
#[test]
fn write_request_body_carries_items_values_and_write_handle() {
use mxaccess_codec::AsbVariant;
let items = vec![ItemIdentity::absolute_by_name("Tag.X")];
let values = vec![MinimalWriteValue::new(AsbVariant::from_i32(42))];
let body = build_write_request_body(&items, &values, 7);
assert!(matches!(
&body[0],
NbfxToken::Element { name: NbfxName::Inline(s), .. } if s == "WriteBasicRequest"
));
// WriteHandle = 7 (Int32)
let mut saw_write_handle = false;
let mut saw_write_value_element = false;
for tok in &body {
if let NbfxToken::Text(NbfxText::Int32(7)) = tok {
saw_write_handle = true;
}
if let NbfxToken::Element {
name: NbfxName::Inline(local),
..
} = tok
{
if local == "WriteValue" {
saw_write_value_element = true;
}
}
}
assert!(saw_write_handle);
assert!(saw_write_value_element);
}
#[test]
fn write_request_body_pairs_items_and_values_arrays() {
use mxaccess_codec::AsbVariant;
let items = vec![
ItemIdentity::absolute_by_name("Tag.A"),
ItemIdentity::absolute_by_name("Tag.B"),
];
let values = vec![
MinimalWriteValue::new(AsbVariant::from_i32(1)),
MinimalWriteValue::new(AsbVariant::from_i32(2)),
];
let body = build_write_request_body(&items, &values, 0);
// Two WriteValue elements should appear under <Values>.
let n_write_value_elements = body
.iter()
.filter(|tok| {
matches!(
tok,
NbfxToken::Element { name: NbfxName::Inline(s), .. } if s == "WriteValue"
)
})
.count();
assert_eq!(n_write_value_elements, 2);
}
#[test]
fn write_response_round_trips_status_array() {
use mxaccess_codec::AsbStatus;
let status = vec![ItemStatus {
item: ItemIdentity::absolute_by_name("Tag.X"),
status: AsbStatus::default(),
error_code: 0,
error_code_specified: true,
}];
let payload = crate::contracts::encode_item_status_array(&status);
let body =
asbidata_request_body("WriteResponse", &[BodyField::asbidata("Status", payload)]);
let decoded = decode_write_response(&body).unwrap();
assert_eq!(decoded.status, status);
}
#[test]
fn write_response_missing_status_fails() {
let body = asbidata_request_body("WriteResponse", &[]);
let err = decode_write_response(&body).unwrap_err();
assert!(matches!(
err,
OperationError::MissingField { field: "Status" }
));
}
#[test]
fn create_subscription_body_carries_max_queue_and_sample_interval() {
let body = build_create_subscription_request_body(0, 1000);