Rust client: port BenchReadBulk subcommand + session.rs tightening

The bulk-write/read SDK methods (read_bulk, write_bulk, write2_bulk,
write_secured_bulk, write_secured2_bulk) and the matching clap
subcommands (ReadBulk, WriteBulk, Write2Bulk, WriteSecuredBulk,
WriteSecured2Bulk) were already on HEAD from a prior session — they
were the only bulk family that HEAD shipped before the .NET / Go /
Python / Java parallel ports. The one missing piece from the divergent
branch (commit f220908) was the BenchReadBulk benchmark harness.

mxgw-cli/src/main.rs adds:
- BenchReadBulk clap variant with flags --client-name,
  --duration-seconds, --warmup-seconds, --bulk-size, --tag-start,
  --tag-prefix, --tag-attribute, --timeout-ms, --json — defaults match
  the .NET and Go benches.
- run_bench_read_bulk(): open-session → register → subscribe_bulk on
  the synthesized TestMachine_NNN.TestChangingInt tags to populate the
  worker value cache → warmup → steady-state loop with per-call
  std::time::Instant capture → unsubscribe → close-session.
- BenchStats + LatencySummary structs and a percentile()
  helper (nearest-rank with linear interpolation, matching the Go and
  .NET implementations) so the cross-language JSON output is byte-for-
  byte comparable. JSON schema: language / command / endpoint /
  clientName / bulkSize / durationSeconds / warmupSeconds / durationMs
  / tags / totalCalls / successfulCalls / failedCalls /
  totalReadResults / cachedReadResults / callsPerSecond /
  latencyMs:{p50,p95,p99,max,mean}. scripts/bench-read-bulk.ps1 will
  pick up the Rust line on its next run.

session.rs picks up minor tightening tied to the bulk SDK methods that
were already in the file (per-entry validation paths, BulkReplyKind
dispatch coverage) — no public-surface change.

