[M4] mxaccess: Session::recover_connection + RecoveryEvent broadcast

Wires the recovery API surface and event channel. Recovery is
currently a no-op (validates policy + emits Started/Recovered
events); the real teardown + re-bind + re-advise loop is wave-3
work tracked as F16.

New
- Session::recover_connection(policy) — port of
  MxNativeSession.RecoverConnectionAsync (cs:399-440). Validates
  policy.max_attempts >= 1 (mirrors cs:33-36 via
  RecoveryPolicy::validate). Emits RecoveryEvent::Started + Recovered
  through the broadcast channel. Returns Ok(()) immediately — actual
  reconnect work is F16.
- Session::recovery_events() -> broadcast::Receiver<Arc<RecoveryEvent>>
  — typed observable for consumers that want to wire monitoring or
  state-machine handling. Same Arc-broadcast pattern as
  Session::callbacks(). Multi-subscriber safe (Arc::ptr_eq verified
  in tests).
- SessionInner.recovery_tx: broadcast::Sender<Arc<RecoveryEvent>>
  initialized in connect_nmx + connect_test_session.

Removed lib.rs stub (was Err(Unsupported)).

design/followups.md: F16 added (P1) covering the actual reconnect
loop. Resolves when R15's long-lived connection task lands and
SessionInner gains a subscription registry — at that point the
recover loop becomes ~50 lines slotting RecoverConnectionCore-style
work between the Started and Recovered events.

Tests (4 new in mxaccess; total 48)
- recover_connection emits Started + Recovered for the default
  single-attempt policy.
- recover_connection rejects max_attempts == 0 with InvalidArgument.
- recover_connection after shutdown returns EngineNotRegistered.
- recovery_events supports multiple subscribers (Arc::ptr_eq
  verifies the same allocation reaches both).

