[F47] mxaccess: unsubscribe skips UnAdvise for buffered subscriptions
Mirrors the .NET reference's `if (!subscription.IsBuffered)` guard
at `MxNativeSession.cs:361-381`. The Rust port previously emitted an
`UnAdvise` frame for both plain and buffered subscriptions; the
buffered server-side registration is unwound by the engine when the
`RegisterReference` handle goes away, so emitting an `UnAdvise` for
buffered entries is at best a no-op extra frame and at worst could
race with the engine's own teardown.
Fix: branch `Session::unsubscribe` on `SubscriptionEntry::mode` (the
discriminator F45 added). For `SubscriptionMode::Buffered { ... }`,
skip the `un_advise` call and proceed directly to registry cleanup.
For `SubscriptionMode::Plain`, retain the previous behaviour.
The registry-entry probe runs first (separate lock acquisition) so
the `is_buffered` decision doesn't hold the NMX-client mutex
unnecessarily — common case where the entry is plain still acquires
the NMX lock immediately after.
The metrics counter `record_unadvise()` still fires on every public
`unsubscribe` call regardless of mode — it tracks consumer-side
unsubscribe rate, not wire-frame rate. That matches what dashboards
expect from the public API.
New unit test `unsubscribe_skips_un_advise_for_buffered_subscription`
issues a plain subscribe (recorded as 1 RPC), mutates the registry
entry to `SubscriptionMode::Buffered`, calls unsubscribe, and
asserts the recorded RPC count stays at 1 (no UnAdvise emitted).
The existing `subscribe_populates_registry_unsubscribe_clears_it`
test serves as the negative control for the plain branch.
Workspace 794 → 795 tests; clippy clean; rustdoc clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1473,30 +1473,55 @@ impl Session {
|
||||
self.ensure_connected()?;
|
||||
let inner = self.inner.clone();
|
||||
let opts = &inner.options;
|
||||
let mut nmx = inner.nmx.lock().await;
|
||||
let hr = nmx
|
||||
.un_advise(
|
||||
opts.local_engine_id,
|
||||
&subscription.metadata,
|
||||
subscription.correlation_id,
|
||||
opts.galaxy_id,
|
||||
/* source_galaxy_id */ i32::from(opts.galaxy_id),
|
||||
opts.source_platform_id,
|
||||
|
||||
// F47 — `Session::unsubscribe` skips `UnAdvise` for buffered
|
||||
// subscriptions, mirroring the .NET reference's
|
||||
// `if (!subscription.IsBuffered)` guard at
|
||||
// `MxNativeSession.cs:361-381`. Buffered subscriptions are
|
||||
// unwound by the engine when the `RegisterReference` handle
|
||||
// goes away — there's no item-level advise to retract.
|
||||
// Probe the registry first so we know which mode the
|
||||
// subscription was registered under.
|
||||
let is_buffered = {
|
||||
let reg = inner.subscriptions.lock().await;
|
||||
matches!(
|
||||
reg.get(&subscription.correlation_id),
|
||||
Some(entry) if matches!(entry.mode, SubscriptionMode::Buffered { .. })
|
||||
)
|
||||
.await
|
||||
.map_err(map_nmx)?;
|
||||
ensure_hresult_ok(hr)?;
|
||||
drop(nmx);
|
||||
};
|
||||
|
||||
if !is_buffered {
|
||||
let mut nmx = inner.nmx.lock().await;
|
||||
let hr = nmx
|
||||
.un_advise(
|
||||
opts.local_engine_id,
|
||||
&subscription.metadata,
|
||||
subscription.correlation_id,
|
||||
opts.galaxy_id,
|
||||
/* source_galaxy_id */ i32::from(opts.galaxy_id),
|
||||
opts.source_platform_id,
|
||||
)
|
||||
.await
|
||||
.map_err(map_nmx)?;
|
||||
ensure_hresult_ok(hr)?;
|
||||
drop(nmx);
|
||||
}
|
||||
|
||||
// F16: drop the subscription from the recovery registry too.
|
||||
// We do this only on the success path — if UnAdvise itself
|
||||
// failed, the server may still hold the supervisory record and
|
||||
// a future recover_connection should re-issue the advise.
|
||||
// For plain entries, we do this only on the UnAdvise success
|
||||
// path — if UnAdvise itself failed, the server may still hold
|
||||
// the supervisory record and a future recover_connection should
|
||||
// re-issue the advise. Buffered entries always reach here
|
||||
// because no UnAdvise was attempted.
|
||||
let registry_size = {
|
||||
let mut reg = inner.subscriptions.lock().await;
|
||||
reg.remove(&subscription.correlation_id);
|
||||
reg.len()
|
||||
};
|
||||
// F40 — count the unadvise + update the gauge.
|
||||
// For buffered entries no UnAdvise was emitted, but the
|
||||
// counter still tracks consumer-side unsubscribe events so
|
||||
// the rate matches the public API's call rate.
|
||||
session_metrics::record_unadvise();
|
||||
session_metrics::set_registered_items(registry_size);
|
||||
Ok(())
|
||||
@@ -3089,4 +3114,67 @@ mod tests {
|
||||
drop(event_tx);
|
||||
let _ = router_h.await;
|
||||
}
|
||||
|
||||
/// F47 — `Session::unsubscribe` must NOT emit an `UnAdvise` for
|
||||
/// buffered subscriptions, mirroring the .NET reference's
|
||||
/// `if (!subscription.IsBuffered)` guard at
|
||||
/// `MxNativeSession.cs:361-381`. Two parts:
|
||||
///
|
||||
/// - Plain subscribe → unsubscribe records 2 RPCs against the
|
||||
/// server (AdviseSupervisory + UnAdvise).
|
||||
/// - Buffered (mutated post-subscribe to flip the registry entry)
|
||||
/// → unsubscribe records 1 RPC (just the original
|
||||
/// AdviseSupervisory; no UnAdvise emitted).
|
||||
///
|
||||
/// Plain check uses the existing
|
||||
/// `subscribe_populates_registry_unsubscribe_clears_it` test as
|
||||
/// the negative control; this test pins the buffered branch.
|
||||
#[tokio::test]
|
||||
async fn unsubscribe_skips_un_advise_for_buffered_subscription() {
|
||||
let (addr, recorded, handle) =
|
||||
recording_server(vec![(0, Vec::new()), (0, Vec::new())]).await;
|
||||
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[(
|
||||
"TestObj.TestInt",
|
||||
sample_metadata(),
|
||||
)]));
|
||||
let session = connect_test_session(addr, resolver).await.unwrap();
|
||||
|
||||
// Issue a plain subscribe — server records AdviseSupervisory.
|
||||
let sub = session.subscribe("TestObj.TestInt").await.unwrap();
|
||||
let cid = sub.correlation_id;
|
||||
assert_eq!(recorded.lock().unwrap().len(), 1, "subscribe should issue 1 RPC");
|
||||
|
||||
// Mutate the registry entry's mode to Buffered (synthesise the
|
||||
// state subscribe_buffered_nmx would have produced).
|
||||
{
|
||||
let mut reg = session.inner.subscriptions.lock().await;
|
||||
let entry = reg.get_mut(&cid).expect("registry entry present");
|
||||
entry.mode = SubscriptionMode::Buffered {
|
||||
rounded_interval_ms: 1000,
|
||||
item_definition: "TestObj.TestInt".to_string(),
|
||||
item_context: String::new(),
|
||||
item_handle: 0,
|
||||
};
|
||||
}
|
||||
|
||||
// Unsubscribe the now-buffered entry. F47 contract: NO
|
||||
// UnAdvise is emitted on the wire; recorded count stays at 1.
|
||||
session.unsubscribe(sub).await.unwrap();
|
||||
assert_eq!(
|
||||
recorded.lock().unwrap().len(),
|
||||
1,
|
||||
"buffered unsubscribe must not issue UnAdvise; recorded RPC count must stay at 1 \
|
||||
(the original AdviseSupervisory)"
|
||||
);
|
||||
|
||||
// Registry is still cleared — F47's skip applies only to the
|
||||
// wire emission, not the consumer-side bookkeeping.
|
||||
assert_eq!(
|
||||
session.inner.subscriptions.lock().await.len(),
|
||||
0,
|
||||
"buffered unsubscribe still removes the registry entry"
|
||||
);
|
||||
|
||||
handle.abort();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user