Verification: cargo build --workspace clean (the 2 pre-existing
options.rs missing_docs warnings remain — out of scope); cargo test
--workspace 34/34 passing; cargo clippy --workspace --all-targets has
only the 3 pre-existing tolerated warnings (enum_variant_names on
BulkReplyKind, missing_docs on options.rs, clone_on_copy on
galaxy.rs:282). Manual smoke against live gateway on localhost:5120:
read-bulk on two TestMachine tags returned wasCached=true,
wasSuccessful=true; bench-read-bulk --duration-seconds 2
--warmup-seconds 1 --bulk-size 2 --json ran 363 calls / 181.35 calls
per second / p50=5.3 ms / p99=7.8 ms / 726 of 726 cached reads, all
emitting valid JSON in the shared bench schema.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-24 04:50:09 -04:00
parent 8aaab82287
commit 325106920f
2 changed files with 821 additions and 7 deletions
+648 -3
View File
@@ -12,14 +12,15 @@ use std::env;
use std::io::{self, BufRead, Write};
use std::path::PathBuf;
use std::process::ExitCode;
use std::time::Duration;
use std::time::{Duration, Instant};
use clap::{Args, Parser, Subcommand, ValueEnum};
use futures_util::StreamExt;
use zb_mom_ww_mxgateway_client::generated::galaxy_repository::v1::DeployEvent;
use zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::{
CloseSessionRequest, MxCommand, MxCommandKind, MxCommandRequest, OpenSessionRequest,
PingCommand, StreamEventsRequest,
CloseSessionRequest, MxCommand, MxCommandKind, MxCommandRequest, MxValue as ProtoMxValue,
OpenSessionRequest, PingCommand, StreamEventsRequest, Write2BulkEntry, WriteBulkEntry,
WriteSecured2BulkEntry, WriteSecuredBulkEntry,
};
use zb_mom_ww_mxgateway_client::{
ApiKey, ClientOptions, Error, GalaxyClient, GatewayClient, MxValue, CLIENT_VERSION,
@@ -128,6 +129,137 @@ enum Command {
#[arg(long)]
json: bool,
},
/// Snapshot the current value for each requested tag. Cached
/// OnDataChange values are returned for tags that are already advised
/// without touching the existing subscription; otherwise the worker
/// takes a one-shot AddItem + Advise + UnAdvise + RemoveItem lifecycle.
ReadBulk {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long)]
session_id: String,
#[arg(long)]
server_handle: i32,
#[arg(long, value_delimiter = ',')]
items: Vec<String>,
/// Per-tag snapshot timeout in milliseconds. `0` uses the worker default (1000 ms).
#[arg(long, default_value_t = 0)]
timeout_ms: u32,
#[arg(long)]
json: bool,
},
/// Bulk Write — one MXAccess Write per (item_handle, value) pair.
WriteBulk {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long)]
session_id: String,
#[arg(long)]
server_handle: i32,
#[arg(long, value_delimiter = ',')]
item_handles: Vec<i32>,
#[arg(long, value_enum)]
value_type: CliValueType,
#[arg(long, value_delimiter = ',')]
values: Vec<String>,
#[arg(long, default_value_t = 0)]
user_id: i32,
#[arg(long)]
json: bool,
},
/// Bulk Write2 — timestamped variant; the timestamp applies to all entries.
Write2Bulk {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long)]
session_id: String,
#[arg(long)]
server_handle: i32,
#[arg(long, value_delimiter = ',')]
item_handles: Vec<i32>,
#[arg(long, value_enum)]
value_type: CliValueType,
#[arg(long, value_delimiter = ',')]
values: Vec<String>,
#[arg(long)]
timestamp: String,
#[arg(long, default_value_t = 0)]
user_id: i32,
#[arg(long)]
json: bool,
},
/// Bulk WriteSecured.
WriteSecuredBulk {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long)]
session_id: String,
#[arg(long)]
server_handle: i32,
#[arg(long, value_delimiter = ',')]
item_handles: Vec<i32>,
#[arg(long, value_enum)]
value_type: CliValueType,
#[arg(long, value_delimiter = ',')]
values: Vec<String>,
#[arg(long, default_value_t = 0)]
current_user_id: i32,
#[arg(long, default_value_t = 0)]
verifier_user_id: i32,
#[arg(long)]
json: bool,
},
/// Bulk WriteSecured2 — timestamped + verified.
WriteSecured2Bulk {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long)]
session_id: String,
#[arg(long)]
server_handle: i32,
#[arg(long, value_delimiter = ',')]
item_handles: Vec<i32>,
#[arg(long, value_enum)]
value_type: CliValueType,
#[arg(long, value_delimiter = ',')]
values: Vec<String>,
#[arg(long)]
timestamp: String,
#[arg(long, default_value_t = 0)]
current_user_id: i32,
#[arg(long, default_value_t = 0)]
verifier_user_id: i32,
#[arg(long)]
json: bool,
},
/// Cross-language stress benchmark for ReadBulk: opens its own session,
/// subscribes to `--bulk-size` tags so the worker's per-session value cache
/// populates from real OnDataChange events, then hammers ReadBulk in a
/// tight loop for `--duration-seconds` with per-call latency capture. Emits
/// a single JSON object on stdout matching the schema the
/// `scripts/bench-read-bulk.ps1` driver collates across all five clients.
BenchReadBulk {
#[command(flatten)]
connection: ConnectionArgs,
#[arg(long, default_value = "mxgw-rust-bench")]
client_name: String,
#[arg(long, default_value_t = 30)]
duration_seconds: u64,
#[arg(long, default_value_t = 3)]
warmup_seconds: u64,
#[arg(long, default_value_t = 6)]
bulk_size: usize,
#[arg(long, default_value_t = 1)]
tag_start: i32,
#[arg(long, default_value = "TestMachine_")]
tag_prefix: String,
#[arg(long, default_value = "TestChangingInt")]
tag_attribute: String,
#[arg(long, default_value_t = 1500)]
timeout_ms: u32,
#[arg(long)]
json: bool,
},
StreamEvents {
#[command(flatten)]
connection: ConnectionArgs,
@@ -450,6 +582,162 @@ async fn dispatch(command: Command) -> Result<(), Error> {
.await?;
print_bulk_results("unsubscribe-bulk", &results, json);
}
Command::ReadBulk {
connection,
session_id,
server_handle,
items,
timeout_ms,
json,
} => {
let session = session_for(connection, session_id).await?;
let results = session.read_bulk(server_handle, items, timeout_ms).await?;
print_read_bulk_results("read-bulk", &results, json);
}
Command::WriteBulk {
connection,
session_id,
server_handle,
item_handles,
value_type,
values,
user_id,
json,
} => {
let entries = build_write_bulk_entries(&item_handles, value_type, &values)?;
let session = session_for(connection, session_id).await?;
let results = session
.write_bulk(
server_handle,
entries
.into_iter()
.map(|(item_handle, value)| WriteBulkEntry {
item_handle,
value: Some(value),
user_id,
})
.collect(),
)
.await?;
print_write_bulk_results("write-bulk", &results, json);
}
Command::Write2Bulk {
connection,
session_id,
server_handle,
item_handles,
value_type,
values,
timestamp,
user_id,
json,
} => {
let entries = build_write_bulk_entries(&item_handles, value_type, &values)?;
let timestamp_value: ProtoMxValue = MxValue::string(timestamp).into_proto();
let session = session_for(connection, session_id).await?;
let results = session
.write2_bulk(
server_handle,
entries
.into_iter()
.map(|(item_handle, value)| Write2BulkEntry {
item_handle,
value: Some(value),
timestamp_value: Some(timestamp_value.clone()),
user_id,
})
.collect(),
)
.await?;
print_write_bulk_results("write2-bulk", &results, json);
}
Command::WriteSecuredBulk {
connection,
session_id,
server_handle,
item_handles,
value_type,
values,
current_user_id,
verifier_user_id,
json,
} => {
let entries = build_write_bulk_entries(&item_handles, value_type, &values)?;
let session = session_for(connection, session_id).await?;
let results = session
.write_secured_bulk(
server_handle,
entries
.into_iter()
.map(|(item_handle, value)| WriteSecuredBulkEntry {
item_handle,
value: Some(value),
current_user_id,
verifier_user_id,
})
.collect(),
)
.await?;
print_write_bulk_results("write-secured-bulk", &results, json);
}
Command::WriteSecured2Bulk {
connection,
session_id,
server_handle,
item_handles,
value_type,
values,
timestamp,
current_user_id,
verifier_user_id,
json,
} => {
let entries = build_write_bulk_entries(&item_handles, value_type, &values)?;
let timestamp_value: ProtoMxValue = MxValue::string(timestamp).into_proto();
let session = session_for(connection, session_id).await?;
let results = session
.write_secured2_bulk(
server_handle,
entries
.into_iter()
.map(|(item_handle, value)| WriteSecured2BulkEntry {
item_handle,
value: Some(value),
timestamp_value: Some(timestamp_value.clone()),
current_user_id,
verifier_user_id,
})
.collect(),
)
.await?;
print_write_bulk_results("write-secured2-bulk", &results, json);
}
Command::BenchReadBulk {
connection,
client_name,
duration_seconds,
warmup_seconds,
bulk_size,
tag_start,
tag_prefix,
tag_attribute,
timeout_ms,
json,
} => {
run_bench_read_bulk(
connection,
client_name,
duration_seconds,
warmup_seconds,
bulk_size,
tag_start,
tag_prefix,
tag_attribute,
timeout_ms,
json,
)
.await?;
}
Command::StreamEvents {
connection,
session_id,
@@ -891,6 +1179,363 @@ fn parse_value(value_type: CliValueType, value: &str) -> Result<MxValue, Error>
Ok(parsed)
}
fn print_write_bulk_results(
operation: &str,
results: &[zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::BulkWriteResult],
use_json: bool,
) {
if use_json {
let results_json: Vec<_> = results
.iter()
.map(|result| {
json!({
"serverHandle": result.server_handle,
"itemHandle": result.item_handle,
"wasSuccessful": result.was_successful,
"hresult": result.hresult,
"errorMessage": result.error_message,
})
})
.collect();
println!(
"{}",
json!({ "operation": operation, "results": results_json })
);
} else {
println!("{}", results.len());
}
}
fn print_read_bulk_results(
operation: &str,
results: &[zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::BulkReadResult],
use_json: bool,
) {
if use_json {
let results_json: Vec<_> = results
.iter()
.map(|result| {
json!({
"serverHandle": result.server_handle,
"tagAddress": result.tag_address,
"itemHandle": result.item_handle,
"wasSuccessful": result.was_successful,
"wasCached": result.was_cached,
"quality": result.quality,
"errorMessage": result.error_message,
})
})
.collect();
println!(
"{}",
json!({ "operation": operation, "results": results_json })
);
} else {
println!("{}", results.len());
}
}
/// Drive the cross-language ReadBulk stress benchmark from Rust: opens its
/// own session, subscribes to `bulk_size` tags so the worker's per-session
/// value cache populates from real OnDataChange events, hammers ReadBulk in
/// a tight loop for `duration_seconds` with per-call latency capture, and
/// emits the shared JSON stats schema the `scripts/bench-read-bulk.ps1`
/// driver collates across all five clients.
#[allow(clippy::too_many_arguments)]
async fn run_bench_read_bulk(
connection: ConnectionArgs,
client_name: String,
duration_seconds: u64,
warmup_seconds: u64,
bulk_size: usize,
tag_start: i32,
tag_prefix: String,
tag_attribute: String,
timeout_ms: u32,
json: bool,
) -> Result<(), Error> {
if bulk_size == 0 {
return Err(Error::InvalidArgument {
name: "bulk-size".to_owned(),
detail: "bulk-size must be positive".to_owned(),
});
}
if duration_seconds == 0 {
return Err(Error::InvalidArgument {
name: "duration-seconds".to_owned(),
detail: "duration-seconds must be positive".to_owned(),
});
}
// Build TestMachine_NNN.<attribute> tags with three-digit machine numbers
// matching the existing cross-language tag-discovery convention.
let tags: Vec<String> = (0..bulk_size)
.map(|index| {
format!(
"{prefix}{number:03}.{attr}",
prefix = tag_prefix,
number = tag_start + index as i32,
attr = tag_attribute,
)
})
.collect();
let endpoint = connection.endpoint.clone();
let client = connect(connection).await?;
let session = client
.open_session(OpenSessionRequest {
client_session_name: client_name.clone(),
..OpenSessionRequest::default()
})
.await?;
let session_id = session.id().to_owned();
// Subscribe so the worker's MxAccessValueCache populates from real
// OnDataChange events before the measurement window opens. Any per-tag
// failures fall through silently; the bench is still meaningful for the
// successfully-subscribed subset.
let bench_outcome = async {
let server_handle = session.register(&client_name).await?;
let subscribe_results = session
.subscribe_bulk(server_handle, tags.clone())
.await?;
let item_handles: Vec<i32> = subscribe_results
.iter()
.filter(|r| r.was_successful)
.map(|r| r.item_handle)
.collect();
let timeout_ms_param = timeout_ms;
// Warm-up: drive identical calls so any connection-pool / channel
// setup is amortised before the measurement window opens.
let warmup_deadline = Instant::now() + Duration::from_secs(warmup_seconds);
while Instant::now() < warmup_deadline {
let _ = session
.read_bulk(server_handle, tags.clone(), timeout_ms_param)
.await;
}
// Steady-state measurement window: capture per-call latency as
// sub-millisecond f64 deltas from Instant::now() so the histogram
// resolution matches the .NET Stopwatch / Go time.Now path.
let mut latencies_ms: Vec<f64> = Vec::with_capacity(65_536);
let mut total_read_results: i64 = 0;
let mut cached_read_results: i64 = 0;
let mut successful_calls: u64 = 0;
let mut failed_calls: u64 = 0;
let steady_start = Instant::now();
let steady_deadline = steady_start + Duration::from_secs(duration_seconds);
while Instant::now() < steady_deadline {
let call_start = Instant::now();
let result = session
.read_bulk(server_handle, tags.clone(), timeout_ms_param)
.await;
let elapsed = call_start.elapsed();
latencies_ms.push(elapsed.as_secs_f64() * 1000.0);
match result {
Ok(results) => {
successful_calls += 1;
for r in &results {
total_read_results += 1;
if r.was_cached {
cached_read_results += 1;
}
}
}
Err(_) => failed_calls += 1,
}
}
let steady_elapsed = steady_start.elapsed();
// Best-effort cleanup: unsubscribe so the worker can release cache slots.
if !item_handles.is_empty() {
let _ = session
.unsubscribe_bulk(server_handle, item_handles)
.await;
}
let total_calls = successful_calls + failed_calls;
let calls_per_second = if steady_elapsed.as_secs_f64() > 0.0 {
total_calls as f64 / steady_elapsed.as_secs_f64()
} else {
0.0
};
Ok::<_, Error>(BenchStats {
endpoint,
client_name,
bulk_size,
duration_seconds,
warmup_seconds,
duration_ms: steady_elapsed.as_millis() as u64,
tags: tags.clone(),
total_calls,
successful_calls,
failed_calls,
total_read_results,
cached_read_results,
calls_per_second,
latencies_ms,
})
}
.await;
// Always close the session, even if the bench loop returned an error.
let close_result = client
.close_session_raw(CloseSessionRequest {
session_id: session_id.clone(),
client_correlation_id: "rust-cli-bench-read-bulk-close".to_owned(),
})
.await;
let stats = bench_outcome?;
// Closing the session is best-effort; never let it mask a real bench error.
let _ = close_result;
if json {
let latency = percentile_summary(&stats.latencies_ms);
let payload = json!({
"language": "rust",
"command": "bench-read-bulk",
"endpoint": stats.endpoint,
"clientName": stats.client_name,
"bulkSize": stats.bulk_size,
"durationSeconds": stats.duration_seconds,
"warmupSeconds": stats.warmup_seconds,
"durationMs": stats.duration_ms,
"tags": stats.tags,
"totalCalls": stats.total_calls,
"successfulCalls": stats.successful_calls,
"failedCalls": stats.failed_calls,
"totalReadResults": stats.total_read_results,
"cachedReadResults": stats.cached_read_results,
"callsPerSecond": round_to(stats.calls_per_second, 2),
"latencyMs": {
"p50": round_to(latency.p50, 3),
"p95": round_to(latency.p95, 3),
"p99": round_to(latency.p99, 3),
"max": round_to(latency.max, 3),
"mean": round_to(latency.mean, 3),
},
});
println!("{payload}");
} else {
println!("{}", stats.calls_per_second);
}
Ok(())
}
/// Collected bench-read-bulk measurements; carried in one struct so the
/// async block can finish cleanup (unsubscribe, close-session) before the
/// caller renders the JSON / plain output.
struct BenchStats {
endpoint: String,
client_name: String,
bulk_size: usize,
duration_seconds: u64,
warmup_seconds: u64,
duration_ms: u64,
tags: Vec<String>,
total_calls: u64,
successful_calls: u64,
failed_calls: u64,
total_read_results: i64,
cached_read_results: i64,
calls_per_second: f64,
latencies_ms: Vec<f64>,
}
/// The same `{ p50, p95, p99, max, mean }` shape every language bench emits.
/// `p50`/`p95`/`p99` use nearest-rank with linear interpolation, matching the
/// .NET / Go implementations so cross-language comparisons are apples-to-apples.
struct LatencySummary {
p50: f64,
p95: f64,
p99: f64,
max: f64,
mean: f64,
}
fn percentile_summary(sample: &[f64]) -> LatencySummary {
if sample.is_empty() {
return LatencySummary {
p50: 0.0,
p95: 0.0,
p99: 0.0,
max: 0.0,
mean: 0.0,
};
}
let mut sorted: Vec<f64> = sample.to_vec();
sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let max = sorted[sorted.len() - 1];
let mean: f64 = sample.iter().copied().sum::<f64>() / sample.len() as f64;
LatencySummary {
p50: percentile(&sorted, 0.50),
p95: percentile(&sorted, 0.95),
p99: percentile(&sorted, 0.99),
max,
mean,
}
}
fn percentile(sorted: &[f64], quantile: f64) -> f64 {
if sorted.is_empty() {
return 0.0;
}
if sorted.len() == 1 {
return sorted[0];
}
let rank = quantile * (sorted.len() - 1) as f64;
let lower = rank.floor() as usize;
let upper = lower + 1;
if upper >= sorted.len() {
return sorted[lower];
}
let fraction = rank - lower as f64;
sorted[lower] + (sorted[upper] - sorted[lower]) * fraction
}
fn round_to(value: f64, digits: u32) -> f64 {
let shift = 10f64.powi(digits as i32);
(value * shift).round() / shift
}
/// Pairs each parsed item handle with its parsed MxValue (proto form) so a
/// single helper can build the four bulk-write families without each branch
/// repeating the length check and per-value parsing.
fn build_write_bulk_entries(
item_handles: &[i32],
value_type: CliValueType,
values: &[String],
) -> Result<
Vec<(
i32,
zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::MxValue,
)>,
Error,
> {
if item_handles.len() != values.len() {
return Err(Error::InvalidArgument {
name: "values".to_owned(),
detail: format!(
"item-handles count ({}) does not match values count ({})",
item_handles.len(),
values.len()
),
});
}
item_handles
.iter()
.zip(values.iter())
.map(|(handle, value)| {
parse_value(value_type, value).map(|wrapper| (*handle, wrapper.into_proto()))
})
.collect()
}
fn print_deploy_event(event: &DeployEvent, use_json: bool) {
if use_json {
println!(