[F45] mxaccess: recovery replay re-issues RegisterReference for buffered subs

`Session::recover_connection_core` previously walked
`SessionInner::subscriptions` and replayed every entry via
`AdviseSupervisory`. That dropped the `.property(buffer)` registration
on buffered subscriptions, silently downgrading buffered → plain on
transport rebuild.

Fix:

- New `pub(crate) enum SubscriptionMode { Plain, Buffered { ... } }`
  discriminator carried on each `SubscriptionEntry`. Buffered variant
  retains the un-suffixed reference + the rounded interval
  (informational: it is not sent on the wire today, and is kept so a
  future SetBufferedUpdateInterval transmission can reuse the
  original cadence) + the empty `item_context` / zero `item_handle`
  matching the wire send.
- `Session::subscribe` (plain path) records `SubscriptionMode::Plain`.
  `subscribe_buffered_nmx` records `SubscriptionMode::Buffered { ... }`.
- `recover_connection_core` matches on `entry.mode`. Plain branch
  unchanged. Buffered branch re-applies `.property(buffer)` via
  `to_buffered_item_definition` (idempotent), rebuilds the original
  `NmxReferenceRegistrationMessage` with the saved correlation id +
  `subscribe = true`, and dispatches `register_reference`
  (kind = ItemControl, inner command 0x10) against the replacement
  transport. Mirrors `MxNativeSession.ReAdviseSubscription`
  (`MxNativeSession.cs:538-569`).
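
The branch on `entry.mode` can be sketched in isolation as below. This is
a minimal illustration, not the crate's code: the enum keeps only one
field, `ReplayOp` and `replay_op` are hypothetical names, and the suffix
helper is an infallible stand-in for the codec's
`to_buffered_item_definition` (which returns a `Result` in the diff).

```rust
/// Simplified stand-in for the crate's `SubscriptionMode`; only the
/// field needed for this illustration is kept.
#[derive(Debug, Clone)]
enum SubscriptionMode {
    Plain,
    Buffered { item_definition: String },
}

/// Inner command bytes as pinned by the F45 unit test.
const CMD_ADVISE_SUPERVISORY: u8 = 0x1f;
const CMD_REGISTER_REFERENCE: u8 = 0x10;

const BUFFER_SUFFIX: &str = ".property(buffer)";

/// Hypothetical, infallible version of the suffix helper: appends the
/// suffix unless it is already present (case-insensitive match).
fn to_buffered_item_definition(item_definition: &str) -> String {
    if item_definition.to_lowercase().ends_with(BUFFER_SUFFIX) {
        item_definition.to_string()
    } else {
        format!("{}{}", item_definition, BUFFER_SUFFIX)
    }
}

/// Hypothetical summary of what recovery sends for one registry entry.
#[derive(Debug, PartialEq)]
struct ReplayOp {
    command: u8,
    item_definition: Option<String>,
}

/// Picks the wire op from the recorded mode instead of always
/// replaying through AdviseSupervisory.
fn replay_op(mode: &SubscriptionMode) -> ReplayOp {
    match mode {
        SubscriptionMode::Plain => ReplayOp {
            command: CMD_ADVISE_SUPERVISORY,
            item_definition: None,
        },
        SubscriptionMode::Buffered { item_definition } => ReplayOp {
            command: CMD_REGISTER_REFERENCE,
            item_definition: Some(to_buffered_item_definition(item_definition)),
        },
    }
}
```

Storing the un-suffixed reference and re-suffixing on replay keeps the
registry entry identical to what the caller passed in, while the
idempotent helper makes an already-suffixed name harmless.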

New unit test
`recover_connection_replays_buffered_subscription_via_register_reference`
synthesises a buffered registry entry, installs a `RebuildFactory`
pointing at a recording NMX server, drives `recover_connection`, then
asserts that the recorded `TransferData` carries inner command `0x10`
(NOT `0x1f`) with the `.property(buffer)`-suffixed item_definition,
the saved correlation id, and subscribe=true.
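
The byte-level check the test performs can be sketched as a standalone
decoder. The offsets come from the layout comment in the test itself.
The function names are hypothetical, `ENVELOPE_LEN = 46` is assumed to
equal `NmxTransferEnvelope::HEADER_LEN`, and the synthetic stub is
zero-filled outside the fields that are read, so it is not a valid NMX
frame.

```rust
/// Offsets taken from the layout comment in the F45 unit test.
const LENGTH_OFFSET: usize = 44; // i32 LE length of message_body
const BODY_OFFSET: usize = 52; // message_body starts here
const KIND_OFFSET: usize = 10; // envelope kind (i32 LE) inside the envelope
const ENVELOPE_LEN: usize = 46; // assumed NmxTransferEnvelope::HEADER_LEN

/// Reads a little-endian i32 at `offset`, or `None` if out of bounds.
fn read_i32_le(bytes: &[u8], offset: usize) -> Option<i32> {
    let b = bytes.get(offset..offset.checked_add(4)?)?;
    Some(i32::from_le_bytes([b[0], b[1], b[2], b[3]]))
}

/// Returns `(envelope_kind, inner_command)` for a TransferData stub,
/// or `None` if the stub is too short or the length field is negative.
fn envelope_kind_and_command(stub: &[u8]) -> Option<(i32, u8)> {
    let length = read_i32_le(stub, LENGTH_OFFSET)?;
    if length < 0 {
        return None;
    }
    let end = BODY_OFFSET.checked_add(length as usize)?;
    let body = stub.get(BODY_OFFSET..end)?;
    let kind = read_i32_le(body, KIND_OFFSET)?;
    // The first byte after the envelope is the inner command.
    let command = *body.get(ENVELOPE_LEN)?;
    Some((kind, command))
}

/// Builds a synthetic stub for demonstration only.
fn synthetic_stub(kind: i32, command: u8) -> Vec<u8> {
    let mut body = vec![0u8; ENVELOPE_LEN + 1];
    body[KIND_OFFSET..KIND_OFFSET + 4].copy_from_slice(&kind.to_le_bytes());
    body[ENVELOPE_LEN] = command;
    let mut stub = vec![0u8; BODY_OFFSET];
    stub[LENGTH_OFFSET..LENGTH_OFFSET + 4]
        .copy_from_slice(&(body.len() as i32).to_le_bytes());
    stub.extend_from_slice(&body);
    stub
}
```

Unlike the test, which indexes directly and panics on a short buffer,
this version uses checked slicing so a truncated capture yields `None`.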

Side-finding worth filing separately: `Session::unsubscribe`
unconditionally calls `un_advise` for both plain and buffered
entries, but the .NET reference's `Unsubscribe`
(`MxNativeSession.cs:361-381`) skips `UnAdvise` for buffered
(`if (!subscription.IsBuffered)`). Out of scope for F45
(recovery-only); will file as F47.

Public API unchanged. `SubscriptionMode` + `SubscriptionEntry` stay
`pub(crate)` — `cargo public-api -p mxaccess` baseline is unchanged.

Workspace 793 → 794 tests; clippy clean; rustdoc clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Joseph Doherty
2026-05-06 05:54:30 -04:00
parent 2281309a86
commit 9b57cf8f3b
+418 -31
@@ -372,10 +372,71 @@ pub struct SessionInner {
/// Per-subscription state retained for [`Session::recover_connection`].
/// The full `Subscription` handle stays with the consumer and continues
/// to receive broadcasts across the swap; this struct just preserves
/// the inputs `advise_supervisory` needs.
/// the inputs the recovery loop needs to reissue the original advise.
///
/// `mode` discriminates the original advise shape (plain
/// `AdviseSupervisory` vs buffered `RegisterReference` with the
/// `.property(buffer)` suffix) so the recovery branch can re-issue
/// the matching wire op — see F45 in `design/followups.md` for the
/// motivation. Mirrors `MxNativeSubscription.IsBuffered`
/// (`MxNativeSession.cs:59`) + the `ReAdviseSubscription` branch
/// (`MxNativeSession.cs:538-569`).
#[derive(Debug, Clone)]
pub(crate) struct SubscriptionEntry {
pub(crate) metadata: Arc<mxaccess_galaxy::GalaxyTagMetadata>,
pub(crate) mode: SubscriptionMode,
}
/// Discriminator for [`SubscriptionEntry`] — captures the original
/// advise shape so the recovery loop re-issues the same wire op.
///
/// - [`SubscriptionMode::Plain`] entries replay via
/// `INmxService2::AdviseSupervisory` (matches the original
/// `Session::subscribe` path).
/// - [`SubscriptionMode::Buffered`] entries replay via
/// `INmxService2::RegisterReference` with the `.property(buffer)`-
/// suffixed item definition + the saved correlation id +
/// `subscribe = true` (matches the original
/// `Session::subscribe_buffered` path; mirrors
/// `MxNativeSession.ReAdviseSubscription` `cs:538-558`).
#[derive(Debug, Clone)]
pub(crate) enum SubscriptionMode {
/// Plain `AdviseSupervisory`-issued subscription. Recovery replays
/// it via `advise_supervisory`.
Plain,
/// Buffered `RegisterReference`-issued subscription. Recovery
/// rebuilds the original `NmxReferenceRegistrationMessage` and
/// dispatches `register_reference` so the server-side buffered
/// registration survives the transport swap.
///
/// The fields preserve the inputs `subscribe_buffered_nmx` used:
/// the un-suffixed `item_definition` (re-suffixed via
/// [`NmxReferenceRegistrationMessage::to_buffered_item_definition`]
/// on replay), the `item_context` (currently always empty —
/// reserved for the compat-server F35 split-context form), the
/// `item_handle` (currently always 0), and the rounded-up cadence
/// in milliseconds (informational; native MXAccess does not transmit
/// it on the wire — see capture 082).
Buffered {
/// Cadence in milliseconds, already rounded up to the nearest
/// 100 ms via [`crate::BufferedOptions::rounded_update_interval_ms`].
/// Carried through recovery so a future SetBufferedUpdateInterval
/// transmission could be wired without losing the original
/// cadence.
rounded_interval_ms: u32,
/// Un-suffixed item definition as supplied to
/// [`Session::subscribe_buffered`]. The `.property(buffer)`
/// suffix is re-applied on replay via
/// [`NmxReferenceRegistrationMessage::to_buffered_item_definition`].
item_definition: String,
/// Item context (compat-server split form). Empty when
/// `subscribe_buffered` is called directly with a single
/// `reference` argument.
item_context: String,
/// Item handle (LMX-side identifier). 0 when the compat-server
/// layer has not assigned one.
item_handle: i32,
},
}
/// F16 — pluggable factory that produces a fresh [`NmxClient`].
@@ -742,9 +803,14 @@ impl Session {
/// server side knows about the same local callback exporter.
/// 3. Re-run `SetHeartbeatSendInterval` if configured.
/// 4. Walk the [`SessionInner::subscriptions`] registry and re-issue
/// `AdviseSupervisory` for every active subscription (each
/// the matching wire op for every active subscription. Plain
/// entries replay via `AdviseSupervisory`; buffered entries
/// (F45) replay via `RegisterReference` with the
/// `.property(buffer)` suffix + the saved correlation id +
/// `subscribe = true` — mirroring
/// `MxNativeSession.ReAdviseSubscription` (`cs:538-569`). Each
/// correlation_id is preserved so the consumer's `Subscription`
/// handle keeps receiving on its existing broadcast filter).
/// handle keeps receiving on its existing broadcast filter.
/// 5. Atomically swap the inner mutex's `NmxClient` so the old one
/// drops at end-of-scope.
///
@@ -786,12 +852,19 @@ impl Session {
}
}
// Step 4: replay every active subscription's AdviseSupervisory
// against the replacement transport. Snapshot under the lock,
// then drop the lock so subscribe()/un_advise() can race with
// recovery without deadlocking. The atomic swap below installs
// the replacement before we yield; after that, any new
// subscribe() call will see the registry+replacement pair.
// Step 4: replay every active subscription against the
// replacement transport. Plain entries → `AdviseSupervisory`;
// buffered entries (F45) → `RegisterReference` with the
// `.property(buffer)` suffix + saved correlation id +
// `subscribe = true`. Mirrors
// `MxNativeSession.ReAdviseSubscription` (`cs:538-569`) which
// branches on `subscription.IsBuffered`.
//
// Snapshot under the lock, then drop it so subscribe() /
// un_advise() can race with recovery without deadlocking. The
// atomic swap below installs the replacement before we yield;
// after that, any new subscribe() call will see the
// registry+replacement pair.
let snapshot: Vec<([u8; 16], SubscriptionEntry)> = {
let registry = self.inner.subscriptions.lock().await;
registry
@@ -800,18 +873,68 @@ impl Session {
.collect()
};
for (correlation_id, entry) in &snapshot {
let hr = replacement
.advise_supervisory(
opts.local_engine_id,
&entry.metadata,
*correlation_id,
opts.galaxy_id,
i32::from(opts.galaxy_id),
opts.source_platform_id,
)
.await
.map_err(map_nmx)?;
ensure_hresult_ok(hr)?;
match &entry.mode {
SubscriptionMode::Plain => {
let hr = replacement
.advise_supervisory(
opts.local_engine_id,
&entry.metadata,
*correlation_id,
opts.galaxy_id,
i32::from(opts.galaxy_id),
opts.source_platform_id,
)
.await
.map_err(map_nmx)?;
ensure_hresult_ok(hr)?;
}
SubscriptionMode::Buffered {
rounded_interval_ms: _rounded_interval_ms,
item_definition,
item_context,
item_handle,
} => {
// F45: rebuild the original buffered registration
// body. The codec helper re-applies the
// `.property(buffer)` suffix idempotently — passing
// an already-suffixed name returns it unchanged
// (verified by
// `request_to_buffered_item_definition_idempotent_case_insensitive`
// in `mxaccess-codec`).
let buffered_def =
NmxReferenceRegistrationMessage::to_buffered_item_definition(
item_definition,
)
.map_err(|e| {
Error::Configuration(ConfigError::InvalidArgument {
detail: format!(
"recovery: buffered item definition: {e}"
),
})
})?;
let registration = NmxReferenceRegistrationMessage {
item_handle: *item_handle,
item_correlation_id: *correlation_id,
item_definition: buffered_def,
item_context: item_context.clone(),
subscribe: true,
reserved_25_27: [0; 2],
reserved_31_55: [0; 24],
};
let hr = replacement
.register_reference(
opts.local_engine_id,
&entry.metadata,
&registration,
opts.galaxy_id,
i32::from(opts.galaxy_id),
opts.source_platform_id,
)
.await
.map_err(map_nmx)?;
ensure_hresult_ok(hr)?;
}
}
}
// Step 5: atomic swap. The previous NmxClient drops at end of
@@ -1106,6 +1229,7 @@ impl Session {
correlation_id,
SubscriptionEntry {
metadata: Arc::clone(&metadata_arc),
mode: SubscriptionMode::Plain,
},
);
reg.len()
@@ -1165,8 +1289,10 @@ impl Session {
// computed for parity with the .NET reference; it is currently
// not transmitted on the wire because native MXAccess holds it
// client-side only (see capture 082's missing
// `SetBufferedUpdateInterval` frame).
let _rounded_ms = options.rounded_update_interval_ms();
// `SetBufferedUpdateInterval` frame). F45 stashes the rounded
// cadence on the registry entry so a future SetBufferedUpdateInterval
// transmission could be wired without re-deriving it on replay.
let rounded_ms = options.rounded_update_interval_ms();
let inner = self.inner.clone();
let metadata = inner
@@ -1221,20 +1347,25 @@ impl Session {
drop(nmx);
let metadata_arc = Arc::new(metadata);
// Record the active subscription so recover_connection can replay
// it after a transport rebuild. The replay path currently uses
// `AdviseSupervisory` for every entry; for buffered subscriptions
// that path is functionally equivalent (the LMX server already
// remembers the buffered registration via the `.property(buffer)`
// suffix carried in the metadata's name). Tracked as a sub-followup
// — see `design/followups.md` if a future iteration wants to
// re-issue `RegisterReference` instead.
// F45: tag the registry entry as Buffered with the original
// `(item_definition, item_context, item_handle)` triple +
// rounded cadence, so `recover_connection_core` can rebuild the
// matching `NmxReferenceRegistrationMessage` and dispatch
// `register_reference` (mirrors `MxNativeSession.ReAdviseSubscription`
// `cs:540-558` — the .NET reference's recovery branch on
// `subscription.IsBuffered`).
let registry_size = {
let mut reg = inner.subscriptions.lock().await;
reg.insert(
correlation_id,
SubscriptionEntry {
metadata: Arc::clone(&metadata_arc),
mode: SubscriptionMode::Buffered {
rounded_interval_ms: rounded_ms,
item_definition: reference.to_string(),
item_context: String::new(),
item_handle: 0,
},
},
);
reg.len()
@@ -2496,6 +2627,262 @@ mod tests {
handle.await.unwrap();
}
/// Per-RPC capture: `(opnum, stub_data)` for each Request PDU the
/// client dispatched against a [`recording_server`].
type RecordedRpc = (u16, Vec<u8>);
/// Shared, mutable log of [`RecordedRpc`]s appended to by a
/// [`recording_server`] task.
type RecordedRpcLog = Arc<StdMutex<Vec<RecordedRpc>>>;
/// Same as [`unauthenticated_server`] but additionally records every
/// incoming Request PDU's `(opnum, stub_data)` so a test can assert
/// which RPC the client dispatched. Used by the F45 buffered-recovery
/// branch test below.
async fn recording_server(
responses: Vec<(i32, Vec<u8>)>,
) -> (SocketAddr, RecordedRpcLog, tokio::task::JoinHandle<()>) {
let listener = TcpListener::bind(local_addr()).await.unwrap();
let addr = listener.local_addr().unwrap();
let recorded: RecordedRpcLog = Arc::new(StdMutex::new(Vec::new()));
let recorded_for_task = Arc::clone(&recorded);
let handle = tokio::spawn(async move {
let (mut sock, _) = listener.accept().await.unwrap();
// Drain Bind, reply BindAck.
let mut hdr = [0u8; 16];
sock.read_exact(&mut hdr).await.unwrap();
let bind_h = PduHeader::decode(&hdr).unwrap();
let mut body = vec![0u8; bind_h.fragment_length as usize - 16];
sock.read_exact(&mut body).await.unwrap();
let resp_h = PduHeader {
version: 5,
version_minor: 0,
packet_type: PacketType::BindAck,
packet_flags: 0x03,
data_representation: 0x10,
fragment_length: 16,
auth_length: 0,
call_id: bind_h.call_id,
};
let mut out = [0u8; 16];
resp_h.encode(&mut out).unwrap();
sock.write_all(&out).await.unwrap();
for (custom_hresult, extra_payload) in responses {
sock.read_exact(&mut hdr).await.unwrap();
let req_h = PduHeader::decode(&hdr).unwrap();
let mut body = vec![0u8; req_h.fragment_length as usize - 16];
sock.read_exact(&mut body).await.unwrap();
// body layout (Request PDU minus the 16-byte common
// header): 4 alloc_hint, 2 context_id, 2 opnum, then
// optional 16-byte object UUID (when PFC_OBJECT_UUID
// = 0x80 is set in packet_flags), then stub_data.
let opnum = u16::from_le_bytes([body[6], body[7]]);
let pfc_object_uuid = (req_h.packet_flags & 0x80) != 0;
let stub_offset = if pfc_object_uuid { 8 + 16 } else { 8 };
let stub = body[stub_offset..].to_vec();
recorded_for_task
.lock()
.unwrap()
.push((opnum, stub));
let mut stub_resp = Vec::new();
stub_resp.extend_from_slice(&OrpcThat::default().encode());
stub_resp.extend_from_slice(&custom_hresult.to_le_bytes());
stub_resp.extend_from_slice(&extra_payload);
let response = ResponsePdu {
header: PduHeader {
version: 5,
version_minor: 0,
packet_type: PacketType::Response,
packet_flags: 0x03,
data_representation: 0x10,
fragment_length: 0,
auth_length: 0,
call_id: req_h.call_id,
},
allocation_hint: stub_resp.len() as u32,
context_id: 0,
cancel_count: 0,
reserved23: 0,
stub_data: stub_resp,
};
let bytes = response.encode();
sock.write_all(&bytes).await.unwrap();
}
});
(addr, recorded, handle)
}
#[tokio::test]
async fn recover_connection_replays_buffered_subscription_via_register_reference() {
use crate::RecoveryPolicy;
// F45: a buffered SubscriptionEntry must replay through
// `RegisterReference` (with `.property(buffer)` suffix + the
// saved correlation id + `subscribe = true`) — NOT through
// `AdviseSupervisory`. Synthesise the entry directly so we don't
// need a live `subscribe_buffered` round-trip; drive the recovery
// path against a recording mock and inspect the wire bytes.
// Mock that the original session connection talked to. Drained
// immediately — connect_test_session doesn't issue any calls.
let (addr_orig, handle_orig) = unauthenticated_server(Vec::new()).await;
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[(
"TestObj.TestInt",
sample_metadata(),
)]));
let session = connect_test_session(addr_orig, resolver).await.unwrap();
// Inject a buffered registry entry. Correlation id is fixed so
// we can assert the rebuilt registration carries the same id.
let cid: [u8; 16] = [0xAB; 16];
let buffered_ref = "TestObj.TestInt";
{
let mut reg = session.inner.subscriptions.lock().await;
reg.insert(
cid,
SubscriptionEntry {
metadata: Arc::new(sample_metadata()),
mode: SubscriptionMode::Buffered {
rounded_interval_ms: 1000,
item_definition: buffered_ref.to_string(),
item_context: String::new(),
item_handle: 0,
},
},
);
}
// Recording server that the rebuild factory will hand the
// session a fresh NmxClient pointing at. The recovery loop fires
// exactly two RPCs against this transport:
// 1. RegisterEngine2 (HRESULT 0)
// 2. TransferData carrying the rebuilt RegisterReference
// (HRESULT 0)
// The assertions below decode the second stub and pin the
// envelope kind + inner registration shape.
let (addr_replacement, recorded, handle_replacement) =
recording_server(vec![(0, Vec::new()), (0, Vec::new())]).await;
let factory: crate::RebuildFactory = Arc::new(move || {
let addr = addr_replacement;
Box::pin(async move {
let mut transport = DceRpcTcpClient::connect(addr).await.map_err(|e| {
mxaccess_nmx::NmxClientError::Transport(
mxaccess_rpc::transport::TransportError::Io(std::io::Error::other(
e.to_string(),
)),
)
})?;
transport.bind(svc::INTERFACE_ID, 0, 0).await.map_err(|e| {
mxaccess_nmx::NmxClientError::Transport(
mxaccess_rpc::transport::TransportError::Io(std::io::Error::other(
e.to_string(),
)),
)
})?;
Ok(NmxClient::from_bound_transport(
transport,
Guid::new([0xCC; 16]),
))
})
});
session.set_recovery_factory(factory).await;
// Drive the recovery cycle.
session
.recover_connection(RecoveryPolicy {
max_attempts: 1,
delay: std::time::Duration::ZERO,
})
.await
.unwrap();
// Inspect the recorded RPCs against the replacement server.
let recorded = recorded.lock().unwrap().clone();
assert_eq!(
recorded.len(),
2,
"expected RegisterEngine2 + TransferData(RegisterReference); got {}",
recorded.len()
);
// Slot 0 is RegisterEngine2 — pin only the opnum.
assert_eq!(
recorded[0].0,
mxaccess_rpc::nmx_service2_messages::REGISTER_ENGINE_2_OPNUM,
"first replay RPC should be RegisterEngine2"
);
// Slot 1 is TransferData carrying the rebuilt RegisterReference.
let (opnum, stub) = &recorded[1];
assert_eq!(
*opnum,
mxaccess_rpc::nmx_service2_messages::TRANSFER_DATA_OPNUM,
"buffered replay RPC must be TransferData (not, e.g., a no-op)"
);
// The transfer_data stub layout:
// 0..32 OrpcThis
// 32..36 remote_galaxy_id i32 LE
// 36..40 remote_platform_id i32 LE
// 40..44 remote_engine_id i32 LE
// 44..48 length i32 LE
// 48..52 max_count i32 LE
// 52..(52+L) message_body (NmxTransferEnvelope + inner)
let body_offset = 52;
let length = i32::from_le_bytes(stub[44..48].try_into().unwrap()) as usize;
let message_body = &stub[body_offset..body_offset + length];
// Envelope kind at offset 10 of the envelope; ItemControl = 2
// (the codec routes both AdviseSupervisory and RegisterReference
// through ItemControl envelopes — the inner command byte is
// what disambiguates).
let envelope_kind_i32 = i32::from_le_bytes(message_body[10..14].try_into().unwrap());
assert_eq!(
envelope_kind_i32,
NmxTransferMessageKind::ItemControl as i32,
"envelope kind must be ItemControl"
);
// Inner body starts after the 46-byte envelope. The first byte
// distinguishes AdviseSupervisory (0x1f) from RegisterReference
// (0x10) — F45 specifically requires the buffered branch to
// emit the 0x10 form.
let inner = &message_body[NmxTransferEnvelope::HEADER_LEN..];
assert_eq!(
inner[0], 0x10,
"buffered replay must use RegisterReference (command 0x10), not \
AdviseSupervisory (0x1f); got 0x{:02x}",
inner[0]
);
// Decode the inner registration and pin: correlation id matches
// the registry entry's, item_definition carries `.property(buffer)`,
// and `subscribe == true`.
let parsed = NmxReferenceRegistrationMessage::parse(inner).unwrap();
assert_eq!(
parsed.item_correlation_id, cid,
"rebuilt registration must carry the original correlation id"
);
assert!(
parsed
.item_definition
.to_lowercase()
.ends_with(".property(buffer)"),
"rebuilt item_definition must end with .property(buffer); got {:?}",
parsed.item_definition
);
assert!(
parsed.subscribe,
"rebuilt registration must have subscribe = true"
);
handle_replacement.await.unwrap();
handle_orig.abort();
}
#[tokio::test]
async fn recover_connection_after_shutdown_returns_engine_not_registered() {
use crate::RecoveryPolicy;