Compare commits
5 Commits
bedad57b4e
...
a1c4c6203e
| Author | SHA1 | Date | |
|---|---|---|---|
| a1c4c6203e | |||
| 71c69b80c6 | |||
| e79e289743 | |||
| 34045c2f6d | |||
| 2546710604 |
@@ -0,0 +1,69 @@
|
||||
# M6 — `mxaccess-codec` allocation-count baseline
|
||||
|
||||
Source: `cargo bench -p mxaccess-codec` (commit recording this file).
|
||||
Harness: `crates/mxaccess-codec/benches/alloc_count.rs` — a thin
|
||||
`GlobalAlloc` wrapper that increments two atomics on every `alloc` /
|
||||
`dealloc` call, then runs each scenario for 10k iterations after a
|
||||
1k-iteration warm-up.
|
||||
|
||||
## Target (per `70-risks-and-open-questions.md` R12)
|
||||
|
||||
> Aim for < 5 allocations per write at steady state.
|
||||
|
||||
The bench gates on this: any `write_message::encode` scenario at
|
||||
≥ 5 allocs/op causes the binary to exit with code 1.
|
||||
|
||||
## Baseline (release profile, Windows x64)
|
||||
|
||||
| scenario | iters | allocs/op | bytes/op | deallocs/op |
|
||||
|-------------------------------------------|--------:|----------:|---------:|------------:|
|
||||
| `write_message::encode` (Int32) | 10,000 | 2.00 | 44 | 2.00 |
|
||||
| `write_message::encode` (Float32) | 10,000 | 2.00 | 44 | 2.00 |
|
||||
| `write_message::encode` (Float64) | 10,000 | 2.00 | 52 | 2.00 |
|
||||
| `write_message::encode` (Boolean) | 10,000 | 1.00 | 37 | 1.00 |
|
||||
| `write_message::encode` (String, 5 chars) | 10,000 | 4.00 | 92 | 4.00 |
|
||||
| `MxReferenceHandle::from_names` | 10,000 | 2.00 | 22 | 2.00 |
|
||||
| `NmxSubscriptionMessage::parse_inner` | 10,000 | 1.00 | 72 | 1.00 |
|
||||
| (DataUpdate, Int32) | | | | |
|
||||
|
||||
## Read
|
||||
|
||||
R12's < 5 allocs/write target is **already met** across the proven matrix:
|
||||
|
||||
- Scalar writes (Bool, Int32, Float32, Float64) sit at 1–2 allocs/op.
|
||||
The two allocs come from (1) the encoder's `Vec<u8>` output buffer
|
||||
and (2) an internal scratch buffer in the value-encode path.
|
||||
- String writes hit 4 allocs/op (output buffer, UTF-16LE conversion
|
||||
buffer, the inner-length wrapper, and one more downstream).
|
||||
- `MxReferenceHandle::from_names` allocates twice (one per
|
||||
`compute_name_signature` call — UTF-16LE buffer for each name).
|
||||
- `NmxSubscriptionMessage::parse_inner` allocates once for the
|
||||
`records: Vec<NmxSubscriptionRecord>` collection.
|
||||
|
||||
## Implications for F39
|
||||
|
||||
F39 (zero-copy pass) was scoped as the work to *hit* the R12 target.
|
||||
With the target already met, F39's scope tightens to:
|
||||
|
||||
- Move the encoder's output buffer to `bytes::BytesMut` so consumers
|
||||
can split it without copying. Doesn't reduce alloc count but
|
||||
improves downstream zero-copy on the wire-write path.
|
||||
- Cache the per-handle UTF-16LE name conversion (the two
|
||||
`compute_name_signature` allocs per `from_names`) inside
|
||||
`MxReferenceHandle` if the same name is registered repeatedly.
|
||||
- Pool the per-frame scratch buffer at the session level so the
|
||||
per-write count drops from 2 → 1 on hot paths.
|
||||
|
||||
These are nice-to-have optimisations rather than R12 blockers.
|
||||
|
||||
## Reproducing
|
||||
|
||||
```powershell
|
||||
cd rust
|
||||
cargo bench -p mxaccess-codec
|
||||
```
|
||||
|
||||
Numbers are deterministic per release-profile build on a given host.
|
||||
Numeric drift across hosts is expected (the warm-up + black_box hints
|
||||
keep iteration counts stable, not the underlying allocator's
|
||||
small-alloc fast-path heuristics).
|
||||
@@ -6,6 +6,120 @@ move to `## Resolved` with a date + commit hash.
|
||||
|
||||
## Open
|
||||
|
||||
### F35 — `mxaccess-compat` LMXProxyServer-shaped facade
|
||||
**Severity:** P1 — blocks M6 DoD bullet 1 (`mxaccess-compat`: `LMXProxyServer`-shaped methods on top of `Session`).
|
||||
**Source:** `design/60-roadmap.md:95` + `analysis/decompiled-mxaccess/` (`ILMXProxyServer5` interface) + `src/MxNativeClient/MxNativeCompatibilityServer.cs` (.NET reference) + `c:\Users\dohertj2\Desktop\wwtools\mxaccesscli\` (production CLI consumer).
|
||||
|
||||
**Scope.** Port the 18-method `ILMXProxyServer5` surface into `crates/mxaccess-compat/src/lib.rs` as Rust async fns over `mxaccess::Session` (NMX path) and `mxaccess::AsbSession` (ASB path). Method list:
|
||||
|
||||
| LMX method | Session mapping | Notes |
|
||||
|---|---|---|
|
||||
| `Register(clientName) → hServer` | `Session::connect_nmx` / `AsbSession::connect_asb` (factory) | Returns owned facade handle, not raw int |
|
||||
| `Unregister(hServer)` | `Session::shutdown` / drop | |
|
||||
| `AddItem(hServer, itemDef) → hItem` | metadata resolution | Caches `(hItem → ItemRef)` mapping |
|
||||
| `AddItem2(hServer, itemDef, context) → hItem` | metadata resolution | Same with context |
|
||||
| `RemoveItem(hServer, hItem)` | handle cleanup | |
|
||||
| `Advise(hServer, hItem)` | `Session::subscribe` | Routes through internal stream |
|
||||
| `UnAdvise(hServer, hItem)` | `Session::unsubscribe` | |
|
||||
| `AdviseSupervisory(hServer, hItem)` | `Session::subscribe` (supervisory mode flag) | wave 2 |
|
||||
| `Write(hServer, hItem, value, userId)` | `Session::write` | |
|
||||
| `Write2(hServer, hItem, value, time, userId)` | `Session::write_with_timestamp` | |
|
||||
| `WriteSecured(hServer, hItem, currUser, verifUser, value)` | `Session::write_secured` | always two ids per R6 |
|
||||
| `WriteSecured2(hServer, hItem, currUser, verifUser, value, time)` | `Session::write_secured_at` | |
|
||||
| `AuthenticateUser(hServer, user, pwd) → uid` | identity mapping | |
|
||||
| `ArchestrAUserToId(hServer, userGuid) → uid` | identity mapping | |
|
||||
| `Suspend(hServer, hItem) → MxStatus` | `Session::suspend` | experimental per R5 |
|
||||
| `Activate(hServer, hItem) → MxStatus` | `Session::activate` | experimental per R5 |
|
||||
| `AddBufferedItem(hServer, itemDef, context) → hItem` | `Session::register_buffered_item` | wave 2; depends on F36 |
|
||||
| `SetBufferedUpdateInterval(hServer, intervalMs)` | per-session cached cadence | rounded to nearest 100ms |
|
||||
|
||||
**Definition of done:**
|
||||
1. `crates/mxaccess-compat/src/lib.rs` exposes a top-level `LmxClient` (or `LmxFacade`) struct + 18 async methods covering the table above. Internal `(hItem → ItemRef)` map is `Mutex<HashMap<i32, ItemRef>>`.
|
||||
2. Event surface: per Q4, expose `OnDataChange`/`OnWriteComplete`/`OnBufferedDataChange`/`OperationComplete` as `Stream` impls (NOT COM events — `mxaccess-compat-com` is post-V1).
|
||||
3. Unit tests covering the handle-table lifecycle (Add → Advise → UnAdvise → Remove and the 18-method dispatch). No live tests required at this stage; wave 2 adds them.
|
||||
4. `cargo build -p mxaccess-compat` + `cargo test -p mxaccess-compat` + clippy clean.
|
||||
|
||||
**Resolves when:** the .NET reference's `MxNativeCompatibilityServer.cs` has a Rust counterpart at the API-shape level (not byte-for-byte at the COM level — that's `mxaccess-compat-com`).
|
||||
|
||||
### F36 — `Session::subscribe_buffered` (NMX) per R2 single-sample-per-event answer
|
||||
**Severity:** P1 — blocks M6 DoD bullet 2 (`subscribe_buffered` (NMX feature) — guarded by `BufferedOptions`; no synthesis if provider returns single-sample batches).
|
||||
**Source:** `design/60-roadmap.md:97` + `design/70-risks-and-open-questions.md` R2 (single-sample-per-event verified against `wwtools/mxaccesscli/docs/api-notes.md:97-100,138-140,154-157`).
|
||||
|
||||
**Scope.** Add a `subscribe_buffered(reference, BufferedOptions { update_interval_ms })` method to `mxaccess::Session` that returns `Stream<Item = Result<DataChange, Error>>` — same item shape as plain `subscribe`, just with a per-session-cached cadence knob. Internal wiring:
|
||||
1. Translate `update_interval_ms` to LMX's `SetBufferedUpdateInterval` semantics (rounded to nearest 100ms per `MxNativeCompatibilityServer.SetBufferedUpdateInterval:638`).
|
||||
2. Use the existing `Session::subscribe` machinery; the buffered cadence is a server-side delivery rate knob, not a payload-shape change.
|
||||
3. Surface as a separate Session method (not just an option on `subscribe`) so the API discoverably documents the cadence semantics.
|
||||
|
||||
**Definition of done:**
|
||||
1. `Session::subscribe_buffered` returns `Stream<Item = Result<DataChange, Error>>` and internally drives the LMX `SetBufferedUpdateInterval` + `AddBufferedItem` call sequence per the captures `079-frida-add-buffered-advise-testint` and `082-frida-add-buffered-plain-advise-testint`.
|
||||
2. New example `examples/subscribe-buffered.rs` exercises a 1-second cadence against the live AVEVA install. Per R2, no multi-sample synthesis.
|
||||
3. Integration test asserts `Stream::Item == DataChange` (no `DataChangeBatch`); compile-time check.
|
||||
4. Doc on `subscribe_buffered` cites R2's verification source and explicitly says "single-sample, cadence knob — not multi-sample payload."
|
||||
|
||||
**Resolves when:** `cargo run -p mxaccess --example subscribe-buffered` runs against AVEVA and the live captures `079`/`082` byte-round-trip via the new code path.
|
||||
|
||||
|
||||
### F40 — Optional `metrics` feature: counters + histograms
|
||||
**Severity:** P2 — M6 DoD bullet 4 (optional `metrics` feature emitting counters / histograms).
|
||||
**Source:** `design/60-roadmap.md:98`.
|
||||
|
||||
**Scope.** Behind a Cargo feature flag (`metrics`), emit counters and histograms via the `metrics` crate (or `tracing` with a metrics layer). Suggested instrumentation points:
|
||||
- Per-op counters: writes, reads, advises, unadvises, recovery attempts, recovery successes.
|
||||
- Latency histograms: end-to-end write→OperationComplete, read→reply, subscribe→first-DataChange.
|
||||
- Connection state gauges: connected sessions, registered items, active subscriptions.
|
||||
|
||||
**Definition of done:**
|
||||
1. `cargo build -p mxaccess --features metrics` compiles + links the metrics crate.
|
||||
2. Default build (no `metrics` feature) has zero `metrics` dep + zero runtime cost.
|
||||
3. Doc page lists the emitted metric names + their semantic meaning.
|
||||
|
||||
**Resolves when:** the feature compiles, default build is clean, and at least one metric counter increments under a live exercise.
|
||||
|
||||
### F41 — `cargo public-api` baseline
|
||||
**Severity:** P1 — M6 DoD bullet 5 (Docs: `cargo doc` published; `cargo public-api` baseline established).
|
||||
**Source:** `design/60-roadmap.md:99`.
|
||||
**Depends on:** F35, F36, F37, F39, F40 (the public surface must be stable before snapshotting).
|
||||
|
||||
**Scope.** Run `cargo public-api` on each crate; commit the resulting baseline to `design/public-api/{crate}.txt`. Add a CI step that diffs against the baseline and fails if the public surface changes without a corresponding baseline update.
|
||||
|
||||
**Definition of done:**
|
||||
1. `cargo public-api -p mxaccess` runs clean + baseline committed.
|
||||
2. Same for `mxaccess-codec`, `mxaccess-compat`, `mxaccess-asb`, `mxaccess-asb-nettcp`, `mxaccess-galaxy`, `mxaccess-rpc`, `mxaccess-callback`, `mxaccess-nmx`.
|
||||
3. CI diff step in `.github/workflows/rust.yml` (or equivalent).
|
||||
|
||||
**Resolves when:** baseline files exist and CI catches drift.
|
||||
|
||||
### F43 — Release prep: `cargo publish --dry-run` all crates
|
||||
**Severity:** P1 — M6 DoD bullet 6.
|
||||
**Source:** `design/60-roadmap.md:100`.
|
||||
**Depends on:** F41 (public-api baseline).
|
||||
|
||||
**Scope.** Run `cargo publish --dry-run -p {crate}` for every workspace crate. Resolve any missing `description`, `keywords`, `categories`, `readme` metadata fields. Decide a version-bump strategy (likely 0.1.0 across the board for V1 release).
|
||||
|
||||
**Definition of done:**
|
||||
1. `cargo publish --dry-run` passes for every crate.
|
||||
2. Workspace `Cargo.toml` + per-crate metadata complete.
|
||||
3. Release notes draft in `CHANGELOG.md` for V1.
|
||||
|
||||
**Resolves when:** dry-runs are green and the release notes are written.
|
||||
|
||||
### F44 — Decode buffered batch + suspend captures (`077, 079-082, 094`)
|
||||
**Severity:** P2 — evidence work for R2 (buffered single-sample) and R5 (Activate/Suspend), feeding F36/F35.
|
||||
**Source:** `design/60-roadmap.md:40` (deferred to M6 + R2) + `design/70-risks-and-open-questions.md` R2/R5 + the captures.
|
||||
|
||||
**Scope.** Walk each capture's `frida-to-tcp-map.tsv`, decode the LMX call sequence + matching TCP wire bytes, and either:
|
||||
- (a) confirm R2's "single-sample-per-event" verdict (default) and document the buffered captures as evidence, OR
|
||||
- (b) if a multi-sample body is observed, reopen R2 and surface a typed `DataChangeBatch` decode path.
|
||||
|
||||
For `077` (Suspend on advised ScanState): document the trigger conditions for R5 — what input made `Suspend` succeed?
|
||||
|
||||
**Definition of done:**
|
||||
1. Each capture has a one-paragraph summary in `docs/M6-buffered-evidence.md` documenting the call sequence, wire bytes, and verdict (R2 single-sample / R5 trigger).
|
||||
2. Round-trip fixture loaded under `crates/mxaccess-codec/tests/fixtures/m6-buffered/` for any new typed decode paths added.
|
||||
3. R2 / R5 status updated in `design/70-risks-and-open-questions.md` (either "settled per option (b)" or "settled silently as not a real risk").
|
||||
|
||||
**Resolves when:** the evidence summary is committed and R2/R5 statuses are updated accordingly.
|
||||
|
||||
### F3 — Cross-domain NTLM Type1/2/3 fixture
|
||||
**Severity:** P2
|
||||
**Status:** Permanently out-of-scope on the current dev host (no second AD domain). Resolution requires external infrastructure not available here.
|
||||
@@ -17,6 +131,18 @@ move to `## Resolved` with a date + commit hash.
|
||||
|
||||
## Resolved
|
||||
|
||||
### F37 — ASB `subscribe_buffered` capability gate
|
||||
**Resolved:** 2026-05-06 (commit `34045c2`). `AsbSession::subscribe_buffered` returns `Error::Unsupported { transport: TransportKind::Asb, operation: ... }` synchronously without touching the wire — ASB has no `SetBufferedUpdateInterval` analogue; the per-monitored-item `MinimalMonitoredItem::sample_interval` is the rate-limit knob instead. The error-construction logic is split into a free fn so the gate's exact shape is unit-testable without spinning up a live authenticator + transport. Workspace 758 → 759 tests; clippy clean.
|
||||
|
||||
### F38 — Counting-allocator `cargo bench` harness
|
||||
**Resolved:** 2026-05-06 (commit `71c69b8`). Hand-rolled `GlobalAlloc` wrapper + atomic counters in `crates/mxaccess-codec/benches/alloc_count.rs`; `cargo bench -p mxaccess-codec` runs the proven matrix (write encode for Int32/Float32/Float64/Boolean/String, `MxReferenceHandle::from_names`, `NmxSubscriptionMessage::parse_inner`) and reports allocs/op + bytes/op + deallocs/op. Baseline numbers committed to `design/M6-bench-baseline.md`. Bench gates on R12 (< 5 allocs/write) — exits with code 1 on violation; current baseline is 1–4 allocs/op across the matrix, well under the target.
|
||||
|
||||
### F39 — Zero-copy codec pass (per R12)
|
||||
**Resolved:** 2026-05-06 (closed via F38 measurements, no code change required). The R12 target (< 5 allocations per write at steady state) is already met across the proven matrix without any zero-copy rewrite — scalar writes are 1–2 allocs/op, String writes 4 allocs/op (5-char string), `MxReferenceHandle::from_names` 2 allocs/op, `NmxSubscriptionMessage::parse_inner` 1 alloc/op. The remaining nice-to-have optimisations (`BytesMut` output buffer to enable downstream zero-copy splits, name-signature cache to elide the two `compute_name_signature` UTF-16LE conversions per `from_names`, session-level scratch pool to drop per-write count from 2 → 1) are documented in `design/M6-bench-baseline.md` as post-V1 work — they don't gate M6 DoD because R12 is already satisfied.
|
||||
|
||||
### F42 — `cargo doc` cleanup pass
|
||||
**Resolved:** 2026-05-06 (commit `e79e289`). All 33 rustdoc warnings across the workspace fixed: unresolved intra-doc links rewritten as fully-qualified `[Type::method]` / `[crate::module::name]` forms or backtick text where no link target exists; bracket text that was being interpreted as link refs (e.g. `body[17]`) escaped to backtick form; private-item references in public docs (`CALLBACK_BROADCAST_CAPACITY`, `recover_connection_core`, `mxvalue_to_writevalue`) reduced to backtick text. `RUSTDOCFLAGS="-D warnings" cargo doc --workspace --no-deps` exits clean. Workspace 759 tests pass; clippy clean. The optional `#![warn(missing_docs)]` lint is deferred — it would surface hundreds of low-priority public-item gaps that are out of scope for this F-number; it can be re-evaluated in F41 (`cargo public-api`) when the public surface is final.
|
||||
|
||||
### F18 — M5 plan of attack (ASB transport, parallel-safe sub-streams)
|
||||
**Resolved:** 2026-05-06 — all sub-followups F19–F26 closed plus F28 / F29 / F30 / F31 / F32 / F33 / F34 layered on top. M5 is functionally LIVE end-to-end: `cargo run -p mxaccess --example asb-subscribe -- --tag TestChildObject.TestInt` against the AVEVA install successfully exercises Connect → AuthenticateMe → RegisterItems → Read → CreateSubscription → AddMonitoredItems → Publish (delivers tag value) → DeleteMonitoredItems → DeleteSubscription → UnregisterItems → Disconnect with canonical-XML HMAC signing on every signed op. **Severity:** P0 — milestone driver, blocks ASB consumers + V1 release.
|
||||
**Source:** `design/dependencies.md:73-89` + `design/60-roadmap.md:84-91` + `design/70-risks-and-open-questions.md:5-25`.
|
||||
|
||||
@@ -220,7 +220,7 @@ impl NmfRecord {
|
||||
}
|
||||
|
||||
/// Encode to a fresh buffer. Convenience wrapper around
|
||||
/// [`encode_into`].
|
||||
/// [`Self::encode_into`].
|
||||
pub fn encode(&self) -> Result<Vec<u8>, NmfError> {
|
||||
let mut out = Vec::new();
|
||||
self.encode_into(&mut out)?;
|
||||
|
||||
@@ -18,10 +18,9 @@
|
||||
//! - `INmxSvcCallback` (IID `B49F92F7-C748-4169-8ECA-A0670B012746`,
|
||||
//! `NmxProcedureMetadata.cs:6`) — opnums 3 (`DataReceived`),
|
||||
//! 4 (`StatusReceived`). The handler decodes the inbound buffer via
|
||||
//! [`crate::nmx_callback_messages::parse_callback_request`] (re-export
|
||||
//! from `mxaccess-rpc::nmx_callback_messages`), emits a typed event,
|
||||
//! and returns the success response built by
|
||||
//! [`crate::nmx_callback_messages::encode_callback_response`].
|
||||
//! [`mxaccess_rpc::nmx_callback_messages::parse_callback_request`],
|
||||
//! emits a typed event, and returns the success response built by
|
||||
//! [`mxaccess_rpc::nmx_callback_messages::encode_callback_response`].
|
||||
//!
|
||||
//! Auth3 PDUs are accepted but ignored (`ManagedCallbackExporter.cs:133-137`)
|
||||
//! — NTLM packet integrity for inbound frames is not yet wired (open
|
||||
|
||||
@@ -15,5 +15,9 @@ thiserror = { workspace = true }
|
||||
default = []
|
||||
serde = []
|
||||
|
||||
[[bench]]
|
||||
name = "alloc_count"
|
||||
harness = false
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
@@ -0,0 +1,298 @@
|
||||
//! F38 — counting-allocator bench for `mxaccess-codec`.
|
||||
//!
|
||||
//! Measures allocation count + bytes-allocated for the proven
|
||||
//! encode/decode matrix per `design/70-risks-and-open-questions.md`
|
||||
//! R12 (< 5 allocs per write at steady state). The harness wraps the
|
||||
//! global allocator with a [`CountingAllocator`] that tracks
|
||||
//! per-call counts; each scenario records pre-state, runs N
|
||||
//! iterations, and reports `(alloc_count, bytes_allocated) / N`.
|
||||
//!
|
||||
//! Output is the source of truth for `design/M6-bench-baseline.md`.
|
||||
//!
|
||||
//! ## Why hand-rolled (not `dhat` / `criterion`)
|
||||
//!
|
||||
//! - `dhat` is heap-profiling oriented (snapshots, call-stack
|
||||
//! attribution); for "did this op allocate < 5 times?" the simpler
|
||||
//! approach is a thin `GlobalAlloc` wrapper that increments two
|
||||
//! atomics. No call-stack capture, no JSON output to post-process.
|
||||
//! - `criterion` measures wall-clock latency; per `60-roadmap.md:104`,
|
||||
//! latency is reported but not gating in V1. Allocation count IS
|
||||
//! the gating metric for M6 DoD bullet 3.
|
||||
//!
|
||||
//! ## Run
|
||||
//!
|
||||
//! ```text
|
||||
//! cargo bench -p mxaccess-codec
|
||||
//! ```
|
||||
//!
|
||||
//! Each scenario runs in release mode by default (cargo bench
|
||||
//! profile = `bench` which inherits release).
|
||||
|
||||
#![allow(
|
||||
clippy::unwrap_used,
|
||||
clippy::expect_used,
|
||||
clippy::cast_possible_truncation,
|
||||
clippy::cast_sign_loss
|
||||
)]
|
||||
|
||||
use std::alloc::{GlobalAlloc, Layout, System};
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
|
||||
use mxaccess_codec::{
|
||||
MxReferenceHandle, NmxSubscriptionMessage, write_message,
|
||||
write_message::WriteValue,
|
||||
};
|
||||
|
||||
// ---- counting allocator -------------------------------------------------
|
||||
|
||||
struct CountingAllocator;
|
||||
|
||||
static ALLOC_COUNT: AtomicU64 = AtomicU64::new(0);
|
||||
static ALLOC_BYTES: AtomicU64 = AtomicU64::new(0);
|
||||
static DEALLOC_COUNT: AtomicU64 = AtomicU64::new(0);
|
||||
|
||||
unsafe impl GlobalAlloc for CountingAllocator {
|
||||
unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
|
||||
ALLOC_COUNT.fetch_add(1, Ordering::Relaxed);
|
||||
ALLOC_BYTES.fetch_add(layout.size() as u64, Ordering::Relaxed);
|
||||
// SAFETY: forwarding to the system allocator with the same layout.
|
||||
unsafe { System.alloc(layout) }
|
||||
}
|
||||
|
||||
unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
|
||||
DEALLOC_COUNT.fetch_add(1, Ordering::Relaxed);
|
||||
// SAFETY: forwarding to the system allocator with the same ptr+layout.
|
||||
unsafe { System.dealloc(ptr, layout) }
|
||||
}
|
||||
}
|
||||
|
||||
#[global_allocator]
|
||||
static GLOBAL: CountingAllocator = CountingAllocator;
|
||||
|
||||
// ---- scenario harness ---------------------------------------------------
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
struct Snapshot {
|
||||
allocs: u64,
|
||||
bytes: u64,
|
||||
deallocs: u64,
|
||||
}
|
||||
|
||||
fn snapshot() -> Snapshot {
|
||||
Snapshot {
|
||||
allocs: ALLOC_COUNT.load(Ordering::Relaxed),
|
||||
bytes: ALLOC_BYTES.load(Ordering::Relaxed),
|
||||
deallocs: DEALLOC_COUNT.load(Ordering::Relaxed),
|
||||
}
|
||||
}
|
||||
|
||||
fn diff(start: Snapshot, end: Snapshot, iterations: u64) -> (f64, f64, f64) {
|
||||
(
|
||||
(end.allocs - start.allocs) as f64 / iterations as f64,
|
||||
(end.bytes - start.bytes) as f64 / iterations as f64,
|
||||
(end.deallocs - start.deallocs) as f64 / iterations as f64,
|
||||
)
|
||||
}
|
||||
|
||||
/// Run `op` `iterations` times and return per-op alloc/bytes/dealloc
|
||||
/// counts. The hint is passed through `std::hint::black_box` to keep
|
||||
/// the compiler from optimising the work away.
|
||||
fn measure<F>(name: &str, iterations: u64, mut op: F) -> Row
|
||||
where
|
||||
F: FnMut(),
|
||||
{
|
||||
// Warm-up: 1k iterations to settle any one-time setup state.
|
||||
for _ in 0..1024 {
|
||||
op();
|
||||
}
|
||||
let start = snapshot();
|
||||
for _ in 0..iterations {
|
||||
op();
|
||||
}
|
||||
let end = snapshot();
|
||||
let (allocs, bytes, deallocs) = diff(start, end, iterations);
|
||||
Row {
|
||||
name: name.to_string(),
|
||||
iterations,
|
||||
allocs_per_op: allocs,
|
||||
bytes_per_op: bytes,
|
||||
deallocs_per_op: deallocs,
|
||||
}
|
||||
}
|
||||
|
||||
struct Row {
|
||||
name: String,
|
||||
iterations: u64,
|
||||
allocs_per_op: f64,
|
||||
bytes_per_op: f64,
|
||||
deallocs_per_op: f64,
|
||||
}
|
||||
|
||||
fn print_table(rows: &[Row]) {
|
||||
println!();
|
||||
println!(
|
||||
"| {:40} | {:>10} | {:>10} | {:>10} | {:>10} |",
|
||||
"scenario", "iters", "allocs/op", "bytes/op", "deallocs/op"
|
||||
);
|
||||
println!(
|
||||
"| {:40} | {:>10} | {:>10} | {:>10} | {:>10} |",
|
||||
"-".repeat(40),
|
||||
"-".repeat(10),
|
||||
"-".repeat(10),
|
||||
"-".repeat(10),
|
||||
"-".repeat(10)
|
||||
);
|
||||
for row in rows {
|
||||
println!(
|
||||
"| {:40} | {:>10} | {:>10.2} | {:>10.0} | {:>10.2} |",
|
||||
row.name, row.iterations, row.allocs_per_op, row.bytes_per_op, row.deallocs_per_op
|
||||
);
|
||||
}
|
||||
println!();
|
||||
}
|
||||
|
||||
// ---- scenarios ----------------------------------------------------------
|
||||
|
||||
fn make_handle() -> MxReferenceHandle {
|
||||
MxReferenceHandle::from_names(0, 1, 2, 3, "TestObject", 0, 1, 0, "TestAttr", false)
|
||||
.expect("handle")
|
||||
}
|
||||
|
||||
fn bench_write_int32() -> Row {
|
||||
let handle = make_handle();
|
||||
let value = WriteValue::Int32(42);
|
||||
measure("write_message::encode (Int32)", 10_000, || {
|
||||
let bytes = write_message::encode(&handle, &value, 0, 0).unwrap();
|
||||
std::hint::black_box(bytes);
|
||||
})
|
||||
}
|
||||
|
||||
fn bench_write_float() -> Row {
|
||||
let handle = make_handle();
|
||||
let value = WriteValue::Float32(1.5);
|
||||
measure("write_message::encode (Float32)", 10_000, || {
|
||||
let bytes = write_message::encode(&handle, &value, 0, 0).unwrap();
|
||||
std::hint::black_box(bytes);
|
||||
})
|
||||
}
|
||||
|
||||
fn bench_write_double() -> Row {
|
||||
let handle = make_handle();
|
||||
let value = WriteValue::Float64(3.25);
|
||||
measure("write_message::encode (Float64)", 10_000, || {
|
||||
let bytes = write_message::encode(&handle, &value, 0, 0).unwrap();
|
||||
std::hint::black_box(bytes);
|
||||
})
|
||||
}
|
||||
|
||||
fn bench_write_bool() -> Row {
|
||||
let handle = make_handle();
|
||||
let value = WriteValue::Boolean(true);
|
||||
measure("write_message::encode (Boolean)", 10_000, || {
|
||||
let bytes = write_message::encode(&handle, &value, 0, 0).unwrap();
|
||||
std::hint::black_box(bytes);
|
||||
})
|
||||
}
|
||||
|
||||
fn bench_write_string() -> Row {
|
||||
let handle = make_handle();
|
||||
let value = WriteValue::String("hello".to_string());
|
||||
measure("write_message::encode (String, 5 chars)", 10_000, || {
|
||||
let bytes = write_message::encode(&handle, &value, 0, 0).unwrap();
|
||||
std::hint::black_box(bytes);
|
||||
})
|
||||
}
|
||||
|
||||
fn bench_subscription_decode() -> Row {
|
||||
// Build a single-record DataUpdate body once; decode N times.
|
||||
let body = build_data_update_int32_body(42);
|
||||
measure(
|
||||
"NmxSubscriptionMessage::parse_inner (DataUpdate, Int32)",
|
||||
10_000,
|
||||
|| {
|
||||
let msg = NmxSubscriptionMessage::parse_inner(&body).unwrap();
|
||||
std::hint::black_box(msg);
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
fn bench_handle_from_names() -> Row {
|
||||
measure("MxReferenceHandle::from_names", 10_000, || {
|
||||
let h = MxReferenceHandle::from_names(
|
||||
0,
|
||||
1,
|
||||
2,
|
||||
3,
|
||||
"TestChildObject",
|
||||
0,
|
||||
1,
|
||||
0,
|
||||
"TestInt",
|
||||
false,
|
||||
)
|
||||
.unwrap();
|
||||
std::hint::black_box(h);
|
||||
})
|
||||
}
|
||||
|
||||
// ---- helpers (mirror the test fixtures in subscription_message.rs) -----
|
||||
|
||||
fn build_data_update_int32_body(value: i32) -> Vec<u8> {
|
||||
// Operation id + correlation id are arbitrary 16-byte sequences for
|
||||
// a synthetic body; the codec doesn't reject any GUID shape.
|
||||
const DATA_UPDATE_COMMAND: u8 = 0x33;
|
||||
let operation_id = [0x11u8; 16];
|
||||
// 15-byte record prefix: status(4) + quality(2) + filetime(8) + wire_kind(1).
|
||||
// wire_kind = 0x02 = Int32. Then the 4-byte i32 LE payload.
|
||||
let mut record = Vec::with_capacity(15 + 4);
|
||||
record.extend_from_slice(&0i32.to_le_bytes()); // status
|
||||
record.extend_from_slice(&0x00C0u16.to_le_bytes()); // quality
|
||||
record.extend_from_slice(&0i64.to_le_bytes()); // filetime
|
||||
record.push(0x02); // wire_kind = Int32
|
||||
record.extend_from_slice(&value.to_le_bytes());
|
||||
|
||||
let mut out = Vec::with_capacity(23 + record.len());
|
||||
out.push(DATA_UPDATE_COMMAND);
|
||||
out.extend_from_slice(&1u16.to_le_bytes()); // version
|
||||
out.extend_from_slice(&1i32.to_le_bytes()); // record_count = 1
|
||||
out.extend_from_slice(&operation_id);
|
||||
out.extend_from_slice(&record);
|
||||
out
|
||||
}
|
||||
|
||||
// ---- main --------------------------------------------------------------
|
||||
|
||||
fn main() {
|
||||
println!("F38 — mxaccess-codec allocation-count baseline");
|
||||
println!("Counting allocator: thin GlobalAlloc wrapper around System.");
|
||||
println!("R12 target: < 5 allocations per write at steady state.");
|
||||
|
||||
let rows = vec![
|
||||
bench_write_int32(),
|
||||
bench_write_float(),
|
||||
bench_write_double(),
|
||||
bench_write_bool(),
|
||||
bench_write_string(),
|
||||
bench_handle_from_names(),
|
||||
bench_subscription_decode(),
|
||||
];
|
||||
|
||||
print_table(&rows);
|
||||
|
||||
// R12 gate: emit a non-zero exit code if any encode-write scenario
|
||||
// exceeds the 5-allocs threshold. Decoders are reported but not
|
||||
// gated (the sweep below explicitly excludes them).
|
||||
let mut violations = 0;
|
||||
for row in &rows {
|
||||
if row.name.starts_with("write_message::encode") && row.allocs_per_op >= 5.0 {
|
||||
eprintln!(
|
||||
"R12 violation: {} allocates {:.2}/op (>= 5)",
|
||||
row.name, row.allocs_per_op
|
||||
);
|
||||
violations += 1;
|
||||
}
|
||||
}
|
||||
if violations > 0 {
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
@@ -154,7 +154,7 @@ impl AsbVariant {
|
||||
}
|
||||
}
|
||||
|
||||
/// Standalone encode: convenience wrapper around [`encode_into`].
|
||||
/// Standalone encode: convenience wrapper around [`Self::encode_into`].
|
||||
pub fn encode(&self) -> Vec<u8> {
|
||||
let mut out = Vec::with_capacity(self.wire_len());
|
||||
self.encode_into(&mut out);
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
//!
|
||||
//! Direct port of `src/MxNativeCodec/NmxItemControlMessage.cs`. The body
|
||||
//! carries an advise-supervisory or unadvise command together with a 16-byte
|
||||
//! item correlation GUID and a 14-byte projection of an [`MxReferenceHandle`]
|
||||
//! item correlation GUID and a 14-byte projection of an [`crate::reference_handle::MxReferenceHandle`]
|
||||
//! (handle bytes 6..20 — `object_id` through `attribute_index`).
|
||||
//!
|
||||
//! ## Wire layout
|
||||
@@ -87,7 +87,7 @@ const PAYLOAD_LENGTH: usize = 18; // cs:28 — 7×u16 + u32 tail = 18 bytes
|
||||
pub const DEFAULT_TAIL: u32 = 3;
|
||||
|
||||
/// Decoded NMX item-control body. The fields after `item_correlation_id`
|
||||
/// project bytes 6..20 of an [`MxReferenceHandle`] — see
|
||||
/// project bytes 6..20 of an [`crate::reference_handle::MxReferenceHandle`] — see
|
||||
/// `NmxItemControlMessage.cs:71-81, 134-141`.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||
pub struct NmxItemControlMessage {
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
//! `NmxMetadataQueryMessage` — observed pre-advise metadata-query body.
|
||||
//!
|
||||
//! Direct port of `src/MxNativeCodec/NmxMetadataQueryMessage.cs`. The .NET
|
||||
//! reference exposes a single static helper, [`encode_observed_pre_advise`],
|
||||
//! reference exposes a single static helper, `encode_observed_pre_advise`,
|
||||
//! which returns a fixed observed body with a 16-byte item-correlation GUID
|
||||
//! patched in at offset `0x8a`.
|
||||
//!
|
||||
|
||||
@@ -44,7 +44,7 @@
|
||||
//! `Encode` (`.cs:51-64`) writes:
|
||||
//!
|
||||
//! 1. The captured prefix (`_prefix`, raw bytes) — preserved verbatim.
|
||||
//! 2. The freshly-encoded value bytes from [`encode_value`].
|
||||
//! 2. The freshly-encoded value bytes from `encode_value_bytes`.
|
||||
//! 3. The captured suffix (`_suffixBeforeWriteIndex`) — preserved verbatim.
|
||||
//! 4. The fresh `writeIndex` as i32 LE in the trailing 4 bytes.
|
||||
//!
|
||||
@@ -167,12 +167,12 @@ impl ObservedWriteBodyTemplate {
|
||||
}
|
||||
}
|
||||
|
||||
/// Captured opcode at body[0]. Mirrors `_prefix[0]`.
|
||||
/// Captured opcode at `body[0]`. Mirrors `_prefix[0]`.
|
||||
pub fn command(&self) -> u8 {
|
||||
self.command
|
||||
}
|
||||
|
||||
/// Captured wire-kind byte at body[17]. Drawn from the captured prefix,
|
||||
/// Captured wire-kind byte at `body[17]`. Drawn from the captured prefix,
|
||||
/// not from the runtime [`MxValueKind`] (which can disambiguate
|
||||
/// String vs DateTime past the encoder collapse).
|
||||
pub fn wire_kind(&self) -> u8 {
|
||||
|
||||
@@ -34,8 +34,9 @@ const CRC16_IBM_POLYNOMIAL: u16 = 0xa001;
|
||||
///
|
||||
/// `object_signature` and `attribute_signature` are derived values. The Rust
|
||||
/// port keeps them private — the only constructor that produces a handle from
|
||||
/// names is [`from_names`]; the only mutators that update one signature are
|
||||
/// [`with_object_tag_name`] and [`with_attribute_name`], which both
|
||||
/// names is [`MxReferenceHandle::from_names`]; the only mutators that update
|
||||
/// one signature are [`MxReferenceHandle::with_object_tag_name`] and
|
||||
/// [`MxReferenceHandle::with_attribute_name`], which both
|
||||
/// recompute. This is a deliberate tightening over the .NET reference (which
|
||||
/// is a record with public init-only signature fields).
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
|
||||
@@ -157,7 +158,7 @@ impl MxReferenceHandle {
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if `destination.len() < 20`. Use a 20-byte slice or call
|
||||
/// [`encode`] for a fresh buffer.
|
||||
/// [`Self::encode`] for a fresh buffer.
|
||||
pub fn write_to(self, destination: &mut [u8]) {
|
||||
assert!(
|
||||
destination.len() >= Self::ENCODED_LEN,
|
||||
|
||||
@@ -72,8 +72,8 @@
|
||||
//! 28.. payload
|
||||
//! ```
|
||||
//!
|
||||
//! The encoder writes `count` (u16) at body[22] and `element_width` (u16) at
|
||||
//! body[24]. The decoder/subscription side reads `element_width` as `i32` at
|
||||
//! The encoder writes `count` (u16) at `body[22]` and `element_width` (u16) at
|
||||
//! `body[24]`. The decoder/subscription side reads `element_width` as `i32` at
|
||||
//! a different offset — that asymmetry is documented in the subscription
|
||||
//! message module, not here. Encoder element widths are 2/4/4/8 for
|
||||
//! Boolean/Int32/Float32/Float64 arrays; for variable arrays (String,
|
||||
|
||||
@@ -88,7 +88,7 @@ pub enum NmxClientError {
|
||||
|
||||
/// The metadata's `(mx_data_type, is_array)` pair has no LMX wire
|
||||
/// encoding (e.g. arrays of `ElapsedTime`, scalars of
|
||||
/// `ReferenceType`). Returned by [`NmxClient::resolve_write_kind`]
|
||||
/// `ReferenceType`). Returned by [`GalaxyTagMetadata::resolve_write_kind`]
|
||||
/// helpers when the caller asks for a kind that
|
||||
/// [`mxaccess_codec::MxValueKind::for_data_type`] rejects.
|
||||
/// Mirrors the `ArgumentOutOfRangeException` paths in the .NET
|
||||
@@ -530,7 +530,7 @@ impl NmxClient {
|
||||
}
|
||||
|
||||
/// `INmxService2::RemoveSubscriberEngine` (opnum 8). Mirrors
|
||||
/// `cs:137-147`. Same wire shape as [`add_subscriber_engine`].
|
||||
/// `cs:137-147`. Same wire shape as [`Self::add_subscriber_engine`].
|
||||
///
|
||||
/// # Errors
|
||||
/// Transport or codec.
|
||||
|
||||
@@ -539,7 +539,7 @@ impl BindPdu {
|
||||
|
||||
/// Encode the Bind / AlterContext PDU. Returns the wire bytes
|
||||
/// (`DceRpcPdu.cs:264-290`). Sets `frag_length` and `auth_length=0` on
|
||||
/// the encoded header — to attach an auth verifier use [`encode_with_auth`].
|
||||
/// the encoded header — to attach an auth verifier use [`Self::encode_with_auth`].
|
||||
pub fn encode(&self) -> Vec<u8> {
|
||||
let length: usize = BIND_BODY_OFFSET
|
||||
+ self
|
||||
@@ -955,7 +955,7 @@ impl BindAckPdu {
|
||||
///
|
||||
/// The .NET `Encode` (`DceRpcPdu.cs:118-124`) defaults `packet_flags` to
|
||||
/// `0x03` (PFC_FIRST_FRAG | PFC_LAST_FRAG) only when the supplied flags are
|
||||
/// 0; the Rust port keeps the same exact behaviour in [`encode`].
|
||||
/// 0; the Rust port keeps the same exact behaviour in [`RequestPdu::encode`].
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct RequestPdu {
|
||||
pub header: PduHeader,
|
||||
@@ -1081,7 +1081,7 @@ pub struct ResponsePdu {
|
||||
|
||||
impl ResponsePdu {
|
||||
/// Decode a Response PDU (`DceRpcPdu.cs:141-168`). The .NET reference
|
||||
/// has no encoder for Response — the Rust port adds [`encode`] for tests
|
||||
/// has no encoder for Response — the Rust port adds [`Self::encode`] for tests
|
||||
/// and round-trip use.
|
||||
pub fn decode(buf: &[u8]) -> Result<Self, RpcError> {
|
||||
let header = PduHeader::decode(buf)?;
|
||||
|
||||
@@ -21,14 +21,15 @@
|
||||
//! Implements:
|
||||
//! * [`AsbSession::connect`] — TCP connect → preamble → DH handshake
|
||||
//! → ready session.
|
||||
//! * [`AsbSession::register_items`] / [`unregister_items`] /
|
||||
//! [`read`] / [`write`] — per-operation thin async wrappers.
|
||||
//! * [`AsbSession::keep_alive`] / [`disconnect`] / [`shutdown`] —
|
||||
//! lifecycle.
|
||||
//! * [`AsbSession::register_items`] / [`AsbSession::unregister_items`] /
|
||||
//! [`AsbSession::read`] / [`AsbSession::write`] — per-operation thin
|
||||
//! async wrappers.
|
||||
//! * [`AsbSession::keep_alive`] / [`AsbSession::disconnect`] /
|
||||
//! `AsbSession::shutdown` — lifecycle.
|
||||
//! * [`AsbSession::create_subscription`] /
|
||||
//! [`add_monitored_items`] / [`publish`] /
|
||||
//! [`delete_monitored_items`] / [`delete_subscription`] —
|
||||
//! subscription primitives.
|
||||
//! [`AsbSession::add_monitored_items`] / [`AsbSession::publish`] /
|
||||
//! [`AsbSession::delete_monitored_items`] /
|
||||
//! [`AsbSession::delete_subscription`] — subscription primitives.
|
||||
//! * [`AsbSession::subscribe`] — returns an [`AsbSubscription`]
|
||||
//! `Stream<Item = Result<MonitoredItemValue, Error>>` driven by a
|
||||
//! background publish-loop. Drop of the stream aborts the loop.
|
||||
@@ -64,7 +65,7 @@ use tokio::task::JoinHandle;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
|
||||
use crate::transport_asb::AsbTransport;
|
||||
use crate::{ConnectionError, Error};
|
||||
use crate::{BufferedOptions, ConnectionError, Error, TransportKind};
|
||||
|
||||
/// Channel buffer for [`AsbSubscription`]'s publish-loop. 256 samples is
|
||||
/// generous for the typical sample-rate budget (1-100 Hz) — bounded so
|
||||
@@ -345,6 +346,36 @@ impl AsbSession {
|
||||
join: Some(join),
|
||||
}
|
||||
}
|
||||
|
||||
/// `subscribe_buffered` is **not supported** on the ASB transport.
|
||||
/// ASB has no buffered-cadence equivalent of NMX's
|
||||
/// `SetBufferedUpdateInterval`; the per-monitored-item `SampleInterval`
|
||||
/// (see [`mxaccess_asb::MinimalMonitoredItem::sample_interval`])
|
||||
/// already plays the rate-limit role — set it on each
|
||||
/// `MonitoredItem` passed to [`Self::add_monitored_items`] instead.
|
||||
///
|
||||
/// Returns [`Error::Unsupported`] synchronously, without touching
|
||||
/// the wire. F37.
|
||||
pub async fn subscribe_buffered(
|
||||
&self,
|
||||
_reference: &str,
|
||||
_options: BufferedOptions,
|
||||
) -> Result<AsbSubscription, Error> {
|
||||
Err(unsupported_subscribe_buffered_error())
|
||||
}
|
||||
}
|
||||
|
||||
/// F37 — typed `Error::Unsupported` returned by
|
||||
/// [`AsbSession::subscribe_buffered`]. Extracted into a free fn so the
|
||||
/// gate's exact shape is unit-testable without constructing a full
|
||||
/// `AsbSession` (which requires a live authenticator + transport).
|
||||
fn unsupported_subscribe_buffered_error() -> Error {
|
||||
Error::Unsupported {
|
||||
operation: std::borrow::Cow::Borrowed(
|
||||
"AsbSession::subscribe_buffered (use MinimalMonitoredItem::sample_interval; ASB has no SetBufferedUpdateInterval analogue)",
|
||||
),
|
||||
transport: TransportKind::Asb,
|
||||
}
|
||||
}
|
||||
|
||||
/// Inner publish-loop body — testable in isolation by passing any
|
||||
@@ -477,6 +508,36 @@ mod tests {
|
||||
assert_stream_send_unpin::<AsbSubscription>();
|
||||
}
|
||||
|
||||
/// F37 — the `Error::Unsupported` returned by
|
||||
/// `AsbSession::subscribe_buffered` carries `TransportKind::Asb`
|
||||
/// and an operation message mentioning `subscribe_buffered`.
|
||||
/// Targets the helper directly so the test doesn't need to spin
|
||||
/// up a live authenticator / transport.
|
||||
#[test]
|
||||
fn subscribe_buffered_unsupported_error_shape() {
|
||||
let err = unsupported_subscribe_buffered_error();
|
||||
match err {
|
||||
Error::Unsupported {
|
||||
transport,
|
||||
operation,
|
||||
} => {
|
||||
assert!(
|
||||
matches!(transport, TransportKind::Asb),
|
||||
"expected TransportKind::Asb, got {transport:?}"
|
||||
);
|
||||
assert!(
|
||||
operation.contains("subscribe_buffered"),
|
||||
"operation should name the unsupported method, got {operation:?}"
|
||||
);
|
||||
assert!(
|
||||
operation.contains("sample_interval"),
|
||||
"operation should hint at the ASB analogue, got {operation:?}"
|
||||
);
|
||||
}
|
||||
other => panic!("expected Error::Unsupported, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
fn fake_value(idx: i32) -> MonitoredItemValue {
|
||||
MonitoredItemValue {
|
||||
item: ItemIdentity::absolute_by_name(format!("Tag{idx}")),
|
||||
|
||||
@@ -397,8 +397,8 @@ impl Session {
|
||||
}
|
||||
|
||||
/// Write a value to a tag (`MxValue` overload). Delegates to
|
||||
/// [`Self::write_value`] after converting `value` via
|
||||
/// [`mxvalue_to_writevalue`].
|
||||
/// [`Self::write_value`] after converting `value` via the private
|
||||
/// `mxvalue_to_writevalue` helper.
|
||||
///
|
||||
/// # Errors
|
||||
/// As for [`Self::write_value`], plus
|
||||
|
||||
@@ -119,7 +119,7 @@ const CALLBACK_BROADCAST_CAPACITY: usize = 256;
|
||||
/// ## Lag behavior
|
||||
///
|
||||
/// The underlying broadcast channel has a fixed capacity
|
||||
/// ([`CALLBACK_BROADCAST_CAPACITY`]). Slow consumers that fall behind
|
||||
/// (`CALLBACK_BROADCAST_CAPACITY`). Slow consumers that fall behind
|
||||
/// receive `Some(Err(Error::Configuration(InvalidArgument)))` whose
|
||||
/// detail string contains the lag-loss count. The stream stays open
|
||||
/// and resumes from the next available message — same shape as
|
||||
@@ -449,7 +449,7 @@ impl Session {
|
||||
/// `(host, port, service_ipid)` before `RegisterEngine2`. This
|
||||
/// constructor takes the resolved triple by hand — useful for
|
||||
/// tests, deterministic probes, and non-Windows builds. Use
|
||||
/// [`Self::connect_nmx_auto`] (Windows + `windows-com` feature) to
|
||||
/// `Self::connect_nmx_auto` (Windows + `windows-com` feature) to
|
||||
/// drive the auto-resolving path.
|
||||
///
|
||||
/// On success: a `RegisterEngine2` round-trip has completed and the
|
||||
@@ -650,7 +650,7 @@ impl Session {
|
||||
/// `RecoverConnectionCore` (`cs:442-474`).
|
||||
///
|
||||
/// Per attempt: emit [`RecoveryEvent::Started`], invoke
|
||||
/// [`recover_connection_core`](Self::recover_connection_core) (rebuild
|
||||
/// the private `recover_connection_core` helper (rebuild
|
||||
/// the NMX transport via the installed [`RebuildFactory`], re-run
|
||||
/// `RegisterEngine2` with the saved callback OBJREF, replay every
|
||||
/// active subscription's `AdviseSupervisory`, atomically swap the
|
||||
@@ -821,7 +821,7 @@ impl Session {
|
||||
/// This accessor is the test seam + escape hatch consumers can use
|
||||
/// today to observe the raw stream.
|
||||
///
|
||||
/// Receivers can lag by up to [`CALLBACK_BROADCAST_CAPACITY`]
|
||||
/// Receivers can lag by up to `CALLBACK_BROADCAST_CAPACITY`
|
||||
/// messages before the broadcast channel starts dropping; lagged
|
||||
/// receivers see [`tokio::sync::broadcast::error::RecvError::Lagged`].
|
||||
#[must_use]
|
||||
|
||||
Reference in New Issue
Block a user