Files
mxaccess/design/20-async-layer.md
Joseph Doherty fe2a6db786
rust / build / test / clippy / fmt (push) Has been cancelled
Initial project state: .NET reference, design, Rust port (M0+M1), evidence
Layout:
- src/                    .NET 10 x64 reference: MxNativeCodec, MxNativeClient,
                          MxAsbClient, probes, tests, harnesses. Executable spec.
- design/                 Architectural plan for the Rust port (M0–M6), error
                          model, protocol invariants, risks (R1–R16), adversarial
                          review log (review.md).
- rust/                   Rust workspace. M0 skeleton + M1 codec parity.
                          mxaccess-codec: 215 unit tests + 2 cross-implementation
                          parity tests (byte-identical against .NET reference).
                          Other crates are M0 stubs awaiting M2+.
- captures/               Frida + netsh + pcap evidence per CLAUDE.md
                          ("captures are evidence, not throwaway logs").
- analysis/               Decompiled C# (frida/proxy/decompiled-*),
                          Ghidra exports for native DLLs (`exports/` only —
                          working state at `projects/` and AVEVA's input
                          binaries at `input/` are gitignored).
- docs/                   Reverse-engineering reference docs.
- tools/                  Setup-LiveProbeEnv.ps1 (Infisical credential fetcher),
                          Compute-Crc.ps1 (.NET parity helper).
- .github/workflows/      Rust CI: fmt + build + test + clippy on Windows.
- LICENSE                 MIT (Joseph Doherty, 2026).

Verified:
- cargo test --workspace → 217 passed (215 unit + 2 .NET parity), 0 failed
- cargo clippy --workspace -- -D warnings → clean
- cargo fmt --all -- --check → clean
- cargo publish --dry-run -p mxaccess-codec → packages cleanly

Excluded from history (see .gitignore):
- **/bin, **/obj, **/target — build artifacts
- analysis/ghidra/projects/ — Ghidra working state (regenerable)
- analysis/ghidra/input/ — AVEVA proprietary DLLs (vendor IP)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 06:21:00 -04:00

394 lines
23 KiB
Markdown

