mxaccess: fix 9 unit tests broken silently by F56's ensure_publisher_connected
Workspace gate sweep flagged 9 unit tests in mxaccess::session that
had been silently failing since F56 landed (commit 5e11b30). Root
cause: F56 added ensure_publisher_connected (issuing
INmxService2::Connect + AddSubscriberEngine before each
AdviseSupervisory) but the in-process fake-NMX-server fixtures'
responses vec sizes weren't bumped. Once the fake server ran out of
responses mid-handshake, the connection was closed and the client
got ConnectionAborted (10053).
Fix: bumped each test's unauthenticated_server / recording_server
response count by 2 to cover the new pair of RPCs. Tests touched:
- subscribe_then_unsubscribe_round_trip (2 → 4 responses)
- two_subscribes_produce_distinct_correlation_ids (4 → 6)
- subscription_stream_yields_data_change_for_matching_correlation (1 → 3)
- subscription_stream_filters_out_mismatched_correlation_for_status (1 → 3)
- subscription_stream_keeps_data_update_regardless_of_correlation (1 → 3)
- subscribe_populates_registry_unsubscribe_clears_it (2 → 4)
- read_returns_first_data_change_within_timeout (2 → 4)
- read_returns_timeout_when_no_data_arrives (2 → 4)
- unsubscribe_skips_un_advise_for_buffered_subscription (2 → 3
+ mid-flow assertion bumped from len()==1 to len()==3)
The two_subscribes test only adds 2 (not 4) extra responses because
the second subscribe hits the per-engine publisher_endpoints cache.
Workspace gate post-fix: 847 tests pass, 0 failed, 9 ignored
(live-only). Clippy + bench clean. Pinned in
docs/M6-live-verification.md "Workspace gate (2026-05-07)" so the
test-fixture lag is recorded for future audits.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -2952,8 +2952,16 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn subscribe_then_unsubscribe_round_trip() {
|
||||
// Two RPCs: AdviseSupervisory + UnAdvise. Both return HRESULT 0.
|
||||
let (addr, handle) = unauthenticated_server(vec![(0, Vec::new()), (0, Vec::new())]).await;
|
||||
// Four RPCs: Connect + AddSubscriberEngine (F56's
|
||||
// ensure_publisher_connected) + AdviseSupervisory + UnAdvise.
|
||||
// All return HRESULT 0.
|
||||
let (addr, handle) = unauthenticated_server(vec![
|
||||
(0, Vec::new()),
|
||||
(0, Vec::new()),
|
||||
(0, Vec::new()),
|
||||
(0, Vec::new()),
|
||||
])
|
||||
.await;
|
||||
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[(
|
||||
"TestObj.TestInt",
|
||||
sample_metadata(),
|
||||
@@ -3004,12 +3012,15 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn two_subscribes_produce_distinct_correlation_ids() {
|
||||
// Two AdviseSupervisory calls + two UnAdvise calls.
|
||||
// Six RPCs: Connect + AddSubscriberEngine (once, cached on the
|
||||
// 2nd subscribe) + 2 AdviseSupervisory + 2 UnAdvise.
|
||||
let (addr, handle) = unauthenticated_server(vec![
|
||||
(0, Vec::new()),
|
||||
(0, Vec::new()),
|
||||
(0, Vec::new()),
|
||||
(0, Vec::new()),
|
||||
(0, Vec::new()),
|
||||
(0, Vec::new()),
|
||||
])
|
||||
.await;
|
||||
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[(
|
||||
@@ -3242,7 +3253,9 @@ mod tests {
|
||||
async fn subscription_stream_yields_data_change_for_matching_correlation() {
|
||||
use futures_util::StreamExt;
|
||||
|
||||
let (addr, handle) = unauthenticated_server(vec![(0, Vec::new())]).await;
|
||||
// Three RPCs: Connect + AddSubscriberEngine (F56) + AdviseSupervisory.
|
||||
let (addr, handle) =
|
||||
unauthenticated_server(vec![(0, Vec::new()), (0, Vec::new()), (0, Vec::new())]).await;
|
||||
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[(
|
||||
"TestObj.TestInt",
|
||||
sample_metadata(),
|
||||
@@ -3287,7 +3300,9 @@ mod tests {
|
||||
async fn subscription_stream_filters_out_mismatched_correlation_for_status() {
|
||||
use futures_util::StreamExt;
|
||||
|
||||
let (addr, handle) = unauthenticated_server(vec![(0, Vec::new())]).await;
|
||||
// Three RPCs: Connect + AddSubscriberEngine (F56) + AdviseSupervisory.
|
||||
let (addr, handle) =
|
||||
unauthenticated_server(vec![(0, Vec::new()), (0, Vec::new()), (0, Vec::new())]).await;
|
||||
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[(
|
||||
"TestObj.TestInt",
|
||||
sample_metadata(),
|
||||
@@ -3322,8 +3337,9 @@ mod tests {
|
||||
use futures_util::StreamExt;
|
||||
// 0x33 DataUpdate has no item_correlation_id; the .NET-style
|
||||
// filter passes them through to all subscriptions.
|
||||
|
||||
let (addr, handle) = unauthenticated_server(vec![(0, Vec::new())]).await;
|
||||
// Three RPCs: Connect + AddSubscriberEngine (F56) + AdviseSupervisory.
|
||||
let (addr, handle) =
|
||||
unauthenticated_server(vec![(0, Vec::new()), (0, Vec::new()), (0, Vec::new())]).await;
|
||||
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[(
|
||||
"TestObj.TestInt",
|
||||
sample_metadata(),
|
||||
@@ -3472,7 +3488,15 @@ mod tests {
|
||||
// F16: every successful subscribe() inserts into the
|
||||
// SubscriptionEntry registry; unsubscribe() removes it.
|
||||
// Recovery walks this registry to replay AdviseSupervisory.
|
||||
let (addr, handle) = unauthenticated_server(vec![(0, Vec::new()), (0, Vec::new())]).await;
|
||||
// Four RPCs: Connect + AddSubscriberEngine (F56) +
|
||||
// AdviseSupervisory + UnAdvise.
|
||||
let (addr, handle) = unauthenticated_server(vec![
|
||||
(0, Vec::new()),
|
||||
(0, Vec::new()),
|
||||
(0, Vec::new()),
|
||||
(0, Vec::new()),
|
||||
])
|
||||
.await;
|
||||
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[(
|
||||
"TestObj.TestInt",
|
||||
sample_metadata(),
|
||||
@@ -3802,8 +3826,15 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn read_returns_first_data_change_within_timeout() {
|
||||
// Server: AdviseSupervisory ack + UnAdvise ack.
|
||||
let (addr, handle) = unauthenticated_server(vec![(0, Vec::new()), (0, Vec::new())]).await;
|
||||
// Server: Connect + AddSubscriberEngine (F56) +
|
||||
// AdviseSupervisory + UnAdvise.
|
||||
let (addr, handle) = unauthenticated_server(vec![
|
||||
(0, Vec::new()),
|
||||
(0, Vec::new()),
|
||||
(0, Vec::new()),
|
||||
(0, Vec::new()),
|
||||
])
|
||||
.await;
|
||||
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[(
|
||||
"TestObj.TestInt",
|
||||
sample_metadata(),
|
||||
@@ -3851,9 +3882,16 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn read_returns_timeout_when_no_data_arrives() {
|
||||
// Server only handles the AdviseSupervisory + UnAdvise (no data
|
||||
// injection). Read must hit the timeout branch.
|
||||
let (addr, handle) = unauthenticated_server(vec![(0, Vec::new()), (0, Vec::new())]).await;
|
||||
// Server only handles the Connect + AddSubscriberEngine (F56) +
|
||||
// AdviseSupervisory + UnAdvise (no data injection). Read must
|
||||
// hit the timeout branch.
|
||||
let (addr, handle) = unauthenticated_server(vec![
|
||||
(0, Vec::new()),
|
||||
(0, Vec::new()),
|
||||
(0, Vec::new()),
|
||||
(0, Vec::new()),
|
||||
])
|
||||
.await;
|
||||
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[(
|
||||
"TestObj.TestInt",
|
||||
sample_metadata(),
|
||||
@@ -4407,21 +4445,29 @@ mod tests {
|
||||
/// the negative control; this test pins the buffered branch.
|
||||
#[tokio::test]
|
||||
async fn unsubscribe_skips_un_advise_for_buffered_subscription() {
|
||||
let (addr, recorded, handle) =
|
||||
recording_server(vec![(0, Vec::new()), (0, Vec::new())]).await;
|
||||
// Three RPCs: Connect + AddSubscriberEngine (F56) +
|
||||
// AdviseSupervisory. The buffered unsubscribe MUST NOT add a
|
||||
// fourth (F47 skips UnAdvise on buffered drop).
|
||||
let (addr, recorded, handle) = recording_server(vec![
|
||||
(0, Vec::new()),
|
||||
(0, Vec::new()),
|
||||
(0, Vec::new()),
|
||||
])
|
||||
.await;
|
||||
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[(
|
||||
"TestObj.TestInt",
|
||||
sample_metadata(),
|
||||
)]));
|
||||
let session = connect_test_session(addr, resolver).await.unwrap();
|
||||
|
||||
// Issue a plain subscribe — server records AdviseSupervisory.
|
||||
// Issue a plain subscribe — server records Connect +
|
||||
// AddSubscriberEngine + AdviseSupervisory.
|
||||
let sub = session.subscribe("TestObj.TestInt").await.unwrap();
|
||||
let cid = sub.correlation_id;
|
||||
assert_eq!(
|
||||
recorded.lock().unwrap().len(),
|
||||
1,
|
||||
"subscribe should issue 1 RPC"
|
||||
3,
|
||||
"subscribe should issue 3 RPCs (Connect + AddSubscriberEngine + AdviseSupervisory)"
|
||||
);
|
||||
|
||||
// Mutate the registry entry's mode to Buffered (synthesise the
|
||||
@@ -4438,13 +4484,14 @@ mod tests {
|
||||
}
|
||||
|
||||
// Unsubscribe the now-buffered entry. F47 contract: NO
|
||||
// UnAdvise is emitted on the wire; recorded count stays at 1.
|
||||
// UnAdvise is emitted on the wire; recorded count stays at 3
|
||||
// (the Connect + AddSubscriberEngine + AdviseSupervisory from
|
||||
// the original subscribe).
|
||||
session.unsubscribe(sub).await.unwrap();
|
||||
assert_eq!(
|
||||
recorded.lock().unwrap().len(),
|
||||
1,
|
||||
"buffered unsubscribe must not issue UnAdvise; recorded RPC count must stay at 1 \
|
||||
(the original AdviseSupervisory)"
|
||||
3,
|
||||
"buffered unsubscribe must not issue UnAdvise; recorded RPC count must stay at 3"
|
||||
);
|
||||
|
||||
// Registry is still cleared — F47's skip applies only to the
|
||||
|
||||
Reference in New Issue
Block a user