[M4] mxaccess: Session::subscribe + unsubscribe + Subscription handle

Lands the subscribe-path lifecycle: AdviseSupervisory + UnAdvise
round-trip via a Subscription handle. The actual DataChange stream
routing is deferred to F15.

New
- Session::subscribe(reference) -> Result<Subscription, Error> —
  resolves the tag, generates a 16-byte correlation_id via
  rand::random(), calls NmxClient::advise_supervisory. Mirrors
  MxNativeSession.SubscribeAsync (cs:250-270) minus the publisher
  Connect dance (will land alongside F15's callback routing).
- Session::unsubscribe(subscription) -> Result<(), Error> — consumes
  the handle and calls NmxClient::un_advise. Mirrors
  MxNativeSession.Unsubscribe (cs:361-381).
- Subscription { correlation_id, reference, metadata } public type
  with accessor methods. Currently a pure lifecycle handle — no
  Stream impl yet; the Stream<Item=DataChange> shape lands when F15
  wires CallbackExporter routing.
- Removed the old subscribe stub from lib.rs (was Err(Unsupported)).

Drop hazard note
- Subscription deliberately does NOT impl Drop to fire UnAdvise. The
  spawn-from-Drop pattern is the R15 hazard tracked in
  design/70-risks-and-open-questions.md. Callers must call
  Session::unsubscribe(sub).await explicitly. F15's wave-2 long-lived
  connection task will support best-effort drop-time cleanup without
  the spawn-from-Drop hazard.

Cargo.toml: added rand (for correlation_id generation).

design/followups.md: F15 added (P1, M4 wave 2 callback router).
Open followups now at 11 — slightly over the soft 10-item threshold
but no drift (F13 just resolved last iteration). Next iteration's
Step 0 triage will check whether F15 is actionable.

Tests (4 new in mxaccess; total 30)
- subscribe_then_unsubscribe round-trip via in-memory resolver +
  hand-rolled server (2 RPCs: AdviseSupervisory + UnAdvise).
- subscribe propagates non-zero AdviseSupervisory HRESULT.
- subscribe after shutdown returns EngineNotRegistered.
- two_subscribes_produce_distinct_correlation_ids — verifies the
  rand::random() correlation id generation differentiates handles.

Test count delta: 494 -> 498 (+4). All four DoD gates green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-05 09:16:47 -04:00
parent bf95995573
commit 70feb63ea5
4 changed files with 224 additions and 11 deletions
+6
View File
@@ -60,6 +60,12 @@ move to `## Resolved` with a date + commit hash.
**Why deferred:** `ManagedNmxService2Client.Create()` (`ManagedNmxService2Client.cs:30-64`) auto-discovers `(host, port, service_ipid)` by activating the `NmxSvc.NmxService` COM ProgID, marshalling the resulting `IUnknown` to an OBJREF, calling `IObjectExporter::ResolveOxid` against the OXID inside, then `IRemUnknown::RemQueryInterface` to get the `INmxService2` IPID. This requires `windows-rs` for `CoCreateInstance` / `CLSIDFromProgID` (the same gating dep as F6), plus the `ComObjRefProvider.MarshalIUnknownObjRef` port (also F6).
**Resolves when:** F6 lands (windows-rs wired in + `ComObjRefProvider` port). At that point `NmxClient::create()` becomes ~30 lines that chain the existing primitives: COM activation → `MarshalIUnknownObjRef``ComObjRef::parse``object_exporter_client::resolve_oxid_with_managed_ntlm_packet_integrity``rem_unknown::encode_rem_query_interface_request` over a temporary transport → `NmxClient::connect`.
### F15 — Callback router wires `CallbackExporter` events into `Subscription` stream
**Severity:** P1
**Source:** M4 wave 2, `crates/mxaccess/src/session.rs`
**Why deferred:** The wave 1 `Session::subscribe` returns a [`Subscription`] handle that registers the advise on the wire, but no `DataChange` items are delivered yet — the routing layer that would (a) wire `CallbackExporter` into `Session::connect_nmx` (instead of the current null-callback `RegisterEngine2`), (b) spawn a router task that drains [`mxaccess_callback::CallbackEvent`], decodes the inner body via `mxaccess_codec::NmxSubscriptionMessage`, and (c) routes `DataChange` items to per-subscription `mpsc` channels keyed by `correlation_id` (for `0x32` SubscriptionStatus) or `operation_id` (for `0x33` DataUpdate). Mirrors the .NET `MxNativeSession.OnCallbackReceived` (`MxNativeSession.cs:113-114` event wiring + `cs:333-343` filter pattern).
**Resolves when:** `Subscription` impls `Stream<Item = Result<DataChange, Error>>` and a real-server round-trip test (or hand-rolled `CallbackExporter` loopback) shows a `DataChange` flowing end-to-end. Notes: the `0x33` DataUpdate routing is the trickier piece because the codec exposes `item_correlation_id` only on `0x32` SubscriptionStatus — the .NET reference's `MxNativeCallbackEvent` filter at `cs:336` actually only catches the initial SubscriptionStatus, which suggests data updates go to a single per-engine sink rather than per-correlation channels. The Rust port should re-verify the wire model against `captures/058-frida-subscribe-testint` before locking in the routing key.
### F14 — `tiberius`-backed SQL implementation of `Resolver` + `UserResolver`
**Severity:** P2
**Source:** M3 stream A, `crates/mxaccess-galaxy/src/sql.rs` (constants present, no client wiring yet)
+1
View File
@@ -16,6 +16,7 @@ mxaccess-rpc = { path = "../mxaccess-rpc" }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
rand = "0.8"
[dev-dependencies]
async-trait = { workspace = true }
+2 -11
View File
@@ -25,6 +25,7 @@ pub mod session;
pub use mxaccess_galaxy::{GalaxyTagMetadata, Resolver, ResolverError};
pub use mxaccess_nmx::WriteValue;
pub use session::Subscription;
/// Async session façade. Cheap clones share the inner state; drop of the last
/// clone fires `UnregisterEngine` best-effort. For deterministic shutdown,
@@ -34,10 +35,7 @@ pub struct Session {
pub(crate) inner: Arc<session::SessionInner>,
}
/// Stream of `DataChange` items. Drop sends `UnAdvise` via the long-lived
/// connection task (no `tokio::spawn` from `Drop`).
#[derive(Debug)]
pub struct Subscription;
// `Subscription` is defined in `session.rs`; re-export below alongside Session.
/// One inbound update. Carries both `quality: u16` (legacy 16-bit OPC quality,
/// e.g. `0xC0` = "Good") and `status: MxStatus` (the richer category model).
@@ -421,13 +419,6 @@ impl Session {
})
}
pub async fn subscribe(&self, _reference: &str) -> Result<Subscription, Error> {
Err(Error::Unsupported {
operation: Cow::Borrowed("Session::subscribe"),
transport: TransportKind::Nmx,
})
}
pub async fn subscribe_many(&self, _references: &[&str]) -> Result<Subscription, Error> {
Err(Error::Unsupported {
operation: Cow::Borrowed("Session::subscribe_many"),
+215
View File
@@ -40,6 +40,53 @@ use crate::{
ConfigError, ConnectionError, Error, RecoveryPolicy, SecurityContext, Session, SessionOptions,
};
/// Subscription lifecycle handle returned by [`Session::subscribe`].
///
/// Carries the 16-byte `correlation_id` the Rust port generated for
/// the subscription, the original `reference` string (for diagnostics),
/// and the resolved [`GalaxyTagMetadata`] (used by
/// [`Session::unsubscribe`] to issue the matching `UnAdvise`).
///
/// Currently a pure lifecycle handle — does not yet impl
/// `Stream<Item = DataChange>`. The callback router that will turn
/// this into a stream is followup F15 in `design/followups.md`.
///
/// **Not auto-cleaning**: `Subscription` deliberately does not
/// implement `Drop` to fire `UnAdvise`. The .NET reference's
/// `tokio::spawn`-from-Drop pattern is the R15 hazard tracked at
/// `design/70-risks-and-open-questions.md`. Callers must call
/// `Session::unsubscribe(sub).await` explicitly. Followup F16 will
/// add a long-lived connection task that supports best-effort
/// drop-time cleanup without the spawn-from-Drop hazard.
#[derive(Debug, Clone)]
pub struct Subscription {
pub(crate) correlation_id: [u8; 16],
pub(crate) reference: Arc<str>,
pub(crate) metadata: Arc<GalaxyTagMetadata>,
}
impl Subscription {
/// 16-byte correlation id this subscription was registered with.
/// Useful for diagnostics; consumers shouldn't need to inspect it
/// otherwise.
#[must_use]
pub fn correlation_id(&self) -> [u8; 16] {
self.correlation_id
}
/// The tag reference this subscription was opened against.
#[must_use]
pub fn reference(&self) -> &str {
&self.reference
}
/// The resolved Galaxy metadata for this subscription's tag.
#[must_use]
pub fn metadata(&self) -> &GalaxyTagMetadata {
&self.metadata
}
}
/// Convert a `SystemTime` to a Windows FILETIME tick count (100-ns
/// intervals since 1601-01-01 UTC). Mirrors `DateTime.ToFileTime()`
/// (referenced at `NmxWriteMessage.cs:248` and used by every
@@ -360,6 +407,95 @@ impl Session {
.map_err(map_resolver)
}
/// Subscribe to value updates for `reference`. Mirrors
/// `MxNativeSession.SubscribeAsync` (`MxNativeSession.cs:250-270`)
/// — resolves the tag, generates a fresh correlation id, and calls
/// `AdviseSupervisory`.
///
/// The returned [`Subscription`] is currently a **lifecycle handle**:
/// it carries the correlation id + metadata needed for an explicit
/// [`Session::unsubscribe`] but does NOT yet emit `DataChange` items
/// — the callback exporter routing is followup F15 in
/// `design/followups.md`. Once that lands, the same `Subscription`
/// type will impl `Stream<Item = DataChange>` without breaking the
/// current API surface.
///
/// `Subscription` deliberately does **not** implement `Drop` to
/// fire `UnAdvise` — that's the spawn-from-Drop hazard tracked at
/// `design/70-risks-and-open-questions.md` R15 / followup F16.
/// Callers must call `Session::unsubscribe(sub).await` explicitly.
///
/// # Errors
/// - [`Error::Connection`] if the session is shut down.
/// - [`Error::Configuration`] if the resolver rejects `reference`.
/// - [`Error::Io`] / transport errors from the underlying RPC.
/// - [`Error::Status`]-shaped error if the LMX server returned a
/// non-zero HRESULT for AdviseSupervisory.
pub async fn subscribe(&self, reference: &str) -> Result<Subscription, Error> {
self.ensure_connected()?;
let inner = self.inner.clone();
let metadata = inner
.resolver
.resolve(reference)
.await
.map_err(map_resolver)?;
let correlation_id: [u8; 16] = rand::random();
let opts = &inner.options;
let mut nmx = inner.nmx.lock().await;
let hr = nmx
.advise_supervisory(
opts.local_engine_id,
&metadata,
correlation_id,
opts.galaxy_id,
/* source_galaxy_id */ i32::from(opts.galaxy_id),
opts.source_platform_id,
)
.await
.map_err(map_nmx)?;
ensure_hresult_ok(hr)?;
drop(nmx);
Ok(Subscription {
correlation_id,
reference: Arc::<str>::from(reference),
metadata: Arc::new(metadata),
})
}
/// Cancel a subscription. Mirrors `MxNativeSession.Unsubscribe`
/// (`cs:361-381`) — calls `UnAdvise` on the underlying transport.
///
/// Consumes the [`Subscription`] so the same handle can't be
/// double-unsubscribed. Pre-condition: the supplied `Subscription`
/// was returned by `subscribe` on this same `Session` (the Rust
/// port currently has no cross-session validation; mismatching
/// session/subscription pairs is undefined behavior — caller's
/// responsibility).
///
/// # Errors
/// As for [`Self::subscribe`].
pub async fn unsubscribe(&self, subscription: Subscription) -> Result<(), Error> {
self.ensure_connected()?;
let inner = self.inner.clone();
let opts = &inner.options;
let mut nmx = inner.nmx.lock().await;
let hr = nmx
.un_advise(
opts.local_engine_id,
&subscription.metadata,
subscription.correlation_id,
opts.galaxy_id,
/* source_galaxy_id */ i32::from(opts.galaxy_id),
opts.source_platform_id,
)
.await
.map_err(map_nmx)?;
ensure_hresult_ok(hr)?;
Ok(())
}
/// Orderly shutdown — calls `UnregisterEngine` then drops the inner
/// transport. Mirrors `MxNativeSession.Dispose` (`cs:468-482`)
/// minus the COM-side `Marshal.ReleaseComObject` (no .NET COM in
@@ -892,6 +1028,85 @@ mod tests {
assert_eq!(system_time_to_filetime(t).unwrap(), expected);
}
#[tokio::test]
async fn subscribe_then_unsubscribe_round_trip() {
// Two RPCs: AdviseSupervisory + UnAdvise. Both return HRESULT 0.
let (addr, handle) = unauthenticated_server(vec![(0, Vec::new()), (0, Vec::new())]).await;
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[(
"TestObj.TestInt",
sample_metadata(),
)]));
let session = connect_test_session(addr, resolver).await.unwrap();
let sub = session.subscribe("TestObj.TestInt").await.unwrap();
assert_eq!(sub.reference(), "TestObj.TestInt");
assert_eq!(sub.metadata().attribute_id, 99);
// Two distinct subscribes should produce distinct correlation ids.
// (This second subscribe goes through the same server but tests a
// different point — see next test.)
session.unsubscribe(sub).await.unwrap();
handle.await.unwrap();
}
#[tokio::test]
async fn subscribe_propagates_non_zero_advise_hresult() {
let (addr, handle) = unauthenticated_server(vec![(0x4242, Vec::new())]).await;
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[(
"TestObj.TestInt",
sample_metadata(),
)]));
let session = connect_test_session(addr, resolver).await.unwrap();
let err = session.subscribe("TestObj.TestInt").await.unwrap_err();
match err {
Error::Configuration(ConfigError::InvalidArgument { detail }) => {
assert!(detail.contains("0x00004242"));
}
other => panic!("expected InvalidArgument with HRESULT, got {other:?}"),
}
handle.await.unwrap();
}
#[tokio::test]
async fn subscribe_after_shutdown_returns_engine_not_registered() {
let (addr, handle) = unauthenticated_server(vec![(0, Vec::new())]).await;
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[]));
let session = connect_test_session(addr, resolver).await.unwrap();
let cloned = session.clone();
session.shutdown_nmx().await.unwrap();
let err = cloned.subscribe("anything").await.unwrap_err();
assert!(matches!(
err,
Error::Connection(ConnectionError::EngineNotRegistered)
));
handle.await.unwrap();
}
#[tokio::test]
async fn two_subscribes_produce_distinct_correlation_ids() {
// Two AdviseSupervisory calls + two UnAdvise calls.
let (addr, handle) = unauthenticated_server(vec![
(0, Vec::new()),
(0, Vec::new()),
(0, Vec::new()),
(0, Vec::new()),
])
.await;
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[(
"TestObj.TestInt",
sample_metadata(),
)]));
let session = connect_test_session(addr, resolver).await.unwrap();
let s1 = session.subscribe("TestObj.TestInt").await.unwrap();
let s2 = session.subscribe("TestObj.TestInt").await.unwrap();
assert_ne!(
s1.correlation_id(),
s2.correlation_id(),
"subscriptions must have distinct correlation ids"
);
session.unsubscribe(s1).await.unwrap();
session.unsubscribe(s2).await.unwrap();
handle.await.unwrap();
}
#[test]
fn system_time_to_filetime_pre_1970_rejected() {
// SystemTime::UNIX_EPOCH - 1 second is before the Unix epoch on