Files
mxaccessgw/clients/rust/tests/client_behavior.rs
T
Joseph Doherty a0203503a7 Code-review 2026-05-20 sweep: re-review at 1cd51bb, resolve 72 findings across all 11 modules
Re-reviewed every module/client against the 10-category checklist
(REVIEW-PROCESS.md) at commit 1cd51bb, filed 72 new findings, and
fixed them in three priority waves (3 High, 17 Medium, 52 Low).

Highs
- Server-017: enumerate AcknowledgeAlarm / QueryActiveAlarms in
  GatewayGrpcScopeResolver so non-admin keys can use them; document
  the mapping in docs/Authorization.md; add interceptor tests.
- Client.Java-013: add the five missing bulk-method stubs to the
  CLI FakeSession so the test module compiles on a clean tree.
- Client.Rust-013: fix the clippy::doc_lazy_continuation regression
  in generated tonic code by reformatting the ReadBulkCommand proto
  comment and scoping a #![allow(...)] to the generated submodules.

Mediums (highlights)
- Server: unify GatewaySession state-lock discipline (-015) and
  make DisposeAsync race-safe against in-flight CloseAsync (-016);
  add constraint-enforcement test coverage for the bulk-plan path
  (-021).
- Worker: introduce StaRuntimeShutdownException so RunAlarmPollLoop
  can distinguish graceful shutdown from a real STA-affinity
  violation (-016); have the watchdog skip StaHung while
  CurrentCommandCorrelationId is non-empty so a legitimate slow
  ReadBulk no longer self-faults (-017).
- Tests: add per-method round-trip + cancellation coverage for the
  11 GatewaySession bulk methods (-013); replace the real TCP probe
  in GalaxyHierarchyCacheTests with an IGalaxyRepository fake
  (-016).
- IntegrationTests: drive the StreamEvents writer in the live Write
  test and assert OnWriteComplete (-012); add live tests for
  Unadvise/RemoveItem/Unregister ordering, WriteSecured, and
  abnormal worker exit (-014).
- Worker.Tests: replace MxAccessSession reflection with an internal
  CreateForTesting factory (-016); cover WorkerCancel and
  unexpected-body envelope branches (-017).
- Client.Java: cancel MxEventStream when close() races
  beforeStart() (-014); return a CancellingCompletableFuture that
  actually forwards cancellation through .thenApply chains (-015).
- Client.Python: drop the silent localhost-plaintext downgrade in
  the CLI; require explicit --plaintext (-013).
- Client.Rust: stop bench-read-bulk from polluting success-latency
  histograms with failed-call durations (-015); add coverage for
  the five MalformedReply paths, the bulk-write helpers, the
  Error::Unavailable mapping, and the unary-fault path (-016).
- Contracts: extend docs/Contracts.md with the bulk read/write
  command family (-009).

Lows (highlights)
- Server: cap GalaxyGlobMatcher.RegexCache; align
  WorkerAlarmRpcDispatcher missing-session handling; drop the
  duplicate dashboard @page routes; refresh IAlarmRpcDispatcher
  XML doc.
- Worker: surface SetXmlAlarmQuery COM failures; remove dead
  subscriptionExpression / ExecutingCommand arms; preserve
  factory-supplied runtime sessions; split MxAlarmSnapshot.cs into
  three files.
- Tests: dispose the WebApplication in seven test classes; rebuild
  FakeWorkerProcess.WaitForExitAsync against a real TaskCompletion
  source; switch the heartbeat-expires test to ManualTimeProvider;
  add InvariantCulture to the remaining DateTimeOffset.Parse sites;
  document GalaxyFilterInputSafetyTests in GatewayTesting.md.
- IntegrationTests: comment fixes, RecordingServerStreamWriter
  IDisposable, class-level [Trait], single-source ZB default
  connection string.
- Worker.Tests: replace silent-return gating with LiveMxAccessFact
  so absent env vars SKIP not pass; PascalCase rename of probe
  [Fact]s; deterministic deadline test; new frame-protocol error
  tests; ComputeTransitions diff-coverage; relocate dev-rig probes
  to Probes/.
- Contracts: add round-trip coverage and per-field redaction /
  Galaxy-identifier comments to the protos.
- Client.Dotnet: introduce clients/dotnet/Directory.Build.props so
  TreatWarningsAsErrors / analysers apply; document
  DiscoverHierarchyOptions and IMxGatewayCliClient; require typed
  bulk-read handles in CLI; surface AcknowledgeAlarm transport
  faults through Translate().
- Client.Go: kill dead code in alarms_test / fakeGalaxyServer /
  runWriteBulkVariant; document the six new subcommands in
  writeUsage; drain galaxy-watch events on limit; switch io.EOF
  comparisons to errors.Is.
