Files
mxaccess/rust/crates/mxaccess-rpc/src/com_objref_provider.rs
T
Joseph Doherty 2fc327a8d5 [F55 Path A] DCOM-managed INmxSvcCallback sink
Replace the hand-rolled CallbackExporter (TCP listener + custom
OBJREF) with a real `windows-rs` `#[implement]` COM class for
INmxSvcCallback, marshalled via CoMarshalInterface. NmxSvc validates
the callback OBJREF by calling IObjectExporter::ResolveOxid against
the local RPCSS at 127.0.0.1:135; hand-rolled OXIDs aren't registered
there, which is why RegisterEngine2 returned RPC_S_SERVER_UNAVAILABLE
(1722) on every live attempt. CoMarshalInterface registers the OXID
with RPCSS automatically, so the SCM-side resolution succeeds.

Mirrors MxNativeSession.CreateRegisteredService (cs:624), which is
the .NET reference's working path:
  ComObjRefProvider.MarshalInterfaceObjRef(callback,
    INmxSvcCallback, DifferentMachine)

Layout:
- mxaccess-callback::dcom_sink — INmxSvcCallback + DcomCallbackSink
  + create_dcom_callback_sink_objref. Forwards inbound calls into
  the same CallbackEvent::CallbackInvoked { opnum, body } shape the
  legacy exporter produces, so callback_router stays path-agnostic.
- Session::from_nmx_client — branched on `windows-com`. Real DCOM
  sink when on; legacy CallbackExporter when off (kept for unit
  tests that run against an in-process fake NMX peer).
- SessionInner.dcom_sink_holder: Option<IUnknownHolder> — keeps the
  COM ref alive for the session's lifetime; shutdown_nmx drops it.
- mxaccess-rpc + mxaccess-callback: windows-rs 0.59 → 0.62. The 0.59
  #[implement] macro generates code that doesn't compile under
  edition 2024; 0.62 is fixed.

Live result: cargo test -p mxaccess-compat --features
live-windows-com --test lmx_write_complete_live -- --ignored
--nocapture passes end-to-end. RegisterEngine2 OK, write
round-trips, OnWriteComplete fires with the captured MxStatus shape.

Unblocks F49 step 5; F55 marked Resolved in design/followups.md.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-06 09:25:44 -04:00

488 lines
20 KiB
Rust

