Layout:
- src/ .NET 10 x64 reference: MxNativeCodec, MxNativeClient,
MxAsbClient, probes, tests, harnesses. Executable spec.
- design/ Architectural plan for the Rust port (M0–M6), error
model, protocol invariants, risks (R1–R16), adversarial
review log (review.md).
- rust/ Rust workspace. M0 skeleton + M1 codec parity.
mxaccess-codec: 215 unit tests + 2 cross-implementation
parity tests (byte-identical against .NET reference).
Other crates are M0 stubs awaiting M2+.
- captures/ Frida + netsh + pcap evidence per CLAUDE.md
("captures are evidence, not throwaway logs").
- analysis/ Decompiled C# (frida/proxy/decompiled-*),
Ghidra exports for native DLLs (`exports/` only —
working state at `projects/` and AVEVA's input
binaries at `input/` are gitignored).
- docs/ Reverse-engineering reference docs.
- tools/ Setup-LiveProbeEnv.ps1 (Infisical credential fetcher),
Compute-Crc.ps1 (.NET parity helper).
- .github/workflows/ Rust CI: fmt + build + test + clippy on Windows.
- LICENSE MIT (Joseph Doherty, 2026).
Verified:
- cargo test --workspace → 217 passed (215 unit + 2 .NET parity), 0 failed
- cargo clippy --workspace -- -D warnings → clean
- cargo fmt --all -- --check → clean
- cargo publish --dry-run -p mxaccess-codec → packages cleanly
Excluded from history (see .gitignore):
- **/bin, **/obj, **/target — build artifacts
- analysis/ghidra/projects/ — Ghidra working state (regenerable)
- analysis/ghidra/input/ — AVEVA proprietary DLLs (vendor IP)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Async layer (Tokio)
The async layer is the public face of the library. Most consumers depend on mxaccess (the top-level crate) and never see the raw crates. The crate is async-native, Tokio-based, and exposes idiomatic Rust: typed errors, Send + Sync handles, Streams for subscriptions, drop-cancellable subscriptions, tracing instrumentation.
Public crate: mxaccess
Re-exports the core types from mxaccess-codec, hides the raw transports, adds the async session.
Connection
use mxaccess::{ConnectionOptions, Credentials, DataChange, MxValue, SecurityContext, Session};
use std::time::{Duration, SystemTime};
let session = Session::connect(
ConnectionOptions::nmx("localhost")
.galaxy_id(1)
.platform_id(1)
.engine_id(MX_LOCAL_ENGINE)
.credentials(Credentials::current_user())
.galaxy_db("Server=localhost;Database=Galaxy;Integrated Security=True;TrustServerCertificate=True"),
).await?;
ConnectionOptions::nmx(...) selects NmxTransport; ConnectionOptions::asb(...) selects AsbTransport. Both produce a Session.
Session is Clone + Send + Sync. Internally it wraps Arc<SessionInner> so cloned handles share the same underlying connection.
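The clone-sharing behaviour can be illustrated with a minimal std-only sketch. The field `writes_sent` and the helper methods are hypothetical stand-ins, not the crate's real `SessionInner` layout; only the `Arc`-backed handle shape comes from the text above.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for the shared connection state.
struct SessionInner {
    writes_sent: Mutex<u32>,
}

// Cloning the handle clones the Arc, not the connection.
#[derive(Clone)]
struct Session {
    inner: Arc<SessionInner>,
}

impl Session {
    fn new() -> Self {
        Session { inner: Arc::new(SessionInner { writes_sent: Mutex::new(0) }) }
    }

    fn record_write(&self) {
        *self.inner.writes_sent.lock().unwrap() += 1;
    }

    fn writes_sent(&self) -> u32 {
        *self.inner.writes_sent.lock().unwrap()
    }

    // True when both handles point at the same shared state.
    fn shares_connection_with(&self, other: &Session) -> bool {
        Arc::ptr_eq(&self.inner, &other.inner)
    }
}

// Compile-time check: the build fails if T is not Send + Sync.
fn assert_send_sync<T: Send + Sync>() {}

fn main() {
    assert_send_sync::<Session>();
    let a = Session::new();
    let b = a.clone();
    b.record_write();
    println!("shared={} writes={}", a.shares_connection_with(&b), a.writes_sent());
}
```

A write recorded through one clone is visible through the other because both handles point at the same inner state.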
Orderly shutdown — Session::shutdown(timeout: Duration) -> impl Future<Output = Result<(), Error>>. Sends UnAdvise for every live subscription, then UnregisterEngine, and awaits the connection task's confirmation that those frames have flushed — or returns Err(Error::Timeout(_)) if the timeout elapses first. This is the recommended exit path for production code and is the async equivalent of the .NET reference's synchronous Dispose (src/MxNativeClient/MxNativeSession.cs:476-514).
Drop of the last Session clone is a best-effort fallback: it signals UnregisterEngine to the connection task via the same in-process channel that subscription drops use (no tokio::spawn, no block_on). If the runtime is shut down before the connection task drains, the unregister is lost — see the runtime-shutdown leak note under "Cancellation". Callers that care about deterministic engine deregistration must call Session::shutdown rather than relying on drop.
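The shutdown handshake described above (send the request, then wait for a flush confirmation or give up at the timeout) can be modelled with std channels and a thread. This is a blocking sketch under stated assumptions: `ShutdownRequest`, `ShutdownError` and `spawn_connection_task` are hypothetical names, and the real API is async and returns `Error::Timeout(_)`.

```rust
use std::sync::mpsc::{self, RecvTimeoutError, Sender};
use std::thread;
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum ShutdownError {
    Timeout,
    ConnectionTaskGone,
}

// Request sent to the connection task; `done` carries the flush confirmation.
struct ShutdownRequest {
    done: Sender<()>,
}

// Hypothetical blocking model of Session::shutdown(timeout).
fn shutdown(task: &Sender<ShutdownRequest>, timeout: Duration) -> Result<(), ShutdownError> {
    let (done_tx, done_rx) = mpsc::channel();
    task.send(ShutdownRequest { done: done_tx })
        .map_err(|_| ShutdownError::ConnectionTaskGone)?;
    match done_rx.recv_timeout(timeout) {
        Ok(()) => Ok(()),
        Err(RecvTimeoutError::Timeout) => Err(ShutdownError::Timeout),
        Err(RecvTimeoutError::Disconnected) => Err(ShutdownError::ConnectionTaskGone),
    }
}

// Stand-in connection task: "flushes" for `flush_time`, then confirms.
fn spawn_connection_task(flush_time: Duration) -> Sender<ShutdownRequest> {
    let (tx, rx) = mpsc::channel::<ShutdownRequest>();
    thread::spawn(move || {
        if let Ok(req) = rx.recv() {
            // UnAdvise and UnregisterEngine frames would be written here.
            thread::sleep(flush_time);
            // The caller may have timed out already; ignore a closed channel.
            let _ = req.done.send(());
        }
    });
    tx
}

fn main() {
    let task = spawn_connection_task(Duration::from_millis(10));
    println!("{:?}", shutdown(&task, Duration::from_secs(5)));
}
```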
Operations
// Fire-and-forget: returns when the LMX `Write` RPC return is acked.
// No `WriteCompleted` callback is awaited.
session.write("TestChildObject.TestInt", MxValue::Int32(123)).await?;
// Awaits the 5-byte `OperationStatus` completion frame. The `client_token`
// correlates the wire callback to this call (see writeIndex/clientToken on
// `MxNativeSession.WriteAsync`, src/MxNativeClient/MxNativeSession.cs:165-185).
session.write_with_completion(
"TestChildObject.TestInt",
MxValue::Int32(123),
/* client_token: */ 0x1001u32,
).await?;
session.write_with_timestamp(
"TestChildObject.TestInt",
MxValue::Int32(123),
SystemTime::now(),
).await?;
// Verified Write — the LMX `WriteSecured` always takes TWO user ids:
// `(currentUserId, verifierUserId, value)`. "Single-user secured write" is
// callers passing the same id twice; it is NOT a separate API surface.
// `WriteSecured2` adds a timestamp; it does NOT add a second token. The
// `0x80004021` failure observed in `MxNativeSession.WriteSecuredAsync` is a
// defect of the .NET native reimplementation, not a real LMX constraint
// (verified against wwtools/mxaccesscli/docs/api-notes.md:60-72,87-95 and
// wwtools/mxaccesscli/src/MxAccess.Cli/Commands/WriteCommand.cs:44-101,151-155,196-199;
// the LMX proxy CLI exposes `WriteSecured(currentUserId, verifierUserId, value)`
// and treats single-user secured writes as `currentUserId == verifierUserId`).
session.write_secured(
"TestChildObject.TestInt",
MxValue::Int32(123),
SecurityContext { current_user_id, verifier_user_id },
).await?;
// Timestamped Verified Write — adds a `SystemTime`. Same two-id token shape;
// matches `WriteSecured2(currentUserId, verifierUserId, value, timestamp)`.
session.write_secured_at(
"TestChildObject.TestInt",
MxValue::Int32(123),
SystemTime::now(),
SecurityContext { current_user_id, verifier_user_id },
).await?;
// `read` is implemented as `subscribe + first-result + drop`, mirroring
// `MxNativeSession.ReadAsync` (src/MxNativeClient/MxNativeSession.cs:312-359),
// which requires a positive timeout and unadvises on completion or timeout.
let DataChange { value, status, timestamp, .. } =
session.read("TestChildObject.TestInt", Duration::from_secs(5)).await?;
All operations take a &str reference name (e.g. "TestObject.Attribute") and resolve it to a MxReferenceHandle internally via the configured Resolver. Default resolver is mxaccess-galaxy::SqlResolver; an in-memory resolver is provided for tests (InMemoryResolver::insert("Tag", metadata)).
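As an illustration of the resolver seam, here is a minimal sketch of a name-to-handle lookup with an in-memory implementation for tests. The trait shape, the `ReferenceHandle` fields and `ResolveError` are assumptions made for the example; the real `Resolver`, `MxReferenceHandle` and metadata types may differ.

```rust
use std::collections::HashMap;

// Hypothetical, simplified handle; the real MxReferenceHandle carries more.
#[derive(Debug, Clone, Copy, PartialEq)]
struct ReferenceHandle {
    object_id: u32,
    attribute_id: u16,
}

#[derive(Debug, PartialEq)]
enum ResolveError {
    UnknownReference(String),
}

// Assumed trait shape: map a reference name to a handle.
trait Resolver {
    fn resolve(&self, reference: &str) -> Result<ReferenceHandle, ResolveError>;
}

#[derive(Default)]
struct InMemoryResolver {
    entries: HashMap<String, ReferenceHandle>,
}

impl InMemoryResolver {
    fn insert(&mut self, reference: &str, handle: ReferenceHandle) {
        self.entries.insert(reference.to_string(), handle);
    }
}

impl Resolver for InMemoryResolver {
    fn resolve(&self, reference: &str) -> Result<ReferenceHandle, ResolveError> {
        self.entries
            .get(reference)
            .copied()
            .ok_or_else(|| ResolveError::UnknownReference(reference.to_string()))
    }
}

fn main() {
    let mut resolver = InMemoryResolver::default();
    resolver.insert("TestObject.Attribute", ReferenceHandle { object_id: 1, attribute_id: 2 });
    println!("{:?}", resolver.resolve("TestObject.Attribute"));
    println!("{:?}", resolver.resolve("Missing.Tag"));
}
```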
Session::write returns Ok(()) once the LMX Write RPC has been acknowledged at the transport level — it does not await a wire WriteCompleted frame. Callers that need write-completion semantics must use Session::write_with_completion(reference, value, client_token), which threads client_token through the MxNativeSession.WriteAsync clientToken parameter (src/MxNativeClient/MxNativeSession.cs:165-185) and returns when the matching 5-byte OperationStatus callback frame is decoded. See 70-risks-and-open-questions.md R3/R4 for the cases where the proven stack does not emit a completion frame.
Session::read takes a Duration timeout (matching the .NET reference's mandatory TimeSpan timeout argument and ArgumentOutOfRangeException for non-positive values, src/MxNativeClient/MxNativeSession.cs:312-321). Implementation is subscribe + first-result + drop; the drop guard guarantees UnAdvise runs on the success, error, and timeout paths so no advise is leaked, mirroring the finally/Unsubscribe block at src/MxNativeClient/MxNativeSession.cs:351-358.
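The drop-guard guarantee can be sketched with std primitives. In this blocking model a counter stands in for the UnAdvise frame and a channel stands in for the subscription; `AdviseGuard`, `ReadError` and `read_first` are hypothetical names. The point is only that the guard runs on the success, timeout and error paths alike.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::sync::Arc;
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum ReadError {
    InvalidTimeout,
    Timeout,
    Connection,
}

// Drop guard: records one "UnAdvise" whenever it goes out of scope.
struct AdviseGuard {
    unadvised: Arc<AtomicUsize>,
}

impl Drop for AdviseGuard {
    fn drop(&mut self) {
        self.unadvised.fetch_add(1, Ordering::SeqCst);
    }
}

// Model of read = subscribe + first-result + drop.
fn read_first(
    updates: &Receiver<i32>,
    timeout: Duration,
    unadvised: &Arc<AtomicUsize>,
) -> Result<i32, ReadError> {
    // A non-positive timeout is rejected before anything is advised.
    if timeout.is_zero() {
        return Err(ReadError::InvalidTimeout);
    }
    // The "Advise" would be issued here; the guard pairs it with an UnAdvise.
    let _guard = AdviseGuard { unadvised: Arc::clone(unadvised) };
    match updates.recv_timeout(timeout) {
        Ok(value) => Ok(value),
        Err(RecvTimeoutError::Timeout) => Err(ReadError::Timeout),
        Err(RecvTimeoutError::Disconnected) => Err(ReadError::Connection),
    }
}

fn main() {
    let unadvised = Arc::new(AtomicUsize::new(0));
    let (tx, rx) = mpsc::channel();
    tx.send(123).unwrap();
    println!("{:?}", read_first(&rx, Duration::from_millis(50), &unadvised));
    println!("unadvise count = {}", unadvised.load(Ordering::SeqCst));
}
```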
Subscriptions
use futures::StreamExt;
let mut subscription = session.subscribe("TestChildObject.TestInt").await?;
while let Some(change) = subscription.next().await {
let change = change?;
println!("{} = {:?} @ {:?} (status={:?})",
change.reference, change.value, change.timestamp, change.status);
}
// Drop the subscription to unadvise.
drop(subscription);
Subscription implements Stream<Item = Result<DataChange, Error>>.
Err semantics — non-terminal for parse errors, terminal after connection loss. The stream yields Err items for parse-level failures (the consumer can keep polling — the next inbound frame will be delivered). The stream ends with None after a final Err for connection-loss / subscription-end events; once None is observed, no further items will be yielded. This split mirrors the .NET reference's two events: CallbackReceived is raised per-record after a successful parse (src/MxNativeClient/MxNativeSession.cs:603-606), while UnparsedCallbackReceived is raised when NmxSubscriptionMessage.ParseProcessDataReceivedBody throws — without tearing down other live subscriptions (src/MxNativeClient/MxNativeSession.cs:590-601).
Consumers wanting strict parity with UnparsedCallbackReceived (raw bytes for unparseable frames) can subscribe to Subscription::raw_callbacks() -> Stream<Item = RawCallback>, or simply inspect the Err variants and keep polling: Error::Protocol(ProtocolError::Decode { .. }) corresponds to the .NET unparsed-callback path and is non-terminal; Error::Connection(_) is terminal and the next next().await returns None.
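A consumer that wants to keep polling past decode errors cannot use `?` on every item, since that exits on the first Err. The sketch below models the stream as a plain iterator and shows the split; `StreamError`, `Summary` and `drain` are hypothetical simplifications of the real `Error` variants and consumer loop.

```rust
// Hypothetical simplification of Error::Protocol(Decode) and Error::Connection.
#[derive(Debug, Clone, PartialEq)]
enum StreamError {
    Decode(String),
    Connection(String),
}

impl StreamError {
    // Connection loss ends the stream; decode failures do not.
    fn is_terminal(&self) -> bool {
        matches!(self, StreamError::Connection(_))
    }
}

#[derive(Debug, Default, PartialEq)]
struct Summary {
    values: Vec<i32>,
    decode_errors: usize,
    ended_by: Option<StreamError>,
}

// Consume items, skipping non-terminal errors and stopping on a terminal one.
fn drain<I>(items: I) -> Summary
where
    I: IntoIterator<Item = Result<i32, StreamError>>,
{
    let mut summary = Summary::default();
    for item in items {
        match item {
            Ok(value) => summary.values.push(value),
            Err(e) if e.is_terminal() => {
                summary.ended_by = Some(e);
                break; // the real stream would yield None next
            }
            Err(_) => summary.decode_errors += 1, // keep polling
        }
    }
    summary
}

fn main() {
    let summary = drain(vec![
        Ok(1),
        Err(StreamError::Decode("bad frame".into())),
        Ok(2),
        Err(StreamError::Connection("lost".into())),
    ]);
    println!("{:?}", summary);
}
```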
Dropping the subscription sends UnAdvise (best-effort, fire-and-forget — see drop semantics below) and removes the correlation from the session's subscription map.
For batch subscriptions:
let mut sub = session.subscribe_many(&["A.X", "A.Y", "A.Z"]).await?;
while let Some(change) = sub.next().await {
let change = change?;
// change.reference identifies which of the three
}
Multi-tag subscriptions multiplex on the same callback channel and demultiplex by correlation ID inside the session task. The wire still issues one Advise per tag — the Rust API does not pretend a single advise covers many tags.
subscribe_many is non-atomic. The implementation issues one AdviseSupervisory per tag in a loop, mirroring MxNativeSession.SubscribeAsync which produces a fresh CorrelationId and calls _service.AdviseSupervisory per tag (src/MxNativeClient/MxNativeSession.cs:250-270). If the Nth advise fails, the first N-1 succeed and remain advised. The error is surfaced through the returned Result; the partial set lives on the returned Subscription, and the consumer chooses how to recover:
- Subscription::drop to unadvise the partial set, or
- retry the failed tag (e.g. via a follow-up session.subscribe(failed_tag).await).
subscribe_many_atomic (an all-or-nothing variant that rolls back on partial failure) is not provided in V1 — the proven .NET reference has no atomic equivalent and the wire offers no transactional advise primitive.
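The non-atomic behaviour can be sketched as a plain loop over an injected advise function. `PartialSubscribe` and the closure-based signature are hypothetical and exist only to make the partial-success state observable; the sketch also assumes the loop stops at the first failing tag.

```rust
// Hypothetical result shape exposing the partially advised set.
#[derive(Debug, PartialEq)]
struct PartialSubscribe {
    advised: Vec<String>,
    failed: Option<(String, String)>, // (tag, error message)
}

// One advise per tag; earlier successes are kept when a later tag fails.
fn subscribe_many<F>(tags: &[&str], mut advise: F) -> PartialSubscribe
where
    F: FnMut(&str) -> Result<(), String>,
{
    let mut advised = Vec::new();
    for &tag in tags {
        match advise(tag) {
            Ok(()) => advised.push(tag.to_string()),
            Err(e) => {
                // No rollback: tags advised so far stay advised.
                return PartialSubscribe { advised, failed: Some((tag.to_string(), e)) };
            }
        }
    }
    PartialSubscribe { advised, failed: None }
}

fn main() {
    let outcome = subscribe_many(&["A.X", "A.Y", "A.Z"], |tag| {
        if tag == "A.Y" { Err("advise rejected".to_string()) } else { Ok(()) }
    });
    println!("{:?}", outcome);
}
```

The caller then decides whether to drop the partial set or retry only the failed tag.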
Buffered subscriptions (NMX only)
subscribe_buffered mirrors the .NET reference's MxNativeSession.RegisterBufferedItemAsync, which takes the dual-string itemDefinition/itemContext split plus an itemHandle: int (src/MxNativeClient/MxNativeSession.cs:272-310). The Rust API takes the same parameters explicitly via a BufferedSubscription request struct — no convenience overload that hides the (definition, context, item_handle) triple is offered, because a consumer that omits any of the three cannot reproduce the captured Frida bodies.
let mut sub = session.subscribe_buffered(BufferedSubscription {
definition: "TestMachine_001.TestHistoryValue",
context: "", // optional, may be empty per RegisterBufferedItemAsync:279
item_handle: 0x1001, // i32 mapped to NmxReferenceRegistrationMessage.ItemHandle
options: BufferedOptions {
sample_interval: Duration::from_millis(100),
max_queue_size: 1000,
},
}).await?;
while let Some(batch) = sub.next().await {
for sample in batch?.samples {
// sample is a DataChange
}
}
subscribe_buffered is gated on the nmx feature and returns Stream<Item = Result<DataChangeBatch, Error>>. The deployed AVEVA provider may emit single-sample batches even with buffering enabled (see risk R2 in 70-risks-and-open-questions.md). The API does not synthesise batches; if the wire returns one sample per record, each batch has samples.len() == 1.
Recovery
Recovery is caller-driven, not automatic. This mirrors the .NET reference: MxNativeSession.RecoverConnection and RecoverConnectionAsync(policy) are explicit entry points the consumer invokes — the session never auto-starts recovery on heartbeat loss (src/MxNativeClient/MxNativeSession.cs:383-440). The Rust API exposes the same shape:
// Caller invokes recovery when they choose, with the policy of their choice.
session.recover_connection(RecoveryPolicy::exponential(
Duration::from_secs(1),
/* max_attempts: */ 5,
Duration::from_secs(60),
)).await?;
Heartbeat-loss surfaces as Error::Connection(...) on subsequent operations (write/read/subscribe). The caller decides whether to call recover_connection based on observed errors and recovery events. There is no implicit recovery thread that resurrects the session in the background.
In-flight calls during recovery fail. While recover_connection is running, in-flight writes/reads/subscriptions against the previous transport are not paused, replayed, or migrated — they observe the existing transport being torn down and fail with Error::Connection(...). The .NET reference's _recoveryActive is similarly just an inbound-callback annotation flag (src/MxNativeClient/MxNativeSession.cs:444,472); concurrent calls against _service are not interlocked. The Rust design does not promise "the future resumes on the new connection" — the caller is responsible for retrying after a successful RecoveryEvent::Completed.
let mut events = session.recovery_events();
while let Some(ev) = events.next().await {
tracing::info!(?ev, "recovery event");
}
pub enum RecoveryEvent {
Started { attempt: u32 },
Failed { attempt: u32, error: Error, will_retry: bool },
Completed { duration: Duration },
}
The event stream mirrors the .NET reference's MxNativeSession.RecoveryAttemptStarted/Failed/Completed events one-for-one (src/MxNativeClient/MxNativeSession.cs:121-123).
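To make the `RecoveryPolicy::exponential(initial, max_attempts, max_delay)` arguments concrete, here is a sketch of one plausible delay schedule. The doubling factor and the reading of the third argument as a cap are assumptions; the design text does not state the growth rule, and `ExponentialPolicy` and `delay_for` are hypothetical names.

```rust
use std::time::Duration;

// Hypothetical model of an exponential recovery policy.
struct ExponentialPolicy {
    initial: Duration,
    max_attempts: u32,
    max_delay: Duration,
}

impl ExponentialPolicy {
    // Delay before the given 1-based attempt, or None once attempts are exhausted.
    // Assumes the delay doubles per attempt and is capped at max_delay.
    fn delay_for(&self, attempt: u32) -> Option<Duration> {
        if attempt == 0 || attempt > self.max_attempts {
            return None;
        }
        let factor = 2u32.checked_pow(attempt - 1).unwrap_or(u32::MAX);
        Some(self.initial.saturating_mul(factor).min(self.max_delay))
    }
}

fn main() {
    let policy = ExponentialPolicy {
        initial: Duration::from_secs(1),
        max_attempts: 5,
        max_delay: Duration::from_secs(60),
    };
    for attempt in 1..=6 {
        println!("attempt {attempt}: {:?}", policy.delay_for(attempt));
    }
}
```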
Cancellation
Three cancellation surfaces, in order of preference for callers:
- Drop the future or handle. Dropping a Subscription signals UnAdvise to the long-lived connection task; dropping a Session signals UnregisterEngine to the same task. Drop never spawns a new Tokio task. Instead, Subscription holds a tokio::sync::oneshot::Sender<UnAdviseRequest> (or equivalent unbounded channel sender), and its Drop impl sends a message that the connection task drains in its event loop. Drop is therefore safe outside a runtime context and during runtime shutdown: it does not call tokio::spawn from Drop.
- tokio_util::sync::CancellationToken. Long operations (subscribe_buffered, recovery, connect) accept an optional CancellationToken via *_with_cancellation variants.
- Timeout. tokio::time::timeout works on every operation because async fns are cancel-correct by construction.
Known runtime-shutdown leak. If the Tokio runtime is shut down before the connection task has drained pending UnAdvise/UnregisterEngine messages, those frames are not delivered to the wire. Production code should avoid this by calling Session::shutdown(timeout).await (see "Connection" above) on the orderly-exit path. The .NET reference has the same shape: Dispose runs synchronously and calls _service.UnAdvise(...) per live subscription before UnregisterEngine (src/MxNativeClient/MxNativeSession.cs:483-495). The Rust async equivalent is Session::shutdown; relying on Drop alone for cleanup is best-effort and documented as such.
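The "Drop sends a message, never spawns" pattern can be sketched with a std channel standing in for the Tokio sender. `Command`, `correlation_id` and `drain` are hypothetical names; `std::sync::mpsc::Sender::send` on an unbounded channel does not block, which is the property the pattern relies on.

```rust
use std::sync::mpsc::{self, Receiver, Sender};

// Hypothetical message drained by the connection task's event loop.
#[derive(Debug, PartialEq)]
enum Command {
    UnAdvise { correlation_id: u32 },
}

struct Subscription {
    correlation_id: u32,
    commands: Sender<Command>,
}

impl Drop for Subscription {
    fn drop(&mut self) {
        // Best-effort: if the connection task is gone the send fails and is ignored.
        let _ = self.commands.send(Command::UnAdvise { correlation_id: self.correlation_id });
    }
}

// What the connection task would do on each loop turn.
fn drain(commands: &Receiver<Command>) -> Vec<Command> {
    commands.try_iter().collect()
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let sub = Subscription { correlation_id: 7, commands: tx };
    drop(sub);
    println!("{:?}", drain(&rx));
}
```

If the receiving side never runs again, the queued message is simply lost, which is the runtime-shutdown leak described above.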
Error model
mxaccess::Error is a thiserror-derived #[non_exhaustive] enum. See 50-error-model.md for the full surface. All operations return Result<T, Error>; no panics in the public surface.
A non-Ok MxStatus on a returned DataChange is data, not an error. A non-Ok status on read/write/subscribe's synchronous result is an Err. This mirrors the .NET reference and is the only sensible split: subscription frames carry status that callers want to inspect ("stale" or "uncertain" is still data); operation results are pass/fail.
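The split can be shown with a small sketch in which the same non-Ok status is data on a subscription frame and an error on an operation result. `Status` is a hypothetical three-value simplification of `MxStatus`, and the `Error::Operation` variant is invented for the example.

```rust
// Hypothetical simplification of MxStatus.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Ok,
    Uncertain,
    Bad,
}

#[derive(Debug, PartialEq)]
struct DataChange {
    value: i32,
    status: Status,
}

#[derive(Debug, PartialEq)]
enum Error {
    Operation(Status),
}

// Subscription frame: the status rides along as data, whatever it is.
fn on_frame(value: i32, status: Status) -> Result<DataChange, Error> {
    Ok(DataChange { value, status })
}

// Operation result: anything other than Ok is an error.
fn on_operation_result(status: Status) -> Result<(), Error> {
    match status {
        Status::Ok => Ok(()),
        other => Err(Error::Operation(other)),
    }
}

fn main() {
    println!("{:?}", on_frame(42, Status::Uncertain));
    println!("{:?}", on_operation_result(Status::Bad));
}
```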
Threading model
- Multi-thread Tokio is the default. Single-thread is supported (no Send-only future escapes the local thread) but not the recommended deployment.
- All public types are Send + Sync.
- The codec is Send + Sync trivially (immutable after parse, owns its bytes).
- The session uses tokio::sync::Mutex for the per-connection RPC channel state and tokio::sync::watch for recovery state. No parking_lot::Mutex: sync mutexes inside async paths cause hidden blocking.
Observability
tracing spans on every public operation: tracing::instrument on register, write, subscribe, read, recover. Span fields: reference, correlation_id, transport (nmx|asb), engine_ids. Span events for state transitions (subscription added, callback received, recovery started, recovery completed).
Recommended subscriber filter:
mxaccess::session=info,mxaccess::transport=debug
Optional metrics feature exposes:
- counters: mxaccess_writes_total, mxaccess_subscribes_total, mxaccess_callbacks_total, mxaccess_recoveries_total
- histograms: mxaccess_operation_latency_seconds{op="write"|"read"|"subscribe"}
Trait Transport
Transport uses native async fn in trait (AFIT, stable in Rust 1.75+) and is generic-only. Consumers parameterise sites that take a transport with impl Transport or a generic <T: Transport> bound. The trait is not dyn-compatible — Box<dyn Transport> is not supported in V1 — and that limitation is intentional: the design already uses Session::connect<T: Transport>(...)-style generic entry points, so giving up dyn Transport costs nothing the design currently uses, while keeping zero per-call heap allocation in the hot path. (#[async_trait] was the alternative; it allows dyn Transport but boxes a Pin<Box<dyn Future>> per call — accepted as a known cost only if a future revision needs runtime polymorphism.)
pub trait Transport: Send + Sync {
fn capabilities(&self) -> TransportCapabilities;
async fn register(&self, options: &ConnectionOptions) -> Result<RegisteredEngine, Error>;
async fn unregister(&self, engine: &RegisteredEngine) -> Result<(), Error>;
async fn write(
&self,
handle: &MxReferenceHandle,
value: &MxValue,
opts: WriteOptions,
) -> Result<WriteOutcome, Error>;
async fn advise(
&self,
handle: &MxReferenceHandle,
opts: AdviseOptions,
) -> Result<SubscriptionHandle, Error>;
async fn unadvise(&self, sub: SubscriptionHandle) -> Result<(), Error>;
fn callbacks(&self) -> CallbackStream;
}
pub struct TransportCapabilities {
pub timestamped_writes: bool,
pub secured_writes: bool,
pub buffered_subscriptions: bool,
pub supervisory_advise: bool,
pub operation_complete_events: bool,
}
Two implementations: NmxTransport (capabilities mostly true) and AsbTransport (capabilities mostly false; see 70-risks-and-open-questions.md).
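The generic-only use of an async trait can be sketched without a runtime. The trait below is a two-method toy, not the real `Transport`; `FakeNmx`, `FakeAsb`, `write_via` and `poll_ready` are hypothetical, and `poll_ready` only works because these toy futures complete on the first poll. It needs Rust 1.75 or later for `async fn` in traits.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Toy trait using native async fn in trait; not dyn-compatible.
trait Transport {
    fn kind(&self) -> &'static str;
    async fn write(&self, value: i32) -> Result<(), String>;
}

struct FakeNmx;
struct FakeAsb;

impl Transport for FakeNmx {
    fn kind(&self) -> &'static str { "nmx" }
    async fn write(&self, _value: i32) -> Result<(), String> { Ok(()) }
}

impl Transport for FakeAsb {
    fn kind(&self) -> &'static str { "asb" }
    async fn write(&self, _value: i32) -> Result<(), String> {
        Err("write not supported in this sketch".to_string())
    }
}

// Generic call site: monomorphised per transport, no boxed future.
async fn write_via<T: Transport>(transport: &T, value: i32) -> Result<&'static str, String> {
    transport.write(value).await?;
    Ok(transport.kind())
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls a future once; only valid for futures that never suspend.
fn poll_ready<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(out) => out,
        Poll::Pending => panic!("sketch futures are expected to complete immediately"),
    }
}

fn main() {
    println!("{:?}", poll_ready(write_via(&FakeNmx, 1)));
    println!("{:?}", poll_ready(write_via(&FakeAsb, 1)));
}
```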
Calling an NMX-only API on an AsbTransport returns Error::Unsupported { operation: Cow<'static, str>, transport: TransportKind }. The Cow is used so the variant accepts both interned &'static str literals (the common case) and runtime-formatted operation names without allocation when not required; TransportKind is the corresponding enum TransportKind { Nmx, Asb } (matching the transport span field listed under "Observability"). The Session may also pre-flight via transport.capabilities() to give a better error message before issuing the call.
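A minimal sketch of the capability pre-flight and the two ways of filling the `Cow` follows. The `Error` enum and `TransportCapabilities` here are cut down to the fields the example needs, and `preflight_buffered` and `unsupported_dynamic` are hypothetical helpers, not part of the documented API.

```rust
use std::borrow::Cow;

#[derive(Debug, Clone, Copy, PartialEq)]
enum TransportKind {
    Nmx,
    Asb,
}

// Cut-down capability set; the real struct has more flags.
#[derive(Debug, Clone, Copy)]
struct TransportCapabilities {
    buffered_subscriptions: bool,
}

#[derive(Debug, PartialEq)]
enum Error {
    Unsupported { operation: Cow<'static, str>, transport: TransportKind },
}

// Pre-flight check: the common case borrows a static literal, no allocation.
fn preflight_buffered(kind: TransportKind, caps: TransportCapabilities) -> Result<(), Error> {
    if caps.buffered_subscriptions {
        Ok(())
    } else {
        Err(Error::Unsupported {
            operation: Cow::Borrowed("subscribe_buffered"),
            transport: kind,
        })
    }
}

// Runtime-formatted operation name: allocates only when it is actually needed.
fn unsupported_dynamic(op: &str, tag: &str, kind: TransportKind) -> Error {
    Error::Unsupported { operation: Cow::Owned(format!("{op}({tag})")), transport: kind }
}

fn main() {
    let asb = TransportCapabilities { buffered_subscriptions: false };
    println!("{:?}", preflight_buffered(TransportKind::Asb, asb));
    println!("{:?}", unsupported_dynamic("subscribe_buffered", "A.X", TransportKind::Asb));
}
```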
Public surface (re-exports)
pub use mxaccess_codec::{
MxReferenceHandle, MxStatus, MxStatusCategory, MxStatusSource,
MxValue, MxValueKind, MxDataType,
};
pub struct Session;
pub struct Subscription;
pub struct DataChange {
pub reference: Arc<str>,
pub value: MxValue,
/// Legacy 16-bit OPC quality (e.g. `0xC0` = 192 = "Good"). Distinct from
/// `status: MxStatus` — both are surfaced because real MxAccess
/// (`OnDataChange(hServer, hItem, MxDataType, value, quality, timestamp,
/// statuses)`) carries them as separate fields. Verified against
/// `wwtools/mxaccesscli/docs/api-notes.md:104-105` ("quality on
/// OnDataChange is the legacy 16-bit OPC quality value … the richer state
/// lives in the statuses[] array") and
/// `wwtools/mxaccesscli/src/MxAccess.Cli/Mx/MxUpdate.cs:13-22,39-65`.
/// Earlier drafts of this design dropped `quality` as redundant with
/// `status`; that was a parity break and has been restored.
pub quality: u16,
pub timestamp: SystemTime,
pub status: MxStatus,
}
pub struct DataChangeBatch {
pub reference: Arc<str>,
pub samples: Vec<DataChange>,
}
// Note on `quality`: `DataChange` carries a 16-bit OPC quality alongside
// `status: MxStatus`. They are distinct: `quality` is the legacy wire field
// (e.g. `0xC0` = "Good"), preserved for parity with real MxAccess
// (`OnDataChange` exposes both). The canonical projection from a wire record
// to a typed status is `Record.ToDataChangeStatus()` in the .NET reference
// (src/MxNativeClient/MxNativeSession.cs:70), which produces an `MxStatus`.
// Consumers that need the historical "quality" view (Good/Uncertain/Bad on
// bits 7..6) read it from `quality`; the richer typed state lives in
// `status` (`status.detail` and `status.category`). Both are exposed for
// parity, so callers should not treat one as a substitute for the other.
pub struct ConnectionOptions;
pub struct WriteOptions;
pub struct AdviseOptions;
pub struct BufferedOptions;
pub struct RecoveryPolicy;
pub enum RecoveryEvent { Started { .. }, Failed { .. }, Completed { .. } }
pub struct Credentials;
pub struct SecurityContext;
pub enum Error; // see 50-error-model.md
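For the `quality` field on `DataChange`, the Good/Uncertain/Bad view on bits 7..6 can be decoded as sketched below. The `0xC0` = Good value comes from the doc comment above; the Uncertain (`01`) and Bad (`00`) encodings follow the usual OPC DA convention and are stated here as an assumption about this wire field. `QualityClass` and `classify_quality` are hypothetical names.

```rust
#[derive(Debug, PartialEq)]
enum QualityClass {
    Good,
    Uncertain,
    Bad,
    Reserved,
}

// Classify a legacy 16-bit OPC quality word by its major bits (7..6).
fn classify_quality(quality: u16) -> QualityClass {
    match (quality >> 6) & 0b11 {
        0b11 => QualityClass::Good,
        0b01 => QualityClass::Uncertain,
        0b00 => QualityClass::Bad,
        _ => QualityClass::Reserved, // 0b10 is not assigned in the assumed convention
    }
}

fn main() {
    for q in [0x00C0u16, 0x0040, 0x0000] {
        println!("{q:#06x} -> {:?}", classify_quality(q));
    }
}
```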
mxaccess-compat (optional)
LMXProxyServer-shaped methods on top of Session. Each method maps one-to-one to a Session::* operation:
let server = mxaccess_compat::Server::new(session);
let server_handle = server.register("MyClient");
let item_handle = server.add_item(server_handle, "TestObject.TestInt").await?;
server.advise(server_handle, item_handle).await?;
server.write(server_handle, item_handle, MxValue::Int32(123), user_id).await?;
Useful for porting code that depends on the COM API shape; not the recommended consumer surface. Not COM-visible by itself; a separate mxaccess-compat-com crate (deferred to post-V1) will register windows-rs-generated COM classes that wrap this.
Examples
End-to-end consumer-grade examples in rust/examples/:
- connect-write-read.rs: open session, write, read back
- subscribe.rs: long-running subscription
- subscribe-buffered.rs: buffered subscription (NMX feature)
- asb-subscribe.rs: ASB subscription
- recovery.rs: recovery policy + recovery events
- multi-tag.rs: subscribe_many on a 100-tag set
- secured-write.rs: write_secured (no timestamp) and write_secured_at (timestamped), each taking (current_user_id, verifier_user_id); demonstrates both single-user (current == verifier) and two-person verification paths
Code sample (full)
use std::time::Duration;
use futures::StreamExt;
use mxaccess::{Session, ConnectionOptions, Credentials, MxValue, RecoveryPolicy};
#[tokio::main(flavor = "multi_thread")]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let session = Session::connect(
ConnectionOptions::nmx("localhost")
.galaxy_id(1)
.platform_id(1)
.engine_id(420)
.credentials(Credentials::current_user())
.galaxy_db(std::env::var("MX_GALAXY_DB")?)
.recovery(RecoveryPolicy::exponential(
Duration::from_secs(1), 5, Duration::from_secs(60),
)),
).await?;
session.write("TestChildObject.TestInt", MxValue::Int32(123)).await?;
let mut sub = session.subscribe("TestChildObject.TestInt").await?;
while let Some(change) = sub.next().await {
let change = change?;
tracing::info!(value = ?change.value, ts = ?change.timestamp, "data change");
}
Ok(())
}
What the async layer does not do
- It does not pretend to be sync. There is no block_on shortcut in the public API.
- It does not support multiple async runtimes. Tokio only.
- It does not transmit raw bytes. All operations go through the codec.
- It does not retry by default. Recovery is opt-in via ConnectionOptions::recovery(...) at session construction. There is no runtime mutator for RecoveryPolicy: Session is Clone + Arc<SessionInner>-backed, so a &mut self setter on a clone would not propagate to other clones; the policy is fixed once and shared by every clone. Consumers that need a different policy build a new Session.
- It does not own a thread pool. It uses Tokio's runtime.
- It does not synthesise events the wire does not produce. WriteCompleted only fires when the proven 5-byte completion frame is observed; otherwise RawStatus is exposed verbatim through Session::operation_status_events(). See 70-risks-and-open-questions.md R3/R4.