Point the Rust client at the StreamAlarms alarm feed

Replace GatewayClient::query_active_alarms with stream_alarms, an
AlarmFeedStream over AlarmFeedMessage served by the gateway's central
alarm monitor (snapshot, snapshot_complete, then live transitions).
Drops session_id from the acknowledge surface.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-21 16:49:26 -04:00
parent 1ad0be8276
commit 1b6ca07bb5
2 changed files with 58 additions and 40 deletions
+19 -18
View File
@@ -16,9 +16,9 @@ use crate::auth::AuthInterceptor;
use crate::error::{ensure_command_success, ensure_protocol_success, Error};
use crate::generated::mxaccess_gateway::v1::mx_access_gateway_client::MxAccessGatewayClient;
use crate::generated::mxaccess_gateway::v1::{
AcknowledgeAlarmReply, AcknowledgeAlarmRequest, ActiveAlarmSnapshot, CloseSessionReply,
AcknowledgeAlarmReply, AcknowledgeAlarmRequest, AlarmFeedMessage, CloseSessionReply,
CloseSessionRequest, MxCommandReply, MxCommandRequest, MxEvent, OpenSessionReply,
OpenSessionRequest, QueryActiveAlarmsRequest, StreamEventsRequest,
OpenSessionRequest, StreamAlarmsRequest, StreamEventsRequest,
};
use crate::options::ClientOptions;
use crate::session::Session;
@@ -33,11 +33,11 @@ pub type RawGatewayClient = MxAccessGatewayClient<InterceptedService<Channel, Au
pub type EventStream =
std::pin::Pin<Box<dyn futures_core::Stream<Item = Result<MxEvent, Error>> + Send + 'static>>;
/// Pinned, boxed [`ActiveAlarmSnapshot`] stream returned by
/// [`GatewayClient::query_active_alarms`]. Errors are pre-mapped from
/// Pinned, boxed [`AlarmFeedMessage`] stream returned by
/// [`GatewayClient::stream_alarms`]. Errors are pre-mapped from
/// `tonic::Status` to [`Error`]; dropping the stream cancels the call.
pub type ActiveAlarmStream = std::pin::Pin<
Box<dyn futures_core::Stream<Item = Result<ActiveAlarmSnapshot, Error>> + Send + 'static>,
pub type AlarmFeedStream = std::pin::Pin<
Box<dyn futures_core::Stream<Item = Result<AlarmFeedMessage, Error>> + Send + 'static>,
>;
/// Thin async wrapper around the generated gateway client.
@@ -227,26 +227,27 @@ impl GatewayClient {
Ok(reply)
}
/// Open the server-streaming `QueryActiveAlarms` RPC — the gateway's
/// ConditionRefresh equivalent.
/// Attach to the gateway's central `StreamAlarms` feed.
///
/// The returned [`ActiveAlarmStream`] yields one [`ActiveAlarmSnapshot`]
/// per currently-active alarm. Dropping the stream cancels the gRPC call
/// cooperatively. Optional alarm-reference prefix scoping
/// (`request.alarm_filter_prefix`) limits the stream to a sub-tree.
/// The returned [`AlarmFeedStream`] opens with one [`AlarmFeedMessage`]
/// per currently-active alarm (the ConditionRefresh snapshot), then a
/// single `snapshot_complete`, then a `transition` for every subsequent
/// raise / acknowledge / clear. It is served by the gateway's always-on
/// alarm monitor — no worker session is opened — so any number of clients
/// may attach. Dropping the stream cancels the gRPC call cooperatively.
/// Optional alarm-reference prefix scoping (`request.alarm_filter_prefix`)
/// limits the stream to a sub-tree.
///
/// # Errors
///
/// Returns the `tonic::Status` mapped through [`Error::from`] if the
/// server rejects the request.
pub async fn query_active_alarms(
pub async fn stream_alarms(
&self,
request: QueryActiveAlarmsRequest,
) -> Result<ActiveAlarmStream, Error> {
request: StreamAlarmsRequest,
) -> Result<AlarmFeedStream, Error> {
let mut client = self.inner.clone();
let response = client
.query_active_alarms(self.stream_request(request))
.await?;
let response = client.stream_alarms(self.stream_request(request)).await?;
let stream = futures_util::StreamExt::map(response.into_inner(), |result| {
result.map_err(Error::from)
});
+39 -22
View File
@@ -8,6 +8,7 @@ use std::time::Duration;
use futures_core::Stream;
use futures_util::StreamExt;
use mxgateway_client::generated::mxaccess_gateway::v1::alarm_feed_message;
use mxgateway_client::generated::mxaccess_gateway::v1::mx_access_gateway_server::{
MxAccessGateway, MxAccessGatewayServer,
};
@@ -16,12 +17,12 @@ use mxgateway_client::generated::mxaccess_gateway::v1::mx_command_reply;
use mxgateway_client::generated::mxaccess_gateway::v1::mx_value::Kind;
use mxgateway_client::generated::mxaccess_gateway::v1::{
AcknowledgeAlarmReply, AcknowledgeAlarmRequest, ActiveAlarmSnapshot, AddItemReply,
BulkReadReply, BulkReadResult, BulkSubscribeReply, BulkWriteReply, BulkWriteResult,
CloseSessionReply, CloseSessionRequest, MxCommandKind, MxCommandReply, MxDataType, MxEvent,
MxEventFamily, MxStatusCategory, MxStatusProxy, MxStatusSource, MxValue, OpenSessionReply,
OpenSessionRequest, ProtocolStatus, ProtocolStatusCode, QueryActiveAlarmsRequest, SessionState,
StreamEventsRequest, SubscribeResult, Write2BulkEntry, WriteBulkEntry, WriteSecured2BulkEntry,
WriteSecuredBulkEntry,
AlarmFeedMessage, BulkReadReply, BulkReadResult, BulkSubscribeReply, BulkWriteReply,
BulkWriteResult, CloseSessionReply, CloseSessionRequest, MxCommandKind, MxCommandReply,
MxDataType, MxEvent, MxEventFamily, MxStatusCategory, MxStatusProxy, MxStatusSource, MxValue,
OpenSessionReply, OpenSessionRequest, ProtocolStatus, ProtocolStatusCode, SessionState,
StreamAlarmsRequest, StreamEventsRequest, SubscribeResult, Write2BulkEntry, WriteBulkEntry,
WriteSecured2BulkEntry, WriteSecuredBulkEntry,
};
use mxgateway_client::{
ApiKey, ClientOptions, CommandError, Error, GatewayClient, MxStatus, MxValue as ClientMxValue,
@@ -208,7 +209,6 @@ async fn acknowledge_alarm_returns_reply_with_native_status() {
let reply = client
.acknowledge_alarm(AcknowledgeAlarmRequest {
session_id: "session-fixture".to_owned(),
client_correlation_id: "corr-1".to_owned(),
alarm_full_reference: "Tank01.Level.HiHi".to_owned(),
comment: "investigating".to_owned(),
@@ -225,7 +225,7 @@ async fn acknowledge_alarm_returns_reply_with_native_status() {
}
#[tokio::test]
async fn query_active_alarms_streams_snapshot_rows() {
async fn stream_alarms_streams_snapshot_then_complete() {
let state = Arc::new(FakeState::default());
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
@@ -233,15 +233,23 @@ async fn query_active_alarms_streams_snapshot_rows() {
.unwrap();
let mut stream = client
.query_active_alarms(QueryActiveAlarmsRequest {
session_id: "session-fixture".to_owned(),
..QueryActiveAlarmsRequest::default()
})
.stream_alarms(StreamAlarmsRequest::default())
.await
.unwrap();
let first = stream.next().await.unwrap().unwrap();
assert_eq!(first.alarm_full_reference, "Tank01.Level.HiHi");
match first.payload {
Some(alarm_feed_message::Payload::ActiveAlarm(snapshot)) => {
assert_eq!(snapshot.alarm_full_reference, "Tank01.Level.HiHi");
}
other => panic!("expected an active-alarm snapshot, got {other:?}"),
}
let second = stream.next().await.unwrap().unwrap();
assert_eq!(
second.payload,
Some(alarm_feed_message::Payload::SnapshotComplete(true)),
);
}
#[test]
@@ -907,7 +915,6 @@ impl MxAccessGateway for FakeGateway {
_request: Request<AcknowledgeAlarmRequest>,
) -> Result<Response<AcknowledgeAlarmReply>, Status> {
Ok(Response::new(AcknowledgeAlarmReply {
session_id: "session-fixture".to_owned(),
correlation_id: "corr-1".to_owned(),
protocol_status: Some(ok_status("ack ok")),
status: Some(MxStatusProxy {
@@ -920,18 +927,28 @@ impl MxAccessGateway for FakeGateway {
}))
}
type QueryActiveAlarmsStream =
Pin<Box<dyn Stream<Item = Result<ActiveAlarmSnapshot, Status>> + Send + 'static>>;
type StreamAlarmsStream =
Pin<Box<dyn Stream<Item = Result<AlarmFeedMessage, Status>> + Send + 'static>>;
async fn query_active_alarms(
async fn stream_alarms(
&self,
_request: Request<QueryActiveAlarmsRequest>,
) -> Result<Response<Self::QueryActiveAlarmsStream>, Status> {
_request: Request<StreamAlarmsRequest>,
) -> Result<Response<Self::StreamAlarmsStream>, Status> {
let (sender, receiver) = mpsc::channel(4);
sender
.send(Ok(ActiveAlarmSnapshot {
alarm_full_reference: "Tank01.Level.HiHi".to_owned(),
..ActiveAlarmSnapshot::default()
.send(Ok(AlarmFeedMessage {
payload: Some(alarm_feed_message::Payload::ActiveAlarm(
ActiveAlarmSnapshot {
alarm_full_reference: "Tank01.Level.HiHi".to_owned(),
..ActiveAlarmSnapshot::default()
},
)),
}))
.await
.unwrap();
sender
.send(Ok(AlarmFeedMessage {
payload: Some(alarm_feed_message::Payload::SnapshotComplete(true)),
}))
.await
.unwrap();