diff --git a/rust/crates/mxaccess/src/session.rs b/rust/crates/mxaccess/src/session.rs index 7c35e8e..3ee8f95 100644 --- a/rust/crates/mxaccess/src/session.rs +++ b/rust/crates/mxaccess/src/session.rs @@ -372,10 +372,71 @@ pub struct SessionInner { /// Per-subscription state retained for [`Session::recover_connection`]. /// The full `Subscription` handle stays with the consumer and continues /// to receive broadcasts across the swap; this struct just preserves -/// the inputs `advise_supervisory` needs. +/// the inputs the recovery loop needs to reissue the original advise. +/// +/// `mode` discriminates the original advise shape (plain +/// `AdviseSupervisory` vs buffered `RegisterReference` with the +/// `.property(buffer)` suffix) so the recovery branch can re-issue +/// the matching wire op — see F45 in `design/followups.md` for the +/// motivation. Mirrors `MxNativeSubscription.IsBuffered` +/// (`MxNativeSession.cs:59`) + the `ReAdviseSubscription` branch +/// (`MxNativeSession.cs:538-569`). #[derive(Debug, Clone)] pub(crate) struct SubscriptionEntry { pub(crate) metadata: Arc, + pub(crate) mode: SubscriptionMode, +} + +/// Discriminator for [`SubscriptionEntry`] — captures the original +/// advise shape so the recovery loop re-issues the same wire op. +/// +/// - [`SubscriptionMode::Plain`] entries replay via +/// `INmxService2::AdviseSupervisory` (matches the original +/// `Session::subscribe` path). +/// - [`SubscriptionMode::Buffered`] entries replay via +/// `INmxService2::RegisterReference` with the `.property(buffer)`- +/// suffixed item definition + the saved correlation id + +/// `subscribe = true` (matches the original +/// `Session::subscribe_buffered` path; mirrors +/// `MxNativeSession.ReAdviseSubscription` `cs:538-558`). +#[derive(Debug, Clone)] +pub(crate) enum SubscriptionMode { + /// Plain `AdviseSupervisory`-issued subscription. Recovery replays + /// it via `advise_supervisory`. 
+ Plain, + /// Buffered `RegisterReference`-issued subscription. Recovery + /// rebuilds the original `NmxReferenceRegistrationMessage` and + /// dispatches `register_reference` so the server-side buffered + /// registration survives the transport swap. + /// + /// The fields preserve the inputs `subscribe_buffered_nmx` used: + /// the un-suffixed `item_definition` (re-suffixed via + /// [`NmxReferenceRegistrationMessage::to_buffered_item_definition`] + /// on replay), the `item_context` (currently always empty — + /// reserved for the compat-server F35 split-context form), the + /// `item_handle` (currently always 0), and the rounded-up cadence + /// in milliseconds (informational; native MXAccess does not transmit + /// it on the wire — see capture 082). + Buffered { + /// Cadence in milliseconds, already rounded up to the nearest + /// 100 ms via [`crate::BufferedOptions::rounded_update_interval_ms`]. + /// Carried through recovery so a future SetBufferedUpdateInterval + /// transmission could be wired without losing the original + /// cadence. + rounded_interval_ms: u32, + /// Un-suffixed item definition as supplied to + /// [`Session::subscribe_buffered`]. The `.property(buffer)` + /// suffix is re-applied on replay via + /// [`NmxReferenceRegistrationMessage::to_buffered_item_definition`]. + item_definition: String, + /// Item context (compat-server split form). Empty when + /// `subscribe_buffered` is called directly with a single + /// `reference` argument. + item_context: String, + /// Item handle (LMX-side identifier). 0 when the compat-server + /// layer has not assigned one. + item_handle: i32, + }, } /// F16 — pluggable factory that produces a fresh [`NmxClient`]. @@ -742,9 +803,14 @@ impl Session { /// server side knows about the same local callback exporter. /// 3. Re-run `SetHeartbeatSendInterval` if configured. /// 4. 
Walk the [`SessionInner::subscriptions`] registry and re-issue - /// `AdviseSupervisory` for every active subscription (each + /// the matching wire op for every active subscription. Plain + /// entries replay via `AdviseSupervisory`; buffered entries + /// (F45) replay via `RegisterReference` with the + /// `.property(buffer)` suffix + the saved correlation id + + /// `subscribe = true` — mirroring + /// `MxNativeSession.ReAdviseSubscription` (`cs:538-569`). Each /// correlation_id is preserved so the consumer's `Subscription` - /// handle keeps receiving on its existing broadcast filter). + /// handle keeps receiving on its existing broadcast filter. /// 5. Atomically swap the inner mutex's `NmxClient` so the old one /// drops at end-of-scope. /// @@ -786,12 +852,19 @@ impl Session { } } - // Step 4: replay every active subscription's AdviseSupervisory - // against the replacement transport. Snapshot under the lock, - // then drop the lock so subscribe()/un_advise() can race with - // recovery without deadlocking. The atomic swap below installs - // the replacement before we yield; after that, any new - // subscribe() call will see the registry+replacement pair. + // Step 4: replay every active subscription against the + // replacement transport. Plain entries → `AdviseSupervisory`; + // buffered entries (F45) → `RegisterReference` with the + // `.property(buffer)` suffix + saved correlation id + + // `subscribe = true`. Mirrors + // `MxNativeSession.ReAdviseSubscription` (`cs:538-569`) which + // branches on `subscription.IsBuffered`. + // + // Snapshot under the lock, then drop it so subscribe() / + // un_advise() can race with recovery without deadlocking. The + // atomic swap below installs the replacement before we yield; + // after that, any new subscribe() call will see the + // registry+replacement pair. 
let snapshot: Vec<([u8; 16], SubscriptionEntry)> = { let registry = self.inner.subscriptions.lock().await; registry @@ -800,18 +873,68 @@ impl Session { .collect() }; for (correlation_id, entry) in &snapshot { - let hr = replacement - .advise_supervisory( - opts.local_engine_id, - &entry.metadata, - *correlation_id, - opts.galaxy_id, - i32::from(opts.galaxy_id), - opts.source_platform_id, - ) - .await - .map_err(map_nmx)?; - ensure_hresult_ok(hr)?; + match &entry.mode { + SubscriptionMode::Plain => { + let hr = replacement + .advise_supervisory( + opts.local_engine_id, + &entry.metadata, + *correlation_id, + opts.galaxy_id, + i32::from(opts.galaxy_id), + opts.source_platform_id, + ) + .await + .map_err(map_nmx)?; + ensure_hresult_ok(hr)?; + } + SubscriptionMode::Buffered { + rounded_interval_ms: _rounded_interval_ms, + item_definition, + item_context, + item_handle, + } => { + // F45: rebuild the original buffered registration + // body. The codec helper re-applies the + // `.property(buffer)` suffix idempotently — passing + // an already-suffixed name returns it unchanged + // (verified by + // `request_to_buffered_item_definition_idempotent_case_insensitive` + // in `mxaccess-codec`). 
+ let buffered_def = + NmxReferenceRegistrationMessage::to_buffered_item_definition( + item_definition, + ) + .map_err(|e| { + Error::Configuration(ConfigError::InvalidArgument { + detail: format!( + "recovery: buffered item definition: {e}" + ), + }) + })?; + let registration = NmxReferenceRegistrationMessage { + item_handle: *item_handle, + item_correlation_id: *correlation_id, + item_definition: buffered_def, + item_context: item_context.clone(), + subscribe: true, + reserved_25_27: [0; 2], + reserved_31_55: [0; 24], + }; + let hr = replacement + .register_reference( + opts.local_engine_id, + &entry.metadata, + &registration, + opts.galaxy_id, + i32::from(opts.galaxy_id), + opts.source_platform_id, + ) + .await + .map_err(map_nmx)?; + ensure_hresult_ok(hr)?; + } + } } // Step 5: atomic swap. The previous NmxClient drops at end of @@ -1106,6 +1229,7 @@ impl Session { correlation_id, SubscriptionEntry { metadata: Arc::clone(&metadata_arc), + mode: SubscriptionMode::Plain, }, ); reg.len() @@ -1165,8 +1289,10 @@ impl Session { // computed for parity with the .NET reference; it is currently // not transmitted on the wire because native MXAccess holds it // client-side only (see capture 082's missing - // `SetBufferedUpdateInterval` frame). - let _rounded_ms = options.rounded_update_interval_ms(); + // `SetBufferedUpdateInterval` frame). F45 stashes the rounded + // cadence on the registry entry so a future SetBufferedUpdateInterval + // transmission could be wired without re-deriving it on replay. + let rounded_ms = options.rounded_update_interval_ms(); let inner = self.inner.clone(); let metadata = inner @@ -1221,20 +1347,25 @@ impl Session { drop(nmx); let metadata_arc = Arc::new(metadata); - // Record the active subscription so recover_connection can replay - // it after a transport rebuild.
The replay path currently uses - // `AdviseSupervisory` for every entry; for buffered subscriptions - // that path is functionally equivalent (the LMX server already - // remembers the buffered registration via the `.property(buffer)` - // suffix carried in the metadata's name). Tracked as a sub-followup - // — see `design/followups.md` if a future iteration wants to - // re-issue `RegisterReference` instead. + // F45: tag the registry entry as Buffered with the original + // `(item_definition, item_context, item_handle)` triple + + // rounded cadence, so `recover_connection_core` can rebuild the + // matching `NmxReferenceRegistrationMessage` and dispatch + // `register_reference` (mirrors `MxNativeSession.ReAdviseSubscription` + // `cs:540-558` — the .NET reference's recovery branch on + // `subscription.IsBuffered`). let registry_size = { let mut reg = inner.subscriptions.lock().await; reg.insert( correlation_id, SubscriptionEntry { metadata: Arc::clone(&metadata_arc), + mode: SubscriptionMode::Buffered { + rounded_interval_ms: rounded_ms, + item_definition: reference.to_string(), + item_context: String::new(), + item_handle: 0, + }, }, ); reg.len() @@ -2496,6 +2627,262 @@ mod tests { handle.await.unwrap(); } + /// Per-RPC capture: `(opnum, stub_data)` for each Request PDU the + /// client dispatched against a [`recording_server`]. + type RecordedRpc = (u16, Vec<u8>); + /// Shared, mutable log of [`RecordedRpc`]s appended to by a + /// [`recording_server`] task. + type RecordedRpcLog = Arc<StdMutex<Vec<RecordedRpc>>>; + + /// Same as [`unauthenticated_server`] but additionally records every + /// incoming Request PDU's `(opnum, stub_data)` so a test can assert + /// which RPC the client dispatched. Used by the F45 buffered-recovery + /// branch test below.
+ async fn recording_server( + responses: Vec<(i32, Vec<u8>)>, + ) -> (SocketAddr, RecordedRpcLog, tokio::task::JoinHandle<()>) { + let listener = TcpListener::bind(local_addr()).await.unwrap(); + let addr = listener.local_addr().unwrap(); + let recorded: RecordedRpcLog = Arc::new(StdMutex::new(Vec::new())); + let recorded_for_task = Arc::clone(&recorded); + let handle = tokio::spawn(async move { + let (mut sock, _) = listener.accept().await.unwrap(); + // Drain Bind, reply BindAck. + let mut hdr = [0u8; 16]; + sock.read_exact(&mut hdr).await.unwrap(); + let bind_h = PduHeader::decode(&hdr).unwrap(); + let mut body = vec![0u8; bind_h.fragment_length as usize - 16]; + sock.read_exact(&mut body).await.unwrap(); + let resp_h = PduHeader { + version: 5, + version_minor: 0, + packet_type: PacketType::BindAck, + packet_flags: 0x03, + data_representation: 0x10, + fragment_length: 16, + auth_length: 0, + call_id: bind_h.call_id, + }; + let mut out = [0u8; 16]; + resp_h.encode(&mut out).unwrap(); + sock.write_all(&out).await.unwrap(); + + for (custom_hresult, extra_payload) in responses { + sock.read_exact(&mut hdr).await.unwrap(); + let req_h = PduHeader::decode(&hdr).unwrap(); + let mut body = vec![0u8; req_h.fragment_length as usize - 16]; + sock.read_exact(&mut body).await.unwrap(); + + // body layout (Request PDU minus the 16-byte common + // header): 4 alloc_hint, 2 context_id, 2 opnum, then + // optional 16-byte object UUID (when PFC_OBJECT_UUID + // = 0x80 is set in packet_flags), then stub_data.
+ let opnum = u16::from_le_bytes([body[6], body[7]]); + let pfc_object_uuid = (req_h.packet_flags & 0x80) != 0; + let stub_offset = if pfc_object_uuid { 8 + 16 } else { 8 }; + let stub = body[stub_offset..].to_vec(); + recorded_for_task + .lock() + .unwrap() + .push((opnum, stub)); + + let mut stub_resp = Vec::new(); + stub_resp.extend_from_slice(&OrpcThat::default().encode()); + stub_resp.extend_from_slice(&custom_hresult.to_le_bytes()); + stub_resp.extend_from_slice(&extra_payload); + + let response = ResponsePdu { + header: PduHeader { + version: 5, + version_minor: 0, + packet_type: PacketType::Response, + packet_flags: 0x03, + data_representation: 0x10, + fragment_length: 0, + auth_length: 0, + call_id: req_h.call_id, + }, + allocation_hint: stub_resp.len() as u32, + context_id: 0, + cancel_count: 0, + reserved23: 0, + stub_data: stub_resp, + }; + let bytes = response.encode(); + sock.write_all(&bytes).await.unwrap(); + } + }); + (addr, recorded, handle) + } + + #[tokio::test] + async fn recover_connection_replays_buffered_subscription_via_register_reference() { + use crate::RecoveryPolicy; + // F45: a buffered SubscriptionEntry must replay through + // `RegisterReference` (with `.property(buffer)` suffix + the + // saved correlation id + `subscribe = true`) — NOT through + // `AdviseSupervisory`. Synthesise the entry directly so we don't + // need a live `subscribe_buffered` round-trip; drive the recovery + // path against a recording mock and inspect the wire bytes. + + // Mock that the original session connection talked to. Drained + // immediately — connect_test_session doesn't issue any calls. + let (addr_orig, handle_orig) = unauthenticated_server(Vec::new()).await; + let resolver: Arc = Arc::new(StaticResolver::new(&[( + "TestObj.TestInt", + sample_metadata(), + )])); + let session = connect_test_session(addr_orig, resolver).await.unwrap(); + + // Inject a buffered registry entry. 
Correlation id is fixed so + // we can assert the rebuilt registration carries the same id. + let cid: [u8; 16] = [0xAB; 16]; + let buffered_ref = "TestObj.TestInt"; + { + let mut reg = session.inner.subscriptions.lock().await; + reg.insert( + cid, + SubscriptionEntry { + metadata: Arc::new(sample_metadata()), + mode: SubscriptionMode::Buffered { + rounded_interval_ms: 1000, + item_definition: buffered_ref.to_string(), + item_context: String::new(), + item_handle: 0, + }, + }, + ); + } + + // Recording server that the rebuild factory will hand the + // session a fresh NmxClient pointing at. The recovery loop fires + // exactly two RPCs against this transport: + // 1. RegisterEngine2 (HRESULT 0) + // 2. TransferData carrying the rebuilt RegisterReference + // (HRESULT 0) + // The assertions below decode the second stub and pin the + // envelope kind + inner registration shape. + let (addr_replacement, recorded, handle_replacement) = + recording_server(vec![(0, Vec::new()), (0, Vec::new())]).await; + + let factory: crate::RebuildFactory = Arc::new(move || { + let addr = addr_replacement; + Box::pin(async move { + let mut transport = DceRpcTcpClient::connect(addr).await.map_err(|e| { + mxaccess_nmx::NmxClientError::Transport( + mxaccess_rpc::transport::TransportError::Io(std::io::Error::other( + e.to_string(), + )), + ) + })?; + transport.bind(svc::INTERFACE_ID, 0, 0).await.map_err(|e| { + mxaccess_nmx::NmxClientError::Transport( + mxaccess_rpc::transport::TransportError::Io(std::io::Error::other( + e.to_string(), + )), + ) + })?; + Ok(NmxClient::from_bound_transport( + transport, + Guid::new([0xCC; 16]), + )) + }) + }); + session.set_recovery_factory(factory).await; + + // Drive the recovery cycle. + session + .recover_connection(RecoveryPolicy { + max_attempts: 1, + delay: std::time::Duration::ZERO, + }) + .await + .unwrap(); + + // Inspect the recorded RPCs against the replacement server. 
+ let recorded = recorded.lock().unwrap().clone(); + assert_eq!( + recorded.len(), + 2, + "expected RegisterEngine2 + TransferData(RegisterReference); got {}", + recorded.len() + ); + + // Slot 0 is RegisterEngine2 — pin only the opnum. + assert_eq!( + recorded[0].0, + mxaccess_rpc::nmx_service2_messages::REGISTER_ENGINE_2_OPNUM, + "first replay RPC should be RegisterEngine2" + ); + + // Slot 1 is TransferData carrying the rebuilt RegisterReference. + let (opnum, stub) = &recorded[1]; + assert_eq!( + *opnum, + mxaccess_rpc::nmx_service2_messages::TRANSFER_DATA_OPNUM, + "buffered replay RPC must be TransferData (not, e.g., a no-op)" + ); + + // The transfer_data stub layout: + // 0..32 OrpcThis + // 32..36 remote_galaxy_id i32 LE + // 36..40 remote_platform_id i32 LE + // 40..44 remote_engine_id i32 LE + // 44..48 length i32 LE + // 48..52 max_count i32 LE + // 52..(52+L) message_body (NmxTransferEnvelope + inner) + let body_offset = 52; + let length = i32::from_le_bytes(stub[44..48].try_into().unwrap()) as usize; + let message_body = &stub[body_offset..body_offset + length]; + + // Envelope kind at offset 10 of the envelope; ItemControl = 2 + // (the codec routes both AdviseSupervisory and RegisterReference + // through ItemControl envelopes — the inner command byte is + // what disambiguates). + let envelope_kind_i32 = i32::from_le_bytes(message_body[10..14].try_into().unwrap()); + assert_eq!( + envelope_kind_i32, + NmxTransferMessageKind::ItemControl as i32, + "envelope kind must be ItemControl" + ); + + // Inner body starts after the 46-byte envelope. The first byte + // distinguishes AdviseSupervisory (0x1f) from RegisterReference + // (0x10) — F45 specifically requires the buffered branch to + // emit the 0x10 form. 
+ let inner = &message_body[NmxTransferEnvelope::HEADER_LEN..]; + assert_eq!( + inner[0], 0x10, + "buffered replay must use RegisterReference (command 0x10), not \ + AdviseSupervisory (0x1f); got 0x{:02x}", + inner[0] + ); + + // Decode the inner registration and pin: correlation id matches + // the registry entry's, item_definition carries `.property(buffer)`, + // and `subscribe == true`. + let parsed = NmxReferenceRegistrationMessage::parse(inner).unwrap(); + assert_eq!( + parsed.item_correlation_id, cid, + "rebuilt registration must carry the original correlation id" + ); + assert!( + parsed + .item_definition + .to_lowercase() + .ends_with(".property(buffer)"), + "rebuilt item_definition must end with .property(buffer); got {:?}", + parsed.item_definition + ); + assert!( + parsed.subscribe, + "rebuilt registration must have subscribe = true" + ); + + handle_replacement.await.unwrap(); + handle_orig.abort(); + } + #[tokio::test] async fn recover_connection_after_shutdown_returns_engine_not_registered() { use crate::RecoveryPolicy;