clients/rust: SDK methods for AcknowledgeAlarm + QueryActiveAlarms (PR E.6) #110
@@ -16,8 +16,9 @@ use crate::auth::AuthInterceptor;
|
||||
use crate::error::{ensure_command_success, ensure_protocol_success, Error};
|
||||
use crate::generated::mxaccess_gateway::v1::mx_access_gateway_client::MxAccessGatewayClient;
|
||||
use crate::generated::mxaccess_gateway::v1::{
|
||||
CloseSessionReply, CloseSessionRequest, MxCommandReply, MxCommandRequest, MxEvent,
|
||||
OpenSessionReply, OpenSessionRequest, StreamEventsRequest,
|
||||
AcknowledgeAlarmReply, AcknowledgeAlarmRequest, ActiveAlarmSnapshot, CloseSessionReply,
|
||||
CloseSessionRequest, MxCommandReply, MxCommandRequest, MxEvent, OpenSessionReply,
|
||||
OpenSessionRequest, QueryActiveAlarmsRequest, StreamEventsRequest,
|
||||
};
|
||||
use crate::options::ClientOptions;
|
||||
use crate::session::Session;
|
||||
@@ -32,6 +33,13 @@ pub type RawGatewayClient = MxAccessGatewayClient<InterceptedService<Channel, Au
|
||||
pub type EventStream =
|
||||
std::pin::Pin<Box<dyn futures_core::Stream<Item = Result<MxEvent, Error>> + Send + 'static>>;
|
||||
|
||||
/// Pinned, boxed [`ActiveAlarmSnapshot`] stream returned by
|
||||
/// [`GatewayClient::query_active_alarms`]. Errors are pre-mapped from
|
||||
/// `tonic::Status` to [`Error`]; dropping the stream cancels the call.
|
||||
pub type ActiveAlarmStream = std::pin::Pin<
|
||||
Box<dyn futures_core::Stream<Item = Result<ActiveAlarmSnapshot, Error>> + Send + 'static>,
|
||||
>;
|
||||
|
||||
/// Thin async wrapper around the generated gateway client.
|
||||
///
|
||||
/// The wrapper is `Clone`: every clone shares the underlying tonic channel
|
||||
@@ -194,6 +202,56 @@ impl GatewayClient {
|
||||
Ok(Box::pin(stream))
|
||||
}
|
||||
|
||||
/// Acknowledge an active MXAccess alarm condition through the gateway.
|
||||
///
|
||||
/// The gateway authenticates the request against the API key's
|
||||
/// `invoke:alarm-ack` scope and forwards the acknowledge to the worker's
|
||||
/// MXAccess session; the resulting native MxStatus is returned in the
|
||||
/// reply. Acks are idempotent at the MxAccess layer.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns [`Error::ProtocolStatus`] when the gateway accepts the call but
|
||||
/// reports a non-OK protocol status, plus any of the [`Error`] variants
|
||||
/// produced by transport failures.
|
||||
pub async fn acknowledge_alarm(
|
||||
&self,
|
||||
request: AcknowledgeAlarmRequest,
|
||||
) -> Result<AcknowledgeAlarmReply, Error> {
|
||||
let mut client = self.inner.clone();
|
||||
let response = client.acknowledge_alarm(self.unary_request(request)).await?;
|
||||
let reply = response.into_inner();
|
||||
ensure_protocol_success("acknowledge alarm", reply.protocol_status.as_ref())?;
|
||||
Ok(reply)
|
||||
}
|
||||
|
||||
/// Open the server-streaming `QueryActiveAlarms` RPC — the gateway's
|
||||
/// ConditionRefresh equivalent.
|
||||
///
|
||||
/// The returned [`ActiveAlarmStream`] yields one [`ActiveAlarmSnapshot`]
|
||||
/// per currently-active alarm. Dropping the stream cancels the gRPC call
|
||||
/// cooperatively. Optional alarm-reference prefix scoping
|
||||
/// (`request.alarm_filter_prefix`) limits the stream to a sub-tree.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns the `tonic::Status` mapped through [`Error::from`] if the
|
||||
/// server rejects the request.
|
||||
pub async fn query_active_alarms(
|
||||
&self,
|
||||
request: QueryActiveAlarmsRequest,
|
||||
) -> Result<ActiveAlarmStream, Error> {
|
||||
let mut client = self.inner.clone();
|
||||
let response = client
|
||||
.query_active_alarms(self.stream_request(request))
|
||||
.await?;
|
||||
let stream = futures_util::StreamExt::map(response.into_inner(), |result| {
|
||||
result.map_err(Error::from)
|
||||
});
|
||||
|
||||
Ok(Box::pin(stream))
|
||||
}
|
||||
|
||||
fn unary_request<T>(&self, message: T) -> Request<T> {
|
||||
let mut request = Request::new(message);
|
||||
request.set_timeout(self.call_timeout);
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
pub const CLIENT_VERSION: &str = "0.1.0-dev";
|
||||
|
||||
/// Public gateway gRPC protocol version this client targets.
|
||||
pub const GATEWAY_PROTOCOL_VERSION: u32 = 2;
|
||||
pub const GATEWAY_PROTOCOL_VERSION: u32 = 3;
|
||||
|
||||
/// Internal worker IPC protocol version this client expects sessions to use.
|
||||
pub const WORKER_PROTOCOL_VERSION: u32 = 1;
|
||||
|
||||
@@ -14,10 +14,11 @@ use mxgateway_client::generated::mxaccess_gateway::v1::mx_access_gateway_server:
|
||||
use mxgateway_client::generated::mxaccess_gateway::v1::mx_command_reply;
|
||||
use mxgateway_client::generated::mxaccess_gateway::v1::mx_value::Kind;
|
||||
use mxgateway_client::generated::mxaccess_gateway::v1::{
|
||||
AddItemReply, BulkSubscribeReply, CloseSessionReply, CloseSessionRequest, MxCommandKind,
|
||||
MxCommandReply, MxDataType, MxEvent, MxEventFamily, MxStatusCategory, MxStatusProxy,
|
||||
MxStatusSource, MxValue, OpenSessionReply, OpenSessionRequest, ProtocolStatus,
|
||||
ProtocolStatusCode, SessionState, StreamEventsRequest, SubscribeResult,
|
||||
AcknowledgeAlarmReply, AcknowledgeAlarmRequest, ActiveAlarmSnapshot, AddItemReply,
|
||||
BulkSubscribeReply, CloseSessionReply, CloseSessionRequest, MxCommandKind, MxCommandReply,
|
||||
MxDataType, MxEvent, MxEventFamily, MxStatusCategory, MxStatusProxy, MxStatusSource, MxValue,
|
||||
OpenSessionReply, OpenSessionRequest, ProtocolStatus, ProtocolStatusCode,
|
||||
QueryActiveAlarmsRequest, SessionState, StreamEventsRequest, SubscribeResult,
|
||||
};
|
||||
use mxgateway_client::{
|
||||
ApiKey, ClientOptions, CommandError, Error, GatewayClient, MxStatus, MxValue as ClientMxValue,
|
||||
@@ -136,6 +137,52 @@ async fn event_stream_preserves_order_and_drop_cancels_server_stream() {
|
||||
assert!(state.stream_dropped.load(Ordering::SeqCst));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn acknowledge_alarm_returns_reply_with_native_status() {
|
||||
let state = Arc::new(FakeState::default());
|
||||
let endpoint = spawn_fake_gateway(state.clone()).await;
|
||||
let client = GatewayClient::connect(ClientOptions::new(endpoint))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let reply = client
|
||||
.acknowledge_alarm(AcknowledgeAlarmRequest {
|
||||
session_id: "session-fixture".to_owned(),
|
||||
client_correlation_id: "corr-1".to_owned(),
|
||||
alarm_full_reference: "Tank01.Level.HiHi".to_owned(),
|
||||
comment: "investigating".to_owned(),
|
||||
operator_user: "alice".to_owned(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
reply.protocol_status.as_ref().unwrap().code,
|
||||
ProtocolStatusCode::Ok as i32
|
||||
);
|
||||
assert_eq!(reply.status.as_ref().unwrap().success, 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn query_active_alarms_streams_snapshot_rows() {
|
||||
let state = Arc::new(FakeState::default());
|
||||
let endpoint = spawn_fake_gateway(state.clone()).await;
|
||||
let client = GatewayClient::connect(ClientOptions::new(endpoint))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut stream = client
|
||||
.query_active_alarms(QueryActiveAlarmsRequest {
|
||||
session_id: "session-fixture".to_owned(),
|
||||
..QueryActiveAlarmsRequest::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let first = stream.next().await.unwrap().unwrap();
|
||||
assert_eq!(first.alarm_full_reference, "Tank01.Level.HiHi");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn value_conversion_fixtures_keep_typed_projection_and_raw_metadata() {
|
||||
let fixture = behavior_fixture("values/value-conversion-cases.json");
|
||||
@@ -335,6 +382,43 @@ impl MxAccessGateway for FakeGateway {
|
||||
dropped: self.state.stream_dropped.clone(),
|
||||
}))
|
||||
}
|
||||
|
||||
async fn acknowledge_alarm(
|
||||
&self,
|
||||
_request: Request<AcknowledgeAlarmRequest>,
|
||||
) -> Result<Response<AcknowledgeAlarmReply>, Status> {
|
||||
Ok(Response::new(AcknowledgeAlarmReply {
|
||||
session_id: "session-fixture".to_owned(),
|
||||
correlation_id: "corr-1".to_owned(),
|
||||
protocol_status: Some(ok_status("ack ok")),
|
||||
status: Some(MxStatusProxy {
|
||||
success: 1,
|
||||
category: MxStatusCategory::Ok as i32,
|
||||
detected_by: MxStatusSource::RespondingLmx as i32,
|
||||
..MxStatusProxy::default()
|
||||
}),
|
||||
..AcknowledgeAlarmReply::default()
|
||||
}))
|
||||
}
|
||||
|
||||
type QueryActiveAlarmsStream =
|
||||
Pin<Box<dyn Stream<Item = Result<ActiveAlarmSnapshot, Status>> + Send + 'static>>;
|
||||
|
||||
async fn query_active_alarms(
|
||||
&self,
|
||||
_request: Request<QueryActiveAlarmsRequest>,
|
||||
) -> Result<Response<Self::QueryActiveAlarmsStream>, Status> {
|
||||
let (sender, receiver) = mpsc::channel(4);
|
||||
sender
|
||||
.send(Ok(ActiveAlarmSnapshot {
|
||||
alarm_full_reference: "Tank01.Level.HiHi".to_owned(),
|
||||
..ActiveAlarmSnapshot::default()
|
||||
}))
|
||||
.await
|
||||
.unwrap();
|
||||
let stream = ReceiverStream::new(receiver);
|
||||
Ok(Response::new(Box::pin(stream)))
|
||||
}
|
||||
}
|
||||
|
||||
struct DropAwareStream {
|
||||
|
||||
Reference in New Issue
Block a user