clients/rust: SDK methods for AcknowledgeAlarm + QueryActiveAlarms (PR E.6)
Eleventh PR of the alarms-over-gateway epic (docs/plans/alarms-over-gateway.md). Mirrors PR E.2's .NET surface on the Rust async SDK. Depends on PR E.1 (regen, merged). - GatewayClient::acknowledge_alarm — async unary call. Uses the existing unary_request helper (call timeout) and routes failures through Error mapping; non-OK protocol status promotes to Error::ProtocolStatus via ensure_protocol_success. - GatewayClient::query_active_alarms — async server-streaming call returning a new ActiveAlarmStream type alias (parallel to EventStream). Errors are pre-mapped from tonic::Status; dropping the stream cancels the call cooperatively. - GATEWAY_PROTOCOL_VERSION bumped 2 → 3 to match the .NET contract. - FakeGateway test impl extends to satisfy the new trait methods so client_behavior.rs builds. Two new integration tests cover the new SDK methods. Tests: - 12 unit + 10 client_behavior + 4 proto_fixtures = 26 tests, all pass under cargo test (Rust 1.x via existing toolchain). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -14,10 +14,11 @@ use mxgateway_client::generated::mxaccess_gateway::v1::mx_access_gateway_server:
|
||||
use mxgateway_client::generated::mxaccess_gateway::v1::mx_command_reply;
|
||||
use mxgateway_client::generated::mxaccess_gateway::v1::mx_value::Kind;
|
||||
use mxgateway_client::generated::mxaccess_gateway::v1::{
|
||||
AddItemReply, BulkSubscribeReply, CloseSessionReply, CloseSessionRequest, MxCommandKind,
|
||||
MxCommandReply, MxDataType, MxEvent, MxEventFamily, MxStatusCategory, MxStatusProxy,
|
||||
MxStatusSource, MxValue, OpenSessionReply, OpenSessionRequest, ProtocolStatus,
|
||||
ProtocolStatusCode, SessionState, StreamEventsRequest, SubscribeResult,
|
||||
AcknowledgeAlarmReply, AcknowledgeAlarmRequest, ActiveAlarmSnapshot, AddItemReply,
|
||||
BulkSubscribeReply, CloseSessionReply, CloseSessionRequest, MxCommandKind, MxCommandReply,
|
||||
MxDataType, MxEvent, MxEventFamily, MxStatusCategory, MxStatusProxy, MxStatusSource, MxValue,
|
||||
OpenSessionReply, OpenSessionRequest, ProtocolStatus, ProtocolStatusCode,
|
||||
QueryActiveAlarmsRequest, SessionState, StreamEventsRequest, SubscribeResult,
|
||||
};
|
||||
use mxgateway_client::{
|
||||
ApiKey, ClientOptions, CommandError, Error, GatewayClient, MxStatus, MxValue as ClientMxValue,
|
||||
@@ -136,6 +137,52 @@ async fn event_stream_preserves_order_and_drop_cancels_server_stream() {
|
||||
assert!(state.stream_dropped.load(Ordering::SeqCst));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn acknowledge_alarm_returns_reply_with_native_status() {
|
||||
let state = Arc::new(FakeState::default());
|
||||
let endpoint = spawn_fake_gateway(state.clone()).await;
|
||||
let client = GatewayClient::connect(ClientOptions::new(endpoint))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let reply = client
|
||||
.acknowledge_alarm(AcknowledgeAlarmRequest {
|
||||
session_id: "session-fixture".to_owned(),
|
||||
client_correlation_id: "corr-1".to_owned(),
|
||||
alarm_full_reference: "Tank01.Level.HiHi".to_owned(),
|
||||
comment: "investigating".to_owned(),
|
||||
operator_user: "alice".to_owned(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
reply.protocol_status.as_ref().unwrap().code,
|
||||
ProtocolStatusCode::Ok as i32
|
||||
);
|
||||
assert_eq!(reply.status.as_ref().unwrap().success, 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn query_active_alarms_streams_snapshot_rows() {
|
||||
let state = Arc::new(FakeState::default());
|
||||
let endpoint = spawn_fake_gateway(state.clone()).await;
|
||||
let client = GatewayClient::connect(ClientOptions::new(endpoint))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut stream = client
|
||||
.query_active_alarms(QueryActiveAlarmsRequest {
|
||||
session_id: "session-fixture".to_owned(),
|
||||
..QueryActiveAlarmsRequest::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let first = stream.next().await.unwrap().unwrap();
|
||||
assert_eq!(first.alarm_full_reference, "Tank01.Level.HiHi");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn value_conversion_fixtures_keep_typed_projection_and_raw_metadata() {
|
||||
let fixture = behavior_fixture("values/value-conversion-cases.json");
|
||||
@@ -335,6 +382,43 @@ impl MxAccessGateway for FakeGateway {
|
||||
dropped: self.state.stream_dropped.clone(),
|
||||
}))
|
||||
}
|
||||
|
||||
async fn acknowledge_alarm(
|
||||
&self,
|
||||
_request: Request<AcknowledgeAlarmRequest>,
|
||||
) -> Result<Response<AcknowledgeAlarmReply>, Status> {
|
||||
Ok(Response::new(AcknowledgeAlarmReply {
|
||||
session_id: "session-fixture".to_owned(),
|
||||
correlation_id: "corr-1".to_owned(),
|
||||
protocol_status: Some(ok_status("ack ok")),
|
||||
status: Some(MxStatusProxy {
|
||||
success: 1,
|
||||
category: MxStatusCategory::Ok as i32,
|
||||
detected_by: MxStatusSource::RespondingLmx as i32,
|
||||
..MxStatusProxy::default()
|
||||
}),
|
||||
..AcknowledgeAlarmReply::default()
|
||||
}))
|
||||
}
|
||||
|
||||
type QueryActiveAlarmsStream =
|
||||
Pin<Box<dyn Stream<Item = Result<ActiveAlarmSnapshot, Status>> + Send + 'static>>;
|
||||
|
||||
async fn query_active_alarms(
|
||||
&self,
|
||||
_request: Request<QueryActiveAlarmsRequest>,
|
||||
) -> Result<Response<Self::QueryActiveAlarmsStream>, Status> {
|
||||
let (sender, receiver) = mpsc::channel(4);
|
||||
sender
|
||||
.send(Ok(ActiveAlarmSnapshot {
|
||||
alarm_full_reference: "Tank01.Level.HiHi".to_owned(),
|
||||
..ActiveAlarmSnapshot::default()
|
||||
}))
|
||||
.await
|
||||
.unwrap();
|
||||
let stream = ReceiverStream::new(receiver);
|
||||
Ok(Response::new(Box::pin(stream)))
|
||||
}
|
||||
}
|
||||
|
||||
struct DropAwareStream {
|
||||
|
||||
Reference in New Issue
Block a user