From fe19c478c04071f49db6e3e583aaacdbdf37d7c3 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Thu, 30 Apr 2026 17:06:13 -0400 Subject: [PATCH] clients/rust: SDK methods for AcknowledgeAlarm + QueryActiveAlarms (PR E.6) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Eleventh PR of the alarms-over-gateway epic (docs/plans/alarms-over-gateway.md). Mirrors PR E.2's .NET surface on the Rust async SDK. Depends on PR E.1 (regen, merged). - GatewayClient::acknowledge_alarm — async unary call. Uses the existing unary_request helper (call timeout) and routes failures through Error mapping; non-OK protocol status promotes to Error::ProtocolStatus via ensure_protocol_success. - GatewayClient::query_active_alarms — async server-streaming call returning a new ActiveAlarmStream type alias (parallel to EventStream). Errors are pre-mapped from tonic::Status; dropping the stream cancels the call cooperatively. - GATEWAY_PROTOCOL_VERSION bumped 2 → 3 to match the .NET contract. - FakeGateway test impl extends to satisfy the new trait methods so client_behavior.rs builds. Two new integration tests cover the new SDK methods. Tests: - 12 unit + 10 client_behavior + 4 proto_fixtures = 26 tests, all pass under cargo test (Rust 1.x via existing toolchain). Co-Authored-By: Claude Opus 4.7 (1M context) --- clients/rust/src/client.rs | 62 +++++++++++++++++- clients/rust/src/version.rs | 2 +- clients/rust/tests/client_behavior.rs | 92 +++++++++++++++++++++++++-- 3 files changed, 149 insertions(+), 7 deletions(-) diff --git a/clients/rust/src/client.rs b/clients/rust/src/client.rs index 219b897..8490284 100644 --- a/clients/rust/src/client.rs +++ b/clients/rust/src/client.rs @@ -16,8 +16,9 @@ use crate::auth::AuthInterceptor; use crate::error::{ensure_command_success, ensure_protocol_success, Error}; use crate::generated::mxaccess_gateway::v1::mx_access_gateway_client::MxAccessGatewayClient; use crate::generated::mxaccess_gateway::v1::{ - CloseSessionReply, CloseSessionRequest, MxCommandReply, MxCommandRequest, MxEvent, - OpenSessionReply, OpenSessionRequest, StreamEventsRequest, + AcknowledgeAlarmReply, AcknowledgeAlarmRequest, ActiveAlarmSnapshot, CloseSessionReply, + CloseSessionRequest, MxCommandReply, MxCommandRequest, MxEvent, OpenSessionReply, + OpenSessionRequest, QueryActiveAlarmsRequest, StreamEventsRequest, }; use crate::options::ClientOptions; use crate::session::Session; @@ -32,6 +33,13 @@ pub type RawGatewayClient = MxAccessGatewayClient> + Send + 'static>>; +/// Pinned, boxed [`ActiveAlarmSnapshot`] stream returned by +/// [`GatewayClient::query_active_alarms`]. Errors are pre-mapped from +/// `tonic::Status` to [`Error`]; dropping the stream cancels the call. +pub type ActiveAlarmStream = std::pin::Pin< + Box> + Send + 'static>, +>; + /// Thin async wrapper around the generated gateway client. /// /// The wrapper is `Clone`: every clone shares the underlying tonic channel @@ -194,6 +202,56 @@ impl GatewayClient { Ok(Box::pin(stream)) } + /// Acknowledge an active MXAccess alarm condition through the gateway. + /// + /// The gateway authenticates the request against the API key's + /// `invoke:alarm-ack` scope and forwards the acknowledge to the worker's + /// MXAccess session; the resulting native MxStatus is returned in the + /// reply. Acks are idempotent at the MxAccess layer. + /// + /// # Errors + /// + /// Returns [`Error::ProtocolStatus`] when the gateway accepts the call but + /// reports a non-OK protocol status, plus any of the [`Error`] variants + /// produced by transport failures. + pub async fn acknowledge_alarm( + &self, + request: AcknowledgeAlarmRequest, + ) -> Result { + let mut client = self.inner.clone(); + let response = client.acknowledge_alarm(self.unary_request(request)).await?; + let reply = response.into_inner(); + ensure_protocol_success("acknowledge alarm", reply.protocol_status.as_ref())?; + Ok(reply) + } + + /// Open the server-streaming `QueryActiveAlarms` RPC — the gateway's + /// ConditionRefresh equivalent. + /// + /// The returned [`ActiveAlarmStream`] yields one [`ActiveAlarmSnapshot`] + /// per currently-active alarm. Dropping the stream cancels the gRPC call + /// cooperatively. Optional alarm-reference prefix scoping + /// (`request.alarm_filter_prefix`) limits the stream to a sub-tree. + /// + /// # Errors + /// + /// Returns the `tonic::Status` mapped through [`Error::from`] if the + /// server rejects the request. + pub async fn query_active_alarms( + &self, + request: QueryActiveAlarmsRequest, + ) -> Result { + let mut client = self.inner.clone(); + let response = client + .query_active_alarms(self.stream_request(request)) + .await?; + let stream = futures_util::StreamExt::map(response.into_inner(), |result| { + result.map_err(Error::from) + }); + + Ok(Box::pin(stream)) + } + fn unary_request(&self, message: T) -> Request { let mut request = Request::new(message); request.set_timeout(self.call_timeout); diff --git a/clients/rust/src/version.rs b/clients/rust/src/version.rs index c5a7627..e27fc2e 100644 --- a/clients/rust/src/version.rs +++ b/clients/rust/src/version.rs @@ -7,7 +7,7 @@ pub const CLIENT_VERSION: &str = "0.1.0-dev"; /// Public gateway gRPC protocol version this client targets. -pub const GATEWAY_PROTOCOL_VERSION: u32 = 2; +pub const GATEWAY_PROTOCOL_VERSION: u32 = 3; /// Internal worker IPC protocol version this client expects sessions to use. pub const WORKER_PROTOCOL_VERSION: u32 = 1; diff --git a/clients/rust/tests/client_behavior.rs b/clients/rust/tests/client_behavior.rs index 7e0c894..117d77b 100644 --- a/clients/rust/tests/client_behavior.rs +++ b/clients/rust/tests/client_behavior.rs @@ -14,10 +14,11 @@ use mxgateway_client::generated::mxaccess_gateway::v1::mx_access_gateway_server: use mxgateway_client::generated::mxaccess_gateway::v1::mx_command_reply; use mxgateway_client::generated::mxaccess_gateway::v1::mx_value::Kind; use mxgateway_client::generated::mxaccess_gateway::v1::{ - AddItemReply, BulkSubscribeReply, CloseSessionReply, CloseSessionRequest, MxCommandKind, - MxCommandReply, MxDataType, MxEvent, MxEventFamily, MxStatusCategory, MxStatusProxy, - MxStatusSource, MxValue, OpenSessionReply, OpenSessionRequest, ProtocolStatus, - ProtocolStatusCode, SessionState, StreamEventsRequest, SubscribeResult, + AcknowledgeAlarmReply, AcknowledgeAlarmRequest, ActiveAlarmSnapshot, AddItemReply, + BulkSubscribeReply, CloseSessionReply, CloseSessionRequest, MxCommandKind, MxCommandReply, + MxDataType, MxEvent, MxEventFamily, MxStatusCategory, MxStatusProxy, MxStatusSource, MxValue, + OpenSessionReply, OpenSessionRequest, ProtocolStatus, ProtocolStatusCode, + QueryActiveAlarmsRequest, SessionState, StreamEventsRequest, SubscribeResult, }; use mxgateway_client::{ ApiKey, ClientOptions, CommandError, Error, GatewayClient, MxStatus, MxValue as ClientMxValue, @@ -136,6 +137,52 @@ async fn event_stream_preserves_order_and_drop_cancels_server_stream() { assert!(state.stream_dropped.load(Ordering::SeqCst)); } +#[tokio::test] +async fn acknowledge_alarm_returns_reply_with_native_status() { + let state = Arc::new(FakeState::default()); + let endpoint = spawn_fake_gateway(state.clone()).await; + let client = GatewayClient::connect(ClientOptions::new(endpoint)) + .await + .unwrap(); + + let reply = client + .acknowledge_alarm(AcknowledgeAlarmRequest { + session_id: "session-fixture".to_owned(), + client_correlation_id: "corr-1".to_owned(), + alarm_full_reference: "Tank01.Level.HiHi".to_owned(), + comment: "investigating".to_owned(), + operator_user: "alice".to_owned(), + }) + .await + .unwrap(); + + assert_eq!( + reply.protocol_status.as_ref().unwrap().code, + ProtocolStatusCode::Ok as i32 + ); + assert_eq!(reply.status.as_ref().unwrap().success, 1); +} + +#[tokio::test] +async fn query_active_alarms_streams_snapshot_rows() { + let state = Arc::new(FakeState::default()); + let endpoint = spawn_fake_gateway(state.clone()).await; + let client = GatewayClient::connect(ClientOptions::new(endpoint)) + .await + .unwrap(); + + let mut stream = client + .query_active_alarms(QueryActiveAlarmsRequest { + session_id: "session-fixture".to_owned(), + ..QueryActiveAlarmsRequest::default() + }) + .await + .unwrap(); + + let first = stream.next().await.unwrap().unwrap(); + assert_eq!(first.alarm_full_reference, "Tank01.Level.HiHi"); +} + #[test] fn value_conversion_fixtures_keep_typed_projection_and_raw_metadata() { let fixture = behavior_fixture("values/value-conversion-cases.json"); @@ -335,6 +382,43 @@ impl MxAccessGateway for FakeGateway { dropped: self.state.stream_dropped.clone(), })) } + + async fn acknowledge_alarm( + &self, + _request: Request, + ) -> Result, Status> { + Ok(Response::new(AcknowledgeAlarmReply { + session_id: "session-fixture".to_owned(), + correlation_id: "corr-1".to_owned(), + protocol_status: Some(ok_status("ack ok")), + status: Some(MxStatusProxy { + success: 1, + category: MxStatusCategory::Ok as i32, + detected_by: MxStatusSource::RespondingLmx as i32, + ..MxStatusProxy::default() + }), + ..AcknowledgeAlarmReply::default() + })) + } + + type QueryActiveAlarmsStream = + Pin> + Send + 'static>>; + + async fn query_active_alarms( + &self, + _request: Request, + ) -> Result, Status> { + let (sender, receiver) = mpsc::channel(4); + sender + .send(Ok(ActiveAlarmSnapshot { + alarm_full_reference: "Tank01.Level.HiHi".to_owned(), + ..ActiveAlarmSnapshot::default() + })) + .await + .unwrap(); + let stream = ReceiverStream::new(receiver); + Ok(Response::new(Box::pin(stream))) + } } struct DropAwareStream {