use std::pin::Pin; use std::sync::{ atomic::{AtomicBool, Ordering}, Arc, }; use std::task::{Context, Poll}; use std::time::Duration; use futures_core::Stream; use futures_util::StreamExt; use mxgateway_client::generated::mxaccess_gateway::v1::alarm_feed_message; use mxgateway_client::generated::mxaccess_gateway::v1::mx_access_gateway_server::{ MxAccessGateway, MxAccessGatewayServer, }; use mxgateway_client::generated::mxaccess_gateway::v1::mx_command; use mxgateway_client::generated::mxaccess_gateway::v1::mx_command_reply; use mxgateway_client::generated::mxaccess_gateway::v1::mx_value::Kind; use mxgateway_client::generated::mxaccess_gateway::v1::{ AcknowledgeAlarmReply, AcknowledgeAlarmRequest, ActiveAlarmSnapshot, AddItemReply, AlarmFeedMessage, BulkReadReply, BulkReadResult, BulkSubscribeReply, BulkWriteReply, BulkWriteResult, CloseSessionReply, CloseSessionRequest, MxCommandKind, MxCommandReply, MxDataType, MxEvent, MxEventFamily, MxStatusCategory, MxStatusProxy, MxStatusSource, MxValue, OpenSessionReply, OpenSessionRequest, ProtocolStatus, ProtocolStatusCode, SessionState, StreamAlarmsRequest, StreamEventsRequest, SubscribeResult, Write2BulkEntry, WriteBulkEntry, WriteSecured2BulkEntry, WriteSecuredBulkEntry, }; use mxgateway_client::{ ApiKey, ClientOptions, CommandError, Error, GatewayClient, MxStatus, MxValue as ClientMxValue, MxValueProjection, }; use serde_json::Value; use tokio::net::TcpListener; use tokio::sync::{mpsc, Mutex}; use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream}; use tonic::transport::Server; use tonic::{Request, Response, Status}; #[tokio::test] async fn fake_server_receives_bearer_metadata_and_raw_client_is_reachable() { let state = Arc::new(FakeState::default()); let endpoint = spawn_fake_gateway(state.clone()).await; let mut client = GatewayClient::connect( ClientOptions::new(endpoint).with_api_key(ApiKey::new("mxgw_fixture_secret")), ) .await .unwrap(); let _raw = client.raw_client(); let session = client .open_session(OpenSessionRequest { client_session_name: "rust-test".to_owned(), ..OpenSessionRequest::default() }) .await .unwrap(); assert_eq!(session.id(), "session-fixture"); assert_eq!( state.authorization.lock().await.as_deref(), Some("Bearer mxgw_fixture_secret") ); } #[tokio::test] async fn session_helpers_build_commands_and_preserve_command_errors() { let state = Arc::new(FakeState::default()); let endpoint = spawn_fake_gateway(state.clone()).await; let client = GatewayClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let session = client.session("session-fixture"); let item_handle = session.add_item(12, "Plant.Area.Tag").await.unwrap(); assert_eq!(item_handle, 34); let last_command = state.last_command_kind.lock().await; assert_eq!(*last_command, Some(MxCommandKind::AddItem as i32)); drop(last_command); let error = session .write(12, 34, ClientMxValue::int32(123), 0) .await .unwrap_err(); let Error::Command(error) = error else { panic!("write failure should preserve the raw command reply: {error:?}"); }; assert_eq!( error.reply().protocol_status.as_ref().unwrap().code, ProtocolStatusCode::MxaccessFailure as i32 ); assert_eq!(error.reply().hresult, Some(-2147220992)); assert_eq!(error.reply().statuses.len(), 2); } #[tokio::test] async fn subscribe_bulk_builds_one_bulk_command_and_returns_results() { let state = Arc::new(FakeState::default()); let endpoint = spawn_fake_gateway(state.clone()).await; let client = GatewayClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let session = client.session("session-fixture"); let results = session .subscribe_bulk(12, vec!["Area001.Pump001.Speed".to_owned()]) .await .unwrap(); assert_eq!(results[0].item_handle, 34); let last_command = state.last_command_kind.lock().await; assert_eq!(*last_command, Some(MxCommandKind::SubscribeBulk as i32)); } #[tokio::test] async fn write_bulk_builds_one_bulk_command_and_returns_per_entry_results() { let state = Arc::new(FakeState::default()); let endpoint = spawn_fake_gateway(state.clone()).await; let client = GatewayClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let session = client.session("session-fixture"); let results = session .write_bulk( 12, vec![ WriteBulkEntry { item_handle: 901, value: Some(int_value(11)), user_id: 5, }, WriteBulkEntry { item_handle: 902, value: Some(int_value(22)), user_id: 5, }, ], ) .await .unwrap(); assert_eq!(results.len(), 2); assert!(results[0].was_successful); assert!(!results[1].was_successful); let last_command = state.last_command_kind.lock().await; assert_eq!(*last_command, Some(MxCommandKind::WriteBulk as i32)); } #[tokio::test] async fn read_bulk_forwards_timeout_and_unpacks_cached_flag() { let state = Arc::new(FakeState::default()); let endpoint = spawn_fake_gateway(state.clone()).await; let client = GatewayClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let session = client.session("session-fixture"); let results = session .read_bulk(12, &["Area001.Pump001.Speed"], 750) .await .unwrap(); let entry = &results[0]; assert!(entry.was_cached); assert_eq!( entry.value.as_ref().and_then(|v| v.kind.as_ref()), Some(&Kind::Int32Value(99)) ); assert_eq!(*state.last_read_bulk_timeout_ms.lock().await, Some(750)); } #[tokio::test] async fn event_stream_preserves_order_and_drop_cancels_server_stream() { let state = Arc::new(FakeState::default()); let endpoint = spawn_fake_gateway(state.clone()).await; let client = GatewayClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let mut stream = client .stream_events(StreamEventsRequest { session_id: "session-fixture".to_owned(), after_worker_sequence: 0, }) .await .unwrap(); assert_eq!(stream.next().await.unwrap().unwrap().worker_sequence, 1); assert_eq!(stream.next().await.unwrap().unwrap().worker_sequence, 2); drop(stream); for _ in 0..20 { if state.stream_dropped.load(Ordering::SeqCst) { return; } tokio::time::sleep(Duration::from_millis(25)).await; } assert!(state.stream_dropped.load(Ordering::SeqCst)); } #[tokio::test] async fn acknowledge_alarm_returns_reply_with_native_status() { let state = Arc::new(FakeState::default()); let endpoint = spawn_fake_gateway(state.clone()).await; let client = GatewayClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let reply = client .acknowledge_alarm(AcknowledgeAlarmRequest { client_correlation_id: "corr-1".to_owned(), alarm_full_reference: "Tank01.Level.HiHi".to_owned(), comment: "investigating".to_owned(), operator_user: "alice".to_owned(), }) .await .unwrap(); assert_eq!( reply.protocol_status.as_ref().unwrap().code, ProtocolStatusCode::Ok as i32 ); assert_eq!(reply.status.as_ref().unwrap().success, 1); } #[tokio::test] async fn stream_alarms_streams_snapshot_then_complete() { let state = Arc::new(FakeState::default()); let endpoint = spawn_fake_gateway(state.clone()).await; let client = GatewayClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let mut stream = client .stream_alarms(StreamAlarmsRequest::default()) .await .unwrap(); let first = stream.next().await.unwrap().unwrap(); match first.payload { Some(alarm_feed_message::Payload::ActiveAlarm(snapshot)) => { assert_eq!(snapshot.alarm_full_reference, "Tank01.Level.HiHi"); } other => panic!("expected an active-alarm snapshot, got {other:?}"), } let second = stream.next().await.unwrap().unwrap(); assert_eq!( second.payload, Some(alarm_feed_message::Payload::SnapshotComplete(true)), ); } #[test] fn value_conversion_fixtures_keep_typed_projection_and_raw_metadata() { let fixture = behavior_fixture("values/value-conversion-cases.json"); let cases = fixture["cases"].as_array().unwrap(); let int64_case = case_by_id(cases, "int64.large"); let int64_value = ClientMxValue::from_proto(MxValue { data_type: MxDataType::Integer as i32, variant_type: "VT_I8".to_owned(), kind: Some(Kind::Int64Value( int64_case["value"]["int64Value"] .as_str() .unwrap() .parse() .unwrap(), )), ..MxValue::default() }); assert_eq!( int64_value.projection(), MxValueProjection::Int64(9_223_372_036_854_770_000) ); let raw_case = case_by_id(cases, "raw-fallback.variant"); let raw_value = ClientMxValue::from_proto(MxValue { data_type: MxDataType::Unknown as i32, variant_type: "VT_RECORD".to_owned(), raw_diagnostic: raw_case["value"]["rawDiagnostic"] .as_str() .unwrap() .to_owned(), raw_data_type: raw_case["value"]["rawDataType"].as_i64().unwrap() as i32, kind: Some(Kind::RawValue(vec![1, 2, 3, 4, 5])), ..MxValue::default() }); assert_eq!( raw_value.projection(), MxValueProjection::Raw(vec![1, 2, 3, 4, 5]) ); assert_eq!(raw_value.raw().raw_data_type, 32767); assert!(raw_value.raw().raw_diagnostic.contains("No lossless")); } #[test] fn status_conversion_fixtures_preserve_raw_fields() { let fixture = behavior_fixture("statuses/status-conversion-cases.json"); let cases = fixture["cases"].as_array().unwrap(); let raw_case = case_by_id(cases, "raw-unknown-category"); let status = MxStatus::from_proto(MxStatusProxy { success: raw_case["status"]["success"].as_i64().unwrap() as i32, category: MxStatusCategory::Unknown as i32, detected_by: MxStatusSource::Unknown as i32, detail: raw_case["status"]["detail"].as_i64().unwrap() as i32, raw_category: raw_case["status"]["rawCategory"].as_i64().unwrap() as i32, raw_detected_by: raw_case["status"]["rawDetectedBy"].as_i64().unwrap() as i32, diagnostic_text: raw_case["status"]["diagnosticText"] .as_str() .unwrap() .to_owned(), }); assert_eq!(status.success(), 0); assert_eq!(status.category(), Some(MxStatusCategory::Unknown)); assert_eq!(status.raw_category(), 99); assert_eq!(status.raw_detected_by(), 77); assert!(status.diagnostic_text().contains("preserved")); } #[test] fn authentication_and_authorization_statuses_are_distinct_and_redacted() { let auth = Error::from(Status::unauthenticated( "invalid API key mxgw_visible_secret", )); let denied = Error::from(Status::permission_denied("missing scope mxaccess.write")); assert!(matches!(auth, Error::Authentication { .. })); assert!(matches!(denied, Error::Authorization { .. })); assert!(!auth.to_string().contains("visible_secret")); } #[test] fn command_error_display_keeps_raw_reply_accessible() { let reply = mxaccess_failure_reply(); let error = CommandError::new(reply.clone()); assert_eq!(error.reply().hresult, Some(-2147220992)); assert!(error.to_string().contains("MxaccessFailure")); } #[tokio::test] async fn add_item_bulk_rejects_input_above_the_thousand_item_cap() { let state = Arc::new(FakeState::default()); let endpoint = spawn_fake_gateway(state.clone()).await; let client = GatewayClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let session = client.session("session-fixture"); let oversized: Vec = (0..1001).map(|index| format!("Tag{index}")).collect(); let error = session.add_item_bulk(12, oversized).await.unwrap_err(); assert!( matches!(&error, Error::InvalidArgument { name, .. } if name.as_str() == "tag_addresses"), "expected InvalidArgument for tag_addresses, got {error:?}" ); } #[tokio::test] async fn event_stream_surfaces_a_mid_stream_status_fault() { let state = Arc::new(FakeState::default()); state.emit_stream_fault.store(true, Ordering::SeqCst); let endpoint = spawn_fake_gateway(state.clone()).await; let client = GatewayClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let mut stream = client .stream_events(StreamEventsRequest { session_id: "session-fixture".to_owned(), after_worker_sequence: 0, }) .await .unwrap(); assert_eq!(stream.next().await.unwrap().unwrap().worker_sequence, 1); assert_eq!(stream.next().await.unwrap().unwrap().worker_sequence, 2); let fault = stream.next().await.unwrap().unwrap_err(); assert!( matches!(fault, Error::Unavailable { .. }), "expected Error::Unavailable, got {fault:?}" ); } #[tokio::test] async fn connect_with_unreadable_ca_file_reports_invalid_endpoint() { let options = ClientOptions::new("https://127.0.0.1:65000") .with_plaintext(false) .with_ca_file("definitely-not-a-real-ca-file.pem"); // GatewayClient is not Debug, so unwrap_err is unavailable here. let error = match GatewayClient::connect(options).await { Ok(_) => panic!("connect should fail when the CA file cannot be read"), Err(error) => error, }; assert!( matches!(error, Error::InvalidEndpoint { .. }), "expected Error::InvalidEndpoint, got {error:?}" ); } #[tokio::test] async fn register_returns_malformed_reply_when_ok_reply_has_no_payload() { let state = Arc::new(FakeState::default()); *state.invoke_override.lock().await = Some(InvokeOverride::OkReplyNoPayload); let endpoint = spawn_fake_gateway(state.clone()).await; let client = GatewayClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let session = client.session("session-fixture"); let error = session.register("client-name").await.unwrap_err(); assert!( matches!(&error, Error::MalformedReply { detail } if detail.contains("Register")), "expected MalformedReply for register, got {error:?}" ); } #[tokio::test] async fn add_item_returns_malformed_reply_when_ok_reply_has_no_payload() { let state = Arc::new(FakeState::default()); *state.invoke_override.lock().await = Some(InvokeOverride::OkReplyNoPayload); let endpoint = spawn_fake_gateway(state.clone()).await; let client = GatewayClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let session = client.session("session-fixture"); let error = session.add_item(12, "Plant.Area.Tag").await.unwrap_err(); assert!( matches!(&error, Error::MalformedReply { detail } if detail.contains("AddItem")), "expected MalformedReply for add_item, got {error:?}" ); } #[tokio::test] async fn add_item2_returns_malformed_reply_when_ok_reply_has_no_payload() { let state = Arc::new(FakeState::default()); *state.invoke_override.lock().await = Some(InvokeOverride::OkReplyNoPayload); let endpoint = spawn_fake_gateway(state.clone()).await; let client = GatewayClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let session = client.session("session-fixture"); let error = session .add_item2(12, "Plant.Area.Tag", "ctx") .await .unwrap_err(); assert!( matches!(&error, Error::MalformedReply { detail } if detail.contains("AddItem2")), "expected MalformedReply for add_item2, got {error:?}" ); } #[tokio::test] async fn subscribe_bulk_returns_malformed_reply_on_mismatched_payload_arm() { let state = Arc::new(FakeState::default()); *state.invoke_override.lock().await = Some(InvokeOverride::OkReplyWrongPayloadForBulk); let endpoint = spawn_fake_gateway(state.clone()).await; let client = GatewayClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let session = client.session("session-fixture"); let error = session .subscribe_bulk(12, vec!["Tank01.Level".to_owned()]) .await .unwrap_err(); assert!( matches!(&error, Error::MalformedReply { detail } if detail.contains("bulk")), "expected MalformedReply for subscribe_bulk, got {error:?}" ); } #[tokio::test] async fn write_bulk_returns_malformed_reply_on_mismatched_payload_arm() { let state = Arc::new(FakeState::default()); *state.invoke_override.lock().await = Some(InvokeOverride::OkReplyWrongPayloadForBulkWrite); let endpoint = spawn_fake_gateway(state.clone()).await; let client = GatewayClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let session = client.session("session-fixture"); let error = session .write_bulk( 12, vec![WriteBulkEntry { item_handle: 901, value: Some(int_value(11)), user_id: 5, }], ) .await .unwrap_err(); assert!( matches!(&error, Error::MalformedReply { detail } if detail.contains("bulk write")), "expected MalformedReply for write_bulk, got {error:?}" ); } #[tokio::test] async fn read_bulk_returns_malformed_reply_on_mismatched_payload_arm() { let state = Arc::new(FakeState::default()); *state.invoke_override.lock().await = Some(InvokeOverride::OkReplyWrongPayloadForReadBulk); let endpoint = spawn_fake_gateway(state.clone()).await; let client = GatewayClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let session = client.session("session-fixture"); let error = session .read_bulk(12, &["Tank01.Level"], 500) .await .unwrap_err(); assert!( matches!(&error, Error::MalformedReply { detail } if detail.contains("ReadBulk")), "expected MalformedReply for read_bulk, got {error:?}" ); } #[tokio::test] async fn unary_invoke_maps_status_unavailable_to_error_unavailable() { let state = Arc::new(FakeState::default()); *state.invoke_override.lock().await = Some(InvokeOverride::Unavailable("gateway restarting".to_owned())); let endpoint = spawn_fake_gateway(state.clone()).await; let client = GatewayClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let session = client.session("session-fixture"); let error = session.add_item(12, "Plant.Area.Tag").await.unwrap_err(); assert!( matches!(&error, Error::Unavailable { .. }), "expected Error::Unavailable for unary unavailable, got {error:?}" ); } #[tokio::test] async fn write2_bulk_round_trips_through_the_fake_gateway() { let state = Arc::new(FakeState::default()); let endpoint = spawn_fake_gateway(state.clone()).await; let client = GatewayClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let session = client.session("session-fixture"); let results = session .write2_bulk( 12, vec![Write2BulkEntry { item_handle: 901, value: Some(int_value(11)), timestamp_value: Some(int_value(0)), user_id: 5, }], ) .await .unwrap(); assert_eq!(results.len(), 2); assert!(results[0].was_successful); assert!(!results[1].was_successful); let last_command = state.last_command_kind.lock().await; assert_eq!(*last_command, Some(MxCommandKind::Write2Bulk as i32)); } #[tokio::test] async fn write_secured_bulk_round_trips_through_the_fake_gateway() { let state = Arc::new(FakeState::default()); let endpoint = spawn_fake_gateway(state.clone()).await; let client = GatewayClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let session = client.session("session-fixture"); let results = session .write_secured_bulk( 12, vec![WriteSecuredBulkEntry { item_handle: 901, current_user_id: 7, verifier_user_id: 9, value: Some(int_value(11)), }], ) .await .unwrap(); assert_eq!(results.len(), 2); assert!(results[0].was_successful); let last_command = state.last_command_kind.lock().await; assert_eq!(*last_command, Some(MxCommandKind::WriteSecuredBulk as i32)); } #[tokio::test] async fn write_secured2_bulk_round_trips_through_the_fake_gateway() { let state = Arc::new(FakeState::default()); let endpoint = spawn_fake_gateway(state.clone()).await; let client = GatewayClient::connect(ClientOptions::new(endpoint)) .await .unwrap(); let session = client.session("session-fixture"); let results = session .write_secured2_bulk( 12, vec![WriteSecured2BulkEntry { item_handle: 901, current_user_id: 7, verifier_user_id: 9, value: Some(int_value(11)), timestamp_value: Some(int_value(0)), }], ) .await .unwrap(); assert_eq!(results.len(), 2); assert!(results[0].was_successful); let last_command = state.last_command_kind.lock().await; assert_eq!(*last_command, Some(MxCommandKind::WriteSecured2Bulk as i32)); } #[derive(Default)] struct FakeState { authorization: Mutex>, last_command_kind: Mutex>, last_read_bulk_timeout_ms: Mutex>, stream_dropped: Arc, emit_stream_fault: AtomicBool, /// Test-injected override for the next (and all subsequent) `Invoke` /// calls. When `Some`, the fake gateway returns the override's response /// instead of its default per-kind reply. Used by the malformed-reply /// and unary-Unavailable tests; default `None` preserves existing /// happy-path test behaviour. invoke_override: Mutex>, } /// Test-injected override for the fake gateway's `Invoke` handler. /// /// Each variant short-circuits the per-kind dispatch in `FakeGateway::invoke` /// and reproduces one of the wire shapes the Rust client's error paths must /// handle. The bool tags the OK reply variants as "OK envelope, payload /// missing/wrong" — the exact condition the new `Error::MalformedReply` /// paths in `session.rs` are designed to catch. #[derive(Clone)] enum InvokeOverride { /// Return `Status::unavailable(message)` from the unary Invoke RPC, so /// the client maps it to `Error::Unavailable`. Unavailable(String), /// Return an OK `MxCommandReply` whose `payload` field is `None`. Used /// to exercise `register_server_handle` / `add_item_handle` / /// `add_item2_handle` falling through to the `MalformedReply` arm. OkReplyNoPayload, /// Return an OK reply whose payload arm does not match the bulk-read /// command, so `read_bulk` falls through to its `MalformedReply` arm. OkReplyWrongPayloadForReadBulk, /// Return an OK reply whose payload arm does not match the requested /// bulk command, so `bulk_results` falls through to `MalformedReply`. OkReplyWrongPayloadForBulk, /// Return an OK reply whose payload arm does not match the requested /// bulk-write command, so `bulk_write_results` returns `MalformedReply`. OkReplyWrongPayloadForBulkWrite, } #[derive(Clone)] struct FakeGateway { state: Arc, } #[tonic::async_trait] impl MxAccessGateway for FakeGateway { async fn open_session( &self, request: Request, ) -> Result, Status> { *self.state.authorization.lock().await = request .metadata() .get("authorization") .and_then(|value| value.to_str().ok()) .map(str::to_owned); Ok(Response::new(OpenSessionReply { session_id: "session-fixture".to_owned(), backend_name: "fake".to_owned(), worker_process_id: 1234, worker_protocol_version: 1, gateway_protocol_version: 1, protocol_status: Some(ok_status("opened")), ..OpenSessionReply::default() })) } async fn close_session( &self, request: Request, ) -> Result, Status> { Ok(Response::new(CloseSessionReply { session_id: request.into_inner().session_id, final_state: SessionState::Closed as i32, protocol_status: Some(ok_status("closed")), })) } async fn invoke( &self, request: Request, ) -> Result, Status> { let request = request.into_inner(); let kind = request .command .as_ref() .map(|command| command.kind) .unwrap_or_default(); *self.state.last_command_kind.lock().await = Some(kind); if let Some(override_) = self.state.invoke_override.lock().await.clone() { return match override_ { InvokeOverride::Unavailable(message) => Err(Status::unavailable(message)), InvokeOverride::OkReplyNoPayload => Ok(Response::new(MxCommandReply { session_id: request.session_id, correlation_id: "fake-correlation".to_owned(), kind, protocol_status: Some(ok_status("command ok but payload omitted")), payload: None, ..MxCommandReply::default() })), InvokeOverride::OkReplyWrongPayloadForReadBulk => { Ok(Response::new(MxCommandReply { session_id: request.session_id, correlation_id: "fake-correlation".to_owned(), kind, protocol_status: Some(ok_status("read-bulk wrong payload arm")), // AddItem payload arm against a ReadBulk request: // the client's `read_bulk` matcher must reject it. payload: Some(mx_command_reply::Payload::AddItem(AddItemReply { item_handle: 0, })), ..MxCommandReply::default() })) } InvokeOverride::OkReplyWrongPayloadForBulk => Ok(Response::new(MxCommandReply { session_id: request.session_id, correlation_id: "fake-correlation".to_owned(), kind, protocol_status: Some(ok_status("bulk wrong payload arm")), // AddItem payload arm against a SubscribeBulk request. payload: Some(mx_command_reply::Payload::AddItem(AddItemReply { item_handle: 0, })), ..MxCommandReply::default() })), InvokeOverride::OkReplyWrongPayloadForBulkWrite => { Ok(Response::new(MxCommandReply { session_id: request.session_id, correlation_id: "fake-correlation".to_owned(), kind, protocol_status: Some(ok_status("bulk-write wrong payload arm")), // AddItem payload arm against a WriteBulk request. payload: Some(mx_command_reply::Payload::AddItem(AddItemReply { item_handle: 0, })), ..MxCommandReply::default() })) } }; } if kind == MxCommandKind::Write as i32 { return Ok(Response::new(mxaccess_failure_reply())); } if kind == MxCommandKind::SubscribeBulk as i32 { return Ok(Response::new(MxCommandReply { session_id: request.session_id, correlation_id: "fake-correlation".to_owned(), kind, protocol_status: Some(ok_status("command ok")), payload: Some(mx_command_reply::Payload::SubscribeBulk( BulkSubscribeReply { results: vec![SubscribeResult { server_handle: 12, tag_address: "Area001.Pump001.Speed".to_owned(), item_handle: 34, was_successful: true, error_message: String::new(), }], }, )), ..MxCommandReply::default() })); } // All four bulk-write families return `BulkWriteReply` over the // wire and only differ by which `payload` arm carries it. The // round-trip tests below want one entry per family, so wire them // all up to the same canned reply (one success + one failure) and // pick the matching payload arm by kind. if kind == MxCommandKind::WriteBulk as i32 { return Ok(Response::new(bulk_write_envelope( request.session_id, kind, mx_command_reply::Payload::WriteBulk(canned_bulk_write_reply()), ))); } if kind == MxCommandKind::Write2Bulk as i32 { return Ok(Response::new(bulk_write_envelope( request.session_id, kind, mx_command_reply::Payload::Write2Bulk(canned_bulk_write_reply()), ))); } if kind == MxCommandKind::WriteSecuredBulk as i32 { return Ok(Response::new(bulk_write_envelope( request.session_id, kind, mx_command_reply::Payload::WriteSecuredBulk(canned_bulk_write_reply()), ))); } if kind == MxCommandKind::WriteSecured2Bulk as i32 { return Ok(Response::new(bulk_write_envelope( request.session_id, kind, mx_command_reply::Payload::WriteSecured2Bulk(canned_bulk_write_reply()), ))); } if kind == MxCommandKind::ReadBulk as i32 { let read_command = request.command.as_ref().and_then(|c| { if let Some(mx_command::Payload::ReadBulk(ref r)) = c.payload { Some(r.timeout_ms) } else { None } }); *self.state.last_read_bulk_timeout_ms.lock().await = read_command; return Ok(Response::new(MxCommandReply { session_id: request.session_id, correlation_id: "fake-correlation".to_owned(), kind, protocol_status: Some(ok_status("command ok")), payload: Some(mx_command_reply::Payload::ReadBulk(BulkReadReply { results: vec![BulkReadResult { server_handle: 12, tag_address: "Area001.Pump001.Speed".to_owned(), item_handle: 34, was_successful: true, was_cached: true, value: Some(int_value(99)), quality: 192, source_timestamp: None, statuses: vec![], error_message: String::new(), }], })), ..MxCommandReply::default() })); } Ok(Response::new(MxCommandReply { session_id: request.session_id, correlation_id: "fake-correlation".to_owned(), kind, protocol_status: Some(ok_status("command ok")), payload: Some(mx_command_reply::Payload::AddItem(AddItemReply { item_handle: 34, })), ..MxCommandReply::default() })) } type StreamEventsStream = DropAwareStream; async fn stream_events( &self, _request: Request, ) -> Result, Status> { let (sender, receiver) = mpsc::channel(4); sender.send(Ok(event(1))).await.unwrap(); sender.send(Ok(event(2))).await.unwrap(); if self.state.emit_stream_fault.load(Ordering::SeqCst) { sender .send(Err(Status::unavailable("worker dropped the session"))) .await .unwrap(); } Ok(Response::new(DropAwareStream { inner: ReceiverStream::new(receiver), dropped: self.state.stream_dropped.clone(), })) } async fn acknowledge_alarm( &self, _request: Request, ) -> Result, Status> { Ok(Response::new(AcknowledgeAlarmReply { correlation_id: "corr-1".to_owned(), protocol_status: Some(ok_status("ack ok")), status: Some(MxStatusProxy { success: 1, category: MxStatusCategory::Ok as i32, detected_by: MxStatusSource::RespondingLmx as i32, ..MxStatusProxy::default() }), ..AcknowledgeAlarmReply::default() })) } type StreamAlarmsStream = Pin> + Send + 'static>>; async fn stream_alarms( &self, _request: Request, ) -> Result, Status> { let (sender, receiver) = mpsc::channel(4); sender .send(Ok(AlarmFeedMessage { payload: Some(alarm_feed_message::Payload::ActiveAlarm( ActiveAlarmSnapshot { alarm_full_reference: "Tank01.Level.HiHi".to_owned(), ..ActiveAlarmSnapshot::default() }, )), })) .await .unwrap(); sender .send(Ok(AlarmFeedMessage { payload: Some(alarm_feed_message::Payload::SnapshotComplete(true)), })) .await .unwrap(); let stream = ReceiverStream::new(receiver); Ok(Response::new(Box::pin(stream))) } } struct DropAwareStream { inner: ReceiverStream>, dropped: Arc, } impl Stream for DropAwareStream { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll> { Pin::new(&mut self.inner).poll_next(context) } } impl Drop for DropAwareStream { fn drop(&mut self) { self.dropped.store(true, Ordering::SeqCst); } } async fn spawn_fake_gateway(state: Arc) -> String { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let address = listener.local_addr().unwrap(); let incoming = TcpListenerStream::new(listener); let service = MxAccessGatewayServer::new(FakeGateway { state }); tokio::spawn(async move { Server::builder() .add_service(service) .serve_with_incoming(incoming) .await .unwrap(); }); format!("http://{address}") } fn int_value(value: i32) -> MxValue { MxValue { data_type: MxDataType::Integer as i32, variant_type: "VT_I4".to_owned(), kind: Some(Kind::Int32Value(value)), ..MxValue::default() } } fn ok_status(message: &str) -> ProtocolStatus { ProtocolStatus { code: ProtocolStatusCode::Ok as i32, message: message.to_owned(), } } fn mxaccess_failure_reply() -> MxCommandReply { MxCommandReply { session_id: "session-fixture".to_owned(), correlation_id: "gateway-correlation-write-1".to_owned(), kind: MxCommandKind::Write as i32, protocol_status: Some(ProtocolStatus { code: ProtocolStatusCode::MxaccessFailure as i32, message: "MXAccess rejected the write.".to_owned(), }), hresult: Some(-2147220992), statuses: vec![ MxStatusProxy { success: 0, category: MxStatusCategory::SecurityError as i32, detected_by: MxStatusSource::RespondingLmx as i32, detail: 321, raw_category: 8, raw_detected_by: 3, diagnostic_text: "Write denied by provider security.".to_owned(), }, MxStatusProxy { success: 0, category: MxStatusCategory::OperationalError as i32, detected_by: MxStatusSource::RespondingNmx as i32, detail: 902, raw_category: 7, raw_detected_by: 5, diagnostic_text: "Provider rejected the item state.".to_owned(), }, ], ..MxCommandReply::default() } } fn canned_bulk_write_reply() -> BulkWriteReply { BulkWriteReply { results: vec![ BulkWriteResult { server_handle: 12, item_handle: 901, was_successful: true, hresult: None, statuses: vec![], error_message: String::new(), }, BulkWriteResult { server_handle: 12, item_handle: 902, was_successful: false, hresult: None, statuses: vec![], error_message: "invalid handle".to_owned(), }, ], } } fn bulk_write_envelope( session_id: String, kind: i32, payload: mx_command_reply::Payload, ) -> MxCommandReply { MxCommandReply { session_id, correlation_id: "fake-correlation".to_owned(), kind, protocol_status: Some(ok_status("command ok")), payload: Some(payload), ..MxCommandReply::default() } } fn event(sequence: u64) -> MxEvent { MxEvent { family: MxEventFamily::OnDataChange as i32, session_id: "session-fixture".to_owned(), worker_sequence: sequence, ..MxEvent::default() } } fn behavior_fixture(path: &str) -> Value { let path = std::path::Path::new(env!("CARGO_MANIFEST_DIR")) .join("../proto/fixtures/behavior") .join(path); let data = std::fs::read_to_string(&path).unwrap(); serde_json::from_str(&data).unwrap() } fn case_by_id<'a>(cases: &'a [Value], id: &str) -> &'a Value { cases .iter() .find(|case| case["id"].as_str() == Some(id)) .unwrap_or_else(|| panic!("missing fixture case {id}")) }