diff --git a/rust/Cargo.lock b/rust/Cargo.lock index b0fd6a5..2875462 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -214,8 +214,14 @@ dependencies = [ name = "mxaccess" version = "0.0.0" dependencies = [ + "async-trait", "mxaccess-codec", + "mxaccess-galaxy", + "mxaccess-nmx", + "mxaccess-rpc", "thiserror", + "tokio", + "tracing", ] [[package]] @@ -272,6 +278,7 @@ version = "0.0.0" dependencies = [ "mxaccess-callback", "mxaccess-codec", + "mxaccess-galaxy", "mxaccess-rpc", "rand", "thiserror", diff --git a/rust/crates/mxaccess-nmx/src/lib.rs b/rust/crates/mxaccess-nmx/src/lib.rs index 0474f76..cc878de 100644 --- a/rust/crates/mxaccess-nmx/src/lib.rs +++ b/rust/crates/mxaccess-nmx/src/lib.rs @@ -24,4 +24,4 @@ pub mod client; -pub use client::{NmxClient, NmxClientError}; +pub use client::{NmxClient, NmxClientError, WriteValue}; diff --git a/rust/crates/mxaccess/Cargo.toml b/rust/crates/mxaccess/Cargo.toml index 1974dfd..cfd6d5d 100644 --- a/rust/crates/mxaccess/Cargo.toml +++ b/rust/crates/mxaccess/Cargo.toml @@ -9,8 +9,16 @@ rust-version.workspace = true authors.workspace = true [dependencies] -mxaccess-codec = { path = "../mxaccess-codec" } -thiserror = { workspace = true } +mxaccess-codec = { path = "../mxaccess-codec" } +mxaccess-galaxy = { path = "../mxaccess-galaxy" } +mxaccess-nmx = { path = "../mxaccess-nmx" } +mxaccess-rpc = { path = "../mxaccess-rpc" } +thiserror = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } + +[dev-dependencies] +async-trait = { workspace = true } [features] default = [] diff --git a/rust/crates/mxaccess/src/lib.rs b/rust/crates/mxaccess/src/lib.rs index 50f624b..2ab81a1 100644 --- a/rust/crates/mxaccess/src/lib.rs +++ b/rust/crates/mxaccess/src/lib.rs @@ -21,17 +21,19 @@ pub use mxaccess_codec::{ // ---- Public types -------------------------------------------------------- +pub mod session; + +pub use mxaccess_galaxy::{GalaxyTagMetadata, Resolver, ResolverError}; +pub use mxaccess_nmx::WriteValue; + /// Async session façade. Cheap clones share the inner state; drop of the last /// clone fires `UnregisterEngine` best-effort. For deterministic shutdown, /// call `Session::shutdown(timeout).await`. #[derive(Debug, Clone)] pub struct Session { - _inner: Arc, + pub(crate) inner: Arc, } -#[derive(Debug)] -struct SessionInner; - /// Stream of `DataChange` items. Drop sends `UnAdvise` via the long-lived /// connection task (no `tokio::spawn` from `Drop`). #[derive(Debug)] diff --git a/rust/crates/mxaccess/src/session.rs b/rust/crates/mxaccess/src/session.rs new file mode 100644 index 0000000..27a4c2a --- /dev/null +++ b/rust/crates/mxaccess/src/session.rs @@ -0,0 +1,657 @@ +//! M4 wave 1 main — `Session` over the NMX transport. +//! +//! Direct port of the connect / write / shutdown slice of +//! `src/MxNativeClient/MxNativeSession.cs`. Wave 1 deliberately ships +//! a minimal surface: `connect_nmx` + `write` + `shutdown`. Read, +//! subscribe, recovery, and the long-lived connection task land in +//! later waves — see `design/60-roadmap.md` M4 wave 2. +//! +//! ## Architecture (wave 1) +//! +//! Session holds an `Arc` so the public type is cheaply +//! cloneable. The inner state wraps a single `NmxClient` behind a +//! [`tokio::sync::Mutex`]; all RPC operations serialize on that mutex. +//! This is intentionally simple: wave 2 replaces the mutex with a +//! long-lived connection task driven by an `mpsc::channel`, which +//! supports overlapping operations + clean shutdown without the +//! "spawn-from-Drop" hazard tracked at `design/70-risks-and-open-questions.md` +//! R15. +//! +//! ## What's deliberately NOT here (yet) +//! +//! - Recovery loop / `RecoveryEvent` emission (wave 2). +//! - Callback exporter wiring + `Subscription` stream (wave 2). +//! - `read` (read-as-subscribe pattern from `MxNativeSession.ReadAsync` +//! `cs:312-359`) — needs the callback exporter. +//! - Auto-resolving COM activation (followup F12). + +use std::sync::Arc; + +use mxaccess_galaxy::{GalaxyTagMetadata, Resolver, ResolverError}; +use mxaccess_nmx::{NmxClient, NmxClientError, WriteValue}; +use mxaccess_rpc::guid::Guid; +use mxaccess_rpc::ntlm::NtlmClientContext; +use mxaccess_rpc::transport::TransportError; +use std::net::SocketAddr; +use tokio::sync::Mutex; + +use crate::{ConfigError, ConnectionError, Error, RecoveryPolicy, Session, SessionOptions}; + +/// Inner state of [`Session`] when connected over NMX. Held inside the +/// public type's `Arc` so the public clone surface stays +/// cheap. +/// +/// Manual `Debug` impl below: neither `dyn Resolver` nor `NmxClient` +/// implement `Debug`, so a derive would fail. +pub struct SessionInner { + pub(crate) options: SessionOptions, + pub(crate) resolver: Arc, + pub(crate) nmx: Mutex, + /// `false` after [`Session::shutdown`] has run successfully. Subsequent + /// operations short-circuit with [`Error::Connection`]. + pub(crate) connected: std::sync::atomic::AtomicBool, +} + +impl std::fmt::Debug for SessionInner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SessionInner") + .field("options", &self.options) + .field( + "connected", + &self.connected.load(std::sync::atomic::Ordering::Acquire), + ) + .finish_non_exhaustive() + } +} + +impl Session { + /// Open a session over the NMX transport. Mirrors the wire-side of + /// `MxNativeSession.Open` (`MxNativeSession.cs:127-147`) — `Open` + /// itself is .NET-side: COM-activates `NmxSvc.NmxService`, marshals + /// an OBJREF, calls ResolveOxid + RemQI to discover `(host, port, + /// service_ipid)`, then calls `RegisterEngine2`. The Rust port + /// requires the caller to pre-resolve those because COM activation + /// is not yet wired (followup F12); the call sequence after that is + /// identical. + /// + /// On success: a `RegisterEngine2` round-trip has completed and the + /// LMX server has acknowledged the engine registration. The + /// `recovery` argument is validated but not yet consumed (wave 2 + /// reads it; wave 1 records it for later). + /// + /// # Errors + /// - [`Error::Configuration`] if `recovery.validate()` rejects. + /// - [`Error::Io`] / transport errors from the TCP / NTLM bind. + /// - [`Error::Connection`] if `RegisterEngine2` returned a non-zero + /// HRESULT. + pub async fn connect_nmx( + addr: SocketAddr, + options: SessionOptions, + ntlm: NtlmClientContext, + service_ipid: Guid, + resolver: Arc, + recovery: RecoveryPolicy, + ) -> Result { + recovery.validate()?; + + let mut nmx = NmxClient::connect(addr, service_ipid, ntlm) + .await + .map_err(map_nmx)?; + + // RegisterEngine2 with a NULL callback for now — the callback + // exporter wiring lands in wave 2. Mirrors cs:163-175 modulo the + // callback param. + let hr = nmx + .register_engine_2_without_callback( + options.local_engine_id, + &options.engine_name, + options.partner_version, + ) + .await + .map_err(map_nmx)?; + if hr != 0 { + return Err(Error::Connection(ConnectionError::EngineNotRegistered)); + } + + // Optional heartbeat-interval setup (cs:165-167). Mirrored as a + // post-register call when the option is `Some`. + if let Some(ticks) = options.heartbeat_ticks_per_beat { + let hr = nmx + .set_heartbeat_send_interval(ticks, options.heartbeat_max_missed_ticks) + .await + .map_err(map_nmx)?; + if hr != 0 { + // Heartbeat mis-configuration is a connection-config issue + // rather than a transport failure. + return Err(Error::Configuration(ConfigError::InvalidArgument { + detail: format!("SetHeartbeatSendInterval returned HRESULT 0x{hr:08x}"), + })); + } + } + + Ok(Self { + inner: Arc::new(SessionInner { + options, + resolver, + nmx: Mutex::new(nmx), + connected: std::sync::atomic::AtomicBool::new(true), + }), + }) + } + + /// Write a value to a tag. Mirrors `MxNativeSession.WriteAsync` + /// (`cs:165-185`) — resolves the tag through the configured + /// `Resolver`, then delegates to `NmxClient::write`. + /// + /// `value` is a typed [`WriteValue`] (re-exported from `mxaccess-nmx`). + /// Use [`GalaxyTagMetadata::resolve_write_kind`] to pre-flight which + /// variant the tag's `(mx_data_type, is_array)` accepts. + /// + /// # Errors + /// - [`Error::Connection`] if the session was already shut down. + /// - [`Error::Configuration`] if the resolver rejects `reference`. + /// - [`Error::Io`] / transport errors from the underlying RPC. + /// - [`Error::Status`]-shaped error if the LMX server returned a + /// non-zero application HRESULT. + pub async fn write_value(&self, reference: &str, value: WriteValue) -> Result<(), Error> { + self.ensure_connected()?; + let inner = self.inner.clone(); + let metadata = inner + .resolver + .resolve(reference) + .await + .map_err(map_resolver)?; + let opts = &inner.options; + let mut nmx = inner.nmx.lock().await; + let hr = nmx + .write( + opts.local_engine_id, + &metadata, + &value, + /* write_index */ 1, + /* client_token */ 0, + opts.galaxy_id, + /* source_galaxy_id */ i32::from(opts.galaxy_id), + opts.source_platform_id, + ) + .await + .map_err(map_nmx)?; + ensure_hresult_ok(hr)?; + Ok(()) + } + + /// Pre-resolve the wire kind a tag expects without dispatching a + /// write. Convenience wrapper that pulls the metadata through the + /// configured resolver and delegates to + /// [`GalaxyTagMetadata::resolve_write_kind`]. Useful when the + /// caller wants to choose the right [`WriteValue`] variant before + /// constructing one. + /// + /// # Errors + /// As for [`Self::write_value`] (resolver / config errors); plus + /// [`Error::Configuration`] when the metadata's `(mx_data_type, + /// is_array)` has no LMX wire encoding. + pub async fn resolve_write_kind( + &self, + reference: &str, + ) -> Result { + self.ensure_connected()?; + let inner = self.inner.clone(); + let metadata = inner + .resolver + .resolve(reference) + .await + .map_err(map_resolver)?; + metadata.resolve_write_kind().map_err(|e| { + Error::Configuration(ConfigError::InvalidArgument { + detail: e.to_string(), + }) + }) + } + + /// Resolve a tag without dispatching any RPC. Primarily for + /// callers that need the metadata directly (e.g. for browse UIs). + /// + /// # Errors + /// As for [`Self::write_value`] (resolver / config errors). + pub async fn resolve_tag(&self, reference: &str) -> Result { + self.ensure_connected()?; + let inner = self.inner.clone(); + inner + .resolver + .resolve(reference) + .await + .map_err(map_resolver) + } + + /// Orderly shutdown — calls `UnregisterEngine` then drops the inner + /// transport. Mirrors `MxNativeSession.Dispose` (`cs:468-482`) + /// minus the COM-side `Marshal.ReleaseComObject` (no .NET COM in + /// the Rust port). + /// + /// Idempotent; second and subsequent calls return `Ok(())` without + /// re-issuing the unregister. + /// + /// # Errors + /// - [`Error::Io`] / transport errors from the unregister round-trip. + /// - [`Error::Connection`] if the unregister HRESULT was non-zero. + pub async fn shutdown_nmx(self) -> Result<(), Error> { + if !self + .inner + .connected + .swap(false, std::sync::atomic::Ordering::AcqRel) + { + return Ok(()); + } + + let mut nmx = self.inner.nmx.lock().await; + let hr = nmx + .unregister_engine(self.inner.options.local_engine_id) + .await + .map_err(map_nmx)?; + if hr != 0 { + return Err(Error::Connection(ConnectionError::EngineNotRegistered)); + } + Ok(()) + } + + fn ensure_connected(&self) -> Result<(), Error> { + if self + .inner + .connected + .load(std::sync::atomic::Ordering::Acquire) + { + Ok(()) + } else { + Err(Error::Connection(ConnectionError::EngineNotRegistered)) + } + } +} + +// --------------------------------------------------------------------------- +// Error mapping +// --------------------------------------------------------------------------- + +fn map_nmx(err: NmxClientError) -> Error { + match err { + NmxClientError::Transport(t) => map_transport(t), + NmxClientError::NonZeroHresult { operation, hresult } => { + Error::Configuration(ConfigError::InvalidArgument { + detail: format!("{operation}: HRESULT 0x{hresult:08x}"), + }) + } + NmxClientError::EmptyTransferDataBody => { + Error::Configuration(ConfigError::InvalidArgument { + detail: "TransferData body cannot be empty".to_string(), + }) + } + NmxClientError::Codec(e) => Error::Protocol(crate::ProtocolError::Decode { + offset: 0, + reason: "codec", + buffer_len: 0, + }) + .also_log(format!("codec error: {e}")), + NmxClientError::UnsupportedDataType(e) => { + Error::Configuration(ConfigError::InvalidArgument { + detail: e.to_string(), + }) + } + // `NmxClientError` is `#[non_exhaustive]`; cover future variants + // with a generic config-error branch so a future codec variant + // doesn't silently break this map. + other => Error::Configuration(ConfigError::InvalidArgument { + detail: format!("nmx client: {other}"), + }), + } +} + +fn map_transport(err: TransportError) -> Error { + match err { + TransportError::Io(io) => Error::Io(io), + TransportError::Fault { status } => Error::Status { + success: -1, + category: mxaccess_codec::MxStatusCategory::Unknown, + detected_by: mxaccess_codec::MxStatusSource::Unknown, + detail: i16::try_from(status & 0xFFFF).unwrap_or(0), + }, + // `TransportError` is `#[non_exhaustive]` — same fall-through + // rationale as `map_nmx`'s catch-all. + other => Error::Configuration(ConfigError::InvalidArgument { + detail: format!("transport: {other}"), + }), + } +} + +fn map_resolver(err: ResolverError) -> Error { + match err { + ResolverError::InvalidTagReference(parse) => { + Error::Configuration(ConfigError::InvalidArgument { + detail: format!("invalid tag reference: {parse}"), + }) + } + ResolverError::NotFound { tag_reference } => Error::Configuration(ConfigError::Galaxy { + reason: format!("tag not found: {tag_reference}"), + }), + ResolverError::Backend { message } => { + Error::Configuration(ConfigError::Galaxy { reason: message }) + } + // `ResolverError` is `#[non_exhaustive]`. + other => Error::Configuration(ConfigError::Galaxy { + reason: format!("resolver: {other}"), + }), + } +} + +fn ensure_hresult_ok(hr: i32) -> Result<(), Error> { + if hr == 0 { + Ok(()) + } else { + Err(Error::Configuration(ConfigError::InvalidArgument { + detail: format!("LMX returned application HRESULT 0x{hr:08x}"), + })) + } +} + +// Tiny extension to attach a sidecar log message; placeholder for the +// proper `tracing::warn!` integration in wave 2. +trait AlsoLog { + fn also_log(self, msg: String) -> Self; +} + +impl AlsoLog for Error { + fn also_log(self, _msg: String) -> Self { + // wave 2: emit through `tracing` — for now the message is + // discarded. + self + } +} + +#[cfg(test)] +#[allow( + clippy::unwrap_used, + clippy::expect_used, + clippy::indexing_slicing, + clippy::panic +)] +mod tests { + use super::*; + use async_trait::async_trait; + use mxaccess_codec::{NmxTransferEnvelope, NmxTransferMessageKind}; + use mxaccess_galaxy::{Resolver, ResolverError}; + use mxaccess_rpc::nmx_service2_messages as svc; + use mxaccess_rpc::orpc::OrpcThat; + use mxaccess_rpc::pdu::{PacketType, PduHeader, ResponsePdu}; + use mxaccess_rpc::transport::DceRpcTcpClient; + use std::collections::HashMap; + use std::net::SocketAddr; + use std::sync::Mutex as StdMutex; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + use tokio::net::TcpListener; + + // The test resolver re-uses the InMemoryResolver pattern from + // mxaccess-galaxy's resolver tests but exposes the trait at the + // mxaccess crate boundary so we can wire it through Session. + struct StaticResolver { + rows: HashMap, + calls: StdMutex>, + } + + impl StaticResolver { + fn new(refs: &[(&str, GalaxyTagMetadata)]) -> Self { + let mut rows = HashMap::new(); + for (k, v) in refs { + rows.insert((*k).to_string(), v.clone()); + } + Self { + rows, + calls: StdMutex::new(Vec::new()), + } + } + } + + #[async_trait] + impl Resolver for StaticResolver { + async fn resolve(&self, tag_reference: &str) -> Result { + self.calls.lock().unwrap().push(tag_reference.to_string()); + self.rows + .get(tag_reference) + .cloned() + .ok_or_else(|| ResolverError::NotFound { + tag_reference: tag_reference.to_string(), + }) + } + } + + fn sample_metadata() -> GalaxyTagMetadata { + GalaxyTagMetadata { + object_tag_name: "TestObj".to_string(), + attribute_name: "TestInt".to_string(), + primitive_name: None, + platform_id: 5, + engine_id: 7, + object_id: 42, + primitive_id: -1, + attribute_id: 99, + property_id: 10, + mx_data_type: 2, // Integer + is_array: false, + security_classification: 0, + attribute_source: "dynamic".to_string(), + } + } + + fn local_addr() -> SocketAddr { + "127.0.0.1:0".parse().unwrap() + } + + /// Hand-rolled DCE/RPC server that drains a Bind, then services + /// `responses.len()` Request PDUs by replying with `OrpcThat + i32`. + /// Same shape as the mxaccess-nmx test fixture. + async fn unauthenticated_server( + responses: Vec<(i32, Vec)>, + ) -> (SocketAddr, tokio::task::JoinHandle<()>) { + let listener = TcpListener::bind(local_addr()).await.unwrap(); + let addr = listener.local_addr().unwrap(); + let handle = tokio::spawn(async move { + let (mut sock, _) = listener.accept().await.unwrap(); + let mut hdr = [0u8; 16]; + sock.read_exact(&mut hdr).await.unwrap(); + let bind_h = PduHeader::decode(&hdr).unwrap(); + let mut body = vec![0u8; bind_h.fragment_length as usize - 16]; + sock.read_exact(&mut body).await.unwrap(); + let resp_h = PduHeader { + version: 5, + version_minor: 0, + packet_type: PacketType::BindAck, + packet_flags: 0x03, + data_representation: 0x10, + fragment_length: 16, + auth_length: 0, + call_id: bind_h.call_id, + }; + let mut out = [0u8; 16]; + resp_h.encode(&mut out).unwrap(); + sock.write_all(&out).await.unwrap(); + + for (custom_hresult, extra_payload) in responses { + sock.read_exact(&mut hdr).await.unwrap(); + let req_h = PduHeader::decode(&hdr).unwrap(); + let mut body = vec![0u8; req_h.fragment_length as usize - 16]; + sock.read_exact(&mut body).await.unwrap(); + + let mut stub = Vec::new(); + stub.extend_from_slice(&OrpcThat::default().encode()); + stub.extend_from_slice(&custom_hresult.to_le_bytes()); + stub.extend_from_slice(&extra_payload); + + let response = ResponsePdu { + header: PduHeader { + version: 5, + version_minor: 0, + packet_type: PacketType::Response, + packet_flags: 0x03, + data_representation: 0x10, + fragment_length: 0, + auth_length: 0, + call_id: req_h.call_id, + }, + allocation_hint: stub.len() as u32, + context_id: 0, + cancel_count: 0, + reserved23: 0, + stub_data: stub, + }; + let bytes = response.encode(); + sock.write_all(&bytes).await.unwrap(); + } + }); + (addr, handle) + } + + /// Build a Session by going through the unauthenticated bind path + /// (test-only — production `connect_nmx` would do NTLM). + async fn connect_test_session( + addr: SocketAddr, + resolver: Arc, + ) -> Result { + // We can't easily exercise the full NTLM path against a hand-rolled + // server; instead, build a NmxClient via from_bound_transport and + // wire it into Session manually for the test. This bypasses + // Session::connect_nmx but validates write/shutdown. + let mut transport = DceRpcTcpClient::connect(addr).await.unwrap(); + transport.bind(svc::INTERFACE_ID, 0, 0).await.unwrap(); + let nmx = NmxClient::from_bound_transport(transport, Guid::new([0xCC; 16])); + Ok(Session { + inner: Arc::new(SessionInner { + options: SessionOptions::default(), + resolver, + nmx: Mutex::new(nmx), + connected: std::sync::atomic::AtomicBool::new(true), + }), + }) + } + + #[tokio::test] + async fn write_value_round_trip_via_resolver() { + // Server returns HRESULT 0 for the one TransferData call. + let (addr, handle) = unauthenticated_server(vec![(0, Vec::new())]).await; + let resolver: Arc = Arc::new(StaticResolver::new(&[( + "TestObj.TestInt", + sample_metadata(), + )])); + let session = connect_test_session(addr, resolver).await.unwrap(); + session + .write_value("TestObj.TestInt", WriteValue::Int32(42)) + .await + .unwrap(); + handle.await.unwrap(); + } + + #[tokio::test] + async fn write_value_propagates_resolver_not_found() { + let (addr, handle) = unauthenticated_server(Vec::new()).await; + let resolver: Arc = Arc::new(StaticResolver::new(&[])); + let session = connect_test_session(addr, resolver).await.unwrap(); + let err = session + .write_value("Nope.NoTag", WriteValue::Int32(0)) + .await + .unwrap_err(); + match err { + Error::Configuration(ConfigError::Galaxy { reason }) => { + assert!(reason.contains("tag not found")); + } + other => panic!("expected Galaxy not-found, got {other:?}"), + } + handle.abort(); + } + + #[tokio::test] + async fn write_value_propagates_non_zero_hresult_as_config() { + let (addr, handle) = unauthenticated_server(vec![(0x4242, Vec::new())]).await; + let resolver: Arc = Arc::new(StaticResolver::new(&[( + "TestObj.TestInt", + sample_metadata(), + )])); + let session = connect_test_session(addr, resolver).await.unwrap(); + let err = session + .write_value("TestObj.TestInt", WriteValue::Int32(0)) + .await + .unwrap_err(); + match err { + Error::Configuration(ConfigError::InvalidArgument { detail }) => { + assert!(detail.contains("0x00004242")); + } + other => panic!("expected InvalidArgument with HRESULT, got {other:?}"), + } + handle.await.unwrap(); + } + + #[tokio::test] + async fn shutdown_after_no_ops_calls_unregister_then_idempotent() { + let (addr, handle) = unauthenticated_server(vec![(0, Vec::new())]).await; + let resolver: Arc = Arc::new(StaticResolver::new(&[])); + let session = connect_test_session(addr, resolver).await.unwrap(); + let cloned = session.clone(); + session.shutdown_nmx().await.unwrap(); + // Second shutdown is a no-op (idempotent). + cloned.shutdown_nmx().await.unwrap(); + handle.await.unwrap(); + } + + #[tokio::test] + async fn write_after_shutdown_returns_engine_not_registered() { + let (addr, handle) = unauthenticated_server(vec![(0, Vec::new())]).await; + let resolver: Arc = Arc::new(StaticResolver::new(&[( + "TestObj.TestInt", + sample_metadata(), + )])); + let session = connect_test_session(addr, resolver).await.unwrap(); + let cloned = session.clone(); + session.shutdown_nmx().await.unwrap(); + let err = cloned + .write_value("TestObj.TestInt", WriteValue::Int32(0)) + .await + .unwrap_err(); + assert!(matches!( + err, + Error::Connection(ConnectionError::EngineNotRegistered) + )); + handle.await.unwrap(); + } + + #[tokio::test] + async fn resolve_tag_returns_metadata_without_rpc() { + // No server-side call needed — just resolver lookup. + let resolver: Arc = Arc::new(StaticResolver::new(&[( + "TestObj.TestInt", + sample_metadata(), + )])); + // Spin a server but don't expect any traffic. + let (addr, handle) = unauthenticated_server(Vec::new()).await; + let session = connect_test_session(addr, resolver).await.unwrap(); + let meta = session.resolve_tag("TestObj.TestInt").await.unwrap(); + assert_eq!(meta.attribute_id, 99); + handle.abort(); + } + + #[tokio::test] + async fn resolve_write_kind_returns_int32_for_integer_scalar() { + let resolver: Arc = Arc::new(StaticResolver::new(&[( + "TestObj.TestInt", + sample_metadata(), + )])); + let (addr, handle) = unauthenticated_server(Vec::new()).await; + let session = connect_test_session(addr, resolver).await.unwrap(); + let kind = session.resolve_write_kind("TestObj.TestInt").await.unwrap(); + assert_eq!(kind, mxaccess_codec::MxValueKind::Int32); + handle.abort(); + } + + #[test] + fn envelope_kind_constants_used_by_session_match_codec_constants() { + // Sanity check that the Session impl uses the same constants + // mxaccess-codec exports — anchors any future codec rename. + assert_eq!(NmxTransferMessageKind::Write as u8, 3); + assert_eq!(NmxTransferEnvelope::HEADER_LEN, 46); + } +}