//! Win32 OBJREF emitter — port of `src/MxNativeClient/ComObjRefProvider.cs`.
//!
//! Gated on the `windows-com` Cargo feature AND `cfg(windows)`. The pure-Rust
//! [`crate::objref::ComObjRef`] parser stands alone for the inbound activation
//! response path; this module is the *outbound* counterpart — it activates a
//! local COM class, marshals its `IUnknown` through `CoMarshalInterface` to
//! produce an OBJREF blob, and returns that blob as a `Vec<u8>`.
//!
//! ## Why this exists
//!
//! `ManagedNmxService2Client.Create()` (`ManagedNmxService2Client.cs:30-64`)
//! discovers the live `(host, port, service_ipid)` triple of `NmxSvc.exe` by
//! activating the `NmxSvc.NmxService` ProgID, marshalling the resulting
//! `IUnknown` to an OBJREF, and parsing the OXID/IPID out of that blob. The
//! activation must go through `CoMarshalInterface` rather than reading a
//! static config file because the `(host, port)` change every time the NMX
//! service restarts. The .NET reference's `ComObjRefProvider.cs` does this
//! through `Marshal.GetIUnknownForObject` + `CoMarshalInterface` over an
//! HGlobal-backed `IStream`. The Rust port mirrors that exactly using
//! `windows-rs`.
//!
//! ## Safety
//!
//! `mxaccess-rpc` is the only crate where internal `unsafe` is permitted (per
//! `design/00-overview.md` principle 3). All `unsafe` here is wrapped in safe
//! `pub fn` boundaries — callers do not see raw pointers, HRESULTs, or
//! lifetime-bound interface pointers. Each `unsafe` block carries a comment
//! explaining the invariant being upheld.
//!
//! ## COM apartment
//!
//! `CoMarshalInterface` requires the calling thread to be COM-initialised.
//! The high-level entry points call `CoInitializeEx(MULTITHREADED)` lazily,
//! once per worker thread, via a thread-local `OnceLock` guard.
//! `RPC_E_CHANGED_MODE` (the thread is already initialised to STA) is treated
//! as success — the existing apartment is fine for `CoMarshalInterface`.
#![cfg(all(windows, feature = "windows-com"))]
// Win32 FFI requires unsafe; localised to this module per the crate-level rule.
#![allow(unsafe_code)]
// `usize`-sized buffers may legitimately overflow on a 32-bit host with very
// large OBJREF blobs; mirrors the same indexing-permission rationale as
// `objref.rs`.
#![allow(clippy::cast_possible_truncation)]
use std::sync::OnceLock;
use thiserror::Error;
use windows::Win32::Foundation::{
GetLastError, HGLOBAL, RPC_E_CHANGED_MODE, S_FALSE, S_OK,
};
use windows::Win32::System::Com::Marshal::CoMarshalInterface;
use windows::Win32::System::Com::StructuredStorage::{
CreateStreamOnHGlobal, GetHGlobalFromStream,
};
use windows::Win32::System::Com::{
CLSIDFromProgID, CoCreateInstance, CoInitializeEx, IStream, CLSCTX_INPROC_SERVER,
CLSCTX_LOCAL_SERVER, CLSCTX_REMOTE_SERVER, COINIT_MULTITHREADED, MSHCTX_DIFFERENTMACHINE,
MSHCTX_INPROC, MSHCTX_LOCAL, MSHLFLAGS_NORMAL,
};
use windows::Win32::System::Memory::{GlobalLock, GlobalSize, GlobalUnlock};
use windows::core::{IUnknown, Interface, GUID, HSTRING, PCWSTR};
/// Marshalling destination context. Mirrors the .NET constants at
/// `ComObjRefProvider.cs:8-10`. Maps directly to the Win32 `MSHCTX_*`
/// values in `[MS-DCOM]` §2.2.20.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u32)]
pub enum MarshalContext {
/// `MSHCTX_INPROC` — same process. Mirrors `MarshalContextInProcess`.
InProcess = 0,
/// `MSHCTX_LOCAL` — different process, same machine.
Local = 1,
/// `MSHCTX_DIFFERENTMACHINE` — different machine.
DifferentMachine = 2,
}
impl From<MarshalContext> for u32 {
fn from(ctx: MarshalContext) -> u32 {
match ctx {
MarshalContext::InProcess => MSHCTX_INPROC.0 as u32,
MarshalContext::Local => MSHCTX_LOCAL.0 as u32,
MarshalContext::DifferentMachine => MSHCTX_DIFFERENTMACHINE.0 as u32,
}
}
}
/// Errors raised by this module.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum ProviderError {
/// `CLSIDFromProgID` rejected the ProgID — typically `REGDB_E_CLASSNOTREG`
/// (`0x80040154`). Mirrors the `InvalidOperationException` at
/// `ComObjRefProvider.cs:14`.
#[error("CLSIDFromProgID('{prog_id}') failed: HRESULT 0x{hr:08X}")]
UnknownProgId { prog_id: String, hr: u32 },
/// `CoCreateInstance` failed for the resolved CLSID. Most commonly
/// `CO_E_SERVER_EXEC_FAILURE` or `E_ACCESSDENIED` for cross-bitness
/// LocalServers under DCOM.
#[error("CoCreateInstance({clsid:?}) failed: HRESULT 0x{hr:08X}")]
ActivationFailed { clsid: GUID, hr: u32 },
/// `CoMarshalInterface` / `CreateStreamOnHGlobal` /
/// `GetHGlobalFromStream` returned a non-success HRESULT. The op name
/// names which step.
#[error("{op} failed: HRESULT 0x{hr:08X}")]
MarshalFailed { op: &'static str, hr: u32 },
/// `GlobalLock` returned NULL. The `GetLastError` value is in `last_error`.
#[error("GlobalLock returned NULL (GetLastError = {last_error})")]
GlobalLockFailed { last_error: u32 },
/// `CoInitializeEx` returned an HRESULT other than `S_OK`, `S_FALSE`, or
/// `RPC_E_CHANGED_MODE`. The latter two are treated as success.
#[error("CoInitializeEx failed: HRESULT 0x{hr:08X}")]
ApartmentInitFailed { hr: u32 },
}
// ---------------------------------------------------------------------------
// Apartment management
// ---------------------------------------------------------------------------
/// Ensure the *current thread* is COM-initialised.
///
/// Strict per-thread COM init is awkward to track in async/multi-threaded
/// runtimes; instead, we eagerly call `CoInitializeEx(MULTITHREADED)` once
/// per thread that lands in `marshal_*`. Re-entrant calls return `S_FALSE`,
/// which we accept. If a thread is already initialised to STA we receive
/// `RPC_E_CHANGED_MODE` — also treated as success (the existing apartment
/// is fine for `CoMarshalInterface`).
pub fn ensure_apartment() -> Result<(), ProviderError> {
thread_local! {
// `OnceLock` per thread guarantees we only attempt CoInitializeEx
// once per worker; subsequent calls are a no-op.
static APT: OnceLock<()> = const { OnceLock::new() };
}
APT.with(|cell| {
if cell.get().is_some() {
return Ok(());
}
// SAFETY: CoInitializeEx is the standard COM entry point; passing
// None for the reserved pointer and a valid COINIT flag is the
// documented invocation shape.
let hr = unsafe { CoInitializeEx(None, COINIT_MULTITHREADED) };
if hr == S_OK || hr == S_FALSE || hr == RPC_E_CHANGED_MODE {
// Discard the result — `set` only fails if already set, which
// we checked above.
let _ = cell.set(());
Ok(())
} else {
Err(ProviderError::ApartmentInitFailed {
hr: hr.0 as u32,
})
}
})
}
// ---------------------------------------------------------------------------
// ProgID → CLSID
// ---------------------------------------------------------------------------
/// Resolve a ProgID to a CLSID via `CLSIDFromProgID`. Mirrors
/// `Type.GetTypeFromProgID(progId, throwOnError: true)` (`cs:14`).
///
/// # Errors
///
/// [`ProviderError::UnknownProgId`] when the registry has no entry under
/// `HKCR\<prog_id>\CLSID` (typically `REGDB_E_CLASSNOTREG = 0x80040154`).
pub fn clsid_from_prog_id(prog_id: &str) -> Result<GUID, ProviderError> {
ensure_apartment()?;
let wide = HSTRING::from(prog_id);
// SAFETY: PCWSTR points into `wide` for the duration of the call;
// CLSIDFromProgID writes a CLSID through the out-pointer in the
// generated wrapper.
let result = unsafe { CLSIDFromProgID(PCWSTR::from_raw(wide.as_ptr())) };
result.map_err(|e| ProviderError::UnknownProgId {
prog_id: prog_id.to_string(),
hr: e.code().0 as u32,
})
}
// ---------------------------------------------------------------------------
// Marshalling
// ---------------------------------------------------------------------------
/// Activate a COM class by ProgID and return the OBJREF byte stream that
/// represents its `IUnknown` proxy in the supplied marshal context.
///
/// Mirrors `MarshalActivatedIUnknownObjRef` (`cs:12-30`). Activation uses
/// `CLSCTX_INPROC_SERVER | CLSCTX_LOCAL_SERVER | CLSCTX_REMOTE_SERVER` —
/// the same default `Activator.CreateInstance` picks up via
/// `Type.GetTypeFromProgID`.
///
/// **The activated `IUnknown` is dropped at the end of this call.** For
/// most use cases that's a bug — when the COM ref count goes to zero
/// the SCM may release the activated server-side instance, which makes
/// the marshalled OXID invalid for subsequent RPC. Use
/// [`activate_and_marshal_iunknown_objref`] instead and hold the
/// returned [`IUnknownHolder`] for the lifetime of the consumer that
/// uses the OBJREF (typically the lifetime of the client built from
/// it). This function is retained for callers that consume the OBJREF
/// inline (e.g. tests / probes that use the bytes immediately and
/// don't care about the activated server-side lifetime).
///
/// # Errors
///
/// [`ProviderError::UnknownProgId`], [`ProviderError::ActivationFailed`],
/// [`ProviderError::MarshalFailed`], [`ProviderError::GlobalLockFailed`].
pub fn marshal_activated_iunknown_objref(
prog_id: &str,
destination_context: MarshalContext,
) -> Result<Vec<u8>, ProviderError> {
activate_and_marshal_iunknown_objref(prog_id, destination_context).map(|(blob, _holder)| blob)
}
/// Activate a COM class by ProgID, marshal its `IUnknown`, and return
/// **both** the OBJREF byte stream **and** an [`IUnknownHolder`] that
/// keeps the activated server-side instance alive.
///
/// This is the .NET-reference-faithful path: `ManagedNmxService2Client`
/// (`cs:15`) holds the activated COM object as a private field for the
/// client's lifetime via `_activatedComObject`. The Rust port previously
/// dropped the IUnknown right after marshalling, which let the SCM
/// release the server-side instance and made subsequent
/// `ResolveOxid`/`RemQueryInterface` calls return
/// `RPC_S_SERVER_UNAVAILABLE` (1722). Holding the
/// [`IUnknownHolder`] for the client's lifetime fixes that.
///
/// The OBJREF blob and the IUnknown both refer to the same activated
/// server-side instance; keep them paired.
///
/// # Errors
///
/// [`ProviderError::UnknownProgId`], [`ProviderError::ActivationFailed`],
/// [`ProviderError::MarshalFailed`], [`ProviderError::GlobalLockFailed`].
pub fn activate_and_marshal_iunknown_objref(
prog_id: &str,
destination_context: MarshalContext,
) -> Result<(Vec<u8>, IUnknownHolder), ProviderError> {
ensure_apartment()?;
let clsid = clsid_from_prog_id(prog_id)?;
let activation_flags = CLSCTX_INPROC_SERVER | CLSCTX_LOCAL_SERVER | CLSCTX_REMOTE_SERVER;
// SAFETY: `clsid` is initialised by `CLSIDFromProgID`; activation_flags
// is a valid CLSCTX bitmask; `None` for the controlling-unknown is a
// standard no-aggregation invocation.
let unknown: IUnknown =
unsafe { CoCreateInstance(&clsid, None, activation_flags) }.map_err(|e| {
ProviderError::ActivationFailed {
clsid,
hr: e.code().0 as u32,
}
})?;
let blob = marshal_iunknown_objref(&unknown, destination_context)?;
Ok((blob, IUnknownHolder { inner: unknown }))
}
/// Owns a live `IUnknown` reference to a COM-activated server-side
/// instance. Drop releases the reference (the COM proxy's `Release`
/// runs, which decrements the server-side ref count and may trigger
/// instance teardown when no other holders remain).
///
/// `Send + Sync` because the underlying COM proxy is registered in the
/// MTA (`COINIT_MULTITHREADED` per [`ensure_apartment`]) and is
/// therefore safe to invoke from any thread. SAFETY of the unsafe impls
/// rests on this MTA invariant — callers must not transition the
/// process apartment to STA after activating an [`IUnknownHolder`].
pub struct IUnknownHolder {
#[allow(dead_code)]
inner: IUnknown,
}
impl IUnknownHolder {
/// Wrap an existing `IUnknown` into a holder. Used by callers
/// (e.g. `mxaccess-callback::dcom_sink`) that have an `IUnknown`
/// from a `windows-rs` `#[implement]` cast and need to keep the
/// COM ref alive for the same Path-A reasons documented at the
/// type level.
#[must_use]
pub fn from_iunknown(inner: IUnknown) -> Self {
Self { inner }
}
}
impl std::fmt::Debug for IUnknownHolder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("IUnknownHolder").finish_non_exhaustive()
}
}
// SAFETY: `IUnknownHolder` only ever wraps an MTA-resident COM proxy
// (see `ensure_apartment` initialising `COINIT_MULTITHREADED`). MTA
// proxies are thread-neutral by COM contract — calls can originate
// from any thread without marshalling.
unsafe impl Send for IUnknownHolder {}
// SAFETY: same MTA-invariant rationale as `Send`.
unsafe impl Sync for IUnknownHolder {}
/// Marshal an arbitrary `IUnknown` to an OBJREF byte stream. Mirrors
/// `MarshalIUnknownObjRef` (`cs:32-35`), passing IID `IID_IUnknown`
/// (`{00000000-0000-0000-C000-000000000046}`).
///
/// # Errors
///
/// [`ProviderError::MarshalFailed`], [`ProviderError::GlobalLockFailed`].
pub fn marshal_iunknown_objref(
unknown: &IUnknown,
destination_context: MarshalContext,
) -> Result<Vec<u8>, ProviderError> {
marshal_interface_objref(unknown, IUnknown::IID, destination_context)
}
/// Marshal `unknown` for the given interface IID to an OBJREF blob.
/// Mirrors `MarshalInterfaceObjRef` (`cs:37-80`).
///
/// The byte sequence returned is the exact bytes `CoMarshalInterface` wrote
/// into the HGlobal-backed stream — not a re-formatted copy. Pass through
/// [`crate::objref::ComObjRef::parse`] to inspect OXID / OID / IPID.
///
/// # Errors
///
/// [`ProviderError::MarshalFailed`] for any non-success HRESULT from
/// `CreateStreamOnHGlobal` / `CoMarshalInterface` / `GetHGlobalFromStream`;
/// [`ProviderError::GlobalLockFailed`] if `GlobalLock` returns NULL.
pub fn marshal_interface_objref(
unknown: &IUnknown,
iid: GUID,
destination_context: MarshalContext,
) -> Result<Vec<u8>, ProviderError> {
ensure_apartment()?;
// SAFETY: All Win32 COM calls below are documented as valid for the
// arguments we pass:
// - `CreateStreamOnHGlobal(NULL, TRUE, ...)` allocates a new HGlobal
// and binds it to the IStream (delete-on-release semantics).
// - `CoMarshalInterface(stream, &iid, unknown, ctx, NULL, NORMAL)`
// writes the OBJREF into the stream.
// - `GetHGlobalFromStream` extracts the underlying HGlobal handle
// for direct memory read — supported by IStreams created via
// `CreateStreamOnHGlobal`.
// - `GlobalLock` / `GlobalUnlock` / `GlobalSize` are the canonical
// accessors for HGlobal-backed memory.
// Each result is checked; failures bubble up as `ProviderError`.
unsafe {
let stream: IStream = CreateStreamOnHGlobal(HGLOBAL(std::ptr::null_mut()), true)
.map_err(|e| ProviderError::MarshalFailed {
op: "CreateStreamOnHGlobal",
hr: e.code().0 as u32,
})?;
CoMarshalInterface(
&stream,
&iid,
unknown,
destination_context.into(),
None,
MSHLFLAGS_NORMAL.0 as u32,
)
.map_err(|e| ProviderError::MarshalFailed {
op: "CoMarshalInterface",
hr: e.code().0 as u32,
})?;
let hglobal: HGLOBAL = GetHGlobalFromStream(&stream).map_err(|e| {
ProviderError::MarshalFailed {
op: "GetHGlobalFromStream",
hr: e.code().0 as u32,
}
})?;
let size = GlobalSize(hglobal);
let pointer = GlobalLock(hglobal);
if pointer.is_null() {
return Err(ProviderError::GlobalLockFailed {
last_error: GetLastError().0,
});
}
// SAFETY: `pointer` is non-null and points to `size` bytes of
// initialised memory inside the HGlobal block; we copy into a
// freshly-allocated Vec without aliasing.
let buffer = std::slice::from_raw_parts(pointer.cast::<u8>(), size).to_vec();
// GlobalUnlock returns Result<()>; documented to return BOOL on
// the wire where FALSE is the *normal* path (lock count drops to
// zero). The Result<()> wrapper treats that as success.
let _ = GlobalUnlock(hglobal);
// The IStream was created with `delete_on_release = TRUE`, so the
// HGlobal is freed when `stream` drops at end-of-scope. No
// explicit `GlobalFree` needed.
drop(stream);
Ok(buffer)
}
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
#[allow(
clippy::unwrap_used,
clippy::expect_used,
clippy::indexing_slicing,
clippy::panic
)]
mod tests {
use super::*;
use crate::objref::{ComObjRef, OBJREF_HEADER_LEN};
#[test]
fn marshal_context_maps_to_win32_constants() {
assert_eq!(u32::from(MarshalContext::InProcess), MSHCTX_INPROC.0 as u32);
assert_eq!(u32::from(MarshalContext::Local), MSHCTX_LOCAL.0 as u32);
assert_eq!(
u32::from(MarshalContext::DifferentMachine),
MSHCTX_DIFFERENTMACHINE.0 as u32
);
}
#[test]
fn ensure_apartment_is_idempotent() {
// Both calls must return Ok; the second hits the OnceLock path.
ensure_apartment().expect("first apartment init");
ensure_apartment().expect("second apartment init (idempotent)");
}
#[test]
fn clsid_from_unknown_prog_id_returns_unknown_prog_id() {
// REGDB_E_CLASSNOTREG = 0x80040154 — guaranteed for any unregistered
// ProgID. The exact HRESULT may vary by Windows version (some return
// CO_E_CLASSSTRING) so we only assert on the variant, not the code.
let err = clsid_from_prog_id("NonExistent.NotARealProgId.QQQ.123").unwrap_err();
match err {
ProviderError::UnknownProgId { prog_id, hr } => {
assert_eq!(prog_id, "NonExistent.NotARealProgId.QQQ.123");
assert_ne!(hr, 0, "expected non-success HRESULT");
}
other => panic!("expected UnknownProgId, got {other:?}"),
}
}
#[test]
fn marshal_activated_with_unknown_progid_fails_at_resolution() {
// We don't even reach activation — should fail at CLSIDFromProgID.
let err = marshal_activated_iunknown_objref(
"NonExistent.AnotherFakeProgId.999",
MarshalContext::Local,
)
.unwrap_err();
assert!(matches!(err, ProviderError::UnknownProgId { .. }));
}
/// Live integration test — gated on `MX_LIVE` env var (the workspace
/// convention; populated by `tools/Setup-LiveProbeEnv.ps1`).
/// Activates the local `NmxSvc.NmxService` and verifies the marshalled
/// OBJREF parses back via [`crate::objref::ComObjRef::parse`].
///
/// Without `MX_LIVE`, the test is silent — pure-Rust CI hosts have no
/// AVEVA install and would always fail.
#[test]
#[ignore = "requires a live AVEVA install with NmxSvc registered; gated on MX_LIVE"]
fn live_marshal_nmx_service_round_trip() {
if std::env::var_os("MX_LIVE").is_none() {
eprintln!("MX_LIVE not set; skipping");
return;
}
let blob =
marshal_activated_iunknown_objref("NmxSvc.NmxService", MarshalContext::Local)
.expect("activate + marshal NmxSvc.NmxService");
assert!(
blob.len() >= OBJREF_HEADER_LEN,
"OBJREF blob too short ({} bytes)",
blob.len()
);
// The "MEOW" signature is the first 4 bytes per [MS-DCOM] §2.2.18.
let parsed = ComObjRef::parse(&blob).expect("parse marshalled OBJREF");
// OXID / IPID must be non-zero for an activated server.
assert_ne!(parsed.oxid, 0, "OBJREF OXID is zero");
assert_ne!(
parsed.ipid.0,
[0u8; 16],
"OBJREF IPID is zero (expected non-zero IPID)"
);
}
}