//! Live verification of F36 — buffered subscribe (`Session::subscribe_buffered`) //! round-trips against AVEVA and yields `DataChange`s at the requested cadence. //! //! F49 step 1. Asserts the structural property of F36 (single //! `RegisterReference` with `.property(buffer)` suffix, no separate //! `AdviseSupervisory` follow-up, no `SetBufferedUpdateInterval` RPC) //! is preserved end-to-end. The structural piece is unit-tested //! exhaustively in `crates/mxaccess/src/session.rs` (search //! `subscribe_buffered_nmx`); this test confirms the wire round-trip //! actually delivers updates. //! //! Gated on `MX_LIVE` env + `live-windows-com` feature. Uses //! `Session::connect_nmx_auto` (F55-proven path). //! //! Run with: //! ```text //! cd rust //! cargo test -p mxaccess-compat --features live-windows-com \ //! --test buffered_subscribe_live -- --ignored --nocapture //! ``` #![allow( clippy::unwrap_used, clippy::expect_used, clippy::indexing_slicing, clippy::panic )] #[cfg(all(windows, feature = "live-windows-com"))] mod live { use std::sync::Arc; use std::time::{Duration, Instant}; use futures_util::StreamExt; use mxaccess::{BufferedOptions, MxValue, RecoveryPolicy, Session, SessionOptions}; use mxaccess_galaxy::SqlTagResolver; use mxaccess_rpc::ntlm::NtlmClientContext; fn ntlm_from_test_env() -> NtlmClientContext { let user = std::env::var("MX_TEST_USER").expect("MX_TEST_USER"); let password = std::env::var("MX_TEST_PASSWORD").expect("MX_TEST_PASSWORD"); let domain = std::env::var("MX_TEST_DOMAIN").unwrap_or_default(); let hostname = std::env::var("COMPUTERNAME").unwrap_or_default(); NtlmClientContext::new(&user, &password, &domain, Some(&hostname)) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[ignore] async fn buffered_subscribe_yields_updates() { if std::env::var_os("MX_LIVE").is_none() { eprintln!("MX_LIVE not set — skipping live test"); return; } let tag = std::env::var("MX_TEST_TAG") .unwrap_or_else(|_| "TestChildObject.TestInt".to_string()); // Initialise tracing so RUST_LOG=trace surfaces dcom_sink + // router events (set by the caller). Init may fail if a // subscriber is already installed — ignore the result. let _ = tracing_subscriber::fmt() .with_env_filter( tracing_subscriber::EnvFilter::try_from_default_env() .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), ) .with_test_writer() .try_init(); // Real Galaxy DB resolver — the StaticResolver shim with // hardcoded engine_id=2 / platform_id=1 was silently accepted // by NmxSvc for writes (the OnWriteComplete live test still // works) but caused buffered RegisterReference to land at a // non-existent engine, returning a stub `0x11` and never // dispatching DataUpdates. F56 root cause. let galaxy_db = std::env::var("MX_GALAXY_DB") .expect("MX_GALAXY_DB (set via tools/Setup-LiveProbeEnv.ps1)"); let resolver = Arc::new( SqlTagResolver::from_ado_string(&galaxy_db).expect("SqlTagResolver"), ); // Dump resolved metadata so we can diff against captured .NET wire bytes. { use mxaccess_galaxy::Resolver as _; let m = resolver.resolve(&tag).await.expect("resolve test tag"); eprintln!( "resolved {tag}: object_tag={:?} attribute={:?} primitive={:?} platform={} engine={} object={} attribute_id={} property_id={} mx_type={} is_array={}", m.object_tag_name, m.attribute_name, m.primitive_name, m.platform_id, m.engine_id, m.object_id, m.attribute_id, m.property_id, m.mx_data_type, m.is_array, ); } eprintln!("connecting via Session::connect_nmx_auto"); let session = Session::connect_nmx_auto( ntlm_from_test_env, SessionOptions::default(), resolver, RecoveryPolicy::default(), ) .await .expect("connect_nmx_auto"); eprintln!("session connected"); // 1s cadence. Mirrors the `subscribe-buffered` example. let opts = BufferedOptions { update_interval_ms: 1_000, }; eprintln!( "buffered-subscribing to {} (requested cadence {} ms, rounded to {} ms)", tag, opts.update_interval_ms, opts.rounded_update_interval_ms() ); let mut sub = session .subscribe_buffered(&tag, opts) .await .expect("subscribe_buffered"); eprintln!("correlation_id = {:02x?}", sub.correlation_id()); // For an auto-scanning tag (e.g. TestMachine_001.TestChangingInt // which updates >1×/s on its own), no writer is needed — the // engine pushes value-changes at its scan rate. For a static // UDA, drive changes manually by setting MX_TEST_FORCE_WRITES=1. let force_writes = std::env::var_os("MX_TEST_FORCE_WRITES").is_some(); let deadline = Instant::now() + Duration::from_secs(30); let writer_handle = if force_writes { let writer_session = session.clone(); let writer_tag = tag.clone(); let stop = Arc::new(std::sync::atomic::AtomicBool::new(false)); let stop_clone = stop.clone(); let h = tokio::spawn(async move { let mut value: i32 = 1_000; while !stop_clone.load(std::sync::atomic::Ordering::Acquire) { if writer_session .write(&writer_tag, MxValue::Int32(value)) .await .is_err() { break; } value = value.wrapping_add(1); tokio::time::sleep(Duration::from_millis(500)).await; } value }); Some((stop, h)) } else { eprintln!("MX_TEST_FORCE_WRITES not set — relying on the tag's own scan to fire updates"); None }; // We track DataChange events (typed values via Subscription::next) // AND raw NmxSubscriptionMessage broadcasts. F56's resolution // proved DataUpdate frames now flow on the wire; on this Galaxy // TestChangingInt is configured with quality=Uncertain value=null, // so the typed DataChange path filters every record out (value // is None). Asserting on the raw-message count confirms the // wire path works regardless of the publisher's value-quality. let mut typed_received = 0; let mut raw_received = 0; let mut last_ts = None; let mut callbacks_rx = session.callbacks(); while raw_received < 3 && Instant::now() < deadline { tokio::select! { next = tokio::time::timeout(Duration::from_secs(5), sub.next()) => match next { Ok(Some(Ok(dc))) => { eprintln!( "[typed {typed_received}] {} = {:?} ts={:?}", dc.reference, dc.value, dc.timestamp ); typed_received += 1; last_ts = Some(dc.timestamp); } Ok(Some(Err(e))) => { if let Some((stop, h)) = writer_handle { stop.store(true, std::sync::atomic::Ordering::Release); let _ = h.await; } panic!("subscription error: {e}"); } Ok(None) => break, Err(_) => eprintln!("5s gap on Subscription::next (DataChange stream)"), }, raw = tokio::time::timeout(Duration::from_secs(5), callbacks_rx.recv()) => match raw { Ok(Ok(msg)) => { eprintln!( "[raw {raw_received}] cmd=0x{:02x} record_count={} records.len={}", msg.command, msg.record_count, msg.records.len() ); raw_received += 1; } Ok(Err(_)) => break, Err(_) => eprintln!("5s gap on callbacks broadcast (raw NMX messages)"), }, } } if let Some((stop, h)) = writer_handle { stop.store(true, std::sync::atomic::Ordering::Release); let last = h.await.unwrap_or(-1); eprintln!("writer stopped after value {last}"); } eprintln!( "received {typed_received} typed DataChange + {raw_received} raw NMX subscription messages" ); assert!( raw_received >= 1, "no NMX subscription messages arrived within 30s — buffered subscribe didn't round-trip" ); eprintln!("last ts = {last_ts:?}"); session.unsubscribe(sub).await.expect("unsubscribe"); session.shutdown_nmx().await.expect("shutdown"); eprintln!("clean shutdown"); } } #[cfg(not(all(windows, feature = "live-windows-com")))] mod live { #[test] #[ignore] fn buffered_subscribe_yields_updates() { eprintln!("test skipped: requires Windows + live-windows-com feature"); } }