[M4] mxaccess: wire CallbackExporter + spawn callback router (F15 step 1/2)

Lands the broadcast layer of F15. Session::connect_nmx now starts a
local CallbackExporter on an OS-assigned ephemeral port, builds a
callback OBJREF advertising it (using local_hostname() with a
127.0.0.1 fallback), and registers that OBJREF with NmxClient::register_engine_2
(was register_engine_2_without_callback). A router task drains the
exporter's CallbackEvent stream, decodes each CallbackInvoked body as
NmxSubscriptionMessage, and broadcasts parsed messages on a
tokio::sync::broadcast channel.

Per-subscription correlation routing — turning these raw messages
into per-Subscription DataChange streams — is the next iteration's
work. F15 stays open until that lands.

New Session API
- Session::callbacks() -> broadcast::Receiver<Arc<NmxSubscriptionMessage>>:
  raw observable of every parsed callback message. Test seam +
  escape hatch for consumers that need raw access today.
- Session::callback_exporter_addr() -> Option<SocketAddr>: returns the
  exporter's local addr (Some until shutdown_nmx, None after).

SessionInner additions
- callback_exporter: Mutex<Option<CallbackExporter>> — taken in shutdown.
- callback_tx: broadcast::Sender<Arc<NmxSubscriptionMessage>>.
- router_handle: std::sync::Mutex<Option<JoinHandle<()>>>.

shutdown_nmx now performs the full cleanup chain:
1. UnregisterEngine over the live NMX transport.
2. CallbackExporter::shutdown (cancels accept loop).
3. Wait for router task — exits naturally once exporter's mpsc
   sender side closes. Std::sync::Mutex guard taken-out-then-dropped
   before await to avoid clippy::await_holding_lock.

Routing rationale (callback_router fn)
- CallbackEvent::CallbackInvoked → parse via
  NmxSubscriptionMessage::parse_inner → broadcast Arc<msg>.
- Other event variants (Bind / Auth3Ignored / ProtocolError / etc.)
  silently dropped at this layer; consumers needing them can listen
  to a future diagnostic-channel hook (no followup yet).
- Parse failures silent — the .NET reference fires a separate
  UnparsedCallbackReceived event we don't model yet.

Cargo.toml: added mxaccess-callback as a direct dep on mxaccess.

Tests (5 new in mxaccess; total 35)
- callbacks receiver observes injected NmxSubscriptionMessage.
- multi-subscriber broadcast hands out the same Arc to each receiver.
- callback_exporter_addr is Some before shutdown, None after.
- router_task end-to-end: feed a hand-built CallbackInvoked event
  with a 39-byte 0x32 SubscriptionStatus body, observe the parsed
  message on the broadcast.
- router silently drops non-CallbackInvoked events (e.g. Bind).

