[F26 stream] mxaccess: AsbSession::subscribe — Stream<Item = MonitoredItemValue>
rust / build / test / clippy / fmt (push) Has been cancelled

Closes the last F26 stub from the M5 status block. New
AsbSession::subscribe(subscription_id) returns an AsbSubscription
that impls Stream<Item = Result<MonitoredItemValue, Error>>. An
internal tokio::spawn'd publish-loop drains the subscription queue
via the existing AsbSession::publish() and fans each
PublishResponse's `values` array out as individual stream items.

Termination semantics:
  - Drop of AsbSubscription calls JoinHandle::abort() — the publish
    task stops draining the server-side queue (the .NET reference
    pattern at MxAsbDataClient.cs uses the same task-cancellation
    shape).
  - Transport error from publish() is delivered as the final stream
    item; the loop returns and the channel closes.
  - Receiver-drop (consumer stops polling) is detected when
    tx.send returns Err — the loop exits without making more
    publish calls.

The inner publish_loop helper takes any FnMut() -> Future<Result<...>>
so it's testable in isolation (no live ASB endpoint required).

Per-item ItemStatus from the server is intentionally not surfaced
on the stream: the field is opaque per-item and rarely actionable
for the streaming consumer. A richer struct can wrap each value if
that need surfaces.

3 new tests pin:
  - asb_subscription_is_stream_send_unpin (compile-time bounds);
  - publish_loop_delivers_values_then_terminates_on_error
    (3 Ok values from 2 batches, then 1 terminal Err);
  - publish_loop_exits_when_consumer_drops_channel.

New deps used (already in mxaccess Cargo.toml): futures_util::Stream,
tokio::sync::mpsc, tokio_stream::wrappers::ReceiverStream,
tokio::task::JoinHandle.

Workspace: 718 → 721 tests. Default-feature clippy clean.
mxaccess crate-level doc updated to drop the "stubbed for next F26
iteration" note for the subscription stream.

