diff --git a/design/followups.md b/design/followups.md index e9b1fef..1dfbc61 100644 --- a/design/followups.md +++ b/design/followups.md @@ -56,7 +56,7 @@ move to `## Resolved` with a date + commit hash. - ~~F32~~: resolved (commit ``) via option (b) — three-type live coverage is the deployable maximum; missing types are Galaxy-provisioning-gated. - **F28**: canonical-XML signing currently covers only the `[XmlSerializerFormat]` ops (AuthenticateMe / Disconnect / KeepAlive / RegisterItems / UnregisterItems). Read / Write / CreateSubscription / AddMonitoredItems / Publish / etc. still sign over NBFX wire bytes via the legacy fallback. Live Read works by virtue of those ops not requiring HMAC validation server-side under the empty `hashAlgorithm` setting (registry default), so this is latent rather than blocking. Promote to P0 once a deployment with non-empty `hashAlgorithm` is in scope. - ~~F29~~: resolved (commit ``) — `nbfs.rs` re-aligned to the canonical `[MC-NBFS]` table from `dotnet/wcf` `ServiceModelStringsVersion1`. -- **F26 stream subscription**: `Stream` over a publish-loop is still stubbed in `AsbSession`. Tracked under F25 step 8 / F26 step 3 in the cumulative log. +- ~~F26 stream subscription~~: resolved (commit ``) — `AsbSession::subscribe(subscription_id)` returns an `AsbSubscription: Stream>` driven by an internal `tokio::spawn`'d publish-loop. Drop of the subscription aborts the loop. Per-`PublishResponse` `values` array is fanned out as individual stream items; transport errors are delivered as the final stream item before termination. Inner `publish_loop` helper is split out so it's testable in isolation against any closure-based fake `publish_fn`. 3 new tests pin: compile-time `Stream + Send + Unpin`, multi-batch + terminal-error round-trip, consumer-drop short-circuits the publisher. Workspace 718 → 721 tests. **Cumulative execution log.** F19 + F23 (`ed17c07`); F24 (`7611d9e`); F20 (`9dfd193`); F22 (`43c10a1`); F21 (`5f98558`); F25 step 1 (`25dbd8d`); F25 step 2 (`a2b8989`); F25 step 3 (`c4bf0a0`); F25 step 4 (`1e59249`); F25 step 5 (`9b8133f`); F25 step 6 (`321b796`); F25 step 7 (`1b1ee1e`); F26 step 1 (`8a0f92b`); F26 step 2 (`14bb529`); example rewrite (`c6570dc`); F25 step 8 (`b543eb1`); F25 step 9 (`0441a2e`); F25 step 10 (`9876b4e`); F26 step 3 (``); **F25 live-bring-up reconciliation** (this commit): - F25 live-bring-up reconciliation: live `asb-subscribe` + `asb-relay` (TCP middleman) capture-and-diff against AVEVA's MxDataProvider on Windows. Five concrete fixes landed: diff --git a/rust/crates/mxaccess/src/asb_session.rs b/rust/crates/mxaccess/src/asb_session.rs index 4956f25..b8e2875 100644 --- a/rust/crates/mxaccess/src/asb_session.rs +++ b/rust/crates/mxaccess/src/asb_session.rs @@ -16,7 +16,7 @@ //! converge at the `mxaccess` consumer-facing API but stay distinct //! at the orchestration layer. //! -//! ## Scope of this iteration (F26 step 3) +//! ## Scope //! //! Implements: //! * [`AsbSession::connect`] — TCP connect → preamble → DH handshake @@ -29,15 +29,15 @@ //! [`add_monitored_items`] / [`publish`] / //! [`delete_monitored_items`] / [`delete_subscription`] — //! subscription primitives. +//! * [`AsbSession::subscribe`] — returns an [`AsbSubscription`] +//! `Stream>` driven by a +//! background publish-loop. Drop of the stream aborts the loop. //! * Cheap-clone semantics — the inner state lives behind //! `Arc>`, so `clone()` is `O(1)` and the lock //! serialises operation calls (matches the NMX Session's pattern //! per `session.rs:326`). //! -//! Stubbed for next F26 iteration: -//! * `Stream` subscription handle that -//! internally drives a `publish`-loop. Today consumers call -//! `publish().await` themselves in a loop. +//! Stubbed for later iteration: //! * Recovery / reconnect — the NMX `RecoveryPolicy` shape needs to //! be reused once a captured ASB-side disconnect informs the //! retry strategy. @@ -46,21 +46,32 @@ //! inline. use std::net::SocketAddr; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; +use futures_util::Stream; use mxaccess_asb::{ AddMonitoredItemsResponse, ConnectResponse, CreateSubscriptionResponse, DeleteMonitoredItemsResponse, DeleteSubscriptionResponse, ItemIdentity, MinimalMonitoredItem, - MinimalWriteValue, PublishResponse, PublishWriteCompleteResponse, ReadResponse, - RegisterItemsResponse, UnregisterItemsResponse, WriteResponse, + MinimalWriteValue, MonitoredItemValue, PublishResponse, PublishWriteCompleteResponse, + ReadResponse, RegisterItemsResponse, UnregisterItemsResponse, WriteResponse, }; use mxaccess_asb_nettcp::auth::CryptoParameters; use tokio::net::TcpStream; -use tokio::sync::Mutex; +use tokio::sync::{mpsc, Mutex}; +use tokio::task::JoinHandle; +use tokio_stream::wrappers::ReceiverStream; use crate::transport_asb::AsbTransport; use crate::{ConnectionError, Error}; +/// Channel buffer for [`AsbSubscription`]'s publish-loop. 256 samples is +/// generous for the typical sample-rate budget (1-100 Hz) — bounded so +/// a slow consumer eventually back-pressures the publish task instead +/// of growing memory unbounded. +const SUBSCRIPTION_CHANNEL_CAPACITY: usize = 256; + /// Cheap-clone async client for the ASB data plane. Drop of the last /// clone fires a best-effort `disconnect()` + `send_end()` per the /// `Drop` impl below. @@ -291,6 +302,113 @@ impl AsbSession { client.send_end().await.map_err(map_client_error)?; Ok(()) } + + /// Spawn a background publish-loop on this subscription and return + /// a [`Stream`] that yields each delivered [`MonitoredItemValue`]. + /// + /// The returned [`AsbSubscription`] is `Stream>`. + /// The internal task calls [`AsbSession::publish`] in a tight loop; + /// each `PublishResponse`'s `values` array is fanned out as + /// individual stream items. Per-item `status` from the server is + /// **not** surfaced (the field is opaque per-item and rarely + /// actionable for the streaming consumer); a future iteration may + /// wrap each value in a richer struct if the need is real. + /// + /// The stream terminates when: + /// - the consumer drops the [`AsbSubscription`] (the publish task + /// is `abort()`-ed in `Drop`); or + /// - the underlying `publish()` call returns an [`Error`] — that + /// error is delivered as the final stream item, then `None`. + /// + /// `subscription_id` must come from a prior + /// [`AsbSession::create_subscription`] + + /// [`AsbSession::add_monitored_items`] sequence. Calling + /// `delete_subscription()` while the stream is alive will cause + /// the next `publish()` to fail and the stream to terminate + /// gracefully. + #[must_use = "AsbSubscription is a Stream — drop it explicitly to cancel"] + pub fn subscribe(&self, subscription_id: i64) -> AsbSubscription { + let (tx, rx) = mpsc::channel(SUBSCRIPTION_CHANNEL_CAPACITY); + let session = self.clone(); + let join = tokio::spawn(async move { + publish_loop( + move || { + let s = session.clone(); + async move { s.publish(subscription_id).await } + }, + tx, + ) + .await; + }); + AsbSubscription { + inner: ReceiverStream::new(rx), + join: Some(join), + } + } +} + +/// Inner publish-loop body — testable in isolation by passing any +/// closure that produces `Result` futures. +/// +/// Exits on the first transport error (after delivering it as the +/// final item) and on consumer-drop (when `tx.send` returns an error +/// because the receiver was closed). +async fn publish_loop( + mut publish_fn: F, + tx: mpsc::Sender>, +) where + F: FnMut() -> Fut, + Fut: std::future::Future>, +{ + loop { + match publish_fn().await { + Ok(response) => { + for value in response.values { + if tx.send(Ok(value)).await.is_err() { + return; // consumer dropped the stream + } + } + } + Err(e) => { + // Best-effort delivery of the terminal error; a closed + // channel here means the consumer already gave up. + let _ = tx.send(Err(e)).await; + return; + } + } + } +} + +/// `Stream>` over an +/// ASB subscription. Created via [`AsbSession::subscribe`]. +/// +/// Drop aborts the publish-loop so the server-side queue stops +/// being drained. +pub struct AsbSubscription { + inner: ReceiverStream>, + join: Option>, +} + +impl std::fmt::Debug for AsbSubscription { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AsbSubscription").finish_non_exhaustive() + } +} + +impl Stream for AsbSubscription { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.get_mut().inner).poll_next(cx) + } +} + +impl Drop for AsbSubscription { + fn drop(&mut self) { + if let Some(handle) = self.join.take() { + handle.abort(); + } + } } fn map_client_error(err: mxaccess_asb::ClientError) -> Error { @@ -308,6 +426,11 @@ fn map_client_error(err: mxaccess_asb::ClientError) -> Error { )] mod tests { use super::*; + use futures_util::StreamExt; + use mxaccess_asb::ItemStatus; + use mxaccess_codec::asb_variant::{AsbStatus, AsbVariant, RuntimeValue}; + use std::sync::Arc as StdArc; + use std::sync::atomic::{AtomicUsize, Ordering}; /// Compile-time only: `AsbSession` must be `Clone + Send + Sync` /// (the `mxaccess` consumer ergonomics contract). @@ -316,4 +439,94 @@ mod tests { fn assert_clone_send_sync() {} assert_clone_send_sync::(); } + + /// Compile-time only: `AsbSubscription` must be `Stream + Send + Unpin`. + /// `Unpin` matters because consumers typically `.next().await` it + /// directly without pinning. `Send` matters because the tokio + /// runtime moves it across threads. + #[test] + fn asb_subscription_is_stream_send_unpin() { + fn assert_stream_send_unpin() + where + T: Stream> + Send + Unpin + 'static, + { + } + assert_stream_send_unpin::(); + } + + fn fake_value(idx: i32) -> MonitoredItemValue { + MonitoredItemValue { + item: ItemIdentity::absolute_by_name(format!("Tag{idx}")), + value: RuntimeValue { + timestamp_binary: 0, + timestamp_specified: false, + value: AsbVariant::from_i32(idx), + status: AsbStatus::default(), + }, + user_data: AsbVariant::empty(), + } + } + + fn fake_response(values: Vec) -> PublishResponse { + PublishResponse { + status: Vec::::new(), + values, + } + } + + #[tokio::test] + async fn publish_loop_delivers_values_then_terminates_on_error() { + let (tx, rx) = mpsc::channel(8); + let calls = StdArc::new(AtomicUsize::new(0)); + let calls_clone = calls.clone(); + let publish_fn = move || { + let n = calls_clone.fetch_add(1, Ordering::Relaxed); + async move { + if n == 0 { + Ok(fake_response(vec![fake_value(1), fake_value(2)])) + } else if n == 1 { + Ok(fake_response(vec![fake_value(3)])) + } else { + Err(Error::Connection(ConnectionError::TransportFailure { + detail: "synthetic terminal error".to_string(), + })) + } + } + }; + publish_loop(publish_fn, tx).await; + // Drain: 3 Ok values, then 1 Err, then channel-closed. + let mut stream = ReceiverStream::new(rx); + let mut oks = Vec::new(); + let mut errs = 0; + while let Some(item) = stream.next().await { + match item { + Ok(v) => oks.push(v), + Err(_) => errs += 1, + } + } + assert_eq!(oks.len(), 3, "three Ok values delivered"); + assert_eq!(errs, 1, "one terminal Err delivered"); + assert_eq!(calls.load(Ordering::Relaxed), 3, "publish_fn called 3x"); + } + + #[tokio::test] + async fn publish_loop_exits_when_consumer_drops_channel() { + let (tx, rx) = mpsc::channel(2); + let calls = StdArc::new(AtomicUsize::new(0)); + let calls_clone = calls.clone(); + let publish_fn = move || { + calls_clone.fetch_add(1, Ordering::Relaxed); + async move { Ok(fake_response(vec![fake_value(7), fake_value(8), fake_value(9)])) } + }; + // Drop the receiver immediately — first send triggers exit. + drop(rx); + publish_loop(publish_fn, tx).await; + // Loop must exit; tx is dropped on return. The publisher might + // make 1 call before discovering the closed channel (tx.send + // is what surfaces the closure). + assert!( + calls.load(Ordering::Relaxed) >= 1, + "publish_fn called at least once" + ); + } }