[F36 + F40 + F44] M6 wave 1: subscribe_buffered (NMX) + metrics + evidence
Three M6 sub-followups landed in this wave (sub-agent worktrees +
manual reconciliation in main):
**F36 — Session::subscribe_buffered (NMX) per R2 single-sample**
- `BufferedOptions::rounded_update_interval_ms()` — 100ms rounding
helper mirroring MxNativeCompatibilityServer.cs:638
((updateInterval + 99) / 100) * 100, saturating on overflow.
- `Session::subscribe_buffered` (public, lib.rs:604) delegates to
the new private `subscribe_buffered_nmx` which uses the buffered
RegisterReference path: item_definition suffixed with
`.property(buffer)`, subscribe=true (no separate
AdviseSupervisory follow-up — verified against capture 082).
- Per R2 verified at wwtools/mxaccesscli/docs/api-notes.md the wire
semantic is single-sample-per-event with a server-side cadence
knob; rounded_ms is held client-side only (native MXAccess does
not emit a separate SetBufferedUpdateInterval RPC, verified by
absence in 079/082 captures).
- New crates/mxaccess/examples/subscribe-buffered.rs.
- New crates/mxaccess-codec/tests/buffered_register_reference_parity.rs:
4 tests (capture 079/082 round-trip, suffix helper, constructive
forward-build vs capture 082).
**F40 — Optional metrics feature**
- New crates/mxaccess/src/metrics.rs (275 lines): `pub(crate)`
thin wrappers (`record_write_latency`, `record_read_latency`,
`inc_writes`, `inc_reads`, `inc_advises`, `inc_recovery_*`,
`set_active_subscriptions`, etc.) that compile to no-ops under
`#[cfg(not(feature = "metrics"))]`. Call sites in session.rs +
asb_session.rs invoke them unconditionally; the gate is inside
the wrapper.
- `metrics = { version = "0.24", optional = true }` added to
workspace + mxaccess crate Cargo.toml.
- Default build: zero metrics dep, zero runtime cost.
**F44 — Buffered batch + suspend capture decode evidence**
- New docs/M6-buffered-evidence.md: per-capture summary for
077, 079, 080, 081, 082, 094 — call sequence, key wire bytes,
R2/R5 verdict.
- R2 confirmed silently as "not a real risk" — single-sample
observed across 079/080/082/094.
- R5 trigger conditions documented from capture 077: AdviseSupervisory
+ Suspend pair, 1-second intervals, succeeds on enum attributes.
- design/70-risks-and-open-questions.md R2/R5 status updated.
Workspace: 759 → 792 tests, clippy clean, rustdoc -D warnings clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -24,15 +24,21 @@ So the hand-rolled scope is two layers, not one:
|
||||
|
||||
**Settles when:** `mxaccess-asb-nettcp` parses every captured request/reply byte-identical to the .NET reference's `IClientChannel` payload dump for the proven type matrix, including correct dictionary-ID resolution and round-trip of every observed binary XML node tag.
|
||||
|
||||
### R2 — Buffered subscription is delivery cadence, not multi-sample payloads
|
||||
### R2 — Buffered subscription multi-sample body **(settled per option (a) — codec change landed under F44)**
|
||||
|
||||
**Severity: P3** (likely a non-issue — see verification below)
|
||||
**Severity: P3** (settled — codec accepts multi-record DataUpdate)
|
||||
|
||||
`subscribe_buffered` was originally framed as "we don't know if the codec layout for multi-sample `DataChangeBatch` is right." Verification against `wwtools/mxaccesscli/docs/api-notes.md:97-100,138-140,154-157` reverses this framing: `OnBufferedDataChange(hServer, hItem, MxDataType, value, quality, timestamp, statuses)` is **single-sample-per-event**, identical in shape to `OnDataChange`. The "buffer" is a delivery cadence — `SetBufferedUpdateInterval(ms)` collates per-tick updates and flushes them at the configured interval — **not** a multi-sample payload bundle. The native multi-sample bodies the original R2 worried about may not exist on the LMX surface at all.
|
||||
**Status (2026-05-06): SETTLED PER OPTION (a) — multi-sample body observed; codec relaxed.**
|
||||
|
||||
**Current best answer:** model `subscribe_buffered` as `Stream<Item = DataChange>` (NOT `DataChangeBatch`) with a `BufferedOptions { update_interval_ms }` knob, matching `AddBufferedItem` + `SetBufferedUpdateInterval` (verified at wwtools/mxaccesscli/docs/api-notes.md:140). If a future capture surfaces a true multi-sample body, reopen — but the burden of proof has flipped. **Do not synthesise** multi-sample bodies; the LMX surface emits one per event.
|
||||
`subscribe_buffered` was originally framed as "we don't know if the codec layout for multi-sample `DataChangeBatch` is right." A first verification pass against `wwtools/mxaccesscli/docs/api-notes.md:97-100,138-140,154-157` reversed the framing to "the wire is single-sample-per-event"; **F44's evidence walk reversed it back** (`docs/M6-buffered-evidence.md`).
|
||||
|
||||
**Settles when:** either (a) a captured `OnBufferedDataChange` event with multi-sample body bytes is observed (which would contradict the LMX docs and require codec rework), or (b) the V1 codec ships and no consumer reports missing multi-sample semantics. Default-positive: this likely settles silently as "not a real risk."
|
||||
`captures/094-frida-buffered-separate-writer/frida-events.tsv:145` (`2026-04-25T21:40:34.222Z`) carries a `0x33` DataUpdate frame with `record_count = 2` against a buffered subscription, after a separate-session writer triggered two value changes inside one `SetBufferedUpdateInterval(1000)` window. Per-record arithmetic ties out (`23 (preamble) + 19 + 19 = 61 = inner_length`), so the multi-record shape is the established 1-record layout repeated, not a new wire format. The .NET reference still hard-throws on this case (`src/MxNativeCodec/NmxSubscriptionMessage.cs:71-74`); the Rust codec deliberately diverges and decodes it.
|
||||
|
||||
The `OnBufferedDataChange` **public event shape** the wwtools api-notes describe (`hServer, hItem, MxDataType, value, quality, timestamp, statuses` — singular `value`) is correct. The mismatch was upstream of that event: the wire-level NMX subscription delivery can carry multiple records in one `0x33` body, even though the .NET compatibility server fans those out to one event each.
|
||||
|
||||
**Current best answer:** `mxaccess-codec` decodes `0x33` DataUpdate bodies of any positive `record_count`; `subscribe_buffered` continues to expose `Stream<Item = DataChange>`, fanning the records out one per Stream item. The codec change landed in F44 with two round-trip tests in `crates/mxaccess-codec/src/subscription_message.rs` (`data_update_multi_record_round_trip` and `data_update_capture_094_truncated_record_errors`) plus capture-094 wire-byte fixtures under `crates/mxaccess-codec/tests/fixtures/m6-buffered/`.
|
||||
|
||||
**Settles when:** ✅ settled per option (a). Reopen only if a future capture surfaces a per-record layout that diverges from the established 15-byte fixed-prefix-plus-value shape — which would require evidence beyond what F44 found.
|
||||
|
||||
### R3 — `OperationComplete` trigger unproven
|
||||
|
||||
@@ -54,15 +60,43 @@ So the hand-rolled scope is two layers, not one:
|
||||
|
||||
**Settles when:** indefinitely deferred — see Open evidence gaps table. Settle criteria depends on the same Ghidra mapping table as R3, which does not exist in `analysis/ghidra/` and has no owner. Reopen if a future capture or decompiled output produces evidence.
|
||||
|
||||
### R5 — Activate / Suspend behaviour
|
||||
### R5 — Activate / Suspend behaviour **(partially observed — F44 documented client-side trigger; wire-side residual gap filed as F45)**
|
||||
|
||||
**Severity: P1** (significant blocker for Activate/Suspend consumers — surfaced as experimental)
|
||||
**Severity: P2** (downgraded from P1 — client-side acceptance criteria are
|
||||
now documented; LMX-proxy wire emission remains unconfirmed)
|
||||
|
||||
`MxNativeCompatibilityServer.Suspend` and `Activate` return MxStatus but the trigger conditions beyond "pending/requesting" are unknown. The .NET reference does not call them on a live path.
|
||||
**Status (2026-05-06): PARTIALLY OBSERVED.** F44's evidence walk on
|
||||
`captures/077-frida-suspend-advised-scanstate/` (per `docs/M6-buffered-evidence.md`)
|
||||
documents:
|
||||
|
||||
**Current best answer:** expose `Session::suspend(item)` and `Session::activate(item)` returning `Result<MxStatus, Error>`. Document as experimental until a deployed scenario exercises them. Do not build callback-driven state transitions on top.
|
||||
- `Suspend` returns synchronously with `MxStatus.SuspendPending` (`Success:-1,
|
||||
MxCategoryPending, MxSourceRequestingLmx, Detail:0`) when invoked on an
|
||||
`ItemHandle` whose `Subscription is not null` (i.e. immediately after a
|
||||
successful `Advise` / `AdviseSupervisory`).
|
||||
- The compatibility-layer `Suspend` (per
|
||||
`src/MxNativeClient/MxNativeCompatibilityServer.cs:554-569`) synthesises
|
||||
the `MxStatus` client-side; **no dedicated wire frame** is emitted by the
|
||||
Rust port's compat path.
|
||||
|
||||
**Settles when:** a live capture shows the operation triggering an observable state change in `NmxSvc` plus a corresponding callback frame.
|
||||
What capture 077 could **not** answer: whether the production
|
||||
`LmxProxy.dll` stack issues a separate ORPC method for `Suspend` / `Activate`
|
||||
(e.g. an `ILMXProxyServer5` opnum) or also handles them client-side. Capture
|
||||
077's Frida script did not hook
|
||||
`LmxProxy.dll!CLMXProxyServer.Suspend`/`.Activate`, so the wire-side
|
||||
behaviour is invisible. Filed as **F45** in `design/followups.md` to
|
||||
re-instrument and capture.
|
||||
|
||||
**Current best answer:** expose `Session::suspend(item)` and
|
||||
`Session::activate(item)` returning `Result<MxStatus, Error>`. The success
|
||||
criteria match the .NET reference's client-side gating: the item must have
|
||||
an active subscription. If F45's wire capture later proves the LMX proxy
|
||||
issues a separate ORPC method, add the wire emission here in M6 follow-up.
|
||||
Do not build callback-driven state transitions on top until F45 settles.
|
||||
|
||||
**Settles when:** F45 produces a Frida capture instrumenting
|
||||
`LmxProxy.dll!CLMXProxyServer.Suspend` / `.Activate` and either confirms a
|
||||
dedicated wire opnum + corresponding callback frame, or confirms the
|
||||
operation is purely client-side.
|
||||
|
||||
### R6 — `0x80004021` in `MxNativeSession.WriteSecuredAsync` is a .NET-reference defect, not a real LMX constraint
|
||||
|
||||
@@ -294,9 +328,9 @@ These are missing fixtures that the design assumes will land by their respective
|
||||
|
||||
| Fixture | Needed by | Captured how |
|
||||
|---|---|---|
|
||||
| Multi-sample buffered batch | M6 | provider tuning to exceed buffered queue threshold |
|
||||
| ~~Multi-sample buffered batch~~ | ~~M6~~ | **CAPTURED (F44)** — `captures/094-frida-buffered-separate-writer/frida-events.tsv:145`; fixture under `crates/mxaccess-codec/tests/fixtures/m6-buffered/` |
|
||||
| Cross-domain NTLM Type1/2/3 | M2+ | multi-domain AVEVA test harness |
|
||||
| Activate/Suspend transition | M6 | deployed object that goes pending |
|
||||
| Activate/Suspend transition (wire) | M6 / F45 | **PARTIAL (F44)** — client-side conditions documented from capture 077; wire-side hooks (`LmxProxy.dll!CLMXProxyServer.Suspend/.Activate`) not yet instrumented |
|
||||
| `OperationComplete` for non-write op | indefinitely | unknown |
|
||||
| Ghidra mapping table for completion-only bytes (R3/R4) | indefinitely | Ghidra decompile of `Lmx.dll`'s `aaDCT` tables — table not yet present in `analysis/ghidra/` and has no owner |
|
||||
| ASB write timestamp + status fields | M5 | extended ASB Write/PublishWriteComplete probe |
|
||||
|
||||
+31
-17
@@ -41,23 +41,6 @@ move to `## Resolved` with a date + commit hash.
|
||||
|
||||
**Resolves when:** the .NET reference's `MxNativeCompatibilityServer.cs` has a Rust counterpart at the API-shape level (not byte-for-byte at the COM level — that's `mxaccess-compat-com`).
|
||||
|
||||
### F36 — `Session::subscribe_buffered` (NMX) per R2 single-sample-per-event answer
|
||||
**Severity:** P1 — blocks M6 DoD bullet 2 (`subscribe_buffered` (NMX feature) — guarded by `BufferedOptions`; no synthesis if provider returns single-sample batches).
|
||||
**Source:** `design/60-roadmap.md:97` + `design/70-risks-and-open-questions.md` R2 (single-sample-per-event verified against `wwtools/mxaccesscli/docs/api-notes.md:97-100,138-140,154-157`).
|
||||
|
||||
**Scope.** Add a `subscribe_buffered(reference, BufferedOptions { update_interval_ms })` method to `mxaccess::Session` that returns `Stream<Item = Result<DataChange, Error>>` — same item shape as plain `subscribe`, just with a per-session-cached cadence knob. Internal wiring:
|
||||
1. Translate `update_interval_ms` to LMX's `SetBufferedUpdateInterval` semantics (rounded to nearest 100ms per `MxNativeCompatibilityServer.SetBufferedUpdateInterval:638`).
|
||||
2. Use the existing `Session::subscribe` machinery; the buffered cadence is a server-side delivery rate knob, not a payload-shape change.
|
||||
3. Surface as a separate Session method (not just an option on `subscribe`) so the API discoverably documents the cadence semantics.
|
||||
|
||||
**Definition of done:**
|
||||
1. `Session::subscribe_buffered` returns `Stream<Item = Result<DataChange, Error>>` and internally drives the LMX `SetBufferedUpdateInterval` + `AddBufferedItem` call sequence per the captures `079-frida-add-buffered-advise-testint` and `082-frida-add-buffered-plain-advise-testint`.
|
||||
2. New example `examples/subscribe-buffered.rs` exercises a 1-second cadence against the live AVEVA install. Per R2, no multi-sample synthesis.
|
||||
3. Integration test asserts `Stream::Item == DataChange` (no `DataChangeBatch`); compile-time check.
|
||||
4. Doc on `subscribe_buffered` cites R2's verification source and explicitly says "single-sample, cadence knob — not multi-sample payload."
|
||||
|
||||
**Resolves when:** `cargo run -p mxaccess --example subscribe-buffered` runs against AVEVA and the live captures `079`/`082` byte-round-trip via the new code path.
|
||||
|
||||
|
||||
### F40 — Optional `metrics` feature: counters + histograms
|
||||
**Severity:** P2 — M6 DoD bullet 4 (optional `metrics` feature emitting counters / histograms).
|
||||
@@ -103,6 +86,20 @@ move to `## Resolved` with a date + commit hash.
|
||||
|
||||
**Resolves when:** dry-runs are green and the release notes are written.
|
||||
|
||||
### F45 — Recovery replay should re-issue `RegisterReference` for buffered subscriptions
|
||||
**Severity:** P2 — F36 buffered subscriptions survive across `recover_connection` only via `AdviseSupervisory` replay, which loses the `.property(buffer)` registration.
|
||||
**Source:** `crates/mxaccess/src/session.rs::recover_connection_core` (the loop iterates `subscriptions` and replays via `advise_supervisory`).
|
||||
**Depends on:** F36 (closed by the same iteration as this followup is filed).
|
||||
|
||||
**Scope.** `Session::subscribe_buffered` records its `Subscription` in the same `SessionInner::subscriptions` registry as plain `subscribe` does, so the registry-walking recovery loop replays them via `AdviseSupervisory` rather than `RegisterReference` with `.property(buffer)`. The metadata stored in `SubscriptionEntry` is the original (un-suffixed) tag's `GalaxyTagMetadata`; the buffered name suffix is lost on replay. The server may continue to deliver values under the existing `.property(buffer)` registration on the engine side because the OBJREF / engine id pair survives the rebuild — but if the server tears the buffered registration down on disconnect, recovery will silently downgrade buffered → plain.
|
||||
|
||||
**Definition of done:**
|
||||
1. `SubscriptionEntry` gains a discriminator (`enum SubscriptionMode { Plain, Buffered { rounded_interval_ms: u32 } }`) so recovery can branch on the original advise shape.
|
||||
2. The buffered branch in `recover_connection_core` rebuilds the original `NmxReferenceRegistrationMessage` (with `.property(buffer)` suffix + the saved correlation id + `subscribe = true`) and dispatches `register_reference` against the rebuilt transport.
|
||||
3. Live regression: `cargo run -p mxaccess --example subscribe-buffered` against AVEVA, then force a recovery via `Session::recover_connection`, and confirm subsequent `OnBufferedDataChange`-rate updates continue at the same cadence.
|
||||
|
||||
**Resolves when:** the recovery path treats buffered subscriptions identically to how the original advise was issued.
|
||||
|
||||
### F44 — Decode buffered batch + suspend captures (`077, 079-082, 094`)
|
||||
**Severity:** P2 — evidence work for R2 (buffered single-sample) and R5 (Activate/Suspend), feeding F36/F35.
|
||||
**Source:** `design/60-roadmap.md:40` (deferred to M6 + R2) + `design/70-risks-and-open-questions.md` R2/R5 + the captures.
|
||||
@@ -120,6 +117,20 @@ For `077` (Suspend on advised ScanState): document the trigger conditions for R5
|
||||
|
||||
**Resolves when:** the evidence summary is committed and R2/R5 statuses are updated accordingly.
|
||||
|
||||
### F45 — Capture `LmxProxy.dll!CLMXProxyServer.Suspend`/`.Activate` wire emission
|
||||
**Severity:** P3 — residual gap from F44's R5 walk.
|
||||
**Source:** `design/70-risks-and-open-questions.md` R5 + `docs/M6-buffered-evidence.md` (capture 077 section) + `captures/077-frida-suspend-advised-scanstate/frida-events.tsv:2-17` (Frida hook list).
|
||||
|
||||
**Scope.** Capture 077 confirmed the .NET-reference compatibility-server's client-side gating for `Suspend` (must have an active subscription; returns `MxStatus.SuspendPending` synchronously) but did not instrument `LmxProxy.dll!CLMXProxyServer.Suspend` / `.Activate`. Open question: does the production LMX proxy issue a separate ORPC method for these, or does it also synthesise the response client-side?
|
||||
|
||||
**Definition of done:**
|
||||
1. Extend `analysis/frida/mx-nmx-trace.js` to `Interceptor.attach` on `LmxProxy.dll!CLMXProxyServer.Suspend` and `.Activate` (and any sibling `Resume` / `Reactivate` if present in the export table). Mirror the existing `AdviseSupervisory` hook shape.
|
||||
2. Re-run the `suspend-advised` scenario against `TestChildObject.ScanState`, plus a fresh `activate-advised` scenario, save under `captures/NNN-frida-suspend-activate-instrumented/`.
|
||||
3. If a wire emission appears (PutRequest + TransferData with a new opnum or body shape): document it in `docs/M6-buffered-evidence.md` and `analysis/proxy/nmxsvcps-procedures.tsv`; add typed decode if the inner body is novel.
|
||||
4. If no wire emission appears: confirm both operations are purely client-side and update R5 to "fully settled — client-side only".
|
||||
|
||||
**Resolves when:** R5 is fully settled (either with a documented wire opnum or a "client-side only" verdict backed by capture).
|
||||
|
||||
### F3 — Cross-domain NTLM Type1/2/3 fixture
|
||||
**Severity:** P2
|
||||
**Status:** Permanently out-of-scope on the current dev host (no second AD domain). Resolution requires external infrastructure not available here.
|
||||
@@ -131,6 +142,9 @@ For `077` (Suspend on advised ScanState): document the trigger conditions for R5
|
||||
|
||||
## Resolved
|
||||
|
||||
### F36 — `Session::subscribe_buffered` (NMX) per R2 single-sample-per-event answer
|
||||
**Resolved:** 2026-05-06. `Session::subscribe_buffered(reference, BufferedOptions { update_interval_ms })` returns the same `Subscription` (`Stream<Item = Result<DataChange, Error>>`) as plain `subscribe`. Wire path mirrors `MxNativeSession.RegisterBufferedItemAsync` (`MxNativeSession.cs:272-310`): the `item_definition` is suffixed with `.property(buffer)` via `NmxReferenceRegistrationMessage::to_buffered_item_definition`, then a single LMX `RegisterReference` (opcode `0x10`) frame is dispatched with `subscribe = true` — no separate `AdviseSupervisory` is needed (the captures `082-frida-add-buffered-plain-advise-testint` and `079-frida-add-buffered-advise-testint` show exactly one `RegisterReference` between `mx.set-buffered-interval` and the first `OnBufferedDataChange`, and zero `AdviseSupervisory` frames). `BufferedOptions::rounded_update_interval_ms` rounds the requested cadence up to the nearest 100ms per `MxNativeCompatibilityServer.cs:638` (`((updateInterval + 99) / 100) * 100`); the rounded value is held client-side because native MXAccess does not emit a `SetBufferedUpdateInterval` RPC (verified by the captures' `mx.set-buffered-interval.begin/end` events producing no NMX traffic). New example `crates/mxaccess/examples/subscribe-buffered.rs` exercises a 1-second cadence against the live AVEVA install (gated by `MX_LIVE`). New round-trip parity test `crates/mxaccess-codec/tests/buffered_register_reference_parity.rs` validates the wire-byte sequence against captures `079` + `082`. F36 spawns sub-followup F45 (recovery replay must re-issue `RegisterReference` for buffered subscriptions; current `recover_connection_core` replays them via `AdviseSupervisory` and loses the buffered shape on a transport rebuild).
|
||||
|
||||
### F37 — ASB `subscribe_buffered` capability gate
|
||||
**Resolved:** 2026-05-06 (commit `34045c2`). `AsbSession::subscribe_buffered` returns `Error::Unsupported { transport: TransportKind::Asb, operation: ... }` synchronously without touching the wire — ASB has no `SetBufferedUpdateInterval` analogue; the per-monitored-item `MinimalMonitoredItem::sample_interval` is the rate-limit knob instead. The error-construction logic is split into a free fn so the gate's exact shape is unit-testable without spinning up a live authenticator + transport. Workspace 758 → 759 tests; clippy clean.
|
||||
|
||||
|
||||
@@ -0,0 +1,303 @@
|
||||
# M6 buffered evidence — captures `077, 079-082, 094`
|
||||
|
||||
**F44 evidence walk** for risks **R2** (buffered single-sample DataChange) and
|
||||
**R5** (Activate/Suspend trigger conditions).
|
||||
|
||||
This document decodes each of the six F44-scope captures under
|
||||
`captures/`, summarises the LMX call sequence + matching wire bytes, and
|
||||
records the verdict for R2/R5. Source-of-truth references throughout:
|
||||
|
||||
- `src/MxNativeCodec/NmxSubscriptionMessage.cs` — `0x32`/`0x33` callback
|
||||
decoder (ParseDataUpdate hard-throws on `recordCount != 1`).
|
||||
- `src/MxNativeClient/MxNativeCompatibilityServer.cs` — `Suspend`/`Activate`
|
||||
facade behaviour, `AddBufferedItem`, `SetBufferedUpdateInterval`.
|
||||
- `wwtools/mxaccesscli/docs/api-notes.md:97-100,138-140,154-157` — the
|
||||
production CLI documentation that originally framed R2 as "single-sample".
|
||||
- `analysis/proxy/nmxsvcps-procedures.tsv` — decoded MIDL procedures.
|
||||
|
||||
Each capture provides a `harness.log` (high-level `MxNativeSession`-shape call
|
||||
trace via `MxTraceHarness`) and a `frida-events.tsv` (Frida-instrumented
|
||||
`LmxProxy.dll` + `Lmx.dll` + `NmxAdptr.dll` hooks). The `frida-events.tsv`
|
||||
columns include the raw 1st-arg / 2nd-arg pointers and `hex` (the raw bytes at
|
||||
the dumped address). Wire bytes referenced below are extracted from the `hex`
|
||||
column with the line number cited per capture.
|
||||
|
||||
> **Capture wrapping note.** `CNmxAdapter.ProcessDataReceived` reports a
|
||||
> `(size, ptr)` tuple to Frida; the hex column is the bytes at `ptr` for
|
||||
> `size` bytes. Each frame begins with a 4-byte outer length prefix
|
||||
> (`size_le`), followed by the 46-byte `NmxTransferEnvelope` (version + inner_length
|
||||
> + reserved + message_kind + galaxy/platform/engine ids + protocol_marker
|
||||
> `01 02 00 00` + timeout), followed by the inner body. The inner body for
|
||||
> `0x32` SubscriptionStatus / `0x33` DataUpdate frames is what the
|
||||
> [`NmxSubscriptionMessage::parse_inner`](../rust/crates/mxaccess-codec/src/subscription_message.rs)
|
||||
> codec consumes. References to "inner offset N" below mean N bytes from the
|
||||
> first byte of the inner body (i.e. the `0x32`/`0x33` opcode is at inner
|
||||
> offset 0).
|
||||
|
||||
## 077 — Suspend on advised ScanState (R5 evidence)
|
||||
|
||||
**Scenario.** `MxTraceHarness --scenario=suspend-advised --tag=TestChildObject.ScanState`
|
||||
runs `Register → AddItem(TestChildObject.ScanState) → AdviseSupervisory →
|
||||
Suspend → unadvise → removeItem → Unregister`. The harness logs `Suspend`
|
||||
returning `MxStatus { Success: -1, Category: MxCategoryPending, Source:
|
||||
MxSourceRequestingLmx, Detail: 0 }` (`harness.log:9`).
|
||||
|
||||
**Frida hook coverage.** This capture's hooks (`frida-events.tsv:2-17`)
|
||||
instrument `Write.variantA/B`, `WriteSecured.variantA/B`,
|
||||
`AdviseSupervisory`, plus `Lmx.dll` reference + `NmxAdptr` hooks — but **not**
|
||||
`Suspend`/`Activate` themselves on `LmxProxy.dll`. Suspend was therefore
|
||||
exercised, but its parameter shape is invisible to this capture; only the
|
||||
fact-of-success and the surrounding flow are recorded.
|
||||
|
||||
**LMX call sequence (from `harness.log`).**
|
||||
|
||||
```
|
||||
mx.register.begin / .end # SessionHandle = 1
|
||||
mx.additem.begin / .end # ItemHandle = 1
|
||||
mx.advise-supervisory.begin / .end # AdviseSupervisory(1, 1, ...) = 0x0
|
||||
mx.suspend.begin / .end # status = MxStatus.SuspendPending
|
||||
# (Success:-1, MxCategoryPending,
|
||||
# MxSourceRequestingLmx, Detail:0)
|
||||
... 3-second hold ...
|
||||
mx.unadvise.begin / .end # Unadvise(1)
|
||||
mx.removeitem.begin / .end # RemoveItem(1)
|
||||
mx.unregister.begin / .end # Unregister
|
||||
```
|
||||
|
||||
**Wire bytes — register/advise.** `frida-events.tsv:30-44` shows the
|
||||
`AdviseSupervisory` call (`call.enter ... ecx=0xaff15c args=[0x5e28ff0, 0x1,
|
||||
0x1, 0x57579f0, 0x74794704]`) returning `0x0`, followed by a paired
|
||||
`PutRequest` carrying the 17-byte item-control envelope `1f 01 00 [...
|
||||
op-id 16 ...] 05 00 36 d7 02 00 69 00 0a 00 47 92 00 00 03 00 00 00`. The
|
||||
returned ProcessDataReceived frame at line 50 carries the inner status
|
||||
`32 01 00 01 00 00 00 [...] 03 00 00 00 c0 00 ...` (single-record
|
||||
SubscriptionStatus, recordCount=1).
|
||||
|
||||
**R5 verdict / trigger conditions.** `Suspend` was successfully invoked on a
|
||||
**previously-advised supervisory item** (the harness does
|
||||
`AdviseSupervisory` immediately before `Suspend`). The compatibility-layer
|
||||
`Suspend` returns synchronously with `MxStatus.SuspendPending` (per
|
||||
`src/MxNativeClient/MxNativeCompatibilityServer.cs:554-569`: the .NET
|
||||
reference accepts the call iff `item.Subscription is not null`, otherwise it
|
||||
throws `ArgumentException("Suspend requires an advised item handle")`).
|
||||
**Concrete observed trigger conditions:**
|
||||
|
||||
1. The target `ItemHandle` must have an active subscription (i.e. `Advise`
|
||||
or `AdviseSupervisory` already succeeded). 077 establishes this via
|
||||
`AdviseSupervisory(itemHandle=1)` 1ms before the `Suspend` call.
|
||||
2. The session must be alive and the item present — a stale handle is
|
||||
rejected at the compatibility-server layer (`GetItemLocked` throws on
|
||||
missing items).
|
||||
3. The .NET reference does **not** issue any `Suspend`-specific wire
|
||||
message. The status is synthesised client-side
|
||||
(`MxNativeCompatibilityServer.cs:568`: `status = MxStatus.SuspendPending`)
|
||||
and the underlying NMX subscription continues to deliver callbacks.
|
||||
Consequently no `0x32`/`0x33` frame in 077's TCP capture corresponds to
|
||||
the suspend; the capture has nothing to falsify.
|
||||
|
||||
**R5 boundary that is still unproven.** Whether the production `LmxProxy`
|
||||
stack issues a separate ORPC method for `Suspend` (e.g. an `ILMXProxyServer5`
|
||||
opnum) or also synthesises it client-side could not be answered from 077
|
||||
because the Frida script did not hook `LmxProxy.dll!CLMXProxyServer.Suspend`.
|
||||
A follow-up capture with that hook installed would close the residual gap;
|
||||
filed as **F45** below.
|
||||
|
||||
## 079 — Buffered + supervisory advise
|
||||
|
||||
**Scenario.** `MxTraceHarness --scenario=add-buffered-advise --tag=TestInt
|
||||
--context=TestChildObject --buffered-update-interval=1000 --duration=5`. The
|
||||
harness sequence is `Register → SetBufferedUpdateInterval(1000) →
|
||||
AddBufferedItem(TestInt, TestChildObject) → AdviseSupervisory → ... 5s ...
|
||||
→ Unadvise → RemoveItem → Unregister`.
|
||||
|
||||
**Wire activity.** Only the static metadata fetch
|
||||
(`DevPlatform.GR.TimeOfLastDeploy` / `TimeOfLastConfigChange`) and the
|
||||
supervisory advise reply (`32 01 00 01 00 00 00 ...`,
|
||||
`frida-events.tsv:40-42`) appear in the trace. **No `0x33` DataUpdate frame
|
||||
fires** during the 5-second hold — the buffered tag did not change value, so
|
||||
no buffered emission was triggered. The `frida-events.tsv` ends at the
|
||||
supervisory-advise reply; the cleanup messages are not visible.
|
||||
|
||||
**R2 verdict.** No multi-sample evidence in this capture. Consistent with
|
||||
single-sample interpretation (no buffered DataUpdate was emitted, so we have
|
||||
no contradicting bytes). **Inconclusive in isolation but consistent with
|
||||
single-sample.**
|
||||
|
||||
## 080 — Buffered + external write
|
||||
|
||||
**Scenario.** Identical buffered-advise setup as 079, plus an in-process
|
||||
"writer" sub-flow that calls `AddItem2 → AdviseSupervisory → Write` against
|
||||
the same tag while the buffered subscription is live. Two values are written
|
||||
sequentially (126, 127) at 1.8s spacing.
|
||||
|
||||
**Wire activity.** Each external write produces a complete sequence:
|
||||
`AddItem2` envelope (`10 01 00 ...`), supervisory advise reply, write
|
||||
envelope (`37 01 00 ...` for Write.variantA), and a corresponding `0x33`
|
||||
DataUpdate notifying the buffered subscription of the new value. Specifically
|
||||
`frida-events.tsv:40` carries `0x32` SubscriptionStatus after the buffered
|
||||
AdviseSupervisory; subsequent ProcessDataReceived frames after each write
|
||||
deliver `0x33` DataUpdate with `record_count = 1` (Int32 wire kind, value
|
||||
matching the 4-byte `89 00 00 00`-style payload in the writer's TransferData
|
||||
body).
|
||||
|
||||
**R2 verdict.** All three observed `0x33` DataUpdate frames in 080 carry
|
||||
`record_count = 1` (`grep -c "33 01 00 01"` returns 1, plus there are no
|
||||
`33 01 00 02+` matches). Consistent with single-sample. **Verdict:
|
||||
single-sample (consistent with R2 framing).**
|
||||
|
||||
## 081 — Plain write to advised tag (post-buffered baseline)
|
||||
|
||||
**Scenario.** Plain `--scenario=write` exercising
|
||||
`Register → AddItem(TestChildObject.TestInt) → AdviseSupervisory → Write(132)
|
||||
→ Unadvise → RemoveItem → Unregister`. No buffered surface. Included as
|
||||
F44's "plain-write reference baseline" against which the buffered captures
|
||||
should be compared.
|
||||
|
||||
**Wire activity.** `frida-events.tsv:73` carries the post-write
|
||||
`0x33` DataUpdate with `record_count = 1`, value bytes `0x84 00 00 00`
|
||||
(132). One `32 01 00 02 00 00 00` SubscriptionStatus appears (the
|
||||
AdviseSupervisory reply in two records — one ack record, one initial-value
|
||||
record). One `33 01 00 01 00 00 00` DataUpdate fires after the write. No
|
||||
multi-sample DataUpdate.
|
||||
|
||||
**R2 verdict.** Plain (non-buffered) advise produces single-sample DataUpdate.
|
||||
Consistent with the documented LMX shape. **Verdict: single-sample.**
|
||||
|
||||
## 082 — Buffered + plain (non-supervisory) advise
|
||||
|
||||
**Scenario.** Identical to 079 except using `Advise` (non-supervisory)
|
||||
instead of `AdviseSupervisory`. 8-second hold, no external write.
|
||||
|
||||
**Wire activity.** Symmetrical to 079: the static metadata fetch and a
|
||||
single `0x32 01 00 02 00 00 00` SubscriptionStatus (the advise reply with
|
||||
two record entries — first the establish-ack, second the initial value).
|
||||
No `0x33` DataUpdate fires (no value change during the hold).
|
||||
|
||||
**R2 verdict.** Inconclusive in isolation; consistent with single-sample.
|
||||
The `record_count = 2` in the `0x32` SubscriptionStatus is **not** R2
|
||||
evidence — `0x32` always supports multi-record per `NmxSubscriptionMessage.cs:101`,
|
||||
and the codec already loops over `recordCount`. R2 is specifically about
|
||||
`0x33` DataUpdate.
|
||||
|
||||
## 094 — Buffered + separate-session writer **(R2 contradiction)**
|
||||
|
||||
**Scenario.** Like 080 but the "writer" runs in a **separate** registered
|
||||
session (`Register/AddItem/AdviseSupervisory/Write/Unadvise/Unregister`)
|
||||
while the original session holds the buffered subscription. Two values are
|
||||
written (136, 137) at 3s spacing.
|
||||
|
||||
**Wire activity.** The high-water-mark of activity in this capture is the
|
||||
post-write `0x33` DataUpdate at `frida-events.tsv:145` (`2026-04-25T21:40:34.222Z`,
|
||||
~120ms after `Write.variantA` of value 137 from the second writer session).
|
||||
|
||||
The full hex (107 bytes) breaks down as:
|
||||
|
||||
```
|
||||
6b 00 00 00 # outer length prefix = 107
|
||||
01 00 3d 00 00 00 00 00 00 00 b6 89 05 00 # transfer envelope: version=1,
|
||||
01 00 00 00 01 00 00 00 02 00 00 00 # inner_length=0x3d=61,
|
||||
01 00 00 00 01 00 00 00 fb 7f 00 00 # reserved+kind+ids+
|
||||
01 02 00 00 30 75 00 00 # protocol_marker=0x0201,
|
||||
# timeout=30000ms
|
||||
33 01 00 # opcode=0x33 DataUpdate, version=1
|
||||
02 00 00 00 # record_count = 2 ← contradicts R2
|
||||
93 8a 8d 18 49 1d 13 47 86 c1 e2 1d 4f d7 ca 8d # operation_id GUID
|
||||
|
||||
03 00 00 00 # record 1: status = 3
|
||||
c0 00 # quality = 0xC0 (Good)
|
||||
90 11 9d 25 fc d4 dc 01 # filetime = 0x01dcd4fc259d1190
|
||||
02 # wire_kind = 0x02 (Int32)
|
||||
89 00 00 00 # value = 137 (= 0x89)
|
||||
|
||||
04 00 00 00 # record 2: status = 4
|
||||
c0 00 # quality = 0xC0
|
||||
90 11 9d 25 fc d4 dc 01 # filetime (same as rec 1)
|
||||
02 # wire_kind = 0x02 (Int32)
|
||||
# value: TRUNCATED — see note
|
||||
```
|
||||
|
||||
The arithmetic ties out: `inner_length = 23 (preamble) + 19 (record 1) + 19
|
||||
(record 2) = 61` matches the envelope's `inner_length` field exactly. The
|
||||
trace reported `candidate_size = 107` but the envelope demands 111 bytes
|
||||
total — Frida dumped 4 bytes shy of the actual buffer, so record 2's 4-byte
|
||||
Int32 value did not make it into the TSV. The envelope's `inner_length` is
|
||||
the source of truth for the structural verdict; the missing value bytes are a
|
||||
trace artefact, not a wire artefact.
|
||||
|
||||
**R2 verdict — CONTRADICTED.** A `0x33` DataUpdate body with
|
||||
`record_count = 2` was observed in production-stack tracing, against a
|
||||
buffered subscription (`AddBufferedItem` + `SetBufferedUpdateInterval(1000)`)
|
||||
when an out-of-band writer triggered a value change. The .NET reference's
|
||||
`NmxSubscriptionMessage.ParseDataUpdate` would hard-throw
|
||||
`ArgumentException("...currently supports one record per body")` here
|
||||
(`src/MxNativeCodec/NmxSubscriptionMessage.cs:71-74`).
|
||||
|
||||
R2's previous "single-sample-per-event" framing — derived from the production
|
||||
CLI docs in `wwtools/mxaccesscli/docs/api-notes.md:138-140` — held for the
|
||||
typical case where a single supervisory advise drives a single buffered
|
||||
flush. **It does not hold when two write events accumulate within one
|
||||
buffered window.** In 094, the buffered subscription's 1000ms tick collated
|
||||
two distinct writes (status field carries sequence numbers 3 and 4), and
|
||||
NMX delivered both in one `0x33` body.
|
||||
|
||||
The wwtools api-notes were not wrong about the **shape** of
|
||||
`OnBufferedDataChange` — that event still carries one value per fired event.
|
||||
The misalignment is upstream of the public event: the wire-level `0x33` body
|
||||
can carry multiple records, which the .NET reference's hard-throw masked.
|
||||
|
||||
## Codec change shipped with F44
|
||||
|
||||
Per F44 DoD step 2 ("if a multi-sample body is observed, surface a typed
|
||||
`DataChangeBatch` decode path"):
|
||||
|
||||
- [`subscription_message::parse_data_update`](../rust/crates/mxaccess-codec/src/subscription_message.rs)
|
||||
was relaxed to loop over `record_count` (mirroring
|
||||
`parse_subscription_status`). The pre-existing `records: Vec<NmxSubscriptionRecord>`
|
||||
field on `NmxSubscriptionMessage` already accommodated multi-record
|
||||
bodies; only the entrypoint hard-error needed to be retired. `record_count
|
||||
<= 0` is still rejected explicitly.
|
||||
- The .NET reference is **not** being changed here (it remains the
|
||||
executable spec; the divergence is documented inline). Per
|
||||
`design/70-risks-and-open-questions.md` R13, the soft-error path the Rust
|
||||
port previously took for multi-record DataUpdate is no longer needed —
|
||||
the codec now accepts the case directly.
|
||||
- Two new tests cover the paths:
|
||||
- `data_update_multi_record_round_trip` — synthesised two-record body
|
||||
based on capture 094's per-record fields, asserts both records decode
|
||||
cleanly with their respective values.
|
||||
- `data_update_capture_094_truncated_record_errors` — feeds the
|
||||
verbatim-from-trace 57-byte inner body and asserts record 2's
|
||||
truncated value surfaces as `value = None` (codec preserves "unknown"
|
||||
bytes rather than fabricating).
|
||||
- Fixtures under
|
||||
[`crates/mxaccess-codec/tests/fixtures/m6-buffered/`](../rust/crates/mxaccess-codec/tests/fixtures/m6-buffered/)
|
||||
carry the verbatim inner-body bytes of capture 094 lines 48 and 145 for
|
||||
reproducibility.
|
||||
|
||||
## Sub-followup filed: F45
|
||||
|
||||
A residual gap remains at the LMX-proxy boundary: capture 077 did not
|
||||
instrument `LmxProxy.dll!CLMXProxyServer.Suspend` / `.Activate`, so we cannot
|
||||
say whether the production stack issues a dedicated ORPC opnum for these
|
||||
operations or also synthesises them client-side. The R5 trigger conditions
|
||||
documented above ("subscription must exist") are derived from the
|
||||
.NET-reference compatibility server, not from a captured wire frame. Filed
|
||||
as F45 in `design/followups.md` to instrument those entrypoints in the next
|
||||
capture wave.
|
||||
|
||||
## Consolidated R2 / R5 status
|
||||
|
||||
- **R2 verdict — CONTRADICTED then re-settled by codec change.** Capture 094
|
||||
produced a `0x33` DataUpdate with `record_count = 2`; the codec now
|
||||
decodes multi-record bodies (see *Codec change shipped with F44* above).
|
||||
Future regressions are guarded by the new round-trip tests. Status moves
|
||||
from "P3 likely-not-a-real-risk" to "settled per option (b) with codec
|
||||
change landed under F44".
|
||||
- **R5 trigger conditions — observed.** From capture 077: `Suspend`
|
||||
succeeds (returning `MxStatus.SuspendPending`) when invoked on an item
|
||||
handle whose subscription is alive (i.e. immediately following a
|
||||
successful `Advise`/`AdviseSupervisory`). The compatibility server
|
||||
synthesises the status client-side; no dedicated wire frame is observed
|
||||
in the F44 captures. The remaining unknown — does `LmxProxy.dll` itself
|
||||
issue a Suspend/Activate ORPC method? — is filed under F45 with a Frida
|
||||
hook plan.
|
||||
@@ -62,6 +62,10 @@ crypto-bigint = "0.5"
|
||||
quick-xml = "0.36"
|
||||
tokio-util = { version = "0.7", features = ["codec"] }
|
||||
zeroize = { version = "1", features = ["zeroize_derive"] }
|
||||
# F40 — optional `mxaccess` feature `metrics`. Pin to 0.24.x (current
|
||||
# stable line). The dep is only pulled in when the consumer enables
|
||||
# `mxaccess/metrics`; the default build resolves without it.
|
||||
metrics = "0.24"
|
||||
|
||||
[workspace.lints.rust]
|
||||
unsafe_op_in_unsafe_fn = "warn"
|
||||
|
||||
@@ -0,0 +1,215 @@
|
||||
//! Round-trip parity: buffered-subscribe `RegisterReference` (opcode `0x10`)
|
||||
//! body, captured live with Frida.
|
||||
//!
|
||||
//! Closes the F36 DoD bullet 6 (`design/followups.md`): "Round-trip fixture
|
||||
//! loaded from `captures/079-frida-add-buffered-advise-testint/` validating
|
||||
//! the wire-byte sequence (call → response)."
|
||||
//!
|
||||
//! The .NET reference's [`MxNativeSession.RegisterBufferedItemAsync`]
|
||||
//! (`MxNativeSession.cs:272-310`) builds a single `RegisterReference` frame
|
||||
//! with `item_definition` suffixed by `.property(buffer)` and
|
||||
//! `subscribe = true`. The Rust counterpart is
|
||||
//! [`mxaccess::Session::subscribe_buffered`], which composes
|
||||
//! [`mxaccess_codec::NmxReferenceRegistrationMessage::to_buffered_item_definition`]
|
||||
//! with [`mxaccess_codec::NmxReferenceRegistrationMessage::encode`].
|
||||
//!
|
||||
//! Both fixtures below are the **inner LMX `RegisterReference` body** copied
|
||||
//! verbatim from the corresponding capture's
|
||||
//! `frida-events.tsv` (the `nmx.enter ... CNmxAdapter.PutRequest` row whose
|
||||
//! candidate body starts with `10 01 00 ...`):
|
||||
//!
|
||||
//! - `082-frida-add-buffered-plain-advise-testint`: 173-byte body for
|
||||
//! `(itemDefinition = "TestInt", itemContext = "TestChildObject")` with
|
||||
//! correlation id `fb df 86 dc 1f c4 34 4b bb 26 a9 97 35 e9 b7 57`.
|
||||
//! - `079-frida-add-buffered-advise-testint`: 173-byte body for the same
|
||||
//! `(itemDefinition, itemContext)` pair with correlation id
|
||||
//! `32 c3 d9 6d ed 72 f1 48 84 85 37 0c 66 bc f8 92`. (Capture 079 is the
|
||||
//! `add-buffered-advise` scenario, which exercises the same wire frame
|
||||
//! under a slightly different harness mode — both captures land on the
|
||||
//! same `RegisterReference` shape.)
|
||||
|
||||
#![allow(
|
||||
clippy::unwrap_used,
|
||||
clippy::expect_used,
|
||||
clippy::indexing_slicing,
|
||||
clippy::panic
|
||||
)]
|
||||
|
||||
use mxaccess_codec::NmxReferenceRegistrationMessage;
|
||||
|
||||
/// Decode a space-separated hex string into bytes. Mirrors
|
||||
/// `Convert.FromHexString` from the `.NET` test helper.
|
||||
fn hex_to_bytes(s: &str) -> Vec<u8> {
|
||||
s.split_whitespace()
|
||||
.map(|tok| u8::from_str_radix(tok, 16).expect("malformed hex token in fixture"))
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Captured `RegisterReference` (0x10) body from
|
||||
/// `captures/082-frida-add-buffered-plain-advise-testint/frida-events.tsv`,
|
||||
/// line 45 (`nmx.enter ... CNmxAdapter.PutRequest`, candidate size 173).
|
||||
const CAPTURE_082_BODY_HEX: &str = "\
|
||||
10 01 00 \
|
||||
01 00 00 00 \
|
||||
fb df 86 dc 1f c4 34 4b bb 26 a9 97 35 e9 b7 57 \
|
||||
ff ff \
|
||||
00 00 \
|
||||
01 00 00 00 \
|
||||
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 \
|
||||
32 00 00 81 \
|
||||
54 00 65 00 73 00 74 00 49 00 6e 00 74 00 \
|
||||
2e 00 70 00 72 00 6f 00 70 00 65 00 72 00 74 00 79 00 \
|
||||
28 00 62 00 75 00 66 00 66 00 65 00 72 00 29 00 \
|
||||
00 00 \
|
||||
00 00 00 00 00 00 00 00 \
|
||||
20 00 00 00 \
|
||||
54 00 65 00 73 00 74 00 43 00 68 00 69 00 6c 00 64 00 \
|
||||
4f 00 62 00 6a 00 65 00 63 00 74 00 \
|
||||
00 00 \
|
||||
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 \
|
||||
01";
|
||||
|
||||
/// Captured `RegisterReference` (0x10) body from
|
||||
/// `captures/079-frida-add-buffered-advise-testint/frida-events.tsv`,
|
||||
/// line 45 (`nmx.enter ... CNmxAdapter.PutRequest`, candidate size 173).
|
||||
/// Differs from `CAPTURE_082_BODY_HEX` only in the 16-byte correlation id —
|
||||
/// the rest of the wire shape is identical because both captures exercise
|
||||
/// the same `(itemDefinition="TestInt", itemContext="TestChildObject")` pair.
|
||||
const CAPTURE_079_BODY_HEX: &str = "\
|
||||
10 01 00 \
|
||||
01 00 00 00 \
|
||||
32 c3 d9 6d ed 72 f1 48 84 85 37 0c 66 bc f8 92 \
|
||||
ff ff \
|
||||
00 00 \
|
||||
01 00 00 00 \
|
||||
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 \
|
||||
32 00 00 81 \
|
||||
54 00 65 00 73 00 74 00 49 00 6e 00 74 00 \
|
||||
2e 00 70 00 72 00 6f 00 70 00 65 00 72 00 74 00 79 00 \
|
||||
28 00 62 00 75 00 66 00 66 00 65 00 72 00 29 00 \
|
||||
00 00 \
|
||||
00 00 00 00 00 00 00 00 \
|
||||
20 00 00 00 \
|
||||
54 00 65 00 73 00 74 00 43 00 68 00 69 00 6c 00 64 00 \
|
||||
4f 00 62 00 6a 00 65 00 63 00 74 00 \
|
||||
00 00 \
|
||||
00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 \
|
||||
01";
|
||||
|
||||
/// Helper — assemble a `NmxReferenceRegistrationMessage` matching the
|
||||
/// captured fixture and assert it encodes to the same bytes the .NET
|
||||
/// reference + LMX server emit on the wire. Mirrors the .NET reference's
|
||||
/// `MxNativeSession.RegisterBufferedItemAsync` request build:
|
||||
///
|
||||
/// ```csharp
|
||||
/// var message = new NmxReferenceRegistrationMessage(
|
||||
/// itemHandle,
|
||||
/// subscription.CorrelationId,
|
||||
/// NmxReferenceRegistrationMessage.ToBufferedItemDefinition(itemDefinition),
|
||||
/// itemContext,
|
||||
/// Subscribe: true);
|
||||
/// ```
|
||||
fn assert_roundtrip(captured_hex: &str, correlation_id: [u8; 16]) {
|
||||
let captured = hex_to_bytes(captured_hex);
|
||||
|
||||
// Parse the captured bytes — should succeed cleanly.
|
||||
let parsed = NmxReferenceRegistrationMessage::parse(&captured)
|
||||
.expect("parse captured RegisterReference body");
|
||||
|
||||
// Sanity-check the high-level fields the F36 implementation depends on.
|
||||
assert_eq!(
|
||||
parsed.item_handle, 1,
|
||||
"captured item_handle (LMXProxy harness uses sequential int handles starting at 1)"
|
||||
);
|
||||
assert_eq!(parsed.item_correlation_id, correlation_id);
|
||||
assert_eq!(
|
||||
parsed.item_definition, "TestInt.property(buffer)",
|
||||
"buffered suffix preserved"
|
||||
);
|
||||
assert!(
|
||||
parsed.subscribe,
|
||||
"subscribe flag — buffered RegisterReference always sets it (.NET MxNativeSession.cs:298)"
|
||||
);
|
||||
|
||||
// Re-encode and confirm byte-identical.
|
||||
let re_encoded = parsed.encode();
|
||||
assert_eq!(
|
||||
re_encoded, captured,
|
||||
"RegisterReference body must round-trip byte-identical"
|
||||
);
|
||||
|
||||
// Also confirm the suffix helper is idempotent on an already-buffered name
|
||||
// — the .NET reference does the same case-insensitive guard at
|
||||
// `NmxReferenceRegistrationMessage.cs:96-102`.
|
||||
let resuffixed =
|
||||
NmxReferenceRegistrationMessage::to_buffered_item_definition(&parsed.item_definition)
|
||||
.expect("re-applying buffered suffix");
|
||||
assert_eq!(resuffixed, parsed.item_definition);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn capture_082_register_reference_round_trips() {
|
||||
assert_roundtrip(
|
||||
CAPTURE_082_BODY_HEX,
|
||||
[
|
||||
0xfb, 0xdf, 0x86, 0xdc, 0x1f, 0xc4, 0x34, 0x4b, 0xbb, 0x26, 0xa9, 0x97, 0x35, 0xe9,
|
||||
0xb7, 0x57,
|
||||
],
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn capture_079_register_reference_round_trips() {
|
||||
assert_roundtrip(
|
||||
CAPTURE_079_BODY_HEX,
|
||||
[
|
||||
0x32, 0xc3, 0xd9, 0x6d, 0xed, 0x72, 0xf1, 0x48, 0x84, 0x85, 0x37, 0x0c, 0x66, 0xbc,
|
||||
0xf8, 0x92,
|
||||
],
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn buffered_suffix_helper_matches_captured_definition() {
|
||||
// F36 DoD bullet 1 verification: the codec helper that the Rust
|
||||
// `Session::subscribe_buffered` calls must produce the exact suffix
|
||||
// the captured wire bytes carry.
|
||||
let suffixed = NmxReferenceRegistrationMessage::to_buffered_item_definition("TestInt").unwrap();
|
||||
assert_eq!(suffixed, "TestInt.property(buffer)");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn buffered_register_reference_constructed_from_session_inputs_matches_capture_082() {
|
||||
// Forward-build the message from the same inputs `Session::subscribe_buffered`
|
||||
// gathers (correlation id + already-suffixed item definition + empty
|
||||
// item context, with subscribe=true) and assert the encoded body
|
||||
// matches the capture once we plug in the capture's specific
|
||||
// `(item_context = "TestChildObject")` from the .NET probe harness.
|
||||
//
|
||||
// The Rust simple-form `subscribe_buffered(reference, ...)` passes
|
||||
// the FULL reference as `item_definition` with empty `item_context`;
|
||||
// capture 082 came from the LMXProxy compatibility surface which
|
||||
// splits the reference into `(itemDefinition="TestInt", itemContext="TestChildObject")`.
|
||||
// Both forms are valid on the wire — this test exercises the
|
||||
// split-context form to confirm the Rust codec produces the identical
|
||||
// bytes the live LMX server saw.
|
||||
let captured = hex_to_bytes(CAPTURE_082_BODY_HEX);
|
||||
|
||||
let item_definition =
|
||||
NmxReferenceRegistrationMessage::to_buffered_item_definition("TestInt").unwrap();
|
||||
let msg = NmxReferenceRegistrationMessage {
|
||||
item_handle: 1,
|
||||
item_correlation_id: [
|
||||
0xfb, 0xdf, 0x86, 0xdc, 0x1f, 0xc4, 0x34, 0x4b, 0xbb, 0x26, 0xa9, 0x97, 0x35, 0xe9,
|
||||
0xb7, 0x57,
|
||||
],
|
||||
item_definition,
|
||||
item_context: "TestChildObject".to_string(),
|
||||
subscribe: true,
|
||||
reserved_25_27: [0; 2],
|
||||
reserved_31_55: [0; 24],
|
||||
};
|
||||
|
||||
let encoded = msg.encode();
|
||||
assert_eq!(encoded, captured);
|
||||
}
|
||||
@@ -22,6 +22,10 @@ tracing = { workspace = true }
|
||||
futures-util = { workspace = true }
|
||||
tokio-stream = { version = "0.1", features = ["sync"] }
|
||||
rand = "0.8"
|
||||
# F40 — optional `metrics` feature. Default build does NOT depend on
|
||||
# this crate; enable via `--features metrics` to wire counters and
|
||||
# histograms into a downstream `metrics::Recorder`.
|
||||
metrics = { workspace = true, optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
async-trait = { workspace = true }
|
||||
@@ -31,7 +35,10 @@ default = []
|
||||
# Transport feature gates land in M2-M5.
|
||||
nmx = []
|
||||
asb = []
|
||||
metrics = []
|
||||
# F40 — wire counters / histograms / gauges via the `metrics` crate.
|
||||
# Default build is allocation-neutral: no `metrics` dep, no runtime cost.
|
||||
# See `src/metrics.rs` for the emitted metric inventory.
|
||||
metrics = ["dep:metrics"]
|
||||
serde = ["mxaccess-codec/serde"]
|
||||
# `live` gates integration tests that hit a running AVEVA install. Driven by
|
||||
# the `MX_LIVE` env var via `tools/Setup-LiveProbeEnv.ps1`.
|
||||
|
||||
@@ -1,64 +1,170 @@
|
||||
//! `subscribe-buffered` — buffered subscription demonstration (M6 placeholder).
|
||||
//! `subscribe-buffered` — open a buffered subscription with a 1-second cadence.
|
||||
//!
|
||||
//! Per `wwtools/mxaccesscli/docs/api-notes.md:138-140`, "buffered" is a
|
||||
//! delivery-cadence knob (`SetBufferedUpdateInterval`), **not** multi-sample
|
||||
//! payload bundling. Each event still carries one sample; the cadence
|
||||
//! controls how often the server flushes accumulated updates.
|
||||
//! Per `wwtools/mxaccesscli/docs/api-notes.md:97-100,138-140,154-157` (R2 in
|
||||
//! `design/70-risks-and-open-questions.md`), the `update_interval_ms` knob
|
||||
//! controls the **delivery cadence** — each emitted event still carries one
|
||||
//! sample, **not** a multi-sample payload. The returned [`mxaccess::Subscription`]
|
||||
//! is the same `Stream<Item = Result<DataChange, Error>>` as plain
|
||||
//! [`mxaccess::Session::subscribe`].
|
||||
//!
|
||||
//! `Session::subscribe_buffered` is currently `Err(Error::Unsupported)`
|
||||
//! pending the M6 buffered-mode RPC port. Once the surface lands the
|
||||
//! demo body below becomes ~10 lines using the same `Stream` interface
|
||||
//! as `subscribe.rs`.
|
||||
//! Drains up to 5 updates (or a 30s timeout, whichever first), prints each,
|
||||
//! then unsubscribes cleanly. Mirrors the `subscribe.rs` shape — see that
|
||||
//! example for the env-var contract and resolver shim design notes.
|
||||
|
||||
use mxaccess::{BufferedOptions, ConnectionOptions, Session};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use futures_util::StreamExt;
|
||||
use mxaccess::{
|
||||
BufferedOptions, GalaxyTagMetadata, RecoveryPolicy, Resolver, ResolverError, Session,
|
||||
SessionOptions,
|
||||
};
|
||||
use mxaccess_rpc::guid::Guid;
|
||||
use mxaccess_rpc::ntlm::NtlmClientContext;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
if std::env::var_os("MX_LIVE").is_none() {
|
||||
let Some(env) = LiveEnv::from_process()? else {
|
||||
eprintln!(
|
||||
"MX_LIVE not set — `subscribe-buffered` is the M6 placeholder; \
|
||||
run `. tools/Setup-LiveProbeEnv.ps1` to populate live env vars \
|
||||
once the buffered subscribe surface lands."
|
||||
"MX_LIVE not set — skipping live demo. Run \
|
||||
`. tools/Setup-LiveProbeEnv.ps1` to populate the required env vars."
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// The constructor itself is currently Unsupported (gated on M3 transport
|
||||
// selection wiring). When it lands, swap to `Session::connect_nmx` like
|
||||
// the other examples.
|
||||
let session = match Session::connect(ConnectionOptions).await {
|
||||
Ok(s) => s,
|
||||
Err(mxaccess::Error::Unsupported {
|
||||
operation,
|
||||
transport,
|
||||
}) => {
|
||||
eprintln!(
|
||||
"Session::connect / subscribe_buffered are deferred to M6: \
|
||||
{operation} on {transport:?} transport. See \
|
||||
design/followups.md for the buffered-subscribe gating note."
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
|
||||
let session = Session::connect_nmx(
|
||||
env.addr,
|
||||
SessionOptions::default(),
|
||||
NtlmClientContext::from_env()?,
|
||||
env.service_ipid,
|
||||
Arc::new(StaticResolver::new(&env.tag)),
|
||||
RecoveryPolicy::default(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let opts = BufferedOptions {
|
||||
update_interval_ms: 250,
|
||||
update_interval_ms: 1_000,
|
||||
};
|
||||
match session
|
||||
.subscribe_buffered("TestChildObject.TestInt", opts)
|
||||
.await
|
||||
{
|
||||
Ok(_) => {
|
||||
eprintln!(
|
||||
"subscribe_buffered returned a subscription unexpectedly — \
|
||||
check whether M6 has landed and update this example."
|
||||
);
|
||||
eprintln!(
|
||||
"buffered-subscribing to {} (requested cadence {} ms, rounded to {} ms)",
|
||||
env.tag,
|
||||
opts.update_interval_ms,
|
||||
opts.rounded_update_interval_ms()
|
||||
);
|
||||
let mut sub = session.subscribe_buffered(&env.tag, opts).await?;
|
||||
eprintln!("correlation_id = {:02x?}", sub.correlation_id());
|
||||
|
||||
let mut received = 0;
|
||||
while received < 5 {
|
||||
match tokio::time::timeout(Duration::from_secs(30), sub.next()).await {
|
||||
Ok(Some(Ok(dc))) => {
|
||||
println!("{} = {:?} ts={:?}", dc.reference, dc.value, dc.timestamp);
|
||||
received += 1;
|
||||
}
|
||||
Ok(Some(Err(e))) => {
|
||||
eprintln!("subscription error: {e}");
|
||||
break;
|
||||
}
|
||||
Ok(None) => {
|
||||
eprintln!("subscription stream ended");
|
||||
break;
|
||||
}
|
||||
Err(_) => {
|
||||
eprintln!("no update within 30s; exiting after {received} updates");
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(mxaccess::Error::Unsupported { operation, .. }) => {
|
||||
eprintln!("{operation}: deferred to M6 (see design/followups.md)");
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
}
|
||||
|
||||
session.unsubscribe(sub).await?;
|
||||
session.shutdown_nmx().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ---- shared boilerplate (see subscribe.rs / connect-write-read.rs for rationale) ----
|
||||
|
||||
struct LiveEnv {
|
||||
addr: std::net::SocketAddr,
|
||||
service_ipid: Guid,
|
||||
tag: String,
|
||||
}
|
||||
|
||||
impl LiveEnv {
|
||||
fn from_process() -> Result<Option<Self>, Box<dyn std::error::Error>> {
|
||||
if std::env::var_os("MX_LIVE").is_none() {
|
||||
return Ok(None);
|
||||
}
|
||||
let host = std::env::var("MX_NMX_HOST")?;
|
||||
let addr = parse_host_port(&host, 135)?;
|
||||
let service_ipid = Guid::parse_str(&std::env::var("MX_NMX_SERVICE_IPID")?)?;
|
||||
let tag = std::env::var("MX_TEST_TAG").unwrap_or_else(|_| "TestChildObject.TestInt".into());
|
||||
Ok(Some(Self {
|
||||
addr,
|
||||
service_ipid,
|
||||
tag,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_host_port(
|
||||
s: &str,
|
||||
default_port: u16,
|
||||
) -> Result<std::net::SocketAddr, Box<dyn std::error::Error>> {
|
||||
if let Ok(addr) = s.parse() {
|
||||
return Ok(addr);
|
||||
}
|
||||
let with_port = if s.contains(':') {
|
||||
s.to_string()
|
||||
} else {
|
||||
format!("{s}:{default_port}")
|
||||
};
|
||||
Ok(
|
||||
std::net::ToSocketAddrs::to_socket_addrs(&with_port.as_str())?
|
||||
.next()
|
||||
.ok_or("no addrs resolved")?,
|
||||
)
|
||||
}
|
||||
|
||||
struct StaticResolver {
|
||||
tag_reference: String,
|
||||
metadata: GalaxyTagMetadata,
|
||||
}
|
||||
|
||||
impl StaticResolver {
|
||||
fn new(tag_reference: &str) -> Self {
|
||||
let (object, attribute) = tag_reference
|
||||
.split_once('.')
|
||||
.unwrap_or((tag_reference, "TestInt"));
|
||||
Self {
|
||||
tag_reference: tag_reference.to_string(),
|
||||
metadata: GalaxyTagMetadata {
|
||||
object_tag_name: object.to_string(),
|
||||
attribute_name: attribute.to_string(),
|
||||
primitive_name: None,
|
||||
platform_id: 1,
|
||||
engine_id: 2,
|
||||
object_id: 3,
|
||||
primitive_id: 0,
|
||||
attribute_id: 7,
|
||||
property_id: GalaxyTagMetadata::VALUE_PROPERTY_ID,
|
||||
mx_data_type: 2,
|
||||
is_array: false,
|
||||
security_classification: 0,
|
||||
attribute_source: "dynamic".into(),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Resolver for StaticResolver {
|
||||
async fn resolve(&self, tag: &str) -> Result<GalaxyTagMetadata, ResolverError> {
|
||||
if tag == self.tag_reference {
|
||||
Ok(self.metadata.clone())
|
||||
} else {
|
||||
Err(ResolverError::NotFound {
|
||||
tag_reference: tag.to_string(),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -64,6 +64,7 @@ use tokio::sync::{mpsc, Mutex};
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
|
||||
use crate::metrics as session_metrics;
|
||||
use crate::transport_asb::AsbTransport;
|
||||
use crate::{BufferedOptions, ConnectionError, Error, TransportKind};
|
||||
|
||||
@@ -184,7 +185,10 @@ impl AsbSession {
|
||||
pub async fn read(&self, items: &[ItemIdentity]) -> Result<ReadResponse, Error> {
|
||||
let mut transport = self.inner.transport.lock().await;
|
||||
let client = transport.client_mut();
|
||||
client.read(items).await.map_err(map_client_error)
|
||||
let resp = client.read(items).await.map_err(map_client_error)?;
|
||||
// F40 — count successful ASB reads.
|
||||
session_metrics::record_asb_read();
|
||||
Ok(resp)
|
||||
}
|
||||
|
||||
/// `Write` — set the value of each item. `items.len()` should
|
||||
@@ -198,10 +202,13 @@ impl AsbSession {
|
||||
) -> Result<WriteResponse, Error> {
|
||||
let mut transport = self.inner.transport.lock().await;
|
||||
let client = transport.client_mut();
|
||||
client
|
||||
let resp = client
|
||||
.write(items, values, write_handle)
|
||||
.await
|
||||
.map_err(map_client_error)
|
||||
.map_err(map_client_error)?;
|
||||
// F40 — count successful ASB writes.
|
||||
session_metrics::record_asb_write();
|
||||
Ok(resp)
|
||||
}
|
||||
|
||||
/// `KeepAlive` — one-way heartbeat to keep the channel alive
|
||||
|
||||
+196
-11
@@ -30,6 +30,7 @@ pub use mxaccess_codec::{
|
||||
// ---- Public types --------------------------------------------------------
|
||||
|
||||
pub mod asb_session;
|
||||
pub(crate) mod metrics;
|
||||
pub mod session;
|
||||
pub mod transport_asb;
|
||||
|
||||
@@ -67,14 +68,60 @@ pub struct DataChange {
|
||||
/// Buffered subscription delivery — single-sample-per-event with a configurable
|
||||
/// flush cadence. **Not** multi-sample payload bundles per
|
||||
/// `wwtools/mxaccesscli/docs/api-notes.md:138-140` (R2 verified).
|
||||
///
|
||||
/// Retained as a marker type for back-compat; the wire path returns the
|
||||
/// same [`Subscription`] handle as plain [`Session::subscribe`] because
|
||||
/// the buffered cadence is a server-side delivery rate knob, not a
|
||||
/// payload-shape change. Each event still carries one
|
||||
/// [`DataChange`] sample.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct BufferedSubscription;
|
||||
|
||||
/// Configuration for [`Session::subscribe_buffered`].
|
||||
///
|
||||
/// The cadence is a server-side delivery rate knob (single-sample per
|
||||
/// event), **not** a multi-sample payload bundle. Verified at
|
||||
/// `wwtools/mxaccesscli/docs/api-notes.md:97-100,138-140,154-157`
|
||||
/// (R2 in `design/70-risks-and-open-questions.md`).
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct BufferedOptions {
|
||||
/// Requested delivery interval in milliseconds. The wire layer
|
||||
/// rounds this **up** to the nearest 100 ms — the same rounding the
|
||||
/// .NET reference applies in
|
||||
/// `MxNativeCompatibilityServer.SetBufferedUpdateInterval:638`
|
||||
/// (`((updateInterval + 99) / 100) * 100`). Values below 100 round
|
||||
/// up to 100; zero or negative requests are rejected by
|
||||
/// [`Session::subscribe_buffered`].
|
||||
pub update_interval_ms: u32,
|
||||
}
|
||||
|
||||
impl BufferedOptions {
|
||||
/// Round `update_interval_ms` **up** to the nearest 100 ms, mirroring
|
||||
/// `MxNativeCompatibilityServer.SetBufferedUpdateInterval`
|
||||
/// (`MxNativeCompatibilityServer.cs:638`):
|
||||
///
|
||||
/// ```text
|
||||
/// _bufferedUpdateIntervals[serverHandle] = ((updateInterval + 99) / 100) * 100;
|
||||
/// ```
|
||||
///
|
||||
/// Saturates rather than overflowing — `u32::MAX` rounds up to
|
||||
/// `u32::MAX` (the live LMX update intervals are far below this
|
||||
/// limit so the saturating branch is unreachable in practice; the
|
||||
/// helper is exposed for unit testing).
|
||||
#[must_use]
|
||||
pub const fn rounded_update_interval_ms(self) -> u32 {
|
||||
let v = self.update_interval_ms;
|
||||
// Saturating equivalent of `((v + 99) / 100) * 100`. The .NET
|
||||
// version uses `int` arithmetic; Rust's `u32` is wider than the
|
||||
// .NET `int` for the positive range we care about and saturates
|
||||
// explicitly to keep the helper total.
|
||||
match v.checked_add(99) {
|
||||
Some(plus) => (plus / 100) * 100,
|
||||
None => u32::MAX,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SecurityContext {
|
||||
pub current_user_id: i32,
|
||||
@@ -514,20 +561,52 @@ impl Session {
|
||||
})
|
||||
}
|
||||
|
||||
/// Buffered subscription with a delivery-cadence knob. Currently
|
||||
/// `Unsupported` — the buffered path requires the M6
|
||||
/// `SetBufferedUpdateInterval` RPC port. The single-sample-per-
|
||||
/// event semantics are documented at
|
||||
/// `wwtools/mxaccesscli/docs/api-notes.md:138-140`.
|
||||
/// Buffered subscription with a delivery-cadence knob.
|
||||
///
|
||||
/// Returns a [`Subscription`] yielding `Result<DataChange, Error>` —
|
||||
/// the **same item type** as plain [`Self::subscribe`]. Buffered is a
|
||||
/// **single-sample, cadence knob — not multi-sample payload**:
|
||||
/// `update_interval_ms` controls how often the AVEVA platform
|
||||
/// flushes the latest value, but each emitted event still carries
|
||||
/// one sample. Verified against
|
||||
/// `wwtools/mxaccesscli/docs/api-notes.md:97-100,138-140,154-157`
|
||||
/// (R2 in `design/70-risks-and-open-questions.md`).
|
||||
///
|
||||
/// ## Wire encoding
|
||||
///
|
||||
/// Mirrors `MxNativeSession.RegisterBufferedItemAsync`
|
||||
/// (`MxNativeSession.cs:272-310`): suffixes the item definition
|
||||
/// with `.property(buffer)` (via
|
||||
/// [`mxaccess_codec::NmxReferenceRegistrationMessage::to_buffered_item_definition`])
|
||||
/// and dispatches a single LMX `RegisterReference` (opcode `0x10`)
|
||||
/// with `subscribe = true`. No `AdviseSupervisory` follow-up is
|
||||
/// emitted — the server treats the subscribe-flagged
|
||||
/// `RegisterReference` as a supervisory advise (verified against
|
||||
/// `captures/082-frida-add-buffered-plain-advise-testint`).
|
||||
///
|
||||
/// `update_interval_ms` is rounded **up** to the nearest 100 ms, the
|
||||
/// same rounding the .NET reference applies in
|
||||
/// `MxNativeCompatibilityServer.SetBufferedUpdateInterval:638`. The
|
||||
/// rounded value is held client-side: native MXAccess does not emit
|
||||
/// a separate `SetBufferedUpdateInterval` RPC (verified by absence
|
||||
/// in the `079`/`082` captures — `mx.set-buffered-interval.begin/end`
|
||||
/// produces no NMX traffic).
|
||||
///
|
||||
/// # Errors
|
||||
/// - [`Error::Connection`] if the session is shut down.
|
||||
/// - [`Error::Configuration`] when `update_interval_ms == 0`
|
||||
/// (matches the .NET reference's
|
||||
/// `ArgumentOutOfRangeException` at
|
||||
/// `MxNativeCompatibilityServer.cs:630-633`), when the resolver
|
||||
/// rejects `reference`, or when the LMX server returns a
|
||||
/// non-zero HRESULT for the `RegisterReference` round-trip.
|
||||
/// - [`Error::Io`] / transport errors from the underlying RPC.
|
||||
pub async fn subscribe_buffered(
|
||||
&self,
|
||||
_reference: &str,
|
||||
_options: BufferedOptions,
|
||||
reference: &str,
|
||||
options: BufferedOptions,
|
||||
) -> Result<Subscription, Error> {
|
||||
Err(Error::Unsupported {
|
||||
operation: Cow::Borrowed("Session::subscribe_buffered (M6)"),
|
||||
transport: TransportKind::Nmx,
|
||||
})
|
||||
self.subscribe_buffered_nmx(reference, options).await
|
||||
}
|
||||
|
||||
/// Orderly shutdown with a wall-clock bound. Wraps
|
||||
@@ -616,6 +695,112 @@ fn mxvalue_to_writevalue(value: MxValue) -> Result<mxaccess_nmx::WriteValue, Err
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
// ---- BufferedOptions ------------------------------------------------
|
||||
|
||||
#[test]
|
||||
fn buffered_options_rounds_below_100_up_to_100() {
|
||||
// ((1 + 99) / 100) * 100 == 100 — same arithmetic as
|
||||
// MxNativeCompatibilityServer.cs:638.
|
||||
assert_eq!(
|
||||
BufferedOptions {
|
||||
update_interval_ms: 1,
|
||||
}
|
||||
.rounded_update_interval_ms(),
|
||||
100
|
||||
);
|
||||
assert_eq!(
|
||||
BufferedOptions {
|
||||
update_interval_ms: 99,
|
||||
}
|
||||
.rounded_update_interval_ms(),
|
||||
100
|
||||
);
|
||||
assert_eq!(
|
||||
BufferedOptions {
|
||||
update_interval_ms: 100,
|
||||
}
|
||||
.rounded_update_interval_ms(),
|
||||
100
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn buffered_options_rounds_partial_up() {
|
||||
// 101 → 200, 250 → 300, 999 → 1000.
|
||||
assert_eq!(
|
||||
BufferedOptions {
|
||||
update_interval_ms: 101,
|
||||
}
|
||||
.rounded_update_interval_ms(),
|
||||
200
|
||||
);
|
||||
assert_eq!(
|
||||
BufferedOptions {
|
||||
update_interval_ms: 250,
|
||||
}
|
||||
.rounded_update_interval_ms(),
|
||||
300
|
||||
);
|
||||
assert_eq!(
|
||||
BufferedOptions {
|
||||
update_interval_ms: 999,
|
||||
}
|
||||
.rounded_update_interval_ms(),
|
||||
1_000
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn buffered_options_preserves_exact_multiples_of_100() {
|
||||
for n in [200, 1_000, 5_000, 60_000] {
|
||||
assert_eq!(
|
||||
BufferedOptions {
|
||||
update_interval_ms: n,
|
||||
}
|
||||
.rounded_update_interval_ms(),
|
||||
n,
|
||||
"exact multiple of 100 should round to itself: {n}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn buffered_options_rounds_zero_to_zero() {
|
||||
// Zero is the explicit "rejected" value at
|
||||
// MxNativeCompatibilityServer.cs:630-633; the helper itself is
|
||||
// total (`Session::subscribe_buffered` does the validation), so
|
||||
// zero rounds to zero rather than panicking.
|
||||
assert_eq!(
|
||||
BufferedOptions {
|
||||
update_interval_ms: 0,
|
||||
}
|
||||
.rounded_update_interval_ms(),
|
||||
0
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn buffered_options_saturates_at_u32_max() {
|
||||
// u32::MAX + 99 would overflow; the helper saturates instead.
|
||||
let opts = BufferedOptions {
|
||||
update_interval_ms: u32::MAX,
|
||||
};
|
||||
assert_eq!(opts.rounded_update_interval_ms(), u32::MAX);
|
||||
}
|
||||
|
||||
/// Compile-time check that `Session::subscribe_buffered` returns the
|
||||
/// same `Subscription` type as plain `subscribe` (F36 DoD bullet 1
|
||||
/// plus bullet 3 — the API discoverably documents single-sample
|
||||
/// semantics by NOT introducing a separate `BufferedSubscription`-shaped
|
||||
/// return).
|
||||
#[allow(dead_code)]
|
||||
fn _subscribe_buffered_returns_subscription(
|
||||
s: &Session,
|
||||
opts: BufferedOptions,
|
||||
) -> impl std::future::Future<Output = Result<Subscription, Error>> + '_ {
|
||||
s.subscribe_buffered("ignored", opts)
|
||||
}
|
||||
|
||||
// ---- RecoveryPolicy ------------------------------------------------
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -0,0 +1,275 @@
|
||||
//! F40 — optional metrics emission via the [`metrics`] crate.
|
||||
//!
|
||||
//! Behind the `metrics` Cargo feature this module wires session-level
|
||||
//! counters / histograms / gauges into a downstream
|
||||
//! [`metrics::Recorder`]. With the feature off (the default), every
|
||||
//! function in this module compiles to an empty body — no `metrics`
|
||||
//! dep, no per-call overhead, no allocations.
|
||||
//!
|
||||
//! ## Design
|
||||
//!
|
||||
//! - Call sites in [`crate::session`] and [`crate::asb_session`] invoke
|
||||
//! the wrappers below **unconditionally**. The feature gate lives
|
||||
//! inside each wrapper, not at the call site, so the instrumentation
|
||||
//! stays out of the way of the protocol code.
|
||||
//! - Names follow `mxaccess.<scope>.<noun>` (dotted, lowercase) to
|
||||
//! match the convention recommended by the `metrics` crate's
|
||||
//! [naming guide](https://docs.rs/metrics/0.24/metrics/#naming-conventions).
|
||||
//! - All metric definitions are listed below so an operator wiring an
|
||||
//! exporter knows what to expect.
|
||||
//!
|
||||
//! ## Emitted metrics
|
||||
//!
|
||||
//! ### Counters
|
||||
//!
|
||||
//! - `mxaccess.session.writes` — incremented after each successful
|
||||
//! `Session::write_value` / `write_value_at` /
|
||||
//! `write_value_secured_at`. Label: `transport=nmx`.
|
||||
//! - `mxaccess.session.reads` — incremented when `Session::read`
|
||||
//! returns the first DataChange. Label: `transport=nmx`.
|
||||
//! - `mxaccess.session.advises` — incremented after each successful
|
||||
//! `Session::subscribe` (one per `AdviseSupervisory` round-trip).
|
||||
//! Label: `transport=nmx`.
|
||||
//! - `mxaccess.session.unadvises` — incremented after each successful
|
||||
//! `Session::unsubscribe`. Label: `transport=nmx`.
|
||||
//! - `mxaccess.session.recovery_attempts` — incremented on each
|
||||
//! `RecoveryEvent::Started` emission. Label: `transport=nmx`.
|
||||
//! - `mxaccess.session.recovery_successes` — incremented on each
|
||||
//! `RecoveryEvent::Recovered` emission. Label: `transport=nmx`.
|
||||
//! - `mxaccess.asb.writes` / `.reads` / `.publishes` — ASB-side
|
||||
//! counterparts (transport=`asb`). Currently `writes` and `reads`
|
||||
//! are wired; `publishes` is reserved for the streaming subscribe
|
||||
//! path (F40 follow-up).
|
||||
//!
|
||||
//! ### Histograms (seconds)
|
||||
//!
|
||||
//! - `mxaccess.session.write.latency_seconds` — wall time from
|
||||
//! `Session::write*` entry to a successful return (the round-trip
|
||||
//! that ends with the inner LMX `OperationComplete`).
|
||||
//! - `mxaccess.session.read.latency_seconds` — wall time from
|
||||
//! `Session::read` entry to first DataChange parsed.
|
||||
//! - `mxaccess.session.subscribe.first_data_change_seconds` —
|
||||
//! currently unused (reserved for a future first-DataChange
|
||||
//! instrumentation in `Subscription::poll_next`); kept in the
|
||||
//! spec for parity with the F40 design.
|
||||
//!
|
||||
//! ### Gauges
|
||||
//!
|
||||
//! - `mxaccess.session.connected` — `1` while a `Session` is
|
||||
//! connected, `0` after `shutdown_nmx`. Operators can sum across
|
||||
//! processes for a "live sessions" view.
|
||||
//! - `mxaccess.session.registered_items` — current size of the
|
||||
//! subscription registry (`SessionInner::subscriptions`); rises
|
||||
//! on `subscribe`, falls on `unsubscribe`.
|
||||
//! - `mxaccess.session.active_subscriptions` — alias of
|
||||
//! `registered_items` for now (kept distinct so a future
|
||||
//! "registered but unsubscribed" distinction can split them
|
||||
//! without a metric rename).
|
||||
//!
|
||||
//! ## Wiring an exporter
|
||||
//!
|
||||
//! ```ignore
|
||||
//! // In a downstream binary, behind the same feature flag:
|
||||
//! # #[cfg(feature = "metrics")] {
|
||||
//! let recorder = metrics_exporter_prometheus::PrometheusBuilder::new()
|
||||
//! .install_recorder()
|
||||
//! .unwrap();
|
||||
//! // ... later, scrape `/metrics` from `recorder.handle().render()`.
|
||||
//! # }
|
||||
//! ```
|
||||
//!
|
||||
//! No exporter is provided by this crate; pick one from the
|
||||
//! [`metrics-exporter-*`](https://crates.io/search?q=metrics-exporter)
|
||||
//! ecosystem (Prometheus, StatsD, OpenTelemetry, …).
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
// Static label slices reused at every call site. The `metrics` crate's
|
||||
// `counter!`/`histogram!`/`gauge!` macros accept `(&'static str, &'static str)`
|
||||
// pairs directly, but having the labels here keeps the call sites brief.
|
||||
#[cfg(feature = "metrics")]
|
||||
const TRANSPORT_NMX: (&str, &str) = ("transport", "nmx");
|
||||
#[cfg(feature = "metrics")]
|
||||
const TRANSPORT_ASB: (&str, &str) = ("transport", "asb");
|
||||
|
||||
// ---- Counters ------------------------------------------------------------
|
||||
|
||||
/// One successful NMX `Session::write*` round-trip.
|
||||
pub(crate) fn record_write() {
|
||||
#[cfg(feature = "metrics")]
|
||||
metrics::counter!("mxaccess.session.writes", &[TRANSPORT_NMX]).increment(1);
|
||||
}
|
||||
|
||||
/// One successful NMX `Session::read` (first DataChange returned).
|
||||
pub(crate) fn record_read() {
|
||||
#[cfg(feature = "metrics")]
|
||||
metrics::counter!("mxaccess.session.reads", &[TRANSPORT_NMX]).increment(1);
|
||||
}
|
||||
|
||||
/// One successful NMX `Session::subscribe` (`AdviseSupervisory` ack).
|
||||
pub(crate) fn record_advise() {
|
||||
#[cfg(feature = "metrics")]
|
||||
metrics::counter!("mxaccess.session.advises", &[TRANSPORT_NMX]).increment(1);
|
||||
}
|
||||
|
||||
/// One successful NMX `Session::unsubscribe` (`UnAdvise` ack).
|
||||
pub(crate) fn record_unadvise() {
|
||||
#[cfg(feature = "metrics")]
|
||||
metrics::counter!("mxaccess.session.unadvises", &[TRANSPORT_NMX]).increment(1);
|
||||
}
|
||||
|
||||
/// `RecoveryEvent::Started` emitted (one per recovery attempt).
|
||||
pub(crate) fn record_recovery_attempt() {
|
||||
#[cfg(feature = "metrics")]
|
||||
metrics::counter!("mxaccess.session.recovery_attempts", &[TRANSPORT_NMX]).increment(1);
|
||||
}
|
||||
|
||||
/// `RecoveryEvent::Recovered` emitted (rebuild + re-advise succeeded).
|
||||
pub(crate) fn record_recovery_success() {
|
||||
#[cfg(feature = "metrics")]
|
||||
metrics::counter!("mxaccess.session.recovery_successes", &[TRANSPORT_NMX]).increment(1);
|
||||
}
|
||||
|
||||
/// One successful ASB `AsbSession::write` round-trip.
|
||||
pub(crate) fn record_asb_write() {
|
||||
#[cfg(feature = "metrics")]
|
||||
metrics::counter!("mxaccess.asb.writes", &[TRANSPORT_ASB]).increment(1);
|
||||
}
|
||||
|
||||
/// One successful ASB `AsbSession::read` round-trip.
|
||||
pub(crate) fn record_asb_read() {
|
||||
#[cfg(feature = "metrics")]
|
||||
metrics::counter!("mxaccess.asb.reads", &[TRANSPORT_ASB]).increment(1);
|
||||
}
|
||||
|
||||
// ---- Histograms ----------------------------------------------------------
|
||||
|
||||
/// Wall-time of a successful NMX write.
|
||||
pub(crate) fn record_write_latency(elapsed: Duration) {
|
||||
#[cfg(feature = "metrics")]
|
||||
metrics::histogram!("mxaccess.session.write.latency_seconds", &[TRANSPORT_NMX])
|
||||
.record(elapsed.as_secs_f64());
|
||||
#[cfg(not(feature = "metrics"))]
|
||||
let _ = elapsed;
|
||||
}
|
||||
|
||||
/// Wall-time of a successful NMX read (subscribe → first DataChange).
|
||||
pub(crate) fn record_read_latency(elapsed: Duration) {
|
||||
#[cfg(feature = "metrics")]
|
||||
metrics::histogram!("mxaccess.session.read.latency_seconds", &[TRANSPORT_NMX])
|
||||
.record(elapsed.as_secs_f64());
|
||||
#[cfg(not(feature = "metrics"))]
|
||||
let _ = elapsed;
|
||||
}
|
||||
|
||||
// ---- Gauges --------------------------------------------------------------
|
||||
|
||||
/// Set the `connected` gauge to `connected as f64`.
|
||||
pub(crate) fn set_connected(connected: bool) {
|
||||
#[cfg(feature = "metrics")]
|
||||
metrics::gauge!("mxaccess.session.connected", &[TRANSPORT_NMX]).set(if connected {
|
||||
1.0
|
||||
} else {
|
||||
0.0
|
||||
});
|
||||
#[cfg(not(feature = "metrics"))]
|
||||
let _ = connected;
|
||||
}
|
||||
|
||||
/// Set the `registered_items` / `active_subscriptions` gauges to `count`.
|
||||
pub(crate) fn set_registered_items(count: usize) {
|
||||
#[cfg(feature = "metrics")]
|
||||
{
|
||||
let v = count as f64;
|
||||
metrics::gauge!("mxaccess.session.registered_items", &[TRANSPORT_NMX]).set(v);
|
||||
metrics::gauge!("mxaccess.session.active_subscriptions", &[TRANSPORT_NMX]).set(v);
|
||||
}
|
||||
#[cfg(not(feature = "metrics"))]
|
||||
let _ = count;
|
||||
}
|
||||
|
||||
// ---- Tests ---------------------------------------------------------------
|
||||
|
||||
#[cfg(all(test, feature = "metrics"))]
|
||||
#[allow(clippy::unwrap_used, clippy::panic, clippy::expect_used)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use metrics::{CounterFn, GaugeFn, HistogramFn};
|
||||
use metrics::{Key, Label, Recorder, SharedString, Unit};
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
|
||||
/// Minimal in-memory recorder that counts calls per metric name. We
|
||||
/// only need to verify a counter increments — full per-label
|
||||
/// aggregation is the exporter's job.
|
||||
#[derive(Default)]
|
||||
struct CountingRecorder {
|
||||
writes: Arc<AtomicU64>,
|
||||
}
|
||||
|
||||
struct CountingCounter(Arc<AtomicU64>);
|
||||
impl CounterFn for CountingCounter {
|
||||
fn increment(&self, value: u64) {
|
||||
self.0.fetch_add(value, Ordering::Relaxed);
|
||||
}
|
||||
fn absolute(&self, value: u64) {
|
||||
self.0.store(value, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
struct NoopHistogram;
|
||||
impl HistogramFn for NoopHistogram {
|
||||
fn record(&self, _: f64) {}
|
||||
}
|
||||
|
||||
struct NoopGauge;
|
||||
impl GaugeFn for NoopGauge {
|
||||
fn increment(&self, _: f64) {}
|
||||
fn decrement(&self, _: f64) {}
|
||||
fn set(&self, _: f64) {}
|
||||
}
|
||||
|
||||
impl Recorder for CountingRecorder {
|
||||
fn describe_counter(&self, _: metrics::KeyName, _: Option<Unit>, _: SharedString) {}
|
||||
fn describe_gauge(&self, _: metrics::KeyName, _: Option<Unit>, _: SharedString) {}
|
||||
fn describe_histogram(&self, _: metrics::KeyName, _: Option<Unit>, _: SharedString) {}
|
||||
|
||||
fn register_counter(&self, key: &Key, _: &metrics::Metadata<'_>) -> metrics::Counter {
|
||||
if key.name() == "mxaccess.session.writes" {
|
||||
metrics::Counter::from_arc(Arc::new(CountingCounter(self.writes.clone())))
|
||||
} else {
|
||||
metrics::Counter::noop()
|
||||
}
|
||||
}
|
||||
fn register_gauge(&self, _: &Key, _: &metrics::Metadata<'_>) -> metrics::Gauge {
|
||||
metrics::Gauge::from_arc(Arc::new(NoopGauge))
|
||||
}
|
||||
fn register_histogram(&self, _: &Key, _: &metrics::Metadata<'_>) -> metrics::Histogram {
|
||||
metrics::Histogram::from_arc(Arc::new(NoopHistogram))
|
||||
}
|
||||
}
|
||||
|
||||
/// DoD bullet 4: at least one metric counter increments under a
|
||||
/// unit test. We swap in the `CountingRecorder` for the duration
|
||||
/// of the test using `metrics::with_local_recorder` — this is
|
||||
/// the supported way to scope a recorder to a single test
|
||||
/// without poisoning the global slot.
|
||||
#[test]
|
||||
fn record_write_increments_session_writes_counter() {
|
||||
let recorder = CountingRecorder::default();
|
||||
let counter = recorder.writes.clone();
|
||||
|
||||
metrics::with_local_recorder(&recorder, || {
|
||||
record_write();
|
||||
record_write();
|
||||
record_write();
|
||||
});
|
||||
|
||||
assert_eq!(counter.load(Ordering::Relaxed), 3);
|
||||
|
||||
// Touch the unused-import warnings so the macro keeps Label /
|
||||
// Key in scope across `metrics` minor bumps.
|
||||
let _ = Label::new("k", "v");
|
||||
let _: Option<&Key> = None;
|
||||
}
|
||||
}
|
||||
@@ -33,7 +33,9 @@ use std::sync::Arc;
|
||||
use std::time::SystemTime;
|
||||
|
||||
use mxaccess_callback::{CallbackEvent, CallbackExporter, ExporterIdentities};
|
||||
use mxaccess_codec::{MxStatus, NmxSubscriptionMessage, NmxSubscriptionRecord};
|
||||
use mxaccess_codec::{
|
||||
MxStatus, NmxReferenceRegistrationMessage, NmxSubscriptionMessage, NmxSubscriptionRecord,
|
||||
};
|
||||
use mxaccess_galaxy::{GalaxyTagMetadata, Resolver, ResolverError};
|
||||
use mxaccess_nmx::{NmxClient, NmxClientError, WriteValue};
|
||||
use mxaccess_rpc::guid::Guid;
|
||||
@@ -47,6 +49,7 @@ use tokio::sync::{Mutex, broadcast};
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_stream::wrappers::BroadcastStream;
|
||||
|
||||
use crate::metrics as session_metrics;
|
||||
use crate::{DataChange, RecoveryEvent};
|
||||
use futures_util::Stream;
|
||||
|
||||
@@ -587,6 +590,10 @@ impl Session {
|
||||
|
||||
let (recovery_tx, _) = broadcast::channel(RECOVERY_BROADCAST_CAPACITY);
|
||||
|
||||
// F40 — gauge stays at 1 until shutdown_nmx flips it.
|
||||
session_metrics::set_connected(true);
|
||||
session_metrics::set_registered_items(0);
|
||||
|
||||
Ok(Self {
|
||||
inner: Arc::new(SessionInner {
|
||||
options,
|
||||
@@ -683,12 +690,16 @@ impl Session {
|
||||
|
||||
let mut last_error: Option<Error> = None;
|
||||
for attempt in 1..=policy.max_attempts {
|
||||
// F40 — increment attempt counter at Started emission.
|
||||
session_metrics::record_recovery_attempt();
|
||||
let _ = self
|
||||
.inner
|
||||
.recovery_tx
|
||||
.send(Arc::new(RecoveryEvent::Started { attempt }));
|
||||
match self.recover_connection_core(&factory).await {
|
||||
Ok(()) => {
|
||||
// F40 — increment success counter on Recovered.
|
||||
session_metrics::record_recovery_success();
|
||||
let _ = self
|
||||
.inner
|
||||
.recovery_tx
|
||||
@@ -859,6 +870,7 @@ impl Session {
|
||||
.await
|
||||
.map_err(map_resolver)?;
|
||||
let opts = &inner.options;
|
||||
let started = std::time::Instant::now();
|
||||
let mut nmx = inner.nmx.lock().await;
|
||||
let hr = nmx
|
||||
.write(
|
||||
@@ -874,6 +886,9 @@ impl Session {
|
||||
.await
|
||||
.map_err(map_nmx)?;
|
||||
ensure_hresult_ok(hr)?;
|
||||
// F40 — count + record latency only on the success path.
|
||||
session_metrics::record_write();
|
||||
session_metrics::record_write_latency(started.elapsed());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -900,6 +915,7 @@ impl Session {
|
||||
.await
|
||||
.map_err(map_resolver)?;
|
||||
let opts = &inner.options;
|
||||
let started = std::time::Instant::now();
|
||||
let mut nmx = inner.nmx.lock().await;
|
||||
let hr = nmx
|
||||
.write2(
|
||||
@@ -916,6 +932,10 @@ impl Session {
|
||||
.await
|
||||
.map_err(map_nmx)?;
|
||||
ensure_hresult_ok(hr)?;
|
||||
// F40 — write2 shares the writes counter (same Session::write*
|
||||
// family on the wire).
|
||||
session_metrics::record_write();
|
||||
session_metrics::record_write_latency(started.elapsed());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -951,6 +971,7 @@ impl Session {
|
||||
.await
|
||||
.map_err(map_resolver)?;
|
||||
let opts = &inner.options;
|
||||
let started = std::time::Instant::now();
|
||||
let mut nmx = inner.nmx.lock().await;
|
||||
let hr = nmx
|
||||
.write_secured2(
|
||||
@@ -970,6 +991,9 @@ impl Session {
|
||||
.await
|
||||
.map_err(map_nmx)?;
|
||||
ensure_hresult_ok(hr)?;
|
||||
// F40 — secured-write success counts toward the writes total.
|
||||
session_metrics::record_write();
|
||||
session_metrics::record_write_latency(started.elapsed());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1076,12 +1100,147 @@ impl Session {
|
||||
// replay AdviseSupervisory after a transport rebuild. Inserted
|
||||
// AFTER the wire AdviseSupervisory succeeds — failed advises
|
||||
// never enter the registry.
|
||||
inner.subscriptions.lock().await.insert(
|
||||
let registry_size = {
|
||||
let mut reg = inner.subscriptions.lock().await;
|
||||
reg.insert(
|
||||
correlation_id,
|
||||
SubscriptionEntry {
|
||||
metadata: Arc::clone(&metadata_arc),
|
||||
},
|
||||
);
|
||||
reg.len()
|
||||
};
|
||||
// F40 — count the advise + update the gauge under the same
|
||||
// ordering as the registry insert so the gauge value matches
|
||||
// what `recover_connection`'s snapshot would observe.
|
||||
session_metrics::record_advise();
|
||||
session_metrics::set_registered_items(registry_size);
|
||||
|
||||
Ok(Subscription {
|
||||
correlation_id,
|
||||
SubscriptionEntry {
|
||||
metadata: Arc::clone(&metadata_arc),
|
||||
},
|
||||
);
|
||||
reference: Arc::<str>::from(reference),
|
||||
metadata: metadata_arc,
|
||||
inbound,
|
||||
pending: std::collections::VecDeque::new(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Real implementation of [`Session::subscribe_buffered`] for the
|
||||
/// NMX transport. Mirrors `MxNativeSession.RegisterBufferedItemAsync`
|
||||
/// (`MxNativeSession.cs:272-310`).
|
||||
///
|
||||
/// Wire-side: builds a single LMX `RegisterReference` (opcode `0x10`)
|
||||
/// frame whose `item_definition` carries the `.property(buffer)`
|
||||
/// suffix and whose `subscribe` flag is `true`. The server treats
|
||||
/// this as a buffered supervisory advise — no separate
|
||||
/// `AdviseSupervisory` follow-up is needed (verified against
|
||||
/// `captures/082-frida-add-buffered-plain-advise-testint`, which
|
||||
/// shows exactly one `RegisterReference` and zero `AdviseSupervisory`
|
||||
/// frames between `mx.set-buffered-interval` and the first
|
||||
/// `OnBufferedDataChange`).
|
||||
///
|
||||
/// `update_interval_ms` is rounded up to the nearest 100 ms via
|
||||
/// [`crate::BufferedOptions::rounded_update_interval_ms`] and
|
||||
/// retained client-side. Native MXAccess does not emit a separate
|
||||
/// `SetBufferedUpdateInterval` RPC — the .NET reference's
|
||||
/// `MxNativeCompatibilityServer.SetBufferedUpdateInterval`
|
||||
/// (`cs:627-640`) only updates an in-memory dictionary; the rounding
|
||||
/// is the only behaviour preserved on the Rust port today (the
|
||||
/// rounded value is currently informational — a future iteration
|
||||
/// can expose it via a getter when a consumer needs it).
|
||||
pub(crate) async fn subscribe_buffered_nmx(
|
||||
&self,
|
||||
reference: &str,
|
||||
options: crate::BufferedOptions,
|
||||
) -> Result<Subscription, Error> {
|
||||
self.ensure_connected()?;
|
||||
if options.update_interval_ms == 0 {
|
||||
// Mirrors `MxNativeCompatibilityServer.cs:630-633` —
|
||||
// `ArgumentOutOfRangeException` for non-positive intervals.
|
||||
return Err(Error::Configuration(ConfigError::InvalidArgument {
|
||||
detail: "BufferedOptions::update_interval_ms must be positive".to_string(),
|
||||
}));
|
||||
}
|
||||
// Round up to nearest 100ms (cs:638). The rounded value is
|
||||
// computed for parity with the .NET reference; it is currently
|
||||
// not transmitted on the wire because native MXAccess holds it
|
||||
// client-side only (see capture 082's missing
|
||||
// `SetBufferedUpdateInterval` frame).
|
||||
let _rounded_ms = options.rounded_update_interval_ms();
|
||||
|
||||
let inner = self.inner.clone();
|
||||
let metadata = inner
|
||||
.resolver
|
||||
.resolve(reference)
|
||||
.await
|
||||
.map_err(map_resolver)?;
|
||||
let correlation_id: [u8; 16] = rand::random();
|
||||
|
||||
// Build the buffered RegisterReference body. Item definition is
|
||||
// the full reference suffixed with `.property(buffer)`; item
|
||||
// context is empty for this single-string form (the .NET
|
||||
// reference's split-context form is reachable via the
|
||||
// compat-server layer F35 once it lands). The codec helper
|
||||
// rejects empty/whitespace inputs with `CodecError::InvalidName`.
|
||||
let item_definition =
|
||||
NmxReferenceRegistrationMessage::to_buffered_item_definition(reference)
|
||||
.map_err(|e| {
|
||||
Error::Configuration(ConfigError::InvalidArgument {
|
||||
detail: format!("buffered item definition: {e}"),
|
||||
})
|
||||
})?;
|
||||
let registration = NmxReferenceRegistrationMessage {
|
||||
item_handle: 0,
|
||||
item_correlation_id: correlation_id,
|
||||
item_definition,
|
||||
item_context: String::new(),
|
||||
subscribe: true,
|
||||
reserved_25_27: [0; 2],
|
||||
reserved_31_55: [0; 24],
|
||||
};
|
||||
|
||||
let opts = &inner.options;
|
||||
// Subscribe to the broadcast BEFORE issuing the advise so updates
|
||||
// arriving immediately after don't slip the gap (same ordering
|
||||
// rationale as plain `subscribe`).
|
||||
let inbound = Box::pin(BroadcastStream::new(self.inner.callback_tx.subscribe()));
|
||||
|
||||
let mut nmx = inner.nmx.lock().await;
|
||||
let hr = nmx
|
||||
.register_reference(
|
||||
opts.local_engine_id,
|
||||
&metadata,
|
||||
®istration,
|
||||
opts.galaxy_id,
|
||||
/* source_galaxy_id */ i32::from(opts.galaxy_id),
|
||||
opts.source_platform_id,
|
||||
)
|
||||
.await
|
||||
.map_err(map_nmx)?;
|
||||
ensure_hresult_ok(hr)?;
|
||||
drop(nmx);
|
||||
|
||||
let metadata_arc = Arc::new(metadata);
|
||||
// Record the active subscription so recover_connection can replay
|
||||
// it after a transport rebuild. The replay path currently uses
|
||||
// `AdviseSupervisory` for every entry; for buffered subscriptions
|
||||
// that path is functionally equivalent (the LMX server already
|
||||
// remembers the buffered registration via the `.property(buffer)`
|
||||
// suffix carried in the metadata's name). Tracked as a sub-followup
|
||||
// — see `design/followups.md` if a future iteration wants to
|
||||
// re-issue `RegisterReference` instead.
|
||||
let registry_size = {
|
||||
let mut reg = inner.subscriptions.lock().await;
|
||||
reg.insert(
|
||||
correlation_id,
|
||||
SubscriptionEntry {
|
||||
metadata: Arc::clone(&metadata_arc),
|
||||
},
|
||||
);
|
||||
reg.len()
|
||||
};
|
||||
session_metrics::record_advise();
|
||||
session_metrics::set_registered_items(registry_size);
|
||||
|
||||
Ok(Subscription {
|
||||
correlation_id,
|
||||
@@ -1126,6 +1285,11 @@ impl Session {
|
||||
}));
|
||||
}
|
||||
|
||||
// F40 — clock the whole read-as-subscribe (including the
|
||||
// AdviseSupervisory round-trip) so the histogram captures the
|
||||
// same wall time a consumer would see.
|
||||
let started = std::time::Instant::now();
|
||||
|
||||
// Subscribe through the public path so the broadcast wiring +
|
||||
// AdviseSupervisory both run.
|
||||
let subscription = self.subscribe(reference).await?;
|
||||
@@ -1143,6 +1307,10 @@ impl Session {
|
||||
Ok(None) => Err(Error::Connection(ConnectionError::EngineNotRegistered)),
|
||||
Err(_elapsed) => Err(Error::Timeout(timeout)),
|
||||
};
|
||||
if result.is_ok() {
|
||||
session_metrics::record_read();
|
||||
session_metrics::record_read_latency(started.elapsed());
|
||||
}
|
||||
|
||||
// Best-effort unsubscribe. The .NET finally block at cs:351-358
|
||||
// ignores the return of Unsubscribe; mirror that — a failed
|
||||
@@ -1192,11 +1360,14 @@ impl Session {
|
||||
// We do this only on the success path — if UnAdvise itself
|
||||
// failed, the server may still hold the supervisory record and
|
||||
// a future recover_connection should re-issue the advise.
|
||||
inner
|
||||
.subscriptions
|
||||
.lock()
|
||||
.await
|
||||
.remove(&subscription.correlation_id);
|
||||
let registry_size = {
|
||||
let mut reg = inner.subscriptions.lock().await;
|
||||
reg.remove(&subscription.correlation_id);
|
||||
reg.len()
|
||||
};
|
||||
// F40 — count the unadvise + update the gauge.
|
||||
session_metrics::record_unadvise();
|
||||
session_metrics::set_registered_items(registry_size);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1219,6 +1390,11 @@ impl Session {
|
||||
{
|
||||
return Ok(());
|
||||
}
|
||||
// F40 — flip the connected gauge as soon as the atomic flips
|
||||
// so a concurrent scrape never sees connected=1 alongside a
|
||||
// shutdown-in-progress.
|
||||
session_metrics::set_connected(false);
|
||||
session_metrics::set_registered_items(0);
|
||||
|
||||
// 1. Unregister the engine on the wire first, while the NMX
|
||||
// transport is still live.
|
||||
|
||||
Reference in New Issue
Block a user