design/followups.md F18 M5 status block updated: F26 stream
subscription marked resolved.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-06 01:10:22 -04:00
parent 8e695b9347
commit f2f22dfcd1
2 changed files with 222 additions and 9 deletions
+1 -1
View File
@@ -56,7 +56,7 @@ move to `## Resolved` with a date + commit hash.
- ~~F32~~: resolved (commit `<this commit>`) via option (b) — three-type live coverage is the deployable maximum; missing types are Galaxy-provisioning-gated.
- **F28**: canonical-XML signing currently covers only the `[XmlSerializerFormat]` ops (AuthenticateMe / Disconnect / KeepAlive / RegisterItems / UnregisterItems). Read / Write / CreateSubscription / AddMonitoredItems / Publish / etc. still sign over NBFX wire bytes via the legacy fallback. Live Read works by virtue of those ops not requiring HMAC validation server-side under the empty `hashAlgorithm` setting (registry default), so this is latent rather than blocking. Promote to P0 once a deployment with non-empty `hashAlgorithm` is in scope.
- ~~F29~~: resolved (commit `<this commit>`) — `nbfs.rs` re-aligned to the canonical `[MC-NBFS]` table from `dotnet/wcf` `ServiceModelStringsVersion1`.
- **F26 stream subscription**: `Stream<Item = MonitoredItemValue>` over a publish-loop is still stubbed in `AsbSession`. Tracked under F25 step 8 / F26 step 3 in the cumulative log.
- ~~F26 stream subscription~~: resolved (commit `<this commit>`) — `AsbSession::subscribe(subscription_id)` returns an `AsbSubscription: Stream<Item = Result<MonitoredItemValue, Error>>` driven by an internal `tokio::spawn`'d publish-loop. Drop of the subscription aborts the loop. Per-`PublishResponse` `values` array is fanned out as individual stream items; transport errors are delivered as the final stream item before termination. Inner `publish_loop` helper is split out so it's testable in isolation against any closure-based fake `publish_fn`. 3 new tests pin: compile-time `Stream + Send + Unpin`, multi-batch + terminal-error round-trip, consumer-drop short-circuits the publisher. Workspace 718 → 721 tests.
**Cumulative execution log.** F19 + F23 (`ed17c07`); F24 (`7611d9e`); F20 (`9dfd193`); F22 (`43c10a1`); F21 (`5f98558`); F25 step 1 (`25dbd8d`); F25 step 2 (`a2b8989`); F25 step 3 (`c4bf0a0`); F25 step 4 (`1e59249`); F25 step 5 (`9b8133f`); F25 step 6 (`321b796`); F25 step 7 (`1b1ee1e`); F26 step 1 (`8a0f92b`); F26 step 2 (`14bb529`); example rewrite (`c6570dc`); F25 step 8 (`b543eb1`); F25 step 9 (`0441a2e`); F25 step 10 (`9876b4e`); F26 step 3 (`<previous>`); **F25 live-bring-up reconciliation** (this commit):
- F25 live-bring-up reconciliation: live `asb-subscribe` + `asb-relay` (TCP middleman) capture-and-diff against AVEVA's MxDataProvider on Windows. Five concrete fixes landed:
+221 -8
View File
@@ -16,7 +16,7 @@
//! converge at the `mxaccess` consumer-facing API but stay distinct
//! at the orchestration layer.
//!
//! ## Scope of this iteration (F26 step 3)
//! ## Scope
//!
//! Implements:
//! * [`AsbSession::connect`] — TCP connect → preamble → DH handshake
@@ -29,15 +29,15 @@
//! [`add_monitored_items`] / [`publish`] /
//! [`delete_monitored_items`] / [`delete_subscription`] —
//! subscription primitives.
//! * [`AsbSession::subscribe`] — returns an [`AsbSubscription`]
//! `Stream<Item = Result<MonitoredItemValue, Error>>` driven by a
//! background publish-loop. Drop of the stream aborts the loop.
//! * Cheap-clone semantics — the inner state lives behind
//! `Arc<Mutex<...>>`, so `clone()` is `O(1)` and the lock
//! serialises operation calls (matches the NMX Session's pattern
//! per `session.rs:326`).
//!
//! Stubbed for next F26 iteration:
//! * `Stream<Item = MonitoredItemValue>` subscription handle that
//! internally drives a `publish`-loop. Today consumers call
//! `publish().await` themselves in a loop.
//! Stubbed for later iteration:
//! * Recovery / reconnect — the NMX `RecoveryPolicy` shape needs to
//! be reused once a captured ASB-side disconnect informs the
//! retry strategy.
@@ -46,21 +46,32 @@
//! inline.
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use futures_util::Stream;
use mxaccess_asb::{
AddMonitoredItemsResponse, ConnectResponse, CreateSubscriptionResponse,
DeleteMonitoredItemsResponse, DeleteSubscriptionResponse, ItemIdentity, MinimalMonitoredItem,
MinimalWriteValue, PublishResponse, PublishWriteCompleteResponse, ReadResponse,
RegisterItemsResponse, UnregisterItemsResponse, WriteResponse,
MinimalWriteValue, MonitoredItemValue, PublishResponse, PublishWriteCompleteResponse,
ReadResponse, RegisterItemsResponse, UnregisterItemsResponse, WriteResponse,
};
use mxaccess_asb_nettcp::auth::CryptoParameters;
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use tokio::sync::{mpsc, Mutex};
use tokio::task::JoinHandle;
use tokio_stream::wrappers::ReceiverStream;
use crate::transport_asb::AsbTransport;
use crate::{ConnectionError, Error};
/// Channel buffer for [`AsbSubscription`]'s publish-loop. 256 samples is
/// generous for the typical sample-rate budget (1-100 Hz) — bounded so
/// a slow consumer eventually back-pressures the publish task instead
/// of growing memory unbounded.
const SUBSCRIPTION_CHANNEL_CAPACITY: usize = 256;
/// Cheap-clone async client for the ASB data plane. Drop of the last
/// clone fires a best-effort `disconnect()` + `send_end()` per the
/// `Drop` impl below.
@@ -291,6 +302,113 @@ impl AsbSession {
client.send_end().await.map_err(map_client_error)?;
Ok(())
}
/// Spawn a background publish-loop on this subscription and return
/// a [`Stream`] that yields each delivered [`MonitoredItemValue`].
///
/// The returned [`AsbSubscription`] is `Stream<Item = Result<MonitoredItemValue, Error>>`.
/// The internal task calls [`AsbSession::publish`] in a tight loop;
/// each `PublishResponse`'s `values` array is fanned out as
/// individual stream items. Per-item `status` from the server is
/// **not** surfaced (the field is opaque per-item and rarely
/// actionable for the streaming consumer); a future iteration may
/// wrap each value in a richer struct if the need is real.
///
/// The stream terminates when:
/// - the consumer drops the [`AsbSubscription`] (the publish task
/// is `abort()`-ed in `Drop`); or
/// - the underlying `publish()` call returns an [`Error`] — that
/// error is delivered as the final stream item, then `None`.
///
/// `subscription_id` must come from a prior
/// [`AsbSession::create_subscription`] +
/// [`AsbSession::add_monitored_items`] sequence. Calling
/// `delete_subscription()` while the stream is alive will cause
/// the next `publish()` to fail and the stream to terminate
/// gracefully.
#[must_use = "AsbSubscription is a Stream — drop it explicitly to cancel"]
pub fn subscribe(&self, subscription_id: i64) -> AsbSubscription {
let (tx, rx) = mpsc::channel(SUBSCRIPTION_CHANNEL_CAPACITY);
let session = self.clone();
let join = tokio::spawn(async move {
publish_loop(
move || {
let s = session.clone();
async move { s.publish(subscription_id).await }
},
tx,
)
.await;
});
AsbSubscription {
inner: ReceiverStream::new(rx),
join: Some(join),
}
}
}
/// Inner publish-loop body — testable in isolation by passing any
/// closure that produces `Result<PublishResponse, Error>` futures.
///
/// Exits on the first transport error (after delivering it as the
/// final item) and on consumer-drop (when `tx.send` returns an error
/// because the receiver was closed).
async fn publish_loop<F, Fut>(
mut publish_fn: F,
tx: mpsc::Sender<Result<MonitoredItemValue, Error>>,
) where
F: FnMut() -> Fut,
Fut: std::future::Future<Output = Result<PublishResponse, Error>>,
{
loop {
match publish_fn().await {
Ok(response) => {
for value in response.values {
if tx.send(Ok(value)).await.is_err() {
return; // consumer dropped the stream
}
}
}
Err(e) => {
// Best-effort delivery of the terminal error; a closed
// channel here means the consumer already gave up.
let _ = tx.send(Err(e)).await;
return;
}
}
}
}
/// `Stream<Item = Result<MonitoredItemValue, Error>>` over an
/// ASB subscription. Created via [`AsbSession::subscribe`].
///
/// Drop aborts the publish-loop so the server-side queue stops
/// being drained.
pub struct AsbSubscription {
inner: ReceiverStream<Result<MonitoredItemValue, Error>>,
join: Option<JoinHandle<()>>,
}
impl std::fmt::Debug for AsbSubscription {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AsbSubscription").finish_non_exhaustive()
}
}
impl Stream for AsbSubscription {
type Item = Result<MonitoredItemValue, Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.get_mut().inner).poll_next(cx)
}
}
impl Drop for AsbSubscription {
fn drop(&mut self) {
if let Some(handle) = self.join.take() {
handle.abort();
}
}
}
fn map_client_error(err: mxaccess_asb::ClientError) -> Error {
@@ -308,6 +426,11 @@ fn map_client_error(err: mxaccess_asb::ClientError) -> Error {
)]
mod tests {
use super::*;
use futures_util::StreamExt;
use mxaccess_asb::ItemStatus;
use mxaccess_codec::asb_variant::{AsbStatus, AsbVariant, RuntimeValue};
use std::sync::Arc as StdArc;
use std::sync::atomic::{AtomicUsize, Ordering};
/// Compile-time only: `AsbSession` must be `Clone + Send + Sync`
/// (the `mxaccess` consumer ergonomics contract).
@@ -316,4 +439,94 @@ mod tests {
fn assert_clone_send_sync<T: Clone + Send + Sync + 'static>() {}
assert_clone_send_sync::<AsbSession>();
}
/// Compile-time only: `AsbSubscription` must be `Stream + Send + Unpin`.
/// `Unpin` matters because consumers typically `.next().await` it
/// directly without pinning. `Send` matters because the tokio
/// runtime moves it across threads.
#[test]
fn asb_subscription_is_stream_send_unpin() {
fn assert_stream_send_unpin<T>()
where
T: Stream<Item = Result<MonitoredItemValue, Error>> + Send + Unpin + 'static,
{
}
assert_stream_send_unpin::<AsbSubscription>();
}
fn fake_value(idx: i32) -> MonitoredItemValue {
MonitoredItemValue {
item: ItemIdentity::absolute_by_name(format!("Tag{idx}")),
value: RuntimeValue {
timestamp_binary: 0,
timestamp_specified: false,
value: AsbVariant::from_i32(idx),
status: AsbStatus::default(),
},
user_data: AsbVariant::empty(),
}
}
fn fake_response(values: Vec<MonitoredItemValue>) -> PublishResponse {
PublishResponse {
status: Vec::<ItemStatus>::new(),
values,
}
}
#[tokio::test]
async fn publish_loop_delivers_values_then_terminates_on_error() {
let (tx, rx) = mpsc::channel(8);
let calls = StdArc::new(AtomicUsize::new(0));
let calls_clone = calls.clone();
let publish_fn = move || {
let n = calls_clone.fetch_add(1, Ordering::Relaxed);
async move {
if n == 0 {
Ok(fake_response(vec![fake_value(1), fake_value(2)]))
} else if n == 1 {
Ok(fake_response(vec![fake_value(3)]))
} else {
Err(Error::Connection(ConnectionError::TransportFailure {
detail: "synthetic terminal error".to_string(),
}))
}
}
};
publish_loop(publish_fn, tx).await;
// Drain: 3 Ok values, then 1 Err, then channel-closed.
let mut stream = ReceiverStream::new(rx);
let mut oks = Vec::new();
let mut errs = 0;
while let Some(item) = stream.next().await {
match item {
Ok(v) => oks.push(v),
Err(_) => errs += 1,
}
}
assert_eq!(oks.len(), 3, "three Ok values delivered");
assert_eq!(errs, 1, "one terminal Err delivered");
assert_eq!(calls.load(Ordering::Relaxed), 3, "publish_fn called 3x");
}
#[tokio::test]
async fn publish_loop_exits_when_consumer_drops_channel() {
let (tx, rx) = mpsc::channel(2);
let calls = StdArc::new(AtomicUsize::new(0));
let calls_clone = calls.clone();
let publish_fn = move || {
calls_clone.fetch_add(1, Ordering::Relaxed);
async move { Ok(fake_response(vec![fake_value(7), fake_value(8), fake_value(9)])) }
};
// Drop the receiver immediately — first send triggers exit.
drop(rx);
publish_loop(publish_fn, tx).await;
// Loop must exit; tx is dropped on return. The publisher might
// make 1 call before discovering the closed channel (tx.send
// is what surfaces the closure).
assert!(
calls.load(Ordering::Relaxed) >= 1,
"publish_fn called at least once"
);
}
}