[M4] mxaccess: Session::connect_nmx + write_value + shutdown (wave 1 main)

First working M4 wave 1 slice. Adds session.rs with the connect /
write / shutdown path on top of NmxClient + Resolver, plus a tokio
test that exercises a full round-trip against a hand-rolled server.
Read, subscribe, recovery, and the long-lived connection task land
in wave 2.

Architecture
- Session holds Arc<SessionInner>; SessionInner wraps NmxClient
  behind a tokio::sync::Mutex. All RPC ops serialize on that mutex.
  Wave 2 will replace it with an mpsc::channel<Op> + dispatcher task
  per design/70-risks-and-open-questions.md R15 (drop-time async
  cleanup hazards).
- ensure_connected gate stops post-shutdown ops with
  Connection::EngineNotRegistered. Shutdown is idempotent via
  AtomicBool::swap.
- Manual Debug impl on SessionInner — neither dyn Resolver nor
  NmxClient impl Debug.

Public API
- Session::connect_nmx(addr, options, ntlm, service_ipid, resolver,
  recovery): validates the policy, opens NmxClient, runs
  RegisterEngine2 (no callback yet — wave 2), optionally configures
  heartbeat. Returns Error::Connection on non-zero HRESULT.
- Session::write_value(reference, value: WriteValue): resolves the
  tag through the configured Resolver, dispatches NmxClient::write.
- Session::resolve_write_kind / resolve_tag: convenience accessors.
- Session::shutdown_nmx: calls UnregisterEngine, idempotent.

Error mapping
- map_nmx / map_transport / map_resolver bridge the inner crate
  errors into the public Error enum. NonZeroHresult → InvalidArgument
  with the hex code; transport Fault → Status-shaped error;
  ResolverError::NotFound → Galaxy { reason: "tag not found: ..." }.
- All three matchers handle their #[non_exhaustive] sources with a
  generic catch-all so future variants don't silently break the map.

Tests (8 new in mxaccess; total mxaccess: 19)
- write_value round-trip via in-memory StaticResolver + hand-rolled
  unauthenticated DCE/RPC server.
- write_value propagates resolver not-found → Galaxy error.
- write_value propagates non-zero HRESULT → InvalidArgument.
- shutdown is idempotent (second call is a no-op).
- write after shutdown returns EngineNotRegistered.
- resolve_tag and resolve_write_kind work without RPC.
- envelope-kind constants used by Session match codec exports
  (sanity guard against codec rename).

mxaccess-nmx: WriteValue now re-exported at crate root.
mxaccess: deps gained mxaccess-nmx/galaxy/rpc + tokio + tracing,
plus async-trait as a dev-dep for the test resolver impl.

