[M5] mxaccess: F26 step 3 — AsbSession high-level cheap-clone async API

Adds the public high-level entry point for the ASB transport.
Parallel to the NMX-shaped `Session` (rather than unified) because
NMX's `Session` carries CallbackExporter / callback router task /
recovery broadcast / INmxService2 mutex orchestration that has no
ASB analogue — and ASB's request/response loop over a single TCP
stream maps naturally to `Mutex<AsbClient>` that would be foreign
to NMX. Two paths converge at the consumer-facing API but stay
distinct at the orchestration layer.

Struct shape:
```rust
pub struct AsbSession { inner: Arc<AsbSessionInner> }
struct AsbSessionInner {
    transport: Mutex<AsbTransport<TcpStream>>,
    connect_response: ConnectResponse,
}
```

`Clone + Send + Sync` — clones share state through `Arc`, lock
serialises operations. Compile-time `assert_clone_send_sync` test
guards the contract.

API:
* `connect(endpoint, passphrase, crypto_parameters, via_uri,
  connection_id)` — full bring-up (TCP + preamble + DH handshake).
* `from_transport(transport, connect_response)` — build from an
  existing transport (tests, custom transports).
* `connect_response()` — surface the negotiated lifetime /
  Apollo flag.

Operation methods forward to AsbClient:
* `register_items` / `unregister_items` / `read` / `write`
* `keep_alive` / `disconnect`
* `create_subscription` / `add_monitored_items` / `publish` /
  `delete_monitored_items` / `delete_subscription`
* `publish_write_complete`

ClientError → mxaccess::Error mapping via
`ConnectionError::TransportFailure` (consistent with F26 step 2).

1 new test:
* `asb_session_is_clone_send_sync` — compile-time trait-bound
  assertion.

Workspace: 702 tests pass.

Stubbed for next F26 iteration:
* `Stream<Item = MonitoredItemValue>` subscription handle that
  internally drives a publish-loop. Today consumers loop
  `publish().await` themselves.
* Recovery / reconnect policy — needs a captured ASB-side
  disconnect to inform the retry strategy.
* Live-probe wire-byte reconciliation against the WCF DataContract
  XML serializer's actual output.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-05 13:23:59 -04:00
