From d1491435357d79075f7106df210cb72512313b3f Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 6 May 2026 12:00:44 -0400 Subject: [PATCH] [F49 steps 2 + 3] live verification: buffered recovery replay + unsubscribe skip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Step 3 (F47 buffered unsubscribe skip): - crates/mxaccess-compat/tests/buffered_unsubscribe_skip_live.rs. - Subscribe buffered, sleep so the engine has DataUpdates in flight, then call unsubscribe. Asserts Ok return without surfacing transport or HRESULT errors. - Session::unsubscribe (session.rs:2261) probes the registry: if Buffered { .. }, it skips nmx.un_advise entirely, mirroring the .NET reference's `if (!subscription.IsBuffered)` guard at MxNativeSession.cs:361-381. If unsubscribe accidentally emitted UnAdvise for a buffered correlation id, the engine would return non-zero HRESULT (no matching plain advise to retract) — surfacing as a panic. Step 2 (F45 buffered recovery replay): - crates/mxaccess-compat/tests/buffered_recovery_replay_live.rs. - Subscribe buffered, drain >=1 NMX subscription message (cmd=0x32 SubscriptionStatus + cmd=0x33 DataUpdate) to confirm the wire path is hot pre-recovery, install a RebuildFactory that calls NmxClient::create (the same auto-resolving COM-activation path Session::connect_nmx_auto uses), invoke recover_connection, drain >=1 NMX subscription message post-recovery. - Verifies the replay branch in recover_connection_core re-issues RegisterReference (NOT AdviseSupervisory) for the buffered entry, mirroring MxNativeSession.ReAdviseSubscription (cs:538-569). Structural property is unit-tested; this confirms the engine actually picks back up after the rebuild + replay. 
Both tests pass live on this Galaxy: cargo test -p mxaccess-compat --features live-windows-com \ --test buffered_unsubscribe_skip_live -- --ignored --nocapture cargo test -p mxaccess-compat --features live-windows-com \ --test buffered_recovery_replay_live -- --ignored --nocapture Pulls mxaccess-nmx + mxaccess-codec into mxaccess-compat dev-deps so the recovery test can build a RebuildFactory closure that returns NmxClient and bind a typed broadcast Receiver. design/followups.md F49 -> Resolved (all five steps pass live). docs/M6-live-verification.md updated with per-step evidence + repro commands. F49 is fully closed out. F55 (DCOM-managed INmxSvcCallback, Path A) and F56 (missing EnsurePublisherConnected + post-RegisterReference AdviseSupervisory for buffered) were the two real Rust-port bugs uncovered along the way; both resolved. Remaining post-V1 followups (F50 Suspend/Activate Frida, F51 ASB type matrix, F52 perf, F53 doc lint, etc.) are scoped independently and not part of F49. Co-Authored-By: Claude Opus 4.7 (1M context) --- design/followups.md | 9 +- docs/M6-live-verification.md | 53 +++++- rust/Cargo.lock | 2 + rust/crates/mxaccess-compat/Cargo.toml | 5 + .../tests/buffered_recovery_replay_live.rs | 179 ++++++++++++++++++ .../tests/buffered_unsubscribe_skip_live.rs | 114 +++++++++++ 6 files changed, 358 insertions(+), 4 deletions(-) create mode 100644 rust/crates/mxaccess-compat/tests/buffered_recovery_replay_live.rs create mode 100644 rust/crates/mxaccess-compat/tests/buffered_unsubscribe_skip_live.rs diff --git a/design/followups.md b/design/followups.md index d0a5008..2b0fce0 100644 --- a/design/followups.md +++ b/design/followups.md @@ -26,7 +26,14 @@ Between each publish: wait for the crate to be indexed before the next one's `ca **Resolves when:** crates.io shows all 9 crates published + the V1 tag is pushed. ### F49 — Live verification sweep for the M6 features -**Status:** Steps 1, 4, 5 resolved 2026-05-06 (`docs/M6-live-verification.md`). 
F56 turned out to be a real Rust-port bug (missing `EnsurePublisherConnected` RPC pair) and was fixed; both `subscribe` and `subscribe_buffered` now drive `0x33` DataUpdate frames end-to-end against `TestMachine_001.TestChangingInt`. Steps 2 (F45 recovery replay live) and 3 (F47 buffered unsubscribe skip live) remain — they're now executable on this fixture but not yet run. +**Status:** **Resolved 2026-05-06.** All five steps pass live against the local AVEVA install (`docs/M6-live-verification.md`): +- Step 1 (F36 buffered subscribe) — `tests/buffered_subscribe_live.rs` +- Step 2 (F45 buffered recovery replay) — `tests/buffered_recovery_replay_live.rs` +- Step 3 (F47 buffered unsubscribe skip) — `tests/buffered_unsubscribe_skip_live.rs` +- Step 4 (F40 metrics smoke) — `tests/metrics_smoke_live.rs` +- Step 5 (F54 OnWriteComplete) — `tests/lmx_write_complete_live.rs` + +F55 (DCOM-managed `INmxSvcCallback`) and F56 (missing `EnsurePublisherConnected` + AdviseSupervisory after RegisterReference for buffered) were the two real Rust-port bugs uncovered along the way; both are resolved. **Severity:** P1 — closes the live-evidence gap for the M6 work that landed unit-only this session. **Source:** F36, F40, F45, F47, F54 closeouts — each ships with unit tests but most were not exercised against the live AVEVA install in this session. **Blocked-by:** F12 hardening (`Session::connect_nmx_auto` returns `RPC_S_SERVER_UNAVAILABLE` (1722) under `cargo test`'s tokio multi-thread runtime — see "Live attempt 2026-05-06" below). The COM-activation path itself works in isolation (`cargo run -p mxaccess-rpc --example com-marshal-probe --features windows-com` succeeds), so the failure is downstream — likely a COM apartment threading issue when CoInitializeEx runs on a tokio worker thread. 
diff --git a/docs/M6-live-verification.md b/docs/M6-live-verification.md index 5abfd5c..1526acd 100644 --- a/docs/M6-live-verification.md +++ b/docs/M6-live-verification.md @@ -9,8 +9,8 @@ The sweep is gated on `MX_LIVE=1` env (populate via `tools/Setup-LiveProbeEnv.ps | Step | Feature | Test | Outcome | |---|---|---|---| | 1 | F36 buffered subscribe | `cargo test -p mxaccess-compat --features live-windows-com --test buffered_subscribe_live -- --ignored --nocapture` | **Pass** (resolved by F56 / EnsurePublisherConnected). | -| 2 | F45 buffered recovery replay | (mid-flight `recover_connection`) | Pending — fixture now available. | -| 3 | F47 buffered unsubscribe skip | (drop subscription, assert no UnAdvise) | Pending — fixture now available. | +| 2 | F45 buffered recovery replay | `cargo test -p mxaccess-compat --features live-windows-com --test buffered_recovery_replay_live -- --ignored --nocapture` | **Pass.** | +| 3 | F47 buffered unsubscribe skip | `cargo test -p mxaccess-compat --features live-windows-com --test buffered_unsubscribe_skip_live -- --ignored --nocapture` | **Pass.** | | 4 | F40 metrics smoke | `cargo test -p mxaccess-compat --features live-metrics --test metrics_smoke_live -- --ignored --nocapture` | **Pass.** | | 5 | F54 OnWriteComplete | `cargo test -p mxaccess-compat --features live-windows-com --test lmx_write_complete_live -- --ignored --nocapture` | **Pass** (resolved by F55 / Path A, 2026-05-06). | @@ -42,6 +42,46 @@ test live::buffered_subscribe_yields_updates ... ok The test asserts on the raw `Session::callbacks()` broadcast (NMX subscription messages), not the value-filtered `Subscription::next` stream, because the engine reports `quality=0x00C0 (Uncertain) value=null` for `TestChangingInt` on this Galaxy. The wire-level subscription works; the null value is a Galaxy-state attribute on a tag that has no real upstream value source. 
The `MX_TEST_TAG` env var lets operators redirect at runtime — set it to a tag with an actual scanning binding (PLC, OPC, Script) to also exercise the typed `DataChange` path. +## Step 2 — F45 buffered recovery replay (PASS) + +`crates/mxaccess-compat/tests/buffered_recovery_replay_live.rs`: + +1. Subscribe buffered to `TestMachine_001.TestChangingInt`. +2. Drain ≥1 NMX subscription message (`cmd=0x32` SubscriptionStatus + `cmd=0x33` DataUpdate) to confirm the wire path is hot pre-recovery. +3. Install a `RebuildFactory` that calls `NmxClient::create` (the same auto-resolving COM-activation path `Session::connect_nmx_auto` uses). +4. Call `Session::recover_connection(RecoveryPolicy::default())`. +5. Drain ≥1 NMX subscription message post-recovery. + +```text +buffered subscribed, correlation_id = [...] +[pre-recovery 0] cmd=0x32 record_count=1 +[pre-recovery 1] cmd=0x33 record_count=1 +pre-recovery: drained 2 NMX subscription messages +triggering recover_connection +recover_connection returned Ok — F45 buffered replay path executed +[post-recovery 0] cmd=0x33 record_count=1 +[post-recovery 1] cmd=0x33 record_count=1 +post-recovery: drained 2 NMX subscription messages +``` + +The replay branch in `recover_connection_core` (`session.rs:1428-...`) re-issues `RegisterReference` (NOT `AdviseSupervisory`) for the buffered entry, mirroring `MxNativeSession.ReAdviseSubscription` (`cs:538-569`). Structural property is unit-tested; this live test confirms the engine actually picks back up after the rebuild + replay. + +## Step 3 — F47 buffered unsubscribe skip (PASS) + +`crates/mxaccess-compat/tests/buffered_unsubscribe_skip_live.rs`: + +1. Subscribe buffered to `TestMachine_001.TestChangingInt`. +2. Sleep 750ms so the engine has DataUpdate frames in flight. +3. Call `Session::unsubscribe(sub)`. +4. Assert it returned `Ok` without surfacing transport or HRESULT errors. + +```text +buffered subscribed, correlation_id = [...] 
+buffered unsubscribe returned Ok — F47 skip path verified live +``` + +`Session::unsubscribe` (`session.rs:2261`) probes the registry for the subscription's mode; if `Buffered { .. }`, it skips the `nmx.un_advise(...)` wire call entirely. Mirrors the .NET reference's `if (!subscription.IsBuffered)` guard at `MxNativeSession.cs:361-381`. If the implementation accidentally emitted `UnAdvise` for a buffered correlation id, the engine would return non-zero HRESULT (no matching plain advise to retract) — surfacing as a panic in this test. + ## Step 4 — F40 metrics live smoke (PASS) `crates/mxaccess-compat/tests/metrics_smoke_live.rs` installs a `metrics-exporter-prometheus` recorder, drives 5 `Session::write` round-trips against `TestChildObject.TestInt`, then `shutdown_nmx`, then renders the Prometheus snapshot. Asserts the M6-registered metric names appear with non-zero values. Sample snapshot: @@ -105,9 +145,16 @@ cargo test -p mxaccess-compat --features live-metrics ` $env:MX_TEST_TAG = "TestMachine_001.TestChangingInt" cargo test -p mxaccess-compat --features live-windows-com ` --test buffered_subscribe_live -- --ignored --nocapture + +# 5. Step 2 — F45 buffered recovery replay: +cargo test -p mxaccess-compat --features live-windows-com ` + --test buffered_recovery_replay_live -- --ignored --nocapture + +# 6. Step 3 — F47 buffered unsubscribe skip: +cargo test -p mxaccess-compat --features live-windows-com ` + --test buffered_unsubscribe_skip_live -- --ignored --nocapture ``` ## Open work -- **F49 steps 2 + 3** — recovery replay and unsubscribe-skip live verification. Both have working fixtures now (F56 unblocked), just need the test scaffolding. - **F50** — residual Frida capture for Suspend/Activate (independent of F49). 
diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 18e169d..f4864ff 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -705,7 +705,9 @@ dependencies = [ "metrics", "metrics-exporter-prometheus", "mxaccess", + "mxaccess-codec", "mxaccess-galaxy", + "mxaccess-nmx", "mxaccess-rpc", "thiserror 2.0.18", "tokio", diff --git a/rust/crates/mxaccess-compat/Cargo.toml b/rust/crates/mxaccess-compat/Cargo.toml index 6aaa094..5c15b33 100644 --- a/rust/crates/mxaccess-compat/Cargo.toml +++ b/rust/crates/mxaccess-compat/Cargo.toml @@ -31,6 +31,11 @@ mxaccess-rpc = { path = "../mxaccess-rpc", version = "0.0.0" } # silently accepts for writes but doesn't dispatch DataUpdate frames # against. The buffered live test resolves real IDs via SqlTagResolver. mxaccess-galaxy = { path = "../mxaccess-galaxy", version = "0.0.0", features = ["galaxy-resolver"] } +# F49 step 2 — recovery replay test needs the +# `mxaccess::RebuildFactory` typedef's NmxClient + the +# NmxSubscriptionMessage type for the broadcast receiver signature. +mxaccess-nmx = { path = "../mxaccess-nmx", version = "0.0.0", features = ["windows-com"] } +mxaccess-codec = { path = "../mxaccess-codec", version = "0.0.0" } # Live tests use tracing-subscriber to dump router/dcom_sink trace # events on demand (set RUST_LOG=mxaccess=trace,mxaccess_callback=trace). tracing = { workspace = true } diff --git a/rust/crates/mxaccess-compat/tests/buffered_recovery_replay_live.rs b/rust/crates/mxaccess-compat/tests/buffered_recovery_replay_live.rs new file mode 100644 index 0000000..dd13f2e --- /dev/null +++ b/rust/crates/mxaccess-compat/tests/buffered_recovery_replay_live.rs @@ -0,0 +1,179 @@ +//! F49 step 2 — F45 buffered-recovery-replay live verification. +//! +//! Subscribe buffered, force `Session::recover_connection` mid-flight, +//! assert the replay branch issued `RegisterReference` (NOT +//! `AdviseSupervisory`) by observing that the subscription continues +//! 
to receive `0x33` DataUpdate frames after the recovery completes. +//! +//! Mirrors the .NET reference's `MxNativeSession.ReAdviseSubscription` +//! (`MxNativeSession.cs:538-569`) which branches on +//! `subscription.IsBuffered` to pick the right replay op. + +#![allow( + clippy::unwrap_used, + clippy::expect_used, + clippy::indexing_slicing, + clippy::panic +)] + +#[cfg(all(windows, feature = "live-windows-com"))] +mod live { + use std::sync::Arc; + use std::time::{Duration, Instant}; + + use mxaccess::{BufferedOptions, RecoveryPolicy, Session, SessionOptions}; + use mxaccess_galaxy::SqlTagResolver; + use mxaccess_nmx::NmxClient; + use mxaccess_rpc::ntlm::NtlmClientContext; + + fn ntlm_from_test_env() -> NtlmClientContext { + let user = std::env::var("MX_TEST_USER").expect("MX_TEST_USER"); + let password = std::env::var("MX_TEST_PASSWORD").expect("MX_TEST_PASSWORD"); + let domain = std::env::var("MX_TEST_DOMAIN").unwrap_or_default(); + let hostname = std::env::var("COMPUTERNAME").unwrap_or_default(); + NtlmClientContext::new(&user, &password, &domain, Some(&hostname)) + } + + /// Drain the broadcast until at least `target` raw NMX subscription + /// messages arrive or the deadline passes. Returns the count. 
+ async fn drain_until( + rx: &mut tokio::sync::broadcast::Receiver< + Arc<mxaccess_codec::NmxSubscriptionMessage>, + >, + target: usize, + deadline: Instant, + label: &str, + ) -> usize { + let mut received = 0; + while received < target && Instant::now() < deadline { + match tokio::time::timeout(Duration::from_secs(5), rx.recv()).await { + Ok(Ok(msg)) => { + eprintln!( + "[{label} {received}] cmd=0x{:02x} record_count={}", + msg.command, msg.record_count + ); + received += 1; + } + Ok(Err(_)) => break, + Err(_) => eprintln!("5s gap on {label} broadcast"), + } + } + received + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + #[ignore] + async fn buffered_recovery_replays_register_reference() { + if std::env::var_os("MX_LIVE").is_none() { + eprintln!("MX_LIVE not set — skipping live test"); + return; + } + let tag = std::env::var("MX_TEST_TAG") + .unwrap_or_else(|_| "TestMachine_001.TestChangingInt".to_string()); + + let _ = tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), + ) + .with_test_writer() + .try_init(); + + let galaxy_db = std::env::var("MX_GALAXY_DB").expect("MX_GALAXY_DB"); + let resolver = Arc::new( + SqlTagResolver::from_ado_string(&galaxy_db).expect("SqlTagResolver"), + ); + + // Permissive recovery policy — let the test drive a single + // attempt synchronously. + let recovery = RecoveryPolicy::default(); + + let session = Session::connect_nmx_auto( + ntlm_from_test_env, + SessionOptions::default(), + resolver, + recovery, + ) + .await + .expect("connect_nmx_auto"); + eprintln!("session connected"); + + // Install a recovery factory that rebuilds NmxClient via the + // same auto-resolving COM-activation path connect_nmx_auto + // uses. 
+ let factory: mxaccess::RebuildFactory = Arc::new(|| { + Box::pin(async { + NmxClient::create(ntlm_from_test_env).await + }) + }); + session.set_recovery_factory(factory).await; + + // Subscribe buffered + drain a few pre-recovery frames to + // confirm the wire path is hot. + let mut callbacks_rx = session.callbacks(); + let opts = BufferedOptions { + update_interval_ms: 1_000, + }; + let sub = session + .subscribe_buffered(&tag, opts) + .await + .expect("subscribe_buffered"); + eprintln!( + "buffered subscribed, correlation_id = {:02x?}", + sub.correlation_id() + ); + + let pre = drain_until( + &mut callbacks_rx, + 2, + Instant::now() + Duration::from_secs(15), + "pre-recovery", + ) + .await; + assert!(pre >= 1, "pre-recovery: subscription wire path is dead"); + eprintln!("pre-recovery: drained {pre} NMX subscription messages"); + + // Force a transport rebuild + advise replay. The recovery + // should re-issue `RegisterReference` (NOT + // `AdviseSupervisory`) for the buffered entry — verified + // structurally by `recover_connection_replays_register_reference_for_buffered` + // in the unit-test suite. Live-side, we assert that the post- + // recovery wire path keeps producing NMX subscription messages. + eprintln!("triggering recover_connection"); + session + .recover_connection(RecoveryPolicy::default()) + .await + .expect("recover_connection"); + eprintln!("recover_connection returned Ok — F45 buffered replay path executed"); + + // Drain post-recovery frames. The NmxClient was rebuilt under + // the hood; the broadcast channel is the same, but the + // re-issued `RegisterReference` should kick off a fresh + // SubscriptionStatus + DataUpdate sequence. 
+ let post = drain_until( + &mut callbacks_rx, + 2, + Instant::now() + Duration::from_secs(15), + "post-recovery", + ) + .await; + assert!( + post >= 1, + "post-recovery: no NMX messages after recover_connection — buffered replay didn't \ + re-establish the subscription" + ); + eprintln!("post-recovery: drained {post} NMX subscription messages"); + + session.unsubscribe(sub).await.expect("unsubscribe"); + session.shutdown_nmx().await.expect("shutdown"); + } +} + +#[cfg(not(all(windows, feature = "live-windows-com")))] +mod live { + #[test] + #[ignore] + fn buffered_recovery_replays_register_reference() { + eprintln!("test skipped: requires Windows + live-windows-com feature"); + } +} diff --git a/rust/crates/mxaccess-compat/tests/buffered_unsubscribe_skip_live.rs b/rust/crates/mxaccess-compat/tests/buffered_unsubscribe_skip_live.rs new file mode 100644 index 0000000..8500fb5 --- /dev/null +++ b/rust/crates/mxaccess-compat/tests/buffered_unsubscribe_skip_live.rs @@ -0,0 +1,114 @@ +//! F49 step 3 — F47 buffered-unsubscribe skip live verification. +//! +//! `Session::unsubscribe` on a buffered subscription must NOT emit a +//! wire-side `UnAdvise` op (mirrors the .NET reference's +//! `if (!subscription.IsBuffered)` guard at `MxNativeSession.cs:361-381`). +//! Buffered subscriptions are unwound by the engine when the +//! `RegisterReference` handle goes away — there's no item-level advise +//! to retract. +//! +//! Structural verification is exhaustive at the unit level (see +//! `unsubscribe_skips_un_advise_for_buffered_subscription` in +//! `crates/mxaccess/src/session.rs`). This live test confirms the +//! behaviour against a real engine: subscribe buffered, hold for 750 ms, +//! unsubscribe, verify both calls succeed without surfacing transport +//! or HRESULT errors. If `unsubscribe` accidentally issued an +//! `UnAdvise` for a buffered correlation id, the engine would either +//! reject it (HRESULT != 0) or silently break the unrelated state — +//! 
both surface as a panic here. + +#![allow( + clippy::unwrap_used, + clippy::expect_used, + clippy::indexing_slicing, + clippy::panic +)] + +#[cfg(all(windows, feature = "live-windows-com"))] +mod live { + use std::sync::Arc; + + use mxaccess::{BufferedOptions, RecoveryPolicy, Session, SessionOptions}; + use mxaccess_galaxy::SqlTagResolver; + use mxaccess_rpc::ntlm::NtlmClientContext; + + fn ntlm_from_test_env() -> NtlmClientContext { + let user = std::env::var("MX_TEST_USER").expect("MX_TEST_USER"); + let password = std::env::var("MX_TEST_PASSWORD").expect("MX_TEST_PASSWORD"); + let domain = std::env::var("MX_TEST_DOMAIN").unwrap_or_default(); + let hostname = std::env::var("COMPUTERNAME").unwrap_or_default(); + NtlmClientContext::new(&user, &password, &domain, Some(&hostname)) + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + #[ignore] + async fn buffered_unsubscribe_skips_unadvise() { + if std::env::var_os("MX_LIVE").is_none() { + eprintln!("MX_LIVE not set — skipping live test"); + return; + } + let tag = std::env::var("MX_TEST_TAG") + .unwrap_or_else(|_| "TestMachine_001.TestChangingInt".to_string()); + + let _ = tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), + ) + .with_test_writer() + .try_init(); + + let galaxy_db = std::env::var("MX_GALAXY_DB").expect("MX_GALAXY_DB"); + let resolver = Arc::new( + SqlTagResolver::from_ado_string(&galaxy_db).expect("SqlTagResolver"), + ); + + let session = Session::connect_nmx_auto( + ntlm_from_test_env, + SessionOptions::default(), + resolver, + RecoveryPolicy::default(), + ) + .await + .expect("connect_nmx_auto"); + eprintln!("session connected"); + + let opts = BufferedOptions { + update_interval_ms: 1_000, + }; + let sub = session + .subscribe_buffered(&tag, opts) + .await + .expect("subscribe_buffered"); + eprintln!( + "buffered subscribed, correlation_id = {:02x?}", + 
sub.correlation_id() + ); + + // Sub-second hold so the engine has at least one DataUpdate + // tick in flight when we unsubscribe. + tokio::time::sleep(std::time::Duration::from_millis(750)).await; + + // The contract: unsubscribe on a buffered subscription + // returns Ok and does NOT issue UnAdvise on the wire. + // If it incorrectly emitted UnAdvise for a buffered + // correlation id, the engine would return non-zero HRESULT + // (no matching plain advise to retract) and surface here. + session + .unsubscribe(sub) + .await + .expect("unsubscribe (buffered) must succeed without emitting UnAdvise"); + eprintln!("buffered unsubscribe returned Ok — F47 skip path verified live"); + + session.shutdown_nmx().await.expect("shutdown"); + } +} + +#[cfg(not(all(windows, feature = "live-windows-com")))] +mod live { + #[test] + #[ignore] + fn buffered_unsubscribe_skips_unadvise() { + eprintln!("test skipped: requires Windows + live-windows-com feature"); + } +}
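
Reviewer note: the two properties these live tests rely on (buffered replay re-issues `RegisterReference`, and buffered unsubscribe emits no `UnAdvise`) can be pictured with a small std-only model. This is a minimal sketch, not the code in `session.rs`: every type and function name below (`SubscriptionMode`, `WireOp`, `replay_op`, `unsubscribe_ops`) is hypothetical, and the behaviour of the plain (non-buffered) branch is an assumption inferred from the commit message's "NOT AdviseSupervisory" wording.

```rust
// Hypothetical model only: none of these names come from the mxaccess crates.

/// How a registry entry was subscribed (illustrative).
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum SubscriptionMode {
    Plain,
    Buffered { update_interval_ms: u32 },
}

/// Wire operations named in the commit message.
#[derive(Debug, Clone, PartialEq)]
enum WireOp {
    AdviseSupervisory,
    RegisterReference,
    UnAdvise,
}

/// Op re-issued for one registry entry during recovery replay.
fn replay_op(mode: &SubscriptionMode) -> WireOp {
    match mode {
        // Stated in the patch: buffered entries replay via RegisterReference.
        SubscriptionMode::Buffered { .. } => WireOp::RegisterReference,
        // Assumption: plain entries replay via AdviseSupervisory.
        SubscriptionMode::Plain => WireOp::AdviseSupervisory,
    }
}

/// Ops sent on the wire when a subscription is dropped.
fn unsubscribe_ops(mode: &SubscriptionMode) -> Vec<WireOp> {
    match mode {
        // Stated in the patch: buffered unsubscribe skips the wire call.
        SubscriptionMode::Buffered { .. } => Vec::new(),
        // Assumption: plain subscriptions retract their advise.
        SubscriptionMode::Plain => vec![WireOp::UnAdvise],
    }
}
```

Under this model, calling `unsubscribe_ops` on a `Buffered` entry yields an empty list, which is the structural property the unit tests are described as covering; the live tests then check that the real engine accepts that behaviour.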