//! Typed handle around an opened gateway session. //! //! [`Session`] wraps an `OpenSession` reply (just the session id) plus a //! cloned [`GatewayClient`] and offers Rust-shaped methods for the //! command surface that the worker exposes — `Register`, `AddItem`, //! bulk subscribe variants, `Write`/`Write2`, and the event stream. //! //! Bulk commands enforce a 1000-item cap before contacting the worker, in //! line with the gateway's documented `MAX_BULK_ITEMS`. use crate::client::{EventStream, GatewayClient}; use crate::error::{ensure_protocol_success, Error}; use crate::generated::mxaccess_gateway::v1::mx_command::Payload; use crate::generated::mxaccess_gateway::v1::mx_command_reply; use crate::generated::mxaccess_gateway::v1::{ AddItem2Command, AddItemBulkCommand, AddItemCommand, AdviseCommand, AdviseItemBulkCommand, CloseSessionRequest, MxCommand, MxCommandKind, MxCommandReply, MxCommandRequest, MxValue as ProtoMxValue, OpenSessionRequest, RegisterCommand, RemoveItemBulkCommand, RemoveItemCommand, StreamEventsRequest, SubscribeBulkCommand, SubscribeResult, UnAdviseCommand, UnAdviseItemBulkCommand, UnsubscribeBulkCommand, Write2Command, WriteCommand, }; use crate::value::MxValue; const MAX_BULK_ITEMS: usize = 1_000; /// Handle to an opened gateway session. /// /// `Session` carries the gateway-issued session id and a cloned /// [`GatewayClient`]. All methods are async and stateless on the client /// side: the gateway tracks per-session worker state. Drop the handle and /// call [`Session::close`] to release the worker; the gateway also reaps /// orphaned sessions on its own schedule. #[derive(Clone)] pub struct Session { id: String, client: GatewayClient, } impl Session { pub(crate) fn new(id: impl Into, client: GatewayClient) -> Self { Self { id: id.into(), client, } } /// Borrow the gateway-assigned session id. pub fn id(&self) -> &str { &self.id } /// Convenience constructor that issues `OpenSession` with the supplied /// client session name and returns the resulting [`Session`]. /// /// # Errors /// /// Propagates errors from /// [`GatewayClient::open_session`](crate::client::GatewayClient::open_session). pub async fn open(client: GatewayClient, client_session_name: &str) -> Result { client .open_session(OpenSessionRequest { client_session_name: client_session_name.to_owned(), ..OpenSessionRequest::default() }) .await } /// Issue `CloseSession` against this session id. /// /// # Errors /// /// Returns [`Error::ProtocolStatus`] if the gateway returns a non-OK /// envelope and any transport/status errors propagated by tonic. pub async fn close(&self) -> Result<(), Error> { let reply = self .client .close_session_raw(CloseSessionRequest { session_id: self.id.clone(), client_correlation_id: "rust-client-close-session".to_owned(), }) .await?; ensure_protocol_success("close session", reply.protocol_status.as_ref())?; Ok(()) } /// Run MXAccess `Register` and return the assigned `ServerHandle`. /// /// # Errors /// /// Returns [`Error::Command`] if the worker reports a non-OK status, /// plus transport/status errors from tonic. pub async fn register(&self, client_name: &str) -> Result { let reply = self .invoke( MxCommandKind::Register, Payload::Register(RegisterCommand { client_name: client_name.to_owned(), }), ) .await?; Ok(register_server_handle(&reply)) } /// Run MXAccess `AddItem` against `server_handle` and return the /// assigned `ItemHandle`. /// /// # Errors /// /// Returns [`Error::Command`] when MXAccess rejects the item /// definition, plus transport/status errors from tonic. pub async fn add_item(&self, server_handle: i32, item_definition: &str) -> Result { let reply = self .invoke( MxCommandKind::AddItem, Payload::AddItem(AddItemCommand { server_handle, item_definition: item_definition.to_owned(), }), ) .await?; Ok(add_item_handle(&reply)) } /// Run MXAccess `AddItem2` (item with a caller-supplied context string) /// and return the assigned `ItemHandle`. /// /// # Errors /// /// Same conditions as [`Session::add_item`]. pub async fn add_item2( &self, server_handle: i32, item_definition: &str, item_context: &str, ) -> Result { let reply = self .invoke( MxCommandKind::AddItem2, Payload::AddItem2(AddItem2Command { server_handle, item_definition: item_definition.to_owned(), item_context: item_context.to_owned(), }), ) .await?; Ok(add_item2_handle(&reply)) } /// Run MXAccess `RemoveItem` for the given handle pair. /// /// # Errors /// /// Returns [`Error::Command`] on a non-OK worker status, plus the /// usual transport/status errors. pub async fn remove_item(&self, server_handle: i32, item_handle: i32) -> Result<(), Error> { self.invoke( MxCommandKind::RemoveItem, Payload::RemoveItem(RemoveItemCommand { server_handle, item_handle, }), ) .await?; Ok(()) } /// Run MXAccess `Advise` to start receiving change notifications for /// the given item. /// /// # Errors /// /// Returns [`Error::Command`] when the worker reports a non-OK status. pub async fn advise(&self, server_handle: i32, item_handle: i32) -> Result<(), Error> { self.invoke( MxCommandKind::Advise, Payload::Advise(AdviseCommand { server_handle, item_handle, }), ) .await?; Ok(()) } /// Run MXAccess `UnAdvise` to stop change notifications for the given /// item. /// /// # Errors /// /// Returns [`Error::Command`] when the worker reports a non-OK status. pub async fn un_advise(&self, server_handle: i32, item_handle: i32) -> Result<(), Error> { self.invoke( MxCommandKind::UnAdvise, Payload::UnAdvise(UnAdviseCommand { server_handle, item_handle, }), ) .await?; Ok(()) } /// Bulk variant of [`Session::add_item`]. Each tag address yields one /// `SubscribeResult` in the returned vector. /// /// # Errors /// /// Returns [`Error::InvalidArgument`] when the input exceeds the /// gateway's 1000-item bulk cap, plus the usual command-level errors. pub async fn add_item_bulk( &self, server_handle: i32, tag_addresses: Vec, ) -> Result, Error> { ensure_bulk_size("tag_addresses", tag_addresses.len())?; let reply = self .invoke( MxCommandKind::AddItemBulk, Payload::AddItemBulk(AddItemBulkCommand { server_handle, tag_addresses, }), ) .await?; Ok(bulk_results(reply, BulkReplyKind::AddItemBulk)) } /// Bulk variant of [`Session::advise`]. /// /// # Errors /// /// Same conditions as [`Session::add_item_bulk`]. pub async fn advise_item_bulk( &self, server_handle: i32, item_handles: Vec, ) -> Result, Error> { ensure_bulk_size("item_handles", item_handles.len())?; let reply = self .invoke( MxCommandKind::AdviseItemBulk, Payload::AdviseItemBulk(AdviseItemBulkCommand { server_handle, item_handles, }), ) .await?; Ok(bulk_results(reply, BulkReplyKind::AdviseItemBulk)) } /// Bulk variant of [`Session::remove_item`]. /// /// # Errors /// /// Same conditions as [`Session::add_item_bulk`]. pub async fn remove_item_bulk( &self, server_handle: i32, item_handles: Vec, ) -> Result, Error> { ensure_bulk_size("item_handles", item_handles.len())?; let reply = self .invoke( MxCommandKind::RemoveItemBulk, Payload::RemoveItemBulk(RemoveItemBulkCommand { server_handle, item_handles, }), ) .await?; Ok(bulk_results(reply, BulkReplyKind::RemoveItemBulk)) } /// Bulk variant of [`Session::un_advise`]. /// /// # Errors /// /// Same conditions as [`Session::add_item_bulk`]. pub async fn un_advise_item_bulk( &self, server_handle: i32, item_handles: Vec, ) -> Result, Error> { ensure_bulk_size("item_handles", item_handles.len())?; let reply = self .invoke( MxCommandKind::UnAdviseItemBulk, Payload::UnAdviseItemBulk(UnAdviseItemBulkCommand { server_handle, item_handles, }), ) .await?; Ok(bulk_results(reply, BulkReplyKind::UnAdviseItemBulk)) } /// Bulk `Subscribe` (atomic add-and-advise) for a list of tag addresses. /// /// # Errors /// /// Same conditions as [`Session::add_item_bulk`]. pub async fn subscribe_bulk( &self, server_handle: i32, tag_addresses: Vec, ) -> Result, Error> { ensure_bulk_size("tag_addresses", tag_addresses.len())?; let reply = self .invoke( MxCommandKind::SubscribeBulk, Payload::SubscribeBulk(SubscribeBulkCommand { server_handle, tag_addresses, }), ) .await?; Ok(bulk_results(reply, BulkReplyKind::SubscribeBulk)) } /// Bulk `Unsubscribe` (atomic un-advise-and-remove) for a list of /// item handles. /// /// # Errors /// /// Same conditions as [`Session::add_item_bulk`]. pub async fn unsubscribe_bulk( &self, server_handle: i32, item_handles: Vec, ) -> Result, Error> { ensure_bulk_size("item_handles", item_handles.len())?; let reply = self .invoke( MxCommandKind::UnsubscribeBulk, Payload::UnsubscribeBulk(UnsubscribeBulkCommand { server_handle, item_handles, }), ) .await?; Ok(bulk_results(reply, BulkReplyKind::UnsubscribeBulk)) } /// Run MXAccess `Write` (single-value, no caller-supplied timestamp). /// /// # Errors /// /// Returns [`Error::Command`] for non-OK worker statuses, plus the /// usual transport/status errors. pub async fn write( &self, server_handle: i32, item_handle: i32, value: MxValue, user_id: i32, ) -> Result<(), Error> { self.invoke( MxCommandKind::Write, Payload::Write(WriteCommand { server_handle, item_handle, value: Some(value.into_proto()), user_id, }), ) .await?; Ok(()) } /// Run MXAccess `Write2` (single-value with caller-supplied timestamp). /// /// # Errors /// /// Same conditions as [`Session::write`]. pub async fn write2( &self, server_handle: i32, item_handle: i32, value: MxValue, timestamp_value: MxValue, user_id: i32, ) -> Result<(), Error> { self.invoke( MxCommandKind::Write2, Payload::Write2(Write2Command { server_handle, item_handle, value: Some(value.into_proto()), timestamp_value: Some(timestamp_value.into_proto()), user_id, }), ) .await?; Ok(()) } /// Open the per-session event stream from the beginning. /// /// # Errors /// /// Returns the `tonic::Status` mapped through [`Error::from`] when the /// gateway rejects the subscription. pub async fn events(&self) -> Result { self.events_after(0).await } /// Open the per-session event stream, requesting only events whose /// `worker_sequence` is greater than `after_worker_sequence`. Pass `0` /// to receive every buffered event. /// /// # Errors /// /// Same conditions as [`Session::events`]. pub async fn events_after(&self, after_worker_sequence: u64) -> Result { self.client .stream_events(StreamEventsRequest { session_id: self.id.clone(), after_worker_sequence, }) .await } /// Issue a raw `Invoke` for an arbitrary command, without filtering on /// the protocol status. Useful when callers need the full reply for /// commands not yet wrapped by `Session`. /// /// # Errors /// /// Returns the `tonic::Status` mapped through [`Error::from`]. pub async fn invoke_raw( &self, kind: MxCommandKind, payload: Payload, ) -> Result { self.client .invoke_raw(self.command_request(kind, payload)) .await } /// Issue an `Invoke` for an arbitrary command and surface a non-OK /// reply as [`Error::Command`]. /// /// # Errors /// /// Returns [`Error::Command`] for non-OK worker statuses plus any /// errors propagated by [`invoke_raw`](Self::invoke_raw). pub async fn invoke( &self, kind: MxCommandKind, payload: Payload, ) -> Result { self.client .invoke(self.command_request(kind, payload)) .await } fn command_request(&self, kind: MxCommandKind, payload: Payload) -> MxCommandRequest { MxCommandRequest { session_id: self.id.clone(), client_correlation_id: format!("rust-client-{}", kind.as_str_name()), command: Some(MxCommand { kind: kind as i32, payload: Some(payload), }), } } } fn ensure_bulk_size(name: &'static str, len: usize) -> Result<(), Error> { if len > MAX_BULK_ITEMS { Err(Error::InvalidArgument { name: name.to_owned(), detail: format!("bulk commands are limited to {MAX_BULK_ITEMS} item(s)"), }) } else { Ok(()) } } fn register_server_handle(reply: &MxCommandReply) -> i32 { match reply.payload.as_ref() { Some(mx_command_reply::Payload::Register(register)) => register.server_handle, _ => reply .return_value .as_ref() .and_then(int32_reply_value) .unwrap_or_default(), } } fn add_item_handle(reply: &MxCommandReply) -> i32 { match reply.payload.as_ref() { Some(mx_command_reply::Payload::AddItem(add_item)) => add_item.item_handle, _ => reply .return_value .as_ref() .and_then(int32_reply_value) .unwrap_or_default(), } } fn add_item2_handle(reply: &MxCommandReply) -> i32 { match reply.payload.as_ref() { Some(mx_command_reply::Payload::AddItem2(add_item)) => add_item.item_handle, _ => reply .return_value .as_ref() .and_then(int32_reply_value) .unwrap_or_default(), } } enum BulkReplyKind { AddItemBulk, AdviseItemBulk, RemoveItemBulk, UnAdviseItemBulk, SubscribeBulk, UnsubscribeBulk, } fn bulk_results(reply: MxCommandReply, kind: BulkReplyKind) -> Vec { match (reply.payload, kind) { (Some(mx_command_reply::Payload::AddItemBulk(reply)), BulkReplyKind::AddItemBulk) => { reply.results } (Some(mx_command_reply::Payload::AdviseItemBulk(reply)), BulkReplyKind::AdviseItemBulk) => { reply.results } (Some(mx_command_reply::Payload::RemoveItemBulk(reply)), BulkReplyKind::RemoveItemBulk) => { reply.results } ( Some(mx_command_reply::Payload::UnAdviseItemBulk(reply)), BulkReplyKind::UnAdviseItemBulk, ) => reply.results, (Some(mx_command_reply::Payload::SubscribeBulk(reply)), BulkReplyKind::SubscribeBulk) => { reply.results } ( Some(mx_command_reply::Payload::UnsubscribeBulk(reply)), BulkReplyKind::UnsubscribeBulk, ) => reply.results, _ => Vec::new(), } } fn int32_reply_value(value: &ProtoMxValue) -> Option { match value.kind.as_ref()? { crate::generated::mxaccess_gateway::v1::mx_value::Kind::Int32Value(value) => Some(*value), _ => None, } }