[F26 stream] mxaccess: AsbSession::subscribe — Stream<Item = MonitoredItemValue>
rust / build / test / clippy / fmt (push) Has been cancelled
rust / build / test / clippy / fmt (push) Has been cancelled
Closes the last F26 stub from the M5 status block. New
AsbSession::subscribe(subscription_id) returns an AsbSubscription
that impls Stream<Item = Result<MonitoredItemValue, Error>>. An
internal tokio::spawn'd publish-loop drains the subscription queue
via the existing AsbSession::publish() and fans each
PublishResponse's `values` array out as individual stream items.
Termination semantics:
- Drop of AsbSubscription calls JoinHandle::abort() — the publish
task stops draining the server-side queue (the .NET reference
pattern at MxAsbDataClient.cs uses the same task-cancellation
shape).
- Transport error from publish() is delivered as the final stream
item; the loop returns and the channel closes.
- Receiver-drop (consumer stops polling) is detected when
tx.send returns Err — the loop exits without making more
publish calls.
The inner publish_loop helper takes any FnMut() -> Future<Result<...>>
so it's testable in isolation (no live ASB endpoint required).
Per-item ItemStatus from the server is intentionally not surfaced
on the stream: the field is opaque per-item and rarely actionable
for the streaming consumer. A richer struct can wrap each value if
that need surfaces.
3 new tests pin:
- asb_subscription_is_stream_send_unpin (compile-time bounds);
- publish_loop_delivers_values_then_terminates_on_error
(3 Ok values from 2 batches, then 1 terminal Err);
- publish_loop_exits_when_consumer_drops_channel.
New deps used (already in mxaccess Cargo.toml): futures_util::Stream,
tokio::sync::mpsc, tokio_stream::wrappers::ReceiverStream,
tokio::task::JoinHandle.
Workspace: 718 → 721 tests. Default-feature clippy clean.
mxaccess crate-level doc updated to drop the "stubbed for next F26
iteration" note for the subscription stream.
design/followups.md F18 M5 status block updated: F26 stream
subscription marked resolved.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
+1
-1
@@ -56,7 +56,7 @@ move to `## Resolved` with a date + commit hash.
|
||||
- ~~F32~~: resolved (commit `<this commit>`) via option (b) — three-type live coverage is the deployable maximum; missing types are Galaxy-provisioning-gated.
|
||||
- **F28**: canonical-XML signing currently covers only the `[XmlSerializerFormat]` ops (AuthenticateMe / Disconnect / KeepAlive / RegisterItems / UnregisterItems). Read / Write / CreateSubscription / AddMonitoredItems / Publish / etc. still sign over NBFX wire bytes via the legacy fallback. Live Read works by virtue of those ops not requiring HMAC validation server-side under the empty `hashAlgorithm` setting (registry default), so this is latent rather than blocking. Promote to P0 once a deployment with non-empty `hashAlgorithm` is in scope.
|
||||
- ~~F29~~: resolved (commit `<this commit>`) — `nbfs.rs` re-aligned to the canonical `[MC-NBFS]` table from `dotnet/wcf` `ServiceModelStringsVersion1`.
|
||||
- **F26 stream subscription**: `Stream<Item = MonitoredItemValue>` over a publish-loop is still stubbed in `AsbSession`. Tracked under F25 step 8 / F26 step 3 in the cumulative log.
|
||||
- ~~F26 stream subscription~~: resolved (commit `<this commit>`) — `AsbSession::subscribe(subscription_id)` returns an `AsbSubscription: Stream<Item = Result<MonitoredItemValue, Error>>` driven by an internal `tokio::spawn`'d publish-loop. Drop of the subscription aborts the loop. Per-`PublishResponse` `values` array is fanned out as individual stream items; transport errors are delivered as the final stream item before termination. Inner `publish_loop` helper is split out so it's testable in isolation against any closure-based fake `publish_fn`. 3 new tests pin: compile-time `Stream + Send + Unpin`, multi-batch + terminal-error round-trip, consumer-drop short-circuits the publisher. Workspace 718 → 721 tests.
|
||||
|
||||
**Cumulative execution log.** F19 + F23 (`ed17c07`); F24 (`7611d9e`); F20 (`9dfd193`); F22 (`43c10a1`); F21 (`5f98558`); F25 step 1 (`25dbd8d`); F25 step 2 (`a2b8989`); F25 step 3 (`c4bf0a0`); F25 step 4 (`1e59249`); F25 step 5 (`9b8133f`); F25 step 6 (`321b796`); F25 step 7 (`1b1ee1e`); F26 step 1 (`8a0f92b`); F26 step 2 (`14bb529`); example rewrite (`c6570dc`); F25 step 8 (`b543eb1`); F25 step 9 (`0441a2e`); F25 step 10 (`9876b4e`); F26 step 3 (`<previous>`); **F25 live-bring-up reconciliation** (this commit):
|
||||
- F25 live-bring-up reconciliation: live `asb-subscribe` + `asb-relay` (TCP middleman) capture-and-diff against AVEVA's MxDataProvider on Windows. Five concrete fixes landed:
|
||||
|
||||
Reference in New Issue
Block a user