[F45] mxaccess: recovery replay re-issues RegisterReference for buffered subs
`Session::recover_connection_core` previously walked
`SessionInner::subscriptions` and replayed every entry via
`AdviseSupervisory`, which lost the `.property(buffer)` registration
on buffered subscriptions — silently downgrading buffered → plain on
transport rebuild.
Fix:
- New `pub(crate) enum SubscriptionMode { Plain, Buffered { ... } }`
discriminator carried on each `SubscriptionEntry`. Buffered variant
retains the un-suffixed reference + the rounded interval (so the
re-issued buffered registration matches the original cadence) +
the empty `item_context` / zero `item_handle` matching the wire
send.
- `Session::subscribe` (plain path) records `SubscriptionMode::Plain`.
`subscribe_buffered_nmx` records `SubscriptionMode::Buffered { ... }`.
- `recover_connection_core` matches on `entry.mode`. Plain branch
unchanged. Buffered branch re-applies `.property(buffer)` via
`to_buffered_item_definition` (idempotent), rebuilds the original
`NmxReferenceRegistrationMessage` with the saved correlation id +
`subscribe = true`, and dispatches `register_reference` (kind=
ItemControl, inner command 0x10) against the replacement
transport. Mirrors `MxNativeSession.ReAdviseSubscription`
(`MxNativeSession.cs:538-569`).
New unit test `recover_connection_replays_buffered_subscription_via_
register_reference` synthesises a buffered registry entry, installs a
`RebuildFactory` pointing at a recording NMX server, drives
`recover_connection`, then asserts the recorded `TransferData` carries
inner command `0x10` (NOT `0x1f`) with the `.property(buffer)`-
suffixed item_definition + the saved correlation id + subscribe=true.
Side-finding worth filing separately: `Session::unsubscribe`
unconditionally calls `un_advise` for both plain and buffered
entries, but the .NET reference's `Unsubscribe`
(`MxNativeSession.cs:361-381`) skips `UnAdvise` for buffered
(`if (!subscription.IsBuffered)`). Out of scope for F45 (recovery-
only); will file as F47.
Public API unchanged. `SubscriptionMode` + `SubscriptionEntry` stay
`pub(crate)` — `cargo public-api -p mxaccess` baseline is unchanged.
Workspace 793 → 794 tests; clippy clean; rustdoc clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -372,10 +372,71 @@ pub struct SessionInner {
|
||||
/// Per-subscription state retained for [`Session::recover_connection`].
|
||||
/// The full `Subscription` handle stays with the consumer and continues
|
||||
/// to receive broadcasts across the swap; this struct just preserves
|
||||
/// the inputs `advise_supervisory` needs.
|
||||
/// the inputs the recovery loop needs to reissue the original advise.
|
||||
///
|
||||
/// `mode` discriminates the original advise shape (plain
|
||||
/// `AdviseSupervisory` vs buffered `RegisterReference` with the
|
||||
/// `.property(buffer)` suffix) so the recovery branch can re-issue
|
||||
/// the matching wire op — see F45 in `design/followups.md` for the
|
||||
/// motivation. Mirrors `MxNativeSubscription.IsBuffered`
|
||||
/// (`MxNativeSession.cs:59`) + the `ReAdviseSubscription` branch
|
||||
/// (`MxNativeSession.cs:538-569`).
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct SubscriptionEntry {
|
||||
pub(crate) metadata: Arc<mxaccess_galaxy::GalaxyTagMetadata>,
|
||||
pub(crate) mode: SubscriptionMode,
|
||||
}
|
||||
|
||||
/// Discriminator for [`SubscriptionEntry`] — captures the original
|
||||
/// advise shape so the recovery loop re-issues the same wire op.
|
||||
///
|
||||
/// - [`SubscriptionMode::Plain`] entries replay via
|
||||
/// `INmxService2::AdviseSupervisory` (matches the original
|
||||
/// `Session::subscribe` path).
|
||||
/// - [`SubscriptionMode::Buffered`] entries replay via
|
||||
/// `INmxService2::RegisterReference` with the `.property(buffer)`-
|
||||
/// suffixed item definition + the saved correlation id +
|
||||
/// `subscribe = true` (matches the original
|
||||
/// `Session::subscribe_buffered` path; mirrors
|
||||
/// `MxNativeSession.ReAdviseSubscription` `cs:538-558`).
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) enum SubscriptionMode {
|
||||
/// Plain `AdviseSupervisory`-issued subscription. Recovery replays
|
||||
/// it via `advise_supervisory`.
|
||||
Plain,
|
||||
/// Buffered `RegisterReference`-issued subscription. Recovery
|
||||
/// rebuilds the original `NmxReferenceRegistrationMessage` and
|
||||
/// dispatches `register_reference` so the server-side buffered
|
||||
/// registration survives the transport swap.
|
||||
///
|
||||
/// The fields preserve the inputs `subscribe_buffered_nmx` used:
|
||||
/// the un-suffixed `item_definition` (re-suffixed via
|
||||
/// [`NmxReferenceRegistrationMessage::to_buffered_item_definition`]
|
||||
/// on replay), the `item_context` (currently always empty —
|
||||
/// reserved for the compat-server F35 split-context form), the
|
||||
/// `item_handle` (currently always 0), and the rounded-up cadence
|
||||
/// in milliseconds (informational; native MXAccess does not transmit
|
||||
/// it on the wire — see capture 082).
|
||||
Buffered {
|
||||
/// Cadence in milliseconds, already rounded up to the nearest
|
||||
/// 100 ms via [`crate::BufferedOptions::rounded_update_interval_ms`].
|
||||
/// Carried through recovery so a future SetBufferedUpdateInterval
|
||||
/// transmission could be wired without losing the original
|
||||
/// cadence.
|
||||
rounded_interval_ms: u32,
|
||||
/// Un-suffixed item definition as supplied to
|
||||
/// [`Session::subscribe_buffered`]. The `.property(buffer)`
|
||||
/// suffix is re-applied on replay via
|
||||
/// [`NmxReferenceRegistrationMessage::to_buffered_item_definition`].
|
||||
item_definition: String,
|
||||
/// Item context (compat-server split form). Empty when
|
||||
/// `subscribe_buffered` is called directly with a single
|
||||
/// `reference` argument.
|
||||
item_context: String,
|
||||
/// Item handle (LMX-side identifier). 0 when the compat-server
|
||||
/// layer has not assigned one.
|
||||
item_handle: i32,
|
||||
},
|
||||
}
|
||||
|
||||
/// F16 — pluggable factory that produces a fresh [`NmxClient`].
|
||||
@@ -742,9 +803,14 @@ impl Session {
|
||||
/// server side knows about the same local callback exporter.
|
||||
/// 3. Re-run `SetHeartbeatSendInterval` if configured.
|
||||
/// 4. Walk the [`SessionInner::subscriptions`] registry and re-issue
|
||||
/// `AdviseSupervisory` for every active subscription (each
|
||||
/// the matching wire op for every active subscription. Plain
|
||||
/// entries replay via `AdviseSupervisory`; buffered entries
|
||||
/// (F45) replay via `RegisterReference` with the
|
||||
/// `.property(buffer)` suffix + the saved correlation id +
|
||||
/// `subscribe = true` — mirroring
|
||||
/// `MxNativeSession.ReAdviseSubscription` (`cs:538-569`). Each
|
||||
/// correlation_id is preserved so the consumer's `Subscription`
|
||||
/// handle keeps receiving on its existing broadcast filter).
|
||||
/// handle keeps receiving on its existing broadcast filter.
|
||||
/// 5. Atomically swap the inner mutex's `NmxClient` so the old one
|
||||
/// drops at end-of-scope.
|
||||
///
|
||||
@@ -786,12 +852,19 @@ impl Session {
|
||||
}
|
||||
}
|
||||
|
||||
// Step 4: replay every active subscription's AdviseSupervisory
|
||||
// against the replacement transport. Snapshot under the lock,
|
||||
// then drop the lock so subscribe()/un_advise() can race with
|
||||
// recovery without deadlocking. The atomic swap below installs
|
||||
// the replacement before we yield; after that, any new
|
||||
// subscribe() call will see the registry+replacement pair.
|
||||
// Step 4: replay every active subscription against the
|
||||
// replacement transport. Plain entries → `AdviseSupervisory`;
|
||||
// buffered entries (F45) → `RegisterReference` with the
|
||||
// `.property(buffer)` suffix + saved correlation id +
|
||||
// `subscribe = true`. Mirrors
|
||||
// `MxNativeSession.ReAdviseSubscription` (`cs:538-569`) which
|
||||
// branches on `subscription.IsBuffered`.
|
||||
//
|
||||
// Snapshot under the lock, then drop it so subscribe() /
|
||||
// un_advise() can race with recovery without deadlocking. The
|
||||
// atomic swap below installs the replacement before we yield;
|
||||
// after that, any new subscribe() call will see the
|
||||
// registry+replacement pair.
|
||||
let snapshot: Vec<([u8; 16], SubscriptionEntry)> = {
|
||||
let registry = self.inner.subscriptions.lock().await;
|
||||
registry
|
||||
@@ -800,18 +873,68 @@ impl Session {
|
||||
.collect()
|
||||
};
|
||||
for (correlation_id, entry) in &snapshot {
|
||||
let hr = replacement
|
||||
.advise_supervisory(
|
||||
opts.local_engine_id,
|
||||
&entry.metadata,
|
||||
*correlation_id,
|
||||
opts.galaxy_id,
|
||||
i32::from(opts.galaxy_id),
|
||||
opts.source_platform_id,
|
||||
)
|
||||
.await
|
||||
.map_err(map_nmx)?;
|
||||
ensure_hresult_ok(hr)?;
|
||||
match &entry.mode {
|
||||
SubscriptionMode::Plain => {
|
||||
let hr = replacement
|
||||
.advise_supervisory(
|
||||
opts.local_engine_id,
|
||||
&entry.metadata,
|
||||
*correlation_id,
|
||||
opts.galaxy_id,
|
||||
i32::from(opts.galaxy_id),
|
||||
opts.source_platform_id,
|
||||
)
|
||||
.await
|
||||
.map_err(map_nmx)?;
|
||||
ensure_hresult_ok(hr)?;
|
||||
}
|
||||
SubscriptionMode::Buffered {
|
||||
rounded_interval_ms: _rounded_interval_ms,
|
||||
item_definition,
|
||||
item_context,
|
||||
item_handle,
|
||||
} => {
|
||||
// F45: rebuild the original buffered registration
|
||||
// body. The codec helper re-applies the
|
||||
// `.property(buffer)` suffix idempotently — passing
|
||||
// an already-suffixed name returns it unchanged
|
||||
// (verified by
|
||||
// `request_to_buffered_item_definition_idempotent_case_insensitive`
|
||||
// in `mxaccess-codec`).
|
||||
let buffered_def =
|
||||
NmxReferenceRegistrationMessage::to_buffered_item_definition(
|
||||
item_definition,
|
||||
)
|
||||
.map_err(|e| {
|
||||
Error::Configuration(ConfigError::InvalidArgument {
|
||||
detail: format!(
|
||||
"recovery: buffered item definition: {e}"
|
||||
),
|
||||
})
|
||||
})?;
|
||||
let registration = NmxReferenceRegistrationMessage {
|
||||
item_handle: *item_handle,
|
||||
item_correlation_id: *correlation_id,
|
||||
item_definition: buffered_def,
|
||||
item_context: item_context.clone(),
|
||||
subscribe: true,
|
||||
reserved_25_27: [0; 2],
|
||||
reserved_31_55: [0; 24],
|
||||
};
|
||||
let hr = replacement
|
||||
.register_reference(
|
||||
opts.local_engine_id,
|
||||
&entry.metadata,
|
||||
®istration,
|
||||
opts.galaxy_id,
|
||||
i32::from(opts.galaxy_id),
|
||||
opts.source_platform_id,
|
||||
)
|
||||
.await
|
||||
.map_err(map_nmx)?;
|
||||
ensure_hresult_ok(hr)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Step 5: atomic swap. The previous NmxClient drops at end of
|
||||
@@ -1106,6 +1229,7 @@ impl Session {
|
||||
correlation_id,
|
||||
SubscriptionEntry {
|
||||
metadata: Arc::clone(&metadata_arc),
|
||||
mode: SubscriptionMode::Plain,
|
||||
},
|
||||
);
|
||||
reg.len()
|
||||
@@ -1165,8 +1289,10 @@ impl Session {
|
||||
// computed for parity with the .NET reference; it is currently
|
||||
// not transmitted on the wire because native MXAccess holds it
|
||||
// client-side only (see capture 082's missing
|
||||
// `SetBufferedUpdateInterval` frame).
|
||||
let _rounded_ms = options.rounded_update_interval_ms();
|
||||
// `SetBufferedUpdateInterval` frame). F45 stashes the rounded
|
||||
// cadence on the registry entry so a future SetBufferedUpdateInterval
|
||||
// transmission could be wired without re-deriving it on replay.
|
||||
let rounded_ms = options.rounded_update_interval_ms();
|
||||
|
||||
let inner = self.inner.clone();
|
||||
let metadata = inner
|
||||
@@ -1221,20 +1347,25 @@ impl Session {
|
||||
drop(nmx);
|
||||
|
||||
let metadata_arc = Arc::new(metadata);
|
||||
// Record the active subscription so recover_connection can replay
|
||||
// it after a transport rebuild. The replay path currently uses
|
||||
// `AdviseSupervisory` for every entry; for buffered subscriptions
|
||||
// that path is functionally equivalent (the LMX server already
|
||||
// remembers the buffered registration via the `.property(buffer)`
|
||||
// suffix carried in the metadata's name). Tracked as a sub-followup
|
||||
// — see `design/followups.md` if a future iteration wants to
|
||||
// re-issue `RegisterReference` instead.
|
||||
// F45: tag the registry entry as Buffered with the original
|
||||
// `(item_definition, item_context, item_handle)` triple +
|
||||
// rounded cadence, so `recover_connection_core` can rebuild the
|
||||
// matching `NmxReferenceRegistrationMessage` and dispatch
|
||||
// `register_reference` (mirrors `MxNativeSession.ReAdviseSubscription`
|
||||
// `cs:540-558` — the .NET reference's recovery branch on
|
||||
// `subscription.IsBuffered`).
|
||||
let registry_size = {
|
||||
let mut reg = inner.subscriptions.lock().await;
|
||||
reg.insert(
|
||||
correlation_id,
|
||||
SubscriptionEntry {
|
||||
metadata: Arc::clone(&metadata_arc),
|
||||
mode: SubscriptionMode::Buffered {
|
||||
rounded_interval_ms: rounded_ms,
|
||||
item_definition: reference.to_string(),
|
||||
item_context: String::new(),
|
||||
item_handle: 0,
|
||||
},
|
||||
},
|
||||
);
|
||||
reg.len()
|
||||
@@ -2496,6 +2627,262 @@ mod tests {
|
||||
handle.await.unwrap();
|
||||
}
|
||||
|
||||
/// Per-RPC capture: `(opnum, stub_data)` for each Request PDU the
|
||||
/// client dispatched against a [`recording_server`].
|
||||
type RecordedRpc = (u16, Vec<u8>);
|
||||
/// Shared, mutable log of [`RecordedRpc`]s appended to by a
|
||||
/// [`recording_server`] task.
|
||||
type RecordedRpcLog = Arc<StdMutex<Vec<RecordedRpc>>>;
|
||||
|
||||
/// Same as [`unauthenticated_server`] but additionally records every
|
||||
/// incoming Request PDU's `(opnum, stub_data)` so a test can assert
|
||||
/// which RPC the client dispatched. Used by the F45 buffered-recovery
|
||||
/// branch test below.
|
||||
async fn recording_server(
|
||||
responses: Vec<(i32, Vec<u8>)>,
|
||||
) -> (SocketAddr, RecordedRpcLog, tokio::task::JoinHandle<()>) {
|
||||
let listener = TcpListener::bind(local_addr()).await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
let recorded: RecordedRpcLog = Arc::new(StdMutex::new(Vec::new()));
|
||||
let recorded_for_task = Arc::clone(&recorded);
|
||||
let handle = tokio::spawn(async move {
|
||||
let (mut sock, _) = listener.accept().await.unwrap();
|
||||
// Drain Bind, reply BindAck.
|
||||
let mut hdr = [0u8; 16];
|
||||
sock.read_exact(&mut hdr).await.unwrap();
|
||||
let bind_h = PduHeader::decode(&hdr).unwrap();
|
||||
let mut body = vec![0u8; bind_h.fragment_length as usize - 16];
|
||||
sock.read_exact(&mut body).await.unwrap();
|
||||
let resp_h = PduHeader {
|
||||
version: 5,
|
||||
version_minor: 0,
|
||||
packet_type: PacketType::BindAck,
|
||||
packet_flags: 0x03,
|
||||
data_representation: 0x10,
|
||||
fragment_length: 16,
|
||||
auth_length: 0,
|
||||
call_id: bind_h.call_id,
|
||||
};
|
||||
let mut out = [0u8; 16];
|
||||
resp_h.encode(&mut out).unwrap();
|
||||
sock.write_all(&out).await.unwrap();
|
||||
|
||||
for (custom_hresult, extra_payload) in responses {
|
||||
sock.read_exact(&mut hdr).await.unwrap();
|
||||
let req_h = PduHeader::decode(&hdr).unwrap();
|
||||
let mut body = vec![0u8; req_h.fragment_length as usize - 16];
|
||||
sock.read_exact(&mut body).await.unwrap();
|
||||
|
||||
// body layout (Request PDU minus the 16-byte common
|
||||
// header): 4 alloc_hint, 2 context_id, 2 opnum, then
|
||||
// optional 16-byte object UUID (when PFC_OBJECT_UUID
|
||||
// = 0x80 is set in packet_flags), then stub_data.
|
||||
let opnum = u16::from_le_bytes([body[6], body[7]]);
|
||||
let pfc_object_uuid = (req_h.packet_flags & 0x80) != 0;
|
||||
let stub_offset = if pfc_object_uuid { 8 + 16 } else { 8 };
|
||||
let stub = body[stub_offset..].to_vec();
|
||||
recorded_for_task
|
||||
.lock()
|
||||
.unwrap()
|
||||
.push((opnum, stub));
|
||||
|
||||
let mut stub_resp = Vec::new();
|
||||
stub_resp.extend_from_slice(&OrpcThat::default().encode());
|
||||
stub_resp.extend_from_slice(&custom_hresult.to_le_bytes());
|
||||
stub_resp.extend_from_slice(&extra_payload);
|
||||
|
||||
let response = ResponsePdu {
|
||||
header: PduHeader {
|
||||
version: 5,
|
||||
version_minor: 0,
|
||||
packet_type: PacketType::Response,
|
||||
packet_flags: 0x03,
|
||||
data_representation: 0x10,
|
||||
fragment_length: 0,
|
||||
auth_length: 0,
|
||||
call_id: req_h.call_id,
|
||||
},
|
||||
allocation_hint: stub_resp.len() as u32,
|
||||
context_id: 0,
|
||||
cancel_count: 0,
|
||||
reserved23: 0,
|
||||
stub_data: stub_resp,
|
||||
};
|
||||
let bytes = response.encode();
|
||||
sock.write_all(&bytes).await.unwrap();
|
||||
}
|
||||
});
|
||||
(addr, recorded, handle)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn recover_connection_replays_buffered_subscription_via_register_reference() {
|
||||
use crate::RecoveryPolicy;
|
||||
// F45: a buffered SubscriptionEntry must replay through
|
||||
// `RegisterReference` (with `.property(buffer)` suffix + the
|
||||
// saved correlation id + `subscribe = true`) — NOT through
|
||||
// `AdviseSupervisory`. Synthesise the entry directly so we don't
|
||||
// need a live `subscribe_buffered` round-trip; drive the recovery
|
||||
// path against a recording mock and inspect the wire bytes.
|
||||
|
||||
// Mock that the original session connection talked to. Drained
|
||||
// immediately — connect_test_session doesn't issue any calls.
|
||||
let (addr_orig, handle_orig) = unauthenticated_server(Vec::new()).await;
|
||||
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[(
|
||||
"TestObj.TestInt",
|
||||
sample_metadata(),
|
||||
)]));
|
||||
let session = connect_test_session(addr_orig, resolver).await.unwrap();
|
||||
|
||||
// Inject a buffered registry entry. Correlation id is fixed so
|
||||
// we can assert the rebuilt registration carries the same id.
|
||||
let cid: [u8; 16] = [0xAB; 16];
|
||||
let buffered_ref = "TestObj.TestInt";
|
||||
{
|
||||
let mut reg = session.inner.subscriptions.lock().await;
|
||||
reg.insert(
|
||||
cid,
|
||||
SubscriptionEntry {
|
||||
metadata: Arc::new(sample_metadata()),
|
||||
mode: SubscriptionMode::Buffered {
|
||||
rounded_interval_ms: 1000,
|
||||
item_definition: buffered_ref.to_string(),
|
||||
item_context: String::new(),
|
||||
item_handle: 0,
|
||||
},
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
// Recording server that the rebuild factory will hand the
|
||||
// session a fresh NmxClient pointing at. The recovery loop fires
|
||||
// exactly two RPCs against this transport:
|
||||
// 1. RegisterEngine2 (HRESULT 0)
|
||||
// 2. TransferData carrying the rebuilt RegisterReference
|
||||
// (HRESULT 0)
|
||||
// The assertions below decode the second stub and pin the
|
||||
// envelope kind + inner registration shape.
|
||||
let (addr_replacement, recorded, handle_replacement) =
|
||||
recording_server(vec![(0, Vec::new()), (0, Vec::new())]).await;
|
||||
|
||||
let factory: crate::RebuildFactory = Arc::new(move || {
|
||||
let addr = addr_replacement;
|
||||
Box::pin(async move {
|
||||
let mut transport = DceRpcTcpClient::connect(addr).await.map_err(|e| {
|
||||
mxaccess_nmx::NmxClientError::Transport(
|
||||
mxaccess_rpc::transport::TransportError::Io(std::io::Error::other(
|
||||
e.to_string(),
|
||||
)),
|
||||
)
|
||||
})?;
|
||||
transport.bind(svc::INTERFACE_ID, 0, 0).await.map_err(|e| {
|
||||
mxaccess_nmx::NmxClientError::Transport(
|
||||
mxaccess_rpc::transport::TransportError::Io(std::io::Error::other(
|
||||
e.to_string(),
|
||||
)),
|
||||
)
|
||||
})?;
|
||||
Ok(NmxClient::from_bound_transport(
|
||||
transport,
|
||||
Guid::new([0xCC; 16]),
|
||||
))
|
||||
})
|
||||
});
|
||||
session.set_recovery_factory(factory).await;
|
||||
|
||||
// Drive the recovery cycle.
|
||||
session
|
||||
.recover_connection(RecoveryPolicy {
|
||||
max_attempts: 1,
|
||||
delay: std::time::Duration::ZERO,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Inspect the recorded RPCs against the replacement server.
|
||||
let recorded = recorded.lock().unwrap().clone();
|
||||
assert_eq!(
|
||||
recorded.len(),
|
||||
2,
|
||||
"expected RegisterEngine2 + TransferData(RegisterReference); got {}",
|
||||
recorded.len()
|
||||
);
|
||||
|
||||
// Slot 0 is RegisterEngine2 — pin only the opnum.
|
||||
assert_eq!(
|
||||
recorded[0].0,
|
||||
mxaccess_rpc::nmx_service2_messages::REGISTER_ENGINE_2_OPNUM,
|
||||
"first replay RPC should be RegisterEngine2"
|
||||
);
|
||||
|
||||
// Slot 1 is TransferData carrying the rebuilt RegisterReference.
|
||||
let (opnum, stub) = &recorded[1];
|
||||
assert_eq!(
|
||||
*opnum,
|
||||
mxaccess_rpc::nmx_service2_messages::TRANSFER_DATA_OPNUM,
|
||||
"buffered replay RPC must be TransferData (not, e.g., a no-op)"
|
||||
);
|
||||
|
||||
// The transfer_data stub layout:
|
||||
// 0..32 OrpcThis
|
||||
// 32..36 remote_galaxy_id i32 LE
|
||||
// 36..40 remote_platform_id i32 LE
|
||||
// 40..44 remote_engine_id i32 LE
|
||||
// 44..48 length i32 LE
|
||||
// 48..52 max_count i32 LE
|
||||
// 52..(52+L) message_body (NmxTransferEnvelope + inner)
|
||||
let body_offset = 52;
|
||||
let length = i32::from_le_bytes(stub[44..48].try_into().unwrap()) as usize;
|
||||
let message_body = &stub[body_offset..body_offset + length];
|
||||
|
||||
// Envelope kind at offset 10 of the envelope; ItemControl = 2
|
||||
// (the codec routes both AdviseSupervisory and RegisterReference
|
||||
// through ItemControl envelopes — the inner command byte is
|
||||
// what disambiguates).
|
||||
let envelope_kind_i32 = i32::from_le_bytes(message_body[10..14].try_into().unwrap());
|
||||
assert_eq!(
|
||||
envelope_kind_i32,
|
||||
NmxTransferMessageKind::ItemControl as i32,
|
||||
"envelope kind must be ItemControl"
|
||||
);
|
||||
|
||||
// Inner body starts after the 46-byte envelope. The first byte
|
||||
// distinguishes AdviseSupervisory (0x1f) from RegisterReference
|
||||
// (0x10) — F45 specifically requires the buffered branch to
|
||||
// emit the 0x10 form.
|
||||
let inner = &message_body[NmxTransferEnvelope::HEADER_LEN..];
|
||||
assert_eq!(
|
||||
inner[0], 0x10,
|
||||
"buffered replay must use RegisterReference (command 0x10), not \
|
||||
AdviseSupervisory (0x1f); got 0x{:02x}",
|
||||
inner[0]
|
||||
);
|
||||
|
||||
// Decode the inner registration and pin: correlation id matches
|
||||
// the registry entry's, item_definition carries `.property(buffer)`,
|
||||
// and `subscribe == true`.
|
||||
let parsed = NmxReferenceRegistrationMessage::parse(inner).unwrap();
|
||||
assert_eq!(
|
||||
parsed.item_correlation_id, cid,
|
||||
"rebuilt registration must carry the original correlation id"
|
||||
);
|
||||
assert!(
|
||||
parsed
|
||||
.item_definition
|
||||
.to_lowercase()
|
||||
.ends_with(".property(buffer)"),
|
||||
"rebuilt item_definition must end with .property(buffer); got {:?}",
|
||||
parsed.item_definition
|
||||
);
|
||||
assert!(
|
||||
parsed.subscribe,
|
||||
"rebuilt registration must have subscribe = true"
|
||||
);
|
||||
|
||||
handle_replacement.await.unwrap();
|
||||
handle_orig.abort();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn recover_connection_after_shutdown_returns_engine_not_registered() {
|
||||
use crate::RecoveryPolicy;
|
||||
|
||||
Reference in New Issue
Block a user