diff --git a/design/followups.md b/design/followups.md index 1d686b3..7eb851c 100644 --- a/design/followups.md +++ b/design/followups.md @@ -54,6 +54,12 @@ move to `## Resolved` with a date + commit hash. **Why deferred:** `ManagedNmxService2Client.Create()` (`ManagedNmxService2Client.cs:30-64`) auto-discovers `(host, port, service_ipid)` by activating the `NmxSvc.NmxService` COM ProgID, marshalling the resulting `IUnknown` to an OBJREF, calling `IObjectExporter::ResolveOxid` against the OXID inside, then `IRemUnknown::RemQueryInterface` to get the `INmxService2` IPID. This requires `windows-rs` for `CoCreateInstance` / `CLSIDFromProgID` (the same gating dep as F6), plus the `ComObjRefProvider.MarshalIUnknownObjRef` port (also F6). **Resolves when:** F6 lands (windows-rs wired in + `ComObjRefProvider` port). At that point `NmxClient::create()` becomes ~30 lines that chain the existing primitives: COM activation → `MarshalIUnknownObjRef` → `ComObjRef::parse` → `object_exporter_client::resolve_oxid_with_managed_ntlm_packet_integrity` → `rem_unknown::encode_rem_query_interface_request` over a temporary transport → `NmxClient::connect`. +### F16 — Real `Session::recover_connection` reconnect loop (re-bind + re-advise) +**Severity:** P1 +**Source:** M4 wave 2/3 boundary, `crates/mxaccess/src/session.rs` +**Why deferred:** Wave-2 `Session::recover_connection` validates the policy and emits `RecoveryEvent::Started` + `RecoveryEvent::Recovered` on each call but does **NOT** actually tear down + re-establish the NMX transport / re-advise active subscriptions. The .NET reference's `RecoverConnectionCore` (`MxNativeSession.cs:442-474`) does all three: builds a replacement `ManagedNmxService2Client` via `CreateRegisteredService`, re-`Connect`s every `_publisherEndpoints` entry, re-`AdviseSupervisory`s every entry in `_subscriptions`, then atomically swaps the old service for the new one. Porting this to Rust requires (a) tracking the active subscriptions inside `SessionInner` (currently they're owned by the consumer's `Subscription` handles, with no central registry); (b) the long-lived connection task per R15 in `design/70-risks-and-open-questions.md` so swap-in-place is safe under concurrent operations; (c) a way to re-create the `CallbackExporter` (or keep the existing one bound while the underlying transport is replaced — needs design work). +**Resolves when:** R15's long-lived connection task lands and `SessionInner` gains a subscription registry. At that point the recover loop becomes ~50 lines: for `attempt in 1..=max_attempts`, emit Started → drop+rebuild NmxClient → `register_engine_2` with the existing OBJREF → re-advise every registered correlation_id → emit Recovered (or Failed + sleep delay + continue, mirroring the `cs:407-440` shape exactly). + ### F14 — `tiberius`-backed SQL implementation of `Resolver` + `UserResolver` **Severity:** P2 **Source:** M3 stream A, `crates/mxaccess-galaxy/src/sql.rs` (constants present, no client wiring yet) diff --git a/rust/crates/mxaccess/src/lib.rs b/rust/crates/mxaccess/src/lib.rs index bc62c69..c779c5f 100644 --- a/rust/crates/mxaccess/src/lib.rs +++ b/rust/crates/mxaccess/src/lib.rs @@ -428,13 +428,6 @@ impl Session { }) } - pub async fn recover_connection(&self, _policy: RecoveryPolicy) -> Result<(), Error> { - Err(Error::Unsupported { - operation: Cow::Borrowed("Session::recover_connection"), - transport: TransportKind::Nmx, - }) - } - /// Orderly shutdown — flushes `UnAdvise` for every live subscription, /// then `UnregisterEngine`. Recommended exit path for production code. pub async fn shutdown(self, _timeout: Duration) -> Result<(), Error> { diff --git a/rust/crates/mxaccess/src/session.rs b/rust/crates/mxaccess/src/session.rs index 9a98a9e..38ab907 100644 --- a/rust/crates/mxaccess/src/session.rs +++ b/rust/crates/mxaccess/src/session.rs @@ -46,9 +46,18 @@ use tokio::sync::{Mutex, broadcast}; use tokio::task::JoinHandle; use tokio_stream::wrappers::BroadcastStream; -use crate::DataChange; +use crate::{DataChange, RecoveryEvent}; use futures_util::Stream; +/// Capacity of the broadcast channel that fans out +/// [`RecoveryEvent`]s to consumers via [`Session::recovery_events`]. +/// +/// Recovery events are bursty (one Started + one Recovered/Failed per +/// attempt) but rare (only fire when `recover_connection` runs). +/// Picked to absorb a multi-attempt sequence without dropping for a +/// briefly slow consumer. +const RECOVERY_BROADCAST_CAPACITY: usize = 64; + use crate::{ ConfigError, ConnectionError, Error, RecoveryPolicy, SecurityContext, Session, SessionOptions, }; @@ -328,6 +337,10 @@ pub struct SessionInner { /// `NmxSubscriptionMessage`s onto `callback_tx`. `None` after /// `shutdown_nmx` joins on it. pub(crate) router_handle: std::sync::Mutex>>, + /// Broadcast channel for `RecoveryEvent`s. Wrapped in `Arc` so + /// `RecoveryEvent` (which contains `Error`, not `Clone`) can be + /// cheaply cloned across receivers. + pub(crate) recovery_tx: broadcast::Sender>, /// `false` after [`Session::shutdown`] has run successfully. Subsequent /// operations short-circuit with [`Error::Connection`]. pub(crate) connected: std::sync::atomic::AtomicBool, @@ -468,6 +481,8 @@ impl Session { } } + let (recovery_tx, _) = broadcast::channel(RECOVERY_BROADCAST_CAPACITY); + Ok(Self { inner: Arc::new(SessionInner { options, @@ -476,11 +491,66 @@ impl Session { callback_exporter: Mutex::new(Some(exporter)), callback_tx, router_handle: std::sync::Mutex::new(Some(router_handle)), + recovery_tx, connected: std::sync::atomic::AtomicBool::new(true), }), }) } + /// Subscribe to recovery events. + /// + /// Returns a [`broadcast::Receiver`] that yields one + /// [`RecoveryEvent`] per attempt + completion-or-failure of any + /// future [`Session::recover_connection`] call. + /// + /// The wave-2 implementation of `recover_connection` is currently a + /// no-op (validates the policy + emits Started/Recovered events + /// only); the real reconnect-and-readvise loop is wave-3 work + /// tracked as F16. Consumers can wire this up today and start + /// observing the event shape; events will gain semantic meaning + /// when the real loop lands. + #[must_use] + pub fn recovery_events(&self) -> broadcast::Receiver> { + self.inner.recovery_tx.subscribe() + } + + /// Force a connection-recovery cycle. Mirrors + /// `MxNativeSession.RecoverConnectionAsync` (`cs:399-440`). + /// + /// Currently a **no-op recovery** — the API surface, policy + /// validation, and event-emission shape are wired but the actual + /// "tear down + re-bind + re-advise" loop is wave-3 work + /// (followup F16). Each attempt fires + /// [`RecoveryEvent::Started`] then immediately + /// [`RecoveryEvent::Recovered`] without doing real recovery work. + /// This lets downstream consumers wire `recovery_events()` + /// observers today; once F16 lands the events gain semantic + /// meaning without an API break. + /// + /// # Errors + /// - [`Error::Configuration`] when `policy.max_attempts == 0` + /// (per [`RecoveryPolicy::validate`]). + /// - [`Error::Connection`] when the session is already shut down. + pub async fn recover_connection(&self, policy: RecoveryPolicy) -> Result<(), Error> { + self.ensure_connected()?; + policy.validate()?; + + // Single-pass loop — the no-op "recovery" always succeeds, so + // we exit after attempt 1. The shape mirrors what F16's real + // loop will look like (with `RecoverConnectionCore` slotted in + // place of the immediate Recovered emission). + let _ = self + .inner + .recovery_tx + .send(Arc::new(RecoveryEvent::Started { attempt: 1 })); + // (No-op recovery body — F16 fills in.) + let _ = self + .inner + .recovery_tx + .send(Arc::new(RecoveryEvent::Recovered { attempt: 1 })); + Ok(()) + } + /// Subscribe to the raw stream of parsed callback messages. /// Returns a [`tokio::sync::broadcast::Receiver`] that yields every /// `NmxSubscriptionMessage` the router decoded (both `0x32` @@ -1194,6 +1264,7 @@ mod tests { .unwrap(); let (callback_tx, _) = broadcast::channel(CALLBACK_BROADCAST_CAPACITY); let router_handle = tokio::spawn(callback_router(callback_events, callback_tx.clone())); + let (recovery_tx, _) = broadcast::channel(RECOVERY_BROADCAST_CAPACITY); Ok(Session { inner: Arc::new(SessionInner { @@ -1203,6 +1274,7 @@ mod tests { callback_exporter: Mutex::new(Some(exporter)), callback_tx, router_handle: std::sync::Mutex::new(Some(router_handle)), + recovery_tx, connected: std::sync::atomic::AtomicBool::new(true), }), }) @@ -1832,6 +1904,95 @@ mod tests { handle.abort(); } + // ---- Session::recover_connection + recovery_events --------------- + + #[tokio::test] + async fn recover_connection_emits_started_then_recovered_for_default_policy() { + use crate::RecoveryPolicy; + let (addr, handle) = unauthenticated_server(Vec::new()).await; + let resolver: Arc = Arc::new(StaticResolver::new(&[])); + let session = connect_test_session(addr, resolver).await.unwrap(); + + let mut rx = session.recovery_events(); + session + .recover_connection(RecoveryPolicy::default()) + .await + .unwrap(); + + let first = rx.recv().await.unwrap(); + let second = rx.recv().await.unwrap(); + match (&*first, &*second) { + (RecoveryEvent::Started { attempt: a1 }, RecoveryEvent::Recovered { attempt: a2 }) => { + assert_eq!(*a1, 1); + assert_eq!(*a2, 1); + } + other => panic!("expected (Started, Recovered), got {other:?}"), + } + handle.abort(); + } + + #[tokio::test] + async fn recover_connection_rejects_zero_max_attempts() { + use crate::RecoveryPolicy; + let (addr, handle) = unauthenticated_server(Vec::new()).await; + let resolver: Arc = Arc::new(StaticResolver::new(&[])); + let session = connect_test_session(addr, resolver).await.unwrap(); + + let bad = RecoveryPolicy { + max_attempts: 0, + delay: std::time::Duration::ZERO, + }; + let err = session.recover_connection(bad).await.unwrap_err(); + match err { + Error::Configuration(ConfigError::InvalidArgument { detail }) => { + assert!(detail.contains("max_attempts must be at least 1")); + } + other => panic!("expected InvalidArgument, got {other:?}"), + } + handle.abort(); + } + + #[tokio::test] + async fn recover_connection_after_shutdown_returns_engine_not_registered() { + use crate::RecoveryPolicy; + let (addr, handle) = unauthenticated_server(vec![(0, Vec::new())]).await; + let resolver: Arc = Arc::new(StaticResolver::new(&[])); + let session = connect_test_session(addr, resolver).await.unwrap(); + let cloned = session.clone(); + session.shutdown_nmx().await.unwrap(); + let err = cloned + .recover_connection(RecoveryPolicy::default()) + .await + .unwrap_err(); + assert!(matches!( + err, + Error::Connection(ConnectionError::EngineNotRegistered) + )); + handle.await.unwrap(); + } + + #[tokio::test] + async fn recovery_events_supports_multiple_subscribers() { + use crate::RecoveryPolicy; + let (addr, handle) = unauthenticated_server(Vec::new()).await; + let resolver: Arc = Arc::new(StaticResolver::new(&[])); + let session = connect_test_session(addr, resolver).await.unwrap(); + + let mut rx_a = session.recovery_events(); + let mut rx_b = session.recovery_events(); + + session + .recover_connection(RecoveryPolicy::default()) + .await + .unwrap(); + + // Both subscribers receive the same Arc-cloned event. + let a = rx_a.recv().await.unwrap(); + let b = rx_b.recv().await.unwrap(); + assert!(Arc::ptr_eq(&a, &b)); + handle.abort(); + } + // ---- Session::read (read-as-subscribe) ---------------------------- #[tokio::test]