[R3/R4 Path A] mxaccess: port Lmx.dll FUN_10100ce0 synthesizer kernel
rust / build / test / clippy / fmt (push) Has been cancelled
rust / cargo public-api drift check (F41) (push) Has been cancelled

Path A landed for R3/R4. The byte->MxStatus synthesizer in Lmx.dll is
FUN_10100ce0 (`analysis/ghidra/exports/Lmx.dll.synthesizer-helpers2-decompile.md`),
a 4-byte u32 LE -> 4-tuple MxStatus decoder used by every NMX-frame
parser in Lmx.dll. The kernel is byte-deterministic and context-free,
so it ports as a pure function -- the operation-tracking state
machine the original verdict deferred is NOT required for synthesis.

Bit layout (per FUN_10100ce0 lines 21-24):
  bit 31:        success    (-1 if set, 0 if clear)
  bits 27..24:   category   (4 bits)
  bits 23..20:   detected_by (4 bits)
  bits 15..0:    detail     (i16 -- low 16 bits, signed)
  bits 30..28, 19..16: reserved/padding

Codec changes:
- MxStatus::from_packed_u32() / ::to_packed_u32() -- the kernel +
  inverse for round-trip parity.
- MxStatus::from_nmx_response_code() -- the constructed-from-response-
  code switch in FUN_1010bd10:741-770 (six proven mappings: 0x01, 0x02
  -> CommunicationError + RequestingNmx; 0x03 -> ConfigurationError +
  RequestingNmx; 0x04 -> ConfigurationError + RespondingNmx; 0x05 ->
  CommunicationError + RespondingNmx; 0x1A -> CommunicationError +
  RequestingNmx).
- MxStatusCategory / MxStatusSource: from_i16/to_i16 promoted to const
  fn so MxStatus::from_packed_u32 can be const.
- NmxOperationStatusMessage::try_parse_process_data_received_body() --
  thin wrapper that peels the outer NmxObservedEnvelope before
  delegating to try_parse_inner. Mirrors
  NmxOperationStatusMessage.TryParseProcessDataReceivedBody (.NET cs:20-32).
- NmxOperationStatusMessage::promote_to_typed() -- entry point that
  returns the existing Status field. Documented as a no-op pass-through
  for now (the 5-byte inner-body wire shape is NOT the same field as
  the 4-byte packed-u32 the kernel decodes); kept for API symmetry.
- 22 new round-trip tests covering the kernel, the response-code
  switch, the proven 0x00/0x41/0xEF completion bytes, and round-trip
  for every canonical sentinel.

mxaccess (Session) changes:
- New OperationKind enum (Write/WriteSecured/Read/Subscribe/
  Unsubscribe/Activate/Suspend/Other).
- New OperationContext struct (correlation_id, op_kind, reference,
  retry_count) -- ground for the F54 follow-on per-operation
  correlation work.
- New OperationStatus event type {raw, status, context,
  is_during_recovery}, mirroring MxNativeOperationStatusEvent (cs:73-78)
  with the typed-MxStatus addition.
- Session::operation_status_events() -> broadcast::Receiver<Arc<
  OperationStatus>> + operation_status_stream() Stream variant.
- callback_router() now tries operation-status parsing first, falling
  through to subscription messages -- matches MxNativeSession
  .OnCallbackReceived dispatch order (cs:574,582,590).
- recover_connection() flips a recovery_active counter (Arc<AtomicU32>
  shared with the router) so OperationStatus.is_during_recovery is
  populated correctly. Mirrors MxNativeSession._recoveryActive
  Volatile.Read at cs:573.
- 3 new router tests covering: status-word frame dispatch + typed
  promotion to WriteCompleteOk; completion-only frames stay verbatim;
  is_during_recovery is stamped from the live counter.

Per-operation context tracking (correlating completion frames back to
outstanding writes/subscribes via the correlation_id) is filed as F54
in design/followups.md. The synthesizer kernel itself is byte-
deterministic, so the kernel and the correlation work are decoupled.

Ghidra evidence (the next-ring xref walk beyond FUN_10114a90):
- analysis/ghidra/exports/Lmx.dll.set-attribute-result-xrefs.md --
  xrefs to OnSetAttributeResult / CancelWithStatus / OperationComplete.
- analysis/ghidra/exports/Lmx.dll.vtable-data-xrefs.md -- vtable-slot
  data xrefs for the virtual-dispatch path.
- analysis/ghidra/exports/Lmx.dll.synthesizer-decompile.md --
  ScanOnDemandCallback::OperationComplete/MultipleOperationComplete
  (FUN_1010b990), RemotePlatformResolver::OperationComplete
  (FUN_1010dc80), and the constructed-from-responseCode synthesizer
  in FUN_1010bd10 (lines 698-770). FUN_1010bd10 is the wire-frame
  receiver that drives the synthesis.
- analysis/ghidra/exports/Lmx.dll.synthesizer-helpers-decompile.md --
  FUN_10003fc0 (the <success %d category %d ...> formatter; confirms
  the 4-tuple layout), FUN_1008f150 (dispatch helper).
- analysis/ghidra/exports/Lmx.dll.synthesizer-helpers2-decompile.md --
  FUN_10100ce0 (the kernel itself), FUN_10100bc0 (3xu16 reader),
  FUN_1005e580 (4-byte stream reader), FUN_1010ee00 (sister NMX-frame
  parser using the same kernel).
- analysis/ghidra/exports/Lmx.dll.synthesizer-callers-xrefs.md --
  caller graph; confirms the kernel is called from many wire-frame
  parsers but each parser shares the single 4-byte decoder.

R3/R4 verdict updated in design/70-risks-and-open-questions.md from
"settled at verbatim-preserve" to "settled per Path A". F54 filed in
design/followups.md for the per-operation correlation work.