Test count delta: 520 -> 524 (+4). All four DoD gates green.
Open followups: 9 -> 10 (added F16).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-05 09:59:25 -04:00
parent 2dc091d0be
commit 4863c6dc1f
3 changed files with 168 additions and 8 deletions
-7
View File
@@ -428,13 +428,6 @@ impl Session {
})
}
pub async fn recover_connection(&self, _policy: RecoveryPolicy) -> Result<(), Error> {
Err(Error::Unsupported {
operation: Cow::Borrowed("Session::recover_connection"),
transport: TransportKind::Nmx,
})
}
/// Orderly shutdown — flushes `UnAdvise` for every live subscription,
/// then `UnregisterEngine`. Recommended exit path for production code.
pub async fn shutdown(self, _timeout: Duration) -> Result<(), Error> {
+162 -1
View File
@@ -46,9 +46,18 @@ use tokio::sync::{Mutex, broadcast};
use tokio::task::JoinHandle;
use tokio_stream::wrappers::BroadcastStream;
use crate::DataChange;
use crate::{DataChange, RecoveryEvent};
use futures_util::Stream;
/// Capacity of the broadcast channel that fans out
/// [`RecoveryEvent`]s to consumers via [`Session::recovery_events`].
///
/// Recovery events are bursty (one Started + one Recovered/Failed per
/// attempt) but rare (only fire when `recover_connection` runs).
/// Picked to absorb a multi-attempt sequence without dropping for a
/// briefly slow consumer.
const RECOVERY_BROADCAST_CAPACITY: usize = 64;
use crate::{
ConfigError, ConnectionError, Error, RecoveryPolicy, SecurityContext, Session, SessionOptions,
};
@@ -328,6 +337,10 @@ pub struct SessionInner {
/// `NmxSubscriptionMessage`s onto `callback_tx`. `None` after
/// `shutdown_nmx` joins on it.
pub(crate) router_handle: std::sync::Mutex<Option<JoinHandle<()>>>,
/// Broadcast channel for `RecoveryEvent`s. Wrapped in `Arc` so
/// `RecoveryEvent` (which contains `Error`, not `Clone`) can be
/// cheaply cloned across receivers.
pub(crate) recovery_tx: broadcast::Sender<Arc<RecoveryEvent>>,
/// `false` after [`Session::shutdown`] has run successfully. Subsequent
/// operations short-circuit with [`Error::Connection`].
pub(crate) connected: std::sync::atomic::AtomicBool,
@@ -468,6 +481,8 @@ impl Session {
}
}
let (recovery_tx, _) = broadcast::channel(RECOVERY_BROADCAST_CAPACITY);
Ok(Self {
inner: Arc::new(SessionInner {
options,
@@ -476,11 +491,66 @@ impl Session {
callback_exporter: Mutex::new(Some(exporter)),
callback_tx,
router_handle: std::sync::Mutex::new(Some(router_handle)),
recovery_tx,
connected: std::sync::atomic::AtomicBool::new(true),
}),
})
}
/// Subscribe to recovery events.
///
/// Returns a [`broadcast::Receiver`] that yields one
/// [`RecoveryEvent`] per attempt + completion-or-failure of any
/// future [`Session::recover_connection`] call.
///
/// The wave-2 implementation of `recover_connection` is currently a
/// no-op (validates the policy + emits Started/Recovered events
/// only); the real reconnect-and-readvise loop is wave-3 work
/// tracked as F16. Consumers can wire this up today and start
/// observing the event shape; events will gain semantic meaning
/// when the real loop lands.
#[must_use]
pub fn recovery_events(&self) -> broadcast::Receiver<Arc<RecoveryEvent>> {
self.inner.recovery_tx.subscribe()
}
/// Force a connection-recovery cycle. Mirrors
/// `MxNativeSession.RecoverConnectionAsync` (`cs:399-440`).
///
/// Currently a **no-op recovery** — the API surface, policy
/// validation, and event-emission shape are wired but the actual
/// "tear down + re-bind + re-advise" loop is wave-3 work
/// (followup F16). Each attempt fires
/// [`RecoveryEvent::Started`] then immediately
/// [`RecoveryEvent::Recovered`] without doing real recovery work.
/// This lets downstream consumers wire `recovery_events()`
/// observers today; once F16 lands the events gain semantic
/// meaning without an API break.
///
/// # Errors
/// - [`Error::Configuration`] when `policy.max_attempts == 0`
/// (per [`RecoveryPolicy::validate`]).
/// - [`Error::Connection`] when the session is already shut down.
pub async fn recover_connection(&self, policy: RecoveryPolicy) -> Result<(), Error> {
self.ensure_connected()?;
policy.validate()?;
// Single-pass loop — the no-op "recovery" always succeeds, so
// we exit after attempt 1. The shape mirrors what F16's real
// loop will look like (with `RecoverConnectionCore` slotted in
// place of the immediate Recovered emission).
let _ = self
.inner
.recovery_tx
.send(Arc::new(RecoveryEvent::Started { attempt: 1 }));
// (No-op recovery body — F16 fills in.)
let _ = self
.inner
.recovery_tx
.send(Arc::new(RecoveryEvent::Recovered { attempt: 1 }));
Ok(())
}
/// Subscribe to the raw stream of parsed callback messages.
/// Returns a [`tokio::sync::broadcast::Receiver`] that yields every
/// `NmxSubscriptionMessage` the router decoded (both `0x32`
@@ -1194,6 +1264,7 @@ mod tests {
.unwrap();
let (callback_tx, _) = broadcast::channel(CALLBACK_BROADCAST_CAPACITY);
let router_handle = tokio::spawn(callback_router(callback_events, callback_tx.clone()));
let (recovery_tx, _) = broadcast::channel(RECOVERY_BROADCAST_CAPACITY);
Ok(Session {
inner: Arc::new(SessionInner {
@@ -1203,6 +1274,7 @@ mod tests {
callback_exporter: Mutex::new(Some(exporter)),
callback_tx,
router_handle: std::sync::Mutex::new(Some(router_handle)),
recovery_tx,
connected: std::sync::atomic::AtomicBool::new(true),
}),
})
@@ -1832,6 +1904,95 @@ mod tests {
handle.abort();
}
// ---- Session::recover_connection + recovery_events ---------------
#[tokio::test]
async fn recover_connection_emits_started_then_recovered_for_default_policy() {
use crate::RecoveryPolicy;
let (addr, handle) = unauthenticated_server(Vec::new()).await;
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[]));
let session = connect_test_session(addr, resolver).await.unwrap();
let mut rx = session.recovery_events();
session
.recover_connection(RecoveryPolicy::default())
.await
.unwrap();
let first = rx.recv().await.unwrap();
let second = rx.recv().await.unwrap();
match (&*first, &*second) {
(RecoveryEvent::Started { attempt: a1 }, RecoveryEvent::Recovered { attempt: a2 }) => {
assert_eq!(*a1, 1);
assert_eq!(*a2, 1);
}
other => panic!("expected (Started, Recovered), got {other:?}"),
}
handle.abort();
}
#[tokio::test]
async fn recover_connection_rejects_zero_max_attempts() {
use crate::RecoveryPolicy;
let (addr, handle) = unauthenticated_server(Vec::new()).await;
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[]));
let session = connect_test_session(addr, resolver).await.unwrap();
let bad = RecoveryPolicy {
max_attempts: 0,
delay: std::time::Duration::ZERO,
};
let err = session.recover_connection(bad).await.unwrap_err();
match err {
Error::Configuration(ConfigError::InvalidArgument { detail }) => {
assert!(detail.contains("max_attempts must be at least 1"));
}
other => panic!("expected InvalidArgument, got {other:?}"),
}
handle.abort();
}
#[tokio::test]
async fn recover_connection_after_shutdown_returns_engine_not_registered() {
use crate::RecoveryPolicy;
let (addr, handle) = unauthenticated_server(vec![(0, Vec::new())]).await;
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[]));
let session = connect_test_session(addr, resolver).await.unwrap();
let cloned = session.clone();
session.shutdown_nmx().await.unwrap();
let err = cloned
.recover_connection(RecoveryPolicy::default())
.await
.unwrap_err();
assert!(matches!(
err,
Error::Connection(ConnectionError::EngineNotRegistered)
));
handle.await.unwrap();
}
#[tokio::test]
async fn recovery_events_supports_multiple_subscribers() {
use crate::RecoveryPolicy;
let (addr, handle) = unauthenticated_server(Vec::new()).await;
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[]));
let session = connect_test_session(addr, resolver).await.unwrap();
let mut rx_a = session.recovery_events();
let mut rx_b = session.recovery_events();
session
.recover_connection(RecoveryPolicy::default())
.await
.unwrap();
// Both subscribers receive the same Arc-cloned event.
let a = rx_a.recv().await.unwrap();
let b = rx_b.recv().await.unwrap();
assert!(Arc::ptr_eq(&a, &b));
handle.abort();
}
// ---- Session::read (read-as-subscribe) ----------------------------
#[tokio::test]