[F34 partial] mxaccess-asb: fix collect_asbidata_payloads + add Active flag
rust / build / test / clippy / fmt (push) Has been cancelled

Investigation via examples/asb-relay.rs middleman captured the full
S→C bytes of a working PublishResponse from the .NET probe against
MxDataProvider. Decoder fix verified by regression test against the
captured fixture; one further wire-format gap surfaced and is filed.

Closed in this commit:

1. collect_asbidata_payloads filtered out empty <ASBIData/> elements
   so positional payload[N] indexing collapsed when Status was
   empty-but-present. The wire form for PublishResponse is:
     <Status><ASBIData/></Status>          ← empty placeholder
     <Values><ASBIData>{bytes}</ASBIData></Values>
   Our decoder lost the positional info and read Values as Status,
   then panicked on the malformed parse. Fix: always push every
   <ASBIData> element (empty or not) so payloads[0]=Status and
   payloads[1]=Values stay aligned. New regression test
   tests/publish_capture.rs runs the full decode chain over the
   captured wire bytes (305-byte frame at
   tests/fixtures/publish-response-with-value.bin) and asserts
   values.len() == 1.

2. MinimalMonitoredItem.active: Option<bool> + new with_active()
   constructor. The .NET reference's MxAsbDataClient.AddMonitoredItems
   defaults to active: true (cs:441). Without <Active>true</Active>
   on the wire, MxDataProvider treats the subscription as inactive
   and Publish polls return empty Values. Both binary build and
   canonical XML emitters now conditionally emit <Active> when
   active.is_some(). Shared push_monitored_item_body helper
   eliminates the duplicate MonitoredItem encoder between
   AddMonitoredItems and DeleteMonitoredItems builders.

3. SampleInterval unit: clarified as **milliseconds** in
   MinimalMonitoredItem.sample_interval doc + the example
   (sample_interval_ticks → sample_interval_ms = 1000). Matches the
   .NET reference's `ulong sampleInterval = 1000` default.

Open: F34's deeper finding — `MonitoredItem`'s wire schema is
DataContract field-suffix names (`activeField`, `bufferedField`,
`itemField`, `sampleIntervalField`, etc., per the per-session NBFX
dictionary the .NET probe declares), NOT XmlSerializer property
names (`Active`, `Buffered`, `Item`, `SampleInterval`). Our binary
NBFX builder still uses the property names, so MxDataProvider
silently fails to register monitored items — successField=true with
a 0-length Status array. The fix needs a complete rebuild of
build_add_monitored_items_request_body and
build_delete_monitored_items_request_body to use the field-suffix
names plus emit the *Specified siblings (activeFieldSpecified,
idFieldSpecified, etc.) as their own elements. The HMAC canonical
XML side is unaffected (XmlSerializer naming is correct there;
verified byte-equal to .NET via the F28 fixtures). Detailed in
design/followups.md F34's "Open" section.

