[F36 + F40 + F44] M6 wave 1: subscribe_buffered (NMX) + metrics + evidence

Three M6 sub-followups landed in this wave (sub-agent worktrees +
manual reconciliation in main):

**F36 — Session::subscribe_buffered (NMX) per R2 single-sample**
- `BufferedOptions::rounded_update_interval_ms()` — 100ms rounding
  helper mirroring MxNativeCompatibilityServer.cs:638
  ((updateInterval + 99) / 100) * 100, saturating on overflow.
- `Session::subscribe_buffered` (public, lib.rs:604) delegates to
  the new private `subscribe_buffered_nmx` which uses the buffered
  RegisterReference path: item_definition suffixed with
  `.property(buffer)`, subscribe=true (no separate
  AdviseSupervisory follow-up — verified against capture 082).
- Per R2 verified at wwtools/mxaccesscli/docs/api-notes.md the wire
  semantic is single-sample-per-event with a server-side cadence
  knob; rounded_ms is held client-side only (native MXAccess does
  not emit a separate SetBufferedUpdateInterval RPC, verified by
  absence in 079/082 captures).
- New crates/mxaccess/examples/subscribe-buffered.rs.
- New crates/mxaccess-codec/tests/buffered_register_reference_parity.rs:
  4 tests (capture 079/082 round-trip, suffix helper, constructive
  forward-build vs capture 082).
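
The rounding and suffix rules above can be sketched in isolation. This is a minimal illustration, not the crate's code: `round_up_to_100_ms` and `with_buffered_suffix` are hypothetical free functions standing in for `BufferedOptions::rounded_update_interval_ms` and `NmxReferenceRegistrationMessage::to_buffered_item_definition`. The real suffix helper is fallible; the stand-in is simplified to return a plain `String`.

```rust
/// Round a requested interval up to the next multiple of 100 ms,
/// saturating at `u32::MAX` instead of overflowing.
/// (Hypothetical stand-in for `rounded_update_interval_ms`.)
fn round_up_to_100_ms(requested_ms: u32) -> u32 {
    match requested_ms.checked_add(99) {
        Some(plus) => (plus / 100) * 100,
        None => u32::MAX,
    }
}

/// Append `.property(buffer)` unless the name already ends with it.
/// The comparison is ASCII case-insensitive, so re-applying the helper
/// to an already-buffered name leaves it unchanged.
/// (Hypothetical stand-in for `to_buffered_item_definition`.)
fn with_buffered_suffix(item_definition: &str) -> String {
    const SUFFIX: &str = ".property(buffer)";
    if item_definition.to_ascii_lowercase().ends_with(SUFFIX) {
        item_definition.to_string()
    } else {
        format!("{item_definition}{SUFFIX}")
    }
}
```

Under these definitions a 250 ms request is held as 300 ms, and `TestInt` is registered as `TestInt.property(buffer)`.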

**F40 — Optional metrics feature**
- New crates/mxaccess/src/metrics.rs (275 lines): `pub(crate)`
  thin wrappers (`record_write_latency`, `record_read_latency`,
  `record_write`, `record_read`, `record_advise`,
  `record_recovery_*`, `set_connected`, etc.) that compile to
  no-ops under `#[cfg(not(feature = "metrics"))]`. Call sites in
  session.rs + asb_session.rs invoke them unconditionally; the gate
  is inside the wrapper.
- `metrics = "0.24"` added to the workspace Cargo.toml; the mxaccess
  crate takes it as `metrics = { workspace = true, optional = true }`
  behind the `metrics = ["dep:metrics"]` feature.
- Default build: zero metrics dep, zero runtime cost.
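
The gate-inside-the-wrapper pattern can be sketched as follows. This is an illustrative reduction, not the contents of metrics.rs: `timed_write` is a hypothetical caller added for the example, and the feature name `metrics` is assumed to be declared in the enclosing crate's Cargo.toml.

```rust
use std::time::{Duration, Instant};

/// Record a write latency. With the `metrics` feature off, the body
/// reduces to discarding the argument, so the wrapper is a no-op.
pub fn record_write_latency(elapsed: Duration) {
    #[cfg(feature = "metrics")]
    metrics::histogram!("mxaccess.session.write.latency_seconds", "transport" => "nmx")
        .record(elapsed.as_secs_f64());
    #[cfg(not(feature = "metrics"))]
    let _ = elapsed;
}

/// Hypothetical call site: it invokes the wrapper unconditionally and
/// carries no `cfg` attribute of its own.
pub fn timed_write<T>(op: impl FnOnce() -> T) -> T {
    let started = Instant::now();
    let out = op();
    record_write_latency(started.elapsed());
    out
}
```

Keeping the `cfg` inside the wrapper means protocol code never needs feature attributes, at the cost of one empty function call that the optimizer can typically inline away.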

**F44 — Buffered batch + suspend capture decode evidence**
- New docs/M6-buffered-evidence.md: per-capture summary for
  077, 079, 080, 081, 082, 094 — call sequence, key wire bytes,
  R2/R5 verdict.
- R2 confirmed silently as "not a real risk" — single-sample
  observed across 079/080/082/094.
- R5 trigger conditions documented from capture 077: AdviseSupervisory
  + Suspend pair, 1-second intervals, succeeds on enum attributes.
- design/70-risks-and-open-questions.md R2/R5 status updated.
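
When reading the captured wire bytes, it helps to decode the embedded strings. The sketch below decodes the UTF-16LE item-definition bytes as they appear in the capture 082 `RegisterReference` body. `decode_utf16le` and the constant name are hypothetical helpers for illustration; the sketch makes no claim about the framing around the string.

```rust
/// Parse space-separated hex tokens into bytes.
fn hex_to_bytes(s: &str) -> Vec<u8> {
    s.split_whitespace()
        .map(|tok| u8::from_str_radix(tok, 16).expect("malformed hex token"))
        .collect()
}

/// Decode UTF-16LE bytes, stopping at the first NUL code unit.
fn decode_utf16le(bytes: &[u8]) -> String {
    let units: Vec<u16> = bytes
        .chunks_exact(2)
        .map(|pair| u16::from_le_bytes([pair[0], pair[1]]))
        .take_while(|&unit| unit != 0)
        .collect();
    String::from_utf16_lossy(&units)
}

/// Item-definition bytes copied from the capture 082 body,
/// including the trailing NUL terminator.
const ITEM_DEFINITION_HEX: &str = "\
    54 00 65 00 73 00 74 00 49 00 6e 00 74 00 \
    2e 00 70 00 72 00 6f 00 70 00 65 00 72 00 74 00 79 00 \
    28 00 62 00 75 00 66 00 66 00 65 00 72 00 29 00 \
    00 00";
```

Decoding `ITEM_DEFINITION_HEX` this way yields `TestInt.property(buffer)`, the suffixed name the round-trip tests assert on.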

Workspace: 759 → 792 tests, clippy clean, rustdoc -D warnings clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Joseph Doherty
2026-05-06 05:12:17 -04:00
parent d5aa152b1f
commit ad1cf2351c
11 changed files with 1428 additions and 102 deletions
+4
@@ -62,6 +62,10 @@ crypto-bigint = "0.5"
quick-xml = "0.36"
tokio-util = { version = "0.7", features = ["codec"] }
zeroize = { version = "1", features = ["zeroize_derive"] }
# F40 — optional `mxaccess` feature `metrics`. Pin to 0.24.x (current
# stable line). The dep is only pulled in when the consumer enables
# `mxaccess/metrics`; the default build resolves without it.
metrics = "0.24"
[workspace.lints.rust]
unsafe_op_in_unsafe_fn = "warn"
@@ -0,0 +1,215 @@
//! Round-trip parity: buffered-subscribe `RegisterReference` (opcode `0x10`)
//! body, captured live with Frida.
//!
//! Closes the F36 DoD bullet 6 (`design/followups.md`): "Round-trip fixture
//! loaded from `captures/079-frida-add-buffered-advise-testint/` validating
//! the wire-byte sequence (call → response)."
//!
//! The .NET reference's [`MxNativeSession.RegisterBufferedItemAsync`]
//! (`MxNativeSession.cs:272-310`) builds a single `RegisterReference` frame
//! with `item_definition` suffixed by `.property(buffer)` and
//! `subscribe = true`. The Rust counterpart is
//! [`mxaccess::Session::subscribe_buffered`], which composes
//! [`mxaccess_codec::NmxReferenceRegistrationMessage::to_buffered_item_definition`]
//! with [`mxaccess_codec::NmxReferenceRegistrationMessage::encode`].
//!
//! Both fixtures below are the **inner LMX `RegisterReference` body** copied
//! verbatim from the corresponding capture's
//! `frida-events.tsv` (the `nmx.enter ... CNmxAdapter.PutRequest` row whose
//! candidate body starts with `10 01 00 ...`):
//!
//! - `082-frida-add-buffered-plain-advise-testint`: 173-byte body for
//! `(itemDefinition = "TestInt", itemContext = "TestChildObject")` with
//! correlation id `fb df 86 dc 1f c4 34 4b bb 26 a9 97 35 e9 b7 57`.
//! - `079-frida-add-buffered-advise-testint`: 173-byte body for the same
//! `(itemDefinition, itemContext)` pair with correlation id
//! `32 c3 d9 6d ed 72 f1 48 84 85 37 0c 66 bc f8 92`. (Capture 079 is the
//! `add-buffered-advise` scenario, which exercises the same wire frame
//! under a slightly different harness mode — both captures land on the
//! same `RegisterReference` shape.)
#![allow(
clippy::unwrap_used,
clippy::expect_used,
clippy::indexing_slicing,
clippy::panic
)]
use mxaccess_codec::NmxReferenceRegistrationMessage;
/// Decode a space-separated hex string into bytes. Mirrors
/// `Convert.FromHexString` from the `.NET` test helper.
fn hex_to_bytes(s: &str) -> Vec<u8> {
s.split_whitespace()
.map(|tok| u8::from_str_radix(tok, 16).expect("malformed hex token in fixture"))
.collect()
}
/// Captured `RegisterReference` (0x10) body from
/// `captures/082-frida-add-buffered-plain-advise-testint/frida-events.tsv`,
/// line 45 (`nmx.enter ... CNmxAdapter.PutRequest`, candidate size 173).
const CAPTURE_082_BODY_HEX: &str = "\
10 01 00 \
01 00 00 00 \
fb df 86 dc 1f c4 34 4b bb 26 a9 97 35 e9 b7 57 \
ff ff \
00 00 \
01 00 00 00 \
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 \
32 00 00 81 \
54 00 65 00 73 00 74 00 49 00 6e 00 74 00 \
2e 00 70 00 72 00 6f 00 70 00 65 00 72 00 74 00 79 00 \
28 00 62 00 75 00 66 00 66 00 65 00 72 00 29 00 \
00 00 \
00 00 00 00 00 00 00 00 \
20 00 00 00 \
54 00 65 00 73 00 74 00 43 00 68 00 69 00 6c 00 64 00 \
4f 00 62 00 6a 00 65 00 63 00 74 00 \
00 00 \
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 \
01";
/// Captured `RegisterReference` (0x10) body from
/// `captures/079-frida-add-buffered-advise-testint/frida-events.tsv`,
/// line 45 (`nmx.enter ... CNmxAdapter.PutRequest`, candidate size 173).
/// Differs from `CAPTURE_082_BODY_HEX` only in the 16-byte correlation id —
/// the rest of the wire shape is identical because both captures exercise
/// the same `(itemDefinition="TestInt", itemContext="TestChildObject")` pair.
const CAPTURE_079_BODY_HEX: &str = "\
10 01 00 \
01 00 00 00 \
32 c3 d9 6d ed 72 f1 48 84 85 37 0c 66 bc f8 92 \
ff ff \
00 00 \
01 00 00 00 \
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 \
32 00 00 81 \
54 00 65 00 73 00 74 00 49 00 6e 00 74 00 \
2e 00 70 00 72 00 6f 00 70 00 65 00 72 00 74 00 79 00 \
28 00 62 00 75 00 66 00 66 00 65 00 72 00 29 00 \
00 00 \
00 00 00 00 00 00 00 00 \
20 00 00 00 \
54 00 65 00 73 00 74 00 43 00 68 00 69 00 6c 00 64 00 \
4f 00 62 00 6a 00 65 00 63 00 74 00 \
00 00 \
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 \
01";
/// Helper — assemble a `NmxReferenceRegistrationMessage` matching the
/// captured fixture and assert it encodes to the same bytes the .NET
/// reference + LMX server emit on the wire. Mirrors the .NET reference's
/// `MxNativeSession.RegisterBufferedItemAsync` request build:
///
/// ```csharp
/// var message = new NmxReferenceRegistrationMessage(
/// itemHandle,
/// subscription.CorrelationId,
/// NmxReferenceRegistrationMessage.ToBufferedItemDefinition(itemDefinition),
/// itemContext,
/// Subscribe: true);
/// ```
fn assert_roundtrip(captured_hex: &str, correlation_id: [u8; 16]) {
let captured = hex_to_bytes(captured_hex);
// Parse the captured bytes — should succeed cleanly.
let parsed = NmxReferenceRegistrationMessage::parse(&captured)
.expect("parse captured RegisterReference body");
// Sanity-check the high-level fields the F36 implementation depends on.
assert_eq!(
parsed.item_handle, 1,
"captured item_handle (LMXProxy harness uses sequential int handles starting at 1)"
);
assert_eq!(parsed.item_correlation_id, correlation_id);
assert_eq!(
parsed.item_definition, "TestInt.property(buffer)",
"buffered suffix preserved"
);
assert!(
parsed.subscribe,
"subscribe flag — buffered RegisterReference always sets it (.NET MxNativeSession.cs:298)"
);
// Re-encode and confirm byte-identical.
let re_encoded = parsed.encode();
assert_eq!(
re_encoded, captured,
"RegisterReference body must round-trip byte-identical"
);
// Also confirm the suffix helper is idempotent on an already-buffered name
// — the .NET reference does the same case-insensitive guard at
// `NmxReferenceRegistrationMessage.cs:96-102`.
let resuffixed =
NmxReferenceRegistrationMessage::to_buffered_item_definition(&parsed.item_definition)
.expect("re-applying buffered suffix");
assert_eq!(resuffixed, parsed.item_definition);
}
#[test]
fn capture_082_register_reference_round_trips() {
assert_roundtrip(
CAPTURE_082_BODY_HEX,
[
0xfb, 0xdf, 0x86, 0xdc, 0x1f, 0xc4, 0x34, 0x4b, 0xbb, 0x26, 0xa9, 0x97, 0x35, 0xe9,
0xb7, 0x57,
],
);
}
#[test]
fn capture_079_register_reference_round_trips() {
assert_roundtrip(
CAPTURE_079_BODY_HEX,
[
0x32, 0xc3, 0xd9, 0x6d, 0xed, 0x72, 0xf1, 0x48, 0x84, 0x85, 0x37, 0x0c, 0x66, 0xbc,
0xf8, 0x92,
],
);
}
#[test]
fn buffered_suffix_helper_matches_captured_definition() {
// F36 DoD bullet 1 verification: the codec helper that the Rust
// `Session::subscribe_buffered` calls must produce the exact suffix
// the captured wire bytes carry.
let suffixed = NmxReferenceRegistrationMessage::to_buffered_item_definition("TestInt").unwrap();
assert_eq!(suffixed, "TestInt.property(buffer)");
}
#[test]
fn buffered_register_reference_constructed_from_session_inputs_matches_capture_082() {
// Forward-build the message from the same inputs `Session::subscribe_buffered`
// gathers (correlation id + already-suffixed item definition + empty
// item context, with subscribe=true) and assert the encoded body
// matches the capture once we plug in the capture's specific
// `(item_context = "TestChildObject")` from the .NET probe harness.
//
// The Rust simple-form `subscribe_buffered(reference, ...)` passes
// the FULL reference as `item_definition` with empty `item_context`;
// capture 082 came from the LMXProxy compatibility surface which
// splits the reference into `(itemDefinition="TestInt", itemContext="TestChildObject")`.
// Both forms are valid on the wire — this test exercises the
// split-context form to confirm the Rust codec produces the identical
// bytes the live LMX server saw.
let captured = hex_to_bytes(CAPTURE_082_BODY_HEX);
let item_definition =
NmxReferenceRegistrationMessage::to_buffered_item_definition("TestInt").unwrap();
let msg = NmxReferenceRegistrationMessage {
item_handle: 1,
item_correlation_id: [
0xfb, 0xdf, 0x86, 0xdc, 0x1f, 0xc4, 0x34, 0x4b, 0xbb, 0x26, 0xa9, 0x97, 0x35, 0xe9,
0xb7, 0x57,
],
item_definition,
item_context: "TestChildObject".to_string(),
subscribe: true,
reserved_25_27: [0; 2],
reserved_31_55: [0; 24],
};
let encoded = msg.encode();
assert_eq!(encoded, captured);
}
+8 -1
@@ -22,6 +22,10 @@ tracing = { workspace = true }
futures-util = { workspace = true }
tokio-stream = { version = "0.1", features = ["sync"] }
rand = "0.8"
# F40 — optional `metrics` feature. Default build does NOT depend on
# this crate; enable via `--features metrics` to wire counters and
# histograms into a downstream `metrics::Recorder`.
metrics = { workspace = true, optional = true }
[dev-dependencies]
async-trait = { workspace = true }
@@ -31,7 +35,10 @@ default = []
# Transport feature gates land in M2-M5.
nmx = []
asb = []
metrics = []
# F40 — wire counters / histograms / gauges via the `metrics` crate.
# Default build is allocation-neutral: no `metrics` dep, no runtime cost.
# See `src/metrics.rs` for the emitted metric inventory.
metrics = ["dep:metrics"]
serde = ["mxaccess-codec/serde"]
# `live` gates integration tests that hit a running AVEVA install. Driven by
# the `MX_LIVE` env var via `tools/Setup-LiveProbeEnv.ps1`.
@@ -1,64 +1,170 @@
//! `subscribe-buffered` — buffered subscription demonstration (M6 placeholder).
//! `subscribe-buffered` — open a buffered subscription with a 1-second cadence.
//!
//! Per `wwtools/mxaccesscli/docs/api-notes.md:138-140`, "buffered" is a
//! delivery-cadence knob (`SetBufferedUpdateInterval`), **not** multi-sample
//! payload bundling. Each event still carries one sample; the cadence
//! controls how often the server flushes accumulated updates.
//! Per `wwtools/mxaccesscli/docs/api-notes.md:97-100,138-140,154-157` (R2 in
//! `design/70-risks-and-open-questions.md`), the `update_interval_ms` knob
//! controls the **delivery cadence** — each emitted event still carries one
//! sample, **not** a multi-sample payload. The returned [`mxaccess::Subscription`]
//! is the same `Stream<Item = Result<DataChange, Error>>` as plain
//! [`mxaccess::Session::subscribe`].
//!
//! `Session::subscribe_buffered` is currently `Err(Error::Unsupported)`
//! pending the M6 buffered-mode RPC port. Once the surface lands the
//! demo body below becomes ~10 lines using the same `Stream` interface
//! as `subscribe.rs`.
//! Drains up to 5 updates (or a 30s timeout, whichever first), prints each,
//! then unsubscribes cleanly. Mirrors the `subscribe.rs` shape — see that
//! example for the env-var contract and resolver shim design notes.
use mxaccess::{BufferedOptions, ConnectionOptions, Session};
use std::sync::Arc;
use std::time::Duration;
use futures_util::StreamExt;
use mxaccess::{
BufferedOptions, GalaxyTagMetadata, RecoveryPolicy, Resolver, ResolverError, Session,
SessionOptions,
};
use mxaccess_rpc::guid::Guid;
use mxaccess_rpc::ntlm::NtlmClientContext;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
if std::env::var_os("MX_LIVE").is_none() {
let Some(env) = LiveEnv::from_process()? else {
eprintln!(
"MX_LIVE not set — `subscribe-buffered` is the M6 placeholder; \
run `. tools/Setup-LiveProbeEnv.ps1` to populate live env vars \
once the buffered subscribe surface lands."
"MX_LIVE not set — skipping live demo. Run \
`. tools/Setup-LiveProbeEnv.ps1` to populate the required env vars."
);
return Ok(());
}
// The constructor itself is currently Unsupported (gated on M3 transport
// selection wiring). When it lands, swap to `Session::connect_nmx` like
// the other examples.
let session = match Session::connect(ConnectionOptions).await {
Ok(s) => s,
Err(mxaccess::Error::Unsupported {
operation,
transport,
}) => {
eprintln!(
"Session::connect / subscribe_buffered are deferred to M6: \
{operation} on {transport:?} transport. See \
design/followups.md for the buffered-subscribe gating note."
);
return Ok(());
}
Err(e) => return Err(e.into()),
};
let session = Session::connect_nmx(
env.addr,
SessionOptions::default(),
NtlmClientContext::from_env()?,
env.service_ipid,
Arc::new(StaticResolver::new(&env.tag)),
RecoveryPolicy::default(),
)
.await?;
let opts = BufferedOptions {
update_interval_ms: 250,
update_interval_ms: 1_000,
};
match session
.subscribe_buffered("TestChildObject.TestInt", opts)
.await
{
Ok(_) => {
eprintln!(
"subscribe_buffered returned a subscription unexpectedly — \
check whether M6 has landed and update this example."
);
eprintln!(
"buffered-subscribing to {} (requested cadence {} ms, rounded to {} ms)",
env.tag,
opts.update_interval_ms,
opts.rounded_update_interval_ms()
);
let mut sub = session.subscribe_buffered(&env.tag, opts).await?;
eprintln!("correlation_id = {:02x?}", sub.correlation_id());
let mut received = 0;
while received < 5 {
match tokio::time::timeout(Duration::from_secs(30), sub.next()).await {
Ok(Some(Ok(dc))) => {
println!("{} = {:?} ts={:?}", dc.reference, dc.value, dc.timestamp);
received += 1;
}
Ok(Some(Err(e))) => {
eprintln!("subscription error: {e}");
break;
}
Ok(None) => {
eprintln!("subscription stream ended");
break;
}
Err(_) => {
eprintln!("no update within 30s; exiting after {received} updates");
break;
}
}
Err(mxaccess::Error::Unsupported { operation, .. }) => {
eprintln!("{operation}: deferred to M6 (see design/followups.md)");
}
Err(e) => return Err(e.into()),
}
session.unsubscribe(sub).await?;
session.shutdown_nmx().await?;
Ok(())
}
// ---- shared boilerplate (see subscribe.rs / connect-write-read.rs for rationale) ----
struct LiveEnv {
addr: std::net::SocketAddr,
service_ipid: Guid,
tag: String,
}
impl LiveEnv {
fn from_process() -> Result<Option<Self>, Box<dyn std::error::Error>> {
if std::env::var_os("MX_LIVE").is_none() {
return Ok(None);
}
let host = std::env::var("MX_NMX_HOST")?;
let addr = parse_host_port(&host, 135)?;
let service_ipid = Guid::parse_str(&std::env::var("MX_NMX_SERVICE_IPID")?)?;
let tag = std::env::var("MX_TEST_TAG").unwrap_or_else(|_| "TestChildObject.TestInt".into());
Ok(Some(Self {
addr,
service_ipid,
tag,
}))
}
}
fn parse_host_port(
s: &str,
default_port: u16,
) -> Result<std::net::SocketAddr, Box<dyn std::error::Error>> {
if let Ok(addr) = s.parse() {
return Ok(addr);
}
let with_port = if s.contains(':') {
s.to_string()
} else {
format!("{s}:{default_port}")
};
Ok(
std::net::ToSocketAddrs::to_socket_addrs(&with_port.as_str())?
.next()
.ok_or("no addrs resolved")?,
)
}
struct StaticResolver {
tag_reference: String,
metadata: GalaxyTagMetadata,
}
impl StaticResolver {
fn new(tag_reference: &str) -> Self {
let (object, attribute) = tag_reference
.split_once('.')
.unwrap_or((tag_reference, "TestInt"));
Self {
tag_reference: tag_reference.to_string(),
metadata: GalaxyTagMetadata {
object_tag_name: object.to_string(),
attribute_name: attribute.to_string(),
primitive_name: None,
platform_id: 1,
engine_id: 2,
object_id: 3,
primitive_id: 0,
attribute_id: 7,
property_id: GalaxyTagMetadata::VALUE_PROPERTY_ID,
mx_data_type: 2,
is_array: false,
security_classification: 0,
attribute_source: "dynamic".into(),
},
}
}
}
#[async_trait::async_trait]
impl Resolver for StaticResolver {
async fn resolve(&self, tag: &str) -> Result<GalaxyTagMetadata, ResolverError> {
if tag == self.tag_reference {
Ok(self.metadata.clone())
} else {
Err(ResolverError::NotFound {
tag_reference: tag.to_string(),
})
}
}
}
+10 -3
@@ -64,6 +64,7 @@ use tokio::sync::{mpsc, Mutex};
use tokio::task::JoinHandle;
use tokio_stream::wrappers::ReceiverStream;
use crate::metrics as session_metrics;
use crate::transport_asb::AsbTransport;
use crate::{BufferedOptions, ConnectionError, Error, TransportKind};
@@ -184,7 +185,10 @@ impl AsbSession {
pub async fn read(&self, items: &[ItemIdentity]) -> Result<ReadResponse, Error> {
let mut transport = self.inner.transport.lock().await;
let client = transport.client_mut();
client.read(items).await.map_err(map_client_error)
let resp = client.read(items).await.map_err(map_client_error)?;
// F40 — count successful ASB reads.
session_metrics::record_asb_read();
Ok(resp)
}
/// `Write` — set the value of each item. `items.len()` should
@@ -198,10 +202,13 @@ impl AsbSession {
) -> Result<WriteResponse, Error> {
let mut transport = self.inner.transport.lock().await;
let client = transport.client_mut();
client
let resp = client
.write(items, values, write_handle)
.await
.map_err(map_client_error)
.map_err(map_client_error)?;
// F40 — count successful ASB writes.
session_metrics::record_asb_write();
Ok(resp)
}
/// `KeepAlive` — one-way heartbeat to keep the channel alive
+196 -11
@@ -30,6 +30,7 @@ pub use mxaccess_codec::{
// ---- Public types --------------------------------------------------------
pub mod asb_session;
pub(crate) mod metrics;
pub mod session;
pub mod transport_asb;
@@ -67,14 +68,60 @@ pub struct DataChange {
/// Buffered subscription delivery — single-sample-per-event with a configurable
/// flush cadence. **Not** multi-sample payload bundles per
/// `wwtools/mxaccesscli/docs/api-notes.md:138-140` (R2 verified).
///
/// Retained as a marker type for back-compat; the wire path returns the
/// same [`Subscription`] handle as plain [`Session::subscribe`] because
/// the buffered cadence is a server-side delivery rate knob, not a
/// payload-shape change. Each event still carries one
/// [`DataChange`] sample.
#[derive(Debug, Clone)]
pub struct BufferedSubscription;
/// Configuration for [`Session::subscribe_buffered`].
///
/// The cadence is a server-side delivery rate knob (single-sample per
/// event), **not** a multi-sample payload bundle. Verified at
/// `wwtools/mxaccesscli/docs/api-notes.md:97-100,138-140,154-157`
/// (R2 in `design/70-risks-and-open-questions.md`).
#[derive(Debug, Clone, Copy)]
pub struct BufferedOptions {
/// Requested delivery interval in milliseconds. The wire layer
/// rounds this **up** to the nearest 100 ms — the same rounding the
/// .NET reference applies in
/// `MxNativeCompatibilityServer.SetBufferedUpdateInterval:638`
/// (`((updateInterval + 99) / 100) * 100`). Values from 1 to 99 round
/// up to 100; zero is rejected by [`Session::subscribe_buffered`]
/// (the field is unsigned, so negative requests cannot be expressed).
pub update_interval_ms: u32,
}
impl BufferedOptions {
/// Round `update_interval_ms` **up** to the nearest 100 ms, mirroring
/// `MxNativeCompatibilityServer.SetBufferedUpdateInterval`
/// (`MxNativeCompatibilityServer.cs:638`):
///
/// ```text
/// _bufferedUpdateIntervals[serverHandle] = ((updateInterval + 99) / 100) * 100;
/// ```
///
/// Saturates rather than overflowing: any input above
/// `u32::MAX - 99` returns `u32::MAX`, which is not itself a multiple
/// of 100. Live LMX update intervals are far below this limit, so
/// the saturating branch is unreachable in practice; the helper is
/// exposed for unit testing.
#[must_use]
pub const fn rounded_update_interval_ms(self) -> u32 {
let v = self.update_interval_ms;
// Saturating equivalent of `((v + 99) / 100) * 100`. The .NET
// version uses `int` arithmetic; `u32` covers a wider positive
// range, and the explicit saturation on overflow keeps the helper
// total.
match v.checked_add(99) {
Some(plus) => (plus / 100) * 100,
None => u32::MAX,
}
}
}
#[derive(Debug, Clone)]
pub struct SecurityContext {
pub current_user_id: i32,
@@ -514,20 +561,52 @@ impl Session {
})
}
/// Buffered subscription with a delivery-cadence knob. Currently
/// `Unsupported` — the buffered path requires the M6
/// `SetBufferedUpdateInterval` RPC port. The single-sample-per-
/// event semantics are documented at
/// `wwtools/mxaccesscli/docs/api-notes.md:138-140`.
/// Buffered subscription with a delivery-cadence knob.
///
/// Returns a [`Subscription`] yielding `Result<DataChange, Error>` —
/// the **same item type** as plain [`Self::subscribe`]. Buffered is a
/// **single-sample, cadence knob — not multi-sample payload**:
/// `update_interval_ms` controls how often the AVEVA platform
/// flushes the latest value, but each emitted event still carries
/// one sample. Verified against
/// `wwtools/mxaccesscli/docs/api-notes.md:97-100,138-140,154-157`
/// (R2 in `design/70-risks-and-open-questions.md`).
///
/// ## Wire encoding
///
/// Mirrors `MxNativeSession.RegisterBufferedItemAsync`
/// (`MxNativeSession.cs:272-310`): suffixes the item definition
/// with `.property(buffer)` (via
/// [`mxaccess_codec::NmxReferenceRegistrationMessage::to_buffered_item_definition`])
/// and dispatches a single LMX `RegisterReference` (opcode `0x10`)
/// with `subscribe = true`. No `AdviseSupervisory` follow-up is
/// emitted — the server treats the subscribe-flagged
/// `RegisterReference` as a supervisory advise (verified against
/// `captures/082-frida-add-buffered-plain-advise-testint`).
///
/// `update_interval_ms` is rounded **up** to the nearest 100 ms, the
/// same rounding the .NET reference applies in
/// `MxNativeCompatibilityServer.SetBufferedUpdateInterval:638`. The
/// rounded value is held client-side: native MXAccess does not emit
/// a separate `SetBufferedUpdateInterval` RPC (verified by absence
/// in the `079`/`082` captures — `mx.set-buffered-interval.begin/end`
/// produces no NMX traffic).
///
/// # Errors
/// - [`Error::Connection`] if the session is shut down.
/// - [`Error::Configuration`] when `update_interval_ms == 0`
/// (matches the .NET reference's
/// `ArgumentOutOfRangeException` at
/// `MxNativeCompatibilityServer.cs:630-633`), when the resolver
/// rejects `reference`, or when the LMX server returns a
/// non-zero HRESULT for the `RegisterReference` round-trip.
/// - [`Error::Io`] / transport errors from the underlying RPC.
pub async fn subscribe_buffered(
&self,
_reference: &str,
_options: BufferedOptions,
reference: &str,
options: BufferedOptions,
) -> Result<Subscription, Error> {
Err(Error::Unsupported {
operation: Cow::Borrowed("Session::subscribe_buffered (M6)"),
transport: TransportKind::Nmx,
})
self.subscribe_buffered_nmx(reference, options).await
}
/// Orderly shutdown with a wall-clock bound. Wraps
@@ -616,6 +695,112 @@ fn mxvalue_to_writevalue(value: MxValue) -> Result<mxaccess_nmx::WriteValue, Err
mod tests {
use super::*;
// ---- BufferedOptions ------------------------------------------------
#[test]
fn buffered_options_rounds_below_100_up_to_100() {
// ((1 + 99) / 100) * 100 == 100 — same arithmetic as
// MxNativeCompatibilityServer.cs:638.
assert_eq!(
BufferedOptions {
update_interval_ms: 1,
}
.rounded_update_interval_ms(),
100
);
assert_eq!(
BufferedOptions {
update_interval_ms: 99,
}
.rounded_update_interval_ms(),
100
);
assert_eq!(
BufferedOptions {
update_interval_ms: 100,
}
.rounded_update_interval_ms(),
100
);
}
#[test]
fn buffered_options_rounds_partial_up() {
// 101 → 200, 250 → 300, 999 → 1000.
assert_eq!(
BufferedOptions {
update_interval_ms: 101,
}
.rounded_update_interval_ms(),
200
);
assert_eq!(
BufferedOptions {
update_interval_ms: 250,
}
.rounded_update_interval_ms(),
300
);
assert_eq!(
BufferedOptions {
update_interval_ms: 999,
}
.rounded_update_interval_ms(),
1_000
);
}
#[test]
fn buffered_options_preserves_exact_multiples_of_100() {
for n in [200, 1_000, 5_000, 60_000] {
assert_eq!(
BufferedOptions {
update_interval_ms: n,
}
.rounded_update_interval_ms(),
n,
"exact multiple of 100 should round to itself: {n}"
);
}
}
#[test]
fn buffered_options_rounds_zero_to_zero() {
// Zero is the explicit "rejected" value at
// MxNativeCompatibilityServer.cs:630-633; the helper itself is
// total (`Session::subscribe_buffered` does the validation), so
// zero rounds to zero rather than panicking.
assert_eq!(
BufferedOptions {
update_interval_ms: 0,
}
.rounded_update_interval_ms(),
0
);
}
#[test]
fn buffered_options_saturates_at_u32_max() {
// u32::MAX + 99 would overflow; the helper saturates instead.
let opts = BufferedOptions {
update_interval_ms: u32::MAX,
};
assert_eq!(opts.rounded_update_interval_ms(), u32::MAX);
}
/// Compile-time check that `Session::subscribe_buffered` returns the
/// same `Subscription` type as plain `subscribe` (F36 DoD bullet 1
/// plus bullet 3 — the API discoverably documents single-sample
/// semantics by NOT introducing a separate `BufferedSubscription`-shaped
/// return).
#[allow(dead_code)]
fn _subscribe_buffered_returns_subscription(
s: &Session,
opts: BufferedOptions,
) -> impl std::future::Future<Output = Result<Subscription, Error>> + '_ {
s.subscribe_buffered("ignored", opts)
}
// ---- RecoveryPolicy ------------------------------------------------
#[test]
+275
@@ -0,0 +1,275 @@
//! F40 — optional metrics emission via the [`metrics`] crate.
//!
//! Behind the `metrics` Cargo feature this module wires session-level
//! counters / histograms / gauges into a downstream
//! [`metrics::Recorder`]. With the feature off (the default), every
//! function in this module compiles to an empty body — no `metrics`
//! dep, no per-call overhead, no allocations.
//!
//! ## Design
//!
//! - Call sites in [`crate::session`] and [`crate::asb_session`] invoke
//! the wrappers below **unconditionally**. The feature gate lives
//! inside each wrapper, not at the call site, so the instrumentation
//! stays out of the way of the protocol code.
//! - Names follow `mxaccess.<scope>.<noun>` (dotted, lowercase) to
//! match the convention recommended by the `metrics` crate's
//! [naming guide](https://docs.rs/metrics/0.24/metrics/#naming-conventions).
//! - All metric definitions are listed below so an operator wiring an
//! exporter knows what to expect.
//!
//! ## Emitted metrics
//!
//! ### Counters
//!
//! - `mxaccess.session.writes` — incremented after each successful
//! `Session::write_value` / `write_value_at` /
//! `write_value_secured_at`. Label: `transport=nmx`.
//! - `mxaccess.session.reads` — incremented when `Session::read`
//! returns the first DataChange. Label: `transport=nmx`.
//! - `mxaccess.session.advises` — incremented after each successful
//! `Session::subscribe` (one per `AdviseSupervisory` round-trip).
//! Label: `transport=nmx`.
//! - `mxaccess.session.unadvises` — incremented after each successful
//! `Session::unsubscribe`. Label: `transport=nmx`.
//! - `mxaccess.session.recovery_attempts` — incremented on each
//! `RecoveryEvent::Started` emission. Label: `transport=nmx`.
//! - `mxaccess.session.recovery_successes` — incremented on each
//! `RecoveryEvent::Recovered` emission. Label: `transport=nmx`.
//! - `mxaccess.asb.writes` / `.reads` / `.publishes` — ASB-side
//! counterparts (transport=`asb`). Currently `writes` and `reads`
//! are wired; `publishes` is reserved for the streaming subscribe
//! path (F40 follow-up).
//!
//! ### Histograms (seconds)
//!
//! - `mxaccess.session.write.latency_seconds` — wall time from
//! `Session::write*` entry to a successful return (the round-trip
//! that ends with the inner LMX `OperationComplete`).
//! - `mxaccess.session.read.latency_seconds` — wall time from
//! `Session::read` entry to first DataChange parsed.
//! - `mxaccess.session.subscribe.first_data_change_seconds` —
//! currently unused (reserved for a future first-DataChange
//! instrumentation in `Subscription::poll_next`); kept in the
//! spec for parity with the F40 design.
//!
//! ### Gauges
//!
//! - `mxaccess.session.connected` — `1` while a `Session` is
//! connected, `0` after `shutdown_nmx`. Operators can sum across
//! processes for a "live sessions" view.
//! - `mxaccess.session.registered_items` — current size of the
//! subscription registry (`SessionInner::subscriptions`); rises
//! on `subscribe`, falls on `unsubscribe`.
//! - `mxaccess.session.active_subscriptions` — alias of
//! `registered_items` for now (kept distinct so a future
//! "registered but unsubscribed" distinction can split them
//! without a metric rename).
//!
//! ## Wiring an exporter
//!
//! ```ignore
//! // In a downstream binary, behind the same feature flag:
//! # #[cfg(feature = "metrics")] {
//! let recorder = metrics_exporter_prometheus::PrometheusBuilder::new()
//! .install_recorder()
//! .unwrap();
//! // ... later, scrape `/metrics` from `recorder.handle().render()`.
//! # }
//! ```
//!
//! No exporter is provided by this crate; pick one from the
//! [`metrics-exporter-*`](https://crates.io/search?q=metrics-exporter)
//! ecosystem (Prometheus, StatsD, OpenTelemetry, …).
use std::time::Duration;
// Static label slices reused at every call site. The `metrics` crate's
// `counter!`/`histogram!`/`gauge!` macros accept `(&'static str, &'static str)`
// pairs directly, but having the labels here keeps the call sites brief.
#[cfg(feature = "metrics")]
const TRANSPORT_NMX: (&str, &str) = ("transport", "nmx");
#[cfg(feature = "metrics")]
const TRANSPORT_ASB: (&str, &str) = ("transport", "asb");
// ---- Counters ------------------------------------------------------------
/// One successful NMX `Session::write*` round-trip.
pub(crate) fn record_write() {
#[cfg(feature = "metrics")]
metrics::counter!("mxaccess.session.writes", &[TRANSPORT_NMX]).increment(1);
}
/// One successful NMX `Session::read` (first DataChange returned).
pub(crate) fn record_read() {
#[cfg(feature = "metrics")]
metrics::counter!("mxaccess.session.reads", &[TRANSPORT_NMX]).increment(1);
}
/// One successful NMX `Session::subscribe` (`AdviseSupervisory` ack).
pub(crate) fn record_advise() {
#[cfg(feature = "metrics")]
metrics::counter!("mxaccess.session.advises", &[TRANSPORT_NMX]).increment(1);
}
/// One successful NMX `Session::unsubscribe` (`UnAdvise` ack).
pub(crate) fn record_unadvise() {
#[cfg(feature = "metrics")]
metrics::counter!("mxaccess.session.unadvises", &[TRANSPORT_NMX]).increment(1);
}
/// `RecoveryEvent::Started` emitted (one per recovery attempt).
pub(crate) fn record_recovery_attempt() {
#[cfg(feature = "metrics")]
metrics::counter!("mxaccess.session.recovery_attempts", &[TRANSPORT_NMX]).increment(1);
}
/// `RecoveryEvent::Recovered` emitted (rebuild + re-advise succeeded).
pub(crate) fn record_recovery_success() {
#[cfg(feature = "metrics")]
metrics::counter!("mxaccess.session.recovery_successes", &[TRANSPORT_NMX]).increment(1);
}
/// One successful ASB `AsbSession::write` round-trip.
pub(crate) fn record_asb_write() {
#[cfg(feature = "metrics")]
metrics::counter!("mxaccess.asb.writes", &[TRANSPORT_ASB]).increment(1);
}
/// One successful ASB `AsbSession::read` round-trip.
pub(crate) fn record_asb_read() {
#[cfg(feature = "metrics")]
metrics::counter!("mxaccess.asb.reads", &[TRANSPORT_ASB]).increment(1);
}
// ---- Histograms ----------------------------------------------------------
/// Wall-time of a successful NMX write.
pub(crate) fn record_write_latency(elapsed: Duration) {
#[cfg(feature = "metrics")]
metrics::histogram!("mxaccess.session.write.latency_seconds", &[TRANSPORT_NMX])
.record(elapsed.as_secs_f64());
#[cfg(not(feature = "metrics"))]
let _ = elapsed;
}
/// Wall-time of a successful NMX read (subscribe → first DataChange).
pub(crate) fn record_read_latency(elapsed: Duration) {
#[cfg(feature = "metrics")]
metrics::histogram!("mxaccess.session.read.latency_seconds", &[TRANSPORT_NMX])
.record(elapsed.as_secs_f64());
#[cfg(not(feature = "metrics"))]
let _ = elapsed;
}
// ---- Gauges --------------------------------------------------------------
/// Set the `connected` gauge to `1.0` when `connected` is true and `0.0` otherwise.
pub(crate) fn set_connected(connected: bool) {
#[cfg(feature = "metrics")]
metrics::gauge!("mxaccess.session.connected", &[TRANSPORT_NMX]).set(if connected {
1.0
} else {
0.0
});
#[cfg(not(feature = "metrics"))]
let _ = connected;
}
/// Set the `registered_items` / `active_subscriptions` gauges to `count`.
pub(crate) fn set_registered_items(count: usize) {
#[cfg(feature = "metrics")]
{
let v = count as f64;
metrics::gauge!("mxaccess.session.registered_items", &[TRANSPORT_NMX]).set(v);
metrics::gauge!("mxaccess.session.active_subscriptions", &[TRANSPORT_NMX]).set(v);
}
#[cfg(not(feature = "metrics"))]
let _ = count;
}
// ---- Tests ---------------------------------------------------------------
#[cfg(all(test, feature = "metrics"))]
#[allow(clippy::unwrap_used, clippy::panic, clippy::expect_used)]
mod tests {
use super::*;
use metrics::{CounterFn, GaugeFn, HistogramFn};
use metrics::{Key, Label, Recorder, SharedString, Unit};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
/// Minimal in-memory recorder that accumulates increments for the
/// `mxaccess.session.writes` counter only. We only need to verify that
/// a counter increments; full per-label aggregation is the exporter's job.
#[derive(Default)]
struct CountingRecorder {
writes: Arc<AtomicU64>,
}
struct CountingCounter(Arc<AtomicU64>);
impl CounterFn for CountingCounter {
fn increment(&self, value: u64) {
self.0.fetch_add(value, Ordering::Relaxed);
}
fn absolute(&self, value: u64) {
self.0.store(value, Ordering::Relaxed);
}
}
struct NoopHistogram;
impl HistogramFn for NoopHistogram {
fn record(&self, _: f64) {}
}
struct NoopGauge;
impl GaugeFn for NoopGauge {
fn increment(&self, _: f64) {}
fn decrement(&self, _: f64) {}
fn set(&self, _: f64) {}
}
impl Recorder for CountingRecorder {
fn describe_counter(&self, _: metrics::KeyName, _: Option<Unit>, _: SharedString) {}
fn describe_gauge(&self, _: metrics::KeyName, _: Option<Unit>, _: SharedString) {}
fn describe_histogram(&self, _: metrics::KeyName, _: Option<Unit>, _: SharedString) {}
fn register_counter(&self, key: &Key, _: &metrics::Metadata<'_>) -> metrics::Counter {
if key.name() == "mxaccess.session.writes" {
metrics::Counter::from_arc(Arc::new(CountingCounter(self.writes.clone())))
} else {
metrics::Counter::noop()
}
}
fn register_gauge(&self, _: &Key, _: &metrics::Metadata<'_>) -> metrics::Gauge {
metrics::Gauge::from_arc(Arc::new(NoopGauge))
}
fn register_histogram(&self, _: &Key, _: &metrics::Metadata<'_>) -> metrics::Histogram {
metrics::Histogram::from_arc(Arc::new(NoopHistogram))
}
}
/// DoD bullet 4: at least one metric counter increments under a
/// unit test. We swap in the `CountingRecorder` for the duration
/// of the test using `metrics::with_local_recorder` — this is
/// the supported way to scope a recorder to a single test
/// without poisoning the global slot.
#[test]
fn record_write_increments_session_writes_counter() {
let recorder = CountingRecorder::default();
let counter = recorder.writes.clone();
metrics::with_local_recorder(&recorder, || {
record_write();
record_write();
record_write();
});
assert_eq!(counter.load(Ordering::Relaxed), 3);
// Reference `Label` and `Key` explicitly so their imports stay used
// (no unused-import warnings) across `metrics` minor bumps.
let _ = Label::new("k", "v");
let _: Option<&Key> = None;
}
}
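The wrappers in this file keep the feature gate inside the function body so that call sites stay unconditional. A minimal standalone sketch of that pattern follows, assuming hypothetical `WRITES` and `LATENCY_NANOS` atomics in place of the real `metrics` recorder. The names `writes_recorded`, `total_latency_nanos`, and `write_value` are illustrative only and are not part of this change.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};

// Hypothetical stand-ins for a metrics backend, so the sketch needs no
// external dependency. The real wrappers call the `metrics` macros instead.
static WRITES: AtomicU64 = AtomicU64::new(0);
static LATENCY_NANOS: AtomicU64 = AtomicU64::new(0);

/// Counter wrapper: the gate sits on the statement, so the body is empty
/// when the `metrics` feature is off.
pub fn record_write() {
    #[cfg(feature = "metrics")]
    WRITES.fetch_add(1, Ordering::Relaxed);
}

/// Histogram-style wrapper: the `not(feature)` arm consumes the argument
/// so the disabled build does not warn about an unused parameter.
pub fn record_write_latency(elapsed: Duration) {
    #[cfg(feature = "metrics")]
    LATENCY_NANOS.fetch_add(elapsed.as_nanos() as u64, Ordering::Relaxed);
    #[cfg(not(feature = "metrics"))]
    let _ = elapsed;
}

/// Read back the stand-in counter (illustrative accessor).
pub fn writes_recorded() -> u64 {
    WRITES.load(Ordering::Relaxed)
}

/// Read back the accumulated stand-in latency (illustrative accessor).
pub fn total_latency_nanos() -> u64 {
    LATENCY_NANOS.load(Ordering::Relaxed)
}

/// Call site: invokes the wrappers unconditionally, with no `#[cfg]` here.
pub fn write_value() {
    let started = Instant::now();
    // ... the actual write round-trip would happen here ...
    record_write();
    record_write_latency(started.elapsed());
}
```

Built without the `metrics` feature, both wrappers reduce to empty bodies and the counters never move; with the feature enabled, each `write_value` call bumps `WRITES` once.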
+187 -11
@@ -33,7 +33,9 @@ use std::sync::Arc;
use std::time::SystemTime;
use mxaccess_callback::{CallbackEvent, CallbackExporter, ExporterIdentities};
use mxaccess_codec::{MxStatus, NmxSubscriptionMessage, NmxSubscriptionRecord};
use mxaccess_codec::{
MxStatus, NmxReferenceRegistrationMessage, NmxSubscriptionMessage, NmxSubscriptionRecord,
};
use mxaccess_galaxy::{GalaxyTagMetadata, Resolver, ResolverError};
use mxaccess_nmx::{NmxClient, NmxClientError, WriteValue};
use mxaccess_rpc::guid::Guid;
@@ -47,6 +49,7 @@ use tokio::sync::{Mutex, broadcast};
use tokio::task::JoinHandle;
use tokio_stream::wrappers::BroadcastStream;
use crate::metrics as session_metrics;
use crate::{DataChange, RecoveryEvent};
use futures_util::Stream;
@@ -587,6 +590,10 @@ impl Session {
let (recovery_tx, _) = broadcast::channel(RECOVERY_BROADCAST_CAPACITY);
// F40 — gauge stays at 1 until shutdown_nmx flips it.
session_metrics::set_connected(true);
session_metrics::set_registered_items(0);
Ok(Self {
inner: Arc::new(SessionInner {
options,
@@ -683,12 +690,16 @@ impl Session {
let mut last_error: Option<Error> = None;
for attempt in 1..=policy.max_attempts {
// F40 — increment attempt counter at Started emission.
session_metrics::record_recovery_attempt();
let _ = self
.inner
.recovery_tx
.send(Arc::new(RecoveryEvent::Started { attempt }));
match self.recover_connection_core(&factory).await {
Ok(()) => {
// F40 — increment success counter on Recovered.
session_metrics::record_recovery_success();
let _ = self
.inner
.recovery_tx
@@ -859,6 +870,7 @@ impl Session {
.await
.map_err(map_resolver)?;
let opts = &inner.options;
let started = std::time::Instant::now();
let mut nmx = inner.nmx.lock().await;
let hr = nmx
.write(
@@ -874,6 +886,9 @@ impl Session {
.await
.map_err(map_nmx)?;
ensure_hresult_ok(hr)?;
// F40 — count + record latency only on the success path.
session_metrics::record_write();
session_metrics::record_write_latency(started.elapsed());
Ok(())
}
@@ -900,6 +915,7 @@ impl Session {
.await
.map_err(map_resolver)?;
let opts = &inner.options;
let started = std::time::Instant::now();
let mut nmx = inner.nmx.lock().await;
let hr = nmx
.write2(
@@ -916,6 +932,10 @@ impl Session {
.await
.map_err(map_nmx)?;
ensure_hresult_ok(hr)?;
// F40 — write2 shares the writes counter (same Session::write*
// family on the wire).
session_metrics::record_write();
session_metrics::record_write_latency(started.elapsed());
Ok(())
}
@@ -951,6 +971,7 @@ impl Session {
.await
.map_err(map_resolver)?;
let opts = &inner.options;
let started = std::time::Instant::now();
let mut nmx = inner.nmx.lock().await;
let hr = nmx
.write_secured2(
@@ -970,6 +991,9 @@ impl Session {
.await
.map_err(map_nmx)?;
ensure_hresult_ok(hr)?;
// F40 — secured-write success counts toward the writes total.
session_metrics::record_write();
session_metrics::record_write_latency(started.elapsed());
Ok(())
}
@@ -1076,12 +1100,147 @@ impl Session {
// replay AdviseSupervisory after a transport rebuild. Inserted
// AFTER the wire AdviseSupervisory succeeds — failed advises
// never enter the registry.
inner.subscriptions.lock().await.insert(
let registry_size = {
let mut reg = inner.subscriptions.lock().await;
reg.insert(
correlation_id,
SubscriptionEntry {
metadata: Arc::clone(&metadata_arc),
},
);
reg.len()
};
// F40 — count the advise + update the gauge under the same
// ordering as the registry insert so the gauge value matches
// what `recover_connection`'s snapshot would observe.
session_metrics::record_advise();
session_metrics::set_registered_items(registry_size);
Ok(Subscription {
correlation_id,
SubscriptionEntry {
metadata: Arc::clone(&metadata_arc),
},
);
reference: Arc::<str>::from(reference),
metadata: metadata_arc,
inbound,
pending: std::collections::VecDeque::new(),
})
}
/// Real implementation of [`Session::subscribe_buffered`] for the
/// NMX transport. Mirrors `MxNativeSession.RegisterBufferedItemAsync`
/// (`MxNativeSession.cs:272-310`).
///
/// Wire-side: builds a single LMX `RegisterReference` (opcode `0x10`)
/// frame whose `item_definition` carries the `.property(buffer)`
/// suffix and whose `subscribe` flag is `true`. The server treats
/// this as a buffered supervisory advise — no separate
/// `AdviseSupervisory` follow-up is needed (verified against
/// `captures/082-frida-add-buffered-plain-advise-testint`, which
/// shows exactly one `RegisterReference` and zero `AdviseSupervisory`
/// frames between `mx.set-buffered-interval` and the first
/// `OnBufferedDataChange`).
///
/// `update_interval_ms` is rounded up to the next multiple of 100 ms via
/// [`crate::BufferedOptions::rounded_update_interval_ms`] and
/// retained client-side. Native MXAccess does not emit a separate
/// `SetBufferedUpdateInterval` RPC — the .NET reference's
/// `MxNativeCompatibilityServer.SetBufferedUpdateInterval`
/// (`cs:627-640`) only updates an in-memory dictionary; the rounding
/// is the only behaviour preserved on the Rust port today (the
/// rounded value is currently informational — a future iteration
/// can expose it via a getter when a consumer needs it).
pub(crate) async fn subscribe_buffered_nmx(
&self,
reference: &str,
options: crate::BufferedOptions,
) -> Result<Subscription, Error> {
self.ensure_connected()?;
if options.update_interval_ms == 0 {
// Mirrors `MxNativeCompatibilityServer.cs:630-633` —
// `ArgumentOutOfRangeException` for non-positive intervals.
return Err(Error::Configuration(ConfigError::InvalidArgument {
detail: "BufferedOptions::update_interval_ms must be positive".to_string(),
}));
}
// Round up to nearest 100ms (cs:638). The rounded value is
// computed for parity with the .NET reference; it is currently
// not transmitted on the wire because native MXAccess holds it
// client-side only (see capture 082's missing
// `SetBufferedUpdateInterval` frame).
let _rounded_ms = options.rounded_update_interval_ms();
let inner = self.inner.clone();
let metadata = inner
.resolver
.resolve(reference)
.await
.map_err(map_resolver)?;
let correlation_id: [u8; 16] = rand::random();
// Build the buffered RegisterReference body. Item definition is
// the full reference suffixed with `.property(buffer)`; item
// context is empty for this single-string form (the .NET
// reference's split-context form is reachable via the
// compat-server layer F35 once it lands). The codec helper
// rejects empty/whitespace inputs with `CodecError::InvalidName`.
let item_definition =
NmxReferenceRegistrationMessage::to_buffered_item_definition(reference)
.map_err(|e| {
Error::Configuration(ConfigError::InvalidArgument {
detail: format!("buffered item definition: {e}"),
})
})?;
let registration = NmxReferenceRegistrationMessage {
item_handle: 0,
item_correlation_id: correlation_id,
item_definition,
item_context: String::new(),
subscribe: true,
reserved_25_27: [0; 2],
reserved_31_55: [0; 24],
};
let opts = &inner.options;
// Subscribe to the broadcast BEFORE issuing the RegisterReference so
// updates arriving immediately afterwards cannot slip through the gap
// (same ordering rationale as plain `subscribe`).
let inbound = Box::pin(BroadcastStream::new(self.inner.callback_tx.subscribe()));
let mut nmx = inner.nmx.lock().await;
let hr = nmx
.register_reference(
opts.local_engine_id,
&metadata,
&registration,
opts.galaxy_id,
/* source_galaxy_id */ i32::from(opts.galaxy_id),
opts.source_platform_id,
)
.await
.map_err(map_nmx)?;
ensure_hresult_ok(hr)?;
drop(nmx);
let metadata_arc = Arc::new(metadata);
// Record the active subscription so recover_connection can replay
// it after a transport rebuild. The replay path currently uses
// `AdviseSupervisory` for every entry; for buffered subscriptions
// that path is functionally equivalent (the LMX server already
// remembers the buffered registration via the `.property(buffer)`
// suffix carried in the metadata's name). Tracked as a sub-followup
// — see `design/followups.md` if a future iteration wants to
// re-issue `RegisterReference` instead.
let registry_size = {
let mut reg = inner.subscriptions.lock().await;
reg.insert(
correlation_id,
SubscriptionEntry {
metadata: Arc::clone(&metadata_arc),
},
);
reg.len()
};
session_metrics::record_advise();
session_metrics::set_registered_items(registry_size);
Ok(Subscription {
correlation_id,
@@ -1126,6 +1285,11 @@ impl Session {
}));
}
// F40 — clock the whole read-as-subscribe (including the
// AdviseSupervisory round-trip) so the histogram captures the
// same wall time a consumer would see.
let started = std::time::Instant::now();
// Subscribe through the public path so the broadcast wiring +
// AdviseSupervisory both run.
let subscription = self.subscribe(reference).await?;
@@ -1143,6 +1307,10 @@ impl Session {
Ok(None) => Err(Error::Connection(ConnectionError::EngineNotRegistered)),
Err(_elapsed) => Err(Error::Timeout(timeout)),
};
if result.is_ok() {
session_metrics::record_read();
session_metrics::record_read_latency(started.elapsed());
}
// Best-effort unsubscribe. The .NET finally block at cs:351-358
// ignores the return of Unsubscribe; mirror that — a failed
@@ -1192,11 +1360,14 @@ impl Session {
// We do this only on the success path — if UnAdvise itself
// failed, the server may still hold the supervisory record and
// a future recover_connection should re-issue the advise.
inner
.subscriptions
.lock()
.await
.remove(&subscription.correlation_id);
let registry_size = {
let mut reg = inner.subscriptions.lock().await;
reg.remove(&subscription.correlation_id);
reg.len()
};
// F40 — count the unadvise + update the gauge.
session_metrics::record_unadvise();
session_metrics::set_registered_items(registry_size);
Ok(())
}
@@ -1219,6 +1390,11 @@ impl Session {
{
return Ok(());
}
// F40 — flip the connected gauge as soon as the atomic flips
// so a concurrent scrape never sees connected=1 alongside a
// shutdown-in-progress.
session_metrics::set_connected(false);
session_metrics::set_registered_items(0);
// 1. Unregister the engine on the wire first, while the NMX
// transport is still live.