5e11b30507
Root cause: `Session::subscribe` and `Session::subscribe_buffered_nmx`
were missing the `INmxService2::Connect` + `AddSubscriberEngine` RPC
pair that the .NET reference's `MxNativeSession.EnsurePublisherConnected`
(`cs:516-526`) issues before the first advise against a publishing
engine. Without those two RPCs, NmxSvc accepted the subscription
registration but the publishing engine never knew our engine was
subscribed — so it never dispatched DataUpdate frames back.
Diagnosis was driven by wwtools/aalogcli, which reads the ArchestrA logs
under C:\ProgramData\ArchestrA\LogFiles. The user pointed us at this
tooling, and its output is what exposed the failure path.
Red herring: NmxSvc's `[Warning] NmxCallback->DataReceived ... failed
with error 0x{N}` log lines turned out to be normal log spam where N
is the bufferSize of the inbound call, not a real error code. The
.NET reference's own probe triggers identical entries while still
receiving DataUpdate frames successfully.
Fix:
- SessionInner::publisher_endpoints — per-session HashMap<(platform_id,
engine_id), ()> cache mirroring MxNativeSession._publisherEndpoints.
- Session::ensure_publisher_connected — issues Connect +
AddSubscriberEngine, once per publisher endpoint per session.
- Session::subscribe + subscribe_buffered_nmx — both call it before
the wire advise.
- subscribe_buffered_nmx — additionally issues AdviseSupervisory after
RegisterReference. The .NET reference's RegisterBufferedItemAsync
only calls RegisterReference, but on this AVEVA install
RegisterReference alone produces the registration result + heartbeat
callbacks without ever starting DataUpdate dispatch; AdviseSupervisory
unblocks the dispatch.
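The fix's once-per-endpoint gating and call ordering can be sketched without any COM or wire code. This is a minimal model under stated assumptions. The `Rpc` enum and the recording `Session` are hypothetical stand-ins that log calls instead of sending them. The `u16` id types are assumed. `Advise` is a placeholder for the plain path's wire advise. Only the names `ensure_publisher_connected`, `subscribe`, `subscribe_buffered_nmx`, and `publisher_endpoints` come from the commit.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the RPCs named above.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Rpc {
    Connect { platform_id: u16, engine_id: u16 },
    AddSubscriberEngine { platform_id: u16, engine_id: u16 },
    RegisterReference(String),
    AdviseSupervisory(String),
    /// Placeholder for the plain path's wire advise.
    Advise(String),
}

/// Hypothetical session that records RPCs instead of sending them.
#[derive(Default)]
pub struct Session {
    /// Mirrors `publisher_endpoints`: keyed by (platform_id, engine_id);
    /// the `()` value makes the map behave as a set.
    publisher_endpoints: HashMap<(u16, u16), ()>,
    pub sent: Vec<Rpc>,
}

impl Session {
    /// Issue Connect + AddSubscriberEngine at most once per publisher
    /// endpoint for the lifetime of this session.
    fn ensure_publisher_connected(&mut self, platform_id: u16, engine_id: u16) {
        let key = (platform_id, engine_id);
        if self.publisher_endpoints.contains_key(&key) {
            return; // publishing engine already knows about us
        }
        self.sent.push(Rpc::Connect { platform_id, engine_id });
        self.sent.push(Rpc::AddSubscriberEngine { platform_id, engine_id });
        // Record only after both RPCs have been issued.
        self.publisher_endpoints.insert(key, ());
    }

    /// Plain subscribe: connect to the publisher, then the wire advise.
    pub fn subscribe(&mut self, platform_id: u16, engine_id: u16, tag: &str) {
        self.ensure_publisher_connected(platform_id, engine_id);
        self.sent.push(Rpc::Advise(tag.to_string()));
    }

    /// Buffered subscribe: connect, RegisterReference, then
    /// AdviseSupervisory to start DataUpdate dispatch.
    pub fn subscribe_buffered_nmx(&mut self, platform_id: u16, engine_id: u16, tag: &str) {
        self.ensure_publisher_connected(platform_id, engine_id);
        self.sent.push(Rpc::RegisterReference(tag.to_string()));
        self.sent.push(Rpc::AdviseSupervisory(tag.to_string()));
    }
}
```

A second subscription against the same publishing engine therefore skips straight to the advise, while a new `(platform_id, engine_id)` pair triggers a fresh Connect + AddSubscriberEngine.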
Live verification (`TestMachine_001.TestChangingInt`, a tag that
updates >1×/s):
    cargo test -p mxaccess-compat --features live-windows-com \
        --test plain_subscribe_live -- --ignored --nocapture
    cargo test -p mxaccess-compat --features live-windows-com \
        --test buffered_subscribe_live -- --ignored --nocapture
Both pass — `cmd=0x32` SubscriptionStatus + sequence of `cmd=0x33`
DataUpdate frames flow as expected. Tests assert on the raw
Session::callbacks() broadcast (not the typed Subscription::next
DataChange path) because the engine reports quality=Uncertain
value=null for this attribute on this Galaxy — the wire-level
subscription is what F56 was about, not the value content.
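The raw-broadcast assertion described above reduces to a check over command bytes. Below is a minimal sketch. It assumes the tests have already pulled each frame's command byte off `Session::callbacks()` in arrival order; how that byte is located inside a frame is not shown. It also assumes DataUpdates are counted only after the first SubscriptionStatus. The function name and `min_updates` parameter are hypothetical.

```rust
/// NMX command bytes named in the commit message.
const SUBSCRIPTION_STATUS: u8 = 0x32;
const DATA_UPDATE: u8 = 0x33;

/// Hypothetical check: true when a SubscriptionStatus frame is observed
/// and at least `min_updates` DataUpdate frames arrive after it. Other
/// commands (e.g. registration results, heartbeats) are ignored.
pub fn subscription_flowed(cmds: &[u8], min_updates: usize) -> bool {
    let Some(status_at) = cmds.iter().position(|&c| c == SUBSCRIPTION_STATUS) else {
        return false;
    };
    cmds[status_at + 1..]
        .iter()
        .filter(|&&c| c == DATA_UPDATE)
        .count()
        >= min_updates
}
```

Checking command bytes rather than decoded values keeps the test independent of the quality=Uncertain, value=null content this Galaxy reports.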
DcomCallbackSink reverted to S_OK return for both DataReceivedRaw
and StatusReceivedRaw (the bytes-processed / sentinel HRESULT
experiments during diagnosis turned out to be irrelevant — the
"failed with error 0xN" logs come from NmxSvc regardless of the
return value).
design/followups.md F49 + F56 + docs/M6-live-verification.md updated:
F56 resolved, F49 steps 1 + 4 + 5 pass live, steps 2 + 3 pending
(now executable on this fixture).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
271 lines
12 KiB
Rust
// `windows_core::interface` doesn't tolerate sibling attributes on the
// trait, and the COM method names must mirror the .NET reference's
// PascalCase to keep the IDL/MIDL trail readable. Allow at module
// scope so the generated `_Impl` trait + vtable struct don't trip
// `non_snake_case`.
#![allow(non_snake_case)]

//! DCOM-managed `INmxSvcCallback` sink — Path A of F55.
//!
//! The hand-rolled `CallbackExporter` (this crate's [`crate::exporter`]
//! module) advertises a TCP listener via a custom OBJREF that NmxSvc
//! refuses with `RPC_S_SERVER_UNAVAILABLE` (1722) on RegisterEngine2.
//! Live diff against the working .NET `MxNativeSession.Open` path
//! (which uses `ComObjRefProvider.MarshalInterfaceObjRef(callback,
//! INmxSvcCallback, DifferentMachine)` per `MxNativeSession.cs:624`)
//! showed the failure isn't an OBJREF byte-format issue — it's that
//! NmxSvc does its own SCM-side `IObjectExporter::ResolveOxid` against
//! the local RPCSS at `127.0.0.1:135` to validate the callback OXID,
//! and a hand-rolled OXID isn't registered with RPCSS.
//!
//! This module sidesteps that by implementing `INmxSvcCallback` as a
//! real `windows-rs` `#[implement]` COM class. `CoMarshalInterface`
//! then registers the callback's OXID with RPCSS automatically, so
//! NmxSvc's SCM-side resolution succeeds. Inbound `DataReceivedRaw` /
//! `StatusReceivedRaw` calls arrive on the DCOM stub thread and are
//! forwarded into the same `CallbackEvent` mpsc the hand-rolled
//! exporter feeds, so the upstream `callback_router` in `mxaccess`
//! doesn't need to know which path produced the event.
//!
//! Mirrors `src/MxNativeClient/NmxCallbackSink.cs` (the .NET reference's
//! DCOM-managed callback used by the `MxNativeSession.Open` path).

use std::ptr;

use tokio::sync::mpsc;
use tracing::{debug, trace, warn};
use windows::Win32::System::Com::Marshal::CoMarshalInterface;
use windows::Win32::System::Com::StructuredStorage::{
    CreateStreamOnHGlobal, GetHGlobalFromStream,
};
use windows::Win32::System::Com::{IStream, MSHCTX_DIFFERENTMACHINE, MSHLFLAGS_NORMAL};
use windows::Win32::System::Memory::{GlobalLock, GlobalSize, GlobalUnlock};
// `#[interface]` / `#[implement]` macros expand to `::windows_core::*`
// paths, so we import via windows_core (which the windows crate
// re-exports). `IUnknown_Vtbl` etc. need to be in scope at the crate
// root.
use windows_core::{IUnknown, IUnknown_Vtbl, GUID};

use crate::exporter::CallbackEvent;
use mxaccess_rpc::com_objref_provider::IUnknownHolder;

/// `INmxSvcCallback` interface IID — `B49F92F7-C748-4169-8ECA-A0670B012746`.
/// Mirrors the .NET reference's `INmxSvcCallback` declaration at
/// `src/MxNativeClient/NmxComContracts.cs:84`.
pub const INMX_SVC_CALLBACK_IID: GUID = GUID::from_values(
    0xb49f92f7,
    0xc748,
    0x4169,
    [0x8e, 0xca, 0xa0, 0x67, 0x0b, 0x01, 0x27, 0x46],
);

/// `INmxSvcCallback` interface declaration.
///
/// Vtable layout, after the inherited `IUnknown` slots:
/// - opnum 3 — `DataReceivedRaw(int bufferSize, ref sbyte dataBuffer)`
/// - opnum 4 — `StatusReceivedRaw(int bufferSize, ref sbyte statusBuffer)`
///
/// Both are declared `[PreserveSig]` with a `void` return in
/// `NmxComContracts.cs:87-91`. In windows-rs `#[interface]` form each
/// method returns a raw `HRESULT`, and this sink returns `S_OK`
/// unconditionally: we never raise a COM exception from the sink
/// because the upstream NmxSvc dispatcher swallows them.
#[windows_core::interface("B49F92F7-C748-4169-8ECA-A0670B012746")]
pub unsafe trait INmxSvcCallback: IUnknown {
    /// `DataReceivedRaw` — called by NmxSvc with a length-prefixed
    /// byte buffer carrying a serialised NMX message, such as `0x32`
    /// SubscriptionStatus, `0x33` DataUpdate, or the short `0x11`
    /// registration result.
    ///
    /// # Safety
    /// `data_buffer` is a stub-side pointer to `buffer_size` bytes
    /// owned by the COM proxy/stub layer; valid for the duration of
    /// the call. Implementations MUST copy the buffer before returning.
    unsafe fn DataReceivedRaw(&self, buffer_size: i32, data_buffer: *const u8) -> windows::core::HRESULT;

    /// `StatusReceivedRaw` — operation-status frame counterpart of
    /// `DataReceivedRaw`. Same buffer-ownership contract.
    ///
    /// # Safety
    /// As above.
    unsafe fn StatusReceivedRaw(&self, buffer_size: i32, status_buffer: *const u8) -> windows::core::HRESULT;
}

/// Concrete `INmxSvcCallback` implementation that forwards inbound
/// callbacks into a tokio mpsc. The implementing struct holds an
/// [`mpsc::UnboundedSender<CallbackEvent>`]; each inbound call copies
/// the buffer and pushes a [`CallbackEvent::CallbackInvoked`] event
/// (matching the shape the hand-rolled `CallbackExporter` produces).
#[windows_core::implement(INmxSvcCallback)]
pub struct DcomCallbackSink {
    event_tx: mpsc::UnboundedSender<CallbackEvent>,
}

impl DcomCallbackSink {
    /// Construct a new sink. The returned `Self` is a Rust value;
    /// convert to an `IUnknown` for marshalling via
    /// `IUnknown::from(sink)` (the conversion impl is generated by
    /// the `#[implement]` macro).
    #[must_use]
    pub fn new(event_tx: mpsc::UnboundedSender<CallbackEvent>) -> Self {
        Self { event_tx }
    }

    fn forward(&self, opnum: u16, buffer_size: i32, buffer: *const u8) {
        let body: Vec<u8> = if buffer_size <= 0 || buffer.is_null() {
            Vec::new()
        } else {
            // SAFETY: the COM stub guarantees `buffer` is valid for
            // `buffer_size` bytes for the duration of the call, and
            // the slice is read-only. We copy out before returning.
            unsafe { std::slice::from_raw_parts(buffer, buffer_size as usize) }.to_vec()
        };
        trace!(
            opnum,
            buffer_size,
            body_len = body.len(),
            "DcomCallbackSink: forwarding inbound callback"
        );
        if let Err(e) = self.event_tx.send(CallbackEvent::CallbackInvoked { opnum, body }) {
            // The receiver was dropped (the upstream router probably
            // exited). NmxSvc keeps calling us until `UnregisterEngine`
            // lands, so each dropped event is logged at debug rather
            // than warn to avoid log spam.
            debug!("DcomCallbackSink: dropped event for opnum {opnum} (rx closed): {e}");
        }
    }
}

impl INmxSvcCallback_Impl for DcomCallbackSink_Impl {
    unsafe fn DataReceivedRaw(
        &self,
        buffer_size: i32,
        data_buffer: *const u8,
    ) -> windows::core::HRESULT {
        // Opnum 3 per `NmxProcedureMetadata.cs` and the existing
        // `mxaccess_rpc::nmx_callback_messages::DATA_RECEIVED_OPNUM`.
        self.forward(3, buffer_size, data_buffer);
        // F56: return S_OK. NmxSvc logs Warning entries like
        //   "NmxCallback->DataReceived to local engine 32308 failed
        //    with error 0x57. Time for call to complete 0"
        // but the logged "error" is just the inbound bufferSize
        // (0x57 = 87 bytes, the short `0x11` registration result), not
        // an HRESULT. The entries appear regardless of what this method
        // returns, and the .NET reference's own probe produces identical
        // entries while still receiving `0x33` DataUpdate frames. The
        // bytes-processed and sentinel-HRESULT return values tried
        // during diagnosis made no difference; the missing DataUpdate
        // dispatch was caused by the absent Connect +
        // AddSubscriberEngine pair (see
        // `Session::ensure_publisher_connected`).
        windows::Win32::Foundation::S_OK
    }

    unsafe fn StatusReceivedRaw(
        &self,
        buffer_size: i32,
        status_buffer: *const u8,
    ) -> windows::core::HRESULT {
        // Opnum 4, the operation-status counterpart of `DataReceivedRaw`.
        self.forward(4, buffer_size, status_buffer);
        windows::Win32::Foundation::S_OK
    }
}

/// Build a DCOM-managed callback sink, marshal it for cross-machine
/// dispatch, and return the bundle of:
/// 1. an [`IUnknownHolder`] — keeps the COM ref alive for the
///    consumer's lifetime (see `IUnknownHolder` doc on why this
///    matters),
/// 2. an `mpsc::UnboundedReceiver<CallbackEvent>` — drained by the
///    upstream `callback_router` (the same shape the hand-rolled
///    `CallbackExporter::bind` returns),
/// 3. the OBJREF byte blob — passed to `RegisterEngine2` as the
///    callback parameter.
///
/// Mirrors `MxNativeSession.CreateRegisteredService` (`cs:624`):
/// ```csharp
/// byte[] callbackObjRef = ComObjRefProvider.MarshalInterfaceObjRef(
///     callback,
///     NmxProcedureMetadata.INmxSvcCallback,
///     ComObjRefProvider.MarshalContextDifferentMachine);
/// ```
///
/// # Errors
///
/// Surfaces `windows::core::Error` for any failure in the `IStream`
/// allocation, `CoMarshalInterface`, `GetHGlobalFromStream`, or
/// `GlobalLock` chain. An `ensure_apartment` failure is logged and
/// reported as `E_FAIL`.
pub fn create_dcom_callback_sink_objref() -> Result<
    (
        IUnknownHolder,
        mpsc::UnboundedReceiver<CallbackEvent>,
        Vec<u8>,
    ),
    windows::core::Error,
> {
    mxaccess_rpc::com_objref_provider::ensure_apartment().map_err(|e| {
        warn!("ensure_apartment failed: {e:?}");
        windows::core::Error::from_hresult(windows::Win32::Foundation::E_FAIL)
    })?;

    let (event_tx, event_rx) = mpsc::unbounded_channel();
    let sink = DcomCallbackSink::new(event_tx);
    let unknown: IUnknown = sink.into();

    // Marshal as INmxSvcCallback (NOT IUnknown) so NmxSvc receives an
    // OBJREF whose IID matches the interface it's expecting on the
    // server side. The .NET reference does the same at
    // `MxNativeSession.cs:624` — pass `NmxProcedureMetadata.INmxSvcCallback`.
    let blob = marshal_for_dcom(&unknown, INMX_SVC_CALLBACK_IID)?;

    let holder = IUnknownHolder::from_iunknown(unknown);
    Ok((holder, event_rx, blob))
}

/// Marshal an `IUnknown` for cross-machine dispatch and return the
/// raw OBJREF bytes. Equivalent to
/// `mxaccess_rpc::com_objref_provider::marshal_interface_objref`, but
/// inlined here so the dependency graph stays acyclic: this crate does
/// not depend on `mxaccess-rpc`'s private `marshal_interface_objref`
/// surface, only on its public one.
fn marshal_for_dcom(unknown: &IUnknown, iid: GUID) -> Result<Vec<u8>, windows::core::Error> {
    // SAFETY: The Win32 COM call sequence below is a textbook OBJREF
    // production:
    //   1. CreateStreamOnHGlobal allocates an HGlobal-backed IStream.
    //   2. CoMarshalInterface writes the OBJREF into the stream.
    //   3. GetHGlobalFromStream extracts the underlying handle.
    //   4. GlobalLock / GlobalSize / GlobalUnlock copy out the bytes.
    // Every fallible call is checked: HRESULT-returning calls via `?`,
    // and `GlobalLock` via its null-pointer return. The stream (and,
    // with fDeleteOnRelease = true, its HGLOBAL) lives until the end of
    // this block, after the bytes have been copied.
    unsafe {
        let stream: IStream = CreateStreamOnHGlobal(
            windows::Win32::Foundation::HGLOBAL(ptr::null_mut()),
            true,
        )?;
        CoMarshalInterface(
            &stream,
            &iid,
            unknown,
            MSHCTX_DIFFERENTMACHINE.0 as u32,
            None,
            MSHLFLAGS_NORMAL.0 as u32,
        )?;
        let hglobal = GetHGlobalFromStream(&stream)?;
        let size = GlobalSize(hglobal);
        if size == 0 {
            return Ok(Vec::new());
        }
        // Named `locked` so it doesn't shadow the `std::ptr` import.
        let locked = GlobalLock(hglobal);
        if locked.is_null() {
            return Err(windows::core::Error::from_hresult(
                windows::Win32::Foundation::E_FAIL,
            ));
        }
        let slice = std::slice::from_raw_parts(locked.cast::<u8>(), size);
        let blob = slice.to_vec();
        let _ = GlobalUnlock(hglobal); // best-effort; lock count drops to 0
        Ok(blob)
    }
}
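The IID appears twice in this file: once as the string in `#[windows_core::interface(...)]` and once as the `GUID::from_values` fields of `INMX_SVC_CALLBACK_IID`. The sketch below shows how the registry-format string maps onto those four fields, so the two can be cross-checked without the `windows` crate. `guid_fields` is a hypothetical helper for illustration and is not part of windows-rs.

```rust
/// Hypothetical helper: split a registry-format GUID string (without
/// braces) into the (data1, data2, data3, data4) fields that
/// `GUID::from_values` takes.
pub fn guid_fields(s: &str) -> Option<(u32, u16, u16, [u8; 8])> {
    let parts: Vec<&str> = s.split('-').collect();
    // Registry format: 8-4-4-4-12 hex digits.
    let expected = [8, 4, 4, 4, 12];
    if parts.len() != expected.len()
        || parts.iter().zip(expected).any(|(p, n)| p.len() != n)
        || !parts.iter().all(|p| p.chars().all(|c| c.is_ascii_hexdigit()))
    {
        return None;
    }
    let data1 = u32::from_str_radix(parts[0], 16).ok()?;
    let data2 = u16::from_str_radix(parts[1], 16).ok()?;
    let data3 = u16::from_str_radix(parts[2], 16).ok()?;
    // data4 is the last two groups read as 8 bytes, in string order.
    let tail = format!("{}{}", parts[3], parts[4]);
    let mut data4 = [0u8; 8];
    for (i, byte) in data4.iter_mut().enumerate() {
        *byte = u8::from_str_radix(&tail[2 * i..2 * i + 2], 16).ok()?;
    }
    Some((data1, data2, data3, data4))
}
```

Applied to `B49F92F7-C748-4169-8ECA-A0670B012746`, this yields `0xb49f92f7`, `0xc748`, `0x4169`, and `[0x8e, 0xca, 0xa0, 0x67, 0x0b, 0x01, 0x27, 0x46]`, matching the constant. Note that only the first three groups are integers; the last two are a plain byte sequence.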