[M5] mxaccess-asb: F25 step 10 — PublishWriteComplete + DeleteMonitoredItems

Closes out the F25 operation matrix. AsbClient now wraps every
IASBIDataV2 operation:

  Lifecycle:    connect / disconnect / send_end / send_preamble / keep_alive
  Items:        register_items / unregister_items / read / write
  Subscriptions:create_subscription / add_monitored_items / publish
                / delete_monitored_items / delete_subscription
  Write cb:     publish_write_complete

API additions:
* `build_publish_write_complete_request_body()` — empty wrapper
  per `AsbContracts.cs:204-205`. No body fields beyond inherited
  ConnectionValidator.
* `decode_publish_write_complete_response` — returns count of
  `<ItemWriteComplete>` elements observed. Per-element decode
  (Status + WriteHandle) deferred to a later iteration since
  ItemWriteComplete is regular WCF DataContract rather than the
  binary fast-path.
* `build_delete_monitored_items_request_body` — same MonitoredItem
  shape as AddMonitoredItems but omits RequireId per `cs:268-277`.
* `decode_delete_monitored_items_response` — per-item Status array.
* Client wrappers: `publish_write_complete()`,
  `delete_monitored_items(subscription_id, items)`.

6 new tests:
* `publish_write_complete_body_is_empty_wrapper` — body shape.
* `publish_write_complete_response_counts_item_write_complete_elements`
  — counts 2 / 0 elements correctly.
* `publish_write_complete_response_zero_when_no_callbacks`.
* `delete_monitored_items_body_carries_subscription_id_and_items`.
* `delete_monitored_items_body_omits_require_id_field`.
* `delete_monitored_items_response_round_trip`.

Workspace: 701 tests pass (was 695, +6).

Stubbed for future iterations:
* ItemWriteComplete per-element decode (Status + WriteHandle) once
  a live capture confirms the WCF DataContract XML wire form.
* Optional MonitoredItem fields (Active / TimeDeadband /
  ValueDeadband / UserData) — same wire-byte uncertainty.
* Optional WriteValue fields (Comment / Timestamp / etc.).

