[F47] mxaccess: unsubscribe skips UnAdvise for buffered subscriptions
Mirrors the .NET reference's `if (!subscription.IsBuffered)` guard
at `MxNativeSession.cs:361-381`. The Rust port previously emitted an
`UnAdvise` frame for both plain and buffered subscriptions; the
buffered server-side registration is unwound by the engine when the
`RegisterReference` handle goes away, so emitting an `UnAdvise` for
buffered entries is at best a no-op extra frame and at worst could
race with the engine's own teardown.
Fix: branch `Session::unsubscribe` on `SubscriptionEntry::mode` (the
discriminator F45 added). For `SubscriptionMode::Buffered { ... }`,
skip the `un_advise` call and proceed directly to registry cleanup.
For `SubscriptionMode::Plain`, retain the previous behaviour.
The registry-entry probe runs first (separate lock acquisition) so
the `is_buffered` decision doesn't hold the NMX-client mutex
unnecessarily — common case where the entry is plain still acquires
the NMX lock immediately after.
The metrics counter `record_unadvise()` still fires on every public
`unsubscribe` call regardless of mode — it tracks consumer-side
unsubscribe rate, not wire-frame rate. That matches what dashboards
expect from the public API.
New unit test `unsubscribe_skips_un_advise_for_buffered_subscription`
issues a plain subscribe (recorded as 1 RPC), mutates the registry
entry to `SubscriptionMode::Buffered`, calls unsubscribe, and
asserts the recorded RPC count stays at 1 (no UnAdvise emitted).
The existing `subscribe_populates_registry_unsubscribe_clears_it`
test serves as the negative control for the plain branch.
Workspace 794 → 795 tests; clippy clean; rustdoc clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1473,30 +1473,55 @@ impl Session {
|
|||||||
self.ensure_connected()?;
|
self.ensure_connected()?;
|
||||||
let inner = self.inner.clone();
|
let inner = self.inner.clone();
|
||||||
let opts = &inner.options;
|
let opts = &inner.options;
|
||||||
let mut nmx = inner.nmx.lock().await;
|
|
||||||
let hr = nmx
|
// F47 — `Session::unsubscribe` skips `UnAdvise` for buffered
|
||||||
.un_advise(
|
// subscriptions, mirroring the .NET reference's
|
||||||
opts.local_engine_id,
|
// `if (!subscription.IsBuffered)` guard at
|
||||||
&subscription.metadata,
|
// `MxNativeSession.cs:361-381`. Buffered subscriptions are
|
||||||
subscription.correlation_id,
|
// unwound by the engine when the `RegisterReference` handle
|
||||||
opts.galaxy_id,
|
// goes away — there's no item-level advise to retract.
|
||||||
/* source_galaxy_id */ i32::from(opts.galaxy_id),
|
// Probe the registry first so we know which mode the
|
||||||
opts.source_platform_id,
|
// subscription was registered under.
|
||||||
|
let is_buffered = {
|
||||||
|
let reg = inner.subscriptions.lock().await;
|
||||||
|
matches!(
|
||||||
|
reg.get(&subscription.correlation_id),
|
||||||
|
Some(entry) if matches!(entry.mode, SubscriptionMode::Buffered { .. })
|
||||||
)
|
)
|
||||||
.await
|
};
|
||||||
.map_err(map_nmx)?;
|
|
||||||
ensure_hresult_ok(hr)?;
|
if !is_buffered {
|
||||||
drop(nmx);
|
let mut nmx = inner.nmx.lock().await;
|
||||||
|
let hr = nmx
|
||||||
|
.un_advise(
|
||||||
|
opts.local_engine_id,
|
||||||
|
&subscription.metadata,
|
||||||
|
subscription.correlation_id,
|
||||||
|
opts.galaxy_id,
|
||||||
|
/* source_galaxy_id */ i32::from(opts.galaxy_id),
|
||||||
|
opts.source_platform_id,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(map_nmx)?;
|
||||||
|
ensure_hresult_ok(hr)?;
|
||||||
|
drop(nmx);
|
||||||
|
}
|
||||||
|
|
||||||
// F16: drop the subscription from the recovery registry too.
|
// F16: drop the subscription from the recovery registry too.
|
||||||
// We do this only on the success path — if UnAdvise itself
|
// For plain entries, we do this only on the UnAdvise success
|
||||||
// failed, the server may still hold the supervisory record and
|
// path — if UnAdvise itself failed, the server may still hold
|
||||||
// a future recover_connection should re-issue the advise.
|
// the supervisory record and a future recover_connection should
|
||||||
|
// re-issue the advise. Buffered entries always reach here
|
||||||
|
// because no UnAdvise was attempted.
|
||||||
let registry_size = {
|
let registry_size = {
|
||||||
let mut reg = inner.subscriptions.lock().await;
|
let mut reg = inner.subscriptions.lock().await;
|
||||||
reg.remove(&subscription.correlation_id);
|
reg.remove(&subscription.correlation_id);
|
||||||
reg.len()
|
reg.len()
|
||||||
};
|
};
|
||||||
// F40 — count the unadvise + update the gauge.
|
// F40 — count the unadvise + update the gauge.
|
||||||
|
// For buffered entries no UnAdvise was emitted, but the
|
||||||
|
// counter still tracks consumer-side unsubscribe events so
|
||||||
|
// the rate matches the public API's call rate.
|
||||||
session_metrics::record_unadvise();
|
session_metrics::record_unadvise();
|
||||||
session_metrics::set_registered_items(registry_size);
|
session_metrics::set_registered_items(registry_size);
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -3089,4 +3114,67 @@ mod tests {
|
|||||||
drop(event_tx);
|
drop(event_tx);
|
||||||
let _ = router_h.await;
|
let _ = router_h.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// F47 — `Session::unsubscribe` must NOT emit an `UnAdvise` for
|
||||||
|
/// buffered subscriptions, mirroring the .NET reference's
|
||||||
|
/// `if (!subscription.IsBuffered)` guard at
|
||||||
|
/// `MxNativeSession.cs:361-381`. Two parts:
|
||||||
|
///
|
||||||
|
/// - Plain subscribe → unsubscribe records 2 RPCs against the
|
||||||
|
/// server (AdviseSupervisory + UnAdvise).
|
||||||
|
/// - Buffered (mutated post-subscribe to flip the registry entry)
|
||||||
|
/// → unsubscribe records 1 RPC (just the original
|
||||||
|
/// AdviseSupervisory; no UnAdvise emitted).
|
||||||
|
///
|
||||||
|
/// Plain check uses the existing
|
||||||
|
/// `subscribe_populates_registry_unsubscribe_clears_it` test as
|
||||||
|
/// the negative control; this test pins the buffered branch.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn unsubscribe_skips_un_advise_for_buffered_subscription() {
|
||||||
|
let (addr, recorded, handle) =
|
||||||
|
recording_server(vec![(0, Vec::new()), (0, Vec::new())]).await;
|
||||||
|
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[(
|
||||||
|
"TestObj.TestInt",
|
||||||
|
sample_metadata(),
|
||||||
|
)]));
|
||||||
|
let session = connect_test_session(addr, resolver).await.unwrap();
|
||||||
|
|
||||||
|
// Issue a plain subscribe — server records AdviseSupervisory.
|
||||||
|
let sub = session.subscribe("TestObj.TestInt").await.unwrap();
|
||||||
|
let cid = sub.correlation_id;
|
||||||
|
assert_eq!(recorded.lock().unwrap().len(), 1, "subscribe should issue 1 RPC");
|
||||||
|
|
||||||
|
// Mutate the registry entry's mode to Buffered (synthesise the
|
||||||
|
// state subscribe_buffered_nmx would have produced).
|
||||||
|
{
|
||||||
|
let mut reg = session.inner.subscriptions.lock().await;
|
||||||
|
let entry = reg.get_mut(&cid).expect("registry entry present");
|
||||||
|
entry.mode = SubscriptionMode::Buffered {
|
||||||
|
rounded_interval_ms: 1000,
|
||||||
|
item_definition: "TestObj.TestInt".to_string(),
|
||||||
|
item_context: String::new(),
|
||||||
|
item_handle: 0,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unsubscribe the now-buffered entry. F47 contract: NO
|
||||||
|
// UnAdvise is emitted on the wire; recorded count stays at 1.
|
||||||
|
session.unsubscribe(sub).await.unwrap();
|
||||||
|
assert_eq!(
|
||||||
|
recorded.lock().unwrap().len(),
|
||||||
|
1,
|
||||||
|
"buffered unsubscribe must not issue UnAdvise; recorded RPC count must stay at 1 \
|
||||||
|
(the original AdviseSupervisory)"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Registry is still cleared — F47's skip applies only to the
|
||||||
|
// wire emission, not the consumer-side bookkeeping.
|
||||||
|
assert_eq!(
|
||||||
|
session.inner.subscriptions.lock().await.len(),
|
||||||
|
0,
|
||||||
|
"buffered unsubscribe still removes the registry entry"
|
||||||
|
);
|
||||||
|
|
||||||
|
handle.abort();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user