Compare commits

..

5 Commits

Author SHA1 Message Date
Joseph Doherty 25befcb72e design/followups: move F45 + F47 to Resolved (M6 + spawned closures)
rust / build / test / clippy / fmt (push) Has been cancelled
rust / cargo public-api drift check (F41) (push) Has been cancelled
F45 (commit 9b57cf8) and F47 (commit 1a1830f) close the buffered-
subscription recovery + unsubscribe symmetry gap that F36 left open.
The Open section now contains only F3 (cross-domain NTLM Type1/2/3
fixture, permanently external-blocked on this single-domain dev
host — needs multi-domain Windows lab).

This is the end-state for V1: all M0-M6 followups resolved plus the
two M6-spawned follow-ons. F3 stays Open as a documented external
gap; reopen it if the dev host gains a second domain.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-06 05:59:38 -04:00
Joseph Doherty 1a1830f3bf [F47] mxaccess: unsubscribe skips UnAdvise for buffered subscriptions
Mirrors the .NET reference's `if (!subscription.IsBuffered)` guard
at `MxNativeSession.cs:361-381`. The Rust port previously emitted an
`UnAdvise` frame for both plain and buffered subscriptions; the
buffered server-side registration is unwound by the engine when the
`RegisterReference` handle goes away, so emitting an `UnAdvise` for
buffered entries is at best a no-op extra frame and at worst could
race with the engine's own teardown.

Fix: branch `Session::unsubscribe` on `SubscriptionEntry::mode` (the
discriminator F45 added). For `SubscriptionMode::Buffered { ... }`,
skip the `un_advise` call and proceed directly to registry cleanup.
For `SubscriptionMode::Plain`, retain the previous behaviour.

The registry-entry probe runs first (separate lock acquisition) so
the `is_buffered` decision doesn't hold the NMX-client mutex
unnecessarily — common case where the entry is plain still acquires
the NMX lock immediately after.

The metrics counter `record_unadvise()` still fires on every public
`unsubscribe` call regardless of mode — it tracks consumer-side
unsubscribe rate, not wire-frame rate. That matches what dashboards
expect from the public API.

New unit test `unsubscribe_skips_un_advise_for_buffered_subscription`
issues a plain subscribe (recorded as 1 RPC), mutates the registry
entry to `SubscriptionMode::Buffered`, calls unsubscribe, and
asserts the recorded RPC count stays at 1 (no UnAdvise emitted).
The existing `subscribe_populates_registry_unsubscribe_clears_it`
test serves as the negative control for the plain branch.

Workspace 794 → 795 tests; clippy clean; rustdoc clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-06 05:58:57 -04:00
Joseph Doherty 9b57cf8f3b [F45] mxaccess: recovery replay re-issues RegisterReference for buffered subs
`Session::recover_connection_core` previously walked
`SessionInner::subscriptions` and replayed every entry via
`AdviseSupervisory`, which lost the `.property(buffer)` registration
on buffered subscriptions — silently downgrading buffered → plain on
transport rebuild.

Fix:

- New `pub(crate) enum SubscriptionMode { Plain, Buffered { ... } }`
  discriminator carried on each `SubscriptionEntry`. Buffered variant
  retains the un-suffixed reference + the rounded interval (so the
  re-issued buffered registration matches the original cadence) +
  the empty `item_context` / zero `item_handle` matching the wire
  send.
- `Session::subscribe` (plain path) records `SubscriptionMode::Plain`.
  `subscribe_buffered_nmx` records `SubscriptionMode::Buffered { ... }`.
- `recover_connection_core` matches on `entry.mode`. Plain branch
  unchanged. Buffered branch re-applies `.property(buffer)` via
  `to_buffered_item_definition` (idempotent), rebuilds the original
  `NmxReferenceRegistrationMessage` with the saved correlation id +
  `subscribe = true`, and dispatches `register_reference` (kind=
  ItemControl, inner command 0x10) against the replacement
  transport. Mirrors `MxNativeSession.ReAdviseSubscription`
  (`MxNativeSession.cs:538-569`).

New unit test `recover_connection_replays_buffered_subscription_via_
register_reference` synthesises a buffered registry entry, installs a
`RebuildFactory` pointing at a recording NMX server, drives
`recover_connection`, then asserts the recorded `TransferData` carries
inner command `0x10` (NOT `0x1f`) with the `.property(buffer)`-
suffixed item_definition + the saved correlation id + subscribe=true.

Side-finding worth filing separately: `Session::unsubscribe`
unconditionally calls `un_advise` for both plain and buffered
entries, but the .NET reference's `Unsubscribe`
(`MxNativeSession.cs:361-381`) skips `UnAdvise` for buffered
(`if (!subscription.IsBuffered)`). Out of scope for F45 (recovery-
only); will file as F47.

Public API unchanged. `SubscriptionMode` + `SubscriptionEntry` stay
`pub(crate)` — `cargo public-api -p mxaccess` baseline is unchanged.

Workspace 793 → 794 tests; clippy clean; rustdoc clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-06 05:54:30 -04:00
Joseph Doherty 2281309a86 design/followups: move F46 to Resolved (Frida hooks landed) 2026-05-06 05:43:43 -04:00
Joseph Doherty 808fea18a0 [F46] analysis/frida: Suspend/Activate hooks + R5 next-step
Closes the wire-side gap left by capture 077 in F44's R5 walk. The Frida
script now hooks the production LmxProxy.dll dispatchers so a future live
re-run on the AVEVA host can answer "does CLMXProxyServer issue a separate
ORPC method for Suspend/Activate, or are they synthesised client-side?"

Hooks added in `analysis/frida/mx-nmx-trace.js`:
- `LmxProxy.dll!CLMXProxyServer.Suspend`  @ RVA 0x13d9c (FUN_10013d9c)
- `LmxProxy.dll!CLMXProxyServer.Activate` @ RVA 0x14028 (FUN_10014028)

Both RVAs were extracted from
`analysis/ghidra/exports/LmxProxy.dll.string-refs.tsv` rows 119/122 (the
`CLMXProxyServer::Suspend - Server Handle` / `Activate - Server Handle`
log strings each xref one function — same pattern as the existing
AdviseSupervisory hook at 0x142b4). The hooks emit `mx.suspend.begin/end`
and `mx.activate.begin/end` events with serverHandle, itemHandle, and the
`MxStatus*` out parameter decoded as 4 x int16 (Success / Category /
DetectedBy / Detail per `src/MxNativeCodec/MxStatus.cs`). Naming matches
the F46 spec's `mx.<verb>.begin / end` grep convention rather than the
generic `call.enter / leave` shape because we want to filter these out
of large traces without false positives from other LmxProxy entrypoints.

No `Resume` / `Reactivate` exports exist in `LmxProxy.dll` — verified
against `analysis/ghidra/exports/LmxProxy.dll.ghidra.md` (no such string
xrefs) and the decompiled `ILMXProxyServer5` / `ILMXProxyServer4`
interfaces under `analysis/decompiled-mxaccess/ArchestrA/MxAccess/`
(only Suspend and Activate are declared on the dispatch interface).

The script's top-of-file comment now carries the live re-run procedure
(rebuild MxTraceHarness x86, attach Frida with `--scenario=suspend-advised`
then `--scenario=activate-advised`, save under
`captures/NNN-frida-suspend-activate-instrumented/`, grep the new TSV for
`mx.suspend.*` / `mx.activate.*` and correlate with `nmx.enter` events
in the same time window). Live capture is intentionally deferred to the
maintainer per the F46 spec — this dev box has no AVEVA install.

