4a0f88b17d
Client.Rust-022 Restored Error::MalformedReply for register / add_item /
add_item2 and the bulk-subscribe / read-bulk / write-bulk
dispatch arms so malformed-but-OK replies fail loudly
instead of returning Vec::new().
Client.Rust-023 Restored next_correlation_id and routed every CLI close /
stream-alarms / acknowledge-alarm / bench-read-bulk call
through it so each call carries a unique opaque token.
Client.Rust-024 Added round-trip tests for read_bulk / write_bulk /
write2_bulk / write_secured_bulk / write_secured2_bulk
plus stream_alarms and percentile_summary unit tests.
Client.Rust-025 RustClientDesign.md re-synced — new bulk SDK, alarms
surface, Error variants, CLI command list, and the
Windows stack workaround.
Client.Rust-026 Session::read_bulk now borrows a tag slice; bench-read-
bulk binds tags once outside the warm-up / steady-state
loops.
Client.Rust-027 .cargo/config.toml selector tightened to
cfg(all(windows, target_env = "msvc")) and comment
rewritten to match reality (release + debug ship the
8 MB reservation).
Client.Rust-028 run_batch removed the empty-line break; stdin EOF is
the only terminator.
Client.Rust-029 Re-applied Client.Rust-001 / 002 / 012 — added the
missing doc comments, renamed BulkReplyKind variants,
and replaced the clone-on-copy with a deref under lock
so cargo clippy -D warnings is clean.
All resolved at 2026-05-24; cargo fmt + check + clippy + test all green
(55 tests).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1094 lines
38 KiB
Rust
1094 lines
38 KiB
Rust
use std::pin::Pin;
|
|
use std::sync::{
|
|
atomic::{AtomicBool, Ordering},
|
|
Arc,
|
|
};
|
|
use std::task::{Context, Poll};
|
|
use std::time::Duration;
|
|
|
|
use futures_core::Stream;
|
|
use futures_util::StreamExt;
|
|
use serde_json::Value;
|
|
use tokio::net::TcpListener;
|
|
use tokio::sync::{mpsc, Mutex};
|
|
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
|
|
use tonic::transport::Server;
|
|
use tonic::{Request, Response, Status};
|
|
use zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::mx_access_gateway_server::{
|
|
MxAccessGateway, MxAccessGatewayServer,
|
|
};
|
|
use zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::mx_command_reply;
|
|
use zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::mx_value::Kind;
|
|
use zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::{
|
|
alarm_feed_message, AcknowledgeAlarmReply, AcknowledgeAlarmRequest, ActiveAlarmSnapshot,
|
|
AddItem2Reply, AddItemReply, AlarmConditionState, AlarmFeedMessage, AlarmTransitionKind,
|
|
BulkReadReply, BulkReadResult, BulkSubscribeReply, BulkWriteReply, BulkWriteResult,
|
|
CloseSessionReply, CloseSessionRequest, MxCommandKind, MxCommandReply, MxDataType, MxEvent,
|
|
MxEventFamily, MxStatusCategory, MxStatusProxy, MxStatusSource, MxValue,
|
|
OnAlarmTransitionEvent, OpenSessionReply, OpenSessionRequest, ProtocolStatus,
|
|
ProtocolStatusCode, QueryActiveAlarmsRequest, RegisterReply, SessionState, StreamAlarmsRequest,
|
|
StreamEventsRequest, SubscribeResult, Write2BulkEntry, WriteBulkEntry, WriteSecured2BulkEntry,
|
|
WriteSecuredBulkEntry,
|
|
};
|
|
use zb_mom_ww_mxgateway_client::{
|
|
next_correlation_id, ApiKey, ClientOptions, CommandError, Error, GatewayClient, MxStatus,
|
|
MxValue as ClientMxValue, MxValueProjection,
|
|
};
|
|
|
|
#[tokio::test]
|
|
async fn fake_server_receives_bearer_metadata_and_raw_client_is_reachable() {
|
|
let state = Arc::new(FakeState::default());
|
|
let endpoint = spawn_fake_gateway(state.clone()).await;
|
|
let mut client = GatewayClient::connect(
|
|
ClientOptions::new(endpoint).with_api_key(ApiKey::new("mxgw_fixture_secret")),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
let _raw = client.raw_client();
|
|
let session = client
|
|
.open_session(OpenSessionRequest {
|
|
client_session_name: "rust-test".to_owned(),
|
|
..OpenSessionRequest::default()
|
|
})
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(session.id(), "session-fixture");
|
|
assert_eq!(
|
|
state.authorization.lock().await.as_deref(),
|
|
Some("Bearer mxgw_fixture_secret")
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn session_helpers_build_commands_and_preserve_command_errors() {
|
|
let state = Arc::new(FakeState::default());
|
|
let endpoint = spawn_fake_gateway(state.clone()).await;
|
|
let client = GatewayClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
let session = client.session("session-fixture");
|
|
|
|
let item_handle = session.add_item(12, "Plant.Area.Tag").await.unwrap();
|
|
|
|
assert_eq!(item_handle, 34);
|
|
let last_command = state.last_command_kind.lock().await;
|
|
assert_eq!(*last_command, Some(MxCommandKind::AddItem as i32));
|
|
drop(last_command);
|
|
|
|
let error = session
|
|
.write(12, 34, ClientMxValue::int32(123), 0)
|
|
.await
|
|
.unwrap_err();
|
|
let Error::Command(error) = error else {
|
|
panic!("write failure should preserve the raw command reply: {error:?}");
|
|
};
|
|
assert_eq!(
|
|
error.reply().protocol_status.as_ref().unwrap().code,
|
|
ProtocolStatusCode::MxaccessFailure as i32
|
|
);
|
|
assert_eq!(error.reply().hresult, Some(-2147220992));
|
|
assert_eq!(error.reply().statuses.len(), 2);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn subscribe_bulk_builds_one_bulk_command_and_returns_results() {
|
|
let state = Arc::new(FakeState::default());
|
|
let endpoint = spawn_fake_gateway(state.clone()).await;
|
|
let client = GatewayClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
let session = client.session("session-fixture");
|
|
|
|
let results = session
|
|
.subscribe_bulk(12, vec!["Area001.Pump001.Speed".to_owned()])
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(results[0].item_handle, 34);
|
|
let last_command = state.last_command_kind.lock().await;
|
|
assert_eq!(*last_command, Some(MxCommandKind::SubscribeBulk as i32));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn event_stream_preserves_order_and_drop_cancels_server_stream() {
|
|
let state = Arc::new(FakeState::default());
|
|
let endpoint = spawn_fake_gateway(state.clone()).await;
|
|
let client = GatewayClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
|
|
let mut stream = client
|
|
.stream_events(StreamEventsRequest {
|
|
session_id: "session-fixture".to_owned(),
|
|
after_worker_sequence: 0,
|
|
})
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(stream.next().await.unwrap().unwrap().worker_sequence, 1);
|
|
assert_eq!(stream.next().await.unwrap().unwrap().worker_sequence, 2);
|
|
|
|
drop(stream);
|
|
for _ in 0..20 {
|
|
if state.stream_dropped.load(Ordering::SeqCst) {
|
|
return;
|
|
}
|
|
tokio::time::sleep(Duration::from_millis(25)).await;
|
|
}
|
|
|
|
assert!(state.stream_dropped.load(Ordering::SeqCst));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn acknowledge_alarm_returns_reply_with_native_status() {
|
|
let state = Arc::new(FakeState::default());
|
|
let endpoint = spawn_fake_gateway(state.clone()).await;
|
|
let client = GatewayClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
|
|
let reply = client
|
|
.acknowledge_alarm(AcknowledgeAlarmRequest {
|
|
client_correlation_id: "corr-1".to_owned(),
|
|
alarm_full_reference: "Tank01.Level.HiHi".to_owned(),
|
|
comment: "investigating".to_owned(),
|
|
operator_user: "alice".to_owned(),
|
|
})
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(
|
|
reply.protocol_status.as_ref().unwrap().code,
|
|
ProtocolStatusCode::Ok as i32
|
|
);
|
|
assert_eq!(reply.status.as_ref().unwrap().success, 1);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn query_active_alarms_streams_snapshot_rows() {
|
|
let state = Arc::new(FakeState::default());
|
|
let endpoint = spawn_fake_gateway(state.clone()).await;
|
|
let client = GatewayClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
|
|
let mut stream = client
|
|
.query_active_alarms(QueryActiveAlarmsRequest {
|
|
session_id: "session-fixture".to_owned(),
|
|
..QueryActiveAlarmsRequest::default()
|
|
})
|
|
.await
|
|
.unwrap();
|
|
|
|
let first = stream.next().await.unwrap().unwrap();
|
|
assert_eq!(first.alarm_full_reference, "Tank01.Level.HiHi");
|
|
}
|
|
|
|
#[test]
|
|
fn value_conversion_fixtures_keep_typed_projection_and_raw_metadata() {
|
|
let fixture = behavior_fixture("values/value-conversion-cases.json");
|
|
let cases = fixture["cases"].as_array().unwrap();
|
|
|
|
let int64_case = case_by_id(cases, "int64.large");
|
|
let int64_value = ClientMxValue::from_proto(MxValue {
|
|
data_type: MxDataType::Integer as i32,
|
|
variant_type: "VT_I8".to_owned(),
|
|
kind: Some(Kind::Int64Value(
|
|
int64_case["value"]["int64Value"]
|
|
.as_str()
|
|
.unwrap()
|
|
.parse()
|
|
.unwrap(),
|
|
)),
|
|
..MxValue::default()
|
|
});
|
|
assert_eq!(
|
|
int64_value.projection(),
|
|
&MxValueProjection::Int64(9_223_372_036_854_770_000)
|
|
);
|
|
|
|
let raw_case = case_by_id(cases, "raw-fallback.variant");
|
|
let raw_value = ClientMxValue::from_proto(MxValue {
|
|
data_type: MxDataType::Unknown as i32,
|
|
variant_type: "VT_RECORD".to_owned(),
|
|
raw_diagnostic: raw_case["value"]["rawDiagnostic"]
|
|
.as_str()
|
|
.unwrap()
|
|
.to_owned(),
|
|
raw_data_type: raw_case["value"]["rawDataType"].as_i64().unwrap() as i32,
|
|
kind: Some(Kind::RawValue(vec![1, 2, 3, 4, 5])),
|
|
..MxValue::default()
|
|
});
|
|
assert_eq!(
|
|
raw_value.projection(),
|
|
&MxValueProjection::Raw(vec![1, 2, 3, 4, 5])
|
|
);
|
|
assert_eq!(raw_value.raw().raw_data_type, 32767);
|
|
assert!(raw_value.raw().raw_diagnostic.contains("No lossless"));
|
|
}
|
|
|
|
#[test]
|
|
fn status_conversion_fixtures_preserve_raw_fields() {
|
|
let fixture = behavior_fixture("statuses/status-conversion-cases.json");
|
|
let cases = fixture["cases"].as_array().unwrap();
|
|
let raw_case = case_by_id(cases, "raw-unknown-category");
|
|
let status = MxStatus::from_proto(MxStatusProxy {
|
|
success: raw_case["status"]["success"].as_i64().unwrap() as i32,
|
|
category: MxStatusCategory::Unknown as i32,
|
|
detected_by: MxStatusSource::Unknown as i32,
|
|
detail: raw_case["status"]["detail"].as_i64().unwrap() as i32,
|
|
raw_category: raw_case["status"]["rawCategory"].as_i64().unwrap() as i32,
|
|
raw_detected_by: raw_case["status"]["rawDetectedBy"].as_i64().unwrap() as i32,
|
|
diagnostic_text: raw_case["status"]["diagnosticText"]
|
|
.as_str()
|
|
.unwrap()
|
|
.to_owned(),
|
|
});
|
|
|
|
assert_eq!(status.success(), 0);
|
|
assert_eq!(status.category(), Some(MxStatusCategory::Unknown));
|
|
assert_eq!(status.raw_category(), 99);
|
|
assert_eq!(status.raw_detected_by(), 77);
|
|
assert!(status.diagnostic_text().contains("preserved"));
|
|
}
|
|
|
|
#[test]
|
|
fn authentication_and_authorization_statuses_are_distinct_and_redacted() {
|
|
let auth = Error::from(Status::unauthenticated(
|
|
"invalid API key mxgw_visible_secret",
|
|
));
|
|
let denied = Error::from(Status::permission_denied("missing scope mxaccess.write"));
|
|
|
|
assert!(matches!(auth, Error::Authentication { .. }));
|
|
assert!(matches!(denied, Error::Authorization { .. }));
|
|
assert!(!auth.to_string().contains("visible_secret"));
|
|
}
|
|
|
|
#[test]
|
|
fn command_error_display_keeps_raw_reply_accessible() {
|
|
let reply = mxaccess_failure_reply();
|
|
let error = CommandError::new(reply.clone());
|
|
|
|
assert_eq!(error.reply().hresult, Some(-2147220992));
|
|
assert!(error.to_string().contains("MxaccessFailure"));
|
|
}
|
|
|
|
// ---- Client.Rust-022 / 024 regression coverage ---------------------------
|
|
|
|
#[tokio::test]
|
|
async fn register_returns_malformed_reply_when_ok_reply_has_no_payload() {
|
|
let state = Arc::new(FakeState::default());
|
|
*state.invoke_override.lock().await = Some(InvokeOverride::OkWithoutPayload);
|
|
let endpoint = spawn_fake_gateway(state.clone()).await;
|
|
let client = GatewayClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
let session = client.session("session-fixture");
|
|
|
|
let error = session.register("client").await.unwrap_err();
|
|
|
|
assert!(
|
|
matches!(error, Error::MalformedReply { .. }),
|
|
"expected MalformedReply, got {error:?}"
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn add_item_returns_malformed_reply_when_ok_reply_has_no_payload() {
|
|
let state = Arc::new(FakeState::default());
|
|
*state.invoke_override.lock().await = Some(InvokeOverride::OkWithoutPayload);
|
|
let endpoint = spawn_fake_gateway(state.clone()).await;
|
|
let client = GatewayClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
let session = client.session("session-fixture");
|
|
|
|
let error = session.add_item(12, "Plant.Area.Tag").await.unwrap_err();
|
|
|
|
assert!(
|
|
matches!(error, Error::MalformedReply { .. }),
|
|
"expected MalformedReply, got {error:?}"
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn add_item2_returns_malformed_reply_when_ok_reply_has_no_payload() {
|
|
let state = Arc::new(FakeState::default());
|
|
*state.invoke_override.lock().await = Some(InvokeOverride::OkWithoutPayload);
|
|
let endpoint = spawn_fake_gateway(state.clone()).await;
|
|
let client = GatewayClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
let session = client.session("session-fixture");
|
|
|
|
let error = session
|
|
.add_item2(12, "Plant.Area.Tag", "ctx")
|
|
.await
|
|
.unwrap_err();
|
|
|
|
assert!(
|
|
matches!(error, Error::MalformedReply { .. }),
|
|
"expected MalformedReply, got {error:?}"
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn subscribe_bulk_returns_malformed_reply_on_mismatched_payload_arm() {
|
|
let state = Arc::new(FakeState::default());
|
|
*state.invoke_override.lock().await = Some(InvokeOverride::OkWithMismatchedPayload);
|
|
let endpoint = spawn_fake_gateway(state.clone()).await;
|
|
let client = GatewayClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
let session = client.session("session-fixture");
|
|
|
|
let error = session
|
|
.subscribe_bulk(12, vec!["Area001.Pump001.Speed".to_owned()])
|
|
.await
|
|
.unwrap_err();
|
|
|
|
assert!(
|
|
matches!(error, Error::MalformedReply { .. }),
|
|
"expected MalformedReply, got {error:?}"
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn read_bulk_returns_malformed_reply_on_mismatched_payload_arm() {
|
|
let state = Arc::new(FakeState::default());
|
|
*state.invoke_override.lock().await = Some(InvokeOverride::OkWithMismatchedPayload);
|
|
let endpoint = spawn_fake_gateway(state.clone()).await;
|
|
let client = GatewayClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
let session = client.session("session-fixture");
|
|
|
|
let error = session
|
|
.read_bulk(12, &["Area001.Pump001.Speed".to_owned()], 1000)
|
|
.await
|
|
.unwrap_err();
|
|
|
|
assert!(
|
|
matches!(error, Error::MalformedReply { .. }),
|
|
"expected MalformedReply, got {error:?}"
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn write_bulk_returns_malformed_reply_on_mismatched_payload_arm() {
|
|
let state = Arc::new(FakeState::default());
|
|
*state.invoke_override.lock().await = Some(InvokeOverride::OkWithMismatchedPayload);
|
|
let endpoint = spawn_fake_gateway(state.clone()).await;
|
|
let client = GatewayClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
let session = client.session("session-fixture");
|
|
|
|
let error = session
|
|
.write_bulk(
|
|
12,
|
|
vec![WriteBulkEntry {
|
|
item_handle: 34,
|
|
value: Some(ClientMxValue::int32(1).into_proto()),
|
|
user_id: 0,
|
|
}],
|
|
)
|
|
.await
|
|
.unwrap_err();
|
|
|
|
assert!(
|
|
matches!(error, Error::MalformedReply { .. }),
|
|
"expected MalformedReply, got {error:?}"
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn unary_invoke_maps_status_unavailable_to_error_unavailable() {
|
|
let state = Arc::new(FakeState::default());
|
|
*state.invoke_override.lock().await =
|
|
Some(InvokeOverride::Unavailable("gateway restarting".to_owned()));
|
|
let endpoint = spawn_fake_gateway(state.clone()).await;
|
|
let client = GatewayClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
let session = client.session("session-fixture");
|
|
|
|
let error = session.add_item(12, "Plant.Area.Tag").await.unwrap_err();
|
|
|
|
assert!(
|
|
matches!(error, Error::Unavailable { .. }),
|
|
"expected Unavailable, got {error:?}"
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn read_bulk_round_trips_through_the_fake_gateway() {
|
|
let state = Arc::new(FakeState::default());
|
|
let endpoint = spawn_fake_gateway(state.clone()).await;
|
|
let client = GatewayClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
let session = client.session("session-fixture");
|
|
|
|
let results = session
|
|
.read_bulk(12, &["Area001.Pump001.Speed".to_owned()], 1000)
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(results.len(), 1);
|
|
assert!(results[0].was_successful);
|
|
assert!(results[0].was_cached);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn write_bulk_round_trips_through_the_fake_gateway() {
|
|
let state = Arc::new(FakeState::default());
|
|
let endpoint = spawn_fake_gateway(state.clone()).await;
|
|
let client = GatewayClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
let session = client.session("session-fixture");
|
|
|
|
let results = session
|
|
.write_bulk(
|
|
12,
|
|
vec![WriteBulkEntry {
|
|
item_handle: 34,
|
|
value: Some(ClientMxValue::int32(1).into_proto()),
|
|
user_id: 0,
|
|
}],
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(results.len(), 1);
|
|
assert!(results[0].was_successful);
|
|
let last_command = state.last_command_kind.lock().await;
|
|
assert_eq!(*last_command, Some(MxCommandKind::WriteBulk as i32));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn write2_bulk_round_trips_through_the_fake_gateway() {
|
|
let state = Arc::new(FakeState::default());
|
|
let endpoint = spawn_fake_gateway(state.clone()).await;
|
|
let client = GatewayClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
let session = client.session("session-fixture");
|
|
|
|
let results = session
|
|
.write2_bulk(
|
|
12,
|
|
vec![Write2BulkEntry {
|
|
item_handle: 34,
|
|
value: Some(ClientMxValue::int32(1).into_proto()),
|
|
timestamp_value: Some(ClientMxValue::string("2026-05-24T00:00:00Z").into_proto()),
|
|
user_id: 0,
|
|
}],
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(results.len(), 1);
|
|
assert!(results[0].was_successful);
|
|
let last_command = state.last_command_kind.lock().await;
|
|
assert_eq!(*last_command, Some(MxCommandKind::Write2Bulk as i32));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn write_secured_bulk_round_trips_through_the_fake_gateway() {
|
|
let state = Arc::new(FakeState::default());
|
|
let endpoint = spawn_fake_gateway(state.clone()).await;
|
|
let client = GatewayClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
let session = client.session("session-fixture");
|
|
|
|
let results = session
|
|
.write_secured_bulk(
|
|
12,
|
|
vec![WriteSecuredBulkEntry {
|
|
item_handle: 34,
|
|
value: Some(ClientMxValue::int32(1).into_proto()),
|
|
current_user_id: 0,
|
|
verifier_user_id: 0,
|
|
}],
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(results.len(), 1);
|
|
assert!(results[0].was_successful);
|
|
let last_command = state.last_command_kind.lock().await;
|
|
assert_eq!(*last_command, Some(MxCommandKind::WriteSecuredBulk as i32));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn write_secured2_bulk_round_trips_through_the_fake_gateway() {
|
|
let state = Arc::new(FakeState::default());
|
|
let endpoint = spawn_fake_gateway(state.clone()).await;
|
|
let client = GatewayClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
let session = client.session("session-fixture");
|
|
|
|
let results = session
|
|
.write_secured2_bulk(
|
|
12,
|
|
vec![WriteSecured2BulkEntry {
|
|
item_handle: 34,
|
|
value: Some(ClientMxValue::int32(1).into_proto()),
|
|
timestamp_value: Some(ClientMxValue::string("2026-05-24T00:00:00Z").into_proto()),
|
|
current_user_id: 0,
|
|
verifier_user_id: 0,
|
|
}],
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(results.len(), 1);
|
|
assert!(results[0].was_successful);
|
|
let last_command = state.last_command_kind.lock().await;
|
|
assert_eq!(*last_command, Some(MxCommandKind::WriteSecured2Bulk as i32));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn stream_alarms_emits_snapshot_then_complete_then_transition_in_order() {
|
|
let state = Arc::new(FakeState::default());
|
|
*state.stream_alarms_script.lock().await = Some(vec![
|
|
AlarmFeedMessage {
|
|
payload: Some(alarm_feed_message::Payload::ActiveAlarm(
|
|
ActiveAlarmSnapshot {
|
|
alarm_full_reference: "Tank01.Level.HiHi".to_owned(),
|
|
current_state: AlarmConditionState::Active as i32,
|
|
..ActiveAlarmSnapshot::default()
|
|
},
|
|
)),
|
|
},
|
|
AlarmFeedMessage {
|
|
payload: Some(alarm_feed_message::Payload::SnapshotComplete(true)),
|
|
},
|
|
AlarmFeedMessage {
|
|
payload: Some(alarm_feed_message::Payload::Transition(
|
|
OnAlarmTransitionEvent {
|
|
alarm_full_reference: "Tank01.Level.HiHi".to_owned(),
|
|
transition_kind: AlarmTransitionKind::Raise as i32,
|
|
..OnAlarmTransitionEvent::default()
|
|
},
|
|
)),
|
|
},
|
|
]);
|
|
let endpoint = spawn_fake_gateway(state.clone()).await;
|
|
let client = GatewayClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
|
|
let mut stream = client
|
|
.stream_alarms(StreamAlarmsRequest {
|
|
client_correlation_id: next_correlation_id("test-stream-alarms"),
|
|
alarm_filter_prefix: String::new(),
|
|
})
|
|
.await
|
|
.unwrap();
|
|
|
|
let first = stream.next().await.unwrap().unwrap();
|
|
let second = stream.next().await.unwrap().unwrap();
|
|
let third = stream.next().await.unwrap().unwrap();
|
|
|
|
assert!(matches!(
|
|
first.payload,
|
|
Some(alarm_feed_message::Payload::ActiveAlarm(_))
|
|
));
|
|
assert!(matches!(
|
|
second.payload,
|
|
Some(alarm_feed_message::Payload::SnapshotComplete(true))
|
|
));
|
|
assert!(matches!(
|
|
third.payload,
|
|
Some(alarm_feed_message::Payload::Transition(_))
|
|
));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn cli_subcommands_propagate_unique_correlation_ids_from_next_correlation_id() {
|
|
// The CLI's `stream-alarms` and `acknowledge-alarm` paths used to
|
|
// hard-code their correlation ids (Client.Rust-023). Verify the
|
|
// resolution end-to-end through `next_correlation_id`: every call
|
|
// observed at the fake gateway has a unique id that embeds the
|
|
// `cli-...` label, so concurrent CLI smokes can tell collisions apart.
|
|
let state = Arc::new(FakeState::default());
|
|
let endpoint = spawn_fake_gateway(state.clone()).await;
|
|
let client = GatewayClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
|
|
let first_corr = next_correlation_id("cli-stream-alarms");
|
|
let _ = client
|
|
.stream_alarms(StreamAlarmsRequest {
|
|
client_correlation_id: first_corr.clone(),
|
|
alarm_filter_prefix: String::new(),
|
|
})
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(
|
|
*state.last_correlation_id.lock().await,
|
|
Some(first_corr.clone())
|
|
);
|
|
|
|
let second_corr = next_correlation_id("cli-stream-alarms");
|
|
assert_ne!(first_corr, second_corr);
|
|
assert!(second_corr.contains("cli-stream-alarms"));
|
|
|
|
let third_corr = next_correlation_id("cli-acknowledge-alarm");
|
|
let _ = client
|
|
.acknowledge_alarm(AcknowledgeAlarmRequest {
|
|
client_correlation_id: third_corr.clone(),
|
|
alarm_full_reference: "Tank01.Level.HiHi".to_owned(),
|
|
comment: String::new(),
|
|
operator_user: String::new(),
|
|
})
|
|
.await
|
|
.unwrap();
|
|
assert_eq!(*state.last_correlation_id.lock().await, Some(third_corr));
|
|
}
|
|
|
|
#[derive(Default)]
|
|
struct FakeState {
|
|
authorization: Mutex<Option<String>>,
|
|
last_command_kind: Mutex<Option<i32>>,
|
|
last_correlation_id: Mutex<Option<String>>,
|
|
stream_dropped: Arc<AtomicBool>,
|
|
/// Optional per-test override that pins the fake's `Invoke` handler to
|
|
/// a specific reply shape (or `Err(Status)`). The default of `None`
|
|
/// keeps the existing happy-path dispatcher.
|
|
invoke_override: Mutex<Option<InvokeOverride>>,
|
|
/// Optional per-test override that pins the fake's `StreamAlarms`
|
|
/// handler to emit a synthetic ConditionRefresh -> snapshot_complete
|
|
/// -> transition sequence.
|
|
stream_alarms_script: Mutex<Option<Vec<AlarmFeedMessage>>>,
|
|
}
|
|
|
|
/// Per-test override for the fake's `Invoke` handler.
|
|
#[allow(dead_code)]
|
|
enum InvokeOverride {
|
|
/// Reply with `protocol_status = Ok` and no `payload` set.
|
|
OkWithoutPayload,
|
|
/// Reply with `protocol_status = Ok` and a deliberately wrong payload
|
|
/// arm — e.g. an `AddItemReply` body when the caller invoked a bulk
|
|
/// command. The variant carries the kind to recognise in tests but the
|
|
/// reply itself is the mismatched-payload shape.
|
|
OkWithMismatchedPayload,
|
|
/// Fail the unary call with `Status::unavailable(...)` so the client's
|
|
/// `Code::Unavailable` -> `Error::Unavailable` mapping is exercised.
|
|
Unavailable(String),
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
struct FakeGateway {
|
|
state: Arc<FakeState>,
|
|
}
|
|
|
|
#[tonic::async_trait]
|
|
impl MxAccessGateway for FakeGateway {
|
|
async fn open_session(
|
|
&self,
|
|
request: Request<OpenSessionRequest>,
|
|
) -> Result<Response<OpenSessionReply>, Status> {
|
|
*self.state.authorization.lock().await = request
|
|
.metadata()
|
|
.get("authorization")
|
|
.and_then(|value| value.to_str().ok())
|
|
.map(str::to_owned);
|
|
|
|
Ok(Response::new(OpenSessionReply {
|
|
session_id: "session-fixture".to_owned(),
|
|
backend_name: "fake".to_owned(),
|
|
worker_process_id: 1234,
|
|
worker_protocol_version: 1,
|
|
gateway_protocol_version: 1,
|
|
protocol_status: Some(ok_status("opened")),
|
|
..OpenSessionReply::default()
|
|
}))
|
|
}
|
|
|
|
async fn close_session(
|
|
&self,
|
|
request: Request<CloseSessionRequest>,
|
|
) -> Result<Response<CloseSessionReply>, Status> {
|
|
Ok(Response::new(CloseSessionReply {
|
|
session_id: request.into_inner().session_id,
|
|
final_state: SessionState::Closed as i32,
|
|
protocol_status: Some(ok_status("closed")),
|
|
}))
|
|
}
|
|
|
|
async fn invoke(
|
|
&self,
|
|
request: Request<
|
|
zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::MxCommandRequest,
|
|
>,
|
|
) -> Result<Response<MxCommandReply>, Status> {
|
|
let request = request.into_inner();
|
|
let kind = request
|
|
.command
|
|
.as_ref()
|
|
.map(|command| command.kind)
|
|
.unwrap_or_default();
|
|
*self.state.last_command_kind.lock().await = Some(kind);
|
|
*self.state.last_correlation_id.lock().await = Some(request.client_correlation_id.clone());
|
|
|
|
// Honour any per-test override before falling through to the
|
|
// happy-path dispatcher.
|
|
if let Some(override_) = self.state.invoke_override.lock().await.take() {
|
|
return match override_ {
|
|
InvokeOverride::OkWithoutPayload => Ok(Response::new(MxCommandReply {
|
|
session_id: request.session_id,
|
|
correlation_id: "fake-correlation".to_owned(),
|
|
kind,
|
|
protocol_status: Some(ok_status("command ok")),
|
|
payload: None,
|
|
..MxCommandReply::default()
|
|
})),
|
|
InvokeOverride::OkWithMismatchedPayload => Ok(Response::new(MxCommandReply {
|
|
session_id: request.session_id,
|
|
correlation_id: "fake-correlation".to_owned(),
|
|
kind,
|
|
protocol_status: Some(ok_status("command ok")),
|
|
// Deliberately the wrong payload arm — `AddItemReply`
|
|
// for whatever command was actually invoked.
|
|
payload: Some(mx_command_reply::Payload::AddItem(AddItemReply {
|
|
item_handle: 99,
|
|
})),
|
|
..MxCommandReply::default()
|
|
})),
|
|
InvokeOverride::Unavailable(message) => Err(Status::unavailable(message)),
|
|
};
|
|
}
|
|
|
|
if kind == MxCommandKind::Write as i32 {
|
|
return Ok(Response::new(mxaccess_failure_reply()));
|
|
}
|
|
|
|
if kind == MxCommandKind::SubscribeBulk as i32 {
|
|
return Ok(Response::new(MxCommandReply {
|
|
session_id: request.session_id,
|
|
correlation_id: "fake-correlation".to_owned(),
|
|
kind,
|
|
protocol_status: Some(ok_status("command ok")),
|
|
payload: Some(mx_command_reply::Payload::SubscribeBulk(
|
|
BulkSubscribeReply {
|
|
results: vec![SubscribeResult {
|
|
server_handle: 12,
|
|
tag_address: "Area001.Pump001.Speed".to_owned(),
|
|
item_handle: 34,
|
|
was_successful: true,
|
|
error_message: String::new(),
|
|
}],
|
|
},
|
|
)),
|
|
..MxCommandReply::default()
|
|
}));
|
|
}
|
|
|
|
if kind == MxCommandKind::Register as i32 {
|
|
return Ok(Response::new(MxCommandReply {
|
|
session_id: request.session_id,
|
|
correlation_id: "fake-correlation".to_owned(),
|
|
kind,
|
|
protocol_status: Some(ok_status("command ok")),
|
|
payload: Some(mx_command_reply::Payload::Register(RegisterReply {
|
|
server_handle: 12,
|
|
})),
|
|
..MxCommandReply::default()
|
|
}));
|
|
}
|
|
|
|
if kind == MxCommandKind::AddItem2 as i32 {
|
|
return Ok(Response::new(MxCommandReply {
|
|
session_id: request.session_id,
|
|
correlation_id: "fake-correlation".to_owned(),
|
|
kind,
|
|
protocol_status: Some(ok_status("command ok")),
|
|
payload: Some(mx_command_reply::Payload::AddItem2(AddItem2Reply {
|
|
item_handle: 56,
|
|
})),
|
|
..MxCommandReply::default()
|
|
}));
|
|
}
|
|
|
|
if kind == MxCommandKind::ReadBulk as i32 {
|
|
return Ok(Response::new(MxCommandReply {
|
|
session_id: request.session_id,
|
|
correlation_id: "fake-correlation".to_owned(),
|
|
kind,
|
|
protocol_status: Some(ok_status("command ok")),
|
|
payload: Some(mx_command_reply::Payload::ReadBulk(BulkReadReply {
|
|
results: vec![BulkReadResult {
|
|
server_handle: 12,
|
|
tag_address: "Area001.Pump001.Speed".to_owned(),
|
|
item_handle: 34,
|
|
was_successful: true,
|
|
was_cached: true,
|
|
..BulkReadResult::default()
|
|
}],
|
|
})),
|
|
..MxCommandReply::default()
|
|
}));
|
|
}
|
|
|
|
if kind == MxCommandKind::WriteBulk as i32 {
|
|
return Ok(Response::new(write_bulk_reply_for(
|
|
request.session_id,
|
|
kind,
|
|
mx_command_reply::Payload::WriteBulk(BulkWriteReply {
|
|
results: vec![bulk_write_result_ok(12, 34)],
|
|
}),
|
|
)));
|
|
}
|
|
|
|
if kind == MxCommandKind::Write2Bulk as i32 {
|
|
return Ok(Response::new(write_bulk_reply_for(
|
|
request.session_id,
|
|
kind,
|
|
mx_command_reply::Payload::Write2Bulk(BulkWriteReply {
|
|
results: vec![bulk_write_result_ok(12, 34)],
|
|
}),
|
|
)));
|
|
}
|
|
|
|
if kind == MxCommandKind::WriteSecuredBulk as i32 {
|
|
return Ok(Response::new(write_bulk_reply_for(
|
|
request.session_id,
|
|
kind,
|
|
mx_command_reply::Payload::WriteSecuredBulk(BulkWriteReply {
|
|
results: vec![bulk_write_result_ok(12, 34)],
|
|
}),
|
|
)));
|
|
}
|
|
|
|
if kind == MxCommandKind::WriteSecured2Bulk as i32 {
|
|
return Ok(Response::new(write_bulk_reply_for(
|
|
request.session_id,
|
|
kind,
|
|
mx_command_reply::Payload::WriteSecured2Bulk(BulkWriteReply {
|
|
results: vec![bulk_write_result_ok(12, 34)],
|
|
}),
|
|
)));
|
|
}
|
|
|
|
Ok(Response::new(MxCommandReply {
|
|
session_id: request.session_id,
|
|
correlation_id: "fake-correlation".to_owned(),
|
|
kind,
|
|
protocol_status: Some(ok_status("command ok")),
|
|
payload: Some(mx_command_reply::Payload::AddItem(AddItemReply {
|
|
item_handle: 34,
|
|
})),
|
|
..MxCommandReply::default()
|
|
}))
|
|
}
|
|
|
|
type StreamEventsStream = DropAwareStream;
|
|
|
|
async fn stream_events(
|
|
&self,
|
|
_request: Request<StreamEventsRequest>,
|
|
) -> Result<Response<Self::StreamEventsStream>, Status> {
|
|
let (sender, receiver) = mpsc::channel(4);
|
|
sender.send(Ok(event(1))).await.unwrap();
|
|
sender.send(Ok(event(2))).await.unwrap();
|
|
|
|
Ok(Response::new(DropAwareStream {
|
|
inner: ReceiverStream::new(receiver),
|
|
dropped: self.state.stream_dropped.clone(),
|
|
}))
|
|
}
|
|
|
|
async fn acknowledge_alarm(
|
|
&self,
|
|
request: Request<AcknowledgeAlarmRequest>,
|
|
) -> Result<Response<AcknowledgeAlarmReply>, Status> {
|
|
*self.state.last_correlation_id.lock().await =
|
|
Some(request.into_inner().client_correlation_id);
|
|
Ok(Response::new(AcknowledgeAlarmReply {
|
|
correlation_id: "corr-1".to_owned(),
|
|
protocol_status: Some(ok_status("ack ok")),
|
|
status: Some(MxStatusProxy {
|
|
success: 1,
|
|
category: MxStatusCategory::Ok as i32,
|
|
detected_by: MxStatusSource::RespondingLmx as i32,
|
|
..MxStatusProxy::default()
|
|
}),
|
|
..AcknowledgeAlarmReply::default()
|
|
}))
|
|
}
|
|
|
|
type StreamAlarmsStream =
|
|
Pin<Box<dyn Stream<Item = Result<AlarmFeedMessage, Status>> + Send + 'static>>;
|
|
|
|
async fn stream_alarms(
|
|
&self,
|
|
request: Request<StreamAlarmsRequest>,
|
|
) -> Result<Response<Self::StreamAlarmsStream>, Status> {
|
|
*self.state.last_correlation_id.lock().await =
|
|
Some(request.into_inner().client_correlation_id);
|
|
let script = self.state.stream_alarms_script.lock().await.take();
|
|
let (sender, receiver) =
|
|
mpsc::channel::<Result<AlarmFeedMessage, Status>>(script.as_ref().map_or(1, Vec::len));
|
|
if let Some(messages) = script {
|
|
for message in messages {
|
|
sender.send(Ok(message)).await.unwrap();
|
|
}
|
|
}
|
|
let stream = ReceiverStream::new(receiver);
|
|
Ok(Response::new(Box::pin(stream)))
|
|
}
|
|
|
|
type QueryActiveAlarmsStream =
|
|
Pin<Box<dyn Stream<Item = Result<ActiveAlarmSnapshot, Status>> + Send + 'static>>;
|
|
|
|
async fn query_active_alarms(
|
|
&self,
|
|
_request: Request<QueryActiveAlarmsRequest>,
|
|
) -> Result<Response<Self::QueryActiveAlarmsStream>, Status> {
|
|
let (sender, receiver) = mpsc::channel(4);
|
|
sender
|
|
.send(Ok(ActiveAlarmSnapshot {
|
|
alarm_full_reference: "Tank01.Level.HiHi".to_owned(),
|
|
..ActiveAlarmSnapshot::default()
|
|
}))
|
|
.await
|
|
.unwrap();
|
|
let stream = ReceiverStream::new(receiver);
|
|
Ok(Response::new(Box::pin(stream)))
|
|
}
|
|
}
|
|
|
|
struct DropAwareStream {
|
|
inner: ReceiverStream<Result<MxEvent, Status>>,
|
|
dropped: Arc<AtomicBool>,
|
|
}
|
|
|
|
impl Stream for DropAwareStream {
|
|
type Item = Result<MxEvent, Status>;
|
|
|
|
fn poll_next(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
|
Pin::new(&mut self.inner).poll_next(context)
|
|
}
|
|
}
|
|
|
|
impl Drop for DropAwareStream {
|
|
fn drop(&mut self) {
|
|
self.dropped.store(true, Ordering::SeqCst);
|
|
}
|
|
}
|
|
|
|
async fn spawn_fake_gateway(state: Arc<FakeState>) -> String {
|
|
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
|
let address = listener.local_addr().unwrap();
|
|
let incoming = TcpListenerStream::new(listener);
|
|
let service = MxAccessGatewayServer::new(FakeGateway { state });
|
|
tokio::spawn(async move {
|
|
Server::builder()
|
|
.add_service(service)
|
|
.serve_with_incoming(incoming)
|
|
.await
|
|
.unwrap();
|
|
});
|
|
|
|
format!("http://{address}")
|
|
}
|
|
|
|
fn write_bulk_reply_for(
|
|
session_id: String,
|
|
kind: i32,
|
|
payload: mx_command_reply::Payload,
|
|
) -> MxCommandReply {
|
|
MxCommandReply {
|
|
session_id,
|
|
correlation_id: "fake-correlation".to_owned(),
|
|
kind,
|
|
protocol_status: Some(ok_status("command ok")),
|
|
payload: Some(payload),
|
|
..MxCommandReply::default()
|
|
}
|
|
}
|
|
|
|
fn bulk_write_result_ok(server_handle: i32, item_handle: i32) -> BulkWriteResult {
|
|
BulkWriteResult {
|
|
server_handle,
|
|
item_handle,
|
|
was_successful: true,
|
|
hresult: Some(0),
|
|
statuses: Vec::new(),
|
|
error_message: String::new(),
|
|
}
|
|
}
|
|
|
|
fn ok_status(message: &str) -> ProtocolStatus {
|
|
ProtocolStatus {
|
|
code: ProtocolStatusCode::Ok as i32,
|
|
message: message.to_owned(),
|
|
}
|
|
}
|
|
|
|
fn mxaccess_failure_reply() -> MxCommandReply {
|
|
MxCommandReply {
|
|
session_id: "session-fixture".to_owned(),
|
|
correlation_id: "gateway-correlation-write-1".to_owned(),
|
|
kind: MxCommandKind::Write as i32,
|
|
protocol_status: Some(ProtocolStatus {
|
|
code: ProtocolStatusCode::MxaccessFailure as i32,
|
|
message: "MXAccess rejected the write.".to_owned(),
|
|
}),
|
|
hresult: Some(-2147220992),
|
|
statuses: vec![
|
|
MxStatusProxy {
|
|
success: 0,
|
|
category: MxStatusCategory::SecurityError as i32,
|
|
detected_by: MxStatusSource::RespondingLmx as i32,
|
|
detail: 321,
|
|
raw_category: 8,
|
|
raw_detected_by: 3,
|
|
diagnostic_text: "Write denied by provider security.".to_owned(),
|
|
},
|
|
MxStatusProxy {
|
|
success: 0,
|
|
category: MxStatusCategory::OperationalError as i32,
|
|
detected_by: MxStatusSource::RespondingNmx as i32,
|
|
detail: 902,
|
|
raw_category: 7,
|
|
raw_detected_by: 5,
|
|
diagnostic_text: "Provider rejected the item state.".to_owned(),
|
|
},
|
|
],
|
|
..MxCommandReply::default()
|
|
}
|
|
}
|
|
|
|
fn event(sequence: u64) -> MxEvent {
|
|
MxEvent {
|
|
family: MxEventFamily::OnDataChange as i32,
|
|
session_id: "session-fixture".to_owned(),
|
|
worker_sequence: sequence,
|
|
..MxEvent::default()
|
|
}
|
|
}
|
|
|
|
fn behavior_fixture(path: &str) -> Value {
|
|
let path = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
|
|
.join("../proto/fixtures/behavior")
|
|
.join(path);
|
|
let data = std::fs::read_to_string(&path).unwrap();
|
|
serde_json::from_str(&data).unwrap()
|
|
}
|
|
|
|
fn case_by_id<'a>(cases: &'a [Value], id: &str) -> &'a Value {
|
|
cases
|
|
.iter()
|
|
.find(|case| case["id"].as_str() == Some(id))
|
|
.unwrap_or_else(|| panic!("missing fixture case {id}"))
|
|
}
|