Files
mxaccess/rust/crates/mxaccess-nmx/src/client.rs
T
Joseph Doherty e79e289743 [F42] cargo doc --workspace --no-deps clean (0 warnings)
Fix all 33 rustdoc warnings across the workspace:

- Unresolved intra-doc links: rewrite [`name`] → either backtick text
  (when not actually a link) or fully-qualified `[Type::method]` /
  `[crate::module::name]` form. Affected: mxaccess-codec
  (asb_variant, item_control, metadata_query, observed_write_template,
  reference_handle, write_message), mxaccess-rpc (pdu), mxaccess-nmx
  (client), mxaccess-asb-nettcp (nmf), mxaccess-callback (exporter),
  mxaccess (asb_session, session, lib).
- Bracket-text being interpreted as link refs (e.g. `body[17]` →
  `` `body[17]` ``).
- Private-item references in public docs (CALLBACK_BROADCAST_CAPACITY,
  recover_connection_core, mxvalue_to_writevalue) reduced to
  backtick-text since they aren't part of the public API.

`RUSTDOCFLAGS="-D warnings" cargo doc --workspace --no-deps` now
exits clean. Workspace 759 tests pass; clippy clean.

Defers `#![warn(missing_docs)]` lint to a future pass — the cleanup
target is the broken-link warnings, which are signal; missing-docs
would surface hundreds of low-priority public-item gaps that are out
of scope for this F-number.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-06 04:39:51 -04:00

1567 lines
58 KiB
Rust

