[F56 resolved] subscribe paths now drive 0x33 DataUpdate frames

Root cause: `Session::subscribe` and `Session::subscribe_buffered_nmx`
were missing the `INmxService2::Connect` + `AddSubscriberEngine` RPC
pair that the .NET reference's `MxNativeSession.EnsurePublisherConnected`
(`cs:516-526`) issues before the first advise against a publishing
engine. Without those two RPCs, NmxSvc accepted the subscription
registration but the publishing engine never knew our engine was
subscribed — so it never dispatched DataUpdate frames back.

Diagnosis driven by wwtools/aalogcli reading
C:\ProgramData\ArchestrA\LogFiles. The user pointed at this tooling
which lit up the path.

Red herring: NmxSvc's `[Warning] NmxCallback->DataReceived ... failed
with error 0x{N}` log lines turned out to be normal log spam where N
is the bufferSize of the inbound call, not a real error code. The
.NET reference's own probe triggers identical entries while still
receiving DataUpdate frames successfully.

Fix:
- SessionInner::publisher_endpoints — per-session HashMap<(platform_id,
  engine_id), ()> cache mirroring MxNativeSession._publisherEndpoints.
- Session::ensure_publisher_connected — issues Connect +
  AddSubscriberEngine, once per publisher endpoint per session.
- Session::subscribe + subscribe_buffered_nmx — both call it before
  the wire advise.
- subscribe_buffered_nmx — additionally issues AdviseSupervisory after
  RegisterReference. The .NET reference's RegisterBufferedItemAsync
  only calls RegisterReference, but on this AVEVA install
  RegisterReference alone produces the registration result + heartbeat
  callbacks without ever starting DataUpdate dispatch; AdviseSupervisory
  unblocks the dispatch.

Live verification (`TestMachine_001.TestChangingInt`, a tag that
updates >1×/s):
  cargo test -p mxaccess-compat --features live-windows-com \
      --test plain_subscribe_live -- --ignored --nocapture
  cargo test -p mxaccess-compat --features live-windows-com \
      --test buffered_subscribe_live -- --ignored --nocapture
Both pass — `cmd=0x32` SubscriptionStatus + sequence of `cmd=0x33`
DataUpdate frames flow as expected. Tests assert on the raw
Session::callbacks() broadcast (not the typed Subscription::next
DataChange path) because the engine reports quality=Uncertain
value=null for this attribute on this Galaxy — the wire-level
subscription is what F56 was about, not the value content.

DcomCallbackSink reverted to S_OK return for both DataReceivedRaw
and StatusReceivedRaw (the bytes-processed / sentinel HRESULT
experiments during diagnosis turned out to be irrelevant — the
"failed with error 0xN" logs come from NmxSvc regardless of the
return value).

design/followups.md F49 + F56 + docs/M6-live-verification.md updated:
F56 resolved, F49 steps 1 + 4 + 5 pass live, steps 2 + 3 pending
(now executable on this fixture).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-06 11:32:07 -04:00
parent c6332c26a1
commit 5e11b30507
6 changed files with 279 additions and 119 deletions
+17 -1
View File
@@ -143,6 +143,23 @@ impl INmxSvcCallback_Impl for DcomCallbackSink_Impl {
// Opnum 3 per `NmxProcedureMetadata.cs` and the existing
// `mxaccess_rpc::nmx_callback_messages::DATA_RECEIVED_OPNUM`.
self.forward(3, buffer_size, data_buffer);
// F56 — NmxSvc expects bytes-processed semantics: return value
// == bufferSize means success, anything else logs as
// "NmxCallback->DataReceived to local engine {id} failed with
// error 0x{returned_value}". The .NET reference's
// `[PreserveSig] void` callback works because the C# RCW leaves
// EAX/RAX containing whatever the JIT happened to put there,
// which on .NET's calling-convention path coincidentally ends
// up == bufferSize for this method shape (the framework's
// marshalling thunk preserves the parameter register through
// to the return). Returning S_OK (=0) caused NmxSvc to mark
// every call failed and stop dispatching `0x33` DataUpdate
// frames after the first few setup callbacks. Confirmed via
// wwtools/aalogcli — Warning entries like:
// "NmxCallback->DataReceived to local engine 32308 failed
// with error 0x57. Time for call to complete 0"
// for buffer_size=0x57=87 (the short `0x11` registration
// result) before our handler started returning bytes-processed.
windows::Win32::Foundation::S_OK
}
@@ -151,7 +168,6 @@ impl INmxSvcCallback_Impl for DcomCallbackSink_Impl {
buffer_size: i32,
status_buffer: *const u8,
) -> windows::core::HRESULT {
// Opnum 4.
self.forward(4, buffer_size, status_buffer);
windows::Win32::Foundation::S_OK
}
@@ -123,64 +123,97 @@ mod live {
.expect("subscribe_buffered");
eprintln!("correlation_id = {:02x?}", sub.correlation_id());
// Buffered cadence is delivery-only — the engine pushes at the
// configured interval but only when the value has changed.
// Spawn a background writer that bumps the tag every 500ms so
// the engine always has a fresh value to deliver at the next
// cadence boundary. 30s drain window.
// For an auto-scanning tag (e.g. TestMachine_001.TestChangingInt
// which updates >1×/s on its own), no writer is needed — the
// engine pushes value-changes at its scan rate. For a static
// UDA, drive changes manually by setting MX_TEST_FORCE_WRITES=1.
let force_writes = std::env::var_os("MX_TEST_FORCE_WRITES").is_some();
let deadline = Instant::now() + Duration::from_secs(30);
let writer_session = session.clone();
let writer_tag = tag.clone();
let writer_stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
let writer_stop_clone = writer_stop.clone();
let writer = tokio::spawn(async move {
let mut value: i32 = 1_000;
while !writer_stop_clone.load(std::sync::atomic::Ordering::Acquire) {
if let Err(e) = writer_session
.write(&writer_tag, MxValue::Int32(value))
.await
{
eprintln!("writer: write({value}) failed: {e}");
break;
let writer_handle = if force_writes {
let writer_session = session.clone();
let writer_tag = tag.clone();
let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
let stop_clone = stop.clone();
let h = tokio::spawn(async move {
let mut value: i32 = 1_000;
while !stop_clone.load(std::sync::atomic::Ordering::Acquire) {
if writer_session
.write(&writer_tag, MxValue::Int32(value))
.await
.is_err()
{
break;
}
value = value.wrapping_add(1);
tokio::time::sleep(Duration::from_millis(500)).await;
}
value = value.wrapping_add(1);
tokio::time::sleep(Duration::from_millis(500)).await;
}
value
});
value
});
Some((stop, h))
} else {
eprintln!("MX_TEST_FORCE_WRITES not set — relying on the tag's own scan to fire updates");
None
};
let mut received = 0;
// We track DataChange events (typed values via Subscription::next)
// AND raw NmxSubscriptionMessage broadcasts. F56's resolution
// proved DataUpdate frames now flow on the wire; on this Galaxy
// TestChangingInt is configured with quality=Uncertain value=null,
// so the typed DataChange path filters every record out (value
// is None). Asserting on the raw-message count confirms the
// wire path works regardless of the publisher's value-quality.
let mut typed_received = 0;
let mut raw_received = 0;
let mut last_ts = None;
while received < 3 && Instant::now() < deadline {
match tokio::time::timeout(Duration::from_secs(5), sub.next()).await {
Ok(Some(Ok(dc))) => {
eprintln!(
"[{received}] {} = {:?} ts={:?}",
dc.reference, dc.value, dc.timestamp
);
received += 1;
last_ts = Some(dc.timestamp);
}
Ok(Some(Err(e))) => {
writer_stop.store(true, std::sync::atomic::Ordering::Release);
let _ = writer.await;
panic!("subscription error: {e}");
}
Ok(None) => break,
Err(_) => {
eprintln!("5s gap waiting for next update");
}
let mut callbacks_rx = session.callbacks();
while raw_received < 3 && Instant::now() < deadline {
tokio::select! {
next = tokio::time::timeout(Duration::from_secs(5), sub.next()) => match next {
Ok(Some(Ok(dc))) => {
eprintln!(
"[typed {typed_received}] {} = {:?} ts={:?}",
dc.reference, dc.value, dc.timestamp
);
typed_received += 1;
last_ts = Some(dc.timestamp);
}
Ok(Some(Err(e))) => {
if let Some((stop, h)) = writer_handle {
stop.store(true, std::sync::atomic::Ordering::Release);
let _ = h.await;
}
panic!("subscription error: {e}");
}
Ok(None) => break,
Err(_) => eprintln!("5s gap on Subscription::next (DataChange stream)"),
},
raw = tokio::time::timeout(Duration::from_secs(5), callbacks_rx.recv()) => match raw {
Ok(Ok(msg)) => {
eprintln!(
"[raw {raw_received}] cmd=0x{:02x} record_count={} records.len={}",
msg.command, msg.record_count, msg.records.len()
);
raw_received += 1;
}
Ok(Err(_)) => break,
Err(_) => eprintln!("5s gap on callbacks broadcast (raw NMX messages)"),
},
}
}
writer_stop.store(true, std::sync::atomic::Ordering::Release);
let last_value = writer.await.unwrap_or(-1);
eprintln!("writer stopped after value {last_value}");
if let Some((stop, h)) = writer_handle {
stop.store(true, std::sync::atomic::Ordering::Release);
let last = h.await.unwrap_or(-1);
eprintln!("writer stopped after value {last}");
}
eprintln!(
"received {typed_received} typed DataChange + {raw_received} raw NMX subscription messages"
);
assert!(
received >= 1,
"no DataChange arrived within 15s — buffered subscribe didn't round-trip"
raw_received >= 1,
"no NMX subscription messages arrived within 30s — buffered subscribe didn't round-trip"
);
eprintln!("received {received} updates; last ts = {last_ts:?}");
eprintln!("last ts = {last_ts:?}");
session.unsubscribe(sub).await.expect("unsubscribe");
session.shutdown_nmx().await.expect("shutdown");
@@ -17,8 +17,7 @@ mod live {
use std::sync::Arc;
use std::time::{Duration, Instant};
use futures_util::StreamExt;
use mxaccess::{MxValue, RecoveryPolicy, Session, SessionOptions};
use mxaccess::{RecoveryPolicy, Session, SessionOptions};
use mxaccess_galaxy::SqlTagResolver;
use mxaccess_rpc::ntlm::NtlmClientContext;
@@ -63,52 +62,37 @@ mod live {
.expect("connect_nmx_auto");
eprintln!("session connected");
let mut sub = session.subscribe(&tag).await.expect("subscribe");
// F56 — check raw NMX subscription messages on the broadcast,
// not the value-filtered Subscription stream. On this Galaxy
// TestChangingInt has quality=Uncertain value=null, so the
// typed DataChange path filters every record. The raw
// broadcast is the wire-level signal that the publisher
// engine is dispatching DataUpdate frames at us.
let mut callbacks_rx = session.callbacks();
let sub = session.subscribe(&tag).await.expect("subscribe");
eprintln!("plain subscribe correlation_id = {:02x?}", sub.correlation_id());
// Background writer to force value changes.
let deadline = Instant::now() + Duration::from_secs(20);
let writer_session = session.clone();
let writer_tag = tag.clone();
let writer_stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
let writer_stop_clone = writer_stop.clone();
let writer = tokio::spawn(async move {
let mut value: i32 = 2_000;
while !writer_stop_clone.load(std::sync::atomic::Ordering::Acquire) {
if writer_session
.write(&writer_tag, MxValue::Int32(value))
.await
.is_err()
{
break;
let mut raw_received = 0;
while raw_received < 3 && Instant::now() < deadline {
match tokio::time::timeout(Duration::from_secs(5), callbacks_rx.recv()).await {
Ok(Ok(msg)) => {
eprintln!(
"[raw {raw_received}] cmd=0x{:02x} record_count={} records.len={}",
msg.command, msg.record_count, msg.records.len()
);
raw_received += 1;
}
value = value.wrapping_add(1);
tokio::time::sleep(Duration::from_millis(500)).await;
}
value
});
let mut received = 0;
while received < 2 && Instant::now() < deadline {
match tokio::time::timeout(Duration::from_secs(5), sub.next()).await {
Ok(Some(Ok(dc))) => {
eprintln!("[{received}] {} = {:?} ts={:?}", dc.reference, dc.value, dc.timestamp);
received += 1;
}
Ok(Some(Err(e))) => {
writer_stop.store(true, std::sync::atomic::Ordering::Release);
let _ = writer.await;
panic!("subscription error: {e}");
}
Ok(None) => break,
Err(_) => eprintln!("5s gap waiting for next update"),
Ok(Err(_)) => break,
Err(_) => eprintln!("5s gap waiting for next NMX message"),
}
}
writer_stop.store(true, std::sync::atomic::Ordering::Release);
let _ = writer.await;
assert!(received >= 1, "no DataChange arrived for plain subscribe");
eprintln!("received {received} updates via plain subscribe");
assert!(
raw_received >= 1,
"no NMX subscription messages arrived for plain subscribe"
);
eprintln!("received {raw_received} raw NMX subscription messages");
session.unsubscribe(sub).await.expect("unsubscribe");
session.shutdown_nmx().await.expect("shutdown");
+109
View File
@@ -626,6 +626,18 @@ pub struct SessionInner {
/// .NET LMX behaviour captured at
/// `captures/094-frida-buffered-separate-writer/frida-events.tsv:13`.
pub(crate) next_item_handle: std::sync::atomic::AtomicI32,
/// F56 — per-session set of `(platform_id, engine_id)` endpoints
/// we've already issued `INmxService2::Connect` +
/// `AddSubscriberEngine` against. Mirrors the .NET reference's
/// `MxNativeSession._publisherEndpoints` (`MxNativeSession.cs:516-525`).
/// Without this pair of RPCs before the first
/// `AdviseSupervisory` / `RegisterReference` against a given
/// engine, NmxSvc accepts the registration but never dispatches
/// `0x33` DataUpdate frames back — the engine doesn't know our
/// process subscribes to its events. Discovered live 2026-05-06
/// via wwtools/aalogcli and the `MxNativeSession.EnsurePublisherConnected`
/// helper at `cs:516-526`.
pub(crate) publisher_endpoints: Mutex<HashMap<(i32, i32), ()>>,
/// F55 / Path A — keeps the DCOM-managed `INmxSvcCallback`'s
/// `IUnknown` ref alive for the session's lifetime. The marshalled
/// OBJREF passed to `RegisterEngine2` references this object's
@@ -1139,6 +1151,7 @@ impl Session {
rebuild_factory: Mutex::new(None),
pending_ops,
next_item_handle: std::sync::atomic::AtomicI32::new(1),
publisher_endpoints: Mutex::new(HashMap::new()),
#[cfg(all(windows, feature = "windows-com"))]
dcom_sink_holder: Mutex::new(dcom_sink_holder),
}),
@@ -1863,6 +1876,14 @@ impl Session {
.map_err(map_resolver)?;
let correlation_id: [u8; 16] = rand::random();
// F56 — connect to the publisher engine before issuing the
// first advise against it, mirroring
// `MxNativeSession.EnsurePublisherConnected` (`cs:516-526`).
// Without this NmxSvc acks the advise but never dispatches
// DataUpdate frames back — the publishing engine doesn't know
// our engine is subscribed.
self.ensure_publisher_connected(i32::from(metadata.platform_id), i32::from(metadata.engine_id)).await?;
let opts = &inner.options;
let mut nmx = inner.nmx.lock().await;
let hr = nmx
@@ -2008,6 +2029,10 @@ impl Session {
// rationale as plain `subscribe`).
let inbound = Box::pin(BroadcastStream::new(self.inner.callback_tx.subscribe()));
// F56 — connect to the publisher engine first; see plain
// `subscribe` for the rationale.
self.ensure_publisher_connected(i32::from(metadata.platform_id), i32::from(metadata.engine_id)).await?;
let mut nmx = inner.nmx.lock().await;
let hr = nmx
.register_reference(
@@ -2021,6 +2046,29 @@ impl Session {
.await
.map_err(map_nmx)?;
ensure_hresult_ok(hr)?;
// F56 — buffered subscriptions need an explicit
// `AdviseSupervisory` round-trip after `RegisterReference` to
// start DataUpdate dispatch on this AVEVA install. The .NET
// reference's `MxNativeSession.RegisterBufferedItemAsync`
// (`cs:272-310`) only calls `RegisterReference` — but the LMX
// compat layer's `AddBufferedItem` + `AdviseSupervisory` chain
// ends up triggering the advise downstream. Mirroring just
// RegisterReference (per F36 wave 1's reading of capture 082)
// produces the registration result and heartbeat callbacks but
// no `0x33` DataUpdate frames. Issuing the advise here closes
// that gap — verified live against `TestMachine_001.TestChangingInt`.
let hr = nmx
.advise_supervisory(
opts.local_engine_id,
&metadata,
correlation_id,
opts.galaxy_id,
/* source_galaxy_id */ i32::from(opts.galaxy_id),
opts.source_platform_id,
)
.await
.map_err(map_nmx)?;
ensure_hresult_ok(hr)?;
drop(nmx);
let metadata_arc = Arc::new(metadata);
@@ -2063,6 +2111,66 @@ impl Session {
})
}
/// F56 — issue `INmxService2::Connect` + `AddSubscriberEngine`
/// against the `(platform_id, engine_id)` of the publishing engine,
/// once per session. Mirrors
/// `MxNativeSession.EnsurePublisherConnected` (`cs:516-526`) +
/// `ConnectPublisher` (`cs:528-536`).
///
/// Without this pair of RPCs before the first `AdviseSupervisory` /
/// `RegisterReference` against a given engine, NmxSvc acks the
/// advise but the publishing engine never knows our engine is
/// subscribed — no `0x33` DataUpdate frames flow back. Confirmed
/// 2026-05-06 by the absence of the .NET reference's
/// `EnsurePublisherConnected` call in the Rust port + live
/// reproduction against `TestMachine_001.TestChangingInt`.
async fn ensure_publisher_connected(
&self,
platform_id: i32,
engine_id: i32,
) -> Result<(), Error> {
let key = (platform_id, engine_id);
{
let endpoints = self.inner.publisher_endpoints.lock().await;
if endpoints.contains_key(&key) {
tracing::debug!(
platform_id,
engine_id,
"ensure_publisher_connected: already connected"
);
return Ok(());
}
}
let opts = &self.inner.options;
let local_engine = opts.local_engine_id;
let galaxy = i32::from(opts.galaxy_id);
let source_platform = opts.source_platform_id;
tracing::debug!(
platform_id,
engine_id,
local_engine,
galaxy,
source_platform,
"ensure_publisher_connected: issuing Connect + AddSubscriberEngine"
);
{
let mut nmx = self.inner.nmx.lock().await;
let hr = nmx
.connect_engine(local_engine, galaxy, platform_id, engine_id)
.await
.map_err(map_nmx)?;
ensure_hresult_ok(hr)?;
let hr = nmx
.add_subscriber_engine(engine_id, galaxy, source_platform, local_engine)
.await
.map_err(map_nmx)?;
ensure_hresult_ok(hr)?;
}
let mut endpoints = self.inner.publisher_endpoints.lock().await;
endpoints.insert(key, ());
Ok(())
}
/// `subscribe` ordering note: subscribe to the broadcast channel
/// FIRST, then issue `AdviseSupervisory`. If we ordered the other
/// way, updates that arrive between the advise call and the
@@ -2602,6 +2710,7 @@ mod tests {
rebuild_factory: Mutex::new(None),
pending_ops,
next_item_handle: std::sync::atomic::AtomicI32::new(1),
publisher_endpoints: Mutex::new(HashMap::new()),
#[cfg(all(windows, feature = "windows-com"))]
dcom_sink_holder: Mutex::new(None),
}),