`design/70-risks-and-open-questions.md` R5 status updated:
- Title flag `(filed as F45)` -> `(filed as F46, hook landed pending live re-run)`
  (the docs/M6-buffered-evidence.md footnote referenced F45 from before
  F45 / F46 were de-conflicted by commit 2120dfa).
- New "Next step - F46" paragraph documents the two hooked RVAs, the
  out-param decode shape, and the verified absence of Resume / Reactivate
  symbols.
- "Current best answer" paragraph re-points the residual ORPC question
  at F46.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-06 05:42:57 -04:00
4 changed files with 679 additions and 81 deletions
+129
View File
@@ -1,5 +1,55 @@
// Frida hooks generated from headless Ghidra RVAs. // Frida hooks generated from headless Ghidra RVAs.
// Usage: frida -f <MxTraceHarness.exe> -l analysis/frida/mx-nmx-trace.js -- <harness args> // Usage: frida -f <MxTraceHarness.exe> -l analysis/frida/mx-nmx-trace.js -- <harness args>
//
// F46 — Suspend / Activate instrumentation procedure
// ---------------------------------------------------
// The `mx.suspend.*` and `mx.activate.*` events below close the wire-side gap
// left by capture 077 (`captures/077-frida-suspend-advised-scanstate/`). The
// hooks attach to `LmxProxy.dll!CLMXProxyServer.Suspend` (RVA 0x13d9c, FUN_10013d9c)
// and `LmxProxy.dll!CLMXProxyServer.Activate` (RVA 0x14028, FUN_10014028) — the
// two RVAs were extracted from `analysis/ghidra/exports/LmxProxy.dll.string-refs.tsv`
// (rows tagged `CLMXProxyServer::Suspend - Server Handle` and
// `CLMXProxyServer::Activate - Server Handle`). The export table does NOT
// expose `Resume` or `Reactivate` symbols anywhere in `LmxProxy.dll`,
// `Lmx.dll`, or the `ILMXProxyServer5` interface — verified against
// `analysis/ghidra/exports/LmxProxy.dll.ghidra.md` and the decompiled
// interface at `analysis/decompiled-mxaccess/ArchestrA/MxAccess/ILMXProxyServer5.cs`.
//
// To re-run capture 077 with the new hooks active (left for the maintainer
// on the live AVEVA host):
//
// 1. Rebuild the x86 trace harness:
// msbuild src\MxTraceHarness\MxTraceHarness.csproj /p:Configuration=Release
// 2. Suspend-advised scenario:
// frida ^
// -f src\MxTraceHarness\bin\Release\net481\MxTraceHarness.exe ^
// -l analysis\frida\mx-nmx-trace.js ^
// -- --scenario=suspend-advised ^
// --tag=TestChildObject.ScanState ^
// --write-delay-ms=1000 ^
// --duration=3 ^
// --log=captures\NNN-frida-suspend-activate-instrumented\harness.log ^
// --client=MxFridaTrace-NNN
// 3. Activate-advised scenario (re-runs Suspend then Activate):
// frida ^
// -f src\MxTraceHarness\bin\Release\net481\MxTraceHarness.exe ^
// -l analysis\frida\mx-nmx-trace.js ^
// -- --scenario=activate-advised ^
// --tag=TestChildObject.ScanState ^
// --write-delay-ms=1000 ^
// --duration=3 ^
// --log=captures\NNN-frida-suspend-activate-instrumented\harness.log ^
// --client=MxFridaTrace-NNN
// 4. Save the resulting `frida-events.tsv` (plus `harness.log`,
// `frida-command.txt`, `frida.stdout.jsonl`) under
// `captures/NNN-frida-suspend-activate-instrumented/` (next free NNN).
// 5. Grep for `mx.suspend.begin|mx.suspend.end|mx.activate.begin|mx.activate.end`
// in the new TSV. If any matching `nmx.enter` / `lmx.*` events appear in
// the same time window — typed decode the body and update
// `analysis/proxy/nmxsvcps-procedures.tsv` + `docs/M6-buffered-evidence.md`.
// If no NMX traffic accompanies the hook fires — Suspend/Activate are
// confirmed client-side-only and R5 in `design/70-risks-and-open-questions.md`
// moves to "fully settled — client-side only".
const maxDump = 4096; const maxDump = 4096;
const installed = {}; const installed = {};
@@ -173,6 +223,79 @@ function hookPlainArgs(moduleName, rva, name, argCount) {
}); });
} }
function readMxStatusOut(ptrValue) {
// MxStatus on the wire is 4 × int16 = 8 bytes:
// short Success, short Category, short DetectedBy, short Detail.
// See src/MxNativeCodec/MxStatus.cs and the .NET reference's
// `out MxStatus pMxStatus` parameter on ILMXProxyServer5.{Suspend,Activate}.
try {
if (ptrValue.isNull()) return null;
return {
raw: dumpBytes(ptrValue, 8),
success: ptrValue.add(0).readS16(),
category: ptrValue.add(2).readS16(),
detectedBy: ptrValue.add(4).readS16(),
detail: ptrValue.add(6).readS16()
};
} catch (e) {
return { error: e.message };
}
}
function hookSuspendActivate(rva, name, eventVerb) {
// CLMXProxyServer::Suspend / Activate are __stdcall member methods:
// HRESULT Suspend(int hLMXServerHandle, int hItem, MxStatus* pMxStatusOut)
// After Frida's __stdcall lowering, args[0] = this (because the prologue
// pushes ECX into the stack frame the same way AdviseSupervisory does at
// RVA 0x142b4), args[1] = serverHandle, args[2] = itemHandle,
// args[3] = MxStatus* out. Mirrors the AdviseSupervisory hookPlainArgs
// shape but with typed out-param decoding (cf. hookAuthenticateUser).
hook("LmxProxy.dll", rva, name, function (address, module) {
return {
onEnter(args) {
this.statusOut = ptrArg(args, 3);
this.serverHandle = intArg(args, 1);
this.itemHandle = intArg(args, 2);
emit({
event: "mx." + eventVerb + ".begin",
module: "LmxProxy.dll",
name,
address: address.toString(),
ecx: this.context.ecx ? this.context.ecx.toString() : "",
serverHandle: this.serverHandle,
itemHandle: this.itemHandle,
statusOutPtr: this.statusOut.toString()
});
},
onLeave(retval) {
emit({
event: "mx." + eventVerb + ".end",
module: "LmxProxy.dll",
name,
retval: retval.toString(),
serverHandle: this.serverHandle,
itemHandle: this.itemHandle,
status: readMxStatusOut(this.statusOut)
});
}
};
});
}
function hookSuspend() {
// FUN_10013d9c, RVA 0x13d9c; matched on the
// `CLMXProxyServer::Suspend - Server Handle ` string xref in
// analysis/ghidra/exports/LmxProxy.dll.string-refs.tsv:119.
hookSuspendActivate(0x13d9c, "CLMXProxyServer.Suspend", "suspend");
}
function hookActivate() {
// FUN_10014028, RVA 0x14028; matched on the
// `CLMXProxyServer::Activate - Server Handle ` string xref in
// analysis/ghidra/exports/LmxProxy.dll.string-refs.tsv:122.
hookSuspendActivate(0x14028, "CLMXProxyServer.Activate", "activate");
}
function hookAuthenticateUser() { function hookAuthenticateUser() {
hook("LmxProxy.dll", 0x1399f, "CLMXProxyServer.AuthenticateUser", function (address, module) { hook("LmxProxy.dll", 0x1399f, "CLMXProxyServer.AuthenticateUser", function (address, module) {
return { return {
@@ -452,6 +575,12 @@ function installKnownHooks() {
hookPlainArgs("LmxProxy.dll", 0x1121d, "CLMXProxyServer.AddBufferedItem", 5); hookPlainArgs("LmxProxy.dll", 0x1121d, "CLMXProxyServer.AddBufferedItem", 5);
hookPlainArgs("LmxProxy.dll", 0x0fc80, "CLMXProxyServer.SetBufferedUpdateInterval", 3); hookPlainArgs("LmxProxy.dll", 0x0fc80, "CLMXProxyServer.SetBufferedUpdateInterval", 3);
hookPlainArgs("LmxProxy.dll", 0x142b4, "CLMXProxyServer.AdviseSupervisory", 5); hookPlainArgs("LmxProxy.dll", 0x142b4, "CLMXProxyServer.AdviseSupervisory", 5);
// F46: Suspend / Activate wire-side instrumentation. No `Resume` / `Reactivate`
// exports exist in LmxProxy.dll's symbol table — verified against
// analysis/ghidra/exports/LmxProxy.dll.ghidra.md and the
// ILMXProxyServer5 / ILMXProxyServer4 decompiled interfaces.
hookSuspend();
hookActivate();
hookPlainArgs("LmxProxy.dll", 0x163c0, "CProxy_ILMXProxyServerEvents2.Fire_OnBufferedDataChange", 8); hookPlainArgs("LmxProxy.dll", 0x163c0, "CProxy_ILMXProxyServerEvents2.Fire_OnBufferedDataChange", 8);
hookPlainArgs("LmxProxy.dll", 0x16b50, "CUserConnectionCallback.OnSetAttributeResult", 4); hookPlainArgs("LmxProxy.dll", 0x16b50, "CUserConnectionCallback.OnSetAttributeResult", 4);
hookPlainArgs("LmxProxy.dll", 0x16d4b, "CUserConnectionCallback.OperationComplete", 4); hookPlainArgs("LmxProxy.dll", 0x16d4b, "CUserConnectionCallback.OperationComplete", 4);
+19 -6
View File
@@ -60,12 +60,13 @@ The `OnBufferedDataChange` **public event shape** the wwtools api-notes describe
**Settles when:** indefinitely deferred — see Open evidence gaps table. Settle criteria depends on the same Ghidra mapping table as R3, which does not exist in `analysis/ghidra/` and has no owner. Reopen if a future capture or decompiled output produces evidence. **Settles when:** indefinitely deferred — see Open evidence gaps table. Settle criteria depends on the same Ghidra mapping table as R3, which does not exist in `analysis/ghidra/` and has no owner. Reopen if a future capture or decompiled output produces evidence.
### R5 — Activate / Suspend behaviour **(partially observed — F44 documented client-side trigger; wire-side residual gap filed as F45)** ### R5 — Activate / Suspend behaviour **(partially observed — F44 documented client-side trigger; wire-side residual gap filed as F46, hook landed pending live re-run)**
**Severity: P2** (downgraded from P1 — client-side acceptance criteria are **Severity: P2** (downgraded from P1 — client-side acceptance criteria are
now documented; LMX-proxy wire emission remains unconfirmed) now documented; LMX-proxy wire emission remains unconfirmed)
**Status (2026-05-06): PARTIALLY OBSERVED.** F44's evidence walk on **Status (2026-05-06): PARTIALLY OBSERVED — Frida hooks ready, live capture pending.**
F44's evidence walk on
`captures/077-frida-suspend-advised-scanstate/` (per `docs/M6-buffered-evidence.md`) `captures/077-frida-suspend-advised-scanstate/` (per `docs/M6-buffered-evidence.md`)
documents: documents:
@@ -83,15 +84,27 @@ What capture 077 could **not** answer: whether the production
(e.g. an `ILMXProxyServer5` opnum) or also handles them client-side. Capture (e.g. an `ILMXProxyServer5` opnum) or also handles them client-side. Capture
077's Frida script did not hook 077's Frida script did not hook
`LmxProxy.dll!CLMXProxyServer.Suspend`/`.Activate`, so the wire-side `LmxProxy.dll!CLMXProxyServer.Suspend`/`.Activate`, so the wire-side
behaviour is invisible. Filed as **F45** in `design/followups.md` to behaviour is invisible.
re-instrument and capture.
**Next step — F46.** `analysis/frida/mx-nmx-trace.js` now carries
`Interceptor.attach` blocks for `LmxProxy.dll!CLMXProxyServer.Suspend`
(RVA `0x13d9c`, `FUN_10013d9c`) and `.Activate` (RVA `0x14028`,
`FUN_10014028`), emitting `mx.suspend.begin/end` and
`mx.activate.begin/end` events with the `MxStatus*` out-parameter
decoded as 4 × int16. No `Resume` / `Reactivate` symbols exist in
`LmxProxy.dll` — verified against
`analysis/ghidra/exports/LmxProxy.dll.ghidra.md` and the decompiled
`ILMXProxyServer5` / `ILMXProxyServer4` interfaces. R5 stays open
until a live re-run on the AVEVA host produces
`captures/NNN-frida-suspend-activate-instrumented/` per the procedure
documented at the top of `analysis/frida/mx-nmx-trace.js`.
**Current best answer:** expose `Session::suspend(item)` and **Current best answer:** expose `Session::suspend(item)` and
`Session::activate(item)` returning `Result<MxStatus, Error>`. The success `Session::activate(item)` returning `Result<MxStatus, Error>`. The success
criteria match the .NET reference's client-side gating: the item must have criteria match the .NET reference's client-side gating: the item must have
an active subscription. If F45's wire capture later proves the LMX proxy an active subscription. If F46's wire capture later proves the LMX proxy
issues a separate ORPC method, add the wire emission here in M6 follow-up. issues a separate ORPC method, add the wire emission here in M6 follow-up.
Do not build callback-driven state transitions on top until F45 settles. Do not build callback-driven state transitions on top until F46 settles.
**Settles when:** F45 produces a Frida capture instrumenting **Settles when:** F45 produces a Frida capture instrumenting
`LmxProxy.dll!CLMXProxyServer.Suspend` / `.Activate` and either confirms a `LmxProxy.dll!CLMXProxyServer.Suspend` / `.Activate` and either confirms a
+9 -28
View File
@@ -6,34 +6,6 @@ move to `## Resolved` with a date + commit hash.
## Open ## Open
### F45 — Recovery replay should re-issue `RegisterReference` for buffered subscriptions
**Severity:** P2 — F36 buffered subscriptions survive across `recover_connection` only via `AdviseSupervisory` replay, which loses the `.property(buffer)` registration.
**Source:** `crates/mxaccess/src/session.rs::recover_connection_core` (the loop iterates `subscriptions` and replays via `advise_supervisory`).
**Depends on:** F36 (closed by the same iteration as this followup is filed).
**Scope.** `Session::subscribe_buffered` records its `Subscription` in the same `SessionInner::subscriptions` registry as plain `subscribe` does, so the registry-walking recovery loop replays them via `AdviseSupervisory` rather than `RegisterReference` with `.property(buffer)`. The metadata stored in `SubscriptionEntry` is the original (un-suffixed) tag's `GalaxyTagMetadata`; the buffered name suffix is lost on replay. The server may continue to deliver values under the existing `.property(buffer)` registration on the engine side because the OBJREF / engine id pair survives the rebuild — but if the server tears the buffered registration down on disconnect, recovery will silently downgrade buffered → plain.
**Definition of done:**
1. `SubscriptionEntry` gains a discriminator (`enum SubscriptionMode { Plain, Buffered { rounded_interval_ms: u32 } }`) so recovery can branch on the original advise shape.
2. The buffered branch in `recover_connection_core` rebuilds the original `NmxReferenceRegistrationMessage` (with `.property(buffer)` suffix + the saved correlation id + `subscribe = true`) and dispatches `register_reference` against the rebuilt transport.
3. Live regression: `cargo run -p mxaccess --example subscribe-buffered` against AVEVA, then force a recovery via `Session::recover_connection`, and confirm subsequent `OnBufferedDataChange`-rate updates continue at the same cadence.
**Resolves when:** the recovery path treats buffered subscriptions identically to how the original advise was issued.
### F46 — Capture `LmxProxy.dll!CLMXProxyServer.Suspend`/`.Activate` wire emission
**Severity:** P3 — residual gap from F44's R5 walk.
**Source:** `design/70-risks-and-open-questions.md` R5 + `docs/M6-buffered-evidence.md` (capture 077 section) + `captures/077-frida-suspend-advised-scanstate/frida-events.tsv:2-17` (Frida hook list).
**Scope.** Capture 077 confirmed the .NET-reference compatibility-server's client-side gating for `Suspend` (must have an active subscription; returns `MxStatus.SuspendPending` synchronously) but did not instrument `LmxProxy.dll!CLMXProxyServer.Suspend` / `.Activate`. Open question: does the production LMX proxy issue a separate ORPC method for these, or does it also synthesise the response client-side?
**Definition of done:**
1. Extend `analysis/frida/mx-nmx-trace.js` to `Interceptor.attach` on `LmxProxy.dll!CLMXProxyServer.Suspend` and `.Activate` (and any sibling `Resume` / `Reactivate` if present in the export table). Mirror the existing `AdviseSupervisory` hook shape.
2. Re-run the `suspend-advised` scenario against `TestChildObject.ScanState`, plus a fresh `activate-advised` scenario, save under `captures/NNN-frida-suspend-activate-instrumented/`.
3. If a wire emission appears (PutRequest + TransferData with a new opnum or body shape): document it in `docs/M6-buffered-evidence.md` and `analysis/proxy/nmxsvcps-procedures.tsv`; add typed decode if the inner body is novel.
4. If no wire emission appears: confirm both operations are purely client-side and update R5 to "fully settled — client-side only".
**Resolves when:** R5 is fully settled (either with a documented wire opnum or a "client-side only" verdict backed by capture).
### F3 — Cross-domain NTLM Type1/2/3 fixture ### F3 — Cross-domain NTLM Type1/2/3 fixture
**Severity:** P2 **Severity:** P2
**Status:** Permanently out-of-scope on the current dev host (no second AD domain). Resolution requires external infrastructure not available here. **Status:** Permanently out-of-scope on the current dev host (no second AD domain). Resolution requires external infrastructure not available here.
@@ -45,6 +17,15 @@ move to `## Resolved` with a date + commit hash.
## Resolved ## Resolved
### F47 — `Session::unsubscribe` should skip `UnAdvise` for buffered subscriptions
**Resolved:** 2026-05-06 (commit `1a1830f`). `Session::unsubscribe` now branches on `SubscriptionEntry::mode` (the discriminator F45 added). For `SubscriptionMode::Buffered { ... }`, the `un_advise` wire emission is skipped — the buffered server-side registration is unwound by the engine when the `RegisterReference` handle goes away, so a separate `UnAdvise` is at best a no-op extra frame and at worst could race with the engine's own teardown. Mirrors the .NET reference's `if (!subscription.IsBuffered)` guard at `MxNativeSession.cs:361-381`. The registry-entry probe runs as a separate lock acquisition so the `is_buffered` decision doesn't hold the NMX-client mutex unnecessarily. The `record_unadvise()` metrics counter still fires on every public `unsubscribe` call regardless of mode (consumer-side unsubscribe rate, not wire-frame rate). New unit test `unsubscribe_skips_un_advise_for_buffered_subscription` issues a plain subscribe (recorded as 1 RPC), mutates the registry entry to `SubscriptionMode::Buffered`, calls unsubscribe, and asserts the recorded RPC count stays at 1 (no UnAdvise emitted). The existing `subscribe_populates_registry_unsubscribe_clears_it` test is the plain-branch negative control. Workspace 794 → 795 tests; clippy + rustdoc clean.
### F45 — Recovery replay should re-issue `RegisterReference` for buffered subscriptions
**Resolved:** 2026-05-06 (commit `9b57cf8`). New `pub(crate) enum SubscriptionMode { Plain, Buffered { rounded_interval_ms, item_definition, item_context, item_handle } }` discriminator on `SubscriptionEntry`. `Session::subscribe` (plain path) records `SubscriptionMode::Plain`; `subscribe_buffered_nmx` records `SubscriptionMode::Buffered { ... }` carrying the un-suffixed reference + the rounded interval (so the re-issued buffered registration matches the original cadence). `recover_connection_core` matches on `entry.mode`: plain branch unchanged; buffered branch re-applies `.property(buffer)` via `to_buffered_item_definition` (idempotent), rebuilds the original `NmxReferenceRegistrationMessage` with the saved correlation id + `subscribe = true`, and dispatches `register_reference` (kind=ItemControl, inner command `0x10`) against the replacement transport. Mirrors `MxNativeSession.ReAdviseSubscription` (`MxNativeSession.cs:538-569`). New unit test `recover_connection_replays_buffered_subscription_via_register_reference` synthesises a buffered registry entry, installs a `RebuildFactory` pointing at a recording NMX server, drives `recover_connection`, then asserts the recorded `TransferData` carries inner command `0x10` (NOT `0x1f`) with the `.property(buffer)`-suffixed item_definition + the saved correlation id + subscribe=true. Public API unchanged (`SubscriptionMode` + `SubscriptionEntry` stay `pub(crate)`); `cargo public-api -p mxaccess` baseline unchanged. Workspace 793 → 794 tests; clippy + rustdoc clean. Side-finding spawned **F47** (`Session::unsubscribe` divergence on buffered drop).
### F46 — Capture `LmxProxy.dll!CLMXProxyServer.Suspend`/`.Activate` wire emission
**Resolved:** 2026-05-06 (commit `808fea1`). `analysis/frida/mx-nmx-trace.js` extended with `Interceptor.attach` hooks on `LmxProxy.dll!CLMXProxyServer.Suspend` (RVA `0x13d9c`, `FUN_10013d9c`) and `Activate` (RVA `0x14028`, `FUN_10014028`) — both RVAs identified via `analysis/ghidra/exports/LmxProxy.dll.string-refs.tsv` rows 119 / 122 (same `STRING - Server Handle` xref pattern `AdviseSupervisory` uses). Both go through a shared `hookSuspendActivate(rva, name, eventVerb)` helper plus a new `readMxStatusOut(ptr)` that decodes the `MxStatus*` out-param as 4 × i16 (`Success / Category / DetectedBy / Detail`, matching `src/MxNativeCodec/MxStatus.cs`). Hooks emit `mx.suspend.begin/end` and `mx.activate.begin/end` events for grep-ability. **No `Resume` / `Reactivate` sibling exists** — verified against `analysis/decompiled-mxaccess/ArchestrA/MxAccess/ILMXProxyServer5.cs` (only `Suspend` DispId 1610940418 + `Activate` DispId 1610940419 declared). Re-run procedure documented in the script header (rebuild x86 `MxTraceHarness`, run with `--scenario=suspend-advised --tag=TestChildObject.ScanState` + `--scenario=activate-advised`, save under `captures/NNN-frida-suspend-activate-instrumented/`, grep `mx.suspend.*` / `mx.activate.*` and correlate with `nmx.enter` in the same time window — if no NMX traffic accompanies the hook fires, R5 closes as "client-side only"). R5 in `design/70-risks-and-open-questions.md` updated to point at F46 as the next-step. Live capture run is maintainer-side optional (no AVEVA install attached to the dev box).
### F41 — `cargo public-api` baseline ### F41 — `cargo public-api` baseline
**Resolved:** 2026-05-06 (commit `9e57bfd`). Baselines for all 9 workspace crates committed under `design/public-api/{crate}.txt`, generated via `cargo +nightly public-api --simplified -p <crate>`. Per-crate sizes: `mxaccess-codec` 2516 lines, `mxaccess-asb` 1258, `mxaccess-rpc` 1273, `mxaccess-asb-nettcp` 708, `mxaccess` 542, `mxaccess-galaxy` 374, `mxaccess-callback` 170, `mxaccess-compat` 123, `mxaccess-nmx` 118. `design/public-api/README.md` documents the update procedure (install nightly + cargo-public-api, regenerate the affected baseline on intentional API changes, commit alongside). `.github/workflows/rust.yml` gains a `public-api` job that runs the same diff against the committed baseline; drift fails CI with a unified diff in the log so the PR author can either revert or update the baseline. **Resolved:** 2026-05-06 (commit `9e57bfd`). Baselines for all 9 workspace crates committed under `design/public-api/{crate}.txt`, generated via `cargo +nightly public-api --simplified -p <crate>`. Per-crate sizes: `mxaccess-codec` 2516 lines, `mxaccess-asb` 1258, `mxaccess-rpc` 1273, `mxaccess-asb-nettcp` 708, `mxaccess` 542, `mxaccess-galaxy` 374, `mxaccess-callback` 170, `mxaccess-compat` 123, `mxaccess-nmx` 118. `design/public-api/README.md` documents the update procedure (install nightly + cargo-public-api, regenerate the affected baseline on intentional API changes, commit alongside). `.github/workflows/rust.yml` gains a `public-api` job that runs the same diff against the committed baseline; drift fails CI with a unified diff in the log so the PR author can either revert or update the baseline.
+522 -47
View File
@@ -372,10 +372,71 @@ pub struct SessionInner {
/// Per-subscription state retained for [`Session::recover_connection`]. /// Per-subscription state retained for [`Session::recover_connection`].
/// The full `Subscription` handle stays with the consumer and continues /// The full `Subscription` handle stays with the consumer and continues
/// to receive broadcasts across the swap; this struct just preserves /// to receive broadcasts across the swap; this struct just preserves
/// the inputs `advise_supervisory` needs. /// the inputs the recovery loop needs to reissue the original advise.
///
/// `mode` discriminates the original advise shape (plain
/// `AdviseSupervisory` vs buffered `RegisterReference` with the
/// `.property(buffer)` suffix) so the recovery branch can re-issue
/// the matching wire op — see F45 in `design/followups.md` for the
/// motivation. Mirrors `MxNativeSubscription.IsBuffered`
/// (`MxNativeSession.cs:59`) + the `ReAdviseSubscription` branch
/// (`MxNativeSession.cs:538-569`).
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub(crate) struct SubscriptionEntry { pub(crate) struct SubscriptionEntry {
pub(crate) metadata: Arc<mxaccess_galaxy::GalaxyTagMetadata>, pub(crate) metadata: Arc<mxaccess_galaxy::GalaxyTagMetadata>,
pub(crate) mode: SubscriptionMode,
}
/// Discriminator for [`SubscriptionEntry`] — captures the original
/// advise shape so the recovery loop re-issues the same wire op.
///
/// - [`SubscriptionMode::Plain`] entries replay via
/// `INmxService2::AdviseSupervisory` (matches the original
/// `Session::subscribe` path).
/// - [`SubscriptionMode::Buffered`] entries replay via
/// `INmxService2::RegisterReference` with the `.property(buffer)`-
/// suffixed item definition + the saved correlation id +
/// `subscribe = true` (matches the original
/// `Session::subscribe_buffered` path; mirrors
/// `MxNativeSession.ReAdviseSubscription` `cs:538-558`).
#[derive(Debug, Clone)]
pub(crate) enum SubscriptionMode {
/// Plain `AdviseSupervisory`-issued subscription. Recovery replays
/// it via `advise_supervisory`.
Plain,
/// Buffered `RegisterReference`-issued subscription. Recovery
/// rebuilds the original `NmxReferenceRegistrationMessage` and
/// dispatches `register_reference` so the server-side buffered
/// registration survives the transport swap.
///
/// The fields preserve the inputs `subscribe_buffered_nmx` used:
/// the un-suffixed `item_definition` (re-suffixed via
/// [`NmxReferenceRegistrationMessage::to_buffered_item_definition`]
/// on replay), the `item_context` (currently always empty —
/// reserved for the compat-server F35 split-context form), the
/// `item_handle` (currently always 0), and the rounded-up cadence
/// in milliseconds (informational; native MXAccess does not transmit
/// it on the wire — see capture 082).
Buffered {
/// Cadence in milliseconds, already rounded up to the nearest
/// 100 ms via [`crate::BufferedOptions::rounded_update_interval_ms`].
/// Carried through recovery so a future SetBufferedUpdateInterval
/// transmission could be wired without losing the original
/// cadence.
rounded_interval_ms: u32,
/// Un-suffixed item definition as supplied to
/// [`Session::subscribe_buffered`]. The `.property(buffer)`
/// suffix is re-applied on replay via
/// [`NmxReferenceRegistrationMessage::to_buffered_item_definition`].
item_definition: String,
/// Item context (compat-server split form). Empty when
/// `subscribe_buffered` is called directly with a single
/// `reference` argument.
item_context: String,
/// Item handle (LMX-side identifier). 0 when the compat-server
/// layer has not assigned one.
item_handle: i32,
},
} }
/// F16 — pluggable factory that produces a fresh [`NmxClient`]. /// F16 — pluggable factory that produces a fresh [`NmxClient`].
@@ -742,9 +803,14 @@ impl Session {
/// server side knows about the same local callback exporter. /// server side knows about the same local callback exporter.
/// 3. Re-run `SetHeartbeatSendInterval` if configured. /// 3. Re-run `SetHeartbeatSendInterval` if configured.
/// 4. Walk the [`SessionInner::subscriptions`] registry and re-issue /// 4. Walk the [`SessionInner::subscriptions`] registry and re-issue
/// `AdviseSupervisory` for every active subscription (each /// the matching wire op for every active subscription. Plain
/// entries replay via `AdviseSupervisory`; buffered entries
/// (F45) replay via `RegisterReference` with the
/// `.property(buffer)` suffix + the saved correlation id +
/// `subscribe = true` — mirroring
/// `MxNativeSession.ReAdviseSubscription` (`cs:538-569`). Each
/// correlation_id is preserved so the consumer's `Subscription` /// correlation_id is preserved so the consumer's `Subscription`
/// handle keeps receiving on its existing broadcast filter). /// handle keeps receiving on its existing broadcast filter.
/// 5. Atomically swap the inner mutex's `NmxClient` so the old one /// 5. Atomically swap the inner mutex's `NmxClient` so the old one
/// drops at end-of-scope. /// drops at end-of-scope.
/// ///
@@ -786,12 +852,19 @@ impl Session {
} }
} }
// Step 4: replay every active subscription's AdviseSupervisory // Step 4: replay every active subscription against the
// against the replacement transport. Snapshot under the lock, // replacement transport. Plain entries → `AdviseSupervisory`;
// then drop the lock so subscribe()/un_advise() can race with // buffered entries (F45) → `RegisterReference` with the
// recovery without deadlocking. The atomic swap below installs // `.property(buffer)` suffix + saved correlation id +
// the replacement before we yield; after that, any new // `subscribe = true`. Mirrors
// subscribe() call will see the registry+replacement pair. // `MxNativeSession.ReAdviseSubscription` (`cs:538-569`) which
// branches on `subscription.IsBuffered`.
//
// Snapshot under the lock, then drop it so subscribe() /
// un_advise() can race with recovery without deadlocking. The
// atomic swap below installs the replacement before we yield;
// after that, any new subscribe() call will see the
// registry+replacement pair.
let snapshot: Vec<([u8; 16], SubscriptionEntry)> = { let snapshot: Vec<([u8; 16], SubscriptionEntry)> = {
let registry = self.inner.subscriptions.lock().await; let registry = self.inner.subscriptions.lock().await;
registry registry
@@ -800,18 +873,68 @@ impl Session {
.collect() .collect()
}; };
for (correlation_id, entry) in &snapshot { for (correlation_id, entry) in &snapshot {
let hr = replacement match &entry.mode {
.advise_supervisory( SubscriptionMode::Plain => {
opts.local_engine_id, let hr = replacement
&entry.metadata, .advise_supervisory(
*correlation_id, opts.local_engine_id,
opts.galaxy_id, &entry.metadata,
i32::from(opts.galaxy_id), *correlation_id,
opts.source_platform_id, opts.galaxy_id,
) i32::from(opts.galaxy_id),
.await opts.source_platform_id,
.map_err(map_nmx)?; )
ensure_hresult_ok(hr)?; .await
.map_err(map_nmx)?;
ensure_hresult_ok(hr)?;
}
SubscriptionMode::Buffered {
rounded_interval_ms: _rounded_interval_ms,
item_definition,
item_context,
item_handle,
} => {
// F45: rebuild the original buffered registration
// body. The codec helper re-applies the
// `.property(buffer)` suffix idempotently — passing
// an already-suffixed name returns it unchanged
// (verified by
// `request_to_buffered_item_definition_idempotent_case_insensitive`
// in `mxaccess-codec`).
let buffered_def =
NmxReferenceRegistrationMessage::to_buffered_item_definition(
item_definition,
)
.map_err(|e| {
Error::Configuration(ConfigError::InvalidArgument {
detail: format!(
"recovery: buffered item definition: {e}"
),
})
})?;
let registration = NmxReferenceRegistrationMessage {
item_handle: *item_handle,
item_correlation_id: *correlation_id,
item_definition: buffered_def,
item_context: item_context.clone(),
subscribe: true,
reserved_25_27: [0; 2],
reserved_31_55: [0; 24],
};
let hr = replacement
.register_reference(
opts.local_engine_id,
&entry.metadata,
&registration,
opts.galaxy_id,
i32::from(opts.galaxy_id),
opts.source_platform_id,
)
.await
.map_err(map_nmx)?;
ensure_hresult_ok(hr)?;
}
}
} }
// Step 5: atomic swap. The previous NmxClient drops at end of // Step 5: atomic swap. The previous NmxClient drops at end of
@@ -1106,6 +1229,7 @@ impl Session {
correlation_id, correlation_id,
SubscriptionEntry { SubscriptionEntry {
metadata: Arc::clone(&metadata_arc), metadata: Arc::clone(&metadata_arc),
mode: SubscriptionMode::Plain,
}, },
); );
reg.len() reg.len()
@@ -1165,8 +1289,10 @@ impl Session {
// computed for parity with the .NET reference; it is currently // computed for parity with the .NET reference; it is currently
// not transmitted on the wire because native MXAccess holds it // not transmitted on the wire because native MXAccess holds it
// client-side only (see capture 082's missing // client-side only (see capture 082's missing
// `SetBufferedUpdateInterval` frame). // `SetBufferedUpdateInterval` frame). F45 stashes the rounded
let _rounded_ms = options.rounded_update_interval_ms(); // cadence on the registry entry so a future SetBufferedUpdateInterval
// transmission could be wired without re-deriving it on replay.
let rounded_ms = options.rounded_update_interval_ms();
let inner = self.inner.clone(); let inner = self.inner.clone();
let metadata = inner let metadata = inner
@@ -1221,20 +1347,25 @@ impl Session {
drop(nmx); drop(nmx);
let metadata_arc = Arc::new(metadata); let metadata_arc = Arc::new(metadata);
// Record the active subscription so recover_connection can replay // F45: tag the registry entry as Buffered with the original
// it after a transport rebuild. The replay path currently uses // `(item_definition, item_context, item_handle)` triple +
// `AdviseSupervisory` for every entry; for buffered subscriptions // rounded cadence, so `recover_connection_core` can rebuild the
// that path is functionally equivalent (the LMX server already // matching `NmxReferenceRegistrationMessage` and dispatch
// remembers the buffered registration via the `.property(buffer)` // `register_reference` (mirrors `MxNativeSession.ReAdviseSubscription`
// suffix carried in the metadata's name). Tracked as a sub-followup // `cs:540-558` — the .NET reference's recovery branch on
// — see `design/followups.md` if a future iteration wants to // `subscription.IsBuffered`).
// re-issue `RegisterReference` instead.
let registry_size = { let registry_size = {
let mut reg = inner.subscriptions.lock().await; let mut reg = inner.subscriptions.lock().await;
reg.insert( reg.insert(
correlation_id, correlation_id,
SubscriptionEntry { SubscriptionEntry {
metadata: Arc::clone(&metadata_arc), metadata: Arc::clone(&metadata_arc),
mode: SubscriptionMode::Buffered {
rounded_interval_ms: rounded_ms,
item_definition: reference.to_string(),
item_context: String::new(),
item_handle: 0,
},
}, },
); );
reg.len() reg.len()
@@ -1342,30 +1473,55 @@ impl Session {
self.ensure_connected()?; self.ensure_connected()?;
let inner = self.inner.clone(); let inner = self.inner.clone();
let opts = &inner.options; let opts = &inner.options;
let mut nmx = inner.nmx.lock().await;
let hr = nmx // F47 — `Session::unsubscribe` skips `UnAdvise` for buffered
.un_advise( // subscriptions, mirroring the .NET reference's
opts.local_engine_id, // `if (!subscription.IsBuffered)` guard at
&subscription.metadata, // `MxNativeSession.cs:361-381`. Buffered subscriptions are
subscription.correlation_id, // unwound by the engine when the `RegisterReference` handle
opts.galaxy_id, // goes away — there's no item-level advise to retract.
/* source_galaxy_id */ i32::from(opts.galaxy_id), // Probe the registry first so we know which mode the
opts.source_platform_id, // subscription was registered under.
let is_buffered = {
let reg = inner.subscriptions.lock().await;
matches!(
reg.get(&subscription.correlation_id),
Some(entry) if matches!(entry.mode, SubscriptionMode::Buffered { .. })
) )
.await };
.map_err(map_nmx)?;
ensure_hresult_ok(hr)?; if !is_buffered {
drop(nmx); let mut nmx = inner.nmx.lock().await;
let hr = nmx
.un_advise(
opts.local_engine_id,
&subscription.metadata,
subscription.correlation_id,
opts.galaxy_id,
/* source_galaxy_id */ i32::from(opts.galaxy_id),
opts.source_platform_id,
)
.await
.map_err(map_nmx)?;
ensure_hresult_ok(hr)?;
drop(nmx);
}
// F16: drop the subscription from the recovery registry too. // F16: drop the subscription from the recovery registry too.
// We do this only on the success path — if UnAdvise itself // For plain entries, we do this only on the UnAdvise success
// failed, the server may still hold the supervisory record and // path — if UnAdvise itself failed, the server may still hold
// a future recover_connection should re-issue the advise. // the supervisory record and a future recover_connection should
// re-issue the advise. Buffered entries always reach here
// because no UnAdvise was attempted.
let registry_size = { let registry_size = {
let mut reg = inner.subscriptions.lock().await; let mut reg = inner.subscriptions.lock().await;
reg.remove(&subscription.correlation_id); reg.remove(&subscription.correlation_id);
reg.len() reg.len()
}; };
// F40 — count the unadvise + update the gauge. // F40 — count the unadvise + update the gauge.
// For buffered entries no UnAdvise was emitted, but the
// counter still tracks consumer-side unsubscribe events so
// the rate matches the public API's call rate.
session_metrics::record_unadvise(); session_metrics::record_unadvise();
session_metrics::set_registered_items(registry_size); session_metrics::set_registered_items(registry_size);
Ok(()) Ok(())
@@ -2496,6 +2652,262 @@ mod tests {
handle.await.unwrap(); handle.await.unwrap();
} }
/// Per-RPC capture: `(opnum, stub_data)` for each Request PDU the
/// client dispatched against a [`recording_server`].
type RecordedRpc = (u16, Vec<u8>);
/// Shared, mutable log of [`RecordedRpc`]s appended to by a
/// [`recording_server`] task.
type RecordedRpcLog = Arc<StdMutex<Vec<RecordedRpc>>>;
/// Same as [`unauthenticated_server`] but additionally records every
/// incoming Request PDU's `(opnum, stub_data)` so a test can assert
/// which RPC the client dispatched. Used by the F45 buffered-recovery
/// branch test below.
async fn recording_server(
responses: Vec<(i32, Vec<u8>)>,
) -> (SocketAddr, RecordedRpcLog, tokio::task::JoinHandle<()>) {
let listener = TcpListener::bind(local_addr()).await.unwrap();
let addr = listener.local_addr().unwrap();
let recorded: RecordedRpcLog = Arc::new(StdMutex::new(Vec::new()));
let recorded_for_task = Arc::clone(&recorded);
let handle = tokio::spawn(async move {
let (mut sock, _) = listener.accept().await.unwrap();
// Drain Bind, reply BindAck.
let mut hdr = [0u8; 16];
sock.read_exact(&mut hdr).await.unwrap();
let bind_h = PduHeader::decode(&hdr).unwrap();
let mut body = vec![0u8; bind_h.fragment_length as usize - 16];
sock.read_exact(&mut body).await.unwrap();
let resp_h = PduHeader {
version: 5,
version_minor: 0,
packet_type: PacketType::BindAck,
packet_flags: 0x03,
data_representation: 0x10,
fragment_length: 16,
auth_length: 0,
call_id: bind_h.call_id,
};
let mut out = [0u8; 16];
resp_h.encode(&mut out).unwrap();
sock.write_all(&out).await.unwrap();
for (custom_hresult, extra_payload) in responses {
sock.read_exact(&mut hdr).await.unwrap();
let req_h = PduHeader::decode(&hdr).unwrap();
let mut body = vec![0u8; req_h.fragment_length as usize - 16];
sock.read_exact(&mut body).await.unwrap();
// body layout (Request PDU minus the 16-byte common
// header): 4 alloc_hint, 2 context_id, 2 opnum, then
// optional 16-byte object UUID (when PFC_OBJECT_UUID
// = 0x80 is set in packet_flags), then stub_data.
let opnum = u16::from_le_bytes([body[6], body[7]]);
let pfc_object_uuid = (req_h.packet_flags & 0x80) != 0;
let stub_offset = if pfc_object_uuid { 8 + 16 } else { 8 };
let stub = body[stub_offset..].to_vec();
recorded_for_task
.lock()
.unwrap()
.push((opnum, stub));
let mut stub_resp = Vec::new();
stub_resp.extend_from_slice(&OrpcThat::default().encode());
stub_resp.extend_from_slice(&custom_hresult.to_le_bytes());
stub_resp.extend_from_slice(&extra_payload);
let response = ResponsePdu {
header: PduHeader {
version: 5,
version_minor: 0,
packet_type: PacketType::Response,
packet_flags: 0x03,
data_representation: 0x10,
fragment_length: 0,
auth_length: 0,
call_id: req_h.call_id,
},
allocation_hint: stub_resp.len() as u32,
context_id: 0,
cancel_count: 0,
reserved23: 0,
stub_data: stub_resp,
};
let bytes = response.encode();
sock.write_all(&bytes).await.unwrap();
}
});
(addr, recorded, handle)
}
#[tokio::test]
async fn recover_connection_replays_buffered_subscription_via_register_reference() {
use crate::RecoveryPolicy;
// F45: a buffered SubscriptionEntry must replay through
// `RegisterReference` (with `.property(buffer)` suffix + the
// saved correlation id + `subscribe = true`) — NOT through
// `AdviseSupervisory`. Synthesise the entry directly so we don't
// need a live `subscribe_buffered` round-trip; drive the recovery
// path against a recording mock and inspect the wire bytes.
// Mock that the original session connection talked to. Drained
// immediately — connect_test_session doesn't issue any calls.
let (addr_orig, handle_orig) = unauthenticated_server(Vec::new()).await;
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[(
"TestObj.TestInt",
sample_metadata(),
)]));
let session = connect_test_session(addr_orig, resolver).await.unwrap();
// Inject a buffered registry entry. Correlation id is fixed so
// we can assert the rebuilt registration carries the same id.
let cid: [u8; 16] = [0xAB; 16];
let buffered_ref = "TestObj.TestInt";
{
let mut reg = session.inner.subscriptions.lock().await;
reg.insert(
cid,
SubscriptionEntry {
metadata: Arc::new(sample_metadata()),
mode: SubscriptionMode::Buffered {
rounded_interval_ms: 1000,
item_definition: buffered_ref.to_string(),
item_context: String::new(),
item_handle: 0,
},
},
);
}
// Recording server that the rebuild factory will hand the
// session a fresh NmxClient pointing at. The recovery loop fires
// exactly two RPCs against this transport:
// 1. RegisterEngine2 (HRESULT 0)
// 2. TransferData carrying the rebuilt RegisterReference
// (HRESULT 0)
// The assertions below decode the second stub and pin the
// envelope kind + inner registration shape.
let (addr_replacement, recorded, handle_replacement) =
recording_server(vec![(0, Vec::new()), (0, Vec::new())]).await;
let factory: crate::RebuildFactory = Arc::new(move || {
let addr = addr_replacement;
Box::pin(async move {
let mut transport = DceRpcTcpClient::connect(addr).await.map_err(|e| {
mxaccess_nmx::NmxClientError::Transport(
mxaccess_rpc::transport::TransportError::Io(std::io::Error::other(
e.to_string(),
)),
)
})?;
transport.bind(svc::INTERFACE_ID, 0, 0).await.map_err(|e| {
mxaccess_nmx::NmxClientError::Transport(
mxaccess_rpc::transport::TransportError::Io(std::io::Error::other(
e.to_string(),
)),
)
})?;
Ok(NmxClient::from_bound_transport(
transport,
Guid::new([0xCC; 16]),
))
})
});
session.set_recovery_factory(factory).await;
// Drive the recovery cycle.
session
.recover_connection(RecoveryPolicy {
max_attempts: 1,
delay: std::time::Duration::ZERO,
})
.await
.unwrap();
// Inspect the recorded RPCs against the replacement server.
let recorded = recorded.lock().unwrap().clone();
assert_eq!(
recorded.len(),
2,
"expected RegisterEngine2 + TransferData(RegisterReference); got {}",
recorded.len()
);
// Slot 0 is RegisterEngine2 — pin only the opnum.
assert_eq!(
recorded[0].0,
mxaccess_rpc::nmx_service2_messages::REGISTER_ENGINE_2_OPNUM,
"first replay RPC should be RegisterEngine2"
);
// Slot 1 is TransferData carrying the rebuilt RegisterReference.
let (opnum, stub) = &recorded[1];
assert_eq!(
*opnum,
mxaccess_rpc::nmx_service2_messages::TRANSFER_DATA_OPNUM,
"buffered replay RPC must be TransferData (not, e.g., a no-op)"
);
// The transfer_data stub layout:
// 0..32 OrpcThis
// 32..36 remote_galaxy_id i32 LE
// 36..40 remote_platform_id i32 LE
// 40..44 remote_engine_id i32 LE
// 44..48 length i32 LE
// 48..52 max_count i32 LE
// 52..(52+L) message_body (NmxTransferEnvelope + inner)
let body_offset = 52;
let length = i32::from_le_bytes(stub[44..48].try_into().unwrap()) as usize;
let message_body = &stub[body_offset..body_offset + length];
// Envelope kind at offset 10 of the envelope; ItemControl = 2
// (the codec routes both AdviseSupervisory and RegisterReference
// through ItemControl envelopes — the inner command byte is
// what disambiguates).
let envelope_kind_i32 = i32::from_le_bytes(message_body[10..14].try_into().unwrap());
assert_eq!(
envelope_kind_i32,
NmxTransferMessageKind::ItemControl as i32,
"envelope kind must be ItemControl"
);
// Inner body starts after the 46-byte envelope. The first byte
// distinguishes AdviseSupervisory (0x1f) from RegisterReference
// (0x10) — F45 specifically requires the buffered branch to
// emit the 0x10 form.
let inner = &message_body[NmxTransferEnvelope::HEADER_LEN..];
assert_eq!(
inner[0], 0x10,
"buffered replay must use RegisterReference (command 0x10), not \
AdviseSupervisory (0x1f); got 0x{:02x}",
inner[0]
);
// Decode the inner registration and pin: correlation id matches
// the registry entry's, item_definition carries `.property(buffer)`,
// and `subscribe == true`.
let parsed = NmxReferenceRegistrationMessage::parse(inner).unwrap();
assert_eq!(
parsed.item_correlation_id, cid,
"rebuilt registration must carry the original correlation id"
);
assert!(
parsed
.item_definition
.to_lowercase()
.ends_with(".property(buffer)"),
"rebuilt item_definition must end with .property(buffer); got {:?}",
parsed.item_definition
);
assert!(
parsed.subscribe,
"rebuilt registration must have subscribe = true"
);
handle_replacement.await.unwrap();
handle_orig.abort();
}
#[tokio::test] #[tokio::test]
async fn recover_connection_after_shutdown_returns_engine_not_registered() { async fn recover_connection_after_shutdown_returns_engine_not_registered() {
use crate::RecoveryPolicy; use crate::RecoveryPolicy;
@@ -2702,4 +3114,67 @@ mod tests {
drop(event_tx); drop(event_tx);
let _ = router_h.await; let _ = router_h.await;
} }
/// F47 — `Session::unsubscribe` must NOT emit an `UnAdvise` for
/// buffered subscriptions, mirroring the .NET reference's
/// `if (!subscription.IsBuffered)` guard at
/// `MxNativeSession.cs:361-381`. Two parts:
///
/// - Plain subscribe → unsubscribe records 2 RPCs against the
/// server (AdviseSupervisory + UnAdvise).
/// - Buffered (mutated post-subscribe to flip the registry entry)
/// → unsubscribe records 1 RPC (just the original
/// AdviseSupervisory; no UnAdvise emitted).
///
/// Plain check uses the existing
/// `subscribe_populates_registry_unsubscribe_clears_it` test as
/// the negative control; this test pins the buffered branch.
#[tokio::test]
async fn unsubscribe_skips_un_advise_for_buffered_subscription() {
let (addr, recorded, handle) =
recording_server(vec![(0, Vec::new()), (0, Vec::new())]).await;
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[(
"TestObj.TestInt",
sample_metadata(),
)]));
let session = connect_test_session(addr, resolver).await.unwrap();
// Issue a plain subscribe — server records AdviseSupervisory.
let sub = session.subscribe("TestObj.TestInt").await.unwrap();
let cid = sub.correlation_id;
assert_eq!(recorded.lock().unwrap().len(), 1, "subscribe should issue 1 RPC");
// Mutate the registry entry's mode to Buffered (synthesise the
// state subscribe_buffered_nmx would have produced).
{
let mut reg = session.inner.subscriptions.lock().await;
let entry = reg.get_mut(&cid).expect("registry entry present");
entry.mode = SubscriptionMode::Buffered {
rounded_interval_ms: 1000,
item_definition: "TestObj.TestInt".to_string(),
item_context: String::new(),
item_handle: 0,
};
}
// Unsubscribe the now-buffered entry. F47 contract: NO
// UnAdvise is emitted on the wire; recorded count stays at 1.
session.unsubscribe(sub).await.unwrap();
assert_eq!(
recorded.lock().unwrap().len(),
1,
"buffered unsubscribe must not issue UnAdvise; recorded RPC count must stay at 1 \
(the original AdviseSupervisory)"
);
// Registry is still cleared — F47's skip applies only to the
// wire emission, not the consumer-side bookkeeping.
assert_eq!(
session.inner.subscriptions.lock().await.len(),
0,
"buffered unsubscribe still removes the registry entry"
);
handle.abort();
}
} }