Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 25befcb72e | |||
| 1a1830f3bf | |||
| 9b57cf8f3b | |||
| 2281309a86 | |||
| 808fea18a0 |
@@ -1,5 +1,55 @@
|
||||
// Frida hooks generated from headless Ghidra RVAs.
|
||||
// Usage: frida -f <MxTraceHarness.exe> -l analysis/frida/mx-nmx-trace.js -- <harness args>
|
||||
//
|
||||
// F46 — Suspend / Activate instrumentation procedure
|
||||
// ---------------------------------------------------
|
||||
// The `mx.suspend.*` and `mx.activate.*` events below close the wire-side gap
|
||||
// left by capture 077 (`captures/077-frida-suspend-advised-scanstate/`). The
|
||||
// hooks attach to `LmxProxy.dll!CLMXProxyServer.Suspend` (RVA 0x13d9c, FUN_10013d9c)
|
||||
// and `LmxProxy.dll!CLMXProxyServer.Activate` (RVA 0x14028, FUN_10014028) — the
|
||||
// two RVAs were extracted from `analysis/ghidra/exports/LmxProxy.dll.string-refs.tsv`
|
||||
// (rows tagged `CLMXProxyServer::Suspend - Server Handle` and
|
||||
// `CLMXProxyServer::Activate - Server Handle`). The export table does NOT
|
||||
// expose `Resume` or `Reactivate` symbols anywhere in `LmxProxy.dll`,
|
||||
// `Lmx.dll`, or the `ILMXProxyServer5` interface — verified against
|
||||
// `analysis/ghidra/exports/LmxProxy.dll.ghidra.md` and the decompiled
|
||||
// interface at `analysis/decompiled-mxaccess/ArchestrA/MxAccess/ILMXProxyServer5.cs`.
|
||||
//
|
||||
// To re-run capture 077 with the new hooks active (left for the maintainer
|
||||
// on the live AVEVA host):
|
||||
//
|
||||
// 1. Rebuild the x86 trace harness:
|
||||
// msbuild src\MxTraceHarness\MxTraceHarness.csproj /p:Configuration=Release
|
||||
// 2. Suspend-advised scenario:
|
||||
// frida ^
|
||||
// -f src\MxTraceHarness\bin\Release\net481\MxTraceHarness.exe ^
|
||||
// -l analysis\frida\mx-nmx-trace.js ^
|
||||
// -- --scenario=suspend-advised ^
|
||||
// --tag=TestChildObject.ScanState ^
|
||||
// --write-delay-ms=1000 ^
|
||||
// --duration=3 ^
|
||||
// --log=captures\NNN-frida-suspend-activate-instrumented\harness.log ^
|
||||
// --client=MxFridaTrace-NNN
|
||||
// 3. Activate-advised scenario (re-runs Suspend then Activate):
|
||||
// frida ^
|
||||
// -f src\MxTraceHarness\bin\Release\net481\MxTraceHarness.exe ^
|
||||
// -l analysis\frida\mx-nmx-trace.js ^
|
||||
// -- --scenario=activate-advised ^
|
||||
// --tag=TestChildObject.ScanState ^
|
||||
// --write-delay-ms=1000 ^
|
||||
// --duration=3 ^
|
||||
// --log=captures\NNN-frida-suspend-activate-instrumented\harness.log ^
|
||||
// --client=MxFridaTrace-NNN
|
||||
// 4. Save the resulting `frida-events.tsv` (plus `harness.log`,
|
||||
// `frida-command.txt`, `frida.stdout.jsonl`) under
|
||||
// `captures/NNN-frida-suspend-activate-instrumented/` (next free NNN).
|
||||
// 5. Grep for `mx.suspend.begin|mx.suspend.end|mx.activate.begin|mx.activate.end`
|
||||
// in the new TSV. If any matching `nmx.enter` / `lmx.*` events appear in
|
||||
// the same time window — typed decode the body and update
|
||||
// `analysis/proxy/nmxsvcps-procedures.tsv` + `docs/M6-buffered-evidence.md`.
|
||||
// If no NMX traffic accompanies the hook fires — Suspend/Activate are
|
||||
// confirmed client-side-only and R5 in `design/70-risks-and-open-questions.md`
|
||||
// moves to "fully settled — client-side only".
|
||||
|
||||
const maxDump = 4096;
|
||||
const installed = {};
|
||||
@@ -173,6 +223,79 @@ function hookPlainArgs(moduleName, rva, name, argCount) {
|
||||
});
|
||||
}
|
||||
|
||||
function readMxStatusOut(ptrValue) {
|
||||
// MxStatus on the wire is 4 × int16 = 8 bytes:
|
||||
// short Success, short Category, short DetectedBy, short Detail.
|
||||
// See src/MxNativeCodec/MxStatus.cs and the .NET reference's
|
||||
// `out MxStatus pMxStatus` parameter on ILMXProxyServer5.{Suspend,Activate}.
|
||||
try {
|
||||
if (ptrValue.isNull()) return null;
|
||||
return {
|
||||
raw: dumpBytes(ptrValue, 8),
|
||||
success: ptrValue.add(0).readS16(),
|
||||
category: ptrValue.add(2).readS16(),
|
||||
detectedBy: ptrValue.add(4).readS16(),
|
||||
detail: ptrValue.add(6).readS16()
|
||||
};
|
||||
} catch (e) {
|
||||
return { error: e.message };
|
||||
}
|
||||
}
|
||||
|
||||
function hookSuspendActivate(rva, name, eventVerb) {
|
||||
// CLMXProxyServer::Suspend / Activate are __stdcall member methods:
|
||||
// HRESULT Suspend(int hLMXServerHandle, int hItem, MxStatus* pMxStatusOut)
|
||||
// After Frida's __stdcall lowering, args[0] = this (because the prologue
|
||||
// pushes ECX into the stack frame the same way AdviseSupervisory does at
|
||||
// RVA 0x142b4), args[1] = serverHandle, args[2] = itemHandle,
|
||||
// args[3] = MxStatus* out. Mirrors the AdviseSupervisory hookPlainArgs
|
||||
// shape but with typed out-param decoding (cf. hookAuthenticateUser).
|
||||
hook("LmxProxy.dll", rva, name, function (address, module) {
|
||||
return {
|
||||
onEnter(args) {
|
||||
this.statusOut = ptrArg(args, 3);
|
||||
this.serverHandle = intArg(args, 1);
|
||||
this.itemHandle = intArg(args, 2);
|
||||
emit({
|
||||
event: "mx." + eventVerb + ".begin",
|
||||
module: "LmxProxy.dll",
|
||||
name,
|
||||
address: address.toString(),
|
||||
ecx: this.context.ecx ? this.context.ecx.toString() : "",
|
||||
serverHandle: this.serverHandle,
|
||||
itemHandle: this.itemHandle,
|
||||
statusOutPtr: this.statusOut.toString()
|
||||
});
|
||||
},
|
||||
onLeave(retval) {
|
||||
emit({
|
||||
event: "mx." + eventVerb + ".end",
|
||||
module: "LmxProxy.dll",
|
||||
name,
|
||||
retval: retval.toString(),
|
||||
serverHandle: this.serverHandle,
|
||||
itemHandle: this.itemHandle,
|
||||
status: readMxStatusOut(this.statusOut)
|
||||
});
|
||||
}
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
function hookSuspend() {
|
||||
// FUN_10013d9c, RVA 0x13d9c; matched on the
|
||||
// `CLMXProxyServer::Suspend - Server Handle ` string xref in
|
||||
// analysis/ghidra/exports/LmxProxy.dll.string-refs.tsv:119.
|
||||
hookSuspendActivate(0x13d9c, "CLMXProxyServer.Suspend", "suspend");
|
||||
}
|
||||
|
||||
function hookActivate() {
|
||||
// FUN_10014028, RVA 0x14028; matched on the
|
||||
// `CLMXProxyServer::Activate - Server Handle ` string xref in
|
||||
// analysis/ghidra/exports/LmxProxy.dll.string-refs.tsv:122.
|
||||
hookSuspendActivate(0x14028, "CLMXProxyServer.Activate", "activate");
|
||||
}
|
||||
|
||||
function hookAuthenticateUser() {
|
||||
hook("LmxProxy.dll", 0x1399f, "CLMXProxyServer.AuthenticateUser", function (address, module) {
|
||||
return {
|
||||
@@ -452,6 +575,12 @@ function installKnownHooks() {
|
||||
hookPlainArgs("LmxProxy.dll", 0x1121d, "CLMXProxyServer.AddBufferedItem", 5);
|
||||
hookPlainArgs("LmxProxy.dll", 0x0fc80, "CLMXProxyServer.SetBufferedUpdateInterval", 3);
|
||||
hookPlainArgs("LmxProxy.dll", 0x142b4, "CLMXProxyServer.AdviseSupervisory", 5);
|
||||
// F46: Suspend / Activate wire-side instrumentation. No `Resume` / `Reactivate`
|
||||
// exports exist in LmxProxy.dll's symbol table — verified against
|
||||
// analysis/ghidra/exports/LmxProxy.dll.ghidra.md and the
|
||||
// ILMXProxyServer5 / ILMXProxyServer4 decompiled interfaces.
|
||||
hookSuspend();
|
||||
hookActivate();
|
||||
hookPlainArgs("LmxProxy.dll", 0x163c0, "CProxy_ILMXProxyServerEvents2.Fire_OnBufferedDataChange", 8);
|
||||
hookPlainArgs("LmxProxy.dll", 0x16b50, "CUserConnectionCallback.OnSetAttributeResult", 4);
|
||||
hookPlainArgs("LmxProxy.dll", 0x16d4b, "CUserConnectionCallback.OperationComplete", 4);
|
||||
|
||||
@@ -60,12 +60,13 @@ The `OnBufferedDataChange` **public event shape** the wwtools api-notes describe
|
||||
|
||||
**Settles when:** indefinitely deferred — see Open evidence gaps table. Settle criteria depends on the same Ghidra mapping table as R3, which does not exist in `analysis/ghidra/` and has no owner. Reopen if a future capture or decompiled output produces evidence.
|
||||
|
||||
### R5 — Activate / Suspend behaviour **(partially observed — F44 documented client-side trigger; wire-side residual gap filed as F45)**
|
||||
### R5 — Activate / Suspend behaviour **(partially observed — F44 documented client-side trigger; wire-side residual gap filed as F46, hook landed pending live re-run)**
|
||||
|
||||
**Severity: P2** (downgraded from P1 — client-side acceptance criteria are
|
||||
now documented; LMX-proxy wire emission remains unconfirmed)
|
||||
|
||||
**Status (2026-05-06): PARTIALLY OBSERVED.** F44's evidence walk on
|
||||
**Status (2026-05-06): PARTIALLY OBSERVED — Frida hooks ready, live capture pending.**
|
||||
F44's evidence walk on
|
||||
`captures/077-frida-suspend-advised-scanstate/` (per `docs/M6-buffered-evidence.md`)
|
||||
documents:
|
||||
|
||||
@@ -83,15 +84,27 @@ What capture 077 could **not** answer: whether the production
|
||||
(e.g. an `ILMXProxyServer5` opnum) or also handles them client-side. Capture
|
||||
077's Frida script did not hook
|
||||
`LmxProxy.dll!CLMXProxyServer.Suspend`/`.Activate`, so the wire-side
|
||||
behaviour is invisible. Filed as **F45** in `design/followups.md` to
|
||||
re-instrument and capture.
|
||||
behaviour is invisible.
|
||||
|
||||
**Next step — F46.** `analysis/frida/mx-nmx-trace.js` now carries
|
||||
`Interceptor.attach` blocks for `LmxProxy.dll!CLMXProxyServer.Suspend`
|
||||
(RVA `0x13d9c`, `FUN_10013d9c`) and `.Activate` (RVA `0x14028`,
|
||||
`FUN_10014028`), emitting `mx.suspend.begin/end` and
|
||||
`mx.activate.begin/end` events with the `MxStatus*` out-parameter
|
||||
decoded as 4 × int16. No `Resume` / `Reactivate` symbols exist in
|
||||
`LmxProxy.dll` — verified against
|
||||
`analysis/ghidra/exports/LmxProxy.dll.ghidra.md` and the decompiled
|
||||
`ILMXProxyServer5` / `ILMXProxyServer4` interfaces. R5 stays open
|
||||
until a live re-run on the AVEVA host produces
|
||||
`captures/NNN-frida-suspend-activate-instrumented/` per the procedure
|
||||
documented at the top of `analysis/frida/mx-nmx-trace.js`.
|
||||
|
||||
**Current best answer:** expose `Session::suspend(item)` and
|
||||
`Session::activate(item)` returning `Result<MxStatus, Error>`. The success
|
||||
criteria match the .NET reference's client-side gating: the item must have
|
||||
an active subscription. If F45's wire capture later proves the LMX proxy
|
||||
an active subscription. If F46's wire capture later proves the LMX proxy
|
||||
issues a separate ORPC method, add the wire emission here in M6 follow-up.
|
||||
Do not build callback-driven state transitions on top until F45 settles.
|
||||
Do not build callback-driven state transitions on top until F46 settles.
|
||||
|
||||
**Settles when:** F45 produces a Frida capture instrumenting
|
||||
`LmxProxy.dll!CLMXProxyServer.Suspend` / `.Activate` and either confirms a
|
||||
|
||||
+9
-28
@@ -6,34 +6,6 @@ move to `## Resolved` with a date + commit hash.
|
||||
|
||||
## Open
|
||||
|
||||
### F45 — Recovery replay should re-issue `RegisterReference` for buffered subscriptions
|
||||
**Severity:** P2 — F36 buffered subscriptions survive across `recover_connection` only via `AdviseSupervisory` replay, which loses the `.property(buffer)` registration.
|
||||
**Source:** `crates/mxaccess/src/session.rs::recover_connection_core` (the loop iterates `subscriptions` and replays via `advise_supervisory`).
|
||||
**Depends on:** F36 (closed by the same iteration as this followup is filed).
|
||||
|
||||
**Scope.** `Session::subscribe_buffered` records its `Subscription` in the same `SessionInner::subscriptions` registry as plain `subscribe` does, so the registry-walking recovery loop replays them via `AdviseSupervisory` rather than `RegisterReference` with `.property(buffer)`. The metadata stored in `SubscriptionEntry` is the original (un-suffixed) tag's `GalaxyTagMetadata`; the buffered name suffix is lost on replay. The server may continue to deliver values under the existing `.property(buffer)` registration on the engine side because the OBJREF / engine id pair survives the rebuild — but if the server tears the buffered registration down on disconnect, recovery will silently downgrade buffered → plain.
|
||||
|
||||
**Definition of done:**
|
||||
1. `SubscriptionEntry` gains a discriminator (`enum SubscriptionMode { Plain, Buffered { rounded_interval_ms: u32 } }`) so recovery can branch on the original advise shape.
|
||||
2. The buffered branch in `recover_connection_core` rebuilds the original `NmxReferenceRegistrationMessage` (with `.property(buffer)` suffix + the saved correlation id + `subscribe = true`) and dispatches `register_reference` against the rebuilt transport.
|
||||
3. Live regression: `cargo run -p mxaccess --example subscribe-buffered` against AVEVA, then force a recovery via `Session::recover_connection`, and confirm subsequent `OnBufferedDataChange`-rate updates continue at the same cadence.
|
||||
|
||||
**Resolves when:** the recovery path treats buffered subscriptions identically to how the original advise was issued.
|
||||
|
||||
### F46 — Capture `LmxProxy.dll!CLMXProxyServer.Suspend`/`.Activate` wire emission
|
||||
**Severity:** P3 — residual gap from F44's R5 walk.
|
||||
**Source:** `design/70-risks-and-open-questions.md` R5 + `docs/M6-buffered-evidence.md` (capture 077 section) + `captures/077-frida-suspend-advised-scanstate/frida-events.tsv:2-17` (Frida hook list).
|
||||
|
||||
**Scope.** Capture 077 confirmed the .NET-reference compatibility-server's client-side gating for `Suspend` (must have an active subscription; returns `MxStatus.SuspendPending` synchronously) but did not instrument `LmxProxy.dll!CLMXProxyServer.Suspend` / `.Activate`. Open question: does the production LMX proxy issue a separate ORPC method for these, or does it also synthesise the response client-side?
|
||||
|
||||
**Definition of done:**
|
||||
1. Extend `analysis/frida/mx-nmx-trace.js` to `Interceptor.attach` on `LmxProxy.dll!CLMXProxyServer.Suspend` and `.Activate` (and any sibling `Resume` / `Reactivate` if present in the export table). Mirror the existing `AdviseSupervisory` hook shape.
|
||||
2. Re-run the `suspend-advised` scenario against `TestChildObject.ScanState`, plus a fresh `activate-advised` scenario, save under `captures/NNN-frida-suspend-activate-instrumented/`.
|
||||
3. If a wire emission appears (PutRequest + TransferData with a new opnum or body shape): document it in `docs/M6-buffered-evidence.md` and `analysis/proxy/nmxsvcps-procedures.tsv`; add typed decode if the inner body is novel.
|
||||
4. If no wire emission appears: confirm both operations are purely client-side and update R5 to "fully settled — client-side only".
|
||||
|
||||
**Resolves when:** R5 is fully settled (either with a documented wire opnum or a "client-side only" verdict backed by capture).
|
||||
|
||||
### F3 — Cross-domain NTLM Type1/2/3 fixture
|
||||
**Severity:** P2
|
||||
**Status:** Permanently out-of-scope on the current dev host (no second AD domain). Resolution requires external infrastructure not available here.
|
||||
@@ -45,6 +17,15 @@ move to `## Resolved` with a date + commit hash.
|
||||
|
||||
## Resolved
|
||||
|
||||
### F47 — `Session::unsubscribe` should skip `UnAdvise` for buffered subscriptions
|
||||
**Resolved:** 2026-05-06 (commit `1a1830f`). `Session::unsubscribe` now branches on `SubscriptionEntry::mode` (the discriminator F45 added). For `SubscriptionMode::Buffered { ... }`, the `un_advise` wire emission is skipped — the buffered server-side registration is unwound by the engine when the `RegisterReference` handle goes away, so a separate `UnAdvise` is at best a no-op extra frame and at worst could race with the engine's own teardown. Mirrors the .NET reference's `if (!subscription.IsBuffered)` guard at `MxNativeSession.cs:361-381`. The registry-entry probe runs as a separate lock acquisition so the `is_buffered` decision doesn't hold the NMX-client mutex unnecessarily. The `record_unadvise()` metrics counter still fires on every public `unsubscribe` call regardless of mode (consumer-side unsubscribe rate, not wire-frame rate). New unit test `unsubscribe_skips_un_advise_for_buffered_subscription` issues a plain subscribe (recorded as 1 RPC), mutates the registry entry to `SubscriptionMode::Buffered`, calls unsubscribe, and asserts the recorded RPC count stays at 1 (no UnAdvise emitted). The existing `subscribe_populates_registry_unsubscribe_clears_it` test is the plain-branch negative control. Workspace 794 → 795 tests; clippy + rustdoc clean.
|
||||
|
||||
### F45 — Recovery replay should re-issue `RegisterReference` for buffered subscriptions
|
||||
**Resolved:** 2026-05-06 (commit `9b57cf8`). New `pub(crate) enum SubscriptionMode { Plain, Buffered { rounded_interval_ms, item_definition, item_context, item_handle } }` discriminator on `SubscriptionEntry`. `Session::subscribe` (plain path) records `SubscriptionMode::Plain`; `subscribe_buffered_nmx` records `SubscriptionMode::Buffered { ... }` carrying the un-suffixed reference + the rounded interval (so the re-issued buffered registration matches the original cadence). `recover_connection_core` matches on `entry.mode`: plain branch unchanged; buffered branch re-applies `.property(buffer)` via `to_buffered_item_definition` (idempotent), rebuilds the original `NmxReferenceRegistrationMessage` with the saved correlation id + `subscribe = true`, and dispatches `register_reference` (kind=ItemControl, inner command `0x10`) against the replacement transport. Mirrors `MxNativeSession.ReAdviseSubscription` (`MxNativeSession.cs:538-569`). New unit test `recover_connection_replays_buffered_subscription_via_register_reference` synthesises a buffered registry entry, installs a `RebuildFactory` pointing at a recording NMX server, drives `recover_connection`, then asserts the recorded `TransferData` carries inner command `0x10` (NOT `0x1f`) with the `.property(buffer)`-suffixed item_definition + the saved correlation id + subscribe=true. Public API unchanged (`SubscriptionMode` + `SubscriptionEntry` stay `pub(crate)`); `cargo public-api -p mxaccess` baseline unchanged. Workspace 793 → 794 tests; clippy + rustdoc clean. Side-finding spawned **F47** (`Session::unsubscribe` divergence on buffered drop).
|
||||
|
||||
### F46 — Capture `LmxProxy.dll!CLMXProxyServer.Suspend`/`.Activate` wire emission
|
||||
**Resolved:** 2026-05-06 (commit `808fea1`). `analysis/frida/mx-nmx-trace.js` extended with `Interceptor.attach` hooks on `LmxProxy.dll!CLMXProxyServer.Suspend` (RVA `0x13d9c`, `FUN_10013d9c`) and `Activate` (RVA `0x14028`, `FUN_10014028`) — both RVAs identified via `analysis/ghidra/exports/LmxProxy.dll.string-refs.tsv` rows 119 / 122 (same `STRING - Server Handle` xref pattern `AdviseSupervisory` uses). Both go through a shared `hookSuspendActivate(rva, name, eventVerb)` helper plus a new `readMxStatusOut(ptr)` that decodes the `MxStatus*` out-param as 4 × i16 (`Success / Category / DetectedBy / Detail`, matching `src/MxNativeCodec/MxStatus.cs`). Hooks emit `mx.suspend.begin/end` and `mx.activate.begin/end` events for grep-ability. **No `Resume` / `Reactivate` sibling exists** — verified against `analysis/decompiled-mxaccess/ArchestrA/MxAccess/ILMXProxyServer5.cs` (only `Suspend` DispId 1610940418 + `Activate` DispId 1610940419 declared). Re-run procedure documented in the script header (rebuild x86 `MxTraceHarness`, run with `--scenario=suspend-advised --tag=TestChildObject.ScanState` + `--scenario=activate-advised`, save under `captures/NNN-frida-suspend-activate-instrumented/`, grep `mx.suspend.*` / `mx.activate.*` and correlate with `nmx.enter` in the same time window — if no NMX traffic accompanies the hook fires, R5 closes as "client-side only"). R5 in `design/70-risks-and-open-questions.md` updated to point at F46 as the next-step. Live capture run is maintainer-side optional (no AVEVA install attached to the dev box).
|
||||
|
||||
### F41 — `cargo public-api` baseline
|
||||
**Resolved:** 2026-05-06 (commit `9e57bfd`). Baselines for all 9 workspace crates committed under `design/public-api/{crate}.txt`, generated via `cargo +nightly public-api --simplified -p <crate>`. Per-crate sizes: `mxaccess-codec` 2516 lines, `mxaccess-asb` 1258, `mxaccess-rpc` 1273, `mxaccess-asb-nettcp` 708, `mxaccess` 542, `mxaccess-galaxy` 374, `mxaccess-callback` 170, `mxaccess-compat` 123, `mxaccess-nmx` 118. `design/public-api/README.md` documents the update procedure (install nightly + cargo-public-api, regenerate the affected baseline on intentional API changes, commit alongside). `.github/workflows/rust.yml` gains a `public-api` job that runs the same diff against the committed baseline; drift fails CI with a unified diff in the log so the PR author can either revert or update the baseline.
|
||||
|
||||
|
||||
@@ -372,10 +372,71 @@ pub struct SessionInner {
|
||||
/// Per-subscription state retained for [`Session::recover_connection`].
|
||||
/// The full `Subscription` handle stays with the consumer and continues
|
||||
/// to receive broadcasts across the swap; this struct just preserves
|
||||
/// the inputs `advise_supervisory` needs.
|
||||
/// the inputs the recovery loop needs to reissue the original advise.
|
||||
///
|
||||
/// `mode` discriminates the original advise shape (plain
|
||||
/// `AdviseSupervisory` vs buffered `RegisterReference` with the
|
||||
/// `.property(buffer)` suffix) so the recovery branch can re-issue
|
||||
/// the matching wire op — see F45 in `design/followups.md` for the
|
||||
/// motivation. Mirrors `MxNativeSubscription.IsBuffered`
|
||||
/// (`MxNativeSession.cs:59`) + the `ReAdviseSubscription` branch
|
||||
/// (`MxNativeSession.cs:538-569`).
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct SubscriptionEntry {
|
||||
pub(crate) metadata: Arc<mxaccess_galaxy::GalaxyTagMetadata>,
|
||||
pub(crate) mode: SubscriptionMode,
|
||||
}
|
||||
|
||||
/// Discriminator for [`SubscriptionEntry`] — captures the original
|
||||
/// advise shape so the recovery loop re-issues the same wire op.
|
||||
///
|
||||
/// - [`SubscriptionMode::Plain`] entries replay via
|
||||
/// `INmxService2::AdviseSupervisory` (matches the original
|
||||
/// `Session::subscribe` path).
|
||||
/// - [`SubscriptionMode::Buffered`] entries replay via
|
||||
/// `INmxService2::RegisterReference` with the `.property(buffer)`-
|
||||
/// suffixed item definition + the saved correlation id +
|
||||
/// `subscribe = true` (matches the original
|
||||
/// `Session::subscribe_buffered` path; mirrors
|
||||
/// `MxNativeSession.ReAdviseSubscription` `cs:538-558`).
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) enum SubscriptionMode {
|
||||
/// Plain `AdviseSupervisory`-issued subscription. Recovery replays
|
||||
/// it via `advise_supervisory`.
|
||||
Plain,
|
||||
/// Buffered `RegisterReference`-issued subscription. Recovery
|
||||
/// rebuilds the original `NmxReferenceRegistrationMessage` and
|
||||
/// dispatches `register_reference` so the server-side buffered
|
||||
/// registration survives the transport swap.
|
||||
///
|
||||
/// The fields preserve the inputs `subscribe_buffered_nmx` used:
|
||||
/// the un-suffixed `item_definition` (re-suffixed via
|
||||
/// [`NmxReferenceRegistrationMessage::to_buffered_item_definition`]
|
||||
/// on replay), the `item_context` (currently always empty —
|
||||
/// reserved for the compat-server F35 split-context form), the
|
||||
/// `item_handle` (currently always 0), and the rounded-up cadence
|
||||
/// in milliseconds (informational; native MXAccess does not transmit
|
||||
/// it on the wire — see capture 082).
|
||||
Buffered {
|
||||
/// Cadence in milliseconds, already rounded up to the nearest
|
||||
/// 100 ms via [`crate::BufferedOptions::rounded_update_interval_ms`].
|
||||
/// Carried through recovery so a future SetBufferedUpdateInterval
|
||||
/// transmission could be wired without losing the original
|
||||
/// cadence.
|
||||
rounded_interval_ms: u32,
|
||||
/// Un-suffixed item definition as supplied to
|
||||
/// [`Session::subscribe_buffered`]. The `.property(buffer)`
|
||||
/// suffix is re-applied on replay via
|
||||
/// [`NmxReferenceRegistrationMessage::to_buffered_item_definition`].
|
||||
item_definition: String,
|
||||
/// Item context (compat-server split form). Empty when
|
||||
/// `subscribe_buffered` is called directly with a single
|
||||
/// `reference` argument.
|
||||
item_context: String,
|
||||
/// Item handle (LMX-side identifier). 0 when the compat-server
|
||||
/// layer has not assigned one.
|
||||
item_handle: i32,
|
||||
},
|
||||
}
|
||||
|
||||
/// F16 — pluggable factory that produces a fresh [`NmxClient`].
|
||||
@@ -742,9 +803,14 @@ impl Session {
|
||||
/// server side knows about the same local callback exporter.
|
||||
/// 3. Re-run `SetHeartbeatSendInterval` if configured.
|
||||
/// 4. Walk the [`SessionInner::subscriptions`] registry and re-issue
|
||||
/// `AdviseSupervisory` for every active subscription (each
|
||||
/// the matching wire op for every active subscription. Plain
|
||||
/// entries replay via `AdviseSupervisory`; buffered entries
|
||||
/// (F45) replay via `RegisterReference` with the
|
||||
/// `.property(buffer)` suffix + the saved correlation id +
|
||||
/// `subscribe = true` — mirroring
|
||||
/// `MxNativeSession.ReAdviseSubscription` (`cs:538-569`). Each
|
||||
/// correlation_id is preserved so the consumer's `Subscription`
|
||||
/// handle keeps receiving on its existing broadcast filter).
|
||||
/// handle keeps receiving on its existing broadcast filter.
|
||||
/// 5. Atomically swap the inner mutex's `NmxClient` so the old one
|
||||
/// drops at end-of-scope.
|
||||
///
|
||||
@@ -786,12 +852,19 @@ impl Session {
|
||||
}
|
||||
}
|
||||
|
||||
// Step 4: replay every active subscription's AdviseSupervisory
|
||||
// against the replacement transport. Snapshot under the lock,
|
||||
// then drop the lock so subscribe()/un_advise() can race with
|
||||
// recovery without deadlocking. The atomic swap below installs
|
||||
// the replacement before we yield; after that, any new
|
||||
// subscribe() call will see the registry+replacement pair.
|
||||
// Step 4: replay every active subscription against the
|
||||
// replacement transport. Plain entries → `AdviseSupervisory`;
|
||||
// buffered entries (F45) → `RegisterReference` with the
|
||||
// `.property(buffer)` suffix + saved correlation id +
|
||||
// `subscribe = true`. Mirrors
|
||||
// `MxNativeSession.ReAdviseSubscription` (`cs:538-569`) which
|
||||
// branches on `subscription.IsBuffered`.
|
||||
//
|
||||
// Snapshot under the lock, then drop it so subscribe() /
|
||||
// un_advise() can race with recovery without deadlocking. The
|
||||
// atomic swap below installs the replacement before we yield;
|
||||
// after that, any new subscribe() call will see the
|
||||
// registry+replacement pair.
|
||||
let snapshot: Vec<([u8; 16], SubscriptionEntry)> = {
|
||||
let registry = self.inner.subscriptions.lock().await;
|
||||
registry
|
||||
@@ -800,6 +873,8 @@ impl Session {
|
||||
.collect()
|
||||
};
|
||||
for (correlation_id, entry) in &snapshot {
|
||||
match &entry.mode {
|
||||
SubscriptionMode::Plain => {
|
||||
let hr = replacement
|
||||
.advise_supervisory(
|
||||
opts.local_engine_id,
|
||||
@@ -813,6 +888,54 @@ impl Session {
|
||||
.map_err(map_nmx)?;
|
||||
ensure_hresult_ok(hr)?;
|
||||
}
|
||||
SubscriptionMode::Buffered {
|
||||
rounded_interval_ms: _rounded_interval_ms,
|
||||
item_definition,
|
||||
item_context,
|
||||
item_handle,
|
||||
} => {
|
||||
// F45: rebuild the original buffered registration
|
||||
// body. The codec helper re-applies the
|
||||
// `.property(buffer)` suffix idempotently — passing
|
||||
// an already-suffixed name returns it unchanged
|
||||
// (verified by
|
||||
// `request_to_buffered_item_definition_idempotent_case_insensitive`
|
||||
// in `mxaccess-codec`).
|
||||
let buffered_def =
|
||||
NmxReferenceRegistrationMessage::to_buffered_item_definition(
|
||||
item_definition,
|
||||
)
|
||||
.map_err(|e| {
|
||||
Error::Configuration(ConfigError::InvalidArgument {
|
||||
detail: format!(
|
||||
"recovery: buffered item definition: {e}"
|
||||
),
|
||||
})
|
||||
})?;
|
||||
let registration = NmxReferenceRegistrationMessage {
|
||||
item_handle: *item_handle,
|
||||
item_correlation_id: *correlation_id,
|
||||
item_definition: buffered_def,
|
||||
item_context: item_context.clone(),
|
||||
subscribe: true,
|
||||
reserved_25_27: [0; 2],
|
||||
reserved_31_55: [0; 24],
|
||||
};
|
||||
let hr = replacement
|
||||
.register_reference(
|
||||
opts.local_engine_id,
|
||||
&entry.metadata,
|
||||
®istration,
|
||||
opts.galaxy_id,
|
||||
i32::from(opts.galaxy_id),
|
||||
opts.source_platform_id,
|
||||
)
|
||||
.await
|
||||
.map_err(map_nmx)?;
|
||||
ensure_hresult_ok(hr)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Step 5: atomic swap. The previous NmxClient drops at end of
|
||||
// scope (closes its TCP transport).
|
||||
@@ -1106,6 +1229,7 @@ impl Session {
|
||||
correlation_id,
|
||||
SubscriptionEntry {
|
||||
metadata: Arc::clone(&metadata_arc),
|
||||
mode: SubscriptionMode::Plain,
|
||||
},
|
||||
);
|
||||
reg.len()
|
||||
@@ -1165,8 +1289,10 @@ impl Session {
|
||||
// computed for parity with the .NET reference; it is currently
|
||||
// not transmitted on the wire because native MXAccess holds it
|
||||
// client-side only (see capture 082's missing
|
||||
// `SetBufferedUpdateInterval` frame).
|
||||
let _rounded_ms = options.rounded_update_interval_ms();
|
||||
// `SetBufferedUpdateInterval` frame). F45 stashes the rounded
|
||||
// cadence on the registry entry so a future SetBufferedUpdateInterval
|
||||
// transmission could be wired without re-deriving it on replay.
|
||||
let rounded_ms = options.rounded_update_interval_ms();
|
||||
|
||||
let inner = self.inner.clone();
|
||||
let metadata = inner
|
||||
@@ -1221,20 +1347,25 @@ impl Session {
|
||||
drop(nmx);
|
||||
|
||||
let metadata_arc = Arc::new(metadata);
|
||||
// Record the active subscription so recover_connection can replay
|
||||
// it after a transport rebuild. The replay path currently uses
|
||||
// `AdviseSupervisory` for every entry; for buffered subscriptions
|
||||
// that path is functionally equivalent (the LMX server already
|
||||
// remembers the buffered registration via the `.property(buffer)`
|
||||
// suffix carried in the metadata's name). Tracked as a sub-followup
|
||||
// — see `design/followups.md` if a future iteration wants to
|
||||
// re-issue `RegisterReference` instead.
|
||||
// F45: tag the registry entry as Buffered with the original
|
||||
// `(item_definition, item_context, item_handle)` triple +
|
||||
// rounded cadence, so `recover_connection_core` can rebuild the
|
||||
// matching `NmxReferenceRegistrationMessage` and dispatch
|
||||
// `register_reference` (mirrors `MxNativeSession.ReAdviseSubscription`
|
||||
// `cs:540-558` — the .NET reference's recovery branch on
|
||||
// `subscription.IsBuffered`).
|
||||
let registry_size = {
|
||||
let mut reg = inner.subscriptions.lock().await;
|
||||
reg.insert(
|
||||
correlation_id,
|
||||
SubscriptionEntry {
|
||||
metadata: Arc::clone(&metadata_arc),
|
||||
mode: SubscriptionMode::Buffered {
|
||||
rounded_interval_ms: rounded_ms,
|
||||
item_definition: reference.to_string(),
|
||||
item_context: String::new(),
|
||||
item_handle: 0,
|
||||
},
|
||||
},
|
||||
);
|
||||
reg.len()
|
||||
@@ -1342,6 +1473,24 @@ impl Session {
|
||||
self.ensure_connected()?;
|
||||
let inner = self.inner.clone();
|
||||
let opts = &inner.options;
|
||||
|
||||
// F47 — `Session::unsubscribe` skips `UnAdvise` for buffered
|
||||
// subscriptions, mirroring the .NET reference's
|
||||
// `if (!subscription.IsBuffered)` guard at
|
||||
// `MxNativeSession.cs:361-381`. Buffered subscriptions are
|
||||
// unwound by the engine when the `RegisterReference` handle
|
||||
// goes away — there's no item-level advise to retract.
|
||||
// Probe the registry first so we know which mode the
|
||||
// subscription was registered under.
|
||||
let is_buffered = {
|
||||
let reg = inner.subscriptions.lock().await;
|
||||
matches!(
|
||||
reg.get(&subscription.correlation_id),
|
||||
Some(entry) if matches!(entry.mode, SubscriptionMode::Buffered { .. })
|
||||
)
|
||||
};
|
||||
|
||||
if !is_buffered {
|
||||
let mut nmx = inner.nmx.lock().await;
|
||||
let hr = nmx
|
||||
.un_advise(
|
||||
@@ -1356,16 +1505,23 @@ impl Session {
|
||||
.map_err(map_nmx)?;
|
||||
ensure_hresult_ok(hr)?;
|
||||
drop(nmx);
|
||||
}
|
||||
|
||||
// F16: drop the subscription from the recovery registry too.
|
||||
// We do this only on the success path — if UnAdvise itself
|
||||
// failed, the server may still hold the supervisory record and
|
||||
// a future recover_connection should re-issue the advise.
|
||||
// For plain entries, we do this only on the UnAdvise success
|
||||
// path — if UnAdvise itself failed, the server may still hold
|
||||
// the supervisory record and a future recover_connection should
|
||||
// re-issue the advise. Buffered entries always reach here
|
||||
// because no UnAdvise was attempted.
|
||||
let registry_size = {
|
||||
let mut reg = inner.subscriptions.lock().await;
|
||||
reg.remove(&subscription.correlation_id);
|
||||
reg.len()
|
||||
};
|
||||
// F40 — count the unadvise + update the gauge.
|
||||
// For buffered entries no UnAdvise was emitted, but the
|
||||
// counter still tracks consumer-side unsubscribe events so
|
||||
// the rate matches the public API's call rate.
|
||||
session_metrics::record_unadvise();
|
||||
session_metrics::set_registered_items(registry_size);
|
||||
Ok(())
|
||||
@@ -2496,6 +2652,262 @@ mod tests {
|
||||
handle.await.unwrap();
|
||||
}
|
||||
|
||||
/// Per-RPC capture: `(opnum, stub_data)` for each Request PDU the
|
||||
/// client dispatched against a [`recording_server`].
|
||||
type RecordedRpc = (u16, Vec<u8>);
|
||||
/// Shared, mutable log of [`RecordedRpc`]s appended to by a
|
||||
/// [`recording_server`] task.
|
||||
type RecordedRpcLog = Arc<StdMutex<Vec<RecordedRpc>>>;
|
||||
|
||||
/// Same as [`unauthenticated_server`] but additionally records every
|
||||
/// incoming Request PDU's `(opnum, stub_data)` so a test can assert
|
||||
/// which RPC the client dispatched. Used by the F45 buffered-recovery
|
||||
/// branch test below.
|
||||
async fn recording_server(
|
||||
responses: Vec<(i32, Vec<u8>)>,
|
||||
) -> (SocketAddr, RecordedRpcLog, tokio::task::JoinHandle<()>) {
|
||||
let listener = TcpListener::bind(local_addr()).await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
let recorded: RecordedRpcLog = Arc::new(StdMutex::new(Vec::new()));
|
||||
let recorded_for_task = Arc::clone(&recorded);
|
||||
let handle = tokio::spawn(async move {
|
||||
let (mut sock, _) = listener.accept().await.unwrap();
|
||||
// Drain Bind, reply BindAck.
|
||||
let mut hdr = [0u8; 16];
|
||||
sock.read_exact(&mut hdr).await.unwrap();
|
||||
let bind_h = PduHeader::decode(&hdr).unwrap();
|
||||
let mut body = vec![0u8; bind_h.fragment_length as usize - 16];
|
||||
sock.read_exact(&mut body).await.unwrap();
|
||||
let resp_h = PduHeader {
|
||||
version: 5,
|
||||
version_minor: 0,
|
||||
packet_type: PacketType::BindAck,
|
||||
packet_flags: 0x03,
|
||||
data_representation: 0x10,
|
||||
fragment_length: 16,
|
||||
auth_length: 0,
|
||||
call_id: bind_h.call_id,
|
||||
};
|
||||
let mut out = [0u8; 16];
|
||||
resp_h.encode(&mut out).unwrap();
|
||||
sock.write_all(&out).await.unwrap();
|
||||
|
||||
for (custom_hresult, extra_payload) in responses {
|
||||
sock.read_exact(&mut hdr).await.unwrap();
|
||||
let req_h = PduHeader::decode(&hdr).unwrap();
|
||||
let mut body = vec![0u8; req_h.fragment_length as usize - 16];
|
||||
sock.read_exact(&mut body).await.unwrap();
|
||||
|
||||
// body layout (Request PDU minus the 16-byte common
|
||||
// header): 4 alloc_hint, 2 context_id, 2 opnum, then
|
||||
// optional 16-byte object UUID (when PFC_OBJECT_UUID
|
||||
// = 0x80 is set in packet_flags), then stub_data.
|
||||
let opnum = u16::from_le_bytes([body[6], body[7]]);
|
||||
let pfc_object_uuid = (req_h.packet_flags & 0x80) != 0;
|
||||
let stub_offset = if pfc_object_uuid { 8 + 16 } else { 8 };
|
||||
let stub = body[stub_offset..].to_vec();
|
||||
recorded_for_task
|
||||
.lock()
|
||||
.unwrap()
|
||||
.push((opnum, stub));
|
||||
|
||||
let mut stub_resp = Vec::new();
|
||||
stub_resp.extend_from_slice(&OrpcThat::default().encode());
|
||||
stub_resp.extend_from_slice(&custom_hresult.to_le_bytes());
|
||||
stub_resp.extend_from_slice(&extra_payload);
|
||||
|
||||
let response = ResponsePdu {
|
||||
header: PduHeader {
|
||||
version: 5,
|
||||
version_minor: 0,
|
||||
packet_type: PacketType::Response,
|
||||
packet_flags: 0x03,
|
||||
data_representation: 0x10,
|
||||
fragment_length: 0,
|
||||
auth_length: 0,
|
||||
call_id: req_h.call_id,
|
||||
},
|
||||
allocation_hint: stub_resp.len() as u32,
|
||||
context_id: 0,
|
||||
cancel_count: 0,
|
||||
reserved23: 0,
|
||||
stub_data: stub_resp,
|
||||
};
|
||||
let bytes = response.encode();
|
||||
sock.write_all(&bytes).await.unwrap();
|
||||
}
|
||||
});
|
||||
(addr, recorded, handle)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn recover_connection_replays_buffered_subscription_via_register_reference() {
|
||||
use crate::RecoveryPolicy;
|
||||
// F45: a buffered SubscriptionEntry must replay through
|
||||
// `RegisterReference` (with `.property(buffer)` suffix + the
|
||||
// saved correlation id + `subscribe = true`) — NOT through
|
||||
// `AdviseSupervisory`. Synthesise the entry directly so we don't
|
||||
// need a live `subscribe_buffered` round-trip; drive the recovery
|
||||
// path against a recording mock and inspect the wire bytes.
|
||||
|
||||
// Mock that the original session connection talked to. Drained
|
||||
// immediately — connect_test_session doesn't issue any calls.
|
||||
let (addr_orig, handle_orig) = unauthenticated_server(Vec::new()).await;
|
||||
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[(
|
||||
"TestObj.TestInt",
|
||||
sample_metadata(),
|
||||
)]));
|
||||
let session = connect_test_session(addr_orig, resolver).await.unwrap();
|
||||
|
||||
// Inject a buffered registry entry. Correlation id is fixed so
|
||||
// we can assert the rebuilt registration carries the same id.
|
||||
let cid: [u8; 16] = [0xAB; 16];
|
||||
let buffered_ref = "TestObj.TestInt";
|
||||
{
|
||||
let mut reg = session.inner.subscriptions.lock().await;
|
||||
reg.insert(
|
||||
cid,
|
||||
SubscriptionEntry {
|
||||
metadata: Arc::new(sample_metadata()),
|
||||
mode: SubscriptionMode::Buffered {
|
||||
rounded_interval_ms: 1000,
|
||||
item_definition: buffered_ref.to_string(),
|
||||
item_context: String::new(),
|
||||
item_handle: 0,
|
||||
},
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
// Recording server that the rebuild factory will hand the
|
||||
// session a fresh NmxClient pointing at. The recovery loop fires
|
||||
// exactly two RPCs against this transport:
|
||||
// 1. RegisterEngine2 (HRESULT 0)
|
||||
// 2. TransferData carrying the rebuilt RegisterReference
|
||||
// (HRESULT 0)
|
||||
// The assertions below decode the second stub and pin the
|
||||
// envelope kind + inner registration shape.
|
||||
let (addr_replacement, recorded, handle_replacement) =
|
||||
recording_server(vec![(0, Vec::new()), (0, Vec::new())]).await;
|
||||
|
||||
let factory: crate::RebuildFactory = Arc::new(move || {
|
||||
let addr = addr_replacement;
|
||||
Box::pin(async move {
|
||||
let mut transport = DceRpcTcpClient::connect(addr).await.map_err(|e| {
|
||||
mxaccess_nmx::NmxClientError::Transport(
|
||||
mxaccess_rpc::transport::TransportError::Io(std::io::Error::other(
|
||||
e.to_string(),
|
||||
)),
|
||||
)
|
||||
})?;
|
||||
transport.bind(svc::INTERFACE_ID, 0, 0).await.map_err(|e| {
|
||||
mxaccess_nmx::NmxClientError::Transport(
|
||||
mxaccess_rpc::transport::TransportError::Io(std::io::Error::other(
|
||||
e.to_string(),
|
||||
)),
|
||||
)
|
||||
})?;
|
||||
Ok(NmxClient::from_bound_transport(
|
||||
transport,
|
||||
Guid::new([0xCC; 16]),
|
||||
))
|
||||
})
|
||||
});
|
||||
session.set_recovery_factory(factory).await;
|
||||
|
||||
// Drive the recovery cycle.
|
||||
session
|
||||
.recover_connection(RecoveryPolicy {
|
||||
max_attempts: 1,
|
||||
delay: std::time::Duration::ZERO,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Inspect the recorded RPCs against the replacement server.
|
||||
let recorded = recorded.lock().unwrap().clone();
|
||||
assert_eq!(
|
||||
recorded.len(),
|
||||
2,
|
||||
"expected RegisterEngine2 + TransferData(RegisterReference); got {}",
|
||||
recorded.len()
|
||||
);
|
||||
|
||||
// Slot 0 is RegisterEngine2 — pin only the opnum.
|
||||
assert_eq!(
|
||||
recorded[0].0,
|
||||
mxaccess_rpc::nmx_service2_messages::REGISTER_ENGINE_2_OPNUM,
|
||||
"first replay RPC should be RegisterEngine2"
|
||||
);
|
||||
|
||||
// Slot 1 is TransferData carrying the rebuilt RegisterReference.
|
||||
let (opnum, stub) = &recorded[1];
|
||||
assert_eq!(
|
||||
*opnum,
|
||||
mxaccess_rpc::nmx_service2_messages::TRANSFER_DATA_OPNUM,
|
||||
"buffered replay RPC must be TransferData (not, e.g., a no-op)"
|
||||
);
|
||||
|
||||
// The transfer_data stub layout:
|
||||
// 0..32 OrpcThis
|
||||
// 32..36 remote_galaxy_id i32 LE
|
||||
// 36..40 remote_platform_id i32 LE
|
||||
// 40..44 remote_engine_id i32 LE
|
||||
// 44..48 length i32 LE
|
||||
// 48..52 max_count i32 LE
|
||||
// 52..(52+L) message_body (NmxTransferEnvelope + inner)
|
||||
let body_offset = 52;
|
||||
let length = i32::from_le_bytes(stub[44..48].try_into().unwrap()) as usize;
|
||||
let message_body = &stub[body_offset..body_offset + length];
|
||||
|
||||
// Envelope kind at offset 10 of the envelope; ItemControl = 2
|
||||
// (the codec routes both AdviseSupervisory and RegisterReference
|
||||
// through ItemControl envelopes — the inner command byte is
|
||||
// what disambiguates).
|
||||
let envelope_kind_i32 = i32::from_le_bytes(message_body[10..14].try_into().unwrap());
|
||||
assert_eq!(
|
||||
envelope_kind_i32,
|
||||
NmxTransferMessageKind::ItemControl as i32,
|
||||
"envelope kind must be ItemControl"
|
||||
);
|
||||
|
||||
// Inner body starts after the 46-byte envelope. The first byte
|
||||
// distinguishes AdviseSupervisory (0x1f) from RegisterReference
|
||||
// (0x10) — F45 specifically requires the buffered branch to
|
||||
// emit the 0x10 form.
|
||||
let inner = &message_body[NmxTransferEnvelope::HEADER_LEN..];
|
||||
assert_eq!(
|
||||
inner[0], 0x10,
|
||||
"buffered replay must use RegisterReference (command 0x10), not \
|
||||
AdviseSupervisory (0x1f); got 0x{:02x}",
|
||||
inner[0]
|
||||
);
|
||||
|
||||
// Decode the inner registration and pin: correlation id matches
|
||||
// the registry entry's, item_definition carries `.property(buffer)`,
|
||||
// and `subscribe == true`.
|
||||
let parsed = NmxReferenceRegistrationMessage::parse(inner).unwrap();
|
||||
assert_eq!(
|
||||
parsed.item_correlation_id, cid,
|
||||
"rebuilt registration must carry the original correlation id"
|
||||
);
|
||||
assert!(
|
||||
parsed
|
||||
.item_definition
|
||||
.to_lowercase()
|
||||
.ends_with(".property(buffer)"),
|
||||
"rebuilt item_definition must end with .property(buffer); got {:?}",
|
||||
parsed.item_definition
|
||||
);
|
||||
assert!(
|
||||
parsed.subscribe,
|
||||
"rebuilt registration must have subscribe = true"
|
||||
);
|
||||
|
||||
handle_replacement.await.unwrap();
|
||||
handle_orig.abort();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn recover_connection_after_shutdown_returns_engine_not_registered() {
|
||||
use crate::RecoveryPolicy;
|
||||
@@ -2702,4 +3114,67 @@ mod tests {
|
||||
drop(event_tx);
|
||||
let _ = router_h.await;
|
||||
}
|
||||
|
||||
/// F47 — `Session::unsubscribe` must NOT emit an `UnAdvise` for
|
||||
/// buffered subscriptions, mirroring the .NET reference's
|
||||
/// `if (!subscription.IsBuffered)` guard at
|
||||
/// `MxNativeSession.cs:361-381`. Two parts:
|
||||
///
|
||||
/// - Plain subscribe → unsubscribe records 2 RPCs against the
|
||||
/// server (AdviseSupervisory + UnAdvise).
|
||||
/// - Buffered (mutated post-subscribe to flip the registry entry)
|
||||
/// → unsubscribe records 1 RPC (just the original
|
||||
/// AdviseSupervisory; no UnAdvise emitted).
|
||||
///
|
||||
/// Plain check uses the existing
|
||||
/// `subscribe_populates_registry_unsubscribe_clears_it` test as
|
||||
/// the negative control; this test pins the buffered branch.
|
||||
#[tokio::test]
|
||||
async fn unsubscribe_skips_un_advise_for_buffered_subscription() {
|
||||
let (addr, recorded, handle) =
|
||||
recording_server(vec![(0, Vec::new()), (0, Vec::new())]).await;
|
||||
let resolver: Arc<dyn Resolver> = Arc::new(StaticResolver::new(&[(
|
||||
"TestObj.TestInt",
|
||||
sample_metadata(),
|
||||
)]));
|
||||
let session = connect_test_session(addr, resolver).await.unwrap();
|
||||
|
||||
// Issue a plain subscribe — server records AdviseSupervisory.
|
||||
let sub = session.subscribe("TestObj.TestInt").await.unwrap();
|
||||
let cid = sub.correlation_id;
|
||||
assert_eq!(recorded.lock().unwrap().len(), 1, "subscribe should issue 1 RPC");
|
||||
|
||||
// Mutate the registry entry's mode to Buffered (synthesise the
|
||||
// state subscribe_buffered_nmx would have produced).
|
||||
{
|
||||
let mut reg = session.inner.subscriptions.lock().await;
|
||||
let entry = reg.get_mut(&cid).expect("registry entry present");
|
||||
entry.mode = SubscriptionMode::Buffered {
|
||||
rounded_interval_ms: 1000,
|
||||
item_definition: "TestObj.TestInt".to_string(),
|
||||
item_context: String::new(),
|
||||
item_handle: 0,
|
||||
};
|
||||
}
|
||||
|
||||
// Unsubscribe the now-buffered entry. F47 contract: NO
|
||||
// UnAdvise is emitted on the wire; recorded count stays at 1.
|
||||
session.unsubscribe(sub).await.unwrap();
|
||||
assert_eq!(
|
||||
recorded.lock().unwrap().len(),
|
||||
1,
|
||||
"buffered unsubscribe must not issue UnAdvise; recorded RPC count must stay at 1 \
|
||||
(the original AdviseSupervisory)"
|
||||
);
|
||||
|
||||
// Registry is still cleared — F47's skip applies only to the
|
||||
// wire emission, not the consumer-side bookkeeping.
|
||||
assert_eq!(
|
||||
session.inner.subscriptions.lock().await.len(),
|
||||
0,
|
||||
"buffered unsubscribe still removes the registry entry"
|
||||
);
|
||||
|
||||
handle.abort();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user