[M4] mxaccess: Subscription impls Stream<Item = DataChange> (resolves F15)

F15 step 2/2 lands the per-Subscription routing on top of step 1's
broadcast layer. Subscription is now a working data-change stream.

Subscription type
- Now impls futures_util::Stream<Item = Result<DataChange, Error>>
  via tokio_stream::wrappers::BroadcastStream + a per-message filter.
- No longer Clone (broadcast::Receiver isn't Clone). Consumers that
  want fanout subscribe twice or share via Arc<Mutex<...>>.
- Holds the broadcast::Receiver subscribed BEFORE AdviseSupervisory
  fires — guarantees no updates between advise and stream-creation
  are dropped.
- pending VecDeque buffers records from the current message so each
  poll_next yields at most one DataChange (Stream contract).

Filter logic (records_to_data_changes, mirrors cs:333-343)
- 0x32 SubscriptionStatus: keep when
  msg.item_correlation_id == subscription.correlation_id; drop
  otherwise.
- 0x33 DataUpdate: keep ALL — codec exposes no per-record correlation
  field, and the .NET filter only checks item_correlation_id (which
  0x33 doesn't carry), so DataUpdates fan out to every active
  subscription. Matches .NET behavior verbatim.
- Records with value: None drop silently (mirrors evt.Record.Value
  is null filter at cs:337).
- BroadcastStream Lagged(n) maps to Error::Configuration with the
  lag count in the detail string.

Helpers
- filetime_to_system_time(i64) -> SystemTime: inverse of
  system_time_to_filetime; saturates at Unix epoch for FILETIMEs
  before 1970 since SystemTime can't portably represent pre-epoch.
- record_to_data_change(record, reference) -> Option<DataChange>:
  builds DataChange from one record, returns None for unparseable
  value (the codec couldn't decode the wire kind).
- Status currently hardcoded to MxStatus::DATA_CHANGE_OK (mirrors
  NmxSubscriptionRecord.ToDataChangeStatus at NmxSubscriptionMessage.cs:22-25
  which the .NET reference itself stubs to the OK constant).

Cargo.toml additions: futures-util (workspace) + tokio-stream (0.1
with sync feature for BroadcastStream).

Tests (5 new in mxaccess; total 40)
- subscription_stream_yields_data_change_for_matching_correlation:
  build a 0x32 SubscriptionStatus with one Int32 record and the
  subscription's correlation id, inject through test_inject_sender,
  observe the DataChange (reference, value, quality match) on the
  Stream.
- subscription_stream_filters_out_mismatched_correlation_for_status:
  inject 0x32 with wrong correlation id, assert the stream stays
  pending (timeout-as-success).
- subscription_stream_keeps_data_update_regardless_of_correlation:
  inject 0x33 DataUpdate with one Int32 record (no correlation
  field on the message); stream still yields the DataChange.
- filetime_to_system_time_round_trip: build a SystemTime with .005s
  precision, round-trip through both helpers, assert equality.
- filetime_to_system_time_pre_unix_epoch_saturates: FILETIME 0 (year
  1601) → SystemTime::UNIX_EPOCH (saturating clamp).

design/followups.md: F15 moved to Resolved with both step commits
referenced. Open list: 9 items (was 10).

Test count delta: 511 -> 516 (+5). All four DoD gates green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-05 09:45:16 -04:00
parent 2b849aed7a
commit a31237d1d0
4 changed files with 418 additions and 19 deletions
+5 -6
View File
@@ -54,12 +54,6 @@ move to `## Resolved` with a date + commit hash.
**Why deferred:** `ManagedNmxService2Client.Create()` (`ManagedNmxService2Client.cs:30-64`) auto-discovers `(host, port, service_ipid)` by activating the `NmxSvc.NmxService` COM ProgID, marshalling the resulting `IUnknown` to an OBJREF, calling `IObjectExporter::ResolveOxid` against the OXID inside, then `IRemUnknown::RemQueryInterface` to get the `INmxService2` IPID. This requires `windows-rs` for `CoCreateInstance` / `CLSIDFromProgID` (the same gating dep as F6), plus the `ComObjRefProvider.MarshalIUnknownObjRef` port (also F6).
**Resolves when:** F6 lands (windows-rs wired in + `ComObjRefProvider` port). At that point `NmxClient::create()` becomes ~30 lines that chain the existing primitives: COM activation → `MarshalIUnknownObjRef``ComObjRef::parse``object_exporter_client::resolve_oxid_with_managed_ntlm_packet_integrity``rem_unknown::encode_rem_query_interface_request` over a temporary transport → `NmxClient::connect`.
### F15 — Callback router wires `CallbackExporter` events into `Subscription` stream
**Severity:** P1
**Source:** M4 wave 2, `crates/mxaccess/src/session.rs`
**Why deferred:** The wave 1 `Session::subscribe` returns a [`Subscription`] handle that registers the advise on the wire, but no `DataChange` items are delivered yet — the routing layer that would (a) wire `CallbackExporter` into `Session::connect_nmx` (instead of the current null-callback `RegisterEngine2`), (b) spawn a router task that drains [`mxaccess_callback::CallbackEvent`], decodes the inner body via `mxaccess_codec::NmxSubscriptionMessage`, and (c) routes `DataChange` items to per-subscription `mpsc` channels keyed by `correlation_id` (for `0x32` SubscriptionStatus) or `operation_id` (for `0x33` DataUpdate). Mirrors the .NET `MxNativeSession.OnCallbackReceived` (`MxNativeSession.cs:113-114` event wiring + `cs:333-343` filter pattern).
**Resolves when:** `Subscription` impls `Stream<Item = Result<DataChange, Error>>` and a real-server round-trip test (or hand-rolled `CallbackExporter` loopback) shows a `DataChange` flowing end-to-end. Notes: the `0x33` DataUpdate routing is the trickier piece because the codec exposes `item_correlation_id` only on `0x32` SubscriptionStatus — the .NET reference's `MxNativeCallbackEvent` filter at `cs:336` actually only catches the initial SubscriptionStatus, which suggests data updates go to a single per-engine sink rather than per-correlation channels. The Rust port should re-verify the wire model against `captures/058-frida-subscribe-testint` before locking in the routing key.
### F14 — `tiberius`-backed SQL implementation of `Resolver` + `UserResolver`
**Severity:** P2
**Source:** M3 stream A, `crates/mxaccess-galaxy/src/sql.rs` (constants present, no client wiring yet)
@@ -78,6 +72,11 @@ move to `## Resolved` with a date + commit hash.
### F13 — `NmxClient` high-level write/advise/subscribe wrappers
**Resolved:** 2026-05-05. All seven wrappers landed in `crates/mxaccess-nmx/src/client.rs`: `write`, `write2`, `write_secured2`, `advise_supervisory`, `send_observed_pre_advise_metadata`, `register_reference`, `un_advise`. Each takes a `GalaxyTagMetadata` + a typed `WriteValue` (re-exported from `mxaccess-codec`), builds the inner NMX body via `mxaccess-codec` (`write_message::encode` / `encode_timestamped` / `secured_write::encode` / `NmxItemControlMessage` / `NmxMetadataQueryMessage` / `NmxReferenceRegistrationMessage`), wraps in `NmxTransferEnvelope`, and routes through `transfer_data`. The pure-codec `encode_*_transfer_body` helpers are extracted as `pub(crate) fn` for testability, mirroring the .NET reference's `internal static` shape. `un_advise` preserves the .NET reference's quirky `NmxTransferMessageKind::Write` envelope (not `ItemControl`) per `cs:457`.
### F15 — Callback router wires `CallbackExporter` events into `Subscription` stream
**Resolved:** 2026-05-05 across two commits.
- Step 1/2 (`2b849ae`): `Session::connect_nmx` now starts a `CallbackExporter` on a 127.0.0.1 ephemeral port, builds the OBJREF via `local_hostname()` + `127.0.0.1` fallback, registers it through `NmxClient::register_engine_2` (was `..._without_callback`). A `callback_router` task drains `CallbackEvent`s, decodes each `CallbackInvoked` body via `NmxSubscriptionMessage::parse_inner`, and broadcasts parsed messages on a `tokio::sync::broadcast` channel exposed via `Session::callbacks()`. Shutdown chains: UnregisterEngine → CallbackExporter::shutdown → wait for router task.
- Step 2/2 (this commit): `Subscription` now impls `Stream<Item = Result<DataChange, Error>>`. Filtering follows the .NET reference at `cs:333-343` exactly — `0x32` SubscriptionStatus messages are kept only when `message.item_correlation_id == subscription.correlation_id`; `0x33` DataUpdate messages pass through to ALL subscriptions because the codec exposes no per-record correlation field (matches the .NET `MxNativeCallbackEvent` filter behavior verbatim). Each `NmxSubscriptionRecord` with a parseable `value` becomes one `DataChange`. Records with `value: None` are dropped silently (mirrors the .NET `evt.Record.Value is null` filter at `cs:337`). Lag-loss surfaces as `Error::Configuration(InvalidArgument)` carrying the lag count. Stream-end (broadcast sender dropped) yields `None`. New helper: `filetime_to_system_time` (inverse of the existing `system_time_to_filetime`); saturates at Unix epoch for pre-1970 FILETIMEs. Tests cover correlation match/mismatch for `0x32`, `0x33` pass-through for any correlation, and FILETIME round-trip.
### F1 — NTLM consumer-layer helpers (workstation default + from_env constructor)
**Resolved:** 2026-05-05. `NtlmClientContext::from_env()` reads `MX_RPC_USER` / `MX_RPC_PASSWORD` / `MX_RPC_DOMAIN` (mirrors `ManagedNtlmClientContext.FromEnvironment` at `cs:41-49`); empty `MX_RPC_DOMAIN` is permitted. `local_hostname()` checks `COMPUTERNAME` then `HOSTNAME` and returns the empty string when neither is set — same "unavailable" semantics as `Environment.MachineName` returning null. Lives in `mxaccess-rpc/src/ntlm.rs`; deliberately doesn't pull `gethostname` (no native-libc deps, no `unsafe` for hostname lookup). Added `NtlmError::MissingEnvVar { name }` for the env-var-unset case. Test mod gained an `EnvScope` + `ENV_LOCK` mutex pattern for serializing process-global env mutation across parallel tests.
+47
View File
@@ -96,6 +96,23 @@ version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d"
[[package]]
name = "futures-macro"
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c39754e157331b013978ec91992bde1ac089843443c49cbc7f46150b0fad0893"
[[package]]
name = "futures-task"
version = "0.3.32"
@@ -109,6 +126,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6"
dependencies = [
"futures-core",
"futures-macro",
"futures-task",
"pin-project-lite",
"slab",
@@ -215,12 +233,16 @@ name = "mxaccess"
version = "0.0.0"
dependencies = [
"async-trait",
"futures-util",
"mxaccess-callback",
"mxaccess-codec",
"mxaccess-galaxy",
"mxaccess-nmx",
"mxaccess-rpc",
"rand",
"thiserror",
"tokio",
"tokio-stream",
"tracing",
]
@@ -462,6 +484,31 @@ dependencies = [
"syn",
]
[[package]]
name = "tokio-stream"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]]
name = "tokio-util"
version = "0.7.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ae9cec805b01e8fc3fd2fe289f89149a9b66dd16786abd8b19cfa7b48cb0098"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"pin-project-lite",
"tokio",
]
[[package]]
name = "tracing"
version = "0.1.44"
+2
View File
@@ -17,6 +17,8 @@ mxaccess-rpc = { path = "../mxaccess-rpc" }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
futures-util = { workspace = true }
tokio-stream = { version = "0.1", features = ["sync"] }
rand = "0.8"
[dev-dependencies]
+364 -13
View File
@@ -33,15 +33,21 @@ use std::sync::Arc;
use std::time::SystemTime;
use mxaccess_callback::{CallbackEvent, CallbackExporter, ExporterIdentities};
use mxaccess_codec::NmxSubscriptionMessage;
use mxaccess_codec::{MxStatus, NmxSubscriptionMessage, NmxSubscriptionRecord};
use mxaccess_galaxy::{GalaxyTagMetadata, Resolver, ResolverError};
use mxaccess_nmx::{NmxClient, NmxClientError, WriteValue};
use mxaccess_rpc::guid::Guid;
use mxaccess_rpc::ntlm::{NtlmClientContext, local_hostname};
use mxaccess_rpc::transport::TransportError;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::{Mutex, broadcast};
use tokio::task::JoinHandle;
use tokio_stream::wrappers::BroadcastStream;
use crate::DataChange;
use futures_util::Stream;
use crate::{
ConfigError, ConnectionError, Error, RecoveryPolicy, SecurityContext, Session, SessionOptions,
@@ -58,29 +64,77 @@ use crate::{
/// either keep up or accept lag-loss.
const CALLBACK_BROADCAST_CAPACITY: usize = 256;
/// Subscription lifecycle handle returned by [`Session::subscribe`].
/// Subscription handle returned by [`Session::subscribe`]. Implements
/// `Stream<Item = Result<DataChange, Error>>` — driving it forward
/// yields one [`DataChange`] per matching record observed on the
/// session's broadcast channel.
///
/// Carries the 16-byte `correlation_id` the Rust port generated for
/// the subscription, the original `reference` string (for diagnostics),
/// and the resolved [`GalaxyTagMetadata`] (used by
/// [`Session::unsubscribe`] to issue the matching `UnAdvise`).
/// the subscription, the original `reference` string, the resolved
/// [`GalaxyTagMetadata`] (used by [`Session::unsubscribe`] to issue
/// the matching `UnAdvise`), and the underlying broadcast receiver.
///
/// Currently a pure lifecycle handle — does not yet impl
/// `Stream<Item = DataChange>`. The callback router that will turn
/// this into a stream is followup F15 in `design/followups.md`.
/// ## Routing model
///
/// **Not auto-cleaning**: `Subscription` deliberately does not
/// implement `Drop` to fire `UnAdvise`. The .NET reference's
/// `tokio::spawn`-from-Drop pattern is the R15 hazard tracked at
/// The stream applies the same filter the .NET reference does at
/// `MxNativeSession.cs:333-343`:
///
/// - `0x32` SubscriptionStatus messages: keep when
/// `message.item_correlation_id == self.correlation_id`. Other
/// subscriptions' correlation ids are filtered out.
/// - `0x33` DataUpdate messages: keep ALL. The codec exposes no
/// per-record correlation field, and the .NET reference's filter
/// only checks `item_correlation_id` (which `0x33` doesn't carry),
/// so DataUpdates fan out to every active subscription. This
/// matches the .NET behavior verbatim — proper per-subscription
/// `0x33` routing requires evidence-based design work against
/// `captures/058-frida-subscribe-testint` (tracked in F15's
/// doc note in `design/followups.md`).
///
/// Each `NmxSubscriptionRecord` whose `value` parsed successfully
/// becomes one `DataChange`. Records with `value: None` are dropped
/// silently (mirrors the .NET `evt.Record.Value is null` filter at
/// `cs:337`).
///
/// ## Not auto-cleaning
///
/// `Subscription` deliberately does not implement `Drop` to fire
/// `UnAdvise`. The .NET reference's `tokio::spawn`-from-Drop pattern
/// is the R15 hazard tracked at
/// `design/70-risks-and-open-questions.md`. Callers must call
/// `Session::unsubscribe(sub).await` explicitly. Followup F16 will
/// `Session::unsubscribe(sub).await` explicitly. Future work will
/// add a long-lived connection task that supports best-effort
/// drop-time cleanup without the spawn-from-Drop hazard.
#[derive(Debug, Clone)]
///
/// ## Lag behavior
///
/// The underlying broadcast channel has a fixed capacity
/// ([`CALLBACK_BROADCAST_CAPACITY`]). Slow consumers that fall behind
/// receive `Some(Err(Error::Configuration(InvalidArgument)))` whose
/// detail string contains the lag-loss count. The stream stays open
/// and resumes from the next available message — same shape as
/// `tokio_stream::wrappers::BroadcastStream`'s lag handling.
pub struct Subscription {
pub(crate) correlation_id: [u8; 16],
pub(crate) reference: Arc<str>,
pub(crate) metadata: Arc<GalaxyTagMetadata>,
/// Broadcast stream wrapper. `None` after the first call to
/// `Stream::poll_next` exhausts the channel (sender dropped).
pub(crate) inbound: Pin<Box<BroadcastStream<Arc<NmxSubscriptionMessage>>>>,
/// Buffered records from the current message. Drained one-per-poll
/// before pulling the next message off `inbound`.
pub(crate) pending: std::collections::VecDeque<DataChange>,
}
impl std::fmt::Debug for Subscription {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Subscription")
.field("correlation_id", &self.correlation_id)
.field("reference", &self.reference)
.field("metadata", &self.metadata.attribute_id)
.field("pending", &self.pending.len())
.finish_non_exhaustive()
}
}
impl Subscription {
@@ -105,6 +159,115 @@ impl Subscription {
}
}
impl Stream for Subscription {
type Item = Result<DataChange, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
// Drain any buffered DataChanges from the previous message.
if let Some(dc) = self.pending.pop_front() {
return Poll::Ready(Some(Ok(dc)));
}
// Pull the next broadcast message.
let next = std::task::ready!(self.inbound.as_mut().poll_next(cx));
let msg = match next {
Some(Ok(msg)) => msg,
Some(Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(n))) => {
return Poll::Ready(Some(Err(Error::Configuration(
ConfigError::InvalidArgument {
detail: format!("subscription lagged behind broadcast by {n} messages"),
},
))));
}
None => return Poll::Ready(None), // sender dropped
};
// Apply the .NET-compatible filter and convert matching
// records to DataChange items.
let reference = self.reference.clone();
let correlation_id = self.correlation_id;
for dc in records_to_data_changes(&msg, correlation_id, &reference) {
self.pending.push_back(dc);
}
// Loop back to drain `pending` (or pull the next message if
// none of the records survived the filter).
}
}
}
/// Convert subscription-message records to [`DataChange`] items, applying
/// the per-`Subscription` filter (`0x32` keep when correlation matches;
/// `0x33` keep all).
///
/// Records with `value: None` are silently dropped — mirrors the .NET
/// `evt.Record.Value is null` filter at `MxNativeSession.cs:337`.
pub(crate) fn records_to_data_changes(
msg: &NmxSubscriptionMessage,
subscription_correlation: [u8; 16],
reference: &Arc<str>,
) -> Vec<DataChange> {
// Filter at message level first — see the routing-model doc on
// `Subscription` for the rationale.
if msg.command == 0x32 {
match msg.item_correlation_id {
Some(id) if id.0 == subscription_correlation => {}
_ => return Vec::new(),
}
}
// 0x33 (and any other commands the codec accepts) pass through.
msg.records
.iter()
.filter_map(|rec| record_to_data_change(rec, reference))
.collect()
}
/// Build a single [`DataChange`] from one [`NmxSubscriptionRecord`].
/// Returns `None` when `record.value` is `None` (the codec couldn't
/// decode the wire kind, or the wire kind was unknown).
fn record_to_data_change(
record: &NmxSubscriptionRecord,
reference: &Arc<str>,
) -> Option<DataChange> {
let value = record.value.clone()?;
Some(DataChange {
reference: reference.clone(),
value,
quality: record.quality,
timestamp: filetime_to_system_time(record.timestamp_filetime),
// Mirrors NmxSubscriptionRecord.ToDataChangeStatus
// (`NmxSubscriptionMessage.cs:22-25`) which always returns
// `MxStatus.DataChangeOk`. Proper status derivation from the
// record's `status` / `detail_status` fields is downstream
// work — currently the .NET reference itself does not
// distinguish.
status: MxStatus::DATA_CHANGE_OK,
})
}
/// Convert a Windows FILETIME tick count (100-ns intervals since
/// 1601-01-01 UTC) to a `SystemTime`. Inverse of
/// [`system_time_to_filetime`].
///
/// Saturates at `SystemTime::UNIX_EPOCH` for FILETIMEs before the
/// Unix epoch (i.e. before 1970-01-01 UTC) — those cannot be
/// represented portably across all `SystemTime` platforms.
#[must_use]
pub fn filetime_to_system_time(filetime_ticks: i64) -> std::time::SystemTime {
const FILETIME_TO_UNIX_EPOCH_SECONDS: i64 = 11_644_473_600;
const TICKS_PER_SECOND: i64 = 10_000_000;
let total_secs_filetime = filetime_ticks / TICKS_PER_SECOND;
let subsec_ticks = filetime_ticks % TICKS_PER_SECOND;
let unix_secs = total_secs_filetime - FILETIME_TO_UNIX_EPOCH_SECONDS;
if unix_secs < 0 {
return std::time::SystemTime::UNIX_EPOCH;
}
let subsec_nanos = (subsec_ticks * 100) as u32;
std::time::SystemTime::UNIX_EPOCH + std::time::Duration::new(unix_secs as u64, subsec_nanos)
}
/// Convert a `SystemTime` to a Windows FILETIME tick count (100-ns
/// intervals since 1601-01-01 UTC). Mirrors `DateTime.ToFileTime()`
/// (referenced at `NmxWriteMessage.cs:248` and used by every
@@ -575,13 +738,25 @@ impl Session {
ensure_hresult_ok(hr)?;
drop(nmx);
// Subscribe to the broadcast BEFORE issuing AdviseSupervisory so
// updates arriving immediately after the advise aren't lost.
let inbound = Box::pin(BroadcastStream::new(self.inner.callback_tx.subscribe()));
Ok(Subscription {
correlation_id,
reference: Arc::<str>::from(reference),
metadata: Arc::new(metadata),
inbound,
pending: std::collections::VecDeque::new(),
})
}
/// `subscribe` ordering note: subscribe to the broadcast channel
/// FIRST, then issue `AdviseSupervisory`. If we ordered the other
/// way, updates that arrive between the advise call and the
/// `broadcast::subscribe` would be dropped.
///
///
/// Cancel a subscription. Mirrors `MxNativeSession.Unsubscribe`
/// (`cs:361-381`) — calls `UnAdvise` on the underlying transport.
///
@@ -1439,6 +1614,182 @@ mod tests {
.expect("router task didn't exit after upstream close");
}
// ---- F15 step 2/2: Subscription as Stream<DataChange> ------------
/// Build a synthetic 0x32 SubscriptionStatus body with one record
/// carrying a recognized scalar value (Int32 wire kind).
fn build_status_with_int32_record(
item_correlation_id: [u8; 16],
operation_id: [u8; 16],
int_value: i32,
timestamp_filetime: i64,
) -> Vec<u8> {
// Header layout: 23-byte preamble + 16-byte item_correlation_id +
// record (with detail_status: status(4) + detail_status(4) +
// quality(2) + filetime(8) + wire_kind(1) + value bytes).
// For Int32 wire kind 0x02 the value is just 4 bytes.
let header_len = 39; // preamble + item_correlation_id
let record_len = 4 + 4 + 2 + 8 + 1 + 4; // 23 bytes for Int32 record
let mut body = vec![0u8; header_len + record_len];
body[0] = 0x32;
body[1..3].copy_from_slice(&1u16.to_le_bytes()); // version
body[3..7].copy_from_slice(&1i32.to_le_bytes()); // record_count = 1
body[7..23].copy_from_slice(&operation_id);
body[23..39].copy_from_slice(&item_correlation_id);
let off = header_len;
body[off..off + 4].copy_from_slice(&0i32.to_le_bytes()); // status
body[off + 4..off + 8].copy_from_slice(&0i32.to_le_bytes()); // detail_status
body[off + 8..off + 10].copy_from_slice(&0xC0u16.to_le_bytes()); // quality
body[off + 10..off + 18].copy_from_slice(&timestamp_filetime.to_le_bytes());
body[off + 18] = 0x02; // wire_kind = Int32
body[off + 19..off + 23].copy_from_slice(&int_value.to_le_bytes());
body
}
#[tokio::test]
async fn subscription_stream_yields_data_change_for_matching_correlation() {
use futures_util::StreamExt;
let (addr, handle) = unauthenticated_server(vec![(0, Vec::new())]).await;
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[(
"TestObj.TestInt",
sample_metadata(),
)]));
let session = connect_test_session(addr, resolver).await.unwrap();
// Subscribe via the public path so we get a Subscription with
// its broadcast::Receiver wired in. We need the correlation id
// it generated to inject a matching message.
let mut sub = session.subscribe("TestObj.TestInt").await.unwrap();
let correlation = sub.correlation_id();
// Inject a 0x32 SubscriptionStatus with that correlation id.
let body = build_status_with_int32_record(
correlation,
[0xEE; 16], // operation_id (any)
42, // value
// FILETIME for ~2024-01-01 — exact value doesn't matter.
0x1F0E_2D60_4C00_0000,
);
let msg = NmxSubscriptionMessage::parse_inner(&body).unwrap();
test_inject_sender(&session).send(Arc::new(msg)).unwrap();
let dc = tokio::time::timeout(std::time::Duration::from_secs(1), sub.next())
.await
.expect("stream poll timed out")
.expect("stream returned None")
.expect("stream item was Err");
assert_eq!(&*dc.reference, "TestObj.TestInt");
assert_eq!(dc.value, mxaccess_codec::MxValue::Int32(42));
assert_eq!(dc.quality, 0xC0);
// Drain the unsubscribe HRESULT.
// Server already sent (0, Vec::new()) for the advise — the unsubscribe
// would consume the next response slot which we didn't queue. So
// drop the sub without unsubscribe to keep the test simple.
drop(sub);
handle.abort();
}
#[tokio::test]
async fn subscription_stream_filters_out_mismatched_correlation_for_status() {
use futures_util::StreamExt;
let (addr, handle) = unauthenticated_server(vec![(0, Vec::new())]).await;
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[(
"TestObj.TestInt",
sample_metadata(),
)]));
let session = connect_test_session(addr, resolver).await.unwrap();
let mut sub = session.subscribe("TestObj.TestInt").await.unwrap();
// Inject a 0x32 with a DIFFERENT correlation id — should be filtered.
let body = build_status_with_int32_record(
[0xAA; 16], // mismatched
[0xEE; 16],
42,
0x1F0E_2D60_4C00_0000,
);
let msg = NmxSubscriptionMessage::parse_inner(&body).unwrap();
test_inject_sender(&session).send(Arc::new(msg)).unwrap();
// Stream should still be pending (no DataChange delivered).
let res = tokio::time::timeout(std::time::Duration::from_millis(150), sub.next()).await;
assert!(
res.is_err(),
"expected timeout (stream pending), got {res:?}"
);
drop(sub);
handle.abort();
}
#[tokio::test]
async fn subscription_stream_keeps_data_update_regardless_of_correlation() {
use futures_util::StreamExt;
// 0x33 DataUpdate has no item_correlation_id; the .NET-style
// filter passes them through to all subscriptions.
let (addr, handle) = unauthenticated_server(vec![(0, Vec::new())]).await;
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[(
"TestObj.TestInt",
sample_metadata(),
)]));
let session = connect_test_session(addr, resolver).await.unwrap();
let mut sub = session.subscribe("TestObj.TestInt").await.unwrap();
// Build a minimal valid 0x33 DataUpdate body. record_count=1
// because parse_data_update always reads exactly one record
// regardless of the field; record carries an Int32.
let header_len = 23; // preamble (no correlation id for 0x33)
let record_len = 4 + 2 + 8 + 1 + 4; // status + quality + filetime + wire_kind + i32
let mut body = vec![0u8; header_len + record_len];
body[0] = 0x33;
body[1..3].copy_from_slice(&1u16.to_le_bytes());
body[3..7].copy_from_slice(&1i32.to_le_bytes());
body[7..23].copy_from_slice(&[0xEEu8; 16]); // operation_id
let off = header_len;
body[off..off + 4].copy_from_slice(&0i32.to_le_bytes()); // status
body[off + 4..off + 6].copy_from_slice(&0xC0u16.to_le_bytes()); // quality
body[off + 6..off + 14].copy_from_slice(&0x1F0E_2D60_4C00_0000i64.to_le_bytes());
body[off + 14] = 0x02; // wire_kind = Int32
body[off + 15..off + 19].copy_from_slice(&7i32.to_le_bytes());
let msg = NmxSubscriptionMessage::parse_inner(&body).unwrap();
test_inject_sender(&session).send(Arc::new(msg)).unwrap();
let dc = tokio::time::timeout(std::time::Duration::from_secs(1), sub.next())
.await
.expect("stream poll timed out")
.expect("stream returned None")
.expect("stream item was Err");
assert_eq!(dc.value, mxaccess_codec::MxValue::Int32(7));
drop(sub);
handle.abort();
}
#[test]
fn filetime_to_system_time_round_trip() {
// Build a SystemTime, convert to FILETIME, convert back.
// Sub-second precision is preserved at 100ns granularity.
let t = std::time::SystemTime::UNIX_EPOCH + std::time::Duration::new(1234, 5_000_000); // .005s
let ft = system_time_to_filetime(t).unwrap();
let t2 = filetime_to_system_time(ft);
assert_eq!(t, t2);
}
#[test]
fn filetime_to_system_time_pre_unix_epoch_saturates() {
// FILETIME for year 1900 is well before Unix epoch.
let pre_epoch_ft: i64 = 0; // FILETIME 0 = year 1601
let t = filetime_to_system_time(pre_epoch_ft);
assert_eq!(t, std::time::SystemTime::UNIX_EPOCH);
}
#[tokio::test]
async fn router_silently_drops_non_callback_events() {
// Bind / Auth3Ignored / ProtocolError / etc. should be ignored