[M5] mxaccess: F26 step 2 — AsbTransport::connect TCP+preamble+handshake

Adds the `tokio::net::TcpStream`-specialised async constructor that
owns the full transport-bring-up sequence:

  TCP connect → NMF preamble → DH Connect → AuthenticateMe (one-way)

Signature:
```
async fn connect(
    endpoint: SocketAddr,
    passphrase: &str,
    crypto_parameters: &CryptoParameters,
    via_uri: impl Into<String>,
    connection_id: [u8; 16],
) -> Result<(AsbTransport<TcpStream>, ConnectResponse), Error>
```

Returns the `ConnectResponse` alongside the transport so callers can
inspect the negotiated `connection_lifetime` (the `:V2` suffix
toggles Apollo vs Baktun encryption — see F23).

New error variant: `ConnectionError::TransportFailure { detail }`
covers all transport-bring-up failure modes (NMF / NBFX / auth /
peer Fault). The underlying error type is intentionally erased to
keep the public taxonomy small; `detail` carries the Display
representation.

Errors are mapped at the AsbClient / AuthError boundary via private
`map_client_error` / `map_auth_error` helpers.

1 new test:
* `connect_to_unreachable_endpoint_surfaces_connection_error` — TCP
  connect to 127.0.0.1:1 (TCPMUX-reserved) cleanly errors without
  panicking. Smoke test for the constructor signature + error path.

Stubbed for F26 step 3:
* `Session::connect_asb` constructor — the SessionInner refactor to
  host both NMX + ASB transports under one struct is heavier than
  this iteration's scope.
* Operation-routing layer that maps ASB result types (ItemStatus,
  RuntimeValue) back to mxaccess types (MxStatus, DataChange,
  MxValue).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-05 12:14:16 -04:00
parent 8a0f92b6bc
commit 14bb5297a8
3 changed files with 103 additions and 4 deletions
+7
View File
@@ -301,6 +301,13 @@ pub enum ConnectionError {
CallbackProxyMissing,
#[error("engine not registered (UninitializedObject / ERROR_INVALID_STATE)")]
EngineNotRegistered,
/// Transport bring-up failed during preamble exchange or
/// authentication handshake. `detail` is the underlying error
/// message — the original error type is intentionally erased to
/// keep the public taxonomy small. ASB-specific (F26 step 2);
/// `EngineNotRegistered` covers the analogous NMX failure mode.
#[error("transport bring-up failed: {detail}")]
TransportFailure { detail: String },
}
#[derive(Debug, thiserror::Error)]
+91 -3
View File
@@ -31,10 +31,14 @@
//! `CreateSubscription` + `AddMonitoredItems` + `Publish`-callback
//! pipeline; the F25 subscription operations are not yet wired up.
use mxaccess_asb::AsbClient;
use tokio::io::{AsyncRead, AsyncWrite};
use std::net::SocketAddr;
use crate::{Transport, TransportCapabilities, TransportKind};
use mxaccess_asb::{AsbClient, ClientError, ConnectResponse};
use mxaccess_asb_nettcp::auth::{AsbAuthenticator, CryptoParameters};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
use crate::{Error, Transport, TransportCapabilities, TransportKind};
/// `Transport` implementation for the ASB (`net.tcp` + binary-message-
/// encoder) data plane. Owns the underlying [`AsbClient`].
@@ -66,6 +70,68 @@ impl<T: AsyncRead + AsyncWrite + Unpin + Send + 'static> AsbTransport<T> {
}
}
impl AsbTransport<TcpStream> {
/// `tokio::net::TcpStream`-specialised constructor: opens the TCP
/// connection, runs the F20 preamble exchange, and runs the F25
/// step-6 DH `Connect` + `AuthenticateMe` handshake. Returns a
/// transport ready for operation calls.
///
/// The `via_uri` is the `net.tcp://host:port/path` URL the peer
/// expects in the [MS-NMF] `ViaRecord`. `passphrase` is the
/// solution-shared secret (typically read from DPAPI on a real
/// install — see F23's `dpapi` feature gate; tests / CI pass it
/// directly via `AsbCredentials::shared_secret(...)` once that
/// type lands).
///
/// `crypto_parameters` controls the DH prime / generator / hash
/// algorithm; pass [`CryptoParameters::defaults`] for a stock
/// AVEVA install.
///
/// `connection_id` should typically be a freshly-generated UUID
/// (e.g. `Uuid::new_v4().into_bytes()`). Tests pin it for
/// determinism.
///
/// # Errors
///
/// Surfaces all transport-bring-up failure modes as
/// [`Error::Connection`]:
/// * TCP connect fails.
/// * NMF preamble exchange fails (peer responded with `Fault` or
/// an unexpected record).
/// * DH `Connect` operation fails.
/// * Encrypted authentication-data assembly fails.
/// * `AuthenticateMe` write fails.
pub async fn connect(
endpoint: SocketAddr,
passphrase: &str,
crypto_parameters: &CryptoParameters,
via_uri: impl Into<String>,
connection_id: [u8; 16],
) -> Result<(Self, ConnectResponse), Error> {
let stream = TcpStream::connect(endpoint).await.map_err(Error::Io)?;
let authenticator = AsbAuthenticator::new(passphrase, crypto_parameters, connection_id)
.map_err(map_auth_error)?;
let mut client = AsbClient::new(stream, authenticator, via_uri);
client.send_preamble().await.map_err(map_client_error)?;
let response = client.connect().await.map_err(map_client_error)?;
Ok((Self::new(client), response))
}
}
fn map_client_error(err: ClientError) -> Error {
use crate::ConnectionError;
Error::Connection(ConnectionError::TransportFailure {
detail: err.to_string(),
})
}
fn map_auth_error(err: mxaccess_asb_nettcp::auth::AuthError) -> Error {
use crate::ConnectionError;
Error::Connection(ConnectionError::TransportFailure {
detail: err.to_string(),
})
}
/// Compile-time only: `AsbTransport` must be `Send + Sync + 'static`
/// (the `Transport` trait bound). Sync is provided by `AsbClient`'s
/// internal lack of interior mutability over non-Sync types — the
@@ -116,6 +182,28 @@ mod tests {
assert_eq!(transport.kind(), TransportKind::Asb);
}
#[tokio::test]
async fn connect_to_unreachable_endpoint_surfaces_connection_error() {
// Bind to a port that won't accept connections. Address
// 127.0.0.1:1 is reserved (TCPMUX) and almost always closed,
// so connect() should fail immediately. Whether it surfaces
// as Io or Connection depends on the platform; we just assert
// that it errors cleanly without panicking.
let endpoint = "127.0.0.1:1".parse::<std::net::SocketAddr>().unwrap();
let result = AsbTransport::<TcpStream>::connect(
endpoint,
"test-passphrase",
&CryptoParameters::defaults(),
"net.tcp://127.0.0.1:1/asb",
[0u8; 16],
)
.await;
assert!(
result.is_err(),
"expected connect to unreachable endpoint to fail"
);
}
#[test]
fn asb_transport_capabilities_disable_buffered_and_activate_suspend() {
let (client_end, _peer) = tokio::io::duplex(64);