//! `INmxService2` async client.
//!
//! Direct port of the **raw opnum surface** + the **high-level
//! write/advise wrappers** of `src/MxNativeClient/ManagedNmxService2Client.cs`
//! over tokio. Wraps a single [`mxaccess_rpc::transport::DceRpcTcpClient`]
//! connection and exposes:
//!
//! - The 9 raw procedures defined by `INmxService2` (opnums 3..11) —
//! [`NmxClient::get_partner_version`], [`NmxClient::register_engine_2`],
//! [`NmxClient::unregister_engine`], [`NmxClient::connect_engine`],
//! [`NmxClient::add_subscriber_engine`],
//! [`NmxClient::remove_subscriber_engine`],
//! [`NmxClient::set_heartbeat_send_interval`],
//! [`NmxClient::transfer_data`].
//! - The 7 high-level wrappers from `cs:303-466` — [`NmxClient::write`],
//! [`NmxClient::write2`], [`NmxClient::write_secured2`],
//! [`NmxClient::advise_supervisory`],
//! [`NmxClient::send_observed_pre_advise_metadata`],
//! [`NmxClient::register_reference`], [`NmxClient::un_advise`]. Each
//! takes a [`mxaccess_galaxy::GalaxyTagMetadata`] for routing
//! (`platform_id` / `engine_id` / `to_reference_handle`) and a typed
//! [`WriteValue`] re-exported from `mxaccess-codec` for the value
//! payload.
//!
//! ## COM-activated discovery (feature-gated)
//!
//! The auto-resolving `Create()` factory (`cs:30-64`) uses .NET COM
//! activation (`Type.GetTypeFromProgID("NmxSvc.NmxService")`) to discover
//! the service host/port/IPID via three steps:
//! `ComObjRefProvider.MarshalIUnknownObjRef`, then `ResolveOxid`, then
//! `IRemUnknown::RemQueryInterface`. That dance needs `windows-rs` (the COM
//! activation), so its Rust port, `NmxClient::create`, is only compiled on
//! Windows with the `windows-com` feature enabled. Every other build must
//! construct the client via [`NmxClient::connect`] with an externally
//! obtained endpoint and IPID — see `design/followups.md` F12 for the
//! COM-activation follow-up.
#![allow(clippy::indexing_slicing)]
use std::net::SocketAddr;
use mxaccess_codec::{
CodecError, NmxItemControlCommand, NmxItemControlMessage, NmxMetadataQueryMessage,
NmxReferenceRegistrationMessage, NmxTransferEnvelope, NmxTransferMessageKind,
};
use mxaccess_codec::{secured_write, write_message};
use mxaccess_galaxy::{GalaxyTagMetadata, UnsupportedDataType};
use mxaccess_rpc::guid::Guid;
use mxaccess_rpc::nmx_service2_messages as svc;
use mxaccess_rpc::ntlm::NtlmClientContext;
use mxaccess_rpc::orpc::OrpcThis;
use mxaccess_rpc::transport::{DceRpcTcpClient, TransportError};
pub use mxaccess_codec::write_message::WriteValue;
/// Errors raised by [`NmxClient`]. Mirrors `ThrowIfFailed`
/// (`ManagedNmxService2Client.cs:563-574`) and the codec-error pass-through
/// from `CallForHResult`.
///
/// The enum is `#[non_exhaustive]`, so downstream `match` expressions need a
/// wildcard arm.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum NmxClientError {
    /// Transport-layer failure (I/O, codec, NTLM, fault, etc.).
    #[error("transport: {0}")]
    Transport(#[from] TransportError),
    /// Server returned a non-zero application HRESULT in the response
    /// body. Mirrors the `cs:570-573` "application status 0xX" branch and
    /// the `cs:565-568` "negative HRESULT" branch — both unify into one
    /// typed surface here.
    #[error("{operation} returned non-zero HRESULT 0x{hresult:08x}")]
    NonZeroHresult {
        /// Name of the RPC operation that produced the HRESULT (for example
        /// `"GetPartnerVersion"`); used only to build the error message.
        operation: &'static str,
        /// The raw HRESULT taken from the response body; rendered as eight
        /// lowercase hex digits in the error message.
        hresult: i32,
    },
    /// `transfer_data` was called with an empty body (`cs:174-177`).
    #[error("TransferData body cannot be empty")]
    EmptyTransferDataBody,
    /// One of the high-level write/advise wrappers failed to build the
    /// inner NMX body — typically because the supplied
    /// [`mxaccess_galaxy::GalaxyTagMetadata`] has an empty
    /// `object_tag_name` / `attribute_name` (rejected by
    /// `MxReferenceHandle::from_names`) or because a
    /// `WriteValue::*Array` exceeds the `u16::MAX` element-count limit.
    /// Mirrors the `CodecError` propagation paths inside the .NET
    /// reference's `EncodeWriteTransferBody` family
    /// (`ManagedNmxService2Client.cs:186-301`).
    #[error("codec: {0}")]
    Codec(#[from] CodecError),
    /// The metadata's `(mx_data_type, is_array)` pair has no LMX wire
    /// encoding (e.g. arrays of `ElapsedTime`, scalars of
    /// `ReferenceType`). Returned by [`GalaxyTagMetadata::resolve_write_kind`]
    /// helpers when the caller asks for a kind that
    /// [`mxaccess_codec::MxValueKind::for_data_type`] rejects.
    /// Mirrors the `ArgumentOutOfRangeException` paths in the .NET
    /// `GalaxyTagMetadata.ProjectWriteValue` (`cs:62,70`).
    #[error("unsupported data type: {0}")]
    UnsupportedDataType(#[from] UnsupportedDataType),
    /// COM activation / OBJREF marshalling failed during
    /// [`NmxClient::create`] — typically `REGDB_E_CLASSNOTREG` (the AVEVA
    /// install is missing) or `CO_E_SERVER_EXEC_FAILURE` (NmxSvc.exe
    /// failed to launch). Only emitted when the `windows-com` feature is
    /// enabled.
    #[cfg(all(windows, feature = "windows-com"))]
    #[error("NmxSvc COM activation failed: {0}")]
    Activation(#[from] mxaccess_rpc::com_objref_provider::ProviderError),
    /// `ResolveOxid` returned without a usable `ncacn_ip_tcp` binding,
    /// the binding's `host[port]` couldn't be parsed, or `IRemUnknown::RemQueryInterface`
    /// returned a non-zero HRESULT / error code. Mirrors the
    /// `InvalidOperationException` at
    /// `ManagedNmxService2Client.cs:519,545,559`.
    ///
    /// `reason` is a human-readable description of which resolution step
    /// failed; it is interpolated verbatim into the error message.
    #[error("NmxSvc endpoint resolution failed: {reason}")]
    EndpointResolution { reason: String },
}
/// Build the `OrpcThis` header for one outgoing call, stamped with a
/// freshly randomised correlation `Cid`.
///
/// The .NET reference calls `Guid.NewGuid()` at every call site
/// (`ManagedNmxService2Client.cs:70,84,96,108`, etc.); the Rust port
/// likewise re-rolls the id on every invocation rather than caching one.
fn fresh_orpc_this() -> OrpcThis {
    let correlation_id = Guid::new(rand::random());
    OrpcThis::create(correlation_id, None)
}
/// Split a `host[port]` binding string into its host text and numeric port,
/// following the shape `ManagedNmxService2Client` expects (`cs:540-561`).
///
/// The host is everything before the **last** `[`; the port is the decimal
/// text between that `[` and the **last** `]`. Anything after the closing
/// bracket is ignored.
///
/// Used by `NmxClient::create` only — gated on `windows-com`.
///
/// # Errors
/// [`NmxClientError::EndpointResolution`] when either bracket is missing,
/// the host part is empty, the `]` does not come after the `[`, or the port
/// text does not parse as a `u16`.
#[cfg_attr(
    not(all(windows, feature = "windows-com")),
    allow(dead_code)
)]
fn parse_bracketed_host_port(binding: &str) -> Result<(String, u16), NmxClientError> {
    // Every failure path produces the same variant; only the reason differs.
    let fail = |reason: String| NmxClientError::EndpointResolution { reason };

    let open = match binding.rfind('[') {
        Some(idx) => idx,
        None => return Err(fail(format!("binding {binding:?} has no '['"))),
    };
    let close = match binding.rfind(']') {
        Some(idx) => idx,
        None => return Err(fail(format!("binding {binding:?} has no ']'"))),
    };

    // Well-formed means: a non-empty host, and the `]` sits after the `[`.
    let well_formed = open > 0 && close > open;
    if !well_formed {
        return Err(fail(format!("binding {binding:?} has malformed brackets")));
    }

    // Both brackets are ASCII, so these byte offsets are char boundaries.
    let (host, port_text) = (&binding[..open], &binding[open + 1..close]);
    match port_text.parse::<u16>() {
        Ok(port) => Ok((host.to_string(), port)),
        Err(e) => Err(fail(format!(
            "binding {binding:?} port {port_text:?} parse: {e}"
        ))),
    }
}
/// Async `INmxService2` client. Owns one `DceRpcTcpClient` connection that
/// is bound to the `INmxService2` IID, plus the IPID every call is routed
/// through.
///
/// Construct via [`NmxClient::connect`] (manual, deterministic — the caller
/// supplies the address and IPID) or [`NmxClient::from_bound_transport`]
/// (the caller has already performed the bind). On Windows builds with the
/// `windows-com` feature, `NmxClient::create` additionally ports the
/// auto-resolving `Create()` from the .NET reference, which discovers the
/// endpoint and IPID through COM activation.
pub struct NmxClient {
    /// The bound DCE/RPC connection all calls are issued on.
    transport: DceRpcTcpClient,
    /// `service_ipid` is the IPID returned by `IRemUnknown::RemQueryInterface`
    /// against the activated `IUnknown`'s OBJREF. Every `call_bound_object`
    /// stamps this into the `PFC_OBJECT_UUID` slot so the LMX server routes
    /// the call to the right per-engine `INmxService2` instance
    /// (`ManagedNmxService2Client.cs:74,486-488`).
    service_ipid: Guid,
}
impl NmxClient {
/// Open a new client connection. Mirrors the assembly of
/// `ManagedNmxService2Client.cs:41-54` *minus* the COM-activation
/// pre-step that resolves `(host, port, service_ipid)` automatically.
///
/// `service_ipid` must be the `IPID` returned by
/// `IRemUnknown::RemQueryInterface` (`crate::rem_unknown::*`) against
/// the activated `NmxService` IUnknown. Until COM activation lands,
/// callers obtain it via the .NET probe (or by capturing the wire
/// bytes with Frida and pulling the IPID directly).
///
/// # Errors
/// I/O from the TCP connect or NTLM bind round-trip.
pub async fn connect(
addr: SocketAddr,
service_ipid: Guid,
ntlm: NtlmClientContext,
) -> Result<Self, NmxClientError> {
let mut transport = DceRpcTcpClient::connect(addr)
.await
.map_err(TransportError::from)?;
transport
.bind_with_managed_ntlm_packet_integrity(svc::INTERFACE_ID, 0, 0, ntlm)
.await?;
Ok(Self {
transport,
service_ipid,
})
}
/// Auto-resolve `(host, port, service_ipid)` via COM activation +
/// OXID resolution + `IRemUnknown::RemQueryInterface`, then bind to
/// `INmxService2` and return a ready-to-use client.
///
/// Mirrors `ManagedNmxService2Client.Create()` (`cs:30-64`) +
/// `ResolveService` (`cs:491-523`). Only available when the crate
/// is built with the `windows-com` feature on Windows.
///
/// `ntlm_factory` is invoked **three times**, in this order: once for the
/// `ResolveOxid` call against `127.0.0.1:135` (RPCSS endpoint
/// mapper), once for the `IRemUnknown` bind against the discovered
/// NmxSvc endpoint, and once for the final `INmxService2` bind on a
/// fresh transport. Each NTLM context is consumed by its bind; the
/// caller is responsible for producing fresh ones (typically by
/// re-reading credentials from `MX_RPC_*` env vars via
/// [`NtlmClientContext::from_env`]).
///
/// Steps:
///
/// 1. `marshal_activated_iunknown_objref("NmxSvc.NmxService", DifferentMachine)`
///    activates the COM class and emits an OBJREF blob.
/// 2. [`mxaccess_rpc::objref::ComObjRef::parse`] extracts `oxid` +
///    `ipid` (the activated server's `IUnknown` IPID).
/// 3. [`mxaccess_rpc::object_exporter_client::resolve_oxid_with_managed_ntlm_packet_integrity`]
///    against `127.0.0.1:135` returns the server's `(host, port)`
///    bindings + `IRemUnknown` IPID.
/// 4. The first non-security `ncacn_ip_tcp` binding's `host[port]` text is
///    parsed and the host is resolved; the first resolved address is used.
/// 5. A fresh transport binds to `IRemUnknown` and calls
///    `RemQueryInterface(iunknown_ipid, INmxService2)` to obtain the
///    `INmxService2` IPID.
/// 6. A second fresh transport binds to `INmxService2` and is
///    returned wrapped in this client.
///
/// # Errors
///
/// [`NmxClientError::Activation`] for COM activation /
/// `CoMarshalInterface` failures;
/// [`NmxClientError::EndpointResolution`] when the OBJREF cannot be
/// parsed, `ResolveOxid` reports a failure or returns no `ncacn_ip_tcp`
/// binding, the host/port string is malformed or does not resolve, or
/// `RemQueryInterface` returns a non-zero HRESULT / error code;
/// [`NmxClientError::Transport`] for I/O / NTLM failures during
/// any of the three binds.
#[cfg(all(windows, feature = "windows-com"))]
pub async fn create(
    mut ntlm_factory: impl FnMut() -> NtlmClientContext,
) -> Result<Self, NmxClientError> {
    use mxaccess_rpc::com_objref_provider::{
        marshal_activated_iunknown_objref, MarshalContext,
    };
    use mxaccess_rpc::object_exporter::PROTSEQ_NCACN_IP_TCP;
    use mxaccess_rpc::object_exporter_client::{
        resolve_oxid_with_managed_ntlm_packet_integrity, ResolveOxidOutcome,
    };
    use mxaccess_rpc::objref::ComObjRef;
    use mxaccess_rpc::rem_unknown::{
        encode_rem_query_interface_request, parse_rem_query_interface_response,
        IREM_UNKNOWN_IID, REM_QUERY_INTERFACE_OPNUM,
    };

    // Step 1+2: Activate NmxSvc.NmxService and parse OBJREF.
    let blob = marshal_activated_iunknown_objref(
        "NmxSvc.NmxService",
        MarshalContext::DifferentMachine,
    )?;
    let objref = ComObjRef::parse(&blob).map_err(|e| NmxClientError::EndpointResolution {
        reason: format!("OBJREF parse: {e}"),
    })?;

    // Step 3: ResolveOxid against the local RPCSS endpoint mapper
    // (127.0.0.1:135). The address is a compile-time constant, so it is
    // built infallibly instead of being parsed from text at runtime.
    let exporter_addr = SocketAddr::from((std::net::Ipv4Addr::LOCALHOST, 135));
    let outcome = resolve_oxid_with_managed_ntlm_packet_integrity(
        exporter_addr,
        objref.oxid,
        &[PROTSEQ_NCACN_IP_TCP],
        ntlm_factory(),
    )
    .await?;
    let resolved = match outcome {
        ResolveOxidOutcome::Result(r) => r,
        ResolveOxidOutcome::Failure(f) => {
            return Err(NmxClientError::EndpointResolution {
                reason: format!(
                    "ResolveOxid returned failure status 0x{:08X}",
                    f.error_status
                ),
            });
        }
    };
    // A `Result` outcome can still carry a non-zero status; treat that as
    // a failure too.
    if resolved.error_status != 0 {
        return Err(NmxClientError::EndpointResolution {
            reason: format!(
                "ResolveOxid completed with non-zero error_status 0x{:08X}",
                resolved.error_status
            ),
        });
    }

    // Step 4: Find the ncacn_ip_tcp binding and parse host[port].
    let endpoint = resolved
        .bindings
        .iter()
        .find(|b| b.tower_id == PROTSEQ_NCACN_IP_TCP && !b.is_security_binding)
        .ok_or_else(|| NmxClientError::EndpointResolution {
            reason: "ResolveOxid returned no ncacn_ip_tcp binding".to_string(),
        })?;
    let (host, port) = parse_bracketed_host_port(&endpoint.value)?;
    // Only the first address produced by the lookup is used.
    let svc_addr: SocketAddr =
        tokio::net::lookup_host((host.as_str(), port))
            .await
            .map_err(|e| NmxClientError::EndpointResolution {
                reason: format!("DNS lookup of {host}:{port} failed: {e}"),
            })?
            .next()
            .ok_or_else(|| NmxClientError::EndpointResolution {
                reason: format!("DNS resolution of {host}:{port} produced no addresses"),
            })?;

    // Step 5: Bind IRemUnknown on a fresh transport and call
    // RemQueryInterface(iunknown_ipid, INmxService2).
    let mut rem_qi_client = DceRpcTcpClient::connect(svc_addr)
        .await
        .map_err(TransportError::from)?;
    rem_qi_client
        .bind_with_managed_ntlm_packet_integrity(IREM_UNKNOWN_IID, 0, 0, ntlm_factory())
        .await?;
    // Native uses `public_refs = 5` (`RemUnknownMessages.cs:12`); the
    // Rust signature requires it explicitly so the default isn't
    // hidden in the call-site.
    let qi_request = encode_rem_query_interface_request(
        objref.ipid,
        svc::INTERFACE_ID,
        Guid::new(rand::random()),
        5,
    );
    let qi_response = rem_qi_client
        .call_bound_object(
            resolved.rem_unknown_ipid,
            REM_QUERY_INTERFACE_OPNUM,
            &qi_request,
        )
        .await?;
    let parsed = parse_rem_query_interface_response(&qi_response.stub_data)
        .map_err(TransportError::from)?;
    let qi_result = parsed.result.ok_or_else(|| NmxClientError::EndpointResolution {
        reason: format!(
            "RemQueryInterface response had no REMQIRESULT (error_code 0x{:08X})",
            parsed.error_code
        ),
    })?;
    if qi_result.hresult != 0 || parsed.error_code != 0 {
        return Err(NmxClientError::EndpointResolution {
            reason: format!(
                "RemQueryInterface failed: hresult=0x{:08X}, error_code=0x{:08X}",
                qi_result.hresult, parsed.error_code
            ),
        });
    }
    let service_ipid = qi_result.standard_object_reference.ipid;
    // Drop the QI transport; the .NET reference uses a `using` block
    // for the same reason — the IRemUnknown bind is single-use.
    drop(rem_qi_client);

    // Step 6: Final transport bound to INmxService2.
    Self::connect(svc_addr, service_ipid, ntlm_factory()).await
}
/// Construct from an already-bound transport. Useful when a caller
/// has already negotiated the bind (e.g. for tests against a hand-rolled
/// server, or for an unauthenticated probe path).
#[must_use]
pub fn from_bound_transport(transport: DceRpcTcpClient, service_ipid: Guid) -> Self {
Self {
transport,
service_ipid,
}
}
/// IPID this client routes every call through.
#[must_use]
pub fn service_ipid(&self) -> Guid {
self.service_ipid
}
// --- INmxService2 opnums ---------------------------------------------
/// `INmxService2::GetPartnerVersion` (opnum 11). Mirrors `cs:66-78`.
///
/// Unlike the HRESULT-returning procedures, this call *raises* on a
/// non-zero HRESULT: success yields the partner protocol version, and any
/// non-zero HRESULT in the response body becomes
/// `NmxClientError::NonZeroHresult`.
///
/// # Errors
/// Transport, codec, or non-zero HRESULT in the response.
pub async fn get_partner_version(
    &mut self,
    galaxy_id: i32,
    platform_id: i32,
    engine_id: i32,
) -> Result<i32, NmxClientError> {
    let stub = svc::encode_get_partner_version_request(
        fresh_orpc_this(),
        galaxy_id,
        platform_id,
        engine_id,
    );
    let reply = self
        .transport
        .call_bound_object(self.service_ipid, svc::GET_PARTNER_VERSION_OPNUM, &stub)
        .await?;
    let decoded = svc::parse_get_partner_version_response(&reply.stub_data)
        .map_err(TransportError::from)?;
    match decoded.hresult {
        0 => Ok(decoded.partner_version),
        hresult => Err(NmxClientError::NonZeroHresult {
            operation: "GetPartnerVersion",
            hresult,
        }),
    }
}
/// `INmxService2::RegisterEngine2` (opnum 10) with a callback. Mirrors
/// `cs:80-90`.
///
/// `callback_obj_ref` is the OBJREF bytes of the local callback
/// exporter — typically `mxaccess_callback::CallbackExporter::create_callback_objref`.
///
/// The application HRESULT is returned verbatim: the .NET reference's
/// `CallForHResult` (`cs:484-489`) does not raise on non-zero, and this
/// port follows suit so callers can tell "transport-OK + LMX rejected"
/// apart from "transport-error".
///
/// # Errors
/// Transport or codec.
pub async fn register_engine_2(
    &mut self,
    local_engine_id: i32,
    engine_name: &str,
    version: i32,
    callback_obj_ref: &[u8],
) -> Result<i32, NmxClientError> {
    let callback = Some(callback_obj_ref);
    let stub = svc::encode_register_engine_2_request(
        fresh_orpc_this(),
        local_engine_id,
        engine_name,
        version,
        callback,
    );
    let hresult = self
        .call_for_hresult(svc::REGISTER_ENGINE_2_OPNUM, stub)
        .await?;
    Ok(hresult)
}
/// `INmxService2::RegisterEngine2` without a callback: a NULL interface
/// pointer is sent in place of the callback OBJREF. Mirrors `cs:92-102`.
///
/// Returns the application HRESULT verbatim.
///
/// # Errors
/// Transport or codec.
pub async fn register_engine_2_without_callback(
    &mut self,
    local_engine_id: i32,
    engine_name: &str,
    version: i32,
) -> Result<i32, NmxClientError> {
    let stub = svc::encode_register_engine_2_request(
        fresh_orpc_this(),
        local_engine_id,
        engine_name,
        version,
        // No callback OBJREF.
        None,
    );
    let hresult = self
        .call_for_hresult(svc::REGISTER_ENGINE_2_OPNUM, stub)
        .await?;
    Ok(hresult)
}
/// `INmxService2::UnRegisterEngine` (opnum 4). Mirrors `cs:104-111`.
///
/// Returns the application HRESULT verbatim.
///
/// # Errors
/// Transport or codec.
pub async fn unregister_engine(&mut self, local_engine_id: i32) -> Result<i32, NmxClientError> {
    let stub = svc::encode_unregister_engine_request(fresh_orpc_this(), local_engine_id);
    let hresult = self
        .call_for_hresult(svc::UNREGISTER_ENGINE_OPNUM, stub)
        .await?;
    Ok(hresult)
}
/// `INmxService2::Connect` (opnum 5). Mirrors `cs:113-123`.
///
/// Returns the application HRESULT verbatim.
///
/// # Errors
/// Transport or codec.
pub async fn connect_engine(
    &mut self,
    local_engine_id: i32,
    remote_galaxy_id: i32,
    remote_platform_id: i32,
    remote_engine_id: i32,
) -> Result<i32, NmxClientError> {
    let stub = svc::encode_connect_request(
        fresh_orpc_this(),
        local_engine_id,
        remote_galaxy_id,
        remote_platform_id,
        remote_engine_id,
    );
    let hresult = self.call_for_hresult(svc::CONNECT_OPNUM, stub).await?;
    Ok(hresult)
}
/// `INmxService2::AddSubscriberEngine` (opnum 7). Mirrors `cs:125-135`.
///
/// Returns the application HRESULT verbatim.
///
/// # Errors
/// Transport or codec.
pub async fn add_subscriber_engine(
    &mut self,
    local_engine_id: i32,
    subscriber_galaxy_id: i32,
    subscriber_platform_id: i32,
    subscriber_engine_id: i32,
) -> Result<i32, NmxClientError> {
    let stub = svc::encode_subscriber_engine_request(
        fresh_orpc_this(),
        local_engine_id,
        subscriber_galaxy_id,
        subscriber_platform_id,
        subscriber_engine_id,
    );
    let hresult = self
        .call_for_hresult(svc::ADD_SUBSCRIBER_ENGINE_OPNUM, stub)
        .await?;
    Ok(hresult)
}
/// `INmxService2::RemoveSubscriberEngine` (opnum 8). Mirrors
/// `cs:137-147`. The request is built by the same encoder as
/// [`Self::add_subscriber_engine`]; only the opnum differs.
///
/// Returns the application HRESULT verbatim.
///
/// # Errors
/// Transport or codec.
pub async fn remove_subscriber_engine(
    &mut self,
    local_engine_id: i32,
    subscriber_galaxy_id: i32,
    subscriber_platform_id: i32,
    subscriber_engine_id: i32,
) -> Result<i32, NmxClientError> {
    let stub = svc::encode_subscriber_engine_request(
        fresh_orpc_this(),
        local_engine_id,
        subscriber_galaxy_id,
        subscriber_platform_id,
        subscriber_engine_id,
    );
    let hresult = self
        .call_for_hresult(svc::REMOVE_SUBSCRIBER_ENGINE_OPNUM, stub)
        .await?;
    Ok(hresult)
}
/// `INmxService2::SetHeartbeatSendInterval` (opnum 9). Mirrors
/// `cs:149-157`.
///
/// Returns the application HRESULT verbatim.
///
/// # Errors
/// Transport or codec.
pub async fn set_heartbeat_send_interval(
    &mut self,
    ticks_per_beat: i32,
    max_missed_ticks: i32,
) -> Result<i32, NmxClientError> {
    let stub = svc::encode_set_heartbeat_send_interval_request(
        fresh_orpc_this(),
        ticks_per_beat,
        max_missed_ticks,
    );
    let hresult = self
        .call_for_hresult(svc::SET_HEARTBEAT_SEND_INTERVAL_OPNUM, stub)
        .await?;
    Ok(hresult)
}
/// `INmxService2::TransferData` (opnum 6). Mirrors `cs:159-170`.
///
/// `message_body` must be a complete `NmxTransferEnvelope` (the .NET
/// reference validates this via `NmxTransferEnvelopeTemplate.FromObserved`
/// at `cs:179`). This method enforces only that the body is non-empty;
/// any stricter validation is the caller's responsibility. The high-level
/// wrappers on this type (e.g. [`Self::write`]) build the envelope
/// themselves before calling this method.
///
/// Returns the application HRESULT verbatim.
///
/// # Errors
/// Transport, codec, or [`NmxClientError::EmptyTransferDataBody`] when
/// `message_body.is_empty()`.
pub async fn transfer_data(
    &mut self,
    remote_galaxy_id: i32,
    remote_platform_id: i32,
    remote_engine_id: i32,
    message_body: &[u8],
) -> Result<i32, NmxClientError> {
    // Reject an empty body before anything is encoded or sent.
    match message_body {
        [] => Err(NmxClientError::EmptyTransferDataBody),
        body => {
            let stub = svc::encode_transfer_data_request(
                fresh_orpc_this(),
                remote_galaxy_id,
                remote_platform_id,
                remote_engine_id,
                body,
            );
            self.call_for_hresult(svc::TRANSFER_DATA_OPNUM, stub).await
        }
    }
}
/// Shared path for every procedure whose response carries only an
/// HRESULT. Mirrors `CallForHResult` (`cs:484-489`).
///
/// Sends `request` to `opnum` on the bound object identified by this
/// client's `service_ipid`, decodes the reply, and hands the HRESULT back
/// untouched — a non-zero value is *not* turned into an error here.
async fn call_for_hresult(
    &mut self,
    opnum: u16,
    request: Vec<u8>,
) -> Result<i32, NmxClientError> {
    let target_ipid = self.service_ipid;
    let reply = self
        .transport
        .call_bound_object(target_ipid, opnum, &request)
        .await?;
    let decoded =
        svc::parse_hresult_response(&reply.stub_data).map_err(TransportError::from)?;
    let hresult = decoded.hresult;
    Ok(hresult)
}
// ------------------------------------------------------------------
// High-level write / advise / unadvise wrappers
// (port of ManagedNmxService2Client.cs:186-466 — resolves F13).
//
// Each wrapper builds an inner NMX message via mxaccess-codec,
// wraps it in NmxTransferEnvelope, and calls `transfer_data`. The
// return value is the LMX HRESULT verbatim (per CallForHResult at
// cs:484-489): non-zero is forwarded to the caller, not raised, so
// application-status responses can be distinguished from transport
// errors.
// ------------------------------------------------------------------
/// Write a value to a tag. Mirrors `Write`
/// (`ManagedNmxService2Client.cs:303-324`).
///
/// `value` is a typed [`WriteValue`] (re-exported from
/// `mxaccess-codec`). Use [`GalaxyTagMetadata::resolve_write_kind`]
/// to learn which variant to construct from the tag metadata.
///
/// The transfer body is routed to `galaxy_id` and the tag's
/// `platform_id` / `engine_id`. The LMX HRESULT is returned verbatim: a
/// non-zero HRESULT comes back as `Ok(hr)`, not as an error.
///
/// # Errors
/// Transport or codec failures.
#[allow(clippy::too_many_arguments)]
pub async fn write(
    &mut self,
    local_engine_id: i32,
    tag: &GalaxyTagMetadata,
    value: &WriteValue,
    write_index: i32,
    client_token: u32,
    galaxy_id: u8,
    source_galaxy_id: i32,
    source_platform_id: i32,
) -> Result<i32, NmxClientError> {
    let transfer_body = encode_write_transfer_body(
        local_engine_id,
        tag,
        value,
        write_index,
        client_token,
        galaxy_id,
        source_galaxy_id,
        source_platform_id,
    )?;
    let remote_galaxy = i32::from(galaxy_id);
    let remote_platform = i32::from(tag.platform_id);
    let remote_engine = i32::from(tag.engine_id);
    self.transfer_data(remote_galaxy, remote_platform, remote_engine, &transfer_body)
        .await
}
/// Write a value stamped with an explicit timestamp. Mirrors `Write2`
/// (`cs:326-349`).
///
/// `timestamp_filetime` is a Windows FILETIME tick count (100-ns
/// intervals since 1601-01-01 UTC) — same encoding as `cs:248`'s
/// `DateTime.ToFileTime()`.
///
/// Routing and HRESULT handling are the same as [`Self::write`].
///
/// # Errors
/// As for [`Self::write`].
#[allow(clippy::too_many_arguments)]
pub async fn write2(
    &mut self,
    local_engine_id: i32,
    tag: &GalaxyTagMetadata,
    value: &WriteValue,
    timestamp_filetime: i64,
    write_index: i32,
    client_token: u32,
    galaxy_id: u8,
    source_galaxy_id: i32,
    source_platform_id: i32,
) -> Result<i32, NmxClientError> {
    let transfer_body = encode_write2_transfer_body(
        local_engine_id,
        tag,
        value,
        timestamp_filetime,
        write_index,
        client_token,
        galaxy_id,
        source_galaxy_id,
        source_platform_id,
    )?;
    let remote_galaxy = i32::from(galaxy_id);
    let remote_platform = i32::from(tag.platform_id);
    let remote_engine = i32::from(tag.engine_id);
    self.transfer_data(remote_galaxy, remote_platform, remote_engine, &transfer_body)
        .await
}
/// Secured, timestamped write carrying two user tokens. Mirrors
/// `WriteSecured2` (`cs:351-380`). Single-user secured writes pass the
/// same id for both users.
///
/// `current_user_id` and `verifier_user_id` are `dbo.user_profile.user_profile_id`
/// values; they are converted to wire tokens internally via
/// [`mxaccess_codec::secured_write::resolve_observed_user_token`].
/// `client_name` is the human-readable name of the signing party
/// (UTF-16LE NUL-terminated on the wire).
///
/// Routing and HRESULT handling are the same as [`Self::write`].
///
/// # Errors
/// As for [`Self::write`].
#[allow(clippy::too_many_arguments)]
pub async fn write_secured2(
    &mut self,
    local_engine_id: i32,
    tag: &GalaxyTagMetadata,
    value: &WriteValue,
    timestamp_filetime: i64,
    client_name: &str,
    current_user_id: i32,
    verifier_user_id: i32,
    write_index: i32,
    client_token: u32,
    galaxy_id: u8,
    source_galaxy_id: i32,
    source_platform_id: i32,
) -> Result<i32, NmxClientError> {
    let transfer_body = encode_write_secured2_transfer_body(
        local_engine_id,
        tag,
        value,
        timestamp_filetime,
        client_name,
        current_user_id,
        verifier_user_id,
        write_index,
        client_token,
        galaxy_id,
        source_galaxy_id,
        source_platform_id,
    )?;
    let remote_galaxy = i32::from(galaxy_id);
    let remote_platform = i32::from(tag.platform_id);
    let remote_engine = i32::from(tag.engine_id);
    self.transfer_data(remote_galaxy, remote_platform, remote_engine, &transfer_body)
        .await
}
/// Subscribe (advise) for supervisory data on a tag. Mirrors
/// `AdviseSupervisory` (`cs:382-399`).
///
/// `item_correlation_id` is a 16-byte caller-chosen identifier the
/// LMX server will echo back in subsequent `INmxSvcCallback`
/// frames so the consumer can correlate updates to subscriptions.
///
/// Routing and HRESULT handling are the same as [`Self::write`].
///
/// # Errors
/// As for [`Self::write`].
pub async fn advise_supervisory(
    &mut self,
    local_engine_id: i32,
    tag: &GalaxyTagMetadata,
    item_correlation_id: [u8; 16],
    galaxy_id: u8,
    source_galaxy_id: i32,
    source_platform_id: i32,
) -> Result<i32, NmxClientError> {
    let transfer_body = encode_advise_supervisory_transfer_body(
        local_engine_id,
        tag,
        item_correlation_id,
        galaxy_id,
        source_galaxy_id,
        source_platform_id,
    )?;
    let remote_galaxy = i32::from(galaxy_id);
    let remote_platform = i32::from(tag.platform_id);
    let remote_engine = i32::from(tag.engine_id);
    self.transfer_data(remote_galaxy, remote_platform, remote_engine, &transfer_body)
        .await
}
/// Send the observed pre-advise metadata frame. Mirrors
/// `SendObservedPreAdviseMetadata` (`cs:401-420`).
///
/// The frame is always routed to platform/engine `(1, 1)` — the .NET
/// reference hard-codes `targetPlatformId: 1, targetEngineId: 1` at
/// `cs:415` — both in the envelope and in the `TransferData` call.
///
/// Returns the application HRESULT verbatim.
///
/// # Errors
/// Transport or codec.
pub async fn send_observed_pre_advise_metadata(
    &mut self,
    local_engine_id: i32,
    item_correlation_id: [u8; 16],
    galaxy_id: u8,
    source_galaxy_id: i32,
    source_platform_id: i32,
) -> Result<i32, NmxClientError> {
    // Fixed routing target taken from the .NET reference.
    const TARGET_PLATFORM_ID: i32 = 1;
    const TARGET_ENGINE_ID: i32 = 1;

    let query = NmxMetadataQueryMessage::encode_observed_pre_advise(item_correlation_id);
    let target_galaxy_id = i32::from(galaxy_id);
    let framed = NmxTransferEnvelope {
        message_kind: NmxTransferMessageKind::Metadata,
        local_engine_id,
        target_galaxy_id,
        target_platform_id: TARGET_PLATFORM_ID,
        target_engine_id: TARGET_ENGINE_ID,
        source_galaxy_id,
        source_platform_id,
        ..Default::default()
    }
    .encode_with_inner(&query);
    self.transfer_data(
        target_galaxy_id,
        TARGET_PLATFORM_ID,
        TARGET_ENGINE_ID,
        &framed,
    )
    .await
}
/// Register a reference. Mirrors `RegisterReference` (`cs:422-441`).
///
/// `route_tag` supplies the `platform_id` / `engine_id` the envelope is
/// routed to. `message` is caller-built — typically constructed via the
/// codec's `NmxReferenceRegistrationMessage` builders — and is sent
/// inside an `ItemControl` envelope.
///
/// Returns the application HRESULT verbatim.
///
/// # Errors
/// Transport or codec.
pub async fn register_reference(
    &mut self,
    local_engine_id: i32,
    route_tag: &GalaxyTagMetadata,
    message: &NmxReferenceRegistrationMessage,
    galaxy_id: u8,
    source_galaxy_id: i32,
    source_platform_id: i32,
) -> Result<i32, NmxClientError> {
    let registration = message.encode();
    // The same routing triple goes into the envelope and the RPC call.
    let target_galaxy_id = i32::from(galaxy_id);
    let target_platform_id = i32::from(route_tag.platform_id);
    let target_engine_id = i32::from(route_tag.engine_id);
    let framed = NmxTransferEnvelope {
        message_kind: NmxTransferMessageKind::ItemControl,
        local_engine_id,
        target_galaxy_id,
        target_platform_id,
        target_engine_id,
        source_galaxy_id,
        source_platform_id,
        ..Default::default()
    }
    .encode_with_inner(&registration);
    self.transfer_data(
        target_galaxy_id,
        target_platform_id,
        target_engine_id,
        &framed,
    )
    .await
}
/// Unsubscribe (unadvise) from a tag. Mirrors `UnAdvise` (`cs:443-466`).
///
/// The `.NET` reference wraps the unadvise message in an envelope of kind
/// `NmxTransferMessageKind.Write` (not `ItemControl`) at `cs:457`. The
/// Rust port matches that exactly per CLAUDE.md "preserve unknown
/// bytes" — this is the .NET reference's choice, not a generic NMX rule.
///
/// `item_correlation_id` is passed through to the unadvise message.
/// Routing and HRESULT handling are the same as [`Self::write`].
///
/// # Errors
/// As for [`Self::write`].
pub async fn un_advise(
    &mut self,
    local_engine_id: i32,
    tag: &GalaxyTagMetadata,
    item_correlation_id: [u8; 16],
    galaxy_id: u8,
    source_galaxy_id: i32,
    source_platform_id: i32,
) -> Result<i32, NmxClientError> {
    let transfer_body = encode_un_advise_transfer_body(
        local_engine_id,
        tag,
        item_correlation_id,
        galaxy_id,
        source_galaxy_id,
        source_platform_id,
    )?;
    let remote_galaxy = i32::from(galaxy_id);
    let remote_platform = i32::from(tag.platform_id);
    let remote_engine = i32::from(tag.engine_id);
    self.transfer_data(remote_galaxy, remote_platform, remote_engine, &transfer_body)
        .await
}
}
// ------------------------------------------------------------------
// Pure-codec helpers — extracted for testability and to mirror the
// .NET reference's `internal static` `Encode*TransferBody` methods at
// `ManagedNmxService2Client.cs:186-301`.
// ------------------------------------------------------------------
/// Build the complete `TransferData` body for a plain write. Mirrors
/// `EncodeWriteTransferBody` (`cs:186-212`).
///
/// Derives the reference handle from `tag`, encodes the write message,
/// then frames it in a `Write` envelope.
///
/// # Errors
/// Propagates failures from building the reference handle or from the
/// write-message encoder.
#[allow(clippy::too_many_arguments)]
pub(crate) fn encode_write_transfer_body(
    local_engine_id: i32,
    tag: &GalaxyTagMetadata,
    value: &WriteValue,
    write_index: i32,
    client_token: u32,
    galaxy_id: u8,
    source_galaxy_id: i32,
    source_platform_id: i32,
) -> Result<Vec<u8>, NmxClientError> {
    let reference = tag.to_reference_handle(galaxy_id)?;
    let payload = write_message::encode(&reference, value, write_index, client_token)?;
    let envelope = envelope_for(
        NmxTransferMessageKind::Write,
        local_engine_id,
        galaxy_id,
        tag,
        source_galaxy_id,
        source_platform_id,
    );
    Ok(envelope.encode_with_inner(&payload))
}
/// Build the complete `TransferData` body for a timestamped write.
/// Mirrors `EncodeWrite2TransferBody` (`cs:214-242`).
///
/// Derives the reference handle from `tag`, encodes the timestamped write
/// message, then frames it in a `Write` envelope.
///
/// # Errors
/// Propagates failures from building the reference handle or from the
/// write-message encoder.
#[allow(clippy::too_many_arguments)]
pub(crate) fn encode_write2_transfer_body(
    local_engine_id: i32,
    tag: &GalaxyTagMetadata,
    value: &WriteValue,
    timestamp_filetime: i64,
    write_index: i32,
    client_token: u32,
    galaxy_id: u8,
    source_galaxy_id: i32,
    source_platform_id: i32,
) -> Result<Vec<u8>, NmxClientError> {
    let reference = tag.to_reference_handle(galaxy_id)?;
    let payload = write_message::encode_timestamped(
        &reference,
        value,
        timestamp_filetime,
        write_index,
        client_token,
    )?;
    let envelope = envelope_for(
        NmxTransferMessageKind::Write,
        local_engine_id,
        galaxy_id,
        tag,
        source_galaxy_id,
        source_platform_id,
    );
    Ok(envelope.encode_with_inner(&payload))
}
/// Build the complete `TransferData` body for a secured, timestamped
/// write. Mirrors `EncodeWriteSecured2TransferBody` (`cs:244-278`).
///
/// Both user ids are converted to wire tokens with
/// `secured_write::resolve_observed_user_token` (current user first, then
/// verifier) before the secured-write message is encoded and framed in a
/// `Write` envelope.
///
/// # Errors
/// Propagates failures from building the reference handle or from the
/// secured-write encoder.
#[allow(clippy::too_many_arguments)]
pub(crate) fn encode_write_secured2_transfer_body(
    local_engine_id: i32,
    tag: &GalaxyTagMetadata,
    value: &WriteValue,
    timestamp_filetime: i64,
    client_name: &str,
    current_user_id: i32,
    verifier_user_id: i32,
    write_index: i32,
    client_token: u32,
    galaxy_id: u8,
    source_galaxy_id: i32,
    source_platform_id: i32,
) -> Result<Vec<u8>, NmxClientError> {
    let reference = tag.to_reference_handle(galaxy_id)?;
    let signer_token = secured_write::resolve_observed_user_token(current_user_id);
    let verifier_token = secured_write::resolve_observed_user_token(verifier_user_id);
    let payload = secured_write::encode(
        &reference,
        value,
        signer_token,
        verifier_token,
        client_name,
        timestamp_filetime,
        write_index,
        client_token,
    )?;
    let envelope = envelope_for(
        NmxTransferMessageKind::Write,
        local_engine_id,
        galaxy_id,
        tag,
        source_galaxy_id,
        source_platform_id,
    );
    Ok(envelope.encode_with_inner(&payload))
}
/// Encodes the complete transfer body for an `AdviseSupervisory` request:
/// an item-control message built from the tag's reference handle, wrapped
/// in a [`NmxTransferMessageKind::ItemControl`] envelope.
///
/// Mirrors `EncodeAdviseSupervisoryTransferBody` (`cs:280-301`).
///
/// # Errors
///
/// Propagates any failure from resolving `tag` into a reference handle.
pub(crate) fn encode_advise_supervisory_transfer_body(
    local_engine_id: i32,
    tag: &GalaxyTagMetadata,
    item_correlation_id: [u8; 16],
    galaxy_id: u8,
    source_galaxy_id: i32,
    source_platform_id: i32,
) -> Result<Vec<u8>, NmxClientError> {
    let reference = tag.to_reference_handle(galaxy_id)?;
    let payload = NmxItemControlMessage::from_reference_handle_fields(
        NmxItemControlCommand::AdviseSupervisory,
        item_correlation_id,
        reference.object_id,
        reference.object_signature(),
        reference.primitive_id,
        reference.attribute_id,
        reference.property_id,
        reference.attribute_signature(),
        reference.attribute_index,
        // `tail`: the `DEFAULT_TAIL = 3` value described in the
        // `mxaccess_codec::item_control` docs / `cs:88`.
        3,
    )
    .encode();

    let envelope = envelope_for(
        NmxTransferMessageKind::ItemControl,
        local_engine_id,
        galaxy_id,
        tag,
        source_galaxy_id,
        source_platform_id,
    );
    Ok(envelope.encode_with_inner(&payload))
}
/// Encodes the complete transfer body for an `UnAdvise` request: an
/// item-control message built from the tag's reference handle, wrapped in
/// a transfer envelope.
///
/// Mirrors the `UnAdvise` body builder embedded in `cs:451-465`. The
/// envelope kind is deliberately [`NmxTransferMessageKind::Write`] rather
/// than `ItemControl` (cs:457): the .NET reference diverges from the
/// AdviseSupervisory envelope in the same way, and that is reproduced
/// here unchanged.
///
/// # Errors
///
/// Propagates any failure from resolving `tag` into a reference handle.
pub(crate) fn encode_un_advise_transfer_body(
    local_engine_id: i32,
    tag: &GalaxyTagMetadata,
    item_correlation_id: [u8; 16],
    galaxy_id: u8,
    source_galaxy_id: i32,
    source_platform_id: i32,
) -> Result<Vec<u8>, NmxClientError> {
    let reference = tag.to_reference_handle(galaxy_id)?;
    let payload = NmxItemControlMessage::from_reference_handle_fields(
        NmxItemControlCommand::UnAdvise,
        item_correlation_id,
        reference.object_id,
        reference.object_signature(),
        reference.primitive_id,
        reference.attribute_id,
        reference.property_id,
        reference.attribute_signature(),
        reference.attribute_index,
        // `tail`: same literal the AdviseSupervisory builder passes.
        3,
    )
    .encode();

    // `cs:457` wraps UnAdvise in a `Write` envelope, not `ItemControl`.
    // Preserved verbatim per CLAUDE.md — do not "correct" this.
    let envelope = envelope_for(
        NmxTransferMessageKind::Write,
        local_engine_id,
        galaxy_id,
        tag,
        source_galaxy_id,
        source_platform_id,
    );
    Ok(envelope.encode_with_inner(&payload))
}
/// Assembles the `NmxTransferEnvelope` that every wrapper in this file
/// puts around its inner message.
///
/// The target galaxy comes from `galaxy_id`, the target platform and
/// engine come from `tag`, and the source ids are passed through as
/// given; every remaining envelope field keeps its `Default` value.
///
/// Mirrors the `NmxTransferEnvelope.Encode(...)` parameter assembly the
/// .NET reference repeats at `cs:203-211, 233-241, 269-277, 292-300,
/// 410-418, 431-439, 456-464`.
fn envelope_for(
    message_kind: NmxTransferMessageKind,
    local_engine_id: i32,
    galaxy_id: u8,
    tag: &GalaxyTagMetadata,
    source_galaxy_id: i32,
    source_platform_id: i32,
) -> NmxTransferEnvelope {
    // Widen the routing ids to the envelope's i32 fields up front so the
    // struct literal below is pure field shorthand.
    let target_galaxy_id = i32::from(galaxy_id);
    let target_platform_id = i32::from(tag.platform_id);
    let target_engine_id = i32::from(tag.engine_id);

    NmxTransferEnvelope {
        message_kind,
        local_engine_id,
        target_galaxy_id,
        target_platform_id,
        target_engine_id,
        source_galaxy_id,
        source_platform_id,
        ..Default::default()
    }
}
#[cfg(test)]
#[allow(
clippy::unwrap_used,
clippy::expect_used,
clippy::indexing_slicing,
clippy::panic
)]
mod tests {
use super::*;
use mxaccess_rpc::orpc::OrpcThat;
use mxaccess_rpc::pdu::{PacketType, PduHeader, ResponsePdu};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
/// Loopback address with port 0, i.e. "let the OS pick a free port".
fn local_addr() -> SocketAddr {
    SocketAddr::from(([127, 0, 0, 1], 0))
}
// ----- F12 host[port] parser ------------------------------------------
#[test]
fn parse_bracketed_extracts_host_and_port() {
    // Happy path: `host[port]` splits into its two components.
    let parsed = parse_bracketed_host_port("DESKTOP-6JL3KKO[55690]").unwrap();
    assert_eq!(parsed.0, "DESKTOP-6JL3KKO");
    assert_eq!(parsed.1, 55690);
}
#[test]
fn parse_bracketed_uses_last_brackets() {
    // The native ResolveOxid bindings can include FQDN forms like
    // `host.subdomain[12345]`; dots in the host part must not disturb
    // where the bracketed port is found.
    let parsed = parse_bracketed_host_port("foo.example.com[1234]").unwrap();
    assert_eq!(parsed.0, "foo.example.com");
    assert_eq!(parsed.1, 1234);
}
#[test]
fn parse_bracketed_rejects_missing_open() {
    // No `[` at all: must be reported as an endpoint-resolution failure.
    let outcome = parse_bracketed_host_port("hostonly");
    assert!(matches!(
        outcome,
        Err(NmxClientError::EndpointResolution { .. })
    ));
}
#[test]
fn parse_bracketed_rejects_missing_close() {
    // Opening bracket without its closing partner.
    let outcome = parse_bracketed_host_port("host[1234");
    assert!(matches!(
        outcome,
        Err(NmxClientError::EndpointResolution { .. })
    ));
}
#[test]
fn parse_bracketed_rejects_non_numeric_port() {
    // Well-formed brackets, but the content is not a number.
    let outcome = parse_bracketed_host_port("host[abc]");
    assert!(matches!(
        outcome,
        Err(NmxClientError::EndpointResolution { .. })
    ));
}
#[test]
fn parse_bracketed_rejects_port_overflow() {
    // 100000 is numeric but larger than any valid 16-bit port.
    let outcome = parse_bracketed_host_port("host[100000]");
    assert!(matches!(
        outcome,
        Err(NmxClientError::EndpointResolution { .. })
    ));
}
/// Live integration test for [`NmxClient::create`]. Activates
/// `NmxSvc.NmxService` and resolves the `INmxService2` IPID via the
/// real OBJREF + OXID + RemQI chain.
///
/// Runs only when `MX_LIVE` is set and both `MX_TEST_USER` and
/// `MX_TEST_PASSWORD` are non-empty; otherwise it prints a note and
/// returns early (i.e. passes). `MX_TEST_DOMAIN` is optional and
/// defaults to the empty string. The variables are populated by
/// `tools/Setup-LiveProbeEnv.ps1` (which fetches them from Infisical).
#[cfg(all(windows, feature = "windows-com"))]
#[tokio::test(flavor = "current_thread")]
#[ignore = "requires AVEVA + MX_LIVE; gated on env vars from Setup-LiveProbeEnv.ps1"]
async fn live_create_resolves_inmxservice2() {
    // Unset, non-unicode and empty values all count as "not provided".
    let non_empty_env = |key: &str| std::env::var(key).ok().filter(|value| !value.is_empty());

    if std::env::var_os("MX_LIVE").is_none() {
        eprintln!("MX_LIVE not set; skipping");
        return;
    }
    let user = match non_empty_env("MX_TEST_USER") {
        Some(value) => value,
        None => {
            eprintln!("MX_TEST_USER not set; skipping");
            return;
        }
    };
    let password = match non_empty_env("MX_TEST_PASSWORD") {
        Some(value) => value,
        None => {
            eprintln!("MX_TEST_PASSWORD not set; skipping");
            return;
        }
    };
    let domain = std::env::var("MX_TEST_DOMAIN").unwrap_or_default();

    let ntlm_factory =
        || mxaccess_rpc::ntlm::NtlmClientContext::new(&user, &password, &domain, None);
    let client = NmxClient::create(ntlm_factory)
        .await
        .expect("NmxClient::create round-trip");

    // The resolved IPID must be non-zero — the activated server
    // always picks a real GUID.
    let ipid = client.service_ipid();
    assert_ne!(
        ipid.as_bytes(),
        &[0u8; 16],
        "service IPID is all-zero (RemQueryInterface didn't return a real IPID)"
    );
}
/// Spin a hand-rolled DCE/RPC server that:
/// 1. accepts one connection,
/// 2. drains one Bind PDU and replies with a 16-byte BindAck shell,
/// 3. handles `request_count` Request PDUs by responding with an
/// OrpcThat + i32 HRESULT body (passed in via `responses`).
///
/// The server does NOT validate the inbound NTLM signature — for tests
/// we exercise the unauthenticated path via `NmxClient::from_bound_transport`.
async fn unauthenticated_server(
responses: Vec<(i32, Vec<u8>)>,
) -> (SocketAddr, tokio::task::JoinHandle<()>) {
let listener = TcpListener::bind(local_addr()).await.unwrap();
let addr = listener.local_addr().unwrap();
let handle = tokio::spawn(async move {
let (mut sock, _) = listener.accept().await.unwrap();
// Bind + BindAck.
let mut hdr = [0u8; 16];
sock.read_exact(&mut hdr).await.unwrap();
let bind_h = PduHeader::decode(&hdr).unwrap();
let mut body = vec![0u8; bind_h.fragment_length as usize - 16];
sock.read_exact(&mut body).await.unwrap();
let resp_h = PduHeader {
version: 5,
version_minor: 0,
packet_type: PacketType::BindAck,
packet_flags: 0x03,
data_representation: 0x10,
fragment_length: 16,
auth_length: 0,
call_id: bind_h.call_id,
};
let mut out = [0u8; 16];
resp_h.encode(&mut out).unwrap();
sock.write_all(&out).await.unwrap();
// Service the requested number of Request PDUs.
for (custom_hresult, extra_payload) in responses {
sock.read_exact(&mut hdr).await.unwrap();
let req_h = PduHeader::decode(&hdr).unwrap();
let mut body = vec![0u8; req_h.fragment_length as usize - 16];
sock.read_exact(&mut body).await.unwrap();
let mut stub = Vec::new();
stub.extend_from_slice(&OrpcThat::default().encode());
stub.extend_from_slice(&custom_hresult.to_le_bytes());
stub.extend_from_slice(&extra_payload);
let response = ResponsePdu {
header: PduHeader {
version: 5,
version_minor: 0,
packet_type: PacketType::Response,
packet_flags: 0x03,
data_representation: 0x10,
fragment_length: 0,
auth_length: 0,
call_id: req_h.call_id,
},
allocation_hint: stub.len() as u32,
context_id: 0,
cancel_count: 0,
reserved23: 0,
stub_data: stub,
};
let bytes = response.encode();
sock.write_all(&bytes).await.unwrap();
}
});
(addr, handle)
}
/// Test helper: connects to `addr`, performs a plain (non-NTLM) bind for
/// `svc::INTERFACE_ID`, and wraps the bound transport in an [`NmxClient`]
/// that will use `service_ipid`.
///
/// Skipping the NTLM bind is fine here because the test server does not
/// validate signatures.
///
/// # Errors
///
/// Returns the connect failure (converted through `TransportError`) or
/// the bind failure.
async fn connect_unauth(
    addr: SocketAddr,
    service_ipid: Guid,
) -> Result<NmxClient, NmxClientError> {
    let connecting = DceRpcTcpClient::connect(addr);
    let mut rpc = connecting.await.map_err(TransportError::from)?;
    rpc.bind(svc::INTERFACE_ID, 0, 0).await?;
    let client = NmxClient::from_bound_transport(rpc, service_ipid);
    Ok(client)
}
#[tokio::test]
async fn get_partner_version_returns_partner_version_on_zero_hresult() {
    // Response stub = OrpcThat(8) + partner_version(4) + hresult(4).
    //
    // `unauthenticated_server` always writes the tuple's i32 directly
    // after OrpcThat and the byte payload after that. For this opnum the
    // i32 slot therefore carries partner_version (6) and the payload
    // carries the trailing hresult (0) — not the other way round.
    let responses = vec![(6i32, 0i32.to_le_bytes().to_vec())];
    let (addr, handle) = unauthenticated_server(responses).await;
    let mut client = connect_unauth(addr, Guid::new([0xCC; 16])).await.unwrap();
    let v = client.get_partner_version(1, 1, 0).await.unwrap();
    assert_eq!(v, 6);
    handle.await.unwrap();
}
#[tokio::test]
async fn get_partner_version_errors_on_non_zero_hresult() {
    // E_FAIL as defined by Windows. Any non-zero HRESULT would exercise
    // the same path; a well-known one keeps the test self-explanatory.
    const E_FAIL: u32 = 0x8000_4005;

    // Stub layout is OrpcThat || partner_version || hresult, so the
    // server's i32 slot is partner_version (0) and the payload is the
    // failing hresult.
    let responses = vec![(0i32, E_FAIL.to_le_bytes().to_vec())];
    let (addr, handle) = unauthenticated_server(responses).await;
    let mut client = connect_unauth(addr, Guid::new([0xCC; 16])).await.unwrap();
    let err = client.get_partner_version(1, 1, 0).await.unwrap_err();
    match err {
        NmxClientError::NonZeroHresult { operation, hresult } => {
            assert_eq!(operation, "GetPartnerVersion");
            // Same bit pattern, reinterpreted as the signed HRESULT.
            assert_eq!(hresult, E_FAIL as i32);
        }
        other => panic!("expected NonZeroHresult, got {other:?}"),
    }
    handle.await.unwrap();
}
#[tokio::test]
async fn unregister_engine_returns_hresult_verbatim() {
    // CallForHResult does NOT raise on non-zero — the caller decides
    // what to do with it. The server answers with a non-zero HRESULT
    // and the client is expected to hand it back unchanged.
    let (server_addr, server_task) = unauthenticated_server(vec![(0x4242i32, Vec::new())]).await;
    let ipid = Guid::new([0xCC; 16]);
    let mut client = connect_unauth(server_addr, ipid).await.unwrap();

    let returned = client.unregister_engine(1).await.unwrap();

    assert_eq!(returned, 0x4242);
    server_task.await.unwrap();
}
#[tokio::test]
async fn transfer_data_rejects_empty_body() {
    // No responses are scripted; the call is expected to fail with
    // `EmptyTransferDataBody` without needing a reply from the server.
    let (server_addr, server_task) = unauthenticated_server(Vec::new()).await;
    let ipid = Guid::new([0xCC; 16]);
    let mut client = connect_unauth(server_addr, ipid).await.unwrap();

    let outcome = client.transfer_data(1, 1, 1, &[]).await;

    assert!(matches!(
        outcome,
        Err(NmxClientError::EmptyTransferDataBody)
    ));
    // Abort rather than await: this test makes no claim about the state
    // of the server task.
    server_task.abort();
}
#[tokio::test]
async fn register_engine_2_without_callback_round_trip() {
    // One scripted reply carrying HRESULT 0 and no extra payload.
    let (server_addr, server_task) = unauthenticated_server(vec![(0i32, Vec::new())]).await;
    let ipid = Guid::new([0xCC; 16]);
    let mut client = connect_unauth(server_addr, ipid).await.unwrap();

    let call = client.register_engine_2_without_callback(7, "TestEngine", 6);
    let returned = call.await.unwrap();

    assert_eq!(returned, 0);
    server_task.await.unwrap();
}
#[tokio::test]
async fn set_heartbeat_send_interval_round_trip() {
    // One scripted reply carrying HRESULT 0 and no extra payload.
    let (server_addr, server_task) = unauthenticated_server(vec![(0i32, Vec::new())]).await;
    let ipid = Guid::new([0xCC; 16]);
    let mut client = connect_unauth(server_addr, ipid).await.unwrap();

    let returned = client.set_heartbeat_send_interval(100, 5).await.unwrap();

    assert_eq!(returned, 0);
    server_task.await.unwrap();
}
#[test]
fn fresh_orpc_this_uses_default_com_version() {
    // A fresh ORPCTHIS advertises COM 5.7 with no flags and no
    // extensions referent.
    let this = fresh_orpc_this();
    assert_eq!(this.version, mxaccess_rpc::orpc::ComVersion::VERSION_5_7);
    assert_eq!((this.flags, this.extensions_referent_id), (0, 0));
}
#[test]
fn service_ipid_accessor() {
// Use a deterministic IPID; doesn't need a network round-trip since
// the accessor just echoes the field. We bypass `connect` and build
// the struct via from_bound_transport with a stub connection later
// — for now just prove the IPID round-trips through the struct.
let g = Guid::new([0xAB; 16]);
// We can't construct a NmxClient without a TcpStream here, so just
// verify the Guid type itself.
assert_eq!(g.as_bytes()[0], 0xAB);
}
// ------------------------------------------------------------------
// F13 wrapper tests — exercise the encode_*_transfer_body helpers
// standalone (no network), then a representative round-trip that
// routes the resulting envelope through the unauthenticated server.
// ------------------------------------------------------------------
/// Fixture shared by the wrapper tests: metadata for attribute `TestInt`
/// on object `TestObj`, routed to platform 5 / engine 7.
fn sample_metadata() -> GalaxyTagMetadata {
    GalaxyTagMetadata {
        // Names.
        object_tag_name: String::from("TestObj"),
        attribute_name: String::from("TestInt"),
        primitive_name: None,
        attribute_source: String::from("dynamic"),
        // Routing.
        platform_id: 5,
        engine_id: 7,
        // Handle ids.
        object_id: 42,
        primitive_id: -1,
        attribute_id: 99,
        property_id: 10,
        // Type information.
        mx_data_type: 2, // Integer
        is_array: false,
        security_classification: 0,
    }
}
#[test]
fn encode_write_transfer_body_produces_46_byte_envelope_plus_inner() {
    let tag = sample_metadata();
    let value = WriteValue::Int32(123);
    let encoded = encode_write_transfer_body(42, &tag, &value, 1, 0, 1, 1, 1).unwrap();

    // Envelope header (46 bytes) followed by a non-empty inner message.
    assert!(encoded.len() > NmxTransferEnvelope::HEADER_LEN);

    // Routing: galaxy from the `galaxy_id` argument, platform and engine
    // from the tag metadata, local engine from the first argument.
    let envelope = NmxTransferEnvelope::parse(&encoded).unwrap();
    assert_eq!(envelope.message_kind, NmxTransferMessageKind::Write);
    assert_eq!(
        (
            envelope.target_galaxy_id,
            envelope.target_platform_id,
            envelope.target_engine_id,
            envelope.local_engine_id,
        ),
        (1, 5, 7, 42)
    );
}
#[test]
fn encode_write2_transfer_body_carries_timestamp() {
    let tag = sample_metadata();
    let value = WriteValue::Float64(2.5_f64);
    let filetime: i64 = 0x1234_5678_ABCD_EF00;

    let encoded =
        encode_write2_transfer_body(10, &tag, &value, filetime, 5, 0xCAFE_BABE, 1, 1, 1).unwrap();

    let envelope = NmxTransferEnvelope::parse(&encoded).unwrap();
    assert_eq!(envelope.message_kind, NmxTransferMessageKind::Write);
    // Only the presence of an inner body is checked here; the byte
    // layout (timestamp included) is covered by the codec parity tests
    // in mxaccess-codec.
    assert!(encoded.len() > NmxTransferEnvelope::HEADER_LEN);
}
#[test]
fn encode_advise_supervisory_uses_item_control_kind() {
    // AdviseSupervisory travels in an `ItemControl` envelope.
    let tag = sample_metadata();
    let correlation = [0xAA; 16];
    let encoded =
        encode_advise_supervisory_transfer_body(42, &tag, correlation, 1, 1, 1).unwrap();
    let kind = NmxTransferEnvelope::parse(&encoded).unwrap().message_kind;
    assert_eq!(kind, NmxTransferMessageKind::ItemControl);
}
#[test]
fn encode_un_advise_uses_write_kind_per_dotnet_reference() {
    // cs:457 — the .NET reference wraps UnAdvise in a `Write` envelope
    // (not `ItemControl`). Preserved verbatim per CLAUDE.md, and pinned
    // here so nobody "fixes" it by accident.
    let tag = sample_metadata();
    let correlation = [0xBB; 16];
    let encoded = encode_un_advise_transfer_body(42, &tag, correlation, 1, 1, 1).unwrap();
    let kind = NmxTransferEnvelope::parse(&encoded).unwrap().message_kind;
    assert_eq!(kind, NmxTransferMessageKind::Write);
}
#[test]
fn encode_write_secured2_includes_dual_user_tokens() {
    // Inputs shared by the secured and the plain encoding so the only
    // difference between the two bodies is the secured-write extras.
    let tag = sample_metadata();
    let value = WriteValue::Int32(99);
    let filetime: i64 = 0x1234_5678_ABCD_EF00;

    let secured = encode_write_secured2_transfer_body(
        42,
        &tag,
        &value,
        filetime,
        "dohejw01",
        7,
        7, // single-user secured: same id twice
        1,
        0,
        1,
        1,
        1,
    )
    .unwrap();
    let secured_kind = NmxTransferEnvelope::parse(&secured).unwrap().message_kind;
    assert_eq!(secured_kind, NmxTransferMessageKind::Write);

    // The secured body must be longer than a plain Write2 of the same
    // value because of the 16+16 user tokens + UTF-16LE client name.
    let plain = encode_write2_transfer_body(42, &tag, &value, filetime, 1, 0, 1, 1, 1).unwrap();
    assert!(secured.len() > plain.len());
}
#[test]
fn encode_wrapper_propagates_invalid_name_as_codec_error() {
    // Same fixture as the other tests, but with a whitespace-only object
    // tag name; the wrapper must surface that as a `Codec` error.
    let tag = GalaxyTagMetadata {
        object_tag_name: " ".to_string(),
        ..sample_metadata()
    };
    let outcome = encode_write_transfer_body(42, &tag, &WriteValue::Int32(0), 1, 0, 1, 1, 1);
    assert!(matches!(outcome, Err(NmxClientError::Codec(_))));
}
#[tokio::test]
async fn write_round_trip_via_server() {
    // One scripted reply carrying HRESULT 0 and no extra payload.
    let (server_addr, server_task) = unauthenticated_server(vec![(0i32, Vec::new())]).await;
    let ipid = Guid::new([0xCC; 16]);
    let mut client = connect_unauth(server_addr, ipid).await.unwrap();

    let tag = sample_metadata();
    let value = WriteValue::Int32(1234);
    let returned = client
        .write(42, &tag, &value, 1, 0, 1, 1, 1)
        .await
        .unwrap();

    assert_eq!(returned, 0);
    server_task.await.unwrap();
}
#[tokio::test]
async fn advise_supervisory_round_trip_via_server() {
    // One scripted reply carrying HRESULT 0 and no extra payload.
    let (server_addr, server_task) = unauthenticated_server(vec![(0i32, Vec::new())]).await;
    let ipid = Guid::new([0xCC; 16]);
    let mut client = connect_unauth(server_addr, ipid).await.unwrap();

    let tag = sample_metadata();
    let correlation = [0xDD; 16];
    let returned = client
        .advise_supervisory(42, &tag, correlation, 1, 1, 1)
        .await
        .unwrap();

    assert_eq!(returned, 0);
    server_task.await.unwrap();
}
#[tokio::test]
async fn send_observed_pre_advise_metadata_routes_to_platform_engine_one() {
    // The .NET reference hardcodes target (1, 1) for this message. The
    // stub server discards request bodies without inspecting them, so
    // what this test establishes is that the wrapper completes one
    // request/response exchange and returns the server's HRESULT.
    let (server_addr, server_task) = unauthenticated_server(vec![(0i32, Vec::new())]).await;
    let ipid = Guid::new([0xCC; 16]);
    let mut client = connect_unauth(server_addr, ipid).await.unwrap();

    let correlation = [0xEE; 16];
    let returned = client
        .send_observed_pre_advise_metadata(42, correlation, 1, 1, 1)
        .await
        .unwrap();

    assert_eq!(returned, 0);
    server_task.await.unwrap();
}
}