From 2dc091d0beecacb0c1be3d9105262b7a5ca9db07 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 5 May 2026 09:52:14 -0400 Subject: [PATCH] [M4] mxaccess: Session::read (read-as-subscribe pattern) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Now that Subscription impls Stream>, the read-as-subscribe pattern is a thin wrapper over subscribe + timeout + best-effort unsubscribe. New - Session::read(reference, timeout) -> Result — port of MxNativeSession.ReadAsync (cs:312-359). Validates timeout > 0, subscribes, awaits the first DataChange under tokio::time::timeout, then issues UnAdvise (best-effort, mirrors the .NET finally block at cs:351-358 which discards the unsubscribe return). Error mapping - timeout=0: Configuration::InvalidArgument ("Read timeout must be positive") matching ArgumentOutOfRangeException at cs:318-321. - timeout elapsed: Error::Timeout(timeout). - subscribe failure (resolver / transport): propagated unchanged. - stream ends before any value: Connection::EngineNotRegistered (broadcast sender dropped during shutdown). - unsubscribe failure: tracing::warn! with the error; doesn't override the read result. Removed the placeholder stub in lib.rs that returned Error::Unsupported. Tests (4 new in mxaccess; total 44) - read_returns_first_data_change_within_timeout: spawn read, inject a 0x33 DataUpdate via test_inject_sender (which fans out to all subscriptions), assert the DataChange comes back with the right value. - read_returns_timeout_when_no_data_arrives: read times out cleanly with Error::Timeout when no callback fires. - read_zero_timeout_returns_invalid_argument_without_subscribing: validates the early-reject path before any RPC is issued. - read_propagates_resolver_not_found: subscribe-side error surfaces through read unchanged. Test count delta: 516 -> 520 (+4). All four DoD gates green. Co-Authored-By: Claude Opus 4.7 (1M context) --- rust/crates/mxaccess/src/lib.rs | 9 -- rust/crates/mxaccess/src/session.rs | 168 ++++++++++++++++++++++++++++ 2 files changed, 168 insertions(+), 9 deletions(-) diff --git a/rust/crates/mxaccess/src/lib.rs b/rust/crates/mxaccess/src/lib.rs index c9e672d..bc62c69 100644 --- a/rust/crates/mxaccess/src/lib.rs +++ b/rust/crates/mxaccess/src/lib.rs @@ -410,15 +410,6 @@ impl Session { }) } - /// Read-as-subscribe per `MxNativeSession.ReadAsync` — requires a positive - /// timeout, drop guarantees `UnAdvise`. - pub async fn read(&self, _reference: &str, _timeout: Duration) -> Result { - Err(Error::Unsupported { - operation: Cow::Borrowed("Session::read"), - transport: TransportKind::Nmx, - }) - } - pub async fn subscribe_many(&self, _references: &[&str]) -> Result { Err(Error::Unsupported { operation: Cow::Borrowed("Session::subscribe_many"), diff --git a/rust/crates/mxaccess/src/session.rs b/rust/crates/mxaccess/src/session.rs index 2a02cae..9a98a9e 100644 --- a/rust/crates/mxaccess/src/session.rs +++ b/rust/crates/mxaccess/src/session.rs @@ -757,6 +757,66 @@ impl Session { /// `broadcast::subscribe` would be dropped. /// /// + /// Read the current value of `reference` using the read-as-subscribe + /// pattern. Mirrors `MxNativeSession.ReadAsync` + /// (`MxNativeSession.cs:312-359`). + /// + /// Internally: subscribes to the tag, waits up to `timeout` for the + /// first `DataChange` whose value parses successfully, then issues + /// `UnAdvise` (best-effort — failures during cleanup are logged but + /// don't override the read result, mirroring the .NET `finally` + /// block at `cs:351-358` which discards the unsubscribe return). + /// + /// # Errors + /// - [`Error::Configuration`] if `timeout` is zero (the .NET + /// reference at `cs:318-321` rejects non-positive timeouts). + /// - [`Error::Timeout`] if no value arrives before `timeout` + /// elapses. + /// - Resolver / transport / RPC errors propagated from the + /// underlying subscribe. + pub async fn read( + &self, + reference: &str, + timeout: std::time::Duration, + ) -> Result { + if timeout.is_zero() { + return Err(Error::Configuration(ConfigError::InvalidArgument { + detail: "Read timeout must be positive".to_string(), + })); + } + + // Subscribe through the public path so the broadcast wiring + + // AdviseSupervisory both run. + let subscription = self.subscribe(reference).await?; + + // Wait for the first DataChange. Stream::next yields a Result; + // we transpose so timeout vs read-result are distinguishable. + use futures_util::StreamExt; + let mut sub = subscription; + let result = match tokio::time::timeout(timeout, sub.next()).await { + Ok(Some(Ok(dc))) => Ok(dc), + Ok(Some(Err(e))) => Err(e), + // Stream ended before any value (broadcast sender dropped — + // typically during shutdown). Surface as a Connection error + // because that's the closest fit; the read can't complete. + Ok(None) => Err(Error::Connection(ConnectionError::EngineNotRegistered)), + Err(_elapsed) => Err(Error::Timeout(timeout)), + }; + + // Best-effort unsubscribe. The .NET finally block at cs:351-358 + // ignores the return of Unsubscribe; mirror that — a failed + // cleanup shouldn't mask the read result. We log via tracing so + // operators can spot leak signals. + if let Err(unsub_err) = self.unsubscribe(sub).await { + tracing::warn!( + ?unsub_err, + "best-effort unsubscribe after read failed; possible server-side handle leak" + ); + } + + result + } + /// Cancel a subscription. Mirrors `MxNativeSession.Unsubscribe` /// (`cs:361-381`) — calls `UnAdvise` on the underlying transport. /// @@ -1772,6 +1832,114 @@ mod tests { handle.abort(); } + // ---- Session::read (read-as-subscribe) ---------------------------- + + #[tokio::test] + async fn read_returns_first_data_change_within_timeout() { + // Server: AdviseSupervisory ack + UnAdvise ack. + let (addr, handle) = unauthenticated_server(vec![(0, Vec::new()), (0, Vec::new())]).await; + let resolver: Arc = Arc::new(StaticResolver::new(&[( + "TestObj.TestInt", + sample_metadata(), + )])); + let session = connect_test_session(addr, resolver).await.unwrap(); + + // Spawn the read first (it'll subscribe internally). + let session_for_read = session.clone(); + let read_h = tokio::spawn(async move { + session_for_read + .read("TestObj.TestInt", std::time::Duration::from_secs(2)) + .await + }); + + // Give the subscribe time to register the broadcast receiver. + // Inject the matching message after a brief yield. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Find the subscription's correlation id by inspecting the + // session's subscriber count proxy. Easier: just inject a 0x33 + // DataUpdate which fans out to all subscribers regardless of + // correlation. + let header_len = 23; + let record_len = 4 + 2 + 8 + 1 + 4; + let mut body = vec![0u8; header_len + record_len]; + body[0] = 0x33; + body[1..3].copy_from_slice(&1u16.to_le_bytes()); + body[3..7].copy_from_slice(&1i32.to_le_bytes()); + body[7..23].copy_from_slice(&[0xEEu8; 16]); + let off = header_len; + body[off..off + 4].copy_from_slice(&0i32.to_le_bytes()); + body[off + 4..off + 6].copy_from_slice(&0xC0u16.to_le_bytes()); + body[off + 6..off + 14].copy_from_slice(&0x1F0E_2D60_4C00_0000i64.to_le_bytes()); + body[off + 14] = 0x02; // wire_kind Int32 + body[off + 15..off + 19].copy_from_slice(&123i32.to_le_bytes()); + + let msg = NmxSubscriptionMessage::parse_inner(&body).unwrap(); + test_inject_sender(&session).send(Arc::new(msg)).unwrap(); + + let dc = read_h.await.unwrap().unwrap(); + assert_eq!(dc.value, mxaccess_codec::MxValue::Int32(123)); + assert_eq!(&*dc.reference, "TestObj.TestInt"); + handle.await.unwrap(); + } + + #[tokio::test] + async fn read_returns_timeout_when_no_data_arrives() { + // Server only handles the AdviseSupervisory + UnAdvise (no data + // injection). Read must hit the timeout branch. + let (addr, handle) = unauthenticated_server(vec![(0, Vec::new()), (0, Vec::new())]).await; + let resolver: Arc = Arc::new(StaticResolver::new(&[( + "TestObj.TestInt", + sample_metadata(), + )])); + let session = connect_test_session(addr, resolver).await.unwrap(); + + let err = session + .read("TestObj.TestInt", std::time::Duration::from_millis(100)) + .await + .unwrap_err(); + assert!(matches!(err, Error::Timeout(_))); + handle.await.unwrap(); + } + + #[tokio::test] + async fn read_zero_timeout_returns_invalid_argument_without_subscribing() { + let (addr, handle) = unauthenticated_server(Vec::new()).await; + let resolver: Arc = Arc::new(StaticResolver::new(&[])); + let session = connect_test_session(addr, resolver).await.unwrap(); + + let err = session + .read("anything", std::time::Duration::ZERO) + .await + .unwrap_err(); + match err { + Error::Configuration(ConfigError::InvalidArgument { detail }) => { + assert!(detail.contains("must be positive")); + } + other => panic!("expected InvalidArgument, got {other:?}"), + } + handle.abort(); + } + + #[tokio::test] + async fn read_propagates_resolver_not_found() { + let (addr, handle) = unauthenticated_server(Vec::new()).await; + let resolver: Arc = Arc::new(StaticResolver::new(&[])); + let session = connect_test_session(addr, resolver).await.unwrap(); + + let err = session + .read("does.not.exist", std::time::Duration::from_secs(1)) + .await + .unwrap_err(); + match err { + Error::Configuration(ConfigError::Galaxy { reason }) => { + assert!(reason.contains("tag not found")); + } + other => panic!("expected Galaxy not-found, got {other:?}"), + } + handle.abort(); + } + #[test] fn filetime_to_system_time_round_trip() { // Build a SystemTime, convert to FILETIME, convert back.