From 1a1830f3bf07126dc4b6a165566bca3a9749e835 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Wed, 6 May 2026 05:58:57 -0400 Subject: [PATCH] [F47] mxaccess: unsubscribe skips UnAdvise for buffered subscriptions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirrors the .NET reference's `if (!subscription.IsBuffered)` guard at `MxNativeSession.cs:361-381`. The Rust port previously emitted an `UnAdvise` frame for both plain and buffered subscriptions; the buffered server-side registration is unwound by the engine when the `RegisterReference` handle goes away, so emitting an `UnAdvise` for buffered entries is at best a no-op extra frame and at worst could race with the engine's own teardown. Fix: branch `Session::unsubscribe` on `SubscriptionEntry::mode` (the discriminator F45 added). For `SubscriptionMode::Buffered { ... }`, skip the `un_advise` call and proceed directly to registry cleanup. For `SubscriptionMode::Plain`, retain the previous behaviour. The registry-entry probe runs first (separate lock acquisition) so the `is_buffered` decision doesn't hold the NMX-client mutex unnecessarily — common case where the entry is plain still acquires the NMX lock immediately after. The metrics counter `record_unadvise()` still fires on every public `unsubscribe` call regardless of mode — it tracks consumer-side unsubscribe rate, not wire-frame rate. That matches what dashboards expect from the public API. New unit test `unsubscribe_skips_un_advise_for_buffered_subscription` issues a plain subscribe (recorded as 1 RPC), mutates the registry entry to `SubscriptionMode::Buffered`, calls unsubscribe, and asserts the recorded RPC count stays at 1 (no UnAdvise emitted). The existing `subscribe_populates_registry_unsubscribe_clears_it` test serves as the negative control for the plain branch. Workspace 794 → 795 tests; clippy clean; rustdoc clean. Co-Authored-By: Claude Opus 4.7 (1M context) --- rust/crates/mxaccess/src/session.rs | 120 ++++++++++++++++++++++++---- 1 file changed, 104 insertions(+), 16 deletions(-) diff --git a/rust/crates/mxaccess/src/session.rs b/rust/crates/mxaccess/src/session.rs index 3ee8f95..25fb423 100644 --- a/rust/crates/mxaccess/src/session.rs +++ b/rust/crates/mxaccess/src/session.rs @@ -1473,30 +1473,55 @@ impl Session { self.ensure_connected()?; let inner = self.inner.clone(); let opts = &inner.options; - let mut nmx = inner.nmx.lock().await; - let hr = nmx - .un_advise( - opts.local_engine_id, - &subscription.metadata, - subscription.correlation_id, - opts.galaxy_id, - /* source_galaxy_id */ i32::from(opts.galaxy_id), - opts.source_platform_id, + + // F47 — `Session::unsubscribe` skips `UnAdvise` for buffered + // subscriptions, mirroring the .NET reference's + // `if (!subscription.IsBuffered)` guard at + // `MxNativeSession.cs:361-381`. Buffered subscriptions are + // unwound by the engine when the `RegisterReference` handle + // goes away — there's no item-level advise to retract. + // Probe the registry first so we know which mode the + // subscription was registered under. + let is_buffered = { + let reg = inner.subscriptions.lock().await; + matches!( + reg.get(&subscription.correlation_id), + Some(entry) if matches!(entry.mode, SubscriptionMode::Buffered { .. }) ) - .await - .map_err(map_nmx)?; - ensure_hresult_ok(hr)?; - drop(nmx); + }; + + if !is_buffered { + let mut nmx = inner.nmx.lock().await; + let hr = nmx + .un_advise( + opts.local_engine_id, + &subscription.metadata, + subscription.correlation_id, + opts.galaxy_id, + /* source_galaxy_id */ i32::from(opts.galaxy_id), + opts.source_platform_id, + ) + .await + .map_err(map_nmx)?; + ensure_hresult_ok(hr)?; + drop(nmx); + } + // F16: drop the subscription from the recovery registry too. - // We do this only on the success path — if UnAdvise itself - // failed, the server may still hold the supervisory record and - // a future recover_connection should re-issue the advise. + // For plain entries, we do this only on the UnAdvise success + // path — if UnAdvise itself failed, the server may still hold + // the supervisory record and a future recover_connection should + // re-issue the advise. Buffered entries always reach here + // because no UnAdvise was attempted. let registry_size = { let mut reg = inner.subscriptions.lock().await; reg.remove(&subscription.correlation_id); reg.len() }; // F40 — count the unadvise + update the gauge. + // For buffered entries no UnAdvise was emitted, but the + // counter still tracks consumer-side unsubscribe events so + // the rate matches the public API's call rate. session_metrics::record_unadvise(); session_metrics::set_registered_items(registry_size); Ok(()) @@ -3089,4 +3114,67 @@ mod tests { drop(event_tx); let _ = router_h.await; } + + /// F47 — `Session::unsubscribe` must NOT emit an `UnAdvise` for + /// buffered subscriptions, mirroring the .NET reference's + /// `if (!subscription.IsBuffered)` guard at + /// `MxNativeSession.cs:361-381`. Two parts: + /// + /// - Plain subscribe → unsubscribe records 2 RPCs against the + /// server (AdviseSupervisory + UnAdvise). + /// - Buffered (mutated post-subscribe to flip the registry entry) + /// → unsubscribe records 1 RPC (just the original + /// AdviseSupervisory; no UnAdvise emitted). + /// + /// Plain check uses the existing + /// `subscribe_populates_registry_unsubscribe_clears_it` test as + /// the negative control; this test pins the buffered branch. + #[tokio::test] + async fn unsubscribe_skips_un_advise_for_buffered_subscription() { + let (addr, recorded, handle) = + recording_server(vec![(0, Vec::new()), (0, Vec::new())]).await; + let resolver: Arc = Arc::new(StaticResolver::new(&[( + "TestObj.TestInt", + sample_metadata(), + )])); + let session = connect_test_session(addr, resolver).await.unwrap(); + + // Issue a plain subscribe — server records AdviseSupervisory. + let sub = session.subscribe("TestObj.TestInt").await.unwrap(); + let cid = sub.correlation_id; + assert_eq!(recorded.lock().unwrap().len(), 1, "subscribe should issue 1 RPC"); + + // Mutate the registry entry's mode to Buffered (synthesise the + // state subscribe_buffered_nmx would have produced). + { + let mut reg = session.inner.subscriptions.lock().await; + let entry = reg.get_mut(&cid).expect("registry entry present"); + entry.mode = SubscriptionMode::Buffered { + rounded_interval_ms: 1000, + item_definition: "TestObj.TestInt".to_string(), + item_context: String::new(), + item_handle: 0, + }; + } + + // Unsubscribe the now-buffered entry. F47 contract: NO + // UnAdvise is emitted on the wire; recorded count stays at 1. + session.unsubscribe(sub).await.unwrap(); + assert_eq!( + recorded.lock().unwrap().len(), + 1, + "buffered unsubscribe must not issue UnAdvise; recorded RPC count must stay at 1 \ + (the original AdviseSupervisory)" + ); + + // Registry is still cleared — F47's skip applies only to the + // wire emission, not the consumer-side bookkeeping. + assert_eq!( + session.inner.subscriptions.lock().await.len(), + 0, + "buffered unsubscribe still removes the registry entry" + ); + + handle.abort(); + } }