From b543eb1f8457b9eff8499827735be9fc7d1548a3 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 5 May 2026 12:57:59 -0400 Subject: [PATCH] =?UTF-8?q?[M5]=20mxaccess-asb:=20F25=20step=208=20?= =?UTF-8?q?=E2=80=94=20subscription=20operations?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CreateSubscription / AddMonitoredItems / Publish / DeleteSubscription. Completes the IASBIDataV2 read-and-subscribe path; remaining ops (Write/PublishWriteComplete/DeleteMonitoredItems) are mechanical extensions of the same pattern. Contracts: * `MonitoredItemValue` codec (IAsbCustomSerializableType binary fast-path: ItemIdentity + RuntimeValue + AsbVariant per `AsbContracts.cs:1064-1068`) with array codec (4-byte int32 count + per-element body, mirrors `WriteArrayToStream` at `cs:1095-1103`). Request builders: * `build_create_subscription_request_body(max_queue_size, sample_interval)` — primitive fields per `cs:215-223`. * `build_delete_subscription_request_body(subscription_id)` — primitive field per `cs:232-237`. * `build_publish_request_body(subscription_id)` — primitive field per `cs:287-292`. * `build_add_monitored_items_request_body(subscription_id, items, require_id)` — minimal MonitoredItem shape (Item + SampleInterval + Buffered). Full optional-field set (Active/TimeDeadband/ValueDeadband/UserData) deferred to a later iteration once a live capture confirms the WCF DataContract XML wire form. Response decoders: * `decode_create_subscription_response` — single int64 SubscriptionId field. Decoder accepts Int64Text, Int32Text, Zero/One, or numeric-string Chars (covers all WCF binary numeric encodings). * `decode_add_monitored_items_response` — Status array + ItemCapabilities-presence flag (mirrors RegisterItemsResponse). * `decode_publish_response` — Status array + Values (MonitoredItemValue) array. `BodyField::Int64Element` variant added for the primitive SubscriptionId / MaxQueueSize / SampleInterval fields. `uint64` helper casts to i64 (covers proven value range; if ulong > i64::MAX ever appears we'll add UInt64Text to F21's NbfxText enum). Client wrappers (4 new methods on AsbClient): * `create_subscription(max_queue_size, sample_interval)` * `add_monitored_items(subscription_id, items, require_id)` * `publish(subscription_id)` * `delete_subscription(subscription_id)` 11 new tests cover: * MonitoredItemValue round-trip + array round-trip. * CreateSubscription request body shape (Int64 payloads). * CreateSubscription response decoder via Int64Text. * CreateSubscription response decoder via Chars text fallback. * CreateSubscription response missing-field error. * AddMonitoredItems body carries SubscriptionId + MonitoredItem elements. * AddMonitoredItems response Status round-trip. * DeleteSubscription body carries SubscriptionId. * Publish request body shape. * Publish response Status + Values round-trip. Workspace: 691 tests pass (was 680, +11). The asb-subscribe example can now do create_subscription → add_monitored_items → publish-loop → delete_subscription once wire-byte reconciliation against a live capture confirms the MonitoredItem XML shape. Co-Authored-By: Claude Opus 4.7 (1M context) --- design/followups.md | 6 +- rust/crates/mxaccess-asb/src/client.rs | 75 ++- rust/crates/mxaccess-asb/src/contracts.rs | 147 +++++- rust/crates/mxaccess-asb/src/lib.rs | 18 +- rust/crates/mxaccess-asb/src/operations.rs | 541 ++++++++++++++++++++- 5 files changed, 774 insertions(+), 13 deletions(-) diff --git a/design/followups.md b/design/followups.md index 766e5d2..5547f60 100644 --- a/design/followups.md +++ b/design/followups.md @@ -46,7 +46,11 @@ move to `## Resolved` with a date + commit hash. **Resolves when:** F19-F26 are all closed and the four DoD bullets above pass. -**Cumulative execution log.** F19 + F23 (`ed17c07`); F24 (`7611d9e`); F20 (`9dfd193`); F22 (`43c10a1`); F21 (`5f98558`); F25 step 1 (`25dbd8d`); F25 step 2 (`a2b8989`); F25 step 3 (`c4bf0a0`); F25 step 4 (`1e59249`); F25 step 5 (`9b8133f`); F25 step 6 (`321b796`); F25 step 7 (`1b1ee1e`); F26 step 1 (`8a0f92b`); F26 step 2 (`14bb529`); `examples/asb-subscribe.rs` rewrite landed in this commit: +**Cumulative execution log.** F19 + F23 (`ed17c07`); F24 (`7611d9e`); F20 (`9dfd193`); F22 (`43c10a1`); F21 (`5f98558`); F25 step 1 (`25dbd8d`); F25 step 2 (`a2b8989`); F25 step 3 (`c4bf0a0`); F25 step 4 (`1e59249`); F25 step 5 (`9b8133f`); F25 step 6 (`321b796`); F25 step 7 (`1b1ee1e`); F26 step 1 (`8a0f92b`); F26 step 2 (`14bb529`); example rewrite (`c6570dc`); F25 step 8 landed in this commit: +- F25 step 8: subscription operations — `CreateSubscription`, `AddMonitoredItems`, `Publish`, `DeleteSubscription`. New `MonitoredItemValue` codec in contracts.rs (`IAsbCustomSerializableType` binary fast-path: ItemIdentity + RuntimeValue + AsbVariant per `cs:1064-1068`). New `MinimalMonitoredItem` request struct exposing only the proven fields (Item, SampleInterval, Buffered) — optional Active/TimeDeadband/ValueDeadband/UserData deferred to a later iteration once a live capture confirms the WCF DataContract XML shape. Per-operation builders, response decoders, and client wrappers follow the established F25 pattern. New `BodyField::Int64Element` variant for the `` / `` / `` primitive fields. The subscription path lifts the `examples/asb-subscribe.rs` "Read-loop" caveat — once wire-byte reconciliation lands, the example can do `create_subscription → add_monitored_items → publish-loop → delete_subscription`. 11 new tests cover MonitoredItemValue round-trip + array, CreateSubscription request body shape + response decode (Int64 + Chars text fallback + missing-field error), AddMonitoredItems request body shape + response decode, DeleteSubscription request body, Publish request + response (with full Status + Values round-trip via the in-memory body synthesis pattern). + +**Earlier slices:** +- example rewrite (commit `c6570dc`): - `examples/asb-subscribe.rs` rewrite: replaces the M5 placeholder with an actual end-to-end demo that exercises the F25 + F26 stack: `AsbTransport::connect` (TCP + preamble + DH handshake) → `register_items` → `read` → `disconnect` → `send_end`. Reads endpoint config from `MX_ASB_HOST`, `MX_ASB_PASSPHRASE`, `MX_ASB_VIA`, `MX_TEST_TAG` env vars (analogous to the NMX `connect-write-read` example's pattern). Defaults port 5074 when host omits one; defaults via URI to `net.tcp://{host}/ASBService` when `MX_ASB_VIA` is unset. Without `MX_LIVE` set, prints the `Setup-LiveProbeEnv.ps1` hint and exits cleanly. Connection-id is a fresh 16-byte random buffer (matches .NET's `Guid.NewGuid()` at `MxAsbDataClient.cs:36`). The example is a Read-loop until F25 subscription ops land — at that point the example will gain a Publish-loop and live up to its name. **Earlier slices:** diff --git a/rust/crates/mxaccess-asb/src/client.rs b/rust/crates/mxaccess-asb/src/client.rs index 4cef3f6..45ebd4a 100644 --- a/rust/crates/mxaccess-asb/src/client.rs +++ b/rust/crates/mxaccess-asb/src/client.rs @@ -55,10 +55,16 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use crate::contracts::{ItemIdentity, ItemStatus}; use crate::envelope::{ConnectionValidator, EnvelopeError, SoapEnvelope}; use crate::operations::{ - ConnectResponse, OperationError, ReadResponse, RegisterItemsResponse, UnregisterItemsResponse, - build_authenticate_me_request_body, build_connect_request_body, build_disconnect_request_body, - build_keep_alive_request_body, build_read_request_body, build_register_items_request_body, - build_unregister_items_request_body, decode_connect_response, decode_read_response, + AddMonitoredItemsResponse, ConnectResponse, CreateSubscriptionResponse, + DeleteSubscriptionResponse, MinimalMonitoredItem, OperationError, PublishResponse, + ReadResponse, RegisterItemsResponse, UnregisterItemsResponse, + build_add_monitored_items_request_body, build_authenticate_me_request_body, + build_connect_request_body, build_create_subscription_request_body, + build_delete_subscription_request_body, build_disconnect_request_body, + build_keep_alive_request_body, build_publish_request_body, build_read_request_body, + build_register_items_request_body, build_unregister_items_request_body, + decode_add_monitored_items_response, decode_connect_response, + decode_create_subscription_response, decode_publish_response, decode_read_response, decode_register_items_response, decode_unregister_items_response, }; use crate::{actions, decode_envelope, encode_envelope}; @@ -335,6 +341,67 @@ impl AsbClient { Ok(decode_read_response(&response.body_tokens)?) } + /// `CreateSubscription` operation — allocates a server-side + /// subscription and returns its ID. Caller threads the ID through + /// subsequent `add_monitored_items` / `publish` / + /// `delete_subscription` calls. + pub async fn create_subscription( + &mut self, + max_queue_size: i64, + sample_interval: u64, + ) -> Result { + let body = build_create_subscription_request_body(max_queue_size, sample_interval); + let response = self + .send_signed_envelope(actions::CREATE_SUBSCRIPTION, body, false) + .await?; + Ok(decode_create_subscription_response( + &response.body_tokens, + &self.read_dictionary, + )?) + } + + /// `AddMonitoredItems` operation — adds items to an existing + /// subscription. Uses [`MinimalMonitoredItem`] (Item + + /// SampleInterval + Buffered); optional fields are deferred to a + /// later F25 iteration. + pub async fn add_monitored_items( + &mut self, + subscription_id: i64, + items: &[MinimalMonitoredItem], + require_id: bool, + ) -> Result { + let body = build_add_monitored_items_request_body(subscription_id, items, require_id); + let response = self + .send_signed_envelope(actions::ADD_MONITORED_ITEMS, body, false) + .await?; + Ok(decode_add_monitored_items_response(&response.body_tokens)?) + } + + /// `Publish` operation — long-polls the subscription queue for + /// available samples. Typical pattern is to call this in a loop + /// with a small `tokio::time::timeout` per call. + pub async fn publish(&mut self, subscription_id: i64) -> Result { + let body = build_publish_request_body(subscription_id); + let response = self + .send_signed_envelope(actions::PUBLISH, body, false) + .await?; + Ok(decode_publish_response(&response.body_tokens)?) + } + + /// `DeleteSubscription` operation — releases a server-side + /// subscription. The response body is empty per + /// `AsbContracts.cs:239-240`. + pub async fn delete_subscription( + &mut self, + subscription_id: i64, + ) -> Result { + let body = build_delete_subscription_request_body(subscription_id); + let _ = self + .send_signed_envelope(actions::DELETE_SUBSCRIPTION, body, false) + .await?; + Ok(DeleteSubscriptionResponse) + } + /// `RegisterItems` operation — sends a signed `RegisterItemsIn` /// SOAP envelope and decodes the `RegisterItemsResponse`. pub async fn register_items( diff --git a/rust/crates/mxaccess-asb/src/contracts.rs b/rust/crates/mxaccess-asb/src/contracts.rs index 2394002..e223b85 100644 --- a/rust/crates/mxaccess-asb/src/contracts.rs +++ b/rust/crates/mxaccess-asb/src/contracts.rs @@ -21,7 +21,7 @@ //! round-trip — so the per-type cost is small once the //! [`ItemIdentity`] reference establishes it. -use mxaccess_codec::{AsbStatus, CodecError}; +use mxaccess_codec::{AsbStatus, AsbVariant, CodecError, RuntimeValue}; /// `ItemIdentity` per `AsbContracts.cs:533-633`. Wire layout: /// @@ -219,6 +219,101 @@ pub fn encode_item_status_array(items: &[ItemStatus]) -> Vec { out } +/// `MonitoredItemValue` per `AsbContracts.cs:1032-1104`. +/// `IAsbCustomSerializableType` binary fast-path; payload order from +/// `WriteToStream` at `cs:1064-1068`: +/// +/// 1. `Item` — [`ItemIdentity`] binary. +/// 2. `Value` — [`RuntimeValue`] binary (timestamp + variant + status). +/// 3. `UserData` — [`AsbVariant`] binary. +/// +/// `MonitoredItemValue` arrives in `PublishResponse` as part of the +/// `Values` array — one entry per delivered sample. +#[derive(Debug, Clone, PartialEq)] +pub struct MonitoredItemValue { + pub item: ItemIdentity, + pub value: RuntimeValue, + pub user_data: AsbVariant, +} + +impl MonitoredItemValue { + pub fn encode_into(&self, out: &mut Vec) { + self.item.encode_into(out); + self.value.encode_into(out); + self.user_data.encode_into(out); + } + + pub fn encode(&self) -> Vec { + let mut out = Vec::new(); + self.encode_into(&mut out); + out + } + + pub fn decode(input: &[u8]) -> Result<(Self, usize), CodecError> { + let (item, item_consumed) = ItemIdentity::decode(input)?; + let mut cursor = item_consumed; + let value_tail = input.get(cursor..).ok_or(CodecError::ShortRead { + expected: 1, + actual: 0, + })?; + let (value, value_consumed) = RuntimeValue::decode(value_tail)?; + cursor += value_consumed; + let user_data_tail = input.get(cursor..).ok_or(CodecError::ShortRead { + expected: 1, + actual: 0, + })?; + let (user_data, user_data_consumed) = AsbVariant::decode(user_data_tail)?; + cursor += user_data_consumed; + Ok(( + Self { + item, + value, + user_data, + }, + cursor, + )) + } +} + +/// Encode a `MonitoredItemValue[]` array per `WriteArrayToStream` +/// (`cs:1095-1103`) — 4-byte int32 count + per-element body. +pub fn encode_monitored_item_value_array(values: &[MonitoredItemValue]) -> Vec { + let mut out = Vec::new(); + let count = i32::try_from(values.len()).unwrap_or(i32::MAX); + out.extend_from_slice(&count.to_le_bytes()); + for v in values { + v.encode_into(&mut out); + } + out +} + +/// Decode a `MonitoredItemValue[]` array. Mirrors +/// `MonitoredItemValue.InitializeArrayFromStream` (`cs:1084-1093`). +pub fn decode_monitored_item_value_array( + input: &[u8], +) -> Result, CodecError> { + let mut cursor = 0usize; + let count = read_i32_le(input, &mut cursor)?; + if count < 0 { + return Err(CodecError::Decode { + offset: 0, + reason: "negative monitored-item-value array count", + buffer_len: input.len(), + }); + } + let mut out = Vec::with_capacity(count as usize); + for _ in 0..count { + let tail = input.get(cursor..).ok_or(CodecError::ShortRead { + expected: 1, + actual: 0, + })?; + let (v, consumed) = MonitoredItemValue::decode(tail)?; + cursor += consumed; + out.push(v); + } + Ok(out) +} + /// Encode an array of `IAsbCustomSerializableType` items per /// `AsbDataCustomSerializer.WriteObjectContent` array branch /// (`AsbContracts.cs:1583-1591` — calls `WriteArrayToStream` which @@ -494,6 +589,56 @@ mod tests { assert_eq!(decoded, arr); } + #[test] + fn monitored_item_value_round_trip() { + let mv = MonitoredItemValue { + item: ItemIdentity::absolute_by_name("Tag.X"), + value: RuntimeValue { + timestamp_binary: 0x0123_4567, + timestamp_specified: true, + value: AsbVariant::from_i32(100), + status: AsbStatus::default(), + }, + user_data: AsbVariant::empty(), + }; + let bytes = mv.encode(); + let (decoded, consumed) = MonitoredItemValue::decode(&bytes).unwrap(); + assert_eq!(consumed, bytes.len()); + assert_eq!(decoded, mv); + } + + #[test] + fn monitored_item_value_array_round_trip() { + let arr = vec![ + MonitoredItemValue { + item: ItemIdentity::absolute_by_name("Tag.A"), + value: RuntimeValue { + timestamp_binary: 1, + timestamp_specified: true, + value: AsbVariant::from_i32(1), + status: AsbStatus::default(), + }, + user_data: AsbVariant::empty(), + }, + MonitoredItemValue { + item: ItemIdentity::absolute_by_name("Tag.B"), + value: RuntimeValue { + timestamp_binary: 2, + timestamp_specified: false, + value: AsbVariant::from_string("hello"), + status: AsbStatus { + count: 1, + payload: vec![0xC0], + }, + }, + user_data: AsbVariant::from_bool(true), + }, + ]; + let bytes = encode_monitored_item_value_array(&arr); + let decoded = decode_monitored_item_value_array(&bytes).unwrap(); + assert_eq!(decoded, arr); + } + #[test] fn item_identity_array_count_is_le_int32() { let items = vec![ItemIdentity::default(); 7]; diff --git a/rust/crates/mxaccess-asb/src/lib.rs b/rust/crates/mxaccess-asb/src/lib.rs index cc25b9b..6000dae 100644 --- a/rust/crates/mxaccess-asb/src/lib.rs +++ b/rust/crates/mxaccess-asb/src/lib.rs @@ -17,18 +17,24 @@ pub mod operations; pub use client::{AsbClient, ClientError, PreambleMode}; pub use contracts::{ - ItemIdentity, ItemIdentityType, ItemReferenceType, ItemStatus, decode_item_identity_array, - decode_item_status_array, encode_item_identity_array, encode_item_status_array, + ItemIdentity, ItemIdentityType, ItemReferenceType, ItemStatus, MonitoredItemValue, + decode_item_identity_array, decode_item_status_array, decode_monitored_item_value_array, + encode_item_identity_array, encode_item_status_array, encode_monitored_item_value_array, }; pub use envelope::{ ConnectionValidator, DecodedEnvelope, EnvelopeError, SoapEnvelope, actions, decode_envelope, encode_envelope, }; pub use operations::{ - AuthenticationDataBytes, ConnectResponse, OperationError, ReadResponse, RegisterItemsResponse, - UnregisterItemsResponse, build_authenticate_me_request_body, build_connect_request_body, - build_disconnect_request_body, build_keep_alive_request_body, build_read_request_body, + AddMonitoredItemsResponse, AuthenticationDataBytes, ConnectResponse, + CreateSubscriptionResponse, DeleteSubscriptionResponse, MinimalMonitoredItem, OperationError, + PublishResponse, ReadResponse, RegisterItemsResponse, UnregisterItemsResponse, + build_add_monitored_items_request_body, build_authenticate_me_request_body, + build_connect_request_body, build_create_subscription_request_body, + build_delete_subscription_request_body, build_disconnect_request_body, + build_keep_alive_request_body, build_publish_request_body, build_read_request_body, build_register_items_request_body, build_unregister_items_request_body, - collect_asbidata_payloads, decode_connect_response, decode_read_response, + collect_asbidata_payloads, decode_add_monitored_items_response, decode_connect_response, + decode_create_subscription_response, decode_publish_response, decode_read_response, decode_register_items_response, decode_unregister_items_response, }; diff --git a/rust/crates/mxaccess-asb/src/operations.rs b/rust/crates/mxaccess-asb/src/operations.rs index 719e365..69f22a6 100644 --- a/rust/crates/mxaccess-asb/src/operations.rs +++ b/rust/crates/mxaccess-asb/src/operations.rs @@ -39,7 +39,8 @@ use mxaccess_asb_nettcp::nbfx::{NbfxName, NbfxText, NbfxToken}; use mxaccess_codec::{CodecError, RuntimeValue}; use crate::contracts::{ - ItemIdentity, ItemStatus, decode_item_status_array, encode_item_identity_array, + ItemIdentity, ItemStatus, MonitoredItemValue, decode_item_status_array, + decode_monitored_item_value_array, encode_item_identity_array, }; /// Build the NBFX token stream for the body of a `RegisterItemsIn` @@ -394,6 +395,313 @@ fn find_inline_text( None } +// ---- Subscription operations (F25 step 8) ------------------------------- + +/// Build the NBFX token stream for a `CreateSubscriptionIn` request +/// body. Mirrors `AsbContracts.cs:215-223`: +/// ```xml +/// +/// {long} +/// {ulong} +/// +/// ``` +pub fn build_create_subscription_request_body( + max_queue_size: i64, + sample_interval: u64, +) -> Vec { + asbidata_request_body( + "CreateSubscriptionRequest", + &[ + BodyField::int64("MaxQueueSize", max_queue_size), + BodyField::uint64("SampleInterval", sample_interval), + ], + ) +} + +/// Build the NBFX token stream for `DeleteSubscriptionIn`. Mirrors +/// `AsbContracts.cs:232-237`: +/// ```xml +/// +/// {long} +/// +/// ``` +pub fn build_delete_subscription_request_body(subscription_id: i64) -> Vec { + asbidata_request_body( + "DeleteSubscriptionRequest", + &[BodyField::int64("SubscriptionId", subscription_id)], + ) +} + +/// Build the NBFX token stream for `PublishIn`. Mirrors +/// `AsbContracts.cs:287-292`: +/// ```xml +/// +/// {long} +/// +/// ``` +pub fn build_publish_request_body(subscription_id: i64) -> Vec { + asbidata_request_body( + "PublishRequest", + &[BodyField::int64("SubscriptionId", subscription_id)], + ) +} + +/// Build the NBFX token stream for `AddMonitoredItemsIn`. Mirrors +/// `AsbContracts.cs:242-254` — the **minimal** form that supplies only +/// the required `Item` + `SampleInterval` per `MonitoredItem`. Optional +/// `Active` / `TimeDeadband` / `ValueDeadband` / `UserData` / `Buffered` +/// fields are NOT emitted (their `*Specified=false` in WCF would +/// suppress them anyway). Wire shape: +/// +/// ```xml +/// +/// {long} +/// +/// +/// {ItemIdentity binary} +/// {ulong} +/// false +/// +/// ... +/// +/// {bool} +/// +/// ``` +/// +/// **Wire-byte caveat**: `MonitoredItem` is a regular WCF DataContract +/// (not `IAsbCustomSerializableType`). The exact element-ordering / +/// xsi:type attribute / namespace prefix layout depends on which +/// serializer WCF picks; this builder emits the most plausible shape +/// and the live-probe iteration will reconcile. F25 follow-up will +/// expand to the full optional-field set once a capture is available. +pub fn build_add_monitored_items_request_body( + subscription_id: i64, + items: &[MinimalMonitoredItem], + require_id: bool, +) -> Vec { + let mut tokens = vec![ + NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("AddMonitoredItemsRequest".to_string()), + }, + NbfxToken::DefaultNamespace { + value: NbfxText::Chars(IOM_NS.to_string()), + }, + // + NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("SubscriptionId".to_string()), + }, + NbfxToken::Text(NbfxText::Int64(subscription_id)), + NbfxToken::EndElement, + // + NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("Items".to_string()), + }, + ]; + for item in items { + // + tokens.push(NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("MonitoredItem".to_string()), + }); + // {ItemIdentity binary} + tokens.push(NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("Item".to_string()), + }); + tokens.push(NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("ASBIData".to_string()), + }); + tokens.push(NbfxToken::Text(NbfxText::Bytes(item.item.encode()))); + tokens.push(NbfxToken::EndElement); // + tokens.push(NbfxToken::EndElement); // + // + tokens.push(NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("SampleInterval".to_string()), + }); + tokens.push(NbfxToken::Text(NbfxText::Int64( + item.sample_interval as i64, + ))); + tokens.push(NbfxToken::EndElement); + // + tokens.push(NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("Buffered".to_string()), + }); + tokens.push(NbfxToken::Text(NbfxText::Bool(item.buffered))); + tokens.push(NbfxToken::EndElement); + tokens.push(NbfxToken::EndElement); // + } + tokens.push(NbfxToken::EndElement); // + // + tokens.push(NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("RequireId".to_string()), + }); + tokens.push(NbfxToken::Text(NbfxText::Bool(require_id))); + tokens.push(NbfxToken::EndElement); + tokens.push(NbfxToken::EndElement); // + tokens +} + +/// Minimal `MonitoredItem` shape covering just `Item`, `SampleInterval`, +/// and `Buffered`. The full .NET `MonitoredItem` (`AsbContracts.cs:936-1030`) +/// also has optional Active, TimeDeadband, ValueDeadband, and UserData +/// fields. Those are deferred to a later F25 iteration once a live +/// capture confirms the wire-byte form. +#[derive(Debug, Clone, PartialEq)] +pub struct MinimalMonitoredItem { + pub item: ItemIdentity, + pub sample_interval: u64, + pub buffered: bool, +} + +impl MinimalMonitoredItem { + pub fn new(item: ItemIdentity, sample_interval: u64) -> Self { + Self { + item, + sample_interval, + buffered: false, + } + } +} + +/// Decoded `CreateSubscriptionResponse`. Single field per +/// `AsbContracts.cs:225-230`. +#[derive(Debug, Clone, PartialEq)] +pub struct CreateSubscriptionResponse { + pub subscription_id: i64, +} + +/// Decode a `CreateSubscriptionResponse` SOAP body. Looks for an inline +/// `` text element under the wrapper. +pub fn decode_create_subscription_response( + body_tokens: &[NbfxToken], + dynamic: &mxaccess_asb_nettcp::nbfx::DynamicDictionary, +) -> Result { + let id = find_inline_int64(body_tokens, "SubscriptionId", dynamic).ok_or( + OperationError::MissingField { + field: "SubscriptionId", + }, + )?; + Ok(CreateSubscriptionResponse { + subscription_id: id, + }) +} + +/// Decoded `AddMonitoredItemsResponse`. `ItemCapabilities` is regular +/// WCF XML (not the binary fast-path) — currently surfaces as a presence +/// flag, mirroring `RegisterItemsResponse`. +#[derive(Debug, Clone, PartialEq)] +pub struct AddMonitoredItemsResponse { + pub status: Vec, + pub item_capabilities_present: bool, +} + +pub fn decode_add_monitored_items_response( + body_tokens: &[NbfxToken], +) -> Result { + let payloads = collect_asbidata_payloads(body_tokens, "Status"); + let status_payload = payloads + .into_iter() + .next() + .ok_or(OperationError::MissingField { field: "Status" })?; + let status = decode_item_status_array(&status_payload)?; + let item_capabilities_present = find_element_named(body_tokens, "ItemCapabilities").is_some(); + Ok(AddMonitoredItemsResponse { + status, + item_capabilities_present, + }) +} + +/// Decoded `PublishResponse`. Mirrors `AsbContracts.cs:294-304`: +/// `Status` (per-item operation status) + `Values` (one +/// `MonitoredItemValue` per delivered sample). +#[derive(Debug, Clone, PartialEq)] +pub struct PublishResponse { + pub status: Vec, + pub values: Vec, +} + +pub fn decode_publish_response( + body_tokens: &[NbfxToken], +) -> Result { + let status_payload = collect_asbidata_payloads(body_tokens, "Status") + .into_iter() + .next() + .ok_or(OperationError::MissingField { field: "Status" })?; + let status = decode_item_status_array(&status_payload)?; + + let values = match collect_asbidata_payloads(body_tokens, "Values") + .into_iter() + .next() + { + Some(payload) => decode_monitored_item_value_array(&payload)?, + None => Vec::new(), + }; + Ok(PublishResponse { status, values }) +} + +/// Decoded `DeleteSubscriptionResponse`. Empty body per +/// `AsbContracts.cs:239-240` (`ConnectedResponse;` — no fields). +/// Always returns `Ok(())` regardless of body content; the decoder +/// exists for symmetry with the other operations. +#[derive(Debug, Clone, PartialEq, Eq, Default)] +pub struct DeleteSubscriptionResponse; + +/// Walk `tokens` and find an inline int64 element-text under the named +/// element. Used for `` and similar primitive response +/// fields. Permissive — skips attributes/xmlns decls between Element +/// and Text. +fn find_inline_int64( + tokens: &[NbfxToken], + name: &str, + dynamic: &mxaccess_asb_nettcp::nbfx::DynamicDictionary, +) -> Option { + let mut idx = 0; + while let Some(tok) = tokens.get(idx) { + if let NbfxToken::Element { + name: NbfxName::Inline(local), + .. + } = tok + { + if local == name { + let mut inner = idx + 1; + while matches!( + tokens.get(inner), + Some(NbfxToken::Attribute { .. }) + | Some(NbfxToken::DefaultNamespace { .. }) + | Some(NbfxToken::NamespaceDeclaration { .. }) + ) { + inner += 1; + } + match tokens.get(inner) { + Some(NbfxToken::Text(NbfxText::Int64(v))) => return Some(*v), + Some(NbfxToken::Text(NbfxText::Int32(v))) => return Some(*v as i64), + Some(NbfxToken::Text(NbfxText::Zero)) => return Some(0), + Some(NbfxToken::Text(NbfxText::One)) => return Some(1), + Some(NbfxToken::Text(text)) => { + // Fall back to text resolution + parse. + if let Some(s) = text.resolve(dynamic) { + if let Ok(v) = s.parse::() { + return Some(v); + } + } + return None; + } + _ => return None, + } + } + } + idx += 1; + } + None +} + /// Build the NBFX token stream for a `KeepAliveIn` request body. The /// `KeepAlive` contract has no body fields beyond the inherited /// `ConnectionValidator` header, so the body is just the empty wrapper @@ -624,9 +932,14 @@ pub fn build_unregister_items_request_body(items: &[ItemIdentity]) -> Vec` with base64-binary content (NBFX /// represents that as `Bytes` text records). AsbiDataElement { @@ -640,6 +953,23 @@ impl BodyField { Self::BoolElement { name, value } } + fn int64(name: &'static str, value: i64) -> Self { + Self::Int64Element { name, value } + } + + /// `u64` is wider than `Int64Text`. WCF binary encodes large `ulong`s + /// as `UInt64Text` (record `0xB2`) which our F21 codec doesn't yet + /// emit; for the current proven value range (sample intervals, + /// queue sizes — all well under `i64::MAX`) we cast to `i64`. If a + /// future capture exposes values > `i64::MAX` we'll need to add + /// `UInt64` to `NbfxText`. + fn uint64(name: &'static str, value: u64) -> Self { + Self::Int64Element { + name, + value: value as i64, + } + } + fn asbidata(name: &'static str, payload: Vec) -> Self { Self::AsbiDataElement { name, payload } } @@ -667,6 +997,14 @@ fn asbidata_request_body(outer: &str, fields: &[BodyField]) -> Vec { tokens.push(NbfxToken::Text(NbfxText::Bool(*value))); tokens.push(NbfxToken::EndElement); } + BodyField::Int64Element { name, value } => { + tokens.push(NbfxToken::Element { + prefix: None, + name: NbfxName::Inline((*name).to_string()), + }); + tokens.push(NbfxToken::Text(NbfxText::Int64(*value))); + tokens.push(NbfxToken::EndElement); + } BodyField::AsbiDataElement { name, payload } => { tokens.push(NbfxToken::Element { prefix: None, @@ -1263,6 +1601,207 @@ mod tests { assert!(decoded.values.is_empty()); } + #[test] + fn create_subscription_body_carries_max_queue_and_sample_interval() { + let body = build_create_subscription_request_body(0, 1000); + assert!(matches!( + &body[0], + NbfxToken::Element { name: NbfxName::Inline(s), .. } if s == "CreateSubscriptionRequest" + )); + let int_payloads: Vec = body + .iter() + .filter_map(|tok| { + if let NbfxToken::Text(NbfxText::Int64(v)) = tok { + Some(*v) + } else { + None + } + }) + .collect(); + assert_eq!(int_payloads, vec![0, 1000]); + } + + #[test] + fn create_subscription_response_decodes_int64_subscription_id() { + use mxaccess_asb_nettcp::nbfx::DynamicDictionary; + let body = vec![ + NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("CreateSubscriptionResponse".to_string()), + }, + NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("SubscriptionId".to_string()), + }, + NbfxToken::Text(NbfxText::Int64(42)), + NbfxToken::EndElement, + NbfxToken::EndElement, + ]; + let dict = DynamicDictionary::new(); + let decoded = decode_create_subscription_response(&body, &dict).unwrap(); + assert_eq!(decoded.subscription_id, 42); + } + + #[test] + fn create_subscription_response_decodes_chars_subscription_id() { + // WCF can also emit numerics as text-Chars rather than Int64Text. + // Verify the decoder's parse-fallback path handles that. + use mxaccess_asb_nettcp::nbfx::DynamicDictionary; + let body = vec![ + NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("CreateSubscriptionResponse".to_string()), + }, + NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("SubscriptionId".to_string()), + }, + NbfxToken::Text(NbfxText::Chars("12345".to_string())), + NbfxToken::EndElement, + NbfxToken::EndElement, + ]; + let dict = DynamicDictionary::new(); + let decoded = decode_create_subscription_response(&body, &dict).unwrap(); + assert_eq!(decoded.subscription_id, 12345); + } + + #[test] + fn create_subscription_response_missing_id_fails() { + use mxaccess_asb_nettcp::nbfx::DynamicDictionary; + let body = vec![ + NbfxToken::Element { + prefix: None, + name: NbfxName::Inline("CreateSubscriptionResponse".to_string()), + }, + NbfxToken::EndElement, + ]; + let dict = DynamicDictionary::new(); + let err = decode_create_subscription_response(&body, &dict).unwrap_err(); + assert!(matches!( + err, + OperationError::MissingField { + field: "SubscriptionId" + } + )); + } + + #[test] + fn add_monitored_items_body_includes_subscription_id_and_items() { + let item = MinimalMonitoredItem::new(ItemIdentity::absolute_by_name("Tag.A"), 1000); + let body = build_add_monitored_items_request_body(7, &[item], true); + assert!(matches!( + &body[0], + NbfxToken::Element { name: NbfxName::Inline(s), .. } if s == "AddMonitoredItemsRequest" + )); + // Find SubscriptionId text + let mut saw_id_7 = false; + let mut saw_monitored_item = false; + for tok in &body { + if let NbfxToken::Text(NbfxText::Int64(7)) = tok { + saw_id_7 = true; + } + if let NbfxToken::Element { + name: NbfxName::Inline(local), + .. + } = tok + { + if local == "MonitoredItem" { + saw_monitored_item = true; + } + } + } + assert!(saw_id_7); + assert!(saw_monitored_item); + } + + #[test] + fn delete_subscription_body_carries_subscription_id() { + let body = build_delete_subscription_request_body(99); + let int_payloads: Vec = body + .iter() + .filter_map(|tok| { + if let NbfxToken::Text(NbfxText::Int64(v)) = tok { + Some(*v) + } else { + None + } + }) + .collect(); + assert_eq!(int_payloads, vec![99]); + } + + #[test] + fn publish_body_carries_subscription_id() { + let body = build_publish_request_body(123); + assert!(matches!( + &body[0], + NbfxToken::Element { name: NbfxName::Inline(s), .. } if s == "PublishRequest" + )); + let int_payloads: Vec = body + .iter() + .filter_map(|tok| { + if let NbfxToken::Text(NbfxText::Int64(v)) = tok { + Some(*v) + } else { + None + } + }) + .collect(); + assert_eq!(int_payloads, vec![123]); + } + + #[test] + fn publish_response_decodes_status_and_values() { + use mxaccess_codec::{AsbStatus, AsbVariant, RuntimeValue}; + let status = vec![ItemStatus { + item: ItemIdentity::absolute_by_name("Tag.A"), + status: AsbStatus::default(), + error_code: 0, + error_code_specified: true, + }]; + let values = vec![MonitoredItemValue { + item: ItemIdentity::absolute_by_name("Tag.A"), + value: RuntimeValue { + timestamp_binary: 555, + timestamp_specified: true, + value: AsbVariant::from_i32(7), + status: AsbStatus::default(), + }, + user_data: AsbVariant::empty(), + }]; + let status_payload = crate::contracts::encode_item_status_array(&status); + let values_payload = crate::contracts::encode_monitored_item_value_array(&values); + let body = asbidata_request_body( + "PublishResponse", + &[ + BodyField::asbidata("Status", status_payload), + BodyField::asbidata("Values", values_payload), + ], + ); + let decoded = decode_publish_response(&body).unwrap(); + assert_eq!(decoded.status, status); + assert_eq!(decoded.values, values); + } + + #[test] + fn add_monitored_items_response_round_trip() { + use mxaccess_codec::AsbStatus; + let status = vec![ItemStatus { + item: ItemIdentity::absolute_by_name("Tag.M"), + status: AsbStatus::default(), + error_code: 0, + error_code_specified: true, + }]; + let payload = crate::contracts::encode_item_status_array(&status); + let body = asbidata_request_body( + "AddMonitoredItemsResponse", + &[BodyField::asbidata("Status", payload)], + ); + let decoded = decode_add_monitored_items_response(&body).unwrap(); + assert_eq!(decoded.status, status); + assert!(!decoded.item_capabilities_present); + } + #[test] fn empty_items_array_still_produces_valid_envelope() { let body = build_register_items_request_body(&[], false, false);