[F49 step 1 + F56] callback router: peel envelope before parsing subscription / 0x11 frames

The router used to call NmxSubscriptionMessage::parse_inner directly
on the COM-stub-delivered body, but the wire bytes arrive wrapped in
a ProcessDataReceived envelope (46-byte header + optional 4-byte
length prefix); parse_inner expects post-envelope bytes. Result:
every 0x33 DataUpdate that ever arrived was silently dropped.

Mirrors the .NET reference's MxNativeSession.OnCallbackReceived flow
at cs:582-606 — three sequential parse attempts:
  1. NmxOperationStatusMessage::try_parse_process_data_received_body  (already wired)
  2. NmxReferenceRegistrationResultMessage::try_parse_...              (NEW — was missing)
  3. NmxSubscriptionMessage::try_parse_process_data_received_body      (NEW — was wrong)

Adds:
- NmxSubscriptionMessage::try_parse_process_data_received_body — peels
  envelope via NmxObservedEnvelope::parse_process_data_received_body_flexible,
  then dispatches to existing parse_inner.
- NmxReferenceRegistrationResultMessage::try_parse_process_data_received_body —
  same shape, for the 0x11 registration-result frame.
- Router branch for 0x11 — currently traces the assigned item_handle and
  drops the frame (matches the .NET reference, which fires a
  ReferenceRegistrationReceived event with no consumer in the codebase).
- Router fall-through trace! when neither path matches, so future
  unparseable bodies surface in RUST_LOG=trace instead of vanishing.
- DcomCallbackSink::forward — trace! per inbound callback so
  RUST_LOG=mxaccess_callback=trace surfaces opnum + size.
- crates/mxaccess-compat/tests/buffered_subscribe_live.rs — F49 step 1
  live test that drives subscribe_buffered + a 500ms-cadence writer.
  Also pulls tracing-subscriber as a dev-dep so the test can dump
  router activity.

Existing router_task_decodes_callback_invoked_into_broadcast unit test
updated to wrap its synthetic 0x32 body in an envelope so the new
parse path actually accepts it.

