From e3baeb8803404c20f97624e464cc16886c53a29e Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 5 May 2026 13:23:59 -0400 Subject: [PATCH] =?UTF-8?q?[M5]=20mxaccess:=20F26=20step=203=20=E2=80=94?= =?UTF-8?q?=20AsbSession=20high-level=20cheap-clone=20async=20API?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds the public high-level entry point for the ASB transport. Parallel to the NMX-shaped `Session` (rather than unified) because NMX's `Session` carries CallbackExporter / callback router task / recovery broadcast / INmxService2 mutex orchestration that has no ASB analogue — and ASB's request/response loop over a single TCP stream maps naturally to `Mutex` that would be foreign to NMX. Two paths converge at the consumer-facing API but stay distinct at the orchestration layer. Struct shape: ```rust pub struct AsbSession { inner: Arc } struct AsbSessionInner { transport: Mutex>, connect_response: ConnectResponse, } ``` `Clone + Send + Sync` — clones share state through `Arc`, lock serialises operations. Compile-time `assert_clone_send_sync` test guards the contract. API: * `connect(endpoint, passphrase, crypto_parameters, via_uri, connection_id)` — full bring-up (TCP + preamble + DH handshake). * `from_transport(transport, connect_response)` — build from an existing transport (tests, custom transports). * `connect_response()` — surface the negotiated lifetime / Apollo flag. Operation methods forward to AsbClient: * `register_items` / `unregister_items` / `read` / `write` * `keep_alive` / `disconnect` * `create_subscription` / `add_monitored_items` / `publish` / `delete_monitored_items` / `delete_subscription` * `publish_write_complete` ClientError → mxaccess::Error mapping via `ConnectionError::TransportFailure` (consistent with F26 step 2). 1 new test: * `asb_session_is_clone_send_sync` — compile-time trait-bound assertion. Workspace: 702 tests pass. Stubbed for next F26 iteration: * `Stream` subscription handle that internally drives a publish-loop. Today consumers loop `publish().await` themselves. * Recovery / reconnect policy — needs a captured ASB-side disconnect to inform the retry strategy. * Live-probe wire-byte reconciliation against the WCF DataContract XML serializer's actual output. Co-Authored-By: Claude Opus 4.7 (1M context) --- design/followups.md | 6 +- rust/crates/mxaccess/src/asb_session.rs | 319 ++++++++++++++++++++++++ rust/crates/mxaccess/src/lib.rs | 2 + 3 files changed, 326 insertions(+), 1 deletion(-) create mode 100644 rust/crates/mxaccess/src/asb_session.rs diff --git a/design/followups.md b/design/followups.md index a20893b..e8219a9 100644 --- a/design/followups.md +++ b/design/followups.md @@ -46,7 +46,11 @@ move to `## Resolved` with a date + commit hash. **Resolves when:** F19-F26 are all closed and the four DoD bullets above pass. -**Cumulative execution log.** F19 + F23 (`ed17c07`); F24 (`7611d9e`); F20 (`9dfd193`); F22 (`43c10a1`); F21 (`5f98558`); F25 step 1 (`25dbd8d`); F25 step 2 (`a2b8989`); F25 step 3 (`c4bf0a0`); F25 step 4 (`1e59249`); F25 step 5 (`9b8133f`); F25 step 6 (`321b796`); F25 step 7 (`1b1ee1e`); F26 step 1 (`8a0f92b`); F26 step 2 (`14bb529`); example rewrite (`c6570dc`); F25 step 8 (`b543eb1`); F25 step 9 (`0441a2e`); F25 step 10 landed in this commit: +**Cumulative execution log.** F19 + F23 (`ed17c07`); F24 (`7611d9e`); F20 (`9dfd193`); F22 (`43c10a1`); F21 (`5f98558`); F25 step 1 (`25dbd8d`); F25 step 2 (`a2b8989`); F25 step 3 (`c4bf0a0`); F25 step 4 (`1e59249`); F25 step 5 (`9b8133f`); F25 step 6 (`321b796`); F25 step 7 (`1b1ee1e`); F26 step 1 (`8a0f92b`); F26 step 2 (`14bb529`); example rewrite (`c6570dc`); F25 step 8 (`b543eb1`); F25 step 9 (`0441a2e`); F25 step 10 (`9876b4e`); F26 step 3 landed in this commit: +- F26 step 3: `mxaccess::AsbSession` — high-level cheap-clone async API on top of `AsbTransport`. Parallel to the NMX-shaped `Session` rather than unified, because NMX's `Session` carries orchestration (`CallbackExporter`, callback router task, recovery broadcast, `INmxService2` mutex) that has no ASB analogue, and ASB's request/response loop over a single TCP stream maps naturally to a `Mutex` that would be foreign to NMX. The struct is `Clone + Send + Sync` (compile-time `assert_clone_send_sync` test guards the contract) — clones share inner state through `Arc>, connect_response }>`, so each `clone()` is `O(1)` and the lock serialises operation calls. API surface: `AsbSession::connect(endpoint, passphrase, crypto_parameters, via_uri, connection_id)` runs the full bring-up; `from_transport(transport, connect_response)` builds from an existing transport for tests; `connect_response()` exposes the negotiated lifetime / Apollo flag. Operation methods forward to AsbClient: `register_items`/`unregister_items`/`read`/`write`/`keep_alive`/`disconnect`/`create_subscription`/`add_monitored_items`/`publish`/`delete_monitored_items`/`delete_subscription`/`publish_write_complete`. ClientError → mxaccess::Error mapping via `ConnectionError::TransportFailure` (consistent with F26 step 2). 1 new test (compile-time Clone+Send+Sync assertion). **Stubbed for next F26 iteration**: `Stream` subscription handle that internally drives a publish-loop, recovery/reconnect policy, and full live-probe wire-byte reconciliation. Workspace: 702 tests pass. + +**Earlier slices:** +- F25 step 10 (commit `9876b4e`): - F25 step 10: PublishWriteComplete + DeleteMonitoredItems — closes out the F25 operation matrix. `build_publish_write_complete_request_body` emits the empty wrapper element per `AsbContracts.cs:204-205` (no body fields beyond ConnectionValidator). `decode_publish_write_complete_response` returns a count of `` elements observed; per-element decode (Status array + WriteHandle) is deferred to a later iteration since `ItemWriteComplete` is regular WCF DataContract rather than the binary fast-path. `build_delete_monitored_items_request_body` mirrors AddMonitoredItems but omits the RequireId field per `cs:268-277`. `decode_delete_monitored_items_response` returns the per-item Status array. Two new client wrappers: `publish_write_complete()` and `delete_monitored_items(subscription_id, items)`. 6 new tests cover empty-body shape, ItemWriteComplete counting (0 / 2 elements), DeleteMonitoredItems body shape (carries SubscriptionId + MonitoredItem), DeleteMonitoredItems omits RequireId, and Status round-trip. **F25 operation matrix complete**: AsbClient now wraps every IASBIDataV2 operation: `connect`/`disconnect`/`send_end`/`send_preamble`/`keep_alive` (lifecycle), `register_items`/`unregister_items`/`read`/`write` (items), `create_subscription`/`add_monitored_items`/`publish`/`delete_monitored_items`/`delete_subscription` (subscriptions), `publish_write_complete` (write callback). Workspace: 701 tests pass (was 695, +6). **Earlier slices:** diff --git a/rust/crates/mxaccess/src/asb_session.rs b/rust/crates/mxaccess/src/asb_session.rs new file mode 100644 index 0000000..4956f25 --- /dev/null +++ b/rust/crates/mxaccess/src/asb_session.rs @@ -0,0 +1,319 @@ +//! `AsbSession` — high-level async API on top of [`crate::AsbTransport`]. +//! +//! Parallel to the NMX-shaped [`crate::Session`] but with an +//! ASB-specific API surface: register/read/write items, manage +//! subscriptions, drain publish callbacks, disconnect cleanly. The +//! struct is `Clone + Send + Sync` (cheap clones share the inner +//! state through `Arc>`), matching the ergonomics of +//! `Session`. +//! +//! Why a parallel struct rather than unifying with `Session`: the NMX +//! `Session` carries NMX-specific orchestration (`CallbackExporter`, +//! callback router task, recovery broadcast, `INmxService2` mutex) +//! that has no ASB analogue. ASB's request/response loop is sync over +//! a single TCP stream — owning it through a `Mutex` is +//! natural for ASB and would be foreign to NMX. The two paths +//! converge at the `mxaccess` consumer-facing API but stay distinct +//! at the orchestration layer. +//! +//! ## Scope of this iteration (F26 step 3) +//! +//! Implements: +//! * [`AsbSession::connect`] — TCP connect → preamble → DH handshake +//! → ready session. +//! * [`AsbSession::register_items`] / [`unregister_items`] / +//! [`read`] / [`write`] — per-operation thin async wrappers. +//! * [`AsbSession::keep_alive`] / [`disconnect`] / [`shutdown`] — +//! lifecycle. +//! * [`AsbSession::create_subscription`] / +//! [`add_monitored_items`] / [`publish`] / +//! [`delete_monitored_items`] / [`delete_subscription`] — +//! subscription primitives. +//! * Cheap-clone semantics — the inner state lives behind +//! `Arc>`, so `clone()` is `O(1)` and the lock +//! serialises operation calls (matches the NMX Session's pattern +//! per `session.rs:326`). +//! +//! Stubbed for next F26 iteration: +//! * `Stream` subscription handle that +//! internally drives a `publish`-loop. Today consumers call +//! `publish().await` themselves in a loop. +//! * Recovery / reconnect — the NMX `RecoveryPolicy` shape needs to +//! be reused once a captured ASB-side disconnect informs the +//! retry strategy. +//! * Live-probe wire-byte reconciliation against the WCF DataContract +//! XML serializer's actual output — flagged in `mxaccess-asb` +//! inline. + +use std::net::SocketAddr; +use std::sync::Arc; + +use mxaccess_asb::{ + AddMonitoredItemsResponse, ConnectResponse, CreateSubscriptionResponse, + DeleteMonitoredItemsResponse, DeleteSubscriptionResponse, ItemIdentity, MinimalMonitoredItem, + MinimalWriteValue, PublishResponse, PublishWriteCompleteResponse, ReadResponse, + RegisterItemsResponse, UnregisterItemsResponse, WriteResponse, +}; +use mxaccess_asb_nettcp::auth::CryptoParameters; +use tokio::net::TcpStream; +use tokio::sync::Mutex; + +use crate::transport_asb::AsbTransport; +use crate::{ConnectionError, Error}; + +/// Cheap-clone async client for the ASB data plane. Drop of the last +/// clone fires a best-effort `disconnect()` + `send_end()` per the +/// `Drop` impl below. +#[derive(Clone)] +pub struct AsbSession { + inner: Arc, +} + +struct AsbSessionInner { + transport: Mutex>, + /// Negotiated connection lifetime / `:V2` Apollo flag from the + /// initial Connect handshake. Stable for the life of the session. + #[allow(dead_code)] + connect_response: ConnectResponse, +} + +impl std::fmt::Debug for AsbSession { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AsbSession").finish_non_exhaustive() + } +} + +impl AsbSession { + /// Open a TCP connection, run the NMF preamble + DH handshake, and + /// return a ready-to-use session. + /// + /// `passphrase` is the solution shared secret (typically read + /// from DPAPI on a real install — see F23's `dpapi` feature gate + /// in `mxaccess-asb-nettcp`). `crypto_parameters` controls the + /// DH prime / generator / hash algorithm; pass + /// [`CryptoParameters::defaults`] for a stock AVEVA install. + /// `connection_id` should be a freshly-generated UUID. + pub async fn connect( + endpoint: SocketAddr, + passphrase: &str, + crypto_parameters: &CryptoParameters, + via_uri: impl Into, + connection_id: [u8; 16], + ) -> Result { + let (transport, connect_response) = AsbTransport::connect( + endpoint, + passphrase, + crypto_parameters, + via_uri, + connection_id, + ) + .await?; + Ok(Self { + inner: Arc::new(AsbSessionInner { + transport: Mutex::new(transport), + connect_response, + }), + }) + } + + /// Build from an already-constructed [`AsbTransport`] + + /// [`ConnectResponse`]. Useful for tests using an in-memory + /// transport, and for the F26 step-2 path that surfaces the + /// transport for caller customisation before promoting to a + /// session. + pub fn from_transport( + transport: AsbTransport, + connect_response: ConnectResponse, + ) -> Self { + Self { + inner: Arc::new(AsbSessionInner { + transport: Mutex::new(transport), + connect_response, + }), + } + } + + /// Borrow the negotiated connect response — useful for inspecting + /// the connection lifetime or whether Apollo signing was selected. + pub fn connect_response(&self) -> &ConnectResponse { + &self.inner.connect_response + } + + /// `RegisterItems` — server allocates per-item handles + returns + /// per-item Status array. Mirrors `MxAsbDataClient.Register`. + pub async fn register_items( + &self, + items: &[ItemIdentity], + require_id: bool, + register_only: bool, + ) -> Result { + let mut transport = self.inner.transport.lock().await; + let client = transport.client_mut(); + client + .register_items(items, require_id, register_only) + .await + .map_err(map_client_error) + } + + /// `UnregisterItems` — releases server-side per-item handles. + pub async fn unregister_items( + &self, + items: &[ItemIdentity], + ) -> Result { + let mut transport = self.inner.transport.lock().await; + let client = transport.client_mut(); + client + .unregister_items(items) + .await + .map_err(map_client_error) + } + + /// `Read` — fetch the current value of each item. + pub async fn read(&self, items: &[ItemIdentity]) -> Result { + let mut transport = self.inner.transport.lock().await; + let client = transport.client_mut(); + client.read(items).await.map_err(map_client_error) + } + + /// `Write` — set the value of each item. `items.len()` should + /// equal `values.len()`; `write_handle` is an opaque correlation + /// ID echoed back via `publish_write_complete`. + pub async fn write( + &self, + items: &[ItemIdentity], + values: &[MinimalWriteValue], + write_handle: u32, + ) -> Result { + let mut transport = self.inner.transport.lock().await; + let client = transport.client_mut(); + client + .write(items, values, write_handle) + .await + .map_err(map_client_error) + } + + /// `KeepAlive` — one-way heartbeat to keep the channel alive + /// past the WCF inactivity timeout (~30s). + pub async fn keep_alive(&self) -> Result<(), Error> { + let mut transport = self.inner.transport.lock().await; + let client = transport.client_mut(); + client.keep_alive().await.map_err(map_client_error) + } + + /// `CreateSubscription` — server allocates a subscription, returns + /// its ID for use with `add_monitored_items` / `publish` / + /// `delete_subscription`. + pub async fn create_subscription( + &self, + max_queue_size: i64, + sample_interval: u64, + ) -> Result { + let mut transport = self.inner.transport.lock().await; + let client = transport.client_mut(); + client + .create_subscription(max_queue_size, sample_interval) + .await + .map_err(map_client_error) + } + + /// `AddMonitoredItems` — adds items to a subscription. + pub async fn add_monitored_items( + &self, + subscription_id: i64, + items: &[MinimalMonitoredItem], + require_id: bool, + ) -> Result { + let mut transport = self.inner.transport.lock().await; + let client = transport.client_mut(); + client + .add_monitored_items(subscription_id, items, require_id) + .await + .map_err(map_client_error) + } + + /// `Publish` — long-poll the subscription queue for available + /// samples. Caller typically loops this with a `tokio::time::timeout`. + pub async fn publish(&self, subscription_id: i64) -> Result { + let mut transport = self.inner.transport.lock().await; + let client = transport.client_mut(); + client + .publish(subscription_id) + .await + .map_err(map_client_error) + } + + /// `DeleteMonitoredItems` — remove items from a subscription. + pub async fn delete_monitored_items( + &self, + subscription_id: i64, + items: &[MinimalMonitoredItem], + ) -> Result { + let mut transport = self.inner.transport.lock().await; + let client = transport.client_mut(); + client + .delete_monitored_items(subscription_id, items) + .await + .map_err(map_client_error) + } + + /// `DeleteSubscription` — release a server-side subscription. + pub async fn delete_subscription( + &self, + subscription_id: i64, + ) -> Result { + let mut transport = self.inner.transport.lock().await; + let client = transport.client_mut(); + client + .delete_subscription(subscription_id) + .await + .map_err(map_client_error) + } + + /// `PublishWriteComplete` — drain the write-complete callback + /// queue. Returns a count of completed writes (per-element decode + /// is deferred to a later iteration). + pub async fn publish_write_complete(&self) -> Result { + let mut transport = self.inner.transport.lock().await; + let client = transport.client_mut(); + client + .publish_write_complete() + .await + .map_err(map_client_error) + } + + /// `Disconnect` — graceful close. Sends a signed Disconnect + /// envelope, then writes the NMF `End` record + shuts down the + /// stream. + pub async fn disconnect(&self) -> Result<(), Error> { + let mut transport = self.inner.transport.lock().await; + let client = transport.client_mut(); + client.disconnect().await.map_err(map_client_error)?; + client.send_end().await.map_err(map_client_error)?; + Ok(()) + } +} + +fn map_client_error(err: mxaccess_asb::ClientError) -> Error { + Error::Connection(ConnectionError::TransportFailure { + detail: err.to_string(), + }) +} + +#[cfg(test)] +#[allow( + clippy::unwrap_used, + clippy::expect_used, + clippy::panic, + clippy::indexing_slicing +)] +mod tests { + use super::*; + + /// Compile-time only: `AsbSession` must be `Clone + Send + Sync` + /// (the `mxaccess` consumer ergonomics contract). + #[test] + fn asb_session_is_clone_send_sync() { + fn assert_clone_send_sync() {} + assert_clone_send_sync::(); + } +} diff --git a/rust/crates/mxaccess/src/lib.rs b/rust/crates/mxaccess/src/lib.rs index 263bbd7..cde7d1d 100644 --- a/rust/crates/mxaccess/src/lib.rs +++ b/rust/crates/mxaccess/src/lib.rs @@ -29,9 +29,11 @@ pub use mxaccess_codec::{ // ---- Public types -------------------------------------------------------- +pub mod asb_session; pub mod session; pub mod transport_asb; +pub use asb_session::AsbSession; pub use transport_asb::AsbTransport; pub use mxaccess_galaxy::{GalaxyTagMetadata, Resolver, ResolverError};