5e375f6d3d
Adds five new MXAccess command kinds (WriteBulk, Write2Bulk,
WriteSecuredBulk, WriteSecured2Bulk, ReadBulk) that ride the existing
"one round-trip, per-entry results" bulk shape used by AddItemBulk and
SubscribeBulk today. MXAccess COM has no native bulk API; the worker
runs each bulk operation as a sequential loop on its STA, returning
one BulkWriteResult / BulkReadResult per requested entry so per-item
MXAccess failures surface as was_successful=false rather than throwing.
ReadBulk has no MXAccess analogue. The worker satisfies it by:
- Returning the last cached OnDataChange payload (was_cached=true)
when the requested tag is already in the session''s item registry
AND advised — the existing subscription is NOT touched, since the
caller did not create it.
- Otherwise taking the AddItem + Advise + wait-for-OnDataChange +
UnAdvise + RemoveItem snapshot lifecycle itself (was_cached=false)
and leaving the session exactly as it was. The wait pumps Windows
messages on the STA so the inbound MXAccess event can dispatch
while the executor still holds the thread.
The new MxAccessValueCache lives on each MxAccessSession, shared with
MxAccessBaseEventSink which populates it on every OnDataChange after
the event clears the outbound queue. Eviction on RemoveItem keeps
reused MXAccess handles from serving stale values from a previous
lifetime.
Gateway-side authorization wires WriteBulk/Write2Bulk to invoke:write,
WriteSecuredBulk/WriteSecured2Bulk to invoke:secure, ReadBulk to
invoke:read. The constraint-filter pipeline is refactored from a single
BulkConstraintPlan record into an abstract base plus three concretes
(SubscribeBulk, WriteBulk, ReadBulk), each owning its own denied-entry
merge so the dispatch site never branches on reply shape. A new
FilterWriteBulkAsync<TEntry> generic over the four write-entry shapes
runs CheckWriteHandleAsync per entry; denied entries surface as the
BulkWriteResult shape, preserving original-index order.
All five language clients (.NET, Go, Rust, Python, Java) gained the
five new methods following their existing bulk pattern, with regenerated
protobufs.
Tests added:
- MxAccessValueCacheTests (6 cases) — Set/TryGet, Remove resets the
version, TryWaitForUpdate signals on Set, pump step fires each poll.
- MxAccessBaseEventSinkTests — OnDataChange populates the cache,
ValueCache property exposes the bound instance.
- MxAccessCommandExecutorTests — four bulk-write variants (per-entry
success/failure, value+timestamp forwarding, secured user ids),
ReadBulk snapshot lifecycle on uncached tag (timeout surfaces as
was_successful=false), invalid-payload reply.
- GatewayGrpcScopeResolverTests — five new MxCommandKind cases.
- SessionManagerTests — WriteBulk and ReadBulk forwarding through
FakeWorkerHarness; ReadBulk forwards timeout_ms.
- Per-client (.NET, Go, Rust, Python, Java) — WriteBulk builds the
right command and returns per-entry results, ReadBulk forwards the
timeout and unpacks the was_cached flag.
Cross-language e2e CLI subcommands for the new bulks are deliberately
scoped out of this change (each of the five client CLIs would need
five new subcommands plus matching phases in
scripts/run-client-e2e-tests.ps1); coverage equivalent to the existing
bulk-subscribe coverage is provided by worker + gateway + per-client
unit tests.
Docs updated in the same commit: gateway.md (Public MXAccess Command
Surface), docs/DesignDecisions.md (new "Bulk Command Family" section
with the ReadBulk cache-then-snapshot rationale), and every client
README.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
725 lines
25 KiB
Rust
725 lines
25 KiB
Rust
use std::pin::Pin;
|
|
use std::sync::{
|
|
atomic::{AtomicBool, Ordering},
|
|
Arc,
|
|
};
|
|
use std::task::{Context, Poll};
|
|
use std::time::Duration;
|
|
|
|
use futures_core::Stream;
|
|
use futures_util::StreamExt;
|
|
use mxgateway_client::generated::mxaccess_gateway::v1::mx_access_gateway_server::{
|
|
MxAccessGateway, MxAccessGatewayServer,
|
|
};
|
|
use mxgateway_client::generated::mxaccess_gateway::v1::mx_command;
|
|
use mxgateway_client::generated::mxaccess_gateway::v1::mx_command_reply;
|
|
use mxgateway_client::generated::mxaccess_gateway::v1::mx_value::Kind;
|
|
use mxgateway_client::generated::mxaccess_gateway::v1::{
|
|
AcknowledgeAlarmReply, AcknowledgeAlarmRequest, ActiveAlarmSnapshot, AddItemReply,
|
|
BulkReadReply, BulkReadResult, BulkSubscribeReply, BulkWriteReply, BulkWriteResult,
|
|
CloseSessionReply, CloseSessionRequest, MxCommandKind, MxCommandReply, MxDataType, MxEvent,
|
|
MxEventFamily, MxStatusCategory, MxStatusProxy, MxStatusSource, MxValue, OpenSessionReply,
|
|
OpenSessionRequest, ProtocolStatus, ProtocolStatusCode, QueryActiveAlarmsRequest, SessionState,
|
|
StreamEventsRequest, SubscribeResult, WriteBulkEntry,
|
|
};
|
|
use mxgateway_client::{
|
|
ApiKey, ClientOptions, CommandError, Error, GatewayClient, MxStatus, MxValue as ClientMxValue,
|
|
MxValueProjection,
|
|
};
|
|
use serde_json::Value;
|
|
use tokio::net::TcpListener;
|
|
use tokio::sync::{mpsc, Mutex};
|
|
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
|
|
use tonic::transport::Server;
|
|
use tonic::{Request, Response, Status};
|
|
|
|
#[tokio::test]
|
|
async fn fake_server_receives_bearer_metadata_and_raw_client_is_reachable() {
|
|
let state = Arc::new(FakeState::default());
|
|
let endpoint = spawn_fake_gateway(state.clone()).await;
|
|
let mut client = GatewayClient::connect(
|
|
ClientOptions::new(endpoint).with_api_key(ApiKey::new("mxgw_fixture_secret")),
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
let _raw = client.raw_client();
|
|
let session = client
|
|
.open_session(OpenSessionRequest {
|
|
client_session_name: "rust-test".to_owned(),
|
|
..OpenSessionRequest::default()
|
|
})
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(session.id(), "session-fixture");
|
|
assert_eq!(
|
|
state.authorization.lock().await.as_deref(),
|
|
Some("Bearer mxgw_fixture_secret")
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn session_helpers_build_commands_and_preserve_command_errors() {
|
|
let state = Arc::new(FakeState::default());
|
|
let endpoint = spawn_fake_gateway(state.clone()).await;
|
|
let client = GatewayClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
let session = client.session("session-fixture");
|
|
|
|
let item_handle = session.add_item(12, "Plant.Area.Tag").await.unwrap();
|
|
|
|
assert_eq!(item_handle, 34);
|
|
let last_command = state.last_command_kind.lock().await;
|
|
assert_eq!(*last_command, Some(MxCommandKind::AddItem as i32));
|
|
drop(last_command);
|
|
|
|
let error = session
|
|
.write(12, 34, ClientMxValue::int32(123), 0)
|
|
.await
|
|
.unwrap_err();
|
|
let Error::Command(error) = error else {
|
|
panic!("write failure should preserve the raw command reply: {error:?}");
|
|
};
|
|
assert_eq!(
|
|
error.reply().protocol_status.as_ref().unwrap().code,
|
|
ProtocolStatusCode::MxaccessFailure as i32
|
|
);
|
|
assert_eq!(error.reply().hresult, Some(-2147220992));
|
|
assert_eq!(error.reply().statuses.len(), 2);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn subscribe_bulk_builds_one_bulk_command_and_returns_results() {
|
|
let state = Arc::new(FakeState::default());
|
|
let endpoint = spawn_fake_gateway(state.clone()).await;
|
|
let client = GatewayClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
let session = client.session("session-fixture");
|
|
|
|
let results = session
|
|
.subscribe_bulk(12, vec!["Area001.Pump001.Speed".to_owned()])
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(results[0].item_handle, 34);
|
|
let last_command = state.last_command_kind.lock().await;
|
|
assert_eq!(*last_command, Some(MxCommandKind::SubscribeBulk as i32));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn write_bulk_builds_one_bulk_command_and_returns_per_entry_results() {
|
|
let state = Arc::new(FakeState::default());
|
|
let endpoint = spawn_fake_gateway(state.clone()).await;
|
|
let client = GatewayClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
let session = client.session("session-fixture");
|
|
|
|
let results = session
|
|
.write_bulk(
|
|
12,
|
|
vec![
|
|
WriteBulkEntry {
|
|
item_handle: 901,
|
|
value: Some(int_value(11)),
|
|
user_id: 5,
|
|
},
|
|
WriteBulkEntry {
|
|
item_handle: 902,
|
|
value: Some(int_value(22)),
|
|
user_id: 5,
|
|
},
|
|
],
|
|
)
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(results.len(), 2);
|
|
assert!(results[0].was_successful);
|
|
assert!(!results[1].was_successful);
|
|
let last_command = state.last_command_kind.lock().await;
|
|
assert_eq!(*last_command, Some(MxCommandKind::WriteBulk as i32));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn read_bulk_forwards_timeout_and_unpacks_cached_flag() {
|
|
let state = Arc::new(FakeState::default());
|
|
let endpoint = spawn_fake_gateway(state.clone()).await;
|
|
let client = GatewayClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
let session = client.session("session-fixture");
|
|
|
|
let results = session
|
|
.read_bulk(12, vec!["Area001.Pump001.Speed".to_owned()], 750)
|
|
.await
|
|
.unwrap();
|
|
|
|
let entry = &results[0];
|
|
assert!(entry.was_cached);
|
|
assert_eq!(entry.value.as_ref().and_then(|v| v.kind.as_ref()), Some(&Kind::Int32Value(99)));
|
|
assert_eq!(*state.last_read_bulk_timeout_ms.lock().await, Some(750));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn event_stream_preserves_order_and_drop_cancels_server_stream() {
|
|
let state = Arc::new(FakeState::default());
|
|
let endpoint = spawn_fake_gateway(state.clone()).await;
|
|
let client = GatewayClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
|
|
let mut stream = client
|
|
.stream_events(StreamEventsRequest {
|
|
session_id: "session-fixture".to_owned(),
|
|
after_worker_sequence: 0,
|
|
})
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(stream.next().await.unwrap().unwrap().worker_sequence, 1);
|
|
assert_eq!(stream.next().await.unwrap().unwrap().worker_sequence, 2);
|
|
|
|
drop(stream);
|
|
for _ in 0..20 {
|
|
if state.stream_dropped.load(Ordering::SeqCst) {
|
|
return;
|
|
}
|
|
tokio::time::sleep(Duration::from_millis(25)).await;
|
|
}
|
|
|
|
assert!(state.stream_dropped.load(Ordering::SeqCst));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn acknowledge_alarm_returns_reply_with_native_status() {
|
|
let state = Arc::new(FakeState::default());
|
|
let endpoint = spawn_fake_gateway(state.clone()).await;
|
|
let client = GatewayClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
|
|
let reply = client
|
|
.acknowledge_alarm(AcknowledgeAlarmRequest {
|
|
session_id: "session-fixture".to_owned(),
|
|
client_correlation_id: "corr-1".to_owned(),
|
|
alarm_full_reference: "Tank01.Level.HiHi".to_owned(),
|
|
comment: "investigating".to_owned(),
|
|
operator_user: "alice".to_owned(),
|
|
})
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(
|
|
reply.protocol_status.as_ref().unwrap().code,
|
|
ProtocolStatusCode::Ok as i32
|
|
);
|
|
assert_eq!(reply.status.as_ref().unwrap().success, 1);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn query_active_alarms_streams_snapshot_rows() {
|
|
let state = Arc::new(FakeState::default());
|
|
let endpoint = spawn_fake_gateway(state.clone()).await;
|
|
let client = GatewayClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
|
|
let mut stream = client
|
|
.query_active_alarms(QueryActiveAlarmsRequest {
|
|
session_id: "session-fixture".to_owned(),
|
|
..QueryActiveAlarmsRequest::default()
|
|
})
|
|
.await
|
|
.unwrap();
|
|
|
|
let first = stream.next().await.unwrap().unwrap();
|
|
assert_eq!(first.alarm_full_reference, "Tank01.Level.HiHi");
|
|
}
|
|
|
|
#[test]
|
|
fn value_conversion_fixtures_keep_typed_projection_and_raw_metadata() {
|
|
let fixture = behavior_fixture("values/value-conversion-cases.json");
|
|
let cases = fixture["cases"].as_array().unwrap();
|
|
|
|
let int64_case = case_by_id(cases, "int64.large");
|
|
let int64_value = ClientMxValue::from_proto(MxValue {
|
|
data_type: MxDataType::Integer as i32,
|
|
variant_type: "VT_I8".to_owned(),
|
|
kind: Some(Kind::Int64Value(
|
|
int64_case["value"]["int64Value"]
|
|
.as_str()
|
|
.unwrap()
|
|
.parse()
|
|
.unwrap(),
|
|
)),
|
|
..MxValue::default()
|
|
});
|
|
assert_eq!(
|
|
int64_value.projection(),
|
|
MxValueProjection::Int64(9_223_372_036_854_770_000)
|
|
);
|
|
|
|
let raw_case = case_by_id(cases, "raw-fallback.variant");
|
|
let raw_value = ClientMxValue::from_proto(MxValue {
|
|
data_type: MxDataType::Unknown as i32,
|
|
variant_type: "VT_RECORD".to_owned(),
|
|
raw_diagnostic: raw_case["value"]["rawDiagnostic"]
|
|
.as_str()
|
|
.unwrap()
|
|
.to_owned(),
|
|
raw_data_type: raw_case["value"]["rawDataType"].as_i64().unwrap() as i32,
|
|
kind: Some(Kind::RawValue(vec![1, 2, 3, 4, 5])),
|
|
..MxValue::default()
|
|
});
|
|
assert_eq!(
|
|
raw_value.projection(),
|
|
MxValueProjection::Raw(vec![1, 2, 3, 4, 5])
|
|
);
|
|
assert_eq!(raw_value.raw().raw_data_type, 32767);
|
|
assert!(raw_value.raw().raw_diagnostic.contains("No lossless"));
|
|
}
|
|
|
|
#[test]
|
|
fn status_conversion_fixtures_preserve_raw_fields() {
|
|
let fixture = behavior_fixture("statuses/status-conversion-cases.json");
|
|
let cases = fixture["cases"].as_array().unwrap();
|
|
let raw_case = case_by_id(cases, "raw-unknown-category");
|
|
let status = MxStatus::from_proto(MxStatusProxy {
|
|
success: raw_case["status"]["success"].as_i64().unwrap() as i32,
|
|
category: MxStatusCategory::Unknown as i32,
|
|
detected_by: MxStatusSource::Unknown as i32,
|
|
detail: raw_case["status"]["detail"].as_i64().unwrap() as i32,
|
|
raw_category: raw_case["status"]["rawCategory"].as_i64().unwrap() as i32,
|
|
raw_detected_by: raw_case["status"]["rawDetectedBy"].as_i64().unwrap() as i32,
|
|
diagnostic_text: raw_case["status"]["diagnosticText"]
|
|
.as_str()
|
|
.unwrap()
|
|
.to_owned(),
|
|
});
|
|
|
|
assert_eq!(status.success(), 0);
|
|
assert_eq!(status.category(), Some(MxStatusCategory::Unknown));
|
|
assert_eq!(status.raw_category(), 99);
|
|
assert_eq!(status.raw_detected_by(), 77);
|
|
assert!(status.diagnostic_text().contains("preserved"));
|
|
}
|
|
|
|
#[test]
|
|
fn authentication_and_authorization_statuses_are_distinct_and_redacted() {
|
|
let auth = Error::from(Status::unauthenticated(
|
|
"invalid API key mxgw_visible_secret",
|
|
));
|
|
let denied = Error::from(Status::permission_denied("missing scope mxaccess.write"));
|
|
|
|
assert!(matches!(auth, Error::Authentication { .. }));
|
|
assert!(matches!(denied, Error::Authorization { .. }));
|
|
assert!(!auth.to_string().contains("visible_secret"));
|
|
}
|
|
|
|
#[test]
|
|
fn command_error_display_keeps_raw_reply_accessible() {
|
|
let reply = mxaccess_failure_reply();
|
|
let error = CommandError::new(reply.clone());
|
|
|
|
assert_eq!(error.reply().hresult, Some(-2147220992));
|
|
assert!(error.to_string().contains("MxaccessFailure"));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn add_item_bulk_rejects_input_above_the_thousand_item_cap() {
|
|
let state = Arc::new(FakeState::default());
|
|
let endpoint = spawn_fake_gateway(state.clone()).await;
|
|
let client = GatewayClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
let session = client.session("session-fixture");
|
|
|
|
let oversized: Vec<String> = (0..1001).map(|index| format!("Tag{index}")).collect();
|
|
let error = session.add_item_bulk(12, oversized).await.unwrap_err();
|
|
|
|
assert!(
|
|
matches!(&error, Error::InvalidArgument { name, .. } if name.as_str() == "tag_addresses"),
|
|
"expected InvalidArgument for tag_addresses, got {error:?}"
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn event_stream_surfaces_a_mid_stream_status_fault() {
|
|
let state = Arc::new(FakeState::default());
|
|
state.emit_stream_fault.store(true, Ordering::SeqCst);
|
|
let endpoint = spawn_fake_gateway(state.clone()).await;
|
|
let client = GatewayClient::connect(ClientOptions::new(endpoint))
|
|
.await
|
|
.unwrap();
|
|
|
|
let mut stream = client
|
|
.stream_events(StreamEventsRequest {
|
|
session_id: "session-fixture".to_owned(),
|
|
after_worker_sequence: 0,
|
|
})
|
|
.await
|
|
.unwrap();
|
|
|
|
assert_eq!(stream.next().await.unwrap().unwrap().worker_sequence, 1);
|
|
assert_eq!(stream.next().await.unwrap().unwrap().worker_sequence, 2);
|
|
|
|
let fault = stream.next().await.unwrap().unwrap_err();
|
|
|
|
assert!(
|
|
matches!(fault, Error::Unavailable { .. }),
|
|
"expected Error::Unavailable, got {fault:?}"
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn connect_with_unreadable_ca_file_reports_invalid_endpoint() {
|
|
let options = ClientOptions::new("https://127.0.0.1:65000")
|
|
.with_plaintext(false)
|
|
.with_ca_file("definitely-not-a-real-ca-file.pem");
|
|
|
|
// GatewayClient is not Debug, so unwrap_err is unavailable here.
|
|
let error = match GatewayClient::connect(options).await {
|
|
Ok(_) => panic!("connect should fail when the CA file cannot be read"),
|
|
Err(error) => error,
|
|
};
|
|
|
|
assert!(
|
|
matches!(error, Error::InvalidEndpoint { .. }),
|
|
"expected Error::InvalidEndpoint, got {error:?}"
|
|
);
|
|
}
|
|
|
|
#[derive(Default)]
|
|
struct FakeState {
|
|
authorization: Mutex<Option<String>>,
|
|
last_command_kind: Mutex<Option<i32>>,
|
|
last_read_bulk_timeout_ms: Mutex<Option<u32>>,
|
|
stream_dropped: Arc<AtomicBool>,
|
|
emit_stream_fault: AtomicBool,
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
struct FakeGateway {
|
|
state: Arc<FakeState>,
|
|
}
|
|
|
|
#[tonic::async_trait]
|
|
impl MxAccessGateway for FakeGateway {
|
|
async fn open_session(
|
|
&self,
|
|
request: Request<OpenSessionRequest>,
|
|
) -> Result<Response<OpenSessionReply>, Status> {
|
|
*self.state.authorization.lock().await = request
|
|
.metadata()
|
|
.get("authorization")
|
|
.and_then(|value| value.to_str().ok())
|
|
.map(str::to_owned);
|
|
|
|
Ok(Response::new(OpenSessionReply {
|
|
session_id: "session-fixture".to_owned(),
|
|
backend_name: "fake".to_owned(),
|
|
worker_process_id: 1234,
|
|
worker_protocol_version: 1,
|
|
gateway_protocol_version: 1,
|
|
protocol_status: Some(ok_status("opened")),
|
|
..OpenSessionReply::default()
|
|
}))
|
|
}
|
|
|
|
async fn close_session(
|
|
&self,
|
|
request: Request<CloseSessionRequest>,
|
|
) -> Result<Response<CloseSessionReply>, Status> {
|
|
Ok(Response::new(CloseSessionReply {
|
|
session_id: request.into_inner().session_id,
|
|
final_state: SessionState::Closed as i32,
|
|
protocol_status: Some(ok_status("closed")),
|
|
}))
|
|
}
|
|
|
|
async fn invoke(
|
|
&self,
|
|
request: Request<mxgateway_client::generated::mxaccess_gateway::v1::MxCommandRequest>,
|
|
) -> Result<Response<MxCommandReply>, Status> {
|
|
let request = request.into_inner();
|
|
let kind = request
|
|
.command
|
|
.as_ref()
|
|
.map(|command| command.kind)
|
|
.unwrap_or_default();
|
|
*self.state.last_command_kind.lock().await = Some(kind);
|
|
|
|
if kind == MxCommandKind::Write as i32 {
|
|
return Ok(Response::new(mxaccess_failure_reply()));
|
|
}
|
|
|
|
if kind == MxCommandKind::SubscribeBulk as i32 {
|
|
return Ok(Response::new(MxCommandReply {
|
|
session_id: request.session_id,
|
|
correlation_id: "fake-correlation".to_owned(),
|
|
kind,
|
|
protocol_status: Some(ok_status("command ok")),
|
|
payload: Some(mx_command_reply::Payload::SubscribeBulk(
|
|
BulkSubscribeReply {
|
|
results: vec![SubscribeResult {
|
|
server_handle: 12,
|
|
tag_address: "Area001.Pump001.Speed".to_owned(),
|
|
item_handle: 34,
|
|
was_successful: true,
|
|
error_message: String::new(),
|
|
}],
|
|
},
|
|
)),
|
|
..MxCommandReply::default()
|
|
}));
|
|
}
|
|
|
|
if kind == MxCommandKind::WriteBulk as i32 {
|
|
// Echo one success and one failure so the test can assert the per-entry
|
|
// shape and verify the call did not throw on per-entry failure.
|
|
return Ok(Response::new(MxCommandReply {
|
|
session_id: request.session_id,
|
|
correlation_id: "fake-correlation".to_owned(),
|
|
kind,
|
|
protocol_status: Some(ok_status("command ok")),
|
|
payload: Some(mx_command_reply::Payload::WriteBulk(BulkWriteReply {
|
|
results: vec![
|
|
BulkWriteResult {
|
|
server_handle: 12,
|
|
item_handle: 901,
|
|
was_successful: true,
|
|
hresult: None,
|
|
statuses: vec![],
|
|
error_message: String::new(),
|
|
},
|
|
BulkWriteResult {
|
|
server_handle: 12,
|
|
item_handle: 902,
|
|
was_successful: false,
|
|
hresult: None,
|
|
statuses: vec![],
|
|
error_message: "invalid handle".to_owned(),
|
|
},
|
|
],
|
|
})),
|
|
..MxCommandReply::default()
|
|
}));
|
|
}
|
|
|
|
if kind == MxCommandKind::ReadBulk as i32 {
|
|
let read_command = request.command.as_ref().and_then(|c| {
|
|
if let Some(mx_command::Payload::ReadBulk(ref r)) = c.payload {
|
|
Some(r.timeout_ms)
|
|
} else {
|
|
None
|
|
}
|
|
});
|
|
*self.state.last_read_bulk_timeout_ms.lock().await = read_command;
|
|
return Ok(Response::new(MxCommandReply {
|
|
session_id: request.session_id,
|
|
correlation_id: "fake-correlation".to_owned(),
|
|
kind,
|
|
protocol_status: Some(ok_status("command ok")),
|
|
payload: Some(mx_command_reply::Payload::ReadBulk(BulkReadReply {
|
|
results: vec![BulkReadResult {
|
|
server_handle: 12,
|
|
tag_address: "Area001.Pump001.Speed".to_owned(),
|
|
item_handle: 34,
|
|
was_successful: true,
|
|
was_cached: true,
|
|
value: Some(int_value(99)),
|
|
quality: 192,
|
|
source_timestamp: None,
|
|
statuses: vec![],
|
|
error_message: String::new(),
|
|
}],
|
|
})),
|
|
..MxCommandReply::default()
|
|
}));
|
|
}
|
|
|
|
Ok(Response::new(MxCommandReply {
|
|
session_id: request.session_id,
|
|
correlation_id: "fake-correlation".to_owned(),
|
|
kind,
|
|
protocol_status: Some(ok_status("command ok")),
|
|
payload: Some(mx_command_reply::Payload::AddItem(AddItemReply {
|
|
item_handle: 34,
|
|
})),
|
|
..MxCommandReply::default()
|
|
}))
|
|
}
|
|
|
|
type StreamEventsStream = DropAwareStream;
|
|
|
|
async fn stream_events(
|
|
&self,
|
|
_request: Request<StreamEventsRequest>,
|
|
) -> Result<Response<Self::StreamEventsStream>, Status> {
|
|
let (sender, receiver) = mpsc::channel(4);
|
|
sender.send(Ok(event(1))).await.unwrap();
|
|
sender.send(Ok(event(2))).await.unwrap();
|
|
if self.state.emit_stream_fault.load(Ordering::SeqCst) {
|
|
sender
|
|
.send(Err(Status::unavailable("worker dropped the session")))
|
|
.await
|
|
.unwrap();
|
|
}
|
|
|
|
Ok(Response::new(DropAwareStream {
|
|
inner: ReceiverStream::new(receiver),
|
|
dropped: self.state.stream_dropped.clone(),
|
|
}))
|
|
}
|
|
|
|
async fn acknowledge_alarm(
|
|
&self,
|
|
_request: Request<AcknowledgeAlarmRequest>,
|
|
) -> Result<Response<AcknowledgeAlarmReply>, Status> {
|
|
Ok(Response::new(AcknowledgeAlarmReply {
|
|
session_id: "session-fixture".to_owned(),
|
|
correlation_id: "corr-1".to_owned(),
|
|
protocol_status: Some(ok_status("ack ok")),
|
|
status: Some(MxStatusProxy {
|
|
success: 1,
|
|
category: MxStatusCategory::Ok as i32,
|
|
detected_by: MxStatusSource::RespondingLmx as i32,
|
|
..MxStatusProxy::default()
|
|
}),
|
|
..AcknowledgeAlarmReply::default()
|
|
}))
|
|
}
|
|
|
|
type QueryActiveAlarmsStream =
|
|
Pin<Box<dyn Stream<Item = Result<ActiveAlarmSnapshot, Status>> + Send + 'static>>;
|
|
|
|
async fn query_active_alarms(
|
|
&self,
|
|
_request: Request<QueryActiveAlarmsRequest>,
|
|
) -> Result<Response<Self::QueryActiveAlarmsStream>, Status> {
|
|
let (sender, receiver) = mpsc::channel(4);
|
|
sender
|
|
.send(Ok(ActiveAlarmSnapshot {
|
|
alarm_full_reference: "Tank01.Level.HiHi".to_owned(),
|
|
..ActiveAlarmSnapshot::default()
|
|
}))
|
|
.await
|
|
.unwrap();
|
|
let stream = ReceiverStream::new(receiver);
|
|
Ok(Response::new(Box::pin(stream)))
|
|
}
|
|
}
|
|
|
|
struct DropAwareStream {
|
|
inner: ReceiverStream<Result<MxEvent, Status>>,
|
|
dropped: Arc<AtomicBool>,
|
|
}
|
|
|
|
impl Stream for DropAwareStream {
|
|
type Item = Result<MxEvent, Status>;
|
|
|
|
fn poll_next(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
|
Pin::new(&mut self.inner).poll_next(context)
|
|
}
|
|
}
|
|
|
|
impl Drop for DropAwareStream {
|
|
fn drop(&mut self) {
|
|
self.dropped.store(true, Ordering::SeqCst);
|
|
}
|
|
}
|
|
|
|
async fn spawn_fake_gateway(state: Arc<FakeState>) -> String {
|
|
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
|
let address = listener.local_addr().unwrap();
|
|
let incoming = TcpListenerStream::new(listener);
|
|
let service = MxAccessGatewayServer::new(FakeGateway { state });
|
|
tokio::spawn(async move {
|
|
Server::builder()
|
|
.add_service(service)
|
|
.serve_with_incoming(incoming)
|
|
.await
|
|
.unwrap();
|
|
});
|
|
|
|
format!("http://{address}")
|
|
}
|
|
|
|
fn int_value(value: i32) -> MxValue {
|
|
MxValue {
|
|
data_type: MxDataType::Integer as i32,
|
|
variant_type: "VT_I4".to_owned(),
|
|
kind: Some(Kind::Int32Value(value)),
|
|
..MxValue::default()
|
|
}
|
|
}
|
|
|
|
fn ok_status(message: &str) -> ProtocolStatus {
|
|
ProtocolStatus {
|
|
code: ProtocolStatusCode::Ok as i32,
|
|
message: message.to_owned(),
|
|
}
|
|
}
|
|
|
|
fn mxaccess_failure_reply() -> MxCommandReply {
|
|
MxCommandReply {
|
|
session_id: "session-fixture".to_owned(),
|
|
correlation_id: "gateway-correlation-write-1".to_owned(),
|
|
kind: MxCommandKind::Write as i32,
|
|
protocol_status: Some(ProtocolStatus {
|
|
code: ProtocolStatusCode::MxaccessFailure as i32,
|
|
message: "MXAccess rejected the write.".to_owned(),
|
|
}),
|
|
hresult: Some(-2147220992),
|
|
statuses: vec![
|
|
MxStatusProxy {
|
|
success: 0,
|
|
category: MxStatusCategory::SecurityError as i32,
|
|
detected_by: MxStatusSource::RespondingLmx as i32,
|
|
detail: 321,
|
|
raw_category: 8,
|
|
raw_detected_by: 3,
|
|
diagnostic_text: "Write denied by provider security.".to_owned(),
|
|
},
|
|
MxStatusProxy {
|
|
success: 0,
|
|
category: MxStatusCategory::OperationalError as i32,
|
|
detected_by: MxStatusSource::RespondingNmx as i32,
|
|
detail: 902,
|
|
raw_category: 7,
|
|
raw_detected_by: 5,
|
|
diagnostic_text: "Provider rejected the item state.".to_owned(),
|
|
},
|
|
],
|
|
..MxCommandReply::default()
|
|
}
|
|
}
|
|
|
|
fn event(sequence: u64) -> MxEvent {
|
|
MxEvent {
|
|
family: MxEventFamily::OnDataChange as i32,
|
|
session_id: "session-fixture".to_owned(),
|
|
worker_sequence: sequence,
|
|
..MxEvent::default()
|
|
}
|
|
}
|
|
|
|
fn behavior_fixture(path: &str) -> Value {
|
|
let path = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
|
|
.join("../proto/fixtures/behavior")
|
|
.join(path);
|
|
let data = std::fs::read_to_string(&path).unwrap();
|
|
serde_json::from_str(&data).unwrap()
|
|
}
|
|
|
|
fn case_by_id<'a>(cases: &'a [Value], id: &str) -> &'a Value {
|
|
cases
|
|
.iter()
|
|
.find(|case| case["id"].as_str() == Some(id))
|
|
.unwrap_or_else(|| panic!("missing fixture case {id}"))
|
|
}
|