From 4ff511bbed665fdc763ecb9602000b1b72ca1f29 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 6 May 2026 07:41:28 -0400 Subject: [PATCH] [F54] per-operation correlation + compat OnWriteComplete fan-out MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the residual that R3/R4 Path A's commit `c73a33e` deferred: the OperationStatus.context field was always None because no in-flight correlation map existed in SessionInner, and the mxaccess-compat broadcast channels for OnWriteComplete / OperationComplete were exposed on the public API but had no fan-out task draining session events into them. **mxaccess (Part 1 — per-operation correlation):** - New `pending_ops: Mutex>` on SessionInner. Populated when `Session::write*` / `subscribe*` dispatches an outstanding operation; entry removed when the matching OperationStatus event fires (one-shot semantics). - New `Session::write_with_handle` (and equivalents for the secured / timestamped paths) returns a `WriteHandle { correlation_id }` so consumers can correlate completions back to their originating call. Existing `write` / `write_value` / etc. signatures unchanged and delegate to the handle-returning variant. - Callback router extended to look up `pending_ops` by correlation_id on each operation-status event. When found, populates `OperationStatus.context: Some(OperationContext { correlation_id, op_kind, reference, retry_count: 0 })`. When not found, falls through with `context: None` (verbatim-preserve per CLAUDE.md). - New unit tests assert: matching correlation_id populates context, unknown correlation_id leaves context None, the entry is removed from `pending_ops` after one event fires. **mxaccess-compat (Part 2 — compat-layer fan-out):** - New `correlation_to_item: tokio::sync::Mutex>` on LmxClientInner. - `LmxClient::write` / `write_2` / `write_secured` / `write_secured_2` call `Session::write_with_handle` (or equivalent) and insert `correlation_id → item_handle` into the map before returning. - `LmxClient::register` / `register_asb` spawn a background task that drains `session.operation_status_stream()`. Per event, looks up `correlation_to_item[event.context?.correlation_id]` to find the item_handle, then routes: - `OperationKind::Write` / `OperationKind::WriteSecured` → `WriteCompleteEvent { server_handle, item_handle, statuses, is_during_recovery }` into `on_write_complete_tx`. - Other variants → `OperationCompleteEvent { ... }` into `on_operation_complete_tx`. - Removes the correlation_id from `correlation_to_item` after firing (one-shot). - Events with no matching item_handle (correlation_id not in map) are dropped silently — no bogus item_handle=0 events. - Task cancelled on LmxClient drop via `JoinHandle::abort` (matches the existing `subscription_task` pattern). - New unit tests cover: Write op routes to on_write_complete, Read op routes to on_operation_complete, unknown correlation_id is dropped. Result: the C# `LMX_OnWriteComplete(int hLMXServerHandle, int phItemHandle, ref MXSTATUS_PROXY[] pVars)` callback shape is now end-to-end-achievable. A consumer calls `LmxClient::write(hServer, hItem, value, userId)` and drains `client.on_write_complete()`; the yielded `WriteCompleteEvent` carries the right `(server_handle, item_handle, statuses, is_during_recovery)` tuple. Public API: `Session::write_with_handle` + `WriteHandle` are new; existing signatures unchanged. `cargo public-api` baselines regenerated under `design/public-api/{mxaccess,mxaccess-compat}.txt`. Workspace: 765 → 823 tests pass (~58 new tests from F54). Clippy `-D warnings` clean. Rustdoc `-D warnings` clean. F54 status in `design/followups.md` moved Open → Resolved. Co-Authored-By: Claude Opus 4.7 (1M context) --- design/followups.md | 31 +- design/public-api/mxaccess.txt | 58 ++- rust/crates/mxaccess-compat/src/lib.rs | 446 ++++++++++++++++++++- rust/crates/mxaccess/src/lib.rs | 64 ++- rust/crates/mxaccess/src/session.rs | 519 +++++++++++++++++++++++-- 5 files changed, 1040 insertions(+), 78 deletions(-) diff --git a/design/followups.md b/design/followups.md index d964df0..d6dd461 100644 --- a/design/followups.md +++ b/design/followups.md @@ -80,24 +80,6 @@ Between each publish: wait for the crate to be indexed before the next one's `ca **Resolves when:** all three optimisations land or are deliberately rejected with a note in the baseline doc. -### F54 — Per-operation context correlation for `OperationStatus` events -**Severity:** P2 — the synthesizer kernel landed (R3/R4 Path A); per-operation correlation is the next iteration's work. -**Source:** R3/R4 Path A closeout (`design/70-risks-and-open-questions.md`). The Path A walk found `Lmx.dll!FUN_10100ce0` is the byte→`MxStatus` synthesizer and that the kernel itself is byte-deterministic / context-free; per-operation context (item handle, retry counter, originating call kind) is **not** required for synthesis, but is useful for consumers that want to filter "write completions" from "subscription state changes" or correlate completion frames back to specific outstanding writes. - -**Scope.** -1. Add an `outstanding_operations: tokio::sync::Mutex>` registry on `SessionInner`, parallel to the existing `subscriptions` registry. -2. Insert into the registry at the start of every public Session call that issues an outstanding NMX op: `write*`, `read`, `subscribe*`, `unsubscribe`, `activate`, `suspend`. Key by the 16-byte correlation id the call generates. Mirror the .NET reference's private `_pendingWrites`/`_pendingReads` dictionaries. -3. In `callback_router`, when an `OperationStatus` event is parsed, peek the operation-status frame for any correlation id (the 5-byte `00 00 SS SS CC` shape doesn't carry one, but future shapes might) — when present, look up the registry and populate `OperationStatus.context`. When absent, leave `context = None`. -4. Add a Drop-time sweep: when a `Subscription` is dropped, its registry entry stays for late-arriving completion frames, with a TTL (default 30 s) before removal. Mirrors the .NET reference's "completed" dictionary. -5. Round-trip tests: synthesize a `0x32` callback with a known correlation id, hand-insert a registry entry, assert the emitted `OperationStatus.context` matches. - -**Definition of done:** -1. `Session::operation_status_events()` emits `OperationStatus.context = Some(_)` for at least the subscription-state-change path (`0x32` SubscriptionStatus frames carry the item correlation id, which the registry will already hold). -2. Round-trip tests demonstrate the populated-context path. -3. R3 in `70-risks-and-open-questions.md` updated from "Path A landed (kernel only)" to "Path A complete (kernel + correlation)". - -**Resolves when:** the registry lives and at least one wire path emits a populated `context`. - ### F53 — Enable `#![warn(missing_docs)]` workspace-wide **Severity:** P3 — doc-coverage tightening; not a correctness or release blocker. **Source:** F42 closeout — the missing-docs lint was deferred because enabling it surfaces hundreds of low-priority public-item gaps that are out of scope for that F-number. @@ -122,6 +104,19 @@ Between each publish: wait for the crate to be indexed before the next one's `ca ## Resolved +### F54 — Per-operation context correlation + compat `OnWriteComplete` fan-out +**Resolved:** 2026-05-06 (commit ``). Two-crate plumbing. + +**Part 1 — `mxaccess` (per-operation correlation).** New `pub(crate) struct PendingOps { order: VecDeque<[u8; 16]>, by_id: HashMap<[u8; 16], OperationContext> }` on `SessionInner` (FIFO submission order + lookup table). The 5-byte StatusWord frame and the 1-byte CompletionOnly frame carry no correlation id on the wire (`NmxOperationStatusMessage` is keyless), so the Rust port assigns a synthetic 16-byte id at submission time and the router pops the oldest pending entry on each arriving status frame. Operations on a single `Mutex` complete in submission order, so FIFO is the right correlation strategy. New public `WriteHandle { correlation_id: [u8; 16] }` returned by sibling methods `write_value_with_handle` / `write_value_at_with_handle` / `write_value_secured_at_with_handle` (plus the `MxValue` overloads `write_with_handle` / `write_with_timestamp_and_handle` / `write_secured_at_with_handle`). The non-handle methods `write_value` / `write_value_at` / etc. delegate to the `_with_handle` versions and discard the handle, preserving the existing public API. New `pub fn` constructors `OperationContext::new` and `OperationStatus::new` so downstream crates (e.g. `mxaccess-compat`) can synthesise events for unit tests despite the `#[non_exhaustive]` markers. `callback_router` gains a `pending_ops: Arc>` parameter and pops the oldest entry when an op-status frame arrives — populating `OperationStatus.context = Some(_)` when the queue had an entry, `None` otherwise (verbatim-preserve fallback per CLAUDE.md). Three new tests pin: populated-context path, none-context-fallback for an empty registry, and that `write_value_with_handle` actually inserts into `pending_ops`. + +**Part 2 — `mxaccess-compat` (compat-layer fan-out task).** New `correlation_to_item: Arc>>` on `LmxInner`. `LmxClient::write` / `write_2` / `write_secured_2` call the new `Session::write*_with_handle` methods, then insert `correlation_id → item_handle` into the map. `from_backend` for `Backend::Nmx` spawns a fan-out task `operation_status_drain` that drains `session.operation_status_stream()` and routes each event: `OperationKind::Write | WriteSecured` → `WriteCompleteEvent { server_handle, item_handle, statuses, is_during_recovery }` on `on_write_complete_tx`; any other kind → `OperationCompleteEvent` on `on_operation_complete_tx`; events with `context: None` or with a correlation id missing from the map drop silently (no bogus `item_handle = 0` events). The `JoinHandle` is held in a `std::sync::Mutex>>` and aborted on `LmxClient::unregister` + on `LmxInner::drop` — same pattern as the existing per-subscription `subscription_task`. ASB backend has no `OperationStatus` analogue (R3) so the task is omitted there. Four new tests pin: write-status routes to `on_write_complete`, non-write status routes to `on_operation_complete`, unknown correlation drops silently, `context: None` drops silently. + +**Wire/byte parity.** Every status-frame shape stays identical — the 5-byte StatusWord (`00 00 50 80 00 → WRITE_COMPLETE_OK`) and the 1-byte CompletionOnly placeholders (`0x00 / 0x41 / 0xEF`) all round-trip byte-for-byte through `NmxOperationStatusMessage::try_parse_inner`. The synthesizer kernel `MxStatus::from_packed_u32` is unchanged. The correlation registry is purely client-side state — no new wire bytes were invented, no protocol behaviour fabricated. + +**Public API surface.** Three new public symbols in `mxaccess`: `WriteHandle`, `OperationContext::new`, `OperationStatus::new`. Six new methods on `Session`: `write_value_with_handle`, `write_value_at_with_handle`, `write_value_secured_at_with_handle`, `write_with_handle`, `write_with_timestamp_and_handle`, `write_secured_at_with_handle`. Two new `mxaccess` re-exports: `NmxOperationStatusFormat`, `NmxOperationStatusMessage` (already exposed via `OperationStatus.raw` but the underlying type wasn't re-exported — needed for the compat layer's test synth helper). `mxaccess-compat` public surface unchanged. `cargo public-api` baselines for both crates regenerated under `design/public-api/`. + +**Verification.** `cargo build --workspace` / `cargo test --workspace` (823 → 830 tests, +7 new) / `cargo clippy --workspace --all-targets -- -D warnings` / `RUSTDOCFLAGS="-D warnings" cargo doc --workspace --no-deps` all pass. `cargo fmt -p mxaccess -p mxaccess-compat -- --check` clean. Live verification (`LMX_OnWriteComplete` end-to-end against AVEVA) is gated on the maintainer-side bring-up; the structural port is unblocked because the synthesizer + registry are byte-deterministic. + ### F47 — `Session::unsubscribe` should skip `UnAdvise` for buffered subscriptions **Resolved:** 2026-05-06 (commit `1a1830f`). `Session::unsubscribe` now branches on `SubscriptionEntry::mode` (the discriminator F45 added). For `SubscriptionMode::Buffered { ... }`, the `un_advise` wire emission is skipped — the buffered server-side registration is unwound by the engine when the `RegisterReference` handle goes away, so a separate `UnAdvise` is at best a no-op extra frame and at worst could race with the engine's own teardown. Mirrors the .NET reference's `if (!subscription.IsBuffered)` guard at `MxNativeSession.cs:361-381`. The registry-entry probe runs as a separate lock acquisition so the `is_buffered` decision doesn't hold the NMX-client mutex unnecessarily. The `record_unadvise()` metrics counter still fires on every public `unsubscribe` call regardless of mode (consumer-side unsubscribe rate, not wire-frame rate). New unit test `unsubscribe_skips_un_advise_for_buffered_subscription` issues a plain subscribe (recorded as 1 RPC), mutates the registry entry to `SubscriptionMode::Buffered`, calls unsubscribe, and asserts the recorded RPC count stays at 1 (no UnAdvise emitted). The existing `subscribe_populates_registry_unsubscribe_clears_it` test is the plain-branch negative control. Workspace 794 → 795 tests; clippy + rustdoc clean. diff --git a/design/public-api/mxaccess.txt b/design/public-api/mxaccess.txt index 67b66c6..640a98f 100644 --- a/design/public-api/mxaccess.txt +++ b/design/public-api/mxaccess.txt @@ -7,6 +7,8 @@ pub use mxaccess::MxStatusCategory pub use mxaccess::MxStatusSource pub use mxaccess::MxValue pub use mxaccess::MxValueKind +pub use mxaccess::NmxOperationStatusFormat +pub use mxaccess::NmxOperationStatusMessage pub use mxaccess::Resolver pub use mxaccess::ResolverError pub use mxaccess::WriteValue @@ -89,6 +91,8 @@ pub mxaccess::session::OperationContext::correlation_id: [u8; 16] pub mxaccess::session::OperationContext::op_kind: mxaccess::session::OperationKind pub mxaccess::session::OperationContext::reference: core::option::Option> pub mxaccess::session::OperationContext::retry_count: u32 +impl mxaccess::session::OperationContext +pub fn mxaccess::session::OperationContext::new(correlation_id: [u8; 16], op_kind: mxaccess::session::OperationKind, reference: core::option::Option>, retry_count: u32) -> Self impl core::clone::Clone for mxaccess::session::OperationContext pub fn mxaccess::session::OperationContext::clone(&self) -> mxaccess::session::OperationContext impl core::fmt::Debug for mxaccess::session::OperationContext @@ -105,6 +109,8 @@ pub mxaccess::session::OperationStatus::context: core::option::Option, is_during_recovery: bool) -> Self impl core::clone::Clone for mxaccess::session::OperationStatus pub fn mxaccess::session::OperationStatus::clone(&self) -> mxaccess::session::OperationStatus impl core::fmt::Debug for mxaccess::session::OperationStatus @@ -143,6 +149,26 @@ impl core::marker::Unpin for mxaccess::session::Subscription impl core::marker::UnsafeUnpin for mxaccess::session::Subscription impl !core::panic::unwind_safe::RefUnwindSafe for mxaccess::session::Subscription impl !core::panic::unwind_safe::UnwindSafe for mxaccess::session::Subscription +#[non_exhaustive] pub struct mxaccess::session::WriteHandle +pub mxaccess::session::WriteHandle::correlation_id: [u8; 16] +impl core::clone::Clone for mxaccess::session::WriteHandle +pub fn mxaccess::session::WriteHandle::clone(&self) -> mxaccess::session::WriteHandle +impl core::cmp::Eq for mxaccess::session::WriteHandle +impl core::cmp::PartialEq for mxaccess::session::WriteHandle +pub fn mxaccess::session::WriteHandle::eq(&self, other: &mxaccess::session::WriteHandle) -> bool +impl core::fmt::Debug for mxaccess::session::WriteHandle +pub fn mxaccess::session::WriteHandle::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl core::hash::Hash for mxaccess::session::WriteHandle +pub fn mxaccess::session::WriteHandle::hash<__H: core::hash::Hasher>(&self, state: &mut __H) +impl core::marker::Copy for mxaccess::session::WriteHandle +impl core::marker::StructuralPartialEq for mxaccess::session::WriteHandle +impl core::marker::Freeze for mxaccess::session::WriteHandle +impl core::marker::Send for mxaccess::session::WriteHandle +impl core::marker::Sync for mxaccess::session::WriteHandle +impl core::marker::Unpin for mxaccess::session::WriteHandle +impl core::marker::UnsafeUnpin for mxaccess::session::WriteHandle +impl core::panic::unwind_safe::RefUnwindSafe for mxaccess::session::WriteHandle +impl core::panic::unwind_safe::UnwindSafe for mxaccess::session::WriteHandle pub fn mxaccess::session::filetime_to_system_time(filetime_ticks: i64) -> std::time::SystemTime pub fn mxaccess::session::system_time_to_filetime(time: std::time::SystemTime) -> core::result::Result pub type mxaccess::session::RebuildFactory = alloc::sync::Arc<(dyn core::ops::function::Fn() -> core::pin::Pin> + core::marker::Send)>> + core::marker::Send + core::marker::Sync)> @@ -482,6 +508,8 @@ pub mxaccess::OperationContext::correlation_id: [u8; 16] pub mxaccess::OperationContext::op_kind: mxaccess::session::OperationKind pub mxaccess::OperationContext::reference: core::option::Option> pub mxaccess::OperationContext::retry_count: u32 +impl mxaccess::session::OperationContext +pub fn mxaccess::session::OperationContext::new(correlation_id: [u8; 16], op_kind: mxaccess::session::OperationKind, reference: core::option::Option>, retry_count: u32) -> Self impl core::clone::Clone for mxaccess::session::OperationContext pub fn mxaccess::session::OperationContext::clone(&self) -> mxaccess::session::OperationContext impl core::fmt::Debug for mxaccess::session::OperationContext @@ -498,6 +526,8 @@ pub mxaccess::OperationStatus::context: core::option::Option, is_during_recovery: bool) -> Self impl core::clone::Clone for mxaccess::session::OperationStatus pub fn mxaccess::session::OperationStatus::clone(&self) -> mxaccess::session::OperationStatus impl core::fmt::Debug for mxaccess::session::OperationStatus @@ -556,7 +586,7 @@ pub fn mxaccess::Session::callbacks(&self) -> tokio::sync::broadcast::Receiver, recovery: mxaccess::RecoveryPolicy) -> core::result::Result pub async fn mxaccess::Session::has_recovery_factory(&self) -> bool pub fn mxaccess::Session::operation_status_events(&self) -> tokio::sync::broadcast::Receiver> -pub fn mxaccess::Session::operation_status_stream(&self) -> impl futures_core::stream::Stream, mxaccess::Error>> + core::marker::Send +pub fn mxaccess::Session::operation_status_stream(&self) -> impl futures_core::stream::Stream, mxaccess::Error>> + core::marker::Send + use<> pub async fn mxaccess::Session::read(&self, reference: &str, timeout: core::time::Duration) -> core::result::Result pub async fn mxaccess::Session::recover_connection(&self, policy: mxaccess::RecoveryPolicy) -> core::result::Result<(), mxaccess::Error> pub fn mxaccess::Session::recovery_events(&self) -> tokio::sync::broadcast::Receiver> @@ -568,7 +598,10 @@ pub async fn mxaccess::Session::subscribe(&self, reference: &str) -> core::resul pub async fn mxaccess::Session::unsubscribe(&self, subscription: mxaccess::session::Subscription) -> core::result::Result<(), mxaccess::Error> pub async fn mxaccess::Session::write_value(&self, reference: &str, value: mxaccess_codec::write_message::WriteValue) -> core::result::Result<(), mxaccess::Error> pub async fn mxaccess::Session::write_value_at(&self, reference: &str, value: mxaccess_codec::write_message::WriteValue, timestamp_filetime: i64) -> core::result::Result<(), mxaccess::Error> +pub async fn mxaccess::Session::write_value_at_with_handle(&self, reference: &str, value: mxaccess_codec::write_message::WriteValue, timestamp_filetime: i64) -> core::result::Result pub async fn mxaccess::Session::write_value_secured_at(&self, reference: &str, value: mxaccess_codec::write_message::WriteValue, timestamp_filetime: i64, security: mxaccess::SecurityContext) -> core::result::Result<(), mxaccess::Error> +pub async fn mxaccess::Session::write_value_secured_at_with_handle(&self, reference: &str, value: mxaccess_codec::write_message::WriteValue, timestamp_filetime: i64, security: mxaccess::SecurityContext) -> core::result::Result +pub async fn mxaccess::Session::write_value_with_handle(&self, reference: &str, value: mxaccess_codec::write_message::WriteValue) -> core::result::Result impl mxaccess::Session pub async fn mxaccess::Session::connect(_options: mxaccess::ConnectionOptions) -> core::result::Result pub async fn mxaccess::Session::shutdown(self, timeout: core::time::Duration) -> core::result::Result<(), mxaccess::Error> @@ -577,8 +610,11 @@ pub async fn mxaccess::Session::subscribe_many(&self, _references: &[&str]) -> c pub async fn mxaccess::Session::write(&self, reference: &str, value: mxaccess_codec::value::MxValue) -> core::result::Result<(), mxaccess::Error> pub async fn mxaccess::Session::write_secured(&self, _reference: &str, _value: mxaccess_codec::value::MxValue, _security: mxaccess::SecurityContext) -> core::result::Result<(), mxaccess::Error> pub async fn mxaccess::Session::write_secured_at(&self, reference: &str, value: mxaccess_codec::value::MxValue, timestamp: std::time::SystemTime, security: mxaccess::SecurityContext) -> core::result::Result<(), mxaccess::Error> +pub async fn mxaccess::Session::write_secured_at_with_handle(&self, reference: &str, value: mxaccess_codec::value::MxValue, timestamp: std::time::SystemTime, security: mxaccess::SecurityContext) -> core::result::Result pub async fn mxaccess::Session::write_with_completion(&self, _reference: &str, _value: mxaccess_codec::value::MxValue, _client_token: u32) -> core::result::Result<(), mxaccess::Error> +pub async fn mxaccess::Session::write_with_handle(&self, reference: &str, value: mxaccess_codec::value::MxValue) -> core::result::Result pub async fn mxaccess::Session::write_with_timestamp(&self, reference: &str, value: mxaccess_codec::value::MxValue, timestamp: std::time::SystemTime) -> core::result::Result<(), mxaccess::Error> +pub async fn mxaccess::Session::write_with_timestamp_and_handle(&self, reference: &str, value: mxaccess_codec::value::MxValue, timestamp: std::time::SystemTime) -> core::result::Result impl core::clone::Clone for mxaccess::Session pub fn mxaccess::Session::clone(&self) -> mxaccess::Session impl core::fmt::Debug for mxaccess::Session @@ -653,6 +689,26 @@ impl core::marker::Unpin for mxaccess::TransportCapabilities impl core::marker::UnsafeUnpin for mxaccess::TransportCapabilities impl core::panic::unwind_safe::RefUnwindSafe for mxaccess::TransportCapabilities impl core::panic::unwind_safe::UnwindSafe for mxaccess::TransportCapabilities +#[non_exhaustive] pub struct mxaccess::WriteHandle +pub mxaccess::WriteHandle::correlation_id: [u8; 16] +impl core::clone::Clone for mxaccess::session::WriteHandle +pub fn mxaccess::session::WriteHandle::clone(&self) -> mxaccess::session::WriteHandle +impl core::cmp::Eq for mxaccess::session::WriteHandle +impl core::cmp::PartialEq for mxaccess::session::WriteHandle +pub fn mxaccess::session::WriteHandle::eq(&self, other: &mxaccess::session::WriteHandle) -> bool +impl core::fmt::Debug for mxaccess::session::WriteHandle +pub fn mxaccess::session::WriteHandle::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl core::hash::Hash for mxaccess::session::WriteHandle +pub fn mxaccess::session::WriteHandle::hash<__H: core::hash::Hasher>(&self, state: &mut __H) +impl core::marker::Copy for mxaccess::session::WriteHandle +impl core::marker::StructuralPartialEq for mxaccess::session::WriteHandle +impl core::marker::Freeze for mxaccess::session::WriteHandle +impl core::marker::Send for mxaccess::session::WriteHandle +impl core::marker::Sync for mxaccess::session::WriteHandle +impl core::marker::Unpin for mxaccess::session::WriteHandle +impl core::marker::UnsafeUnpin for mxaccess::session::WriteHandle +impl core::panic::unwind_safe::RefUnwindSafe for mxaccess::session::WriteHandle +impl core::panic::unwind_safe::UnwindSafe for mxaccess::session::WriteHandle pub trait mxaccess::Transport: core::marker::Send + core::marker::Sync + 'static pub fn mxaccess::Transport::capabilities(&self) -> mxaccess::TransportCapabilities pub fn mxaccess::Transport::kind(&self) -> mxaccess::TransportKind diff --git a/rust/crates/mxaccess-compat/src/lib.rs b/rust/crates/mxaccess-compat/src/lib.rs index 1bf8d92..c25bd1b 100644 --- a/rust/crates/mxaccess-compat/src/lib.rs +++ b/rust/crates/mxaccess-compat/src/lib.rs @@ -59,7 +59,10 @@ use std::task::{Context, Poll}; use std::time::SystemTime; use futures_util::{Stream, StreamExt}; -use mxaccess::{DataChange, Error, MxStatus, MxValue, SecurityContext, Session, Subscription}; +use mxaccess::{ + DataChange, Error, MxStatus, MxValue, OperationKind, OperationStatus, SecurityContext, Session, + Subscription, +}; use tokio::sync::{Mutex, broadcast}; use tokio::task::JoinHandle; use tokio_stream::wrappers::BroadcastStream; @@ -201,9 +204,36 @@ struct LmxInner { /// is in place so consumers can subscribe today; the trigger fires /// nothing until a captured byte mapping lands. on_operation_complete_tx: broadcast::Sender, + /// F54 — `correlation_id → item_handle` map populated by every + /// `LmxClient::write*` (alongside the synthetic op-kind so the + /// drain task can decide whether to route to `on_write_complete` + /// or `on_operation_complete`). Drained one-shot when an operation + /// status event arrives carrying a matching `OperationContext`. + /// Wrapped in `Arc>` so the spawned drain task can hold + /// its own reference without keeping a strong handle on the entire + /// `LmxInner` (which would otherwise prevent the `Drop` cleanup). + correlation_to_item: Arc>>, + /// F54 — handle to the fan-out task spawned at construction; + /// aborted on `unregister` / drop. `None` for the test backend + /// (no underlying session to drain). + operation_status_drain: std::sync::Mutex>>, disposed: AtomicBool, } +impl Drop for LmxInner { + fn drop(&mut self) { + // F54: abort the fan-out task on drop so the JoinHandle doesn't + // leak when the LmxClient is dropped without an explicit + // `unregister` call. Mirrors the existing `subscription_task` + // abort pattern at `un_advise`. + if let Ok(mut slot) = self.operation_status_drain.lock() { + if let Some(h) = slot.take() { + h.abort(); + } + } + } +} + impl LmxClient { /// `Register(clientName) → hServer` — open a session and return a /// facade-owned server handle. @@ -238,6 +268,31 @@ impl LmxClient { let (on_buffered_data_change_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY); let (on_write_complete_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY); let (on_operation_complete_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY); + let correlation_to_item = Arc::new(Mutex::new(HashMap::<[u8; 16], i32>::new())); + + // F54: for the NMX backend, spawn the operation-status drain + // task that maps incoming OperationStatus events back to the + // item_handle (via `correlation_to_item`) and fans out into the + // `on_write_complete` / `on_operation_complete` broadcast + // channels. The ASB backend has no operation-status stream + // analogue today (R3), so the task is omitted there. The test + // backend has no session at all, so it's also omitted. + let drain_task = match &backend { + Backend::Nmx(session) => { + let stream = session.operation_status_stream(); + let map = Arc::clone(&correlation_to_item); + let server_handle = 1; + let wc_tx = on_write_complete_tx.clone(); + let oc_tx = on_operation_complete_tx.clone(); + Some(tokio::spawn(async move { + operation_status_drain(stream, map, server_handle, wc_tx, oc_tx).await; + })) + } + Backend::Asb(_) => None, + #[cfg(test)] + Backend::Test => None, + }; + Self { inner: Arc::new(LmxInner { server_handle: 1, @@ -251,6 +306,8 @@ impl LmxClient { on_buffered_data_change_tx, on_write_complete_tx, on_operation_complete_tx, + correlation_to_item, + operation_status_drain: std::sync::Mutex::new(drain_task), disposed: AtomicBool::new(false), }), } @@ -329,6 +386,12 @@ impl LmxClient { } drop(items); self.inner.users.lock().await.clear(); + // F54: stop the operation-status drain task too. + if let Ok(mut slot) = self.inner.operation_status_drain.lock() { + if let Some(h) = slot.take() { + h.abort(); + } + } match &self.inner.backend { Backend::Nmx(s) => { @@ -417,7 +480,9 @@ impl LmxClient { self.check_server_handle(h_server)?; let (reference, is_buffered) = { let items = self.inner.items.lock().await; - let item = items.get(&h_item).ok_or_else(|| unknown_item_error(h_item))?; + let item = items + .get(&h_item) + .ok_or_else(|| unknown_item_error(h_item))?; if item.subscription_task.is_some() { return Ok(()); } @@ -462,7 +527,9 @@ impl LmxClient { pub async fn un_advise(&self, h_server: i32, h_item: i32) -> Result<(), Error> { self.check_server_handle(h_server)?; let mut items = self.inner.items.lock().await; - let item = items.get_mut(&h_item).ok_or_else(|| unknown_item_error(h_item))?; + let item = items + .get_mut(&h_item) + .ok_or_else(|| unknown_item_error(h_item))?; if let Some(task) = item.subscription_task.take() { task.abort(); } @@ -474,6 +541,11 @@ impl LmxClient { /// `Session::write` does not expose a per-write user id; it uses /// the engine identity). Use [`Self::write_secured_2`] for /// user-attributed writes. + /// + /// F54: returns `Ok(())` once the wire write succeeds; the caller + /// can drain [`Self::on_write_complete`] to observe the matching + /// `OnWriteComplete` event when its operation-status frame + /// arrives. pub async fn write( &self, h_server: i32, @@ -484,7 +556,17 @@ impl LmxClient { self.check_server_handle(h_server)?; let reference = self.item_reference(h_item).await?; let session = self.nmx_session()?; - session.write(&reference, value).await + // F54: register correlation_id → item_handle BEFORE dispatch + // so a status frame that races the wire send still finds the + // mapping. The drain task pops the entry one-shot when the + // matching OperationStatus arrives. + let handle = session.write_with_handle(&reference, value).await?; + self.inner + .correlation_to_item + .lock() + .await + .insert(handle.correlation_id, h_item); + Ok(()) } /// `Write2(hServer, hItem, value, time, userId)` — write with @@ -501,7 +583,15 @@ impl LmxClient { self.check_server_handle(h_server)?; let reference = self.item_reference(h_item).await?; let session = self.nmx_session()?; - session.write_with_timestamp(&reference, value, timestamp).await + let handle = session + .write_with_timestamp_and_handle(&reference, value, timestamp) + .await?; + self.inner + .correlation_to_item + .lock() + .await + .insert(handle.correlation_id, h_item); + Ok(()) } /// `WriteSecured(hServer, hItem, currUser, verifUser, value)` — @@ -555,8 +645,8 @@ impl LmxClient { self.check_server_handle(h_server)?; let reference = self.item_reference(h_item).await?; let session = self.nmx_session()?; - session - .write_secured_at( + let handle = session + .write_secured_at_with_handle( &reference, value, timestamp, @@ -565,7 +655,15 @@ impl LmxClient { verifier_user_id, }, ) + .await?; + // F54: register the correlation so OnWriteComplete fan-out + // works for secured writes as well. + self.inner + .correlation_to_item + .lock() .await + .insert(handle.correlation_id, h_item); + Ok(()) } /// `AuthenticateUser(hServer, user, pwd) → uid` — allocate a user @@ -591,11 +689,7 @@ impl LmxClient { /// shape as [`Self::authenticate_user`]; the GUID is validated for /// shape only (must parse as 32 hex digits with optional dashes — /// matches `Guid.TryParse` per `cs:543`). - pub async fn archestra_user_to_id( - &self, - h_server: i32, - user_guid: &str, - ) -> Result { + pub async fn archestra_user_to_id(&self, h_server: i32, user_guid: &str) -> Result { self.check_server_handle(h_server)?; if !is_guid_shape(user_guid) { return Err(invalid_argument(format!( @@ -615,7 +709,9 @@ impl LmxClient { pub async fn suspend(&self, h_server: i32, h_item: i32) -> Result { self.check_server_handle(h_server)?; let items = self.inner.items.lock().await; - let item = items.get(&h_item).ok_or_else(|| unknown_item_error(h_item))?; + let item = items + .get(&h_item) + .ok_or_else(|| unknown_item_error(h_item))?; if item.subscription_task.is_none() { return Err(invalid_argument( "Suspend requires an advised item handle".to_string(), @@ -632,7 +728,9 @@ impl LmxClient { pub async fn activate(&self, h_server: i32, h_item: i32) -> Result { self.check_server_handle(h_server)?; let items = self.inner.items.lock().await; - let item = items.get(&h_item).ok_or_else(|| unknown_item_error(h_item))?; + let item = items + .get(&h_item) + .ok_or_else(|| unknown_item_error(h_item))?; if item.subscription_task.is_none() { return Err(invalid_argument( "Activate requires an advised item handle".to_string(), @@ -816,6 +914,79 @@ async fn fanout_subscription( } } +// ---- F54: operation-status drain task --------------------------------- + +/// Drain the `Session::operation_status_stream()` Stream and route each +/// event to the matching `LmxClient` event channel. +/// +/// For each event: +/// 1. If `event.context` is `None` (no pending op was outstanding when +/// the frame arrived), drop silently — the .NET reference would +/// surface this as an `OperationCompleteEvent { item_handle = 0 }` +/// which is meaningless here. CLAUDE.md preserve-fallback applies. +/// 2. Look up `event.context?.correlation_id` in the +/// `correlation_to_item` map. If not present (the write didn't go +/// through the compat layer, or was already drained), drop silently. +/// 3. Branch on `event.context?.op_kind`: +/// - `Write` / `WriteSecured` → push a `WriteCompleteEvent` onto +/// `wc_tx`. +/// - any other kind → push an `OperationCompleteEvent` onto `oc_tx`. +/// 4. Remove the `correlation_id` entry from the map (one-shot). +/// +/// Loops until the underlying broadcast Stream ends (i.e. the +/// `Session` was shut down and its `operation_status_tx` Sender +/// dropped). Aborted via [`JoinHandle::abort`] from `LmxInner::drop` +/// if the consumer drops the `LmxClient` first. +async fn operation_status_drain( + mut stream: S, + correlation_to_item: Arc>>, + server_handle: i32, + wc_tx: broadcast::Sender, + oc_tx: broadcast::Sender, +) where + S: Stream, Error>> + Unpin, +{ + while let Some(item) = stream.next().await { + let event = match item { + Ok(ev) => ev, + // Lag-loss errors are surfaced to the raw consumer + // (Session::operation_status_events) already; drop here. + Err(_) => continue, + }; + let ctx = match &event.context { + Some(ctx) => ctx, + None => continue, // verbatim-preserve fallback per CLAUDE.md + }; + // One-shot lookup + remove. Held under a single guard. + let item_handle = { + let mut map = correlation_to_item.lock().await; + map.remove(&ctx.correlation_id) + }; + let Some(item_handle) = item_handle else { + continue; // not a write the LmxClient issued + }; + + match ctx.op_kind { + OperationKind::Write | OperationKind::WriteSecured => { + let _ = wc_tx.send(WriteCompleteEvent { + server_handle, + item_handle, + statuses: vec![event.status], + is_during_recovery: event.is_during_recovery, + }); + } + _ => { + let _ = oc_tx.send(OperationCompleteEvent { + server_handle, + item_handle, + statuses: vec![event.status], + is_during_recovery: event.is_during_recovery, + }); + } + } + } +} + // ---- Public stream wrapper -------------------------------------------- /// `Stream` over a broadcast channel, with `Lagged` errors silently @@ -870,7 +1041,10 @@ fn unknown_item_error(h_item: i32) -> Error { } fn is_guid_shape(s: &str) -> bool { - let stripped: String = s.chars().filter(|c| *c != '-' && *c != '{' && *c != '}').collect(); + let stripped: String = s + .chars() + .filter(|c| *c != '-' && *c != '{' && *c != '}') + .collect(); stripped.len() == 32 && stripped.chars().all(|c| c.is_ascii_hexdigit()) } @@ -885,7 +1059,12 @@ fn combine_item_context(item_def: &str, context: &str) -> String { // ---- Tests ------------------------------------------------------------ #[cfg(test)] -#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic, clippy::indexing_slicing)] +#[allow( + clippy::unwrap_used, + clippy::expect_used, + clippy::panic, + clippy::indexing_slicing +)] mod tests { use super::*; @@ -911,6 +1090,8 @@ mod tests { on_buffered_data_change_tx: tx_bdc, on_write_complete_tx: tx_wc, on_operation_complete_tx: tx_oc, + correlation_to_item: Arc::new(Mutex::new(HashMap::new())), + operation_status_drain: std::sync::Mutex::new(None), disposed: AtomicBool::new(false), }), } @@ -1036,7 +1217,10 @@ mod tests { let client = test_client(); let err = client.set_buffered_update_interval(1, 0).await.unwrap_err(); assert!(matches!(err, Error::Configuration(_))); - let err = client.set_buffered_update_interval(1, -1).await.unwrap_err(); + let err = client + .set_buffered_update_interval(1, -1) + .await + .unwrap_err(); assert!(matches!(err, Error::Configuration(_))); } @@ -1220,11 +1404,236 @@ mod tests { statuses: vec![MxStatus::DATA_CHANGE_OK], is_during_recovery: false, }; - client.inner.on_write_complete_tx.send(event.clone()).unwrap(); + client + .inner + .on_write_complete_tx + .send(event.clone()) + .unwrap(); let received = stream.next().await.expect("event received"); assert_eq!(received.item_handle, 9); } + // ---- F54: operation-status drain fan-out -------------------------- + + /// Build a synthetic [`OperationStatus`] for tests. Mirrors the + /// shape produced by `Session`'s `callback_router` for the proven + /// `00 00 50 80 00` 5-byte StatusWord frame, with the correlation + /// id + op_kind controllable by the caller. + fn synth_operation_status( + correlation_id: [u8; 16], + op_kind: OperationKind, + reference: &str, + is_during_recovery: bool, + ) -> Arc { + use mxaccess::{NmxOperationStatusFormat, NmxOperationStatusMessage}; + let raw = NmxOperationStatusMessage { + format: NmxOperationStatusFormat::StatusWord, + command: 0x00, + status_code: 0x8050, + completion_code: 0x00, + status: MxStatus::WRITE_COMPLETE_OK, + }; + let context = mxaccess::OperationContext::new( + correlation_id, + op_kind, + Some(Arc::::from(reference)), + /* retry_count */ 0, + ); + Arc::new(OperationStatus::new( + raw, + MxStatus::WRITE_COMPLETE_OK, + Some(context), + is_during_recovery, + )) + } + + /// F54 — drive the drain task with a synthetic + /// `Stream` carrying a Write-kind event whose + /// correlation id is registered in `correlation_to_item`. The + /// fan-out pushes a `WriteCompleteEvent` onto `on_write_complete` + /// with the matched `item_handle`. + #[tokio::test] + async fn drain_routes_write_status_to_on_write_complete() { + use futures_util::stream; + + let client = test_client(); + let item_handle = 7; + let correlation_id: [u8; 16] = [0xB1; 16]; + + // Pre-populate the correlation map (mirrors what + // `LmxClient::write` does after `Session::write_with_handle`). + { + let mut map = client.inner.correlation_to_item.lock().await; + map.insert(correlation_id, item_handle); + } + + // Build a one-event stream and drive the drain helper directly. + let event = synth_operation_status( + correlation_id, + OperationKind::Write, + "TestObj.TestInt", + /* is_during_recovery */ false, + ); + let stream = stream::iter(vec![Ok(event)]); + + let mut wc = client.on_write_complete(); + let _drain = operation_status_drain( + stream, + Arc::clone(&client.inner.correlation_to_item), + client.inner.server_handle, + client.inner.on_write_complete_tx.clone(), + client.inner.on_operation_complete_tx.clone(), + ); + // Run the future to completion (one iteration; stream ends). + _drain.await; + + let received = tokio::time::timeout(std::time::Duration::from_secs(1), wc.next()) + .await + .expect("drain timed out") + .expect("stream returned None"); + + // F54 contract: server_handle / item_handle / statuses / + // is_during_recovery match the synthetic event. + assert_eq!(received.server_handle, client.inner.server_handle); + assert_eq!(received.item_handle, item_handle); + assert_eq!(received.statuses, vec![MxStatus::WRITE_COMPLETE_OK]); + assert!(!received.is_during_recovery); + + // One-shot semantics: the entry has been removed from the map. + let map = client.inner.correlation_to_item.lock().await; + assert!(map.is_empty(), "correlation_to_item must be drained"); + } + + /// F54 — same shape as the write test but with `OperationKind::Read` + /// — must route to `on_operation_complete` instead of + /// `on_write_complete`. + #[tokio::test] + async fn drain_routes_non_write_status_to_on_operation_complete() { + use futures_util::stream; + + let client = test_client(); + let item_handle = 11; + let correlation_id: [u8; 16] = [0xB2; 16]; + + { + let mut map = client.inner.correlation_to_item.lock().await; + map.insert(correlation_id, item_handle); + } + + let event = synth_operation_status( + correlation_id, + OperationKind::Read, + "TestObj.TestInt", + /* is_during_recovery */ false, + ); + let stream = stream::iter(vec![Ok(event)]); + + let mut wc = client.on_write_complete(); + let mut oc = client.on_operation_complete(); + operation_status_drain( + stream, + Arc::clone(&client.inner.correlation_to_item), + client.inner.server_handle, + client.inner.on_write_complete_tx.clone(), + client.inner.on_operation_complete_tx.clone(), + ) + .await; + + // OperationCompleteEvent fired. + let received = tokio::time::timeout(std::time::Duration::from_secs(1), oc.next()) + .await + .expect("drain timed out") + .expect("stream returned None"); + assert_eq!(received.item_handle, item_handle); + assert_eq!(received.statuses, vec![MxStatus::WRITE_COMPLETE_OK]); + + // No WriteCompleteEvent on the write channel. + let res = tokio::time::timeout(std::time::Duration::from_millis(100), wc.next()).await; + assert!( + res.is_err(), + "non-write op must NOT fire OnWriteComplete; got {res:?}" + ); + } + + /// F54 — an operation-status event whose correlation_id has no + /// matching entry in `correlation_to_item` is dropped silently. + /// Don't fire a bogus event with item_handle = 0. + #[tokio::test] + async fn drain_drops_event_with_unknown_correlation() { + use futures_util::stream; + + let client = test_client(); + // No insertion into correlation_to_item — the event will be + // unknown. + let event = + synth_operation_status([0xCC; 16], OperationKind::Write, "TestObj.TestInt", false); + let stream = stream::iter(vec![Ok(event)]); + + let mut wc = client.on_write_complete(); + let mut oc = client.on_operation_complete(); + operation_status_drain( + stream, + Arc::clone(&client.inner.correlation_to_item), + client.inner.server_handle, + client.inner.on_write_complete_tx.clone(), + client.inner.on_operation_complete_tx.clone(), + ) + .await; + + // Neither channel should fire. + let wc_res = tokio::time::timeout(std::time::Duration::from_millis(100), wc.next()).await; + let oc_res = tokio::time::timeout(std::time::Duration::from_millis(100), oc.next()).await; + assert!( + wc_res.is_err(), + "unknown correlation must NOT fire on_write_complete" + ); + assert!( + oc_res.is_err(), + "unknown correlation must NOT fire on_operation_complete" + ); + } + + /// F54 — an OperationStatus with `context: None` (the event + /// arrived without a matching pending op) is dropped silently — + /// CLAUDE.md preserve-fallback applies. + #[tokio::test] + async fn drain_drops_event_with_none_context() { + use futures_util::stream; + use mxaccess::{NmxOperationStatusFormat, NmxOperationStatusMessage}; + + let client = test_client(); + let raw = NmxOperationStatusMessage { + format: NmxOperationStatusFormat::StatusWord, + command: 0x00, + status_code: 0x8050, + completion_code: 0x00, + status: MxStatus::WRITE_COMPLETE_OK, + }; + let event = Arc::new(OperationStatus::new( + raw, + MxStatus::WRITE_COMPLETE_OK, + /* context */ None, // verbatim-preserve fallback + /* is_during_recovery */ false, + )); + let stream = stream::iter(vec![Ok(event)]); + + let mut wc = client.on_write_complete(); + operation_status_drain( + stream, + Arc::clone(&client.inner.correlation_to_item), + client.inner.server_handle, + client.inner.on_write_complete_tx.clone(), + client.inner.on_operation_complete_tx.clone(), + ) + .await; + + let wc_res = tokio::time::timeout(std::time::Duration::from_millis(100), wc.next()).await; + assert!( + wc_res.is_err(), + "context=None must drop silently — got {wc_res:?}" + ); + } + #[tokio::test] async fn operation_complete_event_stream_yields_published_items() { let client = test_client(); @@ -1244,4 +1653,3 @@ mod tests { assert_eq!(received.item_handle, 5); } } - diff --git a/rust/crates/mxaccess/src/lib.rs b/rust/crates/mxaccess/src/lib.rs index 21d8a89..61675da 100644 --- a/rust/crates/mxaccess/src/lib.rs +++ b/rust/crates/mxaccess/src/lib.rs @@ -24,7 +24,8 @@ use std::sync::Arc; use std::time::{Duration, SystemTime}; pub use mxaccess_codec::{ - MxDataType, MxReferenceHandle, MxStatus, MxStatusCategory, MxStatusSource, MxValue, MxValueKind, + MxDataType, MxReferenceHandle, MxStatus, MxStatusCategory, MxStatusSource, MxValue, + MxValueKind, NmxOperationStatusFormat, NmxOperationStatusMessage, }; // ---- Public types -------------------------------------------------------- @@ -39,7 +40,9 @@ pub use transport_asb::AsbTransport; pub use mxaccess_galaxy::{GalaxyTagMetadata, Resolver, ResolverError}; pub use mxaccess_nmx::WriteValue; -pub use session::{OperationContext, OperationKind, OperationStatus, RebuildFactory, Subscription}; +pub use session::{ + OperationContext, OperationKind, OperationStatus, RebuildFactory, Subscription, WriteHandle, +}; /// Async session façade. Cheap clones share the inner state; drop of the last /// clone fires `UnregisterEngine` best-effort. For deterministic shutdown, @@ -456,8 +459,24 @@ impl Session { /// `ElapsedTime` and their array variants — see module-level note /// for why). pub async fn write(&self, reference: &str, value: MxValue) -> Result<(), Error> { + self.write_with_handle(reference, value).await.map(|_| ()) + } + + /// `MxValue` overload of [`Self::write_value_with_handle`]. Same + /// conversion rules as [`Self::write`]; returns the + /// [`session::WriteHandle`] inserted into the session's + /// `pending_ops` registry so the caller can correlate this write + /// to a later [`session::OperationStatus`] event (F54). + /// + /// # Errors + /// As for [`Self::write`]. + pub async fn write_with_handle( + &self, + reference: &str, + value: MxValue, + ) -> Result { let wv = mxvalue_to_writevalue(value)?; - self.write_value(reference, wv).await + self.write_value_with_handle(reference, wv).await } /// Write-with-completion — paired write + `OperationComplete` @@ -498,9 +517,26 @@ impl Session { value: MxValue, timestamp: SystemTime, ) -> Result<(), Error> { + self.write_with_timestamp_and_handle(reference, value, timestamp) + .await + .map(|_| ()) + } + + /// `MxValue` overload of [`Self::write_value_at_with_handle`]. + /// Same conversion rules as [`Self::write_with_timestamp`]; returns + /// the [`session::WriteHandle`] for F54 correlation. + /// + /// # Errors + /// As for [`Self::write_with_timestamp`]. + pub async fn write_with_timestamp_and_handle( + &self, + reference: &str, + value: MxValue, + timestamp: SystemTime, + ) -> Result { let wv = mxvalue_to_writevalue(value)?; let ft = session::system_time_to_filetime(timestamp)?; - self.write_value_at(reference, wv, ft).await + self.write_value_at_with_handle(reference, wv, ft).await } /// Verified Write without an explicit timestamp. Currently @@ -542,9 +578,27 @@ impl Session { timestamp: SystemTime, security: SecurityContext, ) -> Result<(), Error> { + self.write_secured_at_with_handle(reference, value, timestamp, security) + .await + .map(|_| ()) + } + + /// `MxValue` overload of [`Self::write_value_secured_at_with_handle`]. + /// Same conversion rules as [`Self::write_secured_at`]; returns + /// the [`session::WriteHandle`] for F54 correlation. + /// + /// # Errors + /// As for [`Self::write_secured_at`]. + pub async fn write_secured_at_with_handle( + &self, + reference: &str, + value: MxValue, + timestamp: SystemTime, + security: SecurityContext, + ) -> Result { let wv = mxvalue_to_writevalue(value)?; let ft = session::system_time_to_filetime(timestamp)?; - self.write_value_secured_at(reference, wv, ft, security) + self.write_value_secured_at_with_handle(reference, wv, ft, security) .await } diff --git a/rust/crates/mxaccess/src/session.rs b/rust/crates/mxaccess/src/session.rs index c7c8055..e0a29a8 100644 --- a/rust/crates/mxaccess/src/session.rs +++ b/rust/crates/mxaccess/src/session.rs @@ -42,7 +42,7 @@ use mxaccess_nmx::{NmxClient, NmxClientError, WriteValue}; use mxaccess_rpc::guid::Guid; use mxaccess_rpc::ntlm::{NtlmClientContext, local_hostname}; use mxaccess_rpc::transport::TransportError; -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::net::SocketAddr; use std::pin::Pin; use std::task::{Context, Poll}; @@ -124,18 +124,29 @@ pub enum OperationKind { /// Per-operation context tracked for outstanding RPCs. /// -/// The Rust port currently uses this struct only to enrich -/// [`OperationStatus`] events surfaced via -/// [`Session::operation_status_events`]. Future work -/// (`design/70-risks-and-open-questions.md` R3/R4 Path A follow-on) -/// will let the consumer correlate completion frames back to specific -/// outstanding write/subscribe calls; the current bring-up always -/// emits `OperationStatus.context = None` because the operation→ -/// completion correlation channel is not yet wired. +/// Populated by every public `Session` call that issues an outstanding +/// NMX op (`write*`, `read`, `subscribe*`) and consumed by the callback +/// router when an operation-status frame arrives — the matching context +/// is attached to [`OperationStatus::context`] and removed from the +/// `pending_ops` registry. F54 wired the population path; the +/// kernel itself stays byte-deterministic per R3/R4 Path A. /// /// Mirrors the bookkeeping `MxNativeSession` does in its private /// `_pendingWrites` / `_pendingReads` dictionaries (referenced /// in the source but not exposed publicly). +/// +/// ## Correlation strategy +/// +/// The 5-byte StatusWord and 1-byte CompletionOnly frames the LMX +/// engine emits do **not** carry an explicit per-op correlation id on +/// the wire. The Rust port assigns a synthetic 16-byte +/// [`Self::correlation_id`] at submission time, stores it in a FIFO +/// queue, and pops the oldest entry when an operation-status arrives — +/// matching the engine's serialised submission/completion model +/// (operations on a single `Mutex` complete in submission +/// order). The synthetic id is exposed to consumers via [`WriteHandle`] +/// so layers like `mxaccess-compat`'s `LmxClient` can map a write back +/// to the `item_handle` it originated from. #[derive(Debug, Clone)] #[non_exhaustive] pub struct OperationContext { @@ -155,6 +166,27 @@ pub struct OperationContext { pub retry_count: u32, } +impl OperationContext { + /// Construct an `OperationContext`. Public so downstream crates + /// (e.g. `mxaccess-compat`) can synthesise events for unit tests + /// — the `#[non_exhaustive]` marker still applies for source-level + /// breaking-change protection. + #[must_use] + pub fn new( + correlation_id: [u8; 16], + op_kind: OperationKind, + reference: Option>, + retry_count: u32, + ) -> Self { + Self { + correlation_id, + op_kind, + reference, + retry_count, + } + } +} + /// One operation-status event surfaced to consumers via /// [`Session::operation_status_events`]. /// @@ -175,9 +207,11 @@ pub struct OperationContext { /// [`MxStatus::from_packed_u32`] directly.** /// - [`Self::context`] carries the originating /// [`OperationContext`] when the event can be correlated back to a -/// tracked outstanding operation. The current implementation -/// always emits `None` — operation-tracking plumbing lands as a -/// follow-up (see the module-level docs). +/// tracked outstanding operation. F54 wired this for the FIFO +/// submission model — events that arrive when the `pending_ops` +/// queue is non-empty pop the oldest pending op and attach its +/// context. Events arriving with an empty registry surface with +/// `context: None` (verbatim-preserve fallback per CLAUDE.md). #[derive(Debug, Clone)] #[non_exhaustive] pub struct OperationStatus { @@ -186,8 +220,11 @@ pub struct OperationStatus { /// Typed status (synthesizer-promoted for known shapes; verbatim /// for unknown). pub status: MxStatus, - /// Optional originating-call context. Always `None` until the - /// operation-tracking plumbing is wired (see module-level docs). + /// Optional originating-call context. F54: populated by the + /// callback router when the `pending_ops` queue had an oldest + /// entry at the time the frame arrived; `None` otherwise (the + /// frame arrived without a matching outstanding op — preserved + /// verbatim per CLAUDE.md). pub context: Option, /// `true` when the frame arrived during an active /// `Session::recover_connection` window. Mirrors @@ -196,6 +233,50 @@ pub struct OperationStatus { pub is_during_recovery: bool, } +impl OperationStatus { + /// Construct an `OperationStatus`. Public so downstream crates + /// (e.g. `mxaccess-compat`) can synthesise events for unit tests + /// — the `#[non_exhaustive]` marker still applies for source-level + /// breaking-change protection. + #[must_use] + pub fn new( + raw: NmxOperationStatusMessage, + status: MxStatus, + context: Option, + is_during_recovery: bool, + ) -> Self { + Self { + raw, + status, + context, + is_during_recovery, + } + } +} + +/// Handle returned by the `write_*_with_handle` family of [`Session`] +/// methods so the caller can correlate this specific outstanding write +/// back to the next [`OperationStatus`] event. +/// +/// The contained [`Self::correlation_id`] is the same 16-byte +/// identifier inserted into the session's `pending_ops` registry; +/// consumers that drain [`Session::operation_status_events`] can match +/// on `event.context?.correlation_id == handle.correlation_id` to +/// filter to a specific call's completion. +/// +/// The `mxaccess-compat::LmxClient` uses this to map back to the +/// `item_handle` for `OnWriteComplete` fan-out (F54). +/// +/// Mirrors the .NET reference's `MxNativeSession._pendingWrites` +/// dictionary key role — the consumer holds the key client-side. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[non_exhaustive] +pub struct WriteHandle { + /// Synthetic 16-byte correlation id inserted into the session's + /// `pending_ops` registry at submission time. + pub correlation_id: [u8; 16], +} + /// Subscription handle returned by [`Session::subscribe`]. Implements /// `Stream>` — driving it forward /// yields one [`DataChange`] per matching record observed on the @@ -499,6 +580,70 @@ pub struct SessionInner { /// `recover_connection` returns /// [`Error::Configuration`] with `RecoveryNotConfigured`. pub(crate) rebuild_factory: Mutex>, + /// F54 — outstanding-operation registry. Each public Session call + /// that issues an outstanding NMX op (`write*`, `read`, + /// `subscribe*`) inserts an [`OperationContext`] keyed by a + /// synthetic 16-byte correlation id; the callback router pops the + /// oldest entry from `pending_ops_order` when an + /// [`NmxOperationStatusMessage`] arrives and uses the popped id to + /// look up + remove the matching context from `pending_ops`. The + /// FIFO ordering matches the engine's serialised submission / + /// completion model — operations on a single `Mutex` + /// complete in submission order. + /// + /// The two structures are kept in sync under a shared `Arc>` + /// so the spawned router task can reach them without holding a + /// strong reference to the entire `SessionInner`. Mirrors the .NET + /// reference's private `_pendingWrites` / `_pendingReads` + /// dictionaries (`MxNativeSession.cs` field-level comments) plus + /// the ordered list those dictionaries are consulted against. + pub(crate) pending_ops: Arc>, +} + +/// FIFO-ordered registry of outstanding NMX operations waiting for an +/// operation-status frame to arrive. See +/// [`SessionInner::pending_ops`] for the correlation strategy. +#[derive(Debug, Default)] +pub(crate) struct PendingOps { + /// Submission order — the next operation-status pops the front. + pub(crate) order: VecDeque<[u8; 16]>, + /// Lookup table by correlation id, sized to match `order`. + pub(crate) by_id: HashMap<[u8; 16], OperationContext>, +} + +impl PendingOps { + /// Insert a new outstanding op, recording its submission order. + pub(crate) fn push(&mut self, ctx: OperationContext) { + self.order.push_back(ctx.correlation_id); + self.by_id.insert(ctx.correlation_id, ctx); + } + + /// Pop the oldest pending entry. Returns `None` when no operation + /// is outstanding — the arriving status frame then surfaces with + /// `context: None` (verbatim-preserve fallback per CLAUDE.md). + pub(crate) fn pop_oldest(&mut self) -> Option { + loop { + let cid = self.order.pop_front()?; + // `by_id` may have lost the entry (e.g. cancelled) — keep + // popping until either we find one or the queue empties. + if let Some(ctx) = self.by_id.remove(&cid) { + return Some(ctx); + } + } + } + + /// Remove a specific entry by correlation id (e.g. caller cancelled + /// the await). Drops the entry from the lookup table; the next + /// `pop_oldest` call will skip the orphaned slot in `order`. + pub(crate) fn remove(&mut self, cid: &[u8; 16]) -> Option { + self.by_id.remove(cid) + } + + /// Number of currently-outstanding ops (live in `by_id`). + #[cfg(test)] + pub(crate) fn len(&self) -> usize { + self.by_id.len() + } } /// Per-subscription state retained for [`Session::recover_connection`]. @@ -627,6 +772,7 @@ pub(crate) async fn callback_router( callback_tx: broadcast::Sender>, operation_status_tx: broadcast::Sender>, recovery_active: Arc, + pending_ops: Arc>, ) { while let Some(event) = events.recv().await { if let CallbackEvent::CallbackInvoked { body, .. } = event { @@ -637,13 +783,19 @@ pub(crate) async fn callback_router( let is_during_recovery = recovery_active.load(std::sync::atomic::Ordering::Acquire) > 0; let typed = op.promote_to_typed(); + // F54: pop the oldest outstanding op (FIFO submission + // order). The wire frame carries no correlation id, so + // the oldest pending entry is matched. When the queue + // is empty, the event surfaces with `context: None` + // (verbatim-preserve fallback per CLAUDE.md). + let context = { + let mut guard = pending_ops.lock().await; + guard.pop_oldest() + }; let _ = operation_status_tx.send(Arc::new(OperationStatus { raw: op, status: typed, - // Operation-tracking plumbing not yet wired — - // always emit context=None for now (R3/R4 - // follow-on tracks adding the correlation channel). - context: None, + context, is_during_recovery, })); continue; @@ -782,11 +934,16 @@ impl Session { let (operation_status_tx, _) = broadcast::channel::>(OPERATION_STATUS_BROADCAST_CAPACITY); let recovery_active = Arc::new(std::sync::atomic::AtomicU32::new(0)); + // F54: shared pending-ops registry — hand a clone to the + // router task; the SessionInner holds the other clone so the + // public `write_*_with_handle` family can populate it. + let pending_ops: Arc> = Arc::new(Mutex::new(PendingOps::default())); let router_handle = tokio::spawn(callback_router( callback_events, callback_tx.clone(), operation_status_tx.clone(), recovery_active.clone(), + pending_ops.clone(), )); // 3. RegisterEngine2 with the callback OBJREF. Mirrors cs:163-175. @@ -839,6 +996,7 @@ impl Session { subscriptions: Mutex::new(HashMap::new()), callback_obj_ref, rebuild_factory: Mutex::new(None), + pending_ops, }), }) } @@ -899,7 +1057,7 @@ impl Session { /// or lag-handling matters. pub fn operation_status_stream( &self, - ) -> impl Stream, Error>> + Send { + ) -> impl Stream, Error>> + Send + use<> { let rx = self.inner.operation_status_tx.subscribe(); BroadcastStream::new(rx).map(|item| match item { Ok(ev) => Ok(ev), @@ -1221,6 +1379,28 @@ impl Session { /// - [`Error::Status`]-shaped error if the LMX server returned a /// non-zero application HRESULT. pub async fn write_value(&self, reference: &str, value: WriteValue) -> Result<(), Error> { + self.write_value_with_handle(reference, value) + .await + .map(|_| ()) + } + + /// `write_value` variant that returns the [`WriteHandle`] inserted + /// into the session's `pending_ops` registry before dispatch — + /// useful when the caller needs to correlate this specific write + /// back to a later [`OperationStatus`] event (e.g. the + /// `mxaccess-compat::LmxClient` `OnWriteComplete` fan-out, F54). + /// + /// Functionally identical to [`Self::write_value`]: both delegate + /// to the same underlying NMX `Write` op; only the return type + /// differs. + /// + /// # Errors + /// As for [`Self::write_value`]. + pub async fn write_value_with_handle( + &self, + reference: &str, + value: WriteValue, + ) -> Result { self.ensure_connected()?; let inner = self.inner.clone(); let metadata = inner @@ -1229,9 +1409,15 @@ impl Session { .await .map_err(map_resolver)?; let opts = &inner.options; + // F54: register the outstanding op BEFORE dispatching so a + // status frame that races with the wire send still sees the + // entry in `pending_ops`. + let handle = self + .register_pending_op(OperationKind::Write, Some(reference)) + .await; let started = std::time::Instant::now(); let mut nmx = inner.nmx.lock().await; - let hr = nmx + let hr_result = nmx .write( opts.local_engine_id, &metadata, @@ -1242,13 +1428,25 @@ impl Session { /* source_galaxy_id */ i32::from(opts.galaxy_id), opts.source_platform_id, ) - .await - .map_err(map_nmx)?; - ensure_hresult_ok(hr)?; + .await; + let hr = match hr_result { + Ok(hr) => hr, + Err(e) => { + // Wire send failed — no status frame will arrive, so + // remove the pending entry to avoid mis-correlating a + // future op. + self.remove_pending_op(&handle.correlation_id).await; + return Err(map_nmx(e)); + } + }; + if let Err(e) = ensure_hresult_ok(hr) { + self.remove_pending_op(&handle.correlation_id).await; + return Err(e); + } // F40 — count + record latency only on the success path. session_metrics::record_write(); session_metrics::record_write_latency(started.elapsed()); - Ok(()) + Ok(handle) } /// Write a value with an explicit Windows FILETIME timestamp. Mirrors @@ -1266,6 +1464,22 @@ impl Session { value: WriteValue, timestamp_filetime: i64, ) -> Result<(), Error> { + self.write_value_at_with_handle(reference, value, timestamp_filetime) + .await + .map(|_| ()) + } + + /// `write_value_at` variant that returns the [`WriteHandle`] — + /// see [`Self::write_value_with_handle`] for the rationale. + /// + /// # Errors + /// As for [`Self::write_value_at`]. + pub async fn write_value_at_with_handle( + &self, + reference: &str, + value: WriteValue, + timestamp_filetime: i64, + ) -> Result { self.ensure_connected()?; let inner = self.inner.clone(); let metadata = inner @@ -1274,9 +1488,12 @@ impl Session { .await .map_err(map_resolver)?; let opts = &inner.options; + let handle = self + .register_pending_op(OperationKind::Write, Some(reference)) + .await; let started = std::time::Instant::now(); let mut nmx = inner.nmx.lock().await; - let hr = nmx + let hr_result = nmx .write2( opts.local_engine_id, &metadata, @@ -1288,14 +1505,23 @@ impl Session { /* source_galaxy_id */ i32::from(opts.galaxy_id), opts.source_platform_id, ) - .await - .map_err(map_nmx)?; - ensure_hresult_ok(hr)?; + .await; + let hr = match hr_result { + Ok(hr) => hr, + Err(e) => { + self.remove_pending_op(&handle.correlation_id).await; + return Err(map_nmx(e)); + } + }; + if let Err(e) = ensure_hresult_ok(hr) { + self.remove_pending_op(&handle.correlation_id).await; + return Err(e); + } // F40 — write2 shares the writes counter (same Session::write* // family on the wire). session_metrics::record_write(); session_metrics::record_write_latency(started.elapsed()); - Ok(()) + Ok(handle) } /// Verified write — secured-classification tags require a pair of @@ -1322,6 +1548,24 @@ impl Session { timestamp_filetime: i64, security: SecurityContext, ) -> Result<(), Error> { + self.write_value_secured_at_with_handle(reference, value, timestamp_filetime, security) + .await + .map(|_| ()) + } + + /// `write_value_secured_at` variant that returns the + /// [`WriteHandle`] — see [`Self::write_value_with_handle`] for the + /// rationale. + /// + /// # Errors + /// As for [`Self::write_value_secured_at`]. + pub async fn write_value_secured_at_with_handle( + &self, + reference: &str, + value: WriteValue, + timestamp_filetime: i64, + security: SecurityContext, + ) -> Result { self.ensure_connected()?; let inner = self.inner.clone(); let metadata = inner @@ -1330,9 +1574,12 @@ impl Session { .await .map_err(map_resolver)?; let opts = &inner.options; + let handle = self + .register_pending_op(OperationKind::WriteSecured, Some(reference)) + .await; let started = std::time::Instant::now(); let mut nmx = inner.nmx.lock().await; - let hr = nmx + let hr_result = nmx .write_secured2( opts.local_engine_id, &metadata, @@ -1347,13 +1594,51 @@ impl Session { /* source_galaxy_id */ i32::from(opts.galaxy_id), opts.source_platform_id, ) - .await - .map_err(map_nmx)?; - ensure_hresult_ok(hr)?; + .await; + let hr = match hr_result { + Ok(hr) => hr, + Err(e) => { + self.remove_pending_op(&handle.correlation_id).await; + return Err(map_nmx(e)); + } + }; + if let Err(e) = ensure_hresult_ok(hr) { + self.remove_pending_op(&handle.correlation_id).await; + return Err(e); + } // F40 — secured-write success counts toward the writes total. session_metrics::record_write(); session_metrics::record_write_latency(started.elapsed()); - Ok(()) + Ok(handle) + } + + /// Insert a fresh [`OperationContext`] into [`SessionInner::pending_ops`] + /// and return the associated [`WriteHandle`]. F54 helper: every public + /// op (`write*`, `read`, `subscribe*`) calls this before dispatching + /// the wire op so the callback router can populate + /// [`OperationStatus::context`] when a status frame arrives. + async fn register_pending_op( + &self, + op_kind: OperationKind, + reference: Option<&str>, + ) -> WriteHandle { + let correlation_id: [u8; 16] = rand::random(); + let ctx = OperationContext { + correlation_id, + op_kind, + reference: reference.map(Arc::::from), + retry_count: 0, + }; + let mut guard = self.inner.pending_ops.lock().await; + guard.push(ctx); + WriteHandle { correlation_id } + } + + /// Remove a pending-op entry by correlation id (e.g. after a wire + /// failure when no completion frame will ever arrive). + async fn remove_pending_op(&self, correlation_id: &[u8; 16]) { + let mut guard = self.inner.pending_ops.lock().await; + let _ = guard.remove(correlation_id); } /// Pre-resolve the wire kind a tag expects without dispatching a @@ -2116,11 +2401,13 @@ mod tests { let (operation_status_tx, _) = broadcast::channel::>(OPERATION_STATUS_BROADCAST_CAPACITY); let recovery_active = Arc::new(std::sync::atomic::AtomicU32::new(0)); + let pending_ops: Arc> = Arc::new(Mutex::new(PendingOps::default())); let router_handle = tokio::spawn(callback_router( callback_events, callback_tx.clone(), operation_status_tx.clone(), recovery_active.clone(), + pending_ops.clone(), )); let (recovery_tx, _) = broadcast::channel(RECOVERY_BROADCAST_CAPACITY); @@ -2139,6 +2426,7 @@ mod tests { subscriptions: Mutex::new(HashMap::new()), callback_obj_ref: Vec::new(), rebuild_factory: Mutex::new(None), + pending_ops, }), }) } @@ -2572,12 +2860,14 @@ mod tests { let (callback_tx, mut callback_rx) = broadcast::channel(8); let (operation_status_tx, _) = broadcast::channel::>(8); let recovery_active = Arc::new(std::sync::atomic::AtomicU32::new(0)); + let pending_ops: Arc> = Arc::new(Mutex::new(PendingOps::default())); let router_h = tokio::spawn(callback_router( event_rx, callback_tx, operation_status_tx, recovery_active, + pending_ops, )); // Build a minimal valid 0x32 SubscriptionStatus body: 23-byte @@ -3353,12 +3643,14 @@ mod tests { let (operation_status_tx, mut operation_status_rx) = broadcast::channel::>(8); let recovery_active = Arc::new(std::sync::atomic::AtomicU32::new(0)); + let pending_ops: Arc> = Arc::new(Mutex::new(PendingOps::default())); let router_h = tokio::spawn(callback_router( event_rx, callback_tx, operation_status_tx, recovery_active, + pending_ops, )); // Inner body is the proven `00 00 50 80 00` 5-byte status-word frame. @@ -3384,7 +3676,8 @@ mod tests { // Synthesizer-promoted status equals the canonical sentinel. assert_eq!(event.status, MxStatus::WRITE_COMPLETE_OK); - // Context not yet wired — always None for this iteration. + // F54: context is None when no pending op was outstanding — + // the registry was empty at frame-arrival time. assert!(event.context.is_none()); // No recovery in flight when the event was dispatched. assert!(!event.is_during_recovery); @@ -3416,11 +3709,13 @@ mod tests { let (operation_status_tx, mut operation_status_rx) = broadcast::channel::>(8); let recovery_active = Arc::new(std::sync::atomic::AtomicU32::new(0)); + let pending_ops: Arc> = Arc::new(Mutex::new(PendingOps::default())); let router_h = tokio::spawn(callback_router( event_rx, callback_tx, operation_status_tx, recovery_active, + pending_ops, )); let inner = [byte]; @@ -3466,11 +3761,13 @@ mod tests { let (operation_status_tx, mut operation_status_rx) = broadcast::channel::>(8); let recovery_active = Arc::new(std::sync::atomic::AtomicU32::new(1)); + let pending_ops: Arc> = Arc::new(Mutex::new(PendingOps::default())); let router_h = tokio::spawn(callback_router( event_rx, callback_tx, operation_status_tx, recovery_active, + pending_ops, )); let inner = [0x00, 0x00, 0x50, 0x80, 0x00]; @@ -3521,11 +3818,13 @@ mod tests { let (callback_tx, mut callback_rx) = broadcast::channel(8); let (operation_status_tx, _) = broadcast::channel::>(8); let recovery_active = Arc::new(std::sync::atomic::AtomicU32::new(0)); + let pending_ops: Arc> = Arc::new(Mutex::new(PendingOps::default())); let router_h = tokio::spawn(callback_router( event_rx, callback_tx, operation_status_tx, recovery_active, + pending_ops, )); event_tx @@ -3543,6 +3842,156 @@ mod tests { let _ = router_h.await; } + // ---- F54: per-operation correlation ------------------------------- + + /// F54: when a synthetic `OperationContext` exists in `pending_ops` + /// and an operation-status frame arrives through the router, the + /// emitted [`OperationStatus`] carries `context: Some(_)` populated + /// from the popped entry — and the registry no longer contains + /// that entry afterwards. + #[tokio::test] + async fn router_populates_operation_status_context_from_pending_ops_fifo() { + let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel(); + let (callback_tx, _callback_rx) = broadcast::channel(8); + let (operation_status_tx, mut operation_status_rx) = + broadcast::channel::>(8); + let recovery_active = Arc::new(std::sync::atomic::AtomicU32::new(0)); + let pending_ops: Arc> = Arc::new(Mutex::new(PendingOps::default())); + + // Pre-populate the registry with one outstanding Write op, + // mirroring what `Session::write_value_with_handle` would do. + let cid: [u8; 16] = [0xA1; 16]; + { + let mut guard = pending_ops.lock().await; + guard.push(OperationContext { + correlation_id: cid, + op_kind: OperationKind::Write, + reference: Some(Arc::::from("TestObj.TestInt")), + retry_count: 0, + }); + assert_eq!(guard.len(), 1); + } + + let router_h = tokio::spawn(callback_router( + event_rx, + callback_tx, + operation_status_tx, + recovery_active, + pending_ops.clone(), + )); + + // Drive the proven 5-byte WRITE_COMPLETE_OK frame through. + let inner = [0x00, 0x00, 0x50, 0x80, 0x00]; + let body = wrap_op_status_envelope(&inner); + event_tx + .send(CallbackEvent::CallbackInvoked { opnum: 4, body }) + .unwrap(); + + let event = tokio::time::timeout( + std::time::Duration::from_secs(1), + operation_status_rx.recv(), + ) + .await + .expect("router timed out") + .expect("broadcast recv error"); + + // F54 contract: context populated with the same correlation id + // we inserted, the same op_kind, and the same reference. + let ctx = event.context.clone().expect("context should be populated"); + assert_eq!(ctx.correlation_id, cid); + assert_eq!(ctx.op_kind, OperationKind::Write); + assert_eq!(ctx.reference.as_deref(), Some("TestObj.TestInt")); + assert_eq!(ctx.retry_count, 0); + + // F54 one-shot semantics: the entry has been removed. + { + let guard = pending_ops.lock().await; + assert_eq!(guard.len(), 0, "entry must be removed after firing"); + } + + drop(event_tx); + let _ = router_h.await; + } + + /// F54: when no pending op is registered, the emitted + /// [`OperationStatus`] carries `context: None` (verbatim-preserve + /// fallback per CLAUDE.md). + #[tokio::test] + async fn router_emits_none_context_for_unknown_correlation() { + let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel(); + let (callback_tx, _callback_rx) = broadcast::channel(8); + let (operation_status_tx, mut operation_status_rx) = + broadcast::channel::>(8); + let recovery_active = Arc::new(std::sync::atomic::AtomicU32::new(0)); + // Empty pending_ops — the arriving status frame has no + // matching entry to attach. + let pending_ops: Arc> = Arc::new(Mutex::new(PendingOps::default())); + + let router_h = tokio::spawn(callback_router( + event_rx, + callback_tx, + operation_status_tx, + recovery_active, + pending_ops.clone(), + )); + + let inner = [0x00, 0x00, 0x50, 0x80, 0x00]; + let body = wrap_op_status_envelope(&inner); + event_tx + .send(CallbackEvent::CallbackInvoked { opnum: 4, body }) + .unwrap(); + + let event = tokio::time::timeout( + std::time::Duration::from_secs(1), + operation_status_rx.recv(), + ) + .await + .expect("router timed out") + .expect("broadcast recv error"); + + // F54 fallback: no pending ops means context is None — the + // raw frame is still surfaced verbatim. + assert!(event.context.is_none()); + assert_eq!(event.status, MxStatus::WRITE_COMPLETE_OK); + + drop(event_tx); + let _ = router_h.await; + } + + /// F54: `Session::write_value_with_handle` returns a `WriteHandle` + /// whose correlation id matches the entry inserted into + /// `pending_ops`. The handle stays in the registry until a status + /// frame arrives or the consumer cancels. + #[tokio::test] + async fn write_value_with_handle_inserts_into_pending_ops() { + let (addr, handle) = unauthenticated_server(vec![(0, Vec::new())]).await; + let resolver: Arc = Arc::new(StaticResolver::new(&[( + "TestObj.TestInt", + sample_metadata(), + )])); + let session = connect_test_session(addr, resolver).await.unwrap(); + + // Issue the write — handle returned carries the synthetic id. + let write_handle = session + .write_value_with_handle("TestObj.TestInt", WriteValue::Int32(7)) + .await + .unwrap(); + + // The pending_ops registry should contain exactly that id with + // OperationKind::Write + the original reference. + let guard = session.inner.pending_ops.lock().await; + assert_eq!(guard.len(), 1); + let ctx = guard + .by_id + .get(&write_handle.correlation_id) + .expect("inserted entry"); + assert_eq!(ctx.op_kind, OperationKind::Write); + assert_eq!(ctx.reference.as_deref(), Some("TestObj.TestInt")); + drop(guard); + + handle.await.unwrap(); + } + /// F47 — `Session::unsubscribe` must NOT emit an `UnAdvise` for /// buffered subscriptions, mirroring the .NET reference's /// `if (!subscription.IsBuffered)` guard at