Resolve Client.Rust-022..029: MalformedReply, correlation ids, clippy

Client.Rust-022  Restored Error::MalformedReply for register / add_item /
                 add_item2 and the bulk-subscribe / read-bulk / write-bulk
                 dispatch arms so malformed-but-OK replies fail loudly
                 instead of returning Vec::new().
Client.Rust-023  Restored next_correlation_id and routed every CLI close /
                 stream-alarms / acknowledge-alarm / bench-read-bulk call
                 through it so each call carries a unique opaque token.
Client.Rust-024  Added round-trip tests for read_bulk / write_bulk /
                 write2_bulk / write_secured_bulk / write_secured2_bulk
                 plus stream_alarms and percentile_summary unit tests.
Client.Rust-025  RustClientDesign.md re-synced — new bulk SDK, alarms
                 surface, Error variants, CLI command list, and the
                 Windows stack workaround.
Client.Rust-026  Session::read_bulk now borrows a tag slice; bench-read-
                 bulk binds tags once outside the warm-up / steady-state
                 loops.
Client.Rust-027  .cargo/config.toml selector tightened to
                 cfg(all(windows, target_env = "msvc")) and comment
                 rewritten to match reality (release + debug ship the
                 8 MB reservation).
Client.Rust-028  run_batch removed the empty-line break; stdin EOF is
                 the only terminator.
Client.Rust-029  Re-applied Client.Rust-001 / 002 / 012 — added the
                 missing doc comments, renamed BulkReplyKind variants,
                 and replaced the clone-on-copy with a deref under lock
                 so cargo clippy -D warnings is clean.

