mxaccess/design/20-async-layer.md
Joseph Doherty fe2a6db786
2026-05-05 06:21:00 -04:00


Async layer (Tokio)

The async layer is the public face of the library. Most consumers depend on mxaccess (the top-level crate) and never see the raw crates. The crate is async-native, Tokio-based, and exposes idiomatic Rust: typed errors, Send + Sync handles, Streams for subscriptions, drop-cancellable subscriptions, tracing instrumentation.

Public crate: mxaccess

Re-exports the core types from mxaccess-codec, hides the raw transports, adds the async session.

Connection

use mxaccess::{Session, ConnectionOptions, Credentials, MxValue};
use std::time::SystemTime;

let session = Session::connect(
    ConnectionOptions::nmx("localhost")
        .galaxy_id(1)
        .platform_id(1)
        .engine_id(MX_LOCAL_ENGINE)
        .credentials(Credentials::current_user())
        .galaxy_db("Server=localhost;Database=Galaxy;Integrated Security=True;TrustServerCertificate=True"),
).await?;

ConnectionOptions::nmx(...) selects NmxTransport; ConnectionOptions::asb(...) selects AsbTransport. Both produce a Session.

Session is Clone + Send + Sync. Internally it wraps Arc<SessionInner> so cloned handles share the same underlying connection.

Orderly shutdown — Session::shutdown(timeout: Duration) -> impl Future<Output = Result<(), Error>>. Sends UnAdvise for every live subscription, then UnregisterEngine, and awaits the connection task's confirmation that those frames have flushed — or returns Err(Error::Timeout(_)) if the timeout elapses first. This is the recommended exit path for production code and is the async equivalent of the .NET reference's synchronous Dispose (src/MxNativeClient/MxNativeSession.cs:476-514).

Drop of the last Session clone is a best-effort fallback: it signals UnregisterEngine to the connection task via the same in-process channel that subscription drops use (no tokio::spawn, no block_on). If the runtime is shut down before the connection task drains, the unregister is lost — see the runtime-shutdown leak note under "Cancellation". Callers that care about deterministic engine deregistration must call Session::shutdown rather than relying on drop.
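The clone-sharing and last-drop behaviour described above can be sketched with the standard library alone. This is a minimal illustration, not the crate's implementation: `std::sync::mpsc` stands in for the in-process channel to the connection task, and `Command`, `Session::new` and `handle_count` are hypothetical names introduced only for this example.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;

/// Hypothetical message drained by the connection task.
#[derive(Debug, PartialEq)]
pub enum Command {
    UnregisterEngine,
}

/// Stand-in for `SessionInner`; the real struct would own far more state.
struct SessionInner {
    commands: Sender<Command>,
}

impl Drop for SessionInner {
    fn drop(&mut self) {
        // Runs only when the LAST `Session` clone goes away, because the
        // inner value sits behind an `Arc`. A failed send means the receiver
        // is already gone; that is the best-effort case, so it is ignored.
        let _ = self.commands.send(Command::UnregisterEngine);
    }
}

/// Cheap-to-clone handle; every clone shares the same `SessionInner`.
#[derive(Clone)]
pub struct Session {
    inner: Arc<SessionInner>,
}

impl Session {
    /// Hypothetical constructor returning the handle plus the receiving end
    /// that a connection task would own.
    pub fn new() -> (Session, Receiver<Command>) {
        let (tx, rx) = channel();
        (Session { inner: Arc::new(SessionInner { commands: tx }) }, rx)
    }

    /// Number of live handles sharing this connection.
    pub fn handle_count(&self) -> usize {
        Arc::strong_count(&self.inner)
    }
}
```

Dropping one of two clones sends nothing; only the final drop queues `UnregisterEngine`, and nothing in `Drop` spawns a task or blocks.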

Operations

// Fire-and-forget: returns when the LMX `Write` RPC return is acked.
// No `WriteCompleted` callback is awaited.
session.write("TestChildObject.TestInt", MxValue::Int32(123)).await?;

// Awaits the 5-byte `OperationStatus` completion frame. The `client_token`
// correlates the wire callback to this call (see writeIndex/clientToken on
// `MxNativeSession.WriteAsync`, src/MxNativeClient/MxNativeSession.cs:165-185).
session.write_with_completion(
    "TestChildObject.TestInt",
    MxValue::Int32(123),
    /* client_token: */ 0x1001u32,
).await?;

session.write_with_timestamp(
    "TestChildObject.TestInt",
    MxValue::Int32(123),
    SystemTime::now(),
).await?;

// Verified Write — the LMX `WriteSecured` always takes TWO user ids:
// `(currentUserId, verifierUserId, value)`. "Single-user secured write" is
// callers passing the same id twice; it is NOT a separate API surface.
// `WriteSecured2` adds a timestamp; it does NOT add a second token. The
// `0x80004021` failure observed in `MxNativeSession.WriteSecuredAsync` is a
// defect of the .NET native reimplementation, not a real LMX constraint
// (verified against wwtools/mxaccesscli/docs/api-notes.md:60-72,87-95 and
// wwtools/mxaccesscli/src/MxAccess.Cli/Commands/WriteCommand.cs:44-101,151-155,196-199;
// the LMX proxy CLI exposes `WriteSecured(currentUserId, verifierUserId, value)`
// and treats single-user secured writes as `currentUserId == verifierUserId`).
session.write_secured(
    "TestChildObject.TestInt",
    MxValue::Int32(123),
    SecurityContext { current_user_id, verifier_user_id },
).await?;

// Timestamped Verified Write — adds a `SystemTime`. Same two-id token shape;
// matches `WriteSecured2(currentUserId, verifierUserId, value, timestamp)`.
session.write_secured_at(
    "TestChildObject.TestInt",
    MxValue::Int32(123),
    SystemTime::now(),
    SecurityContext { current_user_id, verifier_user_id },
).await?;

// `read` is implemented as `subscribe + first-result + drop`, mirroring
// `MxNativeSession.ReadAsync` (src/MxNativeClient/MxNativeSession.cs:312-359),
// which requires a positive timeout and unadvises on completion or timeout.
let DataChange { value, status, timestamp, .. } =
    session.read("TestChildObject.TestInt", Duration::from_secs(5)).await?;

All operations take a &str reference name (e.g. "TestObject.Attribute") and resolve it to an MxReferenceHandle internally via the configured Resolver. The default resolver is mxaccess_galaxy::SqlResolver (from the mxaccess-galaxy crate); an in-memory resolver is provided for tests (InMemoryResolver::insert("Tag", metadata)).

Session::write returns Ok(()) once the LMX Write RPC has been acknowledged at the transport level — it does not await a wire WriteCompleted frame. Callers that need write-completion semantics must use Session::write_with_completion(reference, value, client_token), which threads client_token through the MxNativeSession.WriteAsync clientToken parameter (src/MxNativeClient/MxNativeSession.cs:165-185) and returns when the matching 5-byte OperationStatus callback frame is decoded. See 70-risks-and-open-questions.md R3/R4 for the cases where the proven stack does not emit a completion frame.

Session::read takes a Duration timeout (matching the .NET reference's mandatory TimeSpan timeout argument and ArgumentOutOfRangeException for non-positive values, src/MxNativeClient/MxNativeSession.cs:312-321). Implementation is subscribe + first-result + drop; the drop guard guarantees UnAdvise runs on the success, error, and timeout paths so no advise is leaked, mirroring the finally/Unsubscribe block at src/MxNativeClient/MxNativeSession.cs:351-358.
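The drop-guard pattern behind `subscribe + first-result + drop` can be illustrated synchronously. The sketch below is an assumption-laden stand-in: `AdviseGuard` and `read_first` are hypothetical names, the `outcome` argument simulates what the wire delivered (`None` meaning the timeout elapsed), and a shared log replaces the real UnAdvise frame.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Shared log of wire-level actions, used here instead of a real transport.
type Log = Rc<RefCell<Vec<&'static str>>>;

/// Hypothetical guard: records an UnAdvise whenever it goes out of scope.
struct AdviseGuard {
    log: Log,
}

impl Drop for AdviseGuard {
    fn drop(&mut self) {
        self.log.borrow_mut().push("UnAdvise");
    }
}

/// Simulated read. `outcome` is the first result from the subscription, or
/// `None` if the timeout fired before anything arrived.
pub fn read_first(log: Log, outcome: Option<Result<i32, String>>) -> Result<i32, String> {
    log.borrow_mut().push("Advise");
    // The guard lives until this function returns, whichever branch is taken.
    let _guard = AdviseGuard { log: log.clone() };
    match outcome {
        Some(result) => result,            // success or error path
        None => Err("timeout".to_string()), // timeout path
    }
}
```

Because the guard is a local, there is no code path out of `read_first` that skips the UnAdvise, which is the property the `finally` block provides in the .NET reference.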

Subscriptions

use futures::StreamExt;

let mut subscription = session.subscribe("TestChildObject.TestInt").await?;

while let Some(change) = subscription.next().await {
    let change = change?;
    println!("{} = {:?} @ {:?} (status={:?})",
        change.reference, change.value, change.timestamp, change.status);
}

// Drop the subscription to unadvise.
drop(subscription);

Subscription implements Stream<Item = Result<DataChange, Error>>.

Err semantics — non-terminal for parse errors, terminal after connection loss. The stream yields Err items for parse-level failures (the consumer can keep polling — the next inbound frame will be delivered). The stream ends with None after a final Err for connection-loss / subscription-end events; once None is observed, no further items will be yielded. This split mirrors the .NET reference's two events: CallbackReceived is raised per-record after a successful parse (src/MxNativeClient/MxNativeSession.cs:603-606), while UnparsedCallbackReceived is raised when NmxSubscriptionMessage.ParseProcessDataReceivedBody throws — without tearing down other live subscriptions (src/MxNativeClient/MxNativeSession.cs:590-601).

Consumers wanting strict parity with UnparsedCallbackReceived (raw bytes for unparseable frames) can subscribe to Subscription::raw_callbacks() -> Stream<Item = RawCallback>, or simply inspect the Err variants and keep polling: Error::Protocol(ProtocolError::Decode { .. }) corresponds to the .NET unparsed-callback path and is non-terminal; Error::Connection(_) is terminal and the next next().await returns None.
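A consumer loop that honours the terminal / non-terminal split might look like the following. It is a sketch only: a plain iterator stands in for the `Stream`, the error enums are simplified copies of the variants named above, and `is_terminal` and `drain` are hypothetical helpers, not part of the documented surface.

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum ProtocolError {
    Decode { offset: usize },
}

#[derive(Debug, Clone, PartialEq)]
pub enum Error {
    Protocol(ProtocolError),
    Connection(String),
}

impl Error {
    /// Assumed classification: only connection loss ends the stream.
    pub fn is_terminal(&self) -> bool {
        matches!(self, Error::Connection(_))
    }
}

/// Collects values, counts skipped decode failures, and stops at the first
/// terminal error. Returns (values, skipped, terminal_error).
pub fn drain<I>(items: I) -> (Vec<i32>, usize, Option<Error>)
where
    I: IntoIterator<Item = Result<i32, Error>>,
{
    let mut values = Vec::new();
    let mut skipped = 0;
    for item in items {
        match item {
            Ok(value) => values.push(value),
            // Terminal: stop polling, nothing further will be delivered.
            Err(error) if error.is_terminal() => return (values, skipped, Some(error)),
            // Non-terminal: note it and keep polling.
            Err(_) => skipped += 1,
        }
    }
    (values, skipped, None)
}
```

Using `?` on every item, as the short examples do, treats a decode failure as fatal; a loop shaped like `drain` keeps the subscription alive across unparseable frames.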

Dropping the subscription sends UnAdvise (best-effort, fire-and-forget — see drop semantics below) and removes the correlation from the session's subscription map.

For batch subscriptions:

let mut sub = session.subscribe_many(&["A.X", "A.Y", "A.Z"]).await?;
while let Some(change) = sub.next().await {
    let change = change?;
    // change.reference identifies which of the three
}

Multi-tag subscriptions multiplex on the same callback channel and demultiplex by correlation ID inside the session task. The wire still issues one Advise per tag — the Rust API does not pretend a single advise covers many tags.
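Demultiplexing by correlation ID amounts to a lookup table owned by the session task. The sketch below assumes a `u32` correlation ID and a hypothetical `Demux` type; the real correlation type and map layout are not specified in this document.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical routing table: one entry per advised tag.
pub struct Demux {
    routes: HashMap<u32, Arc<str>>,
}

impl Demux {
    pub fn new() -> Self {
        Demux { routes: HashMap::new() }
    }

    /// Called once per tag, after its individual Advise succeeds.
    pub fn advise(&mut self, correlation_id: u32, reference: &str) {
        self.routes.insert(correlation_id, Arc::from(reference));
    }

    /// Called when the tag is unadvised; later callbacks no longer route.
    pub fn unadvise(&mut self, correlation_id: u32) {
        self.routes.remove(&correlation_id);
    }

    /// Maps an inbound callback's correlation ID back to its reference name.
    pub fn route(&self, correlation_id: u32) -> Option<Arc<str>> {
        self.routes.get(&correlation_id).cloned()
    }
}
```

Storing the name as `Arc<str>` lets every `DataChange` for the same tag share one allocation, which matches the `reference: Arc<str>` field on `DataChange`.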

subscribe_many is non-atomic. The implementation issues one AdviseSupervisory per tag in a loop, mirroring MxNativeSession.SubscribeAsync which produces a fresh CorrelationId and calls _service.AdviseSupervisory per tag (src/MxNativeClient/MxNativeSession.cs:250-270). If the Nth advise fails, the first N-1 succeed and remain advised. The error is surfaced through the returned Result; the partial set lives on the returned Subscription, and the consumer chooses how to recover:

  • Subscription::drop to unadvise the partial set, or
  • retry the failed tag (e.g. via a follow-up session.subscribe(failed_tag).await).

subscribe_many_atomic (an all-or-nothing variant that rolls back on partial failure) is not provided in V1 — the proven .NET reference has no atomic equivalent and the wire offers no transactional advise primitive.
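The non-atomic behaviour can be made concrete with a small loop. This sketch assumes the loop stops at the first failing tag and reports what was advised before it; `AdviseOutcome` and `advise_each` are hypothetical names, and the closure stands in for the per-tag AdviseSupervisory call.

```rust
/// Result of a per-tag advise loop: what succeeded, and what failed (if any).
#[derive(Debug, PartialEq)]
pub struct AdviseOutcome {
    pub advised: Vec<String>,
    /// (tag, reason) for the first failure, if one occurred.
    pub failed: Option<(String, String)>,
}

/// Issues one advise per tag, in order. There is no rollback: tags advised
/// before the failure stay advised.
pub fn advise_each<F>(tags: &[&str], mut advise_one: F) -> AdviseOutcome
where
    F: FnMut(&str) -> Result<(), String>,
{
    let mut advised = Vec::new();
    for &tag in tags {
        match advise_one(tag) {
            Ok(()) => advised.push(tag.to_string()),
            Err(reason) => {
                return AdviseOutcome {
                    advised,
                    failed: Some((tag.to_string(), reason)),
                };
            }
        }
    }
    AdviseOutcome { advised, failed: None }
}
```

With three tags and a failure on the second, the outcome holds one advised tag and one failed tag; the third is never attempted under this assumption.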

Buffered subscriptions (NMX only)

subscribe_buffered mirrors the .NET reference's MxNativeSession.RegisterBufferedItemAsync, which takes the dual-string itemDefinition/itemContext split plus an itemHandle: int (src/MxNativeClient/MxNativeSession.cs:272-310). The Rust API takes the same parameters explicitly via a BufferedSubscription request struct — no convenience overload that hides the (definition, context, item_handle) triple is offered, because a consumer that omits any of the three cannot reproduce the captured Frida bodies.

let mut sub = session.subscribe_buffered(BufferedSubscription {
    definition:   "TestMachine_001.TestHistoryValue",
    context:      "",                  // optional, may be empty per RegisterBufferedItemAsync:279
    item_handle:  0x1001,              // i32 mapped to NmxReferenceRegistrationMessage.ItemHandle
    options: BufferedOptions {
        sample_interval: Duration::from_millis(100),
        max_queue_size:  1000,
    },
}).await?;

while let Some(batch) = sub.next().await {
    for sample in batch?.samples {
        // sample is a DataChange
    }
}

subscribe_buffered is gated on the nmx feature and returns Stream<Item = Result<DataChangeBatch, Error>>. The deployed AVEVA provider may emit single-sample batches even with buffering enabled (see risk R2 in 70-risks-and-open-questions.md). The API does not synthesise batches; if the wire returns one sample per record, each batch has samples.len() == 1.

Recovery

Recovery is caller-driven, not automatic. This mirrors the .NET reference: MxNativeSession.RecoverConnection and RecoverConnectionAsync(policy) are explicit entry points the consumer invokes — the session never auto-starts recovery on heartbeat loss (src/MxNativeClient/MxNativeSession.cs:383-440). The Rust API exposes the same shape:

// Caller invokes recovery when they choose, with the policy of their choice.
session.recover_connection(RecoveryPolicy::exponential(
    Duration::from_secs(1),
    /* max_attempts: */ 5,
    Duration::from_secs(60),
)).await?;

Heartbeat-loss surfaces as Error::Connection(...) on subsequent operations (write/read/subscribe). The caller decides whether to call recover_connection based on observed errors and recovery events. There is no implicit recovery thread that resurrects the session in the background.
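The three arguments to `RecoveryPolicy::exponential` are not spelled out here beyond `max_attempts`. The sketch below assumes they mean (initial delay, maximum attempts, delay cap) and that the delay doubles per attempt; `delay_before` is a hypothetical helper, and the real policy may compute its schedule differently.

```rust
use std::time::Duration;

/// Simplified stand-in for the crate's `RecoveryPolicy`.
pub struct RecoveryPolicy {
    initial: Duration,
    max_attempts: u32,
    max_delay: Duration,
}

impl RecoveryPolicy {
    /// Assumed argument meaning: (initial delay, max attempts, delay cap).
    pub fn exponential(initial: Duration, max_attempts: u32, max_delay: Duration) -> Self {
        RecoveryPolicy { initial, max_attempts, max_delay }
    }

    /// Delay to wait before the given 1-based attempt, doubling each time and
    /// capped at `max_delay`. Returns `None` once attempts are exhausted.
    pub fn delay_before(&self, attempt: u32) -> Option<Duration> {
        if attempt == 0 || attempt > self.max_attempts {
            return None;
        }
        let factor = 2u32.saturating_pow(attempt - 1);
        Some(self.initial.saturating_mul(factor).min(self.max_delay))
    }
}
```

Under these assumptions the policy from the example above waits 1 s, 2 s, 4 s, 8 s and 16 s across its five attempts, so the 60 s cap only matters for policies with more attempts.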

In-flight calls during recovery fail. While recover_connection is running, in-flight writes/reads/subscriptions against the previous transport are not paused, replayed, or migrated — they observe the existing transport being torn down and fail with Error::Connection(...). The .NET reference's _recoveryActive is similarly just an inbound-callback annotation flag (src/MxNativeClient/MxNativeSession.cs:444,472); concurrent calls against _service are not interlocked. The Rust design does not promise "the future resumes on the new connection" — the caller is responsible for retrying after a successful RecoveryEvent::Completed.

let mut events = session.recovery_events();
while let Some(ev) = events.next().await {
    tracing::info!(?ev, "recovery event");
}

pub enum RecoveryEvent {
    Started   { attempt: u32 },
    Failed    { attempt: u32, error: Error, will_retry: bool },
    Completed { duration: Duration },
}

The event stream mirrors the .NET reference's MxNativeSession.RecoveryAttemptStarted/Failed/Completed events one-for-one (src/MxNativeClient/MxNativeSession.cs:121-123).
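Since in-flight calls are not replayed, a caller typically maps each recovery event to a decision. The following sketch shows one way to do that; `NextStep` and `next_step` are hypothetical, and the enum is a simplified copy of `RecoveryEvent` with a `String` in place of the crate's `Error`.

```rust
use std::time::Duration;

/// Simplified copy of `RecoveryEvent` (error carried as a `String` here).
#[derive(Debug, Clone, PartialEq)]
pub enum RecoveryEvent {
    Started { attempt: u32 },
    Failed { attempt: u32, error: String, will_retry: bool },
    Completed { duration: Duration },
}

/// What the caller does in response to an event.
#[derive(Debug, PartialEq)]
pub enum NextStep {
    /// Recovery still in progress: hold off on new operations.
    Wait,
    /// Recovery succeeded: re-issue the operations that failed meanwhile.
    RetryOperations,
    /// Recovery exhausted its attempts: surface the failure upstream.
    GiveUp,
}

pub fn next_step(event: &RecoveryEvent) -> NextStep {
    match event {
        RecoveryEvent::Started { .. } => NextStep::Wait,
        RecoveryEvent::Failed { will_retry: true, .. } => NextStep::Wait,
        RecoveryEvent::Failed { will_retry: false, .. } => NextStep::GiveUp,
        RecoveryEvent::Completed { .. } => NextStep::RetryOperations,
    }
}
```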

Cancellation

Three cancellation surfaces, in order of preference for callers:

  1. Drop the future or handle. Dropping a Subscription signals UnAdvise to the long-lived connection task; dropping a Session signals UnregisterEngine to the same task. Drop never spawns a new Tokio task — instead, Subscription holds a tokio::sync::oneshot::Sender<UnAdviseRequest> (or equivalent unbounded channel sender), and its Drop impl sends a message that the connection task drains in its event loop. Drop is therefore safe outside a runtime context and during runtime shutdown — it does not call tokio::spawn from Drop.
  2. tokio_util::sync::CancellationToken. Long operations (subscribe_buffered, recovery, connect) accept an optional CancellationToken via *_with_cancellation variants.
  3. Timeout. tokio::time::timeout works on every operation: the public async fns are written to be cancel-safe, so dropping a future at an await point leaves the session in a consistent state.

Known runtime-shutdown leak. If the Tokio runtime is shut down before the connection task has drained pending UnAdvise/UnregisterEngine messages, those frames are not delivered to the wire. Production code should avoid this by calling Session::shutdown(timeout).await (see "Orderly shutdown" under "Connection") on the orderly-exit path. The .NET reference has the same shape: Dispose runs synchronously and calls _service.UnAdvise(...) per live subscription before UnregisterEngine (src/MxNativeClient/MxNativeSession.cs:483-495). The Rust async equivalent is Session::shutdown; relying on Drop alone for cleanup is best-effort and documented as such.
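The drop-to-channel mechanism and the leak it implies can be shown without a runtime. In this sketch `std::sync::mpsc` stands in for the Tokio channel, and `subscribe` and `drain_pending` are hypothetical helpers; dropping the receiver early plays the role of the runtime shutting down before the connection task drains.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

#[derive(Debug, PartialEq)]
pub struct UnAdviseRequest {
    pub correlation_id: u32,
}

pub struct Subscription {
    correlation_id: u32,
    to_connection_task: Sender<UnAdviseRequest>,
}

impl Drop for Subscription {
    fn drop(&mut self) {
        // No task is spawned and nothing blocks: the request is only queued.
        // If the receiver is gone the send fails and the UnAdvise is lost,
        // which is the best-effort behaviour described above.
        let _ = self.to_connection_task.send(UnAdviseRequest {
            correlation_id: self.correlation_id,
        });
    }
}

/// Hypothetical constructor; the real one would also issue the Advise.
pub fn subscribe(correlation_id: u32, tx: &Sender<UnAdviseRequest>) -> Subscription {
    Subscription { correlation_id, to_connection_task: tx.clone() }
}

/// What the connection task's event loop would do with queued requests.
pub fn drain_pending(rx: &Receiver<UnAdviseRequest>) -> Vec<u32> {
    rx.try_iter().map(|request| request.correlation_id).collect()
}
```

An explicit shutdown call exists precisely because the last case, a drop after the receiver has gone, completes silently without anything reaching the wire.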

Error model

mxaccess::Error is a thiserror-derived #[non_exhaustive] enum. See 50-error-model.md for the full surface. All operations return Result<T, Error>; no panics in the public surface.

A non-Ok MxStatus on a returned DataChange is data, not an error. A non-Ok status on read/write/subscribe's synchronous result is an Err. This mirrors the .NET reference and is the only sensible split: subscription frames carry status that callers want to inspect ("stale" or "uncertain" is still data); operation results are pass/fail.
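The split between status-as-data and status-as-error can be sketched as two functions. The types are deliberately simplified: `StatusCategory` is a hypothetical three-value stand-in for `MxStatus`, and `operation_result` and `subscription_item` are illustrative names only.

```rust
/// Hypothetical, simplified stand-in for `MxStatus`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum StatusCategory {
    Ok,
    Uncertain,
    Bad,
}

#[derive(Debug, PartialEq)]
pub struct DataChange {
    pub value: i32,
    pub status: StatusCategory,
}

#[derive(Debug, PartialEq)]
pub enum Error {
    Status(StatusCategory),
}

/// Operation results are pass/fail: any non-Ok status becomes an `Err`.
pub fn operation_result(status: StatusCategory) -> Result<(), Error> {
    match status {
        StatusCategory::Ok => Ok(()),
        other => Err(Error::Status(other)),
    }
}

/// Subscription frames keep their status as data: the item is `Ok` even when
/// the status is not, so the consumer can inspect it.
pub fn subscription_item(value: i32, status: StatusCategory) -> Result<DataChange, Error> {
    Ok(DataChange { value, status })
}
```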

Threading model

  • Multi-thread Tokio is the default. Single-thread is supported (no Send-only future escapes the local thread) but not the recommended deployment.
  • All public types are Send + Sync.
  • The codec is Send + Sync trivially (immutable after parse, owns its bytes).
  • The session uses tokio::sync::Mutex for the per-connection RPC channel state and tokio::sync::watch for recovery state. No parking_lot::Mutex — sync mutexes inside async paths cause hidden blocking.
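The "all public types are Send + Sync" claim is the kind of property that can be pinned down at compile time. The sketch below shows the usual idiom on a hypothetical, stripped-down `Session`; an atomic counter stands in for real session state so the example needs only the standard library.

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

/// Compiles only if `T` is both `Send` and `Sync`; does nothing at runtime.
fn assert_send_sync<T: Send + Sync>() {}

/// Stand-in for `SessionInner` with a single piece of shared state.
struct SessionInner {
    next_correlation_id: AtomicU32,
}

#[derive(Clone)]
pub struct Session {
    inner: Arc<SessionInner>,
}

impl Session {
    pub fn new() -> Self {
        Session { inner: Arc::new(SessionInner { next_correlation_id: AtomicU32::new(0) }) }
    }

    /// Hands out correlation IDs; shared across every clone of the handle.
    pub fn next_id(&self) -> u32 {
        self.inner.next_correlation_id.fetch_add(1, Ordering::Relaxed)
    }
}

/// If a field that is not `Send + Sync` (for example an `Rc`) were added to
/// `SessionInner`, this function would stop compiling.
pub fn check_public_types() {
    assert_send_sync::<Session>();
}
```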

Observability

tracing spans on every public operation: tracing::instrument on register, write, subscribe, read, recover. Span fields: reference, correlation_id, transport (nmx|asb), engine_ids. Span events for state transitions (subscription added, callback received, recovery started, recovery completed).

Recommended subscriber filter:

mxaccess::session=info,mxaccess::transport=debug

Optional metrics feature exposes:

  • counters: mxaccess_writes_total, mxaccess_subscribes_total, mxaccess_callbacks_total, mxaccess_recoveries_total
  • histograms: mxaccess_operation_latency_seconds{op="write"|"read"|"subscribe"}

Trait Transport

Transport uses native async fn in trait (AFIT, stable in Rust 1.75+) and is generic-only. Consumers parameterise sites that take a transport with impl Transport or a generic <T: Transport> bound. The trait is not dyn-compatible — Box<dyn Transport> is not supported in V1 — and that limitation is intentional: the design already uses Session::connect<T: Transport>(...)-style generic entry points, so giving up dyn Transport costs nothing the design currently uses, while keeping zero per-call heap allocation in the hot path. (#[async_trait] was the alternative; it allows dyn Transport but boxes a Pin<Box<dyn Future>> per call — accepted as a known cost only if a future revision needs runtime polymorphism.)

pub trait Transport: Send + Sync {
    fn capabilities(&self) -> TransportCapabilities;

    async fn register(&self, options: &ConnectionOptions) -> Result<RegisteredEngine, Error>;
    async fn unregister(&self, engine: &RegisteredEngine) -> Result<(), Error>;

    async fn write(
        &self,
        handle: &MxReferenceHandle,
        value: &MxValue,
        opts: WriteOptions,
    ) -> Result<WriteOutcome, Error>;

    async fn advise(
        &self,
        handle: &MxReferenceHandle,
        opts: AdviseOptions,
    ) -> Result<SubscriptionHandle, Error>;

    async fn unadvise(&self, sub: SubscriptionHandle) -> Result<(), Error>;

    fn callbacks(&self) -> CallbackStream;
}

pub struct TransportCapabilities {
    pub timestamped_writes: bool,
    pub secured_writes: bool,
    pub buffered_subscriptions: bool,
    pub supervisory_advise: bool,
    pub operation_complete_events: bool,
}

Two implementations: NmxTransport (capabilities mostly true) and AsbTransport (capabilities mostly false; see 70-risks-and-open-questions.md).

Calling an NMX-only API on an AsbTransport returns Error::Unsupported { operation: Cow<'static, str>, transport: TransportKind }. The Cow lets the variant accept both &'static str literals (the common case) and runtime-formatted operation names, allocating only for the latter; TransportKind is the corresponding enum TransportKind { Nmx, Asb } (matching the transport span field listed under "Observability"). The Session may also pre-flight via transport.capabilities() to give a better error message before issuing the call.
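A capability pre-flight that produces `Error::Unsupported` could be shaped like this. It is a sketch under assumptions: only two of the capability flags are reproduced, the capability values used for each transport are illustrative, and `preflight_buffered` and `unsupported_dynamic` are hypothetical helpers.

```rust
use std::borrow::Cow;

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum TransportKind {
    Nmx,
    Asb,
}

/// Subset of `TransportCapabilities`, enough for the example.
#[derive(Debug, Clone, Copy)]
pub struct TransportCapabilities {
    pub buffered_subscriptions: bool,
    pub secured_writes: bool,
}

#[derive(Debug, PartialEq)]
pub enum Error {
    Unsupported { operation: Cow<'static, str>, transport: TransportKind },
}

/// Checks the capability before any frame is sent. The operation name is a
/// string literal, so the `Cow` borrows it and nothing is allocated.
pub fn preflight_buffered(kind: TransportKind, caps: TransportCapabilities) -> Result<(), Error> {
    if caps.buffered_subscriptions {
        Ok(())
    } else {
        Err(Error::Unsupported {
            operation: Cow::Borrowed("subscribe_buffered"),
            transport: kind,
        })
    }
}

/// The owned case: an operation name that is only known at runtime.
pub fn unsupported_dynamic(op_index: u32, kind: TransportKind) -> Error {
    Error::Unsupported {
        operation: Cow::Owned(format!("vendor_op_{op_index}")),
        transport: kind,
    }
}
```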

Public surface (re-exports)

pub use mxaccess_codec::{
    MxReferenceHandle, MxStatus, MxStatusCategory, MxStatusSource,
    MxValue, MxValueKind, MxDataType,
};

pub struct Session;
pub struct Subscription;

pub struct DataChange {
    pub reference: Arc<str>,
    pub value: MxValue,
    /// Legacy 16-bit OPC quality (e.g. `0xC0` = 192 = "Good"). Distinct from
    /// `status: MxStatus` — both are surfaced because real MxAccess
    /// (`OnDataChange(hServer, hItem, MxDataType, value, quality, timestamp,
    /// statuses)`) carries them as separate fields. Verified against
    /// `wwtools/mxaccesscli/docs/api-notes.md:104-105` ("quality on
    /// OnDataChange is the legacy 16-bit OPC quality value … the richer state
    /// lives in the statuses[] array") and
    /// `wwtools/mxaccesscli/src/MxAccess.Cli/Mx/MxUpdate.cs:13-22,39-65`.
    /// Earlier drafts of this design dropped `quality` as redundant with
    /// `status`; that was a parity break and has been restored.
    pub quality: u16,
    pub timestamp: SystemTime,
    pub status: MxStatus,
}
pub struct DataChangeBatch {
    pub reference: Arc<str>,
    pub samples: Vec<DataChange>,
}
// Note on `quality`: `DataChange` carries a 16-bit OPC quality alongside
// `status: MxStatus`. They are distinct: `quality` is the legacy wire field
// (e.g. `0xC0` = "Good"), preserved for parity with real MxAccess
// (`OnDataChange` exposes both). The canonical projection from a wire record
// to a typed status is `Record.ToDataChangeStatus()` in the .NET reference
// (src/MxNativeClient/MxNativeSession.cs:70), which produces an `MxStatus`.
// Consumers that need the historical "quality" view (Good/Uncertain/Bad on
// bits 7..6) read it from `quality`; the richer state lives in `status`
// (`status.detail` and `status.category`). The two are carried as separate
// fields, so neither should be treated as a substitute for the other.

pub struct ConnectionOptions;
pub struct WriteOptions;
pub struct AdviseOptions;
pub struct BufferedOptions;
pub struct RecoveryPolicy;
pub enum   RecoveryEvent { Started { .. }, Failed { .. }, Completed { .. } }

pub struct Credentials;
pub struct SecurityContext;

pub enum Error;  // see 50-error-model.md
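For the `quality` field, the Good/Uncertain/Bad view on bits 7..6 follows the standard OPC quality layout (`11` = Good, `01` = Uncertain, `00` = Bad, `10` unused). A minimal decoder, with the hypothetical names `QualityClass` and `quality_class`, might be:

```rust
/// Major quality class held in bits 7..6 of the legacy OPC quality word.
#[derive(Debug, PartialEq)]
pub enum QualityClass {
    Bad,
    Uncertain,
    /// Bit pattern `10`, which the OPC layout leaves unused.
    Reserved,
    Good,
}

/// Extracts bits 7..6 and maps them to a class. The low six bits (substatus
/// and limit flags) are ignored here.
pub fn quality_class(quality: u16) -> QualityClass {
    match (quality >> 6) & 0b11 {
        0b00 => QualityClass::Bad,
        0b01 => QualityClass::Uncertain,
        0b11 => QualityClass::Good,
        _ => QualityClass::Reserved,
    }
}
```

`quality_class(0xC0)` yields `Good`, matching the `0xC0` = 192 = "Good" example in the doc comment on `DataChange::quality`.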

mxaccess-compat (optional)

LMXProxyServer-shaped methods on top of Session. Each method maps one-to-one to a Session::* operation:

let server = mxaccess_compat::Server::new(session);
let server_handle = server.register("MyClient");
let item_handle = server.add_item(server_handle, "TestObject.TestInt").await?;
server.advise(server_handle, item_handle).await?;
server.write(server_handle, item_handle, MxValue::Int32(123), user_id).await?;

Useful for porting code that depends on the COM API shape; not the recommended consumer surface. Not COM-visible by itself; a separate mxaccess-compat-com crate (deferred to post-V1) will register windows-rs-generated COM classes that wrap this.

Examples

End-to-end consumer-grade examples in rust/examples/:

  • connect-write-read.rs — open session, write, read back
  • subscribe.rs — long-running subscription
  • subscribe-buffered.rs — buffered subscription (NMX feature)
  • asb-subscribe.rs — ASB subscription
  • recovery.rs — recovery policy + recovery events
  • multi-tag.rs — subscribe_many on a 100-tag set
  • secured-write.rs — write_secured (no timestamp) and write_secured_at (timestamped), each taking (current_user_id, verifier_user_id); demonstrates both single-user (current == verifier) and two-person verification paths

Code sample (full)

use std::time::Duration;
use futures::StreamExt;
use mxaccess::{Session, ConnectionOptions, Credentials, MxValue, RecoveryPolicy};

#[tokio::main(flavor = "multi_thread")]
async fn main() -> anyhow::Result<()> {
    tracing_subscriber::fmt::init();

    let session = Session::connect(
        ConnectionOptions::nmx("localhost")
            .galaxy_id(1)
            .platform_id(1)
            .engine_id(420)
            .credentials(Credentials::current_user())
            .galaxy_db(std::env::var("MX_GALAXY_DB")?)
            .recovery(RecoveryPolicy::exponential(
                Duration::from_secs(1), 5, Duration::from_secs(60),
            )),
    ).await?;

    session.write("TestChildObject.TestInt", MxValue::Int32(123)).await?;

    let mut sub = session.subscribe("TestChildObject.TestInt").await?;
    while let Some(change) = sub.next().await {
        let change = change?;
        tracing::info!(value = ?change.value, ts = ?change.timestamp, "data change");
    }
    Ok(())
}

What the async layer does not do

  • It does not pretend to be sync. There is no block_on shortcut in the public API.
  • It does not support multiple async runtimes. Tokio only.
  • It does not transmit raw bytes. All operations go through the codec.
  • It does not retry by default. Recovery is opt-in via ConnectionOptions::recovery(...) at session construction. There is no runtime mutator for RecoveryPolicy: Session is Clone + Arc<SessionInner>-backed, so a &mut self setter on a clone would not propagate to other clones; the policy is fixed once and shared by every clone. Consumers that need a different policy build a new Session.
  • It does not own a thread pool. It uses Tokio's runtime.
  • It does not synthesise events the wire does not produce. WriteCompleted only fires when the proven 5-byte completion frame is observed; otherwise RawStatus is exposed verbatim through Session::operation_status_events(). See 70-risks-and-open-questions.md R3/R4.