[F26 stream] mxaccess: AsbSession::subscribe — Stream<Item = MonitoredItemValue>
rust / build / test / clippy / fmt (push) Has been cancelled
rust / build / test / clippy / fmt (push) Has been cancelled
Closes the last F26 stub from the M5 status block. New
AsbSession::subscribe(subscription_id) returns an AsbSubscription
that impls Stream<Item = Result<MonitoredItemValue, Error>>. An
internal tokio::spawn'd publish-loop drains the subscription queue
via the existing AsbSession::publish() and fans each
PublishResponse's `values` array out as individual stream items.
Termination semantics:
- Drop of AsbSubscription calls JoinHandle::abort() — the publish
task stops draining the server-side queue (the .NET reference
pattern at MxAsbDataClient.cs uses the same task-cancellation
shape).
- Transport error from publish() is delivered as the final stream
item; the loop returns and the channel closes.
- Receiver-drop (consumer stops polling) is detected when
tx.send returns Err — the loop exits without making more
publish calls.
The inner publish_loop helper takes any FnMut() -> Future<Result<...>>
so it's testable in isolation (no live ASB endpoint required).
Per-item ItemStatus from the server is intentionally not surfaced
on the stream: the field is opaque per-item and rarely actionable
for the streaming consumer. A richer struct can wrap each value if
that need surfaces.
3 new tests pin:
- asb_subscription_is_stream_send_unpin (compile-time bounds);
- publish_loop_delivers_values_then_terminates_on_error
(3 Ok values from 2 batches, then 1 terminal Err);
- publish_loop_exits_when_consumer_drops_channel.
New deps used (already in mxaccess Cargo.toml): futures_util::Stream,
tokio::sync::mpsc, tokio_stream::wrappers::ReceiverStream,
tokio::task::JoinHandle.
Workspace: 718 → 721 tests. Default-feature clippy clean.
mxaccess crate-level doc updated to drop the "stubbed for next F26
iteration" note for the subscription stream.
design/followups.md F18 M5 status block updated: F26 stream
subscription marked resolved.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
+1
-1
@@ -56,7 +56,7 @@ move to `## Resolved` with a date + commit hash.
|
|||||||
- ~~F32~~: resolved (commit `<this commit>`) via option (b) — three-type live coverage is the deployable maximum; missing types are Galaxy-provisioning-gated.
|
- ~~F32~~: resolved (commit `<this commit>`) via option (b) — three-type live coverage is the deployable maximum; missing types are Galaxy-provisioning-gated.
|
||||||
- **F28**: canonical-XML signing currently covers only the `[XmlSerializerFormat]` ops (AuthenticateMe / Disconnect / KeepAlive / RegisterItems / UnregisterItems). Read / Write / CreateSubscription / AddMonitoredItems / Publish / etc. still sign over NBFX wire bytes via the legacy fallback. Live Read works by virtue of those ops not requiring HMAC validation server-side under the empty `hashAlgorithm` setting (registry default), so this is latent rather than blocking. Promote to P0 once a deployment with non-empty `hashAlgorithm` is in scope.
|
- **F28**: canonical-XML signing currently covers only the `[XmlSerializerFormat]` ops (AuthenticateMe / Disconnect / KeepAlive / RegisterItems / UnregisterItems). Read / Write / CreateSubscription / AddMonitoredItems / Publish / etc. still sign over NBFX wire bytes via the legacy fallback. Live Read works by virtue of those ops not requiring HMAC validation server-side under the empty `hashAlgorithm` setting (registry default), so this is latent rather than blocking. Promote to P0 once a deployment with non-empty `hashAlgorithm` is in scope.
|
||||||
- ~~F29~~: resolved (commit `<this commit>`) — `nbfs.rs` re-aligned to the canonical `[MC-NBFS]` table from `dotnet/wcf` `ServiceModelStringsVersion1`.
|
- ~~F29~~: resolved (commit `<this commit>`) — `nbfs.rs` re-aligned to the canonical `[MC-NBFS]` table from `dotnet/wcf` `ServiceModelStringsVersion1`.
|
||||||
- **F26 stream subscription**: `Stream<Item = MonitoredItemValue>` over a publish-loop is still stubbed in `AsbSession`. Tracked under F25 step 8 / F26 step 3 in the cumulative log.
|
- ~~F26 stream subscription~~: resolved (commit `<this commit>`) — `AsbSession::subscribe(subscription_id)` returns an `AsbSubscription: Stream<Item = Result<MonitoredItemValue, Error>>` driven by an internal `tokio::spawn`'d publish-loop. Drop of the subscription aborts the loop. Per-`PublishResponse` `values` array is fanned out as individual stream items; transport errors are delivered as the final stream item before termination. Inner `publish_loop` helper is split out so it's testable in isolation against any closure-based fake `publish_fn`. 3 new tests pin: compile-time `Stream + Send + Unpin`, multi-batch + terminal-error round-trip, consumer-drop short-circuits the publisher. Workspace 718 → 721 tests.
|
||||||
|
|
||||||
**Cumulative execution log.** F19 + F23 (`ed17c07`); F24 (`7611d9e`); F20 (`9dfd193`); F22 (`43c10a1`); F21 (`5f98558`); F25 step 1 (`25dbd8d`); F25 step 2 (`a2b8989`); F25 step 3 (`c4bf0a0`); F25 step 4 (`1e59249`); F25 step 5 (`9b8133f`); F25 step 6 (`321b796`); F25 step 7 (`1b1ee1e`); F26 step 1 (`8a0f92b`); F26 step 2 (`14bb529`); example rewrite (`c6570dc`); F25 step 8 (`b543eb1`); F25 step 9 (`0441a2e`); F25 step 10 (`9876b4e`); F26 step 3 (`<previous>`); **F25 live-bring-up reconciliation** (this commit):
|
**Cumulative execution log.** F19 + F23 (`ed17c07`); F24 (`7611d9e`); F20 (`9dfd193`); F22 (`43c10a1`); F21 (`5f98558`); F25 step 1 (`25dbd8d`); F25 step 2 (`a2b8989`); F25 step 3 (`c4bf0a0`); F25 step 4 (`1e59249`); F25 step 5 (`9b8133f`); F25 step 6 (`321b796`); F25 step 7 (`1b1ee1e`); F26 step 1 (`8a0f92b`); F26 step 2 (`14bb529`); example rewrite (`c6570dc`); F25 step 8 (`b543eb1`); F25 step 9 (`0441a2e`); F25 step 10 (`9876b4e`); F26 step 3 (`<previous>`); **F25 live-bring-up reconciliation** (this commit):
|
||||||
- F25 live-bring-up reconciliation: live `asb-subscribe` + `asb-relay` (TCP middleman) capture-and-diff against AVEVA's MxDataProvider on Windows. Five concrete fixes landed:
|
- F25 live-bring-up reconciliation: live `asb-subscribe` + `asb-relay` (TCP middleman) capture-and-diff against AVEVA's MxDataProvider on Windows. Five concrete fixes landed:
|
||||||
|
|||||||
@@ -16,7 +16,7 @@
|
|||||||
//! converge at the `mxaccess` consumer-facing API but stay distinct
|
//! converge at the `mxaccess` consumer-facing API but stay distinct
|
||||||
//! at the orchestration layer.
|
//! at the orchestration layer.
|
||||||
//!
|
//!
|
||||||
//! ## Scope of this iteration (F26 step 3)
|
//! ## Scope
|
||||||
//!
|
//!
|
||||||
//! Implements:
|
//! Implements:
|
||||||
//! * [`AsbSession::connect`] — TCP connect → preamble → DH handshake
|
//! * [`AsbSession::connect`] — TCP connect → preamble → DH handshake
|
||||||
@@ -29,15 +29,15 @@
|
|||||||
//! [`add_monitored_items`] / [`publish`] /
|
//! [`add_monitored_items`] / [`publish`] /
|
||||||
//! [`delete_monitored_items`] / [`delete_subscription`] —
|
//! [`delete_monitored_items`] / [`delete_subscription`] —
|
||||||
//! subscription primitives.
|
//! subscription primitives.
|
||||||
|
//! * [`AsbSession::subscribe`] — returns an [`AsbSubscription`]
|
||||||
|
//! `Stream<Item = Result<MonitoredItemValue, Error>>` driven by a
|
||||||
|
//! background publish-loop. Drop of the stream aborts the loop.
|
||||||
//! * Cheap-clone semantics — the inner state lives behind
|
//! * Cheap-clone semantics — the inner state lives behind
|
||||||
//! `Arc<Mutex<...>>`, so `clone()` is `O(1)` and the lock
|
//! `Arc<Mutex<...>>`, so `clone()` is `O(1)` and the lock
|
||||||
//! serialises operation calls (matches the NMX Session's pattern
|
//! serialises operation calls (matches the NMX Session's pattern
|
||||||
//! per `session.rs:326`).
|
//! per `session.rs:326`).
|
||||||
//!
|
//!
|
||||||
//! Stubbed for next F26 iteration:
|
//! Stubbed for later iteration:
|
||||||
//! * `Stream<Item = MonitoredItemValue>` subscription handle that
|
|
||||||
//! internally drives a `publish`-loop. Today consumers call
|
|
||||||
//! `publish().await` themselves in a loop.
|
|
||||||
//! * Recovery / reconnect — the NMX `RecoveryPolicy` shape needs to
|
//! * Recovery / reconnect — the NMX `RecoveryPolicy` shape needs to
|
||||||
//! be reused once a captured ASB-side disconnect informs the
|
//! be reused once a captured ASB-side disconnect informs the
|
||||||
//! retry strategy.
|
//! retry strategy.
|
||||||
@@ -46,21 +46,32 @@
|
|||||||
//! inline.
|
//! inline.
|
||||||
|
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
|
use std::pin::Pin;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
|
use futures_util::Stream;
|
||||||
use mxaccess_asb::{
|
use mxaccess_asb::{
|
||||||
AddMonitoredItemsResponse, ConnectResponse, CreateSubscriptionResponse,
|
AddMonitoredItemsResponse, ConnectResponse, CreateSubscriptionResponse,
|
||||||
DeleteMonitoredItemsResponse, DeleteSubscriptionResponse, ItemIdentity, MinimalMonitoredItem,
|
DeleteMonitoredItemsResponse, DeleteSubscriptionResponse, ItemIdentity, MinimalMonitoredItem,
|
||||||
MinimalWriteValue, PublishResponse, PublishWriteCompleteResponse, ReadResponse,
|
MinimalWriteValue, MonitoredItemValue, PublishResponse, PublishWriteCompleteResponse,
|
||||||
RegisterItemsResponse, UnregisterItemsResponse, WriteResponse,
|
ReadResponse, RegisterItemsResponse, UnregisterItemsResponse, WriteResponse,
|
||||||
};
|
};
|
||||||
use mxaccess_asb_nettcp::auth::CryptoParameters;
|
use mxaccess_asb_nettcp::auth::CryptoParameters;
|
||||||
use tokio::net::TcpStream;
|
use tokio::net::TcpStream;
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::{mpsc, Mutex};
|
||||||
|
use tokio::task::JoinHandle;
|
||||||
|
use tokio_stream::wrappers::ReceiverStream;
|
||||||
|
|
||||||
use crate::transport_asb::AsbTransport;
|
use crate::transport_asb::AsbTransport;
|
||||||
use crate::{ConnectionError, Error};
|
use crate::{ConnectionError, Error};
|
||||||
|
|
||||||
|
/// Channel buffer for [`AsbSubscription`]'s publish-loop. 256 samples is
|
||||||
|
/// generous for the typical sample-rate budget (1-100 Hz) — bounded so
|
||||||
|
/// a slow consumer eventually back-pressures the publish task instead
|
||||||
|
/// of growing memory unbounded.
|
||||||
|
const SUBSCRIPTION_CHANNEL_CAPACITY: usize = 256;
|
||||||
|
|
||||||
/// Cheap-clone async client for the ASB data plane. Drop of the last
|
/// Cheap-clone async client for the ASB data plane. Drop of the last
|
||||||
/// clone fires a best-effort `disconnect()` + `send_end()` per the
|
/// clone fires a best-effort `disconnect()` + `send_end()` per the
|
||||||
/// `Drop` impl below.
|
/// `Drop` impl below.
|
||||||
@@ -291,6 +302,113 @@ impl AsbSession {
|
|||||||
client.send_end().await.map_err(map_client_error)?;
|
client.send_end().await.map_err(map_client_error)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Spawn a background publish-loop on this subscription and return
|
||||||
|
/// a [`Stream`] that yields each delivered [`MonitoredItemValue`].
|
||||||
|
///
|
||||||
|
/// The returned [`AsbSubscription`] is `Stream<Item = Result<MonitoredItemValue, Error>>`.
|
||||||
|
/// The internal task calls [`AsbSession::publish`] in a tight loop;
|
||||||
|
/// each `PublishResponse`'s `values` array is fanned out as
|
||||||
|
/// individual stream items. Per-item `status` from the server is
|
||||||
|
/// **not** surfaced (the field is opaque per-item and rarely
|
||||||
|
/// actionable for the streaming consumer); a future iteration may
|
||||||
|
/// wrap each value in a richer struct if the need is real.
|
||||||
|
///
|
||||||
|
/// The stream terminates when:
|
||||||
|
/// - the consumer drops the [`AsbSubscription`] (the publish task
|
||||||
|
/// is `abort()`-ed in `Drop`); or
|
||||||
|
/// - the underlying `publish()` call returns an [`Error`] — that
|
||||||
|
/// error is delivered as the final stream item, then `None`.
|
||||||
|
///
|
||||||
|
/// `subscription_id` must come from a prior
|
||||||
|
/// [`AsbSession::create_subscription`] +
|
||||||
|
/// [`AsbSession::add_monitored_items`] sequence. Calling
|
||||||
|
/// `delete_subscription()` while the stream is alive will cause
|
||||||
|
/// the next `publish()` to fail and the stream to terminate
|
||||||
|
/// gracefully.
|
||||||
|
#[must_use = "AsbSubscription is a Stream — drop it explicitly to cancel"]
|
||||||
|
pub fn subscribe(&self, subscription_id: i64) -> AsbSubscription {
|
||||||
|
let (tx, rx) = mpsc::channel(SUBSCRIPTION_CHANNEL_CAPACITY);
|
||||||
|
let session = self.clone();
|
||||||
|
let join = tokio::spawn(async move {
|
||||||
|
publish_loop(
|
||||||
|
move || {
|
||||||
|
let s = session.clone();
|
||||||
|
async move { s.publish(subscription_id).await }
|
||||||
|
},
|
||||||
|
tx,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
});
|
||||||
|
AsbSubscription {
|
||||||
|
inner: ReceiverStream::new(rx),
|
||||||
|
join: Some(join),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Inner publish-loop body — testable in isolation by passing any
|
||||||
|
/// closure that produces `Result<PublishResponse, Error>` futures.
|
||||||
|
///
|
||||||
|
/// Exits on the first transport error (after delivering it as the
|
||||||
|
/// final item) and on consumer-drop (when `tx.send` returns an error
|
||||||
|
/// because the receiver was closed).
|
||||||
|
async fn publish_loop<F, Fut>(
|
||||||
|
mut publish_fn: F,
|
||||||
|
tx: mpsc::Sender<Result<MonitoredItemValue, Error>>,
|
||||||
|
) where
|
||||||
|
F: FnMut() -> Fut,
|
||||||
|
Fut: std::future::Future<Output = Result<PublishResponse, Error>>,
|
||||||
|
{
|
||||||
|
loop {
|
||||||
|
match publish_fn().await {
|
||||||
|
Ok(response) => {
|
||||||
|
for value in response.values {
|
||||||
|
if tx.send(Ok(value)).await.is_err() {
|
||||||
|
return; // consumer dropped the stream
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
// Best-effort delivery of the terminal error; a closed
|
||||||
|
// channel here means the consumer already gave up.
|
||||||
|
let _ = tx.send(Err(e)).await;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// `Stream<Item = Result<MonitoredItemValue, Error>>` over an
|
||||||
|
/// ASB subscription. Created via [`AsbSession::subscribe`].
|
||||||
|
///
|
||||||
|
/// Drop aborts the publish-loop so the server-side queue stops
|
||||||
|
/// being drained.
|
||||||
|
pub struct AsbSubscription {
|
||||||
|
inner: ReceiverStream<Result<MonitoredItemValue, Error>>,
|
||||||
|
join: Option<JoinHandle<()>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Debug for AsbSubscription {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
f.debug_struct("AsbSubscription").finish_non_exhaustive()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Stream for AsbSubscription {
|
||||||
|
type Item = Result<MonitoredItemValue, Error>;
|
||||||
|
|
||||||
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
|
Pin::new(&mut self.get_mut().inner).poll_next(cx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for AsbSubscription {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if let Some(handle) = self.join.take() {
|
||||||
|
handle.abort();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn map_client_error(err: mxaccess_asb::ClientError) -> Error {
|
fn map_client_error(err: mxaccess_asb::ClientError) -> Error {
|
||||||
@@ -308,6 +426,11 @@ fn map_client_error(err: mxaccess_asb::ClientError) -> Error {
|
|||||||
)]
|
)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use futures_util::StreamExt;
|
||||||
|
use mxaccess_asb::ItemStatus;
|
||||||
|
use mxaccess_codec::asb_variant::{AsbStatus, AsbVariant, RuntimeValue};
|
||||||
|
use std::sync::Arc as StdArc;
|
||||||
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
|
|
||||||
/// Compile-time only: `AsbSession` must be `Clone + Send + Sync`
|
/// Compile-time only: `AsbSession` must be `Clone + Send + Sync`
|
||||||
/// (the `mxaccess` consumer ergonomics contract).
|
/// (the `mxaccess` consumer ergonomics contract).
|
||||||
@@ -316,4 +439,94 @@ mod tests {
|
|||||||
fn assert_clone_send_sync<T: Clone + Send + Sync + 'static>() {}
|
fn assert_clone_send_sync<T: Clone + Send + Sync + 'static>() {}
|
||||||
assert_clone_send_sync::<AsbSession>();
|
assert_clone_send_sync::<AsbSession>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Compile-time only: `AsbSubscription` must be `Stream + Send + Unpin`.
|
||||||
|
/// `Unpin` matters because consumers typically `.next().await` it
|
||||||
|
/// directly without pinning. `Send` matters because the tokio
|
||||||
|
/// runtime moves it across threads.
|
||||||
|
#[test]
|
||||||
|
fn asb_subscription_is_stream_send_unpin() {
|
||||||
|
fn assert_stream_send_unpin<T>()
|
||||||
|
where
|
||||||
|
T: Stream<Item = Result<MonitoredItemValue, Error>> + Send + Unpin + 'static,
|
||||||
|
{
|
||||||
|
}
|
||||||
|
assert_stream_send_unpin::<AsbSubscription>();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn fake_value(idx: i32) -> MonitoredItemValue {
|
||||||
|
MonitoredItemValue {
|
||||||
|
item: ItemIdentity::absolute_by_name(format!("Tag{idx}")),
|
||||||
|
value: RuntimeValue {
|
||||||
|
timestamp_binary: 0,
|
||||||
|
timestamp_specified: false,
|
||||||
|
value: AsbVariant::from_i32(idx),
|
||||||
|
status: AsbStatus::default(),
|
||||||
|
},
|
||||||
|
user_data: AsbVariant::empty(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn fake_response(values: Vec<MonitoredItemValue>) -> PublishResponse {
|
||||||
|
PublishResponse {
|
||||||
|
status: Vec::<ItemStatus>::new(),
|
||||||
|
values,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn publish_loop_delivers_values_then_terminates_on_error() {
|
||||||
|
let (tx, rx) = mpsc::channel(8);
|
||||||
|
let calls = StdArc::new(AtomicUsize::new(0));
|
||||||
|
let calls_clone = calls.clone();
|
||||||
|
let publish_fn = move || {
|
||||||
|
let n = calls_clone.fetch_add(1, Ordering::Relaxed);
|
||||||
|
async move {
|
||||||
|
if n == 0 {
|
||||||
|
Ok(fake_response(vec![fake_value(1), fake_value(2)]))
|
||||||
|
} else if n == 1 {
|
||||||
|
Ok(fake_response(vec![fake_value(3)]))
|
||||||
|
} else {
|
||||||
|
Err(Error::Connection(ConnectionError::TransportFailure {
|
||||||
|
detail: "synthetic terminal error".to_string(),
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
publish_loop(publish_fn, tx).await;
|
||||||
|
// Drain: 3 Ok values, then 1 Err, then channel-closed.
|
||||||
|
let mut stream = ReceiverStream::new(rx);
|
||||||
|
let mut oks = Vec::new();
|
||||||
|
let mut errs = 0;
|
||||||
|
while let Some(item) = stream.next().await {
|
||||||
|
match item {
|
||||||
|
Ok(v) => oks.push(v),
|
||||||
|
Err(_) => errs += 1,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert_eq!(oks.len(), 3, "three Ok values delivered");
|
||||||
|
assert_eq!(errs, 1, "one terminal Err delivered");
|
||||||
|
assert_eq!(calls.load(Ordering::Relaxed), 3, "publish_fn called 3x");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn publish_loop_exits_when_consumer_drops_channel() {
|
||||||
|
let (tx, rx) = mpsc::channel(2);
|
||||||
|
let calls = StdArc::new(AtomicUsize::new(0));
|
||||||
|
let calls_clone = calls.clone();
|
||||||
|
let publish_fn = move || {
|
||||||
|
calls_clone.fetch_add(1, Ordering::Relaxed);
|
||||||
|
async move { Ok(fake_response(vec![fake_value(7), fake_value(8), fake_value(9)])) }
|
||||||
|
};
|
||||||
|
// Drop the receiver immediately — first send triggers exit.
|
||||||
|
drop(rx);
|
||||||
|
publish_loop(publish_fn, tx).await;
|
||||||
|
// Loop must exit; tx is dropped on return. The publisher might
|
||||||
|
// make 1 call before discovering the closed channel (tx.send
|
||||||
|
// is what surfaces the closure).
|
||||||
|
assert!(
|
||||||
|
calls.load(Ordering::Relaxed) >= 1,
|
||||||
|
"publish_fn called at least once"
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user