[M5] mxaccess-asb: F25 step 8 — subscription operations

CreateSubscription / AddMonitoredItems / Publish / DeleteSubscription.
Completes the IASBIDataV2 read-and-subscribe path; remaining ops
(Write/PublishWriteComplete/DeleteMonitoredItems) are mechanical
extensions of the same pattern.

Contracts:
* `MonitoredItemValue` codec (IAsbCustomSerializableType binary
  fast-path: ItemIdentity + RuntimeValue + AsbVariant per
  `AsbContracts.cs:1064-1068`) with array codec (4-byte int32
  count + per-element body, mirrors `WriteArrayToStream` at
  `cs:1095-1103`).

Request builders:
* `build_create_subscription_request_body(max_queue_size,
  sample_interval)` — primitive fields per `cs:215-223`.
* `build_delete_subscription_request_body(subscription_id)` —
  primitive field per `cs:232-237`.
* `build_publish_request_body(subscription_id)` — primitive field
  per `cs:287-292`.
* `build_add_monitored_items_request_body(subscription_id, items,
  require_id)` — minimal MonitoredItem shape (Item +
  SampleInterval + Buffered). Full optional-field set
  (Active/TimeDeadband/ValueDeadband/UserData) deferred to a later
  iteration once a live capture confirms the WCF DataContract
  XML wire form.

Response decoders:
* `decode_create_subscription_response` — single int64
  SubscriptionId field. Decoder accepts Int64Text, Int32Text,
  Zero/One, or numeric-string Chars (covers all WCF binary
  numeric encodings).
* `decode_add_monitored_items_response` — Status array +
  ItemCapabilities-presence flag (mirrors RegisterItemsResponse).
* `decode_publish_response` — Status array + Values
  (MonitoredItemValue) array.

`BodyField::Int64Element` variant added for the primitive
SubscriptionId / MaxQueueSize / SampleInterval fields. `uint64`
helper casts to i64 (covers proven value range; if ulong > i64::MAX
ever appears we'll add UInt64Text to F21's NbfxText enum).

Client wrappers (4 new methods on AsbClient):
* `create_subscription(max_queue_size, sample_interval)`
* `add_monitored_items(subscription_id, items, require_id)`
* `publish(subscription_id)`
* `delete_subscription(subscription_id)`

11 new tests cover:
* MonitoredItemValue round-trip + array round-trip.
* CreateSubscription request body shape (Int64 payloads).
* CreateSubscription response decoder via Int64Text.
* CreateSubscription response decoder via Chars text fallback.
* CreateSubscription response missing-field error.
* AddMonitoredItems body carries SubscriptionId + MonitoredItem
  elements.
* AddMonitoredItems response Status round-trip.
* DeleteSubscription body carries SubscriptionId.
* Publish request body shape.
* Publish response Status + Values round-trip.

