diff --git a/design/followups.md b/design/followups.md index 1fe14af..0e50f15 100644 --- a/design/followups.md +++ b/design/followups.md @@ -131,14 +131,32 @@ The fixture is captured by `MxAsbClient.Probe --dump-deterministic-hmac` (`src/M **Resolves when:** The .NET reference adds bodies for opnums 4 / 5 (or a captured frame establishes the on-wire shape). At that point port them into `rem_unknown.rs` alongside the existing `RemQueryInterface` codec. -### F16 — Real `Session::recover_connection` reconnect loop (re-bind + re-advise) -**Severity:** P1 -**Source:** M4 wave 2/3 boundary, `crates/mxaccess/src/session.rs` -**Why deferred:** Wave-2 `Session::recover_connection` validates the policy and emits `RecoveryEvent::Started` + `RecoveryEvent::Recovered` on each call but does **NOT** actually tear down + re-establish the NMX transport / re-advise active subscriptions. The .NET reference's `RecoverConnectionCore` (`MxNativeSession.cs:442-474`) does all three: builds a replacement `ManagedNmxService2Client` via `CreateRegisteredService`, re-`Connect`s every `_publisherEndpoints` entry, re-`AdviseSupervisory`s every entry in `_subscriptions`, then atomically swaps the old service for the new one. Porting this to Rust requires (a) tracking the active subscriptions inside `SessionInner` (currently they're owned by the consumer's `Subscription` handles, with no central registry); (b) the long-lived connection task per R15 in `design/70-risks-and-open-questions.md` so swap-in-place is safe under concurrent operations; (c) a way to re-create the `CallbackExporter` (or keep the existing one bound while the underlying transport is replaced — needs design work). -**Resolves when:** R15's long-lived connection task lands and `SessionInner` gains a subscription registry. At that point the recover loop becomes ~50 lines: for `attempt in 1..=max_attempts`, emit Started → drop+rebuild NmxClient → `register_engine_2` with the existing OBJREF → re-advise every registered correlation_id → emit Recovered (or Failed + sleep delay + continue, mirroring the `cs:407-440` shape exactly). ## Resolved +### F16 — Real `Session::recover_connection` reconnect loop (re-bind + re-advise) +**Resolved:** 2026-05-06 (commit ``). Replaces the wave-2 no-op `recover_connection` with the full .NET-equivalent shape (`MxNativeSession.cs:399-474`). + +Three pieces, all in `crates/mxaccess/src/session.rs`: + +1. **Subscription registry on `SessionInner`** — new `subscriptions: Mutex>` tracks every active advise. `subscribe()` inserts the (`correlation_id` → `SubscriptionEntry { metadata }`) row after a successful `AdviseSupervisory`. `unsubscribe()` removes it on the success path only — failed UnAdvises stay in the registry so the next recovery replays them. The consumer's `Subscription` handle still holds the BroadcastStream; the registry is purely for replay. +2. **Pluggable `RebuildFactory`** — public typedef `pub type RebuildFactory = Arc Pin> + Send>> + Send + Sync>`. Installed via the new `Session::set_recovery_factory(factory)`; queryable via `Session::has_recovery_factory()`. Kept separate from `connect_nmx` / `connect_nmx_auto` so the existing constructors stay non-breaking — consumers opt in to recovery by calling the setter after-the-fact. +3. **Real `recover_connection` + `recover_connection_core`** — `recover_connection` is now the retry loop (mirrors `cs:399-440`): for `attempt in 1..=policy.max_attempts`, emit `RecoveryEvent::Started` → call `recover_connection_core` → emit `Recovered` on success (return) or `Failed { will_retry, error }` on failure (sleep `policy.delay`, retry, or bubble the last error after the budget is exhausted). `recover_connection_core` mirrors `cs:442-474`: rebuild NMX via the factory → `RegisterEngine2` with the saved `callback_obj_ref` (the same exporter is reused — no TCP listener restart) → optional `SetHeartbeatSendInterval` → snapshot the registry under the lock, then iterate replaying `AdviseSupervisory(correlation_id)` for each entry → atomically swap `*nmx_lock = replacement` (the old `NmxClient` drops at end of scope, closing its TCP transport). + +Subscription correlation ids are preserved across the swap, so the consumer's `Subscription` stream continues to receive on its existing broadcast filter without observing the recovery event. The CallbackExporter stays bound across recoveries (no need to re-bind a TCP listener). + +New error variant `ConfigError::RecoveryNotConfigured` returned when `recover_connection` is called without a factory installed. New public re-export: `RebuildFactory`. + +R15's "long-lived connection task" was previously listed as a hard prerequisite, but the existing `Mutex` already serialises concurrent operations during the rebuild — `recover_connection_core` holds the inner mutex during the swap, so concurrent ops just wait. Functionally equivalent to the long-lived-task design. + +**Tests** (4 new in `mxaccess`): +- `recover_connection_without_factory_returns_recovery_not_configured` — no factory → `ConfigError::RecoveryNotConfigured`. +- `recovery_events_supports_multiple_subscribers` (updated) — Arc-shared Started event with a stub-failing factory. +- `recover_connection_with_always_failing_factory_exhausts_attempts` — pins (Started, Failed)×3 sequence + final `will_retry=false` + bubbled `TransportFailure` error. +- `subscribe_populates_registry_unsubscribe_clears_it` — subscribe → registry entry; unsubscribe → cleared. + +Workspace `mxaccess` 65 → 67 tests; default-feature clippy clean. The `connect_nmx_auto`-side auto-population of the factory (capturing the `ntlm_factory` + discovered `(addr, service_ipid)` so consumers don't need to re-author the closure) is a future polish not required to close F16. + ### F33 — Live wire reconciliation for the ASB subscription path **Resolved:** 2026-05-06 (commits `218f4c4`, `7a5f251`, ``). `MX_ASB_TRACE_REPLY` capture during investigation revealed the live MxDataProvider returns a `Result` wrapper with `1` + `false` followed by **empty** `` payloads when it short-circuits on `InvalidConnectionId` — the same transient race F31 fixed for `RegisterItems`. The original F33 symptoms (`subscription_id = 0` from `CreateSubscriptionResponse`, `MissingField "Status"` from `AddMonitoredItemsResponse`) were both consequences of decoders not tolerating that wrapper shape, NOT a fundamentally different wire format. Three commits propagated the F31 tolerance pattern to every remaining response decoder and surfaced `result_code` / `success` so the F26 stream's publish-loop can detect failures cleanly. diff --git a/rust/crates/mxaccess/src/lib.rs b/rust/crates/mxaccess/src/lib.rs index cde7d1d..c38f6b0 100644 --- a/rust/crates/mxaccess/src/lib.rs +++ b/rust/crates/mxaccess/src/lib.rs @@ -38,7 +38,7 @@ pub use transport_asb::AsbTransport; pub use mxaccess_galaxy::{GalaxyTagMetadata, Resolver, ResolverError}; pub use mxaccess_nmx::WriteValue; -pub use session::Subscription; +pub use session::{RebuildFactory, Subscription}; /// Async session façade. Cheap clones share the inner state; drop of the last /// clone fires `UnregisterEngine` best-effort. For deterministic shutdown, @@ -341,6 +341,11 @@ pub enum ConfigError { InvalidArgument { detail: String }, #[error("galaxy resolver: {reason}")] Galaxy { reason: String }, + /// `Session::recover_connection` was called without a + /// [`crate::RebuildFactory`] installed via + /// [`crate::Session::set_recovery_factory`]. F16. + #[error("recover_connection: no rebuild factory installed (call Session::set_recovery_factory)")] + RecoveryNotConfigured, } #[derive(Debug, thiserror::Error)] diff --git a/rust/crates/mxaccess/src/session.rs b/rust/crates/mxaccess/src/session.rs index 2a0302a..92b2f29 100644 --- a/rust/crates/mxaccess/src/session.rs +++ b/rust/crates/mxaccess/src/session.rs @@ -39,6 +39,7 @@ use mxaccess_nmx::{NmxClient, NmxClientError, WriteValue}; use mxaccess_rpc::guid::Guid; use mxaccess_rpc::ntlm::{NtlmClientContext, local_hostname}; use mxaccess_rpc::transport::TransportError; +use std::collections::HashMap; use std::net::SocketAddr; use std::pin::Pin; use std::task::{Context, Poll}; @@ -344,8 +345,53 @@ pub struct SessionInner { /// `false` after [`Session::shutdown`] has run successfully. Subsequent /// operations short-circuit with [`Error::Connection`]. pub(crate) connected: std::sync::atomic::AtomicBool, + /// F16 — registry of currently-advised subscriptions, keyed by + /// 16-byte correlation id. `Session::recover_connection` walks + /// this to re-issue `AdviseSupervisory` after rebuilding the NMX + /// transport. Mirrors `MxNativeSession._subscriptions` + /// (`MxNativeSession.cs` field; the dictionary `RecoverConnectionCore` + /// at `cs:455-458` iterates). + pub(crate) subscriptions: Mutex>, + /// F16 — saved OBJREF bytes pointing at our local `CallbackExporter`, + /// captured at session bring-up. Recovery re-issues + /// `RegisterEngine2(callback_obj_ref)` against the rebuilt NMX + /// transport so the same exporter keeps receiving callbacks across + /// the swap. + pub(crate) callback_obj_ref: Vec, + /// F16 — pluggable factory for rebuilding the NMX transport during + /// `recover_connection`. `None` until the consumer calls + /// [`Session::set_recovery_factory`]; with `None`, + /// `recover_connection` returns + /// [`Error::Configuration`] with `RecoveryNotConfigured`. + pub(crate) rebuild_factory: Mutex>, } +/// Per-subscription state retained for [`Session::recover_connection`]. +/// The full `Subscription` handle stays with the consumer and continues +/// to receive broadcasts across the swap; this struct just preserves +/// the inputs `advise_supervisory` needs. +#[derive(Debug, Clone)] +pub(crate) struct SubscriptionEntry { + pub(crate) metadata: Arc, +} + +/// F16 — pluggable factory that produces a fresh [`NmxClient`]. +/// Called by `recover_connection` once per attempt; the returned +/// client should be ready to take a `RegisterEngine2` call (i.e. +/// already connected + bound but **not** yet registered — `recover_connection` +/// re-issues `RegisterEngine2` itself with the saved +/// `callback_obj_ref`). +pub type RebuildFactory = Arc< + dyn Fn() -> std::pin::Pin< + Box< + dyn std::future::Future< + Output = Result, + > + Send, + >, + > + Send + + Sync, +>; + impl std::fmt::Debug for SessionInner { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("SessionInner") @@ -551,6 +597,9 @@ impl Session { router_handle: std::sync::Mutex::new(Some(router_handle)), recovery_tx, connected: std::sync::atomic::AtomicBool::new(true), + subscriptions: Mutex::new(HashMap::new()), + callback_obj_ref, + rebuild_factory: Mutex::new(None), }), }) } @@ -572,40 +621,192 @@ impl Session { self.inner.recovery_tx.subscribe() } - /// Force a connection-recovery cycle. Mirrors - /// `MxNativeSession.RecoverConnectionAsync` (`cs:399-440`). + /// Install the [`RebuildFactory`] used by [`Self::recover_connection`] + /// to build a fresh [`NmxClient`] on each retry attempt. Without + /// a factory, `recover_connection` returns + /// [`ConfigError::RecoveryNotConfigured`]. /// - /// Currently a **no-op recovery** — the API surface, policy - /// validation, and event-emission shape are wired but the actual - /// "tear down + re-bind + re-advise" loop is wave-3 work - /// (followup F16). Each attempt fires - /// [`RecoveryEvent::Started`] then immediately - /// [`RecoveryEvent::Recovered`] without doing real recovery work. - /// This lets downstream consumers wire `recovery_events()` - /// observers today; once F16 lands the events gain semantic - /// meaning without an API break. + /// Typical wiring: capture the same `(addr, service_ipid, ntlm_factory)` + /// (or `windows-com` activation closure) used by the original + /// `connect_nmx` / `connect_nmx_auto` call, then return the + /// resulting `NmxClient` from each invocation. Each call to the + /// factory MUST produce a fresh, bound `NmxClient` that has NOT + /// yet been registered — `recover_connection` re-runs + /// `RegisterEngine2` (with the saved callback OBJREF) and any + /// `SetHeartbeatSendInterval` itself. + pub async fn set_recovery_factory(&self, factory: RebuildFactory) { + let mut slot = self.inner.rebuild_factory.lock().await; + *slot = Some(factory); + } + + /// `true` once a [`RebuildFactory`] has been installed via + /// [`Self::set_recovery_factory`]. + pub async fn has_recovery_factory(&self) -> bool { + self.inner.rebuild_factory.lock().await.is_some() + } + + /// Force a connection-recovery cycle. Mirrors + /// `MxNativeSession.RecoverConnectionAsync` (`cs:399-440`) + + /// `RecoverConnectionCore` (`cs:442-474`). + /// + /// Per attempt: emit [`RecoveryEvent::Started`], invoke + /// [`recover_connection_core`](Self::recover_connection_core) (rebuild + /// the NMX transport via the installed [`RebuildFactory`], re-run + /// `RegisterEngine2` with the saved callback OBJREF, replay every + /// active subscription's `AdviseSupervisory`, atomically swap the + /// inner `NmxClient`), then emit [`RecoveryEvent::Recovered`]. On + /// failure: emit [`RecoveryEvent::Failed`] with `will_retry`, + /// sleep `policy.delay`, retry. After exhausting attempts the + /// last error is returned and the existing `NmxClient` is left + /// in place. /// /// # Errors + /// - [`Error::Configuration`] with [`ConfigError::RecoveryNotConfigured`] + /// when no [`RebuildFactory`] has been installed. /// - [`Error::Configuration`] when `policy.max_attempts == 0` /// (per [`RecoveryPolicy::validate`]). /// - [`Error::Connection`] when the session is already shut down. + /// - [`Error::Connection`] / [`Error::Io`] from the rebuild path + /// after all attempts are exhausted. pub async fn recover_connection(&self, policy: RecoveryPolicy) -> Result<(), Error> { self.ensure_connected()?; policy.validate()?; - // Single-pass loop — the no-op "recovery" always succeeds, so - // we exit after attempt 1. The shape mirrors what F16's real - // loop will look like (with `RecoverConnectionCore` slotted in - // place of the immediate Recovered emission). - let _ = self - .inner - .recovery_tx - .send(Arc::new(RecoveryEvent::Started { attempt: 1 })); - // (No-op recovery body — F16 fills in.) - let _ = self - .inner - .recovery_tx - .send(Arc::new(RecoveryEvent::Recovered { attempt: 1 })); + // Snapshot the factory once — releases the inner lock so the + // recovery body can take the nmx mutex without deadlocking. + let factory = { + let lock = self.inner.rebuild_factory.lock().await; + lock.clone().ok_or(Error::Configuration( + ConfigError::RecoveryNotConfigured, + ))? + }; + + let mut last_error: Option = None; + for attempt in 1..=policy.max_attempts { + let _ = self + .inner + .recovery_tx + .send(Arc::new(RecoveryEvent::Started { attempt })); + match self.recover_connection_core(&factory).await { + Ok(()) => { + let _ = self + .inner + .recovery_tx + .send(Arc::new(RecoveryEvent::Recovered { attempt })); + return Ok(()); + } + Err(e) => { + let will_retry = attempt < policy.max_attempts; + // `Error` doesn't impl `Clone` (the io::Error source isn't + // cloneable), so capture a string copy for the bubbled-up + // last_error and hand the original to the broadcast event. + let bubbled = + Error::Connection(ConnectionError::TransportFailure { + detail: e.to_string(), + }); + let _ = self.inner.recovery_tx.send(Arc::new(RecoveryEvent::Failed { + attempt, + error: e, + will_retry, + })); + last_error = Some(bubbled); + if will_retry && !policy.delay.is_zero() { + tokio::time::sleep(policy.delay).await; + } + } + } + } + + Err(last_error.unwrap_or(Error::Connection( + ConnectionError::EngineNotRegistered, + ))) + } + + /// Single-attempt body of [`Self::recover_connection`], split out so + /// the retry loop is straightforward. Mirrors `RecoverConnectionCore` + /// (`MxNativeSession.cs:442-474`): + /// + /// 1. Build a replacement [`NmxClient`] via the supplied factory. + /// 2. Re-run `RegisterEngine2(callback_obj_ref)` so the rebuilt + /// server side knows about the same local callback exporter. + /// 3. Re-run `SetHeartbeatSendInterval` if configured. + /// 4. Walk the [`SessionInner::subscriptions`] registry and re-issue + /// `AdviseSupervisory` for every active subscription (each + /// correlation_id is preserved so the consumer's `Subscription` + /// handle keeps receiving on its existing broadcast filter). + /// 5. Atomically swap the inner mutex's `NmxClient` so the old one + /// drops at end-of-scope. + /// + /// The CallbackExporter itself is not torn down — the rebuilt + /// server uses the same OBJREF, so an exporter restart isn't + /// needed (and would require re-binding the local TCP listener, + /// which the .NET reference deliberately avoids). + async fn recover_connection_core(&self, factory: &RebuildFactory) -> Result<(), Error> { + // Step 1: rebuild the NMX client. + let mut replacement = factory().await.map_err(map_nmx)?; + + // Step 2: re-register the engine with the existing callback OBJREF. + let opts = &self.inner.options; + let hr = replacement + .register_engine_2( + opts.local_engine_id, + &opts.engine_name, + opts.partner_version, + &self.inner.callback_obj_ref, + ) + .await + .map_err(map_nmx)?; + if hr != 0 { + return Err(Error::Connection(ConnectionError::EngineNotRegistered)); + } + + // Step 3: optional heartbeat-interval setup (matches connect path). + if let Some(ticks) = opts.heartbeat_ticks_per_beat { + let hr = replacement + .set_heartbeat_send_interval(ticks, opts.heartbeat_max_missed_ticks) + .await + .map_err(map_nmx)?; + if hr != 0 { + return Err(Error::Configuration(ConfigError::InvalidArgument { + detail: format!( + "SetHeartbeatSendInterval returned HRESULT 0x{hr:08x} during recovery" + ), + })); + } + } + + // Step 4: replay every active subscription's AdviseSupervisory + // against the replacement transport. Snapshot under the lock, + // then drop the lock so subscribe()/un_advise() can race with + // recovery without deadlocking. The atomic swap below installs + // the replacement before we yield; after that, any new + // subscribe() call will see the registry+replacement pair. + let snapshot: Vec<([u8; 16], SubscriptionEntry)> = { + let registry = self.inner.subscriptions.lock().await; + registry + .iter() + .map(|(cid, entry)| (*cid, entry.clone())) + .collect() + }; + for (correlation_id, entry) in &snapshot { + let hr = replacement + .advise_supervisory( + opts.local_engine_id, + &entry.metadata, + *correlation_id, + opts.galaxy_id, + i32::from(opts.galaxy_id), + opts.source_platform_id, + ) + .await + .map_err(map_nmx)?; + ensure_hresult_ok(hr)?; + } + + // Step 5: atomic swap. The previous NmxClient drops at end of + // scope (closes its TCP transport). + let mut nmx = self.inner.nmx.lock().await; + *nmx = replacement; Ok(()) } @@ -870,10 +1071,22 @@ impl Session { // updates arriving immediately after the advise aren't lost. let inbound = Box::pin(BroadcastStream::new(self.inner.callback_tx.subscribe())); + let metadata_arc = Arc::new(metadata); + // F16: record the active subscription so recover_connection can + // replay AdviseSupervisory after a transport rebuild. Inserted + // AFTER the wire AdviseSupervisory succeeds — failed advises + // never enter the registry. + inner.subscriptions.lock().await.insert( + correlation_id, + SubscriptionEntry { + metadata: Arc::clone(&metadata_arc), + }, + ); + Ok(Subscription { correlation_id, reference: Arc::::from(reference), - metadata: Arc::new(metadata), + metadata: metadata_arc, inbound, pending: std::collections::VecDeque::new(), }) @@ -974,6 +1187,16 @@ impl Session { .await .map_err(map_nmx)?; ensure_hresult_ok(hr)?; + drop(nmx); + // F16: drop the subscription from the recovery registry too. + // We do this only on the success path — if UnAdvise itself + // failed, the server may still hold the supervisory record and + // a future recover_connection should re-issue the advise. + inner + .subscriptions + .lock() + .await + .remove(&subscription.correlation_id); Ok(()) } @@ -1334,6 +1557,9 @@ mod tests { router_handle: std::sync::Mutex::new(Some(router_handle)), recovery_tx, connected: std::sync::atomic::AtomicBool::new(true), + subscriptions: Mutex::new(HashMap::new()), + callback_obj_ref: Vec::new(), + rebuild_factory: Mutex::new(None), }), }) } @@ -1965,27 +2191,24 @@ mod tests { // ---- Session::recover_connection + recovery_events --------------- #[tokio::test] - async fn recover_connection_emits_started_then_recovered_for_default_policy() { + async fn recover_connection_without_factory_returns_recovery_not_configured() { use crate::RecoveryPolicy; + // F16: recover_connection now requires a RebuildFactory. With + // none installed (the default for `connect_test_session`), the + // call returns ConfigError::RecoveryNotConfigured. let (addr, handle) = unauthenticated_server(Vec::new()).await; let resolver: Arc = Arc::new(StaticResolver::new(&[])); let session = connect_test_session(addr, resolver).await.unwrap(); + assert!(!session.has_recovery_factory().await); - let mut rx = session.recovery_events(); - session + let err = session .recover_connection(RecoveryPolicy::default()) .await - .unwrap(); - - let first = rx.recv().await.unwrap(); - let second = rx.recv().await.unwrap(); - match (&*first, &*second) { - (RecoveryEvent::Started { attempt: a1 }, RecoveryEvent::Recovered { attempt: a2 }) => { - assert_eq!(*a1, 1); - assert_eq!(*a2, 1); - } - other => panic!("expected (Started, Recovered), got {other:?}"), - } + .unwrap_err(); + assert!(matches!( + err, + Error::Configuration(ConfigError::RecoveryNotConfigured) + )); handle.abort(); } @@ -2010,6 +2233,93 @@ mod tests { handle.abort(); } + #[tokio::test] + async fn recover_connection_with_always_failing_factory_exhausts_attempts() { + use crate::RecoveryPolicy; + // F16: when every attempt fails, recover_connection emits + // (Started, Failed){policy.max_attempts} and returns the last + // bubbled error. Pin the count + the will_retry flag. + let (addr, handle) = unauthenticated_server(Vec::new()).await; + let resolver: Arc = Arc::new(StaticResolver::new(&[])); + let session = connect_test_session(addr, resolver).await.unwrap(); + + let stub: crate::RebuildFactory = Arc::new(|| { + Box::pin(async { + Err(mxaccess_nmx::NmxClientError::Transport( + mxaccess_rpc::transport::TransportError::Io( + std::io::Error::other("synthetic rebuild failure"), + ), + )) + }) + }); + session.set_recovery_factory(stub).await; + assert!(session.has_recovery_factory().await); + + let mut rx = session.recovery_events(); + let policy = RecoveryPolicy { + max_attempts: 3, + delay: std::time::Duration::ZERO, + }; + let err = session.recover_connection(policy).await.unwrap_err(); + + // Expect: Started{1}, Failed{1, will_retry=true}, Started{2}, + // Failed{2, will_retry=true}, Started{3}, Failed{3, will_retry=false}. + let expected_events = 6; + let mut started = 0; + let mut failed = 0; + let mut last_will_retry = None; + for _ in 0..expected_events { + match &*rx.recv().await.unwrap() { + RecoveryEvent::Started { .. } => started += 1, + RecoveryEvent::Failed { + will_retry, .. + } => { + failed += 1; + last_will_retry = Some(*will_retry); + } + RecoveryEvent::Recovered { .. } => panic!("unexpected Recovered"), + } + } + assert_eq!(started, 3); + assert_eq!(failed, 3); + assert_eq!(last_will_retry, Some(false)); + + // Bubbled error wraps the final synthesised IO error. + match err { + Error::Connection(ConnectionError::TransportFailure { detail }) => { + assert!(detail.contains("synthetic rebuild failure")); + } + other => panic!("expected TransportFailure, got {other:?}"), + } + handle.abort(); + } + + #[tokio::test] + async fn subscribe_populates_registry_unsubscribe_clears_it() { + // F16: every successful subscribe() inserts into the + // SubscriptionEntry registry; unsubscribe() removes it. + // Recovery walks this registry to replay AdviseSupervisory. + let (addr, handle) = + unauthenticated_server(vec![(0, Vec::new()), (0, Vec::new())]).await; + let resolver: Arc = Arc::new(StaticResolver::new(&[( + "TestObj.TestInt", + sample_metadata(), + )])); + let session = connect_test_session(addr, resolver).await.unwrap(); + + // Pre-condition: empty registry. + assert_eq!(session.inner.subscriptions.lock().await.len(), 0); + + let sub = session.subscribe("TestObj.TestInt").await.unwrap(); + let cid = sub.correlation_id; + assert_eq!(session.inner.subscriptions.lock().await.len(), 1); + assert!(session.inner.subscriptions.lock().await.contains_key(&cid)); + + session.unsubscribe(sub).await.unwrap(); + assert_eq!(session.inner.subscriptions.lock().await.len(), 0); + handle.await.unwrap(); + } + #[tokio::test] async fn recover_connection_after_shutdown_returns_engine_not_registered() { use crate::RecoveryPolicy; @@ -2036,18 +2346,32 @@ mod tests { let resolver: Arc = Arc::new(StaticResolver::new(&[])); let session = connect_test_session(addr, resolver).await.unwrap(); + // F16: install a factory that always errors. recover_connection + // emits Started → Failed; both subscribers see the same + // Arc-cloned Started event before the call returns. + let stub: crate::RebuildFactory = Arc::new(|| { + Box::pin(async { + Err(mxaccess_nmx::NmxClientError::Transport( + mxaccess_rpc::transport::TransportError::Io( + std::io::Error::other("stub factory: rebuild always fails"), + ), + )) + }) + }); + session.set_recovery_factory(stub).await; + let mut rx_a = session.recovery_events(); let mut rx_b = session.recovery_events(); - session + let _ = session .recover_connection(RecoveryPolicy::default()) - .await - .unwrap(); + .await; - // Both subscribers receive the same Arc-cloned event. + // First event from each receiver is the same Started Arc. let a = rx_a.recv().await.unwrap(); let b = rx_b.recv().await.unwrap(); assert!(Arc::ptr_eq(&a, &b)); + assert!(matches!(*a, RecoveryEvent::Started { attempt: 1 })); handle.abort(); }