diff --git a/design/followups.md b/design/followups.md index 340eac1..a20893b 100644 --- a/design/followups.md +++ b/design/followups.md @@ -46,7 +46,11 @@ move to `## Resolved` with a date + commit hash. **Resolves when:** F19-F26 are all closed and the four DoD bullets above pass. -**Cumulative execution log.** F19 + F23 (`ed17c07`); F24 (`7611d9e`); F20 (`9dfd193`); F22 (`43c10a1`); F21 (`5f98558`); F25 step 1 (`25dbd8d`); F25 step 2 (`a2b8989`); F25 step 3 (`c4bf0a0`); F25 step 4 (`1e59249`); F25 step 5 (`9b8133f`); F25 step 6 (`321b796`); F25 step 7 (`1b1ee1e`); F26 step 1 (`8a0f92b`); F26 step 2 (`14bb529`); example rewrite (`c6570dc`); F25 step 8 (`b543eb1`); F25 step 9 landed in this commit: +**Cumulative execution log.** F19 + F23 (`ed17c07`); F24 (`7611d9e`); F20 (`9dfd193`); F22 (`43c10a1`); F21 (`5f98558`); F25 step 1 (`25dbd8d`); F25 step 2 (`a2b8989`); F25 step 3 (`c4bf0a0`); F25 step 4 (`1e59249`); F25 step 5 (`9b8133f`); F25 step 6 (`321b796`); F25 step 7 (`1b1ee1e`); F26 step 1 (`8a0f92b`); F26 step 2 (`14bb529`); example rewrite (`c6570dc`); F25 step 8 (`b543eb1`); F25 step 9 (`0441a2e`); F25 step 10 landed in this commit: +- F25 step 10: PublishWriteComplete + DeleteMonitoredItems — closes out the F25 operation matrix. `build_publish_write_complete_request_body` emits the empty wrapper element per `AsbContracts.cs:204-205` (no body fields beyond ConnectionValidator). `decode_publish_write_complete_response` returns a count of `` elements observed; per-element decode (Status array + WriteHandle) is deferred to a later iteration since `ItemWriteComplete` is regular WCF DataContract rather than the binary fast-path. `build_delete_monitored_items_request_body` mirrors AddMonitoredItems but omits the RequireId field per `cs:268-277`. `decode_delete_monitored_items_response` returns the per-item Status array. Two new client wrappers: `publish_write_complete()` and `delete_monitored_items(subscription_id, items)`. 6 new tests cover empty-body shape, ItemWriteComplete counting (0 / 2 elements), DeleteMonitoredItems body shape (carries SubscriptionId + MonitoredItem), DeleteMonitoredItems omits RequireId, and Status round-trip. **F25 operation matrix complete**: AsbClient now wraps every IASBIDataV2 operation: `connect`/`disconnect`/`send_end`/`send_preamble`/`keep_alive` (lifecycle), `register_items`/`unregister_items`/`read`/`write` (items), `create_subscription`/`add_monitored_items`/`publish`/`delete_monitored_items`/`delete_subscription` (subscriptions), `publish_write_complete` (write callback). Workspace: 701 tests pass (was 695, +6). + +**Earlier slices:** +- F25 step 9 (commit `0441a2e`): - F25 step 9: Write operation. New `MinimalWriteValue { value: AsbVariant }` carries just the `Value` payload; optional ArrayElementIndex/Comment/HasQT/Status/Timestamp WriteValue fields are deferred to a later iteration once a live capture confirms the WCF DataContract XML form. New `build_write_request_body(items, values, write_handle)` produces the full `WriteBasicRequest` body shape per `AsbContracts.cs:181-194`: Items array uses the IAsbCustomSerializableType binary fast-path (`{...}`), each Value's inner `Variant` field also uses the fast-path (`{...}`), and WriteHandle is an Int32. New `decode_write_response` returns the per-item Status array. New `client::write(items, values, write_handle)` wrapper. 4 new tests cover Write request body shape (carries Items array, parallel Values array with WriteValue elements, WriteHandle as Int32), parallel-array sizing (2 items + 2 values produces 2 WriteValue elements), Status round-trip, and missing-Status error. Workspace: 695 tests pass (was 691, +4). The IASBIDataV2 read+write+subscribe path is now functionally complete in-memory. **Earlier slices:** diff --git a/rust/crates/mxaccess-asb/src/client.rs b/rust/crates/mxaccess-asb/src/client.rs index f1a3c83..1f2cf74 100644 --- a/rust/crates/mxaccess-asb/src/client.rs +++ b/rust/crates/mxaccess-asb/src/client.rs @@ -56,15 +56,18 @@ use crate::contracts::{ItemIdentity, ItemStatus}; use crate::envelope::{ConnectionValidator, EnvelopeError, SoapEnvelope}; use crate::operations::{ AddMonitoredItemsResponse, ConnectResponse, CreateSubscriptionResponse, - DeleteSubscriptionResponse, MinimalMonitoredItem, MinimalWriteValue, OperationError, - PublishResponse, ReadResponse, RegisterItemsResponse, UnregisterItemsResponse, WriteResponse, + DeleteMonitoredItemsResponse, DeleteSubscriptionResponse, MinimalMonitoredItem, + MinimalWriteValue, OperationError, PublishResponse, PublishWriteCompleteResponse, ReadResponse, + RegisterItemsResponse, UnregisterItemsResponse, WriteResponse, build_add_monitored_items_request_body, build_authenticate_me_request_body, build_connect_request_body, build_create_subscription_request_body, - build_delete_subscription_request_body, build_disconnect_request_body, - build_keep_alive_request_body, build_publish_request_body, build_read_request_body, + build_delete_monitored_items_request_body, build_delete_subscription_request_body, + build_disconnect_request_body, build_keep_alive_request_body, build_publish_request_body, + build_publish_write_complete_request_body, build_read_request_body, build_register_items_request_body, build_unregister_items_request_body, build_write_request_body, decode_add_monitored_items_response, decode_connect_response, - decode_create_subscription_response, decode_publish_response, decode_read_response, + decode_create_subscription_response, decode_delete_monitored_items_response, + decode_publish_response, decode_publish_write_complete_response, decode_read_response, decode_register_items_response, decode_unregister_items_response, decode_write_response, }; use crate::{actions, decode_envelope, encode_envelope}; @@ -341,6 +344,40 @@ impl AsbClient { Ok(decode_read_response(&response.body_tokens)?) } + /// `PublishWriteComplete` operation — long-poll the + /// write-completion-callback queue. Mirrors the + /// `[OperationContract(Action = "...:publishWriteCompleteIn")]` + /// at `AsbContracts.cs:42`. Returns a count of completed writes + /// (per-element decode is deferred to a later iteration once a + /// live capture confirms the WCF DataContract XML shape). + pub async fn publish_write_complete( + &mut self, + ) -> Result { + let body = build_publish_write_complete_request_body(); + let response = self + .send_signed_envelope(actions::PUBLISH_WRITE_COMPLETE, body, false) + .await?; + Ok(decode_publish_write_complete_response( + &response.body_tokens, + )?) + } + + /// `DeleteMonitoredItems` operation — removes items from a + /// subscription. Returns the per-item Status array. + pub async fn delete_monitored_items( + &mut self, + subscription_id: i64, + items: &[MinimalMonitoredItem], + ) -> Result { + let body = build_delete_monitored_items_request_body(subscription_id, items); + let response = self + .send_signed_envelope(actions::DELETE_MONITORED_ITEMS, body, false) + .await?; + Ok(decode_delete_monitored_items_response( + &response.body_tokens, + )?) + } + /// `Write` operation — sends a signed `WriteIn` SOAP envelope and /// decodes the `WriteResponse` (per-item Status array). /// diff --git a/rust/crates/mxaccess-asb/src/lib.rs b/rust/crates/mxaccess-asb/src/lib.rs index c828cfa..918eb2c 100644 --- a/rust/crates/mxaccess-asb/src/lib.rs +++ b/rust/crates/mxaccess-asb/src/lib.rs @@ -27,15 +27,18 @@ pub use envelope::{ }; pub use operations::{ AddMonitoredItemsResponse, AuthenticationDataBytes, ConnectResponse, - CreateSubscriptionResponse, DeleteSubscriptionResponse, MinimalMonitoredItem, - MinimalWriteValue, OperationError, PublishResponse, ReadResponse, RegisterItemsResponse, - UnregisterItemsResponse, WriteResponse, build_add_monitored_items_request_body, - build_authenticate_me_request_body, build_connect_request_body, - build_create_subscription_request_body, build_delete_subscription_request_body, + CreateSubscriptionResponse, DeleteMonitoredItemsResponse, DeleteSubscriptionResponse, + MinimalMonitoredItem, MinimalWriteValue, OperationError, PublishResponse, + PublishWriteCompleteResponse, ReadResponse, RegisterItemsResponse, UnregisterItemsResponse, + WriteResponse, build_add_monitored_items_request_body, build_authenticate_me_request_body, + build_connect_request_body, build_create_subscription_request_body, + build_delete_monitored_items_request_body, build_delete_subscription_request_body, build_disconnect_request_body, build_keep_alive_request_body, build_publish_request_body, - build_read_request_body, build_register_items_request_body, - build_unregister_items_request_body, build_write_request_body, collect_asbidata_payloads, - decode_add_monitored_items_response, decode_connect_response, - decode_create_subscription_response, decode_publish_response, decode_read_response, - decode_register_items_response, decode_unregister_items_response, decode_write_response, + build_publish_write_complete_request_body, build_read_request_body, + build_register_items_request_body, build_unregister_items_request_body, + build_write_request_body, collect_asbidata_payloads, decode_add_monitored_items_response, + decode_connect_response, decode_create_subscription_response, + decode_delete_monitored_items_response, decode_publish_response, + decode_publish_write_complete_response, decode_read_response, decode_register_items_response, + decode_unregister_items_response, decode_write_response, }; diff --git a/rust/crates/mxaccess-asb/src/operations.rs b/rust/crates/mxaccess-asb/src/operations.rs index 904ceca..e41dfbe 100644 --- a/rust/crates/mxaccess-asb/src/operations.rs +++ b/rust/crates/mxaccess-asb/src/operations.rs @@ -395,6 +395,136 @@ fn find_inline_text( None } +// ---- PublishWriteComplete + DeleteMonitoredItems (F25 step 10) ---------- + +/// Build the NBFX token stream for a `PublishWriteCompleteIn` request +/// body. Empty wrapper per `AsbContracts.cs:204-205` +/// (`PublishWriteCompleteRequest : ConnectedRequest;` — no body fields +/// beyond the inherited ConnectionValidator header). +pub fn build_publish_write_complete_request_body() -> Vec { + vec![ + NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("PublishWriteCompleteRequest".to_string()), + }, + NbfxToken::DefaultNamespace { + value: NbfxText::Chars(IOM_NS.to_string()), + }, + NbfxToken::EndElement, + ] +} + +/// Decoded `PublishWriteCompleteResponse`. Mirrors `AsbContracts.cs:207-213`. +/// +/// The inner `ItemWriteComplete` records are regular WCF DataContract +/// (not the binary fast-path), so per-element decode is deferred to a +/// later iteration once a live capture confirms the WCF XML wire form. +/// For now this just counts how many `` elements +/// appeared in the body — enough for callers to detect "complete-write +/// callback fired" without parsing the per-write WriteHandle/Status. +#[derive(Debug, Clone, PartialEq, Eq, Default)] +pub struct PublishWriteCompleteResponse { + pub complete_writes_count: usize, +} + +pub fn decode_publish_write_complete_response( + body_tokens: &[NbfxToken], +) -> Result { + let count = body_tokens + .iter() + .filter(|tok| { + matches!( + tok, + NbfxToken::Element { name: NbfxName::Inline(s), .. } if s == "ItemWriteComplete" + ) + }) + .count(); + Ok(PublishWriteCompleteResponse { + complete_writes_count: count, + }) +} + +/// Build the NBFX token stream for `DeleteMonitoredItemsIn`. Mirrors +/// `AsbContracts.cs:268-277`. Same MonitoredItem shape as +/// AddMonitoredItems but no RequireId field. +pub fn build_delete_monitored_items_request_body( + subscription_id: i64, + items: &[MinimalMonitoredItem], +) -> Vec { + let mut tokens = vec![ + NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("DeleteMonitoredItemsRequest".to_string()), + }, + NbfxToken::DefaultNamespace { + value: NbfxText::Chars(IOM_NS.to_string()), + }, + NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("SubscriptionId".to_string()), + }, + NbfxToken::Text(NbfxText::Int64(subscription_id)), + NbfxToken::EndElement, + NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("Items".to_string()), + }, + ]; + for item in items { + tokens.push(NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("MonitoredItem".to_string()), + }); + tokens.push(NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("Item".to_string()), + }); + tokens.push(NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("ASBIData".to_string()), + }); + tokens.push(NbfxToken::Text(NbfxText::Bytes(item.item.encode()))); + tokens.push(NbfxToken::EndElement); // + tokens.push(NbfxToken::EndElement); // + tokens.push(NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("SampleInterval".to_string()), + }); + tokens.push(NbfxToken::Text(NbfxText::Int64( + item.sample_interval as i64, + ))); + tokens.push(NbfxToken::EndElement); + tokens.push(NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("Buffered".to_string()), + }); + tokens.push(NbfxToken::Text(NbfxText::Bool(item.buffered))); + tokens.push(NbfxToken::EndElement); + tokens.push(NbfxToken::EndElement); // + } + tokens.push(NbfxToken::EndElement); // + tokens.push(NbfxToken::EndElement); // + tokens +} + +/// Decoded `DeleteMonitoredItemsResponse`. Single Status array per +/// `AsbContracts.cs:279-285`. +#[derive(Debug, Clone, PartialEq)] +pub struct DeleteMonitoredItemsResponse { + pub status: Vec, +} + +pub fn decode_delete_monitored_items_response( + body_tokens: &[NbfxToken], +) -> Result { + let payload = collect_asbidata_payloads(body_tokens, "Status") + .into_iter() + .next() + .ok_or(OperationError::MissingField { field: "Status" })?; + let status = decode_item_status_array(&payload)?; + Ok(DeleteMonitoredItemsResponse { status }) +} + // ---- Write operation (F25 step 9) --------------------------------------- /// Minimal `WriteValue` shape carrying just the AsbVariant payload. The @@ -1709,6 +1839,126 @@ mod tests { assert!(decoded.values.is_empty()); } + #[test] + fn publish_write_complete_body_is_empty_wrapper() { + let body = build_publish_write_complete_request_body(); + assert_eq!(body.len(), 3); + assert!(matches!( + &body[0], + NbfxToken::Element { name: NbfxName::Inline(s), .. } if s == "PublishWriteCompleteRequest" + )); + assert!(matches!( + &body[1], + NbfxToken::DefaultNamespace { value: NbfxText::Chars(ns) } if ns == IOM_NS + )); + assert!(matches!(&body[2], NbfxToken::EndElement)); + } + + #[test] + fn publish_write_complete_response_counts_item_write_complete_elements() { + // Synthesize a body with two ItemWriteComplete elements. + let body = vec![ + NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("PublishWriteCompleteResponse".to_string()), + }, + NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("CompleteWrites".to_string()), + }, + NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("ItemWriteComplete".to_string()), + }, + NbfxToken::EndElement, + NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("ItemWriteComplete".to_string()), + }, + NbfxToken::EndElement, + NbfxToken::EndElement, + NbfxToken::EndElement, + ]; + let decoded = decode_publish_write_complete_response(&body).unwrap(); + assert_eq!(decoded.complete_writes_count, 2); + } + + #[test] + fn publish_write_complete_response_zero_when_no_callbacks() { + let body = vec![ + NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("PublishWriteCompleteResponse".to_string()), + }, + NbfxToken::EndElement, + ]; + let decoded = decode_publish_write_complete_response(&body).unwrap(); + assert_eq!(decoded.complete_writes_count, 0); + } + + #[test] + fn delete_monitored_items_body_carries_subscription_id_and_items() { + let item = MinimalMonitoredItem::new(ItemIdentity::absolute_by_name("Tag.A"), 1000); + let body = build_delete_monitored_items_request_body(11, &[item]); + assert!(matches!( + &body[0], + NbfxToken::Element { name: NbfxName::Inline(s), .. } if s == "DeleteMonitoredItemsRequest" + )); + let mut saw_id = false; + let mut saw_monitored_item = false; + for tok in &body { + if let NbfxToken::Text(NbfxText::Int64(11)) = tok { + saw_id = true; + } + if let NbfxToken::Element { + name: NbfxName::Inline(local), + .. + } = tok + { + if local == "MonitoredItem" { + saw_monitored_item = true; + } + } + } + assert!(saw_id); + assert!(saw_monitored_item); + } + + #[test] + fn delete_monitored_items_body_omits_require_id_field() { + let item = MinimalMonitoredItem::new(ItemIdentity::absolute_by_name("Tag.A"), 1000); + let body = build_delete_monitored_items_request_body(7, &[item]); + // The DeleteMonitoredItems contract has no RequireId field; + // assert it doesn't show up. + for tok in &body { + if let NbfxToken::Element { + name: NbfxName::Inline(local), + .. + } = tok + { + assert!(local != "RequireId"); + } + } + } + + #[test] + fn delete_monitored_items_response_round_trip() { + use mxaccess_codec::AsbStatus; + let status = vec![ItemStatus { + item: ItemIdentity::absolute_by_name("Tag.D"), + status: AsbStatus::default(), + error_code: 0, + error_code_specified: true, + }]; + let payload = crate::contracts::encode_item_status_array(&status); + let body = asbidata_request_body( + "DeleteMonitoredItemsResponse", + &[BodyField::asbidata("Status", payload)], + ); + let decoded = decode_delete_monitored_items_response(&body).unwrap(); + assert_eq!(decoded.status, status); + } + #[test] fn write_request_body_carries_items_values_and_write_handle() { use mxaccess_codec::AsbVariant;