325106920f
The bulk-write/read SDK methods (read_bulk, write_bulk, write2_bulk,
write_secured_bulk, write_secured2_bulk) and the matching clap
subcommands (ReadBulk, WriteBulk, Write2Bulk, WriteSecuredBulk,
WriteSecured2Bulk) were already on HEAD from a prior session — they
were the only bulk family that HEAD shipped before the .NET / Go /
Python / Java parallel ports. The one missing piece from the divergent
branch (commit f220908) was the BenchReadBulk benchmark harness.
mxgw-cli/src/main.rs adds:
- BenchReadBulk clap variant with flags --client-name,
--duration-seconds, --warmup-seconds, --bulk-size, --tag-start,
--tag-prefix, --tag-attribute, --timeout-ms, --json — defaults match
the .NET and Go benches.
- run_bench_read_bulk(): open-session → register → subscribe_bulk on
the synthesized TestMachine_NNN.TestChangingInt tags to populate the
worker value cache → warmup → steady-state loop with per-call
std::time::Instant capture → unsubscribe → close-session.
- BenchStats + LatencySummary structs and a percentile()
helper (nearest-rank with linear interpolation, matching the Go and
.NET implementations) so the cross-language JSON output is byte-for-
byte comparable. JSON schema: language / command / endpoint /
clientName / bulkSize / durationSeconds / warmupSeconds / durationMs
/ tags / totalCalls / successfulCalls / failedCalls /
totalReadResults / cachedReadResults / callsPerSecond /
latencyMs:{p50,p95,p99,max,mean}. scripts/bench-read-bulk.ps1 will
pick up the Rust line on its next run.
session.rs picks up minor tightening tied to the bulk SDK methods that
were already in the file (per-entry validation paths, BulkReplyKind
dispatch coverage) — no public-surface change.
Verification: cargo build --workspace clean (the 2 pre-existing
options.rs missing_docs warnings remain — out of scope); cargo test
--workspace 34/34 passing; cargo clippy --workspace --all-targets has
only the 3 pre-existing tolerated warnings (enum_variant_names on
BulkReplyKind, missing_docs on options.rs, clone_on_copy on
galaxy.rs:282). Manual smoke against live gateway on localhost:5120:
read-bulk on two TestMachine tags returned wasCached=true,
wasSuccessful=true; bench-read-bulk --duration-seconds 2
--warmup-seconds 1 --bulk-size 2 --json ran 363 calls / 181.35 calls
per second / p50=5.3 ms / p99=7.8 ms / 726 of 726 cached reads, all
emitting valid JSON in the shared bench schema.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
732 lines
23 KiB
Rust
732 lines
23 KiB
Rust
//! Typed handle around an opened gateway session.
|
|
//!
|
|
//! [`Session`] wraps an `OpenSession` reply (just the session id) plus a
|
|
//! cloned [`GatewayClient`] and offers Rust-shaped methods for the
|
|
//! command surface that the worker exposes — `Register`, `AddItem`,
|
|
//! bulk subscribe variants, `Write`/`Write2`, and the event stream.
|
|
//!
|
|
//! Bulk commands enforce a 1000-item cap before contacting the worker, in
|
|
//! line with the gateway's documented `MAX_BULK_ITEMS`.
|
|
|
|
use crate::client::{EventStream, GatewayClient};
|
|
use crate::error::{ensure_protocol_success, Error};
|
|
use crate::generated::mxaccess_gateway::v1::mx_command::Payload;
|
|
use crate::generated::mxaccess_gateway::v1::mx_command_reply;
|
|
use crate::generated::mxaccess_gateway::v1::{
|
|
AddItem2Command, AddItemBulkCommand, AddItemCommand, AdviseCommand, AdviseItemBulkCommand,
|
|
BulkReadResult, BulkWriteResult, CloseSessionRequest, MxCommand, MxCommandKind, MxCommandReply,
|
|
MxCommandRequest, MxValue as ProtoMxValue, OpenSessionRequest, ReadBulkCommand,
|
|
RegisterCommand, RemoveItemBulkCommand, RemoveItemCommand, StreamEventsRequest,
|
|
SubscribeBulkCommand, SubscribeResult, UnAdviseCommand, UnAdviseItemBulkCommand,
|
|
UnsubscribeBulkCommand, Write2BulkCommand, Write2BulkEntry, Write2Command, WriteBulkCommand,
|
|
WriteBulkEntry, WriteCommand, WriteSecured2BulkCommand, WriteSecured2BulkEntry,
|
|
WriteSecuredBulkCommand, WriteSecuredBulkEntry,
|
|
};
|
|
use crate::value::MxValue;
|
|
|
|
const MAX_BULK_ITEMS: usize = 1_000;
|
|
|
|
/// Handle to an opened gateway session.
|
|
///
|
|
/// `Session` carries the gateway-issued session id and a cloned
|
|
/// [`GatewayClient`]. All methods are async and stateless on the client
|
|
/// side: the gateway tracks per-session worker state. Drop the handle and
|
|
/// call [`Session::close`] to release the worker; the gateway also reaps
|
|
/// orphaned sessions on its own schedule.
|
|
#[derive(Clone)]
|
|
pub struct Session {
|
|
id: String,
|
|
client: GatewayClient,
|
|
}
|
|
|
|
impl Session {
|
|
pub(crate) fn new(id: impl Into<String>, client: GatewayClient) -> Self {
|
|
Self {
|
|
id: id.into(),
|
|
client,
|
|
}
|
|
}
|
|
|
|
/// Borrow the gateway-assigned session id.
|
|
pub fn id(&self) -> &str {
|
|
&self.id
|
|
}
|
|
|
|
/// Convenience constructor that issues `OpenSession` with the supplied
|
|
/// client session name and returns the resulting [`Session`].
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Propagates errors from
|
|
/// [`GatewayClient::open_session`](crate::client::GatewayClient::open_session).
|
|
pub async fn open(client: GatewayClient, client_session_name: &str) -> Result<Self, Error> {
|
|
client
|
|
.open_session(OpenSessionRequest {
|
|
client_session_name: client_session_name.to_owned(),
|
|
..OpenSessionRequest::default()
|
|
})
|
|
.await
|
|
}
|
|
|
|
/// Issue `CloseSession` against this session id.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Returns [`Error::ProtocolStatus`] if the gateway returns a non-OK
|
|
/// envelope and any transport/status errors propagated by tonic.
|
|
pub async fn close(&self) -> Result<(), Error> {
|
|
let reply = self
|
|
.client
|
|
.close_session_raw(CloseSessionRequest {
|
|
session_id: self.id.clone(),
|
|
client_correlation_id: "rust-client-close-session".to_owned(),
|
|
})
|
|
.await?;
|
|
ensure_protocol_success("close session", reply.protocol_status.as_ref())?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Run MXAccess `Register` and return the assigned `ServerHandle`.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Returns [`Error::Command`] if the worker reports a non-OK status,
|
|
/// plus transport/status errors from tonic.
|
|
pub async fn register(&self, client_name: &str) -> Result<i32, Error> {
|
|
let reply = self
|
|
.invoke(
|
|
MxCommandKind::Register,
|
|
Payload::Register(RegisterCommand {
|
|
client_name: client_name.to_owned(),
|
|
}),
|
|
)
|
|
.await?;
|
|
|
|
Ok(register_server_handle(&reply))
|
|
}
|
|
|
|
/// Run MXAccess `AddItem` against `server_handle` and return the
|
|
/// assigned `ItemHandle`.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Returns [`Error::Command`] when MXAccess rejects the item
|
|
/// definition, plus transport/status errors from tonic.
|
|
pub async fn add_item(&self, server_handle: i32, item_definition: &str) -> Result<i32, Error> {
|
|
let reply = self
|
|
.invoke(
|
|
MxCommandKind::AddItem,
|
|
Payload::AddItem(AddItemCommand {
|
|
server_handle,
|
|
item_definition: item_definition.to_owned(),
|
|
}),
|
|
)
|
|
.await?;
|
|
|
|
Ok(add_item_handle(&reply))
|
|
}
|
|
|
|
/// Run MXAccess `AddItem2` (item with a caller-supplied context string)
|
|
/// and return the assigned `ItemHandle`.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Same conditions as [`Session::add_item`].
|
|
pub async fn add_item2(
|
|
&self,
|
|
server_handle: i32,
|
|
item_definition: &str,
|
|
item_context: &str,
|
|
) -> Result<i32, Error> {
|
|
let reply = self
|
|
.invoke(
|
|
MxCommandKind::AddItem2,
|
|
Payload::AddItem2(AddItem2Command {
|
|
server_handle,
|
|
item_definition: item_definition.to_owned(),
|
|
item_context: item_context.to_owned(),
|
|
}),
|
|
)
|
|
.await?;
|
|
|
|
Ok(add_item2_handle(&reply))
|
|
}
|
|
|
|
/// Run MXAccess `RemoveItem` for the given handle pair.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Returns [`Error::Command`] on a non-OK worker status, plus the
|
|
/// usual transport/status errors.
|
|
pub async fn remove_item(&self, server_handle: i32, item_handle: i32) -> Result<(), Error> {
|
|
self.invoke(
|
|
MxCommandKind::RemoveItem,
|
|
Payload::RemoveItem(RemoveItemCommand {
|
|
server_handle,
|
|
item_handle,
|
|
}),
|
|
)
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Run MXAccess `Advise` to start receiving change notifications for
|
|
/// the given item.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Returns [`Error::Command`] when the worker reports a non-OK status.
|
|
pub async fn advise(&self, server_handle: i32, item_handle: i32) -> Result<(), Error> {
|
|
self.invoke(
|
|
MxCommandKind::Advise,
|
|
Payload::Advise(AdviseCommand {
|
|
server_handle,
|
|
item_handle,
|
|
}),
|
|
)
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Run MXAccess `UnAdvise` to stop change notifications for the given
|
|
/// item.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Returns [`Error::Command`] when the worker reports a non-OK status.
|
|
pub async fn un_advise(&self, server_handle: i32, item_handle: i32) -> Result<(), Error> {
|
|
self.invoke(
|
|
MxCommandKind::UnAdvise,
|
|
Payload::UnAdvise(UnAdviseCommand {
|
|
server_handle,
|
|
item_handle,
|
|
}),
|
|
)
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Bulk variant of [`Session::add_item`]. Each tag address yields one
|
|
/// `SubscribeResult` in the returned vector.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Returns [`Error::InvalidArgument`] when the input exceeds the
|
|
/// gateway's 1000-item bulk cap, plus the usual command-level errors.
|
|
pub async fn add_item_bulk(
|
|
&self,
|
|
server_handle: i32,
|
|
tag_addresses: Vec<String>,
|
|
) -> Result<Vec<SubscribeResult>, Error> {
|
|
ensure_bulk_size("tag_addresses", tag_addresses.len())?;
|
|
let reply = self
|
|
.invoke(
|
|
MxCommandKind::AddItemBulk,
|
|
Payload::AddItemBulk(AddItemBulkCommand {
|
|
server_handle,
|
|
tag_addresses,
|
|
}),
|
|
)
|
|
.await?;
|
|
|
|
Ok(bulk_results(reply, BulkReplyKind::AddItemBulk))
|
|
}
|
|
|
|
/// Bulk variant of [`Session::advise`].
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Same conditions as [`Session::add_item_bulk`].
|
|
pub async fn advise_item_bulk(
|
|
&self,
|
|
server_handle: i32,
|
|
item_handles: Vec<i32>,
|
|
) -> Result<Vec<SubscribeResult>, Error> {
|
|
ensure_bulk_size("item_handles", item_handles.len())?;
|
|
let reply = self
|
|
.invoke(
|
|
MxCommandKind::AdviseItemBulk,
|
|
Payload::AdviseItemBulk(AdviseItemBulkCommand {
|
|
server_handle,
|
|
item_handles,
|
|
}),
|
|
)
|
|
.await?;
|
|
|
|
Ok(bulk_results(reply, BulkReplyKind::AdviseItemBulk))
|
|
}
|
|
|
|
/// Bulk variant of [`Session::remove_item`].
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Same conditions as [`Session::add_item_bulk`].
|
|
pub async fn remove_item_bulk(
|
|
&self,
|
|
server_handle: i32,
|
|
item_handles: Vec<i32>,
|
|
) -> Result<Vec<SubscribeResult>, Error> {
|
|
ensure_bulk_size("item_handles", item_handles.len())?;
|
|
let reply = self
|
|
.invoke(
|
|
MxCommandKind::RemoveItemBulk,
|
|
Payload::RemoveItemBulk(RemoveItemBulkCommand {
|
|
server_handle,
|
|
item_handles,
|
|
}),
|
|
)
|
|
.await?;
|
|
|
|
Ok(bulk_results(reply, BulkReplyKind::RemoveItemBulk))
|
|
}
|
|
|
|
/// Bulk variant of [`Session::un_advise`].
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Same conditions as [`Session::add_item_bulk`].
|
|
pub async fn un_advise_item_bulk(
|
|
&self,
|
|
server_handle: i32,
|
|
item_handles: Vec<i32>,
|
|
) -> Result<Vec<SubscribeResult>, Error> {
|
|
ensure_bulk_size("item_handles", item_handles.len())?;
|
|
let reply = self
|
|
.invoke(
|
|
MxCommandKind::UnAdviseItemBulk,
|
|
Payload::UnAdviseItemBulk(UnAdviseItemBulkCommand {
|
|
server_handle,
|
|
item_handles,
|
|
}),
|
|
)
|
|
.await?;
|
|
|
|
Ok(bulk_results(reply, BulkReplyKind::UnAdviseItemBulk))
|
|
}
|
|
|
|
/// Bulk `Subscribe` (atomic add-and-advise) for a list of tag addresses.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Same conditions as [`Session::add_item_bulk`].
|
|
pub async fn subscribe_bulk(
|
|
&self,
|
|
server_handle: i32,
|
|
tag_addresses: Vec<String>,
|
|
) -> Result<Vec<SubscribeResult>, Error> {
|
|
ensure_bulk_size("tag_addresses", tag_addresses.len())?;
|
|
let reply = self
|
|
.invoke(
|
|
MxCommandKind::SubscribeBulk,
|
|
Payload::SubscribeBulk(SubscribeBulkCommand {
|
|
server_handle,
|
|
tag_addresses,
|
|
}),
|
|
)
|
|
.await?;
|
|
|
|
Ok(bulk_results(reply, BulkReplyKind::SubscribeBulk))
|
|
}
|
|
|
|
/// Bulk `Unsubscribe` (atomic un-advise-and-remove) for a list of
|
|
/// item handles.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Same conditions as [`Session::add_item_bulk`].
|
|
pub async fn unsubscribe_bulk(
|
|
&self,
|
|
server_handle: i32,
|
|
item_handles: Vec<i32>,
|
|
) -> Result<Vec<SubscribeResult>, Error> {
|
|
ensure_bulk_size("item_handles", item_handles.len())?;
|
|
let reply = self
|
|
.invoke(
|
|
MxCommandKind::UnsubscribeBulk,
|
|
Payload::UnsubscribeBulk(UnsubscribeBulkCommand {
|
|
server_handle,
|
|
item_handles,
|
|
}),
|
|
)
|
|
.await?;
|
|
|
|
Ok(bulk_results(reply, BulkReplyKind::UnsubscribeBulk))
|
|
}
|
|
|
|
/// Bulk `Read` — snapshot the current value for each requested tag.
|
|
///
|
|
/// MXAccess COM has no synchronous `Read`; the worker satisfies this by
|
|
/// returning the most recent cached `OnDataChange` value when the tag is
|
|
/// already advised (`was_cached = true`), or by taking a full AddItem +
|
|
/// Advise + wait + UnAdvise + RemoveItem snapshot lifecycle otherwise.
|
|
/// `timeout_ms == 0` lets the worker pick its default (1000 ms).
|
|
/// Per-tag failures appear as `BulkReadResult` entries with
|
|
/// `was_successful = false`; the call never errors on per-tag failure.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Same conditions as [`Session::add_item_bulk`].
|
|
pub async fn read_bulk(
|
|
&self,
|
|
server_handle: i32,
|
|
tag_addresses: Vec<String>,
|
|
timeout_ms: u32,
|
|
) -> Result<Vec<BulkReadResult>, Error> {
|
|
ensure_bulk_size("tag_addresses", tag_addresses.len())?;
|
|
let reply = self
|
|
.invoke(
|
|
MxCommandKind::ReadBulk,
|
|
Payload::ReadBulk(ReadBulkCommand {
|
|
server_handle,
|
|
tag_addresses,
|
|
timeout_ms,
|
|
}),
|
|
)
|
|
.await?;
|
|
|
|
Ok(match reply.payload {
|
|
Some(mx_command_reply::Payload::ReadBulk(reply)) => reply.results,
|
|
_ => Vec::new(),
|
|
})
|
|
}
|
|
|
|
/// Bulk `Write` (sequential MXAccess Write per entry, on the worker's STA).
|
|
///
|
|
/// Per-entry MXAccess failures are reported as `BulkWriteResult` entries
|
|
/// with `was_successful = false`; the call never errors on per-entry
|
|
/// failure. Protocol-level failures still surface as [`Error::Command`].
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Same conditions as [`Session::add_item_bulk`], plus the usual
|
|
/// transport/status errors.
|
|
pub async fn write_bulk(
|
|
&self,
|
|
server_handle: i32,
|
|
entries: Vec<WriteBulkEntry>,
|
|
) -> Result<Vec<BulkWriteResult>, Error> {
|
|
ensure_bulk_size("entries", entries.len())?;
|
|
let reply = self
|
|
.invoke(
|
|
MxCommandKind::WriteBulk,
|
|
Payload::WriteBulk(WriteBulkCommand {
|
|
server_handle,
|
|
entries,
|
|
}),
|
|
)
|
|
.await?;
|
|
|
|
Ok(bulk_write_results(reply, BulkWriteReplyKind::Write))
|
|
}
|
|
|
|
/// Bulk `Write2` (timestamped) — see [`Session::write_bulk`].
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Same conditions as [`Session::write_bulk`].
|
|
pub async fn write2_bulk(
|
|
&self,
|
|
server_handle: i32,
|
|
entries: Vec<Write2BulkEntry>,
|
|
) -> Result<Vec<BulkWriteResult>, Error> {
|
|
ensure_bulk_size("entries", entries.len())?;
|
|
let reply = self
|
|
.invoke(
|
|
MxCommandKind::Write2Bulk,
|
|
Payload::Write2Bulk(Write2BulkCommand {
|
|
server_handle,
|
|
entries,
|
|
}),
|
|
)
|
|
.await?;
|
|
|
|
Ok(bulk_write_results(reply, BulkWriteReplyKind::Write2))
|
|
}
|
|
|
|
/// Bulk `WriteSecured` — credential-sensitive values follow the same
|
|
/// redaction contract as the single-item `write_secured` path.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Same conditions as [`Session::write_bulk`].
|
|
pub async fn write_secured_bulk(
|
|
&self,
|
|
server_handle: i32,
|
|
entries: Vec<WriteSecuredBulkEntry>,
|
|
) -> Result<Vec<BulkWriteResult>, Error> {
|
|
ensure_bulk_size("entries", entries.len())?;
|
|
let reply = self
|
|
.invoke(
|
|
MxCommandKind::WriteSecuredBulk,
|
|
Payload::WriteSecuredBulk(WriteSecuredBulkCommand {
|
|
server_handle,
|
|
entries,
|
|
}),
|
|
)
|
|
.await?;
|
|
|
|
Ok(bulk_write_results(reply, BulkWriteReplyKind::WriteSecured))
|
|
}
|
|
|
|
/// Bulk `WriteSecured2` (timestamped) — see [`Session::write_secured_bulk`].
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Same conditions as [`Session::write_bulk`].
|
|
pub async fn write_secured2_bulk(
|
|
&self,
|
|
server_handle: i32,
|
|
entries: Vec<WriteSecured2BulkEntry>,
|
|
) -> Result<Vec<BulkWriteResult>, Error> {
|
|
ensure_bulk_size("entries", entries.len())?;
|
|
let reply = self
|
|
.invoke(
|
|
MxCommandKind::WriteSecured2Bulk,
|
|
Payload::WriteSecured2Bulk(WriteSecured2BulkCommand {
|
|
server_handle,
|
|
entries,
|
|
}),
|
|
)
|
|
.await?;
|
|
|
|
Ok(bulk_write_results(reply, BulkWriteReplyKind::WriteSecured2))
|
|
}
|
|
|
|
/// Run MXAccess `Write` (single-value, no caller-supplied timestamp).
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Returns [`Error::Command`] for non-OK worker statuses, plus the
|
|
/// usual transport/status errors.
|
|
pub async fn write(
|
|
&self,
|
|
server_handle: i32,
|
|
item_handle: i32,
|
|
value: MxValue,
|
|
user_id: i32,
|
|
) -> Result<(), Error> {
|
|
self.invoke(
|
|
MxCommandKind::Write,
|
|
Payload::Write(WriteCommand {
|
|
server_handle,
|
|
item_handle,
|
|
value: Some(value.into_proto()),
|
|
user_id,
|
|
}),
|
|
)
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Run MXAccess `Write2` (single-value with caller-supplied timestamp).
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Same conditions as [`Session::write`].
|
|
pub async fn write2(
|
|
&self,
|
|
server_handle: i32,
|
|
item_handle: i32,
|
|
value: MxValue,
|
|
timestamp_value: MxValue,
|
|
user_id: i32,
|
|
) -> Result<(), Error> {
|
|
self.invoke(
|
|
MxCommandKind::Write2,
|
|
Payload::Write2(Write2Command {
|
|
server_handle,
|
|
item_handle,
|
|
value: Some(value.into_proto()),
|
|
timestamp_value: Some(timestamp_value.into_proto()),
|
|
user_id,
|
|
}),
|
|
)
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Open the per-session event stream from the beginning.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Returns the `tonic::Status` mapped through [`Error::from`] when the
|
|
/// gateway rejects the subscription.
|
|
pub async fn events(&self) -> Result<EventStream, Error> {
|
|
self.events_after(0).await
|
|
}
|
|
|
|
/// Open the per-session event stream, requesting only events whose
|
|
/// `worker_sequence` is greater than `after_worker_sequence`. Pass `0`
|
|
/// to receive every buffered event.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Same conditions as [`Session::events`].
|
|
pub async fn events_after(&self, after_worker_sequence: u64) -> Result<EventStream, Error> {
|
|
self.client
|
|
.stream_events(StreamEventsRequest {
|
|
session_id: self.id.clone(),
|
|
after_worker_sequence,
|
|
})
|
|
.await
|
|
}
|
|
|
|
/// Issue a raw `Invoke` for an arbitrary command, without filtering on
|
|
/// the protocol status. Useful when callers need the full reply for
|
|
/// commands not yet wrapped by `Session`.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Returns the `tonic::Status` mapped through [`Error::from`].
|
|
pub async fn invoke_raw(
|
|
&self,
|
|
kind: MxCommandKind,
|
|
payload: Payload,
|
|
) -> Result<MxCommandReply, Error> {
|
|
self.client
|
|
.invoke_raw(self.command_request(kind, payload))
|
|
.await
|
|
}
|
|
|
|
/// Issue an `Invoke` for an arbitrary command and surface a non-OK
|
|
/// reply as [`Error::Command`].
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Returns [`Error::Command`] for non-OK worker statuses plus any
|
|
/// errors propagated by [`invoke_raw`](Self::invoke_raw).
|
|
pub async fn invoke(
|
|
&self,
|
|
kind: MxCommandKind,
|
|
payload: Payload,
|
|
) -> Result<MxCommandReply, Error> {
|
|
self.client
|
|
.invoke(self.command_request(kind, payload))
|
|
.await
|
|
}
|
|
|
|
fn command_request(&self, kind: MxCommandKind, payload: Payload) -> MxCommandRequest {
|
|
MxCommandRequest {
|
|
session_id: self.id.clone(),
|
|
client_correlation_id: format!("rust-client-{}", kind.as_str_name()),
|
|
command: Some(MxCommand {
|
|
kind: kind as i32,
|
|
payload: Some(payload),
|
|
}),
|
|
}
|
|
}
|
|
}
|
|
|
|
fn ensure_bulk_size(name: &'static str, len: usize) -> Result<(), Error> {
|
|
if len > MAX_BULK_ITEMS {
|
|
Err(Error::InvalidArgument {
|
|
name: name.to_owned(),
|
|
detail: format!("bulk commands are limited to {MAX_BULK_ITEMS} item(s)"),
|
|
})
|
|
} else {
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
fn register_server_handle(reply: &MxCommandReply) -> i32 {
|
|
match reply.payload.as_ref() {
|
|
Some(mx_command_reply::Payload::Register(register)) => register.server_handle,
|
|
_ => reply
|
|
.return_value
|
|
.as_ref()
|
|
.and_then(int32_reply_value)
|
|
.unwrap_or_default(),
|
|
}
|
|
}
|
|
|
|
fn add_item_handle(reply: &MxCommandReply) -> i32 {
|
|
match reply.payload.as_ref() {
|
|
Some(mx_command_reply::Payload::AddItem(add_item)) => add_item.item_handle,
|
|
_ => reply
|
|
.return_value
|
|
.as_ref()
|
|
.and_then(int32_reply_value)
|
|
.unwrap_or_default(),
|
|
}
|
|
}
|
|
|
|
fn add_item2_handle(reply: &MxCommandReply) -> i32 {
|
|
match reply.payload.as_ref() {
|
|
Some(mx_command_reply::Payload::AddItem2(add_item)) => add_item.item_handle,
|
|
_ => reply
|
|
.return_value
|
|
.as_ref()
|
|
.and_then(int32_reply_value)
|
|
.unwrap_or_default(),
|
|
}
|
|
}
|
|
|
|
enum BulkReplyKind {
|
|
AddItemBulk,
|
|
AdviseItemBulk,
|
|
RemoveItemBulk,
|
|
UnAdviseItemBulk,
|
|
SubscribeBulk,
|
|
UnsubscribeBulk,
|
|
}
|
|
|
|
fn bulk_results(reply: MxCommandReply, kind: BulkReplyKind) -> Vec<SubscribeResult> {
|
|
match (reply.payload, kind) {
|
|
(Some(mx_command_reply::Payload::AddItemBulk(reply)), BulkReplyKind::AddItemBulk) => {
|
|
reply.results
|
|
}
|
|
(Some(mx_command_reply::Payload::AdviseItemBulk(reply)), BulkReplyKind::AdviseItemBulk) => {
|
|
reply.results
|
|
}
|
|
(Some(mx_command_reply::Payload::RemoveItemBulk(reply)), BulkReplyKind::RemoveItemBulk) => {
|
|
reply.results
|
|
}
|
|
(
|
|
Some(mx_command_reply::Payload::UnAdviseItemBulk(reply)),
|
|
BulkReplyKind::UnAdviseItemBulk,
|
|
) => reply.results,
|
|
(Some(mx_command_reply::Payload::SubscribeBulk(reply)), BulkReplyKind::SubscribeBulk) => {
|
|
reply.results
|
|
}
|
|
(
|
|
Some(mx_command_reply::Payload::UnsubscribeBulk(reply)),
|
|
BulkReplyKind::UnsubscribeBulk,
|
|
) => reply.results,
|
|
_ => Vec::new(),
|
|
}
|
|
}
|
|
|
|
enum BulkWriteReplyKind {
|
|
Write,
|
|
Write2,
|
|
WriteSecured,
|
|
WriteSecured2,
|
|
}
|
|
|
|
fn bulk_write_results(reply: MxCommandReply, kind: BulkWriteReplyKind) -> Vec<BulkWriteResult> {
|
|
match (reply.payload, kind) {
|
|
(Some(mx_command_reply::Payload::WriteBulk(reply)), BulkWriteReplyKind::Write) => {
|
|
reply.results
|
|
}
|
|
(Some(mx_command_reply::Payload::Write2Bulk(reply)), BulkWriteReplyKind::Write2) => {
|
|
reply.results
|
|
}
|
|
(
|
|
Some(mx_command_reply::Payload::WriteSecuredBulk(reply)),
|
|
BulkWriteReplyKind::WriteSecured,
|
|
) => reply.results,
|
|
(
|
|
Some(mx_command_reply::Payload::WriteSecured2Bulk(reply)),
|
|
BulkWriteReplyKind::WriteSecured2,
|
|
) => reply.results,
|
|
_ => Vec::new(),
|
|
}
|
|
}
|
|
|
|
fn int32_reply_value(value: &ProtoMxValue) -> Option<i32> {
|
|
match value.kind.as_ref()? {
|
|
crate::generated::mxaccess_gateway::v1::mx_value::Kind::Int32Value(value) => Some(*value),
|
|
_ => None,
|
|
}
|
|
}
|