[F16] mxaccess: real Session::recover_connection (re-bind + re-advise)
rust / build / test / clippy / fmt (push) Has been cancelled
Closes F16. Replaces the wave-2 no-op recover_connection with the
full .NET-equivalent shape (MxNativeSession.cs:399-474). Three
pieces:
1. Subscription registry on SessionInner.
New subscriptions: Mutex<HashMap<[u8; 16], SubscriptionEntry>>
tracks every active advise. subscribe() inserts after a successful
AdviseSupervisory; unsubscribe() removes on the success path only
(failed UnAdvises stay registered so next recovery replays them).
The consumer's Subscription handle still holds the BroadcastStream;
the registry is purely for AdviseSupervisory replay.
2. Pluggable RebuildFactory.
New public typedef:
    pub type RebuildFactory = Arc<
        dyn Fn() -> Pin<Box<dyn Future<Output = Result<NmxClient,
                                                       NmxClientError>>
                            + Send>>
            + Send + Sync,
    >;
Installed via Session::set_recovery_factory(factory);
queryable via has_recovery_factory(). Kept separate from
connect_nmx / connect_nmx_auto so existing constructors stay
non-breaking; consumers opt in by calling the setter after
construction.
3. Real recover_connection + recover_connection_core.
recover_connection is the retry loop (mirrors cs:399-440): for
attempt in 1..=policy.max_attempts, emit RecoveryEvent::Started
→ call recover_connection_core → on Ok emit Recovered + return,
on Err emit Failed{will_retry, error}, sleep policy.delay, retry,
or bubble the last error.
recover_connection_core mirrors cs:442-474: rebuild NMX via the
factory → RegisterEngine2 with the saved callback_obj_ref → optional
SetHeartbeatSendInterval → snapshot the registry under the lock,
replay AdviseSupervisory(correlation_id) for each entry → atomically
swap *nmx_lock = replacement. The assignment drops the old NmxClient,
closing its TCP transport.
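A minimal sketch of the retry loop in item 3, under stated assumptions:
`Policy`, `Event`, and `recover` are hypothetical stand-ins, not the
crate's `RecoveryPolicy`, `RecoveryEvent`, or
`Session::recover_connection`. It is synchronous (thread sleep instead
of tokio) and records events in a `Vec` instead of broadcasting them:

```rust
use std::time::Duration;

/// Hypothetical stand-in for the crate's recovery policy.
#[derive(Debug, Clone, Copy)]
pub struct Policy {
    pub max_attempts: u32,
    pub delay: Duration,
}

/// Hypothetical stand-in for the crate's recovery events.
#[derive(Debug, PartialEq)]
pub enum Event {
    Started { attempt: u32 },
    Recovered { attempt: u32 },
    Failed { attempt: u32, error: String, will_retry: bool },
}

/// Runs `core` up to `policy.max_attempts` times. Each attempt records
/// `Started`, then either `Recovered` (and returns) or `Failed`.
/// The last error is returned once the attempts are exhausted.
pub fn recover<F>(policy: Policy, mut core: F, events: &mut Vec<Event>) -> Result<(), String>
where
    F: FnMut() -> Result<(), String>,
{
    let mut last_error = None;
    for attempt in 1..=policy.max_attempts {
        events.push(Event::Started { attempt });
        match core() {
            Ok(()) => {
                events.push(Event::Recovered { attempt });
                return Ok(());
            }
            Err(e) => {
                let will_retry = attempt < policy.max_attempts;
                events.push(Event::Failed {
                    attempt,
                    error: e.clone(),
                    will_retry,
                });
                last_error = Some(e);
                // Only sleep when another attempt follows.
                if will_retry && !policy.delay.is_zero() {
                    std::thread::sleep(policy.delay);
                }
            }
        }
    }
    // Reached only when every attempt failed (or max_attempts was 0).
    Err(last_error.unwrap_or_else(|| "no attempts were made".to_string()))
}
```

With `max_attempts: 3` and a `core` that always fails, this records six
events, the last being `Failed` with `will_retry == false`, which is the
shape the always-failing-factory test pins.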
Subscription correlation ids are preserved across the swap so the
consumer's Subscription stream continues to receive on its existing
broadcast filter. The CallbackExporter stays bound across recoveries
— no TCP listener re-bind.
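The registry-and-replay idea can be sketched as follows. This is an
illustration only: `Registry`, `Entry`, and `FakeClient` are
hypothetical names, `std::sync::Mutex` stands in for the async mutex
used in the crate, and the fake client just records what it is asked to
advise. The point it shows is that the snapshot is cloned under the
lock, the lock is released before any I/O, and each correlation id is
replayed unchanged:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

pub type CorrelationId = [u8; 16];

/// Hypothetical per-subscription state kept for replay.
#[derive(Debug, Clone, PartialEq)]
pub struct Entry {
    pub tag: String,
}

/// Hypothetical stand-in for a rebuilt client; records each advise.
#[derive(Debug, Default)]
pub struct FakeClient {
    pub advised: Vec<(CorrelationId, String)>,
}

impl FakeClient {
    pub fn advise(&mut self, id: CorrelationId, entry: &Entry) -> Result<(), String> {
        self.advised.push((id, entry.tag.clone()));
        Ok(())
    }
}

#[derive(Debug, Default)]
pub struct Registry {
    entries: Mutex<HashMap<CorrelationId, Entry>>,
}

impl Registry {
    pub fn new() -> Self {
        Self::default()
    }

    /// Call after the wire-level advise has succeeded.
    pub fn insert(&self, id: CorrelationId, entry: Entry) {
        self.entries.lock().unwrap().insert(id, entry);
    }

    /// Call only after the wire-level un-advise has succeeded.
    pub fn remove(&self, id: &CorrelationId) -> Option<Entry> {
        self.entries.lock().unwrap().remove(id)
    }

    pub fn len(&self) -> usize {
        self.entries.lock().unwrap().len()
    }

    /// Clone the entries under the lock, then release it.
    pub fn snapshot(&self) -> Vec<(CorrelationId, Entry)> {
        let guard = self.entries.lock().unwrap();
        guard.iter().map(|(id, e)| (*id, e.clone())).collect()
    }

    /// Re-issue every advise against `client`, keeping each id as-is.
    pub fn replay(&self, client: &mut FakeClient) -> Result<usize, String> {
        let snapshot = self.snapshot();
        for (id, entry) in &snapshot {
            client.advise(*id, entry)?;
        }
        Ok(snapshot.len())
    }
}
```

Because the ids handed to the replacement client are the ones already
held by the consumer, nothing on the consumer side has to change after
the swap.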
R15's "long-lived connection task" was listed as a hard prereq, but
the existing Mutex<NmxClient> already serialises concurrent ops
against the swap: recover_connection_core takes the inner mutex for
the final assignment, so concurrent ops simply wait. This is
functionally equivalent to the long-lived-task design.
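A small sketch of the swap-under-mutex pattern, assuming hypothetical
`Conn` and `Holder` types and `std::sync::Mutex` in place of the async
mutex. Assigning through the guard drops the previous value while the
lock is held, so any caller that takes the lock afterwards only ever
sees the replacement:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical connection; bumps a shared counter when dropped so the
/// drop is observable.
pub struct Conn {
    pub generation: u32,
    closed: Arc<AtomicUsize>,
}

impl Conn {
    pub fn new(generation: u32, closed: Arc<AtomicUsize>) -> Self {
        Conn { generation, closed }
    }
}

impl Drop for Conn {
    fn drop(&mut self) {
        self.closed.fetch_add(1, Ordering::SeqCst);
    }
}

pub struct Holder {
    conn: Mutex<Conn>,
}

impl Holder {
    pub fn new(conn: Conn) -> Self {
        Holder { conn: Mutex::new(conn) }
    }

    /// Replace the connection. The old `Conn` is dropped by the
    /// assignment itself, before the guard is released.
    pub fn swap(&self, replacement: Conn) {
        let mut guard = self.conn.lock().unwrap();
        *guard = replacement;
    }

    /// Any operation goes through the same lock, so it waits for an
    /// in-progress swap and then sees the new connection.
    pub fn generation(&self) -> u32 {
        self.conn.lock().unwrap().generation
    }
}
```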
New ConfigError::RecoveryNotConfigured returned when
recover_connection is called without a factory installed. New
public re-export: RebuildFactory.
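The opt-in factory slot and its "not configured" error might look
roughly like this. Everything here is an assumption made for
illustration: `Client`, `RecoverError`, `Factory`, `Slot`,
`constant_factory`, and the toy `block_on` executor are hypothetical,
and `std::sync::Mutex` replaces the async mutex. Only the shape of the
boxed-future alias follows the typedef above:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

#[derive(Debug)]
pub struct Client {
    pub id: u32,
}

#[derive(Debug, PartialEq)]
pub enum RecoverError {
    NotConfigured,
    Rebuild(String),
}

type BoxedRebuild = Pin<Box<dyn Future<Output = Result<Client, String>> + Send>>;

/// Same shape as the RebuildFactory alias, with stand-in types.
pub type Factory = Arc<dyn Fn() -> BoxedRebuild + Send + Sync>;

pub struct Slot {
    factory: Mutex<Option<Factory>>,
}

impl Slot {
    pub fn new() -> Self {
        Slot { factory: Mutex::new(None) }
    }

    pub fn set(&self, f: Factory) {
        *self.factory.lock().unwrap() = Some(f);
    }

    pub fn is_set(&self) -> bool {
        self.factory.lock().unwrap().is_some()
    }

    pub async fn rebuild(&self) -> Result<Client, RecoverError> {
        // Clone the Arc and release the lock before awaiting.
        let factory = self
            .factory
            .lock()
            .unwrap()
            .clone()
            .ok_or(RecoverError::NotConfigured)?;
        factory().await.map_err(RecoverError::Rebuild)
    }
}

/// Example factory that yields a fresh `Client` on every call.
pub fn constant_factory(id: u32) -> Factory {
    Arc::new(move || -> BoxedRebuild { Box::pin(async move { Ok(Client { id }) }) })
}

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor: polls in a loop. Adequate only for futures that
/// complete without waiting on external wake-ups.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        std::thread::yield_now();
    }
}
```

Keeping the factory in an `Option` behind a setter means constructors
need no new parameter; the cost is a runtime error instead of a
compile-time guarantee when recovery is requested without one.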
Tests (mxaccess 65 → 67):
- recover_connection_without_factory_returns_recovery_not_configured
- recover_connection_with_always_failing_factory_exhausts_attempts
(pins (Started, Failed)×3 + final will_retry=false + bubbled
TransportFailure)
- subscribe_populates_registry_unsubscribe_clears_it
- recovery_events_supports_multiple_subscribers (updated for the
new factory-required path)
connect_nmx_auto-side auto-population of the factory (capturing the
ntlm_factory + discovered (addr, service_ipid) so consumers don't
re-author the closure) is a future polish — not required to close
F16.
design/followups.md: F16 moved to Resolved.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@@ -38,7 +38,7 @@ pub use transport_asb::AsbTransport;

 pub use mxaccess_galaxy::{GalaxyTagMetadata, Resolver, ResolverError};
 pub use mxaccess_nmx::WriteValue;
-pub use session::Subscription;
+pub use session::{RebuildFactory, Subscription};

 /// Async session façade. Cheap clones share the inner state; drop of the last
 /// clone fires `UnregisterEngine` best-effort. For deterministic shutdown,
@@ -341,6 +341,11 @@ pub enum ConfigError {
     InvalidArgument { detail: String },
     #[error("galaxy resolver: {reason}")]
     Galaxy { reason: String },
+    /// `Session::recover_connection` was called without a
+    /// [`crate::RebuildFactory`] installed via
+    /// [`crate::Session::set_recovery_factory`]. F16.
+    #[error("recover_connection: no rebuild factory installed (call Session::set_recovery_factory)")]
+    RecoveryNotConfigured,
 }

 #[derive(Debug, thiserror::Error)]
@@ -39,6 +39,7 @@ use mxaccess_nmx::{NmxClient, NmxClientError, WriteValue};
 use mxaccess_rpc::guid::Guid;
 use mxaccess_rpc::ntlm::{NtlmClientContext, local_hostname};
 use mxaccess_rpc::transport::TransportError;
+use std::collections::HashMap;
 use std::net::SocketAddr;
 use std::pin::Pin;
 use std::task::{Context, Poll};
@@ -344,8 +345,53 @@ pub struct SessionInner {
     /// `false` after [`Session::shutdown`] has run successfully. Subsequent
     /// operations short-circuit with [`Error::Connection`].
     pub(crate) connected: std::sync::atomic::AtomicBool,
+    /// F16 — registry of currently-advised subscriptions, keyed by
+    /// 16-byte correlation id. `Session::recover_connection` walks
+    /// this to re-issue `AdviseSupervisory` after rebuilding the NMX
+    /// transport. Mirrors `MxNativeSession._subscriptions`
+    /// (`MxNativeSession.cs` field; the dictionary `RecoverConnectionCore`
+    /// at `cs:455-458` iterates).
+    pub(crate) subscriptions: Mutex<HashMap<[u8; 16], SubscriptionEntry>>,
+    /// F16 — saved OBJREF bytes pointing at our local `CallbackExporter`,
+    /// captured at session bring-up. Recovery re-issues
+    /// `RegisterEngine2(callback_obj_ref)` against the rebuilt NMX
+    /// transport so the same exporter keeps receiving callbacks across
+    /// the swap.
+    pub(crate) callback_obj_ref: Vec<u8>,
+    /// F16 — pluggable factory for rebuilding the NMX transport during
+    /// `recover_connection`. `None` until the consumer calls
+    /// [`Session::set_recovery_factory`]; with `None`,
+    /// `recover_connection` returns
+    /// [`Error::Configuration`] with `RecoveryNotConfigured`.
+    pub(crate) rebuild_factory: Mutex<Option<RebuildFactory>>,
 }

+/// Per-subscription state retained for [`Session::recover_connection`].
+/// The full `Subscription` handle stays with the consumer and continues
+/// to receive broadcasts across the swap; this struct just preserves
+/// the inputs `advise_supervisory` needs.
+#[derive(Debug, Clone)]
+pub(crate) struct SubscriptionEntry {
+    pub(crate) metadata: Arc<mxaccess_galaxy::GalaxyTagMetadata>,
+}
+
+/// F16 — pluggable factory that produces a fresh [`NmxClient`].
+/// Called by `recover_connection` once per attempt; the returned
+/// client should be ready to take a `RegisterEngine2` call (i.e.
+/// already connected + bound but **not** yet registered — `recover_connection`
+/// re-issues `RegisterEngine2` itself with the saved
+/// `callback_obj_ref`).
+pub type RebuildFactory = Arc<
+    dyn Fn() -> std::pin::Pin<
+            Box<
+                dyn std::future::Future<
+                        Output = Result<NmxClient, mxaccess_nmx::NmxClientError>,
+                    > + Send,
+            >,
+        > + Send
+        + Sync,
+>;
+
 impl std::fmt::Debug for SessionInner {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("SessionInner")
@@ -551,6 +597,9 @@ impl Session {
                 router_handle: std::sync::Mutex::new(Some(router_handle)),
                 recovery_tx,
                 connected: std::sync::atomic::AtomicBool::new(true),
+                subscriptions: Mutex::new(HashMap::new()),
+                callback_obj_ref,
+                rebuild_factory: Mutex::new(None),
             }),
         })
     }
@@ -572,40 +621,192 @@ impl Session {
         self.inner.recovery_tx.subscribe()
     }

-    /// Force a connection-recovery cycle. Mirrors
-    /// `MxNativeSession.RecoverConnectionAsync` (`cs:399-440`).
+    /// Install the [`RebuildFactory`] used by [`Self::recover_connection`]
+    /// to build a fresh [`NmxClient`] on each retry attempt. Without
+    /// a factory, `recover_connection` returns
+    /// [`ConfigError::RecoveryNotConfigured`].
     ///
-    /// Currently a **no-op recovery** — the API surface, policy
-    /// validation, and event-emission shape are wired but the actual
-    /// "tear down + re-bind + re-advise" loop is wave-3 work
-    /// (followup F16). Each attempt fires
-    /// [`RecoveryEvent::Started`] then immediately
-    /// [`RecoveryEvent::Recovered`] without doing real recovery work.
-    /// This lets downstream consumers wire `recovery_events()`
-    /// observers today; once F16 lands the events gain semantic
-    /// meaning without an API break.
+    /// Typical wiring: capture the same `(addr, service_ipid, ntlm_factory)`
+    /// (or `windows-com` activation closure) used by the original
+    /// `connect_nmx` / `connect_nmx_auto` call, then return the
+    /// resulting `NmxClient` from each invocation. Each call to the
+    /// factory MUST produce a fresh, bound `NmxClient` that has NOT
+    /// yet been registered — `recover_connection` re-runs
+    /// `RegisterEngine2` (with the saved callback OBJREF) and any
+    /// `SetHeartbeatSendInterval` itself.
+    pub async fn set_recovery_factory(&self, factory: RebuildFactory) {
+        let mut slot = self.inner.rebuild_factory.lock().await;
+        *slot = Some(factory);
+    }
+
+    /// `true` once a [`RebuildFactory`] has been installed via
+    /// [`Self::set_recovery_factory`].
+    pub async fn has_recovery_factory(&self) -> bool {
+        self.inner.rebuild_factory.lock().await.is_some()
+    }
+
+    /// Force a connection-recovery cycle. Mirrors
+    /// `MxNativeSession.RecoverConnectionAsync` (`cs:399-440`) +
+    /// `RecoverConnectionCore` (`cs:442-474`).
+    ///
+    /// Per attempt: emit [`RecoveryEvent::Started`], invoke
+    /// [`recover_connection_core`](Self::recover_connection_core) (rebuild
+    /// the NMX transport via the installed [`RebuildFactory`], re-run
+    /// `RegisterEngine2` with the saved callback OBJREF, replay every
+    /// active subscription's `AdviseSupervisory`, atomically swap the
+    /// inner `NmxClient`), then emit [`RecoveryEvent::Recovered`]. On
+    /// failure: emit [`RecoveryEvent::Failed`] with `will_retry`,
+    /// sleep `policy.delay`, retry. After exhausting attempts the
+    /// last error is returned and the existing `NmxClient` is left
+    /// in place.
     ///
     /// # Errors
+    /// - [`Error::Configuration`] with [`ConfigError::RecoveryNotConfigured`]
+    /// when no [`RebuildFactory`] has been installed.
     /// - [`Error::Configuration`] when `policy.max_attempts == 0`
     /// (per [`RecoveryPolicy::validate`]).
     /// - [`Error::Connection`] when the session is already shut down.
+    /// - [`Error::Connection`] / [`Error::Io`] from the rebuild path
+    /// after all attempts are exhausted.
     pub async fn recover_connection(&self, policy: RecoveryPolicy) -> Result<(), Error> {
         self.ensure_connected()?;
         policy.validate()?;

-        // Single-pass loop — the no-op "recovery" always succeeds, so
-        // we exit after attempt 1. The shape mirrors what F16's real
-        // loop will look like (with `RecoverConnectionCore` slotted in
-        // place of the immediate Recovered emission).
-        let _ = self
-            .inner
-            .recovery_tx
-            .send(Arc::new(RecoveryEvent::Started { attempt: 1 }));
-        // (No-op recovery body — F16 fills in.)
-        let _ = self
-            .inner
-            .recovery_tx
-            .send(Arc::new(RecoveryEvent::Recovered { attempt: 1 }));
+        // Snapshot the factory once — releases the inner lock so the
+        // recovery body can take the nmx mutex without deadlocking.
+        let factory = {
+            let lock = self.inner.rebuild_factory.lock().await;
+            lock.clone().ok_or(Error::Configuration(
+                ConfigError::RecoveryNotConfigured,
+            ))?
+        };
+
+        let mut last_error: Option<Error> = None;
+        for attempt in 1..=policy.max_attempts {
+            let _ = self
+                .inner
+                .recovery_tx
+                .send(Arc::new(RecoveryEvent::Started { attempt }));
+            match self.recover_connection_core(&factory).await {
+                Ok(()) => {
+                    let _ = self
+                        .inner
+                        .recovery_tx
+                        .send(Arc::new(RecoveryEvent::Recovered { attempt }));
+                    return Ok(());
+                }
+                Err(e) => {
+                    let will_retry = attempt < policy.max_attempts;
+                    // `Error` doesn't impl `Clone` (the io::Error source isn't
+                    // cloneable), so capture a string copy for the bubbled-up
+                    // last_error and hand the original to the broadcast event.
+                    let bubbled =
+                        Error::Connection(ConnectionError::TransportFailure {
+                            detail: e.to_string(),
+                        });
+                    let _ = self.inner.recovery_tx.send(Arc::new(RecoveryEvent::Failed {
+                        attempt,
+                        error: e,
+                        will_retry,
+                    }));
+                    last_error = Some(bubbled);
+                    if will_retry && !policy.delay.is_zero() {
+                        tokio::time::sleep(policy.delay).await;
+                    }
+                }
+            }
+        }
+
+        Err(last_error.unwrap_or(Error::Connection(
+            ConnectionError::EngineNotRegistered,
+        )))
+    }
+
+    /// Single-attempt body of [`Self::recover_connection`], split out so
+    /// the retry loop is straightforward. Mirrors `RecoverConnectionCore`
+    /// (`MxNativeSession.cs:442-474`):
+    ///
+    /// 1. Build a replacement [`NmxClient`] via the supplied factory.
+    /// 2. Re-run `RegisterEngine2(callback_obj_ref)` so the rebuilt
+    /// server side knows about the same local callback exporter.
+    /// 3. Re-run `SetHeartbeatSendInterval` if configured.
+    /// 4. Walk the [`SessionInner::subscriptions`] registry and re-issue
+    /// `AdviseSupervisory` for every active subscription (each
+    /// correlation_id is preserved so the consumer's `Subscription`
+    /// handle keeps receiving on its existing broadcast filter).
+    /// 5. Atomically swap the inner mutex's `NmxClient` so the old one
+    /// drops at end-of-scope.
+    ///
+    /// The CallbackExporter itself is not torn down — the rebuilt
+    /// server uses the same OBJREF, so an exporter restart isn't
+    /// needed (and would require re-binding the local TCP listener,
+    /// which the .NET reference deliberately avoids).
+    async fn recover_connection_core(&self, factory: &RebuildFactory) -> Result<(), Error> {
+        // Step 1: rebuild the NMX client.
+        let mut replacement = factory().await.map_err(map_nmx)?;
+
+        // Step 2: re-register the engine with the existing callback OBJREF.
+        let opts = &self.inner.options;
+        let hr = replacement
+            .register_engine_2(
+                opts.local_engine_id,
+                &opts.engine_name,
+                opts.partner_version,
+                &self.inner.callback_obj_ref,
+            )
+            .await
+            .map_err(map_nmx)?;
+        if hr != 0 {
+            return Err(Error::Connection(ConnectionError::EngineNotRegistered));
+        }
+
+        // Step 3: optional heartbeat-interval setup (matches connect path).
+        if let Some(ticks) = opts.heartbeat_ticks_per_beat {
+            let hr = replacement
+                .set_heartbeat_send_interval(ticks, opts.heartbeat_max_missed_ticks)
+                .await
+                .map_err(map_nmx)?;
+            if hr != 0 {
+                return Err(Error::Configuration(ConfigError::InvalidArgument {
+                    detail: format!(
+                        "SetHeartbeatSendInterval returned HRESULT 0x{hr:08x} during recovery"
+                    ),
+                }));
+            }
+        }
+
+        // Step 4: replay every active subscription's AdviseSupervisory
+        // against the replacement transport. Snapshot under the lock,
+        // then drop the lock so subscribe()/un_advise() can race with
+        // recovery without deadlocking. The atomic swap below installs
+        // the replacement before we yield; after that, any new
+        // subscribe() call will see the registry+replacement pair.
+        let snapshot: Vec<([u8; 16], SubscriptionEntry)> = {
+            let registry = self.inner.subscriptions.lock().await;
+            registry
+                .iter()
+                .map(|(cid, entry)| (*cid, entry.clone()))
+                .collect()
+        };
+        for (correlation_id, entry) in &snapshot {
+            let hr = replacement
+                .advise_supervisory(
+                    opts.local_engine_id,
+                    &entry.metadata,
+                    *correlation_id,
+                    opts.galaxy_id,
+                    i32::from(opts.galaxy_id),
+                    opts.source_platform_id,
+                )
+                .await
+                .map_err(map_nmx)?;
+            ensure_hresult_ok(hr)?;
+        }
+
+        // Step 5: atomic swap. The previous NmxClient drops at end of
+        // scope (closes its TCP transport).
+        let mut nmx = self.inner.nmx.lock().await;
+        *nmx = replacement;
         Ok(())
     }
@@ -870,10 +1071,22 @@ impl Session {
         // updates arriving immediately after the advise aren't lost.
         let inbound = Box::pin(BroadcastStream::new(self.inner.callback_tx.subscribe()));

+        let metadata_arc = Arc::new(metadata);
+        // F16: record the active subscription so recover_connection can
+        // replay AdviseSupervisory after a transport rebuild. Inserted
+        // AFTER the wire AdviseSupervisory succeeds — failed advises
+        // never enter the registry.
+        inner.subscriptions.lock().await.insert(
+            correlation_id,
+            SubscriptionEntry {
+                metadata: Arc::clone(&metadata_arc),
+            },
+        );
+
         Ok(Subscription {
             correlation_id,
             reference: Arc::<str>::from(reference),
-            metadata: Arc::new(metadata),
+            metadata: metadata_arc,
             inbound,
             pending: std::collections::VecDeque::new(),
         })
@@ -974,6 +1187,16 @@ impl Session {
             .await
             .map_err(map_nmx)?;
         ensure_hresult_ok(hr)?;
+        drop(nmx);
+        // F16: drop the subscription from the recovery registry too.
+        // We do this only on the success path — if UnAdvise itself
+        // failed, the server may still hold the supervisory record and
+        // a future recover_connection should re-issue the advise.
+        inner
+            .subscriptions
+            .lock()
+            .await
+            .remove(&subscription.correlation_id);
         Ok(())
     }
@@ -1334,6 +1557,9 @@ mod tests {
                 router_handle: std::sync::Mutex::new(Some(router_handle)),
                 recovery_tx,
                 connected: std::sync::atomic::AtomicBool::new(true),
+                subscriptions: Mutex::new(HashMap::new()),
+                callback_obj_ref: Vec::new(),
+                rebuild_factory: Mutex::new(None),
             }),
         })
     }
@@ -1965,27 +2191,24 @@ mod tests {
     // ---- Session::recover_connection + recovery_events ---------------

     #[tokio::test]
-    async fn recover_connection_emits_started_then_recovered_for_default_policy() {
+    async fn recover_connection_without_factory_returns_recovery_not_configured() {
         use crate::RecoveryPolicy;
+        // F16: recover_connection now requires a RebuildFactory. With
+        // none installed (the default for `connect_test_session`), the
+        // call returns ConfigError::RecoveryNotConfigured.
         let (addr, handle) = unauthenticated_server(Vec::new()).await;
         let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[]));
         let session = connect_test_session(addr, resolver).await.unwrap();
+        assert!(!session.has_recovery_factory().await);

-        let mut rx = session.recovery_events();
-        session
+        let err = session
             .recover_connection(RecoveryPolicy::default())
             .await
-            .unwrap();
-
-        let first = rx.recv().await.unwrap();
-        let second = rx.recv().await.unwrap();
-        match (&*first, &*second) {
-            (RecoveryEvent::Started { attempt: a1 }, RecoveryEvent::Recovered { attempt: a2 }) => {
-                assert_eq!(*a1, 1);
-                assert_eq!(*a2, 1);
-            }
-            other => panic!("expected (Started, Recovered), got {other:?}"),
-        }
+            .unwrap_err();
+        assert!(matches!(
+            err,
+            Error::Configuration(ConfigError::RecoveryNotConfigured)
+        ));
         handle.abort();
     }
@@ -2010,6 +2233,93 @@ mod tests {
         handle.abort();
     }

+    #[tokio::test]
+    async fn recover_connection_with_always_failing_factory_exhausts_attempts() {
+        use crate::RecoveryPolicy;
+        // F16: when every attempt fails, recover_connection emits
+        // (Started, Failed){policy.max_attempts} and returns the last
+        // bubbled error. Pin the count + the will_retry flag.
+        let (addr, handle) = unauthenticated_server(Vec::new()).await;
+        let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[]));
+        let session = connect_test_session(addr, resolver).await.unwrap();
+
+        let stub: crate::RebuildFactory = Arc::new(|| {
+            Box::pin(async {
+                Err(mxaccess_nmx::NmxClientError::Transport(
+                    mxaccess_rpc::transport::TransportError::Io(
+                        std::io::Error::other("synthetic rebuild failure"),
+                    ),
+                ))
+            })
+        });
+        session.set_recovery_factory(stub).await;
+        assert!(session.has_recovery_factory().await);
+
+        let mut rx = session.recovery_events();
+        let policy = RecoveryPolicy {
+            max_attempts: 3,
+            delay: std::time::Duration::ZERO,
+        };
+        let err = session.recover_connection(policy).await.unwrap_err();
+
+        // Expect: Started{1}, Failed{1, will_retry=true}, Started{2},
+        // Failed{2, will_retry=true}, Started{3}, Failed{3, will_retry=false}.
+        let expected_events = 6;
+        let mut started = 0;
+        let mut failed = 0;
+        let mut last_will_retry = None;
+        for _ in 0..expected_events {
+            match &*rx.recv().await.unwrap() {
+                RecoveryEvent::Started { .. } => started += 1,
+                RecoveryEvent::Failed {
+                    will_retry, ..
+                } => {
+                    failed += 1;
+                    last_will_retry = Some(*will_retry);
+                }
+                RecoveryEvent::Recovered { .. } => panic!("unexpected Recovered"),
+            }
+        }
+        assert_eq!(started, 3);
+        assert_eq!(failed, 3);
+        assert_eq!(last_will_retry, Some(false));
+
+        // Bubbled error wraps the final synthesised IO error.
+        match err {
+            Error::Connection(ConnectionError::TransportFailure { detail }) => {
+                assert!(detail.contains("synthetic rebuild failure"));
+            }
+            other => panic!("expected TransportFailure, got {other:?}"),
+        }
+        handle.abort();
+    }
+
+    #[tokio::test]
+    async fn subscribe_populates_registry_unsubscribe_clears_it() {
+        // F16: every successful subscribe() inserts into the
+        // SubscriptionEntry registry; unsubscribe() removes it.
+        // Recovery walks this registry to replay AdviseSupervisory.
+        let (addr, handle) =
+            unauthenticated_server(vec![(0, Vec::new()), (0, Vec::new())]).await;
+        let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[(
+            "TestObj.TestInt",
+            sample_metadata(),
+        )]));
+        let session = connect_test_session(addr, resolver).await.unwrap();
+
+        // Pre-condition: empty registry.
+        assert_eq!(session.inner.subscriptions.lock().await.len(), 0);
+
+        let sub = session.subscribe("TestObj.TestInt").await.unwrap();
+        let cid = sub.correlation_id;
+        assert_eq!(session.inner.subscriptions.lock().await.len(), 1);
+        assert!(session.inner.subscriptions.lock().await.contains_key(&cid));
+
+        session.unsubscribe(sub).await.unwrap();
+        assert_eq!(session.inner.subscriptions.lock().await.len(), 0);
+        handle.await.unwrap();
+    }
+
     #[tokio::test]
     async fn recover_connection_after_shutdown_returns_engine_not_registered() {
         use crate::RecoveryPolicy;
@@ -2036,18 +2346,32 @@ mod tests {
         let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[]));
         let session = connect_test_session(addr, resolver).await.unwrap();

+        // F16: install a factory that always errors. recover_connection
+        // emits Started → Failed; both subscribers see the same
+        // Arc-cloned Started event before the call returns.
+        let stub: crate::RebuildFactory = Arc::new(|| {
+            Box::pin(async {
+                Err(mxaccess_nmx::NmxClientError::Transport(
+                    mxaccess_rpc::transport::TransportError::Io(
+                        std::io::Error::other("stub factory: rebuild always fails"),
+                    ),
+                ))
+            })
+        });
+        session.set_recovery_factory(stub).await;
+
         let mut rx_a = session.recovery_events();
         let mut rx_b = session.recovery_events();

-        session
+        let _ = session
             .recover_connection(RecoveryPolicy::default())
-            .await
-            .unwrap();
+            .await;

-        // Both subscribers receive the same Arc-cloned event.
+        // First event from each receiver is the same Started Arc.
         let a = rx_a.recv().await.unwrap();
         let b = rx_b.recv().await.unwrap();
         assert!(Arc::ptr_eq(&a, &b));
+        assert!(matches!(*a, RecoveryEvent::Started { attempt: 1 }));
         handle.abort();
     }