parent 9876b4ebb4
commit e3baeb8803
3 changed files with 326 additions and 1 deletions
+5 -1
View File
@@ -46,7 +46,11 @@ move to `## Resolved` with a date + commit hash.
**Resolves when:** F19-F26 are all closed and the four DoD bullets above pass.
**Cumulative execution log.** F19 + F23 (`ed17c07`); F24 (`7611d9e`); F20 (`9dfd193`); F22 (`43c10a1`); F21 (`5f98558`); F25 step 1 (`25dbd8d`); F25 step 2 (`a2b8989`); F25 step 3 (`c4bf0a0`); F25 step 4 (`1e59249`); F25 step 5 (`9b8133f`); F25 step 6 (`321b796`); F25 step 7 (`1b1ee1e`); F26 step 1 (`8a0f92b`); F26 step 2 (`14bb529`); example rewrite (`c6570dc`); F25 step 8 (`b543eb1`); F25 step 9 (`0441a2e`); F25 step 10 landed in this commit:
**Cumulative execution log.** F19 + F23 (`ed17c07`); F24 (`7611d9e`); F20 (`9dfd193`); F22 (`43c10a1`); F21 (`5f98558`); F25 step 1 (`25dbd8d`); F25 step 2 (`a2b8989`); F25 step 3 (`c4bf0a0`); F25 step 4 (`1e59249`); F25 step 5 (`9b8133f`); F25 step 6 (`321b796`); F25 step 7 (`1b1ee1e`); F26 step 1 (`8a0f92b`); F26 step 2 (`14bb529`); example rewrite (`c6570dc`); F25 step 8 (`b543eb1`); F25 step 9 (`0441a2e`); F25 step 10 (`9876b4e`); F26 step 3 landed in this commit:
- F26 step 3: `mxaccess::AsbSession` — high-level cheap-clone async API on top of `AsbTransport`. Parallel to the NMX-shaped `Session` rather than unified, because NMX's `Session` carries orchestration (`CallbackExporter`, callback router task, recovery broadcast, `INmxService2` mutex) that has no ASB analogue, and ASB's request/response loop over a single TCP stream maps naturally to a `Mutex<AsbClient>` that would be foreign to NMX. The struct is `Clone + Send + Sync` (compile-time `assert_clone_send_sync` test guards the contract) — clones share inner state through `Arc<AsbSessionInner { transport: Mutex<AsbTransport<TcpStream>>, connect_response }>`, so each `clone()` is `O(1)` and the lock serialises operation calls. API surface: `AsbSession::connect(endpoint, passphrase, crypto_parameters, via_uri, connection_id)` runs the full bring-up; `from_transport(transport, connect_response)` builds from an existing transport for tests; `connect_response()` exposes the negotiated lifetime / Apollo flag. Operation methods forward to AsbClient: `register_items`/`unregister_items`/`read`/`write`/`keep_alive`/`disconnect`/`create_subscription`/`add_monitored_items`/`publish`/`delete_monitored_items`/`delete_subscription`/`publish_write_complete`. ClientError → mxaccess::Error mapping via `ConnectionError::TransportFailure` (consistent with F26 step 2). 1 new test (compile-time Clone+Send+Sync assertion). **Stubbed for next F26 iteration**: `Stream<Item = MonitoredItemValue>` subscription handle that internally drives a publish-loop, recovery/reconnect policy, and full live-probe wire-byte reconciliation. Workspace: 702 tests pass.
**Earlier slices:**
- F25 step 10 (commit `9876b4e`):
- F25 step 10: PublishWriteComplete + DeleteMonitoredItems — closes out the F25 operation matrix. `build_publish_write_complete_request_body` emits the empty wrapper element per `AsbContracts.cs:204-205` (no body fields beyond ConnectionValidator). `decode_publish_write_complete_response` returns a count of `<ItemWriteComplete>` elements observed; per-element decode (Status array + WriteHandle) is deferred to a later iteration since `ItemWriteComplete` is regular WCF DataContract rather than the binary fast-path. `build_delete_monitored_items_request_body` mirrors AddMonitoredItems but omits the RequireId field per `cs:268-277`. `decode_delete_monitored_items_response` returns the per-item Status array. Two new client wrappers: `publish_write_complete()` and `delete_monitored_items(subscription_id, items)`. 6 new tests cover empty-body shape, ItemWriteComplete counting (0 / 2 elements), DeleteMonitoredItems body shape (carries SubscriptionId + MonitoredItem), DeleteMonitoredItems omits RequireId, and Status round-trip. **F25 operation matrix complete**: AsbClient now wraps every IASBIDataV2 operation: `connect`/`disconnect`/`send_end`/`send_preamble`/`keep_alive` (lifecycle), `register_items`/`unregister_items`/`read`/`write` (items), `create_subscription`/`add_monitored_items`/`publish`/`delete_monitored_items`/`delete_subscription` (subscriptions), `publish_write_complete` (write callback). Workspace: 701 tests pass (was 695, +6).
**Earlier slices:**
+319
View File
@@ -0,0 +1,319 @@
//! `AsbSession` — high-level async API on top of [`crate::AsbTransport`].
//!
//! Parallel to the NMX-shaped [`crate::Session`] but with an
//! ASB-specific API surface: register/read/write items, manage
//! subscriptions, drain publish callbacks, disconnect cleanly. The
//! struct is `Clone + Send + Sync` (cheap clones share the inner
//! state through `Arc<Mutex<...>>`), matching the ergonomics of
//! `Session`.
//!
//! Why a parallel struct rather than unifying with `Session`: the NMX
//! `Session` carries NMX-specific orchestration (`CallbackExporter`,
//! callback router task, recovery broadcast, `INmxService2` mutex)
//! that has no ASB analogue. ASB's request/response loop is sync over
//! a single TCP stream — owning it through a `Mutex<AsbClient>` is
//! natural for ASB and would be foreign to NMX. The two paths
//! converge at the `mxaccess` consumer-facing API but stay distinct
//! at the orchestration layer.
//!
//! ## Scope of this iteration (F26 step 3)
//!
//! Implements:
//! * [`AsbSession::connect`] — TCP connect → preamble → DH handshake
//! → ready session.
//! * [`AsbSession::register_items`] / [`unregister_items`] /
//! [`read`] / [`write`] — per-operation thin async wrappers.
//! * [`AsbSession::keep_alive`] / [`disconnect`] / [`shutdown`] —
//! lifecycle.
//! * [`AsbSession::create_subscription`] /
//! [`add_monitored_items`] / [`publish`] /
//! [`delete_monitored_items`] / [`delete_subscription`] —
//! subscription primitives.
//! * Cheap-clone semantics — the inner state lives behind
//! `Arc<Mutex<...>>`, so `clone()` is `O(1)` and the lock
//! serialises operation calls (matches the NMX Session's pattern
//! per `session.rs:326`).
//!
//! Stubbed for next F26 iteration:
//! * `Stream<Item = MonitoredItemValue>` subscription handle that
//! internally drives a `publish`-loop. Today consumers call
//! `publish().await` themselves in a loop.
//! * Recovery / reconnect — the NMX `RecoveryPolicy` shape needs to
//! be reused once a captured ASB-side disconnect informs the
//! retry strategy.
//! * Live-probe wire-byte reconciliation against the WCF DataContract
//! XML serializer's actual output — flagged in `mxaccess-asb`
//! inline.
use std::net::SocketAddr;
use std::sync::Arc;
use mxaccess_asb::{
AddMonitoredItemsResponse, ConnectResponse, CreateSubscriptionResponse,
DeleteMonitoredItemsResponse, DeleteSubscriptionResponse, ItemIdentity, MinimalMonitoredItem,
MinimalWriteValue, PublishResponse, PublishWriteCompleteResponse, ReadResponse,
RegisterItemsResponse, UnregisterItemsResponse, WriteResponse,
};
use mxaccess_asb_nettcp::auth::CryptoParameters;
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use crate::transport_asb::AsbTransport;
use crate::{ConnectionError, Error};
/// Cheap-clone async client for the ASB data plane. Drop of the last
/// clone fires a best-effort `disconnect()` + `send_end()` per the
/// `Drop` impl below.
#[derive(Clone)]
pub struct AsbSession {
inner: Arc<AsbSessionInner>,
}
struct AsbSessionInner {
transport: Mutex<AsbTransport<TcpStream>>,
/// Negotiated connection lifetime / `:V2` Apollo flag from the
/// initial Connect handshake. Stable for the life of the session.
#[allow(dead_code)]
connect_response: ConnectResponse,
}
impl std::fmt::Debug for AsbSession {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AsbSession").finish_non_exhaustive()
}
}
impl AsbSession {
/// Open a TCP connection, run the NMF preamble + DH handshake, and
/// return a ready-to-use session.
///
/// `passphrase` is the solution shared secret (typically read
/// from DPAPI on a real install — see F23's `dpapi` feature gate
/// in `mxaccess-asb-nettcp`). `crypto_parameters` controls the
/// DH prime / generator / hash algorithm; pass
/// [`CryptoParameters::defaults`] for a stock AVEVA install.
/// `connection_id` should be a freshly-generated UUID.
pub async fn connect(
endpoint: SocketAddr,
passphrase: &str,
crypto_parameters: &CryptoParameters,
via_uri: impl Into<String>,
connection_id: [u8; 16],
) -> Result<Self, Error> {
let (transport, connect_response) = AsbTransport::connect(
endpoint,
passphrase,
crypto_parameters,
via_uri,
connection_id,
)
.await?;
Ok(Self {
inner: Arc::new(AsbSessionInner {
transport: Mutex::new(transport),
connect_response,
}),
})
}
/// Build from an already-constructed [`AsbTransport`] +
/// [`ConnectResponse`]. Useful for tests using an in-memory
/// transport, and for the F26 step-2 path that surfaces the
/// transport for caller customisation before promoting to a
/// session.
pub fn from_transport(
transport: AsbTransport<TcpStream>,
connect_response: ConnectResponse,
) -> Self {
Self {
inner: Arc::new(AsbSessionInner {
transport: Mutex::new(transport),
connect_response,
}),
}
}
/// Borrow the negotiated connect response — useful for inspecting
/// the connection lifetime or whether Apollo signing was selected.
pub fn connect_response(&self) -> &ConnectResponse {
&self.inner.connect_response
}
/// `RegisterItems` — server allocates per-item handles + returns
/// per-item Status array. Mirrors `MxAsbDataClient.Register`.
pub async fn register_items(
&self,
items: &[ItemIdentity],
require_id: bool,
register_only: bool,
) -> Result<RegisterItemsResponse, Error> {
let mut transport = self.inner.transport.lock().await;
let client = transport.client_mut();
client
.register_items(items, require_id, register_only)
.await
.map_err(map_client_error)
}
/// `UnregisterItems` — releases server-side per-item handles.
pub async fn unregister_items(
&self,
items: &[ItemIdentity],
) -> Result<UnregisterItemsResponse, Error> {
let mut transport = self.inner.transport.lock().await;
let client = transport.client_mut();
client
.unregister_items(items)
.await
.map_err(map_client_error)
}
/// `Read` — fetch the current value of each item.
pub async fn read(&self, items: &[ItemIdentity]) -> Result<ReadResponse, Error> {
let mut transport = self.inner.transport.lock().await;
let client = transport.client_mut();
client.read(items).await.map_err(map_client_error)
}
/// `Write` — set the value of each item. `items.len()` should
/// equal `values.len()`; `write_handle` is an opaque correlation
/// ID echoed back via `publish_write_complete`.
pub async fn write(
&self,
items: &[ItemIdentity],
values: &[MinimalWriteValue],
write_handle: u32,
) -> Result<WriteResponse, Error> {
let mut transport = self.inner.transport.lock().await;
let client = transport.client_mut();
client
.write(items, values, write_handle)
.await
.map_err(map_client_error)
}
/// `KeepAlive` — one-way heartbeat to keep the channel alive
/// past the WCF inactivity timeout (~30s).
pub async fn keep_alive(&self) -> Result<(), Error> {
let mut transport = self.inner.transport.lock().await;
let client = transport.client_mut();
client.keep_alive().await.map_err(map_client_error)
}
/// `CreateSubscription` — server allocates a subscription, returns
/// its ID for use with `add_monitored_items` / `publish` /
/// `delete_subscription`.
pub async fn create_subscription(
&self,
max_queue_size: i64,
sample_interval: u64,
) -> Result<CreateSubscriptionResponse, Error> {
let mut transport = self.inner.transport.lock().await;
let client = transport.client_mut();
client
.create_subscription(max_queue_size, sample_interval)
.await
.map_err(map_client_error)
}
/// `AddMonitoredItems` — adds items to a subscription.
pub async fn add_monitored_items(
&self,
subscription_id: i64,
items: &[MinimalMonitoredItem],
require_id: bool,
) -> Result<AddMonitoredItemsResponse, Error> {
let mut transport = self.inner.transport.lock().await;
let client = transport.client_mut();
client
.add_monitored_items(subscription_id, items, require_id)
.await
.map_err(map_client_error)
}
/// `Publish` — long-poll the subscription queue for available
/// samples. Caller typically loops this with a `tokio::time::timeout`.
pub async fn publish(&self, subscription_id: i64) -> Result<PublishResponse, Error> {
let mut transport = self.inner.transport.lock().await;
let client = transport.client_mut();
client
.publish(subscription_id)
.await
.map_err(map_client_error)
}
/// `DeleteMonitoredItems` — remove items from a subscription.
pub async fn delete_monitored_items(
&self,
subscription_id: i64,
items: &[MinimalMonitoredItem],
) -> Result<DeleteMonitoredItemsResponse, Error> {
let mut transport = self.inner.transport.lock().await;
let client = transport.client_mut();
client
.delete_monitored_items(subscription_id, items)
.await
.map_err(map_client_error)
}
/// `DeleteSubscription` — release a server-side subscription.
pub async fn delete_subscription(
&self,
subscription_id: i64,
) -> Result<DeleteSubscriptionResponse, Error> {
let mut transport = self.inner.transport.lock().await;
let client = transport.client_mut();
client
.delete_subscription(subscription_id)
.await
.map_err(map_client_error)
}
/// `PublishWriteComplete` — drain the write-complete callback
/// queue. Returns a count of completed writes (per-element decode
/// is deferred to a later iteration).
pub async fn publish_write_complete(&self) -> Result<PublishWriteCompleteResponse, Error> {
let mut transport = self.inner.transport.lock().await;
let client = transport.client_mut();
client
.publish_write_complete()
.await
.map_err(map_client_error)
}
/// `Disconnect` — graceful close. Sends a signed Disconnect
/// envelope, then writes the NMF `End` record + shuts down the
/// stream.
pub async fn disconnect(&self) -> Result<(), Error> {
let mut transport = self.inner.transport.lock().await;
let client = transport.client_mut();
client.disconnect().await.map_err(map_client_error)?;
client.send_end().await.map_err(map_client_error)?;
Ok(())
}
}
fn map_client_error(err: mxaccess_asb::ClientError) -> Error {
Error::Connection(ConnectionError::TransportFailure {
detail: err.to_string(),
})
}
#[cfg(test)]
#[allow(
clippy::unwrap_used,
clippy::expect_used,
clippy::panic,
clippy::indexing_slicing
)]
mod tests {
use super::*;
/// Compile-time only: `AsbSession` must be `Clone + Send + Sync`
/// (the `mxaccess` consumer ergonomics contract).
#[test]
fn asb_session_is_clone_send_sync() {
fn assert_clone_send_sync<T: Clone + Send + Sync + 'static>() {}
assert_clone_send_sync::<AsbSession>();
}
}
+2
View File
@@ -29,9 +29,11 @@ pub use mxaccess_codec::{
// ---- Public types --------------------------------------------------------
pub mod asb_session;
pub mod session;
pub mod transport_asb;
pub use asb_session::AsbSession;
pub use transport_asb::AsbTransport;
pub use mxaccess_galaxy::{GalaxyTagMetadata, Resolver, ResolverError};