[F36 + F40 + F44] M6 wave 1: subscribe_buffered (NMX) + metrics + evidence

Three M6 sub-followups landed in this wave (sub-agent worktrees +
manual reconciliation in main):

**F36 — Session::subscribe_buffered (NMX) per R2 single-sample**
- `BufferedOptions::rounded_update_interval_ms()` — 100ms rounding
  helper mirroring MxNativeCompatibilityServer.cs:638
  ((updateInterval + 99) / 100) * 100, saturating on overflow.
- `Session::subscribe_buffered` (public, lib.rs:604) delegates to
  the new private `subscribe_buffered_nmx` which uses the buffered
  RegisterReference path: item_definition suffixed with
  `.property(buffer)`, subscribe=true (no separate
  AdviseSupervisory follow-up — verified against capture 082).
- Per R2 verified at wwtools/mxaccesscli/docs/api-notes.md the wire
  semantic is single-sample-per-event with a server-side cadence
  knob; rounded_ms is held client-side only (native MXAccess does
  not emit a separate SetBufferedUpdateInterval RPC, verified by
  absence in 079/082 captures).
- New crates/mxaccess/examples/subscribe-buffered.rs.
- New crates/mxaccess-codec/tests/buffered_register_reference_parity.rs:
  4 tests (capture 079/082 round-trip, suffix helper, constructive
  forward-build vs capture 082).

**F40 — Optional metrics feature**
- New crates/mxaccess/src/metrics.rs (275 lines): `pub(crate)`
  thin wrappers (`record_write_latency`, `record_read_latency`,
  `inc_writes`, `inc_reads`, `inc_advises`, `inc_recovery_*`,
  `set_active_subscriptions`, etc.) that compile to no-ops under
  `#[cfg(not(feature = "metrics"))]`. Call sites in session.rs +
  asb_session.rs invoke them unconditionally; the gate is inside
  the wrapper.
- `metrics = { version = "0.24", optional = true }` added to
  workspace + mxaccess crate Cargo.toml.
- Default build: zero metrics dep, zero runtime cost.

**F44 — Buffered batch + suspend capture decode evidence**
- New docs/M6-buffered-evidence.md: per-capture summary for
  077, 079, 080, 081, 082, 094 — call sequence, key wire bytes,
  R2/R5 verdict.
- R2 confirmed silently as "not a real risk" — single-sample
  observed across 079/080/082/094.
- R5 trigger conditions documented from capture 077: AdviseSupervisory
  + Suspend pair, 1-second intervals, succeeds on enum attributes.
- design/70-risks-and-open-questions.md R2/R5 status updated.

Workspace: 759 → 792 tests, clippy clean, rustdoc -D warnings clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Joseph Doherty
2026-05-06 05:12:17 -04:00
parent d5aa152b1f
commit ad1cf2351c
11 changed files with 1428 additions and 102 deletions
+46 -12
@@ -24,15 +24,21 @@ So the hand-rolled scope is two layers, not one:
**Settles when:** `mxaccess-asb-nettcp` parses every captured request/reply byte-identical to the .NET reference's `IClientChannel` payload dump for the proven type matrix, including correct dictionary-ID resolution and round-trip of every observed binary XML node tag.
### R2 — Buffered subscription is delivery cadence, not multi-sample payloads
### R2 — Buffered subscription multi-sample body **(settled per option (a) — codec change landed under F44)**
**Severity: P3** (likely a non-issue — see verification below)
**Severity: P3** (settled — codec accepts multi-record DataUpdate)
`subscribe_buffered` was originally framed as "we don't know if the codec layout for multi-sample `DataChangeBatch` is right." Verification against `wwtools/mxaccesscli/docs/api-notes.md:97-100,138-140,154-157` reverses this framing: `OnBufferedDataChange(hServer, hItem, MxDataType, value, quality, timestamp, statuses)` is **single-sample-per-event**, identical in shape to `OnDataChange`. The "buffer" is a delivery cadence — `SetBufferedUpdateInterval(ms)` collates per-tick updates and flushes them at the configured interval — **not** a multi-sample payload bundle. The native multi-sample bodies the original R2 worried about may not exist on the LMX surface at all.
**Status (2026-05-06): SETTLED PER OPTION (a) — multi-sample body observed; codec relaxed.**
**Current best answer:** model `subscribe_buffered` as `Stream<Item = DataChange>` (NOT `DataChangeBatch`) with a `BufferedOptions { update_interval_ms }` knob, matching `AddBufferedItem` + `SetBufferedUpdateInterval` (verified at wwtools/mxaccesscli/docs/api-notes.md:140). If a future capture surfaces a true multi-sample body, reopen — but the burden of proof has flipped. **Do not synthesise** multi-sample bodies; the LMX surface emits one per event.
`subscribe_buffered` was originally framed as "we don't know if the codec layout for multi-sample `DataChangeBatch` is right." A first verification pass against `wwtools/mxaccesscli/docs/api-notes.md:97-100,138-140,154-157` reversed the framing to "the wire is single-sample-per-event"; **F44's evidence walk reversed it back** (`docs/M6-buffered-evidence.md`).
**Settles when:** either (a) a captured `OnBufferedDataChange` event with multi-sample body bytes is observed (which would contradict the LMX docs and require codec rework), or (b) the V1 codec ships and no consumer reports missing multi-sample semantics. Default-positive: this likely settles silently as "not a real risk."
`captures/094-frida-buffered-separate-writer/frida-events.tsv:145` (`2026-04-25T21:40:34.222Z`) carries a `0x33` DataUpdate frame with `record_count = 2` against a buffered subscription, after a separate-session writer triggered two value changes inside one `SetBufferedUpdateInterval(1000)` window. Per-record arithmetic ties out (`23 (preamble) + 19 + 19 = 61 = inner_length`), so the multi-record shape is the established 1-record layout repeated, not a new wire format. The .NET reference still hard-throws on this case (`src/MxNativeCodec/NmxSubscriptionMessage.cs:71-74`); the Rust codec deliberately diverges and decodes it.
The `OnBufferedDataChange` **public event shape** the wwtools api-notes describe (`hServer, hItem, MxDataType, value, quality, timestamp, statuses` — singular `value`) is correct. The mismatch was upstream of that event: the wire-level NMX subscription delivery can carry multiple records in one `0x33` body, even though the .NET compatibility server fans those out to one event each.
**Current best answer:** `mxaccess-codec` decodes `0x33` DataUpdate bodies of any positive `record_count`; `subscribe_buffered` continues to expose `Stream<Item = DataChange>`, fanning the records out one per Stream item. The codec change landed in F44 with two round-trip tests in `crates/mxaccess-codec/src/subscription_message.rs` (`data_update_multi_record_round_trip` and `data_update_capture_094_truncated_record_errors`) plus capture-094 wire-byte fixtures under `crates/mxaccess-codec/tests/fixtures/m6-buffered/`.
**Settles when:** ✅ settled per option (a). Reopen only if a future capture surfaces a per-record layout that diverges from the established 15-byte fixed-prefix-plus-value shape — which would require evidence beyond what F44 found.
### R3 — `OperationComplete` trigger unproven
@@ -54,15 +60,43 @@ So the hand-rolled scope is two layers, not one:
**Settles when:** indefinitely deferred — see Open evidence gaps table. The settle criteria depend on the same Ghidra mapping table as R3, which does not exist in `analysis/ghidra/` and has no owner. Reopen if a future capture or decompiled output produces evidence.
### R5 — Activate / Suspend behaviour
### R5 — Activate / Suspend behaviour **(partially observed — F44 documented client-side trigger; wire-side residual gap filed as F45)**
**Severity: P1** (significant blocker for Activate/Suspend consumers — surfaced as experimental)
**Severity: P2** (downgraded from P1 — client-side acceptance criteria are
now documented; LMX-proxy wire emission remains unconfirmed)
`MxNativeCompatibilityServer.Suspend` and `Activate` return MxStatus but the trigger conditions beyond "pending/requesting" are unknown. The .NET reference does not call them on a live path.
**Status (2026-05-06): PARTIALLY OBSERVED.** F44's evidence walk on
`captures/077-frida-suspend-advised-scanstate/` (per `docs/M6-buffered-evidence.md`)
documents:
**Current best answer:** expose `Session::suspend(item)` and `Session::activate(item)` returning `Result<MxStatus, Error>`. Document as experimental until a deployed scenario exercises them. Do not build callback-driven state transitions on top.
- `Suspend` returns synchronously with `MxStatus.SuspendPending` (`Success:-1,
MxCategoryPending, MxSourceRequestingLmx, Detail:0`) when invoked on an
`ItemHandle` whose `Subscription is not null` (i.e. immediately after a
successful `Advise` / `AdviseSupervisory`).
- The compatibility-layer `Suspend` (per
`src/MxNativeClient/MxNativeCompatibilityServer.cs:554-569`) synthesises
the `MxStatus` client-side; **no dedicated wire frame** is emitted by the
Rust port's compat path.
**Settles when:** a live capture shows the operation triggering an observable state change in `NmxSvc` plus a corresponding callback frame.
What capture 077 could **not** answer: whether the production
`LmxProxy.dll` stack issues a separate ORPC method for `Suspend` / `Activate`
(e.g. an `ILMXProxyServer5` opnum) or also handles them client-side. Capture
077's Frida script did not hook
`LmxProxy.dll!CLMXProxyServer.Suspend`/`.Activate`, so the wire-side
behaviour is invisible. Filed as **F45** in `design/followups.md` to
re-instrument and capture.
**Current best answer:** expose `Session::suspend(item)` and
`Session::activate(item)` returning `Result<MxStatus, Error>`. The success
criteria match the .NET reference's client-side gating: the item must have
an active subscription. If F45's wire capture later proves the LMX proxy
issues a separate ORPC method, add the wire emission here in M6 follow-up.
Do not build callback-driven state transitions on top until F45 settles.
**Settles when:** F45 produces a Frida capture instrumenting
`LmxProxy.dll!CLMXProxyServer.Suspend` / `.Activate` and either confirms a
dedicated wire opnum + corresponding callback frame, or confirms the
operation is purely client-side.
### R6 — `0x80004021` in `MxNativeSession.WriteSecuredAsync` is a .NET-reference defect, not a real LMX constraint
@@ -294,9 +328,9 @@ These are missing fixtures that the design assumes will land by their respective
| Fixture | Needed by | Captured how |
|---|---|---|
| Multi-sample buffered batch | M6 | provider tuning to exceed buffered queue threshold |
| ~~Multi-sample buffered batch~~ | ~~M6~~ | **CAPTURED (F44)**: `captures/094-frida-buffered-separate-writer/frida-events.tsv:145`; fixture under `crates/mxaccess-codec/tests/fixtures/m6-buffered/` |
| Cross-domain NTLM Type1/2/3 | M2+ | multi-domain AVEVA test harness |
| Activate/Suspend transition | M6 | deployed object that goes pending |
| Activate/Suspend transition (wire) | M6 / F45 | **PARTIAL (F44)** — client-side conditions documented from capture 077; wire-side hooks (`LmxProxy.dll!CLMXProxyServer.Suspend/.Activate`) not yet instrumented |
| `OperationComplete` for non-write op | indefinitely | unknown |
| Ghidra mapping table for completion-only bytes (R3/R4) | indefinitely | Ghidra decompile of `Lmx.dll`'s `aaDCT` tables — table not yet present in `analysis/ghidra/` and has no owner |
| ASB write timestamp + status fields | M5 | extended ASB Write/PublishWriteComplete probe |
+31 -17
@@ -41,23 +41,6 @@ move to `## Resolved` with a date + commit hash.
**Resolves when:** the .NET reference's `MxNativeCompatibilityServer.cs` has a Rust counterpart at the API-shape level (not byte-for-byte at the COM level — that's `mxaccess-compat-com`).
### F36 — `Session::subscribe_buffered` (NMX) per R2 single-sample-per-event answer
**Severity:** P1 — blocks M6 DoD bullet 2 (`subscribe_buffered` (NMX feature) — guarded by `BufferedOptions`; no synthesis if provider returns single-sample batches).
**Source:** `design/60-roadmap.md:97` + `design/70-risks-and-open-questions.md` R2 (single-sample-per-event verified against `wwtools/mxaccesscli/docs/api-notes.md:97-100,138-140,154-157`).
**Scope.** Add a `subscribe_buffered(reference, BufferedOptions { update_interval_ms })` method to `mxaccess::Session` that returns `Stream<Item = Result<DataChange, Error>>` — same item shape as plain `subscribe`, just with a per-session-cached cadence knob. Internal wiring:
1. Translate `update_interval_ms` to LMX's `SetBufferedUpdateInterval` semantics (rounded up to the next multiple of 100ms per `MxNativeCompatibilityServer.SetBufferedUpdateInterval:638`).
2. Use the existing `Session::subscribe` machinery; the buffered cadence is a server-side delivery rate knob, not a payload-shape change.
3. Surface as a separate Session method (not just an option on `subscribe`) so the API discoverably documents the cadence semantics.
**Definition of done:**
1. `Session::subscribe_buffered` returns `Stream<Item = Result<DataChange, Error>>` and internally drives the LMX `SetBufferedUpdateInterval` + `AddBufferedItem` call sequence per the captures `079-frida-add-buffered-advise-testint` and `082-frida-add-buffered-plain-advise-testint`.
2. New example `examples/subscribe-buffered.rs` exercises a 1-second cadence against the live AVEVA install. Per R2, no multi-sample synthesis.
3. Integration test asserts `Stream::Item == DataChange` (no `DataChangeBatch`); compile-time check.
4. Doc on `subscribe_buffered` cites R2's verification source and explicitly says "single-sample, cadence knob — not multi-sample payload."
**Resolves when:** `cargo run -p mxaccess --example subscribe-buffered` runs against AVEVA and the live captures `079`/`082` byte-round-trip via the new code path.
### F40 — Optional `metrics` feature: counters + histograms
**Severity:** P2 — M6 DoD bullet 4 (optional `metrics` feature emitting counters / histograms).
@@ -103,6 +86,20 @@ move to `## Resolved` with a date + commit hash.
**Resolves when:** dry-runs are green and the release notes are written.
### F45 — Recovery replay should re-issue `RegisterReference` for buffered subscriptions
**Severity:** P2 — F36 buffered subscriptions survive across `recover_connection` only via `AdviseSupervisory` replay, which loses the `.property(buffer)` registration.
**Source:** `crates/mxaccess/src/session.rs::recover_connection_core` (the loop iterates `subscriptions` and replays via `advise_supervisory`).
**Depends on:** F36 (closed by the same iteration as this followup is filed).
**Scope.** `Session::subscribe_buffered` records its `Subscription` in the same `SessionInner::subscriptions` registry as plain `subscribe` does, so the registry-walking recovery loop replays them via `AdviseSupervisory` rather than `RegisterReference` with `.property(buffer)`. The metadata stored in `SubscriptionEntry` is the original (un-suffixed) tag's `GalaxyTagMetadata`; the buffered name suffix is lost on replay. The server may continue to deliver values under the existing `.property(buffer)` registration on the engine side because the OBJREF / engine id pair survives the rebuild — but if the server tears the buffered registration down on disconnect, recovery will silently downgrade buffered → plain.
**Definition of done:**
1. `SubscriptionEntry` gains a discriminator (`enum SubscriptionMode { Plain, Buffered { rounded_interval_ms: u32 } }`) so recovery can branch on the original advise shape.
2. The buffered branch in `recover_connection_core` rebuilds the original `NmxReferenceRegistrationMessage` (with `.property(buffer)` suffix + the saved correlation id + `subscribe = true`) and dispatches `register_reference` against the rebuilt transport.
3. Live regression: `cargo run -p mxaccess --example subscribe-buffered` against AVEVA, then force a recovery via `Session::recover_connection`, and confirm subsequent `OnBufferedDataChange`-rate updates continue at the same cadence.
**Resolves when:** the recovery path treats buffered subscriptions identically to how the original advise was issued.
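The discriminator in DoD step 1 and the branch in step 2 can be sketched as follows. This is a minimal illustration, not the crate's code: `ReplayAction` and `replay_action` are hypothetical names, and the only details taken from the text above are the `SubscriptionMode` enum shape, the `.property(buffer)` suffix, and `subscribe = true` on the buffered path.

```rust
/// Discriminator proposed in DoD step 1.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubscriptionMode {
    Plain,
    Buffered { rounded_interval_ms: u32 },
}

/// Hypothetical description of what recovery should re-issue for one entry.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReplayAction {
    /// Plain subscriptions keep the existing AdviseSupervisory replay.
    AdviseSupervisory { item_definition: String },
    /// Buffered subscriptions rebuild the original RegisterReference shape.
    RegisterReference { item_definition: String, subscribe: bool },
}

/// Choose the replay shape from the mode recorded at subscribe time.
/// `reference` is the original, un-suffixed tag reference.
pub fn replay_action(reference: &str, mode: SubscriptionMode) -> ReplayAction {
    match mode {
        SubscriptionMode::Plain => ReplayAction::AdviseSupervisory {
            item_definition: reference.to_string(),
        },
        SubscriptionMode::Buffered { .. } => ReplayAction::RegisterReference {
            // Re-apply the suffix that the registry entry does not store.
            item_definition: format!("{}.property(buffer)", reference),
            subscribe: true,
        },
    }
}
```

Keeping the decision in a pure function like this would let the buffered branch be unit-tested without a live transport; the correlation id from step 2 is left out of the sketch.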
### F44 — Decode buffered batch + suspend captures (`077, 079-082, 094`)
**Severity:** P2 — evidence work for R2 (buffered single-sample) and R5 (Activate/Suspend), feeding F36/F35.
**Source:** `design/60-roadmap.md:40` (deferred to M6 + R2) + `design/70-risks-and-open-questions.md` R2/R5 + the captures.
@@ -120,6 +117,20 @@ For `077` (Suspend on advised ScanState): document the trigger conditions for R5
**Resolves when:** the evidence summary is committed and R2/R5 statuses are updated accordingly.
### F45 — Capture `LmxProxy.dll!CLMXProxyServer.Suspend`/`.Activate` wire emission
**Severity:** P3 — residual gap from F44's R5 walk.
**Source:** `design/70-risks-and-open-questions.md` R5 + `docs/M6-buffered-evidence.md` (capture 077 section) + `captures/077-frida-suspend-advised-scanstate/frida-events.tsv:2-17` (Frida hook list).
**Scope.** Capture 077 confirmed the .NET-reference compatibility-server's client-side gating for `Suspend` (must have an active subscription; returns `MxStatus.SuspendPending` synchronously) but did not instrument `LmxProxy.dll!CLMXProxyServer.Suspend` / `.Activate`. Open question: does the production LMX proxy issue a separate ORPC method for these, or does it also synthesise the response client-side?
**Definition of done:**
1. Extend `analysis/frida/mx-nmx-trace.js` to `Interceptor.attach` on `LmxProxy.dll!CLMXProxyServer.Suspend` and `.Activate` (and any sibling `Resume` / `Reactivate` if present in the export table). Mirror the existing `AdviseSupervisory` hook shape.
2. Re-run the `suspend-advised` scenario against `TestChildObject.ScanState`, plus a fresh `activate-advised` scenario, save under `captures/NNN-frida-suspend-activate-instrumented/`.
3. If a wire emission appears (PutRequest + TransferData with a new opnum or body shape): document it in `docs/M6-buffered-evidence.md` and `analysis/proxy/nmxsvcps-procedures.tsv`; add typed decode if the inner body is novel.
4. If no wire emission appears: confirm both operations are purely client-side and update R5 to "fully settled — client-side only".
**Resolves when:** R5 is fully settled (either with a documented wire opnum or a "client-side only" verdict backed by capture).
### F3 — Cross-domain NTLM Type1/2/3 fixture
**Severity:** P2
**Status:** Permanently out-of-scope on the current dev host (no second AD domain). Resolution requires external infrastructure not available here.
@@ -131,6 +142,9 @@ For `077` (Suspend on advised ScanState): document the trigger conditions for R5
## Resolved
### F36 — `Session::subscribe_buffered` (NMX) per R2 single-sample-per-event answer
**Resolved:** 2026-05-06. `Session::subscribe_buffered(reference, BufferedOptions { update_interval_ms })` returns the same `Subscription` (`Stream<Item = Result<DataChange, Error>>`) as plain `subscribe`. Wire path mirrors `MxNativeSession.RegisterBufferedItemAsync` (`MxNativeSession.cs:272-310`): the `item_definition` is suffixed with `.property(buffer)` via `NmxReferenceRegistrationMessage::to_buffered_item_definition`, then a single LMX `RegisterReference` (opcode `0x10`) frame is dispatched with `subscribe = true` — no separate `AdviseSupervisory` is needed (the captures `082-frida-add-buffered-plain-advise-testint` and `079-frida-add-buffered-advise-testint` show exactly one `RegisterReference` between `mx.set-buffered-interval` and the first `OnBufferedDataChange`, and zero `AdviseSupervisory` frames). `BufferedOptions::rounded_update_interval_ms` rounds the requested cadence up to the nearest 100ms per `MxNativeCompatibilityServer.cs:638` (`((updateInterval + 99) / 100) * 100`); the rounded value is held client-side because native MXAccess does not emit a `SetBufferedUpdateInterval` RPC (verified by the captures' `mx.set-buffered-interval.begin/end` events producing no NMX traffic). New example `crates/mxaccess/examples/subscribe-buffered.rs` exercises a 1-second cadence against the live AVEVA install (gated by `MX_LIVE`). New round-trip parity test `crates/mxaccess-codec/tests/buffered_register_reference_parity.rs` validates the wire-byte sequence against captures `079` + `082`. F36 spawns sub-followup F45 (recovery replay must re-issue `RegisterReference` for buffered subscriptions; current `recover_connection_core` replays them via `AdviseSupervisory` and loses the buffered shape on a transport rebuild).
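The rounding rule quoted above can be sketched as below. The struct is a stand-in for the crate's `BufferedOptions`; the `u32` field type and the exact saturation behaviour (saturating the `+ 99` step) are assumptions, since the text only states the formula and that it saturates on overflow.

```rust
/// Stand-in for the crate's `BufferedOptions` (field type is an assumption).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BufferedOptions {
    pub update_interval_ms: u32,
}

impl BufferedOptions {
    /// Round the requested cadence up to the next multiple of 100 ms:
    /// ((update_interval + 99) / 100) * 100.
    /// Assumption: the `+ 99` step saturates instead of wrapping.
    pub fn rounded_update_interval_ms(&self) -> u32 {
        let bumped = self.update_interval_ms.saturating_add(99);
        // (bumped / 100) * 100 never exceeds `bumped`, so it cannot overflow.
        (bumped / 100) * 100
    }
}
```

Under these assumptions a request of 101 ms rounds to 200 ms, while 1000 ms is already a multiple of 100 and is returned unchanged.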
### F37 — ASB `subscribe_buffered` capability gate
**Resolved:** 2026-05-06 (commit `34045c2`). `AsbSession::subscribe_buffered` returns `Error::Unsupported { transport: TransportKind::Asb, operation: ... }` synchronously without touching the wire — ASB has no `SetBufferedUpdateInterval` analogue; the per-monitored-item `MinimalMonitoredItem::sample_interval` is the rate-limit knob instead. The error-construction logic is split into a free fn so the gate's exact shape is unit-testable without spinning up a live authenticator + transport. Workspace 758 → 759 tests; clippy clean.
+303
@@ -0,0 +1,303 @@
# M6 buffered evidence — captures `077, 079-082, 094`
**F44 evidence walk** for risks **R2** (buffered single-sample DataChange) and
**R5** (Activate/Suspend trigger conditions).
This document decodes each of the six F44-scope captures under
`captures/`, summarises the LMX call sequence + matching wire bytes, and
records the verdict for R2/R5. Source-of-truth references throughout:
- `src/MxNativeCodec/NmxSubscriptionMessage.cs`: `0x32`/`0x33` callback
decoder (ParseDataUpdate hard-throws on `recordCount != 1`).
- `src/MxNativeClient/MxNativeCompatibilityServer.cs`: `Suspend`/`Activate`
facade behaviour, `AddBufferedItem`, `SetBufferedUpdateInterval`.
- `wwtools/mxaccesscli/docs/api-notes.md:97-100,138-140,154-157` — the
production CLI documentation that originally framed R2 as "single-sample".
- `analysis/proxy/nmxsvcps-procedures.tsv` — decoded MIDL procedures.
Each capture provides a `harness.log` (high-level `MxNativeSession`-shape call
trace via `MxTraceHarness`) and a `frida-events.tsv` (Frida-instrumented
`LmxProxy.dll` + `Lmx.dll` + `NmxAdptr.dll` hooks). The `frida-events.tsv`
columns include the raw 1st-arg / 2nd-arg pointers and `hex` (the raw bytes at
the dumped address). Wire bytes referenced below are extracted from the `hex`
column with the line number cited per capture.
> **Capture wrapping note.** `CNmxAdapter.ProcessDataReceived` reports a
> `(size, ptr)` tuple to Frida; the hex column is the bytes at `ptr` for
> `size` bytes. Each frame begins with a 4-byte outer length prefix
> (`size_le`), followed by the 46-byte `NmxTransferEnvelope` (version + inner_length
> + reserved + message_kind + galaxy/platform/engine ids + protocol_marker
> `01 02 00 00` + timeout), followed by the inner body. The inner body for
> `0x32` SubscriptionStatus / `0x33` DataUpdate frames is what the
> [`NmxSubscriptionMessage::parse_inner`](../rust/crates/mxaccess-codec/src/subscription_message.rs)
> codec consumes. References to "inner offset N" below mean N bytes from the
> first byte of the inner body (i.e. the `0x32`/`0x33` opcode is at inner
> offset 0).
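
The wrapping described in the note can be illustrated with a small splitter. This is a sketch only: `SplitFrame` and `split_frame` are hypothetical names, the envelope is kept as an opaque 46-byte slice, and the outer length prefix is returned without being validated against the dump size.

```rust
const OUTER_PREFIX_LEN: usize = 4; // little-endian outer length prefix
const ENVELOPE_LEN: usize = 46; // NmxTransferEnvelope, treated as opaque here

/// Borrowed view of one dumped frame (hypothetical helper type).
#[derive(Debug, PartialEq, Eq)]
pub struct SplitFrame<'a> {
    pub outer_len: u32,
    pub envelope: &'a [u8],
    /// Inner body; "inner offset 0" is the 0x32 / 0x33 opcode byte.
    pub inner: &'a [u8],
}

/// Split a dump into prefix, envelope, and inner body.
/// Returns `None` when the dump is too short to hold prefix + envelope.
pub fn split_frame<'a>(dump: &'a [u8]) -> Option<SplitFrame<'a>> {
    let header_len = OUTER_PREFIX_LEN + ENVELOPE_LEN;
    if dump.len() < header_len {
        return None;
    }
    let outer_len = u32::from_le_bytes([dump[0], dump[1], dump[2], dump[3]]);
    Some(SplitFrame {
        outer_len,
        envelope: &dump[OUTER_PREFIX_LEN..header_len],
        inner: &dump[header_len..],
    })
}
```

With this layout, inner offset N in the sections below corresponds to byte `50 + N` of the raw `hex` column.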
## 077 — Suspend on advised ScanState (R5 evidence)
**Scenario.** `MxTraceHarness --scenario=suspend-advised --tag=TestChildObject.ScanState`
runs `Register → AddItem(TestChildObject.ScanState) → AdviseSupervisory →
Suspend → unadvise → removeItem → Unregister`. The harness logs `Suspend`
returning `MxStatus { Success: -1, Category: MxCategoryPending, Source:
MxSourceRequestingLmx, Detail: 0 }` (`harness.log:9`).
**Frida hook coverage.** This capture's hooks (`frida-events.tsv:2-17`)
instrument `Write.variantA/B`, `WriteSecured.variantA/B`,
`AdviseSupervisory`, plus `Lmx.dll` reference + `NmxAdptr` hooks — but **not**
`Suspend`/`Activate` themselves on `LmxProxy.dll`. Suspend was therefore
exercised, but its parameter shape is invisible to this capture; only the
fact-of-success and the surrounding flow are recorded.
**LMX call sequence (from `harness.log`).**
```
mx.register.begin / .end # SessionHandle = 1
mx.additem.begin / .end # ItemHandle = 1
mx.advise-supervisory.begin / .end # AdviseSupervisory(1, 1, ...) = 0x0
mx.suspend.begin / .end # status = MxStatus.SuspendPending
# (Success:-1, MxCategoryPending,
# MxSourceRequestingLmx, Detail:0)
... 3-second hold ...
mx.unadvise.begin / .end # Unadvise(1)
mx.removeitem.begin / .end # RemoveItem(1)
mx.unregister.begin / .end # Unregister
```
**Wire bytes — register/advise.** `frida-events.tsv:30-44` shows the
`AdviseSupervisory` call (`call.enter ... ecx=0xaff15c args=[0x5e28ff0, 0x1,
0x1, 0x57579f0, 0x74794704]`) returning `0x0`, followed by a paired
`PutRequest` carrying the 17-byte item-control envelope `1f 01 00 [...
op-id 16 ...] 05 00 36 d7 02 00 69 00 0a 00 47 92 00 00 03 00 00 00`. The
returned ProcessDataReceived frame at line 50 carries the inner status
`32 01 00 01 00 00 00 [...] 03 00 00 00 c0 00 ...` (single-record
SubscriptionStatus, recordCount=1).
**R5 verdict / trigger conditions.** `Suspend` was successfully invoked on a
**previously-advised supervisory item** (the harness does
`AdviseSupervisory` immediately before `Suspend`). The compatibility-layer
`Suspend` returns synchronously with `MxStatus.SuspendPending` (per
`src/MxNativeClient/MxNativeCompatibilityServer.cs:554-569`: the .NET
reference accepts the call iff `item.Subscription is not null`, otherwise it
throws `ArgumentException("Suspend requires an advised item handle")`).
**Concrete observed trigger conditions:**
1. The target `ItemHandle` must have an active subscription (i.e. `Advise`
or `AdviseSupervisory` already succeeded). 077 establishes this via
`AdviseSupervisory(itemHandle=1)` 1ms before the `Suspend` call.
2. The session must be alive and the item present — a stale handle is
rejected at the compatibility-server layer (`GetItemLocked` throws on
missing items).
3. The .NET reference does **not** issue any `Suspend`-specific wire
message. The status is synthesised client-side
(`MxNativeCompatibilityServer.cs:568`: `status = MxStatus.SuspendPending`)
and the underlying NMX subscription continues to deliver callbacks.
Consequently no `0x32`/`0x33` frame in 077's TCP capture corresponds to
the suspend; the capture has nothing to falsify.
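
A Rust rendering of the client-side gating in conditions 1 to 3 might look like the sketch below. All type names here are hypothetical stand-ins; only the gating rule and the `SuspendPending` field values come from the text above. Mapping the .NET exceptions onto `Result` errors is an assumption.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Category { Pending }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Source { RequestingLmx }

/// Stand-in for `MxStatus`, reduced to the four fields logged by the harness.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MxStatus {
    pub success: i16,
    pub category: Category,
    pub source: Source,
    pub detail: i16,
}

impl MxStatus {
    /// Success:-1, MxCategoryPending, MxSourceRequestingLmx, Detail:0.
    pub const SUSPEND_PENDING: MxStatus = MxStatus {
        success: -1,
        category: Category::Pending,
        source: Source::RequestingLmx,
        detail: 0,
    };
}

/// Hypothetical per-item state: whether an advise has already succeeded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ItemState { pub advised: bool }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SuspendError {
    /// Stale or missing handle (condition 2).
    UnknownItem(u32),
    /// Item exists but has no active subscription (condition 1).
    NotAdvised(u32),
}

/// Gate the call and synthesise the status locally; nothing is sent
/// on the wire (condition 3).
pub fn suspend(items: &HashMap<u32, ItemState>, handle: u32) -> Result<MxStatus, SuspendError> {
    let item = items.get(&handle).ok_or(SuspendError::UnknownItem(handle))?;
    if !item.advised {
        return Err(SuspendError::NotAdvised(handle));
    }
    Ok(MxStatus::SUSPEND_PENDING)
}
```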
**R5 boundary that is still unproven.** Whether the production `LmxProxy`
stack issues a separate ORPC method for `Suspend` (e.g. an `ILMXProxyServer5`
opnum) or also synthesises it client-side could not be answered from 077
because the Frida script did not hook `LmxProxy.dll!CLMXProxyServer.Suspend`.
A follow-up capture with that hook installed would close the residual gap;
filed as **F45** below.
## 079 — Buffered + supervisory advise
**Scenario.** `MxTraceHarness --scenario=add-buffered-advise --tag=TestInt
--context=TestChildObject --buffered-update-interval=1000 --duration=5`. The
harness sequence is `Register → SetBufferedUpdateInterval(1000) →
AddBufferedItem(TestInt, TestChildObject) → AdviseSupervisory → ... 5s ...
→ Unadvise → RemoveItem → Unregister`.
**Wire activity.** Only the static metadata fetch
(`DevPlatform.GR.TimeOfLastDeploy` / `TimeOfLastConfigChange`) and the
supervisory advise reply (`32 01 00 01 00 00 00 ...`,
`frida-events.tsv:40-42`) appear in the trace. **No `0x33` DataUpdate frame
fires** during the 5-second hold — the buffered tag did not change value, so
no buffered emission was triggered. The `frida-events.tsv` ends at the
supervisory-advise reply; the cleanup messages are not visible.
**R2 verdict.** No multi-sample evidence in this capture: no buffered
DataUpdate was emitted, so there are no bytes that could contradict the
single-sample interpretation. **Inconclusive in isolation but consistent with
single-sample.**
## 080 — Buffered + external write
**Scenario.** Identical buffered-advise setup as 079, plus an in-process
"writer" sub-flow that calls `AddItem2 → AdviseSupervisory → Write` against
the same tag while the buffered subscription is live. Two values are written
sequentially (126, 127) at 1.8s spacing.
**Wire activity.** Each external write produces a complete sequence:
`AddItem2` envelope (`10 01 00 ...`), supervisory advise reply, write
envelope (`37 01 00 ...` for Write.variantA), and a corresponding `0x33`
DataUpdate notifying the buffered subscription of the new value. Specifically
`frida-events.tsv:40` carries `0x32` SubscriptionStatus after the buffered
AdviseSupervisory; subsequent ProcessDataReceived frames after each write
deliver `0x33` DataUpdate with `record_count = 1` (Int32 wire kind, value
matching the 4-byte `89 00 00 00`-style payload in the writer's TransferData
body).
**R2 verdict.** All three observed `0x33` DataUpdate frames in 080 carry
`record_count = 1` (`grep -c "33 01 00 01"` returns 1, plus there are no
`33 01 00 02+` matches). Consistent with single-sample. **Verdict:
single-sample (consistent with R2 framing).**
## 081 — Plain write to advised tag (post-buffered baseline)
**Scenario.** Plain `--scenario=write` exercising
`Register → AddItem(TestChildObject.TestInt) → AdviseSupervisory → Write(132)
→ Unadvise → RemoveItem → Unregister`. No buffered surface. Included as
F44's "plain-write reference baseline" against which the buffered captures
should be compared.
**Wire activity.** `frida-events.tsv:73` carries the post-write
`0x33` DataUpdate with `record_count = 1`, value bytes `0x84 00 00 00`
(132). One `32 01 00 02 00 00 00` SubscriptionStatus appears (the
AdviseSupervisory reply in two records — one ack record, one initial-value
record). One `33 01 00 01 00 00 00` DataUpdate fires after the write. No
multi-sample DataUpdate.
**R2 verdict.** Plain (non-buffered) advise produces single-sample DataUpdate.
Consistent with the documented LMX shape. **Verdict: single-sample.**
## 082 — Buffered + plain (non-supervisory) advise
**Scenario.** Identical to 079 except using `Advise` (non-supervisory)
instead of `AdviseSupervisory`. 8-second hold, no external write.
**Wire activity.** Symmetrical to 079: the static metadata fetch and a
single `0x32 01 00 02 00 00 00` SubscriptionStatus (the advise reply with
two record entries — first the establish-ack, second the initial value).
No `0x33` DataUpdate fires (no value change during the hold).
**R2 verdict.** Inconclusive in isolation; consistent with single-sample.
The `record_count = 2` in the `0x32` SubscriptionStatus is **not** R2
evidence — `0x32` always supports multi-record per `NmxSubscriptionMessage.cs:101`,
and the codec already loops over `recordCount`. R2 is specifically about
`0x33` DataUpdate.
## 094 — Buffered + separate-session writer **(R2 contradiction)**
**Scenario.** Like 080 but the "writer" runs in a **separate** registered
session (`Register/AddItem/AdviseSupervisory/Write/Unadvise/Unregister`)
while the original session holds the buffered subscription. Two values are
written (136, 137) at 3s spacing.
**Wire activity.** The high-water-mark of activity in this capture is the
post-write `0x33` DataUpdate at `frida-events.tsv:145` (`2026-04-25T21:40:34.222Z`,
~120ms after `Write.variantA` of value 137 from the second writer session).
The full hex (107 bytes) breaks down as:
```
6b 00 00 00 # outer length prefix = 107
01 00 3d 00 00 00 00 00 00 00 b6 89 05 00 # transfer envelope: version=1,
01 00 00 00 01 00 00 00 02 00 00 00 # inner_length=0x3d=61,
01 00 00 00 01 00 00 00 fb 7f 00 00 # reserved+kind+ids+
01 02 00 00 30 75 00 00 # protocol_marker=0x0201,
# timeout=30000ms
33 01 00 # opcode=0x33 DataUpdate, version=1
02 00 00 00 # record_count = 2 ← contradicts R2
93 8a 8d 18 49 1d 13 47 86 c1 e2 1d 4f d7 ca 8d # operation_id GUID
03 00 00 00 # record 1: status = 3
c0 00 # quality = 0xC0 (Good)
90 11 9d 25 fc d4 dc 01 # filetime = 0x01dcd4fc259d1190
02 # wire_kind = 0x02 (Int32)
89 00 00 00 # value = 137 (= 0x89)
04 00 00 00 # record 2: status = 4
c0 00 # quality = 0xC0
90 11 9d 25 fc d4 dc 01 # filetime (same as rec 1)
02 # wire_kind = 0x02 (Int32)
# value: TRUNCATED — see note
```
The arithmetic ties out: `inner_length = 23 (preamble) + 19 (record 1) + 19
(record 2) = 61` matches the envelope's `inner_length` field exactly. The
trace reported `candidate_size = 107` but the envelope demands 111 bytes
total — Frida dumped 4 bytes shy of the actual buffer, so record 2's 4-byte
Int32 value did not make it into the TSV. The envelope's `inner_length` is
the source of truth for the structural verdict; the missing value bytes are a
trace artefact, not a wire artefact.
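
The length bookkeeping above can be reproduced mechanically. The following is a minimal sketch, assuming the field widths read off the hex dump (a 4-byte outer prefix, a 46-byte envelope header spanning the four envelope rows, and Int32-valued records); the constant and function names are illustrative and do not come from the codec.

```rust
// Field widths taken from the capture 094 dump. All names here are
// hypothetical helpers for the arithmetic, not codec identifiers.
const OUTER_PREFIX: usize = 4; // 6b 00 00 00
const ENVELOPE_HEADER: usize = 14 + 12 + 12 + 8; // four envelope rows of the dump
const PREAMBLE: usize = 3 + 4 + 16; // opcode+version, record_count, operation_id
const RECORD: usize = 4 + 2 + 8 + 1 + 4; // status, quality, filetime, wire_kind, Int32 value

/// Inner body length implied by the record layout.
pub const fn inner_length(record_count: usize) -> usize {
    PREAMBLE + record_count * RECORD
}

/// Total frame bytes for a DataUpdate carrying `record_count` Int32 records.
pub const fn frame_length(record_count: usize) -> usize {
    OUTER_PREFIX + ENVELOPE_HEADER + inner_length(record_count)
}

/// Bytes absent from a dump of `dumped` bytes, if the dump is short.
pub const fn missing_bytes(record_count: usize, dumped: usize) -> usize {
    frame_length(record_count).saturating_sub(dumped)
}
```

With `record_count = 2` and a 107-byte dump, the shortfall works out to exactly one Int32 value, which is why only record 2's value is absent.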
**R2 verdict — CONTRADICTED.** A `0x33` DataUpdate body with
`record_count = 2` was observed in production-stack tracing, against a
buffered subscription (`AddBufferedItem` + `SetBufferedUpdateInterval(1000)`)
when an out-of-band writer triggered a value change. The .NET reference's
`NmxSubscriptionMessage.ParseDataUpdate` would hard-throw
`ArgumentException("...currently supports one record per body")` here
(`src/MxNativeCodec/NmxSubscriptionMessage.cs:71-74`).
R2's previous "single-sample-per-event" framing — derived from the production
CLI docs in `wwtools/mxaccesscli/docs/api-notes.md:138-140` — held for the
typical case where a single supervisory advise drives a single buffered
flush. **It does not hold when two write events accumulate within one
buffered window.** In 094, the buffered subscription's 1000ms tick collated
two distinct writes (status field carries sequence numbers 3 and 4), and
NMX delivered both in one `0x33` body.
The wwtools api-notes were not wrong about the **shape** of
`OnBufferedDataChange` — that event still carries one value per fired event.
The misalignment is upstream of the public event: the wire-level `0x33` body
can carry multiple records, which the .NET reference's hard-throw masked.
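
Both records in the 094 frame carry the same FILETIME, which is consistent with one buffered flush delivering them together. As a cross-check on that field, here is a minimal sketch of decoding it to wall-clock time. It assumes the standard Windows FILETIME definition (100 ns ticks since 1601-01-01 UTC); the function names are hypothetical.

```rust
/// 100 ns ticks between 1601-01-01 and 1970-01-01 (FILETIME to Unix epoch offset).
const FILETIME_UNIX_OFFSET: u64 = 116_444_736_000_000_000;

/// Split a FILETIME into whole Unix seconds and leftover milliseconds.
/// Returns `None` for values that predate the Unix epoch.
pub fn filetime_to_unix(filetime: u64) -> Option<(u64, u32)> {
    let ticks = filetime.checked_sub(FILETIME_UNIX_OFFSET)?;
    let secs = ticks / 10_000_000;
    let millis = ((ticks % 10_000_000) / 10_000) as u32;
    Some((secs, millis))
}

/// UTC time of day for a Unix timestamp, as (hour, minute, second).
pub fn time_of_day(unix_secs: u64) -> (u64, u64, u64) {
    let s = unix_secs % 86_400;
    (s / 3_600, (s % 3_600) / 60, s % 60)
}
```

Applied to `0x01dcd4fc259d1190`, this gives Unix second 1777153234 plus 217 ms, that is 21:40:34.217 UTC, about 5 ms before the `frida-events.tsv:145` row timestamp.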
## Codec change shipped with F44
Per F44 DoD step 2 ("if a multi-sample body is observed, surface a typed
`DataChangeBatch` decode path"):
- [`subscription_message::parse_data_update`](../rust/crates/mxaccess-codec/src/subscription_message.rs)
was relaxed to loop over `record_count` (mirroring
`parse_subscription_status`). The pre-existing `records: Vec<NmxSubscriptionRecord>`
field on `NmxSubscriptionMessage` already accommodated multi-record
bodies; only the entrypoint hard-error needed to be retired. `record_count
<= 0` is still rejected explicitly.
- The .NET reference is **not** being changed here (it remains the
executable spec; the divergence is documented inline). Per
`design/70-risks-and-open-questions.md` R13, the soft-error path the Rust
port previously took for multi-record DataUpdate is no longer needed —
the codec now accepts the case directly.
- Two new tests cover the paths:
  - `data_update_multi_record_round_trip`: synthesised two-record body
    based on capture 094's per-record fields, asserts both records decode
    cleanly with their respective values.
  - `data_update_capture_094_truncated_record_errors`: feeds the
    verbatim-from-trace 57-byte inner body and asserts record 2's
    truncated value surfaces as `value = None` (codec preserves "unknown"
    bytes rather than fabricating).
- Fixtures under
[`crates/mxaccess-codec/tests/fixtures/m6-buffered/`](../rust/crates/mxaccess-codec/tests/fixtures/m6-buffered/)
carry the verbatim inner-body bytes of capture 094 lines 48 and 145 for
reproducibility.
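
To make the relaxed behaviour concrete, the following is a minimal standalone sketch of a record loop over a `0x33` body. It is not the `mxaccess-codec` implementation: `SketchRecord`, `take`, and `parse_data_update_sketch` are hypothetical names, the layout is the one read off the capture 094 dump, and only Int32 (`wire_kind = 0x02`) values are handled.

```rust
/// One decoded `0x33` record. Names are illustrative, not the codec's.
#[derive(Debug, PartialEq)]
pub struct SketchRecord {
    pub status: u32,
    pub quality: u16,
    pub filetime: u64,
    pub wire_kind: u8,
    /// `None` when the value bytes are absent from the input.
    pub value: Option<i32>,
}

/// Copy `N` bytes at `*pos`, advancing the cursor only on success.
fn take<const N: usize>(buf: &[u8], pos: &mut usize) -> Option<[u8; N]> {
    let end = pos.checked_add(N)?;
    let mut out = [0u8; N];
    out.copy_from_slice(buf.get(*pos..end)?);
    *pos = end;
    Some(out)
}

/// Parse a DataUpdate inner body whose records all carry Int32 values.
pub fn parse_data_update_sketch(body: &[u8]) -> Result<Vec<SketchRecord>, String> {
    let mut pos = 0;
    let [opcode, _, _] = take::<3>(body, &mut pos).ok_or("short preamble")?;
    if opcode != 0x33 {
        return Err(format!("unexpected opcode {:#04x}", opcode));
    }
    let count = i32::from_le_bytes(take::<4>(body, &mut pos).ok_or("short record_count")?);
    if count <= 0 {
        return Err(format!("record_count {} rejected", count));
    }
    take::<16>(body, &mut pos).ok_or("short operation_id")?;

    let mut records = Vec::new();
    for _ in 0..count {
        let status = u32::from_le_bytes(take::<4>(body, &mut pos).ok_or("short status")?);
        let quality = u16::from_le_bytes(take::<2>(body, &mut pos).ok_or("short quality")?);
        let filetime = u64::from_le_bytes(take::<8>(body, &mut pos).ok_or("short filetime")?);
        let [wire_kind] = take::<1>(body, &mut pos).ok_or("short wire_kind")?;
        if wire_kind != 0x02 {
            return Err(format!("wire_kind {:#04x} not handled by this sketch", wire_kind));
        }
        // Missing value bytes become `None` instead of an invented number.
        let value = take::<4>(body, &mut pos).map(i32::from_le_bytes);
        records.push(SketchRecord { status, quality, filetime, wire_kind, value });
    }
    Ok(records)
}
```

Fed the 57-byte inner body from the capture 094 dump, this sketch yields two records: status 3 with `Some(137)` and status 4 with `None`. A `record_count` of zero or below is rejected, in line with the bullet above.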
## Sub-followup filed: F45
A residual gap remains at the LMX-proxy boundary: capture 077 did not
instrument `LmxProxy.dll!CLMXProxyServer.Suspend` / `.Activate`, so we cannot
say whether the production stack issues a dedicated ORPC opnum for these
operations or synthesises them client-side, as the compatibility server
does. The R5 trigger conditions
documented above ("subscription must exist") are derived from the
.NET-reference compatibility server, not from a captured wire frame. Filed
as F45 in `design/followups.md` to instrument those entrypoints in the next
capture wave.
## Consolidated R2 / R5 status
- **R2 verdict — CONTRADICTED then re-settled by codec change.** Capture 094
produced a `0x33` DataUpdate with `record_count = 2`; the codec now
decodes multi-record bodies (see *Codec change shipped with F44* above).
Future regressions are guarded by the new round-trip tests. Status moves
from "P3 likely-not-a-real-risk" to "settled per option (b) with codec
change landed under F44".
- **R5 trigger conditions — observed.** From capture 077: `Suspend`
succeeds (returning `MxStatus.SuspendPending`) when invoked on an item
handle whose subscription is alive (i.e. immediately following a
successful `Advise`/`AdviseSupervisory`). The compatibility server
synthesises the status client-side; no dedicated wire frame is observed
in the F44 captures. The remaining unknown — does `LmxProxy.dll` itself
issue a Suspend/Activate ORPC method? — is filed under F45 with a Frida
hook plan.
@@ -62,6 +62,10 @@ crypto-bigint = "0.5"
quick-xml = "0.36"
tokio-util = { version = "0.7", features = ["codec"] }
zeroize = { version = "1", features = ["zeroize_derive"] }
# F40 — optional `mxaccess` feature `metrics`. Pin to 0.24.x (current
# stable line). The dep is only pulled in when the consumer enables
# `mxaccess/metrics`; the default build resolves without it.
metrics = "0.24"
[workspace.lints.rust]
unsafe_op_in_unsafe_fn = "warn"
@@ -0,0 +1,215 @@
//! Round-trip parity: buffered-subscribe `RegisterReference` (opcode `0x10`)
//! body, captured live with Frida.
//!
//! Closes the F36 DoD bullet 6 (`design/followups.md`): "Round-trip fixture
//! loaded from `captures/079-frida-add-buffered-advise-testint/` validating
//! the wire-byte sequence (call → response)."
//!
//! The .NET reference's [`MxNativeSession.RegisterBufferedItemAsync`]
//! (`MxNativeSession.cs:272-310`) builds a single `RegisterReference` frame
//! with `item_definition` suffixed by `.property(buffer)` and
//! `subscribe = true`. The Rust counterpart is
//! [`mxaccess::Session::subscribe_buffered`], which composes
//! [`mxaccess_codec::NmxReferenceRegistrationMessage::to_buffered_item_definition`]
//! with [`mxaccess_codec::NmxReferenceRegistrationMessage::encode`].
//!
//! Both fixtures below are the **inner LMX `RegisterReference` body** copied
//! verbatim from the corresponding capture's
//! `frida-events.tsv` (the `nmx.enter ... CNmxAdapter.PutRequest` row whose
//! candidate body starts with `10 01 00 ...`):
//!
//! - `082-frida-add-buffered-plain-advise-testint`: 173-byte body for
//! `(itemDefinition = "TestInt", itemContext = "TestChildObject")` with
//! correlation id `fb df 86 dc 1f c4 34 4b bb 26 a9 97 35 e9 b7 57`.
//! - `079-frida-add-buffered-advise-testint`: 173-byte body for the same
//! `(itemDefinition, itemContext)` pair with correlation id
//! `32 c3 d9 6d ed 72 f1 48 84 85 37 0c 66 bc f8 92`. (Capture 079 is the
//! `add-buffered-advise` scenario, which exercises the same wire frame
//! under a slightly different harness mode — both captures land on the
//! same `RegisterReference` shape.)
#![allow(
clippy::unwrap_used,
clippy::expect_used,
clippy::indexing_slicing,
clippy::panic
)]
use mxaccess_codec::NmxReferenceRegistrationMessage;
/// Decode a space-separated hex string into bytes. Mirrors
/// `Convert.FromHexString` from the `.NET` test helper.
fn hex_to_bytes(s: &str) -> Vec<u8> {
s.split_whitespace()
.map(|tok| u8::from_str_radix(tok, 16).expect("malformed hex token in fixture"))
.collect()
}
/// Captured `RegisterReference` (0x10) body from
/// `captures/082-frida-add-buffered-plain-advise-testint/frida-events.tsv`,
/// line 45 (`nmx.enter ... CNmxAdapter.PutRequest`, candidate size 173).
const CAPTURE_082_BODY_HEX: &str = "\
10 01 00 \
01 00 00 00 \
fb df 86 dc 1f c4 34 4b bb 26 a9 97 35 e9 b7 57 \
ff ff \
00 00 \
01 00 00 00 \
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 \
32 00 00 81 \
54 00 65 00 73 00 74 00 49 00 6e 00 74 00 \
2e 00 70 00 72 00 6f 00 70 00 65 00 72 00 74 00 79 00 \
28 00 62 00 75 00 66 00 66 00 65 00 72 00 29 00 \
00 00 \
00 00 00 00 00 00 00 00 \
20 00 00 00 \
54 00 65 00 73 00 74 00 43 00 68 00 69 00 6c 00 64 00 \
4f 00 62 00 6a 00 65 00 63 00 74 00 \
00 00 \
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 \
01";
/// Captured `RegisterReference` (0x10) body from
/// `captures/079-frida-add-buffered-advise-testint/frida-events.tsv`,
/// line 45 (`nmx.enter ... CNmxAdapter.PutRequest`, candidate size 173).
/// Differs from `CAPTURE_082_BODY_HEX` only in the 16-byte correlation id —
/// the rest of the wire shape is identical because both captures exercise
/// the same `(itemDefinition="TestInt", itemContext="TestChildObject")` pair.
const CAPTURE_079_BODY_HEX: &str = "\
10 01 00 \
01 00 00 00 \
32 c3 d9 6d ed 72 f1 48 84 85 37 0c 66 bc f8 92 \
ff ff \
00 00 \
01 00 00 00 \
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 \
32 00 00 81 \
54 00 65 00 73 00 74 00 49 00 6e 00 74 00 \
2e 00 70 00 72 00 6f 00 70 00 65 00 72 00 74 00 79 00 \
28 00 62 00 75 00 66 00 66 00 65 00 72 00 29 00 \
00 00 \
00 00 00 00 00 00 00 00 \
20 00 00 00 \
54 00 65 00 73 00 74 00 43 00 68 00 69 00 6c 00 64 00 \
4f 00 62 00 6a 00 65 00 63 00 74 00 \
00 00 \
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 \
01";
/// Helper — assemble a `NmxReferenceRegistrationMessage` matching the
/// captured fixture and assert it encodes to the same bytes the .NET
/// reference + LMX server emit on the wire. Mirrors the .NET reference's
/// `MxNativeSession.RegisterBufferedItemAsync` request build:
///
/// ```csharp
/// var message = new NmxReferenceRegistrationMessage(
/// itemHandle,
/// subscription.CorrelationId,
/// NmxReferenceRegistrationMessage.ToBufferedItemDefinition(itemDefinition),
/// itemContext,
/// Subscribe: true);
/// ```
fn assert_roundtrip(captured_hex: &str, correlation_id: [u8; 16]) {
let captured = hex_to_bytes(captured_hex);
// Parse the captured bytes — should succeed cleanly.
let parsed = NmxReferenceRegistrationMessage::parse(&captured)
.expect("parse captured RegisterReference body");
// Sanity-check the high-level fields the F36 implementation depends on.
assert_eq!(
parsed.item_handle, 1,
"captured item_handle (LMXProxy harness uses sequential int handles starting at 1)"
);
assert_eq!(parsed.item_correlation_id, correlation_id);
assert_eq!(
parsed.item_definition, "TestInt.property(buffer)",
"buffered suffix preserved"
);
assert!(
parsed.subscribe,
"subscribe flag — buffered RegisterReference always sets it (.NET MxNativeSession.cs:298)"
);
// Re-encode and confirm byte-identical.
let re_encoded = parsed.encode();
assert_eq!(
re_encoded, captured,
"RegisterReference body must round-trip byte-identical"
);
// Also confirm the suffix helper is idempotent on an already-buffered name
// — the .NET reference does the same case-insensitive guard at
// `NmxReferenceRegistrationMessage.cs:96-102`.
let resuffixed =
NmxReferenceRegistrationMessage::to_buffered_item_definition(&parsed.item_definition)
.expect("re-applying buffered suffix");
assert_eq!(resuffixed, parsed.item_definition);
}
#[test]
fn capture_082_register_reference_round_trips() {
assert_roundtrip(
CAPTURE_082_BODY_HEX,
[
0xfb, 0xdf, 0x86, 0xdc, 0x1f, 0xc4, 0x34, 0x4b, 0xbb, 0x26, 0xa9, 0x97, 0x35, 0xe9,
0xb7, 0x57,
],
);
}
#[test]
fn capture_079_register_reference_round_trips() {
assert_roundtrip(
CAPTURE_079_BODY_HEX,
[
0x32, 0xc3, 0xd9, 0x6d, 0xed, 0x72, 0xf1, 0x48, 0x84, 0x85, 0x37, 0x0c, 0x66, 0xbc,
0xf8, 0x92,
],
);
}
#[test]
fn buffered_suffix_helper_matches_captured_definition() {
// F36 DoD bullet 1 verification: the codec helper that the Rust
// `Session::subscribe_buffered` calls must produce the exact suffix
// the captured wire bytes carry.
let suffixed = NmxReferenceRegistrationMessage::to_buffered_item_definition("TestInt").unwrap();
assert_eq!(suffixed, "TestInt.property(buffer)");
}
#[test]
fn buffered_register_reference_constructed_from_session_inputs_matches_capture_082() {
// Forward-build the message from the inputs `Session::subscribe_buffered`
// gathers (correlation id, already-suffixed item definition,
// subscribe=true), substituting the capture's
// `item_context = "TestChildObject"` (from the .NET probe harness) for
// the empty context the Rust simple form sends, and assert the encoded
// body matches capture 082.
//
// The Rust simple-form `subscribe_buffered(reference, ...)` passes
// the FULL reference as `item_definition` with empty `item_context`;
// capture 082 came from the LMXProxy compatibility surface which
// splits the reference into `(itemDefinition="TestInt", itemContext="TestChildObject")`.
// Both forms are valid on the wire — this test exercises the
// split-context form to confirm the Rust codec produces the identical
// bytes the live LMX server saw.
let captured = hex_to_bytes(CAPTURE_082_BODY_HEX);
let item_definition =
NmxReferenceRegistrationMessage::to_buffered_item_definition("TestInt").unwrap();
let msg = NmxReferenceRegistrationMessage {
item_handle: 1,
item_correlation_id: [
0xfb, 0xdf, 0x86, 0xdc, 0x1f, 0xc4, 0x34, 0x4b, 0xbb, 0x26, 0xa9, 0x97, 0x35, 0xe9,
0xb7, 0x57,
],
item_definition,
item_context: "TestChildObject".to_string(),
subscribe: true,
reserved_25_27: [0; 2],
reserved_31_55: [0; 24],
};
let encoded = msg.encode();
assert_eq!(encoded, captured);
}
@@ -22,6 +22,10 @@ tracing = { workspace = true }
futures-util = { workspace = true }
tokio-stream = { version = "0.1", features = ["sync"] }
rand = "0.8"
# F40 — optional `metrics` feature. Default build does NOT depend on
# this crate; enable via `--features metrics` to wire counters and
# histograms into a downstream `metrics::Recorder`.
metrics = { workspace = true, optional = true }
[dev-dependencies]
async-trait = { workspace = true }
@@ -31,7 +35,10 @@ default = []
# Transport feature gates land in M2-M5.
nmx = []
asb = []
metrics = []
# F40 — wire counters / histograms / gauges via the `metrics` crate.
# Default build is allocation-neutral: no `metrics` dep, no runtime cost.
# See `src/metrics.rs` for the emitted metric inventory.
metrics = ["dep:metrics"]
serde = ["mxaccess-codec/serde"]
# `live` gates integration tests that hit a running AVEVA install. Driven by
# the `MX_LIVE` env var via `tools/Setup-LiveProbeEnv.ps1`.
@@ -1,64 +1,170 @@
//! `subscribe-buffered` — buffered subscription demonstration (M6 placeholder).
//! `subscribe-buffered` — open a buffered subscription with a 1-second cadence.
//!
//! Per `wwtools/mxaccesscli/docs/api-notes.md:138-140`, "buffered" is a
//! delivery-cadence knob (`SetBufferedUpdateInterval`), **not** multi-sample
//! payload bundling. Each event still carries one sample; the cadence
//! controls how often the server flushes accumulated updates.
//! Per `wwtools/mxaccesscli/docs/api-notes.md:97-100,138-140,154-157` (R2 in
//! `design/70-risks-and-open-questions.md`), the `update_interval_ms` knob
//! controls the **delivery cadence** — each emitted event still carries one
//! sample, **not** a multi-sample payload. The returned [`mxaccess::Subscription`]
//! is the same `Stream<Item = Result<DataChange, Error>>` as plain
//! [`mxaccess::Session::subscribe`].
//!
//! `Session::subscribe_buffered` is currently `Err(Error::Unsupported)`
//! pending the M6 buffered-mode RPC port. Once the surface lands the
//! demo body below becomes ~10 lines using the same `Stream` interface
//! as `subscribe.rs`.
//! Drains up to 5 updates (or a 30s timeout, whichever first), prints each,
//! then unsubscribes cleanly. Mirrors the `subscribe.rs` shape — see that
//! example for the env-var contract and resolver shim design notes.
use mxaccess::{BufferedOptions, ConnectionOptions, Session};
use std::sync::Arc;
use std::time::Duration;
use futures_util::StreamExt;
use mxaccess::{
BufferedOptions, GalaxyTagMetadata, RecoveryPolicy, Resolver, ResolverError, Session,
SessionOptions,
};
use mxaccess_rpc::guid::Guid;
use mxaccess_rpc::ntlm::NtlmClientContext;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
if std::env::var_os("MX_LIVE").is_none() {
let Some(env) = LiveEnv::from_process()? else {
eprintln!(
"MX_LIVE not set — `subscribe-buffered` is the M6 placeholder; \
run `. tools/Setup-LiveProbeEnv.ps1` to populate live env vars \
once the buffered subscribe surface lands."
"MX_LIVE not set — skipping live demo. Run \
`. tools/Setup-LiveProbeEnv.ps1` to populate the required env vars."
);
return Ok(());
}
// The constructor itself is currently Unsupported (gated on M3 transport
// selection wiring). When it lands, swap to `Session::connect_nmx` like
// the other examples.
let session = match Session::connect(ConnectionOptions).await {
Ok(s) => s,
Err(mxaccess::Error::Unsupported {
operation,
transport,
}) => {
eprintln!(
"Session::connect / subscribe_buffered are deferred to M6: \
{operation} on {transport:?} transport. See \
design/followups.md for the buffered-subscribe gating note."
);
return Ok(());
}
Err(e) => return Err(e.into()),
};
let session = Session::connect_nmx(
env.addr,
SessionOptions::default(),
NtlmClientContext::from_env()?,
env.service_ipid,
Arc::new(StaticResolver::new(&env.tag)),
RecoveryPolicy::default(),
)
.await?;
let opts = BufferedOptions {
update_interval_ms: 250,
update_interval_ms: 1_000,
};
match session
.subscribe_buffered("TestChildObject.TestInt", opts)
.await
{
Ok(_) => {
eprintln!(
"subscribe_buffered returned a subscription unexpectedly — \
check whether M6 has landed and update this example."
);
eprintln!(
"buffered-subscribing to {} (requested cadence {} ms, rounded to {} ms)",
env.tag,
opts.update_interval_ms,
opts.rounded_update_interval_ms()
);
let mut sub = session.subscribe_buffered(&env.tag, opts).await?;
eprintln!("correlation_id = {:02x?}", sub.correlation_id());
let mut received = 0;
while received < 5 {
match tokio::time::timeout(Duration::from_secs(30), sub.next()).await {
Ok(Some(Ok(dc))) => {
println!("{} = {:?} ts={:?}", dc.reference, dc.value, dc.timestamp);
received += 1;
}
Ok(Some(Err(e))) => {
eprintln!("subscription error: {e}");
break;
}
Ok(None) => {
eprintln!("subscription stream ended");
break;
}
Err(_) => {
eprintln!("no update within 30s; exiting after {received} updates");
break;
}
}
Err(mxaccess::Error::Unsupported { operation, .. }) => {
eprintln!("{operation}: deferred to M6 (see design/followups.md)");
}
Err(e) => return Err(e.into()),
}
session.unsubscribe(sub).await?;
session.shutdown_nmx().await?;
Ok(())
}
// ---- shared boilerplate (see subscribe.rs / connect-write-read.rs for rationale) ----
struct LiveEnv {
addr: std::net::SocketAddr,
service_ipid: Guid,
tag: String,
}
impl LiveEnv {
fn from_process() -> Result<Option<Self>, Box<dyn std::error::Error>> {
if std::env::var_os("MX_LIVE").is_none() {
return Ok(None);
}
let host = std::env::var("MX_NMX_HOST")?;
let addr = parse_host_port(&host, 135)?;
let service_ipid = Guid::parse_str(&std::env::var("MX_NMX_SERVICE_IPID")?)?;
let tag = std::env::var("MX_TEST_TAG").unwrap_or_else(|_| "TestChildObject.TestInt".into());
Ok(Some(Self {
addr,
service_ipid,
tag,
}))
}
}
fn parse_host_port(
s: &str,
default_port: u16,
) -> Result<std::net::SocketAddr, Box<dyn std::error::Error>> {
if let Ok(addr) = s.parse() {
return Ok(addr);
}
let with_port = if s.contains(':') {
s.to_string()
} else {
format!("{s}:{default_port}")
};
Ok(
std::net::ToSocketAddrs::to_socket_addrs(&with_port.as_str())?
.next()
.ok_or("no addrs resolved")?,
)
}
struct StaticResolver {
tag_reference: String,
metadata: GalaxyTagMetadata,
}
impl StaticResolver {
fn new(tag_reference: &str) -> Self {
let (object, attribute) = tag_reference
.split_once('.')
.unwrap_or((tag_reference, "TestInt"));
Self {
tag_reference: tag_reference.to_string(),
metadata: GalaxyTagMetadata {
object_tag_name: object.to_string(),
attribute_name: attribute.to_string(),
primitive_name: None,
platform_id: 1,
engine_id: 2,
object_id: 3,
primitive_id: 0,
attribute_id: 7,
property_id: GalaxyTagMetadata::VALUE_PROPERTY_ID,
mx_data_type: 2,
is_array: false,
security_classification: 0,
attribute_source: "dynamic".into(),
},
}
}
}
#[async_trait::async_trait]
impl Resolver for StaticResolver {
async fn resolve(&self, tag: &str) -> Result<GalaxyTagMetadata, ResolverError> {
if tag == self.tag_reference {
Ok(self.metadata.clone())
} else {
Err(ResolverError::NotFound {
tag_reference: tag.to_string(),
})
}
}
}
@@ -64,6 +64,7 @@ use tokio::sync::{mpsc, Mutex};
use tokio::task::JoinHandle;
use tokio_stream::wrappers::ReceiverStream;
use crate::metrics as session_metrics;
use crate::transport_asb::AsbTransport;
use crate::{BufferedOptions, ConnectionError, Error, TransportKind};
@@ -184,7 +185,10 @@ impl AsbSession {
pub async fn read(&self, items: &[ItemIdentity]) -> Result<ReadResponse, Error> {
let mut transport = self.inner.transport.lock().await;
let client = transport.client_mut();
client.read(items).await.map_err(map_client_error)
let resp = client.read(items).await.map_err(map_client_error)?;
// F40 — count successful ASB reads.
session_metrics::record_asb_read();
Ok(resp)
}
/// `Write` — set the value of each item. `items.len()` should
@@ -198,10 +202,13 @@ impl AsbSession {
) -> Result<WriteResponse, Error> {
let mut transport = self.inner.transport.lock().await;
let client = transport.client_mut();
client
let resp = client
.write(items, values, write_handle)
.await
.map_err(map_client_error)
.map_err(map_client_error)?;
// F40 — count successful ASB writes.
session_metrics::record_asb_write();
Ok(resp)
}
/// `KeepAlive` — one-way heartbeat to keep the channel alive
@@ -30,6 +30,7 @@ pub use mxaccess_codec::{
// ---- Public types --------------------------------------------------------
pub mod asb_session;
pub(crate) mod metrics;
pub mod session;
pub mod transport_asb;
@@ -67,14 +68,60 @@ pub struct DataChange {
/// Buffered subscription delivery — single-sample-per-event with a configurable
/// flush cadence. **Not** multi-sample payload bundles per
/// `wwtools/mxaccesscli/docs/api-notes.md:138-140` (R2 verified).
///
/// Retained as a marker type for back-compat; the wire path returns the
/// same [`Subscription`] handle as plain [`Session::subscribe`] because
/// the buffered cadence is a server-side delivery rate knob, not a
/// payload-shape change. Each event still carries one
/// [`DataChange`] sample.
#[derive(Debug, Clone)]
pub struct BufferedSubscription;
/// Configuration for [`Session::subscribe_buffered`].
///
/// The cadence is a server-side delivery rate knob (single-sample per
/// event), **not** a multi-sample payload bundle. Verified at
/// `wwtools/mxaccesscli/docs/api-notes.md:97-100,138-140,154-157`
/// (R2 in `design/70-risks-and-open-questions.md`).
#[derive(Debug, Clone, Copy)]
pub struct BufferedOptions {
/// Requested delivery interval in milliseconds. The wire layer
/// rounds this **up** to the nearest 100 ms, the same rounding the
/// .NET reference applies in
/// `MxNativeCompatibilityServer.SetBufferedUpdateInterval:638`
/// (`((updateInterval + 99) / 100) * 100`). Values from 1 to 99 round
/// up to 100; a zero request is rejected by
/// [`Session::subscribe_buffered`] (the field is unsigned, so negative
/// requests cannot be expressed).
pub update_interval_ms: u32,
}
impl BufferedOptions {
/// Round `update_interval_ms` **up** to the nearest 100 ms, mirroring
/// `MxNativeCompatibilityServer.SetBufferedUpdateInterval`
/// (`MxNativeCompatibilityServer.cs:638`):
///
/// ```text
/// _bufferedUpdateIntervals[serverHandle] = ((updateInterval + 99) / 100) * 100;
/// ```
///
/// Saturates rather than overflowing — `u32::MAX` rounds up to
/// `u32::MAX` (the live LMX update intervals are far below this
/// limit so the saturating branch is unreachable in practice; the
/// helper is exposed for unit testing).
#[must_use]
pub const fn rounded_update_interval_ms(self) -> u32 {
let v = self.update_interval_ms;
// Saturating equivalent of `((v + 99) / 100) * 100`. The .NET
// version uses `int` arithmetic; `u32` covers a wider positive range
// than the .NET `int`, and the helper saturates explicitly on overflow
// to stay total.
match v.checked_add(99) {
Some(plus) => (plus / 100) * 100,
None => u32::MAX,
}
}
}
#[derive(Debug, Clone)]
pub struct SecurityContext {
pub current_user_id: i32,
@@ -514,20 +561,52 @@ impl Session {
})
}
/// Buffered subscription with a delivery-cadence knob. Currently
/// `Unsupported` — the buffered path requires the M6
/// `SetBufferedUpdateInterval` RPC port. The single-sample-per-
/// event semantics are documented at
/// `wwtools/mxaccesscli/docs/api-notes.md:138-140`.
/// Buffered subscription with a delivery-cadence knob.
///
/// Returns a [`Subscription`] yielding `Result<DataChange, Error>`,
/// the **same item type** as plain [`Self::subscribe`]. Buffered is a
/// **single-sample cadence knob, not a multi-sample payload**:
/// `update_interval_ms` controls how often the AVEVA platform
/// flushes the latest value, and each emitted event carries one
/// sample, per
/// `wwtools/mxaccesscli/docs/api-notes.md:97-100,138-140,154-157`
/// (R2 in `design/70-risks-and-open-questions.md`). At the wire
/// level a single `0x33` DataUpdate body can still carry more than
/// one record (capture 094, see `docs/M6-buffered-evidence.md`).
///
/// ## Wire encoding
///
/// Mirrors `MxNativeSession.RegisterBufferedItemAsync`
/// (`MxNativeSession.cs:272-310`): suffixes the item definition
/// with `.property(buffer)` (via
/// [`mxaccess_codec::NmxReferenceRegistrationMessage::to_buffered_item_definition`])
/// and dispatches a single LMX `RegisterReference` (opcode `0x10`)
/// with `subscribe = true`. No `AdviseSupervisory` follow-up is
/// emitted — the server treats the subscribe-flagged
/// `RegisterReference` as a supervisory advise (verified against
/// `captures/082-frida-add-buffered-plain-advise-testint`).
///
/// `update_interval_ms` is rounded **up** to the nearest 100 ms, the
/// same rounding the .NET reference applies in
/// `MxNativeCompatibilityServer.SetBufferedUpdateInterval:638`. The
/// rounded value is held client-side: native MXAccess does not emit
/// a separate `SetBufferedUpdateInterval` RPC (verified by absence
/// in the `079`/`082` captures — `mx.set-buffered-interval.begin/end`
/// produces no NMX traffic).
///
/// # Errors
/// - [`Error::Connection`] if the session is shut down.
/// - [`Error::Configuration`] when `update_interval_ms == 0`
/// (matches the .NET reference's
/// `ArgumentOutOfRangeException` at
/// `MxNativeCompatibilityServer.cs:630-633`), when the resolver
/// rejects `reference`, or when the LMX server returns a
/// non-zero HRESULT for the `RegisterReference` round-trip.
/// - [`Error::Io`] / transport errors from the underlying RPC.
pub async fn subscribe_buffered(
&self,
_reference: &str,
_options: BufferedOptions,
reference: &str,
options: BufferedOptions,
) -> Result<Subscription, Error> {
Err(Error::Unsupported {
operation: Cow::Borrowed("Session::subscribe_buffered (M6)"),
transport: TransportKind::Nmx,
})
self.subscribe_buffered_nmx(reference, options).await
}
/// Orderly shutdown with a wall-clock bound. Wraps
@@ -616,6 +695,112 @@ fn mxvalue_to_writevalue(value: MxValue) -> Result<mxaccess_nmx::WriteValue, Err
mod tests {
use super::*;
// ---- BufferedOptions ------------------------------------------------
#[test]
fn buffered_options_rounds_below_100_up_to_100() {
// ((1 + 99) / 100) * 100 == 100 — same arithmetic as
// MxNativeCompatibilityServer.cs:638.
assert_eq!(
BufferedOptions {
update_interval_ms: 1,
}
.rounded_update_interval_ms(),
100
);
assert_eq!(
BufferedOptions {
update_interval_ms: 99,
}
.rounded_update_interval_ms(),
100
);
assert_eq!(
BufferedOptions {
update_interval_ms: 100,
}
.rounded_update_interval_ms(),
100
);
}
#[test]
fn buffered_options_rounds_partial_up() {
// 101 → 200, 250 → 300, 999 → 1000.
assert_eq!(
BufferedOptions {
update_interval_ms: 101,
}
.rounded_update_interval_ms(),
200
);
assert_eq!(
BufferedOptions {
update_interval_ms: 250,
}
.rounded_update_interval_ms(),
300
);
assert_eq!(
BufferedOptions {
update_interval_ms: 999,
}
.rounded_update_interval_ms(),
1_000
);
}
#[test]
fn buffered_options_preserves_exact_multiples_of_100() {
for n in [200, 1_000, 5_000, 60_000] {
assert_eq!(
BufferedOptions {
update_interval_ms: n,
}
.rounded_update_interval_ms(),
n,
"exact multiple of 100 should round to itself: {n}"
);
}
}
#[test]
fn buffered_options_rounds_zero_to_zero() {
// Zero is the explicit "rejected" value at
// MxNativeCompatibilityServer.cs:630-633; the helper itself is
// total (`Session::subscribe_buffered` does the validation), so
// zero rounds to zero rather than panicking.
assert_eq!(
BufferedOptions {
update_interval_ms: 0,
}
.rounded_update_interval_ms(),
0
);
}
#[test]
fn buffered_options_saturates_at_u32_max() {
// u32::MAX + 99 would overflow; the helper saturates instead.
let opts = BufferedOptions {
update_interval_ms: u32::MAX,
};
assert_eq!(opts.rounded_update_interval_ms(), u32::MAX);
}
/// Compile-time check that `Session::subscribe_buffered` returns the
/// same `Subscription` type as plain `subscribe` (F36 DoD bullet 1
/// plus bullet 3 — the API discoverably documents single-sample
/// semantics by NOT introducing a separate `BufferedSubscription`-shaped
/// return).
#[allow(dead_code)]
fn _subscribe_buffered_returns_subscription(
s: &Session,
opts: BufferedOptions,
) -> impl std::future::Future<Output = Result<Subscription, Error>> + '_ {
s.subscribe_buffered("ignored", opts)
}
// ---- RecoveryPolicy ------------------------------------------------
#[test]
@@ -0,0 +1,275 @@
//! F40 — optional metrics emission via the [`metrics`] crate.
//!
//! Behind the `metrics` Cargo feature this module wires session-level
//! counters / histograms / gauges into a downstream
//! [`metrics::Recorder`]. With the feature off (the default), every
//! function in this module compiles to an empty body — no `metrics`
//! dep, no per-call overhead, no allocations.
//!
//! ## Design
//!
//! - Call sites in [`crate::session`] and [`crate::asb_session`] invoke
//! the wrappers below **unconditionally**. The feature gate lives
//! inside each wrapper, not at the call site, so the instrumentation
//! stays out of the way of the protocol code.
//! - Names follow `mxaccess.<scope>.<noun>` (dotted, lowercase) to
//! match the convention recommended by the `metrics` crate's
//! [naming guide](https://docs.rs/metrics/0.24/metrics/#naming-conventions).
//! - All metric definitions are listed below so an operator wiring an
//! exporter knows what to expect.
//!
//! ## Emitted metrics
//!
//! ### Counters
//!
//! - `mxaccess.session.writes` — incremented after each successful
//! `Session::write_value` / `write_value_at` /
//! `write_value_secured_at`. Label: `transport=nmx`.
//! - `mxaccess.session.reads` — incremented when `Session::read`
//! returns the first DataChange. Label: `transport=nmx`.
//! - `mxaccess.session.advises` — incremented after each successful
//! `Session::subscribe` (one per `AdviseSupervisory` round-trip).
//! Label: `transport=nmx`.
//! - `mxaccess.session.unadvises` — incremented after each successful
//! `Session::unsubscribe`. Label: `transport=nmx`.
//! - `mxaccess.session.recovery_attempts` — incremented on each
//! `RecoveryEvent::Started` emission. Label: `transport=nmx`.
//! - `mxaccess.session.recovery_successes` — incremented on each
//! `RecoveryEvent::Recovered` emission. Label: `transport=nmx`.
//! - `mxaccess.asb.writes` / `.reads` / `.publishes` — ASB-side
//! counterparts (transport=`asb`). Currently `writes` and `reads`
//! are wired; `publishes` is reserved for the streaming subscribe
//! path (F40 follow-up).
//!
//! ### Histograms (seconds)
//!
//! - `mxaccess.session.write.latency_seconds` — wall time from
//! `Session::write*` entry to a successful return (the round-trip
//! that ends with the inner LMX `OperationComplete`).
//! - `mxaccess.session.read.latency_seconds` — wall time from
//! `Session::read` entry to first DataChange parsed.
//! - `mxaccess.session.subscribe.first_data_change_seconds` —
//! currently unused (reserved for a future first-DataChange
//! instrumentation in `Subscription::poll_next`); kept in the
//! spec for parity with the F40 design.
//!
//! ### Gauges
//!
//! - `mxaccess.session.connected` — `1` while a `Session` is
//! connected, `0` after `shutdown_nmx`. Operators can sum across
//! processes for a "live sessions" view.
//! - `mxaccess.session.registered_items` — current size of the
//! subscription registry (`SessionInner::subscriptions`); rises
//! on `subscribe`, falls on `unsubscribe`.
//! - `mxaccess.session.active_subscriptions` — alias of
//! `registered_items` for now (kept distinct so a future
//! "registered but unsubscribed" distinction can split them
//! without a metric rename).
//!
//! ## Wiring an exporter
//!
//! ```ignore
//! // In a downstream binary, behind the same feature flag:
//! # #[cfg(feature = "metrics")] {
//! // `install_recorder` installs the global recorder and returns a
//! // `PrometheusHandle` that renders snapshots on demand.
//! let handle = metrics_exporter_prometheus::PrometheusBuilder::new()
//!     .install_recorder()
//!     .unwrap();
//! // ... later, serve `handle.render()` from a `/metrics` endpoint.
//! # }
//! ```
//!
//! No exporter is provided by this crate; pick one from the
//! [`metrics-exporter-*`](https://crates.io/search?q=metrics-exporter)
//! ecosystem (Prometheus, StatsD, OpenTelemetry, …).
use std::time::Duration;
// Static label pairs reused at every call site. The `metrics` crate's
// `counter!`/`histogram!`/`gauge!` macros accept `(&'static str, &'static str)`
// pairs directly, but having the labels here keeps the call sites brief.
#[cfg(feature = "metrics")]
const TRANSPORT_NMX: (&str, &str) = ("transport", "nmx");
#[cfg(feature = "metrics")]
const TRANSPORT_ASB: (&str, &str) = ("transport", "asb");
// ---- Counters ------------------------------------------------------------
/// One successful NMX `Session::write*` round-trip.
pub(crate) fn record_write() {
#[cfg(feature = "metrics")]
metrics::counter!("mxaccess.session.writes", &[TRANSPORT_NMX]).increment(1);
}
/// One successful NMX `Session::read` (first DataChange returned).
pub(crate) fn record_read() {
#[cfg(feature = "metrics")]
metrics::counter!("mxaccess.session.reads", &[TRANSPORT_NMX]).increment(1);
}
/// One successful NMX `Session::subscribe` (`AdviseSupervisory` ack).
pub(crate) fn record_advise() {
#[cfg(feature = "metrics")]
metrics::counter!("mxaccess.session.advises", &[TRANSPORT_NMX]).increment(1);
}
/// One successful NMX `Session::unsubscribe` (`UnAdvise` ack).
pub(crate) fn record_unadvise() {
#[cfg(feature = "metrics")]
metrics::counter!("mxaccess.session.unadvises", &[TRANSPORT_NMX]).increment(1);
}
/// `RecoveryEvent::Started` emitted (one per recovery attempt).
pub(crate) fn record_recovery_attempt() {
#[cfg(feature = "metrics")]
metrics::counter!("mxaccess.session.recovery_attempts", &[TRANSPORT_NMX]).increment(1);
}
/// `RecoveryEvent::Recovered` emitted (rebuild + re-advise succeeded).
pub(crate) fn record_recovery_success() {
#[cfg(feature = "metrics")]
metrics::counter!("mxaccess.session.recovery_successes", &[TRANSPORT_NMX]).increment(1);
}
/// One successful ASB `AsbSession::write` round-trip.
pub(crate) fn record_asb_write() {
#[cfg(feature = "metrics")]
metrics::counter!("mxaccess.asb.writes", &[TRANSPORT_ASB]).increment(1);
}
/// One successful ASB `AsbSession::read` round-trip.
pub(crate) fn record_asb_read() {
#[cfg(feature = "metrics")]
metrics::counter!("mxaccess.asb.reads", &[TRANSPORT_ASB]).increment(1);
}
// ---- Histograms ----------------------------------------------------------
/// Wall-time of a successful NMX write.
pub(crate) fn record_write_latency(elapsed: Duration) {
#[cfg(feature = "metrics")]
metrics::histogram!("mxaccess.session.write.latency_seconds", &[TRANSPORT_NMX])
.record(elapsed.as_secs_f64());
#[cfg(not(feature = "metrics"))]
let _ = elapsed;
}
/// Wall-time of a successful NMX read (subscribe → first DataChange).
pub(crate) fn record_read_latency(elapsed: Duration) {
#[cfg(feature = "metrics")]
metrics::histogram!("mxaccess.session.read.latency_seconds", &[TRANSPORT_NMX])
.record(elapsed.as_secs_f64());
#[cfg(not(feature = "metrics"))]
let _ = elapsed;
}
// ---- Gauges --------------------------------------------------------------
/// Set the `connected` gauge to `1.0` when `connected` is `true`, `0.0` otherwise.
pub(crate) fn set_connected(connected: bool) {
#[cfg(feature = "metrics")]
metrics::gauge!("mxaccess.session.connected", &[TRANSPORT_NMX]).set(if connected {
1.0
} else {
0.0
});
#[cfg(not(feature = "metrics"))]
let _ = connected;
}
/// Set the `registered_items` / `active_subscriptions` gauges to `count`.
pub(crate) fn set_registered_items(count: usize) {
#[cfg(feature = "metrics")]
{
let v = count as f64;
metrics::gauge!("mxaccess.session.registered_items", &[TRANSPORT_NMX]).set(v);
metrics::gauge!("mxaccess.session.active_subscriptions", &[TRANSPORT_NMX]).set(v);
}
#[cfg(not(feature = "metrics"))]
let _ = count;
}
// ---- Tests ---------------------------------------------------------------
#[cfg(all(test, feature = "metrics"))]
#[allow(clippy::unwrap_used, clippy::panic, clippy::expect_used)]
mod tests {
use super::*;
use metrics::{CounterFn, GaugeFn, HistogramFn};
use metrics::{Key, Label, Recorder, SharedString, Unit};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
/// Minimal in-memory recorder that tallies increments for the
/// `mxaccess.session.writes` counter only; every other metric is a
/// no-op. Verifying that one counter increments is enough here, since
/// full per-label aggregation is the exporter's job.
#[derive(Default)]
struct CountingRecorder {
writes: Arc<AtomicU64>,
}
struct CountingCounter(Arc<AtomicU64>);
impl CounterFn for CountingCounter {
fn increment(&self, value: u64) {
self.0.fetch_add(value, Ordering::Relaxed);
}
fn absolute(&self, value: u64) {
self.0.store(value, Ordering::Relaxed);
}
}
struct NoopHistogram;
impl HistogramFn for NoopHistogram {
fn record(&self, _: f64) {}
}
struct NoopGauge;
impl GaugeFn for NoopGauge {
fn increment(&self, _: f64) {}
fn decrement(&self, _: f64) {}
fn set(&self, _: f64) {}
}
impl Recorder for CountingRecorder {
fn describe_counter(&self, _: metrics::KeyName, _: Option<Unit>, _: SharedString) {}
fn describe_gauge(&self, _: metrics::KeyName, _: Option<Unit>, _: SharedString) {}
fn describe_histogram(&self, _: metrics::KeyName, _: Option<Unit>, _: SharedString) {}
fn register_counter(&self, key: &Key, _: &metrics::Metadata<'_>) -> metrics::Counter {
if key.name() == "mxaccess.session.writes" {
metrics::Counter::from_arc(Arc::new(CountingCounter(self.writes.clone())))
} else {
metrics::Counter::noop()
}
}
fn register_gauge(&self, _: &Key, _: &metrics::Metadata<'_>) -> metrics::Gauge {
metrics::Gauge::from_arc(Arc::new(NoopGauge))
}
fn register_histogram(&self, _: &Key, _: &metrics::Metadata<'_>) -> metrics::Histogram {
metrics::Histogram::from_arc(Arc::new(NoopHistogram))
}
}
/// DoD bullet 4: at least one metric counter increments under a
/// unit test. We swap in the `CountingRecorder` for the duration
/// of the test using `metrics::with_local_recorder` — this is
/// the supported way to scope a recorder to a single test
/// without poisoning the global slot.
#[test]
fn record_write_increments_session_writes_counter() {
let recorder = CountingRecorder::default();
let counter = recorder.writes.clone();
metrics::with_local_recorder(&recorder, || {
record_write();
record_write();
record_write();
});
assert_eq!(counter.load(Ordering::Relaxed), 3);
// Reference `Label` and `Key` directly so their imports never
// trigger unused-import warnings, even if a `metrics` minor bump
// changes what the macros pull in.
let _ = Label::new("k", "v");
let _: Option<&Key> = None;
}
}
+187 -11
@@ -33,7 +33,9 @@ use std::sync::Arc;
use std::time::SystemTime;
use mxaccess_callback::{CallbackEvent, CallbackExporter, ExporterIdentities};
use mxaccess_codec::{MxStatus, NmxSubscriptionMessage, NmxSubscriptionRecord};
use mxaccess_codec::{
MxStatus, NmxReferenceRegistrationMessage, NmxSubscriptionMessage, NmxSubscriptionRecord,
};
use mxaccess_galaxy::{GalaxyTagMetadata, Resolver, ResolverError};
use mxaccess_nmx::{NmxClient, NmxClientError, WriteValue};
use mxaccess_rpc::guid::Guid;
@@ -47,6 +49,7 @@ use tokio::sync::{Mutex, broadcast};
use tokio::task::JoinHandle;
use tokio_stream::wrappers::BroadcastStream;
use crate::metrics as session_metrics;
use crate::{DataChange, RecoveryEvent};
use futures_util::Stream;
@@ -587,6 +590,10 @@ impl Session {
let (recovery_tx, _) = broadcast::channel(RECOVERY_BROADCAST_CAPACITY);
// F40 — gauge stays at 1 until shutdown_nmx flips it.
session_metrics::set_connected(true);
session_metrics::set_registered_items(0);
Ok(Self {
inner: Arc::new(SessionInner {
options,
@@ -683,12 +690,16 @@ impl Session {
let mut last_error: Option<Error> = None;
for attempt in 1..=policy.max_attempts {
// F40 — increment attempt counter at Started emission.
session_metrics::record_recovery_attempt();
let _ = self
.inner
.recovery_tx
.send(Arc::new(RecoveryEvent::Started { attempt }));
match self.recover_connection_core(&factory).await {
Ok(()) => {
// F40 — increment success counter on Recovered.
session_metrics::record_recovery_success();
let _ = self
.inner
.recovery_tx
@@ -859,6 +870,7 @@ impl Session {
.await
.map_err(map_resolver)?;
let opts = &inner.options;
let started = std::time::Instant::now();
let mut nmx = inner.nmx.lock().await;
let hr = nmx
.write(
@@ -874,6 +886,9 @@ impl Session {
.await
.map_err(map_nmx)?;
ensure_hresult_ok(hr)?;
// F40 — count + record latency only on the success path.
session_metrics::record_write();
session_metrics::record_write_latency(started.elapsed());
Ok(())
}
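The success-only instrumentation in this hunk (start a clock, run the fallible operation, then record the counter and latency only after it returns `Ok`) can be sketched in isolation. This is a std-only illustration under stated assumptions: `timed_on_success` and its `record` callback are hypothetical names that do not exist in the crate, and the callback merely stands in for `session_metrics::record_write_latency`.

```rust
use std::time::{Duration, Instant};

/// Runs `op` and hands the elapsed wall time to `record` only when `op`
/// succeeds. Hypothetical helper for illustration; the diff inlines the
/// same steps at each call site instead.
pub fn timed_on_success<T, E>(
    op: impl FnOnce() -> Result<T, E>,
    record: impl FnOnce(Duration),
) -> Result<T, E> {
    let started = Instant::now();
    // `?` returns early on failure, so `record` is skipped for errors.
    let value = op()?;
    record(started.elapsed());
    Ok(value)
}
```

Because the early return happens before `record`, failed operations never contribute a latency sample, which matches the "only on the success path" comment in the hunk.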
@@ -900,6 +915,7 @@ impl Session {
.await
.map_err(map_resolver)?;
let opts = &inner.options;
let started = std::time::Instant::now();
let mut nmx = inner.nmx.lock().await;
let hr = nmx
.write2(
@@ -916,6 +932,10 @@ impl Session {
.await
.map_err(map_nmx)?;
ensure_hresult_ok(hr)?;
// F40 — write2 shares the writes counter (same Session::write*
// family on the wire).
session_metrics::record_write();
session_metrics::record_write_latency(started.elapsed());
Ok(())
}
@@ -951,6 +971,7 @@ impl Session {
.await
.map_err(map_resolver)?;
let opts = &inner.options;
let started = std::time::Instant::now();
let mut nmx = inner.nmx.lock().await;
let hr = nmx
.write_secured2(
@@ -970,6 +991,9 @@ impl Session {
.await
.map_err(map_nmx)?;
ensure_hresult_ok(hr)?;
// F40 — secured-write success counts toward the writes total.
session_metrics::record_write();
session_metrics::record_write_latency(started.elapsed());
Ok(())
}
@@ -1076,12 +1100,147 @@ impl Session {
// replay AdviseSupervisory after a transport rebuild. Inserted
// AFTER the wire AdviseSupervisory succeeds — failed advises
// never enter the registry.
inner.subscriptions.lock().await.insert(
let registry_size = {
let mut reg = inner.subscriptions.lock().await;
reg.insert(
correlation_id,
SubscriptionEntry {
metadata: Arc::clone(&metadata_arc),
},
);
reg.len()
};
// F40 — count the advise + update the gauge under the same
// ordering as the registry insert so the gauge value matches
// what `recover_connection`'s snapshot would observe.
session_metrics::record_advise();
session_metrics::set_registered_items(registry_size);
Ok(Subscription {
correlation_id,
SubscriptionEntry {
metadata: Arc::clone(&metadata_arc),
},
);
reference: Arc::<str>::from(reference),
metadata: metadata_arc,
inbound,
pending: std::collections::VecDeque::new(),
})
}
/// Real implementation of [`Session::subscribe_buffered`] for the
/// NMX transport. Mirrors `MxNativeSession.RegisterBufferedItemAsync`
/// (`MxNativeSession.cs:272-310`).
///
/// Wire-side: builds a single LMX `RegisterReference` (opcode `0x10`)
/// frame whose `item_definition` carries the `.property(buffer)`
/// suffix and whose `subscribe` flag is `true`. The server treats
/// this as a buffered supervisory advise — no separate
/// `AdviseSupervisory` follow-up is needed (verified against
/// `captures/082-frida-add-buffered-plain-advise-testint`, which
/// shows exactly one `RegisterReference` and zero `AdviseSupervisory`
/// frames between `mx.set-buffered-interval` and the first
/// `OnBufferedDataChange`).
///
/// `update_interval_ms` is rounded up to the next multiple of 100 ms via
/// [`crate::BufferedOptions::rounded_update_interval_ms`] and
/// stays client-side. Native MXAccess does not emit a separate
/// `SetBufferedUpdateInterval` RPC: the .NET reference's
/// `MxNativeCompatibilityServer.SetBufferedUpdateInterval`
/// (`cs:627-640`) only updates an in-memory dictionary. The rounding
/// is the only behaviour the Rust port preserves today, and the
/// rounded value is currently informational; a future iteration
/// can expose it via a getter when a consumer needs it.
pub(crate) async fn subscribe_buffered_nmx(
&self,
reference: &str,
options: crate::BufferedOptions,
) -> Result<Subscription, Error> {
self.ensure_connected()?;
if options.update_interval_ms == 0 {
// Mirrors `MxNativeCompatibilityServer.cs:630-633` —
// `ArgumentOutOfRangeException` for non-positive intervals.
return Err(Error::Configuration(ConfigError::InvalidArgument {
detail: "BufferedOptions::update_interval_ms must be positive".to_string(),
}));
}
// Round up to the next multiple of 100 ms
// (`MxNativeCompatibilityServer.cs:638`). The rounded value is
// computed for parity with the .NET reference; it is currently
// not transmitted on the wire because native MXAccess holds it
// client-side only (see capture 082's missing
// `SetBufferedUpdateInterval` frame).
let _rounded_ms = options.rounded_update_interval_ms();
let inner = self.inner.clone();
let metadata = inner
.resolver
.resolve(reference)
.await
.map_err(map_resolver)?;
let correlation_id: [u8; 16] = rand::random();
// Build the buffered RegisterReference body. Item definition is
// the full reference suffixed with `.property(buffer)`; item
// context is empty for this single-string form (the .NET
// reference's split-context form is reachable via the
// compat-server layer F35 once it lands). The codec helper
// rejects empty/whitespace inputs with `CodecError::InvalidName`.
let item_definition =
NmxReferenceRegistrationMessage::to_buffered_item_definition(reference)
.map_err(|e| {
Error::Configuration(ConfigError::InvalidArgument {
detail: format!("buffered item definition: {e}"),
})
})?;
let registration = NmxReferenceRegistrationMessage {
item_handle: 0,
item_correlation_id: correlation_id,
item_definition,
item_context: String::new(),
subscribe: true,
reserved_25_27: [0; 2],
reserved_31_55: [0; 24],
};
let opts = &inner.options;
// Subscribe to the broadcast BEFORE issuing the advise so updates
// arriving immediately after don't slip the gap (same ordering
// rationale as plain `subscribe`).
let inbound = Box::pin(BroadcastStream::new(self.inner.callback_tx.subscribe()));
let mut nmx = inner.nmx.lock().await;
let hr = nmx
.register_reference(
opts.local_engine_id,
&metadata,
&registration,
opts.galaxy_id,
/* source_galaxy_id */ i32::from(opts.galaxy_id),
opts.source_platform_id,
)
.await
.map_err(map_nmx)?;
ensure_hresult_ok(hr)?;
drop(nmx);
let metadata_arc = Arc::new(metadata);
// Record the active subscription so recover_connection can replay
// it after a transport rebuild. The replay path currently uses
// `AdviseSupervisory` for every entry; for buffered subscriptions
// that path is functionally equivalent (the LMX server already
// remembers the buffered registration via the `.property(buffer)`
// suffix carried in the metadata's name). Tracked as a sub-followup
// — see `design/followups.md` if a future iteration wants to
// re-issue `RegisterReference` instead.
let registry_size = {
let mut reg = inner.subscriptions.lock().await;
reg.insert(
correlation_id,
SubscriptionEntry {
metadata: Arc::clone(&metadata_arc),
},
);
reg.len()
};
session_metrics::record_advise();
session_metrics::set_registered_items(registry_size);
Ok(Subscription {
correlation_id,
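The two pure steps that `subscribe_buffered_nmx` performs before touching the wire, rounding the interval and suffixing the reference, can be sketched without any crate dependencies. Everything below is an assumption made for illustration: the function names are hypothetical, `u32` is a guess at the interval type, returning `u32::MAX` on overflow is one reading of "saturating", and the `String` error stands in for the codec's own error type.

```rust
/// Rounds an interval up to the next multiple of 100 ms using
/// `((ms + 99) / 100) * 100`. Returning `u32::MAX` when the addition
/// overflows is an assumption about the intended saturation.
pub fn round_up_to_100ms(ms: u32) -> u32 {
    match ms.checked_add(99) {
        Some(padded) => (padded / 100) * 100,
        None => u32::MAX,
    }
}

/// Appends the `.property(buffer)` suffix, rejecting empty or
/// whitespace-only references. The `String` error is a stand-in for
/// the codec's error type.
pub fn buffered_item_definition(reference: &str) -> Result<String, String> {
    if reference.trim().is_empty() {
        return Err("reference must not be empty".to_string());
    }
    Ok(format!("{reference}.property(buffer)"))
}
```

With these definitions, an interval of 250 ms rounds to 300 ms, and a hypothetical reference `Tank1.Level` becomes `Tank1.Level.property(buffer)`.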
@@ -1126,6 +1285,11 @@ impl Session {
}));
}
// F40 — clock the whole read-as-subscribe (including the
// AdviseSupervisory round-trip) so the histogram captures the
// same wall time a consumer would see.
let started = std::time::Instant::now();
// Subscribe through the public path so the broadcast wiring +
// AdviseSupervisory both run.
let subscription = self.subscribe(reference).await?;
@@ -1143,6 +1307,10 @@ impl Session {
Ok(None) => Err(Error::Connection(ConnectionError::EngineNotRegistered)),
Err(_elapsed) => Err(Error::Timeout(timeout)),
};
if result.is_ok() {
session_metrics::record_read();
session_metrics::record_read_latency(started.elapsed());
}
// Best-effort unsubscribe. The .NET finally block at cs:351-358
// ignores the return of Unsubscribe; mirror that — a failed
@@ -1192,11 +1360,14 @@ impl Session {
// We do this only on the success path — if UnAdvise itself
// failed, the server may still hold the supervisory record and
// a future recover_connection should re-issue the advise.
inner
.subscriptions
.lock()
.await
.remove(&subscription.correlation_id);
let registry_size = {
let mut reg = inner.subscriptions.lock().await;
reg.remove(&subscription.correlation_id);
reg.len()
};
// F40 — count the unadvise + update the gauge.
session_metrics::record_unadvise();
session_metrics::set_registered_items(registry_size);
Ok(())
}
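The registry changes in the subscribe and unsubscribe hunks share one shape: mutate the map, read `len()` inside the same lock scope, then publish that size to the gauge after the lock is released. A minimal sketch of that shape follows. It assumes a `std::sync::Mutex` and an `AtomicUsize` in place of the crate's async mutex and the `registered_items` gauge, and every name in it is hypothetical.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

/// Toy registry whose `gauge` field stands in for the
/// `registered_items` gauge. All names here are hypothetical.
#[derive(Default)]
pub struct Registry {
    entries: Mutex<HashMap<[u8; 16], String>>,
    gauge: AtomicUsize,
}

impl Registry {
    pub fn insert(&self, id: [u8; 16], name: &str) {
        // Read `len()` while the lock is still held so the size matches
        // the map state produced by this exact insert.
        let size = {
            let mut map = self.entries.lock().unwrap();
            map.insert(id, name.to_string());
            map.len()
        };
        // The lock is released before the gauge is touched.
        self.gauge.store(size, Ordering::Relaxed);
    }

    pub fn remove(&self, id: &[u8; 16]) {
        let size = {
            let mut map = self.entries.lock().unwrap();
            map.remove(id);
            map.len()
        };
        self.gauge.store(size, Ordering::Relaxed);
    }

    pub fn gauge(&self) -> usize {
        self.gauge.load(Ordering::Relaxed)
    }
}
```

Capturing the size inside the block keeps the lock scope short while still reporting a value that was true at the moment of the mutation. Removing an unknown id leaves the size, and therefore the gauge, unchanged.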
@@ -1219,6 +1390,11 @@ impl Session {
{
return Ok(());
}
// F40 — flip the connected gauge as soon as the atomic flips
// so a concurrent scrape is unlikely to see connected=1 alongside a
// shutdown-in-progress (the two updates are not atomic together).
session_metrics::set_connected(false);
session_metrics::set_registered_items(0);
// 1. Unregister the engine on the wire first, while the NMX
// transport is still live.