Compare commits

..

5 Commits

Author SHA1 Message Date
Joseph Doherty a1c4c6203e design/followups: move F37/F38/F39/F42 to Resolved
rust / build / test / clippy / fmt (push) Has been cancelled
Four M6 sub-followups closed in this session — moved to Resolved
section with concise verdicts referencing the matching commits:

- F37 (commit 34045c2): ASB subscribe_buffered returns Unsupported
- F38 (commit 71c69b8): counting-allocator bench harness + R12
  baseline showing the target is already met
- F39 (closed-via-F38): zero-copy pass not needed for R12 target
  (1-4 allocs/op across the proven matrix); remaining
  optimisations documented as post-V1 work
- F42 (commit e79e289): cargo doc --workspace --no-deps clean

Open M6 work remaining: F35 (compat facade), F36 (NMX
subscribe_buffered), F40 (metrics feature), F41 (public-api
baseline), F43 (release prep), F44 (capture decode evidence).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-06 04:47:38 -04:00
Joseph Doherty 71c69b80c6 [F38] mxaccess-codec: counting-allocator bench harness + R12 baseline
Hand-rolled GlobalAlloc wrapper around System that tracks allocs +
bytes + deallocs via two atomics. Each scenario runs 10k iterations
after a 1k warm-up; output is a markdown table with allocs/op,
bytes/op, deallocs/op.

Why hand-rolled (not dhat/criterion): R12 gates on a single number
("< 5 allocs/write"). dhat is heap-profiling-oriented (call-stack
attribution, JSON snapshots); criterion measures wall-clock latency
which is reported-but-not-gated per 60-roadmap.md:104. A 50-line
GlobalAlloc + atomic counters is the simplest thing that answers
the gate.

Run: `cargo bench -p mxaccess-codec`

Baseline numbers (release, Windows x64):
- Bool write:    1.00 allocs/op
- Int32 write:   2.00 allocs/op
- Float32 write: 2.00 allocs/op
- Float64 write: 2.00 allocs/op
- String write:  4.00 allocs/op (5-char string)
- Handle from_names: 2.00 allocs/op
- DataUpdate decode: 1.00 alloc/op

R12's < 5 allocs/write target is **already met** across the proven
matrix without any zero-copy work. The bench gates on this — any
write_message::encode scenario at >= 5 allocs/op exits the harness
with code 1.

Companion: `design/M6-bench-baseline.md` documents the numbers,
explains the per-scenario breakdown, and tightens F39's scope from
"hit the target" to "nice-to-have optimisations" (BytesMut output
buffer, name-signature cache, session-level scratch pool).

Workspace: 759 tests still pass; clippy --benches clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-06 04:45:33 -04:00
Joseph Doherty e79e289743 [F42] cargo doc --workspace --no-deps clean (0 warnings)
Fix all 33 rustdoc warnings across the workspace:

- Unresolved intra-doc links: rewrite [`name`] → either backtick text
  (when not actually a link) or fully-qualified `[Type::method]` /
  `[crate::module::name]` form. Affected: mxaccess-codec
  (asb_variant, item_control, metadata_query, observed_write_template,
  reference_handle, write_message), mxaccess-rpc (pdu), mxaccess-nmx
  (client), mxaccess-asb-nettcp (nmf), mxaccess-callback (exporter),
  mxaccess (asb_session, session, lib).
- Bracket-text being interpreted as link refs (e.g. `body[17]` →
  `` `body[17]` ``).
- Private-item references in public docs (CALLBACK_BROADCAST_CAPACITY,
  recover_connection_core, mxvalue_to_writevalue) reduced to
  backtick-text since they aren't part of the public API.

`RUSTDOCFLAGS="-D warnings" cargo doc --workspace --no-deps` now
exits clean. Workspace 759 tests pass; clippy clean.

Defers `#![warn(missing_docs)]` lint to a future pass — the cleanup
target is the broken-link warnings, which are signal; missing-docs
would surface hundreds of low-priority public-item gaps that are out
of scope for this F-number.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-06 04:39:51 -04:00
Joseph Doherty 34045c2f6d [F37] mxaccess: AsbSession::subscribe_buffered returns Unsupported
ASB has no `SetBufferedUpdateInterval` analogue — the per-monitored-
item `MinimalMonitoredItem::sample_interval` plays the cadence-knob
role. Calling `subscribe_buffered` on an ASB session now returns
`Error::Unsupported { transport: TransportKind::Asb, operation: ... }`
synchronously, without touching the wire.

The error-construction logic is split into a free fn
`unsupported_subscribe_buffered_error()` so the gate's exact shape
is unit-testable without spinning up a live authenticator + transport
fake. New unit test asserts both the variant tag and that the
operation message names the unsupported method + hints at the
`sample_interval` analogue.

Workspace 758 → 759 tests, clippy clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-06 04:32:45 -04:00
Joseph Doherty 2546710604 design/followups: add F35-F44 for M6 implementation plan
10 new followups decompose M6 (compatibility shim + production
hardening) into parallel-safe sub-streams:

- F35: mxaccess-compat LMXProxyServer-shaped facade (18 methods over
  Session/AsbSession)
- F36: Session::subscribe_buffered NMX path per R2 single-sample
- F37: ASB subscribe_buffered capability gate
- F38: counting-allocator cargo bench harness for R12 target
- F39: zero-copy codec pass (depends on F38)
- F40: optional metrics feature
- F41: cargo public-api baseline (depends on F35/F36/F37/F39/F40)
- F42: cargo doc cleanup pass
- F43: cargo publish --dry-run all crates (depends on F41)
- F44: decode buffered batch + suspend captures (077, 079-082, 094)
  for R2/R5 evidence

