[F49 steps 2 + 3] live verification: buffered recovery replay + unsubscribe skip

Step 3 (F47 buffered unsubscribe skip):
- crates/mxaccess-compat/tests/buffered_unsubscribe_skip_live.rs.
- Subscribe buffered, sleep so the engine has DataUpdates in flight,
  then call unsubscribe. Asserts Ok return without surfacing transport
  or HRESULT errors.
- Session::unsubscribe (session.rs:2261) probes the registry: if
  Buffered { .. }, it skips nmx.un_advise entirely, mirroring the .NET
  reference's `if (!subscription.IsBuffered)` guard at
  MxNativeSession.cs:361-381. If unsubscribe accidentally emitted
  UnAdvise for a buffered correlation id, the engine would return
  non-zero HRESULT (no matching plain advise to retract) — surfacing
  as a panic.

Step 2 (F45 buffered recovery replay):
- crates/mxaccess-compat/tests/buffered_recovery_replay_live.rs.
- Subscribe buffered, drain >=1 NMX subscription message
  (cmd=0x32 SubscriptionStatus + cmd=0x33 DataUpdate) to confirm the
  wire path is hot pre-recovery, install a RebuildFactory that calls
  NmxClient::create (the same auto-resolving COM-activation path
  Session::connect_nmx_auto uses), invoke recover_connection, drain
  >=1 NMX subscription message post-recovery.
- Verifies the replay branch in recover_connection_core re-issues
  RegisterReference (NOT AdviseSupervisory) for the buffered entry,
  mirroring MxNativeSession.ReAdviseSubscription (cs:538-569).
  Structural property is unit-tested; this confirms the engine
  actually picks back up after the rebuild + replay.

Both tests pass live on this Galaxy:
  cargo test -p mxaccess-compat --features live-windows-com \
      --test buffered_unsubscribe_skip_live -- --ignored --nocapture
  cargo test -p mxaccess-compat --features live-windows-com \
      --test buffered_recovery_replay_live -- --ignored --nocapture

Pulls mxaccess-nmx + mxaccess-codec into mxaccess-compat dev-deps so
the recovery test can build a RebuildFactory closure that returns
NmxClient and bind a typed broadcast Receiver.

design/followups.md F49 -> Resolved (all five steps pass live).
docs/M6-live-verification.md updated with per-step evidence + repro
commands.