- Client.Java: shared shutdown helpers + new shutdownTimeout
  option; regex-based credential redaction; Long.toUnsignedString
  for uint64 sequence; doc fixes.
- Client.Python: combine duplicate imports; add coverage for
  _percentile / bench-read-bulk / MAX_AGGREGATE_EVENTS /
  _api_key_from_env; populate pyproject metadata and ship py.typed.
- Client.Rust: expose next_correlation_id() so CLI ping/close
  stop hard-coding correlation IDs; resync RustClientDesign.md
  with the current Session / Error surface and CLI subcommand set.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-20 09:46:47 -04:00

1089 lines
38 KiB
Rust

use std::pin::Pin;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use std::task::{Context, Poll};
use std::time::Duration;
use futures_core::Stream;
use futures_util::StreamExt;
use mxgateway_client::generated::mxaccess_gateway::v1::mx_access_gateway_server::{
MxAccessGateway, MxAccessGatewayServer,
};
use mxgateway_client::generated::mxaccess_gateway::v1::mx_command;
use mxgateway_client::generated::mxaccess_gateway::v1::mx_command_reply;
use mxgateway_client::generated::mxaccess_gateway::v1::mx_value::Kind;
use mxgateway_client::generated::mxaccess_gateway::v1::{
AcknowledgeAlarmReply, AcknowledgeAlarmRequest, ActiveAlarmSnapshot, AddItemReply,
BulkReadReply, BulkReadResult, BulkSubscribeReply, BulkWriteReply, BulkWriteResult,
CloseSessionReply, CloseSessionRequest, MxCommandKind, MxCommandReply, MxDataType, MxEvent,
MxEventFamily, MxStatusCategory, MxStatusProxy, MxStatusSource, MxValue, OpenSessionReply,
OpenSessionRequest, ProtocolStatus, ProtocolStatusCode, QueryActiveAlarmsRequest, SessionState,
StreamEventsRequest, SubscribeResult, Write2BulkEntry, WriteBulkEntry, WriteSecured2BulkEntry,
WriteSecuredBulkEntry,
};
use mxgateway_client::{
ApiKey, ClientOptions, CommandError, Error, GatewayClient, MxStatus, MxValue as ClientMxValue,
MxValueProjection,
};
use serde_json::Value;
use tokio::net::TcpListener;
use tokio::sync::{mpsc, Mutex};
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
use tonic::transport::Server;
use tonic::{Request, Response, Status};
#[tokio::test]
async fn fake_server_receives_bearer_metadata_and_raw_client_is_reachable() {
let state = Arc::new(FakeState::default());
let endpoint = spawn_fake_gateway(state.clone()).await;
let mut client = GatewayClient::connect(
ClientOptions::new(endpoint).with_api_key(ApiKey::new("mxgw_fixture_secret")),
)
.await
.unwrap();
let _raw = client.raw_client();
let session = client
.open_session(OpenSessionRequest {
client_session_name: "rust-test".to_owned(),
..OpenSessionRequest::default()
})
.await
.unwrap();
assert_eq!(session.id(), "session-fixture");
assert_eq!(
state.authorization.lock().await.as_deref(),
Some("Bearer mxgw_fixture_secret")
);
}
#[tokio::test]
async fn session_helpers_build_commands_and_preserve_command_errors() {
let state = Arc::new(FakeState::default());
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let item_handle = session.add_item(12, "Plant.Area.Tag").await.unwrap();
assert_eq!(item_handle, 34);
let last_command = state.last_command_kind.lock().await;
assert_eq!(*last_command, Some(MxCommandKind::AddItem as i32));
drop(last_command);
let error = session
.write(12, 34, ClientMxValue::int32(123), 0)
.await
.unwrap_err();
let Error::Command(error) = error else {
panic!("write failure should preserve the raw command reply: {error:?}");
};
assert_eq!(
error.reply().protocol_status.as_ref().unwrap().code,
ProtocolStatusCode::MxaccessFailure as i32
);
assert_eq!(error.reply().hresult, Some(-2147220992));
assert_eq!(error.reply().statuses.len(), 2);
}
#[tokio::test]
async fn subscribe_bulk_builds_one_bulk_command_and_returns_results() {
let state = Arc::new(FakeState::default());
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let results = session
.subscribe_bulk(12, vec!["Area001.Pump001.Speed".to_owned()])
.await
.unwrap();
assert_eq!(results[0].item_handle, 34);
let last_command = state.last_command_kind.lock().await;
assert_eq!(*last_command, Some(MxCommandKind::SubscribeBulk as i32));
}
#[tokio::test]
async fn write_bulk_builds_one_bulk_command_and_returns_per_entry_results() {
let state = Arc::new(FakeState::default());
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let results = session
.write_bulk(
12,
vec![
WriteBulkEntry {
item_handle: 901,
value: Some(int_value(11)),
user_id: 5,
},
WriteBulkEntry {
item_handle: 902,
value: Some(int_value(22)),
user_id: 5,
},
],
)
.await
.unwrap();
assert_eq!(results.len(), 2);
assert!(results[0].was_successful);
assert!(!results[1].was_successful);
let last_command = state.last_command_kind.lock().await;
assert_eq!(*last_command, Some(MxCommandKind::WriteBulk as i32));
}
#[tokio::test]
async fn read_bulk_forwards_timeout_and_unpacks_cached_flag() {
let state = Arc::new(FakeState::default());
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let results = session
.read_bulk(12, &["Area001.Pump001.Speed"], 750)
.await
.unwrap();
let entry = &results[0];
assert!(entry.was_cached);
assert_eq!(
entry.value.as_ref().and_then(|v| v.kind.as_ref()),
Some(&Kind::Int32Value(99))
);
assert_eq!(*state.last_read_bulk_timeout_ms.lock().await, Some(750));
}
#[tokio::test]
async fn event_stream_preserves_order_and_drop_cancels_server_stream() {
let state = Arc::new(FakeState::default());
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let mut stream = client
.stream_events(StreamEventsRequest {
session_id: "session-fixture".to_owned(),
after_worker_sequence: 0,
})
.await
.unwrap();
assert_eq!(stream.next().await.unwrap().unwrap().worker_sequence, 1);
assert_eq!(stream.next().await.unwrap().unwrap().worker_sequence, 2);
drop(stream);
for _ in 0..20 {
if state.stream_dropped.load(Ordering::SeqCst) {
return;
}
tokio::time::sleep(Duration::from_millis(25)).await;
}
assert!(state.stream_dropped.load(Ordering::SeqCst));
}
#[tokio::test]
async fn acknowledge_alarm_returns_reply_with_native_status() {
let state = Arc::new(FakeState::default());
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let reply = client
.acknowledge_alarm(AcknowledgeAlarmRequest {
session_id: "session-fixture".to_owned(),
client_correlation_id: "corr-1".to_owned(),
alarm_full_reference: "Tank01.Level.HiHi".to_owned(),
comment: "investigating".to_owned(),
operator_user: "alice".to_owned(),
})
.await
.unwrap();
assert_eq!(
reply.protocol_status.as_ref().unwrap().code,
ProtocolStatusCode::Ok as i32
);
assert_eq!(reply.status.as_ref().unwrap().success, 1);
}
#[tokio::test]
async fn query_active_alarms_streams_snapshot_rows() {
let state = Arc::new(FakeState::default());
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let mut stream = client
.query_active_alarms(QueryActiveAlarmsRequest {
session_id: "session-fixture".to_owned(),
..QueryActiveAlarmsRequest::default()
})
.await
.unwrap();
let first = stream.next().await.unwrap().unwrap();
assert_eq!(first.alarm_full_reference, "Tank01.Level.HiHi");
}
#[test]
fn value_conversion_fixtures_keep_typed_projection_and_raw_metadata() {
let fixture = behavior_fixture("values/value-conversion-cases.json");
let cases = fixture["cases"].as_array().unwrap();
let int64_case = case_by_id(cases, "int64.large");
let int64_value = ClientMxValue::from_proto(MxValue {
data_type: MxDataType::Integer as i32,
variant_type: "VT_I8".to_owned(),
kind: Some(Kind::Int64Value(
int64_case["value"]["int64Value"]
.as_str()
.unwrap()
.parse()
.unwrap(),
)),
..MxValue::default()
});
assert_eq!(
int64_value.projection(),
MxValueProjection::Int64(9_223_372_036_854_770_000)
);
let raw_case = case_by_id(cases, "raw-fallback.variant");
let raw_value = ClientMxValue::from_proto(MxValue {
data_type: MxDataType::Unknown as i32,
variant_type: "VT_RECORD".to_owned(),
raw_diagnostic: raw_case["value"]["rawDiagnostic"]
.as_str()
.unwrap()
.to_owned(),
raw_data_type: raw_case["value"]["rawDataType"].as_i64().unwrap() as i32,
kind: Some(Kind::RawValue(vec![1, 2, 3, 4, 5])),
..MxValue::default()
});
assert_eq!(
raw_value.projection(),
MxValueProjection::Raw(vec![1, 2, 3, 4, 5])
);
assert_eq!(raw_value.raw().raw_data_type, 32767);
assert!(raw_value.raw().raw_diagnostic.contains("No lossless"));
}
#[test]
fn status_conversion_fixtures_preserve_raw_fields() {
let fixture = behavior_fixture("statuses/status-conversion-cases.json");
let cases = fixture["cases"].as_array().unwrap();
let raw_case = case_by_id(cases, "raw-unknown-category");
let status = MxStatus::from_proto(MxStatusProxy {
success: raw_case["status"]["success"].as_i64().unwrap() as i32,
category: MxStatusCategory::Unknown as i32,
detected_by: MxStatusSource::Unknown as i32,
detail: raw_case["status"]["detail"].as_i64().unwrap() as i32,
raw_category: raw_case["status"]["rawCategory"].as_i64().unwrap() as i32,
raw_detected_by: raw_case["status"]["rawDetectedBy"].as_i64().unwrap() as i32,
diagnostic_text: raw_case["status"]["diagnosticText"]
.as_str()
.unwrap()
.to_owned(),
});
assert_eq!(status.success(), 0);
assert_eq!(status.category(), Some(MxStatusCategory::Unknown));
assert_eq!(status.raw_category(), 99);
assert_eq!(status.raw_detected_by(), 77);
assert!(status.diagnostic_text().contains("preserved"));
}
#[test]
fn authentication_and_authorization_statuses_are_distinct_and_redacted() {
let auth = Error::from(Status::unauthenticated(
"invalid API key mxgw_visible_secret",
));
let denied = Error::from(Status::permission_denied("missing scope mxaccess.write"));
assert!(matches!(auth, Error::Authentication { .. }));
assert!(matches!(denied, Error::Authorization { .. }));
assert!(!auth.to_string().contains("visible_secret"));
}
#[test]
fn command_error_display_keeps_raw_reply_accessible() {
let reply = mxaccess_failure_reply();
let error = CommandError::new(reply.clone());
assert_eq!(error.reply().hresult, Some(-2147220992));
assert!(error.to_string().contains("MxaccessFailure"));
}
#[tokio::test]
async fn add_item_bulk_rejects_input_above_the_thousand_item_cap() {
let state = Arc::new(FakeState::default());
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let oversized: Vec<String> = (0..1001).map(|index| format!("Tag{index}")).collect();
let error = session.add_item_bulk(12, oversized).await.unwrap_err();
assert!(
matches!(&error, Error::InvalidArgument { name, .. } if name.as_str() == "tag_addresses"),
"expected InvalidArgument for tag_addresses, got {error:?}"
);
}
#[tokio::test]
async fn event_stream_surfaces_a_mid_stream_status_fault() {
let state = Arc::new(FakeState::default());
state.emit_stream_fault.store(true, Ordering::SeqCst);
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let mut stream = client
.stream_events(StreamEventsRequest {
session_id: "session-fixture".to_owned(),
after_worker_sequence: 0,
})
.await
.unwrap();
assert_eq!(stream.next().await.unwrap().unwrap().worker_sequence, 1);
assert_eq!(stream.next().await.unwrap().unwrap().worker_sequence, 2);
let fault = stream.next().await.unwrap().unwrap_err();
assert!(
matches!(fault, Error::Unavailable { .. }),
"expected Error::Unavailable, got {fault:?}"
);
}
#[tokio::test]
async fn connect_with_unreadable_ca_file_reports_invalid_endpoint() {
let options = ClientOptions::new("https://127.0.0.1:65000")
.with_plaintext(false)
.with_ca_file("definitely-not-a-real-ca-file.pem");
// GatewayClient is not Debug, so unwrap_err is unavailable here.
let error = match GatewayClient::connect(options).await {
Ok(_) => panic!("connect should fail when the CA file cannot be read"),
Err(error) => error,
};
assert!(
matches!(error, Error::InvalidEndpoint { .. }),
"expected Error::InvalidEndpoint, got {error:?}"
);
}
#[tokio::test]
async fn register_returns_malformed_reply_when_ok_reply_has_no_payload() {
let state = Arc::new(FakeState::default());
*state.invoke_override.lock().await = Some(InvokeOverride::OkReplyNoPayload);
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let error = session.register("client-name").await.unwrap_err();
assert!(
matches!(&error, Error::MalformedReply { detail } if detail.contains("Register")),
"expected MalformedReply for register, got {error:?}"
);
}
#[tokio::test]
async fn add_item_returns_malformed_reply_when_ok_reply_has_no_payload() {
let state = Arc::new(FakeState::default());
*state.invoke_override.lock().await = Some(InvokeOverride::OkReplyNoPayload);
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let error = session.add_item(12, "Plant.Area.Tag").await.unwrap_err();
assert!(
matches!(&error, Error::MalformedReply { detail } if detail.contains("AddItem")),
"expected MalformedReply for add_item, got {error:?}"
);
}
#[tokio::test]
async fn add_item2_returns_malformed_reply_when_ok_reply_has_no_payload() {
let state = Arc::new(FakeState::default());
*state.invoke_override.lock().await = Some(InvokeOverride::OkReplyNoPayload);
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let error = session
.add_item2(12, "Plant.Area.Tag", "ctx")
.await
.unwrap_err();
assert!(
matches!(&error, Error::MalformedReply { detail } if detail.contains("AddItem2")),
"expected MalformedReply for add_item2, got {error:?}"
);
}
#[tokio::test]
async fn subscribe_bulk_returns_malformed_reply_on_mismatched_payload_arm() {
let state = Arc::new(FakeState::default());
*state.invoke_override.lock().await = Some(InvokeOverride::OkReplyWrongPayloadForBulk);
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let error = session
.subscribe_bulk(12, vec!["Tank01.Level".to_owned()])
.await
.unwrap_err();
assert!(
matches!(&error, Error::MalformedReply { detail } if detail.contains("bulk")),
"expected MalformedReply for subscribe_bulk, got {error:?}"
);
}
#[tokio::test]
async fn write_bulk_returns_malformed_reply_on_mismatched_payload_arm() {
let state = Arc::new(FakeState::default());
*state.invoke_override.lock().await = Some(InvokeOverride::OkReplyWrongPayloadForBulkWrite);
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let error = session
.write_bulk(
12,
vec![WriteBulkEntry {
item_handle: 901,
value: Some(int_value(11)),
user_id: 5,
}],
)
.await
.unwrap_err();
assert!(
matches!(&error, Error::MalformedReply { detail } if detail.contains("bulk write")),
"expected MalformedReply for write_bulk, got {error:?}"
);
}
#[tokio::test]
async fn read_bulk_returns_malformed_reply_on_mismatched_payload_arm() {
let state = Arc::new(FakeState::default());
*state.invoke_override.lock().await = Some(InvokeOverride::OkReplyWrongPayloadForReadBulk);
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let error = session
.read_bulk(12, &["Tank01.Level"], 500)
.await
.unwrap_err();
assert!(
matches!(&error, Error::MalformedReply { detail } if detail.contains("ReadBulk")),
"expected MalformedReply for read_bulk, got {error:?}"
);
}
#[tokio::test]
async fn unary_invoke_maps_status_unavailable_to_error_unavailable() {
let state = Arc::new(FakeState::default());
*state.invoke_override.lock().await =
Some(InvokeOverride::Unavailable("gateway restarting".to_owned()));
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let error = session.add_item(12, "Plant.Area.Tag").await.unwrap_err();
assert!(
matches!(&error, Error::Unavailable { .. }),
"expected Error::Unavailable for unary unavailable, got {error:?}"
);
}
#[tokio::test]
async fn write2_bulk_round_trips_through_the_fake_gateway() {
let state = Arc::new(FakeState::default());
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let results = session
.write2_bulk(
12,
vec![Write2BulkEntry {
item_handle: 901,
value: Some(int_value(11)),
timestamp_value: Some(int_value(0)),
user_id: 5,
}],
)
.await
.unwrap();
assert_eq!(results.len(), 2);
assert!(results[0].was_successful);
assert!(!results[1].was_successful);
let last_command = state.last_command_kind.lock().await;
assert_eq!(*last_command, Some(MxCommandKind::Write2Bulk as i32));
}
#[tokio::test]
async fn write_secured_bulk_round_trips_through_the_fake_gateway() {
let state = Arc::new(FakeState::default());
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let results = session
.write_secured_bulk(
12,
vec![WriteSecuredBulkEntry {
item_handle: 901,
current_user_id: 7,
verifier_user_id: 9,
value: Some(int_value(11)),
}],
)
.await
.unwrap();
assert_eq!(results.len(), 2);
assert!(results[0].was_successful);
let last_command = state.last_command_kind.lock().await;
assert_eq!(*last_command, Some(MxCommandKind::WriteSecuredBulk as i32));
}
#[tokio::test]
async fn write_secured2_bulk_round_trips_through_the_fake_gateway() {
let state = Arc::new(FakeState::default());
let endpoint = spawn_fake_gateway(state.clone()).await;
let client = GatewayClient::connect(ClientOptions::new(endpoint))
.await
.unwrap();
let session = client.session("session-fixture");
let results = session
.write_secured2_bulk(
12,
vec![WriteSecured2BulkEntry {
item_handle: 901,
current_user_id: 7,
verifier_user_id: 9,
value: Some(int_value(11)),
timestamp_value: Some(int_value(0)),
}],
)
.await
.unwrap();
assert_eq!(results.len(), 2);
assert!(results[0].was_successful);
let last_command = state.last_command_kind.lock().await;
assert_eq!(*last_command, Some(MxCommandKind::WriteSecured2Bulk as i32));
}
#[derive(Default)]
struct FakeState {
authorization: Mutex<Option<String>>,
last_command_kind: Mutex<Option<i32>>,
last_read_bulk_timeout_ms: Mutex<Option<u32>>,
stream_dropped: Arc<AtomicBool>,
emit_stream_fault: AtomicBool,
/// Test-injected override for the next (and all subsequent) `Invoke`
/// calls. When `Some`, the fake gateway returns the override's response
/// instead of its default per-kind reply. Used by the malformed-reply
/// and unary-Unavailable tests; default `None` preserves existing
/// happy-path test behaviour.
invoke_override: Mutex<Option<InvokeOverride>>,
}
/// Test-injected override for the fake gateway's `Invoke` handler.
///
/// Each variant short-circuits the per-kind dispatch in `FakeGateway::invoke`
/// and reproduces one of the wire shapes the Rust client's error paths must
/// handle. The bool tags the OK reply variants as "OK envelope, payload
/// missing/wrong" — the exact condition the new `Error::MalformedReply`
/// paths in `session.rs` are designed to catch.
#[derive(Clone)]
enum InvokeOverride {
/// Return `Status::unavailable(message)` from the unary Invoke RPC, so
/// the client maps it to `Error::Unavailable`.
Unavailable(String),
/// Return an OK `MxCommandReply` whose `payload` field is `None`. Used
/// to exercise `register_server_handle` / `add_item_handle` /
/// `add_item2_handle` falling through to the `MalformedReply` arm.
OkReplyNoPayload,
/// Return an OK reply whose payload arm does not match the bulk-read
/// command, so `read_bulk` falls through to its `MalformedReply` arm.
OkReplyWrongPayloadForReadBulk,
/// Return an OK reply whose payload arm does not match the requested
/// bulk command, so `bulk_results` falls through to `MalformedReply`.
OkReplyWrongPayloadForBulk,
/// Return an OK reply whose payload arm does not match the requested
/// bulk-write command, so `bulk_write_results` returns `MalformedReply`.
OkReplyWrongPayloadForBulkWrite,
}
#[derive(Clone)]
struct FakeGateway {
state: Arc<FakeState>,
}
#[tonic::async_trait]
impl MxAccessGateway for FakeGateway {
async fn open_session(
&self,
request: Request<OpenSessionRequest>,
) -> Result<Response<OpenSessionReply>, Status> {
*self.state.authorization.lock().await = request
.metadata()
.get("authorization")
.and_then(|value| value.to_str().ok())
.map(str::to_owned);
Ok(Response::new(OpenSessionReply {
session_id: "session-fixture".to_owned(),
backend_name: "fake".to_owned(),
worker_process_id: 1234,
worker_protocol_version: 1,
gateway_protocol_version: 1,
protocol_status: Some(ok_status("opened")),
..OpenSessionReply::default()
}))
}
async fn close_session(
&self,
request: Request<CloseSessionRequest>,
) -> Result<Response<CloseSessionReply>, Status> {
Ok(Response::new(CloseSessionReply {
session_id: request.into_inner().session_id,
final_state: SessionState::Closed as i32,
protocol_status: Some(ok_status("closed")),
}))
}
async fn invoke(
&self,
request: Request<mxgateway_client::generated::mxaccess_gateway::v1::MxCommandRequest>,
) -> Result<Response<MxCommandReply>, Status> {
let request = request.into_inner();
let kind = request
.command
.as_ref()
.map(|command| command.kind)
.unwrap_or_default();
*self.state.last_command_kind.lock().await = Some(kind);
if let Some(override_) = self.state.invoke_override.lock().await.clone() {
return match override_ {
InvokeOverride::Unavailable(message) => Err(Status::unavailable(message)),
InvokeOverride::OkReplyNoPayload => Ok(Response::new(MxCommandReply {
session_id: request.session_id,
correlation_id: "fake-correlation".to_owned(),
kind,
protocol_status: Some(ok_status("command ok but payload omitted")),
payload: None,
..MxCommandReply::default()
})),
InvokeOverride::OkReplyWrongPayloadForReadBulk => {
Ok(Response::new(MxCommandReply {
session_id: request.session_id,
correlation_id: "fake-correlation".to_owned(),
kind,
protocol_status: Some(ok_status("read-bulk wrong payload arm")),
// AddItem payload arm against a ReadBulk request:
// the client's `read_bulk` matcher must reject it.
payload: Some(mx_command_reply::Payload::AddItem(AddItemReply {
item_handle: 0,
})),
..MxCommandReply::default()
}))
}
InvokeOverride::OkReplyWrongPayloadForBulk => Ok(Response::new(MxCommandReply {
session_id: request.session_id,
correlation_id: "fake-correlation".to_owned(),
kind,
protocol_status: Some(ok_status("bulk wrong payload arm")),
// AddItem payload arm against a SubscribeBulk request.
payload: Some(mx_command_reply::Payload::AddItem(AddItemReply {
item_handle: 0,
})),
..MxCommandReply::default()
})),
InvokeOverride::OkReplyWrongPayloadForBulkWrite => {
Ok(Response::new(MxCommandReply {
session_id: request.session_id,
correlation_id: "fake-correlation".to_owned(),
kind,
protocol_status: Some(ok_status("bulk-write wrong payload arm")),
// AddItem payload arm against a WriteBulk request.
payload: Some(mx_command_reply::Payload::AddItem(AddItemReply {
item_handle: 0,
})),
..MxCommandReply::default()
}))
}
};
}
if kind == MxCommandKind::Write as i32 {
return Ok(Response::new(mxaccess_failure_reply()));
}
if kind == MxCommandKind::SubscribeBulk as i32 {
return Ok(Response::new(MxCommandReply {
session_id: request.session_id,
correlation_id: "fake-correlation".to_owned(),
kind,
protocol_status: Some(ok_status("command ok")),
payload: Some(mx_command_reply::Payload::SubscribeBulk(
BulkSubscribeReply {
results: vec![SubscribeResult {
server_handle: 12,
tag_address: "Area001.Pump001.Speed".to_owned(),
item_handle: 34,
was_successful: true,
error_message: String::new(),
}],
},
)),
..MxCommandReply::default()
}));
}
// All four bulk-write families return `BulkWriteReply` over the
// wire and only differ by which `payload` arm carries it. The
// round-trip tests below want one entry per family, so wire them
// all up to the same canned reply (one success + one failure) and
// pick the matching payload arm by kind.
if kind == MxCommandKind::WriteBulk as i32 {
return Ok(Response::new(bulk_write_envelope(
request.session_id,
kind,
mx_command_reply::Payload::WriteBulk(canned_bulk_write_reply()),
)));
}
if kind == MxCommandKind::Write2Bulk as i32 {
return Ok(Response::new(bulk_write_envelope(
request.session_id,
kind,
mx_command_reply::Payload::Write2Bulk(canned_bulk_write_reply()),
)));
}
if kind == MxCommandKind::WriteSecuredBulk as i32 {
return Ok(Response::new(bulk_write_envelope(
request.session_id,
kind,
mx_command_reply::Payload::WriteSecuredBulk(canned_bulk_write_reply()),
)));
}
if kind == MxCommandKind::WriteSecured2Bulk as i32 {
return Ok(Response::new(bulk_write_envelope(
request.session_id,
kind,
mx_command_reply::Payload::WriteSecured2Bulk(canned_bulk_write_reply()),
)));
}
if kind == MxCommandKind::ReadBulk as i32 {
let read_command = request.command.as_ref().and_then(|c| {
if let Some(mx_command::Payload::ReadBulk(ref r)) = c.payload {
Some(r.timeout_ms)
} else {
None
}
});
*self.state.last_read_bulk_timeout_ms.lock().await = read_command;
return Ok(Response::new(MxCommandReply {
session_id: request.session_id,
correlation_id: "fake-correlation".to_owned(),
kind,
protocol_status: Some(ok_status("command ok")),
payload: Some(mx_command_reply::Payload::ReadBulk(BulkReadReply {
results: vec![BulkReadResult {
server_handle: 12,
tag_address: "Area001.Pump001.Speed".to_owned(),
item_handle: 34,
was_successful: true,
was_cached: true,
value: Some(int_value(99)),
quality: 192,
source_timestamp: None,
statuses: vec![],
error_message: String::new(),
}],
})),
..MxCommandReply::default()
}));
}
Ok(Response::new(MxCommandReply {
session_id: request.session_id,
correlation_id: "fake-correlation".to_owned(),
kind,
protocol_status: Some(ok_status("command ok")),
payload: Some(mx_command_reply::Payload::AddItem(AddItemReply {
item_handle: 34,
})),
..MxCommandReply::default()
}))
}
type StreamEventsStream = DropAwareStream;
async fn stream_events(
&self,
_request: Request<StreamEventsRequest>,
) -> Result<Response<Self::StreamEventsStream>, Status> {
let (sender, receiver) = mpsc::channel(4);
sender.send(Ok(event(1))).await.unwrap();
sender.send(Ok(event(2))).await.unwrap();
if self.state.emit_stream_fault.load(Ordering::SeqCst) {
sender
.send(Err(Status::unavailable("worker dropped the session")))
.await
.unwrap();
}
Ok(Response::new(DropAwareStream {
inner: ReceiverStream::new(receiver),
dropped: self.state.stream_dropped.clone(),
}))
}
async fn acknowledge_alarm(
&self,
_request: Request<AcknowledgeAlarmRequest>,
) -> Result<Response<AcknowledgeAlarmReply>, Status> {
Ok(Response::new(AcknowledgeAlarmReply {
session_id: "session-fixture".to_owned(),
correlation_id: "corr-1".to_owned(),
protocol_status: Some(ok_status("ack ok")),
status: Some(MxStatusProxy {
success: 1,
category: MxStatusCategory::Ok as i32,
detected_by: MxStatusSource::RespondingLmx as i32,
..MxStatusProxy::default()
}),
..AcknowledgeAlarmReply::default()
}))
}
type QueryActiveAlarmsStream =
Pin<Box<dyn Stream<Item = Result<ActiveAlarmSnapshot, Status>> + Send + 'static>>;
async fn query_active_alarms(
&self,
_request: Request<QueryActiveAlarmsRequest>,
) -> Result<Response<Self::QueryActiveAlarmsStream>, Status> {
let (sender, receiver) = mpsc::channel(4);
sender
.send(Ok(ActiveAlarmSnapshot {
alarm_full_reference: "Tank01.Level.HiHi".to_owned(),
..ActiveAlarmSnapshot::default()
}))
.await
.unwrap();
let stream = ReceiverStream::new(receiver);
Ok(Response::new(Box::pin(stream)))
}
}
struct DropAwareStream {
inner: ReceiverStream<Result<MxEvent, Status>>,
dropped: Arc<AtomicBool>,
}
impl Stream for DropAwareStream {
type Item = Result<MxEvent, Status>;
fn poll_next(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.inner).poll_next(context)
}
}
impl Drop for DropAwareStream {
fn drop(&mut self) {
self.dropped.store(true, Ordering::SeqCst);
}
}
async fn spawn_fake_gateway(state: Arc<FakeState>) -> String {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let address = listener.local_addr().unwrap();
let incoming = TcpListenerStream::new(listener);
let service = MxAccessGatewayServer::new(FakeGateway { state });
tokio::spawn(async move {
Server::builder()
.add_service(service)
.serve_with_incoming(incoming)
.await
.unwrap();
});
format!("http://{address}")
}
fn int_value(value: i32) -> MxValue {
MxValue {
data_type: MxDataType::Integer as i32,
variant_type: "VT_I4".to_owned(),
kind: Some(Kind::Int32Value(value)),
..MxValue::default()
}
}
fn ok_status(message: &str) -> ProtocolStatus {
ProtocolStatus {
code: ProtocolStatusCode::Ok as i32,
message: message.to_owned(),
}
}
fn mxaccess_failure_reply() -> MxCommandReply {
MxCommandReply {
session_id: "session-fixture".to_owned(),
correlation_id: "gateway-correlation-write-1".to_owned(),
kind: MxCommandKind::Write as i32,
protocol_status: Some(ProtocolStatus {
code: ProtocolStatusCode::MxaccessFailure as i32,
message: "MXAccess rejected the write.".to_owned(),
}),
hresult: Some(-2147220992),
statuses: vec![
MxStatusProxy {
success: 0,
category: MxStatusCategory::SecurityError as i32,
detected_by: MxStatusSource::RespondingLmx as i32,
detail: 321,
raw_category: 8,
raw_detected_by: 3,
diagnostic_text: "Write denied by provider security.".to_owned(),
},
MxStatusProxy {
success: 0,
category: MxStatusCategory::OperationalError as i32,
detected_by: MxStatusSource::RespondingNmx as i32,
detail: 902,
raw_category: 7,
raw_detected_by: 5,
diagnostic_text: "Provider rejected the item state.".to_owned(),
},
],
..MxCommandReply::default()
}
}
fn canned_bulk_write_reply() -> BulkWriteReply {
BulkWriteReply {
results: vec![
BulkWriteResult {
server_handle: 12,
item_handle: 901,
was_successful: true,
hresult: None,
statuses: vec![],
error_message: String::new(),
},
BulkWriteResult {
server_handle: 12,
item_handle: 902,
was_successful: false,
hresult: None,
statuses: vec![],
error_message: "invalid handle".to_owned(),
},
],
}
}
fn bulk_write_envelope(
session_id: String,
kind: i32,
payload: mx_command_reply::Payload,
) -> MxCommandReply {
MxCommandReply {
session_id,
correlation_id: "fake-correlation".to_owned(),
kind,
protocol_status: Some(ok_status("command ok")),
payload: Some(payload),
..MxCommandReply::default()
}
}
fn event(sequence: u64) -> MxEvent {
MxEvent {
family: MxEventFamily::OnDataChange as i32,
session_id: "session-fixture".to_owned(),
worker_sequence: sequence,
..MxEvent::default()
}
}
fn behavior_fixture(path: &str) -> Value {
let path = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
.join("../proto/fixtures/behavior")
.join(path);
let data = std::fs::read_to_string(&path).unwrap();
serde_json::from_str(&data).unwrap()
}
fn case_by_id<'a>(cases: &'a [Value], id: &str) -> &'a Value {
cases
.iter()
.find(|case| case["id"].as_str() == Some(id))
.unwrap_or_else(|| panic!("missing fixture case {id}"))
}