//! Per-operation request / response NBFX-token builders for //! `IASBIDataV2`. //! //! Each `IAsbCustomSerializableType`-decorated field in a request //! contract is serialised by WCF's `AsbDataCustomSerializer` //! (`AsbContracts.cs:1561-1599`) as: //! //! ```xml //! //! {base64-binary} //! //! ``` //! //! The `` element body is the binary `WriteToStream` / //! `WriteArrayToStream` output, written via `WriteBase64`. In the NBFX //! wire form we get from the WCF binary encoder, `WriteBase64` emits a //! `Bytes8/16/32Text` record (raw binary, NOT base64 text — base64 is //! the XML-text representation of the same bytes). //! //! ## Scope this iteration (F25 step 2) //! //! Implements: //! * [`build_register_items_request_body`] — `RegisterItems` request //! contract per `AsbContracts.cs:119-143`. //! * [`build_unregister_items_request_body`] — `UnregisterItems` //! request per `cs:145-159`. //! //! Stubbed for next F25 iteration: //! * `Read`, `Write`, `PublishWriteComplete`, `CreateSubscription`, //! `AddMonitoredItems`, `DeleteMonitoredItems`, `Publish`. Each //! follows the same NBFX-token pattern; the per-operation cost is //! small once the `RegisterItems` reference is set. //! * Response decoders. Same pattern in reverse: the reply envelope's //! body tokens carry a per-operation outer element wrapping //! `` Bytes records, each decoded via the corresponding //! `InitializeArrayFromStream` shape. use mxaccess_asb_nettcp::nbfx::{NbfxName, NbfxText, NbfxToken}; use mxaccess_codec::{CodecError, RuntimeValue}; use crate::contracts::{ ItemIdentity, ItemStatus, MonitoredItemValue, decode_item_status_array, decode_monitored_item_value_array, encode_item_identity_array, }; /// Build the NBFX token stream for the body of a `RegisterItemsIn` /// SOAP envelope. The caller wraps it via [`crate::SoapEnvelope`] + /// [`crate::encode_envelope`]. /// /// Wire shape (from `AsbContracts.cs:119-143`): /// ```xml /// /// /// {int32 count + each ItemIdentity binary} /// /// true|false /// true|false /// /// ``` /// /// NOTE: WCF emits the wrapper element's `xmlns` declaration as a /// default-namespace attribute (``). NBFX represents this as a /// `DefaultNamespace`-attribute token immediately after the element /// open. pub fn build_register_items_request_body( items: &[ItemIdentity], require_id: bool, register_only: bool, ) -> Vec { let payload = encode_item_identity_array(items); asbidata_request_body( "RegisterItemsRequest", &[ BodyField::asbidata("Items", payload), BodyField::boolean("RequireId", require_id), BodyField::boolean("RegisterOnly", register_only), ], ) } /// Build the NBFX token stream for `ReadIn`. Mirror of /// `AsbContracts.cs:161-167`: /// ```xml /// /// {int32 count + each ItemIdentity} /// /// ``` pub fn build_read_request_body(items: &[ItemIdentity]) -> Vec { let payload = encode_item_identity_array(items); asbidata_request_body("ReadRequest", &[BodyField::asbidata("Items", payload)]) } /// Build the NBFX token stream for a `ConnectIn` request body. /// `ConnectRequest` is the first operation a fresh ASB session sends — /// it carries the consumer's DH public key + a fresh `ConnectionId` /// GUID. Sent **unsigned** (no `ConnectionValidator` header) since the /// authenticator hasn't received the service's public key yet. /// /// Wire shape (mirrors `AsbContracts.cs:78-86`): /// ```xml /// /// {guid-text} /// /// {public-key-bytes} /// /// /// ``` /// /// **Wire-byte caveat**: WCF's XML serialiser emits the `` /// `byte[]` member via `WriteBase64`, which the binary-message encoder /// represents as a `BytesXText` NBFX record (raw binary, not base64 /// text). For services using DataContract serialisation, the inner /// `PublicKey` element may also receive an `xsi:type` attribute or a /// distinct namespace — until a live capture confirms the exact /// wire form, this builder uses the simplest plausible shape. F25 /// live-probe iteration will reconcile. pub fn build_connect_request_body( connection_id: [u8; 16], consumer_public_key: &[u8], ) -> Vec { let mut tokens = vec![ NbfxToken::Element { prefix: None, name: NbfxName::Inline("ConnectRequest".to_string()), }, NbfxToken::DefaultNamespace { value: NbfxText::Chars(MESSAGES_NS.to_string()), }, NbfxToken::Element { prefix: None, name: NbfxName::Inline("ConnectionId".to_string()), }, NbfxToken::Text(NbfxText::Chars(crate::envelope::format_uuid_for_test( &connection_id, ))), NbfxToken::EndElement, // NbfxToken::Element { prefix: None, name: NbfxName::Inline("ConsumerPublicKey".to_string()), }, ]; tokens.extend(public_key_data_field(consumer_public_key)); tokens.push(NbfxToken::EndElement); // tokens.push(NbfxToken::EndElement); // tokens } /// Build the NBFX token stream for `DisconnectIn`. Mirrors /// `AsbContracts.cs:109-114`: /// ```xml /// /// /// {encrypted-bytes} /// {iv-bytes} /// /// /// ``` /// /// One-way op (`IsOneWay = true` at `AsbContracts.cs:22`); typically /// signed with the connection validator (no `forceHmac`) right before /// closing the channel. pub fn build_disconnect_request_body( consumer_data: &[u8], initialization_vector: &[u8], ) -> Vec { let mut tokens = vec![ NbfxToken::Element { prefix: None, name: NbfxName::Inline("DisconnectRequest".to_string()), }, NbfxToken::DefaultNamespace { value: NbfxText::Chars(MESSAGES_NS.to_string()), }, NbfxToken::Element { prefix: None, name: NbfxName::Inline("ConsumerAuthenticationData".to_string()), }, ]; tokens.extend(authentication_data_fields( consumer_data, initialization_vector, )); tokens.push(NbfxToken::EndElement); // tokens.push(NbfxToken::EndElement); // tokens } /// Build the NBFX token stream for `AuthenticateMeIn`. Sent /// **one-way** + **signed with `forceHmac=true`** per /// `MxAsbDataClient.cs:106-111`: /// ```xml /// /// /// {encrypted-bytes} /// {iv-bytes} /// /// /// ``` pub fn build_authenticate_me_request_body( consumer_data: &[u8], initialization_vector: &[u8], ) -> Vec { let mut tokens = vec![ NbfxToken::Element { prefix: None, name: NbfxName::Inline("AuthenticateMeRequest".to_string()), }, NbfxToken::DefaultNamespace { value: NbfxText::Chars(MESSAGES_NS.to_string()), }, NbfxToken::Element { prefix: None, name: NbfxName::Inline("ConsumerAuthenticationData".to_string()), }, ]; tokens.extend(authentication_data_fields( consumer_data, initialization_vector, )); tokens.push(NbfxToken::EndElement); // tokens.push(NbfxToken::EndElement); // tokens } fn public_key_data_field(data: &[u8]) -> Vec { vec![ NbfxToken::Element { prefix: None, name: NbfxName::Inline("Data".to_string()), }, // .NET's `PublicKey` class has // `[XmlType(Namespace = "http://asb.contracts.data/20111111")]` // (`AsbContracts.cs:350-362`). XmlSerializer emits an // `xmlns="..."` redeclaration on `` to switch from the // outer messages namespace into the data namespace. Without // this, the server's deserialiser fails and dispatches a // generic InternalServiceFault. Verified against .NET probe // wire capture. NbfxToken::DefaultNamespace { value: NbfxText::Chars("http://asb.contracts.data/20111111".to_string()), }, NbfxToken::Text(NbfxText::Bytes(data.to_vec())), NbfxToken::EndElement, ] } /// `AuthenticationData` per `AsbContracts.cs:364-381`: /// /// ```csharp /// [XmlType(Namespace = "http://asb.contracts.data/20111111")] /// public sealed class AuthenticationData { /// public byte[]? Data { get; set; } /// public byte[]? InitializationVector { get; set; } /// } /// ``` /// /// Same data-namespace switch as `` — the `` /// element gets the `xmlns="...data/20111111"` redeclaration. The /// `` element is in the same data namespace /// (already-in-scope because of the prior `` redeclaration's /// `xmlns` lasts until end of ``). fn authentication_data_fields(data: &[u8], iv: &[u8]) -> Vec { // The default-namespace declaration on `` only stays in // scope until `` closes. `` opens // afterwards and therefore needs its OWN xmlns redeclaration to // stay in the `http://asb.contracts.data/20111111` namespace // (matching `[XmlType]` on the `AuthenticationData` class). Without // the second redeclaration the IV element falls back to the parent // (messages) namespace and the server's XmlSerializer rejects the // request with a generic InternalServiceFault. let data_ns = "http://asb.contracts.data/20111111".to_string(); vec![ NbfxToken::Element { prefix: None, name: NbfxName::Inline("Data".to_string()), }, NbfxToken::DefaultNamespace { value: NbfxText::Chars(data_ns.clone()), }, NbfxToken::Text(NbfxText::Bytes(data.to_vec())), NbfxToken::EndElement, NbfxToken::Element { prefix: None, name: NbfxName::Inline("InitializationVector".to_string()), }, NbfxToken::DefaultNamespace { value: NbfxText::Chars(data_ns), }, NbfxToken::Text(NbfxText::Bytes(iv.to_vec())), NbfxToken::EndElement, ] } /// Decoded `ConnectResponse`. Mirrors `AsbContracts.cs:88-100`. #[derive(Debug, Clone, PartialEq)] pub struct ConnectResponse { /// Service public key bytes (`PublicKey.Data`). Required. pub service_public_key: Vec, /// Service authentication data — encrypted blob + IV. Optional; /// some service versions omit it. pub service_authentication_data: Option, /// Negotiated connection lifetime (xs:duration string like /// `"PT60M:V2"`). The `:V2` suffix toggles Apollo signing in F23. pub connection_lifetime: Option, } /// `AuthenticationData` payload (`Data` + `InitializationVector`). #[derive(Debug, Clone, PartialEq, Eq)] pub struct AuthenticationDataBytes { pub data: Vec, pub initialization_vector: Vec, } /// Decode a `ConnectResponse` SOAP body from the NBFX tokens returned /// by [`crate::decode_envelope`]. pub fn decode_connect_response( body_tokens: &[NbfxToken], dynamic: &mxaccess_asb_nettcp::nbfx::DynamicDictionary, ) -> Result { let service_public_key = find_inline_bytes(body_tokens, &["ServicePublicKey", "Data"]).ok_or( OperationError::MissingField { field: "ServicePublicKey/Data", }, )?; let service_authentication_data = find_authentication_data(body_tokens, "ServiceAuthenticationData"); let connection_lifetime = find_inline_text(body_tokens, "ConnectionLifetime", dynamic); Ok(ConnectResponse { service_public_key, service_authentication_data, connection_lifetime, }) } /// Walk `tokens` and find the inner `Bytes` payload of an element-path /// like `["ServicePublicKey", "Data"]` (i.e. `{Bytes}`). /// Permissive — skips attributes / namespace decls between element opens. fn find_inline_bytes(tokens: &[NbfxToken], path: &[&str]) -> Option> { let mut idx = 0; let mut path_idx = 0; while let Some(tok) = tokens.get(idx) { if path_idx == path.len() { // Should be a Text(Bytes) here (after skipping attribute-like tokens). let mut inner = idx; while matches!( tokens.get(inner), Some(NbfxToken::Attribute { .. }) | Some(NbfxToken::DefaultNamespace { .. }) | Some(NbfxToken::NamespaceDeclaration { .. }) ) { inner += 1; } if let Some(NbfxToken::Text(NbfxText::Bytes(bytes))) = tokens.get(inner) { return Some(bytes.clone()); } return None; } if let NbfxToken::Element { name: NbfxName::Inline(local), .. } = tok { if let Some(target) = path.get(path_idx) { if local == target { path_idx += 1; } } } idx += 1; } None } fn find_authentication_data( tokens: &[NbfxToken], outer_name: &str, ) -> Option { // Find the outer element, then within its scope locate Data and IV. let mut idx = 0; while let Some(tok) = tokens.get(idx) { if let NbfxToken::Element { name: NbfxName::Inline(local), .. } = tok { if local == outer_name { let data = find_inline_bytes(tokens.get(idx + 1..)?, &["Data"]).unwrap_or_default(); let iv = find_inline_bytes(tokens.get(idx + 1..)?, &["InitializationVector"]) .unwrap_or_default(); if data.is_empty() && iv.is_empty() { return None; } return Some(AuthenticationDataBytes { data, initialization_vector: iv, }); } } idx += 1; } None } fn find_inline_text( tokens: &[NbfxToken], name: &str, dynamic: &mxaccess_asb_nettcp::nbfx::DynamicDictionary, ) -> Option { let mut idx = 0; while let Some(tok) = tokens.get(idx) { if let NbfxToken::Element { name: NbfxName::Inline(local), .. } = tok { if local == name { let mut inner = idx + 1; while matches!( tokens.get(inner), Some(NbfxToken::Attribute { .. }) | Some(NbfxToken::DefaultNamespace { .. }) | Some(NbfxToken::NamespaceDeclaration { .. }) ) { inner += 1; } if let Some(NbfxToken::Text(text)) = tokens.get(inner) { return text.resolve(dynamic); } } } idx += 1; } None } // ---- PublishWriteComplete + DeleteMonitoredItems (F25 step 10) ---------- /// Build the NBFX token stream for a `PublishWriteCompleteIn` request /// body. Empty wrapper per `AsbContracts.cs:204-205` /// (`PublishWriteCompleteRequest : ConnectedRequest;` — no body fields /// beyond the inherited ConnectionValidator header). pub fn build_publish_write_complete_request_body() -> Vec { vec![ NbfxToken::Element { prefix: None, name: NbfxName::Inline("PublishWriteCompleteRequest".to_string()), }, NbfxToken::DefaultNamespace { value: NbfxText::Chars(IOM_NS.to_string()), }, NbfxToken::EndElement, ] } /// Decoded `PublishWriteCompleteResponse`. Mirrors `AsbContracts.cs:207-213`. /// /// The inner `ItemWriteComplete` records are regular WCF DataContract /// (not the binary fast-path), so per-element decode is deferred to a /// later iteration once a live capture confirms the WCF XML wire form. /// For now this just counts how many `` elements /// appeared in the body — enough for callers to detect "complete-write /// callback fired" without parsing the per-write WriteHandle/Status. #[derive(Debug, Clone, PartialEq, Eq, Default)] pub struct PublishWriteCompleteResponse { pub complete_writes_count: usize, /// `Result.resultCodeField` per the F31 InvalidConnectionId pattern. pub result_code: Option, /// `Result.successField` per the F31 pattern. pub success: Option, } pub fn decode_publish_write_complete_response( body_tokens: &[NbfxToken], ) -> Result { let count = body_tokens .iter() .filter(|tok| { matches!( tok, NbfxToken::Element { name: NbfxName::Inline(s), .. } if s == "ItemWriteComplete" ) }) .count(); let (result_code, success) = extract_result_status(body_tokens); Ok(PublishWriteCompleteResponse { complete_writes_count: count, result_code, success, }) } /// Build the NBFX token stream for `DeleteMonitoredItemsIn`. Mirrors /// `AsbContracts.cs:268-277`. Same MonitoredItem shape as /// AddMonitoredItems but no RequireId field. pub fn build_delete_monitored_items_request_body( subscription_id: i64, items: &[MinimalMonitoredItem], ) -> Vec { let mut tokens = vec![ NbfxToken::Element { prefix: None, name: NbfxName::Inline("DeleteMonitoredItemsRequest".to_string()), }, NbfxToken::DefaultNamespace { value: NbfxText::Chars(IOM_NS.to_string()), }, NbfxToken::Element { prefix: None, name: NbfxName::Inline("SubscriptionId".to_string()), }, NbfxToken::Text(NbfxText::Int64(subscription_id)), NbfxToken::EndElement, NbfxToken::Element { prefix: None, name: NbfxName::Inline("Items".to_string()), }, NbfxToken::NamespaceDeclaration { prefix: "b".to_string(), value: NbfxText::Chars(DC_ASBIDATAV2_NS.to_string()), }, NbfxToken::NamespaceDeclaration { prefix: "i".to_string(), value: NbfxText::Chars(XSI_NS.to_string()), }, ]; for item in items { push_monitored_item_body(&mut tokens, item); } tokens.push(NbfxToken::EndElement); // tokens.push(NbfxToken::EndElement); // tokens } /// Decoded `DeleteMonitoredItemsResponse`. Single Status array per /// `AsbContracts.cs:279-285`. #[derive(Debug, Clone, PartialEq)] pub struct DeleteMonitoredItemsResponse { pub status: Vec, /// `Result.resultCodeField` per the F31 InvalidConnectionId pattern. pub result_code: Option, /// `Result.successField` per the F31 pattern. pub success: Option, } pub fn decode_delete_monitored_items_response( body_tokens: &[NbfxToken], ) -> Result { let status = match collect_asbidata_payloads(body_tokens).into_iter().next() { Some(payload) if !payload.is_empty() => decode_item_status_array(&payload)?, _ => Vec::new(), }; let (result_code, success) = extract_result_status(body_tokens); Ok(DeleteMonitoredItemsResponse { status, result_code, success, }) } // ---- Write operation (F25 step 9) --------------------------------------- /// Minimal `WriteValue` shape carrying just the AsbVariant payload. The /// full .NET `WriteValue` (`AsbContracts.cs:793-894`) also has optional /// ArrayElementIndex, Comment, HasQT, Status, and Timestamp fields. /// Those are deferred to a later F25 iteration once a live capture /// confirms the WCF DataContract XML wire form. /// /// Note: the .NET `WriteValue` does NOT carry `Item` directly — /// `WriteBasicRequest` carries `Items[]` + `Values[]` as parallel /// arrays. We mirror that wire shape — see [`build_write_request_body`]. #[derive(Debug, Clone, PartialEq)] pub struct MinimalWriteValue { pub value: mxaccess_codec::AsbVariant, } impl MinimalWriteValue { pub fn new(value: mxaccess_codec::AsbVariant) -> Self { Self { value } } } /// Build the NBFX token stream for a `WriteIn` request body. Mirrors /// `AsbContracts.cs:181-194`. The Items array uses the /// IAsbCustomSerializableType binary fast-path (`` Bytes /// record); the Values array is per-WriteValue regular XML — though /// the Variant inside each WriteValue/Value field IS /// IAsbCustomSerializableType so it gets `` wrapping. /// /// **Wire-byte caveat**: optional ArrayElementIndex / Comment / HasQT /// / Status / Timestamp fields are not emitted. Live-probe iteration /// will reconcile. pub fn build_write_request_body( items: &[ItemIdentity], values: &[MinimalWriteValue], write_handle: u32, ) -> Vec { let items_payload = encode_item_identity_array(items); let mut tokens = vec![ NbfxToken::Element { prefix: None, name: NbfxName::Inline("WriteBasicRequest".to_string()), }, NbfxToken::DefaultNamespace { value: NbfxText::Chars(IOM_NS.to_string()), }, NbfxToken::Element { prefix: None, name: NbfxName::Inline("Items".to_string()), }, NbfxToken::Element { prefix: None, name: NbfxName::Inline("ASBIData".to_string()), }, NbfxToken::Text(NbfxText::Bytes(items_payload)), NbfxToken::EndElement, // NbfxToken::EndElement, // NbfxToken::Element { prefix: None, name: NbfxName::Inline("Values".to_string()), }, ]; for v in values { tokens.push(NbfxToken::Element { prefix: None, name: NbfxName::Inline("WriteValue".to_string()), }); tokens.push(NbfxToken::Element { prefix: None, name: NbfxName::Inline("Value".to_string()), }); tokens.push(NbfxToken::Element { prefix: None, name: NbfxName::Inline("ASBIData".to_string()), }); tokens.push(NbfxToken::Text(NbfxText::Bytes(v.value.encode()))); tokens.push(NbfxToken::EndElement); // tokens.push(NbfxToken::EndElement); // tokens.push(NbfxToken::EndElement); // } tokens.push(NbfxToken::EndElement); // tokens.push(NbfxToken::Element { prefix: None, name: NbfxName::Inline("WriteHandle".to_string()), }); tokens.push(NbfxToken::Text(NbfxText::Int32(write_handle as i32))); tokens.push(NbfxToken::EndElement); tokens.push(NbfxToken::EndElement); // tokens } /// Decoded `WriteResponse`. Mirrors `AsbContracts.cs:196-202` — just /// the per-item Status array. #[derive(Debug, Clone, PartialEq)] pub struct WriteResponse { pub status: Vec, /// `Result.resultCodeField` per the F31 InvalidConnectionId pattern. pub result_code: Option, /// `Result.successField` per the F31 pattern. pub success: Option, } pub fn decode_write_response(body_tokens: &[NbfxToken]) -> Result { let status = match collect_asbidata_payloads(body_tokens).into_iter().next() { Some(payload) if !payload.is_empty() => decode_item_status_array(&payload)?, _ => Vec::new(), }; let (result_code, success) = extract_result_status(body_tokens); Ok(WriteResponse { status, result_code, success, }) } // ---- Subscription operations (F25 step 8) ------------------------------- /// Build the NBFX token stream for a `CreateSubscriptionIn` request /// body. Mirrors `AsbContracts.cs:215-223`: /// ```xml /// /// {long} /// {ulong} /// /// ``` pub fn build_create_subscription_request_body( max_queue_size: i64, sample_interval: u64, ) -> Vec { asbidata_request_body( "CreateSubscriptionRequest", &[ BodyField::int64("MaxQueueSize", max_queue_size), BodyField::uint64("SampleInterval", sample_interval), ], ) } /// Build the NBFX token stream for `DeleteSubscriptionIn`. Mirrors /// `AsbContracts.cs:232-237`: /// ```xml /// /// {long} /// /// ``` pub fn build_delete_subscription_request_body(subscription_id: i64) -> Vec { asbidata_request_body( "DeleteSubscriptionRequest", &[BodyField::int64("SubscriptionId", subscription_id)], ) } /// Build the NBFX token stream for `PublishIn`. Mirrors /// `AsbContracts.cs:287-292`: /// ```xml /// /// {long} /// /// ``` pub fn build_publish_request_body(subscription_id: i64) -> Vec { asbidata_request_body( "PublishRequest", &[BodyField::int64("SubscriptionId", subscription_id)], ) } /// Build the NBFX token stream for `AddMonitoredItemsIn`. Mirrors /// `AsbContracts.cs:242-254` — the **minimal** form that supplies only /// the required `Item` + `SampleInterval` per `MonitoredItem`. Optional /// `Active` / `TimeDeadband` / `ValueDeadband` / `UserData` / `Buffered` /// fields are NOT emitted (their `*Specified=false` in WCF would /// suppress them anyway). Wire shape: /// /// ```xml /// /// {long} /// /// /// {ItemIdentity binary} /// {ulong} /// false /// /// ... /// /// {bool} /// /// ``` /// /// **Wire-byte caveat**: `MonitoredItem` is a regular WCF DataContract /// (not `IAsbCustomSerializableType`). The exact element-ordering / /// xsi:type attribute / namespace prefix layout depends on which /// serializer WCF picks; this builder emits the most plausible shape /// and the live-probe iteration will reconcile. F25 follow-up will /// expand to the full optional-field set once a capture is available. pub fn build_add_monitored_items_request_body( subscription_id: i64, items: &[MinimalMonitoredItem], require_id: bool, ) -> Vec { let mut tokens = vec![ NbfxToken::Element { prefix: None, name: NbfxName::Inline("AddMonitoredItemsRequest".to_string()), }, NbfxToken::DefaultNamespace { value: NbfxText::Chars(IOM_NS.to_string()), }, // NbfxToken::Element { prefix: None, name: NbfxName::Inline("SubscriptionId".to_string()), }, NbfxToken::Text(NbfxText::Int64(subscription_id)), NbfxToken::EndElement, // NbfxToken::Element { prefix: None, name: NbfxName::Inline("Items".to_string()), }, NbfxToken::NamespaceDeclaration { prefix: "b".to_string(), value: NbfxText::Chars(DC_ASBIDATAV2_NS.to_string()), }, NbfxToken::NamespaceDeclaration { prefix: "i".to_string(), value: NbfxText::Chars(XSI_NS.to_string()), }, ]; for item in items { push_monitored_item_body(&mut tokens, item); } tokens.push(NbfxToken::EndElement); // // tokens.push(NbfxToken::Element { prefix: None, name: NbfxName::Inline("RequireId".to_string()), }); tokens.push(NbfxToken::Text(NbfxText::Bool(require_id))); tokens.push(NbfxToken::EndElement); tokens.push(NbfxToken::EndElement); // tokens } /// Emit a single `...` NBFX subtree. /// Shared between AddMonitoredItems and DeleteMonitoredItems request /// builders. /// /// **Wire shape: DataContract field-suffix names, NOT XmlSerializer /// property names.** MxDataProvider's binary deserialiser is the /// `DataContractSerializer`-driven path for non-`IAsbCustomSerializable` /// types like `MonitoredItem`, so the on-the-wire element names are the /// `[DataMember(Name = "...")]` private-field names from /// `AsbContracts.cs:940-965` — `activeField`, `bufferedField`, /// `itemField`, `sampleIntervalField`, etc. — and they live in the DC /// namespace `http://schemas.datacontract.org/2004/07/ArchestrAServices.ASBIDataV2Contract` /// (prefix `b`). /// /// Field order follows the explicit `[DataMember(Order = N)]` attributes /// (alphabetical-by-default for DC, but explicitly numbered here): /// `activeField`, `activeFieldSpecified`, `bufferedField`, `itemField`, /// `sampleIntervalField`, `timeDeadbandField`, /// `timeDeadbandFieldSpecified`, `userDataField`, `valueDeadbandField`. /// /// The canonical-XML HMAC signing path (`xml_canonical::emit_monitored_item`) /// uses XmlSerializer property names (``, ``, etc.) — that /// stays unchanged because `XmlSerializer.Serialize` is what the .NET /// `AsbSystemAuthenticator.Sign` HMACs over (canonical XML form). /// Verified against the captured `add-monitored-items-request-wire.bin` /// fixture — F34. fn push_monitored_item_body(tokens: &mut Vec, item: &MinimalMonitoredItem) { tokens.push(NbfxToken::Element { prefix: Some("b".to_string()), name: NbfxName::Inline("MonitoredItem".to_string()), }); // Order 0: activeField (bool — defaults to false when not Specified) push_b_bool(tokens, "activeField", item.active.unwrap_or(false)); // Order 1: activeFieldSpecified (bool — true iff `active` is Some) push_b_bool(tokens, "activeFieldSpecified", item.active.is_some()); // Order 2: bufferedField push_b_bool(tokens, "bufferedField", item.buffered); // Order 3: itemField (nested ItemIdentity, DataContract-serialised // — NOT the binary fast-path, which only kicks in at // top-level message body members). push_b_item_identity(tokens, &item.item); // Order 4: sampleIntervalField (ulong — WCF binary writer emits // ulong as `Chars8`/etc. text via `XmlConvert.ToString` for non-0/1 // values; 0/1 collapse to the Zero/One text records). push_b_ulong_text(tokens, "sampleIntervalField", item.sample_interval); // Order 5+6: timeDeadbandField + timeDeadbandFieldSpecified — // omitted-from-public-API on `MinimalMonitoredItem`; emit defaults. push_b_ulong_text(tokens, "timeDeadbandField", 0); push_b_bool(tokens, "timeDeadbandFieldSpecified", false); // Order 7: userDataField (empty Variant — typeField=65535 = "no value") push_b_empty_variant(tokens, "userDataField"); // Order 8: valueDeadbandField (empty Variant) push_b_empty_variant(tokens, "valueDeadbandField"); tokens.push(NbfxToken::EndElement); // } /// `{bool}` — Bool text record (with-end-element /// variant chosen by the encoder). fn push_b_bool(tokens: &mut Vec, name: &str, value: bool) { tokens.push(NbfxToken::Element { prefix: Some("b".to_string()), name: NbfxName::Inline(name.to_string()), }); tokens.push(NbfxToken::Text(NbfxText::Bool(value))); tokens.push(NbfxToken::EndElement); } /// `{ulong-as-text}` — WCF emits `ulong` via /// `XmlConvert.ToString` (decimal text) which the binary writer then /// encodes as `Chars8`. Values 0 and 1 collapse to the dedicated /// `ZeroText` / `OneText` records that the WCF binary writer prefers /// when the text would be `"0"` / `"1"`. fn push_b_ulong_text(tokens: &mut Vec, name: &str, value: u64) { tokens.push(NbfxToken::Element { prefix: Some("b".to_string()), name: NbfxName::Inline(name.to_string()), }); let text = match value { 0 => NbfxText::Zero, 1 => NbfxText::One, n => NbfxText::Chars(n.to_string()), }; tokens.push(NbfxToken::Text(text)); tokens.push(NbfxToken::EndElement); } /// `{ushort-as-binary}` — `ushort` goes through /// `WriteInt32` in WCF binary, which emits `Zero` / `One` for those /// values and `Int8` / `Int16` / `Int32` for larger values (smallest /// width that fits). fn push_b_ushort(tokens: &mut Vec, name: &str, value: u16) { tokens.push(NbfxToken::Element { prefix: Some("b".to_string()), name: NbfxName::Inline(name.to_string()), }); let text = match value { 0 => NbfxText::Zero, 1 => NbfxText::One, n if n <= i8::MAX as u16 => NbfxText::Int8(n as i8), n if n <= i16::MAX as u16 => NbfxText::Int16(n as i16), n => NbfxText::Int32(n as i32), }; tokens.push(NbfxToken::Text(text)); tokens.push(NbfxToken::EndElement); } /// `{string-or-empty-element}` — WCF emits a /// non-empty string as `Chars8/16/32` text and `Some("")` / `None` as /// an empty element (no child text). The captured wire shows no /// `i:nil="true"` attribute even when the field semantically maps to /// .NET `null`, so we skip the nil-attribute path. fn push_b_string(tokens: &mut Vec, name: &str, value: Option<&str>) { tokens.push(NbfxToken::Element { prefix: Some("b".to_string()), name: NbfxName::Inline(name.to_string()), }); if let Some(s) = value { if !s.is_empty() { tokens.push(NbfxToken::Text(NbfxText::Chars(s.to_string()))); } } tokens.push(NbfxToken::EndElement); } /// Emit a nested `ItemIdentity` as DataContract fields. Order matches /// `AsbContracts.cs:533-553`: contextNameField, idField, idFieldSpecified, /// nameField, referenceTypeField, typeField (alphabetical by member /// name = the explicit `[DataMember(Order = N)]` ordering). fn push_b_item_identity(tokens: &mut Vec, identity: &ItemIdentity) { tokens.push(NbfxToken::Element { prefix: Some("b".to_string()), name: NbfxName::Inline("itemField".to_string()), }); push_b_string(tokens, "contextNameField", identity.context_name.as_deref()); push_b_ulong_text(tokens, "idField", identity.id); push_b_bool(tokens, "idFieldSpecified", identity.id_specified); push_b_string(tokens, "nameField", identity.name.as_deref()); push_b_ushort(tokens, "referenceTypeField", identity.reference_type); push_b_ushort(tokens, "typeField", identity.kind); tokens.push(NbfxToken::EndElement); // } /// Emit an empty `Variant` (no payload, type = 65535 = "no value"). /// Field order follows `AsbContracts.cs:1170-1181`: lengthField, /// payloadField, typeField. fn push_b_empty_variant(tokens: &mut Vec, name: &str) { tokens.push(NbfxToken::Element { prefix: Some("b".to_string()), name: NbfxName::Inline(name.to_string()), }); push_b_int_text(tokens, "lengthField", 0); // payloadField is `byte[]?`; an empty/null value emits as an empty // element (no `` attribute on the captured wire). tokens.push(NbfxToken::Element { prefix: Some("b".to_string()), name: NbfxName::Inline("payloadField".to_string()), }); tokens.push(NbfxToken::EndElement); push_b_ushort(tokens, "typeField", 65535); tokens.push(NbfxToken::EndElement); // } /// `{int32}` — int32 via the smallest-fit binary /// text record (matches WCF's `WriteInt32` which collapses 0 / 1 to /// the Zero / One text records). fn push_b_int_text(tokens: &mut Vec, name: &str, value: i32) { tokens.push(NbfxToken::Element { prefix: Some("b".to_string()), name: NbfxName::Inline(name.to_string()), }); let text = match value { 0 => NbfxText::Zero, 1 => NbfxText::One, n if (i8::MIN as i32..=i8::MAX as i32).contains(&n) => NbfxText::Int8(n as i8), n if (i16::MIN as i32..=i16::MAX as i32).contains(&n) => NbfxText::Int16(n as i16), n => NbfxText::Int32(n), }; tokens.push(NbfxToken::Text(text)); tokens.push(NbfxToken::EndElement); } /// Minimal `MonitoredItem` shape covering `Item`, `SampleInterval`, /// `Buffered`, and the `*Specified`-gated `Active` field. The full /// .NET `MonitoredItem` (`AsbContracts.cs:936-1030`) also has /// `TimeDeadband`, `ValueDeadband`, and `UserData` — deferred until a /// live capture confirms each one's wire-byte form. /// /// **`sample_interval` unit is milliseconds**, NOT 100-ns ticks. The /// .NET reference's `MxAsbDataClient.AddMonitoredItems` defaults to /// `ulong sampleInterval = 1000` (= 1 second), passed straight to the /// wire (`MxAsbDataClient.cs:441`). Sending tick-units (e.g. /// `10_000_000` for "1 second in 100-ns ticks") makes MxDataProvider /// schedule the next sample ~2.8 hours out — `Publish` polls then /// always come back empty until the misinterpreted timer expires. /// Verified live 2026-05-06. /// /// **`active` is the `*Specified` knob that decides whether /// `` appears on the wire**. `None` → not emitted (server /// defaults to inactive — `Publish` polls return zero values). /// `Some(true)` → emitted as `true`; the /// MxDataProvider then actually delivers values from the /// subscription. The .NET reference's `AddMonitoredItems` defaults /// to `active: true` (`MxAsbDataClient.cs:441`); the /// `MonitoredItem.Active` setter at `AsbContracts.cs:982-987` /// auto-flips `ActiveSpecified=true` so the wire includes the /// element. F34: this asymmetry is what made our subscribe path /// see zero values where .NET sees real ones — verified live /// 2026-05-06. #[derive(Debug, Clone, PartialEq)] pub struct MinimalMonitoredItem { pub item: ItemIdentity, /// Sample interval in **milliseconds** (matches the .NET wire form). pub sample_interval: u64, pub buffered: bool, /// `Some(b)` emits `{b}` on the wire (the /// `*Specified` pattern). `None` omits the element entirely — /// MxDataProvider then treats the item as inactive and delivers /// no values. Use `Some(true)` to actually receive samples. pub active: Option, } impl MinimalMonitoredItem { /// Build a default `MinimalMonitoredItem`: item + interval, no /// Active flag, no Buffered. Matches the .NET `new MonitoredItem /// { Item = ..., SampleInterval = ... }` shape used by the /// canonical-XML fixtures. /// /// **For live subscriptions that should actually deliver values, /// prefer [`Self::with_active`].** Without `Active=true` on the /// wire, the server defaults to inactive and `Publish` returns /// empty payloads. pub fn new(item: ItemIdentity, sample_interval: u64) -> Self { Self { item, sample_interval, buffered: false, active: None, } } /// Build a `MinimalMonitoredItem` with `Active=true`. This is /// what the .NET reference's `AddMonitoredItems` emits by default /// (`MxAsbDataClient.cs:441`) and what makes MxDataProvider /// actually deliver values from the subscription. pub fn with_active(item: ItemIdentity, sample_interval: u64, active: bool) -> Self { Self { item, sample_interval, buffered: false, active: Some(active), } } } /// Decoded `CreateSubscriptionResponse`. Single primitive field per /// `AsbContracts.cs:225-230` plus the F31-style result_code/success /// surface. /// /// `subscription_id` is `0` when the server short-circuits on /// `InvalidConnectionId` and never assigns one — check `result_code` /// for `RESULT_CODE_INVALID_CONNECTION_ID` (1) before treating /// `subscription_id` as valid. #[derive(Debug, Clone, PartialEq)] pub struct CreateSubscriptionResponse { pub subscription_id: i64, /// `Result.resultCodeField` from the response wrapper. `Some(1)` = /// `InvalidConnectionId` — caller should retry. `None` if the /// field wasn't present (the success path doesn't necessarily /// emit it). pub result_code: Option, /// `Result.successField` — `false` means the operation failed /// server-side and `subscription_id` is unset. pub success: Option, } /// Decode a `CreateSubscriptionResponse` SOAP body. /// /// Tolerates a missing `` element — that's how the /// server signals an operation-level failure (`successField=false` /// with non-zero `resultCodeField`). Mirrors the F31 tolerance /// pattern. Caller inspects `result_code` / `success`. pub fn decode_create_subscription_response( body_tokens: &[NbfxToken], dynamic: &mxaccess_asb_nettcp::nbfx::DynamicDictionary, ) -> Result { let subscription_id = find_inline_int64(body_tokens, "SubscriptionId", dynamic).unwrap_or(0); let result_code = find_text_in_named_element(body_tokens, "resultCodeField") .and_then(|s| s.parse().ok()); let success = find_text_in_named_element(body_tokens, "successField") .map(|s| s.eq_ignore_ascii_case("true")); Ok(CreateSubscriptionResponse { subscription_id, result_code, success, }) } /// Decoded `AddMonitoredItemsResponse`. `ItemCapabilities` is regular /// WCF XML (not the binary fast-path) — currently surfaces as a presence /// flag, mirroring `RegisterItemsResponse`. /// /// Tolerates empty / missing `` Status payloads under the /// F31 pattern; surfaces `result_code` / `success` so callers can /// retry on `InvalidConnectionId`. #[derive(Debug, Clone, PartialEq)] pub struct AddMonitoredItemsResponse { pub status: Vec, pub item_capabilities_present: bool, /// `Result.resultCodeField` from the response wrapper. `Some(1)` = /// `InvalidConnectionId`. pub result_code: Option, /// `Result.successField` — `false` means the per-item Status array /// is empty. pub success: Option, } pub fn decode_add_monitored_items_response( body_tokens: &[NbfxToken], ) -> Result { let payloads = collect_asbidata_payloads(body_tokens); let status = match payloads.into_iter().next() { Some(payload) if !payload.is_empty() => decode_item_status_array(&payload)?, _ => Vec::new(), }; let item_capabilities_present = find_element_named(body_tokens, "ItemCapabilities").is_some(); let result_code = find_text_in_named_element(body_tokens, "resultCodeField") .and_then(|s| s.parse().ok()); let success = find_text_in_named_element(body_tokens, "successField") .map(|s| s.eq_ignore_ascii_case("true")); Ok(AddMonitoredItemsResponse { status, item_capabilities_present, result_code, success, }) } /// Decoded `PublishResponse`. Mirrors `AsbContracts.cs:294-304`: /// `Status` (per-item operation status) + `Values` (one /// `MonitoredItemValue` per delivered sample). #[derive(Debug, Clone, PartialEq)] pub struct PublishResponse { pub status: Vec, pub values: Vec, /// `Result.resultCodeField` per the F31 InvalidConnectionId pattern. /// On the F26 stream's hot path: when this is `Some(non_zero)` the /// publish-loop should terminate the stream with an error rather /// than silently delivering empty value arrays forever. pub result_code: Option, /// `Result.successField` per the F31 pattern. pub success: Option, } pub fn decode_publish_response( body_tokens: &[NbfxToken], ) -> Result { let payloads = collect_asbidata_payloads(body_tokens); // Tolerate empty/missing Status payload — that's the // InvalidConnectionId short-circuit shape captured live in F33. let status_payload = payloads .first() .map(Vec::as_slice) .unwrap_or(&[]); let status = if status_payload.is_empty() { Vec::new() } else { decode_item_status_array(status_payload)? }; let values = match payloads.get(1) { Some(payload) if !payload.is_empty() => decode_monitored_item_value_array(payload)?, _ => Vec::new(), }; let (result_code, success) = extract_result_status(body_tokens); Ok(PublishResponse { status, values, result_code, success, }) } /// Decoded `DeleteSubscriptionResponse`. Empty body per /// `AsbContracts.cs:239-240` (`ConnectedResponse;` — no fields). /// Always returns `Ok(())` regardless of body content; the decoder /// exists for symmetry with the other operations. #[derive(Debug, Clone, PartialEq, Eq, Default)] pub struct DeleteSubscriptionResponse; /// Walk `tokens` and find an inline int64 element-text under the named /// element. Used for `` and similar primitive response /// fields. Permissive — skips attributes/xmlns decls between Element /// and Text. fn find_inline_int64( tokens: &[NbfxToken], name: &str, dynamic: &mxaccess_asb_nettcp::nbfx::DynamicDictionary, ) -> Option { let mut idx = 0; while let Some(tok) = tokens.get(idx) { if let NbfxToken::Element { name: NbfxName::Inline(local), .. } = tok { if local == name { let mut inner = idx + 1; while matches!( tokens.get(inner), Some(NbfxToken::Attribute { .. }) | Some(NbfxToken::DefaultNamespace { .. }) | Some(NbfxToken::NamespaceDeclaration { .. }) ) { inner += 1; } match tokens.get(inner) { Some(NbfxToken::Text(NbfxText::Int64(v))) => return Some(*v), Some(NbfxToken::Text(NbfxText::Int32(v))) => return Some(*v as i64), Some(NbfxToken::Text(NbfxText::Zero)) => return Some(0), Some(NbfxToken::Text(NbfxText::One)) => return Some(1), Some(NbfxToken::Text(text)) => { // Fall back to text resolution + parse. if let Some(s) = text.resolve(dynamic) { if let Ok(v) = s.parse::() { return Some(v); } } return None; } _ => return None, } } } idx += 1; } None } /// Build the NBFX token stream for a `KeepAliveIn` request body. The /// `KeepAlive` contract has no body fields beyond the inherited /// `ConnectionValidator` header, so the body is just the empty wrapper /// element (`AsbContracts.cs:117` — `public sealed class KeepAlive : /// ConnectedRequest;`). /// /// One-way op (`IsOneWay = true` at `AsbContracts.cs:26`) — caller /// uses [`crate::AsbClient::send_envelope_one_way`]. pub fn build_keep_alive_request_body() -> Vec { vec![ NbfxToken::Element { prefix: None, name: NbfxName::Inline("KeepAliveRequest".to_string()), }, NbfxToken::DefaultNamespace { value: NbfxText::Chars(MESSAGES_NS.to_string()), }, NbfxToken::EndElement, ] } const MESSAGES_NS: &str = "http://asb.contracts.messages/20111111"; /// Decode a `ReadResponse` SOAP body. Mirrors the decode path of /// `MxAsbDataClient.DecodeVariant` (`MxAsbDataClient.cs:713-825`) /// applied to each `` `` payload. /// /// `Values` are decoded as `RuntimeValue` (timestamp + variant + status /// per `AsbContracts.cs:741-791`) using the F24 codec. `Status` is the /// per-item operation status array. #[derive(Debug, Clone, PartialEq)] pub struct ReadResponse { pub status: Vec, pub values: Vec, /// `Result.resultCodeField` from the response wrapper. `Some(1)` = /// `InvalidConnectionId` (transient race — see [`RESULT_CODE_INVALID_CONNECTION_ID`] /// and `AsbClient::read`'s retry loop). `None` if the field wasn't /// present (e.g. the server wrapped Read differently). pub result_code: Option, /// `Result.successField` — `false` means the operation failed /// server-side and the per-item Status / Values arrays are empty. pub success: Option, } /// Decode a `ReadResponse` SOAP body from the NBFX tokens returned by /// [`crate::decode_envelope`]. Both `Status` and `Values` arrive as /// `` payloads; we decode the binary form of each. /// /// Tolerates empty / missing `` payloads — that's how the /// server signals an operation-level failure (`successField=false` /// with a non-zero `resultCodeField`). Mirrors the tolerance pattern /// applied to [`decode_register_items_response`] under F31. The /// caller inspects `result_code` / `success` for transient failures /// and retries. pub fn decode_read_response(body_tokens: &[NbfxToken]) -> Result { let payloads = collect_asbidata_payloads(body_tokens); let status = match payloads.first() { Some(payload) if !payload.is_empty() => decode_item_status_array(payload)?, _ => Vec::new(), }; let values = match payloads.get(1) { Some(payload) if !payload.is_empty() => decode_runtime_value_array(payload)?, _ => Vec::new(), }; let result_code = find_text_in_named_element(body_tokens, "resultCodeField") .and_then(|s| s.parse().ok()); let success = find_text_in_named_element(body_tokens, "successField") .map(|s| s.eq_ignore_ascii_case("true")); Ok(ReadResponse { status, values, result_code, success, }) } /// Decode a `RuntimeValue[]` array from the WCF custom-serializer /// binary form (4-byte int32 count + each value's `WriteToStream`). /// Mirrors `RuntimeValue.InitializeArrayFromStream` (`AsbContracts.cs:771-780`). fn decode_runtime_value_array(input: &[u8]) -> Result, CodecError> { if input.len() < 4 { return Err(CodecError::ShortRead { expected: 4, actual: input.len(), }); } let mut count_buf = [0u8; 4]; if let Some(slice) = input.get(0..4) { count_buf.copy_from_slice(slice); } let count = i32::from_le_bytes(count_buf); if count < 0 { return Err(CodecError::Decode { offset: 0, reason: "negative runtime-value array count", buffer_len: input.len(), }); } let mut cursor = 4usize; let mut out = Vec::with_capacity(count as usize); for _ in 0..count { let tail = input.get(cursor..).ok_or(CodecError::ShortRead { expected: 1, actual: 0, })?; let (rv, consumed) = RuntimeValue::decode(tail)?; cursor += consumed; out.push(rv); } Ok(out) } /// Decoded `RegisterItemsResponse`. The `Status` array is binary-decoded /// via `decode_item_status_array`. The optional `ItemCapabilities` /// (`ItemRegistration[]`) field is **not** decoded here — that contract /// is regular WCF XML serialization rather than the binary /// `IAsbCustomSerializableType` fast-path, so it's deferred. Today we /// just count whether it appeared in the body. See follow-up F28. #[derive(Debug, Clone, PartialEq)] pub struct RegisterItemsResponse { pub status: Vec, /// Whether the `` element appeared. Decoding the /// individual `ItemRegistration` records is a future iteration. pub item_capabilities_present: bool, /// `Result.resultCodeField` from the response — `0` is success, /// `1` is `InvalidConnectionId` (transient race with the one-way /// AuthenticateMe), see `AsbResultMapping.cs:6` for the full enum. /// `None` if the field wasn't found in the response. pub result_code: Option, /// `Result.successField` — `false` means the operation failed /// server-side and the per-item Status array is empty. pub success: Option, } /// `AsbErrorCode.InvalidConnectionId` per `AsbResultMapping.cs:6`. /// Surfaces as `Result.resultCodeField=1` when the server has not /// (yet) processed our one-way AuthenticateMe and treats the /// connection as unauthenticated. .NET's `MxAsbDataClient.RegisterMany` /// (`cs:191-204`) retries up to 5 times with a 100*N ms backoff per /// attempt — we mirror that pattern in `AsbClient::register_items`. pub const RESULT_CODE_INVALID_CONNECTION_ID: u32 = 1; /// Decoded `UnregisterItemsResponse`. Single field: the per-item /// `Status` array (`AsbContracts.cs:153-159`). #[derive(Debug, Clone, PartialEq)] pub struct UnregisterItemsResponse { pub status: Vec, /// `Result.resultCodeField` per the F31 InvalidConnectionId pattern. pub result_code: Option, /// `Result.successField` per the F31 pattern. pub success: Option, } /// Shared helper for the F31 InvalidConnectionId tolerance pattern. /// Extracts `Result.resultCodeField` and `Result.successField` from /// the response body when the server returns the Result wrapper for /// an operation-level failure. Returns `(None, None)` for the success /// path where the wrapper isn't emitted. fn extract_result_status(body_tokens: &[NbfxToken]) -> (Option, Option) { let result_code = find_text_in_named_element(body_tokens, "resultCodeField") .and_then(|s| s.parse().ok()); let success = find_text_in_named_element(body_tokens, "successField") .map(|s| s.eq_ignore_ascii_case("true")); (result_code, success) } /// Decode a `RegisterItemsResponse` SOAP body from the NBFX token /// stream returned by [`crate::decode_envelope`]. Tolerates an empty /// or missing `` (Status array) — that's how the server /// signals an operation-level failure (e.g. `successField=false` + /// non-zero `resultCodeField`). Caller is expected to inspect /// `result_code` for transient failures like InvalidConnectionId /// and retry where appropriate. pub fn decode_register_items_response( body_tokens: &[NbfxToken], ) -> Result { let payloads = collect_asbidata_payloads(body_tokens); let status = match payloads.into_iter().next() { Some(payload) if !payload.is_empty() => decode_item_status_array(&payload)?, _ => Vec::new(), }; let item_capabilities_present = find_element_named(body_tokens, "ItemCapabilities").is_some(); let (result_code, success) = extract_result_status(body_tokens); Ok(RegisterItemsResponse { status, item_capabilities_present, result_code, success, }) } /// Walk the token stream looking for an element with the given local /// name (inline match) and return its first text child as a string. /// Used to extract `Result.resultCodeField`, `successField`, etc. /// from the structured RegisterItemsResponse body. fn find_text_in_named_element(tokens: &[NbfxToken], name: &str) -> Option { let mut idx = 0; while let Some(tok) = tokens.get(idx) { if let NbfxToken::Element { name: NbfxName::Inline(local), .. } = tok { if local == name { let mut inner = idx + 1; while matches!( tokens.get(inner), Some(NbfxToken::Attribute { .. }) | Some(NbfxToken::DefaultNamespace { .. }) | Some(NbfxToken::NamespaceDeclaration { .. }) ) { inner += 1; } if let Some(NbfxToken::Text(text)) = tokens.get(inner) { return Some(match text { NbfxText::Chars(s) => s.clone(), NbfxText::Zero => "0".to_string(), NbfxText::One => "1".to_string(), NbfxText::Bool(b) => b.to_string(), NbfxText::Int8(n) => n.to_string(), NbfxText::Int16(n) => n.to_string(), NbfxText::Int32(n) => n.to_string(), NbfxText::Int64(n) => n.to_string(), _ => return None, }); } } } idx += 1; } None } /// Decode an `UnregisterItemsResponse` SOAP body. Tolerates empty/ /// missing Status payload per the F31 pattern. pub fn decode_unregister_items_response( body_tokens: &[NbfxToken], ) -> Result { let status = match collect_asbidata_payloads(body_tokens).into_iter().next() { Some(payload) if !payload.is_empty() => decode_item_status_array(&payload)?, _ => Vec::new(), }; let (result_code, success) = extract_result_status(body_tokens); Ok(UnregisterItemsResponse { status, result_code, success, }) } /// Walk a SOAP body's NBFX token stream and pull out the /// `{Bytes}` payload bytes for any element named /// outer wrapper element. Returns `Vec>` ordered by /// declaration position — for shapes with multiple binary fields /// (e.g. `ReadResponse` has both `Status` and `Values`), the caller /// indexes positionally. /// /// `[F25 step 11 fix]` Previously this took a `field_name` parameter /// and looked for `<{name}>{Bytes}`. /// .NET's `AsbDataCustomSerializer.WriteStartObject` actually /// REPLACES the field's outer element with `` directly /// (`AsbContracts.cs:1561-1572`), so the wrapper element doesn't /// exist on the wire — confirmed via `MxAsbClient.Probe /// --dump-messages`. The function now returns all payloads in /// declaration order; callers use `payloads[0]`, `payloads.get(1)` /// etc. pub fn collect_asbidata_payloads(tokens: &[NbfxToken]) -> Vec> { let mut out = Vec::new(); let mut idx = 0; while let Some(tok) = tokens.get(idx) { if let NbfxToken::Element { name: NbfxName::Inline(local), .. } = tok { if local == "ASBIData" { // Skip attributes / namespace decls between Element // and Text. let mut inner = idx + 1; while matches!( tokens.get(inner), Some(NbfxToken::Attribute { .. }) | Some(NbfxToken::DefaultNamespace { .. }) | Some(NbfxToken::NamespaceDeclaration { .. }) ) { inner += 1; } // CONCATENATE all consecutive `Bytes` text records. // .NET's `XmlBinaryWriter.WriteBase64` chunks the byte // array into multiple NBFX `Bytes8/16/32` records when // the total exceeds the per-record budget — captured // live response showed an ASBIData payload split into // a Bytes8(78) + Bytes8WithEndElement(1) pair, total // 79 bytes. Earlier we only returned the first chunk // and the consumer hit a `ShortRead` decoding the // truncated ItemStatus. The decoder collapses adjacent // Bytes-followed-by-Bytes pairs into a single text // token, but a `Bytes`-then-`EndElement` (from the // `WithEndElement` variant) leaves a sequence of // `Bytes` tokens we walk here. let mut buf = Vec::new(); while let Some(NbfxToken::Text(NbfxText::Bytes(payload))) = tokens.get(inner) { buf.extend_from_slice(payload); inner += 1; } // F34: ALWAYS push, even when buf is empty. The wire // uses `` (empty) as positional // placeholders — e.g. `PublishResponse` emits an // empty `` for `Status` when the per-item // status array is empty, followed by a populated // `{values}` for `Values`. If we // skip the empty one, the Values payload shifts down // to index 0 where the decoder reads it as Status // and corrupts the parse. Captured live 2026-05-06 // via `examples/asb-relay.rs` middleman; fixture at // `tests/fixtures/publish-response-with-value.bin`. out.push(buf); } } idx += 1; } out } fn find_element_named<'a>(tokens: &'a [NbfxToken], name: &str) -> Option<&'a NbfxToken> { tokens.iter().find(|tok| { matches!(tok, NbfxToken::Element { name: NbfxName::Inline(local), .. } if local == name) }) } #[derive(Debug, thiserror::Error)] #[non_exhaustive] pub enum OperationError { #[error("response is missing required field {field}")] MissingField { field: &'static str }, #[error("codec error decoding response: {0}")] Codec(#[from] CodecError), } /// Build the NBFX token stream for `UnregisterItemsIn`. Mirror of /// `AsbContracts.cs:145-159`: /// ```xml /// /// {int32 count + each ItemIdentity binary} /// /// ``` pub fn build_unregister_items_request_body(items: &[ItemIdentity]) -> Vec { let payload = encode_item_identity_array(items); asbidata_request_body( "UnregisterItemsRequest", &[BodyField::asbidata("Items", payload)], ) } // ---- internal helpers ---------------------------------------------------- const IOM_NS: &str = "urn:msg.data.asb.iom:2"; /// DataContract namespace for `MonitoredItem` / `ItemIdentity` / /// `Variant` etc. Source: `[DataContract(Namespace = "...")]` on each /// type at `AsbContracts.cs:533, 936, 1170`. F34: this is the wire /// namespace for nested DataContract members emitted under the `b` /// prefix inside `` / `` payloads. const DC_ASBIDATAV2_NS: &str = "http://schemas.datacontract.org/2004/07/ArchestrAServices.ASBIDataV2Contract"; /// `xsi` namespace, declared (often unused) on the `` wrapper /// alongside the DC namespace. WCF declares it preemptively because /// any nullable DataContract field could emit `i:nil="true"`. const XSI_NS: &str = "http://www.w3.org/2001/XMLSchema-instance"; #[derive(Debug, Clone)] #[allow(clippy::enum_variant_names, dead_code)] // every body field is in fact an element; suffix is descriptive. `name` on AsbiDataElement is retained for self-documentation but no longer emitted on the wire (see `asbidata_request_body`). enum BodyField { /// Plain element with text body. BoolElement { name: &'static str, value: bool }, /// Plain element with int64 text body. WCF binary encoder emits /// numeric values as Int8/16/32/64 records — we always pick Int64 /// for simplicity; the decoder accepts any width. Int64Element { name: &'static str, value: i64 }, /// `` element with binary content (NBFX `Bytes` record). /// `name` is the .NET XmlElement attribute name (e.g. "Items", /// "Values") — kept for self-documentation but ignored on the /// wire because WCF's AsbDataCustomSerializer.WriteStartObject /// replaces the field's outer element with `` directly. AsbiDataElement { name: &'static str, payload: Vec, }, } impl BodyField { fn boolean(name: &'static str, value: bool) -> Self { Self::BoolElement { name, value } } fn int64(name: &'static str, value: i64) -> Self { Self::Int64Element { name, value } } /// `u64` is wider than `Int64Text`. WCF binary encodes large `ulong`s /// as `UInt64Text` (record `0xB2`) which our F21 codec doesn't yet /// emit; for the current proven value range (sample intervals, /// queue sizes — all well under `i64::MAX`) we cast to `i64`. If a /// future capture exposes values > `i64::MAX` we'll need to add /// `UInt64` to `NbfxText`. fn uint64(name: &'static str, value: u64) -> Self { Self::Int64Element { name, value: value as i64, } } fn asbidata(name: &'static str, payload: Vec) -> Self { Self::AsbiDataElement { name, payload } } } /// Emit `<{outer} xmlns="urn:msg.data.asb.iom:2"> ... ` with /// each [`BodyField`] in order. fn asbidata_request_body(outer: &str, fields: &[BodyField]) -> Vec { let mut tokens = vec![ NbfxToken::Element { prefix: None, name: NbfxName::Inline(outer.to_string()), }, NbfxToken::DefaultNamespace { value: NbfxText::Chars(IOM_NS.to_string()), }, ]; for field in fields { match field { BodyField::BoolElement { name, value } => { tokens.push(NbfxToken::Element { prefix: None, name: NbfxName::Inline((*name).to_string()), }); tokens.push(NbfxToken::Text(NbfxText::Bool(*value))); tokens.push(NbfxToken::EndElement); } BodyField::Int64Element { name, value } => { tokens.push(NbfxToken::Element { prefix: None, name: NbfxName::Inline((*name).to_string()), }); tokens.push(NbfxToken::Text(NbfxText::Int64(*value))); tokens.push(NbfxToken::EndElement); } BodyField::AsbiDataElement { name: _, payload } => { // WCF's AsbDataCustomSerializer.WriteStartObject // (`AsbContracts.cs:1561-1572`) REPLACES the field's // outer element with `` rather than nesting // inside it. The `name` parameter (e.g. "Items", // "Values") is ignored on the wire — the .NET // XmlElement attribute name is overridden by the // custom serializer. Verified via .NET probe // `--dump-messages` output. tokens.push(NbfxToken::Element { prefix: None, name: NbfxName::Inline("ASBIData".to_string()), }); tokens.push(NbfxToken::Text(NbfxText::Bytes(payload.clone()))); tokens.push(NbfxToken::EndElement); // } } } tokens.push(NbfxToken::EndElement); // tokens } #[cfg(test)] #[allow( clippy::unwrap_used, clippy::expect_used, clippy::panic, clippy::indexing_slicing )] mod tests { use super::*; use crate::contracts::decode_item_identity_array; use mxaccess_asb_nettcp::nbfx::DynamicDictionary; #[test] fn register_items_body_round_trips_items_via_asbidata() { let items = vec![ ItemIdentity::absolute_by_name("Tag.A"), ItemIdentity::absolute_by_name("Tag.B"), ]; let body = build_register_items_request_body(&items, true, false); // The body should open with assert!(matches!( &body[0], NbfxToken::Element { name: NbfxName::Inline(s), .. } if s == "RegisterItemsRequest" )); assert!(matches!( &body[1], NbfxToken::DefaultNamespace { value: NbfxText::Chars(ns) } if ns == IOM_NS )); // Find the {Bytes} token sequence and pull // the Bytes payload back out — it must round-trip the // ItemIdentity array exactly. let mut bytes_payload: Option> = None; for window in body.windows(3) { if matches!( &window[0], NbfxToken::Element { name: NbfxName::Inline(s), .. } if s == "ASBIData" ) { if let NbfxToken::Text(NbfxText::Bytes(b)) = &window[1] { if matches!(window[2], NbfxToken::EndElement) { bytes_payload = Some(b.clone()); break; } } } } let payload = bytes_payload.expect("ASBIData Bytes record not found in body"); let decoded = decode_item_identity_array(&payload).unwrap(); assert_eq!(decoded, items); } #[test] fn register_items_request_round_trips_through_envelope() { // End-to-end: build_register_items_request_body → SoapEnvelope // → encode_envelope → decode_envelope → re-extract body tokens // → re-extract ItemIdentity array. let items = vec![ItemIdentity::absolute_by_name("Tag.X")]; let body = build_register_items_request_body(&items, true, true); let env = crate::SoapEnvelope::new(crate::actions::REGISTER_ITEMS).with_body_tokens(body); let mut dyn_w = DynamicDictionary::new(); let bytes = crate::encode_envelope(&env, &mut dyn_w).unwrap(); let mut dyn_r = DynamicDictionary::new(); let decoded = crate::decode_envelope(&bytes, &mut dyn_r).unwrap(); assert_eq!( decoded.action.as_deref(), Some(crate::actions::REGISTER_ITEMS) ); let mut bytes_payload: Option> = None; for window in decoded.body_tokens.windows(3) { if matches!( &window[0], NbfxToken::Element { name: NbfxName::Inline(s), .. } if s == "ASBIData" ) { if let NbfxToken::Text(NbfxText::Bytes(b)) = &window[1] { bytes_payload = Some(b.clone()); break; } } } let payload = bytes_payload.expect("ASBIData payload missing from decoded envelope"); let recovered = decode_item_identity_array(&payload).unwrap(); assert_eq!(recovered, items); } #[test] fn register_items_body_carries_require_id_and_register_only_booleans() { let body = build_register_items_request_body(&[], true, false); // After the {} sub-tree, the // body should carry true followed by // false. Because `Bytes(empty)` // still emits a Bytes8 record + 1 EndElement + 1 EndElement, // walk the tokens by name to be robust. let mut saw_require_id_true = false; let mut saw_register_only_false = false; let mut idx = 0; while idx < body.len() { if let NbfxToken::Element { name: NbfxName::Inline(local), .. } = &body[idx] { if local == "RequireId" && matches!( body.get(idx + 1), Some(NbfxToken::Text(NbfxText::Bool(true))) ) { saw_require_id_true = true; } if local == "RegisterOnly" && matches!( body.get(idx + 1), Some(NbfxToken::Text(NbfxText::Bool(false))) ) { saw_register_only_false = true; } } idx += 1; } assert!(saw_require_id_true, "RequireId true not found"); assert!(saw_register_only_false, "RegisterOnly false not found"); } #[test] fn unregister_items_body_uses_correct_outer_element_name() { let body = build_unregister_items_request_body(&[ItemIdentity::absolute_by_name("X")]); assert!(matches!( &body[0], NbfxToken::Element { name: NbfxName::Inline(s), .. } if s == "UnregisterItemsRequest" )); // Should NOT have RequireId / RegisterOnly fields — the // unregister contract has only the Items array. for tok in &body { if let NbfxToken::Element { name: NbfxName::Inline(local), .. } = tok { assert!(local != "RequireId"); assert!(local != "RegisterOnly"); } } } #[test] fn read_request_body_uses_correct_outer_element_and_no_register_fields() { let body = build_read_request_body(&[ItemIdentity::absolute_by_name("Tag.X")]); assert!(matches!( &body[0], NbfxToken::Element { name: NbfxName::Inline(s), .. } if s == "ReadRequest" )); // The Read contract has only `Items`. RequireId / RegisterOnly / // Values are NOT present. for tok in &body { if let NbfxToken::Element { name: NbfxName::Inline(local), .. } = tok { assert!(local != "RequireId"); assert!(local != "RegisterOnly"); assert!(local != "Values"); } } } #[test] fn register_items_response_round_trips_status_array() { use mxaccess_codec::AsbStatus; let status = vec![ ItemStatus { item: ItemIdentity::absolute_by_name("Tag.A"), status: AsbStatus { count: 0, payload: vec![], }, error_code: 0, error_code_specified: true, }, ItemStatus { item: ItemIdentity::absolute_by_name("Tag.B"), status: AsbStatus { count: -1, payload: vec![0xC0], }, error_code: 7, error_code_specified: true, }, ]; let payload = crate::contracts::encode_item_status_array(&status); // Build a synthetic response body matching the wire shape. let body = asbidata_request_body( "RegisterItemsResponse", &[BodyField::asbidata("Status", payload)], ); let decoded = decode_register_items_response(&body).unwrap(); assert_eq!(decoded.status, status); assert!(!decoded.item_capabilities_present); } #[test] fn register_items_response_records_when_item_capabilities_appears() { use mxaccess_codec::AsbStatus; let status = vec![ItemStatus { item: ItemIdentity::absolute_by_name("X"), status: AsbStatus::default(), error_code: 0, error_code_specified: false, }]; let status_payload = crate::contracts::encode_item_status_array(&status); // Synthesise a body with both Status and ItemCapabilities elements. let mut body = asbidata_request_body( "RegisterItemsResponse", &[BodyField::asbidata("Status", status_payload)], ); // Splice in a synthetic ItemCapabilities element before the // outer EndElement. let close_idx = body.len() - 1; body.insert( close_idx, NbfxToken::Element { prefix: None, name: NbfxName::Inline("ItemCapabilities".to_string()), }, ); body.insert(close_idx + 1, NbfxToken::EndElement); let decoded = decode_register_items_response(&body).unwrap(); assert_eq!(decoded.status, status); assert!(decoded.item_capabilities_present); } #[test] fn unregister_items_response_round_trips() { use mxaccess_codec::AsbStatus; let status = vec![ItemStatus { item: ItemIdentity::absolute_by_name("Tag.Y"), status: AsbStatus { count: 1, payload: vec![0x40], }, error_code: 0, error_code_specified: false, }]; let payload = crate::contracts::encode_item_status_array(&status); let body = asbidata_request_body( "UnregisterItemsResponse", &[BodyField::asbidata("Status", payload)], ); let decoded = decode_unregister_items_response(&body).unwrap(); assert_eq!(decoded.status, status); } #[test] fn collect_asbidata_payloads_returns_empty_when_field_missing() { let body = vec![ NbfxToken::Element { prefix: None, name: NbfxName::Inline("Empty".to_string()), }, NbfxToken::EndElement, ]; assert!(collect_asbidata_payloads(&body).is_empty()); } #[test] fn collect_asbidata_payloads_handles_multiple_fields_positionally() { let body = asbidata_request_body( "ReadResponse", &[ BodyField::asbidata("Status", vec![1, 2, 3]), BodyField::asbidata("Values", vec![4, 5, 6, 7]), ], ); let payloads = collect_asbidata_payloads(&body); assert_eq!(payloads, vec![vec![1u8, 2, 3], vec![4u8, 5, 6, 7]]); } #[test] fn decode_register_items_response_returns_empty_status_when_absent() { // Per the live wire capture, the server returns an empty // `` (Status array) when an operation fails (e.g. // `successField=false` + `resultCodeField=1`). Decode now // tolerates this rather than erroring with `MissingField` — // callers inspect `result_code` for the failure reason. let body = asbidata_request_body("RegisterItemsResponse", &[]); let response = decode_register_items_response(&body).unwrap(); assert!(response.status.is_empty()); assert!(!response.item_capabilities_present); assert_eq!(response.result_code, None); assert_eq!(response.success, None); } #[test] fn connect_request_carries_connection_id_and_public_key() { let cid = [0x12u8; 16]; let pubkey = vec![0xAB, 0xCD, 0xEF]; let body = build_connect_request_body(cid, &pubkey); // Outer wrapper assert!(matches!( &body[0], NbfxToken::Element { name: NbfxName::Inline(s), .. } if s == "ConnectRequest" )); // ConnectionId text contains hyphenated GUID form let mut found_guid = false; let mut found_pubkey_bytes = false; for tok in &body { if let NbfxToken::Text(NbfxText::Chars(s)) = tok { if s.contains('-') && s.len() == 36 { found_guid = true; } } if let NbfxToken::Text(NbfxText::Bytes(b)) = tok { if *b == pubkey { found_pubkey_bytes = true; } } } assert!(found_guid, "ConnectionId text not found"); assert!(found_pubkey_bytes, "ConsumerPublicKey/Data bytes not found"); } #[test] fn disconnect_request_carries_data_and_iv_under_correct_wrapper() { let data = vec![0xDEu8, 0xAD]; let iv = vec![0xBEu8, 0xEF]; let body = build_disconnect_request_body(&data, &iv); assert!(matches!( &body[0], NbfxToken::Element { name: NbfxName::Inline(s), .. } if s == "DisconnectRequest" )); // Walk for the ConsumerAuthenticationData wrapper. let mut saw_consumer_auth_data = false; for tok in &body { if let NbfxToken::Element { name: NbfxName::Inline(local), .. } = tok { if local == "ConsumerAuthenticationData" { saw_consumer_auth_data = true; } } } assert!(saw_consumer_auth_data); let bytes_payloads: Vec> = body .iter() .filter_map(|tok| { if let NbfxToken::Text(NbfxText::Bytes(b)) = tok { Some(b.clone()) } else { None } }) .collect(); assert_eq!(bytes_payloads, vec![data, iv]); } #[test] fn authenticate_me_request_carries_data_and_iv() { let data = vec![0x01, 0x02, 0x03]; let iv = vec![0x04, 0x05]; let body = build_authenticate_me_request_body(&data, &iv); let bytes_payloads: Vec> = body .iter() .filter_map(|tok| { if let NbfxToken::Text(NbfxText::Bytes(b)) = tok { Some(b.clone()) } else { None } }) .collect(); assert_eq!(bytes_payloads, vec![data, iv]); } #[test] fn connect_response_round_trip() { // Build a synthetic ConnectResponse body and decode it back. let svc_pubkey = vec![0xFEu8, 0xED, 0xFA, 0xCE]; let svc_data = vec![0xBEu8, 0xEF]; let svc_iv = vec![0xCAu8, 0xFE]; let lifetime = "PT60M:V2".to_string(); use mxaccess_asb_nettcp::nbfx::DynamicDictionary; let body: Vec = vec![ NbfxToken::Element { prefix: None, name: NbfxName::Inline("ConnectResponse".to_string()), }, NbfxToken::DefaultNamespace { value: NbfxText::Chars(MESSAGES_NS.to_string()), }, // ServicePublicKey NbfxToken::Element { prefix: None, name: NbfxName::Inline("ServicePublicKey".to_string()), }, NbfxToken::Element { prefix: None, name: NbfxName::Inline("Data".to_string()), }, NbfxToken::Text(NbfxText::Bytes(svc_pubkey.clone())), NbfxToken::EndElement, NbfxToken::EndElement, // ServiceAuthenticationData NbfxToken::Element { prefix: None, name: NbfxName::Inline("ServiceAuthenticationData".to_string()), }, NbfxToken::Element { prefix: None, name: NbfxName::Inline("Data".to_string()), }, NbfxToken::Text(NbfxText::Bytes(svc_data.clone())), NbfxToken::EndElement, NbfxToken::Element { prefix: None, name: NbfxName::Inline("InitializationVector".to_string()), }, NbfxToken::Text(NbfxText::Bytes(svc_iv.clone())), NbfxToken::EndElement, NbfxToken::EndElement, // ConnectionLifetime NbfxToken::Element { prefix: None, name: NbfxName::Inline("ConnectionLifetime".to_string()), }, NbfxToken::Text(NbfxText::Chars(lifetime.clone())), NbfxToken::EndElement, // NbfxToken::EndElement, ]; let dict = DynamicDictionary::new(); let decoded = decode_connect_response(&body, &dict).unwrap(); assert_eq!(decoded.service_public_key, svc_pubkey); assert_eq!( decoded.service_authentication_data, Some(AuthenticationDataBytes { data: svc_data, initialization_vector: svc_iv, }) ); assert_eq!(decoded.connection_lifetime.as_deref(), Some("PT60M:V2")); } #[test] fn connect_response_without_optional_fields() { use mxaccess_asb_nettcp::nbfx::DynamicDictionary; let body: Vec = vec![ NbfxToken::Element { prefix: None, name: NbfxName::Inline("ConnectResponse".to_string()), }, NbfxToken::Element { prefix: None, name: NbfxName::Inline("ServicePublicKey".to_string()), }, NbfxToken::Element { prefix: None, name: NbfxName::Inline("Data".to_string()), }, NbfxToken::Text(NbfxText::Bytes(vec![1, 2, 3])), NbfxToken::EndElement, NbfxToken::EndElement, NbfxToken::EndElement, ]; let dict = DynamicDictionary::new(); let decoded = decode_connect_response(&body, &dict).unwrap(); assert_eq!(decoded.service_public_key, vec![1u8, 2, 3]); assert!(decoded.service_authentication_data.is_none()); assert!(decoded.connection_lifetime.is_none()); } #[test] fn connect_response_missing_service_public_key_fails() { use mxaccess_asb_nettcp::nbfx::DynamicDictionary; let body: Vec = vec![ NbfxToken::Element { prefix: None, name: NbfxName::Inline("ConnectResponse".to_string()), }, NbfxToken::EndElement, ]; let dict = DynamicDictionary::new(); let err = decode_connect_response(&body, &dict).unwrap_err(); assert!(matches!( err, OperationError::MissingField { field: "ServicePublicKey/Data" } )); } #[test] fn keep_alive_body_is_empty_wrapper_with_namespace() { let body = build_keep_alive_request_body(); assert_eq!(body.len(), 3); assert!(matches!( &body[0], NbfxToken::Element { name: NbfxName::Inline(s), .. } if s == "KeepAliveRequest" )); assert!(matches!( &body[1], NbfxToken::DefaultNamespace { value: NbfxText::Chars(ns) } if ns == "http://asb.contracts.messages/20111111" )); assert!(matches!(&body[2], NbfxToken::EndElement)); } #[test] fn read_response_decodes_status_and_values() { use mxaccess_codec::{AsbStatus, AsbVariant, RuntimeValue}; let status = vec![ItemStatus { item: ItemIdentity::absolute_by_name("Tag.A"), status: AsbStatus::default(), error_code: 0, error_code_specified: true, }]; let values = vec![RuntimeValue { timestamp_binary: 0x0123_4567_89AB_CDEF, timestamp_specified: true, value: AsbVariant::from_i32(42), status: AsbStatus { count: 0, payload: vec![], }, }]; // Encode the values array using the same int32-count + per-value // shape that `RuntimeValue.WriteArrayToStream` emits. let mut values_payload = i32::try_from(values.len()) .unwrap_or(i32::MAX) .to_le_bytes() .to_vec(); for v in &values { v.encode_into(&mut values_payload); } let status_payload = crate::contracts::encode_item_status_array(&status); let body = asbidata_request_body( "ReadResponse", &[ BodyField::asbidata("Status", status_payload), BodyField::asbidata("Values", values_payload), ], ); let decoded = decode_read_response(&body).unwrap(); assert_eq!(decoded.status, status); assert_eq!(decoded.values, values); } #[test] fn read_response_with_no_values_returns_empty_vec() { use mxaccess_codec::AsbStatus; let status = vec![ItemStatus { item: ItemIdentity::absolute_by_name("X"), status: AsbStatus::default(), error_code: 0, error_code_specified: true, }]; let payload = crate::contracts::encode_item_status_array(&status); let body = asbidata_request_body("ReadResponse", &[BodyField::asbidata("Status", payload)]); let decoded = decode_read_response(&body).unwrap(); assert_eq!(decoded.status, status); assert!(decoded.values.is_empty()); } #[test] fn read_response_tolerates_empty_asbidata_when_invalid_connection_id() { // Mirrors the live wire capture from F33 — the server returns // empty `` Status + empty `` Values // when it short-circuits on InvalidConnectionId. Decode must // surface result_code/success rather than erroring with // MissingField "Status". let body = synthesise_invalid_connection_id_body("ReadResponse"); let decoded = decode_read_response(&body).unwrap(); assert!(decoded.status.is_empty()); assert!(decoded.values.is_empty()); assert_eq!(decoded.result_code, Some(1)); assert_eq!(decoded.success, Some(false)); } /// Build a body shaped like the live `InvalidConnectionId` response /// captured via `MX_ASB_TRACE_REPLY` against MxDataProvider: /// Result wrapper with `resultCodeField=1`, `successField=false`, /// then two empty `` payloads (Status + the second /// payload, e.g. Values for Read or absent for plain Register). fn synthesise_invalid_connection_id_body(wrapper: &str) -> Vec { use mxaccess_asb_nettcp::nbfx::{NbfxName, NbfxText, NbfxToken}; fn elem(name: &str) -> NbfxToken { NbfxToken::Element { prefix: None, name: NbfxName::Inline(name.to_string()), } } let mut tokens = vec![ elem(wrapper), // Result wrapper elem("Result"), elem("resultCodeField"), NbfxToken::Text(NbfxText::One), NbfxToken::EndElement, elem("successField"), NbfxToken::Text(NbfxText::Bool(false)), NbfxToken::EndElement, NbfxToken::EndElement, // ]; // Two empty payloads. for _ in 0..2 { tokens.push(elem("ASBIData")); tokens.push(NbfxToken::EndElement); } tokens.push(NbfxToken::EndElement); // tokens } #[test] fn publish_write_complete_body_is_empty_wrapper() { let body = build_publish_write_complete_request_body(); assert_eq!(body.len(), 3); assert!(matches!( &body[0], NbfxToken::Element { name: NbfxName::Inline(s), .. } if s == "PublishWriteCompleteRequest" )); assert!(matches!( &body[1], NbfxToken::DefaultNamespace { value: NbfxText::Chars(ns) } if ns == IOM_NS )); assert!(matches!(&body[2], NbfxToken::EndElement)); } #[test] fn publish_write_complete_response_counts_item_write_complete_elements() { // Synthesize a body with two ItemWriteComplete elements. let body = vec![ NbfxToken::Element { prefix: None, name: NbfxName::Inline("PublishWriteCompleteResponse".to_string()), }, NbfxToken::Element { prefix: None, name: NbfxName::Inline("CompleteWrites".to_string()), }, NbfxToken::Element { prefix: None, name: NbfxName::Inline("ItemWriteComplete".to_string()), }, NbfxToken::EndElement, NbfxToken::Element { prefix: None, name: NbfxName::Inline("ItemWriteComplete".to_string()), }, NbfxToken::EndElement, NbfxToken::EndElement, NbfxToken::EndElement, ]; let decoded = decode_publish_write_complete_response(&body).unwrap(); assert_eq!(decoded.complete_writes_count, 2); } #[test] fn publish_write_complete_response_zero_when_no_callbacks() { let body = vec![ NbfxToken::Element { prefix: None, name: NbfxName::Inline("PublishWriteCompleteResponse".to_string()), }, NbfxToken::EndElement, ]; let decoded = decode_publish_write_complete_response(&body).unwrap(); assert_eq!(decoded.complete_writes_count, 0); } #[test] fn delete_monitored_items_body_carries_subscription_id_and_items() { let item = MinimalMonitoredItem::new(ItemIdentity::absolute_by_name("Tag.A"), 1000); let body = build_delete_monitored_items_request_body(11, &[item]); assert!(matches!( &body[0], NbfxToken::Element { name: NbfxName::Inline(s), .. } if s == "DeleteMonitoredItemsRequest" )); let mut saw_id = false; let mut saw_monitored_item = false; for tok in &body { if let NbfxToken::Text(NbfxText::Int64(11)) = tok { saw_id = true; } if let NbfxToken::Element { name: NbfxName::Inline(local), .. } = tok { if local == "MonitoredItem" { saw_monitored_item = true; } } } assert!(saw_id); assert!(saw_monitored_item); } #[test] fn delete_monitored_items_body_omits_require_id_field() { let item = MinimalMonitoredItem::new(ItemIdentity::absolute_by_name("Tag.A"), 1000); let body = build_delete_monitored_items_request_body(7, &[item]); // The DeleteMonitoredItems contract has no RequireId field; // assert it doesn't show up. for tok in &body { if let NbfxToken::Element { name: NbfxName::Inline(local), .. } = tok { assert!(local != "RequireId"); } } } #[test] fn delete_monitored_items_response_round_trip() { use mxaccess_codec::AsbStatus; let status = vec![ItemStatus { item: ItemIdentity::absolute_by_name("Tag.D"), status: AsbStatus::default(), error_code: 0, error_code_specified: true, }]; let payload = crate::contracts::encode_item_status_array(&status); let body = asbidata_request_body( "DeleteMonitoredItemsResponse", &[BodyField::asbidata("Status", payload)], ); let decoded = decode_delete_monitored_items_response(&body).unwrap(); assert_eq!(decoded.status, status); } #[test] fn write_request_body_carries_items_values_and_write_handle() { use mxaccess_codec::AsbVariant; let items = vec![ItemIdentity::absolute_by_name("Tag.X")]; let values = vec![MinimalWriteValue::new(AsbVariant::from_i32(42))]; let body = build_write_request_body(&items, &values, 7); assert!(matches!( &body[0], NbfxToken::Element { name: NbfxName::Inline(s), .. } if s == "WriteBasicRequest" )); // WriteHandle = 7 (Int32) let mut saw_write_handle = false; let mut saw_write_value_element = false; for tok in &body { if let NbfxToken::Text(NbfxText::Int32(7)) = tok { saw_write_handle = true; } if let NbfxToken::Element { name: NbfxName::Inline(local), .. } = tok { if local == "WriteValue" { saw_write_value_element = true; } } } assert!(saw_write_handle); assert!(saw_write_value_element); } #[test] fn write_request_body_pairs_items_and_values_arrays() { use mxaccess_codec::AsbVariant; let items = vec![ ItemIdentity::absolute_by_name("Tag.A"), ItemIdentity::absolute_by_name("Tag.B"), ]; let values = vec![ MinimalWriteValue::new(AsbVariant::from_i32(1)), MinimalWriteValue::new(AsbVariant::from_i32(2)), ]; let body = build_write_request_body(&items, &values, 0); // Two WriteValue elements should appear under . let n_write_value_elements = body .iter() .filter(|tok| { matches!( tok, NbfxToken::Element { name: NbfxName::Inline(s), .. } if s == "WriteValue" ) }) .count(); assert_eq!(n_write_value_elements, 2); } #[test] fn write_response_round_trips_status_array() { use mxaccess_codec::AsbStatus; let status = vec![ItemStatus { item: ItemIdentity::absolute_by_name("Tag.X"), status: AsbStatus::default(), error_code: 0, error_code_specified: true, }]; let payload = crate::contracts::encode_item_status_array(&status); let body = asbidata_request_body("WriteResponse", &[BodyField::asbidata("Status", payload)]); let decoded = decode_write_response(&body).unwrap(); assert_eq!(decoded.status, status); } #[test] fn write_response_missing_status_returns_empty_with_no_result_code() { // Post-F33 the decoder is tolerant of missing Status — it // returns empty status with result_code/success unset. let body = asbidata_request_body("WriteResponse", &[]); let response = decode_write_response(&body).unwrap(); assert!(response.status.is_empty()); assert_eq!(response.result_code, None); assert_eq!(response.success, None); } #[test] fn write_response_surfaces_invalid_connection_id() { let body = synthesise_invalid_connection_id_body("WriteResponse"); let response = decode_write_response(&body).unwrap(); assert!(response.status.is_empty()); assert_eq!(response.result_code, Some(1)); assert_eq!(response.success, Some(false)); } #[test] fn publish_response_surfaces_invalid_connection_id() { let body = synthesise_invalid_connection_id_body("PublishResponse"); let response = decode_publish_response(&body).unwrap(); assert!(response.status.is_empty()); assert!(response.values.is_empty()); assert_eq!(response.result_code, Some(1)); assert_eq!(response.success, Some(false)); } #[test] fn unregister_items_response_surfaces_invalid_connection_id() { let body = synthesise_invalid_connection_id_body("UnregisterItemsResponse"); let response = decode_unregister_items_response(&body).unwrap(); assert!(response.status.is_empty()); assert_eq!(response.result_code, Some(1)); assert_eq!(response.success, Some(false)); } #[test] fn delete_monitored_items_response_surfaces_invalid_connection_id() { let body = synthesise_invalid_connection_id_body("DeleteMonitoredItemsResponse"); let response = decode_delete_monitored_items_response(&body).unwrap(); assert!(response.status.is_empty()); assert_eq!(response.result_code, Some(1)); assert_eq!(response.success, Some(false)); } #[test] fn publish_write_complete_response_surfaces_invalid_connection_id() { let body = synthesise_invalid_connection_id_body("PublishWriteCompleteResponse"); let response = decode_publish_write_complete_response(&body).unwrap(); assert_eq!(response.complete_writes_count, 0); assert_eq!(response.result_code, Some(1)); assert_eq!(response.success, Some(false)); } #[test] fn create_subscription_body_carries_max_queue_and_sample_interval() { let body = build_create_subscription_request_body(0, 1000); assert!(matches!( &body[0], NbfxToken::Element { name: NbfxName::Inline(s), .. } if s == "CreateSubscriptionRequest" )); let int_payloads: Vec = body .iter() .filter_map(|tok| { if let NbfxToken::Text(NbfxText::Int64(v)) = tok { Some(*v) } else { None } }) .collect(); assert_eq!(int_payloads, vec![0, 1000]); } #[test] fn create_subscription_response_decodes_int64_subscription_id() { use mxaccess_asb_nettcp::nbfx::DynamicDictionary; let body = vec![ NbfxToken::Element { prefix: None, name: NbfxName::Inline("CreateSubscriptionResponse".to_string()), }, NbfxToken::Element { prefix: None, name: NbfxName::Inline("SubscriptionId".to_string()), }, NbfxToken::Text(NbfxText::Int64(42)), NbfxToken::EndElement, NbfxToken::EndElement, ]; let dict = DynamicDictionary::new(); let decoded = decode_create_subscription_response(&body, &dict).unwrap(); assert_eq!(decoded.subscription_id, 42); } #[test] fn create_subscription_response_decodes_chars_subscription_id() { // WCF can also emit numerics as text-Chars rather than Int64Text. // Verify the decoder's parse-fallback path handles that. use mxaccess_asb_nettcp::nbfx::DynamicDictionary; let body = vec![ NbfxToken::Element { prefix: None, name: NbfxName::Inline("CreateSubscriptionResponse".to_string()), }, NbfxToken::Element { prefix: None, name: NbfxName::Inline("SubscriptionId".to_string()), }, NbfxToken::Text(NbfxText::Chars("12345".to_string())), NbfxToken::EndElement, NbfxToken::EndElement, ]; let dict = DynamicDictionary::new(); let decoded = decode_create_subscription_response(&body, &dict).unwrap(); assert_eq!(decoded.subscription_id, 12345); } #[test] fn create_subscription_response_missing_id_returns_zero_sentinel() { // Post-F33 the decoder is tolerant of missing SubscriptionId // (that's how the server signals an InvalidConnectionId // failure). It returns subscription_id=0 with result_code/ // success unset; callers inspect those for a real success // signal. use mxaccess_asb_nettcp::nbfx::DynamicDictionary; let body = vec![ NbfxToken::Element { prefix: None, name: NbfxName::Inline("CreateSubscriptionResponse".to_string()), }, NbfxToken::EndElement, ]; let dict = DynamicDictionary::new(); let response = decode_create_subscription_response(&body, &dict).unwrap(); assert_eq!(response.subscription_id, 0); assert_eq!(response.result_code, None); assert_eq!(response.success, None); } #[test] fn create_subscription_response_surfaces_invalid_connection_id() { use mxaccess_asb_nettcp::nbfx::DynamicDictionary; let body = synthesise_invalid_connection_id_body("CreateSubscriptionResponse"); let dict = DynamicDictionary::new(); let response = decode_create_subscription_response(&body, &dict).unwrap(); assert_eq!(response.subscription_id, 0); assert_eq!(response.result_code, Some(1)); assert_eq!(response.success, Some(false)); } #[test] fn add_monitored_items_response_surfaces_invalid_connection_id() { let body = synthesise_invalid_connection_id_body("AddMonitoredItemsResponse"); let response = decode_add_monitored_items_response(&body).unwrap(); assert!(response.status.is_empty()); assert_eq!(response.result_code, Some(1)); assert_eq!(response.success, Some(false)); } #[test] fn add_monitored_items_body_includes_subscription_id_and_items() { let item = MinimalMonitoredItem::new(ItemIdentity::absolute_by_name("Tag.A"), 1000); let body = build_add_monitored_items_request_body(7, &[item], true); assert!(matches!( &body[0], NbfxToken::Element { name: NbfxName::Inline(s), .. } if s == "AddMonitoredItemsRequest" )); // Find SubscriptionId text let mut saw_id_7 = false; let mut saw_monitored_item = false; for tok in &body { if let NbfxToken::Text(NbfxText::Int64(7)) = tok { saw_id_7 = true; } if let NbfxToken::Element { name: NbfxName::Inline(local), .. } = tok { if local == "MonitoredItem" { saw_monitored_item = true; } } } assert!(saw_id_7); assert!(saw_monitored_item); } /// F34 — verify the rewritten `push_monitored_item_body` emits the /// DataContract field-suffix names under the `b` prefix that /// MxDataProvider's binary deserialiser actually expects, in the /// `[DataMember(Order = N)]` order from `AsbContracts.cs:940-965`. /// Captured wire `tests/fixtures/add-monitored-items-request-wire.bin` /// is the source of truth. #[test] fn add_monitored_items_body_uses_data_contract_field_names() { let item = MinimalMonitoredItem::with_active( ItemIdentity::absolute_by_name("TestChildObject.TestInt"), 1000, true, ); let body = build_add_monitored_items_request_body(11, &[item], true); // Collect every (prefix, name) for `Element` tokens. The new // builder emits each `MonitoredItem` child under prefix `b` // with the `[DataMember(Name = "...")]` field-suffix name. let elements: Vec<(Option<&str>, &str)> = body .iter() .filter_map(|tok| { if let NbfxToken::Element { prefix, name: NbfxName::Inline(local), } = tok { Some((prefix.as_deref(), local.as_str())) } else { None } }) .collect(); // The MonitoredItem itself uses prefix `b`. assert!( elements.contains(&(Some("b"), "MonitoredItem")), "expected , got {elements:?}" ); // All 9 DataContract field names appear under prefix `b`, in // declaration order. let expected_dc_fields = [ "activeField", "activeFieldSpecified", "bufferedField", "itemField", "sampleIntervalField", "timeDeadbandField", "timeDeadbandFieldSpecified", "userDataField", "valueDeadbandField", ]; let dc_field_positions: Vec = expected_dc_fields .iter() .map(|f| { elements .iter() .position(|(p, n)| *p == Some("b") && n == f) .unwrap_or_else(|| panic!("missing in body")) }) .collect(); // Strictly increasing → fields appear in DC Order(N) sequence. for window in dc_field_positions.windows(2) { assert!( window[0] < window[1], "DC fields out of order: {expected_dc_fields:?} → {dc_field_positions:?}" ); } // ItemIdentity sub-fields appear under prefix `b` (nested // DataContract serialisation, NOT the binary // fast-path which only kicks in at top-level body members). for ii_field in [ "contextNameField", "idField", "idFieldSpecified", "nameField", "referenceTypeField", "typeField", ] { assert!( elements.contains(&(Some("b"), ii_field)), "expected nested from ItemIdentity, got {elements:?}" ); } // Variant sub-fields (lengthField/payloadField/typeField) // appear for both userDataField and valueDeadbandField. let length_count = elements .iter() .filter(|(p, n)| *p == Some("b") && *n == "lengthField") .count(); let payload_count = elements .iter() .filter(|(p, n)| *p == Some("b") && *n == "payloadField") .count(); assert_eq!( length_count, 2, "expected 2x (userData + valueDeadband Variants)" ); assert_eq!( payload_count, 2, "expected 2x (userData + valueDeadband Variants)" ); // The legacy XmlSerializer property names (Active / Item / // SampleInterval / Buffered) MUST NOT appear on the wire — the // canonical-XML signing path uses those names, but the binary // body uses the DataContract suffix names exclusively. Asserts // the legacy NBFX-bytes shape is fully retired for this op. for legacy in ["Active", "Buffered", "SampleInterval", "ASBIData"] { assert!( !elements.iter().any(|(_, n)| *n == legacy), "legacy XmlSerializer name <{legacy}> should not appear in DC body" ); } // The wrapper declares `xmlns:b` (DC namespace) and // `xmlns:i` (XSI). Verified by scanning for NamespaceDeclaration // tokens immediately following the `` open. let xmlns_decls: Vec<(&str, &NbfxText)> = body .iter() .filter_map(|tok| { if let NbfxToken::NamespaceDeclaration { prefix, value } = tok { Some((prefix.as_str(), value)) } else { None } }) .collect(); assert!( xmlns_decls.iter().any(|(p, v)| *p == "b" && matches!(v, NbfxText::Chars(s) if s == DC_ASBIDATAV2_NS)), "expected xmlns:b={DC_ASBIDATAV2_NS:?} on " ); assert!( xmlns_decls.iter().any(|(p, v)| *p == "i" && matches!(v, NbfxText::Chars(s) if s == XSI_NS)), "expected xmlns:i={XSI_NS:?} on " ); } #[test] fn delete_subscription_body_carries_subscription_id() { let body = build_delete_subscription_request_body(99); let int_payloads: Vec = body .iter() .filter_map(|tok| { if let NbfxToken::Text(NbfxText::Int64(v)) = tok { Some(*v) } else { None } }) .collect(); assert_eq!(int_payloads, vec![99]); } #[test] fn publish_body_carries_subscription_id() { let body = build_publish_request_body(123); assert!(matches!( &body[0], NbfxToken::Element { name: NbfxName::Inline(s), .. } if s == "PublishRequest" )); let int_payloads: Vec = body .iter() .filter_map(|tok| { if let NbfxToken::Text(NbfxText::Int64(v)) = tok { Some(*v) } else { None } }) .collect(); assert_eq!(int_payloads, vec![123]); } #[test] fn publish_response_decodes_status_and_values() { use mxaccess_codec::{AsbStatus, AsbVariant, RuntimeValue}; let status = vec![ItemStatus { item: ItemIdentity::absolute_by_name("Tag.A"), status: AsbStatus::default(), error_code: 0, error_code_specified: true, }]; let values = vec![MonitoredItemValue { item: ItemIdentity::absolute_by_name("Tag.A"), value: RuntimeValue { timestamp_binary: 555, timestamp_specified: true, value: AsbVariant::from_i32(7), status: AsbStatus::default(), }, user_data: AsbVariant::empty(), }]; let status_payload = crate::contracts::encode_item_status_array(&status); let values_payload = crate::contracts::encode_monitored_item_value_array(&values); let body = asbidata_request_body( "PublishResponse", &[ BodyField::asbidata("Status", status_payload), BodyField::asbidata("Values", values_payload), ], ); let decoded = decode_publish_response(&body).unwrap(); assert_eq!(decoded.status, status); assert_eq!(decoded.values, values); } #[test] fn add_monitored_items_response_round_trip() { use mxaccess_codec::AsbStatus; let status = vec![ItemStatus { item: ItemIdentity::absolute_by_name("Tag.M"), status: AsbStatus::default(), error_code: 0, error_code_specified: true, }]; let payload = crate::contracts::encode_item_status_array(&status); let body = asbidata_request_body( "AddMonitoredItemsResponse", &[BodyField::asbidata("Status", payload)], ); let decoded = decode_add_monitored_items_response(&body).unwrap(); assert_eq!(decoded.status, status); assert!(!decoded.item_capabilities_present); } #[test] fn empty_items_array_still_produces_valid_envelope() { let body = build_register_items_request_body(&[], false, false); let env = crate::SoapEnvelope::new(crate::actions::REGISTER_ITEMS).with_body_tokens(body); let mut dyn_w = DynamicDictionary::new(); let bytes = crate::encode_envelope(&env, &mut dyn_w).unwrap(); // Round-trip — at minimum, the action must come back. let mut dyn_r = DynamicDictionary::new(); let decoded = crate::decode_envelope(&bytes, &mut dyn_r).unwrap(); assert_eq!( decoded.action.as_deref(), Some(crate::actions::REGISTER_ITEMS) ); } }