diff --git a/clients/rust/src/client.rs b/clients/rust/src/client.rs index 308c70f..ae7a4f3 100644 --- a/clients/rust/src/client.rs +++ b/clients/rust/src/client.rs @@ -16,9 +16,9 @@ use crate::auth::AuthInterceptor; use crate::error::{ensure_command_success, ensure_protocol_success, Error}; use crate::generated::mxaccess_gateway::v1::mx_access_gateway_client::MxAccessGatewayClient; use crate::generated::mxaccess_gateway::v1::{ - AcknowledgeAlarmReply, AcknowledgeAlarmRequest, ActiveAlarmSnapshot, CloseSessionReply, + AcknowledgeAlarmReply, AcknowledgeAlarmRequest, AlarmFeedMessage, CloseSessionReply, CloseSessionRequest, MxCommandReply, MxCommandRequest, MxEvent, OpenSessionReply, - OpenSessionRequest, QueryActiveAlarmsRequest, StreamEventsRequest, + OpenSessionRequest, StreamAlarmsRequest, StreamEventsRequest, }; use crate::options::ClientOptions; use crate::session::Session; @@ -33,11 +33,11 @@ pub type RawGatewayClient = MxAccessGatewayClient> + Send + 'static>>; -/// Pinned, boxed [`ActiveAlarmSnapshot`] stream returned by -/// [`GatewayClient::query_active_alarms`]. Errors are pre-mapped from +/// Pinned, boxed [`AlarmFeedMessage`] stream returned by +/// [`GatewayClient::stream_alarms`]. Errors are pre-mapped from /// `tonic::Status` to [`Error`]; dropping the stream cancels the call. -pub type ActiveAlarmStream = std::pin::Pin< - Box> + Send + 'static>, +pub type AlarmFeedStream = std::pin::Pin< + Box> + Send + 'static>, >; /// Thin async wrapper around the generated gateway client. @@ -227,26 +227,27 @@ impl GatewayClient { Ok(reply) } - /// Open the server-streaming `QueryActiveAlarms` RPC — the gateway's - /// ConditionRefresh equivalent. + /// Attach to the gateway's central `StreamAlarms` feed. /// - /// The returned [`ActiveAlarmStream`] yields one [`ActiveAlarmSnapshot`] - /// per currently-active alarm. Dropping the stream cancels the gRPC call - /// cooperatively. Optional alarm-reference prefix scoping - /// (`request.alarm_filter_prefix`) limits the stream to a sub-tree. + /// The returned [`AlarmFeedStream`] opens with one [`AlarmFeedMessage`] + /// per currently-active alarm (the ConditionRefresh snapshot), then a + /// single `snapshot_complete`, then a `transition` for every subsequent + /// raise / acknowledge / clear. It is served by the gateway's always-on + /// alarm monitor — no worker session is opened — so any number of clients + /// may attach. Dropping the stream cancels the gRPC call cooperatively. + /// Optional alarm-reference prefix scoping (`request.alarm_filter_prefix`) + /// limits the stream to a sub-tree. /// /// # Errors /// /// Returns the `tonic::Status` mapped through [`Error::from`] if the /// server rejects the request. - pub async fn query_active_alarms( + pub async fn stream_alarms( &self, - request: QueryActiveAlarmsRequest, - ) -> Result { + request: StreamAlarmsRequest, + ) -> Result { let mut client = self.inner.clone(); - let response = client - .query_active_alarms(self.stream_request(request)) - .await?; + let response = client.stream_alarms(self.stream_request(request)).await?; let stream = futures_util::StreamExt::map(response.into_inner(), |result| { result.map_err(Error::from) }); diff --git a/clients/rust/tests/client_behavior.rs b/clients/rust/tests/client_behavior.rs index bb281e0..7723fff 100644 --- a/clients/rust/tests/client_behavior.rs +++ b/clients/rust/tests/client_behavior.rs @@ -8,6 +8,7 @@ use std::time::Duration; use futures_core::Stream; use futures_util::StreamExt; +use mxgateway_client::generated::mxaccess_gateway::v1::alarm_feed_message; use mxgateway_client::generated::mxaccess_gateway::v1::mx_access_gateway_server::{ MxAccessGateway, MxAccessGatewayServer, }; @@ -16,12 +17,12 @@ use mxgateway_client::generated::mxaccess_gateway::v1::mx_command_reply; use mxgateway_client::generated::mxaccess_gateway::v1::mx_value::Kind; use mxgateway_client::generated::mxaccess_gateway::v1::{ AcknowledgeAlarmReply, AcknowledgeAlarmRequest, ActiveAlarmSnapshot, AddItemReply, - BulkReadReply, BulkReadResult, BulkSubscribeReply, BulkWriteReply, BulkWriteResult, - CloseSessionReply, CloseSessionRequest, MxCommandKind, MxCommandReply, MxDataType, MxEvent, - MxEventFamily, MxStatusCategory, MxStatusProxy, MxStatusSource, MxValue, OpenSessionReply, - OpenSessionRequest, ProtocolStatus, ProtocolStatusCode, QueryActiveAlarmsRequest, SessionState, - StreamEventsRequest, SubscribeResult, Write2BulkEntry, WriteBulkEntry, WriteSecured2BulkEntry, - WriteSecuredBulkEntry, + AlarmFeedMessage, BulkReadReply, BulkReadResult, BulkSubscribeReply, BulkWriteReply, + BulkWriteResult, CloseSessionReply, CloseSessionRequest, MxCommandKind, MxCommandReply, + MxDataType, MxEvent, MxEventFamily, MxStatusCategory, MxStatusProxy, MxStatusSource, MxValue, + OpenSessionReply, OpenSessionRequest, ProtocolStatus, ProtocolStatusCode, SessionState, + StreamAlarmsRequest, StreamEventsRequest, SubscribeResult, Write2BulkEntry, WriteBulkEntry, + WriteSecured2BulkEntry, WriteSecuredBulkEntry, }; use mxgateway_client::{ ApiKey, ClientOptions, CommandError, Error, GatewayClient, MxStatus, MxValue as ClientMxValue, @@ -208,7 +209,6 @@ async fn acknowledge_alarm_returns_reply_with_native_status() { let reply = client .acknowledge_alarm(AcknowledgeAlarmRequest { - session_id: "session-fixture".to_owned(), client_correlation_id: "corr-1".to_owned(), alarm_full_reference: "Tank01.Level.HiHi".to_owned(), comment: "investigating".to_owned(), @@ -225,7 +225,7 @@ async fn acknowledge_alarm_returns_reply_with_native_status() { } #[tokio::test] -async fn query_active_alarms_streams_snapshot_rows() { +async fn stream_alarms_streams_snapshot_then_complete() { let state = Arc::new(FakeState::default()); let endpoint = spawn_fake_gateway(state.clone()).await; let client = GatewayClient::connect(ClientOptions::new(endpoint)) @@ -233,15 +233,23 @@ async fn query_active_alarms_streams_snapshot_rows() { .unwrap(); let mut stream = client - .query_active_alarms(QueryActiveAlarmsRequest { - session_id: "session-fixture".to_owned(), - ..QueryActiveAlarmsRequest::default() - }) + .stream_alarms(StreamAlarmsRequest::default()) .await .unwrap(); let first = stream.next().await.unwrap().unwrap(); - assert_eq!(first.alarm_full_reference, "Tank01.Level.HiHi"); + match first.payload { + Some(alarm_feed_message::Payload::ActiveAlarm(snapshot)) => { + assert_eq!(snapshot.alarm_full_reference, "Tank01.Level.HiHi"); + } + other => panic!("expected an active-alarm snapshot, got {other:?}"), + } + + let second = stream.next().await.unwrap().unwrap(); + assert_eq!( + second.payload, + Some(alarm_feed_message::Payload::SnapshotComplete(true)), + ); } #[test] @@ -907,7 +915,6 @@ impl MxAccessGateway for FakeGateway { _request: Request, ) -> Result, Status> { Ok(Response::new(AcknowledgeAlarmReply { - session_id: "session-fixture".to_owned(), correlation_id: "corr-1".to_owned(), protocol_status: Some(ok_status("ack ok")), status: Some(MxStatusProxy { @@ -920,18 +927,28 @@ impl MxAccessGateway for FakeGateway { })) } - type QueryActiveAlarmsStream = - Pin> + Send + 'static>>; + type StreamAlarmsStream = + Pin> + Send + 'static>>; - async fn query_active_alarms( + async fn stream_alarms( &self, - _request: Request, - ) -> Result, Status> { + _request: Request, + ) -> Result, Status> { let (sender, receiver) = mpsc::channel(4); sender - .send(Ok(ActiveAlarmSnapshot { - alarm_full_reference: "Tank01.Level.HiHi".to_owned(), - ..ActiveAlarmSnapshot::default() + .send(Ok(AlarmFeedMessage { + payload: Some(alarm_feed_message::Payload::ActiveAlarm( + ActiveAlarmSnapshot { + alarm_full_reference: "Tank01.Level.HiHi".to_owned(), + ..ActiveAlarmSnapshot::default() + }, + )), + })) + .await + .unwrap(); + sender + .send(Ok(AlarmFeedMessage { + payload: Some(alarm_feed_message::Payload::SnapshotComplete(true)), })) .await .unwrap();