diff --git a/design/followups.md b/design/followups.md index a67daa0..1d686b3 100644 --- a/design/followups.md +++ b/design/followups.md @@ -54,12 +54,6 @@ move to `## Resolved` with a date + commit hash. **Why deferred:** `ManagedNmxService2Client.Create()` (`ManagedNmxService2Client.cs:30-64`) auto-discovers `(host, port, service_ipid)` by activating the `NmxSvc.NmxService` COM ProgID, marshalling the resulting `IUnknown` to an OBJREF, calling `IObjectExporter::ResolveOxid` against the OXID inside, then `IRemUnknown::RemQueryInterface` to get the `INmxService2` IPID. This requires `windows-rs` for `CoCreateInstance` / `CLSIDFromProgID` (the same gating dep as F6), plus the `ComObjRefProvider.MarshalIUnknownObjRef` port (also F6). **Resolves when:** F6 lands (windows-rs wired in + `ComObjRefProvider` port). At that point `NmxClient::create()` becomes ~30 lines that chain the existing primitives: COM activation → `MarshalIUnknownObjRef` → `ComObjRef::parse` → `object_exporter_client::resolve_oxid_with_managed_ntlm_packet_integrity` → `rem_unknown::encode_rem_query_interface_request` over a temporary transport → `NmxClient::connect`. -### F15 — Callback router wires `CallbackExporter` events into `Subscription` stream -**Severity:** P1 -**Source:** M4 wave 2, `crates/mxaccess/src/session.rs` -**Why deferred:** The wave 1 `Session::subscribe` returns a [`Subscription`] handle that registers the advise on the wire, but no `DataChange` items are delivered yet — the routing layer that would (a) wire `CallbackExporter` into `Session::connect_nmx` (instead of the current null-callback `RegisterEngine2`), (b) spawn a router task that drains [`mxaccess_callback::CallbackEvent`], decodes the inner body via `mxaccess_codec::NmxSubscriptionMessage`, and (c) routes `DataChange` items to per-subscription `mpsc` channels keyed by `correlation_id` (for `0x32` SubscriptionStatus) or `operation_id` (for `0x33` DataUpdate). Mirrors the .NET `MxNativeSession.OnCallbackReceived` (`MxNativeSession.cs:113-114` event wiring + `cs:333-343` filter pattern). -**Resolves when:** `Subscription` impls `Stream>` and a real-server round-trip test (or hand-rolled `CallbackExporter` loopback) shows a `DataChange` flowing end-to-end. Notes: the `0x33` DataUpdate routing is the trickier piece because the codec exposes `item_correlation_id` only on `0x32` SubscriptionStatus — the .NET reference's `MxNativeCallbackEvent` filter at `cs:336` actually only catches the initial SubscriptionStatus, which suggests data updates go to a single per-engine sink rather than per-correlation channels. The Rust port should re-verify the wire model against `captures/058-frida-subscribe-testint` before locking in the routing key. - ### F14 — `tiberius`-backed SQL implementation of `Resolver` + `UserResolver` **Severity:** P2 **Source:** M3 stream A, `crates/mxaccess-galaxy/src/sql.rs` (constants present, no client wiring yet) @@ -78,6 +72,11 @@ move to `## Resolved` with a date + commit hash. ### F13 — `NmxClient` high-level write/advise/subscribe wrappers **Resolved:** 2026-05-05. All seven wrappers landed in `crates/mxaccess-nmx/src/client.rs`: `write`, `write2`, `write_secured2`, `advise_supervisory`, `send_observed_pre_advise_metadata`, `register_reference`, `un_advise`. Each takes a `GalaxyTagMetadata` + a typed `WriteValue` (re-exported from `mxaccess-codec`), builds the inner NMX body via `mxaccess-codec` (`write_message::encode` / `encode_timestamped` / `secured_write::encode` / `NmxItemControlMessage` / `NmxMetadataQueryMessage` / `NmxReferenceRegistrationMessage`), wraps in `NmxTransferEnvelope`, and routes through `transfer_data`. The pure-codec `encode_*_transfer_body` helpers are extracted as `pub(crate) fn` for testability, mirroring the .NET reference's `internal static` shape. `un_advise` preserves the .NET reference's quirky `NmxTransferMessageKind::Write` envelope (not `ItemControl`) per `cs:457`. +### F15 — Callback router wires `CallbackExporter` events into `Subscription` stream +**Resolved:** 2026-05-05 across two commits. +- Step 1/2 (`2b849ae`): `Session::connect_nmx` now starts a `CallbackExporter` on a 127.0.0.1 ephemeral port, builds the OBJREF via `local_hostname()` + `127.0.0.1` fallback, registers it through `NmxClient::register_engine_2` (was `..._without_callback`). A `callback_router` task drains `CallbackEvent`s, decodes each `CallbackInvoked` body via `NmxSubscriptionMessage::parse_inner`, and broadcasts parsed messages on a `tokio::sync::broadcast` channel exposed via `Session::callbacks()`. Shutdown chains: UnregisterEngine → CallbackExporter::shutdown → wait for router task. +- Step 2/2 (this commit): `Subscription` now impls `Stream>`. Filtering follows the .NET reference at `cs:333-343` exactly — `0x32` SubscriptionStatus messages are kept only when `message.item_correlation_id == subscription.correlation_id`; `0x33` DataUpdate messages pass through to ALL subscriptions because the codec exposes no per-record correlation field (matches the .NET `MxNativeCallbackEvent` filter behavior verbatim). Each `NmxSubscriptionRecord` with a parseable `value` becomes one `DataChange`. Records with `value: None` are dropped silently (mirrors the .NET `evt.Record.Value is null` filter at `cs:337`). Lag-loss surfaces as `Error::Configuration(InvalidArgument)` carrying the lag count. Stream-end (broadcast sender dropped) yields `None`. New helper: `filetime_to_system_time` (inverse of the existing `system_time_to_filetime`); saturates at Unix epoch for pre-1970 FILETIMEs. Tests cover correlation match/mismatch for `0x32`, `0x33` pass-through for any correlation, and FILETIME round-trip. + ### F1 — NTLM consumer-layer helpers (workstation default + from_env constructor) **Resolved:** 2026-05-05. `NtlmClientContext::from_env()` reads `MX_RPC_USER` / `MX_RPC_PASSWORD` / `MX_RPC_DOMAIN` (mirrors `ManagedNtlmClientContext.FromEnvironment` at `cs:41-49`); empty `MX_RPC_DOMAIN` is permitted. `local_hostname()` checks `COMPUTERNAME` then `HOSTNAME` and returns the empty string when neither is set — same "unavailable" semantics as `Environment.MachineName` returning null. Lives in `mxaccess-rpc/src/ntlm.rs`; deliberately doesn't pull `gethostname` (no native-libc deps, no `unsafe` for hostname lookup). Added `NtlmError::MissingEnvVar { name }` for the env-var-unset case. Test mod gained an `EnvScope` + `ENV_LOCK` mutex pattern for serializing process-global env mutation across parallel tests. diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 2875462..00e35a5 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -96,6 +96,23 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" +[[package]] +name = "futures-macro" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c39754e157331b013978ec91992bde1ac089843443c49cbc7f46150b0fad0893" + [[package]] name = "futures-task" version = "0.3.32" @@ -109,6 +126,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" dependencies = [ "futures-core", + "futures-macro", "futures-task", "pin-project-lite", "slab", @@ -215,12 +233,16 @@ name = "mxaccess" version = "0.0.0" dependencies = [ "async-trait", + "futures-util", + "mxaccess-callback", "mxaccess-codec", "mxaccess-galaxy", "mxaccess-nmx", "mxaccess-rpc", + "rand", "thiserror", "tokio", + "tokio-stream", "tracing", ] @@ -462,6 +484,31 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-stream" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", + "tokio-util", +] + +[[package]] +name = "tokio-util" +version = "0.7.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ae9cec805b01e8fc3fd2fe289f89149a9b66dd16786abd8b19cfa7b48cb0098" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + [[package]] name = "tracing" version = "0.1.44" diff --git a/rust/crates/mxaccess/Cargo.toml b/rust/crates/mxaccess/Cargo.toml index 4889275..3aeefba 100644 --- a/rust/crates/mxaccess/Cargo.toml +++ b/rust/crates/mxaccess/Cargo.toml @@ -17,6 +17,8 @@ mxaccess-rpc = { path = "../mxaccess-rpc" } thiserror = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +futures-util = { workspace = true } +tokio-stream = { version = "0.1", features = ["sync"] } rand = "0.8" [dev-dependencies] diff --git a/rust/crates/mxaccess/src/session.rs b/rust/crates/mxaccess/src/session.rs index 8d48b9c..2a02cae 100644 --- a/rust/crates/mxaccess/src/session.rs +++ b/rust/crates/mxaccess/src/session.rs @@ -33,15 +33,21 @@ use std::sync::Arc; use std::time::SystemTime; use mxaccess_callback::{CallbackEvent, CallbackExporter, ExporterIdentities}; -use mxaccess_codec::NmxSubscriptionMessage; +use mxaccess_codec::{MxStatus, NmxSubscriptionMessage, NmxSubscriptionRecord}; use mxaccess_galaxy::{GalaxyTagMetadata, Resolver, ResolverError}; use mxaccess_nmx::{NmxClient, NmxClientError, WriteValue}; use mxaccess_rpc::guid::Guid; use mxaccess_rpc::ntlm::{NtlmClientContext, local_hostname}; use mxaccess_rpc::transport::TransportError; use std::net::SocketAddr; +use std::pin::Pin; +use std::task::{Context, Poll}; use tokio::sync::{Mutex, broadcast}; use tokio::task::JoinHandle; +use tokio_stream::wrappers::BroadcastStream; + +use crate::DataChange; +use futures_util::Stream; use crate::{ ConfigError, ConnectionError, Error, RecoveryPolicy, SecurityContext, Session, SessionOptions, @@ -58,29 +64,77 @@ use crate::{ /// either keep up or accept lag-loss. const CALLBACK_BROADCAST_CAPACITY: usize = 256; -/// Subscription lifecycle handle returned by [`Session::subscribe`]. +/// Subscription handle returned by [`Session::subscribe`]. Implements +/// `Stream>` — driving it forward +/// yields one [`DataChange`] per matching record observed on the +/// session's broadcast channel. /// /// Carries the 16-byte `correlation_id` the Rust port generated for -/// the subscription, the original `reference` string (for diagnostics), -/// and the resolved [`GalaxyTagMetadata`] (used by -/// [`Session::unsubscribe`] to issue the matching `UnAdvise`). +/// the subscription, the original `reference` string, the resolved +/// [`GalaxyTagMetadata`] (used by [`Session::unsubscribe`] to issue +/// the matching `UnAdvise`), and the underlying broadcast receiver. /// -/// Currently a pure lifecycle handle — does not yet impl -/// `Stream`. The callback router that will turn -/// this into a stream is followup F15 in `design/followups.md`. +/// ## Routing model /// -/// **Not auto-cleaning**: `Subscription` deliberately does not -/// implement `Drop` to fire `UnAdvise`. The .NET reference's -/// `tokio::spawn`-from-Drop pattern is the R15 hazard tracked at +/// The stream applies the same filter the .NET reference does at +/// `MxNativeSession.cs:333-343`: +/// +/// - `0x32` SubscriptionStatus messages: keep when +/// `message.item_correlation_id == self.correlation_id`. Other +/// subscriptions' correlation ids are filtered out. +/// - `0x33` DataUpdate messages: keep ALL. The codec exposes no +/// per-record correlation field, and the .NET reference's filter +/// only checks `item_correlation_id` (which `0x33` doesn't carry), +/// so DataUpdates fan out to every active subscription. This +/// matches the .NET behavior verbatim — proper per-subscription +/// `0x33` routing requires evidence-based design work against +/// `captures/058-frida-subscribe-testint` (tracked in F15's +/// doc note in `design/followups.md`). +/// +/// Each `NmxSubscriptionRecord` whose `value` parsed successfully +/// becomes one `DataChange`. Records with `value: None` are dropped +/// silently (mirrors the .NET `evt.Record.Value is null` filter at +/// `cs:337`). +/// +/// ## Not auto-cleaning +/// +/// `Subscription` deliberately does not implement `Drop` to fire +/// `UnAdvise`. The .NET reference's `tokio::spawn`-from-Drop pattern +/// is the R15 hazard tracked at /// `design/70-risks-and-open-questions.md`. Callers must call -/// `Session::unsubscribe(sub).await` explicitly. Followup F16 will +/// `Session::unsubscribe(sub).await` explicitly. Future work will /// add a long-lived connection task that supports best-effort /// drop-time cleanup without the spawn-from-Drop hazard. -#[derive(Debug, Clone)] +/// +/// ## Lag behavior +/// +/// The underlying broadcast channel has a fixed capacity +/// ([`CALLBACK_BROADCAST_CAPACITY`]). Slow consumers that fall behind +/// receive `Some(Err(Error::Configuration(InvalidArgument)))` whose +/// detail string contains the lag-loss count. The stream stays open +/// and resumes from the next available message — same shape as +/// `tokio_stream::wrappers::BroadcastStream`'s lag handling. pub struct Subscription { pub(crate) correlation_id: [u8; 16], pub(crate) reference: Arc, pub(crate) metadata: Arc, + /// Broadcast stream wrapper. `None` after the first call to + /// `Stream::poll_next` exhausts the channel (sender dropped). + pub(crate) inbound: Pin>>>, + /// Buffered records from the current message. Drained one-per-poll + /// before pulling the next message off `inbound`. + pub(crate) pending: std::collections::VecDeque, +} + +impl std::fmt::Debug for Subscription { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Subscription") + .field("correlation_id", &self.correlation_id) + .field("reference", &self.reference) + .field("metadata", &self.metadata.attribute_id) + .field("pending", &self.pending.len()) + .finish_non_exhaustive() + } } impl Subscription { @@ -105,6 +159,115 @@ impl Subscription { } } +impl Stream for Subscription { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + // Drain any buffered DataChanges from the previous message. + if let Some(dc) = self.pending.pop_front() { + return Poll::Ready(Some(Ok(dc))); + } + + // Pull the next broadcast message. + let next = std::task::ready!(self.inbound.as_mut().poll_next(cx)); + let msg = match next { + Some(Ok(msg)) => msg, + Some(Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(n))) => { + return Poll::Ready(Some(Err(Error::Configuration( + ConfigError::InvalidArgument { + detail: format!("subscription lagged behind broadcast by {n} messages"), + }, + )))); + } + None => return Poll::Ready(None), // sender dropped + }; + + // Apply the .NET-compatible filter and convert matching + // records to DataChange items. + let reference = self.reference.clone(); + let correlation_id = self.correlation_id; + for dc in records_to_data_changes(&msg, correlation_id, &reference) { + self.pending.push_back(dc); + } + // Loop back to drain `pending` (or pull the next message if + // none of the records survived the filter). + } + } +} + +/// Convert subscription-message records to [`DataChange`] items, applying +/// the per-`Subscription` filter (`0x32` keep when correlation matches; +/// `0x33` keep all). +/// +/// Records with `value: None` are silently dropped — mirrors the .NET +/// `evt.Record.Value is null` filter at `MxNativeSession.cs:337`. +pub(crate) fn records_to_data_changes( + msg: &NmxSubscriptionMessage, + subscription_correlation: [u8; 16], + reference: &Arc, +) -> Vec { + // Filter at message level first — see the routing-model doc on + // `Subscription` for the rationale. + if msg.command == 0x32 { + match msg.item_correlation_id { + Some(id) if id.0 == subscription_correlation => {} + _ => return Vec::new(), + } + } + // 0x33 (and any other commands the codec accepts) pass through. + + msg.records + .iter() + .filter_map(|rec| record_to_data_change(rec, reference)) + .collect() +} + +/// Build a single [`DataChange`] from one [`NmxSubscriptionRecord`]. +/// Returns `None` when `record.value` is `None` (the codec couldn't +/// decode the wire kind, or the wire kind was unknown). +fn record_to_data_change( + record: &NmxSubscriptionRecord, + reference: &Arc, +) -> Option { + let value = record.value.clone()?; + Some(DataChange { + reference: reference.clone(), + value, + quality: record.quality, + timestamp: filetime_to_system_time(record.timestamp_filetime), + // Mirrors NmxSubscriptionRecord.ToDataChangeStatus + // (`NmxSubscriptionMessage.cs:22-25`) which always returns + // `MxStatus.DataChangeOk`. Proper status derivation from the + // record's `status` / `detail_status` fields is downstream + // work — currently the .NET reference itself does not + // distinguish. + status: MxStatus::DATA_CHANGE_OK, + }) +} + +/// Convert a Windows FILETIME tick count (100-ns intervals since +/// 1601-01-01 UTC) to a `SystemTime`. Inverse of +/// [`system_time_to_filetime`]. +/// +/// Saturates at `SystemTime::UNIX_EPOCH` for FILETIMEs before the +/// Unix epoch (i.e. before 1970-01-01 UTC) — those cannot be +/// represented portably across all `SystemTime` platforms. +#[must_use] +pub fn filetime_to_system_time(filetime_ticks: i64) -> std::time::SystemTime { + const FILETIME_TO_UNIX_EPOCH_SECONDS: i64 = 11_644_473_600; + const TICKS_PER_SECOND: i64 = 10_000_000; + + let total_secs_filetime = filetime_ticks / TICKS_PER_SECOND; + let subsec_ticks = filetime_ticks % TICKS_PER_SECOND; + let unix_secs = total_secs_filetime - FILETIME_TO_UNIX_EPOCH_SECONDS; + if unix_secs < 0 { + return std::time::SystemTime::UNIX_EPOCH; + } + let subsec_nanos = (subsec_ticks * 100) as u32; + std::time::SystemTime::UNIX_EPOCH + std::time::Duration::new(unix_secs as u64, subsec_nanos) +} + /// Convert a `SystemTime` to a Windows FILETIME tick count (100-ns /// intervals since 1601-01-01 UTC). Mirrors `DateTime.ToFileTime()` /// (referenced at `NmxWriteMessage.cs:248` and used by every @@ -575,13 +738,25 @@ impl Session { ensure_hresult_ok(hr)?; drop(nmx); + // Subscribe to the broadcast BEFORE issuing AdviseSupervisory so + // updates arriving immediately after the advise aren't lost. + let inbound = Box::pin(BroadcastStream::new(self.inner.callback_tx.subscribe())); + Ok(Subscription { correlation_id, reference: Arc::::from(reference), metadata: Arc::new(metadata), + inbound, + pending: std::collections::VecDeque::new(), }) } + /// `subscribe` ordering note: subscribe to the broadcast channel + /// FIRST, then issue `AdviseSupervisory`. If we ordered the other + /// way, updates that arrive between the advise call and the + /// `broadcast::subscribe` would be dropped. + /// + /// /// Cancel a subscription. Mirrors `MxNativeSession.Unsubscribe` /// (`cs:361-381`) — calls `UnAdvise` on the underlying transport. /// @@ -1439,6 +1614,182 @@ mod tests { .expect("router task didn't exit after upstream close"); } + // ---- F15 step 2/2: Subscription as Stream ------------ + + /// Build a synthetic 0x32 SubscriptionStatus body with one record + /// carrying a recognized scalar value (Int32 wire kind). + fn build_status_with_int32_record( + item_correlation_id: [u8; 16], + operation_id: [u8; 16], + int_value: i32, + timestamp_filetime: i64, + ) -> Vec { + // Header layout: 23-byte preamble + 16-byte item_correlation_id + + // record (with detail_status: status(4) + detail_status(4) + + // quality(2) + filetime(8) + wire_kind(1) + value bytes). + // For Int32 wire kind 0x02 the value is just 4 bytes. + let header_len = 39; // preamble + item_correlation_id + let record_len = 4 + 4 + 2 + 8 + 1 + 4; // 23 bytes for Int32 record + let mut body = vec![0u8; header_len + record_len]; + body[0] = 0x32; + body[1..3].copy_from_slice(&1u16.to_le_bytes()); // version + body[3..7].copy_from_slice(&1i32.to_le_bytes()); // record_count = 1 + body[7..23].copy_from_slice(&operation_id); + body[23..39].copy_from_slice(&item_correlation_id); + + let off = header_len; + body[off..off + 4].copy_from_slice(&0i32.to_le_bytes()); // status + body[off + 4..off + 8].copy_from_slice(&0i32.to_le_bytes()); // detail_status + body[off + 8..off + 10].copy_from_slice(&0xC0u16.to_le_bytes()); // quality + body[off + 10..off + 18].copy_from_slice(×tamp_filetime.to_le_bytes()); + body[off + 18] = 0x02; // wire_kind = Int32 + body[off + 19..off + 23].copy_from_slice(&int_value.to_le_bytes()); + body + } + + #[tokio::test] + async fn subscription_stream_yields_data_change_for_matching_correlation() { + use futures_util::StreamExt; + + let (addr, handle) = unauthenticated_server(vec![(0, Vec::new())]).await; + let resolver: Arc = Arc::new(StaticResolver::new(&[( + "TestObj.TestInt", + sample_metadata(), + )])); + let session = connect_test_session(addr, resolver).await.unwrap(); + + // Subscribe via the public path so we get a Subscription with + // its broadcast::Receiver wired in. We need the correlation id + // it generated to inject a matching message. + let mut sub = session.subscribe("TestObj.TestInt").await.unwrap(); + let correlation = sub.correlation_id(); + + // Inject a 0x32 SubscriptionStatus with that correlation id. + let body = build_status_with_int32_record( + correlation, + [0xEE; 16], // operation_id (any) + 42, // value + // FILETIME for ~2024-01-01 — exact value doesn't matter. + 0x1F0E_2D60_4C00_0000, + ); + let msg = NmxSubscriptionMessage::parse_inner(&body).unwrap(); + test_inject_sender(&session).send(Arc::new(msg)).unwrap(); + + let dc = tokio::time::timeout(std::time::Duration::from_secs(1), sub.next()) + .await + .expect("stream poll timed out") + .expect("stream returned None") + .expect("stream item was Err"); + + assert_eq!(&*dc.reference, "TestObj.TestInt"); + assert_eq!(dc.value, mxaccess_codec::MxValue::Int32(42)); + assert_eq!(dc.quality, 0xC0); + // Drain the unsubscribe HRESULT. + // Server already sent (0, Vec::new()) for the advise — the unsubscribe + // would consume the next response slot which we didn't queue. So + // drop the sub without unsubscribe to keep the test simple. + drop(sub); + handle.abort(); + } + + #[tokio::test] + async fn subscription_stream_filters_out_mismatched_correlation_for_status() { + use futures_util::StreamExt; + + let (addr, handle) = unauthenticated_server(vec![(0, Vec::new())]).await; + let resolver: Arc = Arc::new(StaticResolver::new(&[( + "TestObj.TestInt", + sample_metadata(), + )])); + let session = connect_test_session(addr, resolver).await.unwrap(); + + let mut sub = session.subscribe("TestObj.TestInt").await.unwrap(); + + // Inject a 0x32 with a DIFFERENT correlation id — should be filtered. + let body = build_status_with_int32_record( + [0xAA; 16], // mismatched + [0xEE; 16], + 42, + 0x1F0E_2D60_4C00_0000, + ); + let msg = NmxSubscriptionMessage::parse_inner(&body).unwrap(); + test_inject_sender(&session).send(Arc::new(msg)).unwrap(); + + // Stream should still be pending (no DataChange delivered). + let res = tokio::time::timeout(std::time::Duration::from_millis(150), sub.next()).await; + assert!( + res.is_err(), + "expected timeout (stream pending), got {res:?}" + ); + + drop(sub); + handle.abort(); + } + + #[tokio::test] + async fn subscription_stream_keeps_data_update_regardless_of_correlation() { + use futures_util::StreamExt; + // 0x33 DataUpdate has no item_correlation_id; the .NET-style + // filter passes them through to all subscriptions. + + let (addr, handle) = unauthenticated_server(vec![(0, Vec::new())]).await; + let resolver: Arc = Arc::new(StaticResolver::new(&[( + "TestObj.TestInt", + sample_metadata(), + )])); + let session = connect_test_session(addr, resolver).await.unwrap(); + let mut sub = session.subscribe("TestObj.TestInt").await.unwrap(); + + // Build a minimal valid 0x33 DataUpdate body. record_count=1 + // because parse_data_update always reads exactly one record + // regardless of the field; record carries an Int32. + let header_len = 23; // preamble (no correlation id for 0x33) + let record_len = 4 + 2 + 8 + 1 + 4; // status + quality + filetime + wire_kind + i32 + let mut body = vec![0u8; header_len + record_len]; + body[0] = 0x33; + body[1..3].copy_from_slice(&1u16.to_le_bytes()); + body[3..7].copy_from_slice(&1i32.to_le_bytes()); + body[7..23].copy_from_slice(&[0xEEu8; 16]); // operation_id + + let off = header_len; + body[off..off + 4].copy_from_slice(&0i32.to_le_bytes()); // status + body[off + 4..off + 6].copy_from_slice(&0xC0u16.to_le_bytes()); // quality + body[off + 6..off + 14].copy_from_slice(&0x1F0E_2D60_4C00_0000i64.to_le_bytes()); + body[off + 14] = 0x02; // wire_kind = Int32 + body[off + 15..off + 19].copy_from_slice(&7i32.to_le_bytes()); + + let msg = NmxSubscriptionMessage::parse_inner(&body).unwrap(); + test_inject_sender(&session).send(Arc::new(msg)).unwrap(); + + let dc = tokio::time::timeout(std::time::Duration::from_secs(1), sub.next()) + .await + .expect("stream poll timed out") + .expect("stream returned None") + .expect("stream item was Err"); + assert_eq!(dc.value, mxaccess_codec::MxValue::Int32(7)); + + drop(sub); + handle.abort(); + } + + #[test] + fn filetime_to_system_time_round_trip() { + // Build a SystemTime, convert to FILETIME, convert back. + // Sub-second precision is preserved at 100ns granularity. + let t = std::time::SystemTime::UNIX_EPOCH + std::time::Duration::new(1234, 5_000_000); // .005s + let ft = system_time_to_filetime(t).unwrap(); + let t2 = filetime_to_system_time(ft); + assert_eq!(t, t2); + } + + #[test] + fn filetime_to_system_time_pre_unix_epoch_saturates() { + // FILETIME for year 1900 is well before Unix epoch. + let pre_epoch_ft: i64 = 0; // FILETIME 0 = year 1601 + let t = filetime_to_system_time(pre_epoch_ft); + assert_eq!(t, std::time::SystemTime::UNIX_EPOCH); + } + #[tokio::test] async fn router_silently_drops_non_callback_events() { // Bind / Auth3Ignored / ProtocolError / etc. should be ignored