Workspace: 691 tests pass (was 680, +11). The asb-subscribe example
can now do create_subscription → add_monitored_items → publish-loop
→ delete_subscription once wire-byte reconciliation against a live
capture confirms the MonitoredItem XML shape.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-05 12:57:59 -04:00
parent c6570dcd06
commit b543eb1f84
5 changed files with 774 additions and 13 deletions
+146 -1
View File
@@ -21,7 +21,7 @@
//! round-trip — so the per-type cost is small once the
//! [`ItemIdentity`] reference establishes it.
use mxaccess_codec::{AsbStatus, CodecError};
use mxaccess_codec::{AsbStatus, AsbVariant, CodecError, RuntimeValue};
/// `ItemIdentity` per `AsbContracts.cs:533-633`. Wire layout:
///
@@ -219,6 +219,101 @@ pub fn encode_item_status_array(items: &[ItemStatus]) -> Vec<u8> {
out
}
/// `MonitoredItemValue` per `AsbContracts.cs:1032-1104`.
/// `IAsbCustomSerializableType` binary fast-path; payload order from
/// `WriteToStream` at `cs:1064-1068`:
///
/// 1. `Item` — [`ItemIdentity`] binary.
/// 2. `Value` — [`RuntimeValue`] binary (timestamp + variant + status).
/// 3. `UserData` — [`AsbVariant`] binary.
///
/// `MonitoredItemValue` arrives in `PublishResponse` as part of the
/// `Values` array — one entry per delivered sample.
#[derive(Debug, Clone, PartialEq)]
pub struct MonitoredItemValue {
pub item: ItemIdentity,
pub value: RuntimeValue,
pub user_data: AsbVariant,
}
impl MonitoredItemValue {
pub fn encode_into(&self, out: &mut Vec<u8>) {
self.item.encode_into(out);
self.value.encode_into(out);
self.user_data.encode_into(out);
}
pub fn encode(&self) -> Vec<u8> {
let mut out = Vec::new();
self.encode_into(&mut out);
out
}
pub fn decode(input: &[u8]) -> Result<(Self, usize), CodecError> {
let (item, item_consumed) = ItemIdentity::decode(input)?;
let mut cursor = item_consumed;
let value_tail = input.get(cursor..).ok_or(CodecError::ShortRead {
expected: 1,
actual: 0,
})?;
let (value, value_consumed) = RuntimeValue::decode(value_tail)?;
cursor += value_consumed;
let user_data_tail = input.get(cursor..).ok_or(CodecError::ShortRead {
expected: 1,
actual: 0,
})?;
let (user_data, user_data_consumed) = AsbVariant::decode(user_data_tail)?;
cursor += user_data_consumed;
Ok((
Self {
item,
value,
user_data,
},
cursor,
))
}
}
/// Encode a `MonitoredItemValue[]` array per `WriteArrayToStream`
/// (`cs:1095-1103`) — 4-byte int32 count + per-element body.
pub fn encode_monitored_item_value_array(values: &[MonitoredItemValue]) -> Vec<u8> {
let mut out = Vec::new();
let count = i32::try_from(values.len()).unwrap_or(i32::MAX);
out.extend_from_slice(&count.to_le_bytes());
for v in values {
v.encode_into(&mut out);
}
out
}
/// Decode a `MonitoredItemValue[]` array. Mirrors
/// `MonitoredItemValue.InitializeArrayFromStream` (`cs:1084-1093`).
pub fn decode_monitored_item_value_array(
input: &[u8],
) -> Result<Vec<MonitoredItemValue>, CodecError> {
let mut cursor = 0usize;
let count = read_i32_le(input, &mut cursor)?;
if count < 0 {
return Err(CodecError::Decode {
offset: 0,
reason: "negative monitored-item-value array count",
buffer_len: input.len(),
});
}
let mut out = Vec::with_capacity(count as usize);
for _ in 0..count {
let tail = input.get(cursor..).ok_or(CodecError::ShortRead {
expected: 1,
actual: 0,
})?;
let (v, consumed) = MonitoredItemValue::decode(tail)?;
cursor += consumed;
out.push(v);
}
Ok(out)
}
/// Encode an array of `IAsbCustomSerializableType` items per
/// `AsbDataCustomSerializer.WriteObjectContent` array branch
/// (`AsbContracts.cs:1583-1591` — calls `WriteArrayToStream` which
@@ -494,6 +589,56 @@ mod tests {
assert_eq!(decoded, arr);
}
#[test]
fn monitored_item_value_round_trip() {
let mv = MonitoredItemValue {
item: ItemIdentity::absolute_by_name("Tag.X"),
value: RuntimeValue {
timestamp_binary: 0x0123_4567,
timestamp_specified: true,
value: AsbVariant::from_i32(100),
status: AsbStatus::default(),
},
user_data: AsbVariant::empty(),
};
let bytes = mv.encode();
let (decoded, consumed) = MonitoredItemValue::decode(&bytes).unwrap();
assert_eq!(consumed, bytes.len());
assert_eq!(decoded, mv);
}
#[test]
fn monitored_item_value_array_round_trip() {
let arr = vec![
MonitoredItemValue {
item: ItemIdentity::absolute_by_name("Tag.A"),
value: RuntimeValue {
timestamp_binary: 1,
timestamp_specified: true,
value: AsbVariant::from_i32(1),
status: AsbStatus::default(),
},
user_data: AsbVariant::empty(),
},
MonitoredItemValue {
item: ItemIdentity::absolute_by_name("Tag.B"),
value: RuntimeValue {
timestamp_binary: 2,
timestamp_specified: false,
value: AsbVariant::from_string("hello"),
status: AsbStatus {
count: 1,
payload: vec![0xC0],
},
},
user_data: AsbVariant::from_bool(true),
},
];
let bytes = encode_monitored_item_value_array(&arr);
let decoded = decode_monitored_item_value_array(&bytes).unwrap();
assert_eq!(decoded, arr);
}
#[test]
fn item_identity_array_count_is_le_int32() {
let items = vec![ItemIdentity::default(); 7];