Test count delta: 479 -> 487 (+8). All four DoD gates green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-05 09:01:44 -04:00
parent 5cbc330f82
commit 12cb10c3a1
5 changed files with 681 additions and 7 deletions
+7
View File
@@ -214,8 +214,14 @@ dependencies = [
name = "mxaccess"
version = "0.0.0"
dependencies = [
"async-trait",
"mxaccess-codec",
"mxaccess-galaxy",
"mxaccess-nmx",
"mxaccess-rpc",
"thiserror",
"tokio",
"tracing",
]
[[package]]
@@ -272,6 +278,7 @@ version = "0.0.0"
dependencies = [
"mxaccess-callback",
"mxaccess-codec",
"mxaccess-galaxy",
"mxaccess-rpc",
"rand",
"thiserror",
+1 -1
View File
@@ -24,4 +24,4 @@
pub mod client;
pub use client::{NmxClient, NmxClientError};
pub use client::{NmxClient, NmxClientError, WriteValue};
+10 -2
View File
@@ -9,8 +9,16 @@ rust-version.workspace = true
authors.workspace = true
[dependencies]
mxaccess-codec = { path = "../mxaccess-codec" }
thiserror = { workspace = true }
mxaccess-codec = { path = "../mxaccess-codec" }
mxaccess-galaxy = { path = "../mxaccess-galaxy" }
mxaccess-nmx = { path = "../mxaccess-nmx" }
mxaccess-rpc = { path = "../mxaccess-rpc" }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
[dev-dependencies]
async-trait = { workspace = true }
[features]
default = []
+6 -4
View File
@@ -21,17 +21,19 @@ pub use mxaccess_codec::{
// ---- Public types --------------------------------------------------------
pub mod session;
pub use mxaccess_galaxy::{GalaxyTagMetadata, Resolver, ResolverError};
pub use mxaccess_nmx::WriteValue;
/// Async session façade. Cheap clones share the inner state; drop of the last
/// clone fires `UnregisterEngine` best-effort. For deterministic shutdown,
/// call `Session::shutdown(timeout).await`.
#[derive(Debug, Clone)]
pub struct Session {
_inner: Arc<SessionInner>,
pub(crate) inner: Arc<session::SessionInner>,
}
#[derive(Debug)]
struct SessionInner;
/// Stream of `DataChange` items. Drop sends `UnAdvise` via the long-lived
/// connection task (no `tokio::spawn` from `Drop`).
#[derive(Debug)]
+657
View File
@@ -0,0 +1,657 @@
//! M4 wave 1 main — `Session` over the NMX transport.
//!
//! Direct port of the connect / write / shutdown slice of
//! `src/MxNativeClient/MxNativeSession.cs`. Wave 1 deliberately ships
//! a minimal surface: `connect_nmx` + `write` + `shutdown`. Read,
//! subscribe, recovery, and the long-lived connection task land in
//! later waves — see `design/60-roadmap.md` M4 wave 2.
//!
//! ## Architecture (wave 1)
//!
//! Session holds an `Arc<SessionInner>` so the public type is cheaply
//! cloneable. The inner state wraps a single `NmxClient` behind a
//! [`tokio::sync::Mutex`]; all RPC operations serialize on that mutex.
//! This is intentionally simple: wave 2 replaces the mutex with a
//! long-lived connection task driven by an `mpsc::channel<Op>`, which
//! supports overlapping operations + clean shutdown without the
//! "spawn-from-Drop" hazard tracked at `design/70-risks-and-open-questions.md`
//! R15.
//!
//! ## What's deliberately NOT here (yet)
//!
//! - Recovery loop / `RecoveryEvent` emission (wave 2).
//! - Callback exporter wiring + `Subscription` stream (wave 2).
//! - `read` (read-as-subscribe pattern from `MxNativeSession.ReadAsync`
//! `cs:312-359`) — needs the callback exporter.
//! - Auto-resolving COM activation (followup F12).
use std::sync::Arc;
use mxaccess_galaxy::{GalaxyTagMetadata, Resolver, ResolverError};
use mxaccess_nmx::{NmxClient, NmxClientError, WriteValue};
use mxaccess_rpc::guid::Guid;
use mxaccess_rpc::ntlm::NtlmClientContext;
use mxaccess_rpc::transport::TransportError;
use std::net::SocketAddr;
use tokio::sync::Mutex;
use crate::{ConfigError, ConnectionError, Error, RecoveryPolicy, Session, SessionOptions};
/// Inner state of [`Session`] when connected over NMX. Held inside the
/// public type's `Arc<SessionInner>` so the public clone surface stays
/// cheap.
///
/// Manual `Debug` impl below: neither `dyn Resolver` nor `NmxClient`
/// implement `Debug`, so a derive would fail.
pub struct SessionInner {
pub(crate) options: SessionOptions,
pub(crate) resolver: Arc<dyn Resolver>,
pub(crate) nmx: Mutex<NmxClient>,
/// `false` after [`Session::shutdown`] has run successfully. Subsequent
/// operations short-circuit with [`Error::Connection`].
pub(crate) connected: std::sync::atomic::AtomicBool,
}
impl std::fmt::Debug for SessionInner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SessionInner")
.field("options", &self.options)
.field(
"connected",
&self.connected.load(std::sync::atomic::Ordering::Acquire),
)
.finish_non_exhaustive()
}
}
impl Session {
/// Open a session over the NMX transport. Mirrors the wire-side of
/// `MxNativeSession.Open` (`MxNativeSession.cs:127-147`) — `Open`
/// itself is .NET-side: COM-activates `NmxSvc.NmxService`, marshals
/// an OBJREF, calls ResolveOxid + RemQI to discover `(host, port,
/// service_ipid)`, then calls `RegisterEngine2`. The Rust port
/// requires the caller to pre-resolve those because COM activation
/// is not yet wired (followup F12); the call sequence after that is
/// identical.
///
/// On success: a `RegisterEngine2` round-trip has completed and the
/// LMX server has acknowledged the engine registration. The
/// `recovery` argument is validated but not yet consumed (wave 2
/// reads it; wave 1 records it for later).
///
/// # Errors
/// - [`Error::Configuration`] if `recovery.validate()` rejects.
/// - [`Error::Io`] / transport errors from the TCP / NTLM bind.
/// - [`Error::Connection`] if `RegisterEngine2` returned a non-zero
/// HRESULT.
pub async fn connect_nmx(
addr: SocketAddr,
options: SessionOptions,
ntlm: NtlmClientContext,
service_ipid: Guid,
resolver: Arc<dyn Resolver>,
recovery: RecoveryPolicy,
) -> Result<Self, Error> {
recovery.validate()?;
let mut nmx = NmxClient::connect(addr, service_ipid, ntlm)
.await
.map_err(map_nmx)?;
// RegisterEngine2 with a NULL callback for now — the callback
// exporter wiring lands in wave 2. Mirrors cs:163-175 modulo the
// callback param.
let hr = nmx
.register_engine_2_without_callback(
options.local_engine_id,
&options.engine_name,
options.partner_version,
)
.await
.map_err(map_nmx)?;
if hr != 0 {
return Err(Error::Connection(ConnectionError::EngineNotRegistered));
}
// Optional heartbeat-interval setup (cs:165-167). Mirrored as a
// post-register call when the option is `Some`.
if let Some(ticks) = options.heartbeat_ticks_per_beat {
let hr = nmx
.set_heartbeat_send_interval(ticks, options.heartbeat_max_missed_ticks)
.await
.map_err(map_nmx)?;
if hr != 0 {
// Heartbeat mis-configuration is a connection-config issue
// rather than a transport failure.
return Err(Error::Configuration(ConfigError::InvalidArgument {
detail: format!("SetHeartbeatSendInterval returned HRESULT 0x{hr:08x}"),
}));
}
}
Ok(Self {
inner: Arc::new(SessionInner {
options,
resolver,
nmx: Mutex::new(nmx),
connected: std::sync::atomic::AtomicBool::new(true),
}),
})
}
/// Write a value to a tag. Mirrors `MxNativeSession.WriteAsync`
/// (`cs:165-185`) — resolves the tag through the configured
/// `Resolver`, then delegates to `NmxClient::write`.
///
/// `value` is a typed [`WriteValue`] (re-exported from `mxaccess-nmx`).
/// Use [`GalaxyTagMetadata::resolve_write_kind`] to pre-flight which
/// variant the tag's `(mx_data_type, is_array)` accepts.
///
/// # Errors
/// - [`Error::Connection`] if the session was already shut down.
/// - [`Error::Configuration`] if the resolver rejects `reference`.
/// - [`Error::Io`] / transport errors from the underlying RPC.
/// - [`Error::Status`]-shaped error if the LMX server returned a
/// non-zero application HRESULT.
pub async fn write_value(&self, reference: &str, value: WriteValue) -> Result<(), Error> {
self.ensure_connected()?;
let inner = self.inner.clone();
let metadata = inner
.resolver
.resolve(reference)
.await
.map_err(map_resolver)?;
let opts = &inner.options;
let mut nmx = inner.nmx.lock().await;
let hr = nmx
.write(
opts.local_engine_id,
&metadata,
&value,
/* write_index */ 1,
/* client_token */ 0,
opts.galaxy_id,
/* source_galaxy_id */ i32::from(opts.galaxy_id),
opts.source_platform_id,
)
.await
.map_err(map_nmx)?;
ensure_hresult_ok(hr)?;
Ok(())
}
/// Pre-resolve the wire kind a tag expects without dispatching a
/// write. Convenience wrapper that pulls the metadata through the
/// configured resolver and delegates to
/// [`GalaxyTagMetadata::resolve_write_kind`]. Useful when the
/// caller wants to choose the right [`WriteValue`] variant before
/// constructing one.
///
/// # Errors
/// As for [`Self::write_value`] (resolver / config errors); plus
/// [`Error::Configuration`] when the metadata's `(mx_data_type,
/// is_array)` has no LMX wire encoding.
pub async fn resolve_write_kind(
&self,
reference: &str,
) -> Result<mxaccess_codec::MxValueKind, Error> {
self.ensure_connected()?;
let inner = self.inner.clone();
let metadata = inner
.resolver
.resolve(reference)
.await
.map_err(map_resolver)?;
metadata.resolve_write_kind().map_err(|e| {
Error::Configuration(ConfigError::InvalidArgument {
detail: e.to_string(),
})
})
}
/// Resolve a tag without dispatching any RPC. Primarily for
/// callers that need the metadata directly (e.g. for browse UIs).
///
/// # Errors
/// As for [`Self::write_value`] (resolver / config errors).
pub async fn resolve_tag(&self, reference: &str) -> Result<GalaxyTagMetadata, Error> {
self.ensure_connected()?;
let inner = self.inner.clone();
inner
.resolver
.resolve(reference)
.await
.map_err(map_resolver)
}
/// Orderly shutdown — calls `UnregisterEngine` then drops the inner
/// transport. Mirrors `MxNativeSession.Dispose` (`cs:468-482`)
/// minus the COM-side `Marshal.ReleaseComObject` (no .NET COM in
/// the Rust port).
///
/// Idempotent; second and subsequent calls return `Ok(())` without
/// re-issuing the unregister.
///
/// # Errors
/// - [`Error::Io`] / transport errors from the unregister round-trip.
/// - [`Error::Connection`] if the unregister HRESULT was non-zero.
pub async fn shutdown_nmx(self) -> Result<(), Error> {
if !self
.inner
.connected
.swap(false, std::sync::atomic::Ordering::AcqRel)
{
return Ok(());
}
let mut nmx = self.inner.nmx.lock().await;
let hr = nmx
.unregister_engine(self.inner.options.local_engine_id)
.await
.map_err(map_nmx)?;
if hr != 0 {
return Err(Error::Connection(ConnectionError::EngineNotRegistered));
}
Ok(())
}
fn ensure_connected(&self) -> Result<(), Error> {
if self
.inner
.connected
.load(std::sync::atomic::Ordering::Acquire)
{
Ok(())
} else {
Err(Error::Connection(ConnectionError::EngineNotRegistered))
}
}
}
// ---------------------------------------------------------------------------
// Error mapping
// ---------------------------------------------------------------------------
fn map_nmx(err: NmxClientError) -> Error {
match err {
NmxClientError::Transport(t) => map_transport(t),
NmxClientError::NonZeroHresult { operation, hresult } => {
Error::Configuration(ConfigError::InvalidArgument {
detail: format!("{operation}: HRESULT 0x{hresult:08x}"),
})
}
NmxClientError::EmptyTransferDataBody => {
Error::Configuration(ConfigError::InvalidArgument {
detail: "TransferData body cannot be empty".to_string(),
})
}
NmxClientError::Codec(e) => Error::Protocol(crate::ProtocolError::Decode {
offset: 0,
reason: "codec",
buffer_len: 0,
})
.also_log(format!("codec error: {e}")),
NmxClientError::UnsupportedDataType(e) => {
Error::Configuration(ConfigError::InvalidArgument {
detail: e.to_string(),
})
}
// `NmxClientError` is `#[non_exhaustive]`; cover future variants
// with a generic config-error branch so a future codec variant
// doesn't silently break this map.
other => Error::Configuration(ConfigError::InvalidArgument {
detail: format!("nmx client: {other}"),
}),
}
}
fn map_transport(err: TransportError) -> Error {
match err {
TransportError::Io(io) => Error::Io(io),
TransportError::Fault { status } => Error::Status {
success: -1,
category: mxaccess_codec::MxStatusCategory::Unknown,
detected_by: mxaccess_codec::MxStatusSource::Unknown,
detail: i16::try_from(status & 0xFFFF).unwrap_or(0),
},
// `TransportError` is `#[non_exhaustive]` — same fall-through
// rationale as `map_nmx`'s catch-all.
other => Error::Configuration(ConfigError::InvalidArgument {
detail: format!("transport: {other}"),
}),
}
}
fn map_resolver(err: ResolverError) -> Error {
match err {
ResolverError::InvalidTagReference(parse) => {
Error::Configuration(ConfigError::InvalidArgument {
detail: format!("invalid tag reference: {parse}"),
})
}
ResolverError::NotFound { tag_reference } => Error::Configuration(ConfigError::Galaxy {
reason: format!("tag not found: {tag_reference}"),
}),
ResolverError::Backend { message } => {
Error::Configuration(ConfigError::Galaxy { reason: message })
}
// `ResolverError` is `#[non_exhaustive]`.
other => Error::Configuration(ConfigError::Galaxy {
reason: format!("resolver: {other}"),
}),
}
}
fn ensure_hresult_ok(hr: i32) -> Result<(), Error> {
if hr == 0 {
Ok(())
} else {
Err(Error::Configuration(ConfigError::InvalidArgument {
detail: format!("LMX returned application HRESULT 0x{hr:08x}"),
}))
}
}
// Tiny extension to attach a sidecar log message; placeholder for the
// proper `tracing::warn!` integration in wave 2.
trait AlsoLog {
fn also_log(self, msg: String) -> Self;
}
impl AlsoLog for Error {
fn also_log(self, _msg: String) -> Self {
// wave 2: emit through `tracing` — for now the message is
// discarded.
self
}
}
#[cfg(test)]
#[allow(
clippy::unwrap_used,
clippy::expect_used,
clippy::indexing_slicing,
clippy::panic
)]
mod tests {
use super::*;
use async_trait::async_trait;
use mxaccess_codec::{NmxTransferEnvelope, NmxTransferMessageKind};
use mxaccess_galaxy::{Resolver, ResolverError};
use mxaccess_rpc::nmx_service2_messages as svc;
use mxaccess_rpc::orpc::OrpcThat;
use mxaccess_rpc::pdu::{PacketType, PduHeader, ResponsePdu};
use mxaccess_rpc::transport::DceRpcTcpClient;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Mutex as StdMutex;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
// The test resolver re-uses the InMemoryResolver pattern from
// mxaccess-galaxy's resolver tests but exposes the trait at the
// mxaccess crate boundary so we can wire it through Session.
struct StaticResolver {
rows: HashMap<String, GalaxyTagMetadata>,
calls: StdMutex<Vec<String>>,
}
impl StaticResolver {
fn new(refs: &[(&str, GalaxyTagMetadata)]) -> Self {
let mut rows = HashMap::new();
for (k, v) in refs {
rows.insert((*k).to_string(), v.clone());
}
Self {
rows,
calls: StdMutex::new(Vec::new()),
}
}
}
#[async_trait]
impl Resolver for StaticResolver {
async fn resolve(&self, tag_reference: &str) -> Result<GalaxyTagMetadata, ResolverError> {
self.calls.lock().unwrap().push(tag_reference.to_string());
self.rows
.get(tag_reference)
.cloned()
.ok_or_else(|| ResolverError::NotFound {
tag_reference: tag_reference.to_string(),
})
}
}
fn sample_metadata() -> GalaxyTagMetadata {
GalaxyTagMetadata {
object_tag_name: "TestObj".to_string(),
attribute_name: "TestInt".to_string(),
primitive_name: None,
platform_id: 5,
engine_id: 7,
object_id: 42,
primitive_id: -1,
attribute_id: 99,
property_id: 10,
mx_data_type: 2, // Integer
is_array: false,
security_classification: 0,
attribute_source: "dynamic".to_string(),
}
}
fn local_addr() -> SocketAddr {
"127.0.0.1:0".parse().unwrap()
}
/// Hand-rolled DCE/RPC server that drains a Bind, then services
/// `responses.len()` Request PDUs by replying with `OrpcThat + i32`.
/// Same shape as the mxaccess-nmx test fixture.
async fn unauthenticated_server(
responses: Vec<(i32, Vec<u8>)>,
) -> (SocketAddr, tokio::task::JoinHandle<()>) {
let listener = TcpListener::bind(local_addr()).await.unwrap();
let addr = listener.local_addr().unwrap();
let handle = tokio::spawn(async move {
let (mut sock, _) = listener.accept().await.unwrap();
let mut hdr = [0u8; 16];
sock.read_exact(&mut hdr).await.unwrap();
let bind_h = PduHeader::decode(&hdr).unwrap();
let mut body = vec![0u8; bind_h.fragment_length as usize - 16];
sock.read_exact(&mut body).await.unwrap();
let resp_h = PduHeader {
version: 5,
version_minor: 0,
packet_type: PacketType::BindAck,
packet_flags: 0x03,
data_representation: 0x10,
fragment_length: 16,
auth_length: 0,
call_id: bind_h.call_id,
};
let mut out = [0u8; 16];
resp_h.encode(&mut out).unwrap();
sock.write_all(&out).await.unwrap();
for (custom_hresult, extra_payload) in responses {
sock.read_exact(&mut hdr).await.unwrap();
let req_h = PduHeader::decode(&hdr).unwrap();
let mut body = vec![0u8; req_h.fragment_length as usize - 16];
sock.read_exact(&mut body).await.unwrap();
let mut stub = Vec::new();
stub.extend_from_slice(&OrpcThat::default().encode());
stub.extend_from_slice(&custom_hresult.to_le_bytes());
stub.extend_from_slice(&extra_payload);
let response = ResponsePdu {
header: PduHeader {
version: 5,
version_minor: 0,
packet_type: PacketType::Response,
packet_flags: 0x03,
data_representation: 0x10,
fragment_length: 0,
auth_length: 0,
call_id: req_h.call_id,
},
allocation_hint: stub.len() as u32,
context_id: 0,
cancel_count: 0,
reserved23: 0,
stub_data: stub,
};
let bytes = response.encode();
sock.write_all(&bytes).await.unwrap();
}
});
(addr, handle)
}
/// Build a Session by going through the unauthenticated bind path
/// (test-only — production `connect_nmx` would do NTLM).
async fn connect_test_session(
addr: SocketAddr,
resolver: Arc<dyn Resolver>,
) -> Result<Session, Error> {
// We can't easily exercise the full NTLM path against a hand-rolled
// server; instead, build a NmxClient via from_bound_transport and
// wire it into Session manually for the test. This bypasses
// Session::connect_nmx but validates write/shutdown.
let mut transport = DceRpcTcpClient::connect(addr).await.unwrap();
transport.bind(svc::INTERFACE_ID, 0, 0).await.unwrap();
let nmx = NmxClient::from_bound_transport(transport, Guid::new([0xCC; 16]));
Ok(Session {
inner: Arc::new(SessionInner {
options: SessionOptions::default(),
resolver,
nmx: Mutex::new(nmx),
connected: std::sync::atomic::AtomicBool::new(true),
}),
})
}
#[tokio::test]
async fn write_value_round_trip_via_resolver() {
// Server returns HRESULT 0 for the one TransferData call.
let (addr, handle) = unauthenticated_server(vec![(0, Vec::new())]).await;
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[(
"TestObj.TestInt",
sample_metadata(),
)]));
let session = connect_test_session(addr, resolver).await.unwrap();
session
.write_value("TestObj.TestInt", WriteValue::Int32(42))
.await
.unwrap();
handle.await.unwrap();
}
#[tokio::test]
async fn write_value_propagates_resolver_not_found() {
let (addr, handle) = unauthenticated_server(Vec::new()).await;
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[]));
let session = connect_test_session(addr, resolver).await.unwrap();
let err = session
.write_value("Nope.NoTag", WriteValue::Int32(0))
.await
.unwrap_err();
match err {
Error::Configuration(ConfigError::Galaxy { reason }) => {
assert!(reason.contains("tag not found"));
}
other => panic!("expected Galaxy not-found, got {other:?}"),
}
handle.abort();
}
#[tokio::test]
async fn write_value_propagates_non_zero_hresult_as_config() {
let (addr, handle) = unauthenticated_server(vec![(0x4242, Vec::new())]).await;
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[(
"TestObj.TestInt",
sample_metadata(),
)]));
let session = connect_test_session(addr, resolver).await.unwrap();
let err = session
.write_value("TestObj.TestInt", WriteValue::Int32(0))
.await
.unwrap_err();
match err {
Error::Configuration(ConfigError::InvalidArgument { detail }) => {
assert!(detail.contains("0x00004242"));
}
other => panic!("expected InvalidArgument with HRESULT, got {other:?}"),
}
handle.await.unwrap();
}
#[tokio::test]
async fn shutdown_after_no_ops_calls_unregister_then_idempotent() {
let (addr, handle) = unauthenticated_server(vec![(0, Vec::new())]).await;
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[]));
let session = connect_test_session(addr, resolver).await.unwrap();
let cloned = session.clone();
session.shutdown_nmx().await.unwrap();
// Second shutdown is a no-op (idempotent).
cloned.shutdown_nmx().await.unwrap();
handle.await.unwrap();
}
#[tokio::test]
async fn write_after_shutdown_returns_engine_not_registered() {
let (addr, handle) = unauthenticated_server(vec![(0, Vec::new())]).await;
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[(
"TestObj.TestInt",
sample_metadata(),
)]));
let session = connect_test_session(addr, resolver).await.unwrap();
let cloned = session.clone();
session.shutdown_nmx().await.unwrap();
let err = cloned
.write_value("TestObj.TestInt", WriteValue::Int32(0))
.await
.unwrap_err();
assert!(matches!(
err,
Error::Connection(ConnectionError::EngineNotRegistered)
));
handle.await.unwrap();
}
#[tokio::test]
async fn resolve_tag_returns_metadata_without_rpc() {
// No server-side call needed — just resolver lookup.
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[(
"TestObj.TestInt",
sample_metadata(),
)]));
// Spin a server but don't expect any traffic.
let (addr, handle) = unauthenticated_server(Vec::new()).await;
let session = connect_test_session(addr, resolver).await.unwrap();
let meta = session.resolve_tag("TestObj.TestInt").await.unwrap();
assert_eq!(meta.attribute_id, 99);
handle.abort();
}
#[tokio::test]
async fn resolve_write_kind_returns_int32_for_integer_scalar() {
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[(
"TestObj.TestInt",
sample_metadata(),
)]));
let (addr, handle) = unauthenticated_server(Vec::new()).await;
let session = connect_test_session(addr, resolver).await.unwrap();
let kind = session.resolve_write_kind("TestObj.TestInt").await.unwrap();
assert_eq!(kind, mxaccess_codec::MxValueKind::Int32);
handle.abort();
}
#[test]
fn envelope_kind_constants_used_by_session_match_codec_constants() {
// Sanity check that the Session impl uses the same constants
// mxaccess-codec exports — anchors any future codec rename.
assert_eq!(NmxTransferMessageKind::Write as u8, 3);
assert_eq!(NmxTransferEnvelope::HEADER_LEN, 46);
}
}