[F55 Path A] DCOM-managed INmxSvcCallback sink

Replace the hand-rolled CallbackExporter (TCP listener + custom
OBJREF) with a real `windows-rs` `#[implement]` COM class for
INmxSvcCallback, marshalled via CoMarshalInterface. NmxSvc validates
the callback OBJREF by calling IObjectExporter::ResolveOxid against
the local RPCSS at 127.0.0.1:135; hand-rolled OXIDs aren't registered
there, which is why RegisterEngine2 returned RPC_S_SERVER_UNAVAILABLE
(1722) on every live attempt. CoMarshalInterface registers the OXID
with RPCSS automatically, so the SCM-side resolution succeeds.

Mirrors MxNativeSession.CreateRegisteredService (cs:624), which is
the .NET reference's working path:
  ComObjRefProvider.MarshalInterfaceObjRef(callback,
    INmxSvcCallback, DifferentMachine)

Layout:
- mxaccess-callback::dcom_sink — INmxSvcCallback + DcomCallbackSink
  + create_dcom_callback_sink_objref. Forwards inbound calls into
  the same CallbackEvent::CallbackInvoked { opnum, body } shape the
  legacy exporter produces, so callback_router stays path-agnostic.
- Session::from_nmx_client — branched on `windows-com`. Real DCOM
  sink when on; legacy CallbackExporter when off (kept for unit
  tests that run against an in-process fake NMX peer).
- SessionInner.dcom_sink_holder: Option<IUnknownHolder> — keeps the
  COM ref alive for the session's lifetime; shutdown_nmx drops it.
- mxaccess-rpc + mxaccess-callback: windows-rs 0.59 → 0.62. The 0.59
  #[implement] macro generates code that doesn't compile under
  edition 2024; 0.62 is fixed.

Live result: cargo test -p mxaccess-compat --features
live-windows-com --test lmx_write_complete_live -- --ignored
--nocapture passes end-to-end. RegisterEngine2 OK, write
round-trips, OnWriteComplete fires with the captured MxStatus shape.

