Files
mxaccess/rust/crates/mxaccess-compat/tests/buffered_subscribe_live.rs
T
Joseph Doherty df3457c54a [F56] subscribe / subscribe_buffered: split-form wire body + diagnose Galaxy fixture gap
Three real fixes + one architectural diagnosis:

1. Session::subscribe_buffered_nmx now sends the .NET-reference split
   form on the wire:
     item_definition = "<attr>.property(buffer)"   (was: full reference)
     item_context    = "<object_tag_name>"          (was: empty)
     item_handle     = SessionInner::next_item_handle.fetch_add(1)
                       (was: hardcoded 0)
   Verified byte-identical against captures/082 + 094 by the existing
   buffered_register_reference_parity unit tests. The
   item_handle counter mirrors MxNativeCompatibilityServer's
   _nextItemHandle++ at MxNativeSession.cs:613.

2. New live tests:
   - tests/buffered_subscribe_live.rs (F49 step 1) — uses real Galaxy
     metadata via SqlTagResolver + connect_nmx_auto, drives a
     background writer at 500ms cadence to force value-changes,
     drains DataChange events from Subscription.
   - tests/plain_subscribe_live.rs — same harness over plain
     Session::subscribe (NOT buffered), used to isolate whether
     "no DataUpdate" is buffered-specific (it's not — both fail).

   Both pull tracing-subscriber as a dev-dep so `RUST_LOG=trace`
   surfaces dcom_sink + router activity.

3. mxaccess-galaxy/sql_resolver.rs: drop the inner-attribute
   `#![cfg(feature = "galaxy-resolver")]` — the module-level cfg on
   `pub mod sql_resolver` in lib.rs already handles this and Rust
   1.85's clippy::duplicated_attributes lint flagged the duplicate
   once mxaccess-compat dev-deps activated the feature.

4. F56 finding (diagnosis, NOT a bug fix): the engine on this Galaxy
   install does not have an active value for TestChildObject.TestInt.
   Confirmed by running the .NET reference's own probe:

     dotnet run --project src/MxNativeClient.Probe -c Release \
       -- --probe-session-subscribe --tag=TestChildObject.TestInt \
       --subscribe-hold-seconds=10

   ...returns ONE 0x32 SubscriptionStatus (status=3 detail=3
   quality=0x00C0 Uncertain value=null) and zero 0x33 DataUpdates —
   matching the Rust port's symptom exactly. Not a Rust port bug,
   not a wire-byte gap. F49 steps 1-3 need either an actively-
   scanned tag or local Galaxy reconfiguration to scan
   TestChildObject.TestInt.

Workspace tests + clippy clean under both feature configurations.
F56 entry in design/followups.md updated with the full diagnostic
chain so future-me / future-collaborators can pick it up without
re-tracing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-06 10:27:08 -04:00

199 lines
7.7 KiB
Rust

//! Live verification of F36 — buffered subscribe (`Session::subscribe_buffered`)
//! round-trips against AVEVA and yields `DataChange`s at the requested cadence.
//!
//! F49 step 1. Asserts the structural property of F36 (single
//! `RegisterReference` with `.property(buffer)` suffix, no separate
//! `AdviseSupervisory` follow-up, no `SetBufferedUpdateInterval` RPC)
//! is preserved end-to-end. The structural piece is unit-tested
//! exhaustively in `crates/mxaccess/src/session.rs` (search
//! `subscribe_buffered_nmx`); this test confirms the wire round-trip
//! actually delivers updates.
//!
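//! Gated on `MX_LIVE` env + `live-windows-com` feature (Windows only). Uses
//! `Session::connect_nmx_auto` (F55-proven path). Requires `MX_GALAXY_DB`,
//! `MX_TEST_USER` and `MX_TEST_PASSWORD`; `MX_TEST_DOMAIN` and `MX_TEST_TAG`
//! are optional (the tag defaults to `TestChildObject.TestInt`).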
//!
//! Run with:
//! ```text
//! cd rust
//! cargo test -p mxaccess-compat --features live-windows-com \
//! --test buffered_subscribe_live -- --ignored --nocapture
//! ```
#![allow(
clippy::unwrap_used,
clippy::expect_used,
clippy::indexing_slicing,
clippy::panic
)]
#[cfg(all(windows, feature = "live-windows-com"))]
mod live {
use std::sync::Arc;
use std::time::{Duration, Instant};
use futures_util::StreamExt;
use mxaccess::{BufferedOptions, MxValue, RecoveryPolicy, Session, SessionOptions};
use mxaccess_galaxy::SqlTagResolver;
use mxaccess_rpc::ntlm::NtlmClientContext;
/// Builds the NTLM client context for the live test from environment variables.
///
/// `MX_TEST_USER` and `MX_TEST_PASSWORD` are mandatory; `MX_TEST_DOMAIN` and
/// `COMPUTERNAME` fall back to an empty string when they cannot be read.
///
/// # Panics
///
/// Panics if `MX_TEST_USER` or `MX_TEST_PASSWORD` cannot be read (the user is
/// checked first).
fn ntlm_from_test_env() -> NtlmClientContext {
    use std::env;

    // Mandatory variables: the panic message is the variable name itself.
    let required = |key: &str| env::var(key).expect(key);
    // Optional variables: any read failure collapses to "".
    let optional = |key: &str| env::var(key).unwrap_or_default();

    let account = required("MX_TEST_USER");
    let secret = required("MX_TEST_PASSWORD");
    let realm = optional("MX_TEST_DOMAIN");
    let workstation = optional("COMPUTERNAME");

    NtlmClientContext::new(&account, &secret, &realm, Some(&workstation))
}
/// Live round-trip check for `Session::subscribe_buffered`.
///
/// Returns early (skip) unless `MX_LIVE` is set. Otherwise it resolves the tag
/// named by `MX_TEST_TAG` (default `TestChildObject.TestInt`) through a
/// `SqlTagResolver` built from `MX_GALAXY_DB`, connects with
/// `Session::connect_nmx_auto`, opens a buffered subscription with a 1000 ms
/// update interval, and drains up to three `DataChange`s while a background
/// task writes an incrementing `Int32` to the same tag every 500 ms. The
/// session is unsubscribed and shut down once the assertion has passed.
///
/// # Panics
///
/// Panics when `MX_GALAXY_DB` is missing, when a resolve / connect /
/// subscribe / unsubscribe / shutdown step fails, when the subscription stream
/// yields an error, or when no update arrives inside the drain window.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore]
async fn buffered_subscribe_yields_updates() {
    use std::sync::atomic::{AtomicBool, Ordering};

    // Overall budget for collecting updates. The failure message below is
    // derived from this value so the two cannot drift apart (the original
    // message said "15s" while the window was 30s).
    const DRAIN_WINDOW: Duration = Duration::from_secs(30);
    // Longest single wait on the subscription stream. The final poll may
    // start just before the deadline, so the loop can overrun DRAIN_WINDOW by
    // at most this much.
    const POLL_TIMEOUT: Duration = Duration::from_secs(5);
    // Pause between background writes.
    const WRITE_PERIOD: Duration = Duration::from_millis(500);
    // Stop draining early once this many updates have been observed.
    const TARGET_UPDATES: u32 = 3;

    if std::env::var_os("MX_LIVE").is_none() {
        eprintln!("MX_LIVE not set — skipping live test");
        return;
    }
    let tag = std::env::var("MX_TEST_TAG")
        .unwrap_or_else(|_| "TestChildObject.TestInt".to_string());

    // Initialise tracing so RUST_LOG=trace surfaces dcom_sink +
    // router events (set by the caller). Init may fail if a
    // subscriber is already installed — ignore the result.
    let _ = tracing_subscriber::fmt()
        .with_env_filter(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
        )
        .with_test_writer()
        .try_init();

    // Real Galaxy DB resolver — the StaticResolver shim with
    // hardcoded engine_id=2 / platform_id=1 was silently accepted
    // by NmxSvc for writes (the OnWriteComplete live test still
    // works) but caused buffered RegisterReference to land at a
    // non-existent engine, returning a stub `0x11` and never
    // dispatching DataUpdates. F56 root cause.
    let galaxy_db = std::env::var("MX_GALAXY_DB")
        .expect("MX_GALAXY_DB (set via tools/Setup-LiveProbeEnv.ps1)");
    let resolver = Arc::new(
        SqlTagResolver::from_ado_string(&galaxy_db).expect("SqlTagResolver"),
    );

    // Dump resolved metadata so we can diff against captured .NET wire bytes.
    {
        use mxaccess_galaxy::Resolver as _;
        let m = resolver.resolve(&tag).await.expect("resolve test tag");
        eprintln!(
            "resolved {tag}: object_tag={:?} attribute={:?} primitive={:?} platform={} engine={} object={} attribute_id={} property_id={} mx_type={} is_array={}",
            m.object_tag_name,
            m.attribute_name,
            m.primitive_name,
            m.platform_id,
            m.engine_id,
            m.object_id,
            m.attribute_id,
            m.property_id,
            m.mx_data_type,
            m.is_array,
        );
    }

    eprintln!("connecting via Session::connect_nmx_auto");
    let session = Session::connect_nmx_auto(
        ntlm_from_test_env,
        SessionOptions::default(),
        resolver,
        RecoveryPolicy::default(),
    )
    .await
    .expect("connect_nmx_auto");
    eprintln!("session connected");

    // 1s cadence. Mirrors the `subscribe-buffered` example.
    let opts = BufferedOptions {
        update_interval_ms: 1_000,
    };
    eprintln!(
        "buffered-subscribing to {} (requested cadence {} ms, rounded to {} ms)",
        tag,
        opts.update_interval_ms,
        opts.rounded_update_interval_ms()
    );
    let mut sub = session
        .subscribe_buffered(&tag, opts)
        .await
        .expect("subscribe_buffered");
    eprintln!("correlation_id = {:02x?}", sub.correlation_id());

    // Buffered cadence is delivery-only — the engine pushes at the
    // configured interval but only when the value has changed.
    // Spawn a background writer that bumps the tag every WRITE_PERIOD so
    // the engine always has a fresh value to deliver at the next
    // cadence boundary.
    let deadline = Instant::now() + DRAIN_WINDOW;
    let writer_session = session.clone();
    let writer_tag = tag.clone();
    let writer_stop = Arc::new(AtomicBool::new(false));
    let writer_stop_clone = Arc::clone(&writer_stop);
    let writer = tokio::spawn(async move {
        let mut value: i32 = 1_000;
        while !writer_stop_clone.load(Ordering::Acquire) {
            if let Err(e) = writer_session
                .write(&writer_tag, MxValue::Int32(value))
                .await
            {
                // A failed write ends the writer; the drain loop keeps
                // running and the final assertion decides the outcome.
                eprintln!("writer: write({value}) failed: {e}");
                break;
            }
            value = value.wrapping_add(1);
            tokio::time::sleep(WRITE_PERIOD).await;
        }
        value
    });

    let mut received: u32 = 0;
    let mut last_ts = None;
    while received < TARGET_UPDATES && Instant::now() < deadline {
        match tokio::time::timeout(POLL_TIMEOUT, sub.next()).await {
            Ok(Some(Ok(dc))) => {
                eprintln!(
                    "[{received}] {} = {:?} ts={:?}",
                    dc.reference, dc.value, dc.timestamp
                );
                received += 1;
                last_ts = Some(dc.timestamp);
            }
            Ok(Some(Err(e))) => {
                // Stop and join the writer before failing so it is not
                // left running while the panic unwinds.
                writer_stop.store(true, Ordering::Release);
                let _ = writer.await;
                panic!("subscription error: {e}");
            }
            // Stream ended: nothing more will arrive.
            Ok(None) => break,
            Err(_) => {
                eprintln!("{}s gap waiting for next update", POLL_TIMEOUT.as_secs());
            }
        }
    }

    writer_stop.store(true, Ordering::Release);
    // -1 is reported only when the writer task could not be joined.
    let last_value = writer.await.unwrap_or(-1);
    eprintln!("writer stopped after value {last_value}");

    assert!(
        received >= 1,
        "no DataChange arrived within {}s — buffered subscribe didn't round-trip",
        DRAIN_WINDOW.as_secs()
    );
    eprintln!("received {received} updates; last ts = {last_ts:?}");

    session.unsubscribe(sub).await.expect("unsubscribe");
    session.shutdown_nmx().await.expect("shutdown");
    eprintln!("clean shutdown");
}
}
#[cfg(not(all(windows, feature = "live-windows-com")))]
mod live {
    /// Stand-in compiled when the target is not Windows or the
    /// `live-windows-com` feature is off; it only reports why nothing ran.
    #[test]
    #[ignore]
    fn buffered_subscribe_yields_updates() {
        const SKIP_NOTICE: &str = "test skipped: requires Windows + live-windows-com feature";
        eprintln!("{SKIP_NOTICE}");
    }
}