All resolved at 2026-05-24; cargo fmt + check + clippy + test all green
(55 tests).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-24 08:50:15 -04:00
parent 82996aa8e6
commit 4a0f88b17d
10 changed files with 922 additions and 120 deletions
+569 -11
View File
@@ -20,16 +20,19 @@ use zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::mx_access_gatew
use zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::mx_command_reply;
use zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::mx_value::Kind;
use zb_mom_ww_mxgateway_client::generated::mxaccess_gateway::v1::{
AcknowledgeAlarmReply, AcknowledgeAlarmRequest, ActiveAlarmSnapshot, AddItemReply,
AlarmFeedMessage, BulkSubscribeReply, CloseSessionReply, CloseSessionRequest, MxCommandKind,
MxCommandReply, MxDataType, MxEvent, MxEventFamily, MxStatusCategory, MxStatusProxy,
MxStatusSource, MxValue, OpenSessionReply, OpenSessionRequest, ProtocolStatus,
ProtocolStatusCode, QueryActiveAlarmsRequest, SessionState, StreamAlarmsRequest,
StreamEventsRequest, SubscribeResult,
alarm_feed_message, AcknowledgeAlarmReply, AcknowledgeAlarmRequest, ActiveAlarmSnapshot,
AddItem2Reply, AddItemReply, AlarmConditionState, AlarmFeedMessage, AlarmTransitionKind,
BulkReadReply, BulkReadResult, BulkSubscribeReply, BulkWriteReply, BulkWriteResult,
CloseSessionReply, CloseSessionRequest, MxCommandKind, MxCommandReply, MxDataType, MxEvent,
MxEventFamily, MxStatusCategory, MxStatusProxy, MxStatusSource, MxValue,
OnAlarmTransitionEvent, OpenSessionReply, OpenSessionRequest, ProtocolStatus,
ProtocolStatusCode, QueryActiveAlarmsRequest, RegisterReply, SessionState, StreamAlarmsRequest,
StreamEventsRequest, SubscribeResult, Write2BulkEntry, WriteBulkEntry, WriteSecured2BulkEntry,
WriteSecuredBulkEntry,
};
use zb_mom_ww_mxgateway_client::{
ApiKey, ClientOptions, CommandError, Error, GatewayClient, MxStatus, MxValue as ClientMxValue,
MxValueProjection,
next_correlation_id, ApiKey, ClientOptions, CommandError, Error, GatewayClient, MxStatus,
MxValue as ClientMxValue, MxValueProjection,
};
#[tokio::test]
@@ -272,11 +275,414 @@ fn command_error_display_keeps_raw_reply_accessible() {
assert!(error.to_string().contains("MxaccessFailure"));
}
// ---- Client.Rust-022 / 024 regression coverage ---------------------------
#[tokio::test]
async fn register_returns_malformed_reply_when_ok_reply_has_no_payload() {
let state = Arc::new(FakeState::default());
*state.invoke_override.lock().await = Some(InvokeOverride::OkWithoutPayload);
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let error = session.register("client").await.unwrap_err();
assert!(
matches!(error, Error::MalformedReply { .. }),
"expected MalformedReply, got {error:?}"
);
}
#[tokio::test]
async fn add_item_returns_malformed_reply_when_ok_reply_has_no_payload() {
let state = Arc::new(FakeState::default());
*state.invoke_override.lock().await = Some(InvokeOverride::OkWithoutPayload);
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let error = session.add_item(12, "Plant.Area.Tag").await.unwrap_err();
assert!(
matches!(error, Error::MalformedReply { .. }),
"expected MalformedReply, got {error:?}"
);
}
#[tokio::test]
async fn add_item2_returns_malformed_reply_when_ok_reply_has_no_payload() {
let state = Arc::new(FakeState::default());
*state.invoke_override.lock().await = Some(InvokeOverride::OkWithoutPayload);
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let error = session
.add_item2(12, "Plant.Area.Tag", "ctx")
.await
.unwrap_err();
assert!(
matches!(error, Error::MalformedReply { .. }),
"expected MalformedReply, got {error:?}"
);
}
#[tokio::test]
async fn subscribe_bulk_returns_malformed_reply_on_mismatched_payload_arm() {
let state = Arc::new(FakeState::default());
*state.invoke_override.lock().await = Some(InvokeOverride::OkWithMismatchedPayload);
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let error = session
.subscribe_bulk(12, vec!["Area001.Pump001.Speed".to_owned()])
.await
.unwrap_err();
assert!(
matches!(error, Error::MalformedReply { .. }),
"expected MalformedReply, got {error:?}"
);
}
#[tokio::test]
async fn read_bulk_returns_malformed_reply_on_mismatched_payload_arm() {
let state = Arc::new(FakeState::default());
*state.invoke_override.lock().await = Some(InvokeOverride::OkWithMismatchedPayload);
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let error = session
.read_bulk(12, &["Area001.Pump001.Speed".to_owned()], 1000)
.await
.unwrap_err();
assert!(
matches!(error, Error::MalformedReply { .. }),
"expected MalformedReply, got {error:?}"
);
}
#[tokio::test]
async fn write_bulk_returns_malformed_reply_on_mismatched_payload_arm() {
let state = Arc::new(FakeState::default());
*state.invoke_override.lock().await = Some(InvokeOverride::OkWithMismatchedPayload);
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let error = session
.write_bulk(
12,
vec![WriteBulkEntry {
item_handle: 34,
value: Some(ClientMxValue::int32(1).into_proto()),
user_id: 0,
}],
)
.await
.unwrap_err();
assert!(
matches!(error, Error::MalformedReply { .. }),
"expected MalformedReply, got {error:?}"
);
}
#[tokio::test]
async fn unary_invoke_maps_status_unavailable_to_error_unavailable() {
let state = Arc::new(FakeState::default());
*state.invoke_override.lock().await =
Some(InvokeOverride::Unavailable("gateway restarting".to_owned()));
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let error = session.add_item(12, "Plant.Area.Tag").await.unwrap_err();
assert!(
matches!(error, Error::Unavailable { .. }),
"expected Unavailable, got {error:?}"
);
}
#[tokio::test]
async fn read_bulk_round_trips_through_the_fake_gateway() {
let state = Arc::new(FakeState::default());
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let results = session
.read_bulk(12, &["Area001.Pump001.Speed".to_owned()], 1000)
.await
.unwrap();
assert_eq!(results.len(), 1);
assert!(results[0].was_successful);
assert!(results[0].was_cached);
}
#[tokio::test]
async fn write_bulk_round_trips_through_the_fake_gateway() {
let state = Arc::new(FakeState::default());
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let results = session
.write_bulk(
12,
vec![WriteBulkEntry {
item_handle: 34,
value: Some(ClientMxValue::int32(1).into_proto()),
user_id: 0,
}],
)
.await
.unwrap();
assert_eq!(results.len(), 1);
assert!(results[0].was_successful);
let last_command = state.last_command_kind.lock().await;
assert_eq!(*last_command, Some(MxCommandKind::WriteBulk as i32));
}
#[tokio::test]
async fn write2_bulk_round_trips_through_the_fake_gateway() {
let state = Arc::new(FakeState::default());
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let results = session
.write2_bulk(
12,
vec![Write2BulkEntry {
item_handle: 34,
value: Some(ClientMxValue::int32(1).into_proto()),
timestamp_value: Some(ClientMxValue::string("2026-05-24T00:00:00Z").into_proto()),
user_id: 0,
}],
)
.await
.unwrap();
assert_eq!(results.len(), 1);
assert!(results[0].was_successful);
let last_command = state.last_command_kind.lock().await;
assert_eq!(*last_command, Some(MxCommandKind::Write2Bulk as i32));
}
#[tokio::test]
async fn write_secured_bulk_round_trips_through_the_fake_gateway() {
let state = Arc::new(FakeState::default());
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let results = session
.write_secured_bulk(
12,
vec![WriteSecuredBulkEntry {
item_handle: 34,
value: Some(ClientMxValue::int32(1).into_proto()),
current_user_id: 0,
verifier_user_id: 0,
}],
)
.await
.unwrap();
assert_eq!(results.len(), 1);
assert!(results[0].was_successful);
let last_command = state.last_command_kind.lock().await;
assert_eq!(*last_command, Some(MxCommandKind::WriteSecuredBulk as i32));
}
#[tokio::test]
async fn write_secured2_bulk_round_trips_through_the_fake_gateway() {
let state = Arc::new(FakeState::default());
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let results = session
.write_secured2_bulk(
12,
vec![WriteSecured2BulkEntry {
item_handle: 34,
value: Some(ClientMxValue::int32(1).into_proto()),
timestamp_value: Some(ClientMxValue::string("2026-05-24T00:00:00Z").into_proto()),
current_user_id: 0,
verifier_user_id: 0,
}],
)
.await
.unwrap();
assert_eq!(results.len(), 1);
assert!(results[0].was_successful);
let last_command = state.last_command_kind.lock().await;
assert_eq!(*last_command, Some(MxCommandKind::WriteSecured2Bulk as i32));
}
#[tokio::test]
async fn stream_alarms_emits_snapshot_then_complete_then_transition_in_order() {
let state = Arc::new(FakeState::default());
*state.stream_alarms_script.lock().await = Some(vec![
AlarmFeedMessage {
payload: Some(alarm_feed_message::Payload::ActiveAlarm(
ActiveAlarmSnapshot {
alarm_full_reference: "Tank01.Level.HiHi".to_owned(),
current_state: AlarmConditionState::Active as i32,
..ActiveAlarmSnapshot::default()
},
)),
},
AlarmFeedMessage {
payload: Some(alarm_feed_message::Payload::SnapshotComplete(true)),
},
AlarmFeedMessage {
payload: Some(alarm_feed_message::Payload::Transition(
OnAlarmTransitionEvent {
alarm_full_reference: "Tank01.Level.HiHi".to_owned(),
transition_kind: AlarmTransitionKind::Raise as i32,
..OnAlarmTransitionEvent::default()
},
)),
},
]);
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let mut stream = client
.stream_alarms(StreamAlarmsRequest {
client_correlation_id: next_correlation_id("test-stream-alarms"),
alarm_filter_prefix: String::new(),
})
.await
.unwrap();
let first = stream.next().await.unwrap().unwrap();
let second = stream.next().await.unwrap().unwrap();
let third = stream.next().await.unwrap().unwrap();
assert!(matches!(
first.payload,
Some(alarm_feed_message::Payload::ActiveAlarm(_))
));
assert!(matches!(
second.payload,
Some(alarm_feed_message::Payload::SnapshotComplete(true))
));
assert!(matches!(
third.payload,
Some(alarm_feed_message::Payload::Transition(_))
));
}
#[tokio::test]
async fn cli_subcommands_propagate_unique_correlation_ids_from_next_correlation_id() {
// The CLI's `stream-alarms` and `acknowledge-alarm` paths used to
// hard-code their correlation ids (Client.Rust-023). Verify the
// resolution end-to-end through `next_correlation_id`: every call
// observed at the fake gateway has a unique id that embeds the
// `cli-...` label, so concurrent CLI smokes can tell collisions apart.
let state = Arc::new(FakeState::default());
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let first_corr = next_correlation_id("cli-stream-alarms");
let _ = client
.stream_alarms(StreamAlarmsRequest {
client_correlation_id: first_corr.clone(),
alarm_filter_prefix: String::new(),
})
.await
.unwrap();
assert_eq!(
*state.last_correlation_id.lock().await,
Some(first_corr.clone())
);
let second_corr = next_correlation_id("cli-stream-alarms");
assert_ne!(first_corr, second_corr);
assert!(second_corr.contains("cli-stream-alarms"));
let third_corr = next_correlation_id("cli-acknowledge-alarm");
let _ = client
.acknowledge_alarm(AcknowledgeAlarmRequest {
client_correlation_id: third_corr.clone(),
alarm_full_reference: "Tank01.Level.HiHi".to_owned(),
comment: String::new(),
operator_user: String::new(),
})
.await
.unwrap();
assert_eq!(*state.last_correlation_id.lock().await, Some(third_corr));
}
#[derive(Default)]
struct FakeState {
authorization: Mutex<Option<String>>,
last_command_kind: Mutex<Option<i32>>,
last_correlation_id: Mutex<Option<String>>,
stream_dropped: Arc<AtomicBool>,
/// Optional per-test override that pins the fake's `Invoke` handler to
/// a specific reply shape (or `Err(Status)`). The default of `None`
/// keeps the existing happy-path dispatcher.
invoke_override: Mutex<Option<InvokeOverride>>,
/// Optional per-test override that pins the fake's `StreamAlarms`
/// handler to emit a synthetic ConditionRefresh -> snapshot_complete
/// -> transition sequence.
stream_alarms_script: Mutex<Option<Vec<AlarmFeedMessage>>>,
}
/// Per-test override for the fake's `Invoke` handler.
#[allow(dead_code)]
enum InvokeOverride {
/// Reply with `protocol_status = Ok` and no `payload` set.
OkWithoutPayload,
/// Reply with `protocol_status = Ok` and a deliberately wrong payload
/// arm — e.g. an `AddItemReply` body when the caller invoked a bulk
/// command. The variant carries the kind to recognise in tests but the
/// reply itself is the mismatched-payload shape.
OkWithMismatchedPayload,
/// Fail the unary call with `Status::unavailable(...)` so the client's
/// `Code::Unavailable` -> `Error::Unavailable` mapping is exercised.
Unavailable(String),
}
#[derive(Clone)]
@@ -331,6 +737,35 @@ impl MxAccessGateway for FakeGateway {
.map(|command| command.kind)
.unwrap_or_default();
*self.state.last_command_kind.lock().await = Some(kind);
*self.state.last_correlation_id.lock().await = Some(request.client_correlation_id.clone());
// Honour any per-test override before falling through to the
// happy-path dispatcher.
if let Some(override_) = self.state.invoke_override.lock().await.take() {
return match override_ {
InvokeOverride::OkWithoutPayload => Ok(Response::new(MxCommandReply {
session_id: request.session_id,
correlation_id: "fake-correlation".to_owned(),
kind,
protocol_status: Some(ok_status("command ok")),
payload: None,
..MxCommandReply::default()
})),
InvokeOverride::OkWithMismatchedPayload => Ok(Response::new(MxCommandReply {
session_id: request.session_id,
correlation_id: "fake-correlation".to_owned(),
kind,
protocol_status: Some(ok_status("command ok")),
// Deliberately the wrong payload arm — `AddItemReply`
// for whatever command was actually invoked.
payload: Some(mx_command_reply::Payload::AddItem(AddItemReply {
item_handle: 99,
})),
..MxCommandReply::default()
})),
InvokeOverride::Unavailable(message) => Err(Status::unavailable(message)),
};
}
if kind == MxCommandKind::Write as i32 {
return Ok(Response::new(mxaccess_failure_reply()));
@@ -357,6 +792,92 @@ impl MxAccessGateway for FakeGateway {
}));
}
if kind == MxCommandKind::Register as i32 {
return Ok(Response::new(MxCommandReply {
session_id: request.session_id,
correlation_id: "fake-correlation".to_owned(),
kind,
protocol_status: Some(ok_status("command ok")),
payload: Some(mx_command_reply::Payload::Register(RegisterReply {
server_handle: 12,
})),
..MxCommandReply::default()
}));
}
if kind == MxCommandKind::AddItem2 as i32 {
return Ok(Response::new(MxCommandReply {
session_id: request.session_id,
correlation_id: "fake-correlation".to_owned(),
kind,
protocol_status: Some(ok_status("command ok")),
payload: Some(mx_command_reply::Payload::AddItem2(AddItem2Reply {
item_handle: 56,
})),
..MxCommandReply::default()
}));
}
if kind == MxCommandKind::ReadBulk as i32 {
return Ok(Response::new(MxCommandReply {
session_id: request.session_id,
correlation_id: "fake-correlation".to_owned(),
kind,
protocol_status: Some(ok_status("command ok")),
payload: Some(mx_command_reply::Payload::ReadBulk(BulkReadReply {
results: vec![BulkReadResult {
server_handle: 12,
tag_address: "Area001.Pump001.Speed".to_owned(),
item_handle: 34,
was_successful: true,
was_cached: true,
..BulkReadResult::default()
}],
})),
..MxCommandReply::default()
}));
}
if kind == MxCommandKind::WriteBulk as i32 {
return Ok(Response::new(write_bulk_reply_for(
request.session_id,
kind,
mx_command_reply::Payload::WriteBulk(BulkWriteReply {
results: vec![bulk_write_result_ok(12, 34)],
}),
)));
}
if kind == MxCommandKind::Write2Bulk as i32 {
return Ok(Response::new(write_bulk_reply_for(
request.session_id,
kind,
mx_command_reply::Payload::Write2Bulk(BulkWriteReply {
results: vec![bulk_write_result_ok(12, 34)],
}),
)));
}
if kind == MxCommandKind::WriteSecuredBulk as i32 {
return Ok(Response::new(write_bulk_reply_for(
request.session_id,
kind,
mx_command_reply::Payload::WriteSecuredBulk(BulkWriteReply {
results: vec![bulk_write_result_ok(12, 34)],
}),
)));
}
if kind == MxCommandKind::WriteSecured2Bulk as i32 {
return Ok(Response::new(write_bulk_reply_for(
request.session_id,
kind,
mx_command_reply::Payload::WriteSecured2Bulk(BulkWriteReply {
results: vec![bulk_write_result_ok(12, 34)],
}),
)));
}
Ok(Response::new(MxCommandReply {
session_id: request.session_id,
correlation_id: "fake-correlation".to_owned(),
@@ -387,8 +908,10 @@ impl MxAccessGateway for FakeGateway {
async fn acknowledge_alarm(
&self,
_request: Request<AcknowledgeAlarmRequest>,
request: Request<AcknowledgeAlarmRequest>,
) -> Result<Response<AcknowledgeAlarmReply>, Status> {
*self.state.last_correlation_id.lock().await =
Some(request.into_inner().client_correlation_id);
Ok(Response::new(AcknowledgeAlarmReply {
correlation_id: "corr-1".to_owned(),
protocol_status: Some(ok_status("ack ok")),
@@ -407,9 +930,18 @@ impl MxAccessGateway for FakeGateway {
async fn stream_alarms(
&self,
_request: Request<StreamAlarmsRequest>,
request: Request<StreamAlarmsRequest>,
) -> Result<Response<Self::StreamAlarmsStream>, Status> {
let (_sender, receiver) = mpsc::channel::<Result<AlarmFeedMessage, Status>>(1);
*self.state.last_correlation_id.lock().await =
Some(request.into_inner().client_correlation_id);
let script = self.state.stream_alarms_script.lock().await.take();
let (sender, receiver) =
mpsc::channel::<Result<AlarmFeedMessage, Status>>(script.as_ref().map_or(1, Vec::len));
if let Some(messages) = script {
for message in messages {
sender.send(Ok(message)).await.unwrap();
}
}
let stream = ReceiverStream::new(receiver);
Ok(Response::new(Box::pin(stream)))
}
@@ -469,6 +1001,32 @@ async fn spawn_fake_gateway(state: Arc<FakeState>) -> String {
format!("http://{address}")
}
fn write_bulk_reply_for(
session_id: String,
kind: i32,
payload: mx_command_reply::Payload,
) -> MxCommandReply {
MxCommandReply {
session_id,
correlation_id: "fake-correlation".to_owned(),
kind,
protocol_status: Some(ok_status("command ok")),
payload: Some(payload),
..MxCommandReply::default()
}
}
fn bulk_write_result_ok(server_handle: i32, item_handle: i32) -> BulkWriteResult {
BulkWriteResult {
server_handle,
item_handle,
was_successful: true,
hresult: Some(0),
statuses: Vec::new(),
error_message: String::new(),
}
}
fn ok_status(message: &str) -> ProtocolStatus {
ProtocolStatus {
code: ProtocolStatusCode::Ok as i32,