Test count delta: 506 -> 511 (+5). All four DoD gates green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-05 09:35:41 -04:00
parent f7139f1118
commit 2b849aed7a
2 changed files with 372 additions and 28 deletions
+9 -8
View File
@@ -9,14 +9,15 @@ rust-version.workspace = true
authors.workspace = true
[dependencies]
mxaccess-codec = { path = "../mxaccess-codec" }
mxaccess-galaxy = { path = "../mxaccess-galaxy" }
mxaccess-nmx = { path = "../mxaccess-nmx" }
mxaccess-rpc = { path = "../mxaccess-rpc" }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
rand = "0.8"
mxaccess-codec = { path = "../mxaccess-codec" }
mxaccess-callback = { path = "../mxaccess-callback" }
mxaccess-galaxy = { path = "../mxaccess-galaxy" }
mxaccess-nmx = { path = "../mxaccess-nmx" }
mxaccess-rpc = { path = "../mxaccess-rpc" }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
rand = "0.8"
[dev-dependencies]
async-trait = { workspace = true }
+363 -20
View File
@@ -20,26 +20,44 @@
//! ## What's deliberately NOT here (yet)
//!
//! - Recovery loop / `RecoveryEvent` emission (wave 2).
//! - Callback exporter wiring + `Subscription` stream (wave 2).
//! - Per-`Subscription` `Stream<Item = DataChange>` routing (followup
//! F15). The callback exporter is now wired in `connect_nmx` and the
//! broadcast channel exposed via [`Session::callbacks`] yields raw
//! parsed `NmxSubscriptionMessage`s; turning those into typed
//! per-subscription `DataChange` items is the next iteration's work.
//! - `read` (read-as-subscribe pattern from `MxNativeSession.ReadAsync`
//! `cs:312-359`) — needs the callback exporter.
//! `cs:312-359`) — needs F15's per-subscription routing.
//! - Auto-resolving COM activation (followup F12).
use std::sync::Arc;
use std::time::SystemTime;
use mxaccess_callback::{CallbackEvent, CallbackExporter, ExporterIdentities};
use mxaccess_codec::NmxSubscriptionMessage;
use mxaccess_galaxy::{GalaxyTagMetadata, Resolver, ResolverError};
use mxaccess_nmx::{NmxClient, NmxClientError, WriteValue};
use mxaccess_rpc::guid::Guid;
use mxaccess_rpc::ntlm::NtlmClientContext;
use mxaccess_rpc::ntlm::{NtlmClientContext, local_hostname};
use mxaccess_rpc::transport::TransportError;
use std::net::SocketAddr;
use tokio::sync::Mutex;
use tokio::sync::{Mutex, broadcast};
use tokio::task::JoinHandle;
use crate::{
ConfigError, ConnectionError, Error, RecoveryPolicy, SecurityContext, Session, SessionOptions,
};
/// Capacity of the broadcast channel that fans out parsed
/// `NmxSubscriptionMessage`s to all consumers (`Session::callbacks` /
/// future `Subscription::Stream`).
///
/// Picked to absorb a short burst of updates without dropping when a
/// consumer is briefly slow. Subscribers that lag past this many items
/// receive `RecvError::Lagged(n)` from `broadcast::Receiver::recv()` —
/// the wire protocol does NOT replay missed updates so consumers must
/// either keep up or accept lag-loss.
const CALLBACK_BROADCAST_CAPACITY: usize = 256;
/// Subscription lifecycle handle returned by [`Session::subscribe`].
///
/// Carries the 16-byte `correlation_id` the Rust port generated for
@@ -134,6 +152,19 @@ pub struct SessionInner {
pub(crate) options: SessionOptions,
pub(crate) resolver: Arc<dyn Resolver>,
pub(crate) nmx: Mutex<NmxClient>,
/// Local TCP server that `NmxSvc.exe` calls back into. `None` after
/// `shutdown_nmx` consumes it. Held behind a `tokio::sync::Mutex`
/// so the shutdown path can `take()` it out and `await` its
/// `shutdown()`.
pub(crate) callback_exporter: Mutex<Option<CallbackExporter>>,
/// Broadcast channel that fans out parsed callback messages. Tap
/// via [`Session::callbacks`].
pub(crate) callback_tx: broadcast::Sender<Arc<NmxSubscriptionMessage>>,
/// Handle to the router task that drains the
/// [`CallbackExporter`]'s `CallbackEvent` channel and pushes parsed
/// `NmxSubscriptionMessage`s onto `callback_tx`. `None` after
/// `shutdown_nmx` joins on it.
pub(crate) router_handle: std::sync::Mutex<Option<JoinHandle<()>>>,
/// `false` after [`Session::shutdown`] has run successfully. Subsequent
/// operations short-circuit with [`Error::Connection`].
pub(crate) connected: std::sync::atomic::AtomicBool,
@@ -147,10 +178,44 @@ impl std::fmt::Debug for SessionInner {
"connected",
&self.connected.load(std::sync::atomic::Ordering::Acquire),
)
.field(
"callback_subscriber_count",
&self.callback_tx.receiver_count(),
)
.finish_non_exhaustive()
}
}
/// Drain `CallbackExporter` events, decode `CallbackInvoked` bodies as
/// `NmxSubscriptionMessage`, and broadcast each parsed message.
///
/// Exits when the upstream `CallbackEvent` channel closes (which
/// happens when the `CallbackExporter` is dropped or
/// `shutdown()`-ed). Other event variants (Bind, Auth3Ignored,
/// ProtocolError, etc.) are ignored at this layer; consumers that
/// need them can subscribe to the raw `CallbackExporter` events
/// directly via a future "diagnostic-channel" hook (no followup yet
/// — surface only when a real consumer asks).
pub(crate) async fn callback_router(
mut events: tokio::sync::mpsc::UnboundedReceiver<CallbackEvent>,
callback_tx: broadcast::Sender<Arc<NmxSubscriptionMessage>>,
) {
while let Some(event) = events.recv().await {
if let CallbackEvent::CallbackInvoked { body, .. } = event {
// The body is the inner NMX subscription message — same
// 23-byte preamble + records as `NmxSubscriptionMessage::parse_inner`
// expects. Parse failures are silent (no consumer) since the
// .NET reference also fires `UnparsedCallbackReceived` events
// separately and we don't model that yet.
if let Ok(msg) = NmxSubscriptionMessage::parse_inner(&body) {
// `send` returns `Err(SendError)` only when there are zero
// receivers — that's fine for this wire path; nothing to do.
let _ = callback_tx.send(Arc::new(msg));
}
}
}
}
impl Session {
/// Open a session over the NMX transport. Mirrors the wire-side of
/// `MxNativeSession.Open` (`MxNativeSession.cs:127-147`) — `Open`
@@ -181,35 +246,59 @@ impl Session {
) -> Result<Self, Error> {
recovery.validate()?;
// 1. Bind a local CallbackExporter on an OS-assigned ephemeral
// port, then build the OBJREF advertising it. Hostname comes
// from `local_hostname()` (env-var lookup); falls back to
// `127.0.0.1` when neither `COMPUTERNAME` nor `HOSTNAME` is
// set so the OBJREF binding is always parseable as
// "<host>[<port>]".
let identities = ExporterIdentities::random();
// Build the loopback address structurally rather than via `.parse()`
// — avoids `.expect()` on a Result that's structurally infallible
// (clippy::expect_used).
let exporter_addr = SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
let (exporter, callback_events) = CallbackExporter::bind(exporter_addr, identities)
.await
.map_err(Error::Io)?;
let hostname = match local_hostname() {
s if s.is_empty() => "127.0.0.1".to_string(),
s => s,
};
let callback_obj_ref = exporter.create_callback_objref(&hostname);
// 2. Spawn the router task that broadcasts parsed callback
// messages.
let (callback_tx, _) = broadcast::channel(CALLBACK_BROADCAST_CAPACITY);
let router_handle = tokio::spawn(callback_router(callback_events, callback_tx.clone()));
// 3. Open the NMX transport + bind.
let mut nmx = NmxClient::connect(addr, service_ipid, ntlm)
.await
.map_err(map_nmx)?;
// RegisterEngine2 with a NULL callback for now — the callback
// exporter wiring lands in wave 2. Mirrors cs:163-175 modulo the
// callback param.
// 4. RegisterEngine2 with the callback OBJREF. Mirrors cs:163-175.
let hr = nmx
.register_engine_2_without_callback(
.register_engine_2(
options.local_engine_id,
&options.engine_name,
options.partner_version,
&callback_obj_ref,
)
.await
.map_err(map_nmx)?;
if hr != 0 {
// Best-effort cleanup — the router will exit when the
// exporter is dropped via the SessionInner Drop path.
return Err(Error::Connection(ConnectionError::EngineNotRegistered));
}
// Optional heartbeat-interval setup (cs:165-167). Mirrored as a
// post-register call when the option is `Some`.
// 5. Optional heartbeat-interval setup (cs:165-167).
if let Some(ticks) = options.heartbeat_ticks_per_beat {
let hr = nmx
.set_heartbeat_send_interval(ticks, options.heartbeat_max_missed_ticks)
.await
.map_err(map_nmx)?;
if hr != 0 {
// Heartbeat mis-configuration is a connection-config issue
// rather than a transport failure.
return Err(Error::Configuration(ConfigError::InvalidArgument {
detail: format!("SetHeartbeatSendInterval returned HRESULT 0x{hr:08x}"),
}));
@@ -221,11 +310,40 @@ impl Session {
options,
resolver,
nmx: Mutex::new(nmx),
callback_exporter: Mutex::new(Some(exporter)),
callback_tx,
router_handle: std::sync::Mutex::new(Some(router_handle)),
connected: std::sync::atomic::AtomicBool::new(true),
}),
})
}
/// Subscribe to the raw stream of parsed callback messages.
/// Returns a [`tokio::sync::broadcast::Receiver`] that yields every
/// `NmxSubscriptionMessage` the router decoded (both `0x32`
/// SubscriptionStatus and `0x33` DataUpdate).
///
/// Per-subscription correlation routing — turning these raw
/// messages into typed `DataChange` items keyed off
/// `Subscription::correlation_id` — is the next iteration's work.
/// This accessor is the test seam + escape hatch consumers can use
/// today to observe the raw stream.
///
/// Receivers can lag by up to [`CALLBACK_BROADCAST_CAPACITY`]
/// messages before the broadcast channel starts dropping; lagged
/// receivers see [`tokio::sync::broadcast::error::RecvError::Lagged`].
#[must_use]
pub fn callbacks(&self) -> broadcast::Receiver<Arc<NmxSubscriptionMessage>> {
self.inner.callback_tx.subscribe()
}
/// Local socket address the embedded callback exporter is bound to.
/// Useful for tests + diagnostics. `None` after `shutdown_nmx`.
pub async fn callback_exporter_addr(&self) -> Option<SocketAddr> {
let lock = self.inner.callback_exporter.lock().await;
lock.as_ref().map(CallbackExporter::local_addr)
}
/// Write a value to a tag. Mirrors `MxNativeSession.WriteAsync`
/// (`cs:165-185`) — resolves the tag through the configured
/// `Resolver`, then delegates to `NmxClient::write`.
@@ -516,14 +634,45 @@ impl Session {
return Ok(());
}
let mut nmx = self.inner.nmx.lock().await;
let hr = nmx
.unregister_engine(self.inner.options.local_engine_id)
.await
.map_err(map_nmx)?;
if hr != 0 {
return Err(Error::Connection(ConnectionError::EngineNotRegistered));
// 1. Unregister the engine on the wire first, while the NMX
// transport is still live.
{
let mut nmx = self.inner.nmx.lock().await;
let hr = nmx
.unregister_engine(self.inner.options.local_engine_id)
.await
.map_err(map_nmx)?;
if hr != 0 {
return Err(Error::Connection(ConnectionError::EngineNotRegistered));
}
}
// 2. Shut down the callback exporter. Its accept loop exits +
// in-flight client connections finish naturally.
if let Some(exp) = self.inner.callback_exporter.lock().await.take() {
exp.shutdown().await;
}
// 3. Wait for the router task. Once the exporter is dropped its
// upstream mpsc::Sender closes, the router's recv() returns
// None, and the task exits naturally.
//
// Take the handle out under the std::sync::Mutex first, then
// drop the guard before `.await` — holding a sync MutexGuard
// across an await is a deadlock hazard (clippy::await_holding_lock).
let router_handle = self
.inner
.router_handle
.lock()
.unwrap_or_else(|e| e.into_inner())
.take();
if let Some(handle) = router_handle {
// Errors here mean the router panicked or was aborted; we
// surface neither to keep shutdown idempotent. The router's
// body has no panic paths.
let _ = handle.await;
}
Ok(())
}
@@ -781,7 +930,13 @@ mod tests {
}
/// Build a Session by going through the unauthenticated bind path
/// (test-only — production `connect_nmx` would do NTLM).
/// (test-only — production `connect_nmx` would do NTLM). The router
/// task is spawned but never sees real callback events; tests that
/// want to exercise the broadcast path can synthesize events by
/// holding a `Session::callbacks()` receiver and pushing
/// `Arc<NmxSubscriptionMessage>` straight onto `inner.callback_tx`
/// via `tx.send(Arc::new(msg))` (the broadcast Sender is in the
/// inner state — accessed via a test-only helper below).
async fn connect_test_session(
addr: SocketAddr,
resolver: Arc<dyn Resolver>,
@@ -793,16 +948,39 @@ mod tests {
let mut transport = DceRpcTcpClient::connect(addr).await.unwrap();
transport.bind(svc::INTERFACE_ID, 0, 0).await.unwrap();
let nmx = NmxClient::from_bound_transport(transport, Guid::new([0xCC; 16]));
// Wire a CallbackExporter + router so the SessionInner shape
// matches production. Tests don't drive real callbacks through
// this path, but keeping the shape symmetric means
// shutdown_nmx exercises the full cleanup chain.
let (exporter, callback_events) =
CallbackExporter::bind("127.0.0.1:0".parse().unwrap(), ExporterIdentities::random())
.await
.unwrap();
let (callback_tx, _) = broadcast::channel(CALLBACK_BROADCAST_CAPACITY);
let router_handle = tokio::spawn(callback_router(callback_events, callback_tx.clone()));
Ok(Session {
inner: Arc::new(SessionInner {
options: SessionOptions::default(),
resolver,
nmx: Mutex::new(nmx),
callback_exporter: Mutex::new(Some(exporter)),
callback_tx,
router_handle: std::sync::Mutex::new(Some(router_handle)),
connected: std::sync::atomic::AtomicBool::new(true),
}),
})
}
/// Test-only helper: clone the inner broadcast Sender so a test can
/// inject synthetic `NmxSubscriptionMessage`s downstream of the
/// router (mimicking what the router would have produced from a
/// real `CallbackInvoked` event).
fn test_inject_sender(session: &Session) -> broadcast::Sender<Arc<NmxSubscriptionMessage>> {
session.inner.callback_tx.clone()
}
#[tokio::test]
async fn write_value_round_trip_via_resolver() {
// Server returns HRESULT 0 for the one TransferData call.
@@ -1121,4 +1299,169 @@ mod tests {
Error::Configuration(ConfigError::InvalidArgument { .. })
));
}
// ---- F15 callback router + broadcast layer ------------------------
#[tokio::test]
async fn callbacks_receiver_observes_injected_subscription_message() {
let (addr, handle) = unauthenticated_server(Vec::new()).await;
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[]));
let session = connect_test_session(addr, resolver).await.unwrap();
// Subscribe BEFORE injecting so the broadcast retains the message.
let mut rx = session.callbacks();
// Build a synthetic NmxSubscriptionMessage. The router would
// produce one of these from a CallbackInvoked event; we
// shortcut directly to the broadcast Sender.
let injected = NmxSubscriptionMessage {
command: 0x33, // DataUpdate
version: 1,
record_count: 0,
operation_id: mxaccess_codec::subscription_message::NmxGuid::from_bytes(&[0xAB; 16])
.unwrap(),
item_correlation_id: None,
records: Vec::new(),
};
let tx = test_inject_sender(&session);
tx.send(Arc::new(injected.clone())).unwrap();
let received = tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv())
.await
.expect("broadcast receiver timed out")
.expect("broadcast Sender was dropped before send completed");
assert_eq!(*received, injected);
handle.abort();
}
#[tokio::test]
async fn callbacks_supports_multiple_subscribers_independently() {
let (addr, handle) = unauthenticated_server(Vec::new()).await;
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[]));
let session = connect_test_session(addr, resolver).await.unwrap();
let mut rx_a = session.callbacks();
let mut rx_b = session.callbacks();
assert_eq!(session.inner.callback_tx.receiver_count(), 2);
let injected = NmxSubscriptionMessage {
command: 0x33,
version: 1,
record_count: 0,
operation_id: mxaccess_codec::subscription_message::NmxGuid::from_bytes(&[0xCD; 16])
.unwrap(),
item_correlation_id: None,
records: Vec::new(),
};
test_inject_sender(&session)
.send(Arc::new(injected.clone()))
.unwrap();
let a = rx_a.recv().await.unwrap();
let b = rx_b.recv().await.unwrap();
// Same Arc points reach both subscribers — broadcast::Receiver
// hands out a clone of the underlying Arc<T>, so the two
// pointers refer to the same allocation.
assert!(Arc::ptr_eq(&a, &b));
handle.abort();
}
#[tokio::test]
async fn callback_exporter_addr_returns_some_until_shutdown() {
let (addr, handle) = unauthenticated_server(vec![(0, Vec::new())]).await;
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[]));
let session = connect_test_session(addr, resolver).await.unwrap();
let cb_addr_before = session.callback_exporter_addr().await;
assert!(
cb_addr_before.is_some(),
"exporter should be live initially"
);
let cloned = session.clone();
session.shutdown_nmx().await.unwrap();
let cb_addr_after = cloned.callback_exporter_addr().await;
assert!(
cb_addr_after.is_none(),
"exporter should be taken on shutdown"
);
handle.await.unwrap();
}
#[tokio::test]
async fn router_task_decodes_callback_invoked_into_broadcast() {
// End-to-end exercise: hand-build a CallbackEvent::CallbackInvoked
// with a valid NmxSubscriptionMessage body, send it through the
// exporter's mpsc channel directly, and observe the parsed
// message on the broadcast.
//
// We can't get at the exporter's internal sender, so we spin a
// standalone callback_router with a fake mpsc pair and a fake
// broadcast pair to test the routing logic in isolation.
let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
let (callback_tx, mut callback_rx) = broadcast::channel(8);
let router_h = tokio::spawn(callback_router(event_rx, callback_tx));
// Build a minimal valid 0x32 SubscriptionStatus body: 23-byte
// preamble + 16-byte item_correlation_id, record_count=0 so no
// records follow. Total: 39 bytes. Using 0x32 (not 0x33)
// because DataUpdate always attempts to parse one record
// regardless of record_count, and we'd need a full 38-byte
// record body to satisfy that parser.
let mut body = vec![0u8; 39];
body[0] = 0x32;
body[1..3].copy_from_slice(&1u16.to_le_bytes()); // version
body[3..7].copy_from_slice(&0i32.to_le_bytes()); // record_count
body[7..23].copy_from_slice(&[0xEFu8; 16]); // operation_id
body[23..39].copy_from_slice(&[0xCDu8; 16]); // item_correlation_id
let event = CallbackEvent::CallbackInvoked { opnum: 4, body };
event_tx.send(event).unwrap();
let received = tokio::time::timeout(std::time::Duration::from_secs(1), callback_rx.recv())
.await
.expect("router timed out");
match received {
Ok(msg) => {
assert_eq!(msg.command, 0x32);
assert_eq!(msg.record_count, 0);
assert_eq!(msg.operation_id.0, [0xEFu8; 16]);
assert_eq!(msg.item_correlation_id.unwrap().0, [0xCDu8; 16]);
}
Err(e) => panic!("broadcast recv error: {e}"),
}
// Drop the upstream sender → router exits naturally.
drop(event_tx);
let _ = tokio::time::timeout(std::time::Duration::from_secs(1), router_h)
.await
.expect("router task didn't exit after upstream close");
}
#[tokio::test]
async fn router_silently_drops_non_callback_events() {
// Bind / Auth3Ignored / ProtocolError / etc. should be ignored
// by the router (they're CallbackExporter diagnostics, not
// subscription data). Verify by sending a Bind event and
// asserting nothing arrives on the broadcast within a short
// window.
let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
let (callback_tx, mut callback_rx) = broadcast::channel(8);
let router_h = tokio::spawn(callback_router(event_rx, callback_tx));
event_tx
.send(CallbackEvent::Bind {
context_id: 0,
iid: mxaccess_rpc::guid::Guid::ZERO,
})
.unwrap();
let res =
tokio::time::timeout(std::time::Duration::from_millis(100), callback_rx.recv()).await;
assert!(res.is_err(), "broadcast yielded unexpectedly: {res:?}");
drop(event_tx);
let _ = router_h.await;
}
}