Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a5d31cc2e1 | |||
| 48d3a9d6da | |||
| af939730b1 |
@@ -6,6 +6,48 @@ move to `## Resolved` with a date + commit hash.
|
||||
|
||||
## Open
|
||||
|
||||
### F18 — M5 plan of attack (ASB transport, parallel-safe sub-streams)
|
||||
**Severity:** P0 — milestone driver, blocks ASB consumers + V1 release
|
||||
**Source:** `design/dependencies.md:73-89` + `design/60-roadmap.md:84-91` + `design/70-risks-and-open-questions.md:5-25` (R1 estimates ~3000 LoC for framing+encoders).
|
||||
|
||||
**Scope.** Build the ASB data-plane end-to-end:
|
||||
- `mxaccess-asb-nettcp` — `[MS-NMF]` framing + `[MC-NBFX]` binary-XML node codec + `[MC-NBFS]` static dictionary table + DH/HMAC/AES authentication crypto.
|
||||
- `mxaccess-asb` — `IASBIDataV2` client (Connect, RegisterItems, Read, Write, PublishWriteComplete, CreateSubscription, AddMonitoredItems, Publish, Disconnect) + `SecretProvider` trait + DPAPI default impl + ASB Variant codec port (currently a stub at `crates/mxaccess-codec/src/lib.rs:74,77,80`).
|
||||
- `mxaccess::Session` over an `AsbTransport` impl; capabilities surface ASB limits (no `subscribe_buffered`, no Activate/Suspend, no OperationComplete outside the proven write-completion frame — see `design/60-roadmap.md:88`).
|
||||
- `examples/asb-subscribe.rs` exercises the whole path against a live ASB endpoint with parity vs `dotnet run --project src\MxAsbClient.Probe`.
|
||||
|
||||
**Sub-stream breakdown** (matches `design/dependencies.md:78-89`). Each sub-stream is a separate followup so it can be claimed by a separate agent in a worktree without merge conflict:
|
||||
|
||||
| Sub-followup | Stream | Owns | Depends on |
|
||||
|---|---|---|---|
|
||||
| F19 | (workspace prereq) | Add the M5 dep set to `rust/Cargo.toml` workspace deps + per-crate `Cargo.toml`: `aes`, `hmac`, `md-5`, `sha1`, `sha2`, `pbkdf2`, `flate2`, `rand`, `crypto-bigint` (constant-time DH per `review.md` MAJOR), `quick-xml`, `tokio-util`. Pinned to the `digest 0.11`/`cipher 0.5` generation per `design/30-crate-topology.md:251-289`. Sequential prereq for the others. | M0 |
|
||||
| F20 | A — MS-NMF framing | `mxaccess-asb-nettcp::nmf` — preamble (`0x00 ver=1 mode=2 via=encoded-string`), preamble-ack, sized-envelope (`0x06 var-int len bytes`), end (`0x07`), fault (`0x08`), upgrade-request, known-encoding via lookup. Reliable-session ack handling. Round-trip against `analysis/proxy/mxasbclient-register-message.txt` and `mxasbclient-probe-stage*.txt` byte traces. | F19 |
|
||||
| F21 | B — MC-NBFX | `mxaccess-asb-nettcp::nbfx` — record types (`0x40` ShortElement, `0x41` Element, `0x44` ShortDictionaryAttribute, `0x04` PrefixDictionary*A-Z, `0x84` BoolText, `0x88` Int32Text, `0x86` BoolFalseText, etc., per `[MC-NBFX]` §2.2). Length-prefixed strings (var-int 7-bit groups). Read/write over `bytes::BytesMut`. | F19 |
|
||||
| F22 | C — MC-NBFS | `mxaccess-asb-nettcp::nbfs` — the static dictionary table. SOAP/WS-Addressing tokens + `IASBIDataV2`-action strings used by the operation set (`http://ASB.IDataV2:registerItemsIn`, `:readIn`, `:writeIn`, `:createSubscriptionIn`, `:publishIn`, etc., see `src/MxAsbClient/AsbContracts.cs:14-58`). Hand-rolled from the proven action set; the full WCF dictionary is much larger but only the action subset is on the wire. | F19 |
|
||||
| F23 | D — Auth crypto | `mxaccess-asb-nettcp::auth` — port `src/MxAsbClient/AsbSystemAuthenticator.cs` (167 LoC): DH key exchange with `crypto-bigint` constant-time `mod_exp` (review.md MAJOR finding — .NET `BigInteger.ModPow` is **not** constant-time and the DH private exponent is long-lived per `cs:153-166`); HMAC-MD5/SHA1/SHA512 (negotiated per `AsbSolutionCryptoParameters.HashAlgorithm`); AES-128 with PBKDF2-SHA1 1000-iteration key derivation; deflate-then-encrypt `EncryptBaktun` vs raw-encrypt `EncryptApollo` distinguished by `:V2` lifetime suffix (`cs:48`); ASCII salt `"ArchestrAService"`; UTF-16LE passphrase. Plus DPAPI shared-secret read on Windows behind the existing `dpapi` feature gate, with a `SecretProvider::shared_secret(&[u8])` escape hatch for tests/CI (`design/30-crate-topology.md:150`). | F19 |
|
||||
| F24 | (codec) | `mxaccess-codec::asb_variant` — fill in the stubbed `AsbVariant`, `AsbStatus`, `RuntimeValue` (`crates/mxaccess-codec/src/lib.rs:74,77,80`) per `docs/ASB-Variant-Wire-Format.md`. Decode/encode for the proven type matrix: `TypeBool`, `TypeInt32`, `TypeFloat`, `TypeDouble`, `TypeString`, `TypeDateTime`, `TypeDuration`, plus deployed array shapes (`work_remain.md:108-113`). Less-common scalars stay as raw bytes (matches .NET `DecodeVariant` fallback at `MxAsbDataClient.cs:748`). Independent of the framing/encoder work — separate crate. | M1 (envelope/status types) |
|
||||
| F25 | E — IASBIDataV2 client | `mxaccess-asb::client` — top-level `AsbClient` with `connect`, `register_items`, `read`, `write`, `publish_write_complete`, `create_subscription`, `add_monitored_items`, `publish`, `disconnect`. Wires the contract → NBFX-encoded SOAP envelope → NMF-framed TCP. `ConnectedRequest::ConnectionValidator` HMAC signing per `AsbSystemAuthenticator::Sign`. Receives `Publish` callbacks via a long-lived background task (mirrors the M4 NMX `callback_router` pattern). Depends on F20+F21+F22+F23+F24. | A+B+C+D+codec |
|
||||
| F26 | (session) | `mxaccess::Session` over `AsbTransport`. New transport impl alongside `NmxTransport`. Surface ASB capability flags so `subscribe_buffered`/`activate`/`suspend` return `Error::Unsupported(Capability::*)` rather than a runtime fallthrough. Update `examples/asb-subscribe.rs` to drive the path end-to-end. Live-probe DoD: round-trip parity with `dotnet run --project src\MxAsbClient.Probe`. | F25 |
|
||||
|
||||
**Parallel-safety analysis.**
|
||||
- F19 (workspace deps) is the **single sequential bottleneck** — F20-F25 all reference workspace deps that don't exist yet, so they cannot start in parallel until F19 lands. Tight & small (~30 lines of TOML).
|
||||
- F20, F21, F22, F23, F24 are **fully parallel-safe** after F19: each owns a different module under a different crate (or different sibling module within `mxaccess-asb-nettcp`). No shared state, no cross-import — each can land in its own commit. Per `dependencies.md:88` "Peak agents in parallel: 4 in the framing/encoding wave (A+B+C+D)".
|
||||
- F25 is sequential after the four framing/encoder streams + F24 land — it composes them. The .NET `MxAsbDataClient` is monolithic enough that splitting F25 across agents costs more in coordination than it saves.
|
||||
- F26 is sequential after F25.
|
||||
- **Cross-milestone parallelism still holds.** M5 (this whole F18-F26 cluster) runs in parallel with M3+M4 per `design/60-roadmap.md:14-17` because the `Transport` trait was lifted into M0. M4's `Session` core landed (commits `4863c6d`, `2dc091d`, `a31237d`); the F26 `AsbTransport` plugs into the same trait without re-design.
|
||||
|
||||
**Risk-driven sequencing inside the parallel wave.** R1 in `design/70-risks-and-open-questions.md:9` is the project-blocker. Of the four parallel streams, F23 (auth crypto) carries the most live-probe risk (DH handshake against the live VM is the first irreversible test of the spec port) but is the smallest in LoC. F22 (NBFS) is the largest unknown — the dictionary table size is bounded only by the action subset we exercise. Recommended order *if* agents are constrained: F23 (smallest, highest-leverage) → F20 (foundational for any wire test) → F21 (encoder) → F22 (dictionary) → F24 (codec, independent).
|
||||
|
||||
**Definition of done** for F18 as a whole (= M5 DoD per `design/60-roadmap.md:91`):
|
||||
1. `cargo run -p mxaccess --example asb-subscribe -- --tag TestChildObject.TestInt` succeeds against a live ASB endpoint.
|
||||
2. Round-trip parity with `dotnet run --project src\MxAsbClient.Probe` (Frida/Wireshark diff is byte-identical for the proven type matrix).
|
||||
3. The `mxaccess-asb` type matrix covers what `work_remain.md:108-113` documents as proven: scalar Boolean, Int32, Float, Double, String, DateTime, Duration plus deployed array tags.
|
||||
4. `cargo build --workspace` and `cargo test --workspace` green; `cargo clippy --workspace -- -D warnings` clean.
|
||||
|
||||
**Resolves when:** F19-F26 are all closed and the four DoD bullets above pass.
|
||||
|
||||
**This-iteration execution slice.** Land F19 (workspace deps) sequentially first, then F23 (auth crypto port — smallest stream, fully self-contained, exercises the largest set of new deps in one place to validate the dep choice). F20/F21/F22/F24/F25/F26 stay open for follow-up iterations or parallel agent fan-out.
|
||||
|
||||
### F2 — NTLM verify_signature path + constant-time MAC compare (server-to-client direction)
|
||||
**Severity:** P2
|
||||
**Source:** M2 wave 1, `crates/mxaccess-rpc/src/ntlm.rs`
|
||||
@@ -88,3 +130,6 @@ move to `## Resolved` with a date + commit hash.
|
||||
|
||||
### F9 — `ObjectExporterClient.cs` ResolveOxid wrapper methods
|
||||
**Resolved:** 2026-05-05. Both portable methods land in `crates/mxaccess-rpc/src/object_exporter_client.rs`: `resolve_oxid_unauthenticated` (mirrors `cs:14-30`) and `resolve_oxid_with_managed_ntlm_packet_integrity` (mirrors `cs:66-81`). Each opens a TCP connection, binds to `IObjectExporter`, calls opnum 0 with the encoded request, and decodes the response — preferring `parse_resolve_oxid_result` then falling back to `parse_resolve_oxid_failure` for short stubs. The two SSPI flavours (`ResolveOxidWithNtlmConnect`, `ResolveOxidWithNtlmPacketIntegrity`) wrap .NET's `System.Net.Security.SspiClientContext` and are explicitly out of scope for the Rust port — that's a permanent skip, not a deferral.
|
||||
|
||||
### F17 — `Guid::parse_str` helper (dashed-hex string parser)
|
||||
**Resolved:** 2026-05-05. `Guid::parse_str(&str) -> Result<Guid, RpcError>` landed in `crates/mxaccess-rpc/src/guid.rs:65-112` as the inverse of the existing `Display` impl. Accepts the canonical dashed-hex form, optionally wrapped in `{}` braces (.NET `B` format), case-insensitive, and tolerant of bare 32-char hex without dashes. Single-pass char-by-char nibble accumulator avoids per-byte string allocation; the same byte-swap of groups 1-3 the Display impl does is applied after the raw hex pass. Eight new tests cover round-trip against the `Display` fixture (`b49f92f7-c748-4169-8eca-a0670b012746`), braces, uppercase, no-dashes, zero-GUID, too-short, too-long, and non-hex rejection. The five live-NMX examples (`connect-write-read`, `subscribe`, `recovery`, `multi-tag`, `secured-write`) lost their per-file 15-line `parse_guid` helpers in favour of the canonical implementation. Test count delta: 524 → 532 (+8).
|
||||
|
||||
@@ -46,6 +46,71 @@ impl Guid {
|
||||
Ok(Self(out))
|
||||
}
|
||||
|
||||
/// Parse a `12345678-1234-1234-1234-123456789012` style GUID string
|
||||
/// into wire-byte form. Inverse of the [`std::fmt::Display`] impl.
|
||||
///
|
||||
/// Accepts the canonical dashed-hex form, optionally wrapped in
|
||||
/// `{...}` braces (the .NET `B` format). Case-insensitive. The
|
||||
/// first three hex groups are stored little-endian on the wire (per
|
||||
/// the module docstring) so the parser byte-swaps them after the
|
||||
/// raw hex pass.
|
||||
///
|
||||
/// There is no .NET reference to mirror here — the Display impl is
|
||||
/// the spec, this is its inverse.
|
||||
///
|
||||
/// # Errors
|
||||
/// Returns [`crate::error::RpcError::Decode`] if the input is not
|
||||
/// 32 hex chars (with 4 optional dashes and optional outer braces),
|
||||
/// or contains a non-hex character.
|
||||
pub fn parse_str(s: &str) -> Result<Self, crate::error::RpcError> {
|
||||
let trimmed = s.trim_start_matches('{').trim_end_matches('}');
|
||||
// Strip dashes; everything else must be a hex digit.
|
||||
let mut bytes = [0u8; 16];
|
||||
let mut nibble_count = 0usize;
|
||||
let mut acc: u8 = 0;
|
||||
for c in trimmed.chars() {
|
||||
if c == '-' {
|
||||
continue;
|
||||
}
|
||||
let digit = match c.to_digit(16) {
|
||||
Some(d) => d as u8,
|
||||
None => {
|
||||
return Err(crate::error::RpcError::Decode {
|
||||
offset: nibble_count / 2,
|
||||
reason: "non-hex character in guid",
|
||||
buffer_len: trimmed.len(),
|
||||
});
|
||||
}
|
||||
};
|
||||
if nibble_count >= 32 {
|
||||
return Err(crate::error::RpcError::Decode {
|
||||
offset: 16,
|
||||
reason: "guid hex too long",
|
||||
buffer_len: trimmed.len(),
|
||||
});
|
||||
}
|
||||
if nibble_count % 2 == 0 {
|
||||
acc = digit << 4;
|
||||
} else {
|
||||
bytes[nibble_count / 2] = acc | digit;
|
||||
}
|
||||
nibble_count += 1;
|
||||
}
|
||||
if nibble_count != 32 {
|
||||
return Err(crate::error::RpcError::Decode {
|
||||
offset: nibble_count / 2,
|
||||
reason: "guid hex too short",
|
||||
buffer_len: trimmed.len(),
|
||||
});
|
||||
}
|
||||
// Byte-swap the first three groups so the resulting bytes match
|
||||
// the wire layout the Display impl reads.
|
||||
bytes[0..4].reverse();
|
||||
bytes[4..6].reverse();
|
||||
bytes[6..8].reverse();
|
||||
Ok(Self(bytes))
|
||||
}
|
||||
|
||||
/// Write the 16 wire bytes into `dst[..16]`. Mirrors .NET
|
||||
/// `Guid.TryWriteBytes(span)`.
|
||||
///
|
||||
@@ -142,4 +207,80 @@ mod tests {
|
||||
"00000000-0000-0000-0000-000000000000"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_str_round_trips_display() {
|
||||
// The dashed-hex form from the display fixture above.
|
||||
let g = Guid::parse_str("b49f92f7-c748-4169-8eca-a0670b012746").unwrap();
|
||||
assert_eq!(
|
||||
g.0,
|
||||
[
|
||||
0xF7, 0x92, 0x9F, 0xB4, 0x48, 0xC7, 0x69, 0x41, 0x8E, 0xCA, 0xA0, 0x67, 0x0B, 0x01,
|
||||
0x27, 0x46,
|
||||
]
|
||||
);
|
||||
// Round-trip back via Display.
|
||||
assert_eq!(g.to_string(), "b49f92f7-c748-4169-8eca-a0670b012746");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_str_accepts_braces() {
|
||||
// .NET "B" format wraps the dashed-hex form in `{}`.
|
||||
let g = Guid::parse_str("{b49f92f7-c748-4169-8eca-a0670b012746}").unwrap();
|
||||
assert_eq!(g.to_string(), "b49f92f7-c748-4169-8eca-a0670b012746");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_str_accepts_uppercase() {
|
||||
let g = Guid::parse_str("B49F92F7-C748-4169-8ECA-A0670B012746").unwrap();
|
||||
assert_eq!(g.to_string(), "b49f92f7-c748-4169-8eca-a0670b012746");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_str_accepts_no_dashes() {
|
||||
let g = Guid::parse_str("b49f92f7c74841698ecaa0670b012746").unwrap();
|
||||
assert_eq!(g.to_string(), "b49f92f7-c748-4169-8eca-a0670b012746");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_str_round_trips_zero() {
|
||||
let g = Guid::parse_str("00000000-0000-0000-0000-000000000000").unwrap();
|
||||
assert_eq!(g, Guid::ZERO);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_str_rejects_too_short() {
|
||||
let err = Guid::parse_str("b49f92f7-c748-4169-8eca-a0670b0127").unwrap_err();
|
||||
assert!(matches!(
|
||||
err,
|
||||
crate::error::RpcError::Decode {
|
||||
reason: "guid hex too short",
|
||||
..
|
||||
}
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_str_rejects_too_long() {
|
||||
let err = Guid::parse_str("b49f92f7-c748-4169-8eca-a0670b01274600").unwrap_err();
|
||||
assert!(matches!(
|
||||
err,
|
||||
crate::error::RpcError::Decode {
|
||||
reason: "guid hex too long",
|
||||
..
|
||||
}
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_str_rejects_non_hex() {
|
||||
let err = Guid::parse_str("b49f92f7-c748-4169-8eca-a0670b01274z").unwrap_err();
|
||||
assert!(matches!(
|
||||
err,
|
||||
crate::error::RpcError::Decode {
|
||||
reason: "non-hex character in guid",
|
||||
..
|
||||
}
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,44 @@
|
||||
//! `asb-subscribe` — subscribe via the ASB transport.
|
||||
//! `asb-subscribe` — subscribe via the ASB transport (M5 placeholder).
|
||||
//!
|
||||
//! M0 stub. See `design/60-roadmap.md` M5.
|
||||
//! ASB (`net.tcp` to MxDataProvider) is the M5 milestone — the
|
||||
//! `mxaccess-asb-nettcp` framing crate and `mxaccess-asb` operations crate
|
||||
//! are scaffolded but not yet wired into `Session`. Once M5 lands the demo
|
||||
//! body below becomes a ~30-line subscribe + drain identical in shape to
|
||||
//! `subscribe.rs`, just over the ASB transport.
|
||||
//!
|
||||
//! See `design/60-roadmap.md` M5 for the operations matrix and
|
||||
//! `docs/ASB-Native-Integration-Decision.md` for why ASB is the preferred
|
||||
//! data-plane.
|
||||
|
||||
fn main() {
|
||||
eprintln!("asb-subscribe: stubbed (M0). See design/60-roadmap.md M5.");
|
||||
use mxaccess::{ConnectionOptions, Session};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
if std::env::var_os("MX_LIVE").is_none() {
|
||||
eprintln!(
|
||||
"MX_LIVE not set — `asb-subscribe` is the M5 placeholder; \
|
||||
run `. tools/Setup-LiveProbeEnv.ps1` once the ASB transport lands."
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
match Session::connect(ConnectionOptions).await {
|
||||
Ok(_) => {
|
||||
eprintln!(
|
||||
"Session::connect returned Ok unexpectedly — \
|
||||
update this example once M5 wires the ASB transport."
|
||||
);
|
||||
}
|
||||
Err(mxaccess::Error::Unsupported {
|
||||
operation,
|
||||
transport,
|
||||
}) => {
|
||||
eprintln!(
|
||||
"{operation} on {transport:?}: deferred to M5. See \
|
||||
design/60-roadmap.md M5 for the ASB transport operations matrix."
|
||||
);
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,7 +1,156 @@
|
||||
//! `connect-write-read` — connect, write a value, read it back.
|
||||
//! `connect-write-read` — connect to NMX, write a value, read it back, shut down.
|
||||
//!
|
||||
//! M0 stub. Filled in as M4 lands. See `design/60-roadmap.md` M4 DoD.
|
||||
//! The 30-line consumer-experience target from `design/60-roadmap.md` M4 DoD.
|
||||
//!
|
||||
//! # Required env vars
|
||||
//!
|
||||
//! Populate via `tools/Setup-LiveProbeEnv.ps1` (dot-source it):
|
||||
//!
|
||||
//! - `MX_LIVE` (any non-empty value enables the live path)
|
||||
//! - `MX_NMX_HOST` — NMX endpoint host[:port]; defaults port 135 if omitted
|
||||
//! - `MX_NMX_SERVICE_IPID` — `INmxService2` IPID (UUID; auto-resolution is F12)
|
||||
//! - `MX_RPC_USER`, `MX_RPC_PASSWORD`, `MX_RPC_DOMAIN` — NTLM creds
|
||||
//! - `MX_TEST_TAG` — tag reference (default `TestChildObject.TestInt`)
|
||||
//!
|
||||
//! # Resolver shim
|
||||
//!
|
||||
//! Examples ship with an inline `StaticResolver` that returns canned metadata
|
||||
//! for the configured `MX_TEST_TAG`. Production consumers should plug in a
|
||||
//! `tiberius`-backed Galaxy SQL resolver (followup F14 in
|
||||
//! `design/followups.md`); the `Resolver` trait is `Send + Sync` so any
|
||||
//! concrete impl drops in.
|
||||
|
||||
fn main() {
|
||||
eprintln!("connect-write-read: stubbed (M0). See design/60-roadmap.md M4.");
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use mxaccess::{
|
||||
GalaxyTagMetadata, RecoveryPolicy, Resolver, ResolverError, Session, SessionOptions, WriteValue,
|
||||
};
|
||||
use mxaccess_rpc::guid::Guid;
|
||||
use mxaccess_rpc::ntlm::NtlmClientContext;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let Some(env) = LiveEnv::from_process()? else {
|
||||
eprintln!(
|
||||
"MX_LIVE not set — skipping live demo. Run \
|
||||
`. tools/Setup-LiveProbeEnv.ps1` to populate the required env vars."
|
||||
);
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let session = Session::connect_nmx(
|
||||
env.addr,
|
||||
SessionOptions::default(),
|
||||
NtlmClientContext::from_env()?,
|
||||
env.service_ipid,
|
||||
Arc::new(StaticResolver::new(&env.tag)),
|
||||
RecoveryPolicy::default(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
eprintln!("connected; writing {} = 123", env.tag);
|
||||
session
|
||||
.write_value(&env.tag, WriteValue::Int32(123))
|
||||
.await?;
|
||||
|
||||
eprintln!("reading {}; timeout 5s", env.tag);
|
||||
let dc = session.read(&env.tag, Duration::from_secs(5)).await?;
|
||||
println!(
|
||||
"{} = {:?} (quality 0x{:x})",
|
||||
dc.reference, dc.value, dc.quality
|
||||
);
|
||||
|
||||
session.shutdown_nmx().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ---- live-env wiring (duplicated across examples; see module docstring) -----
|
||||
|
||||
struct LiveEnv {
|
||||
addr: std::net::SocketAddr,
|
||||
service_ipid: Guid,
|
||||
tag: String,
|
||||
}
|
||||
|
||||
impl LiveEnv {
|
||||
fn from_process() -> Result<Option<Self>, Box<dyn std::error::Error>> {
|
||||
if std::env::var_os("MX_LIVE").is_none() {
|
||||
return Ok(None);
|
||||
}
|
||||
let host = std::env::var("MX_NMX_HOST")?;
|
||||
let addr = parse_host_port(&host, 135)?;
|
||||
let service_ipid = Guid::parse_str(&std::env::var("MX_NMX_SERVICE_IPID")?)?;
|
||||
let tag = std::env::var("MX_TEST_TAG").unwrap_or_else(|_| "TestChildObject.TestInt".into());
|
||||
Ok(Some(Self {
|
||||
addr,
|
||||
service_ipid,
|
||||
tag,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_host_port(
|
||||
s: &str,
|
||||
default_port: u16,
|
||||
) -> Result<std::net::SocketAddr, Box<dyn std::error::Error>> {
|
||||
if let Ok(addr) = s.parse() {
|
||||
return Ok(addr);
|
||||
}
|
||||
let with_port = if s.contains(':') {
|
||||
s.to_string()
|
||||
} else {
|
||||
format!("{s}:{default_port}")
|
||||
};
|
||||
Ok(
|
||||
std::net::ToSocketAddrs::to_socket_addrs(&with_port.as_str())?
|
||||
.next()
|
||||
.ok_or("no addrs resolved")?,
|
||||
)
|
||||
}
|
||||
|
||||
// ---- canned in-memory resolver ----------------------------------------------
|
||||
|
||||
struct StaticResolver {
|
||||
tag_reference: String,
|
||||
metadata: GalaxyTagMetadata,
|
||||
}
|
||||
|
||||
impl StaticResolver {
|
||||
fn new(tag_reference: &str) -> Self {
|
||||
let (object, attribute) = tag_reference
|
||||
.split_once('.')
|
||||
.unwrap_or((tag_reference, "TestInt"));
|
||||
Self {
|
||||
tag_reference: tag_reference.to_string(),
|
||||
metadata: GalaxyTagMetadata {
|
||||
object_tag_name: object.to_string(),
|
||||
attribute_name: attribute.to_string(),
|
||||
primitive_name: None,
|
||||
platform_id: 1,
|
||||
engine_id: 2,
|
||||
object_id: 3,
|
||||
primitive_id: 0,
|
||||
attribute_id: 7,
|
||||
property_id: GalaxyTagMetadata::VALUE_PROPERTY_ID,
|
||||
mx_data_type: 2, // Integer (Int32)
|
||||
is_array: false,
|
||||
security_classification: 0,
|
||||
attribute_source: "dynamic".into(),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Resolver for StaticResolver {
|
||||
async fn resolve(&self, tag: &str) -> Result<GalaxyTagMetadata, ResolverError> {
|
||||
if tag == self.tag_reference {
|
||||
Ok(self.metadata.clone())
|
||||
} else {
|
||||
Err(ResolverError::NotFound {
|
||||
tag_reference: tag.to_string(),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,8 +1,195 @@
|
||||
//! `multi-tag` — `subscribe_many` over several tags. Non-atomic per-tag
|
||||
//! advise loop matching `MxNativeSession.SubscribeAsync` (`MxNativeSession.cs:250-270`).
|
||||
//! `multi-tag` — subscribe to several tags in one session.
|
||||
//!
|
||||
//! M0 stub.
|
||||
//! Demonstrates the per-tag advise loop that mirrors
|
||||
//! `MxNativeSession.SubscribeAsync` (`MxNativeSession.cs:250-270`) — there
|
||||
//! is no atomic "subscribe-many" RPC on the LMX wire, so consumers
|
||||
//! issue one `AdviseSupervisory` per tag. The high-level
|
||||
//! `Session::subscribe_many` shim is currently `Unsupported` (it is
|
||||
//! reserved for a future atomic frame; per `wwtools/mxaccesscli/`
|
||||
//! research no such frame is documented) — the loop below is the
|
||||
//! consumer-side replacement.
|
||||
//!
|
||||
//! Streams are merged with `futures_util::stream::select_all`. The
|
||||
//! example takes a comma-separated `MX_TEST_TAGS` env var (default:
|
||||
//! two canned tags); both must be writable through the inline
|
||||
//! `StaticResolver`.
|
||||
|
||||
fn main() {
|
||||
eprintln!("multi-tag: stubbed (M0). See design/60-roadmap.md M4.");
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use futures_util::stream::{StreamExt, select_all};
|
||||
use mxaccess::{
|
||||
GalaxyTagMetadata, RecoveryPolicy, Resolver, ResolverError, Session, SessionOptions,
|
||||
Subscription,
|
||||
};
|
||||
use mxaccess_rpc::guid::Guid;
|
||||
use mxaccess_rpc::ntlm::NtlmClientContext;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let Some(env) = LiveEnv::from_process()? else {
|
||||
eprintln!(
|
||||
"MX_LIVE not set — skipping live demo. Run \
|
||||
`. tools/Setup-LiveProbeEnv.ps1` to populate the required env vars."
|
||||
);
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let session = Session::connect_nmx(
|
||||
env.addr,
|
||||
SessionOptions::default(),
|
||||
NtlmClientContext::from_env()?,
|
||||
env.service_ipid,
|
||||
Arc::new(StaticResolver::new(&env.tags)),
|
||||
RecoveryPolicy::default(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Per-tag advise loop. This is the .NET cs:250-270 pattern lifted
|
||||
// into Rust — one round-trip per tag, sequential to keep the wire
|
||||
// ordering deterministic.
|
||||
let mut subs: Vec<Subscription> = Vec::with_capacity(env.tags.len());
|
||||
for tag in &env.tags {
|
||||
eprintln!("subscribing to {tag}");
|
||||
subs.push(session.subscribe(tag).await?);
|
||||
}
|
||||
|
||||
// Merge the streams. select_all yields the first ready item across
|
||||
// all subscriptions; we annotate each event with its source.
|
||||
let labelled: Vec<_> = subs
|
||||
.iter_mut()
|
||||
.enumerate()
|
||||
.map(|(i, s)| s.map(move |item| (i, item)))
|
||||
.collect();
|
||||
let mut merged = select_all(labelled);
|
||||
|
||||
let mut received = 0;
|
||||
while received < 10 {
|
||||
match tokio::time::timeout(Duration::from_secs(10), merged.next()).await {
|
||||
Ok(Some((idx, Ok(dc)))) => {
|
||||
println!("[{}] {} = {:?}", idx, dc.reference, dc.value);
|
||||
received += 1;
|
||||
}
|
||||
Ok(Some((idx, Err(e)))) => {
|
||||
eprintln!("[{idx}] subscription error: {e}");
|
||||
}
|
||||
Ok(None) => {
|
||||
eprintln!("all streams ended");
|
||||
break;
|
||||
}
|
||||
Err(_) => {
|
||||
eprintln!("no update within 10s; exiting after {received} updates");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Drop the merged stream so we can hand the subscriptions back.
|
||||
drop(merged);
|
||||
|
||||
// Best-effort cleanup. The merged drop above released the
|
||||
// subscriptions; we re-subscribe just to get fresh handles for
|
||||
// unsubscribe — except `select_all` consumed them. In real code
|
||||
// structure subs in an Option<> or pull from `subs` before merging.
|
||||
// Simplest demo path: just shutdown which fires UnregisterEngine and
|
||||
// the server cleans up advises on its side.
|
||||
session.shutdown_nmx().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ---- shared boilerplate (see connect-write-read.rs for rationale) ----------
|
||||
|
||||
struct LiveEnv {
|
||||
addr: std::net::SocketAddr,
|
||||
service_ipid: Guid,
|
||||
tags: Vec<String>,
|
||||
}
|
||||
|
||||
impl LiveEnv {
|
||||
fn from_process() -> Result<Option<Self>, Box<dyn std::error::Error>> {
|
||||
if std::env::var_os("MX_LIVE").is_none() {
|
||||
return Ok(None);
|
||||
}
|
||||
let host = std::env::var("MX_NMX_HOST")?;
|
||||
let addr = parse_host_port(&host, 135)?;
|
||||
let service_ipid = Guid::parse_str(&std::env::var("MX_NMX_SERVICE_IPID")?)?;
|
||||
let tags = std::env::var("MX_TEST_TAGS")
|
||||
.unwrap_or_else(|_| "TestChildObject.TestInt,TestChildObject.TestBool".into())
|
||||
.split(',')
|
||||
.map(str::trim)
|
||||
.map(str::to_string)
|
||||
.collect();
|
||||
Ok(Some(Self {
|
||||
addr,
|
||||
service_ipid,
|
||||
tags,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_host_port(
|
||||
s: &str,
|
||||
default_port: u16,
|
||||
) -> Result<std::net::SocketAddr, Box<dyn std::error::Error>> {
|
||||
if let Ok(addr) = s.parse() {
|
||||
return Ok(addr);
|
||||
}
|
||||
let with_port = if s.contains(':') {
|
||||
s.to_string()
|
||||
} else {
|
||||
format!("{s}:{default_port}")
|
||||
};
|
||||
Ok(
|
||||
std::net::ToSocketAddrs::to_socket_addrs(&with_port.as_str())?
|
||||
.next()
|
||||
.ok_or("no addrs resolved")?,
|
||||
)
|
||||
}
|
||||
|
||||
/// Multi-tag canned resolver. Returns Int32 metadata for any tag whose
|
||||
/// `Object.Attribute` form matches the configured allow-list, with a
|
||||
/// fresh `attribute_id` per tag so AdviseSupervisory frames don't
|
||||
/// collide on the server side.
|
||||
struct StaticResolver {
|
||||
tags: std::collections::HashMap<String, GalaxyTagMetadata>,
|
||||
}
|
||||
|
||||
impl StaticResolver {
|
||||
fn new(tags: &[String]) -> Self {
|
||||
let mut map = std::collections::HashMap::new();
|
||||
for (i, tag) in tags.iter().enumerate() {
|
||||
let (object, attribute) = tag.split_once('.').unwrap_or((tag.as_str(), "TestInt"));
|
||||
map.insert(
|
||||
tag.clone(),
|
||||
GalaxyTagMetadata {
|
||||
object_tag_name: object.to_string(),
|
||||
attribute_name: attribute.to_string(),
|
||||
primitive_name: None,
|
||||
platform_id: 1,
|
||||
engine_id: 2,
|
||||
object_id: 3,
|
||||
primitive_id: 0,
|
||||
attribute_id: 7 + i as i16,
|
||||
property_id: GalaxyTagMetadata::VALUE_PROPERTY_ID,
|
||||
mx_data_type: 2,
|
||||
is_array: false,
|
||||
security_classification: 0,
|
||||
attribute_source: "dynamic".into(),
|
||||
},
|
||||
);
|
||||
}
|
||||
Self { tags: map }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Resolver for StaticResolver {
|
||||
async fn resolve(&self, tag: &str) -> Result<GalaxyTagMetadata, ResolverError> {
|
||||
self.tags
|
||||
.get(tag)
|
||||
.cloned()
|
||||
.ok_or_else(|| ResolverError::NotFound {
|
||||
tag_reference: tag.to_string(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,10 +1,177 @@
|
||||
//! `recovery` — caller-driven `Session::recover_connection(policy)` after
|
||||
//! heartbeat-loss.
|
||||
//! heartbeat-loss, with a `recovery_events()` listener consuming the
|
||||
//! event stream.
|
||||
//!
|
||||
//! Recovery is opt-in, not automatic — see `design/20-async-layer.md` and
|
||||
//! `MxNativeSession.cs:383-440`. In-flight calls during recovery fail.
|
||||
//! M0 stub.
|
||||
//! Recovery is opt-in, not automatic — see `design/20-async-layer.md`
|
||||
//! and `MxNativeSession.cs:383-440`. The wave-2 implementation emits the
|
||||
//! Started/Recovered shape as a no-op; the real reconnect-and-readvise
|
||||
//! loop is followup F16.
|
||||
//!
|
||||
//! See `connect-write-read.rs` for env-var contract and resolver shim
|
||||
//! design notes.
|
||||
|
||||
fn main() {
|
||||
eprintln!("recovery: stubbed (M0). See design/60-roadmap.md M4.");
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use mxaccess::{
|
||||
GalaxyTagMetadata, RecoveryEvent, RecoveryPolicy, Resolver, ResolverError, Session,
|
||||
SessionOptions,
|
||||
};
|
||||
use mxaccess_rpc::guid::Guid;
|
||||
use mxaccess_rpc::ntlm::NtlmClientContext;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let Some(env) = LiveEnv::from_process()? else {
|
||||
eprintln!(
|
||||
"MX_LIVE not set — skipping live demo. Run \
|
||||
`. tools/Setup-LiveProbeEnv.ps1` to populate the required env vars."
|
||||
);
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let session = Session::connect_nmx(
|
||||
env.addr,
|
||||
SessionOptions::default(),
|
||||
NtlmClientContext::from_env()?,
|
||||
env.service_ipid,
|
||||
Arc::new(StaticResolver::new(&env.tag)),
|
||||
RecoveryPolicy::default(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Spawn a listener BEFORE invoking recovery so the Started event
|
||||
// doesn't race the subscribe.
|
||||
let events = session.recovery_events();
|
||||
let listener = tokio::spawn(async move {
|
||||
let mut events = events;
|
||||
let mut count = 0;
|
||||
while let Ok(evt) = events.recv().await {
|
||||
count += 1;
|
||||
match &*evt {
|
||||
RecoveryEvent::Started { attempt } => {
|
||||
eprintln!("recovery #{attempt} started");
|
||||
}
|
||||
RecoveryEvent::Recovered { attempt } => {
|
||||
eprintln!("recovery #{attempt} succeeded");
|
||||
}
|
||||
RecoveryEvent::Failed {
|
||||
attempt,
|
||||
error,
|
||||
will_retry,
|
||||
} => {
|
||||
eprintln!("recovery #{attempt} failed: {error} (will_retry={will_retry})");
|
||||
}
|
||||
// RecoveryEvent is #[non_exhaustive]; future variants land
|
||||
// here without breaking compilation.
|
||||
_ => eprintln!("recovery: unknown event variant"),
|
||||
}
|
||||
if count >= 6 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let policy = RecoveryPolicy {
|
||||
max_attempts: 3,
|
||||
delay: Duration::from_millis(250),
|
||||
};
|
||||
eprintln!("invoking recover_connection with {:?}", policy);
|
||||
session.recover_connection(policy).await?;
|
||||
|
||||
// Give the listener a moment to drain the broadcast channel.
|
||||
tokio::time::timeout(Duration::from_millis(500), listener)
|
||||
.await
|
||||
.ok();
|
||||
|
||||
session.shutdown_nmx().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ---- shared boilerplate (see connect-write-read.rs for rationale) ----------
|
||||
|
||||
struct LiveEnv {
|
||||
addr: std::net::SocketAddr,
|
||||
service_ipid: Guid,
|
||||
tag: String,
|
||||
}
|
||||
|
||||
impl LiveEnv {
|
||||
fn from_process() -> Result<Option<Self>, Box<dyn std::error::Error>> {
|
||||
if std::env::var_os("MX_LIVE").is_none() {
|
||||
return Ok(None);
|
||||
}
|
||||
let host = std::env::var("MX_NMX_HOST")?;
|
||||
let addr = parse_host_port(&host, 135)?;
|
||||
let service_ipid = Guid::parse_str(&std::env::var("MX_NMX_SERVICE_IPID")?)?;
|
||||
let tag = std::env::var("MX_TEST_TAG").unwrap_or_else(|_| "TestChildObject.TestInt".into());
|
||||
Ok(Some(Self {
|
||||
addr,
|
||||
service_ipid,
|
||||
tag,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_host_port(
|
||||
s: &str,
|
||||
default_port: u16,
|
||||
) -> Result<std::net::SocketAddr, Box<dyn std::error::Error>> {
|
||||
if let Ok(addr) = s.parse() {
|
||||
return Ok(addr);
|
||||
}
|
||||
let with_port = if s.contains(':') {
|
||||
s.to_string()
|
||||
} else {
|
||||
format!("{s}:{default_port}")
|
||||
};
|
||||
Ok(
|
||||
std::net::ToSocketAddrs::to_socket_addrs(&with_port.as_str())?
|
||||
.next()
|
||||
.ok_or("no addrs resolved")?,
|
||||
)
|
||||
}
|
||||
|
||||
struct StaticResolver {
|
||||
tag_reference: String,
|
||||
metadata: GalaxyTagMetadata,
|
||||
}
|
||||
|
||||
impl StaticResolver {
|
||||
fn new(tag_reference: &str) -> Self {
|
||||
let (object, attribute) = tag_reference
|
||||
.split_once('.')
|
||||
.unwrap_or((tag_reference, "TestInt"));
|
||||
Self {
|
||||
tag_reference: tag_reference.to_string(),
|
||||
metadata: GalaxyTagMetadata {
|
||||
object_tag_name: object.to_string(),
|
||||
attribute_name: attribute.to_string(),
|
||||
primitive_name: None,
|
||||
platform_id: 1,
|
||||
engine_id: 2,
|
||||
object_id: 3,
|
||||
primitive_id: 0,
|
||||
attribute_id: 7,
|
||||
property_id: GalaxyTagMetadata::VALUE_PROPERTY_ID,
|
||||
mx_data_type: 2,
|
||||
is_array: false,
|
||||
security_classification: 0,
|
||||
attribute_source: "dynamic".into(),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Resolver for StaticResolver {
|
||||
async fn resolve(&self, tag: &str) -> Result<GalaxyTagMetadata, ResolverError> {
|
||||
if tag == self.tag_reference {
|
||||
Ok(self.metadata.clone())
|
||||
} else {
|
||||
Err(ResolverError::NotFound {
|
||||
tag_reference: tag.to_string(),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,11 +1,199 @@
|
||||
//! `secured-write` — Verified Write demonstrations.
|
||||
//!
|
||||
//! `write_secured` (no timestamp) and `write_secured_at` (timestamped), each
|
||||
//! taking `(current_user_id, verifier_user_id)`. Demonstrates both the
|
||||
//! single-user path (`current == verifier`) and the two-person verification
|
||||
//! path. Per `wwtools/mxaccesscli/`, the LMX `WriteSecured` always takes two
|
||||
//! ids — single-user is just same-id-twice, not a separate API. M0 stub.
|
||||
//! Calls `Session::write_value_secured_at` twice:
|
||||
//!
|
||||
//! 1. Single-user secured write — `current_user_id == verifier_user_id`.
|
||||
//! Per `wwtools/mxaccesscli/src/MxAccess.Cli/Commands/WriteCommand.cs:151-155`
|
||||
//! the LMX `WriteSecured` always takes two ids; the single-user path
|
||||
//! is just same-id-twice, not a separate API.
|
||||
//! 2. Two-person verification — `current_user_id != verifier_user_id`.
|
||||
//! Per `wwtools/mxaccesscli/src/MxAccess.Cli/Commands/WriteCommand.cs:196-199`.
|
||||
//!
|
||||
//! Both require a tag whose `security_classification` permits Verified
|
||||
//! Write (typically classification 4). The inline `StaticResolver` sets
|
||||
//! `security_classification: 4`; live tags need the same setting in
|
||||
//! the IDE.
|
||||
//!
|
||||
//! # Required env vars
|
||||
//!
|
||||
//! Same set as `connect-write-read.rs`, plus:
|
||||
//!
|
||||
//! - `MX_TEST_USER_ID` — current user's Galaxy id (i32)
|
||||
//! - `MX_TEST_VERIFIER_ID` — verifier user's Galaxy id (i32). Defaults
|
||||
//! to `MX_TEST_USER_ID` to demonstrate the single-user path.
|
||||
|
||||
fn main() {
|
||||
eprintln!("secured-write: stubbed (M0). See design/60-roadmap.md M4.");
|
||||
use std::sync::Arc;
|
||||
use std::time::SystemTime;
|
||||
|
||||
use mxaccess::session::system_time_to_filetime;
|
||||
use mxaccess::{
|
||||
GalaxyTagMetadata, RecoveryPolicy, Resolver, ResolverError, SecurityContext, Session,
|
||||
SessionOptions, WriteValue,
|
||||
};
|
||||
use mxaccess_rpc::guid::Guid;
|
||||
use mxaccess_rpc::ntlm::NtlmClientContext;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let Some(env) = LiveEnv::from_process()? else {
|
||||
eprintln!(
|
||||
"MX_LIVE not set — skipping live demo. Run \
|
||||
`. tools/Setup-LiveProbeEnv.ps1` to populate the required env vars."
|
||||
);
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let session = Session::connect_nmx(
|
||||
env.addr,
|
||||
SessionOptions::default(),
|
||||
NtlmClientContext::from_env()?,
|
||||
env.service_ipid,
|
||||
Arc::new(StaticResolver::new(&env.tag)),
|
||||
RecoveryPolicy::default(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let now = system_time_to_filetime(SystemTime::now())?;
|
||||
|
||||
eprintln!(
|
||||
"single-user secured write: current=verifier={} → {}=42",
|
||||
env.user_id, env.tag
|
||||
);
|
||||
session
|
||||
.write_value_secured_at(
|
||||
&env.tag,
|
||||
WriteValue::Int32(42),
|
||||
now,
|
||||
SecurityContext {
|
||||
current_user_id: env.user_id,
|
||||
verifier_user_id: env.user_id,
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
if env.verifier_id != env.user_id {
|
||||
eprintln!(
|
||||
"two-person secured write: current={} verifier={} → {}=99",
|
||||
env.user_id, env.verifier_id, env.tag
|
||||
);
|
||||
session
|
||||
.write_value_secured_at(
|
||||
&env.tag,
|
||||
WriteValue::Int32(99),
|
||||
now,
|
||||
SecurityContext {
|
||||
current_user_id: env.user_id,
|
||||
verifier_user_id: env.verifier_id,
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
eprintln!(
|
||||
"MX_TEST_VERIFIER_ID == MX_TEST_USER_ID; skipping two-person path. \
|
||||
Set MX_TEST_VERIFIER_ID to a distinct Galaxy user id to exercise it."
|
||||
);
|
||||
}
|
||||
|
||||
session.shutdown_nmx().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ---- shared boilerplate (see connect-write-read.rs for rationale) ----------
|
||||
|
||||
struct LiveEnv {
|
||||
addr: std::net::SocketAddr,
|
||||
service_ipid: Guid,
|
||||
tag: String,
|
||||
user_id: i32,
|
||||
verifier_id: i32,
|
||||
}
|
||||
|
||||
impl LiveEnv {
|
||||
fn from_process() -> Result<Option<Self>, Box<dyn std::error::Error>> {
|
||||
if std::env::var_os("MX_LIVE").is_none() {
|
||||
return Ok(None);
|
||||
}
|
||||
let host = std::env::var("MX_NMX_HOST")?;
|
||||
let addr = parse_host_port(&host, 135)?;
|
||||
let service_ipid = Guid::parse_str(&std::env::var("MX_NMX_SERVICE_IPID")?)?;
|
||||
let tag = std::env::var("MX_TEST_TAG")
|
||||
.unwrap_or_else(|_| "TestChildObject.TestSecuredInt".into());
|
||||
let user_id: i32 = std::env::var("MX_TEST_USER_ID")?.parse()?;
|
||||
let verifier_id: i32 = std::env::var("MX_TEST_VERIFIER_ID")
|
||||
.ok()
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(user_id);
|
||||
Ok(Some(Self {
|
||||
addr,
|
||||
service_ipid,
|
||||
tag,
|
||||
user_id,
|
||||
verifier_id,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_host_port(
|
||||
s: &str,
|
||||
default_port: u16,
|
||||
) -> Result<std::net::SocketAddr, Box<dyn std::error::Error>> {
|
||||
if let Ok(addr) = s.parse() {
|
||||
return Ok(addr);
|
||||
}
|
||||
let with_port = if s.contains(':') {
|
||||
s.to_string()
|
||||
} else {
|
||||
format!("{s}:{default_port}")
|
||||
};
|
||||
Ok(
|
||||
std::net::ToSocketAddrs::to_socket_addrs(&with_port.as_str())?
|
||||
.next()
|
||||
.ok_or("no addrs resolved")?,
|
||||
)
|
||||
}
|
||||
|
||||
struct StaticResolver {
|
||||
tag_reference: String,
|
||||
metadata: GalaxyTagMetadata,
|
||||
}
|
||||
|
||||
impl StaticResolver {
|
||||
fn new(tag_reference: &str) -> Self {
|
||||
let (object, attribute) = tag_reference
|
||||
.split_once('.')
|
||||
.unwrap_or((tag_reference, "TestSecuredInt"));
|
||||
Self {
|
||||
tag_reference: tag_reference.to_string(),
|
||||
metadata: GalaxyTagMetadata {
|
||||
object_tag_name: object.to_string(),
|
||||
attribute_name: attribute.to_string(),
|
||||
primitive_name: None,
|
||||
platform_id: 1,
|
||||
engine_id: 2,
|
||||
object_id: 3,
|
||||
primitive_id: 0,
|
||||
attribute_id: 8,
|
||||
property_id: GalaxyTagMetadata::VALUE_PROPERTY_ID,
|
||||
mx_data_type: 2,
|
||||
is_array: false,
|
||||
// Verified Write requires a non-zero security
|
||||
// classification (typically 4 for "verified write").
|
||||
security_classification: 4,
|
||||
attribute_source: "dynamic".into(),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Resolver for StaticResolver {
|
||||
async fn resolve(&self, tag: &str) -> Result<GalaxyTagMetadata, ResolverError> {
|
||||
if tag == self.tag_reference {
|
||||
Ok(self.metadata.clone())
|
||||
} else {
|
||||
Err(ResolverError::NotFound {
|
||||
tag_reference: tag.to_string(),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,9 +1,64 @@
|
||||
//! `subscribe-buffered` — buffered subscription with delivery cadence.
|
||||
//! `subscribe-buffered` — buffered subscription demonstration (M6 placeholder).
|
||||
//!
|
||||
//! Per `wwtools/mxaccesscli/docs/api-notes.md:138-140`, "buffered" is a
|
||||
//! delivery-cadence knob (`SetBufferedUpdateInterval`), not multi-sample
|
||||
//! payload bundling. M0 stub.
|
||||
//! delivery-cadence knob (`SetBufferedUpdateInterval`), **not** multi-sample
|
||||
//! payload bundling. Each event still carries one sample; the cadence
|
||||
//! controls how often the server flushes accumulated updates.
|
||||
//!
|
||||
//! `Session::subscribe_buffered` is currently `Err(Error::Unsupported)`
|
||||
//! pending the M6 buffered-mode RPC port. Once the surface lands the
|
||||
//! demo body below becomes ~10 lines using the same `Stream` interface
|
||||
//! as `subscribe.rs`.
|
||||
|
||||
fn main() {
|
||||
eprintln!("subscribe-buffered: stubbed (M0). See design/60-roadmap.md M6.");
|
||||
use mxaccess::{BufferedOptions, ConnectionOptions, Session};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
if std::env::var_os("MX_LIVE").is_none() {
|
||||
eprintln!(
|
||||
"MX_LIVE not set — `subscribe-buffered` is the M6 placeholder; \
|
||||
run `. tools/Setup-LiveProbeEnv.ps1` to populate live env vars \
|
||||
once the buffered subscribe surface lands."
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// The constructor itself is currently Unsupported (gated on M3 transport
|
||||
// selection wiring). When it lands, swap to `Session::connect_nmx` like
|
||||
// the other examples.
|
||||
let session = match Session::connect(ConnectionOptions).await {
|
||||
Ok(s) => s,
|
||||
Err(mxaccess::Error::Unsupported {
|
||||
operation,
|
||||
transport,
|
||||
}) => {
|
||||
eprintln!(
|
||||
"Session::connect / subscribe_buffered are deferred to M6: \
|
||||
{operation} on {transport:?} transport. See \
|
||||
design/followups.md for the buffered-subscribe gating note."
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
|
||||
let opts = BufferedOptions {
|
||||
update_interval_ms: 250,
|
||||
};
|
||||
match session
|
||||
.subscribe_buffered("TestChildObject.TestInt", opts)
|
||||
.await
|
||||
{
|
||||
Ok(_) => {
|
||||
eprintln!(
|
||||
"subscribe_buffered returned a subscription unexpectedly — \
|
||||
check whether M6 has landed and update this example."
|
||||
);
|
||||
}
|
||||
Err(mxaccess::Error::Unsupported { operation, .. }) => {
|
||||
eprintln!("{operation}: deferred to M6 (see design/followups.md)");
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,7 +1,157 @@
|
||||
//! `subscribe` — subscribe to a single tag and stream `DataChange` events.
|
||||
//! `subscribe` — open a single-tag subscription and stream `DataChange` events.
|
||||
//!
|
||||
//! M0 stub.
|
||||
//! Drains up to 5 updates (or a 10s timeout, whichever first), prints each,
|
||||
//! then unsubscribes cleanly. Mirrors the .NET `SubscribeAsync` ergonomic
|
||||
//! at `MxNativeSession.cs:250-270`.
|
||||
//!
|
||||
//! See `connect-write-read.rs` for the env-var contract and resolver shim
|
||||
//! design notes.
|
||||
|
||||
fn main() {
|
||||
eprintln!("subscribe: stubbed (M0). See design/60-roadmap.md M4.");
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use futures_util::StreamExt;
|
||||
use mxaccess::{
|
||||
GalaxyTagMetadata, RecoveryPolicy, Resolver, ResolverError, Session, SessionOptions,
|
||||
};
|
||||
use mxaccess_rpc::guid::Guid;
|
||||
use mxaccess_rpc::ntlm::NtlmClientContext;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let Some(env) = LiveEnv::from_process()? else {
|
||||
eprintln!(
|
||||
"MX_LIVE not set — skipping live demo. Run \
|
||||
`. tools/Setup-LiveProbeEnv.ps1` to populate the required env vars."
|
||||
);
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let session = Session::connect_nmx(
|
||||
env.addr,
|
||||
SessionOptions::default(),
|
||||
NtlmClientContext::from_env()?,
|
||||
env.service_ipid,
|
||||
Arc::new(StaticResolver::new(&env.tag)),
|
||||
RecoveryPolicy::default(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
eprintln!("subscribing to {}", env.tag);
|
||||
let mut sub = session.subscribe(&env.tag).await?;
|
||||
eprintln!("correlation_id = {:02x?}", sub.correlation_id());
|
||||
|
||||
let mut received = 0;
|
||||
while received < 5 {
|
||||
match tokio::time::timeout(Duration::from_secs(10), sub.next()).await {
|
||||
Ok(Some(Ok(dc))) => {
|
||||
println!("{} = {:?} ts={:?}", dc.reference, dc.value, dc.timestamp);
|
||||
received += 1;
|
||||
}
|
||||
Ok(Some(Err(e))) => {
|
||||
eprintln!("subscription error: {e}");
|
||||
break;
|
||||
}
|
||||
Ok(None) => {
|
||||
eprintln!("subscription stream ended");
|
||||
break;
|
||||
}
|
||||
Err(_) => {
|
||||
eprintln!("no update within 10s; exiting after {received} updates");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
session.unsubscribe(sub).await?;
|
||||
session.shutdown_nmx().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ---- shared boilerplate (see connect-write-read.rs for rationale) ----------
|
||||
|
||||
struct LiveEnv {
|
||||
addr: std::net::SocketAddr,
|
||||
service_ipid: Guid,
|
||||
tag: String,
|
||||
}
|
||||
|
||||
impl LiveEnv {
|
||||
fn from_process() -> Result<Option<Self>, Box<dyn std::error::Error>> {
|
||||
if std::env::var_os("MX_LIVE").is_none() {
|
||||
return Ok(None);
|
||||
}
|
||||
let host = std::env::var("MX_NMX_HOST")?;
|
||||
let addr = parse_host_port(&host, 135)?;
|
||||
let service_ipid = Guid::parse_str(&std::env::var("MX_NMX_SERVICE_IPID")?)?;
|
||||
let tag = std::env::var("MX_TEST_TAG").unwrap_or_else(|_| "TestChildObject.TestInt".into());
|
||||
Ok(Some(Self {
|
||||
addr,
|
||||
service_ipid,
|
||||
tag,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_host_port(
|
||||
s: &str,
|
||||
default_port: u16,
|
||||
) -> Result<std::net::SocketAddr, Box<dyn std::error::Error>> {
|
||||
if let Ok(addr) = s.parse() {
|
||||
return Ok(addr);
|
||||
}
|
||||
let with_port = if s.contains(':') {
|
||||
s.to_string()
|
||||
} else {
|
||||
format!("{s}:{default_port}")
|
||||
};
|
||||
Ok(
|
||||
std::net::ToSocketAddrs::to_socket_addrs(&with_port.as_str())?
|
||||
.next()
|
||||
.ok_or("no addrs resolved")?,
|
||||
)
|
||||
}
|
||||
|
||||
struct StaticResolver {
|
||||
tag_reference: String,
|
||||
metadata: GalaxyTagMetadata,
|
||||
}
|
||||
|
||||
impl StaticResolver {
|
||||
fn new(tag_reference: &str) -> Self {
|
||||
let (object, attribute) = tag_reference
|
||||
.split_once('.')
|
||||
.unwrap_or((tag_reference, "TestInt"));
|
||||
Self {
|
||||
tag_reference: tag_reference.to_string(),
|
||||
metadata: GalaxyTagMetadata {
|
||||
object_tag_name: object.to_string(),
|
||||
attribute_name: attribute.to_string(),
|
||||
primitive_name: None,
|
||||
platform_id: 1,
|
||||
engine_id: 2,
|
||||
object_id: 3,
|
||||
primitive_id: 0,
|
||||
attribute_id: 7,
|
||||
property_id: GalaxyTagMetadata::VALUE_PROPERTY_ID,
|
||||
mx_data_type: 2,
|
||||
is_array: false,
|
||||
security_classification: 0,
|
||||
attribute_source: "dynamic".into(),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Resolver for StaticResolver {
|
||||
async fn resolve(&self, tag: &str) -> Result<GalaxyTagMetadata, ResolverError> {
|
||||
if tag == self.tag_reference {
|
||||
Ok(self.metadata.clone())
|
||||
} else {
|
||||
Err(ResolverError::NotFound {
|
||||
tag_reference: tag.to_string(),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
+269
-43
@@ -1,13 +1,21 @@
|
||||
//! `mxaccess` — async Tokio façade for AVEVA / Wonderware MXAccess.
|
||||
//!
|
||||
//! Public API surface: `Session`, `Subscription`, `DataChange`, `Transport`
|
||||
//! trait, `Error` taxonomy. Two transports plug into the same trait
|
||||
//! (`NmxTransport`, `AsbTransport`) so M5 (ASB) can develop in parallel with
|
||||
//! M3/M4 (NMX) — see `design/60-roadmap.md` sequencing notes.
|
||||
//! Public API surface: [`Session`], [`Subscription`], [`DataChange`],
|
||||
//! [`Transport`] trait, [`Error`] taxonomy. Two transports plug into the
|
||||
//! same trait (`NmxTransport`, `AsbTransport`) so M5 (ASB) can develop
|
||||
//! in parallel with M3/M4 (NMX) — see `design/60-roadmap.md` sequencing
|
||||
//! notes.
|
||||
//!
|
||||
//! M0 stub. Everything `todo!()`s. Real implementation lands across M3
|
||||
//! (`mxaccess-nmx` wiring), M4 (NMX façade complete), and M5 (ASB transport).
|
||||
//! See `design/20-async-layer.md` for the full API specification.
|
||||
//! ## Status
|
||||
//!
|
||||
//! M4 (NMX façade) is feature-complete: connect → write → read →
|
||||
//! subscribe → unsubscribe → recovery → shutdown all work end-to-end
|
||||
//! against a live AVEVA install (see `examples/connect-write-read.rs`).
|
||||
//! M5 (ASB transport) lands in a future iteration; the
|
||||
//! [`Session::connect`] generic constructor returns
|
||||
//! [`Error::Unsupported`] until then. See `design/20-async-layer.md`
|
||||
//! for the full API specification and `design/followups.md` for the
|
||||
//! deferred work tracker.
|
||||
|
||||
#![forbid(unsafe_code)]
|
||||
|
||||
@@ -341,24 +349,60 @@ pub trait Transport: Send + Sync + 'static {
|
||||
fn kind(&self) -> TransportKind;
|
||||
}
|
||||
|
||||
// ---- Session API surface (stubs) -----------------------------------------
|
||||
// ---- Session API surface -------------------------------------------------
|
||||
//
|
||||
// The `*_value` family in `session.rs` takes `WriteValue` (the codec's
|
||||
// encoder-side variant set). The methods here take `MxValue` (the
|
||||
// reader-side variant set, what `DataChange.value` carries) and shim
|
||||
// to the `*_value` family via `mxvalue_to_writevalue`. Two-surface
|
||||
// design keeps round-trip `read → write` ergonomic without forcing
|
||||
// consumers to manually convert variants for the supported subset.
|
||||
//
|
||||
// Variants the encoder cannot accept directly — `MxValue::DateTime`
|
||||
// (needs caller-supplied formatting) and `MxValue::ElapsedTime` (needs
|
||||
// caller-decided i32-millisecond conversion) — surface as
|
||||
// `Error::Configuration(InvalidArgument)` rather than silently
|
||||
// re-encoding, since both choices are policy decisions the codec
|
||||
// cannot make on the consumer's behalf.
|
||||
|
||||
impl Session {
|
||||
/// Generic transport-selection constructor. Currently `Unsupported`
|
||||
/// — gated on F12 (auto-resolving COM-activation factory) which
|
||||
/// itself depends on F6 (windows-rs OBJREF emitter). Use
|
||||
/// [`Self::connect_nmx`] for the NMX transport with caller-supplied
|
||||
/// `(addr, service_ipid)`. M5 will add an analogous `connect_asb`
|
||||
/// for the ASB transport.
|
||||
pub async fn connect(_options: ConnectionOptions) -> Result<Self, Error> {
|
||||
// M3+
|
||||
Err(Error::Unsupported {
|
||||
operation: Cow::Borrowed("Session::connect"),
|
||||
operation: Cow::Borrowed("Session::connect (use Session::connect_nmx; F12)"),
|
||||
transport: TransportKind::Nmx,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn write(&self, _reference: &str, _value: MxValue) -> Result<(), Error> {
|
||||
Err(Error::Unsupported {
|
||||
operation: Cow::Borrowed("Session::write"),
|
||||
transport: TransportKind::Nmx,
|
||||
})
|
||||
/// Write a value to a tag (`MxValue` overload). Delegates to
|
||||
/// [`Self::write_value`] after converting `value` via
|
||||
/// [`mxvalue_to_writevalue`].
|
||||
///
|
||||
/// # Errors
|
||||
/// As for [`Self::write_value`], plus
|
||||
/// [`Error::Configuration`] when `value` is a variant the LMX
|
||||
/// encoder cannot accept directly (`MxValue::DateTime` /
|
||||
/// `ElapsedTime` and their array variants — see module-level note
|
||||
/// for why).
|
||||
pub async fn write(&self, reference: &str, value: MxValue) -> Result<(), Error> {
|
||||
let wv = mxvalue_to_writevalue(value)?;
|
||||
self.write_value(reference, wv).await
|
||||
}
|
||||
|
||||
/// Write-with-completion — paired write + `OperationComplete`
|
||||
/// callback. Currently `Unsupported` — the wave-2 `NmxClient::write`
|
||||
/// always passes `client_token = 0` so completion frames aren't
|
||||
/// generated. Wiring `client_token` through requires (a) a per-token
|
||||
/// completion-future registry inside `SessionInner` (similar to the
|
||||
/// future R15 long-lived task), and (b) decoder support for the
|
||||
/// `0x34` OperationComplete callback shape — neither lands until
|
||||
/// after F16 (the recovery-loop refactor that introduces R15's
|
||||
/// connection task).
|
||||
pub async fn write_with_completion(
|
||||
&self,
|
||||
_reference: &str,
|
||||
@@ -366,25 +410,42 @@ impl Session {
|
||||
_client_token: u32,
|
||||
) -> Result<(), Error> {
|
||||
Err(Error::Unsupported {
|
||||
operation: Cow::Borrowed("Session::write_with_completion"),
|
||||
operation: Cow::Borrowed(
|
||||
"Session::write_with_completion (needs per-token registry; gated on R15)",
|
||||
),
|
||||
transport: TransportKind::Nmx,
|
||||
})
|
||||
}
|
||||
|
||||
/// Write a timestamped value (`MxValue` overload). Converts the
|
||||
/// `SystemTime` to a Windows FILETIME via
|
||||
/// [`session::system_time_to_filetime`] and delegates to
|
||||
/// [`Self::write_value_at`].
|
||||
///
|
||||
/// # Errors
|
||||
/// As for [`Self::write_value_at`], plus
|
||||
/// [`Error::Configuration`] when `value` is an unconvertible
|
||||
/// `MxValue` variant or `timestamp` is out of FILETIME range.
|
||||
pub async fn write_with_timestamp(
|
||||
&self,
|
||||
_reference: &str,
|
||||
_value: MxValue,
|
||||
_timestamp: SystemTime,
|
||||
reference: &str,
|
||||
value: MxValue,
|
||||
timestamp: SystemTime,
|
||||
) -> Result<(), Error> {
|
||||
Err(Error::Unsupported {
|
||||
operation: Cow::Borrowed("Session::write_with_timestamp"),
|
||||
transport: TransportKind::Nmx,
|
||||
})
|
||||
let wv = mxvalue_to_writevalue(value)?;
|
||||
let ft = session::system_time_to_filetime(timestamp)?;
|
||||
self.write_value_at(reference, wv, ft).await
|
||||
}
|
||||
|
||||
/// Verified Write — always two-id per `wwtools/mxaccesscli/`. Single-user
|
||||
/// secured writes pass `current_user_id == verifier_user_id`.
|
||||
/// Verified Write without an explicit timestamp. Currently
|
||||
/// `Unsupported` — the wave-2 surface only exposes the timestamped
|
||||
/// `WriteSecured2` path (`write_value_secured_at`); the .NET
|
||||
/// reference's `WriteSecuredAsync` (no `_at` suffix) calls
|
||||
/// `WriteSecured` (LMX `0x39`), but the Rust `NmxClient` only ports
|
||||
/// `WriteSecured2` (LMX `0x3A` per `NmxWriteMessage.cs:215`).
|
||||
/// Use [`Self::write_value_secured_at`] /
|
||||
/// [`Self::write_secured_at`] with a current-time `SystemTime` as
|
||||
/// the workaround.
|
||||
pub async fn write_secured(
|
||||
&self,
|
||||
_reference: &str,
|
||||
@@ -392,50 +453,140 @@ impl Session {
|
||||
_security: SecurityContext,
|
||||
) -> Result<(), Error> {
|
||||
Err(Error::Unsupported {
|
||||
operation: Cow::Borrowed("Session::write_secured"),
|
||||
operation: Cow::Borrowed(
|
||||
"Session::write_secured (no-timestamp; use write_secured_at with SystemTime::now())",
|
||||
),
|
||||
transport: TransportKind::Nmx,
|
||||
})
|
||||
}
|
||||
|
||||
/// Verified Write with an explicit timestamp (`MxValue` overload).
|
||||
/// Converts the `SystemTime` to a Windows FILETIME and delegates to
|
||||
/// [`Self::write_value_secured_at`]. Single-user secured writes
|
||||
/// pass `current_user_id == verifier_user_id`.
|
||||
///
|
||||
/// # Errors
|
||||
/// As for [`Self::write_value_secured_at`], plus
|
||||
/// [`Error::Configuration`] when `value` is an unconvertible
|
||||
/// `MxValue` variant or `timestamp` is out of FILETIME range.
|
||||
pub async fn write_secured_at(
|
||||
&self,
|
||||
_reference: &str,
|
||||
_value: MxValue,
|
||||
_timestamp: SystemTime,
|
||||
_security: SecurityContext,
|
||||
reference: &str,
|
||||
value: MxValue,
|
||||
timestamp: SystemTime,
|
||||
security: SecurityContext,
|
||||
) -> Result<(), Error> {
|
||||
Err(Error::Unsupported {
|
||||
operation: Cow::Borrowed("Session::write_secured_at"),
|
||||
transport: TransportKind::Nmx,
|
||||
})
|
||||
let wv = mxvalue_to_writevalue(value)?;
|
||||
let ft = session::system_time_to_filetime(timestamp)?;
|
||||
self.write_value_secured_at(reference, wv, ft, security)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Subscribe to multiple tags atomically. Currently `Unsupported`
|
||||
/// — the LMX wire has no atomic subscribe-many frame. Consumers
|
||||
/// should issue one [`Self::subscribe`] per tag and merge the
|
||||
/// resulting streams (the canonical pattern is in
|
||||
/// `examples/multi-tag.rs`). Mirrors what the .NET reference does
|
||||
/// at `MxNativeSession.cs:250-270`.
|
||||
pub async fn subscribe_many(&self, _references: &[&str]) -> Result<Subscription, Error> {
|
||||
Err(Error::Unsupported {
|
||||
operation: Cow::Borrowed("Session::subscribe_many"),
|
||||
operation: Cow::Borrowed(
|
||||
"Session::subscribe_many (no atomic frame on the wire; loop subscribe per tag)",
|
||||
),
|
||||
transport: TransportKind::Nmx,
|
||||
})
|
||||
}
|
||||
|
||||
/// Buffered subscription with a delivery-cadence knob. Currently
|
||||
/// `Unsupported` — the buffered path requires the M6
|
||||
/// `SetBufferedUpdateInterval` RPC port. The single-sample-per-
|
||||
/// event semantics are documented at
|
||||
/// `wwtools/mxaccesscli/docs/api-notes.md:138-140`.
|
||||
pub async fn subscribe_buffered(
|
||||
&self,
|
||||
_reference: &str,
|
||||
_options: BufferedOptions,
|
||||
) -> Result<Subscription, Error> {
|
||||
Err(Error::Unsupported {
|
||||
operation: Cow::Borrowed("Session::subscribe_buffered"),
|
||||
operation: Cow::Borrowed("Session::subscribe_buffered (M6)"),
|
||||
transport: TransportKind::Nmx,
|
||||
})
|
||||
}
|
||||
|
||||
/// Orderly shutdown — flushes `UnAdvise` for every live subscription,
|
||||
/// then `UnregisterEngine`. Recommended exit path for production code.
|
||||
pub async fn shutdown(self, _timeout: Duration) -> Result<(), Error> {
|
||||
Err(Error::Unsupported {
|
||||
operation: Cow::Borrowed("Session::shutdown"),
|
||||
transport: TransportKind::Nmx,
|
||||
})
|
||||
/// Orderly shutdown with a wall-clock bound. Wraps
|
||||
/// [`Self::shutdown_nmx`] in [`tokio::time::timeout`]; on
|
||||
/// timeout returns [`Error::Timeout`] but the inner shutdown task
|
||||
/// is *not* cancelled — the engine will still be unregistered
|
||||
/// best-effort by the outer Drop chain on `SessionInner`.
|
||||
///
|
||||
/// `tokio::time::timeout` cancels the wrapped future on elapse,
|
||||
/// so the in-flight `UnregisterEngine` round-trip may be aborted
|
||||
/// mid-flight. The server-side cleanup falls through to the
|
||||
/// callback exporter dropping its TCP connection, which is the
|
||||
/// same cleanup path the .NET reference relies on at
|
||||
/// `MxNativeSession.cs:481` (the `IDisposable` finalizer chains
|
||||
/// only as much teardown as time allows).
|
||||
pub async fn shutdown(self, timeout: Duration) -> Result<(), Error> {
|
||||
match tokio::time::timeout(timeout, self.shutdown_nmx()).await {
|
||||
Ok(result) => result,
|
||||
Err(_) => Err(Error::Timeout(timeout)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert a [`MxValue`] (read-side runtime variant) into a
|
||||
/// [`WriteValue`] (encoder-side variant). Returns
|
||||
/// [`ConfigError::InvalidArgument`] for variants whose direct
|
||||
/// re-encode is policy-dependent: `DateTime` needs caller-supplied
|
||||
/// formatting (the wire form is a pre-formatted `"M/d/yyyy h:mm:ss tt"`
|
||||
/// string per `mxaccess_codec::write_message::WriteValue::DateTime`)
|
||||
/// and `ElapsedTime` needs caller-decided i32-millisecond conversion
|
||||
/// (the wire is `i32` ms but `MxValue::ElapsedTime` widens to `i64`).
|
||||
/// Array variants of those two carry the same restrictions.
|
||||
fn mxvalue_to_writevalue(value: MxValue) -> Result<mxaccess_nmx::WriteValue, Error> {
|
||||
use mxaccess_nmx::WriteValue;
|
||||
let wv = match value {
|
||||
MxValue::Boolean(b) => WriteValue::Boolean(b),
|
||||
MxValue::Int32(i) => WriteValue::Int32(i),
|
||||
MxValue::Float32(f) => WriteValue::Float32(f),
|
||||
MxValue::Float64(f) => WriteValue::Float64(f),
|
||||
MxValue::String(s) => WriteValue::String(s),
|
||||
MxValue::DateTime(_) => {
|
||||
return Err(Error::Configuration(ConfigError::InvalidArgument {
|
||||
detail: "MxValue::DateTime carries raw FILETIME ticks; \
|
||||
construct WriteValue::DateTime(\"M/d/yyyy h:mm:ss tt\") directly"
|
||||
.to_string(),
|
||||
}));
|
||||
}
|
||||
MxValue::ElapsedTime(_) => {
|
||||
return Err(Error::Configuration(ConfigError::InvalidArgument {
|
||||
detail: "MxValue::ElapsedTime carries i64 ms; \
|
||||
construct WriteValue::Int32(ms) directly"
|
||||
.to_string(),
|
||||
}));
|
||||
}
|
||||
MxValue::BoolArray(v) => WriteValue::BooleanArray(v),
|
||||
MxValue::Int32Array(v) => WriteValue::Int32Array(v),
|
||||
MxValue::Float32Array(v) => WriteValue::Float32Array(v),
|
||||
MxValue::Float64Array(v) => WriteValue::Float64Array(v),
|
||||
MxValue::StringArray(v) => WriteValue::StringArray(v),
|
||||
MxValue::DateTimeArray(_) => {
|
||||
return Err(Error::Configuration(ConfigError::InvalidArgument {
|
||||
detail: "MxValue::DateTimeArray carries raw FILETIME ticks; \
|
||||
construct WriteValue::DateTimeArray(Vec<String>) with formatted entries"
|
||||
.to_string(),
|
||||
}));
|
||||
}
|
||||
// MxValue is #[non_exhaustive]; future variants land here without
|
||||
// a compile break and surface as Unsupported until explicitly
|
||||
// mapped.
|
||||
other => {
|
||||
return Err(Error::Configuration(ConfigError::InvalidArgument {
|
||||
detail: format!("MxValue variant {other:?} has no WriteValue mapping"),
|
||||
}));
|
||||
}
|
||||
};
|
||||
Ok(wv)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -565,4 +716,79 @@ mod tests {
|
||||
other => panic!("expected Recovered, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
// ---- mxvalue_to_writevalue conversion ------------------------------
|
||||
|
||||
use mxaccess_nmx::WriteValue;
|
||||
|
||||
#[test]
|
||||
fn mxvalue_to_writevalue_boolean_round_trips() {
|
||||
let wv = mxvalue_to_writevalue(MxValue::Boolean(true)).unwrap();
|
||||
assert_eq!(wv, WriteValue::Boolean(true));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn mxvalue_to_writevalue_int32_round_trips() {
|
||||
let wv = mxvalue_to_writevalue(MxValue::Int32(42)).unwrap();
|
||||
assert_eq!(wv, WriteValue::Int32(42));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn mxvalue_to_writevalue_float64_round_trips() {
|
||||
let wv = mxvalue_to_writevalue(MxValue::Float64(1.5)).unwrap();
|
||||
assert_eq!(wv, WriteValue::Float64(1.5));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn mxvalue_to_writevalue_string_round_trips() {
|
||||
let wv = mxvalue_to_writevalue(MxValue::String("hello".into())).unwrap();
|
||||
assert_eq!(wv, WriteValue::String("hello".into()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn mxvalue_to_writevalue_int32_array_round_trips() {
|
||||
let wv = mxvalue_to_writevalue(MxValue::Int32Array(vec![1, 2, 3])).unwrap();
|
||||
assert_eq!(wv, WriteValue::Int32Array(vec![1, 2, 3]));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn mxvalue_to_writevalue_bool_array_round_trips() {
|
||||
// Array variant rename: MxValue::BoolArray → WriteValue::BooleanArray.
|
||||
let wv = mxvalue_to_writevalue(MxValue::BoolArray(vec![true, false])).unwrap();
|
||||
assert_eq!(wv, WriteValue::BooleanArray(vec![true, false]));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn mxvalue_to_writevalue_string_array_round_trips() {
|
||||
let wv = mxvalue_to_writevalue(MxValue::StringArray(vec!["a".into(), "b".into()])).unwrap();
|
||||
assert_eq!(wv, WriteValue::StringArray(vec!["a".into(), "b".into()]));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn mxvalue_to_writevalue_datetime_rejects() {
|
||||
// Pre-formatted string is required; raw FILETIME ticks are not enough.
|
||||
let err = mxvalue_to_writevalue(MxValue::DateTime(132_867_600_000_000_000)).unwrap_err();
|
||||
let detail = format!("{err}");
|
||||
assert!(detail.contains("DateTime"), "got: {detail}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn mxvalue_to_writevalue_elapsed_time_rejects() {
|
||||
let err = mxvalue_to_writevalue(MxValue::ElapsedTime(5_000)).unwrap_err();
|
||||
let detail = format!("{err}");
|
||||
assert!(detail.contains("ElapsedTime"), "got: {detail}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn mxvalue_to_writevalue_datetime_array_rejects() {
|
||||
let err = mxvalue_to_writevalue(MxValue::DateTimeArray(vec![1, 2])).unwrap_err();
|
||||
let detail = format!("{err}");
|
||||
assert!(detail.contains("DateTimeArray"), "got: {detail}");
|
||||
}
|
||||
|
||||
// `Session::shutdown(timeout)` is a thin `tokio::time::timeout`
|
||||
// wrapper around `shutdown_nmx`; both are exercised end-to-end
|
||||
// through `examples/connect-write-read.rs` (live-only). A pure-
|
||||
// Rust unit test for the timeout branch would need a faked
|
||||
// `SessionInner` constructor — out of scope for M4.
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user