diff --git a/design/followups.md b/design/followups.md index 06e02b0..fdcbd19 100644 --- a/design/followups.md +++ b/design/followups.md @@ -41,6 +41,8 @@ Between each publish: wait for the crate to be indexed before the next one's `ca **Step 5 unblocked 2026-05-06 by F55 / Path A.** `cargo test -p mxaccess-compat --features live-windows-com --test lmx_write_complete_live -- --ignored --nocapture` passes against the live AVEVA install: RegisterEngine2 OK, write round-trips, OnWriteComplete fires with the expected `WriteCompleteEvent { server_handle, item_handle, statuses, is_during_recovery }` shape. Steps 1-4 still pending. +**Step 1 attempted 2026-05-06 — blocked by F56.** Added `crates/mxaccess-compat/tests/buffered_subscribe_live.rs` driving `Session::subscribe_buffered` via `Session::connect_nmx_auto`. RegisterReference completes successfully against the live engine, but no `0x33` DataUpdate frames are ever received — only op-status frames per write and the `0x11` registration-result. The Rust port's wire frame for RegisterReference must differ from the .NET reference's in some field. The codec router was also fixed during this attempt (envelope-peeling for `NmxSubscriptionMessage` and the `0x11` `NmxReferenceRegistrationResultMessage` path were both missing); those are real bugs that would have hidden any DataUpdate had one arrived. F56 captures the open work. + **Definition of done:** 1. Per-feature evidence summary in `docs/M6-live-verification.md` (one paragraph per feature with the wire-trace excerpt or metrics-exporter snapshot). 2. If any feature fails live: file a sub-followup with the captured failure and link it from the evidence doc. @@ -100,6 +102,29 @@ Between each publish: wait for the crate to be indexed before the next one's `ca **Resolves when:** the lint is on and the workspace doc build is warning-clean with it. +### F56 — Buffered subscribe completes RegisterReference but receives no `0x33` DataUpdate frames +**Severity:** P1 — blocks F49 step 1 (F36 buffered live verification) and any consumer relying on `Session::subscribe_buffered` to surface value changes. +**Source:** F49 step 1 live attempt 2026-05-06. Test `cargo test -p mxaccess-compat --features live-windows-com --test buffered_subscribe_live -- --ignored --nocapture` (added in this session) connects via `Session::connect_nmx_auto` (F55-proven), issues `subscribe_buffered(TestChildObject.TestInt, 1000ms)` against the live engine, and runs a background writer at 500ms cadence. RegisterReference returns HRESULT 0; the engine then fires: +- One 46-byte heartbeat envelope (header-only, empty inner) +- One 51-byte op-status frame for the `RegisterReference` completion +- One 87-byte `0x11` `NmxReferenceRegistrationResultMessage` carrying the assigned `item_handle` +- One 51-byte op-status frame **per write** (60 frames over 30s — perfectly clocked to the writer cadence) + +But **zero `0x33` `DataUpdate` frames** ever arrive — verified end-to-end via `RUST_LOG=trace mxaccess_callback=trace`. The .NET reference's `MxNativeSession.SubscribeBufferedAsync` does deliver DataUpdates against the same engine + same tag (per F36 wave 1 evidence at `captures/094-frida-buffered-separate-writer/`), so this is a Rust-port-specific gap. + +**Likely causes (in priority order):** +1. The `NmxReferenceRegistrationMessage` body the Rust port sends differs in some field from the .NET reference's. Specifically: `subscribe: true` is set, but other fields (e.g. `item_handle = 0`, `reserved_*`, `source_galaxy_id`) may need different values to trigger DataUpdate dispatch. **Action**: capture the wire bytes from the Rust port's RegisterReference and diff against `captures/094-frida-buffered-separate-writer/` per-byte. +2. Some additional client-side step is required after RegisterReference — e.g. an ACK of the `0x11` registration result via the assigned `item_handle`, or a separate RPC the .NET reference dispatches that we miss. The F36 wave 1 evidence said no `SetBufferedUpdateInterval` is sent, but there may be another op. **Action**: capture .NET reference's outbound calls during `subscribe-buffered` end-to-end and compare to ours. +3. The `0x11` registration-result body might carry a status code we should be checking (see `NmxReferenceRegistrationResultMessage::status_category` / `status_detail`). If non-zero, the engine may have rejected the subscription silently. **Action**: log the parsed `0x11` body and check the status fields. + +**What's already wired (this session):** `NmxSubscriptionMessage::try_parse_process_data_received_body` (envelope-peeling helper) was added — the previous router called `parse_inner` directly on wire bytes and would have silently dropped any `0x33` that did arrive. This was a real bug fix; without it F56 would have stayed invisible. Same for `NmxReferenceRegistrationResultMessage::try_parse_process_data_received_body` + the `0x11` path in the router. + +**Does not affect:** `Session::write` round-trip (proven by F55 live test); plain `Session::subscribe` (not yet live-tested but uses `AdviseSupervisory` not `RegisterReference`). + +**Definition of done:** F49 step 1 passes — `cargo test -p mxaccess-compat --features live-windows-com --test buffered_subscribe_live -- --ignored --nocapture` reports at least 3 `DataChange` arrivals at the configured cadence, with monotonically-increasing values matching the writer. + +**Resolves when:** the missing field / step / status check is identified, the fix lands in `Session::subscribe_buffered_nmx` (or upstream), and the live test passes. + ### F55 — Hand-rolled callback exporter rejected by `RegisterEngine2` on this AVEVA install **Status:** Resolved 2026-05-06 by Path A (DCOM-managed `INmxSvcCallback` sink in `mxaccess-callback::dcom_sink`, wired into `Session::from_nmx_client` behind the `windows-com` feature). Live test `cargo test -p mxaccess-compat --features live-windows-com --test lmx_write_complete_live -- --ignored --nocapture` passes end-to-end: RegisterEngine2 succeeds, write round-trips, OnWriteComplete fires with status from the wire. The hand-rolled `CallbackExporter` is retained for unit tests that exercise the exporter against an in-process fake NMX peer. **Severity:** P1 — blocks F49 live verification of every M6 feature that needs an `Engine` registered (i.e. all of them). diff --git a/rust/Cargo.lock b/rust/Cargo.lock index b2043d0..560d03b 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -19,6 +19,15 @@ dependencies = [ "cpufeatures", ] +[[package]] +name = "aho-corasick" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301" +dependencies = [ + "memchr", +] + [[package]] name = "async-trait" version = "0.1.89" @@ -421,6 +430,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + [[package]] name = "libc" version = "0.2.186" @@ -433,6 +448,15 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +[[package]] +name = "matchers" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" +dependencies = [ + "regex-automata", +] + [[package]] name = "md-5" version = "0.10.6" @@ -583,6 +607,8 @@ dependencies = [ "thiserror 2.0.18", "tokio", "tokio-stream", + "tracing", + "tracing-subscriber", ] [[package]] @@ -628,6 +654,15 @@ dependencies = [ "windows", ] +[[package]] +name = "nu-ansi-term" +version = "0.50.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "num-bigint" version = "0.4.6" @@ -812,6 +847,23 @@ dependencies = [ "cipher 0.5.1", ] +[[package]] +name = "regex-automata" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e1dd4122fc1595e8162618945476892eefca7b88c52820e74af6262213cae8f" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" + [[package]] name = "ring" version = "0.17.14" @@ -939,6 +991,15 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" @@ -957,6 +1018,12 @@ version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c790de23124f9ab44544d7ac05d60440adc586479ce501c1d6d7da3cd8c9cf5" +[[package]] +name = "smallvec" +version = "1.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" + [[package]] name = "socket2" version = "0.6.3" @@ -1024,6 +1091,15 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + [[package]] name = "tiberius" version = "0.12.3" @@ -1144,6 +1220,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex-automata", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", ] [[package]] @@ -1174,6 +1280,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + [[package]] name = "version_check" version = "0.9.5" diff --git a/rust/crates/mxaccess-callback/src/dcom_sink.rs b/rust/crates/mxaccess-callback/src/dcom_sink.rs index 50a6116..bc216be 100644 --- a/rust/crates/mxaccess-callback/src/dcom_sink.rs +++ b/rust/crates/mxaccess-callback/src/dcom_sink.rs @@ -33,7 +33,7 @@ use std::ptr; use tokio::sync::mpsc; -use tracing::{debug, warn}; +use tracing::{debug, trace, warn}; use windows::Win32::System::Com::Marshal::CoMarshalInterface; use windows::Win32::System::Com::StructuredStorage::{ CreateStreamOnHGlobal, GetHGlobalFromStream, @@ -118,6 +118,12 @@ impl DcomCallbackSink { // the slice is read-only. We copy out before returning. unsafe { std::slice::from_raw_parts(buffer, buffer_size as usize) }.to_vec() }; + trace!( + opnum, + buffer_size, + body_len = body.len(), + "DcomCallbackSink: forwarding inbound callback" + ); if let Err(e) = self.event_tx.send(CallbackEvent::CallbackInvoked { opnum, body }) { // The receiver was dropped (the upstream router // probably exited). NmxSvc keeps calling us until diff --git a/rust/crates/mxaccess-codec/src/reference_registration.rs b/rust/crates/mxaccess-codec/src/reference_registration.rs index 25736ec..cfafb26 100644 --- a/rust/crates/mxaccess-codec/src/reference_registration.rs +++ b/rust/crates/mxaccess-codec/src/reference_registration.rs @@ -572,6 +572,25 @@ impl NmxReferenceRegistrationResultMessage { }) } + /// Peel the `ProcessDataReceived` envelope and parse the inner + /// `0x11` registration-result body. Mirrors + /// `NmxReferenceRegistrationResultMessage.TryParseProcessDataReceivedBody` + /// (the wire-side path used by `MxNativeSession.OnCallbackReceived` + /// at `cs:582`). + /// + /// # Errors + /// + /// - [`CodecError::ShortRead`] / [`CodecError::InnerLengthMismatch`] + /// surfaced from the envelope parse. + /// - Any error from [`Self::parse`] on the inner body — including + /// [`CodecError::UnexpectedOpcode`] when the inner body's first + /// byte isn't `0x11` (use this as a discriminator for "this body + /// isn't a registration-result frame"). + pub fn try_parse_process_data_received_body(body: &[u8]) -> Result { + let envelope = crate::NmxObservedEnvelope::parse_process_data_received_body_flexible(body)?; + Self::parse(&envelope.inner_body) + } + /// Encode the result body. The .NET reference does not provide an /// `Encode` (the result is server-emitted); the Rust port supplies one /// for round-trip testing and for synthetic-server use cases. The diff --git a/rust/crates/mxaccess-codec/src/subscription_message.rs b/rust/crates/mxaccess-codec/src/subscription_message.rs index f74c024..893d312 100644 --- a/rust/crates/mxaccess-codec/src/subscription_message.rs +++ b/rust/crates/mxaccess-codec/src/subscription_message.rs @@ -215,6 +215,29 @@ impl NmxSubscriptionMessage { _ => Err(CodecError::UnexpectedOpcode(command)), } } + + /// Peel the `ProcessDataReceived` envelope and parse the inner + /// subscription body. Mirrors the .NET reference's + /// `NmxSubscriptionMessage.ParseProcessDataReceivedBody` + /// (the wire-side path used by `MxNativeSession.OnCallbackReceived` + /// at `cs:593`). + /// + /// Inbound NMX callbacks arrive as a wire envelope (46-byte header, + /// optionally with a 4-byte total-length prefix), inside which sits + /// the 23-byte preamble + records body that + /// [`Self::parse_inner`] knows how to decode. Calling `parse_inner` + /// directly on the wire bytes — which the router used to do — would + /// fail because the first 46 bytes are envelope, not preamble. + /// + /// # Errors + /// + /// - [`CodecError::ShortRead`] / [`CodecError::InnerLengthMismatch`] + /// surfaced from the envelope parse. + /// - Any error from [`Self::parse_inner`] on the inner body. + pub fn try_parse_process_data_received_body(body: &[u8]) -> Result { + let envelope = crate::NmxObservedEnvelope::parse_process_data_received_body_flexible(body)?; + Self::parse_inner(&envelope.inner_body) + } } /// `0x33` DataUpdate. Mirrors `NmxSubscriptionMessage.ParseDataUpdate` diff --git a/rust/crates/mxaccess-compat/Cargo.toml b/rust/crates/mxaccess-compat/Cargo.toml index abc4555..fe375b4 100644 --- a/rust/crates/mxaccess-compat/Cargo.toml +++ b/rust/crates/mxaccess-compat/Cargo.toml @@ -19,6 +19,10 @@ thiserror = { workspace = true } tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread", "sync", "time"] } async-trait = { workspace = true } mxaccess-rpc = { path = "../mxaccess-rpc", version = "0.0.0" } +# Live tests use tracing-subscriber to dump router/dcom_sink trace +# events on demand (set RUST_LOG=mxaccess=trace,mxaccess_callback=trace). +tracing = { workspace = true } +tracing-subscriber = { version = "0.3", features = ["env-filter"] } [features] default = [] diff --git a/rust/crates/mxaccess-compat/tests/buffered_subscribe_live.rs b/rust/crates/mxaccess-compat/tests/buffered_subscribe_live.rs new file mode 100644 index 0000000..ca164f3 --- /dev/null +++ b/rust/crates/mxaccess-compat/tests/buffered_subscribe_live.rs @@ -0,0 +1,213 @@ +//! Live verification of F36 — buffered subscribe (`Session::subscribe_buffered`) +//! round-trips against AVEVA and yields `DataChange`s at the requested cadence. +//! +//! F49 step 1. Asserts the structural property of F36 (single +//! `RegisterReference` with `.property(buffer)` suffix, no separate +//! `AdviseSupervisory` follow-up, no `SetBufferedUpdateInterval` RPC) +//! is preserved end-to-end. The structural piece is unit-tested +//! exhaustively in `crates/mxaccess/src/session.rs` (search +//! `subscribe_buffered_nmx`); this test confirms the wire round-trip +//! actually delivers updates. +//! +//! Gated on `MX_LIVE` env + `live-windows-com` feature. Uses +//! `Session::connect_nmx_auto` (F55-proven path). +//! +//! Run with: +//! ```text +//! cd rust +//! cargo test -p mxaccess-compat --features live-windows-com \ +//! --test buffered_subscribe_live -- --ignored --nocapture +//! ``` + +#![allow( + clippy::unwrap_used, + clippy::expect_used, + clippy::indexing_slicing, + clippy::panic +)] + +#[cfg(all(windows, feature = "live-windows-com"))] +mod live { + use std::sync::Arc; + use std::time::{Duration, Instant}; + + use futures_util::StreamExt; + use mxaccess::{ + BufferedOptions, GalaxyTagMetadata, MxValue, RecoveryPolicy, Resolver, ResolverError, + Session, SessionOptions, + }; + use mxaccess_rpc::ntlm::NtlmClientContext; + + struct StaticResolver { + tag_reference: String, + metadata: GalaxyTagMetadata, + } + + impl StaticResolver { + fn new(tag_reference: &str) -> Self { + let (object, attribute) = tag_reference + .split_once('.') + .unwrap_or((tag_reference, "TestInt")); + Self { + tag_reference: tag_reference.to_string(), + metadata: GalaxyTagMetadata { + object_tag_name: object.to_string(), + attribute_name: attribute.to_string(), + primitive_name: None, + platform_id: 1, + engine_id: 2, + object_id: 3, + primitive_id: 0, + attribute_id: 7, + property_id: GalaxyTagMetadata::VALUE_PROPERTY_ID, + mx_data_type: 2, + is_array: false, + security_classification: 0, + attribute_source: "dynamic".into(), + }, + } + } + } + + #[async_trait::async_trait] + impl Resolver for StaticResolver { + async fn resolve(&self, tag: &str) -> Result { + if tag == self.tag_reference { + Ok(self.metadata.clone()) + } else { + Err(ResolverError::NotFound { + tag_reference: tag.to_string(), + }) + } + } + } + + fn ntlm_from_test_env() -> NtlmClientContext { + let user = std::env::var("MX_TEST_USER").expect("MX_TEST_USER"); + let password = std::env::var("MX_TEST_PASSWORD").expect("MX_TEST_PASSWORD"); + let domain = std::env::var("MX_TEST_DOMAIN").unwrap_or_default(); + let hostname = std::env::var("COMPUTERNAME").unwrap_or_default(); + NtlmClientContext::new(&user, &password, &domain, Some(&hostname)) + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + #[ignore] + async fn buffered_subscribe_yields_updates() { + if std::env::var_os("MX_LIVE").is_none() { + eprintln!("MX_LIVE not set — skipping live test"); + return; + } + let tag = std::env::var("MX_TEST_TAG") + .unwrap_or_else(|_| "TestChildObject.TestInt".to_string()); + + // Initialise tracing so RUST_LOG=trace surfaces dcom_sink + + // router events (set by the caller). Init may fail if a + // subscriber is already installed — ignore the result. + let _ = tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), + ) + .with_test_writer() + .try_init(); + + eprintln!("connecting via Session::connect_nmx_auto"); + let session = Session::connect_nmx_auto( + ntlm_from_test_env, + SessionOptions::default(), + Arc::new(StaticResolver::new(&tag)), + RecoveryPolicy::default(), + ) + .await + .expect("connect_nmx_auto"); + eprintln!("session connected"); + + // 1s cadence. Mirrors the `subscribe-buffered` example. + let opts = BufferedOptions { + update_interval_ms: 1_000, + }; + eprintln!( + "buffered-subscribing to {} (requested cadence {} ms, rounded to {} ms)", + tag, + opts.update_interval_ms, + opts.rounded_update_interval_ms() + ); + let mut sub = session + .subscribe_buffered(&tag, opts) + .await + .expect("subscribe_buffered"); + eprintln!("correlation_id = {:02x?}", sub.correlation_id()); + + // Buffered cadence is delivery-only — the engine pushes at the + // configured interval but only when the value has changed. + // Spawn a background writer that bumps the tag every 500ms so + // the engine always has a fresh value to deliver at the next + // cadence boundary. 30s drain window. + let deadline = Instant::now() + Duration::from_secs(30); + let writer_session = session.clone(); + let writer_tag = tag.clone(); + let writer_stop = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let writer_stop_clone = writer_stop.clone(); + let writer = tokio::spawn(async move { + let mut value: i32 = 1_000; + while !writer_stop_clone.load(std::sync::atomic::Ordering::Acquire) { + if let Err(e) = writer_session + .write(&writer_tag, MxValue::Int32(value)) + .await + { + eprintln!("writer: write({value}) failed: {e}"); + break; + } + value = value.wrapping_add(1); + tokio::time::sleep(Duration::from_millis(500)).await; + } + value + }); + + let mut received = 0; + let mut last_ts = None; + while received < 3 && Instant::now() < deadline { + match tokio::time::timeout(Duration::from_secs(5), sub.next()).await { + Ok(Some(Ok(dc))) => { + eprintln!( + "[{received}] {} = {:?} ts={:?}", + dc.reference, dc.value, dc.timestamp + ); + received += 1; + last_ts = Some(dc.timestamp); + } + Ok(Some(Err(e))) => { + writer_stop.store(true, std::sync::atomic::Ordering::Release); + let _ = writer.await; + panic!("subscription error: {e}"); + } + Ok(None) => break, + Err(_) => { + eprintln!("5s gap waiting for next update"); + } + } + } + writer_stop.store(true, std::sync::atomic::Ordering::Release); + let last_value = writer.await.unwrap_or(-1); + eprintln!("writer stopped after value {last_value}"); + + assert!( + received >= 1, + "no DataChange arrived within 15s — buffered subscribe didn't round-trip" + ); + eprintln!("received {received} updates; last ts = {last_ts:?}"); + + session.unsubscribe(sub).await.expect("unsubscribe"); + session.shutdown_nmx().await.expect("shutdown"); + eprintln!("clean shutdown"); + } +} + +#[cfg(not(all(windows, feature = "live-windows-com")))] +mod live { + #[test] + #[ignore] + fn buffered_subscribe_yields_updates() { + eprintln!("test skipped: requires Windows + live-windows-com feature"); + } +} diff --git a/rust/crates/mxaccess/src/session.rs b/rust/crates/mxaccess/src/session.rs index 39e96d8..d31811c 100644 --- a/rust/crates/mxaccess/src/session.rs +++ b/rust/crates/mxaccess/src/session.rs @@ -46,8 +46,8 @@ use mxaccess_callback::ExporterIdentities; #[cfg(all(windows, feature = "windows-com"))] use mxaccess_rpc::com_objref_provider::IUnknownHolder; use mxaccess_codec::{ - MxStatus, NmxOperationStatusMessage, NmxReferenceRegistrationMessage, NmxSubscriptionMessage, - NmxSubscriptionRecord, + MxStatus, NmxOperationStatusMessage, NmxReferenceRegistrationMessage, + NmxReferenceRegistrationResultMessage, NmxSubscriptionMessage, NmxSubscriptionRecord, }; use mxaccess_galaxy::{GalaxyTagMetadata, Resolver, ResolverError}; use mxaccess_nmx::{NmxClient, NmxClientError, WriteValue}; @@ -831,15 +831,53 @@ pub(crate) async fn callback_router( continue; } - // 2. Fall through to subscription messages — same 23-byte - // preamble + records as `NmxSubscriptionMessage::parse_inner` - // expects. Parse failures are silent (no consumer) since - // the .NET reference also fires `UnparsedCallbackReceived` - // events separately and we don't model that yet. - if let Ok(msg) = NmxSubscriptionMessage::parse_inner(&body) { - // `send` returns `Err(SendError)` only when there are zero - // receivers — that's fine for this wire path; nothing to do. - let _ = callback_tx.send(Arc::new(msg)); + // 2. Try `0x11` reference-registration result. NmxSvc + // sends one of these after `RegisterReference` to + // convey the assigned `item_handle` + the engine's + // decoded item definition / context. Mirrors + // `MxNativeSession.OnCallbackReceived:582-588`. The + // .NET reference fires a `ReferenceRegistrationReceived` + // event but no consumer in the codebase reacts to it; + // we currently just consume + drop the frame at trace + // level so the catch-all parse below doesn't log a + // spurious "unexpected opcode 0x11" warning. + if let Ok(result) = + NmxReferenceRegistrationResultMessage::try_parse_process_data_received_body(&body) + { + tracing::trace!( + item_handle = result.item_handle, + correlation_id = ?result.item_correlation_id, + "callback_router: 0x11 RegistrationResult received" + ); + continue; + } + + // 3. Fall through to subscription messages. Wire bytes + // arrive wrapped in a `ProcessDataReceived` envelope (46-byte + // header, optionally with a 4-byte length prefix); the + // 23-byte subscription preamble starts after that. + // Mirrors `MxNativeSession.OnCallbackReceived:593` which + // calls `NmxSubscriptionMessage.ParseProcessDataReceivedBody`. + // The earlier code called `parse_inner` directly on the + // wire bytes, which silently swallowed every DataUpdate + // because the bytes failed the 23-byte preamble check. + // Parse failures are still silent (no consumer) — the + // .NET reference fires `UnparsedCallbackReceived` events + // separately and we don't model that yet. + match NmxSubscriptionMessage::try_parse_process_data_received_body(&body) { + Ok(msg) => { + // `send` returns `Err(SendError)` only when there + // are zero receivers — that's fine for this wire + // path; nothing to do. + let _ = callback_tx.send(Arc::new(msg)); + } + Err(e) => { + tracing::trace!( + err = %e, + body_len = body.len(), + "callback_router: dropping unparseable callback body" + ); + } } } } @@ -2961,18 +2999,31 @@ mod tests { pending_ops, )); - // Build a minimal valid 0x32 SubscriptionStatus body: 23-byte - // preamble + 16-byte item_correlation_id, record_count=0 so no - // records follow. Total: 39 bytes. Using 0x32 (not 0x33) - // because DataUpdate always attempts to parse one record - // regardless of record_count, and we'd need a full 38-byte - // record body to satisfy that parser. - let mut body = vec![0u8; 39]; - body[0] = 0x32; - body[1..3].copy_from_slice(&1u16.to_le_bytes()); // version - body[3..7].copy_from_slice(&0i32.to_le_bytes()); // record_count - body[7..23].copy_from_slice(&[0xEFu8; 16]); // operation_id - body[23..39].copy_from_slice(&[0xCDu8; 16]); // item_correlation_id + // Build a minimal valid 0x32 SubscriptionStatus body wrapped + // in a `ProcessDataReceived` envelope (header-only form, no + // 4-byte total-length prefix): 46-byte header + 39-byte inner. + // The header's `inner_length` at offset 2 is `inner_len + 4` + // (.NET cs:54-56 — declared length includes the size-of-int). + // Using 0x32 (not 0x33) because DataUpdate always attempts + // to parse one record regardless of record_count, and we'd + // need a full 38-byte record body to satisfy that parser. + const HEADER_LEN: usize = 46; + const INNER_LEN_OFFSET: usize = 2; + let inner_len = 39usize; + let mut body = vec![0u8; HEADER_LEN + inner_len]; + // Inner-length declaration (at INNER_LEN_OFFSET = 2). Flexible + // (header-only) form compares `declared == body.len() - HEADER_LEN` + // verbatim — no `-4` adjustment (`observed_frame.rs:178`); the + // adjustment only applies on the strict path where there's a + // 4-byte total-length prefix in front. + let declared = inner_len as i32; + body[INNER_LEN_OFFSET..INNER_LEN_OFFSET + 4].copy_from_slice(&declared.to_le_bytes()); + let inner = &mut body[HEADER_LEN..]; + inner[0] = 0x32; + inner[1..3].copy_from_slice(&1u16.to_le_bytes()); // version + inner[3..7].copy_from_slice(&0i32.to_le_bytes()); // record_count + inner[7..23].copy_from_slice(&[0xEFu8; 16]); // operation_id + inner[23..39].copy_from_slice(&[0xCDu8; 16]); // item_correlation_id let event = CallbackEvent::CallbackInvoked { opnum: 4, body }; event_tx.send(event).unwrap();