diff --git a/design/followups.md b/design/followups.md index fdcbd19..81c074d 100644 --- a/design/followups.md +++ b/design/followups.md @@ -102,8 +102,33 @@ Between each publish: wait for the crate to be indexed before the next one's `ca **Resolves when:** the lint is on and the workspace doc build is warning-clean with it. -### F56 — Buffered subscribe completes RegisterReference but receives no `0x33` DataUpdate frames -**Severity:** P1 — blocks F49 step 1 (F36 buffered live verification) and any consumer relying on `Session::subscribe_buffered` to surface value changes. +### F56 — `subscribe` / `subscribe_buffered` complete on the wire but never receive `0x33` DataUpdate frames +**Status:** Diagnosed 2026-05-06 as a **test-fixture issue, not a Rust port bug**. The .NET reference's own `MxNativeClient.Probe --probe-session-subscribe --tag=TestChildObject.TestInt` returns a single `0x32` SubscriptionStatus with `status=3 detail=3 quality=0x00C0 (Uncertain) value=null` and zero `0x33` DataUpdates — same observation as the Rust port's `subscribe` / `subscribe_buffered` paths. The engine on this Galaxy install does not have a live value for `TestChildObject.TestInt`; nothing is scanning that attribute, so there are no value-changes for the engine to dispatch. F49 steps 1-3 need either (a) a different test tag with active scanning, or (b) configuring the local Galaxy to scan TestChildObject.TestInt before live verification can pass. + +Real codec fixes still landed in this session (envelope-peeling for `NmxSubscriptionMessage` + `0x11` registration-result path + split-form RegisterReference body + per-session item_handle counter); they were necessary preconditions for F49 step 1 even if the test fixture blocks the actual pass criterion. + +**Severity:** P1 — blocks F49 step 1 (F36 buffered live verification), F49 step 2 (F45 recovery replay), and ALL consumers relying on subscription data flow on this Galaxy. + +**Updated 2026-05-06.** Initial diagnosis suspected a buffered-specific wire-body gap; ruled out: +- Wire body proven byte-identical to the .NET reference's by `crates/mxaccess-codec/tests/buffered_register_reference_parity.rs` (which forward-builds the message from `Session::subscribe_buffered`'s inputs and compares against `captures/082-frida-add-buffered-plain-advise-testint/`). +- Test now uses real Galaxy DB metadata via `mxaccess_galaxy::SqlTagResolver` (engine_id=2, attribute_id=155, etc.) instead of the hardcoded StaticResolver shim. +- Item-handle, item_definition, item_context all switched to the .NET-reference split form (handle=1 + per-session counter, definition=".property(buffer)", context=""). + +**Plain subscribe also fails.** Added `crates/mxaccess-compat/tests/plain_subscribe_live.rs` driving `Session::subscribe` (NOT buffered). Same symptom: `AdviseSupervisory` returns HRESULT 0, the engine acks every write with a 51-byte op-status frame, but no `0x33` DataUpdate ever arrives. So this is **not buffered-specific** — the entire inbound DataUpdate path is silent on this machine. + +**Likely revised root cause:** +- The engine generates `0x33` DataUpdate frames into a different transport channel than the one our DCOM sink listens on. The .NET reference's `INmxSvcCallback` has two opnums — `DataReceivedRaw` (3) and `StatusReceivedRaw` (4). We only ever observe opnum=3 callbacks. If the engine routes data updates through a different IID or different DCOM stub on this install, our sink never sees them. +- Alternatively, the engine on this Galaxy install is configured such that local Object scanning is disabled / the deployed objects aren't actively producing value-change events. The `OnWriteComplete` round-trip works (proves write-path + callback-path); a passive subscription doesn't produce updates if no source changes the value. + +**Action items (for whoever picks F56 up):** +1. Compare the **C# DcomCallbackSink** (`src/MxNativeClient/NmxCallbackSink.cs`) to the Rust port's `mxaccess-callback::dcom_sink` — verify it implements **only** `INmxSvcCallback` and that the IID + vtable layout match. There may be a third method or a sibling interface (e.g. `INmxSvcCallback2`) that the engine also calls into for high-cadence DataUpdate dispatch. +2. Try the same live test against a tag that has known active scanning (e.g. a bound-to-PLC InputSource attribute) — rule out static-UDA hypothesis. +3. Run `MxNativeClient.Probe --probe-session-subscribe --tag=TestChildObject.TestInt --subscribe-hold-seconds=30` (the .NET reference's working live probe) and confirm `0x33` DataUpdates fire on THIS machine. If they do, capture the wire bytes via Frida and diff against the Rust port's exact body. + +**What landed in this session (real router/codec fixes, NOT F56-resolving):** +- `NmxSubscriptionMessage::try_parse_process_data_received_body` — peels the `ProcessDataReceived` envelope before calling `parse_inner`. The router previously called `parse_inner` directly on wire bytes, which would have silently dropped any `0x33` even if one arrived. +- `NmxReferenceRegistrationResultMessage::try_parse_process_data_received_body` + router branch — drops `0x11` registration-result frames cleanly instead of logging "unexpected opcode 0x11". +- `Session::subscribe_buffered_nmx` — split-form (object, attribute) wire body + per-session monotonic `item_handle` counter (mirrors `MxNativeCompatibilityServer.AddBufferedItemAsync`'s `_nextItemHandle++`). **Source:** F49 step 1 live attempt 2026-05-06. Test `cargo test -p mxaccess-compat --features live-windows-com --test buffered_subscribe_live -- --ignored --nocapture` (added in this session) connects via `Session::connect_nmx_auto` (F55-proven), issues `subscribe_buffered(TestChildObject.TestInt, 1000ms)` against the live engine, and runs a background writer at 500ms cadence. RegisterReference returns HRESULT 0; the engine then fires: - One 46-byte heartbeat envelope (header-only, empty inner) - One 51-byte op-status frame for the `RegisterReference` completion diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 560d03b..d913b38 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -603,6 +603,7 @@ dependencies = [ "async-trait", "futures-util", "mxaccess", + "mxaccess-galaxy", "mxaccess-rpc", "thiserror 2.0.18", "tokio", diff --git a/rust/crates/mxaccess-compat/Cargo.toml b/rust/crates/mxaccess-compat/Cargo.toml index fe375b4..1ff40c6 100644 --- a/rust/crates/mxaccess-compat/Cargo.toml +++ b/rust/crates/mxaccess-compat/Cargo.toml @@ -19,6 +19,13 @@ thiserror = { workspace = true } tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread", "sync", "time"] } async-trait = { workspace = true } mxaccess-rpc = { path = "../mxaccess-rpc", version = "0.0.0" } +# F56 — buffered subscribe live test needs real Galaxy DB metadata +# (engine_id / platform_id / object_id / attribute_id from +# `dbo.gobject` etc.); the StaticResolver shim used by lmx_write_live +# was hardcoded to platform_id=1 / engine_id=2 which the engine +# silently accepts for writes but doesn't dispatch DataUpdate frames +# against. The buffered live test resolves real IDs via SqlTagResolver. +mxaccess-galaxy = { path = "../mxaccess-galaxy", version = "0.0.0", features = ["galaxy-resolver"] } # Live tests use tracing-subscriber to dump router/dcom_sink trace # events on demand (set RUST_LOG=mxaccess=trace,mxaccess_callback=trace). tracing = { workspace = true } diff --git a/rust/crates/mxaccess-compat/tests/buffered_subscribe_live.rs b/rust/crates/mxaccess-compat/tests/buffered_subscribe_live.rs index ca164f3..3e17329 100644 --- a/rust/crates/mxaccess-compat/tests/buffered_subscribe_live.rs +++ b/rust/crates/mxaccess-compat/tests/buffered_subscribe_live.rs @@ -32,56 +32,10 @@ mod live { use std::time::{Duration, Instant}; use futures_util::StreamExt; - use mxaccess::{ - BufferedOptions, GalaxyTagMetadata, MxValue, RecoveryPolicy, Resolver, ResolverError, - Session, SessionOptions, - }; + use mxaccess::{BufferedOptions, MxValue, RecoveryPolicy, Session, SessionOptions}; + use mxaccess_galaxy::SqlTagResolver; use mxaccess_rpc::ntlm::NtlmClientContext; - struct StaticResolver { - tag_reference: String, - metadata: GalaxyTagMetadata, - } - - impl StaticResolver { - fn new(tag_reference: &str) -> Self { - let (object, attribute) = tag_reference - .split_once('.') - .unwrap_or((tag_reference, "TestInt")); - Self { - tag_reference: tag_reference.to_string(), - metadata: GalaxyTagMetadata { - object_tag_name: object.to_string(), - attribute_name: attribute.to_string(), - primitive_name: None, - platform_id: 1, - engine_id: 2, - object_id: 3, - primitive_id: 0, - attribute_id: 7, - property_id: GalaxyTagMetadata::VALUE_PROPERTY_ID, - mx_data_type: 2, - is_array: false, - security_classification: 0, - attribute_source: "dynamic".into(), - }, - } - } - } - - #[async_trait::async_trait] - impl Resolver for StaticResolver { - async fn resolve(&self, tag: &str) -> Result { - if tag == self.tag_reference { - Ok(self.metadata.clone()) - } else { - Err(ResolverError::NotFound { - tag_reference: tag.to_string(), - }) - } - } - } - fn ntlm_from_test_env() -> NtlmClientContext { let user = std::env::var("MX_TEST_USER").expect("MX_TEST_USER"); let password = std::env::var("MX_TEST_PASSWORD").expect("MX_TEST_PASSWORD"); @@ -111,11 +65,42 @@ mod live { .with_test_writer() .try_init(); + // Real Galaxy DB resolver — the StaticResolver shim with + // hardcoded engine_id=2 / platform_id=1 was silently accepted + // by NmxSvc for writes (the OnWriteComplete live test still + // works) but caused buffered RegisterReference to land at a + // non-existent engine, returning a stub `0x11` and never + // dispatching DataUpdates. F56 root cause. + let galaxy_db = std::env::var("MX_GALAXY_DB") + .expect("MX_GALAXY_DB (set via tools/Setup-LiveProbeEnv.ps1)"); + let resolver = Arc::new( + SqlTagResolver::from_ado_string(&galaxy_db).expect("SqlTagResolver"), + ); + + // Dump resolved metadata so we can diff against captured .NET wire bytes. + { + use mxaccess_galaxy::Resolver as _; + let m = resolver.resolve(&tag).await.expect("resolve test tag"); + eprintln!( + "resolved {tag}: object_tag={:?} attribute={:?} primitive={:?} platform={} engine={} object={} attribute_id={} property_id={} mx_type={} is_array={}", + m.object_tag_name, + m.attribute_name, + m.primitive_name, + m.platform_id, + m.engine_id, + m.object_id, + m.attribute_id, + m.property_id, + m.mx_data_type, + m.is_array, + ); + } + eprintln!("connecting via Session::connect_nmx_auto"); let session = Session::connect_nmx_auto( ntlm_from_test_env, SessionOptions::default(), - Arc::new(StaticResolver::new(&tag)), + resolver, RecoveryPolicy::default(), ) .await diff --git a/rust/crates/mxaccess-compat/tests/plain_subscribe_live.rs b/rust/crates/mxaccess-compat/tests/plain_subscribe_live.rs new file mode 100644 index 0000000..d803969 --- /dev/null +++ b/rust/crates/mxaccess-compat/tests/plain_subscribe_live.rs @@ -0,0 +1,125 @@ +//! Plain (non-buffered) subscribe live diagnostic for F49 / F56. +//! +//! Mirror of `buffered_subscribe_live.rs` but invokes +//! `Session::subscribe` instead of `subscribe_buffered`. Used to +//! isolate whether F56's "no DataUpdate" symptom is buffered-specific +//! (only `subscribe_buffered` broken) or affects all subscribe paths. + +#![allow( + clippy::unwrap_used, + clippy::expect_used, + clippy::indexing_slicing, + clippy::panic +)] + +#[cfg(all(windows, feature = "live-windows-com"))] +mod live { + use std::sync::Arc; + use std::time::{Duration, Instant}; + + use futures_util::StreamExt; + use mxaccess::{MxValue, RecoveryPolicy, Session, SessionOptions}; + use mxaccess_galaxy::SqlTagResolver; + use mxaccess_rpc::ntlm::NtlmClientContext; + + fn ntlm_from_test_env() -> NtlmClientContext { + let user = std::env::var("MX_TEST_USER").expect("MX_TEST_USER"); + let password = std::env::var("MX_TEST_PASSWORD").expect("MX_TEST_PASSWORD"); + let domain = std::env::var("MX_TEST_DOMAIN").unwrap_or_default(); + let hostname = std::env::var("COMPUTERNAME").unwrap_or_default(); + NtlmClientContext::new(&user, &password, &domain, Some(&hostname)) + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + #[ignore] + async fn plain_subscribe_yields_updates() { + if std::env::var_os("MX_LIVE").is_none() { + eprintln!("MX_LIVE not set — skipping live test"); + return; + } + let tag = std::env::var("MX_TEST_TAG") + .unwrap_or_else(|_| "TestChildObject.TestInt".to_string()); + + let _ = tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), + ) + .with_test_writer() + .try_init(); + + let galaxy_db = std::env::var("MX_GALAXY_DB").expect("MX_GALAXY_DB"); + let resolver = Arc::new( + SqlTagResolver::from_ado_string(&galaxy_db).expect("SqlTagResolver"), + ); + + let session = Session::connect_nmx_auto( + ntlm_from_test_env, + SessionOptions::default(), + resolver, + RecoveryPolicy::default(), + ) + .await + .expect("connect_nmx_auto"); + eprintln!("session connected"); + + let mut sub = session.subscribe(&tag).await.expect("subscribe"); + eprintln!("plain subscribe correlation_id = {:02x?}", sub.correlation_id()); + + // Background writer to force value changes. + let deadline = Instant::now() + Duration::from_secs(20); + let writer_session = session.clone(); + let writer_tag = tag.clone(); + let writer_stop = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let writer_stop_clone = writer_stop.clone(); + let writer = tokio::spawn(async move { + let mut value: i32 = 2_000; + while !writer_stop_clone.load(std::sync::atomic::Ordering::Acquire) { + if writer_session + .write(&writer_tag, MxValue::Int32(value)) + .await + .is_err() + { + break; + } + value = value.wrapping_add(1); + tokio::time::sleep(Duration::from_millis(500)).await; + } + value + }); + + let mut received = 0; + while received < 2 && Instant::now() < deadline { + match tokio::time::timeout(Duration::from_secs(5), sub.next()).await { + Ok(Some(Ok(dc))) => { + eprintln!("[{received}] {} = {:?} ts={:?}", dc.reference, dc.value, dc.timestamp); + received += 1; + } + Ok(Some(Err(e))) => { + writer_stop.store(true, std::sync::atomic::Ordering::Release); + let _ = writer.await; + panic!("subscription error: {e}"); + } + Ok(None) => break, + Err(_) => eprintln!("5s gap waiting for next update"), + } + } + writer_stop.store(true, std::sync::atomic::Ordering::Release); + let _ = writer.await; + + assert!(received >= 1, "no DataChange arrived for plain subscribe"); + eprintln!("received {received} updates via plain subscribe"); + + session.unsubscribe(sub).await.expect("unsubscribe"); + session.shutdown_nmx().await.expect("shutdown"); + } +} + +#[cfg(not(all(windows, feature = "live-windows-com")))] +mod live { + #[test] + #[ignore] + fn plain_subscribe_yields_updates() { + eprintln!("test skipped: requires Windows + live-windows-com feature"); + } +} diff --git a/rust/crates/mxaccess-galaxy/src/sql_resolver.rs b/rust/crates/mxaccess-galaxy/src/sql_resolver.rs index 1db566c..3f8392d 100644 --- a/rust/crates/mxaccess-galaxy/src/sql_resolver.rs +++ b/rust/crates/mxaccess-galaxy/src/sql_resolver.rs @@ -35,8 +35,11 @@ //! (`GalaxyRepositoryTagResolver.cs:93-95`). The Galaxy DB is not //! request-pooled in the .NET shape either — tag resolution happens once //! per session bring-up, not on the data-plane hot path. - -#![cfg(feature = "galaxy-resolver")] +//! +//! The crate-level `#[cfg(feature = "galaxy-resolver")]` gate sits on the +//! `pub mod sql_resolver` declaration in `lib.rs`, so the inner-attribute +//! form here would just duplicate that and trip +//! `clippy::duplicated_attributes`. use std::sync::OnceLock; diff --git a/rust/crates/mxaccess/src/session.rs b/rust/crates/mxaccess/src/session.rs index d31811c..09527d6 100644 --- a/rust/crates/mxaccess/src/session.rs +++ b/rust/crates/mxaccess/src/session.rs @@ -614,6 +614,18 @@ pub struct SessionInner { /// dictionaries (`MxNativeSession.cs` field-level comments) plus /// the ordered list those dictionaries are consulted against. pub(crate) pending_ops: Arc>, + /// F56 — monotonically-increasing item-handle assigner for + /// `subscribe_buffered`. The .NET reference's + /// `MxNativeCompatibilityServer.AddItem` flow assigns `1, 2, 3, ...` + /// at the LMX layer (`MxNativeCompatibilityServer.cs`) and threads + /// the handle through to the `RegisterReference` wire body. + /// Sending `0` (the previous behaviour) caused the engine to + /// silently swallow the buffered subscription — RegisterReference + /// returned HRESULT 0 + a `0x11` registration result fired, but no + /// `0x33` DataUpdate frames followed. Starting at 1 mirrors the + /// .NET LMX behaviour captured at + /// `captures/094-frida-buffered-separate-writer/frida-events.tsv:13`. + pub(crate) next_item_handle: std::sync::atomic::AtomicI32, /// F55 / Path A — keeps the DCOM-managed `INmxSvcCallback`'s /// `IUnknown` ref alive for the session's lifetime. The marshalled /// OBJREF passed to `RegisterEngine2` references this object's @@ -841,15 +853,34 @@ pub(crate) async fn callback_router( // we currently just consume + drop the frame at trace // level so the catch-all parse below doesn't log a // spurious "unexpected opcode 0x11" warning. - if let Ok(result) = - NmxReferenceRegistrationResultMessage::try_parse_process_data_received_body(&body) - { - tracing::trace!( - item_handle = result.item_handle, - correlation_id = ?result.item_correlation_id, - "callback_router: 0x11 RegistrationResult received" - ); - continue; + match NmxReferenceRegistrationResultMessage::try_parse_process_data_received_body( + &body, + ) { + Ok(result) => { + tracing::trace!( + item_handle = result.item_handle, + correlation_id = ?result.item_correlation_id, + item_definition = %result.item_definition, + item_context = %result.item_context, + status_category = result.status_category, + status_detail = result.status_detail, + "callback_router: 0x11 RegistrationResult received" + ); + continue; + } + Err(e) => { + let hex: String = body + .iter() + .map(|b| format!("{b:02x}")) + .collect::>() + .join(" "); + tracing::trace!( + err = %e, + body_len = body.len(), + body_hex = %hex, + "callback_router: not a 0x11 RegistrationResult, falling through" + ); + } } // 3. Fall through to subscription messages. Wire bytes @@ -1107,6 +1138,7 @@ impl Session { callback_obj_ref, rebuild_factory: Mutex::new(None), pending_ops, + next_item_handle: std::sync::atomic::AtomicI32::new(1), #[cfg(all(windows, feature = "windows-com"))] dcom_sink_holder: Mutex::new(dcom_sink_holder), }), @@ -1935,25 +1967,36 @@ impl Session { .map_err(map_resolver)?; let correlation_id: [u8; 16] = rand::random(); - // Build the buffered RegisterReference body. Item definition is - // the full reference suffixed with `.property(buffer)`; item - // context is empty for this single-string form (the .NET - // reference's split-context form is reachable via the - // compat-server layer F35 once it lands). The codec helper - // rejects empty/whitespace inputs with `CodecError::InvalidName`. - let item_definition = NmxReferenceRegistrationMessage::to_buffered_item_definition( - reference, - ) - .map_err(|e| { - Error::Configuration(ConfigError::InvalidArgument { - detail: format!("buffered item definition: {e}"), - }) - })?; + // F56 — build the buffered RegisterReference body in the split + // (object, attribute) form the .NET reference uses on the wire: + // item_definition = ".property(buffer)" + // item_context = "" + // item_handle = sequential per-session counter starting at 1 + // + // The previous implementation used the single-string form (full + // reference in `item_definition`, empty `item_context`, + // `item_handle = 0`). RegisterReference returned HRESULT 0 and + // the engine fired a `0x11` registration result, but **no + // `0x33` DataUpdate frames ever followed** — confirmed live + // 2026-05-06. Switching to the split form mirrors the captured + // .NET wire bytes at + // `captures/094-frida-buffered-separate-writer/frida-events.tsv:23` + // (the PutRequest body at 21:40:19.970). + let item_handle = inner + .next_item_handle + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let attribute_only_definition = + NmxReferenceRegistrationMessage::to_buffered_item_definition(&metadata.attribute_name) + .map_err(|e| { + Error::Configuration(ConfigError::InvalidArgument { + detail: format!("buffered item definition: {e}"), + }) + })?; let registration = NmxReferenceRegistrationMessage { - item_handle: 0, + item_handle, item_correlation_id: correlation_id, - item_definition, - item_context: String::new(), + item_definition: attribute_only_definition, + item_context: metadata.object_tag_name.clone(), subscribe: true, reserved_25_27: [0; 2], reserved_31_55: [0; 24], @@ -1996,9 +2039,13 @@ impl Session { metadata: Arc::clone(&metadata_arc), mode: SubscriptionMode::Buffered { rounded_interval_ms: rounded_ms, - item_definition: reference.to_string(), - item_context: String::new(), - item_handle: 0, + // F56 — recovery replays via `register_reference` and + // must reissue the same wire body. Save the split + // (object, attribute, handle) triple, NOT the + // pre-F56 single-string form. + item_definition: registration.item_definition.clone(), + item_context: registration.item_context.clone(), + item_handle, }, }, ); @@ -2554,6 +2601,7 @@ mod tests { callback_obj_ref: Vec::new(), rebuild_factory: Mutex::new(None), pending_ops, + next_item_handle: std::sync::atomic::AtomicI32::new(1), #[cfg(all(windows, feature = "windows-com"))] dcom_sink_holder: Mutex::new(None), }),