//! Typed handle around an opened gateway session. //! //! [`Session`] wraps an `OpenSession` reply (just the session id) plus a //! cloned [`GatewayClient`] and offers Rust-shaped methods for the //! command surface that the worker exposes — `Register`, `AddItem`, //! bulk subscribe variants, `Write`/`Write2`, and the event stream. //! //! Bulk commands enforce a 1000-item cap before contacting the worker, in //! line with the gateway's documented `MAX_BULK_ITEMS`. use std::sync::atomic::{AtomicU64, Ordering}; use crate::client::{EventStream, GatewayClient}; use crate::error::{ensure_protocol_success, Error}; use crate::generated::mxaccess_gateway::v1::mx_command::Payload; use crate::generated::mxaccess_gateway::v1::mx_command_reply; use crate::generated::mxaccess_gateway::v1::{ AddItem2Command, AddItemBulkCommand, AddItemCommand, AdviseCommand, AdviseItemBulkCommand, BulkReadResult, BulkWriteResult, CloseSessionRequest, MxCommand, MxCommandKind, MxCommandReply, MxCommandRequest, MxValue as ProtoMxValue, OpenSessionRequest, ReadBulkCommand, RegisterCommand, RemoveItemBulkCommand, RemoveItemCommand, StreamEventsRequest, SubscribeBulkCommand, SubscribeResult, UnAdviseCommand, UnAdviseItemBulkCommand, UnsubscribeBulkCommand, Write2BulkCommand, Write2BulkEntry, Write2Command, WriteBulkCommand, WriteBulkEntry, WriteCommand, WriteSecured2BulkCommand, WriteSecured2BulkEntry, WriteSecuredBulkCommand, WriteSecuredBulkEntry, }; use crate::value::MxValue; const MAX_BULK_ITEMS: usize = 1_000; /// Process-wide monotonic sequence used by [`next_correlation_id`]. static CORRELATION_SEQUENCE: AtomicU64 = AtomicU64::new(1); /// Build a per-call correlation id that embeds the supplied `label`. /// /// The returned token is opaque and guaranteed to be unique within the /// current process: every call increments a process-wide atomic counter, /// so concurrent CLI smokes and library callers on the same machine produce /// distinct ids that gateway logs can tell apart. The token carries no /// embedded secret beyond `label`. /// /// The exact textual format (currently `rust-client-{label}-{N}`) is *not* /// part of the public contract — callers must not parse it. The crate root /// re-exports this helper as /// [`zb_mom_ww_mxgateway_client::next_correlation_id`] so out-of-tree /// consumers can build correlation ids without referencing the `session` /// module path. pub fn next_correlation_id(label: &str) -> String { let sequence = CORRELATION_SEQUENCE.fetch_add(1, Ordering::Relaxed); format!("rust-client-{label}-{sequence}") } /// Handle to an opened gateway session. /// /// `Session` carries the gateway-issued session id and a cloned /// [`GatewayClient`]. All methods are async and stateless on the client /// side: the gateway tracks per-session worker state. Drop the handle and /// call [`Session::close`] to release the worker; the gateway also reaps /// orphaned sessions on its own schedule. #[derive(Clone)] pub struct Session { id: String, client: GatewayClient, } impl Session { pub(crate) fn new(id: impl Into, client: GatewayClient) -> Self { Self { id: id.into(), client, } } /// Borrow the gateway-assigned session id. pub fn id(&self) -> &str { &self.id } /// Convenience constructor that issues `OpenSession` with the supplied /// client session name and returns the resulting [`Session`]. /// /// # Errors /// /// Propagates errors from /// [`GatewayClient::open_session`](crate::client::GatewayClient::open_session). pub async fn open(client: GatewayClient, client_session_name: &str) -> Result { client .open_session(OpenSessionRequest { client_session_name: client_session_name.to_owned(), ..OpenSessionRequest::default() }) .await } /// Issue `CloseSession` against this session id. /// /// # Errors /// /// Returns [`Error::ProtocolStatus`] if the gateway returns a non-OK /// envelope and any transport/status errors propagated by tonic. pub async fn close(&self) -> Result<(), Error> { let reply = self .client .close_session_raw(CloseSessionRequest { session_id: self.id.clone(), client_correlation_id: next_correlation_id("close-session"), }) .await?; ensure_protocol_success("close session", reply.protocol_status.as_ref())?; Ok(()) } /// Run MXAccess `Register` and return the assigned `ServerHandle`. /// /// # Errors /// /// Returns [`Error::Command`] if the worker reports a non-OK status, /// plus transport/status errors from tonic. pub async fn register(&self, client_name: &str) -> Result { let reply = self .invoke( MxCommandKind::Register, Payload::Register(RegisterCommand { client_name: client_name.to_owned(), }), ) .await?; register_server_handle(&reply) } /// Run MXAccess `AddItem` against `server_handle` and return the /// assigned `ItemHandle`. /// /// # Errors /// /// Returns [`Error::Command`] when MXAccess rejects the item /// definition, plus transport/status errors from tonic. pub async fn add_item(&self, server_handle: i32, item_definition: &str) -> Result { let reply = self .invoke( MxCommandKind::AddItem, Payload::AddItem(AddItemCommand { server_handle, item_definition: item_definition.to_owned(), }), ) .await?; add_item_handle(&reply) } /// Run MXAccess `AddItem2` (item with a caller-supplied context string) /// and return the assigned `ItemHandle`. /// /// # Errors /// /// Same conditions as [`Session::add_item`]. pub async fn add_item2( &self, server_handle: i32, item_definition: &str, item_context: &str, ) -> Result { let reply = self .invoke( MxCommandKind::AddItem2, Payload::AddItem2(AddItem2Command { server_handle, item_definition: item_definition.to_owned(), item_context: item_context.to_owned(), }), ) .await?; add_item2_handle(&reply) } /// Run MXAccess `RemoveItem` for the given handle pair. /// /// # Errors /// /// Returns [`Error::Command`] on a non-OK worker status, plus the /// usual transport/status errors. pub async fn remove_item(&self, server_handle: i32, item_handle: i32) -> Result<(), Error> { self.invoke( MxCommandKind::RemoveItem, Payload::RemoveItem(RemoveItemCommand { server_handle, item_handle, }), ) .await?; Ok(()) } /// Run MXAccess `Advise` to start receiving change notifications for /// the given item. /// /// # Errors /// /// Returns [`Error::Command`] when the worker reports a non-OK status. pub async fn advise(&self, server_handle: i32, item_handle: i32) -> Result<(), Error> { self.invoke( MxCommandKind::Advise, Payload::Advise(AdviseCommand { server_handle, item_handle, }), ) .await?; Ok(()) } /// Run MXAccess `UnAdvise` to stop change notifications for the given /// item. /// /// # Errors /// /// Returns [`Error::Command`] when the worker reports a non-OK status. pub async fn un_advise(&self, server_handle: i32, item_handle: i32) -> Result<(), Error> { self.invoke( MxCommandKind::UnAdvise, Payload::UnAdvise(UnAdviseCommand { server_handle, item_handle, }), ) .await?; Ok(()) } /// Bulk variant of [`Session::add_item`]. Each tag address yields one /// `SubscribeResult` in the returned vector. /// /// # Errors /// /// Returns [`Error::InvalidArgument`] when the input exceeds the /// gateway's 1000-item bulk cap, plus the usual command-level errors. pub async fn add_item_bulk( &self, server_handle: i32, tag_addresses: Vec, ) -> Result, Error> { ensure_bulk_size("tag_addresses", tag_addresses.len())?; let reply = self .invoke( MxCommandKind::AddItemBulk, Payload::AddItemBulk(AddItemBulkCommand { server_handle, tag_addresses, }), ) .await?; bulk_results(reply, BulkReplyKind::AddItem) } /// Bulk variant of [`Session::advise`]. /// /// # Errors /// /// Same conditions as [`Session::add_item_bulk`]. pub async fn advise_item_bulk( &self, server_handle: i32, item_handles: Vec, ) -> Result, Error> { ensure_bulk_size("item_handles", item_handles.len())?; let reply = self .invoke( MxCommandKind::AdviseItemBulk, Payload::AdviseItemBulk(AdviseItemBulkCommand { server_handle, item_handles, }), ) .await?; bulk_results(reply, BulkReplyKind::AdviseItem) } /// Bulk variant of [`Session::remove_item`]. /// /// # Errors /// /// Same conditions as [`Session::add_item_bulk`]. pub async fn remove_item_bulk( &self, server_handle: i32, item_handles: Vec, ) -> Result, Error> { ensure_bulk_size("item_handles", item_handles.len())?; let reply = self .invoke( MxCommandKind::RemoveItemBulk, Payload::RemoveItemBulk(RemoveItemBulkCommand { server_handle, item_handles, }), ) .await?; bulk_results(reply, BulkReplyKind::RemoveItem) } /// Bulk variant of [`Session::un_advise`]. /// /// # Errors /// /// Same conditions as [`Session::add_item_bulk`]. pub async fn un_advise_item_bulk( &self, server_handle: i32, item_handles: Vec, ) -> Result, Error> { ensure_bulk_size("item_handles", item_handles.len())?; let reply = self .invoke( MxCommandKind::UnAdviseItemBulk, Payload::UnAdviseItemBulk(UnAdviseItemBulkCommand { server_handle, item_handles, }), ) .await?; bulk_results(reply, BulkReplyKind::UnAdviseItem) } /// Bulk `Subscribe` (atomic add-and-advise) for a list of tag addresses. /// /// # Errors /// /// Same conditions as [`Session::add_item_bulk`]. pub async fn subscribe_bulk( &self, server_handle: i32, tag_addresses: Vec, ) -> Result, Error> { ensure_bulk_size("tag_addresses", tag_addresses.len())?; let reply = self .invoke( MxCommandKind::SubscribeBulk, Payload::SubscribeBulk(SubscribeBulkCommand { server_handle, tag_addresses, }), ) .await?; bulk_results(reply, BulkReplyKind::Subscribe) } /// Bulk `Unsubscribe` (atomic un-advise-and-remove) for a list of /// item handles. /// /// # Errors /// /// Same conditions as [`Session::add_item_bulk`]. pub async fn unsubscribe_bulk( &self, server_handle: i32, item_handles: Vec, ) -> Result, Error> { ensure_bulk_size("item_handles", item_handles.len())?; let reply = self .invoke( MxCommandKind::UnsubscribeBulk, Payload::UnsubscribeBulk(UnsubscribeBulkCommand { server_handle, item_handles, }), ) .await?; bulk_results(reply, BulkReplyKind::Unsubscribe) } /// Bulk `Read` — snapshot the current value for each requested tag. /// /// MXAccess COM has no synchronous `Read`; the worker satisfies this by /// returning the most recent cached `OnDataChange` value when the tag is /// already advised (`was_cached = true`), or by taking a full AddItem + /// Advise + wait + UnAdvise + RemoveItem snapshot lifecycle otherwise. /// `timeout_ms == 0` lets the worker pick its default (1000 ms). /// Per-tag failures appear as `BulkReadResult` entries with /// `was_successful = false`; the call never errors on per-tag failure. /// /// # Errors /// /// Same conditions as [`Session::add_item_bulk`]. pub async fn read_bulk>( &self, server_handle: i32, tag_addresses: &[S], timeout_ms: u32, ) -> Result, Error> { ensure_bulk_size("tag_addresses", tag_addresses.len())?; let reply = self .invoke( MxCommandKind::ReadBulk, Payload::ReadBulk(ReadBulkCommand { server_handle, tag_addresses: tag_addresses .iter() .map(|tag| tag.as_ref().to_owned()) .collect(), timeout_ms, }), ) .await?; match reply.payload { Some(mx_command_reply::Payload::ReadBulk(reply)) => Ok(reply.results), _ => Err(Error::MalformedReply { detail: "read_bulk reply did not carry a ReadBulk payload".to_owned(), }), } } /// Bulk `Write` (sequential MXAccess Write per entry, on the worker's STA). /// /// Per-entry MXAccess failures are reported as `BulkWriteResult` entries /// with `was_successful = false`; the call never errors on per-entry /// failure. Protocol-level failures still surface as [`Error::Command`]. /// /// # Errors /// /// Same conditions as [`Session::add_item_bulk`], plus the usual /// transport/status errors. pub async fn write_bulk( &self, server_handle: i32, entries: Vec, ) -> Result, Error> { ensure_bulk_size("entries", entries.len())?; let reply = self .invoke( MxCommandKind::WriteBulk, Payload::WriteBulk(WriteBulkCommand { server_handle, entries, }), ) .await?; bulk_write_results(reply, BulkWriteReplyKind::Write) } /// Bulk `Write2` (timestamped) — see [`Session::write_bulk`]. /// /// # Errors /// /// Same conditions as [`Session::write_bulk`]. pub async fn write2_bulk( &self, server_handle: i32, entries: Vec, ) -> Result, Error> { ensure_bulk_size("entries", entries.len())?; let reply = self .invoke( MxCommandKind::Write2Bulk, Payload::Write2Bulk(Write2BulkCommand { server_handle, entries, }), ) .await?; bulk_write_results(reply, BulkWriteReplyKind::Write2) } /// Bulk `WriteSecured` — credential-sensitive values follow the same /// redaction contract as the single-item `write_secured` path. /// /// # Errors /// /// Same conditions as [`Session::write_bulk`]. pub async fn write_secured_bulk( &self, server_handle: i32, entries: Vec, ) -> Result, Error> { ensure_bulk_size("entries", entries.len())?; let reply = self .invoke( MxCommandKind::WriteSecuredBulk, Payload::WriteSecuredBulk(WriteSecuredBulkCommand { server_handle, entries, }), ) .await?; bulk_write_results(reply, BulkWriteReplyKind::WriteSecured) } /// Bulk `WriteSecured2` (timestamped) — see [`Session::write_secured_bulk`]. /// /// # Errors /// /// Same conditions as [`Session::write_bulk`]. pub async fn write_secured2_bulk( &self, server_handle: i32, entries: Vec, ) -> Result, Error> { ensure_bulk_size("entries", entries.len())?; let reply = self .invoke( MxCommandKind::WriteSecured2Bulk, Payload::WriteSecured2Bulk(WriteSecured2BulkCommand { server_handle, entries, }), ) .await?; bulk_write_results(reply, BulkWriteReplyKind::WriteSecured2) } /// Run MXAccess `Write` (single-value, no caller-supplied timestamp). /// /// # Errors /// /// Returns [`Error::Command`] for non-OK worker statuses, plus the /// usual transport/status errors. pub async fn write( &self, server_handle: i32, item_handle: i32, value: MxValue, user_id: i32, ) -> Result<(), Error> { self.invoke( MxCommandKind::Write, Payload::Write(WriteCommand { server_handle, item_handle, value: Some(value.into_proto()), user_id, }), ) .await?; Ok(()) } /// Run MXAccess `Write2` (single-value with caller-supplied timestamp). /// /// # Errors /// /// Same conditions as [`Session::write`]. pub async fn write2( &self, server_handle: i32, item_handle: i32, value: MxValue, timestamp_value: MxValue, user_id: i32, ) -> Result<(), Error> { self.invoke( MxCommandKind::Write2, Payload::Write2(Write2Command { server_handle, item_handle, value: Some(value.into_proto()), timestamp_value: Some(timestamp_value.into_proto()), user_id, }), ) .await?; Ok(()) } /// Open the per-session event stream from the beginning. /// /// # Errors /// /// Returns the `tonic::Status` mapped through [`Error::from`] when the /// gateway rejects the subscription. pub async fn events(&self) -> Result { self.events_after(0).await } /// Open the per-session event stream, requesting only events whose /// `worker_sequence` is greater than `after_worker_sequence`. Pass `0` /// to receive every buffered event. /// /// # Errors /// /// Same conditions as [`Session::events`]. pub async fn events_after(&self, after_worker_sequence: u64) -> Result { self.client .stream_events(StreamEventsRequest { session_id: self.id.clone(), after_worker_sequence, }) .await } /// Issue a raw `Invoke` for an arbitrary command, without filtering on /// the protocol status. Useful when callers need the full reply for /// commands not yet wrapped by `Session`. /// /// # Errors /// /// Returns the `tonic::Status` mapped through [`Error::from`]. pub async fn invoke_raw( &self, kind: MxCommandKind, payload: Payload, ) -> Result { self.client .invoke_raw(self.command_request(kind, payload)) .await } /// Issue an `Invoke` for an arbitrary command and surface a non-OK /// reply as [`Error::Command`]. /// /// # Errors /// /// Returns [`Error::Command`] for non-OK worker statuses plus any /// errors propagated by [`invoke_raw`](Self::invoke_raw). pub async fn invoke( &self, kind: MxCommandKind, payload: Payload, ) -> Result { self.client .invoke(self.command_request(kind, payload)) .await } fn command_request(&self, kind: MxCommandKind, payload: Payload) -> MxCommandRequest { MxCommandRequest { session_id: self.id.clone(), client_correlation_id: next_correlation_id(kind.as_str_name()), command: Some(MxCommand { kind: kind as i32, payload: Some(payload), }), } } } fn ensure_bulk_size(name: &'static str, len: usize) -> Result<(), Error> { if len > MAX_BULK_ITEMS { Err(Error::InvalidArgument { name: name.to_owned(), detail: format!("bulk commands are limited to {MAX_BULK_ITEMS} item(s)"), }) } else { Ok(()) } } fn register_server_handle(reply: &MxCommandReply) -> Result { match reply.payload.as_ref() { Some(mx_command_reply::Payload::Register(register)) => Ok(register.server_handle), _ => reply .return_value .as_ref() .and_then(int32_reply_value) .ok_or_else(|| Error::MalformedReply { detail: "register reply lacked a server_handle payload or int32 return_value" .to_owned(), }), } } fn add_item_handle(reply: &MxCommandReply) -> Result { match reply.payload.as_ref() { Some(mx_command_reply::Payload::AddItem(add_item)) => Ok(add_item.item_handle), _ => reply .return_value .as_ref() .and_then(int32_reply_value) .ok_or_else(|| Error::MalformedReply { detail: "add_item reply lacked an item_handle payload or int32 return_value" .to_owned(), }), } } fn add_item2_handle(reply: &MxCommandReply) -> Result { match reply.payload.as_ref() { Some(mx_command_reply::Payload::AddItem2(add_item)) => Ok(add_item.item_handle), _ => reply .return_value .as_ref() .and_then(int32_reply_value) .ok_or_else(|| Error::MalformedReply { detail: "add_item2 reply lacked an item_handle payload or int32 return_value" .to_owned(), }), } } enum BulkReplyKind { AddItem, AdviseItem, RemoveItem, UnAdviseItem, Subscribe, Unsubscribe, } fn bulk_results(reply: MxCommandReply, kind: BulkReplyKind) -> Result, Error> { match (reply.payload, kind) { (Some(mx_command_reply::Payload::AddItemBulk(reply)), BulkReplyKind::AddItem) => { Ok(reply.results) } (Some(mx_command_reply::Payload::AdviseItemBulk(reply)), BulkReplyKind::AdviseItem) => { Ok(reply.results) } (Some(mx_command_reply::Payload::RemoveItemBulk(reply)), BulkReplyKind::RemoveItem) => { Ok(reply.results) } (Some(mx_command_reply::Payload::UnAdviseItemBulk(reply)), BulkReplyKind::UnAdviseItem) => { Ok(reply.results) } (Some(mx_command_reply::Payload::SubscribeBulk(reply)), BulkReplyKind::Subscribe) => { Ok(reply.results) } (Some(mx_command_reply::Payload::UnsubscribeBulk(reply)), BulkReplyKind::Unsubscribe) => { Ok(reply.results) } _ => Err(Error::MalformedReply { detail: "bulk subscribe reply did not carry the expected payload arm".to_owned(), }), } } enum BulkWriteReplyKind { Write, Write2, WriteSecured, WriteSecured2, } fn bulk_write_results( reply: MxCommandReply, kind: BulkWriteReplyKind, ) -> Result, Error> { match (reply.payload, kind) { (Some(mx_command_reply::Payload::WriteBulk(reply)), BulkWriteReplyKind::Write) => { Ok(reply.results) } (Some(mx_command_reply::Payload::Write2Bulk(reply)), BulkWriteReplyKind::Write2) => { Ok(reply.results) } ( Some(mx_command_reply::Payload::WriteSecuredBulk(reply)), BulkWriteReplyKind::WriteSecured, ) => Ok(reply.results), ( Some(mx_command_reply::Payload::WriteSecured2Bulk(reply)), BulkWriteReplyKind::WriteSecured2, ) => Ok(reply.results), _ => Err(Error::MalformedReply { detail: "bulk write reply did not carry the expected payload arm".to_owned(), }), } } fn int32_reply_value(value: &ProtoMxValue) -> Option { match value.kind.as_ref()? { crate::generated::mxaccess_gateway::v1::mx_value::Kind::Int32Value(value) => Some(*value), _ => None, } }