Live verification of the F34-partial bonus context:
  - Read still returns 99 end-to-end via canonical XML signing.
  - AddMonitoredItems still returns Status[0] = 0 items
    (server doesn't recognize our DataContract-misnamed payload).
  - Publish still returns 0 values (the F34-open consequence).
  - All other 13 canonical-XML signed ops succeed at the request
    level (no SOAP faults, no HMAC rejections).

Workspace: mxaccess-asb 95 → 96 (+1 capture-driven decoder test);
default-feature clippy clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-06 02:49:11 -04:00
parent 0771664092
commit fb40e4c20b
6 changed files with 212 additions and 81 deletions
+113 -77
View File
@@ -519,36 +519,7 @@ pub fn build_delete_monitored_items_request_body(
},
];
for item in items {
tokens.push(NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("MonitoredItem".to_string()),
});
tokens.push(NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("Item".to_string()),
});
tokens.push(NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("ASBIData".to_string()),
});
tokens.push(NbfxToken::Text(NbfxText::Bytes(item.item.encode())));
tokens.push(NbfxToken::EndElement); // </ASBIData>
tokens.push(NbfxToken::EndElement); // </Item>
tokens.push(NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("SampleInterval".to_string()),
});
tokens.push(NbfxToken::Text(NbfxText::Int64(
item.sample_interval as i64,
)));
tokens.push(NbfxToken::EndElement);
tokens.push(NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("Buffered".to_string()),
});
tokens.push(NbfxToken::Text(NbfxText::Bool(item.buffered)));
tokens.push(NbfxToken::EndElement);
tokens.push(NbfxToken::EndElement); // </MonitoredItem>
push_monitored_item_body(&mut tokens, item);
}
tokens.push(NbfxToken::EndElement); // </Items>
tokens.push(NbfxToken::EndElement); // </DeleteMonitoredItemsRequest>
@@ -803,40 +774,7 @@ pub fn build_add_monitored_items_request_body(
},
];
for item in items {
// <MonitoredItem>
tokens.push(NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("MonitoredItem".to_string()),
});
// <Item><ASBIData>{ItemIdentity binary}</ASBIData></Item>
tokens.push(NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("Item".to_string()),
});
tokens.push(NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("ASBIData".to_string()),
});
tokens.push(NbfxToken::Text(NbfxText::Bytes(item.item.encode())));
tokens.push(NbfxToken::EndElement); // </ASBIData>
tokens.push(NbfxToken::EndElement); // </Item>
// <SampleInterval>
tokens.push(NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("SampleInterval".to_string()),
});
tokens.push(NbfxToken::Text(NbfxText::Int64(
item.sample_interval as i64,
)));
tokens.push(NbfxToken::EndElement);
// <Buffered>
tokens.push(NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("Buffered".to_string()),
});
tokens.push(NbfxToken::Text(NbfxText::Bool(item.buffered)));
tokens.push(NbfxToken::EndElement);
tokens.push(NbfxToken::EndElement); // </MonitoredItem>
push_monitored_item_body(&mut tokens, item);
}
tokens.push(NbfxToken::EndElement); // </Items>
// <RequireId>
@@ -850,11 +788,62 @@ pub fn build_add_monitored_items_request_body(
tokens
}
/// Minimal `MonitoredItem` shape covering just `Item`, `SampleInterval`,
/// and `Buffered`. The full .NET `MonitoredItem` (`AsbContracts.cs:936-1030`)
/// also has optional Active, TimeDeadband, ValueDeadband, and UserData
/// fields. Those are deferred to a later F25 iteration once a live
/// capture confirms the wire-byte form.
/// Emit a single `<MonitoredItem>...</MonitoredItem>` NBFX subtree.
/// Shared between AddMonitoredItems and DeleteMonitoredItems request
/// builders. Field order matches the .NET `MonitoredItem` declaration:
/// Item / SampleInterval / Active (when Specified) / Buffered.
fn push_monitored_item_body(tokens: &mut Vec<NbfxToken>, item: &MinimalMonitoredItem) {
tokens.push(NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("MonitoredItem".to_string()),
});
// <Item><ASBIData>{ItemIdentity binary}</ASBIData></Item>
tokens.push(NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("Item".to_string()),
});
tokens.push(NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("ASBIData".to_string()),
});
tokens.push(NbfxToken::Text(NbfxText::Bytes(item.item.encode())));
tokens.push(NbfxToken::EndElement); // </ASBIData>
tokens.push(NbfxToken::EndElement); // </Item>
// <SampleInterval>
tokens.push(NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("SampleInterval".to_string()),
});
tokens.push(NbfxToken::Text(NbfxText::Int64(
item.sample_interval as i64,
)));
tokens.push(NbfxToken::EndElement);
// <Active> — emitted only when ActiveSpecified=true
// (`MonitoredItem.Active` setter at `AsbContracts.cs:982-987`).
// Required to make MxDataProvider actually deliver values; F34.
if let Some(active) = item.active {
tokens.push(NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("Active".to_string()),
});
tokens.push(NbfxToken::Text(NbfxText::Bool(active)));
tokens.push(NbfxToken::EndElement);
}
// <Buffered>
tokens.push(NbfxToken::Element {
prefix: None,
name: NbfxName::Inline("Buffered".to_string()),
});
tokens.push(NbfxToken::Text(NbfxText::Bool(item.buffered)));
tokens.push(NbfxToken::EndElement);
tokens.push(NbfxToken::EndElement); // </MonitoredItem>
}
/// Minimal `MonitoredItem` shape covering `Item`, `SampleInterval`,
/// `Buffered`, and the `*Specified`-gated `Active` field. The full
/// .NET `MonitoredItem` (`AsbContracts.cs:936-1030`) also has
/// `TimeDeadband`, `ValueDeadband`, and `UserData` — deferred until a
/// live capture confirms each one's wire-byte form.
///
/// **`sample_interval` unit is milliseconds**, NOT 100-ns ticks. The
/// .NET reference's `MxAsbDataClient.AddMonitoredItems` defaults to
@@ -864,20 +853,61 @@ pub fn build_add_monitored_items_request_body(
/// schedule the next sample ~2.8 hours out — `Publish` polls then
/// always come back empty until the misinterpreted timer expires.
/// Verified live 2026-05-06.
///
/// **`active` is the `*Specified` knob that decides whether
/// `<Active>` appears on the wire**. `None` → not emitted (server
/// defaults to inactive — `Publish` polls return zero values).
/// `Some(true)` → emitted as `<Active>true</Active>`; the
/// MxDataProvider then actually delivers values from the
/// subscription. The .NET reference's `AddMonitoredItems` defaults
/// to `active: true` (`MxAsbDataClient.cs:441`); the
/// `MonitoredItem.Active` setter at `AsbContracts.cs:982-987`
/// auto-flips `ActiveSpecified=true` so the wire includes the
/// element. F34: this asymmetry is what made our subscribe path
/// see zero values where .NET sees real ones — verified live
/// 2026-05-06.
#[derive(Debug, Clone, PartialEq)]
pub struct MinimalMonitoredItem {
pub item: ItemIdentity,
/// Sample interval in **milliseconds** (matches the .NET wire form).
pub sample_interval: u64,
pub buffered: bool,
/// `Some(b)` emits `<Active>{b}</Active>` on the wire (the
/// `*Specified` pattern). `None` omits the element entirely —
/// MxDataProvider then treats the item as inactive and delivers
/// no values. Use `Some(true)` to actually receive samples.
pub active: Option<bool>,
}
impl MinimalMonitoredItem {
/// Build a default `MinimalMonitoredItem`: item + interval, no
/// Active flag, no Buffered. Matches the .NET `new MonitoredItem
/// { Item = ..., SampleInterval = ... }` shape used by the
/// canonical-XML fixtures.
///
/// **For live subscriptions that should actually deliver values,
/// prefer [`Self::with_active`].** Without `Active=true` on the
/// wire, the server defaults to inactive and `Publish` returns
/// empty payloads.
pub fn new(item: ItemIdentity, sample_interval: u64) -> Self {
Self {
item,
sample_interval,
buffered: false,
active: None,
}
}
/// Build a `MinimalMonitoredItem` with `Active=true`. This is
/// what the .NET reference's `AddMonitoredItems` emits by default
/// (`MxAsbDataClient.cs:441`) and what makes MxDataProvider
/// actually deliver values from the subscription.
pub fn with_active(item: ItemIdentity, sample_interval: u64, active: bool) -> Self {
Self {
item,
sample_interval,
buffered: false,
active: Some(active),
}
}
}
@@ -1368,18 +1398,24 @@ pub fn collect_asbidata_payloads(tokens: &[NbfxToken]) -> Vec<Vec<u8>> {
// token, but a `Bytes`-then-`EndElement` (from the
// `WithEndElement` variant) leaves a sequence of
// `Bytes` tokens we walk here.
let mut combined: Option<Vec<u8>> = None;
let mut buf = Vec::new();
while let Some(NbfxToken::Text(NbfxText::Bytes(payload))) = tokens.get(inner)
{
match combined.as_mut() {
Some(buf) => buf.extend_from_slice(payload),
None => combined = Some(payload.clone()),
}
buf.extend_from_slice(payload);
inner += 1;
}
if let Some(buf) = combined {
out.push(buf);
}
// F34: ALWAYS push, even when buf is empty. The wire
// uses `<ASBIData></ASBIData>` (empty) as positional
// placeholders — e.g. `PublishResponse` emits an
// empty `<ASBIData/>` for `Status` when the per-item
// status array is empty, followed by a populated
// `<ASBIData>{values}</ASBIData>` for `Values`. If we
// skip the empty one, the Values payload shifts down
// to index 0 where the decoder reads it as Status
// and corrupts the parse. Captured live 2026-05-06
// via `examples/asb-relay.rs` middleman; fixture at
// `tests/fixtures/publish-response-with-value.bin`.
out.push(buf);
}
}
idx += 1;
@@ -507,6 +507,13 @@ fn emit_monitored_item(s: &mut String, item: &MinimalMonitoredItem) {
// xmlns redeclaration since they're already in iom:2.
emit_inline_item_identity(s, " ", "Item", &item.item);
emit_iom_text(s, " ", "SampleInterval", &item.sample_interval.to_string());
// <Active> is *Specified-gated; emit only when the consumer
// opted in (None → not on the wire). MxDataProvider's
// Publish path requires Active=true to actually deliver values
// — F34, verified live 2026-05-06.
if let Some(active) = item.active {
emit_iom_text(s, " ", "Active", if active { "true" } else { "false" });
}
// ValueDeadband + UserData: default-Variant shape (type=0,
// length=0, payload nil).
emit_iom_default_variant(s, " ", "ValueDeadband");
@@ -0,0 +1,65 @@
//! F34 — wire-byte trace of a captured `PublishResponse`.
//!
//! `tests/fixtures/publish-response-with-value.bin` is the verbatim
//! S→C bytes the .NET probe (`MxAsbClient.Probe --subscribe`) saw on
//! its first `Publish` poll against the local AVEVA install on
//! 2026-05-06, captured via `examples/asb-relay.rs` middleman with
//! `--via=net.tcp://127.0.0.1:8088/...`. The .NET probe extracted
//! `preview:99` from this exchange — the value bytes
//! `[63 00 00 00]` (= 99 in LE i32) are visible at file offset 0x110.
//!
//! Test goal: dump `decode_envelope` + `decode_publish_response`
//! output so we can see exactly where our value-extraction diverges
//! from .NET's (F34 hypotheses).
//!
//! Frame layout: 3-byte NMF SizedEnvelope header (`06 ae 02`,
//! varint length = 302) + 302-byte SOAP envelope.
#![allow(
clippy::unwrap_used,
clippy::expect_used,
clippy::indexing_slicing,
clippy::panic
)]
use mxaccess_asb::{decode_envelope, decode_publish_response};
use mxaccess_asb_nettcp::nbfx::DynamicDictionary;
#[test]
fn publish_response_capture_decoder_trace() {
let raw = std::fs::read(
std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
.join("tests/fixtures/publish-response-with-value.bin"),
)
.expect("read fixture");
assert_eq!(raw.len(), 305, "frame length sanity check");
// Strip 3-byte NMF SizedEnvelope header.
let envelope = &raw[3..];
assert_eq!(envelope.len(), 302);
let mut dict = DynamicDictionary::new();
let decoded = decode_envelope(envelope, &mut dict).expect("decode_envelope succeeds");
eprintln!("=== body tokens ({} total) ===", decoded.body_tokens.len());
for (i, tok) in decoded.body_tokens.iter().enumerate() {
eprintln!(" body[{i}]={tok:?}");
}
let response = decode_publish_response(&decoded.body_tokens)
.expect("decode_publish_response succeeds");
eprintln!("=== decoded PublishResponse ===");
eprintln!(" status_count: {}", response.status.len());
eprintln!(" values_count: {}", response.values.len());
eprintln!(" result_code: {:?}", response.result_code);
eprintln!(" success: {:?}", response.success);
// The .NET probe extracted 1 value with preview:99 from the same
// wire bytes. If our decoder reports 0 values, the test fails and
// the eprintln body-token dump above shows where the gap is.
assert_eq!(
response.values.len(),
1,
".NET sees 1 value (preview:99) from the same bytes; our decoder reads {}",
response.values.len(),
);
}
@@ -155,9 +155,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
sub_response.subscription_id, sub_response.result_code, sub_response.success
);
let monitored = vec![MinimalMonitoredItem::new(
// F34: MUST set Active=true (`MonitoredItem.ActiveSpecified=true`
// on the wire). Without it MxDataProvider treats the item as
// inactive and Publish polls always return zero values.
let monitored = vec![MinimalMonitoredItem::with_active(
ItemIdentity::absolute_by_name(&env.tag),
sample_interval_ms,
true,
)];
eprintln!("adding monitored items [canonical XML AddMonitoredItems]");