Parallelization: Wave 1 = F35/F36/F37/F38/F40/F42/F44 (different
crates / different concerns); Wave 2 = F39 (needs F38's bench);
Wave 3 = F41 (needs API stable); Wave 4 = F43 (release).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-06 04:28:38 -04:00
17 changed files with 594 additions and 36 deletions
+69
View File
@@ -0,0 +1,69 @@
# M6 — `mxaccess-codec` allocation-count baseline
Source: `cargo bench -p mxaccess-codec` (commit recording this file).
Harness: `crates/mxaccess-codec/benches/alloc_count.rs` — a thin
`GlobalAlloc` wrapper that increments two atomics on every `alloc` /
`dealloc` call, then runs each scenario for 10k iterations after a
1k-iteration warm-up.
## Target (per `70-risks-and-open-questions.md` R12)
> Aim for < 5 allocations per write at steady state.
The bench gates on this: any `write_message::encode` scenario at
≥ 5 allocs/op causes the binary to exit with code 1.
## Baseline (release profile, Windows x64)
| scenario | iters | allocs/op | bytes/op | deallocs/op |
|-------------------------------------------|--------:|----------:|---------:|------------:|
| `write_message::encode` (Int32) | 10,000 | 2.00 | 44 | 2.00 |
| `write_message::encode` (Float32) | 10,000 | 2.00 | 44 | 2.00 |
| `write_message::encode` (Float64) | 10,000 | 2.00 | 52 | 2.00 |
| `write_message::encode` (Boolean) | 10,000 | 1.00 | 37 | 1.00 |
| `write_message::encode` (String, 5 chars) | 10,000 | 4.00 | 92 | 4.00 |
| `MxReferenceHandle::from_names` | 10,000 | 2.00 | 22 | 2.00 |
| `NmxSubscriptionMessage::parse_inner` | 10,000 | 1.00 | 72 | 1.00 |
| (DataUpdate, Int32) | | | | |
## Read
R12's < 5 allocs/write target is **already met** across the proven matrix:
- Scalar writes (Bool, Int32, Float32, Float64) sit at 12 allocs/op.
The two allocs come from (1) the encoder's `Vec<u8>` output buffer
and (2) an internal scratch buffer in the value-encode path.
- String writes hit 4 allocs/op (output buffer, UTF-16LE conversion
buffer, the inner-length wrapper, and one more downstream).
- `MxReferenceHandle::from_names` allocates twice (one per
`compute_name_signature` call — UTF-16LE buffer for each name).
- `NmxSubscriptionMessage::parse_inner` allocates once for the
`records: Vec<NmxSubscriptionRecord>` collection.
## Implications for F39
F39 (zero-copy pass) was scoped as the work to *hit* the R12 target.
With the target already met, F39's scope tightens to:
- Move the encoder's output buffer to `bytes::BytesMut` so consumers
can split it without copying. Doesn't reduce alloc count but
improves downstream zero-copy on the wire-write path.
- Cache the per-handle UTF-16LE name conversion (the two
`compute_name_signature` allocs per `from_names`) inside
`MxReferenceHandle` if the same name is registered repeatedly.
- Pool the per-frame scratch buffer at the session level so the
per-write count drops from 2 → 1 on hot paths.
These are nice-to-have optimisations rather than R12 blockers.
## Reproducing
```powershell
cd rust
cargo bench -p mxaccess-codec
```
Numbers are deterministic per release-profile build on a given host.
Numeric drift across hosts is expected (the warm-up + black_box hints
keep iteration counts stable, not the underlying allocator's
small-alloc fast-path heuristics).
+126
View File
@@ -6,6 +6,120 @@ move to `## Resolved` with a date + commit hash.
## Open
### F35 — `mxaccess-compat` LMXProxyServer-shaped facade
**Severity:** P1 — blocks M6 DoD bullet 1 (`mxaccess-compat`: `LMXProxyServer`-shaped methods on top of `Session`).
**Source:** `design/60-roadmap.md:95` + `analysis/decompiled-mxaccess/` (`ILMXProxyServer5` interface) + `src/MxNativeClient/MxNativeCompatibilityServer.cs` (.NET reference) + `c:\Users\dohertj2\Desktop\wwtools\mxaccesscli\` (production CLI consumer).
**Scope.** Port the 18-method `ILMXProxyServer5` surface into `crates/mxaccess-compat/src/lib.rs` as Rust async fns over `mxaccess::Session` (NMX path) and `mxaccess::AsbSession` (ASB path). Method list:
| LMX method | Session mapping | Notes |
|---|---|---|
| `Register(clientName) → hServer` | `Session::connect_nmx` / `AsbSession::connect_asb` (factory) | Returns owned facade handle, not raw int |
| `Unregister(hServer)` | `Session::shutdown` / drop | |
| `AddItem(hServer, itemDef) → hItem` | metadata resolution | Caches `(hItem → ItemRef)` mapping |
| `AddItem2(hServer, itemDef, context) → hItem` | metadata resolution | Same with context |
| `RemoveItem(hServer, hItem)` | handle cleanup | |
| `Advise(hServer, hItem)` | `Session::subscribe` | Routes through internal stream |
| `UnAdvise(hServer, hItem)` | `Session::unsubscribe` | |
| `AdviseSupervisory(hServer, hItem)` | `Session::subscribe` (supervisory mode flag) | wave 2 |
| `Write(hServer, hItem, value, userId)` | `Session::write` | |
| `Write2(hServer, hItem, value, time, userId)` | `Session::write_with_timestamp` | |
| `WriteSecured(hServer, hItem, currUser, verifUser, value)` | `Session::write_secured` | always two ids per R6 |
| `WriteSecured2(hServer, hItem, currUser, verifUser, value, time)` | `Session::write_secured_at` | |
| `AuthenticateUser(hServer, user, pwd) → uid` | identity mapping | |
| `ArchestrAUserToId(hServer, userGuid) → uid` | identity mapping | |
| `Suspend(hServer, hItem) → MxStatus` | `Session::suspend` | experimental per R5 |
| `Activate(hServer, hItem) → MxStatus` | `Session::activate` | experimental per R5 |
| `AddBufferedItem(hServer, itemDef, context) → hItem` | `Session::register_buffered_item` | wave 2; depends on F36 |
| `SetBufferedUpdateInterval(hServer, intervalMs)` | per-session cached cadence | rounded to nearest 100ms |
**Definition of done:**
1. `crates/mxaccess-compat/src/lib.rs` exposes a top-level `LmxClient` (or `LmxFacade`) struct + 18 async methods covering the table above. Internal `(hItem → ItemRef)` map is `Mutex<HashMap<i32, ItemRef>>`.
2. Event surface: per Q4, expose `OnDataChange`/`OnWriteComplete`/`OnBufferedDataChange`/`OperationComplete` as `Stream` impls (NOT COM events — `mxaccess-compat-com` is post-V1).
3. Unit tests covering the handle-table lifecycle (Add → Advise → UnAdvise → Remove and the 18-method dispatch). No live tests required at this stage; wave 2 adds them.
4. `cargo build -p mxaccess-compat` + `cargo test -p mxaccess-compat` + clippy clean.
**Resolves when:** the .NET reference's `MxNativeCompatibilityServer.cs` has a Rust counterpart at the API-shape level (not byte-for-byte at the COM level — that's `mxaccess-compat-com`).
### F36 — `Session::subscribe_buffered` (NMX) per R2 single-sample-per-event answer
**Severity:** P1 — blocks M6 DoD bullet 2 (`subscribe_buffered` (NMX feature) — guarded by `BufferedOptions`; no synthesis if provider returns single-sample batches).
**Source:** `design/60-roadmap.md:97` + `design/70-risks-and-open-questions.md` R2 (single-sample-per-event verified against `wwtools/mxaccesscli/docs/api-notes.md:97-100,138-140,154-157`).
**Scope.** Add a `subscribe_buffered(reference, BufferedOptions { update_interval_ms })` method to `mxaccess::Session` that returns `Stream<Item = Result<DataChange, Error>>` — same item shape as plain `subscribe`, just with a per-session-cached cadence knob. Internal wiring:
1. Translate `update_interval_ms` to LMX's `SetBufferedUpdateInterval` semantics (rounded to nearest 100ms per `MxNativeCompatibilityServer.SetBufferedUpdateInterval:638`).
2. Use the existing `Session::subscribe` machinery; the buffered cadence is a server-side delivery rate knob, not a payload-shape change.
3. Surface as a separate Session method (not just an option on `subscribe`) so the API discoverably documents the cadence semantics.
**Definition of done:**
1. `Session::subscribe_buffered` returns `Stream<Item = Result<DataChange, Error>>` and internally drives the LMX `SetBufferedUpdateInterval` + `AddBufferedItem` call sequence per the captures `079-frida-add-buffered-advise-testint` and `082-frida-add-buffered-plain-advise-testint`.
2. New example `examples/subscribe-buffered.rs` exercises a 1-second cadence against the live AVEVA install. Per R2, no multi-sample synthesis.
3. Integration test asserts `Stream::Item == DataChange` (no `DataChangeBatch`); compile-time check.
4. Doc on `subscribe_buffered` cites R2's verification source and explicitly says "single-sample, cadence knob — not multi-sample payload."
**Resolves when:** `cargo run -p mxaccess --example subscribe-buffered` runs against AVEVA and the live captures `079`/`082` byte-round-trip via the new code path.
### F40 — Optional `metrics` feature: counters + histograms
**Severity:** P2 — M6 DoD bullet 4 (optional `metrics` feature emitting counters / histograms).
**Source:** `design/60-roadmap.md:98`.
**Scope.** Behind a Cargo feature flag (`metrics`), emit counters and histograms via the `metrics` crate (or `tracing` with a metrics layer). Suggested instrumentation points:
- Per-op counters: writes, reads, advises, unadvises, recovery attempts, recovery successes.
- Latency histograms: end-to-end write→OperationComplete, read→reply, subscribe→first-DataChange.
- Connection state gauges: connected sessions, registered items, active subscriptions.
**Definition of done:**
1. `cargo build -p mxaccess --features metrics` compiles + links the metrics crate.
2. Default build (no `metrics` feature) has zero `metrics` dep + zero runtime cost.
3. Doc page lists the emitted metric names + their semantic meaning.
**Resolves when:** the feature compiles, default build is clean, and at least one metric counter increments under a live exercise.
### F41 — `cargo public-api` baseline
**Severity:** P1 — M6 DoD bullet 5 (Docs: `cargo doc` published; `cargo public-api` baseline established).
**Source:** `design/60-roadmap.md:99`.
**Depends on:** F35, F36, F37, F39, F40 (the public surface must be stable before snapshotting).
**Scope.** Run `cargo public-api` on each crate; commit the resulting baseline to `design/public-api/{crate}.txt`. Add a CI step that diffs against the baseline and fails if the public surface changes without a corresponding baseline update.
**Definition of done:**
1. `cargo public-api -p mxaccess` runs clean + baseline committed.
2. Same for `mxaccess-codec`, `mxaccess-compat`, `mxaccess-asb`, `mxaccess-asb-nettcp`, `mxaccess-galaxy`, `mxaccess-rpc`, `mxaccess-callback`, `mxaccess-nmx`.
3. CI diff step in `.github/workflows/rust.yml` (or equivalent).
**Resolves when:** baseline files exist and CI catches drift.
### F43 — Release prep: `cargo publish --dry-run` all crates
**Severity:** P1 — M6 DoD bullet 6.
**Source:** `design/60-roadmap.md:100`.
**Depends on:** F41 (public-api baseline).
**Scope.** Run `cargo publish --dry-run -p {crate}` for every workspace crate. Resolve any missing `description`, `keywords`, `categories`, `readme` metadata fields. Decide a version-bump strategy (likely 0.1.0 across the board for V1 release).
**Definition of done:**
1. `cargo publish --dry-run` passes for every crate.
2. Workspace `Cargo.toml` + per-crate metadata complete.
3. Release notes draft in `CHANGELOG.md` for V1.
**Resolves when:** dry-runs are green and the release notes are written.
### F44 — Decode buffered batch + suspend captures (`077, 079-082, 094`)
**Severity:** P2 — evidence work for R2 (buffered single-sample) and R5 (Activate/Suspend), feeding F36/F35.
**Source:** `design/60-roadmap.md:40` (deferred to M6 + R2) + `design/70-risks-and-open-questions.md` R2/R5 + the captures.
**Scope.** Walk each capture's `frida-to-tcp-map.tsv`, decode the LMX call sequence + matching TCP wire bytes, and either:
- (a) confirm R2's "single-sample-per-event" verdict (default) and document the buffered captures as evidence, OR
- (b) if a multi-sample body is observed, reopen R2 and surface a typed `DataChangeBatch` decode path.
For `077` (Suspend on advised ScanState): document the trigger conditions for R5 — what input made `Suspend` succeed?
**Definition of done:**
1. Each capture has a one-paragraph summary in `docs/M6-buffered-evidence.md` documenting the call sequence, wire bytes, and verdict (R2 single-sample / R5 trigger).
2. Round-trip fixture loaded under `crates/mxaccess-codec/tests/fixtures/m6-buffered/` for any new typed decode paths added.
3. R2 / R5 status updated in `design/70-risks-and-open-questions.md` (either "settled per option (b)" or "settled silently as not a real risk").
**Resolves when:** the evidence summary is committed and R2/R5 statuses are updated accordingly.
### F3 — Cross-domain NTLM Type1/2/3 fixture
**Severity:** P2
**Status:** Permanently out-of-scope on the current dev host (no second AD domain). Resolution requires external infrastructure not available here.
@@ -17,6 +131,18 @@ move to `## Resolved` with a date + commit hash.
## Resolved
### F37 — ASB `subscribe_buffered` capability gate
**Resolved:** 2026-05-06 (commit `34045c2`). `AsbSession::subscribe_buffered` returns `Error::Unsupported { transport: TransportKind::Asb, operation: ... }` synchronously without touching the wire — ASB has no `SetBufferedUpdateInterval` analogue; the per-monitored-item `MinimalMonitoredItem::sample_interval` is the rate-limit knob instead. The error-construction logic is split into a free fn so the gate's exact shape is unit-testable without spinning up a live authenticator + transport. Workspace 758 → 759 tests; clippy clean.
### F38 — Counting-allocator `cargo bench` harness
**Resolved:** 2026-05-06 (commit `71c69b8`). Hand-rolled `GlobalAlloc` wrapper + atomic counters in `crates/mxaccess-codec/benches/alloc_count.rs`; `cargo bench -p mxaccess-codec` runs the proven matrix (write encode for Int32/Float32/Float64/Boolean/String, `MxReferenceHandle::from_names`, `NmxSubscriptionMessage::parse_inner`) and reports allocs/op + bytes/op + deallocs/op. Baseline numbers committed to `design/M6-bench-baseline.md`. Bench gates on R12 (< 5 allocs/write) — exits with code 1 on violation; current baseline is 14 allocs/op across the matrix, well under the target.
### F39 — Zero-copy codec pass (per R12)
**Resolved:** 2026-05-06 (closed via F38 measurements, no code change required). The R12 target (< 5 allocations per write at steady state) is already met across the proven matrix without any zero-copy rewrite — scalar writes are 12 allocs/op, String writes 4 allocs/op (5-char string), `MxReferenceHandle::from_names` 2 allocs/op, `NmxSubscriptionMessage::parse_inner` 1 alloc/op. The remaining nice-to-have optimisations (`BytesMut` output buffer to enable downstream zero-copy splits, name-signature cache to elide the two `compute_name_signature` UTF-16LE conversions per `from_names`, session-level scratch pool to drop per-write count from 2 → 1) are documented in `design/M6-bench-baseline.md` as post-V1 work — they don't gate M6 DoD because R12 is already satisfied.
### F42 — `cargo doc` cleanup pass
**Resolved:** 2026-05-06 (commit `e79e289`). All 33 rustdoc warnings across the workspace fixed: unresolved intra-doc links rewritten as fully-qualified `[Type::method]` / `[crate::module::name]` forms or backtick text where no link target exists; bracket text that was being interpreted as link refs (e.g. `body[17]`) escaped to backtick form; private-item references in public docs (`CALLBACK_BROADCAST_CAPACITY`, `recover_connection_core`, `mxvalue_to_writevalue`) reduced to backtick text. `RUSTDOCFLAGS="-D warnings" cargo doc --workspace --no-deps` exits clean. Workspace 759 tests pass; clippy clean. The optional `#![warn(missing_docs)]` lint is deferred — it would surface hundreds of low-priority public-item gaps that are out of scope for this F-number; it can be re-evaluated in F41 (`cargo public-api`) when the public surface is final.
### F18 — M5 plan of attack (ASB transport, parallel-safe sub-streams)
**Resolved:** 2026-05-06 — all sub-followups F19F26 closed plus F28 / F29 / F30 / F31 / F32 / F33 / F34 layered on top. M5 is functionally LIVE end-to-end: `cargo run -p mxaccess --example asb-subscribe -- --tag TestChildObject.TestInt` against the AVEVA install successfully exercises Connect → AuthenticateMe → RegisterItems → Read → CreateSubscription → AddMonitoredItems → Publish (delivers tag value) → DeleteMonitoredItems → DeleteSubscription → UnregisterItems → Disconnect with canonical-XML HMAC signing on every signed op. **Severity:** P0 — milestone driver, blocks ASB consumers + V1 release.
**Source:** `design/dependencies.md:73-89` + `design/60-roadmap.md:84-91` + `design/70-risks-and-open-questions.md:5-25`.
+1 -1
View File
@@ -220,7 +220,7 @@ impl NmfRecord {
}
/// Encode to a fresh buffer. Convenience wrapper around
/// [`encode_into`].
/// [`Self::encode_into`].
pub fn encode(&self) -> Result<Vec<u8>, NmfError> {
let mut out = Vec::new();
self.encode_into(&mut out)?;
@@ -18,10 +18,9 @@
//! - `INmxSvcCallback` (IID `B49F92F7-C748-4169-8ECA-A0670B012746`,
//! `NmxProcedureMetadata.cs:6`) — opnums 3 (`DataReceived`),
//! 4 (`StatusReceived`). The handler decodes the inbound buffer via
//! [`crate::nmx_callback_messages::parse_callback_request`] (re-export
//! from `mxaccess-rpc::nmx_callback_messages`), emits a typed event,
//! and returns the success response built by
//! [`crate::nmx_callback_messages::encode_callback_response`].
//! [`mxaccess_rpc::nmx_callback_messages::parse_callback_request`],
//! emits a typed event, and returns the success response built by
//! [`mxaccess_rpc::nmx_callback_messages::encode_callback_response`].
//!
//! Auth3 PDUs are accepted but ignored (`ManagedCallbackExporter.cs:133-137`)
//! — NTLM packet integrity for inbound frames is not yet wired (open
+4
View File
@@ -15,5 +15,9 @@ thiserror = { workspace = true }
default = []
serde = []
[[bench]]
name = "alloc_count"
harness = false
[lints]
workspace = true
@@ -0,0 +1,298 @@
//! F38 — counting-allocator bench for `mxaccess-codec`.
//!
//! Measures allocation count + bytes-allocated for the proven
//! encode/decode matrix per `design/70-risks-and-open-questions.md`
//! R12 (< 5 allocs per write at steady state). The harness wraps the
//! global allocator with a [`CountingAllocator`] that tracks
//! per-call counts; each scenario records pre-state, runs N
//! iterations, and reports `(alloc_count, bytes_allocated) / N`.
//!
//! Output is the source of truth for `design/M6-bench-baseline.md`.
//!
//! ## Why hand-rolled (not `dhat` / `criterion`)
//!
//! - `dhat` is heap-profiling oriented (snapshots, call-stack
//! attribution); for "did this op allocate < 5 times?" the simpler
//! approach is a thin `GlobalAlloc` wrapper that increments two
//! atomics. No call-stack capture, no JSON output to post-process.
//! - `criterion` measures wall-clock latency; per `60-roadmap.md:104`,
//! latency is reported but not gating in V1. Allocation count IS
//! the gating metric for M6 DoD bullet 3.
//!
//! ## Run
//!
//! ```text
//! cargo bench -p mxaccess-codec
//! ```
//!
//! Each scenario runs in release mode by default (cargo bench
//! profile = `bench` which inherits release).
#![allow(
clippy::unwrap_used,
clippy::expect_used,
clippy::cast_possible_truncation,
clippy::cast_sign_loss
)]
use std::alloc::{GlobalAlloc, Layout, System};
use std::sync::atomic::{AtomicU64, Ordering};
use mxaccess_codec::{
MxReferenceHandle, NmxSubscriptionMessage, write_message,
write_message::WriteValue,
};
// ---- counting allocator -------------------------------------------------
struct CountingAllocator;
static ALLOC_COUNT: AtomicU64 = AtomicU64::new(0);
static ALLOC_BYTES: AtomicU64 = AtomicU64::new(0);
static DEALLOC_COUNT: AtomicU64 = AtomicU64::new(0);
unsafe impl GlobalAlloc for CountingAllocator {
unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
ALLOC_COUNT.fetch_add(1, Ordering::Relaxed);
ALLOC_BYTES.fetch_add(layout.size() as u64, Ordering::Relaxed);
// SAFETY: forwarding to the system allocator with the same layout.
unsafe { System.alloc(layout) }
}
unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
DEALLOC_COUNT.fetch_add(1, Ordering::Relaxed);
// SAFETY: forwarding to the system allocator with the same ptr+layout.
unsafe { System.dealloc(ptr, layout) }
}
}
#[global_allocator]
static GLOBAL: CountingAllocator = CountingAllocator;
// ---- scenario harness ---------------------------------------------------
#[derive(Debug, Clone, Copy)]
struct Snapshot {
allocs: u64,
bytes: u64,
deallocs: u64,
}
fn snapshot() -> Snapshot {
Snapshot {
allocs: ALLOC_COUNT.load(Ordering::Relaxed),
bytes: ALLOC_BYTES.load(Ordering::Relaxed),
deallocs: DEALLOC_COUNT.load(Ordering::Relaxed),
}
}
fn diff(start: Snapshot, end: Snapshot, iterations: u64) -> (f64, f64, f64) {
(
(end.allocs - start.allocs) as f64 / iterations as f64,
(end.bytes - start.bytes) as f64 / iterations as f64,
(end.deallocs - start.deallocs) as f64 / iterations as f64,
)
}
/// Run `op` `iterations` times and return per-op alloc/bytes/dealloc
/// counts. The hint is passed through `std::hint::black_box` to keep
/// the compiler from optimising the work away.
fn measure<F>(name: &str, iterations: u64, mut op: F) -> Row
where
F: FnMut(),
{
// Warm-up: 1k iterations to settle any one-time setup state.
for _ in 0..1024 {
op();
}
let start = snapshot();
for _ in 0..iterations {
op();
}
let end = snapshot();
let (allocs, bytes, deallocs) = diff(start, end, iterations);
Row {
name: name.to_string(),
iterations,
allocs_per_op: allocs,
bytes_per_op: bytes,
deallocs_per_op: deallocs,
}
}
struct Row {
name: String,
iterations: u64,
allocs_per_op: f64,
bytes_per_op: f64,
deallocs_per_op: f64,
}
fn print_table(rows: &[Row]) {
println!();
println!(
"| {:40} | {:>10} | {:>10} | {:>10} | {:>10} |",
"scenario", "iters", "allocs/op", "bytes/op", "deallocs/op"
);
println!(
"| {:40} | {:>10} | {:>10} | {:>10} | {:>10} |",
"-".repeat(40),
"-".repeat(10),
"-".repeat(10),
"-".repeat(10),
"-".repeat(10)
);
for row in rows {
println!(
"| {:40} | {:>10} | {:>10.2} | {:>10.0} | {:>10.2} |",
row.name, row.iterations, row.allocs_per_op, row.bytes_per_op, row.deallocs_per_op
);
}
println!();
}
// ---- scenarios ----------------------------------------------------------
fn make_handle() -> MxReferenceHandle {
MxReferenceHandle::from_names(0, 1, 2, 3, "TestObject", 0, 1, 0, "TestAttr", false)
.expect("handle")
}
fn bench_write_int32() -> Row {
let handle = make_handle();
let value = WriteValue::Int32(42);
measure("write_message::encode (Int32)", 10_000, || {
let bytes = write_message::encode(&handle, &value, 0, 0).unwrap();
std::hint::black_box(bytes);
})
}
fn bench_write_float() -> Row {
let handle = make_handle();
let value = WriteValue::Float32(1.5);
measure("write_message::encode (Float32)", 10_000, || {
let bytes = write_message::encode(&handle, &value, 0, 0).unwrap();
std::hint::black_box(bytes);
})
}
fn bench_write_double() -> Row {
let handle = make_handle();
let value = WriteValue::Float64(3.25);
measure("write_message::encode (Float64)", 10_000, || {
let bytes = write_message::encode(&handle, &value, 0, 0).unwrap();
std::hint::black_box(bytes);
})
}
fn bench_write_bool() -> Row {
let handle = make_handle();
let value = WriteValue::Boolean(true);
measure("write_message::encode (Boolean)", 10_000, || {
let bytes = write_message::encode(&handle, &value, 0, 0).unwrap();
std::hint::black_box(bytes);
})
}
fn bench_write_string() -> Row {
let handle = make_handle();
let value = WriteValue::String("hello".to_string());
measure("write_message::encode (String, 5 chars)", 10_000, || {
let bytes = write_message::encode(&handle, &value, 0, 0).unwrap();
std::hint::black_box(bytes);
})
}
fn bench_subscription_decode() -> Row {
// Build a single-record DataUpdate body once; decode N times.
let body = build_data_update_int32_body(42);
measure(
"NmxSubscriptionMessage::parse_inner (DataUpdate, Int32)",
10_000,
|| {
let msg = NmxSubscriptionMessage::parse_inner(&body).unwrap();
std::hint::black_box(msg);
},
)
}
fn bench_handle_from_names() -> Row {
measure("MxReferenceHandle::from_names", 10_000, || {
let h = MxReferenceHandle::from_names(
0,
1,
2,
3,
"TestChildObject",
0,
1,
0,
"TestInt",
false,
)
.unwrap();
std::hint::black_box(h);
})
}
// ---- helpers (mirror the test fixtures in subscription_message.rs) -----
fn build_data_update_int32_body(value: i32) -> Vec<u8> {
// Operation id + correlation id are arbitrary 16-byte sequences for
// a synthetic body; the codec doesn't reject any GUID shape.
const DATA_UPDATE_COMMAND: u8 = 0x33;
let operation_id = [0x11u8; 16];
// 15-byte record prefix: status(4) + quality(2) + filetime(8) + wire_kind(1).
// wire_kind = 0x02 = Int32. Then the 4-byte i32 LE payload.
let mut record = Vec::with_capacity(15 + 4);
record.extend_from_slice(&0i32.to_le_bytes()); // status
record.extend_from_slice(&0x00C0u16.to_le_bytes()); // quality
record.extend_from_slice(&0i64.to_le_bytes()); // filetime
record.push(0x02); // wire_kind = Int32
record.extend_from_slice(&value.to_le_bytes());
let mut out = Vec::with_capacity(23 + record.len());
out.push(DATA_UPDATE_COMMAND);
out.extend_from_slice(&1u16.to_le_bytes()); // version
out.extend_from_slice(&1i32.to_le_bytes()); // record_count = 1
out.extend_from_slice(&operation_id);
out.extend_from_slice(&record);
out
}
// ---- main --------------------------------------------------------------
fn main() {
println!("F38 — mxaccess-codec allocation-count baseline");
println!("Counting allocator: thin GlobalAlloc wrapper around System.");
println!("R12 target: < 5 allocations per write at steady state.");
let rows = vec![
bench_write_int32(),
bench_write_float(),
bench_write_double(),
bench_write_bool(),
bench_write_string(),
bench_handle_from_names(),
bench_subscription_decode(),
];
print_table(&rows);
// R12 gate: emit a non-zero exit code if any encode-write scenario
// exceeds the 5-allocs threshold. Decoders are reported but not
// gated (the sweep below explicitly excludes them).
let mut violations = 0;
for row in &rows {
if row.name.starts_with("write_message::encode") && row.allocs_per_op >= 5.0 {
eprintln!(
"R12 violation: {} allocates {:.2}/op (>= 5)",
row.name, row.allocs_per_op
);
violations += 1;
}
}
if violations > 0 {
std::process::exit(1);
}
}
@@ -154,7 +154,7 @@ impl AsbVariant {
}
}
/// Standalone encode: convenience wrapper around [`encode_into`].
/// Standalone encode: convenience wrapper around [`Self::encode_into`].
pub fn encode(&self) -> Vec<u8> {
let mut out = Vec::with_capacity(self.wire_len());
self.encode_into(&mut out);
@@ -2,7 +2,7 @@
//!
//! Direct port of `src/MxNativeCodec/NmxItemControlMessage.cs`. The body
//! carries an advise-supervisory or unadvise command together with a 16-byte
//! item correlation GUID and a 14-byte projection of an [`MxReferenceHandle`]
//! item correlation GUID and a 14-byte projection of an [`crate::reference_handle::MxReferenceHandle`]
//! (handle bytes 6..20 — `object_id` through `attribute_index`).
//!
//! ## Wire layout
@@ -87,7 +87,7 @@ const PAYLOAD_LENGTH: usize = 18; // cs:28 — 7×u16 + u32 tail = 18 bytes
pub const DEFAULT_TAIL: u32 = 3;
/// Decoded NMX item-control body. The fields after `item_correlation_id`
/// project bytes 6..20 of an [`MxReferenceHandle`] — see
/// project bytes 6..20 of an [`crate::reference_handle::MxReferenceHandle`] — see
/// `NmxItemControlMessage.cs:71-81, 134-141`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct NmxItemControlMessage {
@@ -1,7 +1,7 @@
//! `NmxMetadataQueryMessage` — observed pre-advise metadata-query body.
//!
//! Direct port of `src/MxNativeCodec/NmxMetadataQueryMessage.cs`. The .NET
//! reference exposes a single static helper, [`encode_observed_pre_advise`],
//! reference exposes a single static helper, `encode_observed_pre_advise`,
//! which returns a fixed observed body with a 16-byte item-correlation GUID
//! patched in at offset `0x8a`.
//!
@@ -44,7 +44,7 @@
//! `Encode` (`.cs:51-64`) writes:
//!
//! 1. The captured prefix (`_prefix`, raw bytes) — preserved verbatim.
//! 2. The freshly-encoded value bytes from [`encode_value`].
//! 2. The freshly-encoded value bytes from `encode_value_bytes`.
//! 3. The captured suffix (`_suffixBeforeWriteIndex`) — preserved verbatim.
//! 4. The fresh `writeIndex` as i32 LE in the trailing 4 bytes.
//!
@@ -167,12 +167,12 @@ impl ObservedWriteBodyTemplate {
}
}
/// Captured opcode at body[0]. Mirrors `_prefix[0]`.
/// Captured opcode at `body[0]`. Mirrors `_prefix[0]`.
pub fn command(&self) -> u8 {
self.command
}
/// Captured wire-kind byte at body[17]. Drawn from the captured prefix,
/// Captured wire-kind byte at `body[17]`. Drawn from the captured prefix,
/// not from the runtime [`MxValueKind`] (which can disambiguate
/// String vs DateTime past the encoder collapse).
pub fn wire_kind(&self) -> u8 {
@@ -34,8 +34,9 @@ const CRC16_IBM_POLYNOMIAL: u16 = 0xa001;
///
/// `object_signature` and `attribute_signature` are derived values. The Rust
/// port keeps them private — the only constructor that produces a handle from
/// names is [`from_names`]; the only mutators that update one signature are
/// [`with_object_tag_name`] and [`with_attribute_name`], which both
/// names is [`MxReferenceHandle::from_names`]; the only mutators that update
/// one signature are [`MxReferenceHandle::with_object_tag_name`] and
/// [`MxReferenceHandle::with_attribute_name`], which both
/// recompute. This is a deliberate tightening over the .NET reference (which
/// is a record with public init-only signature fields).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
@@ -157,7 +158,7 @@ impl MxReferenceHandle {
/// # Panics
///
/// Panics if `destination.len() < 20`. Use a 20-byte slice or call
/// [`encode`] for a fresh buffer.
/// [`Self::encode`] for a fresh buffer.
pub fn write_to(self, destination: &mut [u8]) {
assert!(
destination.len() >= Self::ENCODED_LEN,
@@ -72,8 +72,8 @@
//! 28.. payload
//! ```
//!
//! The encoder writes `count` (u16) at body[22] and `element_width` (u16) at
//! body[24]. The decoder/subscription side reads `element_width` as `i32` at
//! The encoder writes `count` (u16) at `body[22]` and `element_width` (u16) at
//! `body[24]`. The decoder/subscription side reads `element_width` as `i32` at
//! a different offset — that asymmetry is documented in the subscription
//! message module, not here. Encoder element widths are 2/4/4/8 for
//! Boolean/Int32/Float32/Float64 arrays; for variable arrays (String,
+2 -2
View File
@@ -88,7 +88,7 @@ pub enum NmxClientError {
/// The metadata's `(mx_data_type, is_array)` pair has no LMX wire
/// encoding (e.g. arrays of `ElapsedTime`, scalars of
/// `ReferenceType`). Returned by [`NmxClient::resolve_write_kind`]
/// `ReferenceType`). Returned by [`GalaxyTagMetadata::resolve_write_kind`]
/// helpers when the caller asks for a kind that
/// [`mxaccess_codec::MxValueKind::for_data_type`] rejects.
/// Mirrors the `ArgumentOutOfRangeException` paths in the .NET
@@ -530,7 +530,7 @@ impl NmxClient {
}
/// `INmxService2::RemoveSubscriberEngine` (opnum 8). Mirrors
/// `cs:137-147`. Same wire shape as [`add_subscriber_engine`].
/// `cs:137-147`. Same wire shape as [`Self::add_subscriber_engine`].
///
/// # Errors
/// Transport or codec.
+3 -3
View File
@@ -539,7 +539,7 @@ impl BindPdu {
/// Encode the Bind / AlterContext PDU. Returns the wire bytes
/// (`DceRpcPdu.cs:264-290`). Sets `frag_length` and `auth_length=0` on
/// the encoded header — to attach an auth verifier use [`encode_with_auth`].
/// the encoded header — to attach an auth verifier use [`Self::encode_with_auth`].
pub fn encode(&self) -> Vec<u8> {
let length: usize = BIND_BODY_OFFSET
+ self
@@ -955,7 +955,7 @@ impl BindAckPdu {
///
/// The .NET `Encode` (`DceRpcPdu.cs:118-124`) defaults `packet_flags` to
/// `0x03` (PFC_FIRST_FRAG | PFC_LAST_FRAG) only when the supplied flags are
/// 0; the Rust port keeps the same exact behaviour in [`encode`].
/// 0; the Rust port keeps the same exact behaviour in [`RequestPdu::encode`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RequestPdu {
pub header: PduHeader,
@@ -1081,7 +1081,7 @@ pub struct ResponsePdu {
impl ResponsePdu {
/// Decode a Response PDU (`DceRpcPdu.cs:141-168`). The .NET reference
/// has no encoder for Response — the Rust port adds [`encode`] for tests
/// has no encoder for Response — the Rust port adds [`Self::encode`] for tests
/// and round-trip use.
pub fn decode(buf: &[u8]) -> Result<Self, RpcError> {
let header = PduHeader::decode(buf)?;
+69 -8
View File
@@ -21,14 +21,15 @@
//! Implements:
//! * [`AsbSession::connect`] — TCP connect → preamble → DH handshake
//! → ready session.
//! * [`AsbSession::register_items`] / [`unregister_items`] /
//! [`read`] / [`write`] — per-operation thin async wrappers.
//! * [`AsbSession::keep_alive`] / [`disconnect`] / [`shutdown`] —
//! lifecycle.
//! * [`AsbSession::register_items`] / [`AsbSession::unregister_items`] /
//! [`AsbSession::read`] / [`AsbSession::write`] — per-operation thin
//! async wrappers.
//! * [`AsbSession::keep_alive`] / [`AsbSession::disconnect`] /
//! `AsbSession::shutdown` — lifecycle.
//! * [`AsbSession::create_subscription`] /
//! [`add_monitored_items`] / [`publish`] /
//! [`delete_monitored_items`] / [`delete_subscription`] —
//! subscription primitives.
//! [`AsbSession::add_monitored_items`] / [`AsbSession::publish`] /
//! [`AsbSession::delete_monitored_items`] /
//! [`AsbSession::delete_subscription`] — subscription primitives.
//! * [`AsbSession::subscribe`] — returns an [`AsbSubscription`]
//! `Stream<Item = Result<MonitoredItemValue, Error>>` driven by a
//! background publish-loop. Drop of the stream aborts the loop.
@@ -64,7 +65,7 @@ use tokio::task::JoinHandle;
use tokio_stream::wrappers::ReceiverStream;
use crate::transport_asb::AsbTransport;
use crate::{ConnectionError, Error};
use crate::{BufferedOptions, ConnectionError, Error, TransportKind};
/// Channel buffer for [`AsbSubscription`]'s publish-loop. 256 samples is
/// generous for the typical sample-rate budget (1-100 Hz) — bounded so
@@ -345,6 +346,36 @@ impl AsbSession {
join: Some(join),
}
}
/// `subscribe_buffered` is **not supported** on the ASB transport.
/// ASB has no buffered-cadence equivalent of NMX's
/// `SetBufferedUpdateInterval`; the per-monitored-item `SampleInterval`
/// (see [`mxaccess_asb::MinimalMonitoredItem::sample_interval`])
/// already plays the rate-limit role — set it on each
/// `MonitoredItem` passed to [`Self::add_monitored_items`] instead.
///
/// Returns [`Error::Unsupported`] synchronously, without touching
/// the wire. F37.
pub async fn subscribe_buffered(
&self,
_reference: &str,
_options: BufferedOptions,
) -> Result<AsbSubscription, Error> {
Err(unsupported_subscribe_buffered_error())
}
}
/// F37 — typed `Error::Unsupported` returned by
/// [`AsbSession::subscribe_buffered`]. Extracted into a free fn so the
/// gate's exact shape is unit-testable without constructing a full
/// `AsbSession` (which requires a live authenticator + transport).
fn unsupported_subscribe_buffered_error() -> Error {
Error::Unsupported {
operation: std::borrow::Cow::Borrowed(
"AsbSession::subscribe_buffered (use MinimalMonitoredItem::sample_interval; ASB has no SetBufferedUpdateInterval analogue)",
),
transport: TransportKind::Asb,
}
}
/// Inner publish-loop body — testable in isolation by passing any
@@ -477,6 +508,36 @@ mod tests {
assert_stream_send_unpin::<AsbSubscription>();
}
/// F37 — the `Error::Unsupported` returned by
/// `AsbSession::subscribe_buffered` carries `TransportKind::Asb`
/// and an operation message mentioning `subscribe_buffered`.
/// Targets the helper directly so the test doesn't need to spin
/// up a live authenticator / transport.
#[test]
fn subscribe_buffered_unsupported_error_shape() {
let err = unsupported_subscribe_buffered_error();
match err {
Error::Unsupported {
transport,
operation,
} => {
assert!(
matches!(transport, TransportKind::Asb),
"expected TransportKind::Asb, got {transport:?}"
);
assert!(
operation.contains("subscribe_buffered"),
"operation should name the unsupported method, got {operation:?}"
);
assert!(
operation.contains("sample_interval"),
"operation should hint at the ASB analogue, got {operation:?}"
);
}
other => panic!("expected Error::Unsupported, got {other:?}"),
}
}
fn fake_value(idx: i32) -> MonitoredItemValue {
MonitoredItemValue {
item: ItemIdentity::absolute_by_name(format!("Tag{idx}")),
+2 -2
View File
@@ -397,8 +397,8 @@ impl Session {
}
/// Write a value to a tag (`MxValue` overload). Delegates to
/// [`Self::write_value`] after converting `value` via
/// [`mxvalue_to_writevalue`].
/// [`Self::write_value`] after converting `value` via the private
/// `mxvalue_to_writevalue` helper.
///
/// # Errors
/// As for [`Self::write_value`], plus
+4 -4
View File
@@ -119,7 +119,7 @@ const CALLBACK_BROADCAST_CAPACITY: usize = 256;
/// ## Lag behavior
///
/// The underlying broadcast channel has a fixed capacity
/// ([`CALLBACK_BROADCAST_CAPACITY`]). Slow consumers that fall behind
/// (`CALLBACK_BROADCAST_CAPACITY`). Slow consumers that fall behind
/// receive `Some(Err(Error::Configuration(InvalidArgument)))` whose
/// detail string contains the lag-loss count. The stream stays open
/// and resumes from the next available message — same shape as
@@ -449,7 +449,7 @@ impl Session {
/// `(host, port, service_ipid)` before `RegisterEngine2`. This
/// constructor takes the resolved triple by hand — useful for
/// tests, deterministic probes, and non-Windows builds. Use
/// [`Self::connect_nmx_auto`] (Windows + `windows-com` feature) to
/// `Self::connect_nmx_auto` (Windows + `windows-com` feature) to
/// drive the auto-resolving path.
///
/// On success: a `RegisterEngine2` round-trip has completed and the
@@ -650,7 +650,7 @@ impl Session {
/// `RecoverConnectionCore` (`cs:442-474`).
///
/// Per attempt: emit [`RecoveryEvent::Started`], invoke
/// [`recover_connection_core`](Self::recover_connection_core) (rebuild
/// the private `recover_connection_core` helper (rebuild
/// the NMX transport via the installed [`RebuildFactory`], re-run
/// `RegisterEngine2` with the saved callback OBJREF, replay every
/// active subscription's `AdviseSupervisory`, atomically swap the
@@ -821,7 +821,7 @@ impl Session {
/// This accessor is the test seam + escape hatch consumers can use
/// today to observe the raw stream.
///
/// Receivers can lag by up to [`CALLBACK_BROADCAST_CAPACITY`]
/// Receivers can lag by up to `CALLBACK_BROADCAST_CAPACITY`
/// messages before the broadcast channel starts dropping; lagged
/// receivers see [`tokio::sync::broadcast::error::RecvError::Lagged`].
#[must_use]