Unblocks F49 step 5; F55 marked Resolved in design/followups.md.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-06 09:25:44 -04:00
parent 0a274af76f
commit 2fc327a8d5
9 changed files with 493 additions and 131 deletions
+1 -1
View File
@@ -45,7 +45,7 @@ serde = ["mxaccess-codec/serde"]
live = []
# Pulls F12's `Session::connect_nmx_auto` constructor — the auto-resolving
# COM-activation path. Propagates to `mxaccess-nmx/windows-com`.
windows-com = ["mxaccess-nmx/windows-com"]
windows-com = ["mxaccess-nmx/windows-com", "mxaccess-callback/windows-com"]
[lints]
workspace = true
+116 -35
View File
@@ -32,7 +32,19 @@
use std::sync::Arc;
use std::time::SystemTime;
use mxaccess_callback::{CallbackEvent, CallbackExporter, ExporterIdentities};
use mxaccess_callback::{CallbackEvent, CallbackExporter};
// `ExporterIdentities` is only used by the legacy hand-rolled
// CallbackExporter path. Path A (windows-com) skips it entirely.
#[cfg(not(all(windows, feature = "windows-com")))]
use mxaccess_callback::ExporterIdentities;
// F55 / Path A — DCOM-managed `INmxSvcCallback` sink. Only used when
// `windows-com` is on; without it the hand-rolled `CallbackExporter`
// path stays in place (it's still useful for unit tests that exercise
// the exporter against a fake NMX peer in-process). The DCOM sink is
// the path that survives NmxSvc's SCM-side OXID validation against the
// live AVEVA install — see `mxaccess_callback::dcom_sink` for context.
#[cfg(all(windows, feature = "windows-com"))]
use mxaccess_rpc::com_objref_provider::IUnknownHolder;
use mxaccess_codec::{
MxStatus, NmxOperationStatusMessage, NmxReferenceRegistrationMessage, NmxSubscriptionMessage,
NmxSubscriptionRecord,
@@ -40,7 +52,11 @@ use mxaccess_codec::{
use mxaccess_galaxy::{GalaxyTagMetadata, Resolver, ResolverError};
use mxaccess_nmx::{NmxClient, NmxClientError, WriteValue};
use mxaccess_rpc::guid::Guid;
use mxaccess_rpc::ntlm::{NtlmClientContext, local_hostname};
use mxaccess_rpc::ntlm::NtlmClientContext;
// Same as `ExporterIdentities` above — only the legacy exporter path
// derives the OBJREF host from `local_hostname()`.
#[cfg(not(all(windows, feature = "windows-com")))]
use mxaccess_rpc::ntlm::local_hostname;
use mxaccess_rpc::transport::TransportError;
use std::collections::{HashMap, VecDeque};
use std::net::SocketAddr;
@@ -598,6 +614,20 @@ pub struct SessionInner {
/// dictionaries (`MxNativeSession.cs` field-level comments) plus
/// the ordered list those dictionaries are consulted against.
pub(crate) pending_ops: Arc<Mutex<PendingOps>>,
/// F55 / Path A — keeps the DCOM-managed `INmxSvcCallback`'s
/// `IUnknown` ref alive for the session's lifetime. The marshalled
/// OBJREF passed to `RegisterEngine2` references this object's
/// OXID/IPID via the SCM's stub manager; once the last `IUnknown`
/// ref drops, the stub is torn down and inbound NmxSvc callbacks
/// fail to dispatch. `None` when the legacy `CallbackExporter`
/// path is in use (no `windows-com` feature) or after `shutdown_nmx`
/// drops the ref.
///
/// Mirrors the .NET reference's `MxNativeSession._callbackSink`
/// field (`MxNativeSession.cs`), which holds the `NmxCallbackSink`
/// instance for the same reason.
#[cfg(all(windows, feature = "windows-com"))]
pub(crate) dcom_sink_holder: Mutex<Option<IUnknownHolder>>,
}
/// FIFO-ordered registry of outstanding NMX operations waiting for an
@@ -908,35 +938,67 @@ impl Session {
options: SessionOptions,
resolver: Arc<dyn Resolver>,
) -> Result<Self, Error> {
// 1. Bind a local CallbackExporter on an OS-assigned ephemeral
// port, then build the OBJREF advertising it. Hostname comes
// from `local_hostname()` (env-var lookup); falls back to
// `127.0.0.1` when neither `COMPUTERNAME` nor `HOSTNAME` is
// set so the OBJREF binding is always parseable as
// "<host>[<port>]".
let identities = ExporterIdentities::random();
// Bind on UNSPECIFIED (`0.0.0.0`) so the listener accepts
// dial-backs on every interface NmxSvc could resolve the
// hostname to. The OBJREF's host string is the machine's
// `COMPUTERNAME` (or `127.0.0.1` fallback), and NmxSvc
// resolves that via DNS — which on a typical AVEVA install
// returns the machine's primary NIC IP, not loopback. If the
// exporter binds only on `127.0.0.1`, the dial-back lands on
// a different interface and the TCP SYN is dropped, surfacing
// as `RegisterEngine2 → Fault(0x800706BA RPC_S_SERVER_UNAVAILABLE)`
// because NmxSvc can't reach our exporter to negotiate the
// callback bind. Binding on UNSPECIFIED (= bind to all v4
// interfaces, including loopback + primary NIC) avoids this.
let exporter_addr =
SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0);
let (exporter, callback_events) = CallbackExporter::bind(exporter_addr, identities)
.await
.map_err(Error::Io)?;
let hostname = match local_hostname() {
s if s.is_empty() => "127.0.0.1".to_string(),
s => s,
// 1. Build the callback sink + OBJREF.
//
// Two paths exist:
//
// - F55 / Path A (`windows-com` feature): Build a DCOM-managed
// `INmxSvcCallback` instance via `windows-rs` `#[implement]`,
// marshal it through `CoMarshalInterface`. This registers the
// sink's OXID with the local RPCSS so NmxSvc's SCM-side
// `IObjectExporter::ResolveOxid` validation passes — the
// hand-rolled OBJREF below fails this check with
// `RPC_S_SERVER_UNAVAILABLE` (1722) on `RegisterEngine2`.
// Mirrors `MxNativeSession.CreateRegisteredService` which
// calls `ComObjRefProvider.MarshalInterfaceObjRef(callback,
// INmxSvcCallback, DifferentMachine)`
// (`src/MxNativeClient/MxNativeSession.cs:624`).
//
// - Legacy hand-rolled exporter (no `windows-com`): Bind a
// local TCP listener that serves `IRemUnknown` +
// `INmxSvcCallback` directly, then advertise it via a
// custom OBJREF. Useful for unit tests that exercise the
// exporter against a fake NMX peer in-process. NOT used
// against a real NmxSvc — see F55 in `design/followups.md`.
#[cfg(all(windows, feature = "windows-com"))]
let (exporter, dcom_sink_holder, callback_events, callback_obj_ref) = {
let (holder, events, blob) = mxaccess_callback::create_dcom_callback_sink_objref()
.map_err(|e| {
Error::Connection(ConnectionError::TransportFailure {
detail: format!("DCOM callback sink marshal failed: {e:?}"),
})
})?;
(None::<CallbackExporter>, Some(holder), events, blob)
};
#[cfg(not(all(windows, feature = "windows-com")))]
let (exporter, callback_events, callback_obj_ref) = {
let identities = ExporterIdentities::random();
// Bind on UNSPECIFIED (`0.0.0.0`) so the listener accepts
// dial-backs on every interface NmxSvc could resolve the
// hostname to. The OBJREF's host string is the machine's
// `COMPUTERNAME` (or `127.0.0.1` fallback), and NmxSvc
// resolves that via DNS — which on a typical AVEVA install
// returns the machine's primary NIC IP, not loopback. If
// the exporter binds only on `127.0.0.1`, the dial-back
// lands on a different interface and the TCP SYN is
// dropped, surfacing as `RegisterEngine2 → Fault(0x800706BA
// RPC_S_SERVER_UNAVAILABLE)` because NmxSvc can't reach
// our exporter to negotiate the callback bind. Binding on
// UNSPECIFIED (= bind to all v4 interfaces, including
// loopback + primary NIC) avoids this.
let exporter_addr =
SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0);
let (exporter, events) = CallbackExporter::bind(exporter_addr, identities)
.await
.map_err(Error::Io)?;
let hostname = match local_hostname() {
s if s.is_empty() => "127.0.0.1".to_string(),
s => s,
};
let blob = exporter.create_callback_objref(&hostname);
(Some(exporter), events, blob)
};
let callback_obj_ref = exporter.create_callback_objref(&hostname);
// 2. Spawn the router task that broadcasts parsed callback
// messages.
@@ -996,7 +1058,7 @@ impl Session {
options,
resolver,
nmx: Mutex::new(nmx),
callback_exporter: Mutex::new(Some(exporter)),
callback_exporter: Mutex::new(exporter),
callback_tx,
operation_status_tx,
recovery_active,
@@ -1007,6 +1069,8 @@ impl Session {
callback_obj_ref,
rebuild_factory: Mutex::new(None),
pending_ops,
#[cfg(all(windows, feature = "windows-com"))]
dcom_sink_holder: Mutex::new(dcom_sink_holder),
}),
})
}
@@ -2102,6 +2166,15 @@ impl Session {
if let Some(exp) = self.inner.callback_exporter.lock().await.take() {
exp.shutdown().await;
}
// F55 / Path A — drop the DCOM-managed sink's IUnknown ref so
// CoMarshalInterface's stub manager unregisters our OXID from
// RPCSS. Mirrors the .NET reference's `MxNativeSession.Dispose`
// path, which lets the `NmxCallbackSink` go out of scope after
// unregister.
#[cfg(all(windows, feature = "windows-com"))]
{
self.inner.dcom_sink_holder.lock().await.take();
}
// 3. Wait for the router task. Once the exporter is dropped its
// upstream mpsc::Sender closes, the router's recv() returns
@@ -2403,10 +2476,16 @@ mod tests {
// matches production. Tests don't drive real callbacks through
// this path, but keeping the shape symmetric means
// shutdown_nmx exercises the full cleanup chain.
let (exporter, callback_events) =
CallbackExporter::bind("127.0.0.1:0".parse().unwrap(), ExporterIdentities::random())
.await
.unwrap();
// Test-only helper exercises the legacy hand-rolled CallbackExporter
// path even when the crate is built with `windows-com`. The DCOM
// path needs a real NmxSvc on the wire; this shim talks to a
// `loopback_listener::expect_*` peer.
let (exporter, callback_events) = CallbackExporter::bind(
"127.0.0.1:0".parse().unwrap(),
mxaccess_callback::ExporterIdentities::random(),
)
.await
.unwrap();
let (callback_tx, _) = broadcast::channel(CALLBACK_BROADCAST_CAPACITY);
let (operation_status_tx, _) =
broadcast::channel::<Arc<OperationStatus>>(OPERATION_STATUS_BROADCAST_CAPACITY);
@@ -2437,6 +2516,8 @@ mod tests {
callback_obj_ref: Vec::new(),
rebuild_factory: Mutex::new(None),
pending_ops,
#[cfg(all(windows, feature = "windows-com"))]
dcom_sink_holder: Mutex::new(None),
}),
})
}