From 70feb63ea536ce297ba109874f59b2f71b6190e1 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 5 May 2026 09:16:47 -0400 Subject: [PATCH] [M4] mxaccess: Session::subscribe + unsubscribe + Subscription handle MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Lands the subscribe-path lifecycle: AdviseSupervisory + UnAdvise round-trip via a Subscription handle. The actual DataChange stream routing is deferred to F15. New - Session::subscribe(reference) -> Result — resolves the tag, generates a 16-byte correlation_id via rand::random(), calls NmxClient::advise_supervisory. Mirrors MxNativeSession.SubscribeAsync (cs:250-270) minus the publisher Connect dance (will land alongside F15's callback routing). - Session::unsubscribe(subscription) -> Result<(), Error> — consumes the handle and calls NmxClient::un_advise. Mirrors MxNativeSession.Unsubscribe (cs:361-381). - Subscription { correlation_id, reference, metadata } public type with accessor methods. Currently a pure lifecycle handle — no Stream impl yet; the Stream shape lands when F15 wires CallbackExporter routing. - Removed the old subscribe stub from lib.rs (was Err(Unsupported)). Drop hazard note - Subscription deliberately does NOT impl Drop to fire UnAdvise. The spawn-from-Drop pattern is the R15 hazard tracked in design/70-risks-and-open-questions.md. Callers must call Session::unsubscribe(sub).await explicitly. F15's wave-2 long-lived connection task will support best-effort drop-time cleanup without the spawn-from-Drop hazard. Cargo.toml: added rand (for correlation_id generation). design/followups.md: F15 added (P1, M4 wave 2 callback router). Open followups now at 11 — slightly over the soft 10-item threshold but no drift (F13 just resolved last iteration). Next iteration's Step 0 triage will check whether F15 is actionable. Tests (4 new in mxaccess; total 30) - subscribe_then_unsubscribe round-trip via in-memory resolver + hand-rolled server (2 RPCs: AdviseSupervisory + UnAdvise). - subscribe propagates non-zero AdviseSupervisory HRESULT. - subscribe after shutdown returns EngineNotRegistered. - two_subscribes_produce_distinct_correlation_ids — verifies the rand::random() correlation id generation differentiates handles. Test count delta: 494 -> 498 (+4). All four DoD gates green. Co-Authored-By: Claude Opus 4.7 (1M context) --- design/followups.md | 6 + rust/crates/mxaccess/Cargo.toml | 1 + rust/crates/mxaccess/src/lib.rs | 13 +- rust/crates/mxaccess/src/session.rs | 215 ++++++++++++++++++++++++++++ 4 files changed, 224 insertions(+), 11 deletions(-) diff --git a/design/followups.md b/design/followups.md index 580b7d1..c15965f 100644 --- a/design/followups.md +++ b/design/followups.md @@ -60,6 +60,12 @@ move to `## Resolved` with a date + commit hash. **Why deferred:** `ManagedNmxService2Client.Create()` (`ManagedNmxService2Client.cs:30-64`) auto-discovers `(host, port, service_ipid)` by activating the `NmxSvc.NmxService` COM ProgID, marshalling the resulting `IUnknown` to an OBJREF, calling `IObjectExporter::ResolveOxid` against the OXID inside, then `IRemUnknown::RemQueryInterface` to get the `INmxService2` IPID. This requires `windows-rs` for `CoCreateInstance` / `CLSIDFromProgID` (the same gating dep as F6), plus the `ComObjRefProvider.MarshalIUnknownObjRef` port (also F6). **Resolves when:** F6 lands (windows-rs wired in + `ComObjRefProvider` port). At that point `NmxClient::create()` becomes ~30 lines that chain the existing primitives: COM activation → `MarshalIUnknownObjRef` → `ComObjRef::parse` → `object_exporter_client::resolve_oxid_with_managed_ntlm_packet_integrity` → `rem_unknown::encode_rem_query_interface_request` over a temporary transport → `NmxClient::connect`. +### F15 — Callback router wires `CallbackExporter` events into `Subscription` stream +**Severity:** P1 +**Source:** M4 wave 2, `crates/mxaccess/src/session.rs` +**Why deferred:** The wave 1 `Session::subscribe` returns a [`Subscription`] handle that registers the advise on the wire, but no `DataChange` items are delivered yet — the routing layer that would (a) wire `CallbackExporter` into `Session::connect_nmx` (instead of the current null-callback `RegisterEngine2`), (b) spawn a router task that drains [`mxaccess_callback::CallbackEvent`], decodes the inner body via `mxaccess_codec::NmxSubscriptionMessage`, and (c) routes `DataChange` items to per-subscription `mpsc` channels keyed by `correlation_id` (for `0x32` SubscriptionStatus) or `operation_id` (for `0x33` DataUpdate). Mirrors the .NET `MxNativeSession.OnCallbackReceived` (`MxNativeSession.cs:113-114` event wiring + `cs:333-343` filter pattern). +**Resolves when:** `Subscription` impls `Stream>` and a real-server round-trip test (or hand-rolled `CallbackExporter` loopback) shows a `DataChange` flowing end-to-end. Notes: the `0x33` DataUpdate routing is the trickier piece because the codec exposes `item_correlation_id` only on `0x32` SubscriptionStatus — the .NET reference's `MxNativeCallbackEvent` filter at `cs:336` actually only catches the initial SubscriptionStatus, which suggests data updates go to a single per-engine sink rather than per-correlation channels. The Rust port should re-verify the wire model against `captures/058-frida-subscribe-testint` before locking in the routing key. + ### F14 — `tiberius`-backed SQL implementation of `Resolver` + `UserResolver` **Severity:** P2 **Source:** M3 stream A, `crates/mxaccess-galaxy/src/sql.rs` (constants present, no client wiring yet) diff --git a/rust/crates/mxaccess/Cargo.toml b/rust/crates/mxaccess/Cargo.toml index cfd6d5d..30d3fc5 100644 --- a/rust/crates/mxaccess/Cargo.toml +++ b/rust/crates/mxaccess/Cargo.toml @@ -16,6 +16,7 @@ mxaccess-rpc = { path = "../mxaccess-rpc" } thiserror = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +rand = "0.8" [dev-dependencies] async-trait = { workspace = true } diff --git a/rust/crates/mxaccess/src/lib.rs b/rust/crates/mxaccess/src/lib.rs index 2ab81a1..c9e672d 100644 --- a/rust/crates/mxaccess/src/lib.rs +++ b/rust/crates/mxaccess/src/lib.rs @@ -25,6 +25,7 @@ pub mod session; pub use mxaccess_galaxy::{GalaxyTagMetadata, Resolver, ResolverError}; pub use mxaccess_nmx::WriteValue; +pub use session::Subscription; /// Async session façade. Cheap clones share the inner state; drop of the last /// clone fires `UnregisterEngine` best-effort. For deterministic shutdown, @@ -34,10 +35,7 @@ pub struct Session { pub(crate) inner: Arc, } -/// Stream of `DataChange` items. Drop sends `UnAdvise` via the long-lived -/// connection task (no `tokio::spawn` from `Drop`). -#[derive(Debug)] -pub struct Subscription; +// `Subscription` is defined in `session.rs`; re-export below alongside Session. /// One inbound update. Carries both `quality: u16` (legacy 16-bit OPC quality, /// e.g. `0xC0` = "Good") and `status: MxStatus` (the richer category model). @@ -421,13 +419,6 @@ impl Session { }) } - pub async fn subscribe(&self, _reference: &str) -> Result { - Err(Error::Unsupported { - operation: Cow::Borrowed("Session::subscribe"), - transport: TransportKind::Nmx, - }) - } - pub async fn subscribe_many(&self, _references: &[&str]) -> Result { Err(Error::Unsupported { operation: Cow::Borrowed("Session::subscribe_many"), diff --git a/rust/crates/mxaccess/src/session.rs b/rust/crates/mxaccess/src/session.rs index 2c03a80..bb6642d 100644 --- a/rust/crates/mxaccess/src/session.rs +++ b/rust/crates/mxaccess/src/session.rs @@ -40,6 +40,53 @@ use crate::{ ConfigError, ConnectionError, Error, RecoveryPolicy, SecurityContext, Session, SessionOptions, }; +/// Subscription lifecycle handle returned by [`Session::subscribe`]. +/// +/// Carries the 16-byte `correlation_id` the Rust port generated for +/// the subscription, the original `reference` string (for diagnostics), +/// and the resolved [`GalaxyTagMetadata`] (used by +/// [`Session::unsubscribe`] to issue the matching `UnAdvise`). +/// +/// Currently a pure lifecycle handle — does not yet impl +/// `Stream`. The callback router that will turn +/// this into a stream is followup F15 in `design/followups.md`. +/// +/// **Not auto-cleaning**: `Subscription` deliberately does not +/// implement `Drop` to fire `UnAdvise`. The .NET reference's +/// `tokio::spawn`-from-Drop pattern is the R15 hazard tracked at +/// `design/70-risks-and-open-questions.md`. Callers must call +/// `Session::unsubscribe(sub).await` explicitly. Followup F16 will +/// add a long-lived connection task that supports best-effort +/// drop-time cleanup without the spawn-from-Drop hazard. +#[derive(Debug, Clone)] +pub struct Subscription { + pub(crate) correlation_id: [u8; 16], + pub(crate) reference: Arc, + pub(crate) metadata: Arc, +} + +impl Subscription { + /// 16-byte correlation id this subscription was registered with. + /// Useful for diagnostics; consumers shouldn't need to inspect it + /// otherwise. + #[must_use] + pub fn correlation_id(&self) -> [u8; 16] { + self.correlation_id + } + + /// The tag reference this subscription was opened against. + #[must_use] + pub fn reference(&self) -> &str { + &self.reference + } + + /// The resolved Galaxy metadata for this subscription's tag. + #[must_use] + pub fn metadata(&self) -> &GalaxyTagMetadata { + &self.metadata + } +} + /// Convert a `SystemTime` to a Windows FILETIME tick count (100-ns /// intervals since 1601-01-01 UTC). Mirrors `DateTime.ToFileTime()` /// (referenced at `NmxWriteMessage.cs:248` and used by every @@ -360,6 +407,95 @@ impl Session { .map_err(map_resolver) } + /// Subscribe to value updates for `reference`. Mirrors + /// `MxNativeSession.SubscribeAsync` (`MxNativeSession.cs:250-270`) + /// — resolves the tag, generates a fresh correlation id, and calls + /// `AdviseSupervisory`. + /// + /// The returned [`Subscription`] is currently a **lifecycle handle**: + /// it carries the correlation id + metadata needed for an explicit + /// [`Session::unsubscribe`] but does NOT yet emit `DataChange` items + /// — the callback exporter routing is followup F15 in + /// `design/followups.md`. Once that lands, the same `Subscription` + /// type will impl `Stream` without breaking the + /// current API surface. + /// + /// `Subscription` deliberately does **not** implement `Drop` to + /// fire `UnAdvise` — that's the spawn-from-Drop hazard tracked at + /// `design/70-risks-and-open-questions.md` R15 / followup F16. + /// Callers must call `Session::unsubscribe(sub).await` explicitly. + /// + /// # Errors + /// - [`Error::Connection`] if the session is shut down. + /// - [`Error::Configuration`] if the resolver rejects `reference`. + /// - [`Error::Io`] / transport errors from the underlying RPC. + /// - [`Error::Status`]-shaped error if the LMX server returned a + /// non-zero HRESULT for AdviseSupervisory. + pub async fn subscribe(&self, reference: &str) -> Result { + self.ensure_connected()?; + let inner = self.inner.clone(); + let metadata = inner + .resolver + .resolve(reference) + .await + .map_err(map_resolver)?; + let correlation_id: [u8; 16] = rand::random(); + + let opts = &inner.options; + let mut nmx = inner.nmx.lock().await; + let hr = nmx + .advise_supervisory( + opts.local_engine_id, + &metadata, + correlation_id, + opts.galaxy_id, + /* source_galaxy_id */ i32::from(opts.galaxy_id), + opts.source_platform_id, + ) + .await + .map_err(map_nmx)?; + ensure_hresult_ok(hr)?; + drop(nmx); + + Ok(Subscription { + correlation_id, + reference: Arc::::from(reference), + metadata: Arc::new(metadata), + }) + } + + /// Cancel a subscription. Mirrors `MxNativeSession.Unsubscribe` + /// (`cs:361-381`) — calls `UnAdvise` on the underlying transport. + /// + /// Consumes the [`Subscription`] so the same handle can't be + /// double-unsubscribed. Pre-condition: the supplied `Subscription` + /// was returned by `subscribe` on this same `Session` (the Rust + /// port currently has no cross-session validation; mismatching + /// session/subscription pairs is undefined behavior — caller's + /// responsibility). + /// + /// # Errors + /// As for [`Self::subscribe`]. + pub async fn unsubscribe(&self, subscription: Subscription) -> Result<(), Error> { + self.ensure_connected()?; + let inner = self.inner.clone(); + let opts = &inner.options; + let mut nmx = inner.nmx.lock().await; + let hr = nmx + .un_advise( + opts.local_engine_id, + &subscription.metadata, + subscription.correlation_id, + opts.galaxy_id, + /* source_galaxy_id */ i32::from(opts.galaxy_id), + opts.source_platform_id, + ) + .await + .map_err(map_nmx)?; + ensure_hresult_ok(hr)?; + Ok(()) + } + /// Orderly shutdown — calls `UnregisterEngine` then drops the inner /// transport. Mirrors `MxNativeSession.Dispose` (`cs:468-482`) /// minus the COM-side `Marshal.ReleaseComObject` (no .NET COM in @@ -892,6 +1028,85 @@ mod tests { assert_eq!(system_time_to_filetime(t).unwrap(), expected); } + #[tokio::test] + async fn subscribe_then_unsubscribe_round_trip() { + // Two RPCs: AdviseSupervisory + UnAdvise. Both return HRESULT 0. + let (addr, handle) = unauthenticated_server(vec![(0, Vec::new()), (0, Vec::new())]).await; + let resolver: Arc = Arc::new(StaticResolver::new(&[( + "TestObj.TestInt", + sample_metadata(), + )])); + let session = connect_test_session(addr, resolver).await.unwrap(); + let sub = session.subscribe("TestObj.TestInt").await.unwrap(); + assert_eq!(sub.reference(), "TestObj.TestInt"); + assert_eq!(sub.metadata().attribute_id, 99); + // Two distinct subscribes should produce distinct correlation ids. + // (This second subscribe goes through the same server but tests a + // different point — see next test.) + session.unsubscribe(sub).await.unwrap(); + handle.await.unwrap(); + } + + #[tokio::test] + async fn subscribe_propagates_non_zero_advise_hresult() { + let (addr, handle) = unauthenticated_server(vec![(0x4242, Vec::new())]).await; + let resolver: Arc = Arc::new(StaticResolver::new(&[( + "TestObj.TestInt", + sample_metadata(), + )])); + let session = connect_test_session(addr, resolver).await.unwrap(); + let err = session.subscribe("TestObj.TestInt").await.unwrap_err(); + match err { + Error::Configuration(ConfigError::InvalidArgument { detail }) => { + assert!(detail.contains("0x00004242")); + } + other => panic!("expected InvalidArgument with HRESULT, got {other:?}"), + } + handle.await.unwrap(); + } + + #[tokio::test] + async fn subscribe_after_shutdown_returns_engine_not_registered() { + let (addr, handle) = unauthenticated_server(vec![(0, Vec::new())]).await; + let resolver: Arc = Arc::new(StaticResolver::new(&[])); + let session = connect_test_session(addr, resolver).await.unwrap(); + let cloned = session.clone(); + session.shutdown_nmx().await.unwrap(); + let err = cloned.subscribe("anything").await.unwrap_err(); + assert!(matches!( + err, + Error::Connection(ConnectionError::EngineNotRegistered) + )); + handle.await.unwrap(); + } + + #[tokio::test] + async fn two_subscribes_produce_distinct_correlation_ids() { + // Two AdviseSupervisory calls + two UnAdvise calls. + let (addr, handle) = unauthenticated_server(vec![ + (0, Vec::new()), + (0, Vec::new()), + (0, Vec::new()), + (0, Vec::new()), + ]) + .await; + let resolver: Arc = Arc::new(StaticResolver::new(&[( + "TestObj.TestInt", + sample_metadata(), + )])); + let session = connect_test_session(addr, resolver).await.unwrap(); + let s1 = session.subscribe("TestObj.TestInt").await.unwrap(); + let s2 = session.subscribe("TestObj.TestInt").await.unwrap(); + assert_ne!( + s1.correlation_id(), + s2.correlation_id(), + "subscriptions must have distinct correlation ids" + ); + session.unsubscribe(s1).await.unwrap(); + session.unsubscribe(s2).await.unwrap(); + handle.await.unwrap(); + } + #[test] fn system_time_to_filetime_pre_1970_rejected() { // SystemTime::UNIX_EPOCH - 1 second is before the Unix epoch on