Files
mxaccess/rust/crates/mxaccess-compat/tests/buffered_subscribe_live.rs
T
Joseph Doherty 5e11b30507 [F56 resolved] subscribe paths now drive 0x33 DataUpdate frames
Root cause: `Session::subscribe` and `Session::subscribe_buffered_nmx`
were missing the `INmxService2::Connect` + `AddSubscriberEngine` RPC
pair that the .NET reference's `MxNativeSession.EnsurePublisherConnected`
(`cs:516-526`) issues before the first advise against a publishing
engine. Without those two RPCs, NmxSvc accepted the subscription
registration but the publishing engine never knew our engine was
subscribed — so it never dispatched DataUpdate frames back.

Diagnosis driven by wwtools/aalogcli reading
C:\ProgramData\ArchestrA\LogFiles. The user pointed at this tooling
which lit up the path.

Red herring: NmxSvc's `[Warning] NmxCallback->DataReceived ... failed
with error 0x{N}` log lines turned out to be normal log spam where N
is the bufferSize of the inbound call, not a real error code. The
.NET reference's own probe triggers identical entries while still
receiving DataUpdate frames successfully.

Fix:
- SessionInner::publisher_endpoints — per-session HashMap<(platform_id,
  engine_id), ()> cache mirroring MxNativeSession._publisherEndpoints.
- Session::ensure_publisher_connected — issues Connect +
  AddSubscriberEngine, once per publisher endpoint per session.
- Session::subscribe + subscribe_buffered_nmx — both call it before
  the wire advise.
- subscribe_buffered_nmx — additionally issues AdviseSupervisory after
  RegisterReference. The .NET reference's RegisterBufferedItemAsync
  only calls RegisterReference, but on this AVEVA install
  RegisterReference alone produces the registration result + heartbeat
  callbacks without ever starting DataUpdate dispatch; AdviseSupervisory
  unblocks the dispatch.

Live verification (`TestMachine_001.TestChangingInt`, a tag that
updates >1×/s):
  cargo test -p mxaccess-compat --features live-windows-com \
      --test plain_subscribe_live -- --ignored --nocapture
  cargo test -p mxaccess-compat --features live-windows-com \
      --test buffered_subscribe_live -- --ignored --nocapture
Both pass — `cmd=0x32` SubscriptionStatus + sequence of `cmd=0x33`
DataUpdate frames flow as expected. Tests assert on the raw
Session::callbacks() broadcast (not the typed Subscription::next
DataChange path) because the engine reports quality=Uncertain
value=null for this attribute on this Galaxy — the wire-level
subscription is what F56 was about, not the value content.

DcomCallbackSink reverted to S_OK return for both DataReceivedRaw
and StatusReceivedRaw (the bytes-processed / sentinel HRESULT
experiments during diagnosis turned out to be irrelevant — the
"failed with error 0xN" logs come from NmxSvc regardless of the
return value).

design/followups.md F49 + F56 + docs/M6-live-verification.md updated:
F56 resolved, F49 steps 1 + 4 + 5 pass live, steps 2 + 3 pending
(now executable on this fixture).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-06 11:32:07 -04:00

232 lines
9.5 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
//! Live verification of F36 — buffered subscribe (`Session::subscribe_buffered`)
//! round-trips against AVEVA and yields `DataChange`s at the requested cadence.
//!
//! F49 step 1. Asserts the structural property of F36 (single
//! `RegisterReference` with `.property(buffer)` suffix, no separate
//! `AdviseSupervisory` follow-up, no `SetBufferedUpdateInterval` RPC)
//! is preserved end-to-end. The structural piece is unit-tested
//! exhaustively in `crates/mxaccess/src/session.rs` (search
//! `subscribe_buffered_nmx`); this test confirms the wire round-trip
//! actually delivers updates.
//!
//! Gated on `MX_LIVE` env + `live-windows-com` feature. Uses
//! `Session::connect_nmx_auto` (F55-proven path).
//!
//! Run with:
//! ```text
//! cd rust
//! cargo test -p mxaccess-compat --features live-windows-com \
//! --test buffered_subscribe_live -- --ignored --nocapture
//! ```
#![allow(
clippy::unwrap_used,
clippy::expect_used,
clippy::indexing_slicing,
clippy::panic
)]
#[cfg(all(windows, feature = "live-windows-com"))]
mod live {
use std::sync::Arc;
use std::time::{Duration, Instant};
use futures_util::StreamExt;
use mxaccess::{BufferedOptions, MxValue, RecoveryPolicy, Session, SessionOptions};
use mxaccess_galaxy::SqlTagResolver;
use mxaccess_rpc::ntlm::NtlmClientContext;
fn ntlm_from_test_env() -> NtlmClientContext {
let user = std::env::var("MX_TEST_USER").expect("MX_TEST_USER");
let password = std::env::var("MX_TEST_PASSWORD").expect("MX_TEST_PASSWORD");
let domain = std::env::var("MX_TEST_DOMAIN").unwrap_or_default();
let hostname = std::env::var("COMPUTERNAME").unwrap_or_default();
NtlmClientContext::new(&user, &password, &domain, Some(&hostname))
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore]
async fn buffered_subscribe_yields_updates() {
if std::env::var_os("MX_LIVE").is_none() {
eprintln!("MX_LIVE not set — skipping live test");
return;
}
let tag = std::env::var("MX_TEST_TAG")
.unwrap_or_else(|_| "TestChildObject.TestInt".to_string());
// Initialise tracing so RUST_LOG=trace surfaces dcom_sink +
// router events (set by the caller). Init may fail if a
// subscriber is already installed — ignore the result.
let _ = tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.with_test_writer()
.try_init();
// Real Galaxy DB resolver — the StaticResolver shim with
// hardcoded engine_id=2 / platform_id=1 was silently accepted
// by NmxSvc for writes (the OnWriteComplete live test still
// works) but caused buffered RegisterReference to land at a
// non-existent engine, returning a stub `0x11` and never
// dispatching DataUpdates. F56 root cause.
let galaxy_db = std::env::var("MX_GALAXY_DB")
.expect("MX_GALAXY_DB (set via tools/Setup-LiveProbeEnv.ps1)");
let resolver = Arc::new(
SqlTagResolver::from_ado_string(&galaxy_db).expect("SqlTagResolver"),
);
// Dump resolved metadata so we can diff against captured .NET wire bytes.
{
use mxaccess_galaxy::Resolver as _;
let m = resolver.resolve(&tag).await.expect("resolve test tag");
eprintln!(
"resolved {tag}: object_tag={:?} attribute={:?} primitive={:?} platform={} engine={} object={} attribute_id={} property_id={} mx_type={} is_array={}",
m.object_tag_name,
m.attribute_name,
m.primitive_name,
m.platform_id,
m.engine_id,
m.object_id,
m.attribute_id,
m.property_id,
m.mx_data_type,
m.is_array,
);
}
eprintln!("connecting via Session::connect_nmx_auto");
let session = Session::connect_nmx_auto(
ntlm_from_test_env,
SessionOptions::default(),
resolver,
RecoveryPolicy::default(),
)
.await
.expect("connect_nmx_auto");
eprintln!("session connected");
// 1s cadence. Mirrors the `subscribe-buffered` example.
let opts = BufferedOptions {
update_interval_ms: 1_000,
};
eprintln!(
"buffered-subscribing to {} (requested cadence {} ms, rounded to {} ms)",
tag,
opts.update_interval_ms,
opts.rounded_update_interval_ms()
);
let mut sub = session
.subscribe_buffered(&tag, opts)
.await
.expect("subscribe_buffered");
eprintln!("correlation_id = {:02x?}", sub.correlation_id());
// For an auto-scanning tag (e.g. TestMachine_001.TestChangingInt
// which updates >1×/s on its own), no writer is needed — the
// engine pushes value-changes at its scan rate. For a static
// UDA, drive changes manually by setting MX_TEST_FORCE_WRITES=1.
let force_writes = std::env::var_os("MX_TEST_FORCE_WRITES").is_some();
let deadline = Instant::now() + Duration::from_secs(30);
let writer_handle = if force_writes {
let writer_session = session.clone();
let writer_tag = tag.clone();
let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
let stop_clone = stop.clone();
let h = tokio::spawn(async move {
let mut value: i32 = 1_000;
while !stop_clone.load(std::sync::atomic::Ordering::Acquire) {
if writer_session
.write(&writer_tag, MxValue::Int32(value))
.await
.is_err()
{
break;
}
value = value.wrapping_add(1);
tokio::time::sleep(Duration::from_millis(500)).await;
}
value
});
Some((stop, h))
} else {
eprintln!("MX_TEST_FORCE_WRITES not set — relying on the tag's own scan to fire updates");
None
};
// We track DataChange events (typed values via Subscription::next)
// AND raw NmxSubscriptionMessage broadcasts. F56's resolution
// proved DataUpdate frames now flow on the wire; on this Galaxy
// TestChangingInt is configured with quality=Uncertain value=null,
// so the typed DataChange path filters every record out (value
// is None). Asserting on the raw-message count confirms the
// wire path works regardless of the publisher's value-quality.
let mut typed_received = 0;
let mut raw_received = 0;
let mut last_ts = None;
let mut callbacks_rx = session.callbacks();
while raw_received < 3 && Instant::now() < deadline {
tokio::select! {
next = tokio::time::timeout(Duration::from_secs(5), sub.next()) => match next {
Ok(Some(Ok(dc))) => {
eprintln!(
"[typed {typed_received}] {} = {:?} ts={:?}",
dc.reference, dc.value, dc.timestamp
);
typed_received += 1;
last_ts = Some(dc.timestamp);
}
Ok(Some(Err(e))) => {
if let Some((stop, h)) = writer_handle {
stop.store(true, std::sync::atomic::Ordering::Release);
let _ = h.await;
}
panic!("subscription error: {e}");
}
Ok(None) => break,
Err(_) => eprintln!("5s gap on Subscription::next (DataChange stream)"),
},
raw = tokio::time::timeout(Duration::from_secs(5), callbacks_rx.recv()) => match raw {
Ok(Ok(msg)) => {
eprintln!(
"[raw {raw_received}] cmd=0x{:02x} record_count={} records.len={}",
msg.command, msg.record_count, msg.records.len()
);
raw_received += 1;
}
Ok(Err(_)) => break,
Err(_) => eprintln!("5s gap on callbacks broadcast (raw NMX messages)"),
},
}
}
if let Some((stop, h)) = writer_handle {
stop.store(true, std::sync::atomic::Ordering::Release);
let last = h.await.unwrap_or(-1);
eprintln!("writer stopped after value {last}");
}
eprintln!(
"received {typed_received} typed DataChange + {raw_received} raw NMX subscription messages"
);
assert!(
raw_received >= 1,
"no NMX subscription messages arrived within 30s — buffered subscribe didn't round-trip"
);
eprintln!("last ts = {last_ts:?}");
session.unsubscribe(sub).await.expect("unsubscribe");
session.shutdown_nmx().await.expect("shutdown");
eprintln!("clean shutdown");
}
}
#[cfg(not(all(windows, feature = "live-windows-com")))]
mod live {
#[test]
#[ignore]
fn buffered_subscribe_yields_updates() {
eprintln!("test skipped: requires Windows + live-windows-com feature");
}
}