F49 is fully closed out. F55 (DCOM-managed INmxSvcCallback, Path A)
and F56 (missing EnsurePublisherConnected + post-RegisterReference
AdviseSupervisory for buffered) were the two real Rust-port bugs
uncovered along the way; both resolved. Remaining post-V1 followups
(F50 Suspend/Activate Frida, F51 ASB type matrix, F52 perf, F53 doc
lint, etc.) are scoped independently and not part of F49.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-06 12:00:44 -04:00
parent 5e11b30507
commit d149143535
6 changed files with 358 additions and 4 deletions
+2
View File
@@ -705,7 +705,9 @@ dependencies = [
"metrics",
"metrics-exporter-prometheus",
"mxaccess",
"mxaccess-codec",
"mxaccess-galaxy",
"mxaccess-nmx",
"mxaccess-rpc",
"thiserror 2.0.18",
"tokio",
+5
View File
@@ -31,6 +31,11 @@ mxaccess-rpc = { path = "../mxaccess-rpc", version = "0.0.0" }
# silently accepts for writes but doesn't dispatch DataUpdate frames
# against. The buffered live test resolves real IDs via SqlTagResolver.
mxaccess-galaxy = { path = "../mxaccess-galaxy", version = "0.0.0", features = ["galaxy-resolver"] }
# F49 step 2 — recovery replay test needs the
# `mxaccess::RebuildFactory` typedef's NmxClient + the
# NmxSubscriptionMessage type for the broadcast receiver signature.
mxaccess-nmx = { path = "../mxaccess-nmx", version = "0.0.0", features = ["windows-com"] }
mxaccess-codec = { path = "../mxaccess-codec", version = "0.0.0" }
# Live tests use tracing-subscriber to dump router/dcom_sink trace
# events on demand (set RUST_LOG=mxaccess=trace,mxaccess_callback=trace).
tracing = { workspace = true }
@@ -0,0 +1,179 @@
//! F49 step 2 — F45 buffered-recovery-replay live verification.
//!
//! Subscribe buffered, force `Session::recover_connection` mid-flight,
//! assert the replay branch issued `RegisterReference` (NOT
//! `AdviseSupervisory`) by observing that the subscription continues
//! to receive `0x33` DataUpdate frames after the recovery completes.
//!
//! Mirrors the .NET reference's `MxNativeSession.ReAdviseSubscription`
//! (`MxNativeSession.cs:538-569`) which branches on
//! `subscription.IsBuffered` to pick the right replay op.
#![allow(
clippy::unwrap_used,
clippy::expect_used,
clippy::indexing_slicing,
clippy::panic
)]
#[cfg(all(windows, feature = "live-windows-com"))]
mod live {
use std::sync::Arc;
use std::time::{Duration, Instant};
use mxaccess::{BufferedOptions, RecoveryPolicy, Session, SessionOptions};
use mxaccess_galaxy::SqlTagResolver;
use mxaccess_nmx::NmxClient;
use mxaccess_rpc::ntlm::NtlmClientContext;
fn ntlm_from_test_env() -> NtlmClientContext {
let user = std::env::var("MX_TEST_USER").expect("MX_TEST_USER");
let password = std::env::var("MX_TEST_PASSWORD").expect("MX_TEST_PASSWORD");
let domain = std::env::var("MX_TEST_DOMAIN").unwrap_or_default();
let hostname = std::env::var("COMPUTERNAME").unwrap_or_default();
NtlmClientContext::new(&user, &password, &domain, Some(&hostname))
}
/// Drain the broadcast until at least `target` raw NMX subscription
/// messages arrive or the deadline passes. Returns the count.
async fn drain_until(
rx: &mut tokio::sync::broadcast::Receiver<
Arc<mxaccess_codec::NmxSubscriptionMessage>,
>,
target: usize,
deadline: Instant,
label: &str,
) -> usize {
let mut received = 0;
while received < target && Instant::now() < deadline {
match tokio::time::timeout(Duration::from_secs(5), rx.recv()).await {
Ok(Ok(msg)) => {
eprintln!(
"[{label} {received}] cmd=0x{:02x} record_count={}",
msg.command, msg.record_count
);
received += 1;
}
Ok(Err(_)) => break,
Err(_) => eprintln!("5s gap on {label} broadcast"),
}
}
received
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore]
async fn buffered_recovery_replays_register_reference() {
if std::env::var_os("MX_LIVE").is_none() {
eprintln!("MX_LIVE not set — skipping live test");
return;
}
let tag = std::env::var("MX_TEST_TAG")
.unwrap_or_else(|_| "TestMachine_001.TestChangingInt".to_string());
let _ = tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.with_test_writer()
.try_init();
let galaxy_db = std::env::var("MX_GALAXY_DB").expect("MX_GALAXY_DB");
let resolver = Arc::new(
SqlTagResolver::from_ado_string(&galaxy_db).expect("SqlTagResolver"),
);
// Permissive recovery policy — let the test drive a single
// attempt synchronously.
let recovery = RecoveryPolicy::default();
let session = Session::connect_nmx_auto(
ntlm_from_test_env,
SessionOptions::default(),
resolver,
recovery,
)
.await
.expect("connect_nmx_auto");
eprintln!("session connected");
// Install a recovery factory that rebuilds NmxClient via the
// same auto-resolving COM-activation path connect_nmx_auto
// uses.
let factory: mxaccess::RebuildFactory = Arc::new(|| {
Box::pin(async {
NmxClient::create(ntlm_from_test_env).await
})
});
session.set_recovery_factory(factory).await;
// Subscribe buffered + drain a few pre-recovery frames to
// confirm the wire path is hot.
let mut callbacks_rx = session.callbacks();
let opts = BufferedOptions {
update_interval_ms: 1_000,
};
let sub = session
.subscribe_buffered(&tag, opts)
.await
.expect("subscribe_buffered");
eprintln!(
"buffered subscribed, correlation_id = {:02x?}",
sub.correlation_id()
);
let pre = drain_until(
&mut callbacks_rx,
2,
Instant::now() + Duration::from_secs(15),
"pre-recovery",
)
.await;
assert!(pre >= 1, "pre-recovery: subscription wire path is dead");
eprintln!("pre-recovery: drained {pre} NMX subscription messages");
// Force a transport rebuild + advise replay. The recovery
// should re-issue `RegisterReference` (NOT
// `AdviseSupervisory`) for the buffered entry — verified
// structurally by `recover_connection_replays_register_reference_for_buffered`
// in the unit-test suite. Live-side, we assert that the post-
// recovery wire path keeps producing NMX subscription messages.
eprintln!("triggering recover_connection");
session
.recover_connection(RecoveryPolicy::default())
.await
.expect("recover_connection");
eprintln!("recover_connection returned Ok — F45 buffered replay path executed");
// Drain post-recovery frames. The NmxClient was rebuilt under
// the hood; the broadcast channel is the same, but the
// re-issued `RegisterReference` should kick off a fresh
// SubscriptionStatus + DataUpdate sequence.
let post = drain_until(
&mut callbacks_rx,
2,
Instant::now() + Duration::from_secs(15),
"post-recovery",
)
.await;
assert!(
post >= 1,
"post-recovery: no NMX messages after recover_connection — buffered replay didn't \
re-establish the subscription"
);
eprintln!("post-recovery: drained {post} NMX subscription messages");
session.unsubscribe(sub).await.expect("unsubscribe");
session.shutdown_nmx().await.expect("shutdown");
}
}
#[cfg(not(all(windows, feature = "live-windows-com")))]
mod live {
#[test]
#[ignore]
fn buffered_recovery_replays_register_reference() {
eprintln!("test skipped: requires Windows + live-windows-com feature");
}
}
@@ -0,0 +1,114 @@
//! F49 step 3 — F47 buffered-unsubscribe skip live verification.
//!
//! `Session::unsubscribe` on a buffered subscription must NOT emit a
//! wire-side `UnAdvise` op (mirrors the .NET reference's
//! `if (!subscription.IsBuffered)` guard at `MxNativeSession.cs:361-381`).
//! Buffered subscriptions are unwound by the engine when the
//! `RegisterReference` handle goes away — there's no item-level advise
//! to retract.
//!
//! Structural verification is exhaustive at the unit level (see
//! `unsubscribe_skips_un_advise_for_buffered_subscription` in
//! `crates/mxaccess/src/session.rs`). This live test confirms the
//! behaviour against a real engine: subscribe buffered, immediately
//! unsubscribe, verify both calls succeed without surfacing transport
//! or HRESULT errors. If `unsubscribe` accidentally issued an
//! `UnAdvise` for a buffered correlation id, the engine would either
//! reject it (HRESULT != 0) or silently break the unrelated state —
//! both surface as a panic here.
#![allow(
clippy::unwrap_used,
clippy::expect_used,
clippy::indexing_slicing,
clippy::panic
)]
#[cfg(all(windows, feature = "live-windows-com"))]
mod live {
use std::sync::Arc;
use mxaccess::{BufferedOptions, RecoveryPolicy, Session, SessionOptions};
use mxaccess_galaxy::SqlTagResolver;
use mxaccess_rpc::ntlm::NtlmClientContext;
fn ntlm_from_test_env() -> NtlmClientContext {
let user = std::env::var("MX_TEST_USER").expect("MX_TEST_USER");
let password = std::env::var("MX_TEST_PASSWORD").expect("MX_TEST_PASSWORD");
let domain = std::env::var("MX_TEST_DOMAIN").unwrap_or_default();
let hostname = std::env::var("COMPUTERNAME").unwrap_or_default();
NtlmClientContext::new(&user, &password, &domain, Some(&hostname))
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore]
async fn buffered_unsubscribe_skips_unadvise() {
if std::env::var_os("MX_LIVE").is_none() {
eprintln!("MX_LIVE not set — skipping live test");
return;
}
let tag = std::env::var("MX_TEST_TAG")
.unwrap_or_else(|_| "TestMachine_001.TestChangingInt".to_string());
let _ = tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.with_test_writer()
.try_init();
let galaxy_db = std::env::var("MX_GALAXY_DB").expect("MX_GALAXY_DB");
let resolver = Arc::new(
SqlTagResolver::from_ado_string(&galaxy_db).expect("SqlTagResolver"),
);
let session = Session::connect_nmx_auto(
ntlm_from_test_env,
SessionOptions::default(),
resolver,
RecoveryPolicy::default(),
)
.await
.expect("connect_nmx_auto");
eprintln!("session connected");
let opts = BufferedOptions {
update_interval_ms: 1_000,
};
let sub = session
.subscribe_buffered(&tag, opts)
.await
.expect("subscribe_buffered");
eprintln!(
"buffered subscribed, correlation_id = {:02x?}",
sub.correlation_id()
);
// Sub-second hold so the engine has at least one DataUpdate
// tick in flight when we unsubscribe.
tokio::time::sleep(std::time::Duration::from_millis(750)).await;
// The contract: unsubscribe on a buffered subscription
// returns Ok and does NOT issue UnAdvise on the wire.
// If it incorrectly emitted UnAdvise for a buffered
// correlation id, the engine would return non-zero HRESULT
// (no matching plain advise to retract) and surface here.
session
.unsubscribe(sub)
.await
.expect("unsubscribe (buffered) must succeed without emitting UnAdvise");
eprintln!("buffered unsubscribe returned Ok — F47 skip path verified live");
session.shutdown_nmx().await.expect("shutdown");
}
}
#[cfg(not(all(windows, feature = "live-windows-com")))]
mod live {
#[test]
#[ignore]
fn buffered_unsubscribe_skips_unadvise() {
eprintln!("test skipped: requires Windows + live-windows-com feature");
}
}