cargo build / test / clippy -D warnings / RUSTDOCFLAGS=-D warnings doc
all clean. cargo public-api baselines regenerated for mxaccess and
mxaccess-codec.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-06 07:08:36 -04:00
parent 460c61df43
commit c73a33edd8
17 changed files with 4962 additions and 135 deletions
@@ -39,8 +39,7 @@ use std::alloc::{GlobalAlloc, Layout, System};
use std::sync::atomic::{AtomicU64, Ordering};
use mxaccess_codec::{
MxReferenceHandle, NmxSubscriptionMessage, write_message,
write_message::WriteValue,
MxReferenceHandle, NmxSubscriptionMessage, write_message, write_message::WriteValue,
};
// ---- counting allocator -------------------------------------------------
@@ -218,19 +217,9 @@ fn bench_subscription_decode() -> Row {
fn bench_handle_from_names() -> Row {
measure("MxReferenceHandle::from_names", 10_000, || {
let h = MxReferenceHandle::from_names(
0,
1,
2,
3,
"TestChildObject",
0,
1,
0,
"TestInt",
false,
)
.unwrap();
let h =
MxReferenceHandle::from_names(0, 1, 2, 3, "TestChildObject", 0, 1, 0, "TestInt", false)
.unwrap();
std::hint::black_box(h);
})
}
@@ -21,11 +21,33 @@
//! [`NmxOperationStatusMessage::try_parse_inner`] is provided here. When
//! `NmxObservedEnvelope` lands, add `try_parse_process_data_received_body` as
//! a thin wrapper.
//!
//! ## Typed promotion and the synthesizer kernel
//!
//! [`NmxOperationStatusMessage::promote_to_typed`] returns the same
//! [`MxStatus`] the parser already attached to the message — the
//! verbatim-preserve placeholder for unknown shapes, the
//! [`MxStatus::WRITE_COMPLETE_OK`] sentinel for the proven
//! `(status_code=0x8050, completion_code=0x00)` shape. The 5-byte
//! `00 00 SS SS CC` inner body is **not** the same wire field as the
//! 4-byte packed status word `Lmx.dll!FUN_10100ce0` decodes
//! ([`MxStatus::from_packed_u32`]) — that kernel applies one layer up,
//! to the `INmxService.GetResponse2` payload's `status: i32` field
//! (carried e.g. in subscription records). See
//! `analysis/ghidra/exports/Lmx.dll.synthesizer-helpers2-decompile.md`
//! and `design/70-risks-and-open-questions.md` R3/R4 Path A for the
//! evidence chain.
//!
//! `promote_to_typed` is therefore a thin convenience over the existing
//! `status` field: callers that want the canonical bit-layout decoder
//! should reach for [`MxStatus::from_packed_u32`] directly when they
//! have a 4-byte packed value in hand.
// Direct byte indexing — see reference_handle.rs for rationale.
#![allow(clippy::indexing_slicing)]
use crate::error::CodecError;
use crate::observed_frame::NmxObservedEnvelope;
use crate::status::{MxStatus, MxStatusCategory, MxStatusSource};
/// Which of the two recognised inner-frame shapes was decoded
@@ -78,6 +100,47 @@ impl NmxOperationStatusMessage {
&& self.completion_code == 0x00
}
/// Return the typed [`MxStatus`] for this frame.
///
/// This is a thin convenience over [`Self::status`] — same value,
/// no transformation. Provided for API symmetry with
/// [`MxStatus::from_packed_u32`] (the canonical 4-byte synthesizer
/// kernel) and to give consumers a single entry point that can
/// be extended in future revisions if new evidence pins additional
/// `(status_code, completion_code)` shapes.
///
/// **What this method does NOT do:** apply the
/// `Lmx.dll!FUN_10100ce0` synthesizer to the 5-byte inner body.
/// The 5-byte `00 00 SS SS CC` shape and the 4-byte packed-u32
/// shape are different wire fields at different layers — see the
/// module docs and
/// `design/70-risks-and-open-questions.md` R3/R4 Path A. Callers
/// holding a 4-byte packed `MxStatus` (e.g. extracted from a
/// subscription record's `status: i32`) should call
/// [`MxStatus::from_packed_u32`] directly.
#[must_use]
pub const fn promote_to_typed(&self) -> MxStatus {
self.status
}
/// Peel the outer [`NmxObservedEnvelope`] off a `ProcessDataReceived`
/// payload and parse the inner body. Mirrors
/// `NmxOperationStatusMessage.TryParseProcessDataReceivedBody`
/// (`NmxOperationStatusMessage.cs:20-32`).
///
/// # Errors
///
/// Returns `Err` when the outer envelope cannot be parsed or the
/// inner body matches no recognised shape (1- or 5-byte completion
/// frame). The .NET reference returns `false` and a `null!`
/// out-param in both cases; the Rust port surfaces a typed
/// [`CodecError`] so callers can distinguish "not a process-data
/// frame" from "successfully parsed".
pub fn try_parse_process_data_received_body(body: &[u8]) -> Result<Self, CodecError> {
let envelope = NmxObservedEnvelope::parse_process_data_received_body_flexible(body)?;
Self::try_parse_inner(&envelope.inner_body)
}
/// Parse an inner body — either 1 byte (`CompletionOnly`) or 5 bytes
/// (`StatusWord` with leading `00 00`).
///
@@ -281,4 +344,38 @@ mod tests {
let msg = NmxOperationStatusMessage::try_parse_inner(&frame).unwrap();
assert_eq!(msg.status_code, 0xBBAA);
}
#[test]
fn promote_to_typed_returns_existing_status_for_status_word() {
// The proven shape — must keep returning the canonical sentinel.
let frame = [0x00, 0x00, 0x50, 0x80, 0x00];
let msg = NmxOperationStatusMessage::try_parse_inner(&frame).unwrap();
assert_eq!(msg.promote_to_typed(), MxStatus::WRITE_COMPLETE_OK);
assert_eq!(msg.promote_to_typed(), msg.status);
}
#[test]
fn promote_to_typed_returns_verbatim_status_for_completion_only() {
// 1-byte frames: no synthesizer evidence — must stay verbatim.
for byte in [0x00_u8, 0x41, 0xEF] {
let msg = NmxOperationStatusMessage::try_parse_inner(&[byte]).unwrap();
let promoted = msg.promote_to_typed();
assert_eq!(promoted, msg.status);
assert_eq!(promoted.category, MxStatusCategory::Unknown);
assert_eq!(promoted.detected_by, MxStatusSource::Unknown);
assert_eq!(promoted.detail, i16::from(byte));
}
}
#[test]
fn promote_to_typed_does_not_change_existing_status_field() {
// promote_to_typed must not mutate the verbatim-preserve `status`
// field. This guards the byte-for-byte parity contract with the
// .NET reference.
let frame = [0x00, 0x00, 0x55, 0xAA, 0x33];
let msg = NmxOperationStatusMessage::try_parse_inner(&frame).unwrap();
let original_status = msg.status;
let _typed = msg.promote_to_typed();
assert_eq!(msg.status, original_status);
}
}
+328 -4
View File
@@ -22,7 +22,7 @@ pub enum MxStatusCategory {
}
impl MxStatusCategory {
pub fn from_i16(value: i16) -> Self {
pub const fn from_i16(value: i16) -> Self {
match value {
0 => Self::Ok,
1 => Self::Pending,
@@ -37,7 +37,7 @@ impl MxStatusCategory {
}
}
pub fn to_i16(self) -> i16 {
pub const fn to_i16(self) -> i16 {
self as i16
}
}
@@ -59,7 +59,7 @@ pub enum MxStatusSource {
}
impl MxStatusSource {
pub fn from_i16(value: i16) -> Self {
pub const fn from_i16(value: i16) -> Self {
match value {
0 => Self::RequestingLmx,
1 => Self::RespondingLmx,
@@ -71,7 +71,7 @@ impl MxStatusSource {
}
}
pub fn to_i16(self) -> i16 {
pub const fn to_i16(self) -> i16 {
self as i16
}
}
@@ -85,6 +85,135 @@ pub struct MxStatus {
}
impl MxStatus {
/// Decode a 4-byte packed `MxStatus` word.
///
/// Mirrors the canonical NMX wire-frame status decoder
/// `Lmx.dll!FUN_10100ce0` (see
/// `analysis/ghidra/exports/Lmx.dll.synthesizer-helpers2-decompile.md`).
/// That function reads 4 bytes from a stream into a u32 and unpacks
/// them via the bit layout:
///
/// ```text
/// bit 31: success (-1 if set, 0 if clear)
/// bits 27..24: category (4 bits, masked by 0xF)
/// bits 23..20: detected_by (4 bits, masked by 0xF)
/// bits 15..0: detail (i16 — low 16 bits, signed)
/// bits 30..28, 19..16: reserved/padding (ignored)
/// ```
///
/// This is the **synthesizer kernel** documented in
/// `design/70-risks-and-open-questions.md` R3/R4 Path A. Every NMX
/// wire frame that carries a status word emits one of these 4-byte
/// packings; the consumer-side dispatch (retry counters, callback
/// fan-out) is layered on top of the decoded `MxStatus`, but the
/// decoder itself is byte-deterministic and context-free.
///
/// The `success` field is normalized to either `0` or `-1` per the
/// native `Lmx.dll` semantics: any value with bit 31 set decodes to
/// `-1`, any value with bit 31 clear decodes to `0`. (Native code:
/// `*param_1 = -(ushort)(((uint)param_2 & 0x80000000) != 0)`.)
///
/// Unknown category / detected_by codes (i.e. a 4-bit value that
/// does not match a documented [`MxStatusCategory`] /
/// [`MxStatusSource`] variant) decode to the corresponding
/// `Unknown` variant. The padding bits are silently discarded.
#[must_use]
pub const fn from_packed_u32(packed: u32) -> Self {
// Bit layout — see fn doc.
let success: i16 = if packed & 0x8000_0000 != 0 { -1 } else { 0 };
let category_bits = ((packed >> 24) & 0xF) as i16;
let detected_by_bits = ((packed >> 20) & 0xF) as i16;
let detail = packed as i16;
Self {
success,
category: MxStatusCategory::from_i16(category_bits),
detected_by: MxStatusSource::from_i16(detected_by_bits),
detail,
}
}
/// Construct an `MxStatus` from a single-byte NMX response code.
///
/// Mirrors the synthesis switch in
/// `Lmx.dll!FUN_1010bd10` (`ScanOnDemandCallback::GetResponse`)
/// at lines 741-770 of
/// `analysis/ghidra/exports/Lmx.dll.synthesizer-decompile.md`.
/// When the NMX `responseCode` is non-zero (no payload status word
/// to parse), `Lmx.dll` constructs an `MxStatus` from the response
/// code itself using this fixed mapping:
///
/// | responseCode | category | detected_by |
/// |---|---|---|
/// | `0x01`, `0x02` | `CommunicationError` | `RequestingNmx` |
/// | `0x03` | `ConfigurationError` | `RequestingNmx` |
/// | `0x04` | `ConfigurationError` | `RespondingNmx` |
/// | `0x05` | `CommunicationError` | `RespondingNmx` |
/// | `0x1A` | `CommunicationError` | `RequestingNmx` |
///
/// `success` is `0` (not `-1`) and `detail` carries the response
/// code unchanged. Unmapped codes return `None` — the native code's
/// `default` branch leaves the synthesized status untouched, so the
/// caller falls back to a verbatim raw-byte placeholder per
/// `design/70-risks-and-open-questions.md` R3/R4.
///
/// This is **not** the same wire field as the 1-byte completion
/// frames `0x00`/`0x41`/`0xEF` parsed by
/// [`crate::NmxOperationStatusMessage::try_parse_inner`]: those
/// live inside a `0x32`/`0x33` callback body, while this
/// `responseCode` is the second `out` parameter of
/// `INmxService.GetResponse2(...)` (one layer up the stack).
/// `Lmx.dll`'s decoder for the 1-byte completion frames does not
/// apply this synthesis.
#[must_use]
pub const fn from_nmx_response_code(response_code: u8) -> Option<Self> {
// Per `FUN_1010bd10:741-770` switch.
let (category, detected_by) = match response_code {
0x01 | 0x02 => (
MxStatusCategory::CommunicationError,
MxStatusSource::RequestingNmx,
),
0x03 => (
MxStatusCategory::ConfigurationError,
MxStatusSource::RequestingNmx,
),
0x04 => (
MxStatusCategory::ConfigurationError,
MxStatusSource::RespondingNmx,
),
0x05 => (
MxStatusCategory::CommunicationError,
MxStatusSource::RespondingNmx,
),
0x1A => (
MxStatusCategory::CommunicationError,
MxStatusSource::RequestingNmx,
),
_ => return None,
};
Some(Self {
success: 0,
category,
detected_by,
detail: response_code as i16,
})
}
/// Pack `self` back into the 4-byte NMX wire layout. Inverse of
/// [`Self::from_packed_u32`]. Useful for round-trip tests and
/// future encoder paths.
///
/// Padding bits (30..28, 19..16) are emitted as zero. Bit 31 mirrors
/// `success != 0` — any non-zero `success` round-trips to `-1`
/// because the decoder normalizes to `0`/`-1` only.
#[must_use]
pub const fn to_packed_u32(self) -> u32 {
let success_bit: u32 = if self.success != 0 { 0x8000_0000 } else { 0 };
let category_bits = ((self.category as i16) as u32 & 0xF) << 24;
let detected_by_bits = ((self.detected_by as i16) as u32 & 0xF) << 20;
let detail_bits = (self.detail as u16) as u32;
success_bit | category_bits | detected_by_bits | detail_bits
}
/// `(success=-1, Ok, RequestingLmx, detail=0)` — `MxStatus.DataChangeOk`
/// from `MxStatus.cs:36-40`.
pub const DATA_CHANGE_OK: Self = Self {
@@ -311,4 +440,199 @@ mod tests {
assert!(!MxStatus::SUSPEND_PENDING.is_ok());
assert!(!MxStatus::INVALID_REFERENCE_CONFIGURATION.is_ok());
}
#[test]
fn from_packed_u32_zero_decodes_to_all_zeros() {
// packed=0 → success=0, category=Ok(0), detected_by=RequestingLmx(0), detail=0.
// The "all zeros" status is the simplest data-change-pending shape
// the wire can carry.
let s = MxStatus::from_packed_u32(0);
assert_eq!(s.success, 0);
assert_eq!(s.category, MxStatusCategory::Ok);
assert_eq!(s.detected_by, MxStatusSource::RequestingLmx);
assert_eq!(s.detail, 0);
}
#[test]
fn from_packed_u32_high_bit_sets_success_to_negative_one() {
// Native: `*param_1 = -(ushort)(((uint)param_2 & 0x80000000) != 0)`
// For packed=0x80000000, success=-1, all other fields 0.
let s = MxStatus::from_packed_u32(0x8000_0000);
assert_eq!(s.success, -1);
assert_eq!(s.category, MxStatusCategory::Ok);
assert_eq!(s.detected_by, MxStatusSource::RequestingLmx);
assert_eq!(s.detail, 0);
}
#[test]
fn from_packed_u32_decodes_data_change_ok_layout() {
// `MxStatus::DATA_CHANGE_OK` = (success=-1, Ok=0, RequestingLmx=0,
// detail=0). Pack: bit31=1, bits27..24=0, bits23..20=0, bits15..0=0.
// → 0x80000000.
let packed = MxStatus::DATA_CHANGE_OK.to_packed_u32();
assert_eq!(packed, 0x8000_0000);
let round_trip = MxStatus::from_packed_u32(packed);
assert_eq!(round_trip, MxStatus::DATA_CHANGE_OK);
}
#[test]
fn from_packed_u32_decodes_write_complete_ok_layout() {
// `MxStatus::WRITE_COMPLETE_OK` = (success=-1, Ok=0,
// RespondingAutomationObject=5, detail=0). Pack: bit31=1,
// bits27..24=0 (Ok), bits23..20=5, bits15..0=0.
// → 0x80500000.
let expected_packed: u32 = 0x80_50_00_00;
let s = MxStatus::from_packed_u32(expected_packed);
assert_eq!(s, MxStatus::WRITE_COMPLETE_OK);
assert_eq!(MxStatus::WRITE_COMPLETE_OK.to_packed_u32(), expected_packed);
}
#[test]
fn from_packed_u32_extracts_category_from_bits_24_to_27() {
// category=4 (ConfigurationError) at bits 24..27.
// → 0x04000000.
let s = MxStatus::from_packed_u32(0x0400_0000);
assert_eq!(s.category, MxStatusCategory::ConfigurationError);
assert_eq!(s.detected_by, MxStatusSource::RequestingLmx);
assert_eq!(s.detail, 0);
}
#[test]
fn from_packed_u32_extracts_detected_by_from_bits_20_to_23() {
// detected_by=2 (RequestingNmx) at bits 20..23.
// → 0x00200000.
let s = MxStatus::from_packed_u32(0x0020_0000);
assert_eq!(s.category, MxStatusCategory::Ok);
assert_eq!(s.detected_by, MxStatusSource::RequestingNmx);
assert_eq!(s.detail, 0);
}
#[test]
fn from_packed_u32_extracts_detail_as_signed_low_16_bits() {
// detail=21 ("Invalid reference") at bits 0..15.
// → 0x00000015.
let s = MxStatus::from_packed_u32(0x0000_0015);
assert_eq!(s.detail, 21);
assert_eq!(s.detail_text(), Some("Invalid reference"));
// Negative detail — high bit of low-16 set: 0xFFFF → -1.
let s = MxStatus::from_packed_u32(0x0000_FFFF);
assert_eq!(s.detail, -1);
}
#[test]
fn from_packed_u32_padding_bits_are_ignored() {
// Bits 30..28 and 19..16 are padding/reserved per `FUN_10100ce0`.
// Setting them should not affect any decoded field.
// bit 31: success
// bits 30..28: padding (0x70_00_00_00)
// bits 27..24: category
// bits 23..20: detected_by
// bits 19..16: padding (0x00_0F_00_00)
// bits 15..0: detail
// Padding-only mask: 0x70_00_00_00 | 0x00_0F_00_00 = 0x700F_0000.
let with_padding = MxStatus::from_packed_u32(0x700F_0000);
let without_padding = MxStatus::from_packed_u32(0x0000_0000);
assert_eq!(with_padding, without_padding);
}
#[test]
fn from_packed_u32_unknown_category_decodes_to_unknown_variant() {
// Category bits = 0xF (not a defined variant).
// → 0x0F000000.
let s = MxStatus::from_packed_u32(0x0F00_0000);
assert_eq!(s.category, MxStatusCategory::Unknown);
}
#[test]
fn from_packed_u32_unknown_detected_by_decodes_to_unknown_variant() {
// detected_by bits = 0xF (not a defined variant).
// → 0x00F00000.
let s = MxStatus::from_packed_u32(0x00F0_0000);
assert_eq!(s.detected_by, MxStatusSource::Unknown);
}
#[test]
fn round_trip_canonical_sentinels() {
// Every canonical sentinel must round-trip through pack→decode.
for &expected in &[
MxStatus::DATA_CHANGE_OK,
MxStatus::WRITE_COMPLETE_OK,
MxStatus::ACTIVATE_OK,
// SuspendPending: detail=0, success=-1, Pending=1, RequestingLmx=0.
// → 0x81000000.
MxStatus::SUSPEND_PENDING,
// InvalidReferenceConfiguration: success=0, ConfigError=4,
// RequestingLmx=0, detail=6. → 0x04000006.
MxStatus::INVALID_REFERENCE_CONFIGURATION,
] {
let packed = expected.to_packed_u32();
let round_trip = MxStatus::from_packed_u32(packed);
assert_eq!(round_trip, expected, "round-trip failed for {expected:?}");
}
}
#[test]
fn from_nmx_response_code_proven_mappings() {
// Per `FUN_1010bd10:741-770` switch.
// 0x01, 0x02 → CommunicationError + RequestingNmx
for code in [0x01_u8, 0x02] {
let s = MxStatus::from_nmx_response_code(code).unwrap();
assert_eq!(s.success, 0);
assert_eq!(s.category, MxStatusCategory::CommunicationError);
assert_eq!(s.detected_by, MxStatusSource::RequestingNmx);
assert_eq!(s.detail, i16::from(code));
}
// 0x03 → ConfigurationError + RequestingNmx
let s = MxStatus::from_nmx_response_code(0x03).unwrap();
assert_eq!(s.category, MxStatusCategory::ConfigurationError);
assert_eq!(s.detected_by, MxStatusSource::RequestingNmx);
assert_eq!(s.detail, 3);
// 0x04 → ConfigurationError + RespondingNmx
let s = MxStatus::from_nmx_response_code(0x04).unwrap();
assert_eq!(s.category, MxStatusCategory::ConfigurationError);
assert_eq!(s.detected_by, MxStatusSource::RespondingNmx);
assert_eq!(s.detail, 4);
// 0x05 → CommunicationError + RespondingNmx
let s = MxStatus::from_nmx_response_code(0x05).unwrap();
assert_eq!(s.category, MxStatusCategory::CommunicationError);
assert_eq!(s.detected_by, MxStatusSource::RespondingNmx);
assert_eq!(s.detail, 5);
// 0x1A → CommunicationError + RequestingNmx
let s = MxStatus::from_nmx_response_code(0x1A).unwrap();
assert_eq!(s.category, MxStatusCategory::CommunicationError);
assert_eq!(s.detected_by, MxStatusSource::RequestingNmx);
assert_eq!(s.detail, 0x1A);
}
#[test]
fn from_nmx_response_code_unmapped_returns_none() {
// Codes outside the proven {1,2,3,4,5,0x1a} set return None — the
// native code falls through `default` and leaves the synthesized
// status untouched. Per `design/70-risks-and-open-questions.md`
// R3/R4 the consumer must preserve the raw byte verbatim.
for code in [0x00_u8, 0x06, 0x10, 0x19, 0x1B, 0x41, 0xEF, 0xFF] {
assert!(
MxStatus::from_nmx_response_code(code).is_none(),
"response code 0x{code:02X} should be unmapped"
);
}
}
#[test]
fn to_packed_u32_normalizes_arbitrary_success_to_high_bit_only() {
// The decoder produces `success ∈ {0, -1}`, so `to_packed_u32`
// only checks `success != 0` — the actual integer doesn't
// matter beyond zero/non-zero.
let mut s = MxStatus::DATA_CHANGE_OK;
s.success = 42; // Non-canonical value.
let packed = s.to_packed_u32();
assert_eq!(packed & 0x8000_0000, 0x8000_0000);
// Round-trip normalizes to -1.
assert_eq!(MxStatus::from_packed_u32(packed).success, -1);
}
}
+39 -14
View File
@@ -131,7 +131,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// -- Subscribe-flow ----------------------------------------------------
if env.run_subscribe {
eprintln!("creating subscription [canonical XML CreateSubscription] (max_queue=100, sample=1s)");
eprintln!(
"creating subscription [canonical XML CreateSubscription] (max_queue=100, sample=1s)"
);
// SampleInterval is in **milliseconds** on the wire — the .NET
// reference's `MxAsbDataClient.CreateSubscription` /
// `AddMonitoredItems` default is `ulong sampleInterval = 1000`
@@ -140,7 +142,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// poll would always come back empty.
let sample_interval_ms: u64 = 1000;
let max_queue_size: i64 = 100;
let sub_response = match client.create_subscription(max_queue_size, sample_interval_ms).await {
let sub_response = match client
.create_subscription(max_queue_size, sample_interval_ms)
.await
{
Ok(r) => r,
Err(e) => {
eprintln!(" create_subscription failed: {e}");
@@ -165,11 +170,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
)];
eprintln!("adding monitored items [canonical XML AddMonitoredItems]");
let add = match client.add_monitored_items(sub_response.subscription_id, &monitored, true).await {
let add = match client
.add_monitored_items(sub_response.subscription_id, &monitored, true)
.await
{
Ok(r) => r,
Err(e) => {
eprintln!(" add_monitored_items failed: {e}");
let _ = client.delete_subscription(sub_response.subscription_id).await;
let _ = client
.delete_subscription(sub_response.subscription_id)
.await;
eprintln!("disconnecting");
client.disconnect().await?;
client.send_end().await?;
@@ -184,17 +194,24 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
add.status.first().map(|s| s.error_code).unwrap_or(0),
);
eprintln!("publishing [canonical XML Publish] (target {} polls × 5s)", env.subscribe_count);
eprintln!(
"publishing [canonical XML Publish] (target {} polls × 5s)",
env.subscribe_count
);
let mut total_values = 0usize;
for poll in 0..env.subscribe_count {
match tokio::time::timeout(
Duration::from_secs(5),
client.publish(sub_response.subscription_id),
).await {
)
.await
{
Ok(Ok(resp)) => {
eprintln!(
" poll {poll}: {} value(s); result_code={:?} success={:?}",
resp.values.len(), resp.result_code, resp.success
resp.values.len(),
resp.result_code,
resp.success
);
for v in &resp.values {
total_values += 1;
@@ -204,9 +221,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
v.value.value
);
}
if resp.result_code
== Some(mxaccess_asb::RESULT_CODE_INVALID_CONNECTION_ID)
{
if resp.result_code == Some(mxaccess_asb::RESULT_CODE_INVALID_CONNECTION_ID) {
eprintln!(" publish surfaced InvalidConnectionId; bailing the loop");
break;
}
@@ -235,12 +250,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
// -- DeleteMonitoredItems / DeleteSubscription
if let Err(e) = client.delete_monitored_items(sub_response.subscription_id, &monitored).await {
if let Err(e) = client
.delete_monitored_items(sub_response.subscription_id, &monitored)
.await
{
eprintln!("delete_monitored_items failed: {e}");
} else {
eprintln!("delete_monitored_items ok [canonical XML DeleteMonitoredItems]");
}
if let Err(e) = client.delete_subscription(sub_response.subscription_id).await {
if let Err(e) = client
.delete_subscription(sub_response.subscription_id)
.await
{
eprintln!("delete_subscription failed: {e}");
} else {
eprintln!("delete_subscription ok [canonical XML DeleteSubscription]");
@@ -290,8 +311,12 @@ impl LiveEnv {
let via_uri =
std::env::var("MX_ASB_VIA").unwrap_or_else(|_| format!("net.tcp://{host}/ASBService"));
let tag = std::env::var("MX_TEST_TAG").unwrap_or_else(|_| "TestChildObject.TestInt".into());
let run_write = std::env::var("MX_RUN_WRITE").map(|v| v != "0").unwrap_or(true);
let run_subscribe = std::env::var("MX_RUN_SUBSCRIBE").map(|v| v != "0").unwrap_or(true);
let run_write = std::env::var("MX_RUN_WRITE")
.map(|v| v != "0")
.unwrap_or(true);
let run_subscribe = std::env::var("MX_RUN_SUBSCRIBE")
.map(|v| v != "0")
.unwrap_or(true);
let subscribe_count = std::env::var("MX_SUBSCRIBE_COUNT")
.ok()
.and_then(|s| s.parse().ok())
+13 -11
View File
@@ -60,7 +60,7 @@ use mxaccess_asb::{
};
use mxaccess_asb_nettcp::auth::CryptoParameters;
use tokio::net::TcpStream;
use tokio::sync::{mpsc, Mutex};
use tokio::sync::{Mutex, mpsc};
use tokio::task::JoinHandle;
use tokio_stream::wrappers::ReceiverStream;
@@ -410,17 +410,13 @@ async fn publish_loop<F, Fut>(
// on every Publish poll while values are still
// delivered, so blanket "bail on any non-zero"
// (the original F33 fix) was too aggressive.
if response.result_code
== Some(mxaccess_asb::RESULT_CODE_INVALID_CONNECTION_ID)
{
if response.result_code == Some(mxaccess_asb::RESULT_CODE_INVALID_CONNECTION_ID) {
let _ = tx
.send(Err(Error::Connection(
ConnectionError::TransportFailure {
detail: "publish returned InvalidConnectionId — \
.send(Err(Error::Connection(ConnectionError::TransportFailure {
detail: "publish returned InvalidConnectionId — \
session desynced, terminating stream"
.to_string(),
},
)))
.to_string(),
})))
.await;
return;
}
@@ -609,7 +605,13 @@ mod tests {
let calls_clone = calls.clone();
let publish_fn = move || {
calls_clone.fetch_add(1, Ordering::Relaxed);
async move { Ok(fake_response(vec![fake_value(7), fake_value(8), fake_value(9)])) }
async move {
Ok(fake_response(vec![
fake_value(7),
fake_value(8),
fake_value(9),
]))
}
};
// Drop the receiver immediately — first send triggers exit.
drop(rx);
+4 -2
View File
@@ -39,7 +39,7 @@ pub use transport_asb::AsbTransport;
pub use mxaccess_galaxy::{GalaxyTagMetadata, Resolver, ResolverError};
pub use mxaccess_nmx::WriteValue;
pub use session::{RebuildFactory, Subscription};
pub use session::{OperationContext, OperationKind, OperationStatus, RebuildFactory, Subscription};
/// Async session façade. Cheap clones share the inner state; drop of the last
/// clone fires `UnregisterEngine` best-effort. For deterministic shutdown,
@@ -391,7 +391,9 @@ pub enum ConfigError {
/// `Session::recover_connection` was called without a
/// [`crate::RebuildFactory`] installed via
/// [`crate::Session::set_recovery_factory`]. F16.
#[error("recover_connection: no rebuild factory installed (call Session::set_recovery_factory)")]
#[error(
"recover_connection: no rebuild factory installed (call Session::set_recovery_factory)"
)]
RecoveryNotConfigured,
}
+487 -55
View File
@@ -34,7 +34,8 @@ use std::time::SystemTime;
use mxaccess_callback::{CallbackEvent, CallbackExporter, ExporterIdentities};
use mxaccess_codec::{
MxStatus, NmxReferenceRegistrationMessage, NmxSubscriptionMessage, NmxSubscriptionRecord,
MxStatus, NmxOperationStatusMessage, NmxReferenceRegistrationMessage, NmxSubscriptionMessage,
NmxSubscriptionRecord,
};
use mxaccess_galaxy::{GalaxyTagMetadata, Resolver, ResolverError};
use mxaccess_nmx::{NmxClient, NmxClientError, WriteValue};
@@ -51,7 +52,7 @@ use tokio_stream::wrappers::BroadcastStream;
use crate::metrics as session_metrics;
use crate::{DataChange, RecoveryEvent};
use futures_util::Stream;
use futures_util::{Stream, StreamExt};
/// Capacity of the broadcast channel that fans out
/// [`RecoveryEvent`]s to consumers via [`Session::recovery_events`].
@@ -77,6 +78,124 @@ use crate::{
/// either keep up or accept lag-loss.
const CALLBACK_BROADCAST_CAPACITY: usize = 256;
/// Capacity of the broadcast channel that fans out parsed
/// [`OperationStatus`] events to consumers via
/// [`Session::operation_status_events`].
///
/// Operation-status frames are bursty (one per write completion / one
/// per subscription state change) but lower-volume than data updates.
/// Picked to absorb a short burst without dropping for a briefly slow
/// consumer.
const OPERATION_STATUS_BROADCAST_CAPACITY: usize = 64;
/// Operation kind associated with an outstanding RPC. Mirrors the
/// distinct request paths the .NET reference tracks across
/// `MxNativeSession.{WriteAsync, WriteSecuredAsync, ReadAsync,
/// SubscribeAsync, UnsubscribeAsync, ActivateAsync, SuspendAsync}`.
///
/// The Rust port uses this to enrich [`OperationStatus`] events with
/// the originating call's intent — the synthesizer kernel
/// ([`MxStatus::from_packed_u32`]) is byte-deterministic and does NOT
/// depend on `OperationKind`, but consumers often want to filter
/// "write completions" from "subscription state changes" without
/// peeking at the raw frame bytes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum OperationKind {
/// Plain `Write` (`MxNativeSession.WriteAsync`).
Write,
/// `WriteSecured` / `WriteSecured2` (two-token writes; see R6).
WriteSecured,
/// `Read` (read-as-subscribe pattern at `cs:312-359`).
Read,
/// `Subscribe` / `AdviseSupervisory` / `RegisterReference`.
Subscribe,
/// `Unsubscribe` / `UnAdvise`.
Unsubscribe,
/// `Activate` (re-enable a suspended subscription).
Activate,
/// `Suspend` (pause an active subscription).
Suspend,
/// Operation kind unknown to the Rust port — surfaced as a
/// fallback when the originating call doesn't fit a typed variant
/// (e.g. raw transport-level operations).
Other,
}
/// Per-operation context tracked for outstanding RPCs.
///
/// The Rust port currently uses this struct only to enrich
/// [`OperationStatus`] events surfaced via
/// [`Session::operation_status_events`]. Future work
/// (`design/70-risks-and-open-questions.md` R3/R4 Path A follow-on)
/// will let the consumer correlate completion frames back to specific
/// outstanding write/subscribe calls; the current bring-up always
/// emits `OperationStatus.context = None` because the operation→
/// completion correlation channel is not yet wired.
///
/// Mirrors the bookkeeping `MxNativeSession` does in its private
/// `_pendingWrites` / `_pendingReads` dictionaries (referenced
/// in the source but not exposed publicly).
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct OperationContext {
/// 16-byte correlation id the originating call generated. For
/// subscribe/unsubscribe this matches `Subscription::correlation_id`;
/// for write/read this is the request's `correlationId` field.
pub correlation_id: [u8; 16],
/// Intent of the originating operation — see [`OperationKind`].
pub op_kind: OperationKind,
/// Reference string (`Object.Attribute`) the operation targets,
/// when known. `None` for operations that don't carry one (e.g.
/// session-level ops).
pub reference: Option<Arc<str>>,
/// Retry counter — incremented each time the consumer re-issues
/// the operation (e.g. via `Session::recover_connection`'s
/// re-advise loop). Always `0` on the first attempt.
pub retry_count: u32,
}
/// One operation-status event surfaced to consumers via
/// [`Session::operation_status_events`].
///
/// Mirrors `MxNativeOperationStatusEvent` (`MxNativeSession.cs:73-78`)
/// with the addition of typed [`MxStatus`] promotion (the
/// `Lmx.dll!FUN_10100ce0` synthesizer kernel — see
/// `design/70-risks-and-open-questions.md` R3/R4 Path A).
///
/// - [`Self::raw`] preserves the parsed frame byte-for-byte (matching
/// the .NET `Message` field).
/// - [`Self::status`] is the typed `MxStatus`. For 5-byte status-word
/// frames this is the canonical sentinel
/// ([`MxStatus::WRITE_COMPLETE_OK`] for the proven `(0x8050, 0x00)`
/// shape) or the verbatim-preserve placeholder for unknown shapes.
/// For 1-byte completion frames this is the verbatim-preserve
/// placeholder per R3/R4. **Callers holding a 4-byte packed status
/// word from a different layer should call
/// [`MxStatus::from_packed_u32`] directly.**
/// - [`Self::context`] carries the originating
/// [`OperationContext`] when the event can be correlated back to a
/// tracked outstanding operation. The current implementation
/// always emits `None` — operation-tracking plumbing lands as a
/// follow-up (see the module-level docs).
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct OperationStatus {
/// Raw parsed frame, byte-for-byte preserved.
pub raw: NmxOperationStatusMessage,
/// Typed status (synthesizer-promoted for known shapes; verbatim
/// for unknown).
pub status: MxStatus,
/// Optional originating-call context. Always `None` until the
/// operation-tracking plumbing is wired (see module-level docs).
pub context: Option<OperationContext>,
/// `true` when the frame arrived during an active
/// `Session::recover_connection` window. Mirrors
/// `MxNativeOperationStatusEvent.IsDuringRecovery`
/// (`MxNativeSession.cs:78`).
pub is_during_recovery: bool,
}
/// Subscription handle returned by [`Session::subscribe`]. Implements
/// `Stream<Item = Result<DataChange, Error>>` — driving it forward
/// yields one [`DataChange`] per matching record observed on the
@@ -336,6 +455,19 @@ pub struct SessionInner {
/// Broadcast channel that fans out parsed callback messages. Tap
/// via [`Session::callbacks`].
pub(crate) callback_tx: broadcast::Sender<Arc<NmxSubscriptionMessage>>,
/// Broadcast channel that fans out parsed operation-status events.
/// Tap via [`Session::operation_status_events`].
pub(crate) operation_status_tx: broadcast::Sender<Arc<OperationStatus>>,
/// Atomic counter incremented by `recover_connection` while a
/// recovery attempt is in flight. The router reads this when
/// constructing `OperationStatus` events to populate
/// `is_during_recovery`. Mirrors `MxNativeSession._recoveryActive`
/// (`MxNativeSession.cs:573` — `Volatile.Read(ref _recoveryActive)`).
///
/// Wrapped in `Arc` so the router task (spawned at session
/// bring-up) can observe flips from `recover_connection` without
/// holding a strong reference to the entire `SessionInner`.
pub(crate) recovery_active: Arc<std::sync::atomic::AtomicU32>,
/// Handle to the router task that drains the
/// [`CallbackExporter`]'s `CallbackEvent` channel and pushes parsed
/// `NmxSubscriptionMessage`s onto `callback_tx`. `None` after
@@ -448,9 +580,8 @@ pub(crate) enum SubscriptionMode {
pub type RebuildFactory = Arc<
dyn Fn() -> std::pin::Pin<
Box<
dyn std::future::Future<
Output = Result<NmxClient, mxaccess_nmx::NmxClientError>,
> + Send,
dyn std::future::Future<Output = Result<NmxClient, mxaccess_nmx::NmxClientError>>
+ Send,
>,
> + Send
+ Sync,
@@ -472,8 +603,9 @@ impl std::fmt::Debug for SessionInner {
}
}
/// Drain `CallbackExporter` events, decode `CallbackInvoked` bodies as
/// `NmxSubscriptionMessage`, and broadcast each parsed message.
/// Drain `CallbackExporter` events, decode `CallbackInvoked` bodies,
/// and broadcast typed messages onto `callback_tx` (subscription
/// callbacks) or `operation_status_tx` (operation-status frames).
///
/// Exits when the upstream `CallbackEvent` channel closes (which
/// happens when the `CallbackExporter` is dropped or
@@ -482,17 +614,46 @@ impl std::fmt::Debug for SessionInner {
/// need them can subscribe to the raw `CallbackExporter` events
/// directly via a future "diagnostic-channel" hook (no followup yet
/// — surface only when a real consumer asks).
///
/// Dispatch order mirrors
/// `MxNativeSession.OnCallbackReceived` (`cs:571-607`):
/// operation-status first (the simplest 1- or 5-byte payload), then
/// fall through to subscription messages. The `is_during_recovery`
/// flag on each emitted [`OperationStatus`] is taken from the live
/// `recovery_active` counter so the receiver matches the .NET
/// reference's volatile-read semantics at `cs:573`.
pub(crate) async fn callback_router(
mut events: tokio::sync::mpsc::UnboundedReceiver<CallbackEvent>,
callback_tx: broadcast::Sender<Arc<NmxSubscriptionMessage>>,
operation_status_tx: broadcast::Sender<Arc<OperationStatus>>,
recovery_active: Arc<std::sync::atomic::AtomicU32>,
) {
while let Some(event) = events.recv().await {
if let CallbackEvent::CallbackInvoked { body, .. } = event {
// The body is the inner NMX subscription message — same
// 23-byte preamble + records as `NmxSubscriptionMessage::parse_inner`
// expects. Parse failures are silent (no consumer) since the
// .NET reference also fires `UnparsedCallbackReceived` events
// separately and we don't model that yet.
// 1. Try operation-status first — peels the outer envelope
// and parses a 1- or 5-byte completion frame. Mirrors
// `MxNativeSession.OnCallbackReceived:574`.
if let Ok(op) = NmxOperationStatusMessage::try_parse_process_data_received_body(&body) {
let is_during_recovery =
recovery_active.load(std::sync::atomic::Ordering::Acquire) > 0;
let typed = op.promote_to_typed();
let _ = operation_status_tx.send(Arc::new(OperationStatus {
raw: op,
status: typed,
// Operation-tracking plumbing not yet wired —
// always emit context=None for now (R3/R4
// follow-on tracks adding the correlation channel).
context: None,
is_during_recovery,
}));
continue;
}
// 2. Fall through to subscription messages — same 23-byte
// preamble + records as `NmxSubscriptionMessage::parse_inner`
// expects. Parse failures are silent (no consumer) since
// the .NET reference also fires `UnparsedCallbackReceived`
// events separately and we don't model that yet.
if let Ok(msg) = NmxSubscriptionMessage::parse_inner(&body) {
// `send` returns `Err(SendError)` only when there are zero
// receivers — that's fine for this wire path; nothing to do.
@@ -618,7 +779,15 @@ impl Session {
// 2. Spawn the router task that broadcasts parsed callback
// messages.
let (callback_tx, _) = broadcast::channel(CALLBACK_BROADCAST_CAPACITY);
let router_handle = tokio::spawn(callback_router(callback_events, callback_tx.clone()));
let (operation_status_tx, _) =
broadcast::channel::<Arc<OperationStatus>>(OPERATION_STATUS_BROADCAST_CAPACITY);
let recovery_active = Arc::new(std::sync::atomic::AtomicU32::new(0));
let router_handle = tokio::spawn(callback_router(
callback_events,
callback_tx.clone(),
operation_status_tx.clone(),
recovery_active.clone(),
));
// 3. RegisterEngine2 with the callback OBJREF. Mirrors cs:163-175.
let hr = nmx
@@ -662,6 +831,8 @@ impl Session {
nmx: Mutex::new(nmx),
callback_exporter: Mutex::new(Some(exporter)),
callback_tx,
operation_status_tx,
recovery_active,
router_handle: std::sync::Mutex::new(Some(router_handle)),
recovery_tx,
connected: std::sync::atomic::AtomicBool::new(true),
@@ -689,6 +860,59 @@ impl Session {
self.inner.recovery_tx.subscribe()
}
/// Subscribe to operation-status events.
///
/// Returns a [`broadcast::Receiver`] that yields one
/// [`OperationStatus`] per parsed completion frame. Mirrors
/// `MxNativeSession.OperationStatusReceived`
/// (`MxNativeSession.cs:118`) but exposes the typed
/// [`MxStatus`] (the synthesizer kernel
/// [`MxStatus::from_packed_u32`] is applied where the bit layout
/// matches; verbatim-preserve placeholders are returned for the
/// 1-byte completion frames per
/// `design/70-risks-and-open-questions.md` R3/R4).
///
/// Slow consumers see `RecvError::Lagged(n)` from the underlying
/// broadcast — the wire protocol does not replay missed
/// operation-status frames so consumers must keep up or accept
/// lag-loss.
///
/// The first emitted event will have
/// [`OperationStatus::context`] == `None` for now —
/// operation-tracking plumbing (correlating completion frames
/// back to outstanding writes/subscribes) is the next step in the
/// R3/R4 follow-on work. The synthesizer kernel itself is in place
/// today.
#[must_use]
pub fn operation_status_events(&self) -> broadcast::Receiver<Arc<OperationStatus>> {
self.inner.operation_status_tx.subscribe()
}
/// Stream variant of [`Self::operation_status_events`]: yields
/// `Result<Arc<OperationStatus>, Error>` per item, mapping
/// broadcast lag to a typed error.
///
/// Mirrors the `Stream`-based access pattern already provided by
/// the `Subscription::Stream` impl. Use the raw
/// [`broadcast::Receiver`] returned by
/// [`Self::operation_status_events`] when control over backpressure
/// or lag-handling matters.
pub fn operation_status_stream(
&self,
) -> impl Stream<Item = Result<Arc<OperationStatus>, Error>> + Send {
let rx = self.inner.operation_status_tx.subscribe();
BroadcastStream::new(rx).map(|item| match item {
Ok(ev) => Ok(ev),
Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(n)) => {
Err(Error::Configuration(ConfigError::InvalidArgument {
detail: format!(
"operation-status stream lagged behind broadcast by {n} events"
),
}))
}
})
}
/// Install the [`RebuildFactory`] used by [`Self::recover_connection`]
/// to build a fresh [`NmxClient`] on each retry attempt. Without
/// a factory, `recover_connection` returns
@@ -744,9 +968,26 @@ impl Session {
// recovery body can take the nmx mutex without deadlocking.
let factory = {
let lock = self.inner.rebuild_factory.lock().await;
lock.clone().ok_or(Error::Configuration(
ConfigError::RecoveryNotConfigured,
))?
lock.clone()
.ok_or(Error::Configuration(ConfigError::RecoveryNotConfigured))?
};
// Mark the session as in-recovery so the callback router
// stamps `OperationStatus.is_during_recovery = true` for any
// events that arrive during the attempt. Mirrors
// `MxNativeSession._recoveryActive` (`cs:573` — volatile
// increment around `RecoverConnectionCore`).
struct RecoveryGuard(Arc<std::sync::atomic::AtomicU32>);
impl Drop for RecoveryGuard {
fn drop(&mut self) {
self.0.fetch_sub(1, std::sync::atomic::Ordering::Release);
}
}
let _recovery_guard = {
self.inner
.recovery_active
.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
RecoveryGuard(self.inner.recovery_active.clone())
};
let mut last_error: Option<Error> = None;
@@ -772,10 +1013,9 @@ impl Session {
// `Error` doesn't impl `Clone` (the io::Error source isn't
// cloneable), so capture a string copy for the bubbled-up
// last_error and hand the original to the broadcast event.
let bubbled =
Error::Connection(ConnectionError::TransportFailure {
detail: e.to_string(),
});
let bubbled = Error::Connection(ConnectionError::TransportFailure {
detail: e.to_string(),
});
let _ = self.inner.recovery_tx.send(Arc::new(RecoveryEvent::Failed {
attempt,
error: e,
@@ -789,9 +1029,7 @@ impl Session {
}
}
Err(last_error.unwrap_or(Error::Connection(
ConnectionError::EngineNotRegistered,
)))
Err(last_error.unwrap_or(Error::Connection(ConnectionError::EngineNotRegistered)))
}
/// Single-attempt body of [`Self::recover_connection`], split out so
@@ -907,9 +1145,7 @@ impl Session {
)
.map_err(|e| {
Error::Configuration(ConfigError::InvalidArgument {
detail: format!(
"recovery: buffered item definition: {e}"
),
detail: format!("recovery: buffered item definition: {e}"),
})
})?;
let registration = NmxReferenceRegistrationMessage {
@@ -1308,13 +1544,14 @@ impl Session {
// reference's split-context form is reachable via the
// compat-server layer F35 once it lands). The codec helper
// rejects empty/whitespace inputs with `CodecError::InvalidName`.
let item_definition =
NmxReferenceRegistrationMessage::to_buffered_item_definition(reference)
.map_err(|e| {
Error::Configuration(ConfigError::InvalidArgument {
detail: format!("buffered item definition: {e}"),
})
})?;
let item_definition = NmxReferenceRegistrationMessage::to_buffered_item_definition(
reference,
)
.map_err(|e| {
Error::Configuration(ConfigError::InvalidArgument {
detail: format!("buffered item definition: {e}"),
})
})?;
let registration = NmxReferenceRegistrationMessage {
item_handle: 0,
item_correlation_id: correlation_id,
@@ -1876,7 +2113,15 @@ mod tests {
.await
.unwrap();
let (callback_tx, _) = broadcast::channel(CALLBACK_BROADCAST_CAPACITY);
let router_handle = tokio::spawn(callback_router(callback_events, callback_tx.clone()));
let (operation_status_tx, _) =
broadcast::channel::<Arc<OperationStatus>>(OPERATION_STATUS_BROADCAST_CAPACITY);
let recovery_active = Arc::new(std::sync::atomic::AtomicU32::new(0));
let router_handle = tokio::spawn(callback_router(
callback_events,
callback_tx.clone(),
operation_status_tx.clone(),
recovery_active.clone(),
));
let (recovery_tx, _) = broadcast::channel(RECOVERY_BROADCAST_CAPACITY);
Ok(Session {
@@ -1886,6 +2131,8 @@ mod tests {
nmx: Mutex::new(nmx),
callback_exporter: Mutex::new(Some(exporter)),
callback_tx,
operation_status_tx,
recovery_active,
router_handle: std::sync::Mutex::new(Some(router_handle)),
recovery_tx,
connected: std::sync::atomic::AtomicBool::new(true),
@@ -2323,8 +2570,15 @@ mod tests {
// broadcast pair to test the routing logic in isolation.
let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
let (callback_tx, mut callback_rx) = broadcast::channel(8);
let (operation_status_tx, _) = broadcast::channel::<Arc<OperationStatus>>(8);
let recovery_active = Arc::new(std::sync::atomic::AtomicU32::new(0));
let router_h = tokio::spawn(callback_router(event_rx, callback_tx));
let router_h = tokio::spawn(callback_router(
event_rx,
callback_tx,
operation_status_tx,
recovery_active,
));
// Build a minimal valid 0x32 SubscriptionStatus body: 23-byte
// preamble + 16-byte item_correlation_id, record_count=0 so no
@@ -2578,9 +2832,9 @@ mod tests {
let stub: crate::RebuildFactory = Arc::new(|| {
Box::pin(async {
Err(mxaccess_nmx::NmxClientError::Transport(
mxaccess_rpc::transport::TransportError::Io(
std::io::Error::other("synthetic rebuild failure"),
),
mxaccess_rpc::transport::TransportError::Io(std::io::Error::other(
"synthetic rebuild failure",
)),
))
})
});
@@ -2603,9 +2857,7 @@ mod tests {
for _ in 0..expected_events {
match &*rx.recv().await.unwrap() {
RecoveryEvent::Started { .. } => started += 1,
RecoveryEvent::Failed {
will_retry, ..
} => {
RecoveryEvent::Failed { will_retry, .. } => {
failed += 1;
last_will_retry = Some(*will_retry);
}
@@ -2631,8 +2883,7 @@ mod tests {
// F16: every successful subscribe() inserts into the
// SubscriptionEntry registry; unsubscribe() removes it.
// Recovery walks this registry to replay AdviseSupervisory.
let (addr, handle) =
unauthenticated_server(vec![(0, Vec::new()), (0, Vec::new())]).await;
let (addr, handle) = unauthenticated_server(vec![(0, Vec::new()), (0, Vec::new())]).await;
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[(
"TestObj.TestInt",
sample_metadata(),
@@ -2706,10 +2957,7 @@ mod tests {
let pfc_object_uuid = (req_h.packet_flags & 0x80) != 0;
let stub_offset = if pfc_object_uuid { 8 + 16 } else { 8 };
let stub = body[stub_offset..].to_vec();
recorded_for_task
.lock()
.unwrap()
.push((opnum, stub));
recorded_for_task.lock().unwrap().push((opnum, stub));
let mut stub_resp = Vec::new();
stub_resp.extend_from_slice(&OrpcThat::default().encode());
@@ -2940,9 +3188,9 @@ mod tests {
let stub: crate::RebuildFactory = Arc::new(|| {
Box::pin(async {
Err(mxaccess_nmx::NmxClientError::Transport(
mxaccess_rpc::transport::TransportError::Io(
std::io::Error::other("stub factory: rebuild always fails"),
),
mxaccess_rpc::transport::TransportError::Io(std::io::Error::other(
"stub factory: rebuild always fails",
)),
))
})
});
@@ -2951,9 +3199,7 @@ mod tests {
let mut rx_a = session.recovery_events();
let mut rx_b = session.recovery_events();
let _ = session
.recover_connection(RecoveryPolicy::default())
.await;
let _ = session.recover_connection(RecoveryPolicy::default()).await;
// First event from each receiver is the same Started Arc.
let a = rx_a.recv().await.unwrap();
@@ -3071,6 +3317,181 @@ mod tests {
handle.abort();
}
/// Build a `ProcessDataReceived`-style envelope wrapping a 5-byte
/// operation-status inner body. Mirrors what `NmxObservedEnvelope`
/// serialises (`mxaccess-codec/src/observed_frame.rs:115-141`):
///
/// - 4-byte total-length prefix
/// - 46-byte header with `inner_length` at offset 6 (header
/// offset 4 + INNER_LENGTH_OFFSET 2)
/// - inner body
fn wrap_op_status_envelope(inner: &[u8]) -> Vec<u8> {
const HEADER_LENGTH: usize = 46;
let total_len = 4 + HEADER_LENGTH + inner.len();
let mut body = vec![0u8; total_len];
// Total-length prefix at offset 0.
body[0..4].copy_from_slice(&(total_len as i32).to_le_bytes());
// `actualInnerLength = declaredInnerLength - sizeof(int)` per
// the parser at `observed_frame.rs:134`. So
// `declaredInnerLength = inner.len() + 4`.
let declared_inner: i32 = inner.len() as i32 + 4;
// Inner-length field sits at headerOffset + INNER_LENGTH_OFFSET
// = 4 + 2 = 6.
body[6..10].copy_from_slice(&declared_inner.to_le_bytes());
// Inner body follows the header.
body[4 + HEADER_LENGTH..].copy_from_slice(inner);
body
}
#[tokio::test]
async fn router_dispatches_status_word_frame_to_operation_status_channel() {
// End-to-end: hand-build an operation-status `ProcessDataReceived`
// body and confirm the router parses it + broadcasts an
// `OperationStatus` (NOT a subscription message).
let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
let (callback_tx, mut callback_rx) = broadcast::channel(8);
let (operation_status_tx, mut operation_status_rx) =
broadcast::channel::<Arc<OperationStatus>>(8);
let recovery_active = Arc::new(std::sync::atomic::AtomicU32::new(0));
let router_h = tokio::spawn(callback_router(
event_rx,
callback_tx,
operation_status_tx,
recovery_active,
));
// Inner body is the proven `00 00 50 80 00` 5-byte status-word frame.
let inner = [0x00, 0x00, 0x50, 0x80, 0x00];
let body = wrap_op_status_envelope(&inner);
event_tx
.send(CallbackEvent::CallbackInvoked { opnum: 4, body })
.unwrap();
let received = tokio::time::timeout(
std::time::Duration::from_secs(1),
operation_status_rx.recv(),
)
.await
.expect("router timed out");
let event = received.expect("broadcast recv error");
// Raw frame round-trips byte-exact.
assert_eq!(event.raw.command, 0x00);
assert_eq!(event.raw.status_code, 0x8050);
assert_eq!(event.raw.completion_code, 0x00);
// Synthesizer-promoted status equals the canonical sentinel.
assert_eq!(event.status, MxStatus::WRITE_COMPLETE_OK);
// Context not yet wired — always None for this iteration.
assert!(event.context.is_none());
// No recovery in flight when the event was dispatched.
assert!(!event.is_during_recovery);
// Subscription channel must NOT have received anything — the
// dispatcher's `continue` after operation-status hit means
// subscription parsing never runs for this body.
let cb_res =
tokio::time::timeout(std::time::Duration::from_millis(100), callback_rx.recv()).await;
assert!(
cb_res.is_err(),
"subscription channel got an unexpected event"
);
drop(event_tx);
let _ = router_h.await;
}
#[tokio::test]
async fn router_dispatches_completion_only_frames_under_each_proven_byte() {
// Per `design/70-risks-and-open-questions.md` R3/R4 the three
// observed completion bytes are 0x00, 0x41, 0xEF. The synthesizer
// does NOT promote them (no upstream evidence per Path A's
// Ghidra walk); they should arrive on the operation-status
// channel as verbatim-preserve placeholders.
for byte in [0x00_u8, 0x41, 0xEF] {
let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
let (callback_tx, _callback_rx) = broadcast::channel(8);
let (operation_status_tx, mut operation_status_rx) =
broadcast::channel::<Arc<OperationStatus>>(8);
let recovery_active = Arc::new(std::sync::atomic::AtomicU32::new(0));
let router_h = tokio::spawn(callback_router(
event_rx,
callback_tx,
operation_status_tx,
recovery_active,
));
let inner = [byte];
let body = wrap_op_status_envelope(&inner);
event_tx
.send(CallbackEvent::CallbackInvoked { opnum: 4, body })
.unwrap();
let received = tokio::time::timeout(
std::time::Duration::from_secs(1),
operation_status_rx.recv(),
)
.await
.expect("router timed out");
let event = received.expect("broadcast recv error");
assert_eq!(event.raw.completion_code, byte);
assert_eq!(event.status.detail, i16::from(byte));
// R3/R4: completion-only bytes stay verbatim (Unknown/Unknown).
assert_eq!(
event.status.category,
mxaccess_codec::MxStatusCategory::Unknown
);
assert_eq!(
event.status.detected_by,
mxaccess_codec::MxStatusSource::Unknown
);
drop(event_tx);
let _ = router_h.await;
}
}
#[tokio::test]
async fn router_marks_is_during_recovery_when_counter_nonzero() {
// Stamp `recovery_active = 1` BEFORE feeding an event — the
// router should observe the volatile load and emit
// `OperationStatus.is_during_recovery = true`. Mirrors
// `MxNativeSession.OnCallbackReceived:573` which reads the same
// flag via `Volatile.Read(ref _recoveryActive)`.
let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
let (callback_tx, _callback_rx) = broadcast::channel(8);
let (operation_status_tx, mut operation_status_rx) =
broadcast::channel::<Arc<OperationStatus>>(8);
let recovery_active = Arc::new(std::sync::atomic::AtomicU32::new(1));
let router_h = tokio::spawn(callback_router(
event_rx,
callback_tx,
operation_status_tx,
recovery_active,
));
let inner = [0x00, 0x00, 0x50, 0x80, 0x00];
let body = wrap_op_status_envelope(&inner);
event_tx
.send(CallbackEvent::CallbackInvoked { opnum: 4, body })
.unwrap();
let event = tokio::time::timeout(
std::time::Duration::from_secs(1),
operation_status_rx.recv(),
)
.await
.expect("router timed out")
.expect("broadcast recv error");
assert!(event.is_during_recovery);
drop(event_tx);
let _ = router_h.await;
}
#[test]
fn filetime_to_system_time_round_trip() {
// Build a SystemTime, convert to FILETIME, convert back.
@@ -3098,7 +3519,14 @@ mod tests {
// window.
let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
let (callback_tx, mut callback_rx) = broadcast::channel(8);
let router_h = tokio::spawn(callback_router(event_rx, callback_tx));
let (operation_status_tx, _) = broadcast::channel::<Arc<OperationStatus>>(8);
let recovery_active = Arc::new(std::sync::atomic::AtomicU32::new(0));
let router_h = tokio::spawn(callback_router(
event_rx,
callback_tx,
operation_status_tx,
recovery_active,
));
event_tx
.send(CallbackEvent::Bind {
@@ -3142,7 +3570,11 @@ mod tests {
// Issue a plain subscribe — server records AdviseSupervisory.
let sub = session.subscribe("TestObj.TestInt").await.unwrap();
let cid = sub.correlation_id;
assert_eq!(recorded.lock().unwrap().len(), 1, "subscribe should issue 1 RPC");
assert_eq!(
recorded.lock().unwrap().len(),
1,
"subscribe should issue 1 RPC"
);
// Mutate the registry entry's mode to Buffered (synthesise the
// state subscribe_buffered_nmx would have produced).