# Async layer (Tokio)
The async layer is the public face of the library. Most consumers depend on `mxaccess` (the top-level crate) and never see the raw crates. The crate is async-native, Tokio-based, and exposes idiomatic Rust: typed errors, `Send + Sync` handles, `Stream`s for subscriptions, drop-cancellable subscriptions, `tracing` instrumentation.
## Public crate: `mxaccess`
Re-exports the core types from `mxaccess-codec`, hides the raw transports, adds the async session.
### Connection
```rust
use mxaccess::{Session, ConnectionOptions, Credentials, MxValue};
use std::time::SystemTime;
let session = Session::connect(
ConnectionOptions::nmx("localhost")
.galaxy_id(1)
.platform_id(1)
.engine_id(MX_LOCAL_ENGINE)
.credentials(Credentials::current_user())
.galaxy_db("Server=localhost;Database=Galaxy;Integrated Security=True;TrustServerCertificate=True"),
).await?;
```
`ConnectionOptions::nmx(...)` selects `NmxTransport`; `ConnectionOptions::asb(...)` selects `AsbTransport`. Both produce a `Session`.
`Session` is `Clone + Send + Sync`. Internally it wraps `Arc<SessionInner>` so cloned handles share the same underlying connection.
**Orderly shutdown — `Session::shutdown(timeout: Duration) -> impl Future<Output = Result<(), Error>>`.** Sends `UnAdvise` for every live subscription, then `UnregisterEngine`, and awaits the connection task's confirmation that those frames have flushed — or returns `Err(Error::Timeout(_))` if the timeout elapses first. This is the recommended exit path for production code and is the async equivalent of the .NET reference's synchronous `Dispose` (src/MxNativeClient/MxNativeSession.cs:476-514).
Drop of the last `Session` clone is a best-effort fallback: it signals `UnregisterEngine` to the connection task via the same in-process channel that subscription drops use (no `tokio::spawn`, no `block_on`). If the runtime is shut down before the connection task drains, the unregister is lost — see the runtime-shutdown leak note under "Cancellation". Callers that care about deterministic engine deregistration must call `Session::shutdown` rather than relying on drop.
### Operations
```rust
// Fire-and-forget: returns when the LMX `Write` RPC return is acked.
// No `WriteCompleted` callback is awaited.
session.write("TestChildObject.TestInt", MxValue::Int32(123)).await?;
// Awaits the 5-byte `OperationStatus` completion frame. The `client_token`
// correlates the wire callback to this call (see writeIndex/clientToken on
// `MxNativeSession.WriteAsync`, src/MxNativeClient/MxNativeSession.cs:165-185).
session.write_with_completion(
"TestChildObject.TestInt",
MxValue::Int32(123),
/* client_token: */ 0x1001u32,
).await?;
session.write_with_timestamp(
"TestChildObject.TestInt",
MxValue::Int32(123),
SystemTime::now(),
).await?;
// Verified Write — the LMX `WriteSecured` always takes TWO user ids:
// `(currentUserId, verifierUserId, value)`. "Single-user secured write" is
// callers passing the same id twice; it is NOT a separate API surface.
// `WriteSecured2` adds a timestamp; it does NOT add a second token. The
// `0x80004021` failure observed in `MxNativeSession.WriteSecuredAsync` is a
// defect of the .NET native reimplementation, not a real LMX constraint
// (verified against wwtools/mxaccesscli/docs/api-notes.md:60-72,87-95 and
// wwtools/mxaccesscli/src/MxAccess.Cli/Commands/WriteCommand.cs:44-101,151-155,196-199;
// the LMX proxy CLI exposes `WriteSecured(currentUserId, verifierUserId, value)`
// and treats single-user secured writes as `currentUserId == verifierUserId`).
session.write_secured(
"TestChildObject.TestInt",
MxValue::Int32(123),
SecurityContext { current_user_id, verifier_user_id },
).await?;
// Timestamped Verified Write — adds a `SystemTime`. Same two-id token shape;
// matches `WriteSecured2(currentUserId, verifierUserId, value, timestamp)`.
session.write_secured_at(
"TestChildObject.TestInt",
MxValue::Int32(123),
SystemTime::now(),
SecurityContext { current_user_id, verifier_user_id },
).await?;
// `read` is implemented as `subscribe + first-result + drop`, mirroring
// `MxNativeSession.ReadAsync` (src/MxNativeClient/MxNativeSession.cs:312-359),
// which requires a positive timeout and unadvises on completion or timeout.
let DataChange { value, status, timestamp, .. } =
session.read("TestChildObject.TestInt", Duration::from_secs(5)).await?;
```
All operations take a `&str` reference name (e.g. `"TestObject.Attribute"`) and resolve it to a `MxReferenceHandle` internally via the configured `Resolver`. Default resolver is `mxaccess-galaxy::SqlResolver`; an in-memory resolver is provided for tests (`InMemoryResolver::insert("Tag", metadata)`).
`Session::write` returns `Ok(())` once the LMX `Write` RPC has been acknowledged at the transport level — it does **not** await a wire `WriteCompleted` frame. Callers that need write-completion semantics must use `Session::write_with_completion(reference, value, client_token)`, which threads `client_token` through the `MxNativeSession.WriteAsync` `clientToken` parameter (src/MxNativeClient/MxNativeSession.cs:165-185) and returns when the matching 5-byte `OperationStatus` callback frame is decoded. See `70-risks-and-open-questions.md` R3/R4 for the cases where the proven stack does not emit a completion frame.
`Session::read` takes a `Duration` timeout (matching the .NET reference's mandatory `TimeSpan timeout` argument and `ArgumentOutOfRangeException` for non-positive values, src/MxNativeClient/MxNativeSession.cs:312-321). Implementation is `subscribe + first-result + drop`; the drop guard guarantees `UnAdvise` runs on the success, error, and timeout paths so no advise is leaked, mirroring the `finally`/`Unsubscribe` block at src/MxNativeClient/MxNativeSession.cs:351-358.
### Subscriptions
```rust
use futures::StreamExt;
let mut subscription = session.subscribe("TestChildObject.TestInt").await?;
while let Some(change) = subscription.next().await {
let change = change?;
println!("{} = {:?} @ {:?} (status={:?})",
change.reference, change.value, change.timestamp, change.status);
}
// Drop the subscription to unadvise.
drop(subscription);
```
`Subscription` implements `Stream<Item = Result<DataChange, Error>>`.
**Err semantics — non-terminal for parse errors, terminal after connection loss.** The stream yields `Err` items for parse-level failures (the consumer can keep polling — the next inbound frame will be delivered). The stream ends with `None` after a final `Err` for connection-loss / subscription-end events; once `None` is observed, no further items will be yielded. This split mirrors the .NET reference's two events: `CallbackReceived` is raised per-record after a successful parse (src/MxNativeClient/MxNativeSession.cs:603-606), while `UnparsedCallbackReceived` is raised when `NmxSubscriptionMessage.ParseProcessDataReceivedBody` throws — without tearing down other live subscriptions (src/MxNativeClient/MxNativeSession.cs:590-601).
Consumers wanting strict parity with `UnparsedCallbackReceived` (raw bytes for unparseable frames) can subscribe to `Subscription::raw_callbacks() -> Stream<Item = RawCallback>`, or simply inspect the `Err` variants and keep polling: `Error::Protocol(ProtocolError::Decode { .. })` corresponds to the .NET unparsed-callback path and is non-terminal; `Error::Connection(_)` is terminal and the next `next().await` returns `None`.
Dropping the subscription sends `UnAdvise` (best-effort, fire-and-forget — see drop semantics below) and removes the correlation from the session's subscription map.
For batch subscriptions:
```rust
let mut sub = session.subscribe_many(&["A.X", "A.Y", "A.Z"]).await?;
while let Some(change) = sub.next().await {
let change = change?;
// change.reference identifies which of the three
}
```
Multi-tag subscriptions multiplex on the same callback channel and demultiplex by correlation ID inside the session task. The wire still issues one `Advise` per tag — the Rust API does not pretend a single advise covers many tags.
**`subscribe_many` is non-atomic.** The implementation issues one `AdviseSupervisory` per tag in a loop, mirroring `MxNativeSession.SubscribeAsync` which produces a fresh `CorrelationId` and calls `_service.AdviseSupervisory` per tag (src/MxNativeClient/MxNativeSession.cs:250-270). If the Nth advise fails, the first N-1 succeed and remain advised. The error is surfaced through the returned `Result`; the partial set lives on the returned `Subscription`, and the consumer chooses how to recover:
- `Subscription::drop` to unadvise the partial set, or
- retry the failed tag (e.g. via a follow-up `session.subscribe(failed_tag).await`).
`subscribe_many_atomic` (an all-or-nothing variant that rolls back on partial failure) is **not** provided in V1 — the proven .NET reference has no atomic equivalent and the wire offers no transactional advise primitive.
### Buffered subscriptions (NMX only)
`subscribe_buffered` mirrors the .NET reference's `MxNativeSession.RegisterBufferedItemAsync`, which takes the dual-string `itemDefinition`/`itemContext` split plus an `itemHandle: int` (src/MxNativeClient/MxNativeSession.cs:272-310). The Rust API takes the same parameters explicitly via a `BufferedSubscription` request struct — no convenience overload that hides the `(definition, context, item_handle)` triple is offered, because a consumer that omits any of the three cannot reproduce the captured Frida bodies.
```rust
let mut sub = session.subscribe_buffered(BufferedSubscription {
definition: "TestMachine_001.TestHistoryValue",
context: "", // optional, may be empty per RegisterBufferedItemAsync:279
item_handle: 0x1001, // i32 mapped to NmxReferenceRegistrationMessage.ItemHandle
options: BufferedOptions {
sample_interval: Duration::from_millis(100),
max_queue_size: 1000,
},
}).await?;
while let Some(batch) = sub.next().await {
for sample in batch?.samples {
// sample is a DataChange
}
}
```
`subscribe_buffered` is gated on the `nmx` feature and returns `Stream<Item = Result<DataChangeBatch, Error>>`. The deployed AVEVA provider may emit single-sample batches even with buffering enabled — see risk R2 in `70-risks-and-open-questions.md`. The API does not synthesise batches; if the wire returns one sample per record, the batch is `samples.len() == 1`.
### Recovery
**Recovery is caller-driven, not automatic.** This mirrors the .NET reference: `MxNativeSession.RecoverConnection` and `RecoverConnectionAsync(policy)` are explicit entry points the consumer invokes — the session never auto-starts recovery on heartbeat loss (src/MxNativeClient/MxNativeSession.cs:383-440). The Rust API exposes the same shape:
```rust
// Caller invokes recovery when they choose, with the policy of their choice.
session.recover_connection(RecoveryPolicy::exponential(
Duration::from_secs(1),
/* max_attempts: */ 5,
Duration::from_secs(60),
)).await?;
```
Heartbeat-loss surfaces as `Error::Connection(...)` on subsequent operations (write/read/subscribe). The caller decides whether to call `recover_connection` based on observed errors and recovery events. There is no implicit recovery thread that resurrects the session in the background.
**In-flight calls during recovery fail.** While `recover_connection` is running, in-flight writes/reads/subscriptions against the previous transport are not paused, replayed, or migrated — they observe the existing transport being torn down and fail with `Error::Connection(...)`. The .NET reference's `_recoveryActive` is similarly just an inbound-callback annotation flag (src/MxNativeClient/MxNativeSession.cs:444,472); concurrent calls against `_service` are not interlocked. The Rust design **does not promise** "the future resumes on the new connection" — the caller is responsible for retrying after a successful `RecoveryEvent::Completed`.
```rust
let mut events = session.recovery_events();
while let Some(ev) = events.next().await {
tracing::info!(?ev, "recovery event");
}
pub enum RecoveryEvent {
Started { attempt: u32 },
Failed { attempt: u32, error: Error, will_retry: bool },
Completed { duration: Duration },
}
```
The event stream mirrors the .NET reference's `MxNativeSession.RecoveryAttemptStarted/Failed/Completed` events one-for-one (src/MxNativeClient/MxNativeSession.cs:121-123).
### Cancellation
Three cancellation surfaces, in order of preference for callers:
1. **Drop the future or handle.** Dropping a `Subscription` signals `UnAdvise` to the long-lived connection task; dropping a `Session` signals `UnregisterEngine` to the same task. **Drop never spawns a new Tokio task** — instead, `Subscription` holds a `tokio::sync::oneshot::Sender<UnAdviseRequest>` (or equivalent unbounded channel sender), and its `Drop` impl sends a message that the connection task drains in its event loop. Drop is therefore safe outside a runtime context and during runtime shutdown — it does not call `tokio::spawn` from `Drop`.
2. **`tokio_util::sync::CancellationToken`.** Long operations (`subscribe_buffered`, recovery, `connect`) accept an optional `CancellationToken` via `*_with_cancellation` variants.
3. **Timeout.** `tokio::time::timeout` works on every operation because `async fn`s are cancel-correct by construction.
**Known runtime-shutdown leak.** If the Tokio runtime is shut down before the connection task has drained pending `UnAdvise`/`UnregisterEngine` messages, those frames are not delivered to the wire. Production code should avoid this by calling `Session::shutdown(timeout).await` (see below) on the orderly-exit path. The .NET reference has the same shape: `Dispose` runs synchronously and calls `_service.UnAdvise(...)` per live subscription before `UnregisterEngine` (src/MxNativeClient/MxNativeSession.cs:483-495). The Rust async equivalent is `Session::shutdown`; relying on `Drop` alone for cleanup is best-effort and documented as such.
### Error model
`mxaccess::Error` is a `thiserror`-derived `#[non_exhaustive]` enum. See `50-error-model.md` for the full surface. All operations return `Result<T, Error>`; **no panics in the public surface**.
A non-Ok `MxStatus` on a returned `DataChange` is data, not an error. A non-Ok status on `read`/`write`/`subscribe`'s synchronous result is an `Err`. This mirrors the .NET reference and is the only sensible split: subscription frames carry status that callers want to inspect ("stale" or "uncertain" is still data); operation results are pass/fail.
### Threading model
- Multi-thread Tokio is the default. Single-thread is supported (no `Send`-only future escapes the local thread) but not the recommended deployment.
- All public types are `Send + Sync`.
- The codec is `Send + Sync` trivially (immutable after parse, owns its bytes).
- The session uses `tokio::sync::Mutex` for the per-connection RPC channel state and `tokio::sync::watch` for recovery state. **No `parking_lot::Mutex`** — sync mutexes inside async paths cause hidden blocking.
### Observability
`tracing` spans on every public operation: `tracing::instrument` on `register`, `write`, `subscribe`, `read`, `recover`. Span fields: `reference`, `correlation_id`, `transport` (`nmx`|`asb`), `engine_ids`. Span events for state transitions (subscription added, callback received, recovery started, recovery completed).
Recommended subscriber filter:
```
mxaccess::session=info,mxaccess::transport=debug
```
Optional `metrics` feature exposes:
- counters: `mxaccess_writes_total`, `mxaccess_subscribes_total`, `mxaccess_callbacks_total`, `mxaccess_recoveries_total`
- histograms: `mxaccess_operation_latency_seconds{op="write"|"read"|"subscribe"}`
### Trait `Transport`
`Transport` uses **native `async fn` in trait** (AFIT, stable in Rust 1.75+) and is **generic-only**. Consumers parameterise sites that take a transport with `impl Transport` or a generic `<T: Transport>` bound. The trait is **not** dyn-compatible — `Box<dyn Transport>` is not supported in V1 — and that limitation is intentional: the design already uses `Session::connect<T: Transport>(...)`-style generic entry points, so giving up `dyn Transport` costs nothing the design currently uses, while keeping zero per-call heap allocation in the hot path. (`#[async_trait]` was the alternative; it allows `dyn Transport` but boxes a `Pin<Box<dyn Future>>` per call — accepted as a known cost only if a future revision needs runtime polymorphism.)
```rust
pub trait Transport: Send + Sync {
fn capabilities(&self) -> TransportCapabilities;
async fn register(&self, options: &ConnectionOptions) -> Result<RegisteredEngine, Error>;
async fn unregister(&self, engine: &RegisteredEngine) -> Result<(), Error>;
async fn write(
&self,
handle: &MxReferenceHandle,
value: &MxValue,
opts: WriteOptions,
) -> Result<WriteOutcome, Error>;
async fn advise(
&self,
handle: &MxReferenceHandle,
opts: AdviseOptions,
) -> Result<SubscriptionHandle, Error>;
async fn unadvise(&self, sub: SubscriptionHandle) -> Result<(), Error>;
fn callbacks(&self) -> CallbackStream;
}
pub struct TransportCapabilities {
pub timestamped_writes: bool,
pub secured_writes: bool,
pub buffered_subscriptions: bool,
pub supervisory_advise: bool,
pub operation_complete_events: bool,
}
```
Two implementations: `NmxTransport` (capabilities mostly true) and `AsbTransport` (capabilities mostly false; see `70-risks-and-open-questions.md`).
Calling a NMX-only API on an `AsbTransport` returns `Error::Unsupported { operation: Cow<'static, str>, transport: TransportKind }`. The `Cow` is used so the variant accepts both interned `&'static str` literals (the common case) and runtime-formatted operation names without allocation when not required; `TransportKind` is the corresponding `enum TransportKind { Nmx, Asb }` (matching the `transport` span field at line 204). The `Session` may also pre-flight via `transport.capabilities()` to give a better error message before issuing the call.
### Public surface (re-exports)
```rust
pub use mxaccess_codec::{
MxReferenceHandle, MxStatus, MxStatusCategory, MxStatusSource,
MxValue, MxValueKind, MxDataType,
};
pub struct Session;
pub struct Subscription;
pub struct DataChange {
pub reference: Arc<str>,
pub value: MxValue,
/// Legacy 16-bit OPC quality (e.g. `0xC0` = 192 = "Good"). Distinct from
/// `status: MxStatus` — both are surfaced because real MxAccess
/// (`OnDataChange(hServer, hItem, MxDataType, value, quality, timestamp,
/// statuses)`) carries them as separate fields. Verified against
/// `wwtools/mxaccesscli/docs/api-notes.md:104-105` ("quality on
/// OnDataChange is the legacy 16-bit OPC quality value … the richer state
/// lives in the statuses[] array") and
/// `wwtools/mxaccesscli/src/MxAccess.Cli/Mx/MxUpdate.cs:13-22,39-65`.
/// Earlier drafts of this design dropped `quality` as redundant with
/// `status`; that was a parity break and has been restored.
pub quality: u16,
pub timestamp: SystemTime,
pub status: MxStatus,
}
pub struct DataChangeBatch {
pub reference: Arc<str>,
pub samples: Vec<DataChange>,
}
// Note on `quality`: `DataChange` carries a 16-bit OPC quality alongside
// `status: MxStatus`. They are distinct: `quality` is the legacy wire field
// (e.g. `0xC0` = "Good"), preserved for parity with real MxAccess
// (`OnDataChange` exposes both). The canonical projection from a wire record
// to a typed status is `Record.ToDataChangeStatus()` in the .NET reference
// (src/MxNativeClient/MxNativeSession.cs:70), which produces an `MxStatus`.
// Consumers that need the historical "quality" view (Good/Uncertain/Bad on
// bits 7..6) read it from `status.detail` and `status.category` rather than
// from a redundant raw u16. Exposing both invites callers to use the wrong
// field; the codec's `MxStatus` is the single source of truth.
pub struct ConnectionOptions;
pub struct WriteOptions;
pub struct AdviseOptions;
pub struct BufferedOptions;
pub struct RecoveryPolicy;
pub enum RecoveryEvent { Started { .. }, Failed { .. }, Completed { .. } }
pub struct Credentials;
pub struct SecurityContext;
pub enum Error; // see 50-error-model.md
```
## `mxaccess-compat` (optional)
`LMXProxyServer`-shaped methods on top of `Session`. Each method maps one-to-one to a `Session::*` operation:
```rust
let server = mxaccess_compat::Server::new(session);
let server_handle = server.register("MyClient");
let item_handle = server.add_item(server_handle, "TestObject.TestInt").await?;
server.advise(server_handle, item_handle).await?;
server.write(server_handle, item_handle, MxValue::Int32(123), user_id).await?;
```
Useful for porting code that depends on the COM API shape; not the recommended consumer surface. Not COM-visible by itself; a separate `mxaccess-compat-com` crate (deferred to post-V1) will register `windows-rs`-generated COM classes that wrap this.
## Examples
End-to-end consumer-grade examples in `rust/examples/`:
- `connect-write-read.rs` — open session, write, read back
- `subscribe.rs` — long-running subscription
- `subscribe-buffered.rs` — buffered subscription (NMX feature)
- `asb-subscribe.rs` — ASB subscription
- `recovery.rs` — recovery policy + recovery events
- `multi-tag.rs``subscribe_many` on a 100-tag set
- `secured-write.rs``write_secured` (no timestamp) and `write_secured_at` (timestamped), each taking `(current_user_id, verifier_user_id)`; demonstrates both single-user (`current == verifier`) and two-person verification paths
## Code sample (full)
```rust
use std::time::Duration;
use futures::StreamExt;
use mxaccess::{Session, ConnectionOptions, Credentials, MxValue, RecoveryPolicy};
#[tokio::main(flavor = "multi_thread")]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let mut session = Session::connect(
ConnectionOptions::nmx("localhost")
.galaxy_id(1)
.platform_id(1)
.engine_id(420)
.credentials(Credentials::current_user())
.galaxy_db(std::env::var("MX_GALAXY_DB")?)
.recovery(RecoveryPolicy::exponential(
Duration::from_secs(1), 5, Duration::from_secs(60),
)),
).await?;
session.write("TestChildObject.TestInt", MxValue::Int32(123)).await?;
let mut sub = session.subscribe("TestChildObject.TestInt").await?;
while let Some(change) = sub.next().await {
let change = change?;
tracing::info!(value = ?change.value, ts = ?change.timestamp, "data change");
}
Ok(())
}
```
## What the async layer does **not** do
- It does not pretend to be sync. There is no `block_on` shortcut in the public API.
- It does not support multiple async runtimes. Tokio only.
- It does not transmit raw bytes. All operations go through the codec.
- It does not retry by default. Recovery is opt-in via `ConnectionOptions::recovery(...)` at session construction. There is no runtime mutator for `RecoveryPolicy`: `Session` is `Clone + Arc<SessionInner>`-backed, so a `&mut self` setter on a clone would not propagate to other clones; the policy is fixed once and shared by every clone. Consumers that need a different policy build a new `Session`.
- It does not own a thread pool. It uses Tokio's runtime.
- It does not synthesise events the wire does not produce. `WriteCompleted` only fires when the proven 5-byte completion frame is observed; otherwise `RawStatus` is exposed verbatim through `Session::operation_status_events()`. See `70-risks-and-open-questions.md` R3/R4.