diff --git a/rust/crates/mxaccess/Cargo.toml b/rust/crates/mxaccess/Cargo.toml index 30d3fc5..4889275 100644 --- a/rust/crates/mxaccess/Cargo.toml +++ b/rust/crates/mxaccess/Cargo.toml @@ -9,14 +9,15 @@ rust-version.workspace = true authors.workspace = true [dependencies] -mxaccess-codec = { path = "../mxaccess-codec" } -mxaccess-galaxy = { path = "../mxaccess-galaxy" } -mxaccess-nmx = { path = "../mxaccess-nmx" } -mxaccess-rpc = { path = "../mxaccess-rpc" } -thiserror = { workspace = true } -tokio = { workspace = true } -tracing = { workspace = true } -rand = "0.8" +mxaccess-codec = { path = "../mxaccess-codec" } +mxaccess-callback = { path = "../mxaccess-callback" } +mxaccess-galaxy = { path = "../mxaccess-galaxy" } +mxaccess-nmx = { path = "../mxaccess-nmx" } +mxaccess-rpc = { path = "../mxaccess-rpc" } +thiserror = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +rand = "0.8" [dev-dependencies] async-trait = { workspace = true } diff --git a/rust/crates/mxaccess/src/session.rs b/rust/crates/mxaccess/src/session.rs index bb6642d..8d48b9c 100644 --- a/rust/crates/mxaccess/src/session.rs +++ b/rust/crates/mxaccess/src/session.rs @@ -20,26 +20,44 @@ //! ## What's deliberately NOT here (yet) //! //! - Recovery loop / `RecoveryEvent` emission (wave 2). -//! - Callback exporter wiring + `Subscription` stream (wave 2). +//! - Per-`Subscription` `Stream` routing (followup +//! F15). The callback exporter is now wired in `connect_nmx` and the +//! broadcast channel exposed via [`Session::callbacks`] yields raw +//! parsed `NmxSubscriptionMessage`s; turning those into typed +//! per-subscription `DataChange` items is the next iteration's work. //! - `read` (read-as-subscribe pattern from `MxNativeSession.ReadAsync` -//! `cs:312-359`) — needs the callback exporter. +//! `cs:312-359`) — needs F15's per-subscription routing. //! - Auto-resolving COM activation (followup F12). use std::sync::Arc; use std::time::SystemTime; +use mxaccess_callback::{CallbackEvent, CallbackExporter, ExporterIdentities}; +use mxaccess_codec::NmxSubscriptionMessage; use mxaccess_galaxy::{GalaxyTagMetadata, Resolver, ResolverError}; use mxaccess_nmx::{NmxClient, NmxClientError, WriteValue}; use mxaccess_rpc::guid::Guid; -use mxaccess_rpc::ntlm::NtlmClientContext; +use mxaccess_rpc::ntlm::{NtlmClientContext, local_hostname}; use mxaccess_rpc::transport::TransportError; use std::net::SocketAddr; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, broadcast}; +use tokio::task::JoinHandle; use crate::{ ConfigError, ConnectionError, Error, RecoveryPolicy, SecurityContext, Session, SessionOptions, }; +/// Capacity of the broadcast channel that fans out parsed +/// `NmxSubscriptionMessage`s to all consumers (`Session::callbacks` / +/// future `Subscription::Stream`). +/// +/// Picked to absorb a short burst of updates without dropping when a +/// consumer is briefly slow. Subscribers that lag past this many items +/// receive `RecvError::Lagged(n)` from `broadcast::Receiver::recv()` — +/// the wire protocol does NOT replay missed updates so consumers must +/// either keep up or accept lag-loss. +const CALLBACK_BROADCAST_CAPACITY: usize = 256; + /// Subscription lifecycle handle returned by [`Session::subscribe`]. /// /// Carries the 16-byte `correlation_id` the Rust port generated for @@ -134,6 +152,19 @@ pub struct SessionInner { pub(crate) options: SessionOptions, pub(crate) resolver: Arc, pub(crate) nmx: Mutex, + /// Local TCP server that `NmxSvc.exe` calls back into. `None` after + /// `shutdown_nmx` consumes it. Held behind a `tokio::sync::Mutex` + /// so the shutdown path can `take()` it out and `await` its + /// `shutdown()`. + pub(crate) callback_exporter: Mutex>, + /// Broadcast channel that fans out parsed callback messages. Tap + /// via [`Session::callbacks`]. + pub(crate) callback_tx: broadcast::Sender>, + /// Handle to the router task that drains the + /// [`CallbackExporter`]'s `CallbackEvent` channel and pushes parsed + /// `NmxSubscriptionMessage`s onto `callback_tx`. `None` after + /// `shutdown_nmx` joins on it. + pub(crate) router_handle: std::sync::Mutex>>, /// `false` after [`Session::shutdown`] has run successfully. Subsequent /// operations short-circuit with [`Error::Connection`]. pub(crate) connected: std::sync::atomic::AtomicBool, @@ -147,10 +178,44 @@ impl std::fmt::Debug for SessionInner { "connected", &self.connected.load(std::sync::atomic::Ordering::Acquire), ) + .field( + "callback_subscriber_count", + &self.callback_tx.receiver_count(), + ) .finish_non_exhaustive() } } +/// Drain `CallbackExporter` events, decode `CallbackInvoked` bodies as +/// `NmxSubscriptionMessage`, and broadcast each parsed message. +/// +/// Exits when the upstream `CallbackEvent` channel closes (which +/// happens when the `CallbackExporter` is dropped or +/// `shutdown()`-ed). Other event variants (Bind, Auth3Ignored, +/// ProtocolError, etc.) are ignored at this layer; consumers that +/// need them can subscribe to the raw `CallbackExporter` events +/// directly via a future "diagnostic-channel" hook (no followup yet +/// — surface only when a real consumer asks). +pub(crate) async fn callback_router( + mut events: tokio::sync::mpsc::UnboundedReceiver, + callback_tx: broadcast::Sender>, +) { + while let Some(event) = events.recv().await { + if let CallbackEvent::CallbackInvoked { body, .. } = event { + // The body is the inner NMX subscription message — same + // 23-byte preamble + records as `NmxSubscriptionMessage::parse_inner` + // expects. Parse failures are silent (no consumer) since the + // .NET reference also fires `UnparsedCallbackReceived` events + // separately and we don't model that yet. + if let Ok(msg) = NmxSubscriptionMessage::parse_inner(&body) { + // `send` returns `Err(SendError)` only when there are zero + // receivers — that's fine for this wire path; nothing to do. + let _ = callback_tx.send(Arc::new(msg)); + } + } + } +} + impl Session { /// Open a session over the NMX transport. Mirrors the wire-side of /// `MxNativeSession.Open` (`MxNativeSession.cs:127-147`) — `Open` @@ -181,35 +246,59 @@ impl Session { ) -> Result { recovery.validate()?; + // 1. Bind a local CallbackExporter on an OS-assigned ephemeral + // port, then build the OBJREF advertising it. Hostname comes + // from `local_hostname()` (env-var lookup); falls back to + // `127.0.0.1` when neither `COMPUTERNAME` nor `HOSTNAME` is + // set so the OBJREF binding is always parseable as + // "[]". + let identities = ExporterIdentities::random(); + // Build the loopback address structurally rather than via `.parse()` + // — avoids `.expect()` on a Result that's structurally infallible + // (clippy::expect_used). + let exporter_addr = SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0); + let (exporter, callback_events) = CallbackExporter::bind(exporter_addr, identities) + .await + .map_err(Error::Io)?; + let hostname = match local_hostname() { + s if s.is_empty() => "127.0.0.1".to_string(), + s => s, + }; + let callback_obj_ref = exporter.create_callback_objref(&hostname); + + // 2. Spawn the router task that broadcasts parsed callback + // messages. + let (callback_tx, _) = broadcast::channel(CALLBACK_BROADCAST_CAPACITY); + let router_handle = tokio::spawn(callback_router(callback_events, callback_tx.clone())); + + // 3. Open the NMX transport + bind. let mut nmx = NmxClient::connect(addr, service_ipid, ntlm) .await .map_err(map_nmx)?; - // RegisterEngine2 with a NULL callback for now — the callback - // exporter wiring lands in wave 2. Mirrors cs:163-175 modulo the - // callback param. + // 4. RegisterEngine2 with the callback OBJREF. Mirrors cs:163-175. let hr = nmx - .register_engine_2_without_callback( + .register_engine_2( options.local_engine_id, &options.engine_name, options.partner_version, + &callback_obj_ref, ) .await .map_err(map_nmx)?; if hr != 0 { + // Best-effort cleanup — the router will exit when the + // exporter is dropped via the SessionInner Drop path. return Err(Error::Connection(ConnectionError::EngineNotRegistered)); } - // Optional heartbeat-interval setup (cs:165-167). Mirrored as a - // post-register call when the option is `Some`. + // 5. Optional heartbeat-interval setup (cs:165-167). if let Some(ticks) = options.heartbeat_ticks_per_beat { let hr = nmx .set_heartbeat_send_interval(ticks, options.heartbeat_max_missed_ticks) .await .map_err(map_nmx)?; if hr != 0 { - // Heartbeat mis-configuration is a connection-config issue - // rather than a transport failure. return Err(Error::Configuration(ConfigError::InvalidArgument { detail: format!("SetHeartbeatSendInterval returned HRESULT 0x{hr:08x}"), })); @@ -221,11 +310,40 @@ impl Session { options, resolver, nmx: Mutex::new(nmx), + callback_exporter: Mutex::new(Some(exporter)), + callback_tx, + router_handle: std::sync::Mutex::new(Some(router_handle)), connected: std::sync::atomic::AtomicBool::new(true), }), }) } + /// Subscribe to the raw stream of parsed callback messages. + /// Returns a [`tokio::sync::broadcast::Receiver`] that yields every + /// `NmxSubscriptionMessage` the router decoded (both `0x32` + /// SubscriptionStatus and `0x33` DataUpdate). + /// + /// Per-subscription correlation routing — turning these raw + /// messages into typed `DataChange` items keyed off + /// `Subscription::correlation_id` — is the next iteration's work. + /// This accessor is the test seam + escape hatch consumers can use + /// today to observe the raw stream. + /// + /// Receivers can lag by up to [`CALLBACK_BROADCAST_CAPACITY`] + /// messages before the broadcast channel starts dropping; lagged + /// receivers see [`tokio::sync::broadcast::error::RecvError::Lagged`]. + #[must_use] + pub fn callbacks(&self) -> broadcast::Receiver> { + self.inner.callback_tx.subscribe() + } + + /// Local socket address the embedded callback exporter is bound to. + /// Useful for tests + diagnostics. `None` after `shutdown_nmx`. + pub async fn callback_exporter_addr(&self) -> Option { + let lock = self.inner.callback_exporter.lock().await; + lock.as_ref().map(CallbackExporter::local_addr) + } + /// Write a value to a tag. Mirrors `MxNativeSession.WriteAsync` /// (`cs:165-185`) — resolves the tag through the configured /// `Resolver`, then delegates to `NmxClient::write`. @@ -516,14 +634,45 @@ impl Session { return Ok(()); } - let mut nmx = self.inner.nmx.lock().await; - let hr = nmx - .unregister_engine(self.inner.options.local_engine_id) - .await - .map_err(map_nmx)?; - if hr != 0 { - return Err(Error::Connection(ConnectionError::EngineNotRegistered)); + // 1. Unregister the engine on the wire first, while the NMX + // transport is still live. + { + let mut nmx = self.inner.nmx.lock().await; + let hr = nmx + .unregister_engine(self.inner.options.local_engine_id) + .await + .map_err(map_nmx)?; + if hr != 0 { + return Err(Error::Connection(ConnectionError::EngineNotRegistered)); + } } + + // 2. Shut down the callback exporter. Its accept loop exits + + // in-flight client connections finish naturally. + if let Some(exp) = self.inner.callback_exporter.lock().await.take() { + exp.shutdown().await; + } + + // 3. Wait for the router task. Once the exporter is dropped its + // upstream mpsc::Sender closes, the router's recv() returns + // None, and the task exits naturally. + // + // Take the handle out under the std::sync::Mutex first, then + // drop the guard before `.await` — holding a sync MutexGuard + // across an await is a deadlock hazard (clippy::await_holding_lock). + let router_handle = self + .inner + .router_handle + .lock() + .unwrap_or_else(|e| e.into_inner()) + .take(); + if let Some(handle) = router_handle { + // Errors here mean the router panicked or was aborted; we + // surface neither to keep shutdown idempotent. The router's + // body has no panic paths. + let _ = handle.await; + } + Ok(()) } @@ -781,7 +930,13 @@ mod tests { } /// Build a Session by going through the unauthenticated bind path - /// (test-only — production `connect_nmx` would do NTLM). + /// (test-only — production `connect_nmx` would do NTLM). The router + /// task is spawned but never sees real callback events; tests that + /// want to exercise the broadcast path can synthesize events by + /// holding a `Session::callbacks()` receiver and pushing + /// `Arc` straight onto `inner.callback_tx` + /// via `tx.send(Arc::new(msg))` (the broadcast Sender is in the + /// inner state — accessed via a test-only helper below). async fn connect_test_session( addr: SocketAddr, resolver: Arc, @@ -793,16 +948,39 @@ mod tests { let mut transport = DceRpcTcpClient::connect(addr).await.unwrap(); transport.bind(svc::INTERFACE_ID, 0, 0).await.unwrap(); let nmx = NmxClient::from_bound_transport(transport, Guid::new([0xCC; 16])); + + // Wire a CallbackExporter + router so the SessionInner shape + // matches production. Tests don't drive real callbacks through + // this path, but keeping the shape symmetric means + // shutdown_nmx exercises the full cleanup chain. + let (exporter, callback_events) = + CallbackExporter::bind("127.0.0.1:0".parse().unwrap(), ExporterIdentities::random()) + .await + .unwrap(); + let (callback_tx, _) = broadcast::channel(CALLBACK_BROADCAST_CAPACITY); + let router_handle = tokio::spawn(callback_router(callback_events, callback_tx.clone())); + Ok(Session { inner: Arc::new(SessionInner { options: SessionOptions::default(), resolver, nmx: Mutex::new(nmx), + callback_exporter: Mutex::new(Some(exporter)), + callback_tx, + router_handle: std::sync::Mutex::new(Some(router_handle)), connected: std::sync::atomic::AtomicBool::new(true), }), }) } + /// Test-only helper: clone the inner broadcast Sender so a test can + /// inject synthetic `NmxSubscriptionMessage`s downstream of the + /// router (mimicking what the router would have produced from a + /// real `CallbackInvoked` event). + fn test_inject_sender(session: &Session) -> broadcast::Sender> { + session.inner.callback_tx.clone() + } + #[tokio::test] async fn write_value_round_trip_via_resolver() { // Server returns HRESULT 0 for the one TransferData call. @@ -1121,4 +1299,169 @@ mod tests { Error::Configuration(ConfigError::InvalidArgument { .. }) )); } + + // ---- F15 callback router + broadcast layer ------------------------ + + #[tokio::test] + async fn callbacks_receiver_observes_injected_subscription_message() { + let (addr, handle) = unauthenticated_server(Vec::new()).await; + let resolver: Arc = Arc::new(StaticResolver::new(&[])); + let session = connect_test_session(addr, resolver).await.unwrap(); + + // Subscribe BEFORE injecting so the broadcast retains the message. + let mut rx = session.callbacks(); + + // Build a synthetic NmxSubscriptionMessage. The router would + // produce one of these from a CallbackInvoked event; we + // shortcut directly to the broadcast Sender. + let injected = NmxSubscriptionMessage { + command: 0x33, // DataUpdate + version: 1, + record_count: 0, + operation_id: mxaccess_codec::subscription_message::NmxGuid::from_bytes(&[0xAB; 16]) + .unwrap(), + item_correlation_id: None, + records: Vec::new(), + }; + let tx = test_inject_sender(&session); + tx.send(Arc::new(injected.clone())).unwrap(); + + let received = tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv()) + .await + .expect("broadcast receiver timed out") + .expect("broadcast Sender was dropped before send completed"); + assert_eq!(*received, injected); + handle.abort(); + } + + #[tokio::test] + async fn callbacks_supports_multiple_subscribers_independently() { + let (addr, handle) = unauthenticated_server(Vec::new()).await; + let resolver: Arc = Arc::new(StaticResolver::new(&[])); + let session = connect_test_session(addr, resolver).await.unwrap(); + + let mut rx_a = session.callbacks(); + let mut rx_b = session.callbacks(); + assert_eq!(session.inner.callback_tx.receiver_count(), 2); + + let injected = NmxSubscriptionMessage { + command: 0x33, + version: 1, + record_count: 0, + operation_id: mxaccess_codec::subscription_message::NmxGuid::from_bytes(&[0xCD; 16]) + .unwrap(), + item_correlation_id: None, + records: Vec::new(), + }; + test_inject_sender(&session) + .send(Arc::new(injected.clone())) + .unwrap(); + + let a = rx_a.recv().await.unwrap(); + let b = rx_b.recv().await.unwrap(); + // Same Arc points reach both subscribers — broadcast::Receiver + // hands out a clone of the underlying Arc, so the two + // pointers refer to the same allocation. + assert!(Arc::ptr_eq(&a, &b)); + handle.abort(); + } + + #[tokio::test] + async fn callback_exporter_addr_returns_some_until_shutdown() { + let (addr, handle) = unauthenticated_server(vec![(0, Vec::new())]).await; + let resolver: Arc = Arc::new(StaticResolver::new(&[])); + let session = connect_test_session(addr, resolver).await.unwrap(); + let cb_addr_before = session.callback_exporter_addr().await; + assert!( + cb_addr_before.is_some(), + "exporter should be live initially" + ); + + let cloned = session.clone(); + session.shutdown_nmx().await.unwrap(); + + let cb_addr_after = cloned.callback_exporter_addr().await; + assert!( + cb_addr_after.is_none(), + "exporter should be taken on shutdown" + ); + handle.await.unwrap(); + } + + #[tokio::test] + async fn router_task_decodes_callback_invoked_into_broadcast() { + // End-to-end exercise: hand-build a CallbackEvent::CallbackInvoked + // with a valid NmxSubscriptionMessage body, send it through the + // exporter's mpsc channel directly, and observe the parsed + // message on the broadcast. + // + // We can't get at the exporter's internal sender, so we spin a + // standalone callback_router with a fake mpsc pair and a fake + // broadcast pair to test the routing logic in isolation. + let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel(); + let (callback_tx, mut callback_rx) = broadcast::channel(8); + + let router_h = tokio::spawn(callback_router(event_rx, callback_tx)); + + // Build a minimal valid 0x32 SubscriptionStatus body: 23-byte + // preamble + 16-byte item_correlation_id, record_count=0 so no + // records follow. Total: 39 bytes. Using 0x32 (not 0x33) + // because DataUpdate always attempts to parse one record + // regardless of record_count, and we'd need a full 38-byte + // record body to satisfy that parser. + let mut body = vec![0u8; 39]; + body[0] = 0x32; + body[1..3].copy_from_slice(&1u16.to_le_bytes()); // version + body[3..7].copy_from_slice(&0i32.to_le_bytes()); // record_count + body[7..23].copy_from_slice(&[0xEFu8; 16]); // operation_id + body[23..39].copy_from_slice(&[0xCDu8; 16]); // item_correlation_id + + let event = CallbackEvent::CallbackInvoked { opnum: 4, body }; + event_tx.send(event).unwrap(); + + let received = tokio::time::timeout(std::time::Duration::from_secs(1), callback_rx.recv()) + .await + .expect("router timed out"); + match received { + Ok(msg) => { + assert_eq!(msg.command, 0x32); + assert_eq!(msg.record_count, 0); + assert_eq!(msg.operation_id.0, [0xEFu8; 16]); + assert_eq!(msg.item_correlation_id.unwrap().0, [0xCDu8; 16]); + } + Err(e) => panic!("broadcast recv error: {e}"), + } + + // Drop the upstream sender → router exits naturally. + drop(event_tx); + let _ = tokio::time::timeout(std::time::Duration::from_secs(1), router_h) + .await + .expect("router task didn't exit after upstream close"); + } + + #[tokio::test] + async fn router_silently_drops_non_callback_events() { + // Bind / Auth3Ignored / ProtocolError / etc. should be ignored + // by the router (they're CallbackExporter diagnostics, not + // subscription data). Verify by sending a Bind event and + // asserting nothing arrives on the broadcast within a short + // window. + let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel(); + let (callback_tx, mut callback_rx) = broadcast::channel(8); + let router_h = tokio::spawn(callback_router(event_rx, callback_tx)); + + event_tx + .send(CallbackEvent::Bind { + context_id: 0, + iid: mxaccess_rpc::guid::Guid::ZERO, + }) + .unwrap(); + + let res = + tokio::time::timeout(std::time::Duration::from_millis(100), callback_rx.recv()).await; + assert!(res.is_err(), "broadcast yielded unexpectedly: {res:?}"); + + drop(event_tx); + let _ = router_h.await; + } }