af15fe7587
The router used to call NmxSubscriptionMessage::parse_inner directly on
the COM-stub-delivered body, but the wire bytes arrive wrapped in a
ProcessDataReceived envelope (46-byte header + optional 4-byte length
prefix); parse_inner expects post-envelope bytes. Result: every 0x33
DataUpdate that ever arrived was silently dropped.

Mirrors the .NET reference's MxNativeSession.OnCallbackReceived flow at
cs:582-606 — three sequential parse attempts:

  1. NmxOperationStatusMessage::try_parse_process_data_received_body
     (already wired)
  2. NmxReferenceRegistrationResultMessage::try_parse_...
     (NEW — was missing)
  3. NmxSubscriptionMessage::try_parse_process_data_received_body
     (NEW — was wrong)

Adds:

  - NmxSubscriptionMessage::try_parse_process_data_received_body —
    peels envelope via
    NmxObservedEnvelope::parse_process_data_received_body_flexible,
    then dispatches to existing parse_inner.
  - NmxReferenceRegistrationResultMessage::try_parse_process_data_received_body —
    same shape, for the 0x11 registration-result frame.
  - Router branch for 0x11 — currently traces the assigned item_handle
    and drops the frame (matches the .NET reference, which fires a
    ReferenceRegistrationReceived event with no consumer in the
    codebase).
  - Router fall-through trace! when neither path matches, so future
    unparseable bodies surface in RUST_LOG=trace instead of vanishing.
  - DcomCallbackSink::forward — trace! per inbound callback so
    RUST_LOG=mxaccess_callback=trace surfaces opnum + size.
  - crates/mxaccess-compat/tests/buffered_subscribe_live.rs — F49 step 1
    live test that drives subscribe_buffered + a 500ms-cadence writer.
    Also pulls tracing-subscriber as a dev-dep so the test can dump
    router activity.

Existing router_task_decodes_callback_invoked_into_broadcast unit test
updated to wrap its synthetic 0x32 body in an envelope so the new parse
path actually accepts it.
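The peel-then-try flow described above can be sketched in a few lines.
This is only an illustration, not the router's actual code: the
Callback enum, peel_envelope, route, frame and the three try_* helpers
are hypothetical stand-ins, ASSUMED_OP_STATUS_COMMAND is a placeholder
(the real op-status command byte is not stated here), and the rule used
to detect the optional 4-byte prefix (a u32 LE equal to the number of
bytes that follow it) is an assumption.

```rust
const HEADER_LEN: usize = 46;
// Placeholder only: the real op-status command byte is not given above.
const ASSUMED_OP_STATUS_COMMAND: u8 = 0xF0;

#[derive(Debug, PartialEq)]
enum Callback {
    OperationStatus,
    RegistrationResult,
    Subscription { command: u8 },
}

/// Strip the optional 4-byte length prefix, then the 46-byte header.
/// Assumption: a prefix is present when the first four bytes, read as
/// u32 LE, equal the number of bytes that follow them.
fn peel_envelope(body: &[u8]) -> Option<&[u8]> {
    let rest = match body.get(..4) {
        Some(p)
            if u32::from_le_bytes([p[0], p[1], p[2], p[3]]) as usize == body.len() - 4 =>
        {
            &body[4..]
        }
        _ => body,
    };
    // `get` returns None when fewer than 46 bytes remain.
    rest.get(HEADER_LEN..)
}

fn try_operation_status(inner: &[u8]) -> Option<Callback> {
    (inner.first() == Some(&ASSUMED_OP_STATUS_COMMAND)).then_some(Callback::OperationStatus)
}

// Assumption: 0x11 is the first byte of the inner body, as 0x32/0x33 are.
fn try_registration_result(inner: &[u8]) -> Option<Callback> {
    (inner.first() == Some(&0x11)).then_some(Callback::RegistrationResult)
}

fn try_subscription(inner: &[u8]) -> Option<Callback> {
    match inner.first().copied() {
        Some(command @ (0x32 | 0x33)) => Some(Callback::Subscription { command }),
        _ => None,
    }
}

/// Three sequential attempts; the first parser that accepts wins.
/// `None` corresponds to the fall-through trace branch.
fn route(body: &[u8]) -> Option<Callback> {
    let inner = peel_envelope(body)?;
    try_operation_status(inner)
        .or_else(|| try_registration_result(inner))
        .or_else(|| try_subscription(inner))
}

/// Build a synthetic frame: zeroed header + one command byte,
/// optionally preceded by the assumed length prefix.
fn frame(command: u8, with_prefix: bool) -> Vec<u8> {
    let mut payload = vec![0u8; HEADER_LEN];
    payload.push(command);
    if with_prefix {
        let mut out = (payload.len() as u32).to_le_bytes().to_vec();
        out.extend(payload);
        return out;
    }
    payload
}
```

With these stand-ins, route(&frame(0x33, true)) and
route(&frame(0x33, false)) both reach the subscription branch, while a
body shorter than the header falls through to None.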
Live result: F56 — the buffered round-trip *registers* successfully
(RegisterReference returns HRESULT 0; engine sends one 0x11
RegistrationResult + one 51-byte op-status per write, perfectly clocked)
but the engine never sends a 0x33 DataUpdate. Rust-port-specific gap vs
the .NET reference's working buffered path; root cause is likely a
field-level difference in the RegisterReference body or a missing
post-RegisterReference step. Captured as F56 in design/followups.md,
blocking F49 step 1; F56's DoD is the same live test reporting >=3
DataChange arrivals.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1241 lines
50 KiB
Rust
//! `NmxSubscriptionMessage` — `0x32` SubscriptionStatus / `0x33` DataUpdate
//! callback decoder.
//!
//! Direct port of `src/MxNativeCodec/NmxSubscriptionMessage.cs`. This module
//! decodes the inner-body of the NMX subscription callback delivered through
//! the NMX `TransferData` envelope (callers should call
//! [`NmxTransferEnvelope::parse`](crate::NmxTransferEnvelope::parse) first to
//! peel the 46-byte transfer header, then hand the remaining bytes to
//! [`NmxSubscriptionMessage::parse_inner`]).
//!
//! ## Wire layout summary
//!
//! Both message kinds share the 23-byte preamble
//! (`NmxSubscriptionMessage.cs:52-55`):
//!
//! ```text
//! offset  size  field
//!   0       1   command (0x32 SubscriptionStatus, 0x33 DataUpdate)
//!   1       2   version u16 LE
//!   3       4   record_count i32 LE
//!   7      16   operation_id GUID (.NET layout)
//! ```
//!
//! `0x32` SubscriptionStatus extends to 39 bytes by appending a 16-byte
//! `item_correlation_id` GUID at offset 23 (`NmxSubscriptionMessage.cs:98-99`).
//! `0x33` DataUpdate has **no** correlation id — its records start at offset 23
//! (`NmxSubscriptionMessage.cs:76-77`).
//!
//! ## Record layout
//!
//! - SubscriptionStatus record: `status i32 + detail_status i32 + quality u16
//!   + timestamp_filetime i64 + wire_kind u8 + value` (`hasDetailStatus=true`,
//!   `NmxSubscriptionMessage.cs:117-155`).
//! - DataUpdate record: `status i32 + quality u16 + timestamp_filetime i64
//!   + wire_kind u8 + value` (`hasDetailStatus=false`).
//!
//! ## Multi-record DataUpdate (F44 evidence)
//!
//! The .NET reference rejects DataUpdate bodies with `record_count != 1`
//! (`NmxSubscriptionMessage.cs:71-74`). The Rust codec **diverges** here based
//! on F44 evidence (`captures/094-frida-buffered-separate-writer/frida-events.tsv`
//! line 145, `2026-04-25T21:40:34.222Z`): a `0x33` DataUpdate frame with
//! `record_count = 2` was observed in production-stack tracing, immediately
//! after a `Write.variantA` from a separate writer session against a buffered
//! subscription (`SetBufferedUpdateInterval(1000) + AddBufferedItem`). The two
//! per-record bodies have the same Int32 layout as the single-record case
//! (`status i32 + quality u16 + filetime i64 + wire_kind u8 + value`), and
//! `inner_length = 23 (preamble) + 2 * 19 (records) = 61` matches the envelope
//! field exactly. Since the per-record decoder is symmetric with
//! SubscriptionStatus, the DataUpdate parse path now loops over
//! `record_count` the same way the SubscriptionStatus path does. A
//! `record_count` of 0 or less still returns an error (a DataUpdate frame
//! with no records is not meaningful).
//!
//! See `docs/M6-buffered-evidence.md` for the per-capture decode summary that
//! produced this finding, and `design/70-risks-and-open-questions.md` R2 for
//! the contradiction history.
//!
//! ## Encoder/decoder asymmetry: array element width
//!
//! On the wire, the array header is `count u16 LE` at body+4 followed by
//! `element_width` at body+6. The decoder reads `element_width` as **`i32`
//! LE** (`NmxSubscriptionMessage.cs:264-265`); the encoder side (`write_message.rs`,
//! NOT this module) writes `u16/u16`. This asymmetry is real and intentional —
//! the decoder must accept whatever the native NMX service emits.
//!
//! ## Wire-kind table
//!
//! Scalar: `0x01` Boolean, `0x02` Int32, `0x03` Float32, `0x04` Float64,
//! `0x05` String, `0x06` DateTime, `0x07` ElapsedTime
//! (`NmxSubscriptionMessage.cs:165-176`).
//!
//! Array: `0x41` BoolArray, `0x42` Int32Array, `0x43` Float32Array,
//! `0x44` Float64Array, `0x45` StringArray, `0x46` DateTimeArray
//! (`NmxSubscriptionMessage.cs:268-277`). Note the encoder collapses
//! StringArray/DateTimeArray to `0x45`; the decoder keeps `0x46` as
//! DateTimeArray.

// Direct byte indexing — see reference_handle.rs for rationale (every byte
// access is preceded by an explicit length check; matches the .NET source's
// `BinaryPrimitives` calls 1:1 and is far more readable than `.get(n)?`).
#![allow(clippy::indexing_slicing)]

use crate::error::CodecError;
use crate::{MxValue, MxValueKind};

/// `0x32` — SubscriptionStatus. Per `NmxSubscriptionMessage.cs:36`.
pub const SUBSCRIPTION_STATUS_COMMAND: u8 = 0x32;

/// `0x33` — DataUpdate. Per `NmxSubscriptionMessage.cs:37`.
pub const DATA_UPDATE_COMMAND: u8 = 0x33;

/// 16-byte GUID with the .NET on-the-wire layout used by `new Guid(span)` /
/// `Guid.WriteToSpan` (data1 LE, data2 LE, data3 LE, then 8 raw bytes).
/// Mirrors `NmxSubscriptionMessage.cs:55,98` (the .NET `Guid(ReadOnlySpan<byte>)`
/// constructor consumes exactly 16 bytes in this layout).
///
/// Stored as the raw 16 bytes verbatim — the codec preserves whatever the
/// service emits and surfaces it to consumers as a stable identifier; no
/// interpretation is needed at the codec layer.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub struct NmxGuid(pub [u8; 16]);

impl NmxGuid {
    /// Encoded GUID size on the wire.
    pub const ENCODED_LEN: usize = 16;

    /// Construct from a 16-byte slice.
    ///
    /// # Errors
    ///
    /// Returns [`CodecError::ShortRead`] if `bytes.len() != 16`.
    pub fn from_bytes(bytes: &[u8]) -> Result<Self, CodecError> {
        if bytes.len() != Self::ENCODED_LEN {
            return Err(CodecError::ShortRead {
                expected: Self::ENCODED_LEN,
                actual: bytes.len(),
            });
        }
        let mut buf = [0u8; 16];
        buf.copy_from_slice(bytes);
        Ok(Self(buf))
    }
}

/// One record from a [`NmxSubscriptionMessage`]. Both record kinds carry
/// `status`; only SubscriptionStatus records carry `detail_status`, which
/// DataUpdate records leave as `None`.
///
/// Per `NmxSubscriptionMessage.cs:12-26`.
#[derive(Debug, Clone, PartialEq)]
pub struct NmxSubscriptionRecord {
    /// Status code — `i32` always present for both DataUpdate and
    /// SubscriptionStatus records (`NmxSubscriptionMessage.cs:126-127` reads
    /// it unconditionally).
    pub status: i32,
    /// Detail-status code — present for SubscriptionStatus (`0x32`) only;
    /// `None` for DataUpdate (`NmxSubscriptionMessage.cs:129-134`).
    pub detail_status: Option<i32>,
    /// Quality bitfield (`NmxSubscriptionMessage.cs:136-137`).
    pub quality: u16,
    /// Windows FILETIME ticks (100ns units since 1601-01-01 UTC). Mirrors the
    /// raw `i64` that the .NET reference passes to `DateTime.FromFileTimeUtc`
    /// (`NmxSubscriptionMessage.cs:139-150`).
    pub timestamp_filetime: i64,
    /// Wire-kind tag from the body (`NmxSubscriptionMessage.cs:142`).
    pub wire_kind: u8,
    /// Decoded value, when the wire kind is recognized and the body is
    /// well-formed. Malformed/unknown payloads (negative lengths, bad
    /// element widths, unknown wire kinds) yield `None` — mirrors the .NET
    /// `NmxCallbackValue.Value = null` paths (e.g. `NmxSubscriptionMessage.cs:182,199`).
    pub value: Option<MxValue>,
    /// Offset into the inner buffer where this record began.
    pub offset: usize,
    /// Total bytes this record consumed from the inner buffer.
    pub length: usize,
}

/// Parsed `0x32`/`0x33` subscription callback message.
/// Per `NmxSubscriptionMessage.cs:28-34`.
#[derive(Debug, Clone, PartialEq)]
pub struct NmxSubscriptionMessage {
    /// `0x32` SubscriptionStatus or `0x33` DataUpdate.
    pub command: u8,
    /// `version` field (`NmxSubscriptionMessage.cs:53`).
    pub version: u16,
    /// `record_count` field (`NmxSubscriptionMessage.cs:54`).
    pub record_count: i32,
    /// `operation_id` GUID (`NmxSubscriptionMessage.cs:55`).
    pub operation_id: NmxGuid,
    /// `item_correlation_id` GUID — present only on `0x32` SubscriptionStatus
    /// (`NmxSubscriptionMessage.cs:98`); always `None` on `0x33` DataUpdate.
    pub item_correlation_id: Option<NmxGuid>,
    /// Decoded records.
    pub records: Vec<NmxSubscriptionRecord>,
}

impl NmxSubscriptionMessage {
    /// Length of the shared 23-byte preamble (`NmxSubscriptionMessage.cs:47`).
    pub const PREAMBLE_LEN: usize = 23;

    /// Length of the SubscriptionStatus header — preamble + 16-byte
    /// correlation id (`NmxSubscriptionMessage.cs:93,99`).
    pub const SUBSCRIPTION_STATUS_HEADER_LEN: usize = 39;

    /// Parse the inner body (post-46-byte-envelope) of an NMX subscription
    /// callback. Mirrors `NmxSubscriptionMessage.ParseInner`
    /// (`NmxSubscriptionMessage.cs:45-63`).
    ///
    /// # Errors
    ///
    /// - [`CodecError::ShortRead`] if `inner.len() < 23`.
    /// - [`CodecError::UnexpectedOpcode`] if the command byte is neither
    ///   `0x32` nor `0x33`.
    /// - [`CodecError::Decode`] for protocol violations (truncated records,
    ///   `record_count <= 0`, etc.). Multi-record DataUpdate bodies are
    ///   accepted — see the module-level "Multi-record DataUpdate" note.
    pub fn parse_inner(inner: &[u8]) -> Result<Self, CodecError> {
        if inner.len() < Self::PREAMBLE_LEN {
            return Err(CodecError::ShortRead {
                expected: Self::PREAMBLE_LEN,
                actual: inner.len(),
            });
        }

        let command = inner[0];
        let version = read_u16_le(inner, 1);
        let record_count = read_i32_le(inner, 3);
        let operation_id = NmxGuid::from_bytes(&inner[7..23])?;

        match command {
            SUBSCRIPTION_STATUS_COMMAND => {
                parse_subscription_status(inner, version, record_count, operation_id)
            }
            DATA_UPDATE_COMMAND => parse_data_update(inner, version, record_count, operation_id),
            _ => Err(CodecError::UnexpectedOpcode(command)),
        }
    }

    /// Peel the `ProcessDataReceived` envelope and parse the inner
    /// subscription body. Mirrors the .NET reference's
    /// `NmxSubscriptionMessage.ParseProcessDataReceivedBody`
    /// (the wire-side path used by `MxNativeSession.OnCallbackReceived`
    /// at `cs:593`).
    ///
    /// Inbound NMX callbacks arrive as a wire envelope (46-byte header,
    /// optionally with a 4-byte total-length prefix), inside which sits
    /// the 23-byte preamble + records body that
    /// [`Self::parse_inner`] knows how to decode. Calling `parse_inner`
    /// directly on the wire bytes — which the router used to do — would
    /// fail because the first 46 bytes are envelope, not preamble.
    ///
    /// # Errors
    ///
    /// - [`CodecError::ShortRead`] / [`CodecError::InnerLengthMismatch`]
    ///   surfaced from the envelope parse.
    /// - Any error from [`Self::parse_inner`] on the inner body.
    pub fn try_parse_process_data_received_body(body: &[u8]) -> Result<Self, CodecError> {
        let envelope = crate::NmxObservedEnvelope::parse_process_data_received_body_flexible(body)?;
        Self::parse_inner(&envelope.inner_body)
    }
}

/// `0x33` DataUpdate. Mirrors `NmxSubscriptionMessage.ParseDataUpdate`
/// (`NmxSubscriptionMessage.cs:65-85`) but loops over `record_count` to
/// support the multi-record bodies F44 documented from
/// `captures/094-frida-buffered-separate-writer/frida-events.tsv:145`. The
/// .NET reference still hard-throws on `record_count != 1`; the Rust codec
/// diverges here for production safety. See module-level "Multi-record
/// DataUpdate" comment.
fn parse_data_update(
    inner: &[u8],
    version: u16,
    record_count: i32,
    operation_id: NmxGuid,
) -> Result<NmxSubscriptionMessage, CodecError> {
    // record_count <= 0 has no meaningful interpretation for DataUpdate. Reject
    // explicitly so consumers don't silently get an empty Vec when the wire
    // produced a malformed count.
    if record_count <= 0 {
        return Err(CodecError::Decode {
            offset: 3,
            reason: "DataUpdate record_count must be >= 1",
            buffer_len: inner.len(),
        });
    }

    // Records start immediately after the 23-byte preamble — DataUpdate has
    // no correlation id (`NmxSubscriptionMessage.cs:76-77`).
    let count = record_count as usize;
    let mut offset = NmxSubscriptionMessage::PREAMBLE_LEN;
    let mut records = Vec::with_capacity(count);
    for _ in 0..count {
        let record = parse_record(inner, offset, false)?;
        offset += record.length;
        records.push(record);
    }

    Ok(NmxSubscriptionMessage {
        command: DATA_UPDATE_COMMAND,
        version,
        record_count,
        operation_id,
        item_correlation_id: None,
        records,
    })
}

/// `0x32` SubscriptionStatus. Mirrors
/// `NmxSubscriptionMessage.ParseSubscriptionStatus`
/// (`NmxSubscriptionMessage.cs:87-115`).
fn parse_subscription_status(
    inner: &[u8],
    version: u16,
    record_count: i32,
    operation_id: NmxGuid,
) -> Result<NmxSubscriptionMessage, CodecError> {
    if inner.len() < NmxSubscriptionMessage::SUBSCRIPTION_STATUS_HEADER_LEN {
        return Err(CodecError::ShortRead {
            expected: NmxSubscriptionMessage::SUBSCRIPTION_STATUS_HEADER_LEN,
            actual: inner.len(),
        });
    }

    let item_correlation_id = NmxGuid::from_bytes(&inner[23..39])?;
    let mut offset = NmxSubscriptionMessage::SUBSCRIPTION_STATUS_HEADER_LEN;
    // `record_count` is `i32` on the wire; clamp negatives to zero. The .NET
    // for-loop `for (int i = 0; i < recordCount; i++)` also yields zero
    // iterations for negative counts (`NmxSubscriptionMessage.cs:101`).
    let count = if record_count < 0 {
        0usize
    } else {
        record_count as usize
    };
    let mut records = Vec::with_capacity(count);
    for _ in 0..count {
        let record = parse_record(inner, offset, true)?;
        offset += record.length;
        records.push(record);
    }

    Ok(NmxSubscriptionMessage {
        command: SUBSCRIPTION_STATUS_COMMAND,
        version,
        record_count,
        operation_id,
        item_correlation_id: Some(item_correlation_id),
        records,
    })
}

/// Parse a single record. Mirrors `NmxSubscriptionMessage.ParseRecord`
/// (`NmxSubscriptionMessage.cs:117-155`). Every record begins with
/// `status i32`; when `has_detail_status` is true a `detail_status i32`
/// follows it, otherwise `quality` comes directly after `status`.
fn parse_record(
    body: &[u8],
    offset: usize,
    has_detail_status: bool,
) -> Result<NmxSubscriptionRecord, CodecError> {
    // Minimum length is 19 with detail-status (status + detail_status +
    // quality + timestamp + wire_kind = 4+4+2+8+1) and 15 without
    // (`NmxSubscriptionMessage.cs:119`). We additionally require the
    // wire-kind byte itself to be present (`body[offset++]` at line 142).
    let minimum_length = if has_detail_status { 19 } else { 15 };
    if offset + minimum_length > body.len() {
        return Err(CodecError::Decode {
            offset,
            reason: "subscription record truncated before fixed header",
            buffer_len: body.len(),
        });
    }

    let start = offset;
    let mut cursor = offset;

    // `status: i32` is read unconditionally for both DataUpdate and
    // SubscriptionStatus records (`NmxSubscriptionMessage.cs:126-127`).
    //
    // FOLLOW-UP (M1 wave-1 audit): An earlier port draft conditionally read
    // `status` only when `has_detail_status=true`, then required min length 15
    // for DataUpdate without consuming the leading 4 bytes — leaving them to
    // be misread as `quality`. Verified fixed here; if any other codec agent
    // applied the same `hasDetailStatus`-gated conditional read pattern,
    // re-audit. Min lengths are 15 (DataUpdate, status+quality+filetime+kind)
    // and 19 (SubscriptionStatus, +detail_status). See
    // `design/70-risks-and-open-questions.md` "M1 hasDetailStatus audit"
    // follow-up entry.
    let status = read_i32_le(body, cursor);
    cursor += 4;

    let detail_status = if has_detail_status {
        let d = read_i32_le(body, cursor);
        cursor += 4;
        Some(d)
    } else {
        None
    };

    let quality = read_u16_le(body, cursor);
    cursor += 2;

    let timestamp_filetime = read_i64_le(body, cursor);
    cursor += 8;

    let wire_kind = body[cursor];
    cursor += 1;

    let (value, encoded_len) = decode_value(wire_kind, &body[cursor..]);
    cursor += encoded_len;

    Ok(NmxSubscriptionRecord {
        status,
        detail_status,
        quality,
        timestamp_filetime,
        wire_kind,
        value,
        offset: start,
        length: cursor - start,
    })
}

/// Decode a value following a wire-kind byte. Returns `(decoded, bytes_consumed)`.
/// Mirrors `NmxSubscriptionMessage.DecodeValue` (`NmxSubscriptionMessage.cs:157-176`)
/// and the per-kind helpers below it.
///
/// On any short / malformed payload returns `(None, 0)` — matching the .NET
/// behaviour where `NmxCallbackValue.Value` is null and `EncodedLength` is 0.
/// (Subsequent records following a malformed value are unrecoverable; the
/// .NET reference exhibits the same property.)
fn decode_value(wire_kind: u8, body: &[u8]) -> (Option<MxValue>, usize) {
    if body.is_empty() {
        return (None, 0);
    }

    match wire_kind {
        // 0x01 Boolean — single byte, non-zero is true (`NmxSubscriptionMessage.cs:166`).
        0x01 if !body.is_empty() => (Some(MxValue::Boolean(body[0] != 0)), 1),
        // 0x02 Int32 (`NmxSubscriptionMessage.cs:167`).
        0x02 if body.len() >= 4 => (Some(MxValue::Int32(read_i32_le(body, 0))), 4),
        // 0x03 Float32 — bit-cast i32->f32 mirrors `Int32BitsToSingle`
        // (`NmxSubscriptionMessage.cs:168`).
        0x03 if body.len() >= 4 => {
            let bits = read_i32_le(body, 0);
            (Some(MxValue::Float32(f32::from_bits(bits as u32))), 4)
        }
        // 0x04 Float64 — bit-cast i64->f64 (`NmxSubscriptionMessage.cs:169`).
        0x04 if body.len() >= 8 => {
            let bits = read_i64_le(body, 0);
            (Some(MxValue::Float64(f64::from_bits(bits as u64))), 8)
        }
        // 0x05 String (`NmxSubscriptionMessage.cs:170`).
        0x05 => decode_string_value(body),
        // 0x06 DateTime (`NmxSubscriptionMessage.cs:171`).
        0x06 => decode_datetime_value(body),
        // 0x07 ElapsedTime (`NmxSubscriptionMessage.cs:172`).
        0x07 => decode_elapsed_time_value(body),
        // Arrays 0x41..0x46 (`NmxSubscriptionMessage.cs:173`).
        0x41..=0x46 => decode_array_value(wire_kind, body),
        // Unknown / malformed: matches the `_ => new NmxCallbackValue(...
        // EncodedLength=0)` arms in the .NET source.
        _ => (None, 0),
    }
}

/// Decode a string body. Mirrors `DecodeStringValue`
/// (`NmxSubscriptionMessage.cs:178-210`).
///
/// Layout: `record_length i32 + text_byte_length i32 + utf16le bytes`. The
/// degenerate `record_length == 4` case represents an empty string and
/// consumes exactly 4 bytes (`NmxSubscriptionMessage.cs:186-189`). Otherwise
/// the trailing two-byte UTF-16 NUL is stripped if present
/// (`NmxSubscriptionMessage.cs:203-206`).
fn decode_string_value(body: &[u8]) -> (Option<MxValue>, usize) {
    if body.len() < 4 {
        return (None, 0);
    }
    let record_length = read_i32_le(body, 0);
    if record_length == 4 {
        return (Some(MxValue::String(String::new())), 4);
    }
    if body.len() < 8 {
        return (None, 0);
    }
    let text_byte_length = read_i32_le(body, 4);
    // .NET checks: `recordLength < 8 || textByteLength < 0 ||
    // recordLength != textByteLength + 4 || body.Length < 8 + textByteLength`
    // (`NmxSubscriptionMessage.cs:197`).
    if record_length < 8 || text_byte_length < 0 {
        return (None, 0);
    }
    let text_byte_length_us = text_byte_length as usize;
    if record_length as usize != text_byte_length_us + 4 || body.len() < 8 + text_byte_length_us {
        return (None, 0);
    }

    let mut text_bytes = &body[8..8 + text_byte_length_us];
    // Strip optional UTF-16LE NUL terminator.
    if text_bytes.len() >= 2
        && text_bytes[text_bytes.len() - 2] == 0
        && text_bytes[text_bytes.len() - 1] == 0
    {
        text_bytes = &text_bytes[..text_bytes.len() - 2];
    }
    let value = decode_utf16_le_lossy(text_bytes);
    (Some(MxValue::String(value)), 8 + text_byte_length_us)
}

/// Decode a DateTime body. Mirrors `DecodeDateTimeValue`
/// (`NmxSubscriptionMessage.cs:212-243`).
///
/// Two shapes exist on the wire:
/// 1. `record_length i32 + filetime i64 (+ trailer)` — used when
///    `body.len() >= 14` (`record_length >= 10`); consumes
///    `4 + record_length` bytes.
/// 2. Bare `filetime i64` — fallback when the framed shape doesn't fit;
///    consumes 8 bytes.
fn decode_datetime_value(body: &[u8]) -> (Option<MxValue>, usize) {
    if body.len() >= 14 {
        let record_length = read_i32_le(body, 0);
        if record_length >= 10 && body.len() >= 4 + record_length as usize {
            let file_time = read_i64_le(body, 4);
            // The .NET reference returns `Value = null` when the FILETIME
            // is out of range (`NmxSubscriptionMessage.cs:229`) but still
            // consumes `4 + record_length` bytes. We carry the raw FILETIME
            // verbatim — the codec preserves the wire value and lets the
            // higher layer judge validity.
            return (
                Some(MxValue::DateTime(file_time)),
                4 + record_length as usize,
            );
        }
    }
    if body.len() >= 8 {
        let file_time = read_i64_le(body, 0);
        return (Some(MxValue::DateTime(file_time)), 8);
    }
    (None, 0)
}

/// Decode an ElapsedTime body. Mirrors `DecodeElapsedTimeValue`
/// (`NmxSubscriptionMessage.cs:245-254`).
///
/// **Wire is signed i32 milliseconds** (`NmxSubscriptionMessage.cs:252` reads
/// `BinaryPrimitives.ReadInt32LittleEndian`). Negative values are valid and
/// must round-trip — Rust `Duration` is unsigned so we widen to `i64` ms in
/// `MxValue::ElapsedTime` (lib.rs:73-74).
fn decode_elapsed_time_value(body: &[u8]) -> (Option<MxValue>, usize) {
    if body.len() < 4 {
        return (None, 0);
    }
    let milliseconds = read_i32_le(body, 0);
    (Some(MxValue::ElapsedTime(milliseconds as i64)), 4)
}

/// Decode an array body. Mirrors `DecodeArrayValue`
/// (`NmxSubscriptionMessage.cs:256-278`).
///
/// Header layout (per the **decoder**, see module-level note about asymmetry):
/// `unknown 4 bytes + count u16 LE @+4 + element_width i32 LE @+6 + values`.
/// Total header = 10 bytes (`NmxSubscriptionMessage.cs:258`). The first 4
/// bytes appear to be a record-length / record-kind framing field; the .NET
/// reference does not interpret them and neither do we — they pass through
/// the consumed-byte accounting via the fixed `arrayHeaderLength = 10`.
fn decode_array_value(wire_kind: u8, body: &[u8]) -> (Option<MxValue>, usize) {
    const ARRAY_HEADER_LEN: usize = 10;
    if body.len() < ARRAY_HEADER_LEN {
        return (None, 0);
    }
    let count = read_u16_le(body, 4) as usize;
    // Decoder reads element_width as i32 LE — the encoder writes u16/u16 but
    // the wire emitted by NmxSvc puts an i32 here. (`NmxSubscriptionMessage.cs:265`.)
    let element_width = read_i32_le(body, 6);
    let values = &body[ARRAY_HEADER_LEN..];

    match wire_kind {
        0x41 => decode_bool_array(count, element_width, values),
        0x42 => decode_int32_array(count, element_width, values),
        0x43 => decode_float32_array(count, element_width, values),
        0x44 => decode_float64_array(count, element_width, values),
        0x45 => decode_string_array(count, values),
        0x46 => decode_datetime_array(count, element_width, values),
        // Unreachable given the guard in `decode_value`, but keep it total.
        _ => (None, 0),
    }
}

/// Bool array decoder — element width must be `sizeof(short) == 2`, elements
/// are `i16` LE where any non-zero value is true. Per the .NET reference the
/// wire encoding is `-1`/`0` (`NmxSubscriptionMessage.cs:280-294`).
fn decode_bool_array(count: usize, element_width: i32, values: &[u8]) -> (Option<MxValue>, usize) {
    if element_width != 2 {
        return (None, 0);
    }
    let needed = count.saturating_mul(2);
    if values.len() < needed {
        return (None, 0);
    }
    let mut out = Vec::with_capacity(count);
    for i in 0..count {
        // Boolean elements are i16 (`-1`/`0` in practice); `!= 0` covers both.
        let raw = read_i16_le(values, i * 2);
        out.push(raw != 0);
    }
    (Some(MxValue::BoolArray(out)), 10 + count * 2)
}

/// Int32 array decoder. Per `NmxSubscriptionMessage.cs:296-310`.
fn decode_int32_array(count: usize, element_width: i32, values: &[u8]) -> (Option<MxValue>, usize) {
    if element_width != 4 {
        return (None, 0);
    }
    let needed = count.saturating_mul(4);
    if values.len() < needed {
        return (None, 0);
    }
    let mut out = Vec::with_capacity(count);
    for i in 0..count {
        out.push(read_i32_le(values, i * 4));
    }
    (Some(MxValue::Int32Array(out)), 10 + count * 4)
}

/// Float32 array decoder. Per `NmxSubscriptionMessage.cs:312-326`.
fn decode_float32_array(
    count: usize,
    element_width: i32,
    values: &[u8],
) -> (Option<MxValue>, usize) {
    if element_width != 4 {
        return (None, 0);
    }
    let needed = count.saturating_mul(4);
    if values.len() < needed {
        return (None, 0);
    }
    let mut out = Vec::with_capacity(count);
    for i in 0..count {
        let bits = read_i32_le(values, i * 4);
        out.push(f32::from_bits(bits as u32));
    }
    (Some(MxValue::Float32Array(out)), 10 + count * 4)
}

/// Float64 array decoder. Per `NmxSubscriptionMessage.cs:328-342`.
fn decode_float64_array(
    count: usize,
    element_width: i32,
    values: &[u8],
) -> (Option<MxValue>, usize) {
    if element_width != 8 {
        return (None, 0);
    }
    let needed = count.saturating_mul(8);
    if values.len() < needed {
        return (None, 0);
    }
    let mut out = Vec::with_capacity(count);
    for i in 0..count {
        let bits = read_i64_le(values, i * 8);
        out.push(f64::from_bits(bits as u64));
    }
    (Some(MxValue::Float64Array(out)), 10 + count * 8)
}

/// DateTime array decoder. Per `NmxSubscriptionMessage.cs:344-359`.
/// Element width is **12** on the wire (FILETIME i64 + 4 bytes of padding /
/// trailer); we read the leading 8 bytes as the FILETIME and skip the rest.
fn decode_datetime_array(
    count: usize,
    element_width: i32,
    values: &[u8],
) -> (Option<MxValue>, usize) {
    if element_width != 12 {
        return (None, 0);
    }
    let needed = count.saturating_mul(12);
    if values.len() < needed {
        return (None, 0);
    }
    let mut out = Vec::with_capacity(count);
    for i in 0..count {
        // Only the leading 8 bytes are interpreted; the trailing 4 bytes
        // are not consumed by the .NET reference either (it calls
        // `BinaryPrimitives.ReadInt64LittleEndian` on the slice's first
        // 8 bytes, `NmxSubscriptionMessage.cs:354`).
        let file_time = read_i64_le(values, i * 12);
        out.push(file_time);
    }
    (Some(MxValue::DateTimeArray(out)), 10 + count * 12)
}

/// String array decoder. Per `NmxSubscriptionMessage.cs:361-392`.
///
/// Each element is `record_length i32 + element_kind u8 (must be 0x05) +
/// text_record_length i32 + text_byte_length i32 + utf16le bytes`. The
/// element header is 13 bytes. The whole array consumes
/// `10 + sum(13 + text_byte_length)` bytes.
fn decode_string_array(count: usize, values: &[u8]) -> (Option<MxValue>, usize) {
    let mut out = Vec::with_capacity(count);
    let mut offset = 0usize;
    for _ in 0..count {
        if offset + 13 > values.len() {
            return (None, 0);
        }
        let record_length = read_i32_le(values, offset);
        let element_kind = values[offset + 4];
        let text_record_length = read_i32_le(values, offset + 5);
        let text_byte_length = read_i32_le(values, offset + 9);

        // .NET checks: `recordLength < 9 || elementKind != 0x05 ||
        // textRecordLength != textByteLength + sizeof(int) ||
        // recordLength != 1 + sizeof(int) + sizeof(int) + textByteLength ||
        // offset + 13 + textByteLength > values.Length`
        // (`NmxSubscriptionMessage.cs:376`).
        //
        // The sums are widened to i64 so a `text_byte_length` near
        // `i32::MAX` cannot overflow the i32 additions (which would panic
        // in debug builds).
        let text_len = i64::from(text_byte_length);
        if record_length < 9
            || element_kind != 0x05
            || text_byte_length < 0
            || i64::from(text_record_length) != text_len + 4
            || i64::from(record_length) != 1 + 4 + 4 + text_len
        {
            return (None, 0);
        }
        let text_byte_length_us = text_byte_length as usize;
        if offset + 13 + text_byte_length_us > values.len() {
            return (None, 0);
        }

        let mut text_bytes = &values[offset + 13..offset + 13 + text_byte_length_us];
        if text_bytes.len() >= 2
            && text_bytes[text_bytes.len() - 2] == 0
            && text_bytes[text_bytes.len() - 1] == 0
        {
            text_bytes = &text_bytes[..text_bytes.len() - 2];
        }
        out.push(decode_utf16_le_lossy(text_bytes));
        offset += 13 + text_byte_length_us;
    }
    (Some(MxValue::StringArray(out)), 10 + offset)
}

/// Map a wire-kind byte to its [`MxValueKind`] without decoding the payload.
/// Mirrors `ToValueKindOrNull` (`NmxSubscriptionMessage.cs:394-413`).
pub fn wire_kind_to_value_kind(wire_kind: u8) -> Option<MxValueKind> {
|
|
Some(match wire_kind {
|
|
0x01 => MxValueKind::Boolean,
|
|
0x02 => MxValueKind::Int32,
|
|
0x03 => MxValueKind::Float32,
|
|
0x04 => MxValueKind::Float64,
|
|
0x05 => MxValueKind::String,
|
|
0x06 => MxValueKind::DateTime,
|
|
0x07 => MxValueKind::ElapsedTime,
|
|
0x41 => MxValueKind::BoolArray,
|
|
0x42 => MxValueKind::Int32Array,
|
|
0x43 => MxValueKind::Float32Array,
|
|
0x44 => MxValueKind::Float64Array,
|
|
0x45 => MxValueKind::StringArray,
|
|
0x46 => MxValueKind::DateTimeArray,
|
|
_ => return None,
|
|
})
|
|
}
|
|
|
|
/// Decode UTF-16LE bytes lossily: invalid sequences become U+FFFD. Mirrors
/// `Encoding.Unicode.GetString` (`NmxSubscriptionMessage.cs:208,387`) which
/// is also lossy on malformed surrogates. A trailing odd byte is ignored,
/// because `chunks_exact(2)` only yields complete code units.
fn decode_utf16_le_lossy(bytes: &[u8]) -> String {
    let units: Vec<u16> = bytes
        .chunks_exact(2)
        .map(|c| u16::from_le_bytes([c[0], c[1]]))
        .collect();
    String::from_utf16_lossy(&units)
}

#[inline]
fn read_u16_le(bytes: &[u8], offset: usize) -> u16 {
    u16::from_le_bytes([bytes[offset], bytes[offset + 1]])
}

#[inline]
fn read_i16_le(bytes: &[u8], offset: usize) -> i16 {
    i16::from_le_bytes([bytes[offset], bytes[offset + 1]])
}

#[inline]
fn read_i32_le(bytes: &[u8], offset: usize) -> i32 {
    i32::from_le_bytes([
        bytes[offset],
        bytes[offset + 1],
        bytes[offset + 2],
        bytes[offset + 3],
    ])
}

#[inline]
fn read_i64_le(bytes: &[u8], offset: usize) -> i64 {
    i64::from_le_bytes([
        bytes[offset],
        bytes[offset + 1],
        bytes[offset + 2],
        bytes[offset + 3],
        bytes[offset + 4],
        bytes[offset + 5],
        bytes[offset + 6],
        bytes[offset + 7],
    ])
}

#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::expect_used,
    clippy::indexing_slicing,
    clippy::panic
)]
mod tests {
    use super::*;

    /// Sample 16-byte GUID for hand-crafted bodies.
    const OPERATION_ID_BYTES: [u8; 16] = [
        0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f,
        0x10,
    ];
    const CORRELATION_ID_BYTES: [u8; 16] = [
        0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, 0x1b, 0x1c, 0x1d, 0x1e, 0x1f,
        0x20,
    ];

    /// Build a DataUpdate (`0x33`) body with the given record bytes appended.
    fn data_update_body(record_count: i32, record: &[u8]) -> Vec<u8> {
        let mut out = Vec::with_capacity(23 + record.len());
        out.push(DATA_UPDATE_COMMAND);
        out.extend_from_slice(&1u16.to_le_bytes()); // version
        out.extend_from_slice(&record_count.to_le_bytes());
        out.extend_from_slice(&OPERATION_ID_BYTES);
        out.extend_from_slice(record);
        out
    }

    /// Build a SubscriptionStatus (`0x32`) body.
    fn subscription_status_body(record_count: i32, records: &[u8]) -> Vec<u8> {
        let mut out = Vec::with_capacity(39 + records.len());
        out.push(SUBSCRIPTION_STATUS_COMMAND);
        out.extend_from_slice(&1u16.to_le_bytes()); // version
        out.extend_from_slice(&record_count.to_le_bytes());
        out.extend_from_slice(&OPERATION_ID_BYTES);
        out.extend_from_slice(&CORRELATION_ID_BYTES);
        out.extend_from_slice(records);
        out
    }

    /// DataUpdate record: `status(4) + quality(2) + filetime(8) + wire_kind(1)
    /// + value`. The fixed prefix is 15 bytes per
    /// `NmxSubscriptionMessage.cs:119,126-143` (status is read unconditionally;
    /// detail_status is the only field gated on hasDetailStatus).
    fn data_record(quality: u16, filetime: i64, wire_kind: u8, value: &[u8]) -> Vec<u8> {
        data_record_with_status(0, quality, filetime, wire_kind, value)
    }

    fn data_record_with_status(
        status: i32,
        quality: u16,
        filetime: i64,
        wire_kind: u8,
        value: &[u8],
    ) -> Vec<u8> {
        let mut out = Vec::with_capacity(15 + value.len());
        out.extend_from_slice(&status.to_le_bytes());
        out.extend_from_slice(&quality.to_le_bytes());
        out.extend_from_slice(&filetime.to_le_bytes());
        out.push(wire_kind);
        out.extend_from_slice(value);
        out
    }

    /// SubscriptionStatus record: `status(4) + detail_status(4) + quality(2) +
    /// filetime(8) + wire_kind(1) + value`.
    fn status_record(
        status: i32,
        detail_status: i32,
        quality: u16,
        filetime: i64,
        wire_kind: u8,
        value: &[u8],
    ) -> Vec<u8> {
        let mut out = Vec::with_capacity(19 + value.len());
        out.extend_from_slice(&status.to_le_bytes());
        out.extend_from_slice(&detail_status.to_le_bytes());
        out.extend_from_slice(&quality.to_le_bytes());
        out.extend_from_slice(&filetime.to_le_bytes());
        out.push(wire_kind);
        out.extend_from_slice(value);
        out
    }

    #[test]
    fn data_update_boolean_round_trip() {
        let rec = data_record(0x00C0, 132_000_000_000, 0x01, &[0x01]);
        let body = data_update_body(1, &rec);
        let msg = NmxSubscriptionMessage::parse_inner(&body).unwrap();
        assert_eq!(msg.command, DATA_UPDATE_COMMAND);
        assert_eq!(msg.record_count, 1);
        assert!(msg.item_correlation_id.is_none());
        assert_eq!(msg.operation_id.0, OPERATION_ID_BYTES);
        assert_eq!(msg.records.len(), 1);
        let r = &msg.records[0];
        assert_eq!(r.status, 0);
        assert_eq!(r.detail_status, None);
        assert_eq!(r.quality, 0x00C0);
        assert_eq!(r.timestamp_filetime, 132_000_000_000);
        assert_eq!(r.wire_kind, 0x01);
        assert_eq!(r.value, Some(MxValue::Boolean(true)));
    }

    #[test]
    fn data_update_int32() {
        let mut payload = Vec::new();
        payload.extend_from_slice(&42i32.to_le_bytes());
        let rec = data_record(0x00C0, 0, 0x02, &payload);
        let body = data_update_body(1, &rec);
        let msg = NmxSubscriptionMessage::parse_inner(&body).unwrap();
        assert_eq!(msg.records[0].value, Some(MxValue::Int32(42)));
    }

    #[test]
    fn data_update_float32() {
        let mut payload = Vec::new();
        payload.extend_from_slice(&1.5f32.to_le_bytes());
        let rec = data_record(0x00C0, 0, 0x03, &payload);
        let body = data_update_body(1, &rec);
        let msg = NmxSubscriptionMessage::parse_inner(&body).unwrap();
        assert_eq!(msg.records[0].value, Some(MxValue::Float32(1.5)));
    }

    #[test]
    fn data_update_float64() {
        let mut payload = Vec::new();
        payload.extend_from_slice(&3.25f64.to_le_bytes());
        let rec = data_record(0x00C0, 0, 0x04, &payload);
        let body = data_update_body(1, &rec);
        let msg = NmxSubscriptionMessage::parse_inner(&body).unwrap();
        assert_eq!(msg.records[0].value, Some(MxValue::Float64(3.25)));
    }

    #[test]
    fn data_update_string() {
        // "Hi" UTF-16LE = [0x48, 0x00, 0x69, 0x00] then NUL [0x00, 0x00] = 6 bytes.
        let utf16 = [0x48, 0x00, 0x69, 0x00, 0x00, 0x00];
        let text_byte_length: i32 = utf16.len() as i32;
        let record_length: i32 = text_byte_length + 4;
        let mut payload = Vec::new();
        payload.extend_from_slice(&record_length.to_le_bytes());
        payload.extend_from_slice(&text_byte_length.to_le_bytes());
        payload.extend_from_slice(&utf16);
        let rec = data_record(0x00C0, 0, 0x05, &payload);
        let body = data_update_body(1, &rec);
        let msg = NmxSubscriptionMessage::parse_inner(&body).unwrap();
        assert_eq!(
            msg.records[0].value,
            Some(MxValue::String("Hi".to_string()))
        );
    }

    #[test]
    fn data_update_string_empty() {
        // record_length == 4 indicates empty string; only 4 bytes consumed.
        let mut payload = Vec::new();
        payload.extend_from_slice(&4i32.to_le_bytes());
        let rec = data_record(0x00C0, 0, 0x05, &payload);
        let body = data_update_body(1, &rec);
        let msg = NmxSubscriptionMessage::parse_inner(&body).unwrap();
        assert_eq!(msg.records[0].value, Some(MxValue::String(String::new())));
    }

    #[test]
    fn data_update_datetime_framed() {
        // Framed: record_length(4) + filetime(8) + 2 trailer bytes => 14 byte body.
        let file_time: i64 = 132_500_000_000;
        let record_length: i32 = 10;
        let mut payload = Vec::new();
        payload.extend_from_slice(&record_length.to_le_bytes());
        payload.extend_from_slice(&file_time.to_le_bytes());
        payload.extend_from_slice(&[0x00, 0x00]); // trailer
        let rec = data_record(0x00C0, 0, 0x06, &payload);
        let body = data_update_body(1, &rec);
        let msg = NmxSubscriptionMessage::parse_inner(&body).unwrap();
        assert_eq!(msg.records[0].value, Some(MxValue::DateTime(file_time)));
    }

    #[test]
    fn data_update_elapsed_time_negative() {
        // Encode -500ms; expect a signed-preserving round-trip.
        let mut payload = Vec::new();
        payload.extend_from_slice(&(-500i32).to_le_bytes());
        let rec = data_record(0x00C0, 0, 0x07, &payload);
        let body = data_update_body(1, &rec);
        let msg = NmxSubscriptionMessage::parse_inner(&body).unwrap();
        assert_eq!(msg.records[0].value, Some(MxValue::ElapsedTime(-500)));
    }

    #[test]
    fn subscription_status_int32_round_trip() {
        let mut payload = Vec::new();
        payload.extend_from_slice(&7i32.to_le_bytes());
        let rec = status_record(-1, -2, 0x00C0, 0, 0x02, &payload);
        let body = subscription_status_body(1, &rec);
        let msg = NmxSubscriptionMessage::parse_inner(&body).unwrap();
        assert_eq!(msg.command, SUBSCRIPTION_STATUS_COMMAND);
        assert_eq!(msg.records.len(), 1);
        assert_eq!(msg.item_correlation_id.unwrap().0, CORRELATION_ID_BYTES);
        let r = &msg.records[0];
        assert_eq!(r.status, -1);
        assert_eq!(r.detail_status, Some(-2));
        assert_eq!(r.quality, 0x00C0);
        assert_eq!(r.value, Some(MxValue::Int32(7)));
    }

    #[test]
    fn data_update_record_count_zero_hard_errors() {
        // record_count = 0 (or negative) must error: a DataUpdate frame with
        // no records is not meaningful.
        let body0 = data_update_body(0, &[]);
        match NmxSubscriptionMessage::parse_inner(&body0).unwrap_err() {
            CodecError::Decode { offset, reason, .. } => {
                assert_eq!(offset, 3);
                assert!(reason.contains(">= 1"), "unexpected reason: {reason}");
            }
            other => panic!("expected CodecError::Decode, got {other:?}"),
        }

        // Negative record_count also rejected.
        let body_neg = data_update_body(-1, &[]);
        assert!(matches!(
            NmxSubscriptionMessage::parse_inner(&body_neg).unwrap_err(),
            CodecError::Decode { .. }
        ));
    }

    /// F44 evidence: `captures/094-frida-buffered-separate-writer/` line 145
    /// produced a `0x33` DataUpdate with `record_count = 2` against a buffered
    /// subscription on `TestChildObject.TestInt` after a `Write.variantA` from
    /// a separate writer session. The trace truncated record 2's value (the
    /// inner_length envelope field said 61 bytes; the trace dumped 57). This
    /// test reconstructs a complete two-record body using the captured
    /// per-record fields plus a synthesized 4-byte value for record 2 and
    /// asserts the decoder produces two well-formed records. Records carry
    /// status/quality/filetime/value as observed; the synthesized value bytes
    /// are documented in the inline comment so the divergence from the raw
    /// capture is explicit.
    #[test]
    fn data_update_multi_record_round_trip() {
        // Record 1 (verbatim from capture 094 line 145):
        //   status = 3, quality = 0xC0, filetime = 0x01dcd4fc259d1190,
        //   wire_kind = 0x02 (Int32), value = 137 (0x89 0x00 0x00 0x00).
        let rec1 =
            data_record_with_status(3, 0x00C0, 0x01dcd4fc259d1190, 0x02, &137i32.to_le_bytes());
        // Record 2 (header verbatim from capture; value synthesized, because
        // the trace truncated 4 bytes shy of the inner_length envelope field):
        //   status = 4, same quality/filetime/wire_kind. Value
        //   `0x00000000` is a placeholder; the real wire bytes are not in
        //   the capture, so we round-trip a deterministic placeholder rather
        //   than fabricating an observed value.
        let rec2 =
            data_record_with_status(4, 0x00C0, 0x01dcd4fc259d1190, 0x02, &0i32.to_le_bytes());
        let mut combined = Vec::with_capacity(rec1.len() + rec2.len());
        combined.extend_from_slice(&rec1);
        combined.extend_from_slice(&rec2);
        let body = data_update_body(2, &combined);

        let msg = NmxSubscriptionMessage::parse_inner(&body).unwrap();
        assert_eq!(msg.command, DATA_UPDATE_COMMAND);
        assert_eq!(msg.record_count, 2);
        assert!(msg.item_correlation_id.is_none());
        assert_eq!(msg.records.len(), 2);
        assert_eq!(msg.records[0].status, 3);
        assert_eq!(msg.records[0].value, Some(MxValue::Int32(137)));
        assert_eq!(msg.records[0].offset, 23);
        assert_eq!(msg.records[1].status, 4);
        assert_eq!(msg.records[1].value, Some(MxValue::Int32(0)));
        assert_eq!(msg.records[1].offset, 23 + 19);
    }

    /// F44 evidence: feed the verbatim (truncated) capture-094 inner bytes and
    /// assert the decoder neither panics nor reads out of bounds. The trace
    /// dropped 4 bytes from record 2's value (Frida `candidate_size = 107`;
    /// `inner_length` envelope field said 111). The decoder still yields both
    /// records, with record 2's value reported as `None` rather than a
    /// fabricated Int32. Despite the `_errors` suffix in the test name, the
    /// parse itself succeeds.
    #[test]
    fn data_update_capture_094_truncated_record_errors() {
        // 23-byte preamble + 19-byte rec1 + 15-byte rec2 fixed prefix, no value.
        // The hex below is bytes 50..107 of capture 094 line 145 (inner body
        // following the 50-byte outer/envelope wrapping; see
        // `docs/M6-buffered-evidence.md`).
        let inner: [u8; 57] = [
            // command + version + record_count + operation_id (23 bytes)
            0x33, 0x01, 0x00, 0x02, 0x00, 0x00, 0x00, 0x93, 0x8a, 0x8d, 0x18, 0x49, 0x1d, 0x13,
            0x47, 0x86, 0xc1, 0xe2, 0x1d, 0x4f, 0xd7, 0xca, 0x8d,
            // record 1 (19 bytes): status=3, quality=0xc0, filetime, kind=02, value=137
            0x03, 0x00, 0x00, 0x00, 0xc0, 0x00, 0x90, 0x11, 0x9d, 0x25, 0xfc, 0xd4, 0xdc, 0x01,
            0x02, 0x89, 0x00, 0x00, 0x00,
            // record 2 fixed prefix only (15 bytes): status=4, quality, filetime, kind=02
            0x04, 0x00, 0x00, 0x00, 0xc0, 0x00, 0x90, 0x11, 0x9d, 0x25, 0xfc, 0xd4, 0xdc, 0x01,
            0x02,
        ];
        // Per-record min length is 15 bytes, which the trailing fragment exactly
        // satisfies, but the Int32 value (4 more bytes) is missing, so the
        // value decoder yields `(None, 0)` and the record consumes only its
        // 15-byte fixed prefix. The decode succeeds with record 2's value as
        // None, preserving capture fidelity rather than synthesising bytes.
        let msg = NmxSubscriptionMessage::parse_inner(&inner).unwrap();
        assert_eq!(msg.record_count, 2);
        assert_eq!(msg.records.len(), 2);
        assert_eq!(msg.records[0].status, 3);
        assert_eq!(msg.records[0].value, Some(MxValue::Int32(137)));
        assert_eq!(msg.records[1].status, 4);
        assert_eq!(msg.records[1].wire_kind, 0x02);
        // Value is None because the trace truncated 4 bytes shy of a complete
        // Int32; the codec preserves "unknown" rather than fabricating.
        assert_eq!(msg.records[1].value, None);
    }

    #[test]
    fn data_update_has_no_correlation_id() {
        // DataUpdate records start at offset 23; there is no correlation id
        // gap. Verify by feeding a body that *would* be malformed if 16 extra
        // bytes were consumed before the record.
        let rec = data_record(0x00C0, 0, 0x01, &[0x01]);
        let body = data_update_body(1, &rec);
        let msg = NmxSubscriptionMessage::parse_inner(&body).unwrap();
        assert!(msg.item_correlation_id.is_none());
        // First record begins at offset 23, not 39.
        assert_eq!(msg.records[0].offset, 23);
    }

    #[test]
    fn subscription_status_reads_correlation_id() {
        let mut payload = Vec::new();
        payload.extend_from_slice(&0i32.to_le_bytes());
        let rec = status_record(0, 0, 0x00C0, 0, 0x02, &payload);
        let body = subscription_status_body(1, &rec);
        let msg = NmxSubscriptionMessage::parse_inner(&body).unwrap();
        assert_eq!(msg.item_correlation_id.unwrap().0, CORRELATION_ID_BYTES);
        // First record begins at offset 39 (preamble + correlation id).
        assert_eq!(msg.records[0].offset, 39);
    }

    #[test]
    fn boolean_array_minus_one_is_true() {
        // Array header: 4 unknown bytes + count u16 + element_width i32 (=2)
        // + values (count * 2 bytes).
        let count: u16 = 2;
        let element_width: i32 = 2;
        let mut payload = Vec::new();
        payload.extend_from_slice(&[0u8; 4]); // unknown header bytes
        payload.extend_from_slice(&count.to_le_bytes());
        payload.extend_from_slice(&element_width.to_le_bytes());
        // -1 (true) and 0 (false) as i16 LE
        payload.extend_from_slice(&(-1i16).to_le_bytes()); // 0xff 0xff
        payload.extend_from_slice(&0i16.to_le_bytes()); // 0x00 0x00
        let rec = data_record(0x00C0, 0, 0x41, &payload);
        let body = data_update_body(1, &rec);
        let msg = NmxSubscriptionMessage::parse_inner(&body).unwrap();
        assert_eq!(
            msg.records[0].value,
            Some(MxValue::BoolArray(vec![true, false]))
        );
    }

    #[test]
    fn boolean_array_byte_pattern_ff_ff_is_true_00_00_is_false() {
        // Sanity: `[0xff, 0xff]` as i16 LE = -1 (true); `[0x00, 0x00]` = 0 (false).
        let count: u16 = 2;
        let element_width: i32 = 2;
        let mut payload = Vec::new();
        payload.extend_from_slice(&[0u8; 4]);
        payload.extend_from_slice(&count.to_le_bytes());
        payload.extend_from_slice(&element_width.to_le_bytes());
        payload.extend_from_slice(&[0xff, 0xff, 0x00, 0x00]);
        let rec = data_record(0x00C0, 0, 0x41, &payload);
        let body = data_update_body(1, &rec);
        let msg = NmxSubscriptionMessage::parse_inner(&body).unwrap();
        assert_eq!(
            msg.records[0].value,
            Some(MxValue::BoolArray(vec![true, false]))
        );
    }

    #[test]
    fn datetime_array_decodes_0x46() {
        // Element width is 12: 8-byte filetime + 4 bytes of trailing padding.
        let count: u16 = 2;
        let element_width: i32 = 12;
        let mut payload = Vec::new();
        payload.extend_from_slice(&[0u8; 4]);
        payload.extend_from_slice(&count.to_le_bytes());
        payload.extend_from_slice(&element_width.to_le_bytes());
        // Two FILETIMEs plus 4 bytes of trailing padding each.
        payload.extend_from_slice(&132_000_000_000i64.to_le_bytes());
        payload.extend_from_slice(&[0u8; 4]);
        payload.extend_from_slice(&132_500_000_000i64.to_le_bytes());
        payload.extend_from_slice(&[0u8; 4]);
        let rec = data_record(0x00C0, 0, 0x46, &payload);
        let body = data_update_body(1, &rec);
        let msg = NmxSubscriptionMessage::parse_inner(&body).unwrap();
        assert_eq!(
            msg.records[0].value,
            Some(MxValue::DateTimeArray(vec![
                132_000_000_000,
                132_500_000_000
            ]))
        );
    }

    #[test]
    fn unknown_command_is_unexpected_opcode() {
        let mut body = data_update_body(1, &[]);
        body[0] = 0x99;
        let err = NmxSubscriptionMessage::parse_inner(&body).unwrap_err();
        assert!(matches!(err, CodecError::UnexpectedOpcode(0x99)));
    }

    #[test]
    fn short_inner_is_short_read() {
        let err = NmxSubscriptionMessage::parse_inner(&[0u8; 22]).unwrap_err();
        assert!(matches!(err, CodecError::ShortRead { .. }));
    }

    #[test]
    fn subscription_status_short_header_is_short_read() {
        // 23 bytes is the preamble length, but SubscriptionStatus needs 39.
        let mut body = Vec::with_capacity(23);
        body.push(SUBSCRIPTION_STATUS_COMMAND);
        body.extend_from_slice(&1u16.to_le_bytes());
        body.extend_from_slice(&0i32.to_le_bytes());
        body.extend_from_slice(&OPERATION_ID_BYTES);
        let err = NmxSubscriptionMessage::parse_inner(&body).unwrap_err();
        assert!(matches!(err, CodecError::ShortRead { .. }));
    }

    #[test]
    fn wire_kind_to_value_kind_table() {
        assert_eq!(wire_kind_to_value_kind(0x01), Some(MxValueKind::Boolean));
        assert_eq!(
            wire_kind_to_value_kind(0x07),
            Some(MxValueKind::ElapsedTime)
        );
        assert_eq!(
            wire_kind_to_value_kind(0x46),
            Some(MxValueKind::DateTimeArray)
        );
        assert_eq!(wire_kind_to_value_kind(0x99), None);
    }

    #[test]
    fn command_constants_match_dotnet() {
        // NmxSubscriptionMessage.cs:36-37
        assert_eq!(SUBSCRIPTION_STATUS_COMMAND, 0x32);
        assert_eq!(DATA_UPDATE_COMMAND, 0x33);
    }
}