diff --git a/rust/crates/mxaccess/src/session.rs b/rust/crates/mxaccess/src/session.rs index 3ee8f95..25fb423 100644 --- a/rust/crates/mxaccess/src/session.rs +++ b/rust/crates/mxaccess/src/session.rs @@ -1473,30 +1473,55 @@ impl Session { self.ensure_connected()?; let inner = self.inner.clone(); let opts = &inner.options; - let mut nmx = inner.nmx.lock().await; - let hr = nmx - .un_advise( - opts.local_engine_id, - &subscription.metadata, - subscription.correlation_id, - opts.galaxy_id, - /* source_galaxy_id */ i32::from(opts.galaxy_id), - opts.source_platform_id, + + // F47 — `Session::unsubscribe` skips `UnAdvise` for buffered + // subscriptions, mirroring the .NET reference's + // `if (!subscription.IsBuffered)` guard at + // `MxNativeSession.cs:361-381`. Buffered subscriptions are + // unwound by the engine when the `RegisterReference` handle + // goes away — there's no item-level advise to retract. + // Probe the registry first so we know which mode the + // subscription was registered under. + let is_buffered = { + let reg = inner.subscriptions.lock().await; + matches!( + reg.get(&subscription.correlation_id), + Some(entry) if matches!(entry.mode, SubscriptionMode::Buffered { .. }) ) - .await - .map_err(map_nmx)?; - ensure_hresult_ok(hr)?; - drop(nmx); + }; + + if !is_buffered { + let mut nmx = inner.nmx.lock().await; + let hr = nmx + .un_advise( + opts.local_engine_id, + &subscription.metadata, + subscription.correlation_id, + opts.galaxy_id, + /* source_galaxy_id */ i32::from(opts.galaxy_id), + opts.source_platform_id, + ) + .await + .map_err(map_nmx)?; + ensure_hresult_ok(hr)?; + drop(nmx); + } + // F16: drop the subscription from the recovery registry too. - // We do this only on the success path — if UnAdvise itself - // failed, the server may still hold the supervisory record and - // a future recover_connection should re-issue the advise. + // For plain entries, we do this only on the UnAdvise success + // path — if UnAdvise itself failed, the server may still hold + // the supervisory record and a future recover_connection should + // re-issue the advise. Buffered entries always reach here + // because no UnAdvise was attempted. let registry_size = { let mut reg = inner.subscriptions.lock().await; reg.remove(&subscription.correlation_id); reg.len() }; // F40 — count the unadvise + update the gauge. + // For buffered entries no UnAdvise was emitted, but the + // counter still tracks consumer-side unsubscribe events so + // the rate matches the public API's call rate. session_metrics::record_unadvise(); session_metrics::set_registered_items(registry_size); Ok(()) @@ -3089,4 +3114,67 @@ mod tests { drop(event_tx); let _ = router_h.await; } + + /// F47 — `Session::unsubscribe` must NOT emit an `UnAdvise` for + /// buffered subscriptions, mirroring the .NET reference's + /// `if (!subscription.IsBuffered)` guard at + /// `MxNativeSession.cs:361-381`. Two parts: + /// + /// - Plain subscribe → unsubscribe records 2 RPCs against the + /// server (AdviseSupervisory + UnAdvise). + /// - Buffered (mutated post-subscribe to flip the registry entry) + /// → unsubscribe records 1 RPC (just the original + /// AdviseSupervisory; no UnAdvise emitted). + /// + /// Plain check uses the existing + /// `subscribe_populates_registry_unsubscribe_clears_it` test as + /// the negative control; this test pins the buffered branch. + #[tokio::test] + async fn unsubscribe_skips_un_advise_for_buffered_subscription() { + let (addr, recorded, handle) = + recording_server(vec![(0, Vec::new()), (0, Vec::new())]).await; + let resolver: Arc = Arc::new(StaticResolver::new(&[( + "TestObj.TestInt", + sample_metadata(), + )])); + let session = connect_test_session(addr, resolver).await.unwrap(); + + // Issue a plain subscribe — server records AdviseSupervisory. + let sub = session.subscribe("TestObj.TestInt").await.unwrap(); + let cid = sub.correlation_id; + assert_eq!(recorded.lock().unwrap().len(), 1, "subscribe should issue 1 RPC"); + + // Mutate the registry entry's mode to Buffered (synthesise the + // state subscribe_buffered_nmx would have produced). + { + let mut reg = session.inner.subscriptions.lock().await; + let entry = reg.get_mut(&cid).expect("registry entry present"); + entry.mode = SubscriptionMode::Buffered { + rounded_interval_ms: 1000, + item_definition: "TestObj.TestInt".to_string(), + item_context: String::new(), + item_handle: 0, + }; + } + + // Unsubscribe the now-buffered entry. F47 contract: NO + // UnAdvise is emitted on the wire; recorded count stays at 1. + session.unsubscribe(sub).await.unwrap(); + assert_eq!( + recorded.lock().unwrap().len(), + 1, + "buffered unsubscribe must not issue UnAdvise; recorded RPC count must stay at 1 \ + (the original AdviseSupervisory)" + ); + + // Registry is still cleared — F47's skip applies only to the + // wire emission, not the consumer-side bookkeeping. + assert_eq!( + session.inner.subscriptions.lock().await.len(), + 0, + "buffered unsubscribe still removes the registry entry" + ); + + handle.abort(); + } }