//! Plain (non-buffered) subscribe live diagnostic for F49 / F56. //! //! Mirror of `buffered_subscribe_live.rs` but invokes //! `Session::subscribe` instead of `subscribe_buffered`. Used to //! isolate whether F56's "no DataUpdate" symptom is buffered-specific //! (only `subscribe_buffered` broken) or affects all subscribe paths. #![allow( clippy::unwrap_used, clippy::expect_used, clippy::indexing_slicing, clippy::panic )] #[cfg(all(windows, feature = "live-windows-com"))] mod live { use std::sync::Arc; use std::time::{Duration, Instant}; use mxaccess::{RecoveryPolicy, Session, SessionOptions}; use mxaccess_galaxy::SqlTagResolver; use mxaccess_rpc::ntlm::NtlmClientContext; fn ntlm_from_test_env() -> NtlmClientContext { let user = std::env::var("MX_TEST_USER").expect("MX_TEST_USER"); let password = std::env::var("MX_TEST_PASSWORD").expect("MX_TEST_PASSWORD"); let domain = std::env::var("MX_TEST_DOMAIN").unwrap_or_default(); let hostname = std::env::var("COMPUTERNAME").unwrap_or_default(); NtlmClientContext::new(&user, &password, &domain, Some(&hostname)) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] #[ignore] async fn plain_subscribe_yields_updates() { if std::env::var_os("MX_LIVE").is_none() { eprintln!("MX_LIVE not set — skipping live test"); return; } let tag = std::env::var("MX_TEST_TAG") .unwrap_or_else(|_| "TestChildObject.TestInt".to_string()); let _ = tracing_subscriber::fmt() .with_env_filter( tracing_subscriber::EnvFilter::try_from_default_env() .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), ) .with_test_writer() .try_init(); let galaxy_db = std::env::var("MX_GALAXY_DB").expect("MX_GALAXY_DB"); let resolver = Arc::new( SqlTagResolver::from_ado_string(&galaxy_db).expect("SqlTagResolver"), ); let session = Session::connect_nmx_auto( ntlm_from_test_env, SessionOptions::default(), resolver, RecoveryPolicy::default(), ) .await .expect("connect_nmx_auto"); eprintln!("session connected"); // F56 — check raw NMX subscription messages on the broadcast, // not the value-filtered Subscription stream. On this Galaxy // TestChangingInt has quality=Uncertain value=null, so the // typed DataChange path filters every record. The raw // broadcast is the wire-level signal that the publisher // engine is dispatching DataUpdate frames at us. let mut callbacks_rx = session.callbacks(); let sub = session.subscribe(&tag).await.expect("subscribe"); eprintln!("plain subscribe correlation_id = {:02x?}", sub.correlation_id()); let deadline = Instant::now() + Duration::from_secs(20); let mut raw_received = 0; while raw_received < 3 && Instant::now() < deadline { match tokio::time::timeout(Duration::from_secs(5), callbacks_rx.recv()).await { Ok(Ok(msg)) => { eprintln!( "[raw {raw_received}] cmd=0x{:02x} record_count={} records.len={}", msg.command, msg.record_count, msg.records.len() ); raw_received += 1; } Ok(Err(_)) => break, Err(_) => eprintln!("5s gap waiting for next NMX message"), } } assert!( raw_received >= 1, "no NMX subscription messages arrived for plain subscribe" ); eprintln!("received {raw_received} raw NMX subscription messages"); session.unsubscribe(sub).await.expect("unsubscribe"); session.shutdown_nmx().await.expect("shutdown"); } } #[cfg(not(all(windows, feature = "live-windows-com")))] mod live { #[test] #[ignore] fn plain_subscribe_yields_updates() { eprintln!("test skipped: requires Windows + live-windows-com feature"); } }