All wire-byte caveats trace back to live-probe reconciliation
against an actual AVEVA VM.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-05 13:17:01 -04:00
parent 0441a2e693
commit 9876b4ebb4
4 changed files with 310 additions and 16 deletions
+42 -5
View File
@@ -56,15 +56,18 @@ use crate::contracts::{ItemIdentity, ItemStatus};
use crate::envelope::{ConnectionValidator, EnvelopeError, SoapEnvelope};
use crate::operations::{
AddMonitoredItemsResponse, ConnectResponse, CreateSubscriptionResponse,
DeleteSubscriptionResponse, MinimalMonitoredItem, MinimalWriteValue, OperationError,
PublishResponse, ReadResponse, RegisterItemsResponse, UnregisterItemsResponse, WriteResponse,
DeleteMonitoredItemsResponse, DeleteSubscriptionResponse, MinimalMonitoredItem,
MinimalWriteValue, OperationError, PublishResponse, PublishWriteCompleteResponse, ReadResponse,
RegisterItemsResponse, UnregisterItemsResponse, WriteResponse,
build_add_monitored_items_request_body, build_authenticate_me_request_body,
build_connect_request_body, build_create_subscription_request_body,
build_delete_subscription_request_body, build_disconnect_request_body,
build_keep_alive_request_body, build_publish_request_body, build_read_request_body,
build_delete_monitored_items_request_body, build_delete_subscription_request_body,
build_disconnect_request_body, build_keep_alive_request_body, build_publish_request_body,
build_publish_write_complete_request_body, build_read_request_body,
build_register_items_request_body, build_unregister_items_request_body,
build_write_request_body, decode_add_monitored_items_response, decode_connect_response,
decode_create_subscription_response, decode_publish_response, decode_read_response,
decode_create_subscription_response, decode_delete_monitored_items_response,
decode_publish_response, decode_publish_write_complete_response, decode_read_response,
decode_register_items_response, decode_unregister_items_response, decode_write_response,
};
use crate::{actions, decode_envelope, encode_envelope};
@@ -341,6 +344,40 @@ impl<T: AsyncRead + AsyncWrite + Unpin + Send> AsbClient<T> {
Ok(decode_read_response(&response.body_tokens)?)
}
/// `PublishWriteComplete` operation — long-poll the
/// write-completion-callback queue. Mirrors the
/// `[OperationContract(Action = "...:publishWriteCompleteIn")]`
/// at `AsbContracts.cs:42`. Returns a count of completed writes
/// (per-element decode is deferred to a later iteration once a
/// live capture confirms the WCF DataContract XML shape).
pub async fn publish_write_complete(
&mut self,
) -> Result<PublishWriteCompleteResponse, ClientError> {
let body = build_publish_write_complete_request_body();
let response = self
.send_signed_envelope(actions::PUBLISH_WRITE_COMPLETE, body, false)
.await?;
Ok(decode_publish_write_complete_response(
&response.body_tokens,
)?)
}
/// `DeleteMonitoredItems` operation — removes items from a
/// subscription. Returns the per-item Status array.
pub async fn delete_monitored_items(
&mut self,
subscription_id: i64,
items: &[MinimalMonitoredItem],
) -> Result<DeleteMonitoredItemsResponse, ClientError> {
let body = build_delete_monitored_items_request_body(subscription_id, items);
let response = self
.send_signed_envelope(actions::DELETE_MONITORED_ITEMS, body, false)
.await?;
Ok(decode_delete_monitored_items_response(
&response.body_tokens,
)?)
}
/// `Write` operation — sends a signed `WriteIn` SOAP envelope and
/// decodes the `WriteResponse` (per-item Status array).
///
+13 -10
View File
@@ -27,15 +27,18 @@ pub use envelope::{
};
pub use operations::{
AddMonitoredItemsResponse, AuthenticationDataBytes, ConnectResponse,
CreateSubscriptionResponse, DeleteSubscriptionResponse, MinimalMonitoredItem,
MinimalWriteValue, OperationError, PublishResponse, ReadResponse, RegisterItemsResponse,
UnregisterItemsResponse, WriteResponse, build_add_monitored_items_request_body,
build_authenticate_me_request_body, build_connect_request_body,
build_create_subscription_request_body, build_delete_subscription_request_body,
CreateSubscriptionResponse, DeleteMonitoredItemsResponse, DeleteSubscriptionResponse,
MinimalMonitoredItem, MinimalWriteValue, OperationError, PublishResponse,
PublishWriteCompleteResponse, ReadResponse, RegisterItemsResponse, UnregisterItemsResponse,
WriteResponse, build_add_monitored_items_request_body, build_authenticate_me_request_body,
build_connect_request_body, build_create_subscription_request_body,
build_delete_monitored_items_request_body, build_delete_subscription_request_body,
build_disconnect_request_body, build_keep_alive_request_body, build_publish_request_body,
build_read_request_body, build_register_items_request_body,
build_unregister_items_request_body, build_write_request_body, collect_asbidata_payloads,
decode_add_monitored_items_response, decode_connect_response,
decode_create_subscription_response, decode_publish_response, decode_read_response,
decode_register_items_response, decode_unregister_items_response, decode_write_response,
build_publish_write_complete_request_body, build_read_request_body,
build_register_items_request_body, build_unregister_items_request_body,
build_write_request_body, collect_asbidata_payloads, decode_add_monitored_items_response,
decode_connect_response, decode_create_subscription_response,
decode_delete_monitored_items_response, decode_publish_response,
decode_publish_write_complete_response, decode_read_response, decode_register_items_response,
decode_unregister_items_response, decode_write_response,
};
+250
View File
@@ -395,6 +395,136 @@ fn find_inline_text(
None
}
// ---- PublishWriteComplete + DeleteMonitoredItems (F25 step 10) ----------
/// Build the NBFX token stream for a `PublishWriteCompleteIn` request
/// body. Empty wrapper per `AsbContracts.cs:204-205`
/// (`PublishWriteCompleteRequest : ConnectedRequest;` — no body fields
/// beyond the inherited ConnectionValidator header).
pub fn build_publish_write_complete_request_body() -> Vec<NbfxToken> {
vec![
NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("PublishWriteCompleteRequest".to_string()),
},
NbfxToken::DefaultNamespace {
value: NbfxText::Chars(IOM_NS.to_string()),
},
NbfxToken::EndElement,
]
}
/// Decoded `PublishWriteCompleteResponse`. Mirrors `AsbContracts.cs:207-213`.
///
/// The inner `ItemWriteComplete` records are regular WCF DataContract
/// (not the binary fast-path), so per-element decode is deferred to a
/// later iteration once a live capture confirms the WCF XML wire form.
/// For now this just counts how many `<ItemWriteComplete>` elements
/// appeared in the body — enough for callers to detect "complete-write
/// callback fired" without parsing the per-write WriteHandle/Status.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct PublishWriteCompleteResponse {
pub complete_writes_count: usize,
}
pub fn decode_publish_write_complete_response(
body_tokens: &[NbfxToken],
) -> Result<PublishWriteCompleteResponse, OperationError> {
let count = body_tokens
.iter()
.filter(|tok| {
matches!(
tok,
NbfxToken::Element { name: NbfxName::Inline(s), .. } if s == "ItemWriteComplete"
)
})
.count();
Ok(PublishWriteCompleteResponse {
complete_writes_count: count,
})
}
/// Build the NBFX token stream for `DeleteMonitoredItemsIn`. Mirrors
/// `AsbContracts.cs:268-277`. Same MonitoredItem shape as
/// AddMonitoredItems but no RequireId field.
pub fn build_delete_monitored_items_request_body(
subscription_id: i64,
items: &[MinimalMonitoredItem],
) -> Vec<NbfxToken> {
let mut tokens = vec![
NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("DeleteMonitoredItemsRequest".to_string()),
},
NbfxToken::DefaultNamespace {
value: NbfxText::Chars(IOM_NS.to_string()),
},
NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("SubscriptionId".to_string()),
},
NbfxToken::Text(NbfxText::Int64(subscription_id)),
NbfxToken::EndElement,
NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("Items".to_string()),
},
];
for item in items {
tokens.push(NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("MonitoredItem".to_string()),
});
tokens.push(NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("Item".to_string()),
});
tokens.push(NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("ASBIData".to_string()),
});
tokens.push(NbfxToken::Text(NbfxText::Bytes(item.item.encode())));
tokens.push(NbfxToken::EndElement); // </ASBIData>
tokens.push(NbfxToken::EndElement); // </Item>
tokens.push(NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("SampleInterval".to_string()),
});
tokens.push(NbfxToken::Text(NbfxText::Int64(
item.sample_interval as i64,
)));
tokens.push(NbfxToken::EndElement);
tokens.push(NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("Buffered".to_string()),
});
tokens.push(NbfxToken::Text(NbfxText::Bool(item.buffered)));
tokens.push(NbfxToken::EndElement);
tokens.push(NbfxToken::EndElement); // </MonitoredItem>
}
tokens.push(NbfxToken::EndElement); // </Items>
tokens.push(NbfxToken::EndElement); // </DeleteMonitoredItemsRequest>
tokens
}
/// Decoded `DeleteMonitoredItemsResponse`. Single Status array per
/// `AsbContracts.cs:279-285`.
#[derive(Debug, Clone, PartialEq)]
pub struct DeleteMonitoredItemsResponse {
pub status: Vec<ItemStatus>,
}
pub fn decode_delete_monitored_items_response(
body_tokens: &[NbfxToken],
) -> Result<DeleteMonitoredItemsResponse, OperationError> {
let payload = collect_asbidata_payloads(body_tokens, "Status")
.into_iter()
.next()
.ok_or(OperationError::MissingField { field: "Status" })?;
let status = decode_item_status_array(&payload)?;
Ok(DeleteMonitoredItemsResponse { status })
}
// ---- Write operation (F25 step 9) ---------------------------------------
/// Minimal `WriteValue` shape carrying just the AsbVariant payload. The
@@ -1709,6 +1839,126 @@ mod tests {
assert!(decoded.values.is_empty());
}
#[test]
fn publish_write_complete_body_is_empty_wrapper() {
let body = build_publish_write_complete_request_body();
assert_eq!(body.len(), 3);
assert!(matches!(
&body[0],
NbfxToken::Element { name: NbfxName::Inline(s), .. } if s == "PublishWriteCompleteRequest"
));
assert!(matches!(
&body[1],
NbfxToken::DefaultNamespace { value: NbfxText::Chars(ns) } if ns == IOM_NS
));
assert!(matches!(&body[2], NbfxToken::EndElement));
}
#[test]
fn publish_write_complete_response_counts_item_write_complete_elements() {
// Synthesize a body with two ItemWriteComplete elements.
let body = vec![
NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("PublishWriteCompleteResponse".to_string()),
},
NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("CompleteWrites".to_string()),
},
NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("ItemWriteComplete".to_string()),
},
NbfxToken::EndElement,
NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("ItemWriteComplete".to_string()),
},
NbfxToken::EndElement,
NbfxToken::EndElement,
NbfxToken::EndElement,
];
let decoded = decode_publish_write_complete_response(&body).unwrap();
assert_eq!(decoded.complete_writes_count, 2);
}
#[test]
fn publish_write_complete_response_zero_when_no_callbacks() {
let body = vec![
NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("PublishWriteCompleteResponse".to_string()),
},
NbfxToken::EndElement,
];
let decoded = decode_publish_write_complete_response(&body).unwrap();
assert_eq!(decoded.complete_writes_count, 0);
}
#[test]
fn delete_monitored_items_body_carries_subscription_id_and_items() {
let item = MinimalMonitoredItem::new(ItemIdentity::absolute_by_name("Tag.A"), 1000);
let body = build_delete_monitored_items_request_body(11, &[item]);
assert!(matches!(
&body[0],
NbfxToken::Element { name: NbfxName::Inline(s), .. } if s == "DeleteMonitoredItemsRequest"
));
let mut saw_id = false;
let mut saw_monitored_item = false;
for tok in &body {
if let NbfxToken::Text(NbfxText::Int64(11)) = tok {
saw_id = true;
}
if let NbfxToken::Element {
name: NbfxName::Inline(local),
..
} = tok
{
if local == "MonitoredItem" {
saw_monitored_item = true;
}
}
}
assert!(saw_id);
assert!(saw_monitored_item);
}
#[test]
fn delete_monitored_items_body_omits_require_id_field() {
let item = MinimalMonitoredItem::new(ItemIdentity::absolute_by_name("Tag.A"), 1000);
let body = build_delete_monitored_items_request_body(7, &[item]);
// The DeleteMonitoredItems contract has no RequireId field;
// assert it doesn't show up.
for tok in &body {
if let NbfxToken::Element {
name: NbfxName::Inline(local),
..
} = tok
{
assert!(local != "RequireId");
}
}
}
#[test]
fn delete_monitored_items_response_round_trip() {
use mxaccess_codec::AsbStatus;
let status = vec![ItemStatus {
item: ItemIdentity::absolute_by_name("Tag.D"),
status: AsbStatus::default(),
error_code: 0,
error_code_specified: true,
}];
let payload = crate::contracts::encode_item_status_array(&status);
let body = asbidata_request_body(
"DeleteMonitoredItemsResponse",
&[BodyField::asbidata("Status", payload)],
);
let decoded = decode_delete_monitored_items_response(&body).unwrap();
assert_eq!(decoded.status, status);
}
#[test]
fn write_request_body_carries_items_values_and_write_handle() {
use mxaccess_codec::AsbVariant;