[F45] mxaccess: recovery replay re-issues RegisterReference for buffered subs
`Session::recover_connection_core` previously walked
`SessionInner::subscriptions` and replayed every entry via
`AdviseSupervisory`, which lost the `.property(buffer)` registration
on buffered subscriptions — silently downgrading buffered → plain on
transport rebuild.
Fix:
- New `pub(crate) enum SubscriptionMode { Plain, Buffered { ... } }`
discriminator carried on each `SubscriptionEntry`. Buffered variant
retains the un-suffixed reference + the rounded interval (so the
re-issued buffered registration matches the original cadence) +
the empty `item_context` / zero `item_handle` matching the wire
send.
- `Session::subscribe` (plain path) records `SubscriptionMode::Plain`.
`subscribe_buffered_nmx` records `SubscriptionMode::Buffered { ... }`.
- `recover_connection_core` matches on `entry.mode`. Plain branch
unchanged. Buffered branch re-applies `.property(buffer)` via
`to_buffered_item_definition` (idempotent), rebuilds the original
`NmxReferenceRegistrationMessage` with the saved correlation id +
`subscribe = true`, and dispatches `register_reference` (kind=
ItemControl, inner command 0x10) against the replacement
transport. Mirrors `MxNativeSession.ReAdviseSubscription`
(`MxNativeSession.cs:538-569`).
New unit test `recover_connection_replays_buffered_subscription_via_
register_reference` synthesises a buffered registry entry, installs a
`RebuildFactory` pointing at a recording NMX server, drives
`recover_connection`, then asserts the recorded `TransferData` carries
inner command `0x10` (NOT `0x1f`) with the `.property(buffer)`-
suffixed item_definition + the saved correlation id + subscribe=true.
Side-finding worth filing separately: `Session::unsubscribe`
unconditionally calls `un_advise` for both plain and buffered
entries, but the .NET reference's `Unsubscribe`
(`MxNativeSession.cs:361-381`) skips `UnAdvise` for buffered
(`if (!subscription.IsBuffered)`). Out of scope for F45 (recovery-
only); will file as F47.
Public API unchanged. `SubscriptionMode` + `SubscriptionEntry` stay
`pub(crate)` — `cargo public-api -p mxaccess` baseline is unchanged.
Workspace 793 → 794 tests; clippy clean; rustdoc clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -372,10 +372,71 @@ pub struct SessionInner {
|
|||||||
/// Per-subscription state retained for [`Session::recover_connection`].
|
/// Per-subscription state retained for [`Session::recover_connection`].
|
||||||
/// The full `Subscription` handle stays with the consumer and continues
|
/// The full `Subscription` handle stays with the consumer and continues
|
||||||
/// to receive broadcasts across the swap; this struct just preserves
|
/// to receive broadcasts across the swap; this struct just preserves
|
||||||
/// the inputs `advise_supervisory` needs.
|
/// the inputs the recovery loop needs to reissue the original advise.
|
||||||
|
///
|
||||||
|
/// `mode` discriminates the original advise shape (plain
|
||||||
|
/// `AdviseSupervisory` vs buffered `RegisterReference` with the
|
||||||
|
/// `.property(buffer)` suffix) so the recovery branch can re-issue
|
||||||
|
/// the matching wire op — see F45 in `design/followups.md` for the
|
||||||
|
/// motivation. Mirrors `MxNativeSubscription.IsBuffered`
|
||||||
|
/// (`MxNativeSession.cs:59`) + the `ReAdviseSubscription` branch
|
||||||
|
/// (`MxNativeSession.cs:538-569`).
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub(crate) struct SubscriptionEntry {
|
pub(crate) struct SubscriptionEntry {
|
||||||
pub(crate) metadata: Arc<mxaccess_galaxy::GalaxyTagMetadata>,
|
pub(crate) metadata: Arc<mxaccess_galaxy::GalaxyTagMetadata>,
|
||||||
|
pub(crate) mode: SubscriptionMode,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Discriminator for [`SubscriptionEntry`] — captures the original
|
||||||
|
/// advise shape so the recovery loop re-issues the same wire op.
|
||||||
|
///
|
||||||
|
/// - [`SubscriptionMode::Plain`] entries replay via
|
||||||
|
/// `INmxService2::AdviseSupervisory` (matches the original
|
||||||
|
/// `Session::subscribe` path).
|
||||||
|
/// - [`SubscriptionMode::Buffered`] entries replay via
|
||||||
|
/// `INmxService2::RegisterReference` with the `.property(buffer)`-
|
||||||
|
/// suffixed item definition + the saved correlation id +
|
||||||
|
/// `subscribe = true` (matches the original
|
||||||
|
/// `Session::subscribe_buffered` path; mirrors
|
||||||
|
/// `MxNativeSession.ReAdviseSubscription` `cs:538-558`).
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub(crate) enum SubscriptionMode {
|
||||||
|
/// Plain `AdviseSupervisory`-issued subscription. Recovery replays
|
||||||
|
/// it via `advise_supervisory`.
|
||||||
|
Plain,
|
||||||
|
/// Buffered `RegisterReference`-issued subscription. Recovery
|
||||||
|
/// rebuilds the original `NmxReferenceRegistrationMessage` and
|
||||||
|
/// dispatches `register_reference` so the server-side buffered
|
||||||
|
/// registration survives the transport swap.
|
||||||
|
///
|
||||||
|
/// The fields preserve the inputs `subscribe_buffered_nmx` used:
|
||||||
|
/// the un-suffixed `item_definition` (re-suffixed via
|
||||||
|
/// [`NmxReferenceRegistrationMessage::to_buffered_item_definition`]
|
||||||
|
/// on replay), the `item_context` (currently always empty —
|
||||||
|
/// reserved for the compat-server F35 split-context form), the
|
||||||
|
/// `item_handle` (currently always 0), and the rounded-up cadence
|
||||||
|
/// in milliseconds (informational; native MXAccess does not transmit
|
||||||
|
/// it on the wire — see capture 082).
|
||||||
|
Buffered {
|
||||||
|
/// Cadence in milliseconds, already rounded up to the nearest
|
||||||
|
/// 100 ms via [`crate::BufferedOptions::rounded_update_interval_ms`].
|
||||||
|
/// Carried through recovery so a future SetBufferedUpdateInterval
|
||||||
|
/// transmission could be wired without losing the original
|
||||||
|
/// cadence.
|
||||||
|
rounded_interval_ms: u32,
|
||||||
|
/// Un-suffixed item definition as supplied to
|
||||||
|
/// [`Session::subscribe_buffered`]. The `.property(buffer)`
|
||||||
|
/// suffix is re-applied on replay via
|
||||||
|
/// [`NmxReferenceRegistrationMessage::to_buffered_item_definition`].
|
||||||
|
item_definition: String,
|
||||||
|
/// Item context (compat-server split form). Empty when
|
||||||
|
/// `subscribe_buffered` is called directly with a single
|
||||||
|
/// `reference` argument.
|
||||||
|
item_context: String,
|
||||||
|
/// Item handle (LMX-side identifier). 0 when the compat-server
|
||||||
|
/// layer has not assigned one.
|
||||||
|
item_handle: i32,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
/// F16 — pluggable factory that produces a fresh [`NmxClient`].
|
/// F16 — pluggable factory that produces a fresh [`NmxClient`].
|
||||||
@@ -742,9 +803,14 @@ impl Session {
|
|||||||
/// server side knows about the same local callback exporter.
|
/// server side knows about the same local callback exporter.
|
||||||
/// 3. Re-run `SetHeartbeatSendInterval` if configured.
|
/// 3. Re-run `SetHeartbeatSendInterval` if configured.
|
||||||
/// 4. Walk the [`SessionInner::subscriptions`] registry and re-issue
|
/// 4. Walk the [`SessionInner::subscriptions`] registry and re-issue
|
||||||
/// `AdviseSupervisory` for every active subscription (each
|
/// the matching wire op for every active subscription. Plain
|
||||||
|
/// entries replay via `AdviseSupervisory`; buffered entries
|
||||||
|
/// (F45) replay via `RegisterReference` with the
|
||||||
|
/// `.property(buffer)` suffix + the saved correlation id +
|
||||||
|
/// `subscribe = true` — mirroring
|
||||||
|
/// `MxNativeSession.ReAdviseSubscription` (`cs:538-569`). Each
|
||||||
/// correlation_id is preserved so the consumer's `Subscription`
|
/// correlation_id is preserved so the consumer's `Subscription`
|
||||||
/// handle keeps receiving on its existing broadcast filter).
|
/// handle keeps receiving on its existing broadcast filter.
|
||||||
/// 5. Atomically swap the inner mutex's `NmxClient` so the old one
|
/// 5. Atomically swap the inner mutex's `NmxClient` so the old one
|
||||||
/// drops at end-of-scope.
|
/// drops at end-of-scope.
|
||||||
///
|
///
|
||||||
@@ -786,12 +852,19 @@ impl Session {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Step 4: replay every active subscription's AdviseSupervisory
|
// Step 4: replay every active subscription against the
|
||||||
// against the replacement transport. Snapshot under the lock,
|
// replacement transport. Plain entries → `AdviseSupervisory`;
|
||||||
// then drop the lock so subscribe()/un_advise() can race with
|
// buffered entries (F45) → `RegisterReference` with the
|
||||||
// recovery without deadlocking. The atomic swap below installs
|
// `.property(buffer)` suffix + saved correlation id +
|
||||||
// the replacement before we yield; after that, any new
|
// `subscribe = true`. Mirrors
|
||||||
// subscribe() call will see the registry+replacement pair.
|
// `MxNativeSession.ReAdviseSubscription` (`cs:538-569`) which
|
||||||
|
// branches on `subscription.IsBuffered`.
|
||||||
|
//
|
||||||
|
// Snapshot under the lock, then drop it so subscribe() /
|
||||||
|
// un_advise() can race with recovery without deadlocking. The
|
||||||
|
// atomic swap below installs the replacement before we yield;
|
||||||
|
// after that, any new subscribe() call will see the
|
||||||
|
// registry+replacement pair.
|
||||||
let snapshot: Vec<([u8; 16], SubscriptionEntry)> = {
|
let snapshot: Vec<([u8; 16], SubscriptionEntry)> = {
|
||||||
let registry = self.inner.subscriptions.lock().await;
|
let registry = self.inner.subscriptions.lock().await;
|
||||||
registry
|
registry
|
||||||
@@ -800,18 +873,68 @@ impl Session {
|
|||||||
.collect()
|
.collect()
|
||||||
};
|
};
|
||||||
for (correlation_id, entry) in &snapshot {
|
for (correlation_id, entry) in &snapshot {
|
||||||
let hr = replacement
|
match &entry.mode {
|
||||||
.advise_supervisory(
|
SubscriptionMode::Plain => {
|
||||||
opts.local_engine_id,
|
let hr = replacement
|
||||||
&entry.metadata,
|
.advise_supervisory(
|
||||||
*correlation_id,
|
opts.local_engine_id,
|
||||||
opts.galaxy_id,
|
&entry.metadata,
|
||||||
i32::from(opts.galaxy_id),
|
*correlation_id,
|
||||||
opts.source_platform_id,
|
opts.galaxy_id,
|
||||||
)
|
i32::from(opts.galaxy_id),
|
||||||
.await
|
opts.source_platform_id,
|
||||||
.map_err(map_nmx)?;
|
)
|
||||||
ensure_hresult_ok(hr)?;
|
.await
|
||||||
|
.map_err(map_nmx)?;
|
||||||
|
ensure_hresult_ok(hr)?;
|
||||||
|
}
|
||||||
|
SubscriptionMode::Buffered {
|
||||||
|
rounded_interval_ms: _rounded_interval_ms,
|
||||||
|
item_definition,
|
||||||
|
item_context,
|
||||||
|
item_handle,
|
||||||
|
} => {
|
||||||
|
// F45: rebuild the original buffered registration
|
||||||
|
// body. The codec helper re-applies the
|
||||||
|
// `.property(buffer)` suffix idempotently — passing
|
||||||
|
// an already-suffixed name returns it unchanged
|
||||||
|
// (verified by
|
||||||
|
// `request_to_buffered_item_definition_idempotent_case_insensitive`
|
||||||
|
// in `mxaccess-codec`).
|
||||||
|
let buffered_def =
|
||||||
|
NmxReferenceRegistrationMessage::to_buffered_item_definition(
|
||||||
|
item_definition,
|
||||||
|
)
|
||||||
|
.map_err(|e| {
|
||||||
|
Error::Configuration(ConfigError::InvalidArgument {
|
||||||
|
detail: format!(
|
||||||
|
"recovery: buffered item definition: {e}"
|
||||||
|
),
|
||||||
|
})
|
||||||
|
})?;
|
||||||
|
let registration = NmxReferenceRegistrationMessage {
|
||||||
|
item_handle: *item_handle,
|
||||||
|
item_correlation_id: *correlation_id,
|
||||||
|
item_definition: buffered_def,
|
||||||
|
item_context: item_context.clone(),
|
||||||
|
subscribe: true,
|
||||||
|
reserved_25_27: [0; 2],
|
||||||
|
reserved_31_55: [0; 24],
|
||||||
|
};
|
||||||
|
let hr = replacement
|
||||||
|
.register_reference(
|
||||||
|
opts.local_engine_id,
|
||||||
|
&entry.metadata,
|
||||||
|
®istration,
|
||||||
|
opts.galaxy_id,
|
||||||
|
i32::from(opts.galaxy_id),
|
||||||
|
opts.source_platform_id,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(map_nmx)?;
|
||||||
|
ensure_hresult_ok(hr)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Step 5: atomic swap. The previous NmxClient drops at end of
|
// Step 5: atomic swap. The previous NmxClient drops at end of
|
||||||
@@ -1106,6 +1229,7 @@ impl Session {
|
|||||||
correlation_id,
|
correlation_id,
|
||||||
SubscriptionEntry {
|
SubscriptionEntry {
|
||||||
metadata: Arc::clone(&metadata_arc),
|
metadata: Arc::clone(&metadata_arc),
|
||||||
|
mode: SubscriptionMode::Plain,
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
reg.len()
|
reg.len()
|
||||||
@@ -1165,8 +1289,10 @@ impl Session {
|
|||||||
// computed for parity with the .NET reference; it is currently
|
// computed for parity with the .NET reference; it is currently
|
||||||
// not transmitted on the wire because native MXAccess holds it
|
// not transmitted on the wire because native MXAccess holds it
|
||||||
// client-side only (see capture 082's missing
|
// client-side only (see capture 082's missing
|
||||||
// `SetBufferedUpdateInterval` frame).
|
// `SetBufferedUpdateInterval` frame). F45 stashes the rounded
|
||||||
let _rounded_ms = options.rounded_update_interval_ms();
|
// cadence on the registry entry so a future SetBufferedUpdateInterval
|
||||||
|
// transmission could be wired without re-deriving it on replay.
|
||||||
|
let rounded_ms = options.rounded_update_interval_ms();
|
||||||
|
|
||||||
let inner = self.inner.clone();
|
let inner = self.inner.clone();
|
||||||
let metadata = inner
|
let metadata = inner
|
||||||
@@ -1221,20 +1347,25 @@ impl Session {
|
|||||||
drop(nmx);
|
drop(nmx);
|
||||||
|
|
||||||
let metadata_arc = Arc::new(metadata);
|
let metadata_arc = Arc::new(metadata);
|
||||||
// Record the active subscription so recover_connection can replay
|
// F45: tag the registry entry as Buffered with the original
|
||||||
// it after a transport rebuild. The replay path currently uses
|
// `(item_definition, item_context, item_handle)` triple +
|
||||||
// `AdviseSupervisory` for every entry; for buffered subscriptions
|
// rounded cadence, so `recover_connection_core` can rebuild the
|
||||||
// that path is functionally equivalent (the LMX server already
|
// matching `NmxReferenceRegistrationMessage` and dispatch
|
||||||
// remembers the buffered registration via the `.property(buffer)`
|
// `register_reference` (mirrors `MxNativeSession.ReAdviseSubscription`
|
||||||
// suffix carried in the metadata's name). Tracked as a sub-followup
|
// `cs:540-558` — the .NET reference's recovery branch on
|
||||||
// — see `design/followups.md` if a future iteration wants to
|
// `subscription.IsBuffered`).
|
||||||
// re-issue `RegisterReference` instead.
|
|
||||||
let registry_size = {
|
let registry_size = {
|
||||||
let mut reg = inner.subscriptions.lock().await;
|
let mut reg = inner.subscriptions.lock().await;
|
||||||
reg.insert(
|
reg.insert(
|
||||||
correlation_id,
|
correlation_id,
|
||||||
SubscriptionEntry {
|
SubscriptionEntry {
|
||||||
metadata: Arc::clone(&metadata_arc),
|
metadata: Arc::clone(&metadata_arc),
|
||||||
|
mode: SubscriptionMode::Buffered {
|
||||||
|
rounded_interval_ms: rounded_ms,
|
||||||
|
item_definition: reference.to_string(),
|
||||||
|
item_context: String::new(),
|
||||||
|
item_handle: 0,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
reg.len()
|
reg.len()
|
||||||
@@ -2496,6 +2627,262 @@ mod tests {
|
|||||||
handle.await.unwrap();
|
handle.await.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Per-RPC capture: `(opnum, stub_data)` for each Request PDU the
|
||||||
|
/// client dispatched against a [`recording_server`].
|
||||||
|
type RecordedRpc = (u16, Vec<u8>);
|
||||||
|
/// Shared, mutable log of [`RecordedRpc`]s appended to by a
|
||||||
|
/// [`recording_server`] task.
|
||||||
|
type RecordedRpcLog = Arc<StdMutex<Vec<RecordedRpc>>>;
|
||||||
|
|
||||||
|
/// Same as [`unauthenticated_server`] but additionally records every
|
||||||
|
/// incoming Request PDU's `(opnum, stub_data)` so a test can assert
|
||||||
|
/// which RPC the client dispatched. Used by the F45 buffered-recovery
|
||||||
|
/// branch test below.
|
||||||
|
async fn recording_server(
|
||||||
|
responses: Vec<(i32, Vec<u8>)>,
|
||||||
|
) -> (SocketAddr, RecordedRpcLog, tokio::task::JoinHandle<()>) {
|
||||||
|
let listener = TcpListener::bind(local_addr()).await.unwrap();
|
||||||
|
let addr = listener.local_addr().unwrap();
|
||||||
|
let recorded: RecordedRpcLog = Arc::new(StdMutex::new(Vec::new()));
|
||||||
|
let recorded_for_task = Arc::clone(&recorded);
|
||||||
|
let handle = tokio::spawn(async move {
|
||||||
|
let (mut sock, _) = listener.accept().await.unwrap();
|
||||||
|
// Drain Bind, reply BindAck.
|
||||||
|
let mut hdr = [0u8; 16];
|
||||||
|
sock.read_exact(&mut hdr).await.unwrap();
|
||||||
|
let bind_h = PduHeader::decode(&hdr).unwrap();
|
||||||
|
let mut body = vec![0u8; bind_h.fragment_length as usize - 16];
|
||||||
|
sock.read_exact(&mut body).await.unwrap();
|
||||||
|
let resp_h = PduHeader {
|
||||||
|
version: 5,
|
||||||
|
version_minor: 0,
|
||||||
|
packet_type: PacketType::BindAck,
|
||||||
|
packet_flags: 0x03,
|
||||||
|
data_representation: 0x10,
|
||||||
|
fragment_length: 16,
|
||||||
|
auth_length: 0,
|
||||||
|
call_id: bind_h.call_id,
|
||||||
|
};
|
||||||
|
let mut out = [0u8; 16];
|
||||||
|
resp_h.encode(&mut out).unwrap();
|
||||||
|
sock.write_all(&out).await.unwrap();
|
||||||
|
|
||||||
|
for (custom_hresult, extra_payload) in responses {
|
||||||
|
sock.read_exact(&mut hdr).await.unwrap();
|
||||||
|
let req_h = PduHeader::decode(&hdr).unwrap();
|
||||||
|
let mut body = vec![0u8; req_h.fragment_length as usize - 16];
|
||||||
|
sock.read_exact(&mut body).await.unwrap();
|
||||||
|
|
||||||
|
// body layout (Request PDU minus the 16-byte common
|
||||||
|
// header): 4 alloc_hint, 2 context_id, 2 opnum, then
|
||||||
|
// optional 16-byte object UUID (when PFC_OBJECT_UUID
|
||||||
|
// = 0x80 is set in packet_flags), then stub_data.
|
||||||
|
let opnum = u16::from_le_bytes([body[6], body[7]]);
|
||||||
|
let pfc_object_uuid = (req_h.packet_flags & 0x80) != 0;
|
||||||
|
let stub_offset = if pfc_object_uuid { 8 + 16 } else { 8 };
|
||||||
|
let stub = body[stub_offset..].to_vec();
|
||||||
|
recorded_for_task
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.push((opnum, stub));
|
||||||
|
|
||||||
|
let mut stub_resp = Vec::new();
|
||||||
|
stub_resp.extend_from_slice(&OrpcThat::default().encode());
|
||||||
|
stub_resp.extend_from_slice(&custom_hresult.to_le_bytes());
|
||||||
|
stub_resp.extend_from_slice(&extra_payload);
|
||||||
|
|
||||||
|
let response = ResponsePdu {
|
||||||
|
header: PduHeader {
|
||||||
|
version: 5,
|
||||||
|
version_minor: 0,
|
||||||
|
packet_type: PacketType::Response,
|
||||||
|
packet_flags: 0x03,
|
||||||
|
data_representation: 0x10,
|
||||||
|
fragment_length: 0,
|
||||||
|
auth_length: 0,
|
||||||
|
call_id: req_h.call_id,
|
||||||
|
},
|
||||||
|
allocation_hint: stub_resp.len() as u32,
|
||||||
|
context_id: 0,
|
||||||
|
cancel_count: 0,
|
||||||
|
reserved23: 0,
|
||||||
|
stub_data: stub_resp,
|
||||||
|
};
|
||||||
|
let bytes = response.encode();
|
||||||
|
sock.write_all(&bytes).await.unwrap();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
(addr, recorded, handle)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn recover_connection_replays_buffered_subscription_via_register_reference() {
|
||||||
|
use crate::RecoveryPolicy;
|
||||||
|
// F45: a buffered SubscriptionEntry must replay through
|
||||||
|
// `RegisterReference` (with `.property(buffer)` suffix + the
|
||||||
|
// saved correlation id + `subscribe = true`) — NOT through
|
||||||
|
// `AdviseSupervisory`. Synthesise the entry directly so we don't
|
||||||
|
// need a live `subscribe_buffered` round-trip; drive the recovery
|
||||||
|
// path against a recording mock and inspect the wire bytes.
|
||||||
|
|
||||||
|
// Mock that the original session connection talked to. Drained
|
||||||
|
// immediately — connect_test_session doesn't issue any calls.
|
||||||
|
let (addr_orig, handle_orig) = unauthenticated_server(Vec::new()).await;
|
||||||
|
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[(
|
||||||
|
"TestObj.TestInt",
|
||||||
|
sample_metadata(),
|
||||||
|
)]));
|
||||||
|
let session = connect_test_session(addr_orig, resolver).await.unwrap();
|
||||||
|
|
||||||
|
// Inject a buffered registry entry. Correlation id is fixed so
|
||||||
|
// we can assert the rebuilt registration carries the same id.
|
||||||
|
let cid: [u8; 16] = [0xAB; 16];
|
||||||
|
let buffered_ref = "TestObj.TestInt";
|
||||||
|
{
|
||||||
|
let mut reg = session.inner.subscriptions.lock().await;
|
||||||
|
reg.insert(
|
||||||
|
cid,
|
||||||
|
SubscriptionEntry {
|
||||||
|
metadata: Arc::new(sample_metadata()),
|
||||||
|
mode: SubscriptionMode::Buffered {
|
||||||
|
rounded_interval_ms: 1000,
|
||||||
|
item_definition: buffered_ref.to_string(),
|
||||||
|
item_context: String::new(),
|
||||||
|
item_handle: 0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Recording server that the rebuild factory will hand the
|
||||||
|
// session a fresh NmxClient pointing at. The recovery loop fires
|
||||||
|
// exactly two RPCs against this transport:
|
||||||
|
// 1. RegisterEngine2 (HRESULT 0)
|
||||||
|
// 2. TransferData carrying the rebuilt RegisterReference
|
||||||
|
// (HRESULT 0)
|
||||||
|
// The assertions below decode the second stub and pin the
|
||||||
|
// envelope kind + inner registration shape.
|
||||||
|
let (addr_replacement, recorded, handle_replacement) =
|
||||||
|
recording_server(vec![(0, Vec::new()), (0, Vec::new())]).await;
|
||||||
|
|
||||||
|
let factory: crate::RebuildFactory = Arc::new(move || {
|
||||||
|
let addr = addr_replacement;
|
||||||
|
Box::pin(async move {
|
||||||
|
let mut transport = DceRpcTcpClient::connect(addr).await.map_err(|e| {
|
||||||
|
mxaccess_nmx::NmxClientError::Transport(
|
||||||
|
mxaccess_rpc::transport::TransportError::Io(std::io::Error::other(
|
||||||
|
e.to_string(),
|
||||||
|
)),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
transport.bind(svc::INTERFACE_ID, 0, 0).await.map_err(|e| {
|
||||||
|
mxaccess_nmx::NmxClientError::Transport(
|
||||||
|
mxaccess_rpc::transport::TransportError::Io(std::io::Error::other(
|
||||||
|
e.to_string(),
|
||||||
|
)),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
Ok(NmxClient::from_bound_transport(
|
||||||
|
transport,
|
||||||
|
Guid::new([0xCC; 16]),
|
||||||
|
))
|
||||||
|
})
|
||||||
|
});
|
||||||
|
session.set_recovery_factory(factory).await;
|
||||||
|
|
||||||
|
// Drive the recovery cycle.
|
||||||
|
session
|
||||||
|
.recover_connection(RecoveryPolicy {
|
||||||
|
max_attempts: 1,
|
||||||
|
delay: std::time::Duration::ZERO,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// Inspect the recorded RPCs against the replacement server.
|
||||||
|
let recorded = recorded.lock().unwrap().clone();
|
||||||
|
assert_eq!(
|
||||||
|
recorded.len(),
|
||||||
|
2,
|
||||||
|
"expected RegisterEngine2 + TransferData(RegisterReference); got {}",
|
||||||
|
recorded.len()
|
||||||
|
);
|
||||||
|
|
||||||
|
// Slot 0 is RegisterEngine2 — pin only the opnum.
|
||||||
|
assert_eq!(
|
||||||
|
recorded[0].0,
|
||||||
|
mxaccess_rpc::nmx_service2_messages::REGISTER_ENGINE_2_OPNUM,
|
||||||
|
"first replay RPC should be RegisterEngine2"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Slot 1 is TransferData carrying the rebuilt RegisterReference.
|
||||||
|
let (opnum, stub) = &recorded[1];
|
||||||
|
assert_eq!(
|
||||||
|
*opnum,
|
||||||
|
mxaccess_rpc::nmx_service2_messages::TRANSFER_DATA_OPNUM,
|
||||||
|
"buffered replay RPC must be TransferData (not, e.g., a no-op)"
|
||||||
|
);
|
||||||
|
|
||||||
|
// The transfer_data stub layout:
|
||||||
|
// 0..32 OrpcThis
|
||||||
|
// 32..36 remote_galaxy_id i32 LE
|
||||||
|
// 36..40 remote_platform_id i32 LE
|
||||||
|
// 40..44 remote_engine_id i32 LE
|
||||||
|
// 44..48 length i32 LE
|
||||||
|
// 48..52 max_count i32 LE
|
||||||
|
// 52..(52+L) message_body (NmxTransferEnvelope + inner)
|
||||||
|
let body_offset = 52;
|
||||||
|
let length = i32::from_le_bytes(stub[44..48].try_into().unwrap()) as usize;
|
||||||
|
let message_body = &stub[body_offset..body_offset + length];
|
||||||
|
|
||||||
|
// Envelope kind at offset 10 of the envelope; ItemControl = 2
|
||||||
|
// (the codec routes both AdviseSupervisory and RegisterReference
|
||||||
|
// through ItemControl envelopes — the inner command byte is
|
||||||
|
// what disambiguates).
|
||||||
|
let envelope_kind_i32 = i32::from_le_bytes(message_body[10..14].try_into().unwrap());
|
||||||
|
assert_eq!(
|
||||||
|
envelope_kind_i32,
|
||||||
|
NmxTransferMessageKind::ItemControl as i32,
|
||||||
|
"envelope kind must be ItemControl"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Inner body starts after the 46-byte envelope. The first byte
|
||||||
|
// distinguishes AdviseSupervisory (0x1f) from RegisterReference
|
||||||
|
// (0x10) — F45 specifically requires the buffered branch to
|
||||||
|
// emit the 0x10 form.
|
||||||
|
let inner = &message_body[NmxTransferEnvelope::HEADER_LEN..];
|
||||||
|
assert_eq!(
|
||||||
|
inner[0], 0x10,
|
||||||
|
"buffered replay must use RegisterReference (command 0x10), not \
|
||||||
|
AdviseSupervisory (0x1f); got 0x{:02x}",
|
||||||
|
inner[0]
|
||||||
|
);
|
||||||
|
|
||||||
|
// Decode the inner registration and pin: correlation id matches
|
||||||
|
// the registry entry's, item_definition carries `.property(buffer)`,
|
||||||
|
// and `subscribe == true`.
|
||||||
|
let parsed = NmxReferenceRegistrationMessage::parse(inner).unwrap();
|
||||||
|
assert_eq!(
|
||||||
|
parsed.item_correlation_id, cid,
|
||||||
|
"rebuilt registration must carry the original correlation id"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
parsed
|
||||||
|
.item_definition
|
||||||
|
.to_lowercase()
|
||||||
|
.ends_with(".property(buffer)"),
|
||||||
|
"rebuilt item_definition must end with .property(buffer); got {:?}",
|
||||||
|
parsed.item_definition
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
parsed.subscribe,
|
||||||
|
"rebuilt registration must have subscribe = true"
|
||||||
|
);
|
||||||
|
|
||||||
|
handle_replacement.await.unwrap();
|
||||||
|
handle_orig.abort();
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn recover_connection_after_shutdown_returns_engine_not_registered() {
|
async fn recover_connection_after_shutdown_returns_engine_not_registered() {
|
||||||
use crate::RecoveryPolicy;
|
use crate::RecoveryPolicy;
|
||||||
|
|||||||
Reference in New Issue
Block a user