Live result: F56 — the buffered round-trip *registers* successfully
(RegisterReference returns HRESULT 0; engine sends one 0x11
RegistrationResult + one 51-byte op-status per write, perfectly
clocked) but the engine never sends a 0x33 DataUpdate. Rust-port-
specific gap vs the .NET reference's working buffered path; root
cause is likely a field-level difference in the RegisterReference
body or a missing post-RegisterReference step. Captured as F56 in
design/followups.md, blocking F49 step 1; F56's DoD is the same
live test reporting >=3 DataChange arrivals.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-06 09:50:57 -04:00
parent 2fc327a8d5
commit af15fe7587
8 changed files with 477 additions and 24 deletions
+74 -23
View File
@@ -46,8 +46,8 @@ use mxaccess_callback::ExporterIdentities;
#[cfg(all(windows, feature = "windows-com"))]
use mxaccess_rpc::com_objref_provider::IUnknownHolder;
use mxaccess_codec::{
MxStatus, NmxOperationStatusMessage, NmxReferenceRegistrationMessage, NmxSubscriptionMessage,
NmxSubscriptionRecord,
MxStatus, NmxOperationStatusMessage, NmxReferenceRegistrationMessage,
NmxReferenceRegistrationResultMessage, NmxSubscriptionMessage, NmxSubscriptionRecord,
};
use mxaccess_galaxy::{GalaxyTagMetadata, Resolver, ResolverError};
use mxaccess_nmx::{NmxClient, NmxClientError, WriteValue};
@@ -831,15 +831,53 @@ pub(crate) async fn callback_router(
continue;
}
// 2. Fall through to subscription messages — same 23-byte
// preamble + records as `NmxSubscriptionMessage::parse_inner`
// expects. Parse failures are silent (no consumer) since
// the .NET reference also fires `UnparsedCallbackReceived`
// events separately and we don't model that yet.
if let Ok(msg) = NmxSubscriptionMessage::parse_inner(&body) {
// `send` returns `Err(SendError)` only when there are zero
// receivers — that's fine for this wire path; nothing to do.
let _ = callback_tx.send(Arc::new(msg));
// 2. Try `0x11` reference-registration result. NmxSvc
// sends one of these after `RegisterReference` to
// convey the assigned `item_handle` + the engine's
// decoded item definition / context. Mirrors
// `MxNativeSession.OnCallbackReceived:582-588`. The
// .NET reference fires a `ReferenceRegistrationReceived`
// event but no consumer in the codebase reacts to it;
// we currently just consume + drop the frame at trace
// level so the catch-all parse below doesn't log a
// spurious "unexpected opcode 0x11" warning.
if let Ok(result) =
NmxReferenceRegistrationResultMessage::try_parse_process_data_received_body(&body)
{
tracing::trace!(
item_handle = result.item_handle,
correlation_id = ?result.item_correlation_id,
"callback_router: 0x11 RegistrationResult received"
);
continue;
}
// 3. Fall through to subscription messages. Wire bytes
// arrive wrapped in a `ProcessDataReceived` envelope (46-byte
// header, optionally with a 4-byte length prefix); the
// 23-byte subscription preamble starts after that.
// Mirrors `MxNativeSession.OnCallbackReceived:593` which
// calls `NmxSubscriptionMessage.ParseProcessDataReceivedBody`.
// The earlier code called `parse_inner` directly on the
// wire bytes, which silently swallowed every DataUpdate
// because the bytes failed the 23-byte preamble check.
// Parse failures are still silent (no consumer) — the
// .NET reference fires `UnparsedCallbackReceived` events
// separately and we don't model that yet.
match NmxSubscriptionMessage::try_parse_process_data_received_body(&body) {
Ok(msg) => {
// `send` returns `Err(SendError)` only when there
// are zero receivers — that's fine for this wire
// path; nothing to do.
let _ = callback_tx.send(Arc::new(msg));
}
Err(e) => {
tracing::trace!(
err = %e,
body_len = body.len(),
"callback_router: dropping unparseable callback body"
);
}
}
}
}
@@ -2961,18 +2999,31 @@ mod tests {
pending_ops,
));
// Build a minimal valid 0x32 SubscriptionStatus body: 23-byte
// preamble + 16-byte item_correlation_id, record_count=0 so no
// records follow. Total: 39 bytes. Using 0x32 (not 0x33)
// because DataUpdate always attempts to parse one record
// regardless of record_count, and we'd need a full 38-byte
// record body to satisfy that parser.
let mut body = vec![0u8; 39];
body[0] = 0x32;
body[1..3].copy_from_slice(&1u16.to_le_bytes()); // version
body[3..7].copy_from_slice(&0i32.to_le_bytes()); // record_count
body[7..23].copy_from_slice(&[0xEFu8; 16]); // operation_id
body[23..39].copy_from_slice(&[0xCDu8; 16]); // item_correlation_id
// Build a minimal valid 0x32 SubscriptionStatus body wrapped
// in a `ProcessDataReceived` envelope (header-only form, no
// 4-byte total-length prefix): 46-byte header + 39-byte inner.
// The header's `inner_length` at offset 2 is `inner_len + 4`
// (.NET cs:54-56 — declared length includes the size-of-int).
// Using 0x32 (not 0x33) because DataUpdate always attempts
// to parse one record regardless of record_count, and we'd
// need a full 38-byte record body to satisfy that parser.
const HEADER_LEN: usize = 46;
const INNER_LEN_OFFSET: usize = 2;
let inner_len = 39usize;
let mut body = vec![0u8; HEADER_LEN + inner_len];
// Inner-length declaration (at INNER_LEN_OFFSET = 2). Flexible
// (header-only) form compares `declared == body.len() - HEADER_LEN`
// verbatim — no `-4` adjustment (`observed_frame.rs:178`); the
// adjustment only applies on the strict path where there's a
// 4-byte total-length prefix in front.
let declared = inner_len as i32;
body[INNER_LEN_OFFSET..INNER_LEN_OFFSET + 4].copy_from_slice(&declared.to_le_bytes());
let inner = &mut body[HEADER_LEN..];
inner[0] = 0x32;
inner[1..3].copy_from_slice(&1u16.to_le_bytes()); // version
inner[3..7].copy_from_slice(&0i32.to_le_bytes()); // record_count
inner[7..23].copy_from_slice(&[0xEFu8; 16]); // operation_id
inner[23..39].copy_from_slice(&[0xCDu8; 16]); // item_correlation_id
let event = CallbackEvent::CallbackInvoked { opnum: 4, body };
event_tx.send(event).unwrap();